数据库

Peewee Database 对象表示到数据库的连接。这个 Database 类是用打开到数据库的连接所需的所有信息实例化的,然后可以用于:

  • 打开和关闭连接。

  • 执行查询。

  • 管理事务(和保存点)。

  • 自省表、列、索引和约束。

Peewee支持sqlite、mysql和postgres。每个数据库类都提供一些基本的、特定于数据库的配置选项。

from peewee import *

# SQLite database using WAL journal mode and 64MB cache.
sqlite_db = SqliteDatabase('/path/to/app.db', pragmas={
    'journal_mode': 'wal',
    'cache_size': -1024 * 64})

# Connect to a MySQL database on network.
mysql_db = MySQLDatabase('my_app', user='app', password='db_password',
                         host='10.1.0.8', port=3306)

# Connect to a Postgres database.
pg_db = PostgresqlDatabase('my_app', user='postgres', password='secret',
                           host='10.1.0.9', port=5432)

Peewee通过特定于数据库的扩展模块为SQLite、Postgres和CockroachDB提供高级支持。要使用扩展功能,请导入相应的特定于数据库的模块并使用提供的数据库类:

from playhouse.sqlite_ext import SqliteExtDatabase

# Use SQLite (will register a REGEXP function and set busy timeout to 3s).
db = SqliteExtDatabase('/path/to/app.db', regexp_function=True, timeout=3,
                       pragmas={'journal_mode': 'wal'})


from playhouse.postgres_ext import PostgresqlExtDatabase

# Use Postgres (and register hstore extension).
db = PostgresqlExtDatabase('my_app', user='postgres', register_hstore=True)


from playhouse.cockroachdb import CockroachDatabase

# Use CockroachDB.
db = CockroachDatabase('my_app', user='root', port=26257, host='10.1.0.8')

# CockroachDB connections may require a number of parameters, which can
# alternatively be specified using a connection-string.
db = CockroachDatabase('postgresql://...')

有关数据库扩展的详细信息,请参阅:

初始化数据库

这个 Database 初始化方法要求将数据库的名称作为第一个参数。在建立连接时,后续的关键字参数将传递给基础数据库驱动程序,使您能够轻松传递特定于供应商的参数。

例如,对于PostgreSQL,通常需要指定 hostuserpassword 创建连接时。这些不是标准的 Peewee Database 参数,因此它们将直接传递回 psycopg2 创建连接时:

db = PostgresqlDatabase(
    'database_name',  # Required by Peewee.
    user='postgres',  # Will be passed directly to psycopg2.
    password='secret',  # Ditto.
    host='db.mysite.com')  # Ditto.

另一个例子是 pymysql 驾驶员接受 charset 不是标准Peewee的参数 Database 参数。要设置此值,只需传入 charset 除了你的其他价值观:

db = MySQLDatabase('database_name', user='www-data', charset='utf8mb4')

有关可用参数,请参阅数据库驱动程序的文档:

使用PostgreSQL

要连接到PostgreSQL数据库,我们将使用 PostgresqlDatabase . 第一个参数始终是数据库的名称,之后可以指定任意 psycopg2 parameters .

psql_db = PostgresqlDatabase('my_database', user='postgres')

class BaseModel(Model):
    """A base model that will use our Postgresql database"""
    class Meta:
        database = psql_db

class User(BaseModel):
    username = CharField()

这个 Playhouse,扩展到Peewee 包含一个 Postgresql extension module 它提供了许多Postgres特有的功能,例如:

如果您想使用这些出色的功能,请使用 PostgresqlExtDatabaseplayhouse.postgres_ext 模块:

from playhouse.postgres_ext import PostgresqlExtDatabase

psql_db = PostgresqlExtDatabase('my_database', user='postgres')

隔离级别

自Peewee 3.9.7起,可以使用中的符号常量将隔离级别指定为初始化参数。 psycopg2.extensions

from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE

db = PostgresqlDatabase('my_app', user='postgres', host='db-host',
                        isolation_level=ISOLATION_LEVEL_SERIALIZABLE)

备注

在旧版本中,可以手动设置基础psycopg2连接的隔离级别。这可以一次性完成:

db = PostgresqlDatabase(...)
conn = db.connection()  # returns current connection.

from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE
conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)

要在每次创建连接时运行此函数,请对 _initialize_database() 钩子,设计用于此目的:

class SerializedPostgresqlDatabase(PostgresqlDatabase):
    def _initialize_connection(self, conn):
        conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)

使用CockroachDB

使用 CockroachDatabase 数据库类,在中定义 playhouse.cockroachdb

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app', user='root', port=26257, host='localhost')

如果您正在使用 Cockroach Cloud ,您可能会发现使用连接字符串指定连接参数更容易:

db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')

备注

CockroachDB需要 psycopg2 (Postgres)Python驱动程序。

备注

CockroachDB安装和入门指南可在此处找到:https://www.cockroachlabs.com/docs/stable/install-cockroachdb.html

CRDB提供客户端事务重试,使用特殊的 CockroachDatabase.run_transaction() 辅助方法。此方法接受callable,它负责执行任何可能需要重试的事务语句。

最简单的例子 run_transaction()

def create_user(email):
    # Callable that accepts a single argument (the database instance) and
    # which is responsible for executing the transactional SQL.
    def callback(db_ref):
        return User.create(email=email)

    return db.run_transaction(callback, max_attempts=10)

huey = create_user('huey@example.com')

