Skip to content

RabbitMQ实现延时消息

什么是延迟队列?Producer将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。例如超时关单、优惠券回收等场景会用到。

RabbitMQ本身是不支持延迟队列的,我们可以通过死信队列的特性来实现延时队列。

image-20220811111213039

延迟固定时间消费

常见适用场景:

  1. 超时关单:延迟15分钟后检查订单是否已支付,如果未支付则取消订单。
  2. 促销活动:在促销活动开始前,提前一定时间发送提醒消息给用户。
  3. 自动续订:在订阅服务到期前,提前通知用户进行续订。
java
@Slf4j
public class DelayedTest {

    /**
     * 构建死信队列参数
     * @param dlxExchangeName 死信队列交换机名称
     * @param routingKey 路由键
     * @param delay 延迟时间
     * @param unit 延迟单位
     * @return
     */
    private static Map<String, Object> buildDlxMap(String dlxExchangeName, String routingKey, long delay, TimeUnit unit) {
        Map<String, Object> dlxMap = new HashMap<>();
        // 超时放置死信队列的交换机名称
        dlxMap.put("x-dead-letter-exchange", dlxExchangeName);
        // 超时放置死信队列的路由键
        dlxMap.put("x-dead-letter-routing-key", routingKey);
        // 超时时间(超时放置死信队列)
        dlxMap.put("x-message-ttl", unit.toMillis(delay));
        return dlxMap;
    }
}
java
@Slf4j
public class DelayedTest {

    // 延时交换器(消息发给这个交换器)
    private static final String EXCHANGE_NAME = "delayed_exchange";
    // 死信交换器(消费绑定这个交换器,死亡后投递到这个交换机)
    public final static String DLX_EXCHANGE_NAME = "dlx_topic_exchange"; //死信交换器
    // 消息发送次数
    private static final int SEND_NUM = 10;
    private static final String DELAYED_QUEUE = "delayed_queue";
    private static final String DELAYED_QUEUE1 = "delayed_queue001";
    private static final String DLX_QUEUE = "dlx_queue";
    private static final String DLX_QUEUE1 = "dlx_queue001";

    @Test
    public void send() throws IOException, TimeoutException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();

        // 1.设置消息超时某个时间后,后发送到哪个交换机
        Map<String, Object> dlxMap = buildDlxMap(DLX_EXCHANGE_NAME, "key0", 10, TimeUnit.SECONDS);
        channel.queueDeclare(DELAYED_QUEUE, true, false, false, dlxMap);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueBind(DELAYED_QUEUE, EXCHANGE_NAME, "key0");

        Map<String, Object> dlxMap1 = buildDlxMap(DLX_EXCHANGE_NAME, "key1", 10, TimeUnit.SECONDS);
        channel.queueDeclare(DELAYED_QUEUE1, true, false, false, dlxMap1);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueBind(DELAYED_QUEUE1, EXCHANGE_NAME, "key1");

        // 2.声明死信队列
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(DLX_QUEUE, true, false, false, null);
        channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, "key0");
        channel.queueBind(DLX_QUEUE1, DLX_EXCHANGE_NAME, "key1");

        String[] keys = new String[]{"key0", "key1"};
        for (int i = 0; i < SEND_NUM; i++) {
            String key = keys[i % keys.length];
            String message = "发送RabbitMQ消息" + i + ":" + DateUtil.format(new Date(), "HH:mm:ss");
            channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes(StandardCharsets.UTF_8));
            log.info("sendMessage:{}==={}", key, message);
        }

        channel.close();
        connection.close();
    }
}
java
@Slf4j
public class DelayedTest {

    // 死信交换器,消费者消费这个交换器绑定队列的消息
    public final static String DLX_EXCHANGE_NAME = "dlx_topic_exchange"; //死信交换器
    private static final String DLX_QUEUE = "dlx_queue";

    @Test
    public void consumer() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();

        // 只消费死信队列的消息
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(DLX_QUEUE, true, false, false, null);
        channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, "key0");

        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer:{}========{}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(DLX_QUEUE, true, consumer);
        Thread.sleep(100000);
    }
}
java
@Test
public void consumer1() throws IOException, InterruptedException {
    Connection connection = MQConstant.buildConnection();
    Channel channel = connection.createChannel();

    // 只消费死信队列的消息
    channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
    channel.queueDeclare(DLX_QUEUE1, true, false, false, null);
    channel.queueBind(DLX_QUEUE1, DLX_EXCHANGE_NAME, "key1");

    log.info("等待 message.....");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, StandardCharsets.UTF_8);
            log.info("Received in consumer:{}========{}", envelope.getRoutingKey(), message);
        }
    };

    channel.basicConsume(DLX_QUEUE1, true, consumer);
    Thread.sleep(100000);
}

