TGIP-CN-012: Pulsar Schema


1. Getting Started

    public class User {
        String name;
        int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    Producer<byte[]> producer = client.newProducer()
        .topic(topic)
        .create();
    User user = new User("Tom", 28);
    // serialize the `user` by yourself
    byte[] message = …;
    producer.send(message);

2. Parse Exceptions
• The field you're looking for doesn't exist anymore
• The type of a field has changed
  • e.g. what used to be a `String` is now an `Integer`

3. “I am starting to think that `schemaless` just means your schema is scattered randomly throughout your code. It is almost impossible to troubleshoot anything non-trivial because there are endless assumptions, but few explicit requirements”
– Robert Kluin (@robertkluin), May 29, 2018

4. Introducing Schema

5. Schema
• Define how to serialize and deserialize data
• Define how to evolve your data format
• Handle backward compatibility
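In the Java client, the serialize/deserialize part of this contract is carried by a `Schema<T>` object through its `encode` and `decode` methods; evolution and compatibility are enforced by the broker and are not shown here. A minimal sketch, assuming the `pulsar-client` library is on the classpath (no broker is needed for this part):

```java
import org.apache.pulsar.client.api.Schema;

public class SchemaRoundTrip {
    // Serialize with the schema, then deserialize the bytes back into a value.
    static String roundTrip(String value) {
        byte[] bytes = Schema.STRING.encode(value); // String -> UTF-8 bytes
        return Schema.STRING.decode(bytes);         // bytes -> String
    }

    public static void main(String[] args) {
        byte[] bytes = Schema.STRING.encode("hello");
        System.out.println(bytes.length);           // 5 bytes of UTF-8
        System.out.println(roundTrip("hello"));
    }
}
```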

6. Schema Info
• `type`: the type of the schema.
• `schema`: the schema data. Its format is specific to the schema type implementation.
• `properties`: the properties associated with the schema. Application-specific data.
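As a rough illustration, the three parts of the `SchemaInfo` generated for a POJO can be read back directly. This sketch assumes the `pulsar-client` library on the classpath; the `User` class is a hypothetical example, and the exact Avro definition and properties printed may differ between client versions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public class SchemaInfoDemo {
    // Hypothetical POJO: public fields and a no-arg constructor suit reflection-based schemas.
    public static class User {
        public String name;
        public int age;
        public User() {}
    }

    static SchemaInfo userSchemaInfo() {
        return Schema.AVRO(User.class).getSchemaInfo();
    }

    public static void main(String[] args) {
        SchemaInfo info = userSchemaInfo();
        SchemaType type = info.getType();                 // `type`
        byte[] schemaData = info.getSchema();             // `schema`: an Avro definition in JSON here
        Map<String, String> props = info.getProperties(); // `properties`
        System.out.println(type);
        System.out.println(new String(schemaData, StandardCharsets.UTF_8));
        System.out.println(props);
    }
}
```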

7. Schema Types
• Primitive Types
• Complex Types

8. Primitive Types
• `BOOLEAN`: a binary value
• `INT8`: 8-bit signed integer
• `INT16`: 16-bit signed integer
• `INT32`: 32-bit signed integer
• `INT64`: 64-bit signed integer
• `FLOAT`: single-precision (32-bit) IEEE 754 floating-point number
• `DOUBLE`: double-precision (64-bit) IEEE 754 floating-point number
• `BYTES`: sequence of 8-bit unsigned bytes
• `STRING`: Unicode character sequence
• `TIMESTAMP` (`DATE`, `TIME`): a logical type that represents a specific instant in time with millisecond precision. It stores the number of milliseconds since `January 1, 1970, 00:00:00 GMT` as an `INT64` value.
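Because a `TIMESTAMP` is stored as an `INT64` count of milliseconds since the epoch, the conversion can be shown with `java.time` alone. This is a plain-Java sketch of the encoding described above, not Pulsar code; the class and method names are hypothetical.

```java
import java.time.Instant;

public class TimestampEncoding {
    // TIMESTAMP -> INT64: milliseconds since 1970-01-01T00:00:00Z (GMT).
    static long toInt64(Instant instant) {
        return instant.toEpochMilli();
    }

    // INT64 -> TIMESTAMP: rebuild the instant from the millisecond count.
    static Instant fromInt64(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2018-05-29T00:00:00Z");
        long encoded = toInt64(t);
        System.out.println(encoded);            // 1527552000000
        System.out.println(fromInt64(encoded)); // 2018-05-29T00:00:00Z
    }
}
```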

9. Primitive Types - Example
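A sketch of primitive schemas in use, assuming the `pulsar-client` library on the classpath. `encodedSizes()` checks that encoded payloads match the bit widths listed for the primitive types, and `produce()` shows a typed producer built on `Schema.STRING`. The broker URL `pulsar://localhost:6650` and the topic `my-topic` are assumptions; `produce()` needs a running broker, so `main` only prints the sizes.

```java
import java.util.Arrays;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class PrimitiveSchemas {
    // Encoded sizes line up with the bit widths of each primitive type.
    static int[] encodedSizes() {
        return new int[] {
            Schema.INT8.encode((byte) 1).length,   // 1 byte
            Schema.INT16.encode((short) 1).length, // 2 bytes
            Schema.INT32.encode(1).length,         // 4 bytes
            Schema.INT64.encode(1L).length,        // 8 bytes
            Schema.FLOAT.encode(1.0f).length,      // 4 bytes
            Schema.DOUBLE.encode(1.0).length,      // 8 bytes
            Schema.BOOL.encode(true).length,       // 1 byte
        };
    }

    // Typed producer: the client serializes the String, no manual byte[] handling.
    // Assumes a broker at pulsar://localhost:6650 and a hypothetical topic "my-topic".
    static void produce() throws PulsarClientException {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic")
                .create()) {
            producer.send("Hello Pulsar");
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encodedSizes()));
    }
}
```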

10. Complex Types
• Key-Value
• Struct
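A key-value schema pairs one schema for the key with one for the value. A sketch with `Schema.KeyValue`, assuming the `pulsar-client` library on the classpath and the default inline encoding, where key and value are packed into the same payload:

```java
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;

public class KeyValueSchemaDemo {
    // Key encoded with STRING, value encoded with INT32, combined into one payload.
    static final Schema<KeyValue<String, Integer>> KV_SCHEMA =
            Schema.KeyValue(Schema.STRING, Schema.INT32);

    static KeyValue<String, Integer> roundTrip(String key, int value) {
        byte[] bytes = KV_SCHEMA.encode(new KeyValue<>(key, value));
        return KV_SCHEMA.decode(bytes);
    }

    public static void main(String[] args) {
        KeyValue<String, Integer> kv = roundTrip("Tom", 28);
        System.out.println(kv.getKey() + " -> " + kv.getValue());
    }
}
```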

11. Struct Types
• Supported types: AVRO / JSON / PROTOBUF
• Schema definition: AVRO
• Two approaches
  • Static: the struct is predefined, as a POJO or as classes generated by Avro/Protobuf
  • Generic: the struct is unknown or not predefined

12. Static Schema
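With a static schema, the struct is a Java class known at compile time and the client derives the schema from it. A sketch assuming the `pulsar-client` library on the classpath; the `User` POJO, the broker URL, and the `users` topic are hypothetical. `roundTrip` runs without a broker, while `produce` needs one. Compared with the `byte[]` producer in Getting Started, the manual serialization step disappears.

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class StaticSchemaDemo {
    // Predefined struct: a POJO with public fields and a no-arg constructor.
    public static class User {
        public String name;
        public int age;
        public User() {}
        public User(String name, int age) { this.name = name; this.age = age; }
    }

    // Schema derived from the class; Schema.JSON(User.class) would use JSON instead.
    static final Schema<User> USER_SCHEMA = Schema.AVRO(User.class);

    static User roundTrip(User user) {
        return USER_SCHEMA.decode(USER_SCHEMA.encode(user));
    }

    // Assumes a broker at pulsar://localhost:6650 and a hypothetical topic "users".
    static void produce(User user) throws PulsarClientException {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build();
             Producer<User> producer = client.newProducer(USER_SCHEMA)
                .topic("users").create()) {
            producer.send(user); // the client serializes the POJO with USER_SCHEMA
        }
    }

    public static void main(String[] args) {
        User copy = roundTrip(new User("Tom", 28));
        System.out.println(copy.name + ", " + copy.age);
    }
}
```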

13. Generic Schema
• `GenericSchemaBuilder`: Build a generic schema
• `GenericRecordBuilder`: Build a generic record
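In the Java client, defining a struct at runtime appears to go through `SchemaBuilder.record(...)`, which returns a `RecordSchemaBuilder`; treating that as the "generic schema builder" named above is an assumption. A sketch, assuming the `pulsar-client` library on the classpath, that describes a hypothetical `User` struct with no Java class behind it:

```java
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public class GenericSchemaInfoDemo {
    // Describe the struct field by field at runtime.
    static SchemaInfo buildUserSchemaInfo() {
        RecordSchemaBuilder record = SchemaBuilder.record("User");
        record.field("name").type(SchemaType.STRING);
        record.field("age").type(SchemaType.INT32);
        return record.build(SchemaType.AVRO); // records will be encoded as Avro
    }

    public static void main(String[] args) {
        SchemaInfo info = buildUserSchemaInfo();
        System.out.println(info.getType());
        System.out.println(new String(info.getSchema(), StandardCharsets.UTF_8));
    }
}
```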

14. Generic Schema - Example
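A sketch of filling in a record through the `GenericRecordBuilder` returned by `newRecordBuilder()` and sending it with a producer typed on the generic schema. It assumes the `pulsar-client` library on the classpath; the field names, broker URL, and topic name are hypothetical, and only `produce()` needs a running broker.

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.common.schema.SchemaType;

public class GenericRecordDemo {
    // Runtime-defined "User" struct wrapped as a generic schema.
    static GenericSchema<GenericRecord> userSchema() {
        RecordSchemaBuilder builder = SchemaBuilder.record("User");
        builder.field("name").type(SchemaType.STRING);
        builder.field("age").type(SchemaType.INT32);
        return Schema.generic(builder.build(SchemaType.AVRO));
    }

    // Populate a record field by field.
    static GenericRecord newUser(GenericSchema<GenericRecord> schema, String name, int age) {
        return schema.newRecordBuilder()
                .set("name", name)
                .set("age", age)
                .build();
    }

    // Assumes a broker at pulsar://localhost:6650 and a hypothetical topic "users-generic".
    static void produce() throws PulsarClientException {
        GenericSchema<GenericRecord> schema = userSchema();
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build();
             Producer<GenericRecord> producer = client.newProducer(schema)
                .topic("users-generic").create()) {
            producer.send(newUser(schema, "Tom", 28));
        }
    }

    public static void main(String[] args) {
        GenericRecord user = newUser(userSchema(), "Tom", 28);
        System.out.println(user.getField("name") + ", " + user.getField("age"));
    }
}
```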

15. Auto Schema
• AUTO_PRODUCE
  • Producers validate bytes according to the schema in the topic
• AUTO_CONSUME
  • Consumers deserialize messages into `GenericRecord`
  • Schema is unknown in advance
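A sketch of both auto schemas, assuming the `pulsar-client` library on the classpath, a running broker, and a hypothetical topic that already has a schema registered. With `Schema.AUTO_PRODUCE_BYTES()` the payload must already be valid for that schema (for example, JSON bytes for a JSON schema); with `Schema.AUTO_CONSUME()` each message arrives as a `GenericRecord` whose fields are discovered at runtime. The subscription name is hypothetical.

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;

public class AutoSchemaDemo {
    // AUTO_PRODUCE: send raw bytes; they are validated against the topic's existing schema.
    static void autoProduce(PulsarClient client, String topic, byte[] payload)
            throws PulsarClientException {
        try (Producer<byte[]> producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
                .topic(topic)
                .create()) {
            producer.send(payload); // expected to fail if payload doesn't match the schema
        }
    }

    // AUTO_CONSUME: the consumer fetches the topic's schema and decodes into GenericRecord.
    static void autoConsumeOne(PulsarClient client, String topic) throws PulsarClientException {
        try (Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                .topic(topic)
                .subscriptionName("auto-consume-sub") // hypothetical subscription name
                .subscribe()) {
            Message<GenericRecord> msg = consumer.receive();
            GenericRecord record = msg.getValue();
            for (Field field : record.getFields()) {   // fields discovered at runtime
                System.out.println(field.getName() + " = " + record.getField(field));
            }
            consumer.acknowledge(msg);
        }
    }
}
```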

16. How Schema Works
1. `Schema.AVRO(User.class)` generates a `SchemaInfo`.
2. `newProducer` connects to the broker and sends the schema info.
3. The broker receives the schema info:
   1. If the topic doesn't have a schema, the broker creates the schema.
   2. If the topic already has a schema, the broker verifies that the schema is compatible with the existing schemas:
      1. If it is compatible and is a new schema, the broker generates a new schema version.
      2. If it is not compatible, the broker fails the producer.
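The broker-side steps above can be modeled with a toy in-memory registry. This is a hypothetical sketch, not Pulsar's implementation: schemas are plain strings, the compatibility rule is passed in as a `BiPredicate`, only the latest version is checked, and an already registered schema simply reuses its version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical, simplified model of the broker's schema handling; not Pulsar's classes.
public class SchemaRegistrySketch {
    // topic -> registered schema definitions; the list index is the schema version
    private final Map<String, List<String>> schemas = new HashMap<>();
    // stand-in for a real compatibility checker: (latest, candidate) -> compatible?
    private final BiPredicate<String, String> isCompatible;

    public SchemaRegistrySketch(BiPredicate<String, String> isCompatible) {
        this.isCompatible = isCompatible;
    }

    /** Returns the schema version for the producer, or throws if the schema is incompatible. */
    public int register(String topic, String schema) {
        List<String> versions = schemas.computeIfAbsent(topic, t -> new ArrayList<>());
        if (versions.isEmpty()) {                   // topic has no schema: create it
            versions.add(schema);
            return 0;
        }
        int existing = versions.indexOf(schema);
        if (existing >= 0) {                        // schema already known: reuse its version
            return existing;
        }
        String latest = versions.get(versions.size() - 1);
        if (!isCompatible.test(latest, schema)) {   // incompatible: fail the producer
            throw new IllegalStateException("incompatible schema for topic " + topic);
        }
        versions.add(schema);                       // compatible and new: next version
        return versions.size() - 1;
    }
}
```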

17. Schema Evolution

18. Schema Compatibility Check
• Schema compatibility checker
  • One checker per schema type
  • Configured by `schemaRegistryCompatibilityCheckers`
• Only AVRO and JSON support schema evolution for now
  • All other schema types don't allow schema evolution
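To make "one checker per schema type" concrete, here is a hypothetical plain-Java sketch; it does not reproduce Pulsar's checker classes or how `schemaRegistryCompatibilityCheckers` is loaded. A schema is reduced to its set of field names, checkers are looked up by type, a type with no checker cannot evolve, and the "keep every existing field" rule is only a placeholder for real Avro/JSON checks.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of "one compatibility checker per schema type"; not Pulsar's classes.
public class CheckerRegistrySketch {
    public enum Type { AVRO, JSON, PROTOBUF, STRING, INT32 }

    // A "schema" is reduced to its set of field names for this illustration.
    public interface Checker {
        boolean isCompatible(Set<String> oldFields, Set<String> newFields);
    }

    private final Map<Type, Checker> checkers = new EnumMap<>(Type.class);

    public CheckerRegistrySketch() {
        // Placeholder rule standing in for real Avro/JSON checks: keep every existing field.
        Checker keepExistingFields = (oldFields, newFields) -> newFields.containsAll(oldFields);
        checkers.put(Type.AVRO, keepExistingFields);
        checkers.put(Type.JSON, keepExistingFields);
    }

    public boolean canEvolve(Type type, Set<String> oldFields, Set<String> newFields) {
        if (oldFields.equals(newFields)) {
            return true;                  // same schema: no evolution involved
        }
        Checker checker = checkers.get(type);
        if (checker == null) {
            return false;                 // no checker registered: evolution not allowed
        }
        return checker.isCompatible(oldFields, newFields);
    }
}
```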

19. Compatibility Check Strategy

| Strategy | Changes allowed | Schemas checked against | Upgrade first |
|---|---|---|---|
| ALWAYS_INCOMPATIBLE | All changes are disabled | All previous versions | None |
| ALWAYS_COMPATIBLE | All changes are allowed | Latest version | Depends |
| BACKWARD | Delete fields; add optional fields | Latest version | Consumers |
| FORWARD | Add fields; delete optional fields | Latest version | Producers |
| FULL | Modify optional fields | Latest version | Any order |
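One way to read the strategy table above is as a lookup from (strategy, kind of change) to allowed or rejected, plus who upgrades first. A hypothetical plain-Java sketch of that reading, interpreting "modify optional fields" under FULL as adding or deleting optional fields; it models the table, not Pulsar's checker code.

```java
// Hypothetical encoding of the compatibility strategy table; not Pulsar's implementation.
public class CompatibilityTable {
    public enum Strategy { ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL }

    public enum Change { ADD_FIELD, ADD_OPTIONAL_FIELD, DELETE_FIELD, DELETE_OPTIONAL_FIELD }

    public static boolean allowed(Strategy strategy, Change change) {
        switch (strategy) {
            case ALWAYS_INCOMPATIBLE:
                return false;                         // every change rejected
            case ALWAYS_COMPATIBLE:
                return true;                          // every change accepted
            case BACKWARD:                            // new schema must read old data
                return change != Change.ADD_FIELD;    // delete any field, add only optional ones
            case FORWARD:                             // old schema must read new data
                return change != Change.DELETE_FIELD; // add any field, delete only optional ones
            case FULL:                                // both directions at once
                return change == Change.ADD_OPTIONAL_FIELD
                        || change == Change.DELETE_OPTIONAL_FIELD;
            default:
                throw new IllegalArgumentException("unknown strategy " + strategy);
        }
    }

    // The "Upgrade first" column.
    public static String upgradeFirst(Strategy strategy) {
        switch (strategy) {
            case BACKWARD: return "consumers";
            case FORWARD: return "producers";
            case FULL: return "any order";
            case ALWAYS_COMPATIBLE: return "depends";
            default: return "none";
        }
    }
}
```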

20. Order of Upgrading Clients
• BACKWARD: upgrade all consumers before you start producing new events
• FORWARD:
  • Upgrade all producers to the new schema
  • Make sure the data produced using old schemas is no longer available to consumers
  • Then upgrade producers and consumers independently
• FULL: upgrade producers and consumers independently
• ALWAYS_COMPATIBLE: be cautious about when to upgrade clients

21. Managing Schemas

22. Schema REST API
• Upload schema
• Get schema: the latest version or a specific version
• Delete schema
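A sketch of calling these endpoints with the JDK's `java.net.http` client (Java 11+). The path layout `/admin/v2/schemas/{tenant}/{namespace}/{topic}/schema[/{version}]`, the admin URL `http://localhost:8080`, and the topic `public/default/users` are assumptions to verify against the admin REST documentation for your Pulsar version; `main` needs a running broker.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SchemaRestSketch {
    // Assumed layout: /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema[/{version}]
    static URI schemaUri(String adminUrl, String tenant, String namespace,
                         String topic, Long version) {
        String path = String.format("/admin/v2/schemas/%s/%s/%s/schema", tenant, namespace, topic);
        if (version != null) {
            path += "/" + version; // a specific version instead of the latest
        }
        return URI.create(adminUrl + path);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpClient http = HttpClient.newHttpClient();
        // Get the latest schema; upload (POST) and delete (DELETE) target the same unversioned URI.
        URI latest = schemaUri("http://localhost:8080", "public", "default", "users", null);
        HttpResponse<String> response = http.send(
                HttpRequest.newBuilder(latest).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```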

23. Query Streams Using Schema
• Presto
• Hive
• Flink SQL
• Spark SQL

24. Pulsar Presto SQL

25. Pulsar Presto SQL
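A sketch of running a Pulsar SQL query from Java over JDBC. It assumes the Presto JDBC driver on the classpath, a Pulsar SQL worker at `localhost:8081`, topics exposed as tables under catalog `pulsar` with schema `<tenant>/<namespace>`, and a hypothetical `users` topic whose schema has `name` and `age` fields; the user name passed to the driver is arbitrary.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class PulsarSqlSketch {
    // Topics appear as tables: catalog "pulsar", schema "<tenant>/<namespace>", table = topic.
    static String selectAll(String tenant, String namespace, String topic, int limit) {
        return String.format("SELECT * FROM pulsar.\"%s/%s\".\"%s\" LIMIT %d",
                tenant, namespace, topic, limit);
    }

    public static void main(String[] args) throws SQLException {
        // Assumed endpoint of the Pulsar SQL (Presto) worker.
        String url = "jdbc:presto://localhost:8081/pulsar";
        try (Connection conn = DriverManager.getConnection(url, "pulsar-user", null);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(selectAll("public", "default", "users", 10))) {
            while (rs.next()) {
                // Columns come from the topic's schema fields.
                System.out.println(rs.getString("name") + ", " + rs.getInt("age"));
            }
        }
    }
}
```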

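StreamNative is an open-source infrastructure software company that builds a next-generation streaming data platform around Apache Pulsar and Apache BookKeeper. Guided by two beliefs, that event streaming is the future cornerstone of big data and that open source is the future of infrastructure software, it focuses on building the open-source ecosystem and community and is committed to cutting-edge technology.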