repo

The Repo module contains the Arraylake classes for interacting with repositories, AsyncRepo and Repo.

The Repo class provides a Zarr-compatible store interface for use with Zarr, Xarray, and other libraries that support the Zarr protocol.

Repos should not be instantiated directly. Instead, obtain them through the Client or AsyncClient, e.g.

from arraylake import Client
client = Client()
repo = client.get_repo("my-org/my-repo")

LocalWriteSession

Tracks and manages the state of a write session.

update_expiration

async def update_expiration(expires_in: datetime.timedelta)

Update a session's expiration period.

Arguments:

  • expires_in - a timedelta indicating how long after start_time the session should expire (defaults to 24 hours; maximum is 7 days).
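The bounds described above can be checked before calling update_expiration. The following is a minimal sketch, not Arraylake code: the helper name `resolve_expiry` and both constants are hypothetical, and the rejection of non-positive durations is an assumption. Only the 24-hour default and the 7-day maximum come from the argument description.

```python
from __future__ import annotations

import datetime

# Values taken from the argument description; the constant names are hypothetical.
DEFAULT_EXPIRES_IN = datetime.timedelta(hours=24)
MAX_EXPIRES_IN = datetime.timedelta(days=7)


def resolve_expiry(
    start_time: datetime.datetime,
    expires_in: datetime.timedelta = DEFAULT_EXPIRES_IN,
) -> datetime.datetime:
    """Return the expiry time, measured from start_time, or raise if out of range."""
    # Assumption: a zero or negative duration is not meaningful.
    if expires_in <= datetime.timedelta(0):
        raise ValueError("expires_in must be positive")
    if expires_in > MAX_EXPIRES_IN:
        raise ValueError("expires_in may not exceed 7 days")
    return start_time + expires_in


start = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
print(resolve_expiry(start))                              # 24 hours after start
print(resolve_expiry(start, datetime.timedelta(days=3)))  # 3 days after start
```

Validating locally like this only avoids an unnecessary round trip; the service remains the authority on what it accepts.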

abandon

async def abandon()

Discard the session and its associated modifications, if any.

note

The session cannot be used after it has been abandoned. Subsequent attempts to write to an abandoned session will result in an error.

AsyncRepo

Asynchronous interface to Arraylake repo.

note

Because Zarr does not support asynchronous I/O, the async client cannot be used to read or write Zarr data directly.

__init__

def __init__(metastore_db: MetastoreDatabase, chunkstore: Chunkstore,
name: str, author: Author)

Arguments:

  • metastore_db - A metastore database for storing metadata
  • chunkstore - A chunkstore for storing chunks
  • name - The name of the repo. Purely for display purposes.
  • author - The author name and email for commits

commit_data

async def commit_data(refresh: bool = False) -> CommitData

Returns the CommitData for the current session.

commit_log

async def commit_log() -> CommitLog

Returns the CommitLog for the current session.

status

async def status(limit: int = 1000) -> SessionStatus

Returns the SessionStatus for the current session.

Arguments:

  • limit - [Optional] The number of modified paths to return. Defaults to 1000; passing 0 is equivalent to setting no limit.

ping

async def ping() -> None

Ping the metastore to confirm connection

list_active_sessions

async def list_active_sessions() -> Sequence[SessionInfo]

Obtain a time-ordered list of active sessions

create_session

async def create_session(
ref: str | CommitID = "main",
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT,
message: str | None = None) -> LocalSession

Create a new session.

Arguments:

  • ref - The branch or commit to start the session from.
  • for_writing - Whether the session is for writing or reading. Defaults to True (for writing).
  • expires_in - The duration the session should last. Defaults to 24 hours; the maximum is 7 days.
  • message - A description or message annotating the session. Defaults to None.

join_session

async def join_session(session_id: SessionID) -> LocalSession

Join an existing session.

Arguments:

  • session_id - The ID of the session to join.

checkout

async def checkout(
ref: str | CommitID = "main",
session_token: SessionID | None = None,
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT
) -> CommitID | None

Checkout a ref (branch, tag, or commit ID) and initialize a new session.

Arguments:

  • ref - Commit, branch, or tag name.
  • session_token - Shared session ID to join, if applicable. Defaults to None.
  • for_writing - Whether the session is for writing or reading. Defaults to True (for writing).
  • expires_in - The duration the session should last. Defaults to 24 hours; the maximum is 7 days.

Returns:

  • commit - CommitID

commit

async def commit(
message: str,
auto_ff: bool = True,
checkout_for_writing: bool = DEFAULT_IS_WRITE_SESSION
) -> CommitID | None

Commit this session's changes and start a new session.

Arguments:

  • message - Commit message
  • auto_ff - Whether to automatically fast-forward the repo and retry if the commit fails
  • checkout_for_writing - Whether to checkout the ensuing commit in for_writing mode (default: True)

Returns:

  • new_commit - ID of new commit

fast_forward

async def fast_forward()

Fast-forward the session. Attempts to update the session base commit to the latest branch tip. Will fail if the same paths have been modified in the current session and on the branch.
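The failure condition described above amounts to a set intersection between the paths modified in the session and the paths modified on the branch. The sketch below illustrates that rule only; the function names are hypothetical and this is not how Arraylake necessarily implements the check.

```python
from __future__ import annotations


def conflicting_paths(session_paths: set[str], branch_paths: set[str]) -> set[str]:
    """Paths modified both in the session and on the branch."""
    return session_paths & branch_paths


def can_fast_forward(session_paths: set[str], branch_paths: set[str]) -> bool:
    """A fast-forward is possible only when no path was modified on both sides."""
    return not conflicting_paths(session_paths, branch_paths)


session = {"air/temperature/c0", "air/.zattrs"}
print(can_fast_forward(session, {"ocean/sst/c0"}))  # disjoint changes
print(can_fast_forward(session, {"air/.zattrs"}))   # same path touched on both sides
```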

delete_tag

async def delete_tag(tag_name: str) -> None

Delete a tag.

Arguments:

  • tag_name - Name of tag to delete

new_branch

async def new_branch(branch_name: str) -> None

Create a new branch based on the current session reference

Arguments:

  • branch_name - New branch name
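Because new_branch starts from the current session reference, a typical sequence is to check out a ref first and then create the branch. The sketch below shows that ordering with the async methods documented above. The helper `branch_from` and the `_StubAsyncRepo` class are hypothetical; the stub exists only so the example runs without a real repo, and in practice `arepo` would come from AsyncClient.

```python
import asyncio


async def branch_from(arepo, ref: str, branch_name: str) -> None:
    """Check out `ref`, then create `branch_name` from the resulting session."""
    await arepo.checkout(ref)
    await arepo.new_branch(branch_name)


class _StubAsyncRepo:
    """Stand-in that records calls; not part of Arraylake."""

    def __init__(self):
        self.calls = []

    async def checkout(self, ref="main", **kwargs):
        self.calls.append(("checkout", ref))

    async def new_branch(self, branch_name):
        self.calls.append(("new_branch", branch_name))


stub = _StubAsyncRepo()
asyncio.run(branch_from(stub, "main", "experiment"))
print(stub.calls)
```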

delete_branch

async def delete_branch(branch: str) -> None

Delete a branch.

Arguments:

  • branch - Name of branch to delete

tree

async def tree(prefix: str = "",
*,
depth: int = 10,
filter: str | None = None) -> Tree

Display this repo's hierarchy as a Rich Tree

Arguments:

  • prefix - Path prefix
  • depth - Maximum depth to descend into the hierarchy
  • filter - Optional JMESPath query to filter by

add_virtual_grib

@_write_op
async def add_virtual_grib(grib_uri: str, path: Path, **kwargs) -> None

Add a virtual GRIB2 dataset to the repo.

If the GRIB file contains multiple messages, this method will attempt to concatenate the messages into a single Zarr store, where the data payload of each message is stored in a single chunk.

Warning: This method has only been tested with a limited number of GRIB2 files. Please file an issue for GRIB2 files that don't work as expected.

Arguments:

  • grib_uri - The path to the GRIB2 file. Only s3:// and gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created.
  • kwargs - Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
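Since only s3:// and gs:// URIs are accepted, it can be useful to reject other schemes before calling any of the add_virtual_* methods. The following is a small sketch under that assumption; `check_virtual_uri` and `SUPPORTED_SCHEMES` are hypothetical names, not part of Arraylake.

```python
from urllib.parse import urlparse

# Schemes listed in the argument description; the constant name is hypothetical.
SUPPORTED_SCHEMES = {"s3", "gs"}


def check_virtual_uri(uri: str) -> str:
    """Return the URI unchanged if its scheme is supported, otherwise raise."""
    scheme = urlparse(uri).scheme
    if scheme not in SUPPORTED_SCHEMES:
        raise ValueError(f"unsupported scheme {scheme!r}; expected s3:// or gs://")
    return uri


print(check_virtual_uri("s3://my-bucket/forecast.grib2"))
```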

add_virtual_netcdf

@_write_op
async def add_virtual_netcdf(netcdf_uri: str, path: Path, **kwargs) -> None

Add a virtual netCDF dataset to the repo.

Arguments:

  • netcdf_uri - The path to the netCDF file. Only s3:// and gs:// URIs are supported. Both netCDF4 and netCDF3 files are supported.
  • path - The path within the repo where the virtual dataset should be created.
  • kwargs - Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.

add_virtual_zarr

@_write_op
async def add_virtual_zarr(zarr_uri: str, path: Path) -> None

Add a virtual Zarr dataset to the repo.

Arguments:

  • zarr_uri - The path to the Zarr store. Only Zarr V2 stores and s3:// or gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created.

add_virtual_tiff

@_write_op
async def add_virtual_tiff(tiff_uri: str, path: Path, name: str,
**kwargs) -> None

Add a virtual TIFF dataset to the repo.

Arguments:

  • tiff_uri - The path to the TIFF file. Only s3:// or gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created. Unlike the other virtual functions, this path should include the array name.
  • name - TIFF files contain bare arrays without a name. You must provide one.
  • kwargs - Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.

Notes:

Arrays will be ingested with dimension names 'X', 'Y'[, 'band']. TIFFs with overviews will be ingested so that there is one array per overview level, named '0', '1', '2', etc.

filter_metadata

async def filter_metadata(filter: str) -> list[str]

Filter repo metadata documents using a JMESPath search string.

See the JMESPath specification: https://jmespath.org/specification.html

Repo

Synchronous interface to Arraylake repo.

__init__

def __init__(arepo: AsyncRepo)

Initialize a Repo from an initialized AsyncRepo

Arguments:

  • arepo - An existing AsyncRepo

from_metastore_and_chunkstore

@classmethod
def from_metastore_and_chunkstore(cls, metastore_db: MetastoreDatabase,
chunkstore: Chunkstore, name: str,
author: Author) -> Repo

Initialize a Repo from an initialized metastore database and chunkstore

Arguments:

  • metastore_db - A metastore database for storing metadata
  • chunkstore - A chunkstore for storing chunks
  • name - The name of the repo. Purely for display purposes.
  • author - The author name and email for commits

ping

def ping()

Ping the metastore to confirm connection

checkout

def checkout(
ref: str | CommitID = "main",
session_token: SessionID | None = None,
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT) -> CommitID

Checkout a ref (branch, tag, or commit ID) and initialize a new session.

Arguments:

  • ref - Commit, branch, or tag name.
  • session_token - Shared session ID to join, if applicable. Defaults to None.
  • for_writing - Whether the session is for writing or reading. Defaults to True (for writing).
  • expires_in - The duration the session should last. Defaults to 24 hours; the maximum is 7 days.

Returns:

  • commit - CommitID

commit

def commit(message: str,
auto_ff: bool = True,
checkout_for_writing: bool = DEFAULT_IS_WRITE_SESSION) -> str

Commit this session's changes and start a new session.

Arguments:

  • message - Commit message
  • auto_ff - Whether to automatically fast-forward the repo and retry if the commit fails
  • checkout_for_writing - Whether the session is for writing or reading. Defaults to True (for writing).

Returns:

  • new_commit - ID of new commit

fast_forward

def fast_forward()

Fast-forward the session. Attempts to update the session base commit to the latest branch tip. Will fail if the same paths have been modified in the current session and on the branch.

tag

def tag(tag_name: str,
commit_id: str | CommitID | None = None,
*,
message: str | None = None) -> Tag

Add a new tag.

Arguments:

  • tag_name - Tag name
  • commit_id - Commit to tag
  • message - Optional message to attach to the tag. Defaults to None.
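Since commit returns the ID of the new commit and tag accepts a commit_id, the two can be chained to label a commit as soon as it is created. The sketch below assumes a synchronous Repo obtained from Client; the helper `commit_and_tag` and the `_StubRepo` class are hypothetical, and the stub only stands in for a real repo so the example runs on its own.

```python
def commit_and_tag(repo, message: str, tag_name: str):
    """Commit the session's changes, then tag the resulting commit."""
    commit_id = repo.commit(message)
    repo.tag(tag_name, commit_id)
    return commit_id


class _StubRepo:
    """Stand-in that records tags; not part of Arraylake."""

    def __init__(self):
        self.tags = {}

    def commit(self, message, auto_ff=True):
        return "abc123"  # placeholder commit ID

    def tag(self, tag_name, commit_id=None, *, message=None):
        self.tags[tag_name] = commit_id


stub_repo = _StubRepo()
new_commit = commit_and_tag(stub_repo, "add 2024 data", "v1.0")
print(new_commit, stub_repo.tags)
```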

delete_tag

def delete_tag(tag_name: str) -> None

Delete an existing tag.

Arguments:

  • tag_name - Tag name

new_branch

def new_branch(branch: str) -> None

Create a new branch based on the current session reference

Arguments:

  • branch - New branch name

delete_branch

def delete_branch(branch: str) -> None

Delete an existing branch.

Arguments:

  • branch - Branch name

list_active_sessions

def list_active_sessions() -> Sequence[SessionInfo]

List all active sessions.

create_session

def create_session(ref: str | CommitID = "main",
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT,
message: str | None = None) -> LocalSession

Create a new session.

Arguments:

  • ref - Commit, branch, or tag name. Defaults to main.
  • for_writing - Whether the session is for writing or reading. Defaults to True (for writing).
  • expires_in - The duration the session should last. Defaults to 24 hours; the maximum is 7 days.
  • message - Descriptive message for the session. Defaults to None.

join_session

def join_session(session_id: SessionID) -> LocalSession

Join an existing session.

Arguments:

  • session_id - ID of session to join

store

@property
def store() -> ArraylakeStore

Access a Zarr-compatible ArraylakeStore store object for this repo.

Example:

import zarr
from arraylake import Client
repo = Client().get_repo("my_org/my_repo")  # Repos are obtained through the Client
group = zarr.open_group(store=repo.store)

root_group

@property
def root_group() -> zarr.Group

Open the Zarr root group of this repo.

Example:

from arraylake import Client

repo = Client().get_repo("my_org/my_repo")  # Repos are obtained through the Client
group = repo.root_group
group.tree()  # visualize group hierarchy

status

def status(limit: int = 1000)

Returns the SessionStatus for the current session.

Arguments:

  • limit - [Optional] The number of modified paths to return. Defaults to 1000; passing 0 is equivalent to setting no limit.

commit_log

@property
def commit_log()

Returns a log of all commits on the current branch.

tags

@property
def tags()

Returns a list of tag names.

branches

@property
def branches()

Returns a list of branch names.

add_virtual_grib

def add_virtual_grib(grib_uri: str, path: Path) -> None

Add a virtual GRIB2 dataset to the repo.

Arguments:

  • grib_uri - The path to the GRIB2 file. Only s3:// or gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created.

add_virtual_hdf

def add_virtual_hdf(hdf_uri: str, path: Path) -> None

Add a virtual HDF5 dataset to the repo.

Arguments:

  • hdf_uri - The path to the HDF5 file. Only s3:// or gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual HDF5 dataset should be created.

add_virtual_netcdf

def add_virtual_netcdf(netcdf_uri: str, path: Path, **kwargs) -> None

Add a virtual netCDF dataset to the repo.

Arguments:

  • netcdf_uri - The path to the netCDF file. Only s3:// or gs:// URIs are supported at the moment. Both netCDF4 and netCDF3 files are supported.
  • path - The path within the repo where the virtual dataset should be created.
  • kwargs - Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.

add_virtual_zarr

def add_virtual_zarr(zarr_uri: str, path: Path) -> None

Add a virtual Zarr dataset to the repo.

Arguments:

  • zarr_uri - The path to the Zarr store. Only Zarr V2 stores and s3:// or gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created.

add_virtual_tiff

def add_virtual_tiff(tiff_uri: str, path: Path, name: str, **kwargs) -> None

Add a virtual TIFF dataset to the repo.

Arguments:

  • tiff_uri - The path to the TIFF file. Only s3:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created.
  • name - TIFF files contain bare arrays and no name. You must provide one.
  • kwargs - Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.

tree

def tree(prefix: str = "",
*,
depth: int = 10,
filter: str | None = None) -> Tree

Display this repo's hierarchy as a Rich Tree

Arguments:

  • prefix - Path prefix
  • depth - Maximum depth to descend into the hierarchy
  • filter - JMESPath query to subset the tree

to_xarray

def to_xarray(group=None, **kwargs) -> xr.Dataset

Open and decode an Xarray dataset from the Zarr-compatible ArraylakeStore.

note

There is no need to specify the zarr_version or engine keyword arguments. They are both set by default in this method.

Arguments:

  • group - path to the Zarr Group to load the xarray.Dataset from
  • **kwargs - additional keyword arguments passed to xarray.open_dataset

Returns:

  • Dataset - xarray.Dataset

filter_metadata

def filter_metadata(filter: str) -> list[str]

Filter repo metadata attributes using a JMESPath search string.

Arguments:

  • filter - JMESPath query used to select matching metadata documents

Returns:

A list of document paths

The full JMESPath specification, including examples and an interactive console, is available at https://jmespath.org/specification.html.

Some specific examples of queries:

"flags[0].spec[0] == 'a'"
"flags[0].spec[0:2] == ['a', 'b']"
'flags[0].spec[2] == band'
"contains(keys(@), 'flags') && contains(keys(@), 'band') && flags[0].spec[2] == band"
"someNaN == 'NaN'"
'number >= `3` && number <= `15`'
'"eo:val" == `12`'
'"created:at:time" <= `2022-05-01`'
'(!flags == `5` || flags == `10`) && foo == `10`'

And some specific nuances to be aware of:

  1. NaNs are strings, assert for them as follows:

"someKey == 'NaN'"

The following will not match NaN values:

"someNaN == NaN" "someNaN == NaN"

  2. Comparison of two missing keys is truthy:

The following will return true if both don't exist on the doc, as null == null

'foo == bar'

Here's a safer way to perform this query:

'contains(keys(@), "foo") && contains(keys(@), "bar") && foo == bar'

  3. Keys with special characters should be double quoted:

'"eo:val" == `12`'

The following will fail because the key is not quoted:

'eo:val == `12`'
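The missing-key nuance can be reproduced in plain Python, where dict.get returns None for an absent key in much the same way that JMESPath evaluates a missing key to null. This is an analogy only and does not call JMESPath; the function names are hypothetical.

```python
def naive_equal(doc: dict, left: str, right: str) -> bool:
    """Mirrors 'foo == bar': two missing keys both yield None, so they compare equal."""
    return doc.get(left) == doc.get(right)


def safe_equal(doc: dict, left: str, right: str) -> bool:
    """Mirrors the contains(keys(@), ...) guard: both keys must exist before comparing."""
    return left in doc and right in doc and doc[left] == doc[right]


docs = [
    {"foo": 1, "bar": 1},  # both present, equal
    {"foo": 1, "bar": 2},  # both present, different
    {"baz": 3},            # both missing
]
print([naive_equal(d, "foo", "bar") for d in docs])  # [True, False, True]
print([safe_equal(d, "foo", "bar") for d in docs])   # [True, False, False]
```

The third document is the one to watch: it matches the naive comparison even though it carries neither key.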

ArraylakeStore

Arraylake's Zarr Store interface

This is an implementation of a Zarr V3 Store.

note

This class is not intended to be constructed directly by users. Instead, use the store property on the Repo class.

list_prefix

def list_prefix(prefix: str) -> list[str]

List a prefix in the store

Arguments:

  • prefix - the path to list

Returns:

A list of document paths

listdir

def listdir(prefix: str) -> list[str]

List a directory in the store

Arguments:

  • prefix - the path to list

Returns:

A list of document paths

__getitem__

def __getitem__(key) -> bytes

Get a value

Arguments:

  • key - the path to get

Returns:

bytes (metadata or chunk)

getitems

def getitems(keys, *, contexts=None, on_error="omit") -> Mapping[str, bytes]

Get multiple items

Arguments:

  • keys - list of paths to get

Returns:

Mapping where keys are paths and values are bytes (metadata or chunks)
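The batch read can be pictured with an in-memory mapping. The sketch below assumes that on_error="omit" means keys absent from the store are left out of the result rather than raising; the source does not spell this out, so treat it as an assumption. The helper `getitems_omit_missing` is hypothetical and a plain dict stands in for the store.

```python
from __future__ import annotations

from collections.abc import Iterable, Mapping


def getitems_omit_missing(store: Mapping[str, bytes], keys: Iterable[str]) -> dict[str, bytes]:
    """Fetch several keys at once, silently skipping keys that are not in the store."""
    return {key: store[key] for key in keys if key in store}


fake_store = {
    "temperature/.zarray": b'{"shape": [2]}',
    "temperature/c0": b"\x00\x01",
}
result = getitems_omit_missing(fake_store, ["temperature/c0", "temperature/c1"])
print(result)  # only the key that exists is returned
```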

__setitem__

def __setitem__(key, value: bytes) -> None

Set a value

Arguments:

  • key - the path to set
  • value - the bytes to store (metadata or chunk)

setitems

def setitems(items: Mapping[str, bytes]) -> None

Set multiple items

Arguments:

  • items - Mapping where keys are paths and values are bytes (metadata or chunks)

__delitem__

def __delitem__(key)

Delete a key.

Arguments:

  • key - path to delete

delitems

def delitems(keys) -> None

Delete multiple keys

Arguments:

  • keys - list of paths to delete

__contains__

def __contains__(key: str) -> bool

Check if key exists in store.

Arguments:

  • key - path to check

keys

def keys() -> list[str]

Return a list of this store's keys

__iter__

def __iter__()

Iterate over this store's keys

erase_prefix

def erase_prefix(prefix)

Erase all keys with the given prefix.

__len__

def __len__() -> int

Number of keys in this store

__getattribute__

def __getattribute__(name)

Influence upstream Zarr by failing a hasattr check for get_partial_values.

Doing so forces it to use a preferable code path for retrieving data (getitems). See: https://github.com/zarr-developers/zarr-python/blob/a81db0782535ba04c32c277102a6457d118a73e8/zarr/core.py#L2162-L2171
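The technique relies on the fact that hasattr returns False whenever attribute lookup raises AttributeError. The sketch below shows the general pattern with hypothetical classes; it is not the ArraylakeStore implementation.

```python
class BaseStoreSketch:
    """Hypothetical base class that offers both code paths."""

    def get_partial_values(self, key_ranges):
        return []

    def getitems(self, keys):
        return {key: b"" for key in keys}


class HidesPartialValues(BaseStoreSketch):
    def __getattribute__(self, name):
        # Pretend the inherited method does not exist so hasattr() reports False.
        if name == "get_partial_values":
            raise AttributeError(name)
        return super().__getattribute__(name)


store = HidesPartialValues()
print(hasattr(store, "get_partial_values"))  # False
print(hasattr(store, "getitems"))            # True
```

A caller that branches on hasattr(store, "get_partial_values") will therefore fall through to the getitems path.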

SessionStatus

Holds the status of a session.

repo_name: str

Name of the repo

session: LocalSession

LocalSession object.

modified_paths: list[tuple[Path, bool]]

List of modified paths and whether they were deleted.
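Each entry in modified_paths pairs a path with a flag saying whether it was deleted, so a status report can be split into changed and deleted paths. The sketch below works on a plain list of tuples and assumes Path behaves like a string; `split_modified` is a hypothetical helper.

```python
from __future__ import annotations


def split_modified(modified_paths: list[tuple[str, bool]]) -> tuple[list[str], list[str]]:
    """Separate (path, was_deleted) pairs into changed paths and deleted paths."""
    changed = [path for path, was_deleted in modified_paths if not was_deleted]
    deleted = [path for path, was_deleted in modified_paths if was_deleted]
    return changed, deleted


example_paths = [
    ("air/temperature/c0", False),
    ("air/old_array/.zarray", True),
    ("air/.zattrs", False),
]
changed, deleted = split_modified(example_paths)
print("changed:", changed)
print("deleted:", deleted)
```

With a real repo, the input would be the modified_paths attribute of the object returned by status().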