mac下單機版 kafka + spark + python搭建與例項
kafka+zookeeper
不提供spark安裝,這裡從kafka安裝開始
首先下載kafka和zookeeper
brew install zookeeper
等它安裝完畢,先進入zookeeper資料夾,往往在/usr/local/Cellar下,啟動zookeeper:
cd /usr/local/Cellar/zookeeper/3.4.6_1/bin
zkServer start
如果啟動kafka下的zookeeper我這裡會報錯,然後進入kafka,啟動kafka:
bin/kafka-server-start.sh config/kafka.properties
啟動後會一直在最後一個INFO 不會動了,其實這已經表明啟動成功了,開啟另一個終端,進入同樣的kafka路徑,新建一個topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
檢視新建的topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
可以看到test,測試下kafka:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
直接輸入幾個字元如
hello world!
再新建一個終端,注意上面已經有兩個終端了,這裡開啟第三個,進入同樣的路徑下,:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
就可以看到剛才輸入的hello world了。
接下來介紹python的kafka+spark的使用。
python+kafka+spark
編寫python例項程式碼:
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sconf=SparkConf()
sconf.set('spark.cores.max' , 2)
sc=SparkContext(appName="KafkaWordCount",conf=sconf)
ssc=StreamingContext(sc,3)
zookeeper="localhost:2181"
topic={"test":1}
groupid="test-consumer-group"
lines = KafkaUtils.createStream(ssc, zookeeper,groupid,topic)
lines1=lines.map(lambda x:x[1])
words=lines1.flatMap(lambda line:line.split(" "))
pairs=words.map(lambda word:(word,1))
wordcounts=pairs.reduceByKey(lambda x,y:x+y)
#wordcounts.saveAsTextFiles("/kafka")
wordcounts.pprint()
ssc.start()
ssc.awaitTermination()
提交python程式碼
spark-submit --jars /usr/local/Cellar/kafka/0.8.2.1/libexec/spark-streaming-kafka-assembly_2.11-2.2.0.jar new.py 2>
如果想在自己的編譯器裡編寫python並選擇python3,則在spark預設的python源需要改為python3,這樣在spark-submit時就會選擇python3,而不是預設的python2.
在環境變數裡增加下列程式碼即可:
export PYSPARK_PYTHON=python3
且還需要pip一些包,pyspark、kafka等。
下面是以搜狗資料為實驗的生產者程式碼
注意生產者程式碼可以直接在自己的編譯器執行即可,不必提交:
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from kafka.producer import SimpleProducer
from kafka.client import KafkaClient
import json
import time
import sys
def main():
##測試生產模組
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
fileName1 = '/Volumes/DATA/BigData/bigdata_homework/sogou.500w.utf8'
fileName2 = '/Users/tcd-pc/Desktop/sogou.10k.utf8'
with open(fileName1,encoding='utf-8') as file:
a = 20111230000000
for fileline in file:
#print(int(fileline.split('\t')[0])//10000-a//10000)
if int(fileline.split('\t')[0])//10000%2==0:
a = int(fileline.split('\t')[0])
time.sleep(10)
try:
producer.send('test1',fileline.encode('utf-8'))
producer.flush()
except kafkaError as e:
print(e)
if __name__ == '__main__':
main()
下面是消費者的程式碼框架,具體分析的程式碼就不放了:
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import pyspark
import jieba
from jieba import analyse
import sys
import os
os.system('hdfs dfs -rm -r /tmp1/checkpoint/')
zookeeper="localhost:2181"
kafkatopic="test1"
groupid='test-consumer-group'
#建立上下文
def createContext():
sconf = SparkConf()
sconf.set('spark.cores.max', 8)
sc = SparkContext(appName="KafkaWordCount", conf=sconf)
ssc2 = StreamingContext(sc, 60)
ssc2.checkpoint("/tmp1/checkpoint/")
return ssc2
def main_main(ssc):
consumer = KafkaUtils.createStream(ssc, zookeeper, groupid, {kafkatopic: 1})
lines1 = consumer.map(lambda x: x[1])
words = lines1.map(lambda line: line.split("\t"))
#分割為 時間 使用者ID 搜素內容 rank1 rank2 點選的網址
seqs = words.map(lambda word: [word[0], word[1],word[2],word[3], word[4],
word[5].split("//")[1].split("/")[0] if len(word[5].split("//"))>1 else 'nokey',
analyse.extract_tags(word[2] if len(word[2])>0 else 'nokey', topK=1, withWeight=False)
])
def main():
##測試消費模組
ssc = StreamingContext.getOrCreate("/tmp1/checkpoint/",createContext)
#呼叫測試
main_main(ssc)
ssc.start()
ssc.awaitTermination()
if __name__ == '__main__':
main()
用spark streaming流資料分析時,會遇到需要排序的情況,而spark並沒有這樣的方法直接可以用,如果需要可以參考本渣寫的:
webs_time = seqs.map(lambda item: (item[5], 1)) \
.reduceByKey(lambda x, y: x + y) \
.map(lambda x: (1, [x[0], x[1]])).groupByKey() \
.mapValues(lambda x: sorted(x, reverse=True, key=lambda y: y[1])[:10])
用groupbykey讓一個key對應所有這個key的資料,然後用mapValues對key下的所有資料進行排序。其中groupbykey之前用了map方法把需要的key提出來,其他的資料用list表示。
實驗表明:這樣的操作實在時非常慢,但還是可以排序的。。。。。。。