RocketMq NameServer源码分析

中间件 / 消息队列 / 2022-11-11

前言:windows下采用idea调试rocketmq(v4.7.1)

首先来看看RocketMq架构图

rocketmq_architecture

NameServer:无状态节点,节点之间不进行通信,每个节点都保存完整的数据,包括brkoer,生产者及消费者,topic元数据等路由信息

Broker:服务端,用于接收生产者发送消息,并持久化消息。

Producer:生产者,消息发送方

Consumer:消费者,消息接收方

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

一:NameServer启动流程分析

public class NamesrvStartup {

    private static InternalLogger log;
    private static Properties properties = null;
    private static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }
public static NamesrvController main0(String[] args) {

    try {
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

核心类NamesrvController构造函数创建、initialize、以及start方法

先看下NamesrvController主要字段

private final NamesrvConfig namesrvConfig;//基础配置

    private final NettyServerConfig nettyServerConfig;//网络通信netty的一些基础配置参数,端口,线程数目等

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));//单例线程池
    private final KVConfigManager kvConfigManager;//加载user.home目录下的KvConfig.json数据到内存,持有this对象
    private final RouteInfoManager routeInfoManager;//路由信息,

    private RemotingServer remotingServer;//网络通信netty 模块,引用NettyServerConfig

    private BrokerHousekeepingService brokerHousekeepingService;//顾名思义,对broker的一些活动(broker关闭,心跳)进行监听,并更新路由信息等

    private ExecutorService remotingExecutor;//线程池

    private Configuration configuration;//基础配置类
    private FileWatchService fileWatchService;//ssl相关的文件监听

其他类先不谈,重点关注下路由管理这个类:RouteInfoManager

private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;//NameServer 与 Broker 空闲时长,默认2分钟,在2分钟内 Nameserver 没有收到 Broker 的心跳包,则关闭该连接。
private final ReadWriteLock lock = new ReentrantReadWriteLock();//读写锁,维护map时使用
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;//主题-队列数据,key是topic名字,value是队列Queue集合,
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;//key是brokerName名字,valuebroker详情
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;//集群名称,brokerName集合
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;//broker地址-
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

其中QueueData

private String brokerName;//broker名称
private int readQueueNums;//读队列个数,默认4
private int writeQueueNums;//写队列个数,默认4
private int perm;//权限
private int topicSynFlag;//同步复制 or 异步复制

BrokerData:

private String cluster;//集群名称
private String brokerName;//broker名称
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

BrokerLiveInfo:

private long lastUpdateTimestamp;//上次心跳更新时间戳
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;

1:namesrcontroller构造函数

``

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    this.namesrvConfig = namesrvConfig;
    this.nettyServerConfig = nettyServerConfig;
    
    //关联kvConfig.json与HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable的管理类
    this.kvConfigManager = new KVConfigManager(this);
    //用于管理路由信息,比如topic队列,broker列表,集群信息,可用的broker列表,filer信息
    this.routeInfoManager = new RouteInfoManager();
    //顾名思义,broker的保活管理服务,监听netty各种销毁关闭事件,针对性处理
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    this.configuration = new Configuration(
        log,
        this.namesrvConfig, this.nettyServerConfig
    );
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

2:initialize()

public static NamesrvController start(final NamesrvController controller) throws Exception {

 
    //初始化
    boolean initResult = controller.initialize(); //(1)  
    //添加钩子方法
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            //执行销毁逻辑
            controller.shutdown();
            return null;
        }
    }));
    //启动
    controller.start(); //(2)

    return controller;
}

(1)具体逻辑主要是

​ 加载user home目录的kvConfig.json文件到内存中(如果文件有的话)

​ 初始化netty的配置,第二个参数:brokerHousekeepingService实现了ChannelEventListener,用于监听channel连接,断开的执行逻辑

public boolean initialize() {
    //加载user home目录的kvConfig.json文件到内存中(如果文件有的话)
    this.kvConfigManager.load();
    //初始化netty
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    //注册默认的请求处理器,DefaultRequestProcessor,针对请求的处理,判断请求方式然后分发处理
    this.registerProcessor();

    //定时每隔10s扫描borker的保活情况
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
     //定时每个10秒钟扫描broker信息,比较(最后一次心跳时间+默认空闲时间2分钟)<当前时间,则说明该broker挂掉了,移除掉该broker信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

3:start方法

主要是netty绑定EventGroup,监听9876端口,注册Channel handler到pipeline

HandshakeHandler  //ssl握手信息,握手完成后移除pipeline
NettyEncoder//编码器,将具体RemotingCommand对象转化成字节数据bytes[]写入到ByteBuf
NettyDecoder //解码器 基于自定义通信协议,定长取出字节,转化成RemotingCommand对象
IdleStateHandler//心跳空闲检测
NettyConnectManageHandler//负责链接建立,断开,异常等处理
NettyServerHandler// 核心逻辑,处理请求与响应

至此namesrv启动完成。

下一章我们一起分析下Rocketmq的网络通信模块是如何设计的?