rocketmq 通信模块解析
通信协议对我们来说一直比较什么神秘,因为偏底层。而我们在平时的编程环境中,都是基于前人给我们搭建好的软件框架来面向顶层编程,因为越是顶层可读性越高,但是背后做的工作其实非常多,比如JVM。我们平时写的对象如果要经过网络传输,就只能先转换成字节流(序列化),然后再接收到之后再通过字节流转换成对象(反序列化)。计算机只能识别二进制,而二进制对我们来说可读性又比较差, 如果让我们基于二进制编程去写业务代码,那我相信写的人不会疯掉,后面维护的人会崩溃。
言归正传,从源码结构目录看,rocket-remoting正是通信模块,那我们其内部单元测试开始(ps:这里看到,一个优秀的项目不仅架构设计合理,最基础的单元测试也非常详细,细致到一个特别简单的封装方法。所以我们在做项目过程中不管公司是否有这方面的要求,也建议做好详尽的单元测试,也有助于培养良好的架构设计的一个思路)。
@Test
public void testInvokeSync() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
RequestHeader requestHeader = new RequestHeader();
requestHeader.setCount(1);
requestHeader.setMessageTitle("Welcome");
RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
request.setBody("hello mq".getBytes());
System.out.println(request.toString());
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 300);
System.out.println(response.toString());
assertTrue(response != null);
assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
assertThat(response.getExtFields()).hasSize(2);
}
执行此单元测试前,会执行@BeforeClass的方法,主要是启动server(比如nameserver),以及创建client实例。
先看下主要类图的继承关系
RemotingService声明接口,里面包含start()启动方法,shutdown()销毁方法,registerRPCHook()注册钩子函数方法
RemotingClient则声明了对于客户端来说,常用的一些方法,比如invokeSync()同步调用,invokeAsync()异步调用,以及getNameServerAddressList()获取nameserver的方法等
RemotingServer同client相似,此外还有的registerDefaultProcessor,注册默认处理器,用于分发请求
NettyRemotingAbstract则基于netty封装了一些公用的处理方法逻辑
NettyRemotingClient和NettyRemotingServer则是基于netty那套绑定eventGroup ,监听端口的start()启动方法以及销毁方法
RemotingCommand对象是通信自定义协议的对象,一切通信的交互全部封装成该对象,包括编解码操作。其主要核心成员字段:
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();//相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应
private int flag = 0;//区分是普通RPC还是onewayRPC的标志
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
Header字段 | 类型 | Request说明 | Response说明 |
---|---|---|---|
code | int | 请求操作码,应答方根据不同的请求码进行不同的业务处理 | 应答响应码。0表示成功,非0则表示各种错误 |
language | LanguageCode | 请求方实现的语言 | 应答方实现的语言 |
version | int | 请求方程序的版本 | 应答方程序的版本 |
opaque | int | 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 | 应答不做修改直接返回 |
flag | int | 区分是普通RPC还是onewayRPC的标志 | 区分是普通RPC还是onewayRPC的标志 |
remark | String | 传输自定义文本信息 | 传输自定义文本信息 |
extFields | HashMap<String, String> | 请求自定义扩展信息 | 响应自定义扩展信息 |
成员对象CommandCustomHeader,用transient修饰,即该对象不直接参与序列化。body[]字节数据也同样如此。
看下CommandCustomHeader的关系继承图谱:
可以看出,针对所有mq的所有命令操作都会封装成对应的Header对象去放入到RemotingCommand中,譬如创建topic的命令
public class CreateTopicRequestHeader implements CommandCustomHeader {
@CFNotNull
private String topic;
@CFNotNull
private String defaultTopic;
@CFNotNull
private Integer readQueueNums;
@CFNotNull
private Integer writeQueueNums;
@CFNotNull
private Integer perm;
@CFNotNull
private String topicFilterType;
private Integer topicSysFlag;
@CFNotNull
private Boolean order = false;
封装好RemotingCommand对象之后,在调用服务端的ip以及端口触发发送操作,发送之前呢,则会触发Netty pipeline 的编码操作,
NettyEncoder
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
// System.out.println("即将对消息进行编码:"+remotingCommand.toString());
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
可以看到编码操作主要分为了2步,一个是写入header部分,一个写入body部分数据。
先来看看其自定义网络协议是由哪几部分组成的:
第一部分相当于是一个总长度,也就是int类型,占4个字节,
第二部分第一个字节是header的一个序列化类型,其余三个字节存储header的长度,总共4个字节
第三部分则存储序列化后的header对象
第四部分存储自定义body的内容
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size,总长度占4个字节
int length = 4;
// 2> header data length
byte[] headerData;
//headerEncode则是将CommandCustomHeader对象数据通过反射方式将字段名称以及值放在extFieldsmap中,然后对整个对象进行json序列化,
//譬如{"code":0,"extFields":{"count":"1","messageTitle":"Welcome"},"flag":0,"language":"JAVA","opaque":0,"serializeTypeCurrentRPC":"JSON","version":0}转bytes数组
headerData = this.headerEncode();
//总长度加上headerData的长度
length += headerData.length;
// 3> body data length,加上自定义body的长度
length += bodyLength;
//分配(4+length-bodyLength)也就是(4+4+headerDataLength)长度的ByteBuffer
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
//前4个字节写入总长度
// length
result.putInt(length);
//第二个4字节写入header的长度
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
//写入headerData的内容
// header data
result.put(headerData);
result.flip();
return result;
}
最后再写入body数据。客户端编码完成。
接下来看解码操作:
NettyDecoder 继承于LengthFieldBasedFrameDecoder,LengthFieldBasedFrameDecoder是netty中自带的基于自定义长度的解码器,其构造函数
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
查看父类构造函数定义得知:
maxFrameLength – the maximum length of the frame. If the length of the frame is greater than this value, TooLongFrameException will be thrown.
lengthFieldOffset – the offset of the length field
lengthFieldLength – the length of the length field
lengthAdjustment – the compensation value to add to the value of the length field
initialBytesToStrip – the number of first bytes to strip out from the decoded frame,
第一个参数表示数据包的最大长度,如果实际长度超过此长度则报异常
第二个参数标识长度域的偏移量,此处为0代表无偏移,因为我们第一个字节就是传入的长度域,因此无偏移。
第三个参数表示长度域的长度大小,我们用的int来标识,所以是占4个字节,因此此处传入4
第四个参数表示要添加到长度字段值的补偿调整值,此处不需要调整,因此传入0
第五个参数表示跳过前4个长度域的字节,应用解码器就拿不到总长度的这个数据
NettyDecode的解码方法
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
//经过父类解码器操作,实际拿到的即为去除长度域后的数据包
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
//对数据包进行解码,还原成对象的操作
RemotingCommand remotingCommand = RemotingCommand.decode(byteBuffer);
//System.out.println("对消息进行解码:"+remotingCommand.toString());
return remotingCommand;
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();//获取字节缓冲区总长度,也就是网络协议中第2,3,4部分的总长
int oriHeaderLen = byteBuffer.getInt();//取出第一个4字节的长度,也就是header总长度
int headerLength = getHeaderLength(oriHeaderLen);//取int后24位为header实际长度,前8位为序列化的类型
byte[] headerData = new byte[headerLength];
//根据headerData实际长度,读取出这些字节来
byteBuffer.get(headerData);
//根据前8位序列化类型,还原成remotingCommand对象
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;//取出body的总长度
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
//还原body
cmd.body = bodyData;
return cmd;
}
接下来继续看请求的流转:
在经过decode之后,进入到server handler的方法:
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
//处理所有请求
processMessageReceived(ctx, msg);
}
}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
//处理请求方法
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
//处理响应方法
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
processRequestCommand的业务代码比较多,
核心逻辑:
1):根据requestCode判断是否有对应的请求处理器,即NettyRequestProcessor
2):如果有的话,就异步执行asyncProcessRequest方法,并注册callback函数
3):如果没有的话,就走默认的DefaultRequestProcessor处理逻辑
4):执行callback函数,返回response类型的RemotingCommand
在此处,requestCode为0,而且也添加了默认的处理器,如下图
remotingServer.registerProcessor(0, new AsyncNettyRequestProcessor() {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
request.setRemark("Hi " + ctx.channel().remoteAddress());
return request;
}
@Override
public boolean rejectRequest() {
return false;
}
}, Executors.newCachedThreadPool());
接着就又是服务端的encode编码,客户端的decode,执行remotingClient的clientHandler处理器的channelRead方法,
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
然后从responseTable根据请求id取出ResponseFuture,ResponseFuture为异步接收结果的一个类,里面还涉及信号量,CAS的知识,这些平时如果工作只编码基础业务代码可能很少用得上。
通信模块的代码就暂时分析到这了,如果有理解不对的地方欢迎大家留言指正。