Appearance
RabbitMQ实现延时消息
什么是延迟队列?Producer将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。例如超时关单、优惠券回收等场景会用到。
RabbitMQ本身是不支持延迟队列的,我们可以通过死信队列的特性来实现延时队列。
延迟固定时间消费
常见适用场景:
- 超时关单:延迟15分钟后检查订单是否已支付,如果未支付则取消订单。
- 促销活动:在促销活动开始前,提前一定时间发送提醒消息给用户。
- 自动续订:在订阅服务到期前,提前通知用户进行续订。
java
@Slf4j
public class DelayedTest {
/**
* 构建死信队列参数
* @param dlxExchangeName 死信队列交换机名称
* @param routingKey 路由键
* @param delay 延迟时间
* @param unit 延迟单位
* @return
*/
private static Map<String, Object> buildDlxMap(String dlxExchangeName, String routingKey, long delay, TimeUnit unit) {
Map<String, Object> dlxMap = new HashMap<>();
// 超时放置死信队列的交换机名称
dlxMap.put("x-dead-letter-exchange", dlxExchangeName);
// 超时放置死信队列的路由键
dlxMap.put("x-dead-letter-routing-key", routingKey);
// 超时时间(超时放置死信队列)
dlxMap.put("x-message-ttl", unit.toMillis(delay));
return dlxMap;
}
}
java
@Slf4j
public class DelayedTest {
// 延时交换器(消息发给这个交换器)
private static final String EXCHANGE_NAME = "delayed_exchange";
// 死信交换器(消费绑定这个交换器,死亡后投递到这个交换机)
public final static String DLX_EXCHANGE_NAME = "dlx_topic_exchange"; //死信交换器
// 消息发送次数
private static final int SEND_NUM = 10;
private static final String DELAYED_QUEUE = "delayed_queue";
private static final String DELAYED_QUEUE1 = "delayed_queue001";
private static final String DLX_QUEUE = "dlx_queue";
private static final String DLX_QUEUE1 = "dlx_queue001";
@Test
public void send() throws IOException, TimeoutException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
// 1.设置消息超时某个时间后,后发送到哪个交换机
Map<String, Object> dlxMap = buildDlxMap(DLX_EXCHANGE_NAME, "key0", 10, TimeUnit.SECONDS);
channel.queueDeclare(DELAYED_QUEUE, true, false, false, dlxMap);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(DELAYED_QUEUE, EXCHANGE_NAME, "key0");
Map<String, Object> dlxMap1 = buildDlxMap(DLX_EXCHANGE_NAME, "key1", 10, TimeUnit.SECONDS);
channel.queueDeclare(DELAYED_QUEUE1, true, false, false, dlxMap1);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(DELAYED_QUEUE1, EXCHANGE_NAME, "key1");
// 2.声明死信队列
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, "key0");
channel.queueBind(DLX_QUEUE1, DLX_EXCHANGE_NAME, "key1");
String[] keys = new String[]{"key0", "key1"};
for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "发送RabbitMQ消息" + i + ":" + DateUtil.format(new Date(), "HH:mm:ss");
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes(StandardCharsets.UTF_8));
log.info("sendMessage:{}==={}", key, message);
}
channel.close();
connection.close();
}
}
java
@Slf4j
public class DelayedTest {
// 死信交换器,消费者消费这个交换器绑定队列的消息
public final static String DLX_EXCHANGE_NAME = "dlx_topic_exchange"; //死信交换器
private static final String DLX_QUEUE = "dlx_queue";
@Test
public void consumer() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
// 只消费死信队列的消息
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, "key0");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer:{}========{}", envelope.getRoutingKey(), message);
}
};
channel.basicConsume(DLX_QUEUE, true, consumer);
Thread.sleep(100000);
}
}
java
@Test
public void consumer1() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
// 只消费死信队列的消息
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(DLX_QUEUE1, true, false, false, null);
channel.queueBind(DLX_QUEUE1, DLX_EXCHANGE_NAME, "key1");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer:{}========{}", envelope.getRoutingKey(), message);
}
};
channel.basicConsume(DLX_QUEUE1, true, consumer);
Thread.sleep(100000);
}
通过日志我们能够观察出:
- 观察消费者日志确实发现延时10s才消费的消息。
- 生产者需要指定死信队列的交换机和队列名称以及绑定键。
- 消费者不关系消息发送到哪个交换机,只关心死信交换机、死信队列及绑定键。
shell
12:40:12.789 [main] INFO DelayedTest - sendMessage:key0===发送RabbitMQ消息0:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key1===发送RabbitMQ消息1:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key0===发送RabbitMQ消息2:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key1===发送RabbitMQ消息3:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key0===发送RabbitMQ消息4:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key1===发送RabbitMQ消息5:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key0===发送RabbitMQ消息6:12:40:12
12:40:12.791 [main] INFO DelayedTest - sendMessage:key1===发送RabbitMQ消息7:12:40:12
12:40:12.791 [main] INFO DelayedTest - sendMessage:key0===发送RabbitMQ消息8:12:40:12
12:40:12.791 [main] INFO DelayedTest - sendMessage:key1===发送RabbitMQ消息9:12:40:12
shell
12:40:03.188 [main] INFO DelayedTest - 等待 message.....
12:40:22.798 [pool-1-thread-4] INFO DelayedTest - Received in consumer:key0========发送RabbitMQ消息0:12:40:12
12:40:22.799 [pool-1-thread-5] INFO DelayedTest - Received in consumer:key0========发送RabbitMQ消息2:12:40:12
12:40:22.799 [pool-1-thread-5] INFO DelayedTest - Received in consumer:key0========发送RabbitMQ消息4:12:40:12
12:40:22.799 [pool-1-thread-5] INFO DelayedTest - Received in consumer:key0========发送RabbitMQ消息6:12:40:12
12:40:22.799 [pool-1-thread-5] INFO DelayedTest - Received in consumer:key0========发送RabbitMQ消息8:12:40:12
shell
12:39:55.984 [main] INFO DelayedTest - 等待 message.....
12:40:22.797 [pool-1-thread-6] INFO DelayedTest - Received in consumer:key1========发送RabbitMQ消息1:12:40:12
12:40:22.797 [pool-1-thread-7] INFO DelayedTest - Received in consumer:key1========发送RabbitMQ消息3:12:40:12
12:40:22.797 [pool-1-thread-7] INFO DelayedTest - Received in consumer:key1========发送RabbitMQ消息5:12:40:12
12:40:22.797 [pool-1-thread-7] INFO DelayedTest - Received in consumer:key1========发送RabbitMQ消息7:12:40:12
12:40:22.797 [pool-1-thread-7] INFO DelayedTest - Received in consumer:key1========发送RabbitMQ消息9:12:40:12
延迟任意时间消费
在上文中我们注意到,所有消息的发送延迟均为固定的10秒。然而实际应用中,可能需要为不同的消息设置不同的延迟时间,例如10秒或5分钟。为此,我们可以为每条消息单独设置超时时间。
java
@Slf4j
public class DelayedTest {
/**
* 构建死信队列参数
* @param dlxExchangeName 死信队列交换机名称
* @param routingKey 路由键
* @param delay 延迟时间
* @param unit 延迟单位
* @return
*/
private static Map<String, Object> buildDlxMap(String dlxExchangeName, String routingKey, long delay, TimeUnit unit) {
Map<String, Object> dlxMap = new HashMap<>();
// 超时放置死信队列的交换机名称
dlxMap.put("x-dead-letter-exchange", dlxExchangeName);
// 超时放置死信队列的路由键
dlxMap.put("x-dead-letter-routing-key", routingKey);
// 超时时间(超时放置死信队列)
if (delay > 0) {
dlxMap.put("x-message-ttl", unit.toMillis(delay));
}
return dlxMap;
}
}
java
@Slf4j
public class DelayedTest2 {
// 延时交换器
private static final String EXCHANGE_NAME = "delayed_exchange2";
// 死信交换器
public final static String DLX_EXCHANGE_NAME = "dlx_topic_exchange2";
private static final int SEND_NUM = 10;
private static final String DELAYED_QUEUE = "delayed_queue2";
private static final String DLX_QUEUE = "dlx_queue2";
@Test
public void send() throws IOException, TimeoutException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
// 1.设置消息超时某个时间后,后发送到哪个交换机
Map<String, Object> dlxMap = buildDlxMap(DLX_EXCHANGE_NAME, "key0", 10, TimeUnit.SECONDS);
channel.queueDeclare(DELAYED_QUEUE, true, false, false, dlxMap);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(DELAYED_QUEUE, EXCHANGE_NAME, "delay_key");
// 2.声明死信队列
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, "key0");
// Send messages
for (int i = 0; i < SEND_NUM; i++) {
String message = "发送RabbitMQ消息" + i + ":" + DateUtil.format(new Date(), "HH:mm:ss");
// 给每个消息设置延迟时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration(TimeUnit.SECONDS.toMillis(i + 1) + "").build();
channel.basicPublish(EXCHANGE_NAME, "delay_key", properties, message.getBytes(StandardCharsets.UTF_8));
log.info("sendMessage:{}==={}", "delay_key", message);
}
channel.close();
connection.close();
}
}
java
@Slf4j
public class DelayedTest2 {
// 死信交换器
public final static String DLX_EXCHANGE_NAME = "dlx_topic_exchange2";
private static final String DLX_QUEUE = "dlx_queue2";
@Test
public void consumer() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
// 只消费死信队列的消息
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, "key0");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer:{}========{}", envelope.getRoutingKey(), message);
}
};
channel.basicConsume(DLX_QUEUE, true, consumer);
Thread.sleep(100000);
}
}
通过分析以下日志,我们注意到每条消息的消费时间确实在1至10秒之间不等,成功实现了各消息不同的延迟时间设定。
shell
12:57:28.764 [main] INFO DelayedTest2 - 等待 message.....
12:57:31.034 [pool-1-thread-4] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息0:12:57:30
12:57:32.035 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息1:12:57:30
12:57:33.035 [pool-1-thread-6] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息2:12:57:30
12:57:34.038 [pool-1-thread-7] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息3:12:57:30
12:57:35.036 [pool-1-thread-8] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息4:12:57:30
12:57:36.040 [pool-1-thread-9] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息5:12:57:30
12:57:37.037 [pool-1-thread-10] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息6:12:57:30
12:57:38.037 [pool-1-thread-11] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息7:12:57:30
12:57:39.040 [pool-1-thread-12] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息8:12:57:30
12:57:40.038 [pool-1-thread-13] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息9:12:57:30
shell
12:57:30.030 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息0:12:57:30
12:57:30.031 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息1:12:57:30
12:57:30.031 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息2:12:57:30
12:57:30.031 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息3:12:57:30
12:57:30.031 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息4:12:57:30
12:57:30.032 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息5:12:57:30
12:57:30.032 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息6:12:57:30
12:57:30.032 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息7:12:57:30
12:57:30.032 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息8:12:57:30
12:57:30.032 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息9:12:57:30
重要说明
如果我们把循环反过来呢?刚才的消息是1~10s发送,现在发送10~1s的发送,观察效果。
消费者代码不改变,生产者改变for循环
java
@Slf4j
public class DelayedTest2 {
@Test
public void send1() throws IOException, TimeoutException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
// 1.设置消息超时某个时间后,后发送到哪个交换机
Map<String, Object> dlxMap = buildDlxMap(DLX_EXCHANGE_NAME, "key0", 10, TimeUnit.SECONDS);
channel.queueDeclare(DELAYED_QUEUE, true, false, false, dlxMap);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(DELAYED_QUEUE, EXCHANGE_NAME, "delay_key");
// 2.声明死信队列
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, "key0");
// Send messages
for (int i = SEND_NUM - 1; i >= 0; i--) {
String message = "发送RabbitMQ消息" + i + ":" + DateUtil.format(new Date(), "HH:mm:ss");
// 给每个消息设置延迟时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration(TimeUnit.SECONDS.toMillis(i + 1) + "").build();
channel.basicPublish(EXCHANGE_NAME, "delay_key", properties, message.getBytes(StandardCharsets.UTF_8));
log.info("sendMessage:{}==={}", "delay_key", message);
}
channel.close();
connection.close();
}
}
shell
13:01:32.873 [main] INFO DelayedTest2 - 等待 message.....
13:01:48.427 [pool-1-thread-4] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息9:13:01:38
13:01:48.429 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息8:13:01:38
13:01:48.429 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息7:13:01:38
13:01:48.429 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息6:13:01:38
13:01:48.429 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息5:13:01:38
13:01:48.430 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息4:13:01:38
13:01:48.430 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息3:13:01:38
13:01:48.430 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息2:13:01:38
13:01:48.430 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息1:13:01:38
13:01:48.430 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息0:13:01:38
DANGER
通过日志可见,所有消息均延迟了10秒,这并非我们预期的结果。这表明在RabbitMQ的延时队列实现上存在一定的局限性,因为官方并未提供对此功能的直接支持。若要实现此功能,我们需另寻他法:
- 创建多个队列,如队列1延迟10秒,队列2延迟30秒,队列3延迟60秒,依此类推。
- 将所有消息的死信队列设置为同一队列,可以由单一消费者监听该死信队列。