1. 程式人生 > >Kafka筆記二之Topic操作,檔案引數配置

Kafka筆記二之Topic操作,檔案引數配置

以下的例子我只啟動了一個shb01,沒有加入139

主題Topic的一般操作(增刪改查),通過指令碼kafka-topics.sh來執行

建立

[[email protected] bin]# kafka-topics.sh --create--topic hello --zookeeper shb01:2181 --partition 2 --replication-factor 1

Created topic "hello".

--partition 2表示分割槽

--replication-factor 1表示副本因子,之前說過broker無主副之分但是partition有主副之分,所以此引數的值不能大於broker數就是kafka服務節點數,我的例子中只有一個shb01節點所以如果指定2會報錯的。

查詢

[[email protected] bin]# kafka-topics.sh --list--zookeeper shb01:2181

hello

也可以指定topic

[[email protected] bin]# kafka-topics.sh --list --zookeepershb01:2181 --topic hello

查詢指定topic詳細資訊

[[email protected] bin]# kafka-topics.sh--describe --zookeeper shb01:2181 --topic hello

Topic:hello     PartitionCount:2        ReplicationFactor:1     Configs:

       Topic: hello    Partition: 0    Leader: 0       Replicas: 0     Isr: 0

       Topic: hello    Partition: 1    Leader: 0       Replicas: 0     Isr: 0

我只有一個broker,id為0

Leader: 0,Partition的leader所在broker的id

Replicas: 0,Partition的副本的brokerlist

Isr: 0,表示可用的broker的id

查詢topic詳細資訊

[[email protected] bin]# kafka-topics.sh--describe --zookeeper shb01:2181

修改,只能增加partition數量不能減少

[[email protected] bin]# kafka-topics.sh  --alter --zookeeper  shb01:2181 --topic hello --partition 3

[[email protected] bin]# kafka-topics.sh--describe --zookeeper  shb01:2181--topic hello

Topic:hello     PartitionCount:3        ReplicationFactor:1     Configs:

       Topic: hello    Partition: 0    Leader: 0       Replicas: 0     Isr: 0

       Topic: hello    Partition: 1    Leader: 0       Replicas: 0     Isr: 0

       Topic: hello    Partition: 2    Leader: 0       Replicas: 0     Isr: 0

刪除

[[email protected] bin]# kafka-topics.sh --delete--zookeeper shb01:2181 --topic hello

刪除只是標記刪除,此topic仍然可用,需要修改server.properties檔案中的引數

delete.topic.enable=true開啟刪除然後重啟kafka,此引數在檔案中預設是沒有的需要手動加入

生產者,消費者

Kafka自帶的生產者和消費者只是用來測試

生產者

[[email protected] bin]# kafka-console-producer.sh--broker-list shb01:9092 --topic topic_shb01

--broker-list表示broker地址,多個地址用逗號分開

--topic 表示向那個主題生產訊息

消費者

[[email protected] bin]# kafka-console-consumer.sh--topic topic_shb01 --zookeeper shb01:2181,192.168.79.139:2181 --from-beginning


--topic表示消費那個主題

topic\whitelist\blacklist:

1、  具體的單個topic

[[email protected] bin]# kafka-console-consumer.sh--topic topic_shb01 --zookeeper shb01:2181,192.168.79.139:2181 --from-beginning

2、多個白名單topic字串[逗號隔開]。

         --topic  t1,t2表示只消費t1,t2主題

         [[email protected]]# kafka-console-consumer.sh --whitelist t1,t2 --zookeepershb01:2181,192.168.79.139:2181 --from-beginning

3、多個黑名單topic字串[逗號隔開]。

         --topic  t1,t2表示除過t1,t2主題之外消費所有主題

[[email protected]]# kafka-console-consumer.sh --blacklist t1,t2 --zookeepershb01:2181,192.168.79.139:2181 --from-beginning

--zookeeper表示kafka叢集的zk地址

--from-beginning表示從前開始,就是啟動消費者之前生產的訊息也可以消費

最後被標記刪除的主題也可以消費

檔案引數配置

broker,server.propertie

1.生產者生產傳送訊息,broker快取資料達到一定閥值或一定時間後就會flush到磁碟中這樣可以減少IO呼叫。在config目錄下的server.properties中有兩個引數用於控制flush

# The number of messages to accept beforeforcing a flush of data to disk

#log.flush.interval.messages=10000

# The maximum amount of time a message cansit in a log before we force a flush

#log.flush.interval.ms=1000

2.kafka資料被消費後會依然存在於磁碟中,消費者配合zookeeper讀取資料,通常資料會儲存168小時,可以在server.properties中通過引數進行控制

