一. 简介
1. 基本概念
RabbitMQ是AMQP协议的一个开源实现;
Message:消息是不具名的,由消息头和消息体组成;
Publisher:消息生产者,向交换器发布消息的客户端应用程序;
Exchange:交换器,用来接收消息,并将消息路由给队列;
Binding:用于消息队列和交换器之间的关联;
Queue:消息队列,用了保存消息知道发送给消费者;
Connection:网络连接,比如一个TCP连接;
Channel:信道,多路复用连接中的一条独立的双向数据流通道;鼓励在一个连接中创建多个通道,通道的创建和销毁比Connection小很多;避免线程间共享通道;
Consumer:消费者,从队列中区的消息的客户端应用程序;
Virtual Host:虚拟主机,一批交换器,消息队列和相关对象;
Broker:消息队列的服务实体;
2. 交换器类型
Headers:匹配AMQP消息的Header而不是路由键,此外和Direct完全一致,但性能相差很多,几乎不用;
Direct:消息中的路由键routing-key和binding中的绑定键bingding-key一致,交换器将消息发送到对应的队列中,完全匹配,单播的模式;
Fanout:不处理路由键,简单的将队列绑定到交换器,发送到交换器的每条消息都会被转发到所有队列中,速度很快;
Topic:队列需要绑定一种模式,通过模式匹配消息的路由键属性,消息的路由器和字符串用“.”分割为两部分,队列的模式会识别“#”和“*”两个通配符;
二. 简单实例
1. 消息生产者
ConnectionFactory factory = new ConnectionFactory(); // 创建连接工厂 factory.setUsername(""); factory.setPassword(""); factory.setHost(""); // 设置rabbitMQ地址 factory.setVirtualHost(""); Connection conn = factory.newConnection(); // 建立到代理服务器的连接 Channel channel = conn.creatChannel(); // 创建信道 String exchangeName = ""; // 声明交换器 channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "routingKey"; byte[] msg = "".getBytes(); channel.basicPublish(exchangeName, routingKey, null, msg); // 发布消息 channel.close(); conn.close();
2. 消息消费者
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(""); factory.setPassword(""); factory.setHost(""); factory.setVirtualHost(""); Connection conn = factory.newConnection(); Channel channel = conn.creatChannel(); String exchangeName = ""; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "routingKey"; String queneName = channel.queueDeclare().getQqueue(); // 声明队列 channel.queueBind(queneName, exchangeName, routingKey); // 绑定队列,通过路由键将队列和交换器绑定 while(true) { boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queneName, autoAck, consumerTag, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); // 路由键 String contentType = properties.getContentType(); // 内容类型 long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag, false); // 确认消息 String bodyStr = new String(body, "UTF-8"); // 消息体内容 } }); }
3. Spring整合
在生产者和消费者中有很多重复代码,很多都是配置信息,基于此,Spring框架集成了RabbitMQ,用于简化使用操作;
主要API:
MessageListenerContainer:用来监听容器,为消息入队提供异步处理;
RabbitTemplate:用来发送和接收消息;
RabbitAdmin:用来声明队列,交换器,绑定;
三. 实践建议
1. 虚拟主机
客户端在连接消息服务器时必须指定一个虚拟主机,RabbitMQ中的权限控制是以vhost为单位的;
这种方式既能把同一台RabbitMQ服务器的不同业务应用区分开,又可以避免其内部队列,交换器的命名冲突;
可以通过RabbitMQ提供的rabbitmqctl工具管理vhost;
2. 消息保存
消息的保存方式有disk和RAM两种;
disk:一种情况是发布消息时指明需要写入磁盘;二是当服务器内存紧张时会将部分内存中的消息转移到磁盘;
RAM:只在RAM中保存内部数据库表数据,不会保存消息,消息存储索引,队列索引和其他节点状态等数据;
3. 消息确认模式
生产者确认消息正确到达broker的方式:通过AMQP协议中的事务机制;把信道设置为确认模式;
事务机制需要生产者应用同步等待broker的执行结果,在性能上极大降低消息服务器的吞吐量;
确认模式会把信道上发布的消息都分配一个唯一ID,消息被成功投递后,信道会向生产者发布包括ID的确认消息,异步进行,对性能影响小;
4. 消费者应答
要求消费者在消费完消息后发送一个回执给RabbitMQ服务器;
自动回执:服务器成功发送消息给消费者后会立即把消息从队列中删除;
手动回执:等到消费者回送的确认消息(向Broker显式发送ACK)后才会删除,如果因为意外没有发送ACK,Broker会把消息转发给其他消费者,如果没有则缓存起来指定有新的消费者注册;
当消费者处理消息失败或当期不能处理消息时,可以给broker发送一个拒绝一条或多条消息的指令,要求broker将消息丢弃或重新放入队列中;
可通过设置预取数量(Prefetch Count)限制每个消费者在发送ACK回执前一次最多可以接收多少条消息;
原文:https://www.cnblogs.com/bbbbs/p/12540648.html