1. 程式人生 > 實用技巧 >代理模式之動態代理

代理模式之動態代理

1. Kafka簡介


1.1 訊息佇列

1.1.1 訊息佇列簡介

  • 訊息Message:通訊裝置之間傳遞的資料
  • 佇列Queue:一種特殊的線性表(資料元素首尾相連),特殊之處在於只允許在首部刪除元素和在尾部追加元素(fifo)
  • 訊息佇列:訊息+佇列,儲存訊息的佇列,訊息的傳輸過程中的容器,主要提供生產,消費介面供外部呼叫的儲存和獲取.

1.1.2 訊息佇列分類

message queue主要分為兩類:點對點(peer to peer),釋出訂閱

  • 點對點(peer to peer):

    一般基於Pull或者Polling接收資料。傳送到佇列中的訊息被一個而且僅僅一個接收者所接收,即使有多個接收者在同一個佇列中監聽同一訊息;即支援即發即收的訊息傳遞方式,也支援同步請求/應答傳送方式。

  • 釋出訂閱:

    釋出到同一個主題的訊息,可被多個訂閱者所接收。釋出/訂閱即可基於Push消費資料,也可基於Pull或者Polling消費資料

1.1.3 兩種型別的比較

  • p2p模型包括:訊息佇列(Queue),傳送者(Sender),接收者(Receiver);一個生產者只有一個消費者(Consumer)即一旦被消費,訊息就不存在訊息佇列中.
  • pub/Sub包含:訊息佇列(Queue),主題(Topic),釋出者(Publisher),訂閱者(Subscriber);每個訊息可以有多個消費者,彼此互不影響.

1.1.4 訊息系統的使用場景

  1. 解耦

    各系統之間通過訊息系統統一的介面交換資料,無需瞭解彼此的存在

  2. 非同步通訊

    在不需要立即處理請求的場景下,可以將請求放入訊息系統,合適的時候在處理

  3. 峰值處理能力

    訊息系統可頂住峰值流量,業務系統可根據處理能力從訊息系統中獲取並處理對應量的請求

  4. 擴充套件

    訊息系統是統一的資料介面,各系統可獨立擴充套件

  5. 冗餘

    部分訊息系統具有訊息持久化能力,可規避訊息處理前丟失的風險

  6. 可恢復性

    系統中部分鍵失效並不會影響整個系統,它恢復會仍然可從訊息系統中獲取並處理資料

1.1.5 常見的訊息系統

  • RabbitMQ:erlang編寫,支援amqp,xmpp,smtp,stomp。支援負載均衡,資料持久化。同時支援p2p和釋出/訂閱模式。
  • Redis :基於key-value的nosql資料庫,同時支援mq功能,可作輕量級的佇列服務使用。就入隊操作而言,Redis對短訊息(小於10kb)的效能比RabbitMQ好,長訊息效能比RabbitMQ差。
  • ZeroMQ:輕量級,不需要單獨的訊息伺服器或者中介軟體,應用程式本身扮演該角色,Peer-To-Peer。它實質上是一個庫,需要開發人員自己組合多種技術,使用複雜度高。
  • ActiveMQ:JMS實現,p2p,支援持久化,XA(分散式)事務
  • Kafka/Jafka:高效能跨語言的分散式釋出/訂閱訊息系統,資料持久化,全分散式,同時支援線上和離線處理。
  • MetaQ/RocketMQ:純java實現,釋出/訂閱訊息系統,支援本地事務和XA分散式事務。

1.2 kafka簡介

1.2.1 簡介

是分散式的釋出-訂閱訊息系統.由Scala語言編寫。Kafka是一個高吞吐量,持久化的,分散式訂閱訊息系統。主要用於處理活躍live資料(登入,瀏覽,點選,分享,喜歡等使用者行為產生的資料)。 Kafka作為一個叢集,執行在一臺後者多臺伺服器上;kafka通過topic對儲存的流資料進行分類;每條記錄中包含一個key,一個value和一個timestamp(時間戳)

1.2.2 適用場景

  • 構建實時流資料管道,它可以在系統或應用之間可靠的獲取資料(相當於message queue)
  • 構建實時流式應用程式,對這些流資料進行轉換或影響。(即流處理,通過kafka stream topic和topic之間內部進行變化)

1.2.3 三大特點

  1. 高吞吐量:可以滿足每秒百萬級別訊息的生產和消費-生產消費
  2. 持久化:有一套完善的訊息儲存機制,確保資料的高效安全的持久化-中間儲存
  3. 分散式:基於分散式的擴充套件和容錯機制;kafka的資料都會複製到幾臺伺服器上.當某一個伺服器發生故障時,生產者和消費者轉而使用其它的機器-整體健壯性

1.2.4 四大核心API

  1. The Producer API

    允許一個應用程式釋出一串流式的資料到一個或多個Kafka topic

  2. The Consumer API

    允許一個應用程式訂閱一個或者多個topic,並且對釋出給它們的流式資料進行處理

  3. The Streams API

    允許一個應用程式作為一個流處理器,消費一個或者多個topic產生的資料流,然後生產一個輸入流到一個或者多個topic中去,在輸入輸出流中進行有效的轉換

  4. The Connector API

    允許構建並執行可重用的生產者或者消費者,將Kafka topics連線到已存在的應用程式或者資料系統.比如,連線到一個關係型資料庫,捕捉表(table)的所有變更內容

