scheduler 模块

此文件是Web2py Web框架的一部分
版权所有:Massimo di Pierro<mdipierro@cs.depaul.edu>

简化了后台进程

class gluon.scheduler.CronParser(cronline, base=None)[源代码]

基类:object

next()[源代码]

根据规格获取下一个日期。

class gluon.scheduler.IS_CRONLINE(error_message=None)[源代码]

基类:object

验证cronline

class gluon.scheduler.JobGraph(db, job_name)[源代码]

基类:object

实验性:任务的依赖性。

add_deps(task_parent, task_child)[源代码]

在任务父级和任务子级之间创建依赖项。

validate(job_name=None)[源代码]

验证是否可以完成所有任务作业名称。

检查任务之间是否没有相互依赖关系。如果成功,则在结束时提交,否则将回滚整个事务。小心轻放!

class gluon.scheduler.Scheduler(db, tasks=None, migrate=True, worker_name=None, group_names=None, heartbeat=3, max_empty_runs=0, discard_results=False, utc_time=False, use_spawn=False)[源代码]

基类:threading.Thread

调度程序对象

参数
  • db -- 调度程序将创建其表的DAL连接

  • tasks (dict) -- 包含name->func或none的dict。如果没有,将在环境中搜索函数

  • migrate (bool) -- 打开/关闭计划程序表的迁移

  • worker_name (str) -- 强制worker_name标识每个进程。将其保留为“无”以自动分配名称(主机名PID)

  • group_names (list) -- 处理属于此组的任务默认为 [“主要”] 如果什么都没有通过

  • heartbeat (int) -- 工人在一次执行和下一次执行之间休眠多少秒。间接设置新任务检查之间经过的秒数

  • max_empty_runs (int) -- 在退出进程之前,允许多少个循环在不处理任何任务的情况下通过。0以使进程始终处于活动状态

  • discard_results (bool) -- 调度程序将执行的详细信息存储到调度程序运行表中。默认情况下,只有在有结果时,才会保留详细信息。如果将此设置为真,则意味着即使对于返回某些内容的任务,也会丢弃结果。

  • utc_time (bool) -- 所有的日期时间计算都假定UTC是时区。记得传球 start_timestop_time 相应的任务

  • use_spawn (bool) -- 对子进程使用spawn(仅适用于python3)

adj_hibernation()[源代码]

用于增加残疾工人的“睡眠”间隔。

assign_tasks()[源代码]

将任务分配给工人,然后将他们从队列中弹出。

处理组名逻辑,以便将线性任务分配给这些组的可用工作人员

being_a_ticker()[源代码]

选择一个为可用工人分配任务的断续器流程。

尽可能选择一个不忙于处理其他任务的工作人员,以便尽快在所有活动工作人员中正确分配任务

define_tables(db, migrate)[源代码]

定义调度程序表结构。

die()[源代码]

强制终止工作进程以及任何正在运行的任务

disable(group_names=None, limit=None, worker_name=None)[源代码]

在工作进程上设置为禁用 group_names 任务。

一个残废的工人将保持生命,但它将无法处理任何等待的任务,基本上是把它置于睡眠状态。默认情况下,将选择计划程序实例化的所有组名

execute(task)[源代码]

启动后台进程。

参数

task -- 一 Task 对象

返回

TaskReport 对象

get_workers(only_ticker=False)[源代码]

返回听写保持 worker_name : {{**columns}} 仅代表所有“已注册”工人,如果有,Ticker只返回作为Ticker运行的工人

give_up()[源代码]

等待执行任何正在运行的任务,然后退出工作进程

kill(group_names=None, limit=None, worker_name=None)[源代码]

将kill设置为worker状态。即使工人正在处理一项任务,也会被杀。

loop(worker_name=None)[源代码]

主回路。

这基本上是一个不断循环:

  • 检查工作人员是否准备好处理任务(未禁用)

  • 从队列中弹出任务

  • 如果有任务:

    • 生成执行器后台进程

    • 等待进程完成

    • 睡觉 heartbeat

  • 如果没有任务:

    • 检查最大空运行次数

    • 睡觉 heartbeat

now()[源代码]

基于UTC首选项获取当前时间的快捷方式。

pop_task()[源代码]

从队列中获取准备执行的任务。

queue_task(function, pargs=[], pvars={}, **kwargs)[源代码]

队列任务。这将处理所有参数的验证。