通过日志我们能够观察出:

  1. 观察消费者日志确实发现延时10s才消费的消息。
  2. 生产者需要指定死信队列的交换机和队列名称以及绑定键。
  3. 消费者不关系消息发送到哪个交换机,只关心死信交换机、死信队列及绑定键。
shell
12:40:12.789 [main] INFO DelayedTest - sendMessage:key0===发送RabbitMQ消息0:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key1===发送RabbitMQ消息1:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key0===发送RabbitMQ消息2:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key1===发送RabbitMQ消息3:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key0===发送RabbitMQ消息4:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key1===发送RabbitMQ消息5:12:40:12
12:40:12.790 [main] INFO DelayedTest - sendMessage:key0===发送RabbitMQ消息6:12:40:12
12:40:12.791 [main] INFO DelayedTest - sendMessage:key1===发送RabbitMQ消息7:12:40:12
12:40:12.791 [main] INFO DelayedTest - sendMessage:key0===发送RabbitMQ消息8:12:40:12
12:40:12.791 [main] INFO DelayedTest - sendMessage:key1===发送RabbitMQ消息9:12:40:12
shell
12:40:03.188 [main] INFO DelayedTest - 等待 message.....
12:40:22.798 [pool-1-thread-4] INFO DelayedTest - Received in consumer:key0========发送RabbitMQ消息0:12:40:12
12:40:22.799 [pool-1-thread-5] INFO DelayedTest - Received in consumer:key0========发送RabbitMQ消息2:12:40:12
12:40:22.799 [pool-1-thread-5] INFO DelayedTest - Received in consumer:key0========发送RabbitMQ消息4:12:40:12
12:40:22.799 [pool-1-thread-5] INFO DelayedTest - Received in consumer:key0========发送RabbitMQ消息6:12:40:12
12:40:22.799 [pool-1-thread-5] INFO DelayedTest - Received in consumer:key0========发送RabbitMQ消息8:12:40:12
shell
12:39:55.984 [main] INFO DelayedTest - 等待 message.....
12:40:22.797 [pool-1-thread-6] INFO DelayedTest - Received in consumer:key1========发送RabbitMQ消息1:12:40:12
12:40:22.797 [pool-1-thread-7] INFO DelayedTest - Received in consumer:key1========发送RabbitMQ消息3:12:40:12
12:40:22.797 [pool-1-thread-7] INFO DelayedTest - Received in consumer:key1========发送RabbitMQ消息5:12:40:12
12:40:22.797 [pool-1-thread-7] INFO DelayedTest - Received in consumer:key1========发送RabbitMQ消息7:12:40:12
12:40:22.797 [pool-1-thread-7] INFO DelayedTest - Received in consumer:key1========发送RabbitMQ消息9:12:40:12

延迟任意时间消费

在上文中我们注意到,所有消息的发送延迟均为固定的10秒。然而实际应用中,可能需要为不同的消息设置不同的延迟时间,例如10秒或5分钟。为此,我们可以为每条消息单独设置超时时间。

java
@Slf4j
public class DelayedTest {

    /**
     * 构建死信队列参数
     * @param dlxExchangeName 死信队列交换机名称
     * @param routingKey 路由键
     * @param delay 延迟时间
     * @param unit 延迟单位
     * @return
     */
    private static Map<String, Object> buildDlxMap(String dlxExchangeName, String routingKey, long delay, TimeUnit unit) {
        Map<String, Object> dlxMap = new HashMap<>();
        // 超时放置死信队列的交换机名称
        dlxMap.put("x-dead-letter-exchange", dlxExchangeName);
        // 超时放置死信队列的路由键
        dlxMap.put("x-dead-letter-routing-key", routingKey);
        // 超时时间(超时放置死信队列)
        if (delay > 0) {
            dlxMap.put("x-message-ttl", unit.toMillis(delay));
        }
        return dlxMap;
    }
}
java
@Slf4j
public class DelayedTest2 {

    // 延时交换器
    private static final String EXCHANGE_NAME = "delayed_exchange2";
    // 死信交换器
    public final static String DLX_EXCHANGE_NAME = "dlx_topic_exchange2";
    private static final int SEND_NUM = 10;
    private static final String DELAYED_QUEUE = "delayed_queue2";
    private static final String DLX_QUEUE = "dlx_queue2";

    @Test
    public void send() throws IOException, TimeoutException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();

        // 1.设置消息超时某个时间后,后发送到哪个交换机
        Map<String, Object> dlxMap = buildDlxMap(DLX_EXCHANGE_NAME, "key0", 10, TimeUnit.SECONDS);
        channel.queueDeclare(DELAYED_QUEUE, true, false, false, dlxMap);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueBind(DELAYED_QUEUE, EXCHANGE_NAME, "delay_key");

