当前位置: 首页 > news >正文

奖励网站源码昆山网站建设哪家便宜

奖励网站源码,昆山网站建设哪家便宜,wordpress网站迁移,网上注册公司在哪个网址注册作者 | 张彦飞allen来源 | 开发内功修炼Netty 是一个在 Java 生态里应用非常广泛的的网络编程工具包#xff0c;它在 2004 年诞生到现在依然是火的一塌糊涂#xff0c;光在 github 上就有 30000 多个项目在用它。所以要想更好地掌握网络编程#xff0c;我想就绕不开 Netty。… 作者 | 张彦飞allen来源 | 开发内功修炼Netty 是一个在 Java 生态里应用非常广泛的的网络编程工具包它在 2004 年诞生到现在依然是火的一塌糊涂光在 github 上就有 30000 多个项目在用它。所以要想更好地掌握网络编程我想就绕不开 Netty。所以今天我们就来分析分析 Netty 内部网络模块的工作原理。一、Netty 用法我们首先找一个 Netty 的例子本篇文章整体都是围绕这个例子来展开叙述的。我们下载 Netty 的源码并在 examples 中找到 echo 这个 demo。同时为了防止代码更新导致对本文叙述的影响我们切到 4.1 分支上来。# git checkout https://github.com/netty/netty.git # git checkout -b 4.1 # cd example/src/main/java/io/netty/example/echo在这个 demo 的 EchoServer 中展示了使用 Netty 写 Server 的经典用法。public final class EchoServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup  new NioEventLoopGroup(1);EventLoopGroup workerGroup  new NioEventLoopGroup();final EchoServerHandler serverHandler  new EchoServerHandler();ServerBootstrap b  new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p  ch.pipeline();if (sslCtx ! null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(serverHandler);}});// Start the server.ChannelFuture f  b.bind(PORT).sync();......} }如果你是一个 Java 新手或者干脆像飞哥一样没用 Netty 写过服务相信上述代码基本是看不懂的。究其根本原因是相比 C/C Java 的封装程度比较高。Java 语言本身的 JVM 中 NIO 对网络的封装就已经屏蔽了很多底层的概念了再加上 Netty 又封装了一层所以 Java 开发者常用的一些术语和概念和其它语言出入很大。比如上面代码中的 Channel、NioEventLoopGroup 等都是其它语言中所没见过的。不过你也不用感到害怕因为这其中的每一个概念都是 socket、进程等底层概念穿了一身不同的衣服而已。接下来我们分别细了解一下这些概念。1.1 NioEventLoopGroup如果你没接触过 Netty可以简单把 NioEventLoopGroup 理解为一个线程池就可以。每一个 NioEventLoopGroup 内部包含一个或者多个 NioEventLoop。其中 NioEventLoop 是对线程、epoll 等概念进行了一个集中的封装。首先EventLoop 本身就是一个线程。为什么这么说我们通过看 NioEventLoop 的继承关系就能看出来。NioEventLoop 继承于 SingleThreadEventLoop而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor。SingleThreadEventExecutor 实现了在 Netty 中对本地线程的抽象。public abstract class SingleThreadEventExecutor extends ... {private volatile Thread thread;private final QueueRunnable taskQueue; }在 SingleThreadEventExecutor 中不但封装了线程对象 Thread而且还配置了一个任务队列 taskQueue用于其它线程向它来放置待处理的任务。1.2 selector另外 NioEventLoopEventLoop 以 selector 的名义封装了 epoll在 Linux 操作系统下。在 NioEventLoop 对象内部会有 selector 成员定义。这其实就是封装的 epoll 而来的。我们来看具体的封装过程。以及 selectedKeys这是从 selector 上发现的待处理的事件列表。public final class NioEventLoop extends SingleThreadEventLoop{// selectorprivate Selector selector;private Selector unwrappedSelector;// selector 上发现的各种待处理事件private SelectedSelectionKeySet selectedKeys; }NioEventLoopGroup 在构造的时候会调用 SelectorProvider#provider 来生成 provider在默认情况下会调用 sun.nio.ch.DefaultSelectorProvider.create 来创建。//file:java/nio/channels/spi/SelectorProvider.java public abstract class SelectorProvider {public static SelectorProvider provider() {// 1. java.nio.channels.spi.SelectorProvider 属性指定实现类// 2. SPI 指定实现类......// 3. 默认实现Windows 和 Linux 下不同provider  sun.nio.ch.DefaultSelectorProvider.create();return provider;} }在 Linux 下默认创建的 provider 使用的就是 epoll。//file:sun/nio/ch/DefaultSelectorProvider.java public class DefaultSelectorProvider {public static SelectorProvider create() {String osname  AccessController.doPrivileged(new GetPropertyAction(os.name));if (osname.equals(Linux))return createProvider(sun.nio.ch.EPollSelectorProvider);}}1.3 ChannelChannel 是 JavaNIO 里的一个概念。大家把它理解成 socket以及在 socket 之上的一系列操作方法的封装就可以了。Java 在 Channel 中把 connect、bind、read、write 等方法都以成员方法的形式给封装起来了。public interface Channel extends ... {Channel read();Channel flush();......interface Unsafe {void bind(SocketAddress localAddress, ...);void connect(SocketAddress remoteAddress, ...);void write(Object msg, ...);......} }另外在 Java 中习惯把 listen socket 叫做父 channel客户端握手请求到达以后创建出来的新连接叫做子 channel方便区分。1.4 Pipeline在每个 Channel 对象的内部除了封装了 socket 以外还都一个特殊的数据结构 DefaultChannelPipeline pipeline。在这个 pipeline 里是各种时机里注册的 handler。Channel 上的读写操作都会走到这个 DefaultChannelPipeline 中当 channel 上完成 register、active、read、readComplete 等操作时会触发 pipeline 中的相应方法。这个 ChannelPipeline 其实就是一个双向链表以及链表上的各式各样的操作方法。public interface ChannelPipeline {ChannelPipeline addFirst(String name, ChannelHandler handler);ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);ChannelPipeline addLast(String name, ChannelHandler handler);ChannelPipeline fireChannelRead(Object msg); }1.5 EchoServer 解读现在我们具备了对 Java、对 Netty 的初步理解以后我们再会后来看一下开篇提到的 EchoServer 源码。public final class EchoServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup  new NioEventLoopGroup(1);EventLoopGroup workerGroup  new NioEventLoopGroup();final EchoServerHandler serverHandler  new EchoServerHandler();ServerBootstrap b  new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p  ch.pipeline();if (sslCtx ! null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(serverHandler);}});// Start the server.ChannelFuture f  b.bind(PORT).sync();......} }在该代码一开头bossGroup new NioEventLoopGroup(1)这一行是创建了一个只有一个线程的线程池。workerGroup new NioEventLoopGroup又创建了 worker 线程池没有指定数量Netty 内部会根据当前机器的 CPU 核数来灵活决定。ServerBootstrap 这是一个脚手架类是为了让我们写起服务器程序来更方便一些。b.group(bossGroup, workerGroup)这一行是将两个线程池传入第一个作为 boss 只处理 accept 接收新的客户端连接请求。第二个参数作为 worker 线程池来处理连接上的请求接收、处理以及结果发送发送。我们注意下 childHandler是传入了一个 ChannelInitializer这是当有新的客户端连接到达时会回调的一个方法。在这个方法内部我们给这个新的 chaneel 的 pipeline 上添加了一个处理器 serverHandler以便收到数据的时候执行该处理器进行请求处理。上面的几个方法都是定义在b.bind方法中真正开始启动服务创建父 channellisten socket创建 boss 线程。当有新连接到达的时候 boss 线程再创建子 channel为其 pipeline 添加处理器并启动 worker 线程来进行处理。二、Netty bootstrap 参数构建简言之 bootstrap.group() .channel() .childHandler() .childOption() 就是在构建 Netty Server 的各种参数。2.1 group 设置ServerBootstrap 和其父类 AbstractBootstrap 内部分别定义了两个 EventLoopGroup group 成员。父类 AbstractBootstrap 的 group 是用来处理 accpet 事件的ServerBootstrap 下的 childGroup 用来处理其它所有的读写等事件。group() 方法就是把 EventLoopGroup 参数设置到自己的成员上完事。其中如果调用 group() 只传入了一个线程池那么将来本服务下的所有事件都由这个线程池来处理。详情查看飞哥精简后的源码。//file:io/netty/bootstrap/ServerBootstrap.java public class ServerBootstrap extends AbstractBootstrapServerBootstrap, ServerChannel {//用来处理非 accept 以外的线程池private volatile EventLoopGroup childGroup;public ServerBootstrap group(EventLoopGroup group) {return group(group, group);}public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);this.childGroup  ObjectUtil.checkNotNull(childGroup, childGroup);return this;} }public abstract class AbstractBootstrap ... {//用来处理 accept 的线程volatile EventLoopGroup group;public B group(EventLoopGroup group) {this.group  group;......} }2.2 channel 设置再看 ServerBootstrap#channel 方法 是用来定义一个工厂方法将来需要创建 channel 的时候都调用该工厂进行创建。//file:io/netty/bootstrap/ServerBootstrap.java public class ServerBootstrap extends AbstractBootstrapServerBootstrap, ServerChannel {public B channel(Class? extends C channelClass) {return channelFactory(new ReflectiveChannelFactoryC(ObjectUtil.checkNotNull(channelClass, channelClass)));} }回头看本文开头 demo.channel(NioServerSocketChannel.class)指的是将来需要创建 channel 的时候创建 NioServerSocketChannel 这个类型的。2.3 option 设置再看 option 方法只是设置到了 options 成员中而已//file:io/netty/bootstrap/ServerBootstrap.java public class ServerBootstrap extends AbstractBootstrapServerBootstrap, ServerChannel {public T B option(ChannelOptionT option, T value) {ObjectUtil.checkNotNull(option, option);synchronized (options) {if (value  null) {options.remove(option);} else {options.put(option, value);}}return self();} }2.4 handler 方法本文 demo 设置了两处 handler一处是 handler另一处是 childHandler。他们都是分别设置到自己的成员上就完事看源码。//file:io/netty/bootstrap/ServerBootstrap.java public class ServerBootstrap extends ...... {public B handler(ChannelHandler handler) {this.handler  ObjectUtil.checkNotNull(handler, handler);return self();}public ServerBootstrap childHandler(ChannelHandler childHandler) {this.childHandler  ObjectUtil.checkNotNull(childHandler, childHandler);return this;} }三、Netty bootstrap 启动服务ServerBootstrap 下的 bind 方法是服务启动过程中非常重要的一个方法。创建父 channellisten socket创建 boss 线程为 boss 线程绑定 Acceptor 处理器调用系统调用 bind 进行绑定和监听都是在这里完成的。先来直接看一下 bind 相关的入口源码。//file:io/netty/bootstrap/ServerBootstrap.java public class ServerBootstrap extends AbstractBootstrap ... {...... }//file:io/netty/bootstrap/AbstractBootstrap.java public abstract class AbstractBootstrap ... {public ChannelFuture bind(SocketAddress localAddress) {validate();return doBind(...);}private ChannelFuture doBind(final SocketAddress localAddress) {//创建父 channel、初始化并且注册final ChannelFuture regFuture  initAndRegister();final Channel channel  regFuture.channel();......//如果 Register 已经完成则直接 doBind0if (regFuture.isDone()) {ChannelPromise promise  channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;//否则就注册一个 listener回调等 register 完成的时候调用 } else {final PendingRegistrationPromise promise  new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {promise.registered();doBind0(regFuture, channel, localAddress, promise);}return promise;}}//创建 channel对其初始化并且 register会创建 parent 线程final ChannelFuture initAndRegister() {//3.1 创建父 channellisten socketchannel  channelFactory.newChannel();//3.2 对父 channellisten socket进行初始化init(channel);//3.3 注册并启动 boss 线程ChannelFuture regFuture  config().group().register(channel);......}//3.4 真正的bindprivate static void doBind0(...) {channel.eventLoop().execute(new Runnable() {Overridepublic void run() {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);......}});} }在这个过程中做了如下几件重要的事情创建父 channellisten socket对父 channellisten socket进行初始化register父 channellisten socket到主 group并启动主进程真正的 bind接下来我们分开来看。3.1 创建父 channellisten socket在 initAndRegister() 方法中创建 channelsocket它调用了 channelFactory.newChannel()。public abstract class AbstractBootstrap//创建 channel对其初始化并且 register会创建 parent 线程final ChannelFuture initAndRegister() {//3.1 创建 listen socketchannel  channelFactory.newChannel();......} }回想下2.2节的channel 方法返回的是一个反射 ReflectiveChannelFactory。没错这里的 newChannel 就是调用这个工厂方法来创建出来一个 NioServerSocketChannel 对象。3.2 对父 channellisten socket进行初始化在 initAndRegister 创建除了 channel 之后需要调用 init 对其进行初始化。public abstract class AbstractBootstrapfinal ChannelFuture initAndRegister() {//3.1 创建父 channellisten socket//3.2 对父 channellisten socket进行初始化init(channel);......} }在 init() 中对 channel 进行初始化一是给 options 和 attrs 赋值二是构建了父 channel 的 pipeline。//file:src/main/java/io/netty/bootstrap/ServerBootstrap.java public class ServerBootstrap extends AbstractBootstrapServerBootstrap, ServerChannel {void init(Channel channel) {//设置 option 和 attrsetChannelOptions(channel, newOptionsArray(), logger);setAttributes(channel, newAttributesArray());//设置 pipelineChannelPipeline p  channel.pipeline();p.addLast(new ChannelInitializerChannel() {......});} }在 setChannelOptions 中对 channel 的各种 option 进行设置。回忆我们在使用 ServerBootstrap 时可以传入 SO_BACKLOG这就是其中的一个 option。在这里会真正设置到 channelsocket上。ServerBootstrap b  new ServerBootstrap(); b.option(ChannelOption.SO_BACKLOG, 100)在 init 中稍微难理解一点是 p.addLast(new ChannelInitializer...)。这一段代码只是给父 channel 添加一个 handler 而已。其真正的执行要等到 register 后我们待会再看。3.3 register 父 channel父 channel 在创建完并且初始化之后需要注册到 boss 线程上才可用。public abstract class AbstractBootstrapfinal ChannelFuture initAndRegister() {//3.1 创建父 channellisten socket//3.2 对父 channellisten socket进行初始化//3.3 注册并启动 boss 线程ChannelFuture regFuture  config().group().register(channel);......} }其中 config().group() 最终会调用到 AbstractBootstrap#group在这个方法里获取的是我们传入进来的 bossGroup。public abstract class AbstractBootstrapvolatile EventLoopGroup group;public final EventLoopGroup group() {return group;} }其中 bossGroup 是一个 NioEventLoopGroup 实例所以代码会进入到 NioEventLoopGroup#register 方法。public class NioEventLoopGroup extends MultithreadEventLoopGroup {}public abstract class MultithreadEventLoopGroup extends ... {Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}Overridepublic EventLoop next() {return (EventLoop) super.next();} }在 NioEventLoopGroup 里包含一个或多个 EventLoop。上面的 next 方法就是从中选择一个出来然后将 channel 注册到其上。对于本文来讲我们使用的是 NioEventLoopGroup其内部包含的自然也就是 NioEventLoop我们继续查找其 register 方法。public final class NioEventLoop extends SingleThreadEventLoop//在 eventloop 里注册一个 channlesocketpublic void register(final SelectableChannel ch, ...) {......register0(ch, interestOps, task);}//最终调用 channel 的 registerprivate void register0(SelectableChannel ch, int interestOps, NioTask? task) {ch.register(unwrappedSelector, interestOps, task);} }可见NioEventLoop 的 register 最后又调用到 channel 的 register 上了。在我们本文中我们创建的 channel 是 NioServerSocketChannel我们就依照这条线索来查。//file:src/main/java/io/netty/channel/AbstractChannel.java public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {public final void register(EventLoop eventLoop, final ChannelPromise promise) {......//关联自己到 eventLoopAbstractChannel.this.eventLoop  eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});}......}} }在 channel 的父类 AbstractChannel 中的 register 中先是把自己关联到传入的 eventLoop 上。接着调用 inEventLoop 来判断线程当前运行的线程是否是 EventExecutor的支撑线程是则返回直接 register0。一般来说服务在启动的时候都是主线程在运行。这个时候很可能 boss 线程还没有启动。所以如果发现当前不是 boss 线程的话就调用 eventLoop.execute 来启动 boss 线程。NioEventLoop 的父类是 SingleThreadEventExecutor, 找到 execute 方法。public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {public void execute(Runnable task) {execute0(task);}private void execute0(Schedule Runnable task) {execute(task, !(task instanceof LazyRunnable)  wakesUpForTask(task));}private void execute(Runnable task, boolean immediate) {boolean inEventLoop  inEventLoop();addTask(task);if (!inEventLoop) {startThread();}if (!addTaskWakesUp  immediate) {wakeup(inEventLoop);}} }我们先来看 addTask(task)它是将 task 添加到任务队列中。等待线程起来以后再运行。public abstract class SingleThreadEventExecutor extends ... {private final QueueRunnable taskQueue;protected void addTask(Runnable task) {(task);}final boolean offerTask(Runnable task) {return taskQueue.offer(task);} }inEventLoop() 是判断当前线程是不是自己绑定的线程这时还在主线程中运行所以 inEventLoop 为假会进入 startThread 开始为 EventLoop 创建线程。public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {private void startThread() {doStartThread();......}private void doStartThread() {executor.execute(new Runnable() {Overridepublic void run() {SingleThreadEventExecutor.this.run();......}}}   }在 doStartThread 中调用 Java 线程管理工具 Executor 来启动 boss 线程。3.4 boss 线程启动当线程起来以后就进入了自己的线程循环中了会遍历自己的任务队列然后开始处理自己的任务。public final class NioEventLoop extends SingleThreadEventLoop {protected void run() {for (;;) {if (!hasTasks()) {strategy  select(curDeadlineNanos);}//如果有任务的话就开始处理runAllTasks(0);//任务处理完毕就调用 epoll_wait 等待事件发生processSelectedKeys();}} }前面我们在 3.3 节看到 eventLoop.execute 把一个 Runnable 任务添加到了任务队列里。当 EventLoop 线程启动后它会遍历自己的任务队列并开始处理。这时会进入到 AbstractChannel#register0 方法开始运行。//file:src/main/java/io/netty/channel/AbstractChannel.java public abstract class AbstractChannel extends ... {public final void register(...) {eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});......}private void register0(ChannelPromise promise) {doRegister();......} }函数 doRegister 是在 AbstractNioChannel 类下。//file:io/netty/channel/nio/AbstractNioChannel.java public abstract class AbstractNioChannel extends AbstractChannel {private final SelectableChannel ch;protected SelectableChannel javaChannel() {return ch;}public NioEventLoop eventLoop() {return (NioEventLoop) super.eventLoop();}protected void doRegister() throws Exception {boolean selected  false;for (;;) {selectionKey  javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            }} }上面最关键的一句是selectionKey javaChannel().register(eventLoop().unwrappedSelector(), 0, this);。这一句就相当于在 C 语言下调用 epoll_ctl 把 listen socket 添加到了 epoll 对象下。其中 javaChannel 获取父 channel相当于 listen socket。unwrappedSelector 获取 selector相当于 epoll 对象。register 相当于使用 epoll_ctl 执行 add 操作。当 channel 注册完后前面 init 时注册的 ChannelInitializer 回调就会被执行。再回头看它的 回调定义。//file:src/main/java/io/netty/bootstrap/ServerBootstrap.java public class ServerBootstrap extends AbstractBootstrapServerBootstrap, ServerChannel {void init(Channel channel) ......p.addLast(new ChannelInitializerChannel() {Overridepublic void initChannel(final Channel ch) {......ch.eventLoop().execute(new Runnable() {Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});} }在 ChannelInitializer#initChannel 里又给 boss 线程的 pipeline 里添加了一个任务。该任务是让其在自己的 pipeline 上注册一个 ServerBootstrapAcceptor handler。将来有新连接到达的时候ServerBootstrapAcceptor 将会被执行。3.5 真正的 bind再看 doBind0 方法调用 channel.bind 完成绑定。private static void doBind0(...) {channel.eventLoop().execute(new Runnable() {Overridepublic void run() {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);......}});}四、新连接到达我们再回到 boss 线程的主循环中。public final class NioEventLoop extends SingleThreadEventLoop {protected void run() {for (;;) {strategy  selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());//任务队列都处理完就开始 selectif (!hasTasks()) {strategy  select(curDeadlineNanos);}//处理各种事件if (strategy  0) {processSelectedKeys();}}  }private int select(long deadlineNanos) throws IOException {if (deadlineNanos  NONE) {return selector.select();}// Timeout will only be 0 if deadline is within 5 microsecslong timeoutMillis  deadlineToDelayNanos(deadlineNanos  995000L) / 1000000L;return timeoutMillis  0 ? selector.selectNow() : selector.select(timeoutMillis);} }假如线程任务队列中的任务都处理干净了的情况下boss 线程会调用 select 来发现其 selector 上的各种事件。相当于 C 语言中的 epoll_wait。当发现有事件发生的时候例如 OP_WRITE、OP_ACCEPT、OP_READ 等的时候会进入相应的处理public final class NioEventLoop extends SingleThreadEventLoop {private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {......if ((readyOps  SelectionKey.OP_WRITE) ! 0) {ch.unsafe().forceFlush();}if ((readyOps  (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! 0 || readyOps  0) {unsafe.read();}} }对于服务端的 Unsafe.read()  这里会执行 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read() 方法它会调用 JDK 底层的 ServerSocketChannel.accept() 接收到客户端的连接后将其封装成 Netty 的 NioSocketChannel再通过 Pipeline 将 ChannelRead 事件传播出去这样 ServerBootstrapAcceptor 就可以在 ChannelRead 回调里处理新的客户端连接了。我们直接看 ServerBootstrapAcceptor#ChannelRead。//file: public class ServerBootstrap extends AbstractBootstrapServerBootstrap, ServerChannel {......private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) {// 获取child channelfinal Channel child  (Channel) msg;// 设置 childHandler 到 child channelchild.pipeline().addLast(childHandler);// 设置 childOptions、 childAttrssetChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);// 将 child channel 注册到 childGroupchildGroup.register(child).addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});}} }在 channelRead 先是获取到了新创建出来的子 channel并为其 pipeline 添加 childHandler。回头看 1.5 节childHandler 是我们自定义的。紧接着调用 childGroup.register(child) 将子 channel 注册到 workerGroup 上。这个 register 过程和 3.3节、3.5节过程一样。区别就是前面是父 channel 注册到 bossGroup 上这里是子 channel 注册到 workerGroup上。在 register 完成后子 channel 被挂到了 workerGroup 其中一个线程上相应的线程如果没有创建也会被创建出来并进入到自己的线程循环中。当子 channel 注册完毕的时候childHandler 中 ChannelInitializer#initChannel 会被执行public final class EchoServer {public static void main(String[] args) throws Exception {...ServerBootstrap b  new ServerBootstrap();b.childHandler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p  ch.pipeline();if (sslCtx ! null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(serverHandler);}});......} }在 initChannel 把子 channel 的处理类 serverHandler 添加上来了。Netty demo 中对这个处理类的定义非常的简单仅仅只是打印出来而已。public class EchoServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(......) {ctx.write(msg);}...... }五、用户请求到达当 worker 线程起来以后会进入线程循环(boss 线程和 worker 线程的 run 函数是一个)。在循环中会遍历自己的任务队列如果没有任务可处理便 select 来观察自己所负责的 channel 上是否有事件发生。public final class NioEventLoop extends SingleThreadEventLoop {protected void run() {for (;;) {if (!hasTasks()) {strategy  select(curDeadlineNanos);}//如果有任务的话就开始处理runAllTasks(0);//任务处理完毕就调用 epoll_wait 等待事件发生processSelectedKeys();}}private int select(long deadlineNanos) throws IOException {selector.selectNow();......    } }worker 线程会调用 select 发现自己所管理的所有子 channel 上的可读可写事件。在发现有可读事件后会调用 processSelectedKeys最后触发 pipeline 使得 EchoServerHandler 方法开始执行。public class EchoServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(......) {ctx.write(msg);}...... }六、总结事实上Netty 对网络封装的比较灵活。既支持单线程 Reactor也支持多线程 Reactor、还支持主从多线程 Reactor。三种模型对应的用法如下public static void main(String[] args) throws Exception {//单线程 ReactorEventLoopGroup eventGroup  new NioEventLoopGroup(1);ServerBootstrap serverBootstrap  new ServerBootstrap(); serverBootstrap.group(eventGroup);    ......//多线程 ReactorEventLoopGroup eventGroup  new NioEventLoopGroup();ServerBootstrap serverBootstrap  new ServerBootstrap(); serverBootstrap.group(eventGroup);    ......//主从多线程 ReactorEventLoopGroup bossGroup  new NioEventLoopGroup(1); EventLoopGroup workerGroup  new NioEventLoopGroup();ServerBootstrap serverBootstrap  new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup);    ...... }为了表述的更全面本文飞哥选择的是最为经典的 主从多线程 Reactor 模式。本文中所描述的内容可以用下面一幅图来表示。在 Netty 中的 boss 线程中负责对父 channellisten socket上事件的监听和处理当有新连接到达的时候选择一个 worker 线程把这个子 channel连接 socket 叫给 worker 线程来处理。其中 Worker 线程就是等待其管理的所有子 channel连接 socket上的事件的监听和处理。当发现有事件发生的时候回调用户设置的 handler 进行处理。在本文的例子中这个用户 handler 就是 EchoServerHandler#channelRead。至此Netty 网络模块的工作核心原理咱们就介绍完了。飞哥一直“鼓吹”内功的好处。只要你具备了坚实的内功各种语言里看似风牛马不相及的东西在底层实际上原理是想通的。我本人从来没用 Java 开发过服务器程序更没碰过 Netty。但是当你多epoll有了深入理解的时候再看Netty也能很容易看懂很快就能理解它的核心。这就是锻炼内功的好处往期推荐Redis 内存优化神技小内存保存大数据使用 nginx 轻松管理 kubernetes 资源文件Redis 内存满了怎么办这样置才正确实战 Kubectl 创建 Deployment 部署应用点分享点收藏点点赞点在看
http://www.zqtcl.cn/news/82641/

