1. 程式人生 > >Why Apache Spark is a Crossover Hit for Data Scientists [FWD]

Why Apache Spark is a Crossover Hit for Data Scientists [FWD]

Spark is a compelling multi-purpose platform for use cases that span investigative, as well as operational, analytics.

Data science is a broad church. I am a data scientist — or so I’ve been told — but what I do is actually quite different from what other “data scientists” do. For example, there are those practicing “investigative analytics” and those implementing “operational analytics.” (I’m in the second camp.)

Data scientists performing investigative analytics use interactive statistical environments like R to perform ad-hoc, exploratory analytics in order to answer questions and gain insights. By contrast, data scientists building operational analytics systems have more in common with engineers. They build software that creates and queries machine-learning models that operate at scale in real-time serving environments, using systems languages like C++ and Java, and often use several elements of an enterprise data hub, including the 

Apache Hadoop ecosystem.

And there are subgroups within these groups of data scientists. For example, some analysts who are proficient with R have never heard of Python or scikit-learn, or vice versa, even though both provide libraries of statistical functions that are accessible from a REPL (Read-Evaluate-Print Loop) environment.

A World of Tradeoffs

It would be wonderful to have one tool for everyone, and one architecture and language for investigative as well as operational analytics. If I primarily work in Java, should I really need to know a language like Python or R in order to be effective at exploring data? Coming from a conventional data analyst background, must I understand MapReduce in order to scale up computations? The array of tools available to data scientists tells a story of unfortunate tradeoffs:

  • R offers a rich environment for statistical analysis and machine learning, but it has some rough edges when performing many of the data processing and cleanup tasks that are required before the real analysis work can begin. As a language, it’s not similar to the mainstream languages developers know.
  • Python is a general purpose programming language with excellent libraries for data analysis like Pandas and scikit-learn. But like R, it’s still limited to working with an amount of data that can fit on one machine.
  • It’s possible to develop distributed machine learning algorithms on the classic MapReduce computation framework in Hadoop (see Apache Mahout). But MapReduce is notoriously low-level and difficult to express complex computations in.
  • Apache Crunch offers a simpler, idiomatic Java API for expressing MapReduce computations. But still, the nature of MapReduce makes it inefficient for iterative computations, and most machine learning algorithms have an iterative component.

And so on. There are both gaps and overlaps between these and other data science tools. Coming from a background in Java and Hadoop, I do wonder with envy sometimes: why can’t we have a nice REPL-like investigative analytics environment like the Python and R users have? That’s still scalable and distributed? And has the nice distributed-collection design of Crunch? And can equally be used in operational contexts?

Common Ground in Spark

These are the desires that make me excited about Apache Spark. While discussion about Spark for data science has mostly noted its ability to keep data resident in memory, which can speed up iterative machine learning workloads compared to MapReduce, this is perhaps not even the big news, not to me. It does not solve every problem for everyone. However, Spark has a number of features that make it a compelling crossover platform for investigative as well as operational analytics:

  • Spark comes with a machine-learning library, MLlib, albeit bare bones so far.
  • Being Scala-based, Spark embeds in any JVM-based operational system, but can also be used interactively in a REPL in a way that will feel familiar to R and Python users.
  • For Java programmers, Scala still presents a learning curve. But at least, any Java library can be used from within Scala.
  • Spark’s RDD (Resilient Distributed Dataset) abstraction resembles Crunch’s PCollection, which has proved a useful abstraction in Hadoop that will already be familiar to Crunch developers. (Crunch can even be used on top of Spark.)
  • Spark imitates Scala’s collections API and functional style, which is a boon to Java and Scala developers, but also somewhat familiar to developers coming from Python. Scala is also a compelling choice for statistical computing.
  • Spark itself, and Scala underneath it, are not specific to machine learning. They provide APIs supporting related tasks, like data access, ETL, and integration. As with Python, the entire data science pipeline can be implemented within this paradigm, not just the model fitting and analysis.
  • Code that is implemented in the REPL environment can be used mostly as-is in an operational context.
  • Data operations are transparently distributed across the cluster, even as you type.

