from orm_pool.mysql_pool import Mysql
# 定义字段类
class Field(object):
def __init__(self, name, colmun_type, primary_key, default):
self.name = name
self.colmun_type = colmun_type
self.primary_key = primary_key
self.default = default
# 细化字段,定义字符串类
class StringField(Field):
def __init__(self, name, colmun_type=‘varchar(255)‘, primary_key=False, default=None):
# 重用父类方法,super()自动传值效果
super().__init__(name, colmun_type, primary_key, default)
# 细化字段,定义字符串类
class IntegerField(Field):
def __init__(self, name, colmun_type=‘int‘, primary_key=False, default=0):
super().__init__(name, colmun_type, primary_key, default)
# 定义元类拦截需要映射的类,user,movie等
class MymitaClass(type):
# class_name是类名, class_bases类的基类,class_artt类的名称空间
def __new__(cls, class_name, class_bases, class_artt):
# 拦截我们需要映射的类,是他们在创建之时就具备表的特性,而Models不用来映射表
if class_name == ‘Models‘:
# 重用父类type的new方法,new产生一个空对象
return type.__new__(cls, class_name, class_bases, class_artt) # 重用父类type的new方法,这里拦截已结束!
table_name = class_artt.get(‘table_name‘, class_name) # 表名=表名,没有默认等于类名
primary_key = None # 定义主键为None
mappings = {} # 定义空字典,用来装我们自己定义的字段名和字段对应的对象:
# 获取我们定义的字段属性,把类中内置的拿走
for k, v in class_artt.items():
if isinstance(v, Field):
# id = IntegerField(name=‘id‘, primary_key=True),id是k, v=IntegerField(name=‘id‘, primary_key=True)
mappings[k] = v
if v.primary_key:
# 表的健壮性考研
if primary_key:
raise TypeError(‘一张表只能有一个主键‘)
primary_key = v.name
# 把重复的字段的删除掉,在从新添加进去
for k in mappings.keys():
class_artt.pop(k)
if not primary_key:
raise TypeError(‘一张表必须要有一个主键‘)
# 将表的属性添加的类的名称空间里去,使得它具备表的特性
class_artt[‘table_name‘] = table_name
class_artt[‘primary_key‘] = primary_key
class_artt[‘mappings‘] = mappings
return type.__new__(cls, class_name, class_bases, class_artt)
class Models(dict, metaclass=MymitaClass):
def __init__(self, **kwargs):
# 重用父类字典的方法,使得该类具备字典的特性
super().__init__(**kwargs)
def __getattr__(self, item):
# 字典对象获取值
return self.get(item, ‘没有改键‘)
def __setattr__(self, key, value):
# 字典对象设置值
self[key] = value
# 我们查询方法都是针对表,查询表,表就是类,所以定义为类方法
@classmethod
def select(cls, **kwargs):
me = Mysql()
# 查询分两种:select * from table; select * from table where 字段=字段记录(字段=字段记录,所以用**)
if not kwargs:
sql = "select * from %s" % cls.table_name
res = me.select(sql)
# select * from table where id=1
else:
k = list(kwargs.keys())[0]
v = kwargs.get(k)
sql = "select * from %s where %s=?" % (cls.table_name, k)
sql = sql.replace(‘?‘, ‘%s‘)
res = me.select(sql, v)
if res:
# 列表推导式获取到一个个字典, ** 打散成[obj1,obj2,obj3...]
return [cls(**r) for r in res]
def update(self):
me = Mysql()
# update %s set name=? where id=1
values = []
pr = None
field = []
for k, v in self.mappings.items():
if v.primary_key: # 获取主键,构成id=1的形式
pr = getattr(self, v.name, v.name) # 获取主键值
else:
field.append(v.name + ‘=?‘) # [name=? password=?]
values.append(getattr(self, v.name, v.default)) # 字段对应的记录值也就是问号值
sql = "update %s set %s where %s=%s" % (self.table_name, ‘,‘.join(field), self.primary_key, pr)
sql = sql.replace(‘?‘, ‘%s‘)
me.execute(sql, values)
def save(self):
me = Mysql()
values = []
args = []
field = []
# "insert into % (%) values(%s)"
for k, v in self.mappings.items():
if not v.primary_key:
field.append(v.name)
args.append(‘?‘)
values.append(getattr(self, v.name, v.default))
sql = "insert into %s (%s) values(%s)" % (self.table_name, ‘,‘.join(field), ‘,‘.join(args))
sql = sql.replace(‘?‘, ‘%s‘)
me.execute(sql, values)
# if __name__ == ‘__main__‘:
# # class W(Models):
# # table_name = ‘w‘
# # id = IntegerField(name=‘id‘, primary_key=True)
# # name = StringField(name=‘name‘)
# #
# # obj = W(name=‘owen‘)
# # obj.save()
原文:https://www.cnblogs.com/jingandyuer/p/10946542.html