1. 程式人生 > 其它 >演算法 | 排序演算法圖形化比較:快速排序、插入排序、選擇排序、氣泡排序

演算法 | 排序演算法圖形化比較:快速排序、插入排序、選擇排序、氣泡排序

Kafka

1. 非同步通訊原理

1.1 觀察者模式

觀察者模式,又交發布-訂閱模式;定義物件間一種一對多的依賴關係,使得當一個物件改變狀態,則所有依賴於它的物件都會得到通知並自動更新

一個物件(目標物件)的狀態發生改變,所有的依賴物件(觀察者物件)都會得到通知

現實中的應用場景: 線上購物的到貨通知,降價通知 ,就是這些使用者訂閱了這些 貨物的狀態變更訊息,一旦狀態變更,會主動把這些訊息推送給這些訂閱的使用者。

1.2 生產者消費者模型

傳統模式:生產者直接將訊息傳遞給指定的消費者,缺點是耦合性特別高,當生產者或者消費者發生變化,都需要重寫業務邏輯。

生產者消費者模式:

通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊

在生產者消費者模式中,可以有n 個執行緒進行生產,同時 m 個執行緒進行消費,兩種角色通過記憶體緩衝區進行通訊,生產者負責向緩衝區裡面新增資料單元,消費者負責從緩衝區裡面取出資料單元, 其快取區一般是一個佇列,遵循 先進先出 的原則,即先到的資料訊息先被消費掉。

優點:

  1. 生產者和消費者解耦,如果讓生產者和消費者直接呼叫對方的某個方法,那麼生產者和消費者之間必然形成依賴。

  2. 支援併發。生產者直接呼叫消費者的某個方法過程中函式呼叫是同步的,萬一消費者處理資料很慢,生產者就會白白糟蹋大好時光。

  3. 支援忙閒不均,削峰填谷。如果生產者生產資料的速度時快時慢,快取區的好處就體現出來了,當生產者產生資料快,消費者來不及處理,未處理的資料可以暫時存放在緩衝區,當生產者的生產資料速度慢下來,消費者再慢慢處理掉生產的資料。

資料單元

  1. 資料單元必須關聯到某種業務物件,資料單元和業務物件應該處於一對一或者一對多的關係,如果資料單元的顆粒度太小,會增加資料傳輸的次數,顆粒度太大會增加單個數據單元傳輸的時間,影響後期消費。

  2. 在傳輸過程中,要保證資料單元的完整性

  3. 各個資料單元之間沒有互相依賴,某個資料單元傳輸失敗不應該影響已經完成傳輸的單元,,也不影響尚未完成傳輸的單元

2. 訊息系統原理

一個訊息系統負責將資料從一個應用傳遞到另外一個應用,應用只需關注於資料,無需關注資料在兩個或多個應用間是如何傳遞的。

2.1 點對點訊息傳遞

在點對點訊息系統中,訊息持久化到一個佇列中。此時,將有一個或多個消費者消費佇列中的資料。但是一條訊息只能被消費一次。當一個消費者消費了佇列中的某條資料之後,該條資料則從訊息佇列中刪除。該模式即使有多個消費者同時消費資料,也能保證資料處理的順序。基於推送模型的訊息系統,由訊息代理記錄消費狀態

訊息代理將訊息推送(push)到消費者後,標記這條訊息為已經被消費,但是這種方式無法很好

地保證消費的處理語義。

2.2 釋出訂閱訊息傳遞

在釋出-訂閱訊息系統中,訊息的生產者稱為釋出者,消費者稱為訂閱者。

訊息被持久化到一個topic中,消費者可以訂閱一個或多個topic,消費者可以消費該topic中所有的資料,同一條資料可以被多個消費者消費,資料被消費後不會立馬刪除。

Kafka 採取拉取模型(Poll),由自己控制消費速度,以及消費的進度,消費者可以按照任意的偏移量

進行消費。

主流的訊息中介軟體有:

ActiveMQ

RocketMQ

RabbitMQ

KAFKA

Redis 也可以做為訊息中介軟體,但是Redis 在資料量超過10k 之後效能很慢,並且基於記憶體的資料儲存有可能丟失資料

3. kafka 簡介

Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。

Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者在網站中的所有動作流資料。

3.1 設計目標

  1. 以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間的訪問效能。

  2. 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條訊息的傳輸。

  3. 支援Kafka Server間的訊息分割槽,及分散式消費,同時保證每個partition內的訊息順序傳輸。

  4. 同時支援離線資料處理和實時資料處理。

  5. 支援線上水平擴充套件

3.2 Kafka 的優點

  1. 解耦。

  2. 冗餘。

  1. 擴充套件性

  1. 靈活性&峰值處理能力

  1. 可恢復性

  1. 順序保證

  2. 緩衝

  1. 非同步通訊

4.Kafka 系統架構

4.1. Broker

Kafka 叢集包含一個或多個伺服器,伺服器節點稱為broker。

4.2. Topic

每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。類似於資料庫的表名 或者ES的 Index

