1. 程式人生 > 資料庫 > ORM框架實現 & sqlalchemy

ORM框架實現 & sqlalchemy

  

import pymysql
from pymysql.cursors import DictCursor


# class Field:
#     def __init__(self,name,column=None,chief=False,unique=False,index=False,nullable=True,default=None):
#         self.name=name
#         if column is None:
#             self.column=name
#         else:
#             self.column=column
#         self.chief=chief
#         self.unique=unique
#         self.index=index
#         self.nullable=nullable
#         self.default=default
#
#     def validate(self,value):
#         raise NotImplementedError
#
#     def __get__(self,instance,owner):
#         if instance is None:
#             return self
#         return instance.__dict__[self.name]
#
#     def __set__(self,instance,value):
#         self.validate(value)
#         instance.__dict__[self.name]=value
#
#     def __str__(self):
#         return '<{} <{}>>'.format(self.__class__.__name__,self.name)
#
#     __repr__=__str__
#
# class IntField(Field):
#     def __init__(self,name,column=None,chief=False,unique=False,index=False,nullable=True,default=None,
#                  auto_increment=False):
#         super(IntField,self).__init__(name,column,chief,unique,index,nullable,default)
#         self.auto_increment=auto_increment
#
#     def validate(self,value):
#         if value is None:
#             if self.chief:
#                 raise TypeError('{} is primary key,yet value is None'.format(self.name))
#             if not self.nullable:
#                 raise TypeError('{} required'.format(self.name))
#         else:
#             if not isinstance(value,int):
#                 raise GeneratorExit('{} should be integer'.format(value))
#
# class StringField(Field):
#     def __init__(self,name,column=None,chief=None,unique=None,index=None,nullable=True,default=None,length=None):
#         super(StringField,self).__init__(name,column,chief,unique,index,nullable,default)
#         self.length=length
#
#     def validate(self,value):
#         if value is None:
#             if self.chief:
#                 raise TypeError('{} is primary key,yet value is None'.format(self.name))
#             if not self.nullable:
#                 raise TypeError('{} required'.format(self.name))
#         else:
#             if not isinstance(value, str):
#                 raise GeneratorExit('{} should be str'.format(value))
#             if len(value) > self.length:
#                 raise ValueError('{} too long'.format(value))
#
# class S:
#     id=IntField('id','id',True,nullable=False,auto_increment=True)
#     name=StringField('name',nullable=False,length=64)
#     age=IntField('age')
#
#     def __init__(self,id,name,age):
#         self.id=id
#         self.name=name
#         self.age=age
#
#     def __str__(self):
#         return 'S({} {} {})'.format(self.id,self.name,self.age)
#
#     __repr__=__str__
#
#     def save(self,conn:pymysql.connections.Connection):
#         sql='insert into s (id,name,age) values (%s,%s,%s)'
#         cursor=conn.cursor(cursor=DictCursor)
#         cursor.execute(sql,args=(self.id,self.name,self.age))
#         cursor.close()

