Building and Using a Big Data Cluster, Part 8: Kafka Configuration and Usage
This series of guides uses a real cluster environment, not a pseudo-cluster, built on three Tencent Cloud servers.
Alternatively, visit my personal blog site: link
Kafka
Configuration
Kafka depends on ZooKeeper, so first make sure ZooKeeper is installed on the cluster and starts correctly.
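A quick sanity check on each node, sketched below, assumes zkServer.sh is on the PATH and passwordless ssh from master:
# run from master: confirm a ZooKeeper process answers on every node
for h in master slave1 slave2; do
  echo "== $h =="
  ssh $h 'zkServer.sh status'   # expect Mode: leader on one node, Mode: follower on the others
done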
(I wasted a whole day debugging only to find the bug was trivial; at least the cluster is not crashing now.)
Create the directory tree /opt/kafka/kafka2.12
Create /root/kafka/kafka-logs/logs, a file used to store the startup log (its parent directory /root/kafka/kafka-logs is the data directory configured below).
Configure the environment variables in /etc/profile, adding the bin directory to the PATH (a sketch follows).
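For example, assuming the install path above, append the following to /etc/profile and reload with source /etc/profile:
# make the Kafka scripts callable from anywhere
export KAFKA_HOME=/opt/kafka/kafka2.12
export PATH=$PATH:$KAFKA_HOME/bin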
Edit the configuration file kafka/config/server.properties:
1. Set broker.id so that the id matches ZooKeeper's myid (strictly, it only needs to be a unique integer per broker; reusing myid is just a convenient convention)
2. Set zookeeper.connect and zookeeper.connection.timeout.ms (optionally append a chroot such as /kafka to the connection string, as the full file below does)
zookeeper.connect=master:2181,slave1:2181,slave2:2181
zookeeper.connection.timeout.ms=6000
3. Set log.dirs (remember the directory must exist beforehand; Kafka will not create the folder by itself)
4. Update the two listener entries (search with /listener in vim) and fill in the hostname by hand (e.g. master, slave1, slave2). Per the documentation, changing just listeners would actually suffice, since advertised.listeners falls back to it when unset.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://master:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://master:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/root/kafka/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=master:2181,slave1:2181,slave2:2181/kafka
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
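The file above is master's copy; the other brokers differ only in broker.id and the listener hostnames. A sketch for pushing it out and patching it per node (assuming passwordless ssh, the same install path everywhere, and the myid-style numbering master=0, slave1=1, slave2=2):
id=1
for h in slave1 slave2; do
  scp /opt/kafka/kafka2.12/config/server.properties $h:/opt/kafka/kafka2.12/config/
  # rewrite broker.id and both listener lines for this host
  ssh $h "sed -i 's/^broker.id=.*/broker.id=$id/; s/master:9092/$h:9092/g' /opt/kafka/kafka2.12/config/server.properties"
  id=$((id+1))
done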
Startup
cd ${KAFKA_HOME}
bin/kafka-server-start.sh config/server.properties > /root/kafka/kafka-logs/logs 2>&1 &
This places the broker's startup log in /root/kafka/kafka-logs/logs, the file created earlier. (The -daemon flag can be used instead, but it must come before the properties file: bin/kafka-server-start.sh -daemon config/server.properties. In that case Kafka writes its own logs and the redirect is unnecessary.)
Use the jps command to check whether Kafka came up successfully.
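Every broker has to be started, not just master. A sketch that starts the whole cluster from master and verifies it, under the same ssh and path assumptions as before:
for h in master slave1 slave2; do
  ssh $h '/opt/kafka/kafka2.12/bin/kafka-server-start.sh -daemon /opt/kafka/kafka2.12/config/server.properties'
done
sleep 5
for h in master slave1 slave2; do
  echo "== $h =="
  ssh $h 'jps | grep -i kafka'   # a Kafka entry should appear on every node
done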
A record of the servers getting hacked
The trigger: Kafka kept starting abnormally (the process exited by itself a minute or two after startup), yet nothing showed up in the logs.
- [ ] Inspected the system with top (an introduction to top is here)
- [ ] Reconfigured Kafka and checked every setting; it still died
- [ ] Reconfigured the ZooKeeper ensemble Kafka depends on; still died
- [ ] Stopped unneeded processes (hbase, yarn, storm, hdfs) and restarted Kafka; still died
- [ ] Rebooted the servers and brought the services back in order (hdfs, yarn, zookeeper, hbase, storm, kafka); still died
- [ ] Rebooted and switched Kafka versions; still died
- [ ] Noticed the servers where Kafka kept dying sat at nearly 100% CPU while master was normal; the 100% CPU came from a java process whose actual workload was unknown
- [ ] Found via Google that Hadoop clusters are prone to high CPU usage (see this link); suspected the datanode, the cause being a Linux kernel memory-allocation optimization that backfires on Hadoop
- [ ] Stopped all Hadoop-related processes (leaving only jps); CPU was still 100% on the slaves while master stayed normal, again from a java process with an unknown workload
- [ ] Tried starting Kafka: master fine, slaves failed, slave CPU still at 100%
- [ ] Rebooted every cluster server and watched CPU live (top); started zookeeper first (normal), then kafka: all nodes normal
- [ ] Started the remaining services one by one: hdfs no effect, yarn no effect, zookeeper no effect, hive no effect; once hbase started, CPU spiked to 100% and Kafka died; the process that triggered it was the regionserver
- [ ] Ran everything except hbase: all normal, cause unknown
- [ ] The next morning the servers were down again
- [ ] Following Baidu results, reconfigured the YARN framework parameters
- [ ] Rebooted; still went down
- [ ] Based on the links here and here, there was good reason to believe the cloud servers had been hacked
- [ ] Temporary fix:
- [ ] Consulted Tencent Cloud support, changed the security group, and closed port 8088
- [ ] Every component now runs normally. hbase was suspected earlier because the malware needed some time before executing, and I happened to start hbase within that window; or perhaps the malware depended on hbase as its database?
Using Kafka
Basic concepts
- Kafka is a distributed message caching (messaging) system
- The servers in a Kafka cluster are called brokers
- Kafka has two kinds of clients, producers and consumers, both of which talk to the brokers over TCP
- Messages from different business systems are separated by topic, and every topic is partitioned to spread the read/write load
- Each partition can have multiple replicas to guard against data loss
- Writes to a partition must go through the leader among that partition's replicas
- Consumers can be grouped. Say two consumers A and B jointly consume the topic testTopic: A and B will never consume the same message twice. If testTopic holds 100 messages numbered 0-99 and A consumes 0-49, then B consumes 50-99. A consumer can also specify the starting offset (a demonstration is sketched after the architecture note below)
Kafka architecture diagram:
Producers are the data sources (Flume, for instance); consumers are the data outputs (Storm, for instance). The Kafka brokers support splitting messages by topic and by partition: different subsystems use different topics, while partitioning exists for load balancing.
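To watch the group behavior described above, one illustrative sketch (the --group flag belongs to the newer console consumer that takes --bootstrap-server; the names t_group_demo and demo-group are made up for this example):
# a two-partition topic so that two consumers can split the load
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 2 --topic t_group_demo
# run this same command in two terminals: each consumer ends up owning one partition
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic t_group_demo --group demo-group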
kafka shell
- Create a topic
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 1 --topic mytopics
Topic names are restricted: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
If a Kafka chroot was specified in ZooKeeper (e.g. /kafka, as in the server.properties above), topic operations in the shell must append it to the ZooKeeper address, e.g. bin/kafka-topics.sh --create --zookeeper master:2181/kafka --replication-factor 3 --partitions 1 --topic mytopics (Kafka's cluster coordination is managed by ZooKeeper).
- List current topics
bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
- Delete a topic
kafka-topics.sh --delete --zookeeper master:2181 --topic mytopics
Note the console hint: "Note: This will have no impact if delete.topic.enable is not set to true."
That is, server.properties needs delete.topic.enable=true appended on its last line (already present in the file above).
- Start a console producer
kafka-console-producer.sh --broker-list master:9092 --topic t_test
- Start a console consumer
kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic t_test
(This is the old consumer CLI; on newer Kafka versions the console consumer takes --bootstrap-server master:9092 instead of --zookeeper.)
- Inspect a topic's status
kafka-topics.sh --describe --zookeeper master:2181 --topic t_test
isr lists the brokers currently in sync. If you kill the Kafka process on one of the servers, say the one shown as leader: 0, i.e. broker 0 on master, with kill -9 pid,
Kafka fails over immediately, and production and consumption carry on unaffected.
When the killed Kafka process is brought back, the three servers resynchronize immediately.
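Putting those steps together, a sketch of the failover experiment (topic and broker numbering as above; <kafka-pid> stands for whatever jps reports):
bin/kafka-topics.sh --describe --zookeeper master:2181 --topic t_test   # note Leader: 0 and the Isr list
jps                                  # on master: find the Kafka pid
kill -9 <kafka-pid>                  # simulate a broker crash
bin/kafka-topics.sh --describe --zookeeper master:2181 --topic t_test   # a new leader; Isr has shrunk
bin/kafka-server-start.sh -daemon config/server.properties              # bring broker 0 back; Isr refills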
kafka java demo
Maven is recommended for building the project; without Maven, just import the jars from the libs directory of the Kafka distribution.
Producer example
Configuration notes
- bootstrap.servers: the address of the Kafka cluster.
- acks: the acknowledgment mechanism for sent messages; the default is 1.
- acks=0: the producer does not wait for any response from Kafka.
- acks=1: the leader writes the message to its local log but does not wait for the other machines in the cluster to acknowledge.
- acks=all: the leader waits for the full set of in-sync replicas to acknowledge; the message is then not lost unless every replica of its partition dies. This is the strongest guarantee.
- retries: when set to a value above 0, the client resends any message whose send failed.
- batch.size: when several messages are headed for the same partition, the producer merges them into fewer network requests, improving efficiency for both client and broker.
- key.serializer: the key serializer class, e.g. org.apache.kafka.common.serialization.StringSerializer (required; there is no default).
- value.serializer: the value serializer class, likewise e.g. StringSerializer (required).
Once the configuration is in place, the producer sends data with producer.send(), passing the topic, key, and value. If the topic does not yet exist in the cluster, it is created automatically (an auto-created topic gets the broker's defaults, e.g. num.partitions and default.replication.factor).
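To check what settings an auto-created topic ended up with, reuse the describe command from the shell section (t_topic is the topic the demo below produces to):
bin/kafka-topics.sh --describe --zookeeper master:2181 --topic t_topic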
package cn.colony.cloudhadoop.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();                                    // configuration items
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092"); // new-API way to locate the cluster
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        String messageStr = null;
        for (int i = 1; i < 1000; i++) {
            Thread.sleep(50);                  // throttle so the consumer output is easy to follow
            messageStr = "hello, this is " + i + "th message";
            producer.send(new ProducerRecord<String, String>("t_topic", "Message", messageStr));
        }
        producer.close();
    }
}
Consumer example
Configuration notes
- bootstrap.servers: the address of the Kafka cluster.
- group.id: the consumer group name. Different groups consume independently: if group A has already consumed 1000 messages from Kafka and you want to consume those same 1000 messages again without regenerating them, simply switch to another group name (see the offset-inspection sketch after this list).
- enable.auto.commit: whether offsets are committed automatically; default true.
- auto.commit.interval.ms: how often the consumer's offsets are auto-committed back to Kafka.
- session.timeout.ms: the session timeout for the consumer group.
- max.poll.records: the maximum number of records returned by a single poll.
- auto.offset.reset: what to do when there is no committed offset; the default is latest (the demo below sets earliest).
- earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning.
- latest: if a partition has a committed offset, resume from it; otherwise consume only newly produced data.
- none: if every partition has a committed offset, resume from those offsets; if any partition lacks one, throw an exception.
- key.deserializer: the key deserializer class, e.g. org.apache.kafka.common.serialization.StringDeserializer (required).
- value.deserializer: the value deserializer class, likewise StringDeserializer (required).
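To inspect what a group has committed, which makes the group.id and auto.offset.reset behavior above concrete, newer Kafka distributions ship kafka-consumer-groups.sh; a sketch, using the group name from the demo below:
bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --describe --group groupA
# the describe output includes the current offset, log-end offset, and lag per partition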
The consumer first subscribes to a topic; it can then start consuming data.
package cn.colony.cloudhadoop.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerDemo implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public ConsumerDemo(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNum = 1;
        try {
            for (;;) {
                msgList = consumer.poll(500);
                if (msgList != null && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        // print every 50th message to keep the console readable
                        if (messageNum % 50 == 0) {
                            System.out.println(messageNum + "=receive: key = " + record.key()
                                    + ", value = " + record.value() + " offset===" + record.offset());
                        }
                        if (messageNum % 1000 == 0)
                            break;
                        messageNum++;
                    }
                } else {
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        ConsumerDemo demo = new ConsumerDemo("t_topic");
        Thread thread = new Thread(demo);
        thread.start();
    }
}
Running the demos
Use two console views in Eclipse to watch the output. Thanks to the configuration above, code running locally can drive the cloud cluster: the messages generated by the producer are consumed by the consumer.
For how to attach two consoles to the output of two different Java programs, click here.