1. 程式人生 > 其它 >中讀取資料_Pyspark處理資料中帶有列分隔符的資料集

中讀取資料_Pyspark處理資料中帶有列分隔符的資料集

技術標籤:中讀取資料

本篇文章目標是處理在資料集中存在列分隔符或分隔符的特殊場景。對於Pyspark開發人員來說,處理這種型別的資料集有時是一件令人頭疼的事情,但無論如何都必須處理它。

資料集基本上如下所示:

#first line is the headerNAME|AGE|DEPVivek|Chaudhary|32|BSCJohn|Morgan|30|BEAshwin|Rao|30|BE

資料集包含三個列" Name ", " AGE ", " DEP ",用分隔符" | "分隔。如果我們關注資料集,它也包含' | '列名。

讓我們看看如何進行下一步:

步驟1。使用spark的Read .csv()方法讀取資料集:

#create spark session import pysparkfrom pyspark.sql import SparkSessionspark=SparkSession.builder.appName(‘delimit’).getOrCreate()

上面的命令幫助我們連線到spark環境,並讓我們使用spark.read.csv()讀取資料集

#create df=spark.read.option(‘delimiter’,’|’).csv(r’/delimit_data.txt’,inferSchema=True,header=True)df.show()
eb7e593dd6e63b019c97dbe40561c91c.png

從檔案中讀取資料並將資料放入記憶體後我們發現,最後一列資料在哪裡,列年齡必須有一個整數資料型別,但是我們看到了一些其他的東西。這不是我們所期望的。一團糟,完全不匹配,不是嗎?答案是肯定的,確實一團糟。

現在,讓我們來學習如何解決這個問題。

步驟2。再次讀取資料,但這次使用Read .text()方法:

df=spark.read.text(r’/Python_Pyspark_Corp_Training/delimit_data.txt’)df.show(truncate=0)
561747353bafc4062d6bbfcf812a2c4b.png
#extract first row as this is our headerhead=df.first()[0]schema=[‘fname’,’lname’,’age’,’dep’]print(schema)Output: ['fname', 'lname', 'age', 'dep']

下一步是根據列分隔符對資料集進行分割:

#filter the header, separate the columns and apply the schemadf_new=df.filter(df[‘value’]!=head).rdd.map(lambda x:x[0].split(‘|’)).toDF(schema)df_new.show()
1f70896345ff1ca98b85003a7d830fca.png

現在,我們已經成功分離出列。

我們已經成功地將"|"分隔的列("name")資料分成兩列。現在,資料更加乾淨,可以輕鬆地使用。

接下來,連線列"fname"和"lname":

from pyspark.sql.functions import concat, col, litdf1=df_new.withColumn(‘fullname’,concat(col(‘fname’),lit(“|”),col(‘lname’)))df1.show()
c58d4e1f57139aaa77ee1e2623140061.png

要驗證資料轉換,我們將把轉換後的資料集寫入CSV檔案,然後使用read. CSV()方法讀取它。

df1.write.option(‘sep’,’|’).mode(‘overwrite’).option(‘header’,’true’).csv(r’cust_sep.csv’)

下一步是資料驗證:

df=spark.read.option(‘delimiter’,’|’).csv(r,inferSchema=True,header=True)df.show()
595a2e70ff3bc417989c40bbf7459922.png

現在的資料看起來像我們想要的那樣。

作者:Vivek Chaudhary

deephub翻譯組