Elastic Data Processing with Apache Pulsar and Apache Flink - Flink Forward Asia


1. Elastic Data Processing with Apache Pulsar and Apache Flink
   Yijie Shen, Senior Software Engineer at StreamNative

2. Contents
• 01 Motivation on Elastic Stream and Batch Processing
• 02 Why Apache Pulsar
• 03 Pulsar Flink Connector
• 04 Future Directions

3. Motivation on Elastic Stream and Batch Processing (01)

4. Motivation on Elastic Stream and Batch Processing
• Stream data is everywhere: sensors, mobile app logs, IoT; organizations have become much better at capturing data streams
• Rapidly extracting value from data: batch and interactive analysis, stream processing, machine learning, graph processing
• Compute engines are trending toward unified batch and stream processing: unified or similar APIs for batch/interactive and stream workloads

5. Challenges for Traditional MQs or Log Storage Systems
• Compatibility with cloud-native architectures
• Multi-tenant management
• Scalability
• Complexity of organizing data storage across multiple systems
• Maintenance and provisioning overhead of multiple storage systems
• Visibility of data

6. Why Apache Pulsar (02)

7. Pulsar -- Cloud-Native Architecture
• Stateless serving layer
• Durable storage layer

8. Pulsar -- Segment-Based Storage
• Managed Ledger: the storage abstraction for a single topic
• Ledger: single writer, append-only; replicated to multiple bookies

9. Pulsar -- Infinite Data Storage
• Persist unlimited data on cheap storage to reduce storage cost
• Offload data to tiered storage one segment at a time
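The segment-by-segment offload policy above can be illustrated with a small sketch. Everything here (`Segment`, `selectForOffload`, the byte threshold) is a hypothetical model for this writeup, not Pulsar's actual offloader: given a topic's sealed segments ordered oldest first, segments are offloaded until the data kept on the fast (BookKeeper) tier drops under a threshold.

```scala
// Illustrative model of threshold-driven, oldest-first segment offload.
// `Segment` and `selectForOffload` are hypothetical names for this sketch;
// real Pulsar offloads sealed segments via its tiered-storage offloaders.
case class Segment(id: Long, sizeBytes: Long)

object OffloadSketch {
  // Pick the oldest sealed segments to offload until the bytes retained on
  // the fast tier drop to `thresholdBytes` or below.
  def selectForOffload(sealedSegments: Seq[Segment], thresholdBytes: Long): Seq[Segment] = {
    var retained = sealedSegments.map(_.sizeBytes).sum
    val toOffload = Seq.newBuilder[Segment]
    for (seg <- sealedSegments if retained > thresholdBytes) {
      toOffload += seg          // move this segment to cheap storage
      retained -= seg.sizeBytes // it no longer counts against the fast tier
    }
    toOffload.result()
  }
}
```

With three 100-byte segments and a 150-byte threshold, the two oldest segments are selected, leaving only the newest on the fast tier.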

10. Pulsar -- Structured Data
• Built-in schema registry
• Server-side agreement on message structure
• Schema defined on a per-topic basis
• Producers and consumers send and receive typed messages directly
• Pulsar validates messages against the schema
• Multi-version support for schema evolution
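A per-topic, multi-version registry like the one described above can be modeled in a few lines. This is a toy sketch under assumed names (`SchemaRegistrySketch`, `register`, `get`), not Pulsar's broker-side implementation: each topic keeps an ordered history of schema versions, and registering a new schema yields the next version number.

```scala
import scala.collection.mutable

// Toy per-topic schema registry illustrating multi-version schemas.
// All names here are hypothetical; Pulsar's real registry lives in the
// broker and validates producers/consumers against the stored schemas.
class SchemaRegistrySketch {
  private val versions = mutable.Map.empty[String, Vector[String]]

  // Register a schema for a topic; returns the new 1-based version number.
  def register(topic: String, schemaJson: String): Int = {
    val vs = versions.getOrElse(topic, Vector.empty) :+ schemaJson
    versions(topic) = vs
    vs.length
  }

  // Look up a specific schema version, or the latest one for the topic.
  def get(topic: String, version: Int): Option[String] =
    versions.get(topic).flatMap(_.lift(version - 1))
  def latest(topic: String): Option[String] = versions.get(topic).map(_.last)
}
```

Keeping every historical version is what lets old consumers keep decoding messages written under earlier schemas while the topic evolves.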

11. Pulsar Flink Connector (03)

12. Flink Pulsar Connector -- API

• Read

    val props = new Properties()
    props.setProperty("service.url", ...)
    props.setProperty("admin.url", ...)
    props.setProperty("partitionDiscoveryIntervalMillis", "5000")
    props.setProperty("startingOffsets", "earliest")
    props.setProperty("topic", "test-source-topic")

    val source = new FlinkPulsarSource(props)
    val dataStream = env.addSource(source)

    tEnv
      .connect(new Pulsar().properties(props))
      .inAppendMode()
      .registerTableSource("pulsar-test-table")

• Write

    val prop = new Properties()
    prop.setProperty("service.url", ...)
    prop.setProperty("admin.url", ...)
    prop.setProperty("flushOnCheckpoint", "true")
    prop.setProperty("failOnWrite", "true")
    prop.setProperty("topic", "test-sink-topic")

    stream.addSink(new FlinkPulsarSink[Row](prop, DummyTopicKeyExtractor))

13. Durable and Replayable Source
• Failures are inevitable for processing engines
• Tasks recover from checkpoints
• Exactly-once semantics:
    • Based on the message ordering guarantee within a topic
    • Implemented via seek & read
    • An extra subscription keeps messages alive so they are not deleted
    • The subscription cursor is moved forward when the checkpoint-complete notification arrives
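The last two bullets can be sketched as a tiny state machine: the position read so far is snapshotted per checkpoint, and the durable subscription's cursor is only advanced once that checkpoint is reported complete, so un-checkpointed messages can still be replayed after a failure. All names below (`CursorCommitter`, `snapshotState`, `notifyCheckpointComplete`) are illustrative, not the actual connector API.

```scala
import scala.collection.mutable

// Sketch of exactly-once cursor handling for a replayable source.
// Hypothetical names; the real connector integrates with Flink's
// checkpoint callbacks and Pulsar message ids.
class CursorCommitter {
  private val pending = mutable.Map.empty[Long, Long] // checkpointId -> messageId
  var committedCursor: Long = -1L                     // last acknowledged message id

  // Called when the engine takes a checkpoint: remember how far we had read.
  def snapshotState(checkpointId: Long, lastReadMessageId: Long): Unit =
    pending(checkpointId) = lastReadMessageId

  // Called on the checkpoint-complete notification: only now is it safe to
  // move the subscription cursor, letting the broker reclaim messages up to it.
  def notifyCheckpointComplete(checkpointId: Long): Unit =
    pending.remove(checkpointId).foreach { id => committedCursor = id }
}
```

Until `notifyCheckpointComplete` fires, the cursor stays put, which is exactly why the extra subscription prevents premature message deletion.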

14. Processing Typed Records
• Treat a Pulsar topic as a structured table
• Fetch the table schema as the first step, during job scheduling
• (De)serialize Pulsar messages into Rows
• Supports Avro/JSON/Protobuf message conversion via the Avro schema
• Message metadata is exposed as internal metadata columns:
    • __key, __publishTime, __eventTime, __messageId, __topic
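The metadata-column mapping above can be sketched as flattening a message into a row that carries both the payload fields and the `__`-prefixed metadata columns. `PulsarMsg` and `toRow` are hypothetical names for this sketch; the connector does the equivalent with Flink's Row type.

```scala
// Illustrative flattening of a Pulsar message into a row with the
// metadata columns listed on the slide. Names are hypothetical.
case class PulsarMsg(key: String,
                     publishTime: Long,
                     eventTime: Long,
                     messageId: String,
                     topic: String,
                     payload: Map[String, Any])

object RowSketch {
  // Payload fields plus the reserved metadata columns in one flat row.
  def toRow(m: PulsarMsg): Map[String, Any] =
    m.payload ++ Map(
      "__key"         -> m.key,
      "__publishTime" -> m.publishTime,
      "__eventTime"   -> m.eventTime,
      "__messageId"   -> m.messageId,
      "__topic"       -> m.topic)
}
```

The double-underscore prefix keeps the metadata columns from colliding with user-defined payload fields.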

15. Topic and Partition Discovery
• Streaming jobs are long-running
• Topics and partitions may be added or removed while a job is running
• Topic status is checked periodically
• Each task runs an internal monitoring thread
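What each periodic check boils down to is a set difference between the partitions a task is already reading and those the broker currently reports. `diff` below is an illustrative helper, not the connector's API:

```scala
// Sketch of one pass of the per-task monitoring thread: compare the
// known partition set against the currently reported one.
object DiscoverySketch {
  // Returns (newly added partitions to start reading, removed partitions to stop).
  def diff(known: Set[String], current: Set[String]): (Set[String], Set[String]) =
    (current -- known, known -- current)
}
```

For example, if the task knows `p0, p1` and the broker now reports `p1, p2`, a reader is started for `p2` and the reader for `p0` is stopped.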

16. Future Directions (04)

17. Analytics-Friendly Data Organization and Access Methods
• Organize segment data in columnar format
    • Targets analytical workloads
    • Saves disk bandwidth / network IO
    • Saves CPU time
• Predicate pushdown + coarse-grained indexes
    • Per-segment max/min statistics
    • Indexes are generated by brokers during put and written as segment metadata
• More flexible data consumption modes

18. Pulsar Community

19. Pulsar Community

20.THANKS