Appearance
RabbitMQ安装参考使用Docker安装RabbitMQ,而RabbitMQ的使用参考RabbitMQ极速入门篇就足够使用了。
动态配置RabbitMQ连接信息
首先是Config.yml
配置文件新增RabbitMQ配置。
yml
xim:
tcpPort: 9000
webSocketPort: 19000
bossThreadSize: 1
workThreadSize: 4
heartBeatTime: 30000 # 心跳超时时间,单位毫秒
redis:
mode: single # 单机模式:single 哨兵模式:Sentinel 集群模式:cluster
database: 5
password:
timeout: 3000 # 超时时间
poolMinIdle: 8 #最小空闲数
poolConnTimeout: 3000 # 连接超时时间(毫秒)
poolSize: 10 # 连接池大小
single: #redis单机配置
address: 127.0.0.1:6379
rabbitmq:
host: 192.168.31.76
port: 5672
virtualHost: /
userName: 你的账号
password: 你的密码
BootstrapConfig
文件新增RabbitMQ配置文件读取
java
@Data
public class BootstrapConfig {
private TcpConfig xim;
@Data
public static class TcpConfig {
// ……
private RedisConfig redis;
private Rabbitmq rabbitmq;
}
// ……
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class Rabbitmq {
private String host;
private Integer port;
private String virtualHost;
private String userName;
private String password;
}
}
统一管理RabbitMQ连接信息
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.xk857.im.codec.config.BootstrapConfig;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
/**
* @author cv大魔王
* @description MQ工厂工具类,统一管理RabbitMQ连接
*/
public class MqFactory {
private static ConnectionFactory factory = null;
private static Channel defaultChannel;
private static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
private static Connection getConnection() throws IOException, TimeoutException {
return factory.newConnection();
}
/**
* 获取和RabbitMQ通信的Channel,根据Channel名称获取,如果找不到则新建
*/
public static Channel getChannel(String channelName) throws IOException, TimeoutException {
Channel channel = channelMap.get(channelName);
if (channel == null) {
channel = getConnection().createChannel();
channelMap.put(channelName, channel);
}
return channel;
}
/**
* 初始化RabbitMQ配置,填入地址、端口等信息,从yml文件读取
*/
public static void init(BootstrapConfig.Rabbitmq rabbitmq) {
if (factory == null) {
factory = new ConnectionFactory();
factory.setHost(rabbitmq.getHost());
factory.setPort(rabbitmq.getPort());
factory.setUsername(rabbitmq.getUserName());
factory.setPassword(rabbitmq.getPassword());
factory.setVirtualHost(rabbitmq.getVirtualHost());
}
}
}
为什么要根据名称获取?
一个Channel只负责一个内容,否则会造成紊乱,比如ChannelA只负责生产发送到q1队列的消息,ChannelB只负责消费x1交换机中q2队列的消息
在Start中初调用init()方法初始化RabbitMQ工厂信息
java
public class Starter {
// ……
private static void start(String path) {
try {
// ……
RedisManager.init(bootstrapConfig);
MqFactory.init(bootstrapConfig.getXim().getRabbitmq()); // 初始化RabbitMQ连接工厂
} catch (Exception e) {
// ……
}
}
}
消费来自消息服务发送的消息
消息服务发送的消息先通过RabbitMQ发送到指定交换机,将来要主动推送给用户,消息服务的交换机是相同的,但是会根据不同的Netty服务器推送到不同队列上,也就是一个队列仅对应一个Netty服务,本质上是通过brokerId区分。
java
@Slf4j
public class MessageReceiver {
private static String brokerId;
private static void startReceiverMessage() {
try {
Channel channel = MqFactory.getChannel(Constants.RabbitConstants.MessageService2Im + brokerId);
// 声明队列,加上brokerId代表只消费自己的Netty服务器消息
channel.queueDeclare(Constants.RabbitConstants.MessageService2Im + brokerId, true, false, false, null);
// 将队列绑定到指定的交换机上,参数是:队列名称 交换机名称 路由键
channel.queueBind(Constants.RabbitConstants.MessageService2Im + brokerId, Constants.RabbitConstants.MessageService2Im, brokerId);
// 消费队列中的消息
channel.basicConsume(Constants.RabbitConstants.MessageService2Im + brokerId, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// TODO 处理消息服务发来的消息
String msgStr = new String(body);
log.info("消息服务发来的消息:{}", msgStr);
}
});
} catch (Exception e) {
log.error("接收消息服务Consumer出现异常:{}", e.getMessage());
throw new RuntimeException(e);
}
}
public static void init() {
startReceiverMessage();
}
public static void init(String brokerId) {
if (StrUtil.isBlank(MessageReceiver.brokerId)) {
MessageReceiver.brokerId = brokerId;
}
startReceiverMessage();
}
}
MQ消息推送
在推送时调用这个类即可,推送的队列名称暂时为空,后续会详细设计
java
@Slf4j
public class MqMessageProducer {
public static void sendMessage(Object message) {
Channel channel = null;
String channelName = "";
try {
channel = MqFactory.getChannel(channelName);
channel.basicPublish(channelName, "", null, JSONUtil.toJsonStr(message).getBytes());
} catch (IOException | TimeoutException e) {
log.error("发送消息出现异常:{}", e.getMessage());
throw new RuntimeException(e);
}
}
}