Kafka應用入門
Kafka是一款基於釋出和訂閱的訊息系統
運作方式如圖:
各個元件的作用:
生產者:將資料依據主題,分割槽發往broker
broker:broker接收來自生產者的訊息,為休息設定偏移量,並將訊息儲存到磁碟
zookeeper:管理各個分散式broker節點,為各個節點提供資料共享,資料一致性,選主服務。(可以看一下Zookeeper和Kafka的關係,為啥Kafka依賴Zookeeper )
消費者:從主題分割槽中讀取資料
Kafka的釋出訂閱體現在,生產者將資料一個一個寫入相應的主題,消費者通過訂閱主題,從主題分割槽中讀取一個一個數據。
下面是主題和分割槽圖:
消費者消費規則:
1.主題的一個分割槽只能供同個消費者群組的一個消費者消費。
如圖所示:
2.如果分割槽數小於消費者數量,那麼多出來的消費者啥也不幹。
如圖所示
所以增加分割槽,往消費者群組裡增加消費者是橫向伸縮消費能力的主要方式。
Kafka的安裝
1.zookeeper的安裝和使用
先要裝java1.8,然後下載zookeeper包,解壓
(1)先設定配置檔案
將conf目錄下的zoo_sample.cfg更名為zoo.cfg,預設埠為2181
(2)使用bin/zkServer.sh start 啟動zookeeper
2.kafka安裝
解壓縮包即可,預設埠9092,連線的zookeeper埠2181
啟動kafka
bin/kafka-server-start.sh config/server.properties &
也可以在一臺機子上開啟另外2個節點broker然後再啟動
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
簡單實用Kafka
建立主題,只有一個副本,一個分割槽
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
驗證主題
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
往測試主題上釋出訊息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
讀取訊息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning(舊版本)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning(新版本2.1)
使用java編寫生產者和消費者
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zte.apt</groupId>
<artifactId>KafkaTest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>KafkaTest</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven
defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.zte.apt.KafkaTest.KafkaProducerTest</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
生產者
1建立Properties配置物件,配置broker地址,鍵,鍵值的序列化等等
2.建立生產者KafkaProducer
3.建立記錄物件ProducerRecord物件,往ProducerRecord存放主題,鍵,鍵值
4.KafkaProducer傳送ProducerRecord物件,其中傳送方式有:
(1)fire-and-forget傳送並忘記,並不關心是否正常到達,雖然生產者有自動嘗試重發機制,但有時候也會丟失一些資訊
(2)send(ProducerRecord)同步傳送,它會返回一個future物件,呼叫get()方法進行等候,就可以知道訊息是否傳送成功
(3)send(ProducerRecord,Callback)非同步傳送,多一個回撥函式
5.程式碼
package com.zte.KafkaTest;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KafkaProducerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.41.41.202:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
//主題的名字,傳送的鍵,值
ProducerRecord<String, String> record = new ProducerRecord<>("topictest", "keyhaha", "valuehehe");
try
{
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("topictest", Integer.toString(i), Integer.toString(i)));
System.out.println("已經發送完畢");
//非同步傳送,為了傳送成功,主函式最好等待一小段時間
Thread.sleep(5000);
//同步傳送.呼叫get等待
// Future<RecordMetadata> result = producer.send(record);
// System.out.println(result.get().partition());
}
catch(Exception e)
{
e.printStackTrace();
}
}
}
消費者
向消費者群組增加消費者是橫向伸縮消費能力的主要方式,一個主題的每一個分割槽只能分配給同一群組的一個消費者
所以我們有必要為主題建立大量的分割槽,在負載增長時可以加入更多的消費者,不過要注意,不要讓消費者的數量超過主題分割槽的數量,多餘的消費者只會被閒置。
1.建立Properties配置物件,配置broker地址,groupid 群組名字,鍵,鍵值的反序列化等等
2.建立消費者KafkaConsumer
3.訂閱consumer.subscribe(Collections.singletonList(“主題,也可使用正則”))
4.輪詢consumer.poll(100),返回ConsumerRecords記錄物件,
5.提交偏移量,即更新分割槽當前的位置
(1)自動提交設定enable.auto,commit設定為true,每隔一段時間提交一次
(2)同步提交commitSync,阻塞
(3)非同步提交commitAsync,非阻塞
(4)也可以使用非同步和同步結合提交
package com.zte.KafkaTest;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerTest {
public static void main(String[] args)
{
Properties props = new Properties();
props.put("bootstrap.servers", "10.41.41.202:9092");
//指定消費者所屬群組的名字
props.put("group.id", "group1");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱單個主題
consumer.subscribe(Collections.singletonList("topictest"));
// //訂閱多個主題
// consumer.subscribe(Arrays.asList("foo", "bar"));
// //使用正則訂閱主題
// consumer.subscribe("test.*");
try
{
while (true)
{
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("tppic=%s,partition=%s,offset=%d, key=%s, value =%s%n",record.topic(),record.partition(),record.offset(),record.key(),record.value());
}
}
}
finally
{
consumer.close();
}
}
}