1. 程式人生 > >Kafka的單執行緒生產消費測試

Kafka的單執行緒生產消費測試

程式碼:

package com.weichai.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Minimal Kafka producer example built on the legacy {@code kafka.javaapi.producer} API.
 *
 * <p>Publishes the same string message to the {@code SimpleNode} topic a fixed number of
 * times and then closes the producer.
 *
 * @author lhy
 * @date 2018.10.09
 */
public class SimpleProducer {

	/**
	 * Configures a producer against a local broker, sends the test message 99 times
	 * (loop counter runs from 1 up to, but not including, 100) and closes the producer.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		Properties props = new Properties();
		// Broker used to fetch cluster metadata; this example expects it on localhost:9092.
		props.setProperty("metadata.broker.list", "localhost:9092");
		// Message payloads are plain strings.
		props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
		props.put("request.required.acks", "1");
		ProducerConfig config = new ProducerConfig(props);

		// Create the producer.
		Producer<String, String> producer = new Producer<String, String>(config);
		// Build the message once; the same instance is sent on every iteration.
		KeyedMessage<String, String> data = new KeyedMessage<String, String>("SimpleNode", "Kafka Simple Test");
		try {
			// The counter must advance, otherwise the loop never terminates and
			// the producer keeps flooding the topic without ever being closed.
			for (int i = 1; i < 100; i++) {
				producer.send(data);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// Always release the producer's network resources, even if sending failed.
			producer.close();
		}
	}

}
package com.weichai.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Minimal Kafka consumer example (a single thread reading a single stream), built on the
 * legacy ZooKeeper-based high-level consumer API.
 *
 * <p>The thread prints every message received on the configured topic until it is
 * interrupted, at which point the consumer connector is shut down.
 *
 * @author lhy
 * @date 2018.10.09
 */
public class SimpleConsumer extends Thread{

	// Connection to the Kafka cluster (via ZooKeeper).
	private final ConsumerConnector consumer;
	// Topic to consume from.
	private final String topic;

	/**
	 * Creates a consumer thread for the given topic. The connector is created immediately;
	 * message streams are only opened once the thread is started.
	 *
	 * @param topic name of the topic to consume
	 */
	public SimpleConsumer(String topic) {
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
		this.topic = topic;
	}

	/**
	 * Builds the consumer configuration used by this example.
	 *
	 * @return configuration pointing at a local ZooKeeper with consumer group {@code "0"}
	 */
	private static ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		// For a ZooKeeper ensemble, list every node, e.g.
		// "localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181".
		// ZooKeeper address and port to connect to.
		props.put("zookeeper.connect", "localhost:2181");
		// The 'group.id' string defines the Consumer Group this process is
		// consuming on behalf of (it is a Kafka consumer group, not a ZooKeeper id).
		props.put("group.id", "0");
		// ZooKeeper session timeout in milliseconds.
		props.put("zookeeper.session.timeout.ms", "10000");
		// The 'zookeeper.sync.time.ms' is the number of milliseconds a
		// ZooKeeper 'follower' can be behind the master before an error occurs.
		props.put("zookeeper.sync.time.ms", "200");
		// The 'auto.commit.interval.ms' setting is how often updates to the
		// consumed offsets are written to ZooKeeper.
		// Note that since the commit frequency is time based instead of # of
		// messages consumed, if an error occurs between updates to ZooKeeper on
		// restart you will get replayed messages.
		props.put("auto.commit.interval.ms", "1000");
		return new ConsumerConfig(props);
	}

	/**
	 * Opens one stream for the topic and prints each received message to {@code System.err},
	 * pausing one second between polls. Stops when the thread is interrupted and shuts the
	 * connector down on the way out.
	 */
	@Override
	public void run(){
		// Request exactly one stream (i.e. one consuming thread) for the topic.
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, 1);
		try {
			Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicCountMap);
			KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
			ConsumerIterator<byte[], byte[]> it = stream.iterator();
			System.out.println("*********Kafka消費者結果********");
			while (!Thread.currentThread().isInterrupted()) {
				if (it.hasNext()) {
					// Decode explicitly as UTF-8 rather than relying on the platform
					// default charset, so non-ASCII payloads print correctly everywhere.
					String message = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
					System.err.println(Thread.currentThread() + " get kafka data:" + message);
				}
				try {
					// Pace the demo output: at most one message per second.
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					// Restore the interrupt flag so the loop condition sees it and exits,
					// instead of swallowing the request to stop.
					Thread.currentThread().interrupt();
				}
			}
		} finally {
			// Release ZooKeeper/broker connections when the thread ends for any reason.
			consumer.shutdown();
		}
	}

	/**
	 * Starts a single consumer thread on the {@code SimpleNode} topic.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		SimpleConsumer consumerThread = new SimpleConsumer("SimpleNode");
		consumerThread.start();
	}

}

執行完生產者,開啟Kafka-manager即可看到剛才生產者產生的Topic---SimpleNode

執行消費者程式,以單執行緒列印從 SimpleNode 這個 topic 消費到的訊息,如下: