1. 程式人生 > >編寫flume-ng擴充套件提升吞吐

編寫flume-ng擴充套件提升吞吐

地址:https://www.tuicool.com/articles/i2UnYbY

最近在公司做openresty+flume+kafka的前端日誌採集,在測試flume時發現向kafka傳輸的頻寬吞吐才20MB/s,遠遠無法滿足需求。

找到瓶頸

不可盲目

盲目優化是很浪費時間的,一開始只能調調flume引數,改改batchSize之類的引數,結果無功而返。

最終無奈,決定靜心看一下flume的架構原理。

瞭解架構

flume的流水線上,包括source採集磁碟日誌,channel快取採集的日誌,sink將日誌發往遠端,按道理我需要分析出到底哪個環節拖慢了整體的流水線頻寬。

對於流水線系統來說,靠後的環節慢,則會導致之前的環節全部變慢,因為流水線擁塞了。

比如sink處理慢,就導致channel填滿,channel填滿就導致source停止採集。

懷疑的點

source只是簡單的TAILDIR模式,採集目錄下的增量日誌,順序讀磁碟的頻寬遠不止20MB/s,所以我對這個環節的懷疑是最輕的。

channel我採用了memory channel,因為source和sink各只有1個執行緒,鎖競爭不至於成為瓶頸。

sink採用了kafka sink,採用阻塞模型,傳送完一批才會從channel取下一批。

kafka叢集是否存在效能問題呢?為了驗證這一點,我配置了2個channel,讓source採用repliacating模式複製2份流量,併為2個channel各配置1個kafka sink,發現頻寬就是40MB/s了,說明kafka不是問題。

資料說話

效能調優不能靠猜,最好有資料為證,啟動flume時可以配置開啟metrics埠,這是一個http介面,可以查詢實時flume效能指標。

啟動時指定引數:

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

檢視實時指標:

curl localhost:34545/metrics

返回JSON如下:

{
 "SINK.k1": {
 "ConnectionCreatedCount": "0",
 "BatchCompleteCount": "0",
 "BatchEmptyCount": "257",
 "EventDrainAttemptCount"
: "0", "StartTime": "1519711797903", "BatchUnderflowCount": "3", "ConnectionFailedCount": "0", "ConnectionClosedCount": "0", "Type": "SINK", "RollbackCount": "0", "EventDrainSuccessCount": "40713702", "KafkaEventSendTimer": "1545475", "StopTime": "0" }, "SINK.k2": { "ConnectionCreatedCount": "0", "BatchCompleteCount": "0", "BatchEmptyCount": "256", "EventDrainAttemptCount": "0", "StartTime": "1519711799103", "BatchUnderflowCount": "3", "ConnectionFailedCount": "0", "ConnectionClosedCount": "0", "Type": "SINK", "RollbackCount": "0", "EventDrainSuccessCount": "40713702", "KafkaEventSendTimer": "1555840", "StopTime": "0" }, "SINK.k3": { "ConnectionCreatedCount": "0", "BatchCompleteCount": "0", "BatchEmptyCount": "256", "EventDrainAttemptCount": "0", "StartTime": "1519711799335", "BatchUnderflowCount": "3", "ConnectionFailedCount": "0", "ConnectionClosedCount": "0", "Type": "SINK", "RollbackCount": "0", "EventDrainSuccessCount": "40713702", "KafkaEventSendTimer": "1556911", "StopTime": "0" }, "CHANNEL.c3": { "ChannelCapacity": "1000000", "ChannelFillPercentage": "0.0", "Type": "CHANNEL", "EventTakeSuccessCount": "40713702", "ChannelSize": "0", "EventTakeAttemptCount": "40713962", "StartTime": "1519711796033", "EventPutAttemptCount": "40713702", "EventPutSuccessCount": "40713702", "StopTime": "0" }, "CHANNEL.c2": { "ChannelCapacity": "1000000", "ChannelFillPercentage": "0.0", "Type": "CHANNEL", "EventTakeSuccessCount": "40713702", "ChannelSize": "0", "EventTakeAttemptCount": "40713962", "StartTime": "1519711796033", "EventPutAttemptCount": "40713702", "EventPutSuccessCount": "40713702", "StopTime": "0" }, "CHANNEL.c1": { "ChannelCapacity": "1000000", "ChannelFillPercentage": "0.0", "Type": "CHANNEL", "EventTakeSuccessCount": "40713702", "ChannelSize": "0", "EventTakeAttemptCount": "40713963", "StartTime": "1519711796033", "EventPutAttemptCount": "40713702", "EventPutSuccessCount": "40713702", "StopTime": "0" }, "SOURCE.src_taildir": { "EventReceivedCount": "122141106", "AppendBatchAcceptedCount": "1566", "Type": "SOURCE", "AppendReceivedCount": "0", "EventAcceptedCount": "122141106", "StartTime": "1519711796541", "AppendAcceptedCount": "0", "OpenConnectionCount": "0", "AppendBatchReceivedCount": "1566", "StopTime": "0" } }

