Chunk data and parallelize computation#
TL;DR#
icclim makes use of dask to chunk and parallelize computations. You can configure dask to limit its memory footprint and CPU usage by instantiating a distributed Cluster and by tuning dask.config options. A configuration that works well for small to medium datasets and simple climate indices could be:
import dask
import icclim
from distributed import Client

client = Client(memory_limit="10GB", n_workers=1, threads_per_worker=8)
dask.config.set({"array.slicing.split_large_chunks": False})
dask.config.set({"array.chunk-size": "100 MB"})
icclim.index(in_files="data.nc", index_name="SU", out_file="output.nc")
The art of chunking#
import xarray
import icclim
ds = xarray.open_dataset("data.nc")
ds = ds.chunk({"time": 10, "lat": 20, "lon": 20})
And then use the ds dataset object as input for icclim:
icclim.index(in_files=ds, index_name="SU", out_file="output.nc")
In that case, icclim will not re-chunk the ds data. It is left to you to find the proper chunking. For more information on how to properly chunk, see:
xarray guide: https://xarray.pydata.org/en/stable/user-guide/dask.html#chunking-and-performance
dask best practices: https://docs.dask.org/en/stable/array-best-practices.html
Another option is to let dask find the best chunking for each dimension. This is the recommended way and the default behavior of icclim when a file path is given in the in_files parameter. In this case, chunking can still be controlled by limiting the size of each individual chunk:
import dask
import icclim

dask.config.set({"array.chunk-size": "50 MB"})
icclim.su(in_files="data.nc")
By default, the dask chunk-size is around 100MB. You can also use the with python keyword if you don't want this configuration to spread globally (see the sketch below). Internally, icclim will ask xarray and dask to chunk using the "auto" configuration. This usually results in a pretty good chunking: it respects as much as possible how the data is stored and makes chunks approximately as large as the configured chunk-size.
Moreover, some operations in icclim/xclim need to re-chunk intermediary results. We usually try to keep the chunk sizes at their initial value, but re-chunking is costly and we sometimes prefer to generate larger chunks to try to improve performance. If you wish to avoid this large chunking behavior, you can try the following dask configuration:
dask.config.set({"array.slicing.split_large_chunks": True})
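As a minimal sketch of the with usage mentioned above, scoping the chunk-size configuration to a single block instead of setting it globally:
import dask
import icclim

# The configuration only applies inside this block and does not spread globally.
with dask.config.set({"array.chunk-size": "50 MB"}):
    icclim.su(in_files="data.nc")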
Create an optimized chunking on disk#
Sometimes, you have to work with data that was originally chunked and stored in a way that is suboptimal. Often, climate data are stored with one year per file, thus the natural chunking of the dataset will be one year per chunk. The most efficient way to read data from disk is to chunk data in memory the same way it is distributed on disk. Here it means having one chunk per file, as long as a file's size does not exceed the array.chunk-size configuration.
This scattering of the time axis into many chunks can limit the computation performance of some indices. Indeed, some indices, such as percentile based indices, require a specific chunking schema to be computed. This means data must be re-chunked in memory to obtain an optimized chunking. However, this generates many dask tasks and can overload the dask scheduler.
To tackle this issue, icclim 5.1.0 comes with a new feature to first rewrite the data on disk before starting any computation. We rely on the rechunker library to make this possible. The feature is icclim.create_optimized_zarr_store. It is a context manager which allows you to rewrite input data into a zarr store. Zarr stores are a modern way to store files on disk, designed with optimized reading and writing in mind. In our case, it allows rewriting files with a specific chunking schema, optimized for climate indices computation.
Now, depending on the climate index you want to compute, the optimal chunking schema might differ.
For most indices, if you consider chunking on the time dimension, you should never chunk below the target resampling period. For example, with slice_mode="month", ideally each chunk should include whole months and never split in the middle of a month. But months being of variable length, it is often much easier to have one chunk per year. Leap years add yet another difficulty.
However, for indices where a bootstrapping of percentiles is necessary (e.g. Tg90p), it is actually optimal to have no chunk at all on the time dimension. This is true only because the bootstrapping algorithm relies on map_blocks.
In that case, you can use icclim.create_optimized_zarr_store to first create a zarr store with no chunking at all on the time dimension:
import datetime
import icclim

ref_period = [datetime.datetime(1980, 1, 1), datetime.datetime(2009, 12, 31)]

with icclim.create_optimized_zarr_store(
    in_files="netcdf_files/tas.nc",
    var_names="tas",
    target_zarr_store_name="opti.zarr",
    keep_target_store=False,
    chunking={"time": -1, "lat": "auto", "lon": "auto"},
) as opti_tas:
    icclim.index(
        index_name="TG90p",
        in_files=opti_tas,
        slice_mode="YS",
        base_period_time_range=ref_period,
        out_file="netcdf_files/output/tg90p.nc",
    )
Actually, chunking={"time": -1, "lat": "auto", "lon": "auto"}, which avoids chunking on time, is the default behavior of the function. The chunking parameter could thus be omitted in the above example.
You can also control whether to keep the optimized zarr store on disk by turning keep_target_store to True. This can be useful if you wish to compute other indices using the same chunking.
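For illustration, here is a minimal sketch reusing the example above; it assumes the yielded opti_tas can simply be passed to icclim.index for several indices within the same context:
import datetime
import icclim

ref_period = [datetime.datetime(1980, 1, 1), datetime.datetime(2009, 12, 31)]

# keep_target_store=True: "opti.zarr" stays on disk after the context exits,
# so the costly rewriting of the input data is done only once.
with icclim.create_optimized_zarr_store(
    in_files="netcdf_files/tas.nc",
    var_names="tas",
    target_zarr_store_name="opti.zarr",
    keep_target_store=True,
) as opti_tas:
    # Compute several percentile based indices from the same optimized store.
    for index in ["TG90p", "TG10p"]:
        icclim.index(
            index_name=index,
            in_files=opti_tas,
            slice_mode="YS",
            base_period_time_range=ref_period,
            out_file=f"netcdf_files/output/{index.lower()}.nc",
        )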
On performance#
Computation of ECA&D indices can largely be done in parallel on the spatial dimensions. Indeed, the ECA&D indices available in icclim are all computed on each individual pixel independently. In an ideal world, it means we could compute each pixel concurrently. In reality, chunking data that finely would require considerable effort and would be sub-optimal, because the smaller the chunks are, the greater the dask overhead is.
Note
By overhead, we mean here the python code that must run to move around and handle each independent chunk.
Some indices also resample data on the time dimension; the resampling period is controlled by the slice_mode parameter. Hence, there are multiple things to consider to maximize performance.
For a small dataset, you can simply open it with xarray without chunking it and hand it to icclim:
import xarray
import icclim

ds = xarray.open_dataset("data.nc")
icclim.index(in_files=ds, index_name="SU", out_file="output.nc")
There will be no parallelization but, on small datasets, it's unnecessary: numpy is natively really fast and dask's overhead may actually slow things down.
On the other hand, when using dask we must:
Minimize the number of tasks to speed things up, thus divide data into large enough chunks.
Minimize the workload of each worker to avoid i/o operations, thus divide data into small enough chunks.
In the following, we present a few possible configurations for dask.
Small to medium dataset (a few MB) - No configuration#
The first approach is to use the default values. By default, icclim relies on dask's "auto" chunking and dask runs with the threading scheduler. This scheduler runs everything in the existing python process and spawns multiple threads to concurrently compute climate indices. You can find more information on the default scheduler here:
https://docs.dask.org/en/stable/scheduling.html#local-threads
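For instance, computing a simple index with no dask configuration at all could look like this (data.nc being a placeholder file name):
import icclim

# No cluster, no chunking configuration: icclim chunks with "auto" and the
# computation runs on the default threaded scheduler.
icclim.su(in_files="data.nc", out_file="su.nc")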
Note
To control the “in base” period, icclim.index provides the base_period_time_range parameter. To control the “out of base” period, icclim.index provides the time_range parameter.
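For example, a sketch of a TG90p computation constraining both periods (the dates and file names are placeholders):
import datetime
import icclim

# "In base" period: the reference period over which percentiles are computed.
base_period = [datetime.datetime(1961, 1, 1), datetime.datetime(1990, 12, 31)]
# "Out of base" period: the period on which the index itself is computed.
study_period = [datetime.datetime(1991, 1, 1), datetime.datetime(2020, 12, 31)]

icclim.index(
    index_name="TG90p",
    in_files="data.nc",
    slice_mode="YS",
    base_period_time_range=base_period,
    time_range=study_period,
    out_file="tg90p.nc",
)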
For percentile based indices, which are more demanding to compute, we recommend using one of the following configurations.
Medium to large dataset (~200MB) - dask LocalCluster#
By default, dask will run on the default threaded scheduler. This behavior can be overridden by creating your own “cluster” running locally on your machine. This LocalCluster is distributed in a separate dask package called “distributed”, which is not a mandatory dependency of icclim.
To install it run:
$ conda install dask distributed -c conda-forge
See the documentation for more details: http://distributed.dask.org/en/stable/
Once installed, you can delegate the LocalCluster instantiation to the distributed.Client class. This Client object creates both a LocalCluster and a web application to investigate how your computation is going. This web dashboard is very powerful: it helps to understand where the computation bottlenecks are and to visualize how dask is working. By default it runs on localhost:8787; you can print the client object to see on which port it runs.
from distributed import Client
client = Client()
print(client)
By default, dask creates a LocalCluster with 1 worker (process), as many threads as there are CPUs, and a memory limit up to the available system memory.
Note
You can see how dask counts CPU here: dask/dask
How dask measures available memory here: dask/distributed
Depending on your OS, these values are not exactly computed the same way.
The cluster can be configured directly through Client arguments.
client = Client(memory_limit="16GB", n_workers=1, threads_per_worker=8)
A few notes:
The Client must be started in the same python interpreter as the computation. This is how dask knows which scheduler to use.
If needed, the LocalCluster can be started independently and the Client connected to the running LocalCluster. See: http://distributed.dask.org/en/stable/client.html
Each worker is an independent python process and memory_limit is set for each of these processes. So, if you have 16GB of RAM, don't set memory_limit='16GB' unless you run a single worker.
Memory sharing is much more efficient between threads than between processes (workers), see the dask doc.
On a single worker, a good number of threads is usually a multiple of your CPU core count (typically ×2).
All threads of the same worker are idle whenever one of the threads is reading from or writing to disk.
It's useless to spawn too many threads: there are hardware limits on how many can run concurrently and, if they are too numerous, the OS will waste time orchestrating them.
A dask worker may write some of its data to disk even if the memory limit is not reached. This seems to be a normal behavior happening when dask knows some intermediary results will not be used soon. However, this can significantly slow down the computation due to i/o.
Percentile based indices may need up to nb_threads * chunk_size * 30 of memory, which is unusually high for a dask application (see the rough estimate below). We are trying to reduce this memory footprint, but it would mean making some costly re-chunking in the middle of the computation.
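As a rough illustration of that last heuristic (the factor of 30 is the value quoted above, not a guarantee):
# Rough estimate of the peak memory a percentile based index may need,
# following the nb_threads * chunk_size * 30 heuristic above.
nb_threads = 8
chunk_size_mb = 100  # dask's default chunk-size is around 100 MB
estimated_peak_mb = nb_threads * chunk_size_mb * 30
print(f"~{estimated_peak_mb / 1000:.0f} GB of memory may be needed")  # ~24 GB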
Knowing all this, we can consider a few scenarios.
Low memory footprint#
Let's suppose you want to compute indices on your laptop while continuing to work on other things. You should configure your local cluster to use only a few threads and processes, and to limit the amount of memory each process (worker) has available. On my 4 core, 16GB of RAM laptop, I would consider:
client = Client(memory_limit="10GB", n_workers=1, threads_per_worker=4)
Optionally, to reduce the amount of i/o on disk, we can also increase dask's memory thresholds:
dask.config.set({"distributed.worker.memory.target": "0.8"})
dask.config.set({"distributed.worker.memory.spill": "0.9"})
dask.config.set({"distributed.worker.memory.pause": "0.95"})
dask.config.set({"distributed.worker.memory.terminate": "0.98"})
These thresholds are fractions of memory_limit that dask uses to take decisions.
At 80% of memory, the worker will write its unmanaged memory to disk.
At 90%, the worker will write all of its memory to disk.
At 95%, the worker pauses computation to focus on writing to disk.
At 98%, the worker is killed to avoid reaching the memory limit.
Increasing these thresholds is risky. The memory could fill up quicker than expected, resulting in a killed worker and thus losing all the work done by this worker. If a single worker is running and it is killed, the whole computation will be restarted (and will likely reach the same memory limit).
High resource use#
If you want the result as quickly as possible, it's a good idea to give dask all the resources available. This may render your computer "laggy" though. On my 4 core (8 CPU threads), 16GB of RAM laptop, I would consider:
client = Client(memory_limit="16GB", n_workers=1, threads_per_worker=8)
With this kind of configuration, it can be useful to add 1 or 2 workers in case a lot of i/o is necessary. If there are multiple workers, memory_limit should be reduced accordingly. It can also be necessary to reduce the chunk size: dask's default value is around 100 MB per chunk, which on some complex indices may result in a large memory usage.
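For example, a sketch of such a variant on the same 16GB machine, splitting the memory budget across two workers and reducing the chunk size:
import dask
from distributed import Client

# Two workers: each gets its own memory_limit, so the total budget is split.
client = Client(memory_limit="8GB", n_workers=2, threads_per_worker=4)
# Smaller chunks to compensate for the extra memory used by complex indices.
dask.config.set({"array.chunk-size": "50 MB"})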
It’s over 9000!#
This configuration may bring your computer to its knees, use it at your own risk. The idea is to bypass all the memory safety implemented by dask. This may yield very good performance because there will be no i/o on disk from dask itself. However, when your OS runs out of RAM, it will use your disk swap, which is somewhat similar to dask's spilling mechanism but probably much slower. And if you run out of swap, your computer will likely crash. To roll the dice, set memory_limit='0' in the Client:
client = Client(memory_limit="0")
Dask will spawn a worker with multiple threads and without any memory limit.
Large to huge dataset (1GB and above)#
If you wish to compute climate indices on a large dataset, a personal computer may not be appropriate. In that case, you can deploy a real dask cluster, as opposed to the LocalCluster seen before. You can find more information on how to deploy a dask cluster here: https://docs.dask.org/en/stable/scheduling.html#dask-distributed-cluster
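Once such a cluster is deployed, connecting to it is a matter of pointing the Client at the scheduler address (the address below is a placeholder):
from distributed import Client

# Connect to an already running dask scheduler instead of creating a LocalCluster.
client = Client("tcp://192.168.1.10:8786")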
However, if you must run your computation on limited resources, you can try to:
Use only one or two threads on a single worker. This will drastically slow down the computation, but very few chunks will be in memory at once, letting you use quite large chunks.
Use a small chunk size, but beware: the smaller the chunks are, the more tasks dask creates and thus the more complex the dask graph becomes.
Rechunk your dataset into a zarr store to optimize file reading and reduce the amount of rechunking tasks needed by dask. For this, you should consider the Pangeo rechunker library to ease the process: https://rechunker.readthedocs.io/en/latest/ A shorthand to Pangeo rechunker is available in icclim with icclim.create_optimized_zarr_store.
Split your data into smaller netcdf inputs and run the computation multiple times.
The last point is the most frustrating option because chunking is supposed to do exactly that. But sometimes it can be easier to chunk "by hand" than to find the exact configuration that fits the input dataset.
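As a sketch of that "by hand" approach, splitting the domain into latitude bands (the file names and the SU index are illustrative):
import xarray
import icclim

ds = xarray.open_dataset("big_data.nc")
# Compute the index on successive latitude bands instead of the whole domain.
band_size = 100  # number of latitude rows per run
for i, start in enumerate(range(0, ds.sizes["lat"], band_size)):
    sub_ds = ds.isel(lat=slice(start, start + band_size))
    icclim.index(in_files=sub_ds, index_name="SU", out_file=f"su_band_{i}.nc")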
Real example#
On CMIP6 data, when computing the percentile based index Tx90p over 20 years, with bootstrapping on 19 years, we use:
import dask
from distributed import Client

client = Client(memory_limit="16GB", n_workers=1, threads_per_worker=2)
dask.config.set({"array.slicing.split_large_chunks": False})
dask.config.set({"array.chunk-size": "100 MB"})
dask.config.set({"distributed.worker.memory.target": "0.8"})
dask.config.set({"distributed.worker.memory.spill": "0.9"})
dask.config.set({"distributed.worker.memory.pause": "0.95"})
dask.config.set({"distributed.worker.memory.terminate": "0.98"})
Troubleshooting and dashboard analysis#
This section describes common warnings and errors that dask can raise, as well as some silent issues that only the dask dashboard can expose. To start the dashboard, instantiate the distributed Client(...). The dashboard should then be available on localhost:8787 as a web application (type "localhost:8787" in your browser address bar to access it).
Memory overload#
The warning may be "distributed.nanny - WARNING - Restarting worker" or the error "KilledWorker". This means the computation uses more memory than what is available for the worker. Keep in mind that:
The memory_limit parameter is a limit set for each individual worker.
Some indices, such as percentile based indices (R__p, R__pTOT, T_90p, T_10p families), may use a large amount of memory. This is especially true for temperature based indices where percentiles are bootstrapped.
You can reduce the memory footprint by using smaller chunks.
Each thread may load multiple chunks in memory at once.
You can increase the total available memory with Client(memory_limit="16GB").
You can reduce the number of threads with Client(n_workers=1, threads_per_worker=1). This may slow down the computation.
You can reduce the chunk size with dask.config.set({"array.chunk-size": "50 MB"}); the default is around 100MB.
Garbage collection “wasting” CPU time#
The warnings may look like:
distributed.utils_perf - WARNING - full garbage collections took xx% CPU time recently (threshold: 10%)
distributed.worker - WARNING - gc.collect() took 1.259s. This is usually a sign that some tasks handle too many Python objects at the same time. Rechunking the work into smaller tasks might help.
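As the second warning itself suggests, reducing the chunk size so that each task handles less data is one thing to try:
import dask

# Smaller chunks produce smaller tasks, each handling fewer objects at once.
dask.config.set({"array.chunk-size": "50 MB"})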
Internal re-chunking#
The following warning may show up:
PerformanceWarning: Increasing number of chunks by factor of xx.
Computation never starts#
This may show up as a CancelledError. You can try to increase the chunk size with dask.config.set({"array.chunk-size": "200 MB"}) or to reduce the number of threads with Client(n_workers=1, threads_per_worker=2).
Note
Beware, if the computation is fast or if the client is not started in the same python process as icclim, the dashboard may also look empty but the computation is actually running.
Disk read and write analysis - Dashboard#
The dashboard, available on localhost:8787, also shows when workers read from or write (spill) to disk. If this happens a lot:
You can increase the total available memory with Client(memory_limit="16GB").
You can decrease memory pressure by reducing the chunk size with dask.config.set({"array.chunk-size": "50 MB"}) or by reducing the number of threads with Client(n_workers=1, threads_per_worker=2).
Note
Don't instantiate multiple clients with different configurations; put everything in the same Client constructor call.
Beware, as of icclim 5.0.0, the bootstrapping of percentiles is known to produce a lot of i/o.
Worker chatterbox syndrome - Dashboard#
This can happen when:
Too many workers are spawned for the amount of work.
The load balancer has a lot of work to do.
It can notably be the case with the calc_perc function used to compute percentiles.
Idle threads#
If the dashboard shows that threads are often idle, you can reduce the number of threads per worker, for example with Client(n_workers=1, threads_per_worker=4).
Conclusion#
Note
This document has been rewritten in January 2022 and the whole stack under icclim is evolving rapidly.
Some information presented here might become outdated quickly.