备注

这个 cockroachdb.ExceededMaxAttempts 如果在给定的尝试次数后无法提交事务,将引发异常。如果SQL格式错误、违反约束等,则函数将向调用方引发异常。

有关详细信息,请参阅:

使用SQLite

要连接到sqlite数据库,我们将使用 SqliteDatabase . 第一个参数是包含数据库的文件名或字符串 ':memory:' 创建内存中的数据库。在数据库文件名之后,可以指定一个列表或pragma或任何其他任意的 sqlite3 parameters .

sqlite_db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})

class BaseModel(Model):
    """A base model that will use our Sqlite database."""
    class Meta:
        database = sqlite_db

class User(BaseModel):
    username = TextField()
    # etc, etc

Peewee包括 SQLite extension module 它提供了许多特定于sqlite的功能,例如 full-text searchjson extension support 还有更多。如果您想使用这些出色的功能,请使用 SqliteExtDatabaseplayhouse.sqlite_ext 模块:

from playhouse.sqlite_ext import SqliteExtDatabase

sqlite_db = SqliteExtDatabase('my_app.db', pragmas={
    'journal_mode': 'wal',  # WAL-mode.
    'cache_size': -64 * 1000,  # 64MB cache.
    'synchronous': 0})  # Let the OS manage syncing.

pragma语句

sqlite允许通过 PRAGMA 声明( SQLite documentation )。这些语句通常在创建新的数据库连接时运行。运行一个或多个 PRAGMA 针对新连接的语句,可以将其指定为字典或包含pragma名称和值的2个元组的列表:

db = SqliteDatabase('my_app.db', pragmas={
    'journal_mode': 'wal',
    'cache_size': 10000,  # 10000 pages, or ~40MB
    'foreign_keys': 1,  # Enforce foreign-key constraints
})

pragma也可以使用 pragma() 方法或暴露在 SqliteDatabase 对象:

# Set cache size to 64MB for *current connection*.
db.pragma('cache_size', -1024 * 64)

# Same as above.
db.cache_size = -1024 * 64

# Read the value of several pragmas:
print('cache_size:', db.cache_size)
print('foreign_keys:', db.foreign_keys)
print('journal_mode:', db.journal_mode)
print('page_size:', db.page_size)

# Set foreign_keys pragma on current connection *AND* on all
# connections opened subsequently.
db.pragma('foreign_keys', 1, permanent=True)

注意

使用 pragma() 默认情况下,方法在连接关闭后不持久。要将pragma配置为在每次打开连接时运行,请指定 permanent=True .

备注

在sqlite文档中可以找到pragma设置的完整列表、它们的含义和接受的值:http://sqlite.org/pragma.html

用户定义函数

可以使用用户定义的python代码扩展sqlite。这个 SqliteDatabase 类支持三种类型的用户定义扩展:

  • 函数-获取任意数量的参数并返回单个值。

  • 聚合-从多行聚合参数并返回单个值。

  • 排序规则-描述如何对某个值排序。

备注

有关更多扩展支持,请参见 SqliteExtDatabase ,在 playhouse.sqlite_ext 模块。

用户定义函数示例:

db = SqliteDatabase('analytics.db')

from urllib.parse import urlparse

@db.func('hostname')
def hostname(url):
    if url is not None:
        return urlparse(url).netloc

# Call this function in our code:
# The following finds the most common hostnames of referrers by count:
query = (PageView
         .select(fn.hostname(PageView.referrer), fn.COUNT(PageView.id))
         .group_by(fn.hostname(PageView.referrer))
         .order_by(fn.COUNT(PageView.id).desc()))

用户定义聚合示例:

from hashlib import md5

@db.aggregate('md5')
class MD5Checksum(object):
    def __init__(self):
        self.checksum = md5()

    def step(self, value):
        self.checksum.update(value.encode('utf-8'))

    def finalize(self):
        return self.checksum.hexdigest()

# Usage:
# The following computes an aggregate MD5 checksum for files broken
# up into chunks and stored in the database.
query = (FileChunk
         .select(FileChunk.filename, fn.MD5(FileChunk.data))
         .group_by(FileChunk.filename)
         .order_by(FileChunk.filename, FileChunk.sequence))

排序规则示例:

@db.collation('ireverse')
def collate_reverse(s1, s2):
    # Case-insensitive reverse.
    s1, s2 = s1.lower(), s2.lower()
    return (s1 < s2) - (s1 > s2)  # Equivalent to -cmp(s1, s2)

# To use this collation to sort books in reverse order...
Book.select().order_by(collate_reverse.collation(Book.title))

# Or...
Book.select().order_by(Book.title.asc(collation='reverse'))

用户定义的表值函数示例(请参见 TableFunctiontable_function )更多详情:

from playhouse.sqlite_ext import TableFunction

db = SqliteDatabase('my_app.db')

@db.table_function('series')
class Series(TableFunction):
    columns = ['value']
    params = ['start', 'stop', 'step']

    def initialize(self, start=0, stop=None, step=1):
        """
        Table-functions declare an initialize() method, which is
        called with whatever arguments the user has called the
        function with.
        """
        self.start = self.current = start
        self.stop = stop or float('Inf')
        self.step = step

    def iterate(self, idx):
        """
        Iterate is called repeatedly by the SQLite database engine
        until the required number of rows has been read **or** the
        function raises a `StopIteration` signalling no more rows
        are available.
        """
        if self.current > self.stop:
            raise StopIteration

        ret, self.current = self.current, self.current + self.step
        return (ret,)

