滴滴雲部署 ZooKeeper + Kafka
Kafka 是一種分散式的流處理平臺,流處理平臺有以下三個特點:
- 釋出和訂閱流記錄,類似於訊息佇列和企業訊息系統。
- 流記錄的儲存具有容錯性。
- 實時處理流記錄。
Kafka 廣泛應用於以下兩方面:
- 為系統和應用之間的資料可靠傳輸建立實時的流式資料通道。
- 為傳輸或響應資料流建立實時的流式應用。
Kafka 以叢集的方式執行在一臺或跨機房的多臺伺服器上,儲存在 Kafka 叢集上的流記錄用不同的 topic 進行分類,每一條記錄包含一個 key(鍵),一個 value(值)和一個 timestamp(時間戳)。
ZooKeeper 是一個針對大型分散式系統的可靠的協調系統,提供的功能包括命名服務、配置維護、分散式同步、組服務等。ZooKeeper 作為一個分散式的服務框架,主要用來解決分散式叢集中應用系統的一致性問題。
本文中 Kafka 正是利用了 ZooKeeper 的協調作用,管理、協調 Kafka 例項(broker)。每個 Kafka 例項都通過 ZooKeeper 協調其它 Kafka 例項。
當 Kafka 系統中新增了例項或者某個代理故障失效時,ZooKeeper 服務將通知訊息記錄的生產者和消費者。生產者和消費者據此開始與其它例項協調工作。
本例叢集架構如下:
此處我們使用的是滴滴雲主機內網 IP,在本文的 Java 示例需要外部訪問 Kafka,需要繫結公網 IP 即 EIP。有關滴滴雲 EIP 的使用請參考以下鏈:https://help.didiyun.com/hc/kb/section/1035272/
部署 ZooKeeper
以下操作在三臺節點進行。
登入滴滴雲虛擬主機 DC2,滴滴雲 DC2 預設登陸使用者 dc2-user,本文部署過程將使用 root,輸入 sudo su 命令切換到 root使用者,進入 /usr/local 資料夾下載 ZooKeeper 和 JDK。
sudo su cd /usr/local wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u191-b12/2787e4a523244c269598db4e85c51e0c/jdk-8u191-linux-x64.tar.gz tar -zxf jdk-8u191-linux-x64.tar.gz
解壓 JDK 並配置 Java 環境變數:
tar zxvf jdk-8u191-linux-x64.tar.gz
在 /etc/profile 檔案末尾新增以下內容:
vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_191
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
使環境變數生效:
source /etc/profile
輸入 java -version,如果看到以下輸出說明 Java 環境變數配置成功:
java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
安裝配置 ZooKeeper:
tar zxvf zookeeper-3.4.12.tar.gz
cd zookeeper-3.4.12/conf
vi zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
server.101=zk101:2888:3888
server.102=zk102:2888:3888
server.103=zk103:2888:3888
建立配置檔案中的 dataDir 目錄和 myid 檔案:
mkdir /tmp/zookeeper
touch /tmp/zookeeper/myid
echo 101 > /tmp/zookeeper/myid #此處的101來源於配置檔案中server.101,例外兩臺節點分別為102,103
將三臺節點的主機名與 IP 對映新增到 /etc/hosts 檔案中:
vi /etc/hosts
10.254.116.249 zk101
10.254.125.48 zk102
10.254.237.61 zk103
分別在三臺節點啟動 ZooKeeper:
/usr/local/zookeeper-3.4.12/bin/zkServer.sh start
檢視 ZooKeeper 是否啟動成功:
jps
5249 QuorumPeerMain #此程序為zookeeper叢集的啟動入口類
5864 Jps
在三臺節點分別檢視 ZooKeeper 角色,一個 leader 兩個 follower:
/usr/local/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower
/usr/local/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower
/usr/local/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: leader
關於 ZooKeeper 的選舉機制可以參考以下連結:https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
至此 ZooKeeper 啟動成功。
部署 Kafka
本例使用兩個節點部署 Kafka,在 zk101 和 zk102 解壓 Kafka:
cd /usr/local
tar zxvf kafka_2.12-0.10.2.1.tgz
cd kafka_2.12-0.10.2.1/config
編輯 server.properties:
vi server.properties
auto.create.topics.enable=false
broker.id=101 #此處為zk101的id
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
listeners=PLAINTEXT://zk101:9092
advertised.listeners=PLAINTEXT://116.85.55.205:9092 #此處為zk101的公網IP
hostname=zk101
advertised.host.name=116.85.55.205 #此處為zk101的公網IP
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=6
num.recovery.threads.per.data.dir=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk101:2181,zk102:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
zk102 的配置檔案更改相應 IP 即可。
在 zk101 和 zk102 上啟動 Kafka:
nohup /usr/local/kafka_2.12-0.10.2.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-0.10.2.1/config/server.properties &
jps
24994 QuorumPeerMain
28675 Kafka #Kafka已經啟動
28974 Jps
測試 Kafka
以 zk101 作為 producer,建立 topic:
/usr/local/kafka_2.12-0.10.2.1/bin/kafka-topics.sh --create --zookeeper zk101:2181 --replication-factor 2 --partitions 1 --topic test
Created topic "test".
同樣在 zk101 上建立一個生產者:
/usr/local/kafka_2.12-0.10.2.1/bin/kafka-console-producer.sh --broker-list zk101:9092 --topic test
在 zk102 上建立一個消費者:
/usr/local/kafka_2.12-0.10.2.1/bin/kafka-console-consumer.sh --zookeeper zk102:2181 --topic test --from-beginning
在 zk101 上輸入內容,可以在 zk102 上看到:
zk101
[[email protected] kafka_2.12-0.10.2.1]# /usr/local/kafka_2.12-0.10.2.1/bin/kafka-console-producer.sh --broker-list zk101:9092 --topic test
hello world
this is hello from didiyun
zk102
[[email protected] config]# /usr/local/kafka_2.12-0.10.2.1/bin/kafka-console-consumer.sh --zookeeper zk102:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
hello world
this is hello from didiyun
Kafka 測試程式碼(Java)
下面的 Java 測試程式碼中會用到 zk101 節點的 EIP 和對應的 9092 埠,因此要在安全組中開啟 9092 埠,有關安全組的使用請參照以下連結:https://help.didiyun.com/hc/kb/article/1091031/
建立 mave 專案,編輯 pom.xml,解決依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.2</version>
</dependency>
生產者程式碼 ProducerTest.java:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerTest {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers", "116.85.55.205:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
try {
producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 20; i++) {
String msg = "This is hello Message " + i;
producer.send(new ProducerRecord<String, String>("test2", msg));
System.out.println("Sent:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
消費者程式碼 Consumer.java:
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
public class Consumer {
public static void main(String[] s) {
Properties props = new Properties();
props.put("bootstrap.servers", "116.85.55.205:9092");
props.put("group.id", "1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("test2"));
System.out.println("poll start...");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
record.offset(), record.key(), record.value());
System.out.println(record.value());
}
}
}
先執行 Consumer:
執行 ProducerTest,傳送訊息:
在 Consumer 上可以收到訊息:
參考連結:
https://www.cnblogs.com/skying555/p/7873345.html
https://kafka.apache.org/intro
https://zookeeper.apache.org/