
I am trying to write some code using dask.distributed.Client and rioxarray to_raster that:

  1. Concatenates two rasters (dask arrays)
  2. Applies a function across all blocks in the concatenated array
  3. Writes the final array to a ".tif" file

However, the code fails with either ValueError: Lock is not yet acquired or AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper', depending on which type of lock I use.

I'm using a local cluster and want to run the code across multiple threads on a single worker. I've written the following minimal reproducible example:

def test():
    from osgeo import gdal
    import rasterio
    from rasterio.transform import Affine
    import xarray as xr
    import rioxarray
    from dask.distributed import Client, Lock
    import numpy as np

    # Create georeferenced example rasters
    x = np.linspace(-90, 90, 100)
    y = np.linspace(90, -90, 100)
    X, Y = np.meshgrid(x, y)
    Z1 = np.abs(((X - 10) ** 2 + (Y - 10) ** 2) / 1 ** 2)
    Z2 = np.abs(((X + 10) ** 2 + (Y + 10) ** 2) / 2.5 ** 2)
    Z = Z1 - Z2

    xres = (x[-1] - x[0]) / len(x)
    yres = (y[-1] - y[0]) / len(y)

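    # Affine transform shared by both example rasters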
    transform = Affine.translation(x[0] - xres / 2, y[0] - yres / 2) * Affine.scale(xres, yres)

    with rasterio.open(
            "example.tif",
            mode="w",
            driver="GTiff",
            height=Z.shape[0],
            width=Z.shape[1],
            count=1,
            dtype=Z.dtype,
            crs="+proj=latlong",
            transform=transform,
    ) as new_dataset:
        new_dataset.write(Z, 1)


    with rasterio.open(
            "example2.tif",
            mode="w",
            driver="GTiff",
            height=Z.shape[0],
            width=Z.shape[1],
            count=1,
            dtype=Z.dtype,
            crs="+proj=latlong",
            transform=transform,
    ) as new_dataset:
        new_dataset.write(Z, 1)

    # Use dask distributed and rioxarray to open the rasters
    chunkDimsX = 10
    chunkDimsY = 10
    with Client(threads_per_worker=6, n_workers=1, processes=False) as client:

        with rioxarray.open_rasterio("example.tif", chunks=(chunkDimsX, chunkDimsY), lock=False) as xds:
            
            with rioxarray.open_rasterio("example2.tif", chunks=(chunkDimsX, chunkDimsY), lock=False) as xds2:
                
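                # Concatenate the two rasters along the band dimension and rechunk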
                concatRaster = xr.concat([xds, xds2], dim='band').chunk({'band': 1, 'x': chunkDimsX, 'y': chunkDimsY})
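                # Apply a simple function across every block of the concatenated array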
                concatRaster = concatRaster.map_blocks(lambda x: x + 1, template=concatRaster)
                concatRaster.rio.write_nodata(0, inplace=True)

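                # Write the result to a tiled GeoTIFF; this is the call that raises both errors below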
                concatRaster.rio.to_raster('concat.tif', tiled=True, lock=Lock(client=client), overwrite=True)


if __name__ == '__main__':
    test()

When I run the above, I get the following error message:

Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "<stdin>", line 52, in test
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_array.py", line 1125, in to_raster
    return RasterioWriter(raster_path=raster_path).to_raster(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_writer.py", line 295, in to_raster
    return dask.array.store(
           ^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\array\core.py", line 1236, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\base.py", line 402, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 3275, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 2372, in gather
    return self.sync(
           ^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\lock.py", line 163, in release
    raise ValueError("Lock is not yet acquired")
^^^^^^^^^^^^^^^
ValueError: Lock is not yet acquired

I tried changing the lock argument to True instead of a dask.distributed Lock:

concatRaster.rio.to_raster('concat.tif', tiled=True, lock=True, overwrite=True)

But I get the following error:

2024-03-22 10:21:45,634 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1a0162f93d0>
 0. 1787078771392
>.
Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 68, in dumps
    pickler.dump(x)
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object
Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 68, in dumps
    pickler.dump(x)
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\serialize.py", line 353, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\serialize.py", line 76, in pickle_dumps
    frames[0] = pickle.dumps(
                ^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "<stdin>", line 52, in test
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_array.py", line 1125, in to_raster
    return RasterioWriter(raster_path=raster_path).to_raster(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_writer.py", line 295, in to_raster
    return dask.array.store(
           ^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\array\core.py", line 1236, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\base.py", line 402, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 3255, in get
    futures = self._graph_to_futures(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 3151, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\serialize.py", line 379, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1a0162f93d0>\n 0. 1787078771392\n>')

It looks like the problem is related to the map_blocks call: the unpicklable 'map_blocks.<locals>._wrapper' in the traceback is a local function created inside map_blocks.
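
In case it helps narrow things down, here is a stripped-down sketch that keeps only the map_blocks-with-a-lambda pattern and the same local cluster settings, with an in-memory DataArray standing in for the rasters. I'm assuming this exercises the same graph-serialization path as my real code, but I haven't confirmed that it reproduces either error:

import numpy as np
import xarray as xr
from dask.distributed import Client

def test_map_blocks_only():
    # Synthetic stand-in for the concatenated raster: 2 bands, 100 x 100 pixels,
    # chunked the same way as in the real example.
    data = xr.DataArray(np.zeros((2, 100, 100)), dims=("band", "y", "x"))
    data = data.chunk({"band": 1, "y": 10, "x": 10})

    with Client(threads_per_worker=6, n_workers=1, processes=False):
        # Same map_blocks-with-a-lambda call as in my real code
        mapped = data.map_blocks(lambda x: x + 1, template=data)
        # Force the graph to be sent to the distributed scheduler
        mapped.compute()

if __name__ == '__main__':
    test_map_blocks_only()

If this minimal version runs fine, then I'd guess the lock handling in to_raster is the real culprit rather than map_blocks itself, but I'm not sure.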

I'm fairly inexperienced with dask, so any help deciphering why these errors occur and what I'm doing wrong would be greatly appreciated!
