Playhouse,扩展到Peewee

Peewee带有许多扩展模块,这些扩展模块收集在 playhouse 命名空间。尽管名字很傻,但还是有一些非常有用的扩展,特别是那些公开特定于供应商的数据库功能的扩展,如 sqlite扩展PostgreSQL扩展 扩展。

下面您将看到一个松散组织的各种模块列表,这些模块构成了 playhouse .

数据库驱动程序/特定于供应商的数据库功能

高级功能

数据库管理和框架集成

sqlite扩展

sqlite扩展已移动到 their own page .

SqliteQ

这个 playhouse.sqliteq 模块提供的子类 SqliteExtDatabase ,它将序列化对sqlite数据库的并发写入。 SqliteQueueDatabase 可作为常规 SqliteDatabase 如果你想简单点 read and write 从访问sqlite数据库 多线程.

SQLite在任何给定的时间只允许一个连接写入数据库。因此,如果您有一个多线程应用程序(例如Web服务器)需要写入数据库,当一个或多个尝试写入的线程无法获取锁时,您可能会偶尔看到错误。

SqliteQueueDatabase 旨在通过一个长期存在的连接发送所有写查询,从而简化操作。好处是,您可以看到多个线程在向数据库写入时没有冲突或超时。但是,缺点是,您不能发出包含多个查询的写事务——本质上,所有写操作都在自动提交模式下运行。

备注

模块的名称来自这样一个事实:所有写查询都被放入线程安全队列中。单个工作线程监听队列并执行发送到队列的所有查询。

交易

因为所有查询都是由一个工作线程序列化和执行的,所以来自不同线程的事务性SQL可能会无序执行。在下面的示例中,由线程“B”启动的事务由线程“A”回滚(结果很糟糕!):

  • 线程A:更新移植集器官='肝',…;

  • B线:开始交易;

  • 线程B:更新life_支持_系统设置计时器+=60…;

  • 线程A:回滚;--哦不…

由于有可能对来自不同事务的查询进行交错,因此 transaction()atomic() 方法在上被禁用 SqliteQueueDatabase .

对于希望从其他线程临时写入数据库的情况,可以使用 pause()unpause() 方法。在编写器线程完成其当前工作负载之前,这些方法会阻止调用方。然后作者断开连接,调用者接管直到 unpause 被称为。

这个 stop()start()is_stopped() 方法还可以用于控制编写器线程。

备注

看看sqlite的 isolation 有关SQLite如何处理并发连接的详细信息,请参阅文档。

代码样本

创建数据库实例不需要任何特殊处理。这个 SqliteQueueDatabase 接受一些您应该知道的特殊参数。如果您正在使用 gevent ,必须指定 use_gevent=True 当实例化数据库时——通过这种方式,peewee将知道如何使用适当的对象来处理队列、线程创建和锁定。

from playhouse.sqliteq import SqliteQueueDatabase

db = SqliteQueueDatabase(
    'my_app.db',
    use_gevent=False,  # Use the standard library "threading" module.
    autostart=False,  # The worker thread now must be started manually.
    queue_max_size=64,  # Max. # of pending writes that can accumulate.
    results_timeout=5.0)  # Max. time to wait for query to be executed.

如果 autostart=False ,在上面的示例中,您需要调用 start() 显示将执行实际写入查询执行的工作线程。

@app.before_first_request
def _start_worker_threads():
    db.start()

如果计划执行select查询或通常希望访问数据库,则需要调用 connect()close() 就像处理任何其他数据库实例一样。

当应用程序准备终止时,使用 stop() 方法关闭工作线程。如果有积压的工作,那么这个方法将被阻塞,直到所有待定的工作完成(尽管不允许有新的工作)。

import atexit

@atexit.register
def _stop_worker_threads():
    db.stop()

最后, is_stopped() 方法可用于确定数据库编写器是否已启动并正在运行。

sqlite用户定义函数

这个 sqlite_udf Playhouse模块包含许多用户定义函数、聚合函数和表值函数,您可能会发现这些函数很有用。函数在集合中分组,您可以单独注册这些用户定义的扩展,或者通过集合注册,或者注册所有内容。

标量函数是接受多个参数并返回单个值的函数。例如,将字符串转换为大写,或计算MD5十六进制摘要。

聚合函数类似于对多行数据进行操作并生成单个结果的标量函数。例如,计算整数列表的和,或在特定列中查找最小值。

表值函数只是可以返回多行数据的函数。例如,返回给定字符串中所有匹配项的正则表达式搜索函数,或接受两个日期并生成所有中间日期的函数。

备注

要使用表值函数,需要构建 playhouse._sqlite_ext C扩展。

正在注册用户定义函数:

db = SqliteDatabase('my_app.db')

# Register *all* functions.
register_all(db)

# Alternatively, you can register individual groups. This will just
# register the DATE and MATH groups of functions.
register_groups(db, 'DATE', 'MATH')

# If you only wish to register, say, the aggregate functions for a
# particular group or groups, you can:
register_aggregate_groups(db, 'DATE')

# If you only wish to register a single function, then you can:
from playhouse.sqlite_udf import gzip, gunzip
db.register_function(gzip, 'gzip')
db.register_function(gunzip, 'gunzip')

使用库函数(“hostname”):

# Assume we have a model, Link, that contains lots of arbitrary URLs.
# We want to discover the most common hosts that have been linked.
query = (Link
         .select(fn.hostname(Link.url).alias('host'), fn.COUNT(Link.id))
         .group_by(fn.hostname(Link.url))
         .order_by(fn.COUNT(Link.id).desc())
         .tuples())

# Print the hostname along with number of links associated with it.
for host, count in query:
    print('%s: %s' % (host, count))

函数,按集合名称列出

标量函数表示为 (f) ,聚合函数 (a) 和表值函数 (t) .

CONTROL_FLOW

if_then_else(cond, truthy[, falsey=None])

简单的三元类型运算符,其中,取决于 cond 参数,或者 truthyfalsey 将返回值。

DATE

strip_tz(date_str)
参数:

date_str -- 日期时间,编码为字符串。

返回:

除去所有时区信息的日期时间。

时间不会以任何方式调整,时区只会被删除。

humandelta(nseconds[, glue=', '])
参数:
  • nseconds (int) -- 以时间增量表示的总秒数。

  • glue (str) -- 连接值的片段。

返回:

易于阅读的TimeDelta描述。

例如,86471->“1天,1分钟,11秒”

mintdiff(datetime_value)
参数:

datetime_value -- 日期时间。

返回:

列表中任意两个值之间的最小差异。

聚合函数,用于计算任意两个日期时间之间的最小差异。

avgtdiff(datetime_value)
参数:

datetime_value -- 日期时间。

返回:

列表中值之间的平均差异。

计算列表中连续值之间平均差的聚合函数。

duration(datetime_value)
参数:

datetime_value -- 日期时间。

返回:

列表中从最小值到最大值的持续时间(秒)。

聚合函数,用于计算列表中从最小值到最大值的持续时间,以秒为单位返回。

date_series(start, stop[, step_seconds=86400])
参数:
  • start (datetime) -- 开始日期时间

  • stop (datetime) -- 停止日期时间

  • step_seconds (int) -- 包含步骤的秒数。

表值函数,返回从开始到停止迭代时遇到的由日期/时间值组成的行, step_seconds 一次。

此外,如果start没有时间组件,且step_seconds大于或等于一天(86400秒),则返回的值将是日期。相反,如果start没有日期组件,则值将作为times返回。否则,值将作为日期时间返回。

例子:

SELECT * FROM date_series('2017-01-28', '2017-02-02');

value
-----
2017-01-28
2017-01-29
2017-01-30
2017-01-31
2017-02-01
2017-02-02

FILE

file_ext(filename)
参数:

filename (str) -- 要从中提取扩展名的文件名。

返回:

返回文件扩展名,包括前导“.”。

file_read(filename)
参数:

filename (str) -- 要读取的文件名。

返回:

文件的内容。

HELPER

gzip(data[, compression=9])
参数:
  • data (bytes) -- 要压缩的数据。

  • compression (int) -- 压缩级别(9为最大值)。

返回:

压缩的二进制数据。

gunzip(data)
参数:

data (bytes) -- 压缩数据。

返回:

未压缩的二进制数据。

hostname(url)
参数:

url (str) -- 从中提取主机名的URL。

返回:

URL的主机名部分

toggle(key)
参数:

key -- 切换键。

在真/假状态之间切换键。例子:

>>> toggle('my-key')
True
>>> toggle('my-key')
False
>>> toggle('my-key')
True
setting(key[, value=None])
参数:
  • key -- 设置/检索的键。

  • value -- 设置值。

返回:

与键关联的值。

在内存中存储/检索设置,并在应用程序的生命周期中保持。要获取当前值,只需指定键。要设置新值,请使用键和新值调用。

clear_toggles()

清除与 toggle() 功能。

clear_settings()

清除与 setting() 功能。

MATH

randomrange(start[, stop=None[, step=None]])
参数:
  • start (int) -- 范围开始(含)

  • end (int) -- 范围结束(不包括)

  • step (int) -- 返回值的间隔。

返回一个介于 [start, end) .

gauss_distribution(mean, sigma)
参数:
  • mean (float) -- 平均值

  • sigma (float) -- 标准偏差

sqrt(n)

计算的平方根 n .

tonumber(s)
参数:

s (str) -- 要转换为数字的字符串。

返回:

整数、浮点或失败时为空。

mode(val)
参数:

val -- 列表中的数字。

返回:

观察到的模式或最常见的数字。

计算的聚合函数 mode 价值观。

minrange(val)
参数:

val -- 价值

返回:

两个值之间的最小差异。

聚合函数,计算序列中两个数字之间的最小距离。

avgrange(val)
参数:

val -- 价值

返回:

值之间的平均差。

聚合函数,计算序列中两个连续数字之间的平均距离。

range(val)
参数:

val -- 价值

返回:

按顺序从最小值到最大值的范围。

返回观测值范围的聚合函数。

median(val)
参数:

val -- 价值

返回:

中间值序列中的中间值。

计算序列中值的聚合函数。

备注

仅当您编译 _sqlite_udf 延伸。

STRING

substr_count(haystack, needle)

返回次数 needle 出现在 haystack .

strip_chars(haystack, chars)

删除中的任何字符 chars 从开始和结束 haystack .

damerau_levenshtein_dist(s1, s2)

使用Levenshtein算法的Damerau变量计算从s1到s2的编辑距离。

备注

仅当您编译 _sqlite_udf 延伸。

levenshtein_dist(s1, s2)

使用Levenshtein算法计算从s1到s2的编辑距离。

备注

仅当您编译 _sqlite_udf 延伸。

str_dist(s1, s2)

使用标准库SequenceMatcher算法计算从s1到s2的编辑距离。

备注

仅当您编译 _sqlite_udf 延伸。

参数:
  • regex (str) -- 正则表达式

  • search_string (str) -- 用于搜索regex实例的字符串。

表值函数,在字符串中搜索与提供的 regex . 返回找到的每个匹配项的行。

例子:

SELECT * FROM regex_search('\w+', 'extract words, ignore! symbols');

value
-----
extract
words
ignore
symbols

高级sqlite驱动程序apsw

这个 apsw_ext 模块包含一个适用于APSWSQLite驱动程序的数据库类。

APSW项目页面:https://github.com/rogerbins/apsw

APSW是一个非常整洁的库,它在sqlite的C接口上提供了一个很薄的包装器,使得使用sqlite的所有高级功能成为可能。

以下是从文档中获取的使用APSW的几个原因:

  • APSW提供了SQLite的所有功能,包括虚拟表、虚拟文件系统、BLOB I/O、备份和文件控制。

  • 连接可以跨线程共享,而无需任何附加锁定。

  • 事务由代码显式管理。

  • APSW可以处理嵌套事务。

  • Unicode处理正确。

  • APSW速度更快。

有关apsw和pysqlite之间差异的详细信息,请检查 the apsw docs .

如何使用apswdatabase

from apsw_ext import *

db = APSWDatabase(':memory:')

class BaseModel(Model):
    class Meta:
        database = db

class SomeModel(BaseModel):
    col1 = CharField()
    col2 = DateTimeField()

apsw_ext api说明

APSWDatabase 扩展了 SqliteExtDatabase 继承了它的高级功能。

