Preview of Apache Pulsar 2.5.0

Pulsar 2.5.0 Feature Preview + Pulsar Schema + KOP


1. Preview of Apache Pulsar 2.5.0
   - Transactional streaming
   - Sticky consumer
   - Batch receiving
   - Namespace change events

2. Messaging semantics - 1

   // 1. At least once
   Message msg = consumer.receive();
   try {
       // processing
       consumer.acknowledge(msg);
   } catch (Exception e) {
       consumer.negativeAcknowledge(msg);
   }

   // 2. At most once
   Message msg = consumer.receive();
   try {
       // processing
   } catch (Exception e) {
       log.error("processing error", e);
   } finally {
       consumer.acknowledge(msg);
   }

   3. Exactly once?

3. Messaging semantics - 2: in practice, idempotent produce and idempotent consume are used more often.

4. Messaging semantics - 3: Effectively once
   - ledgerId + messageId -> sequenceId
   - Broker deduplication
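The broker-side deduplication idea above can be illustrated with a minimal sketch. The class and method names here are hypothetical, not Pulsar's internals: the broker keeps the highest sequence ID accepted per producer and drops any retry at or below it.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of broker-side deduplication (hypothetical names,
// not Pulsar's actual implementation).
class DedupBroker {
    // Highest sequence ID accepted so far, per producer name.
    private final Map<String, Long> lastSequenceId = new HashMap<>();

    /** Returns true if the message is new and accepted, false if it is a duplicate. */
    boolean accept(String producerName, long sequenceId) {
        long last = lastSequenceId.getOrDefault(producerName, -1L);
        if (sequenceId <= last) {
            return false; // retry of an already-persisted message: drop it
        }
        lastSequenceId.put(producerName, sequenceId);
        return true;
    }
}
```

Because the producer reuses the same sequence ID when it retries a send, the broker persists the message at most once even under at-least-once delivery.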

5. Messaging semantics - 4: Limitations of effectively once
   1. Only works when producing to one partition
   2. Only works when producing one message at a time
   3. Only works when consuming from one partition
   4. Consumers are required to store the message ID and state for restoring
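Limitation 4 means the application itself must remember which messages it has already applied. A minimal sketch of that idea (hypothetical names, not a Pulsar API): the consumer records processed message IDs and skips redeliveries.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of consumer-side idempotency: the application stores processed
// message IDs so a redelivery after a crash is applied only once.
// Hypothetical names; not a Pulsar API.
class IdempotentConsumer {
    private final Set<String> processed = new HashSet<>();
    private int sideEffects = 0; // stands in for real processing state

    void handle(String messageId) {
        if (!processed.add(messageId)) {
            return; // redelivery: already applied, skip
        }
        sideEffects++; // apply the effect exactly once per message ID
    }

    int sideEffects() { return sideEffects; }
}
```

In a real system this set (or the last processed message ID) would have to be persisted together with the processing state, which is exactly the burden the slide points out.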

6. Streaming processing - 1: Receive message A from Topic-1 and do some processing f(A). [Diagram: Topic-1 (A) -> f(A) -> Topic-2 (B), step 1]

7. Streaming processing - 2: Write the result message B to Topic-2. [Diagram: Topic-1 (A) -> f(A) -> Topic-2 (B), step 2]

8. Streaming processing - 3: Get the send response from Topic-2. [Diagram: Topic-1 (A) -> f(A) -> Topic-2 (B), step 3]
   How to handle a send-response timeout or a consumer/function crash?
   - Ack message A = at most once
   - Nack message A = at least once

9. Streaming processing - 4: Ack message A. [Diagram: Topic-1 (A) -> f(A) -> Topic-2 (B), step 4]
   How to handle an ack failure or a consumer/function crash?

10. Transactional streaming semantics (READ_COMMITTED)
    1. Atomic multi-topic publish and acknowledge
    2. A message is dispatched to only one consumer until its transaction aborts
    3. Only committed messages can be read by consumers

11. Transactional streaming demo

    Message<String> message = inputConsumer.receive();
    Transaction txn = client.newTransaction()
        .withTransactionTimeout(…)
        .build().get();
    CompletableFuture<MessageId> sendFuture1 =
        producer1.newMessage(txn).value("output-message-1").sendAsync();
    CompletableFuture<MessageId> sendFuture2 =
        producer2.newMessage(txn).value("output-message-2").sendAsync();
    inputConsumer.acknowledgeAsync(message.getMessageId(), txn);
    txn.commit().get();
    MessageId msgId1 = sendFuture1.get();
    MessageId msgId2 = sendFuture2.get();
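The READ_COMMITTED rule behind this demo can be modeled with a toy sketch (hypothetical names, not Pulsar's transaction buffer): messages sent inside a transaction stay invisible to consumers until the transaction commits, and disappear entirely on abort.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of READ_COMMITTED visibility. Hypothetical names;
// not Pulsar's actual transaction buffer implementation.
class TxnTopic {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    void sendInTxn(String msg) { pending.add(msg); }   // buffered, not yet visible

    void commit() { committed.addAll(pending); pending.clear(); }

    void abort() { pending.clear(); }                  // buffered messages vanish

    /** What a READ_COMMITTED consumer can see. */
    List<String> readable() { return new ArrayList<>(committed); }
}
```

Either both output messages (and the input ack) become visible together, or none do, which is the atomicity the demo relies on.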

12.Sticky consumer

13. Sticky consumer

    Consumer consumer1 = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Key_Shared)
        .keySharedPolicy(KeySharedPolicy.sticky()
            .ranges(Range.of(0, 32767)))
        .subscribe();

    Consumer consumer2 = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Key_Shared)
        .keySharedPolicy(KeySharedPolicy.sticky()
            .ranges(Range.of(32768, 65535)))
        .subscribe();
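The two ranges above split the 0–65535 key hash space in half. A rough sketch of the routing idea (the hash function here is a placeholder; Pulsar actually uses Murmur3 internally):

```java
// Sketch of how sticky ranges partition the key hash space (0..65535).
// Placeholder hash; Pulsar uses Murmur3 for key hashing.
class StickyRouter {
    static int slot(String key) {
        return Math.floorMod(key.hashCode(), 65536);
    }

    /** Returns 1 if the key falls in consumer1's range [0, 32767], else 2. */
    static int consumerFor(String key) {
        return slot(key) <= 32767 ? 1 : 2;
    }
}
```

Because the mapping from key to slot is deterministic, every message with the same key lands on the same consumer, which is what makes the consumer "sticky".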

14. Batch receiving messages

    Consumer consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .batchReceivePolicy(BatchReceivePolicy.builder()
            .maxNumMessages(100)
            .maxNumBytes(2 * 1024 * 1024)
            .timeout(1, TimeUnit.SECONDS)
            .build())
        .subscribe();
    Messages msgs = consumer.batchReceive();
    // process the batch
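The policy semantics can be modeled with a small sketch (a simplified model, not the client's implementation): a batch is returned as soon as any one of the limits (message count, total bytes, or timeout) is reached.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of BatchReceivePolicy: the batch completes when ANY
// limit trips. The timeout limit is not modeled here.
class BatchAssembler {
    private final int maxMessages;
    private final long maxBytes;
    private final List<String> batch = new ArrayList<>();
    private long bytes = 0;

    BatchAssembler(int maxMessages, long maxBytes) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
    }

    /** Adds a message; returns the completed batch when a limit trips, else null. */
    List<String> add(String msg) {
        batch.add(msg);
        bytes += msg.length();
        if (batch.size() >= maxMessages || bytes >= maxBytes) {
            List<String> done = new ArrayList<>(batch);
            batch.clear();
            bytes = 0;
            return done;
        }
        return null; // still accumulating
    }
}
```

With the slide's settings, `batchReceive()` returns up to 100 messages or 2 MiB, whichever comes first, or whatever has accumulated when the 1-second timeout fires.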

15. Namespace change events

    persistent://tenant/ns/__change_events

    class PulsarEvent {
        EventType eventType;
        ActionType actionType;
        TopicEvent topicEvent;
    }

16.Thanks Penghui Li

17. Pulsar Schema: Bo Cong (丛搏), messaging-system R&D engineer at Zhaopin, core contributor to Pulsar Schema and HDFS Offload

18. Schema evolution: data management can't escape the evolution of schemas.

19. Single-version schema: [Diagram: message 1, message 2, and message 3 all use schema version 1]

20. Multiple-version schemas: [Diagram: message 1 uses version 1, message 2 uses version 2, message 3 uses version 3]

21. Schema compatibility: "can read" means the consumer can deserialize data written with another schema version.

22. Compatibility strategy evolution
    BACKWARD: version 2 can read version 1, and version 1 can read version 0,
    but version 2 may not be able to read version 0.
    BACKWARD_TRANSITIVE: version 2 can read both version 1 and version 0.
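A rough sketch of a "can read" check in the spirit of Avro's resolution rules (simplified; the hypothetical checker below ignores type promotion and unions): a reader schema can read a writer schema if every reader field either exists in the writer schema or carries a default value.

```java
import java.util.Map;
import java.util.Set;

// Simplified "can read" check, inspired by (but much weaker than)
// Avro schema resolution. Hypothetical names.
class SchemaChecker {
    /** readerFields maps field name -> whether that field has a default value. */
    static boolean canRead(Map<String, Boolean> readerFields, Set<String> writerFields) {
        for (Map.Entry<String, Boolean> f : readerFields.entrySet()) {
            boolean hasDefault = f.getValue();
            if (!writerFields.contains(f.getKey()) && !hasDefault) {
                return false; // reader needs a field the writer never wrote
            }
        }
        return true;
    }
}
```

A transitive strategy simply applies this pairwise check against every older version instead of only the previous one.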

23. Evolution of the situation

    // Version 1
    class Person {
        @Nullable
        String name;
    }

    // Version 2 (can read Version 1)
    class Person {
        @Nullable
        @AvroDefault("\"Zhang San\"")
        String name;
    }

    // Version 3 (can read Version 2, can't read Version 1)
    class Person {
        String name;
    }

24. Compatibility check
    Producers and consumers have separate schema compatibility checkers.
    With isAllowAutoUpdateSchema = false, a producer or consumer can only
    connect if its schema already exists in the registry (the broker checks
    for existence instead of auto-registering a new version).

25. Upgrade order
    Different strategies require different upgrade orders:
    - BACKWARD / BACKWARD_TRANSITIVE: upgrade consumers first
    - FORWARD / FORWARD_TRANSITIVE: upgrade producers first
    - FULL / FULL_TRANSITIVE: any order

26. Producing messages with different schema versions

    Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
        .topic(topic)
        .create();
    Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
        .topic(topic)
        .subscriptionName("sub1")
        .subscribe();

    p.newMessage().value(data1).send();
    p.newMessage(Schema.AVRO(V2Data.class)).value(data2).send();
    p.newMessage(Schema.AVRO(V1Data.class)).value(data3).send();

    Message<V2Data> msg1 = c.receive();
    V2Data msg1Value = msg1.getValue();
    Message<V2Data> msg2 = c.receive();
    Message<V2Data> msg3 = c.receive();
    V2Data msg3Value = msg3.getValue();

27.Thanks Bo Cong

28. Kafka on Pulsar (KOP): Jia Zhai (翟佳)

29. What is Apache Pulsar? Flexible pub/sub messaging backed by durable log storage.

StreamNative is an open-source infrastructure software company building a next-generation streaming data platform around Apache Pulsar and Apache BookKeeper. Holding two convictions, that event streaming is the future cornerstone of big data and that open source is the future of infrastructure software, the company focuses on building the open-source ecosystem and community and is dedicated to advancing cutting-edge technology.