kafka的分割槽數與多執行緒消費
kafka算是很麻煩的一件事兒,起因是最近需要採集大量的資料,原先是隻用了典型的high-level Consumer的API,最經典的不過如下:
- Properties props = new Properties();
- props.put("zookeeper.connect", "xxxx:2181");
- props.put("zookeeper.connectiontimeout.ms", "1000000");
-
props.put("group.id", "test_group");
- props.put("zookeeper.session.timeout.ms", "40000");
- props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
- ConsumerConfig consumerConfig = new ConsumerConfig(props);
-
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
- Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put("test", new Integer(1));
- //key--topic
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
-
consumerConnector.createMessageStreams(topicCountMap);
- KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0);
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- StringBuffer sb = new StringBuffer();
- while(it.hasNext()) {
- try {
- String msg = new String(it.next().message(), "utf-8").trim();
- System.out.println("receive:" + msg);
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
這是典型的kafka消費端消費資料的程式碼,但可以看出這是十分典型的單執行緒消費。在本地玩玩熟悉kafka還行,(就跟入門Java學會寫main方法列印hello world一樣~~~~),問題是學的東西必須真正應用到實際中,你不可能只在單執行緒採集裡原地打轉吧。。so,多執行緒採集迫在眉急啊!!
本人研究卡夫卡多執行緒消費還是耗了一段時間的,希望把過程儘可能完整地記錄下來,以便各位同行有需要可以參考。。
首先,最好理解kafka的基本原理和一些基本概念,閱讀官網文件很有必要,這樣才會有一個比較清晰的概念,而不是跟無頭蒼蠅一樣亂撞——出了錯去網上查是灰常痛苦滴!!
http://kafka.apache.org/documentation.html
好了,大概說下卡夫卡的“分割槽·”的概念吧:
這張圖比較清晰地描述了“分割槽”的概念,對於某一個topic的訊息來說,我們可以把這組訊息傳送給若干個分割槽,就相當於一組訊息分發一樣。
分割槽、Offset、消費執行緒、group.id的關係
1)一組(類)訊息通常由某個topic來歸類,我們可以把這組訊息“分發”給若干個分割槽(partition),每個分割槽的訊息各不相同;
2)每個分割槽都維護著他自己的偏移量(Offset),記錄著該分割槽的訊息此時被消費的位置;
3)一個消費執行緒可以對應若干個分割槽,但一個分割槽只能被具體某一個消費執行緒消費;
4)group.id用於標記某一個消費組,每一個消費組都會被記錄他在某一個分割槽的Offset,即不同consumer group針對同一個分割槽,都有“各自”的偏移量。
說完概念,必須要注意的一點是,必須確認卡夫卡的server.properties裡面的一個屬性num.partitions必須被設定成大於1的值,否則消費端再怎麼折騰,也用不了多執行緒哦。我這裡的環境下,該屬性值被設定成10了。
重構一下上述經典的消費端程式碼:
- publicclass KafakConsumer implements Runnable {
- private ConsumerConfig consumerConfig;
- privatestatic String topic="blog";
- Properties props;
- finalint a_numThreads = 6;
- public KafakConsumer() {
- props = new Properties();
- props.put("zookeeper.connect", "xxx:2181,yyy:2181,zzz:2181");
- // props.put("zookeeper.connect", "localhost:2181");
- // props.put("zookeeper.connectiontimeout.ms", "30000");
- props.put("group.id", "blog");
- props.put("zookeeper.session.timeout.ms", "400");
- props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
- props.put("auto.offset.reset", "smallest");
- consumerConfig = new ConsumerConfig(props);
- }
- @Override
- publicvoid run() {
- Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(topic, new Integer(a_numThreads));
- ConsumerConfig consumerConfig = new ConsumerConfig(props);
- ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
- List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
- ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
- for (final KafkaStream stream : streams) {
- executor.submit(new KafkaConsumerThread(stream));
- }
- }
- publicstaticvoid main(String[] args) {
- System.out.println(topic);
- Thread t = new Thread(new KafakConsumer());
- t.start();
- }
- }
從這段重構的程式碼可以看出,KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0); 這行程式碼已被廢棄,得到List<KafkaStream<byte[], byte[]>>之後不再是得到他的頭元素get(0)就ok了,而且topicCountMap.put(topic, new Integer(a_numThreads));的第二個引數也不再是1.
其中,具體消費執行緒KafkaConsumerThread程式碼為:
- publicclass KafkaConsumerThread implements Runnable {
- private KafkaStream<byte[], byte[]> stream;
- public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
- this.stream = stream;
- }
- @Override
- publicvoid run() {
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- while (it.hasNext()) {
- MessageAndMetadata<byte[], byte[]> mam = it.next();
- System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "],"
- + "offset[" + mam.offset() + "], " + new String(mam.message()));
- }
- }
- }
相關推薦
探討kafka的分割槽數與多執行緒消費
kafka算是很麻煩的一件事兒,起因是最近需要採集大量的資料,原先是隻用了典型的high-level Consumer的API,最經典的不過如下: [java] view plain copy
kafka的分割槽數與多執行緒消費
kafka算是很麻煩的一件事兒,起因是最近需要採集大量的資料,原先是隻用了典型的high-level Consumer的API,最經典的不過如下: Properties props = new Properties();
kafka多執行緒消費及處理和手動提交處理方案設計
kafka與其他訊息佇列不同的是, kafka的消費者狀態由外部( 消費者本身或者類似於Zookeeper之類的外部儲存 )進行維護, 所以kafka的消費就更加靈活, 但是也帶來了很多的問題, 因為客戶端消費超時被判定掛掉而消費者重新分配分割槽, 導致重複消費
叢集下的kafka實現多執行緒消費
上一篇文章講述瞭如何部署kafka叢集,而這篇文章則來探討一下如何使用多執行緒消費,提高消費能力,保障資料的時效性。而實現多執行緒消費其實很簡單,只需要三步即可: 一:kafka叢集配置 多執行緒消費,說白了就是多區消費,kafka可以給topic設定多個p
c#中委託與多執行緒的實質
delegate(委託)的概念,.Net的委託本質上就是指向函式的指標,只不過這種指標是經過封裝後型別安全的。委託和執行緒是兩個不同的概念,執行緒是動態的,委託就是一個或一組記憶體地址,是靜態的。執行緒執行時如果遇到了指向函式的指標就執行這個函式。.Net為了方便程式設計,給委託賦予了兩種方式以供呼
第六章單例模式與多執行緒——立即載入“餓漢模式”與延遲載入“懶漢模式”
立即載入就是使用類的時候已經將物件建立完畢了,也稱為“餓漢模式” package test01; public class MyObject { // 建立物件 private static MyObject object = new MyObject(); private MyObjec
爬蟲與多執行緒
多執行緒和多程序爬蟲 一.執行緒 1.什麼是執行緒。 執行緒是作業系統能夠進行運算排程的最小單位。它被包含在程序中,是進城中的實際運作單位。一條執行緒指的是程序中一個單一順序的控制流,一個執行緒可以併發多個執行緒,每條執行緒執行不同的任務。 2.執行緒常用的方法
C語言高階篇 - 4.連結串列&狀態機與多執行緒
1.連結串列的引入 1、從陣列的缺陷說起 (1)陣列有2個缺陷,一個是陣列中所有元素的型別必須一致;第二個是陣列的元素個數必須事先制定並且一旦指定之後不能更改。 (2)如何解決
【Linux】多程序與多執行緒之間的區別
http://blog.csdn.net/byrsongqq/article/details/6339240 網路程式設計中設計併發伺服器,使用多程序與多執行緒 ,請問有什麼區別? 答案一: 1,程序:子程序是父程序的複製品。子程序獲得父程序資料空間、堆和棧的複製品。 2,執行緒:相
程序、執行緒與多執行緒
一、說說概念 1、程序(process) 狹義定義:程序就是一段程式的執行過程。 廣義定義:程序是一個具有一定獨立功能的程式關於某個資料集合的一次執行活動。它是作業系統動態執行的基本單元,在傳統的作業系統中,程序既是基本的分配單元,也是基本的執行單元。 簡單的來講程序的概念主
Java多執行緒學習筆記21之單例模式與多執行緒
詳細程式碼見:github程式碼地址 第六章 單例模式與多執行緒 前言: 我之前已經開設了23個設計模式這個專欄,介紹了很多的Java設計模式,其中一些模式對於絕 大多數程式語言設計思想都是類似的,需要了解單例模式的可以去看看。 我們在實際開發中經常用到單例模式,但
Python佇列與多執行緒及檔案鎖
佇列實現生產-多執行緒消費 先看程式碼 # -*- coding: utf-8 -*- import queue import threading mu = threading.Lock() class Producer(threading.Thread): def __init__(
執行緒(二):執行緒開啟方式與多執行緒(threading模組)
目錄 執行緒的建立Threading.Thread類 1)執行緒的建立 2)多執行緒與多程序 3)Thread類的其他方法 4)守護執行緒 multiprocess模組的完全模仿了threading模組的介面,二者在使用層面,有很大的相似性,因而不再詳細介紹(官方連結)
JavaSE基礎學習筆記及案例(一)IO流與多執行緒(上)
IO流 1. IO流知識點 IO流(字元輸入流FileReader) 位元組輸入流 FileInputStream IO流(字元輸出流FileWriter) 位元組輸出流 FileOutputStream 字元緩衝區輸入流( BufferedReader) 位元組緩衝區輸入流Bu
高併發與多執行緒的關係、區別、高併發的技術方案
高併發與多執行緒的關係、區別、高併發的技術方案 http://youzhixueyuan.com/high-concurrency-and-multithreading-high-concurrency-technical-solutions.html 什麼是高併發? 高併發(High
java中的Executors簡介與多執行緒在網站上逐步優化的運用案例
提供Executor的工廠類 忽略了自定義的ThreadFactory、callable和unconfigurable相關的方法 newFixedxxx:在任意時刻,最多有nThreads個執行緒在處理task;如果所有執行緒都在執行時來了新的任務,它會被扔
單例模式與多執行緒之間的關係總結
給大家推薦個靠譜的公眾號程式設計師探索之路,大家一起加油 單例模式與多執行緒之間的關係總結(魔怔多執行緒中~~~~~) 近日筆者被多執行緒與單例物件之間的關係產生了混淆。通過了一段時間的查閱,理清了兩者之間的管理,現做筆記梳理。如有不足,歡迎指出:) 在我在考慮考慮他們的時候思考了以
Java執行緒安全與多執行緒開發
網際網路上充斥著對Java多執行緒程式設計的介紹,每篇文章都從不同的角度介紹並總結了該領域的內容。但大部分文章都沒有說明多執行緒的實現本質,沒能讓開發者真正“過癮”。 從Java的執行緒安全鼻祖內建鎖介紹開始,讓你瞭解內建鎖的實現邏輯和原理以及引發的效能問題,接著說明了Java多執行緒程式設計中鎖的存在是為
[Xcode10 實際操作]八、網路與多執行緒-(4)使用UIApplication物件傳送簡訊
本文將演示如何使用應用程式單例物件,傳送簡訊的功能。在專案導航區,開啟檢視控制器的程式碼檔案【ViewController.swift】注:需要使用真機進行測試。 1 import UIKit 2 3 class ViewController: UIViewController { 4
[Xcode10 實際操作]八、網路與多執行緒-(3)使用UIApplication物件撥打電話
本文將演示如何使用應用程式單例物件,撥打電話的功能。 在專案導航區,開啟檢視控制器的程式碼檔案【ViewController.swift】 注:需要使用真機進行測試。 1 import UIKit 2 3 class ViewController: UIViewController {