ORM框架實現 & sqlalchemy
阿新 • • 發佈:2020-12-14
import pymysql
from pymysql.cursors import DictCursor


class Field:
    """Data descriptor that models one table column.

    Values are stored in the owning instance's ``__dict__`` under ``name`` and
    are passed through :meth:`validate` on every assignment.

    Args:
        name: Key used in the instance ``__dict__``. When ``None``,
            ``ModelMeta`` fills it in with the class attribute name.
        column: Column name used in generated SQL; defaults to ``name``.
        chief: Marks the column as (part of) the primary key.
        unique, index, default: Stored on the field only; nothing in this
            file reads them.
        nullable: Whether ``None`` is an acceptable value.
    """

    def __init__(self, name=None, column=None, chief=False, unique=False, index=False,
                 nullable=True, default=None):
        self.name = name
        self.column = name if column is None else column
        self.chief = chief
        self.unique = unique
        self.index = index
        self.nullable = nullable
        self.default = default

    def validate(self, value):
        """Check ``value`` before it is stored; subclasses must override."""
        raise NotImplementedError()

    def _reject_none(self):
        """Raise ``TypeError`` if ``None`` is not acceptable for this column."""
        if self.chief:
            raise TypeError('{} is primary key,yet value is None'.format(self.name))
        if not self.nullable:
            raise TypeError('{} required'.format(self.name))

    def __get__(self, instance, owner):
        # Class-level access (S.id) returns the descriptor itself.
        if instance is None:
            return self
        return instance.__dict__[self.name]

    def __set__(self, instance, value):
        self.validate(value)
        instance.__dict__[self.name] = value

    def __str__(self):
        return '<{} {}>'.format(self.__class__.__name__, self.name)

    __repr__ = __str__


class IntField(Field):
    """Integer column; additionally records an ``auto_increment`` flag."""

    def __init__(self, name=None, column=None, chief=False, unique=False, index=False,
                 nullable=True, default=None, auto_increment=False):
        self.auto_increment = auto_increment
        super(IntField, self).__init__(name, column, chief, unique, index, nullable, default)

    def validate(self, value):
        """Accept ``int`` values, or ``None`` when the column allows it.

        Raises:
            TypeError: ``value`` is ``None`` for a primary-key or non-nullable
                column, or is not an ``int``.
        """
        if value is None:
            self._reject_none()
        elif not isinstance(value, int):
            # Was GeneratorExit, which is reserved for generator shutdown and
            # is not caught by ``except Exception``.
            raise TypeError('{} should be integer'.format(value))


class StringField(Field):
    """String column limited to ``length`` characters."""

    def __init__(self, length, name=None, column=None, chief=False, unique=False, index=False,
                 nullable=True, default=None, ):
        self.length = length
        super(StringField, self).__init__(name, column, chief, unique, index, nullable, default)

    def validate(self, value):
        """Accept ``str`` values no longer than ``length``, or an allowed ``None``.

        Raises:
            TypeError: ``value`` is ``None`` for a primary-key or non-nullable
                column, or is not a ``str``.
            ValueError: ``value`` is longer than ``length``.
        """
        if value is None:
            self._reject_none()
            return
        if not isinstance(value, str):
            # Was GeneratorExit; see IntField.validate.
            raise TypeError('{} should be str'.format(value))
        if len(value) > self.length:
            raise ValueError('{} too long'.format(value))


class Session:
    """Thin transaction helper around a PyMySQL connection.

    Used as a context manager, every ``execute`` inside the ``with`` block
    shares one cursor, and the transaction is committed on a clean exit or
    rolled back when the block raises. Outside a ``with`` block each
    ``execute`` call is its own transaction.
    """

    def __init__(self, conn: pymysql.connections.Connection):
        self.conn = conn
        self.cursor = None  # set only while inside a ``with`` block
        self.mark = False   # True while a standalone statement is in flight

    def execute(self, sql, *args, **kwargs):
        """Run ``sql`` with either positional or keyword parameters.

        Raises:
            TypeError: both positional and keyword parameters were supplied.
        """
        if args and kwargs:
            raise TypeError('pass either positional or keyword parameters, not both')
        # Parameterless statements are executed as well (they were skipped before).
        params = args or kwargs or None

        if self.cursor is not None:
            # Inside ``with session:`` -- commit/rollback is left to __exit__.
            self.cursor.execute(sql, params)
            return

        # Standalone call: use a private cursor and finish the transaction
        # here, so repeated calls all commit and no cursor is left open.
        self.mark = True
        cursor = self.conn.cursor(cursor=DictCursor)
        try:
            cursor.execute(sql, params)
        except Exception:
            self.conn.rollback()
            raise
        else:
            self.conn.commit()
        finally:
            cursor.close()
            self.mark = False

    def __enter__(self):
        self.cursor = self.conn.cursor(cursor=DictCursor)
        return self  # callers then invoke execute() on the session object

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.cursor.close()
        # Drop the closed cursor so later standalone execute() calls open a new one.
        self.cursor = None
        if exc_type:
            self.conn.rollback()
        else:
            self.conn.commit()


