1. 程式人生 > >Kafka在大數據環境中的應用

Kafka在大數據環境中的應用

rri 開發 iter pack success 應用 引入 class 支持

我們生活在一個數據爆炸的時代,數據的巨量增長給我們的業務處理帶來了壓力,同時巨量的數據也給我們帶來了十分可觀的財富。隨著大數據將各個行業用戶、運營商、服務商的數據整合進大數據環境,或用戶取用大數據環境中海量的數據,業務平臺間的消息處理將變得尤為復雜。如何高效地采集、使用數據,如何減輕各業務系統的壓力,也變得越來越突出。在早期的系統實現時,業務比較簡單。即便是數據量、業務量比較大,大數據環境也能做出處理。但是隨著接入的系統增多,數據量、業務量增大,大數據環境、業務系統都可出現一定的瓶頸。下面我們看幾個場景。

場景一:我們開發過一個設備信息挖掘平臺。這個平臺需要實時將采集互聯網關采集到的路由節點的狀態信息存入數據中心。通常一個網關一次需要上報幾十甚至幾百個變化的路由信息。全區有幾萬個這種互聯網關。當信息采集平臺將這些變化的數據信息寫入或更新到數據庫時候,會給數據庫代理非常大的壓力,甚至可以直接將數據庫搞掛掉。這就對我們的數據采集系統提出了很高的要求。如何穩定高效地把消息更新到數據庫這一要求擺了出來。

場景二:數據中心處理過的數據需要實時共享給幾個不同的機構。我們常采用的方法是將數據批量存放在數據采集機,分支機構定時來采集;或是分支機構通過JDBC、RPC、http或其他機制實時從數據中心獲取數據。這兩種方式都存在一定的問題,前者在於實時性不足,還牽涉到數據完整性問題;後者在於,當數據量很大的時候,多個分支機構同時讀取數據,會對數據中心的造成很大的壓力,也造成很大的資源浪費。

為了解決以上場景提出的問題,我們需要這樣一個消息系統:

緩沖能力,系統可以提供一個緩沖區,當有大量數據來臨時,系統可以將數據可靠的緩沖起來,供後續模塊處理;

訂閱、分發能力,系統可以接收消息可靠的緩存下來,也可以將可靠緩存的數據發布給使用者。

這就要我們找一個高吞吐的、能滿足訂閱發布需求的系統。

Kafka是一個分布式的、高吞吐的、基於發布/訂閱的消息系統。利用kafka技術可以在廉價PC Server上搭建起大規模的消息系統。Kafka具有消息持久化、高吞吐、分布式、實時、低耦合、多客戶端支持、數據可靠等諸多特點,適合在線和離線的消息處理。

使用kafka解決我們上述提到的問題。

技術分享圖片

互聯網關采集到變化的路由信息,通過kafka的producer將歸集後的信息批量傳入kafka。Kafka按照接收順序對歸集的信息進行緩存,並加入待消費隊列。Kafka的consumer讀取隊列信息,並一定的處理策略,將獲取的信息更新到數據庫。完成數據到數據中心的存儲。

數據中心的數據需要共享時,kafka的producer先從數據中心讀取數據,然後傳入kafka緩存並加入待消費隊列。各分支結構作為數據消費者,啟動消費動作,從kafka隊列讀取數據,並對獲取的數據進行處理。

Kafka生產的代碼如下:

public void produce(){

//生產消息預處理

produceInfoProcess();

pro.send(ProducerRecord,new Callback(){

@Override

onCompletion() {

if (metadata == null) {

// 發送失敗

failedSend();

} else {

//發送成功!"

successedSend();

}

}

});

}

消息生產者根據需求,靈活定義produceInfoProcess()方法,對相關數據進行處理。並依據數據發布到kafka的情況,處理回調機制。在數據發送失敗時,定義failedSend()方法;當數據發送成功時,定義successedSend()方法。

Kafka消費的代碼如下:

public void consumer() {

//配置文件

properties();

//獲取當前數據的叠代器

iterator = stream.iterator();

while (iterator.hasNext()) {

//取出消息

MessageAndMetadata<byte[], byte[]> next = iterator.next();

messageProcess();

}

}

Kafka消費者會和kafka集群建立一個連接。從kafka讀取數據,調用messageProcess()方法,對獲取的數據靈活處理。

結論

Kafka的高吞吐能力、緩存機制能有效的解決高峰流量沖擊問題。實踐表明,在未將kafka引入系統前,當互聯網關發送的數據量較大時,往往會掛起關系數據庫,數據常常丟失。在引入kafka後,更新程序能夠結合能力自主處理消息,不會引起數據丟失,關系型數據庫的壓力波動不會發生過於顯著的變化,不會出現數據庫掛起鎖死現象。

依靠kafka的訂閱分發機制,實現了一次發布,各分支依據需求自主訂閱的功能。避免了各分支機構直接向數據中心請求數據,或者數據中心依次批量向分支機構傳輸數據以致實時性不足的情況。kafka提高了實時性,減輕了數據中心的壓力,提高了效率。

為了幫助大家讓學習變得輕松、高效,給大家免費分享一大批資料,幫助大家在成為大數據工程師,乃至架構師的路上披荊斬棘。在這裏給大家推薦一個大數據學習交流圈:658558542 歡迎大家進群交流討論,學習交流,共同進步。

當真正開始學習的時候難免不知道從哪入手,導致效率低下影響繼續學習的信心。

但最重要的是不知道哪些技術需要重點掌握,學習時頻繁踩坑,最終浪費大量時間,所以有有效資源還是很有必要的。

最後祝福所有遇到瓶疾且不知道怎麽辦的大數據程序員們,祝福大家在往後的工作與面試中一切順利。

Kafka在大數據環境中的應用