Apache Calcite for Enabling SQL Access to NoSQL Data Systems

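Apache Calcite is SQL parsing and optimization middleware that offers flexible extension points for connecting to different underlying storage systems. Apache Geode is an open-source, distributed, in-memory database that supports low-latency, strongly consistent transactional operations on large clusters. This article describes how Geode uses Calcite to support SQL queries.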

1. Apache Calcite for Enabling SQL Access to NoSQL Data Systems such as Apache Geode (Christian Tzolov)

2. Whoami
Christian Tzolov: Engineer at Pivotal (Big Data, Hadoop, Spring Cloud Dataflow, Apache Geode, Apache HAWQ); Apache Committer; Apache Crunch PMC member. ctzolov@pivotal.io, blog.tzolov.net, Twitter: @christzolov, https://nl.linkedin.com/in/tzolov
Disclaimer: This talk expresses my personal opinions. It is not read or approved by Pivotal and does not necessarily reflect the views and opinions of Pivotal, nor does it constitute any official communication of Pivotal. Pivotal does not support any of the code shared here.

3. Big Data Landscape 2016 •  Volume •  Velocity •  Variety •  Scalability •  Latency •  Consistency vs. Availability (CAP)

4. Data Access •  {Old | New} SQL •  Custom APIs: Key / Value, Fluent APIs, REST APIs •  {My} Query Language. Unified Data Access? At What Cost?

5. SQL? •  Apache Apex •  SQL-Gremlin •  Apache Drill … •  Apache Flink •  Apache Geode •  Apache Hive •  Apache Kylin •  Apache Phoenix •  Apache Samza •  Apache Storm •  Cascading •  Qubole Quark

6. Geode Adapter - Overview
•  SQL/JDBC/ODBC clients send queries to Apache Calcite, which parses the SQL, converts it into relational expressions and optimizes them.
•  The Geode Adapter pushes down the relational expressions supported by Geode OQL and falls back to Calcite's Enumerable Adapter for the rest.
•  The adapter converts SQL relational expressions into OQL queries and uses the Spring Data Geode API to interact with Geode.
•  A Geode client (Geode API and OQL) talks to several Geode servers, which hold the data.
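To make "convert SQL relational expressions into OQL queries" more concrete, here is a minimal sketch that renders a projection plus a conjunction of simple comparisons over a single Region as an OQL string. The class `OqlSketch`, its `toOql` method and the `Comparison` record are hypothetical names for illustration. The real adapter works on Calcite relational expression trees, not on lists of strings.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: render a pushed-down projection and filter over one Region as OQL.
final class OqlSketch {

    // One "field <op> literal" predicate, e.g. totalPrice > 0.
    record Comparison(String field, String op, Object literal) {
        String toOql(String alias) {
            // OQL string literals are single-quoted; an embedded quote is doubled.
            String value = (literal instanceof String s)
                    ? "'" + s.replace("'", "''") + "'"
                    : String.valueOf(literal);
            return alias + "." + field + " " + op + " " + value;
        }
    }

    // Builds "SELECT t.f1, t.f2 FROM /Region t WHERE <p1> AND <p2>".
    static String toOql(String region, List<String> columns, List<Comparison> filters) {
        String alias = "t";
        String projection = columns.isEmpty()
                ? "*"
                : columns.stream().map(c -> alias + "." + c).collect(Collectors.joining(", "));
        StringBuilder oql = new StringBuilder("SELECT ")
                .append(projection)
                .append(" FROM /").append(region).append(" ").append(alias);
        if (!filters.isEmpty()) {
            oql.append(" WHERE ").append(filters.stream()
                    .map(f -> f.toOql(alias))
                    .collect(Collectors.joining(" AND ")));
        }
        return oql.toString();
    }
}
```

For example, projecting `totalPrice` and `customerNumber` from `BookOrder` with `totalPrice > 0` yields `SELECT t.totalPrice, t.customerNumber FROM /BookOrder t WHERE t.totalPrice > 0`.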

7. SQL Relational Expressions
SELECT b."totalPrice", c."firstName" FROM "BookOrder" AS b INNER JOIN "Customer" AS c ON b."customerNumber" = c."customerNumber" WHERE b."totalPrice" > 0;
Before optimization: Project (c.firstName, b.totalPrice) ← Filter (b.totalPrice > 0) ← Join (on customerNumber) ← Scan Customer [c], Scan BookOrder [b]
After optimization: Project (c.firstName, b.totalPrice) ← Join (on customerNumber) ← { Project (firstName, customerNumber) ← Scan Customer [c] }, { Project (totalPrice, customerNumber) ← Filter (totalPrice > 0) ← Scan BookOrder [b] }
The optimizer pushes the filter and the projections below the join, so each side reads only the rows and columns it needs.
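The key step in the optimized plan is moving the `Filter (totalPrice > 0)` below the `Join`, which is legal because the predicate only references `BookOrder` columns. Calcite does this with planner rules such as `FilterJoinRule`; the sketch below uses its own hypothetical, minimal plan classes (`Rel`, `Scan`, `Filter`, `Join`, `FilterPushDown`), not Calcite's `RelNode` API, to show the idea.

```java
import java.util.Set;

// Hypothetical, minimal relational plan nodes (not Calcite's RelNode API).
interface Rel {}
record Scan(String table, Set<String> columns) implements Rel {}
record Filter(Rel input, String column, String condition) implements Rel {}
record Join(Rel left, Rel right, String joinColumn) implements Rel {}

final class FilterPushDown {
    // True if the subtree produces the given column.
    static boolean produces(Rel rel, String column) {
        if (rel instanceof Scan s) return s.columns().contains(column);
        if (rel instanceof Filter f) return produces(f.input(), column);
        if (rel instanceof Join j) return produces(j.left(), column) || produces(j.right(), column);
        throw new IllegalArgumentException("unknown node " + rel);
    }

    // Rewrites Filter(Join(L, R)) into Join(Filter(L), R) or Join(L, Filter(R)) when the
    // filter column comes from exactly one side; otherwise returns the plan unchanged.
    static Rel pushFilterBelowJoin(Rel rel) {
        if (!(rel instanceof Filter f) || !(f.input() instanceof Join j)) return rel;
        boolean left = produces(j.left(), f.column());
        boolean right = produces(j.right(), f.column());
        if (left && !right) {
            return new Join(new Filter(j.left(), f.column(), f.condition()), j.right(), j.joinColumn());
        }
        if (right && !left) {
            return new Join(j.left(), new Filter(j.right(), f.column(), f.condition()), j.joinColumn());
        }
        return rel; // column on both sides (or neither): pushing down would change semantics
    }
}
```

A filter on `customerNumber`, which both inputs produce, is deliberately left in place by this sketch.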

8. Geode Push Down Candidates
Relational Operator: Geode Support
•  LIMIT: YES (without OFFSET)
•  PROJECT: YES
•  FILTER: YES
•  JOIN: for collocated Regions only
•  AGGREGATE: YES for GROUP BY, DISTINCT, MAX, MIN, SUM, AVG, COUNT (http://bit.ly/2eKApd0)
•  SORT: YES
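The push-down table can be read as a decision function: for each relational operator the adapter either translates it into OQL or leaves it to Calcite's Enumerable adapter. The sketch below encodes the table's rows; `RelOp`, `GeodePushDown` and `canPushDown` are hypothetical names, and a real adapter would make this decision inside planner rules.

```java
import java.util.Set;

// Hypothetical encoding of the push-down table; names are illustrative only.
enum RelOp { LIMIT, PROJECT, FILTER, JOIN, AGGREGATE, SORT }

final class GeodePushDown {
    // Aggregate forms listed as supported in the table.
    private static final Set<String> AGG_FUNCTIONS =
            Set.of("GROUP BY", "DISTINCT", "MAX", "MIN", "SUM", "AVG", "COUNT");

    /**
     * @param hasOffset     the LIMIT carries an OFFSET clause
     * @param collocated    the joined Regions are collocated
     * @param aggregateKind e.g. "SUM"; only consulted when op == AGGREGATE
     * @return true if the operator can be translated to OQL, false if Calcite
     *         must evaluate it in memory instead
     */
    static boolean canPushDown(RelOp op, boolean hasOffset, boolean collocated, String aggregateKind) {
        switch (op) {
            case LIMIT:
                return !hasOffset;
            case PROJECT:
            case FILTER:
            case SORT:
                return true;
            case JOIN:
                return collocated;
            case AGGREGATE:
                return aggregateKind != null && AGG_FUNCTIONS.contains(aggregateKind);
            default:
                return false;
        }
    }
}
```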

9. Apache Geode? “… in-memory, distributed database with strong consistency built to support low latency transactional applications at extreme scale”

10. Why Apache Geode? China Railway
•  5,700 train stations •  7,000 stations •  4.5 million tickets per day •  72,000 miles of track •  20 million daily users •  23 million passengers daily •  1.4 billion page views per day •  120,000 concurrent users •  40,000 visits per second •  10,000 transactions per minute
Case studies: https://pivotal.io/big-data/case-study/distributed-in-memory-data-management-solution and https://pivotal.io/big-data/case-study/scaling-online-sales-for-the-largest-railway-in-the-world-china-railway-corporation

11. Apache Geode Features
•  In-Memory Data Storage: over 100TB memory, JVM Heap + Off Heap
•  Any Data Format: Key-Value/Object Store
•  ACID and JTA Compliant Transactions
•  HA and Linear Scalability
•  Strong Consistency
•  Streaming and Event Processing: Listeners, Distributed Functions, Continuous OQL Queries
•  Multi-site / Inter-cluster
•  Full Text Search (Lucene indexes)
•  Embedded and Standalone
•  Top Level Apache Project

12. Apache Geode Concepts
•  Locator (member): tracks system members and provides membership information
•  Client (member): reads and modifies the content of the distributed system
•  Cache Server (member): process connected to the distributed system with a created Cache
•  Listener: event handler; registers for one or more events and is notified when they occur
•  Region: consistent, distributed Map (key-value), Partitioned or Replicated; e.g. Region 1 holds k1 → v1, k2 → v2, …, up to Region N
•  Functions: distributed, concurrent data processing
•  Cache: in-memory collection of Regions
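For a Partitioned Region, a useful mental model is: key → bucket (by hash) → member hosting that bucket. The sketch below is a deliberately simplified illustration of that routing, not Geode's actual implementation. `PartitionSketch`, `bucketFor` and `memberFor` are hypothetical names, and the round-robin bucket placement is an assumption made for brevity; only the default of 113 buckets comes from Geode's documented `total-num-buckets` setting.

```java
import java.util.List;

// Simplified sketch of routing a key in a partitioned Region:
// key -> bucket (by hash) -> hosting member. Not Geode's actual implementation.
final class PartitionSketch {
    static final int TOTAL_BUCKETS = 113; // Geode's default total-num-buckets

    static int bucketFor(Object key, int totalBuckets) {
        // floorMod keeps the bucket id non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), totalBuckets);
    }

    // Assumption: buckets are spread over members round-robin. Real Geode places
    // and rebalances buckets (and their redundant copies) dynamically.
    static String memberFor(Object key, List<String> members, int totalBuckets) {
        int bucket = bucketFor(key, totalBuckets);
        return members.get(bucket % members.size());
    }
}
```

A Replicated Region, by contrast, would keep every entry on every member, so no such routing is needed.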

13. Geode Topology
•  Peer-to-Peer: Cache Servers, each with its own Cache Data, form one cluster.
•  Client-Server: a Client with a Local Cache connects through a pool to the Cache Servers.
•  Multi-Site: clusters on either side of a WAN (multi-site boundary) exchange data through Gateway Senders and Gateway Receivers.

14. Geode Client API •  Client Cache •  Key / Value: Region GET, PUT, REMOVE •  OQL: QueryService

15. Geode Data Types & Serialization
•  Key-Value with complex value formats
•  Portable Data eXchange (PDX) Serialization: delta propagation, schema evolution, polyglot support …
•  Object Query Language (OQL): queries can reach nested fields, and PDX allows single-field deserialization
Example value: { id: 1, name: "Fred", age: 42, pet: { name: "Barney", type: "dino" } }
Example query: SELECT p.name FROM /Person p WHERE p.pet.type = 'dino'
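An OQL path such as `p.pet.type` is resolved one field at a time. The sketch below models a PDX-style value as nested `Map`s and walks a dotted path; it does not model PDX's binary encoding or its single-field deserialization, and `PathSketch.resolve` is a hypothetical helper.

```java
import java.util.Map;

// Illustrative only: a PDX-style value modelled as nested Maps, with an
// OQL-style dotted path such as "pet.type" resolved field by field.
final class PathSketch {
    static Object resolve(Map<String, Object> value, String path) {
        Object current = value;
        for (String field : path.split("\\.")) {
            if (!(current instanceof Map<?, ?> m)) {
                return null; // the path runs through a missing or non-object field
            }
            current = m.get(field);
        }
        return current;
    }
}
```

With the "Fred" value, `resolve(fred, "pet.type")` returns `"dino"`, which is what the `WHERE` clause compares against.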

16. Geode Demo (GFSH and OQL) •  Connect to the Geode cluster •  List the available Regions •  Run an OQL query

17. Apache Calcite? A Java framework that provides a SQL interface and advanced query optimization for virtually any data system •  Query parser, validator and optimizer(s) •  JDBC drivers, local and remote •  SQL streaming •  Agnostic to data storage and processing •  SQL completeness vs. NoSQL integrity

18. Calcite Data Types
•  Catalog: namespaces accessed in queries
•  Schema: collection of schemas and tables
•  Table: single data set, collection of rows
•  RelDataType: SQL field types in a Table
JDBC clients reach the Calcite SQL Engine, which sees a Schema containing Tables; a data type mapping connects Calcite's Schema, Tables and data type fields to your data system's data types. Example query: SELECT title, author FROM test.BookMaster

19. Calcite Data Types: RelDataType
Type of a scalar expression or row.
•  RelDataTypeFactory: RelDataType factory
•  JavaTypeFactory: registers Java classes as record types
•  JavaTypeFactoryImpl: uses Java reflection to build RelDataTypes
•  SqlTypeFactoryImpl: default implementation with all SQL types
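To illustrate what "uses Java reflection to build RelDataTypes" means, here is a rough, self-contained analogue that derives a column-name → SQL-type-name map from a Java class. It is not `JavaTypeFactoryImpl`: the `RowTypeSketch` class, the type-name table and the `ANY` fallback are assumptions for illustration, and this sketch reads all declared instance fields.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

// Rough, hypothetical analogue of reflection-based row typing:
// one column per instance field, with a simple Java -> SQL type-name mapping.
final class RowTypeSketch {
    static String sqlTypeOf(Class<?> t) {
        if (t == int.class || t == Integer.class) return "INTEGER";
        if (t == long.class || t == Long.class) return "BIGINT";
        if (t == double.class || t == Double.class) return "DOUBLE";
        if (t == float.class || t == Float.class) return "REAL";
        if (t == boolean.class || t == Boolean.class) return "BOOLEAN";
        if (t == String.class) return "VARCHAR";
        return "ANY"; // fallback for types this sketch does not map
    }

    static Map<String, String> rowType(Class<?> valueClass) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (Field f : valueClass.getDeclaredFields()) {
            // Static and compiler-generated fields are not row columns.
            if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) continue;
            columns.put(f.getName(), sqlTypeOf(f.getType()));
        }
        return columns;
    }
}
```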

20. Geode to Calcite Data Types Mapping
•  The Geode Cache is mapped into a Calcite Schema.
•  Regions are mapped into Tables (Region 1 → Table 1, …, Region K → Table K).
•  Each Geode key/value entry is mapped into a table row (k1/v1 → Row1, k2/v2 → Row2, …, RowM with values V(M,1) … V(M,N)).
•  Column types (RelDataType) Col1 … ColN are created from the Geode value class with JavaTypeFactoryImpl.
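The Region → Table mapping can be sketched with a plain `Map` standing in for a Region: each value becomes one `Object[]` row, with one column per named field of the value class. `RegionToRows` and `toRows` are hypothetical names; keys are ignored here, and the column order is passed in explicitly rather than read from a `RelDataType`.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the Region -> Table mapping: every value in the Region
// becomes one Object[] row with one column per named field. Keys are ignored.
final class RegionToRows {
    static List<Object[]> toRows(Map<?, ?> region, List<String> columnNames) {
        List<Object[]> rows = new ArrayList<>();
        for (Object value : region.values()) {
            Object[] row = new Object[columnNames.size()];
            for (int i = 0; i < row.length; i++) {
                row[i] = readField(value, columnNames.get(i));
            }
            rows.add(row);
        }
        return rows;
    }

    private static Object readField(Object value, String name) {
        try {
            Field f = value.getClass().getDeclaredField(name);
            f.setAccessible(true); // value classes typically keep their fields private
            return f.get(value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot read column " + name, e);
        }
    }
}
```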

21. Calcite Bootstrap Flow
Typical Calcite initialization flow: the Model (JSON) configures Calcite and creates the SchemaFactory, the SchemaFactory creates the Schema, and the Schema creates the Tables.

22. Calcite Model
The path to <my-model>.json is passed as a JDBC connection argument:
    !connect jdbc:calcite:model=target/test-classes/<my-model-path>.json
The model names the schema ('TEST'), references your adapter's schema factory implementation class (factory), and lists the parameters passed to that factory (operand):
    {
      version: '1.0',
      defaultSchema: 'TEST',
      schemas: [ {
        name: 'TEST',
        type: 'custom',
        factory: 'org.apache.calcite.adapter.geode.simple.GeodeSchemaFactory',
        operand: {
          locatorHost: 'localhost',
          locatorPort: '10334',
          regions: 'BookMaster',
          pdxSerializablePackagePath: 'net.tzolov.geode.bookstore.domain.*'
        }
      } ]
    }

23. Geode Calcite Schema and Schema Factory
The schema factory retrieves the parameters set in model.json and creates an adapter Schema instance with them:

    public class GeodeSchemaFactory implements SchemaFactory {
        @Override
        public Schema create(SchemaPlus parentSchema, String schemaName, Map<String, Object> operand) {
            // Retrieve the parameters set in the model.json
            String locatorHost = (String) operand.get("locatorHost");
            int locatorPort = …;
            String[] regionNames = …;
            String pdxPackagePath = …;
            // Create an adapter Schema instance with the provided parameters
            return new GeodeSchema(locatorHost, locatorPort, regionNames, pdxPackagePath);
        }
    }

The schema creates a GeodeScannableTable instance for each Geode Region:

    public class GeodeSchema extends AbstractSchema {
        private String regionName = …;

        @Override
        protected Map<String, Table> getTableMap() {
            final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
            Region region = …;     // get the Geode Region by region name
            Class valueClass = …;  // find the Region's value type
            builder.put(regionName, new GeodeScannableTable(regionName, valueClass, clientCache));
            return builder.build();
        }
    }
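The factory elides how `locatorPort`, `regionNames` and `pdxPackagePath` are read from the operand map. One possible way is sketched below, assuming (not taken from the source) that `regions` holds a comma-separated list of Region names. Note that the model stores the port as the string `'10334'`, so it must be parsed. `GeodeOperand` is a hypothetical helper class.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical helper: one way to read the model.json operand map.
// Assumption: "regions" is a comma-separated list of Region names.
final class GeodeOperand {
    final String locatorHost;
    final int locatorPort;
    final String[] regionNames;
    final String pdxPackagePath;

    GeodeOperand(Map<String, Object> operand) {
        this.locatorHost = (String) operand.get("locatorHost");
        // The model stores the port as a string ('10334'), so parse it explicitly.
        this.locatorPort = Integer.parseInt(String.valueOf(operand.get("locatorPort")));
        this.regionNames = Arrays.stream(((String) operand.get("regions")).split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .toArray(String[]::new);
        this.pdxPackagePath = (String) operand.get("pdxSerializablePackagePath");
    }
}
```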

24. Geode Scannable Table
GeodeScannableTable uses reflection (or the PDX instance) to build the RelDataType from the value's class type, and scan() returns an Enumerable over the entire target data store:

    public class GeodeScannableTable extends AbstractTable implements ScannableTable {
        …
        @Override
        public RelDataType getRowType(RelDataTypeFactory typeFactory) {
            // Build the RelDataType from the value's class type via reflection
            return new JavaTypeFactoryImpl().createStructType(valueClass);
        }

        @Override
        public Enumerable<Object[]> scan(DataContext root) {
            return new AbstractEnumerable<Object[]>() {
                @Override
                public Enumerator<Object[]> enumerator() {
                    return new GeodeEnumerator<Object[]>(clientCache, regionName);
                }
            };
        }
    }

GeodeEnumerator implements the Enumerator interface defined in the Linq4j sub-project and converts Geode value responses into Calcite row data:

    public class GeodeEnumerator<E> implements Enumerator<E> {
        private E current;
        private Iterator<?> geodeIterator;

        public GeodeEnumerator(ClientCache clientCache, String regionName) {
            try {
                // Retrieves the entire Region!!
                SelectResults<?> results = (SelectResults<?>) clientCache.getQueryService()
                        .newQuery("select * from /" + regionName).execute();
                geodeIterator = results.iterator();
            } catch (QueryException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public boolean moveNext() {
            if (!geodeIterator.hasNext()) {
                return false; // no more rows
            }
            current = convert(geodeIterator.next());
            return true;
        }

        @Override public E current() { return current; }
        @Override public void reset() { throw new UnsupportedOperationException(); }
        @Override public void close() { }

        private E convert(Object geodeValue) {
            … // convert the PDX value into a RelDataType row
        }
    }
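The enumerator protocol itself (`moveNext()` advances and reports whether a row exists, `current()` returns it, `reset()` restarts, `close()` releases resources) can be tried without Calcite or Geode on the classpath. The sketch below defines a local `SimpleEnumerator` interface that only mirrors the shape of linq4j's `Enumerator`, plus a hypothetical `IteratorEnumerator` that adapts any iterator of raw results and converts each value into a row. Passing a `Supplier` for the query is a design choice of this sketch that makes `reset()` possible by simply re-running the query.

```java
import java.util.Iterator;
import java.util.function.Function;
import java.util.function.Supplier;

// Local stand-in mirroring the shape of linq4j's Enumerator, so the sketch runs standalone.
interface SimpleEnumerator<T> extends AutoCloseable {
    T current();
    boolean moveNext();
    void reset();
    @Override void close();
}

// Hypothetical adapter from an iterator of raw query results to the enumerator protocol.
final class IteratorEnumerator<E> implements SimpleEnumerator<E> {
    private final Supplier<? extends Iterator<?>> query; // re-run on reset()
    private final Function<Object, E> convert;           // raw value -> row
    private Iterator<?> results;
    private E current;

    IteratorEnumerator(Supplier<? extends Iterator<?>> query, Function<Object, E> convert) {
        this.query = query;
        this.convert = convert;
        this.results = query.get();
    }

    @Override public E current() {
        return current;
    }

    @Override public boolean moveNext() {
        if (!results.hasNext()) {
            return false; // end of data: the caller stops iterating
        }
        current = convert.apply(results.next());
        return true;
    }

    @Override public void reset() {
        results = query.get();
        current = null;
    }

    @Override public void close() {
        // A real implementation would release query resources here.
    }
}
```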

25. Geode Demo (Scannable Tables)
    $ ./sqlline
    sqlline> !connect jdbc:calcite:model=target/test-classes/model2.json admin admin
    jdbc:calcite> !tables
    jdbc:calcite> SELECT * FROM "BookMaster";
    jdbc:calcite> SELECT "yearPublished", AVG("retailCost") AS "AvgRetailCost" FROM "BookMaster" GROUP BY "yearPublished";
    jdbc:calcite> SELECT b."totalPrice", c."firstName" FROM "BookOrder" AS b INNER JOIN "Customer" AS c ON b."customerNumber" = c."customerNumber" WHERE b."totalPrice" > 0;
Without and With Implementation

26. Non-Relational Tables
Scanned without an intermediate relational expression.
•  ScannableTable: can be scanned
    Enumerable<Object[]> scan(DataContext root);
•  FilterableTable: can be scanned, applying supplied filter expressions
    Enumerable<Object[]> scan(DataContext root, List<RexNode> filters);
•  ProjectableFilterableTable: can be scanned, applying supplied filter expressions and projecting a given list of columns
    Enumerable<Object[]> scan(DataContext root, List<RexNode> filters, int[] projects);
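What a ProjectableFilterableTable-style scan does to the rows can be shown in memory. In this hypothetical sketch the filters are plain Java predicates over a full row instead of `RexNode`s, and `projects == null` means "all columns". In Calcite the filter list is mutable and a table removes the filters it handles, leaving the rest for Calcite; this sketch simply applies every filter itself. `ProjectFilterScan` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory analogue of scan(root, filters, projects):
// filters are Java predicates over a full row, projects are column indexes.
final class ProjectFilterScan {
    static List<Object[]> scan(List<Object[]> table, List<Predicate<Object[]>> filters, int[] projects) {
        List<Object[]> out = new ArrayList<>();
        for (Object[] row : table) {
            // Keep the row only if every filter accepts it.
            boolean keep = filters.stream().allMatch(p -> p.test(row));
            if (!keep) continue;
            if (projects == null) { // null: return all columns
                out.add(row);
                continue;
            }
            Object[] projected = new Object[projects.length];
            for (int i = 0; i < projects.length; i++) {
                projected[i] = row[projects[i]];
            }
            out.add(projected);
        }
        return out;
    }
}
```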

27. Calcite Ecosystem
Several “semi-independent” projects:
•  JDBC and Avatica: local and remote JDBC driver
•  Linq4j: port of LINQ (Language-Integrated Query) to Java; a method for translating executable code into data (Expression trees)
•  SQL Parser & AST: converts SQL queries into an AST tree (SqlNode …)
•  Relational: relational algebra, expressions, optimizations …: Relational Expressions, Row Expressions, Optimization Rules, Planner …
•  Interpreter: compiles the Java code generated from linq4j “Expressions”; part of the physical plan implementer
•  Enumerable Adapter: default (in-memory) data store adapter implementation; leverages Linq4j
•  3rd party Adapters

28. Calcite SQL Query Execution Flow
1. On a new SQL query, JDBC delegates to Prepare to prepare the query execution.
2. Parse the SQL and convert it into relational expressions; validate and optimize them.
3. Start building a physical plan from the relational expressions.
4. Implement the Geode relations and encode them as an Expression tree.
5. Pass the Expression tree to the Interpreter to generate Java code.
6. Generate and compile a Binder instance that, on a bind() call, runs the Geode adapter's query method.
7. JDBC uses the newly compiled Binder to perform the query on the Geode cluster.
Components: JDBC; the Calcite Framework (Prepare, SQL, Relational, Expression, Planner, Interpreter, Enumerable); the Geode Adapter; the Binder; the Geode Cluster.

29. Linq4j and Expression Tree
•  Node (+ accept(Visitor), + evaluate(Node)) is the base of the tree.
•  Statement nodes: Block, Condition, For, Goto, Label, Switch, Throw, Try, While …
•  Expression nodes: Binary, Constant, MethodCall, Parameter, Member …
•  Related types: Enumerable, Queryable, QueryProvider, Visitor
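The Node/Expression/Visitor split can be illustrated with a tiny expression tree whose visitor renders Java source text, which loosely reflects how Calcite turns expression trees into code before compiling it. `Node`, `Constant`, `Parameter`, `Binary`, `Visitor` and `JavaSourceVisitor` here are hypothetical classes written for this sketch, not the linq4j types.

```java
// Hypothetical, tiny expression tree with a visitor, loosely modelled on the
// Node / Expression / Visitor hierarchy (not the linq4j classes).
interface Node {
    <R> R accept(Visitor<R> visitor);
}

record Constant(Object value) implements Node {
    public <R> R accept(Visitor<R> v) { return v.visitConstant(this); }
}

record Parameter(String name) implements Node {
    public <R> R accept(Visitor<R> v) { return v.visitParameter(this); }
}

record Binary(String op, Node left, Node right) implements Node {
    public <R> R accept(Visitor<R> v) { return v.visitBinary(this); }
}

interface Visitor<R> {
    R visitConstant(Constant c);
    R visitParameter(Parameter p);
    R visitBinary(Binary b);
}

// Renders a tree as Java source text (greatly simplified code generation).
final class JavaSourceVisitor implements Visitor<String> {
    public String visitConstant(Constant c) {
        return c.value() instanceof String s ? "\"" + s + "\"" : String.valueOf(c.value());
    }

    public String visitParameter(Parameter p) {
        return p.name();
    }

    public String visitBinary(Binary b) {
        return "(" + b.left().accept(this) + " " + b.op() + " " + b.right().accept(this) + ")";
    }
}
```

Because each node only knows how to `accept` a visitor, new operations (evaluation, code generation, optimization) can be added as new visitors without touching the node classes.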