如何实现一个秒杀系统

实现一个秒杀系统,采用spring boot 2.x + mybatis+ redis + swagger2 + lombok实现。
先说说基本流程,就是提供一个秒杀接口,然后针对秒杀接口进行限流,限流的方式目前我实现了两种,上次实现的是累计计数方式,这次还有这个功能,并且我增加了令牌桶方式的lua脚本进行限流。
然后不被限流的数据进来之后,加一把分布式锁,获取分布式锁之后就可以对数据库进行操作了。直接操作数据库的方式可以,但是速度会比较慢,咱们直接通过一个初始化接口,将库存数据放到缓存中,然后对缓存中的数据进行操作。
写库的操作采用异步方式,实现的方式就是将操作好的数据放入到队列中,然后由另一个线程对队列进行消费。当然,也可以将数据直接写入mq中,由另一个线程进行消费,这样也更稳妥。
好了,看一下项目的基本结构:
看一下入口controller类,入口类有两个方法,一个是初始化订单的方法,即秒杀开始的时候,秒杀接口才会有效,这个方法可以采用定时任务自动实现也可以。
初始化后就可以调用placeorder的方法了。在placeorder上面有个自定义的注解distrilimitanno,这个是我在上篇文章写的,用作限流使用。
采用的方式目前有两种,一种是使用计数方式限流,一种方式是令牌桶,上次使用了计数,咱们这次采用令牌桶方式实现。
package com.hqs.flashsales.controller;  
import com.hqs.flashsales.annotation.distrilimitanno;import com.hqs.flashsales.aspect.limitaspect;import com.hqs.flashsales.lock.distributedlock;import com.hqs.flashsales.limit.distributedlimit;import com.hqs.flashsales.service.orderservice;import lombok.extern.slf4j.slf4j;import org.springframework.beans.factory.annotation.autowired;import org.springframework.data.redis.core.redistemplate;import org.springframework.data.redis.core.script.redisscript;import org.springframework.stereotype.controller;import org.springframework.web.bind.annotation.getmapping;import org.springframework.web.bind.annotation.postmapping;import org.springframework.web.bind.annotation.responsebody;
import javax.annotation.resource;import java.util.collections;
/** * @author huangqingshi * @date 2019-01-23 */@slf4j@controllerpublic class flashsalecontroller {
   @autowired    orderservice orderservice;    @autowired    distributedlock distributedlock;    @autowired    limitaspect limitaspect;    //注意redistemplate用的string,string,后续所有用到的key和value都是string的    @autowired    redistemplate redistemplate;
   private static final string lock_pre = lock_order;
   @postmapping(/initcatalog)    @responsebody    public string initcatalog()  {        try {            orderservice.initcatalog();        } catch (exception e) {            log.error(error, e);        }
       return init is ok;    }
   @postmapping(/placeorder)    @responsebody    @distrilimitanno(limitkey = limit, limit = 100, seconds = 1)    public long placeorder(long orderid) {        long saleorderid = 0l;        boolean locked = false;        string key = lock_pre + orderid;        string uuid = string.valueof(orderid);        try {            locked = distributedlock.distributedlock(key, uuid,                    10 );            if(locked) {                //直接操作数据库//                saleorderid = orderservice.placeorder(orderid);                //操作缓存 异步操作数据库                saleorderid = orderservice.placeorderwithqueue(orderid);            }            log.info(saleorderid:{}, saleorderid);        } catch (exception e) {            log.error(e.getmessage());        } finally {            if(locked) {                distributedlock.distributedunlock(key, uuid);            }        }        return saleorderid;    }
}
令牌桶的方式比直接计数更加平滑,直接计数可能会瞬间达到最高值,令牌桶则把最高峰给削掉了,令牌桶的基本原理就是有一个桶装着令牌,然后又一队人排队领取令牌,领到令牌的人就可以去做做自己想做的事情了,没有领到令牌的人直接就走了(也可以重新排队)。
发令牌是按照一定的速度发放的,所以这样在多人等令牌的时候,很多人是拿不到的。当桶里边的令牌在一定时间内领完后,则没有令牌可领,都直接走了。如果过了一定的时间之后可以再次把令牌桶装满供排队的人领。
基本原理是这样的,看一下脚本简单了解一下,里边有一个key和四个参数,第一个参数是获取一个令牌桶的时间间隔,第二个参数是重新填装令牌的时间(精确到毫秒),第三个是令牌桶的数量限制,第四个是隔多长时间重新填装令牌桶。
-- bucket namelocal key = keys[1]-- token generate intervallocal intervalperpermit = tonumber(argv[1])-- grant timestamplocal refilltime = tonumber(argv[2])-- limit token countlocal limit = tonumber(argv[3])-- ratelimit time periodlocal interval = tonumber(argv[4])  
local counter = redis.call('hgetall', key)
if table.getn(counter) == 0 then    -- first check if bucket not exists, if yes, create a new one with full capacity, then grant access    redis.call('hmset', key, 'lastrefilltime', refilltime, 'tokensremaining', limit - 1)    -- expire will save memory    redis.call('expire', key, interval)    return 1elseif table.getn(counter) == 4 then    -- if bucket exists, first we try to refill the token bucket    local lastrefilltime, tokensremaining = tonumber(counter[2]), tonumber(counter[4])    local currenttokens    if refilltime > lastrefilltime then        -- check if refilltime larger than lastrefilltime.        -- if not, it means some other operation later than this call made the call first.        -- there is no need to refill the tokens.        local intervalsincelast = refilltime - lastrefilltime        if intervalsincelast > interval then            currenttokens = limit            redis.call('hset', key, 'lastrefilltime', refilltime)        else            local grantedtokens = math.floor(intervalsincelast / intervalperpermit)            if grantedtokens > 0 then                -- ajust lastrefilltime, we want shift left the refill time.                local padmillis = math.fmod(intervalsincelast, intervalperpermit)                redis.call('hset', key, 'lastrefilltime', refilltime - padmillis)            end            currenttokens = math.min(grantedtokens + tokensremaining, limit)        end    else        -- if not, it means some other operation later than this call made the call first.        -- there is no need to refill the tokens.        currenttokens = tokensremaining    end
   assert(currenttokens >= 0)
   if currenttokens == 0 then        -- we didn't consume any keys        redis.call('hset', key, 'tokensremaining', currenttokens)        return 0    else        -- we take 1 token from the bucket        redis.call('hset', key, 'tokensremaining', currenttokens - 1)        return 1    endelse    error(size of counter is .. table.getn(counter) .. , should be 0 or 4.)end
看一下调用令牌桶lua的java代码,也比较简单:
public boolean distributedratelimit(string key, string limit, string seconds) {        long id = 0l;        long intervalinmills = long.valueof(seconds) * 1000;        long limitinlong = long.valueof(limit);        long intervalperpermit = intervalinmills / limitinlong;//        long refilltime = system.currenttimemillis();//        log.info(调用redis执行lua脚本, {} {} {} {} {}, ratelimit, intervalperpermit, refilltime,//                limit, intervalinmills);        try {             id = redistemplate.execute(ratelimitscript, collections.singletonlist(key),                    string.valueof(intervalperpermit), string.valueof(system.currenttimemillis()),                    string.valueof(limitinlong), string.valueof(intervalinmills));        } catch (exception e) {            log.error(error, e);        }  
       if(id == 0l) {            return false;        } else {            return true;        }    }
创建两张简单表,一个库存表,一个是销售订单表:
create table `catalog` (  `id` int(11) unsigned not null auto_increment,  `name` varchar(50) not null default '' comment '名称',  `total` int(11) not null comment '库存',  `sold` int(11) not null comment '已售',  `version` int(11) null comment '乐观锁,版本号',  primary key (`id`)) engine=innodb default charset=utf8;  
create table `sales_order` (  `id` int(11) unsigned not null auto_increment,  `cid` int(11) not null comment '库存id',  `name` varchar(30) not null default '' comment '商品名称',  `create_time` timestamp not null default current_timestamp on update current_timestamp comment '创建时间',  primary key (`id`)) engine=innodb default charset=utf8;
基本已经准备完毕,然后启动程序,打开swagger(http://localhost:8080/swagger-ui.html#),执行初始化方法initcatalog:
日志里边会输出初始化的记录内容,初始化库存为1000:
初始化执行的方法,十分简单,写到缓存中。
@override    public void initcatalog() {        catalog catalog = new catalog();        catalog.setname(mac);        catalog.settotal(1000l);        catalog.setsold(0l);        catalogmapper.insertcatalog(catalog);        log.info(catalog:{}, catalog);        redistemplate.opsforvalue().set(catalog_total + catalog.getid(), catalog.gettotal().tostring());        redistemplate.opsforvalue().set(catalog_sold + catalog.getid(), catalog.getsold().tostring());        log.info(redis value:{}, redistemplate.opsforvalue().get(catalog_total + catalog.getid()));        handlecatalog();    }  
我写了一个测试类,启动3000个线程,然后去进行下单请求:
package com.hqs.flashsales;  
import lombok.extern.slf4j.slf4j;import org.junit.test;import org.junit.runner.runwith;import org.springframework.beans.factory.annotation.autowired;import org.springframework.boot.test.context.springboottest;import org.springframework.boot.test.web.client.testresttemplate;import org.springframework.test.context.junit4.springrunner;import org.springframework.util.linkedmultivaluemap;import org.springframework.util.multivaluemap;
import java.util.concurrent.timeunit;
@slf4j@runwith(springrunner.class)@springboottest(classes = flashsalesapplication.class, webenvironment = springboottest.webenvironment.random_port)public class flashsalesapplicationtests {
   @autowired    private testresttemplate testresttemplate;
   @test    public void flashsaletest() {        string url = http://localhost:8080/placeorder;        for(int i = 0; i {                    multivaluemap params = new linkedmultivaluemap();                    params.add(orderid, 1);                    long result = testresttemplate.postforobject(url, params, long.class);                    if(result != 0) {                        system.out.println(------------- + result);                    }                }                ).start();            } catch (exception e) {                log.info(error:{}, e.getmessage());            }
       }    }
   @test    public void contextloads() {    }
}
然后开始运行测试代码,查看一下测试日志和程序日志,均显示卖了1000后直接显示sold out了。分别看一下日志和数据库:
商品库存catalog表和订单明细表sales_order表,都是1000条,没有问题。
总结:
通过采用分布式锁和分布式限流,即可实现秒杀流程,当然分布式限流也可以用到很多地方,比如限制某些ip在多久时间访问接口多少次,都可以的。
令牌桶的限流方式使得请求可以得到更加平滑的处理,不至于瞬间把系统达到最高负载。在这其中其实还有一个小细节,就是redis的锁,单机情况下没有任何问题,如果是集群的话需要注意,一个key被hash到同一个slot的时候没有问题,如果说扩容或者缩容的话,如果key被hash到不同的slot,程序可能会出问题。
在写代码的过程中还出现了一个小问题,就是写controller的方法的时候,方法一定要声明成public的,否则自定义的注解用不了,其他service的注解直接变为空,这个问题也是找了很久才找到。


磐石测控:深圳拉力试验机怎么校验?
AIoT已成为各大行业智能化升级的最佳通道 也是物联网发展的重要方向
机器人再厉害,也永远不会超越人类
智能家居的4点好处详细说明
三星Galaxy S24系列国行版发布
如何实现一个秒杀系统
Oculus浏览器10.2版本发布 刷新率保持在72Hz左右,整体表现非常流畅
电瓶修复技术—想让你电动车电池更持久吗?保养专业店方法给你
泰克提供业界首创的 PCIe 6.0 测试解决方案
搞定ESD:ESD干扰耦合路径深入分析(一)
单片机下载程序的三种方式介绍
一种结合数据完整性保护和数据机密性保护的安全路由协议
aigo国民好物固态硬盘P3000:性能出色性价比高
直降200!高颜值+旗舰级功能体验OPPOA95直接冲就对了
如何使用模拟和数字万用表测试二极管?
vivo Z6新机预热,搭载搭载骁龙765G SoC芯片
wm是什么意思
北美半导体设备订单出货比连续下降,半导体不妙?
领途明确产品定位,专注生产小型优质纯电动汽车
使用虹科模块化数字化仪进行车辆测试