
【Big Data Kafka: the high-level Consumer】


I. Features

You do not have to manage offsets yourself: the high-level consumer automatically reads the consumer group's last committed offset from ZooKeeper and resumes from there.
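For reference, this offset behaviour is controlled by consumer properties. A minimal sketch, assuming the Kafka 0.8.x high-level consumer (the values are illustrative; hadoop0:2181 is the ZooKeeper address used later in this post):

    Properties props = new Properties();
    props.put("zookeeper.connect", "hadoop0:2181");  // where group offsets are stored
    props.put("group.id", "group--demo");            // offsets are tracked per consumer group
    props.put("auto.commit.enable", "true");         // periodically commit offsets to ZooKeeper (the default)
    props.put("auto.commit.interval.ms", "1000");    // commit roughly once per second
    props.put("auto.offset.reset", "smallest");      // where to start when the group has no offset yet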

II. Caveats

1. If there are more consumers than partitions, the extra consumers are wasted: Kafka does not allow concurrent consumption within a single partition, so the consumer count should not exceed the partition count.

2. If there are fewer consumers than partitions, one consumer will read from several partitions. Balance the two counts carefully, or the partitions will be consumed unevenly. Ideally the partition count is an integer multiple of the consumer count, which makes it an important choice: 24 partitions, for example, divide evenly across many possible consumer counts.

3. If a consumer reads from multiple partitions, ordering across them is not guaranteed: Kafka only guarantees order within a single partition, so the interleaving you observe depends on the order in which you read the partitions.

4. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to each consumer may change.

5. With the high-level API, a read blocks when no data is available (see the sketch below for a way to bound the wait).
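If blocking indefinitely is undesirable, the 0.8.x high-level consumer can bound the wait with the consumer.timeout.ms property, in which case hasNext() throws ConsumerTimeoutException instead of blocking forever. A minimal sketch (the 5-second timeout is illustrative):

    properties.put("consumer.timeout.ms", "5000"); // give up after 5s without data

    try {
        while (it.hasNext()) {
            // process it.next() as usual
        }
    } catch (kafka.consumer.ConsumerTimeoutException e) {
        // no message arrived within 5s; retry, or fall through to shut down
    }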

III. Example code

package kafkatest.kakfademo;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo1 {

    public static void main(String[] args) {
        ConsumerDemo1 demo = new ConsumerDemo1();
        demo.test();
    }

    @SuppressWarnings("rawtypes")
    public void test() {
        String topicName = "test";
        int numThreads = 1;

        Properties properties = new Properties();
        properties.put("zookeeper.connect", "hadoop0:2181"); // ZooKeeper address
        // offsets are tracked per group, so use a fresh group name
        // if you want to re-read the topic from the beginning
        properties.put("group.id", "group--demo");

        ConsumerConnector consumer = Consumer
                .createJavaConsumerConnector(new ConsumerConfig(properties));

        // request numThreads streams (consuming threads) for this topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topicName, numThreads);

        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
                .createMessageStreams(topicCountMap);

        // the streams created for our topic
        List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);

        // now launch all the threads, one consuming task per stream
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.execute(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }

    class ConsumerMsgTask implements Runnable {

        private KafkaStream m_stream;
        private int m_threadNumber;

        public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
            m_threadNumber = threadNumber;
            m_stream = stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            try {
                // hasNext() blocks until a message arrives (see caveat 5)
                while (it.hasNext()) {
                    // call next() once per iteration; calling it twice would skip every other message
                    MessageAndMetadata<byte[], byte[]> record = it.next();
                    String msg = new String(record.message(), "UTF-8");
                    System.out.println("offset: " + record.offset() + ", msg: " + msg);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }
}
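The demo never releases its resources: the ZooKeeper connection and the worker threads live until the JVM exits. A minimal shutdown sketch, assuming the consumer and executor references from test() are kept accessible (in the original they are method-locals):

    // call once consumption is done, e.g. from a shutdown hook
    consumer.shutdown();   // close the ZooKeeper connection and release the streams
    executor.shutdown();   // stop accepting new tasks
    try {
        if (!executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
            executor.shutdownNow(); // force-stop workers still blocked in hasNext()
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }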

IV. Verification
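One way to verify: start ConsumerDemo1, publish a few messages to the "test" topic (for example with Kafka's console producer, bin/kafka-console-producer.sh, pointed at the same cluster), and check that each message is printed together with its offset.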







Reposted from: https://my.oschina.net/boltwu/blog/703831