服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - 基于RocketMQ推拉模式详解

基于RocketMQ推拉模式详解

2021-09-25 01:08mingxungu Java教程

这篇文章主要介绍了RocketMQ推拉模式的使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

消费者客户端有两种方式从消息中间件获取消息并消费。严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。

通过 Pull 不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。

1、概述

1.1、PULL方式

由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的拉取频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。

如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

1.2、PUSH方式

由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。

但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。

概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;

2、PUSH模式

主动推送的模式实现起来简单,避免了拉取的消费端业务逻辑的复杂度,消息的消费可以认为是实时的,同时也存在一定的弊端,要求消费端要有很强的消费能力。

2.1、代码实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Consumer1 {   
    public static void main(String[] args){
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_push");
            consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
            consumer.subscribe("TopicTest", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently(){
 
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                        ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
                    try {
                        for(MessageExt msg : paramList){
                            String msgbody = new String(msg.getBody(), "utf-8");
                            SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
                            Date date = new Date(msg.getStoreTimestamp());
                            System.out.println("Consumer1===  存入时间 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//输出消息内容
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
                }
            });
            consumer.start();
            System.out.println("Consumer1===启动成功!");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

PUSH消费方式,需要注册一个监听器Listener,,用来监听最新的消息,进行业务处理,同时反馈消息的消费状态,消费成功(CONSUME_SUCCESS)、消费重试(RECONSUME_LATER),消息重试会根据配置的消息的延迟等级的时间间隔,定时重新发送消费失败的记录。(PS:延迟消息中会重点讨论)

PUSH消息方式由于返回了消息的状态,服务端会维护每个消费端的消费进度,内部会记录消费进度,消息发送成功后会更新消费进度。

PUSH消息方式的局限性,是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。

上一个章节说明了服务端存储的每个主题对应的消费组的每个消息队列的偏移量

查看服务器文件上的消费进度信息:

/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json

基于RocketMQ推拉模式详解

3、PULL模式

3.1、代码实现(1)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class PullConsumer {
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
 
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pullConsumer");
        consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        consumer.start();
 
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            
            SINGLE_MQ: while (true) {
                try {
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println("=============================================================");
                    System.out.println("Consume from the queue: " + mq + "offset:" + getMessageQueueOffset(mq) + "结果:" + pullResult.getPullStatus());
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                        for (MessageExt m : messageExtList) {
                            System.out.print(new String(m.getBody()) +" == ");
                        }
                        System.out.println("");
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
 
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }
 
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
}

结果:

基于RocketMQ推拉模式详解

每次拉取消息的时候需要提供偏移量和拉取的消息的个数,需要自己业务实现每个主题下的队列的消费进度。

代码实现(1)这种方式只能拉取历史的消息,最新的消息拉取不了,也可以进行改造,来实现一直拉取。

3.2、代码实现(2)

在MQPullConsumer这个类里面,有一个MessageQueueListener,它的目的就是当queue发生变化的时候,通知Consumer。也正是这个借口,帮助我们在Pull模式里面,实现负载均衡。

注意,这个接口在MQPushConsumer里面是没有的,那里面有的是上面代码里的MessageListener。

?
1
2
3
4
5
void registerMessageQueueListener(final String topic, final MessageQueueListener listener);
public interface MessageQueueListener {
    void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
                             final Set<MessageQueue> mqDivided);
}

有了这个Listener,我们就可以动态的知道当前的Consumer分摊到了几个MessageQueue。然后对这些MessageQueue,我们可以开个线程池来消费。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class PullConsumerExtend {
    public static void main(String[] args) throws MQClientException {
           //消费组
            final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("pullConsumer");
           //MQ NameService地址
            scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
           //负载均衡模式
            scheduleService.setMessageModel(MessageModel.CLUSTERING);
           //需要处理的消息topic
            scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {
 
                @Override
                public void doPullTask(MessageQueue mq, PullTaskContext context) {
                    MQPullConsumer consumer = context.getPullConsumer();
                    try {
                        
                        long offset = consumer.fetchConsumeOffset(mq, false);
                        if (offset < 0)
                            offset = 0;
                        PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                        System.out.println("");
                        System.out.println("Consume from the queue: " + mq + "offset:" + offset + "结果:" + pullResult.getPullStatus());
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                                for (MessageExt m : messageExtList) {
                                    System.out.print(new String(m.getBody()) +" == ");
                                }
                                break;
                            case NO_MATCHED_MSG:
                                break;
                            case NO_NEW_MSG:
                            case OFFSET_ILLEGAL:
                                break;
                            default:
                                break;
                        }
                        consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                        //设置下一下拉取的间隔时间
                        context.setPullNextDelayTimeMillis(10000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            scheduleService.start();
    }
}

结果:

基于RocketMQ推拉模式详解

比较**代码实现(1)**这种方式改进了很多,不需要业务维护每个消费队列的消费进度,可以更新到服务端的。

弊端也很明显就是每次队列拉取消息的时间间隔,时间长导致消息挤压,时间段消息少,影响服务端性能。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持服务器之家。

原文链接:https://my.oschina.net/mingxungu/blog/3083956

延伸 · 阅读

精彩推荐
  • Java教程Java实现抢红包功能

    Java实现抢红包功能

    这篇文章主要为大家详细介绍了Java实现抢红包功能,采用多线程模拟多人同时抢红包,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙...

    littleschemer13532021-05-16
  • Java教程升级IDEA后Lombok不能使用的解决方法

    升级IDEA后Lombok不能使用的解决方法

    最近看到提示IDEA提示升级,寻思已经有好久没有升过级了。升级完毕重启之后,突然发现好多错误,本文就来介绍一下如何解决,感兴趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程xml与Java对象的转换详解

    xml与Java对象的转换详解

    这篇文章主要介绍了xml与Java对象的转换详解的相关资料,需要的朋友可以参考下...

    Java教程网2942020-09-17
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    这篇文章主要介绍了Java使用SAX解析xml的示例,帮助大家更好的理解和学习使用Java,感兴趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程20个非常实用的Java程序代码片段

    20个非常实用的Java程序代码片段

    这篇文章主要为大家分享了20个非常实用的Java程序片段,对java开发项目有所帮助,感兴趣的小伙伴们可以参考一下 ...

    lijiao5352020-04-06
  • Java教程小米推送Java代码

    小米推送Java代码

    今天小编就为大家分享一篇关于小米推送Java代码,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    富贵稳中求8032021-07-12
  • Java教程Java8中Stream使用的一个注意事项

    Java8中Stream使用的一个注意事项

    最近在工作中发现了对于集合操作转换的神器,java8新特性 stream,但在使用中遇到了一个非常重要的注意点,所以这篇文章主要给大家介绍了关于Java8中S...

    阿杜7472021-02-04
  • Java教程Java BufferWriter写文件写不进去或缺失数据的解决

    Java BufferWriter写文件写不进去或缺失数据的解决

    这篇文章主要介绍了Java BufferWriter写文件写不进去或缺失数据的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望...

    spcoder14552021-10-18