查询

本节将介绍通常在关系数据库上执行的基本CRUD操作:

注解

还有大量示例查询,这些查询来自 Postgresql Exercises 网站。示例列在 query examples 文件。

创建新记录

你可以使用 Model.create() 创建新模型实例。此方法接受关键字参数,其中键对应于模型字段的名称。将返回一个新实例,并向表中添加一行。

>>> User.create(username='Charlie')
<__main__.User object at 0x2529350>

本遗嘱 INSERT 数据库中的新行。主键将自动检索并存储在模型实例上。

或者,可以通过编程构建模型实例,然后调用 save()

>>> user = User(username='Charlie')
>>> user.save()  # save() returns the number of rows modified.
1
>>> user.id
1
>>> huey = User()
>>> huey.username = 'Huey'
>>> huey.save()
1
>>> huey.id
2

当模型具有外键时,可以在创建新记录时将模型实例直接分配给外键字段。

>>> tweet = Tweet.create(user=huey, message='Hello!')

还可以使用相关对象的主键值:

>>> tweet = Tweet.create(user=2, message='Hello again!')

如果只希望插入数据而不需要创建模型实例,则可以使用 Model.insert()

>>> User.insert(username='Mickey').execute()
3

执行插入查询后,将返回新行的主键。

注解

有几种方法可以加速大容量插入操作。退房 大块镶块 配方部分了解更多信息。

大块镶块

有几种方法可以快速加载大量数据。天真的做法是 Model.create() 在循环中:

data_source = [
    {'field1': 'val1-1', 'field2': 'val1-2'},
    {'field1': 'val2-1', 'field2': 'val2-2'},
    # ...
]

for data_dict in data_source:
    MyModel.create(**data_dict)

上述方法缓慢的原因有两个:

  1. 如果不在事务中包装循环,则每次调用 create() 发生在它自己的事务中。那真是太慢了!

  2. 有相当多的python逻辑妨碍了您的工作,并且 InsertQuery 必须生成并解析为SQL。

  3. 这是您发送到数据库进行解析的大量数据(以SQL的原始字节为单位)。

  4. 我们正在检索 上次插入ID, 这会导致在某些情况下执行额外的查询。

只需将其包装在事务中 atomic() .

# This is much faster.
with db.atomic():
    for data_dict in data_source:
        MyModel.create(**data_dict)

上述代码仍然受到点2、3和4的影响。我们可以通过使用 insert_many() . 此方法接受元组或字典列表,并在单个查询中插入多行:

data_source = [
    {'field1': 'val1-1', 'field2': 'val1-2'},
    {'field1': 'val2-1', 'field2': 'val2-2'},
    # ...
]

# Fastest way to INSERT multiple rows.
MyModel.insert_many(data_source).execute()

这个 insert_many() 方法还接受行元组列表,前提是还指定了相应的字段:

# We can INSERT tuples as well...
data = [('val1-1', 'val1-2'),
        ('val2-1', 'val2-2'),
        ('val3-1', 'val3-2')]

# But we need to indicate which fields the values correspond to.
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()

在事务中包装大容量插入也是一个很好的实践:

# You can, of course, wrap this in a transaction as well:
with db.atomic():
    MyModel.insert_many(data, fields=fields).execute()

注解

当使用大容量插入时,SQLite用户应该知道一些警告。具体来说,您的sqlite3版本必须是3.7.11.0或更高版本才能利用大容量插入API。此外,默认情况下,sqlite将SQL查询中绑定变量的数量限制为 999 .

批量插入行

根据数据源中的行数,您可能需要将其分成块。尤其是sqlite通常具有 limit of 999 每个查询的变量(批量大小大约为1000/行长度)。

您可以编写一个循环,将您的数据批处理成块(在这种情况下是 strongly recommended 您使用事务处理):

# Insert rows 100 at a time.
with db.atomic():
    for idx in range(0, len(data_source), 100):
        MyModel.insert_many(data_source[idx:idx+100]).execute()

Peewee带着 chunked() 可用于的助手函数 efficiently 将一个泛型iterable分为一系列 批处理- 尺寸:

from peewee import chunked

# Insert rows 100 at a time.
with db.atomic():
    for batch in chunked(data_source, 100):
        MyModel.insert_many(batch).execute()

选择

这个 Model.bulk_create() 方法的行为与 Model.insert_many() ,但它接受要插入的未保存模型实例的列表,并且可以选择接受批处理大小参数。使用 bulk_create() 应用程序编程接口:

# Read list of usernames from a file, for example.
with open('user_list.txt') as fh:
    # Create a list of unsaved User instances.
    users = [User(username=line.strip()) for line in fh.readlines()]

# Wrap the operation in a transaction and batch INSERT the users
# 100 at a time.
with db.atomic():
    User.bulk_create(users, batch_size=100)

注解

如果您使用的是PostgreSQL(它支持 RETURNING 子句),则以前未保存的模型实例将自动填充其新的主键值。

此外,Peewee还提供 Model.bulk_update() ,它可以有效地更新模型列表中的一个或多个列。例如:

# First, create 3 users with usernames u1, u2, u3.
u1, u2, u3 = [User.create(username='u%s' % i) for i in (1, 2, 3)]

# Now we'll modify the user instances.
u1.username = 'u1-x'
u2.username = 'u2-y'
u3.username = 'u3-z'

# Update all three users with a single UPDATE query.
User.bulk_update([u1, u2, u3], fields=[User.username])

注解

对于大型对象列表,应指定一个合理的批处理大小并将调用包装到 bulk_update() 具有 Database.atomic()

with database.atomic():
    User.bulk_update(list_of_users, fields=['username'], batch_size=50)

或者,您可以使用 Database.batch_commit() 帮助处理内部行块 批处理- 调整交易规模。该方法还为PostgreSQL之外的数据库提供了一种解决方案,当必须获取新创建行的主键时。

# List of row data to insert.
row_data = [{'username': 'u1'}, {'username': 'u2'}, ...]

