申请试用
HOT
登录
注册
 
Apache DolphinScheduler核心之 DAG 大 JSON 拆分详解
3 点赞
3 收藏
7下载
DolphinScheduler社区
/
发布于
/
162
人观看
展开查看详情

1.dolphinscheduler json拆解 1、为什么拆解json 在dolphinscheduler 1.3.x及以前的工作流中的任务及任务关系保存时是以大json的方式保存到 数据库中process_definiton表的process_definition_json字段,如果某个工作流很大比如有100 或者1000个任务,这个json字段也就非常大,在使用时需要解析json,非常耗费性能,且任务 没法重用;基于大json,在工作流版本及任务版本上也没有好的实现方案,否则会导致数据大量 冗余。 故社区计划启动json拆解项目,实现的需求目标: 大json完全拆分 新增工作流及任务版本 引入全局唯一键(code) 2、如何设计拆解后的表 1、1.3.6版本工作流 1、比如在当前1.3.6版本创建个a-->b的工作流 以下是processDefiniton save 接口在controller入口打印的入参日志

2. create process definition, project name: hadoop, process definition name: ab, process_definition_json: {"globalParams":[],"tasks": [{"type":"SHELL","id":"tasks-77643","name":"a","params":{"resourceList": [],"localParams": [{"prop":"yesterday","direct":"IN","type":"VARCHAR","value":"${system.biz. date}"}],"rawScript":"echo ${yesterday}"},"description":"","timeout": {"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditi onResult":{"successNode":[""],"failedNode":[""]},"dependence": {},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM" ,"workerGroup":"default","preTasks":[]},{"type":"SHELL","id":"tasks- 99814","name":"b","params":{"resourceList":[],"localParams": [{"prop":"today","direct":"IN","type":"VARCHAR","value":"${system.biz.curd ate}"}],"rawScript":"echo ${today}"},"description":"","timeout": {"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditi onResult":{"successNode":[""],"failedNode":[""]},"dependence": {},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM" ,"workerGroup":"default","preTasks":["a"]}],"tenantId":1,"timeout":0}, desc: locations:{"tasks-77643": {"name":"a","targetarr":"","nodenumber":"1","x":251,"y":166},"tasks- 99814":{"name":"b","targetarr":"tasks- 77643","nodenumber":"0","x":533,"y":161}}, connects: [{"endPointSourceId":"tasks-77643","endPointTargetId":"tasks-99814"}] 2、依赖节点的工作流,dep是依赖节点 以下是processDefiniton save 接口在controller入口打印的入参日志

3. create process definition, project name: hadoop, process definition name: dep_c, process_definition_json: {"globalParams":[],"tasks": [{"type":"SHELL","id":"tasks-69503","name":"c","params":{"resourceList": [],"localParams":[],"rawScript":"echo 11"},"description":"","timeout": {"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditi onResult":{"successNode":[""],"failedNode":[""]},"dependence": {},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM" ,"workerGroup":"default","preTasks":["dep"]}, {"type":"DEPENDENT","id":"tasks-22756","name":"dep","params": {},"description":"","timeout": {"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditi onResult":{"successNode":[""],"failedNode":[""]},"dependence": {"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList": [{"projectId":1,"definitionId":1,"depTasks":"b","cycle":"day","dateValue": "today"}]}]},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority ":"MEDIUM","workerGroup":"default","preTasks": []}],"tenantId":1,"timeout":0}, desc: locations:{"tasks-69503": {"name":"c","targetarr":"tasks- 22756","nodenumber":"0","x":597,"y":166},"tasks-22756": {"name":"dep","targetarr":"","nodenumber":"1","x":308,"y":164}}, connects: [{"endPointSourceId":"tasks-22756","endPointTargetId":"tasks-69503"}] 3、条件判断的工作流 以下是processDefiniton save 接口在controller入口打印的入参日志

4. create process definition, project name: hadoop, process definition name: condition_test, process_definition_json: {"globalParams":[],"tasks": [{"type":"SHELL","id":"tasks-68456","name":"d","params":{"resourceList": [],"localParams":[],"rawScript":"echo 11"},"description":"","timeout": {"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditi onResult":{"successNode":[""],"failedNode":[""]},"dependence": {},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM" ,"workerGroup":"default","preTasks":[]},{"type":"SHELL","id":"tasks- 58183","name":"e","params":{"resourceList":[],"localParams": [],"rawScript":"echo 22"},"description":"","timeout": {"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditi onResult":{"successNode":[""],"failedNode":[""]},"dependence": {},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM" ,"workerGroup":"default","preTasks":["cond"]},{"type":"SHELL","id":"tasks- 43996","name":"f","params":{"resourceList":[],"localParams": [],"rawScript":"echo 33"},"description":"","timeout": {"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditi onResult":{"successNode":[""],"failedNode":[""]},"dependence": {},"maxRetryTimes":"0","retryInterval":"1","taskInstancePriority":"MEDIUM" ,"workerGroup":"default","preTasks":["cond"]}, {"type":"CONDITIONS","id":"tasks-38972","name":"cond","params": {},"description":"","timeout": {"strategy":"","interval":null,"enable":false},"runFlag":"NORMAL","conditi onResult":{"successNode":["e"],"failedNode":["f"]},"dependence": {"relation":"AND","dependTaskList":[{"relation":"AND","dependItemList": [{"depTasks":"d","status":"SUCCESS"}]}]},"maxRetryTimes":"0","retryInterva l":"1","taskInstancePriority":"MEDIUM","workerGroup":"default","preTasks": ["d"]}],"tenantId":1,"timeout":0}, desc: locations:{"tasks-68456": {"name":"d","targetarr":"","nodenumber":"1","x":168,"y":158},"tasks- 58183":{"name":"e","targetarr":"tasks- 38972","nodenumber":"0","x":573,"y":82},"tasks-43996": {"name":"f","targetarr":"tasks- 38972","nodenumber":"0","x":591,"y":288},"tasks-38972": {"name":"cond","targetarr":"tasks- 68456","nodenumber":"2","x":382,"y":175}}, connects: [{"endPointSourceId":"tasks-68456","endPointTargetId":"tasks-38972"}, {"endPointSourceId":"tasks-38972","endPointTargetId":"tasks-58183"}, {"endPointSourceId":"tasks-38972","endPointTargetId":"tasks-43996"}] 从以上三个案例中,我们知道controller的入口参数的每个参数都可以在 t_ds_process_definition表中找到对应,故表中数据如下图 2、拆解后的表设计思路

5.工作流只是dag的展现形式,任务通过工作流进行组织,组织的同时存在了任务之间的关系,也 就是依赖。就好比一个画板,画板上有些图案,工作流就是画板,图案就是任务,图案之间的关 系就是依赖。而调度的核心是调度任务,依赖只是表述调度的先后顺序。当前定时还是对整个工 作流进行的定时,拆解后就方便对单独任务进行调度。正是基于这个思想设计了拆解的思路,所 以这就需要三张表,工作流定义表、任务定义表、任务关系表。 工作流定义表:描述工作流的基本信息,比如全局参数、dag中节点的位置信息 任务定义表:描述任务的详情信息,比如任务类别、任务容错信息、优先级等 任务关系表:描述任务的关系信息,比如当前节点、前置节点等 基于这个设计思想再扩展到版本,无非是对于这三张表,每张表新增个保存版本的日志表。 工作流定义表 现在看案例中save接口日志,现有字段(project、process_definition_name、desc、 locations、connects),对于json中除了task之外的还剩下 {"globalParams":[],"tenantId":1,"timeout":0} 所以可知工作流定义表: CREATE TABLE `t_ds_process_definition` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id', `code` bigint(20) NOT NULL COMMENT 'encoding', `name` varchar(200) DEFAULT NULL COMMENT 'process definition name', `version` int(11) DEFAULT NULL COMMENT 'process definition version', `description` text COMMENT 'description', `project_code` bigint(20) NOT NULL COMMENT 'project code', `release_state` tinyint(4) DEFAULT NULL COMMENT 'process definition release state: 0:offline,1:online', `user_id` int(11) DEFAULT NULL COMMENT 'process definition creator id', `global_params` text COMMENT 'global parameters', `flag` tinyint(4) DEFAULT NULL COMMENT '0 not available, 1 available', `locations` text COMMENT 'Node location information', `connects` text COMMENT 'Node connection information', `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id', `timeout` int(11) DEFAULT '0' COMMENT 'time out, unit: minute', `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime DEFAULT NULL COMMENT 'update time', PRIMARY KEY (`id`,`code`), UNIQUE KEY `process_unique` (`name`,`project_code`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `t_ds_process_definition_log` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id', `code` bigint(20) NOT NULL COMMENT 'encoding', `name` varchar(200) DEFAULT NULL COMMENT 'process definition name', `version` int(11) DEFAULT NULL COMMENT 'process definition version', `description` text COMMENT 'description', `project_code` bigint(20) NOT NULL COMMENT 'project code',

6. `release_state` tinyint(4) DEFAULT NULL COMMENT 'process definition release state: 0:offline,1:online', `user_id` int(11) DEFAULT NULL COMMENT 'process definition creator id', `global_params` text COMMENT 'global parameters', `flag` tinyint(4) DEFAULT NULL COMMENT '0 not available, 1 available', `locations` text COMMENT 'Node location information', `connects` text COMMENT 'Node connection information', `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id', `timeout` int(11) DEFAULT '0' COMMENT 'time out,unit: minute', `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id', `operator` int(11) DEFAULT NULL COMMENT 'operator user id', `operate_time` datetime DEFAULT NULL COMMENT 'operate time', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime DEFAULT NULL COMMENT 'update time', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; 从表字段可以看出 日志表仅仅比主表多了两个字段operator、operate_time 任务定义表 案例中ab工作流task的json "tasks": [{ "type": "SHELL", "id": "tasks-77643", "name": "a", "params": { "resourceList": [], "localParams": [{ "prop": "yesterday", "direct": "IN", "type": "VARCHAR", "value": "${system.biz.date}" }], "rawScript": "echo ${yesterday}" }, "description": "", "timeout": { "strategy": "", "interval": null, "enable": false }, "runFlag": "NORMAL", "conditionResult": { "successNode": [""], "failedNode": [""] }, "dependence": {}, "maxRetryTimes": "0",

7. "retryInterval": "1", "taskInstancePriority": "MEDIUM", "workerGroup": "default", "preTasks": [] }, { "type": "SHELL", "id": "tasks-99814", "name": "b", "params": { "resourceList": [], "localParams": [{ "prop": "today", "direct": "IN", "type": "VARCHAR", "value": "${system.biz.curdate}" }], "rawScript": "echo ${today}" }, "description": "", "timeout": { "strategy": "", "interval": null, "enable": false }, "runFlag": "NORMAL", "conditionResult": { "successNode": [""], "failedNode": [""] }, "dependence": {}, "maxRetryTimes": "0", "retryInterval": "1", "taskInstancePriority": "MEDIUM", "workerGroup": "default", "preTasks": ["a"] }] dep_c工作流task的json "tasks": [{ "type": "SHELL", "id": "tasks-69503", "name": "c", "params": { "resourceList": [], "localParams": [], "rawScript": "echo 11" }, "description": "", "timeout": { "strategy": "",

8. "interval": null, "enable": false }, "runFlag": "NORMAL", "conditionResult": { "successNode": [""], "failedNode": [""] }, "dependence": {}, "maxRetryTimes": "0", "retryInterval": "1", "taskInstancePriority": "MEDIUM", "workerGroup": "default", "preTasks": ["dep"] }, { "type": "DEPENDENT", "id": "tasks-22756", "name": "dep", "params": {}, "description": "", "timeout": { "strategy": "", "interval": null, "enable": false }, "runFlag": "NORMAL", "conditionResult": { "successNode": [""], "failedNode": [""] }, "dependence": { "relation": "AND", "dependTaskList": [{ "relation": "AND", "dependItemList": [{ "projectId": 1, "definitionId": 1, "depTasks": "b", "cycle": "day", "dateValue": "today" }] }] }, "maxRetryTimes": "0", "retryInterval": "1", "taskInstancePriority": "MEDIUM", "workerGroup": "default", "preTasks": [] }] condition_test工作流task的json

9."tasks": [{ "type": "SHELL", "id": "tasks-68456", "name": "d", "params": { "resourceList": [], "localParams": [], "rawScript": "echo 11" }, "description": "", "timeout": { "strategy": "", "interval": null, "enable": false }, "runFlag": "NORMAL", "conditionResult": { "successNode": [""], "failedNode": [""] }, "dependence": {}, "maxRetryTimes": "0", "retryInterval": "1", "taskInstancePriority": "MEDIUM", "workerGroup": "default", "preTasks": [] }, { "type": "SHELL", "id": "tasks-58183", "name": "e", "params": { "resourceList": [], "localParams": [], "rawScript": "echo 22" }, "description": "", "timeout": { "strategy": "", "interval": null, "enable": false }, "runFlag": "NORMAL", "conditionResult": { "successNode": [""], "failedNode": [""] }, "dependence": {}, "maxRetryTimes": "0", "retryInterval": "1", "taskInstancePriority": "MEDIUM", "workerGroup": "default", "preTasks": ["cond"]

10.}, { "type": "SHELL", "id": "tasks-43996", "name": "f", "params": { "resourceList": [], "localParams": [], "rawScript": "echo 33" }, "description": "", "timeout": { "strategy": "", "interval": null, "enable": false }, "runFlag": "NORMAL", "conditionResult": { "successNode": [""], "failedNode": [""] }, "dependence": {}, "maxRetryTimes": "0", "retryInterval": "1", "taskInstancePriority": "MEDIUM", "workerGroup": "default", "preTasks": ["cond"] }, { "type": "CONDITIONS", "id": "tasks-38972", "name": "cond", "params": {}, "description": "", "timeout": { "strategy": "", "interval": null, "enable": false }, "runFlag": "NORMAL", "conditionResult": { "successNode": ["e"], "failedNode": ["f"] }, "dependence": { "relation": "AND", "dependTaskList": [{ "relation": "AND", "dependItemList": [{ "depTasks": "d", "status": "SUCCESS" }] }] },

11. "maxRetryTimes": "0", "retryInterval": "1", "taskInstancePriority": "MEDIUM", "workerGroup": "default", "preTasks": ["d"] }] 从案例中可以知道SHELL/DEPENDENT/CONDITIONS类型的节点的json构成(其他任务类似 SHELL),preTasks标识前置依赖节点。conditionResult结构比较固定,而dependence结构复 杂,DEPENDENT和CONDITIONS类型任务的dependence结构还不一样,所以为了统一,我们 将conditionResult和dependence整体放到params中,params对应表字段的task_params。 这样我们就确定了t_ds_task_definition表 CREATE TABLE `t_ds_task_definition` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id', `code` bigint(20) NOT NULL COMMENT 'encoding', `name` varchar(200) DEFAULT NULL COMMENT 'task definition name', `version` int(11) DEFAULT NULL COMMENT 'task definition version', `description` text COMMENT 'description', `project_code` bigint(20) NOT NULL COMMENT 'project code', `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id', `task_type` varchar(50) NOT NULL COMMENT 'task type', `task_params` text COMMENT 'job custom parameters', `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available', `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority', `worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping', `fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries', `fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval', `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open', `timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail', `timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute', `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute', `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime DEFAULT NULL COMMENT 'update time', PRIMARY KEY (`id`,`code`), UNIQUE KEY `task_unique` (`name`,`project_code`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `t_ds_task_definition_log` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id', `code` bigint(20) NOT NULL COMMENT 'encoding', `name` varchar(200) DEFAULT NULL COMMENT 'task definition name', `version` int(11) DEFAULT NULL COMMENT 'task definition version',

12. `description` text COMMENT 'description', `project_code` bigint(20) NOT NULL COMMENT 'project code', `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id', `task_type` varchar(50) NOT NULL COMMENT 'task type', `task_params` text COMMENT 'job custom parameters', `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available', `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority', `worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping', `fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries', `fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval', `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open', `timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail', `timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute', `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute', `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma', `operator` int(11) DEFAULT NULL COMMENT 'operator user id', `operate_time` datetime DEFAULT NULL COMMENT 'operate time', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime DEFAULT NULL COMMENT 'update time', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; 注意:dev版本和1.3.6版本区别,dev版本已将description换成desc,并且新增了delayTime { "globalParams": [], "tasks": [{ "type": "SHELL", "id": "tasks-18200", "name": "d", "code": "", "params": { "resourceList": [], "localParams": [], "rawScript": "echo 5" }, "desc": "", "runFlag": "NORMAL", "conditionResult": { "successNode": [ "" ], "failedNode": [ "" ]

13. }, "dependence": {}, "maxRetryTimes": "0", "retryInterval": "1", "delayTime": "0", "timeout": { "strategy": "", "interval": null, "enable": false }, "waitStartTimeout": {}, "taskInstancePriority": "MEDIUM", "workerGroup": "hadoop", "preTasks": [], "depList": null }, { "type": "SHELL", "id": "tasks-55225", "name": "e", "code": "", "params": { "resourceList": [], "localParams": [], "rawScript": "echo 6" }, "desc": "", "runFlag": "NORMAL", "conditionResult": { "successNode": [ "" ], "failedNode": [ "" ] }, "dependence": {}, "maxRetryTimes": "0", "retryInterval": "1", "delayTime": "0", "timeout": { "strategy": "", "interval": null, "enable": false }, "waitStartTimeout": {}, "taskInstancePriority": "MEDIUM", "workerGroup": "hadoop", "preTasks": [ "def" ], "depList": null

14.}, { "type": "SHELL", "id": "tasks-67639", "name": "f", "code": "", "params": { "resourceList": [], "localParams": [], "rawScript": "echo 7" }, "desc": "", "runFlag": "NORMAL", "conditionResult": { "successNode": [ "" ], "failedNode": [ "" ] }, "dependence": {}, "maxRetryTimes": "0", "retryInterval": "1", "delayTime": "0", "timeout": { "strategy": "", "interval": null, "enable": false }, "waitStartTimeout": {}, "taskInstancePriority": "MEDIUM", "workerGroup": "hadoop", "preTasks": [ "def" ], "depList": null }, { "type": "CONDITIONS", "id": "tasks-67387", "name": "def", "code": "", "params": {}, "desc": "", "runFlag": "NORMAL", "conditionResult": { "successNode": [ "e" ], "failedNode": [ "f"

15. ] }, "dependence": { "relation": "AND", "dependTaskList": [{ "relation": "AND", "dependItemList": [{ "depTasks": "d", "status": "SUCCESS" }, { "depTasks": "d", "status": "FAILURE" } ] }] }, "maxRetryTimes": "0", "retryInterval": "1", "delayTime": "0", "timeout": { "strategy": "", "interval": null, "enable": false }, "waitStartTimeout": {}, "taskInstancePriority": "MEDIUM", "workerGroup": "hadoop", "preTasks": [ "d" ], "depList": null } ], "tenantId": 1, "timeout": 0 } 任务关系表 preTasks标识前置依赖节点,当前节点在关系表中使用postTask标识。由于当前节点肯定存在 而前置节点不一定存在,所以post不可能为空,而preTask有可能为空 CREATE TABLE `t_ds_process_task_relation` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id', `name` varchar(200) DEFAULT NULL COMMENT 'relation name', `process_definition_version` int(11) DEFAULT NULL COMMENT 'process version', `project_code` bigint(20) NOT NULL COMMENT 'project code', `process_definition_code` bigint(20) NOT NULL COMMENT 'process code',

16. `pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code', `pre_task_version` int(11) NOT NULL COMMENT 'pre task version', `post_task_code` bigint(20) NOT NULL COMMENT 'post task code', `post_task_version` int(11) NOT NULL COMMENT 'post task version', `condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay', `condition_params` text COMMENT 'condition params(json)', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime DEFAULT NULL COMMENT 'update time', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `t_ds_process_task_relation_log` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id', `name` varchar(200) DEFAULT NULL COMMENT 'relation name', `process_definition_version` int(11) DEFAULT NULL COMMENT 'process version', `project_code` bigint(20) NOT NULL COMMENT 'project code', `process_definition_code` bigint(20) NOT NULL COMMENT 'process code', `pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code', `pre_task_version` int(11) NOT NULL COMMENT 'pre task version', `post_task_code` bigint(20) NOT NULL COMMENT 'post task code', `post_task_version` int(11) NOT NULL COMMENT 'post task version', `condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay', `condition_params` text COMMENT 'condition params(json)', `operator` int(11) DEFAULT NULL COMMENT 'operator user id', `operate_time` datetime DEFAULT NULL COMMENT 'operate time', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime DEFAULT NULL COMMENT 'update time', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; 对于依赖关系复杂的场景

17.3、API模块如何改造 api模块进行save操作时 1. 通过雪花算法生成13位的数字作为process_definition_code,工作流定义保存至 process_definition(主表)和process_definition_log(日志表),这两个表保存的是同样 的数据,工作流定义版本为1 2. 通过雪花算法生成13位的数字作为task_definition_code,任务定义表保存至 task_definition(主表)和task_definition_log(日志表),也是保存同样的数据,任务定 义版本为1 3. 工作流任务关系保存在 process_task_relation(主表)和process_task_relation_log(日 志表),该表保存的code和version是工作流的code和version,因为任务是通过工作流进 行组织,以工作流来画dag。也是通过post_task_code和post_task_version标识dag的当前 节点,这个节点的前置依赖通过pre_task_code和pre_task_version来标识,如果没有依 赖,pre_task_code和pre_task_version为0 api模块进行update操作时,工作流定义和任务定义直接更新主表数据,更新后的数据 insert到日志表。关系表主表先删除然后再插入新的关系,关系表日志表直接插入新的关系 api模块进行delete操作时,工作流定义、任务定义和关系表直接删除主表数据,日志表数 据不变动 api模块进行switch操作时,直接将日志表中对应version数据覆盖到主表 4、数据交互如何改造

18. 在json拆分一期api模块controller层整体未变动,传入的大json还是在service层映射为 ProcessData对象。insert或update操作在公共Service模块通过 ProcessService.saveProcessDefiniton()入口完成保存数据库操作,按照task_definition、 process_task_relation、process_definition的顺序保存。保存时,如果该任务已经存在并 且关联的工作流未上线,则更改任务;如果任务关联的工作流已上线,则不允许更改任务 api查询操作时,当前还是通过工作流id来查询,在公共Service模块通过 ProcessService.genTaskNodeList()入口完成数据组装,还是组装为ProcessData对象,进 而生成json返回 Server模块(Master)也是通过公共Service模块ProcessService.genTaskNodeList()获得 TaskNodeList生成调度dag,把当前任务所有信息放到 MasterExecThread.readyToSubmitTaskQueue队列,以便生成taskInstance,dispatch给 worker 5、当前json还需做什么

19. controller对外restAPI接口改造 ui模块dag改造 ui模块新增task操作页面 processDefinition 数据类 接口名称 参数名称 参数说明 型 projectName 项目名称 string name 流程定义名称 string

20. 数据类 接口名称 参数名称 参数说明 型 description 流程定义描述信息 string globalParams 全局参数 string 流程定义节点图标连接信息 save connects string (json格式) 流程定义节点坐标位置信息 locations string (json格式) timeout 超时分钟数 int tenantId 租户id int taskRelationJson 任务关系(json格式) string -- -- -- -- projectName 项目名称 string code 流程定义code long name 流程定义名称 string description 流程定义描述信息 string 发布流程定义,可用 update releaseState ref 值:OFFLINE,ONLINE 流程定义节点图标连接信息 connects string (json格式) 流程定义节点坐标位置信息 locations string (json格式) timeout 超时分钟数 int tenantId 租户id int taskRelationJson 任务关系(json格式) string -- -- -- code 流程定义code long switch/deleteCode projectName 项目名称 string version 版本号 string 备注:taskRelationJson格式: [{"name":"","pre_task_code":0,"pre_task_version":0,"post_task_code":123456789,"post_task _version":1,"condition_type":0,"condition_params":{}}, {"name":"","pre_task_code":123456789,"pre_task_version":1,"post_task_code":123451234,"

21.post_task_version":1,"condition_type":0,"condition_params":{}}] 同理其他接口请求参数processDefinitionId换成code schedule 数据 接口名称 参数名称 参数说明 类型 code 流程定义code long projectName 项目名称 string failureStrategy 失败策略,可用值:END,CONTINUE string 流程实例优先级,可用 createSchedule processInstancePriority string 值:HIGHEST,HIGH,MEDIUM,LOW,LOWEST schedule 定时 string warningGroupId 发送组ID int 发送策略,可用 warningType string 值:NONE,SUCCESS,FAILURE,ALL workerGroup workerGroup string -- -- -- -- code 流程定义code long projectName 项目名称 string queryScheduleListPaging pageNo 页码号 int pageSize 页大小 int searchVal 搜索值 string taskDefinition(新增) 接口名称 参数名称 参数说明 数据类型 save projectName 项目名称 taskDefinitionJson task信息 string -- -- -- -- projectName 项目名称 string update code task code long taskDefinitionJson task信息 string -- -- -- -- projectName 项目名称 string switch/deleteCode task_code taskCode long

22. 接口名称 参数名称 version 参数说明 版本号 数据类型 int -- -- -- -- query/delete projectName 项目名称 string task_code task code long -- -- -- -- queryTaskListPaging taskDefinitionJson:[{"name":"test","description":"","task_type":"SHELL","task_params": [],"flag":0,"task_priority":0,"worker_group":"default","fail_retry_times":0,"fail_retry_interval": 0,"timeout_flag":0,"timeout_notify_strategy":0,"timeout":0,"delay_time":0,"resource_ids":""} ] 对应需求issue [Feature][JsonSplit-api] api module controller design #5498 1. [Feature][JsonSplit-api]processDefinition save/update interface #5499 2. [Feature][JsonSplit-api]processDefinition switch interface #5501 3. [Feature][JsonSplit-api]processDefinition delete interface #5502 4. [Feature][JsonSplit-api]processDefinition copy interface #5503 5. [Feature][JsonSplit-api]processDefinition export interface #5504 6. [Feature][JsonSplit-api]processDefinition list-paging interface #5505 7. [Feature][JsonSplit-api]processDefinition move interface #5506 8. [Feature][JsonSplit-api]processDefinition queryProcessDefinitionAllByProjectId interface #5507 9. [Feature][JsonSplit-api]processDefinition select-by-id interface #5508 10. [Feature][JsonSplit-api]processDefinition view-tree interface #5509 11. [Feature][JsonSplit-api]schedule create interface #5510 12. [Feature][JsonSplit-api]schedule list-paging interface #5511 13. [Feature][JsonSplit-api]schedule update interface #5512 14. [Feature][JsonSplit-api]taskDefinition save interface #5513 15. [Feature][JsonSplit-api]taskDefinition update interface #5514 16. [Feature][JsonSplit-api]taskDefinition switch interface #5515 17. [Feature][JsonSplit-api]taskDefinition query interface #5516 18. [Feature][JsonSplit-api]taskDefinition delete interface #5517 19. [Feature][JsonSplit-api]WorkFlowLineage interface #5518 20. [Feature][JsonSplit-api]analysis interface #5519 21. [Feature][JsonSplit-api]executors interface #5520 22. [Feature][JsonSplit-api]processInstance interface #5521 23. [Feature][JsonSplit-api]project interface #5522

3 点赞
3 收藏
7下载