Base利用Coprocessor实现数据聚合

1.HBase Coprocessor简介 2.HBase Coprocessor的应用场景 3.HBase Coprocessor的数据聚合实现与序列化
展开查看详情

1. HBase Coprocessor 中国 HBase 技术社区网站:http://hbase.group

2. 目录 / Contents 01 Coprocessor简介 02 Endpoint服务端实现 03 Endpoint客户端实现 04 Observer实现二级索引 中国 HBase 技术社区网站:http://hbase.group

3. 01 Coprocessor简介 Remember what should be remembered, and forget what should be forgotten.Remember what should be remembered, and forget what should be forgotten. 中国 HBase 技术社区网站:http://hbase.group

4. 1 Coprocessor简介 提供接口 RegionObser ver:提供客户端的数据操纵事件钩子: Get、Put、Delete、Scan等 实现目的 WALObser ver:提供WAL相关操作钩子 HBase无法轻易建立“二级索引” MasterObserver:提供DDL-类型的操作钩子。如创 执行求和、计数、排序等操作比较困难,必 建、删除、修改数据表等 须通过mapreduce/Spark实现,对于简单 Endpoint:终端是动态RPC插件的接口,它的实现代 的统计或聚合计算时,可能会因为网络开销 码被安装在服务器端,能够通过HBase RPC调用唤醒 大而带来性能问题 灵感来源 应用范围 灵感来源于bigtable的协处理器,包含如下特性 通过使用RegionObser ver接口可以实现二 每个表服务器的任意子表都可以运行代码 级索引的创建和维护 客户端能够直接访问数据表的行,多行读写会自 通过使用Endpoint接口,在对数据进行简 动分片成多个并行的RPC调用 单排序和sum,count等统计操作时,能够 极大提高性能 中国 HBase 技术社区网站:http://hbase.group

5. 02 EndPoint的服务端实现 Remember what should be remembered, and forget what should be forgotten.Remember what should be remembered, and forget what should be forgotten. 中国 HBase 技术社区网站:http://hbase.group

6. 2 EndPoint的服务端实现/ Endpoint服务端架构 Region Server EndPoint (sum) region EndPoint (sum) Client region …….. EndPoint (sum) region 中国 HBase 技术社区网站:http://hbase.group

