RocketMq Remoting通信模块源码分析

中间件 / 消息队列 / 2022-11-13

rocketmq 通信模块解析

通信协议对我们来说一直比较什么神秘,因为偏底层。而我们在平时的编程环境中,都是基于前人给我们搭建好的软件框架来面向顶层编程,因为越是顶层可读性越高,但是背后做的工作其实非常多,比如JVM。我们平时写的对象如果要经过网络传输,就只能先转换成字节流(序列化),然后再接收到之后再通过字节流转换成对象(反序列化)。计算机只能识别二进制,而二进制对我们来说可读性又比较差, 如果让我们基于二进制编程去写业务代码,那我相信写的人不会疯掉,后面维护的人会崩溃。

言归正传,从源码结构目录看,rocket-remoting正是通信模块,那我们其内部单元测试开始(ps:这里看到,一个优秀的项目不仅架构设计合理,最基础的单元测试也非常详细,细致到一个特别简单的封装方法。所以我们在做项目过程中不管公司是否有这方面的要求,也建议做好详尽的单元测试,也有助于培养良好的架构设计的一个思路)。

@Test
public void testInvokeSync() throws InterruptedException, RemotingConnectException,
    RemotingSendRequestException, RemotingTimeoutException {
    RequestHeader requestHeader = new RequestHeader();
    requestHeader.setCount(1);
    requestHeader.setMessageTitle("Welcome");
    RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
    request.setBody("hello mq".getBytes());
    System.out.println(request.toString());
    RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 300);
    System.out.println(response.toString());
    assertTrue(response != null);
    assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
    assertThat(response.getExtFields()).hasSize(2);

}

执行此单元测试前,会执行@BeforeClass的方法,主要是启动server(比如nameserver),以及创建client实例。

先看下主要类图的继承关系

NettyRemotingServer

RemotingService声明接口,里面包含start()启动方法,shutdown()销毁方法,registerRPCHook()注册钩子函数方法
RemotingClient则声明了对于客户端来说,常用的一些方法,比如invokeSync()同步调用,invokeAsync()异步调用,以及getNameServerAddressList()获取nameserver的方法等
RemotingServer同client相似,此外还有的registerDefaultProcessor,注册默认处理器,用于分发请求
NettyRemotingAbstract则基于netty封装了一些公用的处理方法逻辑
NettyRemotingClient和NettyRemotingServer则是基于netty那套绑定eventGroup ,监听端口的start()启动方法以及销毁方法

RemotingCommand对象是通信自定义协议的对象,一切通信的交互全部封装成该对象,包括编解码操作。其主要核心成员字段:

private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();//相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应
private int flag = 0;//区分是普通RPC还是onewayRPC的标志
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC的标志 区分是普通RPC还是onewayRPC的标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap<String, String> 请求自定义扩展信息 响应自定义扩展信息

成员对象CommandCustomHeader,用transient修饰,即该对象不直接参与序列化。body[]字节数据也同样如此。

看下CommandCustomHeader的关系继承图谱:

CustomerHeader

可以看出,针对所有mq的所有命令操作都会封装成对应的Header对象去放入到RemotingCommand中,譬如创建topic的命令

public class CreateTopicRequestHeader implements CommandCustomHeader {
    @CFNotNull
    private String topic;
    @CFNotNull
    private String defaultTopic;
    @CFNotNull
    private Integer readQueueNums;
    @CFNotNull
    private Integer writeQueueNums;
    @CFNotNull
    private Integer perm;
    @CFNotNull
    private String topicFilterType;
    private Integer topicSysFlag;
    @CFNotNull
    private Boolean order = false;

封装好RemotingCommand对象之后,在调用服务端的ip以及端口触发发送操作,发送之前呢,则会触发Netty pipeline 的编码操作,

NettyEncoder

@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
    throws Exception {
    try {
       // System.out.println("即将对消息进行编码:"+remotingCommand.toString());
        ByteBuffer header = remotingCommand.encodeHeader();
        out.writeBytes(header);
        byte[] body = remotingCommand.getBody();
        if (body != null) {
            out.writeBytes(body);
        }
    } catch (Exception e) {
        log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
        if (remotingCommand != null) {
            log.error(remotingCommand.toString());
        }
        RemotingUtil.closeChannel(ctx.channel());
    }
}

可以看到编码操作主要分为了2步,一个是写入header部分,一个写入body部分数据。

先来看看其自定义网络协议是由哪几部分组成的:

自定义通信协议

第一部分相当于是一个总长度,也就是int类型,占4个字节,

第二部分第一个字节是header的一个序列化类型,其余三个字节存储header的长度,总共4个字节

第三部分则存储序列化后的header对象

第四部分存储自定义body的内容

public ByteBuffer encodeHeader(final int bodyLength) {
        // 1> header length size,总长度占4个字节
        int length = 4;

        // 2> header data length
        byte[] headerData;
        //headerEncode则是将CommandCustomHeader对象数据通过反射方式将字段名称以及值放在extFieldsmap中,然后对整个对象进行json序列化,
        //譬如{"code":0,"extFields":{"count":"1","messageTitle":"Welcome"},"flag":0,"language":"JAVA","opaque":0,"serializeTypeCurrentRPC":"JSON","version":0}转bytes数组
        headerData = this.headerEncode();

        //总长度加上headerData的长度
        length += headerData.length;

        // 3> body data length,加上自定义body的长度
        length += bodyLength;
        //分配(4+length-bodyLength)也就是(4+4+headerDataLength)长度的ByteBuffer
        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
        //前4个字节写入总长度
        // length
        result.putInt(length);
        //第二个4字节写入header的长度
        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
        //写入headerData的内容
        // header data
        result.put(headerData);

        result.flip();

        return result;
    }

最后再写入body数据。客户端编码完成。

接下来看解码操作:

NettyDecoder 继承于LengthFieldBasedFrameDecoder,LengthFieldBasedFrameDecoder是netty中自带的基于自定义长度的解码器,其构造函数
    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }
查看父类构造函数定义得知:
maxFrameLength – the maximum length of the frame. If the length of the frame is greater than this value, TooLongFrameException will be thrown.
lengthFieldOffset – the offset of the length field
lengthFieldLength – the length of the length field
lengthAdjustment – the compensation value to add to the value of the length field
initialBytesToStrip – the number of first bytes to strip out from the decoded frame,
第一个参数表示数据包的最大长度,如果实际长度超过此长度则报异常
第二个参数标识长度域的偏移量,此处为0代表无偏移,因为我们第一个字节就是传入的长度域,因此无偏移。
第三个参数表示长度域的长度大小,我们用的int来标识,所以是占4个字节,因此此处传入4
第四个参数表示要添加到长度字段值的补偿调整值,此处不需要调整,因此传入0
第五个参数表示跳过前4个长度域的字节,应用解码器就拿不到总长度的这个数据

NettyDecode的解码方法

 @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            //经过父类解码器操作,实际拿到的即为去除长度域后的数据包
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();
            //对数据包进行解码,还原成对象的操作
            RemotingCommand remotingCommand =  RemotingCommand.decode(byteBuffer);
            //System.out.println("对消息进行解码:"+remotingCommand.toString());
            return  remotingCommand;
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
    
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    int length = byteBuffer.limit();//获取字节缓冲区总长度,也就是网络协议中第2,3,4部分的总长
    int oriHeaderLen = byteBuffer.getInt();//取出第一个4字节的长度,也就是header总长度
    int headerLength = getHeaderLength(oriHeaderLen);//取int后24位为header实际长度,前8位为序列化的类型

    byte[] headerData = new byte[headerLength];
    //根据headerData实际长度,读取出这些字节来
    byteBuffer.get(headerData);
    //根据前8位序列化类型,还原成remotingCommand对象
    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    int bodyLength = length - 4 - headerLength;//取出body的总长度
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    //还原body
    cmd.body = bodyData;

    return cmd;
}

接下来继续看请求的流转:

在经过decode之后,进入到server handler的方法:

@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
		//处理所有请求
        processMessageReceived(ctx, msg);
    }
}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
            	//处理请求方法
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
            	//处理响应方法
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
processRequestCommand的业务代码比较多,
核心逻辑:
1):根据requestCode判断是否有对应的请求处理器,即NettyRequestProcessor
2):如果有的话,就异步执行asyncProcessRequest方法,并注册callback函数
3):如果没有的话,就走默认的DefaultRequestProcessor处理逻辑
4):执行callback函数,返回response类型的RemotingCommand

在此处,requestCode为0,而且也添加了默认的处理器,如下图
    
   remotingServer.registerProcessor(0, new AsyncNettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
                request.setRemark("Hi " + ctx.channel().remoteAddress());
                return request;
            }

            @Override
            public boolean rejectRequest() {
                return false;
            }
        }, Executors.newCachedThreadPool());

接着就又是服务端的encode编码,客户端的decode,执行remotingClient的clientHandler处理器的channelRead方法,

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

然后从responseTable根据请求id取出ResponseFuture,ResponseFuture为异步接收结果的一个类,里面还涉及信号量,CAS的知识,这些平时如果工作只编码基础业务代码可能很少用得上。

通信模块的代码就暂时分析到这了,如果有理解不对的地方欢迎大家留言指正。