四. 优化与源码

1. 优化

1.1 扩展序列化算法

序列化、反序列化主要用在消息正文的转换上：

- 序列化时，需要将 Java 对象变为要传输的数据（可以是 byte[] 或 json 等，最终都需要变成 byte[]）
- 反序列化时，需要将传入的正文数据还原成 Java 对象，便于处理

jdk序列化与反序列化

目前的代码仅支持 Java 自带的序列化、反序列化机制，核心代码如下：

// 反序列化：将 字节数组 转换为 java对象, jdk反序列化不需要提供类型信息
byte[] body = new byte[bodyLength];
byteBuf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// 序列化：将 java对象 转换为 字节数组
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

Serializer Algorithm

为了支持更多序列化算法，抽象一个 Serializer 接口，并提供两个实现。我这里直接将实现加入了枚举类 Serializer.Algorithm 中（此 Algorithm 枚举类写在 Serializer 接口中，作为 Serializer 接口的成员内部类）：

public interface Serializer {
    // 反序列化方法
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    // 序列化方法
    <T> byte[] serialize(T object);

    enum Algorithm implements Serializer {
        // 枚举类特点
        // 1. 如下枚举类对象因为实现了Serializer接口, 因此, 须实现serialize方法和deserialize方法
        // 2. 枚举类中定义的枚举对象有特定的顺序, 枚举类对象调用ordinal()方法可获取到枚举类对象的顺序值, 顺序值从0开始
        // 3. 调用 枚举类.valueOf(枚举类对象字符串形式), 可获取到对应的枚举类对象, 如 Serializer.Algorithm.valueOf("Java")
        // 4.
调用 枚举类.values(), 可获取到所有的枚举类对象的数组// Java 实现Java {Overridepublic T T deserialize(ClassT clazz, byte[] bytes) {try {ObjectInputStream in new ObjectInputStream(new ByteArrayInputStream(bytes));Object object in.readObject();return (T) object;} catch (IOException | ClassNotFoundException e) {throw new RuntimeException(SerializerAlgorithm.Java 反序列化错误, e);}}Overridepublic T byte[] serialize(T object) {try {ByteArrayOutputStream out new ByteArrayOutputStream();new ObjectOutputStream(out).writeObject(object);return out.toByteArray();} catch (IOException e) {throw new RuntimeException(SerializerAlgorithm.Java 序列化错误, e);}}}, // Json 实现(引入了 Gson 依赖)Json {Overridepublic T T deserialize(ClassT clazz, byte[] bytes) {// Gson反序列化, 需要提供具体的反序列化类return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);}Overridepublic T byte[] serialize(T object) {// 指定json字符串转为字节数组时使用Gson, 所使用的编码集return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);}};}}Config 增加配置类和配置文件将序列化算法配置到类路径下的application.properties文件中 public abstract class Config {static Properties properties;static {/* 加载类路径下的application.properties文件 */try (InputStream in Config.class.getResourceAsStream(/application.properties)) {properties new Properties();properties.load(in);} catch (IOException e) {throw new ExceptionInInitializerError(e);}}// 读取配置文件中 server.portpublic static int getServerPort() {String value properties.getProperty(server.port);if(value null) {return 8080;} else {return Integer.parseInt(value);}}// 读取配置文件中 serializer.algorithmpublic static Serializer.Algorithm getSerializerAlgorithm() {String value properties.getProperty(serializer.algorithm);if(value null) {// 默认使用jdk序列化return Serializer.Algorithm.Java;} else {// 传入枚举类字面形式字符串作为参数, 以返回对应的枚举类对象, 可配置为 Java 或 Gsonreturn Serializer.Algorithm.valueOf(value);}} }application.properties 配置文件 # 配置序列化算法, 配置的值就是枚举类对象的字面形式字符串 serializer.algorithmJsonMessageCodecSharable 修改编解码器 /*** 必须和 LengthFieldBasedFrameDecoder 一起使用确保接到的 ByteBuf 消息是完整的*/ public class MessageCodecSharable extends MessageToMessageCodecByteBuf, Message {Overridepublic void encode(ChannelHandlerContext ctx, Message msg, ListObject outList) throws Exception {ByteBuf out ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1//获取配置文件中的序列化对象 的 索引out.writeByte(Config.getSerializerAlgorithm().ordinal());// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组//使用配置文件指定的算法 将 java对象 转换为 字节数组byte[] bytes Config.getSerializerAlgorithm().serialize(msg);// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}Overrideprotected void decode(ChannelHandlerContext ctx,ByteBuf in, ListObject out) throws Exception {int magicNum in.readInt();byte version in.readByte();// 获取待解码的数据中 指定的序列化算法 标识byte serializerAlgorithm in.readByte(); // 0 或 1byte messageType in.readByte(); // 0,1,2...int sequenceId in.readInt();in.readByte();int length in.readInt();// 这个长度是通过解析ByteBuf字节数据消息来的byte[] bytes new byte[length];in.readBytes(bytes, 0, length);// 确定具体消息类型Class? 
extends Message messageClass Message.getMessageClass(messageType);// 获取待解码的数据反序列化算法枚举类的顺序, 从0开始Serializer.Algorithm algorithm Serializer.Algorithm.values()[serializerAlgorithm];// 使用指定的反序列化算法Message message algorithm.deserialize(messageClass, bytes);out.add(message);} }Message抽象类 其中确定具体消息类型可以根据 消息类型字节 获取到对应的 消息 class 下面就是为了让Gson能够根据消息类型 获取到 具体的Message实现类 ​ 从而 让Gson能够把Json字符串编码得到的字节数组转为 具体的Message对象 ​ 而不能简单的指定Message这个抽象类作为Gson的反序列化目标类 Data public abstract class Message implements Serializable {/*** 根据消息类型字节获得对应的消息 class* param messageType 消息类型字节* return 消息 class*/public static Class? extends Message getMessageClass(int messageType) {return messageClasses.get(messageType);}private int sequenceId;private int messageType;public abstract int getMessageType();public static final int LoginRequestMessage 0;public static final int LoginResponseMessage 1;public static final int ChatRequestMessage 2;public static final int ChatResponseMessage 3;public static final int GroupCreateRequestMessage 4;public static final int GroupCreateResponseMessage 5;public static final int GroupJoinRequestMessage 6;public static final int GroupJoinResponseMessage 7;public static final int GroupQuitRequestMessage 8;public static final int GroupQuitResponseMessage 9;public static final int GroupChatRequestMessage 10;public static final int GroupChatResponseMessage 11;public static final int GroupMembersRequestMessage 12;public static final int GroupMembersResponseMessage 13;public static final int PingMessage 14;public static final int PongMessage 15;private static final MapInteger, Class? extends Message messageClasses new HashMap();static {messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);} }测试 序列化测试 将application.properties的序列化算法分别配置为Java、Json public class TestSerializer {public static void main(String[] args) {MessageCodecSharable CODEC new MessageCodecSharable();LoggingHandler LOGGING new LoggingHandler();/* 在自定义编解码器的前后, 各配置1个日志处理器 */EmbeddedChannel channel new EmbeddedChannel(LOGGING, CODEC, LOGGING);LoginRequestMessage message new LoginRequestMessage(zhangsan, 123);/* 写 1个 出站消息, 将会经过 编码处理 */channel.writeOutbound(message);}}从下面的日志输出可以看到使用Json的方式所传输的数据更小并且易读 使用java序列化日志输出 18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED 18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED 18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE 18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, 
L:embedded - R:embedded] ACTIVE 18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: LoginRequestMessage(superMessage(sequenceId0, messageType0), usernamezhangsan, password123) 18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 224B-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f | ------------------------------------------------------------------------- |00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 d0 |................| |00000010| ac ed 00 05 73 72 00 2a 63 6f 6d 2e 7a 7a 68 75 |....sr.*com.zzhu| |00000020| 61 2e 63 68 61 74 2e 6d 65 73 73 61 67 65 2e 4c |a.chat.message.L| |00000030| 6f 67 69 6e 52 65 71 75 65 73 74 4d 65 73 73 61 |oginRequestMessa| |00000040| 67 65 62 ad c8 8d f0 30 e5 ae 02 00 02 4c 00 08 |geb....0.....L..| |00000050| 70 61 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 |passwordt..Ljava| |00000060| 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 |/lang/String;L..| |00000070| 75 73 65 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 |usernameq.~..xr.| |00000080| 1e 63 6f 6d 2e 7a 7a 68 75 61 2e 63 68 61 74 2e |.com.zzhua.chat.| |00000090| 6d 65 73 73 61 67 65 2e 4d 65 73 73 61 67 65 c3 |message.Message.| |000000a0| fd 8f 9c bb 24 c3 cd 02 00 02 49 00 0b 6d 65 73 |....$.....I..mes| |000000b0| 73 61 67 65 54 79 70 65 49 00 0a 73 65 71 75 65 |sageTypeI..seque| |000000c0| 6e 63 65 49 64 78 70 00 00 00 00 00 00 00 00 74 |nceIdxp........t| |000000d0| 00 03 31 32 33 74 00 08 7a 68 61 6e 67 73 61 6e |..123t..zhangsan| ------------------------------------------------------------------------- 18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH 18:10:29 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH使用Json序列化日志输出 18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED 18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED 18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE 18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE 18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: LoginRequestMessage(superMessage(sequenceId0, messageType0), usernamezhangsan, password123) 18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 87B-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f | ------------------------------------------------------------------------- |00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 47 |...............G| |00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 7a 68 61 |{username:zha| |00000020| 6e 67 73 61 6e 22 2c 22 70 61 73 73 77 6f 72 64 |ngsan,password| |00000030| 22 3a 22 31 32 33 22 2c 22 73 65 71 75 65 6e 63 |:123,sequenc| |00000040| 65 49 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 |eId:0,messageT| |00000050| 79 70 65 22 3a 30 7d |ype:0} | ------------------------------------------------------------------------- 18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH 18:10:47 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH反序列化测试 public class TestSerializer {public static void main(String[] args) {MessageCodecSharable CODEC new MessageCodecSharable();LoggingHandler LOGGING new 
LoggingHandler();/* 在自定义编解码器的前后, 各配置1个日志处理器 */EmbeddedChannel channel new EmbeddedChannel(LOGGING, CODEC, LOGGING);LoginRequestMessage message new LoginRequestMessage(zhangsan, 123);/* 先使用自定义的方法, 把消息写到ByteBuf中 */ByteBuf buf messageToByteBuf(message);/* 写入1个入站消息, 将会经过解码处理 */channel.writeInbound(buf);}public static ByteBuf messageToByteBuf(Message msg) {int algorithm Config.getSerializerAlgorithm().ordinal();ByteBuf out ByteBufAllocator.DEFAULT.buffer();out.writeBytes(new byte[]{1, 2, 3, 4});out.writeByte(1);out.writeByte(algorithm);out.writeByte(msg.getMessageType());out.writeInt(msg.getSequenceId());out.writeByte(0xff);byte[] bytes Serializer.Algorithm.values()[algorithm].serialize(msg);out.writeInt(bytes.length);out.writeBytes(bytes);return out;} } java反序列化日志输出 18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED 18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED 18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE 18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE 18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 224B-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f | ------------------------------------------------------------------------- |00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 d0 |................| |00000010| ac ed 00 05 73 72 00 2a 63 6f 6d 2e 7a 7a 68 75 |....sr.*com.zzhu| |00000020| 61 2e 63 68 61 74 2e 6d 65 73 73 61 67 65 2e 4c |a.chat.message.L| |00000030| 6f 67 69 6e 52 65 71 75 65 73 74 4d 65 73 73 61 |oginRequestMessa| |00000040| 67 65 62 ad c8 8d f0 30 e5 ae 02 00 02 4c 00 08 |geb....0.....L..| |00000050| 70 61 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 |passwordt..Ljava| |00000060| 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 |/lang/String;L..| |00000070| 75 73 65 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 |usernameq.~..xr.| |00000080| 1e 63 6f 6d 2e 7a 7a 68 75 61 2e 63 68 61 74 2e |.com.zzhua.chat.| |00000090| 6d 65 73 73 61 67 65 2e 4d 65 73 73 61 67 65 c3 |message.Message.| |000000a0| fd 8f 9c bb 24 c3 cd 02 00 02 49 00 0b 6d 65 73 |....$.....I..mes| |000000b0| 73 61 67 65 54 79 70 65 49 00 0a 73 65 71 75 65 |sageTypeI..seque| |000000c0| 6e 63 65 49 64 78 70 00 00 00 00 00 00 00 00 74 |nceIdxp........t| |000000d0| 00 03 31 32 33 74 00 08 7a 68 61 6e 67 73 61 6e |..123t..zhangsan| ------------------------------------------------------------------------- 18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: LoginRequestMessage(superMessage(sequenceId0, messageType0), usernamezhangsan, password123) 18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE 18:13:24 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETEjson反序列化日志输出 18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED 18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED 18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE 18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE 18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 
87B-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f | ------------------------------------------------------------------------- |00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 47 |...............G| |00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 7a 68 61 |{username:zha| |00000020| 6e 67 73 61 6e 22 2c 22 70 61 73 73 77 6f 72 64 |ngsan,password| |00000030| 22 3a 22 31 32 33 22 2c 22 73 65 71 75 65 6e 63 |:123,sequenc| |00000040| 65 49 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 |eId:0,messageT| |00000050| 79 70 65 22 3a 30 7d |ype:0} | ------------------------------------------------------------------------- 18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: LoginRequestMessage(superMessage(sequenceId0, messageType0), usernamezhangsan, password123) 18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE 18:12:58 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE1.2 参数调优 1CONNECT_TIMEOUT_MILLIS 属于 SocketChannal 参数 用在客户端建立连接时如果在指定毫秒内无法连接会抛出 timeout 异常 SO_TIMEOUT 主要用在阻塞 IO即BIO阻塞 IO 中 acceptread 等都是无限等待的如果不希望永远阻塞使用它调整超时时间这就是SO_TIMEOUT与CONNECT_TIMEOUT_MILLIS的区别 Slf4j public class TestConnectionTimeout {public static void main(String[] args) {// 1. 客户端通过 new Bootstrap().option() 方法配置参数 给 SocketChannel 配置参数// 2. 服务器端// new ServerBootstrap().option() // 是给 ServerSocketChannel 配置参数// new ServerBootstrap().childOption() // 是给 SocketChannel 配置参数NioEventLoopGroup group new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000).channel(NioSocketChannel.class).handler(new LoggingHandler());// 此处仅发出建立连接的命令, 而不会等建立连接后才返回,ChannelFuture future bootstrap.connect(127.0.0.1, 8080); // 调用future.sync()可以等到建立连接后停止阻塞, 程序才会向下运行//可使用idea的右键该future变量选择markObject标记此future变量future.sync().channel().closeFuture().sync(); // 断点1注意断点的模式选择Thread} catch (Exception e) {e.printStackTrace();log.debug(timeout);} finally {group.shutdownGracefully();}} }另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect Override public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {// ...// Schedule connect timeout.int connectTimeoutMillis config().getConnectTimeoutMillis();if (connectTimeoutMillis 0) {// eventLoop开启了1个定时任务connectTimeoutFuture eventLoop().schedule(new Runnable() {Overridepublic void run() { ChannelPromise connectPromise AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause new ConnectTimeoutException(connection timed out: remoteAddress); // 断点2// Promise用于在2个线程间交换数据, 可以通知另外1个线程是否成功或失败//通过idea调试工具, 发现此处的connectPromise正好就是上面标记的future//此处通过Promise传入异常, 唤醒主线程, 通知主线程失败, 主线程的sync就停止阻塞了, // 但是主线程的future.sync()调用等来的是个异常, 所以不会调用后面的channel()方法, // 而是直接抛出了异常, 进入到了catch块if (connectPromise ! null connectPromise.tryFailure(cause)) {close(voidPromise());}}}, connectTimeoutMillis, TimeUnit.MILLISECONDS);}// ... 
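// ------------------------------------------------------------------
// 【补充示例，非 Netty 源码，自行添加的演示】上面 connectPromise 的用法，本质是
// Promise 在两个线程之间传递结果/异常：一个线程 await()/sync() 等待，另一个线程
// （这里是 eventLoop 的定时任务）调用 tryFailure()/setSuccess() 填充结果并唤醒等待方。
// 下面是一个可独立运行的最小示意（类结构、1 秒超时等均为假设的演示值）：
//
//     NioEventLoopGroup group = new NioEventLoopGroup();
//     Promise<Object> promise = new DefaultPromise<>(group.next());
//     // 模拟源码中的定时任务：到期后以异常结束 promise
//     group.next().schedule(() -> promise.tryFailure(
//             new ConnectTimeoutException("connection timed out")), 1, TimeUnit.SECONDS);
//     promise.await();                      // 相当于主线程的 future.sync()/await()
//     System.out.println(promise.cause());  // 超时后这里能拿到异常
//     group.shutdownGracefully();
// ------------------------------------------------------------------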
}

2）SO_BACKLOG

属于 ServerSocketChannel 参数。

三次握手与accept

时序图（文字版）：参与方为 client、server、syns queue、accept queue。server 端先 bind()、listen()；client 发起 connect()，发送 1. SYN 并进入 SYN_SEND，server 收到后 put 入 syns queue、进入 SYN_RCVD；server 回复 2. SYN ACK，client 进入 ESTABLISHED；client 再回复 3.
ACK put ESTABLISHED accept() client server syns queue accept queue 第一次握手client 发送 SYN 到 server状态修改为 SYN_SENDserver 收到状态改变为 SYN_REVD并将该请求放入 sync queue 队列第二次握手server 回复 SYN ACK 给 clientclient 收到状态改变为 ESTABLISHED并发送 ACK 给 server第三次握手server 收到 ACK状态改变为 ESTABLISHED将该请求从 sync queue 放入 accept queue 其中 3次握手是发生在在accept前面的为什么要放入1个全连接队列呢因为accept接受连接的能力是有限的放入队列就是为了接受完1个连接之后再从队列里面取出来接受下一个连接这个accept就可以理解为nio中的accept接受连接的方法。 在 linux 2.2 之前backlog 大小包括了两个队列的大小在 2.2 之后分别用下面两个参数来控制sync queue - 半连接队列 半连接队列就是还没有完成3次握手的连接信息先放入半连接队列大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定在 syncookies 启用的情况下逻辑上没有最大值限制这个设置便被忽略 accept queue - 全连接队列 已经完成3次握手的连接信息放入全连接队列其大小通过 /proc/sys/net/core/somaxconn 指定在使用 listen 函数时内核会根据传入的 backlog 参数与系统参数取二者的较小值如果 accpet queue 队列满了再有一个客户端再想连server 将发送一个拒绝连接的错误信息到 client netty 中可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小 可以通过下面源码查看默认大小 public class DefaultServerSocketChannelConfig extends DefaultChannelConfigimplements ServerSocketChannelConfig {private volatile int backlog NetUtil.SOMAXCONN;// ... }课堂调试关键断点为io.netty.channel.nio.NioEventLoop#processSelectedKey 课堂测试 为了测试SO_BACKLOG参数的作用使用ServerBootstrap().option(…)设置SocketChannel的参数为2取系统的参数与应用参数的最小值这里肯定就是2了2比较小然后在NioEventLoop#processSelectedKey中打上debug并且开3个客户端与服务端建立连接发现第三个客户端就报错了拒绝连接 TestBacklogServer public class TestBacklogServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).option(ChannelOption.SO_BACKLOG, 2) // 全队列满了.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());}}).bind(8080);} }TestBacklogClient Slf4j public class TestBacklogClient {public static void main(String[] args) {NioEventLoopGroup worker new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());}});ChannelFuture channelFuture bootstrap.connect(127.0.0.1, 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(client error, e);} finally {worker.shutdownGracefully();}} }debug的位置 public final class NioEventLoop extends SingleThreadEventLoop {// ...private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {// ...// 在此处打上debug使用Thread模式, 这里当触发了accept事件, 下面的read()方法就相当于accept接受连接// 这里打上debug之后可短请求建立连接的过程就都卡在 全连接队列里面了if ((readyOps (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! 0 || readyOps 0) {unsafe.read();}// ...}// ... 
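// ------------------------------------------------------------------
// 【补充示例，自行添加，非课堂代码】上面实验的关键是 backlog（全连接队列容量）：
// Netty 的默认值取自 NetUtil.SOMAXCONN，配置 option(ChannelOption.SO_BACKLOG, n) 后，
// 最终会在 NioServerSocketChannel#doBind 中作为 javaChannel().bind(addr, backlog)
// 的第二个参数传给 JDK（Linux 上实际生效值还会与 /proc/sys/net/core/somaxconn 取较小者）。
// 可用几行独立代码直观验证 JDK 层面的含义（端口 8080、backlog=2 均为演示假设）：
//
//     System.out.println(NetUtil.SOMAXCONN);     // Netty 读到的系统默认 backlog
//     ServerSocketChannel ssc = ServerSocketChannel.open();
//     ssc.bind(new InetSocketAddress(8080), 2);  // 第二个参数即 backlog
//     // 不调用 accept() 时，超过队列容量的后续连接将无法完成建立
// ------------------------------------------------------------------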
}报错 当连接第三个客户端时客户端报错如下 21:28:41 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xc42b2531] REGISTERED 21:28:41 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xc42b2531] CONNECT: /127.0.0.1:8080 21:28:43 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xc42b2531] CLOSE 21:28:43 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xc42b2531] UNREGISTERED Exception in thread main io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080 Caused by: java.net.ConnectException: Connection refused: no further informationat sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:336)at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)at java.lang.Thread.run(Thread.java:748)笔记测试 oio 中更容易说明不用 debug 模式 public class Server {public static void main(String[] args) throws IOException {ServerSocket ss new ServerSocket(8888, 2);Socket accept ss.accept();System.out.println(accept);System.in.read();} }客户端启动 4 个 public class Client {public static void main(String[] args) throws IOException {try {Socket s new Socket();System.out.println(new Date() connecting...);s.connect(new InetSocketAddress(localhost, 8888),1000);System.out.println(new Date() connected...);s.getOutputStream().write(1);System.in.read();} catch (IOException e) {System.out.println(new Date() connecting timeout...);e.printStackTrace();}} }第 123 个客户端都打印但除了第一个处于 accpet 外其它两个都处于 accept queue 中 Tue Apr 21 20:30:28 CST 2020 connecting... Tue Apr 21 20:30:28 CST 2020 connected...第 4 个客户端连接时 Tue Apr 21 20:53:58 CST 2020 connecting... Tue Apr 21 20:53:59 CST 2020 connecting timeout... java.net.SocketTimeoutException: connect timed out3ulimit -n 属于操作系统参数它是限制一个进程能够同时打开的最大的文件描述符的数量是为了保护系统防止1个进程打开文件或socket太多了 4TCP_NODELAY 属于 SocketChannal 参数在服务端使用childOption(…)来设置 Nagle算法会当发送的数据包攒够了一批了才会把数据包发送出去所以可能造成接收方不能及时的收到消息。netty中默认该设置为false即开启了Nagle算法建议为了保证及时设置为true 5SO_SNDBUF SO_RCVBUF SO_SNDBUF 是发送缓冲区SO_RCVBUF是接收缓冲区它两决定了滑动窗口的上限建议不要调整操作系统会自动调整 SO_SNDBUF 属于 SocketChannal 参数SO_RCVBUF 既可用于 SocketChannal 参数也可以用于 ServerSocketChannal 参数建议设置到 ServerSocketChannal 上 6ALLOCATOR 属于 SocketChannal 参数用来分配 ByteBuf ctx.alloc() 追踪配置参数过程由ChannelConfig 找到默认的实现类 DefaultChannelConfig在DefaultChannelConfig的成员变量ByteBufAllocator allocator中默认赋值为ByteBufAllocator.DEFAULT从而找到ByteBufUtil.DEFAULT_ALLOCATOR ByteBufUtil public final class ByteBufUtil {static final int WRITE_CHUNK_SIZE 8192;static final ByteBufAllocator DEFAULT_ALLOCATOR;// ...static {// 系统属性配置: io.netty.allocator.typeString allocType SystemPropertyUtil.get(io.netty.allocator.type, PlatformDependent.isAndroid() ? 
unpooled : pooled);allocType allocType.toLowerCase(Locale.US).trim();ByteBufAllocator alloc;if (unpooled.equals(allocType)) {// 从这里点击DEFAULT进去可看到io.netty.noPreferDirect系统属性可配置是否不优先使用直接内存,默认false// 即优先使用直接内存alloc UnpooledByteBufAllocator.DEFAULT;logger.debug(-Dio.netty.allocator.type: {}, allocType);} else if (pooled.equals(allocType)) {alloc PooledByteBufAllocator.DEFAULT;logger.debug(-Dio.netty.allocator.type: {}, allocType);} else {alloc PooledByteBufAllocator.DEFAULT;logger.debug(-Dio.netty.allocator.type: pooled (unknown: {}), allocType);}DEFAULT_ALLOCATOR alloc;THREAD_LOCAL_BUFFER_SIZE SystemPropertyUtil.getInt(io.netty.threadLocalDirectBufferSize, 0);logger.debug(-Dio.netty.threadLocalDirectBufferSize: {}, THREAD_LOCAL_BUFFER_SIZE);MAX_CHAR_BUFFER_SIZE SystemPropertyUtil.getInt(io.netty.maxThreadLocalCharBufferSize, 16 * 1024);logger.debug(-Dio.netty.maxThreadLocalCharBufferSize: {}, MAX_CHAR_BUFFER_SIZE);}// ... }Idea中为java程序添加启动参数含VM options、Program arguments、Environment variable 测试 如下配置系统属性查看服务端分配的ByteBuf类型注意是使用ctx.alloc()方法时分配得到的ByteBuf类型哦 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jqxEwsDA-1690809538413)(assets/image-20230730230801541.png)] TestByteBuf Slf4j public class TestByteBuf {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, // 前面没有设置任何的解码器, 因此这里的msg就是ByteBuf,// 注意这里的msg会强制使用直接内存,不受所设置的参数控制,// 因为netty认为直接内存对于网络io的效率更高,Object msg) throws Exception {// 注意: 这里是从ctx中分配的ByteBufByteBuf buf ctx.alloc().buffer();log.debug(alloc buf {}, buf);}});}}).bind(8080);} }TestByteBufClient Slf4j public class TestByteBufClient {public static void main(String[] args) {NioEventLoopGroup worker new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(hello!.getBytes()));}});}});ChannelFuture channelFuture bootstrap.connect(127.0.0.1, 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(client error, e);} finally {worker.shutdownGracefully();}} }测试结果 当不按照图上配置时即默认的情况如下使用的是 PooledUnsafeDirectByteBuf即默认为池化的直接内存 23:15:16 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x0e291fc0, L:/127.0.0.1:8080 - R:/127.0.0.1:61682] REGISTERED 23:15:16 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x0e291fc0, L:/127.0.0.1:8080 - R:/127.0.0.1:61682] ACTIVE 23:15:16 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x0e291fc0, L:/127.0.0.1:8080 - R:/127.0.0.1:61682] READ: 6B-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f | ------------------------------------------------------------------------- |00000000| 68 65 6c 6c 6f 21 |hello! 
| ------------------------------------------------------------------------- 23:15:16 [DEBUG] [nioEventLoopGroup-2-2] c.z.s.TestByteBuf - alloc buf PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256) 23:15:16 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x0e291fc0, L:/127.0.0.1:8080 - R:/127.0.0.1:61682] READ COMPLETE 当按照图上配置时使用的是UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf即非池化堆内存 23:17:34 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0xfa6477e0, L:/127.0.0.1:8080 - R:/127.0.0.1:61787] REGISTERED 23:17:34 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0xfa6477e0, L:/127.0.0.1:8080 - R:/127.0.0.1:61787] ACTIVE 23:17:34 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0xfa6477e0, L:/127.0.0.1:8080 - R:/127.0.0.1:61787] READ: 6B-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f | ------------------------------------------------------------------------- |00000000| 68 65 6c 6c 6f 21 |hello! | ------------------------------------------------------------------------- 23:17:34 [DEBUG] [nioEventLoopGroup-2-2] c.z.s.TestByteBuf - alloc buf UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 256) 23:17:34 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0xfa6477e0, L:/127.0.0.1:8080 - R:/127.0.0.1:61787] READ COMPLETE 7RCVBUF_ALLOCATOR 属于 SocketChannal 参数控制 netty 接收缓冲区大小负责入站数据ByteBuf的初始分配决定入站缓冲区的大小并可动态调整统一采用 direct 直接内存具体池化还是非池化由 上面的提到的 allocator 决定 测试 客户端向服务端发送消息后服务端收到这个消息初始默认封装为ByteBuf这个初始的ByteBuf设置其实就是由RCVBUF_ALLOCATOR设置决定的具体规则上面有写 服务端 Slf4j public class TestByteBuf {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {/*ByteBuf buf ctx.alloc().buffer();log.debug(alloc buf {}, buf);*/log.info(msg: {}, msg);}});}}).bind(8080);} } 客户端 Slf4j public class TestBacklogClient {public static void main(String[] args) {NioEventLoopGroup worker new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(hello!.getBytes()));}});}});ChannelFuture channelFuture bootstrap.connect(127.0.0.1, 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(client error, e);} finally {worker.shutdownGracefully();}} }输出如下测试仍然是设置了-Dio.netty.allocator.typeunpooled -Dio.netty.noPreferDirecttrue系统属性但是显然第二个属性设置对此是无效的 23:50:44 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x27e1162b, L:/127.0.0.1:8080 - R:/127.0.0.1:62407] REGISTERED 23:50:44 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x27e1162b, L:/127.0.0.1:8080 - R:/127.0.0.1:62407] ACTIVE 23:50:44 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x27e1162b, L:/127.0.0.1:8080 - R:/127.0.0.1:62407] READ: 
6B-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f | ------------------------------------------------------------------------- |00000000| 68 65 6c 6c 6f 21 |hello! | ------------------------------------------------------------------------- 23:50:44 [INFO ] [nioEventLoopGroup-2-2] c.z.s.TestByteBuf - msg: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(ridx: 0, widx: 6, cap: 1024) 23:50:44 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x27e1162b, L:/127.0.0.1:8080 - R:/127.0.0.1:62407] READ COMPLETE 源码要点 跟源码时候注意以下几个关键的类 AbstractNioByteChannel#read()方法 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EWouPHAv-1690809538416)(assets/image-20230730234754420.png)] DefaultMaxMessageRecvByteBufAllocator#allocate(ByteBufAllocator alloc) [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e0jhsMzy-1690809538416)(assets/image-20230730234813577.png)] AdaptiveRecvByteBufAllocator默认无参构造 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-My61lpdG-1690809538417)(assets/image-20230730234824342.png)] 1.3 RPC 框架 1)准备工作 这些代码可以认为是现成的无需从头编写练习 为了简化起见在原来聊天项目的基础上新增 Rpc 请求和响应消息 修改Message Data public abstract class Message implements Serializable {// 省略旧的代码public static final int RPC_MESSAGE_TYPE_REQUEST 101;public static final int RPC_MESSAGE_TYPE_RESPONSE 102;static {// ...messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}RpcRequestMessage 请求消息 Getter ToString(callSuper true) public class RpcRequestMessage extends Message {/*** 调用的接口全限定名服务端根据它找到实现*/private String interfaceName;/*** 调用接口中的方法名*/private String methodName;/*** 方法返回类型*/private Class? returnType;/*** 方法参数类型数组*/private Class[] parameterTypes;/*** 方法参数值数组*/private Object[] parameterValue;public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class? 
returnType, Class[] parameterTypes, Object[] parameterValue) {super.setSequenceId(sequenceId);this.interfaceName interfaceName;this.methodName methodName;this.returnType returnType;this.parameterTypes parameterTypes;this.parameterValue parameterValue;}Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_REQUEST;} }RpcResponseMessage 响应消息 Data ToString(callSuper true) public class RpcResponseMessage extends Message {/*** 返回值*/private Object returnValue;/*** 异常值*/private Exception exceptionValue;Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_RESPONSE;} }RpcServer服务器架子 Slf4j public class RpcServer {public static void main(String[] args) {NioEventLoopGroup boss new NioEventLoopGroup();NioEventLoopGroup worker new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();// rpc 请求消息处理器待实现RpcRequestMessageHandler RPC_HANDLER new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error(server error, e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}} }RpcClient客户端架子 public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();// rpc 响应消息处理器待实现RpcResponseMessageHandler RPC_HANDLER new RpcResponseMessageHandler();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel bootstrap.connect(localhost, 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error(client error, e);} finally {group.shutdownGracefully();}} }ServicesFactory 服务器端的 service 获取 public class ServicesFactory {static Properties properties;static MapClass?, Object map new ConcurrentHashMap();static {try (InputStream in Config.class.getResourceAsStream(/application.properties)) {properties new Properties();properties.load(in);SetString names properties.stringPropertyNames();for (String name : names) {if (name.endsWith(Service)) {Class? interfaceClass Class.forName(name);Class? 
instanceClass Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static T T getService(ClassT interfaceClass) {return (T) map.get(interfaceClass);} }相关配置 application.properties serializer.algorithmJson cn.itcast.server.service.HelloServicecn.itcast.server.service.HelloServiceImpl2)服务器 handler RpcRequestMessageHandler Slf4j ChannelHandler.Sharable public class RpcRequestMessageHandler extends SimpleChannelInboundHandlerRpcRequestMessage {Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {// 获取真正的实现对象Object service ServicesFactory.getService(Class.forName(message.getInterfaceName()));// 获取要调用的方法Method method service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());// 调用方法Object invoke method.invoke(service, message.getParameterValue());// 调用成功response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();// 调用异常response.setExceptionValue(e);}// 返回结果ctx.writeAndFlush(response);} }3)RpcClient第一版 只发消息 Slf4j public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER new RpcResponseMessageHandler();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});// 客户端请求与服务端建立连接Channel channel bootstrap.connect(localhost, 8080).sync().channel();// 客户端发送rpc请求消息//这个writeAndFlush方法也是命令调用, 发出写出消息命令后, 就会立即返回, 真正写出消息是交给其它线程完成ChannelFuture future channel.writeAndFlush(new RpcRequestMessage( 1,cn.itcast.server.service.HelloService,sayHello,String.class,new Class[]{String.class},new Object[]{张三}))// 这里可以使用addListener方法对上面这个writeAndFlush作调试//也可以调用future.sync()对future在当前线程作同步调用//这里的promise就是上面的future, 是同一个对象.addListener(promise - {if (!promise.isSuccess()) {Throwable cause promise.cause();log.error(error, cause);}});channel.closeFuture().sync(); // 调用sync()方法, 同步阻塞, 当channel关闭时, 才会向下运行} catch (Exception e) {log.error(client error, e);} finally {group.shutdownGracefully();}} }4)客户端 handler 第一版 Slf4j ChannelHandler.Sharable public class RpcResponseMessageHandler extends SimpleChannelInboundHandlerRpcResponseMessage {Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug({}, msg);} }流程分析 客户端请求与服务端建立连接然后客户端发送了1个rpc请求消息这个请求消息就找到了客户端的所有出站处理器从下往上找取依次执行首先是自定义编解码器对消息编码、日志处理器记录日志最后就发给了服务端。消息到达服务器端服务端拿到消息后进行入站处理器依次处理首先是粘包半包处理、日志处理器记录日志、自定义编解码器对消息解码、服务端自定义rpc请求处理器在服务端自定义rpc请求处理器中通过获取解码后的消息获取到反射相关的信息找到 要调用的目标对象和方法进行反射调用获得反射调用结果然后把调用结果成功就是结果失败就是异常封装到rpc响应消息对象中通过ctx写回给客户端服务端写回响应消息之前又会依次经过服务端出站处理器首先是自定义编解码器对消息编码、日志处理器记录日志最后就发给了客户端。消息到达客户端客户端拿到消息后再做入站处理器依次处理首先是粘包半包处理、日志处理器记录日志、自定义编解码器对消息解码、客户端rpc响应处理器其中客户端rpc响应处理器最后对消息进行了打印。 Gson问题 上面的自定义编解码器在序列化 和反序列化 Class类的时候会报错。因此需要往Gson对象中注册类型适配器使用注册过类型适配器的Gson对象去做转换。下面创建了ClassCodec对象实现了JsonSerializerClass?, 
JsonDeserializerClass?接口并注册到了Gson中以支持Class类的json转换操作。 /*** 用于扩展序列化、反序列化算法*/ public interface Serializer {// 反序列化方法T T deserialize(ClassT clazz, byte[] bytes);// 序列化方法T byte[] serialize(T object);enum Algorithm implements Serializer {Java {// ...java序列化和反序列化不作修改, 省略},Json {Overridepublic T T deserialize(ClassT clazz, byte[] bytes) {Gson gson new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();String json new String(bytes, StandardCharsets.UTF_8);return gson.fromJson(json, clazz);}Overridepublic T byte[] serialize(T object) {Gson gson new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();String json gson.toJson(object);return json.getBytes(StandardCharsets.UTF_8);}}}class ClassCodec implements JsonSerializerClass?, JsonDeserializerClass? {Overridepublic Class? deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {try {String str json.getAsString();return Class.forName(str);} catch (ClassNotFoundException e) {throw new JsonParseException(e);}}Override // String.classpublic JsonElement serialize(Class? src, Type typeOfSrc, JsonSerializationContext context) {// class - jsonreturn new JsonPrimitive(src.getName());}} }5)RpcClientManager 获得channel使用netty的Bootstrap客户端启动器连接netty服务端获得channel并将此channel使用单例模式设计成唯一实例 使用代理发起rpc请求使用动态代理屏蔽发起rpc请求是封装消息的过程把这个复杂的过程封装到jdk的InvocationHandler中然后使用channel写出消息 两个线程通信拿结果的问题发起rpc请求的线程是main线程获得结果的线程是netty的nio线程netty中的eventLoop因此需要把nio线程获得的结果给到main线程的调用处提示使用Promise可以做这件事。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zzI0YCUf-1690809538418)(assets/image-20230731121531720.png)] Slf4j public class RpcClientManager {public static void main(String[] args) {HelloService service getProxyService(HelloService.class);System.out.println(service.sayHello(zhangsan));// System.out.println(service.sayHello(lisi));// System.out.println(service.sayHello(wangwu));}// 创建代理类public static T T getProxyService(ClassT serviceClass) {ClassLoader loader serviceClass.getClassLoader();Class?[] interfaces new Class[]{serviceClass};Object o Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) - {// 1. 将方法调用转换为 消息对象int sequenceId SequenceIdGenerator.nextId(); RpcRequestMessage msg new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 准备一个空 Promise 对象来接收结果 指定 promise 对象 异步接收结果线程DefaultPromiseObject promise new DefaultPromise(getChannel().eventLoop());RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);//promise.addListener(future - { // 这个回调方法就是由创建DefaultPromise时构造方法中所传入的// 线程 // 的 异步接收结果线程 来执行。但是这里没有使用这种用法, 这里// }); // 使用的是下面在调用线程上(main线程)作同步等待// 4. 等待 promise 结果// 与promise.sync()的区别在于promise.await()不会抛出异常;而promise.sync()会抛出异常promise.await(); if(promise.isSuccess()) {// 调用正常return promise.getNow();} else {// 调用失败throw new RuntimeException(promise.cause());}});return (T) o;}private static Channel channel null;private static final Object LOCK new Object();// 获取唯一的 channel 对象public static Channel getChannel() {if (channel ! null) {return channel;}synchronized (LOCK) {if (channel ! 
null) {return channel;}initChannel();return channel;}}// 初始化 channel 方法private static void initChannel() {NioEventLoopGroup group new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER new RpcResponseMessageHandler();Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});try {channel bootstrap.connect(localhost, 8080).sync().channel();// 当channel关闭时, 执行的异步任务由原来的同步, 改成了异步channel.closeFuture().addListener(future - {group.shutdownGracefully();});} catch (Exception e) {log.error(client error, e);}} }SequenceIdGenerator public abstract class SequenceIdGenerator {private static final AtomicInteger id new AtomicInteger();public static int nextId() {return id.incrementAndGet();} }流程分析 动态代理执行流程分析 main线程调用代理对象的方法并传入方法参数触发代理对象的InvocationHandler#invoke方法执行使用channel写出消息发出rpc请求但是不能立即得到响应结果因此创建1个Promise对象来接收响应消息而Promise放在ConcurrentHashMap中Promise与rpc请求绑定的sequenceId绑定到一起保存在这个Map中然后调用promise.await()方法等待结果。当响应回来后在RpcResponseMessageHandler中由nio线程执行即eventLoop根据响应的sequenceId从ConcurrentHashMap中找到对应的Promise将结果设置到此Promise对象中一旦Promise对象中被填入结果不管是成功还是失败main线程中调用promise.await()方法就会停止阻塞而向下运行然后获取响应结果将结果返回给代理对象的调用者。 6)RpcResponseMessageHandler Slf4j ChannelHandler.Sharable public class RpcResponseMessageHandler extends SimpleChannelInboundHandlerRpcResponseMessage {// 序号 用来接收结果的 promise 对象public static final MapInteger, PromiseObject PROMISES new ConcurrentHashMap();Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug({}, msg);// 根据响应消息的的sequenceId 拿到对应的 空promise, 并且移除该Promise对象PromiseObject promise PROMISES.remove(msg.getSequenceId());if (promise ! null) {Object returnValue msg.getReturnValue();Exception exceptionValue msg.getExceptionValue();// 当promise被填充值后, 主线程使用promise.await()方法就会停止阻塞向下运行if(exceptionValue ! null) {promise.setFailure(exceptionValue);} else {promise.setSuccess(returnValue);}}} }2. 源码分析 2.1 启动剖析 我们就来看看 netty 中对下面的代码是怎样进行处理的 //1 netty 中使用 NioEventLoopGroup 简称 nio boss 线程来封装线程和 selector Selector selector Selector.open(); //2 创建 NioServerSocketChannel同时会初始化它关联的 handler以及为原生 ssc 存储 config NioServerSocketChannel attachment new NioServerSocketChannel();//3 创建 NioServerSocketChannel 时创建了 java 原生的 ServerSocketChannel ServerSocketChannel serverSocketChannel ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false);//4 启动 nio boss 线程执行接下来的操作//5 注册仅关联 selector 和 NioServerSocketChannel未关注事件 SelectionKey selectionKey serverSocketChannel.register(selector, 0, attachment);//6 head - 初始化器 - ServerBootstrapAcceptor - tail初始化器是一次性的只为添加 acceptor//7 绑定端口 serverSocketChannel.bind(new InetSocketAddress(8080));//8 触发 channel active 事件在 head 中关注 op_accept 事件 selectionKey.interestOps(SelectionKey.OP_ACCEPT);入口 io.netty.bootstrap.ServerBootstrap#bind 关键代码 io.netty.bootstrap.AbstractBootstrap#doBind private ChannelFuture doBind(final SocketAddress localAddress) {// 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成从而回调 3.2 处代码final ChannelFuture regFuture initAndRegister();final Channel channel regFuture.channel();if (regFuture.cause() ! null) {return regFuture;}// 2. 
因为是 initAndRegister 异步执行需要分两种情况来看调试时也需要通过 suspend 断点类型加以区分// 2.1 如果已经完成if (regFuture.isDone()) {ChannelPromise promise channel.newPromise();// 3.1 立刻调用 doBind0doBind0(regFuture, channel, localAddress, promise);return promise;} // 2.2 还没有完成else {final PendingRegistrationPromise promise new PendingRegistrationPromise(channel);// 3.2 回调 doBind0regFuture.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause future.cause();if (cause ! null) {// 处理异常...promise.setFailure(cause);} else {promise.registered();// 3. 由注册线程去执行 doBind0doBind0(regFuture, channel, localAddress, promise);}}});return promise;} }关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister final ChannelFuture initAndRegister() {Channel channel null;try {channel channelFactory.newChannel();// 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializerinit(channel);} catch (Throwable t) {// 处理异常...return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上ChannelFuture regFuture config().group().register(channel);if (regFuture.cause() ! null) {// 处理异常...}return regFuture; }关键代码 io.netty.bootstrap.ServerBootstrap#init // 这里 channel 实际上是 NioServerSocketChannel void init(Channel channel) throws Exception {final MapChannelOption?, Object options options0();synchronized (options) {setChannelOptions(channel, options, logger);}final MapAttributeKey?, Object attrs attrs0();synchronized (attrs) {for (EntryAttributeKey?, Object e: attrs.entrySet()) {SuppressWarnings(unchecked)AttributeKeyObject key (AttributeKeyObject) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p channel.pipeline();final EventLoopGroup currentChildGroup childGroup;final ChannelHandler currentChildHandler childHandler;final EntryChannelOption?, Object[] currentChildOptions;final EntryAttributeKey?, Object[] currentChildAttrs;synchronized (childOptions) {currentChildOptions childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs childAttrs.entrySet().toArray(newAttrArray(0));}// 为 NioServerSocketChannel 添加初始化器p.addLast(new ChannelInitializerChannel() {Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline ch.pipeline();ChannelHandler handler config.handler();if (handler ! 
null) {pipeline.addLast(handler);}// 初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannelch.eventLoop().execute(new Runnable() {Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}}); }关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 一些检查略...AbstractChannel.this.eventLoop eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 首次执行 execute 方法时会启动 nio 线程之后注册等操作在 nio 线程上执行// 因为只有一个 NioServerSocketChannel 因此也只会有一个 boss nio 线程// 这行代码完成的事实是 main - nio boss 线程的切换eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 日志记录...closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}} }io.netty.channel.AbstractChannel.AbstractUnsafe#register0 private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration neverRegistered;// 1.2.1 原生的 nio channel 绑定到 selector 上注意此时没有注册 selector 关注事件附件为 NioServerSocketChanneldoRegister();neverRegistered false;registered true;// 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannelpipeline.invokeHandlerAddedIfNeeded();// 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0safeSetSuccess(promise);pipeline.fireChannelRegistered();// 对应 server socket channel 还未绑定isActive 为 falseif (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);} }关键代码 io.netty.channel.ChannelInitializer#initChannel private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) { // Guard against re-entrance.try {// 1.2.2.1 执行初始化initChannel((C) ctx.channel());} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {// 1.2.2.2 移除初始化器ChannelPipeline pipeline ctx.pipeline();if (pipeline.context(this) ! 
null) {pipeline.remove(this);}}return true;}return false; }关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0 // 3.1 或 3.2 执行 doBind0 private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}}); }关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {assertEventLoop();if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) localAddress instanceof InetSocketAddress !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() !PlatformDependent.isWindows() !PlatformDependent.maybeSuperUser()) {// 记录日志...}boolean wasActive isActive();try {// 3.3 执行端口绑定doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive isActive()) {invokeLater(new Runnable() {Overridepublic void run() {// 3.4 触发 active 事件pipeline.fireChannelActive();}});}safeSetSuccess(promise); }3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());} }3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();// 触发 read (NioServerSocketChannel 上的 read 不是读取数据只是为了触发 channel 的事件注册)readIfIsAutoRead(); }关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey this.selectionKey;if (!selectionKey.isValid()) {return;}readPending true;final int interestOps selectionKey.interestOps();// readInterestOp 取值是 16在 NioServerSocketChannel 创建时初始化好代表关注 accept 事件if ((interestOps readInterestOp) 0) {selectionKey.interestOps(interestOps | readInterestOp);} }2.2 NioEventLoop 剖析 NioEventLoop 线程不仅要处理 IO 事件还要处理 Task包括普通任务和定时任务 提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute public void execute(Runnable task) {if (task null) {throw new NullPointerException(task);}boolean inEventLoop inEventLoop();// 添加任务其中队列使用了 jctools 提供的 mpsc 无锁队列addTask(task);if (!inEventLoop) {// inEventLoop 如果为 false 表示由其它线程来调用 execute即首次调用这时需要向 eventLoop 提交首个任务启动死循环会执行到下面的 doStartThreadstartThread();if (isShutdown()) {// 如果已经 shutdown做拒绝逻辑代码略...}}if (!addTaskWakesUp wakesUpForTask(task)) {// 如果线程由于 IO select 阻塞了添加的任务的线程需要负责唤醒 NioEventLoop 线程wakeup(inEventLoop);} }唤醒 select 阻塞线程io.netty.channel.nio.NioEventLoop#wakeup Override protected void wakeup(boolean inEventLoop) {if (!inEventLoop wakenUp.compareAndSet(false, true)) {selector.wakeup();} }启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread private void doStartThread() {assert thread null;executor.execute(new Runnable() {Overridepublic void run() {// 将线程池的当前线程保存在成员变量中以便后续使用thread Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success false;updateLastExecutionTime();try {// 调用外部类 SingleThreadEventExecutor 的 run 方法进入死循环run 
方法见下SingleThreadEventExecutor.this.run();success true;} catch (Throwable t) {logger.warn(Unexpected exception from an event executor: , t);} finally {// 清理工作代码略...}}}); }io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环不断看有没有新任务有没有 IO 事件 protected void run() {for (;;) {try {try {// calculateStrategy 的逻辑如下// 有任务会执行一次 selectNow清除上一次的 wakeup 结果无论有没有 IO 事件都会跳过 switch// 没有任务会匹配 SelectStrategy.SELECT看是否应当阻塞switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:case SelectStrategy.SELECT:// 因为 IO 线程和提交任务线程都有可能执行 wakeup而 wakeup 属于比较昂贵的操作因此使用了一个原子布尔对象 wakenUp它取值为 true 时表示该由当前线程唤醒// 进行 select 阻塞并设置唤醒状态为 falseboolean oldWakenUp wakenUp.getAndSet(false);// 如果在这个位置非 EventLoop 线程抢先将 wakenUp 置为 true并 wakeup// 下面的 select 方法不会阻塞// 等 runAllTasks 处理完成后到再循环进来这个阶段新增的任务会不会及时执行呢?// 因为 oldWakenUp 为 true因此下面的 select 方法就会阻塞直到超时// 才能执行让 select 方法无谓阻塞select(oldWakenUp);if (wakenUp.get()) {selector.wakeup();}default:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}cancelledKeys 0;needsToSelectAgain false;// ioRatio 默认是 50final int ioRatio this.ioRatio;if (ioRatio 100) {try {processSelectedKeys();} finally {// ioRatio 为 100 时总是运行完所有非 IO 任务runAllTasks();}} else { final long ioStartTime System.nanoTime();try {processSelectedKeys();} finally {// 记录 io 事件处理耗时final long ioTime System.nanoTime() - ioStartTime;// 运行非 IO 任务一旦超时会退出 runAllTasksrunAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}} }注意 这里有个费解的地方就是 wakeup它既可以由提交任务的线程来调用比较好理解也可以由 EventLoop 线程来调用比较费解这里要知道 wakeup 方法的效果 由非 EventLoop 线程调用会唤醒当前在执行 select 阻塞的 EventLoop 线程由 EventLoop 自己调用会本次的 wakeup 会取消下一次的 select 操作 参考下图 io.netty.channel.nio.NioEventLoop#select private void select(boolean oldWakenUp) throws IOException {Selector selector this.selector;try {int selectCnt 0;long currentTimeNanos System.nanoTime();// 计算等待时间// * 没有 scheduledTask超时时间为 1s// * 有 scheduledTask超时时间为 下一个定时任务执行时间 - 当前时间long selectDeadLineNanos currentTimeNanos delayNanos(currentTimeNanos);for (;;) {long timeoutMillis (selectDeadLineNanos - currentTimeNanos 500000L) / 1000000L;// 如果超时退出循环if (timeoutMillis 0) {if (selectCnt 0) {selector.selectNow();selectCnt 1;}break;}// 如果期间又有 task 退出循环如果没这个判断那么任务就会等到下次 select 超时时才能被执行// wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeupif (hasTasks() wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt 1;break;}// select 有限时阻塞// 注意 nio 有 bug当 bug 出现时select 方法即使没有时间发生也不会阻塞住导致不断空轮询cpu 占用 100%int selectedKeys selector.select(timeoutMillis);// 计数加 1selectCnt ;// 醒来后如果有 IO 事件、或是由非 EventLoop 线程唤醒或者有任务退出循环if (selectedKeys ! 
0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}if (Thread.interrupted()) {// 线程被打断退出循环// 记录日志selectCnt 1;break;}long time System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) currentTimeNanos) {// 如果超时计数重置为 1下次循环就会 breakselectCnt 1;} // 计数超过阈值由 io.netty.selectorAutoRebuildThreshold 指定默认 512// 这是为了解决 nio 空轮询 bugelse if (SELECTOR_AUTO_REBUILD_THRESHOLD 0 selectCnt SELECTOR_AUTO_REBUILD_THRESHOLD) {// 重建 selectorselector selectRebuildSelector(selectCnt);selectCnt 1;break;}currentTimeNanos time;}if (selectCnt MIN_PREMATURE_SELECTOR_RETURNS) {// 记录日志}} catch (CancelledKeyException e) {// 记录日志} }处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys private void processSelectedKeys() {if (selectedKeys ! null) {// 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet // SelectedSelectionKeySet 底层为数组实现可以提高遍历性能原本为 HashSetprocessSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());} }io.netty.channel.nio.NioEventLoop#processSelectedKey private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe ch.unsafe();// 当 key 取消或关闭时会导致这个 key 无效if (!k.isValid()) {// 无效时处理...return;}try {int readyOps k.readyOps();// 连接事件if ((readyOps SelectionKey.OP_CONNECT) ! 0) {int ops k.interestOps();ops ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// 可写事件if ((readyOps SelectionKey.OP_WRITE) ! 0) {ch.unsafe().forceFlush();}// 可读或可接入事件if ((readyOps (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! 0 || readyOps 0) {// 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read// 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#readunsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());} }2.3 accept 剖析 nio 中如下代码在 netty 中的流程 //1 阻塞直到事件发生 selector.select();IteratorSelectionKey iter selector.selectedKeys().iterator(); while (iter.hasNext()) { //2 拿到一个事件SelectionKey key iter.next();//3 如果是 accept 事件if (key.isAcceptable()) {//4 执行 acceptSocketChannel channel serverSocketChannel.accept();channel.configureBlocking(false);//5 关注 read 事件channel.register(selector, SelectionKey.OP_READ);}// ... }先来看可接入事件处理accept io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read public void read() {assert eventLoop().inEventLoop();final ChannelConfig config config();final ChannelPipeline pipeline pipeline(); final RecvByteBufAllocator.Handle allocHandle unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed false;Throwable exception null;try {try {do {// doReadMessages 中执行了 accept 并创建 NioSocketChannel 作为消息放入 readBuf// readBuf 是一个 ArrayList 用来缓存消息int localRead doReadMessages(readBuf);if (localRead 0) {break;}if (localRead 0) {closed true;break;}// localRead 为 1就一条消息即接收一个客户端连接allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception t;}int size readBuf.size();for (int i 0; i size; i ) {readPending false;// 触发 read 事件让 pipeline 上的 handler 处理这时是处理// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelReadpipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception ! 
null) {closed closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown true;if (isOpen()) {close(voidPromise());}}} finally {if (!readPending !config.isAutoRead()) {removeReadOp();}} }关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead public void channelRead(ChannelHandlerContext ctx, Object msg) {// 这时的 msg 是 NioSocketChannelfinal Channel child (Channel) msg;// NioSocketChannel 添加 childHandler 即初始化器child.pipeline().addLast(childHandler);// 设置选项setChannelOptions(child, childOptions, logger);for (EntryAttributeKey?, Object e: childAttrs) {child.attr((AttributeKeyObject) e.getKey()).set(e.getValue());}try {// 注册 NioSocketChannel 到 nio worker 线程接下来的处理也移交至 nio worker 线程childGroup.register(child).addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);} }又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法 public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 一些检查略...AbstractChannel.this.eventLoop eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 这行代码完成的事实是 nio boss - nio worker 线程的切换eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 日志记录...closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}} }io.netty.channel.AbstractChannel.AbstractUnsafe#register0 private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration neverRegistered;doRegister();neverRegistered false;registered true;// 执行初始化器执行前 pipeline 中只有 head - 初始化器 - tailpipeline.invokeHandlerAddedIfNeeded();// 执行后就是 head - logging handler - my handler - tailsafeSetSuccess(promise);pipeline.fireChannelRegistered();if (isActive()) {if (firstRegistration) {// 触发 pipeline 上 active 事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);} }回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();// 触发 read (NioSocketChannel 这里 read只是为了触发 channel 的事件注册还未涉及数据读取)readIfIsAutoRead(); }io.netty.channel.nio.AbstractNioChannel#doBeginRead protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey this.selectionKey;if (!selectionKey.isValid()) {return;}readPending true;// 这时候 interestOps 是 0final int interestOps selectionKey.interestOps();if ((interestOps readInterestOp) 0) {// 关注 read 事件selectionKey.interestOps(interestOps | readInterestOp);} }2.4 read 剖析 再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read注意发送的数据未必能够一次读完因此会触发多次 nio read 事件一次事件内会触发多次 pipeline read一次事件会触发一次 pipeline read complete public final void read() {final ChannelConfig config config();if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline pipeline();// io.netty.allocator.type 决定 allocator 的实现final ByteBufAllocator allocator config.getAllocator();// 用来分配 byteBuf确定单次读取大小final RecvByteBufAllocator.Handle allocHandle recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf null;boolean close false;try {do {byteBuf allocHandle.allocate(allocator);// 
读取allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() 0) {byteBuf.release();byteBuf null;close allocHandle.lastBytesRead() 0;if (close) {readPending false;}break;}allocHandle.incMessagesRead(1);readPending false;// 触发 read 事件让 pipeline 上的 handler 处理这时是处理 NioSocketChannel 上的 handlerpipeline.fireChannelRead(byteBuf);byteBuf null;} // 是否要继续循环while (allocHandle.continueReading());allocHandle.readComplete();// 触发 read complete 事件pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {if (!readPending !config.isAutoRead()) {removeReadOp();}} }io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier) public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {return // 一般为 trueconfig.isAutoRead() // respectMaybeMoreData 默认为 true// maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等返回 true(!respectMaybeMoreData || maybeMoreDataSupplier.get()) // 小于最大次数maxMessagePerRead 默认 16totalMessages maxMessagePerRead // 实际读到了数据totalBytesRead 0; }
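To tie the source walkthrough back to the tuning options in section 1.2, the sketch below shows where the knobs mentioned above are set from user code: the io.netty.selectorAutoRebuildThreshold system property (default 512) that triggers the selector rebuild for the NIO empty-poll bug, ioRatio (default 50) that splits event-loop time between processSelectedKeys and runAllTasks, and RCVBUF_ALLOCATOR, whose handle drives the allocHandle.allocate / continueReading loop in the read analysis. This is a minimal sketch against the Netty 4.1 API and is not part of the original notes; the class name, port and concrete values are illustrative assumptions only.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class EventLoopTuningSketch {
    public static void main(String[] args) {
        // Threshold for the empty-poll workaround seen in NioEventLoop#select (default 512).
        // It is read when NioEventLoop is loaded, so set it before creating any NioEventLoopGroup.
        System.setProperty("io.netty.selectorAutoRebuildThreshold", "512");

        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        // ioRatio defaults to 50; with 70, runAllTasks is budgeted roughly
        // (100 - 70) / 70 of the time just spent in processSelectedKeys
        worker.setIoRatio(70);

        new ServerBootstrap()
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                // The RecvByteBufAllocator handle used by the read() loop above:
                // at least 64 B, initially 1 KiB, at most 64 KiB per doReadBytes call
                .childOption(ChannelOption.RCVBUF_ALLOCATOR,
                        new AdaptiveRecvByteBufAllocator(64, 1024, 65536))
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    }
                })
                .bind(8080);
    }
}

Note that the adaptive allocator only bounds how much is read per doReadBytes call; the continueReading check still caps one read event at 16 messages (maxMessagePerRead) by default, so a large inbound burst is spread over several read events and readComplete callbacks.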