Source code for xradar.io.backends.nexrad_level2

#!/usr/bin/env python
# Copyright (c) 2024, openradar developers.
# Distributed under the MIT License. See LICENSE for more info.

"""
NEXRAD Level2 Data I/O
^^^^^^^^^^^^^^^^^^^^^^

Reads data from NEXRAD Level2 data format

See https://www.roc.noaa.gov/WSR88D/BuildInfo/Files.aspx

Documents:
 - ICD 2620002 M ICD FOR RDA/RPG - Build RDA 11.5/RPG 13.0 (PDF)
 - ICD 2620010 E ICD FOR ARCHIVE II/USER - Build 12.0 (PDF)

To read from NEXRAD Level2 files :class:`numpy:numpy.memmap` is used for
uncompressed files (pre 2016-06-01) and :class:`bz2:BZ2Decompressor` for BZ2
compressed data. The NEXRAD header (`VOLUME_HEADER`, `MSG_HEADER`) are read in
any case into dedicated OrderedDict's. Reading sweep data can be skipped by
setting `loaddata=False`. By default, the data is decoded on the fly.
Using `rawdata=True` the data will be kept undecoded.

Code adapted from Py-ART.

.. autosummary::
   :nosignatures:
   :toctree: generated/

   {}
"""
__all__ = [
    "NexradLevel2BackendEntrypoint",
    "open_nexradlevel2_datatree",
]
__doc__ = __doc__.format("\n   ".join(__all__))

import bz2
import struct
from collections import OrderedDict

import numpy as np
import xarray as xr
from xarray import DataTree
from xarray.backends.common import AbstractDataStore, BackendArray, BackendEntrypoint
from xarray.backends.file_manager import CachingFileManager
from xarray.backends.store import StoreBackendEntrypoint
from xarray.core import indexing
from xarray.core.utils import FrozenDict
from xarray.core.variable import Variable

from xradar import util
from xradar.io.backends.common import (
    _assign_root,
    _attach_sweep_groups,
    _get_radar_calibration,
    _get_subgroup,
)
from xradar.model import (
    georeferencing_correction_subgroup,
    get_altitude_attrs,
    get_azimuth_attrs,
    get_elevation_attrs,
    get_latitude_attrs,
    get_longitude_attrs,
    get_range_attrs,
    get_time_attrs,
    moment_attrs,
    radar_calibration_subgroup,
    radar_parameters_subgroup,
    sweep_vars_mapping,
)

from .iris import (
    BIN2,
    IrisRecord,
    _get_fmt_string,
    _unpack_dictionary,
    string_dict,
)

#: mapping from NEXRAD names to CfRadial2/ODIM
nexrad_mapping = {
    "REF": "DBZH",
    "VEL": "VRADH",
    "SW ": "WRADH",
    "ZDR": "ZDR",
    "PHI": "PHIDP",
    "RHO": "RHOHV",
    "CFP": "CCORH",
}

# NEXRAD Level II file structures and sizes
# The deails on these structures are documented in:
# "Interface Control Document for the Achive II/User" RPG Build 12.0
# Document Number 2620010E
# and
# "Interface Control Document for the RDA/RPG" Open Build 13.0
# Document Number 2620002M
# Tables and page number refer to those in the second document unless
# otherwise noted.
RECORD_BYTES = 2432
COMPRESSION_RECORD_SIZE = 12
CONTROL_WORD_SIZE = 4

# format of structure elements
# section 3.2.1, page 3-2
UINT1 = {"fmt": "B", "dtype": "unit8"}
UINT2 = {"fmt": "H", "dtype": "uint16"}
UINT4 = {"fmt": "I", "dtype": "uint32"}
SINT1 = {"fmt": "b", "dtype": "int8"}
SINT2 = {"fmt": "h", "dtype": "int16"}
SINT4 = {"fmt": "i", "dtype": "int32"}
FLT4 = {"fmt": "f", "dtype": "float32"}
CODE1 = {"fmt": "B"}
CODE2 = {"fmt": "H"}


class NEXRADFile:
    """
    Class for accessing data in a NEXRAD (WSR-88D) Level II file.

    NEXRAD Level II files [1]_, also know as NEXRAD Archive Level II or
    WSR-88D Archive level 2, are available from the NOAA National Climate Data
    Center [2]_ as well as on the UCAR THREDDS Data Server [3]_. Files with
    uncompressed messages and compressed messages are supported. This class
    supports reading both "message 31" and "message 1" type files.

    Parameters
    ----------
    filename : str
        Filename of Archive II file to read.

    References
    ----------
    .. [1] http://www.roc.noaa.gov/WSR88D/Level_II/Level2Info.aspx
    .. [2] http://www.ncdc.noaa.gov/
    .. [3] http://thredds.ucar.edu/thredds/catalog.html

    """

    def __init__(self, filename, mode="r", loaddata=False):
        """initalize the object."""
        self._fp = None
        self._filename = filename
        # read in the volume header and compression_record
        if hasattr(filename, "read"):
            self._fh = filename
        else:
            self._fp = open(filename, "rb")
            self._fh = np.memmap(self._fp, mode=mode)
        self._filepos = 0
        self._rawdata = False
        self._loaddata = loaddata
        self._bz2_indices = None
        self.volume_header = self.get_header(VOLUME_HEADER)
        return

    @property
    def filename(self):
        return self._filename

    @property
    def fh(self):
        return self._fh

    @property
    def filepos(self):
        return self._filepos

    @filepos.setter
    def filepos(self, pos):
        self._filepos = pos

    def read_from_file(self, size):
        """Read from file.

        Parameters
        ----------
        size : int
            Number of data words to read.

        Returns
        -------
        data : array-like
            numpy array of data
        """
        start = self._filepos
        self._filepos += size
        return self._fh[start : self._filepos]

    def get_header(self, header):
        len = struct.calcsize(_get_fmt_string(header))
        head = _unpack_dictionary(self.read_from_file(len), header, self._rawdata)
        return head

    @property
    def bz2_record_indices(self):
        """Get file offsets of bz2 records."""
        if self._bz2_indices is None:
            # magic number inside BZ2
            seq = np.array([49, 65, 89, 38, 83, 89], dtype=np.uint8)
            rd = util.rolling_dim(self._fh, 6)
            self._bz2_indices = np.nonzero((rd == seq).all(1))[0] - 8
        return self._bz2_indices

    @property
    def is_compressed(self):
        """File contains bz2 compressed data."""
        size = self._fh[24:28].view(dtype=">u4")[0]
        return size > 0

    def close(self):
        if self._fp is not None:
            self._fp.close()

    __del__ = close

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()


