1. 程式人生 > >kafka詳解三:開發Kafka應用

kafka詳解三:開發Kafka應用

問題導讀

1.Kafka系統由什麼組成?
2.Kafka中和producer相關的API是什麼?












一、整體看一下Kafka
我們知道,Kafka系統有三大元件:Producer、Consumer、broker 。
 
file:///C:/Users/ADMINI~1/AppData/Local/Temp/enhtmlclip/Image.png
producers 生產(produce)訊息(message)並推(push)送給brokers,consumers從brokers把訊息提取(pull)出來消費(consume)。

二、開發一個Producer應用 
     Producers用來生產訊息並把產生的訊息推送到Kafka的Broker。Producers可以是各種應用,比如web應用,伺服器端應用,代理應用以及log系統等等。當然,Producers現在有各種語言的實現比如Java、C、Python等。

     我們先看一下Producer在Kafka中的角色:

 file:///C:/Users/ADMINI~1/AppData/Local/Temp/enhtmlclip/Image(1).png

2.1.kafka Producer 的 API
Kafka中和producer相關的API有三個類
  • Producer:最主要的類,用來建立和推送訊息
  • KeyedMessage:定義要傳送的訊息物件,比如定義傳送給哪個topic,partition key和傳送的內容等。
  • ProducerConfig:配置Producer,比如定義要連線的brokers、partition class、serializer class、partition key等


2.2下面我們就寫一個最簡單的Producer:產生一條訊息並推送給broker
  1. package bonree.producer;
  2. import java.util.Properties;
  3. import kafka.javaapi.producer.Producer;
  4. import kafka.producer.KeyedMessage;
  5. import kafka.producer.ProducerConfig;
  6. /*******************************************************************************
  7. * BidPlanStructForm.java Created on 2014-7-8
  8. * Author: <a href=mailto:[email protected]>houda</a>
  9. * @Title: SimpleProducer.java
  10. * @Package bonree.producer
  11. * Description:
  12. * Version: 1.0
  13. ******************************************************************************/
  14. public class SimpleProducer {
  15.         private static Producer<Integer,String> producer;
  16.         private final Properties props=new Properties();
  17.         public SimpleProducer(){
  18.                 //定義連線的broker list
  19.                 props.put("metadata.broker.list", "192.168.4.31:9092");
  20.                 //定義序列化類(Java物件傳輸前要序列化)
  21.                 props.put("serializer.class", "kafka.serializer.StringEncoder");
  22.                 producer = new Producer<Integer, String>(new ProducerConfig(props));
  23.         }
  24.         public static void main(String[] args) {
  25.                 SimpleProducer sp=new SimpleProducer();
  26.                 //定義topic
  27.                 String topic="mytopic";
  28.                 //定義要傳送給topic的訊息
  29.                 String messageStr = "send a message to broker ";
  30.                 //構建訊息物件
  31.                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
  32.                 //推送訊息到broker
  33.                 producer.send(data);
  34.                 producer.close();
  35.         }
  36. }
複製程式碼
三、開發一個consumer應用
     Consumer是用來消費Producer產生的訊息的,當然一個Consumer可以是各種應用,如可以是一個實時的分析系統,也可以是一個數據倉庫或者是一個基於釋出訂閱模式的解決方案等。Consumer端同樣有多種語言的實現,如Java、C、Python等。
     我們看一下Consumer在Kafka中的角色:
 
3.1.kafka Producer 的 API
     Kafka和Producer稍微有些不同,它提供了兩種型別的API
  • high-level consumer API:提供了對底層API的抽象,使用起來比較簡單
  • simple consumer API:允許重寫底層API的實現,提供了更多的控制權,當然使用起來也複雜一些


由於是第一個應用,我們這部分使用high-level API,它的特點每消費一個message自動移動offset值到下一個message,關於offset在後面的部分會單獨介紹。與Producer類似,和Consumer相關的有三個主要的類:
  • KafkaStream:這裡面返回的就是Producer生產的訊息
  • ConsumerConfig:定義要連線zookeeper的一些配置資訊(Kafka通過zookeeper均衡壓力,具體請查閱見面幾篇文章),比如定義zookeeper的URL、group id、連線zookeeper過期時間等。
  • ConsumerConnector:負責和zookeeper進行連線等工作


3.2下面我們就寫一個最簡單的Consumer:從broker中消費一個訊息

  1. package bonree.consumer;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Properties;
  6. import kafka.consumer.Consumer;
  7. import kafka.consumer.ConsumerConfig;
  8. import kafka.consumer.ConsumerIterator;
  9. import kafka.consumer.KafkaStream;
  10. import kafka.javaapi.consumer.ConsumerConnector;
  11. /*******************************************************************************
  12. * Created on 2014-7-8 Author: <a
  13. * href=mailto:[email protected]>houda</a>
  14. * @Title: SimpleHLConsumer.java
  15. * @Package bonree.consumer Description: Version: 1.0
  16. ******************************************************************************/
  17. public class SimpleHLConsumer {
  18.         private final ConsumerConnector consumer;
  19.         private final String topic;
  20.         public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
  21.                 Properties props = new Properties();
  22.                 //定義連線zookeeper資訊
  23.                 props.put("zookeeper.connect", zookeeper);
  24.                 //定義Consumer所有的groupID,關於groupID,後面會繼續介紹
  25.                 props.put("group.id", groupId);
  26.                 props.put("zookeeper.session.timeout.ms", "500");
  27.                 props.put("zookeeper.sync.time.ms", "250");
  28.                 props.put("auto.commit.interval.ms", "1000");
  29.                 consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
  30.                 this.topic = topic;
  31.         }
  32.         public void testConsumer() {
  33.                 Map<String, Integer> topicCount = new HashMap<String, Integer>();
  34.                 //定義訂閱topic數量
  35.                 topicCount.put(topic, new Integer(1));
  36.                 //返回的是所有topic的Map
  37.                 Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
  38.                 //取出我們要需要的topic中的訊息流
  39.                 List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
  40.                 for (final KafkaStream stream : streams) {
  41.                         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
  42.                         while (consumerIte.hasNext())
  43.                                 System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
  44.                 }
  45.                 if (consumer != null)
  46.                         consumer.shutdown();
  47.         }
  48.         public static void main(String[] args) {
  49.                 String topic = "mytopic";
  50.                 SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.4.32:2181", "testgroup", topic);
  51.                 simpleHLConsumer.testConsumer();
  52.         }
  53. }
複製程式碼
四、執行檢視結果
先啟動伺服器端相關程序:
  • 執行zookeeper:[[email protected] kafka-0.8]# bin/zookeeper-server-start.sh config/zookeeper.properties
  • 執行Kafkabroker:[[email protected] kafka-0.8]# bin/kafka-server-start.sh config/server.properties


再執行我們寫的應用
  • 執行剛才寫的SimpleHLConsumer 類的main函式,等待生產者生產訊息
  • 執行SimpleProducer的main函式,生產訊息並push到broker


結果:執行完SimpleProducer後在SimpleHLConsumer的控制檯即可看到生產者生產的訊息:“send a message to broker”。

轉載: http://www.aboutyun.com/thread-11115-1-1.html