[Track 2, Session 06 - Tzu-Li (Gordon) Tai (戴资力)] State Schema Evolution in Apache Flink Streaming Applications
1 . State Schema Evolution for Apache Flink® Applications. Company: data Artisans. Title: Software Engineer. Speaker: Tzu-Li (Gordon) Tai
2 . Speaker
• Software Engineer @ data Artisans
• PMC Member @ Apache Flink
• Flink connectors (Apache Kafka, AWS Kinesis, Elasticsearch ...)
• Upgradability of stateful Flink applications (e.g. State Schema Evolution)
3 . About data Artisans: Original Creators of Apache Flink®. data Artisans PLATFORM: Real-Time Stream Processing, Enterprise Ready Apache Flink
4 . Agenda
1. Evolving Stateful Flink Streaming Applications
2. Schema Evolution for Flink Built-in Types
3. Implementing Custom State Serializers
5 . Evolving Stateful Flink Streaming Applications
6 . Anatomy of a Flink stream job upgrade. [Diagram: user code, local state backend, persisted savepoint.] User code performs local reads / writes that manipulate state in the local state backend.
7 . Anatomy of a Flink stream job upgrade. [Diagram, continued.] On savepoint, the local state is persisted to DFS.
8 . Anatomy of a Flink stream job upgrade. [Diagram, continued.] The application (user code) is upgraded.
9 . Anatomy of a Flink stream job upgrade. [Diagram, continued.] State is restored from the persisted savepoint to the state backends.
10 . Anatomy of a Flink stream job upgrade. [Diagram, continued.] The upgraded user code continues to access state.
11 . Schema Evolution for Built-In Types
12 . State registration with built-in serialization
ValueStateDescriptor<MyStateType> desc =
    new ValueStateDescriptor<>("my-value-state", MyStateType.class);
ValueState<MyStateType> state = getRuntimeContext().getState(desc);
13 . State registration with built-in serialization
ValueStateDescriptor<MyStateType> desc =
    new ValueStateDescriptor<>(
        "my-value-state",
        MyStateType.class  // type information for state
    );
ValueState<MyStateType> state = getRuntimeContext().getState(desc);
14 . State registration with built-in serialization
ValueStateDescriptor<MyStateType> desc =
    new ValueStateDescriptor<>(
        "my-value-state",
        MyStateType.class  // type information for state
    );
ValueState<MyStateType> state = getRuntimeContext().getState(desc);
• Flink infers information about the type and creates a serializer for it
  ● Primitive types: IntSerializer, DoubleSerializer, LongArraySerializer, etc.
  ● Tuples: TupleSerializer
  ● POJOs / Scala case classes: PojoSerializer, CaseClassSerializer
  ● Apache Avro types: AvroSerializer
  ● Fallback is Kryo: KryoSerializer
15 . Evolving state schema for Apache Avro types
• Can evolve schema according to Avro specifications*
• Can swap between GenericRecord and code generated SpecificRecords
• Cannot change namespace of generated SpecificRecord classes
*Avro specifications: http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
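Two of the resolution rules in the Avro specification linked above can be illustrated without the Avro library. The following is a plain-Java sketch, not Avro's or Flink's API; the class, method, and field names (SchemaResolutionSketch, resolve, userId, legacyFlag, country) are hypothetical. It models only this much: a field declared by the reader schema but missing from the written record takes the reader's default, and a written field that the reader schema does not declare is ignored. It assumes every reader field has a default, which real Avro schemas do not require.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of Avro-style schema resolution (hypothetical names, no Avro dependency).
class SchemaResolutionSketch {

    /**
     * Resolves a record written with an older schema against the reader's schema.
     * readerDefaults maps each reader field name to its default value (assumption:
     * every reader field has one).
     */
    static Map<String, Object> resolve(Map<String, Object> written,
                                       Map<String, Object> readerDefaults) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            String name = field.getKey();
            // Field known to the writer: keep the written value; otherwise use the reader's default.
            result.put(name, written.containsKey(name) ? written.get(name) : field.getValue());
        }
        // Written fields that the reader does not declare are never copied, so they are dropped.
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> v1Record = new LinkedHashMap<>();
        v1Record.put("userId", 42L);
        v1Record.put("legacyFlag", true);   // removed in the newer schema

        Map<String, Object> v2Defaults = new LinkedHashMap<>();
        v2Defaults.put("userId", 0L);
        v2Defaults.put("country", "unknown"); // added in the newer schema

        System.out.println(resolve(v1Record, v2Defaults));
    }
}
```

With Avro types as Flink state, this resolution is done by Avro itself; the sketch only shows why adding a field with a default or removing a field is a compatible change.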
16 . Status quo of schema evolution support
• Avro types are the only built-in types that support schema evolution (as of Flink 1.7)
• More is planned for 1.8+: POJOs, Scala case classes, Rows (for Flink Tables)
• Avoid using Kryo if you want evolvable schema for state
17 .Implementing Custom State Serializers
18 . State registration with custom serializers
ValueStateDescriptor<MyStateType> desc =
    new ValueStateDescriptor<>(
        "my-value-state",
        new MyStateTypeSerializer()
    );
class MyStateTypeSerializer extends TypeSerializer<MyStateType> { … }
ValueState<MyStateType> state = getRuntimeContext().getState(desc);
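The slide elides the body of MyStateTypeSerializer. As a rough illustration of what a hand-written binary format involves, here is a standalone sketch that deliberately does not extend Flink's TypeSerializer (which has further abstract methods not shown on the slide). The fields of MyStateType (count, label) and the class MyStateTypeSerializerSketch are assumptions made for this example only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical state type; the slides do not show the fields of MyStateType.
class MyStateType {
    final long count;
    final String label;

    MyStateType(long count, String label) {
        this.count = count;
        this.label = label;
    }
}

// Standalone stand-in for a custom serializer. It does NOT extend Flink's TypeSerializer;
// it only mirrors the idea of a fixed, hand-written field layout.
class MyStateTypeSerializerSketch {

    // Writes the fields in a fixed order: this order IS the schema of the state bytes.
    void serialize(MyStateType value, DataOutputStream out) throws IOException {
        out.writeLong(value.count);
        out.writeUTF(value.label);
    }

    // Reads the fields back in exactly the order they were written.
    MyStateType deserialize(DataInputStream in) throws IOException {
        long count = in.readLong();
        String label = in.readUTF();
        return new MyStateType(count, label);
    }

    // Convenience round trip through a byte array, for demonstration.
    static MyStateType roundTrip(MyStateType value) {
        MyStateTypeSerializerSketch serializer = new MyStateTypeSerializerSketch();
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                serializer.serialize(value, out);
            }
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return serializer.deserialize(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the field order and widths are fixed in code, any change to them (for example adding a field) produces a new serialization format, which is why the following slides treat the serializer and the state schema together.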
19 . State Schema and Serialization
• The terms data schema and serialization format are interchangeable here
• Evolving state's data schema requires evolving the state's serializer
• Depending on serialization behaviour of state backends (heap vs. off-heap), state migration may be required
20 . State Serialization for Heap Backends. [Diagram: user code, local state backend, persisted savepoint.] User code registers the state with: ValueStateDescriptor<MyStateType> desc = new ValueStateDescriptor<>("my-value-state", new SerializerV1()); The local state backend holds Key 1 to Key 5 as objects; the persisted savepoint is still empty.
21 . State Serialization for Heap Backends. [Diagram, continued.] On savepoint, the state for Key 1 to Key 5 is serialized by the V1 serializer and persisted as V1 bytes.
22 . State Serialization for Heap Backends. [Diagram, continued.] The upgraded user code now registers the state with new SerializerV2(). The local state backend is empty; the persisted savepoint holds V1 bytes for Key 1 to Key 5.
23 . State Serialization for Heap Backends. [Diagram, continued.] Key 1 to Key 5 are loaded back into the local state backend as objects. This requires the V1 serializer for restore.
24 . State Serialization for Heap Backends. [Diagram, continued.] After restore, the local state backend holds Key 1 to Key 5 as objects; the persisted savepoint still holds V1 bytes.
25 . State Serialization for Heap Backends. [Diagram, continued.] On the next savepoint, the state is serialized by the V2 serializer.
26 . State Serialization for Heap Backends
• Serialization happens on restore + snapshot: lazy serialization, eager deserialization
• By nature, restoring + snapshotting state is already a state migration process
• Requires a written form of the previous serializer in the snapshot
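The restore + snapshot cycle summarized above can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not Flink's state backend or snapshot format: the names HeapBackendSketch, Serializer, Snapshot, and forVersion are hypothetical, the state value is a simple counter, and the "written form of the previous serializer" is reduced to a version number stored in the snapshot.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of heap-backend snapshot/restore (hypothetical names, no Flink dependency).
class HeapBackendSketch {

    // Hypothetical serializer abstraction; its version is recorded in every snapshot.
    interface Serializer {
        int version();
        byte[] serialize(long value);
        long deserialize(byte[] bytes);
    }

    // V1 stores the counter as a 4-byte int.
    static final class SerializerV1 implements Serializer {
        public int version() { return 1; }
        public byte[] serialize(long value) { return ByteBuffer.allocate(4).putInt((int) value).array(); }
        public long deserialize(byte[] bytes) { return ByteBuffer.wrap(bytes).getInt(); }
    }

    // V2 widens the format to an 8-byte long.
    static final class SerializerV2 implements Serializer {
        public int version() { return 2; }
        public byte[] serialize(long value) { return ByteBuffer.allocate(8).putLong(value).array(); }
        public long deserialize(byte[] bytes) { return ByteBuffer.wrap(bytes).getLong(); }
    }

    // A snapshot records which serializer version wrote its bytes.
    static final class Snapshot {
        final int serializerVersion;
        final Map<String, byte[]> bytesByKey;

        Snapshot(int serializerVersion, Map<String, byte[]> bytesByKey) {
            this.serializerVersion = serializerVersion;
            this.bytesByKey = bytesByKey;
        }
    }

    // Stand-in for reading the previous serializer back from the snapshot.
    static Serializer forVersion(int version) {
        switch (version) {
            case 1: return new SerializerV1();
            case 2: return new SerializerV2();
            default: throw new IllegalArgumentException("unknown serializer version " + version);
        }
    }

    // Lazy serialization: objects on the heap become bytes only when a snapshot is taken.
    static Snapshot snapshot(Map<String, Long> heapState, Serializer current) {
        Map<String, byte[]> bytes = new HashMap<>();
        for (Map.Entry<String, Long> entry : heapState.entrySet()) {
            bytes.put(entry.getKey(), current.serialize(entry.getValue()));
        }
        return new Snapshot(current.version(), bytes);
    }

    // Eager deserialization: every entry is turned back into an object on restore,
    // using the serializer recorded in the snapshot, not the newly registered one.
    static Map<String, Long> restore(Snapshot snapshot) {
        Serializer previous = forVersion(snapshot.serializerVersion);
        Map<String, Long> heapState = new HashMap<>();
        for (Map.Entry<String, byte[]> entry : snapshot.bytesByKey.entrySet()) {
            heapState.put(entry.getKey(), previous.deserialize(entry.getValue()));
        }
        return heapState;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        state.put("Key 1", 10L);

        Snapshot written = snapshot(state, new SerializerV1());   // savepoint before the upgrade
        Map<String, Long> restored = restore(written);            // restore needs the V1 serializer
        Snapshot migrated = snapshot(restored, new SerializerV2()); // next savepoint uses V2

        System.out.println("version " + migrated.serializerVersion
            + ", bytes per key " + migrated.bytesByKey.get("Key 1").length);
    }
}
```

In this model no explicit migration step exists: the state is in the new format simply because it passed through objects between the restore and the next snapshot, which is the point the slide makes about heap backends.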
27 . State Serialization for Out-of-Core Backends. [Diagram: user code, local state backend, persisted savepoint.] User code registers the state with: ValueStateDescriptor<MyStateType> desc = new ValueStateDescriptor<>("my-value-state", new SerializerV1()); The local state backend already holds Key 1 to Key 5 as serialized V1 bytes; the persisted savepoint is still empty.
28 . State Serialization for Out-of-Core Backends. [Diagram, continued.] On savepoint, the V1 bytes for Key 1 to Key 5 are copied to the persisted savepoint by file transfer.
29 . State Serialization for Out-of-Core Backends. [Diagram, continued.] The upgraded user code now registers the state with new SerializerV2(). The local state backend is empty; the persisted savepoint holds V1 bytes for Key 1 to Key 5.