大資料入門(23)kafka的第一個例項
阿新 • • 發佈:2018-12-10
匯入kafka下lib的jar #################生產者:直接右鍵執行,weekend05的consumer會接收到################################# public class ProduceTest { public static void main(String[] args) throws Exception { Properties pro = new Properties(); pro.put("zookeeper.connect", "weekend05:2181,weekend06:2181,weekend07:2181"); pro.put("metadata.broker.list","weekend05:9092,weekend06:9092,weekend07:9092"); //序列號機制 pro.put("serializer.class", "kafka.serializer.StringEncoder"); ProducerConfig conf = new ProducerConfig(pro); Producer<String, String> producer = new Producer<String, String>(conf); // 傳送業務訊息 // 讀取檔案 讀取記憶體資料庫 讀socket埠 for(int i =0;i<1000;i++){ Thread.sleep(100); producer.send(new KeyedMessage("order_name", "huawei--"+i+"--times recall")); } } } ###############消費者:直接右鍵執行,輸出producer的資訊############################################################## public class ConsumerTest { public static void main(String[] args) { Properties pro = new Properties(); pro.put("zookeeper.connect", "weekend05:2181,weekend06:2181,weekend07:2181"); pro.put("group.id", "1111"); pro.put("auto.offset.reset", "smallest"); ConsumerConfig conf = new ConsumerConfig(pro); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(conf); Map<String,Integer> map = new HashMap<String, Integer>(); map.put("order_name", 1); //話題,執行緒個數 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(map); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("order_name"); for(final KafkaStream<byte[], byte[]> stream : streams){ new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub for(MessageAndMetadata<byte[], byte[]> msg : stream){ String ss = new String(msg.message()); System.out.println("------"+ss); } } }).start(); } } }