演算法 | 排序演算法圖形化比較:快速排序、插入排序、選擇排序、氣泡排序
1. 非同步通訊原理
1.1 觀察者模式
觀察者模式,又交發布-訂閱模式;定義物件間一種一對多的依賴關係,使得當一個物件改變狀態,則所有依賴於它的物件都會得到通知並自動更新
一個物件(目標物件)的狀態發生改變,所有的依賴物件(觀察者物件)都會得到通知
現實中的應用場景: 線上購物的到貨通知,降價通知 ,就是這些使用者訂閱了這些 貨物的狀態變更訊息,一旦狀態變更,會主動把這些訊息推送給這些訂閱的使用者。
1.2 生產者消費者模型
傳統模式:生產者直接將訊息傳遞給指定的消費者,缺點是耦合性特別高,當生產者或者消費者發生變化,都需要重寫業務邏輯。
生產者消費者模式:
通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊
在生產者消費者模式中,可以有n 個執行緒進行生產,同時 m 個執行緒進行消費,兩種角色通過記憶體緩衝區進行通訊,生產者負責向緩衝區裡面新增資料單元,消費者負責從緩衝區裡面取出資料單元, 其快取區一般是一個佇列,遵循 先進先出 的原則,即先到的資料訊息先被消費掉。
優點:
-
-
支援併發。生產者直接呼叫消費者的某個方法過程中函式呼叫是同步的,萬一消費者處理資料很慢,生產者就會白白糟蹋大好時光。
-
支援忙閒不均,削峰填谷。如果生產者生產資料的速度時快時慢,快取區的好處就體現出來了,當生產者產生資料快,消費者來不及處理,未處理的資料可以暫時存放在緩衝區,當生產者的生產資料速度慢下來,消費者再慢慢處理掉生產的資料。
資料單元
-
資料單元必須關聯到某種業務物件,資料單元和業務物件應該處於一對一或者一對多的關係,如果資料單元的顆粒度太小,會增加資料傳輸的次數,顆粒度太大會增加單個數據單元傳輸的時間,影響後期消費。
-
在傳輸過程中,要保證資料單元的完整性
-
各個資料單元之間沒有互相依賴,某個資料單元傳輸失敗不應該影響已經完成傳輸的單元,,也不影響尚未完成傳輸的單元
2. 訊息系統原理
一個訊息系統負責將資料從一個應用傳遞到另外一個應用,應用只需關注於資料,無需關注資料在兩個或多個應用間是如何傳遞的。
2.1 點對點訊息傳遞
在點對點訊息系統中,訊息持久化到一個佇列中。此時,將有一個或多個消費者消費佇列中的資料。但是一條訊息只能被消費一次。當一個消費者消費了佇列中的某條資料之後,該條資料則從訊息佇列中刪除。該模式即使有多個消費者同時消費資料,也能保證資料處理的順序。基於推送模型的訊息系統,由訊息代理記錄消費狀態
訊息代理將訊息推送(push)到消費者後,標記這條訊息為已經被消費,但是這種方式無法很好
地保證消費的處理語義。
2.2 釋出訂閱訊息傳遞
在釋出-訂閱訊息系統中,訊息的生產者稱為釋出者,消費者稱為訂閱者。
訊息被持久化到一個topic中,消費者可以訂閱一個或多個topic,消費者可以消費該topic中所有的資料,同一條資料可以被多個消費者消費,資料被消費後不會立馬刪除。
Kafka 採取拉取模型(Poll),由自己控制消費速度,以及消費的進度,消費者可以按照任意的偏移量
進行消費。
主流的訊息中介軟體有:
ActiveMQ
RocketMQ
RabbitMQ
KAFKA
Redis 也可以做為訊息中介軟體,但是Redis 在資料量超過10k 之後效能很慢,並且基於記憶體的資料儲存有可能丟失資料
3. kafka 簡介
Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。
Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者在網站中的所有動作流資料。
3.1 設計目標
-
以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間的訪問效能。
-
高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條訊息的傳輸。
-
支援Kafka Server間的訊息分割槽,及分散式消費,同時保證每個partition內的訊息順序傳輸。
-
同時支援離線資料處理和實時資料處理。
-
支援線上水平擴充套件
3.2 Kafka 的優點
-
解耦。
-
冗餘。
-
擴充套件性
-
靈活性&峰值處理能力
-
可恢復性
-
順序保證
-
緩衝
-
非同步通訊
4.Kafka 系統架構
4.1. Broker
Kafka 叢集包含一個或多個伺服器,伺服器節點稱為broker。
4.2. Topic
每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。類似於資料庫的表名 或者ES的 Index
物理上不同 Topic 的訊息分開儲存,邏輯上一個Topic 的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的 Topic 即可生產或消費資料而不必關心資料存於何處)
建立流程:
-
controller在ZooKeeper的/brokers/topics節點上註冊watcher,當topic被建立,則 controller會通過watch得到該topic的partition/replica分配。
-
controller從/brokers/ids讀取當前所有可用的broker列表,對於set_p中的每一個 partition:
2.1 從分配給該partition的所有replica(稱為AR)中任選一個可用的broker作為新的 leader,並將AR設定為新的ISR
2.2 將新的leader和ISR寫 入/brokers/topics/[topic]/partitions/[partition]/state
-
controller通過RPC向相關的broker傳送LeaderAndISRRequest。
刪除流程:
-
controller在zooKeeper的/brokers/topics節點上註冊watcher,當topic被刪除,則 controller會通過watch得到該topic的partition/replica分配。
-
若delete.topic.enable=false,結束;否則controller註冊在/admin/delete_topics上 的watch被fire controller 通過回撥向對應的broker傳送StopReplicaRequest。
4.3 Partition
-
一個topic 中的資料被分割為一個或者多個partition , 每個topic 至少有一個partition.
-
當生產者生產資料的時候,根據分配策略,選擇分割槽,然後將訊息追加到指定的分割槽的末尾,即佇列的末尾
3. Partition 資料路由規則: 如果指定了partition ,則直接使用,沒有指定partition ,但指定了key ,通過對key 的value 進行Hash, 選出一個partition, 如果partition 和key 都沒有指定,使用輪詢選出一個partition
-
每個訊息都會有一個自增的編號,offset , 用來標識順序和標識訊息的偏移量
-
每個partition 中的資料使用多個 segment 檔案進行儲存
-
partition 中的資料是有序的,不同partition 之間的資料丟失了資料的順序。
-
如果topic 中有多個 partition ,消費資料時不能保證資料的順序,嚴格保證訊息的消費順序的場景下,需要將partition 的數目 設定為 1 。
PS :
這點我們在實際的專案中,在 十分鐘損失電量計算過程中,由於同一個測點可能多發,我們選用後到的為最終的值進行計算,如果上游 topic 中設定的 partition 數目不為1 的話,可能在消費的時候,先到的錯誤資料反而後消費,這就導致錯誤的資料覆蓋正確的資料。這單需要驗證,驗證 partion 的資料,以及 key 是什麼 ?
4.4 Leader
-
每個 partition 有多個副本,並且有且僅有一個作為 Leader, Leader 是當前負責資料的讀寫的partition
-
操作順序:
2.1 producer 先從zookeeper 的 state 節點找到該 partition 的 Leader
2.2 producer 將訊息傳送給該 Leader
2.3 Leader 將訊息寫入本地 log
2.4 followers 從 Leader pull 訊息,寫入本地 log 後向 Leader 傳送ACK ,確定同步完成
2.5 Leader 收到所有ISR中的 replica 傳送的ACK 訊息後,確定所有的從節點同步完成之後,增加HW (high watermark ,最後commit 的 offset ) ,並向 producer 傳送ACK
4.5 Follower
-
Follower 跟隨 Leader ,所有寫請求都通過Leader 路由,資料變更會廣播給所有的Follower, Follower 與Leader 保持資料一致
-
如果Leader 失效,則從Follower 中選舉一個新的Leader
-
當Follower 掛掉、卡主或者同步太慢,Leader 會把這個Follower 從 ISR 列表中刪除,重新建立一個Follower
4.6 Replication
-
資料會存放到 topic 的partition 中,但是有可能分割槽會損壞,因此我們需要對分割槽的資料進行備份,備份多少份取決資料的重要程度
-
將分割槽分為Leader(1) 和Follower (n) ,Leader 負責寫入和讀取資料,Follower 只負責備份,保證了資料的一致性
-
備份數 設定 為 n ,則表示 Leader + Follower = n 個,主資料加備份資料一共 n 份
-
Kafka 分配Replica的演算法如下: 順序取餘
4.1 將所有 broker (假設共 n 個 broker ) 和待分配的 partition 順序
4.2 將第 i 個 partition 分配到 第 ( i % n ) 個 broker 上
4.3 將第 i 個 partition 的第 j 個 replica 分配到 第 (i + j )% n 個 broker 上。
4.7 Producer
-
生產者即資料的釋出者,該角色將訊息釋出到kafka 的 topic 中
-
broker 接收到生產者傳送的訊息後, broker 將該訊息追加到當前用於追加資料的 segment 檔案中
-
生產者傳送的訊息,儲存到一個partioton 中,生產者也可以指定資料儲存的partition
4.8 consumer
-
消費者可以從 broker 中讀取資料,消費者可以消費多個 topic 中的資料
-
kafka 提供的兩套 consumer API :
2.1 The hige-level Consumer API
2.2 The simpleConsumer API
-
High-level Consumer API 提供了一個從kafka 消費資料的高層抽象,而 SimpleConsumer API 則需要開發人員更多的關注細節
4.9 Consumer Group
-
每個Consumer 屬於一個特定的Consumer Group ,可為每個Consumer 指定Group name ,若不指定則屬於預設的group
-
將多個消費者集中到一起去處理某個 topic 的資料,可以更快的提高資料的消費能力
-
整個消費者共享一組偏移量,防止資料被重複讀取消費
4.10 offset 偏移量
-
可以唯一的標識一條訊息
-
偏移量決定讀取資料的位置,不會有執行緒安全的問題,消費者可以通過偏移量來決定下次讀取的訊息
-
訊息被消費之後,並不一定馬上刪除,這樣多個業務就可以重複使用kafka的訊息
-
我們某一個業務也可以通過修改偏移量達到重新讀取訊息的目的,偏移量由使用者控制
-
訊息最終還是會被刪除的,預設生命週期 為 7 天
4.11 Zookeeper
kafka 通過 zookeeper 來儲存叢集的 meta 資訊
5. Kafka 資料檢索機制
為什麼kafka 的效能如此高,是 這個要重點看看,做兩頁PPT ,對比redis 的B+ 樹結構
-
topic在物理層面以partition為分組,一個topic可以分成若干個partition
-
partition還可以細分為Segment,一個partition物理上由多個Segment組成 , segment 的引數有兩個:
log.segment.bytes:單個segment可容納的最大資料量,預設為1GB
log.segment.ms:Kafka在commit一個未寫滿的segment前,所等待的時間(預設為7天)
-
LogSegment 檔案由兩部分組成,分別為“.index”檔案和“.log”檔案,分別表示為 Segment 索引文
件和資料檔案。
3.1 partition全域性的第一個segment從0開始,後續每個segment檔名為上一個segment檔案最後一條訊息的offset值
3.2 數值大小為64位,20位數字字元長度,沒有數字用0填充
-
訊息都具有固定的物理結構,包括:offset(8 Bytes)、訊息體的大小(4 Bytes)、crc32(4 Bytes)、
magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等
欄位,可以確定一條訊息的大小,即讀取到哪裡截止。
PS:
有一個引數 anto.commit.enable 預設為 true ,消費之後自動更新 offset ,增加偏移量,將這個改為 false ,然後再消費使用完之後,再去手動提交這個 offset ,這樣就不會因為服務掛掉,而導致資料丟失。結合我們實際的服務,訂閱到測點的資料之後,要寫入到clikhouse 之後,才預設為這條資料 消費處理完了,才更新這個offset ,這樣才能最大限度的保障,我們的服務不會因為故障而導致資料丟失,也不會有重複消費,基於此分散式架構,最大限度的提升可靠性和穩定性
6. 資料的安全性
6.1 Producer delivery guarantee
Producers可以選擇是否為資料的寫入接收ack,有以下3種ack的配置選項:
request.required.acks :
0:Producer 只需要把訊息發出去就認為成功了,效能高,安全性低,傳輸最快,但可能會丟資料
1:主節點已經接收到資訊,效能一般,安全性一般
-1:Producer 需要等待 ISR 中的所有 Follower 都確認接收到資料後才算一次傳送完成,可
靠性最高,但是效能較慢。效能最差,可能需要重複傳輸多次
6.2 ISR 機制
ISR : In Sync Replicas 加入同步佇列的副本
ISR = Leader + 沒有落後太多的副本
當主節點掛掉之後,會從ISR 中挑選一個新的節點作為新的主節點
判斷的標準:
-
超過10s沒有同步資料 replica.lag.time.max.ms=10000
-
主副節點相差 4000條資料 rerplica.lag.max.messages=4000
髒節點選舉
kafka採用一種降級措施來處理:選舉第一個恢復的node作為leader提供服務,以它的資料為基準,這個措施被稱為髒 leader選舉; 如果在 follower 還沒來得及同步資料的時候,主節點掛了,並且 生產者不會重發資料,這個時候就有可能導致資料丟失。
6.3 Broker 資料儲存機制
無論訊息是否被消費,kafka 都會保留所有訊息
有兩種策略可以刪除舊資料
-
基於時間 ,儲存時間超過一定時限刪除 ,預設 log.retention.hours=168 168 小時,7天
-
基於大小,資料容量超過一定大小刪除 ,預設 log.retention.bytes=1073741824 , 10G
6.4 重複消費和資料的丟失
有可能一個消費者取出了一條資料(offset=88),但是還沒有處理完成,但是消費者被關閉了
如果下次還能從88重新處理就屬於完美情況
如果下次資料從86開始,就屬於資料的重複消費
如果下次資料從89開始,就是與資料的丟失
消費者自動提交偏移量的時間間隔props.put("auto.commit.interval.ms", "1010");
提交間隔 > 單條執行時間 (重複)
提交間隔 < 單條執行時間 (丟失)