class APSWDatabase(database, **connect_kwargs)
参数:
  • database (string) -- sqlite数据库的文件名

  • connect_kwargs -- 打开连接时传递给APSW的关键字参数

register_module(mod_name, mod_inst)

提供全局注册模块的方法。有关详细信息,请参阅 documentation on virtual tables .

参数:
  • mod_name (string) -- 用于模块的名称

  • mod_inst (object) -- 实现 Virtual Table 界面

unregister_module(mod_name)

注销模块。

参数:

mod_name (string) -- 用于模块的名称

备注

一定要使用 Field 在中定义的子类 apsw_ext 模块,因为它们将正确处理适应存储的数据类型。

例如,而不是使用 peewee.DateTimeField ,确保正在导入和使用 playhouse.apsw_ext.DateTimeField .

sqlcipher后端

备注

尽管这个扩展的代码很短,但它还没有经过适当的同行评审,可能引入了漏洞。

另请注意,此代码依赖于 sqlcipher3 (Python绑定)和 sqlcipher, 那里的代码也可能有漏洞,但由于这些都是广泛使用的密码模块,我们可以预期那里会有“短暂的零日”。

sqlcipher_ext api说明

class SqlCipherDatabase(database, passphrase, **kwargs)

的子类 SqliteDatabase 它存储加密的数据库。不是标准 sqlite3 后端,它使用 sqlcipher3: 一个python包装器,用于 sqlcipher, 这反过来又是一个加密的包装器 sqlite3 ,所以API是 identicalSqliteDatabase 的,但对象构造参数除外:

参数:
  • database -- 要打开[或创建]的加密数据库文件名的路径。

  • passphrase -- 数据库加密密码:长度至少为8个字符,但 strongly advised 更好地执行 passphrase strength 实现中的标准。

  • 如果 database 文件不存在,将 created 通过从中派生的密钥进行加密 passhprase .

  • 尝试打开现有数据库时, passhprase 应该与创建时使用的相同。如果密码短语不正确,则在首次尝试访问数据库时将引发错误。

rekey(passphrase)
参数:

passphrase (str) -- 数据库的新密码。

更改数据库的密码。

备注

可以使用许多扩展pragma配置sqlcipher。pragma及其描述的列表可以在 SQLCipher documentation .

例如,要指定密钥派生的pbkdf2迭代次数(默认情况下,sqlcipher 3.x中为64K,sqlcipher 4.x中为256K):

# Use 1,000,000 iterations.
db = SqlCipherDatabase('my_app.db', pragmas={'kdf_iter': 1000000})

要使用16KB的密码页大小和10000页的缓存大小,请执行以下操作:

db = SqlCipherDatabase('my_app.db', passphrase='secret!!!', pragmas={
    'cipher_page_size': 1024 * 16,
    'cache_size': 10000})  # 10,000 16KB pages, or 160MB.

提示用户输入密码短语的示例:

db = SqlCipherDatabase(None)

class BaseModel(Model):
    """Parent for all app's models"""
    class Meta:
        # We won't have a valid db until user enters passhrase.
        database = db

# Derive our model subclasses
class Person(BaseModel):
    name = TextField(primary_key=True)

right_passphrase = False
while not right_passphrase:
    db.init(
        'testsqlcipher.db',
        passphrase=get_passphrase_from_user())

    try:  # Actually execute a query against the db to test passphrase.
        db.get_tables()
    except DatabaseError as exc:
        # This error indicates the password was wrong.
        if exc.args[0] == 'file is encrypted or is not a database':
            tell_user_the_passphrase_was_wrong()
            db.init(None)  # Reset the db.
        else:
            raise exc
    else:
        # The password was correct.
        right_passphrase = True

另见:稍微详细一点 example .

PostgreSQL扩展

PostgreSQL扩展模块提供了许多“仅Postgres”功能,目前:

以后我想增加对PostgreSQL更多功能的支持。如果您希望看到添加的特定功能,请 open a Github issue .

警告

为了开始使用下面描述的功能,您需要使用扩展 PostgresqlExtDatabase 类而不是 PostgresqlDatabase .

下面的代码将假定您正在使用以下数据库和基本模型:

from playhouse.postgres_ext import *

ext_db = PostgresqlExtDatabase('peewee_test', user='postgres')

class BaseExtModel(Model):
    class Meta:
        database = ext_db

JSON支持

Peewee基本支持Postgres的原生JSON数据类型,其形式为 JSONField . 从2.4.7版开始,Peewee还支持Postgres9.4二进制JSON jsonb 通过类型 BinaryJSONField .

警告

Postgres从9.2开始支持本地JSON数据类型(在9.3中完全支持)。要使用此功能,必须使用正确版本的Postgres psycopg2 2.5或更高版本。

使用 BinaryJSONField 具有许多性能和查询优势,您必须拥有Postgres9.4或更高版本。

备注

必须确保数据库是 PostgresqlExtDatabase 为了使用 JSONField.

下面是一个使用JSON字段声明模型的示例:

import json
import urllib2
from playhouse.postgres_ext import *

db = PostgresqlExtDatabase('my_database')

class APIResponse(Model):
    url = CharField()
    response = JSONField()

    class Meta:
        database = db

    @classmethod
    def request(cls, url):
        fh = urllib2.urlopen(url)
        return cls.create(url=url, response=json.loads(fh.read()))

APIResponse.create_table()

# Store a JSON response.
offense = APIResponse.request('http://crime-api.com/api/offense/')
booking = APIResponse.request('http://crime-api.com/api/booking/')

# Query a JSON data structure using a nested key lookup:
offense_responses = APIResponse.select().where(
    APIResponse.response['meta']['model'] == 'offense')

# Retrieve a sub-key for each APIResponse. By calling .as_json(), the
# data at the sub-key will be returned as Python objects (dicts, lists,
# etc) instead of serialized JSON.
q = (APIResponse
     .select(
       APIResponse.data['booking']['person'].as_json().alias('person'))
     .where(APIResponse.data['meta']['model'] == 'booking'))

for result in q:
    print(result.person['name'], result.person['dob'])

这个 BinaryJSONField 工作原理和支持与常规操作相同的操作 JSONField ,但为测试提供了几个附加操作 遏制. 使用二进制JSON字段,可以测试JSON数据是否包含其他部分JSON结构。( contains()contains_any()contains_all() 或者它是更大的JSON文档的子集( contained_by()

有关更多示例,请参见 JSONFieldBinaryJSONField 以下为API文件。

HStand支持

Postgresql hstore 是嵌入的密钥/值存储。使用hstore,您可以在数据库中存储任意键/值对以及结构化关系数据。

使用 hstore ,在实例化 PostgresqlExtDatabase

# Specify "register_hstore=True":
db = PostgresqlExtDatabase('my_db', register_hstore=True)

目前 postgres_ext 模块支持以下操作:

  • 存储和检索任意字典

  • 按关键字或部分字典筛选

  • 更新/向现有词典添加一个或多个键

  • 从现有词典中删除一个或多个键

  • 选择键、值或zip键和值

  • 检索键/值的切片

  • 测试是否存在密钥

  • 测试键是否具有非空值

使用HSt铺

首先,您需要从导入自定义数据库类和hstore函数 playhouse.postgres_ext (请参见上面的代码段)。然后,只需添加一个 HStoreField 你的模型:

class House(BaseExtModel):
    address = CharField()
    features = HStoreField()

现在可以在上存储任意键/值对 House 实例:

>>> h = House.create(
...     address='123 Main St',
...     features={'garage': '2 cars', 'bath': '2 bath'})
...
>>> h_from_db = House.get(House.id == h.id)
>>> h_from_db.features
{'bath': '2 bath', 'garage': '2 cars'}

您可以按单个键、多个键或部分字典进行筛选:

>>> query = House.select()
>>> garage = query.where(House.features.contains('garage'))
>>> garage_and_bath = query.where(House.features.contains(['garage', 'bath']))
>>> twocar = query.where(House.features.contains({'garage': '2 cars'}))

假设您想对房子进行原子更新:

>>> new_features = House.features.update({'bath': '2.5 bath', 'sqft': '1100'})
>>> query = House.update(features=new_features)
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'bath': '2.5 bath', 'garage': '2 cars', 'sqft': '1100'}

或者,原子删除:

>>> query = House.update(features=House.features.delete('bath'))
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'garage': '2 cars', 'sqft': '1100'}

可以同时删除多个键:

>>> query = House.update(features=House.features.delete('garage', 'sqft'))

您可以只选择键、值或压缩这两个值:

>>> for h in House.select(House.address, House.features.keys().alias('keys')):
...     print(h.address, h.keys)

123 Main St [u'bath', u'garage']

>>> for h in House.select(House.address, House.features.values().alias('vals')):
...     print(h.address, h.vals)

123 Main St [u'2 bath', u'2 cars']

>>> for h in House.select(House.address, House.features.items().alias('mtx')):
...     print(h.address, h.mtx)

123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]

您可以检索数据切片,例如,所有车库数据:

>>> query = House.select(House.address, House.features.slice('garage').alias('garage_data'))
>>> for house in query:
...     print(house.address, house.garage_data)

123 Main St {'garage': '2 cars'}

您可以检查是否存在键并相应地筛选行:

>>> has_garage = House.features.exists('garage')
>>> for house in House.select(House.address, has_garage.alias('has_garage')):
...     print(house.address, house.has_garage)

123 Main St True

>>> for house in House.select().where(House.features.exists('garage')):
...     print(house.address, house.features['garage'])  # <-- just houses w/garage data

123 Main St 2 cars

间隔支撑

