Integrating Spark Streaming with Kafka
Over the past few days I looked into integrating Spark Streaming with Kafka, both consuming data from Kafka and sending data back to Kafka. Following the official examples, I wrote two small demos and share them here.
- 1. Add the Kafka dependency
- 2. DirectKafkaWordCountDemo code
- 3. kafkaProducer code
- 4. Code that consumes data from the Kafka cluster, processes it, and writes it back to Kafka
The Kafka cluster used in this example consists of three brokers and one ZooKeeper node.
The original messages in this example have the form "message: 我是第 n 條資訊" ("message: I am message n").
1. Add the Kafka dependency
Integrating Kafka with Spark Streaming requires the Kafka integration artifact. Add the following dependency to the pom file:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
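Here ${scala.version} and ${spark.version} are assumed to be properties already defined in the pom; the artifact suffix must be the Scala binary version (for example 2.11), and the integration artifact must match the Spark release you build against. If the project is built with sbt instead of Maven, a rough equivalent (the version number below is a placeholder, not taken from this post) would be:

// build.sbt -- sketch only; use the Spark version your cluster actually runs
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"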
2. DirectKafkaWordCountDemo code
This code consumes the original messages "message: 我是第n條資訊" from the Kafka cluster, splits each message on spaces, filters out the "message:" token, and prints a count for each remaining token (i.e. for "我是第n條資訊").
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by yangyibo on 16/12/1.
 */
object DirectKafkaWordCountDemo {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCountDemo")
    // When running in IDEA, make sure the master has at least two cores (local[2] or more).
    sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"
    val topics = "abel"
    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Direct stream: reads from the brokers without a receiver.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    // The message value is the second element of the (key, value) tuple.
    val lines = messages.map(_._2)
    // Split each message on spaces and drop the "message:" prefix token.
    val words = lines.flatMap(_.split(" ")).filter(!_.equals("message:"))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
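The direct stream does not store consumer offsets in ZooKeeper; it tracks them itself per batch. If you want to inspect or persist the offsets each batch covers, they can be read from the stream's RDDs. The sketch below is not part of the original demo; it assumes it is placed inside main, directly on the messages stream returned by createDirectStream, before any other transformation.

import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD { rdd =>
  // Each RDD produced by the direct stream carries the Kafka offset ranges it read.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
  }
}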
3. kafkaProducer code
This code sends the strings in array to the Kafka cluster as messages.
import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
 * Created by yangyibo on 16/11/29.
 */
object KafkaProducerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaProducer")
    val sc = new SparkContext(conf)
    val array = ArrayBuffer("one", "two", "three")
    kafkaProducer(array)
  }

  def kafkaProducer(args: ArrayBuffer[String]) {
    if (args != null) {
      val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"
      // Kafka producer configuration
      val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)
      val topic = "abel2"
      // Send some messages
      for (arg <- args) {
        println(arg + " ---------- sent")
        val message = new ProducerRecord[String, String](topic, null, arg)
        producer.send(message)
      }
      Thread.sleep(500)
      producer.close()
    }
  }
}
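The Thread.sleep(500) only gives the asynchronous sends time to finish before the producer is closed. A more explicit alternative (a sketch, assuming a Kafka client version that provides flush() and the Callback interface) is to flush the producer and attach a callback to each send so failures are at least reported:

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

for (arg <- args) {
  val message = new ProducerRecord[String, String](topic, null, arg)
  producer.send(message, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      if (exception != null) exception.printStackTrace()
      else println(s"sent to ${metadata.topic} partition ${metadata.partition} offset ${metadata.offset}")
    }
  })
}
producer.flush() // block until all buffered records have been sent
producer.close()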
4. Code that reads data from the Kafka cluster, processes it, and writes it back to Kafka
This code combines Spark Streaming's Kafka consumer and producer. It consumes the original messages "message: 我是第n條資訊" from the Kafka cluster, appends "--read" to each one (giving "message: 我是第n條資訊--read"), and sends the processed messages to another topic in the Kafka cluster.
import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

import scala.collection.mutable.ArrayBuffer

/**
 * Created by yangyibo on 16/11/28.
 */
object KafkaWordCountDemo {
  private val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"

  // Kafka producer configuration
  private val props = new HashMap[String, Object]()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  private val producer = new KafkaProducer[String, String](this.props)

  def main(args: Array[String]): Unit = {
    run()
  }

  def run(): Unit = {
    val zkQuorum = "192.168.100.48:2181"
    val group = "spark-streaming-test"
    val topics = "abel"
    val numThreads = 1
    val sparkConf = new SparkConf().setAppName("KafkaWordCountDemo")
    sparkConf.setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // Receiver-based stream: offsets are tracked in ZooKeeper for the given consumer group.
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    val array = ArrayBuffer[String]()
    lines.foreachRDD(rdd => {
      // Bring this batch's records to the driver and append the "--read" marker.
      rdd.collect().foreach(x => {
        array += x + "--read"
      })
      kafkaProducerSend(array)
      array.clear()
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def kafkaProducerSend(args: ArrayBuffer[String]) {
    if (args != null) {
      val topic = "abel2"
      // Send some messages
      for (arg <- args) {
        println(arg + " ---------- read")
        val message = new ProducerRecord[String, String](topic, null, arg)
        producer.send(message)
      }
      Thread.sleep(500)
    }
  }
}
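Because this demo collects every batch to the driver and sends from a single driver-side producer, throughput is limited by the driver. A common alternative (a sketch, not the original author's code; it reuses the brokers value defined above) is to send directly from the executors, creating one producer per partition inside foreachPartition:

lines.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Build the producer on the executor so nothing non-serializable is shipped from the driver.
    val executorProps = new HashMap[String, Object]()
    executorProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    executorProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    executorProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val partitionProducer = new KafkaProducer[String, String](executorProps)
    partition.foreach { x =>
      partitionProducer.send(new ProducerRecord[String, String]("abel2", null, x + "--read"))
    }
    partitionProducer.close()
  }
}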