二、智能ORM框架实现
1. 基础元类定义
class ModelMeta(type):
    """Metaclass that gathers ``Field`` attributes into a ``_meta`` registry.

    Every class created through this metaclass receives a ``_meta`` dict with
    two keys: ``'table'`` (the lower-cased class name) and ``'fields'`` (a
    mapping of attribute name to ``Field`` instance).
    """

    def __new__(cls, name, bases, namespace):
        """Create the class object and attach its ``_meta`` description.

        Args:
            name: Name of the class being created.
            bases: Tuple of base classes.
            namespace: Mapping of the attributes defined in the class body.

        Returns:
            The newly created class, carrying a ``_meta`` attribute.
        """
        fields = {}

        # Start from the fields of the base models so that a subclass keeps
        # the columns its parents declared. Bases are visited right-to-left so
        # that the left-most base takes precedence on a name clash.
        for base in reversed(bases):
            inherited = getattr(base, '_meta', None)
            if isinstance(inherited, dict):
                fields.update(inherited.get('fields', {}))

        # Collect the fields declared directly on this class.
        for key, value in namespace.items():
            if isinstance(value, Field):
                fields[key] = value
                value.name = key
            else:
                # A non-field attribute with the same name shadows (removes)
                # an inherited field.
                fields.pop(key, None)

        new_class = super().__new__(cls, name, bases, namespace)
        new_class._meta = {
            'table': name.lower(),
            'fields': fields,
        }
        return new_class