Postgres通过 INTERVAL 数据类型( docs

class IntervalField([null=False[, ...]])

能够存储python的字段类 datetime.timedelta 实例。

例子:

from datetime import timedelta

from playhouse.postgres_ext import *

db = PostgresqlExtDatabase('my_db')

class Event(Model):
    location = CharField()
    duration = IntervalField()
    start_time = DateTimeField()

    class Meta:
        database = db

    @classmethod
    def get_long_meetings(cls):
        return cls.select().where(cls.duration > timedelta(hours=1))

伺服器端游标

当psycopg2执行一个查询时,通常所有结果都会被后端提取并返回给客户机。这可能会导致应用程序在进行大型查询时使用大量内存。使用服务器端的游标,每次返回少量结果(默认为2000条记录)。有关最终参考,请参见 psycopg2 documentation .

备注

要使用服务器端(或命名的)光标,必须使用 PostgresqlExtDatabase .

要使用服务器端光标执行查询,只需使用 ServerSide() 帮手:

large_query = PageView.select()  # Build query normally.

# Iterate over large query inside a transaction.
for page_view in ServerSide(large_query):
    # do some interesting analysis here.
    pass

# Server-side resources are released.

如果你想要全部 SELECT 查询以自动使用服务器端光标,可以在创建 PostgresqlExtDatabase

from postgres_ext import PostgresqlExtDatabase

ss_db = PostgresqlExtDatabase('my_db', server_side_cursors=True)

备注

服务器端光标只与事务存在一段时间,因此Peewee不会自动调用 commit() 在执行 SELECT 查询。如果你不 commit 迭代完成后,在连接关闭(或稍后提交事务)之前,不会释放服务器端资源。此外,由于peewee将默认缓存光标返回的行,因此应始终调用 .iterator() 迭代大型查询时。

如果您正在使用 ServerSide() 帮助程序、事务和调用 iterator() 将透明处理。

Postgres扩展API注释

class PostgresqlExtDatabase(database[, server_side_cursors=False[, register_hstore=False[, ...]]])

相同的 PostgresqlDatabase 但为了支持:

参数:
  • database (str) -- 要连接到的数据库的名称。

  • server_side_cursors (bool) -- 是否 SELECT 查询应该使用服务器端游标。

  • register_hstore (bool) -- 用连接注册hstore扩展。

如果要使用hstore扩展,必须指定 register_hstore=True .

如果使用 server_side_cursors ,还要确保用 ServerSide() .

ServerSide(select_query)
参数:

select_query -- 一 SelectQuery 实例。

R型发生器:

将给定的select查询包装在事务中,并调用 iterator() 方法以避免缓存行实例。为了释放服务器端资源,请确保耗尽生成器(遍历所有行)。

用途:

large_query = PageView.select()
for page_view in ServerSide(large_query):
    # Do something interesting.
    pass

# At this point server side resources are released.
class ArrayField([field_class=IntegerField[, field_kwargs=None[, dimensions=1[, convert_values=False]]]])
参数:
  • field_class -- 一个子类 Field ,例如 IntegerField .

  • field_kwargs (dict) -- 要初始化的参数 field_class .

  • dimensions (int) -- 数组的维度。

  • convert_values (bool) -- 应用 field_class 值转换为数组数据。

能够存储所提供的 field_class.

备注

默认情况下,arrayfield将使用gin索引。要禁用此功能,请使用初始化字段 index=False .

您可以存储和检索列表(或列表列表):

class BlogPost(BaseModel):
    content = TextField()
    tags = ArrayField(CharField)


post = BlogPost(content='awesome', tags=['foo', 'bar', 'baz'])

此外,您可以使用 __getitem__ 用于查询数据库中的值或切片的API:

# Get the first tag on a given blog post.
first_tag = (BlogPost
             .select(BlogPost.tags[0].alias('first_tag'))
             .where(BlogPost.id == 1)
             .dicts()
             .get())

# first_tag = {'first_tag': 'foo'}

获取值切片:

# Get the first two tags.
two_tags = (BlogPost
            .select(BlogPost.tags[:2].alias('two'))
            .dicts()
            .get())
# two_tags = {'two': ['foo', 'bar']}
contains(*items)
参数:

items -- 必须在给定数组字段中的一个或多个项。

# Get all blog posts that are tagged with both "python" and "django".
Blog.select().where(Blog.tags.contains('python', 'django'))
contains_any(*items)
参数:

items -- 要在给定数组字段中搜索的一个或多个项。

喜欢 contains() ,但将与数组包含的行匹配 any 给定项目的。

# Get all blog posts that are tagged with "flask" and/or "django".
Blog.select().where(Blog.tags.contains_any('flask', 'django'))
class DateTimeTZField(*args, **kwargs)

时区感知的子类 DateTimeField .

class HStoreField(*args, **kwargs)

用于存储和检索任意键/值对的字段。有关用法的详细信息,请参阅 HStand支持 .

注意

使用 HStoreField 你需要确定 hstore 扩展已注册到连接。要完成此操作,请实例化 PostgresqlExtDatabase 具有 register_hstore=True .

备注

默认情况下 HStoreField 将使用 GiST 索引。要禁用此功能,请使用初始化字段 index=False .

keys()

返回给定行的键。

>>> for h in House.select(House.address, House.features.keys().alias('keys')):
...     print(h.address, h.keys)

123 Main St [u'bath', u'garage']
values()

返回给定行的值。

>>> for h in House.select(House.address, House.features.values().alias('vals')):
...     print(h.address, h.vals)

123 Main St [u'2 bath', u'2 cars']
items()

像 Python 一样 dict ,返回列表中的键和值:

>>> for h in House.select(House.address, House.features.items().alias('mtx')):
...     print(h.address, h.mtx)

123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]
slice(*args)

返回给定键列表的数据切片。

>>> for h in House.select(House.address, House.features.slice('garage').alias('garage_data')):
...     print(h.address, h.garage_data)

123 Main St {'garage': '2 cars'}
exists(key)

查询给定的键是否存在。

>>> for h in House.select(House.address, House.features.exists('garage').alias('has_garage')):
...     print(h.address, h.has_garage)

123 Main St True

>>> for h in House.select().where(House.features.exists('garage')):
...     print(h.address, h.features['garage']) # <-- just houses w/garage data

123 Main St 2 cars
defined(key)

查询给定的键是否有关联的值。

update(**data)

对给定行的键/值执行原子更新。

>>> query = House.update(features=House.features.update(
...     sqft=2000,
...     year_built=2012))
>>> query.where(House.id == 1).execute()
delete(*keys)

删除为一个或多个给定行提供的键。

备注

我们将使用 UPDATE 查询。


>>> query = House.update(features=House.features.delete(
...     'sqft', 'year_built'))
>>> query.where(House.id == 1).execute()
contains(value)
参数:

value -- 要么是 dict ,A list 或者一把键。

查询行中是否存在:

  • 部分字典。

  • 一串键。

  • 一把键。

>>> query = House.select()
>>> has_garage = query.where(House.features.contains('garage'))
>>> garage_bath = query.where(House.features.contains(['garage', 'bath']))
>>> twocar = query.where(House.features.contains({'garage': '2 cars'}))
contains_any(*keys)
参数:

keys -- 要搜索的一个或多个键。

查询行是否存在 any 关键。

class JSONField(dumps=None, *args, **kwargs)
参数:

dumps -- 默认值是调用json.dumps()或dumps函数。您可以重写这个方法来创建一个定制的JSON包装器。

适用于存储和查询任意JSON的字段类。在模型上使用这个时,将字段的值设置为python对象(或者 dict 或A list )从数据库中检索值时,它将作为python数据结构返回。

备注

您必须使用Postgres9.2/psycopg2 2.5或更高版本。

备注

如果您使用的是Postgres9.4,请强烈考虑使用 BinaryJSONField 相反,它提供了更好的性能和更强大的查询选项。

示例模型声明:

db = PostgresqlExtDatabase('my_db')

class APIResponse(Model):
    url = CharField()
    response = JSONField()

    class Meta:
        database = db

存储JSON数据的示例:

url = 'http://foo.com/api/resource/'
resp = json.loads(urllib2.urlopen(url).read())
APIResponse.create(url=url, response=resp)

APIResponse.create(url='http://foo.com/baz/', response={'key': 'value'})

要查询,请使用python的 [] 用于指定嵌套键或数组查找的运算符:

APIResponse.select().where(
    APIResponse.response['key1']['nested-key'] == 'some-value')

以说明 [] 运算符,假设我们将以下数据存储在 APIResponse

{
  "foo": {
    "bar": ["i1", "i2", "i3"],
    "baz": {
      "huey": "mickey",
      "peewee": "nugget"
    }
  }
}

以下是一些查询的结果:

def get_data(expression):
    # Helper function to just retrieve the results of a
    # particular expression.
    query = (APIResponse
             .select(expression.alias('my_data'))
             .dicts()
             .get())
    return query['my_data']

# Accessing the foo -> bar subkey will return a JSON
# representation of the list.
get_data(APIResponse.data['foo']['bar'])
# '["i1", "i2", "i3"]'

# In order to retrieve this list as a Python list,
# we will call .as_json() on the expression.
get_data(APIResponse.data['foo']['bar'].as_json())
# ['i1', 'i2', 'i3']

# Similarly, accessing the foo -> baz subkey will
# return a JSON representation of the dictionary.
get_data(APIResponse.data['foo']['baz'])
# '{"huey": "mickey", "peewee": "nugget"}'

# Again, calling .as_json() will return an actual
# python dictionary.
get_data(APIResponse.data['foo']['baz'].as_json())
# {'huey': 'mickey', 'peewee': 'nugget'}

# When dealing with simple values, either way works as
# you expect.
get_data(APIResponse.data['foo']['bar'][0])
# 'i1'

# Calling .as_json() when the result is a simple value
# will return the same thing as the previous example.
get_data(APIResponse.data['foo']['bar'][0].as_json())
# 'i1'
class BinaryJSONField(dumps=None, *args, **kwargs)
参数:

dumps -- 默认值是调用json.dumps()或dumps函数。您可以重写这个方法来创建一个定制的JSON包装器。

存储和查询任意JSON文档。应该使用普通的python存储数据 dictlist 对象,当从数据库返回数据时,将使用 dictlist 也。

有关基本查询操作的示例,请参见上面的代码示例 JSONField . 下面的示例查询将使用相同的 APIResponse 上述型号。

备注

默认情况下,binaryjsonfield将使用gist索引。要禁用此功能,请使用初始化字段 index=False .

备注

您必须使用Postgres9.4/psycopg2 2.5或更高版本。如果您使用的是Postgres9.2或9.3,则可以使用 JSONField 相反。

contains(other)

测试给定的JSON数据是否包含给定的JSON片段或键。

例子:

search_fragment = {
    'foo': {'bar': ['i2']}
}
query = (APIResponse
         .select()
         .where(APIResponse.data.contains(search_fragment)))

# If we're searching for a list, the list items do not need to
# be ordered in a particular way:
query = (APIResponse
         .select()
         .where(APIResponse.data.contains({
             'foo': {'bar': ['i2', 'i1']}})))

我们也可以传递简单的键。查找包含键的APIResponses foo 在顶层:

APIResponse.select().where(APIResponse.data.contains('foo'))

我们还可以使用方括号搜索子键:

APIResponse.select().where(
    APIResponse.data['foo']['bar'].contains(['i2', 'i1']))
contains_any(*items)

搜索一个或多个给定项的存在。

APIResponse.select().where(
    APIResponse.data.contains_any('foo', 'baz', 'nugget'))

喜欢 contains() ,我们也可以搜索子键:

APIResponse.select().where(
    APIResponse.data['foo']['bar'].contains_any('i2', 'ix'))
contains_all(*items)

搜索所有给定项目的存在。

APIResponse.select().where(
    APIResponse.data.contains_all('foo'))

喜欢 contains_any() ,我们也可以搜索子键:

APIResponse.select().where(
    APIResponse.data['foo']['bar'].contains_all('i1', 'i2', 'i3'))
contained_by(other)

测试给定的JSON文档是否包含在给定的JSON文档中(是该文档的子集)。此方法与 contains() .

big_doc = {
    'foo': {
        'bar': ['i1', 'i2', 'i3'],
        'baz': {
            'huey': 'mickey',
            'peewee': 'nugget',
        }
    },
    'other_key': ['nugget', 'bear', 'kitten'],
}
APIResponse.select().where(
    APIResponse.data.contained_by(big_doc))
concat(data)

总结两个现场数据和提供的数据。请注意,此操作不会合并或执行“Deep Concat”。

has_key(key)

测试该键是否存在于JSON对象的顶层。

remove(*keys)

从JSON对象的顶层删除一个或多个键。

Match(field, query)

生成全文搜索表达式,自动将左侧操作数转换为 tsvector 和右操作数 tsquery .

例子:

def blog_search(search_term):
    return Blog.select().where(
        (Blog.status == Blog.STATUS_PUBLISHED) &
        Match(Blog.content, search_term))
class TSVectorField

适用于存储的字段类型 tsvector 数据。此字段将自动创建为 GIN 提高搜索性能的索引。

备注

存储在此字段中的数据仍需要手动转换为 tsvector 类型。

备注

默认情况下,tsvectorfield将使用GIN索引。要禁用此功能,请使用初始化字段 index=False .

示例用法:

class Blog(Model):
    content = TextField()
    search_content = TSVectorField()

content = 'this is a sample blog entry.'
blog_entry = Blog.create(
    content=content,
    search_content=fn.to_tsvector(content))  # Note `to_tsvector()`.
match(query[, language=None[, plain=False]])
参数:
  • query (str) -- 全文搜索查询。

  • language (str) -- 语言名称(可选)。

  • plain (bool) -- 使用普通(简单)分析器分析搜索查询。

返回:

表示全文搜索/匹配的表达式。

例子:

# Perform a search using the "match" method.
terms = 'python & (sqlite | postgres)'
results = Blog.select().where(Blog.search_content.match(terms))

Cockroach数据库

CockroachDB (CRDB)得到了peewee的大力支持。

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app', user='root', host='10.1.0.8')

如果您正在使用 Cockroach Cloud ,您可能会发现使用连接字符串指定连接参数更容易:

db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')

备注

CockroachDB需要 psycopg2 (Postgres)Python驱动程序。

备注

CockroachDB安装和入门指南可在此处找到:https://www.cockroachlabs.com/docs/stable/install-cockroachdb.html

安全套接字层配置

在运行Cockroach群集时,强烈建议使用SSL证书。但是,您可能需要在初始化数据库时指定一些额外的选项:

db = CockroachDatabase(
    'my_app',
    user='root',
    host='10.1.0.8',
    sslmode='verify-full',  # Verify the cert common-name.
    sslrootcert='/path/to/root.crt')


# Or, alternatively, specified as part of a connection-string:
db = CockroachDatabase('postgresql://root:secret@host:26257/dbname'
                       '?sslmode=verify-full&sslrootcert=/path/to/root.crt'
                       '&options=--cluster=my-cluster-xyz')

