1. 程式人生 > 實用技巧 >大資料實戰(二十):電商數倉(十三)之使用者行為資料採集(十三)元件安裝(九)資料採集通道

大資料實戰(二十):電商數倉(十三)之使用者行為資料採集(十三)元件安裝(九)資料採集通道

1 簡介

2 第一層資料採集通道

2.1 元件

1.第一層採集指令碼Source的選擇
①Source:
  資料來源在日誌檔案中!
    讀取日誌中的資料,可以使用以下Source
      ExecSource: 可以執行一個linux命令,例如tail -f 日誌檔案,
        講讀取的到的資料封裝為Event!
        不用!不安全,可能丟資料!
      SpoolingDirSource: 可以讀取一個目錄中的文字檔案!
        保證目錄中沒有重名的檔案!
        保證目錄中的檔案都是封閉狀態,一旦放入目錄中,不能再繼續寫入!

        每個日誌封閉後,才能放入到SpoolingDir,不然agent就故障!
      TailDirSource: 接近實時第讀取指定的檔案!斷點續傳功能!
        使用此Source!

  使用TailDirSource

②Channel:
KafkaChannel:
  優點: 基於kafka的副本功能,提供了高可用性!event被儲存在kafka中!
      即便agent掛掉或broker掛掉,依然可以讓sink從channel中讀取資料!

  應用場景:
    ①KafkaChannel和sink和source一起使用,單純作為channel。
    ②KafkaChannel+攔截器+Source,只要Source把資料寫入到kafka就完成
            目前使用的場景!
    ③KafkaChannel+sink,使用flume將kafka中的資料寫入到其他的目的地,例如hdfs!

    為例在上述場景工作,KafkaChannel可以配置生產者和消費者的引數!

  配置引數:
    ①在channel層面的引數,例如channel的型別,channel的容量等,需要和之前一樣,
    在channel層面配置,例如:a1.channel.k1.type
    ②和kafka叢集相關的引數,需要在channel層面配置後,再加上kafka.
        例如: a1.channels.k1.kafka.topic : 向哪個主題傳送資料
            a1.channels.k1.kafka.bootstrap.servers: 叢集地址
    ③和Produer和Consumer相關的引數,需要加上produer和consumer的字首:
        例如:a1.channels.k1.kafka.producer.acks=all
              a1.channels.k1.kafka.consumer.group.id=atguigu

    必須的配置:
    type=org.apache.flume.channel.kafka.KafkaChannel
    kafka.bootstrap.servers=
    可選:
    kafka.topic: 生成到哪個主題
    parseAsFlumeEvent=true(預設):
              如果parseAsFlumeEvent=true,kafkaChannel會把資料以flume中Event的結構作為參考,
              把event中的header+body放入ProducerRecord的value中!

              如果parseAsFlumeEvent=false,kafkaChannel會把資料以flume中Event的結構作為參考,
              把event中body放入ProducerRecord的value中!

    a1.channels.k1.kafka.producer.acks=0

2. 攔截器
        日誌資料有兩種型別,一種是事件日誌,格式 時間戳|{"ap":xx,"cm":{},"et":[{},{}]}
        另一種是啟動日誌,格式:{"en":"start"}

        在1個source對接兩個KafkaChannel時,需要使用MulitPlexing Channel Selector,
        講啟動日誌,分配到啟動日誌所在的Chanel,講事件日誌分配到事件日誌所在的Channel!

        MulitPlexing Channel Selector根據event,header中指定key的對映,來分配!

        需要自定義攔截器,根據不同的資料型別,在每個Event物件的header中新增key!

        功能: ①為每個Event,在header中新增key
              ②過濾不符合要求的資料(格式有損壞)
                    啟動日誌: {},驗證JSON字串的完整性,是否以{}開頭結尾
                    事件日誌: 時間戳|{}
                        時間戳需要合法:
                            a)長度合法(13位)
                            b)都是數字
                        驗證JSON字串的完整性,是否以{}開頭結尾

2.2 元件關係(請先複習flume)

2.3 flume配置

f1.conf

#a1是agent的名稱,a1中定義了一個叫r1的source,如果有多個,使用空格間隔
a1.sources = r1
a1.channels = c1 c2

