1. 程式人生 > 實用技巧 >參悟python元類(又稱metaclass)系列實戰(五)

參悟python元類(又稱metaclass)系列實戰(五)

寫在前面

在上一章節參悟python元類(又稱metaclass)系列實戰(四)完成了Mysql類, 用來連線資料庫以及執行sql語句;
繼續豐富系列實戰(三)的users類, 忘記的小夥伴請戳此處;
本章內容為該系列的終結篇, 感謝大家從而終;
有誤的地方懇請大神指正下。

熱身預備

  • Mysql中, 嚴謹的sql語句都長的類似下面的格式

    SELECT `uid`, `name` FROM `users`;     -- 關鍵字都帶"反引號"
    
  • Python中如何把一個list中的元素都加上反引號呢?

    fields = ['uid', 'name']
    new_fields = list(map(lambda a: f'`{a}`', fields))   # ['`uid`', '`name`']
    
  • 如何把fields帶入到sql中呢?

    f"SELECT {', '.join(new_fields)} FROM `users`"
    
  • 我們上一章定義Mysql.execute時, 對傳入的sql字串做了特殊要求

    await cur.execute(sql.replace('?', '%s'), args)
    # 即在寫sql時希望用 ? 代替佔位符 %s, 執行時再通過 replace 替換回來
    
    # 該要求用於`INSERT`時, 就得構造類似如下的sql
    "INSERT INTO `users`(`uid`, 'name') VALUES('?', '?')"
    
    # 定義createArgsStr方法, 傳入長度, 生成 ? 佔位符
    def createArgsStr(num):
        return ', '.join(['?' for _ in range(num)])
    
    # 那 INSERT語句就可以這樣構造
    f"INSERT INTO `users`({', '.join(new_fields)}) VALUES({createArgsStr(len(new_fields))})"
    
  • 熱身完畢, 接下來我們把insert, select, delete, update的功能加到ORM中

更新元類ModelMetaClass

class ModelMetaClass(type):
    def __new__(cls, name, bases, attrs):
        if name == 'Model':
            # 當出現與'Model'同名的類時, 直接建立這類
            return type.__new__(cls, name, bases, attrs)

        # 定義表名: 要麼在類中定義__table__屬性(目的是"類名可以與表名不相同"), 否則與類名相同
        tableName = attrs.get('__table__') or name
        print(f'建立對映關係: {name}類 --> {tableName}表')

        mappings = Dict()   # 儲存column與Field 子類的對應關係, Field在上一章中定義的, 忘了回去翻
        fields = []         # 用來儲存除主鍵以外的所有欄位名
        primaryKey = None   # 用來記錄主鍵欄位的名字, 初始沒有

        for k, v in attrs.items():
            # 遍歷所有屬性, 即對映表的欄位, 讀不懂請回看第二章 Users 的定義
            if isinstance(v, Field):     # Field類, 所有欄位型別的父類
                print(f'建立對映... column: {k} ==> class: {v}')
                mappings[k] = v

                if v.primaryKey:         # 判斷欄位是否被設定成了主鍵
                    if primaryKey:       # 因為一張表只能有一個主鍵
                        raise Exception(f'Duplicate primary key for field {k}')
                    primaryKey = k
                else:
                    fields.append(k)

        if not primaryKey:               # 這裡做了一步強制要求設定主鍵, 你也可以去掉
            raise Exception(f'請給表{tableName}設定主鍵')

        for k in mappings.keys():
            # 刪除原屬性, 避免例項的屬性遮蓋類的同名屬性, 況且我們已經儲存到 mappings 中了, 不怕丟
            attrs.pop(k)

        # 接下來給本元類(ModelMetaClass)建立的class(如 Model)設定私有屬性
        attrs['__mappings__'] = mappings
        attrs['__table__'] = tableName
        attrs['__primaryKey__'] = primaryKey
        attrs['__fields__'] = fields

        # 以下 8 行程式碼是新增的內容(注意提前定義好 createArgsStr), update 單拎出來, 這裡不做處理
        escapedFields = list(map(lambda a: f'`{a}`', fields))
        attrs['__select__'] = f"SELECT `{primaryKey}`, {', '.join(escapedFields)} FROM `{tableName}`"
        attrs['__delete__'] = f"DELETE FROM `{tableName}` WHERE `{primaryKey}`=?"

        # 由於 update_at & created_at 可以由mysql自動寫入, 我們寫入時可以不傳
        fields.remove('updated_at')
        fields.remove('created_at')
        attrs['__fields_2__'] = fields
        escapedFields_2 = list(map(lambda a: f'`{a}`', fields))
        attrs['__insert__'] = f"INSERT INTO `{tableName}` ({', '.join(escapedFields_2)}) VALUES ({createArgsStr(len(escapedFields_2))})"

        return type.__new__(cls, name, bases, attrs)

更新Model類, 新增findAll方法

