复杂版:
消息队列,简称MQ(Message Queue),消息从发送者到接收者的方式也有两种。
一种我们称为即时消息通讯,也就是说消息从一端发出后(消息发送者)立即就可以达到另一端(消息接收者),这种方式的具体实现就是RPC(当然单纯的http通讯也满足这个定义);另一种方式称为延迟消息通讯,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现就是消息队列。
简单版:
从字面上的意思理解,消息就是信息的载(第四声)体,队列就是一种先进先出的数据结构。简单理解为:把要传输的数据放在队列中。
消息队列常用的使用场景有3个: 异步处理、应用解耦、流量削锋。
日志记录,用户在页面进行一些业务操作(修改用户信息,注册等),通常需要将用户操作记录存入日志表里(大部分的日志记录行为其实是和用户操作的主业务没有直接关系的,只是运营人和经营人员需要拿到这部分用户操作的日志信息,来进行用户行为分析或行为监控)。在我们没有使用消息队列之前,当有用户请求时,先处理用户的请求再记录日志,这两个操作是放在一起的,而用户也需要等待日志添加完成之后才能拿到后台的响应信息,这样其实浪费了用户的部分时间。此时我们可以使用消息队列,当响应完用户请求之后,只需要把这个操作信息放入消息队列之后,就可以直接返回结果给用户了,无序等待日志处理和日志添加完成,从而缩短了前台用户的等待时间。
使用了消息队列之后,我们可以把系统的业务功能模块化,实现系统的解耦。例如,在没有使用消息队列之前,当前台用户完善了个人信息之后,首先我们需要更新用户的资料,再添加一条用户信息修改日志。但突然有一天产品经理提了一个需求,在前台用户信息更新之后,需要给此用户的增加一定的积分奖励,然后没过几天产品经理又提了一个需求,在前台用户信息更新之后,不但要增加积分奖励,还要增加用户的经验值,但没过几天产品经理的需求又变了,他要求完善资料无需增加用户的积分了,这样反反复复、来来回回的折腾,我想研发的同学一定受不了,但这是互联网公司的常态,那我们有没有一劳永逸的办法呢?没错,这个时候我们想到了使用消息队列来实现系统的解耦,每个功能的实现独立开,只需要一个订阅或者取消订阅的开关就可以了,当需要增加功能时,只需要打开订阅“用户信息完善”的队列就行,如果过两天不用了,再把订阅的开关关掉就行了,这样我们就不用来来回回的改业务代码了,也就轻松的实现了系统模块间的解耦。
在WKD项目,三方系统联调中采购出入库业务用到了MQ,里面设计到三个系统,系统A(SAP),系统B(WMS),还有中台,A系统订单入库到我们中台后,中台进行相应的业务处理后需用通知B系统入库信息时用了MQ(exchange中心调用gateway时),假如用传统的做法,直接中台去调B系统的接口的话,假如B系统有故障的话,无法访问,那么订单就入库失败了,用MQ后,即使B系统有故障,也不影响订单入库,实现了应用解耦。
商品秒杀或团抢活动中使用广泛,会发生短时间内出现爆发式的用户请求,如果不采取相关的措施,会导致服务器忙不过来,响应超时的问题,轻则会导致服务假死,重则会让服务器直接宕(dang第四声)机,给用户带来的体验也非常不好。加上了消息队列后,服务器接收到用户的所有请求后,先把这些请求全部写入到消息队列中再排队处理,这样就不会导致同时处理多个请求的情况;如果消息队列长度超过可以承载的最大数量,那么我们可以抛弃当前用户的请求,通知前台用户“页面出错啦,请重新刷新”等提示,这样就会有更好的交互体验。
好比疫情严重期间,京东8点时秒杀口罩,有时你会发现自己点进去直接报404或者网络异常或者重新跳转到京东首页,然后过会儿页面正常了,但是发现口罩已经售空了,其实就是用了消息队列来防止应用挂掉,当消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
具体场景描述请看https://www.cnblogs.com/qdhxhz/p/9071863.html
(1)系统复杂度提高,可用性降低.
(2)系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?
(3)硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。
(4)一致性问题
A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。
所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。
我们可以通过 JDK 提供的 Queue 来实现自定义消息队列,使用 DelayQueue 实现延迟消息队列。
我们可使用 Queue 来实现消息队列,Queue 大体可分为以下三类:
• **双端队列(Deque)**是 Queue 的子类也是 Queue 的补充类,头部和尾部都支持元素插入和获取;
• 阻塞队列指的是在元素操作时(添加或删除),如果没有成功,会阻塞等待执行,比如当添加元素时,如果队列元素已满,队列则会阻塞等待直到有空位时再插入;
• 非阻塞队列,和阻塞队列相反,它会直接返回操作的结果,而非阻塞等待操作,双端队列也属于非阻塞队列。
/** * @author 佛大Java程序员 * @since 1.0.0 */ public class CustomQueue { /** * 定义消息队列 */ private static Queue<String> queue = new LinkedList<>(); public static void main(String[] args) { producer();// 调用生产者 consumer();// 调用消费者 } // 生产者 public static void producer(){ // 添加消息 queue.add("first message."); queue.add("second message."); queue.add("third message."); } // 消费者 public static void consumer(){ while (!queue.isEmpty()){ // 消费消息 System.out.println(queue.poll()); } } }
运行结果
实现自定义延迟队列需要实现 Delayed 接口,重写 getDelay() 方法。
实现 Delayed 接口
/** * @author 佛大Java程序员 * @since 1.0.0 */ public class MyDelay implements Delayed { /** * 延迟截止时间(单位:毫秒) */ long delayTime = System.currentTimeMillis(); private String msg; public long getDelayTime() { return delayTime; } public void setDelayTime(long delayTime) { this.delayTime = delayTime; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } /** * 初始化 * @param delayTime 设置延迟执行时间 * @param msg 执行的消息 */ public MyDelay(long delayTime, String msg) { this.delayTime = (this.delayTime + delayTime); this.msg = msg; } /** * 获取剩余时间 * * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } /** * 队列里元素的排序依据 * * @param o * @return */ @Override public int compareTo(Delayed o) { if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) { return 1; } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) { return -1; } else { return 0; } }
@Override
public String toString() {
return this.msg;
} }
实现类
/** * 延迟消息队列 * * @author 佛大Java程序员 * @since 1.0.0 */ public class CustomDelayQueue { private static DelayQueue delayQueue = new DelayQueue(); public static void main(String[] args) throws InterruptedException { // 调用生产者 producer(); // 调用消费者 consumer(); } /** * 生产者 */ public static void producer() { // 添加消息 delayQueue.put(new MyDelay(1000, "消息1")); delayQueue.put(new MyDelay(3000, "消息2")); } /** * 消费者 * * @throws InterruptedException */ public static void consumer() throws InterruptedException { System.out.println("开始执行时间:" + DateFormat.getDateTimeInstance().format(new Date())); while (!delayQueue.isEmpty()) { System.out.println(delayQueue.take()); } System.out.println("结束执行时间:" + DateFormat.getDateTimeInstance().format(new Date())); }
}
可以看出,消息 1 和消息 2 都实现了延迟执行3秒的功能。
综合上面的材料得出以下两点:
(1)中小型软件公司,建议选RabbitMQ.一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。正所谓,成也萧何,败也萧何!他的弊端也在这里,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。不考虑rocketmq和kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。不考虑rocketmq的原因是,rocketmq是阿里出品,如果阿里放弃维护rocketmq,中小型公司一般抽不出人来进行rocketmq的定制化开发,因此不推荐。
大型软件公司,根据具体使用在rocketMq和kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对rocketMQ,大型软件公司也可以抽出人手对rocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。具体该选哪个,看使用场景。
未完待续......
(1)消息队列的使用场景有哪些?(为什么使用消息队列?)
(2)消息队列有什么优点和缺点?
(3)介绍一个你熟悉的消息中间件?
(4)如何手动实现一个消息队列和延迟消息队列?
(5)Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别,以及适合哪些场景?
消息队列概念和使用场景 --
https://www.cnblogs.com/qdhxhz/p/9071863.html
什么是消息队列? --
https://blog.csdn.net/yue_2018/article/details/89305275
拉钩教育 --
https://kaiwu.lagou.com/course/courseInfo.htm?courseId=59#/detail/pc?id=1770
原文:https://www.cnblogs.com/liaowenhui/p/12391639.html