15. 任务-在后台执行繁重的工作

提示

如果您在pyqgis控制台之外,则此页面上的代码片段需要以下导入:

 1from qgis.core import (
 2  Qgis,
 3  QgsApplication,
 4  QgsMessageLog,
 5  QgsProcessingAlgRunnerTask,
 6  QgsProcessingContext,
 7  QgsProcessingFeedback,
 8  QgsProject,
 9  QgsTask,
10  QgsTaskManager,
11)

15.1. 引言

使用线程的后台处理是在进行繁重处理时保持响应的用户界面的一种方式。在QGIS中可以使用任务来实现线程化。

一项任务 (QgsTask )是要在后台执行的代码的容器,而任务管理器 (QgsTaskManager )用于控制任务的运行。这些类通过提供用于发送信号、进度报告和访问后台进程状态的机制,简化了QGIS中的后台处理。可以使用子任务对任务进行分组。

全局任务管理器(随一起找到 QgsApplication.taskManager() )通常使用。这意味着您的任务可能不是任务管理器控制的唯一任务。

创建QGIS任务有多种方法:

  • 通过扩展创建您自己的任务 QgsTask

    class SpecialisedTask(QgsTask):
        pass
    
  • 从函数创建任务

     1def heavyFunction():
     2    # Some CPU intensive processing ...
     3    pass
     4
     5def workdone():
     6    # ... do something useful with the results
     7    pass
     8
     9task = QgsTask.fromFunction('heavy function', heavyFunction,
    10                     on_finished=workdone)
    
  • 根据处理算法创建任务

    1params = dict()
    2context = QgsProcessingContext()
    3context.setProject(QgsProject.instance())
    4feedback = QgsProcessingFeedback()
    5
    6buffer_alg = QgsApplication.instance().processingRegistry().algorithmById('native:buffer')
    7task = QgsProcessingAlgRunnerTask(buffer_alg, params, context,
    8                           feedback)
    

警告

任何后台任务(无论它是如何创建的)都不能使用主线程上的任何QObject,例如访问QgsVectorLayer、QgsProject或执行任何基于图形用户界面的操作,如创建新的小部件或与现有小部件交互。Qt窗口小部件只能从主线程访问或修改。在任务启动之前,必须复制任务中使用的数据。试图从后台线程使用它们将导致崩溃。

此外,一定要确保 contextfeedback 至少和使用它们的任务一样长久地活下去。如果在任务完成后,QGIS将崩溃, QgsTaskManager 无法访问 contextfeedback 该任务是根据其调度的。

备注

这是一种常见的模式,可以调用 setProject() 在打完电话后不久 QgsProcessingContext 。这允许任务及其回调函数使用大多数项目范围的设置。在回调函数中处理空间层时,这一点尤其有用。

任务之间的依赖关系可以使用 addSubTask() 的功能 QgsTask 。当声明依赖项时,任务管理器将自动确定如何执行这些依赖项。在可能的情况下,将并行执行依赖项,以便尽可能快地满足它们。如果取消了另一个任务所依赖的任务,则该依赖任务也将被取消。循环依赖可能会导致死锁,因此要小心。

如果任务依赖于可用的层,则可以使用 setDependentLayers() 的功能 QgsTask 。如果任务所依赖的层不可用,则该任务将被取消。

一旦创建了任务,就可以使用 addTask() 任务管理器的功能。向管理器添加任务会自动将该任务的所有权转移给管理器,管理器将在任务执行后清理和删除这些任务。任务的调度受任务优先级的影响,该优先级在 addTask()

可以使用以下工具监视任务的状态 QgsTaskQgsTaskManager 信号和功能。

15.2. 实例

15.2.1. 扩展QgsTask

在本例中 RandomIntegerSumTask 延展 QgsTask 并且将在指定的时间段内生成100个介于0和500之间的随机整数。如果随机数为42,则中止任务并引发异常。几个实例 RandomIntegerSumTask (包含子任务)被生成并添加到任务管理器,演示了两种类型的依赖关系。

  1import random
  2from time import sleep
  3
  4from qgis.core import (
  5    QgsApplication, QgsTask, QgsMessageLog, Qgis
  6    )
  7
  8MESSAGE_CATEGORY = 'RandomIntegerSumTask'
  9
 10class RandomIntegerSumTask(QgsTask):
 11    """This shows how to subclass QgsTask"""
 12
 13    def __init__(self, description, duration):
 14        super().__init__(description, QgsTask.CanCancel)
 15        self.duration = duration
 16        self.total = 0
 17        self.iterations = 0
 18        self.exception = None
 19
 20    def run(self):
 21        """Here you implement your heavy lifting.
 22        Should periodically test for isCanceled() to gracefully
 23        abort.
 24        This method MUST return True or False.
 25        Raising exceptions will crash QGIS, so we handle them
 26        internally and raise them in self.finished
 27        """
 28        QgsMessageLog.logMessage('Started task "{}"'.format(
 29                                     self.description()),
 30                                 MESSAGE_CATEGORY, Qgis.Info)
 31        wait_time = self.duration / 100
 32        for i in range(100):
 33            sleep(wait_time)
 34            # use setProgress to report progress
 35            self.setProgress(i)
 36            arandominteger = random.randint(0, 500)
 37            self.total += arandominteger
 38            self.iterations += 1
 39            # check isCanceled() to handle cancellation
 40            if self.isCanceled():
 41                return False
 42            # simulate exceptions to show how to abort task
 43            if arandominteger == 42:
 44                # DO NOT raise Exception('bad value!')
 45                # this would crash QGIS
 46                self.exception = Exception('bad value!')
 47                return False
 48        return True
 49
 50    def finished(self, result):
 51        """
 52        This function is automatically called when the task has
 53        completed (successfully or not).
 54        You implement finished() to do whatever follow-up stuff
 55        should happen after the task is complete.
 56        finished is always called from the main thread, so it's safe
 57        to do GUI operations and raise Python exceptions here.
 58        result is the return value from self.run.
 59        """
 60        if result:
 61            QgsMessageLog.logMessage(
 62                'RandomTask "{name}" completed\n' \
 63                'RandomTotal: {total} (with {iterations} '\
 64              'iterations)'.format(
 65                  name=self.description(),
 66                  total=self.total,
 67                  iterations=self.iterations),
 68              MESSAGE_CATEGORY, Qgis.Success)
 69        else:
 70            if self.exception is None:
 71                QgsMessageLog.logMessage(
 72                    'RandomTask "{name}" not successful but without '\
 73                    'exception (probably the task was manually '\
 74                    'canceled by the user)'.format(
 75                        name=self.description()),
 76                    MESSAGE_CATEGORY, Qgis.Warning)
 77            else:
 78                QgsMessageLog.logMessage(
 79                    'RandomTask "{name}" Exception: {exception}'.format(
 80                        name=self.description(),
 81                        exception=self.exception),
 82                    MESSAGE_CATEGORY, Qgis.Critical)
 83                raise self.exception
 84
 85    def cancel(self):
 86        QgsMessageLog.logMessage(
 87            'RandomTask "{name}" was canceled'.format(
 88                name=self.description()),
 89            MESSAGE_CATEGORY, Qgis.Info)
 90        super().cancel()
 91
 92
 93longtask = RandomIntegerSumTask('waste cpu long', 20)
 94shorttask = RandomIntegerSumTask('waste cpu short', 10)
 95minitask = RandomIntegerSumTask('waste cpu mini', 5)
 96shortsubtask = RandomIntegerSumTask('waste cpu subtask short', 5)
 97longsubtask = RandomIntegerSumTask('waste cpu subtask long', 10)
 98shortestsubtask = RandomIntegerSumTask('waste cpu subtask shortest', 4)
 99
100# Add a subtask (shortsubtask) to shorttask that must run after
101# minitask and longtask has finished
102shorttask.addSubTask(shortsubtask, [minitask, longtask])
103# Add a subtask (longsubtask) to longtask that must be run
104# before the parent task
105longtask.addSubTask(longsubtask, [], QgsTask.ParentDependsOnSubTask)
106# Add a subtask (shortestsubtask) to longtask
107longtask.addSubTask(shortestsubtask)
108
109QgsApplication.taskManager().addTask(longtask)
110QgsApplication.taskManager().addTask(shorttask)
111QgsApplication.taskManager().addTask(minitask)
 1RandomIntegerSumTask(0): Started task "waste cpu subtask shortest"
 2RandomIntegerSumTask(0): Started task "waste cpu short"
 3RandomIntegerSumTask(0): Started task "waste cpu mini"
 4RandomIntegerSumTask(0): Started task "waste cpu subtask long"
 5RandomIntegerSumTask(3): Task "waste cpu subtask shortest" completed
 6RandomTotal: 25452 (with 100 iterations)
 7RandomIntegerSumTask(3): Task "waste cpu mini" completed
 8RandomTotal: 23810 (with 100 iterations)
 9RandomIntegerSumTask(3): Task "waste cpu subtask long" completed
10RandomTotal: 26308 (with 100 iterations)
11RandomIntegerSumTask(0): Started task "waste cpu long"
12RandomIntegerSumTask(3): Task "waste cpu long" completed
13RandomTotal: 22534 (with 100 iterations)

15.2.2. 来自函数的任务