2. 字段类型系统
class Field:
    """Base class describing a single model column.

    Attributes:
        primary_key: Whether the column is the table's primary key.
        null: Whether the column accepts NULL values.
        default: Value used when none is supplied for the field.
        name: Attribute name of the field on its model; ``None`` until the
            model's metaclass assigns it.
    """

    def __init__(self, primary_key=False, null=False, default=None):
        self.primary_key = primary_key
        self.null = null
        self.default = default
        # Assigned by ModelMeta when the owning model class is created.
        self.name = None

    def db_type(self):
        """Return the SQL column type for this field.

        ``Model.create_table`` calls this on every field, so concrete field
        classes must override it.

        Raises:
            NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} does not define db_type()"
        )
class CharField(Field):
    """Field holding a bounded-length string.

    Args:
        max_length: Maximum number of characters; defaults to 255.
        **kwargs: Forwarded unchanged to ``Field.__init__``.
    """

    def __init__(self, max_length=255, **kwargs):
        super().__init__(**kwargs)
        self.max_length = max_length

    def db_type(self):
        """Return the SQL column type, e.g. ``VARCHAR(255)``.

        Needed because ``Model.create_table`` calls ``db_type()`` on every
        field when it builds the column list.
        """
        return f"VARCHAR({self.max_length})"
class IntegerField(Field):
    """Field holding an integer value."""

    def db_type(self):
        """Return the SQL column type ``INTEGER``.

        Needed because ``Model.create_table`` calls ``db_type()`` on every
        field when it builds the column list.
        """
        return "INTEGER"
class ForeignKey(Field):
    """Field referencing another model.

    Args:
        to: The referenced model (stored as ``self.to``).
        **kwargs: Forwarded unchanged to ``Field.__init__``.
    """

    def __init__(self, to, **kwargs):
        super().__init__(**kwargs)
        self.to = to

    def db_type(self):
        """Return the SQL column type used to store the reference.

        Needed because ``Model.create_table`` calls ``db_type()`` on every
        field when it builds the column list.
        """
        # NOTE(review): assumes the referenced model uses an integer primary
        # key, as the example models in this file do - confirm for other
        # targets.
        return "INTEGER"
3. 模型基类实现
class Model(metaclass=ModelMeta):
    """Base class for ORM models.

    Subclasses declare ``Field`` attributes; ``ModelMeta`` records them in
    ``_meta['fields']`` and the table name in ``_meta['table']``.

    NOTE(review): ``Database`` is referenced by ``create_table`` and ``save``
    but is not defined in the part of the file visible here - confirm where
    it comes from.
    """

    def __init__(self, **kwargs):
        """Populate one instance attribute per declared field.

        A value passed as a keyword argument is used as-is. Otherwise the
        field's default is used; a callable default (e.g. ``datetime.now``)
        is called so that each instance gets a freshly computed value instead
        of the function object itself.
        """
        for name, field in self._meta['fields'].items():
            if name in kwargs:
                value = kwargs[name]
            else:
                default = field.default
                value = default() if callable(default) else default
            setattr(self, name, value)

    @classmethod
    def create_table(cls):
        """Build and execute the ``CREATE TABLE`` statement for this model."""
        columns = []
        for name, field in cls._meta['fields'].items():
            column = f"{name} {field.db_type()}"
            if field.primary_key:
                column += " PRIMARY KEY"
            if not field.null:
                column += " NOT NULL"
            columns.append(column)
        sql = f"CREATE TABLE {cls._meta['table']} (" + ", ".join(columns) + ")"
        Database.execute(sql)

    def save(self):
        """Insert this instance as a new row.

        Column values are passed as parameters (``?`` placeholders) rather
        than being interpolated into the SQL text.
        """
        names = list(self._meta['fields'])
        values = [getattr(self, name) for name in names]
        column_list = ','.join(names)
        placeholders = ','.join(['?'] * len(values))
        sql = f"INSERT INTO {self._meta['table']} ({column_list}) VALUES ({placeholders})"
        Database.execute(sql, values)
三、高级功能扩展
1. 多数据库支持
class DatabaseRouter:
    """Routes SQL statements to the engine configured for a database alias."""

    # Maps the engine key found in the settings to the engine class.
    _engines = {
        'sqlite': SQLiteEngine,
        'mysql': MySQLEngine,
        'postgres': PostgresEngine,
    }

    def __init__(self, using='default'):
        """Remember which entry of ``settings.DATABASES`` to use."""
        self.using = using

    @classmethod
    def get_engine(cls, db_type):
        """Instantiate and return the engine registered under ``db_type``.

        Raises:
            KeyError: If ``db_type`` is not a key of ``_engines``.
        """
        engine_cls = cls._engines[db_type]
        return engine_cls()

    def execute(self, sql, params=None):
        """Run ``sql`` with ``params`` on the engine of the selected database.

        Returns whatever the engine's ``execute`` returns.
        """
        db_config = settings.DATABASES[self.using]
        backend = self.get_engine(db_config['engine'])
        return backend.execute(sql, params)
2. 查询表达式构建
class Q:
    """A group of keyword conditions that can be combined with ``&`` / ``|``.

    Each keyword is either a column name (equality test) or
    ``column__lookup`` where ``lookup`` is one of the keys of ``_OPERATORS``.
    """

    # Lookup suffix -> SQL comparison operator.
    _OPERATORS = {
        'exact': '=',
        'gt': '>',
        'gte': '>=',
        'lt': '<',
        'lte': '<=',
        'contains': 'LIKE',
        'startswith': 'LIKE',
        'endswith': 'LIKE',
    }

    # Lookups whose value is wrapped into a LIKE pattern.
    _LIKE_PATTERNS = {
        'contains': '%{}%',
        'startswith': '{}%',
        'endswith': '%{}',
    }

    def __init__(self, **kwargs):
        self.conditions = kwargs

    def __and__(self, other):
        return And(self, other)

    def __or__(self, other):
        return Or(self, other)

    def as_sql(self):
        """Render the conditions as a SQL fragment with ``?`` placeholders.

        ``QuerySet.execute`` calls this on every stored condition and expects
        a ``(sql, params)`` pair.

        Returns:
            A tuple ``(sql, params)``. Several conditions are joined with
            ``AND`` and wrapped in parentheses; no conditions at all yield the
            always-true fragment ``1=1`` so the result can still be joined
            into a WHERE clause.

        Raises:
            ValueError: If a column name is not a plain identifier or the
                lookup suffix is not supported.
        """
        if not self.conditions:
            return "1=1", []

        clauses = []
        params = []
        for key, value in self.conditions.items():
            column, _, lookup = key.partition('__')
            lookup = lookup or 'exact'
            # The column name is interpolated into the SQL text, so only
            # plain identifiers are accepted.
            if not column.isidentifier():
                raise ValueError(f"invalid column name: {column!r}")
            if lookup not in self._OPERATORS:
                raise ValueError(f"unsupported lookup: {lookup!r}")

            # "= NULL" never matches in SQL; equality with None means IS NULL.
            if lookup == 'exact' and value is None:
                clauses.append(f"{column} IS NULL")
                continue

            pattern = self._LIKE_PATTERNS.get(lookup)
            if pattern is not None:
                # NOTE: '%' and '_' inside the value are not escaped.
                value = pattern.format(value)
            clauses.append(f"{column} {self._OPERATORS[lookup]} ?")
            params.append(value)

        sql = " AND ".join(clauses)
        if len(clauses) > 1:
            sql = f"({sql})"
        return sql, params
class QuerySet:
    """Accumulates filter conditions for a model and runs the SELECT.

    Args:
        model: The model class whose table is queried; its
            ``_meta['table']`` is read by ``execute``.
    """

    def __init__(self, model=None):
        # Both attributes are read by filter()/execute(); without this
        # initialisation the first filter() call raises AttributeError.
        self.model = model
        self._filters = []

    def filter(self, *args, **kwargs):
        """Add conditions to the query and return ``self`` for chaining.

        Positional arguments are stored as given (condition objects exposing
        ``as_sql()``); keyword arguments are wrapped in a single ``Q``.
        """
        self._filters.extend(args)
        # Only wrap keyword conditions when there are some, so that a call
        # with positional conditions alone does not add an empty Q().
        if kwargs:
            self._filters.append(Q(**kwargs))
        return self

    def execute(self):
        """Build the SELECT statement and run it through ``Database``.

        All stored conditions are joined with ``AND``.

        Returns:
            Whatever ``Database.execute`` returns.
        """
        where = []
        params = []
        for condition in self._filters:
            clause, clause_params = condition.as_sql()
            where.append(clause)
            params.extend(clause_params)
        sql = f"SELECT * FROM {self.model._meta['table']}"
        if where:
            sql += " WHERE " + " AND ".join(where)
        return Database.execute(sql, params)
四、实际应用示例
class User(Model):
    """Example model describing an application user.

    ``ModelMeta`` derives the table name from the class name (``user``) and
    records the fields in declaration order.
    """

    # Primary-key column.
    id = IntegerField(primary_key=True)

    # Login name, limited to 50 characters.
    username = CharField(max_length=50)

    # E-mail address, limited to 255 characters.
    email = CharField(max_length=255)

    # The function ``datetime.now`` itself (not its result) is passed as the
    # default.
    created_at = DateTimeField(default=datetime.now)
class Post(Model):
    """Example model describing a post written by a ``User``.

    ``ModelMeta`` derives the table name from the class name (``post``) and
    records the fields in declaration order.
    """

    # Primary-key column.
    id = IntegerField(primary_key=True)

    # Headline, limited to 100 characters.
    title = CharField(max_length=100)

    # Body text.
    content = TextField()

    # Reference to the ``User`` model.
    author = ForeignKey(to=User)

    # Publication flag; defaults to False.
    published = BooleanField(default=False)
# Usage example: create the table, then insert one row.
User.create_table()
admin = User(
    username='admin',
    email='admin@example.com',
)
admin.save()

# Complex query: published posts whose title or content contains "Python".
# NOTE(review): the code visible here defines neither a ``Post.objects``
# attribute nor a ``QuerySet.order_by`` method, and ``Post`` declares no
# ``created_at`` field - confirm these exist elsewhere before relying on
# this snippet.
posts = Post.objects.filter(
    Q(published=True)
    & (Q(title__contains='Python') | Q(content__contains='Python'))
).order_by('-created_at')