#!/usr/bin/env python
# Copyright (c) 2024, openradar developers.
# Distributed under the MIT License. See LICENSE for more info.

"""
NEXRAD Level2 Data I/O
^^^^^^^^^^^^^^^^^^^^^^

Reads data from NEXRAD Level2 data format

See https://www.roc.noaa.gov/WSR88D/BuildInfo/Files.aspx

Documents:
 - ICD 2620002 M ICD FOR RDA/RPG - Build RDA 11.5/RPG 13.0 (PDF)
 - ICD 2620010 E ICD FOR ARCHIVE II/USER - Build 12.0 (PDF)

To read from NEXRAD Level2 files :class:`numpy:numpy.memmap` is used for
uncompressed files (pre 2016-06-01) and :class:`bz2:BZ2Decompressor` for BZ2
compressed data. The NEXRAD headers (`VOLUME_HEADER`, `MSG_HEADER`) are read in
any case into dedicated OrderedDicts. Reading sweep data can be skipped by
setting `loaddata=False`. By default, the data is decoded on the fly.
Using `rawdata=True`, the data will be kept undecoded.

Code adapted from Py-ART.
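
A minimal usage sketch (the filename is a hypothetical placeholder)::

    import xradar as xd

    dtree = xd.io.open_nexradlevel2_datatree("KATX_example_V06")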

.. autosummary::
   :nosignatures:
   :toctree: generated/

   {}
"""
__all__ = [
    "NexradLevel2BackendEntrypoint",
    "open_nexradlevel2_datatree",
]
__doc__ = __doc__.format("\n   ".join(__all__))

import bz2
import struct
from collections import OrderedDict

import numpy as np
import xarray as xr
from xarray import DataTree
from xarray.backends.common import AbstractDataStore, BackendArray, BackendEntrypoint
from xarray.backends.file_manager import CachingFileManager
from xarray.backends.store import StoreBackendEntrypoint
from xarray.core import indexing
from xarray.core.utils import FrozenDict, close_on_error
from xarray.core.variable import Variable

from xradar import util
from xradar.io.backends.common import (
    _assign_root,
    _get_radar_calibration,
    _get_subgroup,
)
from xradar.model import (
    georeferencing_correction_subgroup,
    get_altitude_attrs,
    get_azimuth_attrs,
    get_elevation_attrs,
    get_latitude_attrs,
    get_longitude_attrs,
    get_range_attrs,
    get_time_attrs,
    moment_attrs,
    radar_calibration_subgroup,
    radar_parameters_subgroup,
    sweep_vars_mapping,
)

from .iris import (
    BIN2,
    IrisRecord,
    _get_fmt_string,
    _unpack_dictionary,
    string_dict,
)

#: mapping from NEXRAD names to CfRadial2/ODIM
nexrad_mapping = {
    "REF": "DBZH",
    "VEL": "VRADH",
    "SW ": "WRADH",
    "ZDR": "ZDR",
    "PHI": "PHIDP",
    "RHO": "RHOHV",
    "CFP": "CCORH",
}

# NEXRAD Level II file structures and sizes
# The details on these structures are documented in:
# "Interface Control Document for the Archive II/User" RPG Build 12.0
# Document Number 2620010E
# and
# "Interface Control Document for the RDA/RPG" Open Build 13.0
# Document Number 2620002M
# Tables and page number refer to those in the second document unless
# otherwise noted.
RECORD_BYTES = 2432
COMPRESSION_RECORD_SIZE = 12
CONTROL_WORD_SIZE = 4

# format of structure elements
# section 3.2.1, page 3-2
UINT1 = {"fmt": "B", "dtype": "uint8"}
UINT2 = {"fmt": "H", "dtype": "uint16"}
UINT4 = {"fmt": "I", "dtype": "uint32"}
SINT1 = {"fmt": "b", "dtype": "int8"}
SINT2 = {"fmt": "h", "dtype": "int16"}
SINT4 = {"fmt": "i", "dtype": "int32"}
FLT4 = {"fmt": "f", "dtype": "float32"}
CODE1 = {"fmt": "B"}
CODE2 = {"fmt": "H"}


class NEXRADFile:
    """
    Class for accessing data in a NEXRAD (WSR-88D) Level II file.

    NEXRAD Level II files [1]_, also known as NEXRAD Archive Level II or
    WSR-88D Archive Level 2, are available from the NOAA National Climatic
    Data Center [2]_ as well as on the UCAR THREDDS Data Server [3]_. Files with
    uncompressed messages and compressed messages are supported. This class
    supports reading both "message 31" and "message 1" type files.

    Parameters
    ----------
    filename : str
        Filename of Archive II file to read.

    References
    ----------
    .. [1] http://www.roc.noaa.gov/WSR88D/Level_II/Level2Info.aspx
    .. [2] http://www.ncdc.noaa.gov/
    .. [3] http://thredds.ucar.edu/thredds/catalog.html
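
    Examples
    --------
    A minimal sketch (the filename is a hypothetical placeholder)::

        with NEXRADFile("Level2_KATX_example") as fh:
            print(fh.volume_header["icao"])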

    """

    def __init__(self, filename, mode="r", loaddata=False):
        """initalize the object."""
        self._fp = None
        self._filename = filename
        # read in the volume header and compression_record
        if hasattr(filename, "read"):
            self._fh = filename
        else:
            self._fp = open(filename, "rb")
            self._fh = np.memmap(self._fp, mode=mode)
        self._filepos = 0
        self._rawdata = False
        self._loaddata = loaddata
        self._bz2_indices = None
        self.volume_header = self.get_header(VOLUME_HEADER)

    @property
    def filename(self):
        return self._filename

    @property
    def fh(self):
        return self._fh

    @property
    def filepos(self):
        return self._filepos

    @filepos.setter
    def filepos(self, pos):
        self._filepos = pos

    def read_from_file(self, size):
        """Read from file.

        Parameters
        ----------
        size : int
            Number of bytes to read.

        Returns
        -------
        data : array-like
            numpy array of data
        """
        start = self._filepos
        self._filepos += size
        return self._fh[start : self._filepos]

    def get_header(self, header):
        length = struct.calcsize(_get_fmt_string(header))
        head = _unpack_dictionary(self.read_from_file(length), header, self._rawdata)
        return head

    @property
    def bz2_record_indices(self):
        """Get file offsets of bz2 records."""
        if self._bz2_indices is None:
            # magic number inside BZ2
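            # (compressed block header 0x314159265359, ASCII "1AY&SY")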
            seq = np.array([49, 65, 89, 38, 83, 89], dtype=np.uint8)
            rd = util.rolling_dim(self._fh, 6)
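            # step back over the 4-byte size control word and the 4-byte
            # "BZh" stream header preceding the block magic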
            self._bz2_indices = np.nonzero((rd == seq).all(1))[0] - 8
        return self._bz2_indices

    @property
    def is_compressed(self):
        """File contains bz2 compressed data."""
        size = self._fh[24:28].view(dtype=">u4")[0]
        return size > 0

    def close(self):
        if self._fp is not None:
            self._fp.close()

    __del__ = close

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()


class NEXRADRecordFile(NEXRADFile):
    """NEXRADRecordFile Record File class"""

    def __init__(self, filename, **kwargs):
        super().__init__(filename=filename, **kwargs)
        self._rh = None
        self._rc = None
        self._ldm = dict()
        self._record_number = None

    @property
    def rh(self):
        """Returns current record object."""
        return self._rh

    @rh.setter
    def rh(self, value):
        """Sets current record object."""
        self._rh = value

    @property
    def record_number(self):
        """Returns current record number."""
        return self._record_number

    @record_number.setter
    def record_number(self, value):
        """Sets current record number."""
        self._record_number = value

    @property
    def record_size(self):
        """Returns current record number."""
        return self._record_size

    @record_size.setter
    def record_size(self, value):
        """Sets current record number."""
        self._record_size = value

    def _check_record(self):
        """Checks record for correct size.

        Needs to be implemented in the derived classes.
        """
        return True

    def get_end(self, buf):
        if len(buf) < 16:
            return 0
        header = _unpack_dictionary(buf, MSG_HEADER, self._rawdata, byte_order=">")
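        # message size is given in halfwords and excludes the 12 bytes of
        # legacy CTM header preceding each message header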
        size = header["size"] * 2 + 12
        if header["type"] != 31:
            size = size if size >= RECORD_BYTES else RECORD_BYTES
        return size

    def init_record(self, recnum):
        """Initialize record using given number."""

        # map record numbers to ldm compressed records
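        # (the first ldm record holds the 134 metadata messages; each
        # following ldm record holds up to 120 data messages)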
        def get_ldm(recnum):
            if recnum < 134:
                return 0
            mod = ((recnum - 134) // 120) + 1
            return mod

        if self.is_compressed:
            ldm = get_ldm(recnum)
            # get uncompressed ldm record
            if self._ldm.get(ldm, None) is None:
                # otherwise extract wanted ldm compressed record
                if ldm >= len(self.bz2_record_indices):
                    return False
                start = self.bz2_record_indices[ldm]
                size = self._fh[start : start + 4].view(dtype=">u4")[0]
                self._fp.seek(start + 4)
                dec = bz2.BZ2Decompressor()
                self._ldm[ldm] = np.frombuffer(
                    dec.decompress(self._fp.read(size)), dtype=np.uint8
                )

        # retrieve wanted record and put it into self.rh
        if recnum < 134:
            start = recnum * RECORD_BYTES
            if not self.is_compressed:
                start += 24
            stop = start + RECORD_BYTES
        else:
            if self.is_compressed:
                # get index into current compressed ldm record
                rnum = (recnum - 134) % 120
                start = self.record_size + self.filepos if rnum else 0
                buf = self._ldm[ldm][start + 12 : start + 12 + LEN_MSG_HEADER]
                size = self.get_end(buf)
                if not size:
                    return False
                stop = start + size
            else:
                start = self.record_size + self.filepos
                buf = self.fh[start + 12 : start + 12 + LEN_MSG_HEADER]
                size = self.get_end(buf)
                if not size:
                    return False
                stop = start + size
        self.record_number = recnum
        self.record_size = stop - start
        if self.is_compressed:
            self.rh = IrisRecord(self._ldm[ldm][start:stop], recnum)
        else:
            self.rh = IrisRecord(self.fh[start:stop], recnum)
        self.filepos = start
        return self._check_record()

    def init_record_by_filepos(self, recnum, filepos):
        """Initialize record using given record number and position."""
        start = filepos
        buf = self.fh[start + 12 : start + 12 + LEN_MSG_HEADER]
        size = self.get_end(buf)
        if not size:
            return False
        stop = start + size
        self.record_number = recnum
        self.record_size = stop - start
        self.rh = IrisRecord(self.fh[start:stop], recnum)
        self.filepos = start
        return self._check_record()

    def init_next_record(self):
        """Get next record from file.

        This increases record_number count and initialises a new IrisRecord
        with the calculated start and stop file offsets.

        Returns
        -------
        chk : bool
            True, if record is truncated.
        """
        return self.init_record(self.record_number + 1)

    def array_from_record(self, words, width, dtype):
        """Retrieve array from current record.

        Parameters
        ----------
        words : int
            Number of data words to read.
        width : int
            Size of the data word to read in bytes.
        dtype : str
            dtype string specifying data format.

        Returns
        -------
        data : array-like
            numpy array of data
        """
        return self.rh.read(words, width=width).view(dtype=dtype)

    def bytes_from_record(self, words, width):
        """Retrieve bytes from current record.

        Parameters
        ----------
        words : int
            Number of data words to read.
        width : int
            Size of the data word to read in bytes.

        Returns
        -------
        data : array-like
            numpy array of data
        """
        return self.rh.read(words, width=width)


class NEXRADLevel2File(NEXRADRecordFile):
    def __init__(self, filename, **kwargs):
        super().__init__(filename, **kwargs)

        # get all metadata headers
        # message 15, 13, 18, 3, 5 and 2
        self._meta_header = None

        # message 15
        # RDA Clutter Map Data
        # message 13
        # RDA Clutter Filter Bypass Map
        # message 18
        # RDA Adaptable Parameters
        # message 3
        # RDA Performance/Maintenance Data
        # message 5
        # RDA Volume Coverage Data
        self._msg_5_data = None
        # message 2
        # RDA Status Data

        # message 31 headers
        # Digital Radar Data Generic Format
        self._data_header = None
        self._msg_31_header = None
        self._msg_31_data_header = None
        self._data = OrderedDict()

    @property
    def data_header(self):
        """Retrieve data header."""
        if self._data_header is None:
            (
                self._data_header,
                self._msg_31_header,
                self._msg_31_data_header,
            ) = self.get_data_header()
        return self._data_header

    @property
    def msg_31_header(self):
        """Retrieve MSG31 Header."""
        if self._msg_31_header is None:
            (
                self._data_header,
                self._msg_31_header,
                self._msg_31_data_header,
            ) = self.get_data_header()
        return self._msg_31_header

    @property
    def msg_31_data_header(self):
        """Retrieve MSG31 data header."""
        if self._msg_31_data_header is None:
            (
                self._data_header,
                self._msg_31_header,
                self._msg_31_data_header,
            ) = self.get_data_header()
        return self._msg_31_data_header

    @property
    def meta_header(self):
        """Retrieve metadat header."""
        if self._meta_header is None:
            self._meta_header = self.get_metadata_header()
        return self._meta_header

    @property
    def msg_5(self):
        """Retrieve MSG5 data."""
        if self._msg_5_data is None:
            self._msg_5_data = self.get_msg_5_data()
        return self._msg_5_data

    @property
    def data(self):
        """Retrieve data."""
        return self._data

    def get_sweep(self, sweep_number, moments=None):
        """Retrieve sweep according sweep_number."""
        if moments is None:
            moments = self.msg_31_data_header[sweep_number]["msg_31_data_header"].keys()
        # get selected coordinate type names
        selected = [k for k in DATA_BLOCK_CONSTANT_IDENTIFIERS.keys() if k in moments]
        for moment in selected:
            self.get_moment(sweep_number, moment, "sweep_constant_data")

        # get selected data type names
        selected = [k for k in DATA_BLOCK_VARIABLE_IDENTIFIERS.keys() if k in moments]
        for moment in selected:
            self.get_moment(sweep_number, moment, "sweep_data")

    def get_moment(self, sweep_number, moment, mtype):
        """Retrieve Moment according sweep_number and moment."""
        sweep = self.data[sweep_number]
        # create new sweep_data OrderedDict
        if not sweep.get(mtype, False):
            sweep[mtype] = OrderedDict()
        # check if moment exists and return early
        data = sweep[mtype].get(moment, False)
        if data is not False:
            return

        data = OrderedDict()
        data[moment] = self.msg_31_data_header[sweep_number]["msg_31_data_header"].pop(
            moment
        )
        sweep.update(self.msg_31_data_header[sweep_number])
        sweep[mtype].update(data)

    def get_metadata_header(self):
        """Get metadaata header"""
        # data offsets
        # ICD 2620010E
        # 7.3.5 Metadata Record
        meta_offsets = dict(
            msg_15=(0, 77),
            msg_13=(77, 126),
            msg_18=(126, 131),
            msg_3=(131, 132),
            msg_5=(132, 133),
            msg_2=(133, 134),
        )
        meta_headers = {}
        for msg, (ms, me) in meta_offsets.items():
            mheader = []
            for rec in np.arange(ms, me):
                self.init_record(rec)
                filepos = self.filepos
                message_header = self.get_message_header()
                # do not read zero blocks of data
                if message_header["size"] == 0:
                    break
                message_header["record_number"] = self.record_number
                message_header["filepos"] = filepos
                mheader.append(message_header)
            meta_headers[msg] = mheader
        return meta_headers

    def get_msg_5_data(self):
        """Get MSG5 data."""
        self.init_record(132)
        self.rh.pos += LEN_MSG_HEADER + 12
        # unpack msg header
        msg_5 = _unpack_dictionary(
            self._rh.read(LEN_MSG_5, width=1),
            MSG_5,
            self._rawdata,
            byte_order=">",
        )
        msg_5["elevation_data"] = []
        # unpack elevation cuts
        for i in range(msg_5["number_elevation_cuts"]):
            msg_5_elev = _unpack_dictionary(
                self._rh.read(LEN_MSG_5_ELEV, width=1),
                MSG_5_ELEV,
                self._rawdata,
                byte_order=">",
            )
            msg_5["elevation_data"].append(msg_5_elev)
        return msg_5

    def get_message_header(self):
        """Read and unpack message header."""
        self._rh.pos += 12
        return _unpack_dictionary(
            self._rh.read(LEN_MSG_HEADER, width=1),
            MSG_HEADER,
            self._rawdata,
            byte_order=">",
        )

    def get_data(self, sweep_number, moment=None):
        """Load sweep data from file."""
        sweep = self.data[sweep_number]
        start = sweep["record_number"]
        stop = sweep["record_end"]
        intermediate_records = [
            rec["record_number"] for rec in sweep["intermediate_records"]
        ]
        filepos = sweep["filepos"]
        moments = sweep["sweep_data"]
        if moment is None:
            moment = moments
        elif isinstance(moment, str):
            moment = [moment]
        for name in moment:
            if self.is_compressed:
                self.init_record(start)
            else:
                self.init_record_by_filepos(start, filepos)
            ngates = moments[name]["ngates"]
            word_size = moments[name]["word_size"]
            data_offset = moments[name]["data_offset"]
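            # word_size is given in bits; map to byte width for reading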
            width = {8: 1, 16: 2}[word_size]
            data = []
            self.rh.pos += data_offset
            data.append(self._rh.read(ngates, width=width).view(f">u{width}"))
            while self.init_next_record() and self.record_number <= stop:
                if self.record_number in intermediate_records:
                    continue
                self.rh.pos += data_offset
                data.append(self._rh.read(ngates, width=width).view(f">u{width}"))
            moments[name].update(data=data)

    def get_data_header(self):
        """Load all data header from file."""
        self.init_record(133)
        current_sweep = -1
        current_header = -1
        sweep_msg_31_header = []
        sweep_intermediate_records = []
        sweep = OrderedDict()

        data_header = []
        _msg_31_header = []
        _msg_31_data_header = []

        while self.init_next_record():
            current_header += 1
            # get message headers
            msg_header = self.get_message_header()
            # append to data headers list
            msg_header["record_number"] = self.record_number
            msg_header["filepos"] = self.filepos

            # keep all data headers
            data_header.append(msg_header)
            # only check type 31 for now
            if msg_header["type"] == 31:
                # get msg_31_header
                msg_31_header = _unpack_dictionary(
                    self._rh.read(LEN_MSG_31, width=1),
                    MSG_31,
                    self._rawdata,
                    byte_order=">",
                )
                # add record_number/filepos
                msg_31_header["record_number"] = self.record_number
                msg_31_header["filepos"] = self.filepos
                # retrieve data/const headers from msg 31
                # check if this is a new sweep
                status = msg_31_header["radial_status"]
                if status == 1:
                    # 1 - intermediate radial
                    pass
                if status in [2, 4]:
                    # 2 - end of elevation
                    # 4 - end of volume
                    sweep["record_end"] = self.rh.recnum
                    sweep["intermediate_records"] = sweep_intermediate_records
                    self._data[current_sweep] = sweep
                    _msg_31_header.append(sweep_msg_31_header)
                if status in [0, 3, 5]:
                    # 0 - start of new elevation
                    # 3 - start of new volume
                    # 5 - start of new elevation, last elevation in VCP
                    current_sweep += 1
                    # create new sweep object
                    sweep = OrderedDict()
                    sweep_msg_31_header = []
                    sweep_intermediate_records = []
                    sweep["record_number"] = self.rh.recnum
                    sweep["filepos"] = self.filepos
                    sweep["msg_31_header"] = msg_31_header
                    block_pointers = [
                        v
                        for k, v in msg_31_header.items()
                        if k.startswith("block_pointer") and v > 0
                    ]
                    block_header = {}
                    for i, block_pointer in enumerate(
                        block_pointers[: msg_31_header["block_count"]]
                    ):
                        self.rh.pos = block_pointer + 12 + LEN_MSG_HEADER
                        buf = self._rh.read(4, width=1)
                        dheader = _unpack_dictionary(
                            buf,
                            DATA_BLOCK_HEADER,
                            self._rawdata,
                            byte_order=">",
                        )
                        block = DATA_BLOCK_TYPE_IDENTIFIER[dheader["block_type"]][
                            dheader["data_name"]
                        ]
                        LEN_BLOCK = struct.calcsize(
                            _get_fmt_string(block, byte_order=">")
                        )
                        block_header[dheader["data_name"]] = _unpack_dictionary(
                            self._rh.read(LEN_BLOCK, width=1),
                            block,
                            self._rawdata,
                            byte_order=">",
                        )
                        if dheader["block_type"] == "D":
                            block_header[dheader["data_name"]][
                                "data_offset"
                            ] = self.rh.pos

                    sweep["msg_31_data_header"] = block_header
                    _msg_31_data_header.append(sweep)
                sweep_msg_31_header.append(msg_31_header)
            else:
                sweep_intermediate_records.append(msg_header)

        return data_header, _msg_31_header, _msg_31_data_header

    def _check_record(self):
        """Checks record for correct size.

        Returns
        -------
        chk : bool
            True, if the record has the correct size; an EOFError is
            raised if the record is truncated.
        """
        chk = self._rh.record.shape[0] == self.record_size
        if not chk:
            raise EOFError(f"Unexpected file end detected at record {self.rh.recnum}")
        return chk


# Figure 1 in Interface Control Document for the Archive II/User
# page 7-2
VOLUME_HEADER = OrderedDict(
    [
        ("tape", {"fmt": "9s"}),
        ("extension", {"fmt": "3s"}),
        ("date", UINT4),
        ("time", UINT4),
        ("icao", {"fmt": "4s"}),
    ]
)

# Table II Message Header Data
# page 3-7
MSG_HEADER = OrderedDict(
    [
        ("size", UINT2),  # size of data, no including header
        ("channels", UINT1),
        ("type", UINT1),
        ("seq_id", UINT2),
        ("date", UINT2),
        ("ms", UINT4),
        ("segments", UINT2),
        ("seg_num", UINT2),
    ]
)
LEN_MSG_HEADER = struct.calcsize(_get_fmt_string(MSG_HEADER, byte_order=">"))

# Table XVII Digital Radar Generic Format Blocks (Message Type 31)
# pages 3-87 to 3-89
MSG_31 = OrderedDict(
    [
        ("id", {"fmt": "4s"}),  # 0-3
        ("collect_ms", UINT4),  # 4-7
        ("collect_date", UINT2),  # 8-9
        ("azimuth_number", UINT2),  # 10-11
        ("azimuth_angle", FLT4),  # 12-15
        ("compress_flag", CODE1),  # 16
        ("spare_0", UINT1),  # 17
        ("radial_length", UINT2),  # 18-19
        ("azimuth_resolution", CODE1),  # 20
        ("radial_status", CODE1),  # 21
        ("elevation_number", UINT1),  # 22
        ("cut_sector", UINT1),  # 23
        ("elevation_angle", FLT4),  # 24-27
        ("radial_blanking", CODE1),  # 28
        ("azimuth_mode", SINT1),  # 29
        ("block_count", UINT2),  # 30-31
        ("block_pointer_1", UINT4),  # 32-35  Volume Data Constant XVII-E
        ("block_pointer_2", UINT4),  # 36-39  Elevation Data Constant XVII-F
        ("block_pointer_3", UINT4),  # 40-43  Radial Data Constant XVII-H
        ("block_pointer_4", UINT4),  # 44-47  Moment "REF" XVII-{B/I}
        ("block_pointer_5", UINT4),  # 48-51  Moment "VEL"
        ("block_pointer_6", UINT4),  # 52-55  Moment "SW"
        ("block_pointer_7", UINT4),  # 56-59  Moment "ZDR"
        ("block_pointer_8", UINT4),  # 60-63  Moment "PHI"
        ("block_pointer_9", UINT4),  # 64-67  Moment "RHO"
        ("block_pointer_10", UINT4),  # Moment "CFP"
    ]
)
LEN_MSG_31 = struct.calcsize(_get_fmt_string(MSG_31, byte_order=">"))


# Table III Digital Radar Data (Message Type 1)
# pages 3-7 to
MSG_1 = OrderedDict(
    [
        ("collect_ms", UINT4),  # 0-3
        ("collect_date", UINT2),  # 4-5
        ("unambig_range", SINT2),  # 6-7
        ("azimuth_angle", CODE2),  # 8-9
        ("azimuth_number", UINT2),  # 10-11
        ("radial_status", CODE2),  # 12-13
        ("elevation_angle", UINT2),  # 14-15
        ("elevation_number", UINT2),  # 16-17
        ("sur_range_first", CODE2),  # 18-19
        ("doppler_range_first", CODE2),  # 20-21
        ("sur_range_step", CODE2),  # 22-23
        ("doppler_range_step", CODE2),  # 24-25
        ("sur_nbins", UINT2),  # 26-27
        ("doppler_nbins", UINT2),  # 28-29
        ("cut_sector_num", UINT2),  # 30-31
        ("calib_const", FLT4),  # 32-35
        ("sur_pointer", UINT2),  # 36-37
        ("vel_pointer", UINT2),  # 38-39
        ("width_pointer", UINT2),  # 40-41
        ("doppler_resolution", CODE2),  # 42-43
        ("vcp", UINT2),  # 44-45
        ("spare_1", {"fmt": "8s"}),  # 46-53
        ("spare_2", {"fmt": "2s"}),  # 54-55
        ("spare_3", {"fmt": "2s"}),  # 56-57
        ("spare_4", {"fmt": "2s"}),  # 58-59
        ("nyquist_vel", SINT2),  # 60-61
        ("atmos_attenuation", SINT2),  # 62-63
        ("threshold", SINT2),  # 64-65
        ("spot_blank_status", UINT2),  # 66-67
        ("spare_5", {"fmt": "32s"}),  # 68-99
        # 100+  reflectivity, velocity and/or spectral width data, CODE1
    ]
)
LEN_MSG_1 = struct.calcsize(_get_fmt_string(MSG_1, byte_order=">"))

# Table XI Volume Coverage Pattern Data (Message Type 5 & 7)
# pages 3-51 to 3-54
MSG_5 = OrderedDict(
    [
        ("message_size", UINT2),
        ("pattern_type", CODE2),
        ("pattern_number", UINT2),
        ("number_elevation_cuts", UINT2),
        ("clutter_map_group_number", UINT2),
        ("doppler_velocity_resolution", CODE1),  # 2: 0.5 degrees, 4: 1.0 degrees
        ("pulse_width", CODE1),  # 2: short, 4: long
        ("spare", {"fmt": "10s"}),  # halfwords 7-11 (10 bytes, 5 halfwords)
    ]
)
LEN_MSG_5 = struct.calcsize(_get_fmt_string(MSG_5, byte_order=">"))

MSG_5_ELEV = OrderedDict(
    [
        ("elevation_angle", BIN2),  # scaled by 360/65536 for value in degrees.
        ("channel_config", CODE1),
        ("waveform_type", CODE1),
        ("super_resolution", CODE1),
        ("prf_number", UINT1),
        ("prf_pulse_count", UINT2),
        ("azimuth_rate", CODE2),
        ("ref_thresh", SINT2),
        ("vel_thresh", SINT2),
        ("sw_thresh", SINT2),
        ("zdr_thres", SINT2),
        ("phi_thres", SINT2),
        ("rho_thres", SINT2),
        ("edge_angle_1", CODE2),
        ("dop_prf_num_1", UINT2),
        ("dop_prf_pulse_count_1", UINT2),
        ("spare_1", {"fmt": "2s"}),
        ("edge_angle_2", CODE2),
        ("dop_prf_num_2", UINT2),
        ("dop_prf_pulse_count_2", UINT2),
        ("spare_2", {"fmt": "2s"}),
        ("edge_angle_3", CODE2),
        ("dop_prf_num_3", UINT2),
        ("dop_prf_pulse_count_3", UINT2),
        ("spare_3", {"fmt": "2s"}),
    ]
)
LEN_MSG_5_ELEV = struct.calcsize(_get_fmt_string(MSG_5_ELEV, byte_order=">"))

MSG_18 = OrderedDict(
    [
        ("adap_file_name", {"fmt": "12s"}),
        ("adap_format", {"fmt": "4s"}),
        ("adap_revision", {"fmt": "4s"}),
        ("adap_date", {"fmt": "12s"}),
        ("adap_time", {"fmt": "12s"}),
        ("k1", FLT4),
        ("az_lat", FLT4),
        ("k3", FLT4),
        ("el_lat", FLT4),
        ("park_az", FLT4),
        ("park_el", FLT4),
        ("a_fuel_conv0", FLT4),
        ("a_fuel_conv1", FLT4),
        ("a_fuel_conv2", FLT4),
        ("a_fuel_conv3", FLT4),
        ("a_fuel_conv4", FLT4),
        ("a_fuel_conv5", FLT4),
        ("a_fuel_conv6", FLT4),
        ("a_fuel_conv7", FLT4),
        ("a_fuel_conv8", FLT4),
        ("a_fuel_conv9", FLT4),
        ("a_fuel_conv10", FLT4),
        ("a_min_shelter_temp", FLT4),
        ("a_max_shelter_temp", FLT4),
        ("a_min_shelter_ac_temp_diff", FLT4),
        ("a_max_xmtr_air_temp", FLT4),
        ("a_max_rad_temp", FLT4),
        ("a_max_rad_temp_rise", FLT4),
        ("ped_28v_reg_lim", FLT4),
        ("ped_5v_reg_lim", FLT4),
        ("ped_15v_reg_lim", FLT4),
        ("a_min_gen_room_temp", FLT4),
        ("a_max_gen_room_temp", FLT4),
        ("dau_5v_reg_lim", FLT4),
        ("dau_15v_reg_lim", FLT4),
        ("dau_28v_reg_lim", FLT4),
        ("en_5v_reg_lim", FLT4),
        ("en_5v_nom_volts", FLT4),
        ("rpg_co_located", {"fmt": "4s"}),
        ("spec_filter_installed", {"fmt": "4s"}),
        ("tps_installed", {"fmt": "4s"}),
        ("rms_installed", {"fmt": "4s"}),
        ("a_hvdl_tst_int", UINT4),
        ("a_rpg_lt_int", UINT4),
        ("a_min_stab_util_pwr_time", UINT4),
        ("a_gen_auto_exer_interval", UINT4),
        ("a_util_pwr_sw_req_interval", UINT4),
        ("a_low_fuel_level", FLT4),
        ("config_chan_number", UINT4),
        ("a_rpg_link_type", UINT4),
        ("redundant_chan_config", UINT4),
    ]
)
for i in np.arange(104):
    MSG_18[f"atten_table_{i:03d}"] = FLT4
MSG_18.update(
    [
        ("spare_01", {"fmt": "24s"}),
        ("path_losses_07", FLT4),
        ("spare_02", {"fmt": "12s"}),
        ("path_losses_11", FLT4),
        ("spare_03", {"fmt": "4s"}),
        ("path_losses_13", FLT4),
        ("path_losses_14", FLT4),
        ("spare_04", {"fmt": "16s"}),
        ("path_losses_19", FLT4),
        ("spare_05", {"fmt": "32s"}),
    ]
)
for i in np.arange(28, 48):
    MSG_18[f"path_losses_{i:02d}"] = FLT4
MSG_18.update(
    [
        ("h_coupler_cw_loss", FLT4),
        ("v_coupler_xmt_loss", FLT4),
        ("path_losses_50", FLT4),
        ("spare_06", {"fmt": "4s"}),
        ("path_losses_52", FLT4),
        ("v_coupler_cw_loss", FLT4),
        ("path_losses_54", FLT4),
        ("spare_07", {"fmt": "12s"}),
        ("path_losses_58", FLT4),
        ("path_losses_59", FLT4),
        ("path_losses_60", FLT4),
        ("path_losses_61", FLT4),
        ("spare_08", {"fmt": "4s"}),
        ("path_losses_63", FLT4),
        ("path_losses_64", FLT4),
        ("path_losses_65", FLT4),
        ("path_losses_66", FLT4),
        ("path_losses_67", FLT4),
        ("path_losses_68", FLT4),
        ("path_losses_69", FLT4),
        ("chan_cal_diff", FLT4),
        ("spare_09", {"fmt": "4s"}),
        ("log_amp_factor_1", FLT4),
        ("log_amp_factor_2", FLT4),
        ("v_ts_cw", FLT4),
    ]
)
for i in np.arange(13):
    MSG_18[f"rnscale_{i:02d}"] = FLT4
for i in np.arange(13):
    MSG_18[f"atmos_{i:02d}"] = FLT4
for i in np.arange(12):
    MSG_18[f"el_index_{i:02d}"] = FLT4
MSG_18.update(
    [
        ("tfreq_mhz", UINT4),
        ("base_data_tcn", FLT4),
        ("refl_data_tover", FLT4),
        ("tar_h_dbz0_lp", FLT4),
        ("tar_v_dbz0_lp", FLT4),
        ("init_phi_dp", UINT4),
        ("norm_init_phi_dp", UINT4),
        ("lx_lp", FLT4),
        ("lx_sp", FLT4),
        ("meteor_param", FLT4),
        ("beamwidth", FLT4),
        ("antenna_gain", FLT4),
        ("spare_10", {"fmt": "4s"}),
        ("vel_maint_limit", FLT4),
        ("wth_maint_limit", FLT4),
        ("vel_degrad_limit", FLT4),
        ("wth_degrad_limit", FLT4),
        ("h_noisetemp_degrad_limit", FLT4),
        ("h_noisetemp_maint_limit", FLT4),
        ("v_noisetemp_degrad_limit", FLT4),
        ("v_noisetemp_maint_limit", FLT4),
        ("kly_degrade_limit", FLT4),
        ("ts_coho", FLT4),
        ("h_ts_cw", FLT4),
        ("ts_rf_sp", FLT4),
        ("ts_rf_lp", FLT4),
        ("ts_stalo", FLT4),
        ("ame_h_noise_enr", FLT4),
        ("xmtr_peak_pwr_high_limit", FLT4),
        ("xmtr_peak_pwr_low_limit", FLT4),
        ("h_dbz0_delta_limit", FLT4),
        ("threshold1", FLT4),
        ("threshold2", FLT4),
        ("clut_supp_dgrad_lim", FLT4),
        ("clut_supp_maint_lim", FLT4),
        ("range0_value", FLT4),
        ("xmtr_pwr_mtr_scale", FLT4),
        ("v_dbz0_delta_limit", FLT4),
        ("tar_h_dbz0_sp", FLT4),
        ("tar_v_dbz0_sp", FLT4),
        ("deltaprf", UINT4),
        ("spare_11", {"fmt": "8s"}),
        ("tau_sp", UINT4),
        ("tau_lp", UINT4),
        ("nc_dead_value", UINT4),
        ("tau_rf_sp", UINT4),
        ("tau_rf_lp", UINT4),
        ("seg1lim", FLT4),
        ("slatsec", FLT4),
        ("slonsec", FLT4),
        ("spare_12", {"fmt": "4s"}),
        ("slatdeg", UINT4),
        ("slatmin", UINT4),
        ("slondeg", UINT4),
        ("slonmin", UINT4),
        ("slatdir", {"fmt": "4s"}),
        ("slondir", {"fmt": "4s"}),
        ("spare_13", {"fmt": "4s"}),
        ("vcpat_11", {"fmt": "1172s"}),
        ("vcpat_21", {"fmt": "1172s"}),
        ("vcpat_31", {"fmt": "1172s"}),
        ("vcpat_32", {"fmt": "1172s"}),
        ("vcpat_300", {"fmt": "1172s"}),
        ("vcpat_301", {"fmt": "1172s"}),
        ("az_correction_factor", FLT4),
        ("el_correction_factor", FLT4),
        ("site_name", {"fmt": "4s"}),
        ("ant_manual_setup_ielmin", SINT4),
        ("ant_manual_setup_ielmax", SINT4),
        ("ant_manual_setup_fazvelmax", SINT4),
        ("ant_manual_setup_felvelmax", SINT4),
        ("ant_manual_setup_ignd_hgt", SINT4),
        ("ant_manual_setup_irad_hgt", SINT4),
        ("spare_14", {"fmt": "300s"}),
        ("rvp8nv_iwaveguide_lenght", UINT4),
        ("spare_15", {"fmt": "44s"}),
        ("vel_data_tover", FLT4),
        ("width_data_tover", FLT4),
        ("spare_16", {"fmt": "12s"}),
        ("doppler_range_start", FLT4),
        ("max_el_index", UINT4),
        ("seg2lim", FLT4),
        ("seg3lim", FLT4),
        ("seg4lim", FLT4),
        ("nbr_el_segments", UINT4),
        ("h_noise_long", FLT4),
        ("ant_noise_temp", FLT4),
        ("h_noise_short", FLT4),
        ("h_noise_tolerance", FLT4),
        ("min_h_dyn_range", FLT4),
        ("gen_installed", {"fmt": "4s"}),
        ("gen_exercise", {"fmt": "4s"}),
        ("v_noise_tolerance", FLT4),
        ("min_v_dyn_range", FLT4),
        ("zdr_bias_dgrad_lim", FLT4),
        ("spare_17", {"fmt": "16s"}),
        ("v_noise_long", FLT4),
        ("v_noise_short", FLT4),
        ("zdr_data_tover", FLT4),
        ("phi_data_tover", FLT4),
        ("rho_data_tover", FLT4),
        ("stalo_power_dgrad_limit", FLT4),
        ("stalo_power_maint_limit", FLT4),
        ("min_h_pwr_sense", FLT4),
        ("min_v_pwr_sense", FLT4),
        ("h_pwr_sense_offset", FLT4),
        ("v_pwr_sense_offset", FLT4),
        ("ps_gain_ref", FLT4),
        ("rf_pallet_broad_loss", FLT4),
        ("zdr_check_threshold", FLT4),
        ("phi_check_threshold", FLT4),
        ("rho_check_threshold", FLT4),
        ("spare_18", {"fmt": "52s"}),
        ("ame_ps_tolerance", FLT4),
        ("ame_max_temp", FLT4),
        ("ame_min_temp", FLT4),
        ("rcvr_mod_max_temp", FLT4),
        ("rcvr_mod_min_temp", FLT4),
        ("bite_mod_max_temp", FLT4),
        ("bite_mod_min_temp", FLT4),
        ("default_polarization", UINT4),
        ("tr_limit_dgrad_limit", FLT4),
        ("tr_limit_fail_limit", FLT4),
        ("spare_19", {"fmt": "8s"}),
        ("ame_current_tolerance", FLT4),
        ("h_only_polarization", UINT4),
        ("v_only_polarization", UINT4),
        ("spare_20", {"fmt": "8s"}),
        ("reflector_bias", FLT4),
        ("a_min_shelter_temp_warn", FLT4),
        ("spare_21", {"fmt": "432s"}),
    ]
)
LEN_MSG_18 = struct.calcsize(_get_fmt_string(MSG_18, byte_order=">"))

DATA_BLOCK_HEADER = OrderedDict(
    [("block_type", string_dict(1)), ("data_name", string_dict(3))]
)
LEN_DATA_BLOCK_HEADER = struct.calcsize(
    _get_fmt_string(DATA_BLOCK_HEADER, byte_order=">")
)


# Table XVII-B Data Block (Descriptor of Generic Data Moment Type)
# pages 3-90 and 3-91
GENERIC_DATA_BLOCK = OrderedDict(
    [
        ("reserved", UINT4),
        ("ngates", UINT2),
        ("first_gate", SINT2),
        ("gate_spacing", SINT2),
        ("thresh", SINT2),
        ("snr_thres", SINT2),
        ("flags", CODE1),
        ("word_size", UINT1),
        ("scale", FLT4),
        ("offset", FLT4),
        # then data
    ]
)
LEN_GENERIC_DATA_BLOCK = struct.calcsize(
    _get_fmt_string(GENERIC_DATA_BLOCK, byte_order=">")
)

# Table XVII-E Data Block (Volume Data Constant Type)
# page 3-92
VOLUME_DATA_BLOCK = OrderedDict(
    [
        ("lrtup", UINT2),
        ("version_major", UINT1),
        ("version_minor", UINT1),
        ("lat", FLT4),
        ("lon", FLT4),
        ("height", SINT2),
        ("feedhorn_height", UINT2),
        ("refl_calib", FLT4),
        ("power_h", FLT4),
        ("power_v", FLT4),
        ("diff_refl_calib", FLT4),
        ("init_phase", FLT4),
        ("vcp", UINT2),
        ("spare", {"fmt": "2s"}),
    ]
)
LEN_VOLUME_DATA_BLOCK = struct.calcsize(
    _get_fmt_string(VOLUME_DATA_BLOCK, byte_order=">")
)

# Table XVII-F Data Block (Elevation Data Constant Type)
# page 3-93
ELEVATION_DATA_BLOCK = OrderedDict(
    [
        ("lrtup", UINT2),
        ("atmos", SINT2),
        ("refl_calib", FLT4),
    ]
)
LEN_ELEVATION_DATA_BLOCK = struct.calcsize(
    _get_fmt_string(ELEVATION_DATA_BLOCK, byte_order=">")
)

# Table XVII-H Data Block (Radial Data Constant Type)
# pages 3-93
RADIAL_DATA_BLOCK = OrderedDict(
    [
        ("lrtup", UINT2),
        ("unambig_range", SINT2),
        ("noise_h", FLT4),
        ("noise_v", FLT4),
        ("nyquist_vel", SINT2),
        ("spare", {"fmt": "2s"}),
    ]
)
LEN_RADIAL_DATA_BLOCK = struct.calcsize(
    _get_fmt_string(RADIAL_DATA_BLOCK, byte_order=">")
)

DATA_BLOCK_CONSTANT_IDENTIFIERS = OrderedDict(
    [
        ("VOL", VOLUME_DATA_BLOCK),
        ("ELV", ELEVATION_DATA_BLOCK),
        ("RAD", RADIAL_DATA_BLOCK),
    ]
)

DATA_BLOCK_VARIABLE_IDENTIFIERS = OrderedDict(
    [
        ("REF", GENERIC_DATA_BLOCK),
        ("VEL", GENERIC_DATA_BLOCK),
        ("SW ", GENERIC_DATA_BLOCK),
        ("ZDR", GENERIC_DATA_BLOCK),
        ("PHI", GENERIC_DATA_BLOCK),
        ("RHO", GENERIC_DATA_BLOCK),
        ("CFP", GENERIC_DATA_BLOCK),
    ]
)

DATA_BLOCK_TYPE_IDENTIFIER = OrderedDict(
    [
        ("R", DATA_BLOCK_CONSTANT_IDENTIFIERS),
        ("D", DATA_BLOCK_VARIABLE_IDENTIFIERS),
    ]
)


class NexradLevel2ArrayWrapper(BackendArray):
    """Wraps array of NexradLevel2 data."""

    def __init__(self, datastore, name, var):
        self.datastore = datastore
        self.group = datastore._group
        self.name = name
        # get rays and bins
        nrays = (
            datastore.ds["record_end"]
            - datastore.ds["record_number"]
            + 1
            - len(datastore.ds["intermediate_records"])
        )
        nbins = max([v["ngates"] for k, v in datastore.ds["sweep_data"].items()])
        word_size = datastore.ds["sweep_data"][name]["word_size"]
        width = {8: 1, 16: 2}[word_size]
        self.dtype = np.dtype(f">u{width}")
        self.shape = (nrays, nbins)

    def _getitem(self, key):
        # read the data if not available
        try:
            data = self.datastore.ds["sweep_data"][self.name]["data"]
        except KeyError:
            self.datastore.root.get_data(self.group, self.name)
            data = self.datastore.ds["sweep_data"][self.name]["data"]
        # see 3.2.4.17.6 Table XVII-I Data Moment Characteristics and Conversion for Data Names
        word_size = self.datastore.ds["sweep_data"][self.name]["word_size"]
        if self.name == "PHI" and word_size == 16:
            # 10 bit mask, but only for 2 byte data
            x = np.uint16(0x3FF)
        elif self.name == "ZDR" and word_size == 16:
            # 11 bit mask, but only for 2 byte data
            x = np.uint16(0x7FF)
        else:
            x = np.uint8(0xFF)
        if len(data[0]) < self.shape[1]:
            return np.pad(
                np.vstack(data) & x,
                ((0, 0), (0, self.shape[1] - len(data[0]))),
                mode="constant",
                constant_values=0,
            )[key]
        else:
            return (np.vstack(data) & x)[key]

    def __getitem__(self, key):
        return indexing.explicit_indexing_adapter(
            key,
            self.shape,
            indexing.IndexingSupport.BASIC,
            self._getitem,
        )


class NexradLevel2Store(AbstractDataStore):
    def __init__(self, manager, group=None):
        self._manager = manager
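        # group is given as "sweep_N"; keep only the numeric sweep index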
        self._group = int(group[6:])
        self._filename = self.filename

    @classmethod
    def open(cls, filename, mode="r", group=None, **kwargs):
        manager = CachingFileManager(
            NEXRADLevel2File, filename, mode=mode, kwargs=kwargs
        )
        return cls(manager, group=group)

    @classmethod
    def open_groups(cls, filename, groups, mode="r", **kwargs):
        manager = CachingFileManager(
            NEXRADLevel2File, filename, mode=mode, kwargs=kwargs
        )
        return {group: cls(manager, group=group) for group in groups}

    @property
    def filename(self):
        with self._manager.acquire_context(False) as root:
            return root.filename

    @property
    def root(self):
        with self._manager.acquire_context(False) as root:
            return root

    def _acquire(self, needs_lock=True):
        with self._manager.acquire_context(needs_lock) as root:
            root.get_sweep(self._group)
            ds = root.data[self._group]
        return ds

    @property
    def ds(self):
        return self._acquire()

    def open_store_variable(self, name, var):
        dim = "azimuth"

        data = indexing.LazilyOuterIndexedArray(
            NexradLevel2ArrayWrapper(self, name, var)
        )
        encoding = {"group": self._group, "source": self._filename}

        mname = nexrad_mapping.get(name, name)
        mapping = sweep_vars_mapping.get(mname, {})
        attrs = {key: mapping[key] for key in moment_attrs if key in mapping}
        attrs["scale_factor"] = 1.0 / var["scale"]
        attrs["add_offset"] = -var["offset"] / var["scale"]
        attrs["coordinates"] = (
            "elevation azimuth range latitude longitude altitude time"
        )
        return mname, Variable((dim, "range"), data, attrs, encoding)

    def open_store_coordinates(self):
        msg_31_header = self.root.msg_31_header[self._group]

        # azimuth/elevation
        azimuth = np.array([ms["azimuth_angle"] for ms in msg_31_header])
        elevation = np.array([ms["elevation_angle"] for ms in msg_31_header])

        # time
        # date is 1-based (1 -> 1970-01-01T00:00:00), so we need to subtract 1
        date = (np.array([ms["collect_date"] for ms in msg_31_header]) - 1) * 86400e3
        milliseconds = np.array([ms["collect_ms"] for ms in msg_31_header])
        rtime = date + milliseconds
        time_prefix = "milli"
        rtime_attrs = get_time_attrs(date_unit=f"{time_prefix}seconds")

        # site coords
        vol = self.ds["sweep_constant_data"]["VOL"]
        lon, lat, alt = vol["lon"], vol["lat"], vol["height"] + vol["feedhorn_height"]

        # range
        sweep_data = list(self.ds["sweep_data"].values())[0]
        first_gate = sweep_data["first_gate"]
        gate_spacing = sweep_data["gate_spacing"]
        ngates = max([v["ngates"] for k, v in self.ds["sweep_data"].items()])
        last_gate = first_gate + gate_spacing * (ngates - 0.5)
        rng = np.arange(first_gate, last_gate, gate_spacing, "float32")
        range_attrs = get_range_attrs(rng)

        encoding = {"group": self._group}
        dim = "azimuth"
        sweep_mode = "azimuth_surveillance"
        sweep_number = self._group
        prt_mode = "not_set"
        follow_mode = "not_set"

        fixed_angle = self.root.msg_5["elevation_data"][self._group]["elevation_angle"]

        coords = {
            "azimuth": Variable((dim,), azimuth, get_azimuth_attrs(), encoding),
            "elevation": Variable((dim,), elevation, get_elevation_attrs(), encoding),
            "time": Variable((dim,), rtime, rtime_attrs, encoding),
            "range": Variable(("range",), rng, range_attrs),
            "longitude": Variable((), lon, get_longitude_attrs()),
            "latitude": Variable((), lat, get_latitude_attrs()),
            "altitude": Variable((), alt, get_altitude_attrs()),
            "sweep_mode": Variable((), sweep_mode),
            "sweep_number": Variable((), sweep_number),
            "prt_mode": Variable((), prt_mode),
            "follow_mode": Variable((), follow_mode),
            "sweep_fixed_angle": Variable((), fixed_angle),
        }

        return coords

    def get_variables(self):
        return FrozenDict(
            (k1, v1)
            for k1, v1 in {
                **dict(
                    self.open_store_variable(k, v)
                    for k, v in self.ds["sweep_data"].items()
                ),
                **self.open_store_coordinates(),
            }.items()
        )

    def get_attrs(self):
        _attributes = [
            ("instrument_name", self.root.volume_header["icao"].decode()),
            ("scan_name", f"VCP-{self.root.msg_5['pattern_number']}"),
        ]

        return FrozenDict(_attributes)


class NexradLevel2BackendEntrypoint(BackendEntrypoint):
    """
    Xarray BackendEntrypoint for NEXRAD Level2 Data
    """

    description = "Open NEXRAD Level2 files in Xarray"
    url = "tbd"

    def open_dataset(
        self,
        filename_or_obj,
        *,
        mask_and_scale=True,
        decode_times=True,
        concat_characters=True,
        decode_coords=True,
        drop_variables=None,
        use_cftime=None,
        decode_timedelta=None,
        group=None,
        first_dim="auto",
        reindex_angle=False,
        fix_second_angle=False,
        site_coords=True,
        optional=True,
    ):
        store = NexradLevel2Store.open(
            filename_or_obj,
            group=group,
            loaddata=False,
        )

        store_entrypoint = StoreBackendEntrypoint()

        ds = store_entrypoint.open_dataset(
            store,
            mask_and_scale=mask_and_scale,
            decode_times=decode_times,
            concat_characters=concat_characters,
            decode_coords=decode_coords,
            drop_variables=drop_variables,
            use_cftime=use_cftime,
            decode_timedelta=decode_timedelta,
        )

        # reassign azimuth/elevation/time coordinates
        ds = ds.assign_coords({"azimuth": ds.azimuth})
        ds = ds.assign_coords({"elevation": ds.elevation})
        ds = ds.assign_coords({"time": ds.time})
        ds.encoding["engine"] = "nexradlevel2"

        # handle duplicates and reindex
        if decode_coords and reindex_angle is not False:
            ds = ds.pipe(util.remove_duplicate_rays)
            ds = ds.pipe(util.reindex_angle, **reindex_angle)
            ds = ds.pipe(util.ipol_time, **reindex_angle)

        # handling first dimension
        dim0 = "elevation" if ds.sweep_mode.load() == "rhi" else "azimuth"

        # todo: could be optimized
        if first_dim == "time":
            ds = ds.swap_dims({dim0: "time"})
            ds = ds.sortby("time")
        else:
            ds = ds.sortby(dim0)

        # assign geo-coords
        if site_coords:
            ds = ds.assign_coords(
                {
                    "latitude": ds.latitude,
                    "longitude": ds.longitude,
                    "altitude": ds.altitude,
                }
            )

        return ds
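
# A minimal usage sketch through xarray (the filename is a hypothetical
# placeholder; the engine name matches this entrypoint's registration):
#
#     ds = xr.open_dataset(
#         "KATX_example_V06", engine="nexradlevel2", group="sweep_0"
#     )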


def open_nexradlevel2_datatree(
    filename_or_obj,
    mask_and_scale=True,
    decode_times=True,
    concat_characters=True,
    decode_coords=True,
    drop_variables=None,
    use_cftime=None,
    decode_timedelta=None,
    sweep=None,
    first_dim="auto",
    reindex_angle=False,
    fix_second_angle=False,
    site_coords=True,
    optional=True,
    **kwargs,
):
    """Open a NEXRAD Level2 dataset as an `xarray.DataTree`.

    This function loads NEXRAD Level2 radar data into a DataTree structure,
    which organizes radar sweeps as separate nodes. Provides options for
    decoding time and applying various transformations to the data.

    Parameters
    ----------
    filename_or_obj : str, Path, file-like, or DataStore
        The path or file-like object representing the radar file.
        Path-like objects are interpreted as local or remote paths.
    mask_and_scale : bool, optional
        If True, replaces values in the dataset that match `_FillValue` with NaN
        and applies scale and offset adjustments. Default is True.
    decode_times : bool, optional
        If True, decodes time variables according to CF conventions.
        Default is True.
    concat_characters : bool, optional
        If True, concatenates character arrays along the last dimension,
        forming string arrays. Default is True.
    decode_coords : bool, optional
        If True, decodes the "coordinates" attribute to identify coordinates
        in the resulting dataset. Default is True.
    drop_variables : str or list of str, optional
        Specifies variables to exclude from the dataset. Useful for removing
        problematic or inconsistent variables. Default is None.
    use_cftime : bool, optional
        If True, uses cftime objects to represent time variables; if False,
        uses `np.datetime64` objects. If None, chooses the best format
        automatically. Default is None.
    decode_timedelta : bool, optional
        If True, decodes variables with units of time (e.g., seconds, minutes)
        into timedelta objects. If False, leaves them as numeric values.
        Default is None.
    sweep : int or list of int, optional
        Sweep numbers to extract from the dataset. If None, extracts all
        sweeps into a list. Default is None.
    first_dim : {"time", "auto"}, optional
        Defines the first dimension for each sweep. If "time", uses time as
        the first dimension. If "auto", determines the first dimension based
        on the sweep type (azimuth or elevation). Default is "auto".
    reindex_angle : bool or dict, optional
        Controls angle reindexing. If True or a dictionary, applies reindexing
        with specified settings (if given). Only used if `decode_coords=True`.
        Default is False.
    fix_second_angle : bool, optional
        If True, corrects errors in the second angle data, such as misaligned
        elevation or azimuth values. Default is False.
    site_coords : bool, optional
        Attaches radar site coordinates to the dataset if True.
        Default is True.
    optional : bool, optional
        If True, suppresses errors for optional dataset attributes, making
        them optional instead of required. Default is True.
    kwargs : dict
        Additional keyword arguments passed to `xarray.open_dataset`.

    Returns
    -------
    dtree : xarray.DataTree
        An `xarray.DataTree` representing the radar data organized by sweeps.
    """
    from xarray.core.treenode import NodePath

    comment = None
    if isinstance(sweep, str):
        sweep = NodePath(sweep).name
        sweeps = [sweep]
    elif isinstance(sweep, int):
        sweeps = [f"sweep_{sweep}"]
    elif isinstance(sweep, list):
        if isinstance(sweep[0], int):
            sweeps = [f"sweep_{i}" for i in sweep]
        elif isinstance(sweep[0], str):
            sweeps = [NodePath(i).name for i in sweep]
        else:
            raise ValueError(
                "Invalid type in 'sweep' list. Expected integers "
                "(e.g., [0, 1, 2]) or strings (e.g., ['sweep_0', 'sweep_1'])."
            )
    else:
        with NEXRADLevel2File(filename_or_obj, loaddata=False) as nex:
            nsweeps = nex.msg_5["number_elevation_cuts"]
            # Check if duplicated sweeps ("split cut mode")
            n_sweeps = len(nex.msg_31_data_header)
            if nsweeps > n_sweeps:
                nsweeps = n_sweeps
                comment = "Split Cut Mode scanning strategy"
        sweeps = [f"sweep_{i}" for i in range(nsweeps)]

    sweep_dict = open_sweeps_as_dict(
        filename_or_obj=filename_or_obj,
        mask_and_scale=mask_and_scale,
        decode_times=decode_times,
        concat_characters=concat_characters,
        decode_coords=decode_coords,
        drop_variables=drop_variables,
        use_cftime=use_cftime,
        decode_timedelta=decode_timedelta,
        sweeps=sweeps,
        first_dim=first_dim,
        reindex_angle=reindex_angle,
        fix_second_angle=fix_second_angle,
        site_coords=site_coords,
        optional=optional,
        **kwargs,
    )
    ls_ds: list[xr.Dataset] = [sweep_dict[sweep] for sweep in sweep_dict.keys()]
    ls_ds.insert(0, xr.Dataset())
    if comment is not None:
        ls_ds[0].attrs["comment"] = comment
    dtree: dict = {
        "/": _assign_root(ls_ds),
        "/radar_parameters": _get_subgroup(ls_ds, radar_parameters_subgroup),
        "/georeferencing_correction": _get_subgroup(
            ls_ds, georeferencing_correction_subgroup
        ),
        "/radar_calibration": _get_radar_calibration(ls_ds, radar_calibration_subgroup),
    }
    # todo: refactor _assign_root and _get_subgroup to receive a dict instead
    # of a list of datasets, avoiding the attribute removal in the following line
    sweep_dict = {
        sweep_path: sweep_dict[sweep_path].drop_attrs(deep=False)
        for sweep_path in sweep_dict.keys()
    }
    dtree = dtree | sweep_dict
    return DataTree.from_dict(dtree)
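
# A minimal sketch of opening a full volume as a DataTree (the filename is a
# hypothetical placeholder):
#
#     dtree = open_nexradlevel2_datatree("KATX_example_V06")
#     ds0 = dtree["sweep_0"].to_dataset()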


def open_sweeps_as_dict(
    filename_or_obj,
    mask_and_scale=True,
    decode_times=True,
    concat_characters=True,
    decode_coords=True,
    drop_variables=None,
    use_cftime=None,
    decode_timedelta=None,
    sweeps=None,
    first_dim="auto",
    reindex_angle=False,
    fix_second_angle=False,
    site_coords=True,
    optional=True,
    **kwargs,
):
    stores = NexradLevel2Store.open_groups(
        filename=filename_or_obj,
        groups=sweeps,
    )
    groups_dict = {}
    for path_group, store in stores.items():
        store_entrypoint = StoreBackendEntrypoint()
        with close_on_error(store):
            group_ds = store_entrypoint.open_dataset(
                store,
                mask_and_scale=mask_and_scale,
                decode_times=decode_times,
                concat_characters=concat_characters,
                decode_coords=decode_coords,
                drop_variables=drop_variables,
                use_cftime=use_cftime,
                decode_timedelta=decode_timedelta,
            )
            # reassign azimuth/elevation/time coordinates
            group_ds = group_ds.assign_coords({"azimuth": group_ds.azimuth})
            group_ds = group_ds.assign_coords({"elevation": group_ds.elevation})
            group_ds = group_ds.assign_coords({"time": group_ds.time})
            group_ds.encoding["engine"] = "nexradlevel2"

            # handle duplicates and reindex
            if decode_coords and reindex_angle is not False:
                group_ds = group_ds.pipe(util.remove_duplicate_rays)
                group_ds = group_ds.pipe(util.reindex_angle, **reindex_angle)
                group_ds = group_ds.pipe(util.ipol_time, **reindex_angle)

            # handling first dimension
            dim0 = "elevation" if group_ds.sweep_mode.load() == "rhi" else "azimuth"

            # todo: could be optimized
            if first_dim == "time":
                group_ds = group_ds.swap_dims({dim0: "time"})
                group_ds = group_ds.sortby("time")
            else:
                group_ds = group_ds.sortby(dim0)

            # assign geo-coords
            if site_coords:
                group_ds = group_ds.assign_coords(
                    {
                        "latitude": group_ds.latitude,
                        "longitude": group_ds.longitude,
                        "altitude": group_ds.altitude,
                    }
                )
            groups_dict[path_group] = group_ds
    return groups_dict