Spark Learning (6)
阿新 · Published: 2018-12-09
Spark Streaming
How it works
- After the ssc (StreamingContext) starts, the driver runs a long-running task.
- Executors acting as Receivers accept the incoming data and split it into blocks held in memory.
- These blocks are also replicated to other executors so the data is not lost if a receiver fails (see the sketch after this list).
- At every batch interval (commonly 1 second), the driver launches Spark tasks to process the blocks; the results can then be persisted to any number of targets.
- When Spark Streaming is wired to a TCP source, it requests the connection and pulls data once per second rather than holding one long-lived connection.
- Storage targets include cloud storage (e.g. S3, WASB), relational stores (e.g. MySQL, PostgreSQL), and NoSQL stores.
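As a rough sketch of this flow (the host, port, and app name below are illustrative, not from the original post), the receiver's storage level is what controls the in-memory replication mentioned above:

```python
# Minimal sketch of the receiver flow described above.
# Host, port, and app name are illustrative.
from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "ReceiverSketch")  # >= 2 threads: one receiver, one for processing
ssc = StreamingContext(sc, 1)                    # batch interval of 1 second

# MEMORY_AND_DISK_2 asks Spark to keep a second replica of each received block
# (on a real cluster the replica lands on another executor).
lines = ssc.socketTextStream("localhost", 9999,
                             storageLevel=StorageLevel.MEMORY_AND_DISK_2)

# Each batch interval the driver launches tasks over the stored blocks.
lines.count().pprint()

ssc.start()
ssc.awaitTermination()
```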
DStreams
- A sequence of data arriving over time, represented as a series of RDDs, one per time interval (sketched below).
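A quick way to see the "sequence of RDDs" idea is `foreachRDD`, which hands you the RDD for one batch at a time. A sketch, assuming a StreamingContext `ssc` and a socket source like the ones used later in this post:

```python
# Sketch: each batch interval yields one RDD; foreachRDD exposes it.
# Assumes an existing StreamingContext `ssc`, as in the later examples.
lines = ssc.socketTextStream("localhost", 9999)

def show_batch(time, rdd):
    # `rdd` holds only the records that arrived during this batch interval
    print("Batch at %s holds %d records" % (time, rdd.count()))

lines.foreachRDD(show_batch)
```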
Windowed transformations
- The window of data over which each computation runs (see the example below).
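For instance, a word count over the last 30 seconds, recomputed every 10 seconds, could look like the following sketch (it assumes an `ssc` whose batch interval divides both durations; the host and port are illustrative):

```python
# Sketch: word counts over a 30-second window, sliding every 10 seconds.
# Assumes a StreamingContext `ssc` whose batch interval divides 10 and 30.
lines = ssc.socketTextStream("localhost", 9999)
pairs = lines.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1))

# Window length 30s, slide interval 10s; no inverse reduce function is given,
# so the whole window is recomputed on each slide.
windowed_counts = pairs.reduceByKeyAndWindow(lambda a, b: a + b, None, 30, 10)
windowed_counts.pprint()
```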
Checkpointing
- The purpose is to save data to a reliable file system, such as HDFS:
ssc.checkpoint("hdfs://...")
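A common companion pattern is `StreamingContext.getOrCreate`, so a restarted driver can rebuild the context from the checkpoint directory. A sketch, where the HDFS path and setup function are illustrative:

```python
# Sketch: recoverable StreamingContext backed by a checkpoint directory.
# The HDFS path and setup function are illustrative.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

checkpoint_dir = "hdfs:///tmp/streaming-checkpoint"

def create_context():
    sc = SparkContext("local[2]", "CheckpointedApp")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint(checkpoint_dir)   # state and metadata are written to HDFS
    # ... define the DStream pipeline here ...
    return ssc

# Reuse the checkpointed context if it exists, otherwise build a new one.
ssc = StreamingContext.getOrCreate(checkpoint_dir, create_context)
ssc.start()
ssc.awaitTermination()
```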
Load
SparkContext
```python
# Create a local SparkContext and Streaming Context
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create sc with two working threads (app name is illustrative)
sc = SparkContext("local[2]", "NetworkWordCount")

# Create a local StreamingContext with a batch interval of 1 second
ssc = StreamingContext(sc, 1)
```
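Building on that context, the classic DStream word count (the `pprint()` version that the SparkSession script below replaces) might look like this sketch, assuming a netcat-style text source on localhost:9999:

```python
# Sketch: DStream word count over a socket source (host/port are illustrative).
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)

counts.pprint()   # print each batch's counts to the console

ssc.start()
ssc.awaitTermination()
```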
SparkSession
""" 與之前的指令碼不同,現在使用的是更熟悉的指令碼 先建立一個session 這裡不需要再去建立一個SparkStreaming """ # Import the necessary classes and create a local SparkSession from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split spark = SparkSession \ .builder \ .appName("StructuredNetworkWordCount") \ .getOrCreate() """ SparkStream 通過在第4行呼叫readStream來發起的 """ # Create DataFrame representing the stream of input lines # from connection to localhost:9999 lines = spark\ .readStream\ .format('socket')\ .option('host', 'localhost')\ .option('port', 9999)\ .load() """ 在這裡就不需要使用RDD的複雜操作, 直接使用SQL便可以 """ # Split the lines into words words = lines.select( explode( split(lines.value, ' ') ).alias('word') ) # Generate running word count wordCounts = words.groupBy('word').count() """ 沒有使用pprint(),而是顯式地呼叫writeStream來編寫 流,並定義格式和輸出模式。雖然寫的時間要長一些, 這些方法和屬性在語法上與其他DataFrame呼叫相似 你只需要改變outputMode和格式屬性來儲存它 對於資料庫、檔案系統、控制檯等等。 """ # Start running the query that prints the # running counts to the console query = wordCounts\ .writeStream\ .outputMode('complete')\ .format('console')\ .start() """ 最後,執行等待取消這個流媒體工作。 """ # Await Spark Streaming termination query.awaitTermination()
Save to text
ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")
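For context, `ipAddressRequestCount` would be a DStream of (ip, count) pairs. A sketch of how such a DStream might be produced and saved, where the log format and field position are assumptions:

```python
# Sketch: build an (ip, count) DStream and save each batch as text files.
# Assumes log lines whose first whitespace-separated field is the client IP.
access_logs = ssc.socketTextStream("localhost", 9999)
ipAddressRequestCount = access_logs \
    .map(lambda line: (line.split(" ")[0], 1)) \
    .reduceByKey(lambda a, b: a + b)

# Each batch is written to its own directory named outputDir-<batch time>.txt
ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")
```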
Example
```python
# Create a local SparkContext and Streaming Contexts
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
import datetime
# from pyspark.sql import SQLContext
# from pyspark.sql.types import *
import numpy as np

# Create sc with four working threads
sconf = SparkConf()
sconf.setMaster("local[4]")
sc = SparkContext(appName="act_analysis", conf=sconf)

# Create local StreamingContext with batch interval of 3 seconds
ssc = StreamingContext(sc, 3)

# Create DStream that connects to 192.168.14.2:1234
lines = ssc.socketTextStream("192.168.14.2", 1234)

def get_rows(x):
    # Each line is "<role_id> <timestamp>"
    res = x.split(" ")
    return (res[0], res[1])

rows = lines.map(get_rows)
roles = rows.groupByKey()

def get_speed_std(action):
    # Standard deviation of the gaps between consecutive action timestamps
    temp_list = []
    temp = 0
    for i in action:
        if temp == 0:
            temp_list.append(0)
            temp = i
            continue
        temp_list.append((i - temp))
        temp = i
    res = np.std(np.array(temp_list), ddof=1)
    return res

def get_role_action_feature(action):
    try:
        action = sorted(action)
    except Exception as e:
        print(e)
        return None
    count = len(action)
    stay_time = int(action[-1] - action[0])
    try:
        ave_speed = count / stay_time
        std_speed = get_speed_std(action)
    except:
        ave_speed = 0
        std_speed = 0
    return ave_speed, std_speed

def feature(x):
    role = x[0]
    action = [float(i) for i in list(x[1])]
    res = get_role_action_feature(action)
    return (role, res[0], res[1], datetime.datetime.now())

res = roles.map(feature)
# res.pprint()

from pymongo import MongoClient

MONGODB = {
    'MONGO_HOST': '',
    'MONGO_PORT': '27017',
    'MONGO_USERNAME': '',
    'MONGO_PASSWORD': ''
}

mongo_uri = 'mongodb://{account}{host}:{port}/'.format(
    account='{username}:{password}@'.format(
        username=MONGODB['MONGO_USERNAME'],
        password=MONGODB['MONGO_PASSWORD']) if MONGODB['MONGO_USERNAME'] else '',
    host=MONGODB['MONGO_HOST'],
    port=MONGODB['MONGO_PORT'])

conn = MongoClient(mongo_uri)
db = conn['104']

def save(x):
    # Runs on the driver for each batch RDD
    col = db['act_live_res']
    res = x.collect()
    for i in res:
        item = {
            'role_id': i[0],
            'ave': i[1],
            'std': i[2],
            'date': i[3]
        }
        col.insert_one(item)
    # conn.close()

res.foreachRDD(save)

ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination()
```
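One design note on the `save` function above: `x.collect()` pulls every batch back to the driver before inserting into MongoDB, which only works while batches stay small. A common alternative is to write from the executors with `foreachPartition`, creating the client inside each partition. A sketch reusing the same (hypothetical) database and collection names, and assuming pymongo is installed on the executors:

```python
# Sketch: insert each partition into MongoDB on the executors instead of
# collecting the whole RDD to the driver. Reuses `mongo_uri` from above.
def save_partition(records):
    conn = MongoClient(mongo_uri)          # one client per partition, on the executor
    col = conn['104']['act_live_res']
    for role_id, ave, std, date in records:
        col.insert_one({'role_id': role_id, 'ave': ave, 'std': std, 'date': date})
    conn.close()

def save_rdd(rdd):
    rdd.foreachPartition(save_partition)

res.foreachRDD(save_rdd)
```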