Skip to content

RabbitMQ极速入门-四大交换机

重要概念

  1. Publisher[消息生产者]:负责生产消息并将其发送到指定的交换机。
  2. Message[消息]:由消息头和消息体组成;消息头包含元数据,如交换机名称、路由键;消息体是实际传递的数据。
  3. Exchange[交换机]:接收来自生产者的消息,并根据路由规则将消息发送到一个或多个队列。
  4. BindingKey[绑定键]:用于在交换机和队列之间建立绑定关系。
  5. RoutingKey[路由键]:生产者发送消息时指定的路由规则。与绑定键匹配时,消息被路由到相应的队列。
  6. Queue[队列]:消息存储载体,多个消费者可以订阅同一个队列,消息以轮询方式分发给消费者。
  7. Consumer[消息消费者]:订阅并消费队列中的消息。
  8. Channel[通道]:在每个连接中可建立多个通道,每个通道代表一个会话任务。通过通道复用 TCP 连接,减少性能开销。
  9. Virtual Host[虚拟主机]:实现逻辑分组和资源隔离。每个虚拟主机是一个独立的 RabbitMQ 实例,拥有独立的队列、交换机和绑定关系,确保业务隔离和数据安全。
  10. Broker[消息代理]:即消息队列服务器实体。

WARNING

  • 如果消息没有找到可以发送的地方,将取决于 mandatory 属性,可能会返回给生产者或被丢弃,默认丢弃。
  • 消息确认机制: ① 消息发送后即认为消费成功,可能导致消息丢失。② 需手动确认,确保消息可靠性,但降低吞吐量。

通用配置

首先是工具类用于获取链接(实际使用时需要放到配置文件,这里代码为了简便,所以没有规范书写)。

java
public interface MQConstant {

    String RABBITMQ_HOST = "localhost";
    Integer RABBITMQ_PORT = 5672;
    String RABBITMQ_USERNAME = "guest";
    String RABBITMQ_PASSWORD = "guest";

    static Connection buildConnection() {
        //创建连接工厂,连接RabbitMQ
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(MQConstant.RABBITMQ_HOST);//端口号、用户名、密码可以使用默认的
        connectionFactory.setUsername(MQConstant.RABBITMQ_USERNAME);
        connectionFactory.setPassword(MQConstant.RABBITMQ_PASSWORD);
        connectionFactory.setPort(MQConstant.RABBITMQ_PORT);
        try {
            // 创建连接
            return connectionFactory.newConnection();
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}

直连交换机

定义一个测试类,消费者仅消费路由键为key1的消息,观察消费者输出,

java
@Slf4j
public class DirectTest {

    // 生产者绑定交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange";
    // 消息发送次数
    private static final int SEND_NUM = 10;

    @Test
    public void send() throws IOException, TimeoutException {
        // 获取信道,设置成直连交换机模式,并设置路由键在keys数组中随机选中
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String[] keys = new String[]{"key1", "key2", "key3"};

        // 发送消息
        for (int i = 0; i < SEND_NUM; i++) {
            String key = keys[i % keys.length];
            String message = "发送RabbitMQ消息" + i;
            //消息进行发送
            channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
            log.info("sendMessage:{}==={}", key, message);
        }
        channel.close();
        connection.close();
    }
}
java
@Slf4j
public class DirectTest {

    // 生产者绑定交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange";
    // 消息发送次数
    private static final int SEND_NUM = 10;

    @Test
    public void consumer1() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 声明一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 交换器和队列绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "key1");
        // 绑定多个key
        // channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, "key3");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        //消费者在指定的对队列上消费
        channel.basicConsume(queueName, true, consumer);
        // 保持主线程不退出
        Thread.sleep(100000);
    }
}

控制台输出如下:

