目录
RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用 Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的
RabbitMQ的优点:
RabbitMQ的整体架构:
?
RabbitMQ的消息流转:
?
?
AMQP全称: Advanced Message Queuing Protocol
AMQP翻译: 高级消息队列协议
AMQP定义: 是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
?
?
AMQP核心概念:
?
?
后台启动: ./rabbitmq start &
关闭: ./rabbitmqctl stop
节点状态: ./rabbitmqctl status
管控台: http://ip:15672
?
?
RabbitMQ生产消费快速入门:
环境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依赖配置)
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
</dependencies>
?
public class Procuder {
public static void main(String[] args) throws Exception {
//1.创建一个ConnectionFactory 并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过Connection 创建一个 Channel
Channel channel = connection.createChannel();
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* exchange:指定交换机 不指定 则默认 (AMQP default交换机) 通过routingkey进行匹配
* props 消息属性
* body 消息体
*/
//4.通过Channel发送数据
for(int i = 0; i < 5; i++){
System.out.println("生产消息:" + i);
String msg = "Hello RabbitMQ" + i;
channel.basicPublish("", "test", null, msg.getBytes());
}
//5.记得关闭相关的连接
channel.close();
connection.close();
}
}
?
public class Consumer {
public static void main(String[] args) throws Exception{
//1.创建一个ConnectionFactory 并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过Connection 创建一个 Channel
Channel channel = connection.createChannel();
//4. 声明创建一个队列
String queueName = "test";
/**
* durable 是否持久化
* exclusive 独占的 相当于加了一把锁
*/
channel.queueDeclare(queueName,true,false,false,null);
//5.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6.设置channel
/**
* ACK: 当一条消息从生产端发到消费端,消费端接收到消息后会马上回送一个ACK信息给broker,告诉它这条消息收到了
* autoack:
* true 自动签收 当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
* false 手动签收 当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,RabbitMQ可以从队列中删除该消息了
*
*/
channel.basicConsume(queueName, true, queueingConsumer);
//7.获取消息
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端:" + msg);
//Envelope envelope = delivery.getEnvelope();
}
}
}
?
Exchange: 接收消息,并根据路由键转发消息所绑定的队列
?
交换机属性:
?
所有发送到Direct Exchange的消息被转发到RouteKey指定的Queue
注意:Direct模式可以使用RabbitMQ自带的Exchange: default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RoutingKey必须完全匹配才会被队列接收,否则该消息会被抛弃
?
public class ProducerDirectExchange {
public static void main(String[] args) throws Exception {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.创建Connection
Connection connection = connectionFactory.newConnection();
//3.创建Channel
Channel channel = connection.createChannel();
//4.声明
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
//5.发送
String msg = "Hello World RabbitMQ4 Direct Exchange Message";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
?
public class ConsumerDirectExchange {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//表示声明了一个交换机
channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
//表示声明了一个队列
channel.queueDeclare(queueName,false,false,false,null);
//建立一个绑定关系
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称,是否自动ACK,Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
?
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic
注意:可以使用通配符进行匹配
符号 # 匹配一个或多个词
符号 * 匹配不多不少一个词
例如: "log.#" 能够匹配到 “log.info.oa”
? "log.*" 只会匹配到 "log.err"
public class ProducerTopicExchange {
public static void main(String[] args) throws Exception {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2.创建Connection
Connection connection = connectionFactory.newConnection();
//3.创建Channel
Channel channel = connection.createChannel();
//4.声明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5.发送
String msg = "Hello World RabbitMQ4 Direct Exchange Message";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
}
}
?
public class ConsumerTopicExchange {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.#";
//表示声明了一个交换机
channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
//表示声明了一个队列
channel.queueDeclare(queueName,false,false,false,null);
//建立一个绑定关系
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称,是否自动ACK,Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
?
不处理路由键,只需要简单的将队列绑定到交换机上
发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
所以Fanout交换机转发消息是最快的
?
public class ProducerFanoutExchange {
public static void main(String[] args) throws Exception {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2.创建Connection
Connection connection = connectionFactory.newConnection();
//3.创建Channel
Channel channel = connection.createChannel();
//4.声明
String exchangeName = "test_fanout_exchange";
//5.发送
for(int i = 0; i < 10 ; i++){
String msg = "Hello World RabbitMQ4 Direct Exchange Message";
channel.basicPublish(exchangeName, "", null, msg.getBytes());
}
channel.close();
connection.close();
}
}
?
public class ConsumerFanoutExchange {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_topic_queue";
//无需指定路由key
String routingKey = "";
//表示声明了一个交换机
channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
//表示声明了一个队列
channel.queueDeclare(queueName,false,false,false,null);
//建立一个绑定关系
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称,是否自动ACK,Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
?
服务器与应用程序之间传递的数据,本质上就是一段数据,由Properties和Body组成
常用属性:delivery mode、headers (自定义属性)
其他属性:content_type、content_encoding、priority、expiration
消息的properties属性用法示例:
public class Procuder {
public static void main(String[] args) throws Exception {
//1.创建一个ConnectionFactory 并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过Connection 创建一个 Channel
Channel channel = connection.createChannel();
Map<String,Object> headers = new HashMap<>();
headers.put("my1", "111");
headers.put("my2", "222");
//10秒不消费 消息过期移除消息队列
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("utf-8")
.expiration("10000")
.headers(headers)
.build();
//4.通过Channel发送数据
for(int i = 0; i < 5; i++){
System.out.println("生产消息:" + i);
String msg = "Hello RabbitMQ" + i;
channel.basicPublish("", "test", properties, msg.getBytes());
}
//5.记得关闭相关的连接
channel.close();
connection.close();
}
}
?
public class Consumer {
public static void main(String[] args) throws Exception{
//1.创建一个ConnectionFactory 并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过Connection 创建一个 Channel
Channel channel = connection.createChannel();
//4. 声明创建一个队列
String queueName = "test";
channel.queueDeclare(queueName,true,false,false,null);
//5.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6.设置channel
channel.basicConsume(queueName, true, queueingConsumer);
//7.获取消息
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端:" + msg);
Map<String, Object> headers = delivery.getProperties().getHeaders();
System.err.println("headers value:" + headers.get("my1"));
}
}
}
原文:https://www.cnblogs.com/dwlovelife/p/10982735.html