kafka+java 偽分散式 之二
對上一篇 kafka+java 偽分散式有生產者和消費者 ,接下來對High Level Consumer做處理 如下:
先看程式碼 :
package com.xiefg.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import com.xiefg.conf.ConfigureConstant.KafkaProperties;
import com.xiefg.util.PropertiesUtils;

/**
 * High-level consumer example: opens one KafkaStream per partition of the
 * subscribed topic and hands each stream to its own handler thread.
 *
 * @author Comsys-xiefg
 * @date 2017-02-05
 */
public class KafkaConsumer {

    /** Number of streams/handler threads; should not exceed the topic's partition count. */
    private static final int THREAD_AMOUNT = 2;

    private static ConsumerConnector consumer = null;
    private static ExecutorService executor;

    public static void main(String[] args) {
        // 1. Load the consumer configuration from the external properties file.
        Properties props = getPros();
        // 2. Create the high-level consumer connector.
        consumer = createConsumer(props);

        // How many KafkaStreams to read each topic with (i.e. how many consumer threads).
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProperties.TOPIC, THREAD_AMOUNT);
        // Multiple topics can be consumed at once, e.g.:
        // topicCountMap.put(TOPIC2, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> msgStreams =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> msgStreamList = msgStreams.get(KafkaProperties.TOPIC);

        // Schedule the handler threads; do not use more threads than partitions.
        executor = Executors.newFixedThreadPool(THREAD_AMOUNT);
        for (int i = 0; i < msgStreamList.size(); i++) {
            KafkaStream<byte[], byte[]> kafkaStream = msgStreamList.get(i);
            executor.submit(new HanldMessageThread(kafkaStream, i));
        }

        // Let the consumer run for a while before shutting down (demo only).
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }

        // Shut down the consumer gracefully.
        closeConsumer();
    }

    /**
     * Shuts down the consumer connector and the handler thread pool, giving the
     * consumer threads up to 5 seconds to drain the messages still buffered in
     * their Kafka streams.
     */
    private static void closeConsumer() {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
            // After shutdown, wait 5 seconds so the consumer threads can finish
            // processing whatever the kafka streams still hold. (Guarded by the
            // null check above: the original called awaitTermination
            // unconditionally and could NPE if executor was never created.)
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                System.out.println("Interrupted during shutdown, exiting uncleanly");
            }
        }
    }

    /**
     * Creates the high-level consumer connector from the given properties.
     *
     * @param props consumer configuration properties
     * @return the connected ConsumerConnector
     */
    private static ConsumerConnector createConsumer(Properties props) {
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }

    /**
     * Reads the consumer settings from the configuration file.
     *
     * @return properties for the high-level consumer
     */
    public static Properties getPros() {
        Properties props = new Properties();
        props.put("zookeeper.connect", PropertiesUtils.getPropertiesValue(KafkaProperties.ZK));
        props.put("group.id", PropertiesUtils.getPropertiesValue(KafkaProperties.GROUP_ID));
        props.put("zookeeper.session.timeout.ms", PropertiesUtils.getPropertiesValue(KafkaProperties.SESSION_TIMEOUT));
        props.put("zookeeper.sync.time.ms", PropertiesUtils.getPropertiesValue(KafkaProperties.SYNC_TIME));
        props.put("auto.commit.interval.ms", PropertiesUtils.getPropertiesValue(KafkaProperties.INTERVAL));
        return props;
    }
}
package com.xiefg.kafka;

import java.nio.charset.StandardCharsets;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * Worker thread that consumes and prints the messages of a single KafkaStream.
 *
 * @author Comsys-xiefg
 * @date 2017-02-05
 */
class HanldMessageThread implements Runnable {

    private final KafkaStream<byte[], byte[]> kafkaStream;
    private final int num; // thread index, used only for log output

