并行处理¶
栅格提供对栅格数据的并行处理。调用gdal时释放python的全局解释器锁(gil) GDALRasterIO()
函数,这意味着Python线程可以同时读写。
numpy库还经常发布gil,例如,在对数组应用通用函数时,这使得可以跨处理器的核心分布数组的处理。下面的cython函数,包含在Rasterio的 _example
模块,模拟了这样一个gil发布的栅格处理功能。
# cython: boundscheck=False
import numpy as np
def compute(unsigned char[:, :, :] input):
"""reverses bands inefficiently
Given input and output uint8 arrays, fakes an CPU-intensive
computation.
"""
cdef int I, J, K
cdef int i, j, k, l
cdef double val
I = input.shape[0]
J = input.shape[1]
K = input.shape[2]
output = np.empty((I, J, K), dtype='uint8')
cdef unsigned char[:, :, :] output_view = output
with nogil:
for i in range(I):
for j in range(J):
for k in range(K):
val = <double>input[i, j, k]
for l in range(2000):
val += 1.0
val -= 2000.0
output_view[~i, j, k] = <unsigned char>val
return output
下面是examples/thread_pool_executor.py中的程序。
"""thread_pool_executor.py
Operate on a raster dataset window-by-window using a ThreadPoolExecutor.
Simulates a CPU-bound thread situation where multiple threads can improve
performance.
With -j 4, the program returns in about 1/4 the time as with -j 1.
"""
import concurrent.futures
import rasterio
from rasterio._example import compute
def main(infile, outfile, num_workers=4):
"""Process infile block-by-block and write to a new file
The output is the same as the input, but with band order
reversed.
"""
with rasterio.Env():
with rasterio.open(infile) as src:
# Create a destination dataset based on source params. The
# destination will be tiled, and we'll process the tiles
# concurrently.
profile = src.profile
profile.update(blockxsize=128, blockysize=128, tiled=True)
with rasterio.open(outfile, "w", **profile) as dst:
# Materialize a list of destination block windows
# that we will use in several statements below.
windows = [window for ij, window in dst.block_windows()]
# This generator comprehension gives us raster data
# arrays for each window. Later we will zip a mapping
# of it with the windows list to get (window, result)
# pairs.
data_gen = (src.read(window=window) for window in windows)
with concurrent.futures.ThreadPoolExecutor(
max_workers=num_workers
) as executor:
# We map the compute() function over the raster
# data generator, zip the resulting iterator with
# the windows list, and as pairs come back we
# write data to the destination dataset.
for window, result in zip(
windows, executor.map(compute, data_gen)
):
dst.write(result, window=window)
上面的代码模拟CPU密集型计算,当使用 ThreadPoolExecutor
从python 3的 concurrent.futures
模块。与一个并发作业的情况相比 (-j 1
)
$ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 1
real 0m3.555s
user 0m3.422s
sys 0m0.095s
我们有四个并行作业,速度快了近3倍。
$ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 4
real 0m1.247s
user 0m3.505s
sys 0m0.088s
注解
如果要在栅格窗口上映射的函数没有释放gil,则可以替换 ThreadPoolExecutor
具有 ProcessPoolExecutor
得到相同的结果,性能相似。