企业网站策划案怎么写,资源网站哪个好,商城小程序价格,北京百度搜索排名优化作者#xff1a;潘伟龙#xff08;豁朗#xff09;
背景
日志服务 SLS 是云原生观测与分析平台#xff0c;为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务#xff0c;基于日志服务的便捷的数据接入能力#xff0c;可以将系统日志、业务日志等接入 …作者潘伟龙豁朗
背景
日志服务 SLS 是云原生观测与分析平台为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务基于日志服务的便捷的数据接入能力可以将系统日志、业务日志等接入 SLS 进行存储、分析阿里云 Flink 是阿里云基于 Apache Flink 构建的大数据分析平台在实时数据分析、风控检测等场景应用广泛。阿里云 Flink 原生支持阿里云日志服务 SLS 的 Connector可以在阿里云 Flink 平台将 SLS 作为源表或者结果表使用。
在阿里云 Flink 配置 SLS 作为源表时默认会消费 SLS 的 Logstore 数据进行动态表的构建在消费的过程中可以指定起始时间点消费的数据也是指定时间点以后的全量数据在特定场景中往往只需要对某类特征的日志或者日志的某些字段进行分析处理此类需求可以通过 Flink SQL 的 WHERE 和 SELECT 完成这样做有两个问题
1Connector 从源头拉取了过多不必要的数据行或者数据列造成了网络的开销
2这些不必要的数据需要在 Flink 中进行过滤投影计算这些清洗工作并不是数据分析的关注的重点造成了计算的浪费。
对于这种场景有没有更好的办法呢
答案是肯定的SLS 推出了 SPL 语言 可以高效的对日志数据的清洗加工。 这种能力也集成在了日志消费场景包括阿里云 Flink 中 SLS Connector通过配置 SLS SPL 即可实现对数据的清洗规则在减少网络传输的数据量的同时也可以减少 Flink 端计算消耗。
接下来对 SPL 及 SPL 在阿里云 Flink SLS Connector 中应用进行介绍及举例。
SLS SPL 介绍 SLS SPL 是日志服务推出的一款针对弱结构化的高性能日志处理语言可以同时在 Logtail 端、查询扫描、流式消费场景使用具有交互式、探索式、使用简洁等特点。
SPL 基本语法如下
data-source
| spl-cmd -optionoption -option ... expression, ... as output, ...
| spl-cmd ...
| spl-cmd ...spl-cmd 是 SPL 指令支持行过滤、列扩展、列裁剪、正则取值、字段投影、数值计算、JSON、CSV 等半结构化数据处理具体参考 SPL 指令 [ 1] 介绍具体来说包括
结构化数据 SQL 计算指令
支持行过滤、列扩展、数值计算、SQL 函数调用
extend 通过 SQL 表达式计算结果产生新字段where 根据 SQL 表达式计算结果过滤数据条目
*
| extend latencycast(latency as BIGINT)
| where status200 AND latency100字段操作指令
支持字段投影、字段重名、列裁剪
project 保留与给定模式相匹配的字段、重命名指定字段project-away 保留与给定模式相匹配的字段、重命名指定字段project-rename 重命名指定字段并原样保留其他所有字段
*
| project-away -wildcard __tag__:*
| project-rename __source__remote_addr非结构化数据提取指令
支持 JSON、正则、CSV 等非结构化字段值处理
parse-regexp 提取指定字段中的正则表达式分组匹配信息parse-json 提取指定字段中的第一层 JSON 信息parse-csv 提取指定字段中的 CSV 格式信息
*
| parse-csv -delim^_^ content as time, body
| parse-regexp body, (\S)\s(\w) as msg, userSPL 在 Flink SLS Connector 中的原理介绍
阿里云 Flink 支持 SLS Connector通过 SLS Connector 实时拉取 SLS 中 Logstore 的数据分析后的数据也可以实时写入 SLS作为一个高性能计算引擎Flink SQL 也在越来越广泛的应用在 Flink 计算中借助 SQL 语法可以对结构化的数据进行分析。
在 SLS Connector 中可以配置日志字段为 Flink SQL 中的 Table 字段然后基于 SQL 进行数据分析在未支持 SPL 配置之前SLS Connector 会实时消费全量的日志数据到 Flink 计算平台当前消费方式有如下特点
在 Flink 中计算的往往不需要所有的日志行比如在安全场景中可能仅需要符合某种特征的数据需要进行日志进行过滤事实上不需要的日志行也会被拉取造成网络带宽的浪费。在 Flink 中计算的一般是特定的字段列比如在 Logstore 中有 30 个字段真正需要在 Flink 计算的可能仅有 10 个字段全字段的拉取造成了网络带宽的浪费。
在以上场景中可能会增加并不需要的网络流量和计算开销基于这些特点SLS 将 SPL 的能力集成到 SLS Connector 的新版本中可以实现数据在到达 Flink 之前已经进行了行过滤和列裁剪这些预处理能力内置在 SLS 服务端可以达到同时节省网络流量与 Flink 计算过滤、列裁剪开销的目的。
原理对比
未配置 SPL 语句时Flink 会拉取 SLS 的全量日志数据包含所有列、所有行进行计算如图 1。配置 SPL 语句时SPL 可以对拉取到的数据如果 SPL 语句包含过滤及列裁剪等Flink 拉取到的是进行过滤和列裁剪后部分数据进行计算如图 2。 在 Flink 中使用 SLS SPL
接下来以一个 Nginx 日志为例来介绍基于 SLS SPL 的能力来使用 Flink。为了便于演示这里在 Flink 控制台配置 SLS 的源表然后开启一个连续查询以观察效果。在实际使用过程中可以直接修改 SLS 源表保留其余分析和写出逻辑。
接下来介绍下阿里云 Flink 中使用 SPL 实现行过滤与列裁剪功能。
在 SLS 准备数据
开通 SLS在 SLS 创建 ProjectLogstore并创建具有消费 Logstore 的权限的账号 AK/SK。当前 Logstore 数据使用 SLS 的的 SLB 七层日志模拟接入方式产生模拟数据其中包含 10 多个字段。 模拟接入会持续产生随机的日志数据日志内容示例如下
{__source__: 127.0.0.1,__tag__:__receive_time__: 1706531737,__time__: 1706531727,__topic__: slb_layer7,body_bytes_sent: 3577,client_ip: 114.137.195.189,host: www.pi.mock.com,http_host: www.cwj.mock.com,http_user_agent: Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0,request_length: 1662,request_method: GET,request_time: 31,request_uri: /request/path-0/file-3,scheme: https,slbid: slb-02,status: 200,upstream_addr: 42.63.187.102,upstream_response_time: 32,upstream_status: 200,vip_addr: 223.18.47.239
}Logstore 中 slbid 字段有两种值slb-01 和 slb-02对 15 分钟的日志数据进行 slbid 统计可以发现 slb-01 与 slb-02 数量相当。 行过滤场景
在数据处理中过滤数据是一种常见需求在 Flink 中可以使用 filter 算子或者 SQL 中的 where 条件进行过滤使用非常方便但是在 Flink 使用 filter 算子往往意味着数据已经通过网络进入 Flink 计算引擎中全量的数据会消耗着网络带宽和 Flink 的计算性能这种场景下SLS SPL 为 Flink SLS Connector 提供了一种支持过滤“下推”的能力通过配置 SLS Connector 的 query 语句中过滤条件即可实现过滤条件下推。避免全量数据传输和全量数据过滤计算。 创建 SQL 作业
在阿里云 Flink 控制台创建一个空白的 SQL 的流作业草稿点击下一步进入作业编写。 在作业草稿中输入如下创建临时表的语句
CREATE TEMPORARY TABLE sls_input(request_uri STRING,scheme STRING,slbid STRING,status STRING,__topic__ STRING METADATA VIRTUAL,__source__ STRING METADATA VIRTUAL,__timestamp__ STRING METADATA VIRTUAL,__tag__ MAPVARCHAR, VARCHAR METADATA VIRTUAL,proctime as PROCTIME()
) WITH (connector sls,endpoint cn-beijing-intranet.log.aliyuncs.com,accessId ${ak},accessKey ${sk},starttime 2024-01-21 00:00:00,project ${project},logstore test-nginx-log,query * | where slbid slb-01
);这里为了演示方便仅设置 request_uri、scheme、slbid、status 和一些元数据字段作为表字段。 a k 、 {ak}、 ak、{sk}、${project} 替换为具有 Logstore 消费权限的账号。endpoint填写同地域的 SLS 的私网地址。query填写 SLS 的 SPL 语句这里填写了 SPL 的过滤语句* | where slbid ‘‘slb-01’’注意在阿里云 Flink 的 SQL 作业开发中字符串需要使用英文单引号进行转义。
连续查询及效果
在作业中输入分析语句按照 slbid 进行聚合查询动态查询会根据日志的变化实时刷新数字。
SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid点击右上角调试按钮进行调试可以看到结果中 slbid 的字段值始终是 slb-01。 可以看出设置了 SPL 语句后sls_input 仅包含 slbid‘slb-01’ 的数据其他不符合条件的数据被过滤掉了。
流量对比
使用 SPL 后可以看出在 SLS 的写流量不变的情况下Flink 对 SLS 的读流量有大幅度下降同时在过滤占主要很多 Flink CU 的场景下经过过滤后Flink CU 也会有相应的降低。 列裁剪场景
在数据处理中列裁剪也是一种常见需求在原始数据中往往会有全量的字段但是实际的计算只需要特定的字段类似需要在 Flink 中可以使用 project 算子或者 SQL 中的 select 进行列裁剪与变换使用 Flink 使用 project 算子往往意味着数据已经通过网络进入 Flink 计算引擎中全量的数据会消耗着网络带宽和 Flink 的计算性能这种场景下SLS SPL 为 Flink SLS Connector 提供了一种支持投影下推的能力通过配置 SLS Connector 的 query 参数即可实现投影字段下推。避免全量数据传输和全量数据过滤计算。
创建 SQL 作业
创建步骤同行过滤场景在作业草稿中输入如下创建临时表的语句这里 query 参数配置进行了修改在过滤的基础上增加了投影语句可以实现从 SLS 服务端仅拉取特定字段的内容。
CREATE TEMPORARY TABLE sls_input(request_uri STRING,scheme STRING,slbid STRING,status STRING,__topic__ STRING METADATA VIRTUAL,__source__ STRING METADATA VIRTUAL,__timestamp__ STRING METADATA VIRTUAL,__tag__ MAPVARCHAR, VARCHAR METADATA VIRTUAL,proctime as PROCTIME()
) WITH (connector sls,endpoint cn-beijing-intranet.log.aliyuncs.com,accessId ${ak},accessKey ${sk},starttime 2024-01-21 00:00:00,project ${project},logstore test-nginx-log,query * | where slbid slb-01 | project request_uri, scheme, slbid, status, __topic__, __source__, __tag__:__receive_time__
);为了效果下面分行展示语句中配置在 Flink 语句中任然需要单行配置。
*
| where slbid slb-01
| project request_uri, scheme, slbid, status, __topic__, __source__, __tag__:__receive_time__上面使用了 SLS SPL 的管道式语法来实现数据过滤后投影的操作类似 Unix 管道使用|符号将不同指令进行分割上一条指令的输出作为下一条指令的输入最后的指令的输出表示整个管道的输出。
连续查询及效果 在作业中输入分析语句可以看到结果与行过滤场景结果类似。
SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid注意 这里与行过滤不同的是上面的行过滤场景会返回全量的字段而当前的语句令 SLS Connector 只返回特定的字段再次减少了数据的网络传输。
SPL 还可以做什么
上述实例中演示了使用 SLS SPL 的过滤和投影功能来实现 SLS Connector 的“下推”功能可以有效地减少网络流量和 Flink CU 的使用。可以避免在 Flink 进行计算之前进行额外的过滤和投影计算消耗。SLS SPL 的功能不止于过滤与投影SLS SPL 完整支持的语法可以参考文档SPL 指令 [ 1] 。同时SPL管道式语法已全面支持在 Flink Connector 中进行配置。SLS SPL 支持对于数据进行预处理比如正则字段、JSON 字段CSV 字段展开数据格式转换列的增加和减少过滤等。除了用于消费场景在 SLS 的 Scan 模式与采集端都会应用场景以便用户在采集端、消费端都可以使用 SPL 的能力。
相关链接
[1] SPL 指令
https://help.aliyun.com/zh/sls/user-guide/spl-instruction?spma2c4g.11186623.0.0.33f35a3dl8g8KD
[2] 日志服务概述
https://help.aliyun.com/zh/sls/product-overview/what-is-log-service
[3] SPL 概述
https://help.aliyun.com/zh/sls/user-guide/spl-overview
[4] 阿里云 Flink Connector SLS
https://help.aliyun.com/zh/flink/developer-reference/log-service-connector