参悟python元类(又称metaclass)系列实战(五)[Python基础]

python

写在前面

在上一章节参悟python元类(又称metaclass)系列实战(四)完成了Mysql类, 用来连接数据库以及执行sql语句;

继续丰富系列实战(三)的users类, 忘记的小伙伴请戳此处;

本章内容为该系列的终结篇, 感谢大家从一而终;

有误的地方恳请大神指正下。

热身预备

  • Mysql中, 严谨的sql语句都长的类似下面的格式

    SELECT `uid`, `name` FROM `users`;     -- 关键字都带"反引号"

  • Python中如何把一个list中的元素都加上反引号呢?

    fields = ["uid", "name"]

    new_fields = list(map(lambda a: f"`{a}`", fields)) # ["`uid`", "`name`"]

  • 如何把fields带入到sql中呢?

    f"SELECT {", ".join(new_fields)} FROM `users`"

  • 我们上一章定义Mysql.execute时, 对传入的sql字符串做了特殊要求

    await cur.execute(sql.replace("?", "%s"), args)

    # 即在写sql时希望用 ? 代替占位符 %s, 执行时再通过 replace 替换回来

    # 该要求用于`INSERT`时, 就得构造类似如下的sql

    "INSERT INTO `users`(`uid`, "name") VALUES("?", "?")"

    # 定义createArgsStr方法, 传入长度, 生成 ? 占位符

    def createArgsStr(num):

    return ", ".join(["?" for _ in range(num)])

    # 那 INSERT语句就可以这样构造

    f"INSERT INTO `users`({", ".join(new_fields)}) VALUES({createArgsStr(len(new_fields))})"

  • 热身完毕, 接下来我们把insert, select, delete, update的功能加到ORM中

更新元类ModelMetaClass

class ModelMetaClass(type):

def __new__(cls, name, bases, attrs):

if name == "Model":

# 当出现与"Model"同名的类时, 直接创建这类

return type.__new__(cls, name, bases, attrs)

# 定义表名: 要么在类中定义__table__属性(目的是"类名可以与表名不相同"), 否则与类名相同

tableName = attrs.get("__table__") or name

print(f"建立映射关系: {name}类 --> {tableName}表")

mappings = Dict() # 存储column与Field 子类的对应关系, Field在上一章中定义的, 忘了回去翻

fields = [] # 用来存储除主键以外的所有字段名

primaryKey = None # 用来记录主键字段的名字, 初始没有

for k, v in attrs.items():

# 遍历所有属性, 即映射表的字段, 读不懂请回看第二章 Users 的定义

if isinstance(v, Field): # Field类, 所有字段类型的父类

print(f"建立映射... column: {k} ==> class: {v}")

mappings[k] = v

if v.primaryKey: # 判断字段是否被设置成了主键

if primaryKey: # 因为一张表只能有一个主键

raise Exception(f"Duplicate primary key for field {k}")

primaryKey = k

else:

fields.append(k)

if not primaryKey: # 这里做了一步强制要求设置主键, 你也可以去掉

raise Exception(f"请给表{tableName}设置主键")

for k in mappings.keys():

# 删除原属性, 避免实例的属性遮盖类的同名属性, 况且我们已经保存到 mappings 中了, 不怕丢

attrs.pop(k)

# 接下来给本元类(ModelMetaClass)创建的class(如 Model)设置私有属性

attrs["__mappings__"] = mappings

attrs["__table__"] = tableName

attrs["__primaryKey__"] = primaryKey

attrs["__fields__"] = fields

# 以下 8 行代码是新增的内容(注意提前定义好 createArgsStr), update 单拎出来, 这里不做处理

escapedFields = list(map(lambda a: f"`{a}`", fields))

attrs["__select__"] = f"SELECT `{primaryKey}`, {", ".join(escapedFields)} FROM `{tableName}`"

attrs["__delete__"] = f"DELETE FROM `{tableName}` WHERE `{primaryKey}`=?"

# 由于 update_at & created_at 可以由mysql自动写入, 我们写入时可以不传

fields.remove("updated_at")

fields.remove("created_at")

attrs["__fields_2__"] = fields

escapedFields_2 = list(map(lambda a: f"`{a}`", fields))

attrs["__insert__"] = f"INSERT INTO `{tableName}` ({", ".join(escapedFields_2)}) VALUES ({createArgsStr(len(escapedFields_2))})"

return type.__new__(cls, name, bases, attrs)

更新Model类, 新增findAll方法

class Model(Dict, metaclass=ModelMetaClass):

"""指定metaclass, 以实现动态定制"""

def __init__(self, **kw):

super().__init__(**kw)

def getValue(self, key):

return getattr(self, key, None)

def getValueOrDefault(self, key):

value = getattr(self, key, None)

if value is None:

field = self.__mappings__[key] # 从所有column中获取value

if field.default is not None:

# 如果default指向是方法(如time.time), 则调用方法获取其值; 否则直接赋值

value = field.default() if callable(field.default) else field.default

print(f"using defalut value for {key}: {value}")

setattr(self, key, value) # 其实是调 Dict.__setattr__, 以支持用"."访问

return value

# 以下内容为更新(新增)

@classmethod

