首先,到底啥是分布式事务呢,比如我们在执行一个业务逻辑的时候有两步分别操作a数据源和b数据源,当我们在a数据源执行数据更改后,在b数据源执行时出现运行时异常,那么我们必须要让b数据源的操作回滚,并回滚对a数据源的操作;这种情况在支付业务时常常出现;比如买票业务在最后支付失败,那之前的操作必须全部回滚,如果之前的操作分布在多个数据源中,那么这就是典型的分布式事务回滚;
了解了什么是分布式事务,那分布式事务在java的解决方案就是jta(即java transaction api);springboot官方提供了 atomikos or bitronix的解决思路;
其实,大多数情况下很多公司是使用消息队列的方式实现分布式事务。
本篇文章重点讲解springboot环境下,整合 atomikos +mysql+mybatis+tomcat/jetty;
一、项目依赖 pom.xml中添加atomikos的springboot相关依赖:
org.springframework.boot spring-boot-starter-jta-atomikos 点进去会发现里面整合好了:transactions-jms、transactions-jta、transactions-jdbc、javax.transaction-api
二、把数据源的相关配置项单独提炼到一个application.yml中: 注意:
这回我们的spring.datasource.type 是com.alibaba.druid.pool.xa.druidxadatasource;
spring.jta.transaction-manager-id的值在你的电脑中是唯一的,这个详细请阅读官方文档;
完整的yml文件如下:
spring: datasource: type: com.alibaba.druid.pool.xa.druidxadatasource druid: systemdb: name: systemdb url: jdbc//localhost:3306/springboot-mybatis?useunicode=true&characterencoding=utf-8 username: root password: root # 下面为连接池的补充设置,应用到上面所有数据源中 # 初始化大小,最小,最大 initialsize: 5 minidle: 5 maxactive: 20 # 配置获取连接等待超时的时间 maxwait: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 timebetweenevictionrunsmillis: 60000 # 配置一个连接在池中最小生存的时间,单位是毫秒 minevictableidletimemillis: 30 validationquery: select 1 validationquerytimeout: 10000 testwhileidle: true testonborrow: false testonreturn: false # 打开pscache,并且指定每个连接上pscache的大小 poolpreparedstatements: true maxpoolpreparedstatementperconnectionsize: 20 filters: stat,wall # 通过connectproperties属性来打开mergesql功能;慢sql记录 connectionproperties: druid.stat.mergesql=true;druid.stat.slowsqlmillis=5000 # 合并多个druiddatasource的监控数据 useglobaldatasourcestat: true businessdb: name: businessdb url: jdbc//localhost:3306/springboot-mybatis2?useunicode=true&characterencoding=utf-8 username: root password: root # 下面为连接池的补充设置,应用到上面所有数据源中 # 初始化大小,最小,最大 initialsize: 5 minidle: 5 maxactive: 20 # 配置获取连接等待超时的时间 maxwait: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 timebetweenevictionrunsmillis: 60000 # 配置一个连接在池中最小生存的时间,单位是毫秒 minevictableidletimemillis: 30 validationquery: select 1 validationquerytimeout: 10000 testwhileidle: true testonborrow: false testonreturn: false # 打开pscache,并且指定每个连接上pscache的大小 poolpreparedstatements: true maxpoolpreparedstatementperconnectionsize: 20 filters: stat,wall # 通过connectproperties属性来打开mergesql功能;慢sql记录 connectionproperties: druid.stat.mergesql=true;druid.stat.slowsqlmillis=5000 # 合并多个druiddatasource的监控数据 useglobaldatasourcestat: true #jta相关参数配置 jta: log-dir: classpath:tx-logs transaction-manager-id: txmanager 三、在druidconfig.java中实现多个数据源的注册;分布式事务管理器的注册;druid的注册; package com.zjt.config; import com.alibaba.druid.filter.stat.statfilter;import com.alibaba.druid.support.http.statviewservlet;import com.alibaba.druid.support.http.webstatfilter;import com.alibaba.druid.wall.wallconfig;import com.alibaba.druid.wall.wallfilter;import com.atomikos.icatch.jta.usertransactionimp;import com.atomikos.icatch.jta.usertransactionmanager;import org.springframework.beans.factory.annotation.autowired;import org.springframework.boot.jta.atomikos.atomikosdatasourcebean;import org.springframework.boot.web.servlet.filterregistrationbean;import org.springframework.boot.web.servlet.servletregistrationbean;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;import org.springframework.context.annotation.primary;import org.springframework.core.env.environment;import org.springframework.transaction.jta.jtatransactionmanager; import javax.sql.datasource;import javax.transaction.usertransaction;import java.util.properties; /** * druid配置 * * */@configurationpublic class druidconfig { @bean(name = systemdatasource) @primary @autowired public datasource systemdatasource(environment env) { atomikosdatasourcebean ds = new atomikosdatasourcebean(); properties prop = build(env, spring.datasource.druid.systemdb.); ds.setxadatasourceclassname(com.alibaba.druid.pool.xa.druidxadatasource); ds.setuniqueresourcename(systemdb); ds.setpoolsize(5); ds.setxaproperties(prop); return ds; } @autowired @bean(name = businessdatasource) public atomikosdatasourcebean businessdatasource(environment env) { atomikosdatasourcebean ds = new atomikosdatasourcebean(); properties prop = build(env, spring.datasource.druid.businessdb.); ds.setxadatasourceclassname(com.alibaba.druid.pool.xa.druidxadatasource); ds.setuniqueresourcename(businessdb); ds.setpoolsize(5); ds.setxaproperties(prop); return ds; } /** * 注入事物管理器 * @return */ @bean(name = xatx) public jtatransactionmanager regtransactionmanager () { usertransactionmanager usertransactionmanager = new usertransactionmanager(); usertransaction usertransaction = new usertransactionimp(); return new jtatransactionmanager(usertransaction, usertransactionmanager); } private properties build(environment env, string prefix) { properties prop = new properties(); prop.put(url, env.getproperty(prefix + url)); prop.put(username, env.getproperty(prefix + username)); prop.put(password, env.getproperty(prefix + password)); prop.put(driverclassname, env.getproperty(prefix + driverclassname, )); prop.put(initialsize, env.getproperty(prefix + initialsize, integer.class)); prop.put(maxactive, env.getproperty(prefix + maxactive, integer.class)); prop.put(minidle, env.getproperty(prefix + minidle, integer.class)); prop.put(maxwait, env.getproperty(prefix + maxwait, integer.class)); prop.put(poolpreparedstatements, env.getproperty(prefix + poolpreparedstatements, boolean.class)); prop.put(maxpoolpreparedstatementperconnectionsize, env.getproperty(prefix + maxpoolpreparedstatementperconnectionsize, integer.class)); prop.put(maxpoolpreparedstatementperconnectionsize, env.getproperty(prefix + maxpoolpreparedstatementperconnectionsize, integer.class)); prop.put(validationquery, env.getproperty(prefix + validationquery)); prop.put(validationquerytimeout, env.getproperty(prefix + validationquerytimeout, integer.class)); prop.put(testonborrow, env.getproperty(prefix + testonborrow, boolean.class)); prop.put(testonreturn, env.getproperty(prefix + testonreturn, boolean.class)); prop.put(testwhileidle, env.getproperty(prefix + testwhileidle, boolean.class)); prop.put(timebetweenevictionrunsmillis, env.getproperty(prefix + timebetweenevictionrunsmillis, integer.class)); prop.put(minevictableidletimemillis, env.getproperty(prefix + minevictableidletimemillis, integer.class)); prop.put(filters, env.getproperty(prefix + filters)); return prop; } @bean public servletregistrationbean druidservlet() { servletregistrationbean servletregistrationbean = new servletregistrationbean(new statviewservlet(), /druid/*); //控制台管理用户,加入下面2行 进入druid后台就需要登录 //servletregistrationbean.addinitparameter(loginusername, admin); //servletregistrationbean.addinitparameter(loginpassword, admin); return servletregistrationbean; } @bean public filterregistrationbean filterregistrationbean() { filterregistrationbean filterregistrationbean = new filterregistrationbean(); filterregistrationbean.setfilter(new webstatfilter()); filterregistrationbean.addurlpatterns(/*); filterregistrationbean.addinitparameter(exclusions, *.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*); filterregistrationbean.addinitparameter(profileenable, true); return filterregistrationbean; } @bean public statfilter statfilter(){ statfilter statfilter = new statfilter(); statfilter.setlogslowsql(true); //slowsqlmillis用来配置sql慢的标准,执行时间超过slowsqlmillis的就是慢。 statfilter.setmergesql(true); //sql合并配置 statfilter.setslowsqlmillis(1000);//slowsqlmillis的缺省值为3000,也就是3秒。 return statfilter; } @bean public wallfilter wallfilter(){ wallfilter wallfilter = new wallfilter(); //允许执行多条sql wallconfig config = new wallconfig(); config.setmultistatementallow(true); wallfilter.setconfig(config); return wallfilter; } } 四、分别配置每个数据源对应的sqlsessionfactory,以及mapperscan扫描的包: mybatisdatasourceconfig.java
package com.zjt.config; import com.zjt.util.mymapper;import org.apache.ibatis.session.sqlsessionfactory;import org.mybatis.spring.sqlsessionfactorybean;import org.mybatis.spring.sqlsessiontemplate;import org.mybatis.spring.annotation.mapperscan;import org.springframework.beans.factory.annotation.autowired;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;import org.springframework.core.io.support.pathmatchingresourcepatternresolver;import org.springframework.core.io.support.resourcepatternresolver; import javax.sql.datasource; /** * * @description */@configuration// 精确到 mapper 目录,以便跟其他数据源隔离@mapperscan(basepackages = com.zjt.mapper, markerinterface = mymapper.class, sqlsessionfactoryref = sqlsessionfactory)public class mybatisdatasourceconfig { @autowired @qualifier(systemdatasource) private datasource ds; @bean public sqlsessionfactory sqlsessionfactory() throws exception { sqlsessionfactorybean factorybean = new sqlsessionfactorybean(); factorybean.setdatasource(ds); //指定mapper xml目录 resourcepatternresolver resolver = new pathmatchingresourcepatternresolver(); factorybean.setmapperlocations(resolver.getresources(classpath:mapper/*.xml)); return factorybean.getobject(); } @bean public sqlsessiontemplate sqlsessiontemplate() throws exception { sqlsessiontemplate template = new sqlsessiontemplate(sqlsessionfactory()); // 使用上面配置的factory return template; } //关于事务管理器,不管是jpa还是jdbc等都实现自接口 platformtransactionmanager // 如果你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 datasourcetransactionmanager 实例。 //在spring容器中,我们手工注解@bean 将被优先加载,框架不会重新实例化其他的 platformtransactionmanager 实现类。 /*@bean(name = transactionmanager) @primary public datasourcetransactionmanager mastertransactionmanager() { //mybatis自动参与到spring事务管理中,无需额外配置,只要org.mybatis.spring.sqlsessionfactorybean引用的数据源 // 与datasourcetransactionmanager引用的数据源一致即可,否则事务管理会不起作用。 return new datasourcetransactionmanager(ds); }*/ } mybatisdatasource2config.java
package com.zjt.config; import com.zjt.util.mymapper;import org.apache.ibatis.session.sqlsessionfactory;import org.mybatis.spring.sqlsessionfactorybean;import org.mybatis.spring.sqlsessiontemplate;import org.mybatis.spring.annotation.mapperscan;import org.springframework.beans.factory.annotation.autowired;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;import org.springframework.core.io.support.pathmatchingresourcepatternresolver;import org.springframework.core.io.support.resourcepatternresolver; import javax.sql.datasource; /** * * @description */@configuration// 精确到 mapper 目录,以便跟其他数据源隔离@mapperscan(basepackages = com.zjt.mapper2, markerinterface = mymapper.class, sqlsessionfactoryref = sqlsessionfactory2)public class mybatisdatasource2config { @autowired @qualifier(businessdatasource) private datasource ds; @bean public sqlsessionfactory sqlsessionfactory2() throws exception { sqlsessionfactorybean factorybean = new sqlsessionfactorybean(); factorybean.setdatasource(ds); //指定mapper xml目录 resourcepatternresolver resolver = new pathmatchingresourcepatternresolver(); factorybean.setmapperlocations(resolver.getresources(classpath:mapper2/*.xml)); return factorybean.getobject(); } @bean public sqlsessiontemplate sqlsessiontemplate2() throws exception { sqlsessiontemplate template = new sqlsessiontemplate(sqlsessionfactory2()); // 使用上面配置的factory return template; } //关于事务管理器,不管是jpa还是jdbc等都实现自接口 platformtransactionmanager // 如果你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 datasourcetransactionmanager 实例。 //在spring容器中,我们手工注解@bean 将被优先加载,框架不会重新实例化其他的 platformtransactionmanager 实现类。 /*@bean(name = transactionmanager2) @primary public datasourcetransactionmanager mastertransactionmanager() { //mybatis自动参与到spring事务管理中,无需额外配置,只要org.mybatis.spring.sqlsessionfactorybean引用的数据源 // 与datasourcetransactionmanager引用的数据源一致即可,否则事务管理会不起作用。 return new datasourcetransactionmanager(ds); }*/ } 由于我们本例中只使用一个事务管理器:xatx,故就不在使用txadviceinterceptor.java和txadvice2interceptor.java中配置的事务管理器了;有需求的童鞋可以自己配置其他的事务管理器;(见druidconfig.java中查看)
五、新建分布式业务测试接口jtatestservice.java和实现类jtatestserviceimpl.java 其实就是一个很简单的test01()方法,在该方法中我们分别先后调用classservice.saveorupdatetclass(tclass);和teacherservice.saveorupdateteacher(teacher);
实现先后操作两个数据源:然后我们可以自己debug跟踪事务的提交时机,此外,也可以在在两个方法全执行结束之后,手动制造一个运行时异常,来检查分布式事务是否全部回滚;
注意:
在实现类的方法中我使用的是:
@transactional(transactionmanager = xatx, propagation = propagation.required, rollbackfor = { java.lang.runtimeexception.class }) 从而指定了使用哪个事务管理器,事务隔离级别(一般都用我这个默认的),回滚的条件(一般可以使用exception),这三个可以自己根据业务实际修改;
package com.zjt.service3; import java.util.map; public interface jtatestservice { public map test01(); } package com.zjt.service3.impl; import com.zjt.entity.tclass;import com.zjt.entity.teacher;import com.zjt.service.tclassservice;import com.zjt.service2.teacherservice;import com.zjt.service3.jtatestservice;import org.springframework.beans.factory.annotation.autowired;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.stereotype.service;import org.springframework.transaction.annotation.propagation;import org.springframework.transaction.annotation.transactional; import java.util.linkedhashmap;import java.util.map; @service(jtatestserviceimpl)public class jtatestserviceimpl implements jtatestservice{ @autowired @qualifier(teacherserviceimpl) private teacherservice teacherservice; @autowired @qualifier(tclassserviceimpl) private tclassservice tclassservice; @override @transactional(transactionmanager = xatx, propagation = propagation.required, rollbackfor = { java.lang.runtimeexception.class }) public map test01() { linkedhashmap resultmap=new linkedhashmap(); tclass tclass=new tclass(); tclass.setname(8888); tclassservice.saveorupdatetclass(tclass); teacher teacher=new teacher(); teacher.setname(8888); teacherservice.saveorupdateteacher(teacher); system.out.println(1/0); resultmap.put(state,success); resultmap.put(message,分布式事务同步成功); return resultmap; }} 六、建立jtatestcontoller.java,接受一个来自前端的http请求,触发jtatestservice 的test01方法: package com.zjt.web; import com.zjt.service3.jtatestservice;import org.springframework.beans.factory.annotation.autowired;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.stereotype.controller;import org.springframework.web.bind.annotation.requestmapping;import org.springframework.web.bind.annotation.responsebody; import java.util.linkedhashmap;import java.util.map; @controller@requestmapping(/jtatest)public class jtatestcontoller { @autowired @qualifier(jtatestserviceimpl) private jtatestservice tatestservice; @responsebody @requestmapping(/test01) public map test01(){ linkedhashmap resultmap=new linkedhashmap(); try { return tatestservice.test01(); }catch (exception e){ resultmap.put(state,fail); resultmap.put(message,分布式事务同步失败); return resultmap; } }} 七、在test.ftl中增加一个按钮来测试; //分布式事务测试$(#jtatest).click(function(){ $.ajax({ type: post, url: ${basepath!}/jtatest/test01, data: {} , async: false, error: function (request) { layer.alert(与服务器连接失败/(ㄒoㄒ)/~~); return false; }, success: function (data) { if (data.state == 'fail') { layer.alert(data.message); return false; }else if(data.state == 'success'){ layer.alert(data.message); } } });}); 同时向班级和老师表插入名为8888的班级和老师 八、启动服务,验证结果: 点击这个按钮,跳转到controller:
当正常执行了sql语句之后,我们可以发现数据库并没有变化,因为整个方法的事务还没有走完,当我们走到1/0这步时:
抛出运行时异常,并被spring事务拦截器拦截,并捕获异常:
在this.completetransactionafterthrowing(txinfo, var16);方法中会将事务全部回滚:
2204.243 logback [http-nio-8080-exec-5] info c.a.i.imp.compositetransactionimp - rollback() done of transaction 192.168.1.103.tm0000400006 此时,当我们再次打开数据库验证,依旧没有变化,证明分布式事务配置成功;
大家可以基于我的代码自己练习一下,自己尝试着使用多事务管理器的情况下的灵活配置;
九、后记: 本文源代码:
https://github.com/zhaojiatao/springboot-zjt-chapter10-springboot-atomikos-mysql-mybatis-druid.git
代码在tomcat和jetty环境下均可完成事务回滚;
在事务回滚时可能报一个transactional not active的警告,我google后,老外也说不出这个具体作用,大部分人认为这只是一个警告,可以忽略;
-end-
稀土元素在电镀设备技术中的应用
如何在博途环境下载但不重新初始化数据块呢?
新能源汽车电子电气架构及高压连接器的发展
叮咚2代智能音箱拆机图详解_叮咚2代智能音箱深度评测
西藏首个虚拟现实和增强现实技术应用协会揭牌
SpringBoot分布式事务的解决方案(JTA+Atomic+多数据源)
一种产生激光脉冲新方法
助力世界一流能源企业建设 央企探索人工智能管理应用
盘点物联网行业的最新资讯
环信语音连麦聊天室体验指南
桁架机械手维修方式?
优炫软件与蜀道智慧交通集团签署战略合作协议,打造智慧交通新形态
激光点云感知三维空间介绍
上海积塔半导体特色工艺生产线项目将于本月开工!
警惕工业自动化控制系统所带来的安全风险
S32G3量产,S32G汽车网络处理器规模翻番!这颗芯为何这么受欢迎?
发那科iRVision 机器视觉堆垛程序概述
低于1KB的低频振荡器电路图
无人机技术日益成熟
锂电池防爆箱成为锂电池发展的助推剂