repo
The Repo module contains the Arraylake classes for interacting with repositories: AsyncRepo and Repo.
The Repo class provides a Zarr-compatible store interface for use with Zarr, Xarray, and other libraries that support the Zarr protocol.
Repos should not be instantiated directly. Instead, use the Client or AsyncClient, e.g.
from arraylake import Client
client = Client()
repo = client.get_repo("my-org/my-repo")
LocalWriteSession​
Tracks and manages the state of a write session.
update_expiration​
async def update_expiration(expires_in: datetime.timedelta)
Update a session's expiration period.
Arguments:
expires_in
- a timedelta indicating how long after start_time the session should expire (defaults to 24 hours; maximum is 7 days).
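For instance, a minimal sketch, assuming session is an existing LocalWriteSession (e.g. returned by create_session) and that this runs inside an async function:
import datetime

# extend the expiration window to 48 hours after the session's start_time
await session.update_expiration(datetime.timedelta(hours=48))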
abandon​
async def abandon()
Discard the session and its associated modifications, if any.
The session cannot be used after abandoning. Subsequent attempts to write to an expired session will result in an error.
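A minimal sketch, again assuming session is an existing LocalWriteSession in an async context:
# discard any uncommitted modifications; the session cannot be reused afterwards
await session.abandon()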
AsyncRepo​
Asynchronous interface to Arraylake repo.
Because Zarr does not support asynchronous I/O, the async client cannot be used to read or write Zarr data directly.
__init__​
def __init__(metastore_db: MetastoreDatabase, chunkstore: Chunkstore,
name: str, author: Author)
Arguments:
metastore_db
- A metastore database for storing metadata
chunkstore
- A chunkstore for storing chunks
name
- The name of the repo. Purely for display purposes.
author
- The author name and email for commits
commit_data​
async def commit_data(refresh: bool = False) -> CommitData
Returns the CommitData for the current session.
commit_log​
async def commit_log() -> CommitLog
Returns the CommitLog for the current session.
status​
async def status(limit: int = 1000) -> SessionStatus
Returns the SessionStatus for the current session.
Arguments:
limit
int - [Optional] The number of modified paths to return. Defaults to 1000, passing 0 is equivalent to setting no limit.
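For example, a sketch assuming arepo is an already-initialized AsyncRepo and an async context:
status = await arepo.status(limit=0)  # 0 means no limit on returned paths
print(status.repo_name)
for path, deleted in status.modified_paths:
    print(path, "deleted" if deleted else "modified")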
ping​
async def ping() -> None
Ping the metastore to confirm connection
list_active_sessions​
async def list_active_sessions() -> Sequence[SessionInfo]
Obtain a time-ordered list of active sessions
create_session​
async def create_session(
ref: str | CommitID = "main",
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT,
message: str | None = None) -> LocalSession
Create a new session.
Arguments:
ref
- The branch or commit to start the session from.
for_writing
- Whether the session is for writing or reading. Defaults to True (for writing).
expires_in
- The duration the session should last. Defaults to 24 hours, maximum acceptable value is 7 days.
message
- A description or message annotating the session. Defaults to None.
join_session​
async def join_session(session_id: SessionID) -> LocalSession
Join an existing session.
Arguments:
session_id
- The ID of the session to join.
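A sketch of creating a write session and joining one by ID, assuming arepo is an already-initialized AsyncRepo and an async context; the message and known_session_id values are hypothetical placeholders:
import datetime

# start a 12-hour write session on the main branch
session = await arepo.create_session(
    ref="main",
    for_writing=True,
    expires_in=datetime.timedelta(hours=12),
    message="backfill 2023 data",
)

# elsewhere, join a session whose ID was shared out of band
shared = await arepo.join_session(session_id=known_session_id)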
checkout​
async def checkout(
ref: str | CommitID = "main",
session_token: SessionID | None = None,
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT
) -> CommitID | None
Checkout a ref (branch, tag, or commit ID) and initialize a new session.
Arguments:
ref
- Commit, branch, or tag name.
session_token
- Shared session ID to join, if applicable. Defaults to None.
for_writing
- Whether the session is for writing or reading. Defaults to True (for writing).
expires_in
- The duration the session should last. Defaults to 24 hours, maximum acceptable value is 7 days.
Returns:
commit
- CommitID
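For example, a sketch assuming arepo is an already-initialized AsyncRepo and an async context; the tag name is made up:
import datetime

# check out a branch for reading only
commit_id = await arepo.checkout("main", for_writing=False)

# or check out a tag for writing with a custom expiration
commit_id = await arepo.checkout("v1.0", expires_in=datetime.timedelta(days=2))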
commit​
async def commit(
message: str,
auto_ff: bool = True,
checkout_for_writing: bool = DEFAULT_IS_WRITE_SESSION
) -> CommitID | None
Commit this session's changes and start a new session.
Arguments:
message
- Commit message
auto_ff
- Whether to automatically fast-forward the repo and retry if the commit fails
checkout_for_writing
- Whether to checkout the ensuing commit in for_writing mode (default: True)
Returns:
new_commit
- ID of new commit
fast_forward​
async def fast_forward()
Fast-forward the session. Attempts to update the session base commit to the latest branch tip. Will fail if the same paths have been modified in the current session and on the branch.
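A sketch of the two commit styles, assuming arepo is an already-initialized AsyncRepo with pending changes and an async context; the commit messages are hypothetical:
# simplest form: auto_ff=True (the default) fast-forwards and retries if the commit fails
new_commit = await arepo.commit("add temperature arrays")

# to control it manually, fast-forward the session first, then commit without retrying
await arepo.fast_forward()
new_commit = await arepo.commit("update metadata", auto_ff=False)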
delete_tag​
async def delete_tag(tag_name: str) -> None
Delete a tag.
Arguments:
tag_name
- Name of tag to delete
new_branch​
async def new_branch(branch_name: str) -> None
Create a new branch based on the current session reference
Arguments:
branch_name
- New branch name
delete_branch​
async def delete_branch(branch: str) -> None
Delete a branch.
Arguments:
branch
- Name of branch to delete
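For example, a sketch assuming an async context; the branch and tag names are made up:
await arepo.new_branch("dev")        # branch off the current session reference
# ... write and commit on the dev branch ...
await arepo.delete_branch("dev")     # remove the branch when it is no longer needed
await arepo.delete_tag("v0-draft")   # remove an obsolete tag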
tree​
async def tree(prefix: str = "",
*,
depth: int = 10,
filter: str | None = None) -> Tree
Display this repo's hierarchy as a Rich Tree
Arguments:
prefix
- Path prefix
depth
- Maximum depth to descend into the hierarchy
filter
- Optional JMESPath query to filter by
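A quick sketch, assuming an async context and an already-initialized AsyncRepo; "climate" is a hypothetical path prefix:
from rich import print as rprint

tree = await arepo.tree(prefix="climate", depth=3)
rprint(tree)  # render the Rich Tree in the terminal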
add_virtual_grib​
@_write_op
async def add_virtual_grib(grib_uri: str, path: Path, **kwargs) -> None
Add a virtual GRIB2 dataset to the repo.
If the GRIB file contains multiple messages, this method will attempt to concatenate the messages into a single Zarr store, with the data payload of each message stored in a single chunk.
Warning: This method has only been tested with a limited number of GRIB2 files. Please file an issue for GRIB2 files that don't work as expected.
Arguments:
grib_uri
- The path to the GRIB2 file. Only s3:// and gs:// URIs are supported at the moment.
path
- The path within the repo where the virtual dataset should be created.
kwargs
- Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
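For example, a sketch in an async context; the bucket and paths are hypothetical:
await arepo.add_virtual_grib(
    "s3://my-bucket/forecasts/gfs_20240101.grib2",  # hypothetical source object
    "forecasts/gfs_20240101",                       # destination path inside the repo
)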
add_kerchunk_references​
@_write_op
async def add_kerchunk_references(refs: dict[str, Any] | str, path: Path,
**kwargs) -> None
Add kerchunk references to the repo.
Arguments:
refs
- A dictionary of kerchunk references or a path to a JSON file.
path
- The path within the repo where the virtual dataset should be created.
kwargs
- Additional arguments to pass to fsspec.open (only used when refs is a path to a JSON file).
Note
- Kerchunk version 1 is supported unless the templates or gen keys are present.
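As a sketch, assuming an async context and a kerchunk reference set generated elsewhere; the file and repo paths are hypothetical:
import json

with open("combined_refs.json") as f:  # hypothetical local reference file
    refs = json.load(f)
await arepo.add_kerchunk_references(refs, "virtual/combined")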
add_virtual_netcdf​
@_write_op
async def add_virtual_netcdf(netcdf_uri: str, path: Path, **kwargs) -> None
Add a virtual Netcdf dataset to the repo.
Arguments:
netcdf_uri
- The path to the netCDF file. Only s3:// and gs:// URIs are supported. Both netCDF4 and netCDF3 files are supported.
path
- The path within the repo where the virtual dataset should be created.
kwargs
- Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
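For example, a sketch in an async context; the bucket and paths are hypothetical:
await arepo.add_virtual_netcdf(
    "gs://my-bucket/obs/station_data.nc",  # hypothetical netCDF file
    "obs/station_data",                    # destination path inside the repo
)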
add_virtual_zarr​
@_write_op
async def add_virtual_zarr(zarr_uri: str, path: Path) -> None
Add a virtual Zarr dataset to the repo.
Arguments:
zarr_uri
- The path to the Zarr store. Only Zarr V2 stores and s3:// or gs:// URIs are supported at the moment.
path
- The path within the repo where the virtual dataset should be created.
add_virtual_tiff​
@_write_op
async def add_virtual_tiff(tiff_uri: str, path: Path, name: str,
**kwargs) -> None
Add a virtual TIFF dataset to the repo.
Arguments:
tiff_uri
- The path to the TIFF file. Only s3:// or gs:// URIs are supported at the moment.
path
- The path within the repo where the virtual dataset should be created. Unlike the other virtual functions, this path should include the array name.
name
- TIFF files contain bare arrays without a name. You must provide one.
kwargs
- Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
Notes:
Arrays will be ingested with dimension names 'X', 'Y'[, 'band']. TIFFs with overviews will be ingested so that there is one array per overview level, named '0', '1', '2', etc.
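For example, a sketch in an async context; the bucket, paths, and array name are hypothetical:
await arepo.add_virtual_tiff(
    "s3://my-bucket/imagery/scene.tif",  # hypothetical TIFF
    "imagery/scene/band_data",           # note: the path includes the array name
    name="band_data",
)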
filter_metadata​
async def filter_metadata(filter: str) -> list[str]
Filter repo metadata documents using a JMESPath search string.
https://jmespath.org/specification.html
Repo​
Synchronous interface to Arraylake repo.
__init__​
def __init__(arepo: AsyncRepo)
Initialize a Repo from an initialized AsyncRepo
Arguments:
arepo
- An existing AsyncRepo
from_metastore_and_chunkstore​
@classmethod
def from_metastore_and_chunkstore(cls, metastore_db: MetastoreDatabase,
chunkstore: Chunkstore, name: str,
author: Author) -> Repo
Initialize a Repo from an initialized metastore database and chunkstore
Arguments:
metastore_db
- A metastore database for storing metadata
chunkstore
- A chunkstore for storing chunks
name
- The name of the repo. Purely for display purposes.
author
- The author name and email for commits
ping​
def ping()
Ping the metastore to confirm connection
checkout​
def checkout(
ref: str | CommitID = "main",
session_token: SessionID | None = None,
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT) -> CommitID
Checkout a ref (branch, tag, or commit ID) and initialize a new session.
Arguments:
ref
- Commit, branch, or tag name.
session_token
- Shared session ID to join, if applicable. Defaults to None.
for_writing
- Whether the session is for writing or reading. Defaults to True (for writing).
expires_in
- The duration the session should last. Defaults to 24 hours, maximum acceptable value is 7 days.
Returns:
commit
- CommitID
commit​
def commit(message: str,
auto_ff: bool = True,
checkout_for_writing: bool = DEFAULT_IS_WRITE_SESSION) -> str
Commit this session's changes and start a new session.
Arguments:
message
- Commit message
auto_ff
- Whether to automatically fast-forward the repo and retry if the commit fails
checkout_for_writing
- Whether the session is for writing or reading. Defaults to True (for writing).
Returns:
new_commit
- ID of new commit
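Putting the synchronous pieces together, a minimal end-to-end sketch; the org, repo, and array names are hypothetical, and the array creation assumes the zarr-python 2.x Group.create_dataset API:
import numpy as np
from arraylake import Client

client = Client()
repo = client.get_repo("my-org/my-repo")

repo.checkout("main")            # start a write session on main
root = repo.root_group           # Zarr root group backed by the repo
arr = root.create_dataset("temperature", shape=(10, 10), dtype="f4", overwrite=True)
arr[:] = np.random.rand(10, 10)
new_commit = repo.commit("add temperature array")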
fast_forward​
def fast_forward()
Fast-forward the session. Attempts to update the session base commit to the latest branch tip. Will fail if the same paths have been modified in the current session and on the branch.
tag​
def tag(tag_name: str,
commit_id: str | CommitID | None = None,
*,
message: str | None = None) -> Tag
Add a new tag.
Arguments:
tag_name
- Tag name
commit_id
- Commit to tag
message
- Optional message annotating the tag. Defaults to None.
delete_tag​
def delete_tag(tag_name: str) -> None
Delete an existing tag.
Arguments:
tag_name
- Tag name
new_branch​
def new_branch(branch: str) -> None
Create a new branch based on the current session reference
Arguments:
branch
- New branch name
delete_branch​
def delete_branch(branch: str) -> None
Delete an existing branch.
Arguments:
branch
- Branch name
list_active_sessions​
def list_active_sessions() -> Sequence[SessionInfo]
List all active sessions.
create_session​
def create_session(ref: str | CommitID = "main",
for_writing: bool = DEFAULT_IS_WRITE_SESSION,
expires_in: datetime.timedelta = DEFAULT_SESSION_TIMEOUT,
message: str | None = None) -> LocalSession
Create a new session.
Arguments:
ref
- Commit, branch, or tag name. Defaults to main.
for_writing
- Whether the session is for writing or reading. Defaults to True (for writing).
expires_in
- The duration the session should last. Defaults to 24 hours, maximum acceptable value is 7 days.
message
- Descriptive message for the session. Defaults to None.
join_session​
def join_session(session_id: SessionID) -> LocalSession
Join an existing session.
Arguments:
session_id
- ID of session to join
store​
@property
def store()
Access a Zarr-compatible ArraylakeStore store object for this repo.
Example:
repo = Client().get_repo("my_org/my_repo")
group = zarr.open_group(store=repo.store)
root_group​
@property
def root_group() -> zarr.Group
Open the Zarr root group of this repo.
Example:
repo = Client().get_repo("my_org/my_repo")
group = repo.root_group
group.tree() # visualize group hierarchy
status​
def status(limit: int = 1000)
Returns the SessionStatus for the current session.
Arguments:
limit
int - [Optional] The number of modified paths to return. Defaults to 1000, passing 0 is equivalent to setting no limit.
commit_log​
@property
def commit_log()
Returns a log of all commits on the current branch.
tags​
@property
def tags()
Returns a list of tag names.
branches​
@property
def branches()
Returns a list of branch names.
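A quick sketch of inspecting repo history and refs; whether CommitLog is directly iterable is not specified in this reference, so it is simply printed:
print(repo.commit_log)  # log of commits on the current branch
print(repo.tags)        # list of tag names
print(repo.branches)    # list of branch names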
add_kerchunk_references​
def add_kerchunk_references(refs: dict[str, Any] | str, path: Path,
**kwargs) -> None
Add kerchunk references to the repo.
Arguments:
refs
- A dictionary of kerchunk references or a path to a JSON file.
path
- The path within the repo where the virtual dataset should be created.
kwargs
- Additional arguments to pass to fsspec.open (only used when refs is a path to a JSON file).
add_virtual_grib​
def add_virtual_grib(grib_uri: str, path: Path) -> None
Add a virtual GRIB2 dataset to the repo.
Arguments:
path
- The path within the repo where the virtual dataset should be created.
grib_uri
- The path to the GRIB2 file. Only s3:// or gs:// URIs are supported at the moment.
add_virtual_hdf​
def add_virtual_hdf(hdf_uri: str, path: Path) -> None
Add a virtual HDF5 dataset to the arraylake.
Arguments:
hdf_uri
- The path to the HDF5 file. Only s3:// or gs:// URIs are supported at the moment.
path
- The path within the repo where the virtual HDF5 dataset should be created.
add_virtual_netcdf​
def add_virtual_netcdf(netcdf_uri: str, path: Path, **kwargs) -> None
Add a virtual Netcdf dataset to the repo.
Arguments:
netcdf_uri
- The path to the netCDF file. Only s3:// or gs:// URIs are supported at the moment. Both netCDF4 and netCDF3 files are supported.
path
- The path within the repo where the virtual dataset should be created.
kwargs
- Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
add_virtual_zarr​
def add_virtual_zarr(zarr_uri: str, path: Path) -> None
Add a virtual Zarr dataset to the repo.
Arguments:
zarr_uri
- The path to the Zarr store. Only Zarr V2 stores and s3:// or gs:// URIs are supported at the moment.
path
- The path within the repo where the virtual dataset should be created.
add_virtual_tiff​
def add_virtual_tiff(tiff_uri: str, path: Path, name: str, **kwargs) -> None
Add a virtual TIFF dataset to the repo.
Arguments:
tiff_uri
- The path to the TIFF file. Only s3:// URIs are supported at the moment.
path
- The path within the repo where the virtual dataset should be created.
name
- TIFF files contain bare arrays and no name. You must provide one.
kwargs
- Additional arguments to pass to the kerchunk file format backend. Do not pass storage_options or inline_threshold.
tree​
def tree(prefix: str = "",
*,
depth: int = 10,
filter: str | None = None) -> Tree
Display this repo's hierarchy as a Rich Tree
Arguments:
prefix
- Path prefix
depth
- Maximum depth to descend into the hierarchy
filter
- JMESPath query to subset the tree
to_xarray​
def to_xarray(group=None, **kwargs) -> xr.Dataset
Open and decode an Xarray dataset from the Zarr-compatible ArraylakeStore.
There is no need to specify the zarr_version or engine keyword arguments; they are both set by default in this method.
Arguments:
group
- path to the Zarr Group to load the xarray.Dataset from
**kwargs
- additional keyword arguments passed to xarray.open_dataset
Returns:
Dataset
- xarray.Dataset
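For example, a sketch; the group path is hypothetical, and chunks={} is simply passed through to xarray.open_dataset:
ds = repo.to_xarray(group="obs/station_data", chunks={})
print(ds)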
filter_metadata​
def filter_metadata(filter: str) -> list[str]
Filter repo metadata attributes using a JMESPath search string.
Arguments:
filter
- JMESPath query to subset the tree
Returns:
A list of document paths
The full JMESPath spec, including examples and an interactive console, is available at https://jmespath.org/specification.html.
Some specific examples of queries:
"flags[0].spec[0] == 'a'"
"flags[0].spec[0:2] == ['a', 'b']"
'flags[0].spec[2] == band'
"contains(keys(@), 'flags') && contains(keys(@), 'band') && flags[0].spec[2] == band"
"someNaN == 'NaN'"
'number >= `3` && number <= `15`'
'"eo:val" == `12`'
'"created:at:time" <= `2022-05-01`'
'(!flags == `5` || flags == `10`) && foo == `10`'
And some specific nuances to be aware of:
- NaNs are strings; assert for them as follows:
"someKey == 'NaN'"
The following will not match NaN values:
"someNaN == NaN"
- Comparison of two missing keys is truthy:
The following will return true if both don't exist on the doc, as null == null
'foo == bar'
Here's a safer way to perform this query:
'contains(keys(@), "foo") && contains(keys(@), "bar") && foo == bar'
- Keys with special characters should be double quoted:
'"eo:val" == 12'
The following will fail:
'eo:val == 12'
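As a usage sketch; the attribute names in the query are hypothetical:
paths = repo.filter_metadata("contains(keys(@), 'standard_name') && standard_name == 'air_temperature'")
print(paths)  # list of document paths whose metadata matched the query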
SessionStatus​
Holds the status of a session.
repo_name: str
Name of the repo
session: LocalSession
The LocalSession object.
modified_paths: list[tuple[Path, bool]]
List of modified paths and whether they were deleted.