1.2.5 核心概念

  1. 服務組成

    • Topic:主題,Kafka處理的訊息的不同分類
    • Broker:訊息伺服器代理,Kafka叢集中的一個kafka伺服器節點稱為一個broker,主要儲存訊息資料.存於硬碟中
    • Partition:Topic物理上的分組,一個topic在broker中被分為1個或者多個partition,分割槽在建立的時候指定
    • Message:訊息,是通訊的基本單位,每個訊息都屬於一個partition
  2. 服務相關

    • Producer:訊息和資料的生產者,向Kafka的一個topic釋出訊息
    • Consumer:訊息和資料的消費者,定於topic並處理其釋出的資訊
    • Zookeeper:協調kafka的正常執行

2. Kafka的分散式安裝

2.1 安裝

  1. 下載解壓

  2. 新增環境變數

    etc/profile末尾新增:

    export KAFKA_HOME=/usr/lcoal/DevInstall/kafka
    export PATH=$KAFKA_HOME/bin:$PATH
    
  3. 重新整理profile配置

    source /etc/profile

  4. 配置server.properties(/kafka/config)檔案

    ##當前kafka例項的id,必須為整數,一個叢集不可重複
    broker.id=1 
    ## 生產到kafka中的資料儲存的目錄,目錄需要手動建立(kafka-logs是目錄,不是檔案)
    log.dirs=log.dirs=/usr/local/DevInstall/kafka-2.1.1/data/kafka-logs
    ## kafka連線zk的url和kafka資料在zk中的儲存目錄
    zookeeper.connect=172.18.19.129:2181,172.18.19.143:2181,172.18.19.15:2181/kafk
    
  5. 將kafka的安裝檔案同步到其它機器:

    scp -r /path/to/kafka root@ip:/path/to/kafka

  6. 修改其它機器上kafka的server.properties中的broker.id

2.2 啟動

  1. 啟動命令:

    cd kafka
    ./bin/kafka-server-start.sh -daemon ./config/server.properties
    
  2. 檢視是否啟動成功:

    jps
    

    使用jps命令檢視顯示列表有無kafka

  3. 檢視啟動日誌:

    cd /usr/local/DevInstall/kafka-2.1.1/logs
    vim kafkaServer.out
    

2.3 服務測試

在zookeeper中檢視kafka的服務ID:

[zk: 172.18.19.129(CONNECTED) 1] ls /kafka/brokers/ids

控制檯輸出了:

 [1, 2, 3]

證明服務均已註冊上

2.4 目錄詳解

| --kafka
		|--cluster
				|--id	代表的是一個kafka叢集中包含叢集的版本和叢集的id
		|--brokers #伺服器的id,使用get [id] 後顯示連線的封裝資訊
				|--ids	#存放當前kafka的broker例項列表
				|--topics	#當前kafka中的topic列表
				|--seqid	#系統的序列id        
        |--controller :#get /kafka/controller中資訊的brokerid顯示當前的leader
        |--controller_epoch: #代表的是controller的紀元,即表示controller的迭代;每當controller中的brokerid更換一次,controller_epoch就+1
        |--consumers:#老版本用於儲存kafka消費者的資訊,主要儲存對應的offset;新版本中基本不用,此時使用者的消費資訊,儲存在一個系統的topic中:_consumer_offsets
        |--config:#存放配置資訊

3. Kafka的基本操作

3.1 KafKa的topic的操作

topic是kafka的核心概念,用來儲存各種型別的資料。

關於topic操作的指令碼命令:kafka-topics.sh

3.1.1 建立topic

  1. 命令:

    ./bin/kafka-topics.sh --create --topic topic1 --zookeeper dev-local-1:2181,dev-local-2:2181,dev-local-3:2181/kafka --partitions 3 --replication-factor 3
    
  2. 注意:指定副本因子(--replication-factor)的時候,不能超過brokers的數量

3.1.2 檢視topic

  1. 檢視所有的topic命令:

    ./bin/kafka-topics.sh --list --zookeeper dev-local-1:2181,dev-local-2:2181,dev-local-3:2181/kafka
    
  2. 檢視指定topic的詳盡資訊(例如檢視topic1的詳盡資訊)

    ./bin/kafka-topics.sh --zookeeper 172.18.19.129:2181,172.18.19.143:2181,172.18.19.15:2181/kafka --describe --topic topic1
    

    結果顯示:

    Topic:topic1	PartitionCount:3	ReplicationFactor:3	Configs:
    Topic: topic1	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1
    Topic: topic1	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2
    Topic: topic1	Partition: 2	Leader: 2	Replicas: 3,1,2	Isr: 2
    # Partition:當前topic對應的分割槽編號
    # Replicas:副本因子,當前kafka對應的partition所在的broker例項的broker.id
    # Leader:該partition的所有副本中的leader領導者,處理所有kafka該partition讀寫請求
    # Isr:該partition的存活的副本對應的broker例項的broker.id列表
    

3.1.3 修改一個topic

[root@dev-local-3 kafka-2.1.1]./bin/kafka-topics.sh  --alter  --topic topic1 --partitions 4 --zookeeper 172.18.19.129:2181,172.18.19.143:2181,172.18.19.15:2181/kafka

注意:partition個數,只能增加,不能減少

3.1.4 刪除一個topic

[root@dev-local-3 kafka-2.1.1]./bin/kafka-topics.sh  --delete --topic topic1 --zookeeper 172.18.19.129:2181,172.18.19.143:2181,172.18.19.15:2181/kafka

