kafka簡介和入門
一、Kafka概述
Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
這是Kafka官網裡的一段介紹,從這段話,我們可以知道,kafka在構建實時資料管道和streaming應用中被常使用,它可以橫向擴充套件,容錯,快速地執行在數以千計的公司產品上。
它可以像訊息系統一樣讀寫資料流,並且可以在實時業務的場景中寫可靠的流處理應用,並且能安全地儲存資料流到分散式、多副本、容錯的叢集中。所以通俗點理解,可以說Kafka就是一個訊息中介軟體。
Kafka場景比喻
接下來我大概比喻下Kafka的使用場景
訊息中介軟體:生產者和消費者
媽媽:生產者
你:消費者
饅頭:資料流、訊息
正常情況下: 生產一個 消費一個
其他情況:
1)一直生產,你吃到某一個饅頭時,你卡主(機器故障), 饅頭就丟失了
2)一直生產,做饅頭速度快,你吃來不及,饅頭也就丟失了
為了放著其他生產情況的出現,我們可以拿個碗/籃子,饅頭做好以後先放到籃子裡,你要吃的時候去籃子裡面取出來吃,而這籃子/框就可以為:Kafka。當籃子滿了,饅頭就裝不下了,咋辦? 多準備幾個籃子 === Kafka的擴容
二、Kafka的架構和核心概念
這是張我在Kafka官網上截的圖,我大概可以把Kafka的主要結構分為以下幾點:
producer:生產者,就是生產饅頭(老媽)
consumer:消費者,就是吃饅頭的(你)
broker:籃子
topic:主題,給饅頭帶一個標籤,topica的饅頭是給你吃的,topicb的饅頭是給你弟弟吃
另外,我們在看下官網上的這段解釋:
First a few concepts:
- Kafka is run as a cluster on one or more servers.
- The Kafka cluster stores streams of records
- Each record consists of a key, a value, and a timestamp.
可以看出來:
1.Kafka可以作為叢集執行在一臺或者多個伺服器上面;
2.Kafka叢集可以分類地儲存記錄流,以打標籤的方式,就是採用topics,每個broker可以打個topic,這樣能保證消費者可以根據topic選擇性消費;
3.每個記錄由Key、Value、timestamp構成。
Kafka四個核心的API
1.ProducerAPI:允許一個應用向一個或多個topic裡釋出記錄流;
2.ConsumerAPI:允許一個應用訂閱一個或多個topics,處理topic裡的資料流,就相當於消費;
3.StreamAPI:允許應用扮演流處理的作用,從一個或多個topic裡消費資料流,然後產生輸出流資料到其他一個或多個topic裡,對輸入流資料有效傳輸到輸出口;
4.ConnectorAPI:允許執行和構建一個可重複利用的生產者和消費者,能將kafka的topic與其他存在的應用和資料庫裝置相連線,比如連結一個實時資料庫,可以捕捉到每張表的變化。
這四個API,主要應用在IDEA上對應用程式的開發中,通過程式碼的形式管理Kafka。在第四部分將會對前兩個API寫個簡單DEMO演示。
三、Kafka的快速使用
Kafka使用到了zookeeper,所以首先你得安裝zookeeper再安裝kafka。
1.單節點的broker部署
首先我們需要修改$KAFKA_HOME/config/server.properties這個配置檔案,主要以下幾處需要修改:
broker.id=0,每個broker的ID需要唯一
listeners:監聽的埠(此處筆者設定的是預設埠9092)
host.name:當前機器
log.dirs:儲存日誌的資料夾
num.partitions:分割槽的數量
zookeeper.connect:zookeeper的地址(預設為localhost:2181)
這幾處根據你自身需要進行配置,然後啟動步驟如下:
1)開啟zookeeper,此處需要注意的是zookeeper的conf目錄下的zoo.cfg配置檔案,主要修改的也是日誌儲存目錄那塊。
2)啟動Kafka,命令為:kafka-server-start.sh $KAFKA_HOME/config/server.properties
3)建立topic,需要指定zookeeper,命令為:kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic。 注意指定zookeeper,後面幾個屬性可以根據你實際情況進行定義。另外檢視所有topic的命令為:
kafka-topics.sh --list --zookeeper hadoop000:2181
4)傳送訊息,需要指定broker,命令為:kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic
5)消費訊息,需要指定zookeeper,命令為:kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic --from-beginning。意思就是指定zookeeper上的topic進行消費,from-beginning的設定,可以檢視之前的訊息。
2.單節點,多broker
主要是增加多個server.properties檔案,一個配置檔案就相當於一個broker,我就設定三個broker:
-
server-1.properties
-
log.dirs=/home/hadoop/app/tmp/kafka-logs-1
-
listeners=PLAINTEXT://:9093
-
broker.id=1
-
server-2.properties
-
log.dirs=/home/hadoop/app/tmp/kafka-logs-2
-
listeners=PLAINTEXT://:9094
-
broker.id=2
-
server-3.properties
-
log.dirs=/home/hadoop/app/tmp/kafka-logs-3
-
listeners=PLAINTEXT://:9095
-
broker.id=3
然後依次開啟,命令如下:
-
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
-
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
-
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &
接下來就跟上面的步驟一樣:
-
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
-
kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my-replicated-topic
-
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic my-replicated-topic
檢視 topic的詳細資訊:
kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my-replicated-topic
要注意的是,副本中會有個leader,而多副本也實現了kafka的容錯性,掛掉一個副本後,會自動在剩下副本里選出一個leader來同步操作。
根據上面步驟操作,我們在producer視窗輸入,在consumer消費視窗看到相應輸出。
四、Producer和Consumer API的使用
接下來展示一個簡單的Demo,在生產端簡單建立個執行緒進行迴圈輸出,然後用消費者端對輸出的內容進行展示,也就是消費。
配置檔案
-
/**
-
* Kafka常用配置檔案
-
*/
-
public class KafkaProperties {
-
public static final String ZK = "192.168.199.111:2181";
-
public static final String TOPIC = "hello_topic";
-
public static final String BROKER_LIST = "192.168.199.111:9092";
-
public static final String GROUP_ID = "test_group1";
-
}
Producer API DEMO
-
import kafka.javaapi.producer.Producer;
-
import kafka.producer.KeyedMessage;
-
import kafka.producer.ProducerConfig;
-
import java.util.Properties;
-
/**
-
* Kafka生產者
-
*/
-
public class KafkaProducer extends Thread{
-
private String topic;
-
private Producer<Integer, String> producer;
-
public KafkaProducer(String topic) {
-
this.topic = topic;
-
Properties properties = new Properties();
-
properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
-
properties.put("serializer.class","kafka.serializer.StringEncoder");
-
properties.put("request.required.acks","1");
-
producer = new Producer<Integer, String>(new ProducerConfig(properties));
-
}
-
@Override
-
public void run() {
-
int messageNo = 1;
-
while(true) {
-
String message = "message_" + messageNo;
-
producer.send(new KeyedMessage<Integer, String>(topic, message));
-
System.out.println("Sent: " + message);
-
messageNo ++ ;
-
try{
-
Thread.sleep(2000);
-
} catch (Exception e){
-
e.printStackTrace();
-
}
-
}
-
}
-
}
Consumer API DEMO
-
import kafka.consumer.Consumer;
-
import kafka.consumer.ConsumerConfig;
-
import kafka.consumer.ConsumerIterator;
-
import kafka.consumer.KafkaStream;
-
import kafka.javaapi.consumer.ConsumerConnector;
-
import java.util.HashMap;
-
import java.util.List;
-
import java.util.Map;
-
import java.util.Properties;
-
/**
-
* Kafka消費者
-
*/
-
public class KafkaConsumer extends Thread{
-
private String topic;
-
public KafkaConsumer(String topic) {
-
this.topic = topic;
-
}
-
private ConsumerConnector createConnector(){
-
Properties properties = new Properties();
-
properties.put("zookeeper.connect", KafkaProperties.ZK);
-
properties.put("group.id",KafkaProperties.GROUP_ID);
-
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
-
}
-
@Override
-
public void run() {
-
ConsumerConnector consumer = createConnector();
-
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-
topicCountMap.put(topic, 1);
-
// topicCountMap.put(topic2, 1);
-
// topicCountMap.put(topic3, 1);
-
// String: topic
-
// List<KafkaStream<byte[], byte[]>> 對應的資料流
-
Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);
-
KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0); //獲取我們每次接收到的暑假
-
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
-
while (iterator.hasNext()) {
-
String message = new String(iterator.next().message());
-
System.out.println("rec: " + message);
-
}
-
}
-
}
最後在main函式對這兩個類呼叫即可,結果如下:
--------------------- 作者:瘋兔子大叔 來源:CSDN 原文:https://blog.csdn.net/wing_93/article/details/78513782?utm_source=copy 版權宣告:本文為博主原創文章,轉載請附上博文連結!