
PySpark series: reading and writing DataFrames


1. Connecting to Spark

from pyspark.sql import SparkSession

spark=SparkSession \
        .builder \
        .appName('my_first_app_name') \
        .getOrCreate()
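If the session needs extra settings (for example the number of shuffle partitions used by joins and aggregations), they can be chained on the builder before getOrCreate(). A minimal sketch; the config key is a standard Spark SQL setting and the value is only illustrative:

from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName('my_first_app_name') \
        .config('spark.sql.shuffle.partitions', '200') \
        .getOrCreate()  # reuses the existing session if one is already running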

2. Creating DataFrames

2.1. Creating from variables (with an explicit schema)

# Generate the comma-separated data
stringCSVRDD = spark.sparkContext.parallelize([
    (123, "Katie", 19, "brown"),
    (234, "Michael", 22, "green"),
    (345, "Simone", 23, "blue")
])

# Specify the schema with StructField(name, dataType, nullable), where:
#   name:     the field's name
#   dataType: the field's data type
#   nullable: whether the field's value may be null
from pyspark.sql.types import StructType, StructField, LongType, StringType  # import the types

schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and create the DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Register the DataFrame as a temporary view
# (createOrReplaceTempView is the newer name for this in Spark 2.x)
swimmers.registerTempTable("swimmers")

# Count the rows of the DataFrame
swimmers.count()
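With the temporary view registered, the same data can be queried with SQL on the session; a quick illustration (the filter is arbitrary):

spark.sql("select id, name, age from swimmers where age >= 20").show()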

2.2. Creating from variables (with type inference)

# Create the dataframe using automatic type inference

data = [(123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")]
df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyeColor'])
df.show()
df.count()
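printSchema() shows which type was inferred for each column, which is a quick way to confirm the inference did what you expect:

df.printSchema()  # id: long, name: string, age: long, eyeColor: string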

2.3. Reading JSON

# Read the sample data shipped with Spark

file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
df = spark.read.json(file)
df.show()
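By default spark.read.json expects one JSON object per line. If the file holds a single pretty-printed object or array spanning several lines, the multiLine option (available from Spark 2.2) can be passed; a hedged sketch:

df = spark.read.json(file, multiLine=True)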

2.4. Reading CSV

# First create a csv file
import pandas as pd
import numpy as np
df = pd.DataFrame(np.random.rand(5, 5), columns=['a', 'b', 'c', 'd', 'e']).\
    applymap(lambda x: int(x * 10))
file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\random.csv"
df.to_csv(file, index=False)

# Then read the csv file back
monthlySales = spark.read.csv(file, header=True, inferSchema=True)
monthlySales.show()
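Other parsing options can be supplied through option() in the same way; sep and nullValue below are standard options of the Spark 2.x CSV reader, with illustrative values:

monthlySales = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ",") \
    .option("nullValue", "NA") \
    .csv(file)
monthlySales.show()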

2.5. Reading from MySQL

# The MySQL JDBC driver jar needs to be placed under spark-2.2.0-bin-hadoop2.7\jars
# This works on a single machine but not in a cluster environment
# Restart Spark and run again
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable='mysql.db',
    user='root',
    password='123456' 
    ).load()
df.show()

# A SQL query can also be passed in, as a subquery aliased as a table

sql="(select * from mysql.db where db='wp230') t"
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable=sql,
    user='root',
    password='123456' 
    ).load()
df.show()
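The same read can be expressed with the jdbc() shortcut, which takes the table (or subquery) and a properties dict; naming the driver class explicitly sometimes helps when the jar is present but not auto-detected. The class name below assumes MySQL Connector/J 5.x:

df = spark.read.jdbc(
    url='jdbc:mysql://127.0.0.1',
    table='mysql.db',
    properties={'user': 'root',
                'password': '123456',
                'driver': 'com.mysql.jdbc.Driver'})
df.show()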

2.6. Creating from a pandas DataFrame

# If no schema is given, the pandas column names are used
df = pd.DataFrame(np.random.random((4, 4)))
spark_df = spark.createDataFrame(df, schema=['a', 'b', 'c', 'd'])
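The reverse conversion is toPandas(), which collects the whole Spark DataFrame onto the driver, so it only makes sense for data that fits in a single machine's memory:

pdf = spark_df.toPandas()  # back to a pandas DataFrame on the driver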

2.7. Reading from Parquet (columnar storage)

# Read the parquet file under the examples directory
file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
df=spark.read.parquet(file)
df.show()
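Since Parquet is a columnar format, selecting only the needed columns avoids reading the rest of the file; the column names here are the ones in Spark's bundled users.parquet sample:

df.select("name", "favorite_color").show()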

2.8. Reading from Hive

# If Spark is already configured to connect to Hive, Hive data can be read directly
spark = SparkSession \
        .builder \
        .enableHiveSupport() \
        .master("spark://172.31.100.170:7077") \
        .appName("my_first_app_name") \
        .getOrCreate()

df=spark.sql("select * from hive_tb_name")
df.show()
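spark.table() returns the same DataFrame without writing SQL:

df = spark.table("hive_tb_name")
df.show()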

3. Saving data

3.1. Writing to CSV

# Create a dataframe
import pandas as pd
import numpy as np
df = pd.DataFrame(np.random.random((4, 4)), columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)

# Write to csv
file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
spark_df.write.csv(path=file, header=True, sep=",", mode='overwrite')
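Note that write.csv produces a directory of part files rather than a single csv file; if one file is really required (and the data is small), coalesce to a single partition before writing:

spark_df.coalesce(1).write.csv(path=file, header=True, sep=",", mode='overwrite')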

3.2. Saving to Parquet

# Create a dataframe
import pandas as pd
import numpy as np
df = pd.DataFrame(np.random.random((4, 4)), columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)

# Write to parquet
file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
spark_df.write.parquet(path=file,mode='overwrite')
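partitionBy() lays the output out as one sub-directory per distinct value of a column, which later reads can prune. The column used here is just one of the example columns, chosen only to show the syntax:

spark_df.write.partitionBy('a').parquet(path=file, mode='overwrite')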

3.3. Writing to Hive

# Enable dynamic partitioning
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")

# Write into the partitioned table with plain Hive SQL
spark.sql("""
    insert overwrite table ai.da_aipurchase_dailysale_hive 
    partition (saledate) 
    select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate 
    from szy_aipurchase_tmp_szy_dailysale distribute by saledate
    """)

# Or rebuild the partitions from the DataFrame on each run
jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')

# Without partitioning, simply write into a Hive table
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)

3.4. Writing to HDFS

# Write the data to HDFS, stored as csv
jdbcDF.write.mode("overwrite").options(header="true").csv("/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv")

3.5. Writing to MySQL

# Columns are matched automatically, i.e. spark_df's columns do not have to cover every column of the MySQL table

# overwrite: empty the table, then import
spark_df.write.mode("overwrite").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    user='root',
    password='123456',
    dbtable="test.test",
    batchsize="1000",
).save()

# append: append rows to the existing table
spark_df.write.mode("append").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    user='root',
    password='123456',
    dbtable="test.test",
    batchsize="1000",
).save()
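By default, overwrite drops and recreates the target table; starting with Spark 2.1 the JDBC writer also accepts a truncate option, which empties the existing table instead so that its definition (column types, indexes) is preserved:

spark_df.write.mode("overwrite").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    user='root',
    password='123456',
    dbtable="test.test",
    truncate="true",
).save()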