老版本是不能直接刪除topic,除非你配置了delete.topic.enable=true,可以直接刪除掉。若未配置,那麼就不會直接刪除,會做一個標記,表明這個topic不能再用了。在新版本中,不需要這些設定,可直接刪除。

3.1.5 生產資料

生產及消費訊息,預設監聽埠未9092,防火牆需要開放此埠;並在/etc/hosts中配置主機名和對應的IP

./bin/kafka-console-producer.sh --broker-list dev-local-1:9092,dev-local-2:9092,dev-local-3:9092 --topic topic3

3.1.6 消費資料

./bin/kafka-console-consumer.sh --bootstrap-server dev-local-1:9092,dev-local-2:9092,dev-local-3:9092 --topic topic3

若想從頭消費,需要加上引數--from-beginning:

./bin/kafka-console-consumer.sh --bootstrap-server dev-local-1:9092,dev-local-2:9092,dev-local-3:9092 --topic topic3 --from-beginning

指定消費某個分割槽:--partition 0;從什麼位置開始消費(訊息的偏移量)--offset earliest:

./bin/kafka-console-consumer.sh --bootstrap-server dev-local-1:9092,dev-local-2:9092,dev-local-3:9092 --topic topic3 --from-beginning --partition 0 --offset earliest

資料消費的順序不是按照發送訊息的順序;因為生產出的訊息會發送至不同的分割槽。

3.2 Kafka的資料消費總結

​ Kafka消費者在 消費資料的時候,都是分組別的。不同組的消費不受影響。例如在兩個消費者組內,同一時間可以有兩個消費者同時消費一個分割槽的資料。但在同一組內,不會有兩個消費者同時消費一個分割槽的資料。

​ 相同組內的消費,需要注意。若partition有3個,消費者有三個,那麼便是每一個消費者消費其中一個partition對應的資料;若有兩個消費者,此時一個消費者消費其中一個partition資料,另一個消費者消費2個partition的資料。若存在超過3個消費者,同一時間只能最多有3個消費者能消費得到資料。

./bin/kafka-console-consumer.sh \
--bootstrap-server dev-local-1:9092,dev-local-2:9092,dev-local-3:9092 \
--topic topic3 \
--group haha \
--partition 2 \
--offset earliest

--group haha:消費者對應的消費者組。

offset是kafka的topic的partition中的每一條訊息的標誌,如何區分該條訊息在kafka對應的partition的位置,就是用該偏移量。offset的資料 型別是Long,8個位元組長度。offset在分割槽內是有序的,分割槽間是不一定有序。若想要kafka中的資料全域性有序,就只能讓partition個數為1.

​ 在組內,kafka的topic的partition個數,代表了kafka的topic的並行度。同一時間最多可以有多個執行緒來消費topic資料,所以如果想要提高kafka的topic的消費能力,應該增大partition的個數。

3.3 Kafka的程式設計api

3.3.1 建立kafka的專案

  1. 新增依賴:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
</dependency>
  1. 複製linux上kafka/config/producer.properties檔案至專案中並修改和新增如下:
bootstrap.servers=dev-local-1:9092,dev-local-2:9092,dev-local-3:9092
# key和value的序列化,最後面的StringSerializer應該和程式碼中的key value型別一致
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

3.3.2 生產者的api操作

