Prefect

Prefect is an open-source orchestration engine that turns ordinary Python functions into observable, production-grade data pipelines—no DSLs or YAML required.

Arraylake and Prefect work well together: you write your data pipeline as plain Python @flow and @task functions; Prefect handles scheduling, retries, observability, and parallelism; and Arraylake serves as your versioned cloud data lake. Because Arraylake is accessed from inside your Python code, integrating it into a Prefect workflow is mostly a matter of making your Arraylake API token (which begins with "ema_") available to your flow runs.

General pattern

A Prefect flow is a Python function decorated with @flow; tasks are smaller units of work decorated with @task and called from within a flow. The snippet below shows a simple pipeline that reads from Arraylake, computes a result, and writes it back—committing the change as a new Icechunk version.

    from prefect import flow, task
    import arraylake as al
    import xarray as xr

    @task
    def load_data() -> xr.Dataset:
        # Open a read-only session on the "main" branch and load lazily.
        al_client = al.Client()
        repo = al_client.get_repo("my-climate-company/ocean-data")
        session = repo.readonly_session("main")
        return xr.open_dataset(
            session.store,
            engine="zarr",  # the session store is a Zarr store
            group="xarray/ocean-temp",
            chunks="auto",
        )

    @task
    def write_result(temps: xr.Dataset) -> str:
        # Write to a new group, then commit to create a new version.
        al_client = al.Client()
        repo = al_client.get_repo("my-climate-company/ocean-data")
        session = repo.writable_session("main")
        temps.to_zarr(session.store, group="xarray/avg-season-temps", mode="w")
        return session.commit("wrote seasonal average temps")

    @flow
    def seasonal_averages():
        ds = load_data()
        # Average over the time dimension within each season.
        temps = ds.groupby("time.season").mean("time").compute()
        commit_id = write_result(temps)
        print(f"committed {commit_id}")

    if __name__ == "__main__":
        seasonal_averages()

al.Client() reads your credentials from the ARRAYLAKE_TOKEN environment variable. The sections below cover the different ways to supply that token depending on how and where your flow runs.
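Because a missing token only surfaces once the first Arraylake call fails, it can help to check for it at the top of a flow. The sketch below is one way to do that using only the standard library. The helper name require_arraylake_token is hypothetical and is not part of Arraylake or Prefect. It assumes only what is stated above: the token lives in ARRAYLAKE_TOKEN and begins with "ema_".

```python
import os

TOKEN_ENV_VAR = "ARRAYLAKE_TOKEN"
TOKEN_PREFIX = "ema_"  # Arraylake API tokens begin with this prefix


def require_arraylake_token(environ=None) -> str:
    """Return the Arraylake token from the environment, or raise early.

    Hypothetical helper for illustration; not an Arraylake or Prefect API.
    `environ` defaults to os.environ and can be replaced with a dict in tests.
    """
    env = os.environ if environ is None else environ
    token = env.get(TOKEN_ENV_VAR, "").strip()
    if not token:
        raise RuntimeError(
            f"{TOKEN_ENV_VAR} is not set; export it before running the flow."
        )
    if not token.startswith(TOKEN_PREFIX):
        # Never echo the token itself in an error message or log line.
        raise RuntimeError(
            f"{TOKEN_ENV_VAR} does not start with {TOKEN_PREFIX!r}; "
            "check that it is an Arraylake API token."
        )
    return token
```

Calling require_arraylake_token() as the first line of a flow, before al.Client() is constructed, turns a confusing authentication failure deep inside a task into an immediate, readable error.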

Specific examples

Running a flow locally

Info (Arraylake access): Set the ARRAYLAKE_TOKEN environment variable to your Arraylake API token before running your flow.

The simplest way to get started is to run a flow on your own machine. Set ARRAYLAKE_TOKEN in your environment so that al.Client() can authenticate:

export ARRAYLAKE_TOKEN=ema_XXXXXXXX
python my_flow.py

Inside the flow, connect to Arraylake exactly as you would in any Python script:

    from prefect import flow
    import arraylake as al

    @flow
    def list_repos(org: str):
        client = al.Client()
        repos = client.list_repos(org)
        for repo in repos:
            print(repo.name)

    if __name__ == "__main__":
        list_repos("earthmover-demos")

Each flow and task run is tracked in the Prefect UI, giving you logs, timing, and state (success/failure/retry) for every step.