前言:windows下采用idea调试rocketmq(v4.7.1)
首先来看看RocketMq架构图
NameServer:无状态节点,节点之间不进行通信,每个节点都保存完整的数据,包括brkoer,生产者及消费者,topic元数据等路由信息
Broker:服务端,用于接收生产者发送消息,并持久化消息。
Producer:生产者,消息发送方
Consumer:消费者,消息接收方
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
一:NameServer启动流程分析
public class NamesrvStartup {
private static InternalLogger log;
private static Properties properties = null;
private static CommandLine commandLine = null;
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
核心类NamesrvController构造函数创建、initialize、以及start方法
先看下NamesrvController主要字段
private final NamesrvConfig namesrvConfig;//基础配置
private final NettyServerConfig nettyServerConfig;//网络通信netty的一些基础配置参数,端口,线程数目等
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));//单例线程池
private final KVConfigManager kvConfigManager;//加载user.home目录下的KvConfig.json数据到内存,持有this对象
private final RouteInfoManager routeInfoManager;//路由信息,
private RemotingServer remotingServer;//网络通信netty 模块,引用NettyServerConfig
private BrokerHousekeepingService brokerHousekeepingService;//顾名思义,对broker的一些活动(broker关闭,心跳)进行监听,并更新路由信息等
private ExecutorService remotingExecutor;//线程池
private Configuration configuration;//基础配置类
private FileWatchService fileWatchService;//ssl相关的文件监听
其他类先不谈,重点关注下路由管理这个类:RouteInfoManager
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;//NameServer 与 Broker 空闲时长,默认2分钟,在2分钟内 Nameserver 没有收到 Broker 的心跳包,则关闭该连接。
private final ReadWriteLock lock = new ReentrantReadWriteLock();//读写锁,维护map时使用
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;//主题-队列数据,key是topic名字,value是队列Queue集合,
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;//key是brokerName名字,valuebroker详情
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;//集群名称,brokerName集合
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;//broker地址-
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
其中QueueData
private String brokerName;//broker名称
private int readQueueNums;//读队列个数,默认4
private int writeQueueNums;//写队列个数,默认4
private int perm;//权限
private int topicSynFlag;//同步复制 or 异步复制
BrokerData:
private String cluster;//集群名称
private String brokerName;//broker名称
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
BrokerLiveInfo:
private long lastUpdateTimestamp;//上次心跳更新时间戳
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;
1:namesrcontroller构造函数
``
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
//关联kvConfig.json与HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable的管理类
this.kvConfigManager = new KVConfigManager(this);
//用于管理路由信息,比如topic队列,broker列表,集群信息,可用的broker列表,filer信息
this.routeInfoManager = new RouteInfoManager();
//顾名思义,broker的保活管理服务,监听netty各种销毁关闭事件,针对性处理
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
2:initialize()
public static NamesrvController start(final NamesrvController controller) throws Exception {
//初始化
boolean initResult = controller.initialize(); //(1)
//添加钩子方法
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
//执行销毁逻辑
controller.shutdown();
return null;
}
}));
//启动
controller.start(); //(2)
return controller;
}
(1)具体逻辑主要是
加载user home目录的kvConfig.json文件到内存中(如果文件有的话)
初始化netty的配置,第二个参数:brokerHousekeepingService实现了ChannelEventListener,用于监听channel连接,断开的执行逻辑
public boolean initialize() {
//加载user home目录的kvConfig.json文件到内存中(如果文件有的话)
this.kvConfigManager.load();
//初始化netty
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注册默认的请求处理器,DefaultRequestProcessor,针对请求的处理,判断请求方式然后分发处理
this.registerProcessor();
//定时每隔10s扫描borker的保活情况
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//定时每个10秒钟扫描broker信息,比较(最后一次心跳时间+默认空闲时间2分钟)<当前时间,则说明该broker挂掉了,移除掉该broker信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
3:start方法
主要是netty绑定EventGroup,监听9876端口,注册Channel handler到pipeline
HandshakeHandler //ssl握手信息,握手完成后移除pipeline
NettyEncoder//编码器,将具体RemotingCommand对象转化成字节数据bytes[]写入到ByteBuf
NettyDecoder //解码器 基于自定义通信协议,定长取出字节,转化成RemotingCommand对象
IdleStateHandler//心跳空闲检测
NettyConnectManageHandler//负责链接建立,断开,异常等处理
NettyServerHandler// 核心逻辑,处理请求与响应
至此namesrv启动完成。
下一章我们一起分析下Rocketmq的网络通信模块是如何设计的?