# Assume there are 789 items in row_data. The following code will result in
# 8 total transactions (7x100 rows + 1x89 rows).
for row in db.batch_commit(row_data, 100):
    User.create(**row)

从另一个表批量加载

如果要批量加载的数据存储在另一个表中,则还可以创建 INSERT 源为的查询 SELECT 查询。使用 Model.insert_from() 方法:

res = (TweetArchive
       .insert_from(
           Tweet.select(Tweet.user, Tweet.message),
           fields=[TweetArchive.user, TweetArchive.message])
       .execute())

上面的查询等价于以下SQL:

INSERT INTO "tweet_archive" ("user_id", "message")
SELECT "user_id", "message" FROM "tweet";

更新现有记录

一旦模型实例具有主键,任何后续调用 save() 将导致 UPDATE 而不是另一个 INSERT. 模型的主键不会更改:

>>> user.save()  # save() returns the number of rows modified.
1
>>> user.id
1
>>> user.save()
>>> user.id
1
>>> huey.save()
1
>>> huey.id
2

如果要更新多个记录,请发出 UPDATE 查询。以下示例将更新所有 Tweet 对象,标记为 出版, 如果它们是在今天之前创建的。 Model.update() 接受关键字参数,其中键对应于模型的字段名:

>>> today = datetime.today()
>>> query = Tweet.update(is_published=True).where(Tweet.creation_date < today)
>>> query.execute()  # Returns the number of rows that were updated.
4

有关详细信息,请参阅 Model.update()UpdateModel.bulk_update() .

注解

如果希望了解有关执行原子更新的更多信息(例如增加列的值),请检查 atomic update 食谱。

Atomic更新

Peewee允许您执行原子更新。假设我们需要更新一些计数器。幼稚的方法是写这样的东西:

>>> for stat in Stat.select().where(Stat.url == request.url):
...     stat.counter += 1
...     stat.save()

**不要这样做!**这不仅速度很慢,而且如果多个进程同时更新计数器,它还容易受到竞争条件的影响。

相反,您可以使用 update()

>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()

您可以使这些更新语句尽可能复杂。让我们给所有员工一笔奖金,相当于他们以前的奖金加上他们工资的10%:

>>> query = Employee.update(bonus=(Employee.bonus + (Employee.salary * .1)))
>>> query.execute()  # Give everyone a bonus!

我们甚至可以使用子查询来更新列的值。假设我们在 User 这个模型存储了用户发的微博数量,我们定期更新这个值。下面是如何编写这样的查询:

>>> subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id)
>>> update = User.update(num_tweets=subquery)
>>> update.execute()

上衣

Peewee为不同类型的upsert功能提供支持。对于3.24.0之前的sqlite和mysql,peewee提供 replace() ,它允许您插入一条记录,或者在违反约束的情况下替换现有记录。

使用示例 replace()on_conflict_replace()

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)

# Insert or update the user. The "last_login" value will be updated
# regardless of whether the user existed previously.
user_id = (User
           .replace(username='the-user', last_login=datetime.now())
           .execute())

# This query is equivalent:
user_id = (User
           .insert(username='the-user', last_login=datetime.now())
           .on_conflict_replace()
           .execute())

注解

除了 代替, sqlite、mysql和postgresql提供 ignore 行动(见: on_conflict_ignore() )如果您只希望插入并忽略任何潜在的约束冲突。

MySQL 支持通过 ON DUPLICATE KEY UPDATE 条款。例如:

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)
    login_count = IntegerField()

# Insert a new user.
User.create(username='huey', login_count=0)

# Simulate the user logging in. The login count and timestamp will be
# either created or updated correctly.
now = datetime.now()
rowid = (User
         .insert(username='huey', last_login=now, login_count=1)
         .on_conflict(
             preserve=[User.last_login],  # Use the value we would have inserted.
             update={User.login_count: User.login_count + 1})
         .execute())

在上面的示例中,我们可以根据需要安全地多次调用upsert查询。登录计数将自动递增,最后一个登录列将被更新,并且不会创建重复的行。

Postgresql and SQLite (3.24.0及更高版本)提供不同的语法,允许对哪些约束冲突应触发冲突解决以及应更新或保留哪些值进行更精细的控制。

使用示例 on_conflict() 要执行PostgreSQL样式的upsert(或sqlite 3.24+):

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)
    login_count = IntegerField()

# Insert a new user.
User.create(username='huey', login_count=0)

# Simulate the user logging in. The login count and timestamp will be
# either created or updated correctly.
now = datetime.now()
rowid = (User
         .insert(username='huey', last_login=now, login_count=1)
         .on_conflict(
             conflict_target=[User.username],  # Which constraint?
             preserve=[User.last_login],  # Use the value we would have inserted.
             update={User.login_count: User.login_count + 1})
         .execute())

在上面的示例中,我们可以根据需要安全地多次调用upsert查询。登录计数将自动递增,最后一个登录列将被更新,并且不会创建重复的行。

注解

mysql和postgresql/sqlite的主要区别在于postgresql和sqlite要求您指定 conflict_target .

下面是一个更高级的(如果是人为的)示例,使用 EXCLUDED 命名空间。这个 EXCLUDED helper允许我们引用冲突数据中的值。对于我们的示例,我们假设一个简单的表将唯一键(字符串)映射为值(整数):

class KV(Model):
    key = CharField(unique=True)
    value = IntegerField()

# Create one row.
KV.create(key='k1', value=1)

# Demonstrate usage of EXCLUDED.
# Here we will attempt to insert a new value for a given key. If that
# key already exists, then we will update its value with the *sum* of its
# original value and the value we attempted to insert -- provided that
# the new value is larger than the original value.
query = (KV.insert(key='k1', value=10)
         .on_conflict(conflict_target=[KV.key],
                      update={KV.value: KV.value + EXCLUDED.value},
                      where=(EXCLUDED.value > KV.value)))

# Executing the above query will result in the following data being
# present in the "kv" table:
# (key='k1', value=11)
query.execute()

# If we attempted to execute the query *again*, then nothing would be
# updated, as the new value (10) is now less than the value in the
# original row (11).

