kafka詳解三:開發Kafka應用
阿新 • • 發佈:2019-01-11
問題導讀
1.Kafka系統由什麼組成?
2.Kafka中和producer相關的API是什麼?
一、整體看一下Kafka
我們知道,Kafka系統有三大元件:Producer、Consumer、broker 。
file:///C:/Users/ADMINI~1/AppData/Local/Temp/enhtmlclip/Image.png
producers 生產(produce)訊息(message)並推(push)送給brokers,consumers從brokers把訊息提取(pull)出來消費(consume)。
二、開發一個Producer應用
Producers用來生產訊息並把產生的訊息推送到Kafka的Broker。Producers可以是各種應用,比如web應用,伺服器端應用,代理應用以及log系統等等。當然,Producers現在有各種語言的實現比如Java、C、Python等。
我們先看一下Producer在Kafka中的角色:
file:///C:/Users/ADMINI~1/AppData/Local/Temp/enhtmlclip/Image(1).png
2.1.kafka Producer 的 API
Kafka中和producer相關的API有三個類
2.2下面我們就寫一個最簡單的Producer:產生一條訊息並推送給broker
三、開發一個consumer應用
Consumer是用來消費Producer產生的訊息的,當然一個Consumer可以是各種應用,如可以是一個實時的分析系統,也可以是一個數據倉庫或者是一個基於釋出訂閱模式的解決方案等。Consumer端同樣有多種語言的實現,如Java、C、Python等。
我們看一下Consumer在Kafka中的角色:
3.1.kafka Producer 的 API
Kafka和Producer稍微有些不同,它提供了兩種型別的API
由於是第一個應用,我們這部分使用high-level API,它的特點每消費一個message自動移動offset值到下一個message,關於offset在後面的部分會單獨介紹。與Producer類似,和Consumer相關的有三個主要的類:
3.2下面我們就寫一個最簡單的Consumer:從broker中消費一個訊息
四、執行檢視結果
先啟動伺服器端相關程序:
再執行我們寫的應用
1.Kafka系統由什麼組成?
2.Kafka中和producer相關的API是什麼?
一、整體看一下Kafka
我們知道,Kafka系統有三大元件:Producer、Consumer、broker 。
file:///C:/Users/ADMINI~1/AppData/Local/Temp/enhtmlclip/Image.png
producers 生產(produce)訊息(message)並推(push)送給brokers,consumers從brokers把訊息提取(pull)出來消費(consume)。
二、開發一個Producer應用
Producers用來生產訊息並把產生的訊息推送到Kafka的Broker。Producers可以是各種應用,比如web應用,伺服器端應用,代理應用以及log系統等等。當然,Producers現在有各種語言的實現比如Java、C、Python等。
我們先看一下Producer在Kafka中的角色:
file:///C:/Users/ADMINI~1/AppData/Local/Temp/enhtmlclip/Image(1).png
2.1.kafka Producer 的 API
Kafka中和producer相關的API有三個類
- Producer:最主要的類,用來建立和推送訊息
- KeyedMessage:定義要傳送的訊息物件,比如定義傳送給哪個topic,partition key和傳送的內容等。
-
ProducerConfig:配置Producer,比如定義要連線的brokers、partition class、serializer class、partition key等
2.2下面我們就寫一個最簡單的Producer:產生一條訊息並推送給broker
-
package bonree.producer;
-
import java.util.Properties;
-
import kafka.javaapi.producer.Producer;
-
import kafka.producer.KeyedMessage;
-
import kafka.producer.ProducerConfig;
-
/*******************************************************************************
-
* BidPlanStructForm.java Created on 2014-7-8
-
* Author: <a href=mailto:[email protected]>houda</a>
-
* @Title: SimpleProducer.java
-
* @Package bonree.producer
-
* Description:
-
* Version: 1.0
-
******************************************************************************/
-
public class SimpleProducer {
-
private static Producer<Integer,String> producer;
-
private final Properties props=new Properties();
-
public SimpleProducer(){
-
//定義連線的broker list
-
props.put("metadata.broker.list", "192.168.4.31:9092");
-
//定義序列化類(Java物件傳輸前要序列化)
-
props.put("serializer.class", "kafka.serializer.StringEncoder");
-
producer = new Producer<Integer, String>(new ProducerConfig(props));
-
}
-
public static void main(String[] args) {
-
SimpleProducer sp=new SimpleProducer();
-
//定義topic
-
String topic="mytopic";
-
//定義要傳送給topic的訊息
-
String messageStr = "send a message to broker ";
-
//構建訊息物件
-
KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
-
//推送訊息到broker
-
producer.send(data);
-
producer.close();
-
}
- }
三、開發一個consumer應用
Consumer是用來消費Producer產生的訊息的,當然一個Consumer可以是各種應用,如可以是一個實時的分析系統,也可以是一個數據倉庫或者是一個基於釋出訂閱模式的解決方案等。Consumer端同樣有多種語言的實現,如Java、C、Python等。
我們看一下Consumer在Kafka中的角色:
3.1.kafka Producer 的 API
Kafka和Producer稍微有些不同,它提供了兩種型別的API
- high-level consumer API:提供了對底層API的抽象,使用起來比較簡單
-
simple consumer API:允許重寫底層API的實現,提供了更多的控制權,當然使用起來也複雜一些
由於是第一個應用,我們這部分使用high-level API,它的特點每消費一個message自動移動offset值到下一個message,關於offset在後面的部分會單獨介紹。與Producer類似,和Consumer相關的有三個主要的類:
- KafkaStream:這裡面返回的就是Producer生產的訊息
- ConsumerConfig:定義要連線zookeeper的一些配置資訊(Kafka通過zookeeper均衡壓力,具體請查閱見面幾篇文章),比如定義zookeeper的URL、group id、連線zookeeper過期時間等。
-
ConsumerConnector:負責和zookeeper進行連線等工作
3.2下面我們就寫一個最簡單的Consumer:從broker中消費一個訊息
-
package bonree.consumer;
-
import java.util.HashMap;
-
import java.util.List;
-
import java.util.Map;
-
import java.util.Properties;
-
import kafka.consumer.Consumer;
-
import kafka.consumer.ConsumerConfig;
-
import kafka.consumer.ConsumerIterator;
-
import kafka.consumer.KafkaStream;
-
import kafka.javaapi.consumer.ConsumerConnector;
-
/*******************************************************************************
-
* Created on 2014-7-8 Author: <a
-
* href=mailto:[email protected]>houda</a>
-
* @Title: SimpleHLConsumer.java
-
* @Package bonree.consumer Description: Version: 1.0
-
******************************************************************************/
-
public class SimpleHLConsumer {
-
private final ConsumerConnector consumer;
-
private final String topic;
-
public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
-
Properties props = new Properties();
-
//定義連線zookeeper資訊
-
props.put("zookeeper.connect", zookeeper);
-
//定義Consumer所有的groupID,關於groupID,後面會繼續介紹
-
props.put("group.id", groupId);
-
props.put("zookeeper.session.timeout.ms", "500");
-
props.put("zookeeper.sync.time.ms", "250");
-
props.put("auto.commit.interval.ms", "1000");
-
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
-
this.topic = topic;
-
}
-
public void testConsumer() {
-
Map<String, Integer> topicCount = new HashMap<String, Integer>();
-
//定義訂閱topic數量
-
topicCount.put(topic, new Integer(1));
-
//返回的是所有topic的Map
-
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
-
//取出我們要需要的topic中的訊息流
-
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
-
for (final KafkaStream stream : streams) {
-
ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
-
while (consumerIte.hasNext())
-
System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
-
}
-
if (consumer != null)
-
consumer.shutdown();
-
}
-
public static void main(String[] args) {
-
String topic = "mytopic";
-
SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.4.32:2181", "testgroup", topic);
-
simpleHLConsumer.testConsumer();
-
}
-
}
四、執行檢視結果
先啟動伺服器端相關程序:
- 執行zookeeper:[[email protected] kafka-0.8]# bin/zookeeper-server-start.sh config/zookeeper.properties
-
執行Kafkabroker:[[email protected] kafka-0.8]# bin/kafka-server-start.sh config/server.properties
再執行我們寫的應用
- 執行剛才寫的SimpleHLConsumer 類的main函式,等待生產者生產訊息
-
執行SimpleProducer的main函式,生產訊息並push到broker
結果:執行完SimpleProducer後在SimpleHLConsumer的控制檯即可看到生產者生產的訊息:“send a message to broker”。
轉載: http://www.aboutyun.com/thread-11115-1-1.html