前言 请求合并到底有什么意义呢?我们来看下图。
假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?
这里把数据库换成被调用的远程服务,也是同样的道理。
我们改变下思路,如下图所示。
我们在服务器端把请求合并,只发出一条sql查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求id,把数据分组,返回给对应用户。
基于 spring boot + mybatis plus + vue & element 实现的后台管理系统 + 用户小程序,支持 rbac 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/yunaiv/ruoyi-vue-pro 视频教程:https://doc.iocoder.cn/video/ 技术手段 linkedblockqueue 阻塞队列 scheduledthreadpoolexecutor 定时任务线程池 completeablefuture future 阻塞机制(java 8 的 completablefuture 并没有 timeout 机制,后面优化,使用了队列替代) 基于 spring cloud alibaba + gateway + nacos + rocketmq + vue & element 实现的后台管理系统 + 用户小程序,支持 rbac 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/yunaiv/yudao-cloud 视频教程:https://doc.iocoder.cn/video/ 代码实现 查询用户的代码 public interface userservice { map queryuserbyidbatch(list userreqs);}@servicepublic class userserviceimpl implements userservice { @resource private usersmapper usersmapper; @override public map queryuserbyidbatch(list userreqs) { // 全部参数 list userids = userreqs.stream().map(userwrapbatchservice.request::getuserid).collect(collectors.tolist()); querywrapper querywrapper = new querywrapper(); // 用in语句合并成一条sql,避免多次请求数据库的io querywrapper.in(id, userids); list users = usersmapper.selectlist(querywrapper); map usergroup = users.stream().collect(collectors.groupingby(users::getid)); hashmap result = new hashmap(); userreqs.foreach(val -> { list userslist = usergroup.get(val.getuserid()); if (!collectionutils.isempty(userslist)) { result.put(val.getrequestid(), userslist.get(0)); } else { // 表示没数据 result.put(val.getrequestid(), null); } }); return result; }} 合并请求的实现 package com.springboot.sample.service.impl;import com.springboot.sample.bean.users;import com.springboot.sample.service.userservice;import org.springframework.stereotype.service;import javax.annotation.postconstruct;import javax.annotation.resource;import java.util.*;import java.util.concurrent.*;/*** * zzq * 包装成批量执行的地方 * */@servicepublic class userwrapbatchservice { @resource private userservice userservice; /** * 最大任务数 **/ public static int max_task_num = 100; /** * 请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分 * completablefuture将处理结果返回 */ public class request { // 请求id 唯一 string requestid; // 参数 long userid; //todo java 8 的 completablefuture 并没有 timeout 机制 completablefuture completablefuture; public string getrequestid() { return requestid; } public void setrequestid(string requestid) { this.requestid = requestid; } public long getuserid() { return userid; } public void setuserid(long userid) { this.userid = userid; } public completablefuture getcompletablefuture() { return completablefuture; } public void setcompletablefuture(completablefuture completablefuture) { this.completablefuture = completablefuture; } } /* linkedblockingqueue是一个阻塞的队列,内部采用链表的结果,通过两个reentrantlock来保证线程安全 linkedblockingqueue与arrayblockingqueue的区别 arrayblockingqueue默认指定了长度,而linkedblockingqueue的默认长度是integer.max_value,也就是无界队列,在移除的速度小于添加的速度时,容易造成oom。 arrayblockingqueue的存储容器是数组,而linkedblockingqueue是存储容器是链表 两者的实现队列添加或移除的锁不一样,arrayblockingqueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个reenterlock锁, 而linkedblockingqueue实现的队列中的锁是分离的,其添加采用的是putlock,移除采用的则是takelock,这样能大大提高队列的吞吐量, 也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 */ private final queue queue = new linkedblockingqueue(); @postconstruct public void init() { //定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池 scheduledexecutorservice scheduledexecutorservice = executors.newscheduledthreadpool(1); scheduledexecutorservice.scheduleatfixedrate(() -> { int size = queue.size(); //如果队列没数据,表示这段时间没有请求,直接返回 if (size == 0) { return; } list list = new arraylist(); system.out.println(合并了 [ + size + ] 个请求); //将队列的请求消费到一个集合保存 for (int i = 0; i < size; i++) { // 后面的sql语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行 if (i < max_task_num) { list.add(queue.poll()); } } //拿到我们需要去数据库查询的特征,保存为集合 list userreqs = new arraylist(); for (request request : list) { userreqs.add(request); } //将参数传入service处理, 这里是本地服务,也可以把userservice 看成rpc之类的远程调用 map response = userservice.queryuserbyidbatch(userreqs); //将处理结果返回各自的请求 for (request request : list) { users result = response.get(request.requestid); request.completablefuture.complete(result); //completablefuture.complete方法完成赋值,这一步执行完毕,下面future.get()阻塞的请求可以继续执行了 } }, 100, 10, timeunit.milliseconds); //scheduleatfixedrate是周期性执行 schedule是延迟执行 initialdelay是初始延迟 period是周期间隔 后面是单位 //这里我写的是 初始化后100毫秒后执行,周期性执行10毫秒执行一次 } public users queryuser(long userid) { request request = new request(); // 这里用uuid做请求id request.requestid = uuid.randomuuid().tostring().replace(-, ); request.userid = userid; completablefuture future = new completablefuture(); request.completablefuture = future; //将对象传入队列 queue.offer(request); //如果这时候没完成赋值,那么就会阻塞,直到能够拿到值 try { return future.get(); } catch (interruptedexception e) { e.printstacktrace(); } catch (executionexception e) { e.printstacktrace(); } return null; }} 控制层调用 /*** * 请求合并 * */@requestmapping(/merge)public callable merge(long userid) { return new callable() { @override public users call() throws exception { return userbatchservice.queryuser(userid); } };} callable是什么可以参考:
https://blog.csdn.net/baidu_19473529/article/details/123596792 模拟高并发查询的代码 package com.springboot.sample;import org.springframework.web.client.resttemplate;import java.util.random;import java.util.concurrent.countdownlatch;public class testbatch { private static int threadcount = 30; private final static countdownlatch count_down_latch = new countdownlatch(threadcount); //为保证30个线程同时并发运行 private static final resttemplate resttemplate = new resttemplate(); public static void main(string[] args) { for (int i = 0; i < threadcount; i++) {//循环开30个线程 new thread(new runnable() { public void run() { count_down_latch.countdown();//每次减一 try { count_down_latch.await(); //此处等待状态,为了让30个线程同时进行 } catch (interruptedexception e) { e.printstacktrace(); } for (int j = 1; j <= 3; j++) { int param = new random().nextint(4); if (param { int size = queue.size(); //如果队列没数据,表示这段时间没有请求,直接返回 if (size == 0) { return; } list list = new arraylist(); system.out.println(合并了 [ + size + ] 个请求); //将队列的请求消费到一个集合保存 for (int i = 0; i < size; i++) { // 后面的sql语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行 if (i < max_task_num) { list.add(queue.poll()); } } //拿到我们需要去数据库查询的特征,保存为集合 list userreqs = new arraylist(); for (request request : list) { userreqs.add(request); } //将参数传入service处理, 这里是本地服务,也可以把userservice 看成rpc之类的远程调用 map response = userservice.queryuserbyidbatchqueue(userreqs); for (request userreq : userreqs) { // 这里再把结果放到队列里 users users = response.get(userreq.getrequestid()); userreq.usersqueue.offer(users); } }, 100, 10, timeunit.milliseconds); //scheduleatfixedrate是周期性执行 schedule是延迟执行 initialdelay是初始延迟 period是周期间隔 后面是单位 //这里我写的是 初始化后100毫秒后执行,周期性执行10毫秒执行一次 } public users queryuser(long userid) { request request = new request(); // 这里用uuid做请求id request.requestid = uuid.randomuuid().tostring().replace(-, ); request.userid = userid; linkedblockingqueue usersqueue = new linkedblockingqueue(); request.usersqueue = usersqueue; //将对象传入队列 queue.offer(request); //取出元素时,如果队列为空,给定阻塞多少毫秒再队列取值,这里是3秒 try { return usersqueue.poll(3000,timeunit.milliseconds); } catch (interruptedexception e) { e.printstacktrace(); } return null; }}...省略.. @override public map queryuserbyidbatchqueue(list userreqs) { // 全部参数 list userids = userreqs.stream().map(userwrapbatchqueueservice.request::getuserid).collect(collectors.tolist()); querywrapper querywrapper = new querywrapper(); // 用in语句合并成一条sql,避免多次请求数据库的io querywrapper.in(id, userids); list users = usersmapper.selectlist(querywrapper); map usergroup = users.stream().collect(collectors.groupingby(users::getid)); hashmap result = new hashmap(); // 数据分组 userreqs.foreach(val -> { list userslist = usergroup.get(val.getuserid()); if (!collectionutils.isempty(userslist)) { result.put(val.getrequestid(), userslist.get(0)); } else { // 表示没数据 , 这里要new,不然加入队列会空指针 result.put(val.getrequestid(), new users()); } }); return result; }...省略... 小结 请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他rpc调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。
代码地址 https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5 参考 https://www.cnblogs.com/oyjg/p/13099998.html
我国将于2017年建成时速400公里真空管道轨道试验线
使用电磁流量计被测定媒介中有气泡怎么办
未来智慧医院的建设方向
如何给排针排母电镀?
科创板睿创微纳独立董事孙志梅介绍、履历信息
高并发场景下请求合并
什么是TLBs/VALU/VLIW?
如何使用Arduino和非晶体管控制小型直流电动机
8848M5评测 扎实进取发展质量过硬的产品才是重中之重
随着时间的推移 智能音箱的发展变得力不从心
手机摄像头巧接笔记本电脑,Mobiola web Camera
2019 Q2 中国公有云市场,阿里云第一,百度智能云增速亮眼
蓄电池充电方法 蓄电池充电注意事项
1Q19国内面板商增长动能显著 领先日本、韩国及台湾对手
莫仕汽车电路测试提高了车辆的性能和可靠性
如何绕过CDN获取服务器真实IP地址呢
Prada时尚智能手机LG P940(LG Prada K2)再次曝光
新的宽带隙半导体技术提高了功率转换效率
这样的小米6要发布了,华为P10还怎么卖?
连接成形综合实验