Release: 1.4.25 | Release Date: September 22, 2021

SQLAlchemy 1.4 Documentation

性能

如何分析一个由SQLAlchemy支持的应用程序?

寻找性能问题通常涉及两种策略。一个是查询分析,另一个是代码分析。

查询分析

有时只是简单的SQL日志记录(通过python的日志记录模块或通过 echo=True 争论 create_engine() )能告诉你事情需要多长时间。例如,如果您在一个SQL操作之后记录了一些东西,您会在日志中看到如下内容:

17:37:48,325 INFO  [sqlalchemy.engine.base.Engine.0x...048c] SELECT ...
17:37:48,326 INFO  [sqlalchemy.engine.base.Engine.0x...048c] {<params>}
17:37:48,660 DEBUG [myapp.somemessage]

如果你登录了 myapp.somemessage 在操作之后,您知道需要334ms才能完成SQL部分的工作。

日志SQL还将说明是否发出了几十个/数百个查询,这可以更好地组织成更少的查询。当使用SQLAlchemy ORM时,将提供“热切加载”功能,以部分地 (contains_eager() 或完全 (joinedload()subqueryload() )自动化此活动,但不使用ORM“热切加载”通常意味着使用联接,以便可以将多个表中的结果加载到一个结果集中,而不是随着添加的深度(即 r + r*r2 + r*r2*r3 ……)

为了对查询进行更长期的分析,或者实现应用程序端的“慢速查询”监视器,可以使用如下的方法,使用事件来截获光标执行:

from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
import logging

logging.basicConfig()
logger = logging.getLogger("myapp.sqltime")
logger.setLevel(logging.DEBUG)

@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement,
                        parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())
    logger.debug("Start Query: %s", statement)

@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement,
                        parameters, context, executemany):
    total = time.time() - conn.info['query_start_time'].pop(-1)
    logger.debug("Query Complete!")
    logger.debug("Total Time: %f", total)

上面,我们使用 ConnectionEvents.before_cursor_execute()ConnectionEvents.after_cursor_execute() 事件在执行语句时建立一个截取点。我们使用 info dictionary;我们在这里使用一个堆栈来处理光标执行事件可能被嵌套的偶然情况。

代码性能测试

如果日志记录显示单个查询花费的时间太长,则需要对数据库中处理查询、通过网络发送结果以及由 DBAPI 最后被SQLAlchemy的结果集和/或ORM层接收。根据具体情况,这些阶段中的每个阶段都可以呈现各自的瓶颈。

为此,您需要使用 Python Profiling Module . 下面是一个简单的方法,它可以将分析工作放到上下文管理器中:

import cProfile
import io
import pstats
import contextlib

@contextlib.contextmanager
def profiled():
    pr = cProfile.Profile()
    pr.enable()
    yield
    pr.disable()
    s = io.StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
    ps.print_stats()
    # uncomment this to see who's calling what
    # ps.print_callers()
    print(s.getvalue())

要分析代码段:

with profiled():
    Session.query(FooClass).filter(FooClass.somevalue==8).all()

分析的输出可以用来给出时间花在什么地方。分析输出的一部分如下所示:

13726 function calls (13042 primitive calls) in 0.014 seconds

Ordered by: cumulative time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
222/21    0.001    0.000    0.011    0.001 lib/sqlalchemy/orm/loading.py:26(instances)
220/20    0.002    0.000    0.010    0.001 lib/sqlalchemy/orm/loading.py:327(_instance)
220/20    0.000    0.000    0.010    0.000 lib/sqlalchemy/orm/loading.py:284(populate_state)
   20    0.000    0.000    0.010    0.000 lib/sqlalchemy/orm/strategies.py:987(load_collection_from_subq)
   20    0.000    0.000    0.009    0.000 lib/sqlalchemy/orm/strategies.py:935(get)
    1    0.000    0.000    0.009    0.009 lib/sqlalchemy/orm/strategies.py:940(_load)
   21    0.000    0.000    0.008    0.000 lib/sqlalchemy/orm/strategies.py:942(<genexpr>)
    2    0.000    0.000    0.004    0.002 lib/sqlalchemy/orm/query.py:2400(__iter__)
    2    0.000    0.000    0.002    0.001 lib/sqlalchemy/orm/query.py:2414(_execute_and_instances)
    2    0.000    0.000    0.002    0.001 lib/sqlalchemy/engine/base.py:659(execute)
    2    0.000    0.000    0.002    0.001 lib/sqlalchemy/sql/elements.py:321(_execute_on_connection)
    2    0.000    0.000    0.002    0.001 lib/sqlalchemy/engine/base.py:788(_execute_clauseelement)

