[Session 3, Talk 05 - Yu Li & Yun Tang] Two New Types of State Backends in Flink

Two New Types of State Backends in Flink

1. Two New Types of State Backends in Flink (Introduction of Two New StateBackends)
Speakers:
Ø Yu Li (李钰), Alibaba Senior Expert, Apache HBase PMC — liyu@apache.org
Ø Yun Tang (唐云), Alibaba Senior Software Engineer, Apache Flink Contributor — chaganty@alibaba-inc.com

2. Outline
Ø Review of Flink State and StateBackends
Ø A Heap Keyed StateBackend that Never Full GCs
Ø Decoupling Storage and Compute of State Management in Stream Computing

3. Review of Flink State
State sits between the input stream and the output stream and holds data such as event patterns, ML models and historic data. It is classified along two axes: Keyed vs. Operator state, and Raw vs. Managed state. (A minimal keyed-state example follows.)
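For readers unfamiliar with the API, below is a minimal sketch of Flink managed keyed state: a RichFlatMapFunction keeps one ValueState per key, and the configured state backend decides where that state actually lives. The class and field names are illustrative.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts events per key using managed keyed state.
public class CountPerKey extends RichFlatMapFunction<Long, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        // The descriptor registers the state with whichever backend is configured.
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Long value, Collector<Long> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        out.collect(updated);
    }
}
```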

4. Review of Flink State Backends
HeapKeyedStateBackend:
Ø State lives as Java objects on the heap
Ø Data is de/serialized only during snapshot and restore
Ø Highest performance
Ø Memory overhead of the object representation
Ø State size limited by available heap memory
Ø Affected by GC overheads
Ø Currently no incremental checkpoints
RocksDBKeyedStateBackend:
Ø State lives as serialized bytes in off-heap memory and on local disk
Ø Data is de/serialized on every read and update
Ø Lower performance (roughly an order of magnitude slower)
Ø Relatively low overhead of representation
Ø State size limited by available local disk space
Ø Not affected by GC overheads
Ø Naturally supports incremental checkpoints
(A configuration sketch for choosing between the two follows.)
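For reference, a minimal sketch of selecting either backend with the Flink 1.x (pre-1.13) API; the checkpoint path is a placeholder and the RocksDB backend additionally requires the flink-statebackend-rocksdb dependency.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendChoice {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Heap-based keyed state: objects on the JVM heap, snapshots written to a file system.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Or RocksDB-based keyed state: serialized bytes off-heap / on local disk,
        // with incremental checkpoints enabled via the second constructor argument.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
    }
}
```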

5. A HEAP KEYED STATEBACKEND THAT NEVER FULL GCs

6. The Hesitation When Choosing Backends
Ø Why use a sledgehammer to crack a nut? Consider what the workload actually calls for before picking a backend.

7. Key Tech: Heap Status Monitor
Ø Do heap accounting for each key group (KG), similar to Lucene's RamUsageEstimator
Ø Get heap usage, GC count and GC pause time at a fixed interval via the Java MXBeans
Ø Check whether heap usage exceeds the configured threshold after an old-generation GC
Ø Check whether GC pause time exceeds the configured threshold
(A minimal monitoring sketch follows.)
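A minimal sketch of what such a monitor can look like on top of the standard Java MXBeans; the class name and the threshold values are assumptions for illustration, not the backend's actual implementation.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

public class HeapStatusMonitor {
    private static final double HEAP_USAGE_THRESHOLD = 0.85; // assumed threshold
    private static final long GC_PAUSE_THRESHOLD_MS = 1000L; // assumed cumulative-pause threshold

    private final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();

    /** Intended to be called at a fixed interval, e.g. from a scheduled executor. */
    public boolean shouldSpill() {
        MemoryUsage heap = memoryBean.getHeapMemoryUsage();
        double usedRatio = (double) heap.getUsed() / heap.getMax();

        long totalGcTimeMs = 0L;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // The real monitor also tracks the collection count per collector.
            totalGcTimeMs += gc.getCollectionTime();
        }
        // Spill when the heap stays too full or cumulative GC pauses grow too long.
        return usedRatio > HEAP_USAGE_THRESHOLD || totalGcTimeMs > GC_PAUSE_THRESHOLD_MS;
    }
}
```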

8. Key Tech: Data Spill and Load Policy
Ø Besides heap accounting, record and compute the request rate for each KG
Ø Compute a weighted average over the normalized KG size and request rate
Ø Spill policy: prefer KGs with higher heap occupancy and lower request rate
Ø Load policy: prefer KGs with higher request rate and lower heap occupancy
(A weighting sketch follows.)
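A sketch of the weighted scoring this policy implies; the concrete weights, normalization and class names are assumptions for illustration only.

```java
import java.util.Comparator;
import java.util.List;

public class SpillLoadPolicy {
    static class KeyGroupStats {
        final int keyGroupIndex;
        final long heapBytes;      // estimated heap occupancy of this key group
        final double requestRate;  // recent requests per second against this key group

        KeyGroupStats(int keyGroupIndex, long heapBytes, double requestRate) {
            this.keyGroupIndex = keyGroupIndex;
            this.heapBytes = heapBytes;
            this.requestRate = requestRate;
        }
    }

    /** Higher score = better spill candidate (large and cold); the lowest score is the best load candidate. */
    static double spillScore(KeyGroupStats kg, long maxHeapBytes, double maxRequestRate,
                             double sizeWeight, double rateWeight) {
        double normalizedSize = maxHeapBytes == 0 ? 0 : (double) kg.heapBytes / maxHeapBytes;
        double normalizedRate = maxRequestRate == 0 ? 0 : kg.requestRate / maxRequestRate;
        return sizeWeight * normalizedSize - rateWeight * normalizedRate;
    }

    static KeyGroupStats pickSpillCandidate(List<KeyGroupStats> stats) {
        long maxBytes = stats.stream().mapToLong(s -> s.heapBytes).max().orElse(0L);
        double maxRate = stats.stream().mapToDouble(s -> s.requestRate).max().orElse(0.0);
        return stats.stream()
                .max(Comparator.comparingDouble(s -> spillScore(s, maxBytes, maxRate, 0.5, 0.5)))
                .orElse(null);
    }
}
```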

9. Key Tech: On-Disk Data Structure and Incremental Checkpoint Support
On-disk data structure:
Ø A compacted SkipList structure for data on disk
Ø Use a delta chain instead of update-in-place to support copy-on-write
Ø Reference: the Nitro paper from VLDB
Incremental checkpoint support:
Ø Add and record a snapshot version
Ø Save the version with each value and preserve the data a snapshot needs through the delta chain
Ø Check the delta chain for stale versions and clean them up during put or snapshot
(A delta-chain sketch follows.)
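A simplified sketch of a copy-on-write value carrying a delta chain of snapshot versions, to illustrate the idea; the real structure is a compacted skip list and all names here are illustrative.

```java
// One value of one key, versioned per snapshot. Updates never happen in place: the old
// version is kept on the chain until no live snapshot can still read it.
public class VersionedValue<V> {
    final V value;
    final long version;          // snapshot version at which this value was written
    VersionedValue<V> older;     // delta chain: previous versions still needed by snapshots

    VersionedValue(V value, long version, VersionedValue<V> older) {
        this.value = value;
        this.version = version;
        this.older = older;
    }

    /** Copy-on-write update: prepend a new head and keep the old version reachable. */
    VersionedValue<V> update(V newValue, long currentVersion) {
        return new VersionedValue<>(newValue, currentVersion, this);
    }

    /** Cleanup during put or snapshot: drop entries no ongoing snapshot can still see. */
    void prune(long oldestSnapshotVersion) {
        VersionedValue<V> node = this;
        while (node.older != null) {
            if (node.version <= oldestSnapshotVersion) {
                // 'node' is already visible to the oldest live snapshot; anything older is unreachable.
                node.older = null;
                return;
            }
            node = node.older;
        }
    }
}
```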

10. Performance Comparison and Upstreaming Plan
Ø WordCount job, state size … MB, tested with PCIe SSD

Backend                    QPS (K/s)   Note
Heap (all in memory)       400         -Xmx=10GB
RocksDB (all in memory)    100         Cache=10GB
Heap (memory + disk)       160         -Xmx=3GB (spill ratio=57%)
RocksDB (memory + disk)    100         Cache=3GB

Ø Limitation: higher CPU cost during checkpoint because of the scan
Ø Will start upstreaming (contributing back to the community) soon

11. A SOLUTION TO DECOUPLE STORAGE AND COMPUTE OF STATE MANAGEMENT IN STREAM COMPUTING

12. Background: The Compute-Storage Coupled Architecture
Data locality was important when the network was the bottleneck, but things have already changed:
Year     Network
2003     100 Mbps
2018     10 Gbps+

13. Background: In the cloud-computing era, coupled compute and storage no longer fits the needs of elastic computing; decoupling the storage resource is necessary.
The trend to decouple compute and storage:
Ø Decoupling makes hardware budgeting easier
Ø Decoupling enables better resource utilization
Ø Decoupling enables better power saving

14. Background: Current State Management in Apache Flink (taking the RocksDB StateBackend as an example)

15. Overview: The Storage-Compute Decoupled Architecture (taking the Niagara* StateBackend as an example)

16. Implementation — Advantages of Decoupling Storage and Compute
Ø Checkpoints are created and restored more smoothly, in seconds, against DFS

17. Implementation — Advantages of Decoupling Storage and Compute
Ø Checkpoints are created and restored more smoothly, in seconds, against DFS
Ø The backend can be restored in seconds when rescaling, avoiding the long IO and DB-scan time of the current Flink architecture

18. Implementation — Advantages of Decoupling Storage and Compute
Ø Checkpoints are created and restored more smoothly, in seconds, against DFS

19. Implementation — Constraints and Solutions of Decoupling Storage and Compute
Ø Avoid write latency in the decoupled storage architecture
Ø By means of the async architecture of Niagara on Pangu*, data is flushed to DFS asynchronously
(A generic async-flush sketch follows.)
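A generic illustration of the "acknowledge locally, flush to DFS in the background" pattern; this is not Niagara's or Pangu's actual API, only a sketch of the idea.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncDfsFlusher {
    private final BlockingQueue<byte[]> pending = new LinkedBlockingQueue<>();
    private final ExecutorService flusher = Executors.newSingleThreadExecutor();

    public AsyncDfsFlusher() {
        flusher.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                byte[] block = pending.take();   // wait for the next block to ship
                writeToDfs(block);               // remote write happens off the hot path
            }
            return null;
        });
    }

    /** Write path: the local write is acknowledged immediately, the DFS write is deferred. */
    public void write(byte[] block) {
        pending.offer(block);
    }

    private void writeToDfs(byte[] block) {
        // Placeholder for the actual DFS client call (e.g. an HDFS or Pangu write).
    }
}
```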

20. Implementation — Constraints and Solutions of Decoupling Storage and Compute
Ø Avoid read latency in the decoupled storage architecture
Ø By means of DADI*, a fast local second-level cache for the LSM-like DB, read latency can be kept close to that of the traditional integrated architecture
(A read-through cache sketch follows.)
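A generic read-through LRU cache sketch illustrating how a local second-level cache hides remote-read latency; this is not DADI's actual interface, and the names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LocalBlockCache {
    private final int capacity;
    private final Map<Long, byte[]> cache;

    public LocalBlockCache(int capacity) {
        this.capacity = capacity;
        // Access-ordered LRU map: the oldest entries are evicted when the cache is full.
        this.cache = new LinkedHashMap<Long, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, byte[]> eldest) {
                return size() > LocalBlockCache.this.capacity;
            }
        };
    }

    /** Serve from the local cache if possible; otherwise fetch from remote storage and cache it. */
    public synchronized byte[] read(long blockId) {
        return cache.computeIfAbsent(blockId, this::readFromRemote);
    }

    private byte[] readFromRemote(long blockId) {
        // Placeholder for the remote (DFS) read.
        return new byte[0];
    }
}
```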

21. Implementation — Constraints and Solutions of Decoupling Storage and Compute
Ø Clean useless directories/files on DFS eagerly
Ø Introduce a checkpoint trash cleaner mechanism in the JobManager
(A cleanup sketch follows.)
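A minimal sketch of eagerly removing stale checkpoint directories on DFS using Flink's FileSystem API; the "chk-" prefix matches Flink's checkpoint directory layout, but the retention rule and class name here are assumptions, not the actual cleaner.

```java
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.util.Set;

public class CheckpointTrashCleaner {

    /** Delete every chk-* directory under the checkpoint root that is no longer referenced. */
    public static void clean(Path checkpointRoot, Set<String> liveCheckpointDirs) throws Exception {
        FileSystem fs = checkpointRoot.getFileSystem();
        FileStatus[] entries = fs.listStatus(checkpointRoot);
        if (entries == null) {
            return;
        }
        for (FileStatus entry : entries) {
            String name = entry.getPath().getName();
            if (entry.isDir() && name.startsWith("chk-") && !liveCheckpointDirs.contains(name)) {
                fs.delete(entry.getPath(), true);   // recursive delete of the stale checkpoint
            }
        }
    }
}
```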

22. We are recruiting — WeChat | DingDing

23. THANKS