    public HanldMessageThread(KafkaStream<byte[], byte[]> kafkaStream, int num) {
        this.kafkaStream = kafkaStream;
        this.num = num;
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        // hasNext() blocks until a message arrives or the connector shuts down.
        while (iterator.hasNext()) {
            // Decode explicitly as UTF-8: new String(bytes) uses the platform
            // default charset and can garble non-ASCII payloads.
            // NOTE(review): assumes the producer also encodes as UTF-8 — confirm.
            String message = new String(iterator.next().message(), StandardCharsets.UTF_8);
            System.out.println("Thread-->: " + num + ", message: " + message);
        }
    }
}
package com.xiefg.util;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.PropertyResourceBundle;
import java.util.ResourceBundle;

import com.xiefg.conf.ConfigureConstant.KafkaProperties;

/**
 * Loads {user.dir}/config/system.properties once at class-load time and
 * exposes simple string lookups against it.
 *
 * @author Comsys-xiefg
 * @date 2017-02-05
 */
public class PropertiesUtils {

    private static ResourceBundle resources = null;

    static {
        String configPath = System.getProperty("user.dir") + "/config/system.properties";
        // try-with-resources closes the stream even if parsing fails;
        // the original code leaked the FileInputStream.
        try (InputStream in = new BufferedInputStream(new FileInputStream(configPath))) {
            resources = new PropertyResourceBundle(in);
        } catch (Exception e) {
            // resources stays null; lookups below then fall back to "".
            e.printStackTrace();
        }
    }

    /**
     * Returns the value of the given property.
     *
     * @param property property key
     * @return the property value, or "" when the key is missing or the
     *         bundle failed to load (deliberate best-effort behavior)
     */
    public static String getPropertiesValue(String property) {
        String val = "";
        try {
            val = resources.getString(property);
        } catch (Exception e) {
            // Best-effort: log and fall back to the empty string.
            e.printStackTrace();
        }
        return val;
    }

    public static void main(String[] args) {
        System.out.println(PropertiesUtils.getPropertiesValue(KafkaProperties.ZK));
    }
}
package com.xiefg.conf;
/**
 * Property-key constants for the Kafka consumer configuration.
 *
 * @author Comsys-xiefg
 * @date 2017-02-05
 */
public class ConfigureConstant {