Spark, and MLlib in particular, still has a lot of growing to do. For example, the project needs optimizations, fixes, and deeper integration with YARN. It doesn’t yet provide nearly the depth of library functions that conventional data analysis tools do. But as a best-of-most-worlds platform, it is already sufficiently interesting for a data scientist of any denomination to look at seriously.

In Action: Tagging Stack Overflow Questions

A complete example will give a sense of using Spark as an environment for transforming data and building models on Hadoop. The following example uses a dump of data from the popular Stack Overflow Q&A site. On Stack Overflow, developers can ask and answer questions about software. Questions can be tagged with short strings like “java” or “sql“. This example will build a model that can suggest new tags to questions based on existing tags, using thealternating least squares (ALS) recommender algorithm; questions are “users” and tags are “items”.

Getting the Data

Stack Exchange provides complete dumps of all data, most recently from January 20, 2014. The data is provided as a torrent containing different types of data from Stack Overflow and many sister sites. Only the filestackoverflow.com-Posts.7z needs to be downloaded from the torrent.

This file is just a bzip-compressed file. Spark, like Hadoop, can directly read and split some compressed files, but in this case it is necessary to uncompress a copy on to HDFS. In one step, that’s:

1 bzcat stackoverflow.com-Posts.7z | hdfs dfs -put - /user/srowen/Posts.xml

Uncompressed, it consumes about 24.4GB, and contains about 18 million posts, of which 2.1 million are questions. These questions have about 9.3 million tags from approximately 34,000 unique tags.

Set Up Spark

Given that Spark’s integration with Hadoop is relatively new, it can be time-consuming to get it working manually. Fortunately, CDH hides that complexity by integrating Spark and managing setup of its processes. Spark can beinstalled separately with CDH 4.6.0, and is included in CDH 5 Beta 2. This example uses an installation of CDH 5 Beta 2.

This example uses MLlib, which uses the jblas library for linear algebra, which in turn calls native code using LAPACKand Fortran. At the moment, it is necessary to manually install the Fortran library dependency to enable this. The package is called libgfortran or libgfortran3, and should be available from the standard package manager of major Linux distributions. For example, for RHEL 6, install it with:

1 sudo yum install libgfortran

This must be installed on all machines that have been designated as Spark workers.

Log in to the machine designated as the Spark master with ssh. It will be necessary, at the moment, to ask Spark to let its workers use a large amount of memory. The code in MLlib that is used in this example, in version 0.9.0, has amemory issue, one that is already fixed for the next release. To configure for more memory and launch the shell:

1 2 export SPARK_JAVA_OPTS="-Dspark.executor.memory=8g" spark-shell

Interactive Processing in the Shell

The shell is the Scala REPL. It’s possible to execute lines of code, define methods, and in general access any Scala or Spark functionality in this environment, one line at a time. You can paste the following steps into the REPL, one by one.

First, get a handle on the Posts.xml file:

1 val postsXML = sc.textFile("hdfs:///user/srowen/Posts.xml")

In response the REPL will print:

1 postsXML: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :12

The text file is an RDD (Resilient Distributed Dataset) of Strings, which are the lines of the file. You can query it by calling methods of the RDD class. For example, to count the lines:

1 postsXML.count

This command yields a great deal of output from Spark as it counts lines in a distributed way, and finally prints18066983.

The next snippet transforms the lines of the XML file into a collection of (questionID,tag) tuples. This demonstrates Scala’s functional programming style, and other quirks. (Explaining them is out of scope here.) RDDs behave like Scala collections, and expose many of the same methods, like map:

(You can copy the source for the above from here.)

You will notice that this returns immediately, unlike previously. So far, nothing requires Spark to actually perform this transformation. It is possible to force Spark to perform the computation by, for example, calling a method like count. Or Spark can be told to compute and persist the result through checkpointing, for example.

