一、前言
异步执行对于开发者来说并不陌生,在实际的开发过程中,很多场景多会使用到异步,相比同步执行,异步可以大大缩短请求链路耗时时间,比如:「发送短信、邮件、异步更新等」,这些都是典型的可以通过异步实现的场景。本文介绍了java实现异步编程的八种方式
二、异步的八种实现方式
线程thread
future
异步框架completablefuture
spring注解@async
spring applicationevent事件
消息队列
第三方异步框架,比如hutool的threadutil
guava异步
三、什么是异步?
首先我们先看一个常见的用户下单的场景:
在同步操作中,我们执行到 「发送短信」 的时候,我们必须等待这个方法彻底执行完才能执行 「赠送积分」 这个操作,如果 「赠送积分」 这个动作执行时间较长,发送短信需要等待,这就是典型的同步场景。
实际上,发送短信和赠送积分没有任何的依赖关系,通过异步,我们可以实现赠送积分和发送短信这两个操作能够同时进行,比如:
这就是所谓的异步,是不是非常简单,下面就说说异步的几种实现方式吧。
四、异步编程
4.1 线程异步
public class asyncthread extends thread { @override public void run() { system.out.println(current thread name: + thread.currentthread().getname() + send email success!); } public static void main(string[] args) { asyncthread asyncthread = new asyncthread(); asyncthread.run(); }}
当然如果每次都创建一个thread线程,频繁的创建、销毁,浪费系统资源,我们可以采用线程池:
private executorservice executorservice = executors.newcachedthreadpool();public void fun() { executorservice.submit(new runnable() { @override public void run() { log.info(执行业务逻辑...); } });}
可以将业务逻辑封装到runnable或callable中,交由线程池来执行。
4.2 future异步
@slf4jpublic class futuremanager { public string execute() throws exception { executorservice executor = executors.newfixedthreadpool(1); future future = executor.submit(new callable() { @override public string call() throws exception { system.out.println( --- task start --- ); thread.sleep(3000); system.out.println( --- task finish ---); return this is future execute final result!!!; } }); //这里需要返回值时会阻塞主线程 string result = future.get(); log.info(future get result: {}, result); return result; } @sneakythrows public static void main(string[] args) { futuremanager manager = new futuremanager(); manager.execute(); }}
输出结果:
--- task start --- --- task finish --- future get result: this is future execute final result!!!
4.2.1 future的不足之处
future的不足之处的包括以下几点:
无法被动接收异步任务的计算结果:虽然我们可以主动将异步任务提交给线程池中的线程来执行,但是待异步任务执行结束之后,主线程无法得到任务完成与否的通知,它需要通过get方法主动获取任务执行的结果。
2. future件彼此孤立:有时某一个耗时很长的异步任务执行结束之后,你想利用它返回的结果再做进一步的运算,该运算也会是一个异步任务,两者之间的关系需要程序开发人员手动进行绑定赋予,future并不能将其形成一个任务流(pipeline),每一个future都是彼此之间都是孤立的,所以才有了后面的completablefuture,completablefuture就可以将多个future串联起来形成任务流。
3. futrue没有很好的错误处理机制:截止目前,如果某个异步任务在执行发的过程中发生了异常,调用者无法被动感知,必须通过捕获get方法的异常才知晓异步任务执行是否出现了错误,从而在做进一步的判断处理。
4.3 completablefuture实现异步
public class completablefuturecompose { /** * thenaccept子任务和父任务公用同一个线程 */ @sneakythrows public static void thenrunasync() { completablefuture cf1 = completablefuture.supplyasync(() -> { system.out.println(thread.currentthread() + cf1 do something....); return 1; }); completablefuture cf2 = cf1.thenrunasync(() -> { system.out.println(thread.currentthread() + cf2 do something...); }); //等待任务1执行完成 system.out.println(cf1结果-> + cf1.get()); //等待任务2执行完成 system.out.println(cf2结果-> + cf2.get()); } public static void main(string[] args) { thenrunasync(); }}
我们不需要显式使用executorservice,completablefuture 内部使用了forkjoinpool来处理异步任务,如果在某些业务场景我们想自定义自己的异步线程池也是可以的。
4.4 spring的@async异步
4.4.1 自定义异步线程池
/** * 线程池参数配置,多个线程池实现线程池隔离,@async注解,默认使用系统自定义线程池,可在项目中设置多个线程池,在异步调用的时候,指明需要调用的线程池名称,比如:@async(taskname)@enableasync@configurationpublic class taskpoolconfig { /** * 自定义线程池 * **/ @bean(taskexecutor) public executor taskexecutor() { //返回可用处理器的java虚拟机的数量 12 int i = runtime.getruntime().availableprocessors(); system.out.println(系统最大线程数 : + i); threadpooltaskexecutor executor = new threadpooltaskexecutor(); //核心线程池大小 executor.setcorepoolsize(16); //最大线程数 executor.setmaxpoolsize(20); //配置队列容量,默认值为integer.max_value executor.setqueuecapacity(99999); //活跃时间 executor.setkeepaliveseconds(60); //线程名字前缀 executor.setthreadnameprefix(asyncserviceexecutor -); //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行 executor.setawaitterminationseconds(60); //等待所有的任务结束后再关闭线程池 executor.setwaitfortaskstocompleteonshutdown(true); return executor; }}
4.4.2 asyncservice
public interface asyncservice { messageresult sendsms(string callprefix, string mobile, string actiontype, string content); messageresult sendemail(string email, string subject, string content);}@slf4j@servicepublic class asyncserviceimpl implements asyncservice { @autowired private imessagehandler mesagehandler; @override @async(taskexecutor) public messageresult sendsms(string callprefix, string mobile, string actiontype, string content) { try { thread.sleep(1000); mesagehandler.sendsms(callprefix, mobile, actiontype, content); } catch (exception e) { log.error(发送短信异常 -> , e) } } @override @async(taskexecutor) public sendemail(string email, string subject, string content) { try { thread.sleep(1000); mesagehandler.sendsendemail(email, subject, content); } catch (exception e) { log.error(发送email异常 -> , e) } }}
在实际项目中, 使用@async调用线程池,推荐等方式是是使用自定义线程池的模式,不推荐直接使用@async直接实现异步。
4.5 spring applicationevent事件实现异步
4.5.1 定义事件
public class asyncsendemailevent extends applicationevent { /** * 邮箱 **/ private string email; /** * 主题 **/ private string subject; /** * 内容 **/ private string content; /** * 接收者 **/ private string targetuserid;}
4.5.2 定义事件处理器
@slf4j@componentpublic class asyncsendemaileventhandler implements applicationlistener { @autowired private imessagehandler mesagehandler; @async(taskexecutor) @override public void onapplicationevent(asyncsendemailevent event) { if (event == null) { return; } string email = event.getemail(); string subject = event.getsubject(); string content = event.getcontent(); string targetuserid = event.gettargetuserid(); mesagehandler.sendsendemailsms(email, subject, content, targeruserid); }}
另外,可能有些时候采用applicationevent实现异步的使用,当程序出现异常错误的时候,需要考虑补偿机制,那么这时候可以结合spring retry重试来帮助我们避免这种异常造成数据不一致问题。
4.6 消息队列
4.6.1 回调事件消息生产者
@slf4j@componentpublic class callbackproducer { @autowired amqptemplate amqptemplate; public void sendcallbackmessage(callbackdto allbackdto, final long delaytimes) { log.info(生产者发送消息,callbackdto,{}, callbackdto); amqptemplate.convertandsend(callbackqueueenum.queue_gensee_callback.getexchange(), callbackqueueenum.queue_gensee_callback.getroutingkey(), jsonmapper.getinstance().tojson(genseecallbackdto), new messagepostprocessor() { @override public message postprocessmessage(message message) throws amqpexception { //给消息设置延迟毫秒值,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间 message.getmessageproperties().setheader(x-delay, delaytimes); message.getmessageproperties().setcorrelationid(callbackdto.getsdkid()); return message; } }); }}
4.6.2 回调事件消息消费者
@slf4j@component@rabbitlistener(queues = message.callback, containerfactory = rabbitlistenercontainerfactory)public class callbackconsumer { @autowired private iglobaluserservice globaluserservice; @rabbithandler public void handle(string json, channel channel, @headers map map) throws exception { if (map.get(error) != null) { //否认消息 channel.basicnack((long) map.get(amqpheaders.delivery_tag), false, true); return; } try { callbackdto callbackdto = jsonmapper.getinstance().fromjson(json, callbackdto.class); //执行业务逻辑 globaluserservice.execute(callbackdto); //消息消息成功手动确认,对应消息确认模式acknowledge-mode: manual channel.basicack((long) map.get(amqpheaders.delivery_tag), false); } catch (exception e) { log.error(回调失败 -> {}, e); } }}
4.7 threadutil异步工具类
@slf4jpublic class threadutils { public static void main(string[] args) { for (int i = 0; i { threadlocalrandom threadlocalrandom = threadlocalrandom.current(); int number = threadlocalrandom.nextint(20) + 1; system.out.println(number); }); log.info(当前第: + i + 个线程); } log.info(task finish!); }}
4.8 guava异步
guava的listenablefuture顾名思义就是可以监听的future,是对java原生future的扩展增强。我们知道future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用「guava listenablefuture」可以帮我们检测future是否完成了,不需要再通过get()方法苦苦等待异步的计算结果,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。
listenablefuture是一个接口,它从jdk的future接口继承,添加了void addlistener(runnable listener, executor executor)方法。
我们看下如何使用listenablefuture。首先需要定义listenablefuture的实例:
listeningexecutorservice executorservice = moreexecutors.listeningdecorator(executors.newcachedthreadpool()); final listenablefuture listenablefuture = executorservice.submit(new callable() { @override public integer call() throws exception { log.info(callable execute...) timeunit.seconds.sleep(1); return 1; } });
首先通过moreexecutors类的静态方法listeningdecorator方法初始化一个listeningexecutorservice的方法,然后使用此实例的submit方法即可初始化listenablefuture对象。
listenablefuture要做的工作,在callable接口的实现类中定义,这里只是休眠了1秒钟然后返回一个数字1,有了listenablefuture实例,可以执行此future并执行future完成之后的回调函数。
futures.addcallback(listenablefuture, new futurecallback() { @override public void onsuccess(integer result) { //成功执行... system.out.println(get listenable future's result with callback + result); } @override public void onfailure(throwable t) { //异常情况处理... t.printstacktrace(); }});
那么,以上就是本期介绍的实现异步的8种方式了。
全球物联网百家争鸣 中国有望成为全球物联网领航者
海信AWE集中队伍“主场秀肌肉”,“Hi,冠军”或成大展C位看点
一级浪涌保护器和二级浪涌保护器的区别是什么?
关于太赫兹辐射在医学领域中的应用分析
首个集成磁性传感器和有机电路的柔性电子产品问世
Java实现异步编程的八种实现方式
努比亚Z17mini和荣耀8哪个值得买?你想知道的都在这里
无线网络中怎么隐藏电脑
C语言中strlen和sizeof的区别
智能化妆镜的魅力,它能否刺激你的购买欲望
S7-300 PLC:I/O 访问故障组织块
苹果承认将在iPhone中弃用高通5G基带
抛开情怀,诺基亚9是一款非常有实力的旗舰
蔚来在新交所成功上市,港股受其影响上涨8.82%
ASUS智能路由器RT-AC5300怎么样 值不值得买
简单认识俊凯达101W大功率快充车充
机械硬盘价格暴跌,出货量触底反弹
闻泰科技经营资金盈余亮眼 偿还贷款绰绰有余
永贵电器发布2019年第三季度报告营业收入为29138660799元同比减少14.27%
香橙派Orange pi i 96技术分析