基于html5个人网站设计论文,网站建设网页制作,淘宝联盟链接的网站怎么做,网站建设专业简介不同环境Flink配置信息是不同的#xff0c;为了区分不同环境的配置文件#xff0c;使用ParameterTool工具读取带有环境的配置文件信息 区分环境的配置文件 三个配置文件#xff1a;
flink.properties#xff1a;决定那个配置文件生效
flink-dev.properties#xff1a;测… 不同环境Flink配置信息是不同的为了区分不同环境的配置文件使用ParameterTool工具读取带有环境的配置文件信息 区分环境的配置文件 三个配置文件
flink.properties决定那个配置文件生效
flink-dev.properties测试环境配置文件
flink-prod.properties生产环境配置文件
flink.properties配置文件中只配置一项flink.env.activedev读取该配置项然后组装出生效的配置文件名
工具类实现
public class ParameterUtil {/*** 默认配置文件 flink.properties*/private static final String DEFAULT_CONFIG ParameterConstants.FLINK_ROOT_FILE;/*** 带环境配置文件 flink-%s.properties*/private static final String FLINK_ENV_FILE ParameterConstants.FLINK_ENV_FILE;/*** 环境变量 flink.env.active*/private static final String ENV_ACTIVE ParameterConstants.FLINK_ENV_ACTIVE;/*** 配置文件启动参数系统环境变量 生成ParameterTool*/public static ParameterTool getParameters(final String[] args) {/* *********************** Java读取资源的方式** a. Class.getResourceAsStream(Path): Path 必须以 “/”表示从ClassPath的根路径读取资源* b. Class.getClassLoader().getResourceAsStream(Path)Path 无须以 “/”, 默认从ClassPath的根路径读取资源** 推荐使用第2种,也就是类加载器的方式获取静态资源文件, 不要通过ClassPath的相对路径查找* *********************/InputStream inputStream ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);try {// 读取根配置文件ParameterTool defaultPropertiesFile ParameterTool.fromPropertiesFile(inputStream);// 获取环境参数String envActive getEnvActiveValue(defaultPropertiesFile);// 读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)return ParameterTool// ParameterTool读取变量优先级 系统环境变量 启动参数变量 配置文件变量// 从配置文件获取配置.fromPropertiesFile(//当前线程Thread.currentThread()//返回该线程的上下文信息, 获取类加载器.getContextClassLoader().getResourceAsStream(envActive))// 从启动参数中获取配置.mergeWith(ParameterTool.fromArgs(args))// 从系统环境变量获取配置.mergeWith(ParameterTool.fromSystemProperties());} catch (IOException e) {throw new RuntimeException();}}/*** 配置文件系统环境变量 生成ParameterTool*/public static ParameterTool getParameters() {InputStream inputStream ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);/* ************************ 注意** ParameterTool 读取配置文件需要抛出 IOException,* IOException 的捕捉就在这里 catch** 以前代码是直接抛出,没有进行catch,要注意对以前代码的修改** *********************/try {ParameterTool defaultPropertiesFile ParameterTool.fromPropertiesFile(inputStream);//获取环境参数String envActive getEnvActiveValue(defaultPropertiesFile);//读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)return ParameterTool// ParameterTool读取变量优先级 系统环境变量启动参数变量配置文件变量// 从配置文件获取配置.fromPropertiesFile(//当前线程Thread.currentThread()//返回该线程的上下文信息, 获取类加载器.getContextClassLoader().getResourceAsStream(envActive))// 从系统环境变量获取配置.mergeWith(ParameterTool.fromSystemProperties());} catch (Exception e) {throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);}}/*** 获取环境配置变量*/private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {// 选择参数环境String envActive null;if (defaultPropertiesFile.has(ENV_ACTIVE)) {envActive String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));}return envActive;}/*** 从配置文件参数配置流式计算的上下文环境*/public static void envWithConfig(StreamExecutionEnvironment env,ParameterTool parameterTool) {/* ************************ checkpoint 设置** 1.* 若checkpoint 时间不要设置太短,* 这里的时间包括了超时时间** 2.* 设置了周期性checkpoint,* 若上一个周期的checkpoint没完成,* 下一个周期的checkpoint不会开始的.** 3.* 若checkpoint的持续时间超过了超时时间,* 会出现排队,* 过多的checkpoint排队会耗费资源** 4.* 为了解决checkpoint排队堆积,* 需要优化checkpoint的完成效率** *********************/// 每60秒触发checkpointenv.enableCheckpointing(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_INTERVAL));CheckpointConfig ck env.getCheckpointConfig();// checkpoint 必须在60秒内结束否则被丢弃ck.setCheckpointTimeout(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_TIMEOUT));// checkpoint间最小间隔 30秒 (指定了这个值, setMaxConcurrentCheckpoints自动默认为1)ck.setMinPauseBetweenCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MINPAUSE));// checkpoint 语义设置为 精确一致( EXACTLY_ONCE )ck.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 最多允许 checkpoint 失败 3 次ck.setTolerableCheckpointFailureNumber(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_FAILURENUMBER));// 同一时间只允许一个 checkpoint 进行ck.setMaxConcurrentCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MAXCONCURRENT));// 设置 State 存储env.setStateBackend(new HashMapStateBackend());// 并行度设置env.setParallelism(parameterTool.getInt(ParameterConstants.FLINK_PARALLELISM));}}读取环境信息
该方法会读取 flink.properties 配置的生效的配置文件组装成要读取的配置文件 // flink.properties InputStream inputStream ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);ParameterTool defaultPropertiesFile ParameterTool.fromPropertiesFile(inputStream); private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {// 选择参数环境String envActive null;// 配置文件中是否有该属性 flink.env.activeif (defaultPropertiesFile.has(ENV_ACTIVE)) {// 有的话直接拼装 flink-%s.properties - flink-dev.propertiesenvActive String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));}return envActive;}ParameterTool 获取参数的3种方式 fromPropertiesFile 配置文件 fromArgs 程序启动参数 - 或者 -- 开头 空格分隔 如-name likelong --age 21 fromSystemProperties 系统环境变量 包括程序 -D启动的变量 内部调用的是 Java提供的 System.getProperties()
ParameterTool 获取参数优先级, 可通过 mergeWith() 设置优先级, 但 mergeWith() 会覆盖前面的同名变量
因此上述ParameterTool读取变量优先级 系统环境变量 启动参数变量 配置文件变量
ParameterTool 注册 global 变量 ParameterTool 注册为 global 变量env.getConfig().setGlobalJobParameter() 这样, 在上下文中就能获取 ParameterTool (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters() 【该方法可以在富函数生命周期方法中调用】 如下 private static void initEnv(String[] args) {// ParameterTool 注册为 globalparameterTool ParameterUtil.getParameters();env.getConfig().setGlobalJobParameters(parameterTool);// 配置上下文环境ParameterUtil.envWithConfig(env, parameterTool);}