1. 程式人生 > >python環境下運用kafka對資料實時傳輸

python環境下運用kafka對資料實時傳輸

背景:

為了滿足各個平臺間資料的傳輸,以及能確保歷史性和實時性。先選用kafka作為不同平臺數據傳輸的中轉站,來滿足我們對跨平臺資料傳送與接收的需要。

kafka簡介:

Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規範的實現。kafka對訊息儲存時根據Topic進行歸類,傳送訊息者成為Producer,訊息接受者成為Consumer,此外kafka叢集有多個kafka例項組成,每個例項(server)成為broker。無論是kafka叢集,還是producer和consumer都依賴於zookeeper來保證系統可用性叢集儲存一些meta資訊。

總之:kafka做為中轉站有以下功能:1.生產者(產生資料或者說是從外部接收資料)2.消費著(將接收到的資料轉花為自己所需用的格式)

環境:

1.python3.5.x

2.kafka1.4.3

3.pandas

準備開始:

1.kafka的安裝

pip install kafka-python

2.檢驗kafka是否安裝成功

3.pandas的安裝

pip install pandas

4.kafka資料的傳輸

直接擼程式碼:

# -*- coding: utf-8 -*-
'''
@author: 真夢行路
@file: kafka.py
@time: 2018/9/3 10:20
'''
import sys
import json
import pandas as pd
import os
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

KAFAKA_HOST = "xxx.xxx.x.xxx"  #伺服器埠地址
KAFAKA_PORT = 9092             #埠號
KAFAKA_TOPIC = "topic0"        #topic

data=pd.read_csv(os.getcwd()+'\\data\\1.csv')
key_value=data.to_json()
class Kafka_producer():
    '''
    生產模組:根據不同的key,區分訊息
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, key):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.key = key
        self.producer = KafkaProducer(bootstrap_servers='{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort)
        )

    def sendjsondata(self, params):
        try:
            parmas_message = params      #注意dumps
            producer = self.producer
            producer.send(self.kafkatopic, key=self.key, value=parmas_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print(e)


class Kafka_consumer():


    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid,key):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.key = key
        self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,
                                      bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                                          kafka_host=self.kafkaHost,
                                          kafka_port=self.kafkaPort)
                                      )

    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt as e:
            print(e)

def sortedDictValues(adict):
    items = adict.items()
    items=sorted(items,reverse=False)
    return [value for key, value in items]

def main(xtype, group, key):
    '''
    測試consumer和producer
    '''
    if xtype == "p":
        # 生產模組
        producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
        print("===========> producer:", producer)
        params =key_value
        producer.sendjsondata(params)


    if xtype == 'c':
        # 消費模組
        consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group,key)
        print("===========> consumer:", consumer)

        message = consumer.consume_data()
        for msg in message:
            msg=msg.value.decode('utf-8')
            python_data=json.loads(msg)   ##這是一個字典
            key_list=list(python_data)
            test_data=pd.DataFrame()
            for index in key_list:
                print(index)
                if index=='Month':
                    a1=python_data[index]
                    data1 = sortedDictValues(a1)
                    test_data[index]=data1
                else:
                    a2 = python_data[index]
                    data2 = sortedDictValues(a2)
                    test_data[index] = data2
                    print(test_data)



            # print('value---------------->', python_data)
            # print('msg---------------->', msg)
            # print('key---------------->', msg.kry)
            # print('offset---------------->', msg.offset)



if __name__ == '__main__':
    main(xtype='p',group='py_test',key=None)
    main(xtype='c',group='py_test',key=None)

資料1.csv如下所示:

幾點注意:

1.一定要有一個伺服器的埠地址,不要用本機的ip或者亂寫一個ip不然程式會報錯。(我開始就是拿本機ip懟了半天,總是報錯)

2.注意資料的傳輸格式以及編碼問題(二進位制傳輸),資料先轉成json資料格式傳輸,然後將json格式轉為需要格式。(不是json格式的注意dumps)

例中,dataframe->json->dataframe

3.例中dict轉dataframe,也可以用簡單方法直接轉。

eg:  type(data) ==>dict,data=pd.Dateframe(data)

參考文獻: