Skip to content

RabbitMQ安装参考使用Docker安装RabbitMQ,而RabbitMQ的使用参考RabbitMQ极速入门篇就足够使用了。

动态配置RabbitMQ连接信息

首先是Config.yml配置文件新增RabbitMQ配置。

yml
xim:
  tcpPort: 9000
  webSocketPort: 19000
  bossThreadSize: 1
  workThreadSize: 4
  heartBeatTime: 30000 # 心跳超时时间,单位毫秒
  redis:
    mode: single # 单机模式:single 哨兵模式:Sentinel 集群模式:cluster
    database: 5
    password:
    timeout: 3000 # 超时时间
    poolMinIdle: 8 #最小空闲数
    poolConnTimeout: 3000 # 连接超时时间(毫秒)
    poolSize: 10 # 连接池大小
    single: #redis单机配置
      address: 127.0.0.1:6379
  rabbitmq:
    host: 192.168.31.76
    port: 5672
    virtualHost: /
    userName: 你的账号
    password: 你的密码

BootstrapConfig文件新增RabbitMQ配置文件读取

java
@Data
public class BootstrapConfig {

    private TcpConfig xim;

    @Data
    public static class TcpConfig {
        // ……
        private RedisConfig redis;
        private Rabbitmq rabbitmq;
    }
    
    // ……
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Rabbitmq {
        private String host;
        private Integer port;
        private String virtualHost;
        private String userName;
        private String password;
    }
}

统一管理RabbitMQ连接信息

java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.xk857.im.codec.config.BootstrapConfig;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

/**
 * @author cv大魔王
 * @description MQ工厂工具类,统一管理RabbitMQ连接  
 */
public class MqFactory {
    
    private static ConnectionFactory factory = null;
    private static Channel defaultChannel;
    private static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
    
    private static Connection getConnection() throws IOException, TimeoutException {
        return factory.newConnection();
    }

    /**
     * 获取和RabbitMQ通信的Channel,根据Channel名称获取,如果找不到则新建
     */
    public static Channel getChannel(String channelName) throws IOException, TimeoutException {
        Channel channel = channelMap.get(channelName);
        if (channel == null) {
            channel = getConnection().createChannel();
            channelMap.put(channelName, channel);
        }
        return channel;
    }

    /**
     * 初始化RabbitMQ配置,填入地址、端口等信息,从yml文件读取
     */
    public static void init(BootstrapConfig.Rabbitmq rabbitmq) {
        if (factory == null) {
            factory = new ConnectionFactory();
            factory.setHost(rabbitmq.getHost());
            factory.setPort(rabbitmq.getPort());
            factory.setUsername(rabbitmq.getUserName());
            factory.setPassword(rabbitmq.getPassword());
            factory.setVirtualHost(rabbitmq.getVirtualHost());
        }
    }
}

为什么要根据名称获取?

一个Channel只负责一个内容,否则会造成紊乱,比如ChannelA只负责生产发送到q1队列的消息,ChannelB只负责消费x1交换机中q2队列的消息

在Start中初调用init()方法初始化RabbitMQ工厂信息

java
public class Starter {
     // ……
    private static void start(String path) {
        try {
            // ……
            RedisManager.init(bootstrapConfig);
            MqFactory.init(bootstrapConfig.getXim().getRabbitmq()); // 初始化RabbitMQ连接工厂
        } catch (Exception e) {
             // ……
        }
    }
}

消费来自消息服务发送的消息

消息服务发送的消息先通过RabbitMQ发送到指定交换机,将来要主动推送给用户,消息服务的交换机是相同的,但是会根据不同的Netty服务器推送到不同队列上,也就是一个队列仅对应一个Netty服务,本质上是通过brokerId区分。

java
@Slf4j
public class MessageReceiver {

    private static String brokerId;

    private static void startReceiverMessage() {
        try {
            Channel channel = MqFactory.getChannel(Constants.RabbitConstants.MessageService2Im + brokerId);
            // 声明队列,加上brokerId代表只消费自己的Netty服务器消息
            channel.queueDeclare(Constants.RabbitConstants.MessageService2Im + brokerId, true, false, false, null);
            // 将队列绑定到指定的交换机上,参数是:队列名称 交换机名称 路由键
            channel.queueBind(Constants.RabbitConstants.MessageService2Im + brokerId, Constants.RabbitConstants.MessageService2Im, brokerId);
            // 消费队列中的消息
            channel.basicConsume(Constants.RabbitConstants.MessageService2Im + brokerId, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // TODO 处理消息服务发来的消息
                    String msgStr = new String(body);
                    log.info("消息服务发来的消息:{}", msgStr);
                }
            });
        } catch (Exception e) {
            log.error("接收消息服务Consumer出现异常:{}", e.getMessage());
            throw new RuntimeException(e);
        }
    }

    public static void init() {
        startReceiverMessage();
    }

    public static void init(String brokerId) {
        if (StrUtil.isBlank(MessageReceiver.brokerId)) {
            MessageReceiver.brokerId = brokerId;
        }
        startReceiverMessage();
    }
}

MQ消息推送

在推送时调用这个类即可,推送的队列名称暂时为空,后续会详细设计

java
@Slf4j
public class MqMessageProducer {

    public static void sendMessage(Object message) {
        Channel channel = null;
        String channelName = "";
        try {
            channel = MqFactory.getChannel(channelName);
            channel.basicPublish(channelName, "", null, JSONUtil.toJsonStr(message).getBytes());
        } catch (IOException | TimeoutException e) {
            log.error("发送消息出现异常:{}", e.getMessage());
            throw new RuntimeException(e);
        }
    }
}