repo
The repo module contains the Arraylake classes for interacting with repositories: AsyncRepo and Repo.
The Repo class provides a Zarr-compatible store interface for use with Zarr, Xarray, and other libraries that support the Zarr protocol.
Repos should not be instantiated directly; instead, use the Client or AsyncClient, e.g.:
from arraylake import Client
client = Client()
repo = client.get_repo("my-org/my-repo")
LocalWriteSession
Tracks and manages the state of a write session.
update_expiration
async def update_expiration(expires_in: datetime.timedelta)
Update a session's expiration period.
Arguments:
expires_in
- a timedelta indicating how long after start_time the session should expire (defaults to 24 hours; maximum is 7 days).
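Example (illustrative; assumes session is a write session obtained from create_session as documented below):
from datetime import timedelta

# extend the session so it expires 48 hours after its start time
await session.update_expiration(timedelta(hours=48))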
abandon
async def abandon()
Discard the session and its associated modifications, if any.
The session cannot be used after abandoning. Subsequent attempts to write to an abandoned session will result in an error.
AsyncRepo
Asynchronous interface to Arraylake repo.
Because Zarr does not support asynchronous I/O, the async client cannot be used to read or write Zarr data directly.
__init__
def __init__(metastore_db: MetastoreDatabase, chunkstore: Chunkstore,
name: str, author: Author)
Arguments:
metastore_db
- A metastore database for storing metadata
chunkstore
- A chunkstore for storing chunks
name
- The name of the repo. Purely for display purposes.
author
- The author name and email for commits
commit_data
async def commit_data(refresh: bool = False) -> CommitData
Returns the CommitData
for the current session.
commit_log
async def commit_log() -> CommitLog
Returns the CommitLog
for the current session.
status
async def status(limit: int = 1000) -> SessionStatus
Returns the SessionStatus
for the current session.
Arguments:
limit
int - [Optional] The number of modified paths to return. Defaults to 1000; passing 0 is equivalent to setting no limit.
ping
async def ping() -> None
Ping the metastore to confirm connection
list_active_sessions
async def list_active_sessions() -> Sequence[SessionInfo]
Obtain a time-ordered list of active sessions
create_session
async def create_session(
ref: str | CommitID = "main",
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT,
message: str | None = None) -> LocalSession
Create a new session.
Arguments:
ref
- The branch or commit to start the session from.
for_writing
- Whether the session is for writing or reading. Defaults to True (for writing).
expires_in
- The duration the session should last. Defaults to 24 hours, maximum acceptable value is 7 days.
message
- A description or message annotating the session. Defaults to None.
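Example (a minimal sketch; assumes AsyncClient.get_repo behaves analogously to Client.get_repo shown above):
from datetime import timedelta
from arraylake import AsyncClient

aclient = AsyncClient()
arepo = await aclient.get_repo("my-org/my-repo")  # assumed async counterpart of Client.get_repo
# start a 2-hour write session on the main branch
session = await arepo.create_session(
    ref="main", for_writing=True, expires_in=timedelta(hours=2), message="nightly ingest"
)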
join_session
async def join_session(session_id: SessionID) -> LocalSession
Join an existing session.
Arguments:
session_id
- The ID of the session to join.
checkout
async def checkout(
ref: str | CommitID = "main",
session_token: SessionID | None = None,
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT
) -> CommitID | None
Checkout a ref (branch, tag, or commit ID) and initialize a new session.
Arguments:
ref
- Commit, branch, or tag name.
session_token
- Shared session ID to join, if applicable. Defaults to None.
for_writing
- Whether the session is for writing or reading. Defaults to True (for writing).
expires_in
- The duration the session should last. Defaults to 24 hours, maximum acceptable value is 7 days.
Returns:
commit
- CommitID
commit
async def commit(
message: str,
auto_ff: bool = True,
checkout_for_writing: bool = DEFAULT_IS_WRITE_SESSION
) -> CommitID | None
Commit this session's changes and start a new session.
Arguments:
message
- Commit message
auto_ff
- Whether to automatically fast-forward the repo and retry if the commit fails
checkout_for_writing
- Whether to checkout the ensuing commit in for_writing mode (default: True)
Returns:
new_commit
- ID of new commit
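Example (illustrative; assumes arepo is an AsyncRepo with an active write session):
# commit the session's changes, retrying after an automatic fast-forward if needed
new_commit = await arepo.commit("add 2024 temperature data", auto_ff=True)
print(new_commit)  # CommitID of the new commit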
fast_forward
async def fast_forward()
Fast-forward the session. Attempts to update the session base commit to the latest branch tip. Will fail if the same paths have been modified in the current session and on the branch.
delete_tag
async def delete_tag(tag_name: str) -> None
Delete a tag.
Arguments:
tag_name
- Name of tag to delete
new_branch
async def new_branch(branch_name: str) -> None
Create a new branch based on the current session reference
Arguments:
branch_name
- New branch name
delete_branch
async def delete_branch(branch: str) -> None
Delete a branch.
Arguments:
branch
- Name of branch to delete
tree
async def tree(prefix: str = "",
*,
depth: int = 10,
filter: str | None = None) -> Tree
Display this repo's hierarchy as a Rich Tree
Arguments:
prefix
- Path prefix
depth
- Maximum depth to descend into the hierarchy
filter
- Optional JMESPath query to filter by
add_virtual_grib
@_write_op
async def add_virtual_grib(grib_uri: str, path: Path, **kwargs) -> None
Add a virtual GRIB2 dataset to the repo.
If the GRIB file contains multiple messages, this method will attempt to concatenate the messages into a single Zarr store, where the data payload of each message is stored in a single chunk.
Warning: This method has only been tested with a limited number of GRIB2 files. Please file an issue for GRIB2 files that don't work as expected.
Arguments:
grib_uri
- The path to the GRIB2 file. Only s3:// and gs:// URIs are supported at the moment.
path
- The path within the repo where the virtual dataset should be created.
kwargs
- Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
add_kerchunk_references
@_write_op
async def add_kerchunk_references(refs: dict[str, Any] | str, path: Path,
**kwargs) -> None
Add kerchunk references to the repo.
Arguments:
refs
- A dictionary of kerchunk references or a path to a JSON file.
path
- The path within the repo where the virtual dataset should be created.
kwargs
- Additional arguments to pass to fsspec.open (only used when refs is a path to a JSON file).
Note
- Kerchunk version 1 is supported unless the templates or gen keys are present.
add_virtual_netcdf
@_write_op
async def add_virtual_netcdf(netcdf_uri: str, path: Path, **kwargs) -> None
Add a virtual netCDF dataset to the repo.
Arguments:
netcdf_uri
- The path to the netCDF file. Only s3:// and gs:// URIs are supported. Both netCDF4 and netCDF3 files are supported.
path
- The path within the repo where the virtual dataset should be created.
kwargs
- Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
add_virtual_zarr
@_write_op
async def add_virtual_zarr(zarr_uri: str, path: Path) -> None
Add a virtual Zarr dataset to the repo.
Arguments:
zarr_uri
- The path to the Zarr store. Only Zarr V2 stores and s3:// or gs:// URIs are supported at the moment.
path
- The path within the repo where the virtual dataset should be created.
add_virtual_tiff
@_write_op
async def add_virtual_tiff(tiff_uri: str, path: Path, name: str,
**kwargs) -> None
Add a virtual TIFF dataset to the repo.
Arguments:
tiff_uri
- The path to the TIFF file. Only s3:// or gs:// URIs are supported at the moment.
path
- The path within the repo where the virtual dataset should be created. Unlike the other virtual functions, this path should include the array name.
name
- TIFF files contain bare arrays without a name. You must provide one.
kwargs
- Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
Notes:
Arrays will be ingested with dimension names 'X', 'Y'[, 'band']. TIFFs with overviews will be ingested so that there is one array per overview level, named '0', '1', '2', etc.
filter_metadata
async def filter_metadata(filter: str) -> list[str]
Filter repo metadata documents using a JMESPath search string.
https://jmespath.org/specification.html
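Example (illustrative; assumes arepo is an AsyncRepo and the attribute name in the query is hypothetical):
# list paths of documents whose metadata has a units attribute equal to "K"
paths = await arepo.filter_metadata("units == 'K'")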
Repo
Synchronous interface to Arraylake repo.
__init__
def __init__(arepo: AsyncRepo)
Initialize a Repo from an initialized AsyncRepo
Arguments:
arepo
- An existing AsyncRepo
from_metastore_and_chunkstore
@classmethod
def from_metastore_and_chunkstore(cls, metastore_db: MetastoreDatabase,
chunkstore: Chunkstore, name: str,
author: Author) -> Repo
Initialize a Repo from an initialized metastore database and chunkstore
Arguments:
metastore_db
- A metastore database for storing metadata
chunkstore
- A chunkstore for storing chunks
name
- The name of the repo. Purely for display purposes.
author
- The author name and email for commits
ping
def ping()
Ping the metastore to confirm connection
checkout
def checkout(
ref: str | CommitID = "main",
session_token: SessionID | None = None,
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT) -> CommitID
Checkout a ref (branch, tag, or commit ID) and initialize a new session.
Arguments:
ref
- Commit, branch, or tag name.
session_token
- Shared session ID to join, if applicable. Defaults to None.
for_writing
- Whether the session is for writing or reading. Defaults to True (for writing).
expires_in
- The duration the session should last. Defaults to 24 hours, maximum acceptable value is 7 days.
Returns:
commit
- CommitID
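Example (illustrative; the tag name is hypothetical):
# switch the repo to a read-only session pinned to a tag
repo.checkout("v1.0", for_writing=False)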
commit
def commit(message: str,
auto_ff: bool = True,
checkout_for_writing: bool = DEFAULT_IS_WRITE_SESSION) -> str
Commit this session's changes and start a new session.
Arguments:
message
- Commit message
auto_ff
- Whether to automatically fast-forward the repo and retry if the commit fails
checkout_for_writing
- Whether the session is for writing or reading. Defaults to True (for writing).
Returns:
new_commit
- ID of new commit
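Example (a minimal sketch; assumes repo was obtained via client.get_repo as above and is checked out for writing; the array name is hypothetical):
import numpy as np

repo.checkout("main", for_writing=True)
root = repo.root_group
root.create_dataset("example", data=np.arange(10), overwrite=True)  # write through the Zarr interface
new_commit = repo.commit("add example array")
print(new_commit)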
fast_forward
def fast_forward()
Fast-forward the session. Attempts to update the session base commit to the latest branch tip. Will fail if the same paths have been modified in the current session and on the branch.
tag
def tag(tag_name: str,
commit_id: str | CommitID | None = None,
*,
message: str | None = None) -> Tag
Add a new tag.
Arguments:
tag_name
- Tag name
commit_id
- Commit to tag
message
- Optional message annotating the tag. Defaults to None.
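Example (illustrative; the tag name and message are hypothetical):
# tag the commit returned by the most recent commit() call
commit_id = repo.commit("finalize v1 of the dataset")
repo.tag("v1.0", commit_id, message="first public release")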
delete_tag
def delete_tag(tag_name: str) -> None
Delete an existing tag.
Arguments:
tag_name
- Tag name
new_branch
def new_branch(branch: str) -> None
Create a new branch based on the current session reference
Arguments:
branch
- New branch name
delete_branch
def delete_branch(branch: str) -> None
Delete an existing branch.
Arguments:
branch
- Branch name
list_active_sessions
def list_active_sessions() -> Sequence[SessionInfo]
List all active sessions.
create_session
def create_session(ref: str | CommitID = "main",
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT,
message: str | None = None) -> LocalSession
Create a new session.
Arguments:
ref
- Commit, branch, or tag name. Defaults to main.
for_writing
- Whether the session is for writing or reading. Defaults to True (for writing).
expires_in
- The duration the session should last. Defaults to 24 hours, maximum acceptable value is 7 days.
message
- Descriptive message for the session. Defaults to None.
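Example (illustrative; the ref is hypothetical):
from datetime import timedelta

# start a short-lived, read-only session pinned to a tag
session = repo.create_session(ref="v1.0", for_writing=False, expires_in=timedelta(hours=1))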
join_session
def join_session(session_id: SessionID) -> LocalSession
Join an existing session.
Arguments:
session_id
- ID of session to join
store
@property
def store() -> ArraylakeStore
Access a Zarr-compatible ArraylakeStore
store object for this repo.
Example:
repo = client.get_repo("my_org/my_repo")
group = zarr.open_group(store=repo.store)
root_group
@property
def root_group() -> zarr.Group
Open the Zarr root group of this repo.
Example:
repo = client.get_repo("my_org/my_repo")
group = repo.root_group
group.tree() # visualize group hierarchy
status
def status(limit: int = 1000)
Returns the SessionStatus
for the current session.
Arguments:
limit
int - [Optional] The number of modified paths to return. Defaults to 1000; passing 0 is equivalent to setting no limit.
commit_log
@property
def commit_log()
Returns a log of all commits on the current branch.
tags
@property
def tags()
Returns a list of tag names.
branches
@property
def branches()
Returns a list of branch names.
add_kerchunk_references
def add_kerchunk_references(refs: dict[str, Any] | str, path: Path,
**kwargs) -> None
Add kerchunk references to the repo.
Arguments:
refs
- A dictionary of kerchunk references or a path to a JSON file.
path
- The path within the repo where the virtual dataset should be created.
kwargs
- Additional arguments to pass to fsspec.open (only used when refs is a path to a JSON file).
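Example (illustrative; the reference file, repo path, and anon option are all hypothetical):
# ingest a kerchunk reference JSON stored on S3; extra kwargs are forwarded to fsspec.open
repo.add_kerchunk_references("s3://my-bucket/refs.json", "virtual/my_dataset", anon=True)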
add_virtual_grib
def add_virtual_grib(grib_uri: str, path: Path) -> None
Add a virtual GRIB2 dataset to the repo.
Arguments:
path
- The path within the repo where the virtual dataset should be created.
grib_uri
- The path to the GRIB2 file. Only s3:// or gs:// URIs are supported at the moment.
add_virtual_hdf
def add_virtual_hdf(hdf_uri: str, path: Path) -> None
Add a virtual HDF5 dataset to the repo.
Arguments:
hdf_uri
- The path to the HDF5 file. Only s3:// or gs:// URIs are supported at the moment.
path
- The path within the repo where the virtual HDF5 dataset should be created.
add_virtual_netcdf
def add_virtual_netcdf(netcdf_uri: str, path: Path, **kwargs) -> None
Add a virtual netCDF dataset to the repo.
Arguments:
netcdf_uri
- The path to the netCDF file. Only s3:// or gs:// URIs are supported at the moment. Both netCDF4 and netCDF3 files are supported.
path
- The path within the repo where the virtual dataset should be created.
kwargs
- Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
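Example (illustrative; the source URI and repo path are hypothetical):
# create a virtual dataset pointing at a netCDF file on S3, then commit it
repo.add_virtual_netcdf("s3://my-bucket/data/2024-01-01.nc", "virtual/netcdf/2024-01-01")
repo.commit("add virtual netCDF dataset")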
add_virtual_zarr
def add_virtual_zarr(zarr_uri: str, path: Path) -> None
Add a virtual Zarr dataset to the repo.
Arguments:
zarr_uri
- The path to the Zarr store. Only Zarr V2 stores and s3:// or gs:// URIs are supported at the moment.
path
- The path within the repo where the virtual dataset should be created.
add_virtual_tiff
def add_virtual_tiff(tiff_uri: str, path: Path, name: str, **kwargs) -> None
Add a virtual TIFF dataset to the repo.
Arguments:
tiff_uri
- The path to the TIFF file. Only s3:// URIs are supported at the moment.
path
- The path within the repo where the virtual dataset should be created.
name
- TIFF files contain bare arrays and no name. You must provide one.
kwargs
- Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
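Example (illustrative; the TIFF URI, repo path, and array name are hypothetical):
# TIFF arrays are unnamed, so a name must be supplied; the repo path includes it
repo.add_virtual_tiff("s3://my-bucket/imagery/scene.tif", "virtual/scene/band_data", name="band_data")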
tree
def tree(prefix: str = "",
*,
depth: int = 10,
filter: str | None = None) -> Tree
Display this repo's hierarchy as a Rich Tree
Arguments:
prefix
- Path prefix
depth
- Maximum depth to descend into the hierarchy
filter
- JMESPath query to subset the tree
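Example (illustrative):
# in a notebook or REPL the returned Tree renders the hierarchy, here two levels deep
repo.tree(depth=2)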
to_xarray
def to_xarray(group=None, **kwargs) -> xr.Dataset
Open and decode an Xarray dataset from the Zarr-compatible ArraylakeStore.
There is no need to specify the zarr_version
or engine
keyword arguments.
They are both set by default in this method.
Arguments:
group
- Path to the Zarr group to load the xarray.Dataset from
**kwargs
- Additional keyword arguments passed to xarray.open_dataset
Returns:
Dataset
- xarray.Dataset
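Example (illustrative; the group path and chunks argument are hypothetical):
# open a group as an xarray.Dataset; extra kwargs are forwarded to xarray.open_dataset
ds = repo.to_xarray(group="virtual/netcdf/2024-01-01", chunks={})
print(ds)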
filter_metadata
def filter_metadata(filter: str) -> list[str]
Filter repo metadata attributes using a JMESPath search string.
Arguments:
filter
- JMESPath query to subset the tree
Returns:
A list of document paths
The full JMESPath spec, including examples and an interactive console, is available at https://jmespath.org/specification.html.
Some specific examples of queries:
"flags[0].spec[0] == 'a'"
"flags[0].spec[0:2] == ['a', 'b']"
'flags[0].spec[2] == band'
"contains(keys(@), 'flags') && contains(keys(@), 'band') && flags[0].spec[2] == band"
"someNaN == 'NaN'"
'number >= `3` && number <= `15`'
'"eo:val" == `12`'
'"created:at:time" <= `2022-05-01`'
'(!flags == `5` || flags == `10`) && foo == `10`'
And some specific nuances to be aware of:
- NaNs are strings; assert for them as follows:
"someKey == 'NaN'"
The following will not match NaN values:
"someNaN == NaN"
- Comparison of two missing keys is truthy:
The following will return true if both don't exist on the doc, as null == null
'foo == bar'
Here's a safer way to perform this query:
'contains(keys(@), "foo") && contains(keys(@), "bar") && foo == bar'
- Keys with special characters should be double quoted:
'"eo:val" == `12`'
The following will fail:
'eo:val == `12`'
ArraylakeStore
Arraylake's Zarr store interface.
This is an implementation of a Zarr V3 Store.
This class is not intended to be constructed directly by users. Instead, use the store
property on the Repo
class.
list_prefix
def list_prefix(prefix: str) -> list[str]
List a prefix in the store
Arguments:
prefix : the path to list
Returns:
A list of document paths
listdir
def listdir(prefix: str) -> list[str]
List a directory in the store
Arguments:
prefix
- the path to list
Returns:
A list of document paths
__getitem__
def __getitem__(key) -> bytes
Get a value
Arguments:
key
- the path to get
Returns:
bytes (metadata or chunk)
getitems
def getitems(keys, *, contexts=None, on_error="omit") -> Mapping[str, bytes]
Get multiple items
Arguments:
keys
- list of paths to get
Returns:
Mapping where keys are paths and values are bytes (metadata or chunks)
__setitem__
def __setitem__(key, value: bytes) -> None
Set a value
Arguments:
key
- the path to set
Returns:
None
setitems
def setitems(items: Mapping[str, bytes]) -> None
Set multiple items
Arguments:
items : mapping where keys are paths and values are bytes (metadata or chunks)
Returns:
None
__delitem__
def __delitem__(key)
Delete a key.
Arguments:
key
- path to delete
delitems
def delitems(keys) -> None
Delete multiple keys
Arguments:
keys
- list of paths to delete
__contains__
def __contains__(key: str) -> bool
Check if a key exists in the store.
Arguments:
key
- path to check
keys
def keys() -> list[str]
Return a list of this store's keys
__iter__
def __iter__()
Iterate over this store's keys
erase_prefix
def erase_prefix(prefix)
Erase all keys with the given prefix.
__len__
def __len__() -> int
number of keys in this store
__getattribute__
def __getattribute__(name)
Influence upstream Zarr by failing a hasattr check for get_partial_values
.
Doing so forces it to use a preferable code path for retrieving data (getitems). See: https://github.com/zarr-developers/zarr-python/blob/a81db0782535ba04c32c277102a6457d118a73e8/zarr/core.py#L2162-L2171
SessionStatus
Holds the status of a session.
repo_name: str
Name of the repo
session: LocalSession
LocalSession
object.
modified_paths: list[tuple[Path, bool]]
List of modified paths and whether they were deleted.
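Example (a minimal sketch of inspecting the current session's status):
status = repo.status()
print(status.repo_name)
for path, deleted in status.modified_paths:
    print(path, "deleted" if deleted else "modified")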