有关详细信息,请参阅 Insert.on_conflict()OnConflict .

删除记录

要删除单个模型实例,可以使用 Model.delete_instance() 捷径。 delete_instance() 将删除给定的模型实例,并且可以选择递归删除任何依赖对象(通过指定 recursive=True) .

>>> user = User.get(User.id == 1)
>>> user.delete_instance()  # Returns the number of rows deleted.
1

>>> User.get(User.id == 1)
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."id" = ?
PARAMS: [1]

要删除任意行集,可以发出 DELETE 查询。以下内容将全部删除 Tweet 超过一年的对象:

>>> query = Tweet.delete().where(Tweet.creation_date < one_year_ago)
>>> query.execute()  # Returns the number of rows deleted.
7

有关详细信息,请参阅以下文档:

选择单个记录

你可以使用 Model.get() 方法来检索与给定查询匹配的单个实例。对于主键查找,还可以使用快捷方式 Model.get_by_id() .

此方法是调用 Model.select() 使用给定的查询,但将结果集限制为一行。此外,如果没有与给定查询匹配的模型,则 DoesNotExist 将引发异常。

>>> User.get(User.id == 1)
<__main__.User object at 0x25294d0>

>>> User.get_by_id(1)  # Same as above.
<__main__.User object at 0x252df10>

>>> User[1]  # Also same as above.
<__main__.User object at 0x252dd10>

>>> User.get(User.id == 1).username
u'Charlie'

>>> User.get(User.username == 'Charlie')
<__main__.User object at 0x2529410>

>>> User.get(User.username == 'nobody')
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."username" = ?
PARAMS: ['nobody']

对于更高级的操作,可以使用 SelectBase.get() . 下面的查询从名为 查理:

>>> (Tweet
...  .select()
...  .join(User)
...  .where(User.username == 'charlie')
...  .order_by(Tweet.created_date.desc())
...  .get())
<__main__.Tweet object at 0x2623410>

有关详细信息,请参阅以下文档:

创建或获取

Peewee有一个用于执行“get/create”类型操作的助手方法: Model.get_or_create() ,它首先尝试检索匹配的行。如果失败,将创建一个新行。

对于“创建或获取”类型逻辑,通常需要依赖 unique 用于防止创建重复对象的约束或主键。例如,假设我们希望使用 example User model . 这个 User 模型有 unique 限制用户名字段,因此我们将依赖数据库的完整性保证,以确保最终不会出现重复的用户名:

try:
    with db.atomic():
        return User.create(username=username)
except peewee.IntegrityError:
    # `username` is a unique column, so this username already exists,
    # making it safe to call .get().
    return User.get(User.username == username)

您可以很容易地将这种类型的逻辑封装为 classmethod 靠你自己 Model 类。

上面的示例首先尝试创建,然后返回到检索,依赖数据库来强制执行唯一约束。如果您希望先尝试检索记录,则可以使用 get_or_create() . 此方法与同名的django函数沿同一行实现。可以使用django-style关键字参数筛选器指定 WHERE 条件。函数返回一个包含实例的2元组和一个指示对象是否已创建的布尔值。

下面是如何使用 get_or_create()

user, created = User.get_or_create(username=username)

假设我们有不同的型号 Person 并希望获取或创建一个Person对象。我们在检索 Person 是他们的姓和名, but 如果我们最终需要创建一个新记录,我们还将指定他们的出生日期和最喜欢的颜色:

person, created = Person.get_or_create(
    first_name=first_name,
    last_name=last_name,
    defaults={'dob': dob, 'favorite_color': 'green'})

传递给的任何关键字参数 get_or_create() 将用于 get() 逻辑的一部分,除了 defaults 字典,用于在新创建的实例上填充值。

有关更多详细信息,请阅读 Model.get_or_create() .

选择多个记录

我们可以使用 Model.select() 从表中检索行。当你构建一个 SELECT 查询时,数据库将返回与查询对应的所有行。Peewee允许您遍历这些行,并使用索引和切片操作:

>>> query = User.select()
>>> [user.username for user in query]
['Charlie', 'Huey', 'Peewee']

>>> query[1]
<__main__.User at 0x7f83e80f5550>

>>> query[1].username
'Huey'

>>> query[:2]
[<__main__.User at 0x7f83e80f53a8>, <__main__.User at 0x7f83e80f5550>]

Select 查询是智能的,因为您可以多次迭代、索引和切片查询,但查询只执行一次。

在下面的示例中,我们将简单地调用 select() 并迭代返回值,返回值是 Select . 这将返回 User

>>> for user in User.select():
...     print user.username
...
Charlie
Huey
Peewee

注解

在缓存结果时,同一查询的后续迭代将不会命中数据库。要禁用此行为(减少内存使用),请调用 Select.iterator() 迭代时。

在对包含外键的模型进行迭代时,请注意访问相关模型上的值的方式。意外地解析外键或在后引用上迭代可能导致 N+1 query behavior .

创建外键时,例如 Tweet.user ,您可以使用 backref 创建后参照的步骤( User.tweets )背面参考暴露为 Select 实例:

>>> tweet = Tweet.get()
>>> tweet.user  # Accessing a foreign key returns the related model.
<tw.User at 0x7f3ceb017f50>

>>> user = User.get()
>>> user.tweets  # Accessing a back-reference returns a query.
<peewee.ModelSelect at 0x7f73db3bafd0>

您可以迭代 user.tweets 像其他任何参考一样 Select

>>> for tweet in user.tweets:
...     print(tweet.message)
...
hello world
this is fun
look at this picture of my food

除了返回模型实例外, Select 查询可以返回字典、元组和namedtuples。根据您的用例,您可能会发现使用行作为字典更容易,例如:

>>> query = User.select().dicts()
>>> for row in query:
...     print(row)

{'id': 1, 'username': 'Charlie'}
{'id': 2, 'username': 'Huey'}
{'id': 3, 'username': 'Peewee'}

namedtuples()tuples()dicts() 更多信息。

迭代大型结果集

