Release: 1.4.25 | Release Date: September 22, 2021

SQLAlchemy 1.4 Documentation

连接/引擎

如何配置日志记录?

配置日志记录 .

如何池数据库连接?我的关系合并了吗?

在大多数情况下,SQLAlchemy自动执行应用程序级连接池。除了sqlite,a Engine 对象引用了 QueuePool 作为连接的来源。

有关详细信息,请参阅 引擎配置连接池 .

如何将自定义连接参数传递到数据库API?

这个 create_engine() 调用接受其他参数,或者直接通过 connect_args 关键字参数:

e = create_engine("mysql://scott:tiger@localhost/test",
                    connect_args={"encoding": "utf8"})

或者对于基本的字符串和整数参数,通常可以在URL的查询字符串中指定它们:

e = create_engine("mysql://scott:tiger@localhost/test?encoding=utf8")

“MySQL服务器已经消失”

此错误的主要原因是MySQL连接已超时,并且已被服务器关闭。mysql服务器关闭空闲一段时间的连接,默认为8小时。为了适应这种情况,立即设置为启用 create_engine.pool_recycle 设置,这将确保早于设置的秒数的连接将被丢弃,并在下次签出时替换为新连接。

对于适应数据库重新启动和其他由于网络问题导致的临时连接丢失的更一般的情况,池中的连接可以根据更普遍的断开检测技术进行回收。断面 处理断开连接 提供“悲观”(如预ping)和“乐观”(如优雅恢复)技术的背景。现代的SQL炼金术倾向于“悲观”的方法。

“命令不同步;现在不能运行此命令”/“此结果对象不返回行。它已自动关闭”

MySQL驱动程序有相当广泛的故障模式,其中到服务器的连接状态处于无效状态。通常,当再次使用连接时,会出现这两条错误消息之一。原因是服务器的状态已更改为客户机库不期望的状态,这样当客户机库在连接上发出新语句时,服务器不会按预期响应。

在sqlAlchemy中,由于数据库连接被合并,连接上的消息传递不同步的问题变得更加重要,因为当一个操作失败时,如果连接本身处于不可用状态,如果它返回到连接池,当再次签出时,它将出现故障。这个问题的缓解措施是 失效的 当出现这样的故障模式时,将放弃与MySQL的底层数据库连接。对于许多已知的故障模式,这种失效会自动发生,也可以通过 Connection.invalidate() 方法。

在这个类别中还有第二类失败模式,其中上下文管理器如 with session.begin_nested(): 希望在发生错误时“回滚”事务;但是在连接的某些失败模式中,回滚本身(也可以是释放保存点操作)也会失败,从而导致堆栈跟踪误导。

