Kafka + Spark Streaming code example (PySpark + Python)
I. System preparation
1. Start ZooKeeper: bin/zkServer.sh start
2. Start Kafka: bin/kafka-server-start.sh -daemon config/server.properties
3. Start Spark: sbin/start-all.sh
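Before going further it is worth checking that all three services are actually listening. A minimal sketch in Python; the hosts and ports (2181, 9092, 7077) are assumptions based on the addresses used later in this post and may differ in your setup.

import socket

# hypothetical host/port map for this cluster; adjust to your environment
services = {
    "zookeeper": ("192.168.26.247", 2181),
    "kafka": ("192.168.26.247", 9092),
    "spark-master": ("192.168.26.245", 7077),
}

for name, (host, port) in services.items():
    s = socket.socket()
    s.settimeout(3)
    try:
        s.connect((host, port))
        print "%s ok (%s:%d)" % (name, host, port)
    except socket.error as e:
        print "%s unreachable (%s:%d): %s" % (name, host, port, e)
    finally:
        s.close()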
Data source: http://files.grouplens.org/datasets/movielens/ml-100k.zip
Workflow: Kafka reads the user dataset and produces a data stream -> Spark Streaming counts the number of users per occupation -> the results are stored in MySQL.
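Each line of the u.user file is a pipe-separated record in the documented ml-100k format (user id | age | gender | occupation | zip code); index 3 is the occupation that the streaming job counts. A small illustration, where the sample record is made up but follows that format:

# illustrative u.user record: user id|age|gender|occupation|zip code
sample = "1|24|M|technician|85711"
fields = sample.split('|')
occupation = fields[3]   # 'technician' -> the field the streaming job will count
print occupation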
II. Kafka reads the user dataset and produces a data stream, one record per second.
First create the topic:
bin/kafka-topics.sh --create --zookeeper 192.168.26.247:2181 --replication-factor 2 --partitions 1 --topic txt
Verify the topic:
bin/kafka-topics.sh --list --zookeeper 192.168.26.247:2181
bin/kafka-topics.sh --describe --zookeeper 192.168.26.247:2181 --topic txt
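The same check can also be done from Python with kafka-python. A sketch, assuming the broker address used throughout this post:

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['192.168.26.247:9092'])
print consumer.topics()   # should contain u'txt' once the topic has been created
consumer.close()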
Save the producer script below as txt.py; when run, it prints each record as it is sent.

from kafka import KafkaProducer
import time

def main():
    # Producer: send one line of the u.user file to the "txt" topic every second
    producer = KafkaProducer(bootstrap_servers=['192.168.26.247:9092'])
    with open('/home/hadoop/ml-100k/u.user', 'r') as f:
        for line in f.readlines():
            time.sleep(1)
            producer.send("txt", line)
            print line
            #producer.flush()

if __name__ == '__main__':
    main()
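To confirm that records are actually arriving on the "txt" topic while txt.py is running, a simple consumer can be run on the side. A sketch, again assuming kafka-python and the same broker address:

from kafka import KafkaConsumer

consumer = KafkaConsumer('txt',
                         bootstrap_servers=['192.168.26.247:9092'],
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)  # stop iterating after 10s of silence
for i, msg in enumerate(consumer):
    print msg.offset, msg.value
    if i >= 4:          # print the first 5 records and stop
        break
consumer.close()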
Spark Streaming then consumes and processes the stream, and writes the results to the database. The streaming job:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
import MySQLdb

def start():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 3)
    sc = SparkContext(appName='txt', conf=sconf)
    ssc = StreamingContext(sc, 5)  # 5-second batches
    brokers = "192.168.26.247:9092,192.168.26.246:9092"
    topic = 'txt'
    start = 70000
    partition = 0
    user_data = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": brokers})
    # fromOffsets: start consuming from a given offset instead of the latest one
    #user_data = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": brokers}, fromOffsets={TopicAndPartition(topic, partition): long(start)})
    # each record is a (key, value) tuple; the value is one u.user line,
    # whose fields are user id | age | gender | occupation | zip code (index 3 = occupation)
    user_fields = user_data.map(lambda line: line[1].split('|'))
    occupation_counts = user_fields.map(lambda fields: fields[3]).map(lambda occupation: (occupation, 1)).reduceByKey(lambda a, b: a + b)
    user_data.foreachRDD(offset)  # record the offset ranges of each batch
    occupation_counts.pprint()
    occupation_counts.foreachRDD(lambda rdd: rdd.foreach(echo))  # write each (occupation, count) tuple to MySQL
    ssc.start()
    ssc.awaitTermination()

offsetRanges = []

def offset(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()

def echo(record):
    zhiye = record[0]
    num = record[1]
    for o in offsetRanges:
        topic = o.topic
        partition = o.partition
        fromoffset = o.fromOffset
        untiloffset = o.untilOffset
    # insert the result into MySQL
    conn = MySQLdb.connect(user="root", passwd="******", host="192.168.26.245", db="test", charset="utf8")
    cursor = conn.cursor()
    sql = "insert into zhiye(id,zhiye,num,topic,partitions,fromoffset,untiloffset) " \
          "values (NULL,'%s','%d','%s','%d','%d','%d')" % (zhiye, num, topic, partition, fromoffset, untiloffset)
    cursor.execute(sql)
    conn.commit()
    conn.close()

if __name__ == '__main__':
    start()
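The job inserts into a table named zhiye, which must exist before the job is started. A minimal creation sketch with MySQLdb; the column types are assumptions inferred from the INSERT statement above, not taken from the original setup.

import MySQLdb

conn = MySQLdb.connect(user="root", passwd="******", host="192.168.26.245",
                       db="test", charset="utf8")
cursor = conn.cursor()
# assumed schema matching the columns used by the streaming job's INSERT
cursor.execute("""
    CREATE TABLE IF NOT EXISTS zhiye (
        id INT AUTO_INCREMENT PRIMARY KEY,
        zhiye VARCHAR(64),
        num INT,
        topic VARCHAR(64),
        partitions INT,
        fromoffset BIGINT,
        untiloffset BIGINT
    )
""")
conn.commit()
conn.close()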
III. Submit to the cluster:
bin/spark-submit --master spark://192.168.26.245:7077 --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar python/txt.py
Execution result:
Sample data in the database:
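The rows written by the job can be inspected directly from MySQL; a minimal sketch using the same connection parameters as the streaming job:

import MySQLdb

conn = MySQLdb.connect(user="root", passwd="******", host="192.168.26.245",
                       db="test", charset="utf8")
cursor = conn.cursor()
cursor.execute("SELECT zhiye, num, fromoffset, untiloffset FROM zhiye ORDER BY id DESC LIMIT 10")
for row in cursor.fetchall():
    print row
conn.close()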
Data displayed on the web: