Skip to content

私有协议设计

私有协议由请求头和请求体组成,其中请求体由7个部分组成,均使用int类型表述,int类型占4字节,故请求头共占8字节。

自研IM协议设置.drawio

  • 指令:本次请求要干什么,比如发消息、好友上线通知等,类似HTTP请求的GET、POST请求。
  • 版本:协议版本,可根据不同版本,响应不同的Handler处理业务。
  • 客户端类型:PC(win、mac)、web、移动端
  • 消息解析类型:比如JSON、protobuf等
  • imei长度:由于imei使用字符串表述,且属于敏感信息,因此不便放于请求头,imei用于标识不同客户端,比如知道用户在A设备或B设备登录
  • appId:本IM系统为中台业务对外提供服务,因此使用appId标识不同服务厂商
  • 消息体长度:本次发送的消息长度,用于解决TCP粘包与半包问题

消息实体类设计

首先是消息头,由指令、版本、客户端类型、消息解析类型、IMEI、appId和消息体长度构成,其中IMEI通过IMEI长度可直接从消息体获得,信息直接将IMEI字符串设置到Header对象中,不用存储长度。

java
@Data
public class MessageHeader {
    // 指令
    private Integer command;

    // 版本
    private Integer version;

    // 客户端类型 PC、web等
    private Integer clientType;

    // 消息解析类型
    private Integer messageType = 0x0;

    // appId
    private Integer appId;

    // imei长度
    private Integer imeiLength;

    // 消息体长度
    private int bodyLen;

    // IMEI
    private String imei;
}

在私有协议中我们设计消息由消息头+消息体组成,因此消息对象中包含两个对象;

java
@Data
public class Message {

    private MessageHeader messageHeader;

    private Object messagePack;

    @Override
    public String toString() {
        return "Message{ messageHeader=" + messageHeader + ", messagePack=" + messagePack +  '}';
    }
}

解码器实现

解码器实现步骤

  1. 如果本次消息长度小于28字节,直接返回不处理。
  2. 将“请求头”的28个字节解析出来封装到header对象,获取指令、版本、客户端类型、消息解析类型、imei长度、appId和消息体长度。
  3. 如果剩余长度不足imei长度+消息体长度,则直接返回(粘包半包问题导致消息本次发送不完全)。
  4. 根据imei长度获取到imei,根据body长度获取到消息体。
  5. 将消息体和消息体封装到Message对象,并返回到流中。
java
package com.xk857.im.codec;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.xk857.im.codec.proto.Message;
import com.xk857.im.codec.proto.MessageHeader;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * @author cv大魔王
 * @version 1.0
 * @description 私有协议解码器
 * @date 2023/8/20
 */
public class MessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 28) {
            return;
        }

        // 获取请求头数据,顺序不能乱
        MessageHeader header = new MessageHeader();
        header.setCommand(in.readInt());
        header.setVersion(in.readInt());
        header.setClientType(in.readInt());
        header.setMessageType(in.readInt());
        header.setAppId(in.readInt());
        header.setImeiLength(in.readInt());
        header.setBodyLen(in.readInt());

        // 如果剩余长度不足IMEI长度+消息体长度则直接返回
        if (in.readableBytes() < header.getBodyLen() + header.getImeiLength()) {
            in.resetReaderIndex();
            return;
        }

        // 获取IMEI
        byte[] imeiData = new byte[header.getImeiLength()];
        in.readBytes(imeiData);
        header.setImei(new String(imeiData));

        // 获取消息体
        byte[] bodyData = new byte[header.getBodyLen()];
        in.readBytes(bodyData);
        String body = new String(bodyData);

        Message message = new Message();
        message.setMessageHeader(header);

        if (header.getMessageType() == 0x0) {
            // 如果是0X0则解析成JSON格式
            JSONObject jsonObject = JSONUtil.parseObj(body);
            message.setMessagePack(jsonObject);
        }

        in.markReaderIndex();
        out.add(message);
    }
}

编码器实现

Netty服务器内的编码器,不是将信息编译成上文设计的私有协议那样,私有协议解析的是客户端向Netty服务发送的数据;而此编码器的功能是对Netty服务器之间的通信消息进行编码,MessagePack对象是Netty服务器内部之间通信的对象。

java
package com.xk857.im.codec.proto;
/**
 * @author cv大魔王
 * @description 消息服务发送给tcp的包体, tepA 根据改包体解析成Message2 发给客户端
 * @date 2023/8/20
 */
@Data
public class MessagePack<T> implements Serializable {

    private String userId;
    private Integer appId;
    /** 接收方 */
    private String toId;
    private int clientType;
    /** 消息ID */
    private String messageId;
    private String imei;
    private Integer command;

    /** 业务数据对象,如果是聊天消息则不需要解析直接透传 */
    private T data;
}

MessageEncoder编码器的功能是对Netty服务器之间通信的数据进行编码,所以协议设计按最简单的来,数据长度+数据内容构成。

java
package com.xk857.im.codec;

import cn.hutool.json.JSONUtil;
import com.xk857.im.codec.proto.MessagePack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 消息编码类,私有协议规则,前4位表示长度,接着command4位,后面是数据
 */
public class MessageEncoder extends MessageToByteEncoder {

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if(msg instanceof MessagePack){
            MessagePack msgBody = (MessagePack) msg;
            String s = JSONUtil.toJsonStr(msgBody.getData());
            byte[] bytes = s.getBytes();
            out.writeInt(msgBody.getCommand());
            out.writeInt(bytes.length);
            out.writeBytes(bytes);
        }
    }
}

启动类添加Handler

java
public class XimServer {
    
    // ……
    public XimServer(BootstrapConfig.TcpConfig config) {
        server = new ServerBootstrap();
        // ……
        server.group(bossGroup, workerGroup)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(new MessageDecoder());
                    sc.pipeline().addLast(new MessageEncoder());
                    // ……
                }
            });
    }

    public void start() {
        this.server.bind(this.config.getTcpPort());
    }
}