使用PHP處理Kafka訊息
Kafka 是一種高吞吐的分散式訊息系統,能夠替代傳統的訊息佇列用於解耦合資料處理,快取未處理訊息等,同時具有更高的吞吐率,支援分割槽、多副本、冗餘,因此被廣泛用於大規模訊息資料處理應用。
Kafka的特點:
- 以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的訪問效能。
- 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條以上訊息的傳輸。【據瞭解,Kafka每秒可以生產約25萬訊息(50 MB),每秒處理55萬訊息(110 MB)】
- 支援Kafka Server間的訊息分割槽,同時保證每個Partition內的訊息順序傳輸。
- 分散式系統,易於向外擴充套件。所有的producer、broker和consumer都會有多個,均為分散式的。無需停機即可擴充套件機器。
- 訊息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。
- 同時支援離線資料處理和實時資料處理。
Kafka的架構:
kafka
Kafka的整體架構非常簡單,producer、broker(kafka)和consumer都可以有多個。Producer,consumer實現Kafka註冊的介面,資料從producer傳送到broker,broker承擔一箇中間快取和分發的作用。broker分發註冊到系統中的consumer。broker的作用類似於快取,即活躍的資料和離線處理系統之間的快取。客戶端和伺服器端的通訊,是基於簡單,高效能,且與程式語言無關的TCP協議。
Kafka基本概念:
- Topic:特指Kafka處理的訊息源(feeds of messages)的不同分類。
- Partition:Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列。partition中的每條訊息都會被分配一個有序的id(offset)。
- Message:訊息,是通訊的基本單位,每個producer可以向一個topic(主題)釋出一些訊息。
- Producers:訊息和資料生產者,向Kafka的一個topic釋出訊息的過程叫做producers。
- Consumers:訊息和資料消費者,訂閱topics並處理其釋出的訊息的過程叫做consumers。
- Broker:快取代理,Kafa叢集中的一臺或多臺伺服器統稱為broker。
Kafka訊息傳送的流程:
Kafka-Message
下面是PHP生產、消費Kafka訊息的例子(假設已經配置好Kafka):
1.從zookeeper原始碼src/c/src安裝zookeeper c client
cd zookeeper-3.4.8/src/c
./configure
make && make install
2.編譯php libzookper擴充套件
git clone https://github.com/Timandes/libzookeeper.git
cd libzookeeper
phpize
./configure --with-libzookeeper=/usr/local/bin/cli_mt
make && make install
3.編譯php zookeeper擴充套件
git clone https://github.com/andreiz/php-zookeeper.git
cd php-zookeeper
phpize
./configure
make && make install
4.修改php.ini配置,新增libzookeeper和php-zookeeper擴充套件
extension=libzookeeper.so
extension=zookeeper.so
PHP處理Kafka訊息:
1.啟動zookeeper和kafka
kafka_2.11-0.10.0.0/bin/zookeeper-server-start.sh --daemon
kafka_2.11-0.10.0.0/config/zookeeper.properties
kafka_2.11-0.10.0.0/bin/kafka-server-start.sh kafka_2.11-0.10.0.0/config/server.properties
2.建立由2個partition組成的、名為testtopic的topic
kafka_2.11-0.10.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic testtopic
3.composer安裝nmred/kafka-php
composer require "nmred/kafka-php"
4.producer.php程式碼
<?php
require_once('./vendor/autoload.php');
$produce = \Kafka\Produce::getInstance('localhost:2181', 3000);
$produce->setRequireAck(-1);
$topicName = 'testtopic';
//獲取到topic下可用的partitions
$partitions = $produce->getAvailablePartitions($topicName);
$partitionCount = count($partitions);
$count = 1;
while(true){
$message = json_encode(array('uid' => $count, 'age' => $count%100, 'datetime' => date('Y-m-d H:i:s')));
//傳送訊息到不同的partition
$partitionId = $count%$partitionCount;
$produce->setMessages('testtopic', $partitionId, array($message));
$result = $produce->send();
var_dump($result);
$count++;
echo "producer sleeping\n";
sleep(1);
}
5.consumer.php程式碼
<?php
require_once('./vendor/autoload.php');
//獲取需要處理的partitionId
$partitionId = isset($argv[1]) ? intval($argv[1]) : 0;
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('test-consumer-group');
$consumer->setPartition('testtopic', $partitionId);
$consumer->setFromOffset(true);
$consumer->setMaxBytes(102400);
while(true){
$topic = $consumer->fetch();
foreach ($topic as $topicName => $partition) {
foreach ($partition as $partId => $messageSet) {
foreach ($messageSet as $message) {
var_dump($message);
}
}
}
echo "consumer sleeping\n";
sleep(1);
}
6.執行php程式碼
在3個終端介面分別執行
php producer.php
php consumer.php 0
php consumer.php 1
7.結果
兩個consumer指令碼依次收到producer傳送的訊息
php-kafka-consumer-output
轉自:https://aiddroid.com/kafka-introduction-and-php-kafka-usage/
作者:daos
連結:https://www.jianshu.com/p/b9d06f33f060
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。