# Usage:
cursor = db.execute_sql('SELECT * FROM series(?, ?, ?)', (0, 5, 2))
for value, in cursor:
    print(value)

# Prints:
# 0
# 2
# 4

有关详细信息,请参阅:

设置事务的锁定模式

可以以三种不同的模式打开SQLite事务:

  • Deferred默认)-仅在执行读或写时获取锁。第一次读取创建 shared lock 第一次写会创建一个 reserved lock . 由于锁的获取被延迟到实际需要时,另一个线程或进程可能会创建一个单独的事务,并在当前线程上的begin执行之后写入数据库。

  • Immediate -A reserved lock 立即获得。在此模式下,其他数据库不能写入数据库或打开 immediateexclusive 交易。但是,其他进程可以继续从数据库中读取数据。

  • Exclusive -打开一个 exclusive lock 这将阻止所有(除了读取未提交的)连接在事务完成之前访问数据库。

指定锁定模式的示例:

db = SqliteDatabase('app.db')

with db.atomic('EXCLUSIVE'):
    do_something()


@db.atomic('IMMEDIATE')
def some_other_function():
    # This function is wrapped in an "IMMEDIATE" transaction.
    do_something_else()

有关详细信息,请参见sqlite locking documentation. To learn more about transactions in Peewee, see the 管理交易 文档。

高级sqlite驱动程序apsw

Peewee还附带了一个备用的SQLite数据库,该数据库使用 高级sqlite驱动程序apsw 。有关APSW的更多信息,请访问 APSW project website 。APSW提供以下特殊功能:

  • 虚拟表、虚拟文件系统、Blob I/O、备份和文件控制。

  • 连接可以跨线程共享,而无需任何附加锁定。

  • 事务由代码显式管理。

  • 处理Unicode 正确地.

  • APSW比标准库sqlite3模块更快。

  • 向您的python应用程序公开几乎所有的sqlite c API。

如果要使用APSW,请使用 APSWDatabaseapsw_ext 模块:

from playhouse.apsw_ext import APSWDatabase

apsw_db = APSWDatabase('my_app.db')

使用MySQL

要连接到MySQL数据库,我们将使用 MySQLDatabase 。在数据库名称之后,您可以指定将传递回驱动程序的任意连接参数(例如 pymysqlmysqlclient )。

mysql_db = MySQLDatabase('my_database')

class BaseModel(Model):
    """A base model that will use our MySQL database"""
    class Meta:
        database = mysql_db

class User(BaseModel):
    username = CharField()
    # etc, etc

驱动程序信息:

  • pymysql 是一个纯PYTHON的MySQL客户端,支持PYTHON2和PYTHON3。peewee会先尝试使用pymysql。

  • mysqlclient 使用c扩展并支持python3。它公开了一个 MySQLdb 模块。如果未安装pymysql,Peewee将尝试使用此模块。

  • mysql-python 也被称为 MySQLdb1 而且是遗留下来的,不应该使用。因为它与mysqlclient共享相同的模块名称,所以同样适用。

  • mysql-connector python 纯 Python (我想??)支持python3。要使用此驱动程序,您可以使用 MySQLConnectorDatabaseplayhouse.mysql_ext 分机。

错误2006:MySQL服务器已离开

当MySQL终止一个空闲的数据库连接时,可能会发生这个特定的错误。这通常发生在不显式管理数据库连接的Web应用程序上。发生的情况是应用程序启动,连接打开以处理执行的第一个查询,并且,由于该连接从未关闭,它将保持打开状态,等待更多查询。

要解决此问题,请确保在需要执行查询时显式连接到数据库,并在完成后关闭连接。在Web应用程序中,这通常意味着您将在请求进入时打开连接,并在返回响应时关闭连接。

框架集成 有关配置通用Web框架以管理数据库连接的示例的部分。

使用数据库URL连接

Playhouse模块 数据库URL 提供帮助者 connect() 接受数据库URL并返回 Database 实例。

示例代码:

import os

from peewee import *
from playhouse.db_url import connect

# Connect to the database URL defined in the environment, falling
# back to a local Sqlite database if no database URL is specified.
db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')

class BaseModel(Model):
    class Meta:
        database = db

数据库URL示例:

运行时数据库配置

有时数据库连接设置直到运行时才知道,此时可以从配置文件或环境加载这些值。在这种情况下,你可以 defer 通过指定 None 作为数据库的名称。

database = PostgresqlDatabase(None)  # Un-initialized database.

class SomeModel(Model):
    class Meta:
        database = database

如果在数据库未初始化时尝试连接或发出任何查询,将出现异常:

>>> database.connect()
Exception: Error, database not properly initialized before opening connection

要初始化数据库,请调用 init() 具有数据库名称和任何其他关键字参数的方法:

database_name = input('What is the name of the db? ')
database.init(database_name, host='localhost', user='postgres')

有关初始化数据库的更多控制,请参阅下一节, 动态定义数据库 .

动态定义数据库

为了更好地控制数据库的定义/初始化方式,可以使用 DatabaseProxy 帮手。 DatabaseProxy 对象充当占位符,然后在运行时可以将其替换为其他对象。在下面的示例中,我们将根据应用程序的配置方式交换数据库:

database_proxy = DatabaseProxy()  # Create a proxy for our db.

class BaseModel(Model):
    class Meta:
        database = database_proxy  # Use proxy for our DB.

