Performance of GRIB files read with dask and dask.distributed can be improved. #33
I don't know whether this should be flagged as an issue, but I've noticed some weird behaviour when trying to use cfgrib together with dask and dask.distributed. I'm running some benchmarks on GRIB data containing the output of global models with different resolutions (ranging from 80 km to 2.5 km), written with compression. The shape of the arrays that I'm trying to process ranges within (~300 timesteps, ~80 levels, ~8e5-80e6 points). My initial idea was to speed up the processing of the data by parallelising it with dask instead of using a serial approach.

**Serial (not parallel) approach**

```python
import xarray as xr
```

```python
%%time
dss = xr.open_mfdataset('/work/ka1081/DYAMOND/ICON-80km/nwp_R2B05_lkm1008_atm_3d_tot_qc_dia_ml_*.grb',
                        engine='cfgrib')
```
```
CPU times: user 1min 40s, sys: 982 ms, total: 1min 41s
Wall time: 1min 48s
```

```python
%%time
dss.unknown.mean(axis=2).compute()
```
```
CPU times: user 3min 4s, sys: 22.5 s, total: 3min 26s
Wall time: 3min 13s

<xarray.DataArray 'unknown' (time: 321, generalVerticalLayer: 77)>
array([[0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.542062e-06,
        1.236650e-06, 1.581882e-06],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.137736e-06,
        7.532420e-07, 4.611456e-07],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.473110e-06,
        8.896610e-07, 5.851762e-07],
       ...,
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 9.012958e-07,
        5.529808e-07, 4.198118e-07],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.179397e-06,
        7.647039e-07, 6.093065e-07],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.571679e-06,
        1.017117e-06, 8.398871e-07]], dtype=float32)
Coordinates:
    step                  timedelta64[ns] 00:00:00
  * generalVerticalLayer  (generalVerticalLayer) int64 14 15 16 17 ... 88 89 90
  * time                  (time) datetime64[ns] 2016-08-01 ... 2016-09-10
    valid_time            (time) datetime64[ns] 2016-08-01 ... 2016-09-10
```

**Parallel approach**

```python
import json

with open('/work/mh0731/m300382/dask/scheduler.json') as f:
    data = json.load(f)
scheduler_address = data['address']

from dask.distributed import Client, progress
client = Client(scheduler_address)
```

```python
import xarray as xr
```

```python
%%time
dss = xr.open_mfdataset('/work/ka1081/DYAMOND/ICON-80km/nwp_R2B05_lkm1008_atm_3d_tot_qc_dia_ml_*.grb',
                        engine='cfgrib', parallel=True)
```
```
CPU times: user 659 ms, sys: 218 ms, total: 877 ms
Wall time: 11.6 s
```

```python
%%time
dss.unknown.mean(axis=2).compute()
```
```
CPU times: user 275 ms, sys: 23 ms, total: 298 ms
Wall time: 11.4 s
```

The result is identical to the one obtained with the serial approach.

However, when trying to process the data from the 20 km resolution model in the same way, which should still be quite lightweight, the computation hangs and some messages are printed in the logs of the dask workers. This also causes the scheduler to hang and eventually crash. I'm wondering whether it has to do with the fact that the data is also compressed, and that maybe decompressing and parallelising at the same time is just too much, or whether the problem lies somewhere else. I'm testing this on our cluster.
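A possible first step for debugging this kind of hang is to pull the recent worker and scheduler logs back through the client instead of digging for log files on the cluster; a minimal sketch, assuming the client is connected to the same scheduler as above:

```python
from dask.distributed import Client

# Assumes `scheduler_address` was read from scheduler.json as in the snippet above.
client = Client(scheduler_address)

# Recent log lines per worker (dict keyed by worker address) and for the scheduler;
# the messages mentioned above should show up here.
print(client.get_worker_logs())
print(client.get_scheduler_logs())
```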
@guidocioni I didn't know the `parallel=True` option of `open_mfdataset`, thanks for pointing it out. In order to enable chunked (and thus parallel) processing in xarray you also need to pass the `chunks` keyword argument to `open_mfdataset`, so that every variable is backed by a dask array split into pieces smaller than a whole file.
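A minimal sketch of that suggestion applied to the 80 km example above (the chunk sizes here are illustrative, not tuned):

```python
import xarray as xr

# With `chunks`, every variable is wrapped in a dask array, so the reduction
# below is built lazily and split into many tasks that the distributed
# workers can execute in parallel, instead of one task per file.
dss = xr.open_mfdataset(
    '/work/ka1081/DYAMOND/ICON-80km/nwp_R2B05_lkm1008_atm_3d_tot_qc_dia_ml_*.grb',
    engine='cfgrib',
    parallel=True,
    chunks={'time': 1, 'generalVerticalLayer': 11},
)
dss.unknown.mean(axis=2).compute()
```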
Yes, the `chunks` keyword is probably what I was missing. Furthermore, I've also experimented with explicit chunk sizes.
My guess is that for the 20 km run you have 321 files with one time step each. Is it a public dataset that I can see? How many files do you have? What size is every file? How are they structured?
For the 20 km-resolution model the 3D variables are written in the same way as for the 80 km one, with one file per output step. It would be really nice to share these files to test the parallel reading directly.
It looks like it is a memory issue... By refining the chunks I was able to run the computation for a while before the job crashed due to limited memory. Maybe I just have to refine the way I ask for resources on the cluster side. Thus, I don't think this is strictly a cfgrib problem.
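In case the resource request on the cluster side is indeed the culprit, a minimal sketch of how one might bound memory per worker while experimenting locally (the worker count and limit below are hypothetical, not taken from the setup above):

```python
from dask.distributed import Client, LocalCluster

# One single-threaded process per worker with a hard memory budget makes it
# easier to see whether decoding many compressed GRIB chunks at once is what
# pushes the workers over their memory limit.
cluster = LocalCluster(n_workers=8, threads_per_worker=1, memory_limit='8GB')
client = Client(cluster)
```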
Small update... apparently the same approach, using `dask.array` directly on files that are not read through cfgrib, works fine:

```python
import xarray as xr
from glob import glob
import dask.array as da

chunk_space = 1114111
chunk_time = 50

main_folder = '/work/bm0834/k203095/ICON_LEM_DE/'
file_prefix = '2d_cloud_day'
domain = 'DOM01'
folders = ['20130620-default-ifs']
variable = 'rain_gsp_rate'

filenames = []
for folder in folders:
    filenames = filenames + sorted(glob(main_folder + folder + '/DATA/' + file_prefix + '_' + domain + '*'))

# wrap every file's variable in a dask array with explicit chunks
temps = [xr.open_dataset(fn).variables[variable] for fn in filenames]
arrays = [da.from_array(t, chunks=(chunk_time, chunk_space)) for t in temps]
var_normal = da.concatenate(arrays, axis=0)
mean = var_normal.mean(axis=1)
mean = mean.persist()
```

This completes in about 1 minute, since the dataset is not that big (1080 timesteps, 2323968 points, which equals about 2.5e9 individual values in total). However, if I just try to use the same approach with the GRIB files read through cfgrib:

```python
chunk_space = 1000000
chunk_levels = 77
chunk_time = 10

main_folder = '/work/ka1081/DYAMOND/ICON-80km/'
variables = ['u']

filenames = []
for variable in variables:
    file_prefix = 'nwp_R2B05_lkm1008_atm_3d_' + variable + '_ml_'
    filenames = filenames + sorted(glob(main_folder + file_prefix + '*.grb'))

temps = [xr.open_dataset(fn, engine='cfgrib').variables[variable] for fn in filenames[0:-2]]
arrays = [da.from_array(t, chunks=(chunk_time, chunk_levels, chunk_space)) for t in temps]
var_normal = da.concatenate(arrays, axis=0)
mean = var_normal.mean(axis=2)
mean = mean.persist()
```

the same problems appear again, even though I'm using exactly the same session with the same dask.distributed cluster. Honestly I don't know where the problem is... with this dataset the individual chunks of data should be so small that they cannot create any problem. I'm attaching the LOG file from the workers.
The log file contains the following errors from ecCodes:
which is probably what causes the problems, but also:
so we don't see the messages from cfgrib itself. I'm not sure how to enable cfgrib logging in the dask workers, so I cannot debug more than this at the moment. Anyway, I'll be working on performance, and especially on performance under dask.distributed, early next year.
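One possible way to surface cfgrib's own messages from the workers (a sketch; it assumes cfgrib logs through the standard `logging` module and reuses the `scheduler_address` from the first comment):

```python
import logging
from dask.distributed import Client

def enable_debug_logging():
    # Runs inside each worker process: emit DEBUG-level records, including
    # those from the cfgrib logger, so they end up in the worker logs.
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('cfgrib').setLevel(logging.DEBUG)

client = Client(scheduler_address)
client.run(enable_debug_logging)  # execute on every worker
```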
Hi @alexamici, I am trying to open a 320 MB file on an HPC system where bandwidth is not an issue for a file of this size. Forming the index takes a really long time considering the size of the file: the time scale is in minutes, not seconds, around 7 minutes to open a 320 MB file. It's much slower than Nio, which also has to create an index from scratch. In my use case, pre-generating index files is not really possible, and anyway it would still be computationally expensive to do it even just once. It's a heterogeneous file with many variables and many index keys.

It seems like the specific `getitem` calls are a large part of the indexing cost, so perhaps there are some gains to be made here before diving into the dask optimization. See the profile (incidentally from dask) below: the largest chunk of the indexing time is spent in `getitem`. I am currently using the `cfgrib.open_datasets` interface and doing the dask multi-file handling myself because I have heterogeneous files. Before profiling I thought it was the merging and sorting that cost the time, but it's really just indexing the files.

Regarding the dask optimization: my experience is that creating single-processor clients yields the best performance. This is probably due to the classic Python multithreading issue of not releasing the GIL. A solution to that would also make life a lot easier down the road when you want to use the data and workers for a multithreaded process, like training a neural net.

Perhaps this is helpful? Thanks for taking the time to read. :) I would love to hear any suggestions to improve the performance from the outside, and I would also be very excited to hear if you manage to find a way of getting some improvements.
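For reference, a minimal sketch of the `cfgrib.open_datasets` workflow mentioned above (the file name is hypothetical): it splits a heterogeneous GRIB file into a list of homogeneous datasets, which can then be chunked and combined by hand.

```python
import cfgrib

# One xarray.Dataset per group of compatible messages in the file.
datasets = cfgrib.open_datasets('heterogeneous_forecast.grib')
for ds in datasets:
    print(list(ds.data_vars), dict(ds.sizes))
```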
Chiming in again as I gave dask another go these days :) Just as a test I wanted to compute yearly averages from ERA5 monthly data. I have 2 GRIB files with different time periods, downloaded from the Copernicus hub.

```python
import xarray as xr
from dask.distributed import Client

client = Client(n_workers=4)

ds = xr.open_mfdataset('Downloads/adaptor.mars.internal-*.grib',
                       chunks={"time": 10},
                       parallel=True)

results = ds.resample({'time': '1Y'}).mean().compute()
```

This takes about 40 seconds, and the average CPU usage (I have a 6-core i7-8700) sits at about 60-70%. The same operation with cdo

```
(base) guidocioni@Guidos-iMac:~/Downloads$ cdo yearavg -mergetime adaptor.mars.internal-*.grib test.grib
cdo(1) mergetime: Process started
cgribexGetTsteptype: Time range indicator 133 unsupported, set to 0!
cdo(1) mergetime: Processed 5601830400 values from 2 variables over 864 timesteps.
cdo yearavg: Processed 5601830400 values from 1 variable over 935 timesteps [36.85s 262MB].
```

takes about 36 seconds with a CPU usage of just 10%. Is this huge difference in performance really related only to the fact that CDO is highly optimized and written directly in C? To be fair, I think…
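One way to see where those ~40 seconds go (a sketch; it assumes the `ds` from the snippet above is still open):

```python
from dask.distributed import performance_report

# The HTML report breaks the computation down per task, which helps separate
# the cost of decoding the GRIB messages from the resample/mean itself and
# from scheduler overhead when comparing against cdo.
with performance_report(filename='era5-yearmean-report.html'):
    results = ds.resample({'time': '1Y'}).mean().compute()
```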
Support for dask and dask.distributed is currently quite good from the point of view of correctness: we are able to perform complex operations on 320 GB of data from 20 files on a dask.distributed cluster of 10 VMs with 8 cores each, with no problem.
Performance is not stellar and more work is needed in this respect.