最初,这个错误的原因相当简单,它意味着多线程程序正在从多个线程对单个连接调用命令。这适用于原始的“mysqldb”本机C驱动程序,它几乎是唯一使用的驱动程序。但是,随着pymysql和mysql connector python等纯python驱动程序的引入,以及gevent/eventlet、多处理(通常使用celery)等工具的使用量的增加,导致这个问题的因素有一系列,其中一些已经在sqlachemy版本中得到了改进,但是其他的一些已经在sqlachemy版本中得到了改进。不可避免的风险:

  • 在线程之间共享连接 -这就是发生这些错误的原始原因。一个程序同时在两个或多个线程中使用相同的连接,这意味着连接上的多组消息混合在一起,使服务器端会话处于客户机不再知道如何解释的状态。然而,其他原因在今天通常更为可能。

  • 共享进程间连接的文件句柄 -这通常发生在程序使用 os.fork() 为了产生一个新进程,父进程中存在的TCP连接将被共享到一个或多个子进程中。由于多个进程现在向基本相同的filehandle发送消息,服务器接收交错消息并中断连接状态。

    如果程序使用python的“多处理”模块并使用 Engine 在父进程中创建的。在使用芹菜之类的工具时,“多处理”很常见。正确的方法应该是 Engine 在子进程首次启动时生成,丢弃任何 Engine 来自父进程的;或 Engine 从父进程继承的可以通过调用释放其内部连接池 Engine.dispose() .

  • Greenlet Monkeypatching w/ Exits -当使用像gevent或eventlet这样的库(monkeypatches the python networking api)时,pymysql这样的库现在正以异步操作模式工作,即使它们不是针对该模型显式开发的。一个常见的问题是一个greenthread被中断,通常是由于应用程序中的超时逻辑。这导致 GreenletExit 引发异常,纯python mysql驱动程序的工作被中断,这可能是因为它正在从服务器接收响应,或者正在准备以其他方式重置连接状态。当异常将所有工作时间缩短时,客户机和服务器之间的对话现在不同步,后续的连接使用可能会失败。从1.1.0版开始的SQLAlchemy知道如何防范这种情况,就像数据库操作被一个所谓的“退出异常”中断一样,其中包括 GreenletExit 以及python的任何其他子类 BaseException 这也不是 Exception ,连接无效。

  • Rollbacks / SAVEPOINT releases failing -某些类型的错误会导致连接在事务上下文中以及在“保存点”块中操作时不可用。在这些情况下,连接失败导致任何保存点不再存在,但是当SQLAlchemy或应用程序尝试“回滚”此保存点时,“释放保存点”操作失败,通常会显示一条消息,如“保存点不存在”。在这种情况下,在python 3下将有一系列异常输出,其中也将显示错误的最终“原因”。在python 2下,没有“链接”异常,但是最近版本的sqlAlchemy将尝试发出一个警告,说明原始失败原因,同时仍然抛出即时错误,即回滚失败。

如何自动“重试”语句执行?

文件科 处理断开连接 讨论自上次签出特定连接后已断开连接的池连接的可用策略。这方面最现代的特点是 create_engine.pre_ping 参数,该参数允许在从池中检索数据库连接时对其发出“ping”,并在当前连接已断开时重新连接。

需要注意的是,这个“ping”只是发出的 之前 连接实际上用于操作。一旦连接被传递到调用方,根据Python DBAPI 它现在受 汽车域 操作,这意味着它将在第一次使用时自动开始一个新事务,该事务对后续语句仍然有效,直到DBAPI级别 connection.commit()connection.rollback() 方法被调用。

如上所述 库级(例如仿真)自动提交 ,有一个库级别的“自动提交”功能在1.4中已被弃用,这导致 DMLDDL 在执行单个语句后自动提交的执行;但是,在这种不推荐的情况下,SQLAlchemy的现代用法在所有情况下都可以处理此事务,除非明确要求提交,否则不会提交任何数据。

在ORM级别,类似于ORM Session 对象还表示存在旧的“自动提交”操作;但是,即使使用此旧操作模式 Session 仍然在内部使用事务,特别是在 Session.flush() 过程。

在这个例子中,当这个陈述失去了联系时, 整个事务丢失 . 由于数据已经丢失,因此没有任何有用的方法可以让数据库“重新连接并重试”并在停止的地方继续。出于这个原因,SQLAlchemy没有一个透明的“重新连接”特性,它可以在事务处理过程中工作,以防数据库连接在使用时断开连接。处理操作中断的标准方法是 从事务开始处重试整个操作 ,通常使用Python“retry”装饰器,或者以其他方式构建应用程序,使其能够抵御被丢弃的事务。

还有一种扩展的概念,它可以跟踪事务中执行的所有语句,然后在新事务中重放它们,以便近似于“重试”操作。SQL炼金术 event system 允许这样一个系统被构建,但是这种方法也不是一般有用的,因为没有办法保证这些 DML 语句将针对相同的状态工作,因为一旦事务结束,新事务中数据库的状态可能会完全不同。在事务操作开始和提交时,将“重试”显式地架构到应用程序中仍然是更好的方法,因为应用程序级事务方法最了解如何重新运行它们的步骤。