shell
demo.DirectTest - sendMessage:key1===发送RabbitMQ消息3
demo.DirectTest - sendMessage:key2===发送RabbitMQ消息4
demo.DirectTest - sendMessage:key3===发送RabbitMQ消息5
demo.DirectTest - sendMessage:key1===发送RabbitMQ消息6
demo.DirectTest - sendMessage:key2===发送RabbitMQ消息7
demo.DirectTest - sendMessage:key3===发送RabbitMQ消息8
demo.DirectTest - sendMessage:key1===发送RabbitMQ消息9
shell
demo.DirectTest - 等待 message.....
demo.DirectTest - Received:key1========发送RabbitMQ消息0
demo.DirectTest - Received:key1========发送RabbitMQ消息3
demo.DirectTest - Received:key1========发送RabbitMQ消息6
demo.DirectTest - Received:key1========发送RabbitMQ消息9

队列不同、绑定键相同

队列不同绑定键相同的情况下,所有消费者都会消费消息,代码如下:

java
@Slf4j
public class DirectTest1 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange";
    // 消息发送次数
    private static final int SEND_NUM = 10;

    @Test
    public void send() throws IOException, TimeoutException {
        // 获取信道,设置成直连交换机模式,并设置路由键在keys数组中随机选中
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        String[] keys = new String[]{"key1", "key2", "key3"};

        for (int i = 0; i < SEND_NUM; i++) {
            String key = keys[i % keys.length];
            String message = "发送RabbitMQ消息" + i;
            channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
            log.info("sendMessage:{}==={}", key, message);
        }
        channel.close();
        connection.close();
    }
}
java
@Slf4j
public class DirectTest1 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange";
    // 消息发送次数
    private static final int SEND_NUM = 10;
    // 队列名称1
    private static final String QUEUE_NAME1 = "direct_queue1";

    @Test
    public void consumer1() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
        // 交换器和队列绑定
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "key1");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer1:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        channel.basicConsume(QUEUE_NAME1, true, consumer);
        Thread.sleep(100000);
    }
}
java
@Slf4j
public class DirectTest1 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange";
    // 消息发送次数
    private static final int SEND_NUM = 10;
    // 队列名称2
    private static final String QUEUE_NAME2 = "direct_queue2";

    @Test
    public void consumer2() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
        // 交换器和队列绑定
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "key1");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer2:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        channel.basicConsume(QUEUE_NAME2, true, consumer);
        Thread.sleep(100000);
    }
}

控制台日志如下:

shell
direct.DirectTest1 - sendMessage:key1===发送RabbitMQ消息0
direct.DirectTest1 - sendMessage:key2===发送RabbitMQ消息1
direct.DirectTest1 - sendMessage:key3===发送RabbitMQ消息2
direct.DirectTest1 - sendMessage:key1===发送RabbitMQ消息3
direct.DirectTest1 - sendMessage:key2===发送RabbitMQ消息4
direct.DirectTest1 - sendMessage:key3===发送RabbitMQ消息5
direct.DirectTest1 - sendMessage:key1===发送RabbitMQ消息6
direct.DirectTest1 - sendMessage:key2===发送RabbitMQ消息7
direct.DirectTest1 - sendMessage:key3===发送RabbitMQ消息8
direct.DirectTest1 - sendMessage:key1===发送RabbitMQ消息9
shell
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息0
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息3
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息6
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息9
shell
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息0
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息3
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息6
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息9

队列相同、绑定键相同

轮训消费消息

