1. 程式人生 > 其它 >資料採集的flume架構

資料採集的flume架構

測試1:

新建一個flume1.conf檔案

name

a1.sources = r1
a1.channels = c1
a1.sinks = k1

source

a1.sources.r1.type = netcat
a1.sources.r1.bind = DAQ102
a1.sources.r1.port = 6666

channel

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = atguigu
a1.sinks.k1.kafka.bootstrap.servers = DAQ102:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.useFlumeEventFormat = false

bind

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

測試2:
使用攔截器在header中加入topic,將不同的資料新增到不同的topic中

攔截器程式碼:

package com.hybg.daq;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Flume interceptor that tags every event with a "topic" header.
 *
 * The header value is chosen from the event body text: bodies containing
 * "atguigu" get "atguigu", bodies containing "shangguigu" get "shangguigu",
 * and everything else gets "other".
 */
public class MyInterceptor implements Interceptor {

    /** No resources to set up. */
    @Override
    public void initialize() {
    }

    /**
     * Adds the "topic" header to a single event, based on its body.
     *
     * @param event the event to tag; it is modified in place
     * @return the same event instance
     */
    @Override
    public Event intercept(Event event) {
        // Fetch the header map first, then decode the body as UTF-8 text.
        Map<String, String> headerMap = event.getHeaders();
        String bodyText = new String(event.getBody(), StandardCharsets.UTF_8);

        headerMap.put("topic", resolveTopic(bodyText));
        return event;
    }

    /**
     * Maps body text to the topic name stored in the header.
     * "atguigu" is checked before "shangguigu", so it wins if both appear.
     */
    private static String resolveTopic(String bodyText) {
        if (bodyText.contains("atguigu")) {
            return "atguigu";
        }
        if (bodyText.contains("shangguigu")) {
            return "shangguigu";
        }
        return "other";
    }

    /**
     * Tags every event of the batch in place.
     *
     * @param list the batch of events
     * @return the same list instance
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        list.forEach(single -> intercept(single));
        return list;
    }

    /** No resources to release. */
    @Override
    public void close() {
    }

    /** Builder referenced from the agent configuration to create the interceptor. */
    public static class MyBuilder implements Builder {

        /** Creates a new interceptor instance. */
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        /** This interceptor reads no settings from the context. */
        @Override
        public void configure(Context context) {
        }
    }

}

name

a1.sources = r1
a1.channels = c1
a1.sinks = k1

source

a1.sources.r1.type = netcat
a1.sources.r1.bind = DAQ102
a1.sources.r1.port = 4444

配置攔截器

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hybg.daq.MyInterceptor$MyBuilder

channel

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = other
a1.sinks.k1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092,DAQ104:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.useFlumeEventFormat = false

bind

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

測試3:

name

a1.sources = r1
a1.channels = c1
a1.sinks = k1

source

# Kafka source settings. The component name must be r1, matching the
# declaration "a1.sources = r1" and the binding "a1.sources.r1.channels = c1";
# properties keyed under any other name are not applied to r1.
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092,DAQ104:9092
a1.sources.r1.kafka.topics = atguigu,shangguigu,other
a1.sources.r1.kafka.consumer.group.id = customs

channel

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

sinks

a1.sinks.k1.type = logger

bind

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

①三個元件都有

name

a1.sources = r1
a1.channels = c1
a1.sinks=k1

sources

a1.sources.r1.type = netcat
a1.sources.r1.bind = DAQ102
a1.sources.r1.port = 5555

channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092,DAQ104:9092

a1.channels.c1.kafka.topic = atguigu

a1.channels.c1.parseAsFlumeEvent = false

sink

a1.sinks.k1.type = logger

bind

# Wire source r1 and sink k1 to channel c1 (the declared sink is k1, not r1).
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

②有source有channel

name

a1.sources = r1
a1.channels = c1

sources

a1.sources.r1.type = netcat
a1.sources.r1.bind = DAQ102
a1.sources.r1.port = 5555

channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092,DAQ104:9092
a1.channels.c1.kafka.topic = atguigu
a1.channels.c1.parseAsFlumeEvent = false

bind

a1.sources.r1.channels = c1

③有sink有channel

name

a1.channels = c1
a1.sinks=k1

channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092,DAQ104:9092

a1.channels.c1.kafka.topic = atguigu

a1.channels.c1.parseAsFlumeEvent = false

