- 快召唤伙伴们来围观吧
- 微博 QQ QQ空间 贴吧
- 文档嵌入链接
- 复制
- 微信扫一扫分享
- 已成功复制到剪贴板
徐昀泽-KoP (Kafka on Pulsar) 工作原理解析
议题
KoP (Kafka on pulsar) 工作原理解析:助力你从 Kafka 到 Pulsar
同样是本次Meetup的新朋友,徐昀泽,目前担任StreamNative 软件工程师,是Apache Pulsar社区的Committer,也是 KoP (Kafka on Pulsar) 的核心维护者。他将带来 KoP 的工作原理分享。KoP 的诞生主要是希望基于 Apache Pulsar 来支持 Kafka 协议。用户只需将 KoP 插件添加到现有的 Pulsar 集群后,就可直接用现有的 Kafka 客户端和 Pulsar 进行交互,包括消息的生产消费以及管理端相关操作。
展开查看详情
1 .KoP (Kafka on Pulsar) 工作原理解析 助力你从 Kafka 到 Pulsar 徐昀泽 https://openlookeng.io
2 . 个人介绍 • StreamNative Software Engineer • Apache Pulsar Committer • KoP Core Maintainer https://openlookeng.io
3 .Content 为什么需要 KoP? Protocol Handler KoP 基本实现 KoP 2.8.0 的重大改进 近期计划 https://openlookeng.io
4 .01. 为什么需要 KoP? https://openlookeng.io
5 .Kafka & Pulsar 作为消息队列 Producer Broker Consumer Producer topic Consumer Producer Consumer partition partition partition https://openlookeng.io
6 .Pulsar vs Kafka Client Broker Broker Broker Broker Broker Broker Bookie Bookie Follower Leader Follower Bookie Bookie Kafka Pulsar https://openlookeng.io
7 . 从 Kafka 迁移到 Pulsar • 修改现有代码? • 业务:我很忙 • Pulsar adapters? 业务:我不用 Java。 • 业务:我有空改代码。对了,我写 PHP. • 如果我线上跑的是 Kafka connector 呢? • 亦或者我用的是其他系统连接 Kafka 的现有插件呢? https://openlookeng.io
8 .02. Protocol Handler https://openlookeng.io
9 .Protocol Handler https://openlookeng.io
10 . How to use KoP? 1. 把 nar 包放到 protocols 目录 2. 对 broker.conf 进行额外配置 3. 启动 broker https://openlookeng.io
11 . Supported clients 核心测试基于 Kafka 2.0 客户端 以下客户端通过了基本的端到端测试: • Java client 1.0 ~ 2.6 • rdkafka based clients • golang-sarama https://openlookeng.io
12 .ProtocolHandler interface BrokerService 管理 broker 的所有资源 • Producer, subscription 及其 consumer • Topic 以及相关的 managed ledger • 内置的 admin 和 client(使用 broker 配置的认证 信息) • … 加载 验证 配置 启动 绑定端口 https://openlookeng.io
13 .03. KoP 基本实现 https://openlookeng.io
14 . Topic & Partition Kafka Pulsar persistent://public/default/my-topic-partition-0 • 是否持久化消息 • 租户/命名空间 • 短 topic 名字 • Partition 后缀 https://openlookeng.io • 分区编号
15 . Topic & Partition • my-topic => persistent://public/default/my-topic • Tenant-0/Namespace-0/Topic-0 => persistent://Tenant-0/Namespace-0/Topic-0 • xxx/my-topic => invalid topic name • persistent://Tenant-0/Namespace-0/Topic-0 https://openlookeng.io
16 .Produce & Fetch 请求 Find PersistentTopic Encode MemoryRecords Read bytes from bookie Write the bytes to bookie Decode bytes to via ManagedLedger MemoryRecords Produce Fetch Q: 如果 PersistentTopic 查找失败了呢? A: 返回 NOT_LEADER_FOR_PARTITION 错误码来触发 Kafka 客户端重试. https://openlookeng.io
17 . Group Coordinator Topic C1 C2 Group Partition 0 Partition 1 Partition 2 Rebalance: 决定 partition 到 consumer 的映射关系 当 consumer 订阅 group 时: 1. 发送 JoinGroup 请求来通知 broker 和其他 consumers 2. 发送 SyncGroup 请求来广播分配结果 https://openlookeng.io
18 . Group Coordinator 我们需要一次额外的 “offset commit”: acknowledge cumulatively https://openlookeng.io
19 . Group Coordinator & Namespace Bundle • 每个 broker 拥有(owns)一些 bundle range(简称 bundle) • Topic 会被哈希到其中一个 bundle,owner broker 就是这个 topic 的 owner broker. 在 KoP 中我们注册了一个 listener 来 监听 bundle ownership 的变化,从而通知 group coordinator 加载或移除对应元数据。 https://openlookeng.io
20 .04. KoP 2.8.0 的重大改进 https://openlookeng.io
21 . Kafka offset Kafka offset 是一个 64 位整型,它代表消息在某个 partition 的下标。 Segment Offset 0 Offset 1 Offset 2 … 在 Pulsar 中, • Segment 也被称为 ledger,它分散在不同的 bookie 节点 • 基本存储单位是 entry,它代表一个 batched message • 在 batch 中,每个 single message 对应 Kafka 的 offset https://openlookeng.io
22 . Kafka offset 实现(2.8.0 之前) Kafka Offset 8 bytes Batch Pulsar Ledger id Entry id Index 8 bytes 8 bytes 4 bytes 64 bits Offset = 20 bits Ledger id + 32 bits Entry id + 12 bits Batch Index Flaws: 1.没有绝对的分配方案 2.跨不同 ledger 读取多个 entry 后拼接时: Maximum offset delta exceeded 3.一些第三方系统,比如 Spark,依赖连续 offset 的设计 https://openlookeng.io
23 . PIP 70 BrokerEntryMetadata Message Metadata Payload Raw Message BrokerEntry Metadata Metadata Payload Client Broker Client Broker Bookie Bookie Before After 2.8.0 2.8.0 https://openlookeng.io
24 . PIP 70 BrokerEntryMetadata brokerEntryMetadataInterceptors=\ org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor KoP 2.8.0 required • Managed Ledger:为每个 entry 注册一个 interceptor • Broker: 实现 interceptor,并创建若干 BrokerEntryMetadata interceptor • Broker Timestamp • Topic Index • … https://openlookeng.io
25 . KoP 中使用 Topic Index • PRODUCE or FETCH • 从 entry 中解析 BrokerEntryMetadata • COMMIT_OFFSET • 对于 __consumer_offsets,直接将数据写入 topic • 对于 Pulsar’s cumulative ACK,使用二分查找来找到 ManagedLedger 的 offset • LIST_OFFSET • Latest: 读取 ManagedLedgerInterceptor 的 index • Earliest or Timestamp: 定位消息,然后解析 BrokerEntryMetadata https://openlookeng.io
26 . 消息编解码 Before 2.8.0 • 解压缩 • 解 batch • 对每条 single message 进行转换 Is it necessary? • 重新创建 metadata 如果用户是从 Kafka 移植到 Pulsar,那么他们现在肯定没有 Pulsar 客户端。 https://openlookeng.io
27 . 消息编解码 entryFormat=kafka • Produce: Entry 1 Entry 2 Entry N • 只添加 metadata header • Consume: • 填充每个 entry 的 offset 和 length 字段 Bytes 1 Bytes 2 Bytes N • 合并这些 entry https://openlookeng.io
28 . 频繁 GC 的 bug 堆内存增长很快,频繁触发 GC。 然而 Pulsar 对消息是使用 堆外内存 的,这种情况不该发生。 https://openlookeng.io
29 . 频繁 GC 的 bug 当你排除一切不可能之后,剩下的无论多么难以置信,那都是事实。 by 夏洛克·福尔摩斯 https://openlookeng.io