申请试用
HOT
登录
注册
 
徐昀泽-KoP (Kafka on Pulsar) 工作原理解析
0 点赞
0 收藏
0下载
openLooKeng
/
发布于
/
31
人观看

议题

KoP (Kafka on pulsar) 工作原理解析:助力你从 Kafka 到 Pulsar

同样是本次Meetup的新朋友,徐昀泽,目前担任StreamNative 软件工程师,是Apache Pulsar社区的Committer,也是 KoP (Kafka on Pulsar) 的核心维护者。他将带来 KoP 的工作原理分享。KoP 的诞生主要是希望基于 Apache Pulsar 来支持 Kafka 协议。用户只需将 KoP 插件添加到现有的 Pulsar 集群后,就可直接用现有的 Kafka 客户端和 Pulsar 进行交互,包括消息的生产消费以及管理端相关操作。

展开查看详情

1.KoP (Kafka on Pulsar) 工作原理解析 助力你从 Kafka 到 Pulsar 徐昀泽 https://openlookeng.io

2. 个人介绍 • StreamNative Software Engineer • Apache Pulsar Committer • KoP Core Maintainer https://openlookeng.io

3.Content 为什么需要 KoP? Protocol Handler KoP 基本实现 KoP 2.8.0 的重大改进 近期计划 https://openlookeng.io

4.01. 为什么需要 KoP? https://openlookeng.io

5.Kafka & Pulsar 作为消息队列 Producer Broker Consumer Producer topic Consumer Producer Consumer partition partition partition https://openlookeng.io

6.Pulsar vs Kafka Client Broker Broker Broker Broker Broker Broker Bookie Bookie Follower Leader Follower Bookie Bookie Kafka Pulsar https://openlookeng.io

7. 从 Kafka 迁移到 Pulsar • 修改现有代码? • 业务:我很忙 • Pulsar adapters? 业务:我不用 Java。 • 业务:我有空改代码。对了,我写 PHP. • 如果我线上跑的是 Kafka connector 呢? • 亦或者我用的是其他系统连接 Kafka 的现有插件呢? https://openlookeng.io

8.02. Protocol Handler https://openlookeng.io

9.Protocol Handler https://openlookeng.io

10. How to use KoP? 1. 把 nar 包放到 protocols 目录 2. 对 broker.conf 进行额外配置 3. 启动 broker https://openlookeng.io

11. Supported clients 核心测试基于 Kafka 2.0 客户端 以下客户端通过了基本的端到端测试: • Java client 1.0 ~ 2.6 • rdkafka based clients • golang-sarama https://openlookeng.io

12.ProtocolHandler interface BrokerService 管理 broker 的所有资源 • Producer, subscription 及其 consumer • Topic 以及相关的 managed ledger • 内置的 admin 和 client(使用 broker 配置的认证 信息) • … 加载 验证 配置 启动 绑定端口 https://openlookeng.io

13.03. KoP 基本实现 https://openlookeng.io

14. Topic & Partition Kafka Pulsar persistent://public/default/my-topic-partition-0 • 是否持久化消息 • 租户/命名空间 • 短 topic 名字 • Partition 后缀 https://openlookeng.io • 分区编号

15. Topic & Partition • my-topic => persistent://public/default/my-topic • Tenant-0/Namespace-0/Topic-0 => persistent://Tenant-0/Namespace-0/Topic-0 • xxx/my-topic => invalid topic name • persistent://Tenant-0/Namespace-0/Topic-0 https://openlookeng.io

16.Produce & Fetch 请求 Find PersistentTopic Encode MemoryRecords Read bytes from bookie Write the bytes to bookie Decode bytes to via ManagedLedger MemoryRecords Produce Fetch Q: 如果 PersistentTopic 查找失败了呢? A: 返回 NOT_LEADER_FOR_PARTITION 错误码来触发 Kafka 客户端重试. https://openlookeng.io

