xxl-job通信设计流程

通信底层介绍
xxl-job 使用 netty http 的方式进行通信,虽然也支持 mina,jetty,netty tcp 等方式,但是代码里面固定写死的是 netty http。
通信整体流程
我以调度器通知执行器执行任务为例,绘制的活动图:
活动图
惊艳的设计
看完了整个处理流程代码,设计上可以说独具匠心,将 netty,多线程的知识运用得行云流水。
我现在就将这些设计上出彩的点总结如下:
使用动态代理模式,隐藏通信细节
xxl-job 定义了两个接口 executorbiz,adminbiz,executorbiz 接口中封装了向心跳,暂停,触发执行等操作,adminbiz 封装了回调,注册,取消注册操作,接口的实现类中,并没有通信相关的处理。
xxlrpcreferencebean 类的 getobject() 方法会生成一个代理类,这个代理类会进行远程通信。
全异步处理
执行器收到消息进行反序列化,并没有同步执行任务代码,而是将任务信息存储在 linkedblockingqueue 中,异步线程从这个队列中获取任务信息,然后执行。
而任务的处理结果,也不是说处理完之后,同步返回的,也是放到回调线程的阻塞队列中,异步的将处理结果返回回去。
这样处理的好处就是减少了 netty 工作线程的处理时间,提升了吞吐量。
对异步处理的包装
对异步处理进行了包装,代码看起来是同步调用的。
我们看下调度器,xxljobtrigger 类触发任务执行的代码:
public static returnt runexecutor(triggerparam triggerparam, string address){    returnt runresult = null;    try {        executorbiz executorbiz = xxljobscheduler.getexecutorbiz(address);        //这里面做了很多异步处理,最终同步得到处理结果        runresult = executorbiz.run(triggerparam);    } catch (exception e) {        logger.error(>>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running., address, e);        runresult = new returnt(returnt.fail_code, throwableutil.tostring(e));    }    stringbuffer runresultsb = new stringbuffer(i18nutil.getstring(jobconf_trigger_run) + :);    runresultsb.append(
address:).append(address);    runresultsb.append(
code:).append(runresult.getcode());    runresultsb.append(
msg:).append(runresult.getmsg());    runresult.setmsg(runresultsb.tostring());    return runresult;} executorbiz.run 方法我们说过了,是走的动态代理,和执行器进行通信,执行器执行结果也是异步处理完,才返回的,而这里看到的 run 方法是同步等待处理结果返回。
我们看下xxl-job是如何同步获取处理结果的:调度器向执行器发出消息后,该线程阻塞。等到执行器处理完毕后,将处理结果返回,唤醒被阻塞的线程,调用处拿到返回值。
动态代理代码如下:
//代理类中的触发调用if (calltype.sync == calltype) {   // future-response set   xxlrpcfutureresponse futureresponse = new xxlrpcfutureresponse(invokerfactory, xxlrpcrequest, null);   try {      // do invoke      client.asyncsend(finaladdress, xxlrpcrequest);      // future get      xxlrpcresponse xxlrpcresponse = futureresponse.get(timeout, timeunit.milliseconds);      if (xxlrpcresponse.geterrormsg() != null) {         throw new xxlrpcexception(xxlrpcresponse.geterrormsg());      }      return xxlrpcresponse.getresult();   } catch (exception e) {      logger.info(>>>>>>>>>>> xxl-rpc, invoke error, address:{}, xxlrpcrequest{}, finaladdress, xxlrpcrequest);      throw (e instanceof xxlrpcexception)?e:new xxlrpcexception(e);   } finally{      // future-response remove      futureresponse.removeinvokerfuture();   }}  xxlrpcfutureresponse 类中实现了线程的等待,和线程唤醒的处理:
//返回结果,唤醒线程public void setresponse(xxlrpcresponse response) {   this.response = response;   synchronized (lock) {      done = true;      lock.notifyall();   }}@override    public xxlrpcresponse get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception {        if (!done) {            synchronized (lock) {                try {                    if (timeout < 0) {            //线程阻塞                        lock.wait();                    } else {                        long timeoutmillis = (timeunit.milliseconds==unit)?timeout:timeunit.milliseconds.convert(timeout , unit);                        lock.wait(timeoutmillis);                    }                } catch (interruptedexception e) {                    throw e;                }            }        }        if (!done) {            throw new xxlrpcexception(xxl-rpc, request timeout at:+ system.currenttimemillis() +, request: + request.tostring());        }        return response;    }
有的同学可能会问了,调度器接收到返回结果,怎么确定唤醒哪个线程呢?
每一次远程调用,都会生成 uuid 的请求 id,这个 id 是在整个调用过程中一直传递的,就像一把钥匙,在你回家的的时候,拿着它就带开门。
这里拿着请求 id 这把钥匙,就能找到对应的 xxlrpcfutureresponse,然后调用 setresponse 方法,设置返回值,唤醒线程。
public void notifyinvokerfuture(string requestid, final xxlrpcresponse xxlrpcresponse){


iPhone8将通过双卡双待和双曲面屏向安卓机皇三星手机逼近
这几家芯片商加强布局,瞄准车用半导体市场
正式确认Redmi K30 Pro将采用Snapdragon 865 SoC
《程序设计与数据结构》——什么是类?
相机传感器的常用接口USB和GigE讲解
xxl-job通信设计流程
智能手机产业发展成熟 5G发展的机会与挑战
一款解决视觉处理功耗WiseEye IoT传感器解决方案
如何给电脑USB端口加锁
泰克携手第三代半导体产业技术创新战略联盟标准化委员会(CASAS)攻克第三代半导体测试难题
知行合一打开混合云之门
董明珠为何铁了心,孤注一掷也要造电动车?
18650电芯的容量是多少_18650电芯容量测试
关于iPhone13系列的最新消息
MAX5723,8/10/12位缓冲带内部参考和SPI接口输出DAC
使用NodeMCU构建低成本wifi中继器
Marvell公布旗下Arm服务器芯片,性能将提升至两倍
为远程无线设备供电:了解您的电池和应用
自制投影的灯电源
怎样使Bluetooth 4.0 CC2540模块连接到Android智能手机或平板电脑