[Breakout Session 2, Talk 06: 戴资力 (Tzu-Li Tai)] Schema Evolution of State Data Structures in Apache Flink Streaming Applications

1. State Schema Evolution for Apache Flink® Applications. Speaker: Tzu-Li (Gordon) Tai, Software Engineer, data Artisans

2. Speaker
• Software Engineer @ data Artisans
• PMC Member @ Apache Flink
• Flink connectors (Apache Kafka, AWS Kinesis, Elasticsearch ...)
• Upgradability of stateful Flink applications (e.g. State Schema Evolution)

3. About data Artisans • Original creators of Apache Flink® • data Artisans Platform: enterprise-ready, real-time stream processing with Apache Flink

4. Agenda: (1) Evolving Stateful Flink Streaming Applications; (2) Schema Evolution for Flink Built-in Types; (3) Implementing Custom State Serializers

5. Evolving Stateful Flink Streaming Applications

6-10. Anatomy of a Flink stream job upgrade (animated diagram)
• User code performs local reads and writes that manipulate state held in the local state backend.
• On a savepoint, the state is persisted to a DFS (the persisted savepoint).
• The application is upgraded.
• State is restored from the savepoint into the state backends.
• The upgraded user code continues to access state.

11. Schema Evolution for Built-in Types

12-14. State registration with built-in serialization
    ValueStateDescriptor<MyStateType> desc = new ValueStateDescriptor<>(
        "my-value-state",
        MyStateType.class); // type information for state
    ValueState<MyStateType> state = getRuntimeContext().getState(desc);
• Flink infers information about the type and creates a serializer for it:
  ● Primitive types and primitive arrays: IntSerializer, DoubleSerializer, LongArraySerializer, etc.
  ● Tuples: TupleSerializer
  ● POJOs / Scala case classes: PojoSerializer, CaseClassSerializer
  ● Apache Avro types: AvroSerializer
  ● Fallback is Kryo: KryoSerializer
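To make the idea of type-driven serializer selection concrete, here is a minimal, self-contained Java sketch. It is not Flink's type extraction: SerializerChooser, its rules, and the fields of the nested MyStateType are hypothetical, and the returned strings only reuse serializer names from the slide. The POJO rule is deliberately simplified (public class, public no-arg constructor, only public non-static fields), whereas Flink also accepts private fields exposed through getters and setters.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of type-driven serializer selection.
// Names mirror the slide; the rules are assumptions, not Flink's TypeExtractor.
final class SerializerChooser {

    private static final Map<Class<?>, String> BASIC_TYPES = Map.of(
            Integer.class, "IntSerializer",
            Double.class, "DoubleSerializer",
            long[].class, "LongArraySerializer");

    static String choose(Class<?> type) {
        String basic = BASIC_TYPES.get(type);
        if (basic != null) {
            return basic;
        }
        if (isSimplePojo(type)) {
            return "PojoSerializer";
        }
        // Types the rules cannot analyze fall back to a generic serializer.
        return "KryoSerializer";
    }

    // Simplified POJO rule: public class, public no-arg constructor,
    // and every non-static field public (getter/setter pairs are ignored here).
    private static boolean isSimplePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (!Modifier.isStatic(mods) && !Modifier.isPublic(mods)) {
                return false;
            }
        }
        return true;
    }

    // Hypothetical state type that satisfies the simplified POJO rule.
    public static class MyStateType {
        public long count;
        public String label;
    }

    public static void main(String[] args) {
        System.out.println(choose(Integer.class));     // IntSerializer
        System.out.println(choose(MyStateType.class)); // PojoSerializer
        System.out.println(choose(Optional.class));    // KryoSerializer
    }
}
```

Optional falls through to the fallback because it has no public no-arg constructor, which is the kind of type a real framework would also hand to a generic serializer.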

15. Evolving state schema for Apache Avro types
• Can evolve the schema according to the Avro specification*
• Can swap between GenericRecord and code-generated SpecificRecords
• Cannot change the namespace of generated SpecificRecord classes
*Avro specification: http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
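As a rough illustration of the record resolution rules in the Avro specification, the sketch below reads a record written with an old (writer) schema through a new (reader) schema: fields are matched by name, writer-only fields are ignored, and reader-only fields take their default. It is a simplified, hypothetical model over plain maps, not the Avro library; aliases, type promotion, and nested schemas are omitted, and using null to mean "no default" is an assumption of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of Avro record schema resolution.
// "written" is a record as written with the old (writer) schema;
// "readerSchema" lists the fields of the new (reader) schema.
final class AvroStyleResolution {

    // A reader-schema field; defaultValue == null means "no default" (sketch assumption).
    record FieldSpec(String name, Object defaultValue) {}

    static Map<String, Object> resolve(Map<String, Object> written, List<FieldSpec> readerSchema) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (FieldSpec field : readerSchema) {
            if (written.containsKey(field.name())) {
                // Present in both schemas: keep the written value (matched by name).
                result.put(field.name(), written.get(field.name()));
            } else if (field.defaultValue() != null) {
                // Only in the reader schema: use the reader's default.
                result.put(field.name(), field.defaultValue());
            } else {
                throw new IllegalArgumentException("No value and no default for field: " + field.name());
            }
        }
        // Fields only in the writer schema are never copied, i.e. they are ignored.
        return result;
    }

    public static void main(String[] args) {
        // Old state: {userId, legacyFlag}; the new schema drops legacyFlag
        // and adds visitCount with default 0.
        Map<String, Object> oldRecord = Map.of("userId", 42L, "legacyFlag", true);
        List<FieldSpec> newSchema = List.of(new FieldSpec("userId", null), new FieldSpec("visitCount", 0L));
        System.out.println(resolve(oldRecord, newSchema)); // {userId=42, visitCount=0}
    }
}
```

Adding a field is only safe when the new field carries a default; otherwise old state cannot be resolved, which the sketch signals with an exception.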

16. Status quo of schema evolution support
• As of Flink 1.7, Avro types are the only built-in types that support schema evolution
• More is planned for Flink 1.8+: POJOs, Scala case classes, and Rows (for Flink Tables)
• Avoid Kryo (KryoSerializer) if you want an evolvable schema for state

17. Implementing Custom State Serializers

18. State registration with custom serializers
    ValueStateDescriptor<MyStateType> desc = new ValueStateDescriptor<>(
        "my-value-state",
        new MyStateTypeSerializer());
    ValueState<MyStateType> state = getRuntimeContext().getState(desc);

    class MyStateTypeSerializer extends TypeSerializer<MyStateType> { … }
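A custom serializer fully controls the byte layout of the state. The sketch below hints at what such a class does, using only the JDK: SimpleSerializer is a hypothetical stand-in for the serialize/deserialize pair a TypeSerializer provides (it is not Flink's interface, and a real TypeSerializer has more methods, such as snapshotConfiguration()), and the two fields of MyStateType are assumed for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a serializer abstraction; not Flink's TypeSerializer.
interface SimpleSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

// Hypothetical state type for illustration.
record MyStateType(long count, String label) {}

// Hand-written serializer: the byte layout (a long, then a UTF string) is
// fully controlled by this class, which is what later makes it evolvable.
final class MyStateTypeSerializer implements SimpleSerializer<MyStateType> {

    @Override
    public void serialize(MyStateType value, DataOutput out) throws IOException {
        out.writeLong(value.count());
        out.writeUTF(value.label());
    }

    @Override
    public MyStateType deserialize(DataInput in) throws IOException {
        long count = in.readLong();
        String label = in.readUTF();
        return new MyStateType(count, label);
    }

    // Convenience helper: serialize into a byte array.
    byte[] toBytes(MyStateType value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            serialize(value, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Convenience helper: deserialize from a byte array.
    MyStateType fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MyStateTypeSerializer serializer = new MyStateTypeSerializer();
        byte[] data = serializer.toBytes(new MyStateType(7L, "clicks"));
        System.out.println(serializer.fromBytes(data)); // MyStateType[count=7, label=clicks]
    }
}
```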

19. State schema and serialization
• Here, the terms "data schema" and "serialization format" are used interchangeably
• Evolving the state's data schema requires evolving the state's serializer
• Depending on how the state backend serializes state (heap vs. off-heap), state migration may be required
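Why a schema change forces a serializer change can be seen in a few lines of plain Java: bytes written with an old layout cannot simply be read with a new one. In this hypothetical sketch, V1 state holds only a count and the V2 layout appends a label; a V2 reader runs out of bytes on V1 data, while the V1 reader still works. The class name and both layouts are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration of a schema change: V1 stores a count, V2 adds a label.
final class SchemaChangeDemo {

    // Writes state with the old V1 layout: a single long.
    static byte[] writeV1(long count) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads with the V1 layout, matching how the bytes were written.
    static long readV1(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if the bytes parse with the V2 layout (long, then UTF label).
    static boolean readableAsV2(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readLong();
            in.readUTF(); // V1 bytes end before this field
            return true;
        } catch (EOFException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] v1Bytes = writeV1(5L);
        System.out.println(readV1(v1Bytes));       // 5
        System.out.println(readableAsV2(v1Bytes)); // false
    }
}
```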

20-25. State serialization for heap backends (animated diagram)
• The job registers its state with the V1 serializer:
    ValueStateDescriptor<MyStateType> desc = new ValueStateDescriptor<>(
        "my-value-state",
        new SerializerV1());
  The local heap state backend holds the state for Keys 1 to 5 as objects.
• On a savepoint, the V1 serializer serializes each key's state, so the persisted savepoint contains V1 bytes for Keys 1 to 5.
• The application is upgraded and now registers new SerializerV2(); the savepoint still contains V1 bytes.
• Restoring therefore requires the V1 serializer, which reads the V1 bytes back into objects in the heap backend.
• When the next savepoint is taken, the state is serialized by the V2 serializer.

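26. State serialization for heap backends
• Serialization happens only on restore and snapshot: deserialization is eager (all state is read into objects on restore), serialization is lazy (state is written out only when a snapshot is taken)
• By nature, restoring and then snapshotting state is therefore already a state migration process
• This requires a written form of the previous serializer in the snapshot, so that the old bytes can be read on restore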
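The points above can be sketched as a toy heap backend. Snapshots record an identifier of the serializer that wrote them (a hypothetical stand-in for the written form of the previous serializer), restore eagerly deserializes every entry with that recorded serializer, and the next snapshot writes everything with the current one, which is the migration that restore plus snapshot performs. All names, the "v1"/"v2" identifiers, and the byte layouts are assumptions; Flink's actual snapshot format differs.

```java
import java.io.*;
import java.util.*;

// Toy model of a heap state backend; hypothetical, not Flink's implementation.
final class HeapBackendSketch {

    // Hypothetical state object; V1 of the schema had no label.
    record MyState(long count, String label) {}

    interface StateSerializer {
        String id();
        void write(MyState value, DataOutput out) throws IOException;
        MyState read(DataInput in) throws IOException;
    }

    // V1 layout: count only (the label is not stored and reads back as "").
    static final StateSerializer V1 = new StateSerializer() {
        public String id() { return "v1"; }
        public void write(MyState value, DataOutput out) throws IOException { out.writeLong(value.count()); }
        public MyState read(DataInput in) throws IOException { return new MyState(in.readLong(), ""); }
    };

    // V2 layout: count, then label.
    static final StateSerializer V2 = new StateSerializer() {
        public String id() { return "v2"; }
        public void write(MyState value, DataOutput out) throws IOException {
            out.writeLong(value.count());
            out.writeUTF(value.label());
        }
        public MyState read(DataInput in) throws IOException { return new MyState(in.readLong(), in.readUTF()); }
    };

    // Maps the identifier stored in a snapshot back to a serializer.
    static final Map<String, StateSerializer> KNOWN = Map.of(V1.id(), V1, V2.id(), V2);

    // Lazy serialization: heap objects become bytes only when a snapshot is taken.
    static byte[] snapshot(Map<String, MyState> heap, StateSerializer current) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(current.id()); // which serializer wrote this snapshot
            out.writeInt(heap.size());
            for (Map.Entry<String, MyState> entry : heap.entrySet()) {
                out.writeUTF(entry.getKey());
                current.write(entry.getValue(), out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Eager deserialization: every entry is read back into an object on restore,
    // using the serializer recorded in the snapshot rather than the current one.
    static Map<String, MyState> restore(byte[] snapshot) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            String writerId = in.readUTF();
            StateSerializer previous = KNOWN.get(writerId);
            if (previous == null) {
                throw new IllegalStateException("Unknown serializer: " + writerId);
            }
            int size = in.readInt();
            Map<String, MyState> heap = new TreeMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                heap.put(key, previous.read(in));
            }
            return heap;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads only the serializer identifier at the start of a snapshot.
    static String writerId(byte[] snapshot) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, MyState> heap = new TreeMap<>();
        heap.put("key1", new MyState(3, ""));
        heap.put("key2", new MyState(5, ""));
        byte[] savepoint = snapshot(heap, V1);              // old job uses the V1 serializer
        Map<String, MyState> restored = restore(savepoint); // upgraded job reads V1 bytes with V1
        byte[] next = snapshot(restored, V2);               // next snapshot is written with V2
        System.out.println(writerId(savepoint) + " -> " + writerId(next)); // v1 -> v2
    }
}
```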

27-29. State serialization for out-of-core backends (animated diagram)
• The job registers its state with new SerializerV1(). The local out-of-core state backend keeps the state for Keys 1 to 5 as serialized V1 bytes rather than as objects.
• On a savepoint, these V1 bytes are transferred as files to the persisted savepoint.
• After the application is upgraded to new SerializerV2(), the persisted savepoint still contains V1 bytes for Keys 1 to 5.
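Because an out-of-core backend keeps state as serialized bytes, the V1 bytes left in the savepoint after the upgrade cannot simply be read with the V2 layout; this is the case where, as slide 19 notes, state migration may be required. The following hypothetical Java sketch shows one way such a migration could work: an explicit pass that reads each stored value with the V1 layout and rewrites it with the V2 layout. It does not describe how Flink or any particular backend implements migration, and the empty default label is an assumption.

```java
import java.io.*;
import java.util.Map;
import java.util.TreeMap;

// Toy model of an out-of-core backend: each key's value is stored as bytes.
// Hypothetical sketch; not how Flink's RocksDB backend is implemented.
final class OutOfCoreMigrationSketch {

    // Callback that writes one value's bytes; allows a checked IOException.
    interface BodyWriter {
        void write(DataOutputStream out) throws IOException;
    }

    private static byte[] bytesOf(BodyWriter body) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            body.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // V1 layout: count only.
    static byte[] writeV1(long count) {
        return bytesOf(out -> out.writeLong(count));
    }

    // V2 layout: count, then label.
    static byte[] writeV2(long count, String label) {
        return bytesOf(out -> {
            out.writeLong(count);
            out.writeUTF(label);
        });
    }

    static long readV1Count(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decodes V2 bytes into "count:label" for easy inspection.
    static String readV2(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long count = in.readLong();
            return count + ":" + in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Migration pass: read every stored value with the old layout and rewrite
    // it with the new one. The empty default label is a sketch assumption.
    static Map<String, byte[]> migrateV1ToV2(Map<String, byte[]> stored) {
        Map<String, byte[]> migrated = new TreeMap<>();
        for (Map.Entry<String, byte[]> entry : stored.entrySet()) {
            migrated.put(entry.getKey(), writeV2(readV1Count(entry.getValue()), ""));
        }
        return migrated;
    }

    public static void main(String[] args) {
        Map<String, byte[]> backend = new TreeMap<>();
        backend.put("key1", writeV1(3));
        backend.put("key2", writeV1(5));
        Map<String, byte[]> migrated = migrateV1ToV2(backend);
        System.out.println(readV2(migrated.get("key2"))); // 5:
    }
}
```

Unlike the heap case, where restore and snapshot rewrite the bytes as a side effect, a backend that never deserializes everything at once needs such a pass (or an equivalent lazy conversion) before the new serializer can read its data.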