参数
  • function -- 函数(任何可调用的 __name__)

  • pargs -- 要传递给函数的“raw”参数。自动JSonified。

  • pvars -- 要传递给函数的“原始”Kwarg。自动JSONIFIED

  • kwargs -- 所有可用参数(基本上,每个 scheduler_task 专栏)。如果args和var在这里,那么它们应该已经被jsonified了,并且它们将覆盖parg和pvar。

返回

一个dict,就像一个普通的validate_and_insert(),加上一个保存排队任务的uuid的uuid键。如果未通过验证(即某些参数无效),则ID和UUID都将为无,并且您将得到保存所发现错误的“错误”dict。

report_task(task, task_report)[源代码]

根据偏好保存结果。

处理重复任务的逻辑。

resume(group_names=None, limit=None, worker_name=None)[源代码]

唤醒工作进程(它将能够处理排队的任务)

run()[源代码]

这是由心跳线程执行的

send_heartbeat(counter)[源代码]

现有工人之间的协调。

它:-发送心跳-在可用的工人中选择一个断续器(唯一的进程

有效地将任务分派给工人)

  • 处理工人的状态

  • 为死去的工人“打扫房间”吗?

  • 触发任务分配给工作人员

set_requirements(scheduler_task)[源代码]

调用以设置懒惰表连接的默认值。

set_worker_status(group_names=None, action='ACTIVE', exclude=None, limit=None, worker_name=None)[源代码]

用于设置工作人员状态的内部函数。

sleep()[源代码]

计算睡眠的秒数。

start_heartbeats()[源代码]
stop_task(ref)[源代码]

任务终止的快捷方式。

如果任务正在运行,它将终止任务,这意味着状态将被设置为失败。

如果任务排队,其停止时间将设置为“现在”,

启用标志将设置为假,状态为“已停止”

参数

ref -- 可以是-整数:查找将由计划程序_task.id完成-字符串:查找将由计划程序_task.uuid完成

返回

-1如果任务已停止(意味着已完成更新)-如果未找到任务,或者任务未运行或排队,则为无

注解

实验

task_status(ref, output=False)[源代码]

检索任务状态和任务的结果(可选)

参数
  • ref -- 可以是-整数:查找将由计划程序执行task.id-a字符串:查找将由计划程序执行task.uuid-a Query :根据需要查找,例如:db.scheduler_task.task_name='test1'

  • output (bool) -- 如果 True ,同时获取调度程序运行记录

返回

一个单行对象,用于最后一个排队的任务。如果output==true,则还返回最后一个调度程序运行记录。调度程序运行记录是由左联接获取的,因此它可以具有所有字段==none

terminate(group_names=None, limit=None, worker_name=None)[源代码]

将Terminate设置为Worker状态。工作进程将等待当前正在运行的任何任务执行,然后将优雅地退出。

terminate_process(flush_out=True, flush_ret=True)[源代码]

终止任何正在运行的任务(仅限内部使用)

update_dependencies(task_id)[源代码]

取消阻止作业的执行路径。

wrapped_assign_tasks()[源代码]

要调用的商品函数 assign_tasks 和陷阱例外。

如果引发异常,则假定发生异常是因为数据库争用和重试 assign_task 0.5秒后

wrapped_pop_task()[源代码]

要调用的商品函数 pop_task 和陷阱例外。

如果引发异常,则假定发生异常是因为数据库争用和重试 pop_task 0.5秒后

wrapped_report_task(task, task_report)[源代码]

要调用的商品函数 report_task 和陷阱例外。

如果引发异常,则假定发生异常是因为数据库争用和重试 pop_task 0.5秒后

class gluon.scheduler.TYPE(myclass=<class 'list'>, parse=False)[源代码]

基类:object

验证程序,用于检查字段是否为有效的JSON并验证其类型。用于 argsvars 调度程序任务表的

class gluon.scheduler.Task(app, function, timeout, args='[]', vars='{}', **kwargs)[源代码]

基类:object

定义从主线程传递到执行器的“任务”对象

class gluon.scheduler.TaskReport(status, result=None, output=None, tb=None)[源代码]

基类:object

定义从执行者线程传递到主线程的“任务报告”对象

gluon.scheduler.executor(retq, task, outq)[源代码]

用于在后台进程中执行任务的函数。

gluon.scheduler.main()[源代码]

允许在没有python web2py.py的情况下运行worker….简单地说:

python gluon/scheduler.py