默认情况下,peewee将缓存在 Select 查询。这是一种优化,允许多次迭代以及索引和切片,而不会导致其他查询。但是,当您计划遍历大量行时,这种缓存可能会有问题。

要减少Peewee在迭代查询时使用的内存量,请使用 iterator() 方法。此方法允许您在不缓存返回的每个模型的情况下进行迭代,在迭代大型结果集时使用更少的内存。

# Let's assume we've got 10 million stat objects to dump to a csv file.
stats = Stat.select()

# Our imaginary serializer class
serializer = CSVSerializer()

# Loop over all the stats and serialize.
for stat in stats.iterator():
    serializer.serialize_object(stat)

对于简单的查询,您可以通过将行作为字典、命名上传或元组返回来进一步提高速度。以下方法可用于任何 Select 更改结果行类型的查询:

别忘了附加 iterator() 方法调用还可以减少内存消耗。例如,上面的代码可能如下所示:

# Let's assume we've got 10 million stat objects to dump to a csv file.
stats = Stat.select()

# Our imaginary serializer class
serializer = CSVSerializer()

# Loop over all the stats (rendered as tuples, without caching) and serialize.
for stat_tuple in stats.tuples().iterator():
    serializer.serialize_tuple(stat_tuple)

当遍历包含多个表中的列的大量行时,Peewee将为返回的每一行重建模型图。对于复杂的图,此操作可能很慢。例如,如果我们选择一个tweet列表以及tweet作者的用户名和化身,那么peewee必须为每行创建两个对象(tweet和一个用户)。除了上面的行类型之外,还有第四种方法 objects() 它将以模型实例的形式返回行,但不会尝试解析模型图。

例如:

query = (Tweet
         .select(Tweet, User)  # Select tweet and user data.
         .join(User))

# Note that the user columns are stored in a separate User instance
# accessible at tweet.user:
for tweet in query:
    print(tweet.user.username, tweet.content)

# Using ".objects()" will not create the tweet.user object and assigns all
# user attributes to the tweet instance:
for tweet in query.objects():
    print(tweet.username, tweet.content)

为了获得最大的性能,可以执行查询,然后使用底层数据库光标迭代结果。 Database.execute() 接受查询对象,执行查询,并返回db-api 2.0 Cursor 对象。光标将返回原始行元组:

query = Tweet.select(Tweet.content, User.username).join(User)
cursor = database.execute(query)
for (content, username) in cursor:
    print(username, '->', content)

筛选记录

您可以使用普通的python操作符过滤特定的记录。Peewee支持多种 query operators .

>>> user = User.get(User.username == 'Charlie')
>>> for tweet in Tweet.select().where(Tweet.user == user, Tweet.is_published == True):
...     print(tweet.user.username, '->', tweet.message)
...
Charlie -> hello world
Charlie -> this is fun

>>> for tweet in Tweet.select().where(Tweet.created_date < datetime.datetime(2011, 1, 1)):
...     print(tweet.message, tweet.created_date)
...
Really old tweet 2010-01-01 00:00:00

您还可以跨联接进行筛选:

>>> for tweet in Tweet.select().join(User).where(User.username == 'Charlie'):
...     print(tweet.message)
hello world
this is fun
look at this picture of my food

如果要表示复杂的查询,请使用括号和python的位 orand 运营商:

>>> Tweet.select().join(User).where(
...     (User.username == 'Charlie') |
...     (User.username == 'Peewee Herman'))

注解

请注意,peewee使用 bitwise 操作员( &| )而不是逻辑运算符( andor )原因是python将逻辑操作的返回值强制为布尔值。这也是必须使用 .in_() 而不是 in 操作员。

退房 the table of query operations 以查看哪些类型的查询是可能的。

注解

查询的WHERE子句中可以包含很多有趣的内容,例如:

  • 字段表达式,例如 User.username == 'Charlie'

  • 函数表达式,例如 fn.Lower(fn.Substr(User.username, 1, 1)) == 'a'

  • 一列与另一列的比较,例如 Employee.salary < (Employee.tenure * 1000) + 40000

您还可以嵌套查询,例如用户名以“a”开头的用户发出的tweets:

# get users whose username starts with "a"
a_users = User.select().where(fn.Lower(fn.Substr(User.username, 1, 1)) == 'a')

# the ".in_()" method signifies an "IN" query
a_user_tweets = Tweet.select().where(Tweet.user.in_(a_users))

更多查询示例

注解

有关广泛的示例查询,请参见 Query Examples 文档,其中显示如何从 PostgreSQL Exercises 网站。

获取活动用户:

User.select().where(User.active == True)

获取员工或超级用户:

User.select().where(
    (User.is_staff == True) | (User.is_superuser == True))

获取名为“charlie”的用户的tweets:

Tweet.select().join(User).where(User.username == 'charlie')

按员工或超级用户获取推文(假设为FK关系):

Tweet.select().join(User).where(
    (User.is_staff == True) | (User.is_superuser == True))

使用子查询按员工或超级用户获取推文:

staff_super = User.select(User.id).where(
    (User.is_staff == True) | (User.is_superuser == True))
Tweet.select().where(Tweet.user.in_(staff_super))

分类记录

要按顺序返回行,请使用 order_by() 方法:

>>> for t in Tweet.select().order_by(Tweet.created_date):
...     print(t.pub_date)
...
2010-01-01 00:00:00
2011-06-07 14:08:48
2011-06-07 14:12:57

>>> for t in Tweet.select().order_by(Tweet.created_date.desc()):
...     print(t.pub_date)
...
2011-06-07 14:12:57
2011-06-07 14:08:48
2010-01-01 00:00:00

您也可以使用 +- 用于指示排序的前缀运算符:

# The following queries are equivalent:
Tweet.select().order_by(Tweet.created_date.desc())

Tweet.select().order_by(-Tweet.created_date)  # Note the "-" prefix.

# Similarly you can use "+" to indicate ascending order, though ascending
# is the default when no ordering is otherwise specified.
User.select().order_by(+User.username)