# The minimum age of a log file to beeligible for deletion

log.retention.hours=168

#log.retention.bytes=1073741824

第一個是168小時,第二個引數是按照大小超過則刪除

log.retention.check.interval.ms=300000

這個是掃描的間隔時間,滿足上面兩個條件(貌似是二選一)則執行刪除

生產者,producer.properties

1.Producer可以指定將訊息傳送給具體的partition.在配置檔案中通過partitioner.class指定自定義分割槽器。包含包名類名

partitioner.class= kafka.ProducerTest

2.通過producer.type=sync引數控制生產者使用非同步或同步方式傳送資料。

非同步或者同步傳送

同步是指:傳送方發出資料後,等接收方發回響應以後才發下一個數據的通訊方式。 

非同步是指:傳送方發出資料後,不等接收方發回響應,接著傳送下個數據的通訊方式。

3.生產者採用非同步方式可以提高發送效率,先將資料快取在磁碟然後一次性發送出去。

有兩個引數來控制什麼時候傳送訊息,一個按照時間間隔另一個按照資料大小

# maximum time, in milliseconds, forbuffering data on the producer queue

#queue.buffering.max.ms=

# the maximum size of the blocking queuefor buffering on the producer

#queue.buffering.max.messages=

消費者,consumer.properties

Kafka中每個消費者都屬於一個group且都已一個組id

消費資料

組內:一個消費者可以消費多個partition,但是一個topic下的partition只能被一個group中的一個消費者消費。下圖中group2中有4個消費者但是topic2種只有3個分割槽此時c4處於空閒狀態。此時可以增減topic2的分割槽數來增加消費能力。


組間:每個消費組之間消費同一份資料互不影響。

業務複雜增加topic,資料量大增加分割槽。分割槽越多意味著consumer的消費能力越強。

Message是通訊的基本單位儲存於topic的分割槽中,parttion在儲存層面表現為一個append.log檔案,新的message會直接追加到檔案尾部,每條訊息在檔案中的位置稱為offset(偏移量)這個資訊儲存在zookeeper中,所以消費者在讀取message時需要zk配合。Message有型別大小偏移量3個屬性,其中offset相當於message在分割槽中的id


下面是一個檢視offset的命令,這個例子我在資料中找的,沒有實際操作過

bin/kafka-run-class.shkafka.tools.ConsumerOffsetChecker --zookeeper zkserver1:2181,zkserver2:2181--group consumer_demo_11

kafka不能指定offset進行消費

kafka叢集容錯

kafka叢集中一個broker掛了後zk會選擇讓別的broker提供服務,當有broker被修復後zk會有一個自動的均衡策略讓此broker再次成為topic中partition的leader

topic:hello    partition:0    leader:0

topic:hello    partition:1    leader:2

topic:hello    partition:2    leader:1

此引數指定自動均衡策略,300秒後kafka執行均衡操作

Auto.leader.rebalance.enable=true

leader.imbalance.check.interval.seconds 預設值:300。

也可以手動執行

bin/kafka-preferred-replica-election.sh--zookeeper zkserver1:2181,zkserver2:2181


相關推薦

Kafka筆記Topic操作,檔案引數配置

以下的例子我只啟動了一個shb01,沒有加入139 主題Topic的一般操作(增刪改查),通過指令碼kafka-topics.sh來執行 建立 [[email protected] bin]# kafka-topics.sh --create--topic hel

Kafka筆記java操作

maven依賴,我使用的是版本是0.8.22,scala是2.11 <dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka_2.11&l

薛開宇學習筆記總結筆記--caffe 中solver.prototxt;train_val.prototxt的一些引數介紹

原文地址:http://blog.csdn.net/cyh_24/article/details/51537709 solver.prototxt net: "models/bvlc_alexnet/train_val.prototxt" test_iter: 10

Redis筆記Redis命令操作簡介及五種value資料型別

Redis是使用鍵值儲存資料,key必須是字串value支援五種資料型別,最新版本又新增加了兩種這裡暫不介紹。下面會介紹redis的一些基本命令的使用。 首先啟動redis並登入客戶端。   Redis中的命令不區分大小寫。 一:基本命令及string型別 1:help

node.js學習筆記版本問題

nodejs targe tle 下一個 .cn blank 網站 mage 功能 一、版本說明 進入node.js官網https://nodejs.org/en/download/ 點擊上面的【All download options】進入到所有下載列表的地址 下載地

多執行緒學習筆記JUC元件

