当前位置: 首页 > news >正文

专业网站建设哪家好代理注册公司要多少钱

专业网站建设哪家好,代理注册公司要多少钱,工商公示系统查询入口,开网页cpu使用率高使用ParameterTool读取配置文件 Flink读取参数的对象 Commons-cli#xff1a; Apache提供的#xff0c;需要引入依赖ParameterTool#xff1a;Flink内置 ParameterTool 比 Commons-cli 使用上简便#xff1b; ParameterTool能避免Jar包的依赖冲突 建议使用第二种 使用Par…使用ParameterTool读取配置文件 Flink读取参数的对象 Commons-cli Apache提供的需要引入依赖ParameterToolFlink内置 ParameterTool 比 Commons-cli 使用上简便 ParameterTool能避免Jar包的依赖冲突 建议使用第二种 使用ParameterTool对象可以直接获取配置文件中的信息需要如下依赖 !-- Flink基础依赖 【ParameterTool类 在该依赖中】 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactId/dependency!-- Flink流批处理依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_${scala.binary.version}/artifactId/dependencyJava读取资源的方式 Class.getResourceAsStream(Path)Path 必须以 “/”表示从ClassPath的根路径读取资源Class.getClassLoader().getResourceAsStream(Path)Path 无须以 “/”默认从ClassPath的根路径读取资源 推荐使用第2种以类加载器的方式获取静态资源文件不要通过ClassPath的相对路径查找 最基本的工具类 public class ParameterUtil {// 创建 ParameterTool 对象public static ParameterTool getParameters() {// 读取 resources 文件夹下 flink.properties 文件InputStream inputStream ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);try {return ParameterTool.fromPropertiesFile(inputStream);} catch (Exception e) {throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);}} }可以通过 ParameterUtil.getParameters().get(redis.port) 直接读取key对应的value值 Flink写入Redis方式 继承RichSinkFunction (Flink-Stream)使用第3方的包 (Apache-Bahir-Flink) Apache-Bahir-Flink 的 Redis-Connector的缺点 使用Jedis, 没有使用Lettuce没有对 Flink Table/SQL Api 的支持 不少基于bahir二开的例子解决了上述问题 gitee地址https://gitee.com/jeff-zou/flink-connector-redis?_fromgitee_search github地址https://github.com/apache/bahir-flink bahir 集成了许多连接器其中就包含Redis Flink官网上也可以看到bahir的影子 方便起见接下来就基于bahirFlink写入Redis集群 基于巴希尔(Bahir)-Flink写入Redis集群 引入connector连接器依赖 !-- Flink-Connector-Redis --dependencygroupIdorg.apache.bahir/groupIdartifactIdflink-connector-redis_${scala.binary.version}/artifactId/dependency依赖版本定义在父模块中 实现RedisMapper接口自定义Sink 首先实现RedisMapper接口并指定泛型——处理元素的类型 /*** 基于apache bachir flink的RedisSink作用于Redis String数据类型*/ public class RedisSinkByBahirWithString implements RedisMapperTuple2String, String {/*** 指定Redis的命令*/Overridepublic RedisCommandDescription getCommandDescription() {/* ************************ 如果Redis的数据类型是 hash 或 z-Set* RedisCommandDescription 的构造方法必须传入 additionalKey* additionalKey就是Redis的键** *********************/return new RedisCommandDescription(RedisCommand.SET);}/*** 从数据流里获取Key值*/Overridepublic String getKeyFromData(Tuple2String, String input) {return input.f0;}/*** 从数据流里获取Value值*/Overridepublic String getValueFromData(Tuple2String, String input) {return input.f1;} }写入Redis工具类 public class RedisWriteUtil {/* ************************ FlinkJedisClusterConfig集群模式* FlinkJedisPoolConfig单机模式* FlinkJedisSentinelConfig哨兵模式** *********************/// Jedis配置private static final FlinkJedisClusterConfig JEDIS_CONF;static {ParameterTool parameterTool ParameterUtil.getParameters();String host parameterTool.get(redis.host);String port parameterTool.get(redis.port);/* ************************ InetSocketAddress 是Java的套接字** *********************/InetSocketAddress inetSocketAddress new InetSocketAddress(host, Integer.parseInt(port));SetInetSocketAddress set new HashSet();set.add(inetSocketAddress);JEDIS_CONF new FlinkJedisClusterConfig.Builder().setNodes(set).build();}/*** 基于Bahir写入RedisRedis的数据是String类型*/public static void writeByBahirWithString(DataStreamTuple2String, String input) {input.addSink(new RedisSink(JEDIS_CONF, new RedisSinkByBahirWithString()));}}测试一下 class RedisWriteUtilTest {DisplayName(测试基于Bahir写入Redis,Redis数据类型是String类型)Testvoid writeByBahirWithString() throws Exception {LocalStreamEnvironment env StreamExecutionEnvironment.createLocalEnvironment();DataStreamSourceTuple2String, String dataStream env.fromElements(Tuple2.of(k, v));RedisWriteUtil.writeByBahirWithString(dataStream);env.execute();} }非常完美写入成功 Flink读取Redis方式 继承RichSourceFunction (实现自定义Source)继承RichParallelSourceFunction (实现自定义Source)【可以指定并行度】实现SourceFunction接口 (实现自定义Source) RichParallelSourceFunction 和 RichSourceFunction区别 RichParallelSourceFunction 可以设置并行度 RichParallelSourceFunction 和 RichSourceFunction 代码是可以互相套用 RichParallelSourceFunction 默认的并行度是cpu 的 核心数(core数) RichSourceFunction 的并行度只能是1 继承RichSourceFunction类-Flink读取Redis集群 前置准备 定义枚举类 Redis数据类型枚举类 Getter public enum RedisDataType {STRING,HASH,LIST,SET,SORTED_SET,;RedisDataType() {} }定义Redis命令的枚举类便于Source判断操作 Getter public enum RedisCommand {// get stringGET(RedisDataType.STRING);private final RedisDataType redisDataType;RedisCommand(RedisDataType redisDataType) {this.redisDataType redisDataType;} }Jedis配置类 bahir依赖中自带jedis依赖一般不用自行引入jedisjedis依赖版本要与巴希尔中jedis版本保持一致 public class JedisConf {public static JedisCluster getJedisCluster() throws IOException {ParameterTool parameterTool ParameterUtil.getParameters();String host parameterTool.get(redis.host);String port parameterTool.get(redis.port);/* *********************** Jedis对象** JedisPool : 用于redis单机版* JedisCluster: 用于redis集群** JedisCluster对象能够自动发现正常的redis节点** *********************/HostAndPort hostAndPort new HostAndPort(host,Integer.parseInt(port));SetHostAndPort nodes new HashSet();nodes.add(hostAndPort);return new JedisCluster(nodes);} }封装Jedis对象的redis方法 封装Jedis对象的redis方法方便统一调用和维护 public class JedisBuilder {private JedisCluster jedis null;public JedisBuilder(JedisCluster jedisCluster) {this.jedis jedisCluster;}public void close() {if (this.jedis ! null) {this.jedis.close();}}/*** Redis的Get方法*/public String get(String key) {return jedis.get(key);} }自定义Source Redis数据的映射对象 Data AllArgsConstructor NoArgsConstructor public class RedisPO implements Serializable {private String data;}Flink 自定义Redis Source读取Redis /* *********************** 【富函数类】 比函数类提供了更多函数生命周期提供了获取上下文的方法* 富函数类通常是抽象类* *********************/ public class RedisSource extends RichSourceFunctionRedisPO {/*** Jedis对象*/private JedisBuilder jedisBuilder;/*** Redis命令枚举对象*/private final RedisCommand redisCommand;/*** redis key*/private final String key;public RedisSource(RedisCommand redisCommand, String key) {this.redisCommand redisCommand;this.key key;}/*** volatile 修饰的变量它的更新都会通知其他线程.*/private volatile boolean isRunning true;/*** Redis的连接初始化*/Overridepublic void open(Configuration parameters) throws Exception {JedisCluster jedisCluster JedisConf.getJedisCluster();jedisBuilder new JedisBuilder(jedisCluster);}/*** Redis数据的读取*/Overridepublic void run(SourceContextRedisPO output) throws Exception {/* ************************ 一直监听Redis数据的读取** *********************/String data null;// while (isRunning) {switch (redisCommand.getRedisDataType()) {case STRING:data jedisBuilder.get(key);}output.collect(new RedisPO(data));// }}Overridepublic void cancel() {this.isRunning false;}}读取Redis工具类 public class RedisReadUtil {public static DataStreamRedisPO read(StreamExecutionEnvironment env,RedisCommand redisCommand,String key) {return env.addSource(new RedisSource(redisCommand, key));} }测试一下 class RedisReadUtilTest {DisplayName(测试自定义Source读取Redis,Redis数据类型是String类型)Testvoid testReadByCustomSourceWithString() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamRedisPO dataStream RedisReadUtil.read(env,RedisCommand.GET,k);dataStream.print();env.execute();} }测试成功
http://www.zqtcl.cn/news/835293/

