python中socket、進程、線程、協程、池的創建方式
阿新 • • 發佈:2018-09-27
num join() 進行 set rom style 高效率 accept cep
一、TCP-socket 服務端: import socket tcp_sk = socket.socket() tcp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) tcp_sk.bind((‘127.0.0.1‘,8000)) tcp_sk.listen() conn,addr = tcp_sk.accept() conn.send(‘你好‘.encode(‘utf-8‘)) print(conn.recv(1024).decode(‘utf-8‘)) conn.close() tcp_sk.close() 客戶端:import socket sk = socket.socket() sk.connect((‘127.0.0.1‘,8000)) print(sk.recv(1024).decode(‘utf-8‘)) sk.send(‘嘿嘿嘿‘.encode(‘utf-8‘)) sk.close() 二、UDP-socket 服務端: import socket udp_sk = socket.socket(type=socket.SOCK_DGRAM) udp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) udp_sk.bind((‘127.0.0.1‘,8001)) msg,addr = udp_sk.recvfrom(1024) print(msg.decode(‘utf-8‘)) udp_sk.sendto(‘你好‘.encode(‘utf-8‘),addr) udp_sk.close() 客戶端: import socket sk = socket.socket(type=socket.SOCK_DGRAM) sk.sendto(‘哈哈‘.encode(‘utf-8‘),(‘127.0.0.1‘,8001)) msg,addr = sk.recvfrom(1024) print(msg.decode(‘utf-8‘)) sk.close() 三、socketserver 服務端: import socketserver class Myserver(socketserver.BaseRequestHandler): def handle(self): conn = self.request while True: conn.send(b‘hello‘) print(conn.recv(1024).decode(‘utf-8‘)) socketserver.TCPServer.allow_reuse_address = True server = socketserver.ThreadingTCPServer((‘127.0.0.1‘,8080),Myserver) server.serve_forever() 客戶端: import socket sk = socket.socket() sk.connect((‘127.0.0.1‘,8080)) while True: ret = sk.recv(1024) print(ret.decode(‘utf-8‘)) sk.send(b‘hiworld‘) sk.close() 四、進程 方式一、 from multiprocessing import Process def func(arg): print(arg) if __name__ == ‘__main__‘: p = Process(target=func,args=(‘子進程‘,)) p.start() p.join() print(‘主進程‘) 方式二、 from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name = name def run(self): print(self.name) if __name__ == ‘__main__‘: p = MyProcess(‘小明‘) p.start() 五、線程 方式一、 from threading import Thread import time def sleep_boy(name): time.sleep(1) print(‘%s is sleeping‘ %name) t = Thread(target=sleep_boy,args=(‘xiaoming‘,)) # 這裏可以不需要main,因為現在只是在一個進程內操作,不需要導入進程就不會import主進程了 t.start() print(‘主線程‘) 方式二、 from threading import Thread import time class Sleep_boy(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): time.sleep(1) print(‘%s is sleeping‘ % self.name) t = Sleep_boy(‘xiaoming‘) t.start() print(‘主線程‘) 六、協程 1、greenlet例子: import time from greenlet import greenlet def cooking(): print(‘cooking 1‘) g2.switch() # 切換到g2,讓g2的函數工作 time.sleep(1) print(‘cooking 2‘) def watch(): print(‘watch TV 1‘) time.sleep(1) print(‘watch TV 2‘) g1.switch() # 切換到g1,讓g1的函數工作 g1 = greenlet(cooking) g2 = greenlet(watch) g1.switch() # 切換到g1,讓g1的函數工作 greenlet的缺陷:很顯然greenlet實現了協程的切換功能,可以自己設置什麽時候切,在哪切,但是它遇到阻塞並沒有自動切換, 因此並不能提高效率。所以一般我們都使用gevent模塊實現協程 2、gevent例子: from gevent import monkey monkey.patch_all() import time import gevent def cooking(): print(‘cooking 1‘) time.sleep(1) print(‘cooking 2‘) def watch(): print(‘watch TV 1‘) time.sleep(1) print(‘watch TV 2‘) g1 = gevent.spawn(cooking) # 自動檢測阻塞事件,遇見阻塞了就會進行切換 g2 = gevent.spawn(watch) g1.join() # 阻塞直到g1結束 g2.join() # 阻塞直到g2結束 七、進程池 1、同步提交apply: import os import time from multiprocessing import Pool def test(num): time.sleep(1) print(‘%s:%s‘ %(num,os.getpid())) return num*2 if __name__ == ‘__main__‘: p = Pool() for i in range(20): res = p.apply(test,args=(i,)) # 提交任務的方法 同步提交 print(‘-->‘,res) # res就是test的return的值,同步提交的返回值可以直接使用 2、異步提交apply_async: 2-1無返回值: import time from multiprocessing import Pool def func(num): time.sleep(1) print(‘做了%s件衣服‘%num) if __name__ == ‘__main__‘: p = Pool(4) # 進程池中創建4個進程,不寫的話,默認值為你電腦的CUP數量 for i in range(50): p.apply_async(func,args=(i,)) # 異步提交func到一個子進程中執行,沒有返回值的情況 p.close() # 關閉進程池,用戶不能再向這個池中提交任務了 p.join() # 阻塞,直到進程池中所有的任務都被執行完 2-2有返回值: import time import os from multiprocessing import Pool def test(num): time.sleep(1) print(‘%s:%s‘ %(num,os.getpid())) return num*2 if __name__ == ‘__main__‘: p = Pool() res_lst = [] for i in range(20): res = p.apply_async(test,args=(i,)) # 提交任務的方法 異步提交 res_lst.append(res) for res in res_lst: print(res.get()) # 異步提交的返回值需要get,get有阻塞效果,此時就不需要close和join 2-3map: map接收一個函數和一個可叠代對象,是異步提交的簡化版本,自帶close和join方法 可叠代對象的每一個值就是函數接收的實參,可叠代對象的長度就是創建的任務數量 map可以直接拿到返回值的可叠代對象(列表),循環就可以獲取返回值 import time from multiprocessing import Pool def func(num): print(‘子進程:‘,num) # time.sleep(1) return num if __name__ == ‘__main__‘: p = Pool() ret = p.map(func,range(10)) # ret是列表 for i in ret: print(‘返回值:‘,i) 2-4回調函數: import os from multiprocessing import Pool def func(i): print(‘子進程:‘,os.getpid()) return i def call_back(res): print(‘回調函數:‘,os.getpid()) print(‘res--->‘,res) if __name__ == ‘__main__‘: p = Pool() print(‘主進程:‘,os.getpid()) p.apply_async(func,args=(1,),callback=call_back) # callback關鍵字傳參,參數是回調函數 p.close() p.join() 八、進程池、線程池 線程池: 1、 import time from concurrent.futures import ThreadPoolExecutor def func(i): print(‘thread‘,i) time.sleep(1) print(‘thread %s end‘%i) tp = ThreadPoolExecutor(5) # 相當於tp = Pool(5) tp.submit(func,1) # 相當於tp.apply_async(func,args=(1,)) tp.shutdown() # 相當於tp.close() + tp.join() print(‘主線程‘) 2、 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print(‘thread‘,i,currentThread().ident) time.sleep(1) print(‘thread %s end‘%i) tp = ThreadPoolExecutor(5) for i in range(20): tp.submit(func,i) tp.shutdown() # shutdown一次就夠了,會自動把所有的線程都join() print(‘主線程‘) 3、返回值 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print(‘thread‘,i,currentThread().ident) time.sleep(1) print(‘thread %s end‘ %i) return i * ‘*‘ tp = ThreadPoolExecutor(5) ret_lst = [] for i in range(20): ret = tp.submit(func,i) ret_lst.append(ret) for ret in ret_lst: print(ret.result()) # 相當於ret.get() print(‘主線程‘) 4、map map接收一個函數和一個可叠代對象 可叠代對象的每一個值就是函數接收的實參,可叠代對象的長度就是創建的線程數量 map可以直接拿到返回值的可叠代對象(列表),循環就可以獲取返回值 import time from concurrent.futures import ThreadPoolExecutor def func(i): print(‘thread‘,i) time.sleep(1) print(‘thread %s end‘%i) return i * ‘*‘ tp = ThreadPoolExecutor(5) ret = tp.map(func,range(20)) for i in ret: print(i) 5、回調函數 回調函數在進程池是由主進程實現的 回調函數在線程池是由子線程實現的 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print(‘thread‘,i,currentThread().ident) time.sleep(1) print(‘thread %s end‘%i) return i * ‘*‘ def call_back(arg): print(‘call back : ‘,currentThread().ident) print(‘ret : ‘,arg.result()) # multiprocessing的Pool回調函數中的參數不需要get(),這裏需要result() tp = ThreadPoolExecutor(5) ret_lst = [] for i in range(20): tp.submit(func,i).add_done_callback(call_back) # 使用add_done_callback()方法實現回調函數 print(‘主線程‘,currentThread().ident)
python中socket、進程、線程、協程、池的創建方式