- 快召唤伙伴们来围观吧
- 微博 QQ QQ空间 贴吧
- 视频嵌入链接 文档嵌入链接
- 复制
- 微信扫一扫分享
- 已成功复制到剪贴板
3 Apache SeaTunnel实现非CDC数据抽取实践 -陈胡
本次分享主要从ST的应用场景与业务痛点着手,解释为什么会选择ST,并详细解释其实现方案与具体流程。
展开查看详情
1 . Apache SeaTunnel 1.X 数据抽取实践 使用场景 业务痛点 实现方案 流程详解 资深大数据工程师 陈胡 chenhu1008@me.com https://seatunnel.apache.org/
2 . 01 SeaTunnel 简介 02 应用场景 03 业务痛点 CONTENT 04 为什么选择SeaTunnel 05 实现方案 06 具体流程 07 问答
3 .01 SeaTunnel 简介
4 .Apache SeaTunnel 简介 Apache SeaTunnel是下一代高性能、分布式、海量 数据集成框架。通过我们的努力让Spark的使用更简 单,更高效,并将业界和我们使用Spark的优质经验 固化到seatunnel这个产品中,明显减少学习成本, 加快分布式数据处理能力在生产环境落地. https://seatunnel.apache.org/
5 .Apache SeaTunnel 简介 高性能 插件式 Apache 分布式 Spark/Flink SeaTunnel 简单易用 无需开发 https://seatunnel.apache.org/
6 .Apache SeaTunnel 简介 https://seatunnel.apache.org/
7 .Apache SeaTunnel 简介 https://seatunnel.apache.org/
8 .Apache SeaTunnel 简介 https://seatunnel.apache.org/
9 .02 应用场景
10 .应用场景-SeaTunnel在交管行业的应用 行业数据 车辆驾驶人 交通警情 交通违法 机动车登记信息 卡口过车 车辆GPS 执勤执法 交通事故 其他互联网数据 https://seatunnel.apache.org/
11 .应用场景-SeaTunnel在交管行业的应用 数据特点 数据量 大小不一 数据安全 实时性要求不高 要求较高 分钟级别 数据一致性 更新频率不高 要求比较高 分钟级别 https://seatunnel.apache.org/
12 .03 业务痛点
13 .业务痛点 数据来源 01 Oracle 02 只有视图查询权限 03 表结构随时更新,无通知下发 04 Session数量受限 05 不能保证有稳定的增量列 https://seatunnel.apache.org/
14 .04 为什么选择SeaTunnel
15 .为什么选择SeaTunnel 支持Spark 不需要编写程序,只需要简单配置即可 丰富的Source和Sink,对于数据源的权限要求不高 拆箱即用,没有复杂的设置流程,执行效率高 首个进入了apache孵化的国人开源数据集成平台 https://seatunnel.apache.org/
16 .为什么选择SeaTunnel https://seatunnel.apache.org/
17 .为什么选择SeaTunnel https://seatunnel.apache.org/
18 .为什么选择SeaTunnel https://seatunnel.apache.org/
19 .05 实现方案
20 .实现方案 增量更新实现 增量列的选择 如何记录增量列的 checkpoint https://seatunnel.apache.org/
21 .06 具体流程
22 .具体流程 确定运算资源 spark { spark.app.name = "acd_file2ck" spark.executor.instances = 6 spark.executor.cores = 2 spark.executor.memory = "2g" spark.sql.catalogImplementation = "hive" } https://seatunnel.apache.org/
23 .具体流程 确定数据来源 input { hdfs { path = "hdfs://xxx.xxx.com:8020/user/hdfs/checkpoint/accident/acd_filehuman/maxgxsj /" result_table_name = "acd_filehuman_max_tmp" format = "json" } jdbc { driver = "oracle.jdbc.driver.OracleDriver" url = "jdbc:oracle:thin:@<OracleHostIp>:1521/<sid>” #table = "(select * from demo_user.acd_filehuman@demoDB where GXSJ >= trunc(${initDate}))" table = "(select * from demo_user.acd_filehuman@demoDB where GXSJ >= trunc(sysdate) )" result_table_name = "acd_filehuman_interval_tmp" user = ”demo" password = ”demo" jdbc.partitionColumn = "XZQH" jdbc.numPartitions = "100" jdbc.lowerBound = 510000 jdbc.upperBound = 520000 } https://seatunnel.apache.org/ }
24 .具体流程 必要的数据转换 filter { sql { sql = "select * from acd_filehuman_interval_tmp where GXSJ > (select max(gxsj) from acd_filehuman_max_tmp)" } convert{ source_field = "RYBH" new_type= "integer" } convert{ … … } convert{ source_field = "GXSJ" new_type= "string" result_table_name = "acd_filehuman" } sql { sql = "select string(max(GXSJ)) as gxsj from acd_filehuman" result_table_name = "max_gxsj" } } https://seatunnel.apache.org/
25 .具体流程 数据输出 output { clickhouse { source_table_name = "acd_filehuman" host = "es1:8123,es2:8123,es3:8123" database = ”demoDB" table = "acd_filehuman" username = ”demoUser" password = ”password" bulk_size = 5000 } hdfs { source_table_name = "max_gxsj" path = "hdfs:// xxx.xxx.com:8020/user/hdfs/checkpoint/accident/acd_filehuman/maxgxsj/" save_mode = "append" format = "json" } } https://seatunnel.apache.org/
26 .具体流程 Shell &Scheduler #!/usr/bin/bash #导入当前用户的环境变量 export CLASSPATH=".:/usr/local/soft/jdk1.8.0_73/jre/lib/rt.jar:/usr/local/soft/jdk1.8.0_73/ lib/dt.jar:/usr/local/soft/jdk1.8.0_73/lib/tools.jar" export PATH="/usr/lib64/qt- 3.3/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/usr/local/soft/jdk1.8.0_ 73/bin:/home/hdfs/.local/bin:/home/hdfs/bin" export JAVA_HOME="/usr/local/soft/jdk1.8.0_73” nohup sh /usr/waterdrop/bin/start-waterdrop.sh --master yarn --deploy-mode client --config /usr/waterdrop/config/acd_file_interval_ck.conf > /home/hdfs/logs/acd_file_interval_ck.log 2>&1 & Crontab 或者DophinScheduler https://seatunnel.apache.org/
27 .系统截图 HDFS上的CheckPoint文件 Crontab 调度脚本 Yarn 应用列表 https://seatunnel.apache.org/
28 .Thanks. CETC 陈胡