<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>2.3.4.RELEASE</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.11.3</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.11.3</version> </dependency>
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.100.86"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/test"); Connection connection = connectionFactory.newConnection(); return connection; } }
发送者
public class Sender { private final static String QUEUE = "testHello"; public static void main(String[] args) throws IOException, TimeoutException { // 获取连接 Connection connection = ConnectionUtil.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 声明队列 // 参数1: 队列名称 // 参数2: 是否持久化队列;队列实在内存中的,rabbitMQ重启会丢失,设置为true,会保存到erlang自带的数据库, // 参数3: 是否排外,两个作用,一是当我们的连接关闭后是否自动删除队列,二是是否私有化当前队列,私有化后,其他通道不能访问 // 参数4: 是否自动删除 // 参数5: 其他参数。 channel.queueDeclare(QUEUE, durable:false, exclusive:false, autoDelete:false, arguments:null); // 发送内容 channel.basicPublish("",QUEUE,null,"发送消息1".getBytes()); // 关闭连接 channel.close(); connection.close(); } }
消费者
public class Receiver { private final static String QUEUE = "testHello"; public static void main(String[] args) throws IOException, TimeoutException { // 获取连接 Connection connection = ConnectionUtil.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE, false, false, false, null); // 接收消息 while (true){ DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Receiver message :" + message); }; channel.basicConsume(QUEUE,true,deliverCallback, consumerTag ->{}); } // 关闭连接 // channel.close(); // connection.close(); } }
测试结果
生产者
public class WorkSender { private final static String QUEUE = "testWork"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); // 发送内容 for(int i = 0; i< 10; i++){ channel.basicPublish("",QUEUE,null,("发送消息" + i).getBytes()); } channel.close(); connection.close(); } }
消费者
public class WorkReceiver1 { private final static String QUEUE = "testWork"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); // 告诉服务器,在没有确认当前消息完成之前,不要发送新的消息 channel.basicQos(1); // 接收消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1 收到的内容是:" + new String(body)); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } // 确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; // 注册消费者, 参数2:手动确认,代表收到消息后需要手动告诉服务器,收到消息了。 channel.basicConsume(QUEUE,false,consumer); } }
测试结果: 消费者1 和 消费者2 sleep 时间一样
测试结果: 消费者1 sleep(10) 消费者2 sleep(40)
生产者
public class FanoutSender { private final static String EXCHANGE_NAME = "testExchange"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明交换机, 类型为fanout channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 发布订阅模式,消息先发到交换机,交换机没有保存功能,如果没有消费者,消息会丢失 channel.basicPublish(EXCHANGE_NAME,"",null,"Fanout发布订阅模式的消息1".getBytes()); // 关闭连接 channel.close(); connection.close(); } }
消费者
public class FanoutReceiver1 { private final static String EXCHANGE_NAME = "testExchange"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("testPubQueue1", false, false, false, null); // 绑定队列到交换机 channel.queueBind("testPubQueue1", EXCHANGE_NAME, routingKey:""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Fanout消费者1 收到的内容是:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testPubQueue1",false,consumer); } }
测试结果:
Web端查看绑定关系:
生产者
public class RouteSender { private final static String EXCHANGE_NAME = "testRoute"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明交换机, 类型为fanout channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 路由模式 channel.basicPublish(EXCHANGE_NAME,"key1",null,"路由模式的消息1".getBytes()); channel.basicPublish(EXCHANGE_NAME,"key2",null,"路由模式的消息2".getBytes()); // 关闭连接 channel.close(); connection.close(); } }
消费者
public class RouteReceiver1 { private final static String EXCHANGE_NAME = "testRoute"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("testRouteQueue1", false, false, false, null); // 绑定队列到交换机 // 参数3: 标记,绑定到交换机时指定一个标记,只有和它一样标记的消息才会到这个队列 channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key1"); // 如果要接收多个标记,只需要再执行一次即可 channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key2"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1 收到的内容是:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testRouteQueue1",false,consumer); } }
测试结果:
生产者
public class TopicSender { private final static String EXCHANGE_NAME = "testTopic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.basicPublish(EXCHANGE_NAME,"key.1",null,"Topic 模式的消息 : key.1".getBytes()); channel.basicPublish(EXCHANGE_NAME,"key.1.2",null,"Topic 模式的消息 : key.1.2".getBytes()); channel.close(); connection.close(); } }
消费者
public class TopicReceiver1 { private final static String EXCHANGE_NAME = "testTopic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testTopicQueue1", false, false, false, null); channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "key.*");
// channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "key.#"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1 收到的内容是:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testTopicQueue1",false,consumer); } }
测试结果:
原文:https://www.cnblogs.com/kingdomer/p/14238305.html