Flume: Ingesting a Log Stream into a Hive Data Warehouse
By 阿新 · Published 2020-12-06
Real-time stream ingestion into a data warehouse exists at virtually every large company. Flume has supported the taildir source since version 1.7, and it is widely used for the following reasons:
1. It matches file names in a directory with a regular expression.
2. As soon as data is written to a monitored file, Flume forwards it to the configured sink.
3. It is highly reliable and does not lose data.
4. It leaves the tracked files untouched: it neither renames nor deletes them.
5. It does not support Windows and cannot read binary files; it reads text files line by line.
This article uses open-source Flume as an example to show how to ingest a log stream into HDFS, on top of which ODS-layer external tables are built later.
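Point 1 above deserves emphasis: the file-name part of a taildir file group is a full-match regular expression, not a shell glob. A small stdlib-only sketch (the file names are made-up examples):

```java
import java.util.List;
import java.util.regex.Pattern;

public class FilegroupMatchDemo {
    // Same idea as filegroups.f1 = /some/dir/.*log : the file-name part is a regex
    private static final Pattern FILE_PATTERN = Pattern.compile(".*log");

    static boolean matches(String fileName) {
        return FILE_PATTERN.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        List<String> names = List.of("start.2020-12-06.log", "start.log.tmp", "event.log", "readme.txt");
        for (String n : names) {
            System.out.println(n + " -> " + matches(n));
        }
    }
}
```

Note that `start.log.tmp` does not match, because the whole name must match the pattern, not just a prefix of it.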
1.1 Taildir source configuration
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
1.2 HDFS sink configuration
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# Roll files by size (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 100
# Use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
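With rollCount and rollInterval set to 0, count- and time-based rolling are disabled and only rollSize triggers a new file. rollSize is in bytes, and 33554432 is exactly 32 MB:

```java
public class RollSizeCheck {
    // hdfs.rollSize above, expressed as 32 MB in bytes
    static long rollSizeBytes() {
        return 32L * 1024 * 1024;
    }

    public static void main(String[] args) {
        System.out.println(rollSizeBytes()); // prints 33554432
    }
}
```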
1.3 Agent configuration
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# Roll files by size (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
# Use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Save the configuration as /opt/hoult/servers/conf/flume-log2hdfs.conf.
1.4 Startup
flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.root.logger=INFO,console

You also need to add the following to $FLUME_HOME/conf/flume-env.sh, otherwise the agent reports an error:

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"

# For the flume-env.sh settings to take effect, also pass the conf directory on the command line
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.root.logger=INFO,console
1.5 Using a custom interceptor so the agent uses the timestamp from the log instead of the local time
Test it with a netcat source and a logger sink:
# a1 is the agent name; the source, channel, and sink are named r1, c1, and k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux121
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The core interceptor code is as follows:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Read the event body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // Read the event headers
        Map<String, String> headerMap = event.getHeaders();
        final String[] bodyArr = eventBody.split("\\s+");
        try {
            // The 7th whitespace-separated field carries the JSON payload
            String jsonStr = bodyArr[6];
            if (Strings.isNullOrEmpty(jsonStr)) {
                return null;
            }
            // Parse the string into a JSON object
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getString("time");
            // Convert the epoch-millisecond timestamp to a date string (yyyyMMdd)
            long timeStamp = Long.parseLong(timestampStr);
            String date = formatter.format(
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            headerMap.put("logtime", "unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
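The date-conversion step can be checked in isolation. A minimal, Flume-free sketch of the same epoch-millis-to-yyyyMMdd conversion (the timestamp and the UTC zone are example values; the interceptor itself uses the system default zone):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class LogtimeDemo {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Same conversion the interceptor performs: epoch milliseconds -> yyyyMMdd
    static String toLogtime(long epochMillis, ZoneId zone) {
        return FORMATTER.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone));
    }

    public static void main(String[] args) {
        // 2020-12-06T00:00:00Z in epoch milliseconds
        System.out.println(toLogtime(1607212800000L, ZoneId.of("UTC"))); // prints 20201206
    }
}
```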
Start the agent:
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf -name a1 -Dflume.root.logger=INFO,console
## Test
telnet linux121 9999
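The interceptor takes the 7th whitespace-separated field of the body as its JSON payload, so a line typed into telnet needs that shape. A sketch with a made-up log line (a regex stands in for fastjson here, and it assumes the JSON itself contains no spaces):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SampleLineDemo {
    // Pull the "time" value out of the 7th field, as the interceptor does
    static String extractTime(String line) {
        String json = line.split("\\s+")[6];
        Matcher m = Pattern.compile("\"time\":\"(\\d+)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Hypothetical log line: six leading fields, then the JSON payload
        String line = "f1 f2 f3 f4 f5 f6 {\"time\":\"1607212800000\",\"app\":\"start\"}";
        System.out.println(extractTime(line)); // prints 1607212800000
    }
}
```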
Wu Xie, Xiao San Ye: a rookie roaming the backend, big data, and AI fields. Follow for more.