        // 2.声明死信队列
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(DLX_QUEUE, true, false, false, null);
        channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, "key0");

        // Send messages
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "发送RabbitMQ消息" + i + ":" + DateUtil.format(new Date(), "HH:mm:ss");
            // 给每个消息设置延迟时间
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration(TimeUnit.SECONDS.toMillis(i + 1) + "").build();
            channel.basicPublish(EXCHANGE_NAME, "delay_key", properties, message.getBytes(StandardCharsets.UTF_8));
            log.info("sendMessage:{}==={}", "delay_key", message);
        }

        channel.close();
        connection.close();
    }
}
java
@Slf4j
public class DelayedTest2 {

    // 死信交换器
    public final static String DLX_EXCHANGE_NAME = "dlx_topic_exchange2";
    private static final String DLX_QUEUE = "dlx_queue2";

    @Test
    public void consumer() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();

        // 只消费死信队列的消息
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(DLX_QUEUE, true, false, false, null);
        channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, "key0");

        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer:{}========{}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(DLX_QUEUE, true, consumer);
        Thread.sleep(100000);
    }
}

通过分析以下日志,我们注意到每条消息的消费时间确实在1至10秒之间不等,成功实现了各消息不同的延迟时间设定。

shell
12:57:28.764 [main] INFO DelayedTest2 - 等待 message.....
12:57:31.034 [pool-1-thread-4] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息0:12:57:30
12:57:32.035 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息1:12:57:30
12:57:33.035 [pool-1-thread-6] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息2:12:57:30
12:57:34.038 [pool-1-thread-7] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息3:12:57:30
12:57:35.036 [pool-1-thread-8] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息4:12:57:30
12:57:36.040 [pool-1-thread-9] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息5:12:57:30
12:57:37.037 [pool-1-thread-10] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息6:12:57:30
12:57:38.037 [pool-1-thread-11] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息7:12:57:30
12:57:39.040 [pool-1-thread-12] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息8:12:57:30
12:57:40.038 [pool-1-thread-13] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息9:12:57:30
shell
12:57:30.030 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息0:12:57:30
12:57:30.031 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息1:12:57:30
12:57:30.031 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息2:12:57:30
12:57:30.031 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息3:12:57:30
12:57:30.031 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息4:12:57:30
12:57:30.032 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息5:12:57:30
12:57:30.032 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息6:12:57:30
12:57:30.032 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息7:12:57:30
12:57:30.032 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息8:12:57:30
12:57:30.032 [main] INFO DelayedTest2 - sendMessage:delay_key===发送RabbitMQ消息9:12:57:30

重要说明

如果我们把循环反过来呢?刚才的消息是1~10s发送,现在发送10~1s的发送,观察效果。

消费者代码不改变,生产者改变for循环

java
@Slf4j
public class DelayedTest2 {

    @Test
    public void send1() throws IOException, TimeoutException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();

        // 1.设置消息超时某个时间后,后发送到哪个交换机
        Map<String, Object> dlxMap = buildDlxMap(DLX_EXCHANGE_NAME, "key0", 10, TimeUnit.SECONDS);
        channel.queueDeclare(DELAYED_QUEUE, true, false, false, dlxMap);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueBind(DELAYED_QUEUE, EXCHANGE_NAME, "delay_key");

        // 2.声明死信队列
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(DLX_QUEUE, true, false, false, null);
        channel.queueBind(DLX_QUEUE, DLX_EXCHANGE_NAME, "key0");

        // Send messages
        for (int i = SEND_NUM - 1; i >= 0; i--) {
            String message = "发送RabbitMQ消息" + i + ":" + DateUtil.format(new Date(), "HH:mm:ss");
            // 给每个消息设置延迟时间
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration(TimeUnit.SECONDS.toMillis(i + 1) + "").build();
            channel.basicPublish(EXCHANGE_NAME, "delay_key", properties, message.getBytes(StandardCharsets.UTF_8));
            log.info("sendMessage:{}==={}", "delay_key", message);
        }

        channel.close();
        connection.close();
    }
}
shell
13:01:32.873 [main] INFO DelayedTest2 - 等待 message.....
13:01:48.427 [pool-1-thread-4] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息9:13:01:38
13:01:48.429 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息8:13:01:38
13:01:48.429 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息7:13:01:38
13:01:48.429 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息6:13:01:38
13:01:48.429 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息5:13:01:38
13:01:48.430 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息4:13:01:38
13:01:48.430 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息3:13:01:38
13:01:48.430 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息2:13:01:38
13:01:48.430 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息1:13:01:38
13:01:48.430 [pool-1-thread-5] INFO DelayedTest2 - Received in consumer:key0========发送RabbitMQ消息0:13:01:38

DANGER

通过日志可见,所有消息均延迟了10秒,这并非我们预期的结果。这表明在RabbitMQ的延时队列实现上存在一定的局限性,因为官方并未提供对此功能的直接支持。若要实现此功能,我们需另寻他法:

  1. 创建多个队列,如队列1延迟10秒,队列2延迟30秒,队列3延迟60秒,依此类推。
  2. 将所有消息的死信队列设置为同一队列,可以由单一消费者监听该死信队列。