查询生成器

Peewee的高级 ModelField API建立在较低的级别上 TableColumn 相对应的人。虽然这些较低级别的API没有像它们的高级别对应的那样详细地记录,但是本文档将提供一个概述,其中包含一些示例,希望您能够进行实验。

我们将使用以下模式:

CREATE TABLE "person" (
    "id" INTEGER NOT NULL PRIMARY KEY,
    "first" TEXT NOT NULL,
    "last" TEXT NOT NULL);

CREATE TABLE "note" (
    "id" INTEGER NOT NULL PRIMARY KEY,
    "person_id" INTEGER NOT NULL,
    "content" TEXT NOT NULL,
    "timestamp" DATETIME NOT NULL,
    FOREIGN KEY ("person_id") REFERENCES "person" ("id"));

CREATE TABLE "reminder" (
    "id" INTEGER NOT NULL PRIMARY KEY,
    "note_id" INTEGER NOT NULL,
    "alarm" DATETIME NOT NULL,
    FOREIGN KEY ("note_id") REFERENCES "note" ("id"));

声明表

我们有两种方法可以申报 Table 用于处理这些表的对象:

# Explicitly declare columns
Person = Table('person', ('id', 'first', 'last'))

Note = Table('note', ('id', 'person_id', 'content', 'timestamp'))

# Do not declare columns, they will be accessed using magic ".c" attribute
Reminder = Table('reminder')

通常我们会想 bind() 我们的表到数据库。这就避免了每次希望对表执行查询时都必须显式传递数据库:

db = SqliteDatabase('my_app.db')
Person = Person.bind(db)
Note = Note.bind(db)
Reminder = Reminder.bind(db)

选择查询

要选择前三个注释并打印其内容,我们可以编写:

query = Note.select().order_by(Note.timestamp).limit(3)
for note_dict in query:
    print(note_dict['content'])

注解

默认情况下,行将作为字典返回。你可以使用 tuples()namedtuples()objects() 方法为行数据指定不同的容器(如果需要)。

因为我们没有指定任何列,所以我们在注释中定义的所有列 Table 将选择构造函数。这不适用于提醒,因为我们根本没有指定任何列。

要选择2018年发布的所有注释以及创建者姓名,我们将使用 join() . 我们还将请求将行作为 namedtuple 物体:

query = (Note
         .select(Note.content, Note.timestamp, Person.first, Person.last)
         .join(Person, on=(Note.person_id == Person.id))
         .where(Note.timestamp >= datetime.date(2018, 1, 1))
         .order_by(Note.timestamp)
         .namedtuples())

for row in query:
    print(row.timestamp, '-', row.content, '-', row.first, row.last)

让我们来查询最多产的人,也就是说,找到创造了最多笔记的人。这将引入调用SQL函数(count),该函数使用 fn 对象:

name = Person.first.concat(' ').concat(Person.last)
query = (Person
         .select(name.alias('name'), fn.COUNT(Note.id).alias('count'))
         .join(Note, JOIN.LEFT_OUTER, on=(Note.person_id == Person.id))
         .group_by(name)
         .order_by(fn.COUNT(Note.id).desc()))
for row in query:
    print(row['name'], row['count'])

