1. 程式人生 > 實用技巧 >使用Python進行 kafka的生產與消費

使用Python進行 kafka的生產與消費

kafka的生產與消費

在生產前需要 .需要建立一個topic,和消費的的groupid

比如可以在kafka管理系統中建立,不需要手動敲命令建立

1.建立topic和繫結消費組

2.kafka的生產

import json
from kafka import KafkaProducer


topic_name="active_user_simplified"  #生產的topic
kafka_addr = '172.17.9.151:9092,172.17.9.157:9092,172.17.9.155:9092'


def create_kafaka():
    """
    寫入kafka資料,useractive資料,如果使用者活躍,useractiveUpdata服務就會消費
    :return:
    
""" producer = KafkaProducer(bootstrap_servers=kafka_addr, value_serializer=lambda m: json.dumps(m).encode()) for i in range(1): data = { "app_key": "07b6ed26c8bcce540204c8f7", "uid": 8004540429, "platform": "W",
"type": "active_terminate", "itime": 1608977776 } result=producer.send(topic_name, data) print(result) create_kafaka()

3.kafka的消費

from kafka import KafkaConsumer

topic_name="active_user_simplified"
kafka_addr = '172.17.9.151:9092,172.17.9.157:9092,172.17.9.155:9092
' def consumer_kafaka(): group_id = "group-segment-useractiveUpdate" consumer = KafkaConsumer(topic_name, bootstrap_servers=kafka_addr, group_id=group_id, auto_offset_reset='earliest') for msg in consumer: print(msg.value) consumer_kafaka()

消費資料的日誌