Common PySpark DataFrame operations
1. union / unionAll / unionByName: merge rows (vertical concatenation). union and unionAll match columns by position, while unionByName matches them by name.
data_all = data_neg.unionByName(data_pos)
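A minimal sketch of the difference, with hypothetical toy data (data_neg and data_pos stand in for any two DataFrames sharing the same column names):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data_neg = spark.createDataFrame([(1, 0)], ['id', 'label'])
data_pos = spark.createDataFrame([(0, 2)], ['label', 'id'])  # same columns, different order

data_neg.union(data_pos).show()        # positional match: the id and label values get swapped
data_neg.unionByName(data_pos).show()  # name match: rows line up correctly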
2. DataFrame sampling
data_all.sample(False, 0.5, 1000).count()  # sample(withReplacement, fraction, seed)
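For stratified sampling keyed on a column there is also sampleBy, which keeps roughly a given fraction of rows per key value (the label values and fractions below are made up for illustration):

# keep ~10% of label==0 rows and ~50% of label==1 rows
sampled = data_all.sampleBy('label', fractions={0: 0.1, 1: 0.5}, seed=1000)
sampled.groupBy('label').count().show()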
3. Conditional filtering
data_all.filter("label >= 1").count()
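filter also accepts Column expressions (where is an alias), which compose more safely than SQL strings; a small sketch (the user_log_acct condition is hypothetical):

from pyspark.sql import functions as F

data_all.filter(F.col('label') >= 1).count()  # same result as the SQL-string form
data_all.where((F.col('label') >= 1) & F.col('user_log_acct').isNotNull()).count()  # combined conditions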
4. Register the DataFrame as a temporary table, then operate on it with spark.sql
res = predictions.select("user_log_acct", split_udf('probability').alias('probability'))
res.registerTempTable("tmp")  # deprecated since Spark 2.0; prefer res.createOrReplaceTempView("tmp")
spark.sql("insert overwrite table dev.dev_result_temp select user_log_acct,probability from tmp")
spark.stop()
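split_udf above is never defined in the snippet. One plausible definition, assuming probability is an MLlib probability vector from which we want the positive-class score:

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# hypothetical helper: pull the positive-class probability out of an MLlib vector
split_udf = udf(lambda v: float(v[1]), FloatType())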
Creating and saving a Spark DataFrame:
spark.createDataFrame(data, schema=None, samplingRatio=None): create directly,
where data is an RDD of Row/tuple/list/dict objects, a Python list, or a pandas.DataFrame.
# create a DataFrame directly from a list of tuples
df = spark.createDataFrame([
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])

# create from a list of dicts
df = spark.createDataFrame([{'name': 'Alice', 'age': 1}, {'name': 'Polo', 'age': 1}])

# create with an explicit schema
from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
df = spark.createDataFrame(csvRDD, schema)  # csvRDD is an RDD prepared elsewhere
spark.read: read data from files
>>> airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
>>> rdd = sc.textFile('python/test_support/sql/ages.csv')  # a comma-separated RDD can be turned into a DataFrame this way
>>> df2 = spark.read.csv(rdd)
>>> df = spark.read.format('json').load('python/test_support/sql/people.json')
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
[('age', 'bigint'), ('name', 'string')]
>>> rdd = sc.textFile('python/test_support/sql/people.json')
>>> df2 = spark.read.json(rdd)
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
[Row(value='hello'), Row(value='this')]
>>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
>>> df.collect()
[Row(value='hello\nthis')]
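The heading above also mentions saving; the write-side mirror of spark.read is df.write (the output paths and table name here are placeholders):

# persist a DataFrame in a few common formats
df.write.mode('overwrite').parquet('/tmp/out_parquet')
df.write.mode('overwrite').csv('/tmp/out_csv', header=True)
df.write.mode('overwrite').saveAsTable('dev.out_table')  # managed table in the metastore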
Spark functions
1) foreach(f): applies function f, passing each row of the df as f's input.
For example:
def f(person):
    print(person.name)  # runs on the executors, so output appears in executor logs, not on the driver

df.foreach(f)
2) apply(udf)
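The note is terse; plain DataFrames have no apply in the Python API (GroupedData.apply exists for grouped pandas UDFs), and the usual way to apply a UDF column-wise is select/withColumn. A minimal sketch (the column names are hypothetical):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# hypothetical example: apply a Python function to the 'age' column
double_age = udf(lambda a: a * 2, IntegerType())
df = df.withColumn('age2', double_age('age'))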
3) map(f): applies function f to each row of the underlying RDD (DataFrames expose this through df.rdd; sketch below).
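A hedged sketch, reusing the id/weight DataFrame created earlier:

# map over the underlying RDD, then rebuild a DataFrame
rdd2 = df.rdd.map(lambda row: (row.id, row.weight * 2))
df2 = rdd2.toDF(['id', 'double_weight'])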
Reference: https://blog.csdn.net/kittyzc/article/details/82862089