0%

Netty 的高性能设计思想

一 Linux 的 5 种 主要 IO 模式

实现高性能的网络应用框架离不开 I/O 模型问题,在了解 Netty 前先了解 I/O 模型的基本知识。

I/O 请求可以分为两个阶段,分别为调用阶段和执行阶段。

  • 第一个阶段为I/O 调用阶段,即用户进程向内核发起系统调用。
  • 第二个阶段为I/O 执行阶段。此时,内核等待 I/O 请求处理完成返回。该阶段分为两个过程:首先等待数据就绪,并写入内核缓冲区;随后将内核缓冲区数据拷贝至用户态缓冲区。

1.png

下面 对比Linux 的 5 种主要 I/O 模式以及优劣势都在哪里?

同步阻塞 I/O(BIO)

2.png

如上图所示,应用进程向内核发起 I/O 请求,发起调用的线程一直等待内核返回结果。一次完整的 I/O 请求称为BIO(Blocking IO,阻塞 I/O),所以 BIO 在实现异步操作时,只能使用多线程模型,一个请求对应一个线程。但是,线程的资源是有限且宝贵的,创建过多的线程会增加线程切换的开销。

同步非阻塞 I/O(NIO)

3.png

与BIO 不同,应用进程向内核发起 I/O 请求后不再会同步等待结果,而是会立即返回,通过轮询的方式获取请求结果。NIO 相比 BIO 虽然大幅提升了性能,但是轮询过程中大量的系统调用导致上下文切换开销很大。所以,单独使用非阻塞 I/O 时效率并不高,而且随着并发量的提升,非阻塞 I/O 会存在严重的性能浪费。

I/O 多路复用

4.png

多路复用实现了一个线程处理多个 I/O 句柄的操作。多路指的是多个数据通道,复用指的是使用一个或多个固定线程来处理每一个 Socket。select、poll、epoll 都是 I/O 多路复用的具体实现,线程一次 select 调用可以获取内核态中多个数据通道的数据状态。多路复用解决了同步阻塞 I/O 和同步非阻塞 I/O 的问题,是一种非常高效的 I/O 模型。

信号驱动 I/O

5.jpeg

信号驱动 I/O 并不常用,它是一种半异步的 I/O 模型。在使用信号驱动 I/O 时,当数据准备就绪后,内核通过发送一个 SIGIO 信号通知应用进程,应用进程就可以开始读取数据了。

异步 I/O

6.png

异步 I/O 最重要的一点是从内核缓冲区拷贝数据到用户态缓冲区的过程也是由系统异步完成,应用进程只需要在指定的数组中引用数据即可。异步 I/O 与信号驱动 I/O 这种半异步模式的主要区别:信号驱动 I/O 由内核通知何时可以开始一个 I/O 操作,而异步 I/O 由内核通知 I/O 操作何时已经完成。

了解了上述五种 I/O,那么 Netty 使用的是哪一种 I/O 模型呢?

二 Netty 是什么

先看看 Netty 官网 的描述

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty 是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

说白了就是,Netty 是一款用于高效开发网络应用的 NIO 网络框架,可以大大简化网络应用的开发过程。

三 谁在使用 Netty

Netty 经过很多出名产品在线上的大规模验证,其健壮性和稳定性都被业界认可,其中典型的产品如下:

  • 服务治理:Apache Dubbo、gRPC
  • 消息队列:RocketMQ、ActiveMQ
  • 大数据:Hbase、Spark、Flink、Storm
  • 搜索引擎:Elasticsearch

GitHub。截止至 2021 年 7 月,2.7w+ star,一共被 4w+ 的项目所使用。

更多产品可看 https://netty.io/wiki/related-projects.html

四 Netty 为什么这么流行

既然 Netty 是网络应用框架,那我们永远绕不开以下几个核心关注点:

  • I/O 模型、线程模型和事件处理机制;
  • 易用性 API 接口;
  • 对数据协议、序列化的支持。

之所以会最终选择 Netty,是因为 Netty 围绕这些核心要点可以做到尽善尽美,其健壮性、性能、可扩展性在同领域的框架中都首屈一指。

高性能

前面了解了上述五种 I/O,那么 Netty 如何实现自己的 I/O 模型的呢?

Netty 的 I/O 模型是基于非阻塞 I/O 实现的,底层依赖的是 JDK NIO 框架的多路复用器 Selector。一个多路复用器 Selector 可以同时轮询多个 Channel,采用 epoll 模式后,只需要一个线程负责 Selector 的轮询,就可以接入成千上万的客户端。

在 I/O 多路复用的场景下,当有数据处于就绪状态后,需要一个事件分发器(Event Dispather),它负责将读写事件分发给对应的读写事件处理器(Event Handler)

下图是 Netty 所采用的比较主流的线程模型,主从 Reactor 多线程模型,所有的 I/O 事件都注册到一个 I/O 多路复用器上,当有 I/O 事件准备就绪后,I/O 多路复用器会将该 I/O 事件通过事件分发器分发到对应的事件处理器中。该线程模型避免了同步问题以及多线程切换带来的资源开销,真正做到高性能、低延迟。

7.png

摘自 Lea D. Scalable IO in Java

比 Java NIO 更完善并且易用

