Getting Streaming data from Kafka with Spark Streaming using Python.

If you want to use Spark to transform and manipulate data as it is ingested through Kafka, you are in the right place.

In this article, we are going to look at Spark Streaming, one of several libraries exposed by the Spark platform.

Spark Streaming provides a way to process unbounded data, commonly known as streaming data. You can read more at https://spark.apache.org/docs/latest/streaming-programming-guide.html.

With Spark we can process data both in traditional batch jobs and in small, recurring batches (micro-batches) as it arrives. The benefit of stream processing is that we can take action while an event is occurring.

The use case shown here is very simple: unbounded data is read from a Kafka topic, and each message is printed as a plain string.

Preparing the Environment

We need to make sure that the packages we use are available to Spark. Instead of downloading jar files and worrying about paths, we can use the --packages option and specify the group/artifact/version coordinates as published on Maven; Spark will handle the download. We set PYSPARK_SUBMIT_ARGS so that this option is passed correctly when running from within a Python shell. There are two ways to work with pyspark:

1. Install pyspark using pip

2. Use the findspark library if you already have a Spark installation.

I am choosing option 2 here because I am running HDP2.6.

import os
import findspark
findspark.init('/usr/hdp/2.5.6.0-40/spark')
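The text above mentions PYSPARK_SUBMIT_ARGS but does not show it being set. A minimal sketch of one way to do it follows. The helper name `kafka_submit_args` is hypothetical, and the artifact, Scala version, and Spark version are assumptions chosen only for illustration; they must match your own Spark installation.

```python
import os


def kafka_submit_args(artifact, scala_version, spark_version):
    """Build a PYSPARK_SUBMIT_ARGS value that pulls one package from Maven.

    Hypothetical helper for illustration; it only formats a string.
    """
    coordinate = "org.apache.spark:{}_{}:{}".format(
        artifact, scala_version, spark_version
    )
    # The trailing "pyspark-shell" token is required when launching from Python.
    return "--packages {} pyspark-shell".format(coordinate)


# Assumed coordinates: replace them with the ones matching your cluster.
os.environ["PYSPARK_SUBMIT_ARGS"] = kafka_submit_args(
    "spark-streaming-kafka-0-8", "2.11", "2.0.2"
)
```

The variable has to be set before the Spark context is created, because it is read when the JVM gateway is launched.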

Import dependencies

We need to import the necessary PySpark modules for Spark, Spark Streaming, and Spark Streaming with Kafka.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

Create Spark context

The Spark context is the primary object under which everything else is called. The setLogLevel call is optional.

sc = SparkContext(appName="PythonSparkStreamingKafka")
sc.setLogLevel("WARN")

Create Streaming Context

We pass the Spark context (from above) along with the batch duration, which here is set to 60 seconds.

ssc = StreamingContext(sc,60)
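To make the batch duration more concrete, here is a plain-Python sketch (not Spark code) of how messages arriving at different times would fall into 60-second batches. The function `group_into_batches` and the sample events are made up for illustration; Spark's actual scheduling is more involved.

```python
from collections import defaultdict

BATCH_SECONDS = 60  # same interval as StreamingContext(sc, 60)


def group_into_batches(events, batch_seconds=BATCH_SECONDS):
    """Group (arrival_time_in_seconds, message) pairs by batch index."""
    batches = defaultdict(list)
    for arrival_time, message in events:
        # Integer division maps an arrival time to its batch window.
        batches[int(arrival_time // batch_seconds)].append(message)
    return dict(batches)


# Hypothetical arrival times, in seconds since the stream started.
events = [(5, "a"), (59, "b"), (60, "c"), (130, "d")]
batches = group_into_batches(events)
```

With these sample events, "a" and "b" land in the first batch, "c" in the second, and "d" in the third. A shorter batch duration lowers latency at the cost of more frequent, smaller jobs.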

Connect to Kafka

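Using the native Spark Streaming Kafka capabilities, we use the streaming context from above to connect to our Kafka cluster. The arguments to createStream are the streaming context, the ZooKeeper quorum (host:port), the consumer group ID, and a dictionary mapping each topic to the number of partitions to consume.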

kafkaStream = KafkaUtils.createStream(ssc, 'victoria.com:2181', 'spark-streaming', {'imagetext':1})

Message Processing

The inbound stream is a DStream, which supports various built-in transformations such as map.

lines = kafkaStream.map(lambda x: x[1])
lines.pprint()
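Each record delivered by createStream is a (key, value) tuple, which is why the lambda above selects x[1]. The sketch below shows the same extraction as a named function applied to ordinary Python tuples; `extract_value` and the sample records are hypothetical and stand in for real Kafka messages.

```python
def extract_value(record):
    """Return the message payload from a (key, value) record."""
    _key, value = record
    return value


# Hypothetical records shaped like those in the Kafka DStream.
sample_records = [(None, "first message"), ("k1", "second message")]
values = list(map(extract_value, sample_records))
```

A named function like this can be passed to kafkaStream.map in place of the lambda, which makes it easier to test the transformation without a running cluster.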

Start the streaming context

Having defined the streaming context, we are now ready to start it. When you run this cell, the program starts, and the output of the pprint call above appears below the cell.

ssc.start()  
ssc.awaitTermination()
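As written, the job runs until it is killed. One possible way to shut it down more cleanly is sketched below. The wrapper name `run_streaming` and its `timeout_seconds` parameter are hypothetical; it relies only on the StreamingContext methods start, awaitTermination, awaitTerminationOrTimeout, and stop, and assumes that an interrupt from the keyboard reaches the Python process.

```python
def run_streaming(ssc, timeout_seconds=None):
    """Start the streaming context, wait, then stop it gracefully.

    `ssc` is expected to be a StreamingContext (or an object with the
    same methods). Hypothetical wrapper for illustration.
    """
    ssc.start()
    try:
        if timeout_seconds is None:
            ssc.awaitTermination()
        else:
            ssc.awaitTerminationOrTimeout(timeout_seconds)
    except KeyboardInterrupt:
        # Fall through to the graceful stop below.
        pass
    finally:
        # stopGraceFully=True asks Spark to finish processing received data first.
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
```

Calling run_streaming(ssc) would replace the two lines above; passing a timeout is handy for short test runs in a notebook.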

You can find the full code on My GitHub Repo.