您还可以跨联接排序。假设您希望按作者的用户名订购tweets,然后按创建日期订购:

query = (Tweet
         .select()
         .join(User)
         .order_by(User.username, Tweet.created_date.desc()))
SELECT t1."id", t1."user_id", t1."message", t1."is_published", t1."created_date"
FROM "tweet" AS t1
INNER JOIN "user" AS t2
  ON t1."user_id" = t2."id"
ORDER BY t2."username", t1."created_date" DESC

对计算值进行排序时,可以包含必要的SQL表达式,也可以引用分配给该值的别名。以下是两个例子,说明了这些方法:

# Let's start with our base query. We want to get all usernames and the number of
# tweets they've made. We wish to sort this list from users with most tweets to
# users with fewest tweets.
query = (User
         .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username))

您可以使用 select 条款。在下面的示例中,我们按照 COUNT() tweet id降序:

query = (User
         .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username)
         .order_by(fn.COUNT(Tweet.id).desc()))

或者,您可以引用分配给计算值的别名 select 条款。这种方法的好处是更容易阅读。注意,我们不是直接引用命名别名,而是使用 SQL 帮手:

query = (User
         .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username)
         .order_by(SQL('num_tweets').desc()))

或者,以“peewee”的方式做事:

ntweets = fn.COUNT(Tweet.id)
query = (User
         .select(User.username, ntweets.alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username)
         .order_by(ntweets.desc())

获取随机记录

有时您可能希望从数据库中提取随机记录。您可以通过 randomrand 函数(取决于数据库):

PostgreSQL和SQLite使用 Random 功能:

# Pick 5 lucky winners:
LotteryNumber.select().order_by(fn.Random()).limit(5)

MySQL使用 Rand:

# Pick 5 lucky winners:
LotterNumber.select().order_by(fn.Rand()).limit(5)

分页记录

这个 paginate() 方法使获取 page 或记录。 paginate() 取两个参数, page_numberitems_per_page .

注意

页码以1为基础,因此结果的第一页将是第1页。

>>> for tweet in Tweet.select().order_by(Tweet.id).paginate(2, 10):
...     print(tweet.message)
...
tweet 10
tweet 11
tweet 12
tweet 13
tweet 14
tweet 15
tweet 16
tweet 17
tweet 18
tweet 19

如果您想要更细粒度的控制,可以始终使用 limit()offset() .

盘点记录

您可以计算任何选择查询中的行数:

>>> Tweet.select().count()
100
>>> Tweet.select().where(Tweet.id > 50).count()
50

Peewee将把您的查询包装在执行计数的外部查询中,这将导致类似SQL的结果:

SELECT COUNT(1) FROM ( ... your query ... );

正在聚合记录

假设您有一些用户,并且想要得到他们的列表以及每个用户的tweet数量。

query = (User
         .select(User, fn.Count(Tweet.id).alias('count'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User))

结果查询将返回 User 具有所有正常属性和附加属性的对象 count 它将包含每个用户的tweet数。我们使用左外部连接来包括没有tweet的用户。

假设您有一个标记应用程序,希望找到具有一定数量相关对象的标记。对于这个例子,我们将在 many-to-many 配置:

class Photo(Model):
    image = CharField()

class Tag(Model):
    name = CharField()

class PhotoTag(Model):
    photo = ForeignKeyField(Photo)
    tag = ForeignKeyField(Tag)

现在假设我们想要找到至少有5张照片与之关联的标签:

query = (Tag
         .select()
         .join(PhotoTag)
         .join(Photo)
         .group_by(Tag)
         .having(fn.Count(Photo.id) > 5))

此查询等效于以下SQL:

SELECT t1."id", t1."name"
FROM "tag" AS t1
INNER JOIN "phototag" AS t2 ON t1."id" = t2."tag_id"
INNER JOIN "photo" AS t3 ON t2."photo_id" = t3."id"
GROUP BY t1."id", t1."name"
HAVING Count(t3."id") > 5

假设我们想要获取相关的计数并将其存储在标签上:

query = (Tag
         .select(Tag, fn.Count(Photo.id).alias('count'))
         .join(PhotoTag)
         .join(Photo)
         .group_by(Tag)
         .having(fn.Count(Photo.id) > 5))

检索标量值

您可以通过调用 Query.scalar() . 例如:

>>> PageView.select(fn.Count(fn.Distinct(PageView.url))).scalar()
100

可以通过传递来检索多个标量值 as_tuple=True

>>> Employee.select(
...     fn.Min(Employee.salary), fn.Max(Employee.salary)
... ).scalar(as_tuple=True)
(30000, 50000)

窗口功能

A Window 函数是指一个聚合函数,该函数在作为 SELECT 查询。窗口功能使您可以执行以下操作:

  1. 对结果集的子集执行聚合。

  2. 计算运行总数。

  3. 排名结果。

  4. 将行值与前面(或后面)中的值进行比较!行。

Peewee支持SQL窗口函数,可以通过调用 Function.over() 并传入分区或排序参数。

对于以下示例,我们将使用以下模型和示例数据:

class Sample(Model):
    counter = IntegerField()
    value = FloatField()

data = [(1, 10),
        (1, 20),
        (2, 1),
        (2, 3),
        (3, 100)]
Sample.insert_many(data, fields=[Sample.counter, Sample.value]).execute()

我们的示例表现在包含:

身份证件

柜台

价值

1

1

10.0

2

1

20.0

3

2

1.0

4

2

3.0

5

3

100.0

有序窗口

我们来计算 value 字段。为了使它成为一个“连续的”总数,我们需要订购它,所以我们将订购关于样品的 id 领域:

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(order_by=[Sample.id]).alias('total'))

for sample in query:
    print(sample.counter, sample.value, sample.total)

# 1    10.    10.
# 1    20.    30.
# 2     1.    31.
# 2     3.    34.
# 3   100    134.

对于另一个例子,我们将计算当前值和上一个值之间的差异,当 id

difference = Sample.value - fn.LAG(Sample.value, 1).over(order_by=[Sample.id])
query = Sample.select(
    Sample.counter,
    Sample.value,
    difference.alias('diff'))

for sample in query:
    print(sample.counter, sample.value, sample.diff)

# 1    10.   NULL
# 1    20.    10.  -- (20 - 10)
# 2     1.   -19.  -- (1 - 20)
# 2     3.     2.  -- (3 - 1)
# 3   100     97.  -- (100 - 3)

分区窗口

让我们计算一下平均值 value 对于每个不同的“计数器”值。注意,有三个可能的值 counter 字段(1、2和3)。我们可以通过计算 AVG()value 列位于根据 counter 领域:

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.AVG(Sample.value).over(partition_by=[Sample.counter]).alias('cavg'))

