首页 > 编程语言 > 详细

RabbitMQ - 01Spring整合

时间:2021-01-10 15:25:33      阅读:25      评论:0      收藏:0      [点我收藏+]

RabbitMQ - 01Spring整合

(1)项目框架

(1.1)POM文件(与Spring整合)

  <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.3.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.11.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.3</version>
        </dependency>

(1.2)工具类

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.100.86");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/test");
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

 

(2)RabbitMQ工作方式

(2.1)简单模式

                       技术分享图片

发送者

public class Sender {
    private final static String QUEUE = "testHello";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        // 参数1: 队列名称
        // 参数2: 是否持久化队列;队列实在内存中的,rabbitMQ重启会丢失,设置为true,会保存到erlang自带的数据库,
        // 参数3: 是否排外,两个作用,一是当我们的连接关闭后是否自动删除队列,二是是否私有化当前队列,私有化后,其他通道不能访问
        // 参数4: 是否自动删除
        // 参数5: 其他参数。
        channel.queueDeclare(QUEUE, durable:false, exclusive:false, autoDelete:false, arguments:null);
        // 发送内容
        channel.basicPublish("",QUEUE,null,"发送消息1".getBytes());
        // 关闭连接
        channel.close();
        connection.close();
    }
}

消费者

public class Receiver {
    private final static String QUEUE = "testHello";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE, false, false, false, null);
        // 接收消息
        while (true){
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Receiver message :" + message);
            };
            channel.basicConsume(QUEUE,true,deliverCallback, consumerTag ->{});
        }
        // 关闭连接
//        channel.close();
//        connection.close();
    }
}

 

测试结果 

技术分享图片

(2.2) Worker 队列        Distributing tasks among workers (the competing consumers pattern)

                                            技术分享图片        技术分享图片

生产者

public class WorkSender {
    private final static String QUEUE = "testWork";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        // 发送内容
        for(int i = 0; i< 10; i++){
            channel.basicPublish("",QUEUE,null,("发送消息" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者

public class WorkReceiver1 {
    private final static String QUEUE = "testWork";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        // 告诉服务器,在没有确认当前消息完成之前,不要发送新的消息
        channel.basicQos(1);
        // 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 收到的内容是:" + new String(body));
                try {
                    Thread.sleep(10);    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 注册消费者, 参数2:手动确认,代表收到消息后需要手动告诉服务器,收到消息了。
        channel.basicConsume(QUEUE,false,consumer);
    }
}

测试结果: 消费者1 和 消费者2 sleep 时间一样

                         技术分享图片         技术分享图片

测试结果: 消费者1 sleep(10)    消费者2 sleep(40) 

                           技术分享图片     技术分享图片

(2.3)Publish/Subscribe(交换机:Fanout) Sending messages to many consumers at once

                                                   技术分享图片 

生产者

public class FanoutSender {
    private final static String EXCHANGE_NAME = "testExchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机, 类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 发布订阅模式,消息先发到交换机,交换机没有保存功能,如果没有消费者,消息会丢失
        channel.basicPublish(EXCHANGE_NAME,"",null,"Fanout发布订阅模式的消息1".getBytes());
        // 关闭连接
        channel.close();
        connection.close();
    }
}

消费者

public class FanoutReceiver1 {
    private final static String EXCHANGE_NAME = "testExchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("testPubQueue1", false, false, false, null);
        // 绑定队列到交换机
        channel.queueBind("testPubQueue1", EXCHANGE_NAME, routingKey:"");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Fanout消费者1 收到的内容是:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testPubQueue1",false,consumer);
    }
}

测试结果: 

                    技术分享图片                     技术分享图片

Web端查看绑定关系:

技术分享图片

(2.4) Routing(路由模式,type=direct)   Receiving messages selectively

                                    技术分享图片

生产者

public class RouteSender {
    private final static String EXCHANGE_NAME = "testRoute";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机, 类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 路由模式
        channel.basicPublish(EXCHANGE_NAME,"key1",null,"路由模式的消息1".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"key2",null,"路由模式的消息2".getBytes());
        // 关闭连接
        channel.close();
        connection.close();
    }
}

消费者

public class RouteReceiver1 {
    private final static String EXCHANGE_NAME = "testRoute";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("testRouteQueue1", false, false, false, null);
        // 绑定队列到交换机
        // 参数3: 标记,绑定到交换机时指定一个标记,只有和它一样标记的消息才会到这个队列
        channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key1");
        // 如果要接收多个标记,只需要再执行一次即可
        channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key2");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 收到的内容是:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testRouteQueue1",false,consumer);
    }
}

测试结果:

                                       技术分享图片             技术分享图片

(2.5) Topics    Receiving messages based on a pattern (topics)

                                                                技术分享图片

生产者

public class TopicSender {
    private final static String EXCHANGE_NAME = "testTopic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.basicPublish(EXCHANGE_NAME,"key.1",null,"Topic 模式的消息 : key.1".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"key.1.2",null,"Topic 模式的消息 : key.1.2".getBytes());
        channel.close();
        connection.close();
    }
}

消费者

public class TopicReceiver1 {
    private final static String EXCHANGE_NAME = "testTopic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testTopicQueue1", false, false, false, null);
        channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "key.*");
// channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "key.#"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1 收到的内容是:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testTopicQueue1",false,consumer); } }

测试结果: 

                       技术分享图片      技术分享图片

 

RabbitMQ - 01Spring整合

原文:https://www.cnblogs.com/kingdomer/p/14238305.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!