public class ProducerDemo {
    /**
     * kafka的資料是由key,value和timestamp組成
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        try {
            properties.load(ProducerDemo.class.getClassLoader().getResourceAsStream("producer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("未成功讀取producer.properties檔案");
        }
        // 建立執行入口
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        String topic = "topic3";
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "111");
        // 傳送訊息
        producer.send(producerRecord);

        //釋放資源
        producer.close();

    }

}
  • 建立producer時需要指定的配置資訊
acks=[0|-1|1|all]
# 訊息確認機制
# 0:不做確認,直接傳送訊息即可;1:只需要leader進行訊息確認即可,後期follower可以從leader進行同步;
# -1/all:不僅需要leader進行訊息確認,還需要等待follower進行確認

batch.size=1024 
# 每個分割槽內的使用者快取未傳送record記錄空間的大小
# 若快取區中的資料,沒有沾滿,也就是仍然有未用的空間,那麼也會將請求傳送儲區,為了減少請求次數,可以配置linger.ms大於0

linger.ms=10
# 不管緩衝區是否被沾滿,延遲10ms傳送request

buffer.memory=10240
# 控制的是一個producer中的所有快取空間

retires=0
# 傳送訊息失敗之後的重試次數
  • 開啟冪等性

在producer.properties檔案中新增:enable.idempotence=true

開啟冪等卻並未生效的原因:

其實主要原因在於大資料框架設定問題上:

​ 若有1000w條資料,在其中不斷增加,要保證新增的資料不能重複。

​ 方案1:在新增的時候進行判斷,若該條資料訊息已經存在,直接覆蓋掉對應的資料;

​ 方案2:在新增的時候先不進行判斷,直接進行新增,在後續的操作中滿足條件之後,再進行合併操作。

出於效率,必然選擇方案2

  • 如何保證Kafka的資料的一致性?

答:Kafka生產者可以選擇生產的兩種模式(冪等和事務)

3.3.3 消費者的api操作

入口類:Consumer

  • 配置檔案:
bootstrap.servers=dev-local-1:9092,dev-local-2:9092,dev-local-3:9092

# consumer group id
group.id=group1903

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
# 預設值未latest,即從最新的開始消費
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • 執行程式碼:
public class ConsumerDemo {
    public static void main(String[] args) throws Exception{
        Properties properties = new Properties();
        properties.load(ConsumerDemo.class.getClassLoader().getResourceAsStream("consumer.properties"));

        //構建消費者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("topic3"));
        System.out.println("topic\tpartition\toffset\tkey\tvalue");
        try {
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic()+"\t"+record.partition()+"\t"+record.offset()+"\t"+record.key()+"\t"+record.value());
                }
            }
        } finally {
            consumer.close();
        }


    }
  • 從指定的位置開始消費,需要使用.assign().seek()api:(line8-13)
public class ConsumerDemo2 {
    public static void main(String[] args) throws Exception{
        Properties properties = new Properties();
        properties.load(ConsumerDemo.class.getClassLoader().getResourceAsStream("consumer.properties"));

        //構建消費者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        TopicPartition partition1 = new TopicPartition("topic3", 0);
        TopicPartition partition2 = new TopicPartition("topic3", 1);
        consumer.assign(Arrays.asList(partition1, partition2));
        System.out.println("topic\tpartition\toffset\tkey\tvalue");
        consumer.seek(partition1,2);
        consumer.seek(partition2,5);
        try {
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic()+"\t"+record.partition()+"\t"+record.offset()+"\t"+record.key()+"\t"+record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
  • offset消費的問題:

若沒有手動管理offset,當程式崩潰時只能從頭開始消費,導致訊息的重複消費或丟失,效率較低。

手動管理offset:消費時將offset存入資料庫。

3.3.4 record進入分割槽的策略

每一條producerRecord都有topic名稱,可選的partition分割槽編號,以及一堆可選的key和value組成。

  • 三種策略進入分割槽:

    • 若指定了partition,那麼直接進入該partition
    • 若沒有指定partition,但指定了key,則使用key的hash選擇partition
    • 若既沒有指定partition,也沒有指定key,則使用輪詢的方式進入partition
  • 自定義分割槽需要實現Partitioner介面

public class RandomPartitions implements Partitioner {

    private Random random = new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 獲取總的分割槽數
        Integer countForTopic = cluster.partitionCountForTopic(topic);
        // 取0到countForTopic之間的隨機數
        int index = random.nextInt(countForTopic);
        return index;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 隨機分割槽方式
public class ProducerDemo2 {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("partitioner.class", com.kong.partition.RandomPartitions.class);
        properties.load(ProducerDemo2.class.getClassLoader().getResourceAsStream("producer.properties"));
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        String topic = "topic3";

        int start = 100;
        int end = start + 10;
        for (int i = start; i < end; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "random" + i, i + "");
            kafkaProducer.send(record);
        }

        kafkaProducer.close();
    }
}

或者將上面line4刪除,在producer.properties檔案中新增:

## 指定自定義分割槽
partitioner.class=com.kong.partition.RandomPartitions
  • Hash分割槽
/**
 * @author gedachao
 * @description 自定義分割槽之Hash分割槽
 * 計算方法:key.hashValue()%partitionerCount
 * @date 2020-08-03 9:47
 */
public class HashPartitions implements Partitioner {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 獲取topic分割槽的數量
        Integer countForTopic = cluster.partitionCountForTopic(topic);
        if (keyBytes != null) {
            return Math.abs(keyBytes.hashCode()) % countForTopic;
        }
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 輪詢分割槽
public class RoundRobinPartition implements Partitioner {

