
Kafka + Java Pseudo-Distributed, Part 2

The previous post, Kafka + Java Pseudo-Distributed, covered a producer and a consumer. This post builds on it to work with the High Level Consumer, as follows:

First, the code:

package com.xiefg.kafka;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import com.xiefg.conf.ConfigureConstant.KafkaProperties;
import com.xiefg.util.PropertiesUtils;
 
/**
 * @ClassName: KafkaConsumer
 * @Description: consumer example
 * @author Comsys-xiefg
 * @date 2017-02-05 09:50:48
 */
public class KafkaConsumer {
	
	private static ConsumerConnector consumer=null;
	private static  ExecutorService executor;
    private static final int THREAD_AMOUNT = 2;//
 
    public static void main(String[] args) {

        // 1. Load the configuration
        Properties props = getProps();
        // 2. Create the consumer
        consumer = createConsumer(props);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // how many KafkaStreams (i.e. consumer threads) to read each topic with
        topicCountMap.put(KafkaProperties.TOPIC, THREAD_AMOUNT);
        // several topics can be consumed at once:
//      topicCountMap.put(TOPIC2, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> msgStreams = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> msgStreamList = msgStreams.get(KafkaProperties.TOPIC);

        // dispatch the streams to worker threads via an ExecutorService
        executor = Executors.newFixedThreadPool(THREAD_AMOUNT); // no more threads than partitions
        for (int i = 0; i < msgStreamList.size(); i++) {
            KafkaStream<byte[], byte[]> kafkaStream = msgStreamList.get(i);
            executor.submit(new HandleMessageThread(kafkaStream, i));
        }
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // shut the consumer down
        closeConsumer();
    }
    /**
     * @Title: closeConsumer
     * @Description: shut down the consumer and its worker threads
     */
	private static void closeConsumer() {
		if (consumer != null) {
			consumer.shutdown();
		}
		if (executor != null) {
			executor.shutdown();
			// after shutdown, wait up to 5 seconds to give the consumer threads
			// time to finish the messages still buffered in their Kafka streams
			try {
				if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
					System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
				}
			} catch (InterruptedException e) {
				System.out.println("Interrupted during shutdown, exiting uncleanly");
			}
		}
	}

	/**
	 * @Title: createConsumer
	 * @Description: create the consumer connector
	 * @param props the configuration properties
	 * @return ConsumerConnector
	 */
	private static ConsumerConnector createConsumer(Properties props) {
		return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
	}
    
	/**
	 * @Title: getProps
	 * @Description: load the relevant settings from the configuration file
	 * @return Properties
	 */
	public static Properties getProps() {
		Properties props = new Properties();
		props.put("zookeeper.connect", PropertiesUtils.getPropertiesValue(KafkaProperties.ZK));
		props.put("group.id", PropertiesUtils.getPropertiesValue(KafkaProperties.GROUP_ID));
		props.put("zookeeper.session.timeout.ms", PropertiesUtils.getPropertiesValue(KafkaProperties.SESSION_TIMEOUT));
		props.put("zookeeper.sync.time.ms", PropertiesUtils.getPropertiesValue(KafkaProperties.SYNC_TIME));
		props.put("auto.commit.interval.ms", PropertiesUtils.getPropertiesValue(KafkaProperties.INTERVAL));
		return props;
	}
 
}
package com.xiefg.kafka;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * @ClassName: HandleMessageThread
 * @Description: worker thread that does the actual message handling
 * @author Comsys-xiefg
 * @date 2017-02-05 10:46:05
 */
class HandleMessageThread implements Runnable {

    private KafkaStream<byte[], byte[]> kafkaStream = null;
    private int num = 0;

    public HandleMessageThread(KafkaStream<byte[], byte[]> kafkaStream, int num) {
        super();
        this.kafkaStream = kafkaStream;
        this.num = num;
    }
 
    public void run() {
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        // hasNext() blocks until a message arrives or the connector is shut down,
        // which is why consumer.shutdown() in KafkaConsumer ends this loop
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("Thread-->: " + num + ", message: " + message);
        }
    }
     
}

package com.xiefg.util;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.PropertyResourceBundle;
import java.util.ResourceBundle;

import com.xiefg.conf.ConfigureConstant.KafkaProperties;

/**
 * @ClassName: PropertiesUtils
 * @Description: reads configuration values from system.properties
 * @author Comsys-xiefg
 * @date 2017-02-05 14:41:52
 */
public class PropertiesUtils {
	
	private static ResourceBundle resources = null;

	static {
		InputStream in;
		try {
			// the configuration file is expected under <working dir>/config/system.properties
			String configPath = System.getProperty("user.dir") + "/config/system.properties";
			in = new BufferedInputStream(new FileInputStream(configPath));
			resources = new PropertyResourceBundle(in);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * Returns the value of the given property.
	 * @param property the property name
	 * @return the property value, or "" if it cannot be read
	 */
	public static String getPropertiesValue(String property) {
		String val = "";
		try {
			val = resources.getString(property);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return val;
	}

	public static void main(String[] args) {
		System.out.println(PropertiesUtils.getPropertiesValue(KafkaProperties.ZK));
	}

}

package com.xiefg.conf;

/**
 * @ClassName: ConfigureConstant
 * @Description: Kafka configuration constants
 * @author Comsys-xiefg
 * @date 2017-02-05 14:22:58
 */
public class ConfigureConstant {

    public interface KafkaProperties {

        public final static String ZK = "kafka.zookeeper.connect";
        public final static String GROUP_ID = "kafka.group.id";
        public final static String SESSION_TIMEOUT = "kafka.zookeeper.session.timeout.ms";
        public final static String SYNC_TIME = "kafka.zookeeper.sync.time.ms";
        public final static String INTERVAL = "kafka.auto.commit.interval.ms";
        // topic name
        public final static String TOPIC = "test_topic";
    }

}

The configuration file system.properties:
kafka.zookeeper.connect=192.168.110.69:2181
kafka.group.id=69
kafka.zookeeper.session.timeout.ms=40000
kafka.zookeeper.sync.time.ms=20000
kafka.auto.commit.interval.ms=10000
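
In case you do not have the previous post at hand, here is a minimal sketch of what a matching producer could look like. This is an assumption, not the original ProducerTest: the broker address is taken from the log output below (metadata.broker.list appears there as 192.168.170.69:9092), and the StringEncoder serializer and the message format (modeled on the consumer output further down) are guesses.

package com.xiefg.kafka;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker address taken from the log output in this post; adjust for your environment
        props.put("metadata.broker.list", "192.168.170.69:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        for (int i = 0; i < 10; i++) {
            // the message format mirrors the consumer output shown below
            producer.send(new KeyedMessage<String, String>("test_topic", "xiefg.org" + i + "=" + new Date()));
        }
        producer.close();
    }
}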

Run the ProducerTest class (from the previous post), then run KafkaConsumer. The result:
2017-02-05 16:07:21,490 INFO [kafka.utils.VerifiableProperties] - Verifying properties
2017-02-05 16:07:21,532 INFO [kafka.utils.VerifiableProperties] - Property auto.commit.interval.ms is overridden to 10000
2017-02-05 16:07:21,533 INFO [kafka.utils.VerifiableProperties] - Property group.id is overridden to 69
2017-02-05 16:07:21,533 INFO [kafka.utils.VerifiableProperties] - Property zookeeper.connect is overridden to 192.168.170.69:2181
2017-02-05 16:07:21,533 INFO [kafka.utils.VerifiableProperties] - Property zookeeper.session.timeout.ms is overridden to 40000
2017-02-05 16:07:21,533 INFO [kafka.utils.VerifiableProperties] - Property zookeeper.sync.time.ms is overridden to 20000
2017-02-05 16:07:25,052 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Connecting to zookeeper instance at 192.168.170.69:2181
2017-02-05 16:07:25,120 INFO [org.I0Itec.zkclient.ZkEventThread] - Starting ZkClient event thread.
2017-02-05 16:07:25,150 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-02-05 16:07:25,150 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:host.name=192.168.10.89
2017-02-05 16:07:25,150 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.version=1.7.0_03
2017-02-05 16:07:25,150 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.vendor=Oracle Corporation
2017-02-05 16:07:25,150 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.home=D:\Java\jdk1.7.0_03\jre
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.class.path=D:\hadoop\workspace\kafka\target\classes;C:\Users\xiefg\.m2\repository\org\apache\kafka\kafka_2.10\0.8.2.0\kafka_2.10-0.8.2.0.jar;C:\Users\xiefg\.m2\repository\org\apache\kafka\kafka-clients\0.8.2.0\kafka-clients-0.8.2.0.jar;C:\Users\xiefg\.m2\repository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar;C:\Users\xiefg\.m2\repository\net\jpountz\lz4\lz4\1.2.0\lz4-1.2.0.jar;C:\Users\xiefg\.m2\repository\org\xerial\snappy\snappy-java\1.1.1.6\snappy-java-1.1.1.6.jar;C:\Users\xiefg\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;C:\Users\xiefg\.m2\repository\org\scala-lang\scala-library\2.10.4\scala-library-2.10.4.jar;C:\Users\xiefg\.m2\repository\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;C:\Users\xiefg\.m2\repository\org\slf4j\slf4j-log4j12\1.6.1\slf4j-log4j12-1.6.1.jar;C:\Users\xiefg\.m2\repository\log4j\log4j\1.2.16\log4j-1.2.16.jar;C:\Users\xiefg\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\xiefg\.m2\repository\io\netty\netty\3.7.0.Final\netty-3.7.0.Final.jar;C:\Users\xiefg\.m2\repository\net\sf\jopt-simple\jopt-simple\3.2\jopt-simple-3.2.jar;C:\Users\xiefg\.m2\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.library.path=D:\Java\jdk1.7.0_03\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;E:/MyEclipse/binary/com.sun.java.jdk7.win32.x86_1.7.0.u45/jre/bin/client;E:/MyEclipse/binary/com.sun.java.jdk7.win32.x86_1.7.0.u45/jre/bin;E:/MyEclipse/binary/com.sun.java.jdk7.win32.x86_1.7.0.u45/jre/lib/i386;C:\Program Files\Common Files\NetSarang;C:\Program Files\NVIDIA Corporation\PhysX\Common;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\Intel\OpenCL SDK\2.0\bin\x86; D:\Java\jdk1.7.0_03\bin;D:\Java\jdk1.7.0_03\jre\bin;D:\hadoop-2.6.1-64/bin;D:\Rational\common;C:\Python33;D:\instantclient_11_2;C:\Program Files\MySQL\MySQL Server 5.5\bin;.
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.io.tmpdir=C:\Users\xiefg\AppData\Local\Temp\
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:java.compiler=<NA>
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:os.name=Windows 7
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:os.arch=x86
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:os.version=6.1
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:user.name=xiefg
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:user.home=C:\Users\xiefg
2017-02-05 16:07:25,151 INFO [org.apache.zookeeper.ZooKeeper] - Client environment:user.dir=D:\hadoop\workspace\kafka
2017-02-05 16:07:25,152 INFO [org.apache.zookeeper.ZooKeeper] - Initiating client connection, connectString=192.168.170.69:2181 sessionTimeout=40000 [email protected]
2017-02-05 16:07:25,446 INFO [org.apache.zookeeper.ClientCnxn] - Opening socket connection to server 192.168.170.69/192.168.170.69:2181. Will not attempt to authenticate using SASL (unknown error)
2017-02-05 16:07:25,447 INFO [org.apache.zookeeper.ClientCnxn] - Socket connection established to 192.168.170.69/192.168.170.69:2181, initiating session
2017-02-05 16:07:25,457 INFO [org.apache.zookeeper.ClientCnxn] - Session establishment complete on server 192.168.170.69/192.168.170.69:2181, sessionid = 0x15a0c0833ba0006, negotiated timeout = 40000
2017-02-05 16:07:25,459 INFO [org.I0Itec.zkclient.ZkClient] - zookeeper state changed (SyncConnected)
2017-02-05 16:07:25,520 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], starting auto committer every 10000 ms
2017-02-05 16:07:25,626 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], begin registering consumer 69_xiefg-PC-1486282045030-7d9770a7 in ZK
2017-02-05 16:07:25,774 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], end registering consumer 69_xiefg-PC-1486282045030-7d9770a7 in ZK
2017-02-05 16:07:25,825 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], starting watcher executor thread for consumer 69_xiefg-PC-1486282045030-7d9770a7
2017-02-05 16:07:25,919 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], begin rebalancing consumer 69_xiefg-PC-1486282045030-7d9770a7 try #0
2017-02-05 16:07:26,448 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] Stopping leader finder thread
2017-02-05 16:07:26,448 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] Stopping all fetchers
2017-02-05 16:07:26,451 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] All connections stopped
2017-02-05 16:07:26,452 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Cleared all relevant queues for this fetcher
2017-02-05 16:07:26,454 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Cleared the data chunks in all the consumer message iterators
2017-02-05 16:07:26,455 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Committing all offsets after clearing the fetcher queues
2017-02-05 16:07:26,456 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Releasing partition ownership
2017-02-05 16:07:26,538 INFO [kafka.consumer.RangeAssignor] - Consumer 69_xiefg-PC-1486282045030-7d9770a7 rebalancing the following partitions: ArrayBuffer(0, 1) for topic test_topic with consumers: List(69_xiefg-PC-1486282045030-7d9770a7-0, 69_xiefg-PC-1486282045030-7d9770a7-1)
2017-02-05 16:07:26,540 INFO [kafka.consumer.RangeAssignor] - 69_xiefg-PC-1486282045030-7d9770a7-0 attempting to claim partition 0
2017-02-05 16:07:26,541 INFO [kafka.consumer.RangeAssignor] - 69_xiefg-PC-1486282045030-7d9770a7-1 attempting to claim partition 1
2017-02-05 16:07:26,601 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], 69_xiefg-PC-1486282045030-7d9770a7-0 successfully owned partition 0 for topic test_topic
2017-02-05 16:07:26,603 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], 69_xiefg-PC-1486282045030-7d9770a7-1 successfully owned partition 1 for topic test_topic
2017-02-05 16:07:26,655 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], Consumer 69_xiefg-PC-1486282045030-7d9770a7 selected partitions : test_topic:0: fetched offset = 50: consumed offset = 50,test_topic:1: fetched offset = 50: consumed offset = 50
2017-02-05 16:07:26,692 INFO [kafka.consumer.ConsumerFetcherManager$LeaderFinderThread] - [69_xiefg-PC-1486282045030-7d9770a7-leader-finder-thread], Starting 
2017-02-05 16:07:26,694 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], end rebalancing consumer 69_xiefg-PC-1486282045030-7d9770a7 try #0
2017-02-05 16:07:26,714 INFO [kafka.utils.VerifiableProperties] - Verifying properties
2017-02-05 16:07:26,714 INFO [kafka.utils.VerifiableProperties] - Property client.id is overridden to 69
2017-02-05 16:07:26,714 INFO [kafka.utils.VerifiableProperties] - Property metadata.broker.list is overridden to 192.168.170.69:9092
2017-02-05 16:07:26,715 INFO [kafka.utils.VerifiableProperties] - Property request.timeout.ms is overridden to 30000
2017-02-05 16:07:26,731 INFO [kafka.client.ClientUtils$] - Fetching metadata from broker id:69,host:192.168.170.69,port:9092 with correlation id 0 for 1 topic(s) Set(test_topic)
2017-02-05 16:07:26,734 INFO [kafka.producer.SyncProducer] - Connected to 192.168.170.69:9092 for producing
2017-02-05 16:07:26,752 INFO [kafka.producer.SyncProducer] - Disconnecting from 192.168.170.69:9092
2017-02-05 16:07:26,821 INFO [kafka.consumer.ConsumerFetcherThread] - [ConsumerFetcherThread-69_xiefg-PC-1486282045030-7d9770a7-0-69], Starting 
2017-02-05 16:07:26,824 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] Added fetcher for partitions ArrayBuffer([[test_topic,0], initOffset 50 to broker id:69,host:192.168.170.69,port:9092] , [[test_topic,1], initOffset 50 to broker id:69,host:192.168.170.69,port:9092] )
Thread-->: 0, message: xiefg.org0=2017年02月05日 16:07:09 758
Thread-->: 1, message: xiefg.org1=2017年02月05日 16:07:11 467
Thread-->: 0, message: xiefg.org2=2017年02月05日 16:07:11 471
Thread-->: 1, message: xiefg.org3=2017年02月05日 16:07:11 473
Thread-->: 0, message: xiefg.org4=2017年02月05日 16:07:11 476
Thread-->: 1, message: xiefg.org5=2017年02月05日 16:07:11 479
Thread-->: 0, message: xiefg.org6=2017年02月05日 16:07:11 482
Thread-->: 0, message: xiefg.org8=2017年02月05日 16:07:11 487
Thread-->: 1, message: xiefg.org7=2017年02月05日 16:07:11 484
Thread-->: 1, message: xiefg.org9=2017年02月05日 16:07:11 489
2017-02-05 16:07:46,697 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], ZKConsumerConnector shutting down
2017-02-05 16:07:46,749 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] Stopping leader finder thread
2017-02-05 16:07:46,751 INFO [kafka.consumer.ConsumerFetcherManager$LeaderFinderThread] - [69_xiefg-PC-1486282045030-7d9770a7-leader-finder-thread], Shutting down
2017-02-05 16:07:46,752 INFO [kafka.consumer.ConsumerFetcherManager$LeaderFinderThread] - [69_xiefg-PC-1486282045030-7d9770a7-leader-finder-thread], Stopped 
2017-02-05 16:07:46,752 INFO [kafka.consumer.ConsumerFetcherManager$LeaderFinderThread] - [69_xiefg-PC-1486282045030-7d9770a7-leader-finder-thread], Shutdown completed
2017-02-05 16:07:46,752 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] Stopping all fetchers
2017-02-05 16:07:46,753 INFO [kafka.consumer.ConsumerFetcherThread] - [ConsumerFetcherThread-69_xiefg-PC-1486282045030-7d9770a7-0-69], Shutting down
2017-02-05 16:07:46,769 INFO [kafka.consumer.SimpleConsumer] - Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
2017-02-05 16:07:46,769 INFO [kafka.consumer.ConsumerFetcherThread] - [ConsumerFetcherThread-69_xiefg-PC-1486282045030-7d9770a7-0-69], Stopped 
2017-02-05 16:07:46,769 INFO [kafka.consumer.ConsumerFetcherThread] - [ConsumerFetcherThread-69_xiefg-PC-1486282045030-7d9770a7-0-69], Shutdown completed
2017-02-05 16:07:46,770 INFO [kafka.consumer.ConsumerFetcherManager] - [ConsumerFetcherManager-1486282045496] All connections stopped
2017-02-05 16:07:46,785 INFO [org.I0Itec.zkclient.ZkEventThread] - Terminate ZkClient event thread.
2017-02-05 16:07:46,802 INFO [org.apache.zookeeper.ZooKeeper] - Session: 0x15a0c0833ba0006 closed
2017-02-05 16:07:46,803 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], ZKConsumerConnector shutdown completed in 105 ms
2017-02-05 16:07:46,803 INFO [org.apache.zookeeper.ClientCnxn] - EventThread shut down
2017-02-05 16:07:46,836 INFO [kafka.consumer.ZookeeperConsumerConnector] - [69_xiefg-PC-1486282045030-7d9770a7], stopping watcher executor thread for consumer 69_xiefg-PC-1486282045030-7d9770a7

You can see that after shutdown the program waits five seconds, giving the consumer threads time to finish processing the messages still held in their Kafka streams.

Why Use the High Level Consumer

In some scenarios, the logic that reads messages from Kafka does not want to manage message offsets; it only wants the message data. The High Level Consumer provides exactly that. The first thing to know is that the High Level Consumer stores the latest offset read from each partition in ZooKeeper, keyed by the consumer group name. The consumer group name is global across the Kafka cluster, so when starting new application logic, be careful that no stale consumers from the same group are still running on the cluster. When a consumer thread starts, Kafka adds it to the consumer group for the topic and triggers a rebalance. During the rebalance, Kafka assigns partitions to consumers and may move a partition from one consumer to another. If the old and the new processing logic run at the same time, some messages may be delivered to the old consumer.
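
For reference, this is the standard ZooKeeper layout the 0.8.x high-level consumer uses to track group membership, partition ownership, and offsets (the group id 69 and topic test_topic come from this post's configuration; the concrete child node names vary per consumer instance):

/consumers/69/ids/[consumer_id]                    consumers registered in the group
/consumers/69/owners/test_topic/[partition_id]     which consumer thread owns each partition
/consumers/69/offsets/test_topic/[partition_id]    last committed offset for each partition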

Designing a High Level Consumer

The High Level Consumer is multi-threaded. The number of consumer threads is tied to the number of partitions in the topic, and a few specific rules govern their relationship:

        1. If there are more threads than the topic has partitions, some threads will never receive a message.

        2. If there are more partitions than threads, some threads will receive messages from more than one partition.

        3. If one thread consumes several partitions, the order in which messages arrive is not guaranteed. For example, the thread may get 5 messages from partition 10 and 6 from partition 11, then 5 more from partition 10, followed immediately by 5 more from partition 10, even though partition 11 still has messages available.

Adding more consumers to the same consumer group triggers a Kafka rebalance: a partition that was assigned to thread a may, after the rebalance, end up assigned to thread b. Because the thread pool should be sized to the partition count, it is worth checking how many partitions a topic actually has; one way to do that is sketched below.
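
A minimal sketch using the 0.8.x SimpleConsumer metadata API (this class is not part of the original post; the broker address and client id are assumptions):

package com.xiefg.kafka;

import java.util.Collections;

import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class PartitionCountCheck {
    public static void main(String[] args) {
        // broker address taken from the log output above; adjust for your environment
        SimpleConsumer consumer = new SimpleConsumer("192.168.170.69", 9092, 100000, 64 * 1024, "partitionCountCheck");
        try {
            // ask the broker for metadata about the topic and count its partitions
            TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList("test_topic"));
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata metadata : response.topicsMetadata()) {
                System.out.println("topic " + metadata.topic() + " has " + metadata.partitionsMetadata().size() + " partitions");
            }
        } finally {
            consumer.close();
        }
    }
}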

Shutting Down the Consumer Group and Error Handling

Kafka does not update the offset in ZooKeeper immediately after every message is read; it waits a while between commits. Because of this delay, a consumer can read a message without its offset being recorded, so when the client shuts down or crashes and is later restarted, some messages are read a second time. Messages can also be re-read when a broker goes down, or when the partition leader changes for some other reason.

To avoid this problem, you should give the consumer a graceful way to shut down rather than simply killing the process.
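
One way to get a graceful shutdown even when the process is stopped from outside is a JVM shutdown hook. A minimal sketch (an addition, not from the original post) that could go at the start of the main method of the KafkaConsumer class above, reusing its static consumer and executor fields:

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                if (consumer != null) {
                    consumer.shutdown(); // commits offsets and stops the fetcher threads
                }
                if (executor != null) {
                    executor.shutdown(); // lets the worker threads drain their streams
                }
            }
        });

With this in place a plain kill (SIGTERM) or Ctrl+C still commits the latest offsets before the JVM exits; only kill -9 bypasses it.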


The explanation above is translated from https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example; corrections for anything I got wrong are welcome.