...

上面,我们可以看到 instances() sqlAlchemy函数被调用222次(递归调用,从外部调用21次),所有调用的总时间为0.011秒。

执行缓慢

这些电话的细节可以告诉我们时间花在哪里。例如,如果您看到 cursor.execute() 例如,针对DBAPI:

2    0.102    0.102    0.204    0.102 {method 'execute' of 'sqlite3.Cursor' objects}

这表明数据库开始返回结果需要很长时间,这意味着应该通过添加索引或重新构造查询和/或基础架构来优化查询。对于该任务,可以使用数据库后端提供的解释、显示计划等系统对查询计划进行分析。

结果获取慢 - 核心

另一方面,如果您看到数千个与提取行相关的调用,或者非常长的调用 fetchall() ,这可能意味着您的查询返回的行比预期的多,或者提取行本身的速度很慢。ORM本身通常使用 fetchall() 提取行(或 fetchmany() 如果 Query.yield_per() 使用选项)。

如果调用的速度非常慢,则会显示大量行。 fetchall() 在DBAPI级别:

2    0.300    0.600    0.300    0.600 {method 'fetchall' of 'sqlite3.Cursor' objects}

当多组行组合在一起而不适当地将表连接在一起时,即使最终结果似乎没有多行,也可能是笛卡尔积的结果。如果错误的话,使用sqlacalchemy core或orm查询通常很容易产生这种行为 Column 对象用于复杂的查询中,从意外的子句中拉入其他的。

另一方面,快速呼叫 fetchall() 在DBAPI级别,但当SQLAlchemy的 CursorResult 被要求做一个 fetchall() ,可能表示数据类型处理速度慢,例如Unicode转换和类似:

# the DBAPI cursor is fast...
2    0.020    0.040    0.020    0.040 {method 'fetchall' of 'sqlite3.Cursor' objects}

...

# but SQLAlchemy's result proxy is slow, this is type-level processing
2    0.100    0.200    0.100    0.200 lib/sqlalchemy/engine/result.py:778(fetchall)

在某些情况下,后端可能正在执行不需要的类型级处理。更具体地说,在类型API中看到速度较慢的调用是更好的指示器-下面是我们使用类似类型时的情况:

from sqlalchemy import TypeDecorator
import time

class Foo(TypeDecorator):
    impl = String

    def process_result_value(self, value, thing):
        # intentionally add slowness for illustration purposes
        time.sleep(.001)
        return value

此故意缓慢操作的分析输出如下所示:

200    0.001    0.000    0.237    0.001 lib/sqlalchemy/sql/type_api.py:911(process)
200    0.001    0.000    0.236    0.001 test.py:28(process_result_value)
200    0.235    0.001    0.235    0.001 {time.sleep}

也就是说,我们看到许多昂贵的电话 type_api 系统,而实际耗时的事情是 time.sleep() 打电话。

确保检查 Dialect documentation 有关此级别的已知性能优化建议的说明,尤其是对于Oracle等数据库。有些系统可能与确保数字精度或字符串处理相关,但并非所有情况下都需要。

也可能会有更多的低级点,在这些点上,行提取性能会受到影响;例如,如果花费的时间似乎集中在像这样的调用上 socket.receive() 这可能表明除了实际的网络连接之外,一切都很快,而且在网络上移动数据花费了太多时间。

结果获取慢度-ORM

为了检测ORM获取行的速度慢(这是性能问题最常见的方面),调用如下 populate_state()_instance() 将说明单个ORM对象总体:

# the ORM calls _instance for each ORM-loaded row it sees, and
# populate_state for each ORM-loaded row that results in the population
# of an object's attributes
220/20    0.001    0.000    0.010    0.000 lib/sqlalchemy/orm/loading.py:327(_instance)
220/20    0.000    0.000    0.009    0.000 lib/sqlalchemy/orm/loading.py:284(populate_state)