在上面的查询中需要注意以下几点:

  • 我们将表达式存储在变量中( name ,然后在查询中使用它。
  • 我们使用 fn.<function>(...) 传递参数,就好像它是一个普通的python函数一样。
  • 这个 alias() 方法用于指定用于列或计算的名称。

作为一个更复杂的例子,我们将生成一个所有人的列表,以及他们最近发布的注释的内容和时间戳。为此,我们将在同一查询中的不同上下文中使用note表两次,这将要求我们使用表别名。

# Start with the query that calculates the timestamp of the most recent
# note for each person.
NA = Note.alias('na')
max_note = (NA
            .select(NA.person_id, fn.MAX(NA.timestamp).alias('max_ts'))
            .group_by(NA.person_id)
            .alias('max_note'))

# Now we'll select from the note table, joining on both the subquery and
# on the person table to construct the result set.
query = (Note
         .select(Note.content, Note.timestamp, Person.first, Person.last)
         .join(max_note, on=((max_note.c.person_id == Note.person_id) &
                             (max_note.c.max_ts == Note.timestamp)))
         .join(Person, on=(Note.person_id == Person.id))
         .order_by(Person.first, Person.last))

for row in query.namedtuples():
    print(row.first, row.last, ':', row.timestamp, '-', row.content)

在join谓词中, max_note 子查询,我们可以使用神奇的“.c”属性引用子查询中的列。所以, max_note.c.max_ts 转换为“max_note子查询中的max_ts列值”。

我们还可以使用“.c”magic属性访问没有显式定义其列的表上的列,就像我们对reminder表所做的那样。下面是一个简单的查询,用于获取今天的所有提醒及其关联的笔记内容:

today = datetime.date.today()
tomorrow = today + datetime.timedelta(days=1)

query = (Reminder
         .select(Reminder.c.alarm, Note.content)
         .join(Note, on=(Reminder.c.note_id == Note.id))
         .where(Reminder.c.alarm.between(today, tomorrow))
         .order_by(Reminder.c.alarm))
for row in query:
    print(row['alarm'], row['content'])

注解

“.c”属性不能用于显式定义其列的表,以防止混淆。

插入查询

插入数据很简单。我们可以指定数据 insert() 以两种不同的方式(在这两种情况下,都会返回新行的ID):

# Using keyword arguments:
zaizee_id = Person.insert(first='zaizee', last='cat').execute()

# Using column: value mappings:
Note.insert({
    Note.person_id: zaizee_id,
    Note.content: 'meeeeowwww',
    Note.timestamp: datetime.datetime.now()}).execute()

批量插入数据很容易,只需传入以下任一项即可:

  • 字典列表(所有字典都必须具有相同的键/列)。
  • 如果显式指定了列,则为元组列表。

实例:

people = [
    {'first': 'Bob', 'last': 'Foo'},
    {'first': 'Herb', 'last': 'Bar'},
    {'first': 'Nuggie', 'last': 'Bar'}]

# Inserting multiple rows returns the ID of the last-inserted row.
last_id = Person.insert(people).execute()

# We can also specify row tuples, so long as we tell Peewee which
# columns the tuple values correspond to:
people = [
    ('Bob', 'Foo'),
    ('Herb', 'Bar'),
    ('Nuggie', 'Bar')]
Person.insert(people, columns=[Person.first, Person.last]).execute()

更新查询

update() 查询接受关键字参数或字典将列映射到值,就像 insert() .

实例:

# "Bob" changed his last name from "Foo" to "Baze".
nrows = (Person
         .update(last='Baze')
         .where((Person.first == 'Bob') &
                (Person.last == 'Foo'))
         .execute())

# Use dictionary mapping column to value.
nrows = (Person
         .update({Person.last: 'Baze'})
         .where((Person.first == 'Bob') &
                (Person.last == 'Foo'))
         .execute())

还可以使用表达式作为值来执行原子更新。假设我们有一个 PageView 表,我们需要自动增加某些URL的页面视图计数:

# Do an atomic update:
(PageView
 .update({PageView.count: PageView.count + 1})
 .where(PageView.url == some_url)
 .execute())

删除查询

delete() 查询是最简单的,因为它们不接受任何参数:

# Delete all notes created before 2018, returning number deleted.
n = Note.delete().where(Note.timestamp < datetime.date(2018, 1, 1)).execute()

因为删除(和更新)查询不支持联接,所以可以使用子查询根据相关表中的值删除行。例如,下面是如何删除姓氏为“foo”的任何人的所有注释:

# Get the id of all people whose last name is "Foo".
foo_people = Person.select(Person.id).where(Person.last == 'Foo')

# Delete all notes by any person whose ID is in the previous query.
Note.delete().where(Note.person_id.in_(foo_people)).execute()

查询对象

Peewee2.x提供的抽象的一个基本限制是缺少一个表示与给定模型类没有关系的结构化查询的类。

例如,通过子查询计算聚合值。例如, count() 方法返回任意查询中的行数,通过包装查询实现:

SELECT COUNT(1) FROM (...)

为了用peewee实现这一点,实现是这样编写的:

def count(query):
    # Select([source1, ... sourcen], [column1, ...columnn])
    wrapped = Select(from_list=[query], columns=[fn.COUNT(SQL('1'))])
    curs = wrapped.tuples().execute(db)
    return curs[0][0]  # Return first column from first row of result.

我们可以用 scalar() 方法,适用于从聚合查询返回值:

def count(query):
    wrapped = Select(from_list=[query], columns=[fn.COUNT(SQL('1'))])
    return wrapped.scalar(db)

这个 查询实例 文档有一个更复杂的示例,在该示例中,我们为预订的可用插槽数最多的设备编写查询:

我们希望表达的SQL是:

SELECT facid, total FROM (
  SELECT facid, SUM(slots) AS total,
         rank() OVER (order by SUM(slots) DESC) AS rank
  FROM bookings
  GROUP BY facid
) AS ranked
WHERE rank = 1

我们可以用普通的 Select 对于外部查询:

# Store rank expression in variable for readability.
rank_expr = fn.rank().over(order_by=[fn.SUM(Booking.slots).desc()])

subq = (Booking
        .select(Booking.facility, fn.SUM(Booking.slots).alias('total'),
                rank_expr.alias('rank'))
        .group_by(Booking.facility))

# Use a plain "Select" to create outer query.
query = (Select(columns=[subq.c.facid, subq.c.total])
         .from_(subq)
         .where(subq.c.rank == 1)
         .tuples())

# Iterate over the resulting facility ID(s) and total(s):
for facid, total in query.execute(db):
    print(facid, total)

对于另一个示例,让我们创建一个递归公用表表达式来计算前10个斐波那契数:

base = Select(columns=(
    Value(1).alias('n'),
    Value(0).alias('fib_n'),
    Value(1).alias('next_fib_n'))).cte('fibonacci', recursive=True)

n = (base.c.n + 1).alias('n')
recursive_term = Select(columns=(
    n,
    base.c.next_fib_n,
    base.c.fib_n + base.c.next_fib_n)).from_(base).where(n < 10)

fibonacci = base.union_all(recursive_term)
query = fibonacci.select_from(fibonacci.c.n, fibonacci.c.fib_n)

results = list(query.execute(db))

# Generates the following result list:
[{'fib_n': 0, 'n': 1},
 {'fib_n': 1, 'n': 2},
 {'fib_n': 1, 'n': 3},
 {'fib_n': 2, 'n': 4},
 {'fib_n': 3, 'n': 5},
 {'fib_n': 5, 'n': 6},
 {'fib_n': 8, 'n': 7},
 {'fib_n': 13, 'n': 8},
 {'fib_n': 21, 'n': 9},
 {'fib_n': 34, 'n': 10}]

更多

有关用于描述SQL AST的各种类的描述,请参见 query builder API documentation .

如果您有兴趣了解更多信息,还可以查看 project source code .