物理上不同 Topic 的訊息分開儲存,邏輯上一個Topic 的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的 Topic 即可生產或消費資料而不必關心資料存於何處)

建立流程:

  1. controller在ZooKeeper的/brokers/topics節點上註冊watcher,當topic被建立,則 controller會通過watch得到該topic的partition/replica分配。

  2. controller從/brokers/ids讀取當前所有可用的broker列表,對於set_p中的每一個 partition:

2.1 從分配給該partition的所有replica(稱為AR)中任選一個可用的broker作為新的 leader,並將AR設定為新的ISR

2.2 將新的leader和ISR寫 入/brokers/topics/[topic]/partitions/[partition]/state

  1. controller通過RPC向相關的broker傳送LeaderAndISRRequest。

刪除流程:

  1. controller在zooKeeper的/brokers/topics節點上註冊watcher,當topic被刪除,則 controller會通過watch得到該topic的partition/replica分配。

  2. 若delete.topic.enable=false,結束;否則controller註冊在/admin/delete_topics上 的watch被fire controller 通過回撥向對應的broker傳送StopReplicaRequest。

4.3 Partition

  1. 一個topic 中的資料被分割為一個或者多個partition , 每個topic 至少有一個partition.

  2. 當生產者生產資料的時候,根據分配策略,選擇分割槽,然後將訊息追加到指定的分割槽的末尾,即佇列的末尾

    3. Partition 資料路由規則: 如果指定了partition ,則直接使用,沒有指定partition ,但指定了key ,通過對key 的value 進行Hash, 選出一個partition, 如果partition  和key 都沒有指定,使用輪詢選出一個partition 

  1. 每個訊息都會有一個自增的編號,offset , 用來標識順序和標識訊息的偏移量

  2. 每個partition 中的資料使用多個 segment 檔案進行儲存

  3. partition 中的資料是有序的,不同partition 之間的資料丟失了資料的順序。

  4. 如果topic 中有多個 partition ,消費資料時不能保證資料的順序,嚴格保證訊息的消費順序的場景下,需要將partition 的數目 設定為 1 。

PS :

這點我們在實際的專案中,在 十分鐘損失電量計算過程中,由於同一個測點可能多發,我們選用後到的為最終的值進行計算,如果上游 topic 中設定的 partition 數目不為1 的話,可能在消費的時候,先到的錯誤資料反而後消費,這就導致錯誤的資料覆蓋正確的資料。這單需要驗證,驗證 partion 的資料,以及 key 是什麼 ?

 

4.4 Leader

  1. 每個 partition 有多個副本,並且有且僅有一個作為 Leader, Leader 是當前負責資料的讀寫的partition

  2. 操作順序:

2.1 producer 先從zookeeper 的 state 節點找到該 partition 的 Leader

2.2 producer 將訊息傳送給該 Leader

2.3 Leader 將訊息寫入本地 log

2.4 followers 從 Leader pull 訊息,寫入本地 log 後向 Leader 傳送ACK ,確定同步完成

2.5 Leader 收到所有ISR中的 replica 傳送的ACK 訊息後,確定所有的從節點同步完成之後,增加HW (high watermark ,最後commit 的 offset ) ,並向 producer 傳送ACK

 

4.5 Follower

  1. Follower 跟隨 Leader ,所有寫請求都通過Leader 路由,資料變更會廣播給所有的Follower, Follower 與Leader 保持資料一致

  2. 如果Leader 失效,則從Follower 中選舉一個新的Leader

  3. 當Follower 掛掉、卡主或者同步太慢,Leader 會把這個Follower 從 ISR 列表中刪除,重新建立一個Follower

 

4.6 Replication

  1. 資料會存放到 topic 的partition 中,但是有可能分割槽會損壞,因此我們需要對分割槽的資料進行備份,備份多少份取決資料的重要程度

  2. 將分割槽分為Leader(1) 和Follower (n) ,Leader 負責寫入和讀取資料,Follower 只負責備份,保證了資料的一致性

  3. 備份數 設定 為 n ,則表示 Leader + Follower = n 個,主資料加備份資料一共 n 份

  4. Kafka 分配Replica的演算法如下: 順序取餘

4.1 將所有 broker (假設共 n 個 broker ) 和待分配的 partition 順序

4.2 將第 i 個 partition 分配到 第 ( i % n ) 個 broker 上

4.3 將第 i 個 partition 的第 j 個 replica 分配到 第 (i + j )% n 個 broker 上。

4.7 Producer

  1. 生產者即資料的釋出者,該角色將訊息釋出到kafka 的 topic 中

  2. broker 接收到生產者傳送的訊息後, broker 將該訊息追加到當前用於追加資料的 segment 檔案中

  3. 生產者傳送的訊息,儲存到一個partioton 中,生產者也可以指定資料儲存的partition

4.8 consumer

  1. 消費者可以從 broker 中讀取資料,消費者可以消費多個 topic 中的資料

  2. kafka 提供的兩套 consumer API :

2.1 The hige-level Consumer API

