- 快召唤伙伴们来围观吧
- 微博 QQ QQ空间 贴吧
- 文档嵌入链接
- 复制
- 微信扫一扫分享
- 已成功复制到剪贴板
Apache DolphinScheduler与计算中间件Apache Linkis的集成
展开查看详情
1 . 2022 Apache Dolphinscheduler与计算中间件 Apache Linkis的集成 讲师:单葛尧
2 .目录 CONTENTS 01 任务插件介绍 02 与Linkis集成过程
3 . 01 DolphinScheduler 正在处于微内核 + 插件 化的架构更改之中,所有核心能力如任务、 任务插件介绍 资源存储、注册中心等都将被设计为扩展点, 我们希望通过 SPI 来提高 DolphinScheduler 本身的灵活性以及友好 性(扩展性)。
4 .Apache Dolphinscheduler介绍 Apache DolphinScheduler是一个分布式去中心化,易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流 程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
5 .Dolphinscheduler特性 高可靠性 简单易用 丰富的使用场景 高扩展性 去中心化的多 DAG监控界面, 支持多租户,支 支持自定义任务 Master和多 所有流程定义都 持暂停恢复操作. 类型,调度器使 Worker服务对等 是可视化,通过 紧密贴合大数据 用分布式调度, 架构, 避免单 拖拽任务完成定 生态,提供Spark, 调度能力随集群 Master压力过大, 制DAG,通过 Hive, M/R, 线性增长, 另外采用任务缓 API方式与第三方 Python, Master和Worker 冲队列来避免过 系统集成, 一键部 Sub_process, 支持动态上下线 载 署 Shell等近20种任 务类型
6 . Dolphinscheduler任务组件使用示例 本案例为创建一个视图表 terms 并写入三行数据和一个格式为 parquet 的表 wc 并判断该表是否存在。程序类型为 SQL。将视图表 terms 的数据插入到格式为 parquet 的表 wc。
7 .SPI 服务发现 SPI 全称为 (Service Provider Interface) , 是 JDK 内置的一种服务提供发现机制。大 多数人可能会很少用到它,因为它的定位主 要是面向开发厂商的,在 java.util.ServiceLoader 的文档里有比较详 细的介绍,其抽象的概念是指动态加载某个 服务实现。 Apache DolphinScheduler 分为逻辑 Task 以及物理 Task,逻辑 Task 指 DependTask, SwitchTask 这种逻辑上的 Task;物理 Task 是指 ShellTask,SQLTask 这种执行 任务的 Task。而在 Apache DolphinScheduler中,我们一般扩充的都是 物理 Task,而物理 Task 都是交由 Worker 去执行,所以我们要明白的是,当我们在有 多台 Worker 的情况下,要将自定义的 Task 分发到每一台有 Worker 的机器上, 当我们启动 Worker 服务时,worker 会去启 动一个 ClassLoader 来加载相应的实现了 规则的 Task lib。
8 .构建Task实现 通过 TaskChannel 我们 得到了可执行的物理 Task,但是我们需要给 当前 Task 添加相应的实 现,才能够让Apache DolphinScheduler 去执 行你的任务,首先在编写 Task 之前我们需要先了 解一下 Task 之间的关系:
9 .任务插件开发流程(基于SHELL的任务) 基于YARN的计算(参见MapReduceTask) • 需要在 org.apache.dolphinscheduler.server.worker.task 下的 TaskManager 类中创建自 定义任务(也需在TaskType注册对应的任务类型) • 需要继承org.apache.dolphinscheduler.server.worker.task 下的 AbstractYarnTask • 构造方法调度 AbstractYarnTask 构造方法 • 继承 AbstractParameters 自定义任务参数实体 • 重写 AbstractTask 的 init 方法中解析自定义任务参数 • 重写 buildCommand 封装command
10 .任务插件开发流程(基于SHELL的任务) 基于非YARN的计算(参见ShellTask) • 需要在 org.apache.dolphinscheduler.server.worker.task 下的 TaskManager 中创建自定义任务 • 需要继承 org.apache.dolphinscheduler.server.worker.task 下的 AbstractTask • 构造方法中实例化 ShellCommandExecutor public ShellTask(TaskProps props, Logger logger) { super(props, logger); this.taskDir = props.getTaskDir(); this.processTask = new ShellCommandExecutor(this::logHandle, props.getTaskDir(), props.getTaskAppId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(), props.getTaskTimeout(), logger); this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); } • 传入自定义任务的 TaskProps和自定义Logger,TaskProps 封装了任务的信息,Logger分装了自定义日志信息 • 继承 AbstractParameters 自定义任务参数实体 • 重写 AbstractTask 的 init 方法中解析自定义任务参数实体 • 重写 handle 方法,调用 ShellCommandExecutor 的 run 方法,第一个参数传入自己的command,第二个参数传入 ProcessDao,设置相应 的 exitStatusCode
11 .任务插件开发流程(基于非SHELL的任务) 基于非SHELL的任务(参见SqlTask) • 需要在 org.apache.dolphinscheduler.server.worker.task 下的 TaskManager 中创建自定 义任务 • 需要继承org.apache.dolphinscheduler.server.worker.task 下的 AbstractTask • 继承 AbstractParameters 自定义任务参数实体 • 构造方法或者重写 AbstractTask 的 init 方法中,解析自定义任务参数实体 • 重写 handle 方法实现业务逻辑并设置相应的exitStatusCode
12 . 02 本着开源共建的思想,我们集成了优秀 的计算中间件Linkis作为任务插件。作为计 与Linkis的集成 算中间件,Linkis 提供了强大的连通、复用、 编排、扩展和治理管控能力。通过计算中间 件将应用层和引擎层解耦,简化了复杂的网 络调用关系,降低了整体复杂度,同时节约 了整体开发和维护成本。
13 .Linkis架构概要 Linkis 基于微服务架构开发,其服务可 以分为3类:计算治理服务、公共增强服 务和微服务治理服务。 • 计算治理服务,支持计算任务/请求 处理流程的3个主要阶段:提交->准备 ->执行; • 公共增强服务,包括上下文服务、 物料管理服务及数据源服务等; • 微服务治理服务,包括定制化的 Spring Cloud Gateway、Eureka、 Open Feign。 左侧是Linkis 的架构概要图.
14 . 应用方式(JAVA SDK) private static JobExecuteResult toSubmit(String user, String code) { 引入依赖模块 // 1. build params // set label map :EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant <dependency> Map<String, Object> labels = new HashMap<String, Object>(); <groupId>org.apache.linkis</groupId> labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3"); // required engineType <artifactId>linkis-computation- Label client</artifactId> labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-APPName");// <version>${linkis.version}</version> required execute user and creator eg:hadoop-IDE </dependency> labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py"); // required codeType 如: // set start up map :engineConn start params <dependency> Map<String, Object> startupMap = new HashMap<String, Object>(16); <groupId>org.apache.linkis</groupId> // Support setting engine native parameters,For example: parameters of engines such as <artifactId>linkis-computation- spark/hive client</artifactId> startupMap.put("spark.executor.instances", 2); <version>1.0.3</version> // setting linkis params </dependency> startupMap.put("wds.linkis.rm.yarnqueue", "dws"); // 2. build jobSubmitAction JobSubmitAction jobSubmitAction = JobSubmitAction.builder() .addExecuteCode(code) .setStartupParams(startupMap) .setUser(user) //submit user .addExecuteUser(user) // execute user .setLabels(labels) .build(); // 3. to execute return client.submit(jobSubmitAction); }
15 .应用方式(Linkis-Cli) 第一步,检查conf/目录下是否存在默认配置文件linkis-cli.properties,且包含以下配置: #linkis-mg-gateway服务地址 wds.linkis.client.common.gatewayUrl=http://127.0.0.1:9001 #认证鉴权策略 token/static wds.linkis.client.common.authStrategy=token #static 模式下为用户名/密码,token模式下为linkis-mg-gateway_auth_token表中token_name 和 logal_users wds.linkis.client.common.tokenKey=Validation-Code wds.linkis.client.common.tokenValue=BML-AUTH 第二步,进入linkis安装目录,输入指令: sh ./bin/linkis-cli -engineType spark-2.4.3 -codeType sql -code "select count(*) from testdb.test;" - submitUser hadoop -proxyUser hadoop 第三步,您会在控制台看到任务被提交到linkis,并开始执行的信息。
16 .使用案例 若生产环境中要是使用到 Linkis 任务类型,则需要先配置好所需的环境,配置文件如下: /dolphinscheduler/conf/env/dolphinscheduler_env.sh
17 .使用案例
18 .集成流程 考虑到多版本的兼容问题,我们选择了以shell方式使用Linkis-cli来执行Linkis任务。 接下来我们讲一下dolphinscheduler的集成的详细过程。 左侧Linkis插件的目录结构,主要就分为四个主要的文 件: • LinkisParameters(参数) • LinkisTask(任务执行) • LinkisTaskChannel(创建、取消等实现类) • LinkisTaskChannelFactory(TaskChannel的工厂实 现类)
19 .集成流程 我们来看这个提交方法,我们根据用户的输入、参数的配置构建了一个Linkis的shell执行字符串,通过我们定义的 shellCommandExecutor去执行该command,实现对Linkis任务的提交。 Tips: • 通过Linkis-cli的status来实现。 • dolphincheduler在提交Linkis任务时会自动加入参数--async true以实现异步提交。
20 .集成流程 接着我们来看这个监听任务状态的方 法,它会每隔一段时间去运行该监听 方法,根据对返回日志中status的检 查,我们获取到最新的任务状态,并 随着Linkis状态的改变来更新 dolohinscheduler的任务状态。 Tips:通过linkis-cli的status来实现。 当然KILL任务同理,我们也就不再 赘述了。
21 .愿景 首先非常感谢大家选择和使用 DolphinScheduler,非常欢迎大家加入 DolphinScheduler 大家 庭,融入开源世界! 我们鼓励任何形式的参与社区,最终成为 Committer 或 PPMC,如: • 将遇到的问题通过 github 上 issue 的形式反馈出来 • 回答别人遇到的 issue 问题 • 帮助完善文档 • 帮助项目增加测试用例 • 为代码添加注释 • 提交修复 Bug 或者 Feature 的 PR • 发表应用案例实践、调度流程分析或者与调度相关的技术文章 • 帮助推广 DolphinScheduler,参与技术大会或者 meetup 的分享等 另外本次演讲我们详细讲了Dolphinscheduler集成任务插件的流程,希望大家可以根据自己的 使用情况,多多丰富我们的任务插件,让我们一起共建蓬勃的开源大数据生态! 相信参与 DolphinScheduler,一定会让您从开源中受益!
22 .愿景 首先非常感谢大家选择和使用 DolphinScheduler,非常欢迎大家加入 DolphinScheduler 大家庭,融入开源世界! 我们鼓励任何形式的参与社区,最终成为 Committer 或 PPMC,如: • 将遇到的问题通过 github 上 issue 的形式反馈出来 • 回答别人遇到的 issue 问题 • 帮助完善文档 • 帮助项目增加测试用例 • 为代码添加注释 • 提交修复 Bug 或者 Feature 的 PR • 发表应用案例实践、调度流程分析或者与调度相关的技术文章 • 帮助推广 DolphinScheduler,参与技术大会或者 meetup 的分享等 另外本次演讲我们详细讲了Dolphinscheduler集成任务插件的流程,希望大 家可以根据自己的使用情况,多多丰富我们的任务插件,让我们一起共建蓬 勃的开源大数据生态! 获取更多活动资讯,关注海豚调度公众号 相信参与 DolphinScheduler,一定会让您从开源中受益!
23 . THANKS! Ending