1. 程式人生 > >spark streaming 根據檔案內容自定義檔名輸出,並實現檔案內容追加

spark streaming 根據檔案內容自定義檔名輸出,並實現檔案內容追加

spark streaming 從kafka拉取資料,根絕訊息內容,需要將不容的訊息放到不同的資料夾下,大致內容為 從訊息中拆分出域名,不同域名分不到不同目錄,域名下按訊息中的時間分年月日目錄,底層目錄下自定義檔案個數,實現追加
由於sparkstreaming 10秒執行一次job,需要重寫 OutputFormat,來實現按內容分目錄,檔案追加

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

val line = lines.map(x => (x, 1)).repartition
(20) line.saveAsHadoopFiles("","",classOf[Text],classOf[NullWritable],classOf[MyMultipleTextOutputFormat[Text,NullWritable]])

MyMultipleTextOutputFormat 即為我們重寫的類

package com.linkingcloud.bigdata.common;

import com.linkingcloud.bigdata.common.interpret.LineInterpret;
import org.apache.hadoop.fs.FileSystem;
import
org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.mapred.RecordWriter; import java.io.IOException; import java.util.Iterator; import java.util.Random; import
java.util.TreeMap; /** * Created by apple on 2017/2/15. */ public class MyMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> { private TextOutputFormat<K, V> theTextOutputFormat = null; public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable arg3) throws IOException { return new RecordWriter() { TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap(); public void write(Object key, Object value) throws IOException { //key中為訊息內容,value無意義 String line = key.toString(); //根據訊息內容,定義輸出路徑和輸出內容(同時清洗資料) String[] ss = LineInterpret.interpretLine(line, "/test/spark/kafka"); if (ss != null && ss.length == 2) { //name的最後兩位為jobid,同一個檔案只能同時允許一個job寫入,多個job寫一個檔案會報錯,將jobid作為檔名的一部分 //能解決此問題 String finalPath = ss[1] + "-" + name.substring(name.length() - 2); RecordWriter rw = (RecordWriter) this.recordWriters.get(finalPath); try { if (rw == null) { rw = getBaseRecordWriter(fs, job, finalPath, arg3); this.recordWriters.put(finalPath, rw); } rw.write(ss[0], null); } catch (Exception e) { //一個週期內,job不能完成,下一個job啟動,會造成同時寫一個檔案的情況,變更檔名,新增字尾 this.rewrite(finalPath + "-", ss[0]); } } } public void rewrite(String path, String line) { String finalPath = path + new Random().nextInt(10); RecordWriter rw = (RecordWriter) this.recordWriters.get(finalPath); try { if (rw == null) { rw = getBaseRecordWriter(fs, job, finalPath, arg3); this.recordWriters.put(finalPath, rw); } rw.write(line, null); } catch (Exception e) { //重試 this.rewrite(finalPath, line); } } public void close(Reporter reporter) throws IOException { Iterator keys = this.recordWriters.keySet().iterator(); while (keys.hasNext()) { RecordWriter rw = (RecordWriter) this.recordWriters.get(keys.next()); rw.close(reporter); } this.recordWriters.clear(); } }; } protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String path, Progressable arg3) throws IOException { if (this.theTextOutputFormat == null) { this.theTextOutputFormat = new MyTextOutputFormat(); } return this.theTextOutputFormat.getRecordWriter(fs, job, path, arg3); } }

MyTextOutputFormat中實現對存在的檔案進行append,而不是覆蓋

package com.linkingcloud.bigdata.common;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Created by apple on 2017/2/15.
 */

public class MyTextOutputFormat<K, V> extends TextOutputFormat<K, V> {

    public MyTextOutputFormat() {
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String path, Progressable progress) throws IOException {

        String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");

        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, job);

        Path file = FileOutputFormat.getTaskOutputPath(job, path + codec.getDefaultExtension());

        FileSystem fs = file.getFileSystem(job);

        String file_path = path + codec.getDefaultExtension();

        Path newFile = new Path(FileOutputFormat.getOutputPath(job), file_path);

        FSDataOutputStream fileOut;

        if (fs.exists(newFile)) {
            fileOut = fs.append(newFile,4096,progress);
        } else {
            fileOut = fs.create(newFile, progress);
        }
        return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);

    }
}

結果如下:
這裡寫圖片描述
spark streaming 採用gzip壓縮,會導致derect memory洩露,暫時沒有找到好的解決方法,只能不使用壓縮,誰解決了此問題,可以留言,感謝!

相關推薦

spark streaming 根據檔案內容定義檔名輸出實現檔案內容追加

spark streaming 從kafka拉取資料,根絕訊息內容,需要將不容的訊息放到不同的資料夾下,大致內容為 從訊息中拆分出域名,不同域名分不到不同目錄,域名下按訊息中的時間分年月日目錄,底層目錄下自定義檔案個數,實現追加 由於sparkstrea

springmvc在處理請求過程中出現異常資訊交由異常處理器進行處理,自定義異常處理器可以實現一個系統的異常處理邏輯。為了區別不同的異常通常根據異常型別自定義異常類這裡我們建立一個自定義系統異常如果controller、service、dao丟擲此類異常說明是系統預期處理的異常資訊。

springmvc在處理請求過程中出現異常資訊交由異常處理器進行處理,自定義異常處理器可以實現一個系統的異常處理邏輯。 1.1 異常處理思路 系統中異常包括兩類:預期異常和執行時異常RuntimeException,前者通過捕獲異常從而獲取異常資訊,後者主要通過規範程式碼開發、測試通過手段減少執

js定義方法通過隱藏iframe實現檔案下載

<script> function download() { //下載檔案的地址 var url="http://music.baidu.com/data/music/file?link=http://zhangmenshiting.baidu.com/data2/music/13618994/1

gin框架定義日誌輸出定義gin中介軟體擴充套件Logger

gin框架是款高效能的GoWeb框架,可以快速開發部署api服務。在使用過程中我們需要記錄各種各樣的日誌,下面介紹下我們怎麼自定義日誌記錄格式或擴充套件日誌。 gin簡單剖析 api服務建立 package main import "gith

Android 定義Dialog類在Activity中實現按鈕監聽。

實際開發中,經常會用到Dialog,比如退出時候會彈出是否退出,或者還有一些編輯框也會用Dialog實現,效果圖如下: 開發中遇到的問題無非在於如果在Activity中監聽這個Dialog中實現的按鈕,Dialog類如下,在MyDialog這個類中實現了一個LeaveMyDialogLi

【enum】如何在列舉中定義定義的方法進行使用

1、定一個列舉類 packagecom.eud.t1; publicenumColor { //定義列舉中的常量 RED(1,"hongse"), GREEN(2,"qingse"),BLACK

如何定義一個View動態設定style?

定義一個TextView的style,常用的方法是在xml檔案中實現,但有時候我們需要動態的在程式碼中自定義個style的View,進行動態的新增或刪除等控制,這時候需要用到inflater方法。 TextView的xml,R.layout.empty_tex

菜鳥學習shiro之實現定義的Realm從而實現登入驗證身份驗證和許可權驗證4

講了那麼多使用的內建的類從而實現四郎,現在講自定義的境界 首先行家的依賴依然是第一篇的那個依賴 下邊是自定義的境界: import org.apache.shiro.authc.AuthenticationException; import org.apache.shi

Mac ssh連線遠端伺服器實現檔案的上傳和下載

使用scp命令實現上傳下載 1、從伺服器上下載檔案 scp [email protected]:/path/filename /Users/mac/Desktop(本地目錄) 例如:scp [email protected]:/root/test.txt /Users/mac/Desk

MAC實用SSH連線遠端伺服器實現檔案上傳下載

MAC作為程式設計師的神器,在程式設計上的使用遠超window的電腦,而MAC本身提供了SSH功能,配合VIM編輯器對程式設計十分有幫助。 使用ssh連線遠端主機   sudo -i 切換root許可權 ssh [email protected] 其中,

檔案操作檔案操作時的許可權設定快速實現檔案拷貝C語言常用IO函式

定義一個檔案型別指標。用於儲存開啟檔案後的指標。 C語言中可以用 gets(char *str) 來輸入一行(包括空格)。 FILE *p = fopen("檔名",“開啟的許可權”); 其中許可權可以是w ,此時如果檔案不存在會自動建立,如果存在就會自動覆蓋(內容); 許

idea中定義設定xml的標頭檔案內容

  因為在idea中新建的xml預設的標頭檔案,有時候並不是我們需要的這時候可以通過自定義來解決. 如搭建hibernate的實體類的xml. 首先    fiel→settings出現 如下框框 在上面搜尋 temp     點選

mybatis generator 定義 xml 檔名稱和內容定義dao名稱定義

最近在用mybatis generator 生成程式碼的時候,生成的xml檔案 和類檔案 不是自己想要的,於是修改mybatis generator 的原始碼,重寫方法來達到效果,這裡記錄一下,後期如果需要還可以隨便改成自己想要的! 一 修改註釋     &nb

如何使用js將canvas儲存為圖片檔案並且可以定義檔名

1、從canvas中直接提取圖片元資料 // 圖片匯出為 png 格式 var type = 'png'; var imgData = canvas.toDataURL(type); 上面的程式碼得到的資料格式為:data:image/p

Java File類學習筆記4:定義一個類過濾指定副檔名檔案

一、FilenameFilter介紹 java.io.File類提供了四個方法用於列舉某個路徑下的檔案和目錄,但不會遞迴列舉子目錄下的內容 其中兩個是列舉路徑下的所有檔案和目錄。 (1)String

研究MapReduce原始碼之實現定義LineRecordReader完成多行讀取檔案內容

TextInputFormat是Hadoop預設的資料輸入格式,但是它只能一行一行的讀記錄,如果要讀取多行怎麼辦? 很簡單 自己寫一個輸入格式,然後寫一個對應的Recordreader就可以了,但是要實現確不是這麼簡單的 首先看看TextInputForma

001_定義過濾及添加文件內容腳本(nginx)

parser sys idt fin rip readlines pat utf default 一、工作中遇到sed添加及修改在nginx末尾添加太麻煩了 需求:隨意查找添加一條以前不存在的內容加到"}"前一行 實現: #!/usr/bin/env python #

C#-XML文件提取字符串+字符串存為XML文件+創建XML(定義節點)文件+讀取節點內容

cts ted set clas desc format view nav child 一、將字符串寫入xml文件(並保存) 寫入: XmlDocument xdoc = new XmlDocument(); xdoc.LoadXml(“xmlstring”); 保存

jsp定義標簽獲取標簽體內容輸出到頁面展示《三》

ride write bubuko his color back lan invoke mage 1.BodyTag.java package com.zy.tag; import java.io.IOException; import javax.servlet.jsp