The MLlib implementation of ALS operates on numeric IDs, not strings. The tags (“items”) in this data set are strings. It will be sufficient here to hash tags to a nonnegative integer value, use the integer values for the computation, and then use a reverse mapping to translate back to tag strings later. Here, a hash function is defined since it will be reused shortly.

1 2 def nnHash(tag: String) = tag.hashCode & 0x7FFFFF var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))

Now, you can convert the tuples from before into the format that the ALS implementation expects, and the model can be computed:

1 2 3 4 5 import org.apache.spark.mllib.recommendation._ // Convert to Rating(Int,Int,Double) objects val alsInput = postIDTags.map(t => Rating(t._1, nnHash(t._2), 1.0)) // Train model with 40 features, 10 iterations of ALS val model = ALS.trainImplicit(alsInput, 40, 10)

This will take minutes or more, depending on the size of your cluster, and will spew a large amount of output from the workers. Take a moment to find the Spark master web UI, which can be found from Cloudera Manager, and will run by default at http://[master]:18080. There will be one running application. Click through, then click “Application Detail UI”. In this view it’s possible to monitor Spark’s distributed execution of lines of code in ALS.scala:

When it is complete, a factored matrix model is available in Spark. It can be used to predict question-tag associations by “recommending” tags to questions. At this early stage of MLlib’s life, there is not even a proper recommend method yet, that would give suggested tags for a question. However it is easy to define one:     

1 2 3 4 5 6 7 8 def recommend(questionID: Int, howMany: Int = 5): Array[(String, Double)] = {   // Build list of one question and all items and predict value for all of them   val predictions = model.predict(tagHashes.map(t => (questionID,t._1)))   // Get top howMany recommendations ordered by prediction value   val topN = predictions.top(howMany)(Ordering.by[Rating,Double](_.rating))   // Translate back to tags from IDs   topN.map(r => (tagHashes.lookup(r.product)(0), r.rating)) }

And to call it, pick any question with at least four tags, like “How to make substring-matching query work fast on a large table?” and get its ID from the URL. Here, that’s 7122697:

1 recommend(7122697).foreach(println)

This method will take a minute or more to complete, which is slow. The lookups in the last line are quite expensive since each requires a distributed search. It would be somewhat faster if this mapping were available in memory. It’s possible to tell Spark to do this:

1 tagHashes = tagHashes.cache

Because of the magic of Scala closures, this does in fact affect the object used inside the recommend method just defined. Run the method call again and it will return faster. The result in both cases will be something similar to the following:

1 2 3 4 5 (sql,0.17745152481166354) (database,0.13526622226672633) (oracle,0.1079428707621154) (ruby-on-rails,0.06067207312463499) (postgresql,0.050933613169706474)

(Your result will not be identical, since ALS starts from a random solution and iterates.) The original question was tagged “postgresql”, “query-optimization”, “substring”, and “text-search”. It’s reasonable that the question might also be tagged “sql” and “database”. “oracle” makes sense in the context of questions about optimization and text search, and “ruby-on-rails” often comes up with PostgreSQL, even though these tags are not in fact related to this particular question.

Something for Everyone

Of course, this example could be more efficient and more general. But for the practicing data scientists out there — whether you came in as an R analyst, Python hacker, or Hadoop developer — hopefully you saw something familiar in different elements of the example, and have discovered a way to use Spark to access some benefits that the other tribes take for granted.

Learn more about Spark’s role in an EDH, and join the discussion in our brand-new Spark forum.

Sean is Director of Data Science for EMEA at Cloudera, helping customers build large-scale machine learning solutions on Hadoop. Previously, Sean founded Myrrix Ltd, producing a real-time recommender and clustering product evolved from Apache Mahout. Sean was primary author of recommender components in Mahout, and has been an active committer and PMC member for the project. He is co-author of Mahout in Action.

-----