We describe an approach for efficiently embedding non-Dask algorithms into a Dask processing pipeline. By constructing a large contiguous in-memory array incrementally from a Dask graph, we were able to achieve significant reductions in peak memory usage. We have used this approach to generate cloud-free Sentinel-2 Geometric Median and Median Absolute Deviations mosaics over Africa at 10 m resolution.
We have used Dask to compute cloud-free Sentinel-2 Geometric Median and Median Absolute Deviations mosaics over the whole continent of Africa. The geomedian product combines measurements collected over each year to produce one representative, multispectral measurement for every 10 x 10 metre pixel of the African continent.
Geomedian is an iterative algorithm and is not Dask "native": it requires all the pixels across all the bands and time slices to be present in memory at once for reasonable performance. The data loading side of the problem, however, maps nicely onto the Dask processing model. Loading image data from S3, reconciling duplicated observations, removing pixels classified as cloud, and finally changing projection are all implemented as lazy Dask transforms. There is significant data parallelism in the data loading section.
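As a rough illustration of what such lazy transforms look like, the sketch below expresses a cloud-masking step as a Dask array expression. The shapes, chunk sizes and the random placeholder loaders are assumptions standing in for the real S3 readers and cloud classification band, which are not shown here.

```python
import dask.array as da
import numpy as np

# Placeholder inputs: in the real pipeline these would wrap per-tile S3 reads
# (e.g. via dask.delayed); random data keeps the sketch self-contained.
# Shapes and chunk sizes are illustrative only.
shape, chunks = (12, 4096, 4096), (1, 1024, 1024)               # (time, y, x)
band = da.random.random(size=shape, chunks=chunks).astype("float32")
cloud_mask = da.random.random(size=shape, chunks=chunks) < 0.2  # True where cloudy

# Removing pixels classified as cloud is just another lazy transform:
# cloudy pixels are replaced with NaN, nothing is computed yet.
clean = da.where(cloud_mask, np.float32(np.nan), band)

# Further steps (de-duplication of observations, reprojection) chain on in the
# same way, so the whole data-loading graph stays lazy and data-parallel.
```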
We faced significant challenges with memory usage early on in the project. Peak memory consumption was highly unpredictable, as it depended on scheduling decisions, which were themselves hard to predict or control. We solved this problem by implementing an in-memory data store that we populate with pixel data using the dask.array.store method. Essentially, this allows us to perform transpose and re-chunk operations with much greater efficiency and with significantly more predictable scheduling and peak memory usage. The primary limitation of this approach is that all the data needs to be produced on, or copied to, one large-memory worker. This limitation was not a concern for us, as we were using a single local Dask worker anyway. Concurrency across multiple compute nodes is achieved by running many single-worker Dask tasks concurrently using Kubernetes Jobs.
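The sketch below shows the general shape of this trick rather than the production code: a lazy Dask array (standing in for the cloud-masked load graph) is streamed with dask.array.store into a pre-allocated NumPy array whose axis order matches what the geomedian code expects. Shapes, axis orders and chunking are assumptions for illustration, and the example relies on the default threaded scheduler so that the destination buffer is shared memory within a single process, matching the single-worker setup described above.

```python
import numpy as np
import dask.array as da

# Lazy source standing in for the cloud-masked, reprojected pixel data with
# axes (time, band, y, x); shapes and chunking are illustrative only.
src = da.random.random((12, 4, 2048, 2048), chunks=(1, 1, 512, 512)).astype("float32")

# Pre-allocate one large contiguous destination array, laid out the way a
# pixel-wise geomedian implementation wants it: all time slices and bands for
# a given pixel are adjacent in memory, i.e. axes (y, x, band, time).
dst = np.empty((2048, 2048, 4, 12), dtype="float32")

# Stream chunks from the Dask graph straight into the destination buffer.
# The transpose/re-chunk becomes a series of in-place region writes, so peak
# memory stays close to the size of `dst` plus a handful of in-flight chunks.
# The threaded scheduler keeps every write in the same process address space.
da.store(src.transpose(2, 3, 1, 0), dst, scheduler="threads")

# `dst` can now be handed to the non-Dask geomedian implementation directly.
```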