如何使用注解实现redis分布式锁!

一、业务背景
有些业务请求,属于耗时操作,需要加锁,防止后续的并发操作,同时对数据库的数据进行操作,需要避免对之前的业务造成影响。
二、分析流程
使用 redis 作为分布式锁,将锁的状态放到 redis 统一维护,解决集群中单机 jvm 信息不互通的问题,规定操作顺序,保护用户的数据正确。
梳理设计流程
1、新建注解 @interface,在注解里设定入参标志
2、增加 aop 切点,扫描特定注解
3、建立 @aspect 切面任务,注册 bean 和拦截特定方法
4、特定方法参数 proceedingjoinpoint,对方法 pjp.proceed() 前后进行拦截
5、切点前进行加锁,任务执行后进行删除 key
核心步骤:加锁、解锁和续时
加锁
使用了 redistemplate 的 opsforvalue.setifabsent 方法,判断是否有 key,设定一个随机数 uuid.random().tostring,生成一个随机数作为 value。
从 redis 中获取锁之后,对 key 设定 expire 失效时间,到期后自动释放锁。
按照这种设计,只有第一个成功设定 key 的请求,才能进行后续的数据操作,后续其它请求由于无法获得资源,将会失败结束。
超时问题
担心 pjp.proceed() 切点执行的方法太耗时,导致 redis 中的 key 由于超时提前释放了。
例如,线程 a 先获取锁,proceed 方法耗时,超过了锁超时时间,到期释放了锁,这时另一个线程 b 成功获取 redis 锁,两个线程同时对同一批数据进行操作,导致数据不准确。
解决方案:增加一个「续时」
任务不完成,锁不释放:
维护了一个定时线程池 scheduledexecutorservice,每隔 2s 去扫描加入队列中的 task,判断是否失效时间是否快到了,公式为:【失效时间】 {        // do something to extend time    }, 0,  2, timeunit.seconds);} 三、设计方案
经过上面的分析,设计出了这个方案:
前面已经说了整体流程,这里强调一下几个核心步骤:
拦截注解 @redislock,获取必要的参数
加锁操作
续时操作
结束业务,释放锁
四、实操
之前也有整理过 aop 使用方法,可以参考一下
相关属性类配置
业务属性枚举设定
public enum redislocktypeenum {
    /**     * 自定义 key 前缀     */    one(business1, test1),    two(business2, test2);    private string code;    private string desc;    redislocktypeenum(string code, string desc) {        this.code = code;        this.desc = desc;    }    public string getcode() {        return code;    }    public string getdesc() {        return desc;    }    public string getuniquekey(string key) {        return string.format(%s:%s, this.getcode(), key);    }} 任务队列保存参数
public class redislockdefinitionholder {    /**     * 业务唯一 key     */    private string businesskey;    /**     * 加锁时间 (秒 s)     */    private long locktime;    /**     * 上次更新时间(ms)     */    private long lastmodifytime;    /**     * 保存当前线程     */    private thread currenttread;    /**     * 总共尝试次数     */    private int trycount;    /**     * 当前尝试次数     */    private int currentcount;    /**     * 更新的时间周期(毫秒),公式 = 加锁时间(转成毫秒) / 3     */    private long modifyperiod;    public redislockdefinitionholder(string businesskey, long locktime, long lastmodifytime, thread currenttread, int trycount) {        this.businesskey = businesskey;        this.locktime = locktime;        this.lastmodifytime = lastmodifytime;        this.currenttread = currenttread;        this.trycount = trycount;        this.modifyperiod = locktime * 1000 / 3;    }} 设定被拦截的注解名字
@retention(retentionpolicy.runtime)@target({elementtype.method, elementtype.type})public @interface redislockannotation {    /**     * 特定参数识别,默认取第 0 个下标     */    int lockfiled() default 0;    /**     * 超时重试次数     */    int trycount() default 3;    /**     * 自定义加锁类型     */    redislocktypeenum typeenum();    /**     * 释放时间,秒 s 单位     */    long locktime() default 30;} 核心切面拦截的操作
redislockaspect.java 该类分成三部分来描述具体作用
pointcut 设定
/** * @annotation 中的路径表示拦截特定注解 */@pointcut(@annotation(cn.sevenyuan.demo.aop.lock.redislockannotation))public void redislockpc() {} around 前后进行加锁和释放锁 前面步骤定义了我们想要拦截的切点,下一步就是在切点前后做一些自定义操作:
@around(value = redislockpc())public object around(proceedingjoinpoint pjp) throws throwable {    // 解析参数    method method = resolvemethod(pjp);    redislockannotation annotation = method.getannotation(redislockannotation.class);    redislocktypeenum typeenum = annotation.typeenum();    object[] params = pjp.getargs();    string ukstring = params[annotation.lockfiled()].tostring();    // 省略很多参数校验和判空    string businesskey = typeenum.getuniquekey(ukstring);    string uniquevalue = uuid.randomuuid().tostring();    // 加锁    object result = null;    try {        boolean issuccess = redistemplate.opsforvalue().setifabsent(businesskey, uniquevalue);        if (!issuccess) {            throw new exception(you can't do it,because another has get the lock =-=);        }        redistemplate.expire(businesskey, annotation.locktime(), timeunit.seconds);        thread currentthread = thread.currentthread();        // 将本次 task 信息加入「延时」队列中        holderlist.add(new redislockdefinitionholder(businesskey, annotation.locktime(), system.currenttimemillis(),                currentthread, annotation.trycount()));        // 执行业务操作        result = pjp.proceed();        // 线程被中断,抛出异常,中断此次请求        if (currentthread.isinterrupted()) {            throw new interruptedexception(you had been interrupted =-=);        }    } catch (interruptedexception e ) {        log.error(interrupt exception, rollback transaction, e);        throw new exception(interrupt exception, please send request again);    } catch (exception e) {        log.error(has some error, please check again, e);    } finally {        // 请求结束后,强制删掉 key,释放锁        redistemplate.delete(businesskey);        log.info(release the lock, businesskey is [ + businesskey + ]);    }    return result;} 上述流程简单总结一下:
解析注解参数,获取注解值和方法上的参数值
redis 加锁并且设置超时时间
将本次 task 信息加入「延时」队列中,进行续时,方式提前释放锁
加了一个线程中断标志
结束请求,finally 中释放锁
续时操作
这里用了 scheduledexecutorservice,维护了一个线程,不断对任务队列中的任务进行判断和延长超时时间:
// 扫描的任务队列private static concurrentlinkedqueue holderlist = new concurrentlinkedqueue();/** * 线程池,维护keyalivetime */private static final scheduledexecutorservice scheduler = new scheduledthreadpoolexecutor(1,        new basicthreadfactory.builder().namingpattern(redislock-schedule-pool).daemon(true).build());{    // 两秒执行一次「续时」操作    scheduler.scheduleatfixedrate(() -> {        // 这里记得加 try-catch,否者报错后定时任务将不会再执行=-=        iterator iterator = holderlist.iterator();        while (iterator.hasnext()) {            redislockdefinitionholder holder = iterator.next();            // 判空            if (holder == null) {                iterator.remove();                continue;            }            // 判断 key 是否还有效,无效的话进行移除            if (redistemplate.opsforvalue().get(holder.getbusinesskey()) == null) {                iterator.remove();                continue;            }            // 超时重试次数,超过时给线程设定中断            if (holder.getcurrentcount() > holder.gettrycount()) {                holder.getcurrenttread().interrupt();                iterator.remove();                continue;            }            // 判断是否进入最后三分之一时间            long curtime = system.currenttimemillis();            boolean shouldextend = (holder.getlastmodifytime() + holder.getmodifyperiod()) <= curtime;            if (shouldextend) {                holder.setlastmodifytime(curtime);                redistemplate.expire(holder.getbusinesskey(), holder.getlocktime(), timeunit.seconds);                log.info(businesskey : [ + holder.getbusinesskey() + ], try count :  + holder.getcurrentcount());                holder.setcurrentcount(holder.getcurrentcount() + 1);            }        }    }, 0, 2, timeunit.seconds);} 这段代码,用来实现设计图中虚线框的思想,避免一个请求十分耗时,导致提前释放了锁。
这里加了「线程中断」thread#interrupt,希望超过重试次数后,能让线程中断(未经严谨测试,仅供参考哈哈哈哈)
不过建议如果遇到这么耗时的请求,还是能够从根源上查找,分析耗时路径,进行业务优化或其它处理,避免这些耗时操作。
所以记得多打点 log,分析问题时可以更快一点。
五、开始测试
在一个入口方法中,使用该注解,然后在业务中模拟耗时请求,使用了 thread#sleep
@getmapping(/testredislock)@redislockannotation(typeenum = redislocktypeenum.one, locktime = 3)public book testredislock(@requestparam(userid) long userid) {    try {        log.info(睡眠执行前);        thread.sleep(10000);        log.info(睡眠执行后);    } catch (exception e) {        // log error        log.info(has some error, e);    }    return null;} 使用时,在方法上添加该注解,然后设定相应参数即可,根据 typeenum 可以区分多种业务,限制该业务被同时操作。
测试结果:
2020-04-04 1450.864  info 9326 --- [nio-8081-exec-1] c.s.demo.controller.bookcontroller       : 睡眠执行前2020-04-04 1452.855  info 9326 --- [k-schedule-pool] c.s.demo.aop.lock.redislockaspect        : businesskey : [business1:1024], try count : 02020-04-04 1454.851  info 9326 --- [k-schedule-pool] c.s.demo.aop.lock.redislockaspect        : businesskey : [business1:1024], try count : 12020-04-04 1456.851  info 9326 --- [k-schedule-pool] c.s.demo.aop.lock.redislockaspect        : businesskey : [business1:1024], try count : 22020-04-04 1458.852  info 9326 --- [k-schedule-pool] c.s.demo.aop.lock.redislockaspect        : businesskey : [business1:1024], try count : 32020-04-04 1400.857  info 9326 --- [nio-8081-exec-1] c.s.demo.controller.bookcontroller       : has some errorjava.lang.interruptedexception: sleep interrupted at java.lang.thread.sleep(native method) [na:1.8.0_221] 我这里测试的是重试次数过多,失败的场景,如果减少睡眠时间,就能让业务正常执行。
如果同时请求,你将会发现以下错误信息:
表示我们的锁的确生效了,避免了重复请求。
六、总结
对于耗时业务和核心数据,不能让重复的请求同时操作数据,避免数据的不正确,所以要使用分布式锁来对它们进行保护。
再来梳理一下设计流程:
新建注解 @interface,在注解里设定入参标志
增加 aop 切点,扫描特定注解
建立 @aspect 切面任务,注册 bean 和拦截特定方法4、特定方法参数 proceedingjoinpoint,对方法 pjp.proceed() 前后进行拦截5、切点前进行加锁,任务执行后进行删除 key


简述智能化集成系统(IBMS)联动设计技术要求
信通院李珊:全社会发力推动5G发展,5G领先新基建
一加6T到底有多流畅
传感器有哪些特性_传感器误差分析
物联网独白:我正在行动!
如何使用注解实现redis分布式锁!
三星S9+的性能跑分已经出炉,其强悍的性能给我们留下了深刻的印象
储能连接器公司谈产品可靠性与什么有关
霍尔元件工作原理视频
视联网的实现路径及发展的三大要素解析
NEC与三星合作共同开发5G基站,并利用其销售网络在全球推广彼此的产品
2018年开启深度学习硬件大战 英伟达/AMD/英特尔谁能笑到最后
关于在腾讯云上解析LPWAN节点数据的步骤介绍
未来几年商用5G网络将获得大量部署,到2025年将覆盖全球近三分之一的人口
关于德国仿生机器人的简单分析介绍
PCB阻焊设计对PCBA的影响
特斯拉断电甩锅国网,专家表态大概率汽车问题
太阳能警示灯制作,solar warning light
RIM新操作系统将让人震惊
尼康D6与尼康Z5性能差异比较