1. 程式人生 > >[kafka掃盲]--(5)kafka原始碼閱讀之模型物件--(1)ProducerRecord.java

[kafka掃盲]--(5)kafka原始碼閱讀之模型物件--(1)ProducerRecord.java

Author:趙志乾
Date:2018-10-21
Declaration:All Right Reserved!!!

ProducerRecord.java

該類的例項用於存放生產者向kafka叢集傳送的單條訊息記錄。其內容如下,共包含6個屬性欄位。其中的topic欄位用於指定該訊息記錄的一級分屬,partition欄位和key欄位用於指定該訊息記錄的二級分屬,value欄位用於存放該訊息記錄的有效載荷,timestamp欄位用於指定時間戳,headers欄位用於指定.......。

topic欄位指定訊息記錄的一級分屬:kafka叢集可以作為訊息匯流排來使用,此時叢集可以接收位於不同系統中的生產者傳送來的不同類別的訊息,而topic欄位便可認為是邏輯層面的一種劃分。

partition欄位和key欄位指定訊息記錄的二級分屬:kafka叢集為了提高訊息投遞和消費的併發度,引入了分割槽概念,對訊息記錄進行二級分屬指定。這兩個欄位屬於可選欄位,不同的組合方式對應不同的劃分策略。

partition和key欄位決定的劃分策略:
    1、如果partiton欄位不空,則直接由該欄位指定具體分割槽,實現分割槽的定投;
    2、如果partition欄位為空,且key欄位不為空,則通過對key值進行hash,來確定訊息記錄應歸屬的二級歸屬;
    3、如果partition和key欄位均為空,則對分割槽進行輪詢投遞,實現負載均衡;

value欄位存放訊息記錄有效載荷:生產者真正要釋出的訊息內容會存放於value欄位;

timestamp欄位指定時間戳:如果使用者沒有指定該欄位,則其會有生產者自動填充。而其最終的值是由topic中指定的時間戳型別來決定的,如果指定為建立時間,則該值由生產者客戶端填寫;如果指定為日誌追加時間,則該值會由broker在將訊息記錄持久化後用當前時間進行覆蓋。

注:在構造該類的例項時,要保證主題不能為空,分割槽號如果設定的話,要保證其值不小於0的整數,而時間戳如果設定的話,也要保證其值為不小於0的整數。

package org.apache.kafka.clients.producer;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

/*該類的例項用於存放生產者要向kafka叢集釋出的訊息記錄,其包括一個topic名稱,用來給訊息記錄做邏輯層面的一級劃分,即該訊息記錄應該被髮布到kafka叢集的哪個類別(主題)下;一個可選的分割槽號,用來給訊息記錄做邏輯上二級劃分,即該訊息記錄應該被髮布到kafka叢集指定類別下的哪個佇列(分割槽)中;*/
public class ProducerRecord<K, V> {

    //訊息主題
    private final String topic;
    //分割槽號:可選
    private final Integer partition;
    private final Headers headers;
    //key值:可選
    private final K key;
    //value值:生產者要傳送的有效訊息載荷
    private final V value;
    //時間戳
    private final Long timestamp;

