Fused
Fused is an end-to-end cloud platform for data analytics. Its core feature is the User Defined Function (UDF): a Python function that can be run via an HTTP request. You can run UDFs and retrieve data from them from anywhere you can call an HTTP endpoint.
You can use the Fused Workbench to compute on and visualize data stored in Arraylake.
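Because a UDF is reachable over HTTP, calling one amounts to building a URL that carries the UDF's parameters. The sketch below illustrates this using only the Python standard library. The endpoint is a hypothetical placeholder (copy the real URL for your UDF from Fused), and passing parameters as query-string arguments is an assumption rather than something this page confirms.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def build_udf_url(endpoint: str, **params) -> str:
    """Append UDF parameters to an endpoint URL as query-string arguments."""
    parts = urlsplit(endpoint)
    # keep any query arguments already present on the endpoint
    query = parse_qsl(parts.query) + [(key, str(value)) for key, value in params.items()]
    return urlunsplit(parts._replace(query=urlencode(query)))


# Hypothetical placeholder, NOT a real Fused URL.
ENDPOINT = "https://example.invalid/udf/run"

url = build_udf_url(ENDPOINT, varname="rgb_median", scale_factor=3000)
print(url)

# Fetching the result would then be an ordinary HTTP GET, for example:
# from urllib.request import urlopen
# with urlopen(url) as response:
#     payload = response.read()
```

Any HTTP client works the same way, so the same URL can be used from a notebook, a web map, or a shell script.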
Credentials
To make working with Fused as easy as possible, we recommend using the following configuration options:
- Configure your repo to use role-based access delegation to access object storage
- Create or reuse an existing service account API token
Once you have your API token, enter it into the Fused secrets manager under the name ARRAYLAKE_TOKEN.
This will allow your Fused UDFs to access data in Arraylake.
Create a UDF
To use Fused with Arraylake, you write a Fused User Defined Function (UDF) that translates data stored in Arraylake into the output format you need.
In this example, we use Fused to visualize raster data, so our UDF will use Fused Tile mode.
The UDF accepts a bounding box, and our code must return a properly formatted Xarray DataArray with RGB image data for that bounding box.
The exact way this conversion is done will vary considerably depending on the nature of the data stored in Arraylake.
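As a rough illustration of one such conversion, the sketch below applies a linear stretch to a single pixel in plain Python, mirroring the `min_data_value` and `scale_factor` parameters of the UDF in this section. The helper name `stretch_pixel` and the sample band values are hypothetical, and the real UDF applies the same arithmetic to whole Xarray arrays rather than to Python lists.

```python
def stretch_pixel(bands, min_data_value=1000, scale_factor=3000):
    """Linearly rescale one pixel's band values toward the 0-256 range.

    If any band exceeds 256 after scaling, the whole pixel is set to 256,
    so an overly bright pixel turns white instead of shifting in color.
    """
    scaled = [(256 / scale_factor) * (value - min_data_value) for value in bands]
    if max(scaled) > 256:
        return [256.0] * len(scaled)
    return scaled


# Hypothetical band values for a mid-brightness pixel.
print(stretch_pixel([1300, 1900, 2500]))

# Hypothetical band values where one band is too bright:
# every band is set to 256.
print(stretch_pixel([1500, 2500, 5000]))
```

Saturating all bands together is a design choice: clipping each band independently would keep more detail but could change the apparent color of bright pixels.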
from datetime import datetime
import xarray as xr
import fused
import arraylake
# remove this after Arraylake v0.13 is released and it becomes the default
arraylake.config.set({"chunkstore.use_delegated_credentials": True})
@fused.udf
def udf(
    bbox: fused.types.TileGDF = None,
    repo_name="earthmover-demos/sentinel-datacube-South-America-3",
    varname="rgb_median",
    time: datetime = datetime(2020, 10, 1),
    min_data_value=1000,
    scale_factor: int = 3_000,
) -> xr.DataArray:
    import boto3

    # open the repo read-only using the token stored in the Fused secrets manager
    client = arraylake.Client(token=fused.secrets["ARRAYLAKE_TOKEN"])
    repo = client.get_repo(repo_name, read_only=True)
    ds = repo.to_xarray(mask_and_scale=False, chunks={})

    # try to infer the resolution of the dataset
    # (approximate meters per pixel, at roughly 110 km per degree of latitude)
    pixel_res = 110e3 * abs(ds.latitude.values[1] - ds.latitude.values[0])
    print(bbox)
    # target resolution for this tile's zoom level
    resolution = int(5 * 2 ** (15 - bbox.z[0]))
    coarsen_factor = max(int(resolution // pixel_res), 1)
    print(f"Coarsening by {coarsen_factor}")

    # select the nearest time step, then subset to the tile's bounding box
    min_lon, min_lat, max_lon, max_lat = bbox.bounds.values[0]
    ds = ds.sel(time=time, method="nearest")
    # latitude is sliced from max to min, which assumes it is stored in descending order
    ds = ds.sel(longitude=slice(min_lon, max_lon), latitude=slice(max_lat, min_lat))
    data = ds[varname]
    data.load()
    # mask non-positive values
    data = data.where(data > 0)
    if coarsen_factor > 1:
        data = data.coarsen(
            longitude=coarsen_factor,
            latitude=coarsen_factor,
            boundary="pad",
        ).mean()

    # fused needs dimensions in this order
    data = data.transpose("band", "latitude", "longitude")
    # linearly rescale values toward the 0-256 range
    data = data - min_data_value
    data = (256 / scale_factor) * data
    # avoid oversaturating pixels
    data = data.where(data.max("band") <= 256, 256)
    return data
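To make the coarsening arithmetic in the UDF easier to follow, the sketch below isolates it in a plain Python function and evaluates it at a few zoom levels. The function name `coarsen_factor_for_zoom` and the latitude step of 0.0001 degrees (about 11 m per pixel) are hypothetical values chosen for illustration. They are not properties of the demo dataset.

```python
def coarsen_factor_for_zoom(zoom: int, lat_step_degrees: float) -> int:
    """Return how many native pixels to average per output pixel at a zoom level."""
    # approximate native pixel size in meters (about 110 km per degree of latitude)
    pixel_res = 110e3 * abs(lat_step_degrees)
    # target pixel size: 5 m at zoom 15, doubling with each lower zoom level
    resolution = int(5 * 2 ** (15 - zoom))
    # never coarsen by less than 1 (that is, never upsample)
    return max(int(resolution // pixel_res), 1)


LAT_STEP = 0.0001  # hypothetical grid spacing in degrees

for zoom in (15, 12, 10, 8):
    print(zoom, coarsen_factor_for_zoom(zoom, LAT_STEP))
# prints:
# 15 1
# 12 3
# 10 14
# 8 58
```

Zoomed-out tiles therefore average many native pixels into one, which keeps the returned array small even when the bounding box covers a large area.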
Here is an example image produced by the Fused UDF in Tile mode: