Kafka筆記二之Topic操作,檔案引數配置
以下的例子我只啟動了一個shb01,沒有加入139
主題Topic的一般操作(增刪改查),通過指令碼kafka-topics.sh來執行
建立
[[email protected] bin]# kafka-topics.sh --create--topic hello --zookeeper shb01:2181 --partition 2 --replication-factor 1
Created topic "hello".
--partition 2表示分割槽
--replication-factor 1表示副本因子,之前說過broker無主副之分但是partition有主副之分,所以此引數的值不能大於broker數就是kafka服務節點數,我的例子中只有一個shb01節點所以如果指定2會報錯的。
查詢
[[email protected] bin]# kafka-topics.sh --list--zookeeper shb01:2181
hello
也可以指定topic
[[email protected] bin]# kafka-topics.sh --list --zookeepershb01:2181 --topic hello
查詢指定topic詳細資訊
[[email protected] bin]# kafka-topics.sh--describe --zookeeper shb01:2181 --topic hello
Topic:hello PartitionCount:2 ReplicationFactor:1 Configs:
Topic: hello Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello Partition: 1 Leader: 0 Replicas: 0 Isr: 0
我只有一個broker,id為0
Leader: 0,Partition的leader所在broker的id
Replicas: 0,Partition的副本的brokerlist
Isr: 0,表示可用的broker的id
查詢topic詳細資訊
[[email protected] bin]# kafka-topics.sh--describe --zookeeper shb01:2181
修改,只能增加partition數量不能減少
[[email protected] bin]# kafka-topics.sh --alter --zookeeper shb01:2181 --topic hello --partition 3
[[email protected] bin]# kafka-topics.sh--describe --zookeeper shb01:2181--topic hello
Topic:hello PartitionCount:3 ReplicationFactor:1 Configs:
Topic: hello Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: hello Partition: 2 Leader: 0 Replicas: 0 Isr: 0
刪除
[[email protected] bin]# kafka-topics.sh --delete--zookeeper shb01:2181 --topic hello
刪除只是標記刪除,此topic仍然可用,需要修改server.properties檔案中的引數
delete.topic.enable=true開啟刪除然後重啟kafka,此引數在檔案中預設是沒有的需要手動加入
生產者,消費者
Kafka自帶的生產者和消費者只是用來測試
生產者
[[email protected] bin]# kafka-console-producer.sh--broker-list shb01:9092 --topic topic_shb01
--broker-list表示broker地址,多個地址用逗號分開
--topic 表示向那個主題生產訊息
消費者
[[email protected] bin]# kafka-console-consumer.sh--topic topic_shb01 --zookeeper shb01:2181,192.168.79.139:2181 --from-beginning
--topic表示消費那個主題
topic\whitelist\blacklist:
1、 具體的單個topic
[[email protected] bin]# kafka-console-consumer.sh--topic topic_shb01 --zookeeper shb01:2181,192.168.79.139:2181 --from-beginning
2、多個白名單topic字串[逗號隔開]。
--topic t1,t2表示只消費t1,t2主題
[[email protected]]# kafka-console-consumer.sh --whitelist t1,t2 --zookeepershb01:2181,192.168.79.139:2181 --from-beginning
3、多個黑名單topic字串[逗號隔開]。
--topic t1,t2表示除過t1,t2主題之外消費所有主題
[[email protected]]# kafka-console-consumer.sh --blacklist t1,t2 --zookeepershb01:2181,192.168.79.139:2181 --from-beginning
--zookeeper表示kafka叢集的zk地址
--from-beginning表示從前開始,就是啟動消費者之前生產的訊息也可以消費
最後被標記刪除的主題也可以消費
檔案引數配置
broker,server.propertie
1.生產者生產傳送訊息,broker快取資料達到一定閥值或一定時間後就會flush到磁碟中這樣可以減少IO呼叫。在config目錄下的server.properties中有兩個引數用於控制flush
# The number of messages to accept beforeforcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message cansit in a log before we force a flush
#log.flush.interval.ms=1000
2.kafka資料被消費後會依然存在於磁碟中,消費者配合zookeeper讀取資料,通常資料會儲存168小時,可以在server.properties中通過引數進行控制
# The minimum age of a log file to beeligible for deletion
log.retention.hours=168
#log.retention.bytes=1073741824
第一個是168小時,第二個引數是按照大小超過則刪除
log.retention.check.interval.ms=300000
這個是掃描的間隔時間,滿足上面兩個條件(貌似是二選一)則執行刪除
生產者,producer.properties
1.Producer可以指定將訊息傳送給具體的partition.在配置檔案中通過partitioner.class指定自定義分割槽器。包含包名類名
partitioner.class= kafka.ProducerTest
2.通過producer.type=sync引數控制生產者使用非同步或同步方式傳送資料。
非同步或者同步傳送
同步是指:傳送方發出資料後,等接收方發回響應以後才發下一個數據的通訊方式。
非同步是指:傳送方發出資料後,不等接收方發回響應,接著傳送下個數據的通訊方式。
3.生產者採用非同步方式可以提高發送效率,先將資料快取在磁碟然後一次性發送出去。
有兩個引數來控制什麼時候傳送訊息,一個按照時間間隔另一個按照資料大小
# maximum time, in milliseconds, forbuffering data on the producer queue
#queue.buffering.max.ms=
# the maximum size of the blocking queuefor buffering on the producer
#queue.buffering.max.messages=
消費者,consumer.properties
Kafka中每個消費者都屬於一個group且都已一個組id
消費資料
組內:一個消費者可以消費多個partition,但是一個topic下的partition只能被一個group中的一個消費者消費。下圖中group2中有4個消費者但是topic2種只有3個分割槽此時c4處於空閒狀態。此時可以增減topic2的分割槽數來增加消費能力。
組間:每個消費組之間消費同一份資料互不影響。
業務複雜增加topic,資料量大增加分割槽。分割槽越多意味著consumer的消費能力越強。
Message是通訊的基本單位儲存於topic的分割槽中,parttion在儲存層面表現為一個append.log檔案,新的message會直接追加到檔案尾部,每條訊息在檔案中的位置稱為offset(偏移量)這個資訊儲存在zookeeper中,所以消費者在讀取message時需要zk配合。Message有型別大小偏移量3個屬性,其中offset相當於message在分割槽中的id
下面是一個檢視offset的命令,這個例子我在資料中找的,沒有實際操作過
bin/kafka-run-class.shkafka.tools.ConsumerOffsetChecker --zookeeper zkserver1:2181,zkserver2:2181--group consumer_demo_11
kafka不能指定offset進行消費
kafka叢集容錯
kafka叢集中一個broker掛了後zk會選擇讓別的broker提供服務,當有broker被修復後zk會有一個自動的均衡策略讓此broker再次成為topic中partition的leader
topic:hello partition:0 leader:0
topic:hello partition:1 leader:2
topic:hello partition:2 leader:1
此引數指定自動均衡策略,300秒後kafka執行均衡操作
Auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds 預設值:300。
也可以手動執行
bin/kafka-preferred-replica-election.sh--zookeeper zkserver1:2181,zkserver2:2181
相關推薦
Kafka筆記二之Topic操作,檔案引數配置
以下的例子我只啟動了一個shb01,沒有加入139 主題Topic的一般操作(增刪改查),通過指令碼kafka-topics.sh來執行 建立 [[email protected] bin]# kafka-topics.sh --create--topic hel
Kafka筆記三之java操作
maven依賴,我使用的是版本是0.8.22,scala是2.11 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11&l
薛開宇學習筆記二之總結筆記--caffe 中solver.prototxt;train_val.prototxt的一些引數介紹
原文地址:http://blog.csdn.net/cyh_24/article/details/51537709 solver.prototxt net: "models/bvlc_alexnet/train_val.prototxt" test_iter: 10
Redis筆記二之Redis命令操作簡介及五種value資料型別
Redis是使用鍵值儲存資料,key必須是字串value支援五種資料型別,最新版本又新增加了兩種這裡暫不介紹。下面會介紹redis的一些基本命令的使用。 首先啟動redis並登入客戶端。 Redis中的命令不區分大小寫。 一:基本命令及string型別 1:help
node.js學習筆記二之版本問題
nodejs targe tle 下一個 .cn blank 網站 mage 功能 一、版本說明 進入node.js官網https://nodejs.org/en/download/ 點擊上面的【All download options】進入到所有下載列表的地址 下載地
多執行緒學習筆記二之JUC元件
概述 為了對共享資源提供更細粒度的同步控制,JDK5新增了java.util.concurrent(JUC)併發工具包,併發包新增了Lock介面(以及相關實現類)用來實現鎖功能,它提供了與synchronized關鍵字相似的同步功能,只是在使用時需要顯式地獲取和釋放鎖,還具備內建鎖不具備的自由操作鎖
mysql筆記二之資料增刪改查
-- 資料增刪改查(curd) 1. -- 增加 insert -- 全列插入 值和表的欄位的順序一一對應 -- insert [
mysql筆記一之庫操作與 表操作
資料庫使用者名稱: root 密碼: mysql char(5) 長度固定為5的字串 ab --> "ab " varchar(5) abc --> "abc" 不能夠超
[基礎篇]ESP8266-NonOS學習筆記(二)之Hello World!
相信聰明伶俐的你,肯定完成了上一篇文章的環境搭建,如果你是第一次看本系列文章,可以先去看看前兩篇文章,先了解一下並完成一些基礎操作。 這裡推薦大家安裝一下Git這個灰常炒雞好用的分散式版本管理神器,本人是非常喜歡這款由Linux之父Linus Torvalds開發的工具,可以很方便讓我完成一些工作,比如說c
[基礎篇]ESP8266-NonOS學習筆記(四)之GPIO操作(按鍵、LED、中斷、定時器)
本篇文章我們再回到基礎篇,難道你還以為我會講UDP?啊哈哈哈,UDP肯定是會講的,但是應用場景不是很多,我們放到後面再講,不過也是簡單一講,畢竟熟悉UDP協議的人來說,都知道UDP一種不可靠的傳輸協議,可以這樣形容“我(Client)只管發,你(Server)愛收不收”,所以在一些實際應用場景
MyBatis筆記二之增刪改查
一、環境配置 將mybatis-3.4.6.jar和lib目錄下的jar包全匯入到專案中,不匯入lib中的jar包會報日誌錯誤。 注意:由於mybatis是基於JDBC實現的,所以需要匯入jdbc的jar包 二、編寫配置檔案 1、在src/pojo目錄下新建U
Django入門二之models操作試驗
第一部分: 1.使用 manage.py 工具載入我們的專案來啟動 Python shell : (env) D:\Development\myproject\myproject>python manage.py shell Python 3.6.4 (v3.6.4
RocketMQ學習筆記二之【DefaultMQPushConsumer使用與流程原理分析】
版本: <dependency> <groupId>org.apache.rocketmq</groupId> &
Detectron 之 config.py 檔案引數 快速跑自己模型 Mmdetection
Detectron: Detectron 1、引數理解 中文引數說明 官方引數說明 多GPU配置 2、格式轉換 訓練集格式轉換 coco格式 3、訓練流程 訓練流程1 訓練流程2 4、其他 新增垂直翻轉 faster rcnn流程:附帶部分引數解釋 Faster-RCNN演算
openwrt的研習筆記二之刷機(TL-WR703N)
昨天是稍微瞭解了下openwrt,今天本想直接進行編譯環節的,但工作有點忙,所以決定直接刷機先吧 先了解結果,再從其他地方朝著刷機的結果進攻會比較好 有目的去做事會比較容易辦成。呵呵 (如果英語比較不錯的,建議還是去看原文,出錯可不怪我哦,o(∩_∩)o) 首
Elasticsearch筆記五之java操作es
Java操作es叢集步驟1:配置叢集物件資訊;2:建立客戶端;3:檢視叢集資訊 1:叢集名稱 預設叢集名為elasticsearch,如果叢集名稱和指定的不一致則在使用節點資源時會報錯。 2:嗅探功能 通過client
Nginx運維之四 Http模組引數配置
Nginx運維之四 Http模組引數配置 案例 核心引數配置 client_body_buffer_size client_body_temp_path client_body_timeout client_header_b
MySQL複製(二)--基於二進位制日誌檔案(binlog)配置複製
基礎環境: 主庫 從庫 伺服器IP地址 192.168.10.11 192.168.10.12 版本 5.7.24 5.7.24 已存在的資料庫 mysql> show databases; +--------------------+ | Databa
《MySQL 學習筆記》 SQL語句之庫操作(二)
efault create AC format AS rman databases class mysq 創建數據庫 語法規則: create databases 數據庫名稱 [庫選項] 創建一個lyshark數據庫 MariaDB [(no
MySQL筆記系列之二檔案
檔案 MySQL資料庫和InnoDB儲存引擎由以下幾種型別檔案:引數檔案,日誌檔案,socket檔案,pid檔案,MySQL表結構檔案,儲存引擎檔案。 引數檔案(配置檔案) 當MySQL例項啟動時,MySQL會先去讀取一個配置引數檔案,用來尋找資料庫的各種檔案所在的位置以及指定某些初