安裝kafka到window上,編寫kafka java客戶端連線kafka
阿新 • • 發佈:2019-01-10
package com.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class ConsumerDemo { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerDemo(String zookeeper, String groupid, String aTopic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerProps(zookeeper, groupid)); this.topic = aTopic; } public void run(int threads) { Map<String, Integer> topicMap = new HashMap<String, Integer>(); topicMap.put(topic, new Integer(threads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(threads); int numThread = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerDemoRun(stream, numThread)); numThread++; } } private static ConsumerConfig ConsumerProps(String zookeeper, String groupid) { Properties properties = new Properties(); // config properties file properties.put("zookeeper.connect", zookeeper); properties.put("group.id", groupid); properties.put("zookeeper.session.timeout.ms", "400"); properties.put("zookeeper.sync.time.ms", "200"); properties.put("auto.commit.interval.ms", "1000"); properties.put("auto.offset.reset", "smallest"); return new ConsumerConfig(properties); } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public static void main(String[] args) { String zookeeper = "localhost:2181"; String groupid = "group1"; String topic = "hellotest"; int threads = 1; ConsumerDemo test = new ConsumerDemo(zookeeper, groupid, topic); test.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } test.shutdown(); } }
程式碼二
package com.kafka; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; public class ConsumerDemoRun implements Runnable { private KafkaStream aStream; private int aThread; public ConsumerDemoRun(KafkaStream stream, int thread) { aStream = stream; // set stream from main read aThread = thread; // set thread from main read } public void run() { ConsumerIterator<byte[], byte[]> iterator = aStream.iterator(); // used to // check // throughout // the list // continiously while (iterator.hasNext()) { System.out.println("Thread " + aThread + ": " + new String(iterator.next().message())); } System.out.println("Shutting down Thread: " + aThread); } }
要執行上面的程式碼,需要加入如下依賴