
Scaling with Arraylake

For larger datasets, users will want to parallelize the reading, processing, and writing of their data. Arraylake supports several different scalability models for reading and writing your data. To make the best use of Arraylake's ability to scale with your workloads, it's important to choose a model that aligns with your workflow orchestration solution.

For a deeper conceptual explanation of how concurrency works in Arraylake, check out the docs on Concurrency Modes.

Isolated concurrency

The first scalability model Arraylake supports is the most common throughout our docs and demos, and looks something like this:

from arraylake import Client


def init_array(repo):
    ny, nx = 100, 1000
    shape = (ny, nx)
    chunks = (1, nx)
    repo.root_group.create("my_array", shape=shape, chunks=chunks)


def write_to_array(repo, j, value):
    repo.root_group.my_array[j] = value

# Acquire a handle to the Arraylake repository.
repo = Client().get_repo("earthmover/example-repo")

# Set up a session for modifying the dataset within the repo.
repo.checkout()

# Perform writes against the dataset.
write_to_array(repo, 0, 1)

# Commit your changes.
repo.commit("made some modifications")

This style of workflow has the benefit of being simple and straightforward. It is very well suited to scripts, notebooks and smaller jobs, and it is how most of our users begin experimenting with Arraylake.

However, jobs that process large datasets typically partition the data into regions and delegate the processing of those regions to workers, which run in parallel. If we used this isolated model for each task, a single job partitioned into, say, 1000 regions would result in 1000 separate commits.

Aside from the fact that a log of 1000 commits is not very useful to humans, there are also performance implications to issuing so many commits when, really, we just want a single commit to capture each successfully completed job. This is why Arraylake also supports cooperative concurrency.

Cooperative concurrency

You might know that Arraylake uses something called a session when writing, which tracks all of the changes made within a given transaction. However, neither our repo.checkout() call nor anything else in the snippet refers to a session. That's because in this simpler scenario, a session is created for you implicitly.

info

You can find a more complete explanation of sessions here, but simply put: a session tracks the changes made across a given transaction. Clients can create, join, and abandon sessions, and sessions also expire automatically after a configurable period of time. Once a session has been committed -- by calling repo.commit() -- further writes to that session are impossible.

Sessions expire after 24 hours by default, but can be configured to last up to seven (7) days by specifying a value for the expires_in argument in the repo.create_session(), repo.checkout(), or session.update_expiration() methods (see below for examples).

To capture large, distributed write jobs within a single commit, Arraylake allows you to explicitly create a session and then share it across multiple writers. This is, in fact, the pattern we recommend for jobs at scale.

Here's how a cooperative concurrent workflow might look:

import asyncio
from datetime import timedelta


# Define the write task for a single worker.
async def process_partition(session_id, region_id):
    repo = Client().get_repo("earthmover/example-repo")

    # Connect to the same shared session.
    session = repo.join_session(session_id=session_id)

    # Perform writes against the dataset. Note that commit() is never called
    # inside this worker.
    try:
        write_to_array(repo, region_id, 1)

    # If there is an error, abandon the session and re-raise.
    except Exception:
        session.abandon()
        raise


# Create a new shared session that will expire in one hour if it is not committed.
repo = Client().get_repo("earthmover/example-repo")
session = repo.create_session(message="modifying stuff concurrently", expires_in=timedelta(hours=1))

# Run the job across multiple partitions concurrently.
job_regions = range(100)
await asyncio.gather(*[process_partition(session.id, region) for region in job_regions])

# Save all the changes in a single commit.
repo.commit("made some *distributed* modifications")

This workflow introduces some new methods on our repo: create_session() opens a new shared session, and join_session() attaches a client to an existing session by its ID. Users may optionally pass a message at session creation to describe the intent of the session, which can be useful for attribution and debugging. And, if a user wishes to change the expiry of a session after it has been created (but before it has expired, been committed, or been abandoned), they can call session.update_expiration().

For example:

# Modify the session expiration.
session.update_expiration(timedelta(hours=2))

Taken all together, these methods can be useful to clients that have no shared state with the process that spawned them (i.e. they need to instantiate their own Client and Repo), but need to cooperatively write to a shared session.
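A coordinator usually wants to commit only when every worker has succeeded. The following is a minimal sketch of that decision using only the Python standard library. The names write_region and run_job are hypothetical stand-ins and are not part of Arraylake; in real code the "commit" and "abandon" outcomes would correspond to calling repo.commit() and session.abandon().

```python
import asyncio


def write_region(region_id):
    """Hypothetical blocking write; stands in for one worker's writes."""
    if region_id < 0:
        raise ValueError(f"bad region {region_id}")
    return region_id


async def run_job(region_ids):
    # Run each blocking write in a thread so the writes can overlap, and
    # collect exceptions instead of stopping at the first failure.
    results = await asyncio.gather(
        *(asyncio.to_thread(write_region, r) for r in region_ids),
        return_exceptions=True,
    )
    failures = [r for r in results if isinstance(r, Exception)]
    # Commit only if every region succeeded; otherwise abandon the session.
    return ("abandon" if failures else "commit"), failures


decision, failures = asyncio.run(run_job(range(4)))
print(decision)
```

Passing return_exceptions=True lets the coordinator see every failure before deciding, rather than having the first exception propagate out of gather().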

This scalability model is a natural fit for those orchestration frameworks, like Airflow and Prefect, which assume a certain degree of isolation between each step in a given execution pipeline. But for those who use shared state workflow coordination tools -- like Dask -- Arraylake also supports a shared state mode.

Shared state concurrency

To use shared state mode, write your code as if it were non-cooperative: call checkout() on the Repo, which automatically fetches a shared session, and then pass the Repo instance to each worker.

import dask


repo = Client().get_or_create_repo("earthmover/example-repo")
repo.checkout(expires_in=timedelta(hours=1))

# Run the job across multiple partitions concurrently.
tasks = [
    dask.delayed(write_to_array)(repo, region_id, 1)
    for region_id in range(100)
]
dask.compute(tasks)

# Save all the changes in a single commit.
repo.commit("made some *dasky* modifications")

This snippet works because Dask serializes the Repo -- including the session token it fetched when it called checkout() -- before passing the Repo to all the workers that will execute the write_to_array() function.

Because the workers already have a copy of the session token, there is no need to explicitly call join_session(). They will all simply begin transparently performing their write operations within the same shared session.
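The idea that serialized state travels with the object can be illustrated with a toy example. This sketch uses a SimpleNamespace as a hypothetical stand-in for a repo handle; it is not Arraylake's Repo class, and the session_token attribute name is an assumption made for illustration only.

```python
import pickle
from types import SimpleNamespace

# Hypothetical stand-in for a repo handle; this is NOT Arraylake's Repo class.
repo = SimpleNamespace(name="earthmover/example-repo", session_token="token-123")


def worker(serialized_repo):
    # Each worker rebuilds its own copy of the handle from the bytes it received.
    repo_copy = pickle.loads(serialized_repo)
    return repo_copy.session_token


# Serialize once, as a scheduler would before shipping the object to workers.
payload = pickle.dumps(repo)
tokens = [worker(payload) for _ in range(3)]
print(tokens)
```

Every worker ends up with the same token without calling any join method, which mirrors why the Dask workers above all write into one shared session.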

When to use which mode

When using cooperative concurrency to write data, it is crucial that you align your writes with Zarr chunk boundaries. Zarr writes whole chunks, so two workers that update different parts of the same chunk can overwrite each other's changes and leave your data inconsistent. Because writes from an individual worker are not tracked in separate commits, Arraylake's version control system will not protect you here.

If you can't guarantee that multiple writers within a single job will not write to overlapping chunks, you should use isolated concurrency instead. This constraint does not apply to reading data. See Concurrency Modes for more details.
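One way to keep writers on separate chunks is to derive each worker's region directly from the array's chunk grid. The sketch below is plain Python with no Arraylake or Zarr calls; chunk_aligned_regions and is_chunk_aligned are hypothetical helper names, and the shape and chunks values are the ones used for my_array earlier on this page.

```python
from itertools import product


def chunk_aligned_regions(shape, chunks):
    """Yield one tuple of slices per chunk, together covering the whole array."""
    per_axis = []
    for size, chunk in zip(shape, chunks):
        # Each slice starts on a chunk boundary; the last one may be shorter.
        per_axis.append(
            [slice(start, min(start + chunk, size)) for start in range(0, size, chunk)]
        )
    yield from product(*per_axis)


def is_chunk_aligned(region, shape, chunks):
    """Return True if every slice in `region` starts and stops on a chunk boundary."""
    for sl, size, chunk in zip(region, shape, chunks):
        if sl.start % chunk != 0:
            return False
        # A slice may stop at the array edge even if that is mid-chunk.
        if sl.stop != size and sl.stop % chunk != 0:
            return False
    return True


shape, chunks = (100, 1000), (1, 1000)
regions = list(chunk_aligned_regions(shape, chunks))
print(len(regions))  # one region per row-sized chunk
```

Assigning each generated region to exactly one worker gives regions that are both aligned and disjoint. A region that fails is_chunk_aligned is a signal to repartition the job or fall back to isolated concurrency.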

How to use shared sessions

info

Shared, server-managed sessions are a new feature that is now enabled by default. No action is required to make use of this feature. The Arraylake service will still support codebases using clients older than v0.8.0, but support for these legacy clients will end on March 31, 2024.

If you wish to opt out, there are several ways to disable server-managed sessions:

# Alter your configuration file.
arraylake config set server_managed_sessions False