1. 程式人生 > >訊息中介軟體Kafka學習知識總結

訊息中介軟體Kafka學習知識總結

1. 什麼是訊息系統?

  • (1)訊息系統是負責將資料從一個程式傳輸到另一個應用程式,因此應用程式可以專注於資料,不用擔心如何共享它
    分散式訊息傳遞給基於可靠訊息佇列的概念.
    訊息在客戶端應用程式和訊息傳遞系統之間非同步排隊
  • (2)有兩種型別的訊息模式可用
    a.點對點的方式
    b.釋出-訂閱訊息系統(pub-sub)

2.訊息系統問題?

  • (1)訊息系統怎麼知道某一個主題被哪些訂閱者訂閱?
    需要進行維護,訊息需要推送給訂閱者
  • (2)採用訂閱方式,會存在訂閱者越多,訊息系統維護的元資料很多,name傳送的推送的時候就曾大
    增加訊息系統的處理單元(加主機/CPU)
  • (3)如果在推送的時候,訂閱者掛掉? 那麼之後釋出的訊息該如何處理?
    通過消費者拉的方式去獲取訊息

3.訊息系統的主題介紹

RSS :中指是頻道
訊息系統當中:主題(訊息分類)
主題設計的原則?
如果沒有主題的話,訂閱者接受到的訊息將會是訊息系統中的所有資訊
有主題的話,只需要關注自己想要的內容.

4.系統的演變過程?

一個普通的j2EE專案(所有的東西全在一個系統裡面)====>叢集擴充套件(1變N) ---->分散式(某一些伺服器只處理特定的業務)
業務與業務之間就回存在資料傳遞!

5.業務員場景

1.應用解耦
	(訂單資訊的佇列)
2.流量肖鋒
	可控制活動人數
	可以緩解短時間內的高流量壓垮應用
3.日誌處理
	解決大量日誌傳輸問題,負責日誌的採集,定時寫入kafka佇列,日誌資料的接受儲存和換髮
4.訊息通訊,聊天室,實時對話...

6.什麼是kafka?

  • (1)使用Scala語言開發,kafka是一個分散式,分割槽的,多副本的,多訂閱者的日誌系統,可用於web/ngnix日誌,搜尋日誌,監控日誌,訪問日誌.
  • (2)目前支援多種客戶端語言: java python c++ php …
  • (3)apache kafka 是一個分散式 釋出-訂閱訊息系統和一個強大的佇列,可以處理大量的資料,並且使能夠將訊息從一個斷電傳遞到另一個斷電.
    kafka適合離線和線上訊息消費.kafka訊息保留在磁碟上,兵仔叢集內複製以防止資料丟失,kafka構建在zookeeper同步服務至上
    它與Apache storm 和 spark非常好的整合,用於實施流式資料分析.

7.kafka的好處?

可靠性 – 分散式 ,分割槽 複製 和容錯
可擴充套件性 – kafka訊息傳遞系統輕鬆縮放
耐用性 --分散式提交日誌,儘快保留在磁碟上
效能—對釋出和訂閱訊息都具有高吞吐量,即使儲存量TB的訊息,也可以保持穩定的效能
kafka非常的快

8.kafka的應用?

用於操作實時監控資料,分散式應用程式資訊的統計
日誌聚合解決方案,收集日誌
流處理-流行的框架從主體中讀取資料,兵對其進行處理,將處理後的資料寫入新主題.

9.kafka術語?

broker: kafka叢集包含一個或多個服務例項,建議一個broker包含一個例項
topic: 每釋出到kafka叢集的訊息都有一個類別—主題
partition: 物理上的概念,每個topic包含一個或多個partition(分割槽)
producer: 負責釋出訊息到kafka broker
consumer: 訊息消費者, 向kafka讀取訊息的客戶端
consumer group : 每個consumer屬於一個特定的consumer group

