Appearance
私有协议设计
私有协议由请求头和请求体组成,其中请求体由7个部分组成,均使用int类型表述,int类型占4字节,故请求头共占8字节。
- 指令:本次请求要干什么,比如发消息、好友上线通知等,类似HTTP请求的GET、POST请求。
- 版本:协议版本,可根据不同版本,响应不同的Handler处理业务。
- 客户端类型:PC(win、mac)、web、移动端
- 消息解析类型:比如JSON、protobuf等
- imei长度:由于imei使用字符串表述,且属于敏感信息,因此不便放于请求头,imei用于标识不同客户端,比如知道用户在A设备或B设备登录
- appId:本IM系统为中台业务对外提供服务,因此使用appId标识不同服务厂商
- 消息体长度:本次发送的消息长度,用于解决TCP粘包与半包问题
消息实体类设计
首先是消息头,由指令、版本、客户端类型、消息解析类型、IMEI、appId和消息体长度构成,其中IMEI通过IMEI长度可直接从消息体获得,信息直接将IMEI字符串设置到Header对象中,不用存储长度。
java
@Data
public class MessageHeader {
// 指令
private Integer command;
// 版本
private Integer version;
// 客户端类型 PC、web等
private Integer clientType;
// 消息解析类型
private Integer messageType = 0x0;
// appId
private Integer appId;
// imei长度
private Integer imeiLength;
// 消息体长度
private int bodyLen;
// IMEI
private String imei;
}
在私有协议中我们设计消息由消息头+消息体组成,因此消息对象中包含两个对象;
java
@Data
public class Message {
private MessageHeader messageHeader;
private Object messagePack;
@Override
public String toString() {
return "Message{ messageHeader=" + messageHeader + ", messagePack=" + messagePack + '}';
}
}
解码器实现
解码器实现步骤
- 如果本次消息长度小于28字节,直接返回不处理。
- 将“请求头”的28个字节解析出来封装到header对象,获取指令、版本、客户端类型、消息解析类型、imei长度、appId和消息体长度。
- 如果剩余长度不足
imei长度+消息体长度
,则直接返回(粘包半包问题导致消息本次发送不完全)。 - 根据imei长度获取到imei,根据body长度获取到消息体。
- 将消息体和消息体封装到Message对象,并返回到流中。
java
package com.xk857.im.codec;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.xk857.im.codec.proto.Message;
import com.xk857.im.codec.proto.MessageHeader;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* @author cv大魔王
* @version 1.0
* @description 私有协议解码器
* @date 2023/8/20
*/
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 28) {
return;
}
// 获取请求头数据,顺序不能乱
MessageHeader header = new MessageHeader();
header.setCommand(in.readInt());
header.setVersion(in.readInt());
header.setClientType(in.readInt());
header.setMessageType(in.readInt());
header.setAppId(in.readInt());
header.setImeiLength(in.readInt());
header.setBodyLen(in.readInt());
// 如果剩余长度不足IMEI长度+消息体长度则直接返回
if (in.readableBytes() < header.getBodyLen() + header.getImeiLength()) {
in.resetReaderIndex();
return;
}
// 获取IMEI
byte[] imeiData = new byte[header.getImeiLength()];
in.readBytes(imeiData);
header.setImei(new String(imeiData));
// 获取消息体
byte[] bodyData = new byte[header.getBodyLen()];
in.readBytes(bodyData);
String body = new String(bodyData);
Message message = new Message();
message.setMessageHeader(header);
if (header.getMessageType() == 0x0) {
// 如果是0X0则解析成JSON格式
JSONObject jsonObject = JSONUtil.parseObj(body);
message.setMessagePack(jsonObject);
}
in.markReaderIndex();
out.add(message);
}
}
编码器实现
Netty服务器内的编码器,不是将信息编译成上文设计的私有协议那样,私有协议解析的是客户端向Netty服务发送的数据;而此编码器的功能是对Netty服务器之间的通信消息进行编码,MessagePack对象是Netty服务器内部之间通信的对象。
java
package com.xk857.im.codec.proto;
/**
* @author cv大魔王
* @description 消息服务发送给tcp的包体, tepA 根据改包体解析成Message2 发给客户端
* @date 2023/8/20
*/
@Data
public class MessagePack<T> implements Serializable {
private String userId;
private Integer appId;
/** 接收方 */
private String toId;
private int clientType;
/** 消息ID */
private String messageId;
private String imei;
private Integer command;
/** 业务数据对象,如果是聊天消息则不需要解析直接透传 */
private T data;
}
MessageEncoder编码器的功能是对Netty服务器之间通信的数据进行编码,所以协议设计按最简单的来,数据长度+数据内容构成。
java
package com.xk857.im.codec;
import cn.hutool.json.JSONUtil;
import com.xk857.im.codec.proto.MessagePack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* 消息编码类,私有协议规则,前4位表示长度,接着command4位,后面是数据
*/
public class MessageEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if(msg instanceof MessagePack){
MessagePack msgBody = (MessagePack) msg;
String s = JSONUtil.toJsonStr(msgBody.getData());
byte[] bytes = s.getBytes();
out.writeInt(msgBody.getCommand());
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
}
启动类添加Handler
java
public class XimServer {
// ……
public XimServer(BootstrapConfig.TcpConfig config) {
server = new ServerBootstrap();
// ……
server.group(bossGroup, workerGroup)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new MessageDecoder());
sc.pipeline().addLast(new MessageEncoder());
// ……
}
});
}
public void start() {
this.server.bind(this.config.getTcpPort());
}
}