sink

a1.sinks.k1.type = logger

bind

# Wire sink k1 to channel c1 (the declared sink is k1, not r1).
a1.sinks.k1.channel = c1

資料採集模組:

第一層flume:

name

a1.sources = r1
a1.channels = c1

sources

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1

將資料夾中所有的app的檔案都進行讀取

a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.batchSize = 1000

設定斷點續傳的檔案儲存位置

a1.sources.r1.positionFile =/opt/module/flume-1.9.0/position/taildir_position.json

攔截器

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hybg.daq.TaildirInterceptor$MyBuilder

channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

bind

a1.sources.r1.channels = c1

第二層flume:

name

a2.sources = r1
a2.channels = c1
a2.sinks = k1

source

a2.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a2.sources.r1.batchSize = 1000
a2.sources.r1.batchDurationMillis = 2000
a2.sources.r1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092
a2.sources.r1.kafka.topics = topic_log
a2.sources.r1.useFlumeEventFormat = false

攔截器

設定攔截器後可能會導致錯誤,出現timestamp不能讀取的問題

a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.hybg.daq.timeInterceptor$MyBuilder

channel

a2.channels.c1.type = file

檔案通道(file channel)的資料目錄與檢查點設定

a2.channels.c1.dataDirs = /opt/module/flume-1.9.0/jobs/filechannel
a2.channels.c1.capacity = 1000000
a2.channels.c1.transactionCapacity = 10000
a2.channels.c1.checkpointDir = /opt/module/flume-1.9.0/jobs/checkpointdir
a2.channels.c1.keep-alive = 3

# Dual checkpointing for the file channel of agent a2 (keys must use the a2
# prefix and contain no spaces, otherwise this agent never sees them).
# NOTE(review): /otherdiskdir looks like a placeholder - replace it with a real
# directory that differs from checkpointDir and dataDirs before enabling.
a2.channels.c1.useDualCheckpoints = true

a2.channels.c1.backupCheckpointDir = /otherdiskdir

sinks

a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a2.sinks.k1.hdfs.filePrefix = log-
a2.sinks.k1.hdfs.round = false
a2.sinks.k1.hdfs.roundValue = 10
a2.sinks.k1.hdfs.rollSize = 134217728
a2.sinks.k1.hdfs.rollCount = 0

控制輸出檔案為lzop壓縮流(CompressedStream),而非原生檔案。

a2.sinks.k1.hdfs.fileType = CompressedStream
a2.sinks.k1.hdfs.codeC = lzop

bind

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

啟動指令碼2:

#!/bin/bash
# Start or stop the second-tier Flume agent (a2, config flume_2.conf) on host
# DAQ104 over ssh.
#
# Usage: <this script> {start|stop}

# Require an action argument; exit non-zero so callers can detect misuse.
if [ $# -lt 1 ]
then
    echo "<<<<<<<<<<<<<<<<<<<<輸入有效引數>>>>>>>>>>>>>>>>>>>>"
    echo "{start,stop}"
    exit 1
fi

case $1 in
"start")
    echo "<<<<<<<<<<<<<<<<<<<<向HDFS傳遞資料>>>>>>>>>>>>>>>>>>>>"
    # $FLUME_HOME is expanded by the local shell before ssh runs.
    # Output goes to a log FILE (redirecting onto the directory itself fails);
    # nohup keeps the agent alive after the ssh session closes.
    ssh DAQ104 "nohup flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume_2.conf -n a2 -Dflume.root.logger=INFO,console >/opt/module/flume-1.9.0/flume_2.log 2>&1 &"
;;
"stop")
    echo "<<<<<<<<<<<<<<<<<<<<停止向HDFS傳遞資料>>>>>>>>>>>>>>>>>>>>"
    # \$2 is escaped so awk on the remote host receives the literal $2 (the PID
    # column) instead of this script's own second positional parameter.
    ssh DAQ104 "ps -ef | grep flume_2.conf | grep -v grep | awk '{print \$2}' | xargs kill -9"
;;
*)
    echo "<<<<<<<<<<<<<<<<<<<<引數錯誤>>>>>>>>>>>>>>>>>>>>"
;;
esac
————————————————
版權宣告:本文為CSDN博主「海洋餅乾1126」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處連結及本宣告。
原文連結:https://blog.csdn.net/weixin_44868211/article/details/118104017