sqlContext.filter() returns an empty RDD
阿新 • Published: 2019-01-22
The Hive table records already exists:
hive> desc records;
OK
year string
temperature int
quality int
hive> select * from records;
OK
2013	15	18
2014	23	32
2015	19	91
Goal: select the rows of records whose temperature is not 15, create a new table, and store the filtered data in it. The code is:
from pyspark import SparkContext
from pyspark.sql import HiveContext

def inside(row):
    # row[1] is the temperature column
    if int(row[1]) == 15:
        print "*******************************" + str(row[1])
        return False
    # No explicit return here: every other row gets None

if __name__ == "__main__":
    sc = SparkContext(appName = "records")
    sqlContext = HiveContext(sc)
    table_df = sqlContext.sql("select * from records").rdd
    rltrdd = table_df.filter(lambda row : inside(row))
    count = rltrdd.count()
    if count == 0:
        print "**************************nothing****************"
    else:
        print "********************************************" + str(count)
    tablename = "temp"
    newdf = sqlContext.createDataFrame(rltrdd)
    newdf.registerAsTable(tablename)
    sql_create = "create table temptable like records"
    sql_insert = "insert into table temptable select * from temp"
    sqlContext.sql(sql_create)
    sqlContext.sql(sql_insert)
    sc.stop()

It reports that the RDD is empty, with the following error:
Traceback (most recent call last):
  File "/home/sky/spark/bin/workspace/query.py", line 24, in <module>
    newdf = sqlContext.createDataFrame(rltrdd)
  File "/home/sky/spark/python/pyspark/sql/context.py", line 284, in createDataFrame
    schema = self._inferSchema(rdd, samplingRatio)
  File "/home/sky/spark/python/pyspark/sql/context.py", line 164, in _inferSchema
    first = rdd.first()
  File "/home/sky/spark/python/pyspark/rdd.py", line 1245, in first
    raise ValueError("RDD is empty")
ValueError: RDD is empty
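The traceback shows why an empty RDD turns into a hard error: createDataFrame() is called without a schema, so it infers one via _inferSchema(), which calls rdd.first(), and first() raises ValueError on an empty RDD. The script's count == 0 branch only prints a message and then carries on to createDataFrame() anyway. The plain-Python sketch below mimics that chain and guards the schema-dependent step; first_row, infer_column_types and store_if_not_empty are hypothetical helpers for illustration, not PySpark APIs.

```python
def first_row(rows):
    """Return the first row, raising the same error RDD.first() raised above."""
    for row in rows:
        return row
    raise ValueError("RDD is empty")


def infer_column_types(rows):
    """Mimic schema inference: read the column types off the first row."""
    sample = first_row(rows)
    return [type(value).__name__ for value in sample]


def store_if_not_empty(rows):
    """Only run the schema-dependent step when there is at least one row."""
    rows = list(rows)
    if not rows:
        print("nothing to store")
        return None
    return infer_column_types(rows)


print(store_if_not_empty([]))                  # prints "nothing to store", then None
print(store_if_not_empty([("2014", 23, 32)]))  # ['str', 'int', 'int']
```

In the Spark script, one option along the same lines would be to move the createDataFrame() and table-creation steps into the else branch of the count check.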
The code was changed as follows:
from pyspark import SparkContext
from pyspark.sql import HiveContext

def inside(row):
    # Return True to keep the row, False to drop it; row[1] is the temperature
    if int(row[1]) != 15:
        print "*******************************" + str(row[1])
        return True
    else:
        return False

if __name__ == "__main__":
    sc = SparkContext(appName = "records")
    sqlContext = HiveContext(sc)
    table_df = sqlContext.sql("select * from records").rdd
    print "*************************************"
    print table_df
    rltrdd = table_df.filter(lambda row : inside(row))
    print "*************************************" + str(rltrdd)
    count = rltrdd.count()
    if count == 0:
        print "**************************nothing****************"
    else:
        print "********************************************" + str(count)
    tablename = "temp"
    # createDataFrame() infers the schema from the first row, so it would
    # still raise "RDD is empty" if no row passed the filter
    newdf = sqlContext.createDataFrame(rltrdd)
    newdf.registerAsTable(tablename)
    sql_create = "create table temptable like records"
    sql_insert = "insert into table temptable select * from temp"
    sqlContext.sql(sql_create)
    sqlContext.sql(sql_insert)
    sc.stop()
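Cause of the error:
The function passed to filter() must return True for every row that should be kept; a row for which it returns False, or returns nothing at all (None), is dropped. The first version of inside() returned False explicitly only when the temperature was 15 and implicitly returned None for every other row, so all rows were filtered out and the resulting RDD was empty.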
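A minimal plain-Python sketch of the same rule, assuming the records rows are (2013, 15, 18), (2014, 23, 32) and (2015, 19, 91). As far as I know, PySpark's RDD.filter() hands the predicate to Python's own filter (itertools.ifilter in older releases), so the truthiness rules are the same; inside_buggy and inside_fixed are hypothetical names for the two versions of inside(). No Spark is needed to run it.

```python
# Rows as (year, temperature, quality) tuples (assumed sample data).
rows = [("2013", 15, 18), ("2014", 23, 32), ("2015", 19, 91)]


def inside_buggy(row):
    # Only one explicit return: every other row falls off the end and
    # gets None, which filter() treats as false.
    if int(row[1]) == 15:
        return False


def inside_fixed(row):
    # Always returns a bool: True keeps the row, False drops it.
    return int(row[1]) != 15


buggy = list(filter(inside_buggy, rows))
fixed = list(filter(inside_fixed, rows))
print(buggy)  # []
print(fixed)  # [('2014', 23, 32), ('2015', 19, 91)]
```

Returning the comparison directly, as inside_fixed does, is equivalent to the if/else in the corrected inside() and makes it impossible to forget a branch.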