ORM将行转换为ORM映射对象的速度缓慢,这是此操作复杂性和cpython开销的结果。缓解这一问题的常用策略包括:

  • 获取单个列而不是完整实体,即:

    session.query(User.id, User.name)

    而不是::

    session.query(User)
  • 使用 Bundle 要组织基于列的结果的对象::

    u_b = Bundle('user', User.id, User.name)
    a_b = Bundle('address', Address.id, Address.email)
    
    for user, address in session.query(u_b, a_b).join(User.addresses):
        # ...
  • 使用结果缓存-请参阅 狗堆缓存 这是一个深入的例子。

  • 考虑一下像Pypy这样更快的解释程序。

一个概要文件的输出可能有点令人生畏,但是经过一些实践,它们非常容易阅读。

参见

性能 -一套具有捆绑分析功能的性能演示。

我用ORM插入了400000行,速度非常慢!

sqlAlchemy ORM使用 unit of work 将更改同步到数据库时的模式。这种模式远远超出了简单的“插入”数据。它包括在对象上分配的属性是使用属性检测系统接收的,该系统跟踪对象所做的更改,包括在标识映射中跟踪插入的所有行,该标识映射的作用是,对于每一行,如果尚未给定,则SQLAlchemy必须检索其“最后插入的ID”,并且还调用对要插入的行进行扫描,并根据需要对依赖项进行排序。为了保持所有这些操作的正常运行,对象还需要进行一定程度的簿记,对于非常多的行,一次就可以在大的数据结构上花费大量的时间,因此最好将这些内容分成块。

基本上,工作单元是一个很大程度的自动化,以便自动化将复杂对象图持久化到关系数据库中的任务,而不需要显式持久化代码,这种自动化是有代价的。

ORM基本上不适用于高性能的大容量插入——这就是sqlacalchemy除了作为一流组件提供ORM之外还提供核心的全部原因。

对于快速批量插入的用例,ORM构建在其上的SQL生成和执行系统是 Core . 直接使用这个系统,我们可以生成一个与直接使用原始数据库API相竞争的插入。

注解

使用psycopg2方言时,请考虑使用 batch execution helpers psycopg2的特性,现在由sqlacalchemy psycopg2方言直接支持。

或者,SQLAlchemy ORM提供 散装作业 一套方法,它提供到工作流程单元的子部分的钩子,以便用少量基于ORM的自动化来发出核心级别的插入和更新构造。

下面的示例说明了几种不同的插入行方法的基于时间的测试,从自动化程度最高到自动化程度最低。使用cPython时,观察到的运行时间如下:

Python: 3.8.12 | packaged by conda-forge | (default, Sep 29 2021, 19:42:05)  [Clang 11.1.0 ]
sqlalchemy v1.4.22 (future=True)
SQLA ORM:
        Total time for 100000 records 5.722 secs
SQLA ORM pk given:
        Total time for 100000 records 3.781 secs
SQLA ORM bulk_save_objects:
        Total time for 100000 records 1.385 secs
SQLA ORM bulk_save_objects, return_defaults:
        Total time for 100000 records 3.858 secs
SQLA ORM bulk_insert_mappings:
        Total time for 100000 records 0.472 secs
SQLA ORM bulk_insert_mappings, return_defaults:
        Total time for 100000 records 2.840 secs
SQLA Core:
        Total time for 100000 records 0.246 secs
sqlite3:
        Total time for 100000 records 0.153 secs

使用最新版本的,我们可以将时间减少到原来的1/3 PyPy ::

Python: 3.7.10 | packaged by conda-forge | (77787b8f, Sep 07 2021, 14:06:31) [PyPy 7.3.5 with GCC Clang 11.1.0]
sqlalchemy v1.4.25 (future=True)
SQLA ORM:
        Total time for 100000 records 2.976 secs
SQLA ORM pk given:
        Total time for 100000 records 1.676 secs
SQLA ORM bulk_save_objects:
        Total time for 100000 records 0.658 secs
SQLA ORM bulk_save_objects, return_defaults:
        Total time for 100000 records 1.158 secs
SQLA ORM bulk_insert_mappings:
        Total time for 100000 records 0.403 secs
SQLA ORM bulk_insert_mappings, return_defaults:
        Total time for 100000 records 0.976 secs
SQLA Core:
        Total time for 100000 records 0.241 secs
sqlite3:
        Total time for 100000 records 0.128 secs

脚本::

import contextlib
import sqlite3
import sys
import tempfile
import time

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import __version__, Column, Integer, String, create_engine, insert
from sqlalchemy.orm import Session

Base = declarative_base()


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


