首页 > 其他 > 详细

RabbitMQ(七)——主题模式

时间:2020-04-21 17:44:29      阅读:56      评论:0      收藏:0      [点我收藏+]

RabbitMQ系列

RabbitMQ(一)——简介

RabbitMQ(二)——模式类型

RabbitMQ(三)——简单模式

RabbitMQ(四)——工作队列模式

RabbitMQ(五)——发布订阅模式

RabbitMQ(六)——路由模式

RabbitMQ(七)——主题模式

 

 

前言

   本章我们学习主题模式(Topic),主题模式与路由模式类似,不同的是路由模式生产者向指定路由发送消息,消费者接收指定路由的消息;主题模式生产者向交换机发送消息后,消费者可以对队列进行匹配,提供两个匹配字符,’ * ‘星号和 ‘ # ’井号,‘ * ’匹配一个词,‘ # ’ 匹配多个词,一般使用#号匹配多个,*号用的比较少。

  技术分享图片

  解释:生产者P向topic交换机发送消息,向指定routekey发布消息,消费者通过绑定routekey匹配词接收消息。

  如:生产者向routekey : topic.orange.rabbit 发送消息,消费者可通过绑定*.orange.* / *.*.rabbit接收到消息;

 

示例

  生产者:

技术分享图片技术分享图片
static void Main(string[] args)
        {
            Console.WriteLine("TopicServer发布服务器启动...");
            //创建连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //创建连接和通道
            using (var conn = factory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    //创建topic类型交换机
                    channel.ExchangeDeclare("topicExchange", "topic");

                    string msg = "";
                    //模拟向不同路由发送消息
                    for (int i = 0; i < 40; i++)
                    {
                        msg = $"发布消息{i}";
                        var body = Encoding.UTF8.GetBytes(msg);
                        string Topic = "";
                        if (i % 2 == 0)
                        {
                            Topic = "Topic.add";
                        }
                        else
                        {
                            Topic = "Topic.remove";
                        }
                        channel.BasicPublish("topicExchange", Topic, null, body);
                        Console.WriteLine($"{Topic}-发布消息成功:{msg}");

                        Thread.Sleep(1000);
                    }
                    Console.ReadKey();
                }
            }
        }
View Code

 

  消费者1:

技术分享图片技术分享图片
static void Main(string[] args)
        {
            Console.WriteLine("TopicClient接收客户端启动...");
            //创建连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //创建连接和通道
            using (var conn = factory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    //创建队列
                    var queue = channel.QueueDeclare().QueueName;
                    //绑定交换机,指定匹配路由Topic.#
                    channel.QueueBind(queue, "topicExchange", "Topic.#");
                    //创建消费者 可接收topic路由下全部消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        //接收消息
                        var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"Topic.#接收消息:{body.ToString()}");
                    };
                    channel.BasicConsume(queue, true, consumer);

                    Console.ReadKey();
                }
            }
        }
View Code

 

  消费者2:

 

技术分享图片技术分享图片
static void Main(string[] args)
        {
            Console.WriteLine("TopicClient接收客户端启动...");
            //创建连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //创建连接和通道
            using (var conn = factory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    //创建队列
                    var queue = channel.QueueDeclare().QueueName;
                    //绑定交换机 匹配路由Topic.add
                    channel.QueueBind(queue, "topicExchange", "Topic.add");
                    //创建消费者 只能接受到Topic.add路由消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        //接收消息
                        var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"Topic.add接收消息:{body.ToString()}");
                    };
                    channel.BasicConsume(queue, true, consumer);

                    Console.ReadKey();
                }
            }
        }
View Code

 

 

 

结果

  理想的结果应该是生产者分别向Topic.add与Topic.remove路由发送消息,消费者1匹配路由Topic.# ,可以接收到Topic路由下所有消息,而消费者2匹配的是Topic.add路由,能接受到生产者发送给Topic.add路由的消息;

 

技术分享图片

 

  与上面猜测一直、致,消费者1能接收到生产者topic路由下的所有消息,消费者2能接收到topic.add路由下的消息,这与消费者交换机绑定的路由有关。

 

总结

  1. 主题模式(Topic)是最常用的模式,相较于路由模式(Direct)更灵活、更方便、更强大。
  2. 生产者模式需要在声明交换机时指定类型为Topic。
  3. 消费者接收消息有两种模糊查询:# 匹配一个或多个词,* 匹配一个词,# 使用较多

 

附上Demo源码:https://github.com/1164887865/RabbitMQDemo

 

RabbitMQ(七)——主题模式

原文:https://www.cnblogs.com/zousc/p/12745160.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!