
BigData Project: Loading Flume-Collected Log Files into the Hive Data Warehouse

Tags: BigData, Hadoop, HDFS, Flume

1. Flume log collection

1.1 Upstream conf file

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2


a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1 g2
a1.sources.r1.filegroups.g1 = /opt/data/logdata/app/event.*
a1.sources.r1.filegroups.g2 = /opt/data/logdata/wx/event.*
a1.sources.r1.headers.g1.datatype = app
a1.sources.r1.headers.g2.datatype = wx
a1.sources.r1.batchSize = 100

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = flume.interceptor.FieldEncryptInterceptor$FieldEncryptInterceptorBuilder
a1.sources.r1.interceptors.i1.headerName = timestamp
a1.sources.r1.interceptors.i1.timestamp_field = timeStamp
a1.sources.r1.interceptors.i1.to_encrypt_field = account


a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flumedata/file-channel/checkpoint
a1.channels.c1.dataDirs = /opt/data/flumedata/file-channel/data


a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux02
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100


a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux03
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 100

# Define the sink group and its failover sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
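A start command for this upstream agent might look like the following; the conf-file name and directory are assumptions and should match the actual deployment:

bin/flume-ng agent -n a1 -c conf -f conf/taildir-avro.conf -Dflume.root.logger=INFO,console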

1.2 Downstream conf file

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.batchSize = 100

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flumedata/file-channel/checkpoint
a1.channels.c1.dataDirs = /opt/data/flumedata/file-channel/data

a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://linux01:8020/logdata/%{datatype}/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = %{datatype}EduData
a1.sinks.k1.hdfs.fileSuffix = .log.gz
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 268435456
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.codeC = gzip
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.useLocalTimeStamp = false
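The downstream agents on linux02 and linux03 are started the same way, pointing at this conf file (again, the file name is an assumption):

bin/flume-ng agent -n a1 -c conf -f conf/avro-hdfs.conf -Dflume.root.logger=INFO,console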

1.3 Interceptor

package flume.interceptor; // must match the builder FQCN referenced in the upstream conf

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class FieldEncryptInterceptor implements Interceptor {

    String timestamp_field;
    String to_encrypt_field;
    String headerName;

    public FieldEncryptInterceptor(String timestamp_field, String to_encrypt_field, String headerName) {

        this.timestamp_field = timestamp_field;
        this.to_encrypt_field = to_encrypt_field;
        this.headerName = headerName;

    }

    public void initialize() {

    }

    public Event intercept(Event event) {

        // Parse the event body as JSON and pull out the field to be encrypted
        try {
            String line = new String(event.getBody());
            JSONObject jsonObject = JSON.parseObject(line);

            String toEncryptField = jsonObject.getString(to_encrypt_field);
            String timeStampField = jsonObject.getString(timestamp_field);

            // Encrypt the field value
            if (StringUtils.isNotBlank(toEncryptField)) {
                String encrypted = DigestUtils.md5Hex(toEncryptField);

                // Replace the original value with the encrypted one
                jsonObject.put(to_encrypt_field, encrypted);

                // Serialize back to JSON and write it back into the event body
                String res = jsonObject.toJSONString();
                event.setBody(res.getBytes("UTF-8"));
            }

            // Put the timestamp into the event header (used by the HDFS sink's path escape sequences)
            event.getHeaders().put(headerName, timeStampField);

        } catch (Exception e) {

            event.getHeaders().put("datatype", "malformed");

            e.printStackTrace();
        }


        return event;
    }

    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }

        return list;
    }

    public void close() {

    }


    public static class FieldEncryptInterceptorBuilder implements Interceptor.Builder {
        String timestamp_field;
        String to_encrypt_field;
        String headerName;

        public Interceptor build() {

            return new FieldEncryptInterceptor(timestamp_field, to_encrypt_field, headerName);
        }

        public void configure(Context context) {
            timestamp_field = context.getString("timestamp_field");
            headerName = context.getString("headerName");
            to_encrypt_field = context.getString("to_encrypt_field");


        }
    }
}
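For the upstream agent to load this class, the interceptor has to be packaged and placed on Flume's classpath together with its fastjson dependency. A minimal deployment sketch, assuming a Maven project and an artifact name chosen only for illustration:

# build the interceptor jar from the assumed Maven project
mvn -q clean package
# copy the interceptor jar and fastjson into Flume's lib directory on the upstream host
cp target/field-encrypt-interceptor-1.0.jar /opt/apps/flume/lib/
cp fastjson-<version>.jar /opt/apps/flume/lib/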

Issue:

If the downstream sink compresses the output (a1.sinks.k1.hdfs.codeC = gzip), the files written to HDFS must carry a matching suffix, hence .log.gz above; otherwise the gzipped data is not decompressed when it is loaded into Hive and the JSON cannot be recognized.
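This is because Hive's TextInputFormat, like hdfs dfs -text, picks the decompression codec from the file extension; without the .gz suffix the gzipped bytes are read as plain text and the JsonSerDe cannot parse them. A quick sanity check on the written files (the date path is only an example):

hdfs dfs -ls /logdata/app/2021-01-11
hdfs dfs -text /logdata/app/2021-01-11/*.log.gz | head -3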

2. Create the Hive database and tables

2.1 Start the Hive services and connect remotely via HiveServer2

hive --service metastore               # start the metastore in the foreground
hive --service metastore &             # start the metastore in the background
netstat -nltp | grep 9083              # check whether the metastore port is listening
tcp6 0 0 :::9083 ...                   # this output means the port is in use, i.e. the metastore is up

Foreground start: bin/hiveserver2

Background start: bin/hiveserver2 &

beeline

beeline> !connect jdbc:hive2://linux01:10000

2.2 Create the database

create database ods;

2.3 Create the tables

  • app side
DROP TABLE IF EXISTS `ods.event_app_log`;
CREATE EXTERNAL TABLE `ods.event_app_log`(         
  `account` string ,    
  `appid` string ,      
  `appversion` string , 
  `carrier` string ,    
  `deviceid` string ,   
  `devicetype` string , 
  `eventid` string ,    
  `ip` string ,         
  `latitude` double ,   
  `longitude` double ,  
  `nettype` string ,    
  `osname` string ,     
  `osversion` string ,  
  `properties` map<string,string> ,  
  `releasechannel` string ,   
  `resolution` string ,   
  `sessionid` string ,   
  `timestamp` bigint 
)   
PARTITIONED BY (`dt` string)                                      
ROW FORMAT SERDE                                    
  'org.apache.hive.hcatalog.data.JsonSerDe'         
STORED AS INPUTFORMAT                               
  'org.apache.hadoop.mapred.TextInputFormat'        
OUTPUTFORMAT                                        
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'  
LOCATION                                            
  'hdfs://linux01:8020/user/hive/warehouse/ods.db/event_app_log'  
TBLPROPERTIES (                                     
  'bucketing_version'='2',                          
  'transient_lastDdlTime'='1610337798'
);            
  • wx side
DROP TABLE IF EXISTS `ods.event_wx_log`;
CREATE EXTERNAL TABLE `ods.event_wx_log`(         
  `account` string ,    
  `appid` string ,      
  `appversion` string , 
  `carrier` string ,    
  `deviceid` string ,   
  `devicetype` string , 
  `eventid` string ,    
  `ip` string ,         
  `latitude` double ,   
  `longitude` double ,  
  `nettype` string ,    
  `osname` string ,     
  `osversion` string ,  
  `properties` map<string,string> ,  
  `releasechannel` string ,   
  `resolution` string ,   
  `sessionid` string ,   
  `timestamp` bigint 
)   
PARTITIONED BY (`dt` string)                                      
ROW FORMAT SERDE                                    
  'org.apache.hive.hcatalog.data.JsonSerDe'         
STORED AS INPUTFORMAT                               
  'org.apache.hadoop.mapred.TextInputFormat'        
OUTPUTFORMAT                                        
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'  
LOCATION                                            
  'hdfs://linux01:8020/user/hive/warehouse/ods.db/event_wx_log'  
TBLPROPERTIES (                                     
  'bucketing_version'='2',                          
  'transient_lastDdlTime'='1610337798'
);
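Both tables use org.apache.hive.hcatalog.data.JsonSerDe, which ships in hive-hcatalog-core.jar and is not on Hive's default classpath in every layout. If CREATE TABLE or a later query fails with a ClassNotFoundException for this SerDe, copy the jar into Hive's lib directory; the paths below assume the standard apache-hive-3.1.2 layout referenced in the load scripts:

cp /opt/apps/apache-hive-3.1.2/hcatalog/share/hcatalog/hive-hcatalog-core-3.1.2.jar /opt/apps/apache-hive-3.1.2/lib/
# restart hiveserver2 and the metastore afterwards so the jar is picked up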

3. Write scripts to load the log data into the Hive tables

  • wx side
#!/bin/bash
######################################
#
#  @author :  披著狼皮的小女孩
#  @date   :  2021-01-11
#  @desc   :  load the wx-side tracking logs into Hive
#  @other  
######################################

export JAVA_HOME=/opt/apps/jdk1.8.0_141
export HIVE_HOME=/opt/apps/apache-hive-3.1.2/


DT=$(date -d'-1 day' +%Y-%m-%d)

${HIVE_HOME}/bin/hive -e "
load data inpath '/anli/wx/${DT}' into table ods.event_wx_log partition(dt='${DT}')
"

if [ $? -eq 0 ]
then
 echo "${DT}wx埋點日誌,入庫成功"
else
 echo "入庫失敗"
fi
  • app side
#!/bin/bash
######################################
#
#  @author :  披著狼皮的小女孩
#  @date   :  2021-01-11
#  @desc   :  load the app-side tracking logs into Hive
#  @other  
######################################

export JAVA_HOME=/opt/apps/jdk1.8.0_141
export HIVE_HOME=/opt/apps/apache-hive-3.1.2/


DT=$(date -d'-1 day' +%Y-%m-%d)

${HIVE_HOME}/bin/hive -e "
load data inpath '/anli/app/${DT}' into table ods.event_app_log partition(dt='${DT}')
"

if [ $? -eq 0 ]
then
 echo "${DT}app埋點日誌,入庫成功"
else
 echo "入庫失敗"
fi

  • Run the scripts
sh event_app_load.sh
sh event_wx_load.sh
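Since both scripts load yesterday's partition, they are usually scheduled once a day, for example with cron; the script and log paths here are assumptions:

# crontab -e
0 1 * * * sh /opt/scripts/event_app_load.sh >> /opt/logs/event_app_load.log 2>&1
10 1 * * * sh /opt/scripts/event_wx_load.sh >> /opt/logs/event_wx_load.log 2>&1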

4. Problem summary

1. Flume agent heap size
The default is only 20 MB, which is definitely not enough in production;
it is usually raised to about 1 GB.
vi bin/flume-ng
Search for Xmx and change the value.
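A minimal sketch of the change, assuming the agent is launched through bin/flume-ng and using the 1 GB rule of thumb above:

# in bin/flume-ng (or export it from conf/flume-env.sh)
JAVA_OPTS="-Xms1g -Xmx1g"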

2. Channel blocking
If a large backlog of data has piled up before Flume is started, the source reads very fast while the HDFS sink can only write at a limited speed, which causes back pressure.
The back pressure propagates from downstream to upstream, and the upstream agent's log keeps reporting that the channel is full and the source is retrying.

This is where Flume runtime monitoring comes in.
If monitoring shows that the channel is frequently blocked, it can be improved (tuned) with the following measures (see the channel-tuning sketch after this list):
a. If resources allow, add more agent machines writing to HDFS and use load balancing to raise overall throughput.
b. If resources are limited, increase batchSize to make the HDFS writes more efficient.
c. If resources are limited, enable data compression to reduce the volume of data written to HDFS.
d. If the source's inflow does not constantly exceed the sink's write speed, enlarge the channel's buffer capacity to absorb peaks.
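For measure d, the file channel used above can be given a larger buffer directly in the same conf files; the numbers here are only an illustration and should be sized to the real backlog:

a1.channels.c1.capacity = 2000000
a1.channels.c1.transactionCapacity = 10000
# spreading dataDirs over several physical disks also raises file-channel throughput
a1.channels.c1.dataDirs = /disk1/flumedata/data,/disk2/flumedata/data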


3. What if an agent process goes down?
Downstream agent down: not a big problem, since failover (high availability) is configured and the other sink takes over automatically; still raise an alert so operations can repair it promptly.
Upstream agent down: more serious. Monitor the process state with a script and restart the agent as soon as it disappears (a sketch follows below); also alert operations to find and fix the root cause.
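A minimal watchdog sketch for the upstream agent, meant to be run from cron every minute; the conf-file name, install path and log paths are all assumptions:

#!/bin/bash
# restart the taildir agent if its process has disappeared
if ! ps -ef | grep 'flume.*taildir-avro.conf' | grep -v grep > /dev/null
then
  nohup /opt/apps/flume/bin/flume-ng agent -n a1 -c /opt/apps/flume/conf \
        -f /opt/apps/flume/conf/taildir-avro.conf > /opt/logs/flume-taildir.log 2>&1 &
  echo "$(date '+%F %T') flume agent restarted" >> /opt/logs/flume-watchdog.log
fi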

  • Monitoring the Flume agent
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
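When these JVM options are appended to the flume-ng start command, the agent serves its counters as JSON over HTTP, so channel fill level and sink throughput can be polled by a monitoring script (the host name is just an example):

curl http://linux01:34545/metrics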