在 JDK 1.4 投入使用之前,只有 BIO 一种模式。开发过程相对简单。新来一个连接就会创建一个新的线程处理。随着请求并发度的提升,BIO 很快遇到了性能瓶颈。JDK 1.4 以后开始引入了 NIO 技术,支持 select 和 poll;JDK 1.5 支持了 epoll。

既然 JDK NIO 性能已经非常优秀,为什么还要选择 Netty?主要是因为 Netty 做了 JDK 该做的事,但是做得更加完备,主要体现在一面几个方面:

  • 稳定 :Netty 更加可靠稳定,修复和完善了 JDK NIO 较多已知问题,例如臭名昭著的 select 空转导致 CPU 消耗 100%,TCP 断线重连,keep-alive 检测等问题。
  • 易用: 使用 JDK NIO 编程需要了解很多复杂的概念,比如 Channels、Selectors、Sockets、Buffers 等,编码复杂程度令人发指。相反,Netty 在 NIO 基础上进行了更高层次的封装,屏蔽了 NIO 的复杂性;Netty 封装了更加人性化的 API,统一的 API(阻塞/非阻塞) 大大降低了开发者的上手难度;与此同时,Netty 提供了很多开箱即用的工具,例如常用的行解码器、长度域解码器等,而这些在 JDK NIO 中都需要你自己实现。
  • 可扩展: Netty 的可扩展性在很多地方都有体现,比如:一个是可定制化的线程模型,用户可以通过启动的配置参数选择 Reactor 线程模型;另一个是可扩展的事件驱动模型,将框架层和业务层的关注点分离。大部分情况下,开发者只需要关注 ChannelHandler 的业务逻辑实现。

更低的资源消耗

作为网络通信框架,有大量的网络对象需要创建和销毁的问题,对于 JVM GC 很不友好。为了降低 JVM 垃圾回收的压力,Netty 主要采用了两种优化手段:

  • 对象池复用技术。 Netty 通过复用对象,避免频繁创建和销毁带来的开销。
  • 零拷贝技术。 除了操作系统级别的零拷贝技术外,Netty 提供了更多面向用户态的零拷贝技术,例如 Netty 在 I/O 读写时直接使用 DirectBuffer,从而避免了数据在堆内存和堆外内存之间的拷贝。

因为 Netty 不仅做到了高性能、低延迟以及更低的资源消耗,还完美弥补了 Java NIO 的缺陷,所以在网络编程时越来越受到开发者们的青睐。

五 Netty 的整体架构脉络

整体结构

Netty 官网给出了有关 Netty 的整体功能模块结构

Core 核心层

Core 核心层是 Netty 最精华的内容,它提供了底层网络通信的通用抽象和实现,包括可扩展的事件模型、通用的通信 API、支持零拷贝的 ByteBuf 等。

Protocol Support 协议支持层

协议支持层基本上覆盖了主流协议的编解码实现,如 HTTP、SSL、Protobuf、压缩、大文件传输、WebSocket、文本、二进制等主流协议,此外 Netty 还支持自定义应用层协议。Netty 丰富的协议支持降低了用户的开发成本,基于 Netty 我们可以快速开发 HTTP、WebSocket 等服务。

Transport Service 传输服务层

传输服务层提供了网络传输能力的定义和实现方法。它支持 Socket、HTTP 隧道、虚拟机管道等传输方式。Netty 对 TCP、UDP 等数据传输做了抽象和封装,用户可以更聚焦在业务逻辑实现上,而不必关系底层数据传输的细节。

Netty 的模块设计具备较高的通用性和可扩展性,它不仅是一个优秀的网络框架,还可以作为网络编程的工具箱。

逻辑结构

Netty 的逻辑处理架构为典型网络分层架构设计,共分为网络通信层、事件调度层、服务编排层,每一层各司其职。图中包含了 Netty 每一层所用到的核心组件。我将为你介绍 Netty 的每个逻辑分层中的各个核心组件以及组件之间是如何协调运作的。

9.png

网络通行层

网络通信层的核心组件包含BootStrap、ServerBootStrap、Channel三个组件。BootStrap 和 ServerBootStrap 分别负责客户端和服务端的启动,它们是非常强大的辅助工具类;Channel 是网络通信的载体,提供了与底层 Socket 交互的能力。

BootStrap & ServerBootStrap

Bootstrap 是“引导”的意思,它主要负责整个 Netty 程序的启动、初始化、服务器连接等过程,它相当于一条主线,串联了 Netty 的其他核心组件。

Netty 中的引导器共分为两种类型:一个为用于客户端引导的 Bootstrap,另一个为用于服务端引导的 ServerBootStrap。它们都继承自抽象类 AbstractBootstrap。

10.png

Channel

Channel 的字面意思是“通道”,它是网络通信的载体。Channel提供了基本的 API 用于网络 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己实现的 Channel 是以 JDK NIO Channel 为基础的,相比较于 JDK NIO,Netty 的 Channel 提供了更高层次的抽象,同时屏蔽了底层 Socket 的复杂性。

下图是 Channel 家族的图谱。AbstractChannel 是整个家族的基类

11.png

  • NioServerSocketChannel 异步 TCP 服务端 (使用比较多)
  • NioSocketChannel 异步 TCP 客户端 (使用比较多)
  • OioServerSocketChannel 同步 TCP 服务端
  • OioSocketChannel 同步 TCP 客户端
  • NioDatagramChannel 异步 UDP 连接
  • OioDatagramChannel 同步 UDP 连接