    //建立一個訊息記錄例項,所需指定的引數包括:主題、分割槽號、時間戳、key、value和headers
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }


    //建立一個訊息記錄例項,所需指定的引數包括:主題、分割槽號、時間戳、key、value
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);
    }

    //建立一個訊息記錄例項,所需指定的引數包括:主題、分割槽號、key、value、headers
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }
    
    //建立一個訊息記錄例項,所需指定的引數包括:主題、分割槽號、key值和訊息有效載荷value值
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }
    
    //建立一個訊息記錄例項,所需指定的引數包括主題、key值和訊息有效載荷value值
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }
    
    //建立一個訊息記錄例項,所需指定的引數包括:主題和訊息有效載荷value值
    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }

    //返回記錄的主題名稱
    public String topic() {
        return topic;
    }

    public Headers headers() {
        return headers;
    }

    //返回訊息記錄的key值
    public K key() {
        return key;
    }

    //返回訊息記錄的value值
    public V value() {
        return value;
    }

    //返回訊息記錄的時間戳:單位毫秒
    public Long timestamp() {
        return timestamp;
    }

    //返回訊息記錄要被投遞到的分割槽號
    public Integer partition() {
        return partition;
    }

    @Override
    public String toString() {
        String headers = this.headers == null ? "null" : this.headers.toString();
        String key = this.key == null ? "null" : this.key.toString();
        String value = this.value == null ? "null" : this.value.toString();
        String timestamp = this.timestamp == null ? "null" : this.timestamp.toString();
        return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", headers=" + headers + ", key=" + key + ", value=" + value +
            ", timestamp=" + timestamp + ")";
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        else if (!(o instanceof ProducerRecord))
            return false;

        ProducerRecord<?, ?> that = (ProducerRecord<?, ?>) o;

        if (key != null ? !key.equals(that.key) : that.key != null) 
            return false;
        else if (partition != null ? !partition.equals(that.partition) : that.partition != null) 
            return false;
        else if (topic != null ? !topic.equals(that.topic) : that.topic != null) 
            return false;
        else if (headers != null ? !headers.equals(that.headers) : that.headers != null)
            return false;
        else if (value != null ? !value.equals(that.value) : that.value != null) 
            return false;
        else if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null)
            return false;

        return true;
    }

    @Override
    public int hashCode() {
        int result = topic != null ? topic.hashCode() : 0;
        result = 31 * result + (partition != null ? partition.hashCode() : 0);
        result = 31 * result + (headers != null ? headers.hashCode() : 0);
        result = 31 * result + (key != null ? key.hashCode() : 0);
        result = 31 * result + (value != null ? value.hashCode() : 0);
        result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
        return result;
    }
}

相關推薦

[kafka掃盲]--(5)kafka原始碼閱讀模型物件--(1)ProducerRecord.java

Author:趙志乾 Date:2018-10-21 Declaration:All Right Reserved!!! ProducerRecord.java 該類的例項用於存放生產者向kafka叢集傳送的單條訊息記錄。其內容如下,共包含6個屬性欄位。其中的topic欄

[kafka掃盲]---(7)kafka原始碼閱讀生產者客戶端緩衝池

Author:趙志乾 Date:2018-10-21 Declaration:All Right Reserved!!! BufferPool.java 1、檔案位置: 該檔案在原始碼中的位置:kafka-2.0.0-src/clients/src/main/java/

[kafka掃盲]---(6)kafka原始碼閱讀分割槽器

Author:趙志乾 Date:2018-10-21 Declaration:All Right Reserved!!! DefaultPartitioner.java 該類實現了Partitioner介面,核心方法為partition():用於給未指定分割槽號的訊息記錄

Promise原始碼閱讀建構函式+then過程

前言 Promise是非同步程式設計的一種方案,ES6規範中將其寫入規範標準中,統一了用法。 考慮到瀏覽器的相容性,Vue專案中使用promise,就具體閱讀promise原始碼,看看內部的具體實現。 具體分析 通過具體例項來閱讀promise原始碼的實現,例項如下: new

Netty 原始碼閱讀初始環境搭建

推薦 netty 系列原始碼解析合集 http://www.iocoder.cn/Netty/Netty-collection/?aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3R6c18xMDQxMjE4MTI5L2FydGljbGUvZGV0YWlscy83OD

jdk原始碼閱讀——arraylist

首先看一下他的建構函式: public ArrayList() { this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA; } 其實arraylist還有其他的建構函式,可以指定陣列的長度,這裡先從最基本的入

netty原始碼閱讀效能優化工具類Recycle異執行緒獲取物件

