制作人:全心全意
MQ全称为Message Queue(消息队列),是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。消息队列指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。
RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。遵循Mozilla Public License开源协议。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品、不同的开发语言等条件的限制。
RabbitMQ基于Erlang实现
RabbitMQ优点
性能很好,延时低
吞吐量到万级,功能完备
有良好的的管理界面和管理工具
社区相对比较活跃
RabbitMQ缺点
吞吐量相对低
#1.安装erlang solution
erlang官网:http://erlang.org
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
yum -y install epel-release
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
yum -y install erlang
#2.安装rabbit
yum -y install rabbitmq-server #出错
#=====源码安装
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.8/rabbitmq-server-generic-unix-3.7.8.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.7.8.tar.xz -C /usr/local/
vim /etc/profile
source /etc/profile
rabbitmq-plugins enable rabbitmq_management
rabbitmq-server -detached #后台启动
#3.开启rabbitmq的远程访问
vi /etc/rabbitmq/rabbitmq.config
[{rabbit,[{loopback_users,[]}]}] //添加该行,新版本不需要,直接进行第4部
#4.开启web管理端访问(非必须,需先开启允许远程访问)
rabbitmq-plugins enable rabbitmq_management
#5.安装消息延迟插件
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.3.5/plugins
wget https://dl.bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange-0.0.1.ez
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#6.放行端口或关闭防火墙
firewalld-cmd --add-port=15672/tcp --permanent
firewalld-cmd --add-port=5672/tcp --permanent
#7.启动
service rabbitmq-server start
service rabbitmq-server restart
地址:http://IP:15672
默认用户名密码:guest
超级管理员:administrator,可以登录控制台,查看所有信息,可以对用户和策略进行操作。
监控者:monitoring,可以登录控制台,可以查看节点的相关信息,比如进程数、内存和磁盘的使用情况。
策略制定者:policymaker,可以登录控制台,指定策略,但是无法查看节点信息。
普通管理员:management,仅能登录控制台。
其他:无法登录控制台,一般指提供者和消费者。
添加账号命令:(还可通过web页面进行管理)
rabbitmqctl adduser zhangsan mima123 #添加zhangsan用户,密码为mima123
rabbitmqctl set_user_tags zhangsan administrator #设置zhangsan用户为administrator级别
网址:https://www.rabbitmq.com/getstarted.html
“Hello World!”:一个生产者对应一个消费者
Work queues:一个生产者对应多个消费者
Publish/Subscribe:引入交换机概念,生产者将消息发入交换机,由交换机分配到不同队列,不同队列对应不同消费者,两个消费者都能接收到消息(交换机类型:fanout)
Routing:根据关键字(单词)发送到指定队列(交换机类型:direct)
Topics:根据关键字(单词)发送到指定队列,允许使用通配符*和#号(*:任意一个单词,#:任意多个单词,关键词包含多个单词使用.分隔,例如:www.zhang)(交换机类型:topic)
RPC:同步请求(有一些情况需要等待返回,不能异步),生产者发送消息指定唯一ID和返回结果的消息队列,生产者从返回消息队列中取得返回结果。
工具类:
package zq5;
import org.apache.log4j.Logger;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
private static Logger logger = Logger.getLogger(ConnectionUtil.class);
public static Connection getConnection() {
try {
Connection connection = null;
// 定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务端地址(域名地址/ip)
factory.setHost("172.16.1.12");
// 设置服务器端口号
factory.setPort(5673);
// 设置虚拟主机(相当于数据库中的库)
factory.setVirtualHost("/zhangsan");
// 设置用户名
factory.setUsername("zhangsan");
// 设置密码
factory.setPassword("123456");
connection = factory.newConnection();
return connection;
} catch (Exception e) {
return null;
}
}
}
Send类
package zq;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "queue_hello";
public static void main(String[] args) throws Exception {
// 获取链接
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
Channel channel = connection.createChannel();
/**
* 1:队列名称 2:是否持久化 3:是否独占模式 4:是否自动删除队列中的消息 5:其它额外参数
*/
// 创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
/**
* 1:交换机名 2:队列名 3:BasicProperties 4:消息字节数组
*/
// 发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送:‘" + message + "‘");
// 关闭链接
channel.close();
connection.close();
}
}
Recv类:
package zq;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv {
private final static String QUEUE_NAME = "queue_hello";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTage, Envelope envelope, BasicProperties properties,
byte[] body) {
String mess = new String(body);
System.out.println("接收到的消息为:" + mess);
}
};
// 创建一个消费者(监听器)
// 2:是否确认收到消息,手动确认需要加代码
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
Send类:
package zq1;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws Exception {
// 获取链接
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
Channel channel = connection.createChannel();
/**
* 1:队列名称 2:是否持久化 3:是否独占模式 4:是否自动删除队列中的消息 5:其它额外参数
*/
// 创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 20; i++) {
String message = i + "queue_work";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送:‘" + message + "‘");
}
/**
* 1:交换机名 2:队列名 3:BasicProperties 4:消息字节数组
*/
// 发送消息
// 关闭链接
channel.close();
connection.close();
}
}
Recv类:
package zq1;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTage, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException{
String mess = new String(body);
System.out.println("1接收到的消息为:" + mess);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 创建一个消费者(监听器)
// 2:是否确认收到消息,手动确认需要加代码
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
Recv1类:
package zq1;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import zq2.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv1 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
final Channel channel = connection.createChannel();
// 消费者预取的消费数量
channel.basicQos(1); // 没处理完1条消息,不会接收新的消息
// (不使用默认是轮循,使用此项会根据机器处理速度来分配)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTage, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String mess = new String(body);
System.out.println("2接收到的消息为:" + mess);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 手动返回回执确认
/**
* 1:回执确认消息的编号 2:是否批量确认,一般为false,不批量确认
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 创建一个消费者(监听器)
// 2:是否确认收到消息,手动确认需要加代码
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
Send类:
package zq2;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws Exception {
// 获取链接
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
Channel channel = connection.createChannel();
// 声明exchange(交换机存在直接使用,不存在会创建)
// 注意:在消费者没有启动的情况下,发送的消息会丢失
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 交换机类型 fanout
*/
// 消息内容
String message = "Publish/Subscrib";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("发送:" + message);
/**
* 1:交换机名 2:队列名 3:BasicProperties 4:消息字节数组
*/
// 关闭链接
channel.close();
connection.close();
}
}
Recv类:
package zq2;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv {
private final static String QUEUE_NAME = "queue_fanout1";
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 声明消费者预取的消息数量
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTage, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String mess = new String(body);
System.out.println("1接收到的消息为:" + mess);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 创建一个消费者(监听器)
// 2:是否确认收到消息,手动确认需要加代码
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
Recv1类:
package zq2;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import zq1.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv1 {
private final static String QUEUE_NAME = "queue_fanout2";
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 消费者预取的消费数量
channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTage, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String mess = new String(body);
System.out.println("2接收到的消息为:" + mess);
// 手动返回回执确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 创建一个消费者(监听器)
// 2:是否确认收到消息,手动确认需要加代码
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
Send类:
package zq3;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws Exception {
// 获取链接
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
Channel channel = connection.createChannel();
// 声明exchange(交换机存在直接使用,不存在会创建)
// 注意:在消费者没有启动的情况下,发送的消息会丢失
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
/**
* 交换机类型 direct
*/
// 消息内容
// info和error为关键字,关键字可自定义
String message = "Routing";
channel.basicPublish(EXCHANGE_NAME, "zhangsan", null, message.getBytes());
System.out.println("发送:" + message);
/**
* 1:交换机名 2:队列名 3:BasicProperties 4:消息字节数组
*/
// 关闭链接
channel.close();
connection.close();
}
}
Recv类:
package zq3;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv {
private final static String QUEUE_NAME = "queue_direct1";
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
// 声明消费者预取的消息数量
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTage, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String mess = new String(body);
System.out.println("1接收到的消息为:" + mess);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 创建一个消费者(监听器)
// 2:是否确认收到消息,手动确认需要加代码
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
Recv1类:
package zq3;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import zq1.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv1 {
private final static String QUEUE_NAME = "queue_direct2";
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "zhangsan");
// 消费者预取的消费数量
channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTage, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String mess = new String(body);
System.out.println("2接收到的消息为:" + mess);
// 手动返回回执确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 创建一个消费者(监听器)
// 2:是否确认收到消息,手动确认需要加代码
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
Send类:
package zq4;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws Exception {
// 获取链接
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
Channel channel = connection.createChannel();
// 声明exchange(交换机存在直接使用,不存在会创建)
// 注意:在消费者没有启动的情况下,发送的消息会丢失
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/**
* 交换机类型 direct
*/
// 消息内容
// info和error为关键字,关键字可自定义
String message = "Routing";
channel.basicPublish(EXCHANGE_NAME, "zhangsan", null, message.getBytes());
System.out.println("发送:" + message);
/**
* 1:交换机名 2:队列名 3:BasicProperties 4:消息字节数组
*/
// 关闭链接
channel.close();
connection.close();
}
}
Recv类:
package zq4;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv {
private final static String QUEUE_NAME = "queue_direct1";
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
// 声明消费者预取的消息数量
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTage, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String mess = new String(body);
System.out.println("1接收到的消息为:" + mess);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 创建一个消费者(监听器)
// 2:是否确认收到消息,手动确认需要加代码
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
Recv1类:
package zq4;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import zq1.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv1 {
private final static String QUEUE_NAME = "queue_direct2";
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从链接中创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "zhangsan");
// 消费者预取的消费数量
channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTage, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String mess = new String(body);
System.out.println("2接收到的消息为:" + mess);
// 手动返回回执确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 创建一个消费者(监听器)
// 2:是否确认收到消息,手动确认需要加代码
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
RpcServer类:
package zq5;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class RpcServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) {
Connection connection = null;
try {
connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 创建队列
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 消费者监听队列
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
while (true) {
// 接收并处理消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("服务端接收:" + message);
// 确认收到消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 取出消息的correlationld
AMQP.BasicProperties properties = delivery.getProperties();
String correlationld = properties.getCorrelationId();
// 创建具有与接收消息相同的correlationld的消息属性
AMQP.BasicProperties replyProperties = new AMQP.BasicProperties().builder().correlationId(correlationld)
.build();
// properties.getReplyTo():获取回调队列名
channel.basicPublish("", properties.getReplyTo(), replyProperties, "1111111".getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
RpcClient类:
package zq5;
import java.util.UUID;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class RpcClient {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = ConnectionUtil.getConnection();
channel = connection.createChannel();
// 创建回调队列
String callbackQueue = channel.queueDeclare().getQueue();
// 消费者从回调队列中接收服务端传送的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(callbackQueue, true, consumer);
// 创建带有correlationld的消息属性
String correlationld = UUID.randomUUID().toString();
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(correlationld)
.replyTo(callbackQueue).build();
String message = "hello rabbitmq rpc";
channel.basicPublish("", RPC_QUEUE_NAME, basicProperties, message.getBytes());
System.out.println("客户端发送的消息:" + message + ",correaltionld=" + correlationld);
// 接收回调消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String receivedCorrelationld = delivery.getProperties().getCorrelationId();
if (correlationld.equals(receivedCorrelationld)) {
System.out.println(
"客户端接收的回调消息:" + new String(delivery.getBody()) + ",conrrealtionld=" + correlationld);
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
原文:https://www.cnblogs.com/zhangquan-yw/p/14626463.html