概述   為了對共享資源提供更細粒度的同步控制,JDK5新增了java.util.concurrent(JUC)併發工具包,併發包新增了Lock介面(以及相關實現類)用來實現鎖功能,它提供了與synchronized關鍵字相似的同步功能,只是在使用時需要顯式地獲取和釋放鎖,還具備內建鎖不具備的自由操作鎖

mysql筆記資料增刪改查

-- 資料增刪改查(curd)    1. -- 增加 insert          -- 全列插入  值和表的欄位的順序一一對應         -- insert [

mysql筆記操作與 表操作

資料庫使用者名稱: root 密碼:            mysql char(5) 長度固定為5的字串 ab  --> "ab   " varchar(5) abc --> "abc" 不能夠超

[基礎篇]ESP8266-NonOS學習筆記()Hello World!

相信聰明伶俐的你,肯定完成了上一篇文章的環境搭建,如果你是第一次看本系列文章,可以先去看看前兩篇文章,先了解一下並完成一些基礎操作。 這裡推薦大家安裝一下Git這個灰常炒雞好用的分散式版本管理神器,本人是非常喜歡這款由Linux之父Linus Torvalds開發的工具,可以很方便讓我完成一些工作,比如說c

[基礎篇]ESP8266-NonOS學習筆記(四)GPIO操作(按鍵、LED、中斷、定時器)

  本篇文章我們再回到基礎篇,難道你還以為我會講UDP?啊哈哈哈,UDP肯定是會講的,但是應用場景不是很多,我們放到後面再講,不過也是簡單一講,畢竟熟悉UDP協議的人來說,都知道UDP一種不可靠的傳輸協議,可以這樣形容“我(Client)只管發,你(Server)愛收不收”,所以在一些實際應用場景

MyBatis筆記增刪改查

一、環境配置 將mybatis-3.4.6.jar和lib目錄下的jar包全匯入到專案中,不匯入lib中的jar包會報日誌錯誤。 注意:由於mybatis是基於JDBC實現的,所以需要匯入jdbc的jar包 二、編寫配置檔案 1、在src/pojo目錄下新建U

Django入門models操作試驗

第一部分: 1.使用 manage.py 工具載入我們的專案來啟動 Python shell : (env) D:\Development\myproject\myproject>python manage.py shell Python 3.6.4 (v3.6.4

RocketMQ學習筆記【DefaultMQPushConsumer使用與流程原理分析】

版本:        <dependency>        <groupId>org.apache.rocketmq</groupId>   &

Detectron config.py 檔案引數 快速跑自己模型 Mmdetection

Detectron: Detectron 1、引數理解 中文引數說明 官方引數說明 多GPU配置 2、格式轉換 訓練集格式轉換 coco格式 3、訓練流程 訓練流程1 訓練流程2 4、其他 新增垂直翻轉 faster rcnn流程:附帶部分引數解釋 Faster-RCNN演算

openwrt的研習筆記刷機(TL-WR703N)

昨天是稍微瞭解了下openwrt,今天本想直接進行編譯環節的,但工作有點忙,所以決定直接刷機先吧 先了解結果,再從其他地方朝著刷機的結果進攻會比較好 有目的去做事會比較容易辦成。呵呵 (如果英語比較不錯的,建議還是去看原文,出錯可不怪我哦,o(∩_∩)o) 首

Elasticsearch筆記java操作es

Java操作es叢集步驟1:配置叢集物件資訊;2:建立客戶端;3:檢視叢集資訊 1:叢集名稱       預設叢集名為elasticsearch,如果叢集名稱和指定的不一致則在使用節點資源時會報錯。 2:嗅探功能        通過client

Nginx運維四 Http模組引數配置

Nginx運維之四 Http模組引數配置 案例 核心引數配置 client_body_buffer_size client_body_temp_path client_body_timeout client_header_b

MySQL複製()--基於二進位制日誌檔案(binlog)配置複製

  基礎環境:   主庫 從庫 伺服器IP地址 192.168.10.11 192.168.10.12 版本 5.7.24 5.7.24 已存在的資料庫 mysql> show databases; +--------------------+ | Databa

《MySQL 學習筆記》 SQL語句操作()

efault create AC format AS rman databases class mysq 創建數據庫 語法規則: create databases 數據庫名稱 [庫選項] 創建一個lyshark數據庫 MariaDB [(no

MySQL筆記系列檔案

檔案 MySQL資料庫和InnoDB儲存引擎由以下幾種型別檔案:引數檔案,日誌檔案,socket檔案,pid檔案,MySQL表結構檔案,儲存引擎檔案。 引數檔案(配置檔案) 當MySQL例項啟動時,MySQL會先去讀取一個配置引數檔案,用來尋找資料庫的各種檔案所在的位置以及指定某些初