10.kafka的安裝應用

  • (1)kafka中需要用到zk ,先啟動zk,如果在偽分散式下,kafka已經集成了zk (後天自啟動)
    nohup zookeeper-server-start.sh /kafka-2.11-2.0.0/config/zookeeper.properties &

  • (2)啟動broker
    nohup kafka-server-start.sh /kafka-2.33-2.0.0/config/server.properties &

  • (3)測試
    模擬一個生產者,向另一個消費者傳送資料
    3.1建立一個主題

      kafka-topics.sh --create --zookeeper localhost:2181 --topic mytopic --partitions 1 --replication-factor 1
      --topic : 主題的名稱
      --partitions: 主題下有幾個分割槽
      --replication-factor: 副本數
    

    3.2建立一個生產者

      kafka-console-producer.sh --topic mytopic --broker-list localhost:9092
    

    3.3消費者的建立

      kafka-console-consumer.sh --topic mytopic --zookeeper localhost:2181
    

11 kafka主題(topic)

訊息在進入kafka叢集的時候,事宜主題進行資料歸納[一條訊息必須屬於某個主題]
在kafka叢集中,可以有無數的主題
從生產者角度: 它所操作的單元,一般情況下以主題為單位
從消費者角度: 它鎖操作的單元,一般情況下是以主題為單位
生產者 消費者可以以主題更細的單位操作-->分割槽
從kafka角度: 沒有限制生產者也沒有限制消費者
生產者與消費這咋處理資料的時候,必須要知道要操作的主題是哪一個.

12.如何建立主題?

  • (1)一個broker下是否可以建立多個分割槽?
    可以,broker數與分割槽屬無關
    kafka-topics-sh --create --topic mytopic2 --zookeeper localhost:2181 --partitions 3–replication-factor 1
  • (2)一個broker下是否可以建立多個副本?
    不可以,建立主題時,副本應為應該小魚等於可用的broker數
    (3) 配置屬性 --config
    eg: --config max.message.bytes=6400 --config flush.messages=1 最大訊息位元組, flush一次

13.檢視有哪些主題?

  • (1)主題詳細資訊
    kafka-topics.sh --describe --zookeeper localhost:2181
    在kafka中每一個分割槽會有一個編號:從0開始
    Leader : broker.id=0 應為在kafka中有多個副本的話,就回存在leader和follower的關係
    表示當前這個副本為leader所在的broker是哪一個 server.properties檔案中,如果有多個broker ,broker.id 不能重複
    Replicas: 所有的副本列表 0,1,2
    Isr:可用列表數 0,1
  • (2)檢視主題名稱:
    kafka-topics.sh --list --zookeeper localhost:2181
  • (3)某一個主題查詢
    kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic
  • (4) 某一個是佛UCUN在
    kafka-topics.sh --describe --zookeeper localhost:2181 | grep mytopic
  • (5)修改主題
    kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config flush.messages=1
  • (6)刪除配置引數
    kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --delete-config flush.messages
  • (7)刪除
    kafka-topics.sh --zookeeper localhost:2181 --delete --topic mytopic
    當執行命令之後,topic不是物理刪除,而是一個標記刪除的操作
    mytopic - marked for deletion
    標記刪除後的主題還可以繼續生產資料
  • 注意?
    當伺服器重啟,就回刪除已經標記的topic ,這個和版本有關係
    真正的物理刪除,需要在server.properties檔案中加入引數
    delete .topic.enable=true (一般放在zookeeper程式碼註釋之上)

14 分割槽 partitions ?

分割槽是針對某一個主題下資料的拆分
讓消費者在拿資料的時候,速度會更快
一個broker服務下可以有多個分割槽
分割槽中資料是有序的,且不可修改,但不同分割槽中的資料可以是無序的

生產方的生產資料順序與拿到的順序不一致? 可能出現先退貨,然後再發貨
讓同一類的資料進入到同一個分割槽裡!同一個分割槽下的資料是有序的
如何保證一個主題下的資料,一定有序的? 生產什麼樣的順序,那麼消費的時候也是什麼樣的順序?
讓主題只有一個分割槽

新進來的訊息會追加到某一個分割槽的尾部

