网站小图片素材,商城平台推广方案,菜单栏颜色wordpress,河北省建设厅5.Neety入门
什么是Netty Netty是一个基于Java NIO的异步事件驱动的网络应用框架。它被广泛用于开发高性能、高可靠性的网络通信程序#xff0c;特别是服务器端和客户端程序。Netty提供了简洁而强大的API#xff0c;使得开发者能够轻松地构建各种网络应用#xff0c;包括实…5.Neety入门
什么是Netty Netty是一个基于Java NIO的异步事件驱动的网络应用框架。它被广泛用于开发高性能、高可靠性的网络通信程序特别是服务器端和客户端程序。Netty提供了简洁而强大的API使得开发者能够轻松地构建各种网络应用包括实时通信系统、游戏服务器、分布式系统等。 Netty的主要特点包括 异步和事件驱动Netty基于事件驱动模型使用异步的方式处理网络IO操作可以处理大量的并发连接而不需要大量的线程。高性能Netty采用了高效的NIO模型以及优化的数据结构和算法能够实现出色的性能表现。可定制性Netty提供了丰富的组件和可定制的API使得开发者可以根据自己的需求对网络通信进行灵活的定制和扩展。多协议支持Netty支持多种常见的网络协议包括HTTP、WebSocket、TCP、UDP等使得开发者可以轻松地实现各种类型的网络应用。 Netty的地位
以下是一些使用Netty框架的知名项目和框架
Apache KafkaKafka是一个分布式流处理平台它使用Netty来处理网络通信实现高效的消息传输。ElasticsearchElasticsearch是一个分布式搜索引擎它使用Netty作为其节点之间通信的底层框架。gRPCgRPC是一个高性能的远程过程调用框架它使用Netty来实现底层的网络通信。Apache DubboDubbo是一个高性能的分布式服务框架它使用Netty作为其网络通信的实现方式。RocketMQRocketMQ是一个开源的分布式消息队列它使用Netty来实现消息的传输和通信。Spring 5Spring 5 引入了对反应式编程的支持使得开发人员可以更轻松地构建异步和非阻塞的应用程序而 Netty 作为其底层的网络通信实现之一。Flux API完全抛弃Tomcat 使用Netty作为服务端ZookeeperZookeeper 是一个开源的分布式协调服务用于管理和协调分布式系统中的各种信息比如配置管理、命名服务、分布式锁等。
Netty的优势
Netty相对于Java NIO具有以下优势
简化的编程模型Netty提供了更高级别的抽象使得开发者可以更轻松地编写网络应用程序。Netty隐藏了许多底层细节提供了易于理解和使用的API使得开发者能够更专注于业务逻辑的实现。丰富的功能和组件Netty提供了许多预置的编解码器、处理器和插件涵盖了各种常见的网络通信场景例如HTTP、WebSocket、UDP等。开发者可以直接使用这些组件而无需重复实现相同的功能大大加速了开发过程。高性能和可靠性Netty在底层实现了高效的事件驱动模型和异步IO机制能够实现更高的吞吐量和更低的延迟。开发者可以借助Netty的优秀性能构建高性能、高可靠性的网络应用程序。可扩展性和灵活性Netty的组件化设计使得它具有很好的可扩展性和灵活性。开发者可以根据自己的需求选择合适的组件和插件或者自行扩展和定制特定的功能从而更好地满足业务需求。文档和社区支持Netty拥有丰富的文档和活跃的社区支持提供了大量的教程、示例代码和问题解答帮助开发者快速上手并解决问题。开发者可以通过文档和社区获取到丰富的资源和支持加速开发过程。
5.1.Hello World
5.1.1.目标 开发一个简单的服务器端 和 客户端 客户端向服务器端发送 Hello,World服务器仅接收不返回 implementation io.netty:netty-codec-http:4.1.106.Final5.1.2.服务器端的代码实现
public class HelloServer {private static final Logger logger LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// ServerBootstrap 是一个启动 NIO 服务的辅助启动类// 1.负责组装Netty组装组件启动服务器new ServerBootstrap()// 2。配置两个NioEventLoopGroup一个用于接收客户端连接另一个用于处理客户端读写// NioEventLoopGroup 是一个处理I/O操作的多线程事件循环 NioEventLoopGroup(selector,thread)(包含 选择器 和 线程 ).group(new NioEventLoopGroup())// 3.Netty 支持多种协议这里是 NIO 的实现 还有OIO BIO ,EPOLL 等.channel(NioServerSocketChannel.class)// 4.配置ServerSocketChannel的选项 将来处理褚时间的一个分工决定了worker(child)能执行哪些操作(handler).childHandler(// 5.channel代表和客户进行数据读写的通道 Initializer代表初始化器 负责添加别的handlernew ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 6.添加具体的handler// 6.1.添加StringDecoder解码器 数据传输过来的时候会先经过这个解码器数据传输的时候都是ByteBuf 解码成Stringch.pipeline().addLast(new StringDecoder());// 6.2.添加ChannelInboundHandlerAdapter处理器(自定义的业务处理器)ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 读时间处理器logger.info(server receive msg: {}, msg);}});}// 7.绑定的监听端口}).bind(8080);}
}
5.1.3.客户端代码实现
public class HelloClient {private static final Logger logger LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 1.创建启动器try {new Bootstrap()// 2.指定线程模型 一个用于接收客户端连接另一个用于处理客户端读写.group(new NioEventLoopGroup())// 3.选择客户端的Channel的实现.channel(NioSocketChannel.class)// 4.添加处理器.handler(new ChannelInitializerNioSocketChannel() {// 5.初始化处理器Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 6.添加具体的handler 客户端是需要一个编码器ch.pipeline().addLast(new StringEncoder());}})// 7.连接到服务器.connect(new InetSocketAddress(localhost, 8080)).sync().channel()// 8.向服务器发送数据.writeAndFlush(hello, world);} catch (InterruptedException e) {throw new RuntimeException(e);}}
} 5.2.组件
5.2.1.EventLoop 事件循环对象 EventLoop本质是一个单线程执行器同时维护了一个Selector里面 run方法处理Channel上远远不断的事件 继承关系 一条线是继承ScheduledExecutorService 因此包含了线程池中所有方法 public interface EventLoop extends EventLoopGroup public interface EventLoopGroup extends EventExecutorGroup public interface EventExecutorGroup extends ScheduledExecutorService, IterableEventExecutor另一条线继承了netty自己的OrderedEventExecutor public interface EventLoop extends OrderedEventExecutorpublic interface OrderedEventExecutor extends EventExecutor public interface EventExecutor extends EventExecutorGroup public interface EventExecutorGroup extends ScheduledExecutorService, IterableEventExecutor boolean inEventLoop(Thread var1); 用于判断一个线程是否属于EventLoopEventExecutorGroup parent(); 用于判断自己属于哪一个EventLoopGroup 事件循环组 EventLoopGroup是一组EventLoopChannel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop后续这个Channel上的IO事件都由此EventLoop来处理保证了io事件处理时的线程安全
继承Netty自己的EventLoopGrouy 实现了Iterable接口提供遍历EventLoop的能力next()方法 用于获取集合中的下一个EventLoop
5.2.1.1. 普通-定时任务
/*** author 13723* version 1.0* 2024/2/24 19:34*/
public class TestEventLoop {private static final Logger logger LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 1.创建事件循环粗// NioEventLoopGroup 比较全面因为可以向他提交 IO事件 普通任务 定时任务// DefaultEventLoop 主要处理普通任务 和 定时任务// 如果不传线程 那么会自动分配// private static final int DEFAULT_EVENT_LOOP_THREADS // Math.max(1,
// SystemPropertyUtil.getInt(io.netty.eventLoopThreads, NettyRuntime.availableProcessors() * 2)
// ); 1 和你当前 cpu核心数 * 2进行比较 选最大哪个// protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {// super(nThreads 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);// }EventLoopGroup group new NioEventLoopGroup(2);// 2,获取下一个事件的循环对象(本身就设置了两个)logger.error(获取下一个事件的循环对象:{}, group.next());logger.error(获取下一个事件的循环对象:{}, group.next());logger.error(获取下一个事件的循环对象:{}, group.next());// 3.执行普通任务group.next().submit(()-{// 会将任务提交事件组中的某一个事件对象去执行// 类似于线程池的submit只不过这个是提交到事件循环组中logger.error(普通任务);},t1);// 4.执行定时任务 让你延迟执行一度任务 或者 周期性执行任务// 参数1.任务 2.延迟时间 3.间隔时间 4.时间单位// 每隔1s打印一次 定时任务group.next().scheduleAtFixedRate(()-{logger.error(定时任务);},0,1, TimeUnit.SECONDS);}}5.2.1.2. IO任务 依次轮流分配因为分配给EnventLoopGroup是两个线程 所以 每次Channel进来的时候会依次和 对应的EventLoop进行绑定这样下次就是再执行对应的Channel是 还是由绑定的EventLoop去执行 5.2.1.2.1.服务端
/*** author 13723* version 1.0* 2024/2/24 18:23*/
public class EventLoopClient {private static final Logger logger LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 1.创建启动器try {Channel channel new Bootstrap()// netty职责进行划分 boos 和 worker// .group(new NioEventLoopGroup())// 第一个参数就是Boos 只负责处理ServerSocketChannel Accept事件// 第二个参数是worker 负责处理读SocketChannel 读写事件.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))// 3.选择客户端的Channel的实现.channel(NioSocketChannel.class)// 4.添加处理器.handler(new ChannelInitializerNioSocketChannel() {// 5.初始化处理器Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 6.添加具体的handler 客户端是需要一个编码器ch.pipeline().addLast(new StringEncoder());}})// 7.连接到服务器.connect(new InetSocketAddress(localhost, 8080)).sync() // 阻塞方法 知道连接建立.channel();// 代表客户端和服务端的连接// TODO 在这里通过DEBUG 模式 发送多次数据System.out.println(发送数据 结束);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
5.2.1.2.2.客户端
/*** author 13723* version 1.0* 2024/2/24 18:23*/
public class EventLoopClient {private static final Logger logger LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 1.创建启动器try {Channel channel new Bootstrap()// 2.指定线程模型 一个用于接收客户端连接另一个用于处理客户端读写.group(new NioEventLoopGroup())// 3.选择客户端的Channel的实现.channel(NioSocketChannel.class)// 4.添加处理器.handler(new ChannelInitializerNioSocketChannel() {// 5.初始化处理器Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 6.添加具体的handler 客户端是需要一个编码器ch.pipeline().addLast(new StringEncoder());}})// 7.连接到服务器.connect(new InetSocketAddress(localhost, 8080)).sync() // 阻塞方法 知道连接建立.channel();// 代表客户端和服务端的连接// TODO 在这里通过DEBUG 模式 发送多次数据System.out.println(发送数据 结束);} catch (InterruptedException e) {throw new RuntimeException(e);}}
} 上面是没有进行职责划分时场景下面对职责进行划分了
// netty职责进行划分 boos 和 worker
// .group(new NioEventLoopGroup())
// 第一个参数就是Boos 只负责处理ServerSocketChannel Accept事件
// 第二个参数是worker 负责处理读SocketChannel 读写事件
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))如果单个handler执行时间过长 可以再单独划分出来一个EventLoopGoop去处理这样就不会影响其他IO线程
public class EventLoopServer {private static final Logger logger LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 细分2创建一个独立的EventLoopGroup// 1.创建一个独立的EventLoopGroup 用于处理耗时较长的handlerEventLoopGroup group new DefaultEventLoop(2);new ServerBootstrap()// 细分1// netty职责进行划分 boos 和 worker// .group(new NioEventLoopGroup())// 第一个参数就是Boos 只负责处理ServerSocketChannel Accept事件// 第二个参数是worker 负责处理读SocketChannel 读写事件 将来的worker线程有两个.group(new NioEventLoopGroup(),new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerNioSocketChannel(){Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(handler1,new ChannelInboundHandlerAdapter(){/*** 读事件* param ctx 上下文* param msg 读到的消息 ByteBuf事件* throws Exception 异常*/Overridepublic void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf (ByteBuf) msg;String message buf.toString(Charset.defaultCharset());logger.error(接收到消息:{},message);// 处理完毕之后 交给交给handler2进行处理// 将消息传递给下一个handlerctx.fireChannelRead(msg);}// TODO 注意 这里处理较长时间的handler 会导致boos线程组的线程被阻塞 从而影响新的连接的接入 所以下面分配了,一个独立的EventLoopGroup 用于处理耗时较长的handler}).addLast(group,handler2,new ChannelInboundHandlerAdapter(){/*** 读事件* param ctx 上下文* param msg 读到的消息 ByteBuf事件* throws Exception 异常*/Overridepublic void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf (ByteBuf) msg;String message buf.toString(Charset.defaultCharset());logger.error(接收到消息:{},message);}});}}).bind(8080);}}