CarbonData是一种高性能大数据存储方案,支持快速过滤查找和即席OLAP分析,已在20+企业生产环境上部署应用,其中最大的单一集群数据规模达到几万亿。针对当前大数据领域分析场景需求各异而导致的存储冗余问题,业务驱动下的数据分析灵活性要求越来越高,CarbonData提供了一种新的融合数据存储方案,以一份数据同时支持多种应用场景,并通过多级索引、字典编码、预聚合、动态Partition、准实时数据查询等特性提升了IO扫描和计算性能,实现万亿数据分析秒级响应。

注脚

展开查看详情

1.Apache CarbonData技术原理及使用介绍

2.内容大纲 – 1. 项目背景 – 2. CarbonData 技术原理 – 3. CarbonData 使用介绍 – 4. What’s coming in 1.5

3.企业中包含多种数据应用,从商业智能、批处理到机器学习 Report & Dashboard OLAP & Ad-hoc Batch processing Machine learning Realtime Analytics Big Table Small table data Ex. CDR, transaction, Small table Web log,… Unstructured data

4. 来自数据的挑战 • Data Size 数据规模 • Single Table >10 B 单表大于100亿行 • Fast growing 快速增长 • Nested data structure for complex object 数据结构复杂 • Multi-dimensional 数据维度多 • Every record > 100 dimensions 分析的维度超过100 • Add new dimension occasionally 维度不断增长 • Billion level high cardinality 不同值范围在亿级别

5.来自应用的挑战 • Enterprise Integration 企业应用集成 • SQL 2003 Standard Syntax Multi-dimensional OLAP Query • BI integration, JDBC/ODBC • Flexible Query 灵活查询 无固定模式 • Any combination of dimensions • OLAP Vs Detail Record • Full scan Vs Small scan Full Scan Query Small Scan Query • Precise search & Fuzzy search

6.How to choose storage? 如何构建大数据统一存储平台?

7.How to choose storage 当前各种大数据方案分析 1. NoSQL Data(适合实时应用,不适合分析型应用) •只支持单列key value查询 <5ms •不支持标准SQL 2. MPP Data(适合中小规模数据分析) •Shared-nothing架构,并行计算 •不支持大集群 <100节点,没有容错,扩展能力有上 限,不能与大数据生态集成 3. Cube Data(适合BI类MOLAP应用) •预聚合,查询快 •但数据膨胀大,支持维度少,不支持查明细数据 4. Search Engine Data(适合文本类分析) •通过索引快速找到数据 •数据膨胀大2-4倍,不支持标准SQL 5. SQL on Hadoop •聚焦计算引擎的分布式扫描 •存储效率不高

8.架构师的苦恼:不同应用不同数据存储,如果让数据存储统一? Choice 1: Compromising Choice 2: Replicating of data 做出妥协,只满足部分应用 复制多份数据,满足所有应用 App1 App2 App3 App1 App2 App3 Loading Replication Data Data

9.CarbonData目标: 一份数据满足多种业务需求,与大数据生态无缝集成 Multi-dimensional OLAP Query CarbonData: Unified Storage Full Scan Query Small Scan Query 一份数据满足多种分析场景 详单过滤,海量数仓,数据集市,…

10.内容大纲 – 2. CarbonData 技术原理 – CarbonData File 文件格式 – CarbonData Table – DataMap 介绍

11.CarbonData设计思路 – 数据统一存储:一份数据支持多种业务场景,减少数据孤岛和冗余,通过数 据共享产生更大价值。 – 大集群:区别于以往的单机系统,用户希望新的大数据存储方案能应对日益 增多的数据,随时可以通过增加资源的方式横向扩展,无极扩容。 – 易集成:提供标准接口,新的大数据方案与企业已采购的工具和IT系统要能 无缝集成,支撑老业务快速迁移。与大数据生态软件能无缝集成。 – 高性能:数据分析要求越来越高效、实时。 – 开放生态:通过开源开放,让更多的客户和合作伙伴的数据连接在一起,发 挥更大的价值,与当前大数据生态无缝集成。

12.CarbonData File文件格式 包含索引的列式文件格式

