网站建设js,网站开发报价文件,wordpress英文模板,网页设计免费模板图片一、说明 在这篇博文中#xff0c;我们将介绍如何将开源流式处理解决方案 bytewax 与 ydata 分析相结合并加以利用#xff0c;以提高流式处理流的质量。 STream 处理支持在传输中和存储之前对数据进行实时分析#xff0c;并且可以是有状态的#xff0c;也可以是无状态的。 … 一、说明 在这篇博文中我们将介绍如何将开源流式处理解决方案 bytewax 与 ydata 分析相结合并加以利用以提高流式处理流的质量。 STream 处理支持在传输中和存储之前对数据进行实时分析并且可以是有状态的也可以是无状态的。 有状态流处理用于实时建议、模式检测或复杂事件处理其中处理需要已发生事件的历史记录窗口、按键连接等。 无状态流处理用于内联转换不需要了解流中的其他数据点例如屏蔽电子邮件或转换类型。 总体而言数据流在工业中被广泛使用并且可以应用于欺诈检测、患者监控或事件预测维护等用例。 二、 数据流必须考虑关键是数据的质量 与通常在创建数据仓库或仪表板解决方案期间评估数据质量的传统模型不同流数据需要持续监视。 在从收集到馈送下游应用程序的整个过程中保持数据质量至关重要。毕竟对于组织来说糟糕的数据质量的成本可能很高。 在本文中我们将向您展示如何结合以分析和提高流媒体流的质量bytewaxydata-profiling 三、使用 Bytewax 为数据专业人员提供流处理 是专门为Python开发人员设计的OSS流处理框架。 它允许用户构建具有类似于FlinkSpark和Kafka Streams功能的流数据管道和实时应用程序同时提供友好和熟悉的界面以及与Python生态系统的100%兼容性。 使用内置连接器或现有的 Python 库您可以连接到实时和流数据源Kafka、RedPanda、WebSocket 等并将转换后的数据写入各种下游系统Kafka、拼花地板文件、数据湖等。 对于转换Bytewax 通过映射、窗口和聚合方法促进有状态和无状态转换并具有恢复和可伸缩性等熟悉的功能。 Bytewax 促进了 Python 优先和以数据为中心的数据流体验并且是为数据工程师和数据科学家构建的。它允许用户构建流数据管道和实时应用程序并创建满足其需求所需的自定义项而无需学习和维护基于 JVM 的流平台如 Spark 或 Flink。 Bytewax 非常适合许多用例即为生成 AI 嵌入管道、处理数据流中的缺失值、在流上下文中使用语言模型来理解金融市场等等。有关用例灵感和更多信息如文档、教程和指南请随时查看字节蜡网站。 四、为什么要对数据流进行数据剖析 数据剖析是成功启动任何机器学习任务的关键指的是彻底了解数据的步骤其结构、行为和质量。 简而言之数据分析涉及分析与数据格式和基本描述符相关的方面例如样本数量、特征的数量/类型、重复值、其内在特征例如存在缺失数据或不平衡的特征以及在数据收集或处理过程中可能出现的其他复杂因素例如错误值或不一致的特征。 确保高数据质量标准对所有领域和组织都至关重要但对于使用输出连续数据的域运营的领域尤其重要其中情况可能会快速变化可能需要立即采取行动例如医疗保健监测、股票价值、空气质量政策。 对于许多领域从探索性数据分析的角度使用数据分析考虑存储在数据库中的历史数据。相反对于数据流数据分析对于沿流持续验证和质量控制变得至关重要需要在流程的不同时间范围或阶段检查数据。 通过将自动分析嵌入到我们的数据流中我们可以立即获得有关数据当前状态的反馈并收到任何潜在关键问题的警报 - 无论是与数据一致性和完整性有关例如损坏的值或更改格式还是与短时间内发生的事件例如数据漂移 偏离业务规则和结果。 在现实世界的领域——你只知道墨菲定律一定会发生“一切都可能出错”——自动分析可能会让我们免于多个大脑难题和需要停止生产的系统 在涉及数据剖析方面无论是表格数据还是时间序列数据它一直是大众的最爱。难怪为什么 - 它是一组广泛的分析和见解的一行代码。ydata-profiling 复杂且耗时的操作是在后台完成的ydata 分析会自动检测数据中包含的特征类型并根据特征类型数字或分类调整分析报告中显示的汇总统计数据和可视化效果。 该软件包促进了以数据为中心的分析还突出了特征之间的现有关系重点关注它们的成对交互和相关性并提供了对数据质量警报的全面评估从重复或常量值到偏斜和不平衡的特征。 它实际上是我们数据质量的 360º 视图 - 只需最少的努力。 分析报告突出显示潜在的数据质量问题。图片由作者提供。 五、把所有东西放在一起字节蜡和ydata-profile。 在开始项目之前我们需要先设置 python 依赖项并配置数据源。 首先让我们安装 和 软件包您可能希望为此使用虚拟环境 - 如果您需要一些额外的指导请查看这些说明bytewaxydata-profiling pip install bytewax0.16.2 ydata-profiling4.3.1 然后我们将上传环境传感器遥测数据集许可证 — CC0公共域其中包含来自不同 IoT 设备的温度、湿度、一氧化碳液化石油气、烟雾、光线和运动的多个测量值 在生产环境中这些测量将由每个设备连续生成输入看起来像我们在 Kafka 等流媒体平台中期望的。在本文中为了模拟我们在流数据中找到的上下文我们将一次一行地从 CSV 文件中读取数据并使用字节蜡创建数据流。 作为快速旁注数据流本质上是一个数据管道可以描述为有向无环图 — DAG 首先让我们进行一些必要的导入 from datetime import datetime, timedelta, timezonefrom bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput
from bytewax.testing import run_main 然后我们定义数据流对象。之后我们将使用无状态映射方法在其中传入一个函数将字符串转换为 datetime 对象并将数据重组为格式device_id数据。 map 方法将以无状态的方式对每个数据点进行更改。我们修改数据形状的原因是我们可以在接下来的步骤中轻松地对数据进行分组以单独分析每个设备的数据而不是同时分析所有设备的数据。 flow Dataflow()
flow.input(simulated_stream, CSVInput(/content/iot_telemetry_data_1000))# parse timestamp
def parse_time(reading_data):reading_data[ts] datetime.fromtimestamp(float(reading_data[ts]), timezone.utc)return reading_dataflow.map(parse_time)# remap format to tuple (device_id, reading_data)
flow.map(lambda reading_data: (reading_data[device], reading_data)) 现在我们将利用有状态功能在我们定义的时间段内收集每个设备的数据。 需要一段时间内的数据快照这使得窗口运算符成为执行此操作的完美方法。 在bytewaxydata-profiling 中我们能够为为特定上下文指定的数据帧生成汇总统计信息。例如在我们的示例中我们可以生成引用每个物联网设备或特定时间框架的数据快照ydata-profiling from bytewax.window import EventClockConfig, TumblingWindow# This is the accumulator function, and outputs a list of readings
def acc_values(acc, reading):acc.append(reading)return acc# This function instructs the event clock on how to retrieve the
# events datetime from the input.
def get_time(reading):return reading[ts]# Configure the fold_window operator to use the event time.
cc EventClockConfig(get_time, wait_for_system_durationtimedelta(seconds30))# And a tumbling window
align_to datetime(2020, 1, 1, tzinfotimezone.utc)
wc TumblingWindow(align_toalign_to, lengthtimedelta(hours1))flow.fold_window(running_average, cc, wc, list, acc_values)flow.inspect(print) 定义快照后利用就像为我们要分析的每个数据帧调用 一样简单ydata-profilingPorfileReport import pandas as pd
from ydata_profiling import ProfileReportdef profile(device_id__readings):print(device_id__readings)device_id, readings device_id__readingsstart_time readings[0][ts].replace(minute0, second0, microsecond0).strftime(%Y-%m-%d %H:%M:%S)df pd.DataFrame(readings)profile ProfileReport(df,tsmodeTrue,sortbyts,titlefSensor Readings - device: {device_id})profile.to_file(fTs_Profile_{device_id}-{start_time}.html)return fdevice {device_id} profiled at hour {start_time}flow.map(profile) 在此示例中我们将图像作为 map 方法中函数的一部分写入本地文件。这些可以通过消息传递工具报告出来或者我们可以在将来将它们保存到一些远程存储中。配置文件完成后数据流需要一些输出因此我们可以使用内置设备打印已分析的设备以及从映射步骤中的配置文件函数传递的配置文件时间StdOutput flow.output(out, StdOutput()) 有多种方法可以执行字节蜡数据流。在这个例子中我们使用相同的本地机器但 Bytewax 也可以在多个 Python 进程上运行跨多个主机在 Docker 容器中运行使用 Kubernetes 集群等等。 在本文中我们将继续使用本地设置但我们鼓励你查看我们的帮助程序工具 waxctl该工具在管道准备好过渡到生产环境后管理 Kubernetes 数据流部署。 假设我们与具有数据流定义的文件位于同一目录中则可以使用以下方法运行它 python -m bytewax.run ydata-profiling-streaming:flow 然后我们可以使用分析报告来验证数据质量检查模式或数据格式的更改并比较不同设备或时间窗口之间的数据特征。 事实上我们可以利用比较报告功能以直接的方式突出显示两个数据配置文件之间的差异从而更容易检测需要调查的重要模式或必须解决的问题 snapshot_a_report ProfileReport(df_a, titleSnapshot A)
snapshot_b_report ProfileReport(df_b, titleSnapshot B)comparison_report snapshot_a_report(snapshot_b_report)
comparison_report.to_file(comparison_report.html) 六、准备好探索您自己的数据流了吗 验证数据流对于连续识别数据质量问题并比较不同时间段的数据状态至关重要。 对于医疗保健、能源、制造和娱乐领域的组织所有组织都在处理连续的数据流分析是建立从质量评估到数据隐私的数据治理最佳实践的关键。 这需要对数据快照进行分析如本文所示可以通过组合 和 无缝实现数据快照。bytewaxydata-profiling Bytewax负责处理数据流并将其构建为快照所需的所有过程然后可以通过数据特征的综合报告对其进行汇总并与ydata分析进行比较。 能够适当地处理和分析传入的数据开启了跨不同领域的大量用例从纠正数据架构和格式中的错误到突出显示和缓解实际活动产生的其他问题例如异常检测例如欺诈或入侵/威胁检测、设备故障以及其他偏离预期的事件例如数据偏移或与业务规则不一致。 现在您就可以开始探索数据流了让我们知道您发现了哪些其他用例并一如既往地在评论中给我们留言或在以数据为中心的 AI 社区中找到我们以获取进一步的问题和建议再见