首页 > 其他 > 详细

【mq读书笔记】消息队列负载与重新分配

时间:2020-02-04 09:21:02      阅读:121      评论:0      收藏:0      [点我收藏+]

回顾PullMessageService#run:

 

技术分享图片

 

如果队列总没有PullRequest对象,线程将阻塞。

围绕PullRequest有2个问题:

1.PullRequest对象在什么时候创建并加入pullRequestQueue中以便唤醒PullMessageService县城

2.集群内多个消费者如何负载主题下的多个消费队列,并且如果有新的消费者加入时,消息队列又会如何重新分布。

重新分布实现:RebalanceService,一个MQClientInstance持有一个RebalanceService实现,并随MQClientInstance启动而启动。

 

技术分享图片

 

默认每隔20s rebalance一次。

技术分享图片

 

遍历已注册的消费者(这个consumerTable是怎么来的),对消费者执行doRebalance

  public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();//在消费者调用subscribe方法时填充。
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

每个DefaultMQPushConsumerImpl都持有一个单独的RebalanceImpl对象,该方法遍历订阅信息对每个主题的队列进行重新负载。

RebalanceImpl#rebalanceByTopic:

技术分享图片

 

从主题订阅信息缓存表中获取主体的队列消息,发送请求从Broker中该消费组内当前所有的消费者客户端ID,主题topic的队列可能分布在多个Broker上,那请求发往哪个Broker呢?

答案是随机选择一个,Broker为什么会存在消费组内所有消费者的信息呢?MQClientInstance会向所有的Broker发送心跳包,心跳中包含MQClientInstance的消费者信息。

 

 

技术分享图片

 

allocate()一共有5种分配算法:

 

【mq读书笔记】消息队列负载与重新分配

原文:https://www.cnblogs.com/lccsblog/p/12258191.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!