@contextlib.contextmanager
def sqlalchemy_session(future):
    with tempfile.NamedTemporaryFile(suffix=".db") as handle:
        dbpath = handle.name
        engine = create_engine(f"sqlite:///{dbpath}", future=future, echo=False)
        session = Session(
            bind=engine, future=future, autoflush=False, expire_on_commit=False
        )
        Base.metadata.create_all(engine)
        yield session
        session.close()


def print_result(name, nrows, seconds):
    print(f"{name}:\n{' '*10}Total time for {nrows} records {seconds:.3f} secs")


def test_sqlalchemy_orm(n=100000, future=True):
    with sqlalchemy_session(future) as session:
        t0 = time.time()
        for i in range(n):
            customer = Customer()
            customer.name = "NAME " + str(i)
            session.add(customer)
            if i % 1000 == 0:
                session.flush()
        session.commit()
        print_result("SQLA ORM", n, time.time() - t0)


def test_sqlalchemy_orm_pk_given(n=100000, future=True):
    with sqlalchemy_session(future) as session:
        t0 = time.time()
        for i in range(n):
            customer = Customer(id=i + 1, name="NAME " + str(i))
            session.add(customer)
            if i % 1000 == 0:
                session.flush()
        session.commit()
        print_result("SQLA ORM pk given", n, time.time() - t0)


def test_sqlalchemy_orm_bulk_save_objects(n=100000, future=True, return_defaults=False):
    with sqlalchemy_session(future) as session:
        t0 = time.time()
        for chunk in range(0, n, 10000):
            session.bulk_save_objects(
                [
                    Customer(name="NAME " + str(i))
                    for i in range(chunk, min(chunk + 10000, n))
                ],
                return_defaults=return_defaults,
            )
        session.commit()
        print_result(
            f"SQLA ORM bulk_save_objects{', return_defaults' if return_defaults else ''}",
            n,
            time.time() - t0,
        )


def test_sqlalchemy_orm_bulk_insert(n=100000, future=True, return_defaults=False):
    with sqlalchemy_session(future) as session:
        t0 = time.time()
        for chunk in range(0, n, 10000):
            session.bulk_insert_mappings(
                Customer,
                [
                    dict(name="NAME " + str(i))
                    for i in range(chunk, min(chunk + 10000, n))
                ],
                return_defaults=return_defaults,
            )
        session.commit()
        print_result(
            f"SQLA ORM bulk_insert_mappings{', return_defaults' if return_defaults else ''}",
            n,
            time.time() - t0,
        )


def test_sqlalchemy_core(n=100000, future=True):
    with sqlalchemy_session(future) as session:
        with session.bind.begin() as conn:
            t0 = time.time()
            conn.execute(
                insert(Customer.__table__),
                [{"name": "NAME " + str(i)} for i in range(n)],
            )
            conn.commit()
            print_result("SQLA Core", n, time.time() - t0)


@contextlib.contextmanager
def sqlite3_conn():
    with tempfile.NamedTemporaryFile(suffix=".db") as handle:
        dbpath = handle.name
        conn = sqlite3.connect(dbpath)
        c = conn.cursor()
        c.execute("DROP TABLE IF EXISTS customer")
        c.execute(
            "CREATE TABLE customer (id INTEGER NOT NULL, "
            "name VARCHAR(255), PRIMARY KEY(id))"
        )
        conn.commit()
        yield conn


def test_sqlite3(n=100000):
    with sqlite3_conn() as conn:
        c = conn.cursor()
        t0 = time.time()
        for i in range(n):
            row = ("NAME " + str(i),)
            c.execute("INSERT INTO customer (name) VALUES (?)", row)
        conn.commit()
        print_result("sqlite3", n, time.time() - t0)


if __name__ == "__main__":
    rows = 100000
    _future = True
    print(f"Python: {' '.join(sys.version.splitlines())}")
    print(f"sqlalchemy v{__version__} (future={_future})")
    test_sqlalchemy_orm(rows, _future)
    test_sqlalchemy_orm_pk_given(rows, _future)
    test_sqlalchemy_orm_bulk_save_objects(rows, _future)
    test_sqlalchemy_orm_bulk_save_objects(rows, _future, True)
    test_sqlalchemy_orm_bulk_insert(rows, _future)
    test_sqlalchemy_orm_bulk_insert(rows, _future, True)
    test_sqlalchemy_core(rows, _future)
    test_sqlite3(rows)
Previous: ORM配置 Next: 会话/查询