生產者和消費者模式-代碼
阿新 • • 發佈:2018-05-04
模型 stream 主函數 end main div 綁定ip AR 定義
函數:生產者和消費者
import random from queue import Queue from threading import Thread, current_thread import time # 實例化一個隊列 myq = Queue() # 定義生產者 def producer(): while True: tmp = random.randint(1,100) myq.put(tmp) print("%s生產了%s,生產後,現在產品總量:%s" % (current_thread().name, tmp, myq.qsize())) time.sleep(0.5) # 定義消費者 def consumer(): while True: print("%s消費了%s,剩余產品%s" % (current_thread().name, myq.get(), myq.qsize())) time.sleep(1.1) # 啟動生產者和消費者 # 啟動生產者 tp = Thread(target=producer) tp.start() # 啟動消費者 for i in range(2): tc = Thread(target=consumer) tc.start()
函數2:
# coding:utf-8 from queue import Queue from threading import Thread,current_thread import random import time # 實例化一個隊列,線程安全 myq = Queue() # 定義生產者 def produce(): while True: tmp = random.randint(1,100) myq.put(tmp) print(‘%s生產了%s‘ % (current_thread().name,tmp)) time.sleep(0.5) # 消費者 def consumer(): while True: print(‘%s消費了%s‘ % (current_thread().name, myq.get())) time.sleep(1) # 啟動生產者和消費者 t_p = Thread(target=produce) t_p.start() # 啟動消費者 for i in range(2): t_cs = Thread(target=consumer) t_cs.start()
函數3:
# 編寫一個基於tcp的echo服務器(回響服務器,即將客戶端發送的信息返回給客戶端), # 要求使用線程和生產者消費者模型(提示:一個線程accept--生產者;兩個線程用於接收和發送--消費者)。 import socket from threading import Thread, current_thread from queue import Queue # 生產者 def accept_t(queue): print("當前線程",current_thread().name) # client_info = server.accept() # queue.put(client_info) # 消費者recv def recv_t(queue, queue_data): client_info = queue.get() client_sock = client_info[0] data = client_sock.recv(1024) queue_data.put(data) pass try: print(data.decode()) except: print(data.decode(‘gbk‘)) # 消費者send def send_t(queue_data): data = queue_data.get() client_sock = client_info[0] client_sock.send(data) client_sock.close() pass if __name__ == "__main__": client_info = None server = None # 創建服務器的套接字(監聽套接字) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 設置地址復用屬性 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 綁定IP和端口 server_address = ("", 7972) server.bind(server_address) # 監聽 server.listen(128) queue = Queue() queue_data = Queue() t1 = Thread(target=accept_t, args=(queue)) t1.start() t2 = Thread(target=recv_t, args=(queue, queue_data)) t2.start() t3 = Thread(target=send_t, args=(queue_data,)) t3.start() t1.join() t2.join() t3.join()
類:生產者和消費者
import socket from queue import Queue from threading import Thread import time import chardet client_queue = Queue() # 生產者 class Producer(Thread): def __init__(self, tcp_server): super().__init__() self.tcp_server = tcp_server def run(self): client_info = self.tcp_server.accept() client_queue.put(client_info) # 消費者 class Consumer(Thread): def __init__(self): super().__init__() def run(self): client_info = client_queue.get() client_sock = client_info[0] client_addr = client_info[1] msg = client_sock.recv(1024) print("原始字節流:",msg) a = ‘abcd‘.encode("UTF-8") print(‘a:‘, a) # a = msg.decode() code = chardet.detect(a) print(‘獲取到a的編碼是‘,code[‘encoding‘]) print("%s說:%s" % (client_addr, msg.decode())) client_sock.send(msg.decode().encode(‘gbk‘)) client_sock.close() print(‘consumer is over‘) # 主函數 def main(): tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(("", 7892)) tcp_server.listen(128) p = Producer(tcp_server) c1 = Consumer() # c2 = Consumer() p.start() c1.start() # c2.start() # time.sleep(2) p.join() c1.join() # c2.join() tcp_server.close() if __name__ == ‘__main__‘: main()
生產者和消費者模式-代碼