当然 Channel 会有多种状态,如连接建立、连接注册、数据读写、连接销毁等。随着状态的变化,Channel 处于不同的生命周期,每一种状态都会绑定相应的事件回调。

1
2
3
4
5
6
channelRegistered			Channel 创建后被注册到 EventLoop 上
channelUnregistered Channel 创建后未注册或者从 EventLoop 取消注册
channelActive Channel 处于就绪状态,可以被读写
channelInactive Channel 处于非就绪状态
channelRead Channel 可以从远端读取到数据
channelReadComplete Channel 读取数据完成

事件调度层

事件调度层的职责是通过 Reactor 线程模型对各类事件进行聚合处理,通过 Selector 主循环线程集成多种事件( I/O 事件、信号事件、定时事件等),实际的业务处理逻辑是交由服务编排层中相关的 Handler 完成。

EventLoopGroup & EventLoop

EventLoopGroup 本质是一个线程池,主要负责接收 I/O 请求,并分配线程执行处理请求。在下图中,我为你讲述了 EventLoopGroups、EventLoop 与 Channel 的关系。

00.png

从上图中,可以看出 EventLoopGroup、EventLoop、Channel 的几点关系。

  • 一个 EventLoopGroup 往往包含一个或者多个 EventLoop。EventLoop 用于处理 Channel 生命周期内的所有 I/O 事件,如 accept、connect、read、write 等 I/O 事件。
  • EventLoop 同一时间会与一个线程绑定,每个 EventLoop 负责处理多个 Channel。
  • 每新建一个 Channel,EventLoopGroup 会选择一个 EventLoop 与其绑定。该 Channel 在生命周期内都可以对 EventLoop 进行多次绑定和解绑。

服务编排层

服务编排层的职责是负责组装各类服务,它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。

服务编排层的核心组件包括 ChannelPipeline、ChannelHandler、ChannelHandlerContext。

ChannelPipeline

ChannelPipeline 是 Netty 的核心编排组件,负责组装各种 ChannelHandler,实际数据的编解码以及加工处理操作都是由 ChannelHandler 完成的。ChannelPipeline 可以理解为ChannelHandler 的实例列表——内部通过双向链表将不同的 ChannelHandler 链接在一起。当 I/O 读写事件触发时,ChannelPipeline 会依次调用 ChannelHandler 列表对 Channel 的数据进行拦截和处理。

ChannelPipeline 是线程安全的,因为每一个新的 Channel 都会对应绑定一个新的 ChannelPipeline。一个 ChannelPipeline 关联一个 EventLoop,一个 EventLoop 仅会绑定一个线程

io.netty.channel.ChannelPipeline

企业微信截图_ceaf06b8-6a6f-4039-a075-bfbe7c8b454c.png

ChannelPipeline 中包含入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器,我结合客户端和服务端的数据收发流程来理解 Netty 的这两个概念。

12.png

客户端和服务端都有各自的 ChannelPipeline。以客户端为例,数据从客户端发向服务端,该过程称为出站,反之则称为入站。数据入站会由一系列 InBoundHandler 处理,然后再以相反方向的 OutBoundHandler 处理后完成出站。我们经常使用的编码 Encoder 是出站操作,解码 Decoder 是入站操作。服务端接收到客户端数据后,需要先经过 Decoder 入站处理后,再通过 Encoder 出站通知客户端。所以客户端和服务端一次完整的请求应答过程可以分为三个步骤:客户端出站(请求数据)、服务端入站(解析数据并执行业务逻辑)、服务端出站(响应结果)。

ChannelHandler & ChannelHandlerContext

每创建一个 Channel 都会绑定一个新的 ChannelPipeline,ChannelPipeline 中每加入一个 ChannelHandler 都会绑定一个 ChannelHandlerContext。

13.png

ChannelHandlerContext 用于保存 ChannelHandler 上下文,通过 ChannelHandlerContext 可以实现 ChannelHandler 之间的交互,ChannelHandlerContext 包含了 ChannelHandler 生命周期的所有事件,如 connect、bind、read、flush、write、close 等。

多了有 ChannelHandlerContext 这层模型抽象,可以避免在 ChannelHandler 中写很多重复的代码。

Netty 的架构分层设计得非常合理,屏蔽了底层 NIO 以及框架层的实现细节,对于业务开发者来说,只需要关注业务逻辑的编排和实现即可。

各个组件的交互流程

上面了解了 Netty 核心组件,那么组件之间是如何协作的呢?

14.png

  • 服务端启动初始化时有 Boss EventLoopGroup 和 Worker EventLoopGroup 两个组件,其中 Boss 负责监听网络连接事件。当有新的网络连接事件到达时,则将 Channel 注册到 Worker EventLoopGroup。
  • Worker EventLoopGroup 会被分配一个 EventLoop 负责处理该 Channel 的读写事件。每个 EventLoop 都是单线程的,通过 Selector 进行事件循环。
  • 当客户端发起 I/O 读写事件时,服务端 EventLoop 会进行数据的读取,然后通过 Pipeline 触发各种监听器进行数据的加工处理。
  • 客户端数据会被传递到 ChannelPipeline 的第一个 ChannelInboundHandler 中,数据处理完成后,将加工完成的数据传递给下一个 ChannelInboundHandler。
  • 当数据写回客户端时,会将处理结果在 ChannelPipeline 的 ChannelOutboundHandler 中传播,最后到达客户端。

