实现一个秒杀系统,采用spring boot 2.x + mybatis+ redis + swagger2 + lombok实现。
先说说基本流程,就是提供一个秒杀接口,然后针对秒杀接口进行限流,限流的方式目前我实现了两种,上次实现的是累计计数方式,这次还有这个功能,并且我增加了令牌桶方式的lua脚本进行限流。
然后不被限流的数据进来之后,加一把分布式锁,获取分布式锁之后就可以对数据库进行操作了。直接操作数据库的方式可以,但是速度会比较慢,咱们直接通过一个初始化接口,将库存数据放到缓存中,然后对缓存中的数据进行操作。
写库的操作采用异步方式,实现的方式就是将操作好的数据放入到队列中,然后由另一个线程对队列进行消费。当然,也可以将数据直接写入mq中,由另一个线程进行消费,这样也更稳妥。
好了,看一下项目的基本结构:
看一下入口controller类,入口类有两个方法,一个是初始化订单的方法,即秒杀开始的时候,秒杀接口才会有效,这个方法可以采用定时任务自动实现也可以。
初始化后就可以调用placeorder的方法了。在placeorder上面有个自定义的注解distrilimitanno,这个是我在上篇文章写的,用作限流使用。
采用的方式目前有两种,一种是使用计数方式限流,一种方式是令牌桶,上次使用了计数,咱们这次采用令牌桶方式实现。
package com.hqs.flashsales.controller;
import com.hqs.flashsales.annotation.distrilimitanno;import com.hqs.flashsales.aspect.limitaspect;import com.hqs.flashsales.lock.distributedlock;import com.hqs.flashsales.limit.distributedlimit;import com.hqs.flashsales.service.orderservice;import lombok.extern.slf4j.slf4j;import org.springframework.beans.factory.annotation.autowired;import org.springframework.data.redis.core.redistemplate;import org.springframework.data.redis.core.script.redisscript;import org.springframework.stereotype.controller;import org.springframework.web.bind.annotation.getmapping;import org.springframework.web.bind.annotation.postmapping;import org.springframework.web.bind.annotation.responsebody;
import javax.annotation.resource;import java.util.collections;
/** * @author huangqingshi * @date 2019-01-23 */@slf4j@controllerpublic class flashsalecontroller {
@autowired orderservice orderservice; @autowired distributedlock distributedlock; @autowired limitaspect limitaspect; //注意redistemplate用的string,string,后续所有用到的key和value都是string的 @autowired redistemplate redistemplate;
private static final string lock_pre = lock_order;
@postmapping(/initcatalog) @responsebody public string initcatalog() { try { orderservice.initcatalog(); } catch (exception e) { log.error(error, e); }
return init is ok; }
@postmapping(/placeorder) @responsebody @distrilimitanno(limitkey = limit, limit = 100, seconds = 1) public long placeorder(long orderid) { long saleorderid = 0l; boolean locked = false; string key = lock_pre + orderid; string uuid = string.valueof(orderid); try { locked = distributedlock.distributedlock(key, uuid, 10 ); if(locked) { //直接操作数据库// saleorderid = orderservice.placeorder(orderid); //操作缓存 异步操作数据库 saleorderid = orderservice.placeorderwithqueue(orderid); } log.info(saleorderid:{}, saleorderid); } catch (exception e) { log.error(e.getmessage()); } finally { if(locked) { distributedlock.distributedunlock(key, uuid); } } return saleorderid; }
}
令牌桶的方式比直接计数更加平滑,直接计数可能会瞬间达到最高值,令牌桶则把最高峰给削掉了,令牌桶的基本原理就是有一个桶装着令牌,然后又一队人排队领取令牌,领到令牌的人就可以去做做自己想做的事情了,没有领到令牌的人直接就走了(也可以重新排队)。
发令牌是按照一定的速度发放的,所以这样在多人等令牌的时候,很多人是拿不到的。当桶里边的令牌在一定时间内领完后,则没有令牌可领,都直接走了。如果过了一定的时间之后可以再次把令牌桶装满供排队的人领。
基本原理是这样的,看一下脚本简单了解一下,里边有一个key和四个参数,第一个参数是获取一个令牌桶的时间间隔,第二个参数是重新填装令牌的时间(精确到毫秒),第三个是令牌桶的数量限制,第四个是隔多长时间重新填装令牌桶。
-- bucket namelocal key = keys[1]-- token generate intervallocal intervalperpermit = tonumber(argv[1])-- grant timestamplocal refilltime = tonumber(argv[2])-- limit token countlocal limit = tonumber(argv[3])-- ratelimit time periodlocal interval = tonumber(argv[4])
local counter = redis.call('hgetall', key)
if table.getn(counter) == 0 then -- first check if bucket not exists, if yes, create a new one with full capacity, then grant access redis.call('hmset', key, 'lastrefilltime', refilltime, 'tokensremaining', limit - 1) -- expire will save memory redis.call('expire', key, interval) return 1elseif table.getn(counter) == 4 then -- if bucket exists, first we try to refill the token bucket local lastrefilltime, tokensremaining = tonumber(counter[2]), tonumber(counter[4]) local currenttokens if refilltime > lastrefilltime then -- check if refilltime larger than lastrefilltime. -- if not, it means some other operation later than this call made the call first. -- there is no need to refill the tokens. local intervalsincelast = refilltime - lastrefilltime if intervalsincelast > interval then currenttokens = limit redis.call('hset', key, 'lastrefilltime', refilltime) else local grantedtokens = math.floor(intervalsincelast / intervalperpermit) if grantedtokens > 0 then -- ajust lastrefilltime, we want shift left the refill time. local padmillis = math.fmod(intervalsincelast, intervalperpermit) redis.call('hset', key, 'lastrefilltime', refilltime - padmillis) end currenttokens = math.min(grantedtokens + tokensremaining, limit) end else -- if not, it means some other operation later than this call made the call first. -- there is no need to refill the tokens. currenttokens = tokensremaining end
assert(currenttokens >= 0)
if currenttokens == 0 then -- we didn't consume any keys redis.call('hset', key, 'tokensremaining', currenttokens) return 0 else -- we take 1 token from the bucket redis.call('hset', key, 'tokensremaining', currenttokens - 1) return 1 endelse error(size of counter is .. table.getn(counter) .. , should be 0 or 4.)end
看一下调用令牌桶lua的java代码,也比较简单:
public boolean distributedratelimit(string key, string limit, string seconds) { long id = 0l; long intervalinmills = long.valueof(seconds) * 1000; long limitinlong = long.valueof(limit); long intervalperpermit = intervalinmills / limitinlong;// long refilltime = system.currenttimemillis();// log.info(调用redis执行lua脚本, {} {} {} {} {}, ratelimit, intervalperpermit, refilltime,// limit, intervalinmills); try { id = redistemplate.execute(ratelimitscript, collections.singletonlist(key), string.valueof(intervalperpermit), string.valueof(system.currenttimemillis()), string.valueof(limitinlong), string.valueof(intervalinmills)); } catch (exception e) { log.error(error, e); }
if(id == 0l) { return false; } else { return true; } }
创建两张简单表,一个库存表,一个是销售订单表:
create table `catalog` ( `id` int(11) unsigned not null auto_increment, `name` varchar(50) not null default '' comment '名称', `total` int(11) not null comment '库存', `sold` int(11) not null comment '已售', `version` int(11) null comment '乐观锁,版本号', primary key (`id`)) engine=innodb default charset=utf8;
create table `sales_order` ( `id` int(11) unsigned not null auto_increment, `cid` int(11) not null comment '库存id', `name` varchar(30) not null default '' comment '商品名称', `create_time` timestamp not null default current_timestamp on update current_timestamp comment '创建时间', primary key (`id`)) engine=innodb default charset=utf8;
基本已经准备完毕,然后启动程序,打开swagger(http://localhost:8080/swagger-ui.html#),执行初始化方法initcatalog:
日志里边会输出初始化的记录内容,初始化库存为1000:
初始化执行的方法,十分简单,写到缓存中。
@override public void initcatalog() { catalog catalog = new catalog(); catalog.setname(mac); catalog.settotal(1000l); catalog.setsold(0l); catalogmapper.insertcatalog(catalog); log.info(catalog:{}, catalog); redistemplate.opsforvalue().set(catalog_total + catalog.getid(), catalog.gettotal().tostring()); redistemplate.opsforvalue().set(catalog_sold + catalog.getid(), catalog.getsold().tostring()); log.info(redis value:{}, redistemplate.opsforvalue().get(catalog_total + catalog.getid())); handlecatalog(); }
我写了一个测试类,启动3000个线程,然后去进行下单请求:
package com.hqs.flashsales;
import lombok.extern.slf4j.slf4j;import org.junit.test;import org.junit.runner.runwith;import org.springframework.beans.factory.annotation.autowired;import org.springframework.boot.test.context.springboottest;import org.springframework.boot.test.web.client.testresttemplate;import org.springframework.test.context.junit4.springrunner;import org.springframework.util.linkedmultivaluemap;import org.springframework.util.multivaluemap;
import java.util.concurrent.timeunit;
@slf4j@runwith(springrunner.class)@springboottest(classes = flashsalesapplication.class, webenvironment = springboottest.webenvironment.random_port)public class flashsalesapplicationtests {
@autowired private testresttemplate testresttemplate;
@test public void flashsaletest() { string url = http://localhost:8080/placeorder; for(int i = 0; i { multivaluemap params = new linkedmultivaluemap(); params.add(orderid, 1); long result = testresttemplate.postforobject(url, params, long.class); if(result != 0) { system.out.println(------------- + result); } } ).start(); } catch (exception e) { log.info(error:{}, e.getmessage()); }
} }
@test public void contextloads() { }
}
然后开始运行测试代码,查看一下测试日志和程序日志,均显示卖了1000后直接显示sold out了。分别看一下日志和数据库:
商品库存catalog表和订单明细表sales_order表,都是1000条,没有问题。
总结:
通过采用分布式锁和分布式限流,即可实现秒杀流程,当然分布式限流也可以用到很多地方,比如限制某些ip在多久时间访问接口多少次,都可以的。
令牌桶的限流方式使得请求可以得到更加平滑的处理,不至于瞬间把系统达到最高负载。在这其中其实还有一个小细节,就是redis的锁,单机情况下没有任何问题,如果是集群的话需要注意,一个key被hash到同一个slot的时候没有问题,如果说扩容或者缩容的话,如果key被hash到不同的slot,程序可能会出问题。
在写代码的过程中还出现了一个小问题,就是写controller的方法的时候,方法一定要声明成public的,否则自定义的注解用不了,其他service的注解直接变为空,这个问题也是找了很久才找到。
磐石测控:深圳拉力试验机怎么校验?
AIoT已成为各大行业智能化升级的最佳通道 也是物联网发展的重要方向
机器人再厉害,也永远不会超越人类
智能家居的4点好处详细说明
三星Galaxy S24系列国行版发布
如何实现一个秒杀系统
Oculus浏览器10.2版本发布 刷新率保持在72Hz左右,整体表现非常流畅
电瓶修复技术—想让你电动车电池更持久吗?保养专业店方法给你
泰克提供业界首创的 PCIe 6.0 测试解决方案
搞定ESD:ESD干扰耦合路径深入分析(一)
单片机下载程序的三种方式介绍
一种结合数据完整性保护和数据机密性保护的安全路由协议
aigo国民好物固态硬盘P3000:性能出色性价比高
直降200!高颜值+旗舰级功能体验OPPOA95直接冲就对了
如何使用模拟和数字万用表测试二极管?
vivo Z6新机预热,搭载搭载骁龙765G SoC芯片
wm是什么意思
北美半导体设备订单出货比连续下降,半导体不妙?
领途明确产品定位,专注生产小型优质纯电动汽车
使用虹科模块化数字化仪进行车辆测试