Python實戰網站開發:Day4-編寫Model
阿新 • • 發佈:2021-11-25
編寫Model
orm.py
編寫完成後,就可以把網站應用需要的三個表(user, blog, comment)用Model
表示出來。在www
目錄下,新建models.py
:
import time, uuid # orm中匯入模組 from orm import Model, StringField, BooleanField, FloatField, TextField # 定義函式,用於隨機生成一個字串作為主鍵id的值 # 加括號執行函式返回一個字串字串去前15位為時間戳乘以1000,然後不足15位使用0補齊 # 後面為使用uuid.uuid4().hex隨機生成的一段字串 # 最後補000 def next_id(): return '%015d%s000' % (int(time.time() * 1000), uuid.uuid4().hex) # 使用者模組 class User(Model): __table__ = 'users' id = StringField(primary_key=True, default=next_id, ddl='varchar(50)') email = StringField(ddl='varchar(50)') passwd = StringField(ddl='varchar(50)') admin = BooleanField() name = StringField(ddl='varchar(50)') image = StringField(ddl='varchar(500)') created_at = FloatField(default=time.time) # 部落格模組 class Blog(Model): __table__ = 'blogs' id = StringField(primary_key=True, default=next_id, ddl='varchar(50)') user_id = StringField(ddl='varchar(50)') user_name = StringField(ddl='varchar(50)') user_image = StringField(ddl='varchar(500)') name = StringField(ddl='varchar(50)') summary = StringField(ddl='varchar(200)') content = TextField() created_at = FloatField(default=time.time) class Comment(Model): __table__ = 'comments' id = StringField(primary_key=True, default=next_id, ddl='varchar(50)') blog_id = StringField(ddl='varchar(50)') user_id = StringField(ddl='varchar(50)') user_name = StringField(ddl='varchar(50)') user_image = StringField(ddl='varchar(500)') content = TextField() created_at = FloatField(default=time.time)
在編寫ORM時,給Field增加一個default引數可以讓ORM自己填入預設值,非常方便,而且,預設值可以作為函式物件傳入,在呼叫save()時自動計算。例如,主鍵id的預設值是函式next_id,建立時間created_at的預設值是函式time.time,可以自動設定當前日期和時間。日期和時間用float型別儲存在資料庫中,而不是datetime型別,這麼做的好處是不必關心資料庫的時區以及時區轉換問題,排序非常簡單,顯示的時候,只需要做一個float到str的轉換,也非常容易。
關於預設值,在上一節從拆分解析過程我們已經有講過,id和created_at的預設值都是一個函式名,然後在呼叫save()時函式內部使用callable判斷是不是可呼叫物件,如果是則加()執行把返回值賦值給對應的欄位例如id欄位獲得的返回值是一個前面是時間戳乘以1000加一個uuid隨機生成的字串組成的一整個隨機字串,createad_at獲得的返回值就是time.time()即當前的時間戳。
初始化資料庫表
由於網站表的數量較少,可以手動建立SQL指令碼schema.sql
到根目錄:
drop database if exists awesome; drop user if exists 'www-data'@'localhost'; create database awesome; use awesome; create user 'www-data'@'localhost' identified by 'www-data'; alter user 'www-data'@'localhost' identified with mysql_native_password by 'www-data'; grant select, insert, update, delete on awesome.* to 'www-data'@'localhost'; create table users ( `id` varchar(50) not null, `email` varchar(50) not null, `passwd` varchar(50) not null, `admin` bool not null, `name` varchar(50) not null, `image` varchar(500) not null, `created_at` real not null, unique key `idx_email` (`email`), key `idx_created_at` (`created_at`), primary key (`id`) ) engine=innodb default charset=utf8; create table blogs ( `id` varchar(50) not null, `user_id` varchar(50) not null, `user_name` varchar(50) not null, `user_image` varchar(500) not null, `name` varchar(50) not null, `summary` varchar(200) not null, `content` mediumtext not null, `created_at` real not null, key `idx_created_at` (`created_at`), primary key (`id`) ) engine=innodb default charset=utf8; create table comments ( `id` varchar(50) not null, `blog_id` varchar(50) not null, `user_id` varchar(50) not null, `user_name` varchar(50) not null, `user_image` varchar(500) not null, `content` mediumtext not null, `created_at` real not null, key `idx_created_at` (`created_at`), primary key (`id`) ) engine=innodb default charset=utf8;
把SQL指令碼schema.sql
放到MySQL命令列裡執行,就完成了資料庫表的初始化:
mysql -u root -p < schema.sql
然後我們可以編寫資料訪問程式碼test.py
測試一下。如新建一個User的物件:
import orm import asyncio from models import User, Blog, Comment # 定義協程函式test async def test(loop): # 建立連線池 await orm.create_pool(loop=loop,user='www-data', password='www-data', db='awesome') # 使用類User例項化一個物件,其實就是相當於建立了一個字典 # id和created_at沒有傳入,使用預設值next_id和time.time # 如果id和created_at在初始化的時候傳入會優先獲取初始化時傳入的值,不使用預設值 u = User(name='Test', email='[email protected]', passwd='1234567890', image='about:blank') # print(dir(u)) # print(u.__fields__) await u.save() # print(u.__fields__) ## 網友指出新增到資料庫後需要關閉連線池,否則會報錯 RuntimeError: Event loop is closed orm.__pool.close() await orm.__pool.wait_closed() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(test(loop)) loop.close()
執行test.py
後,可以在MySQL客戶端命令列查詢,看看測試的資料是不是正常儲存到MySQL裡面。
mysql> select * from users; +----------------------------------------------------+-------------+------------+-------+------+-------------+------------------+ | id | email | passwd | admin | name | image | created_at | +----------------------------------------------------+-------------+------------+-------+------+-------------+------------------+ | 00163780190327232747957103e40eda717c9c7c5e057b2000 | [email protected] | 1234567890 | 0 | Test | about:blank | 1637801903.27281 | +----------------------------------------------------+-------------+------------+-------+------+-------------+------------------+ 1 row in set (0.00 sec)