1. 程式人生 > 其它 >kafka入門與安裝

kafka入門與安裝

技術標籤:python進階kafkakafka

Kafka是最初由Linkedin公司開發,是一個分散式、分割槽的、多副本的、釋出-訂閱模式,基於zookeeper協調的分散式日誌系統。

簡介

Kafka用於構建實時資料管道和流式應用程式。它的底層是由java和scala語音所編寫的,具有水平可擴充套件性、容錯性、速度極快,並在數千家公司投入生產。

特點

  • 釋出和訂閱記錄流,類似於訊息佇列或企業訊息傳遞系統。
  • 以容錯、持久的方式儲存記錄流。
  • 流式處理資訊,即當資料產生時立即處理。

適用場景

  • 日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer;
  • 應用解耦:Kafka可作為不同應用間傳遞訊息的中間平臺。例如銷售系統和庫存系統中間的訊息傳遞,降低系統耦合,提升整體穩定性。
  • 流量削鋒:在秒殺應用中,防止使用者數量過大而引起伺服器超負荷,可以將使用者請求寫入訊息佇列,再由秒殺系統消費訊息佇列中的使用者請求資料。

工作模式

基本概念

1. broker
kafka 叢集可以由多個 kafka 例項組成,每個例項(server)稱為 broker,可以理解為一次單機安裝kafka是一個例項。broker儲存topic的資料。
2. Topic
每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。每一個生成者都可以往一個或多個topic中寫入資訊,消費者可以從已訂閱的topic中消費資訊。(同一個Topic的資料在實際上可能是存放在不同的broker的)

3. Partition
topic中的資料分割為一個或多個partition。每個topic至少有一個partition。每個partition中的資料使用多個segment檔案儲存。
4. Producer
生產者即資料的釋出者,該角色將訊息釋出到Kafka的topic中。broker接收到生產者傳送的訊息後,broker將該訊息追加到當前用於追加資料的segment檔案中。
5. Consumer
消費者可以從broker中讀取資料。消費者可以消費多個topic中的資料。
6. Consumer Group
每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)
7. Leader
每個partition有多個副本,其中有且僅有一個作為Leader,Leader是當前負責資料的讀寫的partition。
8. Follower
Follower跟隨Leader,所有寫請求都通過Leader路由,資料變更會廣播給所有Follower,Follower與Leader保持資料同步。如果Leader失效,則從Follower中選舉出一個新的Leader。

單機版安裝

一、環境準備

  • 安裝java環境,linux環境,可使用sudo apt install openjdk-8-jdk命令安裝。
  • 安裝zookeeper,apt install zookeeper下載,預設下載路徑為: /usr/share/zookeeper/bin
  • 啟動zookeeper:./zkServer.sh start
  • 下載kafka安裝包並解壓:
wgethttps://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.12-2.7.0.tgz
tar-zxvfkafka_2.12-2.7.0.tgz
  • 修改配置檔案

Python介面

kafka提供了python呼叫介面,通過kafka模組,可以使用python建立kafka叢集的生產者和消費者。

#!/usr/bin/envpython
#-*-coding:utf-8-*-
fromkafkaimportKafkaConsumer
fromkafkaimportKafkaProducer

classPipe:
def__init__(self,group_id=None):
self.topic='YourTopicName'
brokers=BROKERS[YourIpAddress:9092]
self.consumer=KafkaConsumer(bootstrap_servers=brokers,auto_offset_reset='latest',group_id=group_id,fetch_min_bytes=1024)
self.producer=KafkaProducer(bootstrap_servers=brokers)

defconsume(self):
self.consumer.subscribe([self.topic])
res=[]
whileTrue:
formessageinself.consumer:
res.append(message.value)
returnres

defproduce(self,data):
raw_data=json.dumps(data).encode('utf-8')
self.producer.send(self.topic,raw_data)

if__name__=='__main__':
data={'age':18,'name':'Lihua'}
pipe=Pipe()
pipe.produce(data)
pipe.consume()

更多內容歡迎關注個人公眾號:PythonAndDataTech
在這裡插入圖片描述