class Model(Dict, metaclass=ModelMetaClass):
    """指定metaclass, 以實現動態定製"""

    def __init__(self, **kw):
        super().__init__(**kw)

    def getValue(self, key):
        return getattr(self, key, None)

    def getValueOrDefault(self, key):
        value = getattr(self, key, None)
        if value is None:
            field = self.__mappings__[key]   # 從所有column中獲取value
            if field.default is not None:
                # 如果default指向是方法(如time.time), 則呼叫方法獲取其值; 否則直接賦值
                value = field.default() if callable(field.default) else field.default
                print(f'using defalut value for {key}: {value}')
                setattr(self, key, value)    # 其實是調 Dict.__setattr__, 以支援用"."訪問
        return value

    # 以下內容為更新(新增)
    @classmethod
    def appendCondition(cls, sql, where, args, **kw):
        if where:
            sql.append('WHERE')
            sql.append(where)
        if not args:
            args = []
        orderBy = kw.get('orderBy')
        if orderBy:
            sql.append('ORDER BY')
            sql.append(orderBy)
        limit = kw.get('limit')
        if limit:
            sql.append('LIMIT')
            if isinstance(limit, int):
                sql.append('?')
                args.append(limit)
            elif isinstance(limit, tuple) and len(limit) == 2:
                sql.append('?, ?')
                args.extend(limit)  # 在列表末尾一次性追加另一個序列中的多個值
            else:
                raise Exception(f'Invalid limit value: {limit}')
        return sql

    @classmethod
    async def findAll(cls, where=None, args=None, **kw):
        '''返回結果集'''
        sql = [cls.__select__]
        sql = cls.appendCondition(sql, where, args, **kw)
        rs = await Mysql.select(' '.join(sql), args)
        # cls(**r)是呼叫Model.__init__方法, 將dict轉為Dict, 就可以 . 的方式獲取value
        return [cls(**r) for r in rs]
  • 挨個特性測試下findAll
if __name__ == '__main__':
    import asyncio
    loop = asyncio.get_event_loop()
    loop.run_until_complete(Mysql.createPool())
    u = users()
    # 測試查詢整張表
    rs = loop.run_until_complete(u.findAll())
    for i in rs:
        print(i.uid)
    # 測試where條件
    rs = loop.run_until_complete(u.findAll(where='is_deleted=?', args=[1]))
    print(rs[0].name)
    # 測試排序
    rs = loop.run_until_complete(u.findAll(where='is_deleted=?', args=[0], orderBy='uid DESC'))
    print(rs)
    # 測試分頁查詢
    rs = loop.run_until_complete(u.findAll(where='is_deleted=?', args=[1], orderBy='uid DESC', limit=1))
    print(rs[0].email)

再給Model類新增一個findFieldValue方法, 可以查詢指定欄位

@classmethod
async def findFieldValue(cls, selectField, where=None, args=None, **kw):
    sql = [f"SELECT {selectField} FROM `{cls.__table__}`"]
    sql = cls.appendCondition(sql, where, args, **kw)
    rs = await Mysql.select(' '.join(sql), args)
    return [cls(**r) for r in rs]


# 測試程式碼
if __name__ == '__main__':
    import asyncio
    loop = asyncio.get_event_loop()
    loop.run_until_complete(Mysql.createPool())
    u = users()
    rs = loop.run_until_complete(u.findFieldValue(
        'uid, name, email', 'is_deleted=?', [1], orderBy='uid DESC', limit=(0, 2)))
    print(rs)

再新增findByPrimaryKey, 根據主鍵id查詢

@classmethod
async def findByPrimaryKey(cls, pk):
    '''find object by primary key.'''
    rs = await Mysql.select(f'{cls.__select__} where `{cls.__primaryKey__}`=?', [pk], 1)
    if len(rs) == 0:
        return {}
    return cls(**rs[0])

還有insert, update, delete, logicDelete

async def insert(self):
    # 因為insert都是具體的一行資料, 即單個例項, 所以沒有定義成類方法
    args = list(map(self.getValueOrDefault, self.__fields_2__))  # __fields_2__不含update_at & created_at
    rows = await Mysql.execute(self.__insert__, args)
    if rows != 1:
        print(f'插入資料失敗, 影響行數: {rows}')

@classmethod
async def update(cls, setField, where, args):
    """這樣可以批量update"""
    sql = f"UPDATE `{cls.__table__}` SET {', '.join(map(lambda a: f'`{a}`=?', setField))} WHERE {where}"
    rows = await Mysql.execute(sql, args)
    if not rows:
        print(f'failed to update by {where}, affected rows: {rows}')

@classmethod
async def delete(cls, pk):
    """delete by primaryKey"""
    rows = await Mysql.execute(cls.__delete__, [pk])
    if rows != 1:
        print(
            f'failed to delete by {cls.__primaryKey__}, affected rows: {rows}')

@classmethod
async def logicDelete(cls, pk):
    await cls.update(setField=['is_deleted'],
                    where=f'{cls.__primaryKey__}=?', args=[1, pk])
  • 測試程式碼
if __name__ == '__main__':
    import asyncio
    loop = asyncio.get_event_loop()
    loop.run_until_complete(Mysql.createPool())
    u = users()
    
    import hashlib
    passwd = hashlib.md5('123456'.encode(encoding='utf-8')).hexdigest()
    test = users(email='[email protected]', passwd=passwd, name='test',
                 created_by=100, updated_by=100)
    loop.run_until_complete(test.insert())
    loop.run_until_complete(u.update(setField=['name'], where='uid=106', args=['newTest']))
    loop.run_until_complete(u.logicDelete(106))
    print(loop.run_until_complete(u.findByPrimaryKey(106)))

總結

  1. 到這裡小夥伴可能有看懵的, 不要緊, 原始碼和sql我都打包好了戳這裡下載, 請修改資料庫密碼啥的等配置項

  2. 其實此ORM缺點很多, 普適性差, 對非同步也沒能應用的合適; 只作為理解metaclass的小練習

  3. 不要重複造輪子, 推薦小夥伴還是用成熟的ORM框架, 如SQLAlchemy, aiomysql對它做了支援