Skip to content

RabbitMQ消息可靠性保障

首先抛出两个问题:

  1. 生产者的消息有没有发送成功,是否正常路由给消费者了;
  2. 消费者收到消息,是否正确处理完毕,如果只处理了一半怎么办?

生产者可靠性保障

失败通知

在 RabbitMQ 中,生产者通过 Exchange 和 RoutingKey 发送消息,消费者监听队列进行消费。若 Exchange 不存在或 RoutingKey 无法路由,消息将无法投递。默认情况下,生产者无法感知消息是否成功到达。为确保可靠性,可通过设置 mandatory 标志启用失败通知,当消息无法投递时,RabbitMQ 会将其返回给生产者。

java
@Slf4j
public class DirectTest {

    // 生产者绑定交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange";
    // 消息发送次数
    private static final int SEND_NUM = 10;
    // 队列名称
    private static final String QUEUE_NAME = "direct_queue";

    @Test
    public void send() throws IOException, TimeoutException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 处理发送失败的消息
        channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
            String message = new String(body, StandardCharsets.UTF_8);
            log.error("消息发送失败:路由键={}, message={}", routingKey, message);
        });

        String[] keys = new String[]{"key1", "key2"};

        for (int i = 0; i < SEND_NUM; i++) {
            String key = keys[i % keys.length];
            String message = "发送RabbitMQ消息" + i;
          	// 注意第三个属性mandatory要设置成true
            channel.basicPublish(EXCHANGE_NAME, key, true, null, message.getBytes(StandardCharsets.UTF_8));
            log.info("sendMessage:{}==={}", key, message);
        }
        
        channel.waitForConfirms();
        channel.close();
        connection.close();
    }
}
java
@Slf4j
public class DirectTest {

    // 生产者绑定交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange";
    // 队列名称
    private static final String QUEUE_NAME = "direct_queue";
  
    @Test
    public void consumer1() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 交换器和队列绑定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
        // 绑定多个key
        // channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, "key3");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        //消费者在指定的对队列上消费
        channel.basicConsume(QUEUE_NAME, true, consumer);
        // 保持主线程不退出
        Thread.sleep(100000);
    }
}

控制台日志如下:

shell
16:21:30.592 [main] INFO DirectTest - sendMessage:key1===发送RabbitMQ消息0
16:21:30.593 [main] INFO DirectTest - sendMessage:key2===发送RabbitMQ消息1
16:21:30.593 [main] INFO DirectTest - sendMessage:key1===发送RabbitMQ消息2
16:21:30.593 [main] INFO DirectTest - sendMessage:key2===发送RabbitMQ消息3
16:21:30.593 [main] INFO DirectTest - sendMessage:key1===发送RabbitMQ消息4
16:21:30.593 [main] INFO DirectTest - sendMessage:key2===发送RabbitMQ消息5
16:21:30.593 [main] INFO DirectTest - sendMessage:key1===发送RabbitMQ消息6
16:21:30.593 [main] INFO DirectTest - sendMessage:key2===发送RabbitMQ消息7
16:21:30.593 [main] INFO DirectTest - sendMessage:key1===发送RabbitMQ消息8
16:21:30.593 [main] INFO DirectTest - sendMessage:key2===发送RabbitMQ消息9
16:21:30.593 [AMQP Connection 127.0.0.1:5672] ERROR DirectTest - 消息发送失败:路由键=key2, message=发送RabbitMQ消息1
16:21:30.593 [AMQP Connection 127.0.0.1:5672] ERROR DirectTest - 消息发送失败:路由键=key2, message=发送RabbitMQ消息3
16:21:30.593 [AMQP Connection 127.0.0.1:5672] ERROR DirectTest - 消息发送失败:路由键=key2, message=发送RabbitMQ消息5
16:21:30.594 [AMQP Connection 127.0.0.1:5672] ERROR DirectTest - 消息发送失败:路由键=key2, message=发送RabbitMQ消息7
shell
16:21:37.017 [main] INFO DirectTest - 等待 message.....
16:21:37.022 [pool-1-thread-5] INFO DirectTest - Received:key1========发送RabbitMQ消息0
16:21:37.022 [pool-1-thread-5] INFO DirectTest - Received:key1========发送RabbitMQ消息2
16:21:37.022 [pool-1-thread-5] INFO DirectTest - Received:key1========发送RabbitMQ消息4
16:21:37.022 [pool-1-thread-5] INFO DirectTest - Received:key1========发送RabbitMQ消息6
16:21:37.022 [pool-1-thread-5] INFO DirectTest - Received:key1========发送RabbitMQ消息8

发送方确认

发送方确认机制是指生产者投递消息后,Broker 接收到消息后会向生产者发送一个应答,生产者通过接收应答确认消息是否成功发送到 Broker。这是消息可靠性投递的核心保障。RabbitMQ 的消息发送分为两个阶段:

  1. 消息发送到 Broker,即发送到 Exchange。
  2. 消息通过 Exchange 路由到 Queue。

一旦消息成功投递到 Queue,Queue 会向生产者发送确认通知。如果启用了消息持久化,Queue 会等待消息持久化到磁盘后再发送通知。

