企業版Spark Databricks + 企業版Kafka Confluent 聯合高效挖掘資料價值
前提條件
- 已註冊阿里雲賬號,詳情請參見
- 已開通 Databricks 資料洞察服務
- 已開通 OSS 物件儲存服務
- 已開通 Confluent 流資料服務
建立Databricks叢集 & Confluent叢集
- 登入
- 登入
Databricks Worker節點公網訪問
Databricks的worker節點暫時不支援公網訪問,為了能訪問Confluent的公網地址,請聯絡Databricks的開發人員新增NAT閘道器。
案例:計程車資料入湖及分析
計程車和網約車在每天的執行中持續產生行駛軌跡和交易資料,這些資料對於車輛排程,流量預測,安全監控等場景有著極大的價值。
本案例中我們使用
前置準備:
- 建立topic:
登入Confluent的control center,在左側選中Topics,點選Add a topic
按鈕,建立一個名為nyc_taxi_data
的topic,將partition設定為3,其他配置保持預設。 - 建立OSS bucket:
在和Databricks同一Region的OSS中,建立bucket,bucket命名為:databricks-confluent-integration
進入到 - 收集url,使用者名稱,密碼,路徑等以便後續使用a。
- confluent叢集ID:在csp的管控介面,叢集詳情頁獲取
- Confluent Control Center的使用者名稱和密碼
- 路徑:
- Databricks Structured Streaming的checkpoint儲存目錄
- 採集的資料的儲存目錄
下面是我們後續會使用到的一些變數:
# 叢集管控介面獲取 confluent_cluster_id = "your_confluent_cluster_id" # 使用confluent叢集ID拼接得到 confluent_server = "rb-{confluent_cluster_id}.csp.aliyuncs.com:9092" control_center_username = "your_confluent_control_center_username" control_center_password = "your_confluent_control_center_password" topic = "nyc_taxi_data" checkpoint_location = "oss://databricks-confluent-integration/checkpoint_dir" taxi_data_delta_lake = "oss://databricks-confluent-integration/data/nyc_taxi_data"
資料的產生
在本案例中,我們使用
- 我們先安裝confluent的python客戶端,其他語言的客戶端參考
pip install confluent_kafka
- 構造用於建立Kafka Producer的基礎資訊,如:bootstrap-server,control center的username,password等
conf = { 'bootstrap.servers': confluent_server, 'key.serializer': StringSerializer('utf_8'), 'value.serializer': StringSerializer('utf_8'), 'client.id': socket.gethostname(), 'security.protocol': 'SASL_SSL', 'sasl.mechanism': 'PLAIN', 'sasl.username': control_center_username, 'sasl.password': control_center_password }
- 建立Producer:
producer = Producer(conf)
- 向Kafka中傳送訊息(模擬資料的產生):
with open("/Path/To/train.csv", "rt") as f: float_field = ['fare_amount', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude'] for row in reader: i += 1 try: for field in float_field: row[field] = float(row[field]) row['passenger_count'] = int(row['passenger_count']) producer.produce(topic=topic, value=json.dumps(row)) if i % 1000 == 0: producer.flush() if i == 200000: break except ValueError: # discard null/NAN data continue
Kafka中的partition和offset
在使用spark讀取Kafka中的資料之前,我們回顧一下Kafka中的概念:partition和offset
- partition:kafka為了能並行進行資料的寫入,將每個topic的資料分為多個partition,每個partition由一個Broker負責,向partition寫入資料時,負責該partition的Broker將訊息複製給它的follower
- offset:Kafka會為每條寫入partition裡的訊息進行編號,訊息的編號即為offset
我們在讀取Kafka中的資料時,需要指定我們想要讀取的資料,該指定需要從兩個維度:partition的維度 + offset的維度。
- Earliest:從每個partition的offset 0開始讀取和載入
- Latest:從每個partition最新的資料開始讀取
- 自定義:指定每個partition的開始offset和結束offset
- 讀取topic1 partition 0 offset 23和partition 0 offset -2之後的資料:
"""{"topic1":{"0":23,"1":-2}}"""
除了指定start offset,我們還可以通過endingOffsets引數指定讀取到什麼位置為止。
將資料儲存到LakeHouse:Spark整合Confluent
理解上述概念後,Databricks和Confluent的整合非常簡單,只需要對spark session的readStream引數進行簡單的設定就可以將Kafka中的實時流資料轉換為Spark中的Dataframe:
lines = (spark.readStream # 指定資料來源: kafka .format("kafka") # 指定kafka bootstrap server的URL .option("kafka.bootstrap.servers", confluent_server) # 指定訂閱的topic .option("subscribe", topic) # 指定想要讀取的資料的offset,earliest表示從每個partition的起始點開始讀取 .option("startingOffsets", "earliest") # 指定認證協議 .option("kafka.security.protocol", "SASL_SSL") .option("kafka.sasl.mechanism", "PLAIN") # 指定confluent的使用者名稱和密碼 .option("kafka.sasl.jaas.config", f"""org.apache.kafka.common.security.plain.PlainLoginModule required username="{control_center_username}" password="{control_center_password}";""") .load())
從kafka中讀取的資料格式如下:
root |-- key: binary (nullable = true) |-- value: binary (nullable = true) |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true)
由於key和value都是binary格式的,我們需要將value(json)由binary轉換為string格式,並定義schema,提取出Json中的資料,並轉換為對應的格式:
schema = (StructType().add('key', TimestampType()) .add('fare_amount', FloatType()) .add('pickup_datetime', TimestampType()) .add('pickup_longitude', FloatType()) .add('pickup_latitude', FloatType()) .add('dropoff_longitude', FloatType()) .add('dropoff_latitude', FloatType()) .add('passenger_count', IntegerType()) ) # 將json中的列提取出來 lines = (lines.withColumn('data', from_json( col('value').cast('string'), # binary 轉 string schema)) # 解析為schema .select(col('data.*'))) # select value中的所有列
過濾掉錯誤,為空,NaN的資料:
lines = (lines.filter(col('pickup_longitude') != 0) .filter(col('pickup_latitude') != 0) .filter(col('dropoff_longitude') != 0) .filter(col('dropoff_latitude') != 0) .filter(col('fare_amount') != 0) .filter(col('passenger_count') != 0))
最後,我們將解析出來的資料輸出到LakeHouse中,以進行後續的分析和機器學習模型訓練:
# lakehouse 的儲存格式為 delta query = (lines.writeStream.format('delta') .option('checkpointLocation', checkpoint_location) .option('path', taxi_data_delta_lake).start()) # 執行job,直到出現異常(如果只想執行該Job一段時間,可以指定timeout引數) query.awaitTermination()
資料分析
我們先將LakeHouse中的資料使用Spark載入進來:
然後,我們對該Dataframe建立一個Table View,並探索fare_amount的分佈:
可以看到fare_amount的最小值是負數,這顯然是一條錯誤的資料,我們將這些錯誤的資料過濾,並探索fare_amount的分佈:
然後我們探索價格和年份,月份,星期,打車時間的關係:
從上面可以看出兩點:
- 計程車的價格和年份有很大關係,從09年到15年呈不斷增長的態勢
- 在中午和凌晨打車比上午和下午打車更貴一些。
我們再進一步探索價格和乘客數量的關係:
此外,計程車價格的另一個影響因素就是距離,這裡我們藉助python的geopy包和Spark的UDF來計算給定兩個位置的距離,然後再分析費用和距離的關係。
經緯度的範圍為[-90, 90],因此,我們第一步是清除錯誤的資料:
然後,我們增加一列資料:計程車行駛的距離,並將距離離散化,進行後續的分析:
統計打車距離的分佈:
從上圖可以看出:打車距離分佈在區間[0, 15]miles內,我們繼續統計在該區間內,打車價格和打車距離的關係:
如上圖所示:打車價格和打車距離呈現出線性增長的趨勢。
機器學習建模
在上一小節的資料分析中,我們已經提取了和計程車相關聯的一些特徵,根據這些特徵,我們建立一個簡單的線性迴歸模型:
打車費用 ~ (年份,打車時間,乘客數,距離)
先將特徵和目標值提取出來:
對特徵做歸一化:
分割訓練集和測試集:
建立線性迴歸模型進行訓練:
使用Evaluator對模型進行評價:
總結
我們在本文中介紹瞭如何使用阿里雲的Confluent Cloud和Databricks來構建您的資料流和LakeHouse,並介紹瞭如何使用Databricks提供的能力來挖掘資料價值,使用Spark MLlib構建您的機器學習模型。有了Confluent Cloud和Databricks,您可以輕鬆實現資料入湖,及時在最新的資料上進行探索,挖掘您的資料價值。歡迎您試用阿里雲
本文為阿里雲原創內容,未經允許不得轉載。