class NEXRADRecordFile(NEXRADFile):
    """NEXRADRecordFile Record File class"""

    def __init__(self, filename, **kwargs):
        super().__init__(filename=filename, **kwargs)
        self._rh = None
        self._rc = None
        self._ldm = dict()
        self._record_number = None

    @property
    def rh(self):
        """Returns current record object."""
        return self._rh

    @rh.setter
    def rh(self, value):
        """Sets current record object."""
        self._rh = value

    @property
    def record_number(self):
        """Returns current record number."""
        return self._record_number

    @record_number.setter
    def record_number(self, value):
        """Sets current record number."""
        self._record_number = value

    @property
    def record_size(self):
        """Returns current record number."""
        return self._record_size

    @record_size.setter
    def record_size(self, value):
        """Sets current record number."""
        self._record_size = value

    def _check_record(self):
        """Checks record for correct size.

        Need to be implemented in the derived classes
        """
        return True

    def get_end(self, buf):
        if len(buf) < 16:
            return 0
        header = _unpack_dictionary(buf, MSG_HEADER, self._rawdata, byte_order=">")
        size = header["size"] * 2 + 12
        if header["type"] != 31:
            size = size if size >= RECORD_BYTES else RECORD_BYTES
        return size

    def init_record(self, recnum):
        """Initialize record using given number."""

        # map record numbers to ldm compressed records
        def get_ldm(recnum):
            if recnum < 134:
                return 0
            mod = ((recnum - 134) // 120) + 1
            return mod

        if self.is_compressed:
            ldm = get_ldm(recnum)
            # get uncompressed ldm record
            if self._ldm.get(ldm, None) is None:
                # otherwise extract wanted ldm compressed record
                if ldm >= len(self.bz2_record_indices):
                    return False
                start = self.bz2_record_indices[ldm]
                size = self._fh[start : start + 4].view(dtype=">u4")[0]
                self._fp.seek(start + 4)
                dec = bz2.BZ2Decompressor()
                self._ldm[ldm] = np.frombuffer(
                    dec.decompress(self._fp.read(size)), dtype=np.uint8
                )

        # rectrieve wanted record and put into self.rh
        if recnum < 134:
            start = recnum * RECORD_BYTES
            if not self.is_compressed:
                start += 24
            stop = start + RECORD_BYTES
        else:
            if self.is_compressed:
                # get index into current compressed ldm record
                rnum = (recnum - 134) % 120
                start = self.record_size + self.filepos if rnum else 0
                buf = self._ldm[ldm][start + 12 : start + 12 + LEN_MSG_HEADER]
                size = self.get_end(buf)
                if not size:
                    return False
                stop = start + size
            else:
                start = self.record_size + self.filepos
                buf = self.fh[start + 12 : start + 12 + LEN_MSG_HEADER]
                size = self.get_end(buf)
                if not size:
                    return False
                stop = start + size
        self.record_number = recnum
        self.record_size = stop - start
        if self.is_compressed:
            self.rh = IrisRecord(self._ldm[ldm][start:stop], recnum)
        else:
            self.rh = IrisRecord(self.fh[start:stop], recnum)
        self.filepos = start
        return self._check_record()

    def init_record_by_filepos(self, recnum, filepos):
        """Initialize record using given record number and position."""
        start = filepos
        buf = self.fh[start + 12 : start + 12 + LEN_MSG_HEADER]
        size = self.get_end(buf)
        if not size:
            return False
        stop = start + size
        self.record_number = recnum
        self.record_size = stop - start
        self.rh = IrisRecord(self.fh[start:stop], recnum)
        self.filepos = start
        return self._check_record()

    def init_next_record(self):
        """Get next record from file.

        This increases record_number count and initialises a new IrisRecord
        with the calculated start and stop file offsets.

        Returns
        -------
        chk : bool
            True, if record is truncated.
        """
        return self.init_record(self.record_number + 1)

    def array_from_record(self, words, width, dtype):
        """Retrieve array from current record.

        Parameters
        ----------
        words : int
            Number of data words to read.
        width : int
            Size of the data word to read in bytes.
        dtype : str
            dtype string specifying data format.

        Returns
        -------
        data : array-like
            numpy array of data
        """
        return self.rh.read(words, width=width).view(dtype=dtype)

    def bytes_from_record(self, words, width):
        """Retrieve bytes from current record.

        Parameters
        ----------
        words : int
            Number of data words to read.
        width : int
            Size of the data word to read in bytes.

        Returns
        -------
        data : array-like
            numpy array of data
        """
        return self.rh.read(words, width=width)


class NEXRADLevel2File(NEXRADRecordFile):
    def __init__(self, filename, **kwargs):
        super().__init__(filename, **kwargs)

        # check compression

        self._data_header = None
        # get all metadata headers
        # message 15, 13, 18, 3, 5 and 2
        self._meta_header = None

        # message 15
        # RDA Clutter Map Data
        # message 13
        # RDA Clutter Filter Bypass Map
        # message 18
        # RDA Adaptable Parameters
        # message 3
        # RDA Performance/Maintenance Data
        # message 5
        # RDA Volume Coverage Data
        self._msg_5_data = None
        # message 2
        # RDA Status Data

        # message 31 headers
        # Digital Radar Data Generic Format
        self._data_header = None
        self._msg_31_header = None
        self._msg_31_data_header = None
        self._data = OrderedDict()

    @property
    def data_header(self):
        """Retrieve data header."""
        if self._data_header is None:
            (
                self._data_header,
                self._msg_31_header,
                self._msg_31_data_header,
            ) = self.get_data_header()
        return self._data_header

    @property
    def msg_31_header(self):
        """Retrieve MSG31 Header."""
        if self._msg_31_header is None:
            (
                self._data_header,
                self._msg_31_header,
                self._msg_31_data_header,
            ) = self.get_data_header()
        return self._msg_31_header

    @property
    def msg_31_data_header(self):
        """Retrieve MSG31 data header."""
        if self._msg_31_data_header is None:
            (
                self._data_header,
                self._msg_31_header,
                self._msg_31_data_header,
            ) = self.get_data_header()
        return self._msg_31_data_header

    @property
    def meta_header(self):
        """Retrieve metadat header."""
        if self._meta_header is None:
            self._meta_header = self.get_metadata_header()
        return self._meta_header

    @property
    def msg_5(self):
        """Retrieve MSG5 data."""
        if self._msg_5_data is None:
            self._msg_5_data = self.get_msg_5_data()
        return self._msg_5_data

    @property
    def data(self):
        """Retrieve data."""
        return self._data

    def get_sweep(self, sweep_number, moments=None):
        """Retrieve sweep according sweep_number."""
        if moments is None:
            moments = self.msg_31_data_header[sweep_number]["msg_31_data_header"].keys()
        # get selected coordinate type names
        selected = [k for k in DATA_BLOCK_CONSTANT_IDENTIFIERS.keys() if k in moments]
        for moment in selected:
            self.get_moment(sweep_number, moment, "sweep_constant_data")

        # get selected data type names
        selected = [k for k in DATA_BLOCK_VARIABLE_IDENTIFIERS.keys() if k in moments]
        for moment in selected:
            self.get_moment(sweep_number, moment, "sweep_data")

    def get_moment(self, sweep_number, moment, mtype):
        """Retrieve Moment according sweep_number and moment."""
        sweep = self.data[sweep_number]
        # create new sweep_data OrderedDict
        if not sweep.get(mtype, False):
            sweep[mtype] = OrderedDict()
        # check if moment exist and return early
        data = sweep[mtype].get(moment, False)
        if data is not False:
            return

        data = OrderedDict()
        data[moment] = self.msg_31_data_header[sweep_number]["msg_31_data_header"].pop(
            moment
        )
        sweep.update(self.msg_31_data_header[sweep_number])
        sweep[mtype].update(data)

    def get_metadata_header(self):
        """Get metadaata header"""
        # data offsets
        # ICD 2620010E
        # 7.3.5 Metadata Record
        meta_offsets = dict(
            msg_15=(0, 77),
            msg_13=(77, 126),
            msg_18=(126, 131),
            msg_3=(131, 132),
            msg_5=(132, 133),
            msg_2=(133, 134),
        )
        meta_headers = {}
        for msg, (ms, me) in meta_offsets.items():
            mheader = []
            for rec in np.arange(ms, me):
                self.init_record(rec)
                filepos = self.filepos
                message_header = self.get_message_header()
                # do not read zero blocks of data
                if message_header["size"] == 0:
                    break
                message_header["record_number"] = self.record_number
                message_header["filepos"] = filepos
                mheader.append(message_header)
            meta_headers[msg] = mheader
        return meta_headers

    def get_msg_5_data(self):
        """Get MSG5 data."""
        self.init_record(132)
        self.rh.pos += LEN_MSG_HEADER + 12
        # unpack msg header
        msg_5 = _unpack_dictionary(
            self._rh.read(LEN_MSG_5, width=1),
            MSG_5,
            self._rawdata,
            byte_order=">",
        )
        msg_5["elevation_data"] = []
        # unpack elevation cuts
        for i in range(msg_5["number_elevation_cuts"]):
            msg_5_elev = _unpack_dictionary(
                self._rh.read(LEN_MSG_5_ELEV, width=1),
                MSG_5_ELEV,
                self._rawdata,
                byte_order=">",
            )
            msg_5["elevation_data"].append(msg_5_elev)
        return msg_5

    def get_message_header(self):
        """Read and unpack message header."""
        self._rh.pos += 12
        return _unpack_dictionary(
            self._rh.read(LEN_MSG_HEADER, width=1),
            MSG_HEADER,
            self._rawdata,
            byte_order=">",
        )

    def get_data(self, sweep_number, moment=None):
        """Load sweep data from file."""
        sweep = self.data[sweep_number]
        start = sweep["record_number"]
        stop = sweep["record_end"]
        intermediate_records = [
            rec["record_number"] for rec in sweep["intermediate_records"]
        ]
        filepos = sweep["filepos"]
        moments = sweep["sweep_data"]
        if moment is None:
            moment = moments
        elif isinstance(moment, str):
            moment = [moment]
        for name in moment:
            if self.is_compressed:
                self.init_record(start)
            else:
                self.init_record_by_filepos(start, filepos)
            ngates = moments[name]["ngates"]
            word_size = moments[name]["word_size"]
            data_offset = moments[name]["data_offset"]
            width = {8: 1, 16: 2}[word_size]
            data = []
            self.rh.pos += data_offset
            data.append(self._rh.read(ngates, width=width).view(f">u{width}"))
            while self.init_next_record() and self.record_number <= stop:
                if self.record_number in intermediate_records:
                    continue
                self.rh.pos += data_offset
                data.append(self._rh.read(ngates, width=width).view(f">u{width}"))
            moments[name].update(data=data)

    def get_data_header(self):
        """Load all data header from file."""
        self.init_record(133)
        current_sweep = -1
        current_header = -1
        sweep_msg_31_header = []
        sweep_intermediate_records = []
        sweep = OrderedDict()

        data_header = []
        _msg_31_header = []
        _msg_31_data_header = []

        while self.init_next_record():
            current_header += 1
            # get message headers
            msg_header = self.get_message_header()
            # append to data headers list
            msg_header["record_number"] = self.record_number
            msg_header["filepos"] = self.filepos

            # keep all data headers
            data_header.append(msg_header)
            # only check type 31 for now
            if msg_header["type"] == 31:
                # get msg_31_header
                msg_31_header = _unpack_dictionary(
                    self._rh.read(LEN_MSG_31, width=1),
                    MSG_31,
                    self._rawdata,
                    byte_order=">",
                )
                # add record_number/filepos
                msg_31_header["record_number"] = self.record_number
                msg_31_header["filepos"] = self.filepos
                # retrieve data/const headers from msg 31
                # check if this is a new sweep
                status = msg_31_header["radial_status"]
                if status == 1:
                    # 1 - intermediate radial
                    pass
                if status in [2, 4]:
                    # 2 - end of elevation
                    # 4 - end of volume
                    sweep["record_end"] = self.rh.recnum
                    sweep["intermediate_records"] = sweep_intermediate_records
                    self._data[current_sweep] = sweep
                    _msg_31_header.append(sweep_msg_31_header)
                if status in [0, 3, 5]:
                    # 0 - start of new elevation
                    # 3 - start of new volume
                    # 5 - start of new elevation, last elevation in VCP
                    current_sweep += 1
                    # create new sweep object
                    sweep = OrderedDict()
                    sweep_msg_31_header = []
                    sweep_intermediate_records = []
                    sweep["record_number"] = self.rh.recnum
                    sweep["filepos"] = self.filepos
                    sweep["msg_31_header"] = msg_31_header
                    block_pointers = [
                        v
                        for k, v in msg_31_header.items()
                        if k.startswith("block_pointer") and v > 0
                    ]
                    block_header = {}
                    for i, block_pointer in enumerate(
                        block_pointers[: msg_31_header["block_count"]]
                    ):
                        self.rh.pos = block_pointer + 12 + LEN_MSG_HEADER
                        buf = self._rh.read(4, width=1)
                        dheader = _unpack_dictionary(
                            buf,
                            DATA_BLOCK_HEADER,
                            self._rawdata,
                            byte_order=">",
                        )
                        block = DATA_BLOCK_TYPE_IDENTIFIER[dheader["block_type"]][
                            dheader["data_name"]
                        ]
                        LEN_BLOCK = struct.calcsize(
                            _get_fmt_string(block, byte_order=">")
                        )
                        block_header[dheader["data_name"]] = _unpack_dictionary(
                            self._rh.read(LEN_BLOCK, width=1),
                            block,
                            self._rawdata,
                            byte_order=">",
                        )
                        if dheader["block_type"] == "D":
                            block_header[dheader["data_name"]][
                                "data_offset"
                            ] = self.rh.pos

                    sweep["msg_31_data_header"] = block_header
                    _msg_31_data_header.append(sweep)
                sweep_msg_31_header.append(msg_31_header)
            else:
                sweep_intermediate_records.append(msg_header)

        return data_header, _msg_31_header, _msg_31_data_header

    def _check_record(self):
        """Checks record for correct size.

        Returns
        -------
        chk : bool
            False, if record is truncated.
        """
        chk = self._rh.record.shape[0] == self.record_size
        if not chk:
            raise EOFError(f"Unexpected file end detected at record {self.rh.recnum}")
        return chk


# Figure 1 in Interface Control Document for the Archive II/User
# page 7-2
VOLUME_HEADER = OrderedDict(
    [
        ("tape", {"fmt": "9s"}),
        ("extension", {"fmt": "3s"}),
        ("date", UINT4),
        ("time", UINT4),
        ("icao", {"fmt": "4s"}),
    ]
)

# Table II Message Header Data
# page 3-7
MSG_HEADER = OrderedDict(
    [
        ("size", UINT2),  # size of data, no including header
        ("channels", UINT1),
        ("type", UINT1),
        ("seq_id", UINT2),
        ("date", UINT2),
        ("ms", UINT4),
        ("segments", UINT2),
        ("seg_num", UINT2),
    ]
)
LEN_MSG_HEADER = struct.calcsize(_get_fmt_string(MSG_HEADER, byte_order=">"))

# Table XVII Digital Radar Generic Format Blocks (Message Type 31)
# pages 3-87 to 3-89
MSG_31 = OrderedDict(
    [
        ("id", {"fmt": "4s"}),  # 0-3
        ("collect_ms", UINT4),  # 4-7
        ("collect_date", UINT2),  # 8-9
        ("azimuth_number", UINT2),  # 10-11
        ("azimuth_angle", FLT4),  # 12-15
        ("compress_flag", CODE1),  # 16
        ("spare_0", UINT1),  # 17
        ("radial_length", UINT2),  # 18-19
        ("azimuth_resolution", CODE1),  # 20
        ("radial_status", CODE1),  # 21
        ("elevation_number", UINT1),  # 22
        ("cut_sector", UINT1),  # 23
        ("elevation_angle", FLT4),  # 24-27
        ("radial_blanking", CODE1),  # 28
        ("azimuth_mode", SINT1),  # 29
        ("block_count", UINT2),  # 30-31
        ("block_pointer_1", UINT4),  # 32-35  Volume Data Constant XVII-E
        ("block_pointer_2", UINT4),  # 36-39  Elevation Data Constant XVII-F
        ("block_pointer_3", UINT4),  # 40-43  Radial Data Constant XVII-H
        ("block_pointer_4", UINT4),  # 44-47  Moment "REF" XVII-{B/I}
        ("block_pointer_5", UINT4),  # 48-51  Moment "VEL"
        ("block_pointer_6", UINT4),  # 52-55  Moment "SW"
        ("block_pointer_7", UINT4),  # 56-59  Moment "ZDR"
        ("block_pointer_8", UINT4),  # 60-63  Moment "PHI"
        ("block_pointer_9", UINT4),  # 64-67  Moment "RHO"
        ("block_pointer_10", UINT4),  # Moment "CFP"
    ]
)
LEN_MSG_31 = struct.calcsize(_get_fmt_string(MSG_31, byte_order=">"))


# Table III Digital Radar Data (Message Type 1)
# pages 3-7 to
MSG_1 = OrderedDict(
    [
        ("collect_ms", UINT4),  # 0-3
        ("collect_date", UINT2),  # 4-5
        ("unambig_range", SINT2),  # 6-7
        ("azimuth_angle", CODE2),  # 8-9
        ("azimuth_number", UINT2),  # 10-11
        ("radial_status", CODE2),  # 12-13
        ("elevation_angle", UINT2),  # 14-15
        ("elevation_number", UINT2),  # 16-17
        ("sur_range_first", CODE2),  # 18-19
        ("doppler_range_first", CODE2),  # 20-21
        ("sur_range_step", CODE2),  # 22-23
        ("doppler_range_step", CODE2),  # 24-25
        ("sur_nbins", UINT2),  # 26-27
        ("doppler_nbins", UINT2),  # 28-29
        ("cut_sector_num", UINT2),  # 30-31
        ("calib_const", FLT4),  # 32-35
        ("sur_pointer", UINT2),  # 36-37
        ("vel_pointer", UINT2),  # 38-39
        ("width_pointer", UINT2),  # 40-41
        ("doppler_resolution", CODE2),  # 42-43
        ("vcp", UINT2),  # 44-45
        ("spare_1", {"fmt": "8s"}),  # 46-53
        ("spare_2", {"fmt": "2s"}),  # 54-55
        ("spare_3", {"fmt": "2s"}),  # 56-57
        ("spare_4", {"fmt": "2s"}),  # 58-59
        ("nyquist_vel", SINT2),  # 60-61
        ("atmos_attenuation", SINT2),  # 62-63
        ("threshold", SINT2),  # 64-65
        ("spot_blank_status", UINT2),  # 66-67
        ("spare_5", {"fmt": "32s"}),  # 68-99
        # 100+  reflectivity, velocity and/or spectral width data, CODE1
    ]
)
LEN_MSG_1 = struct.calcsize(_get_fmt_string(MSG_1, byte_order=">"))

# Table XI Volume Coverage Pattern Data (Message Type 5 & 7)
# pages 3-51 to 3-54
MSG_5 = OrderedDict(
    [
        ("message_size", UINT2),
        ("pattern_type", CODE2),
        ("pattern_number", UINT2),
        ("number_elevation_cuts", UINT2),
        ("clutter_map_group_number", UINT2),
        ("doppler_velocity_resolution", CODE1),  # 2: 0.5 degrees, 4: 1.0 degrees
        ("pulse_width", CODE1),  # 2: short, 4: long
        ("spare", {"fmt": "10s"}),  # halfwords 7-11 (10 bytes, 5 halfwords)
    ]
)
LEN_MSG_5 = struct.calcsize(_get_fmt_string(MSG_5, byte_order=">"))

MSG_5_ELEV = OrderedDict(
    [
        ("elevation_angle", BIN2),  # scaled by 360/65536 for value in degrees.
        ("channel_config", CODE1),
        ("waveform_type", CODE1),
        ("super_resolution", CODE1),
        ("prf_number", UINT1),
        ("prf_pulse_count", UINT2),
        ("azimuth_rate", CODE2),
        ("ref_thresh", SINT2),
        ("vel_thresh", SINT2),
        ("sw_thresh", SINT2),
        ("zdr_thres", SINT2),
        ("phi_thres", SINT2),
        ("rho_thres", SINT2),
        ("edge_angle_1", CODE2),
        ("dop_prf_num_1", UINT2),
        ("dop_prf_pulse_count_1", UINT2),
        ("spare_1", {"fmt": "2s"}),
        ("edge_angle_2", CODE2),
        ("dop_prf_num_2", UINT2),
        ("dop_prf_pulse_count_2", UINT2),
        ("spare_2", {"fmt": "2s"}),
        ("edge_angle_3", CODE2),
        ("dop_prf_num_3", UINT2),
        ("dop_prf_pulse_count_3", UINT2),
        ("spare_3", {"fmt": "2s"}),
    ]
)
LEN_MSG_5_ELEV = struct.calcsize(_get_fmt_string(MSG_5_ELEV, byte_order=">"))

MSG_18 = OrderedDict(
    [
        ("adap_file_name", {"fmt": "12s"}),
        ("adap_format", {"fmt": "4s"}),
        ("adap_revision", {"fmt": "4s"}),
        ("adap_date", {"fmt": "12s"}),
        ("adap_time", {"fmt": "12s"}),
        ("k1", FLT4),
        ("az_lat", FLT4),
        ("k3", FLT4),
        ("el_lat", FLT4),
        ("park_az", FLT4),
        ("park_el", FLT4),
        ("a_fuel_conv0", FLT4),
        ("a_fuel_conv1", FLT4),
        ("a_fuel_conv2", FLT4),
        ("a_fuel_conv3", FLT4),
        ("a_fuel_conv4", FLT4),
        ("a_fuel_conv5", FLT4),
        ("a_fuel_conv6", FLT4),
        ("a_fuel_conv7", FLT4),
        ("a_fuel_conv8", FLT4),
        ("a_fuel_conv9", FLT4),
        ("a_fuel_conv10", FLT4),
        ("a_min_shelter_temp", FLT4),
        ("a_max_shelter_temp", FLT4),
        ("a_min_shelter_ac_temp_diff", FLT4),
        ("a_max_xmtr_air_temp", FLT4),
        ("a_max_rad_temp", FLT4),
        ("a_max_rad_temp_rise", FLT4),
        ("ped_28v_reg_lim", FLT4),
        ("ped_5v_reg_lim", FLT4),
        ("ped_15v_reg_lim", FLT4),
        ("a_min_gen_room_temp", FLT4),
        ("a_max_gen_room_temp", FLT4),
        ("dau_5v_reg_lim", FLT4),
        ("dau_15v_reg_lim", FLT4),
        ("dau_28v_reg_lim", FLT4),
        ("en_5v_reg_lim", FLT4),
        ("en_5v_nom_volts", FLT4),
        ("rpg_co_located", {"fmt": "4s"}),
        ("spec_filter_installed", {"fmt": "4s"}),
        ("tps_installed", {"fmt": "4s"}),
        ("rms_installed", {"fmt": "4s"}),
        ("a_hvdl_tst_int", UINT4),
        ("a_rpg_lt_int", UINT4),
        ("a_min_stab_util_pwr_time", UINT4),
        ("a_gen_auto_exer_interval", UINT4),
        ("a_util_pwr_sw_req_interval", UINT4),
        ("a_low_fuel_level", FLT4),
        ("config_chan_number", UINT4),
        ("a_rpg_link_type", UINT4),
        ("redundant_chan_config", UINT4),
    ]
)
for i in np.arange(104):
    MSG_18[f"atten_table_{i:03d}"] = FLT4
MSG_18.update(
    [
        ("spare_01", {"fmt": "24s"}),
        ("path_losses_07", FLT4),
        ("spare_02", {"fmt": "12s"}),
        ("path_losses_11", FLT4),
        ("spare_03", {"fmt": "4s"}),
        ("path_losses_13", FLT4),
        ("path_losses_14", FLT4),
        ("spare_04", {"fmt": "16s"}),
        ("path_losses_19", FLT4),
        ("spare_05", {"fmt": "32s"}),
    ]
)
for i in np.arange(28, 48):
    MSG_18[f"path_losses_{i:02d}"] = FLT4
MSG_18.update(
    [
        ("h_coupler_cw_loss", FLT4),
        ("v_coupler_xmt_loss", FLT4),
        ("path_losses_50", FLT4),
        ("spare_06", {"fmt": "4s"}),
        ("path_losses_52", FLT4),
        ("v_coupler_cw_loss", FLT4),
        ("path_losses_54", FLT4),
        ("spare_07", {"fmt": "12s"}),
        ("path_losses_58", FLT4),
        ("path_losses_59", FLT4),
        ("path_losses_60", FLT4),
        ("path_losses_61", FLT4),
        ("spare_08", {"fmt": "4s"}),
        ("path_losses_63", FLT4),
        ("path_losses_64", FLT4),
        ("path_losses_65", FLT4),
        ("path_losses_66", FLT4),
        ("path_losses_67", FLT4),
        ("path_losses_68", FLT4),
        ("path_losses_69", FLT4),
        ("chan_cal_diff", FLT4),
        ("spare_09", {"fmt": "4s"}),
        ("log_amp_factor_1", FLT4),
        ("log_amp_factor_2", FLT4),
        ("v_ts_cw", FLT4),
    ]
)
for i in np.arange(13):
    MSG_18[f"rnscale_{i:02d}"] = FLT4
for i in np.arange(13):
    MSG_18[f"atmos_{i:02d}"] = FLT4
for i in np.arange(12):
    MSG_18[f"el_index_{i:02d}"] = FLT4
MSG_18.update(
    [
        ("tfreq_mhz", UINT4),
        ("base_data_tcn", FLT4),
        ("refl_data_tover", FLT4),
        ("tar_h_dbz0_lp", FLT4),
        ("tar_v_dbz0_lp", FLT4),
        ("init_phi_dp", UINT4),
        ("norm_init_phi_dp", UINT4),
        ("lx_lp", FLT4),
        ("lx_sp", FLT4),
        ("meteor_param", FLT4),
        ("beamwidth", FLT4),
        ("antenna_gain", FLT4),
        ("spare_10", {"fmt": "4s"}),
        ("vel_maint_limit", FLT4),
        ("wth_maint_limit", FLT4),
        ("vel_degrad_limit", FLT4),
        ("wth_degrad_limit", FLT4),
        ("h_noisetemp_degrad_limit", FLT4),
        ("h_noisetemp_maint_limit", FLT4),
        ("v_noisetemp_degrad_limit", FLT4),
        ("v_noisetemp_maint_limit", FLT4),
        ("kly_degrade_limit", FLT4),
        ("ts_coho", FLT4),
        ("h_ts_cw", FLT4),
        ("ts_rf_sp", FLT4),
        ("ts_rf_lp", FLT4),
        ("ts_stalo", FLT4),
        ("ame_h_noise_enr", FLT4),
        ("xmtr_peak_pwr_high_limit", FLT4),
        ("xmtr_peak_pwr_low_limit", FLT4),
        ("h_dbz0_delta_limit", FLT4),
        ("threshold1", FLT4),
        ("threshold2", FLT4),
        ("clut_supp_dgrad_lim", FLT4),
        ("clut_supp_maint_lim", FLT4),
        ("range0_value", FLT4),
        ("xmtr_pwr_mtr_scale", FLT4),
        ("v_dbz0_delta_limit", FLT4),
        ("tar_h_dbz0_sp", FLT4),
        ("tar_v_dbz0_sp", FLT4),
        ("deltaprf", UINT4),
        ("spare_11", {"fmt": "8s"}),
        ("tau_sp", UINT4),
        ("tau_lp", UINT4),
        ("nc_dead_value", UINT4),
        ("tau_rf_sp", UINT4),
        ("tau_rf_lp", UINT4),
        ("seg1lim", FLT4),
        ("slatsec", FLT4),
        ("slonsec", FLT4),
        ("spare_12", {"fmt": "4s"}),
        ("slatdeg", UINT4),
        ("slatmin", UINT4),
        ("slondeg", UINT4),
        ("slonmin", UINT4),
        ("slatdir", {"fmt": "4s"}),
        ("slondir", {"fmt": "4s"}),
        ("spare_13", {"fmt": "4s"}),
        ("vcpat_11", {"fmt": "1172s"}),
        ("vcpat_21", {"fmt": "1172s"}),
        ("vcpat_31", {"fmt": "1172s"}),
        ("vcpat_32", {"fmt": "1172s"}),
        ("vcpat_300", {"fmt": "1172s"}),
        ("vcpat_301", {"fmt": "1172s"}),
        ("az_correction_factor", FLT4),
        ("el_correction_factor", FLT4),
        ("site_name", {"fmt": "4s"}),
        ("ant_manual_setup_ielmin", SINT4),
        ("ant_manual_setup_ielmax", SINT4),
        ("ant_manual_setup_fazvelmax", SINT4),
        ("ant_manual_setup_felvelmax", SINT4),
        ("ant_manual_setup_ignd_hgt", SINT4),
        ("ant_manual_setup_irad_hgt", SINT4),
        ("spare_14", {"fmt": "300s"}),
        ("rvp8nv_iwaveguide_lenght", UINT4),
        ("spare_15", {"fmt": "44s"}),
        ("vel_data_tover", FLT4),
        ("width_data_tover", FLT4),
        ("spare_16", {"fmt": "12s"}),
        ("doppler_range_start", FLT4),
        ("max_el_index", UINT4),
        ("seg2lim", FLT4),
        ("seg3lim", FLT4),
        ("seg4lim", FLT4),
        ("nbr_el_segments", UINT4),
        ("h_noise_long", FLT4),
        ("ant_noise_temp", FLT4),
        ("h_noise_short", FLT4),
        ("h_noise_tolerance", FLT4),
        ("min_h_dyn_range", FLT4),
        ("gen_installed", {"fmt": "4s"}),
        ("gen_exercise", {"fmt": "4s"}),
        ("v_noise_tolerance", FLT4),
        ("min_v_dyn_range", FLT4),
        ("zdr_bias_dgrad_lim", FLT4),
        ("spare_17", {"fmt": "16s"}),
        ("v_noise_long", FLT4),
        ("v_noise_short", FLT4),
        ("zdr_data_tover", FLT4),
        ("phi_data_tover", FLT4),
        ("rho_data_tover", FLT4),
        ("stalo_power_dgrad_limit", FLT4),
        ("stalo_power_maint_limit", FLT4),
        ("min_h_pwr_sense", FLT4),
        ("min_v_pwr_sense", FLT4),
        ("h_pwr_sense_offset", FLT4),
        ("v_pwr_sense_offset", FLT4),
        ("ps_gain_ref", FLT4),
        ("rf_pallet_broad_loss", FLT4),
        ("zdr_check_threshold", FLT4),
        ("phi_check_threshold", FLT4),
        ("rho_check_threshold", FLT4),
        ("spare_18", {"fmt": "52s"}),
        ("ame_ps_tolerance", FLT4),
        ("ame_max_temp", FLT4),
        ("ame_min_temp", FLT4),
        ("rcvr_mod_max_temp", FLT4),
        ("rcvr_mod_min_temp", FLT4),
        ("bite_mod_max_temp", FLT4),
        ("bite_mod_min_temp", FLT4),
        ("default_polarization", UINT4),
        ("tr_limit_dgrad_limit", FLT4),
        ("tr_limit_fail_limit", FLT4),
        ("spare_19", {"fmt": "8s"}),
        ("ame_current_tolerance", FLT4),
        ("h_only_polarization", UINT4),
        ("v_only_polarization", UINT4),
        ("spare_20", {"fmt": "8s"}),
        ("reflector_bias", FLT4),
        ("a_min_shelter_temp_warn", FLT4),
        ("spare_21", {"fmt": "432s"}),
    ]
)
LEN_MSG_18 = struct.calcsize(_get_fmt_string(MSG_18, byte_order=">"))

DATA_BLOCK_HEADER = OrderedDict(
    [("block_type", string_dict(1)), ("data_name", string_dict(3))]
)
LEN_DATA_BLOCK_HEADER = struct.calcsize(
    _get_fmt_string(DATA_BLOCK_HEADER, byte_order=">")
)


# Table XVII-B Data Block (Descriptor of Generic Data Moment Type)
# pages 3-90 and 3-91
GENERIC_DATA_BLOCK = OrderedDict(
    [
        ("reserved", UINT4),
        ("ngates", UINT2),
        ("first_gate", SINT2),
        ("gate_spacing", SINT2),
        ("thresh", SINT2),
        ("snr_thres", SINT2),
        ("flags", CODE1),
        ("word_size", UINT1),
        ("scale", FLT4),
        ("offset", FLT4),
        # then data
    ]
)
LEN_GENERIC_DATA_BLOCK = struct.calcsize(
    _get_fmt_string(GENERIC_DATA_BLOCK, byte_order=">")
)

# Table XVII-E Data Block (Volume Data Constant Type)
# page 3-92
VOLUME_DATA_BLOCK = OrderedDict(
    [
        ("lrtup", UINT2),
        ("version_major", UINT1),
        ("version_minor", UINT1),
        ("lat", FLT4),
        ("lon", FLT4),
        ("height", SINT2),
        ("feedhorn_height", UINT2),
        ("refl_calib", FLT4),
        ("power_h", FLT4),
        ("power_v", FLT4),
        ("diff_refl_calib", FLT4),
        ("init_phase", FLT4),
        ("vcp", UINT2),
        ("spare", {"fmt": "2s"}),
    ]
)
LEN_VOLUME_DATA_BLOCK = struct.calcsize(
    _get_fmt_string(VOLUME_DATA_BLOCK, byte_order=">")
)

# Table XVII-F Data Block (Elevation Data Constant Type)
# page 3-93
ELEVATION_DATA_BLOCK = OrderedDict(
    [
        ("lrtup", UINT2),
        ("atmos", SINT2),
        ("refl_calib", FLT4),
    ]
)
LEN_ELEVATION_DATA_BLOCK = struct.calcsize(
    _get_fmt_string(ELEVATION_DATA_BLOCK, byte_order=">")
)

# Table XVII-H Data Block (Radial Data Constant Type)
# pages 3-93
RADIAL_DATA_BLOCK = OrderedDict(
    [
        ("lrtup", UINT2),
        ("unambig_range", SINT2),
        ("noise_h", FLT4),
        ("noise_v", FLT4),
        ("nyquist_vel", SINT2),
        ("spare", {"fmt": "2s"}),
    ]
)
LEN_RADIAL_DATA_BLOCK = struct.calcsize(
    _get_fmt_string(RADIAL_DATA_BLOCK, byte_order=">")
)

DATA_BLOCK_CONSTANT_IDENTIFIERS = OrderedDict(
    [
        ("VOL", VOLUME_DATA_BLOCK),
        ("ELV", ELEVATION_DATA_BLOCK),
        ("RAD", RADIAL_DATA_BLOCK),
    ]
)

DATA_BLOCK_VARIABLE_IDENTIFIERS = OrderedDict(
    [
        ("REF", GENERIC_DATA_BLOCK),
        ("VEL", GENERIC_DATA_BLOCK),
        ("SW ", GENERIC_DATA_BLOCK),
        ("ZDR", GENERIC_DATA_BLOCK),
        ("PHI", GENERIC_DATA_BLOCK),
        ("RHO", GENERIC_DATA_BLOCK),
        ("CFP", GENERIC_DATA_BLOCK),
    ]
)

DATA_BLOCK_TYPE_IDENTIFIER = OrderedDict(
    [
        ("R", DATA_BLOCK_CONSTANT_IDENTIFIERS),
        ("D", DATA_BLOCK_VARIABLE_IDENTIFIERS),
    ]
)


class NexradLevel2ArrayWrapper(BackendArray):
    """Wraps array of NexradLevel2 data."""

    def __init__(self, datastore, name, var):
        self.datastore = datastore
        self.group = datastore._group
        self.name = name
        # get rays and bins
        nrays = (
            datastore.ds["record_end"]
            - datastore.ds["record_number"]
            + 1
            - len(datastore.ds["intermediate_records"])
        )
        nbins = max([v["ngates"] for k, v in datastore.ds["sweep_data"].items()])
        word_size = datastore.ds["sweep_data"][name]["word_size"]
        width = {8: 1, 16: 2}[word_size]
        self.dtype = np.dtype(f">u{width}")
        self.shape = (nrays, nbins)

    def _getitem(self, key):
        # read the data if not available
        try:
            data = self.datastore.ds["sweep_data"][self.name]["data"]
        except KeyError:
            self.datastore.root.get_data(self.group, self.name)
            data = self.datastore.ds["sweep_data"][self.name]["data"]
        # see 3.2.4.17.6 Table XVII-I Data Moment Characteristics and Conversion for Data Names
        word_size = self.datastore.ds["sweep_data"][self.name]["word_size"]
        if self.name == "PHI" and word_size == 16:
            # 10 bit mask, but only for 2 byte data
            x = np.uint16(0x3FF)
        elif self.name == "ZDR" and word_size == 16:
            # 11 bit mask, but only for 2 byte data
            x = np.uint16(0x7FF)
        else:
            x = np.uint8(0xFF)
        if len(data[0]) < self.shape[1]:
            return np.pad(
                np.vstack(data) & x,
                ((0, 0), (0, self.shape[1] - len(data[0]))),
                mode="constant",
                constant_values=0,
            )[key]
        else:
            return (np.vstack(data) & x)[key]

    def __getitem__(self, key):
        return indexing.explicit_indexing_adapter(
            key,
            self.shape,
            indexing.IndexingSupport.BASIC,
            self._getitem,
        )


class NexradLevel2Store(AbstractDataStore):
    def __init__(self, manager, group=None):
        self._manager = manager
        self._group = int(group[6:])
        self._filename = self.filename

    @classmethod
    def open(cls, filename, mode="r", group=None, **kwargs):
        manager = CachingFileManager(
            NEXRADLevel2File, filename, mode=mode, kwargs=kwargs
        )
        return cls(manager, group=group)

    @property
    def filename(self):
        with self._manager.acquire_context(False) as root:
            return root.filename

    @property
    def root(self):
        with self._manager.acquire_context(False) as root:
            return root

    def _acquire(self, needs_lock=True):
        with self._manager.acquire_context(needs_lock) as root:
            root.get_sweep(self._group)
            ds = root.data[self._group]
        return ds

    @property
    def ds(self):
        return self._acquire()

    def open_store_variable(self, name, var):
        dim = "azimuth"

        data = indexing.LazilyOuterIndexedArray(
            NexradLevel2ArrayWrapper(self, name, var)
        )
        encoding = {"group": self._group, "source": self._filename}

        mname = nexrad_mapping.get(name, name)
        mapping = sweep_vars_mapping.get(mname, {})
        attrs = {key: mapping[key] for key in moment_attrs if key in mapping}
        attrs["scale_factor"] = 1.0 / var["scale"]
        attrs["add_offset"] = -var["offset"] / var["scale"]
        attrs["coordinates"] = (
            "elevation azimuth range latitude longitude altitude time"
        )
        return mname, Variable((dim, "range"), data, attrs, encoding)

    def open_store_coordinates(self):
        msg_31_header = self.root.msg_31_header[self._group]

        # azimuth/elevation
        azimuth = np.array([ms["azimuth_angle"] for ms in msg_31_header])
        elevation = np.array([ms["elevation_angle"] for ms in msg_31_header])

        # time
        # date is 1-based (1 -> 1970-01-01T00:00:00), so we need to subtract 1
        date = (np.array([ms["collect_date"] for ms in msg_31_header]) - 1) * 86400e3
        milliseconds = np.array([ms["collect_ms"] for ms in msg_31_header])
        rtime = date + milliseconds
        time_prefix = "milli"
        rtime_attrs = get_time_attrs(date_unit=f"{time_prefix}seconds")

        # site coords
        vol = self.ds["sweep_constant_data"]["VOL"]
        lon, lat, alt = vol["lon"], vol["lat"], vol["height"] + vol["feedhorn_height"]

        # range
        sweep_data = list(self.ds["sweep_data"].values())[0]
        first_gate = sweep_data["first_gate"]
        gate_spacing = sweep_data["gate_spacing"]
        ngates = max([v["ngates"] for k, v in self.ds["sweep_data"].items()])
        last_gate = first_gate + gate_spacing * (ngates - 0.5)
        rng = np.arange(first_gate, last_gate, gate_spacing, "float32")
        range_attrs = get_range_attrs(rng)

        encoding = {"group": self._group}
        dim = "azimuth"
        sweep_mode = "azimuth_surveillance"
        sweep_number = self._group
        prt_mode = "not_set"
        follow_mode = "not_set"

        fixed_angle = self.root.msg_5["elevation_data"][self._group]["elevation_angle"]

        coords = {
            "azimuth": Variable((dim,), azimuth, get_azimuth_attrs(), encoding),
            "elevation": Variable((dim,), elevation, get_elevation_attrs(), encoding),
            "time": Variable((dim,), rtime, rtime_attrs, encoding),
            "range": Variable(("range",), rng, range_attrs),
            "longitude": Variable((), lon, get_longitude_attrs()),
            "latitude": Variable((), lat, get_latitude_attrs()),
            "altitude": Variable((), alt, get_altitude_attrs()),
            "sweep_mode": Variable((), sweep_mode),
            "sweep_number": Variable((), sweep_number),
            "prt_mode": Variable((), prt_mode),
            "follow_mode": Variable((), follow_mode),
            "sweep_fixed_angle": Variable((), fixed_angle),
        }

        return coords

    def get_variables(self):
        return FrozenDict(
            (k1, v1)
            for k1, v1 in {
                **dict(
                    self.open_store_variable(k, v)
                    for k, v in self.ds["sweep_data"].items()
                ),
                **self.open_store_coordinates(),
            }.items()
        )

    def get_attrs(self):
        attributes = [("instrument_name", self.root.volume_header["icao"].decode())]

        return FrozenDict(attributes)


[docs] class NexradLevel2BackendEntrypoint(BackendEntrypoint): """ Xarray BackendEntrypoint for NEXRAD Level2 Data """ description = "Open NEXRAD Level2 files in Xarray" url = "tbd" def open_dataset( self, filename_or_obj, *, mask_and_scale=True, decode_times=True, concat_characters=True, decode_coords=True, drop_variables=None, use_cftime=None, decode_timedelta=None, group=None, first_dim="auto", reindex_angle=False, fix_second_angle=False, site_coords=True, optional=True, ): store = NexradLevel2Store.open( filename_or_obj, group=group, loaddata=False, ) store_entrypoint = StoreBackendEntrypoint() ds = store_entrypoint.open_dataset( store, mask_and_scale=mask_and_scale, decode_times=decode_times, concat_characters=concat_characters, decode_coords=decode_coords, drop_variables=drop_variables, use_cftime=use_cftime, decode_timedelta=decode_timedelta, ) # reassign azimuth/elevation/time coordinates ds = ds.assign_coords({"azimuth": ds.azimuth}) ds = ds.assign_coords({"elevation": ds.elevation}) ds = ds.assign_coords({"time": ds.time}) ds.encoding["engine"] = "nexradlevel2" # handle duplicates and reindex if decode_coords and reindex_angle is not False: ds = ds.pipe(util.remove_duplicate_rays) ds = ds.pipe(util.reindex_angle, **reindex_angle) ds = ds.pipe(util.ipol_time, **reindex_angle) # handling first dimension dim0 = "elevation" if ds.sweep_mode.load() == "rhi" else "azimuth" # todo: could be optimized if first_dim == "time": ds = ds.swap_dims({dim0: "time"}) ds = ds.sortby("time") else: ds = ds.sortby(dim0) # assign geo-coords if site_coords: ds = ds.assign_coords( { "latitude": ds.latitude, "longitude": ds.longitude, "altitude": ds.altitude, } ) return ds
[docs] def open_nexradlevel2_datatree(filename_or_obj, **kwargs): """Open NEXRAD Level2 dataset as :py:class:`xarray.DataTree`. Parameters ---------- filename_or_obj : str, Path, file-like or DataStore Strings and Path objects are interpreted as a path to a local or remote radar file Keyword Arguments ----------------- sweep : int, list of int, optional Sweep number(s) to extract, default to first sweep. If None, all sweeps are extracted into a list. first_dim : str Can be ``time`` or ``auto`` first dimension. If set to ``auto``, first dimension will be either ``azimuth`` or ``elevation`` depending on type of sweep. Defaults to ``auto``. reindex_angle : bool or dict Defaults to False, no reindexing. Given dict should contain the kwargs to reindex_angle. Only invoked if `decode_coord=True`. fix_second_angle : bool If True, fixes erroneous second angle data. Defaults to ``False``. site_coords : bool Attach radar site-coordinates to Dataset, defaults to ``True``. kwargs : dict Additional kwargs are fed to :py:func:`xarray.open_dataset`. Returns ------- dtree: xarray.DataTree DataTree """ # handle kwargs, extract first_dim backend_kwargs = kwargs.pop("backend_kwargs", {}) # first_dim = backend_kwargs.pop("first_dim", None) sweep = kwargs.pop("sweep", None) sweeps = [] kwargs["backend_kwargs"] = backend_kwargs if isinstance(sweep, str): sweeps = [sweep] elif isinstance(sweep, int): sweeps = [f"sweep_{sweep}"] elif isinstance(sweep, list): if isinstance(sweep[0], int): sweeps = [f"sweep_{i}" for i in sweep] else: sweeps.extend(sweep) else: with NEXRADLevel2File(filename_or_obj, loaddata=False) as nex: nsweeps = nex.msg_5["number_elevation_cuts"] sweeps = [f"sweep_{i}" for i in range(nsweeps)] engine = NexradLevel2BackendEntrypoint # todo: only open once! Needs new xarray built in datatree! ls_ds: list[xr.Dataset] = [] for swp in sweeps: try: dsx = xr.open_dataset(filename_or_obj, group=swp, engine=engine, **kwargs) except IndexError: break else: ls_ds.append(dsx) ls_ds.insert(0, xr.Dataset()) dtree: dict = { "/": _assign_root(ls_ds), "/radar_parameters": _get_subgroup(ls_ds, radar_parameters_subgroup), "/georeferencing_correction": _get_subgroup( ls_ds, georeferencing_correction_subgroup ), "/radar_calibration": _get_radar_calibration(ls_ds, radar_calibration_subgroup), } dtree = _attach_sweep_groups(dtree, ls_ds[1:]) return DataTree.from_dict(dtree)