for sample in query:
    print(sample.counter, sample.value, sample.cavg)

# 1    10.    15.
# 1    20.    15.
# 2     1.     2.
# 2     3.     2.
# 3   100    100.

我们可以通过指定 order_bypartition_by 参数。例如,让我们在每个不同的 counter 组。

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.RANK().over(
        order_by=[Sample.value],
        partition_by=[Sample.counter]).alias('rank'))

for sample in query:
    print(sample.counter, sample.value, sample.rank)

# 1    10.    1
# 1    20.    2
# 2     1.    1
# 2     3.    2
# 3   100     1

有界窗口

默认情况下,使用 unbounded preceding 从窗口开始,然后 current row 作为结束。我们可以通过指定 start 和/或 end 在召唤 Function.over() . 此外,Peewee在 Window 用于生成适当边界引用的对象:

为了检查边界是如何工作的,我们将计算 value 列,按 idbut 我们只查看当前行的运行合计,它是前面两行:

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.id],
        start=Window.preceding(2),
        end=Window.CURRENT_ROW).alias('rsum'))

for sample in query:
    print(sample.counter, sample.value, sample.rsum)

# 1    10.    10.
# 1    20.    30.  -- (20 + 10)
# 2     1.    31.  -- (1 + 20 + 10)
# 2     3.    24.  -- (3 + 1 + 20)
# 3   100    104.  -- (100 + 3 + 1)

注解

从技术上讲,我们不需要具体说明 end=Window.CURRENT 因为这是默认值。示例中显示了它以供演示。

让我们来看另一个例子。在这个例子中,我们将计算一个运行总数的“相反”值,其中所有值的总和都会被样本值减少,排序是 id . 为此,我们将计算从当前行到最后一行的总和。

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.id],
        start=Window.CURRENT_ROW,
        end=Window.following()).alias('rsum'))

# 1    10.   134.  -- (10 + 20 + 1 + 3 + 100)
# 1    20.   124.  -- (20 + 1 + 3 + 100)
# 2     1.   104.  -- (1 + 3 + 100)
# 2     3.   103.  -- (3 + 100)
# 3   100    100.  -- (100)

过滤集料

聚合函数也可以支持过滤函数(postgres和sqlite 3.25+),这些函数被转换为 FILTER (WHERE...) 条款。将筛选表达式添加到聚合函数中, Function.filter() 方法。

例如,我们将计算 value 字段相对于 id ,但我们将筛选出 counter=2 .

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).filter(Sample.counter != 2).over(
        order_by=[Sample.id]).alias('csum'))

for sample in query:
    print(sample.counter, sample.value, sample.csum)

# 1    10.    10.
# 1    20.    30.
# 2     1.    30.
# 2     3.    30.
# 3   100    130.

注解

呼唤 filter() 必须在调用之前 over() .

重用窗口定义

如果要对多个聚合使用相同的窗口定义,可以创建 Window 对象。这个 Window 对象的参数与 Function.over() ,并可以传递给 over() 方法代替单个参数。

在这里,我们将申报一个单窗,按样品订购。 id ,并使用该窗口定义调用多个窗口函数:

win = Window(order_by=[Sample.id])
query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.LEAD(Sample.value).over(win),
    fn.LAG(Sample.value).over(win),
    fn.SUM(Sample.value).over(win)
).window(win)  # Include our window definition in query.

for row in query.tuples():
    print(row)

# counter  value  lead()  lag()  sum()
# 1          10.     20.   NULL    10.
# 1          20.      1.    10.    30.
# 2           1.      3.    20.    31.
# 2           3.    100.     1.    34.
# 3         100.    NULL     3.   134.

多窗口定义

在前面的示例中,我们看到了如何声明 Window 定义并重新用于多个不同的聚合。您可以根据需要在查询中包含尽可能多的窗口定义,但必须确保每个窗口具有唯一的别名:

w1 = Window(order_by=[Sample.id]).alias('w1')
w2 = Window(partition_by=[Sample.counter]).alias('w2')
query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(w1).alias('rsum'),  # Running total.
    fn.AVG(Sample.value).over(w2).alias('cavg')   # Avg per category.
).window(w1, w2)  # Include our window definitions.

for sample in query:
    print(sample.counter, sample.value, sample.rsum, sample.cavg)

# counter  value   rsum     cavg
# 1          10.     10.     15.
# 1          20.     30.     15.
# 2           1.     31.      2.
# 2           3.     34.      2.
# 3         100     134.    100.

同样,如果有多个窗口定义共享类似的定义,则可以扩展以前定义的窗口定义。例如,这里我们将按计数器值对数据集进行分区,因此我们将针对计数器进行聚合。然后,我们将定义扩展此分区的第二个窗口,并添加一个排序子句:

w1 = Window(partition_by=[Sample.counter]).alias('w1')

# By extending w1, this window definition will also be partitioned
# by "counter".
w2 = Window(extends=w1, order_by=[Sample.value.desc()]).alias('w2')

query = (Sample
         .select(Sample.counter, Sample.value,
                 fn.SUM(Sample.value).over(w1).alias('group_sum'),
                 fn.RANK().over(w2).alias('revrank'))
         .window(w1, w2)
         .order_by(Sample.id))

for sample in query:
    print(sample.counter, sample.value, sample.group_sum, sample.revrank)

