1. 程式人生 > 實用技巧 >ST表求LCA整理

ST表求LCA整理

一、簡介

1.1 概述

Kafka是最初由Linkedin公司開發,是一個分散式、分割槽的、多副本的、多訂閱者,基於zookeeper協調的分散式日誌系統(也可以當做MQ系統),常見可以用於web/nginx日誌、訪問日誌,訊息服務等等,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源專案。

主要應用場景是:日誌收集系統和訊息系統。

Kafka主要設計目標如下:

  • 以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間的訪問效能。
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條訊息的傳輸。
  • 支援Kafka Server間的訊息分割槽,及分散式消費,同時保證每個partition內的訊息順序傳輸。
  • 同時支援離線資料處理和實時資料處理。
  • Scale out:支援線上水平擴充套件

1.2 訊息系統介紹

一個訊息系統負責將資料從一個應用傳遞到另外一個應用,應用只需關注於資料,無需關注資料在兩個或多個應用間是如何傳遞的。分散式訊息傳遞基於可靠的訊息佇列,在客戶端應用和訊息系統之間非同步傳遞訊息。有兩種主要的訊息傳遞模式:點對點傳遞模式、釋出-訂閱模式。大部分的訊息系統選用釋出-訂閱模式。Kafka就是一種釋出-訂閱模式

1.3 點對點訊息傳遞模式

在點對點訊息系統中,訊息持久化到一個佇列中。此時,將有一個或多個消費者消費佇列中的資料。但是一條訊息只能被消費一次。當一個消費者消費了佇列中的某條資料之後,該條資料則從訊息佇列中刪除。該模式即使有多個消費者同時消費資料,也能保證資料處理的順序。這種架構描述示意圖如下:

生產者傳送一條訊息到queue,只有一個消費者能收到

1.4 釋出-訂閱訊息傳遞模式

在釋出-訂閱訊息系統中,訊息被持久化到一個topic中。與點對點訊息系統不同的是,消費者可以訂閱一個或多個topic,消費者可以消費該topic中所有的資料,同一條資料可以被多個消費者消費,資料被消費後不會立馬刪除。在釋出-訂閱訊息系統中,訊息的生產者稱為釋出者,消費者稱為訂閱者。該模式的示例圖如下:

釋出者傳送到topic的訊息,只有訂閱了topic的訂閱者才會收到訊息

回到頂部

二、Kafka的優點

2.1 解耦

在專案啟動之初來預測將來專案會碰到什麼需求,是極其困難的。訊息系統在處理過程中間插入了一個隱含的、基於資料的介面層,兩邊的處理過程都要實現這一介面。這允許你獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。

2.2 冗餘(副本)

有些情況下,處理資料的過程會失敗。除非資料被持久化,否則將造成丟失。訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險。許多訊息佇列所採用的"插入-獲取-刪除"正規化中,在把一個訊息從佇列中刪除之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的儲存直到你使用完畢。

2.3 擴充套件性

因為訊息佇列解耦了你的處理過程,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變程式碼、不需要調節引數。擴充套件就像調大電力按鈕一樣簡單。

2.4 靈活性&峰值處理能力

在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

2.5 可恢復性

系統的一部分元件失效時,不會影響到整個系統。訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。

2.6 順序保證

在大多使用場景下,資料處理的順序都很重要。大部分訊息佇列本來就是排序的,並且能保證資料會按照特定的順序來處理。Kafka保證一個Partition內的訊息的有序性。

2.7 緩衝

在任何重要的系統中,都會有需要不同的處理時間的元素。例如,載入一張圖片比應用過濾器花費更少的時間。訊息佇列通過一個緩衝層來幫助任務最高效率的執行———寫入佇列的處理會盡可能的快速。該緩衝有助於控制和優化資料流經過系統的速度。

2.8 非同步通訊

很多時候,使用者不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的時候再去處理它們。

回到頂部

三、常用Message Queue對比

3.1 RabbitMQ

RabbitMQ是使用Erlang編寫的一個開源的訊息佇列,本身支援很多的協議:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量級,更適合於企業級的開發。同時實現了Broker構架,這意味著訊息在傳送給客戶端時先在中心佇列排隊。對路由,負載均衡或者資料持久化都有很好的支援。

3.2 Redis

Redis是一個基於Key-Value對的NoSQL資料庫,開發維護很活躍。雖然它是一個Key-Value資料庫儲存系統,但它本身支援MQ功能,所以完全可以當做一個輕量級的佇列服務來使用。對於RabbitMQ和Redis的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行時間。測試資料分為128Bytes、512Bytes、1K和10K四個不同大小的資料。實驗表明:入隊時,當資料比較小時Redis的效能要高於RabbitMQ,而如果資料大小超過了10K,Redis則慢的無法忍受;出隊時,無論資料大小,Redis都表現出非常好的效能,而RabbitMQ的出隊效能則遠低於Redis。

3.3 ZeroMQ

