Preview of Apache Pulsar 2.5.0

Pulsar 2.5.0 feature preview + Pulsar Schema + KOP

1.Preview of Apache Pulsar 2.5.0
  Transactional streaming
  Sticky consumer
  Batch receiving
  Namespace change events

2.Messaging semantics - 1
  1. At least once
    Message msg = null;
    try {
        msg = consumer.receive();
        // processing
        consumer.acknowledge(msg);
    } catch (Exception e) {
        // msg is still null if receive() itself failed
        if (msg != null) {
            consumer.negativeAcknowledge(msg);
        }
    }
  2. At most once
    Message msg = null;
    try {
        msg = consumer.receive();
        // processing
    } catch (Exception e) {
        log.error("processing error", e);
    } finally {
        // acknowledge whether or not processing succeeded
        if (msg != null) {
            consumer.acknowledge(msg);
        }
    }
  3. Exactly once?

3.Messaging semantics - 2
  Idempotent produce and idempotent consume are used more often in practice.
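
As a rough illustration of idempotent consumption, the sketch below remembers the IDs of messages it has already applied, so a redelivery under at-least-once does not change the result twice. The class name IdempotentCounter and the use of plain string IDs are assumptions made for this example; it is a toy model and does not use the Pulsar client.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy idempotent consumer: applying the same message id twice has no extra effect. */
class IdempotentCounter {
    // Ids of messages whose effect has already been applied (hypothetical string ids).
    private final Set<String> processedIds = new HashSet<>();
    private long total = 0;

    /** Returns true if the message was applied, false if it was a redelivery. */
    boolean process(String messageId, long amount) {
        if (!processedIds.add(messageId)) {
            return false; // already seen: skip the side effect
        }
        total += amount;
        return true;
    }

    long total() {
        return total;
    }

    public static void main(String[] args) {
        IdempotentCounter counter = new IdempotentCounter();
        counter.process("m1", 10);
        counter.process("m2", 5);
        counter.process("m1", 10); // redelivered after a lost ack
        System.out.println(counter.total()); // prints 15
    }
}
```

In a real consumer the set of processed IDs would have to be stored durably together with the state it protects; here both live in memory only.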

4.Messaging semantics - 3
  Effectively once: ledgerId + messageId -> sequenceId + broker deduplication
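
The idea of broker deduplication by sequenceId can be sketched with a small in-memory model: the broker remembers the highest sequenceId it has stored per producer and drops anything at or below it. DedupBroker and its method names are hypothetical and exist only for this example; the actual broker logic is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of broker-side deduplication; not Pulsar's implementation. */
class DedupBroker {
    // Highest sequenceId stored so far, per producer name.
    private final Map<String, Long> lastSequenceId = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Returns true if the message was appended, false if it was dropped as a duplicate. */
    boolean publish(String producerName, long sequenceId, String payload) {
        long last = lastSequenceId.getOrDefault(producerName, -1L);
        if (sequenceId <= last) {
            return false; // a retry of something that is already stored
        }
        lastSequenceId.put(producerName, sequenceId);
        log.add(payload);
        return true;
    }

    List<String> storedMessages() {
        return log;
    }

    public static void main(String[] args) {
        DedupBroker broker = new DedupBroker();
        broker.publish("producer-1", 0, "A");
        broker.publish("producer-1", 1, "B");
        broker.publish("producer-1", 1, "B"); // retry after a lost send response
        System.out.println(broker.storedMessages()); // prints [A, B]
    }
}
```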

5.Messaging semantics - 4
  Limitations of effectively once:
  1. Only works when producing to one partition
  2. Only works when producing one message
  3. Only works when consuming from one partition
  4. Consumers are required to store the message ID and state for restoring

6. Streaming processing - 1
  1. Receive message A from Topic-1 and do some processing f(A).
  [Diagram: Topic-1 (A) -> f(A) -> (B) Topic-2, step 1 marked]

7. Streaming processing - 2
  2. Write the result message B to Topic-2.
  [Diagram: Topic-1 (A) -> f(A) -> (B) Topic-2, step 2 marked]

8. Streaming processing - 3
  3. Get the send response from Topic-2.
  [Diagram: Topic-1 (A) -> f(A) -> (B) Topic-2, step 3 marked]
  How to handle a send-response timeout or a consumer/function crash?
    Ack message A = at most once
    Nack message A = at least once

9.Streaming processing - 4
  4. Ack message A.
  [Diagram: Topic-1 (A) -> f(A) -> (B) Topic-2, step 4 marked]
  How to handle a failed ack or a consumer/function crash?

10.Transactional streaming semantics
  READ_COMMITTED
  1. Atomic multi-topic publish and acknowledge
  2. A message is dispatched to only one consumer until the transaction aborts
  3. Only committed messages can be read by consumers
  https://github.com/apache/pulsar/wiki/PIP-31%3A-Transaction-Support
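
To make the READ_COMMITTED behaviour concrete, here is a minimal in-memory sketch: messages sent inside a transaction are buffered and become visible on all topics together at commit, or on none of them after an abort. ToyTransaction is a hypothetical class for illustration only; it models just the publish side (no acknowledgements, no timeouts) and says nothing about how PIP-31 is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of atomic multi-topic publish with read-committed visibility. */
class ToyTransaction {
    // topic name -> messages that consumers are allowed to see
    private final Map<String, List<String>> committed;
    // topic name -> messages written in this transaction, not yet visible
    private final Map<String, List<String>> pending = new HashMap<>();
    private boolean finished = false;

    ToyTransaction(Map<String, List<String>> committed) {
        this.committed = committed;
    }

    void send(String topic, String message) {
        if (finished) {
            throw new IllegalStateException("transaction already finished");
        }
        pending.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
    }

    /** Makes every buffered message visible, on all topics at once. */
    void commit() {
        for (Map.Entry<String, List<String>> e : pending.entrySet()) {
            committed.computeIfAbsent(e.getKey(), t -> new ArrayList<>()).addAll(e.getValue());
        }
        pending.clear();
        finished = true;
    }

    /** Discards every buffered message; consumers never see them. */
    void abort() {
        pending.clear();
        finished = true;
    }

    public static void main(String[] args) {
        Map<String, List<String>> topics = new HashMap<>();
        ToyTransaction txn = new ToyTransaction(topics);
        txn.send("topic-2", "output-message-1");
        txn.send("topic-3", "output-message-2");
        System.out.println(topics.isEmpty()); // prints true: nothing visible before commit
        txn.commit();
        System.out.println(topics.get("topic-2")); // prints [output-message-1]
    }
}
```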

11.Transactional streaming demo
  Message<String> message = inputConsumer.receive();
  Transaction txn = client.newTransaction().withTransactionTimeout(…).build().get();
  CompletableFuture<MessageId> sendFuture1 =
      producer1.newMessage(txn).value("output-message-1").sendAsync();
  CompletableFuture<MessageId> sendFuture2 =
      producer2.newMessage(txn).value("output-message-2").sendAsync();
  inputConsumer.acknowledgeAsync(message.getMessageId(), txn);
  txn.commit().get();
  MessageId msgId1 = sendFuture1.get();
  MessageId msgId2 = sendFuture2.get();

12.Sticky consumer

13.Sticky consumer
  Consumer consumer1 = client.newConsumer()
      .topic("my-topic")
      .subscriptionName("my-subscription")
      .subscriptionType(SubscriptionType.Key_Shared)
      .keySharedPolicy(KeySharedPolicy.stickyHashRange()
          .ranges(Range.of(0, 32767)))
      .subscribe();
  Consumer consumer2 = client.newConsumer()
      .topic("my-topic")
      .subscriptionName("my-subscription")
      .subscriptionType(SubscriptionType.Key_Shared)
      .keySharedPolicy(KeySharedPolicy.stickyHashRange()
          .ranges(Range.of(32768, 65535)))
      .subscribe();
  https://github.com/apache/pulsar/wiki/PIP-34%3A-Add-new-subscribe-type-Key_shared
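
The routing idea behind fixed hash ranges can be sketched as follows: each key is mapped to a slot in 0..65535, and the consumer whose range contains that slot receives every message with that key. StickyRouter is a hypothetical class, and the use of String.hashCode() is a stand-in assumption for illustration, not necessarily the hash the broker applies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of routing keys to consumers by fixed hash ranges. */
class StickyRouter {
    static final int RANGE_SIZE = 65536;
    // consumer name -> {start, end}, both inclusive
    private final Map<String, int[]> ranges = new LinkedHashMap<>();

    void addConsumer(String name, int start, int end) {
        ranges.put(name, new int[] {start, end});
    }

    /** Stand-in hash for illustration only; an assumption, not the broker's hash function. */
    static int slot(String key) {
        return Math.floorMod(key.hashCode(), RANGE_SIZE);
    }

    /** Returns the consumer that owns the key's slot, or null if no range covers it. */
    String route(String key) {
        return routeSlot(slot(key));
    }

    String routeSlot(int slot) {
        for (Map.Entry<String, int[]> e : ranges.entrySet()) {
            int[] r = e.getValue();
            if (slot >= r[0] && slot <= r[1]) {
                return e.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        StickyRouter router = new StickyRouter();
        router.addConsumer("consumer1", 0, 32767);
        router.addConsumer("consumer2", 32768, 65535);
        System.out.println(router.routeSlot(40000)); // prints consumer2
        // The same key always lands on the same consumer.
        System.out.println(router.route("order-42").equals(router.route("order-42"))); // prints true
    }
}
```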

14.Batch receiving messages
  Consumer consumer = client.newConsumer()
      .topic("my-topic")
      .subscriptionName("my-subscription")
      .batchReceivePolicy(BatchReceivePolicy.builder()
          .maxNumMessages(100)
          .maxNumBytes(2 * 1024 * 1024)
          .timeout(1, TimeUnit.SECONDS)
          .build())
      .subscribe();
  Messages msgs = consumer.batchReceive();
  // do some batch processing
  https://github.com/apache/pulsar/wiki/PIP-38%3A-Batch-Receiving-Messages
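
How the message-count and byte limits of such a policy might bound a batch can be sketched with a toy queue. ToyBatchReceiver is hypothetical; the timeout is left out, and the rule of stopping before the byte limit would be exceeded is an assumption of this model rather than a statement about the client's exact behaviour.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of a batch receive bounded by message count and total bytes (no timeout). */
class ToyBatchReceiver {
    private final int maxNumMessages;
    private final long maxNumBytes;
    private final Queue<byte[]> incoming = new ArrayDeque<>();

    ToyBatchReceiver(int maxNumMessages, long maxNumBytes) {
        this.maxNumMessages = maxNumMessages;
        this.maxNumBytes = maxNumBytes;
    }

    void enqueue(byte[] payload) {
        incoming.add(payload);
    }

    /**
     * Drains one batch from the queue. Stops once maxNumMessages is reached or when
     * the next message would push the batch past maxNumBytes (assumed rule).
     * A batch always takes at least one message if any is queued.
     */
    List<byte[]> batchReceive() {
        List<byte[]> batch = new ArrayList<>();
        long bytes = 0;
        while (!incoming.isEmpty() && batch.size() < maxNumMessages) {
            byte[] next = incoming.peek();
            if (!batch.isEmpty() && bytes + next.length > maxNumBytes) {
                break;
            }
            batch.add(incoming.poll());
            bytes += next.length;
        }
        return batch;
    }

    public static void main(String[] args) {
        ToyBatchReceiver receiver = new ToyBatchReceiver(3, 1024);
        for (int i = 0; i < 5; i++) {
            receiver.enqueue(new byte[10]);
        }
        System.out.println(receiver.batchReceive().size()); // prints 3
        System.out.println(receiver.batchReceive().size()); // prints 2
    }
}
```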

15. Namespace change events
  persistent://tenant/ns/__change_events
  class PulsarEvent {
      EventType eventType;
      ActionType actionType;
      TopicEvent topicEvent;
  }
  https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events

16.Thanks Penghui Li

17.Pulsar Schema
  Bo Cong (丛搏)
  Messaging system R&D engineer at Zhaopin (智联招聘)
  Core contributor to Pulsar Schema and HDFS Offload

18.Schema Evolution
  Data management can't escape the evolution of schema.

19.Single version schema
  message 1, message 2, message 3; version 1

20.Multiple version schemas
  message 1, message 2, message 3; version 1, version 2, version 3

21.Schema compatibility can read = Deserialization

22.Compatibility strategy evolution
  BACKWARD
    version 2 can read version 1
    version 1 can read version 0
    version 2 may not be able to read version 0
  BACKWARD_TRANSITIVE
    version 2 can read version 1
    version 1 can read version 0
    version 2 can read version 0
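
The difference between the two strategies can be sketched with a deliberately simplified schema model: a schema is a set of field names, each with or without a default, and a reader can read a writer's data if every reader field that is missing from the writer has a default. ToySchema and this rule are assumptions for illustration; real Avro compatibility checks also consider types, unions and aliases.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy schema: field name -> whether the field has a default value. */
class ToySchema {
    final Map<String, Boolean> fields = new LinkedHashMap<>();

    ToySchema field(String name, boolean hasDefault) {
        fields.put(name, hasDefault);
        return this;
    }

    /** Simplified rule: fields missing from the writer must have a default in the reader. */
    boolean canRead(ToySchema writer) {
        for (Map.Entry<String, Boolean> f : fields.entrySet()) {
            if (!writer.fields.containsKey(f.getKey()) && !f.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** BACKWARD: the new schema only has to read the latest existing version. */
    static boolean backward(ToySchema candidate, List<ToySchema> history) {
        return history.isEmpty() || candidate.canRead(history.get(history.size() - 1));
    }

    /** BACKWARD_TRANSITIVE: the new schema has to read every existing version. */
    static boolean backwardTransitive(ToySchema candidate, List<ToySchema> history) {
        for (ToySchema old : history) {
            if (!candidate.canRead(old)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<ToySchema> history = new ArrayList<>();
        history.add(new ToySchema().field("name", false));                       // version 0
        history.add(new ToySchema().field("name", false).field("email", true));  // version 1
        ToySchema v2 = new ToySchema().field("name", false).field("email", false);
        System.out.println(backward(v2, history));           // prints true
        System.out.println(backwardTransitive(v2, history)); // prints false
    }
}
```

Here version 2 can read version 1 because version 1 already writes the email field, but it cannot read version 0, which is exactly the case BACKWARD allows and BACKWARD_TRANSITIVE rejects.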

23.Evolution of the situation
  Version 1
    class Person {
        @Nullable
        @AvroDefault("\"Zhang San\"")
        String name;
    }
  Version 2
    class Person {
        @Nullable
        String name;
    }
  Version 3
    class Person {
        String name;
    }
  [Diagram arrows: "Can read" between Version 2 and Version 1; "Can't read" and "Can read" between Version 3 and the versions above]

24.Compatibility check
  Separate schema compatibility checkers for producer and consumer
  isAllowAutoUpdateSchema = false
  [Diagram: Producer -> check if schema exists <- Consumer]

25.Upgrade way
  Different strategies call for different upgrade orders:
  BACKWARD, BACKWARD_TRANSITIVE: upgrade consumers first
  FORWARD, FORWARD_TRANSITIVE: upgrade producers first
  FULL, FULL_TRANSITIVE: any order

26.Produce Different Message
  Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
      .topic(topic).create();
  Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
      .topic(topic)
      .subscriptionName("sub1").subscribe();
  p.newMessage().value(data1).send();
  p.newMessage(Schema.AVRO(V2Data.class)).value(data2).send();
  p.newMessage(Schema.AVRO(V1Data.class)).value(data3).send();
  Message<V2Data> msg1 = c.receive();
  V2Data msg1Value = msg1.getValue();
  Message<V2Data> msg2 = c.receive();
  Message<V2Data> msg3 = c.receive();
  V2Data msg3Value = msg3.getValue();

27.Thanks Bo Cong

28.Kafka on Pulsar (KOP)
  Jia Zhai (翟佳)

29. What is Apache Pulsar? Flexible Pub/Sub Messaging backed by Durable log Storage