Coiled

Built on Dask, a parallel computing library, Coiled makes it easy to use Python on the cloud.

Arraylake and Coiled work well together—you can use Coiled to manage your cloud infrastructure, run your computations in parallel with Dask, and use Arraylake as your cloud data lake platform. Coiled provides four interfaces for initializing resources:

  • Dask clusters
  • Serverless functions
  • CLI jobs
  • Jupyter notebooks

General pattern

The code snippet below demonstrates a general pattern of how to use Coiled and Arraylake in a simple workflow.

import coiled
import arraylake as al
import xarray as xr

cluster = coiled.Cluster(
    n_workers=100,  # Start 100 machines on AWS, GCP, or Azure
)
dask_client = cluster.get_client()

# Connect to Arraylake by specifying 'organization/repo'
al_client = al.Client()
repo = al_client.get_repo("my-climate-company/ocean-data")

# Read array data from Arraylake
ds = xr.open_dataset(
    repo.store,
    group="xarray/ocean-temp",
    engine="zarr",
    chunks="auto",  # Use Dask for parallelism
)

# Run your computation in parallel on the cloud
temps = ds.groupby("time.season").mean("time").compute()

# Write result to Arraylake
temps.to_zarr(
    repo.store,
    group="xarray/avg-season-temps",
)
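
When you're finished, you can tear the cluster down so it isn't left running in your cloud account. A minimal sketch, reusing the cluster and dask_client objects from the snippet above; close() comes from the underlying Dask cluster and client interfaces, and by default Coiled shuts the machines down when the cluster is closed:

dask_client.close()  # Disconnect the Dask client
cluster.close()      # Tear down the Coiled cluster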

Specific examples

The following sections detail how to use Arraylake with the different Coiled APIs. To start, you will need an Arraylake API token (these begin with "ema_").

Dask cluster

info

Arraylake access: Set ARRAYLAKE_TOKEN environment variable to your Arraylake API token.
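
For example, you can set the token at the start of your session, before creating any Coiled resources. A minimal sketch; the ema_... value is a placeholder for your own token:

import os

os.environ["ARRAYLAKE_TOKEN"] = "ema_xxxxxxxxxxxxxxxx"  # Placeholder; use your own Arraylake API token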

Dask is a general-purpose library for parallel computing that is closely integrated with the PyData ecosystem (Zarr, Xarray, GeoTIFF, etc.), letting you scale out your workflows. Coiled deploys Dask clusters on the cloud.

Parallelize workflows involving Arraylake by spinning up a Dask cluster with a set number of workers. Before initializing the cluster, set the ARRAYLAKE_TOKEN environment variable to your API token so that you can authenticate to Arraylake. The following example demonstrates starting a cluster of Dask workers, reading a dataset, and writing it as a Zarr data cube to an Arraylake repo. In a Python session:

import coiled
import arraylake as al
import xarray as xr

cluster = coiled.Cluster(n_workers=10)

This will prompt Coiled to create a cluster of Dask workers:

╭───────────────────────── Package Sync for arraylake ─────────────────────────╮
│ Fetching latest package priorities ━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00 │
│ Scanning 208 conda packages ━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00 │
│ Scanning 338 python packages ━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00 │
│ Running pip check ━━━━━━━━━━━━━━━━━━━━━━━ 0:00:02 │
│ Validating environment ━━━━━━━━━━━━━━━━━━━━━━━ 0:00:03 │
│ Creating wheel for arraylake ━━━━━━━━━━━━━━━━━━━━━━━ 0:00:07 │
│ Creating wheel for arraylake-mongo-metastore ━━━━━━━━━━━━━━━━━━━━━━━ 0:00:06 │
│ Uploading arraylake ━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00 │
│ Uploading arraylake-mongo-metastore ━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00 │
│ Requesting package sync build ━━━━━━━━━━━━━━━━━━━━━━━ 0:00:00 │
╰──────────────────────────────────────────────────────────────────────────────╯
╭──────────────────────────────── Package Info ────────────────────────────────╮
│ ╷ │
│ Package │ Note │
│ ╶───────────────────────────┼──────────────────────────────────────────────╴ │
│ arraylake │ Wheel built from │
│ │ ~/Desktop/earthmover/arraylake/client │
│ arraylake-mongo-metastore │ Wheel built from │
│ │ ~/Desktop/earthmover/arraylake/mongo-metasto │
│ │ re │
│ ╵ │
╰──────────────────────────────────────────────────────────────────────────────╯
╭─────────────────────────────── Coiled Cluster ───────────────────────────────╮
│ https://cloud.coiled.io/clusters/537310?account=dask │
╰──────────────────────────────────────────────────────────────────────────────╯
╭────────────── Overview ──────────────╮╭─────────── Configuration ────────────╮
│ ││ │
│ Name: dask-1b0d5c8f ││ Region: us-east-2 │
│ ││ │
│ Scheduler Status: started ││ Scheduler: m6i.xlarge │
│ ││ │
│ Dashboard: ││ Workers: m6i.xlarge (2) │
│ https://cluster-upast.dask.host?toke ││ │
│ n=U-3fkZ5GRwezON1C ││ Workers Requested: 2 │
│ ││ │
╰──────────────────────────────────────╯╰──────────────────────────────────────╯
╭───────────────────────── (2024/07/26 12:54:51 MDT) ──────────────────────────╮
│ │
│ All workers ready. │
│ │
│ │
╰──────────────────────────────────────────────────────────────────────────────╯

Once all workers are ready, we can get a Dask client for the cluster, create an Arraylake client, and begin working with our data.

dask_client = cluster.get_client()
al_client = al.Client()
repo = al_client.get_or_create_repo('earthmover/coiled_example')

ds = xr.tutorial.open_dataset('air_temperature').chunk(
    {'time': 1000, 'lat': 5, 'lon': 5}
)
ds
<xarray.Dataset> Size: 31MB
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 100B 75.0 72.5 70.0 67.5 65.0 ... 22.5 20.0 17.5 15.0
  * lon      (lon) float32 212B 200.0 202.5 205.0 207.5 ... 325.0 327.5 330.0
  * time     (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float64 31MB ...
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day). These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...

We read an air temperature dataset as an xr.Dataset. Next, write the data to the Arraylake repo:

ds.to_zarr(
    repo.store,
    group='xr_tutorial_ds/air_temperature',
    zarr_version=3,
    mode='w'
)

We've now written the data, but these changes won't be visible to others until we commit them to our Arraylake repo.

repo.commit('wrote air temp data to repo')
66a3f2093f5775e7440d3e62
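
To double-check the write, you can open the group back up from the same repo. A quick sketch mirroring the group path and Zarr version used in the write above:

roundtrip = xr.open_zarr(
    repo.store,
    group='xr_tutorial_ds/air_temperature',
    zarr_version=3,
)
roundtrip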