BigData Project: Importing Flume-Collected Log Files into the Hive Data Warehouse
1. Flume log collection
1.1 Upstream conf file
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1 g2
a1.sources.r1.filegroups.g1 = /opt/data/logdata/app/event.*
a1.sources.r1.filegroups.g2 = /opt/data/logdata/wx/event.*
a1.sources.r1.headers.g1.datatype = app
a1.sources.r1.headers.g2.datatype = wx
a1.sources.r1.batchSize = 100
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = flume.interceptor.FieldEncryptInterceptor$FieldEncryptInterceptorBuilder
a1.sources.r1.interceptors.i1.headerName = timestamp
a1.sources.r1.interceptors.i1.timestamp_field = timeStamp
a1.sources.r1.interceptors.i1.to_encrypt_field = account

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flumedata/file-channel/checkpoint
a1.channels.c1.dataDirs = /opt/data/flumedata/file-channel/data

a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux02
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100

a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux03
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 100

# Define the sink group and its failover sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
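With this file saved on the collection host, the upstream agent could be started along these lines (a sketch; the conf file name upstream.conf and the working directory are assumptions):

bin/flume-ng agent -n a1 -c conf -f conf/upstream.conf -Dflume.root.logger=INFO,console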
1.2 Downstream conf file
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.batchSize = 100

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flumedata/file-channel/checkpoint
a1.channels.c1.dataDirs = /opt/data/flumedata/file-channel/data

a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://linux01:8020/logdata/%{datatype}/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = %{datatype}EduData
a1.sinks.k1.hdfs.fileSuffix = .log.gz
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 268435456
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.codeC = gzip
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.useLocalTimeStamp = false
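This agent must run on linux02 and linux03, the hosts the upstream avro sinks point at. A start command could look like this (the conf file name downstream.conf is an assumption):

bin/flume-ng agent -n a1 -c conf -f conf/downstream.conf -Dflume.root.logger=INFO,console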
1.3 Interceptor
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class FieldEncryptInterceptor implements Interceptor {

    String timestamp_field;
    String to_encrypt_field;
    String headerName;

    public FieldEncryptInterceptor(String timestamp_field, String to_encrypt_field, String headerName) {
        this.timestamp_field = timestamp_field;
        this.to_encrypt_field = to_encrypt_field;
        this.headerName = headerName;
    }

    public void initialize() {
    }

    public Event intercept(Event event) {
        // Parse the event body as JSON and pull out the field to be encrypted
        try {
            String line = new String(event.getBody());
            JSONObject jsonObject = JSON.parseObject(line);
            String toEncryptField = jsonObject.getString(to_encrypt_field);
            String timeStampField = jsonObject.getString(timestamp_field);
            // Encrypt (MD5 digest) and replace the original value
            if (StringUtils.isNotBlank(toEncryptField)) {
                String encrypted = DigestUtils.md5Hex(toEncryptField);
                jsonObject.put(to_encrypt_field, encrypted);
                // Serialize back to JSON and put it back into the event body
                String res = jsonObject.toJSONString();
                event.setBody(res.getBytes("UTF-8"));
            }
            // Put the timestamp into the event header (used by the HDFS sink's %Y-%m-%d path escapes)
            event.getHeaders().put(headerName, timeStampField);
        } catch (Exception e) {
            // Tag malformed records so they land in a separate HDFS directory
            event.getHeaders().put("datatype", "malformed");
            e.printStackTrace();
        }
        return event;
    }

    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    public void close() {
    }

    public static class FieldEncryptInterceptorBuilder implements Interceptor.Builder {

        String timestamp_field;
        String to_encrypt_field;
        String headerName;

        public Interceptor build() {
            return new FieldEncryptInterceptor(timestamp_field, to_encrypt_field, headerName);
        }

        public void configure(Context context) {
            timestamp_field = context.getString("timestamp_field");
            headerName = context.getString("headerName");
            to_encrypt_field = context.getString("to_encrypt_field");
        }
    }
}
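For the agent to find the interceptor class named in section 1.1, the compiled jar has to be on Flume's classpath. A minimal deployment sketch (the jar name and the use of maven are assumptions):

# build the interceptor jar and drop it into Flume's lib directory
mvn clean package
cp target/field-encrypt-interceptor-1.0.jar ${FLUME_HOME}/lib/
# fastjson is not bundled with Flume, so its jar must be copied alongside as well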
Problem:
If the downstream sink compresses its output (a1.sinks.k1.hdfs.codeC = gzip), the files written to HDFS must carry the matching suffix (.log.gz); otherwise, when the data is loaded into Hive, the files are not recognized as gzip-compressed and the JSON cannot be parsed.
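A quick way to confirm the compressed files are well-formed is to read a few records back from HDFS; hdfs dfs -text decompresses gzip automatically (the date path below is just an example):

hdfs dfs -text /logdata/app/2021-01-10/appEduData*.log.gz | head -3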
2. Create the Hive database and tables
2.1 Start the Hive services and connect remotely to HiveServer2
# Start the metastore in the foreground:
hive --service metastore
# Or start it in the background:
hive --service metastore &
# Check whether the port is up:
[root@linux01 ~]# netstat -nltp | grep 9083
tcp6  0  0 :::9083    # if this line appears, the metastore port is in use

# Start HiveServer2 in the foreground:
bin/hiveserver2
# Or in the background:
bin/hiveserver2 &

# Connect remotely with beeline:
[root@linux01 ~]# beeline
beeline> !connect jdbc:hive2://linux01:10000
2.2 Create the database
create database ods;
2.3 Create the tables
- app side
DROP TABLE IF EXISTS `ods.event_app_log`;
CREATE EXTERNAL TABLE `ods.event_app_log`(
`account` string ,
`appid` string ,
`appversion` string ,
`carrier` string ,
`deviceid` string ,
`devicetype` string ,
`eventid` string ,
`ip` string ,
`latitude` double ,
`longitude` double ,
`nettype` string ,
`osname` string ,
`osversion` string ,
`properties` map<string,string> ,
`releasechannel` string ,
`resolution` string ,
`sessionid` string ,
`timestamp` bigint
)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE
'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://linux01:8020/user/hive/warehouse/ods.db/event_app_log'
TBLPROPERTIES (
'bucketing_version'='2',
'transient_lastDdlTime'='1610337798'
);
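Note that org.apache.hive.hcatalog.data.JsonSerDe ships in hive-hcatalog-core, which is not always on Hive's default classpath. If the DDL or queries against the table fail with a ClassNotFoundException, one fix is to copy the jar into Hive's lib directory and restart HiveServer2 (the version in the path is an assumption based on the Hive 3.1.2 install used above):

cp ${HIVE_HOME}/hcatalog/share/hcatalog/hive-hcatalog-core-3.1.2.jar ${HIVE_HOME}/lib/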
- wx side
DROP TABLE IF EXISTS `ods.event_wx_log`;
CREATE EXTERNAL TABLE `ods.event_wx_log`(
`account` string ,
`appid` string ,
`appversion` string ,
`carrier` string ,
`deviceid` string ,
`devicetype` string ,
`eventid` string ,
`ip` string ,
`latitude` double ,
`longitude` double ,
`nettype` string ,
`osname` string ,
`osversion` string ,
`properties` map<string,string> ,
`releasechannel` string ,
`resolution` string ,
`sessionid` string ,
`timestamp` bigint
)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE
'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://linux01:8020/user/hive/warehouse/ods.db/event_wx_log'
TBLPROPERTIES (
'bucketing_version'='2',
'transient_lastDdlTime'='1610337798'
);
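Once a partition has been loaded (section 3), a quick sanity check that the SerDe actually parses the JSON could look like this (the dt value is illustrative):

select eventid, account, `timestamp` from ods.event_wx_log where dt='2021-01-10' limit 5;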
3. Write scripts to load the log data into the Hive tables
- wx side
#!/bin/bash
######################################
#
# @author : 披著狼皮的小女孩
# @date   : 2021-01-11
# @desc   : load wx-side tracking logs into the warehouse
# @other
######################################
export JAVA_HOME=/opt/apps/jdk1.8.0_141
export HIVE_HOME=/opt/apps/apache-hive-3.1.2/

# yesterday's date, which is the partition to load
DT=$(date -d'-1 day' +%Y-%m-%d)

${HIVE_HOME}/bin/hive -e "
load data inpath '/anli/wx/${DT}' into table ods.event_wx_log partition(dt='${DT}')
"

if [ $? -eq 0 ]
then
    echo "${DT} wx tracking logs loaded successfully"
else
    echo "load failed"
fi
- app side
#!/bin/bash
######################################
#
# @author : 披著狼皮的小女孩
# @date   : 2021-01-11
# @desc   : load app-side tracking logs into the warehouse
# @other
######################################
export JAVA_HOME=/opt/apps/jdk1.8.0_141
export HIVE_HOME=/opt/apps/apache-hive-3.1.2/

# yesterday's date, which is the partition to load
DT=$(date -d'-1 day' +%Y-%m-%d)

${HIVE_HOME}/bin/hive -e "
load data inpath '/anli/app/${DT}' into table ods.event_app_log partition(dt='${DT}')
"

if [ $? -eq 0 ]
then
    echo "${DT} app tracking logs loaded successfully"
else
    echo "load failed"
fi
- Run the scripts
sh event_app_load.sh
sh event_wx_load.sh
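To run the two loaders automatically every day, they could be scheduled with cron, e.g. (the script and log locations are assumptions):

# load yesterday's logs at 01:00 every day
0 1 * * * sh /opt/scripts/event_app_load.sh >> /opt/logs/event_app_load.log 2>&1
0 1 * * * sh /opt/scripts/event_wx_load.sh >> /opt/logs/event_wx_load.log 2>&1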
4. Summary of issues
1. Flume agent heap size
The default is only 20 MB, which is definitely not enough in production;
it generally needs to be raised to about 1 GB:
vi bin/flume-ng
Search for Xmx and modify it (the launcher's default is JAVA_OPTS="-Xmx20m"; raising it to e.g. -Xmx1g is typical).

2. Channel blocking
Before Flume was started, too much data had piled up, so the source reads very fast while the HDFS sink's write speed is limited, which leads to backpressure.
The backpressure propagates from downstream to upstream, and the upstream agent's run log keeps reporting: channel full, source retrying. This is where Flume runtime monitoring comes in.
If monitoring shows the channel is frequently blocked, the following measures can improve (optimize) the situation:
a. If resources allow, add more agent machines writing to HDFS and raise overall throughput through load balancing
b. If resources are tight, increase batchSize to make the HDFS writes more efficient
c. If resources are tight, enable data compression to cut the volume of data written to HDFS
d. If the source's inflow does not constantly exceed the sink's write-out rate, enlarge the channel's buffer capacity to absorb peaks (see the sketch after this list)
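For measure d, a file channel's buffer can be enlarged with properties like the following (the values are illustrative, not tuned recommendations):

a1.channels.c1.capacity = 5000000
a1.channels.c1.transactionCapacity = 10000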
3. What if an agent process dies?
Downstream agent down: not a big problem; with the failover configuration above, traffic switches over automatically. An alert should still be raised so ops can repair it quickly.
Upstream agent down: more serious; monitor the process state with a script and, when an anomaly is found, pull the agent process back up, then alert ops to find and fix the root cause (a sketch of such a watchdog follows below).
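A minimal watchdog sketch, suitable for running from cron (the conf file name, log path, and alerting hook are assumptions):

#!/bin/bash
# restart the upstream flume agent if its process has disappeared
if ! ps -ef | grep 'upstream.conf' | grep -v grep > /dev/null
then
    nohup ${FLUME_HOME}/bin/flume-ng agent -n a1 -c ${FLUME_HOME}/conf -f ${FLUME_HOME}/conf/upstream.conf > /dev/null 2>&1 &
    # an alert (mail, SMS, ...) should be sent from here as well
    echo "$(date) flume agent restarted" >> /opt/logs/flume-watchdog.log
fi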
- Flume agent monitoring
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
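With these -D options added to the agent's start command, Flume serves its metrics as JSON over a built-in HTTP server, which can be polled like this:

curl http://linux01:34545/metrics

The response includes per-channel gauges such as ChannelFillPercentage, which is the number to watch for the channel-blocking issue described above.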