- 快召唤伙伴们来围观吧
- 微博 QQ QQ空间 贴吧
- 视频嵌入链接 文档嵌入链接
- 复制
- 微信扫一扫分享
- 已成功复制到剪贴板
TGIP-CN 031:KoP(Kafka on Pulsar)2.8.0 新特性前瞻
本次 TGIP-CN 由 StreamNative 工程师徐昀泽为大家带来「KoP(Kafka on Pulsar)2.8.0 新特性前瞻」主题分享。
关于 KoP
KoP 将 Kafka 协议处理插件引入 Pulsar broker,从而实现 Apache Pulsar 对原生 Apache Kafka 协议的支持。将 KoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 Kafka 应用程序和服务迁移到 Pulsar,从而使用 Pulsar 的强大功能,例如:
•利用企业级多租户特性简化运营;
•避免数据搬迁,简化操作;
•利用 Apache BookKeeper 和分层存储持久保留事件流;
•利用 Pulsar Functions 进行无服务器化事件处理。
KoP GitHub地址:http://GitHub.com/streamnative/kop
展开查看详情
1 .TGIP-31 KoP 2.8.0 新特性前瞻 徐昀泽 2020/04/11
2 .关于我 • StreamNative Software Engineer • Apache Pulsar Contributor • KoP Main Contributor Pulsar KoP Until 2021/04/09
3 .KoP 版本号 Apache Pulsar: • Major release,⽐如 2.7.0 • Minor release,⽐如 2.7.1 StreamNative Pulsar (https://github.com/streamnative/pulsar) • Daily release,⽐如 2.8.0-rc-202104032208 • Weekly release,⽐如 2.7.1.2 从 2.6.2.0 起,KoP 的版本号和 Pulsar ⼀致,master 分支会不定期更新依赖的 SN Pulsar。
4 .概览 1. 为什么需要 KoP? 2. KoP 的基本实现。 3. KoP 2.8.0-SNAPSHOT 版本的近期进展。 4. 近期计划
5 .Kafka vs Pulsar Client Broker Broker Broker 计算层 Broker Broker Broker Bookie Bookie Bookie Bookie 存储层 Follower Leader Follower Kafka Pulsar
6 .从 Kafka 迁移到 Pulsar • 推动业务更换客户端? • 麻烦。 • Pulsar adaptors?看起来不错,可惜我不是⽤的 Java 客户端。 • 我不嫌麻烦,但我只会 PHP。 • ⽤户直接使⽤了 Kafka 连接器(近百种)连接到外部系统怎么办? • ⽤户使⽤外部系统的连接器连接到 Kafka 怎么办?
7 .KoP (Kafka on Pulsar) 如何使⽤? 1. 将 Protocol Handler 的 NAR 包放⼊ protocols ⽬录 2. 对 broker.conf 或 standalone.conf 添加相应配置 ⽀持的客户端: • Java >= 1.0 • C/C++: librdkafka • Golang: sarama • NodeJS: Pulsar Architecture from 2.5.0 • 其他基于 rdkafka 的客户端
8 .Protocol Handler BrokerService 掌控每个 broker 的⼀切资源 • 连接的 producers,subscriptions • 持有的 topic 及其对应的 managed ledgers • 内置的 admin 和 client • … Broker 启动流程 Load Verify initialize() start() newChannelInitializer()
9 .Topic & Partition Kafka Pulsar persistent://public/default/my-topic-partition-0 • 是否持久化 • 租户 • 命名空间 • 主题 • 分区编号
10 .Topic & Partition • my-topic => persistent://public/default/my-topi • Tenant-0/Namespace-0/Topic-0 => persistent://Tenant-0/Namespace-0/Topic-0 • xxx/my-topic => invalid topic name • persistent://Tenant-0/Namespace-0/Topic-0 => persistent://Tenant-0/Namespace-0/Topic-0 c
11 . Produce & Fetch 请求 PRODUCE 请求: 1. 通过 topic 名字找到 PersistentTopic 对象(内含 ManagedLedger)。 2. 对消息格式进⾏转换。 3. 异步写⼊消息到 bookie。 FETCH 请求: producers 1. 通过 topic 名字找到 PersistentTopic 对象。 2. 通过 o set 找到对应的 ManagedCursor。 PersistentTopic subscriptions 3. 从 ManagedCursor 对应位置读取 entry。 4. 对 entry 格式进⾏转换后将消息返回给客户端。 ledgers Cursors Bookie Bookie ff
12 .Group Coordinator Group Topic C1 C2 Partition 0 进⾏ rebalance,决定 partition 和 group 的映射关系。 Partition 1 Partition 2 当 consumer 加⼊(订阅)⼀个 group 时: 1. 发送 JoinGroup 请求,通知 broker 有新的消费者加⼊。 2. 发送 SyncGroup 请求⽤于 partition 的分配。
13 .Group Coordinator 特殊 topic:__consumer_o sets offsetProducers GroupMetadataManager offsetReaders Write/Read __consumer_offsets Kafka group 等价于 Pulsar Failover subscription OffsetAcker consumers Acknowledge cumulatively ff
14 . Group Coordinator 0x00000000 • 每台 broker 都拥有(own)⼀些 bundle range。 • Topic 会按名字哈希到其中⼀个 bundle range,这个 range 的 owner broker 就是 topic 的 owner broker。 0xc0000000 0x40000000 • Bundle 可能分裂,broker 也有可能挂掉,因此 bundle ownership 会发⽣改变。 0x80000000 Namespace bundle 因此,KoP 注册了 bundle ownership 的 listener 用于通知 group coordinator。
15 .Kafka offset Before KoP 2.8.0 • Kafka o set 是⼀个 64 位整型,⽤来标识消息存储的位置。 • Pulsar 使⽤ MessageId 来标识消息位置。 Kafka Offset 8 bytes Batch Pulsar Ledger id Entry id Index 8 bytes 8 bytes 4 bytes KoP 之前的分配策略:20 bits + 32 bits + 12bits ff
16 .Kafka offset MessageId 拆分的问题 Offset (64 bits) = LedgerId (20 bits) + EntryId (32 bits) + BatchIndex (12bits) • 难以提出『合适』的分配⽅案 • 从 cursor 读取 entry 时只能⼀个⼀个读取,否则可能导致 Maximum o set delta exceeded • 有些第三⽅组件(⽐如 Spark)依赖于连续 o set 的功能 ff ff
17 .Kafka offset PIP 70 BrokerEntryMetadata Message Metadata Payload Message Metadata Payload Raw Message BrokerEntryMetadata Metadata Payload Message Message Broker Client Broker Client Raw Message Message Bookie Bookie Before After
18 .Kafka offset PIP 70 BrokerEntryMetadata ManagedLedger 中使⽤ interceptor Broker 中将 BrokerEntryMetadata 添加到 entry ⾸部,并传递给 BrokerEntryMetadataInterceptor
19 . KoP 2.8.0 连续 offset 基于 BrokerEntryMetadata 很容易实现连续 o set: • FETCH 请求:直接读 bookie,解析 BrokerEntryMetadata 即可。 • PRODUCE 请求:将 ManagedLedger 传⼊异步写 bookie 的上下⽂,从 ManagedLedger 的 interceptor 中拿到 o set。(此外 https://github.com/apache/pulsar/pull/9257 已⽀持在回调中直接获取 BrokerEntryMetadata) • COMMIT_OFFSET 请求:对于 __consumer_o sets,原封不动写⼊ topic。对于 Pulsar 的 cumulative acknowledgement,则对 ManagedLedger 进⾏⼆分查找。 因此 KoP 2.8.0 必须配置: brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor ff ff ff
20 .Message encode & decode Before 2.8.0 ⽣产: 1. 解析 MemoryRecords,重新构造 Pulsar 的 MessageMetadata。 2. 解压缩,解 batch,为所有单条消息重新构造 Pulsar 的 Message。 3. 同 Pulsar 客户端的操作。 消费: 1. 解析 Pulsar 消息的元数据,从⽽构造 MemoryRecordsBuilder。 2. 通过 MessageId 反算出 o set。 3. 解压缩,解 batch,将所有单条消息及其 o set传⼊ appendWithO set ⽅法。 ff ff ff
21 .Message encode & decode 如果不兼容 Pulsar 客户端呢? Kafka Producer & Kafka Consumer Pulsar Producer & Pulsar Consumer Kafka Producer & Pulsar Consumer Pulsar Producer & Kafka Consumer ⽣产的时候,直接将 MemoryRecords 内部的 ByteBu er 写⼊ bookie 即可。 其实还额外给 Metadata 加了个 property 消费的时候呢? entry.format=kafka ff
22 .Message encode & decode ManagedCursor 简单粗暴的做法:继续⽤ MemoryRecordsBuilder 把所有 Entry 拼起来。 实际发现开销还是⽐较⼤,追根溯源看到 appendWithO set 其实会对每条消息重新计算校验和。 ff
23 .Message encode & decode appendWithO set 简化版 Entry 1 Entry 2 Entry N Fill baseOffset Fill batchLength Bytes 1 Bytes 2 Bytes N Final bytes 若不想兼容 Pulsar 和 Kafka 客户端互操作,可以添加如下配置: entryFormat=kafka ff
24 .性能测试(WIP) 16 分区,batch.size=1KB,batch.ms=1,200k * 1KB msg/s。 https://github.com/BewareMyPower/openmessaging-benchmark/commits/bewaremypower/deploy-kop
25 .性能测试(WIP) • HandleProduceRequest:PRODUCE 请求的处理开始,到这次请求所有消息全部成功写⼊ bookie。 • ProduceEncode:对 Kafka 消息编码的时间。 • MessageQueuedLatency:从每个分区的消息排队开始,到准备异步发送的时间。 • MessagePublish:单个分区的消息从异步发送开始,到成功写⼊ bookie 的时间。
26 .KoP Authentication Before 2.8.0 KoP 对 authentication 的⽀持仅限于 SASL/PLAIN 机制,它基于 Pulsar 的 JSON Web Token 认证,在 broker 的基本配置之外,只需要额外配置 ⽤户端则需要输⼊ namespace 和 token 作为 JAAS 配置的⽤户名和密码。
27 .KoP Authentication ⽀持 OAuth 2.0 KoP 2.8.0 ⽀持 OAuth 2.0 进⾏认证,也就是 SASL/OAUTHBEARER 机制。 +--------+ +--------------- | |--(A)- Authorization Request ->| Resource | | | Owner | |<-(B)-- Authorization Grant ---| | | +--------------- | | | +--------------- | |--(C)-- Authorization Grant -->| Authorization | Client | | Server | |<-(D)----- Access Token -------| | | +--------------- | | | +--------------- | |--(E)----- Access Token ------>| Resource | | | Server | |<-(F)--- Protected Resource ---| +--------+ +--------------- | | + | | | + + | | | + + | | | +
28 .KoP Authentication ⽀持 OAuth 2.0 类似 Kafka,KoP 也需要在 broker 端配置 Server Callback Handler ⽤于 token 验证: • kopOauth2AuthenticateCallbackHandler :handler 类 • kopOauth2Con gFile:配置⽂件路径 KoP 提供了⼀种实现类,它基于 Pulsar broker 配置的 AutnticationProvider 进⾏验证, 因此配置⽂件中仅需配置 auth.validate.method=<method>。 fi
29 .KoP Authentication ⽀持 OAuth 2.0 对于 Kafka 客户端,KoP 提供了⼀种 Login Callback Handler 实现。 Kafka Java 客户端 OAuth 2.0 认证 Pulsar Java 客户端 OAuth 2.0 认证