有关客户端验证的更多详细信息,请访问 libpq docs

蟑螂扩展接口

这个 playhouse.cockroachdb 扩展模块提供以下类和助手:

使用CRDB时可能有用的特殊字段类型:

  • UUIDKeyField -使用CRDB的主键字段实现 UUID 使用默认随机生成的UUID键入。

  • RowIDField -使用CRDB的主键字段实现 INT 使用默认值键入 unique_rowid() .

  • JSONField -和博士后一样 BinaryJSONField ,因为CRDB将JSON视为JSONB。

  • ArrayField -与Postgres扩展相同(但不支持多维数组)。

CRDB与Postgres的wire协议兼容,并且公开了一个非常相似的SQL接口,因此它是可能的(尽管 未推荐的 )使用 PostgresqlDatabase 使用CRDB:

  1. CRDB不支持嵌套事务(保存点),因此 atomic() 方法已实现,以便在使用 CockroachDatabase . 了解更多信息 CRDB事务 .

  2. CRDB可能在字段类型、日期函数和Postgres的自省方面有细微差别。

  3. CRDB的特定功能由 CockroachDatabase ,例如指定事务优先级或 AS OF SYSTEM TIME 条款。

CRDB事务

CRDB不支持嵌套事务(保存点),因此 atomic() 方法在 CockroachDatabase 已修改为在遇到无效嵌套时引发异常。如果希望能够嵌套事务性代码,可以使用 transaction() 方法,该方法将确保最外层的块将管理事务(例如,退出嵌套块不会导致提前提交)。

例子:

@db.transaction()
def create_user(username):
    return User.create(username=username)

def some_other_function():
    with db.transaction() as txn:
        # do some stuff...

        # This function is wrapped in a transaction, but the nested
        # transaction will be ignored and folded into the outer
        # transaction, as we are already in a wrapped-block (via the
        # context manager).
        create_user('some_user@example.com')

        # do other stuff.

    # At this point we have exited the outer-most block and the transaction
    # will be committed.
    return

CRDB提供客户端事务重试,使用特殊的 run_transaction() 帮手。这个helper方法接受callable,它负责执行任何可能需要重试的事务语句。

最简单的例子 run_transaction()

def create_user(email):
    # Callable that accepts a single argument (the database instance) and
    # which is responsible for executing the transactional SQL.
    def callback(db_ref):
        return User.create(email=email)

    return db.run_transaction(callback, max_attempts=10)

huey = create_user('huey@example.com')

备注

这个 cockroachdb.ExceededMaxAttempts 如果在给定的尝试次数后无法提交事务,将引发异常。如果SQL格式错误、违反约束等,则函数将向调用方引发异常。

使用示例 run_transaction() 要对将金额从一个帐户转移到另一个帐户的事务执行客户端重试:

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app')


def transfer_funds(from_id, to_id, amt):
    """
    Returns a 3-tuple of (success?, from balance, to balance). If there are
    not sufficient funds, then the original balances are returned.
    """
    def thunk(db_ref):
        src, dest = (Account
                     .select()
                     .where(Account.id.in_([from_id, to_id])))
        if src.id != from_id:
            src, dest = dest, src  # Swap order.

        # Cannot perform transfer, insufficient funds!
        if src.balance < amt:
            return False, src.balance, dest.balance

        # Update each account, returning the new balance.
        src, = (Account
                .update(balance=Account.balance - amt)
                .where(Account.id == from_id)
                .returning(Account.balance)
                .execute())
        dest, = (Account
                 .update(balance=Account.balance + amt)
                 .where(Account.id == to_id)
                 .returning(Account.balance)
                 .execute())
        return True, src.balance, dest.balance

    # Perform the queries that comprise a logical transaction. In the
    # event the transaction fails due to contention, it will be auto-
    # matically retried (up to 10 times).
    return db.run_transaction(thunk, max_attempts=10)

CRDB应用程序接口

class CockroachDatabase(database[, **kwargs])

CockroachDB数据库实现,基于 PostgresqlDatabase 并使用 psycopg2 驱动程序。

其他关键字参数被传递给psycopg2连接构造函数,并可用于指定数据库 userport 等。

或者,可以以URL形式指定连接详细信息。

run_transaction(callback[, max_attempts=None[, system_time=None[, priority=None]]])
参数:
  • callback -- 可赎回的,接受单一 db 参数(将是此方法从中调用的数据库实例)。

  • max_attempts (int) -- 放弃前尝试的最大次数。

  • system_time (datetime) -- 执行交易 AS OF SYSTEM TIME 相对于给定值。

  • priority (str) -- “低”、“正常”或“高”。

返回:

返回回调返回的值。

加薪:

ExceededMaxAttempts 如果 max_attempts 超过了。

在事务中运行SQL,并在客户端自动重试。

用户提供 callback

  • Must 接受一个参数 db 实例,该实例表示正在运行事务的连接。

  • Must 不尝试提交、回滚或以其他方式管理事务。

  • May 不止一次地被召唤。

  • 应该 理想情况下只包含SQL操作。

此外,在调用此函数时,数据库不能有任何打开的事务,因为CRDB不支持嵌套事务。尝试这样做将引发一个 NotImplementedError .

最简单的例子:

def create_user(email):
    def callback(db_ref):
        return User.create(email=email)

    return db.run_transaction(callback, max_attempts=10)

user = create_user('huey@example.com')
class PooledCockroachDatabase(database[, **kwargs])

CockroachDB连接池实现,基于 PooledPostgresqlDatabase . 实现与相同的API CockroachDatabase ,但将执行客户端连接池。

run_transaction(db, callback[, max_attempts=None[, system_time=None[, priority=None]]])

在事务中运行SQL,并在客户端自动重试。看到了吗 CockroachDatabase.run_transaction() 有关详细信息。

参数:
  • db (CockroachDatabase) -- 数据库实例。

  • callback -- 可赎回的,接受单一 db 参数(与上面传递的值相同)。

备注

此函数等效于 CockroachDatabase 班级。

class UUIDKeyField

使用CRDB的UUID主键字段 gen_random_uuid() 函数自动填充初始值。

class RowIDField

使用CRDB的自动递增整数主键字段 unique_rowid() 函数自动填充初始值。

参见:

  • BinaryJSONField 从Postgresql扩展(在 cockroachdb 扩展模块,并别名为 JSONField

  • ArrayField 从Postgresql扩展。

MySQL扩展

Peewee为使用 mysql-connector 司机或 mariadb-connector 。这些实现可以在 playhouse.mysql_ext

class MySQLConnectorDatabase(database, **kwargs)

使用以下工具实现数据库 mysql-connector 。支持的完整列表 connection parameters

mysql-connector用法示例:

from playhouse.mysql_ext import MySQLConnectorDatabase

# MySQL database implementation that utilizes mysql-connector driver.
db = MySQLConnectorDatabase('my_database', host='1.2.3.4', user='mysql')
class MariaDBConnectorDatabase(database, **kwargs)

使用以下工具实现数据库 mariadb-connector 。支持的完整列表 connection parameters

MariaDB-connector的用法示例:

from playhouse.mysql_ext import MariaDBConnectorDatabase

# MySQL database implementation that utilizes mysql-connector driver.
db = MariaDBConnectorDatabase('my_database', host='1.2.3.4', user='mysql')

备注

这个 MariaDBConnectorDatabase 会吗? not 接受以下参数:

  • charset (始终为utf8mb4)

  • sql_mode

  • use_unicode

其他特定于MySQL的帮助程序:

class JSONField

延伸 TextField 并在python中实现透明的JSON编码和解码。

extract(path)
参数:

path (str) -- JSON路径,例如 $.key1

从给定路径的JSON文档中提取一个值。

Match(columns, expr[, modifier=None])
参数:
  • columns -- 单人间 Field 或多个字段的元组。

  • expr (str) -- 全文搜索表达式。

  • modifier (str) -- 搜索的可选修饰符,例如 '在布尔模式' .

用于构造表单的MySQL全文搜索查询的帮助程序类:

MATCH (columns, ...) AGAINST (expr[ modifier])

DataSet

这个 dataset 模块包含一个高级API,用于处理根据流行的 project of the same name . 目的 dataset 模块应提供:

  • 用于处理关系数据的简化API,与JSON一起工作。

  • 将关系数据导出为JSON或CSV的简单方法。

  • 将JSON或CSV数据导入关系数据库的简单方法。

最小数据加载脚本可能如下所示:

from playhouse.dataset import DataSet

db = DataSet('sqlite:///:memory:')

table = db['sometable']
table.insert(name='Huey', age=3)
table.insert(name='Mickey', age=5, gender='male')

huey = table.find_one(name='Huey')
print(huey)
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}

for obj in table:
    print(obj)
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
# {'age': 5, 'gender': 'male', 'id': 2, 'name': 'Mickey'}

您还可以使用字典API插入、更新或删除:

huey = table.find_one(name='Huey')
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}

# Perform an update by supplying a partial record of changes.
table[1] = {'gender': 'male', 'age': 4}
print(table[1])
# {'age': 4, 'gender': 'male', 'id': 1, 'name': 'Huey'}

# Or insert a new record:
table[3] = {'name': 'Zaizee', 'age': 2}
print(table[3])
# {'age': 2, 'gender': None, 'id': 3, 'name': 'Zaizee'}

# Or delete a record:
del table[3]  # Remove the row we just added.

可以使用导出或导入数据 freeze()thaw()

# Export table content to the `users.json` file.
db.freeze(table.all(), format='json', filename='users.json')

# Import data from a CSV file into a new table. Columns will be automatically
# created for each field in the CSV file.
new_table = db['stats']
new_table.thaw(format='csv', filename='monthly_stats.csv')

入门

DataSet 通过传入格式为的数据库URL初始化对象。 dialect://user:password@host/dbname . 见 数据库URL 有关连接到各种数据库的示例的部分。

# Create an in-memory SQLite database.
db = DataSet('sqlite:///:memory:')

存储数据

要存储数据,我们必须首先获取对表的引用。如果该表不存在,将自动创建:

# Get a table reference, creating the table if it does not exist.
table = db['users']

我们现在可以 insert() 表中的新行。如果列不存在,将自动创建它们:

table.insert(name='Huey', age=3, color='white')
table.insert(name='Mickey', age=5, gender='male')

若要更新表中的现有条目,请传入包含新值和筛选条件的字典。要用作筛选器的列列表在 columns 参数。如果未指定筛选列,则将更新所有行。

# Update the gender for "Huey".
table.update(name='Huey', gender='male', columns=['name'])

# Update all records. If the column does not exist, it will be created.
table.update(favorite_orm='peewee')

导入数据

要从外部源(如JSON或CSV文件)导入数据,可以使用 thaw() 方法。默认情况下,将为遇到的任何属性创建新列。如果只希望填充表中已定义的列,则可以传入 strict=True .

# Load data from a JSON file containing a list of objects.
table = dataset['stock_prices']
table.thaw(filename='stocks.json', format='json')
table.all()[:3]

# Might print...
[{'id': 1, 'ticker': 'GOOG', 'price': 703},
 {'id': 2, 'ticker': 'AAPL', 'price': 109},
 {'id': 3, 'ticker': 'AMZN', 'price': 300}]

使用事务

数据集支持使用简单的上下文管理器嵌套事务。

table = db['users']
with db.transaction() as txn:
    table.insert(name='Charlie')

    with db.transaction() as nested_txn:
        # Set Charlie's favorite ORM to Django.
        table.update(name='Charlie', favorite_orm='django', columns=['name'])

        # jk/lol
        nested_txn.rollback()

正在检查数据库

你可以使用 tables() 列出当前数据库中表的方法:

>>> print(db.tables)
['sometable', 'user']

对于给定的表,可以打印列:

>>> table = db['user']
>>> print(table.columns)
['id', 'age', 'name', 'gender', 'favorite_orm']

我们还可以找出表中有多少行:

>>> print(len(db['user']))
3

阅读数据

要检索所有行,可以使用 all() 方法:

# Retrieve all the users.
users = db['user'].all()

# We can iterate over all rows without calling `.all()`
for user in db['user']:
    print(user['name'])

可以使用 find()find_one() .

# Find all the users who like peewee.
peewee_users = db['user'].find(favorite_orm='peewee')

# Find Huey.
huey = db['user'].find_one(name='Huey')

