1. 程式人生 > 程式設計 >Pyspark獲取並處理RDD資料程式碼例項

Pyspark獲取並處理RDD資料程式碼例項

彈性分散式資料集(RDD)是一組不可變的JVM物件的分佈集,可以用於執行高速運算,它是Apache Spark的核心。

在pyspark中獲取和處理RDD資料集的方法如下:

1. 首先是匯入庫和環境配置(本測試在linux的pycharm上完成)

import os
from pyspark import SparkContext,SparkConf
from pyspark.sql.session import SparkSession
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
conf = SparkConf().setAppName('test_rdd')
sc = SparkContext('local','test',conf=conf)
spark = SparkSession(sc)

2. 然後,提供hdfs分割槽資料的路徑或者分割槽表名

txt_File = r"hdfs://host:port/apps/hive/warehouse/資料庫名.db/表名/分割槽名/part-m-00029.deflate" # part-m-00029.deflate
# txt_File = r"hdfs://host:port/apps/hive/warehouse/資料庫名.db/表名" # hive table

3. sc.textFile進行讀取,得到RDD格式資料<還可以用 spark.sparkContext.parallelize(data) 來獲取RDD資料>,引數中還可設定資料被劃分的分割槽數

txt_ = sc.textFile(txt_File)

4. 基本操作:

  • type(txt_):顯示資料型別,這時屬於 'pyspark.rdd.RDD'
  • txt_.first():獲取第一條資料
  • txt_.take(2):獲取前2條資料,形成長度為2的list
  • txt_.take(2)[1].split('\1')[1]:表示獲取前兩條中的第[1]條資料(也就是第2條,因為python的索引是從0開始的),並以 '\1'字元分隔開(這要看你的表用什麼作為分隔符的),形成list,再獲取該list的第2條資料
  • txt_.map(lambda x:x.split('\1')):使用lambda函式和map函式快速處理每一行資料,這裡表示將每一行以 '\1'字元分隔開,每一行返回一個list;此時資料結構是:'pyspark.rdd.PipelinedRDD'
  • txt_.map(lambda x:(x,x.split('\1'))).filter(lambda y:y[0].startswith('北京')):表示在返回 (x,x.split('\1')) 後,進行篩選filter,獲取其中以 '北京' 開頭的行,並按照相同格式 (例如,這裡是(x,x.split('\1'))格式,即原資料+分割後的列表資料) 返回資料
  • txt_.collect():返回所有RDD資料元素,當資料量很大時謹慎操作
  • txt_.toDF():不能直接轉成DataFrame格式,需要設定Schema

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。