
Large-Scale Machine Learning with Spark on Amazon EMR

This is a guest post by Jeff Smith, Data Engineer at Intent Media. Intent Media, in their own words: “Intent Media operates a platform for advertising on commerce sites.  We help online travel companies optimize revenue on their websites and apps through sophisticated data science capabilities. On the data team at Intent Media, we are responsible for processing terabytes of e-commerce data per day and using that data and machine learning techniques to power prediction services for our customers.”

Our Big Data Journey

Building large-scale machine learning models has never been simple.  Over the history of our team, we’ve continually evolved our approach for running modeling jobs.

The dawn of big data: Java and Pig on Apache Hadoop

Our first data processing jobs were built on Hadoop MapReduce using the Java API.  After building some basic aggregation jobs, we went on to develop a scalable, reliable implementation of logistic regression on Hadoop using this paradigm.  While Hadoop MapReduce certainly gave us the ability to operate at the necessary scale, using the Java API resulted in verbose, difficult-to-maintain code.  More importantly, the feature development velocity we could achieve with the complex Java API was not fast enough to keep up with our growing business.  Our implementation of Alternating Direction Method of Multipliers (ADMM) logistic regression on Hadoop consists of several thousand lines of Java code.  As you might imagine, it took months to develop.  Compared with a library implementation of logistic regression that can be imported and applied in a single line, this was simply too large a time investment.

Around the same time, we built some of our decisioning capabilities in Pig, a Hadoop-specific domain-specific language (DSL).  Part of our motivation for looking into Pig was to write workflows at a higher level of abstraction than the Java Hadoop API allowed.  Although we had some successes with Pig, we eventually abandoned it.  Pig was still a young application, and because it is implemented as a DSL, it led to certain inherent difficulties for our team, such as immature tooling.  For example, PigUnit, the xUnit testing framework for Pig, was only released in December 2010 and took a while to mature.  For years after its release, it was still not integrated into standard Pig distributions or published as a Maven artifact.  Given our strong TDD culture, we really craved mature tooling for testing.  Other difficulties included the impedance mismatch with our codebase at the time, which was largely Java.

Going functional and logical with Cascalog

Our issues with Pig drove us to re-implement many of these capabilities in Cascalog, a Hadoop DSL written in Clojure and inspired by Datalog.  We found that Cascalog’s more concise logic programming style can be far more declarative than low-level Java Hadoop MapReduce code.  Moreover, by using Cascalog, we were able to write the supporting functionality of our capabilities in Clojure with less of an impedance mismatch problem than with Pig.  As a functional language, Clojure allowed us to write more pure, testable, and composable code.

Despite these advantages, we still found Cascalog lacking as a long-term solution.  As a macro-driven DSL, it was often difficult to reason about its behavior.  Certain composition approaches still led to difficulties with testing isolated components rather than whole pipelines.  The interoperation with Clojure also proved smoother in principle than in practice.  Logic programming is a useful paradigm for reasoning about data transformations, but Cascalog’s version of logic programming simply has different semantics than Clojure’s version of functional programming.  This mismatch made it hard to develop good patterns for combined Clojure/Cascalog applications.

Questioning Hadoop MapReduce

By this time, we had also realized that Hadoop MapReduce’s approach to data processing was not always a good fit for our use case.  By writing to disk after each step, Hadoop ensures fault tolerance but incurs a significant runtime cost.  Because we use Amazon Elastic MapReduce (Amazon EMR), we did not need most of this fault tolerance: our EMR clusters were spun up dynamically, and the outputs were stored in S3 and an application database, so persisting data in HDFS after a job completed was not necessary.  Also, Hadoop MapReduce’s execution model was simply not a great fit for the highly iterative machine learning algorithms we were trying to implement.

We realized that we wanted a data processing platform suited for iterative computations, ideally with some understanding of machine learning out of the box.  It needed to have a high-level API that allowed us to compose our applications using functional programming idioms in a robust production language.  There is, in fact, such a platform; it’s called Spark.

Success with Spark

Apache Spark was originally developed at UC Berkeley explicitly for the use case of large-scale machine learning.  Early in Spark’s development, the team realized that Spark could be a general data processing platform, so they carved out different pieces of functionality into separate subprojects, all relying on common facilities provided by Spark Core.  The machine learning capabilities became a library called MLlib, and there are libraries for streaming, SQL, and graph processing as well.

Performance

Compared to Hadoop, Spark is much better suited for building large-scale machine learning models.  By maintaining and reasoning about the execution’s directed acyclic graph (DAG), Spark can figure out when to cache data in memory.  This and other features allow it to be up to 100 times faster than Hadoop for some workflows.  In our experience, we have seen an order-of-magnitude performance improvement before any tuning.
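
To make that concrete, the following is a minimal sketch (not our production code, and the S3 path is hypothetical) of the pattern that makes iterative workloads fast on Spark: the parsed dataset is materialized in memory once with cache() and then reused on every subsequent pass, rather than being re-read from storage the way a chain of Hadoop MapReduce jobs effectively would be.

import org.apache.spark.{SparkConf, SparkContext}

object CachingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("caching-sketch"))

    // Hypothetical input: one numeric value per line.
    val values = sc.textFile("s3://your-bucket-name/example/values.txt")
      .map(_.toDouble)
      .cache() // keep the parsed RDD in memory for the passes below

    // Several passes over the same data; only the first pass reads from storage.
    (1 to 5).foreach { i =>
      println(s"Pass $i: mean = ${values.mean()}")
    }

    sc.stop()
  }
}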

Loving our code again

Beyond better performance, the developer experience when using Spark is much better than when developing against Hadoop.  Spark’s Scala, Java, and Python APIs are famously well-conceived and provide a functional programming data model that is declarative and high-level.  At first, we wrote our jobs against the Java API using a Clojure DSL called flambo.  The usability and functional style of Spark’s API kept flambo’s design very simple, allowing us to extend the library’s functionality where we needed greater access to Spark and MLlib’s capabilities.

More recently, we’ve been exploring Scala for writing some of our Spark jobs.  By removing the intermediate layer of flambo, we get even simpler code and can adopt new Spark features as soon as they’re released.  We expect the transition to be smooth and easy, due to the high quality and clarity of our existing Clojure codebase that we use to build Spark jobs.  This is largely a testament to the power of Spark’s resilient distributed dataset abstraction and the functional programming model it allows. Scala, like Clojure, is an excellent functional programming language that allows us to write clean, testable code.

We just released an open source library for composing pipelines called Mario, which the example below will use. It is motivated by our experiences with Spark and our desire to compose pipelines in a functional, declarative style, similar to the way Spark programs are written.  For more details about Mario, see the launch post on the Intent Media blog.

Algorithms for free

Another significant advantage of using Spark as a platform has been getting access to scalable library implementations of common machine learning algorithms via MLlib.  In the past, we had to implement every machine learning algorithm that we wanted to employ because they were not readily available for running jobs at scale with Hadoop.  With MLlib and Spark, many learning algorithms are already available, and a vibrant open source community is actively improving them and adding new algorithms.

Enabling Services

Some people might be overwhelmed by the number of changes that we’ve made to our data platform in just a few years.  We’re proud of our ability to iterate rapidly and find new solutions.  It’s part of the fun at a fast-moving startup.

A big reason why we’ve been able to continually refine our approaches to machine learning at scale is that we use Amazon Web Services (AWS) to provide us with the infrastructure we need to execute on our ambitions.  All of our applications have always been deployed on Amazon EC2.  All of our raw data is stored on Amazon S3.  We use Amazon DynamoDB to store the data for our user identification system.

Iterating and shipping using Amazon EMR

Perhaps most importantly, all of our large-scale data processing jobs are executed on EMR.  When we started using Hadoop with EMR, we were able to focus on the higher-level problems of data processing and modeling, rather than creating and maintaining Hadoop clusters.  This allowed us to rapidly and broadly explore all of the approaches discussed above.  After learning that Hadoop would not be a long-term solution for our modeling workflows, we used EMR to get up and running quickly on the Spark platform.

Because we can programmatically construct on-demand clusters that auto-terminate on completion, we’ve been able to use ephemeral clusters for all of our data jobs.  For much of the day, we have very few data processing clusters running, but periodically we spin up many large clusters via EMR to train all of the models we need to learn.  This usage pattern is neither harder to implement nor more expensive than serial execution of all of our jobs, and it matches our preferred workflow much better.  For our usage pattern, this represents a large cost savings over a persistent cluster.

Spark on EMR can also read and write directly to S3 using EMRFS.  This allowed us to continue to use S3 as our persistent data store as we had done in our Hadoop workflows.
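
For example, the same job can read its input from and write its output to S3 paths directly, with no copy step to or from HDFS.  A short sketch, assuming an existing SparkContext sc and hypothetical bucket and prefix names:

// EMRFS lets Spark on EMR treat s3:// paths like any other Hadoop-compatible filesystem.
val rawEvents = sc.textFile("s3://your-bucket-name/raw/events/")
val nonEmpty = rawEvents.filter(_.nonEmpty)
nonEmpty.saveAsTextFile("s3://your-bucket-name/cleaned/events/")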

Similar to the advantages we get from Spark and MLlib, there is a huge advantage for a startup like ours in picking up tooling advances made by other teams.  This frees up time that we would have otherwise spent building the same functionality in-house.  In the case of EMR, we don’t need to worry about finding the time to implement cluster termination, Hadoop installation, cluster-level monitoring, or a cluster management interface.  By giving us a web console for managing our clusters, EMR makes it much easier to get everyone up to speed on the status of their jobs and to show people how to debug jobs as they are developed.  The simplicity of creating and managing clusters via the web interface allows data analysts to use Hadoop and Spark on EMR clusters for ad hoc analyses without needing deep knowledge of cluster creation or management.

A Sample Workflow

Below is an example of a simplified version of the sort of workflow we run many times every day on EMR.  It is a basic model learning pipeline, starting with ingesting previously learned features and ending with persisting a learned model.

Machine learning workflow with Spark

This example builds on our initial prototype workflow developed last year, as well as the examples in the Spark programming guide.  Additionally, this example uses the Mario library that we developed to compose our pipelines in a type-safe and declarative style using functional idioms.

In our production pipeline, we extract features from the raw data collected by our applications and stored on S3.  These features are semantically meaningful, derived representations of our raw data.  They are the input to our model learning algorithm.   Our production machine learning pipeline extracts hundreds of non-trivial features, but this example simply uses arbitrary identifiers to stand in for real features.

In this example, we begin by defining a function to load the feature data from S3.  This function is used to load both the training and testing datasets.

def loadFeatures(inputPath: String) = MLUtils.loadLibSVMFile(sc, inputPath)

The example files use LibSVM format, a common format for storing features.  MLlib comes with utilities that understand this format and can parse it into an RDD of LabeledPoints.  It’s also worth noting that LibSVM is a sparse format.  Having native support for a common sparse format is tremendously valuable for us.  In previous versions of our pipeline, we wrote a lot of code that handled transformations between dense and sparse formats for different libraries.
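
The snippets that follow assume a small amount of shared setup.  A sketch of the imports and SparkContext they rely on is shown below; the object name matches the --class argument passed to the EMR step later in the post, and everything else here is illustrative rather than our exact production code.

// A LibSVM line looks like "<label> <index>:<value> <index>:<value> ...",
// e.g. "1 3:0.5 17:1.0" -- only non-zero features are stored (sparse).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

object ModelingWorkflow {
  val sc = new SparkContext(new SparkConf().setAppName("ModelingWorkflow"))
  // The functions defined below live in this object, along with a main
  // method that reads the input and output paths from the job arguments.
}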

Because the loading of training and testing data does not rely on any upstream dependencies, these two steps can be started concurrently.  Spark is able to determine this through its understanding of the DAG of the job.  Also, the Mario library provides similar guarantees around concurrent execution for all operations defined in the pipeline, even if those operations are not Spark operations (e.g., retrieving data from a database).

Next, you define a function to learn a model from the features in the training set.  This is one of the key advantages we picked up in our transition to Spark and MLlib.  All of the model learning functionality comes from the library implementation.

def learnModel(trainingData: RDD[LabeledPoint]) = new LogisticRegressionWithLBFGS()
  .setNumClasses(2)
  .run(trainingData)

Then you define a function to evaluate that model over a test set.  This allows you to see how the model can be expected to perform in live usage.  In our production workflow, it also allows us to set a threshold for classification based on arbitrary parameters that come from our business use case.

def predictAndLabel(testingData: RDD[LabeledPoint], model: LogisticRegressionModel) =
  testingData.map { case LabeledPoint(label, features) =>
    val prediction = model.predict(features)
    (prediction, label)
  }
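
By default, predict returns the class chosen with MLlib's built-in 0.5 cutoff.  One way to apply a business-driven threshold instead (a sketch, not our production code, and the 0.7 value is an arbitrary placeholder) is to clear the model's threshold so that predict returns a raw score, then classify against your own cutoff:

def scoreAndLabel(testingData: RDD[LabeledPoint],
                  model: LogisticRegressionModel,
                  threshold: Double = 0.7) = {
  model.clearThreshold() // predict now returns the raw score rather than a class
  testingData.map { case LabeledPoint(label, features) =>
    val score = model.predict(features)
    (if (score >= threshold) 1.0 else 0.0, label)
  }
}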

You can define a function to see some of those basic statistics around model performance using built-in functionality from MLlib:

def metrics(predictionsAndLabels: RDD[(Double, Double)]) = new MulticlassMetrics(predictionsAndLabels)

Then, you define functions to persist the model and the performance statistics to disk (S3):

def saveModel(outputPath: String)(model: LogisticRegressionModel) = {
  val modelPath = outputPath + "/model"
  model.save(sc, modelPath)
}

def saveMetrics(outputPath: String)(metrics: MulticlassMetrics) = {
  val precision = metrics.precision
  val recall = metrics.recall
  val metricsPath = outputPath + "/metrics"
  val metricsStringRDD = sc.parallelize(List(precision, recall))
  metricsStringRDD.saveAsTextFile(metricsPath)
}

Then you compose all of these steps into a single pipeline:

for {
  trainingDataStep <- pipe(loadFeatures(trainingDataPath))
  testingDataStep <- pipe(loadFeatures(testingDataPath))

  modelStep <- pipe(learnModel, trainingDataStep)

  predictionsAndLabelsStep <- pipe(predictAndLabel, testingDataStep, modelStep)

  metricsStep <- pipe(metrics, predictionsAndLabelsStep)
  saveMetricsStep <- pipe(saveMetrics(currentOutputPath), metricsStep)

  saveModelStep <- pipe(saveModel(currentOutputPath), modelStep)

} yield modelStep.runWith(saveMetricsStep, saveModelStep)

Those steps that do not yield a result and simply have a side effect (writing to disk) are specified in the call to the runWith method.  The composed pipeline yields the learned model as its return value.

In our production workflow, we load some of this persisted data into an application database for use in runtime model serving.  Other data about the model learning pipeline’s performance is loaded to an analytics database for reporting and analytics tasks.
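
As a sketch of what the consuming side can look like, MLlib's LogisticRegressionModel provides a load counterpart to save, so a separate process can read the persisted model back from the same path convention used by saveModel above:

// Reload the model written by saveModel; the "/model" suffix mirrors that function.
import org.apache.spark.mllib.classification.LogisticRegressionModel

val reloadedModel = LogisticRegressionModel.load(sc, currentOutputPath + "/model")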

This job must be compiled into a JAR to be provided to your EMR cluster.  Do this using sbt:

sbt assembly
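
If your project is not already set up for assembly builds, a minimal build.sbt along the lines of the following works.  The library versions are illustrative, Spark is marked "provided" because EMR supplies it on the cluster, and the sbt-assembly plugin must also be declared in project/plugins.sbt.

// build.sbt (illustrative versions)
name := "spark-emr"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.3.1" % "provided"
)

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")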

After you have a JAR, you can push it to S3.  The following AWS CLI command copies the JAR to S3:

aws s3 cp spark-emr/target/scala-2.10/spark-emr-assembly-1.0.jar s3://your-bucket-name/$USER/spark/jars/spark-emr-assembly-1.0.jar

Then, start up an EMR cluster that executes the Spark job you just wrote using the AWS CLI:

aws emr create-cluster \
  --name "exampleJob" \
  --ec2-attributes KeyName=MyKeyName \
  --auto-terminate \
  --ami-version 3.8.0 \
  --instance-type m3.xlarge \
  --instance-count 3 \
  --log-uri s3://your-bucket-name/$USER/spark/`date +%Y%m%d%H%M%S`/logs \
  --applications Name=Spark,Args=[-x] \
  --steps "Name=\"Run Spark\",Type=Spark,Args=[--deploy-mode,cluster,--master,yarn-cluster,--conf,spark.executor.extraJavaOptions=-XX:MaxPermSize=256m,--conf,spark.driver.extraJavaOptions=-XX:MaxPermSize=512m,--class,ModelingWorkflow,s3://your-bucket-name/$USER/spark/jars/spark-emr-assembly-1.0.jar,s3://support.elasticmapreduce/bigdatademo/intentmedia/,s3://your-bucket-name/$USER/spark/output/]"

After the job finishes, the cluster terminates automatically.  You can adjust the size of the nodes and the size of the cluster by changing the --instance-type and --instance-count arguments to suit your workload.  You can see the full code for this example in the AWS Big Data Blog GitHub repo.
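
While the job is running, the usual AWS CLI calls can be used to keep an eye on it.  For example (with a placeholder cluster ID), the following lists the cluster's steps and their states, and the logs land under the --log-uri prefix passed to create-cluster:

aws emr list-steps --cluster-id j-XXXXXXXXXXXXX
aws s3 ls s3://your-bucket-name/$USER/spark/ --recursive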

Wrapping Up

This post has given you an overview of how Intent Media has evolved our data platform through a variety of different approaches.  We’ve found great success using popular open source frameworks like Spark and MLlib to learn models at massive scale.  The advantages of using these tools are further amplified by relying on AWS and EMR, specifically, to create and manage our clusters.  The combination of these approaches has enabled us to move quickly and scale with our technology alongside our rapidly expanding business.

If you have questions or suggestions, please leave a comment below.


Love to work on open source? Check out EMR’s careers page.