2.2 The simpleConsumer API

  1. High-level Consumer API 提供了一個從kafka 消費資料的高層抽象,而 SimpleConsumer API 則需要開發人員更多的關注細節

4.9 Consumer Group

  1. 每個Consumer 屬於一個特定的Consumer Group ,可為每個Consumer 指定Group name ,若不指定則屬於預設的group

  2. 將多個消費者集中到一起去處理某個 topic 的資料,可以更快的提高資料的消費能力

  3. 整個消費者共享一組偏移量,防止資料被重複讀取消費

4.10 offset 偏移量

  1. 可以唯一的標識一條訊息

  2. 偏移量決定讀取資料的位置,不會有執行緒安全的問題,消費者可以通過偏移量來決定下次讀取的訊息

  3. 訊息被消費之後,並不一定馬上刪除,這樣多個業務就可以重複使用kafka的訊息

  4. 我們某一個業務也可以通過修改偏移量達到重新讀取訊息的目的,偏移量由使用者控制

  5. 訊息最終還是會被刪除的,預設生命週期 為 7 天

 

4.11 Zookeeper

kafka 通過 zookeeper 來儲存叢集的 meta 資訊

5. Kafka 資料檢索機制

為什麼kafka 的效能如此高,是 這個要重點看看,做兩頁PPT ,對比redis 的B+ 樹結構

  1. topic在物理層面以partition為分組,一個topic可以分成若干個partition

  2. partition還可以細分為Segment,一個partition物理上由多個Segment組成 , segment 的引數有兩個:

    log.segment.bytes:單個segment可容納的最大資料量,預設為1GB

    log.segment.ms:Kafka在commit一個未寫滿的segment前,所等待的時間(預設為7天)

  3. LogSegment 檔案由兩部分組成,分別為“.index”檔案和“.log”檔案,分別表示為 Segment 索引文

    件和資料檔案。

    3.1 partition全域性的第一個segment從0開始,後續每個segment檔名為上一個segment檔案最後一條訊息的offset值

    3.2 數值大小為64位,20位數字字元長度,沒有數字用0填充

  1. 訊息都具有固定的物理結構,包括:offset(8 Bytes)、訊息體的大小(4 Bytes)、crc32(4 Bytes)、

    magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等

    欄位,可以確定一條訊息的大小,即讀取到哪裡截止。

 

 

 

PS:

有一個引數 anto.commit.enable 預設為 true ,消費之後自動更新 offset ,增加偏移量,將這個改為 false ,然後再消費使用完之後,再去手動提交這個 offset ,這樣就不會因為服務掛掉,而導致資料丟失。結合我們實際的服務,訂閱到測點的資料之後,要寫入到clikhouse 之後,才預設為這條資料 消費處理完了,才更新這個offset ,這樣才能最大限度的保障,我們的服務不會因為故障而導致資料丟失,也不會有重複消費,基於此分散式架構,最大限度的提升可靠性和穩定性

 

 

6. 資料的安全性

6.1 Producer delivery guarantee

Producers可以選擇是否為資料的寫入接收ack,有以下3種ack的配置選項:

request.required.acks :

0:Producer 只需要把訊息發出去就認為成功了,效能高,安全性低,傳輸最快,但可能會丟資料

1:主節點已經接收到資訊,效能一般,安全性一般

-1:Producer 需要等待 ISR 中的所有 Follower 都確認接收到資料後才算一次傳送完成,可

靠性最高,但是效能較慢。效能最差,可能需要重複傳輸多次

 

6.2 ISR 機制

ISR : In Sync Replicas 加入同步佇列的副本

ISR = Leader + 沒有落後太多的副本

當主節點掛掉之後,會從ISR 中挑選一個新的節點作為新的主節點

判斷的標準:

  1. 超過10s沒有同步資料 replica.lag.time.max.ms=10000

  2. 主副節點相差 4000條資料 rerplica.lag.max.messages=4000

髒節點選舉

kafka採用一種降級措施來處理:選舉第一個恢復的node作為leader提供服務,以它的資料為基準,這個措施被稱為髒 leader選舉; 如果在 follower 還沒來得及同步資料的時候,主節點掛了,並且 生產者不會重發資料,這個時候就有可能導致資料丟失。

6.3 Broker 資料儲存機制

無論訊息是否被消費,kafka 都會保留所有訊息

有兩種策略可以刪除舊資料

  1. 基於時間 ,儲存時間超過一定時限刪除 ,預設 log.retention.hours=168 168 小時,7天

  2. 基於大小,資料容量超過一定大小刪除 ,預設 log.retention.bytes=1073741824 , 10G

6.4 重複消費和資料的丟失

有可能一個消費者取出了一條資料(offset=88),但是還沒有處理完成,但是消費者被關閉了

如果下次還能從88重新處理就屬於完美情況

如果下次資料從86開始,就屬於資料的重複消費

如果下次資料從89開始,就是與資料的丟失

 

消費者自動提交偏移量的時間間隔props.put("auto.commit.interval.ms", "1010");

提交間隔 > 單條執行時間 (重複)

提交間隔 < 單條執行時間 (丟失)