1. 程式人生 > >在Hadoop中處理輸入的CSV檔案

在Hadoop中處理輸入的CSV檔案

No Reply , Posted in Hadoop on December 2, 2012

在Hadoop中,InputFormat類用來生成可供Mapper處理的<key, value>鍵值對。當資料傳送給Mapper時,Mapper會將輸入分片傳送到InputFormat上,InputFormat呼叫getRecordReader()方法生成RecordReader,RecordReader再建立可供map函式處理的鍵值對<K1, V1>。

Hadoop預定義了多種方法將不同型別的輸入資料轉化為map能夠處理的鍵值對。比如,TextInputFormat,Hadoop中預設的輸入方法,會將每行資料生成一條記錄,其中key值為每條記錄在分片中的位元組偏移量,value則為每行的內容。

在Hadoop預定義的InputFormat中,並沒有處理CSV檔案的方法。CSV檔案的本質其實是用逗號分隔開的文字檔案。一種很直觀的處理方法是:將CSV檔案作為文字檔案處理,使用TextInputFormat將檔案按行傳入map函式,在map函式中再按照CSV檔案的格式進行處理。但這樣很容易將資料格式的處理邏輯與業務處理邏輯混淆在一起,並且出現很多copy-and-pasted的程式碼。

實際上,可以寫一個自己的InputFormat以及RecordReader類,專門用來處理CSV檔案的輸入,直接傳遞給map函式解析後的資料。 


1 資料結構


我們傳遞給map函式一個ArrayWritable(A Writable for arrays containing instances of a class),元素型別為Text,即CSV檔案每一行各個欄位的資料。資料結構如下:

程式碼1:TextArrayWritable.java


  1. publicclassTextArrayWritableextendsArrayWritable{
  2. publicTextArrayWritable(){
  3. super(Text.class);
  4. }
  5. publicTextArrayWritable(Text[] strings){
  6. super(Text.class, strings);
  7. }
  8. }

2 CSVInputFormat


FileInputFormat是所有使用檔案作為其資料來源的InputFormat實現的基類。它提供了兩個功能:一是定義哪些檔案包含在一個作業的輸入中,另一個是為輸入檔案生成分片(Input Splits)。而把分片分割成記錄的事情交由其子類來完成。所以CSVInputFormat類的實現上,同樣是繼承InputFormat類,並只需要簡單的重寫createRecordReader和isSplitable即可。

程式碼2:CSVInputFormat.java


  1. publicclassCSVInputFormat
  2. extendsFileInputFormat<LongWritable,TextArrayWritable>{
  3. publicstaticfinalString CSV_TOKEN_SEPARATOR_CONFIG
  4. ="csvinputformat.token.delimiter";
  5. @Override
  6. protectedboolean isSplitable(JobContext context,Path filename){
  7. CompressionCodec codec =
  8. newCompressionCodecFactory(context.getConfiguration())
  9. .getCodec(filename);
  10. return codec ==null;
  11. }
  12. @Override
  13. publicRecordReader<LongWritable,TextArrayWritable> createRecordReader(
  14. InputSplit split,TaskAttemptContext context)
  15. throwsIOException,InterruptedException{
  16. String csvDelimiter = context.getConfiguration()
  17. .get(CSV_TOKEN_SEPARATOR_CONFIG);
  18. Character separator =null;
  19. if(csvDelimiter !=null&& csvDelimiter.length()==1){
  20. separator = csvDelimiter.charAt(0);
  21. }
  22. returnnewCSVRecordReader(separator);
  23. }
  24. }

其中csvinputformat.token.delimiter是可在配置檔案中配置的CSV輸入檔案分隔符,createRecordReader完成的工作只是從配置檔案中得到分隔符,呼叫真正對CSV檔案分片進行處理,並生成鍵值對的CSVRecordReader函式,並返回RecordReader物件。


3 CSVRecordReader


對於CSVRecordReader,要實現的功能無非就是將CSV檔案中每一行的各欄位提取出來,並將各欄位作為TextArrayWritable型別的資料結構傳遞給map函式。

在Hadoop中有一個LineRecordReader類,它將文字檔案每一行的內容作為值返回,型別為Text。所以可以直接在CSVRecordReader中使用LineRecordReader,將LineRecordReader返回的每一行再次進行處理。在CSV檔案的處理上,這裡用到了OpenCSV對CSV檔案的每一行進行解析,具體可參見這裡。

下面是CSVRecordReader的實現程式碼。除了CSV檔案的解析、nextKeyValue()方法和getCurrentValue()方法外,大部分方法都直接呼叫LineRecordReader例項的相應方法。畢竟我們是踩在巨人的肩膀上繼續前進嘛。O(∩_∩)O~

程式碼3:CSVRecordReader.java


  1. publicclassCSVRecordReader
  2. extendsRecordReader<LongWritable,TextArrayWritable>{
  3. privateLineRecordReader lineReader;
  4. privateTextArrayWritable value;
  5. privateCSVParser parser;
  6. // 新建CSVParser例項,用來解析每一行CSV檔案的每一行
  7. publicCSVRecordReader(Character delimiter){
  8. this.lineReader =newLineRecordReader();
  9. if(delimiter ==null){
  10. this.parser =newCSVParser();
  11. }
  12. else{
  13. this.parser =newCSVParser(delimiter);
  14. }
  15. }
  16. // 呼叫LineRecordReader的初始化方法,尋找分片的開始位置
  17. @Override
  18. publicvoid initialize(InputSplit split,TaskAttemptContext context)
  19. throwsIOException,InterruptedException{
  20. lineReader.initialize(split, context);
  21. }
  22. // 使用LineRecordReader來得到下一條記錄(即下一行)。
  23. // 如果到了分片(Input Split)的尾部,nextKeyValue將返回NULL
  24. @Override
  25. publicboolean nextKeyValue()
  26. throwsIOException,InterruptedException{
  27. if(lineReader.nextKeyValue()){
  28. //如果有新記錄,則進行處理
  29. loadCSV();
  30. returntrue;
  31. }
  32. else{
  33. value =null;
  34. returnfalse;
  35. }
  36. }
  37. @Override
  38. publicLongWritable getCurrentKey()throwsIOException,
  39. InterruptedException{
  40. return lineReader.getCurrentKey();
  41. }
  42. @Override
  43. publicTextArrayWritable getCurrentValue()throwsIOException,
  44. InterruptedException{
  45. return value;
  46. }
  47. @Override
  48. publicfloat getProgress()throwsIOException,InterruptedException{
  49. return lineReader.getProgress();
  50. }
  51. @Override
  52. publicvoid close()throwsIOException{
  53. lineReader.close();
  54. }
  55. // 對CSV檔案的每一行進行處理
  56. privatevoid loadCSV()throwsIOException{
  57. String line = lineReader.getCurrentValue().toString();
  58. // 通過OpenCSV將解析每一行的各欄位
  59. String[] tokens = parser.parseLine(line);
  60. value =newTextArrayWritable(convert(tokens));
  61. }
  62. // 將字串陣列批量處理為Text陣列
  63. privateText[] convert(String[] tokens){
  64. Text[] t =newText[tokens.length];
  65. for(int i =0; i < t.length; i++){
  66. t[i]=newText(tokens[i]);
  67. }
  68. return t;
  69. }
  70. }

4 簡單的應用


用於處理CSV檔案輸入的InputFormat已經寫完了,現在構造一個簡單的應用場景,來試驗下這個CSVInputFormat。

假設有這樣一些資料,每一列第一個欄位為一個標識,後面為隨機產生的數字,標識各不相同,求每一行標識後的數字之和並輸出,輸出格式為:每一行為標識和數字和。

由於標識沒有重複,並且邏輯比較簡單,這裡只寫一個Mapper即可,不需要Reducer。

