package com.weichai.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

 * Kafka生產者的簡單示例
 * @author lhy
 * @date 2018.10.09
public class SimpleProducer {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Properties props = new Properties();
		props.setProperty("metadata.broker.list", "localhost:9092"); // 設定kafka的埠為預設埠9020
		props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
		props.put("request.required.acks", "1");
		ProducerConfig config = new ProducerConfig(props);
		Producer<String, String> producer = new Producer<String, String>(config);
		KeyedMessage<String, String> data = new KeyedMessage<String, String>("SimpleNode", "Kafka Simple Test");
		int i = 1;
		try {
			while (i<100){ 
				// 傳送訊息
		} catch (Exception e) {
			// TODO Auto-generated catch block

package com.weichai.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

 * 消費者簡單測試(單執行緒獲取消費資料)
 * @author lhy
 * @date 2018.10.09
public class SimpleConsumer extends Thread{

	private final ConsumerConnector consumer;
	// 要消費的話題
    private final String topic;
	public SimpleConsumer(String topic) {
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
		this.topic = topic;

	private static ConsumerConfig createConsumerConfig() {
		// TODO Auto-generated method stub
		Properties props = new Properties();
		// props.put("zookeeper.connect","localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");
		// 配置要連線的zookeeper地址與埠
		props.put("zookeeper.connect", "localhost:2181");
		// 配置zookeeper的組id (The ‘group.id’ string defines the Consumer Group
		// this process is consuming on behalf of.)
		props.put("group.id", "0");
		// 配置zookeeper連線超時間隔
		props.put("zookeeper.session.timeout.ms", "10000");
		// The ‘zookeeper.sync.time.ms’ is the number of milliseconds a
		// ZooKeeper ‘follower’ can be behind the master before an error occurs.
		props.put("zookeeper.sync.time.ms", "200");
		// The ‘auto.commit.interval.ms’ setting is how often updates to the
		// consumed offsets are written to ZooKeeper.
		// Note that since the commit frequency is time based instead of # of
		// messages consumed, if an error occurs between updates to ZooKeeper on
		// restart you will get replayed messages.
		props.put("auto.commit.interval.ms", "1000");
		return new ConsumerConfig(props);

    public void run(){
    	Map<String, Integer> topickMap = new HashMap<String, Integer>();
    	topickMap.put(topic, 1);
    	Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
    	KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
				System.err.println(Thread.currentThread() + " get kafka data:" + new String(it.next().message()));
			try {
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
	public static void main(String[] args) {
		// TODO Auto-generated method stub
        SimpleConsumer consumerThread = new SimpleConsumer("SimpleNode");


