场景:2 个消费者进程中,创建了 2 个消费者,同属于 1 个消费组,但是订阅了不同的 topic,会因为订阅信息相互覆盖,导致拉不到消息。
原因是 RocketMQ 的订阅关系是按 group 来管理的:c1 订阅 t1,c2 订阅 t2,而它们同属于同一个 group。当 c1 拉取 t1 的消息时,如果 broker 上该 group 当前记录的订阅只有 t2,就不会返回消息给 c1。
client 在 MQClientInstance 中发送心跳给所有的 broker,心跳中包含 consumer 的订阅信息,
// Periodic task: each run calls cleanOfflineBroker() and then
// sendHeartbeatToAllBrokerWithLock(). Any exception is logged and swallowed
// inside run(), so it never propagates to the scheduler.
final Runnable heartbeatTask = new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
};

// Initial delay is 1000 ms; the period comes from the client configuration (milliseconds).
final long heartbeatPeriodMillis = this.clientConfig.getHeartbeatBrokerInterval();
this.scheduledExecutorService.scheduleAtFixedRate(
    heartbeatTask, 1000, heartbeatPeriodMillis, TimeUnit.MILLISECONDS);
// org.apache.rocketmq.client.impl.factory.MQClientInstance#prepareHeartbeatData private HeartbeatData prepareHeartbeatData() { HeartbeatData heartbeatData = new HeartbeatData(); // clientID heartbeatData.setClientID(this.clientId); // Consumer for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { ConsumerData consumerData = new ConsumerData(); consumerData.setGroupName(impl.groupName()); consumerData.setConsumeType(impl.consumeType()); consumerData.setMessageModel(impl.messageModel()); consumerData.setConsumeFromWhere(impl.consumeFromWhere()); consumerData.getSubscriptionDataSet().addAll(impl.subscriptions()); consumerData.setUnitMode(impl.isUnitMode()); heartbeatData.getConsumerDataSet().add(consumerData); } } // Producer for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) { MQProducerInner impl = entry.getValue(); if (impl != null) { ProducerData producerData = new ProducerData(); producerData.setGroupName(entry.getKey()); heartbeatData.getProducerDataSet().add(producerData); } } return heartbeatData; }
broker 处理心跳请求
// org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) { RemotingCommand response = RemotingCommand.createResponseCommand(null); HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( ctx.channel(), heartbeatData.getClientID(), request.getLanguage(), request.getVersion() ); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( data.getGroupName()); boolean isNotifyConsumerIdsChangedEnable = true; if (null != subscriptionGroupConfig) { isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); int topicSysFlag = 0; if (data.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } String newTopic = MixAll.getRetryTopic(data.getGroupName()); this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( newTopic, subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); } boolean changed = this.brokerController.getConsumerManager().registerConsumer( data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable ); if (changed) { log.info("registerConsumer info changed {} {}", data.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()) ); } } for (ProducerData data : heartbeatData.getProducerDataSet()) { this.brokerController.getProducerManager().registerProducer(data.getGroupName(), clientChannelInfo); } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
broker 比较当前 consumer group 的订阅信息,如果当前消费组加入了新的消费者,或者订阅的 topic 发生了变化,则通知消费者进行 rebalance
// org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); // 本文讨论的场景,重点在这,group 的订阅数据被覆盖 boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList); return r1 || r2; }
发生 ConsumerGroupEvent.CHANGE 事件,broker 通知 client 进行 rebalance
// org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener#handle public void handle(ConsumerGroupEvent event, String group, Object... args) { if (event == null) { return; } switch (event) { case CHANGE: if (args == null || args.length < 1) { return; } List<Channel> channels = (List<Channel>) args[0]; if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) { for (Channel chl : channels) { this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); } } break; case UNREGISTER: this.brokerController.getConsumerFilterManager().unRegister(group); break; case REGISTER: if (args == null || args.length < 1) { return; } Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0]; this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList); break; default: throw new RuntimeException("Unknown event " + event); } }
broker 向 client 发送 NOTIFY_CONSUMER_IDS_CHANGED 请求
// org.apache.rocketmq.broker.client.net.Broker2Client#notifyConsumerIdsChanged public void notifyConsumerIdsChanged( final Channel channel, final String consumerGroup) { if (null == consumerGroup) { log.error("notifyConsumerIdsChanged consumerGroup is null"); return; } NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); try { this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); } catch (Exception e) { log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage()); } }
client 收到 NOTIFY_CONSUMER_IDS_CHANGED 请求,触发 rebalance
// org.apache.rocketmq.client.impl.ClientRemotingProcessor#notifyConsumerIdsChanged public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { try { final NotifyConsumerIdsChangedRequestHeader requestHeader = (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class); log.info("receive broker‘s notification[{}], the consumer group: {} changed, rebalance immediately", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup()); this.mqClientFactory.rebalanceImmediately(); } catch (Exception e) { log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e)); } return null; }
需要说明的是,client 本来就在周期性地进行 rebalance,只是默认间隔 20s(由 rocketmq.client.rebalance.waitInterval 控制)而已。这里只是让 client 立即 rebalance,但其实并没有任何效果,因为消费者数量没有变化。
但是由于 broker 保存的订阅关系一直被修改,导致 consumer 去拉数据时,会因为订阅关系不存在,而拉不到数据。
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()); if (null == subscriptionData) { log.warn("the consumer‘s subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); response.setRemark("the consumer‘s subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); return response; }
原文:https://www.cnblogs.com/allenwas3/p/11882089.html