1. 程式人生 > 其它 >Python: threading.Lock threading.RLock

Python: threading.Lock threading.RLock

10個工人生產100個杯子

import time, logging, threading, datetime

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

# Shared list of produced cups; in this version no lock protects it.
cups = []


def vagary(task=100):
    """Produce cups until ``task`` cups exist, without any locking.

    This is the deliberately unsafe variant of the demo: ``len(cups)`` is read
    and ``cups.append`` is executed as two separate steps, so several workers
    can pass the ``n >= task`` check together and the final number of cups may
    exceed ``task``.

    Args:
        task: Target number of cups.
    """
    while True:
        n = len(cups)
        logging.info(f'{threading.current_thread()} number: {n}')
        if n >= task:
            break
        cups.append(5)
        logging.warning(f'{threading.current_thread()} produce {n + 1}')


# Ten workers race to produce 100 cups.
for b in range(10):
    threading.Thread(target=vagary, name=f'vagary-{b}', args=(100,)).start()
import time, logging, threading, datetime

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

# Shared list of produced cups, guarded by ``lock``.
cups = []
lock = threading.Lock()


def vagary(lock: threading.Lock, task=100):
    """Produce cups until exactly ``task`` cups exist, one worker at a time.

    The length check, the append and the log calls all run while holding
    ``lock``, so no two workers can act on the same stale count.

    Args:
        lock: Lock shared by every worker that appends to ``cups``.
        task: Target number of cups.
    """
    while True:
        # ``with`` releases the lock on every exit path, including ``break``
        # and an exception raised inside the body.
        with lock:
            n = len(cups)
            logging.info(f'{threading.current_thread()} number: {n}')
            if n >= task:
                break
            cups.append(5)
            logging.warning(f'{threading.current_thread()} produce {n + 1}')


# Ten workers share one lock and produce 100 cups between them.
for b in range(10):
    threading.Thread(target=vagary, name=f'vagary-{b}', args=(lock, 100)).start()
import time, logging, threading, datetime

# Each log line shows the timestamp, the thread name, the thread id and the message.
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


class Counter:
    """Plain integer counter that performs no locking of its own.

    ``increment`` and ``decrement`` never touch ``self.lock``; code that shares
    one instance between threads has to serialise the calls itself.
    """

    def __init__(self):
        # The lock is created here but is not used by the methods below.
        self.lock = threading.Lock()
        self.__value = 0

    @property
    def value(self):
        """Current count."""
        return self.__value

    def increment(self):
        """Add one to the count (unsynchronised)."""
        self.__value = self.__value + 1

    def decrement(self):
        """Subtract one from the count (unsynchronised)."""
        self.__value = self.__value - 1


def do(c: Counter, lock: threading.Lock, count=100):
    """Run ``count`` rounds of 50 decrements followed by 50 increments on ``c``.

    Every single update is performed while holding ``lock``, so the net change
    to the counter after each full round is zero.

    Args:
        c: Counter to update; must provide ``decrement()`` and ``increment()``.
        lock: Lock shared by all threads that update ``c``.
        count: Number of rounds to run.
    """
    for _ in range(count):
        for i in range(-50, 50):
            # ``with`` guarantees the lock is released even when the update
            # raises; a bare acquire()/release() pair would leave it held and
            # block every other worker forever.
            with lock:
                if i < 0:
                    c.decrement()
                else:
                    c.increment()


c = Counter()
lock = threading.Lock()
workers = []
for i in range(10):
    # Keep the round count high, otherwise the threads finish almost instantly.
    t = threading.Thread(target=do, args=(c, lock, 10000))
    t.start()
    # Do not t.join() here: the main thread would wait for t to finish before
    # starting the next thread, so the workers would run one after another.
    workers.append(t)

# Report the live threads and the counter once a second, and stop after the
# report that follows the last worker's exit instead of looping forever.
while True:
    time.sleep(1)
    # Sample liveness before printing so the last report shows the final value.
    all_done = not any(w.is_alive() for w in workers)
    print(threading.enumerate())
    print(c.value)
    if all_done:
        break
import time, logging, threading, datetime

# Each log line shows the timestamp, the thread name, the thread id and the message.
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


class Counter:
    """Integer counter whose reads and updates are guarded by a private lock."""

    def __init__(self):
        self.__value = 0
        self.__lock = threading.Lock()

    @property
    def value(self):
        """Return the current count, read while holding the lock."""
        with self.__lock:
            return self.__value

    def increase(self):
        """Add one to the count while holding the lock."""
        # Use ``with`` rather than try/finally with acquire() inside the
        # ``try``: if acquire() itself failed, that ``finally`` would call
        # release() on a lock that was never taken.
        with self.__lock:
            self.__value += 1

    def decrease(self):
        """Subtract one from the count while holding the lock."""
        with self.__lock:
            self.__value -= 1


def run(c: Counter, count=100):
    """Apply ``count`` rounds of 50 decreases followed by 50 increases to ``c``.

    Args:
        c: Counter to update; must provide ``decrease()`` and ``increase()``.
        count: Number of rounds to run.
    """
    for _ in range(count):
        # First half of the round: the 50 negative steps.
        for _ in range(50):
            c.decrease()
        # Second half of the round: the 50 non-negative steps.
        for _ in range(50):
            c.increase()


c = Counter()

for b in range(10):
    worker = threading.Thread(target=run, args=(c, 10000), name=f'Thread--{b}')
    worker.start()

