python之操作kafka
python操作kafka
kafka簡介(摘自百度百科)
簡介:
afka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些資料通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop的一樣的日誌資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的訊息處理,也是為了通過叢集來提供實時的消費。
特性:
通過O(1)的磁碟資料結構提供訊息的持久化,這種結構對於即使數以TB的訊息儲存也能夠保持長時間的穩定效能。
高吞吐量[2] :即使是非常普通的硬體Kafka也可以支援每秒數百萬[2] 的訊息
支援通過Kafka伺服器和消費機叢集來分割槽訊息
支援Hadoop並行資料載入
術語:
Broker
Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker
Topic
每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)
Partition
Partition是物理上的概念,每個Topic包含一個或多個Partition.
Producer
負責釋出訊息到Kafka broker
Consumer
訊息消費者,向Kafka broker讀取訊息的客戶端。
Consumer Group
每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)。
一、安裝
在pypi.python.org有很多關於操作kafka的元件,我們選擇weight最高的kafka 1.3.5
1、有網的情況下執行如下命令安裝:
pip install kafka
easy_install kafka
2、無網的情況下把原始碼下載下來,上傳到需要安裝的主機
壓縮包:kafka-1.3.5.tar.gz
解壓: tar xvf kafka-1.3.5.tar.gz
執行安裝命令: cd kafka-1.3.5
python setup.py install
如安裝報依賴錯誤,需要把依賴的元件也下載下來,然後進行安裝,同樣的方法,不贅述!
二、按照官網的樣例,先跑一個應用
1、生產者:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092']) #此處ip可以是多個['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
for i in range(3):
msg = "msg%d" % i
producer.send('test', msg)
producer.close()
2、消費者(簡單demo):
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
啟動後生產者、消費者可以正常消費。
3、消費者(消費群組)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
group_id='my-group',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
啟動多個消費者,只有其中可以可以消費到,滿足要求,消費組可以橫向擴充套件提高處理能力
4、消費者(讀取目前最早可讀的訊息)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
auto_offset_reset='earliest',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
auto_offset_reset:重置偏移量,earliest移到最早的可用訊息,latest最新的訊息,預設為latest
原始碼定義:{'smallest': 'earliest', 'largest': 'latest'}
5、消費者(手動設定偏移量)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test',
bootstrap_servers=['172.21.10.136:9092'])
print consumer.partitions_for_topic("test") #獲取test主題的分割槽資訊
print consumer.topics() #獲取主題列表
print consumer.subscription() #獲取當前消費者訂閱的主題
print consumer.assignment() #獲取當前消費者topic、分割槽資訊
print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量
consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,從第5個偏移量消費
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
6、消費者(訂閱多個主題)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0')) #訂閱要消費的主題
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #獲取當前主題的最新偏移量
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
7、消費者(手動拉取訊息)
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
msg = consumer.poll(timeout_ms=5) #從kafka獲取訊息
print msg
time.sleep(1)
8、消費者(訊息掛起與恢復)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
print num
print consumer.paused() #獲取當前掛起的消費者
msg = consumer.poll(timeout_ms=5)
print msg
time.sleep(2)
num = num + 1
if num == 10:
print "resume..."
consumer.resume(TopicPartition(topic=u'test', partition=0))
print "resume......"
pause執行後,consumer不能讀取,直到呼叫resume後恢復。