Kafka Java API examples
阿新 • Published: 2019-01-12
Kafka Java API
Producer example:
Create the configuration:
1. new Properties()
2. Add the configuration: metadata.broker.list
serializer.class = kafka.serializer.StringEncoder
3. ProducerConfig()
4. Create the Producer
5. Send the messages
Sample code:
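import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties proper = new Properties();
// Brokers the producer contacts to fetch cluster metadata
proper.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
// Encode messages as strings
proper.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(proper);
Producer<String, String> producer = new Producer<String, String>(producerConfig);
// Send 100 messages, test0 through test99, to the topic "yuan"
for (int i = 0; i < 100; i++) {
    producer.send(new KeyedMessage<String, String>("yuan", "test" + i));
}
// Release the producer's connections once sending is done
producer.close();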
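Steps 1 and 2 above can be factored into a small helper so that the broker list is not hard-coded as a single string. The following is only a minimal sketch that uses nothing beyond java.util; the class name ProducerProps and the method build are hypothetical and are not part of Kafka. It only assembles the Properties object that would then be handed to ProducerConfig, and it assumes every broker is written as host:port.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of Kafka): assembles the Properties
// that the producer example reads through ProducerConfig.
public class ProducerProps {

    // Joins host:port entries into the comma-separated form used by
    // metadata.broker.list and sets the string encoder.
    public static Properties build(List<String> brokers) {
        if (brokers == null || brokers.isEmpty()) {
            throw new IllegalArgumentException("at least one broker is required");
        }
        for (String broker : brokers) {
            // Assumption of this sketch: each entry looks like host:port
            if (!broker.matches("[^:,\\s]+:\\d+")) {
                throw new IllegalArgumentException("expected host:port, got: " + broker);
            }
        }
        Properties props = new Properties();
        props.put("metadata.broker.list", String.join(",", brokers));
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(Arrays.asList("node1:9092", "node2:9092", "node3:9092"));
        // prints node1:9092,node2:9092,node3:9092
        System.out.println(props.getProperty("metadata.broker.list"));
    }
}
```

The returned object could be passed to new ProducerConfig(...) exactly as proper is in the sample code; validating the entries up front just makes a mistyped broker address fail early instead of at send time.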
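Consumer example:
1. Specify the topic and the number of threads
2. new Properties()
3. Add the configuration: zookeeper.connect, group.id
auto.offset.reset=smallest, which is equivalent to --from-beginning for a consumer group that has no committed offset yet
4. ConsumerConfig
5. Create the Consumer connection
6. Create the message streams, passing in the topic and thread count from step 1
7. Read the stream data for the topic and print it
Sample code: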
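import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// The class name is illustrative; the original snippet omitted the enclosing class.
public class ConsumerExample {
    static final String topic = "yuan";
    static final Integer threadNum = 2;

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "node1:2181,node2:2181,node3:2181");
        prop.put("group.id", "yuan");
        // Start from the earliest offset when the group has no committed offset
        prop.put("auto.offset.reset", "smallest");
        ConsumerConfig consumerConfig = new ConsumerConfig(prop);
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(consumerConfig);

        // topic -> number of streams (one per consuming thread)
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put(topic, threadNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConn.createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> messageStream = messageStreams.get(topic);

        // One thread per stream; final so the anonymous class can capture it
        for (final KafkaStream<byte[], byte[]> kafkaStream : messageStream) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
                        // Decode explicitly as UTF-8 instead of the platform default charset
                        String mess = new String(messageAndMetadata.message(), StandardCharsets.UTF_8);
                        System.out.println(mess);
                    }
                }
            }).start();
        }
    }
}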
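The one-thread-per-stream pattern from steps 6 and 7 can also be tried out without a running cluster. The sketch below is an illustration under stated assumptions, not the Kafka API: each Iterable<byte[]> stands in for one KafkaStream, and the class StreamWorkers and its method drain are hypothetical names. It swaps the raw Thread objects of the sample code for an ExecutorService so the workers can be shut down together.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer loop: each Iterable<byte[]>
// plays the role of one KafkaStream, drained by its own pool thread.
public class StreamWorkers {

    // Decodes every payload of every stream as UTF-8 and collects the results.
    // The order across streams is not deterministic.
    public static List<String> drain(List<? extends Iterable<byte[]>> streams) {
        final ConcurrentLinkedQueue<String> out = new ConcurrentLinkedQueue<String>();
        // One worker per stream, mirroring threadNum in the sample code
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, streams.size()));
        for (final Iterable<byte[]> stream : streams) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    for (byte[] payload : stream) {
                        out.add(new String(payload, StandardCharsets.UTF_8));
                    }
                }
            });
        }
        // The in-memory lists used here are finite, so the workers finish on their own
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(out);
    }

    public static void main(String[] args) {
        List<byte[]> first = Arrays.asList(
                "test0".getBytes(StandardCharsets.UTF_8),
                "test1".getBytes(StandardCharsets.UTF_8));
        List<byte[]> second = Collections.singletonList(
                "test2".getBytes(StandardCharsets.UTF_8));
        List<String> messages = drain(Arrays.asList(first, second));
        System.out.println(messages.size() + " messages decoded");
    }
}
```

With real KafkaStream objects the inner loop would not end by itself, since iterating a stream waits for the next message, so a real consumer would normally shut down the connector before stopping the pool.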