主要觀察channel的填充率ChannelFillPercentage,如果接近100%說明佇列無法及時被消費,瓶頸在sink端。(上述json是我優化後的,發現channel基本為空)。

優化

既然瓶頸是同步阻塞推送kafka的sink端,那麼顯然增加sink的數量就可以加快channel消費。

一開始我未經瞭解,直接使用了sink gourp為channel配置了2個kafka sink,採用load balance來分發流量,結果發現仍舊20MB/s,毫無提升。

經過思考得知,sink group仍舊採用單執行緒工作,只是充當了2個kafka sink的代理而已,日誌輪轉的被交給2個kafka sink物件,即2個kafka sink物件在同一個執行緒裡交替被呼叫,根本沒有並行能力。

照著這個優化方向,我期望可以配置多個channel,每個channel一個sink執行緒,並讓source將流量均勻的派發給2個channel,從而實現多執行緒併發。

/**
 * 背景: Kafka Sink 單執行緒同步呼叫,吞吐無法繼續提升。
 *
 * 解決方案:自定義實現channel selector, 將source的流量均勻分發到多個channel, 並讓每個channel由一個獨立的kafka sink消費
 *
 * 配置:為source指定selector.type=org.apache.flume.channel.RRChannelSelector
 *
 */
public class RRChannelSelector extends AbstractChannelSelector {
  private static final List<Channel> EMPTY_LIST = new ArrayList<>();
 
  private int rrIndex = 0;
 
  @Override
  public List<Channel> getRequiredChannels(Event event) {
    List<Channel> allChannels = getAllChannels();
 
    int index = rrIndex;
    rrIndex = (rrIndex + 1) % allChannels.size();
 
    List<Channel> result = new ArrayList<>();
    result.add(allChannels.get(index));
    return result;
  }
 
  @Override
  public List<Channel> getOptionalChannels(Event event) {
    return RRChannelSelector.EMPTY_LIST;
  }
 
  @Override
  public void configure(Context context) {
 
  }
}

flume-ng-round-robin-channel-selector

用途

flume-ng預設source -> channel -> sink的流水線配置,會受到sink處理速率的影響,吞吐無法線性提升。

本外掛通過實現自定義channel selector,實現了source均勻派發流量到多個channel,從而可以為每個channel配備一個獨立的sink(執行緒),從而實現吞吐線性提升。

編譯方法

  • 將RRChannelSelector.java檔案拷貝到原始碼子路徑:flume-ng-core/src/main/java/org/apache/flume/channel
  • 回到flume原始碼根目錄,編譯整個專案:mvn clean install -DskipTests
  • 拷貝flume-ng-core/target/flume-ng-core-1.8.0.jar(其中1.8.0是你flume版本)到線上flume環境的lib目錄下覆蓋對應檔案

配置方法

為source指定selector.type,例如:

agent.sources.src_taildir.selector.type = org.apache.flume.channel.RRChannelSelector

為source指定多個channel,例如:

agent.sources.src_taildir.channels = c1 c2 c3

為每個channel指定一個sink,例如:

agent.sinks.k1.channel = c1 agent.sinks.k2.channel = c2 agent.sinks.k3.channel = c3

我們實現一個自定義的Channel Selector類,然後重新編譯flume生成新的jar包,覆蓋到flume即可。

下面是我最終的配置,我配置了3個channel,3個kafka sink,1個source,並配置source的channel selector為我的外掛,從而可以將流量輪轉的發給每一個channel:

# describe the agent
agent_zdm1.sources=src_taildir
agent_zdm1.sinks=k1 k2 k3
agent_zdm1.channels=c1 c2 c3
 
# Describe/configure the source
agent_zdm1.sources.src_taildir.type = TAILDIR
agent_zdm1.sources.src_taildir.positionFile = /root/log-analyze/taildir/taildir_position.json
agent_zdm1.sources.src_taildir.filegroups = f1
agent_zdm1.sources.src_taildir.filegroups.f1 = /data/logs/collect/.*
agent_zdm1.sources.src_taildir.batchSize = 100000
agent_zdm1.sources.src_taildir.selector.type = org.apache.flume.channel.RRChannelSelector
 
#agent_zdm1.sources.src_taildir.backoffSleepIncrement = 2
#agent_zdm1.sources.src_taildir.maxBackoffSleep = 10
 
