Kafka 單機和分散式環境搭建與案例使用
實驗環境:
1、Ubuntu Server 16.04
2、kafka_2.11-0.11.0.0
一、單機環境搭建
官方參考文章:
1、下載和解壓安裝包
這裡下載了zookeeper和kafaka兩個安裝包,下載地址:
2、啟動Zookeeper服務
這裡的kafka預設是由內建的zookeeper的,如果使用內建的zookeeper的話,啟動的方式如下:
zookeeper的配置檔案是在:/kafka_2.12-0.11.0.0/config 目錄下
啟動Zookeeper:
>bin/zookeeper-server-start.sh config/zookeeper.properties
當看到如下資訊的時候,就表示成功了!
3、啟動Kafka
kafka的配置檔案是在/kafka_2.12-0.11.0.0/config 目錄下,預設情況下不需要修改。
>bin/kafka-server-start.sh config/server.properties
4、建立一個Topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
–replication-factor 複製因子為1;
–partitions 分割槽為1;
檢視已建立的Topic:
5、傳送測試訊息
kafka支援從Console傳送資訊,消費者從Console接受資訊。
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
–broker-list 表示代理伺服器的列表,這裡只有一個;
建立一個消費者:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
–from-beginning 表示從訊息開始處讀取;
然後在生產者的Console輸入資料,消費者的Console就可以看到資訊:
二、偽叢集環境搭建
官方提供了一種方式在一臺機器上啟動多個Broker機器構成multi-broker cluster,這是一種偽叢集的方式,下邊就配置一下。
1、修改配置檔案
思路是配置多個config/server.properties檔案,修改其中的broker.id=1
和埠號,日誌檔案位置。
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
編輯配置檔案,修改如下對應的位置:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
2、分別啟動另外兩個Kafka
>bin/kafka-server-start.sh config/server-1.properties &
>>bin/kafka-server-start.sh config/server-2.properties &
&表示在後臺執行
3、檢視執行結果:
QuorumPeerMain表示Zookeeper進行;
另外有3個Kafka程序;
4、建立Topic
新建一個複製因子為3的Topic
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
檢視Topic的描述資訊:
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
5、傳送訊息
啟動生產者,這裡有3個Kafka例項,但是–broker-list 仍是啟動的Zookeeper服務。
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
啟動消費者:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
和單機的情況是一樣的。
三、分散式叢集環境搭建
搭建的分散式叢集和偽叢集的方式大致相同,這裡假設使用3臺伺服器模擬實驗,部署3個Zookeeper例項和3個Kafka例項,當然也可以直接部署一個Zookeeper例項,這裡只是演示分散式Zookeeper和kafka的搭建。
工具使用的是SecureCRT。
1、分散式Zookeeper的搭建
(1)將Zookeeper安裝包分別上傳到3臺伺服器,我的是放在:/home/xuliugen/server
目錄下。
(2)配置第一臺Zookeeper
複製zookeeper-3.4.10/conf/zoo_sample.cfg
為 zookeeper-3.4.10/conf/zoo.cfg
,修改zoo.cfg檔案如下,只更改data的目錄:
因為,修改了dataDir目錄的位置,那麼就需要建立一個/zookeeper-3.4.6/data
目錄。
(3)按同樣的方式修改第二臺Zookeeper和第三臺Zookeeper伺服器配置。
(4)然後,在每一臺Zookeeper的配置檔案中的最下邊新增Zookeeper的叢集配置:
(5)最後建立每一個Zookeeper的 myid
檔案,在/data/myid
檔案
xuliugen@xuliugen-pc:~/server/zookeeper-3.4.6/data$ echo 1 > myid
則,另外兩臺分別為:
xuliugen@xuliugen-pc:~/server/zookeeper-3.4.6/data$ echo 2 > myid
xuliugen@xuliugen-pc:~/server/zookeeper-3.4.6/data$ echo 3 > myid
注意:
1、myid和IP地址的對應
server.1=
server.2=
server.3=
這裡的1、2、3是和我們剛才配置的myid的數值是相對應的,即1的IP地址為192.168.1.120,那麼server.1=192.168.1.120:2888:3888
2、防火牆埠的配置
另外,2888:3888埠要設定防火牆許可權
2、啟動Zookeeper伺服器
依此使用命令./bin/zkServer.sh start
啟動Zookeeper服務。
使用jps
檢視是否已經啟動
檢視zookeeper日誌的話,是在/zookeeper-3.4.6/bin
目錄下的zookeeper.out
檔案:
使用tailf zookeeper.out
可以進行檢視。
3、分散式Kafka的搭建
(1)將Kafka安裝包分別上傳到3臺伺服器,我的是放在:/home/xuliugen/server
目錄下。
(2)配置第一臺Kafka
Kafka的配置檔案是在/conf/server.properties
,修改日誌的目錄:
配置主機IP或者hostname:
然後修改kafka中使用的Zookeeper叢集地址:
多個Zookeeper之間以英文逗號分開。
注意:
這裡需要注意的是,如果按照上述的方式配置:
listeners=PLAINTEXT://192.168.1.120:9092
這樣配置的話,是在內網環境下允許的,如果使用外網進行訪問的話,可以配置為如下:
(3)按同樣的方式配置第二臺kafka和第三臺kafka伺服器。
要注意的是不同的kafka的broker.id
一定要不一樣,我這裡分別配置的是0、1、2。
4、分別啟動Kafka服務
>bin/kafka-server-start.sh config/server.properties
四、程式碼測試
1、專案結構
2、pom檔案內容
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
</dependency>
</dependencies>
3、日誌配置log4j.properties
log4j.rootLogger=DEBUG,rolling,errlog,stdout
#stdout log
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} [%-5p] %c{1}.%M:%L-[%X{traceId}]-%m%n
#common log
log4j.appender.rolling=org.apache.log4j.DailyRollingFileAppender
log4j.appender.rolling.File=${catalina.base}/logs/kafka-demo.log
log4j.appender.rolling.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{HH:mm:ss.SSS} [%-5p] %-20c{1} [%t]%x [%X{traceId}]-%m%n
#error log
log4j.appender.errlog=org.apache.log4j.DailyRollingFileAppender
log4j.appender.errlog.Threshold=ERROR
log4j.appender.errlog.File=${catalina.base}/logs/error.log
log4j.appender.errlog.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.errlog.layout=org.apache.log4j.PatternLayout
log4j.appender.errlog.layout.ConversionPattern=%d{MM-dd HH:mm:ss.SSS} [%-5p] %-20c{1} [%.11t] [%X{traceId}]%x-%m%n
3、生產者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerDemo {
// Topic
private static final String topic = "kafkaTopic";
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.120:9092,192.168.1.135:9093,192.168.1.227:9094");
props.put("acks", "0");
props.put("group.id", "1111");
props.put("retries", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//生產者例項
KafkaProducer producer = new KafkaProducer(props);
int i = 1;
// 傳送業務訊息
// 讀取檔案 讀取記憶體資料庫 讀socket埠
while (true) {
Thread.sleep(1000);
producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));
System.out.println("key:" + i + " " + "value:" + i);
i++;
}
}
}
4、消費者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerDemo {
private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
private static final String topic = "kafkaTopic";
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.120:9092,192.168.1.135:9093,192.168.1.227:9094");
props.put("group.id", "1111");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
5、測試結果
生產者:
消費者:
也可以到官網下載Kafka的原始碼包,包裡邊有example程式碼可以參考