(2) 分割槽與消費組之間的關係?

   消費組: 由一個或者多個消費者組成,同一個組中的消費者對於同一條訊息只消費一次
   當某一個主題中的分割槽數對於消費組來說應該小於等於該主題下的分割槽數
   eg :  某一個主題有4個分割槽,那麼消費組中的消費者應該小於4,而且最好分割槽與分割槽數成整倍
   同一個分割槽下的資料,在同一時刻,不能被同一個消費組的不同消費者消費

15.offset資料相應的編號

偏移量: 記錄當前有多少記錄數,而且讓消費者可以知道自己消費到什麼地方
可以讓消費者自定義選擇消費某一條訊息對於同一條訊息,消費者可以消費多次
確定一條唯一的資料:
訊息可以通過 offset partition topic 跟蹤記錄

16.分割槽下的副本因子

建立副本因子的時候,副本因子應該小於等於可用的broker數
副本應為操作的單位是分割槽為單位
當有多個副本數時,kafka並不是將多個副本同時對外讀寫提供服務

作用?
讓kafka讀取和寫入資料時的高可靠
副本因子數時包含本身的 同一個副本因子不能放在同一個broker中
在有多個副本的情況下,kafka會為同一個分割槽下的分割槽,設定角色關係,一個leader和多個follower
leader負責和外面(生產者/消費者)的角色,進行讀寫處理

生產 ----> 副本關係?
生產方: 可以採用同步或者非同步的方式
同步: 傳送一批資料給kafka之後,等待kafka返回結果
非同步: 傳送一批資料給kafka之後,只提供一個回撥函式
生產者獲取返回結果是什麼時候? (確認機制)
 1)需要所有的副本確認才表示該條訊息寫入成功,返回給生產者.
    可能出現,在返回給生產者時,未及時返回, 出現重複傳送
 2)不許要確認.資料丟失
	再次傳送
 3)只需要leader確認
 4)部分確認

如果某一個分割槽有三個副本因子就算其中一個掛掉,那麼只會在剩下的2個鐘選擇一個leader.
不會再其他的broker鍾,另啟動一個副本,[因為另外啟動一個副本的話,存在資料傳遞,只要主機之間有資料傳遞,就會長時間暫用網路IO]
但是kafka是一個高吞吐量的訊息系統,這種情況不允許傳送,所以不會再另外一個broker中啟動.
如果所有的副本都掛了? 生產者如果生產的資料時指定分割槽的話,將寫入不成功.
將重新啟動,最先重新啟動好的作為leader

ISR(可用副本) : 選擇副本做為leader
ack機制: 確認機制

在使用者獲取資料的時候,可以採取kafka的兩種API
高階API和低階API
高階API:隱藏consumer和broker的相關細節,相關資訊儲存在zookeeper中  [推薦]
	讓使用者使用時很方便,大部分操作都是封裝好的. 比如: 當前消費到哪個位置下.  但是不夠靈活
低階API: 沒有進行包裝,所有操作由使用者決定,需要維護leader broker
	 如: 自己儲存某一個分割槽下的記錄,你當前消費的位置

leader broker是之前的一種說法,現在其它上不用
kafka叢集中 ---> 包含了很多的broker,但是這麼多的broker中也會有一個老大存在
	是在kafka節點中的一個臨時節點,去建立相應的資料. controller broker (最先建立的broker)
	controller broker作用: 管理所有的broker
controller_epoch 選舉
brokers : broker 服務
consumers: 消費
config : kafka 配置資訊
admin -->delete-topic 標記刪除的主題

17.kafka shell

#bin/kafka-topics.sh
create,delete,describe,or change a topic
--alter		更改或新增
--create	建立
--delete	刪除(標記刪除,若要物理刪除需要在server.properties中設定)
--describe	主題資訊
--list		主題列表

--partitions  分割槽
--replication-factor 副本

建立: topic 名稱  zookeeper地址  partitions 個數 replication-factor 副本數 是必須的引數
刪除:  topic zookeeper 是必要引數,在服務端配置delete.topic.enable=true
修改: topic zookeeper 必須引數, 副本因子一般只能新增
list: topic zookeeper 必須引數
describe: zookeeper必須引數

