application.yml的配置
spring:
application:
name: rabbitmq-provider
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirms: true #消息发送到Broker,是否确认
publisher-returns: true #消息未送达队列,是否回调
listener:
simple:
acknowledge-mode: manual #消费确认模式,手动MANUAL,自动AUTO,无NONE
logging:
level:
org.springframework.amqp.rabbit.core.RabbitTemplate: debug
org.springframework.amqp.rabbit.AsyncRabbitTemplate: debug
确认部分日志代码:
public class AckAsyncRabbitTemplate extends AsyncRabbitTemplate {
public AckAsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container) {
super(template, container);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息主体:{},应答码:{},应答信息:{},交换机:{},路由键:{}", new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送成功:{}", correlationData);
} else {
log.info("消息发送失败:{}", cause);
}
}
}
1.消息发送确认
发送成功时
消息发送成功:CorrelationData [id=5799031a-a0df-49cb-aadd-af0db28edbd8]
当页面删除exchange之后,消息发送失败
消息发送失败:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘exchangeAck‘ in vhost ‘/‘, class-id=60, method-id=40)
消息主体:avcs,应答码:312,应答信息:NO_ROUTE,交换机:exchangeAck,路由键:A
2.消息消费确认
@Slf4j
public class AckConsumer {
int index = 1;
@RabbitListener(queues = QUEUE_NAME)
public void consume(Message message, Channel channel) {
String s = new String(message.getBody());
log.info("msg-{}", s);
try {
if ("ack".equals(s)) {
//手动确认消息已消费,可以直接删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if ("requeue".equals(s) && index < 5) {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
//重新入队列,测试结果看到后面发送的消息会被先消费,所以应该是排到队列尾部的
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
index++;
} else {
//不重新入队列,直接删除
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
index = 0;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.确保消息顺序消费 - 仅代表个人理解
一个队列中的消息是顺序被消费的,那么我们只需保证是同一个消费者即可
具体做法:比如同一批业务数据的处理,加一个唯一的线索mqID,当mqID一致时,都发送到某一个队列中去
PS:如果开启了手动确认机制,在消费消息1时失败(如消费过程中消费者宕机,rabbit服务端检测到链接断开,会自动将消息重新入队列,放到队列的后面去),那此时消息2被消息也会出问题,这种情况考虑在业务上做处理(如状态不符合),依次将这批消息重新入队列,重新从头开始消费
原文:https://www.cnblogs.com/Hleaves/p/13691455.html