相关文章:

  • 中山精品网站建设公司Opt wordpress
  • 深圳网站seo优化排名公司专业做微视频的网站
  • 岳阳网站建设团队织梦个人网站模版
  • 网站建设费应该怎样入账成都最专业做网站的
  • 如何更改网站源码品网站建设
  • 培训通网站建设wordpress 切换域名
  • 手机可以建设网站吗wordpress 程序
  • 外贸网站模板建立wordpress主题显示不完整
  • 互动型网站杭州建设局网站
  • linux系统如何做网站做网做网站建设
  • 保定网站建设设计房网房天下官网
  • 怎么做婚介网站建设网上银行官网
  • 韩国网站后缀潍坊vi设计公司
  • 国内投资咨询网站 html模板档案网站建设的步骤
  • 中小学网站模板源码168电商平台
  • 大气宽屏网站模板企业源码带后台网络品牌传播推广策略
  • 医院网站前置审批文件专业信息门户网站建设
  • 网站跳转是什么意思企业信息系统有哪些类型
  • 缙云 网站建设有名网站建设公司
  • 网站建设调研报告百度账号申诉
  • html5网站正在建设中cocos2d-js可以做网站吗
  • 网站建设定金合同范本哪个电商平台好做
  • 网站产品怎么改顺序做漫画视频在线观看网站
  • 个体户营业执照科研做企业网站吗广告发布网站模板
  • 石家庄规划建设局网站北京展览网站建设
  • 网站建设 招聘需求哪个公司的app开发
  • 建设银行网站每天几点更新什么是域名空间
  • 怎么查询网站开发公司团购网站app制作
  • .net做网站之前设置吕梁做网站公司
  • 百度站长工具官网店招免费设计在线生成