
sqlContext.filter() returns an empty RDD

The table records already exists in Hive:

hive> desc records;
OK
year                string                                 
temperature         int                                    
quality             int 

hive> select * from records;
OK
2013    15      18
2014    23      32
2015    19      91

The goal is to select the rows of records whose temperature is != 15, and create a new table to store the filtered data. The code:

from pyspark import SparkContext
from pyspark.sql import HiveContext

def inside(row):
<span style="color:#ff0000;">        if int(row[1]) == 15:
                print "*******************************" +str( row[1])
                return False</span>

if __name__ == "__main__":
        sc = SparkContext(appName = "records")
        sqlContext = HiveContext(sc)

        table_df = sqlContext.sql("select * from records").rdd
        rltrdd = table_df.filter(lambda row : inside(row))
        count = rltrdd.count()
        if count == 0:
                print "**************************nothing****************"
        else:
                print "********************************************" + str(count)
 <span style="white-space:pre">	</span>tablename = "temp"
        newdf = sqlContext.createDataFrame(rltrdd)
        newdf.registerAsTable(tablename)

        sql_create = "create table  temptable like records"
        sql_insert = "insert into table temptable select * from temp"
        sqlContext.sql(sql_create)
        sqlContext.sql(sql_insert)
        sc.stop()
                                               
Running this reports that the RDD is empty, with the following error:
Traceback (most recent call last):
  File "/home/sky/spark/bin/workspace/query.py", line 24, in <module>
    newdf = sqlContext.createDataFrame(rltrdd)
  File "/home/sky/spark/python/pyspark/sql/context.py", line 284, in createDataFrame
    schema = self._inferSchema(rdd, samplingRatio)
  File "/home/sky/spark/python/pyspark/sql/context.py", line 164, in _inferSchema
    first = rdd.first()
  File "/home/sky/spark/python/pyspark/rdd.py", line 1245, in first
    raise ValueError("RDD is empty")
ValueError: RDD is empty
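
The traceback points at schema inference: createDataFrame is called without an explicit schema, so it samples the first row with rdd.first(), and first() raises ValueError when the RDD has no elements. A minimal standalone sketch (hypothetical data, independent of Hive) reproduces the behaviour:

from pyspark import SparkContext

sc = SparkContext(appName = "empty-rdd-demo")
# A predicate that never returns a truthy value drops every element
rdd = sc.parallelize([("2013", 15, 18)]).filter(lambda row: None)
print rdd.count()   # 0
rdd.first()         # ValueError: RDD is empty -- the same call createDataFrame makes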

The corrected code:

from pyspark import SparkContext
from pyspark.sql import HiveContext

def inside(row):
<span style="color:#ff0000;">        if int(row[1]) != 15:
                print "*******************************" +str( row[1])
                return True
        else:   
                return False</span>
if __name__ == "__main__":
        sc = SparkContext(appName = "records")
        sqlContext = HiveContext(sc)
      
        table_df = sqlContext.sql("select * from records").rdd
        print "*************************************" 
        print  table_df
        rltrdd = table_df.filter(lambda row : inside(row))
        print "*************************************"+str(rltrdd)
        count = rltrdd.count()
        if count == 0:
                print "**************************nothing****************"
        else:
<span style="white-space:pre">	</span>print "********************************************" + str(count)
        tablename = "temp"
        newdf = sqlContext.createDataFrame(rltrdd)
        newdf.registerAsTable(tablename)

        sql_create = "create table  temptable like records"
        sql_insert = "insert into table temptable select * from temp"
        sqlContext.sql(sql_create)
        sqlContext.sql(sql_insert)
        sc.stop()
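
As an aside, the round trip through an RDD (and thus through schema inference) is not strictly necessary here; the same filtering can be pushed into Hive SQL directly. A sketch, assuming the same table names as above:

sql_create = "create table temptable like records"
sql_insert = "insert into table temptable select * from records where temperature != 15"
sqlContext.sql(sql_create)
sqlContext.sql(sql_insert)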

Cause of the error:

The function passed to filter must return True (or some other truthy value) for the rows that should be kept. The original inside() only ever returned False (for temperature == 15) and fell off the end for every other row, implicitly returning None; since None is falsy, every row was dropped and the resulting RDD was empty.
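
A minimal plain-Python sketch of the two predicates (hypothetical values) makes the difference visible; Python 2's built-in filter follows the same truthiness rule as the RDD's filter:

def buggy(row):
        if row[1] == 15:
                return False
        # falls through: implicit None, so nothing is ever kept

def fixed(row):
        return row[1] != 15   # True exactly for the rows to keep

rows = [("2013", 15, 18), ("2014", 23, 32), ("2015", 19, 91)]
print filter(buggy, rows)   # []
print filter(fixed, rows)   # [('2014', 23, 32), ('2015', 19, 91)]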