RabbitAdmin类用来管理RabbitMQ;
创建方法:
ConnectionFactory connectionFactory = new CachingConnectionFactory(); RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
CachingConnectionFactory是Spring AMQP下一个连接工厂,适合SpringBoot的深度整合的连接工厂;
从构造方法出发
org.springframework.amqp.rabbit.connection.CachingConnectionFactory#CachingConnectionFactory()
最终会调用这个有参构造方法
org.springframework.amqp.rabbit.connection.CachingConnectionFactory#CachingConnectionFactory(java.lang.String, int)
CachingConnectionFactory最终会调用RabbitMQ的原生API,对RabbitMQ Client包下的ConnectionFactory的包装;
对于下面的这个写法,这个ConnectionFactory是Spring AMQP包下的,它继承了AbstractConnectionFactory,而AbstractConnectionFactory实现了Spring AMQP包下的ConnectionFactory;如下图;
ConnectionFactory connectionFactory = new CachingConnectionFactory();
org.springframework.amqp.core.Exchange实现类,每种实现对应一种交换机类型,如下图;
org.springframework.amqp.core.Queue#Queue(java.lang.String) 对应的参数,如下图;
org.springframework.amqp.core.Binding#Binding 对应的参数,如下图;
org.springframework.amqp.rabbit.core.RabbitAdmin#declareExchange RabbitAdmin使用该方法将交换机进行绑定,使用了RabbitTemplate对RabbitMQ Client包进行了封装;
最终在org.springframework.amqp.rabbit.core.RabbitAdmin#declareExchanges会有调用RabbitMQ Client的原生API,如下图;
测试代码
@Slf4j @Configuration public class RabbitConfig { @PostConstruct public void initRabbit() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost("192.168.211.135"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/dev"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); Exchange exchange = new TopicExchange("exchange.order"); // 创建交换机 rabbitAdmin.declareExchange(exchange); Queue queue = new Queue("queue.order"); // 创建队列 rabbitAdmin.declareQueue(queue); Binding binding = new Binding( "queue.order", Binding.DestinationType.QUEUE, "exchange.order", "key.order", null ); // 新建绑定关系 rabbitAdmin.declareBinding(binding); } }
服务重启后,在RabbitMQ的管控台会出现新建的队列,交换机;
代码如下:
@Slf4j @Configuration public class RabbitConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost("192.168.211.135"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/dev"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); // connectionFactory.createConnection(); return connectionFactory; } @Bean public Exchange exchange() { Exchange exchange = new TopicExchange("exchange.order"); return exchange; } @Bean public Queue queue() { Queue queue = new Queue("queue.order"); return queue; } @Bean public Binding binding() { Binding binding = new Binding( "queue.order", Binding.DestinationType.QUEUE, "exchange.order", "key.order", null ); return binding; } @Bean public RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); // 自动创建打开 rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
不过@Bean这种注入方式有点懒加载的意思,就是当你注入的Bean没有被使用时,它是不会被加载的;上面配置的Bean如果没有在其他地方被使用,这些Bean是不会被加载;
org.springframework.amqp.rabbit.core.RabbitAdmin类中实现了ApplicationContextAware,InitializingBean接口;之前讲过ApplicationContextAware是可以获取Spring容器的上下文,从而可以注入别的组件注入到该对象;InitializingBean是Bean创建后进行初始化的操作,此时是Bean对象已经创建;
在RabbitAdmin实现的InitializingBean接口的afterPropertiesSet方法,有如下一个操作;
org.springframework.amqp.rabbit.connection.CachingConnectionFactory#addConnectionListener
在RabbitMQ进行连接的时候会回调添加的listener;
在RabbitAdmin实现的afterPropertiesSet方法,最终会调用一个initialize方法;
org.springframework.amqp.rabbit.core.RabbitAdmin#initialize
通过applicationContext获取到所有的Exchange,Queue,Binding类型的Bean;
最终会创建相应的 Exchange,Queue,Binding;
当ConnectionFactory执行createConnection,相应的 Exchange,Queue,Binding通过Spring容器被创建;
connectionFactory.createConnection();
原文:https://www.cnblogs.com/coder-zyc/p/14891769.html