从函数创建任务 (doSomething 在本例中)。该函数的第一个参数将保存 QgsTask 用于该函数。一个重要的(已命名)参数是 on_finished ,它指定在任务完成时将调用的函数。这个 doSomething 此示例中的函数有一个附加命名参数 wait_time

 1import random
 2from time import sleep
 3
 4MESSAGE_CATEGORY = 'TaskFromFunction'
 5
 6def doSomething(task, wait_time):
 7    """
 8    Raises an exception to abort the task.
 9    Returns a result if success.
10    The result will be passed, together with the exception (None in
11    the case of success), to the on_finished method.
12    If there is an exception, there will be no result.
13    """
14    QgsMessageLog.logMessage('Started task {}'.format(task.description()),
15                             MESSAGE_CATEGORY, Qgis.Info)
16    wait_time = wait_time / 100
17    total = 0
18    iterations = 0
19    for i in range(100):
20        sleep(wait_time)
21        # use task.setProgress to report progress
22        task.setProgress(i)
23        arandominteger = random.randint(0, 500)
24        total += arandominteger
25        iterations += 1
26        # check task.isCanceled() to handle cancellation
27        if task.isCanceled():
28            stopped(task)
29            return None
30        # raise an exception to abort the task
31        if arandominteger == 42:
32            raise Exception('bad value!')
33    return {'total': total, 'iterations': iterations,
34            'task': task.description()}
35
36def stopped(task):
37    QgsMessageLog.logMessage(
38        'Task "{name}" was canceled'.format(
39            name=task.description()),
40        MESSAGE_CATEGORY, Qgis.Info)
41
42def completed(exception, result=None):
43    """This is called when doSomething is finished.
44    Exception is not None if doSomething raises an exception.
45    result is the return value of doSomething."""
46    if exception is None:
47        if result is None:
48            QgsMessageLog.logMessage(
49                'Completed with no exception and no result '\
50                '(probably manually canceled by the user)',
51                MESSAGE_CATEGORY, Qgis.Warning)
52        else:
53            QgsMessageLog.logMessage(
54                'Task {name} completed\n'
55                'Total: {total} ( with {iterations} '
56                'iterations)'.format(
57                    name=result['task'],
58                    total=result['total'],
59                    iterations=result['iterations']),
60                MESSAGE_CATEGORY, Qgis.Info)
61    else:
62        QgsMessageLog.logMessage("Exception: {}".format(exception),
63                                 MESSAGE_CATEGORY, Qgis.Critical)
64        raise exception
65
66# Create a few tasks
67task1 = QgsTask.fromFunction('Waste cpu 1', doSomething,
68                             on_finished=completed, wait_time=4)
69task2 = QgsTask.fromFunction('Waste cpu 2', doSomething,
70                             on_finished=completed, wait_time=3)
71QgsApplication.taskManager().addTask(task1)
72QgsApplication.taskManager().addTask(task2)
1RandomIntegerSumTask(0): Started task "waste cpu subtask short"
2RandomTaskFromFunction(0): Started task Waste cpu 1
3RandomTaskFromFunction(0): Started task Waste cpu 2
4RandomTaskFromFunction(0): Task Waste cpu 2 completed
5RandomTotal: 23263 ( with 100 iterations)
6RandomTaskFromFunction(0): Task Waste cpu 1 completed
7RandomTotal: 25044 ( with 100 iterations)

15.2.3. 来自处理算法的任务

创建使用该算法的任务 qgis:randompointsinextent 在指定范围内生成50000个随机点。结果将以安全的方式添加到项目中。

 1from functools import partial
 2from qgis.core import (QgsTaskManager, QgsMessageLog,
 3                       QgsProcessingAlgRunnerTask, QgsApplication,
 4                       QgsProcessingContext, QgsProcessingFeedback,
 5                       QgsProject)
 6
 7MESSAGE_CATEGORY = 'AlgRunnerTask'
 8
 9def task_finished(context, successful, results):
10    if not successful:
11        QgsMessageLog.logMessage('Task finished unsucessfully',
12                                 MESSAGE_CATEGORY, Qgis.Warning)
13    output_layer = context.getMapLayer(results['OUTPUT'])
14    # because getMapLayer doesn't transfer ownership, the layer will
15    # be deleted when context goes out of scope and you'll get a
16    # crash.
17    # takeMapLayer transfers ownership so it's then safe to add it
18    # to the project and give the project ownership.
19    if output_layer and output_layer.isValid():
20        QgsProject.instance().addMapLayer(
21             context.takeResultLayer(output_layer.id()))
22
23alg = QgsApplication.processingRegistry().algorithmById(
24                                      'qgis:randompointsinextent')
25# `context` and `feedback` need to
26# live for as least as long as `task`,
27# otherwise the program will crash.
28# Initializing them globally is a sure way
29# of avoiding this unfortunate situation.
30context = QgsProcessingContext()
31feedback = QgsProcessingFeedback()
32params = {
33    'EXTENT': '0.0,10.0,40,50 [EPSG:4326]',
34    'MIN_DISTANCE': 0.0,
35    'POINTS_NUMBER': 50000,
36    'TARGET_CRS': 'EPSG:4326',
37    'OUTPUT': 'memory:My random points'
38}
39task = QgsProcessingAlgRunnerTask(alg, params, context, feedback)
40task.executed.connect(partial(task_finished, context))
41QgsApplication.taskManager().addTask(task)

另请参阅:https://www.opengis.ch/2018/06/22/threads-in-pyqgis3/.