# counter  value   group_sum   revrank
# 1        10.     30.         2
# 1        20.     30.         1
# 2        1.      4.          2
# 2        3.      4.          1
# 3        100.    100.        1

帧类型:范围vs行vs组

根据帧类型,数据库将以不同的方式处理有序组。我们再创建两个 Sample 显示差异的行:

>>> Sample.create(counter=1, value=20.)
<Sample 6>
>>> Sample.create(counter=2, value=1.)
<Sample 7>

我们的表现在包含:

身份证件

柜台

价值

1

1

10.0

2

1

20.0

3

2

1.0

4

2

3.0

5

3

100.0

6

1

20.0

7

2

1.0

让我们通过计算样本的“运行总和”来检查差异,这些样本是按照 countervalue 领域。要指定帧类型,可以使用以下任一项:

行为 RANGE 如果存在逻辑重复项,则可能导致意外结果:

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.counter, Sample.value],
        frame_type=Window.RANGE).alias('rsum'))

for sample in query.order_by(Sample.counter, Sample.value):
    print(sample.counter, sample.value, sample.rsum)

# counter  value   rsum
# 1          10.     10.
# 1          20.     50.
# 1          20.     50.
# 2           1.     52.
# 2           1.     52.
# 2           3.     55.
# 3         100     155.

通过包含新行,我们现在有了一些重复的行 categoryvalue 价值观。这个 RANGE 帧类型使这些重复项一起计算,而不是单独计算。

通过使用 ROWS 作为框架类型:

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.counter, Sample.value],
        frame_type=Window.ROWS).alias('rsum'))

for sample in query.order_by(Sample.counter, Sample.value):
    print(sample.counter, sample.value, sample.rsum)

# counter  value   rsum
# 1          10.     10.
# 1          20.     30.
# 1          20.     50.
# 2           1.     51.
# 2           1.     52.
# 2           3.     55.
# 3         100     155.

Peewee使用以下规则确定要使用的帧类型:

  • 如果用户指定 frame_type ,将使用该框架类型。

  • 如果 start 和/或 end 边界是指定的,peewee将默认为使用 ROWS .

  • 如果用户没有指定帧类型或开始/结束边界,Peewee将使用数据库默认值,即 RANGE .

这个 Window.GROUPS 框架类型根据排序术语按行组查看窗口范围规范。使用 GROUPS ,我们可以定义框架,以便它覆盖不同的行分组。让我们来看一个例子:

query = (Sample
         .select(Sample.counter, Sample.value,
                 fn.SUM(Sample.value).over(
                    order_by=[Sample.counter, Sample.value],
                    frame_type=Window.GROUPS,
                    start=Window.preceding(1)).alias('gsum'))
         .order_by(Sample.counter, Sample.value))

for sample in query:
    print(sample.counter, sample.value, sample.gsum)

#  counter   value    gsum
#  1         10       10
#  1         20       50
#  1         20       50   (10) + (20+0)
#  2         1        42
#  2         1        42   (20+20) + (1+1)
#  2         3        5    (1+1) + 3
#  3         100      103  (3) + 100

正如您希望推断的那样,窗口按其排序术语分组,即 (counter, value) .我们正在查看一个在前一组和当前组之间扩展的窗口。

注解

有关窗口函数API的信息,请参见:

有关窗口函数的一般信息,请阅读Postgres window functions tutorial

另外, postgres docs 以及 sqlite docs 包含很多好信息。

检索行元组/字典/名称双重

有时,您不需要创建模型实例的开销,只需要遍历行数据,而不需要提供所有API。 Model . 为此,请使用:

stats = (Stat
         .select(Stat.url, fn.Count(Stat.url))
         .group_by(Stat.url)
         .tuples())

# iterate over a list of 2-tuples containing the url and count
for stat_url, stat_count in stats:
    print(stat_url, stat_count)

同样,您可以使用 dicts()

stats = (Stat
         .select(Stat.url, fn.Count(Stat.url).alias('ct'))
         .group_by(Stat.url)
         .dicts())

# iterate over a list of 2-tuples containing the url and count
for stat in stats:
    print(stat['url'], stat['ct'])

退回条款

PostgresqlDatabase 支持 RETURNING 条款 UPDATEINSERTDELETE 查询。指定一个 RETURNING 子句允许您迭代查询访问的行。

默认情况下,执行不同查询时的返回值为:

  • INSERT -自动递增新插入行的主键值。如果不使用自动递增的主键,Postgres将返回新行的主键,但SQLite和MySQL将不返回。

  • UPDATE -修改的行数

  • DELETE -删除的行数

当使用返回子句时,执行查询时的返回值将是一个可ITerable光标对象。

PostgreSQL允许通过 RETURNING 子句,从查询插入或修改的行返回数据。

例如,假设您有 Update 这将停用注册已过期的所有用户帐户。停用后,您希望向每个用户发送一封电子邮件,让他们知道他们的帐户已停用。而不是写两个查询, SELECT 和一个 UPDATE ,您可以在一个 UPDATE 用A查询 RETURNING 条款:

query = (User
         .update(is_active=False)
         .where(User.registration_expired == True)
         .returning(User))

# Send an email to every user that was deactivated.
for deactivate_user in query.execute():
    send_deactivation_email(deactivated_user.email)

这个 RETURNING 条款也可用于 InsertDelete . 当使用时 INSERT ,将返回新创建的行。当使用时 DELETE ,将返回已删除的行。

唯一的限制 RETURNING 子句是它只能由查询中列出的表中的列组成 FROM 条款。要从特定表中选择所有列,只需传入 Model 班级。

作为另一个示例,我们添加一个用户,并将其创建日期设置为服务器生成的当前时间戳。我们将在单个查询中创建和检索新用户的ID、电子邮件和创建时间戳:

query = (User
         .insert(email='foo@bar.com', created=fn.now())
         .returning(User))  # Shorthand for all columns on User.

# When using RETURNING, execute() returns a cursor.
cursor = query.execute()

# Get the user object we just inserted and log the data:
user = cursor[0]
logger.info('Created user %s (id=%s) at %s', user.email, user.id, user.created)