导出数据

要导出数据,请使用 freeze() 方法,传入要导出的查询:

peewee_users = db['user'].find(favorite_orm='peewee')
db.freeze(peewee_users, format='json', filename='peewee_users.json')

API

class DataSet(url, **kwargs)
参数:

这个 DataSet 类提供用于处理关系数据库的高级API。

tables

返回存储在数据库中的表的列表。每次访问该列表时,都会动态计算该列表。

__getitem__(table_name)

提供一个 Table 对指定表的引用。如果表不存在,将创建它。

query(sql[, params=None[, commit=True]])
参数:
  • sql (str) -- SQL查询。

  • params (list) -- 查询的可选参数。

  • commit (bool) -- 执行时是否应提交查询。

返回:

数据库光标。

对数据库执行提供的查询。

transaction()

创建表示新事务(或保存点)的上下文管理器。

freeze(query[, format='csv'[, filename=None[, file_obj=None[, encoding='utf8'[, **kwargs]]]]])
参数:
  • query -- A SelectQuery ,使用生成 all() 或`~表。查找`。

  • format -- 输出格式。默认情况下, csvjson 支持。

  • filename -- 要写入输出的文件名。

  • file_obj -- 类似文件的对象,用于写入输出。

  • encoding (str) -- 文件编码。

  • kwargs -- 用于导出特定功能的任意参数。

thaw(table[, format='csv'[, filename=None[, file_obj=None[, strict=False[, encoding='utf8'[, **kwargs]]]]]])
参数:
  • table (str) -- 要将数据加载到的表的名称。

  • format -- 输入格式。默认情况下, csvjson 支持。

  • filename -- 要从中读取数据的文件名。

  • file_obj -- 从中读取数据的类似文件的对象。

  • strict (bool) -- 是否存储表中不存在的列的值。

  • encoding (str) -- 文件编码。

  • kwargs -- 导入特定功能的任意参数。

connect()

打开与基础数据库的连接。如果未显式打开连接,则第一次执行查询时将打开一个连接。

close()

关闭与基础数据库的连接。

class Table(dataset, name, model_class)
诺因德克斯:

提供用于处理给定表中的行的高级API。

columns

返回给定表中的列列表。

model_class

动态创建的 Model 班级。

create_index(columns[, unique=False])

在给定列上创建索引:

# Create a unique index on the `username` column.
db['users'].create_index(['username'], unique=True)
insert(**data)

将给定的数据字典插入表中,根据需要创建新列。

update(columns=None, conjunction=None, **data)

使用提供的数据更新表。如果在 columns 参数,然后这些列的值 data 字典将用于确定要更新的行。

# Update all rows.
db['users'].update(favorite_orm='peewee')

# Only update Huey's record, setting his age to 3.
db['users'].update(name='Huey', age=3, columns=['name'])
find(**query)

在表中查询与指定的相等条件匹配的行。如果未指定查询,则返回所有行。

peewee_users = db['users'].find(favorite_orm='peewee')
find_one(**query)

返回与指定的相等条件匹配的单行。如果找不到匹配的行,则 None 将被退回。

huey = db['users'].find_one(name='Huey')
all()

返回给定表中的所有行。

delete(**query)

删除与给定相等条件匹配的所有行。如果没有提供查询,则将删除所有行。

# Adios, Django!
db['users'].delete(favorite_orm='Django')

# Delete all the secret messages.
db['secret_messages'].delete()
freeze([format='csv'[, filename=None[, file_obj=None[, **kwargs]]]])
参数:
  • format -- 输出格式。默认情况下, csvjson 支持。

  • filename -- 要写入输出的文件名。

  • file_obj -- 类似文件的对象,用于写入输出。

  • kwargs -- 用于导出特定功能的任意参数。

thaw([format='csv'[, filename=None[, file_obj=None[, strict=False[, **kwargs]]]]])
参数:
  • format -- 输入格式。默认情况下, csvjson 支持。

  • filename -- 要从中读取数据的文件名。

  • file_obj -- 从中读取数据的类似文件的对象。

  • strict (bool) -- 是否存储表中不存在的列的值。

  • kwargs -- 导入特定功能的任意参数。

领域

这些字段可以在 playhouse.fields 模块。

class CompressedField([compression_level=6[, algorithm='zlib'[, **kwargs]]])
参数:
  • compression_level (int) -- 从0到9的值。

  • algorithm (str) -- 要么 'zlib''bz2' .

使用指定的算法存储压缩数据。此字段扩展 BlobField ,透明地在数据库中存储数据的压缩表示形式。

class PickleField

通过透明的酸洗和非酸洗存储在字段中的数据来存储任意的python数据。此字段扩展 BlobField . 如果 cPickle 模块可用,将使用。

混合属性

混合属性封装了在两个Python上运行的功能 and SQL级别。混合属性的想法来自于 same name in SQLAlchemy 。请考虑以下示例:

class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_property
    def length(self):
        return self.end - self.start

    @hybrid_method
    def contains(self, point):
        return (self.start <= point) & (point < self.end)

这个 hybrid attribute 它的名字来源于 length 根据是否通过 Interval 类或类 Interval 实例。

如果通过实例访问,那么它的行为与您预期的一样。

如果通过访问 Interval.length 但是,长度计算将表示为SQL表达式。例如:

query = Interval.select().where(Interval.length > 5)

此查询将等效于以下SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."end" - "t1"."start") > 5)

这个 playhouse.hybrid 模块还包含一个用于实现可以接受参数的混合方法的修饰器。与混合属性一样,当通过模型实例访问时,函数会正常执行。但是,当对类调用混合方法时,它将生成一个SQL表达式。

例子:

query = Interval.select().where(Interval.contains(2))

此查询等效于以下SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))

对于Python实现与SQL实现略有不同的情况,还有一个附加的API。让我们添加一个 radius 方法到 Interval 模型。因为这个方法计算绝对值,所以我们将使用python abs() 实例部分和 fn.ABS() 类部分的SQL函数。

class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_property
    def length(self):
        return self.end - self.start

    @hybrid_property
    def radius(self):
        return abs(self.length) / 2

    @radius.expression
    def radius(cls):
        return fn.ABS(cls.length) / 2

干净利落的是这两个 radius 实现是指 length 混合属性!当通过 Interval 例如,半径计算将在python中执行。当通过 Interval 类,我们将获得适当的SQL。

例子:

query = Interval.select().where(Interval.radius < 3)

此查询等效于以下SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE ((abs("t1"."end" - "t1"."start") / 2) < 3)

挺干净的,对吧?谢谢你的好主意,SQLAlchemy!

混合API

class hybrid_method(func[, expr=None])

方法修饰器,允许定义具有实例级和类级行为的Python对象方法。

例子:

class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_method
    def contains(self, point):
        return (self.start <= point) & (point < self.end)

当使用 Interval 实例 contains 方法的行为将如您所期望的那样。但是,当作为类方法调用时,将生成一个SQL表达式:

query = Interval.select().where(Interval.contains(2))

将生成以下SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))
expression(expr)

用于指定SQL表达式生成方法的方法修饰器。

class hybrid_property(fget[, fset=None[, fdel=None[, expr=None]]])

方法修饰器,它允许使用实例级和类级行为定义Python对象属性。

实例:

class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_property
    def length(self):
        return self.end - self.start

    @hybrid_property
    def radius(self):
        return abs(self.length) / 2

    @radius.expression
    def radius(cls):
        return fn.ABS(cls.length) / 2

当访问 Interval 实例 lengthradius 属性的行为将如您所期望的那样。但是,当作为类属性访问时,将生成一个SQL表达式:

query = (Interval
         .select()
         .where(
             (Interval.length > 6) &
             (Interval.radius >= 3)))

将生成以下SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (
    (("t1"."end" - "t1"."start") > 6) AND
    ((abs("t1"."end" - "t1"."start") / 2) >= 3)
)

密钥/值存储器

这个 playhouse.kv 模块包含持久字典的实现。

class KeyValue([key_field=None[, value_field=None[, ordered=False[, database=None[, table_name='keyvalue']]]]])
参数:
  • key_field (Field) -- 用于键的字段。默认为 CharField . Must have primary_key=True .

  • value_field (Field) -- 用于值的字段。默认为 PickleField .

  • ordered (bool) -- 数据应按键排序的顺序返回。

  • database (Database) -- 存储键/值数据的数据库。如果未指定,将使用内存中的sqlite数据库。

  • table_name (str) -- 数据存储的表名。

用于存储键/值数据的类似字典的API。与字典一样,支持预期的API,但也具有接受表达式以获取、设置和删除项的附加功能。

KeyValue 已实例化。

使用高效的upsert实现来设置和更新/覆盖键/值对。

基本实例:

# Create a key/value store, which uses an in-memory SQLite database
# for data storage.
KV = KeyValue()

# Set (or overwrite) the value for "k1".
KV['k1'] = 'v1'

# Set (or update) multiple keys at once (uses an efficient upsert).
KV.update(k2='v2', k3='v3')

# Getting values works as you'd expect.
assert KV['k2'] == 'v2'

# We can also do this:
for value in KV[KV.key > 'k1']:
    print(value)

# 'v2'
# 'v3'

# Update multiple values at once using expression:
KV[KV.key > 'k1'] = 'vx'

# What's stored in the KV?
print(dict(KV))

# {'k1': 'v1', 'k2': 'vx', 'k3': 'vx'}

# Delete a single item.
del KV['k2']

# How many items are stored in the KV?
print(len(KV))
# 2

# Delete items that match the given condition.
del KV[KV.key > 'k1']
__contains__(expr)
参数:

expr -- 单个键或表达式

返回:

布尔值是否存在键/表达式。

例子:

>>> kv = KeyValue()
>>> kv.update(k1='v1', k2='v2')

>>> 'k1' in kv
True
>>> 'kx' in kv
False

>>> (KV.key < 'k2') in KV
True
>>> (KV.key > 'k2') in KV
False
__len__()
返回:

存储的项目计数。

__getitem__(expr)
参数:

expr -- 单个键或表达式。

返回:

与键/表达式对应的值。

加薪:

KeyError 如果给出了一把键,但找不到。

实例:

>>> KV = KeyValue()
>>> KV.update(k1='v1', k2='v2', k3='v3')

>>> KV['k1']
'v1'
>>> KV['kx']
KeyError: "kx" not found

>>> KV[KV.key > 'k1']
['v2', 'v3']
>>> KV[KV.key < 'k1']
[]
__setitem__(expr, value)
参数:
  • expr -- 单个键或表达式。

  • value -- 要为键设置的值

为给定键设置值。如果 expr 是表达式,则与该表达式匹配的任何键都将更新其值。

例子:

>>> KV = KeyValue()
>>> KV.update(k1='v1', k2='v2', k3='v3')

>>> KV['k1'] = 'v1-x'
>>> print(KV['k1'])
'v1-x'

>>> KV[KV.key >= 'k2'] = 'v99'
>>> dict(KV)
{'k1': 'v1-x', 'k2': 'v99', 'k3': 'v99'}
__delitem__(expr)
参数:

expr -- 单个键或表达式。

删除给定的键。如果给定了表达式,请删除与该表达式匹配的所有键。

例子:

>>> KV = KeyValue()
>>> KV.update(k1=1, k2=2, k3=3)

>>> del KV['k1']  # Deletes "k1".
>>> del KV['k1']
KeyError: "k1" does not exist

>>> del KV[KV.key > 'k2']  # Deletes "k3".
>>> del KV[KV.key > 'k99']  # Nothing deleted, no keys match.
keys()
返回:

表中所有键中的一个不可重复项。

values()
返回:

表中所有值的I表。

items()
返回:

表中所有键/值对的ITable。

update([__data=None[, **mapping]])

有效地批量插入或替换给定的键/值对。

例子:

>>> KV = KeyValue()
>>> KV.update(k1=1, k2=2)  # Sets 'k1'=1, 'k2'=2.

>>> dict(KV)
{'k1': 1, 'k2': 2}

>>> KV.update(k2=22, k3=3)  # Updates 'k2'->22, sets 'k3'=3.

>>> dict(KV)
{'k1': 1, 'k2': 22, 'k3': 3}

>>> KV.update({'k2': -2, 'k4': 4})  # Also can pass a dictionary.

