申请试用
HOT
登录
注册
 
《Apache Doris 源码解析系列直播 —— 第2讲 Stream Load导入作业的执行流程》
Apache Doris
/
发布于
/
333
人观看

《 Apache Doris 源码阅读与解析》系列直播活动旨在帮助 Apache Doris 社区的开发者或者有意向参与 Apache Doris 社区建设的小伙伴们,可以更快熟悉 Apache Doris 代码的组织结构和一些主要流程的实现原理以及代码位置,以便于各位小伙伴们能够快速上手,参与到开发工作中来。

本期内容为《第二讲:stream load 导入作业的执行过程》,这一讲我们将通过讲解用户最用的 Stream Load 导入作业的执行过程,帮助大家了解以下内容:

  1. 如何使用 HTTP 协议处理 Stream Load 导入作业请求
  2. 导入事务的生命周期。
  3. 导入作业的执行计划的生成与计划执行过程。
  4. 导入事务的成功与失败处理。
展开查看详情

1 .Apache Doris 源码阅读与解析 第二讲:Stream Load 任务的执行流程 杨政国

2 .课程介绍 • 流式导入是 Doris 的一个重要功能 • Stream Load,Broker Load 均使用流式导入框架 • 面向开发者 • 如何使用 HTTP 协议处理 Stream Load 导入作业请求 • 导入事务的生命周期 • 导入作业的执行计划的生成与执行过程

3 . 01 Warming Up

4 .导入需要的信息 • 数据源 • 从哪里导入数据 • 数据格式 • 原始数据的格式描述 • 指定数据转换方式 • 如果原始数据和目标表的格式不一致时如何解决 • 导入目标 • 需要导入到的目的表

5 .举个例子 curl --location-trusted -u root \ -H "columns: k1, k2, v1=to_bitmap(k1)" \ -H "column_separator:," \ -H "format: json" \ -T testData http://host:port/api/testDb/testTbl/_stream_load • 需要导入的文件 testData • 数据库 testDb • 表 testTbl • 列映射 • 列分割符 column_separator:, 详细语法帮助:https://doris.apache.org/master/zh-CN/sql-reference/sql- statements/Data%20Manipulation/STREAM%20LOAD.html#description

6 . 数据完整性问题 原始数据 导入结果 V1-Vn-1 • 部分数据丢失,产生脏数据 Vn Vn Vn Vn • 数据重复

7 . 读写冲突问题 原始数据 查询导入结果 未生效 已生效 数据 V1-Vn-1 • 读取到未生效数据 Vn 已生效 已生效 Vn V1-Vn-1 • 生效的数据未被读取到

8 . Coordinator Participant 事务和两阶段提交 Request-t o -prepare • FE 充当协调者 Prepare Phase • Prepare 阶段下发任务和写入数据 Prepared • Submit阶段 Commit/ Abort • 数据状态改为COMMITED Commit • publish 版本 Phase D on e • 状态改为 VISIBLE

9 . FE Leader Phase 1 事务状态 1.Begin Txn PREPARE Analyzer 事务管理 FE Follower 数据版本 N Prepare Txn • 创建事务 • 规划导入执行计划 • 分发子任务

10 . FE Leader Phase 1 事务状态 1.Begin Txn PREPARE Analyzer 事务管理 FE Follower 数据版本 N Execute Txn 3.汇报导入 • 接受查询计划 • 初始化LoadScanNode BE • 初始化 TableSink和 tablet writer 2. Load 数据 BE 未生效数据 生效数据 • Extract & Transform & Load tid-1 V1 V2 • 汇报导入结果 tid-n … Vn BE

11 . FE Leader Phase 2 事务状态 1.Begin Txn COMMITED Analyzer 事务管理 FE Follower 数据版本 N Publish 3.汇报导入 4 publish • 收集导入任务汇报结果 • 发送Publish 消息 BE • 事务状态改为 COMMITED 2. Load 数据 BE 未生效数据 生效数据 • 等待BE 返回 tid-1 V1 V2 tid-n … Vn BE

12 . FE Leader Phase 2 事务状态 1.Begin Txn VISIBLE Analyzer 事务管理 FE Follower 数据版本 N+1 Publish 3.汇报导入 • 修改BE元数据,数据版本 +1 4. publish 5. publish 回调 • 修改FE元数据, 数据版本 +1 BE • 事务状态改为 VISIBLE 2. Load 数据 BE 生效数据 V1 V2 … Vn Vn+1 BE

13 . FE Leader Phase 2 事务状态 1.Begin Txn ABORT Analyzer 事务管理 FE Follower 数据版本 N RollBack 3.汇报导入 • 事务状态改为 Abort • BE 等待回收任务删除已写入数据 BE 2. Load 数据 BE 生效数据 V1 V2 … Vn BE

14 . 02 导入流程

15 . 3.推/拉数据 导入流程概览 1.用户请求 在 Doris 中,数据导入功能主要分为以下几个模块 任务解析 数据接收 • 任务解析模块(Analyzer) FE 2.开启事务 4.ETL • 事务管理器(Txn Manager) BE • 数据接收与转换(Data Receiver, Extract & Transform) 事务管理器 数据转换 • 存储引擎(Storage Engine) 5.写入文件 6.汇报 7.提交/回滚事务 存储引擎

16 .LoadAction.java • 接收HTTP请求 • HTTP PUT 请求,校验密码 • 检验请求参数和权限

17 .LoadAction.java • 使用选择BE • 重定向HTTP 请求

18 .stream_load.h on_header() • stream_load.h中有几个主要方法: 1.解析 • on_header() 2.提交执行计划 process_put() • _process_put() • on_chunk_data() Plan Executor • handle() 3.接收数据 on_chunk_data() 4.等待执行完成 handle()

19 .stream_load.h • on_header() • 生成一个 StreamLoadContext • 调用_on_header() • begin_txn()

20 .stream_load_executor.h • begin_txn(), commit_txn(), rollback_txn() • 分别用于调用 FE 中的 Txn Manager 来开始、提交或回滚一个事务 • execute_plan_fragment() • 用于提交一个导入的执行计划到线程池中执行

21 .stream_load.h • _process_put() • 生成并注册一个 StreamLoadPipe • 向 FE 请求一个执行计划 • 开始执行计划

22 .stream_load.h • on_chunk_data() • 接收数据 • 写入StreamLoadPipe

23 .stream_load.h • handle() • 提交事务 • 回滚事务

24 .GlobalTransactionMgr.java • beginTransaction() • abortTransaction() • commitTransaction

25 . 03 导入规划

26 .FrontendServiceImpl.java • streamLoadPutImpl() • 创建建StreamLoadTask • 创建 StreamLoadPlanner • 创建 执行计划

27 .StreamLoadPlanner.java • plan() • 构建tuple descriptor • 创建 scan node • 创建 sink node

28 .StreamLoadScanNode.java • init() • 初始化扫描范围 • 初始化列映射 • 初始化where 条件 • 初始化行列分隔符

29 .Load.java • initColumns() • 重写表达式 • (v1,v2=v1+1,v3=v2+1) --> (v1, v2=v1+1, v3=v1+1+1) • 解析表达式 • 构建列映射

30 点赞
3 收藏
42下载