当前位置: 首页 > news >正文

找人网站 优帮云西凤九网站建设的目标

找人网站 优帮云,西凤九网站建设的目标,微信公众号怎么分享wordpress,网站做seo安全吗基于Netty的分布式通信框架实现 前提介绍回顾Dubbo分布式通信框架组成元素程序执行流程消息协议设计实现机制ChannelInboundHandlerAdapter自定义事件处理 ChannelOutboundHandlerAdapter 编(解)码处理器编码过程阶段ChannelOutboundHandlerAdapter序列化实现ChannelOutboundHa… 基于Netty的分布式通信框架实现 前提介绍回顾Dubbo分布式通信框架组成元素程序执行流程消息协议设计实现机制ChannelInboundHandlerAdapter自定义事件处理 ChannelOutboundHandlerAdapter 编(解)码处理器编码过程阶段ChannelOutboundHandlerAdapter序列化实现ChannelOutboundHandlerAdapter压缩实现LengthBasedEncoder编码器 解码过程阶段 处理器链的建立创建ChannelPipeline对象ChannelPipeline中添加处理器添加的顺序形成处理器链 未完待续 前提介绍 今天我要向大家实现一个基于Netty实现的高性能远程通信框架这个框架利用了 Netty 的强大功能提供了快速、可靠的远程通信能力。 无论是构建大规模微服务架构还是实现分布式计算这个分布式通信框架都是一个不可或缺的利器。 回顾Dubbo 相信大家都指导DubboDubbo3这个非常著名的RPC框架对吧如果你忘记了那么我给您先垫垫底可以看到下面就是Dubbo的借本架构图当然Dubbo3会更加复杂我们先按照基础的Dubbo架构进行回顾 无论是在分布式系统、微服务架构还是其他需要跨网络进行通信的场景下这个框架都能够帮助你实现高效的数据传输和通信。它具备出色的性能和可扩展性能够满足各种复杂的通信需求。 但是无论是从层次化和结构化而言Dubbo/Dubbo3都过于的复杂了我们起始未必会用到那么复杂以及扩展性那么强的功能因此我们来实现一个属于我们自己的一个可靠且高性能的远程通信解决方案。 分布式通信框架 分布式通信框架是一种卓越的高性能远程通信解决方案它基于 Netty 实现了 TCP 通信的底层细节并对上层进行了封装以提供简单易用和高度可扩展的能力。 这个框架能够帮助开发者轻松构建分布式系统并实现可靠的跨网络通信。通过利用 Netty 的强大功能该框架能够提供出色的性能和可靠性同时还具备灵活的扩展性可以满足各种复杂的通信需求。 组成元素 先介绍一下网络通信的两个最基本的元素和属性如下所示。 Channel可以理解为一个通道即一条连接线路的概念。它承载着数据、信息或者信号的传输功能。 ChannelGroup由多个通道组合而成的一个概念。它将多条通道有机地集合在一起形成一个整体以便更高效地进行数据、信息或者信号的传输。 程序执行流程 下图自上而下分别为boss接受连接、channel、dispatcher、event listener和service。 这五个部分各自承载着独特的任务又彼此协作形成了一个系统化、高效化的运行流程。 Boss线程接受连接流程主要负责接受外部请求这些请求可能是来自用户的操作或是其他服务的调用。一旦接收到请求boss会进行必要的处理然后将请求分发给下面的线程池worker进行处理。 Worker线程系统中的工作执行者负责接收boss分发的任务然后执行具体的业务逻辑。这些任务可能涉及到数据的处理、服务的调用等。线程池worker通过channel与boss进行通信确保任务能够准确无误地传递。 dispatcher机制在worker执行任务的过程中需要有一个机制来调度和分配任务。这就是dispatcher的作用。 dispatcher根据一定的策略和规则将任务分配给合适的worker线程进行处理。这一环节保证了系统的负载均衡和高效运行。 EventListener基于在每个worker线程内部eventListener发挥着关键作用。它负责监听和处理线程中的事件比如任务的完成、异常等。通过eventListener系统能够及时响应各种事件进行必要的处理和反馈。 Service业务逻辑实现它代表了整个系统的核心业务逻辑。service接收并处理来自worker线程的任务完成具体的业务操作。这些操作可能涉及到数据的处理、服务的调用等。 消息协议设计 消息协议这里是指对消息编码和解码的规范的一种定义通信内置的消息协议采用如下结构其中包含了三个部分ID、Length 和 Content。 ID: 长度1 字节用途表示 Content 部分是否被压缩其中 1 表示 Content 部分被压缩0 表示未被压缩。 Length: 长度4 字节用途表示 ID 和 Content 的总长度。这通常用于消息分片或分批传输确保接收方可以正确地重新组装消息。 Content: 长度不定由 Length 字段决定用途真实的消息内容。根据 ID 的值它可能是压缩的或未压缩的。 如果 ID 为 1则 Content 部分可能会被某种算法如gzip压缩以减少存储或传输的空间需求。Length 字段确保了数据的完整性因为接收方可以根据这个长度字段正确地读取和重组数据。 在实际应用中这种结构通常用于网络通信、文件存储或数据库存储等场景其中需要对数据进行有效且紧凑的表示。 实现机制 Netty框架原生提供了一个处理器链该链用于对事件进行处理。每个处理器都实现了 ChannelHandler 接口。ChannelHandler 接口是一个空接口其中ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter。 我们主要关注这两个接口因为它们被用于处理读取输入和写入输出的消息。 ChannelInboundHandlerAdapter ChannelInboundHandlerAdapter是Netty框架中用于处理从网络到应用程序的事件的组件。它是一种特殊的ChannelHandler主要负责处理读取操作。 当网络通道接收到数据时ChannelInboundHandlerAdapter会被触发然后开发者可以通过重写其中的方法来执行需要的操作。常见的操作包括数据的解码、解压或反序列化等。 自定义事件处理 ChannelInboundHandlerAdapter是Netty中实现业务逻辑的关键组件它提供了丰富的方法来处理不同的事件例如通道激活、数据读取和异常处理等下面是对应的源码 public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {/*** Calls {link ChannelHandlerContext#fireChannelRegistered()} to forward* to the next {link ChannelInboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelRegistered();}/*** Calls {link ChannelHandlerContext#fireChannelUnregistered()} to forward* to the next {link ChannelInboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelUnregistered();}/*** Calls {link ChannelHandlerContext#fireChannelActive()} to forward* to the next {link ChannelInboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();}/*** Calls {link ChannelHandlerContext#fireChannelInactive()} to forward* to the next {link ChannelInboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();}/*** Calls {link ChannelHandlerContext#fireChannelRead(Object)} to forward* to the next {link ChannelInboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);}/*** Calls {link ChannelHandlerContext#fireChannelReadComplete()} to forward* to the next {link ChannelInboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelReadComplete();}/*** Calls {link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward* to the next {link ChannelInboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {ctx.fireUserEventTriggered(evt);}/*** Calls {link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward* to the next {link ChannelInboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelWritabilityChanged();}/*** Calls {link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward* to the next {link ChannelHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverrideSuppressWarnings(deprecation)public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.fireExceptionCaught(cause);} }通过自定义ChannelInboundHandlerAdapter开发者可以灵活地处理从网络到应用程序的数据传输过程稍后回进行分析介绍。 ChannelOutboundHandlerAdapter ChannelOutboundHandlerAdapter也是一种特殊的ChannelHandler用于处理从应用程序到网络的事件主要包括写出操作。它是Netty框架中的一个关键组件负责将应用程序的数据写入网络通道中以便发送给对应的接收端。 使用ChannelOutboundHandlerAdapter可以实现对写出事件的定制化处理例如数据的编码、压缩或序列化等操作以满足具体业务需求。它可以直接扩展ChannelOutboundHandlerAdapter类并重写其中的方法来实现特定的功能。 package io.netty.channel; import io.netty.channel.ChannelHandlerMask.Skip; import java.net.SocketAddress; /*** Skeleton implementation of a {link ChannelOutboundHandler}. This implementation just forwards each method call via* the {link ChannelHandlerContext}.*/ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {/*** Calls {link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward* to the next {link ChannelOutboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress,ChannelPromise promise) throws Exception {ctx.bind(localAddress, promise);}/*** Calls {link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)} to forward* to the next {link ChannelOutboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.connect(remoteAddress, localAddress, promise);}/*** Calls {link ChannelHandlerContext#disconnect(ChannelPromise)} to forward* to the next {link ChannelOutboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {ctx.disconnect(promise);}/*** Calls {link ChannelHandlerContext#close(ChannelPromise)} to forward* to the next {link ChannelOutboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {ctx.close(promise);}/*** Calls {link ChannelHandlerContext#deregister(ChannelPromise)} to forward* to the next {link ChannelOutboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.deregister(promise);}/*** Calls {link ChannelHandlerContext#read()} to forward* to the next {link ChannelOutboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void read(ChannelHandlerContext ctx) throws Exception {ctx.read();}/*** Calls {link ChannelHandlerContext#write(Object, ChannelPromise)} to forward* to the next {link ChannelOutboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise);}/*** Calls {link ChannelHandlerContext#flush()} to forward* to the next {link ChannelOutboundHandler} in the {link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/SkipOverridepublic void flush(ChannelHandlerContext ctx) throws Exception {ctx.flush();} }编(解)码处理器 编码(解码)处理器、压缩(解压)处理器以及序列化(反序列化)处理器等都是直接或间接用于实现ChannelHandler的组件。 编码过程阶段 编码过程由三个Handler组合完成分别为序列化压缩数据以及编码处理。 ChannelOutboundHandlerAdapter序列化实现 当你需要实现序列化数据的发送时可以基于ChannelOutboundHandlerAdapter接口进行实现。下面是一个简单的示例代码展示了如何使用write方法将序列化后的数据发送到网络 public class SerializationHandler extends ChannelOutboundHandlerAdapter {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {// 进行数据序列化操作这里假设使用Java内置的序列化方式ByteArrayOutputStream bos new ByteArrayOutputStream();ObjectOutputStream oos new ObjectOutputStream(bos);oos.writeObject(msg);oos.flush();byte[] serializedData bos.toByteArray();// 将序列化后的数据写入网络通道ByteBuf byteBuf ctx.alloc().buffer();byteBuf.writeBytes(serializedData);ctx.write(byteBuf, promise);} }重写了write方法在该方法中进行了数据的序列化操作。具体来说我们使用Java内置的序列化方式将msg对象序列化为字节数组serializedData然后将序列化后的数据写入网络通道。 ChannelOutboundHandlerAdapter压缩实现 要通过数据压缩进行处理基于ChannelOutboundHandlerAdapter接口实现一个压缩处理器。使用DeflaterOutputStream进行数据压缩并发送到网络 public class CompressionHandler extends ChannelOutboundHandlerAdapter {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {// 创建输出流使用DeflaterOutputStream进行数据压缩ByteArrayOutputStream bos new ByteArrayOutputStream();DeflaterOutputStream dos new DeflaterOutputStream(bos);// 压缩数据dos.write((byte[]) msg);dos.finish();// 获取压缩后的数据byte[] compressedData bos.toByteArray();// 将压缩后的数据写入网络通道ByteBuf byteBuf ctx.alloc().buffer();byteBuf.writeBytes(compressedData);ctx.write(byteBuf, promise);} }同样也是重写了write方法在该方法中进行了数据的压缩操作。我们使用DeflaterOutputStream将原始数据(byte[]) msg进行压缩然后将压缩后的数据写入网络通道。 LengthBasedEncoder编码器 要实现Netty中的编码器你可以自定义一个类并实现MessageToByteEncoder接口。展示了如何编写一个基于字符串的编码器 public class LengthBasedEncoder extends MessageToByteEncoderString {Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {byte[] data msg.getBytes(StandardCharsets.UTF_8);int length data.length;out.writeInt(length);out.writeBytes(data);} }在encode方法中我们首先将字符串转换为字节数组使用UTF-8字符集进行编码。然后我们获取字节数组的长度并将其写入输出ByteBuf。最后我们将字节数组写入输出缓冲区。 注意上述示例中使用的是字符串编码器你可以根据实际需求替换成其他类型的编码器。同时也请确保在创建ByteBuf对象时使用适当的Allocator以获取更高效的内存分配和释放。 通过将自定义的编码器StringEncoder添加到Netty的ChannelPipeline中作为ChannelOutboundHandler使用你就可以在数据发送前将字符串编码为字节并写入网络通道中了。 解码过程阶段 解码的代码和编码的代码就是一个镜像操作和处理在这里就进行赘余了相信小伙伴都可以实现如果真的有不会实现的可以评论区留言告诉我我把完整代码给你们。 对于 TCP 通信而言粘包是很正常的现象因此 decoder 必须处理粘包问题。LengthFrameDecoder 是一个支持粘包处理的decoder 类抽象可基于基于长度的解码器的实现方式进行控制。 处理器链的建立 通过处理器链Netty框架可以非常灵活地处理不同类型的事件在Netty中我们可以通过ChannelPipeline来建立处理器链。ChannelPipeline是一个用于管理和执行处理器的容器它负责处理入站和出站的事件并将这些事件传递给适当的处理器。 创建ChannelPipeline对象 ChannelPipeline pipeline channel.pipeline();ChannelPipeline中添加处理器 pipeline.addLast(handler1, new Handler1()); pipeline.addLast(handler2, new Handler2());这里的 handler1 和 handler2 是处理器的名称可以根据需要进行命名。 添加的顺序形成处理器链 数据将按照顺序在处理器之间传递。最后一个添加的处理器将是数据的出站处理器第一个添加的处理器将是数据的入站处理器。 ChannelPipeline pipeline channel.pipeline(); pipeline.addLast(decoder, new StringDecoder()); pipeline.addLast(encoder, new StringEncoder()); pipeline.addLast(handler, new MyHandler());通过建立处理器链可以根据需要按照一定的顺序和逻辑处理数据。 未完待续 由于篇幅过长本文就到这里为止。下一篇文章将继续介绍《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现Dispatcher和EventListener下》并详细说明剩下的内容。敬请期待
http://www.zqtcl.cn/news/195758/

相关文章:

  • 网站开发常遇到客户问题wordpress怎么升级
  • 网站的空间是网站 建设 维护 公司
  • 关于网站建设的书籍网站设计的趋势
  • 临漳+网站建设深圳国贸网站建设
  • 安全的南昌网站制作上海网站建设网
  • 360网站制作潍坊医疗网站建设方案
  • 深圳网站策划公司域名解析暂时失败
  • 怎么做安居客网站wordpress 函数文件
  • 微名片网站怎么做html代码表示
  • 两学一做纪实评价系统网站如何做好百度推广
  • 网站设置手机才能播放企业网站开发需求
  • 网站建设微信运营销售做网站用啥语言
  • dw建设网站步骤活动汪活动策划网站
  • 民和县公司网站建设网站开发的特点
  • 模板企业快速建站上传网站中ftp地址写什么
  • 云南本地企业做网站太原网站制作公司哪家好
  • 西部数码域名网站模板wordpress抓取股票行情
  • 丰台深圳网站建设公司关于服装店网站建设的策划方案
  • win7 iis网站无法显示随州网站建设哪家实惠
  • 利用网站新媒体宣传法治建设建站哪个平台好
  • 网站seo课设wordpress 500 根目录
  • 电子商务网站建设的阶段化分析如何利用视频网站做数字营销推广
  • 电子商务网站建设ppt模板国外注册机网站
  • 西部数码做跳转网站百度seo排名培训优化
  • 农业网站素材wordpress all in one
  • 学习网站建设有前景没wordpress 和dokuwiki
  • 服装网站开发方案网站设计美工排版编辑
  • 旅游网站首页模板下载广州市建设工程检测中心网站
  • 餐饮加盟网站建设wordpress 首行缩进
  • kkday是哪里做的网站橙云 php网站建设