源码结构

15.png

Core 核心层模块

netty-common模块是 Netty 的核心基础包,提供了丰富的工具类,其他模块都需要依赖它。在 common 模块中,常用的包括通用工具类和自定义并发包。

Protocol Support 协议支持层模块

netty-codec模块主要负责编解码工作,通过编解码实现原始字节数据与业务实体对象之间的相互转化。Netty 支持了大多数业界主流协议的编解码器,如 HTTP、HTTP2、Redis、XML 等,为开发者节省了大量的精力。此外该模块提供了抽象的编解码类 ByteToMessageDecoder 和 MessageToByteEncoder,通过继承这两个类我们可以轻松实现自定义的编解码逻辑。

16.png

Transport Service 传输服务层模块

netty-transport 模块可以说是 Netty 提供数据处理和传输的核心模块。该模块提供了很多非常重要的接口,如 Bootstrap、Channel、ChannelHandler、EventLoop、EventLoopGroup、ChannelPipeline 等。其中 Bootstrap 负责客户端或服务端的启动工作,包括创建、初始化 Channel 等;EventLoop 负责向注册的 Channel 发起 I/O 读写操作;ChannelPipeline 负责 ChannelHandler 的有序编排

源码 https://github.com/netty/netty

六 Netty 的精髓 EvenLoop

网络框架的设计离不开 I/O 线程模型,线程模型的优劣直接决定了系统的吞吐量、可扩展性、安全性等。

目前主流的网络框架几乎都采用了 I/O 多路复用的方案。Reactor 模式作为其中的事件分发器,负责将读写事件分发给对应的读写事件处理者。

大名鼎鼎的 Java 并发包作者 Doug Lea,在 Scalable I/O in Java 一文中阐述了服务端开发中 I/O 模型的演进过程。Netty 中三种 Reactor 线程模型也来源于这篇经典文章。下面对这三种 Reactor 线程模型做一个详细的分析。

三种 Reactor 线程模型

单线程模型

17.png

在 Reactor 单线程模型中,所有 I/O 操作(包括连接建立、数据读写、事件分发等),都是由一个线程完成的。单线程模型逻辑简单,缺陷也十分明显:

  • 一个线程支持处理的连接数非常有限,CPU 很容易打满,性能方面有明显瓶颈;
  • 当多个事件被同时触发时,只要有一个事件没有处理完,其他后面的事件就无法执行,这就会造成消息积压及请求超时;
  • 线程在处理 I/O 事件时,Select 无法同时处理连接建立、事件分发等操作;
  • 如果 I/O 线程一直处于满负荷状态,很可能造成服务端节点不可用。

多线程模型

18.png

由于单线程模型有性能方面的瓶颈,多线程模型作为解决方案就应运而生了。

Reactor 多线程模型将业务逻辑交给多个线程进行处理。除此之外,多线程模型其他的操作与单线程模型是类似的,例如读取数据依然保留了串行化的设计。当客户端有数据发送至服务端时,Select 会监听到可读事件,数据读取完毕后提交到业务线程池中并发处理。

主从多线程模型

19.png

主从多线程模型由多个 Reactor 线程组成,每个 Reactor 线程都有独立的 Selector 对象。MainReactor 仅负责处理客户端连接的 Accept 事件,连接建立成功后将新创建的连接对象注册至 SubReactor。再由 SubReactor 分配线程池中的 I/O 线程与其连接绑定,它将负责连接生命周期内所有的 I/O 事件。

Netty 推荐使用主从多线程模型,这样就可以轻松达到成千上万规模的客户端连接。在海量客户端并发请求的场景下,主从多线程模式甚至可以适当增加 SubReactor 线程的数量,从而利用多核能力提升系统的吞吐量。

Reactor 线程模型运行机制

前面介绍了三种 Reactor 线程模型,能大致总结出 Reactor 线程模型运行机制的四个步骤,分别为连接注册、事件轮询、事件分发、任务处理,如下图所示。

20.png

  • 连接注册:Channel 建立后,注册至 Reactor 线程中的 Selector 选择器。
  • 事件轮询:轮询 Selector 选择器中已注册的所有 Channel 的 I/O 事件。
  • 事件分发:为准备就绪的 I/O 事件分配相应的处理线程。
  • 任务处理:Reactor 线程还负责任务队列中的非 I/O 任务,每个 Worker 线程从各自维护的任务队列中取出任务异步执行。

以上便是 Reactor 线程模型的演进过程和基本原理,而Netty 也同样遵循 Reactor 线程模型的运行机制。下面再来看看 EventLoop 作为 Netty Reactor 线程模型的核心处理引擎,是如何高效地实现事件循环和任务处理机制的?

EventLoop 是什么?

EventLoop 这个概念其实并不是 Netty 独有的,它是一种事件等待和处理的程序模型,可以解决多线程资源消耗高的问题。

Netty 中的 EventLoop

下面再看另外一张展示 EventLoop 通用运行模式的图:

21.png

每当事件发生时,应用程序都会将产生的事件放入事件队列当中,然后 EventLoop 会轮询从队列中取出事件执行或者将事件分发给相应的事件监听者执行。事件执行的方式通常分为立即执行、延后执行、定期执行几种。

Node.js 基于EventLoop 的运行机制

