1. 程式人生 > >python多程序監聽rabbitmq

python多程序監聽rabbitmq

# -*- coding: utf-8 -*
import pika
import sys
from db import Db
import time
from multiprocessing import Process, Pool
import os
import json
credentials = pika.PlainCredentials('qql', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.233.130',5672,'message',credentials))
channel = connection.channel()

channel.exchange_declare(exchange='exchange_msg',
                         exchange_type='direct', 
                         durable=True)

try:
    channel.queue_declare(queue='send_msg', durable=True)
except:
    channel = connection.channel()
    channel.queue_delete(queue='send_msg')
    channel.queue_declare(queue='send_msg', durable=True)
# queue_name = result.method.queue
# 獲取執行指令碼所有的引數
severities = sys.argv[1:]
# if not severities:
#     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
#     sys.exit(1)
# 迴圈列表去繫結
for severity in severities:
    channel.queue_bind(exchange='exchange_msg',
                       queue='send_msg',
                       routing_key='qql')



print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    # url = 'https://www.qqlong.top/sendceshi'
    datas = json.loads(body)
    # post_data={'num':datas['num'],'key':datas['key']}
    # response = requests.post(url, data=datas)
    # res = response.json()
    Db().table('get_msg').insert({"msg":datas})
    time.sleep(5)
    print(datas)

channel.basic_consume(callback,
                      queue='send_msg',
                      no_ack=True)


def run_proc(name):        ##定義一個函式用於程序呼叫
    for i in range(5):
        channel.start_consuming()
#執行一次該函式共需1秒的時間

if __name__ =='__main__': #執行主程序
    mainStart = time.time() #記錄主程序開始的時間
    p = Pool(4)           #開闢程序池
    for i in range(16):                                 #開闢14個程序
        p.apply_async(run_proc,args=('Process'+str(i),))#每個程序都呼叫run_proc函式,
                                                        #args表示給該函式傳遞的引數。
    p.close() #關閉程序池
    p.join()  #等待開闢的所有程序執行完後,主程序才繼續往下執行