否则,如果SQLAlchemy要提供一个透明且无提示地在事务中“重新连接”一个连接的特性,那么结果就是数据会悄无声息地丢失。通过试图隐藏问题,SQLAlchemy会使情况变得更糟。

但是,如果我们是 not 如果使用事务,则有更多可用的选项,如下一节所述。

使用DBAPI Autocommit允许透明重新连接的只读版本

由于没有说明透明的重新连接机制的理由,上一节基于这样一个假设:应用程序实际上正在使用DBAPI级别的事务。正如大多数dbapi现在提供的那样 native "autocommit" settings ,我们可以利用这些功能为 只读、仅自动提交操作 . 可将透明语句重试应用于 cursor.execute() 方法,但它仍然不安全地应用于 cursor.executemany() 方法,因为该语句可能已使用给定参数的任何部分。

警告

以下配方应该 not 用于写入数据的操作。在正式使用这个配方之前,用户应该仔细阅读并理解配方是如何工作的,并针对特定目标的DBAPI驱动程序非常仔细地测试故障模式。重试机制并不能保证在所有情况下都能防止断开连接错误。

简单的重试机制可以应用到DBAPI级别 cursor.execute() 方法利用 DialectEvents.do_execute()DialectEvents.do_execute_no_params() 钩子,它将能够在语句执行期间拦截断开连接。会的 not 对于那些没有完全缓冲结果集的dbapi,在结果集获取操作期间拦截连接失败。配方要求数据库支持DBAPI级别的autocommit,并且 不保证 对于特定的后端。单一功能 reconnecting_engine() 将事件钩子应用于给定的 Engine 对象,返回启用DBAPI级自动提交的always autocommit版本。对于单参数和无参数语句执行,连接将透明地重新连接:

import time

from sqlalchemy import event


def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(
                    raw_dbapi_err, connection, cursor
                ):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()

                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()

                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True

    e = engine.execution_options(isolation_level="AUTOCOMMIT")

    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )

    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )

    return e

根据上面的方法,可以使用下面的概念验证脚本演示重新连接中间事务。一旦运行,它将发出 SELECT 1 每隔5秒向数据库发送语句::

from sqlalchemy import create_engine
from sqlalchemy import select

if __name__ == "__main__":

    engine = create_engine("mysql://scott:tiger@localhost/test", echo_pool=True)

    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)

    e = reconnecting_engine(
        create_engine(
            "mysql://scott:tiger@localhost/test", echo_pool=True
        ),
        num_retries=5,
        retry_interval=2,
    )

    do_a_thing(e)

在脚本运行时重新启动数据库以演示透明的重新连接操作:

$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

上面的配方在SQLAlchemy 1.4中进行了测试。

为什么SQLAlchemy会发布这么多回滚?

SQLAlChemy当前假设DBAPI连接处于“非自动提交”模式-这是Python数据库API的默认行为,这意味着必须假设事务始终在进行中。连接池问题 connection.rollback() 当返回连接时。这是为了释放连接上剩余的任何事务资源。在PostgreSQL或MSSQL等表资源被积极锁定的数据库上,这一点至关重要,这样行和表才不会在不再使用的连接中保持锁定。否则,应用程序可能会挂起。然而,它不仅适用于锁,而且对于具有任何类型的事务隔离的任何数据库(包括带有InnoDB的MySQL)都同样重要。任何仍在旧事务内的连接都将返回陈旧数据,前提是该数据已在隔离内的该连接上进行了查询。有关即使在mysql上也可能看到陈旧数据的背景信息,请参阅https://dev.mysql.com/doc/refman/5.1/en/innodb-transaction-model.html

我在Myisam上-我怎么关掉它?

连接池的连接返回行为可以使用配置 reset_on_return ::

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine('mysql://scott:tiger@localhost/myisam_database', pool=QueuePool(reset_on_return=False))

