Kafka Java API examples

The Kafka Java API (the legacy kafka.javaapi clients from the 0.8 era)

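Producer example:
Create the configuration, then create the producer and send:

	1. new Properties()
	2. Add the settings metadata.broker.list and
	   serializer.class = kafka.serializer.StringEncoder
	3. Wrap them in a ProducerConfig
	4. Create the Producer
	5. Send the messages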

Example code:

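    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ProducerDemo { // class name is illustrative
        public static void main(String[] args) {
            // Broker list and the serializer used for message values
            Properties proper = new Properties();
            proper.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
            proper.put("serializer.class", "kafka.serializer.StringEncoder");

            ProducerConfig producerConfig = new ProducerConfig(proper);
            Producer<String, String> producer = new Producer<String, String>(producerConfig);

            // Send 100 messages to the topic "yuan"
            for (int i = 0; i < 100; i++) {
                producer.send(new KeyedMessage<String, String>("yuan", "test" + i));
            }
            producer.close(); // release the producer's network resources
        }
    }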
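
Before the Properties object is handed to ProducerConfig, it can be useful to confirm that the two settings from the steps above are actually present. The following is a minimal sketch that uses only java.util.Properties; the names ProducerSettings, build, and missingKeys are hypothetical helpers and are not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the Kafka API: builds and checks the
// producer settings used in the producer example.
class ProducerSettings {
    // The two keys the producer example sets explicitly.
    static final String[] EXPECTED_KEYS = {"metadata.broker.list", "serializer.class"};

    // Build the Properties for a given comma-separated broker list.
    static Properties build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }

    // Return the expected keys that are absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<String>();
        for (String key : EXPECTED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = build("node1:9092,node2:9092,node3:9092");
        System.out.println("missing keys: " + missingKeys(props));
    }
}
```

A caller could refuse to construct the producer when missingKeys returns a non-empty list, which surfaces a typo in a key name before any connection is attempted.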

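Consumer example:

	1. Specify the topic and the number of threads
	2. new Properties()
	3. Add the settings: zookeeper.connect, group.id, and
	   auto.offset.reset=smallest (comparable to --from-beginning when the group has no committed offset)
	4. Wrap them in a ConsumerConfig
	5. Create the Consumer connector
	6. Create the message streams, passing in the values from step 1
	7. Read the stream data for the topic and print it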
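
The auto.offset.reset setting in step 3 only matters when the consumer group has no usable committed offset. The sketch below models that decision in plain Java so the behaviour is easy to see; OffsetResetSketch and resolveStart are hypothetical names, and the earliest and latest offsets are passed in as plain numbers rather than fetched from a broker.

```java
// Hypothetical model of the start-offset decision; it does not call Kafka.
class OffsetResetSketch {
    // committed: last committed offset for the group, or null if none exists.
    // reset: value of auto.offset.reset ("smallest" or "largest").
    // earliest/latest: bounds of the offsets currently available in the partition.
    static long resolveStart(Long committed, String reset, long earliest, long latest) {
        if (committed != null && committed >= earliest && committed <= latest) {
            return committed; // resume where the group left off
        }
        if ("smallest".equals(reset)) {
            return earliest;  // read from the beginning of the log
        }
        if ("largest".equals(reset)) {
            return latest;    // read only messages produced from now on
        }
        throw new IllegalArgumentException("unsupported auto.offset.reset: " + reset);
    }

    public static void main(String[] args) {
        System.out.println(resolveStart(null, "smallest", 0L, 100L)); // no commit: start at 0
        System.out.println(resolveStart(null, "largest", 0L, 100L));  // no commit: start at 100
        System.out.println(resolveStart(42L, "smallest", 0L, 100L));  // commit wins: start at 42
    }
}
```

This is why a group that has already consumed the topic will not re-read old messages even with smallest: the committed offset takes precedence.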

Example code:

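    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class ConsumerDemo { // class name is illustrative
        final static String topic = "yuan";
        final static Integer threadNum = 2;

        public static void main(String[] args) {
            // ZooKeeper quorum, consumer group, and where to start without a committed offset
            Properties prop = new Properties();
            prop.put("zookeeper.connect", "node1:2181,node2:2181,node3:2181");
            prop.put("group.id", "yuan");
            prop.put("auto.offset.reset", "smallest");

            ConsumerConfig consumerConfig = new ConsumerConfig(prop);
            ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(consumerConfig);

            // Ask for threadNum streams on the topic
            Map<String, Integer> map = new HashMap<String, Integer>();
            map.put(topic, threadNum);
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                    consumerConn.createMessageStreams(map);
            List<KafkaStream<byte[], byte[]>> messageStream = messageStreams.get(topic);

            // One thread per stream; each iteration blocks until a message arrives
            for (final KafkaStream<byte[], byte[]> kafkaStream : messageStream) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
                            // Decode with an explicit charset instead of the platform default
                            String mess = new String(messageAndMetadata.message(), StandardCharsets.UTF_8);
                            System.out.println(mess);
                        }
                    }
                }).start();
            }
        }
    }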
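
The one-thread-per-stream pattern used above can be tried without a broker. In the sketch below, each in-memory list of byte[] stands in for a KafkaStream, and a fixed thread pool replaces the hand-started threads. StreamWorkers and consumeAll are hypothetical names, not Kafka API, and unlike a real stream these lists are finite, so the workers terminate.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer loop: lists of byte[] replace KafkaStream.
class StreamWorkers {
    // Decode every message of every stream, using one pool thread per stream.
    // Assumes at least one stream is passed in.
    static List<String> consumeAll(List<List<byte[]>> streams) {
        final List<String> received = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        for (final List<byte[]> stream : streams) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    for (byte[] payload : stream) {
                        // Decode with an explicit charset rather than the platform default
                        received.add(new String(payload, StandardCharsets.UTF_8));
                    }
                }
            });
        }
        pool.shutdown(); // accept no new tasks; already submitted ones keep running
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS); // wait for the workers to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<byte[]>> streams = new ArrayList<List<byte[]>>();
        streams.add(Arrays.asList("test0".getBytes(StandardCharsets.UTF_8),
                                  "test1".getBytes(StandardCharsets.UTF_8)));
        streams.add(Collections.singletonList("test2".getBytes(StandardCharsets.UTF_8)));

        List<String> out = new ArrayList<String>(consumeAll(streams));
        Collections.sort(out); // arrival order across threads is not deterministic
        System.out.println(out); // prints [test0, test1, test2]
    }
}
```

Using an ExecutorService instead of raw threads is a design choice of this sketch: it gives a single place to shut the workers down, which the original example leaves open.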