A Brief Analysis of the xxl-job Source Code

Middleware / 2021-05-08

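Introduction

XXL-JOB is a distributed task scheduling platform. Its core design goals are rapid development, ease of learning, light weight, and easy extensibility.

Official documentation: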

https://www.xuxueli.com/xxl-job/

This article is based on xxl-job version 2.2.0.

Architecture diagram

image-1667906871678

quick start

1: Initialize an XxlJobSpringExecutor object, setting the scheduling center address and the executor's basic properties.

image-1667906885927

image-1667906893480

Source code analysis

Let's look at the class hierarchy of XxlJobSpringExecutor:

image-1667906905692

It implements SmartInitializingSingleton, an interface with a single method:

/**
 * Invoked right at the end of the singleton pre-instantiation phase,
 * with a guarantee that all regular singleton beans have been created
 * already. {@link ListableBeanFactory#getBeansOfType} calls within
 * this method won't trigger accidental side effects during bootstrap.
 * <p><b>NOTE:</b> This callback won't be triggered for singleton beans
 * lazily initialized on demand after {@link BeanFactory} bootstrap,
 * and not for any other bean scope either. Carefully use it for beans
 * with the intended bootstrap semantics only.
 */
void afterSingletonsInstantiated();


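As the Javadoc says, Spring calls this method at the end of the singleton pre-instantiation phase, once all regular singleton beans have been created.

Now look at the logic of XxlJobSpringExecutor's afterSingletonsInstantiated method: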

@Override
public void afterSingletonsInstantiated() {

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // init JobHandler Repository (for method)
    // scans all beans for methods annotated with @XxlJob, validates them, and registers them (stored in a map)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1); // initialize the glue factory

    // super start
    try {
        super.start(); // the key step: call the parent class's start method
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

}
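To make the "scan, validate, register in a map" step concrete, here is a minimal sketch in plain Java. It is not xxl-job's code: the `@Job` annotation, `SampleJobs`, and `HandlerRegistry` are hypothetical stand-ins for `@XxlJob`, a user bean, and the handler repository, and the validation rules (non-empty name, no duplicates, one `String` parameter) are assumptions chosen for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for xxl-job's @XxlJob annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Job {
    String value();
}

// Example bean with one annotated handler method.
class SampleJobs {
    @Job("demoJobHandler")
    public String demo(String param) {
        return "handled:" + param;
    }
}

class HandlerRegistry {

    // Pairs a bean instance with the annotated method to call on it.
    private static final class Handler {
        final Object target;
        final Method method;

        Handler(Object target, Method method) {
            this.target = target;
            this.method = method;
        }
    }

    // handler name -> handler
    private final Map<String, Handler> repository = new ConcurrentHashMap<>();

    // Finds every @Job method on the bean, validates it, and registers it by name.
    void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Job job = method.getAnnotation(Job.class);
            if (job == null) {
                continue;
            }
            String name = job.value().trim();
            if (name.isEmpty()) {
                throw new IllegalStateException("job handler name is empty: " + method);
            }
            Class<?>[] params = method.getParameterTypes();
            if (params.length != 1 || params[0] != String.class) {
                throw new IllegalStateException("job handler must take one String: " + method);
            }
            method.setAccessible(true);
            if (repository.putIfAbsent(name, new Handler(bean, method)) != null) {
                throw new IllegalStateException("job handler name conflicts: " + name);
            }
        }
    }

    // Looks a handler up by name and invokes it reflectively.
    Object invoke(String name, String param) {
        Handler handler = repository.get(name);
        if (handler == null) {
            throw new IllegalArgumentException("no handler named " + name);
        }
        try {
            return handler.method.invoke(handler.target, param);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this sketch, `new HandlerRegistry()` followed by `scan(new SampleJobs())` makes `invoke("demoJobHandler", "x")` call the annotated method by name, which is the same lookup the executor later performs when a trigger arrives.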


The start method of XxlJobExecutor:

```java
public void start() throws Exception {

    // init logpath
    XxlJobFileAppender.initLogPath(logPath); // create the log directory

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken); // initialize the adminBiz clients

    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays); // start the periodic log-cleanup thread

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start(); // start the job-result callback thread

    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken); // start Netty and bind the address and port
}
```

Going one step further, initEmbedServer resolves the current IP address and port, then calls EmbedServer's start method:

public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {

        @Override
        public void run() {

            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup(); // Netty boss group
            EventLoopGroup workerGroup = new NioEventLoopGroup(); // Netty worker group
            // define the business thread pool
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });


            try {
                // start server,bind port
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registry
                startRegistry(appname, address);

                // wait util stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                if (e instanceof InterruptedException) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } else {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                }
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

        }

    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
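The bizThreadPool above combines a core size of 0, a bounded queue, and a rejection handler that throws. A scaled-down sketch of the same shape shows what happens when the pool is saturated. The class name `BoundedPoolDemo` and the sizes (max 1 thread, queue capacity 1) are assumptions made so the behavior is easy to observe; they are not values from xxl-job.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {

    // Same shape as bizThreadPool, scaled down: core 0, max 1, queue capacity 1.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                0,
                1,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                r -> new Thread(r, "demo-bizThreadPool-" + r.hashCode()),
                (r, executor) -> {
                    throw new RuntimeException("demo pool is EXHAUSTED!");
                });
    }

    // Submits three tasks while the first is still running and reports the fate of the third.
    static String submitThree() {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                started.countDown();
                awaitQuietly(release);
            });
            awaitQuietly(started);        // task 1 now occupies the only thread
            pool.execute(() -> { });      // task 2 waits in the queue
            try {
                pool.execute(() -> { });  // task 3: queue full and max threads reached
                return "accepted";
            } catch (RuntimeException e) {
                return "rejected: " + e.getMessage();
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitThree());
    }
}
```

One consequence of this configuration worth keeping in mind: a ThreadPoolExecutor prefers queuing over creating threads once the core size is reached, so with a core size of 0 the pool only grows past its first thread after the queue has filled up.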

This method mainly defines a thread pool and then starts the Netty server. First, look at startRegistry(appname, address):

registryThread = new Thread(new Runnable() {
    @Override
    public void run() {

        // registry
        while (!toStop) {
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registry(registryParam); // perform the registration
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                    }

                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }

            }

            try {
                if (!toStop) {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                }
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                }
            }
        }

        // registry remove
        try {
            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                    if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                        registryResult = ReturnT.SUCCESS;
                        logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        break;
                    } else {
                        logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                    }

                }

            }
        } catch (Exception e) {
            if (!toStop) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

    }
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
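Stripped of logging, the thread above has a simple lifecycle: register repeatedly as a heartbeat until told to stop, then deregister once. The sketch below reproduces just that lifecycle in plain Java. The `Admin` interface is a hypothetical stand-in for AdminBiz, and the `stop()` method (set the flag, interrupt, join) is my assumption about how such a loop is shut down rather than a copy of xxl-job's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class RegistryLoopDemo {

    // Hypothetical stand-in for the admin client (AdminBiz).
    interface Admin {
        void registry(String appname, String address);
        void registryRemove(String appname, String address);
    }

    private volatile boolean toStop = false;
    private Thread thread;

    void start(Admin admin, String appname, String address, long beatMillis) {
        thread = new Thread(() -> {
            // heartbeat: re-register on every beat until asked to stop
            while (!toStop) {
                try {
                    admin.registry(appname, address);
                } catch (RuntimeException e) {
                    // a failed beat is simply retried on the next round
                }
                try {
                    if (!toStop) {
                        TimeUnit.MILLISECONDS.sleep(beatMillis);
                    }
                } catch (InterruptedException e) {
                    // woken up by stop(); the loop condition decides what happens next
                }
            }
            // deregister once on the way out
            try {
                admin.registryRemove(appname, address);
            } catch (RuntimeException e) {
                // nothing more to do; the admin side can expire stale entries
            }
        });
        thread.setDaemon(true);
        thread.setName("demo ExecutorRegistryThread");
        thread.start();
    }

    void stop() {
        toStop = true;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs one beat against a recording fake and returns the calls it saw, in order.
    static List<String> runOnce() {
        List<String> calls = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch firstBeat = new CountDownLatch(1);
        Admin fake = new Admin() {
            @Override
            public void registry(String appname, String address) {
                calls.add("registry");
                firstBeat.countDown();
            }

            @Override
            public void registryRemove(String appname, String address) {
                calls.add("registryRemove");
            }
        };
        RegistryLoopDemo demo = new RegistryLoopDemo();
        demo.start(fake, "demo-app", "http://127.0.0.1:9999/", 60_000L);
        try {
            firstBeat.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        demo.stop();
        return calls;
    }
}
```

Because registration doubles as the heartbeat, the scheduling center never has to probe executors for liveness in this path; it only needs to notice entries whose update time has gone stale.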

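This starts the registry thread, which calls the scheduling center's registration endpoint (api/registry) over REST. Next, look at the scheduling center's side:

The JobApiController.api method: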
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);

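In effect this is an upsert: update the record if it exists, otherwise save it. Here registryGroup is EXECUTOR, registryKey is the executor's appName, and registryValue is the current machine's IP and port.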

int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
    xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
}
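The update-then-save pattern can be tried out without a database. The following is a minimal in-memory sketch, assuming a hypothetical `InMemoryRegistryDao` whose two methods mimic the row counts a real DAO would return; it only illustrates the control flow, not xxl-job's persistence layer.

```java
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class InMemoryRegistryDao {

    // (group, key, value) -> last update time
    private final Map<String, Date> rows = new ConcurrentHashMap<>();

    private static String rowId(String group, String key, String value) {
        return group + "|" + key + "|" + value;
    }

    // Returns the number of rows updated: 1 if the row exists, otherwise 0.
    int registryUpdate(String group, String key, String value, Date updateTime) {
        Date updated = rows.computeIfPresent(rowId(group, key, value), (id, old) -> updateTime);
        return updated != null ? 1 : 0;
    }

    // Inserts a new row and returns the number of rows written.
    int registrySave(String group, String key, String value, Date updateTime) {
        rows.put(rowId(group, key, value), updateTime);
        return 1;
    }

    // Update first; fall back to save when nothing was updated.
    String registry(String group, String key, String value) {
        int ret = registryUpdate(group, key, value, new Date());
        if (ret < 1) {
            registrySave(group, key, value, new Date());
            return "saved";
        }
        return "updated";
    }

    int size() {
        return rows.size();
    }
}
```

Calling `registry("EXECUTOR", "demo-app", "http://127.0.0.1:9999/")` twice saves on the first call and updates on the second, leaving a single row whose timestamp acts as the last heartbeat.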

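That completes the executor's automatic registration logic.

Next question: how does the scheduling center communicate with an executor when it triggers a job?

And how does the scheduling center make scheduled jobs fire on time?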

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }


    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

When the bean is initialized, the init method runs:

public void init() throws Exception {
    // init i18n
    initI18n();

    // admin registry monitor run
    JobRegistryMonitorHelper.getInstance().start();

    // admin fail-monitor run
    JobFailMonitorHelper.getInstance().start();

    // admin lose-monitor run
    JobLosedMonitorHelper.getInstance().start();

    // admin trigger pool start
    JobTriggerPoolHelper.toStart();

    // admin log report start
    JobLogReportHelper.getInstance().start();

    // start-schedule
    JobScheduleHelper.getInstance().start(); // the key part: the scheduling thread takes a lock via JDBC, then scans the jobs and decides which are due to run

    logger.info(">>>>>>>>> init xxl-job admin success.");
}

Jobs that meet the conditions are fired by calling trigger:

JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

Following the calls step by step leads to XxlJobRemotingUtil.postBody, which sends an HTTP request to the registered executor.

The request parameters are as follows:

image-20210508170323887.png
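As a rough picture of what such a call involves, here is a sketch of a JSON POST with an access-token header using the JDK's HttpURLConnection (Java 9+ for `readAllBytes`). It is an illustration, not the body of XxlJobRemotingUtil.postBody: the class name `PostBodySketch`, the `endpoint` helper, the header name, and the fixed 3-second connect timeout are all assumptions, and the JSON body is passed in as an opaque string.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class PostBodySketch {

    // Assumed header name; verify it against XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN.
    static final String ACCESS_TOKEN_HEADER = "XXL-JOB-ACCESS-TOKEN";

    // Joins a registered executor address and an operation such as "run",
    // tolerating a missing trailing slash on the address.
    static String endpoint(String executorAddress, String operation) {
        String base = executorAddress.endsWith("/") ? executorAddress : executorAddress + "/";
        return base + operation;
    }

    // Sends jsonBody as a POST request and returns the response body as text.
    static String postBody(String url, String accessToken, int timeoutSeconds, String jsonBody)
            throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setConnectTimeout(3 * 1000);
            conn.setReadTimeout(timeoutSeconds * 1000);
            conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            if (accessToken != null && !accessToken.trim().isEmpty()) {
                conn.setRequestProperty(ACCESS_TOKEN_HEADER, accessToken);
            }
            try (OutputStream out = conn.getOutputStream()) {
                out.write(jsonBody.getBytes(StandardCharsets.UTF_8));
            }
            int status = conn.getResponseCode();
            if (status != 200) {
                throw new IOException("unexpected HTTP status " + status);
            }
            try (InputStream in = conn.getInputStream()) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        } finally {
            conn.disconnect();
        }
    }
}
```

A trigger would then look something like `PostBodySketch.postBody(PostBodySketch.endpoint(address, "run"), token, 3, json)`, where `address` is the value the executor registered earlier.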

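Next, how does the executor's Netty handler process the request?

The executor's Netty handler chain is: ->IdleStateHandler->HttpServerCodec->HttpObjectAggregator->EmbedHttpServerHandler

In order, these are the idle-connection (heartbeat) detection handler, the HTTP codec, the aggregator that merges a request into a full HTTP message including its body, and the business handler.

The first three can be skipped here; if they are unfamiliar, it is worth reading up on Netty. The focus is the business handler.

Look at the logic of its channelRead0 method: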

@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

    // request parse
    //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); 
    String requestData = msg.content().toString(CharsetUtil.UTF_8); // read the request body
    String uri = msg.uri();
    HttpMethod httpMethod = msg.method();
    boolean keepAlive = HttpUtil.isKeepAlive(msg);
    String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

            // to json
            String responseJson = GsonTool.toJson(responseObj);

            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

    // valid
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    if (accessToken!=null
            && accessToken.trim().length()>0
            && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    try {
        if ("/beat".equals(uri)) {
            return executorBiz.beat(); // heartbeat
        } else if ("/idleBeat".equals(uri)) {
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        } else if ("/run".equals(uri)) { // run the job
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) {
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) {
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}

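Taking /run as the example: the executor looks up the jobHandler in jobHandlerRepository and queues the trigger in triggerQueue to await execution. The JobThread pulls triggers from that queue and calls the handler's execute method, then puts the result (executeResult) into the TriggerCallbackThread's queue. The callback thread finally calls the scheduling center's REST endpoint api/callback to update the scheduling log.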
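The queue hand-off described above can be sketched with two blocking queues and one worker thread. Everything here is a simplified assumption for illustration: `JobThreadDemo`, `Trigger`, and `Callback` are hypothetical names, the handler is a plain `Function`, the 3-second poll timeout is arbitrary, and the final HTTP callback to the scheduling center is left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class JobThreadDemo {

    static final class Trigger {
        final long logId;
        final String param;

        Trigger(long logId, String param) {
            this.logId = logId;
            this.param = param;
        }
    }

    static final class Callback {
        final long logId;
        final String result;

        Callback(long logId, String result) {
            this.logId = logId;
            this.result = result;
        }
    }

    private final BlockingQueue<Trigger> triggerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Callback> callbackQueue;
    private final Function<String, String> handler;
    private final Thread worker;
    private volatile boolean toStop = false;

    JobThreadDemo(Function<String, String> handler, BlockingQueue<Callback> callbackQueue) {
        this.handler = handler;
        this.callbackQueue = callbackQueue;
        this.worker = new Thread(this::loop, "demo JobThread");
        this.worker.setDaemon(true);
    }

    void start() {
        worker.start();
    }

    // Called by the request-handling side: enqueue and return immediately.
    void pushTrigger(long logId, String param) {
        triggerQueue.add(new Trigger(logId, param));
    }

    void stop() {
        toStop = true;
        worker.interrupt();
    }

    // Worker side: pull a trigger, run the handler, hand the result to the callback queue.
    private void loop() {
        while (!toStop) {
            try {
                Trigger trigger = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (trigger == null) {
                    continue;
                }
                String result;
                try {
                    result = handler.apply(trigger.param);
                } catch (RuntimeException e) {
                    result = "FAIL: " + e;
                }
                callbackQueue.add(new Callback(trigger.logId, result));
            } catch (InterruptedException e) {
                // stop() interrupts the poll; the loop condition ends the thread
            }
        }
    }

    // Pushes one trigger through the pipeline and returns "logId=result".
    static String runOnce() {
        BlockingQueue<Callback> callbacks = new LinkedBlockingQueue<>();
        JobThreadDemo jobThread = new JobThreadDemo(param -> "done:" + param, callbacks);
        jobThread.start();
        jobThread.pushTrigger(42L, "hello");
        try {
            Callback callback = callbacks.poll(5L, TimeUnit.SECONDS);
            return callback == null ? "timeout" : callback.logId + "=" + callback.result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            jobThread.stop();
        }
    }
}
```

The point of the two queues is decoupling: the Netty business thread only enqueues and returns, the job runs on its own thread, and reporting the result back is a separate concern again.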

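Summary: although xxl-job is lightweight as middleware goes (the learning curve is low), many of its design ideas are worth studying and borrowing. It also touches a wide range of topics: Netty, thread pools, queues, Spring, and more. My own ability is limited, so please point out anything I have summarized incorrectly!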