    /** Keys looked up in config/system.properties, plus the example topic name. */
    public interface KafkaProperties {
        // Interface fields are implicitly public static final; the redundant
        // modifiers were removed.
        String ZK = "kafka.zookeeper.connect";
        String GROUP_ID = "kafka.group.id";
        String SESSION_TIMEOUT = "kafka.zookeeper.session.timeout.ms";
        String SYNC_TIME = "kafka.zookeeper.sync.time.ms";
        String INTERVAL = "kafka.auto.commit.interval.ms";
        /** Topic consumed by the example. */
        String TOPIC = "test_topic";
    }
}
配置檔案system.properties
kafka.zookeeper.connect=192.168.170.69:2181
kafka.group.id=69
kafka.zookeeper.session.timeout.ms=40000
kafka.zookeeper.sync.time.ms=20000
kafka.auto.commit.interval.ms=10000
執行 ProducerTest 類 (上一篇文章中的),然後執行KafkaConsumer 結果如下:
2017-02-05 16:07:21,490 INFO [kafka.utils.VerifiableProperties] - Verifying properties
2017-02-05 16:07:21,532 INFO [kafka.utils.VerifiableProperties] - Property auto.commit.interval.ms is overridden to 10000
2017-02-05 16:07:21,533 INFO [kafka.utils.VerifiableProperties] - Property group.id is overridden to 69
2017-02-05 16:07:21,533 INFO [kafka.utils.VerifiableProperties] - Property zookeeper.connect is overridden to 192.168.170.69:2181
2017-02-05 16:07:21,533 INFO [kafka.utils.VerifiableProperties] - Property zookeeper.session.timeout.ms is overridden to 40000
2017-02-05 16:07:21,533 INFO [kafka.utils.VerifiableProperties] - Property zookeeper.sync.time.ms is overridden to 20000
2017-02-05 16:07:25,052 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Connecting to zookeeper instance at 192.168.170.69:2181
2017-02-05 16:07:25,120 INFO [org.I0Itec.zkclient.ZkEventThread] - Starting ZkClient event thread.
2017-02-05 16:07:25,150 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-02-05 16:07:25,150 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:host.name=192.168.10.89
2017-02-05 16:07:25,150 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.version=1.7.0_03
2017-02-05 16:07:25,150 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.vendor=Oracle Corporation
2017-02-05 16:07:25,150 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.home=D:\Java\jdk1.7.0_03\jre
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.class.path=D:\hadoop\workspace\kafka\target\classes;C:\Users\xiefg\.m2\repository\org\apache\kafka\kafka_2.10\0.8.2.0\kafka_2.10-0.8.2.0.jar;C:\Users\xiefg\.m2\repository\org\apache\kafka\kafka-clients\0.8.2.0\kafka-clients-0.8.2.0.jar;C:\Users\xiefg\.m2\repository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar;C:\Users\xiefg\.m2\repository\net\jpountz\lz4\lz4\1.2.0\lz4-1.2.0.jar;C:\Users\xiefg\.m2\repository\org\xerial\snappy\snappy-java\1.1.1.6\snappy-java-1.1.1.6.jar;C:\Users\xiefg\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;C:\Users\xiefg\.m2\repository\org\scala-lang\scala-library\2.10.4\scala-library-2.10.4.jar;C:\Users\xiefg\.m2\repository\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;C:\Users\xiefg\.m2\repository\org\slf4j\slf4j-log4j12\1.6.1\slf4j-log4j12-1.6.1.jar;C:\Users\xiefg\.m2\repository\log4j\log4j\1.2.16\log4j-1.2.16.jar;C:\Users\xiefg\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\xiefg\.m2\repository\io\netty\netty\3.7.0.Final\netty-3.7.0.Final.jar;C:\Users\xiefg\.m2\repository\net\sf\jopt-simple\jopt-simple\3.2\jopt-simple-3.2.jar;C:\Users\xiefg\.m2\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.library.path=D:\Java\jdk1.7.0_03\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;E:/MyEclipse/binary/com.sun.java.jdk7.win32.x86_1.7.0.u45/jre/bin/client;E:/MyEclipse/binary/com.sun.java.jdk7.win32.x86_1.7.0.u45/jre/bin;E:/MyEclipse/binary/com.sun.java.jdk7.win32.x86_1.7.0.u45/jre/lib/i386;C:\Program Files\Common Files\NetSarang;C:\Program Files\NVIDIA Corporation\PhysX\Common;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\Intel\OpenCL SDK\2.0\bin\x86; D:\Java\jdk1.7.0_03\bin;D:\Java\jdk1.7.0_03\jre\bin;D:\hadoop-2.6.1-64/bin;D:\Rational\common;C:\Python33;D:\instantclient_11_2;C:\Program Files\MySQL\MySQL Server 5.5\bin;.
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.io.tmpdir=C:\Users\xiefg\AppData\Local\Temp\
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.compiler=<NA>
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:os.name=Windows 7
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:os.arch=x86
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:os.version=6.1
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:user.name=xiefg
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:user.home=C:\Users\xiefg
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:user.dir=D:\hadoop\workspace\kafka
2017-02-05 16:07:25,152 INFO [org.apache.zookeeper.ZooKeeper] - Initiating client connection, connectString=192.168.170.69:2181 sessionTimeout=40000 [email protected]
2017-02-05 16:07:25,446 INFO [org.apache.zookeeper.ClientCnxn] - Opening socket connection to server 192.168.170.69/192.168.170.69:2181. Will not attempt to authenticate using SASL (unknown error)
2017-02-05 16:07:25,447 INFO [org.apache.zookeeper.ClientCnxn] - Socket connection established to 192.168.170.69/192.168.170.69:2181, initiating session
2017-02-05 16:07:25,457 INFO [org.apache.zookeeper.ClientCnxn] - Session establishment complete on server 192.168.170.69/192.168.170.69:2181, sessionid = 0x15a0c0833ba0006, negotiated timeout = 40000
2017-02-05 16:07:25,459 INFO [org.I0Itec.zkclient.ZkClient] - zookeeper state changed (SyncConnected)
2017-02-05 16:07:25,520 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], starting auto committer every 10000 ms
2017-02-05 16:07:25,626 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], begin registering consumer 69_xiefg-PC-1486282045030-7d9770a7 in ZK
2017-02-05 16:07:25,774 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], end registering consumer 69_xiefg-PC-1486282045030-7d9770a7 in ZK
2017-02-05 16:07:25,825 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], starting watcher executor thread for consumer 69_xiefg-PC-1486282045030-7d9770a7
2017-02-05 16:07:25,919 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], begin rebalancing consumer 69_xiefg-PC-1486282045030-7d9770a7 try #0
2017-02-05 16:07:26,448 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] Stopping leader finder thread
2017-02-05 16:07:26,448 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] Stopping all fetchers
2017-02-05 16:07:26,451 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] All connections stopped
2017-02-05 16:07:26,452 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Cleared all relevant queues for this fetcher
2017-02-05 16:07:26,454 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Cleared the data chunks in all the consumer message iterators
2017-02-05 16:07:26,455 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Committing all offsets after clearing the fetcher queues
2017-02-05 16:07:26,456 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Releasing partition ownership
2017-02-05 16:07:26,538 INFO [kafka.consumer.RangeAssignor] - Consumer 69_xiefg-PC-1486282045030-7d9770a7 rebalancing the following partitions: ArrayBuffer(0, 1) for topic test_topic with consumers: List(69_xiefg-PC-1486282045030-7d9770a7-0, 69_xiefg-PC-1486282045030-7d9770a7-1)
2017-02-05 16:07:26,540 INFO [kafka.consumer.RangeAssignor] - 69_xiefg-PC-1486282045030-7d9770a7-0 attempting to claim partition 0
2017-02-05 16:07:26,541 INFO [kafka.consumer.RangeAssignor] - 69_xiefg-PC-1486282045030-7d9770a7-1 attempting to claim partition 1
2017-02-05 16:07:26,601 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], 69_xiefg-PC-1486282045030-7d9770a7-0 successfully owned partition 0 for topic test_topic
2017-02-05 16:07:26,603 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], 69_xiefg-PC-1486282045030-7d9770a7-1 successfully owned partition 1 for topic test_topic
2017-02-05 16:07:26,655 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Consumer 69_xiefg-PC-1486282045030-7d9770a7 selected partitions : test_topic:0: fetched offset = 50: consumed offset = 50,test_topic:1: fetched offset = 50: consumed offset = 50
2017-02-05 16:07:26,692 INFO [kafka.consumer.ConsumerFetcherManager$LeaderFinderThread] - [69_xiefg-PC-1486282045030-7d9770a7-leader-finder-thread], Starting
2017-02-05 16:07:26,694 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], end rebalancing consumer 69_xiefg-PC-1486282045030-7d9770a7 try #0
2017-02-05 16:07:26,714 INFO [kafka.utils.VerifiableProperties] - Verifying properties
2017-02-05 16:07:26,714 INFO [kafka.utils.VerifiableProperties] - Property client.id is overridden to 69
2017-02-05 16:07:26,714 INFO [kafka.utils.VerifiableProperties] - Property metadata.broker.list is overridden to 192.168.170.69:9092
2017-02-05 16:07:26,715 INFO [kafka.utils.VerifiableProperties] - Property request.timeout.ms is overridden to 30000
2017-02-05 16:07:26,731 INFO [kafka.client.ClientUtils$] - Fetching metadata from broker id:69,host:192.168.170.69,port:9092 with correlation id 0 for 1 topic(s) Set(test_topic)
2017-02-05 16:07:26,734 INFO [kafka.producer.SyncProducer] - Connected to 192.168.170.69:9092 for producing
2017-02-05 16:07:26,752 INFO [kafka.producer.SyncProducer] - Disconnecting from 192.168.170.69:9092
2017-02-05 16:07:26,821 INFO [kafka.consumer.ConsumerFetcherThread] - [ConsumerFetcherThread-69_xiefg-PC-1486282045030-7d9770a7-0-69], Starting
2017-02-05 16:07:26,824 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] Added fetcher for partitions ArrayBuffer([[test_topic,0], initOffset 50 to broker id:69,host:192.168.170.69,port:9092] , [[test_topic,1], initOffset 50 to broker id:69,host:192.168.170.69,port:9092] )
Thread-->: 0, message: xiefg.org0=2017年02月05日 16:07:09 758
Thread-->: 1, message: xiefg.org1=2017年02月05日 16:07:11 467
Thread-->: 0, message: xiefg.org2=2017年02月05日 16:07:11 471
Thread-->: 1, message: xiefg.org3=2017年02月05日 16:07:11 473
Thread-->: 0, message: xiefg.org4=2017年02月05日 16:07:11 476
Thread-->: 1, message: xiefg.org5=2017年02月05日 16:07:11 479
Thread-->: 0, message: xiefg.org6=2017年02月05日 16:07:11 482
Thread-->: 0, message: xiefg.org8=2017年02月05日 16:07:11 487
Thread-->: 1, message: xiefg.org7=2017年02月05日 16:07:11 484
Thread-->: 1, message: xiefg.org9=2017年02月05日 16:07:11 489
2017-02-05 16:07:46,697 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], ZKConsumerConnector shutting down
2017-02-05 16:07:46,749 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] Stopping leader finder thread
2017-02-05 16:07:46,751 INFO [kafka.consumer.ConsumerFetcherManager$LeaderFinderThread] - [69_xiefg-PC-1486282045030-7d9770a7-leader-finder-thread], Shutting down
2017-02-05 16:07:46,752 INFO [kafka.consumer.ConsumerFetcherManager$LeaderFinderThread] - [69_xiefg-PC-1486282045030-7d9770a7-leader-finder-thread], Stopped
2017-02-05 16:07:46,752 INFO [kafka.consumer.ConsumerFetcherManager$LeaderFinderThread] - [69_xiefg-PC-1486282045030-7d9770a7-leader-finder-thread], Shutdown completed
2017-02-05 16:07:46,752 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] Stopping all fetchers
2017-02-05 16:07:46,753 INFO [kafka.consumer.ConsumerFetcherThread] - [ConsumerFetcherThread-69_xiefg-PC-1486282045030-7d9770a7-0-69], Shutting down
2017-02-05 16:07:46,769 INFO [kafka.consumer.SimpleConsumer] - Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
2017-02-05 16:07:46,769 INFO [kafka.consumer.ConsumerFetcherThread] - [ConsumerFetcherThread-69_xiefg-PC-1486282045030-7d9770a7-0-69], Stopped
2017-02-05 16:07:46,769 INFO [kafka.consumer.ConsumerFetcherThread] - [ConsumerFetcherThread-69_xiefg-PC-1486282045030-7d9770a7-0-69], Shutdown completed
2017-02-05 16:07:46,770 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] All connections stopped
2017-02-05 16:07:46,785 INFO [org.I0Itec.zkclient.ZkEventThread] - Terminate ZkClient event thread.
2017-02-05 16:07:46,802 INFO [org.apache.zookeeper.ZooKeeper] - Session: 0x15a0c0833ba0006 closed
2017-02-05 16:07:46,803 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], ZKConsumerConnector shutdown completed in 105 ms
2017-02-05 16:07:46,803 INFO [org.apache.zookeeper.ClientCnxn] - EventThread shut down
2017-02-05 16:07:46,836 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], stopping watcher executor thread for consumer 69_xiefg-PC-1486282045030-7d9770a7
可以看到在shutdown之後,等待了5秒鐘,給consumer執行緒時間來處理完kafka stream裡保留的訊息。
為什麼要用High Level Consumer
某些場景,從Kafka中讀取訊息的邏輯不處理訊息的offset,僅僅是獲取訊息資料。High Level Consumer就提供了這種功能。首先要知道的是,High Level Consumer在ZooKeeper上儲存最新的offset(從指定的分割槽中讀取)。這個offset基於consumer group名儲存。Consumer group名在Kafka叢集上是全域性性的,在啟動新的consumer group的時候要小心叢集上沒有關閉的consumer。當一個consumer執行緒啟動了,Kafka會將它加入到相同的topic下的相同consumer group裡,並且觸發重新分配。在重新分配時,Kafka將partition分配給consumer,有可能會移動一個partition給另一個consumer。如果老的、新的處理邏輯同時存在,有可能將一些訊息傳遞到了老的consumer上。
設計High Level Consumer
使用High LevelConsumer,它是多執行緒的。消費者執行緒的數量跟topic的partition數量有關,它們之間有一些特定的規則:
1、如果執行緒數量大於主題的分割槽數量,一些執行緒將得不到任何訊息。
2、如果分割槽數大於執行緒數,一些執行緒將得到多個分割槽的訊息。
3、如果一個執行緒處理多個分割槽的訊息,它接收到訊息的順序是不能保證的。比如,先從分割槽10獲取了5條訊息,從分割槽11獲取了6條訊息,然後從分割槽10獲取了5條,緊接著又從分割槽10獲取了5條,雖然分割槽11還有訊息。
新增更多同consumer group的consumer將觸發Kafka重新分配,某個分割槽本來分配給a執行緒,重新分配後,有可能分配給了b執行緒。
關閉消費組和錯誤處理
Kafka不會在每次讀取訊息後馬上更新zookeeper上的offset,而是等待一段時間。由於這種延遲,有可能消費者讀取了一條訊息,但沒有更新offset。所以,當客戶端關閉或崩潰後,重新啟動時有些訊息會重複讀取。另外,broker宕機或其他原因導致更換了partition的leader,也會導致訊息重複讀取。
為了避免這種問題,你應該提供一個平滑的關閉方式,而不是直接kill 掉。
上面的是解釋是從 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example 上翻譯的 不對的地方還請指正
相關推薦
kafka+java 偽分散式 之二
對上一篇 kafka+java 偽分散式有生產者和消費者 ,接下來對High Level Consumer做處理 如下: 先看程式碼 : package com.xiefg.kafka; import java.util.HashMap; import java.u
【Java學習筆記之二十二】解析接口在Java繼承中的用法及實例分析
ani 復制代碼 ads compute 現在 target body 常量 實現接口 一、定義 Java接口(Interface),是一系列方法的聲明,是一些方法特征的集合,一個接口只有方法的特征沒有方法的實現,因此這些方法可以在不同的地方被不同的類實現,而這些實現可以具
Selenium+Java元素定位之二
art pin log 技術 path 通過 cli 元素 ndk //通過完全匹配鏈接來定位 driver.findElement(By.linkText("新聞")).click(); //通過部分匹配鏈接來定位 driver.findElement(By.par
Java 數組 之 二維數組
英語 println 數組定義和初始化 stat code ava pri http mes http://www.verejava.com/?id=16992693216433 public class BinaryArray { public static vo
thinking in java (九) ----- 陣列之二(Array)
效能 在java中有一系列的方式可以用來作為存放物件的容器,並且有很多在操作上比陣列要簡單。但是我們仍然會使用陣列。因為陣列有自己的優點,效率,型別和儲存基本型別的能力。在java中,陣列是效率最高的儲存和隨訪問物件引用序列的方式。 在一般的專案中,確實陣列是咩有List,Ma
java畫圖總結之二(常用類Graphics,JFrame,JPanel)
二,常用類 Graphics,JFrame,JPanel javax.swing.JFrame; javax.swing.JPanel; ------------
java資料結構之二叉排序樹
binary sort tree / binary search tree 性質: 1.若左子樹不為空,則左子樹上所有節點的值均小於它的根節點的值。 2.若右子樹不為空,則右子樹上所有節點的值均大於它的根節點的值。 3.左右子樹也是二叉排序樹。 4.沒有值相同的節點
java-js知識庫之二——canvas繪製炫彩氣泡
現在使用canvas繪製氣泡,雖說很多人都已經實現過了,可能方法都大同小異,但自己寫和看別人寫完全是兩碼事,自己會寫的才是自己的,話不多說,直接上程式碼。 先來一張效果圖: &nb
Java設計模式之二 ----- 工廠模式
在上一篇中我們學習了單例模式,介紹了單例模式建立的幾種方法以及最優的方法。本篇則介紹設計模式中的工廠模式,主要分為簡單工廠模式、工廠方法和抽象工廠模式。 簡單工廠模式 簡單工廠模式是屬於建立型模式,又叫做靜態工廠方法模式。簡單工廠模式是由一個工廠物件決定創建出哪一種產品類的例項。呼叫只需要告訴工廠類
kafka權威指南中文版之二
上圖所示,consumer訂閱kafka叢集中(一個broker中的一個topic中)的訊息,然後對broker發起一個獲取訊息的請求,請求中攜帶了topic、partition、offset等資訊,接著用pull的方式獲取kafka log中所有可用訊息,並對訊息中的資料進行處理,比如使用spark進行計算
Java學習筆記之二叉樹的節點數、深度
在上篇博文《Java學習筆記之建立二叉樹》後,我們現在來求增加二叉樹的節點數、二叉樹的深度的函式,以下程式碼中黃色背景是增加實現的程式碼,由於註釋較多,我用綠色字型將自己解讀的註解區分。 老樣子還是先註明這句話:【本文的程式碼請見原創http://blog.csdn.net
java IO流之二 使用IO流讀取儲存檔案
http://blog.csdn.net/a107494639/article/details/7586440 一、使用字元流,讀取和儲存純文字檔案。 儲存檔案,也就是像一個檔案裡寫內容,既然是寫,那就需要使用輸出流。而且我們寫的是純文字檔案,所以這裡
15 Java常用API之二
抽象類 simple mat java常用 ron 內容 date類 cal 表達 JavaSE 基礎之十五 15 Java常用API之二 ① 常用包裝類 分類:Byte類、Short類、Integer類、Long類、Float類、Double類、Boolean類
zookeeper+kafka集群安裝之二
聲明 r+ object ise width cli top 直接 partition 版權聲明:本文為博主原創文章,未
Java程式設計思想之二十 併發
20.1 併發得多面性 併發程式設計令人困惑的一個主要原因:使用併發時需要解決的問題有多個,而實現併發的方法也有多種,並且在這兩者之間沒有明顯的對映關係。 20.1.1 更快的執行 速度問題初聽起來很簡單:如果你需要一個程式執行得更快,那麼可以將起斷開為多個片段,在單個處理器上執行每個片段。 併發通常是提高執
Kubernetes官方java客戶端之二:序列化和反序列化問題
### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ###
大資料環境搭建之Kafka偽分散式環境搭建步驟詳解
文章目錄 Kafka簡介 環境準備 解壓安裝 配置檔案 服務啟動 Topic相關操作 控制檯生產者 控制檯消費者 Kafka簡介
Kafka 入門教程之二: Java連線Kafka之生產者
1. 檢查service配置檔案 修改引數 advertised.listeners=PLAINTEXT://tjtestrac1:9092 注意防火牆對埠的遮蔽 [[email protected] config]$ vi server.properties ###
Spring kafka 學習之二 採用java 配置類方式傳送與接收訊息
參考資料:https://docs.spring.io/spring-kafka/reference/html/_introduction.html#compatibilityspring-kafka 版本:2.1.5.release1、配置類package com.hdsx
轉: 【Java並發編程】之二十:並發新特性—Lock鎖和條件變量(含代碼)
ets exc n) 否則 max 長時間 info trace space 簡單使用Lock鎖 Java 5中引入了新的鎖機制——Java.util.concurrent.locks中的顯式的互斥鎖:Lock接口,它提供了比synchronized更加廣泛的鎖