class User(BaseModel):
    username = CharField()

# Based on configuration, use a different database.
if app.config['DEBUG']:
    database = SqliteDatabase('local.db')
elif app.config['TESTING']:
    database = SqliteDatabase(':memory:')
else:
    database = PostgresqlDatabase('mega_production_db')

# Configure our proxy to use the db we specified in config.
database_proxy.initialize(database)

警告

只有当实际的数据库驱动程序在运行时发生变化时才使用此方法。例如,如果测试和本地开发环境在sqlite上运行,但部署的应用程序使用PostgreSQL,则可以使用 DatabaseProxy 在运行时更换发动机。

但是,如果只有连接值在运行时发生变化,例如到数据库文件的路径或数据库主机的路径,则应改为使用 Database.init() . 见 运行时数据库配置 了解更多详细信息。

备注

可能更容易避免使用 DatabaseProxy 而是使用 Database.bind() 以及设置或更改数据库的相关方法。参见 在运行时设置数据库 有关详细信息。

在运行时设置数据库

我们已经看到了三种使用peewee配置数据库的方法:

# The usual way:
db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})


# Specify the details at run-time:
db = SqliteDatabase(None)
...
db.init(db_filename, pragmas={'journal_mode': 'wal'})


# Or use a placeholder:
db = DatabaseProxy()
...
db.initialize(SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'}))

Peewee还可以为模型类设置或更改数据库。Peewee测试套件使用此技术在运行测试时将测试模型类绑定到各种数据库实例。

有两套互补的方法:

例如,我们将声明两个模型 没有 指定任何数据库:

class User(Model):
    username = TextField()

class Tweet(Model):
    user = ForeignKeyField(User, backref='tweets')
    content = TextField()
    timestamp = TimestampField()

在运行时将模型绑定到数据库:

postgres_db = PostgresqlDatabase('my_app', user='postgres')
sqlite_db = SqliteDatabase('my_app.db')

# At this point, the User and Tweet models are NOT bound to any database.

# Let's bind them to the Postgres database:
postgres_db.bind([User, Tweet])

# Now we will temporarily bind them to the sqlite database:
with sqlite_db.bind_ctx([User, Tweet]):
    # User and Tweet are now bound to the sqlite database.
    assert User._meta.database is sqlite_db

# User and Tweet are once again bound to the Postgres database.
assert User._meta.database is postgres_db

这个 Model.bind()Model.bind_ctx() 方法对绑定给定模型类的作用相同:

# Bind the user model to the sqlite db. By default, Peewee will also
# bind any models that are related to User via foreign-key as well.
User.bind(sqlite_db)

assert User._meta.database is sqlite_db
assert Tweet._meta.database is sqlite_db  # Related models bound too.

# Here we will temporarily bind *just* the User model to the postgres db.
with User.bind_ctx(postgres_db, bind_backrefs=False):
    assert User._meta.database is postgres_db
    assert Tweet._meta.database is sqlite_db  # Has not changed.

# And now User is back to being bound to the sqlite_db.
assert User._meta.database is sqlite_db

这个 测试Peewee应用程序 本文档的部分还包含一些使用 bind() 方法。

线程安全与多数据库

如果您计划在多线程应用程序中的运行时更改数据库,则将模型的数据库存储在本地线程中将防止争用条件。这可以通过自定义模型来实现 Metadata 类(请参见 ThreadSafeDatabaseMetadata ,包含在 playhouse.shortcuts ):

from peewee import *
from playhouse.shortcuts import ThreadSafeDatabaseMetadata

class BaseModel(Model):
    class Meta:
        # Instruct peewee to use our thread-safe metadata implementation.
        model_metadata_class = ThreadSafeDatabaseMetadata

现在可以使用熟悉的在多线程环境中运行时安全地交换数据库 Database.bind()Database.bind_ctx() 方法。

连接管理

要打开到数据库的连接,请使用 Database.connect() 方法:

>>> db = SqliteDatabase(':memory:')  # In-memory SQLite database.
>>> db.connect()
True

如果我们试着调用 connect() 在一个已经打开的数据库中,我们得到一个 OperationalError

>>> db.connect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/charles/pypath/peewee.py", line 2390, in connect
    raise OperationalError('Connection already opened.')
peewee.OperationalError: Connection already opened.

为了防止引发此异常,我们可以调用 connect() 再加上一个论点, reuse_if_open

>>> db.close()  # Close connection.
True
>>> db.connect()
True
>>> db.connect(reuse_if_open=True)
False

注意,呼叫 connect() 收益率 False 如果数据库连接已打开。

要关闭连接,请使用 Database.close() 方法:

>>> db.close()
True

调用 close() 在已关闭的连接上,不会导致异常,但会返回 False

>>> db.connect()  # Open connection.
True
>>> db.close()  # Close connection.
True
>>> db.close()  # Connection already closed, returns False.
False

您可以使用 Database.is_closed() 方法:

>>> db.is_closed()
True

使用自动连接

如果数据库初始化为 autoconnect=True (默认)。显式管理连接被视为 最佳实践 因此,您可以考虑禁用 autoconnect 行为。

明确您的连接生命周期是非常有帮助的。例如,如果连接失败,则在打开连接时将捕获异常,而不是在执行查询后的某个任意时间。此外,如果使用 connection pool ,有必要打电话 connect()close() 以确保正确回收连接。

为确保正确性,请禁用 autoconnect

db = PostgresqlDatabase('my_app', user='postgres', autoconnect=False)

线程安全性

peewee使用线程本地存储跟踪连接状态,使peewee Database 对象可以安全地与多个线程一起使用。每个线程都有自己的连接,因此任何给定的线程在给定的时间只能有一个打开的连接。

上下文管理器

数据库对象本身可以用作上下文管理器,它在打包的代码块期间打开连接。此外,事务在包装块的开头打开,并在连接关闭之前提交(除非发生错误,在这种情况下事务将回滚)。

>>> db.is_closed()
True
>>> with db:
...     print(db.is_closed())  # db is open inside context manager.
...
False
>>> db.is_closed()  # db is closed.
True

如果要单独管理事务,可以使用 Database.connection_context() 上下文管理器。

>>> with db.connection_context():
...     # db connection is open.
...     pass
...
>>> db.is_closed()  # db connection is closed.
True

这个 connection_context() 方法也可以用作修饰器:

@db.connection_context()
def prepare_database():
    # DB connection will be managed by the decorator, which opens
    # a connection, calls function, and closes upon returning.
    db.create_tables(MODELS)  # Create schema.
    load_fixture_data(db)

DB-API连接对象

要获取对基础DB-API 2.0连接的引用,请使用 Database.connection() 方法。此方法将返回当前打开的连接对象(如果存在),否则将打开新的连接。

>>> db.connection()
<sqlite3.Connection object at 0x7f94e9362f10>

连接池

连接池由 pool module ,包括在 playhouse 扩展库。池支持:

  • 超时后将回收连接。

  • 打开的连接数的上限。

from playhouse.pool import PooledPostgresqlExtDatabase

db = PooledPostgresqlExtDatabase(
    'my_database',
    max_connections=8,
    stale_timeout=300,
    user='postgres')

class BaseModel(Model):
    class Meta:
        database = db

以下集合数据库类可用:

有关Peewee连接池的深入讨论,请参见 连接池 剖面图 playhouse 文档。

测试Peewee应用程序

当为使用peewee的应用程序编写测试时,可能需要使用特殊的测试数据库。另一种常见的做法是对干净的数据库运行测试,这意味着确保在每个测试开始时表是空的。

要在运行时将模型绑定到数据库,可以使用以下方法:

  • Database.bind_ctx() 返回一个上下文管理器,该管理器将给定模型绑定到包装块期间的数据库实例。

  • Model.bind_ctx() ,它同样返回一个上下文管理器,该管理器在包装块期间将模型(及其依赖项)绑定到给定的数据库。

  • Database.bind() 这是一个一次性操作,将模型(及其依赖项)绑定到给定的数据库。

  • Model.bind() 这是一个一次性操作,将模型(及其依赖项)绑定到给定的数据库。

根据您的用例,这些选项中的一个可能更有意义。对于下面的示例,我将使用 Model.bind() .

测试用例设置示例:

# tests.py
import unittest
from my_app.models import EventLog, Relationship, Tweet, User

MODELS = [User, Tweet, EventLog, Relationship]

# use an in-memory SQLite for tests.
test_db = SqliteDatabase(':memory:')

class BaseTestCase(unittest.TestCase):
    def setUp(self):
        # Bind model classes to test db. Since we have a complete list of
        # all models, we do not need to recursively bind dependencies.
        test_db.bind(MODELS, bind_refs=False, bind_backrefs=False)

        test_db.connect()
        test_db.create_tables(MODELS)

    def tearDown(self):
        # Not strictly necessary since SQLite in-memory databases only live
        # for the duration of the connection, and in the next step we close
        # the connection...but a good practice all the same.
        test_db.drop_tables(MODELS)

        # Close connection to db.
        test_db.close()

        # If we wanted, we could re-bind the models to their original
        # database here. But for tests this is probably not necessary.

除此之外,根据经验,我建议您使用生产中使用的相同数据库后端测试应用程序,以避免任何潜在的兼容性问题。

如果您想看到更多关于如何使用peewee运行测试的示例,请查看peewee自己的 test-suite .

与gevent异步

gevent 建议使用PostgreSQL或MySQL进行异步I/O。我喜欢GEvent的原因:

  • 无需特殊用途的“循环感知”重新实现 一切. 使用Asyncio的第三方库通常必须重新实现代码的层和层,以及重新实现协议本身。

  • gevent允许您用普通、干净、惯用的Python编写应用程序。不需要乱丢每一行“异步”、“等待”和其他杂音。没有回电,未来,任务,承诺。没有污点。

  • gevent与python 2一起工作 and Python 3。

  • 盖特斯群岛 Python 的. Asyncio是一个不喜欢吃 Python 的人。

除猴子修补插座外,如果您使用 MySQL 使用纯python驱动程序 pymysql 或正在使用 mysql-connector 在纯Python模式下。用C语言编写的MySQL驱动程序需要特殊的配置,这超出了本文档的范围。

为了 Postgrespsycopg2 ,这是C扩展,您可以使用以下代码段注册将使您的连接异步的事件挂钩:

from gevent.socket import wait_read, wait_write
from psycopg2 import extensions

# Call this function after monkey-patching socket (etc).
def patch_psycopg2():
    extensions.set_wait_callback(_psycopg2_gevent_callback)

def _psycopg2_gevent_callback(conn, timeout=None):
    while True:
        state = conn.poll()
        if state == extensions.POLL_OK:
            break
        elif state == extensions.POLL_READ:
            wait_read(conn.fileno(), timeout=timeout)
        elif state == extensions.POLL_WRITE:
            wait_write(conn.fileno(), timeout=timeout)
        else:
            raise ValueError('poll() returned unexpected result')

数据库, 因为它是嵌入在Python应用程序本身中的,所以不执行任何套接字操作,这将是非阻塞的候选操作。异步对SQLite数据库没有任何影响。

框架集成

对于Web应用程序,通常在收到请求时打开连接,在传递响应时关闭连接。在这一节中,我将描述如何向Web应用添加钩子,以确保正确处理数据库连接。

这些步骤将确保无论您使用的是简单的sqlite数据库还是多个postgres连接池,peewee都能正确处理这些连接。

备注

接收大量流量的应用程序可能会从使用 connection pool 减少每次请求时建立和断开连接的成本。

Flask

Flask和peewee是一个很好的组合,我可以选择任何规模的项目。Flask提供两个钩子,我们将使用它们来打开和关闭DB连接。我们将在收到请求时打开连接,然后在返回响应时关闭连接。

from flask import Flask
from peewee import *

database = SqliteDatabase('my_app.db')
app = Flask(__name__)

# This hook ensures that a connection is opened to handle any queries
# generated by the request.
@app.before_request
def _db_connect():
    database.connect()

# This hook ensures that the connection is closed when we've finished
# processing the request.
@app.teardown_request
def _db_close(exc):
    if not database.is_closed():
        database.close()

丹戈

虽然Peewee和Django一起使用不太常见,但实际上很容易使用这两者。为了管理与Django的Peewee数据库连接,我认为最简单的方法是向应用程序添加中间件。中间件应该是中间件列表中的第一个,以确保它在处理请求时首先运行,在返回响应时最后运行。

如果你有一个叫Django的项目 my_blog Peewee数据库在模块中定义。 my_blog.db ,您可以添加以下中间件类:

# middleware.py
from my_blog.db import database  # Import the peewee database instance.


def PeeweeConnectionMiddleware(get_response):
    def middleware(request):
        database.connect()
        try:
            response = get_response(request)
        finally:
            if not database.is_closed():
                database.close()
        return response
    return middleware


# Older Django < 1.10 middleware.
class PeeweeConnectionMiddleware(object):
    def process_request(self, request):
        database.connect()

    def process_response(self, request, response):
        if not database.is_closed():
            database.close()
        return response

为了确保中间件得到执行,请将其添加到 settings 模块:

# settings.py
MIDDLEWARE_CLASSES = (
    # Our custom middleware appears first in the list.
    'my_blog.middleware.PeeweeConnectionMiddleware',

    # These are the default Django 1.7 middlewares. Yours may differ,
    # but the important this is that our Peewee middleware comes first.
    'django.middleware.common.CommonMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
)

# ... other Django settings ...

Boottle

我自己没有使用瓶子,但查看文档时,我认为以下代码应该确保数据库连接得到正确管理:

# app.py
from bottle import hook  #, route, etc, etc.
from peewee import *

db = SqliteDatabase('my-bottle-app.db')

@hook('before_request')
def _connect_db():
    db.connect()

@hook('after_request')
def _close_db():
    if not db.is_closed():
        db.close()

# Rest of your bottle app goes here.

Web.py

参见文档 application processors .

db = SqliteDatabase('my_webpy_app.db')

def connection_processor(handler):
    db.connect()
    try:
        return handler()
    finally:
        if not db.is_closed():
            db.close()

app.add_processor(connection_processor)

Tornado

看起来像龙卷风 RequestHandler 类实现了两个钩子,可以在处理请求时用于打开和关闭连接。

from tornado.web import RequestHandler

db = SqliteDatabase('my_db.db')

class PeeweeRequestHandler(RequestHandler):
    def prepare(self):
        db.connect()
        return super(PeeweeRequestHandler, self).prepare()

    def on_finish(self):
        if not db.is_closed():
            db.close()
        return super(PeeweeRequestHandler, self).on_finish()

在应用程序中,而不是扩展默认值 RequestHandler ,现在可以扩展 PeeweeRequestHandler .

注意,这并不能解决如何在Tornado或其他事件循环中异步使用Peewee。

Wheezy.web

连接处理代码可以放在 middleware .

def peewee_middleware(request, following):
    db.connect()
    try:
        response = following(request)
    finally:
        if not db.is_closed():
            db.close()
    return response

app = WSGIApplication(middleware=[
    lambda x: peewee_middleware,
    # ... other middlewares ...
])

感谢github用户*@tuukkamustonen*提交此代码。

Falcon

连接处理代码可以放在 middleware component .

import falcon
from peewee import *

database = SqliteDatabase('my_app.db')

class PeeweeConnectionMiddleware(object):
    def process_request(self, req, resp):
        database.connect()

    def process_response(self, req, resp, resource, req_succeeded):
        if not database.is_closed():
            database.close()

application = falcon.API(middleware=[
    PeeweeConnectionMiddleware(),
    # ... other middlewares ...
])

Pyramid

设置处理数据库连接生存期的请求工厂,如下所示:

from pyramid.request import Request

db = SqliteDatabase('pyramidapp.db')

class MyRequest(Request):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        db.connect()
        self.add_finished_callback(self.finish)

    def finish(self, request):
        if not db.is_closed():
            db.close()

在您的应用程序中 main() make sure MyRequest 用作 request_factory:

def main(global_settings, **settings):
    config = Configurator(settings=settings, ...)
    config.set_request_factory(MyRequest)

CherryPy

Publish/Subscribe pattern .

def _db_connect():
    db.connect()

def _db_close():
    if not db.is_closed():
        db.close()

cherrypy.engine.subscribe('before_request', _db_connect)
cherrypy.engine.subscribe('after_request', _db_close)

Sanic

在SANIC中,连接处理代码可以放在请求和响应中间件中。 sanic middleware .

# app.py
@app.middleware('request')
async def handle_request(request):
    db.connect()

@app.middleware('response')
async def handle_response(request, response):
    if not db.is_closed():
        db.close()

快速API

FastAPI是一个与Ayncio兼容的框架。Peewee依赖线程本地变量(也与GEvent兼容)来管理跨请求的连接状态。为了与Ayncio一起使用,需要一些重写,以将线程本地行为替换为与Ayncio兼容的上下文本地行为。

有关使用Peewee和FastAPI的完整处理,请参阅此处的FastAPI文档:

https://fastapi.tiangolo.com/advanced/sql-databases-peewee/

上述文件涵盖:

  • 添加Ayncio上下文感知连接状态跟踪

  • 每个请求的连接处理

其他框架

在这里看不到您的框架?拜托 open a GitHub ticket 我将看到如何添加一个部分,或者更好地说,提交一个文档请求。

正在执行查询

SQL查询通常通过调用 execute() 在使用查询生成器API构造的查询上(或者在 Select 查询)。对于希望直接执行SQL的情况,可以使用 Database.execute_sql() 方法。

db = SqliteDatabase('my_app.db')
db.connect()

# Example of executing a simple query and ignoring the results.
db.execute_sql("ATTACH DATABASE ':memory:' AS cache;")

# Example of iterating over the results of a query using the cursor.
cursor = db.execute_sql('SELECT * FROM users WHERE status = ?', (ACTIVE,))
for row in cursor.fetchall():
    # Do something with row, which is a tuple containing column data.
    pass

管理交易

Peewee提供了几个处理事务的接口。最普遍的是 Database.atomic() 方法,它还支持嵌套事务。 atomic() 块将在事务或保存点中运行,具体取决于嵌套的级别。

如果包装块中发生异常,则当前事务/保存点将回滚。否则,语句将在包装块的末尾提交。

备注

而在一块被 atomic() 上下文管理器,您可以通过调用 Transaction.rollback()Transaction.commit() . 当您在包装好的代码块内执行此操作时,将自动启动新事务。

with db.atomic() as transaction:  # Opens new transaction.
    try:
        save_some_objects()
    except ErrorSavingData:
        # Because this block of code is wrapped with "atomic", a
        # new transaction will begin automatically after the call
        # to rollback().
        transaction.rollback()
        error_saving = True

    create_report(error_saving=error_saving)
    # Note: no need to call commit. Since this marks the end of the
    # wrapped block of code, the `atomic` context manager will
    # automatically call commit for us.

备注

atomic() 可以用作 context manager 或A 装饰者.

备注

Peewee的行为与您可能习惯的DB-API2.0行为不同(有关详细信息,请参阅PEP-249)。默认情况下,Peewee将所有连接放入 autocommit-mode 交易管理由Peewee处理。

上下文管理器

使用 atomic 作为上下文管理器:

db = SqliteDatabase(':memory:')

with db.atomic() as txn:
    # This is the outer-most level, so this block corresponds to
    # a transaction.
    User.create(username='charlie')

    with db.atomic() as nested_txn:
        # This block corresponds to a savepoint.
        User.create(username='huey')

        # This will roll back the above create() query.
        nested_txn.rollback()

    User.create(username='mickey')

# When the block ends, the transaction is committed (assuming no error
# occurs). At that point there will be two users, "charlie" and "mickey".

你可以使用 atomic 执行方法 get or create 操作也是如此:

try:
    with db.atomic():
        user = User.create(username=username)
    return 'Success'
except peewee.IntegrityError:
    return 'Failure: %s is already in use.' % username

装饰者

使用 atomic 作为装饰者:

@db.atomic()
def create_user(username):
    # This statement will run in a transaction. If the caller is already
    # running in an `atomic` block, then a savepoint will be used instead.
    return User.create(username=username)

create_user('charlie')

嵌套事务

atomic() 提供事务的透明嵌套。使用时 atomic() ,最外部的调用将包装在事务中,并且任何嵌套调用都将使用保存点。

with db.atomic() as txn:
    perform_operation()

    with db.atomic() as nested_txn:
        perform_another_operation()

Peewee通过使用保存点支持嵌套事务(有关详细信息,请参阅 savepoint()

显式事务

如果希望在事务中显式运行代码,可以使用 transaction() . 喜欢 atomic()transaction() 可以用作上下文管理器或装饰器。

如果包装块中发生异常,则事务将回滚。否则,语句将在包装块的末尾提交。

db = SqliteDatabase(':memory:')

with db.transaction() as txn:
    # Delete the user and their associated tweets.
    user.delete_instance(recursive=True)

事务可以在包装的块中显式提交或回滚。发生这种情况时,将启动一个新事务。

with db.transaction() as txn:
    User.create(username='mickey')
    txn.commit()  # Changes are saved and a new transaction begins.
    User.create(username='huey')

    # Roll back. "huey" will not be saved, but since "mickey" was already
    # committed, that row will remain in the database.
    txn.rollback()

with db.transaction() as txn:
    User.create(username='whiskers')
    # Roll back changes, which removes "whiskers".
    txn.rollback()

    # Create a new row for "mr. whiskers" which will be implicitly committed
    # at the end of the `with` block.
    User.create(username='mr. whiskers')

备注

如果您尝试使用 transaction() 上下文管理器,则仅使用最外层的事务。如果嵌套块中发生异常,事务将不会回滚--只有冒泡到最外层事务的异常才会触发回滚。

由于这可能会导致不可预测的行为,因此建议您使用 atomic()

显式保存点

正如可以显式创建事务一样,也可以使用 savepoint() 方法。保存点必须出现在事务中,但可以任意深度嵌套。

with db.transaction() as txn:
    with db.savepoint() as sp:
        User.create(username='mickey')

    with db.savepoint() as sp2:
        User.create(username='zaizee')
        sp2.rollback()  # "zaizee" will not be saved, but "mickey" will be.

警告

如果手动提交或回滚保存点,则为新的保存点 will not 自动创建。这与 transaction ,这将在手动提交/回滚后自动打开新事务。

自动提交模式

默认情况下,Peewee在 自动提交模式, 这样,在事务外部执行的任何语句都在自己的事务中运行。要将多个语句分组为一个事务,peewee提供 atomic() 上下文管理器/装饰器。这应该涵盖所有用例,但在不太可能的情况下,您希望暂时完全禁用Peewee的事务管理,您可以使用 Database.manual_commit() 上下文管理器/装饰器。

以下是您如何模拟 transaction() 上下文管理器:

with db.manual_commit():
    db.begin()  # Have to begin transaction explicitly.
    try:
        user.delete_instance(recursive=True)
    except:
        db.rollback()  # Rollback! An error occurred.
        raise
    else:
        try:
            db.commit()  # Commit changes.
        except:
            db.rollback()
            raise

再说一遍——我没料到会有人需要这个,但它在这里以防万一。

数据库错误

python db-api 2.0规范描述了 several types of exceptions . 因为大多数数据库驱动程序都有它们自己的异常实现,Peewee通过在任何特定于实现的异常类周围提供自己的包装器来简化这些事情。这样,您就不必担心导入任何特殊的异常类,只需使用peewee中的类:

  • DatabaseError

  • DataError

  • IntegrityError

  • InterfaceError

  • InternalError

  • NotSupportedError

  • OperationalError

  • ProgrammingError

备注

所有这些错误类都扩展 PeeweeException .

日志查询

所有查询都记录到 peewee 使用标准库的命名空间 logging 模块。查询记录使用 DEBUG 水平。如果您对查询感兴趣,可以简单地注册一个处理程序。

# Print all queries to stderr.
import logging
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

添加新的数据库驱动程序

Peewee内置了对Postgres、MySQL和SQLite的支持。这些数据库非常流行,可以从快速、可嵌入的数据库到适合大规模部署的重量级服务器。也就是说,如果驱动程序支持 DB-API 2.0 spec .

警告

Peewee要求将数据库连接置于自动提交模式。

如果您已经使用了标准库sqlite3驱动程序、psycopg2等,那么DB-API 2.0规范应该对您很熟悉。Peewee目前依赖于以下几个部分:

  • Connection.commit

  • Connection.execute

  • Connection.rollback

  • Cursor.description

  • Cursor.fetchone

这些方法通常包含在更高级别的抽象中,并由 Database 所以,即使你的司机没有做到这些,你仍然可以得到很多里程的peewee。一个例子是 apsw sqlite driver 在“Playhouse”模块中。

第一件事是提供 Database 这将打开一个连接,并确保该连接处于自动提交模式(从而禁用所有DB-API事务语义):

from peewee import Database
import foodb  # Our fictional DB-API 2.0 driver.


class FooDatabase(Database):
    def _connect(self, database):
        return foodb.connect(self.database, autocommit=True, **self.connect_params)

这个 Database 提供更高级别的API,负责执行查询、创建表和索引,以及自省数据库以获取表列表。上面的实现是所需的绝对最小值,尽管有些功能不起作用——为了获得最佳结果,您需要另外添加一个方法,用于从数据库中提取表的表和索引列表。我们会假装的 FooDB 与MySQL非常相似,并且有特殊的“show”语句:

class FooDatabase(Database):
    def _connect(self):
        return foodb.connect(self.database, autocommit=True, **self.connect_params)

    def get_tables(self):
        res = self.execute('SHOW TABLES;')
        return [r[0] for r in res.fetchall()]

这里不介绍的数据库处理的其他内容包括:

  • last_insert_id() and rows_affected()

  • paramquote 它告诉SQL生成代码如何添加参数占位符和引用实体名称。

  • field_types 用于将int或text等数据类型映射到其供应商特定的类型名称。

  • operations 用于将“like/ilike”等操作映射到其等效数据库

参考 Database API参考或 source code . 详情。

备注

如果您的驱动程序符合DB-API 2.0规范,那么就不需要太多的工作来启动和运行。

我们的新数据库可以像其他任何数据库子类那样使用:

from peewee import *
from foodb_ext import FooDatabase

db = FooDatabase('my_database', user='foo', password='secret')

class BaseModel(Model):
    class Meta:
        database = db

class Blog(BaseModel):
    title = CharField()
    contents = TextField()
    pub_date = DateTimeField()