1. 程式人生 > 其它 >企業版Spark Databricks + 企業版Kafka Confluent 聯合高效挖掘資料價值

企業版Spark Databricks + 企業版Kafka Confluent 聯合高效挖掘資料價值

簡介:本文介紹瞭如何使用阿里雲的Confluent Cloud和Databricks構建資料流和LakeHouse,並介紹瞭如何使用Databricks提供的能力來挖掘資料價值,使用Spark MLlib構建您的機器學習模型。

前提條件

  • 已註冊阿里雲賬號,詳情請參見阿里雲賬號註冊流程
  • 已開通 Databricks 資料洞察服務
  • 已開通 OSS 物件儲存服務
  • 已開通 Confluent 流資料服務

建立Databricks叢集 & Confluent叢集

  1. 登入Confluent管理控制檯,建立Confluent叢集,並開啟公網服務
  2. 登入Databricks管理控制檯,建立Databricks叢集

Databricks Worker節點公網訪問

Databricks的worker節點暫時不支援公網訪問,為了能訪問Confluent的公網地址,請聯絡Databricks的開發人員新增NAT閘道器。

案例:計程車資料入湖及分析

計程車和網約車在每天的執行中持續產生行駛軌跡和交易資料,這些資料對於車輛排程,流量預測,安全監控等場景有著極大的價值。

本案例中我們使用紐約市的計程車資料來模擬網約車資料從產生,釋出到流資料服務Confluent,通過Databricks Structured Streaming進行實時資料處理,並存儲到LakeHouse的整個流程。資料儲存到LakeHouse後,我們使用spark和spark sql對資料進行分析,並使用Spark的MLlib進行機器學習訓練。

前置準備:

  1. 建立topic:
    登入Confluent的control center,在左側選中Topics,點選Add a topic按鈕,建立一個名為nyc_taxi_data的topic,將partition設定為3,其他配置保持預設。

  2. 建立OSS bucket:
    在和Databricks同一Region的OSS中,建立bucket,bucket命名為:databricks-confluent-integration
    進入到Bucket列表頁,點選建立bucket按鈕


    建立好bucket之後,在該bucket建立目錄:checkpoint_dir和data/nyc_taxi_data兩個目錄
  3. 收集url,使用者名稱,密碼,路徑等以便後續使用a。
  4. confluent叢集ID:在csp的管控介面,叢集詳情頁獲取
  5. Confluent Control Center的使用者名稱和密碼
  6. 路徑:
  • Databricks Structured Streaming的checkpoint儲存目錄
  • 採集的資料的儲存目錄

下面是我們後續會使用到的一些變數:

# 叢集管控介面獲取
confluent_cluster_id = "your_confluent_cluster_id"    
# 使用confluent叢集ID拼接得到
confluent_server = "rb-{confluent_cluster_id}.csp.aliyuncs.com:9092" 
control_center_username = "your_confluent_control_center_username"
control_center_password = "your_confluent_control_center_password"

topic = "nyc_taxi_data"

checkpoint_location = "oss://databricks-confluent-integration/checkpoint_dir"
taxi_data_delta_lake = "oss://databricks-confluent-integration/data/nyc_taxi_data"

資料的產生

在本案例中,我們使用Kaggle上的NYC計程車資料集來模擬資料產生。

  • 我們先安裝confluent的python客戶端,其他語言的客戶端參考confluent官網
pip install confluent_kafka
  • 構造用於建立Kafka Producer的基礎資訊,如:bootstrap-server,control center的username,password等
conf = {
    'bootstrap.servers': confluent_server,
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('utf_8'),
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': control_center_username,
    'sasl.password': control_center_password
}
  • 建立Producer:
producer = Producer(conf)
  • 向Kafka中傳送訊息(模擬資料的產生):
with open("/Path/To/train.csv", "rt") as f:
    float_field = ['fare_amount', 'pickup_longitude', 'pickup_latitude', 
                   'dropoff_longitude', 'dropoff_latitude']
    for row in reader:
        i += 1
        try:
            for field in float_field:
                row[field] = float(row[field])
            row['passenger_count'] = int(row['passenger_count'])
            producer.produce(topic=topic, value=json.dumps(row))
            if i % 1000 == 0:
                producer.flush()
                if i == 200000:
                    break
        except ValueError: # discard null/NAN data
            continue 

Kafka中的partition和offset

在使用spark讀取Kafka中的資料之前,我們回顧一下Kafka中的概念:partition和offset

  • partition:kafka為了能並行進行資料的寫入,將每個topic的資料分為多個partition,每個partition由一個Broker負責,向partition寫入資料時,負責該partition的Broker將訊息複製給它的follower
  • offset:Kafka會為每條寫入partition裡的訊息進行編號,訊息的編號即為offset

我們在讀取Kafka中的資料時,需要指定我們想要讀取的資料,該指定需要從兩個維度:partition的維度 + offset的維度。

  • Earliest:從每個partition的offset 0開始讀取和載入
  • Latest:從每個partition最新的資料開始讀取
  • 自定義:指定每個partition的開始offset和結束offset
  • 讀取topic1 partition 0 offset 23和partition 0 offset -2之後的資料:"""{"topic1":{"0":23,"1":-2}}"""

