Appearance
RabbitMQ消息可靠性保障
首先抛出两个问题:
- 生产者的消息有没有发送成功,是否正常路由给消费者了;
- 消费者收到消息,是否正确处理完毕,如果只处理了一半怎么办?
生产者可靠性保障
失败通知
在 RabbitMQ 中,生产者通过 Exchange 和 RoutingKey 发送消息,消费者监听队列进行消费。若 Exchange 不存在或 RoutingKey 无法路由,消息将无法投递。默认情况下,生产者无法感知消息是否成功到达。为确保可靠性,可通过设置 mandatory
标志启用失败通知,当消息无法投递时,RabbitMQ 会将其返回给生产者。
java
@Slf4j
public class DirectTest {
// 生产者绑定交换器名称
private static final String EXCHANGE_NAME = "direct_exchange";
// 消息发送次数
private static final int SEND_NUM = 10;
// 队列名称
private static final String QUEUE_NAME = "direct_queue";
@Test
public void send() throws IOException, TimeoutException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 处理发送失败的消息
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
String message = new String(body, StandardCharsets.UTF_8);
log.error("消息发送失败:路由键={}, message={}", routingKey, message);
});
String[] keys = new String[]{"key1", "key2"};
for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "发送RabbitMQ消息" + i;
// 注意第三个属性mandatory要设置成true
channel.basicPublish(EXCHANGE_NAME, key, true, null, message.getBytes(StandardCharsets.UTF_8));
log.info("sendMessage:{}==={}", key, message);
}
channel.waitForConfirms();
channel.close();
connection.close();
}
}
java
@Slf4j
public class DirectTest {
// 生产者绑定交换器名称
private static final String EXCHANGE_NAME = "direct_exchange";
// 队列名称
private static final String QUEUE_NAME = "direct_queue";
@Test
public void consumer1() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 交换器和队列绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
// 绑定多个key
// channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, "key3");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received:{}========{}", envelope.getRoutingKey(), message);
}
};
//消费者在指定的对队列上消费
channel.basicConsume(QUEUE_NAME, true, consumer);
// 保持主线程不退出
Thread.sleep(100000);
}
}
控制台日志如下:
shell
16:21:30.592 [main] INFO DirectTest - sendMessage:key1===发送RabbitMQ消息0
16:21:30.593 [main] INFO DirectTest - sendMessage:key2===发送RabbitMQ消息1
16:21:30.593 [main] INFO DirectTest - sendMessage:key1===发送RabbitMQ消息2
16:21:30.593 [main] INFO DirectTest - sendMessage:key2===发送RabbitMQ消息3
16:21:30.593 [main] INFO DirectTest - sendMessage:key1===发送RabbitMQ消息4
16:21:30.593 [main] INFO DirectTest - sendMessage:key2===发送RabbitMQ消息5
16:21:30.593 [main] INFO DirectTest - sendMessage:key1===发送RabbitMQ消息6
16:21:30.593 [main] INFO DirectTest - sendMessage:key2===发送RabbitMQ消息7
16:21:30.593 [main] INFO DirectTest - sendMessage:key1===发送RabbitMQ消息8
16:21:30.593 [main] INFO DirectTest - sendMessage:key2===发送RabbitMQ消息9
16:21:30.593 [AMQP Connection 127.0.0.1:5672] ERROR DirectTest - 消息发送失败:路由键=key2, message=发送RabbitMQ消息1
16:21:30.593 [AMQP Connection 127.0.0.1:5672] ERROR DirectTest - 消息发送失败:路由键=key2, message=发送RabbitMQ消息3
16:21:30.593 [AMQP Connection 127.0.0.1:5672] ERROR DirectTest - 消息发送失败:路由键=key2, message=发送RabbitMQ消息5
16:21:30.594 [AMQP Connection 127.0.0.1:5672] ERROR DirectTest - 消息发送失败:路由键=key2, message=发送RabbitMQ消息7
shell
16:21:37.017 [main] INFO DirectTest - 等待 message.....
16:21:37.022 [pool-1-thread-5] INFO DirectTest - Received:key1========发送RabbitMQ消息0
16:21:37.022 [pool-1-thread-5] INFO DirectTest - Received:key1========发送RabbitMQ消息2
16:21:37.022 [pool-1-thread-5] INFO DirectTest - Received:key1========发送RabbitMQ消息4
16:21:37.022 [pool-1-thread-5] INFO DirectTest - Received:key1========发送RabbitMQ消息6
16:21:37.022 [pool-1-thread-5] INFO DirectTest - Received:key1========发送RabbitMQ消息8
发送方确认
发送方确认机制是指生产者投递消息后,Broker 接收到消息后会向生产者发送一个应答,生产者通过接收应答确认消息是否成功发送到 Broker。这是消息可靠性投递的核心保障。RabbitMQ 的消息发送分为两个阶段:
- 消息发送到 Broker,即发送到 Exchange。
- 消息通过 Exchange 路由到 Queue。
一旦消息成功投递到 Queue,Queue 会向生产者发送确认通知。如果启用了消息持久化,Queue 会等待消息持久化到磁盘后再发送通知。
需要注意的是,发送方确认仅在 RabbitMQ 内部错误导致消息无法投递时才会失败。对于消息不可路由的情况,发送方确认的行为需要单独分析(笔者目前认为,只有RabbitMQ宕机或MQ内部有BUG才会出现这个情况)。
消费者代码不变,生产者代码如下所示:
java
@Slf4j
public class DirectTest {
private static final String EXCHANGE_NAME = "direct_exchange";
private static final int SEND_NUM = 10;
private static final String QUEUE_NAME = "direct_queue";
@Test
public void send() throws IOException, TimeoutException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 启用发送方确认模式
channel.confirmSelect();
// 添加确认监听器
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
log.info("消息发送成功: deliveryTag={}, multiple={}", deliveryTag, multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
log.error("消息发送失败: deliveryTag={}, multiple={}", deliveryTag, multiple);
}
});
String[] keys = new String[]{"key1", "key2"};
for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "发送RabbitMQ消息" + i;
channel.basicPublish(EXCHANGE_NAME, key, true, null, message.getBytes(StandardCharsets.UTF_8));
log.info("sendMessage:{}==={}", key, message);
}
// 等待所有消息的确认
channel.waitForConfirms();
channel.close();
connection.close();
}
}
Broker丢失消息
从 Broker 的角度来看,消息可靠性传输的关键在于如何确保消息在 MQ 挂掉重启后仍然存在。假设生产者已成功将消息发送到交换机并路由到队列,但在消费者未消费前 MQ 宕机,若重启后消息丢失,则无法保证消息的可靠性。
为解决这一问题,需开启 RabbitMQ 的持久化机制。通过将消息持久化到磁盘,即使 MQ 挂掉重启,也能自动恢复之前存储的数据,从而确保消息不丢失。这是保证消息可靠性传输的重要措施。
我们将消息设置成持久化消息,发送消息时,设置消息的deliveryMode=2。
java
@Slf4j
public class DirectTest {
// 生产者绑定交换器名称
private static final String EXCHANGE_NAME = "direct_exchange";
// 消息发送次数
private static final int SEND_NUM = 10;
// 队列名称
private static final String QUEUE_NAME = "direct_queue";
@Test
public void send() throws IOException, TimeoutException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String[] keys = new String[]{"key1", "key2"};
for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "发送RabbitMQ消息" + i;
// 设置消息持久化,2表示持久化消息
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, key, true, properties, message.getBytes(StandardCharsets.UTF_8));
log.info("sendMessage:{}==={}", key, message);
}
channel.close();
connection.close();
}
}
通过以上方式,可以确保大部分消息在 Broker 中不会丢失,但在极端情况下仍有可能丢失。例如,当消息到达队列后还未持久化到磁盘时,若 MQ 突然挂掉,消息可能会丢失。
为解决这一问题,需将 MQ 的持久化机制与发送方确认(Confirm)机制结合使用。只有当消息成功写入磁盘后,Broker 才会向生产者发送确认(ack)信号。若在持久化之前 MQ 挂掉,生产者因未收到 ack 信号,可以进行消息重发,从而进一步保障消息的可靠性。
消费者可靠性保障
当消费者接收到消息但未处理完时,若进程挂掉(如重启或断电),MQ 会认为消息已消费并从队列中删除,导致消息丢失。为避免此问题,可使用 RabbitMQ 的手动确认(ack)机制。默认情况下,RabbitMQ 为自动 ack,需改为手动 ack,确保程序在处理完消息后手动提交 ack。若未提交 ack,MQ 不会删除消息,而是将其重新分发给其他消费者,从而保证消息不丢失。
消费者手动确认ACK
channel.basicConsume(QUEUE_NAME, false, consumer);
第二个参数设置为false;
java
@Slf4j
public class DirectTest {
private static final String EXCHANGE_NAME = "direct_exchange";
private static final int SEND_NUM = 10;
private static final String QUEUE_NAME = "direct_queue";
@Test
public void consumer1() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 交换器和队列绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received:{}========{}", envelope.getRoutingKey(), message);
try {
// 模拟消息处理
log.info("处理消息: {}", message);
Thread.sleep(1000); // 模拟处理耗时
log.info("消息处理完成: {}", message);
// 手动确认消息(ACK)
channel.basicAck(envelope.getDeliveryTag(), false);
log.info("已发送ACK: {}", envelope.getDeliveryTag());
} catch (InterruptedException e) {
log.error("消息处理异常: {}", e.getMessage());
// 如果处理失败,可以拒绝消息并重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
log.info("消息重新入队: {}", envelope.getDeliveryTag());
}
}
};
// 消费者在指定的队列上消费,关闭自动确认(autoAck=false)
channel.basicConsume(QUEUE_NAME, false, consumer);
Thread.sleep(100000);
}
}