Pulling Kafka data with PySpark
阿新 · Published: 2018-12-20
1. Create the Kafka topic:
./kafka-topics.sh --create --zookeeper xxxx:2181,xxxx:2181 --replication-factor 3 --partitions 3 --topic test
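As an optional check (not part of the original post), you can confirm from Python that the topic was created with the expected number of partitions. This is a minimal sketch that assumes the kafka-python package is installed (pip install kafka-python) and that the placeholder broker addresses are replaced with real ones:

# Hypothetical verification script, assuming kafka-python is available.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=["xxx:6667", "xxx:6667", "xxx:6667"])
partitions = consumer.partitions_for_topic("test")
# With --partitions 3 above, this should print a set like {0, 1, 2}
print("partitions for topic 'test':", partitions)
consumer.close()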
2. Upload the PySpark script to a node that has the Spark client installed:
vim ttt.py
# encoding:utf-8
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def start():
    # Enable Python profiling and snappy compression; uncomment setMaster for local testing
    conf = SparkConf().set("spark.python.profile", "true") \
                      .set("spark.io.compression.codec", "snappy")  # .setMaster('local[*]')
    conf.setAppName('spark-test')
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 6)  # 6-second batch interval

    brokers = "xxx:6667,xxx:6667,xxx:6667"
    topic = 'test'
    # Direct (receiver-less) stream; each record is a (key, value) tuple
    kafkaStreams = KafkaUtils.createDirectStream(
        ssc, [topic], kafkaParams={"metadata.broker.list": brokers})

    # Count identical message values within each batch
    result = kafkaStreams.map(lambda x: (x[1], 1)).reduceByKey(lambda x, y: x + y)
    # Capture each batch's Kafka offset ranges and print them on the driver
    kafkaStreams.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
    result.pprint()

    ssc.start()
    ssc.awaitTermination()


offsetRanges = []


def storeOffsetRanges(rdd):
    # Runs on the driver for every batch; remember this batch's offset ranges
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd


def printOffsetRanges(rdd):
    for o in offsetRanges:
        print("%s %s %s %s %s" % (o.topic, o.partition, o.fromOffset,
                                  o.untilOffset, o.untilOffset - o.fromOffset))


if __name__ == '__main__':
    start()

3. Package the script (the archive name must match the --py-files argument in the next step):

zip ttt.zip ./ttt.py

4. Submit the job:

spark-submit --master yarn-cluster --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 --jars spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar --py-files ttt.zip ttt.py

5. In a new terminal, start a console producer and type some test messages (a Python producer sketch follows below as an alternative):

./kafka-console-producer.sh --broker-list xxx:6667,xxx:6667,xxx:6667 --topic test
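If you prefer to drive the test from Python instead of the console producer, here is a minimal producer sketch. It is not from the original post; it assumes the kafka-python package is installed and that the placeholder broker addresses are replaced with real ones. Each value sent here shows up in the stream as x[1] and is counted by the reduceByKey above.

# Hypothetical test producer, assuming kafka-python is available.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["xxx:6667", "xxx:6667", "xxx:6667"])
for i in range(10):
    # Values are plain bytes; the streaming job sees them as message values
    producer.send("test", ("message-%d" % i).encode("utf-8"))
producer.flush()
producer.close()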
end