CarbonData是首个由中国公司发起并捐献给Apache基金会的开源项目,于2017年4月正式成为Apache顶级项目,由华为开源并支持Hadoop的高性能列式存储文件格式,其目的是提供一种统一的数据存储方案,以一份数据同时支持大数据分析的多种应用场景,All In One,并通过多级索引、字典编码、列式存储等特性提升 I/O 扫描和计算性能,实现百亿数据级秒级响应。CarbonData里程碑版1.3于2018年2月正式发布,该版本包含了集成Spark 2.2.1,支持预聚合,流式准实时入库,支持标准Hive分区等几个重要特性,而即将发布的1.4版本更是包含了支持Lucene Index加强文本检索能力,支持Bloom Filter Index等重要特性,在All In One的道路上又迈进了一步。

恩爸发布于2018/09/12

注脚

展开查看详情

1.CarbonData 源码浅析一:Create Table 前言 一个偶然的机会,从某 Spark 微信群知道了 CarbonData,从断断续续地去了解,到测试 1.2 版本,再到实际应用 1.3 版本的流式入库,也一年有余,在这期间,得到了 CarbonData 社区的陈亮,李昆,蔡强等大牛的鼎力支持,自己也从认识 CarbonData 到应用 CarbonData, 再到参与社区的转变,感谢他们! 要把 CarbonData 用得好,姿势必须正确:),优化步骤就必不可少,熟悉源码的必要性就 不言而喻了,因此准备再进一步研究学习 CarbonData 源码,同时把学习中的一些点滴记录 下来。目前,暂定推出四篇博客,从 Create Table,Load Data(DataFrame.write),Select Data, 及结合流式入库等四个方面来浅析下 CarbonData 源码。 言归正传哈。 简介 CarbonData 是首个由中国公司发起并捐献给 Apache 基金会的开源项目,于 2017 年 4 月正式成为 Apache 顶级项目,由华为开源并支持 Hadoop 的高性能列式存储文件格式,其 目的是提供一种统一的数据存储方案,以一份数据同时支持大数据分析的多种应用场景, All In One,并通过多级索引、字典编码、列式存储等特性提升 I/O 扫描和计算性能,实现 百亿数据级秒级响应。CarbonData 里程碑版 1.3 于 2018 年 2 月正式发布,该版本包含了集 成 Spark 2.2.1,支持预聚合,流式准实时入库,支持标准 Hive 分区等几个重要特性,而即 将发布的 1.4 版本更是包含了支持 Lucene Index 加强文本检索能力,支持 Bloom Filter Index 等重要特性,在 All In One 的道路上又迈进了一步。 目前 CarbonData 与 Spark,Presto, Hive 等框架做了集成,其中与 Spark 的集成最深入, 提供了基于索引、预聚合、全局字典等更多查询优化手段来提升性能,也提供了数据更新、 删除、 增量入库等数据管理能力, 可以说是在 Spark 开源框架上针对数据仓库类应用的增强。 一个计算框架方面的 All In One,一个存储格式方面的 All In One(其实 CarbonData 的功能已 经远远超越了数据格式的范畴了) ,两者结合碰出的火花着实十分吸引人,这也是当初我会 想要使用 CarbonData 并深入了解它的最根本原因。 此系列就是基于 CarbonData 里程碑版 1.3 的源码进行浅析。 Create Table 浅析 建表语句兼容 SparkSQL,只是使用‘carbondata’这个 DataSource,并扩展了一些属性 来描述 Global Dictionary Columns,Sort Columns,Sort Scope 等:  DICTIONARY_INCLUDE:指定做全局字典的列,主要用途在于: 1) 压缩数据,String 类型转换为 Int 进行存储,并采用 RLE 算法进行压缩,因为压缩 率提升了,而且是全局统一编码的字典,所以在做 group by 汇聚计算时读取数据

2. 量和 shuffle 的数据量减少了很多,带来性能提升; 2) 1.3 版本前,默认是对所有 String 类型的列做全局字典,不需要的列需要使用 DICTIONARY_EXCLUDE 属性来排除,对于上百列 String 类型列的表,配置起来有点 麻烦,而且上百列 String 列做全局字典对导入也是个梦魇,因此在 1.3 版本,废除 了 String 类型列默认做全局字典的规则,只保留了 DICTIONARY_INCLUDE 来配置需 要做全局字典的列,其他列一律不做;  SORT_COLUMNS:指定索引列,CarbonData 中默认采用一种多级索引的策略,能在 Driver 侧做 Pruning 时过滤掉不必要的 Block 或 Blocklet(文件内的数据块) : 1) 提升过滤查询的性能,对于过滤查询、点查,设置合理的 SORT_COLUMNS,会有 不小的性能提升; 2) 多级索引的顺序按照 SORT_COLUMNS 配置的列顺序,越常用的查询列放在最前面, 相同查询频率的列按基数从小到大排列; 3) 如果配置 SORT_COLUMNS="",即不做索引。  NO_INVERTED_INDEX:SORT_COLUMNS 配置的列中,如果不做倒排索引,可以通过该属 性进行排除,因为做倒排索引会使文件变大,如果希望提升压缩率,可以减少建立倒排 索引的列;  SORT_SCOPE:加载时,数据排序的范围,目前支持如下几种: 1) LOCAL_SORT:默认值,表示在一个 node 下做数据排序; 2) NO_SORT:即不排序,在需要快速入库时使用,可以在入库后系统闲时通过 Compaction 命令再建立索引; 3) BATCH_SORT:表示在一个 node 下,内存排序后直接生成 carbondata 文件,不再进 行 node 下的全排序; 使用该配置,可以提升加载速度,但查询性能不如 LOCAL_SORT; 4) GLOBAL_SORT:使用 spark 的资源调度算法和 GroupBy 做数据排序,会做 shuffle 操 作,因此对于点查,会有不错的性能提升,但加载性能会不如 LOCAL_SORT;  TABLE_BLOCKSIZE:表的 block 大小,默认值是 1024MB,类似于 HDFS 中的 block 概念, 对每条数据的 size 比较小的表做点查,可以设置较小的值,达到性能的提升。  STREAMING:设置为 true 即表示启动 Spark 流式入库作业进行小批量入库,为了避免小 文件问题,CarbonData 在流式入库时会先把数据 append 到一个行存文件中,在文件达 到一定大小后再转换为列存文件。 建表流程大致如下:

3.1. CarbonHelperSqlAstBuilder.createCarbonTable 1) 生成 TableIdentifier; 2) 对不支持的建表语句进行验证,比如不支持 Temp View,SKEWED BY,CLUSTERED BY 及 EXTERNAL TABLE; 3) 获取 column 列表,如果有分区字段,则进行合并fields; 4) 如果是 streaming 表,暂不支持分区表; 2. CarbonSpark2SqlParser.prepareTableModel: 该方法就是对所有列进行 dimension 及 measure 的划分; 1) 对所有列 fields 按建表时的顺序进行编号; 2) 调用 extractDimAndMsrFields 方法开始划分 dimension 和 measure 列:  sort columns 目 前 暂 不 支 持 如 下 类 型 : "array", "struct", "double", "float", "decimal",希望后续有更多的人来参与实现;  如果在 DICTIONARY_INCLUDE 中有定义的列,则加到 dimFields;  为 TIMESTAMP 类 型 且 不 存 在 于 dictIncludeCols 中 的 列 , 则 同 时 加 到 noDictionaryDims 和 dimFields;  类型如果是"string", "array", "struct", "timestamp", "date", "char",则加到 dimFields,但如果是 string 类型,则同时需要加入到 noDictionaryDims;  如果在 SORT_COLUMNS 中的列, 则同时加入到 noDictionaryDims 和 dimFields;  其他则加入到 msrFields  如果没有定义 sort columns,则把 dimFields 中(除"array", "struct"类型外)的 列都作为 sortKeyDims; 3) 调用 extractNoInvertedIndexColumns 获取 NO_INVERTED_INDEX 的列信息; 4) 调用 getPartitionInfo 获取分区信息:支持 HASH,RANGE,LIST 三种分区类型 (CarbonData 社区用户主要用的是 Hive 标准分区,HASH,RANGE,LIST 三种分区当前 为 Alpha 特性) ; 5) 检验 TABLE_BLOCKSIZE,只支持 1-2048MB 的范围; 6) 检验 TableLevelCompaction 属性; 7) 对 dimFields 中列进行重新排序,把复杂(Array,Struct)类型的数据排到最后,普 通类型放在前面;

4.3. TableNewProcessor(tableModel).process 1) 对 TableModel 中的 sortKeyDims、dimCols、msrCols 进行 UUID 和 Encoding 设置, 生成对应的 ColumnSchema;  对 于 sortKeyDims : 出 现 在 noDictionaryDims 中 的 列 , 都 不 具 有 Encoding.DICTIONARY;Date 和 Timestamp(不在 noDictionaryDims 中)类型则 具有 Encoding.DIRECT_DICTIONARY;  对于 dimCols:不在 sortKeyDims 中的列都增加 Encoding.DICTIONARY;  对于 msrCols:不具有任何 Encoding; 2) 扫描 allColumns, 如果列在 TableModel.noInvertedIdxCols 中或者 TableModel.msrCols 中,则为 NO INVERTED INDEX 列,否则就具有 Encoding.INVERTED_INDEX(此部分 逻辑可以进一步优化,后续会提交 PR 到社区,应该是 SORT COLUMNS 列都默认具 有 INVERTED INDEX,除非在 NO_INVERTED_INDEX 属性中特别指定的列,不过该问 题不会影响实际的写数据,写数据时的判断逻辑是正确的) ; 3) 生成 val tableInfo = new TableInfo() ,val tableSchema = new TableSchema() , tableInfo.setFactTable(tableSchema); 4) 至此,列的划分结束,allColumns 中的列是按照先 sort column 列,非 sort column 的 dimensions 列、complex data type 列、measures 列的顺序排序; 4. CarbonCreateTableCommand.processMetadata: 1) 生成 CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo); 2) 调用 CarbonUtil.convertToMultiGsonStrings,用 Gson 把 TableInfo 转为 json string, 并分割为几个部分,中间会加上一些 carbonSchemaPartsNo 及 carbonSchemaN 的字 符串,每个部分长度为 4000; 3) 拼凑实际的 Create Table 语句并执行: 指定 DataSource:'USING org.apache.spark.sql.CarbonSource',并把上一步骤分割的 字符串作为建表的 options 加入到 SQL 中; 5. CarbonSource.updateCatalogTableWithCarbonSchema : 把 上 步 骤 中 建 表 语 句 中 的 carbonSchemaPartsNo 和 carbonSchemaN 属性读取出来拼凑为 TableInfo 的 json string, 然 后 反 序 列 化 为 TableInfo 实 例 , 调 用 CarbonFileMetastore.saveToDisk(tableInfo, properties("tablePath"))方法,使用 ThriftWriter 写到 Metadata/schema 文件中;然后把 建表语句中的 carbonSchemaPartsNo 和 carbonSchemaN 属性去除; 6. CreateDataSourceTableCommand.run : 调 用 sessionState.catalog.createTable(newTable,

5.ignoreIfExists = false)执行建表语句;