>>> dict(KV)
{'k1': 1, 'k2': -2, 'k3': 3, 'k4': 4}
get(expr[, default=None])
参数:
  • expr -- 单个键或表达式。

  • default -- 未找到键时的默认值。

返回:

给定键/expr的值,如果找不到单个键,则为默认值。

获取给定键的值。如果键不存在,则返回默认值,除非键是表达式,在这种情况下,将返回空列表。

pop(expr[, default=Sentinel])
参数:
  • expr -- 单个键或表达式。

  • default -- 键不存在时的默认值。

返回:

给定键/expr的值,如果找不到单个键,则为默认值。

获取值并删除给定的键。如果键不存在,则返回默认值,除非键是表达式,在这种情况下返回空列表。

clear()

从键值表中删除所有项。

快捷方式

此模块包含帮助函数,用于表示使用Peewee的API可能有些冗长或繁琐的内容。还有一些帮助器可以将模型序列化到字典,反之亦然。

model_to_dict(model[, recurse=True[, backrefs=False[, only=None[, exclude=None[, extra_attrs=None[, fields_from_query=None[, max_depth=None[, manytomany=False]]]]]]]])
参数:
  • recurse (bool) -- 是否应重复使用外键。

  • backrefs (bool) -- 是否应递归相关对象的列表。

  • only -- 应包含在结果字典中的字段实例列表(或集合)。

  • exclude -- 应从结果字典中排除的字段实例列表(或集合)。

  • extra_attrs -- 实例上应包含在字典中的属性或方法名称列表。

  • fields_from_query (Select) -- 这个 SelectQuery 创建了此模型实例。只有由查询显式选择的字段和值将被序列化。

  • max_depth (int) -- 递归时的最大深度。

  • manytomany (bool) -- 处理多对多字段。

将模型实例(以及任意相关实例)转换为字典。

实例:

>>> user = User.create(username='charlie')
>>> model_to_dict(user)
{'id': 1, 'username': 'charlie'}

>>> model_to_dict(user, backrefs=True)
{'id': 1, 'tweets': [], 'username': 'charlie'}

>>> t1 = Tweet.create(user=user, message='tweet-1')
>>> t2 = Tweet.create(user=user, message='tweet-2')
>>> model_to_dict(user, backrefs=True)
{
  'id': 1,
  'tweets': [
    {'id': 1, 'message': 'tweet-1'},
    {'id': 2, 'message': 'tweet-2'},
  ],
  'username': 'charlie'
}

>>> model_to_dict(t1)
{
  'id': 1,
  'message': 'tweet-1',
  'user': {
    'id': 1,
    'username': 'charlie'
  }
}

>>> model_to_dict(t2, recurse=False)
{'id': 1, 'message': 'tweet-2', 'user': 1}

实施 model_to_dict 是相当复杂的,因为它试图支持的各种用法。如果你有特殊用法,我强烈建议你这样做 not 尝试在这个函数中加入一些疯狂的参数组合。只需编写一个简单的函数,它就可以精确地完成您想要做的事情。

dict_to_model(model_class, data[, ignore_unknown=False])
参数:
  • model_class (Model) -- 要构造的模型类。

  • data (dict) -- 数据字典。外键可以作为嵌套字典包含,而后面的引用可以作为字典列表包含。

  • ignore_unknown (bool) -- 是否允许不可识别的(非字段)属性。

将数据字典转换为模型实例,并在适当的情况下创建相关实例。

实例:

>>> user_data = {'id': 1, 'username': 'charlie'}
>>> user = dict_to_model(User, user_data)
>>> user
<__main__.User at 0x7fea8fa4d490>

>>> user.username
'charlie'

>>> note_data = {'id': 2, 'text': 'note text', 'user': user_data}
>>> note = dict_to_model(Note, note_data)
>>> note.text
'note text'
>>> note.user.username
'charlie'

>>> user_with_notes = {
...     'id': 1,
...     'username': 'charlie',
...     'notes': [{'id': 1, 'text': 'note-1'}, {'id': 2, 'text': 'note-2'}]}
>>> user = dict_to_model(User, user_with_notes)
>>> user.notes[0].text
'note-1'
>>> user.notes[0].user.username
'charlie'
update_model_from_dict(instance, data[, ignore_unknown=False])
参数:
  • instance (Model) -- 要更新的模型实例。

  • data (dict) -- 数据字典。外键可以作为嵌套字典包含,而后面的引用可以作为字典列表包含。

  • ignore_unknown (bool) -- 是否允许不可识别的(非字段)属性。

使用给定的数据字典更新模型实例。

resolve_multimodel_query(query[, key='_model_identifier'])
参数:
  • query -- 复合选择查询。

  • key (str) -- 用于存储模型标识符的键

返回:

一个可迭代的游标,为在复合选择查询中选择的每一行生成正确的模型实例。

帮助程序,用于将复合选择查询中返回的行解析为正确的模型实例类型。例如,如果您有两个不同表的联合,则当迭代查询结果时,此助手将把每一行解析为适当的模型。

class ThreadSafeDatabaseMetadata

型号 Metadata 实现,该实现提供对 database 属性,从而允许应用程序在运行时在多线程应用程序中安全地交换数据库。

用途:

from playhouse.shortcuts import ThreadSafeDatabaseMetadata

# Our multi-threaded application will sometimes swap out the primary
# for the read-replica at run-time.
primary = PostgresqlDatabase(...)
read_replica = PostgresqlDatabase(...)

class BaseModel(Model):
    class Meta:
        database = primary
        model_metadata_class = ThreadSafeDatabaseMetadata

信号支持

带有信号钩(A-LA Django)的型号见 playhouse.signals . 要使用这些信号,您需要将项目的所有模型都作为 playhouse.signals.Model 它覆盖了为各种信号提供支持的必要方法。

from playhouse.signals import Model, post_save


class MyModel(Model):
    data = IntegerField()

@post_save(sender=MyModel)
def on_save_handler(model_class, instance, created):
    put_data_in_cache(instance.data)

警告

我希望这是明显的原因,当你使用 Model.insert()Model.update()Model.delete() 方法。这些方法生成在ORM范围之外执行的查询,而ORM不知道在执行查询时哪些模型实例可能会受到或可能不会受到影响。

信号通过钩住更高级的peewee API工作,比如 Model.save()Model.delete_instance() ,其中受影响的模型实例提前已知。

提供以下信号:

pre_save

在对象保存到数据库之前立即调用。提供其他关键字参数 created ,指示模型是第一次保存还是更新。

post_save

在对象保存到数据库后立即调用。提供其他关键字参数 created ,指示模型是第一次保存还是更新。

pre_delete

在从数据库中删除对象之前立即调用 Model.delete_instance() 使用。

post_delete

当对象从数据库中删除后立即调用 Model.delete_instance() 使用。

pre_init

当模型类首次实例化时调用

连接处理程序

无论何时发送信号,它都将调用已注册的任何处理程序。这允许完全独立的代码响应模型保存和删除等事件。

这个 Signal 类提供 connect() 方法,它接受回调函数和“sender”和“name”的两个可选参数。如果指定,“sender”参数应该是单个模型类,并且允许回调只接收来自该模型类的信号。“name”参数在您想要注销信号处理程序的事件中用作方便的别名。

示例用法:

from playhouse.signals import *

def post_save_handler(sender, instance, created):
    print('%s was just saved' % instance)

# our handler will only be called when we save instances of SomeModel
post_save.connect(post_save_handler, sender=SomeModel)

所有信号处理程序都接受作为前两个参数 senderinstance 在哪里 sender 是模型类和 instance 是实际的模型。

如果愿意,还可以使用修饰器连接信号处理程序。这在功能上等同于上述示例:

@post_save(sender=SomeModel)
def post_save_handler(sender, instance, created):
    print('%s was just saved' % instance)

信号API

class Signal

存储接收器列表(回调),并在调用“send”方法时调用它们。

connect(receiver[, name=None[, sender=None]])
参数:
  • receiver (callable) -- 一个至少接受两个参数的可调用文件,“sender”是触发信号的模型子类,“instance”是实际的模型实例。

  • name (string) -- 短别名

  • sender (Model) -- 如果指定,则只有此模型类的实例才会触发接收器回调。

将接收器添加到接收器的内部列表中,在发送信号时将调用该列表。

from playhouse.signals import post_save
from project.handlers import cache_buster

post_save.connect(cache_buster, name='project.cache_buster')
disconnect([receiver=None[, name=None[, sender=None]]])
参数:
  • receiver (callable) -- 要断开连接的回调

  • name (string) -- 短别名

  • sender (Model) -- 断开特定于型号的处理程序。

断开给定接收器(或具有给定名称别名的接收器)的连接,使其不再被调用。必须提供接收器或名称。

post_save.disconnect(name='project.cache_buster')
send(instance, *args, **kwargs)
参数:

instance -- 模型实例

循环访问接收器,并按连接顺序调用它们。如果接收者指定了发送者,则只有当实例是发送者的实例时才会调用它。

Pwiz,模型发生器

pwiz 是一个与Peewee一起提供的小脚本,能够内省现有数据库并生成适合与基础数据交互的模型代码。如果您已经有了一个数据库,那么pwiz可以通过使用正确的列关联性和外键生成框架代码来给您带来很大的帮助。

如果安装Peewee,请使用 setup.py install ,pwiz将作为“脚本”安装,您只需运行:

python -m pwiz -e postgresql -u postgres my_postgres_db

这将打印出一系列的模型到标准输出。所以你可以这样做:

python -m pwiz -e postgresql my_postgres_db > mymodels.py
python # <-- fire up an interactive shell
>>> from mymodels import Blog, Entry, Tag, Whatever
>>> print([blog.name for blog in Blog.select()])

命令行选项

pwiz接受以下命令行选项:

期权

意义

例子

-H

显示帮助

-e

数据库后端

E-MySQL

-H

要连接的主机

-H远程数据库服务器

-P

要连接的端口

-P 9001

-U

数据库用户

-后邮局

-P

数据库密码

-P(将提示输入密码)

-S

图式

公众

-T

要生成的表

-推特、用户、关系

-V

为视图生成模型

(没有参数)

-我

向生成的文件添加信息元数据

(没有参数)

-O

保留表列顺序

(没有参数)

以下是的有效参数 engine-e ):

  • SQLite

  • MySQL

  • 波斯特雷斯尔

警告

如果访问数据库需要密码,系统将提示您使用安全提示输入密码。

密码将包含在输出中. 具体来说,在文件的顶部 Database 将与任何必需的参数一起定义——包括密码。

PWIZ实例

反省各种数据库的示例:

# Introspect a Sqlite database.
python -m pwiz -e sqlite path/to/sqlite_database.db

# Introspect a MySQL database, logging in as root. You will be prompted
# for a password ("-P").
python -m pwiz -e mysql -u root -P mysql_db_name

# Introspect a Postgresql database on a remote server.
python -m pwiz -e postgres -u postgres -H 10.1.0.3 pg_db_name

完整例子:

$ sqlite3 example.db << EOM
CREATE TABLE "user" ("id" INTEGER NOT NULL PRIMARY KEY, "username" TEXT NOT NULL);
CREATE TABLE "tweet" (
    "id" INTEGER NOT NULL PRIMARY KEY,
    "content" TEXT NOT NULL,
    "timestamp" DATETIME NOT NULL,
    "user_id" INTEGER NOT NULL,
    FOREIGN KEY ("user_id") REFERENCES "user" ("id"));
CREATE UNIQUE INDEX "user_username" ON "user" ("username");
EOM

$ python -m pwiz -e sqlite example.db

生成以下输出:

from peewee import *

database = SqliteDatabase('example.db', **{})

class UnknownField(object):
    def __init__(self, *_, **__): pass

class BaseModel(Model):
    class Meta:
        database = database

class User(BaseModel):
    username = TextField(unique=True)

    class Meta:
        table_name = 'user'

class Tweet(BaseModel):
    content = TextField()
    timestamp = DateTimeField()
    user = ForeignKeyField(column_name='user_id', field='id', model=User)

    class Meta:
        table_name = 'tweet'

观察:

  • 外键 Tweet.user_id 正确检测和映射。

  • 这个 User.username 检测到唯一约束。

  • 每个模型都显式地声明其表名,即使在不需要的情况下也是如此(因为peewee会自动将类名转换为适当的表名)。

  • 的所有参数 ForeignKeyField 是显式声明的,即使它们在默认情况下遵循peewee使用的约定。