我在SQL Server上-如何将这些回滚转换为提交?

reset_on_return 接受值 commitrollback 除了 TrueFalseNone . 设置为 commit 将导致提交,因为任何连接都将返回池::

engine = create_engine('mssql://scott:tiger@mydsn', pool=QueuePool(reset_on_return='commit'))

我正在使用与一个sqlite数据库的多个连接(通常用于测试事务操作),而我的测试程序不工作!

如果使用sqlite :memory: 数据库或0.7版之前的SQLAlchemy版本,默认连接池是 SingletonThreadPool ,每个线程只维护一个SQLite连接。所以在同一线程中使用的两个连接实际上是同一个sqlite连接。确保您没有使用 :memory: database and use NullPool 这是当前SQLAlchemy版本中非内存数据库的默认值。

参见

线程/池行为 -关于pysqlite行为的信息。

使用引擎时,如何获得原始DBAPI连接?

使用常规的SA引擎级连接,您可以通过以下方式获得DBAPI连接的池代理版本 Connection.connection 属性打开 Connection 对于真正的DBAPI连接,您可以调用 _ConnectionFairy.dbapi_connection 属性。在常规同步驱动程序上,通常不需要访问非池代理的DBAPI连接,因为所有方法都通过::

engine = create_engine(...)
conn = engine.connect()

# pep-249 style ConnectionFairy connection pool proxy object
connection_fairy = conn.connection

# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj

# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection

# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection

在 1.4.24 版更改: 添加了 _ConnectionFairy.dbapi_connection 属性,该属性将取代以前的 _ConnectionFairy.connection 属性,该属性仍然可用;该属性始终提供PEP-249同步样式连接对象。这个 _ConnectionFairy.driver_connection 属性,该属性将始终引用真正的驱动程序级连接,而不管它提供什么API。

访问异步CIO驱动程序的基础连接

当使用异步驱动程序时,上述方案有两个更改。第一个是在使用 AsyncConnection ,即 _ConnectionFairy 必须使用可等待的方法访问 AsyncConnection.get_raw_connection() 。返回者 _ConnectionFairy 在这种情况下,保留同步样式的PEP-249使用模式,并且 _ConnectionFairy.dbapi_connection 属性引用SQLAlChemy适配的Connection对象,该对象将异步连接适配为同步样式的PEP-249API,换言之,有 two 使用异步驱动程序时正在进行的代理级别。实际的异步连接可从 driver_connection 属性。在异步方面重申前面的示例,如下所示:

async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()

    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()

    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection

    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection

    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

在 1.4.24 版更改: 添加了 _ConnectionFairy.dbapi_connection_ConnectionFairy.driver_connection 属性,以允许使用一致的接口访问PEP-249连接、PEP-249适配层和底层驱动程序连接。

当使用异步驱动程序时,上面的“DBAPI”连接实际上是一种SQLAlChemy适配的连接形式,它呈现同步样式的PEP-249样式的API。若要访问实际的异步驱动程序连接,该连接将显示正在使用的驱动程序的原始异步cio API,可以通过 _ConnectionFairy.driver_connection 的属性 _ConnectionFairy 。对于标准PEP-249驱动器, _ConnectionFairy.dbapi_connection_ConnectionFairy.driver_connection 是同义词。

在将连接恢复到池之前,必须确保将连接上的任何隔离级别设置或其他操作特定设置恢复为正常设置。

作为恢复设置的替代方法,您可以调用 Connection.detach() 方法 Connection 或代理连接,它将取消与池中的连接的关联,以便在 Connection.close() 被称为:

conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection

如何将引擎/连接/会话与python multiprocessing或os.fork()一起使用?

这在本节中有介绍 将连接池用于多处理或os.fork操作系统() .

Previous: 常见问题 Next: 元数据/架构