Python 9: Processes and Threads
Python GIL (Global Interpreter Lock)
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
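First, be clear that the GIL is not a feature of the Python language. It is a concept introduced by one implementation of the Python interpreter, CPython. Compare C++: it is a language (syntax) standard, but different compilers can compile it into executable code, well-known ones being GCC, Intel C++, and Visual C++. Python is the same: one piece of code can be run by different Python execution environments such as CPython, PyPy, and Jython, and Jython, for example, has no GIL. However, because CPython is the default Python execution environment in most settings, many people equate CPython with Python and then take it for granted that the GIL is a defect of the Python language. So the first point to establish is this: the GIL is not a feature of Python, and Python does not have to depend on the GIL.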
The Python threading module
import threading
import time


def sayhi(num):
    print("test run ==>", num)
    time.sleep(2)


if __name__ == "__main__":
    # Create two threads by passing a target function
    t1 = threading.Thread(target=sayhi, args=(1,))
    t2 = threading.Thread(target=sayhi, args=(2,))
    t1.start()
    t2.start()
    print(t1.name)  # the name attribute replaces the deprecated getName()
    print(t2.name)
import threading
import time


class MyThread(threading.Thread):
    """Create a thread by subclassing Thread and overriding run()."""

    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):
        print("test run ==>", self.num)
        time.sleep(3)


if __name__ == "__main__":
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
Join & Daemon
Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.
Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.
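As a small sketch of the idea described above, the snippet below marks a background loop as a daemon and shows that join(timeout=...) gives up waiting while the thread keeps running. The names keepalive and stop_requested are hypothetical, and the stop event exists only so the demo can end cleanly instead of relying on interpreter exit.

```python
import threading

stop_requested = threading.Event()  # hypothetical helper, only to end the demo tidily


def keepalive():
    # Background loop: wakes every 50 ms until it is asked to stop.
    while not stop_requested.wait(0.05):
        pass


worker = threading.Thread(target=keepalive, daemon=True)
worker.start()

worker.join(timeout=0.1)           # wait at most 0.1 s, then carry on
still_running = worker.is_alive()  # the loop has not finished on its own
is_daemon = worker.daemon          # flag set through the constructor argument

stop_requested.set()               # tidy shutdown for the demo
worker.join()
print(is_daemon, still_running)
```

If the stop event were never set, the program could still exit, because a daemon thread does not keep the interpreter alive.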
import time
import threading


def run(n):
    print("----- run:%s -----" % n)
    time.sleep(2)
    print("----done----")


def main():
    for i in range(5):
        t = threading.Thread(target=run, args=(i,))
        t.start()
        t.join(1)  # wait at most 1 second for this worker
        print("start threading", t.name)


m = threading.Thread(target=main, args=[])
m.daemon = True  # replaces the deprecated setDaemon(True)
m.start()
m.join(timeout=2)  # wait at most 2 seconds, then let the main thread finish
print("----main thread done----")
import time
import threading


def addNum():
    global num
    print("--get num:", num)
    time.sleep(1)
    num += 1  # unprotected update of shared data


num = 0
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:
    t.join()

print("ending ===>", num)
# In principle, running this several times on Python 2.7 should eventually
# produce a result other than 100.
import time
import threading


def addNum():
    global num
    print("--get num:", num)
    time.sleep(1)
    lock.acquire()  # take the lock before touching the shared data
    num += 1
    lock.release()  # release the thread lock (mutex) once the update is done


num = 0  # global variable
thread_list = []
lock = threading.Lock()  # create the thread lock
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:
    t.join()  # wait for every thread to finish

print("ending ===>", num)
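At this point you may be a little puzzled: we said earlier that the GIL guarantees only one thread executes at any moment, so why is a mutex still needed here?
The short answer is that the interpreter runs threads one at a time but can switch between them in the middle of an operation, and an update such as count += 1 is really three steps: read the value, add 1, write the result back. Suppose thread 1 reads count (0) but is switched out before it finishes. Thread 2 then runs, adds 1 to count, and writes 1 back to the shared data. When thread 1 resumes from where it stopped, it is still holding the count = 0 it read earlier, so it also adds 1 and writes count = 1. Two increments were made but the value only went up by one, and that is the problem.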
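The interleaving described here can be forced deterministically. The sketch below is an illustration, not what the interpreter normally does: it splits the increment into an explicit read and write, and uses a threading.Barrier (an assumption of this example, not part of the original code) to make both threads read before either one writes.

```python
import threading


def lost_update_demo():
    state = {"num": 0}
    both_have_read = threading.Barrier(2)

    def add_one():
        seen = state["num"]       # step 1: read the shared value
        both_have_read.wait()     # force the switch: nobody writes until both have read
        state["num"] = seen + 1   # steps 2 and 3: add, then write back

    threads = [threading.Thread(target=add_one) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["num"]


print(lost_update_demo())  # 1, although two threads each added 1
```

Holding a lock around the whole read-add-write sequence rules out this interleaving, because the second thread cannot read until the first has written.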
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading


def run1():
    global num
    print("grab the first part data")
    lock.acquire()  # re-acquired by the thread that already holds it in run3()
    num += 1
    lock.release()
    return num


def run2():
    global num2
    print("grab the second part data")
    lock.acquire()
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()  # outer acquire; run1() and run2() acquire the same lock again
    res = run1()
    print("--------between run1 and run2-----")
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == "__main__":
    num, num2 = 0, 0
    lock = threading.RLock()  # re-entrant lock: the owning thread may acquire it repeatedly
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

    while threading.active_count() != 1:
        print(threading.active_count())
    else:
        print("----all threads done---")
        print(num, num2)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
import time


def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run ==> %s " % n)
    semaphore.release()


if __name__ == "__main__":
    semaphore = threading.BoundedSemaphore(5)  # maximum number of threads running at once
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while threading.active_count() != 1:
        time.sleep(1)
        print(threading.active_count())
    else:
        print("------All Done------")
The threading.Timer class represents an action that should be run only after a certain amount of time has passed.
Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling the cancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.
import threading


def hello():
    print("hello world")


t = threading.Timer(5, hello)  # call hello() after 5 seconds
t.start()
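Cancelling works the same way as starting. The sketch below schedules a timer with a deliberately long interval and cancels it before it fires; the 30-second value and the fired list are just choices made for this illustration.

```python
import threading

fired = []


def hello():
    fired.append("hello world")


t = threading.Timer(30, hello)  # would call hello() after 30 seconds
t.start()
t.cancel()                      # cancelled while the timer is still waiting
t.join()                        # returns promptly because the wait was interrupted
print(fired)  # []
```

cancel() only has an effect while the timer is still waiting; once the action has begun it runs to completion.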
An event is a simple synchronization object; the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn't do anything. If the flag is cleared, wait will block until it becomes set again. Any number of threads may wait for the same event.
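A minimal sketch of these flag semantics, using wait() with a timeout so the example cannot hang. The short delays and the use of threading.Timer as the "server" thread are assumptions made for the illustration.

```python
import threading

event = threading.Event()

# Flag is clear: wait() blocks for the timeout and then reports False.
timed_out_result = event.wait(timeout=0.05)

# Another thread sets the flag a little later; wait() returns True once it is set.
setter = threading.Timer(0.05, event.set)
setter.start()
woke_up_result = event.wait(timeout=5)
setter.join()

event.clear()  # reset the flag so later waiters block again
print(timed_out_result, woke_up_result, event.is_set())  # False True False
```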
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
import time


def light():
    i = 0
    event.set()  # start on green
    while True:
        if i >= 10:
            event.set()  # a set flag means the light is green
            i = 0
            print("\033[42;1mgreen light, go >>>>\033[0m")
        elif i >= 5 and i < 10:
            event.clear()  # a cleared flag means the light is red
            print("\033[41;1mred light, stop >>>>\033[0m")
        else:
            print("\033[42;1mgreen light, go >>>>\033[0m")
        time.sleep(1)
        i += 1


def car():
    while True:
        if event.is_set():
            print("car is running")
            time.sleep(1)
        else:
            print("car is waiting for the green light")
            event.wait()  # block until the light thread sets the flag


event = threading.Event()
_Light = threading.Thread(target=light)
_Car = threading.Thread(target=car)
_Light.start()
_Car.start()
# -*- coding: utf-8 -*-
__author__ = 'Alex Li'
import threading
import time
import random


def door():
    door_open_time_counter = 0
    while True:
        if door_swiping_event.is_set():
            print("\033[32;1mdoor opening....\033[0m")
            door_open_time_counter += 1
        else:
            print("\033[31;1mdoor closed...., swipe to open.\033[0m")
            door_open_time_counter = 0  # reset the timer
            door_swiping_event.wait()
        if door_open_time_counter > 3:  # the door has been open for more than 3 ticks, close it
            door_swiping_event.clear()
        time.sleep(0.5)


def staff(n):
    print("staff [%s] is coming..." % n)
    while True:
        if door_swiping_event.is_set():
            print("\033[34;1mdoor is opened, passing.....\033[0m")
            break
        else:
            print("staff [%s] sees door got closed, swiping the card....." % n)
            door_swiping_event.set()
            print("after set", door_swiping_event.is_set())
        time.sleep(0.5)


door_swiping_event = threading.Event()  # create the event
door_thread = threading.Thread(target=door)
door_thread.start()

for i in range(5):
    p = threading.Thread(target=staff, args=(i,))
    time.sleep(random.randrange(3))
    p.start()
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
- class queue.Queue(maxsize=0)  # first in, first out
- class queue.LifoQueue(maxsize=0)  # last in, first out
- class queue.PriorityQueue(maxsize=0)  # a queue whose stored entries can be given a priority
Constructor for a priority queue. maxsize is an integer that sets the upper bound on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
The lowest valued entries are retrieved first (the lowest valued entry is the one returned by min(entries)). A typical pattern for entries is a tuple in the form: (priority_number, data)
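The retrieval order of the priority and LIFO queues can be seen in a short sketch. The task strings and priority numbers below are made up for the illustration.

```python
import queue

# PriorityQueue: entries are (priority_number, data); the lowest number comes out first.
pq = queue.PriorityQueue()
pq.put((2, "write report"))
pq.put((1, "fix outage"))
pq.put((3, "tidy desk"))
priority_order = [pq.get()[1] for _ in range(3)]
print(priority_order)  # ['fix outage', 'write report', 'tidy desk']

# LifoQueue: the most recently added item comes out first.
lifo = queue.LifoQueue()
for n in (1, 2, 3):
    lifo.put(n)
lifo_order = [lifo.get() for _ in range(3)]
print(lifo_order)  # [3, 2, 1]
```

Putting the priority first in the tuple works because tuples compare element by element, so the number decides the order before the data is ever compared.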
- exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.
- exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.
Queue.empty()  # return True if empty
Queue.full()  # return True if full
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
Queue.put_nowait(item)
Equivalent to put(item, False).
Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
Queue.get_nowait()
Equivalent to get(False).
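The blocking and non-blocking behaviour of put and get can be sketched with a one-slot queue. The maxsize of 1 and the short timeout are arbitrary values chosen so each exception is easy to trigger.

```python
import queue

q = queue.Queue(maxsize=1)
events = []

q.put_nowait("first")                 # a free slot is available, so this succeeds
try:
    q.put_nowait("second")            # queue is full: raises immediately
except queue.Full:
    events.append("Full (non-blocking put)")

try:
    q.put("second", timeout=0.05)     # blocks up to 0.05 s, then raises
except queue.Full:
    events.append("Full (put with timeout)")

events.append(q.get_nowait())         # returns "first"
try:
    q.get_nowait()                    # queue is now empty: raises immediately
except queue.Empty:
    events.append("Empty (non-blocking get)")

print(events)
```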
Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). Raises a ValueError if called more times than there were items placed in the queue.
Queue.join()  # block until the queue has been fully consumed
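A minimal sketch of the task_done()/join() pairing with one daemon consumer thread. The doubling step stands in for real work and is purely illustrative.

```python
import queue
import threading

q = queue.Queue()
processed = []


def consumer():
    while True:
        item = q.get()
        processed.append(item * 2)  # stand-in for real work on the task
        q.task_done()               # one task_done() for each get()


threading.Thread(target=consumer, daemon=True).start()

for n in range(5):
    q.put(n)

q.join()  # returns only after task_done() has been called for all five items
print(processed)  # [0, 2, 4, 6, 8]
```

Because the consumer is a daemon thread blocked in get(), it does not stop the program from exiting once join() has returned.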
import threading
import queue
import time

q = queue.Queue()


def producer():
    count = 1
    while True:
        q.put("Pizza %s" % count)
        print("\033[42;1mPizza %s is ready..\033[0m" % count)
        time.sleep(1)
        count += 1


def consumer(n):
    while True:
        print("%s got" % n, q.get())  # get() blocks until a pizza is available
        time.sleep(1)


p = threading.Thread(target=producer)
c = threading.Thread(target=consumer, args=("dandy",))
c1 = threading.Thread(target=consumer, args=("renee",))
p.start()
c.start()
c1.start()
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
from multiprocessing import Process
import time


def Foo(name):
    time.sleep(2)
    print("hello ", name)


if __name__ == "__main__":
    p = Process(target=Foo, args=("dandy",))
    p.start()
    p.join()  # wait for the child process to finish
from multiprocessing import Process
import time
import os


def info(title):
    print(title)
    print("module name:", __name__)
    print("parent process:", os.getppid())
    print("process id:", os.getpid())


def Foo(name):
    info("Here is the title.")
    print("hello", name)


if __name__ == "__main__":
    info("\033[32;1mMain Process Line\033[0m")
    p = Process(target=Foo, args=("dandy",))
    p.start()
    p.join()
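Main Process Line
module name: __main__
parent process: 8404
process id: 9136
Here is the title.
module name: __mp_main__
parent process: 9136
process id: 7300
hello dandy
The main program runs first: it passes the title to info() and calls it, and the module name printed at that point is of course __main__, the main program. Now look at the two process IDs. getppid means "get parent process ID" and getpid means "get process ID". The only thing that needs explaining is the order. 8404 can be looked up in Windows and is the PyCharm process ID, while 9136 is the ID of this .py file's main process; every child process is created by a parent process. The next statement calls Process to instantiate a process, and that process is clearly a child, 7300, started by the parent process 9136, the file's main process. Seen this way, the output should be much easier to follow.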
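from multiprocessing import Process, Queue


def Foo(qq):
    qq.put("hello dandy")  # the child process puts data on the queue


if __name__ == "__main__":
    q = Queue()
    p = Process(target=Foo, args=(q,))
    p.start()
    print(q.get())  # the parent process takes it off
    p.join()
This is simple enough to take in at a glance: one side of the queue calls put and the other calls get, and with a different process on each side you have data exchange. Note that instantiating a process is much like instantiating a thread, and that the Queue used here is imported from multiprocessing.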
from multiprocessing import Process, Pipe


def Foo(conn):
    conn.send("hello dandy")
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=Foo, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).
The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
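To see the two-way behaviour without the distraction of process startup, the sketch below uses both ends of a pipe inside a single process. That is a simplification for illustration only; in real use each end would live in a different process, as in the example above. The message contents are made up.

```python
from multiprocessing import Pipe

# Duplex pipe (the default): either end can send and receive.
end_a, end_b = Pipe()
end_a.send({"msg": "ping"})
received_by_b = end_b.recv()
end_b.send("pong")
received_by_a = end_a.recv()

# One-way pipe: the first connection can only receive, the second can only send.
reader, writer = Pipe(duplex=False)
writer.send([1, 2, 3])
received_by_reader = reader.recv()

for conn in (end_a, end_b, reader, writer):
    conn.close()

print(received_by_b, received_by_a, received_by_reader)
```

Objects passed to send() are pickled, so what arrives at the other end is a copy rather than the same object.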
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Manager
import os


def Foo(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())


if __name__ == "__main__":
    with Manager() as manager:  # same as manager = Manager(), but shut down automatically
        d = manager.dict()  # a dict created through the manager, shared between processes
        l = manager.list(range(5))  # likewise, a shared list
        p_list = []
        for i in range(10):
            p = Process(target=Foo, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)
Without using the lock, output from the different processes is liable to get all mixed up.
from multiprocessing import Process, Lock


def Foo(l, i):
    l.acquire()
    try:
        print("hello world", i)
    finally:
        l.release()  # always release, even if print raises


if __name__ == "__main__":
    lock = Lock()
    for i in range(10):
        Process(target=Foo, args=(lock, i)).start()
apply & apply_async
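#!/usr/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Pool
import os
import time


def Foo(i):
    time.sleep(2)
    print("In process Foo", os.getpid())
    return i + 50


def bar(args):
    # Callback: runs in the main process and receives Foo's return value
    print("===> Done:", args, os.getpid())


if __name__ == "__main__":
    pool = Pool(processes=3)  # maximum number of worker processes
    print("main process", os.getpid())
    for i in range(10):
        # pool.apply(func=Foo, args=(i,))  # serial: blocks until each call returns (apply takes no callback)
        pool.apply_async(func=Foo, args=(i,), callback=bar)  # asynchronous
    print("Ending..")
    pool.close()
    pool.join()
    # Note that close() and join() are placed differently from every earlier
    # example: close() has to come first, then join().
    # If join() is commented out, the main process exits without waiting and
    # the pool's worker processes are shut down with it.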