1. 程式人生 > >使用python操作kafka的過程記錄

使用python操作kafka的過程記錄

一、首先是安裝安裝python3.6

 在安裝時,按照網上的命令,發現報錯

sudo add-apt-repository ppa:jonathonf/python-3.6

報錯資訊:找不到add-apt-repository命令。

sudo apt-get install python-software-properties
sudo apt-get update

sudo apt install software-properties-common
sudo apt-get update

成功!然後繼續

sudo apt-get update
sudo apt-get install python3.6

安裝成功,設定python的預設為python3.6

sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.5 1
sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.6 2

更改預設值,python預設為Python2,現在修改為Python3 

sudo update-alternatives --install /usr/bin/python python /usr/bin/python2 100
sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 150

如果要切換到Python2,執行:

sudo update-alternatives --config python

安裝pip3*

curl https://bootstrap.pypa.io/get-pip.py | sudo python3.6

二、安裝zookeeper和Kafka

 按照上面的安裝又開始踩坑了。。

1、安裝jdk,本人安裝的java10.0最新版

[email protected]:/var/pywww$ java -version
java version "10.0.2" 2018-07-17
Java(TM) SE Runtime Environment 18.3 (build 10.0.2+13)
Java HotSpot(TM) 64-Bit Server VM 18.3 (build 10.0.2+13, mixed mode)

2、安裝zookeeper,

 解壓到了/usr/local/kafka/下

[email protected]:/var/pywww$ ls -ll /usr/local/kafka/
drwxr-xr-x  6 idve root 4096 7月  24 22:20 kafka_2.12-2.0.0
drwxr-xr-x 10 idve root 4096 3月  27 12:36 zookeeper-3.4.12

然後匯入環境變數

2. 配置環境變數

export zookeeper_home=/usr/local/kafka/zookeeper-3.3.6

3. 使配置生效

source /etc/profile

4. 成功標識

[[email protected]_2.10-0.9.0.1]# sh $zookeeper_home/bin/zkServer.sh start

JMX enabled by default

Using config: /usr/local/kafka/zookeeper-3.3.6/bin/../conf/zoo.cfg

grep: /usr/local/kafka/zookeeper-3.3.6/bin/../conf/zoo.cfg: No such file or directory

Starting zookeeper ... STARTED

但是~~~但是~~~我沒有成功標識,報錯了如下第一次是

Syntax error: "(" unexpected (expecting "fi")

百度得到的答案是Ubuntu的預設shell有問題,把dash改成bash,所以我又改成bash試試

[email protected]:~# cd /bin/
[email protected]:/bin# ls -l /bin/sh
lrwxrwxrwx 1 root root 4 Dec 23 22:30 /bin/sh -> dash(預設)
[email protected]:/bin# ln -sf bash /bin/sh
[email protected]:/bin# ls -l /bin/sh
lrwxrwxrwx 1 root root 4 Dec 23 22:37 /bin/sh -> bash

這樣修改後成功了啟動了。

接下來安裝kafka,沒什麼說的,解壓就可以了,需要注意的是配置檔案的說明:

然而~~~坑沒有填完,執行zookeeper時沒有問題,但是在執行kafka的時候,也就是

執行:sudo bin/kafka-server-start.sh config/server.properties

時候又報錯了:

/usr/local/kafka/kafka_2.12-2.0.0/bin/kafka-run-class.sh: 第 306 行: exec: java: 未找到

這個問題排查耗費了我3個小時,對,沒錯就是3個小時,一看就知道是java環境問題,但是網上的回答都不完整。

在配置java環境的時候,java從java6開始就沒有java/path/to/lib/tool.jar 和dt.dat這兩個檔案了。

同時,在配置java環境的時候不能只修改/etc/profile 檔案,本人就是一直按照網上的修改這個檔案報錯的。

真正的做法是修改/etc/environment 和 /etc/profile,這兩個檔案都要修改才行。

三、大招---生產和消費者程式碼

直接貼吧:

producer:

#!/var/bin/env python
# -*- coding: utf-8 -*-


import json
from kafka import KafkaProducer


def produce():
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    for i in range(2):
        msg_dict = {
            "db_config": {
                "database": "test_1",
                "host": "localhost",
                "user": "root",
                "password": "password"
            },
            "table": "msg",
            "msg": "測試下的第%s條資訊" % i
        }
        msg = json.dumps(msg_dict)
        re = producer.send('test', key=b'test', value=msg.encode("utf-8"))
    producer.close()
    print(re)


def to_str(bytes_or_str):
    if isinstance(bytes_or_str, str):
        value = bytes_or_str.encode('utf-8')
    else:
        value = bytes_or_str
    return value


if __name__ == '__main__':
    produce()

consumer:

#!/var/bin/env python
# -*- coding: utf-8 -*-


import json

from kafka import KafkaConsumer


def consumer():
    com = KafkaConsumer('test', bootstrap_servers=['localhost:9092'])
    for msg in com:
        value=json.loads(msg.value)
        rec = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, value)
        print(rec)


if __name__ == "__main__":
    consumer()

相關推薦

使用python操作kafka過程記錄

一、首先是安裝安裝python3.6  在安裝時,按照網上的命令,發現報錯 sudo add-apt-repository ppa:jonathonf/python-3.6 報錯資訊:找不到add-apt-repository命令。 sudo apt-get inst

kafka實戰教程(python操作kafka),kafka配置檔案詳解

