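Introduction
XXL-JOB is a distributed task scheduling platform whose core design goals are rapid development, ease of learning, a lightweight footprint, and easy extensibility.
Official documentation: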
https://www.xuxueli.com/xxl-job/
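This article is based on xxl-job 2.2.0.
Architecture diagram
Quick start
1: Initialize an XxlJobSpringExecutor object and set the admin (scheduling center) address and the executor's basic properties.
Source code analysis
First, look at the class hierarchy of XxlJobSpringExecutor.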
It implements SmartInitializingSingleton, an interface with a single method:
```java
/**
 * Invoked right at the end of the singleton pre-instantiation phase,
 * with a guarantee that all regular singleton beans have been created
 * already. {@link ListableBeanFactory#getBeansOfType} calls within
 * this method won't trigger accidental side effects during bootstrap.
 * <p><b>NOTE:</b> This callback won't be triggered for singleton beans
 * lazily initialized on demand after {@link BeanFactory} bootstrap,
 * and not for any other bean scope either. Carefully use it for beans
 * with the intended bootstrap semantics only.
 */
void afterSingletonsInstantiated();
```
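So this method is invoked at the end of the singleton pre-instantiation phase, once all regular (non-lazy) singleton beans have been created.
Now the afterSingletonsInstantiated method of XxlJobSpringExecutor: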
```java
@Override
public void afterSingletonsInstantiated() {
    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/
    // init JobHandler Repository (for method)
    // scan all beans for methods annotated with @XxlJob, validate them, and register them (stored in a map)
    initJobHandlerMethodRepository(applicationContext);
    // refresh GlueFactory
    GlueFactory.refreshInstance(1); // initialize the GlueFactory
    // super start
    try {
        super.start(); // the key step: call the parent class's start method
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```
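The scan-validate-register step can be illustrated without Spring. The following is a minimal sketch, not the library's implementation: `@Job`, `HandlerRegistry`, and `DemoJobs` are hypothetical names, and the handler signature is simplified to `String handler(String)` instead of the `ReturnT<String>` used by xxl-job.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @XxlJob; only the handler name is modelled.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Job {
    String value();
}

final class HandlerRegistry {
    private final Map<String, Method> handlers = new ConcurrentHashMap<>();
    private final Map<String, Object> targets = new ConcurrentHashMap<>();

    /** Scans one bean, validates each @Job method, and stores it in the map. */
    void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Job job = method.getAnnotation(Job.class);
            if (job == null) {
                continue; // not a job handler
            }
            String name = job.value().trim();
            if (name.isEmpty()) {
                throw new IllegalStateException("handler name is empty: " + method);
            }
            // Assumed signature for this sketch: String handler(String param)
            boolean validSignature = method.getParameterCount() == 1
                    && method.getParameterTypes()[0] == String.class
                    && method.getReturnType() == String.class;
            if (!validSignature) {
                throw new IllegalStateException("invalid handler signature: " + method);
            }
            if (handlers.putIfAbsent(name, method) != null) {
                throw new IllegalStateException("duplicate handler name: " + name);
            }
            method.setAccessible(true);
            targets.put(name, bean);
        }
    }

    /** Looks a handler up by name and invokes it reflectively. */
    String invoke(String name, String param) {
        Method method = handlers.get(name);
        if (method == null) {
            throw new IllegalArgumentException("no handler registered: " + name);
        }
        try {
            return (String) method.invoke(targets.get(name), param);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("handler failed: " + name, e);
        }
    }
}

// Hypothetical bean with one annotated method and one plain method.
final class DemoJobs {
    @Job("demoJobHandler")
    public String demo(String param) {
        return "done:" + param;
    }

    public String notAJob(String param) {
        return param;
    }
}
```

After `registry.scan(new DemoJobs())`, calling `registry.invoke("demoJobHandler", "x")` returns `"done:x"`, while `notAJob` is never registered.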
The start method of XxlJobExecutor:
```java
public void start() throws Exception {
    // init logpath
    XxlJobFileAppender.initLogPath(logPath); // create the log directory
    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken); // initialize the AdminBiz clients
    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays); // start the periodic log-cleanup thread
    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start(); // start the job-result callback thread
    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken); // start netty and bind the address and port
}
```
Going one level deeper, initEmbedServer resolves the current IP address and port, then calls EmbedServer's start method:
```java
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup(); // netty boss group
            EventLoopGroup workerGroup = new NioEventLoopGroup(); // netty worker group
            // define the business thread pool
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                // start server, bind port
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & response to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                // bind
                ChannelFuture future = bootstrap.bind(port).sync();
                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
                // start registry
                startRegistry(appname, address);
                // wait until stop
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                if (e instanceof InterruptedException) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } else {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                }
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
```
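The bizThreadPool parameters above are worth a closer look. With a core size of 0 and a bounded queue, a standard `ThreadPoolExecutor` starts one worker, then queues further tasks, and only adds more threads (up to the maximum) once the queue is full; after that, the rejection handler fires. The sketch below shows this with small numbers. `BoundedPoolDemo` is a hypothetical class, and it throws `RejectedExecutionException` where the source throws a plain `RuntimeException`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolDemo {

    /**
     * Submits `tasks` blocking tasks to a pool with corePoolSize = 0 and
     * returns how many of them were rejected.
     * Requires maxThreads >= 1 and queueCapacity >= 1.
     */
    static int countRejected(int maxThreads, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);      // keeps every task blocked
        CountDownLatch firstStarted = new CountDownLatch(1); // makes the run deterministic
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, maxThreads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                r -> new Thread(r, "demo-bizThreadPool-" + r.hashCode()),
                (r, executor) -> {
                    throw new RejectedExecutionException("demo bizThreadPool is EXHAUSTED!");
                });
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        firstStarted.countDown();
                        awaitQuietly(release);
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all threads busy and queue full
                }
                if (i == 0) {
                    // Wait until the first worker has taken its task off the queue.
                    awaitQuietly(firstStarted);
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

`BoundedPoolDemo.countRejected(2, 2, 5)` returns 1: one task runs on the first worker, two wait in the queue, the fourth forces a second thread, and the fifth is rejected. Scaled up to the values in the source (200 threads, queue of 2000), requests would queue behind a single worker until 2000 are waiting.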
In short, it defines a thread pool and then starts the netty server. First, look at startRegistry(appname, address), which starts the following registry thread:
```java
registryThread = new Thread(new Runnable() {
    @Override
    public void run() {
        // registry
        while (!toStop) {
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registry(registryParam); // perform the registration call
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                    }
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            try {
                if (!toStop) {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                }
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                }
            }
        }
        // registry remove
        try {
            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                    if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                        registryResult = ReturnT.SUCCESS;
                        logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        break;
                    } else {
                        logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                    }
                }
            }
        } catch (Exception e) {
            if (!toStop) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
    }
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
```
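One round of the registration loop above reduces to "try each admin in turn and stop at the first success". A minimal sketch of that pattern follows; `RegistryBeatSketch` is a hypothetical name, and each admin client is modelled as a `Predicate<String>` rather than the real `AdminBiz` interface.

```java
import java.util.List;
import java.util.function.Predicate;

final class RegistryBeatSketch {

    /**
     * Tries each admin client in order and stops at the first one that accepts
     * the registration. Returns the index of that client, or -1 if all failed.
     */
    static int registerOnce(List<Predicate<String>> adminClients, String executorAddress) {
        for (int i = 0; i < adminClients.size(); i++) {
            try {
                if (adminClients.get(i).test(executorAddress)) {
                    return i; // first success wins, like the `break` in the loop above
                }
            } catch (RuntimeException e) {
                // An unreachable admin must not abort the round; try the next one.
            }
        }
        return -1;
    }
}
```

In the real thread this round is repeated after every `RegistryConfig.BEAT_TIMEOUT` sleep until `toStop` is set, which is what turns registration into a heartbeat.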
This starts the registry thread, which calls the admin's registration endpoint (api/registry) over REST. Next, the admin side.
The JobApiController.api method:
```java
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
```
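The logic is effectively an upsert: update the record if it exists, otherwise insert it. registryGroup is EXECUTOR, registryKey is the executor's appName, and registryValue is the current machine's IP and port.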
```java
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
    xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
}
```
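The update-then-insert pattern can be tried out with an in-memory map standing in for the registry table. This is a sketch under that assumption only: `InMemoryRegistry` and its methods are hypothetical and merely mirror the shape of `registryUpdate` and `registrySave`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class InMemoryRegistry {
    // key: group|appName|address, value: time of the last heartbeat (epoch millis)
    private final Map<String, Long> rows = new ConcurrentHashMap<>();

    private static String key(String group, String appName, String address) {
        return group + "|" + appName + "|" + address;
    }

    /** Mirrors registryUpdate: returns the number of rows touched (0 or 1). */
    int update(String group, String appName, String address, long now) {
        return rows.replace(key(group, appName, address), now) != null ? 1 : 0;
    }

    /** Mirrors registrySave: inserts a new row. */
    void save(String group, String appName, String address, long now) {
        rows.put(key(group, appName, address), now);
    }

    /** Update first; insert only when nothing was updated. Returns true on insert. */
    boolean registry(String group, String appName, String address, long now) {
        if (update(group, appName, address, now) < 1) {
            save(group, appName, address, now);
            return true;
        }
        return false;
    }

    int size() {
        return rows.size();
    }

    Long lastBeat(String group, String appName, String address) {
        return rows.get(key(group, appName, address));
    }
}
```

The first heartbeat from an executor inserts a row, and every later heartbeat only refreshes its timestamp, so the table holds one row per executor address.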
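That completes the executor's automatic registration logic.
Next, how does the admin communicate with the executor when it triggers a job?
And how does the admin manage the timed execution of scheduled jobs?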
```java
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }
    // ---------------------- XxlJobScheduler ----------------------
    private XxlJobScheduler xxlJobScheduler;
    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }
    // (remaining members omitted in this excerpt)
}
```
When the bean is initialized, the init method of XxlJobScheduler runs:
```java
public void init() throws Exception {
    // init i18n
    initI18n();
    // admin registry monitor run
    JobRegistryMonitorHelper.getInstance().start();
    // admin fail-monitor run
    JobFailMonitorHelper.getInstance().start();
    // admin lose-monitor run
    JobLosedMonitorHelper.getInstance().start();
    // admin trigger pool start
    JobTriggerPoolHelper.toStart();
    // admin log report start
    JobLogReportHelper.getInstance().start();
    // start-schedule
    // the main part: the scheduling thread takes a lock via JDBC, then scans the jobs and decides whether each one is due to run
    JobScheduleHelper.getInstance().start();
    logger.info(">>>>>>>>> init xxl-job admin success.");
}
```
Jobs that meet the conditions are fired through trigger:
```java
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
```
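The "scan and decide whether a job is due" step can be pictured as a filter over the job rows. The sketch below is a simplification under stated assumptions: `DueJobScanner`, `JobRow`, and the pre-read window parameter are hypothetical, the JDBC lock is left out, and the selection rule (next trigger time at or before now plus the window) is an assumption rather than a quote from JobScheduleHelper.

```java
import java.util.ArrayList;
import java.util.List;

final class DueJobScanner {

    /** Minimal stand-in for a job row: an id and its next trigger time (epoch millis). */
    static final class JobRow {
        final int id;
        final long triggerNextTime;

        JobRow(int id, long triggerNextTime) {
            this.id = id;
            this.triggerNextTime = triggerNextTime;
        }
    }

    /** Returns the ids of jobs whose next trigger time is at or before nowMs + preReadMs. */
    static List<Integer> selectDue(List<JobRow> jobs, long nowMs, long preReadMs) {
        List<Integer> due = new ArrayList<>();
        for (JobRow job : jobs) {
            if (job.triggerNextTime <= nowMs + preReadMs) {
                due.add(job.id);
            }
        }
        return due;
    }
}
```

Each selected id would then be handed to something like the trigger call shown above, while the lock ensures that only one admin node performs the scan at a time.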
Following the call chain step by step leads to XxlJobRemotingUtil.postBody, which sends an HTTP request to the registered executor.
[Figure: request parameters]
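The shape of such a trigger request can be sketched with the JDK's `java.net.http` API (Java 11+), which is a substitute chosen for this illustration and not what postBody itself does. `TriggerRequestSketch` is a hypothetical class, the header name is assumed to match the value of `XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN`, and the 3-second timeout is arbitrary.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

final class TriggerRequestSketch {
    // Assumed to match the constant XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN.
    static final String ACCESS_TOKEN_HEADER = "XXL-JOB-ACCESS-TOKEN";

    /** Builds (but does not send) a POST to the executor's /run endpoint. */
    static HttpRequest build(String executorAddress, String accessToken, String jsonBody) {
        String base = executorAddress.endsWith("/") ? executorAddress : executorAddress + "/";
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(base + "run"))
                .timeout(Duration.ofSeconds(3))
                .header("Content-Type", "application/json;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody));
        if (accessToken != null && !accessToken.trim().isEmpty()) {
            // The executor compares this header against its own configured token.
            builder.header(ACCESS_TOKEN_HEADER, accessToken);
        }
        return builder.build();
    }
}
```

A request built this way could be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. The JSON body would be a serialized TriggerParam; its exact fields are not reproduced here.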
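Next, how does the executor's netty handler process the request?
The executor's netty handler chain is: IdleStateHandler -> HttpServerCodec -> HttpObjectAggregator -> EmbedHttpServerHandler
In order, these are idle (heartbeat) detection, HTTP request decoding and response encoding, aggregation of the HTTP message parts into a full request, and the business handler.
The first three can be skipped here (readers unfamiliar with them may want to read up on netty); the focus is the business handler.
Its channelRead0 method: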
```java
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // request parse
    //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
    String requestData = msg.content().toString(CharsetUtil.UTF_8); // read the request body
    String uri = msg.uri();
    HttpMethod httpMethod = msg.method();
    boolean keepAlive = HttpUtil.isKeepAlive(msg);
    String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
            // to json
            String responseJson = GsonTool.toJson(responseObj);
            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    // valid
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    if (accessToken!=null
            && accessToken.trim().length()>0
            && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }
    // services mapping
    try {
        if ("/beat".equals(uri)) {
            return executorBiz.beat(); // heartbeat
        } else if ("/idleBeat".equals(uri)) {
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        } else if ("/run".equals(uri)) { // run the job
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) {
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) {
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}
```
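For /run, the executor looks up the jobHandler in jobHandlerRepository and puts the trigger into triggerQueue to await execution. The JobThread pulls triggers from that queue and calls the handler's execute method. The result (executeResult) is placed on the TriggerCallbackThread queue, and the callback thread then calls the admin's REST endpoint api/callback to update the scheduling log.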
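The queue hand-off described above can be sketched with two `BlockingQueue`s and one worker thread. This is an illustration only: `JobWorkerSketch` is a hypothetical class, triggers and results are plain strings, and the final HTTP callback to the admin is replaced by a queue that the caller polls.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class JobWorkerSketch {
    private final BlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();
    private final Function<String, String> handler; // stands in for the job handler's execute method
    private final Thread worker;
    private volatile boolean toStop = false;

    JobWorkerSketch(Function<String, String> handler) {
        this.handler = handler;
        this.worker = new Thread(this::loop, "job-worker-sketch");
        this.worker.setDaemon(true);
    }

    void start() {
        worker.start();
    }

    /** Called by the request handler: enqueue the trigger and return immediately. */
    void pushTrigger(String param) {
        triggerQueue.add(param);
    }

    /** Stands in for the callback thread draining results; null on timeout. */
    String pollCallback(long timeoutMs) {
        try {
            return callbackQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void stop() {
        toStop = true;
        worker.interrupt();
    }

    private void loop() {
        while (!toStop) {
            try {
                String param = triggerQueue.poll(3, TimeUnit.SECONDS);
                if (param == null) {
                    continue; // nothing to do, check toStop again
                }
                String result;
                try {
                    result = handler.apply(param);
                } catch (RuntimeException e) {
                    result = "FAIL:" + e.getMessage();
                }
                callbackQueue.add(result); // hand the result to the callback side
            } catch (InterruptedException e) {
                // Interrupted by stop(); the loop condition decides whether to exit.
            }
        }
    }
}
```

With a handler such as `p -> "SUCCESS:" + p`, pushing the trigger `"a"` makes `"SUCCESS:a"` appear on the callback queue. Decoupling the two sides this way lets the HTTP handler answer quickly even when the job itself runs for a long time.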
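Summary: although xxl-job is lightweight as middleware goes (the learning curve is low), many of its design ideas are worth studying and borrowing, and it touches a lot of topics: netty, thread pools, queues, Spring, and more. My own ability is limited, so please point out anything I have summarized incorrectly!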