并行处理

栅格提供对栅格数据的并行处理。调用gdal时释放python的全局解释器锁(gil) GDALRasterIO() 函数,这意味着Python线程可以同时读写。

numpy库还经常发布gil,例如,在对数组应用通用函数时,这使得可以跨处理器的核心分布数组的处理。下面的cython函数,包含在Rasterio的 _example 模块,模拟了这样一个gil发布的栅格处理功能。

# cython: boundscheck=False

import numpy as np


def compute(unsigned char[:, :, :] input):
    """reverses bands inefficiently

    Given input and output uint8 arrays, fakes an CPU-intensive
    computation.
    """
    cdef int I, J, K
    cdef int i, j, k, l
    cdef double val
    I = input.shape[0]
    J = input.shape[1]
    K = input.shape[2]
    output = np.empty((I, J, K), dtype='uint8')
    cdef unsigned char[:, :, :] output_view = output
    with nogil:
        for i in range(I):
            for j in range(J):
                for k in range(K):
                    val = <double>input[i, j, k]
                    for l in range(2000):
                        val += 1.0
                    val -= 2000.0
                    output_view[~i, j, k] = <unsigned char>val
    return output

下面是examples/thread_pool_executor.py中的程序。

"""thread_pool_executor.py

Operate on a raster dataset window-by-window using a ThreadPoolExecutor.

Simulates a CPU-bound thread situation where multiple threads can improve
performance.

With -j 4, the program returns in about 1/4 the time as with -j 1.
"""

import concurrent.futures

import rasterio
from rasterio._example import compute


def main(infile, outfile, num_workers=4):
    """Process infile block-by-block and write to a new file

    The output is the same as the input, but with band order
    reversed.
    """

    with rasterio.Env():

        with rasterio.open(infile) as src:

            # Create a destination dataset based on source params. The
            # destination will be tiled, and we'll process the tiles
            # concurrently.
            profile = src.profile
            profile.update(blockxsize=128, blockysize=128, tiled=True)

            with rasterio.open(outfile, "w", **profile) as dst:

                # Materialize a list of destination block windows
                # that we will use in several statements below.
                windows = [window for ij, window in dst.block_windows()]

                # This generator comprehension gives us raster data
                # arrays for each window. Later we will zip a mapping
                # of it with the windows list to get (window, result)
                # pairs.
                data_gen = (src.read(window=window) for window in windows)

                with concurrent.futures.ThreadPoolExecutor(
                    max_workers=num_workers
                ) as executor:

                    # We map the compute() function over the raster
                    # data generator, zip the resulting iterator with
                    # the windows list, and as pairs come back we
                    # write data to the destination dataset.
                    for window, result in zip(
                        windows, executor.map(compute, data_gen)
                    ):
                        dst.write(result, window=window)

上面的代码模拟CPU密集型计算,当使用 ThreadPoolExecutor 从python 3的 concurrent.futures 模块。与一个并发作业的情况相比 (-j 1

$ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 1

real    0m3.555s
user    0m3.422s
sys     0m0.095s

我们有四个并行作业,速度快了近3倍。

$ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 4

real    0m1.247s
user    0m3.505s
sys     0m0.088s

注解

如果要在栅格窗口上映射的函数没有释放gil,则可以替换 ThreadPoolExecutor 具有 ProcessPoolExecutor 得到相同的结果,性能相似。