需要注意的是,发送方确认仅在 RabbitMQ 内部错误导致消息无法投递时才会失败。对于消息不可路由的情况,发送方确认的行为需要单独分析(笔者目前认为,只有RabbitMQ宕机或MQ内部有BUG才会出现这个情况)。

消费者代码不变,生产者代码如下所示:

java
@Slf4j
public class DirectTest {

    private static final String EXCHANGE_NAME = "direct_exchange";
    private static final int SEND_NUM = 10;
    private static final String QUEUE_NAME = "direct_queue";

    @Test
    public void send() throws IOException, TimeoutException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 启用发送方确认模式
        channel.confirmSelect();

        // 添加确认监听器
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) {
                log.info("消息发送成功: deliveryTag={}, multiple={}", deliveryTag, multiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) {
                log.error("消息发送失败: deliveryTag={}, multiple={}", deliveryTag, multiple);
            }
        });

        String[] keys = new String[]{"key1", "key2"};
        for (int i = 0; i < SEND_NUM; i++) {
            String key = keys[i % keys.length];
            String message = "发送RabbitMQ消息" + i;
            channel.basicPublish(EXCHANGE_NAME, key, true, null, message.getBytes(StandardCharsets.UTF_8));
            log.info("sendMessage:{}==={}", key, message);
        }

        // 等待所有消息的确认
        channel.waitForConfirms();
        channel.close();
        connection.close();
    }
}

Broker丢失消息

从 Broker 的角度来看,消息可靠性传输的关键在于如何确保消息在 MQ 挂掉重启后仍然存在。假设生产者已成功将消息发送到交换机并路由到队列,但在消费者未消费前 MQ 宕机,若重启后消息丢失,则无法保证消息的可靠性。

为解决这一问题,需开启 RabbitMQ 的持久化机制。通过将消息持久化到磁盘,即使 MQ 挂掉重启,也能自动恢复之前存储的数据,从而确保消息不丢失。这是保证消息可靠性传输的重要措施。

我们将消息设置成持久化消息,发送消息时,设置消息的deliveryMode=2。

java
@Slf4j
public class DirectTest {

    // 生产者绑定交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange";
    // 消息发送次数
    private static final int SEND_NUM = 10;
    // 队列名称
    private static final String QUEUE_NAME = "direct_queue";

    @Test
    public void send() throws IOException, TimeoutException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String[] keys = new String[]{"key1", "key2"};
        for (int i = 0; i < SEND_NUM; i++) {
            String key = keys[i % keys.length];
            String message = "发送RabbitMQ消息" + i;
            // 设置消息持久化,2表示持久化消息
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
            channel.basicPublish(EXCHANGE_NAME, key, true, properties, message.getBytes(StandardCharsets.UTF_8));
            log.info("sendMessage:{}==={}", key, message);
        }
        channel.close();
        connection.close();
    }
}

通过以上方式,可以确保大部分消息在 Broker 中不会丢失,但在极端情况下仍有可能丢失。例如,当消息到达队列后还未持久化到磁盘时,若 MQ 突然挂掉,消息可能会丢失。

为解决这一问题,需将 MQ 的持久化机制与发送方确认(Confirm)机制结合使用。只有当消息成功写入磁盘后,Broker 才会向生产者发送确认(ack)信号。若在持久化之前 MQ 挂掉,生产者因未收到 ack 信号,可以进行消息重发,从而进一步保障消息的可靠性。

消费者可靠性保障

当消费者接收到消息但未处理完时,若进程挂掉(如重启或断电),MQ 会认为消息已消费并从队列中删除,导致消息丢失。为避免此问题,可使用 RabbitMQ 的手动确认(ack)机制。默认情况下,RabbitMQ 为自动 ack,需改为手动 ack,确保程序在处理完消息后手动提交 ack。若未提交 ack,MQ 不会删除消息,而是将其重新分发给其他消费者,从而保证消息不丢失。

消费者手动确认ACK

channel.basicConsume(QUEUE_NAME, false, consumer);第二个参数设置为false;

java
@Slf4j
public class DirectTest {

    private static final String EXCHANGE_NAME = "direct_exchange";
    private static final int SEND_NUM = 10;
    private static final String QUEUE_NAME = "direct_queue";

  	@Test
    public void consumer1() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 交换器和队列绑定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received:{}========{}", envelope.getRoutingKey(), message);
                try {
                    // 模拟消息处理
                    log.info("处理消息: {}", message);
                    Thread.sleep(1000); // 模拟处理耗时
                    log.info("消息处理完成: {}", message);

                    // 手动确认消息(ACK)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    log.info("已发送ACK: {}", envelope.getDeliveryTag());
                } catch (InterruptedException e) {
                    log.error("消息处理异常: {}", e.getMessage());
                    // 如果处理失败,可以拒绝消息并重新入队
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                    log.info("消息重新入队: {}", envelope.getDeliveryTag());
                }
            }
        };
        // 消费者在指定的队列上消费,关闭自动确认(autoAck=false)
        channel.basicConsume(QUEUE_NAME, false, consumer);
        Thread.sleep(100000);
    }
}