Pyboot 促进大数据和 AI 融合-铁杰-诚历(二)诚历部分

议题四:
从Python 到Java ,Pyboot加速大数据和AI的融合

郑锴,花名铁杰,阿里巴巴高级技术专家,Apache Hadoop PMC,Apache Kerby 创立者。深耕分布式系统开发和开源大数据多年,目前专注于在阿里云上提供更好用更有弹性的 Hadoop/Spark 大数据平台;

孙大鹏,花名诚历,阿里巴巴计算平台事业部 EMR 技术专家,Apache Sentry PMC,Apache Commons Committer,目前从事开源大数据存储和优化方面的工作;

议题简介:
Python 代表机器学习生态,而以 Hadoop/Spark 为核心的开源大数据则以 Java 为主。前者拥有数不清的算法库和程序,后者承载着海量数据和大量的企业应用。除了 SQL 这个标准方式和各种五花八门的协议接口,还有没有更高效的一手数据通道,将两个生态对接起来,乃至深度融合?Pyboot 是我们在这个方向上的探索。有兴趣的同学欢迎现场观摩演示和技术交流。

展开查看详情

1.Pyboot 促进大数据和 AI 融合 阿里云智能 铁杰、诚历 {zhengkai.zk, dapeng.sdp} [at] alibaba-inc.com

2. 一个趋势 • Github 发布年度报告: Python 超越 Java 成为第二受欢迎的编程语言 • 深层次原因: 云计算兴起,云服务和云化势不可挡 AI 大潮

3. 开源大数据的现状 • 主要核心项目均为 Java/Scala 语言实现 • SQL 是数据透出的标准和主流方式 • Parquet/Orc 数据格式逐渐统一和标准化 • 在多语言和 Python 的支持上参差不齐

4. 机器学习的现状 • 数据科学家习惯以 Python 来编写机器学习程序 • 计算引擎上则是 Python 为壳,C++ 为魂 • 主要是依赖 Posix 文件系统接口来访问数据 • 针对重要存储系统或者是采用专有的 C/C++/Python SDK PySpark HDFS S3 OSS

5. Big Data + AI 的挑战 • 问题:Python 如何与 Java 生态的数据打通? • 在数据交换方面: 缺乏一个核心和广泛的基础支撑 缺乏可维护性,更新不及时 效率低下,关键路径上需要专门去优化 靠多层次/多环节/多进程/额外服务协作来完成,难以排查

6. 我们的思路和原则 • 构建一手的 Python <——> Java 数据通道 Pyboot,赋能 Python 程序和生 态,可直接从 Java 系统拉取和操纵数据 • 基于一个 Java SDK 写出来的 Java 应用程序,可以类似地手写/转换成 Python 程序 • 序列化/RPC 协议/跨进程来交换数据的方式 ——> 进程内地址访问, Python 类型和 Java 类型转换 • 一般情况下性能上接近 Java 调 Java,重要场景下支持 Arrow 集成,完成 大数据集高性能和高可靠交换 • 针对大数据内置计算优化,采取 C++/Cython 实现,利用硬件加速和向量化

7. Pyboot 简而言之 • Pyboot 是什么???Pyboot 是一个 Python 扩展库,一个 pyboot.so 文件 • Pyboot 具有工业级强度,而不是某个 toy • Pyboot 核心代码 C++,壳代码 Cython,优化空间极大 • 允许 Python 程序进程内置 JVM,直接调用 Java 代码 • 允许 Java 程序进程内置 Python 解释器,直接调用 Python 代码

8. Pyboot 功能 • 在 Python 程序里面,如何表达和调用 Java? 支持 Java 基本类型和对应的装箱类型 支持数组,多维数组 支持自定义类型和接口 支持泛型 支持类型强转和类型自动转换

9. Pyboot 功能(cont’d) • 在 Python 程序里面,你可以创建任意特定的 Java 类型,包括泛型推 导出来的 • 调用任意 Java 接口和方法,自动进行参数和返回值类型匹配和转换 • 遵循 Java 强类型检查,类型不一致在 Python side 运行时检查和报错 • 支持对要调用的 Java SDK 打 image 进行封装,简化 Python side 设 置 • 支持管理和调用多个版本的 Java SDK,比如多个 Spark/Hive 版本的 客户端

10. 规划和路径 • 做好基础,内置核心能力,赋能生态 • 先赋能机器学习 Python 程序访问大数据 Java 数据系统 Hadoop Hive Spark HBase • 再赋能机器学习引擎打通 Java 数据系统,形成紧致无缝的 pipeline Spark Tensorflow Pytorch • 开放和开源

11. 现场演示 • Pyboot 赋能大数据,Jupyter Notebook 一站式操作各种组 件

12.Pyboot: Python 调用 Java 代码 import pyboot pyboot.initialize() jboot = pyboot.getJboot() jstr = jboot.String("Hello Pyboot!") jstr = jboot.java.lang.String("Hello Pyboot!")

13. 类型转换 • 基本类型 • Int • Double • Boolean • … • String • Array Path = jboot.importClass("org.apache.hadoop.fs.Path") hdfsDir = Path("hdfs://emr-header-1:9000/") • 泛型

14. 泛型 import java.util.ArrayList; ArrayList aStrList = new ArrayList<String>(); aStrList.add("Hello world!") ArrayList = jboot.java.util.ArrayList StringArrayList = ArrayList.of(jboot.String) aStrList = StringArrayList() aStrList.add("Hello world!")

15. JbootClassLoader & Bundle • Classpath 问题 • 组件多版本 • Jar 包冲突 • Java Classpath -> Jundle 文件 • jboot –bundle hadoop.jundle \ –cp “/usr/lib/hadoop/share/common/*:/usr/lib/hadoop/share/hdfs/*:...”

16. Python + 大数据的问题 • 很多大数据组件没有原生的 Python 库 • 依赖第三方服务 ThriftServer,REST API • 序列化效率

17. Pyboot 读写 HDFS # 初始化HadoopFS conf = jboot.org.apache.hadoop.conf.Configuration() hadoopPath = jboot.importClass("org.apache.hadoop.fs.Path") hdfsPath = hadoopPath("hdfs://emr-header-1:9000/") fs = hdfsPath.getFileSystem(conf) # List 根目录 file_list = fs.listStatus(hdfsPath) for file_status in file_list: print (file_status.getPath()) # 写入hdfs文件 file = hadoopPath("/tmp/hello-pyboot3") outStream = fs.create(file) outStream.write(jboot.String("Hello").getBytes()) … outStream.close()

18. Pyboot 操作 Hive # 初始化 Hive Client hiveConf = jboot.org.apache.hadoop.hive.conf.HiveConf() hiveCli = jboot.org.apache.hadoop.hive.ql.metadata.Hive.get(hiveConf) # 查询 Hive 的数据库 dbs = hiveCli.getAllDatabases() for db in dbs: print("Database : " + str(db))

19. Pyboot 操作 HBase # 初始化 HBase conf = jboot.org.apache.hadoop.hbase.HBaseConfiguration.create(); hConnection = jboot.org.apache.hadoop.hbase.client.HConnectionManager.createConnectio n(conf) # 读取表信息 hTable = hConnection.getTable("tableName") cfs = hTable.getTableDescriptor().getColumnFamilies() for cf in cfs: print(cf)

20. Pyboot 操作 Spark df = spark.read.json("examples/src/main/resources/people.json") df.show() df.select("name").show() df.select(df['name'], df['age'] + 1).show() df.filter(df['age'] > 21).show() df.groupBy("age").count().show()

21.Thanks.