1. 程式人生 > >趙凡導師併發知識第一次分享觀後感

趙凡導師併發知識第一次分享觀後感

主要是滲透了解 asyncio  相關概念

提及部分描述:1.執行緒(阻塞,非阻塞)

       2.執行緒之間通訊(執行緒之間的傳遞(import queue))

          3.程序之間通訊(程序之間通訊from multiprocessing import Process,Queue  用程序Queue)單獨詳解

舉例圖:

 

 

場景描述:

資料庫的要操作的表的資訊為:

     當多個請求都到資料庫操作介面程式的時候,針對同一個name的count進行增加或者減少,就要保證操作的同一個時刻只有一個可以去獲取count的值並進行update操作,所以我是在這一步增加了鎖,因為使用aiohttp寫的,所以想要在這裡也用了aiorwlock,但是在我測試的過程中發現了,當一個協程獲取鎖還沒釋放鎖的時候,另外一個協程也獲取到鎖,下面我是具體的程式碼

核心程式:

 

class CntHandler(object):

    def __init__(self, db, loop):
        self.db = db
        self.loop = loop
        self.company_lock = {}

    def response(self, request, msg):
        peer = request.transport.get_extra_info('peername')
        logging.info("request url[%s] from[%s]: %s
", request.raw_path, peer, msg) origin = request.headers.get("Origin") if origin is not None: headers = {"Access-Control-Allow-Origin": origin, "Access-Control-Allow-Credentials": "true"} resp = web.Response(text=util.dictToJson(msg), content_type='application/json
', headers=headers) else: resp = web.Response(text=util.dictToJson(msg), content_type='application/json') return resp async def cnt_set(self, request): """ 用於設定company表中的count值 :param request: :return: """ post = await request.post() logging.info('post %s', post) company_name = post.get("company") cnt = post.get("cnt") sql = "update shield.company set count=%s where name=%s" args_values = [cnt, company_name] rwlock = self.company_lock.get(company_name, "") if not rwlock: rwlock = aiorwlock.RWLock(loop=self.loop) self.company_lock[company_name] = rwlock async with rwlock.writer: msg = dict() po_sql = "select * from shield.company where name=%s" po = await self.db.get(po_sql, company_name) if not po: # 找不到企業 logging.error("not found company name [%s]", company_name) msg["code"] = 404 msg["code"] = "not found company" return self.response(request, msg) res = await self.db.execute(sql, args_values) if not isinstance(res, int): logging.error("sql update is err:", res) msg["code"] = 403 msg["reason"] = "set fail" return self.response(request, msg) logging.info("company [%s] set cnt [%s] is success", company_name, cnt) msg["code"] = 200 msg["reason"] = "ok" return self.response(request, msg) async def cnt_inc(self, request): """ 用於增加company表中的count值 :param request: :return: """ post = await request.post() logging.info('post %s', post) company_name = post.get("company") cnt = int(post.get("cnt", 0)) rwlock = self.company_lock.get(company_name, "") if not rwlock: rwlock = aiorwlock.RWLock(loop=self.loop) self.company_lock[company_name] = rwlock async with rwlock.writer: uuid_s = uuid.uuid1().hex logging.debug("[%s]---[%s]", uuid_s, id(rwlock)) msg = dict() sql = "select * from shield.company where name=%s" po = await self.db.get(sql, company_name) if not po: # 找不到企業 logging.error("not found company name [%s]", company_name) msg["code"] = 404 msg["code"] = "not found company" return self.response(request, msg) old_cnt = po.get("count") po_cnt = int(po.get("count")) res = po_cnt + cnt update_sql = "update shield.company set count=%s where name=%s" args_values = [res, company_name] update_res = await self.db.execute(update_sql, args_values) if not isinstance(update_res, int): # 資料庫update失敗 logging.error("sql update is err:", update_res) msg["code"] = 403 msg["reason"] = "inc fail" return self.response(request, msg) logging.info("uuid [%s] lock [%s] company [%s] inc cnt [%s] old cnt [%s] true will is [%s] success", uuid_s,id(rwlock), company_name, cnt, old_cnt, res) msg["code"] = 200 msg["reason"] = "ok" return self.response(request, msg) async def cnt_dec(self, request): """ 用於減少company表中count的值 :param request: :return: """ post = await request.post() logging.info('post %s', post) company_name = post.get("company") cnt = int(post.get("cnt", 0)) rwlock = self.company_lock.get(company_name, "") if not rwlock: rwlock = aiorwlock.RWLock(loop=self.loop) self.company_lock[company_name] = rwlock async with rwlock.writer: uuid_s = uuid.uuid1().hex logging.debug("[%s]---[%s]", uuid_s, id(rwlock)) msg = dict() sql = "select * from shield.company where name=%s" po = await self.db.get(sql, company_name) if not po: # 找不到企業 logging.error("not found company name [%s]", company_name) msg["code"] = 404 msg["code"] = "not found company" return self.response(request, msg) po_cnt = int(po.get("count")) old_cnt = po.get("count") if po_cnt == 0: logging.error("company [%s] cnt is 0", company_name) msg["code"] = 400 msg["reason"] = "cnt is 0" return self.response(request, msg) if po_cnt < cnt: # 資料庫餘額不足 logging.error("company [%s] count is not enough", company_name) msg["code"] = 405 msg["reason"] = "count is not enough" return self.response(request, msg) res = po_cnt - cnt update_sql = "update shield.company set count=%s where name=%s" args_values = [res, company_name] update_res = await self.db.execute(update_sql, args_values) if not isinstance(update_res, int): # 執行update 失敗 logging.error("sql update is err:", update_res) msg["code"] = 403 msg["reason"] = "inc fail" return self.response(request, msg) logging.info("uuid [%s] lock [%s] company [%s] dec cnt [%s] old cnt [%s] true will is [%s] success",uuid_s,id(rwlock), company_name, cnt, old_cnt, res) msg["code"] = 200 msg["reason"] = "ok" return self.response(request, msg)

上面程式碼出問題的程式碼是在增加和減少的時候:

 

async with rwlock.writer:

在一個協程還沒有釋放鎖的時候,另外一個操作也就進來了,到之後在測試併發的時候,對同一個name的count進行操作導致最後的count值不符合的問題

這個完整的程式碼地址:https://github.com/pythonsite/test_aiorwlock