17. Group Coordinator Topic C1 C2 Group Partition 0 Partition 1 Partition 2 Rebalance: 决定 partition 到 consumer 的映射关系 当 consumer 订阅 group 时: 1. 发送 JoinGroup 请求来通知 broker 和其他 consumers 2. 发送 SyncGroup 请求来广播分配结果 https://openlookeng.io

18. Group Coordinator 我们需要一次额外的 “offset commit”: acknowledge cumulatively https://openlookeng.io

19. Group Coordinator & Namespace Bundle • 每个 broker 拥有(owns)一些 bundle range(简称 bundle) • Topic 会被哈希到其中一个 bundle,owner broker 就是这个 topic 的 owner broker. 在 KoP 中我们注册了一个 listener 来 监听 bundle ownership 的变化,从而通知 group coordinator 加载或移除对应元数据。 https://openlookeng.io

20.04. KoP 2.8.0 的重大改进 https://openlookeng.io

21. Kafka offset Kafka offset 是一个 64 位整型,它代表消息在某个 partition 的下标。 Segment Offset 0 Offset 1 Offset 2 … 在 Pulsar 中, • Segment 也被称为 ledger,它分散在不同的 bookie 节点 • 基本存储单位是 entry,它代表一个 batched message • 在 batch 中,每个 single message 对应 Kafka 的 offset https://openlookeng.io

22. Kafka offset 实现(2.8.0 之前) Kafka Offset 8 bytes Batch Pulsar Ledger id Entry id Index 8 bytes 8 bytes 4 bytes 64 bits Offset = 20 bits Ledger id + 32 bits Entry id + 12 bits Batch Index Flaws: 1.没有绝对的分配方案 2.跨不同 ledger 读取多个 entry 后拼接时: Maximum offset delta exceeded 3.一些第三方系统,比如 Spark,依赖连续 offset 的设计 https://openlookeng.io

23. PIP 70 BrokerEntryMetadata Message Metadata Payload Raw Message BrokerEntry Metadata Metadata Payload Client Broker Client Broker Bookie Bookie Before After 2.8.0 2.8.0 https://openlookeng.io

24. PIP 70 BrokerEntryMetadata brokerEntryMetadataInterceptors=\ org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor KoP 2.8.0 required • Managed Ledger:为每个 entry 注册一个 interceptor • Broker: 实现 interceptor,并创建若干 BrokerEntryMetadata interceptor • Broker Timestamp • Topic Index • … https://openlookeng.io

25. KoP 中使用 Topic Index • PRODUCE or FETCH • 从 entry 中解析 BrokerEntryMetadata • COMMIT_OFFSET • 对于 __consumer_offsets,直接将数据写入 topic • 对于 Pulsar’s cumulative ACK,使用二分查找来找到 ManagedLedger 的 offset • LIST_OFFSET • Latest: 读取 ManagedLedgerInterceptor 的 index • Earliest or Timestamp: 定位消息,然后解析 BrokerEntryMetadata https://openlookeng.io

26. 消息编解码 Before 2.8.0 • 解压缩 • 解 batch • 对每条 single message 进行转换 Is it necessary? • 重新创建 metadata 如果用户是从 Kafka 移植到 Pulsar,那么他们现在肯定没有 Pulsar 客户端。 https://openlookeng.io

27. 消息编解码 entryFormat=kafka • Produce: Entry 1 Entry 2 Entry N • 只添加 metadata header • Consume: • 填充每个 entry 的 offset 和 length 字段 Bytes 1 Bytes 2 Bytes N • 合并这些 entry https://openlookeng.io

28. 频繁 GC 的 bug 堆内存增长很快,频繁触发 GC。 然而 Pulsar 对消息是使用 堆外内存 的,这种情况不该发生。 https://openlookeng.io

29. 频繁 GC 的 bug 当你排除一切不可能之后,剩下的无论多么难以置信,那都是事实。 by 夏洛克·福尔摩斯 https://openlookeng.io

0 点赞
0 收藏
0下载