基於Docker的Kafka+Flume+flink日誌處理實驗
目錄
目錄
一、序言
實驗用到的元件有:docker、kafka、kafka-manager、zookeeper、flume;由於資源限制使用docker下安裝kafka和zookeeper,在試驗機上直接安裝flume和kafka-manager。
實驗內容:1.本地產生日誌資料,通過log4j將日誌收集到flume中,flume將資料sink到kafka中;2.flume從kafka中獲取資料然後列印到控制檯中;(或者使用flink從kafka中拿到資料,新增標識欄位後重新放入kafka另一個topic中,注稍後補全這部分)
實驗目的:通過實驗學習到docker安裝、使用、kafka操作、flume操作以及部署工作;
所需maven依賴包
<!-- kafka start --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.1</version> </dependency> <!-- kafka end --> <!--flume--> <dependency> <groupId>org.apache.flume.flume-ng-clients</groupId> <artifactId>flume-ng-log4jappender</artifactId> <version>1.7.0</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-configuration</artifactId> <version>1.7.0</version> </dependency> <!--flink--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.5.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.9 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.9_2.11</artifactId> <version>1.5.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.5.0</version> </dependency> <!-- log4j start --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version> </dependency> <!-- log4j end --> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency>
二、環境準備
2.1 docker環境準備
實驗機環境為centos7
2.2 安裝zookeeper、kafka、kafka-manager環境
2.2.1 zookeeper
使用docker search zookeeper命令獲取資源庫zookeeper列表
下載首先pull獲取 wurstmeister的zookeeper
啟動zookeeper:docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
2.2.2 kafka
使用docker search zookeeper命令獲取資源庫kafka列表
下載首先pull獲取 wurstmeister的kafka
啟動kafka:
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.83.112:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.83.112:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
解釋:
KAFKA_BROKER_ID=0 //broker id,如果想要啟動多個就執行多次命令保證 id不相同就行了
KAFKA_ZOOKEEPER_CONNECT=192.168.83.112:2181 //外界連線kafka所需
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.83.112:9092 //外界連線kafka所需,地址是宿主機地址
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
2.2.3 kafka-manager
安裝在宿主機上,沒有安裝到docker中,因為docker中的映象存在元件缺失
從 GitHub 下載後編譯,編譯方法自行百度,或者直接下載已經編譯好的
連結:https://pan.baidu.com/s/1zmhG6-eP_0RsGDxvcEMzyw
密碼:sc8w
解壓到最終安裝位置,然後配置兩項
kafka-manager.zkhosts="192.168.83.112:2181
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "INFO"
logger-startup-timeout = 30s
}
然後執行啟動命令
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &
2.3 安裝flume
從官網下載檔案,解壓後上傳到最終安裝目錄,
配置java路徑到flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
2.4 安裝flink
從官網下載檔案,解壓上傳到最終目錄
因為本次試驗使用的是單機版的,因此直接在bin目錄下執行.start-cluster.sh即可,web埠為8081
至此,我們的環境已經全部準備好了
三、程式開發
3.1.程式生成日誌到flume
log4j配置:
log4j.rootLogger=INFO,flume,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern="%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.83.112
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n
迴圈生成日誌
import java.util.Date;
import org.apache.log4j.Logger;
public class WriteLog {
private static Logger logger = Logger.getLogger(WriteLog.class);
public static void main(String[] args) throws InterruptedException {
// 記錄debug級別的資訊
logger.debug("This is debug message.");
// 記錄info級別的資訊
logger.info("This is info message.");
// 記錄error級別的資訊
logger.error("This is error message.");
int i = 0;
while (true) {
logger.info(new Date().getTime());
logger.info("測試資料" + i);
Thread.sleep(2000);
i += 1;
}
}
}
在flume中的conf,複製模板為example.conf
書寫配置:
# Name the components on this agent
# 定義一個 agent 的元素
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 配置 source
#使用avro接收log4j過來的資料
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Describe the sink
# 配置 sink
#a1.sinks.k1.type = logger
#將資料寫入kafka,設定topic和brokers地址
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = 192.168.83.112:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
# Use a channel which buffers events in memory
# 定義 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 用 channel 連線起來 source 和 sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
執行flume命令
flume-ng agent -c /opt/soft/apache-flume-1.8.0-bin/conf -f example.conf --name a1 -Dflume.root.logger=INFO,console
啟動java程式
3.2程式獲取kafka中的資料
java程式:
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
/**
* Created by anan on 2018-7-31 14:20.
*/
public class CustomerSource extends AbstractSink implements Configurable {
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to
// do
Event event = ch.take();
// Send the Event to the external repository.
// storeSomeData(e);
String eventBody = new String(event.getBody(), "utf-8");
System.out.println("============= " + eventBody + " ========");
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error) t;
}
}
// you must add this line of code in order to close the Transaction.
txn.close();
return status;
}
@Override
public void configure(Context context) {
}
@Override
public synchronized void start() {
super.start();
}
@Override
public synchronized void stop() {
super.stop();
}
}
新建flume conf test.conf
#soource的名字
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
agent.sources.kafkaSource.channels = memoryChannel
agent.sinks.hdfsSink.channel = memoryChannel
#-------- kafkaSource相關配置-----------------
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect =192.168.83.112:2181
# 配置消費的kafka topic
agent.sources.kafkaSource.topic = test
# 配置消費者組的id
agent.sources.kafkaSource.groupId = flume
# 消費超時時間,參照如下寫法可以配置其他所有kafka的consumer選項。注意格式從kafka.xxx開始是consumer的配置屬性
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100
#------- memoryChannel相關配置-------------------------
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000
#---------hdfsSink 相關配置------------------
agent.sinks.hdfsSink.type = com.gd.bigdataleran.flume.customerSource.CustomerSource
執行flume命令
flume-ng agent -c /opt/soft/apache-flume-1.8.0-bin/conf -f test.conf --name agent -Dflume.root.logger=INFO,console
然後檢視控制檯是否列印生成的日誌。
flink接收
如果想要flink接收kafka資料然後將資料經過簡單處理後放到kafka,就需要使用到flinkkafkaconsumer和flinkkafkaproductor
,java程式碼如下
package com.gd.bigdataleran.flink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Created by anan on 2018-8-3 15:44.
*/
public class kafkaconsumer {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // 非常關鍵,一定要設定啟動檢查點!!
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.83.112:9092");
// properties.setProperty("zookeeper.connect", "192.168.83.112:2181");
properties.setProperty("group.id", "test112");
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties);
myConsumer.setStartFromEarliest();
System.out.println("執行輸入");
DataStream<String> stream = env.addSource(myConsumer);
DataStream ds = stream.map(new MapFunction<String, Object>() {
@Override
public Object map(String s) throws Exception {
return s + "==" + new Date().getTime();
}
});
FlinkKafkaProducer09<String> flinkKafkaProducer09 = new FlinkKafkaProducer09<String>("192.168.83.112:9092","test1",new SimpleStringSchema());
ds.addSink(flinkKafkaProducer09);
System.out.println("執行輸出");
env.execute();
}
}
將需要的jar包上傳到flink目錄下的lib目錄下,然後執行flink命令即可
flume-ng agent -c /opt/soft/apache-flume-1.8.0-bin/conf -f example.conf --name a1 -Dflume.root.logger=INFO,console
驗證資料
使用java連線kafka,獲取topic中的資料進行驗證;驗證程式碼 如下
private void getKafkaData() {
String topic = "test1";
Properties kafkaProps = new Properties();
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("bootstrap.servers", "192.168.83.112:9092");
kafkaProps.put("zookeeper.connect", "192.168.83.112:2181");
kafkaProps.put("group.id", "farmtest1");
kafkaProps.put("auto.offset.reset", "smallest");
kafkaProps.put("enable.auto.commit", "true");
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(kafkaProps));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1); // 一次從主題中獲取一個數據
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 獲取每次接收到的這個資料
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while (iterator.hasNext()) {
String message = new String(iterator.next().message());
consumer.commitOffsets();
System.out.println(message);
}
}
時間差匆忙,有問題歡迎大家提問討論
相關推薦
基於Docker的Kafka+Flume+flink日誌處理實驗
目錄 目錄 一、序言 驗證資料 一、序言 實驗用到的元件有:docker、kafka、kafka-manager、zookeeper、flume;由於資源限制使用docker下安裝kafka和zookeeper,在試驗
基於SparkStreaming對銀行日誌處理系列--整體技術框架
基於SparkStreaming對銀行日誌分析,實時技術架構圖 通過flume實時採集原日誌,送到kafka快取,SparkStreaming準實時從kafka拿資料,經過ETL、聚合計算送到redis,供前端展示,具體技術及程式碼見後面部落格; 除了實時部分,還有離線這一塊,技術框架如下: 前面都一樣,
基於flink和drools的實時日誌處理
1、背景 日誌系統接入的日誌種類多、格式複雜多樣,主流的有以下幾種日誌: filebeat採集到的文字日誌,格式多樣 winbeat採集到的作業系統日誌 裝置上報到logstash的syslog日誌 接入到kafka的業務日誌 以上通過各種渠道接入的日誌,存在2個主要的問題: 格式不統一、不規範、標準化
Flink視頻教程_基於Flink流處理的動態實時電商實時分析系統
分布 業務 電商分析 apr 進行 處理 密碼 教程 包括 Flink視頻教程_基於Flink流處理的動態實時電商實時分析系統 課程分享地址鏈接:https://pan.baidu.com/s/1cX7O-45y6yUPT4B-ACfliA 密碼:jqmk 在開始學習前給
基於flume的日誌收集系統配置
大資料系統中通常需要採集的日誌有: 系統訪問日誌 使用者點選日誌 其他業務日誌(比如推薦系統的點選日誌) 在收集日誌的時候,一般分為三層結構:採集層、彙總層和儲存層,而不是直接從採集端將資料傳送到儲存端,這樣的好處有: 如果儲存端如Hadoop叢集、Kafka等需要停
springboot2.0---05、基於AOP日誌處理
@Aspect @Component public class LogAop { private final Logger log = LoggerFactory.getLogger(LogA
基於Apache Flume Datahub外掛將日誌資料同步上雲
本文用到的 簡介 Apache Flume是一個分散式的、可靠的、可用的系統,可用於從不同的資料來源中高效地收集、聚合和移動海量日誌資料到集中式資料儲存系統,支援多種Source和Sink外掛。本文將介紹如何使用Apache Flume的Datahub Sink外掛將日誌
基於Flink流處理的動態實時電商實時分析系統
在開始學習前給大家說下什麼是Flink? 1.Flink是一個針對流資料和批資料的分散式處理引擎,主要用Java程式碼實現。 2.Apache Flink作為Apache的頂級專案,Flink集眾多優點於一身,包括快速、可靠可擴充套件、完全相容Hadoop、使
基於flume的日誌系統
思路 日誌統一輸出至kafka flume agent充當kafka消費者,將日誌輸出至elasticsearch kibana負責展示日誌資訊 準備工作 flume 1.8 kafka 1.1.0 elasticsearch&kibana 6.5.4 專案中一般
雲星資料---Apache Flink實戰系列(精品版)】:Flink流處理API詳解與程式設計實戰002-Flink基於流的wordcount示例002
三、基於socket的wordcount 1.傳送資料 1.傳送資料命令 nc -lk 9999 2.傳送資料內容 good good study day day
基於Flink流處理的動態實時億級全端用戶畫像系統
發展 完美 企業應用 應用 消費 spring 服務架構 習慣 link 基於Flink流處理的動態實時億級全端用戶畫像系統網盤地址:https://pan.baidu.com/s/1t_VNw9I6ML1o-jSqVlhBJQ 提取碼: 3t5q備用地址(騰訊微雲):ht
基於Heka+Flume+Kafka+ELK的日誌系統
前期準備 系統是centos6.6,64位機器。 所用軟體版本: Logstash:2.3.3 JDK:1.8.0_25 Elasticsearch:2.3.4 Kibana:4.5.2 Heka:0.10.0 Flume:1.7.0 Zookeeper:3.4
Flume、Kafka與Storm實現日誌處理
1. ZooKeeper 安裝參考 2. Kafka 2.1 解壓安裝 # 確保scala已經安裝好,本文安裝的是2.11.7 tar -xf kafka_2.11-0.9.0.1.tgz cd kafka_2.11-0.9.0.1 mkdi
【Spark深入學習 -10】基於spark構建企業級流處理系統
變現 大內存 空間換時間 detail python 訪問量 新版本 kafak 計算框架 ----本節內容------- 1.流式處理系統背景 1.1 技術背景 1.2 Spark技術很火 2.流式處理技術介紹 2.1流式處理技術概念 2.
學習筆記17_網站異常和日誌處理
add asa asp 日誌類 cte clob rect string 學習 *在clobal.asax中,寫protected void Application_Error() { Respone.Redirect("Default.aspx");//發生錯誤能在此
人工智能會促成一張基於物聯網可自主處理的數據互聯網絡
雲計算 物聯網 邊緣計算 移動互聯 雲計算——信息和數據爆炸的自然產物互聯網發展至今,總結下來大概經歷了這麽三個歷史時期或階段,分別是IT時代、ICT時代、DT時代。IT時代可以理解為桌面時代,那個時候網絡剛起步發展,人們只是通過桌面機進行簡單的信息展示或者獲取,雖然很多設備當時都處於脫
controller層統一攔截進行日誌處理
uuid final asp end sna ogg long lan tostring 前言 在項目中添加統一日誌時,我們往往會用到aop進行切面處理,常用在controller層添加切面,以處理請求和返回的各項參數數據。 使用切面進行日誌處理 下面我們就看一個例子說明基
實現基於MySQL管理rsyslog日誌
mysql 管理 rsyslog 實驗概要: 本實驗主機默認采用系統為Centos 6.9 host1主機作為遠程日誌客戶端 安裝:rsyslog (默認安裝)、rsyslog-mysql host2主機作為日誌服務器 安裝:mysql(默認安裝,也可安裝
基於LAMP實現web日誌管理查看
日誌 web 管理 前言:日誌是一個重要的信息庫,如何高效便捷的查看系統中的日誌信息,是系統管理員管理系統的必備的技術。實現方式: 1、將日誌存儲於數據庫。 2、采用LAMP架構,搭建PHP應用,通過web服務訪問數據庫,提取日誌信息,展現到web頁面。基本結構圖:一、搭建日誌服務器
Python 日誌處理(二) 使用正則表達式處理Nginx 日誌
表示 cnblogs sunday sta return __main__ pattern eth 解析 使用正則表達式來處理Nginx 日誌 一、 先對單行的日誌進行分組正則匹配,返回匹配後的結果(字典格式): from datetime import dateti