1. 程式人生 > >Kafka應用入門

Kafka應用入門

Kafka是一款基於釋出和訂閱的訊息系統
運作方式如圖:
在這裡插入圖片描述
各個元件的作用:
生產者:將資料依據主題,分割槽發往broker
broker:broker接收來自生產者的訊息,為休息設定偏移量,並將訊息儲存到磁碟
zookeeper:管理各個分散式broker節點,為各個節點提供資料共享,資料一致性,選主服務。(可以看一下Zookeeper和Kafka的關係,為啥Kafka依賴Zookeeper )
消費者:從主題分割槽中讀取資料

Kafka的釋出訂閱體現在,生產者將資料一個一個寫入相應的主題,消費者通過訂閱主題,從主題分割槽中讀取一個一個數據。
下面是主題和分割槽圖:
在這裡插入圖片描述
消費者消費規則:


1.主題的一個分割槽只能供同個消費者群組的一個消費者消費。
如圖所示:
在這裡插入圖片描述在這裡插入圖片描述
2.如果分割槽數小於消費者數量,那麼多出來的消費者啥也不幹。
如圖所示
在這裡插入圖片描述
所以增加分割槽,往消費者群組裡增加消費者是橫向伸縮消費能力的主要方式。

Kafka的安裝
1.zookeeper的安裝和使用
先要裝java1.8,然後下載zookeeper包,解壓
(1)先設定配置檔案
將conf目錄下的zoo_sample.cfg更名為zoo.cfg,預設埠為2181
(2)使用bin/zkServer.sh start 啟動zookeeper
2.kafka安裝
解壓縮包即可,預設埠9092,連線的zookeeper埠2181
啟動kafka
bin/kafka-server-start.sh config/server.properties &
也可以在一臺機子上開啟另外2個節點broker然後再啟動
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

簡單實用Kafka
建立主題,只有一個副本,一個分割槽
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
驗證主題
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
往測試主題上釋出訊息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
讀取訊息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning(舊版本)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning(新版本2.1)

使用java編寫生產者和消費者
pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.zte.apt</groupId>
	<artifactId>KafkaTest</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<name>KafkaTest</name>
	<!-- FIXME change it to the project's website -->
	<url>http://www.example.com</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.1.0</version>
		</dependency>
		<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
		
	</dependencies>

	<build>
		<pluginManagement><!-- lock down plugins versions to avoid using Maven 
				defaults (may be moved to parent pom) -->
			<plugins>
			<plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
            <archive>  
                    <manifest>  
                        <mainClass>com.zte.apt.KafkaTest.KafkaProducerTest</mainClass>  
                    </manifest>  
                </archive>
                <descriptorRefs>  
                    <descriptorRef>jar-with-dependencies</descriptorRef>  
                </descriptorRefs>  
            </configuration>  
            <executions>  
                <execution>  
                    <id>make-assembly</id>  
                    <phase>package</phase>  
                    <goals>  
                        <goal>single</goal>  
                    </goals>  
                </execution>  
            </executions>  
        </plugin>  
			</plugins>
		</pluginManagement>
	</build>
</project>

生產者
1建立Properties配置物件,配置broker地址,鍵,鍵值的序列化等等
2.建立生產者KafkaProducer
3.建立記錄物件ProducerRecord物件,往ProducerRecord存放主題,鍵,鍵值
4.KafkaProducer傳送ProducerRecord物件,其中傳送方式有:
(1)fire-and-forget傳送並忘記,並不關心是否正常到達,雖然生產者有自動嘗試重發機制,但有時候也會丟失一些資訊
(2)send(ProducerRecord)同步傳送,它會返回一個future物件,呼叫get()方法進行等候,就可以知道訊息是否傳送成功
(3)send(ProducerRecord,Callback)非同步傳送,多一個回撥函式
5.程式碼

package com.zte.KafkaTest;

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerTest {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "10.41.41.202:9092");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		Producer<String, String> producer = new KafkaProducer<>(props);
		//主題的名字,傳送的鍵,值
		ProducerRecord<String, String> record = new ProducerRecord<>("topictest", "keyhaha", "valuehehe");
		try
		{
			for (int i = 0; i < 100; i++)
			     producer.send(new ProducerRecord<String, String>("topictest", Integer.toString(i), Integer.toString(i)));
			System.out.println("已經發送完畢");
			//非同步傳送,為了傳送成功,主函式最好等待一小段時間
			Thread.sleep(5000);
			
			//同步傳送.呼叫get等待
//			Future<RecordMetadata> result = producer.send(record);
//			System.out.println(result.get().partition());
		}
		catch(Exception e)
		{
			e.printStackTrace();
		}
	}
}

消費者
向消費者群組增加消費者是橫向伸縮消費能力的主要方式,一個主題的每一個分割槽只能分配給同一群組的一個消費者
所以我們有必要為主題建立大量的分割槽,在負載增長時可以加入更多的消費者,不過要注意,不要讓消費者的數量超過主題分割槽的數量,多餘的消費者只會被閒置。
1.建立Properties配置物件,配置broker地址,groupid 群組名字,鍵,鍵值的反序列化等等
2.建立消費者KafkaConsumer
3.訂閱consumer.subscribe(Collections.singletonList(“主題,也可使用正則”))
4.輪詢consumer.poll(100),返回ConsumerRecords記錄物件,
5.提交偏移量,即更新分割槽當前的位置
(1)自動提交設定enable.auto,commit設定為true,每隔一段時間提交一次
(2)同步提交commitSync,阻塞
(3)非同步提交commitAsync,非阻塞
(4)也可以使用非同步和同步結合提交

package com.zte.KafkaTest;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerTest {

	public static void main(String[] args) 
	{
		Properties props = new Properties();
		props.put("bootstrap.servers", "10.41.41.202:9092");
		//指定消費者所屬群組的名字
		props.put("group.id", "group1");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		//訂閱單個主題
		consumer.subscribe(Collections.singletonList("topictest"));
//		//訂閱多個主題
//		consumer.subscribe(Arrays.asList("foo", "bar"));
//		//使用正則訂閱主題
//		consumer.subscribe("test.*");
		try
		{
			while (true) 
			{
		         ConsumerRecords<String, String> records = consumer.poll(100);
		         for (ConsumerRecord<String, String> record : records)
		         {
		             System.out.printf("tppic=%s,partition=%s,offset=%d, key=%s, value =%s%n",record.topic(),record.partition(),record.offset(),record.key(),record.value());
		         }
		     }
		}
		finally
		{
			consumer.close();
		}
	}
}