class Field:
    """Data descriptor describing one column of a model class.

    The value itself is stored in the owning instance's ``__dict__`` under
    ``name``; the descriptor only validates on assignment and carries the
    column metadata.

    Args:
        name: Attribute name used as the key in the instance ``__dict__``.
        column: Database column name; falls back to ``name`` when omitted.
        chief: True when the column is (part of) the primary key.
        unique: True when the column carries a unique constraint.
        index: True when the column is indexed.
        nullable: False when ``None`` must be rejected by ``validate``.
        default: Default value metadata for the column.
    """

    def __init__(self, name=None, column=None, chief=False, unique=False, index=False, nullable=True, default=None):
        self.name = name
        self.column = name if column is None else column
        self.chief = chief
        self.unique = unique
        self.index = index
        self.nullable = nullable
        self.default = default

    def validate(self, value):
        """Check ``value`` before it is stored; subclasses must override.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def __get__(self, instance, owner):
        """Return the stored value, or the descriptor itself on class access.

        Raises:
            AttributeError: if no value has been assigned on ``instance`` yet.
        """
        if instance is None:
            return self
        try:
            return instance.__dict__[self.name]
        except KeyError:
            # Attribute access must fail with AttributeError, otherwise
            # hasattr() and getattr(obj, name, default) propagate a KeyError.
            raise AttributeError(
                '{!r} object has no attribute {!r}'.format(type(instance).__name__, self.name)
            ) from None

    def __set__(self, instance, value):
        """Validate ``value`` and store it in the instance ``__dict__``."""
        self.validate(value)
        instance.__dict__[self.name] = value

    def __str__(self):
        return '<{} {}>'.format(self.__class__.__name__, self.name)

    __repr__ = __str__


class IntField(Field):
    """Integer column descriptor.

    Accepts the same arguments as ``Field`` plus ``auto_increment``, which
    records whether the column is auto-incremented.
    """

    def __init__(self, name=None, column=None, chief=False, unique=False, index=False, nullable=True, default=None,
                 auto_increment=False):
        self.auto_increment = auto_increment
        super(IntField, self).__init__(name, column, chief, unique, index, nullable, default)

    def validate(self, value):
        """Reject values that cannot be stored in this column.

        Raises:
            TypeError: if ``value`` is ``None`` for a primary-key or
                non-nullable column, or if it is not an ``int``.
        """
        if value is None:
            if self.chief:
                raise TypeError('{} is primary key,yet value is None'.format(self.name))
            if not self.nullable:
                raise TypeError('{} required'.format(self.name))
            return
        if not isinstance(value, int):
            # TypeError, not GeneratorExit: GeneratorExit derives from
            # BaseException and would slip past ``except Exception`` handlers.
            raise TypeError('{} should be integer'.format(value))


class StringField(Field):
    """String column descriptor with a maximum length.

    Args:
        length: Maximum number of characters accepted by ``validate``;
            ``None`` disables the length check.

    The remaining arguments are forwarded unchanged to ``Field``.
    """

    def __init__(self, length, name=None, column=None, chief=False, unique=False, index=False, nullable=True,
                 default=None, ):
        self.length = length
        super(StringField, self).__init__(name, column, chief, unique, index, nullable, default)

    def validate(self, value):
        """Reject values that cannot be stored in this column.

        Raises:
            TypeError: if ``value`` is ``None`` for a primary-key or
                non-nullable column, or if it is not a ``str``.
            ValueError: if ``value`` is longer than ``length``.
        """
        if value is None:
            if self.chief:
                raise TypeError('{} is primary key,yet value is None'.format(self.name))
            if not self.nullable:
                raise TypeError('{} required'.format(self.name))
            return
        if not isinstance(value, str):
            # TypeError, not GeneratorExit: GeneratorExit derives from
            # BaseException and would slip past ``except Exception`` handlers.
            raise TypeError('{} should be str'.format(value))
        # A length of None means "unbounded"; comparing int > None would crash.
        if self.length is not None and len(value) > self.length:
            raise ValueError('{} too long'.format(value))


class Session:
    """Thin transaction helper around a PyMySQL connection.

    Two usage modes are supported:

    * ``with session: session.execute(...)`` -- all statements in the block
      share one cursor; the block commits on success and rolls back if an
      exception escapes it.
    * ``session.execute(...)`` outside a ``with`` block -- the statement runs
      on a temporary cursor and is committed (or rolled back) immediately.
    """

    def __init__(self, conn: pymysql.connections.Connection):
        self.conn = conn
        self.cursor = None  # set only while a transaction is in progress
        self.mark = False   # True while a standalone execute() owns the cursor

    def execute(self, sql, *args, **kwargs):
        """Run ``sql`` with positional (``%s``) or named (``%(x)s``) parameters.

        Positional parameters take precedence when both kinds are supplied;
        a statement without parameters is executed as-is.
        """
        params = args or kwargs or None

        if self.cursor is not None:
            # Inside a ``with`` block: commit/rollback is left to __exit__.
            self.cursor.execute(sql, params)
            return

        # Standalone call: own the cursor for the duration of this statement.
        self.mark = True
        self.cursor = self.conn.cursor(cursor=DictCursor)
        try:
            self.cursor.execute(sql, params)
        except Exception:
            self.conn.rollback()
            raise
        else:
            self.conn.commit()
        finally:
            # Reset the state so the next standalone call commits as well.
            self.cursor.close()
            self.cursor = None
            self.mark = False

    def __enter__(self):
        self.cursor = self.conn.cursor(cursor=DictCursor)
        return self  # callers then invoke execute() on this session object

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Detach the cursor first so the session never keeps a closed cursor.
        cursor, self.cursor = self.cursor, None
        try:
            if exc_type:
                self.conn.rollback()
            else:
                self.conn.commit()
        finally:
            if cursor is not None:
                cursor.close()


# class Session:
#     def __init__(self, conn: pymysql.connections.Connection):
#         self.conn = conn
#
#     def execute(self, sql, *args, **kwargs):
#         try:
#             cursor = self.conn.cursor(cursor=DictCursor)
#             with cursor:
#                 if args:
#                     cursor.execute(sql, args)
#                 if kwargs:
#                     cursor.execute(sql, kwargs)
#         except:
#             self.conn.rollback()
#         else:
#             self.conn.commit()


class ModelMeta(type):
    """Metaclass that collects the ``Field`` descriptors of a model class.

    For every class it creates, it ensures these class attributes exist:

    * ``__tablename__`` -- defaults to the lower-cased class name when the
      class body omits it or sets it to ``None``.
    * ``__mapping__`` -- dict of attribute name -> ``Field`` declared in the
      class body.
    * ``__primary_key__`` -- list of the fields whose ``chief`` flag is set.
    """

    def __new__(cls, what: str, bases, attrs: dict):
        if attrs.get('__tablename__') is None:
            # Plain assignment: setdefault() would keep an explicit None.
            attrs['__tablename__'] = what.lower()

        mapping = {}
        primary_key = []

        for k, v in attrs.items():
            if not isinstance(v, Field):
                continue
            # Fields declared without a name/column take the attribute name.
            if v.name is None:
                v.name = k
            if v.column is None:
                v.column = k

            mapping[k] = v
            if v.chief:
                primary_key.append(v)

        attrs.setdefault('__mapping__', mapping)
        attrs.setdefault('__primary_key__', primary_key)

        # The new class must be returned, otherwise the class name binds to None.
        return super(ModelMeta, cls).__new__(cls, what, bases, attrs)


class Model(metaclass=ModelMeta):
    """Base class for mapped models; ``ModelMeta`` gathers the fields."""

    def save(self, session: Session):
        """Insert this instance as one row of ``__tablename__``.

        Only fields that have been assigned on the instance are written.
        The statement runs inside ``with session:``, so committing or rolling
        back is handled by the session's context manager. When no field has
        been assigned, nothing is sent to the database.
        """
        state = self.__dict__
        columns = []
        values = []
        # __mapping__ (built by ModelMeta) maps attribute name -> Field; the
        # Field supplies the column name, the instance dict supplies the value.
        for attr, field in type(self).__mapping__.items():
            if attr in state:
                # Backtick-quote identifiers, as Engine.save does, so that
                # reserved words are usable as column names.
                columns.append('`{}`'.format(field.column))
                values.append(state[attr])

        if not columns:
            return

        sql = 'insert into {} ({}) values ({})'.format(
            self.__tablename__,
            ','.join(columns),
            ','.join(['%s'] * len(columns))
        )

        with session:
            session.execute(sql, *values)


class S(Model):
    # Model mapped to table 'pp'. The `sname` attribute is stored in the
    # `name` column; both `id` and `age` are flagged as primary-key fields.
    __tablename__ = 'pp'
    id = IntField(name='id', column='id', chief=True, nullable=False, auto_increment=True)
    sname = StringField(length=64, name='sname', column='name', nullable=False)
    age = IntField(name='age', chief=True)

    def __init__(self, id, name, age):
        # Each assignment goes through the matching Field descriptor, so the
        # values are validated in the order id, sname, age.
        self.id, self.sname, self.age = id, name, age

    def __str__(self):
        template = 'Student({},{},{})'
        return template.format(self.id, self.sname, self.age)

    __repr__ = __str__

    # def save(self, conn: pymysql.connections.Connection):
    #     sql = 'insert into pp (id,name,age) values (%(id)s,%(name)s,%(age)s)'
    #     try:
    #         cursor = conn.cursor(cursor=DictCursor)
    #         with cursor:
    #             cursor.execute(sql, args={'id': self.id, 'name': self.name, 'age': self.age})
    #         conn.commit()
    #     except:
    #         conn.rollback()
    #
    # def save(self, session: Session):
    #     sql = 'insert into pp (id,name,age) values (%(id)s,%(name)s,%(age)s)'
    #     session.execute(sql, id=self.id, name=self.name, age=self.age)
    #     # with session:
    #     #     session.execute(sql,id=self.id,name=self.name,age=self.age)
    #
    # def save(self, session: Session):
    #     sql = 'insert into pp (id,age,name) values (%(id)s,%(age)s,%(name)s)'
    #     session.execute(sql, name=self.name, age=self.age, id=self.id)

class Engine:
    """Owns a PyMySQL connection and persists model instances through it."""

    def __init__(self, *args, **kwargs):
        # All arguments are forwarded unchanged to pymysql.connect().
        self.conn = pymysql.connect(*args, **kwargs)

    def save(self, instance: Model):
        """Insert ``instance`` as one row of its ``__tablename__``.

        Only fields that have been assigned on the instance are written.

        Returns:
            True if the insert was executed and committed; False if executing
            it raised, in which case the error is printed and the transaction
            is rolled back.
        """
        state = instance.__dict__
        columns = []
        values = []
        for attr, field in instance.__mapping__.items():
            if attr in state:
                columns.append('`{}`'.format(field.column))
                values.append(state[attr])

        query = 'insert into {} ({}) values ({})'.format(
            instance.__tablename__,
            ','.join(columns),
            ','.join(['%s'] * len(columns))
        )

        try:
            with self.conn.cursor(cursor=DictCursor) as cursor:
                cursor.execute(query, args=values)
        except Exception as e:
            print(e.args, e.__context__)
            self.conn.rollback()
            # Report the failure to the caller instead of hiding it.
            return False
        else:
            self.conn.commit()
        return True

# Demo: build one S instance and insert it through Engine.
# NOTE(review): the connection credentials are hard-coded here; move them to
# configuration before using this outside a local experiment.
# Connection parameters are passed by keyword because PyMySQL 1.1+ no longer
# accepts them positionally.
# NOTE(review): `conn` is only needed by the commented-out Session call below
# and is never closed.
conn = pymysql.connections.Connection(host='localhost', user='root', password='cruces', database='uranus')
s = S(444, 'zxc', 88)
print(s.id, s.sname, s.age)
# s.save(Session(conn))
print(s.__dict__)
print(S.__dict__)
engine = Engine(host='localhost', user='root', password='cruces', database='uranus')
engine.save(s)
# for k,v in S.__dict__.items():
#     # print(v,v.__class__.__bases__,type(v.__class__))
#     if v.__class__.__bases__[0] == Field:
#         print(k)

 

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String,Float,Enum,Date

# Declarative base class that the mapped classes below derive from.
Base = declarative_base()


class Student(Base):
    # Mapped to table 'Stu': auto-incrementing integer primary key `id`,
    # required `name` of up to 64 characters, and a unique float `age`.
    __tablename__ = 'Stu'
    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(64), nullable=False)
    age = Column(Float, unique=True)

    def __repr__(self):
        cls_name = type(self).__name__
        return '<{} id:{}, name:{}, age:{}>'.format(cls_name, self.id, self.name, self.age)

    __str__ = __repr__

# Create the SQLAlchemy engine from a mysql+pymysql URL; echo=True is passed
# to create_engine.
# NOTE(review): this rebinds the module-level name `engine`, which earlier in
# the file referred to the hand-written Engine instance.
# NOTE(review): the database credentials are embedded in the URL literal.
engine=sqlalchemy.create_engine("mysql+pymysql://root:cruces@localhost:3306/uranus",echo=True)

# Ask the metadata of Base to create the tables of all mapped classes.
Base.metadata.create_all(engine)