I am trying to write some code using dask.distributed.Client and rioxarray's to_raster that:
- Concatenates two rasters (dask arrays)
- Applies a function across all blocks in the concatenated array
- Writes the final array to a ".tif" file
However, the code fails with either ValueError: Lock is not yet acquired or AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper', depending on what type of lock I use.
I'm using a local cluster and want to run the code across multiple threads on a single worker. I've written the following minimal reproducible example:
def test():
    from osgeo import gdal
    import rasterio
    from rasterio.transform import Affine
    import xarray as xr
    import rioxarray
    from dask.distributed import Client, Lock
    import numpy as np

    # Create georeferenced example rasters
    x = np.linspace(-90, 90, 100)
    y = np.linspace(90, -90, 100)
    X, Y = np.meshgrid(x, y)
    Z1 = np.abs(((X - 10) ** 2 + (Y - 10) ** 2) / 1 ** 2)
    Z2 = np.abs(((X + 10) ** 2 + (Y + 10) ** 2) / 2.5 ** 2)
    Z = Z1 - Z2
    xres = (x[-1] - x[0]) / len(x)
    yres = (y[-1] - y[0]) / len(y)
    transform = Affine.translation(x[0] - xres / 2, y[0] - yres / 2) * Affine.scale(xres, yres)

    with rasterio.open(
        "example.tif",
        mode="w",
        driver="GTiff",
        height=Z.shape[0],
        width=Z.shape[1],
        count=1,
        dtype=Z.dtype,
        crs="+proj=latlong",
        transform=transform,
    ) as new_dataset:
        new_dataset.write(Z, 1)

    with rasterio.open(
        "example2.tif",
        mode="w",
        driver="GTiff",
        height=Z.shape[0],
        width=Z.shape[1],
        count=1,
        dtype=Z.dtype,
        crs="+proj=latlong",
        transform=transform,
    ) as new_dataset:
        new_dataset.write(Z, 1)

    # Use dask distributed and rioxarray to open the rasters
    chunkDimsX = 10
    chunkDimsY = 10
    with Client(threads_per_worker=6, n_workers=1, processes=False) as client:
        with rioxarray.open_rasterio("example.tif", chunks=(chunkDimsX, chunkDimsY), lock=False) as xds:
            with rioxarray.open_rasterio("example2.tif", chunks=(chunkDimsX, chunkDimsY), lock=False) as xds2:
                concatRaster = xr.concat([xds, xds2], dim='band').chunk({'band': 1, 'x': chunkDimsX, 'y': chunkDimsY})
                concatRaster = concatRaster.map_blocks(lambda x: x + 1, template=concatRaster)
                concatRaster.rio.write_nodata(0, inplace=True)
                concatRaster.rio.to_raster('concat.tif', tiled=True, lock=Lock(client=client), overwrite=True)


if __name__ == '__main__':
    test()
When I run the above, I get the following error message:
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "<stdin>", line 52, in test
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_array.py", line 1125, in to_raster
    return RasterioWriter(raster_path=raster_path).to_raster(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_writer.py", line 295, in to_raster
    return dask.array.store(
           ^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\array\core.py", line 1236, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\base.py", line 402, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 3275, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 2372, in gather
    return self.sync(
           ^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\lock.py", line 163, in release
    raise ValueError("Lock is not yet acquired")
    ^^^^^^^^^^^^^^^
ValueError: Lock is not yet acquired
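For reference, the example in the rioxarray docs for writing with dask passes a named distributed lock rather than a bare Lock(client=client). Adapted to my code, that would presumably be the variant below, though I don't know whether naming the lock makes a difference here:

# Variant modeled on the rioxarray dask example, which uses a named lock;
# "rio" is just the label that example happens to use.
concatRaster.rio.to_raster('concat.tif', tiled=True, lock=Lock("rio", client=client), overwrite=True)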
I tried changing the lock argument to True instead of a dask.distributed Lock:
concatRaster.rio.to_raster('concat.tif', tiled=True, lock=True, overwrite=True)
But I get the following error:
2024-03-22 10:21:45,634 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1a0162f93d0>
0. 1787078771392
>.
Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 68, in dumps
    pickler.dump(x)
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object

Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 68, in dumps
    pickler.dump(x)
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\serialize.py", line 353, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\serialize.py", line 76, in pickle_dumps
    frames[0] = pickle.dumps(
                ^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "<stdin>", line 52, in test
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_array.py", line 1125, in to_raster
    return RasterioWriter(raster_path=raster_path).to_raster(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_writer.py", line 295, in to_raster
    return dask.array.store(
           ^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\array\core.py", line 1236, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\base.py", line 402, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 3255, in get
    futures = self._graph_to_futures(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 3151, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\serialize.py", line 379, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1a0162f93d0>\n 0. 1787078771392\n>')
It looks like it's related to the map_blocks call: the object that fails to pickle is map_blocks.<locals>._wrapper, which I take to be a function defined locally inside xarray's map_blocks that can't be serialized when the task graph is sent to the distributed scheduler.
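For this toy example I realize I could sidestep map_blocks entirely, since elementwise arithmetic on a chunked DataArray is already applied lazily block by block, so there would be no locally defined function to pickle. A sketch of that idea:

# Hypothetical workaround for this toy case only: drop map_blocks.
# Arithmetic on a dask-backed DataArray is evaluated lazily per chunk,
# so nothing locally defined needs to be pickled for the scheduler.
concatRaster = concatRaster + 1

But my real per-block function is more involved than x + 1, so I'd still like to understand why the map_blocks version fails.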
I'm fairly inexperienced with dask, so any help deciphering why these errors occur and what I'm doing wrong would be greatly appreciated!