repo

The Repo module contains the Arraylake classes for interacting with repositories, AsyncRepo and Repo.

The Repo class provides a Zarr-compatible store interface for use with Zarr, Xarray, and other libraries that support the Zarr protocol.

Repos should not be instantiated directly. Instead, use the Client or AsyncClient, for example:

from arraylake import Client
client = Client()
repo = client.get_repo("my-org/my-repo")

LocalWriteSession​

Tracks and manages the state of a write session.

update_expiration​

async def update_expiration(expires_in: datetime.timedelta)

Update a session's expiration period.

Arguments:

  • expires_in - a timedelta indicating how long after start_time the session should expire (defaults to 24 hours; maximum is 7 days).
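The limits described above can be sketched in plain Python. This is only an illustration: the helper names validate_expiration and expiration_time, the constant names, the rejection of non-positive durations, and the choice of ValueError are assumptions, not part of Arraylake. Only the 24-hour default and the 7-day maximum come from the text.

```python
import datetime

# Values taken from the documentation above; the constant names are hypothetical.
DEFAULT_EXPIRATION = datetime.timedelta(hours=24)
MAX_EXPIRATION = datetime.timedelta(days=7)


def validate_expiration(
    expires_in: datetime.timedelta = DEFAULT_EXPIRATION,
) -> datetime.timedelta:
    """Return expires_in unchanged if it is positive and at most 7 days."""
    # Assumption: a zero or negative duration is treated as invalid.
    if expires_in <= datetime.timedelta(0):
        raise ValueError("expires_in must be a positive duration")
    if expires_in > MAX_EXPIRATION:
        raise ValueError("expires_in may not exceed 7 days")
    return expires_in


def expiration_time(
    start_time: datetime.datetime, expires_in: datetime.timedelta
) -> datetime.datetime:
    """Compute when a session would expire, measured from its start_time."""
    return start_time + validate_expiration(expires_in)
```

A value checked this way could then be passed as the expires_in argument of update_expiration.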

abandon​

async def abandon()

Discard the session and its associated modifications, if any.

note

The session cannot be used after it has been abandoned. Subsequent attempts to write to an abandoned session will result in an error.

AsyncRepo​

Asynchronous interface to Arraylake repo.

note

Because Zarr does not support asynchronous I/O, the async client cannot be used to read or write Zarr data directly.

__init__​

def __init__(metastore_db: MetastoreDatabase, chunkstore: Chunkstore,
name: str, author: Author)

Arguments:

  • metastore_db - A metastore database for storing metadata
  • chunkstore - A chunkstore for storing chunks
  • name - The name of the repo. Purely for display purposes.
  • author - The author name and email for commits

commit_data​

async def commit_data(refresh: bool = False) -> CommitData

Returns the CommitData for the current session.

commit_log​

async def commit_log() -> CommitLog

Returns the CommitLog for the current session.

status​

async def status(limit: int = 1000) -> SessionStatus

Returns the SessionStatus for the current session.

Arguments:

  • limit (int, optional) - The maximum number of modified paths to return. Defaults to 1000; passing 0 is equivalent to setting no limit.
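The meaning of limit, and in particular the special case of 0, can be illustrated with a small stand-alone sketch. The function apply_limit is hypothetical and does not call Arraylake. Rejecting negative values is an assumption made here for illustration.

```python
def apply_limit(paths: list[str], limit: int = 1000) -> list[str]:
    """Return at most `limit` paths; a limit of 0 means no limit."""
    # Assumption: negative limits are rejected (the docs do not say).
    if limit < 0:
        raise ValueError("limit must be non-negative")
    if limit == 0:
        return list(paths)
    return list(paths[:limit])
```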

ping​

async def ping() -> None

Ping the metastore to confirm connection

list_active_sessions​

async def list_active_sessions() -> Sequence[SessionInfo]

Obtain a time-ordered list of active sessions

create_session​

async def create_session(
ref: str | CommitID = "main",
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT,
message: str | None = None) -> LocalSession

Create a new session.

Arguments:

  • ref - The branch or commit to start the session from.
  • for_writing - Whether the session is for writing or reading. Defaults to True (for writing).
  • expires_in - The duration the session should last. Defaults to 24 hours; the maximum accepted value is 7 days.
  • message - A description or message annotating the session. Defaults to None.

join_session​

async def join_session(session_id: SessionID) -> LocalSession

Join an existing session.

Arguments:

  • session_id - The ID of the session to join.

checkout​

async def checkout(
ref: str | CommitID = "main",
session_token: SessionID | None = None,
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT
) -> CommitID | None

Checkout a ref (branch, tag, or commit ID) and initialize a new session.

Arguments:

  • ref - Commit, branch, or tag name.
  • session_token - Shared session ID to join, if applicable. Defaults to None.
  • for_writing - Whether the session is for writing or reading. Defaults to True (for writing).
  • expires_in - The duration the session should last. Defaults to 24 hours; the maximum accepted value is 7 days.

Returns:

  • commit - CommitID

commit​

async def commit(
message: str,
auto_ff: bool = True,
checkout_for_writing: bool = DEFAULT_IS_WRITE_SESSION
) -> CommitID | None

Commit this session's changes and start a new session.

Arguments:

  • message - Commit message
  • auto_ff - Whether to automatically fast-forward the repo and retry if the commit fails
  • checkout_for_writing - Whether to checkout the ensuing commit in for_writing mode (default: True)

Returns:

  • new_commit - ID of new commit

fast_forward​

async def fast_forward()

Fast-forward the session. Attempts to update the session base commit to the latest branch tip. Will fail if the same paths have been modified in the current session and on the branch.
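The failure condition described above (the same paths modified both in the session and on the branch) amounts to a set intersection. The following sketch shows that idea only. It is not Arraylake's implementation; the function names and the example paths are hypothetical.

```python
def conflicting_paths(session_paths: set[str], branch_paths: set[str]) -> set[str]:
    """Paths modified both in the current session and on the branch."""
    return session_paths & branch_paths


def can_fast_forward(session_paths: set[str], branch_paths: set[str]) -> bool:
    """A fast-forward is possible only when no path was modified on both sides."""
    return not conflicting_paths(session_paths, branch_paths)


# Hypothetical example: both sides touched "temperature/.zarray".
session_changes = {"temperature/.zarray", "temperature/c0"}
branch_changes = {"temperature/.zarray", "pressure/c0"}
overlap = conflicting_paths(session_changes, branch_changes)
```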

delete_tag​

async def delete_tag(tag_name: str) -> None

Delete a tag.

Arguments:

  • tag_name - Name of tag to delete

new_branch​

async def new_branch(branch_name: str) -> None

Create a new branch based on the current session reference

Arguments:

  • branch_name - New branch name

delete_branch​

async def delete_branch(branch: str) -> None

Delete a branch.

Arguments:

  • branch - Name of branch to delete

tree​

async def tree(prefix: str = "",
*,
depth: int = 10,
filter: str | None = None) -> Tree

Display this repo's hierarchy as a Rich Tree

Arguments:

  • prefix - Path prefix
  • depth - Maximum depth to descend into the hierarchy
  • filter - Optional JMESPath query to filter by

add_virtual_grib​

@_write_op
async def add_virtual_grib(grib_uri: str, path: Path, **kwargs) -> None

Add a virtual GRIB2 dataset to the repo.

If the GRIB file contains multiple messages, this method will attempt to concatenate the messages into a single Zarr store, where the data payload of each message is stored in a single chunk.

Warning: This method has only been tested with a limited number of GRIB2 files. Please file an issue for GRIB2 files that don't work as expected.

Arguments:

  • grib_uri - The path to the GRIB2 file. Only s3:// and gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created.
  • kwargs - Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
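Because only s3:// and gs:// URIs are accepted, it can be useful to check a URI before calling the method. The sketch below uses only the standard library. The helper is_supported_uri is hypothetical, and requiring a non-empty bucket name is an assumption.

```python
from urllib.parse import urlparse

# Schemes named in the documentation above.
SUPPORTED_SCHEMES = {"s3", "gs"}


def is_supported_uri(uri: str) -> bool:
    """Return True for s3:// or gs:// URIs that name a bucket."""
    parsed = urlparse(uri)
    # Assumption: a URI without a bucket (empty netloc) is not usable.
    return parsed.scheme in SUPPORTED_SCHEMES and bool(parsed.netloc)
```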

add_kerchunk_references​

@_write_op
async def add_kerchunk_references(refs: dict[str, Any] | str, path: Path,
**kwargs) -> None

Add kerchunk references to the repo.

Arguments:

  • refs - A dictionary of kerchunk references or a path to a JSON file.

  • path - The path within the repo where the virtual dataset should be created.

  • kwargs - Additional arguments to pass to fsspec.open (only used when refs is a path to a JSON file).

  • Note - Kerchunk version 1 is supported unless the templates or gen keys are present.
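The note about version 1 references can be expressed as a simple pre-check on the reference dictionary. This is a sketch under assumptions: the function name is hypothetical, it relies on the usual layout of a version 1 reference set (top-level "version" and "refs" keys), and it says nothing about how Arraylake treats other versions.

```python
from typing import Any


def is_plain_version_1(refs: dict[str, Any]) -> bool:
    """True for version 1 references that use neither 'templates' nor 'gen'."""
    if refs.get("version") != 1:
        return False
    return "templates" not in refs and "gen" not in refs


# Hypothetical reference set; the bucket, file name, and byte range are made up.
example_refs = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',
        "x/0": ["s3://my-bucket/file.nc", 0, 100],
    },
}
```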

add_virtual_netcdf​

@_write_op
async def add_virtual_netcdf(netcdf_uri: str, path: Path, **kwargs) -> None

Add a virtual netCDF dataset to the repo.

Arguments:

  • netcdf_uri - The path to the netCDF file. Only s3:// and gs:// URIs are supported. Both netCDF4 and netCDF3 files are supported.
  • path - The path within the repo where the virtual dataset should be created.
  • kwargs - Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.

add_virtual_zarr​

@_write_op
async def add_virtual_zarr(zarr_uri: str, path: Path) -> None

Add a virtual Zarr dataset to the repo.

Arguments:

  • zarr_uri - The path to the Zarr store. Only Zarr V2 stores and s3:// or gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created.

add_virtual_tiff​

@_write_op
async def add_virtual_tiff(tiff_uri: str, path: Path, name: str,
**kwargs) -> None

Add a virtual TIFF dataset to the repo.

Arguments:

  • tiff_uri - The path to the TIFF file. Only s3:// or gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created. Unlike the other virtual functions, this path should include the array name.
  • name - TIFF files contain bare arrays without a name. You must provide one.
  • kwargs - Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.

Notes:

Arrays will be ingested with dimension names 'X', 'Y'[, 'band']. TIFFs with overviews will be ingested so that there is one array per overview level, named '0', '1', '2', etc.

filter_metadata​

async def filter_metadata(filter: str) -> list[str]

Filter repo metadata documents using a JMESPath search string.

https://jmespath.org/specification.html

Repo​

Synchronous interface to Arraylake repo.

__init__​

def __init__(arepo: AsyncRepo)

Initialize a Repo from an initialized AsyncRepo

Arguments:

  • arepo - An existing AsyncRepo

from_metastore_and_chunkstore​

@classmethod
def from_metastore_and_chunkstore(cls, metastore_db: MetastoreDatabase,
chunkstore: Chunkstore, name: str,
author: Author) -> Repo

Initialize a Repo from an initialized metastore database and chunkstore

Arguments:

  • metastore_db - A metastore database for storing metadata
  • chunkstore - A chunkstore for storing chunks
  • name - The name of the repo. Purely for display purposes.
  • author - The author name and email for commits

ping​

def ping()

Ping the metastore to confirm connection

checkout​

def checkout(
ref: str | CommitID = "main",
session_token: SessionID | None = None,
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT) -> CommitID

Checkout a ref (branch, tag, or commit ID) and initialize a new session.

Arguments:

  • ref - Commit, branch, or tag name.
  • session_token - Shared session ID to join, if applicable. Defaults to None.
  • for_writing - Whether the session is for writing or reading. Defaults to True (for writing).
  • expires_in - The duration the session should last. Defaults to 24 hours; the maximum accepted value is 7 days.

Returns:

  • commit - CommitID

commit​

def commit(message: str,
auto_ff: bool = True,
checkout_for_writing: bool = DEFAULT_IS_WRITE_SESSION) -> str

Commit this session's changes and start a new session.

Arguments:

  • message - Commit message
  • auto_ff - Whether to automatically fast-forward the repo and retry if the commit fails
  • checkout_for_writing - Whether to check out the ensuing commit in for_writing mode. Defaults to True.

Returns:

  • new_commit - ID of new commit
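The auto_ff behaviour (fast-forward and retry when a commit fails) can be pictured with a toy model. Everything in this sketch is hypothetical: FakeSession, CommitConflict, try_commit, and max_attempts are stand-ins invented for illustration and do not reflect Arraylake's internals or its actual retry policy.

```python
class CommitConflict(Exception):
    """Hypothetical error raised when the branch tip has moved."""


class FakeSession:
    """Toy stand-in for a session; commit IDs are plain integers."""

    def __init__(self, base: int, branch_tip: int) -> None:
        self.base = base
        self.branch_tip = branch_tip

    def try_commit(self) -> int:
        # A commit only succeeds when the session is based on the branch tip.
        if self.base != self.branch_tip:
            raise CommitConflict("branch has new commits")
        self.branch_tip += 1
        self.base = self.branch_tip
        return self.branch_tip

    def fast_forward(self) -> None:
        # Move the session base to the latest branch tip.
        self.base = self.branch_tip


def commit_with_auto_ff(
    session: FakeSession, auto_ff: bool = True, max_attempts: int = 3
) -> int:
    """Try to commit; on conflict, optionally fast-forward and retry."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return session.try_commit()
        except CommitConflict:
            # Give up if retries are disabled or exhausted.
            if not auto_ff or attempt == max_attempts - 1:
                raise
            session.fast_forward()
    raise AssertionError("loop always returns or raises")
```

With auto_ff disabled, the same conflict surfaces to the caller instead of being retried.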

fast_forward​

def fast_forward()

Fast-forward the session. Attempts to update the session base commit to the latest branch tip. Will fail if the same paths have been modified in the current session and on the branch.

tag​

def tag(tag_name: str,
commit_id: str | CommitID | None = None,
*,
message: str | None = None) -> Tag

Add a new tag.

Arguments:

  • tag_name - Tag name
  • commit_id - Commit to tag

delete_tag​

def delete_tag(tag_name: str) -> None

Delete an existing tag.

Arguments:

  • tag_name - Tag name

new_branch​

def new_branch(branch: str) -> None

Create a new branch based on the current session reference

Arguments:

  • branch_name - New branch name

delete_branch​

def delete_branch(branch: str) -> None

Delete an existing branch.

Arguments:

  • branch - Branch name

list_active_sessions​

def list_active_sessions() -> Sequence[SessionInfo]

List all active sessions.

create_session​

def create_session(ref: str | CommitID = "main",
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT,
message: str | None = None) -> LocalSession

Create a new session.

Arguments:

  • ref - Commit, branch, or tag name. Defaults to main.
  • for_writing - Whether the session is for writing or reading. Defaults to True (for writing).
  • expires_in - The duration the session should last. Defaults to 24 hours; the maximum accepted value is 7 days.
  • message - Descriptive message for the session. Defaults to None.

join_session​

def join_session(session_id: SessionID) -> LocalSession

Join an existing session.

Arguments:

  • session_id - ID of session to join

store​

@property
def store()

Access a Zarr-compatible ArraylakeStore store object for this repo.

Example:

import zarr
from arraylake import Client
repo = Client().get_repo("my_org/my_repo")  # obtain the repo via the Client
group = zarr.open_group(store=repo.store)

root_group​

@property
def root_group() -> zarr.Group

Open the Zarr root group of this repo.

Example:

from arraylake import Client
repo = Client().get_repo("my_org/my_repo")  # obtain the repo via the Client
group = repo.root_group
group.tree()  # visualize group hierarchy

status​

def status(limit: int = 1000)

Returns the SessionStatus for the current session.

Arguments:

  • limit (int, optional) - The maximum number of modified paths to return. Defaults to 1000; passing 0 is equivalent to setting no limit.

commit_log​

@property
def commit_log()

Returns a log of all commits on the current branch.

tags​

@property
def tags()

Returns a list of tag names.

branches​

@property
def branches()

Returns a list of branch names.

add_kerchunk_references​

def add_kerchunk_references(refs: dict[str, Any] | str, path: Path,
**kwargs) -> None

Add kerchunk references to the repo.

Arguments:

  • refs - A dictionary of kerchunk references or a path to a JSON file.
  • path - The path within the repo where the virtual dataset should be created.
  • kwargs - Additional arguments to pass to fsspec.open (only used when refs is a path to a JSON file).

add_virtual_grib​

def add_virtual_grib(grib_uri: str, path: Path) -> None

Add a virtual GRIB2 dataset to the repo.

Arguments:

  • path - The path within the repo where the virtual dataset should be created.
  • grib_uri - The path to the GRIB2 file. Only s3:// or gs:// URIs are supported at the moment.

add_virtual_hdf​

def add_virtual_hdf(hdf_uri: str, path: Path) -> None

Add a virtual HDF5 dataset to the repo.

Arguments:

  • hdf_uri - The path to the HDF5 file. Only s3:// or gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual HDF5 dataset should be created.

add_virtual_netcdf​

def add_virtual_netcdf(netcdf_uri: str, path: Path, **kwargs) -> None

Add a virtual netCDF dataset to the repo.

Arguments:

  • netcdf_uri - The path to the netCDF file. Only s3:// or gs:// URIs are supported at the moment. Both netCDF4 and netCDF3 files are supported.
  • path - The path within the repo where the virtual dataset should be created.
  • kwargs - Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.

add_virtual_zarr​

def add_virtual_zarr(zarr_uri: str, path: Path) -> None

Add a virtual Zarr dataset to the repo.

Arguments:

  • zarr_uri - The path to the Zarr store. Only Zarr V2 stores and s3:// or gs:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created.

add_virtual_tiff​

def add_virtual_tiff(tiff_uri: str, path: Path, name: str, **kwargs) -> None

Add a virtual TIFF dataset to the repo.

Arguments:

  • tiff_uri - The path to the TIFF file. Only s3:// URIs are supported at the moment.
  • path - The path within the repo where the virtual dataset should be created.
  • name - TIFF files contain bare arrays and no name. You must provide one.
  • kwargs - Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.

tree​

def tree(prefix: str = "",
*,
depth: int = 10,
filter: str | None = None) -> Tree

Display this repo's hierarchy as a Rich Tree

Arguments:

  • prefix - Path prefix
  • depth - Maximum depth to descend into the hierarchy
  • filter - JMESPath query to subset the tree

to_xarray​

def to_xarray(group=None, **kwargs) -> xr.Dataset

Open and decode an Xarray dataset from the Zarr-compatible ArraylakeStore.

note

There is no need to specify the zarr_version or engine keyword arguments. They are both set by default in this method.

Arguments:

  • group - path to the Zarr Group to load the xarray.Dataset from
  • **kwargs - additional keyword arguments passed to xarray.open_dataset

Returns:

  • Dataset - xarray.Dataset

filter_metadata​

def filter_metadata(filter: str) -> list[str]

Filter repo metadata attributes using a JMESPath search string.

Arguments:

  • filter - JMESPath query used to select metadata documents

Returns:

A list of document paths

The full JMESPath specification, including examples and an interactive console, is available at https://jmespath.org/specification.html.

Some specific examples of queries:

"flags[0].spec[0] == 'a'"
"flags[0].spec[0:2] == ['a', 'b']"
'flags[0].spec[2] == band'
"contains(keys(@), 'flags') && contains(keys(@), 'band') && flags[0].spec[2] == band"
"someNaN == 'NaN'"
'number >= `3` && number <= `15`'
'"eo:val" == `12`'
'"created:at:time" <= `2022-05-01`'
'(!flags == `5` || flags == `10`) && foo == `10`'

And some specific nuances to be aware of:

  1. NaNs are strings. Assert for them as follows:

"someKey == 'NaN'"

The following will not match NaN values:

"someNaN == NaN"

  2. Comparison of two missing keys is truthy.

The following will return true if neither key exists on the document, because null == null:

'foo == bar'

Here's a safer way to perform this query:

'contains(keys(@), "foo") && contains(keys(@), "bar") && foo == bar'

  3. Keys with special characters should be double quoted:

'"eo:val" == `12`'

The following will fail:

'eo:val == `12`'
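The missing-key nuance has a close analogue in plain Python, which may make it easier to see why the guarded query is safer. This sketch does not evaluate JMESPath. It only mirrors the null == null behaviour using dict.get, and the helper names are hypothetical.

```python
def naive_equal(doc: dict, a: str, b: str) -> bool:
    """Mirror of 'foo == bar': missing keys both become None and compare equal."""
    return doc.get(a) == doc.get(b)


def safe_equal(doc: dict, a: str, b: str) -> bool:
    """Mirror of the guarded query: both keys must exist before comparing."""
    return a in doc and b in doc and doc[a] == doc[b]


doc_without_keys = {"other": 1}
doc_with_keys = {"foo": 10, "bar": 10}
```

Here naive_equal(doc_without_keys, "foo", "bar") is true even though neither key exists, while safe_equal returns false for the same document.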

SessionStatus​

Holds the status of a session.

repo_name: str​

Name of the repo

session: LocalSession​

LocalSession object.

modified_paths: list[tuple[Path, bool]]​

List of modified paths and whether they were deleted.
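Since each entry of modified_paths pairs a path with a deletion flag, a common first step is to separate updated paths from deleted ones. The sketch below works on plain tuples. It uses str in place of Arraylake's Path type, and the helper name and sample data are hypothetical.

```python
def split_modified(
    modified_paths: list[tuple[str, bool]],
) -> tuple[list[str], list[str]]:
    """Split (path, was_deleted) pairs into updated and deleted path lists."""
    updated = [path for path, was_deleted in modified_paths if not was_deleted]
    deleted = [path for path, was_deleted in modified_paths if was_deleted]
    return updated, deleted


# Hypothetical status contents: two updated paths and one deleted path.
sample = [("a/.zarray", False), ("a/c0", False), ("old/.zarray", True)]
updated_paths, deleted_paths = split_modified(sample)
```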