
Kafka Cluster Deployment

Download the release package: http://kafka.apache.org/downloads.html

  • Extract the archive
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
  • Rename the extracted directory
[atguigu@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
  • Create a logs directory under /opt/module/kafka
[atguigu@hadoop102 kafka]$ mkdir logs
  • Edit the configuration file
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vi server.properties
  • Set the following properties:
# Globally unique broker ID; must not be duplicated
broker.id=0
# Enable topic deletion
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Socket send buffer size
socket.send.buffer.bytes=102400
# Socket receive buffer size
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
# Directory where Kafka stores its log data
log.dirs=/opt/module/kafka/logs
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads used to recover and clean up data under log.dirs
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained; expired segments are deleted
log.retention.hours=168
# ZooKeeper cluster connection string
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

  • Configure the environment variables
[atguigu@hadoop102 module]$ sudo vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[atguigu@hadoop102 module]$ source /etc/profile
  • Distribute the installation to the other nodes
[atguigu@hadoop102 module]$ xsync kafka/

Note: after distributing, remember to configure the environment variables on the other machines as well.

On hadoop103 and hadoop104, edit /opt/module/kafka/config/server.properties and set broker.id=1 and broker.id=2 respectively.

Note: broker.id must be unique across the cluster.

  • Start the cluster

Start Kafka on the hadoop102, hadoop103, and hadoop104 nodes in turn:

[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &
  • Stop the cluster
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
  • Kafka Command-Line Operations

  • List all topics on the cluster
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
  • Create a topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--create --replication-factor 3 --partitions 1 --topic first

Option descriptions:
--topic defines the topic name
--replication-factor defines the number of replicas
--partitions defines the number of partitions

  • Delete a topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--delete --topic first

delete.topic.enable=true must be set in server.properties; otherwise the topic is only marked for deletion rather than actually removed.

  • Produce messages
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world
>atguigu atguigu
  • Consume messages
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic first
--from-beginning: reads all existing data in the first topic from the start. Add this option only if the business scenario requires it.

  • Describe a topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic first
  • Consumer Group Example

  • Requirement: verify that, among consumers in the same consumer group, only one consumer receives a message at any given time.
  • Hands-on steps
On hadoop102 and hadoop103, edit the /opt/module/kafka/config/consumer.properties configuration file and set the group.id property to an arbitrary group name.

[atguigu@hadoop103 config]$ vi consumer.properties
group.id=atguigu

  • Start a consumer on hadoop102 and hadoop103
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first \
--consumer.config config/consumer.properties
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first \
--consumer.config config/consumer.properties
  • Start a producer on hadoop104
[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world

Check the consumers on hadoop102 and hadoop103: at any given moment, only one of them receives the message.
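
The same behavior can also be reproduced from the Java client. Below is a minimal sketch under the assumptions of this tutorial (the class name GroupDemoConsumer is illustrative; the broker address and topic follow the cluster set up above): every instance joins the same atguigu group, so when two copies are started, only one of them is assigned the single partition of first and receives the messages.

package com.atguigu.kafka;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupDemoConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Broker address (assumption: the cluster from this tutorial)
        props.put("bootstrap.servers", "hadoop102:9092");
        // All instances share the same group id, so partitions are divided among them
        props.put("group.id", "atguigu");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("first"));

        // With a single-partition topic, only one group member holds the partition;
        // the other instance stays idle until a rebalance occurs
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, value = %s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}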

  • Environment Preparation

Start the ZooKeeper and Kafka clusters, then start a console consumer against the Kafka cluster:

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first
  • Add the pom dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.11.0.0</version>
    </dependency>
</dependencies>
  • Create a producer (new API)
package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducer {

    public static void main(String[] args) {
        
        Properties props = new Properties();
        // Kafka broker host name and port
        props.put("bootstrap.servers", "hadoop103:9092");
        // Wait for acknowledgment from all replicas (acks=all)
        props.put("acks", "all");
        // Maximum number of send retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Linger time (ms) before sending a batch
        props.put("linger.ms", 1);
        // Total memory (bytes) available for buffering records
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 50; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
        }

        producer.close();
    }
}
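
Since send() returns a Future<RecordMetadata>, the result of each write can also be checked synchronously rather than fire-and-forget. The following is a minimal sketch under the same assumptions as the example above (the class name SyncProducer is illustrative); calling get() blocks until the broker acknowledges the record and surfaces any send failure as an exception.

package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncProducer {

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop103:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // get() blocks until the broker acknowledges the record,
            // so the partition and offset are known before the next send
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i)).get();
            System.out.println(metadata.partition() + "---" + metadata.offset());
        }

        producer.close();
    }
}
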
  • Create a producer with a callback (new API)
package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Kafka broker host name and port
        props.put("bootstrap.servers", "hadoop103:9092");
        // Wait for acknowledgment from all replicas (acks=all)
        props.put("acks", "all");
        // Maximum number of send retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Linger time (ms) before sending a batch
        props.put("linger.ms", 1);
        // Total memory (bytes) available for buffering records
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        for (int i = 0; i < 50; i++) {

            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (metadata != null) {

                        System.err.println(metadata.partition() + "---" + metadata.offset());
                    }
                }
            });
        }

        kafkaProducer.close();
    }
}
  • Custom Partitioner Producer

    0) Requirement: store all data in partition 0 of the topic

    1) Define a class that implements the Partitioner interface and override its method (deprecated API)

package com.atguigu.kafka;
import java.util.Map;
import kafka.producer.Partitioner;

public class CustomPartitioner implements Partitioner {

    public CustomPartitioner() {
        super();
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // Control the partition: always return partition 0
        return 0;
    }
}
  • Custom partitioner (new API)
package com.atguigu.kafka;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Control the partition: always return partition 0
        return 0;
    }

    @Override
    public void close() {
        
    }
}
  • Use the custom partitioner in code
package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {

    public static void main(String[] args) {
        
        Properties props = new Properties();
        // Kafka broker host name and port
        props.put("bootstrap.servers", "hadoop103:9092");
        // Wait for acknowledgment from all replicas (acks=all)
        props.put("acks", "all");
        // Maximum number of send retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Linger time (ms) before sending a batch
        props.put("linger.ms", 1);
        // Total memory (bytes) available for buffering records
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioner
        props.put("partitioner.class", "com.atguigu.kafka.CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("first", "1", "atguigu"));

        producer.close();
    }
}
  • High-Level API

Start a console producer:

[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world

Create a consumer (deprecated API)

package com.atguigu.kafka.consume;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();
        
        properties.put("zookeeper.connect", "hadoop102:2181");
        properties.put("group.id", "g1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");
        
        // Create the consumer connector
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        
        HashMap<String, Integer> topicCount = new HashMap<>();
        topicCount.put("first", 1);
        
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);
        
        KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);
        
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}

Official example (offsets committed automatically) (new API)

package com.atguigu.kafka.consume;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Kafka broker addresses; not every broker needs to be listed
        props.put("bootstrap.servers", "hadoop102:9092");
        // Specify the consumer group
        props.put("group.id", "test");
        // Whether to commit offsets automatically
        props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        // Key deserializer class
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer class
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Topics to subscribe to; multiple topics can be subscribed at once
        consumer.subscribe(Arrays.asList("first", "second", "third"));

        while (true) {
            // Poll for data with a 100 ms timeout
            ConsumerRecords<String, String> records = consumer.poll(100);
            
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
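
By contrast, if offsets should only be committed after the records have actually been processed, automatic commits can be disabled and the application can commit explicitly. Below is a minimal sketch (the class name ManualCommitConsumer is illustrative); commitSync() commits the offsets returned by the last poll.

package com.atguigu.kafka.consume;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        // Disable automatic offset commits; the application commits explicitly
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            // Commit the offsets of the records returned by the last poll,
            // only after they have been processed
            consumer.commitSync();
        }
    }
}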

Low-Level API

Use the low-level API to read data from a specified topic, partition, and offset.

1) Main steps for a consumer using the low-level API:

Step 1: Find the leader replica of the specified partition from the topic metadata.
Step 2: Get the latest offset of the partition.
Step 3: Fetch the partition's messages from the leader replica.
Step 4: Detect changes of the leader replica and retry.

2) Method descriptions:

findLeader(): the client sends a topic metadata request to the seed brokers and adds the partition's replica set to the backup broker list.
getLastOffset(): the consumer client sends an offset request to get the partition's most recent offset.
run(): the main method in which the low-level API consumer pulls messages.
findNewLeader(): when the partition's leader replica fails, the client finds the new leader replica.

3) Code:

package com.atguigu;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {
    private List<String> m_replicaBrokers = new ArrayList<>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<>();
    }

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        // Maximum number of messages to read
        long maxReads = Long.parseLong("3");
        // Topic to subscribe to
        String topic = "test1";
        // Partition to read from
        int partition = Integer.parseInt("0");
        // IP addresses of the seed broker nodes
        List<String> seeds = new ArrayList<>();
        seeds.add("192.168.9.102");
        seeds.add("192.168.9.103");
        seeds.add("192.168.9.104");
        // Broker port
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // Get the metadata for the specified topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }


    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                    Thread.sleep(1000);
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}