SpringBoot分布式事务的解决方案(JTA+Atomic+多数据源)

首先,到底啥是分布式事务呢,比如我们在执行一个业务逻辑的时候有两步分别操作a数据源和b数据源,当我们在a数据源执行数据更改后,在b数据源执行时出现运行时异常,那么我们必须要让b数据源的操作回滚,并回滚对a数据源的操作;这种情况在支付业务时常常出现;比如买票业务在最后支付失败,那之前的操作必须全部回滚,如果之前的操作分布在多个数据源中,那么这就是典型的分布式事务回滚;
了解了什么是分布式事务,那分布式事务在java的解决方案就是jta(即java transaction api);springboot官方提供了 atomikos or bitronix的解决思路;
其实,大多数情况下很多公司是使用消息队列的方式实现分布式事务。
本篇文章重点讲解springboot环境下,整合 atomikos +mysql+mybatis+tomcat/jetty;
一、项目依赖 pom.xml中添加atomikos的springboot相关依赖:
    org.springframework.boot    spring-boot-starter-jta-atomikos 点进去会发现里面整合好了:transactions-jms、transactions-jta、transactions-jdbc、javax.transaction-api
二、把数据源的相关配置项单独提炼到一个application.yml中: 注意:
这回我们的spring.datasource.type 是com.alibaba.druid.pool.xa.druidxadatasource;
spring.jta.transaction-manager-id的值在你的电脑中是唯一的,这个详细请阅读官方文档;
完整的yml文件如下:
spring:  datasource:    type: com.alibaba.druid.pool.xa.druidxadatasource    druid:          systemdb:        name: systemdb        url: jdbc//localhost:3306/springboot-mybatis?useunicode=true&characterencoding=utf-8        username: root        password: root        # 下面为连接池的补充设置,应用到上面所有数据源中        # 初始化大小,最小,最大        initialsize: 5        minidle: 5        maxactive: 20        # 配置获取连接等待超时的时间        maxwait: 60000        # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒        timebetweenevictionrunsmillis: 60000        # 配置一个连接在池中最小生存的时间,单位是毫秒        minevictableidletimemillis: 30        validationquery: select 1        validationquerytimeout: 10000        testwhileidle: true        testonborrow: false        testonreturn: false        # 打开pscache,并且指定每个连接上pscache的大小        poolpreparedstatements: true        maxpoolpreparedstatementperconnectionsize: 20        filters: stat,wall        # 通过connectproperties属性来打开mergesql功能;慢sql记录        connectionproperties: druid.stat.mergesql=true;druid.stat.slowsqlmillis=5000        # 合并多个druiddatasource的监控数据        useglobaldatasourcestat: true       businessdb:        name: businessdb         url: jdbc//localhost:3306/springboot-mybatis2?useunicode=true&characterencoding=utf-8        username: root        password: root        # 下面为连接池的补充设置,应用到上面所有数据源中        # 初始化大小,最小,最大        initialsize: 5        minidle: 5        maxactive: 20        # 配置获取连接等待超时的时间        maxwait: 60000        # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒        timebetweenevictionrunsmillis: 60000        # 配置一个连接在池中最小生存的时间,单位是毫秒        minevictableidletimemillis: 30        validationquery: select 1        validationquerytimeout: 10000        testwhileidle: true        testonborrow: false        testonreturn: false        # 打开pscache,并且指定每个连接上pscache的大小        poolpreparedstatements: true        maxpoolpreparedstatementperconnectionsize: 20        filters: stat,wall        # 通过connectproperties属性来打开mergesql功能;慢sql记录        connectionproperties: druid.stat.mergesql=true;druid.stat.slowsqlmillis=5000        # 合并多个druiddatasource的监控数据        useglobaldatasourcestat: true   #jta相关参数配置  jta:    log-dir: classpath:tx-logs    transaction-manager-id: txmanager 三、在druidconfig.java中实现多个数据源的注册;分布式事务管理器的注册;druid的注册; package com.zjt.config; import com.alibaba.druid.filter.stat.statfilter;import com.alibaba.druid.support.http.statviewservlet;import com.alibaba.druid.support.http.webstatfilter;import com.alibaba.druid.wall.wallconfig;import com.alibaba.druid.wall.wallfilter;import com.atomikos.icatch.jta.usertransactionimp;import com.atomikos.icatch.jta.usertransactionmanager;import org.springframework.beans.factory.annotation.autowired;import org.springframework.boot.jta.atomikos.atomikosdatasourcebean;import org.springframework.boot.web.servlet.filterregistrationbean;import org.springframework.boot.web.servlet.servletregistrationbean;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;import org.springframework.context.annotation.primary;import org.springframework.core.env.environment;import org.springframework.transaction.jta.jtatransactionmanager; import javax.sql.datasource;import javax.transaction.usertransaction;import java.util.properties; /** * druid配置 * *  */@configurationpublic class druidconfig {    @bean(name = systemdatasource)    @primary    @autowired    public datasource systemdatasource(environment env) {        atomikosdatasourcebean ds = new atomikosdatasourcebean();        properties prop = build(env, spring.datasource.druid.systemdb.);        ds.setxadatasourceclassname(com.alibaba.druid.pool.xa.druidxadatasource);        ds.setuniqueresourcename(systemdb);        ds.setpoolsize(5);        ds.setxaproperties(prop);        return ds;     }     @autowired    @bean(name = businessdatasource)    public atomikosdatasourcebean businessdatasource(environment env) {         atomikosdatasourcebean ds = new atomikosdatasourcebean();        properties prop = build(env, spring.datasource.druid.businessdb.);        ds.setxadatasourceclassname(com.alibaba.druid.pool.xa.druidxadatasource);        ds.setuniqueresourcename(businessdb);        ds.setpoolsize(5);        ds.setxaproperties(prop);         return ds;    }      /**     * 注入事物管理器     * @return     */    @bean(name = xatx)    public jtatransactionmanager regtransactionmanager () {        usertransactionmanager usertransactionmanager = new usertransactionmanager();        usertransaction usertransaction = new usertransactionimp();        return new jtatransactionmanager(usertransaction, usertransactionmanager);    }      private properties build(environment env, string prefix) {         properties prop = new properties();        prop.put(url, env.getproperty(prefix + url));        prop.put(username, env.getproperty(prefix + username));        prop.put(password, env.getproperty(prefix + password));        prop.put(driverclassname, env.getproperty(prefix + driverclassname, ));        prop.put(initialsize, env.getproperty(prefix + initialsize, integer.class));        prop.put(maxactive, env.getproperty(prefix + maxactive, integer.class));        prop.put(minidle, env.getproperty(prefix + minidle, integer.class));        prop.put(maxwait, env.getproperty(prefix + maxwait, integer.class));        prop.put(poolpreparedstatements, env.getproperty(prefix + poolpreparedstatements, boolean.class));         prop.put(maxpoolpreparedstatementperconnectionsize,                env.getproperty(prefix + maxpoolpreparedstatementperconnectionsize, integer.class));         prop.put(maxpoolpreparedstatementperconnectionsize,                env.getproperty(prefix + maxpoolpreparedstatementperconnectionsize, integer.class));        prop.put(validationquery, env.getproperty(prefix + validationquery));        prop.put(validationquerytimeout, env.getproperty(prefix + validationquerytimeout, integer.class));        prop.put(testonborrow, env.getproperty(prefix + testonborrow, boolean.class));        prop.put(testonreturn, env.getproperty(prefix + testonreturn, boolean.class));        prop.put(testwhileidle, env.getproperty(prefix + testwhileidle, boolean.class));        prop.put(timebetweenevictionrunsmillis,                env.getproperty(prefix + timebetweenevictionrunsmillis, integer.class));        prop.put(minevictableidletimemillis, env.getproperty(prefix + minevictableidletimemillis, integer.class));        prop.put(filters, env.getproperty(prefix + filters));         return prop;    }     @bean    public servletregistrationbean druidservlet() {        servletregistrationbean servletregistrationbean = new servletregistrationbean(new statviewservlet(), /druid/*);         //控制台管理用户,加入下面2行 进入druid后台就需要登录        //servletregistrationbean.addinitparameter(loginusername, admin);        //servletregistrationbean.addinitparameter(loginpassword, admin);        return servletregistrationbean;    }     @bean    public filterregistrationbean filterregistrationbean() {        filterregistrationbean filterregistrationbean = new filterregistrationbean();        filterregistrationbean.setfilter(new webstatfilter());        filterregistrationbean.addurlpatterns(/*);        filterregistrationbean.addinitparameter(exclusions, *.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*);        filterregistrationbean.addinitparameter(profileenable, true);        return filterregistrationbean;    }     @bean    public statfilter statfilter(){        statfilter statfilter = new statfilter();        statfilter.setlogslowsql(true); //slowsqlmillis用来配置sql慢的标准,执行时间超过slowsqlmillis的就是慢。        statfilter.setmergesql(true); //sql合并配置        statfilter.setslowsqlmillis(1000);//slowsqlmillis的缺省值为3000,也就是3秒。        return statfilter;    }     @bean    public wallfilter wallfilter(){        wallfilter wallfilter = new wallfilter();        //允许执行多条sql        wallconfig config = new wallconfig();        config.setmultistatementallow(true);        wallfilter.setconfig(config);        return wallfilter;    } } 四、分别配置每个数据源对应的sqlsessionfactory,以及mapperscan扫描的包: mybatisdatasourceconfig.java
package com.zjt.config; import com.zjt.util.mymapper;import org.apache.ibatis.session.sqlsessionfactory;import org.mybatis.spring.sqlsessionfactorybean;import org.mybatis.spring.sqlsessiontemplate;import org.mybatis.spring.annotation.mapperscan;import org.springframework.beans.factory.annotation.autowired;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;import org.springframework.core.io.support.pathmatchingresourcepatternresolver;import org.springframework.core.io.support.resourcepatternresolver; import javax.sql.datasource; /** *  * @description */@configuration// 精确到 mapper 目录,以便跟其他数据源隔离@mapperscan(basepackages = com.zjt.mapper, markerinterface = mymapper.class, sqlsessionfactoryref = sqlsessionfactory)public class mybatisdatasourceconfig {     @autowired    @qualifier(systemdatasource)    private datasource ds;     @bean    public sqlsessionfactory sqlsessionfactory() throws exception {        sqlsessionfactorybean factorybean = new sqlsessionfactorybean();        factorybean.setdatasource(ds);        //指定mapper xml目录        resourcepatternresolver resolver = new pathmatchingresourcepatternresolver();        factorybean.setmapperlocations(resolver.getresources(classpath:mapper/*.xml));        return factorybean.getobject();     }     @bean    public sqlsessiontemplate sqlsessiontemplate() throws exception {        sqlsessiontemplate template = new sqlsessiontemplate(sqlsessionfactory()); // 使用上面配置的factory        return template;    }     //关于事务管理器,不管是jpa还是jdbc等都实现自接口 platformtransactionmanager    // 如果你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 datasourcetransactionmanager 实例。    //在spring容器中,我们手工注解@bean 将被优先加载,框架不会重新实例化其他的 platformtransactionmanager 实现类。    /*@bean(name = transactionmanager)    @primary    public datasourcetransactionmanager mastertransactionmanager() {        //mybatis自动参与到spring事务管理中,无需额外配置,只要org.mybatis.spring.sqlsessionfactorybean引用的数据源        // 与datasourcetransactionmanager引用的数据源一致即可,否则事务管理会不起作用。        return new datasourcetransactionmanager(ds);    }*/ } mybatisdatasource2config.java
package com.zjt.config; import com.zjt.util.mymapper;import org.apache.ibatis.session.sqlsessionfactory;import org.mybatis.spring.sqlsessionfactorybean;import org.mybatis.spring.sqlsessiontemplate;import org.mybatis.spring.annotation.mapperscan;import org.springframework.beans.factory.annotation.autowired;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;import org.springframework.core.io.support.pathmatchingresourcepatternresolver;import org.springframework.core.io.support.resourcepatternresolver; import javax.sql.datasource; /** *  * @description */@configuration// 精确到 mapper 目录,以便跟其他数据源隔离@mapperscan(basepackages = com.zjt.mapper2, markerinterface = mymapper.class, sqlsessionfactoryref = sqlsessionfactory2)public class mybatisdatasource2config {     @autowired    @qualifier(businessdatasource)    private datasource ds;     @bean    public sqlsessionfactory sqlsessionfactory2() throws exception {        sqlsessionfactorybean factorybean = new sqlsessionfactorybean();        factorybean.setdatasource(ds);        //指定mapper xml目录        resourcepatternresolver resolver = new pathmatchingresourcepatternresolver();        factorybean.setmapperlocations(resolver.getresources(classpath:mapper2/*.xml));        return factorybean.getobject();     }     @bean    public sqlsessiontemplate sqlsessiontemplate2() throws exception {        sqlsessiontemplate template = new sqlsessiontemplate(sqlsessionfactory2()); // 使用上面配置的factory        return template;    }     //关于事务管理器,不管是jpa还是jdbc等都实现自接口 platformtransactionmanager    // 如果你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 datasourcetransactionmanager 实例。    //在spring容器中,我们手工注解@bean 将被优先加载,框架不会重新实例化其他的 platformtransactionmanager 实现类。    /*@bean(name = transactionmanager2)    @primary    public datasourcetransactionmanager mastertransactionmanager() {        //mybatis自动参与到spring事务管理中,无需额外配置,只要org.mybatis.spring.sqlsessionfactorybean引用的数据源        // 与datasourcetransactionmanager引用的数据源一致即可,否则事务管理会不起作用。        return new datasourcetransactionmanager(ds);    }*/ } 由于我们本例中只使用一个事务管理器:xatx,故就不在使用txadviceinterceptor.java和txadvice2interceptor.java中配置的事务管理器了;有需求的童鞋可以自己配置其他的事务管理器;(见druidconfig.java中查看)
五、新建分布式业务测试接口jtatestservice.java和实现类jtatestserviceimpl.java 其实就是一个很简单的test01()方法,在该方法中我们分别先后调用classservice.saveorupdatetclass(tclass);和teacherservice.saveorupdateteacher(teacher);
实现先后操作两个数据源:然后我们可以自己debug跟踪事务的提交时机,此外,也可以在在两个方法全执行结束之后,手动制造一个运行时异常,来检查分布式事务是否全部回滚;
注意:
在实现类的方法中我使用的是:
@transactional(transactionmanager = xatx, propagation = propagation.required, rollbackfor = { java.lang.runtimeexception.class }) 从而指定了使用哪个事务管理器,事务隔离级别(一般都用我这个默认的),回滚的条件(一般可以使用exception),这三个可以自己根据业务实际修改;
package com.zjt.service3; import java.util.map; public interface jtatestservice {     public map test01(); } package com.zjt.service3.impl;  import com.zjt.entity.tclass;import com.zjt.entity.teacher;import com.zjt.service.tclassservice;import com.zjt.service2.teacherservice;import com.zjt.service3.jtatestservice;import org.springframework.beans.factory.annotation.autowired;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.stereotype.service;import org.springframework.transaction.annotation.propagation;import org.springframework.transaction.annotation.transactional; import java.util.linkedhashmap;import java.util.map; @service(jtatestserviceimpl)public class jtatestserviceimpl implements jtatestservice{     @autowired    @qualifier(teacherserviceimpl)    private teacherservice teacherservice;    @autowired    @qualifier(tclassserviceimpl)    private tclassservice tclassservice;     @override    @transactional(transactionmanager = xatx, propagation = propagation.required, rollbackfor = { java.lang.runtimeexception.class })    public map test01() {        linkedhashmap resultmap=new linkedhashmap();        tclass tclass=new tclass();        tclass.setname(8888);        tclassservice.saveorupdatetclass(tclass);         teacher teacher=new teacher();        teacher.setname(8888);        teacherservice.saveorupdateteacher(teacher);         system.out.println(1/0);         resultmap.put(state,success);        resultmap.put(message,分布式事务同步成功);        return resultmap;    }} 六、建立jtatestcontoller.java,接受一个来自前端的http请求,触发jtatestservice 的test01方法: package com.zjt.web;  import com.zjt.service3.jtatestservice;import org.springframework.beans.factory.annotation.autowired;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.stereotype.controller;import org.springframework.web.bind.annotation.requestmapping;import org.springframework.web.bind.annotation.responsebody; import java.util.linkedhashmap;import java.util.map; @controller@requestmapping(/jtatest)public class jtatestcontoller {     @autowired    @qualifier(jtatestserviceimpl)    private jtatestservice tatestservice;       @responsebody    @requestmapping(/test01)    public map test01(){        linkedhashmap resultmap=new linkedhashmap();        try {            return tatestservice.test01();        }catch (exception e){            resultmap.put(state,fail);            resultmap.put(message,分布式事务同步失败);            return resultmap;        }    }} 七、在test.ftl中增加一个按钮来测试; //分布式事务测试$(#jtatest).click(function(){ $.ajax({ type: post, url: ${basepath!}/jtatest/test01, data: {} , async: false, error: function (request) { layer.alert(与服务器连接失败/(ㄒoㄒ)/~~); return false; }, success: function (data) { if (data.state == 'fail') { layer.alert(data.message); return false; }else if(data.state == 'success'){ layer.alert(data.message); } } });}); 同时向班级和老师表插入名为8888的班级和老师 八、启动服务,验证结果: 点击这个按钮,跳转到controller:
当正常执行了sql语句之后,我们可以发现数据库并没有变化,因为整个方法的事务还没有走完,当我们走到1/0这步时:
抛出运行时异常,并被spring事务拦截器拦截,并捕获异常:
在this.completetransactionafterthrowing(txinfo, var16);方法中会将事务全部回滚:
2204.243 logback [http-nio-8080-exec-5] info c.a.i.imp.compositetransactionimp - rollback() done of transaction 192.168.1.103.tm0000400006 此时,当我们再次打开数据库验证,依旧没有变化,证明分布式事务配置成功;
大家可以基于我的代码自己练习一下,自己尝试着使用多事务管理器的情况下的灵活配置;
九、后记: 本文源代码:
https://github.com/zhaojiatao/springboot-zjt-chapter10-springboot-atomikos-mysql-mybatis-druid.git
代码在tomcat和jetty环境下均可完成事务回滚;
在事务回滚时可能报一个transactional not active的警告,我google后,老外也说不出这个具体作用,大部分人认为这只是一个警告,可以忽略;
-end-

稀土元素在电镀设备技术中的应用
如何在博途环境下载但不重新初始化数据块呢?
新能源汽车电子电气架构及高压连接器的发展
叮咚2代智能音箱拆机图详解_叮咚2代智能音箱深度评测
西藏首个虚拟现实和增强现实技术应用协会揭牌
SpringBoot分布式事务的解决方案(JTA+Atomic+多数据源)
一种产生激光脉冲新方法
助力世界一流能源企业建设 央企探索人工智能管理应用
盘点物联网行业的最新资讯
环信语音连麦聊天室体验指南
桁架机械手维修方式?
优炫软件与蜀道智慧交通集团签署战略合作协议,打造智慧交通新形态
激光点云感知三维空间介绍
上海积塔半导体特色工艺生产线项目将于本月开工!
警惕工业自动化控制系统所带来的安全风险
S32G3量产,S32G汽车网络处理器规模翻番!这颗芯为何这么受欢迎?
发那科iRVision 机器视觉堆垛程序概述
低于1KB的低频振荡器电路图
无人机技术日益成熟
锂电池防爆箱成为锂电池发展的助推剂