1. 程式人生 > >python生產消費者模式

python生產消費者模式

from threading import Thread
import time
import random
from queue import Queue
from collections import deque

#建立佇列,設定佇列最大數限制為3個
queue = Queue(3)

#生產者執行緒
class Pro_Thread(Thread):
    def run(self):
        #原材料準備,等待被生產
        tasks = deque([1, 2, 3, 4, 5, 6, 7, 8])
        global queue
        while True:
            try:
                #從原材料左邊開始生產
task = tasks.popleft() queue.put(task) print("生產", task, "現在佇列數:", queue.qsize()) #休眠隨機時間 time.sleep(random.random()) #如果原材料被生產完,生產執行緒跳出迴圈 except IndexError: print("原材料已被生產完畢") break #消費者執行緒
class Con_Thread(Thread): def run(self): global queue while True: if queue.not_empty: #通過get(),這裡已經將佇列減去了1 task = queue.get() time.sleep(2) #這裡可能佇列數已經空了,但是消費者手裡還有正在消費的佇列 #發出完成的訊號,不發的話,join會永遠阻塞,程式不會停止
queue.task_done() print("消費", task) else: break #r入口方法,主執行緒 def main(): Pro_1 = Pro_Thread() #把生產執行緒列為守護執行緒,否則主執行緒結束之後不會銷燬該執行緒,程式不會停止,影響實驗結果 Pro_1.setDaemon(True) #啟動執行緒 Pro_1.start() for i in range(2): Con_i = Con_Thread() # 把兩個消費者執行緒列為守護執行緒,否則主執行緒結束之後不會銷燬該執行緒,程式不會停止,影響實驗結果 Con_i.setDaemon(True) #啟動執行緒 Con_i.start() global queue #這裡休眠一秒鐘,等到佇列有值,否則佇列建立時是空的,主執行緒直接就結束了,實驗失敗,造成誤導 time.sleep(1) #接收訊號,主執行緒在這裡等待佇列被處理完畢後再做下一步 queue.join() #給個標示,表示主執行緒已經結束 print("主執行緒結束") if __name__ == '__main__': main()

 

 

 

wrong:

#coding:utf8
#因為註釋中含有中文,所以在程式碼檔案中首先要指明編碼方式為UTF-8

#多執行緒處理Ping

import db1
import sys
reload(sys)
sys.setdefaultencoding('utf8')#指定程式碼編碼格式
sys_encoding = sys.getfilesystemencoding()

from threading import Thread
import subprocess
from Queue import Queue
import re
import time


def files_ip(ips):
        f = '';
        try :
            f = open('C:\\Users\\cctv1\\Desktop\\PING_IP_LIST.txt','rb')
            for line in f:
                str = line.strip('\n\r')
                ips.append(str)
        finally:
            if f:
                f.close()

def pingme(i,queue,ipdatavalue):
    while not queue.empty():
        ip=queue.get()
        print 'Thread %s pinging %s' %(i,ip)
        cmd = 'ping -n %s %s' % (1, ip)
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        result = p.stdout.read()
        regex = re.findall('100%', result)
        ip_state = '1'
        ipmsg=''
        avg_time = 9999
        if len(regex) == 0:
            ipmsg = "\033[31m%s UP\033[0m\r" % (ip)
            
            reg_receive = '平均\s=\s\d'
            out =  str(result.decode('gbk'))
            match_receive = re.search(reg_receive,out)
            print('^^^')
            print(match_receive)
            if match_receive:
                    match_receivegrp = str(match_receive.group()).decode('utf-8')
                    
                    avg_time = int(match_receivegrp[5:])
            else:
                    avg_time = 9999
        else:
            ipmsg = "\033[32m%s DOWN\033[0m\r" % (ip)
            ip_state='0'
            avg_time = 9999
        ipdatavalue.append((ip,ip_state,ipmsg,avg_time))
        queue.task_done()
       
def start_run_ip(ips,batch_no):
        ipdatavalue = []
        num_threads=30
        
        q=Queue()
        for ip in ips:
            q.put(ip)

        #start num_threads threads
        for i in range(num_threads):
            t=Thread(target=pingme,args=(i,q,ipdatavalue))
               #print i
            t.setDaemon(True)
            t.start()
        q.join();

        insertsql = "INSERT INTO monitor_ip_log (ip_address,ip_state,batch_no,date_time,state_remark,avg_time) VALUES (%s,%s,"+batch_no+",NOW(),%s,%s)"
        db1.insert_data_batch(insertsql,ipdatavalue)

        

def t_main():
        time_start = time.time()
        #ip列表
        ips = []
        files_ip(ips)
        #批次號
        batch_no = db1.get_date_time()
        start_run_ip(ips,batch_no)
        print 'Done'

        time_end = time.time()
        time_cost = time_end - time_start
        print("%.2f s" % (time_cost))
 

 

https://blog.csdn.net/u011655220/article/details/79037032