在Hadoop中處理輸入的CSV檔案
No Reply , Posted in Hadoop on December 2, 2012
在Hadoop中,InputFormat類用來生成可供Mapper處理的<key, value>鍵值對。當資料傳送給Mapper時,Mapper會將輸入分片傳送到InputFormat上,InputFormat呼叫getRecordReader()方法生成RecordReader,RecordReader再建立可供map函式處理的鍵值對<K1, V1>。
Hadoop預定義了多種方法將不同型別的輸入資料轉化為map能夠處理的鍵值對。比如,TextInputFormat,Hadoop中預設的輸入方法,會將每行資料生成一條記錄,其中key值為每條記錄在分片中的位元組偏移量,value則為每行的內容。
在Hadoop預定義的InputFormat中,並沒有處理CSV檔案的方法。CSV檔案的本質其實是用逗號分隔開的文字檔案。一種很直觀的處理方法是:將CSV檔案作為文字檔案處理,使用TextInputFormat將檔案按行傳入map函式,在map函式中再按照CSV檔案的格式進行處理。但這樣很容易將資料格式的處理邏輯與業務處理邏輯混淆在一起,並且出現很多copy-and-pasted的程式碼。
實際上,可以寫一個自己的InputFormat以及RecordReader類,專門用來處理CSV檔案的輸入,直接傳遞給map函式解析後的資料。
1 資料結構
我們傳遞給map函式一個ArrayWritable(A Writable for arrays containing instances of a class),元素型別為Text,即CSV檔案每一行各個欄位的資料。資料結構如下:
程式碼1:TextArrayWritable.java
- publicclassTextArrayWritableextendsArrayWritable{
- publicTextArrayWritable(){
- super(Text.class);
- }
- publicTextArrayWritable(Text[] strings){
- super(Text.class, strings);
- }
- }
2 CSVInputFormat
FileInputFormat是所有使用檔案作為其資料來源的InputFormat實現的基類。它提供了兩個功能:一是定義哪些檔案包含在一個作業的輸入中,另一個是為輸入檔案生成分片(Input Splits)。而把分片分割成記錄的事情交由其子類來完成。所以CSVInputFormat類的實現上,同樣是繼承InputFormat類,並只需要簡單的重寫createRecordReader和isSplitable即可。
程式碼2:CSVInputFormat.java
- publicclassCSVInputFormat
- extendsFileInputFormat<LongWritable,TextArrayWritable>{
- publicstaticfinalString CSV_TOKEN_SEPARATOR_CONFIG
- ="csvinputformat.token.delimiter";
- @Override
- protectedboolean isSplitable(JobContext context,Path filename){
- CompressionCodec codec =
- newCompressionCodecFactory(context.getConfiguration())
- .getCodec(filename);
- return codec ==null;
- }
- @Override
- publicRecordReader<LongWritable,TextArrayWritable> createRecordReader(
- InputSplit split,TaskAttemptContext context)
- throwsIOException,InterruptedException{
- String csvDelimiter = context.getConfiguration()
- .get(CSV_TOKEN_SEPARATOR_CONFIG);
- Character separator =null;
- if(csvDelimiter !=null&& csvDelimiter.length()==1){
- separator = csvDelimiter.charAt(0);
- }
- returnnewCSVRecordReader(separator);
- }
- }
其中csvinputformat.token.delimiter是可在配置檔案中配置的CSV輸入檔案分隔符,createRecordReader完成的工作只是從配置檔案中得到分隔符,呼叫真正對CSV檔案分片進行處理,並生成鍵值對的CSVRecordReader函式,並返回RecordReader物件。
3 CSVRecordReader
對於CSVRecordReader,要實現的功能無非就是將CSV檔案中每一行的各欄位提取出來,並將各欄位作為TextArrayWritable型別的資料結構傳遞給map函式。
在Hadoop中有一個LineRecordReader類,它將文字檔案每一行的內容作為值返回,型別為Text。所以可以直接在CSVRecordReader中使用LineRecordReader,將LineRecordReader返回的每一行再次進行處理。在CSV檔案的處理上,這裡用到了OpenCSV對CSV檔案的每一行進行解析,具體可參見這裡。
下面是CSVRecordReader的實現程式碼。除了CSV檔案的解析、nextKeyValue()方法和getCurrentValue()方法外,大部分方法都直接呼叫LineRecordReader例項的相應方法。畢竟我們是踩在巨人的肩膀上繼續前進嘛。O(∩_∩)O~
程式碼3:CSVRecordReader.java
- publicclassCSVRecordReader
- extendsRecordReader<LongWritable,TextArrayWritable>{
- privateLineRecordReader lineReader;
- privateTextArrayWritable value;
- privateCSVParser parser;
- // 新建CSVParser例項,用來解析每一行CSV檔案的每一行
- publicCSVRecordReader(Character delimiter){
- this.lineReader =newLineRecordReader();
- if(delimiter ==null){
- this.parser =newCSVParser();
- }
- else{
- this.parser =newCSVParser(delimiter);
- }
- }
- // 呼叫LineRecordReader的初始化方法,尋找分片的開始位置
- @Override
- publicvoid initialize(InputSplit split,TaskAttemptContext context)
- throwsIOException,InterruptedException{
- lineReader.initialize(split, context);
- }
- // 使用LineRecordReader來得到下一條記錄(即下一行)。
- // 如果到了分片(Input Split)的尾部,nextKeyValue將返回NULL
- @Override
- publicboolean nextKeyValue()
- throwsIOException,InterruptedException{
- if(lineReader.nextKeyValue()){
- //如果有新記錄,則進行處理
- loadCSV();
- returntrue;
- }
- else{
- value =null;
- returnfalse;
- }
- }
- @Override
- publicLongWritable getCurrentKey()throwsIOException,
- InterruptedException{
- return lineReader.getCurrentKey();
- }
- @Override
- publicTextArrayWritable getCurrentValue()throwsIOException,
- InterruptedException{
- return value;
- }
- @Override
- publicfloat getProgress()throwsIOException,InterruptedException{
- return lineReader.getProgress();
- }
- @Override
- publicvoid close()throwsIOException{
- lineReader.close();
- }
- // 對CSV檔案的每一行進行處理
- privatevoid loadCSV()throwsIOException{
- String line = lineReader.getCurrentValue().toString();
- // 通過OpenCSV將解析每一行的各欄位
- String[] tokens = parser.parseLine(line);
- value =newTextArrayWritable(convert(tokens));
- }
- // 將字串陣列批量處理為Text陣列
- privateText[] convert(String[] tokens){
- Text[] t =newText[tokens.length];
- for(int i =0; i < t.length; i++){
- t[i]=newText(tokens[i]);
- }
- return t;
- }
- }
4 簡單的應用
用於處理CSV檔案輸入的InputFormat已經寫完了,現在構造一個簡單的應用場景,來試驗下這個CSVInputFormat。
假設有這樣一些資料,每一列第一個欄位為一個標識,後面為隨機產生的數字,標識各不相同,求每一行標識後的數字之和並輸出,輸出格式為:每一行為標識和數字和。
由於標識沒有重複,並且邏輯比較簡單,這裡只寫一個Mapper即可,不需要Reducer。
程式碼4:CSVMapper.java
- publicclassCSVMapper
- extendsMapper<LongWritable,TextArrayWritable,Text,IntWritable>{
- @Override
- protectedvoid map(LongWritable key,TextArrayWritable value,Context context)
- throwsIOException,InterruptedException{
- String[] values = value.toStrings();
- int sum =0;
- Text resultKey =newText(values[0]);
- for(int i =1; i < values.length; i++){
- sum = sum +Integer.valueOf(values[i].trim());
- }
- IntWritable resultValue =newIntWritable(sum);
- context.write(resultKey, resultValue);
- }
- }
在作業的提交部分,由於沒有Reducer,所以將ReduceTask設定為了0
程式碼5:JustRun.java
- publicclassJustRunextendsConfiguredimplementsTool{
- @Override
- publicint run(String[] args)throwsException{
- Configuration conf =newConfiguration();
- Job job =newJob(conf);
- job.setJobName("CSVTest");
- job.setJarByClass(JustRun.class);
- job.setMapperClass(CSVMapper.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- job.setInputFormatClass(CSVInputFormat.class);
- job.setNumReduceTasks(0);
- FileInputFormat.setInputPaths(job,newPath(args[0]));
- FileOutputFormat.setOutputPath(job,newPath(args[1]));
- return job.waitForCompletion(true)?0:1;
- }
- publicstaticvoid main(String[] args)throwsException{
- int ret =ToolRunner.run(newJustRun(), args);
- System.exit(ret);
- }
- }
執行完畢後,輸出如下,跟預想是一致的。
好了,這就是利用InputFormat對CSV檔案的處理過程。除了CSV檔案,還可根據處理資料的型別,寫出更多的InputFormat。同時,我們還可以利用OutputFormat輸出需要的格式。
轉自
http://bukp.me/hadoop/work-with-csv-input-file-in-hadoop.html
相關推薦
在Hadoop中處理輸入的CSV檔案
No Reply , Posted in Hadoop on December 2, 2012 在Hadoop中,InputFormat類用來生成可供Mapper處理的<key, value>鍵值對。當資料傳送給Mapper時,Mapper會將輸入分片傳送到
資料處理:用pandas處理大型csv檔案
在訓練機器學習模型的過程中,源資料常常不符合我們的要求。大量繁雜的資料,需要按照我們的需求進行過濾。拿到我們想要的資料格式,並建立能夠反映資料間邏輯結構的資料表達形式。 最近就拿到一個小任務,需要處理70多萬條資料。 我們在處理csv檔案時,經常使用pandas,可以幫助處理較大的
[爬蟲技巧] Scrapy中定製寫入CSV檔案的Pipeline
前言: 在使用Scrapy寫專案時,難免有時會需要將資料寫入csv檔案中,自帶的FEED寫法如下: settings.py
Python中資料的CSV檔案存取
Python中資料的CSV檔案存取 csv(逗號分隔值檔案格式)用來儲存批量資料 一.資料儲存 np.savetxt(frame, array, fmt, delimiter=None) • frame : 檔案、字串或產生器,可以是.gz或.bz2的壓縮檔案
Python中pandas讀取*.csv檔案出現編碼問題
1、問題 在使用Python中pandas讀取csv檔案時,由於檔案編碼格式出現以下問題: Traceback (most recent call last): File "pandas\_libs\parsers.pyx", line 1134, in pandas
python從SQLAlchemy中匯出匯入CSV檔案
讀寫CSV資料 讀csv檔案 將資料讀取為一個元祖的序列 import csv with open('stocks.csv') as f: f_csv = csv.reader(f) headers = next(f_csv) for row in f_c
python處理將csv檔案1內容修改後寫入到csv2檔案
我這裡的方法或許不是很好,主要做法是使用列表和字典進行轉換,感覺很繁瑣,但是也是一種方法。 如果有大神有更好的方法,請留言。 # -*- coding:utf-8 -*- #author:zgd import pandas import csv # with open("url_fea
java中如何設定.csv檔案中時間的格式
csv是逗號分割值得縮寫,不可以定義單元格格式,所以匯出來的檔案,如下圖,時間格式不完整,想了半天, 給時間前面拼接一個製表符程式碼如下圖 匯出來的時間就是yyyy-mm-dd hh:mm:ss的完整格式,
使用python處理中文csv檔案,並讓excel正確顯示中文(避免亂碼)
使用python2處理中文時,經常會出現亂碼問題,之前使用python處理中文csv檔案時,處理過程程式輸出出現了亂碼,最後在程式中能夠正常顯示出中文,但是匯出檔案時,使用excel打開出現了中文亂碼問
遠端從Mongodb 資料庫中匯出為CSV檔案
Java 連線Mongodb 資料庫並且從資料庫匯出資料為CSV檔案。 需要匯入三個包: bson-3.4.3.jar mongo-java-driver-3.4.3.jar mongodb-driver-core-3.4.3.jar 當然版本可以改變。可以從http:/
python處理某些資料夾下的csv檔案,提取指定行到新檔案中
需求: 提取2017-06--2017-08三個月的資料中的某些行到一個新的檔案中。 思路: 1.迴圈讀取每個資料夾下的每一個檔案 2.解析檔案,把滿足條件的資料寫到list中 3.把list寫入新檔案。 程式碼如下:【環境是Windows7專業版+Python2.7.9】
利用numpy和pandas處理csv檔案中的時間
環境:numpy,pandas,python3 在機器學習和深度學習的過程中,對於處理預測,迴歸問題,有時候變數是時間,需要進行合適的轉換處理後才能進行學習分析,關於時間的變數如下所示,利用pandas和numpy對csv檔案中時間進行處理。 date
如何在Hadoop的MapReduce程式中處理JSON檔案
簡介: 最近在寫MapReduce程式處理日誌時,需要解析JSON配置檔案,簡化Java程式和處理邏輯。但是Hadoop本身似乎沒有內建對JSON檔案的解析功能,我們不得不求助於第三方JSON工具包。這裡選擇json-simple實現我們的功能。 在Hadoop上執行Jav
hadoop處理不同輸入目錄檔案
在寫mapred任務的時候免不了要處理join。 在join中最簡單的就是一對一的join。 下面通過一個小例子介紹如果在mapred中實現一對一的join。 name.txt 100 tom 101 mary 102 kate score.txt 100 90 10
命令列使用SVN以及指令獲取SVN中的差異列表檔案和其中的錯誤處理
當我們需要獲取SVN管理檔案中的差異列表的檔案,用來進行其他任務的時候: 我們需要使用的指令: svn diff -r COMMITTED:HEAD --summarize [需要獲取的庫的路徑] > [生成的差異列表檔案
python中如何把log輸入到檔案
import logging,os import time def getlogger(loggername='mylogger'): # 使用一個名字為mylogger的logger logger = logging.getLogger(loggername) # 設定lo
爬蟲資料儲存為csv檔案時,表格中間隔有空行問題
問題描述:將爬取的資料儲存的csv檔案,遇到幾個問題,原始碼如下: with open('F:\\Pythontest1\\douban.csv','w') as f: writer = csv.writer(f,dialect='excel') writer.writero
檔案輸入輸出及csv檔案
讀取檔案 fileobject=open('b.txt') result=fileobject.read() print(result) fileobject.close() 測試結果如下圖所示: 寫入檔案: fileobject=open('c.txt',mode='w')
shell中處理使用者輸入
1、使用命令列引數 在shell執行的時候命令列中輸入的所有引數可以賦值給一些特殊變數,這些變數成為位置變數引數。 包括: $0返回指令碼名稱、$1為第一個引數、$2為第二個引數 ...$9第九個引數 在變數到9個之後,必須使用大括號將變數括起來 ${10}第十個引數 $#是獲取傳入的引數數量 $*是獲取所
python 處理CSV檔案
讀取 import csv with open('test.csv','r',encoding='utf8') as csvfile: reader = csv.reader(csvfile) for i in reader: print(i) 寫