在這篇《netty原始碼閱讀之效能優化工具類之Recycler獲取物件》文章裡面,我們還有一個scavenge()方法沒有解析,也就是在別的執行緒裡面回收物件。下面我們開始介紹,從這個方法開始進入: boolean scavenge() { // con

我的原始碼閱讀路:redux原始碼剖析

前言 用過react的小夥伴對redux其實並不陌生,基本大多數的React應用用到它。一般大家用redux的時候基本都不會單獨去使用它,而是配合react-redux一起去使用。剛學習redux的時候很容易弄混淆redux和react-redux,以為他倆是同一個

Django閱讀模型

模型 模型是您的資料唯一而且準確的資訊來源。它包含您正在儲存的資料的重要欄位和行為。一般來說,每一個模型都對映一個數據庫表。 基礎: 每個模型都是一個 Python 的類,這些類繼承 django.db.models.Model 模型類的每個屬性都相當於一個數據庫的欄位。每個屬性對映

netty原始碼閱讀解碼值基於固定長度解碼器分析

固定長度解碼器FixedLengthFrameDecoder比較簡單,我們看下它類的註釋: /** * A decoder that splits the received {@link ByteBuf}s by the fixed number * of bytes.

netty原始碼閱讀解碼基於長度域解碼器引數分析

這篇文章我們放鬆一點,只分析基於長度域解碼器的幾個引數, lengthFieldOffset :長度域的偏移量,也就是長度域要從什麼地方開始 lengthFieldLength:長度域的長度,也就是長度域佔多少個位元組 lengthAdjustment:長度域的值的調整

netty原始碼閱讀解碼基於長度域解碼器分析

基於長度域解碼器LengthFieldBasedFrameDecoder我們主要分析以下三點: 1、計算需要抽取的資料包的長度 2、跳過位元組邏輯處理 3、丟棄模式下的處理 首先原始碼還是LengthFieldBasedFrameDecoder的decode方法:

netty原始碼閱讀編碼MessageToByteEncoder

MessageToByteEncoder的write過程,我們分析以下幾步: 1、匹配物件 2、分配記憶體 3、編碼實現 4、釋放物件 5、傳播資料 6、釋放記憶體 原始碼在這裡: @Override public void write(Cha

netty原始碼閱讀效能優化工具類FastThreadLocal的使用

先說明FastThreadLocal使用的效果。 1、比jdk原生的ThreadLocal的快 2、不同執行緒之間能保證執行緒安全 這是我們的使用者程式碼: public class FastThreadLocalTest { private static F

netty原始碼閱讀效能優化工具類FastThreadLocal的建立

建立的話我們直接從FastThreadLocal的構造方法進入: public FastThreadLocal() { index = InternalThreadLocalMap.nextVariableIndex(); } 可見他是現

JAVA原始碼閱讀java.util—List

List List被宣告為一個介面,程式碼量很少,只聲明瞭方法。 public interface List<E> extends Collection<E> { int size(); boolean isEmpty(); boo

netty原始碼閱讀效能優化工具類Recycler獲取物件

 Recycler獲取物件主要分為以下幾部分: 1、獲取當前執行緒的Stack 2、從Stack裡面彈出物件 3、如果彈出物件為空,那就建立物件並且繫結到Stack裡面 我們從Recycler的get方法進入,就是這個原始碼: @SuppressWarnin

jdk原始碼閱讀Object類

Object的作用 Object是java所有類的基類,定義了所有類的基礎方法 。這個類所定義的方法也不多,大部分是native方法。 什麼是native方法 native關鍵字標識的java方法為本地方法,底層是有c/c++編寫的程式編譯後dll檔案,jav

JDK原始碼閱讀ArrayList

ArrayList簡介 List 介面的大小可變陣列的實現。實現了所有可選列表操作,並允許包括 null 在內的所有元素。除了實現 List 介面外,此類還提供一些方法來操作內部用來儲存列表的陣列的大小。(此類大致上等同於 Vector 類,除了此類是不同步

JDK原始碼閱讀HashMap

HashMap簡介 基於雜湊表的 Map 介面的實現。此實現提供所有可選的對映操作,並允許使用 null 值和 null 鍵。(除了非同步和允許使用 null 之外,HashMap 類與 Hashtable 大致相同。)此類不保證對映的順序,特別是它不保證該