中讀取資料_Pyspark處理資料中帶有列分隔符的資料集
阿新 • • 發佈:2021-01-20
技術標籤:中讀取資料
本篇文章目標是處理在資料集中存在列分隔符或分隔符的特殊場景。對於Pyspark開發人員來說,處理這種型別的資料集有時是一件令人頭疼的事情,但無論如何都必須處理它。
資料集基本上如下所示:
#first line is the headerNAME|AGE|DEPVivek|Chaudhary|32|BSCJohn|Morgan|30|BEAshwin|Rao|30|BE
資料集包含三個列" Name ", " AGE ", " DEP ",用分隔符" | "分隔。如果我們關注資料集,它也包含' | '列名。
讓我們看看如何進行下一步:
步驟1。使用spark的Read .csv()方法讀取資料集:
#create spark session import pysparkfrom pyspark.sql import SparkSessionspark=SparkSession.builder.appName(‘delimit’).getOrCreate()
上面的命令幫助我們連線到spark環境,並讓我們使用spark.read.csv()讀取資料集
#create df=spark.read.option(‘delimiter’,’|’).csv(r’/delimit_data.txt’,inferSchema=True,header=True)df.show()
從檔案中讀取資料並將資料放入記憶體後我們發現,最後一列資料在哪裡,列年齡必須有一個整數資料型別,但是我們看到了一些其他的東西。這不是我們所期望的。一團糟,完全不匹配,不是嗎?答案是肯定的,確實一團糟。
現在,讓我們來學習如何解決這個問題。
步驟2。再次讀取資料,但這次使用Read .text()方法:
df=spark.read.text(r’/Python_Pyspark_Corp_Training/delimit_data.txt’)df.show(truncate=0)
#extract first row as this is our headerhead=df.first()[0]schema=[‘fname’,’lname’,’age’,’dep’]print(schema)Output: ['fname', 'lname', 'age', 'dep']
下一步是根據列分隔符對資料集進行分割:
#filter the header, separate the columns and apply the schemadf_new=df.filter(df[‘value’]!=head).rdd.map(lambda x:x[0].split(‘|’)).toDF(schema)df_new.show()
現在,我們已經成功分離出列。
我們已經成功地將"|"分隔的列("name")資料分成兩列。現在,資料更加乾淨,可以輕鬆地使用。
接下來,連線列"fname"和"lname":
from pyspark.sql.functions import concat, col, litdf1=df_new.withColumn(‘fullname’,concat(col(‘fname’),lit(“|”),col(‘lname’)))df1.show()
要驗證資料轉換,我們將把轉換後的資料集寫入CSV檔案,然後使用read. CSV()方法讀取它。
df1.write.option(‘sep’,’|’).mode(‘overwrite’).option(‘header’,’true’).csv(r’cust_sep.csv’)
下一步是資料驗證:
df=spark.read.option(‘delimiter’,’|’).csv(r,inferSchema=True,header=True)df.show()
現在的資料看起來像我們想要的那樣。
作者:Vivek Chaudhary
deephub翻譯組