芹菜的后台任务

如果您的应用程序有一个长时间运行的任务,例如处理一些上载的数据或发送电子邮件,您不希望在请求期间等待它完成。相反,使用任务队列将必要的数据发送到另一个进程,该进程将在请求立即返回时在后台运行任务。

Celery 是一个强大的任务队列,可用于简单的后台任务以及复杂的多阶段程序和时间表。本指南将向您展示如何使用烧瓶来配置芹菜。读西芹的 First Steps with Celery 指南学习如何使用芹菜本身。

烧瓶储存库包含 an example 基于此页面上的信息,该页面还显示了如何使用JavaScript提交任务以及轮询进度和结果。

安装

从PYPI安装芹菜,例如使用pip:

$ pip install celery

将芹菜与瓶子融为一体

您可以使用芹菜而无需与Flask任何集成,但通过FlaskConfiger配置它并让任务访问FlaskTM应用程序会很方便。

芹菜使用了类似于烧瓶的想法,有一个 Celery 具有配置和注册任务的App对象。在创建Flaskapp的同时,还可以使用以下代码来创建和配置芹菜app。

from celery import Celery, Task

def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app

这将创建并返回一个 Celery 应用程序对象。芹菜 configuration 取自 CELERY 输入烧瓶配置。芹菜应用程序被设置为默认应用程序,以便在每次请求时都能看到它。这个 Task SubClass在Flaskapp上下文处于活动状态的情况下自动运行任务功能,以便像您的数据库连接这样的服务可用。

这是一个基本的 example.py 这就将西芹配置为使用Redis进行通信。我们启用结果后端,但默认情况下忽略结果。这允许我们只存储我们关心结果的任务的结果。

from flask import Flask

app = Flask(__name__)
app.config.from_mapping(
    CELERY=dict(
        broker_url="redis://localhost",
        result_backend="redis://localhost",
        task_ignore_result=True,
    ),
)
celery_app = celery_init_app(app)

指向 celery worker 命令,它就会找到 celery_app 对象。

$ celery -A example worker --loglevel INFO

您还可以运行 celery beat 命令按计划运行任务。有关定义时间表的更多信息,请参阅芹菜的文档。

$ celery -A example beat --loglevel INFO

应用程序工厂

使用FlaskTM应用程序工厂模式时,请调用 celery_init_app 在工厂内部运行。它设置了 app.extensions["celery"] 到芹菜APP对象,可以用来从工厂返回的FlaskAPP中获取芹菜APP。

def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)
    return app

使用 celery 命令,芹菜需要一个应用程序对象,但这不再是直接可用的。创建 make_celery.py 调用FlaskApp工厂并从返回的Flaskapp中获取芹菜应用程序的文件。

from example import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"]

指向 celery 命令添加到此文件。

$ celery -A make_celery worker --loglevel INFO
$ celery -A make_celery beat --loglevel INFO

定义任务

vbl.使用 @celery_app.task 若要修饰任务函数,需要访问 celery_app 对象,该对象在使用工厂模式时将不可用。这也意味着装饰的任务被绑定到特定的Flaskand芹菜应用程序实例,如果您更改测试的配置,这可能会在测试期间出现问题。

取而代之的是用芹菜 @shared_task 装饰师。这将创建任务对象,可以访问任何“当前应用程序”,这是一个类似于FlASK的蓝图和应用程序上下文的概念。这就是为什么我们打电话给 celery_app.set_default() 上面。

下面是一个将两个数字相加并返回结果的示例任务。

from celery import shared_task

@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
    return a + b

早些时候,我们将Celery配置为默认忽略任务结果。由于我们想知道此任务的返回值,因此我们设置 ignore_result=False 。另一方面,一项不需要结果的任务,如发送电子邮件,不会设置这一点。

调用任务

修饰后的函数成为一个TASK对象,具有在后台调用它的方法。最简单的方法是使用 delay(*args, **kwargs) 方法。有关更多方法,请参阅芹菜的文档。

芹菜工人必须运行才能运行该任务。启动Worker的过程在前面的小节中介绍。

from flask import request

@app.post("/add")
def start_add() -> dict[str, object]:
    a = request.form.get("a", type=int)
    b = request.form.get("b", type=int)
    result = add_together.delay(a, b)
    return {"result_id": result.id}

该路线不会立即获得任务的结果。这将阻碍回应,从而违背目的。相反,我们返回正在运行的任务的结果id,稍后我们可以使用它来获得结果。

正在取得成果

为了获取我们在上面开始的任务的结果,我们将添加另一个路径,该路径采用我们之前返回的结果id。我们返回任务是否已完成(就绪)、是否已成功完成,以及如果已完成,返回值(或错误)是什么。

from celery.result import AsyncResult

@app.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
    result = AsyncResult(id)
    return {
        "ready": result.ready(),
        "successful": result.successful(),
        "value": result.result if result.ready() else None,
    }

现在,您可以使用第一个路由启动任务,然后使用第二个路由轮询结果。这使FlaskRequestWorker不会被阻止等待任务完成。

烧瓶储存库包含 an example 使用JavaScript提交任务并轮询进度和结果。

将数据传递给任务

上面的“添加”任务接受两个整数作为参数。要将参数传递给任务,Celery必须将它们序列化为可以传递给其他进程的格式。因此,不建议传递复杂对象。例如,传递一个SQLAlChemy模型对象是不可能的,因为该对象可能是不可序列化的,并且被绑定到查询它的会话。

传递获取或重新创建任务中的任何复杂数据所需的最少量数据。考虑一项任务,该任务将在登录用户请求其数据的存档时运行。FlASK请求知道登录的用户,并从数据库中查询User对象。它通过在数据库中查询给定的id来获得该ID,因此该任务可以执行相同的操作。传递用户的id,而不是用户对象。

@shared_task
def generate_user_archive(user_id: str) -> None:
    user = db.session.get(User, user_id)
    ...

generate_user_archive.delay(current_user.id)