除了指定start offset,我們還可以通過endingOffsets引數指定讀取到什麼位置為止。

將資料儲存到LakeHouse:Spark整合Confluent

理解上述概念後,Databricks和Confluent的整合非常簡單,只需要對spark session的readStream引數進行簡單的設定就可以將Kafka中的實時流資料轉換為Spark中的Dataframe:

lines = (spark.readStream
         # 指定資料來源: kafka
         .format("kafka")
         # 指定kafka bootstrap server的URL
         .option("kafka.bootstrap.servers", confluent_server)
         # 指定訂閱的topic
         .option("subscribe", topic)
         # 指定想要讀取的資料的offset,earliest表示從每個partition的起始點開始讀取
         .option("startingOffsets", "earliest")
         # 指定認證協議
         .option("kafka.security.protocol", "SASL_SSL")
         .option("kafka.sasl.mechanism", "PLAIN")
         # 指定confluent的使用者名稱和密碼
         .option("kafka.sasl.jaas.config",
                 f"""org.apache.kafka.common.security.plain.PlainLoginModule 
                 required username="{control_center_username}" password="{control_center_password}";""")
         .load())

從kafka中讀取的資料格式如下:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

由於key和value都是binary格式的,我們需要將value(json)由binary轉換為string格式,並定義schema,提取出Json中的資料,並轉換為對應的格式:

schema = (StructType().add('key', TimestampType())
          .add('fare_amount', FloatType())
          .add('pickup_datetime', TimestampType())
          .add('pickup_longitude', FloatType())
          .add('pickup_latitude', FloatType())
          .add('dropoff_longitude', FloatType())
          .add('dropoff_latitude', FloatType())
          .add('passenger_count', IntegerType())
          )

# 將json中的列提取出來
lines = (lines.withColumn('data', 
                          from_json(
                              col('value').cast('string'), # binary 轉 string
                              schema))                     # 解析為schema
         .select(col('data.*')))                           # select value中的所有列

過濾掉錯誤,為空,NaN的資料:

lines = (lines.filter(col('pickup_longitude') != 0)
         .filter(col('pickup_latitude') != 0)
         .filter(col('dropoff_longitude') != 0)
         .filter(col('dropoff_latitude') != 0)
         .filter(col('fare_amount') != 0)
         .filter(col('passenger_count') != 0))

最後,我們將解析出來的資料輸出到LakeHouse中,以進行後續的分析和機器學習模型訓練:

# lakehouse 的儲存格式為 delta
query = (lines.writeStream.format('delta')
         .option('checkpointLocation', checkpoint_location)
         .option('path', taxi_data_delta_lake).start())
# 執行job,直到出現異常(如果只想執行該Job一段時間,可以指定timeout引數)
query.awaitTermination()

資料分析

我們先將LakeHouse中的資料使用Spark載入進來:

然後,我們對該Dataframe建立一個Table View,並探索fare_amount的分佈:

可以看到fare_amount的最小值是負數,這顯然是一條錯誤的資料,我們將這些錯誤的資料過濾,並探索fare_amount的分佈:

然後我們探索價格和年份,月份,星期,打車時間的關係:

從上面可以看出兩點:

  • 計程車的價格和年份有很大關係,從09年到15年呈不斷增長的態勢
  • 在中午和凌晨打車比上午和下午打車更貴一些。

我們再進一步探索價格和乘客數量的關係:

此外,計程車價格的另一個影響因素就是距離,這裡我們藉助python的geopy包和Spark的UDF來計算給定兩個位置的距離,然後再分析費用和距離的關係。

經緯度的範圍為[-90, 90],因此,我們第一步是清除錯誤的資料:

然後,我們增加一列資料:計程車行駛的距離,並將距離離散化,進行後續的分析:

編輯

統計打車距離的分佈:

從上圖可以看出:打車距離分佈在區間[0, 15]miles內,我們繼續統計在該區間內,打車價格和打車距離的關係:

如上圖所示:打車價格和打車距離呈現出線性增長的趨勢。

機器學習建模

在上一小節的資料分析中,我們已經提取了和計程車相關聯的一些特徵,根據這些特徵,我們建立一個簡單的線性迴歸模型:

打車費用 ~ (年份,打車時間,乘客數,距離)

先將特徵和目標值提取出來:

對特徵做歸一化:

分割訓練集和測試集:

建立線性迴歸模型進行訓練:

訓練結果統計:

使用Evaluator對模型進行評價:

總結

我們在本文中介紹瞭如何使用阿里雲的Confluent Cloud和Databricks來構建您的資料流和LakeHouse,並介紹瞭如何使用Databricks提供的能力來挖掘資料價值,使用Spark MLlib構建您的機器學習模型。有了Confluent Cloud和Databricks,您可以輕鬆實現資料入湖,及時在最新的資料上進行探索,挖掘您的資料價值。歡迎您試用阿里雲ConfluentDatabricks

原文連結

本文為阿里雲原創內容,未經允許不得轉載。