#組名名.屬性名=屬性值
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.batchSize=1000
#讀取/tmp/logs/app-yyyy-mm-dd.log ^代表以xxx開頭$代表以什麼結尾 .代表匹配任意字元
#+代表匹配任意位置
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
#JSON檔案的儲存位置
a1.sources.r1.positionFile=/opt/module/flume/test/log_position.json

#定義攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder

#定義ChannelSelector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2


#定義chanel
a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic=topic_start
a1.channels.c1.parseAsFlumeEvent=false

a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic=topic_event
a1.channels.c2.parseAsFlumeEvent=false

#連線元件 同一個source可以對接多個channel,一個sink只能從一個channel拿資料!
a1.sources.r1.channels=c1 c2

2.4 第一層通道啟動指令碼

#!/bin/bash
#使用start啟動指令碼,使用stop停止指令碼
if (($#!=1))
then
        echo 請輸入start或stop!
        exit;
fi
#定義cmd用來儲存要執行的命令
cmd=cmd
if [ $1 = start ]
then
        cmd="source /etc/profile;nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f1.conf -Dflume.root.logger=DEBUG,console > /home/atguigu/f1.log 2>&1 &"
        elif [ $1 = stop ]
                then
                        cmd="ps -ef  | grep f1.conf | grep -v grep | awk  '{print \$2}' | xargs kill -9"
        else
                echo 請輸入start或stop!
fi

#在hadoop102和hadoop103開啟採集
for i in hadoop102 hadoop103
do
        ssh $i $cmd
done

3 第二層資料採集通道

3.1 元件

①kafkaSource:kafkaSource就是kafka的一個消費者執行緒,可以從指定的主題中讀取資料!
    如果希望提供消費的速率,可以配置多個kafkaSource,這些source組成同一個組!

    kafkaSource在工作時,會檢查event的header中有沒有timestamp屬性,如果沒有,
    kafkaSource會自動為event新增timestamp=當前kafkaSource所在機器的時間!

    kafkaSource啟動一個消費者,消費者在消費時,預設從分割槽的最後一個位置消費!

必須的配置:
type=org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.topics=消費的主題
kafka.topics.regex=使用正則表示式匹配主題

可選的配置:
kafka.consumer.group.id=消費者所在的組id
batchSize=一次put多少資料,小於10000
batchDurationMillis=一次put可以最多使用多少時間

和kafkaConsumer相關的屬性:kafka.consumer=consumer的屬性名
    例如:kafka.consumer.auto.offset.reset

②fileChannel: channel中的event是儲存在檔案中!比memorychannel可靠,但是效率略低!
必須的配置:
type=file
checkpointDir=checkpoint執行緒(負責檢查檔案中哪些event已經被sink消費了,將這些event的檔案刪除)儲存資料的目錄!
useDualCheckpoints=false 是否啟動雙檢查點,如果啟動後,會再啟動一個備用的checkpoint執行緒!
          如果改為true,還需要設定backupCheckpointDir(備用的checkpoint執行緒的工作目錄)
dataDirs=在哪些目錄下儲存event,預設為~/.flume/file-channel/data,可以是逗號分割的多個目錄!

③hdfssink: hdfssink將event寫入到HDFS!目前只支援生成兩種型別的檔案: text | sequenceFile,這兩種檔案都可以使用壓縮!
              寫入到HDFS的檔案可以自動滾動(關閉當前正在寫的檔案,建立一個新檔案)。基於時間、events的數量、資料大小進行週期性的滾動!
              支援基於時間和採集資料的機器進行分桶和分割槽操作!
              HDFS資料所上傳的目錄或檔名可以包含一個格式化的轉義序列,這個路徑或檔名會在上傳event時,被自動替換,替換為完整的路徑名!
              使用此Sink要求本機已經安裝了hadoop,或持有hadoop的jar包!
      配置:
          必須配置:
          type – The component type name, needs to be hdfs
          hdfs.path – HDFS directory path (eg hdfs://namenode/flume/webdata/)

3.2 元件關係(請先複習flume)

3.3 flume 配置

#配置檔案編寫
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

#配置source
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start
a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.kafka.consumer.group.id=CG_Start

a1.sources.r2.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event
a1.sources.r2.kafka.consumer.auto.offset.reset=earliest
a1.sources.r2.kafka.consumer.group.id=CG_Event
#配置channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/module/flume/c1/checkpoint
#啟動備用checkpoint
a1.channels.c1.useDualCheckpoints=true
a1.channels.c1.backupCheckpointDir=/opt/module/flume/c1/backupcheckpoint
#event儲存的目錄
a1.channels.c1.dataDirs=/opt/module/flume/c1/datas


a1.channels.c2.type=file
a1.channels.c2.checkpointDir=/opt/module/flume/c2/checkpoint
a1.channels.c2.useDualCheckpoints=true
a1.channels.c2.backupCheckpointDir=/opt/module/flume/c2/backupcheckpoint
a1.channels.c2.dataDirs=/opt/module/flume/c2/datas


#sink
a1.sinks.k1.type = hdfs
#一旦路徑中含有基於時間的轉義序列,要求event的header中必須有timestamp=時間戳,如果沒有需要將useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

a1.sinks.k1.hdfs.batchSize = 1000

#檔案的滾動
#60秒滾動生成一個新的檔案
a1.sinks.k1.hdfs.rollInterval = 30
#設定每個檔案到128M時滾動
a1.sinks.k1.hdfs.rollSize = 134217700
#禁用基於event數量的檔案滾動策略
a1.sinks.k1.hdfs.rollCount = 0
#指定檔案使用LZO壓縮格式
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second



a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 134217700
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.round = true
#a1.sinks.k2.hdfs.roundValue = 10
#a1.sinks.k2.hdfs.roundUnit = second

#連線元件
a1.sources.r1.channels=c1
a1.sources.r2.channels=c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

3.4 第二層通道指令碼

#!/bin/bash
#使用start啟動指令碼,使用stop停止指令碼
if(($#!=1))
then
    echo 請輸入start或stop!
    exit;
fi


if [ $1 = start ]
then
    ssh hadoop104 "source /etc/profile;nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f2.conf -Dflume.root.logger=INFO,console > /home/atguigu/f2.log 2>&1 &"

    elif [ $1 = stop ]
        then 
            ssh hadoop104 "ps -ef  | grep f2.conf | grep -v grep | awk  '{print \$2}' | xargs kill -9"
    else
        echo 請輸入start或stop!
fi

 
    

4 資料採集通道的啟動

onekeyboot

#!/bin/bash
#輸入start和stop引數,一鍵啟動或關閉hadoop,zk,kafka叢集,啟動f1,f2採集通道
if(($#!=1))
then 
    echo 請輸入start或stop!
    exit;
fi

#編寫函式,這個函式的功能為返回叢集中啟動成功的broker的數量
function countKafkaBrokders()
{
    count=0
    for((i=102;i<=104;i++))
    do
        result=$(ssh hadoop$i "jps | grep Kafka | wc -l")
        count=$[$result+$count]
    done

    #函式可以定義返回值,如果不定義,返回函式最後一條命令的執行狀態(返回0,代表成功,非0,即為異常)
    return $count
    

}

#啟動,注意啟動時,各個元件的依賴關係,例如zk必須先於kafka啟動,後於kafka關閉
if [ $1 = start ]
then
    zk start
    hd start
    kf start
    #保證kafka叢集已經啟動時,才能啟動f1,f2,判斷當前kafka叢集啟動了多少 broker例項
    while [ 1 ]
    do
        countKafkaBrokders
        #如果返回值不為3,有可能是機器還尚未執行broker的啟動命令,因此繼續判斷
            if(($?==3))
          then
            break
        fi
        sleep 2s
    done

 
    f1 start
    f2 start
    #檢視啟動了哪些程序
    xcall jps

    elif [ $1 = stop ]
    then
        f1 stop
        f2 stop
        kf stop
        #在kafka沒有停止完成之前,不能停止zk叢集
        while [ 1 ]
            do
                    countKafkaBrokders
                        #如果返回值不為0,kafka叢集沒有停止完成
                        if(($?==0))
                      then
                                break
                        fi
                        sleep 2s
            done

        zk stop
        hd stop
        #檢視還剩了哪些程序
            xcall jps
    else
        echo 請輸入start或stop!
fi