转载:https://www.jianshu.com/p/453c6e7ff81c
分布式消息系统作为实现分布式系统可扩展、可伸缩的关键组件,需要具有高吞吐、高可用等特点,而谈到消息系统的设计,就回避不了两个问题:
1.消息的顺序问题 2.消息的重复问题
RockerMQ作为阿里开源的一款高性能、高吞吐的消息中间件,它是怎样来解决这两的问题的?
RocketMQ有哪些关键特性?
其实现原理是怎样的?
【关键特性及其实现原理】
【一、顺序消费】
消息有序指的是可以按照消息的发送顺序来消费。
例如:一笔订单产生了3条消息,分别是订单创建、订单付款、订单完成。消费时,必须按照顺序消费才有意义,与此同时多笔订单之间又是可以并行消费的。
例如生产者差生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?可能脑中想到的是这样的:

假定M1-->S1,M2-->S2,如果要保证M1比M2想消费,那么M1到达消费端被消费后,通知S2,然后S2再将M2发送到消费端。
但是这个模型存在的问题是:如果M1和M2分别发送到两台Server上,就不能保证M1先到达MQ集群,也不能保证M1被先消费。换个角度看,如果M2先与M1到达MQ集群,甚至M2被消费后,M1才到达消费端,这时候消息就乱序了,说明以上模型是不能保证消息的顺序的。
如何才能在MQ集群保证消息的顺序?一种简单的方式就是将M1、M2发送到同一个Server上:

上面这样可以保证M1先与M2达到MQ集群(Producer等待M1发送成功后再发送M2),根据先到达先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。
但是这个模型也仅仅是在理论上可以保证消息的顺序,在实际场景中可能会遇到下面的问题:网络延迟问题

只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题,如上图所示,如果发送M1耗时大于发送M2耗时,那么仍然存在可能M2被先消费,仍然不能保证消息的顺序,即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然可能出现M2先于M1被消费的情况。
那如何解决这个问题呢?
将M1和M2发往同一个消费者,且发送M1后,需要消费端响应成功后才能发送M2。
但是这里又会存在另外的问题:如果M1被发送到消费端后,消费端1没有响应,那么是继续发送M2呢,还是重新发送M1?一般来说为了保证消息一定被消费,肯定会选择重发M1到另外一个消费端2,如下图,保证消息顺序的正确方式:

这样的模型就可以严格保证消息的顺序,但是仍然会有问题:消费端1没有响应Server时,有两种情况,一种是M1确实没有到达(数据可能在网络传输中丢失),另一种是消费端已经消费M1并且已经发回响应消息,但是MQ Server没有收到。如果是第二种情况,会导致M1被重复消费。
回过头来看消息顺序消费问题,严格的顺序消息非常容易理解,也可以通过文中所描述的方式来简化处理,总结起来,要实现严格的顺序消息,简单可行的办法就是:
保证 生产者—MQServer—消费者 是“一对一对一”的关系。
这样的设计虽然简单易行,但是也存在一些很严重的问题,比如:
1.并行度会成为消息系统的瓶颈(吞吐量不够) 2.产生更多的异常处理。比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。
我们最终的目标是要集群的高容错性和高吞吐量,这似乎是一对不可调和的矛盾,那么阿里是如何解决的呢?
世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 沈询
有些问题,看起来很重要,但实际上我们可以通过合理的设计将问题分解来规避,如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。从这个角度来看消息的顺序问题,可以得出两个结论:
1.不关注乱序的应用大量存在 2.队列无序并不意味着消息无序
所以从业务层面来保证消息的顺序,而不仅仅是依赖于消息系统,是不是我们更应该寻求的一种合理的方式?
最后从源码角度分析RocketMQ怎么实现发送顺序消息。
RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。
比如下面的例子中,订单号相同的消息挥别先后发送到同一个队列中:
// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上 // RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash // 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);
原文:https://www.cnblogs.com/HigginCui/p/9900148.html