Skip to content

RabbitMQ提供了6种工作模式,本篇会挑选工作中最常用的几种模式进行讲解,6中模式如下:

  1. Simple模式:这是最简单的模式,只有一个生产者和一个消费者。
  2. Work模式:在这种模式下,每个消费者都有一个独立的线程,可以处理多个消息。
  3. Publish/Subscribe模式:在这种模式下,生产者将消息发布到交换机上,消费者从交换机上订阅消息。
  4. Routing模式:在这种模式下,消息根据路由键被分发到不同的队列中。
  5. Topic模式:在这种模式下,生产者和消费者都不需要关心消息的具体类型,只需要发送或接收消息即可。
  6. RPC模式:在这种模式下,生产者和消费者之间通过远程过程调用进行通信。
xml
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

Simple简单模式

生产者发送消息到队列,有一个或多个消费者,当多个消费者同时监听一个队列时,他们并不能同时消费一条消息,而是随机消费消息,即一个队列中一条消息,只能被一个消费者消费。

简单模式.drawio

代码如下所示,无论是生产者还是消费者都需要先调用getChannel()方法获取连接,获取连接的步骤是固定的:

  1. 首先输入地址、端口号等信息创建连接工厂。
  2. 通过工厂建立连接,获取到连接对象建立通信通道。

首先启动消费者时刻监听hello队列,一旦有消息传来立即消费,并打印消息到控制台,这里的消费者可以创建多个。然后运行producer()生产者方法,向队列发送信息。

java
public class HelloWorld {
    // 队列名称
    private final static String QUEUE_NAME = "hello";

    @Test
    public void producer() throws Exception {
        Channel channel = getChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        // 发送消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        Channel channel = getChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.printf("customer1收到消息:%s%n", message);
            }
        };
        // 创建消费者
        Consumer consumer2 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.printf("customer2收到消息:%s%n", message);
            }
        };
        // 监听队列并消费消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
        channel.basicConsume(QUEUE_NAME, true, consumer2);
    }

    /**
     * 获取RabbitMQ连接对象
     */
    private static Channel getChannel() throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.76");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("monkeyz1368");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建通道
        return connection.createChannel();
    }
}

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  • QUEUE_NAME:队列的名称,是一个字符串类型的参数。
  • 第2个参数表示这个队列是否持久化,false表示不持久化,也就是一旦服务器重启,队列就会被删除。
  • 第3个参数表示这个队列是否独占,即是否只允许一个消费者同时消费这个队列中的消息。false表示不独占,也就是多个消费者可以同时消费这个队列中的消息。
  • 第4个参数表示这个队列是否自动删除,即当最后一个消费者取消订阅后,是否自动删除这个队列。false表示不自动删除,也就是需要手动删除这个队列。
  • null:表示队列的属性,这里传入null表示使用默认属性。

常见的交换机类型

  • direct直连交换机:最简单的交换机类型,它将消息直接路由到与消息中的路由键完全匹配的队列。当路由键与绑定时指定的路由键完全匹配时,消息将被投递到对应的队列。
  • fanout扇形交换机:扇形交换机将消息广播到绑定到该交换机的所有队列,它忽略消息的路由键,只需将消息发送到所有绑定的队列。
  • topic主题交换机:主题交换机根据消息的路由键和绑定的模式进行匹配,将消息路由到一个或多个队列。模式可以使用通配符进行匹配,例如*代表一个单词,#代表零个或多个单词。
  • headers头交换机:头交换机根据消息的头部属性进行匹配和路由。消息中的头部属性与绑定时指定的头部属性进行匹配,如果匹配成功,则消息被路由到对应的队列。在绑定消息队列与交换机之前声明一个map键值对,通过这个map对象实现消息队列和交换机的绑定。当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列,有两种匹配规则:
    • x-match = all :表示所有的键值对都匹配才能接受到消息
    • x-match = any :表示只要有键值对匹配就能接受到消息
  • 总结:fanout只和交换机有关,只要消费者绑定在此交换机就会收到消息;而Direct的消费者只有当交换机和路由键同时匹配才能收到消息;Topic提供更为强大的通配符来表示路由,类似MySQL的like模糊查询功能;Headers与路由键无关,匹配消息头中的属性信息,用的较少。
  • 发布订阅模式一般使用fanout,有多个消费者消费,也就是发布消息后,所有订阅此交换机的都会收到消息进行消费。
  • Routing模式一般使用direct,绑定交换机后再通过路由Key确定消费队列。

