1. 程式人生 > >kafka安裝及Kafka-PHP擴充套件的使用

kafka安裝及Kafka-PHP擴充套件的使用

實話說,如果用於佇列的話,跟PHP比較配的,還是Redis。用的順手,呵呵,只是Redis不能有多個consumer。但Kafka官方對PHP不支援,PHP擴充套件是愛好者或使用者寫的。下面就開始講Kafka的安裝吧。我以CentOS6.4為例,64位。

一. 首先確認下jdk有沒有安裝

使用命令

[[email protected] ~]# java -version
java version "1.8.0_73"
Java(TM) SE Runtime Environment (build 1.8.0_73-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.73
-b02, mixed mode)

如果有以上資訊的話,就往下安裝吧,有些可能是jdk對不上,那就裝到對的上的。如果沒有安裝,就看一下下面的jdk安裝方法:

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

到這個地址下載jdk8版本,我下載的是jdk-8u73-linux-x64.tar.gz,然後解壓到/usr/local/jdk/下。

然後開啟/etc/profile檔案

[[email protected] ~]# vim /etc/profile

把下面這段程式碼寫到檔案裡

export JAVA_HOME=/usr/local/jdk/jdk1.8
.0_73 export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar export PATH=$JAVA_HOME/bin:$PATH

最後

[[email protected] ~]# source /etc/profile

這時jdk就生效了,可以使用 java -version驗證下。

二. 接下來安裝Kafka

1. 下載Kafka

到http://kafka.apache.org/downloads.html下載相應的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz

wget http://apache.fayea.com/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz

2. 下載完解壓到你喜歡的目錄

我是解壓到 /usr/local/kafka/kafka_2.9.1-0.8.2.2

3. 執行預設的Kafka

啟動Zookeeper server

[[email protected] kafka_2.9.1-0.8.2.2]# sh bin/zookeeper-server-start.sh config/zookeeper.properties &

啟動Kafka server

[[email protected] kafka_2.9.1-0.8.2.2]# sh bin/kafka-server-start.sh config/server.properties &

執行生產者producer

[[email protected] kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

執行消費者consumer

[[email protected] kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

這樣,在producer那邊輸入內容,consumer馬上就能接收到。

4. 當有跨機的producer或consumer連線時

需要配置config/server.properties的host.name,要不然跨機的連不上。

三. Kafka-PHP擴充套件

使用了一圈,就https://github.com/nmred/kafka-php可以用。

我是使用composer安裝的,以下是示例:

producer.php

複製程式碼
<?php
require 'vendor/autoload.php';

while (1) {
    $part = mt_rand(0, 1);
    $produce = \Kafka\Produce::getInstance('kafka0:2181', 3000);
    // get available partitions
    $partitions = $produce->getAvailablePartitions('topic_name');
    var_dump($partitions);
    // send message
    $produce->setRequireAck(-1);
    $produce->setMessages('topic_name', 0, array(date('Y-m-d H:i:s'));
   
    sleep(3);
}
複製程式碼

consumer.php

複製程式碼
require 'vendor/autoload.php';

$consumer = \Kafka\Consumer::getInstance('kafka0:2181');
$group = 'topic_name';
$consumer->setGroup($group);
$consumer->setFromOffset(true);
$consumer->setTopic('topic_name', 0);
$consumer->setMaxBytes(102400);
$result = $consumer->fetch();
print_r($result);
foreach ($result as $topicName => $partition) {
    foreach ($partition as $partId => $messageSet) {
    var_dump($partition->getHighOffset());
        foreach ($messageSet as $message) {
            var_dump((string)$message);
        }
    var_dump($partition->getMessageOffset());
    }
}
複製程式碼

http://www.cnblogs.com/imarno/p/5198940.html