杭州制作网站的公司,男男做暧暧视频网站,庆阳网站建设推广,做网站的网站源码文章目录 LimServerLimServersnakeyaml依赖使用配置类配置文件 私有协议解码MessageDecoderByteBufToMessageUtils 这个很全#xff1a;
IM即时通讯系统[SpringBootNetty]——梳理#xff08;总#xff09; IO线程模型
Redis 分布式客户端 Redisson 分布式锁快速入门
Lim… 文章目录 LimServerLimServersnakeyaml依赖使用配置类配置文件 私有协议解码MessageDecoderByteBufToMessageUtils 这个很全
IM即时通讯系统[SpringBootNetty]——梳理总 IO线程模型
Redis 分布式客户端 Redisson 分布式锁快速入门
LimServer
public class LimServer {private final static Logger logger LoggerFactory.getLogger(LimServer.class);BootstrapConfig.TcpConfig config;EventLoopGroup mainGroup;EventLoopGroup subGroup;ServerBootstrap server;public LimServer(BootstrapConfig.TcpConfig config) {this.config config;mainGroup new NioEventLoopGroup(config.getBossThreadSize());subGroup new NioEventLoopGroup(config.getWorkThreadSize());server new ServerBootstrap();server.group(mainGroup,subGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小.option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口.childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销但影响消息实时性.childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new MessageDecoder());ch.pipeline().addLast(new MessageEncoder());
// ch.pipeline().addLast(new IdleStateHandler(
// 0, 0,
// 10));ch.pipeline().addLast(new HeartBeatHandler(config.getHeartBeatTime()));ch.pipeline().addLast(new NettyServerHandler(config.getBrokerId(),config.getLogicUrl()));}});}public void start(){this.server.bind(this.config.getTcpPort());}}
LimServer
public class LimWebSocketServer {private final static Logger logger LoggerFactory.getLogger(LimWebSocketServer.class);BootstrapConfig.TcpConfig config;EventLoopGroup mainGroup;EventLoopGroup subGroup;ServerBootstrap server;public LimWebSocketServer(BootstrapConfig.TcpConfig config) {this.config config;mainGroup new NioEventLoopGroup();subGroup new NioEventLoopGroup();server new ServerBootstrap();server.group(mainGroup, subGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小.option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口.childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销但影响消息实时性.childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline ch.pipeline();// websocket 基于http协议所以要有http编解码器pipeline.addLast(http-codec, new HttpServerCodec());// 对写大数据流的支持pipeline.addLast(http-chunked, new ChunkedWriteHandler());// 几乎在netty中的编程都会使用到此hanlerpipeline. addLast(aggregator, new HttpObjectAggregator(65535));/*** websocket 服务器处理的协议用于指定给客户端连接访问的路由 : /ws* 本handler会帮你处理一些繁重的复杂的事* 会帮你处理握手动作 handshakingclose, ping, pong ping pong 心跳* 对于websocket来讲都是以frames进行传输的不同的数据类型对应的frames也不同*/pipeline.addLast(new WebSocketServerProtocolHandler(/ws));pipeline.addLast(new WebSocketMessageDecoder());pipeline.addLast(new WebSocketMessageEncoder());pipeline.addLast(new NettyServerHandler(config.getBrokerId(),config.getLogicUrl()));}});}public void start(){this.server.bind(this.config.getWebSocketPort());}
}
snakeyaml
依赖
!-- yaml --
dependencygroupIdorg.yaml/groupIdartifactIdsnakeyaml/artifactIdversion${snakeyaml.version}/version
/dependency使用 private static void start(String path){try {Yaml yaml new Yaml();InputStream inputStream new FileInputStream(path);BootstrapConfig bootstrapConfig yaml.loadAs(inputStream, BootstrapConfig.class);new LimServer(bootstrapConfig.getLim()).start();new LimWebSocketServer(bootstrapConfig.getLim()).start();RedisManager.init(bootstrapConfig);MqFactory.init(bootstrapConfig.getLim().getRabbitmq());MessageReciver.init(bootstrapConfig.getLim().getBrokerId());registerZK(bootstrapConfig);}catch (Exception e){e.printStackTrace();System.exit(500);}}配置类
Data
public class BootstrapConfig {private TcpConfig lim;Datapublic static class TcpConfig {private Integer tcpPort;// tcp 绑定的端口号private Integer webSocketPort; // webSocket 绑定的端口号private boolean enableWebSocket; //是否启用webSocketprivate Integer bossThreadSize; // boss线程 默认1private Integer workThreadSize; //work线程private Long heartBeatTime; //心跳超时时间 单位毫秒private Integer loginModel;/*** redis配置*/private RedisConfig redis;/*** rabbitmq配置*/private Rabbitmq rabbitmq;/*** zk配置*/private ZkConfig zkConfig;/*** brokerId*/private Integer brokerId;private String logicUrl;}Datapublic static class ZkConfig {/*** zk连接地址*/private String zkAddr;/*** zk连接超时时间*/private Integer zkConnectTimeOut;}DataBuilderNoArgsConstructorAllArgsConstructorpublic static class RedisConfig {/*** 单机模式single 哨兵模式sentinel 集群模式cluster*/private String mode;/*** 数据库*/private Integer database;/*** 密码*/private String password;/*** 超时时间*/private Integer timeout;/*** 最小空闲数*/private Integer poolMinIdle;/*** 连接超时时间(毫秒)*/private Integer poolConnTimeout;/*** 连接池大小*/private Integer poolSize;/*** redis单机配置*/private RedisSingle single;}/*** redis单机配置*/DataBuilderNoArgsConstructorAllArgsConstructorpublic static class RedisSingle {/*** 地址*/private String address;}/*** rabbitmq哨兵模式配置*/DataBuilderNoArgsConstructorAllArgsConstructorpublic static class Rabbitmq {private String host;private Integer port;private String virtualHost;private String userName;private String password;}}配置文件
lim:tcpPort: 9000webSocketPort: 19000bossThreadSize: 1workThreadSize: 8heartBeatTime: 20000 #心跳超时时间 单位毫秒brokerId: 1000loginModel: 3logicUrl: http://127.0.0.1:8000/v1# * 多端同步模式1 只允许一端在线手机/电脑/web 踢掉除了本clientimel的设备# * 2 允许手机/电脑的一台设备 web在线 踢掉除了本clientimel的非web端设备# * 3 允许手机和电脑单设备 web 同时在线 踢掉非本clientimel的同端设备# * 4 允许所有端多设备登录 不踢任何设备redis:mode: single # 单机模式single 哨兵模式sentinel 集群模式clusterdatabase: 0password:timeout: 3000 # 超时时间poolMinIdle: 8 #最小空闲数poolConnTimeout: 3000 # 连接超时时间(毫秒)poolSize: 10 # 连接池大小single: #redis单机配置address: 127.0.0.1:6379rabbitmq:host: 127.0.0.1port: 5672virtualHost: /userName: guestpassword: guestzkConfig:zkAddr: 127.0.0.1:2181zkConnectTimeOut: 5000
私有协议解码
MessageDecoder
public class MessageDecoder extends ByteToMessageDecoder {Overrideprotected void decode(ChannelHandlerContext ctx,ByteBuf in, ListObject out) throws Exception {//请求头指令// 版本// clientType// 消息解析类型// appId// imei长度// bodylen imei号 请求体if(in.readableBytes() 28){return;}Message message ByteBufToMessageUtils.transition(in);if(message null){return;}out.add(message);}
}ByteBufToMessageUtils
/*** author: Chackylee* description: 将ByteBuf转化为Message实体根据私有协议转换* 私有协议规则* 4位表示Command表示消息的开始* 4位表示version* 4位表示clientType* 4位表示messageType* 4位表示appId* 4位表示imei长度* imei* 4位表示数据长度* data* 后续将解码方式加到数据头根据不同的解码方式解码如pbjson现在用json字符串* version: 1.0*/
public class ByteBufToMessageUtils {public static Message transition(ByteBuf in){/** 获取command*/int command in.readInt();/** 获取version*/int version in.readInt();/** 获取clientType*/int clientType in.readInt();/** 获取clientType*/int messageType in.readInt();/** 获取appId*/int appId in.readInt();/** 获取imeiLength*/int imeiLength in.readInt();/** 获取bodyLen*/int bodyLen in.readInt();if(in.readableBytes() bodyLen imeiLength){in.resetReaderIndex();return null;}byte [] imeiData new byte[imeiLength];in.readBytes(imeiData);String imei new String(imeiData);byte [] bodyData new byte[bodyLen];in.readBytes(bodyData);MessageHeader messageHeader new MessageHeader();messageHeader.setAppId(appId);messageHeader.setClientType(clientType);messageHeader.setCommand(command);messageHeader.setLength(bodyLen);messageHeader.setVersion(version);messageHeader.setMessageType(messageType);messageHeader.setImei(imei);Message message new Message();message.setMessageHeader(messageHeader);if(messageType 0x0){String body new String(bodyData);JSONObject parse (JSONObject) JSONObject.parse(body);message.setMessagePack(parse);}in.markReaderIndex();return message;}}