程式碼4:CSVMapper.java


  1. publicclassCSVMapper
  2. extendsMapper<LongWritable,TextArrayWritable,Text,IntWritable>{
  3. @Override
  4. protectedvoid map(LongWritable key,TextArrayWritable value,Context context)
  5. throwsIOException,InterruptedException{
  6. String[] values = value.toStrings();
  7. int sum =0;
  8. Text resultKey =newText(values[0]);
  9. for(int i =1; i < values.length; i++){
  10. sum = sum +Integer.valueOf(values[i].trim());
  11. }
  12. IntWritable resultValue =newIntWritable(sum);
  13. context.write(resultKey, resultValue);
  14. }
  15. }

在作業的提交部分,由於沒有Reducer,所以將ReduceTask設定為了0

程式碼5:JustRun.java


  1. publicclassJustRunextendsConfiguredimplementsTool{
  2. @Override
  3. publicint run(String[] args)throwsException{
  4. Configuration conf =newConfiguration();
  5. Job job =newJob(conf);
  6. job.setJobName("CSVTest");
  7. job.setJarByClass(JustRun.class);
  8. job.setMapperClass(CSVMapper.class);
  9. job.setOutputKeyClass(Text.class);
  10. job.setOutputValueClass(IntWritable.class);
  11. job.setInputFormatClass(CSVInputFormat.class);
  12. job.setNumReduceTasks(0);
  13. FileInputFormat.setInputPaths(job,newPath(args[0]));
  14. FileOutputFormat.setOutputPath(job,newPath(args[1]));
  15. return job.waitForCompletion(true)?0:1;
  16. }
  17. publicstaticvoid main(String[] args)throwsException{
  18. int ret =ToolRunner.run(newJustRun(), args);
  19. System.exit(ret);
  20. }
  21. }

執行完畢後,輸出如下,跟預想是一致的。

好了,這就是利用InputFormat對CSV檔案的處理過程。除了CSV檔案,還可根據處理資料的型別,寫出更多的InputFormat。同時,我們還可以利用OutputFormat輸出需要的格式。

轉自

http://bukp.me/hadoop/work-with-csv-input-file-in-hadoop.html

相關推薦

Hadoop處理輸入CSV檔案

No Reply , Posted in Hadoop on December 2, 2012 在Hadoop中,InputFormat類用來生成可供Mapper處理的<key, value>鍵值對。當資料傳送給Mapper時,Mapper會將輸入分片傳送到

資料處理:用pandas處理大型csv檔案

在訓練機器學習模型的過程中,源資料常常不符合我們的要求。大量繁雜的資料,需要按照我們的需求進行過濾。拿到我們想要的資料格式,並建立能夠反映資料間邏輯結構的資料表達形式。 最近就拿到一個小任務,需要處理70多萬條資料。 我們在處理csv檔案時,經常使用pandas,可以幫助處理較大的

[爬蟲技巧] Scrapy定製寫入CSV檔案的Pipeline

前言:         在使用Scrapy寫專案時,難免有時會需要將資料寫入csv檔案中,自帶的FEED寫法如下:         settings.py 

Python資料的CSV檔案存取

Python中資料的CSV檔案存取 csv(逗號分隔值檔案格式)用來儲存批量資料 一.資料儲存 np.savetxt(frame, array, fmt, delimiter=None) • frame : 檔案、字串或產生器,可以是.gz或.bz2的壓縮檔案

Pythonpandas讀取*.csv檔案出現編碼問題

1、問題 在使用Python中pandas讀取csv檔案時,由於檔案編碼格式出現以下問題: Traceback (most recent call last): File "pandas\_libs\parsers.pyx", line 1134, in pandas

python從SQLAlchemy匯出匯入CSV檔案

讀寫CSV資料 讀csv檔案 將資料讀取為一個元祖的序列 import csv with open('stocks.csv') as f: f_csv = csv.reader(f) headers = next(f_csv) for row in f_c

python處理csv檔案1內容修改後寫入到csv2檔案

我這裡的方法或許不是很好,主要做法是使用列表和字典進行轉換,感覺很繁瑣,但是也是一種方法。 如果有大神有更好的方法,請留言。 # -*- coding:utf-8 -*- #author:zgd import pandas import csv # with open("url_fea

