Prefect
Prefect is an open-source orchestration engine that turns ordinary Python functions into observable, production-grade data pipelines, with no DSLs or YAML required.
Arraylake and Prefect work well together. You write your data pipeline as plain Python @flow and @task functions, Prefect handles scheduling, retries, observability, and parallelism, and Arraylake serves as your versioned cloud data lake. Because Arraylake is accessed from inside your Python code, integrating it into a Prefect workflow is mostly a matter of making your Arraylake API token (these tokens begin with "ema_") available to your flow runs.
General pattern
A Prefect flow is a Python function decorated with @flow; tasks are smaller units of work decorated with @task and called from within a flow. The snippet below shows a simple pipeline that reads from Arraylake, computes a result, and writes it back, committing the change as a new Icechunk version.
```python
from prefect import flow, task
import arraylake as al
import xarray as xr


@task
def load_data() -> xr.Dataset:
    al_client = al.Client()
    repo = al_client.get_repo("my-climate-company/ocean-data")
    # Read-only view of the current tip of the "main" branch.
    session = repo.readonly_session("main")
    return xr.open_dataset(
        session.store,
        engine="zarr",
        group="xarray/ocean-temp",
        chunks="auto",  # lazy, Dask-backed arrays
        consolidated=False,  # Icechunk doesn't need consolidated metadata
    )


@task
def write_result(temps: xr.Dataset) -> str:
    al_client = al.Client()
    repo = al_client.get_repo("my-climate-company/ocean-data")
    session = repo.writable_session("main")
    temps.to_zarr(session.store, group="xarray/avg-season-temps", mode="w", consolidated=False)
    # Committing records a new Icechunk snapshot and returns its ID.
    return session.commit("wrote seasonal average temps")


@flow(log_prints=True)  # send print() output to the Prefect run logs
def seasonal_averages():
    ds = load_data()
    # Average over the time dimension within each season (DJF, MAM, JJA, SON).
    temps = ds.groupby("time.season").mean("time").compute()
    commit_id = write_result(temps)
    print(f"committed {commit_id}")


if __name__ == "__main__":
    seasonal_averages()
```
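Grouping by "time.season" labels every timestamp with one of four three-month seasons (DJF, MAM, JJA, SON), and averaging over the time dimension within each label pools all years together. The pure-Python sketch below mirrors that computation for a few (date, value) pairs so you can see what ends up in the written dataset. The names seasonal_means and SEASON_OF_MONTH are hypothetical and not part of xarray, and unlike xarray the sketch does no missing-value handling.

```python
from collections import defaultdict
from datetime import date
from statistics import mean

# The same month-to-season labels that xarray's "time.season" accessor produces.
SEASON_OF_MONTH = {
    12: "DJF", 1: "DJF", 2: "DJF",
    3: "MAM", 4: "MAM", 5: "MAM",
    6: "JJA", 7: "JJA", 8: "JJA",
    9: "SON", 10: "SON", 11: "SON",
}


def seasonal_means(samples: list[tuple[date, float]]) -> dict[str, float]:
    """Average values per season label, pooling all years (simple, unweighted mean)."""
    groups: dict[str, list[float]] = defaultdict(list)
    for day, value in samples:
        groups[SEASON_OF_MONTH[day.month]].append(value)
    # Only seasons that actually appear in the input get an entry.
    return {season: mean(values) for season, values in groups.items()}
```

For example, a January value of 2.0 and a December value of 4.0 both land in "DJF", giving a DJF mean of 3.0, even though they come from opposite ends of the same year.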
al.Client() reads your credentials from the ARRAYLAKE_TOKEN environment variable. The sections below cover the different ways to supply that token depending on how and where your flow runs.
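A missing or mistyped token otherwise surfaces as an authentication error partway through a run, so it can help to fail fast at the start of a flow. The sketch below is a hypothetical helper (check_arraylake_token is not part of Arraylake or Prefect). It assumes you supply the token through ARRAYLAKE_TOKEN, as every example on this page does, and it only checks that the value is present and starts with "ema_".

```python
import os
from collections.abc import Mapping

TOKEN_ENV_VAR = "ARRAYLAKE_TOKEN"
TOKEN_PREFIX = "ema_"  # Arraylake API tokens begin with "ema_"


def check_arraylake_token(environ: Mapping[str, str] = os.environ) -> str:
    """Return the token from `environ`, or raise a descriptive RuntimeError.

    Only presence and prefix are checked; nothing is sent to Arraylake, so a
    well-formed but expired or revoked token still passes. The token value
    itself is never included in error messages.
    """
    token = environ.get(TOKEN_ENV_VAR, "").strip()
    if not token:
        raise RuntimeError(f"{TOKEN_ENV_VAR} is not set; export it before running the flow.")
    if not token.startswith(TOKEN_PREFIX):
        raise RuntimeError(
            f"{TOKEN_ENV_VAR} does not look like an Arraylake API token "
            f"(expected it to start with {TOKEN_PREFIX!r})."
        )
    return token
```

Calling check_arraylake_token() as the first statement of a flow such as seasonal_averages stops the run before any task starts.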
Specific examples
- Local flow run
- Secret blocks
- Deployments & work pools
- Parallelism with Dask
Running a flow locally
Arraylake access: Set the ARRAYLAKE_TOKEN environment variable to your Arraylake API token before running your flow.
The simplest way to get started is to run a flow on your own machine. Set ARRAYLAKE_TOKEN in your environment so that al.Client() can authenticate:
```shell
export ARRAYLAKE_TOKEN=ema_XXXXXXXX
python my_flow.py
```
Inside the flow, connect to Arraylake exactly as you would in any Python script:
```python
from prefect import flow
import arraylake as al


@flow(log_prints=True)  # send print() output to the Prefect run logs
def list_repos(org: str):
    client = al.Client()
    repos = client.list_repos(org)
    for repo in repos:
        print(repo.name)


if __name__ == "__main__":
    list_repos("earthmover-demos")
```
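When your flow runs against a Prefect server or Prefect Cloud, each flow and task run is tracked in the Prefect UI, giving you logs, timing, and state (success, failure, or retry) for every step. Output from print() is only sent to those logs when print capture is enabled, for example with @flow(log_prints=True).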
Secret blocks
Arraylake access: Store your token in a Prefect Secret block and load it at runtime.
Rather than exporting the token in every environment where your flow runs, you can store your Arraylake token securely in Prefect using a Secret block. Secret values are encrypted at rest in your Prefect backend, which makes this a good choice for deployments running on Prefect Cloud or a shared work pool.
First, save the secret once (from a Python session, or via the Prefect UI):
```python
from prefect.blocks.system import Secret

Secret(value="ema_XXXXXXXX").save("arraylake-token")
```
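Then load the secret inside your flow and hand it to the Arraylake client by setting ARRAYLAKE_TOKEN in the flow-run process, which keeps al.Client() working without any extra configuration. Prefect's default task runner executes tasks in that same process, so write_data() sees the variable too: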
```python
import os

import arraylake as al
import zarr
from prefect import flow, task
from prefect.blocks.system import Secret


@task
def write_data() -> str:
    client = al.Client()  # picks up ARRAYLAKE_TOKEN set by the flow below
    repo = client.get_or_create_repo("earthmover/prefect_example")
    session = repo.writable_session("main")
    zarr.create_array(
        session.store,
        name="foo/data",
        shape=100,
        chunks=10,
        dtype="f4",
        fill_value=0,
        overwrite=True,  # allow the flow to be re-run
    )
    return session.commit("created array via Prefect")


@flow(log_prints=True)
def ingest():
    token = Secret.load("arraylake-token")
    # Expose the secret to al.Client() for this process.
    os.environ["ARRAYLAKE_TOKEN"] = token.get()
    commit_id = write_data()
    print(f"committed {commit_id}")


if __name__ == "__main__":
    ingest()
```
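Assigning to os.environ changes the environment of the whole flow-run process, and any subprocess started afterwards inherits it. If you would rather not leave the token set once the Arraylake work is done, a small context manager can scope it. temporary_env below is a hypothetical helper, not a Prefect or Arraylake API.

```python
import os
from collections.abc import Iterator
from contextlib import contextmanager


@contextmanager
def temporary_env(name: str, value: str) -> Iterator[None]:
    """Set os.environ[name] for the duration of the block, then restore the old state."""
    previous = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        # Restore the previous value, or remove the variable if it was unset.
        if previous is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous
```

Inside ingest you would wrap the task call, as in `with temporary_env("ARRAYLAKE_TOKEN", token.get()): commit_id = write_data()`. Because os.environ is shared by every thread in the process, this only narrows how long the token stays set; it does not hide the token from tasks running concurrently in the same process.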
Deployments and work pools
Arraylake access: Pass ARRAYLAKE_TOKEN through the deployment's job_variables.env.
To run flows on a schedule or trigger them via the API, you package them as a deployment that executes on a work pool (Docker, Kubernetes, a process pool, etc.). Environment variables for the flow-run infrastructure are set with job_variables, so this is where you supply your Arraylake token.
Using the .deploy() method, reading the token from your local environment so it isn't hard-coded in the script:
```python
import os

from prefect import flow

if __name__ == "__main__":
    # Load the flow from the Git repository instead of importing it locally.
    flow.from_source(
        source="https://github.com/my-org/my-pipelines.git",
        entrypoint="flows/seasonal.py:seasonal_averages",
    ).deploy(
        name="seasonal-averages",
        work_pool_name="my-k8s-pool",
        job_variables={
            # Taken from the deploying machine's environment, not hard-coded.
            "env": {"ARRAYLAKE_TOKEN": os.environ["ARRAYLAKE_TOKEN"]},
        },
    )
```
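If the variable is missing on the machine running the deploy script, os.environ["ARRAYLAKE_TOKEN"] fails with a bare KeyError. A hypothetical helper like env_job_variables below (not a Prefect API) reports every missing variable at once and returns a dict in the shape that job_variables uses for env.

```python
import os
from collections.abc import Iterable, Mapping


def env_job_variables(names: Iterable[str], environ: Mapping[str, str] = os.environ) -> dict:
    """Return {"env": {...}} forwarding the named variables from `environ`.

    Raises a single RuntimeError listing every missing or empty variable,
    instead of a KeyError for just the first one.
    """
    names = list(names)
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError("Set these environment variables before deploying: " + ", ".join(missing))
    return {"env": {name: environ[name] for name in names}}
```

In the script above you would then pass `job_variables=env_job_variables(["ARRAYLAKE_TOKEN"])`.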
Or declaratively in prefect.yaml:
```yaml
deployments:
  - name: seasonal-averages
    entrypoint: flows/seasonal.py:seasonal_averages
    work_pool:
      name: my-k8s-pool
      job_variables:
        env:
          ARRAYLAKE_TOKEN: "{{ $ARRAYLAKE_TOKEN }}"
```
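For a more secure setup, combine this with a Secret block (see the Secret blocks tab) and load the token inside the flow instead of injecting it as a plain environment variable. Values passed through job_variables are saved with the deployment, so anyone who can inspect the deployment's configuration can read them.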
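The "{{ $ARRAYLAKE_TOKEN }}" placeholder in prefect.yaml is filled in from the environment of the machine where you run prefect deploy, so the deployment ends up holding the literal token value. The toy resolver below mimics that substitution to make the point concrete. resolve_env_placeholders is a hypothetical name, and Prefect's real templating supports other placeholder types, so treat this purely as an illustration.

```python
import os
import re
from collections.abc import Mapping
from typing import Any

# Matches "{{ $NAME }}", with optional whitespace inside the braces.
ENV_PLACEHOLDER = re.compile(r"\{\{\s*\$([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def resolve_env_placeholders(value: Any, environ: Mapping[str, str] = os.environ) -> Any:
    """Return a copy of `value` with every {{ $NAME }} replaced by environ[NAME].

    Toy illustration only: a missing variable simply raises KeyError here.
    """
    if isinstance(value, dict):
        return {key: resolve_env_placeholders(item, environ) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_env_placeholders(item, environ) for item in value]
    if isinstance(value, str):
        return ENV_PLACEHOLDER.sub(lambda match: environ[match.group(1)], value)
    return value
```

Resolving the work_pool section above with ARRAYLAKE_TOKEN=ema_... yields job_variables whose env entry is the token itself, which is why a Secret block is the safer choice.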
Parallelism with Dask
Arraylake access: Set the ARRAYLAKE_TOKEN environment variable on each Dask worker.
Prefect can fan tasks out across a Dask cluster using the prefect-dask integration, which you install with pip install prefect-dask. Assign a DaskTaskRunner to your flow and call .submit() on a task to schedule it for parallel execution.
Make sure ARRAYLAKE_TOKEN is set in the environment of each Dask worker (for example, via the cluster's worker environment, or a Coiled cluster's environ) so that al.Client() can authenticate on the workers.
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
import arraylake as al
import zarr


@task
def write_chunk(i: int) -> str:
    client = al.Client()
    # Assumes the repo and the "foo/data" array already exist (see the Secret blocks tab).
    repo = client.get_repo("earthmover/prefect_example")
    session = repo.writable_session("main")
    arr = zarr.open_array(session.store, path="foo/data", mode="r+")
    # Fill all of chunk i, so that each task touches a different chunk.
    size = arr.chunks[0]
    arr[i * size:(i + 1) * size] = i
    return session.commit(f"wrote chunk {i}")


@flow(task_runner=DaskTaskRunner())
def parallel_writes():
    futures = [write_chunk.submit(i) for i in range(10)]
    return [f.result() for f in futures]


if __name__ == "__main__":
    parallel_writes()
```
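Each task should write a different chunk so that no two tasks change the same data. With shape=100 and chunks=10, chunk i covers elements 10*i through 10*i + 9, so assigning to the single element arr[i] for i from 0 to 9 would place every write in chunk 0. The hypothetical helper below (not a zarr API) computes the slice for every chunk of a 1-D array, including a shorter final chunk when the length is not a multiple of the chunk size.

```python
def chunk_slices(size: int, chunk: int) -> list[slice]:
    """Return one slice per chunk along a 1-D array of length `size`.

    The last slice is shorter when `size` is not a multiple of `chunk`.
    """
    if size < 0 or chunk <= 0:
        raise ValueError("size must be >= 0 and chunk must be > 0")
    return [slice(start, min(start + chunk, size)) for start in range(0, size, chunk)]
```

Inside write_chunk you could then assign `arr[chunk_slices(arr.shape[0], arr.chunks[0])[i]] = i`, which keeps every task's writes inside its own chunk.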
To run on an existing cluster instead of a temporary local one, pass its scheduler address:
```python
@flow(task_runner=DaskTaskRunner(address="tcp://my-scheduler:8786"))
def parallel_writes():
    ...
```
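As written, every task opens its own session on main and commits independently. Icechunk uses optimistic concurrency: a commit succeeds only if the branch has not moved since the session was opened, so once one task has committed, the others are rejected with a conflict error unless they rebase first. Before having many workers commit to the same branch, see the Arraylake and Icechunk docs on conflict resolution and distributed writes, which also describe writing from many workers and committing once.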
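To see why independent commits collide, here is a toy model of optimistic concurrency in plain Python. It is not Icechunk's API and greatly simplifies what Icechunk does; Branch, ToySession, open_session, and simulate are hypothetical names. Every session records the snapshot it started from, a commit succeeds only if that snapshot is still the branch tip, and a rebase moves the session onto the new tip unless a newer commit touched the same chunks.

```python
from dataclasses import dataclass, field


class ConflictError(Exception):
    """Raised when a session's changes cannot be applied to the current tip."""


@dataclass
class Branch:
    # history[k] is the set of chunk indices changed by snapshot k (0 is the empty root).
    history: list[set[int]] = field(default_factory=lambda: [set()])

    @property
    def tip(self) -> int:
        return len(self.history) - 1


@dataclass
class ToySession:
    branch: Branch
    base: int
    changes: set[int] = field(default_factory=set)

    def write_chunk(self, i: int) -> None:
        self.changes.add(i)

    def commit(self) -> int:
        # Succeed only if nobody has committed since this session's base snapshot.
        if self.base != self.branch.tip:
            raise ConflictError(f"base {self.base} is not the tip {self.branch.tip}")
        self.branch.history.append(set(self.changes))
        self.base = self.branch.tip
        return self.base

    def rebase(self) -> None:
        # Move onto the new tip unless a newer snapshot changed one of our chunks.
        newer = set().union(*self.branch.history[self.base + 1:])
        overlap = newer & self.changes
        if overlap:
            raise ConflictError(f"chunks {sorted(overlap)} were changed concurrently")
        self.base = self.branch.tip


def open_session(branch: Branch) -> ToySession:
    return ToySession(branch=branch, base=branch.tip)


def simulate(n_writers: int, retry: bool) -> tuple[int, int]:
    """Return (successful commits, failed commits) for writers that all start together."""
    branch = Branch()
    sessions = [open_session(branch) for _ in range(n_writers)]
    for i, session in enumerate(sessions):
        session.write_chunk(i)  # every writer changes a different chunk
    ok = failed = 0
    for session in sessions:
        try:
            session.commit()
            ok += 1
        except ConflictError:
            if retry:
                session.rebase()
                session.commit()
                ok += 1
            else:
                failed += 1
    return ok, failed
```

With ten writers that all start from the same snapshot, only the first commit succeeds without retries; with a rebase-and-retry step all ten succeed, because each writer changes a different chunk. Two writers that change the same chunk still conflict on rebase, which is why parallel tasks should write disjoint chunks.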