ZeroMQ號稱最快的訊息佇列系統,尤其針對大吞吐量的需求場景。ZeroMQ能夠實現RabbitMQ不擅長的高階/複雜的佇列,但是開發人員需要自己組合多種技術框架,技術上的複雜度是對這MQ能夠應用成功的挑戰。ZeroMQ具有一個獨特的非中介軟體的模式,你不需要安裝和執行一個訊息伺服器或中介軟體,因為你的應用程式將扮演這個伺服器角色。你只需要簡單的引用ZeroMQ程式庫,可以使用NuGet安裝,然後你就可以愉快的在應用程式之間傳送訊息了。但是ZeroMQ僅提供非永續性的佇列,也就是說如果宕機,資料將會丟失。其中,Twitter的Storm 0.9.0以前的版本中預設使用ZeroMQ作為資料流的傳輸(Storm從0.9版本開始同時支援ZeroMQ和Netty作為傳輸模組)。

3.4 ActiveMQ

ActiveMQ是Apache下的一個子專案。 類似於ZeroMQ,它能夠以代理人和點對點的技術實現佇列。同時類似於RabbitMQ,它少量程式碼就可以高效地實現高階應用場景。

3.5 Kafka/Jafka

Kafka是Apache下的一個子專案,是一個高效能跨語言分散式釋出/訂閱訊息佇列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行訊息持久化;高吞吐,在一臺普通的伺服器上既可以達到10W/s的吞吐速率;完全的分散式系統,Broker、Producer、Consumer都原生自動支援分散式,自動實現負載均衡;支援Hadoop資料並行載入,對於像Hadoop的一樣的日誌資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過Hadoop的並行載入機制統一了線上和離線的訊息處理。Apache Kafka相對於ActiveMQ是一個非常輕量級的訊息系統,除了效能非常好之外,還是一個工作良好的分散式系統。

回到頂部

四、Kafka中的術語解釋

4.1 概述

在深入理解Kafka之前,先介紹一下Kafka中的術語。下圖展示了Kafka的相關術語以及之間的關係:

上圖中一個topic配置了3個partition。Partition1有兩個offset:0和1。Partition2有4個offset。Partition3有1個offset。副本的id和副本所在的機器的id恰好相同。

如果一個topic的副本數為3,那麼Kafka將在叢集中為每個partition建立3個相同的副本。叢集中的每個broker儲存一個或多個partition。多個producer和consumer可同時生產和消費資料。

4.2 broker

Kafka 叢集包含一個或多個伺服器,伺服器節點稱為broker。

broker儲存topic的資料。如果某topic有N個partition,叢集有N個broker,那麼每個broker儲存該topic的一個partition。

如果某topic有N個partition,叢集有(N+M)個broker,那麼其中有N個broker儲存該topic的一個partition,剩下的M個broker不儲存該topic的partition資料。

如果某topic有N個partition,叢集中broker數目少於N個,那麼一個broker儲存該topic的一個或多個partition。在實際生產環境中,儘量避免這種情況的發生,這種情況容易導致Kafka叢集資料不均衡。

4.3 Topic

每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)

類似於資料庫的表名

4.3 Partition

topic中的資料分割為一個或多個partition。每個topic至少有一個partition。每個partition中的資料使用多個segment檔案儲存。partition中的資料是有序的,不同partition間的資料丟失了資料的順序。如果topic有多個partition,消費資料時就不能保證資料的順序。在需要嚴格保證訊息的消費順序的場景下,需要將partition數目設為1。

4.4 Producer

生產者即資料的釋出者,該角色將訊息釋出到Kafka的topic中。broker接收到生產者傳送的訊息後,broker將該訊息追加到當前用於追加資料的segment檔案中。生產者傳送的訊息,儲存到一個partition中,生產者也可以指定資料儲存的partition。

4.5 Consumer

消費者可以從broker中讀取資料。消費者可以消費多個topic中的資料。

4.6 Consumer Group

每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)。

4.7 Leader

每個partition有多個副本,其中有且僅有一個作為Leader,Leader是當前負責資料的讀寫的partition。

4.8 Follower

Follower跟隨Leader,所有寫請求都通過Leader路由,資料變更會廣播給所有Follower,Follower與Leader保持資料同步。如果Leader失效,則從Follower中選舉出一個新的Leader。當Follower與Leader掛掉、卡住或者同步太慢,leader會把這個follower從“in sync replicas”(ISR)列表中刪除,重新建立一個Follower。

下面開始整合:

1.引入依賴

        <!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2.新增配置

  kafka:
    bootstrap-servers: centos_1318:19090,centos_1318:19091,centos_1318:19092 # 指定kafka 代理地址,可以多個
    producer: # 生產者
      retries: 1 # 設定大於0的值,則客戶端會將傳送失敗的記錄重新發送
      # 每次批量傳送訊息的數量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定訊息key和訊息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: realTimeRefund
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.生產者

/**
 * 生產者
 */
@RestController
public class Producer {

    @Resource
    KafkaTemplate<Field.Str, Object> kafkaTemplate;

    @GetMapping("/message/send")
    public boolean send() {
//        Map map=new HashMap<>();
//        map.put("name","John Doe");
//        map.put("age",18);
//        map.put("address","西安");
        String message = "6101130001|181,連線,空閒";
        System.out.println("傳送的資料:" + message);
        kafkaTemplate.send("GunStatusNotify", message); return true; } }

2.消費者

    /**
     * 監聽裝置狀態變化
     *
     * @param message
     */
    @KafkaListener(topics = "GunStatusNotify")
    public void deviceStatus(String message) {
        deviceStatusUtils.deviceStatus(message);
    }