例如 Node.js 就采用了 EventLoop 的运行机制,不仅占用资源低,而且能够支撑了大规模的流量访问。 如下图所示:图片来源@BusyRich

22.png

根据上图,Node.js的运行机制如下:

(1)V8引擎解析JavaScript脚本。

(2)解析后的代码,调用Node API。

(3)libuv库负责Node API的执行。它将不同的任务分配给不同的线程,形成一个Event Loop(事件循环),以异步的方式将任务的执行结果返回给V8引擎。

(4)V8引擎再将结果返回给用户。

拓展阅读:

JavaScript 运行机制详解:再谈Event Loop

理解浏览器与Nodejs中的event loop

Redis 的事件循环器-EventLoop的应用

Redis作为一个单线程高性能的内存缓存, 能够达到如此高的性能,主要是因为Redis内置了一个高性能事件循环器AE,也是基于 EventLoop 的实现。

23.png

核心代码主要是event selector 和 event processor。

event selector 其实就是简单的while true系循环, 执行顺序如下

  1. 执行event loop前置的钩子函数 beforesleep
  2. 调用event processor函数: aeProcessEvents执行所有队列中的IO事件 和 时间事件
1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); //处理IO事件 和 时间事件
}
}

event processor 执行顺序如下:

3.1 从链表中获取待执行的io event
3.2 在循环中顺序执行io event
3.3 check是否有待执行的time event, 如果有会执行time event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
... 代码省略
// poll所有带执行event
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);

/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);

// 执行event... 此处代码很多直接忽略
for (j = 0; j < numevents; j++) {
... 代码省略
processed++;
}
... 代码省略
// 在所有io event执行完后, 会check是否有需要执行的time event
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
}

Netty 如何实现 EventLoop

在 Netty 中 EventLoop 可以理解为 Reactor 线程模型的事件处理引擎,每个 EventLoop 线程都维护一个 Selector 选择器和任务队列 taskQueue。它主要负责处理 I/O 事件、普通任务和定时任务。

Netty 中推荐使用 NioEventLoop 作为实现类,以 NioEventLoop 实现 为例:

