Kafka Cluster Deployment
Kafka Cluster Deployment
Download the Kafka release from http://kafka.apache.org/downloads.html
- Extract the installation package
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
- Rename the extracted directory
[atguigu@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
- Create a logs directory under /opt/module/kafka
[atguigu@hadoop102 kafka]$ mkdir logs
- Edit the configuration file
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vi server.properties
- Enter the following:
# Globally unique broker ID; must not be duplicated
broker.id=0
# Enable topic deletion
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send socket buffer size
socket.send.buffer.bytes=102400
# Receive socket buffer size
socket.receive.buffer.bytes=102400
# Maximum request socket buffer size
socket.request.max.bytes=104857600
# Path where Kafka stores its log data
log.dirs=/opt/module/kafka/logs
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads per data directory used for log recovery and cleanup
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained; segments older than this are deleted
log.retention.hours=168
# ZooKeeper cluster connection string
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
- Configure environment variables
[atguigu@hadoop102 module]$ sudo vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[atguigu@hadoop102 module]$ source /etc/profile
- Distribute the installation package
[atguigu@hadoop102 module]$ xsync kafka/
Note: after distributing, remember to configure the environment variables on the other machines as well.
On hadoop103 and hadoop104, edit /opt/module/kafka/config/server.properties and set broker.id=1 and broker.id=2 respectively.
Note: broker.id must be unique across the cluster.
- Start the cluster
Start Kafka on hadoop102, hadoop103, and hadoop104 in turn:
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &
- Stop the cluster
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh
Kafka Command-Line Operations
- List all topics on the current server
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
- Create a topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--create --replication-factor 3 --partitions 1 --topic first
Option descriptions:
--topic               defines the topic name
--replication-factor  defines the number of replicas
--partitions          defines the number of partitions
- Delete a topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--delete --topic first
delete.topic.enable=true must be set in server.properties; otherwise the topic is only marked for deletion (or the brokers must be restarted for the deletion to take effect).
- Send messages
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world
>atguigu atguigu
- Consume messages
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic first
--from-beginning: reads all existing data in the first topic from the beginning. Include this option only if your use case requires it.
- Describe a topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic first
Consumer Group Example
- Requirement: verify that, within the same consumer group, only one consumer receives a given message at a time.
- Procedure
On hadoop102 and hadoop103, edit /opt/module/kafka/config/consumer.properties and set the group.id property to any group name.
[atguigu@hadoop103 config]$ vi consumer.properties
group.id=atguigu
- Start a consumer on hadoop102 and on hadoop103
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
- Start a producer on hadoop104
[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world
Check the consumers on hadoop102 and hadoop103: at any given moment only one of them receives the message.
Environment Setup
Start the ZooKeeper and Kafka clusters, then start a consumer against the Kafka cluster:
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first
- Add the pom dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.11.0.0</version>
    </dependency>
</dependencies>
- Create a producer (new API)
package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host and port
        props.put("bootstrap.servers", "hadoop103:9092");
        // Wait for acknowledgement from all replicas
        props.put("acks", "all");
        // Maximum number of send retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Linger time before sending a batch
        props.put("linger.ms", 1);
        // Total memory available to the send buffer
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 50; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
        }

        producer.close();
    }
}
- Create a producer with a callback (new API)
package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host and port
        props.put("bootstrap.servers", "hadoop103:9092");
        // Wait for acknowledgement from all replicas
        props.put("acks", "all");
        // Maximum number of send retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Linger time before sending a batch
        props.put("linger.ms", 1);
        // Total memory available to the send buffer
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (metadata != null) {
                        System.err.println(metadata.partition() + "---" + metadata.offset());
                    }
                }
            });
        }

        kafkaProducer.close();
    }
}
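The callback above is asynchronous: send() returns immediately and the callback fires once the broker responds. If you want a synchronous send, you can block on the Future that send() returns. The sketch below is only an illustration of that idea and is not part of the original tutorial; the class name SyncSendProducer is hypothetical, and the broker address and topic are reused from the examples above.
package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop103:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // Blocking on the Future makes the send synchronous:
            // the loop does not continue until the broker has acknowledged the record.
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<>("first", "sync-hello-" + i)).get();
            System.out.println("partition = " + metadata.partition() + ", offset = " + metadata.offset());
        }
        producer.close();
    }
}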
- Custom-partition producer
0) Requirement: write all data to partition 0 of the topic.
1) Define a class that implements the Partitioner interface and override its method (deprecated API).
package com.atguigu.kafka;

import kafka.producer.Partitioner;

public class CustomPartitioner implements Partitioner {

    public CustomPartitioner() {
        super();
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // Route every record to partition 0
        return 0;
    }
}
- Custom partitioner (new API)
package com.atguigu.kafka;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Route every record to partition 0
        return 0;
    }

    @Override
    public void close() {
    }
}
- Use it in code
package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host and port
        props.put("bootstrap.servers", "hadoop103:9092");
        // Wait for acknowledgement from all replicas
        props.put("acks", "all");
        // Maximum number of send retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Linger time before sending a batch
        props.put("linger.ms", 1);
        // Total memory available to the send buffer
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioner
        props.put("partitioner.class", "com.atguigu.kafka.CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("first", "1", "atguigu"));

        producer.close();
    }
}
- High-level API
Start a console producer:
[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world
Create a consumer (deprecated API)
package com.atguigu.kafka.consume;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "hadoop102:2181");
        properties.put("group.id", "g1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");

        // Create the consumer connector
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        HashMap<String, Integer> topicCount = new HashMap<>();
        topicCount.put("first", 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);

        KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}
Official example (offsets maintained automatically) (new API)
package com.atguigu.kafka.consume;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker addresses; you do not need to list every broker
        props.put("bootstrap.servers", "hadoop102:9092");
        // Specify the consumer group
        props.put("group.id", "test");
        // Whether to commit offsets automatically
        props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        // Key deserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Topics the consumer subscribes to; multiple topics can be subscribed at once
        consumer.subscribe(Arrays.asList("first", "second", "third"));

        while (true) {
            // Poll for data with a 100 ms timeout
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
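The example above relies on enable.auto.commit=true, so offsets are committed on a timer. If you prefer to commit only after the records have actually been processed, you can turn auto-commit off and call commitSync() yourself. The following is a minimal sketch of that variant, not part of the original tutorial; the class name ManualCommitConsumer and the group id manual-test are illustrative.
package com.atguigu.kafka.consume;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "manual-test");
        // Disable automatic offset commits; we commit explicitly below
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            // Commit the offsets of the records returned by the last poll,
            // but only after they have been processed above.
            consumer.commitSync();
        }
    }
}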
Low-level API
Use the low-level API to read data from a specified topic, partition, and offset.
1) Main steps for a consumer using the low-level API:

| Step | Main work |
| --- | --- |
| 1 | Find the leader replica for the specified partition from the topic metadata |
| 2 | Get the latest consumed offset for the partition |
| 3 | Fetch the partition's messages from the leader replica |
| 4 | Detect changes of the leader replica and retry |
2) Method descriptions:

| Method | Description |
| --- | --- |
| findLeader() | The client sends a topic metadata request to the seed brokers and adds the replica brokers to a backup list |
| getLastOffset() | The consumer sends an offset request to get the latest offset for the partition |
| run() | The main method that fetches messages via the low-level API |
| findNewLeader() | When the partition's leader replica fails, find the new leader |
3) Code:
package com.atguigu;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {
    private List<String> m_replicaBrokers = new ArrayList<>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<>();
    }

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        // Maximum number of messages to read
        long maxReads = Long.parseLong("3");
        // Topic to subscribe to
        String topic = "test1";
        // Partition to query
        int partition = Integer.parseInt("0");
        // IP addresses of the seed broker nodes
        List<String> seeds = new ArrayList<>();
        seeds.add("192.168.9.102");
        seeds.add("192.168.9.103");
        seeds.add("192.168.9.104");
        // Port
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // Get the metadata of the specified topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                Thread.sleep(1000);
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}