def appendCondition(cls, sql, where, args, **kw):

if where:

sql.append("WHERE")

sql.append(where)

if not args:

args = []

orderBy = kw.get("orderBy")

if orderBy:

sql.append("ORDER BY")

sql.append(orderBy)

limit = kw.get("limit")

if limit:

sql.append("LIMIT")

if isinstance(limit, int):

sql.append("?")

args.append(limit)

elif isinstance(limit, tuple) and len(limit) == 2:

sql.append("?, ?")

args.extend(limit) # 在列表末尾一次性追加另一个序列中的多个值

else:

raise Exception(f"Invalid limit value: {limit}")

return sql

@classmethod

async def findAll(cls, where=None, args=None, **kw):

"""返回结果集"""

sql = [cls.__select__]

sql = cls.appendCondition(sql, where, args, **kw)

rs = await Mysql.select(" ".join(sql), args)

# cls(**r)是调用Model.__init__方法, 将dict转为Dict, 就可以 . 的方式获取value

return [cls(**r) for r in rs]

  • 挨个特性测试下findAll

if __name__ == "__main__":

import asyncio

loop = asyncio.get_event_loop()

loop.run_until_complete(Mysql.createPool())

u = users()

# 测试查询整张表

rs = loop.run_until_complete(u.findAll())

for i in rs:

print(i.uid)

# 测试where条件

rs = loop.run_until_complete(u.findAll(where="is_deleted=?", args=[1]))

print(rs[0].name)

# 测试排序

rs = loop.run_until_complete(u.findAll(where="is_deleted=?", args=[0], orderBy="uid DESC"))

print(rs)

# 测试分页查询

rs = loop.run_until_complete(u.findAll(where="is_deleted=?", args=[1], orderBy="uid DESC", limit=1))

print(rs[0].email)

再给Model类新增一个findFieldValue方法, 可以查询指定字段

@classmethod

async def findFieldValue(cls, selectField, where=None, args=None, **kw):

sql = [f"SELECT {selectField} FROM `{cls.__table__}`"]

sql = cls.appendCondition(sql, where, args, **kw)

rs = await Mysql.select(" ".join(sql), args)

return [cls(**r) for r in rs]

# 测试代码

if __name__ == "__main__":

import asyncio

loop = asyncio.get_event_loop()

loop.run_until_complete(Mysql.createPool())

u = users()

rs = loop.run_until_complete(u.findFieldValue(

"uid, name, email", "is_deleted=?", [1], orderBy="uid DESC", limit=(0, 2)))

print(rs)

再新增findByPrimaryKey, 根据主键id查找

@classmethod

async def findByPrimaryKey(cls, pk):

"""find object by primary key."""

rs = await Mysql.select(f"{cls.__select__} where `{cls.__primaryKey__}`=?", [pk], 1)

if len(rs) == 0:

return {}

return cls(**rs[0])

还有insert, update, delete, logicDelete

async def insert(self):

# 因为insert都是具体的一行数据, 即单个实例, 所以没有定义成类方法

args = list(map(self.getValueOrDefault, self.__fields_2__)) # __fields_2__不含update_at & created_at

rows = await Mysql.execute(self.__insert__, args)

if rows != 1:

print(f"插入数据失败, 影响行数: {rows}")

@classmethod

async def update(cls, setField, where, args):

"""这样可以批量update"""

sql = f"UPDATE `{cls.__table__}` SET {", ".join(map(lambda a: f"`{a}`=?", setField))} WHERE {where}"

rows = await Mysql.execute(sql, args)

if not rows:

print(f"failed to update by {where}, affected rows: {rows}")

@classmethod

async def delete(cls, pk):

"""delete by primaryKey"""

rows = await Mysql.execute(cls.__delete__, [pk])

if rows != 1:

print(

f"failed to delete by {cls.__primaryKey__}, affected rows: {rows}")

@classmethod

async def logicDelete(cls, pk):

await cls.update(setField=["is_deleted"],

where=f"{cls.__primaryKey__}=?", args=[1, pk])

  • 测试代码

if __name__ == "__main__":

import asyncio

loop = asyncio.get_event_loop()

loop.run_until_complete(Mysql.createPool())

u = users()

import hashlib

passwd = hashlib.md5("123456".encode(encoding="utf-8")).hexdigest()

test = users(email="test@ztest.top", passwd=passwd, name="test",

created_by=100, updated_by=100)

loop.run_until_complete(test.insert())

loop.run_until_complete(u.update(setField=["name"], where="uid=106", args=["newTest"]))

loop.run_until_complete(u.logicDelete(106))

print(loop.run_until_complete(u.findByPrimaryKey(106)))

总结

  1. 到这里小伙伴可能有看懵的, 不要紧, 源码和sql我都打包好了戳这里下载, 请修改数据库密码啥的等配置项

  2. 其实此ORM缺点很多, 普适性差, 对异步也没能应用的合适; 只作为理解metaclass的小练习

  3. 不要重复造轮子, 推荐小伙伴还是用成熟的ORM框架, 如SQLAlchemy, aiomysql对它做了支持

以上是 参悟python元类(又称metaclass)系列实战(五)[Python基础] 的全部内容, 来源链接: utcz.com/z/530288.html

回到顶部