@see io.netty.channel.nio.NioEventLoop#run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false)); // 轮询 I/O 事件
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys(); // 处理 I/O 事件
                } finally {
                    runAllTasks(); // 处理所有任务
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys(); // 处理 I/O 事件
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 处理完 I/O 事件,再处理异步任务队列
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

NioEventLoop 的 run() 方法是一个无限循环,没有任何退出条件,在不间断循环执行以下三件事情

24.png

  • 轮询 I/O 事件(select):轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件。
  • 处理 I/O 事件(processSelectedKeys):处理已经准备就绪的 I/O 事件。
  • 处理异步任务队列(runAllTasks):Reactor 线程还有一个非常重要的职责,就是处理任务队列中的非 I/O 任务。Netty 提供了 ioRatio 参数用于调整 I/O 事件处理和任务处理的时间比例。

下面再来看看Netty是如何进行事件处理和任务处理的。

事件处理机制

25.png

NioEventLoop 的事件处理机制采用的是无锁串行化的设计思路。

  • BossEventLoopGroup 和 WorkerEventLoopGroup 包含一个或者多个 NioEventLoop。BossEventLoopGroup 负责监听客户端的 Accept 事件,当事件触发时,将事件注册至 WorkerEventLoopGroup 中的一个 NioEventLoop 上。每新建一个 Channel, 只选择一个 NioEventLoop 与其绑定。所以说 Channel 生命周期的所有事件处理都是线程独立的,不同的 NioEventLoop 线程之间不会发生任何交集。
  • NioEventLoop 完成数据读取后,会调用绑定的 ChannelPipeline 进行事件传播,ChannelPipeline 也是线程安全的,数据会被传递到 ChannelPipeline 的第一个 ChannelHandler 中。数据处理完成后,将加工完成的数据再传递给下一个 ChannelHandler,整个过程是串行化执行,不会发生线程上下文切换的问题。

NioEventLoop 无锁串行化的设计不仅使系统吞吐量达到最大化,而且降低了用户开发业务逻辑的难度,不需要花太多精力关心线程安全问题。

需要特别注意,虽然单线程执行避免了线程切换,但是它的缺陷就是不能执行时间过长的 I/O 操作,一旦某个 I/O 事件发生阻塞,那么后续的所有 I/O 事件都无法执行,甚至造成事件积压。在使用 Netty 进行程序开发时,一定要对 ChannelHandler 的实现逻辑有充分的风险意识。

任务处理机制

NioEventLoop 不仅负责处理 I/O 事件,还要兼顾执行任务队列中的任务。任务队列遵循 FIFO 规则,可以保证任务执行的公平性。NioEventLoop 处理的任务类型基本可以分为三类。

  • 普通任务:通过 NioEventLoop 的 execute() 方法向任务队列 taskQueue 中添加任务。例如 Netty 在写数据时会封装 WriteAndFlushTask 提交给 taskQueue。taskQueue 的实现类是多生产者单消费者队列 MpscChunkedArrayQueue,在多线程并发添加任务时,可以保证线程安全。
  • 定时任务:通过调用 NioEventLoop 的 schedule() 方法向定时任务队列 scheduledTaskQueue 添加一个定时任务,用于周期性执行该任务。例如,心跳消息发送等。定时任务队列 scheduledTaskQueue 采用优先队列 PriorityQueue 实现。
  • 尾部队列:tailTasks 相比于普通任务队列优先级较低,在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。尾部任务并不常用,主要用于做一些收尾工作,例如统计事件循环的执行时间、监控信息上报等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected boolean runAllTasks(long timeoutNanos) {
    // 1. 合并定时任务到普通任务队列
    fetchFromScheduledTaskQueue();
    // 2. 从普通任务队列中取出任务
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
    // 3. 计算任务处理的超时时间
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // 4. 安全执行任务
        safeExecute(task);
        runTasks ++;
        // 5. 每执行 64 个任务检查一下是否超时
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // 6. 收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
  1. fetchFromScheduledTaskQueue 函数:将定时任务从 scheduledTaskQueue 中取出,聚合放入普通任务队列 taskQueue 中,只有定时任务的截止时间小于当前时间才可以被合并。
  2. 从普通任务队列 taskQueue 中取出任务。
  3. 计算任务执行的最大超时时间。
  4. safeExecute 函数:安全执行任务,实际直接调用的 Runnable 的 run() 方法。
  5. 每执行 64 个任务进行超时时间的检查,如果执行时间大于最大超时时间,则立即停止执行任务,避免影响下一轮的 I/O 事件的处理。
  6. 最后获取尾部队列中的任务执行。

EventLoop 的最佳实践

  • 网络连接建立过程中三次握手、安全认证的过程会消耗不少时间。这里建议采用 Boss 和 Worker 两个 EventLoopGroup,有助于分担 Reactor 线程的压力。
  • 由于 Reactor 线程模式适合处理耗时短的任务场景,对于耗时较长的 ChannelHandler 可以考虑维护一个业务线程池,将编解码后的数据封装成 Task 进行异步处理,避免 ChannelHandler 阻塞而造成 EventLoop 不可用。
  • 如果业务逻辑执行时间较短,建议直接在 ChannelHandler 中执行。例如编解码操作,这样可以避免过度设计而造成架构的复杂性。
  • 不宜设计过多的 ChannelHandler。对于系统性能和可维护性都会存在问题,在设计业务架构的时候,需要明确业务分层和 Netty 分层之间的界限。不要一味地将业务逻辑都添加到 ChannelHandler 中。

EventLoop 小结

EventLoop 的设计思想被运用于较多的高性能框架中,如 Redis、Nginx、Node.js 等。

下面对 Netty EventLoop 做一个简单的总结归纳:

  • MainReactor 线程:处理客户端请求接入。
  • SubReactor 线程:数据读取、I/O 事件的分发与执行。
  • 任务处理线程:用于执行普通任务或者定时任务,如空闲连接检测、心跳上报等。

七 Netty 的编解码

Netty 如何解决网络传输过程的粘包拆包问题

为什么有拆包/粘包?

TCP 传输协议是面向流的,没有数据包界限。客户端向服务端发送数据时,可能将一个完整的报文拆分成多个小报文进行发送,也可能将多个报文合并成一个大的报文进行发送。因此就有了拆包和粘包。

26.png

在客户端和服务端通信的过程中,服务端一次读到的数据大小是不确定的。如上图所示,拆包/粘包可能会出现以下五种情况:

  • 服务端恰巧读到了两个完整的数据包 A 和 B,没有出现拆包/粘包问题;
  • 服务端接收到 A 和 B 粘在一起的数据包,服务端需要解析出 A 和 B;
  • 服务端收到完整的 A 和 B 的一部分数据包 B-1,服务端需要解析出完整的 A,并等待读取完整的 B 数据包;
  • 服务端接收到 A 的一部分数据包 A-1,此时需要等待接收到完整的 A 数据包;
  • 数据包 A 较大,服务端需要多次才可以接收完数据包 A。

据接收方很难界定数据包的边界在哪里,很难识别出一个完整的数据包。所以需要提供一种机制来识别数据包的界限,这也是解决拆包/粘包的唯一方法:定义应用层的通信协议。

有 消息长度固定、特定分隔符、消息长度 + 消息内容 等方式:

其中,消息长度 + 消息内容是项目开发中最常用的一种协议,如下图所示:

1
2
3
4
消息头     消息体
+--------+----------+
| Length |  Content |
+--------+----------+

消息头中存放消息的总长度,例如使用 4 字节的 int 值记录消息的长度,消息体实际的二进制的字节数据。接收方在解析数据时,首先读取消息头的长度字段 Length,然后紧接着读取长度为 Len 的字节数据,该数据即判定为一个完整的数据报文。

如何判断 ByteBuf 是否存在完整的报文?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/*
+---------------------------------------------------------------+
| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |
+---------------------------------------------------------------+
| 状态 1byte |        保留字段 4byte     |      数据长度 4byte     | 
+---------------------------------------------------------------+
|                   数据内容 (长度不定)                          |
+---------------------------------------------------------------+
 */
 
 @Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    // 判断 ByteBuf 可读取字节
    if (in.readableBytes() < 14) { 
        return;
    }
    in.markReaderIndex(); // 标记 ByteBuf 读指针位置
    in.skipBytes(2); // 跳过魔数
    in.skipBytes(1); // 跳过协议版本号
    byte serializeType = in.readByte();
    in.skipBytes(1); // 跳过报文类型
    in.skipBytes(1); // 跳过状态字段
    in.skipBytes(4); // 跳过保留字段
    int dataLength = in.readInt();
    if (in.readableBytes() < dataLength) {
        in.resetReaderIndex(); // 重置 ByteBuf 读指针位置
        return;
    }
    byte[] data = new byte[dataLength];
    in.readBytes(data);
    SerializeService serializeService = getSerializeServiceByType(serializeType);
    Object obj = serializeService.deserialize(data);
    if (obj != null) {
        out.add(obj);
    }
}

Netty 的常用解码器

  • 一次编解码器:MessageToByteEncoder/ByteToMessageDecoder
  • 二次编解码器:MessageToMessageEncoder/MessageToMessageDecoder

27.png

八 Netty 的内存管理

Netty 为什么要使用堆外内存?

在 Java 中对象都是在堆内分配的,通常我们说的JVM 内存也就指的堆内内存,堆内内存完全被JVM 虚拟机所管理,JVM 有自己的垃圾回收算法,对于使用者来说不必关心对象的内存如何回收。

28.png

堆外内存和堆内内存各有利弊:

  • 堆内内存由 JVM GC 自动回收内存,降低 Java开发者的难度,但是 GC 是需要时间开销成本的,堆外内存由于不受 JVM 管理,所以在一定程度上可以降低 GC 对应用运行时带来的影响。
  • 堆外内存需要手动释放,这一点跟 C/C++ 很像,稍有不慎就会造成应用程序内存泄漏,当出现内存泄漏问题时排查起来会相对困难。
  • 当进行网络 I/O 操作、文件读写时,堆内内存都需要转换为堆外内存,然后再与底层设备进行交互,所以直接使用堆外内存可以减少一次内存拷贝。(最重要的一点)
  • 堆外内存可以实现进程之间、JVM 多实例之间的数据共享。

为了实现高效的 I/O 操作、缓存常用的对象、不受 JVM 约束,同时降低 JVM GC 压力,降低对业务应用的影响,

Netty 选择使用堆外内存。

Netty 如何实现高性能的内存管理?

// TODO

九 Netty 的使用例子

以一个简单的 HTTP 服务器开始,通过程序示例为你展现 Netty 程序如何配置启动,以及引导器如何与核心组件产生联系。

服务端启动类

Netty 服务端的启动过程大致分为三个步骤:

  • 配置线程池;
  • Channel 初始化;
  • 端口绑定。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class HttpServer {
    public void start(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline()
                                    .addLast("codec", new HttpServerCodec())                  // HTTP 编解码
                                    .addLast("compressor", new HttpContentCompressor())       // HttpContent 压缩
                                    .addLast("aggregator", new HttpObjectAggregator(65536))   // HTTP 消息聚合
                                    .addLast("handler", new HttpServerHandler());             // 自定义业务逻辑处理器
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f = b.bind().sync(); //bind() 方法会真正触发启动,sync() 方法则会阻塞,直至整个启动过程完成
            System.out.println("Http Server started, Listening on " + port);
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        new HttpServer().start(8088);
    }
}

详细的步骤就是:

  1. 配置 EventLoopGroup 线程组
  2. 配置 Channel 的类型
  3. 设置 ServerSocketChannel 对应的 Handler
  4. 设置网络监听的端口
  5. 设置 SocketChannel 对应的 Handler
  6. 配置 Channel 参数

HTTP 客户端类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class HttpClient {
    public void connect(String host, int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new HttpResponseDecoder());
                    ch.pipeline().addLast(new HttpRequestEncoder());
                    ch.pipeline().addLast(new HttpClientHandler());
                }
            });
            ChannelFuture f = b.connect(host, port).sync();
            URI uri = new URI("http://127.0.0.1:8088");
            String content = "hello world";
            DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
                    uri.toASCIIString(), Unpooled.wrappedBuffer(content.getBytes(StandardCharsets.UTF_8)));
            request.headers().set(HttpHeaderNames.HOST, host);
            request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
            f.channel().write(request);
            f.channel().flush();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        HttpClient client = new HttpClient();
        client.connect("127.0.0.1", 8088);
    }
}

