做网站有什么好书籍,东莞市塘厦镇,建筑网站制作,做网站建设的一般在哪儿找作者#xff5c;SelectDB 技术团队
在数据管理愈加精细化的需求背景下#xff0c;定时调度在其中扮演着重要的角色。它通常被应用于以下场景#xff1a;
定期数据更新#xff0c;如周期性数据导入和 ETL 操作#xff0c;减少人工干预#xff0c;提高数据处理的效率和准…作者SelectDB 技术团队
在数据管理愈加精细化的需求背景下定时调度在其中扮演着重要的角色。它通常被应用于以下场景
定期数据更新如周期性数据导入和 ETL 操作减少人工干预提高数据处理的效率和准确性。结合 Catalog 实现外部数据源数据定期同步确保多源数据高效、准确的整合到目标系统中满足复杂的业务分析需求。定期清理过期/无效数据释放存储空间避免过多过期/无效数据对系统性能产生影响。
在 Apache Doris 之前版本中通常需要依赖于外部调度系统如通过业务代码定时调度或者引入第三方调度工具、分布式调度平台来满足上述需求。然而因受限于外部系统自身能力可能无法满足 Doris 对调度策略及资源管理灵活性的要求。此外如果外部调度系统出现故障这不仅会增加业务风险还需投入额外的运维时间和人力来应对。
引入 Job Scheduler
为解决上述问题Apache Doris 在 2.1 版本中引入了 Job Scheduler 功能实现了自主任务调度能力调度的精准度可达到秒级。该功能的推出不仅保障了数据导入的完整性和一致性更让用户能够灵活、便捷调整调度策略。同时因减少了对外部系统的依赖也降低了系统故障的风险和运维成本为社区用户带来更加统一、可靠的使用体验。
Doris Job Scheduler 是一种基于预设计划运行的任务管理系统能够在特定时间点或按照指定时间间隔触发预定义操作实现任务的自动化执行。Job Scheduler 具备以下特点
高效调度Job Scheduler 可以在指定的时间间隔内安排任务和事件确保数据处理的高效性。采用时间轮算法保证事件能够精准做到秒级触发。灵活调度Job Scheduler 提供了多种调度选项如按 分、小时、天或周的间隔进行调度同时支持一次性调度以及循环周期事件调度并且周期调度也可以指定开始时间、结束时间。事件池和高性能处理队列Job Scheduler 采用 Disruptor 实现高性能的生产消费者模型最大可能的避免任务执行过载。调度记录可追溯Job Scheduler 会存储最新的 Task 执行记录可配置通过简单的命令即可查看任务执行记录确保过程可追溯。高可用依托于 Doris 自身的高可用机制Job Schedule 可以很轻松的做到自恢复、高可用。
具体实现原理可参考本文“设计与实现”章节介绍
语法及示例
01 语法说明
一条有效的 Job 语句需包含以下内容
关键字 CREATE JOB 需加作业名称它在数据库中标识唯一事件。ON SCHEDULE 子句用于指定 Job 作业的类型、触发时间和频率。 AT timestamp 用于一次性事件。它指定 JOB 仅在给定的日期和时间执行一次AT CURRENT_TIMESTAMP 指定当前日期和时间。因 JOB 一旦创建则会立即运行也可用于异步任务创建。EVERY用于周期性作业可指定作业的执行频率关键字后需指定时间间隔周、天、小时、分钟。 Interval用于指定作业执行频率。1 DAY 表示每天执行一次1 HOUR表示每小时执行一次1 MINUTE 表示每分钟执行一次1 WEEK 表示每周执行一次。子句EVERY包含可选 STARTS子句。STARTS后面为 timestamp 值该值用于定义开始重复的时间CURRENT_TIMESTAMP 用于指定当前日期和时间。JOB 一旦创建则会立即运行。子句EVERY包含可选 ENDS子句。ENDS 关键字后面为***timestamp*** 值该值定义 JOB 事件停止运行的时间。 DO 子句用于指定 Job 作业触发时所需执行的操作目前仅支持 Insert 语句。
CREATEJOBjob_nameON SCHEDULE schedule[COMMENT string]DO execute_sql;schedule: {AT timestamp | EVERY interval[STARTS timestamp ][ENDS timestamp ]
}interval:quantity { WEEK |DAY | HOUR | MINUTE}
下方为简单的示例
CREATE JOB my_job ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
该语句表示创建一个名为 my_job的作业每分钟执行一次执行的操作是将 db2.tbl2 中的数据导入到 db1.tbl1中。
02 举例说明
创建一次性的 Job 在 2025-01-01 00:00:00 时执行一次将 db2.tbl2中数据导入到 db1.tbl1 中。
CREATE JOB my_job ON SCHEDULE AT 2025-01-01 00:00:00 DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
创建周期性的 Job未指定结束时间 在 22025-01-01 00:00:00 时开始每天执行 1 次将 db2.tbl2 中数据导入到 db1.tbl1 中。
CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS 2025-01-01 00:00:00 DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE create_time days_add(now(),-1);
创建周期性的 Job指定结束时间 在 2025-01-01 00:00:00 时开始每天执行 1 次将 db2.tbl2 中的数据导入到 db1.tbl1 中在 2026-01-01 00:10:00 时结束。
CREATE JOB my_job ON SCHEDULER EVERY 1 DAY STARTS 2025-01-01 00:00:00 ENDS 2026-01-01 00:10:00 DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 create_time days_add(now(),-1);
借助 Job 实现异步执行 由于 Job 在 Doris 中是以同步任务的形式创建的但其执行过程却是异步进行的这一特性使得 Job 非常适合用于实现异步任务例如常见的 insert into select 任务。
假设需要将db2.tbl2 中的数据导入到 db1.tbl1 中这里只需要指定 JOB 为一次性任务且开始时间设置为当前时间即可。
CREATE JOB my_job ON SCHEDULE AT current_timestamp DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
基于 Catalog 与 Job Scheduler 的数据自动同步
以某电商场景为例用户常常需要从 MySQL 中提取业务数据并将这些数据同步到 Doris 中进行数据分析从而支持精准的营销活动。而 Job Scheduler 可与数据湖能力 Multi Catalog 配合高效完成跨数据源的定期数据同步 以上表为例用户希望查询符合_总消费金额、最后一次访问时间、性别、所在城市_这几个数值条件的用户并将满足条件的用户信息导入到 Doris 中以便后续的定向推送。
1. 首先创建一张 Doris 表
CREATE TABLE IF NOT EXISTS user_activity
(user_id LARGEINT NOT NULL COMMENT 用户id,date DATE NOT NULL COMMENT 数据灌入日期时间,city VARCHAR(20) COMMENT 用户所在城市,age SMALLINT COMMENT 用户年龄,sex TINYINT COMMENT 用户性别,last_visit_date DATETIME REPLACE DEFAULT 1970-01-01 00:00:00 COMMENT 用户最后一次访问时间,cost BIGINT SUM DEFAULT 0 COMMENT 用户总消费,max_dwell_time INT MAX DEFAULT 0 COMMENT 用户最大停留时间,min_dwell_time INT MIN DEFAULT 99999 COMMENT 用户最小停留时间
)
AGGREGATE KEY(user_id, date, city, age, sex)
DISTRIBUTED BY HASH(user_id) BUCKETS 1
PROPERTIES (
replication_allocation tag.location.default: 1
);
2. 其次创建对应 MySQL 库的 Catalog
CREATE CATALOG activity PROPERTIES (typejdbc,userroot,jdbc_url jdbc:mysql://127.0.0.1:9734/user?useSSLfalse,driver_url mysql-connector-java-5.1.49.jar,driver_class com.mysql.jdbc.Driver
);
3. 最后将 MySQL 数据导入到 Doris 中。采用 Catalog Insert Into 的方式来导入全量数据由于全量导入操作可能会引发系统服务波动通常选择在业务闲暇时进行操作。
一次性调度 如下方代码所示使用一次性任务来定时触发全量导入任务触发时间为凌晨 3:00。
CREATE JOB one_time_load_job
ON SCHEDULE
AT 2024-8-10 03:00:00
DOINSERT INTO user_activity FROM SELECT * FROM activity.user.activity
周期调度 用户也可以创建一个周期性的调度任务定期更新最新的数据。
CREATE JOB schedule_load
ON SCHEDULE EVERY 1 DAY
DOINSERT INTO user_activity FROM SELECT * FROM activity.user.activity where create_time days_add(now(),-1)
设计与实现
高效的调度通常伴随着大量的资源消耗高精度的调度更是如此。传统的实现方式是直接使用 Java 内置的定时调度能力——定时调度线程周期访问或采用一些定时调度的工具类库但其在精度以及内存占用上存在较大的问题。为更好保障性能的前提下降低资源的占用我们选择 TimingWheel 算法与 Disruptor 结合实现秒级别的任务调度。 具体来说利用 Netty 的 HashedWheelTimer 实现时间轮算法Job Manager 会周期性默认十分钟地将未来事件放入时间轮中调度。为了保证任务高效触发并避免资源过度占用采用 Disruptor 构建单生产者多消费者模型。时间轮仅负责触发并不直接执行任务。对于到期需触发的任务时会将其放入 Diapatch 线程由其负责将任务分发至相应的执行线程池对于需立即执行的任务则直接将其投递至相应的任务执行线程池中。
对于单次执行事件将在调度完成后删除事件定义对于周期性事件时间轮中的系统事件将定期拉取下一个周期的执行任务。这样可以避免大量任务集中在一个 Bucket 中减少无意义的遍历、提高处理效率。
而对于事务型任务Job Scheduler 能够通过与事务的强关联以及事务回调机制确保事务型任务的执行结果与预期一致从而保证数据的完整性和一致性。
结束语
Doris Job Scheduler 是一款强大且灵活的任务调度工具是数据处理中必不可少的功能之一。除了在数据湖分析、内部 ETL 等常见场景的应用外Job Scheduler 对于异步物化视图的实现也起到关键的作用。异步物化视图是一个预先计算并存储的结果集其数据更新的频率与源表的变动紧密相关。当源表数据更新频繁时为确保物化视图中数据保持最新状态就需要对物化视图定期刷新。因此在 2.1 版本中我们巧妙地利用 JOB 定时调度功能保障了物化视图与源表数据的一致性大幅降低了人工干预的成本。
未来Doris Job Scheduler 还会支持以下特性
支持通过 UI 界面查看不同时段执行的任务分布情况。支持 JOB 流程编排即 DAG JOB。这意味着我们可以在内部实现数仓任务编排与 Catalog 功能叠加将会更高效地完成数据处理和分析工作。支持对导入任务、UPDATE、DELETE 操作进行定时调度。