相关文章:

  • 企业网站优化报价自己做个网站怎么赚钱
  • 做ui的网站有哪些网站建设订单模板
  • 重庆企业网站优化wordpress 接收询盘
  • 小米4路由器可以做网站嘛杭州淘宝代运营公司十大排名
  • 枞阳做网站的百度搜索入口
  • 网站建设提议徐州网站建设方案咨询
  • 昆明高端网站建设专门做游轮的网站
  • 教育培训网站抄袭网站是广西住房和城乡建设厅
  • 广州做网站建设的公司哪家好网站建设运营公司
  • 网站集约化建设 技术国内永久在线免费建站
  • 极简资讯网站开发有什么免费推广项目的好软件
  • 网站有哪几种类型如何让百度分享按钮在网站每个页面都有
  • 北京市昌平建设工程招标网站网站建设要学哪些软件有哪些方面
  • 部队内网网站建设方案诱导视频网站怎么做
  • 安徽省美好乡村建设网站郑州网站建设 论坛
  • 手机网站怎么建设软件外包公司绩效考核内容
  • 北京最大的火车站网站免费推广方式
  • 外贸网站建设系统工程公司名称大全
  • 手机视频网站建站吴江区经济开发区建设工程网站
  • 网站建设存在哪些问题学校网站手机站的建设
  • 婚恋网站设计手机免费制作网站模板
  • 北京网站建设与维护公司网络组建方案设计
  • 自己做网站好还是凡科樱花动漫做网站
  • 自己做外贸开通什么网站wordpress万能主题
  • 网站建设中添加图片链接cad线下培训班
  • 网站建站系统程序长宁区网站建设网站制
  • 合肥网站建设合肥做网站wordpress 关于页面
  • 软件开发公司赚钱吗北京网站优化解决方案
  • 泰安的网站建设公司哪家好国外ps网站
  • 网站建设制作方案做字典网站开发