18. 叢集搭建

1.zookeeper叢集搭建
2.kafka叢集搭建
jps

注意:
每個broker的broker.id 必須不一樣
log.dirs 修改地址,之前的在/tmp
zookeeper.connect=ziji de zk叢集地址 (多個,分割)
啟動kafka,每一臺都要啟動

19.kafka生產者程式碼:

1. 映入包
	一般安裝包的版本,maven引用的版本一致


建立主題:
bin/kafka-topics.sh --create --zookeeper master,salve01,salve02 --topic mytopic --partitions 3 --replication-factor 3
檢視詳情
bin/kafka-topics.sh --describe --zookeeper master,slave01,salve02 --topic mytopic
每個broker 上對於分割槽上都有一個副本leader

// ProduceRecord:
 /**
	2個引數: 主題,內容
	3個引數: 主題,key,內容傳送
	4個引數: 主題,分割槽,key,內容傳送
	5個引數: 主題,分割槽,時間戳,key,內容傳送
	主題: 自定義
	分割槽: 0 1 2 ...
	當沒有key的時候,消費者拿到的資料時什麼形式?
	採用內容分割槽規則,資料均衡的原則

	採用自己的key,key值決定了進入哪個分割槽
	指定了key的時候,key 相同,訊息會進入同一個分割槽

	如果按照key值進行處理(採用的是kafka內部的)
	假設一個主題下有10個分割槽:按省份處理
		四川 : 0
		廣州 : 1
	若省份大於了分割槽數...怎麼辦?
	採用自定義分割槽來解決: 得到key ,分割槽數
 */

20.kafka 消費者程式碼

// 手動提交,偏移量
 props.put("enable.auto.commit", "false");
// 設定從哪些主題下獲取資料
//consumer.subscribe(Arrays.asList("mytopic"));
 // 通過分割槽獲取
 // (主題 分割槽)
 TopicPartition tp = new TopicPartition("mytopic","0");
 // 主題下分割槽獲取資料
 consumer.assign(Arrays.asList(tp));
 
 // 同步方式提交
consumer.commitAsync();

21.提交過程

已經消費的資料對於kafka來說,會將該消費組裡面的offset值進行修改, 什麼時候去修改?
自動提交: 提交過程是通過kafka可以將offset進行移動到下一個資訊

比如資料拿到之後,存入到hbase中
如果hbase在這個時候,連線不上,如果在處理的時候已經進行了提交,那麼kafka上的offset值已經修改了, 但是hbase中沒有資料
這樣會導致資料丟失,怎麼處理?
手動提交: 將資料處理完之後,再來進行offset進行修改提交,
	  預設情況下是自動提交,修改手動提交
	  propes.put("enable.auto.commit","false");
	   // 同步方式提交
	  consumer.commitAsync();

	  若資料寫入完成, 在提交修改offset請求是,中斷產生了, 那麼修改該次的offset是失敗的
	  那麼下一次讀取同一個分割槽中的資料時,會從已經處理的offset值再次進行處理一次,造成資料重複
	  怎麼解決?

22. producer自定義分割槽

	/**
		partitioner.class=com.zhuihui.Mypartition   分割槽介面指定為自己的, 在properties檔案中,或者在生產者用的時候
	*/
	class MyPartition implements Partitioner 
	{
		/**
			topic : 主題
			key  : 
			keyBytes : key的位元組陣列
			value
			valueBytes : 
			cluster 叢集資訊
			模擬:
			135開頭  0
			138     1
			130     2
		*/
		public int partition(String topic,Object key,byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster){
			// 獲取主題下的分割槽數\
			// 如果分割槽數不為3,那麼可以進入到0
			Integer count = cluster.partitionCountForTopic(topic);
			String keyString = key.toString();
			if (keyString !=null && count==3 )
			{
				if (keyString.startWith("135")
				{
					return 0;
				}
				if (keyString.startWith("138")
				{
					return 1;
				}
				if (keyString.startWith("130")
				{
					return 2;
				}
			}
		}
	}