构建工作流 (skbio.workflow

构造任意复杂的工作流,其中特定方法的运行在运行时确定。此模块支持在项目失败时使工作流短路,支持排序方法、对已处理项的回调,以及根据状态或运行时选项决定执行哪些方法。

Classes

Workflow(state[, short_circuit, debug, options])

任意工作流支持结构

装饰器

requires([option, values, state])

在满足要求时执行函数的装饰器

method([priority])

修饰函数以指示它是工作流方法

示例

>>> from skbio.workflow import Workflow

作为一个例子 Workflow 对象,让我们构造一个序列处理器,它将过滤小于10个核苷酸的序列,如果运行时选项指示为,则反转序列,如果观察到特定的核苷酸模式,则截短。这个 Workflow 仅通过短路方法评估装饰物的要求 method . 开发人员可以在对象定义中随意定义任意多个方法,这些方法可以从工作流方法中调用,但它们不会直接接受工作流检查。

>>> nuc_pattern = 'AATTG'
>>> has_nuc_pattern = lambda s: s[:len(nuc_pattern)] == nuc_pattern
>>> class SequenceProcessor(Workflow):
...    def initialize_state(self, item):
...        # Setup the state for a new item (e.g., a new sequence)
...        self.state = item
...    @method(priority=100)
...    def check_length(self):
...        # Always make sure the sequence is at least 10 nucleotides
...        if len(self.state) < 10:
...            self.failed = True
...    @method(priority=90)
...    @requires(state=has_nuc_pattern)
...    def truncate(self):
...        # Truncate if a specific starting nucleotide pattern is observed
...        self.state = self.state[len(nuc_pattern):]
...    @method(priority=80)
...    @requires(option='reverse', values=True)
...    def reverse(self):
...        # Reverse the sequence if indicatd at runtime
...        self.state = self.state[::-1]

一个 Workflow 必须通过a state 对象和任何运行时选项。还有一些其他有用的参数可以指定,但超出了本例的范围。我们也不需要提供一个state对象作为 initialize_state 方法重写 self.state . 现在,让我们创建实例。

>>> wf = SequenceProcessor(state=None, options={'reverse=': False})

通过 SequenceProcessor ,我们需要传递一个iterable。所以,让我们创建一个 list 序列。

>>> seqs = ['AAAAAAATTTTTTT', 'ATAGACC', 'AATTGCCGGAC', 'ATATGAACAAA']

在运行这些序列之前,我们还将定义回调,这些回调应用于 Workflow . 回调是可选的——默认情况下,成功只会在忽略失败的情况下生成状态成员变量——但是,根据您的工作流程,处理失败或在成功时做一些有趣和令人兴奋的事情可能会很有用。

>>> def success_f(obj):
...     return "SUCCESS: %s" % obj.state
>>> def fail_f(obj):
...     return "FAIL: %s" % obj.state

现在,让我们处理一些数据!

>>> for result in wf(seqs, success_callback=success_f, fail_callback=fail_f):
...     print(result)
SUCCESS: AAAAAAATTTTTTT
FAIL: ATAGACC
SUCCESS: CCGGAC
SUCCESS: ATATGAACAAA

一些值得注意的事情发生了。首先,没有一个序列与 SequenceProcessor 未将选项“reverse”设置为 True . 第二,你会注意到第三个序列被截断了,这是因为它符合我们感兴趣的核苷酸模式。最后,在我们处理的序列中,只有一个序列失败。

为了帮助构造工作流,可以使用调试信息,但必须在实例化时打开它。让我们这样做,当我们在做的时候,让我们继续并启用反转方法。不过,这次我们将一次遍历一个项,以便我们可以检查调试信息。

>>> wf = SequenceProcessor(state=None, options={'reverse':True}, debug=True)
>>> gen = wf(seqs, fail_callback=lambda x: x.state)
>>> next(gen)
'TTTTTTTAAAAAAA'
>>> wf.failed
False
>>> sorted(wf.debug_trace)
[('check_length', 0), ('reverse', 2)]

这个 debug_trace 指定执行的方法及其执行顺序,接近零表示执行顺序较早。间隙表明存在已评估但未执行的方法。中的每个项目 debug_trace 是一把钥匙 dict 我们稍后将讨论的调试信息。你有没有看到这次通过工作流程颠倒了顺序?

现在,让我们看看下一个项目,在我们之前的工作流运行中,它是一个失败的项目。

>>> next(gen)
'ATAGACC'
>>> wf.failed
True
>>> sorted(wf.debug_trace)
[('check_length', 0)]

我们可以看到,失败的序列只执行check_length方法。因为这个序列没有通过我们10个核苷酸的长度过滤器,所以在 check_length 方法。因此,没有对其他方法进行评估(注意:如果需要,可以禁用此短路行为)。

这第三项先前与我们感兴趣的截短核苷酸模式相匹配。让我们看看调试输出中的内容。

>>> next(gen)
'CAGGCC'
>>> wf.failed
False
>>> sorted(wf.debug_trace)
[('check_length', 0), ('reverse', 2), ('truncate', 1)]

在最后一个例子中,我们可以看到 truncate 方法在 reverse 方法和遵循 check_length 方法。考虑到我们为这些方法指定的优先级,这与预期的一样。自从 truncate 让我们来看看如何做一些有趣的事情 state 正在改变。首先,我们将在调用 truncate 然后我们要把 state 接到电话后 truncate ,这将使我们能够迅速了解正在发生的事情。

>>> wf.debug_pre_state[('truncate', 1)]
'AATTGCCGGAC'
>>> wf.debug_post_state[('truncate', 1)]
'CCGGAC'

正如我们所料,我们的原始序列 truncate ,并遵循 truncate ,我们的序列缺少我们感兴趣的核苷酸模式。太棒了,对吧?

有最后一个调试输出, wf.debug_runtime ,这在诊断特定状态(而不是cProfile提供的聚合)所需的时间量时非常有用。

工作流的最后三个非常方便的组件是允许您指示 anything 作为选项值 not_none ,以及定义有效值范围的机制。

>>> from skbio.workflow import not_none, anything
>>> class Ex(Workflow):
...     @method()
...     @requires(option='foo', values=not_none)
...     def do_something(self):
...         pass
...     @method()
...     @requires(option='bar', values=anything)
...     def do_something_else(self):
...         pass
...     @method()
...     @requires(option='foobar', values=[1,2,3])
...     def do_something_awesome(self):
...         pass
...