备注

这个 UnknownField 是在架构包含Peewee不知道如何映射到字段类的列声明时使用的占位符。

架构迁移

Peewee现在支持模式迁移,对PostgreSQL、SQLite和MySQL的支持测试良好。与其他模式迁移工具不同,Peewee的迁移不处理自省和数据库“版本控制”。相反,peewee为生成和运行模式更改语句提供了许多帮助函数。这个引擎提供了一个更复杂的工具将来可以建立的基础。

迁移可以写成简单的python脚本,并从命令行执行。因为迁移只依赖于您的应用程序 Database 对象,在不引入依赖项的情况下,应该可以轻松地管理更改模型定义和维护一组迁移脚本。

示例用法

首先从 migrate 模块:

from playhouse.migrate import *

实例化A migrator . 这个 SchemaMigrator 类负责生成模式更改操作,然后可以由 migrate() 帮手。

# Postgres example:
my_db = PostgresqlDatabase(...)
migrator = PostgresqlMigrator(my_db)

# SQLite example:
my_db = SqliteDatabase('my_database.db')
migrator = SqliteMigrator(my_db)

使用 migrate() 要执行一个或多个操作:

title_field = CharField(default='')
status_field = IntegerField(null=True)

migrate(
    migrator.add_column('some_table', 'title', title_field),
    migrator.add_column('some_table', 'status', status_field),
    migrator.drop_column('some_table', 'old_column'),
)

警告

迁移不在事务内部运行。如果希望在事务中运行迁移,则需要将调用包装到 migrate 在一个 atomic() 上下文管理器,例如

with my_db.atomic():
    migrate(...)

支持的操作

将新字段添加到现有模型:

# Create your field instances. For non-null fields you must specify a
# default value.
pubdate_field = DateTimeField(null=True)
comment_field = TextField(default='')

# Run the migration, specifying the database table, field name and field.
migrate(
    migrator.add_column('comment_tbl', 'pub_date', pubdate_field),
    migrator.add_column('comment_tbl', 'comment', comment_field),
)

备注

Peewee遵循Django约定,默认情况下,将 _id 设置为给定的 ForeignKeyField 。添加外键时,您需要确保为其指定正确的列名。例如,如果我想添加一个 user 外键-- Tweet 型号:

# Our desired model will look like this:
class Tweet(BaseModel):
    user = ForeignKeyField(User)  # I want to add this field.
    # ... other fields ...

# Migration code:
user = ForeignKeyField(User, field=User.id, null=True)
migrate(
    # Note that the column name given is "user_id".
    migrator.add_column(Tweet._meta.table_name, 'user_id', user),
)

重命名字段:

# Specify the table, original name of the column, and its new name.
migrate(
    migrator.rename_column('story', 'pub_date', 'publish_date'),
    migrator.rename_column('story', 'mod_date', 'modified_date'),
)

删除字段:

migrate(
    migrator.drop_column('story', 'some_old_field'),
)

使字段可以为空或不可以为空:

# Note that when making a field not null that field must not have any
# NULL values present.
migrate(
    # Make `pub_date` allow NULL values.
    migrator.drop_not_null('story', 'pub_date'),

    # Prevent `modified_date` from containing NULL values.
    migrator.add_not_null('story', 'modified_date'),
)

更改字段的数据类型:

# Change a VARCHAR(50) field to a TEXT field.
migrate(
    migrator.alter_column_type('person', 'email', TextField())
)

重命名表:

migrate(
    migrator.rename_table('story', 'stories_tbl'),
)

添加索引:

# Specify the table, column names, and whether the index should be
# UNIQUE or not.
migrate(
    # Create an index on the `pub_date` column.
    migrator.add_index('story', ('pub_date',), False),

    # Create a multi-column index on the `pub_date` and `status` fields.
    migrator.add_index('story', ('pub_date', 'status'), False),

    # Create a unique index on the category and title fields.
    migrator.add_index('story', ('category_id', 'title'), True),
)

删除索引:

# Specify the index name.
migrate(migrator.drop_index('story', 'story_pub_date_status'))

添加或删除表约束:

# Add a CHECK() constraint to enforce the price cannot be negative.
migrate(migrator.add_constraint(
    'products',
    'price_check',
    Check('price >= 0')))

# Remove the price check constraint.
migrate(migrator.drop_constraint('products', 'price_check'))

# Add a UNIQUE constraint on the first and last names.
migrate(migrator.add_unique('person', 'first_name', 'last_name'))

备注

使用非标准模式时,Postgres用户可能需要设置搜索路径。可按如下方式进行:

new_field = TextField(default='', null=False)
migrator = PostgresqlMigrator(db)
migrate(migrator.set_search_path('my_schema_name'),
        migrator.add_column('table', 'field_name', new_field))

迁移API

migrate(*operations)

执行一个或多个架构更改操作。

用途:

migrate(
    migrator.add_column('some_table', 'new_column', CharField(default='')),
    migrator.create_index('some_table', ('new_column',)),
)
class SchemaMigrator(database)
参数:

database -- 一 Database 实例。

这个 SchemaMigrator 负责生成模式更改语句。

add_column(table, column_name, field)
参数:
  • table (str) -- 要添加列的表的名称。

  • column_name (str) -- 新列的名称。

  • field (Field) -- A Field 实例。

向提供的表中添加新列。这个 field 提供的将用于生成适当的列定义。

备注

如果字段不可为空,则必须指定默认值。

备注

对于非空字段,该字段最初将作为空字段添加,然后 UPDATE 将执行语句以用默认值填充列。最后,该列将标记为非空。

drop_column(table, column_name[, cascade=True])
参数:
  • table (str) -- 要从中删除列的表的名称。

  • column_name (str) -- 要删除的列的名称。

  • cascade (bool) -- 是否应使用 CASCADE.

rename_column(table, old_name, new_name)
参数:
  • table (str) -- 包含要重命名的列的表的名称。

  • old_name (str) -- 列的当前名称。

  • new_name (str) -- 列的新名称。

add_not_null(table, column)
参数:
  • table (str) -- 包含列的表的名称。

  • column (str) -- 要使其不可为空的列的名称。

drop_not_null(table, column)
参数:
  • table (str) -- 包含列的表的名称。

  • column (str) -- 要使其可以为空的列的名称。

alter_column_type(table, column, field[, cast=None])
参数:
  • table (str) -- 表的名称。

  • column_name (str) -- 要修改的列的名称。

  • field (Field) -- Field 表示新数据类型的实例。

  • cast -- (仅限postgres)如果数据类型不兼容,请指定强制转换表达式,例如。 column_name::int . 可以作为字符串或 Cast 实例。

改变数据类型。应谨慎使用此方法,因为数据库可能不支持使用不兼容的类型。

rename_table(old_name, new_name)
参数:
  • old_name (str) -- 表的当前名称。

  • new_name (str) -- 表的新名称。

add_index(table, columns[, unique=False[, using=None]])
参数:
  • table (str) -- 要在其上创建索引的表的名称。

  • columns (list) -- 应编入索引的列的列表。

  • unique (bool) -- 新索引是否应指定唯一约束。

  • using (str) -- 索引类型(如果支持),例如GIST或GIN。

drop_index(table, index_name)
参数:
  • table (str) -- 包含要删除的索引的表的名称。

  • index_name (str) -- 要删除的索引的名称。

add_constraint(table, name, constraint)
参数:
  • table (str) -- 要添加约束的表。

  • name (str) -- 用于标识约束的名称。

  • constraint -- 或者 Check() 约束或用于添加任意约束的使用 SQL .

drop_constraint(table, name)
参数:
  • table (str) -- 要从中删除约束的表。

  • name (str) -- 要删除的约束的名称。

add_unique(table, *column_names)
参数:
  • table (str) -- 要添加约束的表。

  • column_names (str) -- 唯一约束的一列或多列。

class PostgresqlMigrator(database)

为PostgreSQL数据库生成迁移。

set_search_path(schema_name)
参数:

schema_name (str) -- 要使用的模式。

为后续操作设置搜索路径(架构)。

class SqliteMigrator(database)

为sqlite数据库生成迁移。

sqlite对 ALTER TABLE 查询,因此当前不支持对sqlite执行以下操作:

  • add_constraint

  • drop_constraint

  • add_unique

class MySQLMigrator(database)

为MySQL数据库生成迁移。

反射

反射模块包含用于自省现有数据库的帮助程序。该模块由Playhouse内的其他几个模块内部使用,包括 DataSetPwiz,模型发生器 .

generate_models(database[, schema=None[, **options]])
参数:
返回:

dict 将表名映射到模型类。

为给定数据库中的表生成模型。有关如何使用此函数的示例,请参见 交互使用peewee .

例子:

>>> from peewee import *
>>> from playhouse.reflection import generate_models
>>> db = PostgresqlDatabase('my_app')
>>> models = generate_models(db)
>>> list(models.keys())
['account', 'customer', 'order', 'orderitem', 'product']

>>> globals().update(models)  # Inject models into namespace.
>>> for cust in customer.select():  # Query using generated model.
...     print(cust.name)
...

Huey Kitty
Mickey Dog
参数:

model (Model) -- 要打印的模型类

返回:

无返回值

打印模型类的用户友好描述,用于调试或交互使用。当前,这将打印表名以及所有字段及其数据类型。这个 交互使用peewee 部分包含一个示例。

实例输出:

>>> from playhouse.reflection import print_model
>>> print_model(User)
user
  id AUTO PK
  email TEXT
  name TEXT
  dob DATE

index(es)
  email UNIQUE

>>> print_model(Tweet)
tweet
  id AUTO PK
  user INT FK: User.id
  title TEXT
  content TEXT
  timestamp DATETIME
  is_published BOOL

index(es)
  user_id
  is_published, timestamp
参数:

model (Model) -- 打印模型

返回:

无返回值

打印SQL CREATE TABLE 对于给定的模型类,这对于调试或交互使用可能很有用。见 交互使用peewee 例如用法部分。注意,这个函数的输出中不包括索引和约束。

实例输出:

>>> from playhouse.reflection import print_table_sql
>>> print_table_sql(User)
CREATE TABLE IF NOT EXISTS "user" (
  "id" INTEGER NOT NULL PRIMARY KEY,
  "email" TEXT NOT NULL,
  "name" TEXT NOT NULL,
  "dob" DATE NOT NULL
)

>>> print_table_sql(Tweet)
CREATE TABLE IF NOT EXISTS "tweet" (
  "id" INTEGER NOT NULL PRIMARY KEY,
  "user_id" INTEGER NOT NULL,
  "title" TEXT NOT NULL,
  "content" TEXT NOT NULL,
  "timestamp" DATETIME NOT NULL,
  "is_published" INTEGER NOT NULL,
  FOREIGN KEY ("user_id") REFERENCES "user" ("id")
)
class Introspector(metadata[, schema=None])

元数据可以通过实例化 Introspector . 建议使用工厂方法,而不是直接实例化此类 from_database() .

classmethod from_database(database[, schema=None])
参数:
  • database -- 一 Database 实例。

  • schema (str) -- 可选架构(某些数据库支持)。

创建一个 Introspector 适用于给定数据库的实例。

用途:

db = SqliteDatabase('my_app.db')
introspector = Introspector.from_database(db)
models = introspector.generate_models()

# User and Tweet (assumed to exist in the database) are
# peewee Model classes generated from the database schema.
User = models['user']
Tweet = models['tweet']
generate_models([skip_invalid=False[, table_names=None[, literal_column_names=False[, bare_fields=False[, include_views=False]]]]])
参数:
  • skip_invalid (bool) -- 跳过名称为无效python标识符的表。

  • table_names (list) -- 要生成的表名列表。如果未指定,则为所有表生成模型。

  • literal_column_names (bool) -- 按原样使用列名。默认情况下,列名称是“python-ized”,即混合大小写变为小写。

  • bare_fields -- 仅SQLite. 不要为自省的列指定数据类型。

  • include_views -- 也为视图生成模型。

返回:

将表名映射到模型类的字典。

内省数据库,读取表、列和外键约束,然后生成一个字典,将每个数据库表映射到动态生成的 Model 班级。

数据库URL

此模块包含一个助手函数,用于从URL连接字符串生成数据库连接。

