首页 > 其他 > 详细

分布式开放消息系统(RocketMQ)的原理与实践

时间:2018-11-03 11:59:25      阅读:147      评论:0      收藏:0      [点我收藏+]

转载:https://www.jianshu.com/p/453c6e7ff81c

 

分布式消息系统作为实现分布式系统可扩展、可伸缩的关键组件,需要具有高吞吐、高可用等特点,而谈到消息系统的设计,就回避不了两个问题:

1.消息的顺序问题
2.消息的重复问题

RockerMQ作为阿里开源的一款高性能、高吞吐的消息中间件,它是怎样来解决这两的问题的?

RocketMQ有哪些关键特性?

其实现原理是怎样的?

 

【关键特性及其实现原理】

【一、顺序消费】

消息有序指的是可以按照消息的发送顺序来消费。

例如:一笔订单产生了3条消息,分别是订单创建、订单付款、订单完成。消费时,必须按照顺序消费才有意义,与此同时多笔订单之间又是可以并行消费的。

例如生产者差生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?可能脑中想到的是这样的:

技术分享图片

假定M1-->S1,M2-->S2,如果要保证M1比M2想消费,那么M1到达消费端被消费后,通知S2,然后S2再将M2发送到消费端。

但是这个模型存在的问题是:如果M1和M2分别发送到两台Server上,就不能保证M1先到达MQ集群,也不能保证M1被先消费。换个角度看,如果M2先与M1到达MQ集群,甚至M2被消费后,M1才到达消费端,这时候消息就乱序了,说明以上模型是不能保证消息的顺序的。

如何才能在MQ集群保证消息的顺序?一种简单的方式就是将M1、M2发送到同一个Server上:

技术分享图片

上面这样可以保证M1先与M2达到MQ集群(Producer等待M1发送成功后再发送M2),根据先到达先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。

但是这个模型也仅仅是在理论上可以保证消息的顺序,在实际场景中可能会遇到下面的问题:网络延迟问题

技术分享图片

只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题,如上图所示,如果发送M1耗时大于发送M2耗时,那么仍然存在可能M2被先消费,仍然不能保证消息的顺序,即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然可能出现M2先于M1被消费的情况。

那如何解决这个问题呢?

将M1和M2发往同一个消费者,且发送M1后,需要消费端响应成功后才能发送M2。

但是这里又会存在另外的问题:如果M1被发送到消费端后,消费端1没有响应,那么是继续发送M2呢,还是重新发送M1?一般来说为了保证消息一定被消费,肯定会选择重发M1到另外一个消费端2,如下图,保证消息顺序的正确方式:

技术分享图片

这样的模型就可以严格保证消息的顺序,但是仍然会有问题:消费端1没有响应Server时,有两种情况,一种是M1确实没有到达(数据可能在网络传输中丢失),另一种是消费端已经消费M1并且已经发回响应消息,但是MQ Server没有收到。如果是第二种情况,会导致M1被重复消费

回过头来看消息顺序消费问题,严格的顺序消息非常容易理解,也可以通过文中所描述的方式来简化处理,总结起来,要实现严格的顺序消息,简单可行的办法就是:

保证 生产者—MQServer—消费者  是“一对一对一”的关系。

这样的设计虽然简单易行,但是也存在一些很严重的问题,比如:

1.并行度会成为消息系统的瓶颈(吞吐量不够)
2.产生更多的异常处理。比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

我们最终的目标是要集群的高容错性和高吞吐量,这似乎是一对不可调和的矛盾,那么阿里是如何解决的呢?

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 沈询

有些问题,看起来很重要,但实际上我们可以通过合理的设计将问题分解来规避,如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。从这个角度来看消息的顺序问题,可以得出两个结论:

1.不关注乱序的应用大量存在
2.队列无序并不意味着消息无序

所以从业务层面来保证消息的顺序,而不仅仅是依赖于消息系统,是不是我们更应该寻求的一种合理的方式?

最后从源码角度分析RocketMQ怎么实现发送顺序消息。

RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。

比如下面的例子中,订单号相同的消息挥别先后发送到同一个队列中:

// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

 

分布式开放消息系统(RocketMQ)的原理与实践

原文:https://www.cnblogs.com/HigginCui/p/9900148.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!