nettyversion: 4.1.55.final
传统的io模型的web容器,比如老版本的tomcat,为了增加系统的吞吐量,需要不断增加系统核心线程数量,或者通过水平扩展服务器数量,来增加系统处理请求的能力。 有了nio之后,一个线程即可处理多个连接事件,基于多路复用模型的netty框架,不仅降低了使用nio的复杂度,
优点netty是一款以java nio为基础,基于事件驱动模型支持异步、高并发的网络应用框架
api使用简单,开发门槛低,简化了nio开发网络程序的复杂度功能强大,预置多种编解码功能,支持多种主流协议,比如http、websocket。定制能力强,可以通过channelhandler对通信框架灵活扩展。性能高,支持异步非阻塞通信模型成熟稳定,社区活跃,已经修复了java nio所有的bug。经历了大规模商业应用的考验,质量有保证。io模型select、poll和epoll
操作系统内核基于这些函数实现非阻塞io,以此实现多路复用模型
selectselect
select 调用需要传入 fd 数组,需要拷贝一份到内核,高并发场景下这样的拷贝消耗的资源是惊人的。(可优化为不复制)select 在内核层仍然是通过遍历的方式检查文件描述符的就绪状态,是个同步过程,只不过无系统调用切换上下文的开销。(内核层可优化为异步事件通知)select 仅仅返回可读文件描述符的个数,具体哪个可读还是要用户自己遍历。(可优化为只返回给用户就绪的文件描述符,无需用户做无效的遍历)pool和 select 的主要区别就是,去掉了 select 只能监听 1024 个文件描述符的限制
epoolepool
内核中保存一份文件描述符集合,无需用户每次都重新传入,只需告诉内核修改的部分即可。内核不再通过轮询的方式找到就绪的文件描述符,而是通过异步 io 事件唤醒。内核仅会将有 io 事件的文件描述符返回给用户,用户也无需遍历整个文件描述符集合。reactor模型一、单reactor单线程 1)可以实现通过一个阻塞对象监听多个链接请求
2)reactor对象通过select监听客户端请求事件,通过dispatch进行分发
3)如果是建立链接请求,则由acceptor通过accept处理链接请求,然后创建一个handler对象处理完成链接后的各种事件
4)如果不是链接请求,则由reactor分发调用链接对应的handler来处理
5)handler会完成read->业务处理->send的完整业务流程
reactor
二、单reactor多线程 1)reactor对象通过select监听客户端请求事件,收到事件后,通过dispatch分发
2)如果是建立链接请求,则由acceptor通过accept处理链接请求,然后创建一个handler对象处理完成链接后的各种事件
3)如果不是链接请求,则由reactor分发调用链接对应的handler来处理
4)handler只负责事件响应不做具体业务处理
5)通过read读取数据后,分发到worker线程池处理,处理完成后返回给handler,handler收到后,通过send将结果返回给client
reactor
三、主从reactor多线程 1)reactor主线程mainreactor对象通过select监听链接事件,通过acceptor处理
2)当acceptor处理链接事件后,mainreactor将链接分配给subreactor
3)subreactor将链接加入到队列进行监听,并创建handler进行事件处理
4)当有新事件发生时,subreactor就会调用对应的handler处理
5)handler通过read读取数据,分发到worker线程池处理,处理完成后返回给handler,handler收到后,通过send将结果返回给client
6)reactor主线程可以对应多个reactor子线程
reactor
三种模式用生活案例来理解 1)单reactor单线程,前台接待员和服务员是同一个人,全程为顾客服务
2)单reactor多线程,1个前台接待员,多个服务员,接待员只负责接待
3)主从reactor多线程,多个前台接待员,多个服务员
reactor模型具有如下优点 1)响应快,不必为单个同步事件所阻塞,虽然reactor本身依然是同步的
2)可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
3)扩展性好,可以方便的通过增加reactor实例个数来充分利用cpu资源
4)复用性好,reactor模型本身与具体事件处理逻辑无关,具有很高的复用性
核心组件1.bootstrap 一个netty应用通常由一个bootstrap开始,它主要作用是配置整个netty程序,串联起各个组件。
handler,为了支持各种协议和处理数据的方式,便诞生了handler组件。handler主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
2.channelinboundhandler 一个最常用的handler。这个handler的作用就是处理接收到数据时的事件,也就是说,我们的业务逻辑一般就是写在这个handler里面的,channelinboundhandler就是用来处理我们的核心业务逻辑。
3.channelinitializer 当一个链接建立时,我们需要知道怎么来接收或者发送数据,当然,我们有各种各样的handler实现来处理它,那么channelinitializer便是用来配置这些handler,它会提供一个channelpipeline,并把handler加入到channelpipeline。
4.channelpipeline 一个netty应用基于channelpipeline机制,这种机制需要依赖于eventloop和eventloopgroup,因为它们三个都和事件或者事件处理相关。
eventloops的目的是为channel处理io操作,一个eventloop可以为多个channel服务。
eventloopgroup会包含多个eventloop。
5.channel 代表了一个socket链接,或者其它和io操作相关的组件,它和eventloop一起用来参与io处理。
6.future 在netty中所有的io操作都是异步的,因此,你不能立刻得知消息是否被正确处理,但是我们可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过future和channelfutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发。
示例通过一个简单的示例,首先了解怎么基于netty开发一个通信程序,包括服务的与客户端:
server:
@slf4jpublic class server { private eventloopgroup boosgroup; private eventloopgroup workgroup; public server(int port){ try { init(port); log.info(----- 服务启动成功 -----); } catch (interruptedexception e) { log.error(启动服务出错:{}, e.getcause()); } } private void init(int port) throws interruptedexception { // 处理连接 this.boosgroup = new nioeventloopgroup(); // 处理业务 this.workgroup = new nioeventloopgroup(); serverbootstrap bootstrap = new serverbootstrap(); // 绑定 bootstrap.group(boosgroup, workgroup) .channel(nioserversocketchannel.class) //配置服务端 .option(channeloption.connect_timeout_millis, 3000) .option(channeloption.so_backlog, 1024) .childoption(channeloption.tcp_nodelay, true) .childoption(channeloption.so_rcvbuf, 1024) .childoption(channeloption.so_sndbuf, 1024) .childhandler(new channelinitializer() { @override protected void initchannel(socketchannel socketchannel) throws exception { socketchannel.pipeline().addlast(new serverhandler()); } }); channelfuture channelfuture = bootstrap.bind(port).sync(); channelfuture.channel().closefuture().sync(); } public void close(){ this.boosgroup.shutdowngracefully(); this.workgroup.shutdowngracefully(); }}@slf4jclass serverhandler extends channelinboundhandleradapter { @override public void channelactive(channelhandlercontext ctx) throws exception { log.info( >> >> >> > server active); } @override public void channelread(channelhandlercontext ctx, object msg) throws exception { //1. 读取客户端的数据(缓存中去取并打印到控制台) bytebuf buf = (bytebuf) msg; byte[] request = new byte[buf.readablebytes()]; buf.readbytes(request); string requestbody = new string(request, utf-8); log.info( >> >> >> >> > receive message: {}, requestbody); //2. 返回响应数据 ctx.writeandflush(unpooled.copiedbuffer((requestbody+ too).getbytes())); } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception { super.exceptioncaught(ctx, cause); }}client:
@slf4jpublic class client { private eventloopgroup workgroup; private channelfuture channelfuture; public client(int port){ init(port); } private void init(int port){ this.workgroup = new nioeventloopgroup(); bootstrap bootstrap = new bootstrap(); bootstrap.group(workgroup) .channel(niosocketchannel.class) .option(channeloption.connect_timeout_millis, 3000) .option(channeloption.so_rcvbuf, 1024) .option(channeloption.so_sndbuf, 1024) .handler(new channelinitializer() { @override protected void initchannel(socketchannel socketchannel) throws exception { socketchannel.pipeline().addlast(new clienthandler()); } }); this.channelfuture = bootstrap.connect(127.0.0.1, port).syncuninterruptibly(); } /** * * @param message */ public void send(string message){ this.channelfuture.channel().writeandflush(unpooled.copiedbuffer(message.getbytes())); } /** * */ public void close(){ try { channelfuture.channel().closefuture().sync(); } catch (interruptedexception e) { throw new runtimeexception(e); } workgroup.shutdowngracefully(); }}@slf4jclass clienthandler extends channelinboundhandleradapter { @override public void channelactive(channelhandlercontext ctx) throws exception { log.info( >> >> >> > client active); } @override public void channelread(channelhandlercontext ctx, object msg) throws exception { try { bytebuf buf = (bytebuf) msg; byte[] req = new byte[buf.readablebytes()]; buf.readbytes(req); string body = new string(req, utf-8); log.info( >> >> >> >> > receive message: {}, body); } finally { referencecountutil.release(msg); } } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception { super.exceptioncaught(ctx, cause); }}测试:
public class startertests { static int port = 9011; @test public void startserver(){ server server = new server(9011); } @test public void startclient(){ client client = new client(port); client.send(hello netty!); while (true){} }}生态dubbospring reactive类似技术mina、netty、grizzly
其他proactor非阻塞异步网络模型
疏散通道余压控制的相关建议
自动驾驶车辆ADAS系统的电磁抗干扰测试解决方案
探索汽车生产线规划过程
OLED拼接屏55英寸的应用广泛吗?
架空电力线路的防雷保护
基于多路复用模型的Netty框架
浅析交易机器人的利弊
大疆的MG-1系列无人机在工业行业上的应用
机器人租赁潜力大 但想要长远发展还需解决这四大难题
软通智服荣膺“数字化转型最佳服务商”称号
小米的2000亿美元的物联网之路
开关模式电源电流检测:电流检测方法
比特币突破58000美元每枚
荣耀5G机供不应求 将增加产能
realme官网
高通重新开始为iPhone 12智能手机提供零部件
了解PIPE4.4规范及PCIe 4.0的开发设计
回顾DIY硬件超频的历史 为什么现在变得越来越小众了
国产给力!今年Android手机续航能力飙升
储罐液位计的安装说明