pyspark讀寫SequenceFile

pyspark讀寫SequenceFile

完整程式碼如下:

# -*- coding: utf-8 -*-
import sys
# Python 2 hack: reload(sys) re-exposes sys.setdefaultencoding (which the
# interpreter deletes from the module namespace at startup) so the default
# str<->unicode codec can be switched to UTF-8. This lets the Chinese
# string literals below be printed without UnicodeDecodeError.
# NOTE(review): this function does not exist on Python 3 — this script is
# Python 2 only as written.
reload(sys)
sys.setdefaultencoding('utf-8')
# @Author: appleyuchi
# @Date:   2018-07-19 14:59:02
# @Last Modified by:   appleyuchi
# @Last Modified time: 2018-07-20 14:59:51
import shutil
import subprocess

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
# Build a local-mode Spark context for this demo application.
app_conf = SparkConf()
app_conf.setMaster("local")
app_conf.setAppName("My App")
sc = SparkContext(conf=app_conf)
# Read the README into an RDD of lines (not referenced again in this script).
lines = sc.textFile("README.md")
def g(x):
    """Print a single RDD element to stdout.

    Used as the side-effecting function for RDD.foreach below; returns None.
    """
    # print(x) is a paren-wrapped expression on Python 2 and a function call
    # on Python 3, so this line works on both (the original `print x` is
    # Python 2 only).
    print(x)


print"-----------------Example 5-20書上程式碼有誤,誤用了scala----------------------------------------------------"
print"-----------------下面先是序列化,寫入SequenceFile-------------------"
rdd = sc.parallelize(["2,Fitness", "3,Footwear", "4,Apparel"])
ret = subprocess.call(["rm", "-r","testSeq"], shell=False)
rdd.map(lambda x: tuple(x.split(",", 1))).saveAsSequenceFile("testSeq")
ret = subprocess.call(["rm", "-r","testSeqNone"], shell=False)
rdd.map(lambda x: (None, x)).saveAsSequenceFile("testSeqNone")#這的意思是保留整個字串

print"-----------------再是反序列化,讀取SequenceFile-------------------"
Text = "org.apache.hadoop.io.Text"
print (sc.sequenceFile("./testSeq/part-00000", Text, Text).values().first())
print"------------------------------------"
result=sc.sequenceFile("./testSeqNone/part-00000", Text, Text).values()
print type(result)
print result.foreach(g)
print (sc.sequenceFile("./testSeqNone/part-00000", Text, Text).values().first())