怎么建设小说网站,做网站外包群,搜狗推广效果好吗,梧州龙圩文章目录 PreNetty主从Reactor线程模型服务端channel注册流程源码解读入口 serverBootstrap.bind(port)执行队列中的任务 #xff1a; AbstractUnsafe#register0注册 doRegister() 源码流程图 Pre
Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketCh… 文章目录 PreNetty主从Reactor线程模型服务端channel注册流程源码解读入口 serverBootstrap.bind(port)执行队列中的任务 AbstractUnsafe#register0注册 doRegister() 源码流程图 Pre
Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析 Netty主从Reactor线程模型
Netty 使用主从 Reactor 线程模型来处理并发连接和网络事件。
在 Netty 中通常有两种类型的线程池 Boss 线程池用于接受客户端连接请求并将接受到的连接注册到 Worker 线程池的 EventLoop 中。Boss 线程池中的线程负责监听 ServerSocketChannel并将接受到的连接分配给 Worker 线程池中的某个 EventLoop 处理。 Worker 线程池每个 Worker 线程池包含多个 EventLoop每个 EventLoop 负责处理一组连接的读写和事件处理。当一个连接被注册到某个 Worker 线程池的 EventLoop 中时该 EventLoop 将负责处理这个连接的所有事件包括读取数据、写入数据、处理网络事件等。 主从 Reactor 线程模型的工作流程如下 主线程池Boss 线程池负责监听 ServerSocketChannel 上的连接请求并将接受到的连接请求分配给 Worker 线程池中的某个 EventLoop。 Worker 线程池中的每个 EventLoop 都独立负责一组连接的读写和事件处理。当一个连接被注册到某个 EventLoop 上时该 EventLoop 将会不断地轮询连接上是否有可读事件或可写事件并在事件发生时进行相应的处理。 当有读写事件发生时EventLoop 将调用对应的 ChannelHandler 进行处理。这些 ChannelHandler 可以进行数据解析、业务逻辑处理等操作。 处理完事件后EventLoop 可能会将结果写回到连接中或者关闭连接等。
通过主从 Reactor 线程模型Netty 可以高效地处理大量的并发连接和网络事件提高了网络应用程序的性能和可扩展性。 服务端channel注册流程
在Netty中服务端Channel注册流程涉及以下几个关键步骤 创建ServerBootstrap实例: 首先需要创建一个ServerBootstrap实例它是Netty提供的用于启动服务端的引导类。 配置ServerBootstrap: 使用ServerBootstrap实例设置一系列参数包括线程模型、Channel类型、处理器等。 绑定端口并启动服务: 调用ServerBootstrap的bind方法指定端口并启动服务端。在bind方法内部会进行以下操作 创建NioServerSocketChannel实例用于表示服务端的Channel内部封装了Java NIO中的ServerSocketChannel。 初始化ChannelPipeline为NioServerSocketChannel实例创建一个ChannelPipeline对象用于管理ChannelHandler链。 创建ChannelInitializer并添加到ChannelPipelineChannelInitializer是一个特殊的ChannelHandler它用于在Channel注册到EventLoop之后初始化ChannelPipeline。在ChannelInitializer的initChannel方法中可以向ChannelPipeline中添加自定义的ChannelHandler。 获取EventLoopGroup并注册Channel从ServerBootstrap中获取Boss EventLoopGroup然后调用其register方法注册NioServerSocketChannel到EventLoop上。 注册Channel到EventLoop: 在调用register方法时会将NioServerSocketChannel注册到Boss EventLoop上。在注册过程中会执行以下操作 获取EventLoop根据配置从Boss EventLoopGroup中选择一个EventLoop。 调用EventLoop的register方法将NioServerSocketChannel注册到选定的EventLoop上。注册过程中会创建一个NioServerSocketChannelUnsafe实例来处理注册过程其中会调用底层的Java NIO方法将ServerSocketChannel注册到Selector上并监听ACCEPT事件。 事件处理: 一旦NioServerSocketChannel注册到了EventLoop上就会开始监听ACCEPT事件。当有新的连接接入时会触发ACCEPT事件EventLoop会调用相关的ChannelHandler进行处理如调用ChannelInitializer的initChannel方法添加用户自定义的ChannelHandler到新的连接的ChannelPipeline中。接着新的连接就可以接受和处理客户端的请求了。
通过以上流程服务端Channel在Netty中的注册过程就完成了它可以接受客户端的连接并将连接注册到EventLoop上进行事件处理。 源码解读
当我们梳理完
Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析
接下来让我们从下面这一行代码开始
ChannelFuture channelFuture serverBootstrap.bind(9000).sync();
channelFuture.channel().closeFuture().sync();这段代码用于启动服务端并阻塞当前线程直到服务端关闭。 serverBootstrap.bind(9000)调用serverBootstrap的bind()方法绑定端口9000并返回一个ChannelFuture对象表示绑定操作的异步结果。 .sync()调用sync()方法阻塞当前线程直到绑定操作完成。这样做是为了确保服务端在端口绑定完成后再继续执行后续代码。 channelFuture.channel().closeFuture().sync()获取channelFuture中的channel()然后调用其closeFuture()方法获取一个表示关闭操作的ChannelFuture对象。接着再次调用sync()方法阻塞当前线程直到关闭操作完成。这样做是为了让当前线程一直等待直到服务端关闭。
入口 serverBootstrap.bind(port)
这段代码是bind(int inetPort)方法的实现。
/*** Create a new {link Channel} and bind it.*/
public ChannelFuture bind(int inetPort) {// 调用bind方法传入一个InetSocketAddress对象该对象使用指定的端口号创建return bind(new InetSocketAddress(inetPort));
}创建一个新的Channel并绑定到指定的端口。 doBind(final SocketAddress localAddress) 这段代码是doBind(final SocketAddress localAddress)方法的实现。
private ChannelFuture doBind(final SocketAddress localAddress) {// 初始化并注册Channel并返回一个ChannelFuturefinal ChannelFuture regFuture initAndRegister();// 获取注册完成的Channelfinal Channel channel regFuture.channel();// 如果注册过程中发生异常则直接返回注册的ChannelFutureif (regFuture.cause() ! null) {return regFuture;}if (regFuture.isDone()) {// 如果注册已经完成则创建一个新的ChannelPromise并执行绑定操作ChannelPromise promise channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// 如果注册尚未完成则创建一个PendingRegistrationPromise并添加一个监听器等待注册完成后再执行绑定操作final PendingRegistrationPromise promise new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause future.cause();if (cause ! null) {// 如果注册过程中发生异常则直接设置失败状态promise.setFailure(cause);} else {// 注册成功后执行绑定操作promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}这段代码的作用是执行绑定操作并返回一个与绑定操作相关的ChannelFuture。 initAndRegister() final ChannelFuture initAndRegister() {Channel channel null;try {// 使用channelFactory创建一个新的Channel实例channel channelFactory.newChannel();// 对新创建的Channel进行初始化init(channel);} catch (Throwable t) {if (channel ! null) {// 如果初始化过程中发生异常关闭Channelchannel.unsafe().closeForcibly();// 创建一个新的ChannelPromise并设置失败状态return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// 如果channel为null则创建一个FailedChannel实例并设置失败状态return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 使用ChannelConfig的EventLoopGroup进行注册ChannelFuture regFuture config().group().register(channel);// 如果注册过程中发生异常则关闭Channelif (regFuture.cause() ! null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// 返回注册的ChannelFuturereturn regFuture;
}创建一个新的Channel实例并对其进行初始化然后使用EventLoopGroup将其注册到事件循环中。最后返回一个与注册操作相关的ChannelFuture。 channelFactory.newChannel() channelFactory.newChannel() 中的实现请移步 Netty Review - NioServerSocketChannel源码分析 init(channel) Override
void init(Channel channel) throws Exception {// 设置Channel的选项final MapChannelOption?, Object options options0();synchronized (options) {setChannelOptions(channel, options, logger);}// 设置Channel的属性final MapAttributeKey?, Object attrs attrs0();synchronized (attrs) {for (EntryAttributeKey?, Object e: attrs.entrySet()) {SuppressWarnings(unchecked)AttributeKeyObject key (AttributeKeyObject) e.getKey();channel.attr(key).set(e.getValue());}}// 获取Channel的PipelineChannelPipeline p channel.pipeline();// 复制当前的子组、子处理器、子选项和子属性final EventLoopGroup currentChildGroup childGroup;final ChannelHandler currentChildHandler childHandler;final EntryChannelOption?, Object[] currentChildOptions;final EntryAttributeKey?, Object[] currentChildAttrs;synchronized (childOptions) {currentChildOptions childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs childAttrs.entrySet().toArray(newAttrArray(0));}// 向Channel的Pipeline中添加一个ChannelInitializerp.addLast(new ChannelInitializerChannel() {Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline ch.pipeline();ChannelHandler handler config.handler();if (handler ! null) {// 添加用户配置的处理器到Pipeline中pipeline.addLast(handler);}// 在Channel的事件循环中执行ch.eventLoop().execute(new Runnable() {Overridepublic void run() {// 添加一个ServerBootstrapAcceptor到Pipeline中用于接收新连接pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}
init()方法的作用是初始化Channel设置Channel的选项和属性然后向Channel的Pipeline中添加一个ChannelInitializer该Initializer在Channel的事件循环中执行并向Pipeline中添加一个ServerBootstrapAcceptor用于接收新连接。 config().group().register(channel) Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}next() io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#nextprivate static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {// 使用原子整数来维护索引以确保在多线程环境中安全地获取下一个 EventExecutor 实例。private final AtomicInteger idx new AtomicInteger();// 存储所有可用的 EventExecutor 实例的数组。private final EventExecutor[] executors;// 构造方法接收一个 EventExecutor 实例数组作为参数并将其存储在 executors 成员变量中。PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors executors;}// 选择下一个要使用的 EventExecutor 实例。// 通过对索引进行按位与操作idx.getAndIncrement() executors.length - 1// 来确保索引始终在 executors 数组的有效范围内。// 由于 executors.length 必须是 2 的幂次方因此使用按位与运算可以有效地实现取模操作// 从而将索引限制在数组长度范围内。Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() executors.length - 1];}
}Override
public ChannelFuture register(final ChannelPromise promise) {// 检查传入的 promise 参数是否为 null如果为 null则抛出 NullPointerException 异常。ObjectUtil.checkNotNull(promise, promise);// 获取与该 promise 关联的 Channel 实例通过 unsafe() 方法获取 Channel 的 Unsafe 实例然后调用 register() 方法注册 Channel。// 这里调用的是 unsafe() 方法表示使用一种不安全的方式直接注册 Channel而不经过 EventLoop 的事件循环。// register() 方法将 Channel 注册到当前 EventLoop由 EventLoop 负责管理该 Channel。promise.channel().unsafe().register(this, promise);// 返回传入的 promise 对象即注册 Channel 的异步结果。return promise;
}AbstractUnsafe#register Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 检查 eventLoop 参数是否为 null如果为 null则抛出 NullPointerException 异常。if (eventLoop null) {throw new NullPointerException(eventLoop);}// 检查 Channel 是否已经注册到某个 EventLoop如果已经注册则设置 promise 的失败状态并返回。if (isRegistered()) {promise.setFailure(new IllegalStateException(registered to an event loop already));return;}// 检查传入的 eventLoop 是否与当前 Channel 兼容如果不兼容则设置 promise 的失败状态并返回。if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException(incompatible event loop type: eventLoop.getClass().getName()));return;}// 将当前 Channel 的 eventLoop 属性设置为传入的 eventLoop。AbstractChannel.this.eventLoop eventLoop;// 判断当前线程是否在 eventLoop 的事件循环中如果是则直接调用 register0() 方法进行注册。if (eventLoop.inEventLoop()) {register0(promise);} else {// 如果当前线程不在 eventLoop 的事件循环中则通过 eventLoop.execute() 方法提交一个任务让 eventLoop 执行注册操作。try {eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 如果无法将注册任务提交给 eventLoop 执行则强制关闭 Channel并设置 promise 的失败状态。logger.warn(Force-closing a channel whose registration task was not accepted by an event loop: {},AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 检查 eventLoop 参数是否为 null如果为 null则抛出 NullPointerException 异常。if (eventLoop null) {throw new NullPointerException(eventLoop);}// 检查 Channel 是否已经注册到某个 EventLoop如果已经注册则设置 promise 的失败状态并返回。if (isRegistered()) {promise.setFailure(new IllegalStateException(registered to an event loop already));return;}// 检查传入的 eventLoop 是否与当前 Channel 兼容如果不兼容则设置 promise 的失败状态并返回。if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException(incompatible event loop type: eventLoop.getClass().getName()));return;}// 将当前 Channel 的 eventLoop 属性设置为传入的 eventLoop。AbstractChannel.this.eventLoop eventLoop;// 判断当前线程是否在 eventLoop 的事件循环中如果是则直接调用 register0() 方法进行注册。if (eventLoop.inEventLoop()) {register0(promise);} else {// 如果当前线程不在 eventLoop 的事件循环中则通过 eventLoop.execute() 方法提交一个任务让 eventLoop 执行注册操作。try {eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 如果无法将注册任务提交给 eventLoop 执行则强制关闭 Channel并设置 promise 的失败状态。logger.warn(Force-closing a channel whose registration task was not accepted by an event loop: {},AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}进入到异步这里 try {eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});} catch (Throwable t) {....}eventLoop.execute Override
public void execute(Runnable task) {// 检查任务是否为nullif (task null) {throw new NullPointerException(task);}// 判断当前线程是否在EventLoop的事件循环中boolean inEventLoop inEventLoop();// 将任务添加到任务队列中addTask(task);// 如果当前线程不在EventLoop的事件循环中if (!inEventLoop) {// 启动一个新的线程来执行任务startThread();// 如果EventLoop已经被关闭if (isShutdown()) {boolean reject false;try {// 检查任务是否可以从任务队列中移除if (removeTask(task)) {reject true;}} catch (UnsupportedOperationException e) {// 任务队列不支持移除操作直接忽略异常希望在任务完全终止之前能够拾取到任务// 最坏的情况下在终止时进行记录}// 如果需要拒绝执行该任务if (reject) {reject();}}}// 如果不需要唤醒EventLoop来处理任务if (!addTaskWakesUp wakesUpForTask(task)) {// 如果当前线程不在EventLoop的事件循环中则唤醒EventLoop来处理任务wakeup(inEventLoop);}
}addTask(task); /*** 将任务添加到任务队列中如果实例在关闭之前被关闭则抛出{link RejectedExecutionException}。*/
protected void addTask(Runnable task) {// 检查任务是否为nullif (task null) {throw new NullPointerException(task);}// 如果无法将任务添加到队列中则拒绝执行该任务if (!offerTask(task)) {reject(task);}
}startThread(); private void startThread() {// 如果线程状态为未启动if (state ST_NOT_STARTED) {// 尝试将状态从未启动更改为已启动if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {// 启动线程doStartThread();} catch (Throwable cause) {// 如果启动线程时发生异常则将状态重置为未启动并抛出异常STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}
}doStartThread() private void doStartThread() {// 确保线程为空assert thread null;// 在执行器上执行一个新的 Runnableexecutor.execute(new Runnable() {Overridepublic void run() {// 将当前线程设置为执行线程thread Thread.currentThread();// 如果标记为已中断则中断线程if (interrupted) {thread.interrupt();}boolean success false;// 更新上次执行时间updateLastExecutionTime();try {// 运行单线程事件执行器的主要逻辑SingleThreadEventExecutor.this.run();// 执行成功success true;} catch (Throwable t) {// 捕获并记录执行期间的异常logger.warn(Unexpected exception from an event executor: , t);} finally {// 循环直到能够安全关闭for (;;) {int oldState state;// 尝试将状态更改为正在关闭if (oldState ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// 检查是否在循环结束时调用了 confirmShutdown()if (success gracefulShutdownStartTime 0) {if (logger.isErrorEnabled()) {logger.error(Buggy EventExecutor.class.getSimpleName() implementation; SingleThreadEventExecutor.class.getSimpleName() .confirmShutdown() must be called before run() implementation terminates.);}}try {// 运行所有剩余任务和关闭钩子for (;;) {if (confirmShutdown()) {break;}}} finally {try {// 清理资源cleanup();} finally {// 移除线程上的所有 FastThreadLocals因为线程即将终止并通知未来。// 用户可能会在未来上阻塞一旦解除阻塞JVM 可能会终止并开始卸载类。// 详情请参阅 https://github.com/netty/netty/issues/6596。FastThreadLocal.removeAll();// 设置执行器状态为终止STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);// 释放线程锁threadLock.release();// 如果任务队列不为空则记录警告日志if (!taskQueue.isEmpty()) {if (logger.isWarnEnabled()) {logger.warn(An event executor terminated with non-empty task queue ( taskQueue.size() ));}}// 设置终止未来为成功terminationFuture.setSuccess(null);}}}}});
}SingleThreadEventExecutor.this.run() Override
protected void run() {// for死循环for (;;) {try {try {// 根据选择策略计算下一步的操作switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:// 继续循环不进行任何操作continue;case SelectStrategy.BUSY_WAIT:// 忙等待策略由于 NIO 不支持忙等待因此执行 select 操作// 不会直接执行 select 方法而是会先设置 wakenUp 为 false然后执行 select 方法// 如果 wakenUp 为 true则再次唤醒 Selector// 这样做是为了减少唤醒 Selector 的开销// 但是存在一个竞态条件如果 wakenUp 在 select 之前设置为 true则会导致不必要的唤醒// 因此在 select 之后需要再次判断 wakenUp 是否为 true如果是则再次唤醒 Selectorselect(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// 继续执行后续操作default:}} catch (IOException e) {// 如果在这里收到 IOException则表示 Selector 出现问题需要重建 Selector 并重试rebuildSelector0();handleLoopException(e);continue;}cancelledKeys 0;needsToSelectAgain false;final int ioRatio this.ioRatio;if (ioRatio 100) {// 如果 ioRatio 为 100%则优先处理所有的 IO 事件然后再执行所有任务try {processSelectedKeys();} finally {// 确保始终运行所有任务runAllTasks();}} else {// 如果 ioRatio 不为 100%则按比例处理 IO 事件和任务final long ioStartTime System.nanoTime();try {processSelectedKeys();} finally {// 确保始终运行所有任务final long ioTime System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {// 处理循环中的异常handleLoopException(t);}// 即使循环处理中抛出异常也始终处理关闭try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {// 处理关闭过程中的异常handleLoopException(t);}}
}select(wakenUp.getAndSet(false)) private void select(boolean oldWakenUp) throws IOException {Selector selector this.selector;try {int selectCnt 0;long currentTimeNanos System.nanoTime();long selectDeadLineNanos currentTimeNanos delayNanos(currentTimeNanos);for (;;) {long timeoutMillis (selectDeadLineNanos - currentTimeNanos 500000L) / 1000000L;if (timeoutMillis 0) {// 如果超时时间小于等于 0则立即进行非阻塞的 selectNow 操作if (selectCnt 0) {selector.selectNow();selectCnt 1;}break;}// 如果有任务并且 wakenUp 的值为 true则立即进行非阻塞的 selectNow 操作// 这是为了确保任务在 select 操作之前已经被执行if (hasTasks() wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt 1;break;}// 执行阻塞的 select 操作并记录选择的次数int selectedKeys selector.select(timeoutMillis);selectCnt;// 如果 select 操作返回了结果或者已经被唤醒或者有任务待执行则立即退出循环if (selectedKeys ! 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}// 如果当前线程被中断则重置选择的键并退出循环if (Thread.interrupted()) {selectCnt 1;break;}// 更新当前时间并判断是否需要继续循环long time System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) currentTimeNanos) {selectCnt 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD 0 selectCnt SELECTOR_AUTO_REBUILD_THRESHOLD) {// 如果 select 次数超过阈值则重建 Selectorselector selectRebuildSelector(selectCnt);selectCnt 1;break;}currentTimeNanos time;}// 如果 select 次数超过了预设的阈值则输出警告日志if (selectCnt MIN_PREMATURE_SELECTOR_RETURNS) {logger.debug(Selector.select() returned prematurely {} times in a row for Selector {}.,selectCnt - 1, selector);}} catch (CancelledKeyException e) {// 取消键异常通常是无害的只输出调试日志即可logger.debug(CancelledKeyException.class.getSimpleName() raised by a Selector {} - JDK bug?,selector, e);}
}int selectedKeys selector.select(timeoutMillis); 熟悉的NIO代码。 processSelectedKeys() private void processSelectedKeys() {// 如果使用了优化的 selectedKeys 集合则调用优化过的处理方法if (selectedKeys ! null) {processSelectedKeysOptimized();} else {// 否则调用普通的处理方法并传入 selector.selectedKeys() 作为参数processSelectedKeysPlain(selector.selectedKeys());}
}private void processSelectedKeysOptimized() {// 遍历优化过的 selectedKeys 集合for (int i 0; i selectedKeys.size; i) {// 获取当前索引处的 SelectionKeyfinal SelectionKey k selectedKeys.keys[i];// 将数组中的该元素置为 null以便在 Channel 关闭后可以被垃圾回收selectedKeys.keys[i] null;// 获取 SelectionKey 对应的附件final Object a k.attachment();// 如果附件是 AbstractNioChannel 类型则调用 processSelectedKey 方法处理if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {// 否则认为附件是 NioTask 类型将其转换为 NioTaskSelectableChannel 并调用 processSelectedKey 方法处理SuppressWarnings(unchecked)NioTaskSelectableChannel task (NioTaskSelectableChannel) a;processSelectedKey(k, task);}// 如果需要再次进行 select则重置 selectedKeys 集合并再次进行 selectif (needsToSelectAgain) {// 将数组中的剩余元素置为 null以便在 Channel 关闭后可以被垃圾回收selectedKeys.reset(i 1);// 再次进行 select 操作selectAgain();// 重置索引为 -1使循环从 0 开始i -1;}}
}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {// 获取与 Channel 相关联的 NioUnsafe 对象final AbstractNioChannel.NioUnsafe unsafe ch.unsafe();// 如果 SelectionKey 不再有效则关闭 Channelif (!k.isValid()) {final EventLoop eventLoop;try {eventLoop ch.eventLoop();} catch (Throwable ignored) {// 如果 Channel 实现抛出异常表示没有事件循环我们忽略此异常// 因为我们只想确定 ch 是否注册到此事件循环因此具有关闭 ch 的权限return;}// 只有在 ch 仍然注册到此 EventLoop 时才关闭 ch// 如果 ch 已从事件循环中注销则 SelectionKey 可能作为注销过程的一部分被取消注册// 但是通道仍然健康且不应关闭。// 详见 https://github.com/netty/netty/issues/5125if (eventLoop ! this || eventLoop null) {return;}// 关闭 Channelunsafe.close(unsafe.voidPromise());return;}try {int readyOps k.readyOps();// 如果 OP_CONNECT 位被设置则调用 finishConnect() 完成连接if ((readyOps SelectionKey.OP_CONNECT) ! 0) {// 移除 OP_CONNECT 位否则 Selector.select(..) 将总是立即返回而不阻塞// 详见 https://github.com/netty/netty/issues/924int ops k.interestOps();ops ~SelectionKey.OP_CONNECT;k.interestOps(ops);// 完成连接unsafe.finishConnect();}// 先处理 OP_WRITE因为我们可能能够写入一些排队的缓冲区从而释放内存if ((readyOps SelectionKey.OP_WRITE) ! 0) {// 调用 forceFlush() 方法该方法还会负责在没有剩余可写内容时清除 OP_WRITE 位ch.unsafe().forceFlush();}// 如果 OP_READ 或 OP_ACCEPT 被设置或者 readyOps 为 0则进行读取操作if ((readyOps (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! 0 || readyOps 0) {unsafe.read();}} catch (CancelledKeyException ignored) {// 如果出现 CancelledKeyException 异常则关闭 Channelunsafe.close(unsafe.voidPromise());}
}runAllTasks() /*** 从任务队列中获取并运行所有任务。* * return 如果至少运行了一个任务则返回 true*/
protected boolean runAllTasks() {// 断言当前线程在事件循环中assert inEventLoop();boolean fetchedAll;boolean ranAtLeastOne false;do {// 从计划任务队列中获取任务fetchedAll fetchFromScheduledTaskQueue();// 运行任务队列中的所有任务if (runAllTasksFrom(taskQueue)) {ranAtLeastOne true;}} while (!fetchedAll); // 继续处理直到获取所有计划任务。// 如果至少运行了一个任务则更新最后执行时间if (ranAtLeastOne) {lastExecutionTime ScheduledFutureTask.nanoTime();}// 在运行完所有任务后执行的操作afterRunningAllTasks();return ranAtLeastOne;
}/*** 从传入的任务队列中运行所有任务。** param taskQueue 要轮询和执行所有任务的任务队列。** return 如果至少执行了一个任务则返回 true。*/
protected final boolean runAllTasksFrom(QueueRunnable taskQueue) {// 从任务队列中轮询出一个任务Runnable task pollTaskFrom(taskQueue);if (task null) {return false;}for (;;) {// 执行任务safeExecute(task);// 继续轮询下一个任务task pollTaskFrom(taskQueue);if (task null) {return true;}}
}/*** 从任务队列中轮询出一个任务。** param taskQueue 要轮询的任务队列。** return 任务队列中的下一个任务如果没有任务则返回 null。*/
protected static Runnable pollTaskFrom(QueueRunnable taskQueue) {for (;;) {// 从任务队列中轮询出一个任务Runnable task taskQueue.poll();// 如果轮询出来的任务是 WAKEUP_TASK则继续轮询下一个任务if (task WAKEUP_TASK) {continue;}return task;}
}取出来的任务就是 执行队列中的任务 AbstractUnsafe#register0
private void register0(ChannelPromise promise) {try {// 检查通道是否仍然打开因为在注册调用之外的时间内通道可能已关闭// 设置 ChannelPromise 为不可取消状态以确保注册成功后无法取消// 并且检查通道是否仍然打开if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}// 判断是否是第一次注册boolean firstRegistration neverRegistered;// 执行具体的注册逻辑doRegister();// 标记通道已经注册过neverRegistered false;registered true;// 在实际通知 promise 之前确保先调用 handlerAdded(...) 方法。这是必要的因为用户可能已经通过管道触发了事件。pipeline.invokeHandlerAddedIfNeeded();// 设置注册成功并通知 ChannelPromisesafeSetSuccess(promise);// 触发 ChannelRegistered 事件通知管道上下文pipeline.fireChannelRegistered();// 只有在通道尚未激活并且已经注册过一次时才触发 channelActive 事件避免重复触发if (isActive()) {if (firstRegistration) {// 如果是第一次注册则触发 channelActive 事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {// 如果通道已经注册过且设置了自动读取则重新开始读取以便处理传入数据beginRead();}}} catch (Throwable t) {// 发生异常时直接关闭通道以避免 FD 泄漏closeForcibly();closeFuture.setClosed();// 设置注册失败并通知 ChannelPromisesafeSetFailure(promise, t);}
}注册 doRegister() doRegister(); Override
protected void doRegister() throws Exception {boolean selected false;for (;;) {try {// 尝试将通道注册到 EventLoop 的 Selector 上关注的事件为 0表示不关注任何事件selectionKey javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// 强制 Selector 立即执行 select 操作因为 canceled SelectionKey 可能仍然被缓存尚未移除因为尚未调用 Select.select(..) 操作。eventLoop().selectNow();selected true;} else {// 我们在之前已经强制执行了一次选择操作但是 SelectionKey 仍然因为某种原因被缓存了可能是 JDK 的 bugthrow e;}}}
}Override
protected void doRegister() throws Exception {boolean selected false;for (;;) {try {// 尝试将通道注册到 EventLoop 的 Selector 上关注的事件为 0表示不关注任何事件selectionKey javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// 强制 Selector 立即执行 select 操作因为 canceled SelectionKey 可能仍然被缓存尚未移除因为尚未调用 Select.select(..) 操作。eventLoop().selectNow();selected true;} else {// 我们在之前已经强制执行了一次选择操作但是 SelectionKey 仍然因为某种原因被缓存了可能是 JDK 的 bugthrow e;}}}
}熟悉的NIO代码
javaChannel().register(eventLoop().unwrappedSelector(), 0, this);pipeline.invokeHandlerAddedIfNeeded(); pipeline.fireChannelRegistered(); pipeline.fireChannelActive(); 源码流程图 图都给你画好了戳这里