spark streaming 根據檔案內容自定義檔名輸出,並實現檔案內容追加
spark streaming 從kafka拉取資料,根絕訊息內容,需要將不容的訊息放到不同的資料夾下,大致內容為 從訊息中拆分出域名,不同域名分不到不同目錄,域名下按訊息中的時間分年月日目錄,底層目錄下自定義檔案個數,實現追加
由於sparkstreaming 10秒執行一次job,需要重寫 OutputFormat,來實現按內容分目錄,檔案追加
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val line = lines.map(x => (x, 1)).repartition (20)
line.saveAsHadoopFiles("","",classOf[Text],classOf[NullWritable],classOf[MyMultipleTextOutputFormat[Text,NullWritable]])
MyMultipleTextOutputFormat 即為我們重寫的類
package com.linkingcloud.bigdata.common;
import com.linkingcloud.bigdata.common.interpret.LineInterpret;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.mapred.RecordWriter;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import java.util.TreeMap;
/**
* Created by apple on 2017/2/15.
*/
public class MyMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
private TextOutputFormat<K, V> theTextOutputFormat = null;
public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable arg3) throws IOException {
return new RecordWriter() {
TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();
public void write(Object key, Object value) throws IOException {
//key中為訊息內容,value無意義
String line = key.toString();
//根據訊息內容,定義輸出路徑和輸出內容(同時清洗資料)
String[] ss = LineInterpret.interpretLine(line, "/test/spark/kafka");
if (ss != null && ss.length == 2) {
//name的最後兩位為jobid,同一個檔案只能同時允許一個job寫入,多個job寫一個檔案會報錯,將jobid作為檔名的一部分
//能解決此問題
String finalPath = ss[1] + "-" + name.substring(name.length() - 2);
RecordWriter rw = (RecordWriter) this.recordWriters.get(finalPath);
try {
if (rw == null) {
rw = getBaseRecordWriter(fs, job, finalPath, arg3);
this.recordWriters.put(finalPath, rw);
}
rw.write(ss[0], null);
} catch (Exception e) {
//一個週期內,job不能完成,下一個job啟動,會造成同時寫一個檔案的情況,變更檔名,新增字尾
this.rewrite(finalPath + "-", ss[0]);
}
}
}
public void rewrite(String path, String line) {
String finalPath = path + new Random().nextInt(10);
RecordWriter rw = (RecordWriter) this.recordWriters.get(finalPath);
try {
if (rw == null) {
rw = getBaseRecordWriter(fs, job, finalPath, arg3);
this.recordWriters.put(finalPath, rw);
}
rw.write(line, null);
} catch (Exception e) {
//重試
this.rewrite(finalPath, line);
}
}
public void close(Reporter reporter) throws IOException {
Iterator keys = this.recordWriters.keySet().iterator();
while (keys.hasNext()) {
RecordWriter rw = (RecordWriter) this.recordWriters.get(keys.next());
rw.close(reporter);
}
this.recordWriters.clear();
}
};
}
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String path, Progressable arg3) throws IOException {
if (this.theTextOutputFormat == null) {
this.theTextOutputFormat = new MyTextOutputFormat();
}
return this.theTextOutputFormat.getRecordWriter(fs, job, path, arg3);
}
}
MyTextOutputFormat中實現對存在的檔案進行append,而不是覆蓋
package com.linkingcloud.bigdata.common;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.DataOutputStream;
import java.io.IOException;
/**
* Created by apple on 2017/2/15.
*/
public class MyTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
public MyTextOutputFormat() {
}
@Override
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String path, Progressable progress) throws IOException {
String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, job);
Path file = FileOutputFormat.getTaskOutputPath(job, path + codec.getDefaultExtension());
FileSystem fs = file.getFileSystem(job);
String file_path = path + codec.getDefaultExtension();
Path newFile = new Path(FileOutputFormat.getOutputPath(job), file_path);
FSDataOutputStream fileOut;
if (fs.exists(newFile)) {
fileOut = fs.append(newFile,4096,progress);
} else {
fileOut = fs.create(newFile, progress);
}
return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
}
}
結果如下:
spark streaming 採用gzip壓縮,會導致derect memory洩露,暫時沒有找到好的解決方法,只能不使用壓縮,誰解決了此問題,可以留言,感謝!
相關推薦
spark streaming 根據檔案內容自定義檔名輸出,並實現檔案內容追加
spark streaming 從kafka拉取資料,根絕訊息內容,需要將不容的訊息放到不同的資料夾下,大致內容為 從訊息中拆分出域名,不同域名分不到不同目錄,域名下按訊息中的時間分年月日目錄,底層目錄下自定義檔案個數,實現追加 由於sparkstrea
springmvc在處理請求過程中出現異常資訊交由異常處理器進行處理,自定義異常處理器可以實現一個系統的異常處理邏輯。為了區別不同的異常通常根據異常型別自定義異常類,這裡我們建立一個自定義系統異常,如果controller、service、dao丟擲此類異常說明是系統預期處理的異常資訊。
springmvc在處理請求過程中出現異常資訊交由異常處理器進行處理,自定義異常處理器可以實現一個系統的異常處理邏輯。 1.1 異常處理思路 系統中異常包括兩類:預期異常和執行時異常RuntimeException,前者通過捕獲異常從而獲取異常資訊,後者主要通過規範程式碼開發、測試通過手段減少執
js自定義方法通過隱藏iframe實現檔案下載
<script> function download() { //下載檔案的地址 var url="http://music.baidu.com/data/music/file?link=http://zhangmenshiting.baidu.com/data2/music/13618994/1
gin框架自定義日誌輸出,自定義gin中介軟體擴充套件Logger
gin框架是款高效能的GoWeb框架,可以快速開發部署api服務。在使用過程中我們需要記錄各種各樣的日誌,下面介紹下我們怎麼自定義日誌記錄格式或擴充套件日誌。 gin簡單剖析 api服務建立 package main import "gith
Android 自定義Dialog類,並在Activity中實現按鈕監聽。
實際開發中,經常會用到Dialog,比如退出時候會彈出是否退出,或者還有一些編輯框也會用Dialog實現,效果圖如下: 開發中遇到的問題無非在於如果在Activity中監聽這個Dialog中實現的按鈕,Dialog類如下,在MyDialog這個類中實現了一個LeaveMyDialogLi
從鍵盤輸入4位學生的資料(學生資料包括學號、姓名和三門課成績),然後把它們轉存到磁碟檔案中去,再從磁碟檔案讀入相應變數中,並將變數內容輸出到螢幕。
#include<stdio.h>#define num 4struct stu{char number[20];char name[20];float Sub1;float Sub2;float Sub3;};int main(){int i;FILE *fp;
【enum】如何在列舉中定義自定義的方法,並進行使用
1、定一個列舉類 packagecom.eud.t1; publicenumColor { //定義列舉中的常量 RED(1,"hongse"), GREEN(2,"qingse"),BLACK
如何自定義一個View,並動態設定style?
定義一個TextView的style,常用的方法是在xml檔案中實現,但有時候我們需要動態的在程式碼中自定義個style的View,進行動態的新增或刪除等控制,這時候需要用到inflater方法。 TextView的xml,R.layout.empty_tex
菜鳥學習shiro之實現自定義的Realm,從而實現登入驗證,身份驗證和許可權驗證4
講了那麼多使用的內建的類從而實現四郎,現在講自定義的境界 首先行家的依賴依然是第一篇的那個依賴 下邊是自定義的境界: import org.apache.shiro.authc.AuthenticationException; import org.apache.shi
Mac ssh連線遠端伺服器,並實現檔案的上傳和下載
使用scp命令實現上傳下載 1、從伺服器上下載檔案 scp [email protected]:/path/filename /Users/mac/Desktop(本地目錄) 例如:scp [email protected]:/root/test.txt /Users/mac/Desk
MAC實用SSH連線遠端伺服器,並實現檔案上傳下載
MAC作為程式設計師的神器,在程式設計上的使用遠超window的電腦,而MAC本身提供了SSH功能,配合VIM編輯器對程式設計十分有幫助。 使用ssh連線遠端主機 sudo -i 切換root許可權 ssh [email protected] 其中,
檔案操作,及檔案操作時的許可權設定,快速實現檔案拷貝,C語言常用IO函式
定義一個檔案型別指標。用於儲存開啟檔案後的指標。 C語言中可以用 gets(char *str) 來輸入一行(包括空格)。 FILE *p = fopen("檔名",“開啟的許可權”); 其中許可權可以是w ,此時如果檔案不存在會自動建立,如果存在就會自動覆蓋(內容); 許
idea中自定義設定xml的標頭檔案的內容
因為在idea中新建的xml預設的標頭檔案,有時候並不是我們需要的這時候可以通過自定義來解決. 如搭建hibernate的實體類的xml. 首先 fiel→settings出現 如下框框 在上面搜尋 temp 點選
mybatis generator 自定義 ,xml 檔名稱和內容自定義,dao名稱自定義
最近在用mybatis generator 生成程式碼的時候,生成的xml檔案 和類檔案 不是自己想要的,於是修改mybatis generator 的原始碼,重寫方法來達到效果,這裡記錄一下,後期如果需要還可以隨便改成自己想要的! 一 修改註釋 &nb
如何使用js將canvas儲存為圖片檔案,並且可以自定義檔名
1、從canvas中直接提取圖片元資料 // 圖片匯出為 png 格式 var type = 'png'; var imgData = canvas.toDataURL(type); 上面的程式碼得到的資料格式為:data:image/p
Java File類學習筆記4:自定義一個類,過濾指定副檔名的檔案
一、FilenameFilter介紹 java.io.File類提供了四個方法用於列舉某個路徑下的檔案和目錄,但不會遞迴列舉子目錄下的內容 其中兩個是列舉路徑下的所有檔案和目錄。 (1)String
研究MapReduce原始碼之實現自定義LineRecordReader完成多行讀取檔案內容
TextInputFormat是Hadoop預設的資料輸入格式,但是它只能一行一行的讀記錄,如果要讀取多行怎麼辦? 很簡單 自己寫一個輸入格式,然後寫一個對應的Recordreader就可以了,但是要實現確不是這麼簡單的 首先看看TextInputForma
001_自定義過濾及添加文件內容腳本(nginx)
parser sys idt fin rip readlines pat utf default 一、工作中遇到sed添加及修改在nginx末尾添加太麻煩了 需求:隨意查找添加一條以前不存在的內容加到"}"前一行 實現: #!/usr/bin/env python #
C#-XML文件提取字符串+字符串存為XML文件+創建XML(自定義節點)文件+讀取節點內容
cts ted set clas desc format view nav child 一、將字符串寫入xml文件(並保存) 寫入: XmlDocument xdoc = new XmlDocument(); xdoc.LoadXml(“xmlstring”); 保存
jsp自定義標簽獲取標簽體內容輸出到頁面展示《三》
ride write bubuko his color back lan invoke mage 1.BodyTag.java package com.zy.tag; import java.io.IOException; import javax.servlet.jsp