Scaling with Arraylake
For larger datasets, users will want to parallelize the reading, processing, and writing of their data. Arraylake supports several different scalability models for reading and writing your data. To make the best use of Arraylake's ability to scale with your workloads, it's important to choose a model that aligns with your workflow orchestration solution.
For a deeper conceptual explanation of how concurrency works in Arraylake, check out the docs on Concurrency Modes.
Isolated concurrency
The first scalability model Arraylake supports is the most common throughout our docs and demos, and looks something like this:
```python
from arraylake import Client

def init_array(repo):
    ny, nx = 100, 1000
    shape = (ny, nx)
    chunks = (1, nx)
    repo.root_group.create("my_array", shape=shape, chunks=chunks)

def write_to_array(repo, j, value):
    repo.root_group.my_array[j] = value

# Acquire a handle to the Arraylake repository.
repo = Client().get_repo("earthmover/example-repo")

# Set up a session for modifying the dataset within the repo.
repo.checkout()

# Perform writes against the dataset.
init_array(repo)
write_to_array(repo, 0, 1)

# Commit your changes.
repo.commit("made some modifications")
```
This style of workflow has the benefit of being simple and straightforward. It is very well suited to scripts, notebooks, and smaller jobs, and it is how most of our users begin experimenting with Arraylake.
However, jobs that process large datasets typically partition the data into regions and delegate the processing of those regions to workers, which run in parallel. If we used this isolated model for each task, a single job partitioned into e.g. 1000 regions would result in 1000 separate commits.
Aside from the fact that a log of 1000 commits is not very useful to humans, issuing so many commits also has performance implications when, really, we just want a single commit to capture each successfully completed job. This is why Arraylake also supports cooperative concurrency.
Cooperative concurrency
You might know that Arraylake uses something called a session when writing, which tracks all of the changes made within a given transaction. However, neither our repo.checkout() call, nor anything else in the snippet above, refers to one explicitly. That's because in this simpler scenario, a session is created for you implicitly.
You can find a more complete explanation of sessions here, but simply put: a session tracks the changes made across a given transaction. Clients can create, join, and abandon sessions, and sessions also expire automatically after a configurable period of time. Once a session has been committed -- by calling repo.commit() -- further writes to that session are impossible.
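To make that lifecycle concrete, here is a toy model of the session state machine just described -- plain Python, emphatically not the Arraylake API -- showing that a session accepts writes only while it is open, and rejects them once it is committed, abandoned, or expired:

```python
from datetime import datetime, timedelta

# A toy sketch of the session lifecycle described above. This is NOT the
# Arraylake API; it only illustrates the state machine.
class ToySession:
    def __init__(self, expires_in=timedelta(hours=24)):
        self.expiration = datetime.now() + expires_in
        self.state = "open"  # open -> committed | abandoned
        self.changes = []

    def record_write(self, change):
        if self.state != "open" or datetime.now() >= self.expiration:
            raise RuntimeError(f"cannot write to a {self.state} session")
        self.changes.append(change)

    def commit(self):
        self.state = "committed"

    def abandon(self):
        self.state = "abandoned"

session = ToySession(expires_in=timedelta(hours=1))
session.record_write("chunk 0")
session.commit()
# Further writes now fail, mirroring the behavior described above.
try:
    session.record_write("chunk 1")
except RuntimeError:
    print("write rejected after commit")
```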
Sessions expire after 24 hours by default, but can be configured to last up to seven (7) days by specifying a value for the expires_in argument in the repo.create_session(), repo.checkout(), or session.update_expiration() methods (see below for examples).
To capture large, distributed write jobs within a single commit, Arraylake allows you to explicitly create a session and then share it across multiple writers. This is, in fact, the pattern we recommend for jobs at scale.
Here's how a cooperative concurrent workflow might look:
```python
import asyncio
from datetime import timedelta

from arraylake import Client

# Define the write task for a single worker.
async def process_partition(session_id, region_id):
    repo = Client().get_repo("earthmover/example-repo")

    # Connect to the same shared session.
    session = repo.join_session(session_id=session_id)

    # Perform writes against the dataset. Note that commit() is never called
    # inside this worker.
    try:
        write_to_array(repo, region_id, 1)
    # If there is an error, abandon the session.
    except Exception:
        session.abandon()
        raise

# Create a new shared session that will expire in one hour if it is not committed.
repo = Client().get_repo("earthmover/example-repo")
session = repo.create_session(message="modifying stuff concurrently", expires_in=timedelta(hours=1))

# Run the job across multiple partitions concurrently.
job_regions = range(100)
await asyncio.gather(*(process_partition(session.id, region) for region in job_regions))

# Save all the changes in a single commit.
repo.commit("made some *distributed* modifications")
```
This workflow introduces some new methods on our repo -- create_session() and join_session() -- and they do just what you might expect them to. Users may optionally pass a message at session creation to describe the intent of the session, which can be useful for attribution and debugging. And, if a user wishes to change the expiry of a session after it has been created (but before it has expired, been committed, or been abandoned), they can call session.update_expiration().
For example:
```python
# Modify the session expiration.
session.update_expiration(timedelta(hours=2))
```
Taken all together, these methods are useful to clients that have no shared state with the process that spawned them (i.e. they need to instantiate their own Client and Repo), but need to cooperatively write to a shared session.
This scalability model is a natural fit for orchestration frameworks, like Airflow and Prefect, that assume a certain degree of isolation between the steps in an execution pipeline. But for those who use shared-state workflow coordination tools -- like Dask -- Arraylake also supports a shared state mode.
Shared state concurrency
To make use of shared state mode, it's sufficient to write code as if it were not cooperative at all, passing the Repo instance to each worker after starting a new session. You don't even need to create the session explicitly: simply checkout() the Repo, and a shared session will be fetched for you automatically.
```python
import dask
from datetime import timedelta

from arraylake import Client

repo = Client().get_or_create_repo("earthmover/example-repo")
repo.checkout(expires_in=timedelta(hours=1))

# Run the job across multiple partitions concurrently.
tasks = [
    dask.delayed(write_to_array)(repo, region_id, 1)
    for region_id in range(100)
]
dask.compute(tasks)

# Save all the changes in a single commit.
repo.commit("made some *dasky* modifications")
```
This snippet works because Dask serializes the Repo -- including the session token it fetched when it called checkout() -- before passing the Repo to all the workers that will execute the write_to_array() function.
Because the workers already have a copy of the session token, there is no need to explicitly call join_session(). They will all simply begin transparently performing their write operations within the same shared session.
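As a toy illustration of why serialization alone is enough -- using a hypothetical stand-in class, not Arraylake's actual Repo -- round-tripping an object through pickle (which is what Dask does to task arguments before shipping them to workers) preserves the session token it holds, so every worker ends up writing to the same session:

```python
import pickle

# A hypothetical stand-in for a Repo holding a session token. The real Repo is
# far more complex, but the serialization principle is the same.
class FakeRepo:
    def __init__(self, session_token):
        self.session_token = session_token

repo = FakeRepo(session_token="session-abc123")

# Simulate what each Dask worker receives: a pickled copy of the Repo.
worker_copy = pickle.loads(pickle.dumps(repo))

# The deserialized copy carries the same session token, so the worker writes
# into the same shared session without ever calling join_session().
print(worker_copy.session_token)  # session-abc123
```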
When to use which mode
When using cooperative concurrency to write data, it is crucial that you align your writes with Zarr chunk boundaries. Doing otherwise can lead to inconsistencies in your data: because writes from individual workers are not tracked in separate commits, Arraylake's version control system cannot protect you here.
If you can't guarantee that multiple writers within a single job will write to non-overlapping chunks, you should use isolated concurrency instead. This constraint does not apply to reading data. See Concurrency Modes for more details.
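To see why alignment matters, here is a small, self-contained sketch (plain Python with a hypothetical helper, not an Arraylake API) that maps a write over a range of rows onto the chunk indices it touches, for a layout like my_array above where each chunk holds one full row. Row-aligned writes from different workers touch disjoint chunks; with coarser chunking, the same writes would collide:

```python
# With shape (100, 1000) and chunks (1, 1000), chunk j holds exactly row j.
CHUNK_ROWS = 1

def chunks_touched(row_start, row_stop, chunk_rows=CHUNK_ROWS):
    """Return the set of chunk indices a write to rows [row_start, row_stop) touches."""
    first = row_start // chunk_rows
    last = (row_stop - 1) // chunk_rows
    return set(range(first, last + 1))

# Two workers writing whole, distinct rows touch disjoint chunks:
# safe to share a single session.
worker_a = chunks_touched(0, 1)
worker_b = chunks_touched(1, 2)
print(worker_a & worker_b)  # set()

# If each chunk instead spanned 10 rows, the same two writes would both
# touch chunk 0 -- an overlap, so isolated concurrency would be required.
print(chunks_touched(0, 1, chunk_rows=10) & chunks_touched(1, 2, chunk_rows=10))  # {0}
```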
How to use shared sessions
Shared, server-managed sessions are a new feature that is now enabled by default. No action is required to make use of it. The Arraylake service will still support codebases using clients older than v0.8.0, but support for these legacy clients will end on March 31, 2024.
If you wish to opt out, there are several ways to disable server-managed sessions:
- CLI

```shell
# Alter your configuration file.
arraylake config set server_managed_sessions False
```

- Environment

```shell
# Disable the feature during the current terminal session.
export ARRAYLAKE_SERVER_MANAGED_SESSIONS=False
```

- Python

```python
from arraylake import config

# Temporarily disable the setting from within a running Python process.
config.set({"server_managed_sessions": False})
```