十 其他

Netty 是如何解决 JDK epoll 空轮询问题的

JDK Epoll空轮询bug

在 JDK 中, Epoll 的实现是存在漏洞的,即使 Selector 轮询的事件列表为空,NIO 线程一样可以被唤醒,导致 CPU 100% 占用。这就是臭名昭著的 JDK epoll 空轮询的 Bug。Netty 作为一个高性能、高可靠的网络框架,需要保证 I/O 线程的安全性。那么它是如何解决 JDK epoll 空轮询的 Bug 呢?实际上 Netty 并没有从根源上解决该问题,而是巧妙地规避了这个问题。

直接定位到事件轮询 select() 方法中的最后一部分代码,看下 Netty 是如何解决 epoll 空轮询的 Bug。

@see io.netty.channel.nio.NioEventLoop#select

1
2
3
4
5
6
7
8
9
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    selector = selectRebuildSelector(selectCnt);
    selectCnt = 1;
    break;
}

Netty 提供了一种检测机制判断线程是否可能陷入空轮询,如下:

  1. 每次执行 Select 操作之前记录当前时间 currentTimeNanos。
  2. time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos,如果事件轮询的持续时间大于等于 timeoutMillis,那么说明是正常的,否则表明阻塞时间并未达到预期,可能触发了空轮询的 Bug。
  3. Netty 引入了计数变量 selectCnt。在正常情况下,selectCnt 会重置,否则会对 selectCnt 自增计数。当 selectCnt 达到 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) 阈值时,会触发重建 Selector 对象。