connect(url, **connect_params)

创建一个 Database 来自给定连接URL的实例。

实例:

  • sqlite:///my_database.db 将创建一个 SqliteDatabase 文件的实例 my_database.db 在当前目录中。

  • *sqlite:///:内存:*将在内存中创建 SqliteDatabase 实例。

  • postgresql://postgres:my_password@localhost:5432/my_database 将创建一个 PostgresqlDatabase 实例。提供用户名和密码,以及要连接的主机和端口。

  • mysql://user:passwd@ip:port/my_db 将创建一个 MySQLDatabase 本地mysql数据库实例 my_db.

  • mysql+pool://user:passwd@ip:port/my_db?max_connections=20&stale_timeout=300 将创建一个 PooledMySQLDatabase 本地mysql数据库实例 my_db 最大连接数设置为20,过时超时设置为300秒。

支持的方案:

用途:

import os
from playhouse.db_url import connect

# Connect to the database URL defined in the environment, falling
# back to a local Sqlite database if no database URL is specified.
db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')
parse(url)

将给定URL中的信息解析为包含 databasehostportuser 和/或 password . 可以在URL查询字符串中传递其他连接参数。

如果使用自定义数据库类,则可以使用 parse() 函数从URL提取信息,然后将其传递给数据库对象。

register_database(db_class, *names)
参数:
  • db_class -- 一个子类 Database .

  • names -- 在URL中用作方案的名称列表,例如“sqlite”或“firebird”

在指定的名称下注册其他数据库类。此函数可用于扩展 connect() 支持其他方案的功能。假设您有一个自定义数据库类用于 Firebird 已命名 FirebirdDatabase .

from playhouse.db_url import connect, register_database

register_database(FirebirdDatabase, 'firebird')
db = connect('firebird://my-firebird-db')

连接池

这个 pool 模块包含多个 Database 为PostgreSQL、MySQL和SQLite数据库提供连接池的类。池通过重写 Database 类打开和关闭与后端的连接。池可以指定一个超时,在该超时之后将回收连接,以及打开连接数的上限。

在多线程应用程序中,最多 max_connections 将被打开。每个线程(或者,如果使用gevent,greenlet)将有自己的连接。

在单线程应用程序中,只创建一个连接。它将被持续回收,直到超过过时超时或显式关闭(使用`.manual_close()`)。

默认情况下,您的应用程序所需要做的就是确保连接在使用完后关闭,并将它们返回到池中。. 对于Web应用程序,这通常意味着在请求开始时,您将打开一个连接,当您返回响应时,您将关闭该连接。

简单Postgres池示例代码:

# Use the special postgresql extensions.
from playhouse.pool import PooledPostgresqlExtDatabase

db = PooledPostgresqlExtDatabase(
    'my_app',
    max_connections=32,
    stale_timeout=300,  # 5 minutes.
    user='postgres')

class BaseModel(Model):
    class Meta:
        database = db

就这样!如果您希望对连接池进行更细粒度的控制,请查看 连接管理 部分。

池API

class PooledDatabase(database[, max_connections=20[, stale_timeout=None[, timeout=None[, **kwargs]]]])
参数:
  • database (str) -- 数据库或数据库文件的名称。

  • max_connections (int) -- 最大连接数。提供 None 无限。

  • stale_timeout (int) -- 允许使用连接的秒数。

  • timeout (int) -- 池满时要阻止的秒数。默认情况下,当池满时,peewee不会阻塞,但只会抛出一个异常。要无限期阻止,请将此值设置为 0 .

  • kwargs -- 传递给数据库类的任意关键字参数。

Mixin类用于子类 Database .

备注

当连接超过其 stale_timeout. 相反,只有在请求新连接时才会关闭过时的连接。

备注

如果打开的连接数超过 max_connections, 一 ValueError 将被提升。

manual_close()

关闭当前打开的连接而不将其返回池。

close_idle()

关闭所有空闲连接。这不包括当前正在使用的任何连接——只包括以前创建但后来返回池的连接。

close_stale([age=600])
参数:

age (int) -- 连接应被视为过时的时间。

返回:

关闭的连接数。

使用中但超过给定年龄的紧密连接。调用此方法时请小心!

close_all()

关闭所有连接。这包括当时可能使用的任何连接。调用此方法时请小心!

class PooledPostgresqlDatabase

的子类 PostgresqlDatabase 混合在一起的 PooledDatabase 帮手。

class PooledPostgresqlExtDatabase

的子类 PostgresqlExtDatabase 混合在一起的 PooledDatabase 帮手。这个 PostgresqlExtDatabasePostgreSQL扩展 并为许多Postgres特定功能提供支持。

class PooledMySQLDatabase

的子类 MySQLDatabase 混合在一起的 PooledDatabase 帮手。

class PooledSqliteDatabase

用于sqlite应用程序的永久连接。

class PooledSqliteExtDatabase

用于sqlite应用程序的持久连接,使用 sqlite扩展 高级数据库驱动程序 SqliteExtDatabase .

测试用例

包含测试Peewee项目时有用的实用程序。

class count_queries([only_select=False])

将计算在上下文中执行的查询数的上下文管理器。

参数:

only_select (bool) -- 仅计数 SELECT 查询。

with count_queries() as counter:
    huey = User.get(User.username == 'huey')
    huey_tweets = [tweet.message for tweet in huey.tweets]

assert counter.count == 2
count

执行的查询数。

get_queries()

返回由SQL查询和参数列表组成的两个元组的列表。

assert_query_count(expected[, only_select=False])

将引发 AssertionError 如果在修饰函数中执行的查询数不等于预期数。

class TestMyApp(unittest.TestCase):
    @assert_query_count(1)
    def test_get_popular_blogs(self):
        popular_blogs = Blog.get_popular()
        self.assertEqual(
            [blog.title for blog in popular_blogs],
            ["Peewee's Playhouse!", "All About Huey", "Mickey's Adventures"])

此函数还可用作上下文管理器:

class TestMyApp(unittest.TestCase):
    def test_expensive_operation(self):
        with assert_query_count(1):
            perform_expensive_operation()

Flask实用工具

这个 playhouse.flask_utils 模块包含多个帮助器,用于将Peewee与 Flask Web框架。

数据库包装

这个 FlaskDB 类是一个包装器,用于从Flask应用程序中配置和引用Peewee数据库。别让它的名字蒙蔽了你:它是 与Peewee数据库不同 . FlaskDB 旨在从Flask应用程序中删除以下样板文件:

  • 基于应用配置数据动态创建Peewee数据库实例。

  • 创建一个基类,应用程序的所有模型都将从该类下降。

  • 在请求的开始和结束处注册钩子,以处理打开和关闭数据库连接。

基本用法:

import datetime
from flask import Flask
from peewee import *
from playhouse.flask_utils import FlaskDB

DATABASE = 'postgresql://postgres:password@localhost:5432/my_database'

# If we want to exclude particular views from the automatic connection
# management, we list them this way:
FLASKDB_EXCLUDED_ROUTES = ('logout',)

app = Flask(__name__)
app.config.from_object(__name__)

db_wrapper = FlaskDB(app)

class User(db_wrapper.Model):
    username = CharField(unique=True)

class Tweet(db_wrapper.Model):
    user = ForeignKeyField(User, backref='tweets')
    content = TextField()
    timestamp = DateTimeField(default=datetime.datetime.now)

上面的代码示例将创建并实例化一个peewee PostgresqlDatabase 由给定的数据库URL指定。请求挂钩将配置为在收到请求时建立连接,并在发送响应时自动关闭连接。最后, FlaskDB 类公开了 FlaskDB.Model 可以用作应用程序模型基础的属性。

下面是如何访问由 FlaskDB 包装:

# Obtain a reference to the Peewee database instance.
peewee_db = db_wrapper.database

@app.route('/transfer-funds/', methods=['POST'])
def transfer_funds():
    with peewee_db.atomic():
        # ...

    return jsonify({'transfer-id': xid})

备注

可以使用 FlaskDB.database 属性。

下面是另一种配置peewee数据库的方法 FlaskDB

app = Flask(__name__)
db_wrapper = FlaskDB(app, 'sqlite:///my_app.db')

上面的示例显示使用数据库URL,对于更高级的用法,您可以指定配置选项的字典,或者简单地传递一个peewee Database 实例:

DATABASE = {
    'name': 'my_app_db',
    'engine': 'playhouse.pool.PooledPostgresqlDatabase',
    'user': 'postgres',
    'max_connections': 32,
    'stale_timeout': 600,
}

app = Flask(__name__)
app.config.from_object(__name__)

wrapper = FlaskDB(app)
pooled_postgres_db = wrapper.database

使用PeeWee Database 对象:

peewee_db = PostgresqlExtDatabase('my_app')
app = Flask(__name__)
db_wrapper = FlaskDB(app, peewee_db)

带应用程序工厂的数据库

如果您喜欢使用 application factory pattern , the FlaskDB 类实现 init_app() 方法。

作为工厂使用:

db_wrapper = FlaskDB()

# Even though the database is not yet initialized, you can still use the
# `Model` property to create model classes.
class User(db_wrapper.Model):
    username = CharField(unique=True)


def create_app():
    app = Flask(__name__)
    app.config['DATABASE'] = 'sqlite:////home/code/apps/my-database.db'
    db_wrapper.init_app(app)
    return app

查询实用程序

这个 flask_utils 模块提供了几个帮助程序来管理Web应用程序中的查询。一些常见的模式包括:

get_object_or_404(query_or_model, *query)
参数:
  • query_or_model -- 要么是 Model 类或预筛选 SelectQuery .

  • query -- 任意复杂的peewee表达式。

检索与给定查询匹配的对象,或返回404未找到响应。一个常见的用例可能是一个日志的详细页面。您希望检索与给定URL匹配的文章,或者返回404。

例子:

@app.route('/blog/<slug>/')
def post_detail(slug):
    public_posts = Post.select().where(Post.published == True)
    post = get_object_or_404(public_posts, (Post.slug == slug))
    return render_template('post_detail.html', post=post)
object_list(template_name, query[, context_variable='object_list'[, paginate_by=20[, page_var='page'[, check_bounds=True[, **kwargs]]]]])
参数:
  • template_name -- 要呈现的模板的名称。

  • query -- A SelectQuery 分页实例。

  • context_variable -- 用于分页对象列表的上下文变量名。

  • paginate_by -- 每页的对象数。

  • page_var -- 的名字 GET 包含该页的参数。

  • check_bounds -- 是否检查给定页是否为有效页。如果 check_boundsTrue 并且指定了一个无效的页面,那么将返回404。

  • kwargs -- 要传递到模板上下文中的任意键/值对。

检索由给定查询指定的对象的分页列表。分页对象列表将使用给定的 context_variable 以及有关当前页和总页数的元数据,最后是作为关键字参数传递的任意上下文数据。

该页是使用 page GET 参数,例如 /my-object-list/?page=3 将返回对象的第三页。

例子:

@app.route('/blog/')
def post_index():
    public_posts = (Post
                    .select()
                    .where(Post.published == True)
                    .order_by(Post.timestamp.desc()))

    return object_list(
        'post_index.html',
        query=public_posts,
        context_variable='post_list',
        paginate_by=10)

模板将具有以下上下文:

  • post_list 其中包含最多10个帖子的列表。

  • page ,其中包含基于 page GET 参数。

  • pagination ,A PaginatedQuery 实例。

class PaginatedQuery(query_or_model, paginate_by[, page_var='page'[, check_bounds=False]])
参数:
  • query_or_model -- 要么是 Model 或A SelectQuery 包含要分页的记录集合的实例。

  • paginate_by -- 每页的对象数。

  • page_var -- 的名字 GET 包含该页的参数。

  • check_bounds -- 是否检查给定页是否为有效页。如果 check_boundsTrue 并且指定了一个无效的页面,那么将返回404。

根据帮助器类执行分页 GET 参数。

get_page()

返回当前选定的页面,如 page_var GET 参数。如果没有显式选择页面,则此方法将返回1,指示第一页。

get_page_count()

返回可能的总页数。

get_object_list()

使用的值 get_page() ,返回用户请求的对象页。返回值为 SelectQuery 与适当的 LIMITOFFSET 条款。

如果 check_bounds 被设定为 True 并且请求的页面不包含任何对象,那么将引发404。