构建工作流 (skbio.workflow
)¶
构造任意复杂的工作流,其中特定方法的运行在运行时确定。此模块支持在项目失败时使工作流短路,支持排序方法、对已处理项的回调,以及根据状态或运行时选项决定执行哪些方法。
Classes¶
|
任意工作流支持结构 |
装饰器¶
|
在满足要求时执行函数的装饰器 |
|
修饰函数以指示它是工作流方法 |
示例
>>> from skbio.workflow import Workflow
作为一个例子 Workflow
对象,让我们构造一个序列处理器,它将过滤小于10个核苷酸的序列,如果运行时选项指示为,则反转序列,如果观察到特定的核苷酸模式,则截短。这个 Workflow
仅通过短路方法评估装饰物的要求 method
. 开发人员可以在对象定义中随意定义任意多个方法,这些方法可以从工作流方法中调用,但它们不会直接接受工作流检查。
>>> nuc_pattern = 'AATTG'
>>> has_nuc_pattern = lambda s: s[:len(nuc_pattern)] == nuc_pattern
>>> class SequenceProcessor(Workflow):
... def initialize_state(self, item):
... # Setup the state for a new item (e.g., a new sequence)
... self.state = item
... @method(priority=100)
... def check_length(self):
... # Always make sure the sequence is at least 10 nucleotides
... if len(self.state) < 10:
... self.failed = True
... @method(priority=90)
... @requires(state=has_nuc_pattern)
... def truncate(self):
... # Truncate if a specific starting nucleotide pattern is observed
... self.state = self.state[len(nuc_pattern):]
... @method(priority=80)
... @requires(option='reverse', values=True)
... def reverse(self):
... # Reverse the sequence if indicatd at runtime
... self.state = self.state[::-1]
一个 Workflow
必须通过a state
对象和任何运行时选项。还有一些其他有用的参数可以指定,但超出了本例的范围。我们也不需要提供一个state对象作为 initialize_state
方法重写 self.state
. 现在,让我们创建实例。
>>> wf = SequenceProcessor(state=None, options={'reverse=': False})
通过 SequenceProcessor
,我们需要传递一个iterable。所以,让我们创建一个 list
序列。
>>> seqs = ['AAAAAAATTTTTTT', 'ATAGACC', 'AATTGCCGGAC', 'ATATGAACAAA']
在运行这些序列之前,我们还将定义回调,这些回调应用于 Workflow
. 回调是可选的——默认情况下,成功只会在忽略失败的情况下生成状态成员变量——但是,根据您的工作流程,处理失败或在成功时做一些有趣和令人兴奋的事情可能会很有用。
>>> def success_f(obj):
... return "SUCCESS: %s" % obj.state
>>> def fail_f(obj):
... return "FAIL: %s" % obj.state
现在,让我们处理一些数据!
>>> for result in wf(seqs, success_callback=success_f, fail_callback=fail_f):
... print(result)
SUCCESS: AAAAAAATTTTTTT
FAIL: ATAGACC
SUCCESS: CCGGAC
SUCCESS: ATATGAACAAA
一些值得注意的事情发生了。首先,没有一个序列与 SequenceProcessor
未将选项“reverse”设置为 True
. 第二,你会注意到第三个序列被截断了,这是因为它符合我们感兴趣的核苷酸模式。最后,在我们处理的序列中,只有一个序列失败。
为了帮助构造工作流,可以使用调试信息,但必须在实例化时打开它。让我们这样做,当我们在做的时候,让我们继续并启用反转方法。不过,这次我们将一次遍历一个项,以便我们可以检查调试信息。
>>> wf = SequenceProcessor(state=None, options={'reverse':True}, debug=True)
>>> gen = wf(seqs, fail_callback=lambda x: x.state)
>>> next(gen)
'TTTTTTTAAAAAAA'
>>> wf.failed
False
>>> sorted(wf.debug_trace)
[('check_length', 0), ('reverse', 2)]
这个 debug_trace
指定执行的方法及其执行顺序,接近零表示执行顺序较早。间隙表明存在已评估但未执行的方法。中的每个项目 debug_trace
是一把钥匙 dict
我们稍后将讨论的调试信息。你有没有看到这次通过工作流程颠倒了顺序?
现在,让我们看看下一个项目,在我们之前的工作流运行中,它是一个失败的项目。
>>> next(gen)
'ATAGACC'
>>> wf.failed
True
>>> sorted(wf.debug_trace)
[('check_length', 0)]
我们可以看到,失败的序列只执行check_length方法。因为这个序列没有通过我们10个核苷酸的长度过滤器,所以在 check_length
方法。因此,没有对其他方法进行评估(注意:如果需要,可以禁用此短路行为)。
这第三项先前与我们感兴趣的截短核苷酸模式相匹配。让我们看看调试输出中的内容。
>>> next(gen)
'CAGGCC'
>>> wf.failed
False
>>> sorted(wf.debug_trace)
[('check_length', 0), ('reverse', 2), ('truncate', 1)]
在最后一个例子中,我们可以看到 truncate
方法在 reverse
方法和遵循 check_length
方法。考虑到我们为这些方法指定的优先级,这与预期的一样。自从 truncate
让我们来看看如何做一些有趣的事情 state
正在改变。首先,我们将在调用 truncate
然后我们要把 state
接到电话后 truncate
,这将使我们能够迅速了解正在发生的事情。
>>> wf.debug_pre_state[('truncate', 1)]
'AATTGCCGGAC'
>>> wf.debug_post_state[('truncate', 1)]
'CCGGAC'
正如我们所料,我们的原始序列 truncate
,并遵循 truncate
,我们的序列缺少我们感兴趣的核苷酸模式。太棒了,对吧?
有最后一个调试输出, wf.debug_runtime
,这在诊断特定状态(而不是cProfile提供的聚合)所需的时间量时非常有用。
工作流的最后三个非常方便的组件是允许您指示 anything
作为选项值 not_none
,以及定义有效值范围的机制。
>>> from skbio.workflow import not_none, anything
>>> class Ex(Workflow):
... @method()
... @requires(option='foo', values=not_none)
... def do_something(self):
... pass
... @method()
... @requires(option='bar', values=anything)
... def do_something_else(self):
... pass
... @method()
... @requires(option='foobar', values=[1,2,3])
... def do_something_awesome(self):
... pass
...