13.CarbonData File文件格式 Carbon Data File File Header • 数据布局 Version • Block:一个HDFS文件 Schema • Blocklet:文件内的列存数据块,是最小的IO读取单元 • Column chunk: Blocklet内的列数据 Blocklet 1 Column 1 Chunk • Page:Column chunk内的数据页,是最小的解码单元 header Page1 Page2 … Column 2 Chunk … • 元数据信息 Column n Chunk • Header:Version,Schema … • Footer: Blocklet Offset, Index & 文件级统计信息 Blocklet N • 内置索引和统计信息 File Footer • Blocklet索引:B Tree start key, end key Blocklet Offset, Index & Stats • Blocklet级和Page级统计信息:min,max等

14.CarbonData File • 压缩编码 • RLE, 本地字典编码, 全局字典编码 • 自适应编码 • Snappy, Zstd • 数据类型 SMALLINT, INT/INTEGER, BIGINT, DOUBLE, DECIMAL TIMESTAMP, DATE STRING, CHAR, VARCHAR ARRAY, STRUCT, MAP BOOLEAN, BINARY

15.CarbonData Table 支持索引的、对CarbonData File的原子化读写操作

16.CarbonData Table • 支持Segment级的读写操作的数据一致性和原子性。 • 一次Load/Insert对应生成一个Segment • 每个Segment 包含数据和元数据: CarbonData File和Index文件 • 不同的Segment可以有不同的文件格式 • 流式入库的Streaming Segment采用了写优化的文件格式 • 历史数据的Batch Segment采用读优化的文件格式 • 将支持更多其他格式,例如: CSV, Parquet

17. CarbonData Table Layout Spark Driver Table Level Index Spark HDFS /table_name/fact/segment_id /table_name/meta Carbon File Carbon File Carbon File Carbon File Index File Dictionary Schema File File Data Data Data Data All Footer Dictionary Latest Footer Footer Footer Footer Map Schema

18.CarbonData DataMap 数据地图

19.DataMap执行模型 Query • 下推filter和projection到DataMap Spark Driver • DataMap可以是集中式或者分布 DataMap Execution 式的(避免大内存问题) Spark Driver (集中式, 分布式) • DataMap可以缓存在内存或者存 储在磁盘上 Executor Executor Executor • 主要包括Index DataMap和MV DataMap Carbon Data Carbon Data Carbon Data File Map File Map File Map Data Node Data Node Data Node

20.Index DataMap • 主索引 • B-Tree • MinMax • 二级索引 • B-Tree:适合一般OLAP多维分析 • BloomFilter:适合id类字段过滤,如手机号,终端ID,车牌等

21. MV DataMap • 物化视图可以进行预汇聚、Join等操作,用 数据入库时间换取查询时间 • 入库方式: • 立即入库:入库主表时自动入库MV • 延迟入库:用户手动操作 • 查询时会将查询语句转换为Modular Plan的 查询计划,将原SQL改写为针对MV的SQL • 当命中多个MV时,自动选取代价最小 的MV • 当前支持projection, filter, groupby, join, having等语法

22.内容大纲 – 3. CarbonData 使用介绍 – CarbonData File 读写 – CarbonData Table 操作 – DataMap 使用 – 流式入库介绍

23.CarbonData File读写 使用CarbonData SDK读写CarbonData文件

24.CarbonData File 读写 CarbonWriter writer = CarbonWriter.builder() .outputPath(path) .buildWriterForCSVInput(new Schema(fields)); writer.write(row); writer.close; CarbonReader reader = CarbonReader .builder(path, "_temp") .projection(strings) .build(); reader.readNextRow(); reader.close();

25.CarbonData Table操作 与SparkSQL深度集成

26.新建CarbonData表 CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type , ...)] STORED AS carbondata [TBLPROPERTIES (property_name=property_value, ...)] [LOCATION 'path'] 如何正确配置Table Properties,提升查询性能?

27.配置Table Properties • SORT_COLUMNS • 默认按所有维度列在schema中的顺序对数据做row排序 • 详单过滤:频繁使用的过滤条件列,越靠前的列过滤效果越好 • OLAP分析:查询使用的列,按基数由低到高排序,提升压缩率,降IO • COLUMN_META_CACHE • 默认会缓存所有列的Min/Max值 • 建议:查询中使用的过滤条件列,可以减少索引对内存的使用 • CACHE_LEVEL • Block:默认值,占用较少的内存,降低部分driver侧过滤性能 • Blocklet:更加精准的driver侧过滤,占用较多的内存

