1. 程式人生 > >python3之threading模塊(下)

python3之threading模塊(下)

round 調用 format ren %s space 對象 線程同步 work

同步線程

threading.Condition(),Condition使用了一個Lock,所以可以綁定一個共享資源,使多個線程等待這個資源的更新再啟動。

當然Condition也可以顯示地使用acquire()和release()方法。

一個簡單的示例

  1: import logging
  2: import threading
  3: import time
  4: def consumer(cond):
  5:     """
  6:     等待condition設置然後再使用資源
  7:     :param cond:
  8:     :return:
  9:     "
""
 10:     logging.debug("開啟consumer線程")
 11:     with cond:
 12:         cond.wait()
 13:         logging.debug("對consumer線程資源可用")
 14: def producer(cond):
 15:     """
 16:     配置資源
 17:     :param cond:
 18:     :return:
 19:     """
 20:     logging.debug("開始producer線程")
 21:     with cond:
 22:         logging.debug("使資源可用
")
 23:         # 喚醒所有等待的線程,老的寫法叫notifyAll()
 24:         cond.notify_all()
 25: logging.basicConfig(
 26:     level=logging.DEBUG,
 27:     format="%(asctime)s %(threadName)-2s %(message)s"
 28: )
 29: condition = threading.Condition()
 30: c1 = threading.Thread(name="c1", target=consumer,
 31:                       args=(condition,))
 32: c2 = threading.Thread(name="c2", target=consumer,
 33:                       args=(condition,))
 34: p = threading.Thread(name="p", target=producer,
 35:                      args=(condition, ))
 36: c1.start()
 37: time.sleep(0.2)
 38: c2.start()
 39: time.sleep(0.2)
 40: p.start()

結果:

  1: 2019-01-26 11:56:06,025 c1 開啟consumer線程
  2: 2019-01-26 11:56:06,226 c2 開啟consumer線程
  3: 2019-01-26 11:56:06,426 p  開始producer線程
  4: 2019-01-26 11:56:06,426 p  使資源可用
  5: 2019-01-26 11:56:06,426 c2 對consumer線程資源可用
  6: 2019-01-26 11:56:06,427 c1 對consumer線程資源可用

屏障barrier是另一種線程同步機制。Barrier建立一個控制點,阻塞所有的參與的線程,直到所有的線程都到達這一點,然後同時釋放阻塞的線程。

  1: import threading
  2: import time
  3: 
  4: def worker(barrier):
  5:     print(threading.current_thread().name,
  6:           "waiting for barrier with {} others.".format(barrier.n_waiting))
  7:     # 所有等待的線程都在等待時,所有的線程都被同時釋放了。
  8:     worker_id = barrier.wait()
  9:     print(threading.current_thread().name, ‘after barrier‘, worker_id)
 10: NUM_THREAD = 3
 11: barrier = threading.Barrier(NUM_THREAD)
 12: # 推倒式
 13: threads = [
 14:     threading.Thread(
 15:         name="worker - %s" % i,
 16:         target=worker,
 17:         args=(barrier, )
 18:     )
 19:     for i in range(NUM_THREAD)
 20: ]
 21: for t in threads:
 22:     print(t.name, "starting")
 23:     t.start()
 24:     time.sleep(0.1)
 25: for t in threads:
 26:     t.join()

結果:

  1: worker - 0 starting
  2: worker - 0 waiting for barrier with 0 others.
  3: worker - 1 starting
  4: worker - 1 waiting for barrier with 1 others.
  5: worker - 2 starting
  6: worker - 2 waiting for barrier with 2 others.
  7: worker - 2 after barrier 2
  8: worker - 1 after barrier 1
  9: worker - 0 after barrier 0

abort()方法會使所有等待線程接收一個BrokenBarrierError。直到reset方法恢復,重新開始攔截。

限制資源的並發訪問

如果多個線程同時訪問一個資源,但要限制總數。這個可以使用Semaphore來管理。

使用方法:

  1: s = threading.Semaphore(2)
  2: t = threading.Thread(
  3:     target=worker,
  4:     name="t1",
  5:     args=(s, )
  6: )

線程特定的數據

對於一些需要保護的資源,需要對這些並非資源所有者的線程隱藏。 threading.local()函數會創建一個對象,它能隱藏值,除非在某個線程中設置了這個屬性,這個線程才能看到它。

  1: import random
  2: import threading
  3: import logging
  4: def show_value(data):
  5:     try:
  6:         val = data.value
  7:     except AttributeError:
  8:         logging.debug("No value yet")
  9:     else:
 10:         logging.debug("value=%s" % val)
 11: def worker(data):
 12:     show_value(data)
 13:     data.value = random.randint(1, 100)
 14:     show_value(data)
 15: logging.basicConfig(
 16:     level=logging.DEBUG,
 17:     format="(%(threadName)-10s %(message)s)",
 18: )
 19: local_data = threading.local()
 20: show_value(local_data)
 21: local_data.value = 1000
 22: show_value(local_data)
 23: 
 24: # 這個worker是看不到local_data的
 25: for i in range(2):
 26:     t = threading.Thread(target=worker, args=(local_data, ))
 27:     t.start()
 28:     t.join()
 29: # 使用子類,來初始化所有的線程開始時都有相同的值
 30: class MyLocal(threading.local):
 31:     def __init__(self, value):
 32:         super().__init__()
 33:         logging.debug("Initializing %s" % self)
 34:         self.value = value
 35: local_data = MyLocal(1000)
 36: # 同樣的worker調用__init__(),每調用一次以設置默認值
 37: for i in range(2):
 38:     t = threading.Thread(target=worker, args=(local_data, ))
 39:     t.start()
 40:     t.join()

結果:

  1: (MainThread No value yet)
  2: (MainThread value=1000)
  3: (Thread-1   No value yet)
  4: (Thread-1   value=76)
  5: (Thread-2   No value yet)
  6: (Thread-2   value=88)
  7: (MainThread Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
  8: (Thread-3   Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
  9: (Thread-3   value=1000)
 10: (Thread-3   value=31)
 11: (Thread-4   Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
 12: (Thread-4   value=1000)
 13: (Thread-4   value=7)

python3之threading模塊(下)