Appearance
SpringBoot中RabbitMQ可靠性保障
可靠性投递
什么是消息的可靠性投递?即保证消息百分百发送到消息队列中去,消息发送端需要接受到mq服务端接受到消息的确认应答。除此之外还应有完善的消息补偿机制,发送失败的消息可以再感知并二次处理。
生产者到交换机通过confirmCallback,交换机到队列通过returnCallback
可靠性投递confirmCallback
confirmCallback是生产者到交换机,可以理解为确认消息是否发送成功。新版依赖可靠性投递默认是关闭的,使用以下方法开启
properties
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated
properties
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
::
此时的配置文件
yaml
spring:
rabbitmq:
host: 111.5.111.111
port: 5672
virtual-host: /dev
password: password
username: admin
# 开启消息二次确认,生产者到broker的交换机
publisher-confirm-type: correlated
编码实实现confirmCallback
java
@Slf4j
@SpringBootTest
class RabbitmqDemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testConfirmCallback() {
/*
correlationData:配置
ack:交换机是否收到消息,true是成功,false是失败
cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("confirm==== ack={}", ack);
log.info("confirm==== cause={}", cause);
if (ack) {
log.info("发送成功,{}", cause);
} else {
log.error("发送失败,{}", cause);
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦!!");
}
}
可靠性投递returnCallback
returnCallback交换机到队列,消息从交换器发送到对应队列失败时触发。
第一步 开启returnCallback配置
propertiesspring.rabbitmq.publisher-returns=true #新版
第二步 修改交换机投递到队列失败的策略
properties#为true,则交换机处理消息到路由失败,则会返回给生产者 spring.rabbitmq.template.mandatory=true
完整配置参考
yaml
spring:
rabbitmq:
host: 111.5.111.111
port: 5672
virtual-host: /dev
password: password
username: admin
# 开启消息二次确认,生产者到broker的交换机
publisher-confirm-type: correlated
# 开启消息二次确认,交换机到队列的可靠性投递
publisher-returns: true
#为true,则交换机处理消息到路由失败,则会返回给生产者
template:
mandatory: true
编码实实现returnCallback
java
@Test
void testReturnCallback() {
// 为true,则交换机处理消息到路由失败,则会返回给生产者,开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
rabbitTemplate.setReturnsCallback(returnedMessage -> {
int code = returnedMessage.getReplyCode();
log.info("code={}", code);
log.info("returned={}", returnedMessage);
});
// 这个routingKey是不存在的,它找不到这个路由,所以会出现异常从而触发上面的回调方法
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xk857.order.new", "新订单来啦!!");
}
DANGER
开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议用消息确认机制
消息确认机制ACK
消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
- 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
- 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
- 消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked
我们也可以将其改成手工确认模式
yaml
spring:
rabbitmq:
#开启手动确认消息,如果消息重新入队,进行重试
listener:
simple:
acknowledge-mode: manual
重写之前的Handler
java
@RabbitHandler
public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+message.toString());
System.out.println("body="+body);
//成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除
//channel.basicAck(msgTag,false); // 正常返回ACK确认信息
//channel.basicNack(msgTag,false,true); // 告诉broker,消息拒绝确认,最后一个true代表返回队列,为False代表丢弃
}