发布/订阅模式

生产者,一个交换机(fanoutExchange),没有路由规则,多个队列,多个消费者。生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,来消费消息。

发布订阅模式

在发布订阅模式下可以实现一个生产者发送的消息,可以被多个消费者多次消费,之前的消息只能消费一次。来看下面代码,生产者加了交换机名称和路由Key,在本案例中,路由Key等于没用,因为交换机类型设置为fanout,后文有说明。

而消费者创建了两个q1和q2队列,绑定到my_exchange队列上进行消费,当发送消息时,两个队列的消费者会同时接收到消息。如果q1有多个消费者,那么只会有一个q1的消费者接收到消息。

java
public class Pubsub {
    // 交换机名称
    private static final String EXCHANGE_NAME = "my_exchange";
    // 路由Key
    private static final String ROUTING_KEY = "hello";

    @Test
    public void producer() throws Exception {
        Channel channel = getConnection().createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String message = "Hello World!";
        // 比之前多了一个交换机名称,发送4条消息方便测试
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
    }

    public static void main(String[] args) throws Exception {
        Channel channel = getConnection().createChannel();
        // 声明一个交换机,并设置其类型为"fanout"
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 声明一个队列并绑定交换机和路由Key
        String queueName = channel.queueDeclare("q1", false, false, false, null).getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
        String queueName2 = channel.queueDeclare("q2", false, false, false, null).getQueue();
        channel.queueBind(queueName2, EXCHANGE_NAME, ROUTING_KEY);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.printf("q1 customer收到消息:%s%n", message);
            }
        };
        Consumer consumer1 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.printf("q11 1customer收到消息:%s%n", message);
            }
        };
        // q1队列有两个消费者,但每次只会有一个q1的消费者收到消息
        channel.basicConsume("q1", true, consumer);
        channel.basicConsume("q1", true, consumer1);

        Consumer consumer2 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.printf("q2 customer收到消息:%s%n", message);
            }
        };
        // q2只绑定一个消费者,所以这个消费者100%会收到消息。
        channel.basicConsume("q2", true, consumer2);
    }

    /**
     * 获取RabbitMQ连接对象
     */
    private static Connection getConnection() throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.76");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("monkeyz1368");
        // 创建连接
        return factory.newConnection();
    }
}

Routing模式

生产者分别向ROUTING_KEY1和ROUTING_KEY2发送一条消息,两个Customer都只监听ROUTING_KEY1,控制台只打印ROUTING_KEY1的两条消息。

java
public class Routing {
    // 交换机名称
    private static final String EXCHANGE_NAME = "my_routing";
    // 路由Key
    private static final String ROUTING_KEY1 = "routing1";
    private static final String ROUTING_KEY2 = "routing2";

    @Test
    public void producer() throws Exception {
        Channel channel = getConnection().createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String message = "ROUTING_KEY1 我是1的消息";
        String message2 = "ROUTING_KEY2 我是2的消息";
        // 比之前多了一个交换机名称
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY1, null, message.getBytes());
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY2, null, message2.getBytes());
    }

    public static void main(String[] args) throws Exception {
        Channel channel = getConnection().createChannel();
        // 声明一个交换机,并设置其类型为"fanout"
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 声明一个队列并绑定交换机和路由Key
        String queueName = channel.queueDeclare("r1", false, false, false, null).getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY1);
        String queueName2 = channel.queueDeclare("r2", false, false, false, null).getQueue();
        channel.queueBind(queueName2, EXCHANGE_NAME, ROUTING_KEY1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.printf("r1 customer收到消息:%s%n", message);
            }
        };
        channel.basicConsume("r1", true, consumer);

        Consumer consumer2 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.printf("r2 customer收到消息:%s%n", message);
            }
        };
        channel.basicConsume("r2", true, consumer2);
    }

    /**
     * 获取RabbitMQ连接对象
     */
    private static Connection getConnection() throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.76");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("monkeyz1368");
        // 创建连接
        return factory.newConnection();
    }
}