java
@Slf4j
public class DirectTest2 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange6";
    // 消息发送次数
    private static final int SEND_NUM = 10;

    @Test
    public void send() throws IOException, TimeoutException {
        // 获取信道,设置成直连交换机模式,并设置路由键在keys数组中随机选中
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        String[] keys = new String[]{"key1", "key2", "key3"};

        // 发送消息
        for (int i = 0; i < SEND_NUM; i++) {
            String key = keys[i % keys.length];
            String message = "发送RabbitMQ消息" + i;
            channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
            log.info("sendMessage:{}==={}", key, message);
        }
        channel.close();
        connection.close();
    }
}
java
@Slf4j
public class DirectTest2 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange6";
    // 消息发送次数
    private static final int SEND_NUM = 10;
    // 队列名称1
    private static final String QUEUE_NAME1 = "direct_queue6";

    @Test
    public void consumer1() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
        // 交换器和队列绑定
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "key1");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer1:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        channel.basicConsume(QUEUE_NAME1, true, consumer);
        Thread.sleep(100000);
    }
}
java
@Slf4j
public class DirectTest2 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange6";
    // 消息发送次数
    private static final int SEND_NUM = 10;
    // 队列名称1
    private static final String QUEUE_NAME1 = "direct_queue6";

    @Test
    public void consumer2() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
        // 交换器和队列绑定
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "key1");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer1:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        channel.basicConsume(QUEUE_NAME1, true, consumer);
        Thread.sleep(100000);
    }
}

控制台输出情况如下:

shell
// 生产者
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息0
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息1
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息2
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息3
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息4
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息5
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息6
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息7
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息8
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息9
shell
// 消费者1
direct.DirectTest2 - Received in consumer1:key1========发送RabbitMQ消息0
direct.DirectTest2 - Received in consumer1:key1========发送RabbitMQ消息6
shell
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息3
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息9

队列相同、绑定键不同

队列相同、绑定键不同时,轮训消费各自绑定键的消息的消息。

java
@Slf4j
public class DirectTest3 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange2";
    // 消息发送次数
    private static final int SEND_NUM = 10;
    
    @Test
    public void send() throws IOException, TimeoutException {
        // 获取信道,设置成直连交换机模式,并设置路由键在keys数组中随机选中
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        String[] keys = new String[]{"key1", "key2", "key3"};

        // 发送消息
        for (int i = 0; i < SEND_NUM; i++) {
            String key = keys[i % keys.length];
            String message = "发送RabbitMQ消息" + i;
            channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
            log.info("sendMessage:{}==={}", key, message);
        }
        channel.close();
        connection.close();
    }
}
java
@Slf4j
public class DirectTest3 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange2";
    // 队列名称1
    private static final String QUEUE_NAME1 = "direct_queue3";

    @Test
    public void consumer1() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
        // 交换器和队列绑定
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "key1");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer1:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        channel.basicConsume(QUEUE_NAME1, true, consumer);
        Thread.sleep(100000);
    }
}
java
@Slf4j
public class DirectTest3 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_exchange2";
    // 队列名称2
    private static final String QUEUE_NAME2 = "direct_queue3";

    @Test
    public void consumer2() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
        // 交换器和队列绑定
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "key2");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer2:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        channel.basicConsume(QUEUE_NAME2, true, consumer);
        Thread.sleep(100000);
    }
}

控制台输出情况如下:

shell
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息0
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息1
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息2
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息3
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息4
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息5
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息6
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息7
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息8
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息9
shell
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息0
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息3
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息6
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息9
shell
direct.DirectTest2 - Received in consumer1:key2========发送RabbitMQ消息1
direct.DirectTest2 - Received in consumer1:key2========发送RabbitMQ消息4
direct.DirectTest2 - Received in consumer1:key2========发送RabbitMQ消息7

扇形交换器(广播)

扇形交换器:将消息广播到所有绑定到该交换机的队列。

java
@Slf4j
public class FanoutTest {

    // 交换器名称
    private static final String EXCHANGE_NAME = "fanout_exchange";
    // 消息发送次数
    private static final int SEND_NUM = 10;

    @Test
    public void send() throws IOException, TimeoutException {
        // 获取信道,设置成扇形交换机模式
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);

        // 发送消息
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "发送RabbitMQ消息" + i;
            // 消息进行发送
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            log.info("sendMessage:{}", message);
        }
        channel.close();
        connection.close();
    }
}
java
@Slf4j
public class FanoutTest {

