Skip to main content

Ingest and Query ECMWF IFS Forecast

This demo notebook shows how to ingest data from the ECMWF IFS forecast model into Arraylake. We then query the data via Python and via Flux APIs.

Define Download Functions

These functions download GRIB files and open them as a single Xarray dataset for each forecast.

import arraylake
import pandas as pd
import xarray as xr
import s3fs
import tempfile
from datetime import datetime, timedelta
from tqdm import tqdm
from dask.diagnostics import ProgressBar
from concurrent.futures import ThreadPoolExecutor, as_completed


def make_url(forecast_time: datetime, step_hour: int) -> str:
date_str = forecast_time.strftime("%Y%m%d")
z_str = forecast_time.strftime("%H")
time_str = forecast_time.strftime("%H%M%S")
step_str = f"{step_hour}h"
return f"s3://ecmwf-forecasts/{date_str}/{z_str}z/ifs/0p25/oper/{date_str}{time_str}-{step_str}-oper-fc.grib2"


def get_step(forecast_time: datetime, step_hour: int) -> xr.Dataset:
s3 = s3fs.S3FileSystem()
url = make_url(forecast_time, step_hour)
# print(f"downloading {url}")
with tempfile.NamedTemporaryFile() as tf:
try:
s3.download(url, tf.name)
except FileNotFoundError as e:
raise FileNotFoundError(url) from e
ds = xr.open_dataset(
tf.name,
engine="cfgrib",
decode_timedelta=True,
backend_kwargs={"filter_by_keys": {"typeOfLevel": "surface"}},
)
ds = ds.expand_dims(("time", "step")) # triggers loading
ds = ds.drop_vars("valid_time")
ds = ds[set(ds) - {"sdor", "slor", "z"}] # drop variables not present in each file
ds = ds.chunk()
ds.time.attrs["axis"] = "T"
ds.latitude.attrs["axis"] = "Y"
ds.longitude.attrs["axis"] = "X"
return ds


def get_dataset(forecast_date: datetime, hours: list[int]) -> xr.Dataset:
with ThreadPoolExecutor() as executor:
all_futures = [executor.submit(get_step, forecast_date, hour) for hour in hours]
datasets = [
fut.result()
for fut in tqdm(as_completed(all_futures), total=len(all_futures))
]
datasets.sort(key=lambda ds: ds.step.values)
ds = xr.concat(datasets, dim="step")
ds = ds.chunk({"step": 5, "latitude": 240, "longitude": 360})
return ds

Create a New Arraylake Repo

client = arraylake.Client()
client
Output
Client(service_uri='https://api.earthmover.io')
repo = client.create_repo("earthmover-demos/ecmwf-ifs-oper")
repo
Output
<icechunk.repository.Repository at 0x7fdb24913140>

Download Data

forecast_date = datetime(2025, 1, 1, 0, 0, 0)

