Skip to content

SpringBoot中RabbitMQ可靠性保障

可靠性投递

什么是消息的可靠性投递?即保证消息百分百发送到消息队列中去,消息发送端需要接受到mq服务端接受到消息的确认应答。除此之外还应有完善的消息补偿机制,发送失败的消息可以再感知并二次处理。

生产者到交换机通过confirmCallback,交换机到队列通过returnCallback

可靠性投递confirmCallback

confirmCallback是生产者到交换机,可以理解为确认消息是否发送成功。新版依赖可靠性投递默认是关闭的,使用以下方法开启

properties
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated
properties
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true

::

此时的配置文件

yaml
spring:
  rabbitmq:
    host: 111.5.111.111
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    # 开启消息二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated

编码实实现confirmCallback

java
@Slf4j
@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testConfirmCallback() {
        /*
          correlationData:配置
          ack:交换机是否收到消息,true是成功,false是失败
          cause:失败的原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("confirm==== ack={}", ack);
            log.info("confirm==== cause={}", cause);
            if (ack) {
                log.info("发送成功,{}", cause);
            } else {
                log.error("发送失败,{}", cause);
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦!!");
    }
}

可靠性投递returnCallback

returnCallback交换机到队列,消息从交换器发送到对应队列失败时触发。

  • 第一步 开启returnCallback配置

    properties
    spring.rabbitmq.publisher-returns=true #新版
  • 第二步 修改交换机投递到队列失败的策略

    properties
    #为true,则交换机处理消息到路由失败,则会返回给生产者
    spring.rabbitmq.template.mandatory=true

完整配置参考

yaml
spring:
  rabbitmq:
    host: 111.5.111.111
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    # 开启消息二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated
    # 开启消息二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #为true,则交换机处理消息到路由失败,则会返回给生产者
    template:
      mandatory: true

编码实实现returnCallback

java
@Test
void testReturnCallback() {
    // 为true,则交换机处理消息到路由失败,则会返回给生产者,开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
    rabbitTemplate.setReturnsCallback(returnedMessage -> {
        int code = returnedMessage.getReplyCode();
        log.info("code={}", code);
        log.info("returned={}", returnedMessage);
    });
    // 这个routingKey是不存在的,它找不到这个路由,所以会出现异常从而触发上面的回调方法
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xk857.order.new", "新订单来啦!!");
}

DANGER

开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议用消息确认机制

消息确认机制ACK

消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除

  • 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
  • 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
  • 消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked

我们也可以将其改成手工确认模式

yaml
spring:
  rabbitmq:
    #开启手动确认消息,如果消息重新入队,进行重试
    listener:
      simple:
        acknowledge-mode: manual

重写之前的Handler

java
@RabbitHandler
public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {
    long msgTag = message.getMessageProperties().getDeliveryTag();
    System.out.println("msgTag="+msgTag);
    System.out.println("message="+message.toString());
    System.out.println("body="+body);

    //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除
    //channel.basicAck(msgTag,false); // 正常返回ACK确认信息
    //channel.basicNack(msgTag,false,true); // 告诉broker,消息拒绝确认,最后一个true代表返回队列,为False代表丢弃
}