Netty 采用这种方法巧妙地规避了 JDK Bug。异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector 上,重建完成之后就废弃掉异常的 Selector 。

Netty 是如何做到零拷贝的

从 Linux 操作系统的角度来说,零拷贝就是为了避免用户态和内核态之间的数据拷贝。无论是传统的数据拷贝还是使用零拷贝技术,其中有 2 次 DMA 的数据拷贝必不可少,只是这 2 次 DMA 拷贝都是依赖硬件来完成,不需要 CPU 参与。所以,在这里我们讨论的零拷贝是个广义的概念,只要能够减少不必要的 CPU 拷贝,都可以被称为零拷贝。

Linux 中的零拷贝技术

模拟一个场景,从文件中读取数据,然后将数据传输到网络上,那么传统的数据拷贝过程会分为哪几个阶段呢?具体如下图所示:

29.png

从数据读取到发送一共经历了四次数据拷贝,具体流程如下:

  1. 当用户进程发起 read() 调用后,上下文从用户态切换至内核态。DMA 引擎从文件中读取数据,并存储到内核态缓冲区,这里是第一次数据拷贝。
  2. 请求的数据从内核态缓冲区拷贝到用户态缓冲区,然后返回给用户进程。第二次数据拷贝的过程同时,会导致上下文从内核态再次切换到用户态。
  3. 用户进程调用 send() 方法期望将数据发送到网络中,此时会触发第三次线程切换,用户态会再次切换到内核态,请求的数据从用户态缓冲区被拷贝到 Socket 缓冲区。
  4. 最终 send() 系统调用结束返回给用户进程,发生了第四次上下文切换。第四次拷贝会异步执行,从 Socket 缓冲区拷贝到协议引擎中。

DMA(Direct Memory Access,直接内存存取),现代大部分硬盘都支持的特性,DMA 接管了数据读写的工作,不需要 CPU 再参与 I/O 中断的处理,从而减轻了 CPU 的负担。

传统的数据拷贝过程为什么不是将数据直接传输到用户缓冲区呢?其实引入内核缓冲区可以充当缓存的作用,这样就可以实现文件数据的预读,提升 I/O 的性能。

而二次和第三次拷贝是可以去除的,DMA 引擎从文件读取数据后放入到内核缓冲区,然后可以直接从内核缓冲区传输到 Socket 缓冲区,从而减少内存拷贝的次数。

在 Linux 中系统调用 sendfile() 可以实现将数据从一个文件描述符传输到另一个文件描述符,从而实现了零拷贝技术。在 Java 中也使用了零拷贝技术,它就是 NIO FileChannel 类中的 transferTo() 方法,transferTo() 底层就依赖了操作系统零拷贝的机制,它可以将数据从 FileChannel 直接传输到另外一个 Channel。

30.png

比较大的一个变化是,DMA 引擎从文件中读取数据拷贝到内核态缓冲区之后,由操作系统直接拷贝到 Socket 缓冲区,不再拷贝到用户态缓冲区,所以数据拷贝的次数从之前的 4 次减少到 3 次。

能否继续减少内核中的数据拷贝次数呢?在 Linux 2.4 版本之后,开发者对 Socket Buffer 追加一些 Descriptor 信息来进一步减少内核数据的复制。如下图所示,DMA 引擎读取文件内容并拷贝到内核缓冲区,然后并没有再拷贝到 Socket 缓冲区,只是将数据的长度以及位置信息被追加到 Socket 缓冲区,然后 DMA 引擎根据这些描述信息,直接从内核缓冲区读取数据并传输到协议引擎中,从而消除最后一次 CPU 拷贝。

31.png

Netty 的零拷贝技术

  • 使用堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝
  • 对 Linux 操作系零拷贝的功能封装,Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝,这属于操作系统级别的零拷贝
  • 通过 Unpooled.wrappedBuffer 可以将 byte 数组包装成 ByteBuf 对象,包装过程中不会产生内存拷贝
  • CompositeByteBuf 类,可以组合多个 Buffer 对象合并成一个逻辑上的对象,避免通过传统内存拷贝的方式将几个 Buffer 合并成一个大的 Buffer

Netty 的对象池复用技术

//TODO

Netty 实现了很多高性能的工具类

比 ThreadLocal 更快的 FastThreadLocal

io.netty.util.concurrent#FastThreadLocal

用于处理延迟任务的时间轮工具 HashedWheelTimer

io.netty.util.HashedWheelTimer

十一 Netty 的现状

Netty 官方提供 3.x、4.x 的稳定版本,之前一直处于测试阶段的 5.x 版本已被作者放弃维护。目前主流推荐 Netty 4.x 的稳定版本,Netty 3.x 到 4.x 版本发生了较大变化,属于不兼容升级:具体可以参考

https://netty.io/wiki/new-and-noteworthy-in-4.0.html

十二 参考资源

  1. 4.1.40.Final-SNAPSHOT
  1. Scalable IO in Java
  1. The Node.js Event Loop
  1. JDK Epoll空轮询bug
  1. 拉钩教育 《Netty 核心原理剖析》