# these are the hours when forecasts are available
hours = list(range(0, 144 + 1, 3)) # + list(range(150, 360+1, 6))
ds = get_dataset(forecast_date, hours)
print(ds)
Output
100%|██████████| 49/49 [00:29<00:00,  1.65it/s]
Output
<xarray.Dataset> Size: 3GB
Dimensions: (time: 1, step: 49, latitude: 721, longitude: 1440)
Coordinates:
* time (time) datetime64[ns] 8B 2025-01-01
* step (step) timedelta64[ns] 392B 0 days 00:00:00 ... 6 days 00:00:00
surface float64 8B 0.0
* latitude (latitude) float64 6kB 90.0 89.75 89.5 ... -89.5 -89.75 -90.0
* longitude (longitude) float64 12kB -180.0 -179.8 -179.5 ... 179.5 179.8
Data variables: (12/15)
strd (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
skt (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
nsss (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
str (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
ssrd (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
ptype (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
... ...
lsm (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
asn (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
ssr (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
sp (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
ewss (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
ro (time, step, latitude, longitude) float32 203MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
Attributes:
GRIB_edition: 2
GRIB_centre: ecmf
GRIB_centreDescription: European Centre for Medium-Range Weather Forecasts
GRIB_subCentre: 0
Conventions: CF-1.7
institution: European Centre for Medium-Range Weather Forecasts
history: 2025-07-15T18:43 GRIB to CDM+CF via cfgrib-0.9.1...

Ingest to Arraylake

with repo.transaction("main", message="wrote initial dataset") as store:
with ProgressBar():
ds.to_zarr(store, zarr_format=3, consolidated=False)
Output
[########################################] | 100% Completed | 13.59 s

The repo is now visible in the Arraylake web app at https://app.earthmover.io/earthmover-demos/ecmwf-ifs-oper

Download and Append Another Forecast

ds = get_dataset(datetime(2025, 1, 2, 0, 0, 0), hours)
with repo.transaction("main", message="appended a forecast") as store:
with ProgressBar():
ds.to_zarr(
store, zarr_format=3, consolidated=False, mode="a", append_dim="time"
)
Output
100%|██████████| 49/49 [00:24<00:00,  1.97it/s]
Output
[########################################] | 100% Completed | 13.64 ss

Query with Python

We can now easily query back our data at lightning speed.

ds_al = xr.open_zarr(
repo.readonly_session("main").store, decode_timedelta=True, consolidated=False
)
print(ds_al)
Output
<xarray.Dataset> Size: 6GB
Dimensions: (time: 2, step: 49, latitude: 721, longitude: 1440)
Coordinates:
* latitude (latitude) float64 6kB 90.0 89.75 89.5 ... -89.5 -89.75 -90.0
* longitude (longitude) float64 12kB -180.0 -179.8 -179.5 ... 179.5 179.8
* step (step) timedelta64[ns] 392B 0 days 00:00:00 ... 6 days 00:00:00
surface float64 8B ...
* time (time) datetime64[ns] 16B 2025-01-01 2025-01-02
Data variables: (12/15)
skt (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
ptype (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
nsss (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
sp (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
ewss (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
ro (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
... ...
lsm (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
ssrd (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
tp (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
tprate (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
strd (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
unknown (time, step, latitude, longitude) float32 407MB dask.array<chunksize=(1, 5, 240, 360), meta=np.ndarray>
Attributes:
GRIB_edition: 2
GRIB_centre: ecmf
GRIB_centreDescription: European Centre for Medium-Range Weather Forecasts
GRIB_subCentre: 0
Conventions: CF-1.7
institution: European Centre for Medium-Range Weather Forecasts
history: 2025-07-15T18:43 GRIB to CDM+CF via cfgrib-0.9.1...
ds_al.sel(time="2025-01-01", step=timedelta(hours=6)).tprate.plot(robust=True)
Output
<matplotlib.collections.QuadMesh at 0x7fd988799f40>

png

ds_al.sp.sel(time="2025-01-01", longitude=-50, latitude=25, method="nearest").plot()
Output
[<matplotlib.lines.Line2D at 0x7fd9803f0200>]

png

Query with Flux

We can also easily view our data with Flux APIs.

from ipyleaflet import Map, WMSLayer

url = (
"https://compute.earthmover.io/v1/services/wms/earthmover-demos/ecmwf-ifs-oper/main/wms?"
"version=1.3.0&service=WMS&request=GetMap&crs=EPSG:3857&time=2025-01-01T00:00:00Z&step=12+hr&colorscalerange=0,0.001"
)
m = Map(center=(52, 360), zoom=2)
wms = WMSLayer(
url=url,
layers="tprate",
styles="raster/Blues",
opacity=0.8,
)
m.add(wms)
m
Output
Map(center=[52, 360], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title', 'zoom_out_te…
url = (
"https://compute.earthmover.io/v1/services/edr/earthmover-demos/ecmwf-ifs-oper/main/edr/position?"
"coords=POINT(-50%2025)&time=2025-01-01T00:00:00&parameter-name=sp&f=csv"
)
df = pd.read_csv(url)
print(df)
Output
          time             step  latitude  longitude          sp  surface  \
0 2025-01-01 0 days 00:00:00 25.0 -50.0 101553.810 0.0
1 2025-01-01 0 days 03:00:00 25.0 -50.0 101568.630 0.0
2 2025-01-01 0 days 06:00:00 25.0 -50.0 101468.125 0.0
3 2025-01-01 0 days 09:00:00 25.0 -50.0 101494.340 0.0
4 2025-01-01 0 days 12:00:00 25.0 -50.0 101721.830 0.0
5 2025-01-01 0 days 15:00:00 25.0 -50.0 101738.530 0.0
6 2025-01-01 0 days 18:00:00 25.0 -50.0 101570.375 0.0
7 2025-01-01 0 days 21:00:00 25.0 -50.0 101641.440 0.0
8 2025-01-01 1 days 00:00:00 25.0 -50.0 101780.380 0.0
9 2025-01-01 1 days 03:00:00 25.0 -50.0 101812.060 0.0
10 2025-01-01 1 days 06:00:00 25.0 -50.0 101724.740 0.0
11 2025-01-01 1 days 09:00:00 25.0 -50.0 101719.470 0.0
12 2025-01-01 1 days 12:00:00 25.0 -50.0 101914.630 0.0
13 2025-01-01 1 days 15:00:00 25.0 -50.0 101942.960 0.0
14 2025-01-01 1 days 18:00:00 25.0 -50.0 101803.500 0.0
15 2025-01-01 1 days 21:00:00 25.0 -50.0 101879.440 0.0
16 2025-01-01 2 days 00:00:00 25.0 -50.0 102050.060 0.0
17 2025-01-01 2 days 03:00:00 25.0 -50.0 102049.170 0.0
18 2025-01-01 2 days 06:00:00 25.0 -50.0 101875.580 0.0
19 2025-01-01 2 days 09:00:00 25.0 -50.0 101889.750 0.0
20 2025-01-01 2 days 12:00:00 25.0 -50.0 102070.300 0.0
21 2025-01-01 2 days 15:00:00 25.0 -50.0 102075.890 0.0
22 2025-01-01 2 days 18:00:00 25.0 -50.0 101928.050 0.0
23 2025-01-01 2 days 21:00:00 25.0 -50.0 101992.780 0.0
24 2025-01-01 3 days 00:00:00 25.0 -50.0 102101.550 0.0
25 2025-01-01 3 days 03:00:00 25.0 -50.0 102077.770 0.0
26 2025-01-01 3 days 06:00:00 25.0 -50.0 101913.640 0.0
27 2025-01-01 3 days 09:00:00 25.0 -50.0 101871.240 0.0
28 2025-01-01 3 days 12:00:00 25.0 -50.0 102043.800 0.0
29 2025-01-01 3 days 15:00:00 25.0 -50.0 101967.590 0.0
30 2025-01-01 3 days 18:00:00 25.0 -50.0 101766.770 0.0
31 2025-01-01 3 days 21:00:00 25.0 -50.0 101785.370 0.0
32 2025-01-01 4 days 00:00:00 25.0 -50.0 101906.766 0.0
33 2025-01-01 4 days 03:00:00 25.0 -50.0 101812.310 0.0
34 2025-01-01 4 days 06:00:00 25.0 -50.0 101636.960 0.0
35 2025-01-01 4 days 09:00:00 25.0 -50.0 101625.000 0.0
36 2025-01-01 4 days 12:00:00 25.0 -50.0 101773.070 0.0
37 2025-01-01 4 days 15:00:00 25.0 -50.0 101740.850 0.0
38 2025-01-01 4 days 18:00:00 25.0 -50.0 101529.200 0.0
39 2025-01-01 4 days 21:00:00 25.0 -50.0 101573.140 0.0
40 2025-01-01 5 days 00:00:00 25.0 -50.0 101673.484 0.0
41 2025-01-01 5 days 03:00:00 25.0 -50.0 101674.410 0.0
42 2025-01-01 5 days 06:00:00 25.0 -50.0 101538.670 0.0
43 2025-01-01 5 days 09:00:00 25.0 -50.0 101510.090 0.0
44 2025-01-01 5 days 12:00:00 25.0 -50.0 101739.600 0.0
45 2025-01-01 5 days 15:00:00 25.0 -50.0 101742.610 0.0
46 2025-01-01 5 days 18:00:00 25.0 -50.0 101571.250 0.0
47 2025-01-01 5 days 21:00:00 25.0 -50.0 101641.984 0.0
48 2025-01-01 6 days 00:00:00 25.0 -50.0 101788.420 0.0

spatial_ref
0 0
1 0
2 0
3 0
4 0
5 0
6 0
7 0
8 0
9 0
10 0
11 0
12 0
13 0
14 0
15 0
16 0
17 0
18 0
19 0
20 0
21 0
22 0
23 0
24 0
25 0
26 0
27 0
28 0
29 0
30 0
31 0
32 0
33 0
34 0
35 0
36 0
37 0
38 0
39 0
40 0
41 0
42 0
43 0
44 0
45 0
46 0
47 0
48 0
df.plot(x="step", y="sp")
Output
<Axes: xlabel='step'>

png

[Optional] Clean Up by Deleting Repo

! al repo delete earthmover-demos/ecmwf-ifs-oper --confirm
Output
✓ Deleting repo earthmover-demos/ecmwf-ifs-oper...succeeded
[?25hRepo earthmover-demos/ecmwf-ifs-oper removed from Arraylake.
The underlying Icechunk bucket must be deleted manually.