rabbitmq安装延时插件 rabbitmq_delayed_message_exchange
1.到官网https://www.rabbitmq.com/community-plugins.html,下载对应版本的rabbitmq_delayed_message_exchange

2.将插件拷贝到rabbitmq的plugins目录下,我本地使用的docker启动的rabbitmq服务,使用命令 docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 容器ID:/plugins
3.进入容器内部,docker exec -it 5af /bin/bash, 进入plugins目录,查看是否拷贝成功 cd /plugins

4.启用延时插件,执行命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

5.打开web页面的Exchange模块,可以看见多了一种类型(PS:如果没有看见,可以重启一下服务)

6.测试
import org.springframework.amqp.core.*;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQExchangeConfig {
public static final String CUSTOM_EXCHANGE_NAME = "custom-Exchange";
public static final String QUEUEA_NAME = "queueA";
public static final String QUEUEB_NAME = "queueB";
public static final String ROUTING_KEY_A_NAME = "routingKeyA";
public static final String ROUTING_KEY_B_NAME = "routingKeyB";
@Bean
CustomExchange customExchange() {
Map<String, Object> args = new HashMap<String, Object>();
//路由策略,必填项,参考ExchangeTypes
args.put("x-delayed-type", "direct");
return new CustomExchange(CUSTOM_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
Queue queueA() {
return new Queue(QUEUEA_NAME);
}
@Bean
Queue queueB() {
return new Queue(QUEUEB_NAME);
}
@Bean
Binding bindingAC(Queue queueA, CustomExchange customExchange) {
return BindingBuilder.bind(queueA).to(customExchange).with(ROUTING_KEY_A_NAME).noargs();
}
@Bean
Binding bindingBC(Queue queueB, CustomExchange customExchange) {
return BindingBuilder.bind(queueB).to(customExchange).with(ROUTING_KEY_B_NAME).noargs();
}
//先初始化队列
@Bean
@ConditionalOnBean(Queue.class)
MQExchangeConsumer mqExchangeConsumer() {
return new MQExchangeConsumer();
}
}
//发送消息,注意:延时时间设置:Delay > 0, Delay =< ?ERL_MAX_T (In Erlang a timer can be set up to (2^32)-1 milliseconds in the future).
rabbitTemplate.convertSendAndReceive(CUSTOM_EXCHANGE_NAME, ROUTING_KEY_A_NAME, msg + "a", message -> {
// 本质还是设置header的x-delay=10000,可以参考日志信息
message.getMessageProperties().setDelay(10000);
return message;
});
rabbitTemplate.convertSendAndReceive(CUSTOM_EXCHANGE_NAME, ROUTING_KEY_B_NAME, msg + "b", message -> {
message.getMessageProperties().setHeader("x-delay", 20000);
return message;
});
//结果

7.停用延时插件
执行命令:rabbitmq-plugins disable rabbitmq_delayed_message_exchange。
注:停用后延时未分发的消息将会丢失
8.其他
消息分发前是存储在节点下的Mnesia table中,通过计时器调度实现分发,官网写到:这个插件的设计并不适合大量延迟消息的情况(例如100s数千条或数百万条)。因为随着mnesia数据库的增长,延迟消息的延时时间变得难以控制,就很难达到预期的效果
原文:https://www.cnblogs.com/Hleaves/p/13594278.html