芹菜的后台任务¶
如果您的应用程序有一个长时间运行的任务,例如处理一些上载的数据或发送电子邮件,您不希望在请求期间等待它完成。相反,使用任务队列将必要的数据发送到另一个进程,该进程将在请求立即返回时在后台运行任务。
Celery 是一个强大的任务队列,可用于简单的后台任务以及复杂的多阶段程序和时间表。本指南将向您展示如何使用烧瓶来配置芹菜。读西芹的 First Steps with Celery 指南学习如何使用芹菜本身。
烧瓶储存库包含 an example 基于此页面上的信息,该页面还显示了如何使用JavaScript提交任务以及轮询进度和结果。
安装¶
从PYPI安装芹菜,例如使用pip:
$ pip install celery
将芹菜与瓶子融为一体¶
您可以使用芹菜而无需与Flask任何集成,但通过FlaskConfiger配置它并让任务访问FlaskTM应用程序会很方便。
芹菜使用了类似于烧瓶的想法,有一个 Celery
具有配置和注册任务的App对象。在创建Flaskapp的同时,还可以使用以下代码来创建和配置芹菜app。
from celery import Celery, Task
def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
这将创建并返回一个 Celery
应用程序对象。芹菜 configuration 取自 CELERY
输入烧瓶配置。芹菜应用程序被设置为默认应用程序,以便在每次请求时都能看到它。这个 Task
SubClass在Flaskapp上下文处于活动状态的情况下自动运行任务功能,以便像您的数据库连接这样的服务可用。
这是一个基本的 example.py
这就将西芹配置为使用Redis进行通信。我们启用结果后端,但默认情况下忽略结果。这允许我们只存储我们关心结果的任务的结果。
from flask import Flask
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url="redis://localhost",
result_backend="redis://localhost",
task_ignore_result=True,
),
)
celery_app = celery_init_app(app)
指向 celery worker
命令,它就会找到 celery_app
对象。
$ celery -A example worker --loglevel INFO
您还可以运行 celery beat
命令按计划运行任务。有关定义时间表的更多信息,请参阅芹菜的文档。
$ celery -A example beat --loglevel INFO
应用程序工厂¶
使用FlaskTM应用程序工厂模式时,请调用 celery_init_app
在工厂内部运行。它设置了 app.extensions["celery"]
到芹菜APP对象,可以用来从工厂返回的FlaskAPP中获取芹菜APP。
def create_app() -> Flask:
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url="redis://localhost",
result_backend="redis://localhost",
task_ignore_result=True,
),
)
app.config.from_prefixed_env()
celery_init_app(app)
return app
使用 celery
命令,芹菜需要一个应用程序对象,但这不再是直接可用的。创建 make_celery.py
调用FlaskApp工厂并从返回的Flaskapp中获取芹菜应用程序的文件。
from example import create_app
flask_app = create_app()
celery_app = flask_app.extensions["celery"]
指向 celery
命令添加到此文件。
$ celery -A make_celery worker --loglevel INFO
$ celery -A make_celery beat --loglevel INFO
定义任务¶
vbl.使用 @celery_app.task
若要修饰任务函数,需要访问 celery_app
对象,该对象在使用工厂模式时将不可用。这也意味着装饰的任务被绑定到特定的Flaskand芹菜应用程序实例,如果您更改测试的配置,这可能会在测试期间出现问题。
取而代之的是用芹菜 @shared_task
装饰师。这将创建任务对象,可以访问任何“当前应用程序”,这是一个类似于FlASK的蓝图和应用程序上下文的概念。这就是为什么我们打电话给 celery_app.set_default()
上面。
下面是一个将两个数字相加并返回结果的示例任务。
from celery import shared_task
@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
return a + b
早些时候,我们将Celery配置为默认忽略任务结果。由于我们想知道此任务的返回值,因此我们设置 ignore_result=False
。另一方面,一项不需要结果的任务,如发送电子邮件,不会设置这一点。
调用任务¶
修饰后的函数成为一个TASK对象,具有在后台调用它的方法。最简单的方法是使用 delay(*args, **kwargs)
方法。有关更多方法,请参阅芹菜的文档。
芹菜工人必须运行才能运行该任务。启动Worker的过程在前面的小节中介绍。
from flask import request
@app.post("/add")
def start_add() -> dict[str, object]:
a = request.form.get("a", type=int)
b = request.form.get("b", type=int)
result = add_together.delay(a, b)
return {"result_id": result.id}
该路线不会立即获得任务的结果。这将阻碍回应,从而违背目的。相反,我们返回正在运行的任务的结果id,稍后我们可以使用它来获得结果。
正在取得成果¶
为了获取我们在上面开始的任务的结果,我们将添加另一个路径,该路径采用我们之前返回的结果id。我们返回任务是否已完成(就绪)、是否已成功完成,以及如果已完成,返回值(或错误)是什么。
from celery.result import AsyncResult
@app.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
result = AsyncResult(id)
return {
"ready": result.ready(),
"successful": result.successful(),
"value": result.result if result.ready() else None,
}
现在,您可以使用第一个路由启动任务,然后使用第二个路由轮询结果。这使FlaskRequestWorker不会被阻止等待任务完成。
烧瓶储存库包含 an example 使用JavaScript提交任务并轮询进度和结果。
将数据传递给任务¶
上面的“添加”任务接受两个整数作为参数。要将参数传递给任务,Celery必须将它们序列化为可以传递给其他进程的格式。因此,不建议传递复杂对象。例如,传递一个SQLAlChemy模型对象是不可能的,因为该对象可能是不可序列化的,并且被绑定到查询它的会话。
传递获取或重新创建任务中的任何复杂数据所需的最少量数据。考虑一项任务,该任务将在登录用户请求其数据的存档时运行。FlASK请求知道登录的用户,并从数据库中查询User对象。它通过在数据库中查询给定的id来获得该ID,因此该任务可以执行相同的操作。传递用户的id,而不是用户对象。
@shared_task
def generate_user_archive(user_id: str) -> None:
user = db.session.get(User, user_id)
...
generate_user_archive.delay(current_user.id)