#!/usr/bin/env python
# Copyright (c) 2022-2024, openradar developers.
# Distributed under the MIT License. See LICENSE for more info.
"""
ODIM_H5
=======
This sub-module contains the ODIM_H5 xarray backend for reading ODIM_H5-based radar
data into Xarray structures as well as a reader to create a complete datatree.Datatree.
Code ported from wradlib.
Example::
import xradar as xd
dtree = xd.io.open_odim_datatree(filename)
.. autosummary::
:nosignatures:
:toctree: generated/
{}
"""
__all__ = [
"OdimBackendEntrypoint",
"open_odim_datatree",
]
__doc__ = __doc__.format("\n   ".join(__all__))
import datetime as dt
import io
import warnings
import h5netcdf
import numpy as np
import xarray as xr
from xarray import DataTree
from xarray.backends.common import (
AbstractDataStore,
BackendArray,
BackendEntrypoint,
find_root_and_group,
)
from xarray.backends.file_manager import CachingFileManager, DummyFileManager
from xarray.backends.locks import SerializableLock, ensure_lock
from xarray.backends.store import StoreBackendEntrypoint
from xarray.core import indexing
from xarray.core.utils import FrozenDict, is_remote_uri
from xarray.core.variable import Variable
from ... import util
from ...model import (
georeferencing_correction_subgroup,
get_altitude_attrs,
get_azimuth_attrs,
get_elevation_attrs,
get_latitude_attrs,
get_longitude_attrs,
get_range_attrs,
get_time_attrs,
moment_attrs,
radar_calibration_subgroup,
radar_parameters_subgroup,
sweep_vars_mapping,
)
from .common import (
_attach_sweep_groups,
_fix_angle,
_get_h5group_names,
_get_radar_calibration,
_get_required_root_dataset,
_get_subgroup,
_maybe_decode,
)
HDF5_LOCK = SerializableLock()
def _calculate_angle_res(dim):
# need to sort dim first
angle_diff = np.diff(sorted(dim))
angle_diff2 = np.abs(np.diff(angle_diff))
    # only select angle_diff where angle_diff2 is less than 0.05 deg
# Todo: currently 0.05 is working in most cases
# make this robust or parameterisable
angle_diff_wanted = angle_diff[:-1][angle_diff2 < 0.05]
return np.round(np.nanmean(angle_diff_wanted), decimals=2)
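# Minimal sketch of the intended behaviour (illustrative values, not from a file):
# for a regular 1-degree scan the sorted ray centres are 0.5, 1.5, ..., 359.5, so
#   _calculate_angle_res(np.arange(0.5, 360.0, 1.0))  ->  1.0
# Rays with irregular spacing are rejected via the 0.05 deg threshold on the second
# difference before the mean spacing is taken.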
def _get_azimuth_how(how):
startaz = how["startazA"]
stopaz = how.get("stopazA", False)
if stopaz is False:
# stopazA missing
# create from startazA
stopaz = np.roll(startaz, -1)
stopaz[-1] += 360
zero_index = np.where(stopaz < startaz)
stopaz[zero_index[0]] += 360
azimuth_data = (startaz + stopaz) / 2.0
azimuth_data[azimuth_data >= 360] -= 360
return azimuth_data
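# Hedged sketch of the ray-centre calculation above, using hypothetical values:
#   how = {"startazA": np.array([359.0, 0.0, 1.0]), "stopazA": np.array([0.0, 1.0, 2.0])}
#   _get_azimuth_how(how)  ->  array([359.5, 0.5, 1.5])
# The wrap-around ray (start 359, stop 0) is shifted by 360 deg before averaging and
# the result is folded back into the 0-360 deg range.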
def _get_azimuth_where(where):
res = 360.0 / where["nrays"]
return np.arange(res / 2.0, 360.0, res, dtype="float32")
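# Fallback sketch when per-ray azimuths are missing from `how`: nominal, equally
# spaced ray centres are derived from the ray count alone, e.g.
#   _get_azimuth_where({"nrays": 360})  ->  array([0.5, 1.5, ..., 359.5], dtype=float32)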
def _get_fixed_dim_and_angle(where):
dim = "elevation"
# try RHI first
angle_keys = ["az_angle", "azangle"]
angle = None
for ak in angle_keys:
angle = where.get(ak, None)
if angle is not None:
break
if angle is None:
dim = "azimuth"
angle = where["elangle"]
# do not round angle
# angle = np.round(angle, decimals=1)
return dim, angle
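# Illustrative only (hypothetical `where` attributes): a PPI carries ``elangle``,
#   _get_fixed_dim_and_angle({"elangle": 0.5})     ->  ("azimuth", 0.5)
# whereas an RHI carries ``az_angle`` (or ``azangle``),
#   _get_fixed_dim_and_angle({"az_angle": 270.0})  ->  ("elevation", 270.0)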
def _get_elevation_how(how):
startel = how.get("startelA", False)
stopel = how.get("stopelA", False)
if startel is not False and stopel is not False:
elevation_data = (startel + stopel) / 2.0
else:
elevation_data = how["elangles"]
return elevation_data
def _get_elevation_where(where):
return np.ones(where["nrays"], dtype="float32") * where["elangle"]
def _get_time_how(how):
return (how["startazT"] + how["stopazT"]) / 2.0
def _get_time_what(what, where, nrays=None):
startdate = _maybe_decode(what["startdate"])
starttime = _maybe_decode(what["starttime"])
# take care for missing enddate/endtime
# see https://github.com/wradlib/wradlib/issues/563
enddate = _maybe_decode(what.get("enddate", what["startdate"]))
endtime = _maybe_decode(what.get("endtime", what["starttime"]))
start = dt.datetime.strptime(startdate + starttime, "%Y%m%d%H%M%S")
end = dt.datetime.strptime(enddate + endtime, "%Y%m%d%H%M%S")
start = start.replace(tzinfo=dt.timezone.utc).timestamp()
end = end.replace(tzinfo=dt.timezone.utc).timestamp()
if nrays is None:
nrays = where["nrays"]
if start == end:
warnings.warn(
"xradar: Equal ODIM `starttime` and `endtime` "
"values. Can't determine correct sweep start-, "
"end- and raytimes.",
UserWarning,
)
time_data = np.ones(nrays) * start
else:
delta = (end - start) / nrays
time_data = np.arange(start + delta / 2.0, end, delta)
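        # `a1gate` (ODIM where/a1gate) is the index of the first ray radiated in the
        # sweep; rolling by +a1gate places the earliest ray time at that index so the
        # times stay aligned with the stored ray order.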
time_data = np.roll(time_data, shift=+where["a1gate"])
return time_data
def _get_range(where, odim_version=None):
scale = 1000.0
if odim_version == "ODIM_H5/V2_4":
scale = 1.0
ngates = where["nbins"]
range_start = where["rstart"] * scale
bin_range = where["rscale"]
cent_first = range_start + bin_range / 2.0
range_data = np.arange(
cent_first, range_start + bin_range * ngates, bin_range, dtype="float32"
)
return range_data, cent_first, bin_range
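# Hedged sketch with made-up `where` attributes: 100 m bins starting at the radar give
#   _get_range({"nbins": 4, "rstart": 0.0, "rscale": 100.0})
#   ->  (array([ 50., 150., 250., 350.], dtype=float32), 50.0, 100.0)
# i.e. bin centres, first bin centre and bin spacing; for "ODIM_H5/V2_4" the `rstart`
# value is used as-is instead of being scaled from km to m.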
def _get_time(what, point="start"):
startdate = _maybe_decode(what[f"{point}date"])
starttime = _maybe_decode(what[f"{point}time"])
start = dt.datetime.strptime(startdate + starttime, "%Y%m%d%H%M%S")
start = np.array(start.replace(tzinfo=dt.timezone.utc)).astype("<M8[s]")
return start
def _get_a1gate(where):
return where["a1gate"]
class _H5NetCDFMetadata:
"""Wrapper around OdimH5/Gamic data fileobj for easy access of metadata.
Parameters
----------
fileobj : file-like
h5netcdf filehandle.
group : str
odim group to acquire
Returns
-------
object : metadata object
"""
def __init__(self, fileobj, group):
self._root = fileobj
self._group = group
self._how = None
self._what = None
self._where = None
self._dim0 = None
self._fixed_angle = None
@property
def coordinates(self):
# additional metadata which might come in handy in the future
# do not forward a1gate and angle_res for now
# a1gate = self.a1gate
# angle_res = _calculate_angle_res(locals()[dim])
# az_attrs["a1gate"] = a1gate
#
# if dim == "azimuth":
# az_attrs["angle_res"] = angle_res
# else:
# el_attrs["angle_res"] = angle_res
coordinates = {
"azimuth": self.azimuth,
"elevation": self.elevation,
"time": self.time,
"range": self.range,
"sweep_mode": self.sweep_mode,
"sweep_number": self.sweep_number,
"prt_mode": self.prt_mode,
"follow_mode": self.follow_mode,
"sweep_fixed_angle": self.sweep_fixed_angle,
"longitude": self.longitude,
"latitude": self.latitude,
"altitude": self.altitude,
}
return coordinates
@property
def first_dim(self):
warnings.warn(
"xradar: use of `first_dim` is deprecated, please use `dim0` instead.",
DeprecationWarning,
)
if self._dim0 is None:
self._dim0, self._fixed_angle = self._get_fixed_dim_and_angle()
return self._dim0
@property
def dim0(self):
"""Get name of first dimension."""
if self._dim0 is None:
self._dim0, self._fixed_angle = self._get_fixed_dim_and_angle()
return self._dim0
def get_variable_dimensions(self, dims):
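        # Map the file's (phony) dimensions onto CfRadial-style names: the first
        # dimension becomes ``dim0`` ("azimuth" for PPI, "elevation" for RHI),
        # the second becomes "range".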
dimensions = []
for n, _ in enumerate(dims):
if n == 0:
dimensions.append(self.dim0)
elif n == 1:
dimensions.append("range")
return tuple(dimensions)
@property
def site_coords(self):
return self._get_site_coords()
@property
def how(self):
"""Get how-group attributes."""
if self._how is None:
self._how = self.grp["how"].attrs
return self._how
@property
def what(self):
"""Get what-group attributes."""
if self._what is None:
self._what = self.grp["what"].attrs
return self._what
@property
def where(self):
"""Get where-group attributes."""
if self._where is None:
self._where = self.grp["where"].attrs
return self._where
def _get_odim_version(self):
version = self._root.attrs.get("Conventions", "None")
return version
def _get_site_coords(self):
lon = self._root["where"].attrs["lon"]
lat = self._root["where"].attrs["lat"]
alt = self._root["where"].attrs["height"]
return lon, lat, alt
@property
def azimuth(self):
"""Return azimuth Variable."""
return self._azimuth
@property
def elevation(self):
"""Return elevation Variable."""
return self._elevation
@property
def time(self):
"""Return time Variable."""
return self._time
@property
def range(self):
"""Return range Variable."""
range_data, cent_first, bin_range = self._get_range()
range_attrs = get_range_attrs(range_data)
return Variable(("range",), range_data, range_attrs)
@property
def longitude(self):
"""Return longitude Variable."""
return Variable((), self._root["where"].attrs["lon"], get_longitude_attrs())
@property
def latitude(self):
"""Return latitude Variable."""
return Variable((), self._root["where"].attrs["lat"], get_latitude_attrs())
@property
def altitude(self):
"""Return altitude Variable."""
return Variable((), self._root["where"].attrs["height"], get_altitude_attrs())
@property
def sweep_fixed_angle(self):
"""Return sweep_fixed_angle Variable."""
if self._fixed_angle is None:
self._dim0, self._fixed_angle = self._get_fixed_dim_and_angle()
return Variable((), self._fixed_angle)
@property
def sweep_mode(self):
"""Return sweep_mode Variable."""
sweep_mode = "azimuth_surveillance" if self.dim0 == "azimuth" else "rhi"
return Variable((), sweep_mode)
@property
def sweep_number(self):
"""Return sweep_number Variable."""
return Variable((), self._sweep_number)
@property
def prt_mode(self):
"""Return prt_mode Variable."""
return Variable((), "not_set")
@property
def follow_mode(self):
"""Return follow_mode Variable."""
return Variable((), "not_set")
class _OdimH5NetCDFMetadata(_H5NetCDFMetadata):
"""Wrapper around OdimH5 data fileobj for easy access of metadata.
Parameters
----------
fileobj : file-like
h5netcdf filehandle.
group : str
odim group to acquire
Returns
-------
object : metadata object
"""
@property
def ds_what(self):
return self._get_dset_what()
@property
def grp(self):
return self._root[self._group.split("/")[0]]
def _get_fixed_dim_and_angle(self):
return _get_fixed_dim_and_angle(self.where)
def _get_ray_times(self, nrays=None):
try:
time_data = _get_time_how(self.how)
self._need_time_recalc = False
except (AttributeError, KeyError, TypeError):
time_data = _get_time_what(self.what, self.where, nrays)
self._need_time_recalc = True
return time_data
def _get_range(self):
return _get_range(self.where, odim_version=self._odim_version)
def _get_time(self, point="start"):
return _get_time(self.what, point=point)
def _get_dset_what(self):
attrs = {}
what = self._root[self._group]["what"].attrs
gain = what.get("gain", 1.0)
offset = what.get("offset", 0.0)
if not (gain == 1.0 and offset == 0.0):
attrs["scale_factor"] = gain
attrs["add_offset"] = offset
attrs["_FillValue"] = what.get("nodata", None)
attrs["_Undetect"] = what.get("undetect", 0.0)
# if no quantity is given, use the group-name
attrs["quantity"] = _maybe_decode(
what.get("quantity", self._group.split("/")[-1])
)
return attrs
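    # Hedged example with hypothetical ODIM `what` attributes: gain/offset map onto
    # the CF packing attributes that xarray's mask_and_scale decoding understands, e.g.
    #   {"gain": 0.5, "offset": -32.0, "nodata": 255.0, "quantity": b"DBZH"}
    #   ->  {"scale_factor": 0.5, "add_offset": -32.0, "_FillValue": 255.0,
    #        "_Undetect": 0.0, "quantity": "DBZH"}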
@property
def a1gate(self):
return _get_a1gate(self.where)
@property
def _azimuth(self):
try:
azimuth = _get_azimuth_how(self.how)
except (AttributeError, KeyError, TypeError):
azimuth = _get_azimuth_where(self.where)
return Variable((self.dim0,), azimuth, get_azimuth_attrs())
@property
def _elevation(self):
try:
elevation = _get_elevation_how(self.how)
except (AttributeError, KeyError, TypeError):
elevation = _get_elevation_where(self.where)
return Variable((self.dim0,), elevation, get_elevation_attrs())
@property
def _time(self):
return Variable((self.dim0,), self._get_ray_times(), get_time_attrs())
@property
def _sweep_number(self):
"""Return sweep number."""
return int(self._group.split("/")[0][7:]) - 1
@property
def _odim_version(self):
return self._get_odim_version()
class H5NetCDFArrayWrapper(BackendArray):
"""H5NetCDFArrayWrapper
adapted from https://github.com/pydata/xarray/
"""
__slots__ = ("datastore", "dtype", "shape", "variable_name")
def __init__(self, variable_name, datastore):
self.datastore = datastore
self.variable_name = variable_name
array = self.get_array()
self.shape = array.shape
dtype = array.dtype
if dtype is str:
# use object dtype because that's the only way in numpy to
# represent variable length strings; it also prevents automatic
# string concatenation via conventions.decode_cf_variable
dtype = np.dtype("O")
self.dtype = dtype
def __setitem__(self, key, value):
with self.datastore.lock:
data = self.get_array(needs_lock=False)
data[key] = value
if self.datastore.autoclose:
self.datastore.close(needs_lock=False)
def get_array(self, needs_lock=True):
ds = self.datastore._acquire(needs_lock)
return ds.variables[self.variable_name]
def __getitem__(self, key):
return indexing.explicit_indexing_adapter(
key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
)
def _getitem(self, key):
# h5py requires using lists for fancy indexing:
# https://github.com/h5py/h5py/issues/992
key = tuple(list(k) if isinstance(k, np.ndarray) else k for k in key)
with self.datastore.lock:
array = self.get_array(needs_lock=False)
return array[key]
def _get_h5netcdf_encoding(self, var):
"""get encoding from h5netcdf Variable
adapted from https://github.com/pydata/xarray/
"""
import h5py
# netCDF4 specific encoding
encoding = {
"chunksizes": var.chunks,
"fletcher32": var.fletcher32,
"shuffle": var.shuffle,
}
# Convert h5py-style compression options to NetCDF4-Python
# style, if possible
if var.compression == "gzip":
encoding["zlib"] = True
encoding["complevel"] = var.compression_opts
elif var.compression is not None:
encoding["compression"] = var.compression
encoding["compression_opts"] = var.compression_opts
# save source so __repr__ can detect if it's local or not
encoding["source"] = self._filename
encoding["original_shape"] = var.shape
vlen_dtype = h5py.check_dtype(vlen=var.dtype)
if vlen_dtype is str:
encoding["dtype"] = str
# todo: this might be removed, since xarray is capable from version 2023.X.Y
elif vlen_dtype is not None: # pragma: no cover
# xarray doesn't support writing arbitrary vlen dtypes yet.
pass
else:
encoding["dtype"] = var.dtype
return encoding
def _get_odim_variable_name_and_attrs(name, attrs):
if "data" in name:
name = attrs.pop("quantity")
# handle non-standard moment names
try:
mapping = sweep_vars_mapping[name]
except KeyError:
pass
else:
attrs.update({key: mapping[key] for key in moment_attrs})
attrs["coordinates"] = (
"elevation azimuth range latitude longitude altitude time"
)
return name, attrs
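# Illustrative sketch (hypothetical attrs): an ODIM moment group such as "data1" is
# renamed to its `quantity`, and known moments are enriched with the CF metadata from
# `sweep_vars_mapping`, e.g.
#   _get_odim_variable_name_and_attrs("data1", {"quantity": "DBZH"})
#   ->  ("DBZH", {...moment attrs..., "coordinates": "elevation azimuth range ..."})
# Unknown quantities keep only their ODIM attributes plus the "coordinates" entry.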
class OdimSubStore(AbstractDataStore):
"""Store for reading ODIM data-moments via h5netcdf."""
def __init__(
self,
store,
group=None,
lock=False,
):
if not isinstance(store, OdimStore):
raise TypeError(
f"Wrong type {type(store)} for parameter store, "
f"expected 'OdimStore'."
)
self._manager = store._manager
self._group = group
self._filename = store.filename
self.is_remote = is_remote_uri(self._filename)
self.lock = ensure_lock(lock)
@property
def root(self):
with self._manager.acquire_context(False) as root:
return _OdimH5NetCDFMetadata(root, self._group.lstrip("/"))
def _acquire(self, needs_lock=True):
with self._manager.acquire_context(needs_lock) as root:
ds = root[self._group.lstrip("/")]
return ds
@property
def ds(self):
return self._acquire()
def open_store_variable(self, name, var):
dimensions = self.root.get_variable_dimensions(var.dimensions)
data = indexing.LazilyOuterIndexedArray(H5NetCDFArrayWrapper(name, self))
encoding = _get_h5netcdf_encoding(self, var)
encoding["group"] = self._group
name, attrs = _get_odim_variable_name_and_attrs(name, self.root.ds_what)
return name, Variable(dimensions, data, attrs, encoding)
def open_store_coordinates(self):
return self.root.coordinates
def get_variables(self):
return FrozenDict(
(k1, v1)
for k1, v1 in {
**dict(
[
self.open_store_variable(k, v)
for k, v in self.ds.variables.items()
]
),
}.items()
)
class OdimStore(AbstractDataStore):
"""Store for reading ODIM dataset groups via h5netcdf."""
def __init__(self, manager, group=None, lock=False):
if isinstance(manager, (h5netcdf.File, h5netcdf.Group)):
if group is None:
root, group = find_root_and_group(manager)
else:
if type(manager) is not h5netcdf.File:
raise ValueError(
"must supply a h5netcdf.File if the group "
"argument is provided"
)
root = manager
manager = DummyFileManager(root)
self._manager = manager
self._group = f"/dataset{int(group[6:])+1}"
self._filename = self.filename
self.is_remote = is_remote_uri(self._filename)
self.lock = ensure_lock(lock)
self._substore = None
self._need_time_recalc = False
@classmethod
def open(
cls,
filename,
mode="r",
format=None,
group=None,
lock=None,
invalid_netcdf=None,
phony_dims=None,
decode_vlen_strings=True,
):
if isinstance(filename, bytes):
raise ValueError(
"can't open netCDF4/HDF5 as bytes "
"try passing a path or file-like object"
)
if format not in [None, "NETCDF4"]:
raise ValueError("invalid format for h5netcdf backend")
kwargs = {"invalid_netcdf": invalid_netcdf}
if phony_dims is not None:
kwargs["phony_dims"] = phony_dims
kwargs["decode_vlen_strings"] = decode_vlen_strings
if lock is None:
if util.has_import("dask"):
lock = HDF5_LOCK
else:
lock = False
manager = CachingFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs)
return cls(manager, group=group, lock=lock)
@property
def filename(self):
with self._manager.acquire_context(False) as root:
return root.filename
@property
def substore(self):
if self._substore is None:
with self._manager.acquire_context(False) as root:
subgroups = [
"/".join([self._group, k])
for k in root[self._group].groups
# get data and quality groups
if "data" or "quality" in k
]
substore = []
substore.extend(
[
OdimSubStore(
self,
group=group,
lock=self.lock,
)
for group in subgroups
]
)
self._substore = substore
return self._substore
def open_store_coordinates(self):
return self.substore[0].open_store_coordinates()
def get_variables(self):
return FrozenDict(
(k1, v1)
for k1, v1 in {
**{
k: v
for substore in self.substore
for k, v in substore.get_variables().items()
},
**self.open_store_coordinates(),
}.items()
)
def get_attrs(self):
attributes = {"Conventions": "ODIM_H5/V2_2"}
return FrozenDict(attributes)
class OdimBackendEntrypoint(BackendEntrypoint):
"""Xarray BackendEntrypoint for ODIM data.
Keyword Arguments
-----------------
first_dim : str
        Can be ``time`` or ``auto``. If set to ``auto``, the first dimension will be
        either ``azimuth`` or ``elevation``, depending on the sweep type.
        Defaults to ``auto``.
reindex_angle : bool or dict
Defaults to False, no reindexing. Given dict should contain the kwargs to
        reindex_angle. Only invoked if `decode_coords=True`.
fix_second_angle : bool
If True, fixes erroneous second angle data. Defaults to ``False``.
site_coords : bool
Attach radar site-coordinates to Dataset, defaults to ``True``.
kwargs : dict
Additional kwargs are fed to :py:func:`xarray.open_dataset`.
"""
description = "Open ODIM_H5 (.h5, .hdf5) using h5netcdf in Xarray"
url = "https://xradar.rtfd.io/en/latest/io.html#odim-h5"
def open_dataset(
self,
filename_or_obj,
*,
mask_and_scale=True,
decode_times=True,
concat_characters=True,
decode_coords=True,
drop_variables=None,
use_cftime=None,
decode_timedelta=None,
format=None,
group="sweep_0",
invalid_netcdf=None,
phony_dims="access",
decode_vlen_strings=True,
first_dim="auto",
reindex_angle=False,
fix_second_angle=False,
site_coords=True,
):
if isinstance(filename_or_obj, io.IOBase):
filename_or_obj.seek(0)
store = OdimStore.open(
filename_or_obj,
format=format,
group=group,
invalid_netcdf=invalid_netcdf,
phony_dims=phony_dims,
decode_vlen_strings=decode_vlen_strings,
)
store_entrypoint = StoreBackendEntrypoint()
ds = store_entrypoint.open_dataset(
store,
mask_and_scale=mask_and_scale,
decode_times=decode_times,
concat_characters=concat_characters,
decode_coords=decode_coords,
drop_variables=drop_variables,
use_cftime=use_cftime,
decode_timedelta=decode_timedelta,
)
# reassign azimuth/elevation/time coordinates
ds = ds.assign_coords({"azimuth": ds.azimuth})
ds = ds.assign_coords({"elevation": ds.elevation})
ds = ds.assign_coords({"time": ds.time})
ds.encoding["engine"] = "odim"
# handle duplicates and reindex
if decode_coords and reindex_angle is not False:
ds = ds.pipe(util.remove_duplicate_rays)
ds = ds.pipe(util.reindex_angle, **reindex_angle)
ds = ds.pipe(util.ipol_time, **reindex_angle)
# handling first dimension
dim0 = "elevation" if ds.sweep_mode.load() == "rhi" else "azimuth"
if first_dim == "auto":
if "time" in ds.dims:
ds = ds.swap_dims({"time": dim0})
ds = ds.sortby(dim0)
else:
if "time" not in ds.dims:
ds = ds.swap_dims({dim0: "time"})
ds = ds.sortby("time")
# fix second angle
if fix_second_angle and first_dim == "auto":
dim1 = {"azimuth": "elevation", "elevation": "azimuth"}[dim0]
ds = ds.assign_coords({dim1: ds[dim1].pipe(_fix_angle)})
# assign geo-coords
ds = ds.assign_coords(
{
"latitude": ds.latitude,
"longitude": ds.longitude,
"altitude": ds.altitude,
}
)
return ds
def open_odim_datatree(filename_or_obj, **kwargs):
"""Open ODIM_H5 dataset as :py:class:`xarray.DataTree`.
Parameters
----------
filename_or_obj : str, Path, file-like or DataStore
Strings and Path objects are interpreted as a path to a local or remote
radar file
Keyword Arguments
-----------------
    sweep : int, list of int, optional
        Sweep number(s) to extract. Defaults to the first sweep. If ``None``, all
        sweeps are extracted.
first_dim : str
        Can be ``time`` or ``auto``. If set to ``auto``, the first dimension will be
        either ``azimuth`` or ``elevation``, depending on the sweep type.
        Defaults to ``auto``.
reindex_angle : bool or dict
Defaults to False, no reindexing. Given dict should contain the kwargs to
        reindex_angle. Only invoked if `decode_coords=True`.
fix_second_angle : bool
If True, fixes erroneous second angle data. Defaults to ``False``.
site_coords : bool
Attach radar site-coordinates to Dataset, defaults to ``True``.
kwargs : dict
Additional kwargs are fed to :py:func:`xarray.open_dataset`.
Returns
-------
dtree: xarray.DataTree
DataTree
"""
    # handle kwargs, extract sweep selection and backend options
backend_kwargs = kwargs.pop("backend_kwargs", {})
optional = backend_kwargs.pop("optional", True)
sweep = kwargs.pop("sweep", None)
sweeps = []
kwargs["backend_kwargs"] = backend_kwargs
if isinstance(sweep, str):
sweeps = [sweep]
elif isinstance(sweep, int):
sweeps = [f"sweep_{sweep}"]
elif isinstance(sweep, list):
if isinstance(sweep[0], int):
sweeps = [f"sweep_{i+1}" for i in sweep]
else:
sweeps.extend(sweep)
else:
sweeps = _get_h5group_names(filename_or_obj, "odim")
ls_ds: list[xr.Dataset] = [
xr.open_dataset(filename_or_obj, group=swp, engine="odim", **kwargs)
for swp in sweeps
]
# todo: apply CfRadial2 group structure below
dtree: dict = {
"/": _get_required_root_dataset(ls_ds, optional=optional),
"/radar_parameters": _get_subgroup(ls_ds, radar_parameters_subgroup),
"/georeferencing_correction": _get_subgroup(
ls_ds, georeferencing_correction_subgroup
),
"/radar_calibration": _get_radar_calibration(ls_ds, radar_calibration_subgroup),
}
dtree = _attach_sweep_groups(dtree, ls_ds)
return DataTree.from_dict(dtree)