pyspark讀寫SequenceFile
阿新 • 發佈:2018-12-21
完整程式碼如下:
# -*- coding: utf-8 -*-
# @Author: appleyuchi
# @Date: 2018-07-19 14:59:02
# @Last Modified by: appleyuchi
# @Last Modified time: 2018-07-20 14:59:51
#
# Demo script: write a small RDD to Hadoop SequenceFiles with PySpark, then
# read the files back and print the stored values.
import shutil
import subprocess
import sys

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Python 2 only: make UTF-8 the implicit str/unicode codec, as the original
# script did. Python 3 has neither a reload() builtin nor
# sys.setdefaultencoding(), so the hack is skipped there.
if sys.version_info[0] == 2:
    reload(sys)  # noqa: F821 - builtin on Python 2
    sys.setdefaultencoding('utf-8')

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
lines = sc.textFile("README.md")  # not used by the code visible below


def g(x):
    """Print one RDD element; used as the callback for ``RDD.foreach``."""
    print(x)


def _remove_dir(path):
    """Best-effort removal of the directory tree at *path*.

    A missing directory is not an error, so the script can run on a clean
    checkout as well as after a previous run left output behind.
    """
    shutil.rmtree(path, ignore_errors=True)


print("-----------------Example 5-20書上程式碼有誤,誤用了scala----------------------------------------------------")
print("-----------------下面先是序列化,寫入SequenceFile-------------------")
rdd = sc.parallelize(["2,Fitness", "3,Footwear", "4,Apparel"])

# Clear output left by a previous run before writing again.
_remove_dir("testSeq")
# Split each record at the first comma into a (key, value) pair,
# e.g. "2,Fitness" -> ("2", "Fitness").
rdd.map(lambda x: tuple(x.split(",", 1))).saveAsSequenceFile("testSeq")

_remove_dir("testSeqNone")
# Key is None, so the whole original string is kept as the value.
rdd.map(lambda x: (None, x)).saveAsSequenceFile("testSeqNone")

print("-----------------再是反序列化,讀取SequenceFile-------------------")
Text = "org.apache.hadoop.io.Text"
print(sc.sequenceFile("./testSeq/part-00000", Text, Text).values().first())
print("------------------------------------")
result = sc.sequenceFile("./testSeqNone/part-00000", Text, Text).values()
print(type(result))
# foreach() is called only for its side effect (g prints each value); its
# return value is not printed, which previously emitted a stray "None".
result.foreach(g)
print(sc.sequenceFile("./testSeqNone/part-00000", Text, Text).values().first())