默认情况下,光标将返回 Model 实例,但可以指定其他行类型:

data = [{'name': 'charlie'}, {'name': 'huey'}, {'name': 'mickey'}]
query = (User
         .insert_many(data)
         .returning(User.id, User.username)
         .dicts())

for new_user in query.execute():
    print('Added user "%s", id=%s' % (new_user['username'], new_user['id']))

就像 Select 查询,您可以指定 result row types .

公用表表达式

Peewee支持在所有类型的查询中包含公共表表达式(CTE)。CTE可用于:

  • 分解出一个公共子查询。

  • 按CTE结果集中派生的列进行分组或筛选。

  • 正在写入递归查询。

宣布一项 Select 查询用作CTE,使用 cte() 方法,它将查询包装在 CTE 对象。表示 CTE 应作为查询的一部分包含,使用 Query.with_cte() 方法,传递CTE对象列表。

简单实例

例如,假设我们有一些由键和浮点值组成的数据点。让我们定义模型并填充一些测试数据:

class Sample(Model):
    key = TextField()
    value = FloatField()

data = (
    ('a', (1.25, 1.5, 1.75)),
    ('b', (2.1, 2.3, 2.5, 2.7, 2.9)),
    ('c', (3.5, 3.5)))

# Populate data.
for key, values in data:
    Sample.insert_many([(key, value) for value in values],
                       fields=[Sample.key, Sample.value]).execute()

让我们使用一个CTE来计算每个不同的键,哪个值高于该键的平均值。

# First we'll declare the query that will be used as a CTE. This query
# simply determines the average value for each key.
cte = (Sample
       .select(Sample.key, fn.AVG(Sample.value).alias('avg_value'))
       .group_by(Sample.key)
       .cte('key_avgs', columns=('key', 'avg_value')))

# Now we'll query the sample table, using our CTE to find rows whose value
# exceeds the average for the given key. We'll calculate how far above the
# average the given sample's value is, as well.
query = (Sample
         .select(Sample.key, Sample.value)
         .join(cte, on=(Sample.key == cte.c.key))
         .where(Sample.value > cte.c.avg_value)
         .order_by(Sample.value)
         .with_cte(cte))

我们可以迭代查询返回的样本,以查看哪些样本的给定组的值高于平均值:

>>> for sample in query:
...     print(sample.key, sample.value)

# 'a', 1.75
# 'b', 2.7
# 'b', 2.9

复杂实例

对于更完整的示例,让我们考虑下面的查询,它使用多个CTE来查找仅在顶部销售区域中的每个产品销售总额。我们的模型如下:

class Order(Model):
    region = TextField()
    amount = FloatField()
    product = TextField()
    quantity = IntegerField()

下面是如何用SQL编写查询。这个例子可以在 postgresql documentation .

WITH regional_sales AS (
    SELECT region, SUM(amount) AS total_sales
    FROM orders
    GROUP BY region
  ), top_regions AS (
    SELECT region
    FROM regional_sales
    WHERE total_sales > (SELECT SUM(total_sales) / 10 FROM regional_sales)
  )
SELECT region,
       product,
       SUM(quantity) AS product_units,
       SUM(amount) AS product_sales
FROM orders
WHERE region IN (SELECT region FROM top_regions)
GROUP BY region, product;

用peewee,我们会写:

reg_sales = (Order
             .select(Order.region,
                     fn.SUM(Order.amount).alias('total_sales'))
             .group_by(Order.region)
             .cte('regional_sales'))

top_regions = (reg_sales
               .select(reg_sales.c.region)
               .where(reg_sales.c.total_sales > (
                   reg_sales.select(fn.SUM(reg_sales.c.total_sales) / 10)))
               .cte('top_regions'))

query = (Order
         .select(Order.region,
                 Order.product,
                 fn.SUM(Order.quantity).alias('product_units'),
                 fn.SUM(Order.amount).alias('product_sales'))
         .where(Order.region.in_(top_regions.select(top_regions.c.region)))
         .group_by(Order.region, Order.product)
         .with_cte(regional_sales, top_regions))

递归CTE

Peewee支持递归CTE。例如,当您具有由父链接外键表示的树数据结构时,递归CTE可能很有用。例如,假设我们有一个在线书店的类别层次结构。我们希望生成一个显示所有类别及其绝对深度的表,以及从根目录到类别的路径。

我们将假设以下模型定义,其中每个类别都有其直接父类别的外键:

class Category(Model):
    name = TextField()
    parent = ForeignKeyField('self', backref='children', null=True)

要列出所有类别及其深度和父级,我们可以使用递归CTE:

# Define the base case of our recursive CTE. This will be categories that
# have a null parent foreign-key.
Base = Category.alias()
level = Value(1).alias('level')
path = Base.name.alias('path')
base_case = (Base
             .select(Base.name, Base.parent, level, path)
             .where(Base.parent.is_null())
             .cte('base', recursive=True))

# Define the recursive terms.
RTerm = Category.alias()
rlevel = (base_case.c.level + 1).alias('level')
rpath = base_case.c.path.concat('->').concat(RTerm.name).alias('path')
recursive = (RTerm
             .select(RTerm.name, RTerm.parent, rlevel, rpath)
             .join(base_case, on=(RTerm.parent == base_case.c.id)))

# The recursive CTE is created by taking the base case and UNION ALL with
# the recursive term.
cte = base_case.union_all(recursive)

# We will now query from the CTE to get the categories, their levels,  and
# their paths.
query = (cte
         .select_from(cte.c.name, cte.c.level, cte.c.path)
         .order_by(cte.c.path))

# We can now iterate over a list of all categories and print their names,
# absolute levels, and path from root -> category.
for category in query:
    print(category.name, category.level, category.path)

# Example output:
# root, 1, root
# p1, 2, root->p1
# c1-1, 3, root->p1->c1-1
# c1-2, 3, root->p1->c1-2
# p2, 2, root->p2
# c2-1, 3, root->p2->c2-1

外键和联接

此分区已移动到其自己的文档中: 关系和连接 .