从请求处理剖析 Netty
1. Netty 总览
Netty 编译需要操作系统 64 位,保证下载 jar 包为 64 位,下图是 Netty 的核心包。
底部是工具类,左边是 Netty 实现的 tcp 等协议支持,右边是编解码和 handler 。
codec 主要存放编解码;common 方工具类、过线程包、日志;example 用来抄代码;handler 用来实现人性化功能,方便大家使用;proxy 代理;microbench 测试用;resolver 地址解析;transport 主要是 tcp udp ;rxtx 和 udt 被放弃。
2. 启动服务
记录顺序:主线、源码、知识点
- our thread
- 创建 selector
- 创建 server socket channel
- 初始化 server socket channel
- 给 server socket channel 从 boss group 选择一个 NioEventLoop
- boss thread
- 将 server socket channel 注册到选择的 NioEventLoop 的 selector
- 绑定地址启动
- 注册接受连接事件 OP_ACCEPT 到 selector 上
1 | Selector selector = sun.nio.ch.SelectorProviderImpl.openSelector() |
- Selector 是在
new NioEventLoopGroup()
(创建一批 NioEventLoop)时创建。 - 第一次 Register 并不是监听 OP_ACCEPT,而是 0:
javaChannel().register(eventLoop().unwrappedSelector(), 0, this)
- 最终监听 OP_ACCEPT 是通过 bind 完成后的
fireChannelActive()
来触发的。 - NioEventLoop 是通过 Register 操作的执行来完成启动的。
- 类似 ChannelInitializer,一些 Hander 可以设计成一次性的,用完就移除,例如授权。
channel 是连接;
eventloop 是为连接服务的执行器,或者说一个死循环 loop 轮询,处理 channel 上的事件 event ;
一个 channel 只会绑定到一个 eventloop ,但是一个 eventloop 可以服务多个 channel ;
eventloopgroup 相当于 eventloop 的多线程。
3. 构建连接
- boss thread
- NioEventLoop 中的 selector 轮询创建连接事件(OP_ACCEPT)
- 创建 socket channel
- 初始化 socket channel 并从 worker group 中选择一个 NioEventLoop
- worker thread
- 将 socket channel 注册到选择的 NioEventLoop 的 selector
- 注册读事件(OP_READ)到 selector 上
1 | 接收连接: |
- 创建连接的初始化和注册是通过 pipeline.fireChannelRead 在 ServerBootstrapAcceptor 中完成的。
- 第一次 Register 并不是监听 OP_READ ,而是 0 :
javaChannel().register(eventLoop().unwrappedSelector(), 0, this)
- 最终监听 OP_READ 是通过“Register”完成后的 fireChannelActive (
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
中)来触发的。 - Worker’s NioEventLoop 是通过 Register 操作执行来启动。
- 接受连接的读操作,不会尝试读取更多次(16次)。
4. 接收数据
读数据技巧:
- 自适应数据大小的分配器 AdaptiveRecvByteBufAllocator :发放东西时,用多大桶装?小了不够,大了浪费。
- 连续度 defaultMaxMessagesPerRead :加入发放时,桶装满了,如果还要继续所以直接拿新的桶装,直到没有了或者需要给别人机会了才放弃。
- worker thread
- 多路复用器( Selector )接收到 OP_READ 事件
- 处理 OP_READ 事件:
NioSocketChannel.NioSocketChannelUnsafe.read()
- 分配一个初始 1024 字节的 byte buffer 来接受数据
- 从 Channel 接受数据到 byte buffer
- 记录实际接受数据大小,调整下次分配 byte buffer 大小
- 触发
pipeline.fireChannelRead(byteBuf)
把读取到的数据传播出去 - 判断接受 byte buffer 是否满载而归:是,尝试继续读取直到没有数据或满 16 次; 否,结束本轮读取,等待下次 OP_READ 事件
- 读取数据本质:
sun.nio.ch.SocketChannelImpl#read(java.nio.ByteBuffer)
NioSocketChannel read()
是读数据,NioServerSocketChannel read()
是创建连接pipeline.fireChannelReadComplete()
一次读事件处理完成pipeline.fireChannelRead(byteBuf)
一次读数据完成,一次读事件处理可能会包含多次读数据操作。- 为什么最多只尝试读取 16 次?NioEventLoop 是复用的,需要“雨露均沾”。
- AdaptiveRecvByteBufAllocator 对 bytebuf 的猜测:放大果断,缩小谨慎(需要连续 2 次判断)。
5. 业务处理
- worker thread
多路复用器( Selector )接收到 OP_READ 事件处理 OP_READ 事件:NioSocketChannel.NioSocketChannelUnsafe.read()
分配一个初始 1024 字节的 byte buffer 来接受数据从 Channel 接受数据到 byte buffer记录实际接受数据大小,调整下次分配 byte buffer 大小- 触发
pipeline.fireChannelRead(byteBuf)
把读取到的数据传播出去 判断接受 byte buffer 是否满载而归:是,尝试继续读取直到没有数据或满 16 次; 否,结束本轮读取,等待下次 OP_READ 事件
在 pipline 上的 handler 不会都执行,需要实现 ChannelInboundHandler
,同时不能加注解 @Skip 。
- 处理业务本质:数据在 pipeline 中所有的 handler 的
channelRead()
执行过程 - Handler 要实现
io.netty.channel.ChannelInboundHandler#channelRead (ChannelHandlerContext ctx, Object msg)
,且不能加注解 @Skip 才能被执行到。 - 中途可退出,不保证执行到 Tail Handler。
- 默认处理线程就是 Channel 绑定的 NioEventLoop 线程,也可以设置其他:
pipeline.addLast(new UnorderedThreadPoolEventExecutor(10), serverHandler)
6. 发送数据
- write : 将数据放到一个 buffer 里;
- flush : 从 buffer 中将数据发送出去;
- writeAndFlush : 写到 buffer 并发送;
- write 与 flush 之间有一个 ChannelOutboundBuffer 。
写数据技巧:
Netty 写数据,写不进去时,会停止写,然后注册一个 OP_WRITE 事件,来通知什么时候可以写进去了再写。
Netty 批量写数据时,如果想写的都写进去了,接下来的尝试写更多(调整maxBytesPerGatheringWrite)。
Netty 只要有数据要写,且能写的出去,则一直尝试,直到写不出去或者满 16 次(writeSpinCount)。
Netty 待写数据太多,超过一定的水位线
writeBufferWaterMark.high()
,会将可写的标志位改成false ,让应用端自己做决定要不要发送数据了。
- Write - 写数据到 buffer :
ChannelOutboundBuffer#addMessage
- Flush - 发送 buffer 里面的数据:
AbstractChannel.AbstractUnsafe#flush
- 准备数据:
ChannelOutboundBuffer#addFlush
- 发送:
NioSocketChannel#doWrite
- 写的本质:
- single write:
sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer)
- gathering write:
sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer[], int, int)
- single write:
- 写数据写不进去时,会停止写,注册一个 OP_WRITE 事件,来通知什么时候可以写进去了,再来 flush。
- OP_WRITE 不是说有数据可写,而是说可以写进去,所以正常情况,不能注册,否则一直触发。
- 批量写数据时,如果尝试写的都写进去了,接下来会尝试写更多( maxBytesPerGatheringWrite )。
- 只要有数据要写,且能写,则一直尝试,直到 16 次( writeSpinCount ),写 16 次还没有写完,就直 接 schedule 一个 task 来继续写,而不是用注册写事件来触发,更简洁有力。
- 待写数据太多,超过一定的水位线(
writeBufferWaterMark .high()
),会将可写的标志位改成 false , 让应用端自己做决定要不要继续写。 channelHandlerContext.channel().write()
:从 TailContext 开始执行。channelHandlerContext.write()
: 从当前的 Context 开始。
7. 断开连接
- 多路复用器(Selector)接收到 OP_READ 事件。
- 处理 OP_READ 事件,即
NioSocketChannel.NioSocketChannelUnsafe.read()
:- 接受数据
- 判断接受的数据大小是否 < 0 , 如果是,说明是关闭,开始执行关闭:
- 关闭 channel(包含 cancel 多路复用器的 key)。
- 清理消息:不接受新信息,fail 掉所有 queue 中消息。
- 触发 fireChannelInactive 和 fireChannelUnregistered 。
- 关闭连接本质:
java.nio.channels.spi.AbstractInterruptibleChannel#close
java.nio.channels.SelectionKey#cancel
- 要点:
- 关闭连接,会触发 OP_READ 方法。读取字节数是 -1 代表关闭。
- 数据读取进行时,强行关闭,触发 IO Exception,进而执行关闭。
- Channel 的关闭包含了 SelectionKey 的 cancel 。
8. 关闭服务
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
- 关闭所有 Group 中的 NioEventLoop :
- 修改 NioEventLoop 的 State 标志位
- NioEventLoop 判断 State 执行退出
- 关闭服务本质:
- 关闭所有连接及 Selector :
java.nio.channels.Selector#keys
java.nio.channels.spi.AbstractInterruptibleChannel#close
java.nio.channels.SelectionKey#cancel
selector.close()
- 关闭所有线程:退出循环体 for (;;)
- 关闭所有连接及 Selector :
- 关闭服务要点:
- 优雅(DEFAULT_SHUTDOWN_QUIET_PERIOD),静默期
- 可控(DEFAULT_SHUTDOWN_TIMEOUT),默认退出时间
- 先不接活,后尽量干完手头的活(先关 boss 后关 worker:不是100%保证)
Netty 的主线就是基于 Java NIO 的编程。