28.配置Table Properties • DICTIONARY_INCLUDE • 低基数String列,提升压缩率 • 延迟解码,可以提升Group by效率 • LOCAL_DICTIONARY • Blocklet级别的字典编码,可以节省存储空间,提升查询速度 • SORT_SCOPE • NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT

29.数据导入 使用SQL LOAD DATA [LOCAL] INPATH 'folder path' [OVERWRITE] • 符合事务的原子性,一致性 INTO TABLE tablename OPTIONS(...) • 支持多并发数据导入 INSERT INTO TABLE tablennme select ... FROM table1 • 支持并发导入和查询 使用Dataframe df.write .format(“carbondata") .options("tableName“, “t1")) .mode(SaveMode.Overwrite) .save()

30. 查询数据 使用SQL SELECT project_list FROM t1 • Same SQL syntax and DataFrame WHERE cond_list API as in SparkSQL GROUP BY columns ORDER BY columns • 支持并发导入,更新,合并和查 询。 使用Dataframe df = sparkSession.read .format(“carbondata”) .option(“tableName”, “t1”) .load(“path_to_carbon_file”) df.select(…).show

31.数据更新/删除 修改表1的一列 UPDATE table1 A phone, 70 60 SET (A.REVENUE)= (A.REVENUE – 10) car,100 WHERE A.PRODUCT = ‘phone’ phone, 30 20 使用表2的数据修改表1的两列 UPDATE table1 A SET (A.PRODUCT, A.REVENUE) = ( SELECT PRODUCT, REVENUE table1 table2 FROM table2 B WHERE B.CITY = A.CITY AND B.BROKER = A.BROKER) WHERE A.DATE BETWEEN ‘2017-01-01’ AND ‘2017-01-31’ 删除表1的部分记录 123, DELETE FROM table1 A abc WHERE A.CUSTOMERID = ‘123’ 456, jkd

32.Segment管理 • 展示t1表的segment列表 SHOW SEGMENTS FOR TABLE t1 LIMIT 100 • 按segment id 删除指定segment DELETE FROM TABLE t1 WHERE SEGMENT.ID IN (98, 97, 30) • 按segment加载时间删除segment DELETE FROM TABLE t1 WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06' • 指定需要查询的segment SET carbon.input.segments.default.t1 = 100, 99

33.数据合并 Minor合并 ALTER TABLE table_name COMPACT 'MINOR' Major合并 ALTER TABLE table_name COMPACT MAJOR' 自定义 ALTER TABLE table_name COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (2,3,4) 合并后清除SEGMENTS文件及目录 CLEAN FILES FOR TABLE carbon_table

34.分区表 新建分区表 CREATE TABLE sale(id string, quantity int ...) PARTITIONED BY (country string, state string) STORED AS carbondata 静态分区加载数据 LOAD DATA LOCAL INPATH ‘folder path’ INTO TABLE sale PARTITION (country = 'US', state = 'CA') INSERT INTO TABLE sale PARTITION (country = 'US', state = 'AL') SELECT <columns list excluding partition columns> FROM another_sale 动态分区加载数据 LOAD DATA LOCAL INPATH ‘folder path’ INTO TABLE INSERT INTO TABLE sale SELECT <columns list including partition columns> FROM another_sale

35.DataMap使用 加速CarbonData Table查询

36.新建DataMap CREATE DATAMAP [IF NOT EXISTS] datamap_name [ON TABLE main_table] • bloomfilter USING "datamap_provider“ [WITH DEFERRED REBUILD] • preaggregate DMPROPERTIES ('key'='value', ...) • mv AS SELECT statement 启用DataMap SET carbon.datamap.visible.dbName.tableName.dataMapName = true 禁用DataMap SET carbon.datamap.visible.dbName.tableName.dataMapName = false

37.BloomFilter DataMap CREATE DATAMAP [IF NOT EXISTS] datamap_name ON TABLE main_table USING 'bloomfilter’ DMPROPERTIES ( ‘INDEX_COLUMNS'='city, name’, • Index_columns: 索引的列 'BLOOM_SIZE'='640000’, • Bloom_size: 位数组大小 'BLOOM_FPP'='0.00001’, • Bloom_fpp: 期望的误判率 'BLOOM_COMPRESS'='true') • Bloom_compress: 是否压缩索引

38.Pre-aggregate DataMap CREATE DATAMAP agg_sales ON TABLE sales USING "preaggregate" AS SELECT country, sex, sum(quantity), avg(price) FROM sales GROUP BY country, sex • 支持的聚合函数: SUM,AVG,MAX,MIN,COUNT • 只支持单表的聚合操作

39.MV DataMap CREATE DATAMAP simple_agg_with_join USING 'mv’ AS SELECT id,address, sum(age) FROM mainTable INNER JOIN dimtable ON mainTable.name=dimtable.name GROUP BY id ,address

40.Streaming Ingestion 流式入库,准实时查询

41. 流式入库 Kafka • 混合格式 Query Stream • 近期数据:streaming segment采用写优 化的文件格式 Ingest • 历史数据: batch segment采用列存文件 SparkStream& SparSQL & CarbonData CarbonData Stream Ingest Query Process • 可以查询近期数据和历史数据 • 按streaming segment大小自动转 换为batch segment Index file Carbon Carbon Stream Columnar file Carbon • 支持预聚合 file Preagg file Auto Conversion Auto Aggregate CarbonTable

42.Stream SQL – 新建kafka source表 – 新建流式入库作业 CREATE TABLE source (sensor_Id string, …) CREATE STREAM stream ON TABLE sink STOREAD AS 'carbondata' STMPROPERTIES ( TBLPROPERTIES ( ‘'trigger'='ProcessingTime’, 'streaming'='source', 'interval'=‘10 seconds’) 'format'='kafka’, AS 'kafka.bootstrap.servers'='host1:port1,…’, SELECT * FROM source 'subscribe'='topic1') WHERE temperature > 30.0 – 新建carbon sink表 – 停止作业 CREATE TABLE sink (sensor_Id string, …) DROP STREAM stream STOREAD AS ‘carbondata’ TBLPROPERTIES (streaming=‘sink’)

43.总结:CarbonData的能力 – 数据管理: • 增量入库,建立索引:由用户权衡入库时间、索引粒度和查询性能 • 流式入库:可与Kafka, Spark, HBase集成,实现准实时分析 • 批量更新:支持快速更新事实表或维表,闲时做数据合并 – 查询: • 通过DataMap索引,减少IO:适合ad-hoc查询,任意维度组合查询场景 • 向量化处理,提升扫描性能:适合全表扫描、汇总分析场景 • 基于物化视图和CBO的查询优化:BI分析类场景加速 – 大规模: • 计算与存储分离:支持从GB到PB大规模数据,十万亿数据秒级响应 – 部署: • 与大数据生态无缝集成,利用云存储和Hadoop集群

44.• Apache Incubator Project since June, 2016 Compute • Apache releases • >10 stable releases • Coming release: 1.5.0, 2018-09 Storage • Contributors:

45.4. What’s coming in 1.5 – 1.5.0 – 1.5.1 – Support Spark File Format – OLAP on demand – Improve performance on S3 – MV – Spark 2.3 and Hadoop 3.1 – Incremental load – Complex datatype Enhancement – DataMap choosing – Stream format support min/max – Presto enhancement index

46.欢迎参与Apache CarbonData社区 • website: http://carbondata.apache.org • Code: https://github.com/apache/carbondata • JIRA: https://issues.apache.org/jira/browse/CARBONDATA • Mail list: dev@carbondata.apache.org, user@carbondata.apache.org • 欢迎在Maillist上提问,共同探讨和开发CarbonData 2.0新特性

user picture
CarbonData是一种高性能大数据融合存储方案,以一份数据同时支持多种应用场景,通过多级索引、字典编码、预聚合、动态Partition等特性提升了IO扫描和计算性能,已在30+企业生产环境上部署应用,其中最大的单一集群数据规模达到十万亿。

相关文档