全棧工程師開發手冊 (作者:欒鵬) kafka介紹 1.1. 主要功能 根據官網的介紹,ApacheKafka®是一個分散式流媒體平臺,它主要有3種功能: 1:It lets you publish and subscribe to strea

kfka學習筆記二:使用Python操作Kafka

1、準備工作 使用python操作kafka目前比較常用的庫是kafka-python庫,但是在安裝這個庫的時候需要依賴setuptools庫和six庫,下面就要分別來下載這幾個庫 1、下載setuptools 開啟這個網址會彈出類似下面的額下載視窗,選擇儲存檔案,點選確定

pythonkafka的基本操作

消息 server close trap cor timeout produce posit 拉取 -- coding:utf-8 -- from kafka import KafkaProducerfrom kafka import KafkaConsumerfrom k

HBase基礎操作過程記錄

這裡是雲端計算學習實驗過程中關於HBase部分的操作記錄,免得自己忘記了 安裝 HBase 1、解壓並配置環境變數 解壓安裝檔案到/software 目錄: tar –xvzfhbase-0.92.0.tar.gz 修改環境變數: export HBASE_HOME=/software

Faster R-cnn :訓練自己的資料集 caffe/python/windows 過程記錄

一、製作自己的資料集:為了方便直接新建與VOC2007一樣名稱的資料夾 VOC2007資料夾中包含: 1.Annotations中包含: .xml檔案是由labellmg.exe標定ground truth後生成的檔案: 2.ImageSets中包含:

python 3.6 lxml包安裝過程記錄

安裝其他模組時候提示需要安裝lxml 本來 pip install lxml, 提示出錯,說要安裝 Microsoft Visual C++ 14.0 is required,, 根據提示到下面網站下載: http://landinghub.visualstudio.co

在叢集上執行python編寫的spark應用程式(過程記錄

啟動hadoop[email protected]:/usr/local/hadoop-2.7.5/sbin#./start-all.sh This script is Deprecated. Instead use start-dfs.shand start-ya

python操作kafka

 python操作kafka kafka簡介(摘自百度百科) 簡介: afka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些資料通常是

Python報ImportError: No module named錯誤解決過程記錄

錯誤原因: 剛開始學習Python的小夥伴應該和我一樣不知道怎麼解決這個錯誤,其實報這個錯是因為模組的缺失,只需要使用python的命令進行安裝模組即可 解決方案: Windows環境下: 進入Python目錄:C:/Python27/下,

魚C工作室《零基礎入門學習Python》學習過程筆記記錄第一天 001-010

魚C工作室練習題從第四節開始就要收費.. 習題在百度文庫的連結如下:https://wenku.baidu.com/view/e6b8c88ecc175527072208d7.html?re=view

python伺服器安裝配置過程記錄

*新伺服器安裝步驟 1. 下載python2.7.6 wget http://www.python.org/ftp/python/2.7.6/Python-2.7.6.tgz 2. 解壓縮 tar -xf Python2.7.6 3. 安裝 1. cd Python2.7

python 操作mongoDB數據庫

查看 $set 一個 關於 pan date set 完整 mongodb 網上關於python 操作mongoDB的相關文章相對不是很多,並且質量也不是很高!下面給出一個完整的 增刪改查示例程序! #!/usr/bin/python # -*- coding

sqoop的數據抽取過程記錄

over 每次 spi sqoop load 避免 wirte 效應 hive 今天公司抽取了4千萬的表大概十幾G 用sqoop抽取是30--40分鐘 開了兩個map。模型是oracle----hdfs(hive)。以前只抽過幾十萬級別,所以千萬級別感覺還是spilt做好切

Python操作遠程數據庫

his exec 通配符 .get toc 大量 while 避免 登入 我的項目要往數據庫中插入create_time和update_time,那就勢必要引用現在的系統時間,經過大量的查找,終於發現往python是沒有對應時間datetime的相關通配符的,那麽我們要怎麽

#python# 操作文件和目錄

級別 編寫一個程序 路徑 pre 找文件 練習 itext os.path 當前 總結於廖雪峰老師的python教程。 操作文件和目錄的函數一部分放在os模塊中,一部分放在os.path模塊中。 總結如下: 1 os.path.abspath(‘.‘)

python 操作excel表格

雷達圖 tle sum utf-8 範圍 red lod 圖標 data Python 操作excel 表格 #coding=utf-8 import xlsxwriter #1.創建excel 對象 work = xlsxwr

python小工具:用python操作HP的Quality Center

over cti 步驟 response headers 服務器 登錄 chm format 背景是這樣的:這個組的測試人員每跑一個case都要上傳測試結果附件到QC。每個待測功能模塊可能包含幾十上百的case。於是手工上傳測試結果變成了繁重的體力勞動。令人驚訝的是我們的工

[原創]Nexus5 源碼下載、編譯、真機燒錄過程記錄

慢慢 prop sha shell 腳本 .cn 等待 strong download dos asop使用清華鏡像源https://mirror.tuna.tsinghua.edu.cn/help/AOSP/ 一開始使用每月初始化包的方式因為無法搞定版本的問題,沒能通過編

VirtualBox中安裝CentOS7過程記錄

linux ssh centos virtualbox 在開發過程中常常需要進行一些預研,而有些操作對操作系統可能具有破壞性且是不可恢復的,或者需要在不同的操作系統中去觀察結果,雖然在某些場合下Ghost可以解決一部分問題,但是有時候很繁瑣,因此在虛擬機中進行測試不失為一種不錯的選擇。自201