7. 2 EndPoint的服务端实现/ Protobuf定义 如前所述,客户端和服务端之间存在RPC通信,所以两者间需要确 定接口,HBase的协处理器是通过Protobuf协议来实现数据交换 的,所以需要通过Protobuf来定义接口。 message AggregateRequest { //column interpreter的类名 required string interpreter_class_name = 1; required Scan scan = 2; service AggregateService { optional bytes interpreter_specific_bytes = 3; rpc GetMax (AggregateRequest) returns (AggregateResponse); } rpc GetMin (AggregateRequest) returns (AggregateResponse); message AggregateResponse { rpc GetSum (AggregateRequest) returns (AggregateResponse); repeated bytes first_part = 1; rpc GetRowNum (AggregateRequest) returns optional bytes second_part = 2; (AggregateResponse); } rpc GetAvg (AggregateRequest) returns (AggregateResponse); rpc GetStd (AggregateRequest) returns (AggregateResponse); rpc GetMedian (AggregateRequest) returns (AggregateResponse); } 中国 HBase 技术社区网站:http://hbase.group

8. 2 EndPoint的服务端实现/ EndPoint服务端框架 <<接口>> AggregateService Coprocessor getMax(RpcController controller, <<接口>> void start(CoprocessorEnvironment AggregateRequest request, CoprocessorService env) RpcCallback<AggregateResponse> Service getService() void stop(CoprocessorEnvironment done) env) void getSum void getMax void getMin AggregateImplementation Service getService() void start void stop void getSum 中国 HBase 技术社区网站:http://hbase.group

9. 2 EndPoint的服务端实现/ EndPoint sum代码实现 public void getSum(RpcController controller, do { hasMoreRows = scanner.next(results); AggregateRequest request, RpcCallback<AggregateResponse> int listSize = results.size(); done) { AggregateResponse response = null; for(int i = 0; i < listSize; ++i) { RegionScanner scanner = null; //取出列值 // 单个region上的计算结果值 Object temp = ignored.getValue(colFamily, qualifier, (Cell)results.get(i)); long sum = 0L; if(temp != null) { sumVal = ignored.add(sumVal, ignored.castToReturnType(temp)); try { } ColumnInterpreter ignored = } this.constructColumnInterpreterFromRequest(request); Object sumVal = null; results.clear(); Scan scan = ProtobufUtil.toScan(request.getScan()); } while(hasMoreRows); scanner = this.env.getRegion().getScanner(scan); if(sumVal != null) { byte[] colFamily = scan.getFamilies()[0]; response = NavigableSet qualifiers = AggregateResponse.newBuilder().addFirstPart(ignored.getProtoForPromotedType(sumVa (NavigableSet)scan.getFamilyMap().get(colFamily); l).toByteString()).build(); byte[] qualifier = null; } if(qualifiers != null && !qualifiers.isEmpty()) { } catch (IOException var27) { ResponseConverter.setControllerException(controller, var27); qualifier = (byte[])qualifiers.pollFirst(); } finally { } if(scanner != null) { try { ArrayList results = new ArrayList(); scanner.close(); boolean hasMoreRows = false; } catch (IOException var26) { ; } 中国 HBase 技术社区网站:http://hbase.group }

10. 03 EndPoint的客户端实现 Remember what should be remembered, and forget what should be forgotten.Remember what should be remembered, and forget what should be forgotten. 中国 HBase 技术社区网站:http://hbase.group

11. 3 EndPoint的客户端实现 HBase 提供了客户端 Java 包 org.apache.hadoop.hbase.client.coprocessor。它提供 以下三种方法来调用协处理器提供的服务: Table.coprocessorService(byte[]) Table.coprocessorService(Class, byte[], byte[],Batch.Call), Table.coprocessorService(Class, byte[], byte[], Batch.Call, Batch.Callback) RegionServer 客户端 Region 1 coprocessorService(row1) Region 2 结果 …….. Region N 中国 HBase 技术社区网站:http://hbase.group

12. 3 EndPoint的客户端实现 客户端 RegionServer coprocessorService(row1) Region 1 插入Map Batch.call Batch.call Region 2 …… …….. Batch.call Region N 结果 Table.coprocessorService(Class, byte[], byte[],Batch.Call), 中国 HBase 技术社区网站:http://hbase.group

13. 3 EndPoint的客户端实现 客户端 RegionServer coprocessorService() Region 1 Batch.call callback Batch.call Region 2 Update callback …… …….. Batch.call callback Region N 结果 Table.coprocessorService(Class, byte[], byte[], Batch.Call, Batch.Callback) 中国 HBase 技术社区网站:http://hbase.group

14. 3 EndPoint的客户端实现 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call<AggregateService, S>() { public <R, S, P extends Message, Q extends Message, T extends Message> @Override S sum(final HTable table, final ColumnInterpreter<R, S, P, public S call(AggregateService instance) throws IOException Q, T> ci, { final Scan scan) throws Throwable { ServerRpcController controller = new ServerRpcController(); final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); BlockingRpcCallback<AggregateResponse> rpcCallback = new BlockingRpcCallback<AggregateResponse>(); class SumCallBack implements Batch.Callback<S> { instance.getSum(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); S sumVal = null; if (controller.failedOnException()) { public S getSumResult() { throw controller.getFailedOn(); return sumVal; } } if (response.getFirstPartCount() == 0) { return null; } @Override ByteString b = response.getFirstPart(0); public synchronized void update(byte[] region, byte[] row, S result) { T t = sumVal = ci.add(sumVal, result); ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); S s = ci.getPromotedValueFromProto(t); } return s; } } SumCallBack sumCallBack = new SumCallBack(); }, sumCallBack); return sumCallBack.getSumResult(); 中国 HBase 技术社区网站:http://hbase.group }

15. 04 Observer实现二级索引 Remember what should be remembered, and forget what should be forgotten.Remember what should be remembered, and forget what should be forgotten. 中国 HBase 技术社区网站:http://hbase.group

16. 4 Observer实现二级索引 索引的实现方式 基于表的索引,多个索引时每个索引建一个表或者建 组合索引 HBase索引需求 基于列的索引,所有索引存单张表,每个索引字段为 HBase只支持针对与主键key的高性能查询 一个列簇 HBase本身不支持快速的复杂查询和join 索引的实现目标 第三方工具框架 高性能的数据检索 ITHbase,HBase索引的事务行解决方案, 数据的低冗余 0.92版本以后的HBase已经可以通过 数据的一致性 Coprocessor实现其功能 Lily(Hbase indexer),基于HBase和SOLR 实现,能够提供快速的检索和模糊查询 中国 HBase 技术社区网站:http://hbase.group

17. 4 Observer实现二级索引 RegionObserver工作原理 RegionObser ver提供客户端的数据操纵事 件钩子,Get、Put、Delete、Scan,使用 此功能能够解决主表以及多个索引表之间数 据一致性的问题 中国 HBase 技术社区网站:http://hbase.group

18. 4 Observer实现二级索引 • 发挥HBase基于主键查询效率高的特点,添加索引表,把基于索引字段的查询 转换为基于HBase主键的查询 row key column1 column2 …… key101 101 …… …… 业务表 key102 102 …… …… key103 103 …… …… 101 key101 102 key102 Index表 103 key103 基于column1字段的查询先查Index表 中国 HBase 技术社区网站:http://hbase.group

19. 4 Observer实现二级索引 • 索引表的维护 • 索引表的使用 • 通过开发基于Coprocessor的 • 通过Coprocessor的perScan判断是否是scan RegionObserver接口的程序实 索引字段 现对索引表的维护 • 如果走索引,通过Index Table查到UserTable 的key,再去UserTable中查询Value HMaster perScan Scan 判断查询是否走 Coprocessor 索引 RegionServer User Index Put/Delete Table Table 不走索引,直 接scan table User Index postPut/postDelete Table Table 通过索引获 取原表数据 Coprocessor Coprocessor 中国 HBase 技术社区网站:http://hbase.group

20. 5 协处理器主要应用场景 • Observer 允许集群在正常的客户端操作过程中可以有不 同的行为表现 • Endpoint 允许扩展集群的能力,对客户端应用开放新的 运算命令 • Observer 类似于 RDBMS 中的触发器,主要在服务端工作 • Endpoint 类似于 RDBMS 中的存储过程,主要在服务端工 作 • Observer 可以实现权限管理、优先级设置、监控、ddl 控 制、二级索引等功能 • Endpoint 可以实现 min、max、avg、sum、distinct、 group by 等功能 中国 HBase 技术社区网站:http://hbase.group

21. THANK YOU When a cigarette falls in love with a match,it is destined to be hurt.When a cigarette falls in love with a match,it is destined to be hurt. 中国 HBase 技术社区网站:http://hbase.group

22. Me HBase 技术社区公众号 中国 HBase 技术社区网站:http://hbase.group