    // 交换器名称
    private static final String EXCHANGE_NAME = "fanout_exchange";
    // 队列名称1
    private static final String QUEUE_NAME1 = "fanout_queue1";

    @Test
    public void consumer1() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
        // 交换器和队列绑定
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer1:{}", message);
            }
        };
        channel.basicConsume(QUEUE_NAME1, true, consumer);
        Thread.sleep(100000);
    }
}
java
@Slf4j
public class FanoutTest {

    // 交换器名称
    private static final String EXCHANGE_NAME = "fanout_exchange";
    // 队列名称2
    private static final String QUEUE_NAME2 = "fanout_queue2";

    @Test
    public void consumer2() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
        // 交换器和队列绑定
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer2:{}", message);
            }
        };
        channel.basicConsume(QUEUE_NAME2, true, consumer);
        Thread.sleep(100000);
    }
}

控制台输出如下:

markdown
fanout.FanoutTest - sendMessage:发送RabbitMQ消息0
fanout.FanoutTest - sendMessage:发送RabbitMQ消息1
fanout.FanoutTest - sendMessage:发送RabbitMQ消息2
fanout.FanoutTest - sendMessage:发送RabbitMQ消息3
fanout.FanoutTest - sendMessage:发送RabbitMQ消息4
fanout.FanoutTest - sendMessage:发送RabbitMQ消息5
fanout.FanoutTest - sendMessage:发送RabbitMQ消息6
fanout.FanoutTest - sendMessage:发送RabbitMQ消息7
fanout.FanoutTest - sendMessage:发送RabbitMQ消息8
fanout.FanoutTest - sendMessage:发送RabbitMQ消息9
markdown
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息0
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息1
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息2
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息3
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息4
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息5
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息6
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息7
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息8
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息9
markdown
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息0
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息1
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息2
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息3
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息4
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息5
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息6
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息7
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息8
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息9

主题交换机

主题交换机:可以配置复杂条件的以.区分单词,*模糊匹配单个,#模糊匹配多个单词

  1. RabbitMQ的主题以.来区分单词;
  2. *:模糊匹配单个单词
  3. #:模糊匹配多个单词
java
@Slf4j
public class TopicTest {

    // 交换器名称
    private static final String EXCHANGE_NAME = "topic_exchange";
    // 消息发送次数
    private static final int SEND_NUM = 10;

    @Test
    public void send() throws IOException, TimeoutException {
        // 获取信道,设置成主题交换机模式
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);

        // 发送消息
        String[] routingKeys = {"send.mail.num", "send.mail.rate", "send.sms.num", "send.sms.success.num"};
        for (int i = 0; i < SEND_NUM; i++) {
            String routingKey = routingKeys[i % routingKeys.length];
            String message = "发送RabbitMQ消息" + i;
            // 消息进行发送
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            log.info("sendMessage:{}==={}", routingKey, message);
        }
        channel.close();
        connection.close();
    }
}
java
@Slf4j
public class TopicTest {

    // 交换器名称
    private static final String EXCHANGE_NAME = "topic_exchange";
    // 队列名称1
    private static final String QUEUE_NAME1 = "topic_queue1";
    
    @Test
    public void consumer1() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
        // 交换器和队列绑定,使用绑定键 "quick.*.*"
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "send.#");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer1:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        // 消费者在指定的对队列上消费
        channel.basicConsume(QUEUE_NAME1, true, consumer);
        // 保持主线程不退出
        Thread.sleep(100000);
    }
}
java
@Slf4j
public class TopicTest {

    // 交换器名称
    private static final String EXCHANGE_NAME = "topic_exchange";
    // 队列名称2
    private static final String QUEUE_NAME2 = "topic_queue2";

    @Test
    public void consumer2() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
        // 交换器和队列绑定,使用绑定键 "*.orange.*"
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "*.sms.*");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer2:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        // 消费者在指定的对队列上消费
        channel.basicConsume(QUEUE_NAME2, true, consumer);
        // 保持主线程不退出
        Thread.sleep(100000);
    }
}
java
@Slf4j
public class TopicTest {