    private AtomicInteger counter =  new AtomicInteger();
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer countForTopic = cluster.partitionCountForTopic(topic);
        return counter.getAndIncrement() % countForTopic;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 分組分割槽
public class GroupPartition implements Partitioner {
    private HashMap<String,Integer> map = new HashMap<>();
    {
        map.put("java.learn.com",0);
        map.put("ui.learn.com",0);
        map.put("data.learn.com",0);
        map.put("android.learn.com",0);
        map.put("h5.learn.com",0);
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String line = value.toString();
        String[] str = line.split("\\s+");
        try {
            if (str == null || str.length != 2) {
                return 0;
            } else {
                URL url = new URL(str[1]);
                String host = url.getHost();
                return map.getOrDefault(host,0);
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

4 Kafka架構之道

4.1 Kafka相關屬於介紹

broker,topic,producer,partition,consumer,consumergroup等等不作介紹

replica

​ 每一個分割槽,根據副本因子N,會有N個副本。比如broker1上有一個topic,分割槽為topic-1,副本因子為2,那麼在兩個broker的資料目錄裡,都都有一個topic-1,其中一個是leader,一個follower。

Segment

​ partition物理上由多個segment組成,每個Segment存著message資訊。

Leader

​ 每個partition有多個副本,其中有且僅有一個Leader,Leader是當前負責資料的讀寫的partiton。

Follower

​ Follower跟隨Leader,所有寫請求都通過Leader路由,資料變更會廣播給所有Follower,Follower於Leader保持資料同步。若Leader失效,則從Follower中選舉出一個新的Leader。當Follower於Leader掛掉、卡住或者同步太慢,leader會把這個follower從“in sync replicas”(ISR)列表中刪除,重新建立一個Follower。

Offset

​ Kafka的儲存檔案都是按照offset.log來命令,用offset作名字的好處是方便查詢。存放於~/kafka/data/kafka-log/topic-name/

4.2 Kafka的架構

​ 通常,一個典型的kafka叢集中包含若干Producer(可以是web前端產生的Page View,或者是伺服器日誌,系統CPU,Memory等),若干broker(Kafka支援水平擴充套件,一般broker數量越多,叢集吞吐率越高),若干Consumer Group,以及一個Zookeeper叢集。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將訊息釋出到broker,Consumer使用Pull模式從broker訂閱並消費資訊。注意:follower同步不是本機同步,而是跨節點同步。比如上圖中的broker0-partition1為follower,它將同步的是broker1-partition1(leader),而不是本地(broker0)中的leader。

4.3 Kafka的分散式模型

​ kafka分散式主要是指分割槽被分佈在多臺server(broker)上,同時每個分割槽都有leader和follower(不是必須),leader負責處理,follower負責同步。leader和follower之間身份可互相轉化,形成分散式模型。

​ kafka的分割槽日誌(message)被分佈在kafka叢集的伺服器上,每一個伺服器處理資料和共享分割槽請求。每個伺服器處理和共享分割槽請求。每個分割槽是被複制到一系列配置好的伺服器上來進行容錯。

​ 每個分割槽有一個server節點來作為leader和零個或者多個server接待你來作為followers。leader處理指定分割槽的所有讀寫請求,同時follower被動複製leader。若leader失敗,followers中的一個將會自動變成一個新的leader。每個伺服器都能作為分割槽的一個leader和作為其它分割槽的follower,因此kafka叢集能被很好的平衡。kafka叢集是一個去中心化的叢集。

​ kafka消費的並行度就是kafka topic分割槽的個數,或者分割槽的個數決定了同一時間同一消費者組內最多可以有多少個消費者消費資料。

4.4 Kafka的檔案儲存

  • 在kafka叢集中,分單個broker和多個broker。每個broker中有多個topic,topic數量可以自己設定。在每個topic中又有0到多個partition,每個partition為一個分割槽。kafka分割槽命名規則為topic的名稱+有序序號,這個序號從0開始一次增加。

  • 每個partition中有多個segment file。建立分割槽時,預設會生成一個segment file,kafka預設每個segment file的大小是1G。當生產者往partition中儲存資料時,記憶體中已滿,就會向segment file裡重新整理。在儲存資料時,會生成一個segment file,當這個segment file到1G之後,在生成第二個segment file,以此類推。每個segment file 對應兩個檔案,分別是以.log結尾的資料檔案和以.index結尾的索引檔案。在伺服器上,每個partition是一個目錄,每個segment是分割槽目錄下的一個檔案。

  • 每個segment file也有自己的命名規則,每個名字有20個字元,不夠用0填充。每個名字從0開始命名,下一個segment file檔案的名字就是,上一個segment file中最後一條訊息的索引值。在.index檔案中,儲存的是key-value格式的,key代表在.log中按順序開始的第n條訊息,value代表該訊息的位置偏移。但是在.index中不是對每條訊息都做記錄,它是每隔一些訊息記錄一次,避免佔用太多記憶體。即使訊息不在index記錄中,在已有的記錄中查詢,範圍也大大縮小了。.index中存放的訊息索引是一個稀疏索引列表

  • Kafka中的訊息是以topic進行分類的,生產者和消費者都是面向topic。topic是邏輯上的概念,而partition是物理上的概念,每個 partition對應於一個log檔案,該log檔案中儲存的就是producer生產的資料。Producer生產的資料會不斷追加到該log檔案末端,且每條資料都有自己的offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個offset,以便出錯恢復時,從上次的位置繼續消費。

  • offset由Consumer自身維護。在0.8版本之前,offset儲存於zookeeper上;在0.9之後,offset儲存在Kafka內建的topic中,這個topic是專門用來儲存offset,無需我們建立,kafka會自動建立。offset每個分割槽都是獨立的,每個消費者的每次消費只能消費一個分割槽。消費順序:不能保證全域性是有序的,但是可以保證分割槽內部是有序的。

4.5 topic中的partition

為什麼要分割槽?

​ 可以想象,若一個topic就一個分割槽,要是這個分割槽有1T的資料,那麼kafka就想把大檔案劃分到更多的目錄來管理,即Kafka所謂的分割槽。

分割槽的好處:

  1. 方便在叢集中擴充套件。因為一個topic由一個或者多個partition構成,而每個節點中通常可以儲存多個partition,這樣就方便分割槽儲存與移動,也就增加其擴充套件性。同時也可以增加其topic的數量。

  2. 可以提高併發。因為一個topic多個partition,而每個主題讀寫資料時,其實就是讀寫不同的partition。

4.6 partition中檔案儲存

  • 每個分割槽一個目錄,該目錄中是一堆segment file(預設一個segment是1G),該目錄和file都是物理儲存與磁碟。
  • 每個partion(目錄)相當於一個巨型檔案被平均分配到多個大小相等segment(段)資料檔案中。但每個段segment file 訊息數量不一定相等,這種特性方便old segment file 快速被刪除。
  • 每個partition 只需要支援順序讀寫就行了,segment檔案生命週期由服務端配置引數決定。由於使用順序讀寫,使得Kafka雖然將資料儲存於物理記憶體中,但其存取速度可媲美對記憶體的讀寫。(Kafka不支援隨機讀寫)。
  • 這樣做的好處就是能快速刪除無用檔案,有效提高磁碟利用率。

4.7 Kafka分割槽中的segment

​ 由於生產者生產的訊息會不斷追加到log檔案末尾,為防止log檔案過大導致資料定位效率低下,Kafka採取了分片和索引機制,將每個partition分為多個segment。每個segment對應兩個檔案(.index和.log)。這些檔案位於一個資料夾下,該資料夾的命名規則為:topic名稱+分割槽序號。例如:first這個topic有3個分割槽,則其對應的資料夾有first-0,first-1,first-2。index和log檔案以當前segment的第一條訊息的offset命名。

  • segment file組成

由2個部分組成,分別為index file 和 log file(即資料檔案)。這兩個檔案一一對應,成對出現。它們分別表示segment索引檔案、資料檔案。

  • segment 檔案命名規則

partition全域性的第一個segment從0開始,後續每個segment檔名為上一個segment檔案最後一條訊息的offset值。數值最大為64位long大小,20位數字字元長度,不夠的左邊用0填充。

4.8 Kafka中訊息查詢流程

舉例

查詢offset=23066的message,需要通過以下兩個步驟完成:

第一步查詢segment file:

00000000000000000000.index
00000000000000000000.log
00000000000000023060.index
00000000000000023060.log

​ 根據.index和.log物理結構對應關係圖可知,其中00000000000000000000.index表示最開始檔案,起始偏移量(offset)為0。第二個檔案00000000000000023060.index的訊息起始偏移量為23060。同樣,其它後續檔案以此類推,以起始偏移量命名並排序這些檔案,只要根據offset二分查詢檔案列表,就可以快速定位到具體檔案。

​ 當offset=23060時定位到00000000000000023060.index和log檔案。

第二步:通過segment file 查詢message:

​ 通過第一步定位到segment file ,當offset=23060時,依次定位到00000000000000023060.index的元資料物理位置和00000000000000023060.log的物理偏移地址,然後通過00000000000000023060.log順序查詢直到offset=23066為止。 若.index中的偏移量沒有此offset,則先找到小於此offset的索引,然後通過二分查詢找到確切位置。

​ segment index file 採取稀疏索引儲存方式,即<偏移量、位置>,它減少索引檔案大小,通過map可以直接記憶體操作,稀疏索引為資料檔案的每個對應message設定一個元資料指標,它比稠密索引節省了更多的儲存空間,但查詢起來需要消耗更多的時間。

4.9 Consumer Group架構

consumer group是kafka提供的可擴充套件具有容錯性的消費機制。既然是一個組,那麼組內必然可以有多個消費者或者消費者例項(cnsumer instance),它們共享一個公共的id,即group id。組內的所有消費者協調在一起來消費訂閱主題(subscribed topics)的所有分割槽(partition)。當然,每個分割槽只能由同一個消費組內的一個consumer來消費。3個特性:

  • consumer group 下可以有一個或多個consumer instance,consumer instance 可以是一個程序,也可以是一個執行緒。
  • group.id 是一個字串,唯一標識一個consumer group
  • consumer group 下訂閱的topic 下的每個分割槽只能分配給某個group 下的一個consumer(當然該分割槽還可以被分配給其它group ),即組合組之間的消費者不受彼此影響。

4.10 offset的維護

​ 由於consumer在消費過程中可能會出現斷電宕機等故障,consumer恢復後,需要從故障前的位置繼續消費,所以consuemr需要實時記錄自己消費到了哪個offset,以便故障恢復後繼續消費。

​ Kafka預設是定期自動提交位移的(enable.auto.commit=true),當然也可以手動提交位移實現自己的控制。另外Kafka會定期把group 消費情況儲存起來,做成一個offset map。

兩種提交方式詳解:

  • 自動提交。設定enable.auto.commit=true,更新的頻率根據引數auto.commit.interval.ms來定。這種方式也成為at most once,fetch到訊息後就可以更新offset,無論是否消費成功。預設就是true。
  • 手動提交。設定enable.auto.commit=false,這種方式稱為at least once。fetch到訊息後,等消費完成再呼叫方法consumer.commitSync(),手動更新offset;若消費失敗,則offset也不會更新,此條訊息會被重複消費一次。

4.11 Kafka中Push 和 Pull

​ 一個較早的問題是我們應該考慮消費者從broker中Pull資料還是broker將資料push給消費者。kafka遵守傳統設計和借鑑很多訊息系統,這兒kafka選擇producer向broker去push訊息,並由consumer從broker pull訊息。一些ogging-centric system,例如Facebook的Scribe和Cloudera的Flume,採用不同的push模式。事實上,push模式和pull模式各有優劣。

  • push模式很難適用消費速率不同的消費者,因為訊息傳送速率是由broker決定的。push模式的目標是儘可能以最快速度傳遞訊息,但是這樣很容易造成consumer來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費訊息。
  • pull模式的不足之處是,若kafka沒有資料,消費者可能會陷入迴圈中,一直返回空資料。針對這一點,Kafka的消費者再消費資料時會傳入一個時長引數timeout,若當前沒有資料可供消費,consumer會等待一段時間之後返回,這段時間為timeout。(timeout官方案例為100毫秒)

4.12 Kafka中資料傳送保障

​ 為保證producer傳送的資料,能可靠的傳送到指定的topic,topic的每個partition收到producer傳送的資料後,都需要向producer傳送ack(acknowledgement確認收到),若producer收到ack,就會進行下一輪的傳送,否則重新發送資料。

  • 何時傳送ack?

確保有follower與leader同步完成,leader再發送ack,這樣才能保證leader掛掉之後,能在follower中選舉出新的leader。

  • 多少個follower同步完成之後傳送ack?
  1. 半數以上的follower同步完成,即可傳送ack
  2. 全部的follower同步完成,即可傳送ack

副本資料同步策略:

方案 優點 缺點
半數以上完成同步,就傳送ack 延遲低 選舉新的leader時,容忍n臺節點的故障,需要2n+1個副本
全部完成同步,才傳送ack 選舉新的leader時,容忍n臺節點的故障,需要n+1個副本 延遲高

kafka選擇了第二種方案,理由如下:

  1. 同樣為了容忍n臺節點的故障,第一張方案需要2n+1個副本,而第二種方案只需要n+1個副本,而Kafka的每個分割槽都有大量的資料,第一張方案會造成大量資料的冗餘。
  2. 雖然第二種方案的網路延遲較高,但網路延遲對Kafka的影響較小
  • ISR

採用第二種方案之後,設想以下情景:leader收到資料,所有follower都開始同步資料,但有一個follower因為某種故障,遲遲不能與leader進行同步,那麼leader就要一直等下去,直到它完成同步,才能傳送ack。如何解決此問題?

答:Leader維護了一個動態的in-sync replica set(ISR),意為和leader保持同步的follower集合。當ISR中的follower完成資料的同步之後,leader就會給follower傳送ack。若follower長時間未向leader同步資料,則該follower將被踢出ISR,該事件閾值由replica.lag.time.max.ms 引數設定。Leader發生故障之後,就會從ISR中選舉新的leader。

注:

  • 生產者傳送到特定主題分割槽的訊息是按照發送的順序來追加。也就是說,若訊息M1和訊息M2由相同的生產者傳送,並且M1是先發送的,那麼M1的偏移量將比M2低,並出現在日誌的前面。
  • 消費者是按照儲存在日誌中記錄順序來查詢訊息。
  • 對於具有n個副本的主題,我們將容忍最多N-1個伺服器失敗故障,從而不會丟失提交到日誌的任何訊息記錄。

4.13 ack應答機制

對於某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丟失,所以沒必要等ISR中的follower全部接收成功。

所以Kafka為使用者提供了三種可靠性級別,使用者根據可靠性和延遲的要求進行權衡,選擇以下的配置。

ack可以在kafka中進行配置acks引數配置

ack 說明
acks=0 producer不等待broker的ack,這樣的操作是一種低延遲的操作,broker一接收到但還沒寫出成功就返回,當broker故障的時候可能會出現資料丟失(相當於非同步傳送)
acks=1 producer等待broker的ack,partition的leader將資料落地磁碟後返回ack,若在follower同步成功之前leader故障,此時資料會丟失。(此時Follower需要去同步訊息,但是leader已宕機,Kafka叢集就要重新進行leader選舉,但原leader資料沒有同步,所以會出現資料丟失問題)
acks=-1(all) producer等待broker的ack,partition的leader和follower完成全部的資料同步才返回ack(即會觸發ISR),但若在follower同步完成之後,broker傳送ack之前,leader宕機,會重複資料。
  • 故障處理細節

LEO(Log End Offset)每一個副本最後一個offset的位置,這裡可以發現每天一個副本的LEO不一樣,是因為每個副本的同步速度都不同。

HW(High watermark)即水位,leader中的HW是所有副本中最小的LEO。

這裡可以看到HW後的資料還沒有進行完全同步,即說明HW後的位置是可以變動的,因為還沒有提交,所以在HW之前的資料對Consuemer可見,之後的資料Consumer是不可見的,並且Consumer只能消費HW之前的資料。

  1. follower故障

    follower發生故障後會被臨時踢出ISR,待該follower恢復後,follower會讀取本地磁碟記錄的上次的HW,並將log檔案高於HW的部分擷取掉,從HW開始向leader進行同步。等待follower的LEO大於等於該Partition的HW,即follower追上leader之後,就可以重新加入ISR了。

  2. leader故障

    leader發生故障之後,會從ISR中選出一個新的leader,之後,為保證多個副本之間的資料一致性,其餘的follower會先將各自的log檔案高於HW的部門擷取掉,然後從新的leader同步資料。

注意:這隻能保證副本間資料的一致性,並不能保證資料不丟失或者不重複

4.14 Exactly Once(一次正好)語義

對於某些重要的訊息,需要保證exactly once語義,即保證每條訊息被髮送且僅被髮送一次。

在0.11版本之後,Kafka引入了冪等性機制(idempotent),配合acks=-1時的at least once(最少一次)語義,實現了producer到broker的exactly once 語義。

使用時,只需將enable.idempotence屬性設定為true(在生產者位置),kafka自動將acks屬性設定為-1。

  • 何為冪等性

簡單的說1的幾次冪都等於1,也就是說一條訊息無論傳送幾次都只算一次,無論多少條訊息但只例項化一次。

Kafka完成冪等性其實就是給訊息添加了唯一的id,這個id的組成是PID(ProducerID)這樣保證每一個Producer傳送的時候是唯一的,還會為Producer中每條訊息新增一個訊息ID,即當前Producer中生產的訊息會加入Producer的ID和訊息的ID,這樣就能保證訊息唯一性了,這個訊息傳送到Kafka中的時候暫時快取ID,寫入資料後沒有收到ack,那麼會重新發送這個訊息,新訊息過來的時候會和快取中ID進行比較,若發現已經存在就不會再次接受了。

  • 詳細解析

​ 為實現producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。

​ PID:每個新的producer在初始化的收會被分配一個新的pid,這個pid對使用者是不可見的。

​ Sequence Number。對於每個PID,該Producer傳送資料的每個<Topic,Partition>都對應一個從0開始單調遞增的Sequence Number。

​ Kafka可能存在多個生產者,會同時產生訊息,但對Kafka來說,只需要保證每個生產者內部的訊息冪等就可以了,所以引入了PID來標識不同的生產者。

​ 對於Kafka來說,要解決的是生產者傳送訊息的冪等問題。即需要區分每條訊息是否重複。

​ Kafka通過為每條訊息增加一個Sequence Number,通過Sequence Number來區分每條訊息。每條訊息對應一個分割槽,不同的分割槽產生的訊息不可能重複。所有Sequence Number對應每個分割槽。

​ Broker端在快取中儲存了這seq number,對於接受的每條訊息,若其序號比Broker快取中序號大於1則接收它,否則將其丟棄。這樣就可以實現訊息重複提交了。但是,只能保證單個Producer對於同一個<Topic,Partition>的Exactly Once語義。不能保證同一個Producer一個topic不同的partition冪等。

4.15 Zookeeper在Kafka中的作用

​ Kafka叢集中有一個broker會被選舉為Controller,負責管理叢集broker的上下線,所有topic的分割槽副本分配和leader選舉等工作。

  • broker啟動時會向Zookeeper註冊一個臨時節點/controller,此時誰先到就是controller leader。

  • broker還會向Zookeeper註冊一個/brokers/ids/0,1,2;kafkaController會監聽/brokers/ids/0,1,2

  • 當建立topic的時候會向Zookeeper中註冊/brokers/topics/first/partitions/0/state

  • 節點中的資料是“leader”:0,“isr”:[0,1,2];這裡的0對應的是brokere的序號leader誕生

  • 當leader宕機,因kafkaController監控著/brokers/ids/0,1,2節點並更改資訊為[1,2]

  • KafkaController有MetadataCache資訊,所以知道誰是leader,此時會向"leader":0,"isr":[0,1,2]獲取ISR進行重新選舉leader,選舉後KafkaController更新leader及ISR。

    ​ 只有KafkaController Leader會向zookeeper上註冊Watcher,其它broker幾乎不用監聽zookeeper的狀態變化。

    ​ Kafka叢集中多個broker,有一個會被選舉為controller leader(誰先到就是誰),負責管理整個叢集中分割槽和副本的狀態,比如partition的leader副本故障,由controller負責為該partition重新選舉新的leader副本;當檢測到ISR列表發生變化,有controller通知叢集中所有broker更新其MetadataCache資訊;或者增加某個topic 分割槽的時候也會由controller管理分割槽的重新分配工作。

    ​ 當broker啟動的時候,都會建立KafkaController物件,但是叢集中只能有一個leader對外提供服務,這些每個節點上的KafkaController會在指定的zookeeper路徑下建立臨時節點,只有第一個成功建立的節點的KafkaController才可以稱為leader,其餘的都是follower。當leader發生故障後,所有的follower會收到通知,再次競爭在該路徑下建立節點從而選舉新的leader。

6 Kafka的Log

6.1 kafka的log

6.1.1 日誌結構

6.1.2 Kafka的log寫

日誌允許序列附加,總是附加到最後一個檔案。當該檔案達到可配置的大小(比如1GB)時,就會將其重新整理到一個新檔案。日誌採用兩個配置引數:M和S。前者給出在強制OS將問價重新整理到磁碟之前要寫入的訊息數量(條數),後者給出多少秒之後被強制重新整理。這提供了一個永續性保證,在系統崩潰的情況下最多丟失M條訊息或S秒的資料。

6.1.3 Kafka的log讀

  1. 讀取的實際過程是:首先根據offset 去定位資料檔案中log segment檔案,然後從全域性的offset值中計算指定檔案offset,然後從指定檔案offset 讀取訊息。查詢使用的是二分查詢(基於快排對segment檔名進行排序),每一個檔案的範圍都被維護到記憶體中。
  2. 讀取是通過提供訊息的64位邏輯偏移量(8位元組的offset)和s位元組的最大塊大小來完成。
  3. 讀取將返回一個迭代器包含有s位元組的緩衝區,緩衝區中含有訊息。S位元組應該比任何單個訊息都大,但是在出現異常大的訊息時,可以多次重試讀取,每次都將緩衝區大小加倍,知道成功讀取訊息為止。
  4. 可以指定最大的訊息和緩衝區大小,以使伺服器拒絕的訊息大於某個大小,併為客戶機提供其獲得完整訊息所需的最大讀取量。

6.1.4 Kafka的log的刪除

通過設定引數log.retention.hours=xxx即可

6.1.5 Kafka的log保障

  1. 日誌提供了一個配置引數M,該引數控制在強制重新整理磁碟之前寫入的訊息的最大數量(M條)。
  2. 啟動日誌恢復去處理最近訊息在總訊息中是否有效,使用crc32來校驗,若訊息長度和offset總和小於檔案長度且crc32和儲存的訊息能匹配上,則表示有效。

請注意:

必須處理兩種型別的損壞:中斷(由於崩潰而丟失未寫的塊)和損壞(向檔案新增無意義塊)。