Appearance
RabbitMQ极速入门-四大交换机
重要概念
- Publisher[消息生产者]:负责生产消息并将其发送到指定的交换机。
- Message[消息]:由消息头和消息体组成;消息头包含元数据,如交换机名称、路由键;消息体是实际传递的数据。
- Exchange[交换机]:接收来自生产者的消息,并根据路由规则将消息发送到一个或多个队列。
- BindingKey[绑定键]:用于在交换机和队列之间建立绑定关系。
- RoutingKey[路由键]:生产者发送消息时指定的路由规则。与绑定键匹配时,消息被路由到相应的队列。
- Queue[队列]:消息存储载体,多个消费者可以订阅同一个队列,消息以轮询方式分发给消费者。
- Consumer[消息消费者]:订阅并消费队列中的消息。
- Channel[通道]:在每个连接中可建立多个通道,每个通道代表一个会话任务。通过通道复用 TCP 连接,减少性能开销。
- Virtual Host[虚拟主机]:实现逻辑分组和资源隔离。每个虚拟主机是一个独立的 RabbitMQ 实例,拥有独立的队列、交换机和绑定关系,确保业务隔离和数据安全。
- Broker[消息代理]:即消息队列服务器实体。
WARNING
- 如果消息没有找到可以发送的地方,将取决于 mandatory 属性,可能会返回给生产者或被丢弃,默认丢弃。
- 消息确认机制: ① 消息发送后即认为消费成功,可能导致消息丢失。② 需手动确认,确保消息可靠性,但降低吞吐量。
通用配置
首先是工具类用于获取链接(实际使用时需要放到配置文件,这里代码为了简便,所以没有规范书写)。
java
public interface MQConstant {
String RABBITMQ_HOST = "localhost";
Integer RABBITMQ_PORT = 5672;
String RABBITMQ_USERNAME = "guest";
String RABBITMQ_PASSWORD = "guest";
static Connection buildConnection() {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(MQConstant.RABBITMQ_HOST);//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername(MQConstant.RABBITMQ_USERNAME);
connectionFactory.setPassword(MQConstant.RABBITMQ_PASSWORD);
connectionFactory.setPort(MQConstant.RABBITMQ_PORT);
try {
// 创建连接
return connectionFactory.newConnection();
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
}
}
}
直连交换机
定义一个测试类,消费者仅消费路由键为key1
的消息,观察消费者输出,
java
@Slf4j
public class DirectTest {
// 生产者绑定交换器名称
private static final String EXCHANGE_NAME = "direct_exchange";
// 消息发送次数
private static final int SEND_NUM = 10;
@Test
public void send() throws IOException, TimeoutException {
// 获取信道,设置成直连交换机模式,并设置路由键在keys数组中随机选中
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String[] keys = new String[]{"key1", "key2", "key3"};
// 发送消息
for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "发送RabbitMQ消息" + i;
//消息进行发送
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
log.info("sendMessage:{}==={}", key, message);
}
channel.close();
connection.close();
}
}
java
@Slf4j
public class DirectTest {
// 生产者绑定交换器名称
private static final String EXCHANGE_NAME = "direct_exchange";
// 消息发送次数
private static final int SEND_NUM = 10;
@Test
public void consumer1() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 交换器和队列绑定
channel.queueBind(queueName, EXCHANGE_NAME, "key1");
// 绑定多个key
// channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, "key3");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received:{}========{}", envelope.getRoutingKey(), message);
}
};
//消费者在指定的对队列上消费
channel.basicConsume(queueName, true, consumer);
// 保持主线程不退出
Thread.sleep(100000);
}
}
控制台输出如下:
shell
demo.DirectTest - sendMessage:key1===发送RabbitMQ消息3
demo.DirectTest - sendMessage:key2===发送RabbitMQ消息4
demo.DirectTest - sendMessage:key3===发送RabbitMQ消息5
demo.DirectTest - sendMessage:key1===发送RabbitMQ消息6
demo.DirectTest - sendMessage:key2===发送RabbitMQ消息7
demo.DirectTest - sendMessage:key3===发送RabbitMQ消息8
demo.DirectTest - sendMessage:key1===发送RabbitMQ消息9
shell
demo.DirectTest - 等待 message.....
demo.DirectTest - Received:key1========发送RabbitMQ消息0
demo.DirectTest - Received:key1========发送RabbitMQ消息3
demo.DirectTest - Received:key1========发送RabbitMQ消息6
demo.DirectTest - Received:key1========发送RabbitMQ消息9
队列不同、绑定键相同
队列不同绑定键相同的情况下,所有消费者都会消费消息,代码如下:
java
@Slf4j
public class DirectTest1 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_exchange";
// 消息发送次数
private static final int SEND_NUM = 10;
@Test
public void send() throws IOException, TimeoutException {
// 获取信道,设置成直连交换机模式,并设置路由键在keys数组中随机选中
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
String[] keys = new String[]{"key1", "key2", "key3"};
for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "发送RabbitMQ消息" + i;
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
log.info("sendMessage:{}==={}", key, message);
}
channel.close();
connection.close();
}
}
java
@Slf4j
public class DirectTest1 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_exchange";
// 消息发送次数
private static final int SEND_NUM = 10;
// 队列名称1
private static final String QUEUE_NAME1 = "direct_queue1";
@Test
public void consumer1() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
// 交换器和队列绑定
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "key1");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer1:{}========{}", envelope.getRoutingKey(), message);
}
};
channel.basicConsume(QUEUE_NAME1, true, consumer);
Thread.sleep(100000);
}
}
java
@Slf4j
public class DirectTest1 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_exchange";
// 消息发送次数
private static final int SEND_NUM = 10;
// 队列名称2
private static final String QUEUE_NAME2 = "direct_queue2";
@Test
public void consumer2() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
// 交换器和队列绑定
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "key1");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer2:{}========{}", envelope.getRoutingKey(), message);
}
};
channel.basicConsume(QUEUE_NAME2, true, consumer);
Thread.sleep(100000);
}
}
控制台日志如下:
shell
direct.DirectTest1 - sendMessage:key1===发送RabbitMQ消息0
direct.DirectTest1 - sendMessage:key2===发送RabbitMQ消息1
direct.DirectTest1 - sendMessage:key3===发送RabbitMQ消息2
direct.DirectTest1 - sendMessage:key1===发送RabbitMQ消息3
direct.DirectTest1 - sendMessage:key2===发送RabbitMQ消息4
direct.DirectTest1 - sendMessage:key3===发送RabbitMQ消息5
direct.DirectTest1 - sendMessage:key1===发送RabbitMQ消息6
direct.DirectTest1 - sendMessage:key2===发送RabbitMQ消息7
direct.DirectTest1 - sendMessage:key3===发送RabbitMQ消息8
direct.DirectTest1 - sendMessage:key1===发送RabbitMQ消息9
shell
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息0
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息3
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息6
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息9
shell
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息0
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息3
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息6
direct.DirectTest1 - Received in consumer1:key1========发送RabbitMQ消息9
队列相同、绑定键相同
轮训消费消息
java
@Slf4j
public class DirectTest2 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_exchange6";
// 消息发送次数
private static final int SEND_NUM = 10;
@Test
public void send() throws IOException, TimeoutException {
// 获取信道,设置成直连交换机模式,并设置路由键在keys数组中随机选中
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
String[] keys = new String[]{"key1", "key2", "key3"};
// 发送消息
for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "发送RabbitMQ消息" + i;
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
log.info("sendMessage:{}==={}", key, message);
}
channel.close();
connection.close();
}
}
java
@Slf4j
public class DirectTest2 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_exchange6";
// 消息发送次数
private static final int SEND_NUM = 10;
// 队列名称1
private static final String QUEUE_NAME1 = "direct_queue6";
@Test
public void consumer1() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
// 交换器和队列绑定
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "key1");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer1:{}========{}", envelope.getRoutingKey(), message);
}
};
channel.basicConsume(QUEUE_NAME1, true, consumer);
Thread.sleep(100000);
}
}
java
@Slf4j
public class DirectTest2 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_exchange6";
// 消息发送次数
private static final int SEND_NUM = 10;
// 队列名称1
private static final String QUEUE_NAME1 = "direct_queue6";
@Test
public void consumer2() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
// 交换器和队列绑定
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "key1");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer1:{}========{}", envelope.getRoutingKey(), message);
}
};
channel.basicConsume(QUEUE_NAME1, true, consumer);
Thread.sleep(100000);
}
}
控制台输出情况如下:
shell
// 生产者
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息0
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息1
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息2
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息3
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息4
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息5
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息6
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息7
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息8
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息9
shell
// 消费者1
direct.DirectTest2 - Received in consumer1:key1========发送RabbitMQ消息0
direct.DirectTest2 - Received in consumer1:key1========发送RabbitMQ消息6
shell
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息3
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息9
队列相同、绑定键不同
队列相同、绑定键不同时,轮训消费各自绑定键的消息的消息。
java
@Slf4j
public class DirectTest3 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_exchange2";
// 消息发送次数
private static final int SEND_NUM = 10;
@Test
public void send() throws IOException, TimeoutException {
// 获取信道,设置成直连交换机模式,并设置路由键在keys数组中随机选中
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
String[] keys = new String[]{"key1", "key2", "key3"};
// 发送消息
for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "发送RabbitMQ消息" + i;
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
log.info("sendMessage:{}==={}", key, message);
}
channel.close();
connection.close();
}
}
java
@Slf4j
public class DirectTest3 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_exchange2";
// 队列名称1
private static final String QUEUE_NAME1 = "direct_queue3";
@Test
public void consumer1() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
// 交换器和队列绑定
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "key1");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer1:{}========{}", envelope.getRoutingKey(), message);
}
};
channel.basicConsume(QUEUE_NAME1, true, consumer);
Thread.sleep(100000);
}
}
java
@Slf4j
public class DirectTest3 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_exchange2";
// 队列名称2
private static final String QUEUE_NAME2 = "direct_queue3";
@Test
public void consumer2() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
// 交换器和队列绑定
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "key2");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer2:{}========{}", envelope.getRoutingKey(), message);
}
};
channel.basicConsume(QUEUE_NAME2, true, consumer);
Thread.sleep(100000);
}
}
控制台输出情况如下:
shell
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息0
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息1
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息2
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息3
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息4
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息5
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息6
direct.DirectTest2 - sendMessage:key2===发送RabbitMQ消息7
direct.DirectTest2 - sendMessage:key3===发送RabbitMQ消息8
direct.DirectTest2 - sendMessage:key1===发送RabbitMQ消息9
shell
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息0
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息3
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息6
direct.DirectTest2 - Received in consumer2:key1========发送RabbitMQ消息9
shell
direct.DirectTest2 - Received in consumer1:key2========发送RabbitMQ消息1
direct.DirectTest2 - Received in consumer1:key2========发送RabbitMQ消息4
direct.DirectTest2 - Received in consumer1:key2========发送RabbitMQ消息7
扇形交换器(广播)
扇形交换器:将消息广播到所有绑定到该交换机的队列。
java
@Slf4j
public class FanoutTest {
// 交换器名称
private static final String EXCHANGE_NAME = "fanout_exchange";
// 消息发送次数
private static final int SEND_NUM = 10;
@Test
public void send() throws IOException, TimeoutException {
// 获取信道,设置成扇形交换机模式
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
// 发送消息
for (int i = 0; i < SEND_NUM; i++) {
String message = "发送RabbitMQ消息" + i;
// 消息进行发送
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
log.info("sendMessage:{}", message);
}
channel.close();
connection.close();
}
}
java
@Slf4j
public class FanoutTest {
// 交换器名称
private static final String EXCHANGE_NAME = "fanout_exchange";
// 队列名称1
private static final String QUEUE_NAME1 = "fanout_queue1";
@Test
public void consumer1() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
// 交换器和队列绑定
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer1:{}", message);
}
};
channel.basicConsume(QUEUE_NAME1, true, consumer);
Thread.sleep(100000);
}
}
java
@Slf4j
public class FanoutTest {
// 交换器名称
private static final String EXCHANGE_NAME = "fanout_exchange";
// 队列名称2
private static final String QUEUE_NAME2 = "fanout_queue2";
@Test
public void consumer2() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
// 交换器和队列绑定
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer2:{}", message);
}
};
channel.basicConsume(QUEUE_NAME2, true, consumer);
Thread.sleep(100000);
}
}
控制台输出如下:
markdown
fanout.FanoutTest - sendMessage:发送RabbitMQ消息0
fanout.FanoutTest - sendMessage:发送RabbitMQ消息1
fanout.FanoutTest - sendMessage:发送RabbitMQ消息2
fanout.FanoutTest - sendMessage:发送RabbitMQ消息3
fanout.FanoutTest - sendMessage:发送RabbitMQ消息4
fanout.FanoutTest - sendMessage:发送RabbitMQ消息5
fanout.FanoutTest - sendMessage:发送RabbitMQ消息6
fanout.FanoutTest - sendMessage:发送RabbitMQ消息7
fanout.FanoutTest - sendMessage:发送RabbitMQ消息8
fanout.FanoutTest - sendMessage:发送RabbitMQ消息9
markdown
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息0
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息1
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息2
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息3
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息4
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息5
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息6
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息7
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息8
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息9
markdown
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息0
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息1
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息2
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息3
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息4
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息5
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息6
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息7
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息8
fanout.FanoutTest - Received in consumer1:发送RabbitMQ消息9
主题交换机
主题交换机:可以配置复杂条件的以.区分单词,*模糊匹配单个,#模糊匹配多个单词
- RabbitMQ的主题以
.
来区分单词; - *:模糊匹配单个单词
- #:模糊匹配多个单词
java
@Slf4j
public class TopicTest {
// 交换器名称
private static final String EXCHANGE_NAME = "topic_exchange";
// 消息发送次数
private static final int SEND_NUM = 10;
@Test
public void send() throws IOException, TimeoutException {
// 获取信道,设置成主题交换机模式
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
// 发送消息
String[] routingKeys = {"send.mail.num", "send.mail.rate", "send.sms.num", "send.sms.success.num"};
for (int i = 0; i < SEND_NUM; i++) {
String routingKey = routingKeys[i % routingKeys.length];
String message = "发送RabbitMQ消息" + i;
// 消息进行发送
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
log.info("sendMessage:{}==={}", routingKey, message);
}
channel.close();
connection.close();
}
}
java
@Slf4j
public class TopicTest {
// 交换器名称
private static final String EXCHANGE_NAME = "topic_exchange";
// 队列名称1
private static final String QUEUE_NAME1 = "topic_queue1";
@Test
public void consumer1() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
// 交换器和队列绑定,使用绑定键 "quick.*.*"
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "send.#");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer1:{}========{}", envelope.getRoutingKey(), message);
}
};
// 消费者在指定的对队列上消费
channel.basicConsume(QUEUE_NAME1, true, consumer);
// 保持主线程不退出
Thread.sleep(100000);
}
}
java
@Slf4j
public class TopicTest {
// 交换器名称
private static final String EXCHANGE_NAME = "topic_exchange";
// 队列名称2
private static final String QUEUE_NAME2 = "topic_queue2";
@Test
public void consumer2() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
// 交换器和队列绑定,使用绑定键 "*.orange.*"
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "*.sms.*");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer2:{}========{}", envelope.getRoutingKey(), message);
}
};
// 消费者在指定的对队列上消费
channel.basicConsume(QUEUE_NAME2, true, consumer);
// 保持主线程不退出
Thread.sleep(100000);
}
}
java
@Slf4j
public class TopicTest {
// 交换器名称
private static final String EXCHANGE_NAME = "topic_exchange";
// 队列名称3
private static final String QUEUE_NAME3 = "topic_queue3";
@Test
public void consumer3() throws IOException, InterruptedException {
Connection connection = MQConstant.buildConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME3, true, false, false, null);
// 交换器和队列绑定,使用绑定键 "lazy.#"
channel.queueBind(QUEUE_NAME3, EXCHANGE_NAME, "#.num");
log.info("等待 message.....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("Received in consumer3:{}========{}", envelope.getRoutingKey(), message);
}
};
// 消费者在指定的对队列上消费
channel.basicConsume(QUEUE_NAME3, true, consumer);
// 保持主线程不退出
Thread.sleep(100000);
}
}
控制台输出如下所示:
shell
topic.TopicTest - sendMessage:send.mail.num===发送RabbitMQ消息0
topic.TopicTest - sendMessage:send.mail.rate===发送RabbitMQ消息1
topic.TopicTest - sendMessage:send.sms.num===发送RabbitMQ消息2
topic.TopicTest - sendMessage:send.sms.success.num===发送RabbitMQ消息3
topic.TopicTest - sendMessage:send.mail.num===发送RabbitMQ消息4
topic.TopicTest - sendMessage:send.mail.rate===发送RabbitMQ消息5
topic.TopicTest - sendMessage:send.sms.num===发送RabbitMQ消息6
topic.TopicTest - sendMessage:send.sms.success.num===发送RabbitMQ消息7
topic.TopicTest - sendMessage:send.mail.num===发送RabbitMQ消息8
topic.TopicTest - sendMessage:send.mail.rate===发送RabbitMQ消息9
shell
topic.TopicTest - Received in consumer1:send.mail.num========发送RabbitMQ消息0
topic.TopicTest - Received in consumer1:send.mail.rate========发送RabbitMQ消息1
topic.TopicTest - Received in consumer1:send.sms.num========发送RabbitMQ消息2
topic.TopicTest - Received in consumer1:send.sms.success.num========发送RabbitMQ消息3
topic.TopicTest - Received in consumer1:send.mail.num========发送RabbitMQ消息4
topic.TopicTest - Received in consumer1:send.mail.rate========发送RabbitMQ消息5
topic.TopicTest - Received in consumer1:send.sms.num========发送RabbitMQ消息6
topic.TopicTest - Received in consumer1:send.sms.success.num========发送RabbitMQ消息7
topic.TopicTest - Received in consumer1:send.mail.num========发送RabbitMQ消息8
topic.TopicTest - Received in consumer1:send.mail.rate========发送RabbitMQ消息9
shell
topic.TopicTest - Received in consumer2:send.sms.num========发送RabbitMQ消息2
topic.TopicTest - Received in consumer2:send.sms.num========发送RabbitMQ消息6
shell
topic.TopicTest - Received in consumer3:send.mail.num========发送RabbitMQ消息0
topic.TopicTest - Received in consumer3:send.sms.num========发送RabbitMQ消息2
topic.TopicTest - Received in consumer3:send.sms.success.num========发送RabbitMQ消息3
topic.TopicTest - Received in consumer3:send.mail.num========发送RabbitMQ消息4
topic.TopicTest - Received in consumer3:send.sms.num========发送RabbitMQ消息6
topic.TopicTest - Received in consumer3:send.sms.success.num========发送RabbitMQ消息7
topic.TopicTest - Received in consumer3:send.mail.num========发送RabbitMQ消息8
头交换机
头交换机可以传递非字符串内容,更为灵活(实际使用少)