青海省建设厅网站姚宽一,漯河做网站优化,外贸网站打开速度,wordpress固定连接404文章目录 前言一、POM依赖二、主函数代码示例三、FilterFunction实现总结 前言
SpEL表达式与Flink fiter结合可以实现基于表达式的灵活动态过滤。有关SpEL表达式的使用请参考Spring SpEL在Flink中的应用-SpEL详解。 可以将过滤规则放入数据库#xff0c;根据不同的数据设置不… 文章目录 前言一、POM依赖二、主函数代码示例三、FilterFunction实现总结 前言
SpEL表达式与Flink fiter结合可以实现基于表达式的灵活动态过滤。有关SpEL表达式的使用请参考Spring SpEL在Flink中的应用-SpEL详解。 可以将过滤规则放入数据库根据不同的数据设置不同的过滤表达式从而实现只需修改过滤表达式不用修改Flink代码的效果。 一、POM依赖
首先在 pom.xml 中加入依赖
dependencygroupIdorg.springframework/groupIdartifactIdspring-expression/artifactIdversion5.2.0.RELEASE/version
/dependency二、主函数代码示例 import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;import java.text.SimpleDateFormat;public class FlinkSpelFilterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Row rowRow.of(张三,001,getTimestamp(2016-10-24 21:59:06),23);Row row2Row.of(张三,002,getTimestamp(2016-10-24 21:50:06),33);Row row3Row.of(张三,003,getTimestamp(2016-10-24 21:51:06),43);Row row4Row.of(李四,004,getTimestamp(2016-10-24 21:50:56),13);Row row5Row.of(李四,005,getTimestamp(2016-10-24 00:48:36),53);Row row6Row.of(李四,006,getTimestamp(2016-10-24 00:48:36),34);Row row7Row.of(李四,007,getTimestamp(2016-10-24 00:48:36),23);Row row8Row.of(李四,008,getTimestamp(2016-10-24 00:48:36),26);Row row9Row.of(李四,009,getTimestamp(2016-10-24 00:48:36),63);DataStreamSourceRow source env.fromElements(row,row2,row3,row4,row5,row6,row7,row8,row9);//spel表达式实现日期的比较过滤String spelcompareDate(#row.getField(2), \2016-10-24 00:48:36\)0;//实现对数字的过滤
// spel#row.getField(3)33;SingleOutputStreamOperatorRow filterStream source.filter(new FilterSpelFunction(spel));filterStream.print();env.execute();}private static java.sql.Timestamp getTimestamp(String str) throws Exception {
// String string 2016-10-24 21:59:06;SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);java.util.Date datesdf.parse(str);java.sql.Timestamp s new java.sql.Timestamp(date.getTime());return s;}三、FilterFunction实现 import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import spel.demo.util.SpelMethodUtil;/*** 基于spel 表达式的过滤*/
public class FilterSpelFunction extends RichFilterFunctionRow {private static final Logger logger LoggerFactory.getLogger(FilterSpelFunction.class);private transient Expression exp;private String filterExpr;public FilterSpelFunction(String filterSpel) {filterExprfilterSpel;logger.info(filterExpr:{},filterExpr);}Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);SpelExpressionParser parser new SpelExpressionParser();exp parser.parseExpression(filterExpr);}Overridepublic boolean filter(Row row) throws Exception {try {//注册自定义函数类StandardEvaluationContext conetxt new StandardEvaluationContext(new SpelMethodUtil());//设置变量conetxt.setVariable(row,row);Boolean value exp.getValue(conetxt, Boolean.class);if (value null) {logger.error(表达式结果为null);throw new Exception(表达式结果为null);}return value;}catch (Exception e){logger.error(filter 异常, e);throw e;}}
}
自定义函数类 import org.apache.commons.lang3.StringUtils;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;public class SpelMethodUtil {public static final String TIMESTAMP_FORMAT yyyy-MM-dd HH:mm:ss;public static final String DATE_FORMAT yyyy-MM-dd;public static final String TIME_FORMAT HH:mm:ss;public static Integer compareDate(Date date, String strDate){Integer result;if(datenull StringUtils.isBlank(strDate)){return 0;}else{if(datenull || StringUtils.isBlank(strDate)){return -2;}}String trimDatestrDate.trim();String format findFormat(trimDate);Date date2 stringToDate(trimDate, format);resultdate.compareTo(date2);return result;}public static Integer compareDate(Date first, Date second){if(firstnull secondnull){return 0;}else{if(firstnull || secondnull){return -2;}}return first.compareTo(second);}public static Date stringToDate(String dateStr,String format){SimpleDateFormat sdf new SimpleDateFormat(format);Date datenull;try {date sdf.parse(dateStr);} catch (ParseException e) {e.printStackTrace();}return date;}/*** 查找与输入的字符型日期相匹配的format* param strDate* return*/public static String findFormat(String strDate){String resultnull;String trimDatestrDate.trim();int lentrimDate.length();String dateRegex ;if(lenTIMESTAMP_FORMAT.length()){dateRegex ^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$;if(trimDate.matches(dateRegex)){resultTIMESTAMP_FORMAT;}}else if(lenDATE_FORMAT.length()){dateRegex ^\\d{4}-\\d{2}-\\d{2}$;if(trimDate.matches(dateRegex)){resultDATE_FORMAT;}}else if(lenTIME_FORMAT.length()){dateRegex ^\\d{2}:\\d{2}:\\d{2}$;if(trimDate.matches(dateRegex)){resultTIME_FORMAT;}}else{throw new RuntimeException(不可识别的日期格式strDate);}return result;}public static Integer addAge(Integer age){return age4;}
} 总结
以上只是简单的示例在实际应用中可以将过滤表达式放到数据库将过滤规则放入缓存定时刷新。大家可以根据实际需求进行扩展。