Serverless Machine Learning on Modern Hardware with Apache Spark

In this talk, I first present a detailed performance breakdown of machine learning workloads running serverless on Apache Spark, showing how remote storage is used to exchange a task's intermediate state (e.g., model updates or broadcast messages) and what performance overhead this incurs. Afterwards, I show how the same workloads perform on-premise using Apache Spark together with Apache Crail, deployed on a high-performance cluster (100Gbps network, NVMe flash, etc.). Serverless computing simplifies the deployment of machine learning applications.

1. Serverless Machine Learning on Modern Hardware
Patrick Stuedi, Michael Kaufmann, Adrian Schuepbach, IBM Research #Res6SAIS

2. Serverless Computing
● No need to set up or manage a cluster
● Automatic, dynamic, and fine-grained scaling
● Sub-second billing
● Many frameworks: AWS Lambda, Google Cloud Functions, Azure Functions, Databricks Serverless, etc.

3. Challenge: Performance
Example: Sorting 100GB
[Bar chart: runtime in seconds (0-500) for five setups: AWS Lambda (serverless), Databricks Serverless, Spark on a standard cloud cluster, Spark on-premise, and Spark on-premise++. Flexibility increases toward the left, performance toward the right.]
Spark/On-Premise++: Running Apache Spark on a High-Performance Cluster using RDMA and NVMe Flash, Spark Summit'17

4. Challenge: Performance
Example: Sorting 100GB
[Same chart, annotated: the AWS Lambda run uses 64 lambda workers.]

5. Challenge: Performance
Example: Sorting 100GB
[Same chart, annotated: the Databricks Serverless run uses a serverless cluster with autoscaling (min machines: 1, max machines: 8).]

6. Challenge: Performance
Example: Sorting 100GB
[Same chart, annotated: the standard Databricks cluster uses 8 machines, no autoscaling.]

7. Challenge: Performance
Example: Sorting 100GB
[Same chart, annotated: the on-premise Spark cluster uses 100Gb/s Ethernet.]

8. Challenge: Performance
Example: Sorting 100GB
[Same chart, annotated: the on-premise++ Spark cluster additionally uses RDMA, NVMe flash, and NVMe-over-Fabrics (NVMeF).]
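The sorting workload behind these numbers is not shown in the deck; below is a minimal sketch of what a 100GB Spark sort job might look like. The input path, record schema, and storage layout are illustrative assumptions, not details from the talk.

```scala
import org.apache.spark.sql.SparkSession

object SortBenchmark {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sort-100g")
      .getOrCreate()

    // Hypothetical input location; the talk does not specify the dataset layout.
    val input = spark.read.parquet("s3a://my-bucket/sort-input-100g")

    // A full sort forces a shuffle of the entire dataset, which is exactly
    // the phase that dominates runtime in the serverless configurations.
    input.sort("key")
      .write
      .parquet("s3a://my-bucket/sort-output-100g")

    spark.stop()
  }
}
```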

9. Why is it so hard?
● Scheduler: when is the best time to add/remove resources?
● Container startup: may have to dynamically spin up containers
● Storage: input data needs to be fetched from remote storage (e.g., S3)
  – As opposed to compute-local storage such as HDFS
● Data sharing: intermediate data needs to be temporarily stored on remote storage (S3, Redis)
  – Affects operations like shuffle, broadcast, etc. (see the sketch below)
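To make the data-sharing point concrete, here is a minimal sketch (not from the talk) of an operation that produces intermediate shuffle data; in a serverless setting this data has to be written to and read back from a remote store such as S3 or Redis rather than from local disks:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("shuffle-demo").getOrCreate()
import spark.implicits._

// Hypothetical dataset of (userId, amount) pairs.
val purchases = Seq((1, 10.0), (2, 5.0), (1, 7.5)).toDF("userId", "amount")

// groupBy forces a shuffle: every task's partial results become
// intermediate state that must be exchanged between stages. On a cluster
// this lands on local disks; serverless, it goes to remote storage.
val totals = purchases.groupBy("userId").sum("amount")
totals.show()
```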

11. Example: MapReduce (Cluster)
[Diagram: map stage and reduce stage both run on compute-and-store nodes; intermediate data is mostly written and read locally.]

12. Example: MapReduce (Serverless)
[Diagram: map and reduce stages run on a dynamically growing/shrinking compute cloud; shuffle data is exclusively written and read remotely via a storage service (e.g., S3, Redis).]
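The contrast between the two deployment modes can be sketched in code. Below is a toy serverless-style map task that partitions its output by reducer and writes each partition to a remote object store; everything here (bucket name, key scheme, hash partitioning) is an illustrative assumption, not the talk's implementation:

```scala
import com.amazonaws.services.s3.AmazonS3ClientBuilder

object ServerlessMapTask {
  // Toy map task: word count over one input split. In a cluster deployment
  // the per-reducer partitions below would be spilled to local disk; in a
  // serverless deployment they must go to a remote storage service.
  def run(taskId: Int, lines: Seq[String], numReducers: Int): Unit = {
    val s3 = AmazonS3ClientBuilder.defaultClient()
    val counts = lines.flatMap(_.split("\\s+"))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

    // Hash-partition the intermediate data by key, one object per reducer.
    counts.groupBy { case (word, _) => math.abs(word.hashCode) % numReducers }
      .foreach { case (reducer, pairs) =>
        val body = pairs.map { case (w, c) => s"$w\t$c" }.mkString("\n")
        // Hypothetical bucket/key layout: shuffle/<reducer>/map-<taskId>.
        s3.putObject("my-shuffle-bucket", s"shuffle/$reducer/map-$taskId", body)
      }
  }
}
```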

13. I/O Overhead: Sorting 100GB
[Bar charts: total runtime (0-500 seconds) broken down into input/output, compute, and shuffle I/O for AWS Lambda, Databricks Serverless, standard cloud Spark, on-premise Spark, and on-premise++ Spark, with per-phase percentage breakdowns for each configuration.]
Shuffle overheads are significantly higher when intermediate data is stored remotely.

14. What about other workloads?
Example: SQL, Query 77 / TPC-DS benchmark

15. What about other workloads?
Example: SQL, Query 77 / TPC-DS benchmark
[Query plan shown, annotated: the shuffle/broadcast exchanges need to be stored remotely.]
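Query 77 itself is not reproduced on the slides, but the shuffle/broadcast point applies to any join-heavy SQL. Here is a minimal sketch (table paths are assumptions; the column names are standard TPC-DS) showing both exchange types in Spark SQL:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("sql-demo").getOrCreate()

val sales = spark.read.parquet("s3a://tpcds/store_sales") // large fact table
val dates = spark.read.parquet("s3a://tpcds/date_dim")    // small dimension table

// The join below produces a broadcast exchange (date_dim is shipped to all
// tasks) plus shuffle exchanges for the aggregation. Serverless, both kinds
// of intermediate data have to pass through remote storage.
val result = sales
  .join(broadcast(dates), sales("ss_sold_date_sk") === dates("d_date_sk"))
  .groupBy("d_year")
  .sum("ss_net_paid")
result.show()
```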

16. What about other workloads?
Example: Iterative ML (e.g., linear regression)
[Diagram: worker nodes (W) and a parameter server (PS); the PS could be co-located with the worker nodes. Per worker, first iteration: read training data, fetch model params, compute, update model. Later iterations: use cached data, fetch model params, compute, update model.]

18. What about other workloads?
Example: Iterative ML (e.g., linear regression)
[Same diagram, annotated: when scaling out, a newly added worker has to read its training data remotely, and the parameter server now needs to be remote instead of co-located.]

19. What about other workloads?
Example: Iterative ML (e.g., linear regression)
[Same diagram, further annotated: each iteration ends at a barrier, so all workers need to wait for the slowest one.]
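The per-iteration pattern on these slides (fetch params, compute, update, barrier) can be sketched as follows. The ParameterServer interface is hypothetical, introduced only to make the loop concrete; the talk does not prescribe an API:

```scala
// Hypothetical parameter-server interface; not an API from the talk.
trait ParameterServer {
  def fetch(): Array[Double]              // current model parameters
  def push(gradient: Array[Double]): Unit // apply a model update
  def barrier(): Unit                     // wait for all workers
}

object Worker {
  def run(ps: ParameterServer, data: Seq[(Array[Double], Double)], iters: Int): Unit = {
    // Training data is read once (remotely, in the serverless case) and cached.
    val cached = data

    for (_ <- 1 to iters) {
      val w = ps.fetch() // fetch model params
      // Compute a least-squares gradient over the cached partition.
      val grad = new Array[Double](w.length)
      for ((x, y) <- cached) {
        val err = x.zip(w).map { case (xi, wi) => xi * wi }.sum - y
        for (j <- grad.indices) grad(j) += err * x(j)
      }
      ps.push(grad) // update model
      ps.barrier()  // all workers wait for the slowest one
    }
  }
}
```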

20. Can we..
● ..use Spark to run such workloads in a serverless fashion?
  – Dynamic scaling of compute nodes while jobs are running
  – No cluster configuration
  – No startup time overhead
● ..eliminate the performance overheads?
  – Workloads should run as fast as on a dedicated cluster

21. Design Options
● Scheduling:
  1. Use the serverless framework to schedule executors
  2. Use the serverless framework to schedule tasks
  3. Enable sharing of executors among different applications
● Intermediate data:
  1. Executors cooperate with the scheduler to flush data remotely
  2. Consequently, store all intermediate state remotely

22. Design Options
● Scheduling:
  1. Use the serverless framework to schedule executors ← high startup latency!
  2. Use the serverless framework to schedule tasks
  3. Enable sharing of executors among different applications
● Intermediate data:
  1. Executors cooperate with the scheduler to flush data remotely
  2. Consequently, store all intermediate state remotely

23. Design Options
● Scheduling:
  1. Use the serverless framework to schedule executors ← high startup latency!
  2. Use the serverless framework to schedule tasks ← slow!
  3. Enable sharing of executors among different applications
● Intermediate data:
  1. Executors cooperate with the scheduler to flush data remotely
  2. Consequently, store all intermediate state remotely

25. Design Options
● Scheduling:
  1. Use the serverless framework to schedule executors ← high startup latency!
  2. Use the serverless framework to schedule tasks ← slow!
  3. Enable sharing of executors among different applications
● Intermediate data:
  1. Executors cooperate with the scheduler to flush data remotely ← complex!
  2. Consequently, store all intermediate state remotely

27. Architecture Overview
[Diagram: the Driver sends the job to an HCS/DAG Scheduler, which registers the application, launches executors, and assigns tasks. Executors register with a metadata server; intermediate data is written to and read from storage servers (Apache Crail, crail.apache.org).]
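Putting the pieces together, Spark can be pointed at Crail for its intermediate data. The class names below follow the Apache Crail Spark plugin (crail-spark-io) as far as I know; treat them as assumptions to verify against crail.apache.org before relying on them:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: routing Spark's shuffle and broadcast data through Crail.
// Crail itself is configured separately (metadata server address and
// storage tiers such as DRAM or NVMe) in its own crail-site.conf.
val spark = SparkSession.builder()
  .appName("crail-backed-job")
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.crail.CrailShuffleManager")
  .config("spark.broadcast.factory", "org.apache.spark.broadcast.CrailBroadcastFactory")
  .getOrCreate()
```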