    // 交换器名称
    private static final String EXCHANGE_NAME = "topic_exchange";
    // 队列名称3
    private static final String QUEUE_NAME3 = "topic_queue3";

    @Test
    public void consumer3() throws IOException, InterruptedException {
        Connection connection = MQConstant.buildConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        // 声明一个持久化队列
        channel.queueDeclare(QUEUE_NAME3, true, false, false, null);
        // 交换器和队列绑定,使用绑定键 "lazy.#"
        channel.queueBind(QUEUE_NAME3, EXCHANGE_NAME, "#.num");
        log.info("等待 message.....");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                log.info("Received in consumer3:{}========{}", envelope.getRoutingKey(), message);
            }
        };
        // 消费者在指定的对队列上消费
        channel.basicConsume(QUEUE_NAME3, true, consumer);
        // 保持主线程不退出
        Thread.sleep(100000);
    }
}

控制台输出如下所示:

shell
topic.TopicTest - sendMessage:send.mail.num===发送RabbitMQ消息0
topic.TopicTest - sendMessage:send.mail.rate===发送RabbitMQ消息1
topic.TopicTest - sendMessage:send.sms.num===发送RabbitMQ消息2
topic.TopicTest - sendMessage:send.sms.success.num===发送RabbitMQ消息3
topic.TopicTest - sendMessage:send.mail.num===发送RabbitMQ消息4
topic.TopicTest - sendMessage:send.mail.rate===发送RabbitMQ消息5
topic.TopicTest - sendMessage:send.sms.num===发送RabbitMQ消息6
topic.TopicTest - sendMessage:send.sms.success.num===发送RabbitMQ消息7
topic.TopicTest - sendMessage:send.mail.num===发送RabbitMQ消息8
topic.TopicTest - sendMessage:send.mail.rate===发送RabbitMQ消息9
shell
topic.TopicTest - Received in consumer1:send.mail.num========发送RabbitMQ消息0
topic.TopicTest - Received in consumer1:send.mail.rate========发送RabbitMQ消息1
topic.TopicTest - Received in consumer1:send.sms.num========发送RabbitMQ消息2
topic.TopicTest - Received in consumer1:send.sms.success.num========发送RabbitMQ消息3
topic.TopicTest - Received in consumer1:send.mail.num========发送RabbitMQ消息4
topic.TopicTest - Received in consumer1:send.mail.rate========发送RabbitMQ消息5
topic.TopicTest - Received in consumer1:send.sms.num========发送RabbitMQ消息6
topic.TopicTest - Received in consumer1:send.sms.success.num========发送RabbitMQ消息7
topic.TopicTest - Received in consumer1:send.mail.num========发送RabbitMQ消息8
topic.TopicTest - Received in consumer1:send.mail.rate========发送RabbitMQ消息9
shell
topic.TopicTest - Received in consumer2:send.sms.num========发送RabbitMQ消息2
topic.TopicTest - Received in consumer2:send.sms.num========发送RabbitMQ消息6
shell
topic.TopicTest - Received in consumer3:send.mail.num========发送RabbitMQ消息0
topic.TopicTest - Received in consumer3:send.sms.num========发送RabbitMQ消息2
topic.TopicTest - Received in consumer3:send.sms.success.num========发送RabbitMQ消息3
topic.TopicTest - Received in consumer3:send.mail.num========发送RabbitMQ消息4
topic.TopicTest - Received in consumer3:send.sms.num========发送RabbitMQ消息6
topic.TopicTest - Received in consumer3:send.sms.success.num========发送RabbitMQ消息7
topic.TopicTest - Received in consumer3:send.mail.num========发送RabbitMQ消息8

头交换机

头交换机可以传递非字符串内容,更为灵活(实际使用少)