1. 程式人生 > >滴滴雲部署 ZooKeeper + Kafka

滴滴雲部署 ZooKeeper + Kafka

Kafka 是一種分散式的流處理平臺,流處理平臺有以下三個特點:

  1. 釋出和訂閱流記錄,類似於訊息佇列和企業訊息系統。
  2. 流記錄的儲存具有容錯性。
  3. 實時處理流記錄。

Kafka 廣泛應用於以下兩方面:

  1. 為系統和應用之間的資料可靠傳輸建立實時的流式資料通道。
  2. 為傳輸或響應資料流建立實時的流式應用。

Kafka 以叢集的方式執行在一臺或跨機房的多臺伺服器上,儲存在 Kafka 叢集上的流記錄用不同的 topic 進行分類,每一條記錄包含一個 key(鍵),一個 value(值)和一個 timestamp(時間戳)。

ZooKeeper 是一個針對大型分散式系統的可靠的協調系統,提供的功能包括命名服務、配置維護、分散式同步、組服務等。ZooKeeper 作為一個分散式的服務框架,主要用來解決分散式叢集中應用系統的一致性問題。

本文中 Kafka 正是利用了 ZooKeeper 的協調作用,管理、協調 Kafka 例項(broker)。每個 Kafka 例項都通過 ZooKeeper 協調其它 Kafka 例項。

當 Kafka 系統中新增了例項或者某個代理故障失效時,ZooKeeper 服務將通知訊息記錄的生產者和消費者。生產者和消費者據此開始與其它例項協調工作。

本例叢集架構如下:

在這裡插入圖片描述

此處我們使用的是滴滴雲主機內網 IP,在本文的 Java 示例需要外部訪問 Kafka,需要繫結公網 IP 即 EIP。有關滴滴雲 EIP 的使用請參考以下鏈:https://help.didiyun.com/hc/kb/section/1035272/

部署 ZooKeeper

以下操作在三臺節點進行。
登入滴滴雲虛擬主機 DC2,滴滴雲 DC2 預設登陸使用者 dc2-user,本文部署過程將使用 root,輸入 sudo su 命令切換到 root使用者,進入 /usr/local 資料夾下載 ZooKeeper 和 JDK。

sudo su
cd /usr/local
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz

wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u191-b12/2787e4a523244c269598db4e85c51e0c/jdk-8u191-linux-x64.tar.gz
tar -zxf jdk-8u191-linux-x64.tar.gz

解壓 JDK 並配置 Java 環境變數:

tar zxvf jdk-8u191-linux-x64.tar.gz

在 /etc/profile 檔案末尾新增以下內容:

vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_191
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

使環境變數生效:

source /etc/profile

輸入 java -version,如果看到以下輸出說明 Java 環境變數配置成功:

java -version

java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)

安裝配置 ZooKeeper:

tar zxvf zookeeper-3.4.12.tar.gz
cd  zookeeper-3.4.12/conf
vi zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
server.101=zk101:2888:3888
server.102=zk102:2888:3888
server.103=zk103:2888:3888

建立配置檔案中的 dataDir 目錄和 myid 檔案:

mkdir /tmp/zookeeper
touch /tmp/zookeeper/myid
echo 101 > /tmp/zookeeper/myid   #此處的101來源於配置檔案中server.101,例外兩臺節點分別為102,103

將三臺節點的主機名與 IP 對映新增到 /etc/hosts 檔案中:

vi /etc/hosts

10.254.116.249	zk101
10.254.125.48	zk102
10.254.237.61	zk103

分別在三臺節點啟動 ZooKeeper:

/usr/local/zookeeper-3.4.12/bin/zkServer.sh start

檢視 ZooKeeper 是否啟動成功:

jps
5249 QuorumPeerMain   #此程序為zookeeper叢集的啟動入口類
5864 Jps

在三臺節點分別檢視 ZooKeeper 角色,一個 leader 兩個 follower:

/usr/local/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower

/usr/local/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower


/usr/local/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: leader

關於 ZooKeeper 的選舉機制可以參考以下連結:https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection

至此 ZooKeeper 啟動成功。

部署 Kafka

本例使用兩個節點部署 Kafka,在 zk101 和 zk102 解壓 Kafka:

cd /usr/local
tar zxvf kafka_2.12-0.10.2.1.tgz 
cd kafka_2.12-0.10.2.1/config

編輯 server.properties:

vi server.properties

auto.create.topics.enable=false
broker.id=101       #此處為zk101的id
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
listeners=PLAINTEXT://zk101:9092
advertised.listeners=PLAINTEXT://116.85.55.205:9092  #此處為zk101的公網IP

hostname=zk101
advertised.host.name=116.85.55.205   #此處為zk101的公網IP
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs

num.partitions=6
num.recovery.threads.per.data.dir=1

log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000


zookeeper.connect=zk101:2181,zk102:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

zk102 的配置檔案更改相應 IP 即可。

在 zk101 和 zk102 上啟動 Kafka:

nohup /usr/local/kafka_2.12-0.10.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-0.10.2.1/config/server.properties &

jps
24994 QuorumPeerMain
28675 Kafka         #Kafka已經啟動
28974 Jps

測試 Kafka

以 zk101 作為 producer,建立 topic:

/usr/local/kafka_2.12-0.10.2.1/bin/kafka-topics.sh --create --zookeeper zk101:2181 --replication-factor 2 --partitions 1 --topic test
Created topic "test".

同樣在 zk101 上建立一個生產者:

/usr/local/kafka_2.12-0.10.2.1/bin/kafka-console-producer.sh --broker-list zk101:9092 --topic test

在 zk102 上建立一個消費者:

/usr/local/kafka_2.12-0.10.2.1/bin/kafka-console-consumer.sh --zookeeper zk102:2181 --topic test --from-beginning

在 zk101 上輸入內容,可以在 zk102 上看到:

zk101

[[email protected] kafka_2.12-0.10.2.1]# /usr/local/kafka_2.12-0.10.2.1/bin/kafka-console-producer.sh --broker-list zk101:9092 --topic test
hello world
this is hello from didiyun

zk102
[[email protected] config]# /usr/local/kafka_2.12-0.10.2.1/bin/kafka-console-consumer.sh --zookeeper zk102:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
hello world
this is hello from didiyun

Kafka 測試程式碼(Java)

下面的 Java 測試程式碼中會用到 zk101 節點的 EIP 和對應的 9092 埠,因此要在安全組中開啟 9092 埠,有關安全組的使用請參照以下連結:https://help.didiyun.com/hc/kb/article/1091031/

建立 mave 專案,編輯 pom.xml,解決依賴:

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
</dependency>
<dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.2</version>
</dependency>

生產者程式碼 ProducerTest.java:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerTest {
    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "116.85.55.205:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);

            for (int i = 0; i < 20; i++) {
                String msg = "This is hello Message " + i;
                producer.send(new ProducerRecord<String, String>("test2", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            producer.close();
        }

    }

}

消費者程式碼 Consumer.java:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
public class Consumer {
    public static void main(String[] s) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "116.85.55.205:9092");
        props.put("group.id", "1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList("test2"));
        System.out.println("poll start...");
        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(100);
           
            for (ConsumerRecord<String, String> record : records)
               record.offset(), record.key(), record.value());
                System.out.println(record.value());
        }
    }
}

先執行 Consumer:
在這裡插入圖片描述
執行 ProducerTest,傳送訊息:
在這裡插入圖片描述
在 Consumer 上可以收到訊息:
在這裡插入圖片描述

參考連結:
https://www.cnblogs.com/skying555/p/7873345.html
https://kafka.apache.org/intro
https://zookeeper.apache.org/