# Describe the sink
agent_zdm1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent_zdm1.sinks.k1.kafka.topic = analytics-zcollect
agent_zdm1.sinks.k1.kafka.bootstrap.servers = localhost:9092
agent_zdm1.sinks.k1.kafka.flumeBatchSize = 1000
agent_zdm1.sinks.k1.kafka.producer.acks = 0
agent_zdm1.sinks.k1.kafka.producer.linger.ms = 100
agent_zdm1.sinks.k1.kafka.producer.batch.size = 100000
 
agent_zdm1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent_zdm1.sinks.k2.kafka.topic = analytics-zcollect
agent_zdm1.sinks.k2.kafka.bootstrap.servers = localhost:9092
agent_zdm1.sinks.k2.kafka.flumeBatchSize = 1000
agent_zdm1.sinks.k2.kafka.producer.acks = 0
agent_zdm1.sinks.k2.kafka.producer.linger.ms = 500
agent_zdm1.sinks.k2.kafka.producer.batch.size = 100000
 
agent_zdm1.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink
agent_zdm1.sinks.k3.kafka.topic = analytics-zcollect
agent_zdm1.sinks.k3.kafka.bootstrap.servers = localhost:9092
agent_zdm1.sinks.k3.kafka.flumeBatchSize = 1000
agent_zdm1.sinks.k3.kafka.producer.acks = 0
agent_zdm1.sinks.k3.kafka.producer.linger.ms = 500
agent_zdm1.sinks.k3.kafka.producer.batch.size = 100000
 
# Use a channel which buffers events in memory
agent_zdm1.channels.c1.type = memory
agent_zdm1.channels.c1.capacity = 1000000
agent_zdm1.channels.c1.transactionCapacity = 100000
 
agent_zdm1.channels.c2.type = memory
agent_zdm1.channels.c2.capacity = 1000000
agent_zdm1.channels.c2.transactionCapacity = 100000
 
agent_zdm1.channels.c3.type = memory
agent_zdm1.channels.c3.capacity = 1000000
agent_zdm1.channels.c3.transactionCapacity = 100000
 
# Bind the source and sink to the channel
agent_zdm1.sources.src_taildir.channels = c1 c2 c3
 
agent_zdm1.sinks.k1.channel = c1
agent_zdm1.sinks.k2.channel = c2
agent_zdm1.sinks.k3.channel = c3

batchSize等引數當然具有一定的意義,但是僅用於優化單個pipeline(流水線),要實現線性擴充套件是需要線上程擴充套件性方面做上述優化工作的。

成果

經過優化,在4核的伺服器上執行4個openresty+1個flume程序,仍舊可以跑出40MB/s的網絡卡外出流量,日誌採集無延遲,達到了我預期中的效果。


相關推薦

編寫flume-ng擴充套件提升吞吐

地址:https://www.tuicool.com/articles/i2UnYbY最近在公司做openresty+flume+kafka的前端日誌採集,在測試flume時發現向kafka傳輸的頻寬吞吐才20MB/s,遠遠無法滿足需求。找到瓶頸不可盲目盲目優化是很浪費時間的

【Java】【FlumeFlume-NG啟動過程源代碼分析(一)

code extends fix tar top 依據 oid article gif 從bin/flume 這個shell腳本能夠看到Flume的起始於org.apache.flume.node.Application類,這是flume的main函數所在。   m

Flume 學習筆記之 Flume NG高可用集群搭建

哈哈 process bind under hdf ora chan lsp max Flume NG高可用集群搭建: 架構總圖: 架構分配: 角色 Host 端口 agent1 hadoop3 52020 collect

高可用flume-ng搭建

flume一、概述1.通過搭建高可用flume來實現對數據的收集並存儲到hdfs上,架構圖如下:二、配置Agent1.cat flume-client.properties#name the components on this agent 聲明source、channel、sink的名稱 a1.sou

flume ng簡介

targe flume aliyun article con https log com spm https://yq.aliyun.com/articles/50487?spm=5176.100239.blogcont43566.18.uawbnYflume ng簡介

Flume NG原始碼分析(七)ChannelSelector

前幾篇介紹了Flume NG Source元件的基本情況,接下來看看Channel相關的元件,Channel相關元件有: 1. Channel 2. ChannelSelector 3. Interceptor / InterceptorChain 4. ChannelProcess

Flume NG原始碼分析(六)應用程式使用的RpcClient設計

上一篇Flume NG原始碼分析(五)使用ThriftSource通過RPC方式收集日誌 介紹了ThriftSource利用Thrfit服務ThriftSourceProtocol來收集日誌。這篇說說flume-ng-sdk中提供給應用層序使用的RpcClient的設計和實現。繼續使用ThriftR

Flume NG原始碼分析(五)使用ThriftSource通過RPC方式收集日誌

上一篇說了利用ExecSource從本地日誌檔案非同步的收集日誌,這篇說說採用RPC方式同步收集日誌的方式。筆者對Thrift比較熟悉,所以用ThriftSource來介紹RPC的日誌收集方式。 整體的結構圖如下: 1. ThriftSource包含了一個Thrift Server,以及一個

Flume NG原始碼分析(四)使用ExecSource從本地日誌檔案中收集日誌

常見的日誌收集方式有兩種,一種是經由本地日誌檔案做媒介,非同步地傳送到遠端日誌倉庫,一種是基於RPC方式的同步日誌收集,直接傳送到遠端日誌倉庫。這篇講講Flume NG如何從本地日誌檔案中收集日誌。 ExecSource是用來執行本地shell命令,並把本地日誌檔案中的資料封裝成Event

Flume NG原始碼分析(三)使用Event介面表示資料流

Flume NG有4個主要的元件: Event表示在Flume各個Agent之間傳遞的資料流 Source表示從外部源接收Event資料流,然後傳遞給Channel Channel表示對從Source傳遞的Event資料流的臨時儲存 Sink表示從Channel中接收儲存的Event

Flume NG原始碼分析(二)支援執行時動態修改配置的配置模組

在上一篇中講了Flume NG配置模組基本的介面的類,PropertiesConfigurationProvider提供了基於properties配置檔案的靜態配置的能力,這篇細說一下PollingPropertiesFileConfigurationProvider提供的執行時動態修改配置並生效的

Flume NG原始碼分析(一)基於靜態properties檔案的配置模組

日誌收集是網際網路公司的一個重要服務,Flume NG是Apache的頂級專案,是分散式日誌收集服務的一個開源實現,具有良好的擴充套件性,與其他很多開源元件可以無縫整合。搜了一圈發現介紹Flume NG的文章有不少,但是深入分析Flume NG原始碼的卻沒有。準備寫一個系列分析一下Flume NG的

Logstash和Flume-NG Syslog接收小測試

目前在大規模日誌處理平臺中常見的日誌採集器可以採用Logstash或Flume。這兩種日誌採集器架構設計理念基本相似,都採用採集-過濾處理-輸出的方式。下面對這兩種採集器Syslog接收效能做個簡單測試,供大家參考。  測試過程 基本測試過程是使用2臺機器,1臺負責傳送Syslog資料

Flume NG高可用叢集搭建詳解(基於flume-1.7.0)

1、Flume NG簡述 Flume NG是一個分散式,高可用,可靠的系統,它能將不同的海量資料收集,移動並存儲到一個數據儲存系統中。輕量,配置簡單,適用於各種日誌收集,並支援 Failover和負載均衡。並且它擁有非常豐富的元件。Flume NG採用的是三層架構:Agent層,Collecto

Spark和Flume-ng整合

如何將Flune-ng裡面的資料傳送到Spark,利用Spark進行實時的分析計算。本文將通過Java和Scala版本的程式進行程式的測試。 Spark和Flume-ng的整合屬於Spark的Streaming這塊。在講述如何使用Spark Streaming之前,我們先來了解一下什麼是Spar

Flume(NG)架構設計要點及配置實踐

Flume(NG)架構設計要點及配置實踐 Yanjun 架構師 2016-04-03 架構師(JiaGouX) 我們都是架構師!   Flume NG是一個分散式、可靠、可用的系統,它能夠將不同資料來源的海量日誌資料進行高效收集、聚合、移動,最後儲存

Flink SQL 核心解密 —— 提升吞吐的利器 MicroBatch

之前我們在 Flink SQL 中支援了 MiniBatch, 在支援高吞吐場景發揮了重要作用。今年我們在 Flink SQL 效能優化中一項重要的改進就是升級了微批模型,我們稱之為 MicroBatch,也叫 MiniBatch2.0。 在設計和實現 Flink 的流計算運算元時,我們一般會把“面向狀態程

[Hadoop] CentOS7 安裝flume-ng-1.6.0-cdh5.7.0

1. Flume 安裝部署 根據官方文件描述,市面上的Flume主流版本有兩個:0.9.x and 1.x。這兩個版本差異非常非常大,舊版本已經被淘汰了,要用的話就使用新版本。當然本文中既定版本為cd

Flume-NG + HDFS + HIVE 日誌收集分析

[[email protected] apache-flume-1.3.0-bin]# cat /data/apache-flume-1.3.0-bin/conf/flume.conf# Define a memory channel called c1 on a1a1.channels.c1.ty

Flume(ng) 自定義sink實現和屬性注入

最近需要利用flume來做收集遠端日誌,所以學習一些flume最基本的用法。這裡僅作記錄。 遠端日誌收集的整體思路是遠端自定義實現log4j的appender把訊息傳送到flume端,flume端自定義實現一個sink來按照我們的規則儲存日誌。