# Once a second, print the live threads and log the counter; finish after the
# report in which the main thread is the only one left.
while True:
    time.sleep(1)
    only_main_left = threading.active_count() == 1
    print(threading.enumerate())
    if not only_main_left:
        logging.info(f'progress {c.value}')
        continue
    logging.warning(f'end {c.value}')
    break  # every worker is done, so the main program ends as well
import threading, time

# Demo: a plain Lock can be released by a thread other than the one that acquired it.
lock = threading.Lock()


def w():
    """Sleep for three seconds, then release the module-level ``lock``."""
    time.sleep(3)
    lock.release()  # runs in the helper thread, releasing the lock taken by the main thread


lock.acquire()
print(lock.locked())  # the lock is held at this point
threading.Thread(target=w).start()
lock.acquire()  # the lock is already held, so this blocks until w() releases it
print('get lock')
lock.release()
print('release')
print(lock.locked())  # the lock has just been released

****** 注意:如果一個執行緒在 lock.acquire() 成功後直接結束(沒有呼叫 lock.release()),鎖不會被自動釋放,必須由其他執行緒明確呼叫 lock.release()。

import threading, time

lock = threading.Lock()


def b(lock: threading.Lock):
    """Take ``lock``, wait two seconds, print an exit message and return.

    The lock is intentionally still held when the function returns.

    Args:
        lock: Lock to acquire and leave held.
    """
    lock.acquire()
    time.sleep(2)
    current = threading.current_thread()
    print(current, 'exit 555555')
    # No lock.release() here on purpose.


def p(lock: threading.Lock, interval=1):
    """Poll ``lock`` every ``interval`` seconds until it can be acquired.

    Prints ``'lock.acquire failed'`` after each unsuccessful attempt and
    ``'lock.acquire success'`` once the lock has been obtained.

    Args:
        lock: Lock to acquire; it is left held when the function returns.
        interval: Seconds to sleep before each attempt.
    """
    while True:
        time.sleep(interval)
        # Non-blocking attempt: a blocking acquire() with no timeout always
        # returns True, which would make the 'failed' report below unreachable.
        if lock.acquire(blocking=False):
            print('lock.acquire success')
            break
        print('lock.acquire failed')


threading.Thread(args=(lock,), target=b, name='bb').start()

threading.Thread(args=(lock,), target=p, name='pp').start()

# After six seconds a timer thread releases the lock that thread 'bb' left
# held when it exited; the bound method is passed directly, no lambda needed.
threading.Timer(interval=6, function=lock.release).start()

# Print the live threads every 0.7 s and stop once only the main thread is
# left, instead of looping forever after all the work is finished.
while threading.active_count() > 1:
    print(threading.enumerate())
    time.sleep(0.7)
print(threading.enumerate())

非阻塞鎖:

  

import threading, time, logging

# Log line layout: timestamp (left-aligned, width 20), then thread name (width 10)
# and thread id (width 8) in brackets, then the message.
FORMAT = '%(asctime)-20s [%(threadName)10s, %(thread)8d] %(message)s'
# The logging threshold is WARNING.
logging.basicConfig(format=FORMAT, level=logging.WARNING)


def vagary(vails):
    """Make one non-blocking attempt to take the lock of every item in ``vails``.

    A warning is logged for each lock obtained and an error for each lock that
    was already held. Locks that are obtained are kept, never released.

    Args:
        vails: Iterable of objects exposing ``name`` and ``lock`` attributes.
    """
    for item in vails:
        time.sleep(0.001)
        obtained = item.lock.acquire(blocking=False)  # True when the lock was taken
        if not obtained:
            logging.error(f'{threading.current_thread()} {item.name} acquire failed')
            continue
        # The lock is intentionally not released.
        logging.warning('{} {} acquire success'.format(threading.current_thread(), item.name))


class Vail:
    """A named item that carries its own ``threading.Lock``."""

    def __init__(self, name):
        # Every instance gets a lock of its own.
        self.lock = threading.Lock()
        self.name = name


vails = [Vail(f'vail-{i}') for i in range(10)]

# Ten vails and five threads: for each vail, only one of the five threads
# succeeds in acquiring its lock.
for i in range(5):  # start five threads
    worker = threading.Thread(target=vagary, args=(vails,), name=f'Thread-{i}')
    worker.start()

Reentrant Lock threading.RLock

同一個執行緒內可以多次 acquire;必須 release 相同次數(全部釋放完畢)後,其他執行緒才能 acquire,而且必須在持有鎖的那個執行緒內釋放。

import threading
import time

# Demo: an RLock can be acquired repeatedly by the thread that already holds it.
lock = threading.RLock()
print(lock, type(lock))
print(lock.acquire())  # first acquisition by the main thread
print(lock.acquire())  # the same thread acquires again
print(lock, threading.current_thread())
print(lock.acquire())  # third acquisition; matched by the three release() calls at the end


def sub(rlock: threading.RLock):
    """Print ``rlock``, acquire it, print it again, then release it."""
    print('in sub', rlock, threading.current_thread())
    rlock.acquire()  # waits while another thread still holds the RLock
    print('in sub', rlock, threading.current_thread())
    rlock.release()


threading.Thread(target=sub, args=(lock,)).start()
# A Timer cannot be used here: the RLock must be released in the main thread, which acquired it.
# threading.Timer(interval=5, function=lambda lock: [lock.release()] * 3, args=(lock,)).start()

time.sleep(5)
lock.release()
lock.release()
lock.release()