java如何設定.csv檔案時間的格式

csv是逗號分割值得縮寫,不可以定義單元格格式,所以匯出來的檔案,如下圖,時間格式不完整,想了半天, 給時間前面拼接一個製表符程式碼如下圖 匯出來的時間就是yyyy-mm-dd hh:mm:ss的完整格式,

使用python處理中文csv檔案,並讓excel正確顯示中文(避免亂碼)

使用python2處理中文時,經常會出現亂碼問題,之前使用python處理中文csv檔案時,處理過程程式輸出出現了亂碼,最後在程式中能夠正常顯示出中文,但是匯出檔案時,使用excel打開出現了中文亂碼問

遠端從Mongodb 資料庫匯出為CSV檔案

Java 連線Mongodb 資料庫並且從資料庫匯出資料為CSV檔案。 需要匯入三個包: bson-3.4.3.jar mongo-java-driver-3.4.3.jar mongodb-driver-core-3.4.3.jar 當然版本可以改變。可以從http:/

python處理某些資料夾下的csv檔案,提取指定行到新檔案

需求: 提取2017-06--2017-08三個月的資料中的某些行到一個新的檔案中。 思路: 1.迴圈讀取每個資料夾下的每一個檔案 2.解析檔案,把滿足條件的資料寫到list中 3.把list寫入新檔案。 程式碼如下:【環境是Windows7專業版+Python2.7.9】

利用numpy和pandas處理csv檔案的時間

環境:numpy,pandas,python3 在機器學習和深度學習的過程中,對於處理預測,迴歸問題,有時候變數是時間,需要進行合適的轉換處理後才能進行學習分析,關於時間的變數如下所示,利用pandas和numpy對csv檔案中時間進行處理。 date

如何在Hadoop的MapReduce程式處理JSON檔案

簡介: 最近在寫MapReduce程式處理日誌時,需要解析JSON配置檔案,簡化Java程式和處理邏輯。但是Hadoop本身似乎沒有內建對JSON檔案的解析功能,我們不得不求助於第三方JSON工具包。這裡選擇json-simple實現我們的功能。 在Hadoop上執行Jav

hadoop處理不同輸入目錄檔案

在寫mapred任務的時候免不了要處理join。 在join中最簡單的就是一對一的join。 下面通過一個小例子介紹如果在mapred中實現一對一的join。 name.txt 100 tom 101 mary 102 kate score.txt 100 90 10

命令列使用SVN以及指令獲取SVN的差異列表檔案和其中的錯誤處理

當我們需要獲取SVN管理檔案中的差異列表的檔案,用來進行其他任務的時候: 我們需要使用的指令: svn diff -r COMMITTED:HEAD --summarize [需要獲取的庫的路徑] > [生成的差異列表檔案

python如何把log輸入檔案

import logging,os import time def getlogger(loggername='mylogger'): # 使用一個名字為mylogger的logger logger = logging.getLogger(loggername) # 設定lo

爬蟲資料儲存為csv檔案時,表格間隔有空行問題

問題描述:將爬取的資料儲存的csv檔案,遇到幾個問題,原始碼如下: with open('F:\\Pythontest1\\douban.csv','w') as f: writer = csv.writer(f,dialect='excel') writer.writero

檔案輸入輸出及csv檔案

讀取檔案 fileobject=open('b.txt') result=fileobject.read() print(result) fileobject.close() 測試結果如下圖所示: 寫入檔案: fileobject=open('c.txt',mode='w')

shell處理使用者輸入

1、使用命令列引數 在shell執行的時候命令列中輸入的所有引數可以賦值給一些特殊變數,這些變數成為位置變數引數。 包括: $0返回指令碼名稱、$1為第一個引數、$2為第二個引數 ...$9第九個引數 在變數到9個之後,必須使用大括號將變數括起來 ${10}第十個引數 $#是獲取傳入的引數數量 $*是獲取所

python 處理CSV檔案

讀取 import csv with open('test.csv','r',encoding='utf8') as csvfile: reader = csv.reader(csvfile) for i in reader: print(i) 寫