【Flume】【原始碼分析】flume中sink到hdfs,檔案系統頻繁產生檔案,檔案滾動配置不起作用?
本人在測試hdfs的sink,發現sink端的檔案滾動配置項起不到任何作用,配置如下:
a1.sinks.k1.type=hdfs a1.sinks.k1.channel=c1 a1.sinks.k1.hdfs.useLocalTimeStamp=true a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M a1.sinks.k1.hdfs.filePrefix=XXX a1.sinks.k1.hdfs.rollInterval=60 a1.sinks.k1.hdfs.rollSize=0 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=0
這裡配置的是60秒,檔案滾動一次,也就每隔60秒,會新產生一個檔案【前提,flume的source端有資料來】
這裡注意
useLocalTimeStamp
這個屬性的目的就是相當於時間戳的攔截器,否則%Y 等等這些東西都識別不了
要麼用上面這個屬性,要麼用時間戳攔截器。
但是當我啟動flume的時候,執行十幾秒,不斷寫入資料,發現hdfs端頻繁的產生檔案,每隔幾秒就有新檔案產生
而且在flume的日誌輸出可以頻繁看到這句:
[WARN] Block Under-replication detected. Rotating file.
只要有這句,就會產生一個新的檔案
意思就是檢測到複製塊正在滾動檔案,結合原始碼看下:
這是判斷是否滾動檔案,但是這裡面的第一判斷條件是判斷是否當前的HDFSWriter正在複製塊private boolean shouldRotate() { boolean doRotate = false; if (writer.isUnderReplicated()) { this.isUnderReplicated = true; doRotate = true; } else { this.isUnderReplicated = false; } if ((rollCount > 0) && (rollCount <= eventCounter)) { LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter); doRotate = true; } if ((rollSize > 0) && (rollSize <= processSize)) { LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize); doRotate = true; } return doRotate; }
public boolean isUnderReplicated() {
try {
int numBlocks = getNumCurrentReplicas();
if (numBlocks == -1) {
return false;
}
int desiredBlocks;
if (configuredMinReplicas != null) {
desiredBlocks = configuredMinReplicas;
} else {
desiredBlocks = getFsDesiredReplication();
}
return numBlocks < desiredBlocks;
} catch (IllegalAccessException e) {
logger.error("Unexpected error while checking replication factor", e);
} catch (InvocationTargetException e) {
logger.error("Unexpected error while checking replication factor", e);
} catch (IllegalArgumentException e) {
logger.error("Unexpected error while checking replication factor", e);
}
return false;
}
通過讀取的配置複製塊數量和當前正在複製的塊比較,判斷是否正在被複制
if (shouldRotate()) {
boolean doRotate = true;
if (isUnderReplicated) {
if (maxConsecUnderReplRotations > 0 &&
consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
doRotate = false;
if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
LOG.error("Hit max consecutive under-replication rotations ({}); " +
"will not continue rolling files under this path due to " +
"under-replication", maxConsecUnderReplRotations);
}
} else {
LOG.warn("Block Under-replication detected. Rotating file.");
}
consecutiveUnderReplRotateCount++;
} else {
consecutiveUnderReplRotateCount = 0;
}
以上方法,入口是shouldRotate()方法,也就是如果你配置了rollcount,rollsize大於0,會按照你的配置來滾動的,但是在入口進來後,發現,又去判斷了是否有塊在複製;裡面就讀取了一個固定變數maxConsecUnderReplRotations=30,也就是正在複製的塊,最多之能滾動出30個檔案,如果超過了30次,該資料塊如果還在複製中,那麼資料也不會滾動了,doRotate=false,不會滾動了,所以有的人發現自己一旦執行一段時間,會出現30個檔案
再結合上面的原始碼看一下:
如果你配置了10秒滾動一次,寫了2秒,恰好這時候該檔案內容所在的塊在複製中,那麼雖然沒到10秒,依然會給你滾動檔案的,檔案大小,事件數量的配置同理了。
為了解決上述問題,我們只要讓程式感知不到寫的檔案所在塊正在複製就行了,怎麼做呢??
只要讓isUnderReplicated()方法始終返回false就行了
該方法是通過當前正在被複制的塊和配置中讀取的複製塊數量比較的,我們能改的就只有配置項中複製塊的數量,而官方給出的flume配置項中有該項
hdfs.minBlockReplicas
Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
預設讀的是hadoop中的dfs.replication屬性,該屬性預設值是3
這裡我們也不去該hadoop中的配置,在flume中新增上述屬性為1即可
配置如下:
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.filePrefix=cmcc
a1.sinks.k1.hdfs.minBlockReplicas=1
#a1.sinks.k1.hdfs.fileType=DataStream
#a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
這樣程式就永遠不會因為檔案所在塊的複製而滾動檔案了,只會根據你的配置項來滾動檔案了,試試吧!!
望各位網友不吝指教!!!