1. 程式人生 > >使用PHP處理Kafka訊息

使用PHP處理Kafka訊息

Kafka 是一種高吞吐的分散式訊息系統,能夠替代傳統的訊息佇列用於解耦合資料處理,快取未處理訊息等,同時具有更高的吞吐率,支援分割槽、多副本、冗餘,因此被廣泛用於大規模訊息資料處理應用。

Kafka的特點:

  • 以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的訪問效能。
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條以上訊息的傳輸。【據瞭解,Kafka每秒可以生產約25萬訊息(50 MB),每秒處理55萬訊息(110 MB)】
  • 支援Kafka Server間的訊息分割槽,同時保證每個Partition內的訊息順序傳輸。
  • 分散式系統,易於向外擴充套件。所有的producer、broker和consumer都會有多個,均為分散式的。無需停機即可擴充套件機器。
  • 訊息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。
  • 同時支援離線資料處理和實時資料處理。

Kafka的架構:

kafka

Kafka的整體架構非常簡單,producer、broker(kafka)和consumer都可以有多個。Producer,consumer實現Kafka註冊的介面,資料從producer傳送到broker,broker承擔一箇中間快取和分發的作用。broker分發註冊到系統中的consumer。broker的作用類似於快取,即活躍的資料和離線處理系統之間的快取。客戶端和伺服器端的通訊,是基於簡單,高效能,且與程式語言無關的TCP協議。

Kafka基本概念:

  • Topic:特指Kafka處理的訊息源(feeds of messages)的不同分類。
  • Partition:Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列。partition中的每條訊息都會被分配一個有序的id(offset)。
  • Message:訊息,是通訊的基本單位,每個producer可以向一個topic(主題)釋出一些訊息。
  • Producers:訊息和資料生產者,向Kafka的一個topic釋出訊息的過程叫做producers。
  • Consumers:訊息和資料消費者,訂閱topics並處理其釋出的訊息的過程叫做consumers。
  • Broker:快取代理,Kafa叢集中的一臺或多臺伺服器統稱為broker。

Kafka訊息傳送的流程:

Kafka-Message

 

下面是PHP生產、消費Kafka訊息的例子(假設已經配置好Kafka):

1.從zookeeper原始碼src/c/src安裝zookeeper c client

 cd  zookeeper-3.4.8/src/c
   ./configure
   make  &&  make  install

2.編譯php libzookper擴充套件

git clone  https://github.com/Timandes/libzookeeper.git
cd  libzookeeper
phpize
./configure  --with-libzookeeper=/usr/local/bin/cli_mt
make  &&  make  install

3.編譯php zookeeper擴充套件

git clone  https://github.com/andreiz/php-zookeeper.git
cd  php-zookeeper
phpize
./configure
make  &&  make  install

4.修改php.ini配置,新增libzookeeper和php-zookeeper擴充套件

extension=libzookeeper.so
extension=zookeeper.so

PHP處理Kafka訊息

1.啟動zookeeper和kafka

 kafka_2.11-0.10.0.0/bin/zookeeper-server-start.sh  --daemon 
 kafka_2.11-0.10.0.0/config/zookeeper.properties
 kafka_2.11-0.10.0.0/bin/kafka-server-start.sh  kafka_2.11-0.10.0.0/config/server.properties

2.建立由2個partition組成的、名為testtopic的topic

kafka_2.11-0.10.0.0/bin/kafka-topics.sh  --create  --zookeeper localhost:2181  --replication-factor  1  --partitions  2  --topic testtopic

3.composer安裝nmred/kafka-php

composer require  "nmred/kafka-php"

4.producer.php程式碼

<?php
 require_once('./vendor/autoload.php');
 $produce  =  \Kafka\Produce::getInstance('localhost:2181',  3000);
 $produce->setRequireAck(-1);
$topicName  =  'testtopic';
//獲取到topic下可用的partitions
$partitions  =  $produce->getAvailablePartitions($topicName);
$partitionCount  =  count($partitions);
$count  =  1;
while(true){
    $message  =  json_encode(array('uid'  =>  $count,  'age'  =>  $count%100,  'datetime'  =>  date('Y-m-d H:i:s')));

    //傳送訊息到不同的partition

    $partitionId  =  $count%$partitionCount;

    $produce->setMessages('testtopic',  $partitionId,  array($message));

    $result  =  $produce->send();

    var_dump($result);

    $count++;

    echo  "producer sleeping\n";

    sleep(1);
}

5.consumer.php程式碼

<?php

require_once('./vendor/autoload.php');

//獲取需要處理的partitionId

$partitionId  =  isset($argv[1])  ?  intval($argv[1])  :  0;

$consumer  =  \Kafka\Consumer::getInstance('localhost:2181');

$consumer->setGroup('test-consumer-group');

$consumer->setPartition('testtopic',  $partitionId);

$consumer->setFromOffset(true);

$consumer->setMaxBytes(102400);

while(true){

    $topic  =  $consumer->fetch();

foreach  ($topic  as  $topicName  =>  $partition)  {

    foreach  ($partition  as  $partId  =>  $messageSet)  {

        foreach  ($messageSet  as  $message)  {

            var_dump($message);

        }

    }

    }

echo  "consumer sleeping\n";

sleep(1);

}

6.執行php程式碼

在3個終端介面分別執行

php producer.php

php consumer.php  0

php consumer.php  1

7.結果

兩個consumer指令碼依次收到producer傳送的訊息

 

php-kafka-consumer-output

轉自:https://aiddroid.com/kafka-introduction-and-php-kafka-usage/

作者:daos
連結:https://www.jianshu.com/p/b9d06f33f060
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。