不依赖外部库的情况下,限流算法有什么实现的思路?本文介绍了3种实现限流的方式。
一、漏桶算法
算法思想 与令牌桶是“反向”的算法,当有请求到来时先放到木桶中,worker以固定的速度从木桶中取出请求进行相应。如果木桶已经满了,直接返回请求频率超限的错误码或者页面 适用场景流量最均匀的限流方式,一般用于流量“整形”,例如保护数据库的限流。先把对数据库的访问加入到木桶中,worker再以db能够承受的qps从木桶中取出请求,去访问数据库。不太适合电商抢购和微博出现热点事件等场景的限流,一是应对突发流量不是很灵活,二是为每个user_id/ip维护一个队列(木桶),workder从这些队列中拉取任务,资源的消耗会比较大。 go语言实现通常使用队列来实现,在go语言中可以通过buffered channel来快速实现,任务加入channel,开启一定数量的worker从channel中获取任务执行。 package mainimport ( fmt sync time)// 每个请求来了,把需要执行的业务逻辑封装成task,放入木桶,等待worker取出执行type task struct { handler func() result // worker从木桶中取出请求对象后要执行的业务逻辑函数 reschan chan result // 等待worker执行并返回结果的channel taskid int}// 封装业务逻辑的执行结果type result struct {}// 模拟业务逻辑的函数func handler() result { time.sleep(300 * time.millisecond) return result{}}func newtask(id int) task { return task{ handler: handler, reschan: make(chan result), taskid: id, }}// 漏桶type leakybucket struct { bucketsize int // 木桶的大小 numworker int // 同时从木桶中获取任务执行的worker数量 bucket chan task // 存方任务的木桶}func newleakybucket(bucketsize int, numworker int) *leakybucket { return &leakybucket{ bucketsize: bucketsize, numworker: numworker, bucket: make(chan task, bucketsize), }}func (b *leakybucket) validate(task task) bool { // 如果木桶已经满了,返回false select { case b.bucket <- task: default: fmt.printf(request[id=%d] is refused, task.taskid) return false } // 等待worker执行 <-task.reschan fmt.printf(request[id=%d] is run, task.taskid) return true}func (b *leakybucket) start() { // 开启worker从木桶拉取任务执行 go func() { for i := 0; i < b.numworker; i++ { go func() { for { task := <-b.bucket result := task.handler() task.reschan <- result } }() } }()}func main() { bucket := newleakybucket(10, 4) bucket.start() var wg sync.waitgroup for i := 0; i 100ms, 增加 (now - last) / 100ms个令牌。然后,如果令牌数 > 0,令牌数 -1 继续执行后续的业务逻辑,否则返回请求频率超限的错误码或页面。 package mainimport ( fmt sync time)// 并发访问同一个user_id/ip的记录需要上锁var recordmu map[string]*sync.rwmutexfunc init() { recordmu = make(map[string]*sync.rwmutex)}func max(a, b int) int { if a > b { return a } return b}type tokenbucket struct { bucketsize int // 木桶内的容量:最多可以存放多少个令牌 tokenrate time.duration // 多长时间生成一个令牌 records map[string]*record // 报错user_id/ip的访问记录}// 上次访问时的时间戳和令牌数type record struct { last time.time token int}func newtokenbucket(bucketsize int, tokenrate time.duration) *tokenbucket { return &tokenbucket{ bucketsize: bucketsize, tokenrate: tokenrate, records: make(map[string]*record), }}func (t *tokenbucket) getuidorip() string { // 获取请求用户的user_id或者ip地址 return 127.0.0.1}// 获取这个user_id/ip上次访问时的时间戳和令牌数func (t *tokenbucket) getrecord(uidorip string) *record { if r, ok := t.records[uidorip]; ok { return r } return &record{}}// 保存user_id/ip最近一次请求时的时间戳和令牌数量func (t *tokenbucket) storerecord(uidorip string, r *record) { t.records[uidorip] = r}// 验证是否能获取一个令牌func (t *tokenbucket) validate(uidorip string) bool { // 并发修改同一个用户的记录上写锁 rl, ok := recordmu[uidorip] if !ok { var mu sync.rwmutex rl = &mu recordmu[uidorip] = rl } rl.lock() defer rl.unlock() r := t.getrecord(uidorip) now := time.now() if r.last.iszero() { // 第一次访问初始化为最大令牌数 r.last, r.token = now, t.bucketsize } else { if r.last.add(t.tokenrate).before(now) { // 如果与上次请求的间隔超过了token rate // 则增加令牌,更新last r.token += max(int(now.sub(r.last) / t.tokenrate), t.bucketsize) r.last = now } } var result bool if r.token > 0 { // 如果令牌数大于1,取走一个令牌,validate结果为true r.token-- result = true } // 保存最新的record t.storerecord(uidorip, r) return result}// 返回是否被限流func (t *tokenbucket) islimited() bool { return !t.validate(t.getuidorip())}func main() { tokenbucket := newtokenbucket(5, 100*time.millisecond) for i := 0; i -1 { win = win[timeoutoffset+1:] } // 判断请求是否超限 var result bool if countreq(win) 0 { lastslot = win[len(win)-1] if lastslot.timestamp.add(l.slotduration).before(now) { lastslot = ×lot{timestamp: now, count: 1} win = append(win, lastslot) } else { lastslot.count++ } } else { lastslot = ×lot{timestamp: now, count: 1} win = append(win, lastslot) } l.storewindow(uidorip, win) return result}func (l *slidingwindowlimiter) getuidorip() string { return 127.0.0.1}func (l *slidingwindowlimiter) islimited() bool { return !l.validate(l.getuidorip())}func main() { limiter := newsliding(100*time.millisecond, time.second, 10) for i := 0; i < 5; i++ { fmt.println(limiter.islimited()) } time.sleep(100 * time.millisecond) for i := 0; i < 5; i++ { fmt.println(limiter.islimited()) } fmt.println(limiter.islimited()) for _, v := range limiter.windows[limiter.getuidorip()] { fmt.println(v.timestamp, v.count) } fmt.println(a thousand years later...) time.sleep(time.second) for i := 0; i < 7; i++ { fmt.println(limiter.islimited()) } for _, v := range limiter.windows[limiter.getuidorip()] { fmt.println(v.timestamp, v.count) }}
原文标题:几种限流算法的go语言实现
文章出处:【微信公众号:马哥linux运维】欢迎添加关注!文章转载请注明出处。
Xilinx Zynq-7000系列:基于ARM Cortex-A9内核处理器和FPGA结构
2023年EDA巨头的收购案件盘点
低功耗液晶电视LED背光源设计
继电器触点符号_继电器触点容量
出差旅行必备的倍思氮化镓充电器,它如何保障说走就走
一文详解限流算法的实现方式
排污水管水质在线监测系统和硬件供应数传平台
低时延!高可靠!华为云CDN赋能中小企业
C语言中预定义宏的用法和使用场景
广电总局计划2025年在5G网络和智慧广电建设上取得重要成果
苹果明年将在后置三摄像头上添加3D Sensing技术
iPhone手机怎样防止垃圾短信骚扰
FPGAs,Zynq和Zynq MPSoC器件的特点
多项数据显示,制造业或在3月迎来较大反弹
如何为树莓派选择最佳版本的Kodi
SF-EP1C开发板之基于74HC595的数码管实验
一个关于三相Vienna拓扑技术知识(上)
如何建设RISC-V软件生态?
清华大学发布《智能无人系统产业研究报告(2022版)》:北醒助力激光雷达产业蓬勃发展
金融大模型落地,还需跨越几重山?