class ModelMeta(type):
    """Metaclass that collects the ``Field`` attributes of a model class.

    It provides ``__tablename__`` (lower-cased class name when missing or
    ``None``), ``__mapping__`` (attribute name -> field) and
    ``__primary_key__`` (fields declared with ``chief=True``).
    """

    def __new__(cls, what: str, bases, attrs: dict):
        if attrs.get('__tablename__') is None:
            # Plain assignment: setdefault would keep an explicit None.
            attrs['__tablename__'] = what.lower()

        mapping = {}
        primary_key = []
        for k, v in attrs.items():
            if not isinstance(v, Field):
                continue
            if v.name is None:
                v.name = k  # key the descriptor uses in the instance __dict__
            if v.column is None:
                v.column = k
            mapping[k] = v
            if v.chief:
                primary_key.append(v)

        attrs.setdefault('__mapping__', mapping)
        attrs.setdefault('__primary_key__', primary_key)
        # The new class must be returned, otherwise Model would be None.
        return super(ModelMeta, cls).__new__(cls, what, bases, attrs)


def _insert_statement(instance):
    """Build ``(sql, values)`` for inserting ``instance`` into its table.

    Only fields that have a value in ``instance.__dict__`` are included.
    Column names are back-quoted; the table name is emitted as declared.
    """
    columns = []
    values = []
    # The instance __dict__ holds the values, while the class-level mapping
    # holds the descriptors that know the column names.
    for k, field in instance.__mapping__.items():
        if k in instance.__dict__:
            columns.append('`{}`'.format(field.column))
            values.append(instance.__dict__[k])
    sql = 'insert into {} ({}) values ({})'.format(
        instance.__tablename__,
        ','.join(columns),
        ','.join(['%s'] * len(columns))
    )
    return sql, values


class Model(metaclass=ModelMeta):
    """Base class for mapped objects; subclasses declare ``Field`` attributes."""

    def save(self, session: Session):
        """Insert this object as a new row inside one ``session`` transaction."""
        sql, values = _insert_statement(self)
        with session:
            session.execute(sql, *values)


class S(Model):
    """Example model stored in table ``pp``."""

    __tablename__ = 'pp'

    id = IntField('id', column='id', chief=True, nullable=False, auto_increment=True)
    sname = StringField(64, 'sname', column='name', nullable=False)
    age = IntField('age', chief=True)

    def __init__(self, id, name, age):
        self.id = id
        self.sname = name
        self.age = age

    def __str__(self):
        return 'Student({},{},{})'.format(self.id, self.sname, self.age)

    __repr__ = __str__


class Engine:
    """Owns a PyMySQL connection and persists model instances through it."""

    def __init__(self, *args, **kwargs):
        # All arguments are forwarded unchanged to pymysql.connect().
        self.conn = pymysql.connect(*args, **kwargs)

    def save(self, instance: Model):
        """Insert ``instance``; on failure roll back and report the error.

        Errors are printed rather than re-raised, keeping the original
        best-effort behaviour of this demo.
        """
        query, values = _insert_statement(instance)
        try:
            with self.conn.cursor(cursor=DictCursor) as cursor:
                cursor.execute(query, args=values)
        except Exception as e:
            self.conn.rollback()
            print(e.args, e.__context__)
        else:
            self.conn.commit()


# Connection settings are given by keyword: newer PyMySQL releases no longer
# accept them positionally.
DB_SETTINGS = dict(host='localhost', user='root', password='cruces', database='uranus')

conn = pymysql.connections.Connection(**DB_SETTINGS)
s = S(444, 'zxc', 88)
print(s.id, s.sname, s.age)
# Alternative: persist through a Session instead of the Engine.
# s.save(Session(conn))
print(s.__dict__)
print(S.__dict__)

engine = Engine(**DB_SETTINGS)
engine.save(s)
import sqlalchemy
from sqlalchemy import Column, Integer, String, Float, Enum, Date

try:
    # SQLAlchemy 1.4+ provides declarative_base in sqlalchemy.orm; the
    # sqlalchemy.ext.declarative location is the legacy one.
    from sqlalchemy.orm import declarative_base
except ImportError:
    # Older SQLAlchemy releases only have the original location.
    from sqlalchemy.ext.declarative import declarative_base

# Base class that every mapped class below derives from.
Base = declarative_base()


class Student(Base):
    """Declarative mapping for the ``Stu`` table."""

    __tablename__ = 'Stu'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(64), nullable=False)
    age = Column(Float, unique=True)

    def __repr__(self):
        return '<{} id:{}, name:{}, age:{}>'.format(
            self.__class__.__name__, self.id, self.name, self.age
        )

    __str__ = __repr__


# echo=True is passed to create_engine so the emitted SQL is logged.
engine = sqlalchemy.create_engine("mysql+pymysql://root:cruces@localhost:3306/uranus", echo=True)
# Create the tables registered on Base.metadata using this engine.
Base.metadata.create_all(engine)