#!/usr/bin/env python
# Copyright (c) 2024, openradar developers.
# Distributed under the MIT License. See LICENSE for more info.
"""
NEXRAD Level2 Data I/O
^^^^^^^^^^^^^^^^^^^^^^
Reads data from NEXRAD Level2 data format
See https://www.roc.noaa.gov/WSR88D/BuildInfo/Files.aspx
Documents:
- ICD 2620002 M ICD FOR RDA/RPG - Build RDA 11.5/RPG 13.0 (PDF)
- ICD 2620010 E ICD FOR ARCHIVE II/USER - Build 12.0 (PDF)
To read from NEXRAD Level2 files :class:`numpy:numpy.memmap` is used for
uncompressed files (pre 2016-06-01) and :class:`bz2:BZ2Decompressor` for BZ2
compressed data. The NEXRAD header (`VOLUME_HEADER`, `MSG_HEADER`) are read in
any case into dedicated OrderedDict's. Reading sweep data can be skipped by
setting `loaddata=False`. By default, the data is decoded on the fly.
Using `rawdata=True` the data will be kept undecoded.
Code adapted from Py-ART.
.. autosummary::
:nosignatures:
:toctree: generated/
{}
"""
__all__ = [
"NexradLevel2BackendEntrypoint",
"open_nexradlevel2_datatree",
]
__doc__ = __doc__.format("\n ".join(__all__))
import bz2
import struct
from collections import OrderedDict
import numpy as np
import xarray as xr
from xarray import DataTree
from xarray.backends.common import AbstractDataStore, BackendArray, BackendEntrypoint
from xarray.backends.file_manager import CachingFileManager
from xarray.backends.store import StoreBackendEntrypoint
from xarray.core import indexing
from xarray.core.utils import FrozenDict
from xarray.core.variable import Variable
from xradar import util
from xradar.io.backends.common import (
_assign_root,
_attach_sweep_groups,
_get_radar_calibration,
_get_subgroup,
)
from xradar.model import (
georeferencing_correction_subgroup,
get_altitude_attrs,
get_azimuth_attrs,
get_elevation_attrs,
get_latitude_attrs,
get_longitude_attrs,
get_range_attrs,
get_time_attrs,
moment_attrs,
radar_calibration_subgroup,
radar_parameters_subgroup,
sweep_vars_mapping,
)
from .iris import (
BIN2,
IrisRecord,
_get_fmt_string,
_unpack_dictionary,
string_dict,
)
#: mapping from NEXRAD names to CfRadial2/ODIM
nexrad_mapping = {
"REF": "DBZH",
"VEL": "VRADH",
"SW ": "WRADH",
"ZDR": "ZDR",
"PHI": "PHIDP",
"RHO": "RHOHV",
"CFP": "CCORH",
}
# NEXRAD Level II file structures and sizes
# The deails on these structures are documented in:
# "Interface Control Document for the Achive II/User" RPG Build 12.0
# Document Number 2620010E
# and
# "Interface Control Document for the RDA/RPG" Open Build 13.0
# Document Number 2620002M
# Tables and page number refer to those in the second document unless
# otherwise noted.
RECORD_BYTES = 2432
COMPRESSION_RECORD_SIZE = 12
CONTROL_WORD_SIZE = 4
# format of structure elements
# section 3.2.1, page 3-2
UINT1 = {"fmt": "B", "dtype": "unit8"}
UINT2 = {"fmt": "H", "dtype": "uint16"}
UINT4 = {"fmt": "I", "dtype": "uint32"}
SINT1 = {"fmt": "b", "dtype": "int8"}
SINT2 = {"fmt": "h", "dtype": "int16"}
SINT4 = {"fmt": "i", "dtype": "int32"}
FLT4 = {"fmt": "f", "dtype": "float32"}
CODE1 = {"fmt": "B"}
CODE2 = {"fmt": "H"}
class NEXRADFile:
"""
Class for accessing data in a NEXRAD (WSR-88D) Level II file.
NEXRAD Level II files [1]_, also know as NEXRAD Archive Level II or
WSR-88D Archive level 2, are available from the NOAA National Climate Data
Center [2]_ as well as on the UCAR THREDDS Data Server [3]_. Files with
uncompressed messages and compressed messages are supported. This class
supports reading both "message 31" and "message 1" type files.
Parameters
----------
filename : str
Filename of Archive II file to read.
References
----------
.. [1] http://www.roc.noaa.gov/WSR88D/Level_II/Level2Info.aspx
.. [2] http://www.ncdc.noaa.gov/
.. [3] http://thredds.ucar.edu/thredds/catalog.html
"""
def __init__(self, filename, mode="r", loaddata=False):
"""initalize the object."""
self._fp = None
self._filename = filename
# read in the volume header and compression_record
if hasattr(filename, "read"):
self._fh = filename
else:
self._fp = open(filename, "rb")
self._fh = np.memmap(self._fp, mode=mode)
self._filepos = 0
self._rawdata = False
self._loaddata = loaddata
self._bz2_indices = None
self.volume_header = self.get_header(VOLUME_HEADER)
return
@property
def filename(self):
return self._filename
@property
def fh(self):
return self._fh
@property
def filepos(self):
return self._filepos
@filepos.setter
def filepos(self, pos):
self._filepos = pos
def read_from_file(self, size):
"""Read from file.
Parameters
----------
size : int
Number of data words to read.
Returns
-------
data : array-like
numpy array of data
"""
start = self._filepos
self._filepos += size
return self._fh[start : self._filepos]
def get_header(self, header):
len = struct.calcsize(_get_fmt_string(header))
head = _unpack_dictionary(self.read_from_file(len), header, self._rawdata)
return head
@property
def bz2_record_indices(self):
"""Get file offsets of bz2 records."""
if self._bz2_indices is None:
# magic number inside BZ2
seq = np.array([49, 65, 89, 38, 83, 89], dtype=np.uint8)
rd = util.rolling_dim(self._fh, 6)
self._bz2_indices = np.nonzero((rd == seq).all(1))[0] - 8
return self._bz2_indices
@property
def is_compressed(self):
"""File contains bz2 compressed data."""
size = self._fh[24:28].view(dtype=">u4")[0]
return size > 0
def close(self):
if self._fp is not None:
self._fp.close()
__del__ = close
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
class NEXRADRecordFile(NEXRADFile):
"""NEXRADRecordFile Record File class"""
def __init__(self, filename, **kwargs):
super().__init__(filename=filename, **kwargs)
self._rh = None
self._rc = None
self._ldm = dict()
self._record_number = None
@property
def rh(self):
"""Returns current record object."""
return self._rh
@rh.setter
def rh(self, value):
"""Sets current record object."""
self._rh = value
@property
def record_number(self):
"""Returns current record number."""
return self._record_number
@record_number.setter
def record_number(self, value):
"""Sets current record number."""
self._record_number = value
@property
def record_size(self):
"""Returns current record number."""
return self._record_size
@record_size.setter
def record_size(self, value):
"""Sets current record number."""
self._record_size = value
def _check_record(self):
"""Checks record for correct size.
Need to be implemented in the derived classes
"""
return True
def get_end(self, buf):
if len(buf) < 16:
return 0
header = _unpack_dictionary(buf, MSG_HEADER, self._rawdata, byte_order=">")
size = header["size"] * 2 + 12
if header["type"] != 31:
size = size if size >= RECORD_BYTES else RECORD_BYTES
return size
def init_record(self, recnum):
"""Initialize record using given number."""
# map record numbers to ldm compressed records
def get_ldm(recnum):
if recnum < 134:
return 0
mod = ((recnum - 134) // 120) + 1
return mod
if self.is_compressed:
ldm = get_ldm(recnum)
# get uncompressed ldm record
if self._ldm.get(ldm, None) is None:
# otherwise extract wanted ldm compressed record
if ldm >= len(self.bz2_record_indices):
return False
start = self.bz2_record_indices[ldm]
size = self._fh[start : start + 4].view(dtype=">u4")[0]
self._fp.seek(start + 4)
dec = bz2.BZ2Decompressor()
self._ldm[ldm] = np.frombuffer(
dec.decompress(self._fp.read(size)), dtype=np.uint8
)
# rectrieve wanted record and put into self.rh
if recnum < 134:
start = recnum * RECORD_BYTES
if not self.is_compressed:
start += 24
stop = start + RECORD_BYTES
else:
if self.is_compressed:
# get index into current compressed ldm record
rnum = (recnum - 134) % 120
start = self.record_size + self.filepos if rnum else 0
buf = self._ldm[ldm][start + 12 : start + 12 + LEN_MSG_HEADER]
size = self.get_end(buf)
if not size:
return False
stop = start + size
else:
start = self.record_size + self.filepos
buf = self.fh[start + 12 : start + 12 + LEN_MSG_HEADER]
size = self.get_end(buf)
if not size:
return False
stop = start + size
self.record_number = recnum
self.record_size = stop - start
if self.is_compressed:
self.rh = IrisRecord(self._ldm[ldm][start:stop], recnum)
else:
self.rh = IrisRecord(self.fh[start:stop], recnum)
self.filepos = start
return self._check_record()
def init_record_by_filepos(self, recnum, filepos):
"""Initialize record using given record number and position."""
start = filepos
buf = self.fh[start + 12 : start + 12 + LEN_MSG_HEADER]
size = self.get_end(buf)
if not size:
return False
stop = start + size
self.record_number = recnum
self.record_size = stop - start
self.rh = IrisRecord(self.fh[start:stop], recnum)
self.filepos = start
return self._check_record()
def init_next_record(self):
"""Get next record from file.
This increases record_number count and initialises a new IrisRecord
with the calculated start and stop file offsets.
Returns
-------
chk : bool
True, if record is truncated.
"""
return self.init_record(self.record_number + 1)
def array_from_record(self, words, width, dtype):
"""Retrieve array from current record.
Parameters
----------
words : int
Number of data words to read.
width : int
Size of the data word to read in bytes.
dtype : str
dtype string specifying data format.
Returns
-------
data : array-like
numpy array of data
"""
return self.rh.read(words, width=width).view(dtype=dtype)
def bytes_from_record(self, words, width):
"""Retrieve bytes from current record.
Parameters
----------
words : int
Number of data words to read.
width : int
Size of the data word to read in bytes.
Returns
-------
data : array-like
numpy array of data
"""
return self.rh.read(words, width=width)
class NEXRADLevel2File(NEXRADRecordFile):
def __init__(self, filename, **kwargs):
super().__init__(filename, **kwargs)
# check compression
self._data_header = None
# get all metadata headers
# message 15, 13, 18, 3, 5 and 2
self._meta_header = None
# message 15
# RDA Clutter Map Data
# message 13
# RDA Clutter Filter Bypass Map
# message 18
# RDA Adaptable Parameters
# message 3
# RDA Performance/Maintenance Data
# message 5
# RDA Volume Coverage Data
self._msg_5_data = None
# message 2
# RDA Status Data
# message 31 headers
# Digital Radar Data Generic Format
self._data_header = None
self._msg_31_header = None
self._msg_31_data_header = None
self._data = OrderedDict()
@property
def data_header(self):
"""Retrieve data header."""
if self._data_header is None:
(
self._data_header,
self._msg_31_header,
self._msg_31_data_header,
) = self.get_data_header()
return self._data_header
@property
def msg_31_header(self):
"""Retrieve MSG31 Header."""
if self._msg_31_header is None:
(
self._data_header,
self._msg_31_header,
self._msg_31_data_header,
) = self.get_data_header()
return self._msg_31_header
@property
def msg_31_data_header(self):
"""Retrieve MSG31 data header."""
if self._msg_31_data_header is None:
(
self._data_header,
self._msg_31_header,
self._msg_31_data_header,
) = self.get_data_header()
return self._msg_31_data_header
@property
def meta_header(self):
"""Retrieve metadat header."""
if self._meta_header is None:
self._meta_header = self.get_metadata_header()
return self._meta_header
@property
def msg_5(self):
"""Retrieve MSG5 data."""
if self._msg_5_data is None:
self._msg_5_data = self.get_msg_5_data()
return self._msg_5_data
@property
def data(self):
"""Retrieve data."""
return self._data
def get_sweep(self, sweep_number, moments=None):
"""Retrieve sweep according sweep_number."""
if moments is None:
moments = self.msg_31_data_header[sweep_number]["msg_31_data_header"].keys()
# get selected coordinate type names
selected = [k for k in DATA_BLOCK_CONSTANT_IDENTIFIERS.keys() if k in moments]
for moment in selected:
self.get_moment(sweep_number, moment, "sweep_constant_data")
# get selected data type names
selected = [k for k in DATA_BLOCK_VARIABLE_IDENTIFIERS.keys() if k in moments]
for moment in selected:
self.get_moment(sweep_number, moment, "sweep_data")
def get_moment(self, sweep_number, moment, mtype):
"""Retrieve Moment according sweep_number and moment."""
sweep = self.data[sweep_number]
# create new sweep_data OrderedDict
if not sweep.get(mtype, False):
sweep[mtype] = OrderedDict()
# check if moment exist and return early
data = sweep[mtype].get(moment, False)
if data is not False:
return
data = OrderedDict()
data[moment] = self.msg_31_data_header[sweep_number]["msg_31_data_header"].pop(
moment
)
sweep.update(self.msg_31_data_header[sweep_number])
sweep[mtype].update(data)
def get_metadata_header(self):
"""Get metadaata header"""
# data offsets
# ICD 2620010E
# 7.3.5 Metadata Record
meta_offsets = dict(
msg_15=(0, 77),
msg_13=(77, 126),
msg_18=(126, 131),
msg_3=(131, 132),
msg_5=(132, 133),
msg_2=(133, 134),
)
meta_headers = {}
for msg, (ms, me) in meta_offsets.items():
mheader = []
for rec in np.arange(ms, me):
self.init_record(rec)
filepos = self.filepos
message_header = self.get_message_header()
# do not read zero blocks of data
if message_header["size"] == 0:
break
message_header["record_number"] = self.record_number
message_header["filepos"] = filepos
mheader.append(message_header)
meta_headers[msg] = mheader
return meta_headers
def get_msg_5_data(self):
"""Get MSG5 data."""
self.init_record(132)
self.rh.pos += LEN_MSG_HEADER + 12
# unpack msg header
msg_5 = _unpack_dictionary(
self._rh.read(LEN_MSG_5, width=1),
MSG_5,
self._rawdata,
byte_order=">",
)
msg_5["elevation_data"] = []
# unpack elevation cuts
for i in range(msg_5["number_elevation_cuts"]):
msg_5_elev = _unpack_dictionary(
self._rh.read(LEN_MSG_5_ELEV, width=1),
MSG_5_ELEV,
self._rawdata,
byte_order=">",
)
msg_5["elevation_data"].append(msg_5_elev)
return msg_5
def get_message_header(self):
"""Read and unpack message header."""
self._rh.pos += 12
return _unpack_dictionary(
self._rh.read(LEN_MSG_HEADER, width=1),
MSG_HEADER,
self._rawdata,
byte_order=">",
)
def get_data(self, sweep_number, moment=None):
"""Load sweep data from file."""
sweep = self.data[sweep_number]
start = sweep["record_number"]
stop = sweep["record_end"]
intermediate_records = [
rec["record_number"] for rec in sweep["intermediate_records"]
]
filepos = sweep["filepos"]
moments = sweep["sweep_data"]
if moment is None:
moment = moments
elif isinstance(moment, str):
moment = [moment]
for name in moment:
if self.is_compressed:
self.init_record(start)
else:
self.init_record_by_filepos(start, filepos)
ngates = moments[name]["ngates"]
word_size = moments[name]["word_size"]
data_offset = moments[name]["data_offset"]
width = {8: 1, 16: 2}[word_size]
data = []
self.rh.pos += data_offset
data.append(self._rh.read(ngates, width=width).view(f">u{width}"))
while self.init_next_record() and self.record_number <= stop:
if self.record_number in intermediate_records:
continue
self.rh.pos += data_offset
data.append(self._rh.read(ngates, width=width).view(f">u{width}"))
moments[name].update(data=data)
def get_data_header(self):
"""Load all data header from file."""
self.init_record(133)
current_sweep = -1
current_header = -1
sweep_msg_31_header = []
sweep_intermediate_records = []
sweep = OrderedDict()
data_header = []
_msg_31_header = []
_msg_31_data_header = []
while self.init_next_record():
current_header += 1
# get message headers
msg_header = self.get_message_header()
# append to data headers list
msg_header["record_number"] = self.record_number
msg_header["filepos"] = self.filepos
# keep all data headers
data_header.append(msg_header)
# only check type 31 for now
if msg_header["type"] == 31:
# get msg_31_header
msg_31_header = _unpack_dictionary(
self._rh.read(LEN_MSG_31, width=1),
MSG_31,
self._rawdata,
byte_order=">",
)
# add record_number/filepos
msg_31_header["record_number"] = self.record_number
msg_31_header["filepos"] = self.filepos
# retrieve data/const headers from msg 31
# check if this is a new sweep
status = msg_31_header["radial_status"]
if status == 1:
# 1 - intermediate radial
pass
if status in [2, 4]:
# 2 - end of elevation
# 4 - end of volume
sweep["record_end"] = self.rh.recnum
sweep["intermediate_records"] = sweep_intermediate_records
self._data[current_sweep] = sweep
_msg_31_header.append(sweep_msg_31_header)
if status in [0, 3, 5]:
# 0 - start of new elevation
# 3 - start of new volume
# 5 - start of new elevation, last elevation in VCP
current_sweep += 1
# create new sweep object
sweep = OrderedDict()
sweep_msg_31_header = []
sweep_intermediate_records = []
sweep["record_number"] = self.rh.recnum
sweep["filepos"] = self.filepos
sweep["msg_31_header"] = msg_31_header
block_pointers = [
v
for k, v in msg_31_header.items()
if k.startswith("block_pointer") and v > 0
]
block_header = {}
for i, block_pointer in enumerate(
block_pointers[: msg_31_header["block_count"]]
):
self.rh.pos = block_pointer + 12 + LEN_MSG_HEADER
buf = self._rh.read(4, width=1)
dheader = _unpack_dictionary(
buf,
DATA_BLOCK_HEADER,
self._rawdata,
byte_order=">",
)
block = DATA_BLOCK_TYPE_IDENTIFIER[dheader["block_type"]][
dheader["data_name"]
]
LEN_BLOCK = struct.calcsize(
_get_fmt_string(block, byte_order=">")
)
block_header[dheader["data_name"]] = _unpack_dictionary(
self._rh.read(LEN_BLOCK, width=1),
block,
self._rawdata,
byte_order=">",
)
if dheader["block_type"] == "D":
block_header[dheader["data_name"]][
"data_offset"
] = self.rh.pos
sweep["msg_31_data_header"] = block_header
_msg_31_data_header.append(sweep)
sweep_msg_31_header.append(msg_31_header)
else:
sweep_intermediate_records.append(msg_header)
return data_header, _msg_31_header, _msg_31_data_header
def _check_record(self):
"""Checks record for correct size.
Returns
-------
chk : bool
False, if record is truncated.
"""
chk = self._rh.record.shape[0] == self.record_size
if not chk:
raise EOFError(f"Unexpected file end detected at record {self.rh.recnum}")
return chk
# Figure 1 in Interface Control Document for the Archive II/User
# page 7-2
VOLUME_HEADER = OrderedDict(
[
("tape", {"fmt": "9s"}),
("extension", {"fmt": "3s"}),
("date", UINT4),
("time", UINT4),
("icao", {"fmt": "4s"}),
]
)
# Table II Message Header Data
# page 3-7
MSG_HEADER = OrderedDict(
[
("size", UINT2), # size of data, no including header
("channels", UINT1),
("type", UINT1),
("seq_id", UINT2),
("date", UINT2),
("ms", UINT4),
("segments", UINT2),
("seg_num", UINT2),
]
)
LEN_MSG_HEADER = struct.calcsize(_get_fmt_string(MSG_HEADER, byte_order=">"))
# Table XVII Digital Radar Generic Format Blocks (Message Type 31)
# pages 3-87 to 3-89
MSG_31 = OrderedDict(
[
("id", {"fmt": "4s"}), # 0-3
("collect_ms", UINT4), # 4-7
("collect_date", UINT2), # 8-9
("azimuth_number", UINT2), # 10-11
("azimuth_angle", FLT4), # 12-15
("compress_flag", CODE1), # 16
("spare_0", UINT1), # 17
("radial_length", UINT2), # 18-19
("azimuth_resolution", CODE1), # 20
("radial_status", CODE1), # 21
("elevation_number", UINT1), # 22
("cut_sector", UINT1), # 23
("elevation_angle", FLT4), # 24-27
("radial_blanking", CODE1), # 28
("azimuth_mode", SINT1), # 29
("block_count", UINT2), # 30-31
("block_pointer_1", UINT4), # 32-35 Volume Data Constant XVII-E
("block_pointer_2", UINT4), # 36-39 Elevation Data Constant XVII-F
("block_pointer_3", UINT4), # 40-43 Radial Data Constant XVII-H
("block_pointer_4", UINT4), # 44-47 Moment "REF" XVII-{B/I}
("block_pointer_5", UINT4), # 48-51 Moment "VEL"
("block_pointer_6", UINT4), # 52-55 Moment "SW"
("block_pointer_7", UINT4), # 56-59 Moment "ZDR"
("block_pointer_8", UINT4), # 60-63 Moment "PHI"
("block_pointer_9", UINT4), # 64-67 Moment "RHO"
("block_pointer_10", UINT4), # Moment "CFP"
]
)
LEN_MSG_31 = struct.calcsize(_get_fmt_string(MSG_31, byte_order=">"))
# Table III Digital Radar Data (Message Type 1)
# pages 3-7 to
MSG_1 = OrderedDict(
[
("collect_ms", UINT4), # 0-3
("collect_date", UINT2), # 4-5
("unambig_range", SINT2), # 6-7
("azimuth_angle", CODE2), # 8-9
("azimuth_number", UINT2), # 10-11
("radial_status", CODE2), # 12-13
("elevation_angle", UINT2), # 14-15
("elevation_number", UINT2), # 16-17
("sur_range_first", CODE2), # 18-19
("doppler_range_first", CODE2), # 20-21
("sur_range_step", CODE2), # 22-23
("doppler_range_step", CODE2), # 24-25
("sur_nbins", UINT2), # 26-27
("doppler_nbins", UINT2), # 28-29
("cut_sector_num", UINT2), # 30-31
("calib_const", FLT4), # 32-35
("sur_pointer", UINT2), # 36-37
("vel_pointer", UINT2), # 38-39
("width_pointer", UINT2), # 40-41
("doppler_resolution", CODE2), # 42-43
("vcp", UINT2), # 44-45
("spare_1", {"fmt": "8s"}), # 46-53
("spare_2", {"fmt": "2s"}), # 54-55
("spare_3", {"fmt": "2s"}), # 56-57
("spare_4", {"fmt": "2s"}), # 58-59
("nyquist_vel", SINT2), # 60-61
("atmos_attenuation", SINT2), # 62-63
("threshold", SINT2), # 64-65
("spot_blank_status", UINT2), # 66-67
("spare_5", {"fmt": "32s"}), # 68-99
# 100+ reflectivity, velocity and/or spectral width data, CODE1
]
)
LEN_MSG_1 = struct.calcsize(_get_fmt_string(MSG_1, byte_order=">"))
# Table XI Volume Coverage Pattern Data (Message Type 5 & 7)
# pages 3-51 to 3-54
MSG_5 = OrderedDict(
[
("message_size", UINT2),
("pattern_type", CODE2),
("pattern_number", UINT2),
("number_elevation_cuts", UINT2),
("clutter_map_group_number", UINT2),
("doppler_velocity_resolution", CODE1), # 2: 0.5 degrees, 4: 1.0 degrees
("pulse_width", CODE1), # 2: short, 4: long
("spare", {"fmt": "10s"}), # halfwords 7-11 (10 bytes, 5 halfwords)
]
)
LEN_MSG_5 = struct.calcsize(_get_fmt_string(MSG_5, byte_order=">"))
MSG_5_ELEV = OrderedDict(
[
("elevation_angle", BIN2), # scaled by 360/65536 for value in degrees.
("channel_config", CODE1),
("waveform_type", CODE1),
("super_resolution", CODE1),
("prf_number", UINT1),
("prf_pulse_count", UINT2),
("azimuth_rate", CODE2),
("ref_thresh", SINT2),
("vel_thresh", SINT2),
("sw_thresh", SINT2),
("zdr_thres", SINT2),
("phi_thres", SINT2),
("rho_thres", SINT2),
("edge_angle_1", CODE2),
("dop_prf_num_1", UINT2),
("dop_prf_pulse_count_1", UINT2),
("spare_1", {"fmt": "2s"}),
("edge_angle_2", CODE2),
("dop_prf_num_2", UINT2),
("dop_prf_pulse_count_2", UINT2),
("spare_2", {"fmt": "2s"}),
("edge_angle_3", CODE2),
("dop_prf_num_3", UINT2),
("dop_prf_pulse_count_3", UINT2),
("spare_3", {"fmt": "2s"}),
]
)
LEN_MSG_5_ELEV = struct.calcsize(_get_fmt_string(MSG_5_ELEV, byte_order=">"))
MSG_18 = OrderedDict(
[
("adap_file_name", {"fmt": "12s"}),
("adap_format", {"fmt": "4s"}),
("adap_revision", {"fmt": "4s"}),
("adap_date", {"fmt": "12s"}),
("adap_time", {"fmt": "12s"}),
("k1", FLT4),
("az_lat", FLT4),
("k3", FLT4),
("el_lat", FLT4),
("park_az", FLT4),
("park_el", FLT4),
("a_fuel_conv0", FLT4),
("a_fuel_conv1", FLT4),
("a_fuel_conv2", FLT4),
("a_fuel_conv3", FLT4),
("a_fuel_conv4", FLT4),
("a_fuel_conv5", FLT4),
("a_fuel_conv6", FLT4),
("a_fuel_conv7", FLT4),
("a_fuel_conv8", FLT4),
("a_fuel_conv9", FLT4),
("a_fuel_conv10", FLT4),
("a_min_shelter_temp", FLT4),
("a_max_shelter_temp", FLT4),
("a_min_shelter_ac_temp_diff", FLT4),
("a_max_xmtr_air_temp", FLT4),
("a_max_rad_temp", FLT4),
("a_max_rad_temp_rise", FLT4),
("ped_28v_reg_lim", FLT4),
("ped_5v_reg_lim", FLT4),
("ped_15v_reg_lim", FLT4),
("a_min_gen_room_temp", FLT4),
("a_max_gen_room_temp", FLT4),
("dau_5v_reg_lim", FLT4),
("dau_15v_reg_lim", FLT4),
("dau_28v_reg_lim", FLT4),
("en_5v_reg_lim", FLT4),
("en_5v_nom_volts", FLT4),
("rpg_co_located", {"fmt": "4s"}),
("spec_filter_installed", {"fmt": "4s"}),
("tps_installed", {"fmt": "4s"}),
("rms_installed", {"fmt": "4s"}),
("a_hvdl_tst_int", UINT4),
("a_rpg_lt_int", UINT4),
("a_min_stab_util_pwr_time", UINT4),
("a_gen_auto_exer_interval", UINT4),
("a_util_pwr_sw_req_interval", UINT4),
("a_low_fuel_level", FLT4),
("config_chan_number", UINT4),
("a_rpg_link_type", UINT4),
("redundant_chan_config", UINT4),
]
)
for i in np.arange(104):
MSG_18[f"atten_table_{i:03d}"] = FLT4
MSG_18.update(
[
("spare_01", {"fmt": "24s"}),
("path_losses_07", FLT4),
("spare_02", {"fmt": "12s"}),
("path_losses_11", FLT4),
("spare_03", {"fmt": "4s"}),
("path_losses_13", FLT4),
("path_losses_14", FLT4),
("spare_04", {"fmt": "16s"}),
("path_losses_19", FLT4),
("spare_05", {"fmt": "32s"}),
]
)
for i in np.arange(28, 48):
MSG_18[f"path_losses_{i:02d}"] = FLT4
MSG_18.update(
[
("h_coupler_cw_loss", FLT4),
("v_coupler_xmt_loss", FLT4),
("path_losses_50", FLT4),
("spare_06", {"fmt": "4s"}),
("path_losses_52", FLT4),
("v_coupler_cw_loss", FLT4),
("path_losses_54", FLT4),
("spare_07", {"fmt": "12s"}),
("path_losses_58", FLT4),
("path_losses_59", FLT4),
("path_losses_60", FLT4),
("path_losses_61", FLT4),
("spare_08", {"fmt": "4s"}),
("path_losses_63", FLT4),
("path_losses_64", FLT4),
("path_losses_65", FLT4),
("path_losses_66", FLT4),
("path_losses_67", FLT4),
("path_losses_68", FLT4),
("path_losses_69", FLT4),
("chan_cal_diff", FLT4),
("spare_09", {"fmt": "4s"}),
("log_amp_factor_1", FLT4),
("log_amp_factor_2", FLT4),
("v_ts_cw", FLT4),
]
)
for i in np.arange(13):
MSG_18[f"rnscale_{i:02d}"] = FLT4
for i in np.arange(13):
MSG_18[f"atmos_{i:02d}"] = FLT4
for i in np.arange(12):
MSG_18[f"el_index_{i:02d}"] = FLT4
MSG_18.update(
[
("tfreq_mhz", UINT4),
("base_data_tcn", FLT4),
("refl_data_tover", FLT4),
("tar_h_dbz0_lp", FLT4),
("tar_v_dbz0_lp", FLT4),
("init_phi_dp", UINT4),
("norm_init_phi_dp", UINT4),
("lx_lp", FLT4),
("lx_sp", FLT4),
("meteor_param", FLT4),
("beamwidth", FLT4),
("antenna_gain", FLT4),
("spare_10", {"fmt": "4s"}),
("vel_maint_limit", FLT4),
("wth_maint_limit", FLT4),
("vel_degrad_limit", FLT4),
("wth_degrad_limit", FLT4),
("h_noisetemp_degrad_limit", FLT4),
("h_noisetemp_maint_limit", FLT4),
("v_noisetemp_degrad_limit", FLT4),
("v_noisetemp_maint_limit", FLT4),
("kly_degrade_limit", FLT4),
("ts_coho", FLT4),
("h_ts_cw", FLT4),
("ts_rf_sp", FLT4),
("ts_rf_lp", FLT4),
("ts_stalo", FLT4),
("ame_h_noise_enr", FLT4),
("xmtr_peak_pwr_high_limit", FLT4),
("xmtr_peak_pwr_low_limit", FLT4),
("h_dbz0_delta_limit", FLT4),
("threshold1", FLT4),
("threshold2", FLT4),
("clut_supp_dgrad_lim", FLT4),
("clut_supp_maint_lim", FLT4),
("range0_value", FLT4),
("xmtr_pwr_mtr_scale", FLT4),
("v_dbz0_delta_limit", FLT4),
("tar_h_dbz0_sp", FLT4),
("tar_v_dbz0_sp", FLT4),
("deltaprf", UINT4),
("spare_11", {"fmt": "8s"}),
("tau_sp", UINT4),
("tau_lp", UINT4),
("nc_dead_value", UINT4),
("tau_rf_sp", UINT4),
("tau_rf_lp", UINT4),
("seg1lim", FLT4),
("slatsec", FLT4),
("slonsec", FLT4),
("spare_12", {"fmt": "4s"}),
("slatdeg", UINT4),
("slatmin", UINT4),
("slondeg", UINT4),
("slonmin", UINT4),
("slatdir", {"fmt": "4s"}),
("slondir", {"fmt": "4s"}),
("spare_13", {"fmt": "4s"}),
("vcpat_11", {"fmt": "1172s"}),
("vcpat_21", {"fmt": "1172s"}),
("vcpat_31", {"fmt": "1172s"}),
("vcpat_32", {"fmt": "1172s"}),
("vcpat_300", {"fmt": "1172s"}),
("vcpat_301", {"fmt": "1172s"}),
("az_correction_factor", FLT4),
("el_correction_factor", FLT4),
("site_name", {"fmt": "4s"}),
("ant_manual_setup_ielmin", SINT4),
("ant_manual_setup_ielmax", SINT4),
("ant_manual_setup_fazvelmax", SINT4),
("ant_manual_setup_felvelmax", SINT4),
("ant_manual_setup_ignd_hgt", SINT4),
("ant_manual_setup_irad_hgt", SINT4),
("spare_14", {"fmt": "300s"}),
("rvp8nv_iwaveguide_lenght", UINT4),
("spare_15", {"fmt": "44s"}),
("vel_data_tover", FLT4),
("width_data_tover", FLT4),
("spare_16", {"fmt": "12s"}),
("doppler_range_start", FLT4),
("max_el_index", UINT4),
("seg2lim", FLT4),
("seg3lim", FLT4),
("seg4lim", FLT4),
("nbr_el_segments", UINT4),
("h_noise_long", FLT4),
("ant_noise_temp", FLT4),
("h_noise_short", FLT4),
("h_noise_tolerance", FLT4),
("min_h_dyn_range", FLT4),
("gen_installed", {"fmt": "4s"}),
("gen_exercise", {"fmt": "4s"}),
("v_noise_tolerance", FLT4),
("min_v_dyn_range", FLT4),
("zdr_bias_dgrad_lim", FLT4),
("spare_17", {"fmt": "16s"}),
("v_noise_long", FLT4),
("v_noise_short", FLT4),
("zdr_data_tover", FLT4),
("phi_data_tover", FLT4),
("rho_data_tover", FLT4),
("stalo_power_dgrad_limit", FLT4),
("stalo_power_maint_limit", FLT4),
("min_h_pwr_sense", FLT4),
("min_v_pwr_sense", FLT4),
("h_pwr_sense_offset", FLT4),
("v_pwr_sense_offset", FLT4),
("ps_gain_ref", FLT4),
("rf_pallet_broad_loss", FLT4),
("zdr_check_threshold", FLT4),
("phi_check_threshold", FLT4),
("rho_check_threshold", FLT4),
("spare_18", {"fmt": "52s"}),
("ame_ps_tolerance", FLT4),
("ame_max_temp", FLT4),
("ame_min_temp", FLT4),
("rcvr_mod_max_temp", FLT4),
("rcvr_mod_min_temp", FLT4),
("bite_mod_max_temp", FLT4),
("bite_mod_min_temp", FLT4),
("default_polarization", UINT4),
("tr_limit_dgrad_limit", FLT4),
("tr_limit_fail_limit", FLT4),
("spare_19", {"fmt": "8s"}),
("ame_current_tolerance", FLT4),
("h_only_polarization", UINT4),
("v_only_polarization", UINT4),
("spare_20", {"fmt": "8s"}),
("reflector_bias", FLT4),
("a_min_shelter_temp_warn", FLT4),
("spare_21", {"fmt": "432s"}),
]
)
LEN_MSG_18 = struct.calcsize(_get_fmt_string(MSG_18, byte_order=">"))
DATA_BLOCK_HEADER = OrderedDict(
[("block_type", string_dict(1)), ("data_name", string_dict(3))]
)
LEN_DATA_BLOCK_HEADER = struct.calcsize(
_get_fmt_string(DATA_BLOCK_HEADER, byte_order=">")
)
# Table XVII-B Data Block (Descriptor of Generic Data Moment Type)
# pages 3-90 and 3-91
GENERIC_DATA_BLOCK = OrderedDict(
[
("reserved", UINT4),
("ngates", UINT2),
("first_gate", SINT2),
("gate_spacing", SINT2),
("thresh", SINT2),
("snr_thres", SINT2),
("flags", CODE1),
("word_size", UINT1),
("scale", FLT4),
("offset", FLT4),
# then data
]
)
LEN_GENERIC_DATA_BLOCK = struct.calcsize(
_get_fmt_string(GENERIC_DATA_BLOCK, byte_order=">")
)
# Table XVII-E Data Block (Volume Data Constant Type)
# page 3-92
VOLUME_DATA_BLOCK = OrderedDict(
[
("lrtup", UINT2),
("version_major", UINT1),
("version_minor", UINT1),
("lat", FLT4),
("lon", FLT4),
("height", SINT2),
("feedhorn_height", UINT2),
("refl_calib", FLT4),
("power_h", FLT4),
("power_v", FLT4),
("diff_refl_calib", FLT4),
("init_phase", FLT4),
("vcp", UINT2),
("spare", {"fmt": "2s"}),
]
)
LEN_VOLUME_DATA_BLOCK = struct.calcsize(
_get_fmt_string(VOLUME_DATA_BLOCK, byte_order=">")
)
# Table XVII-F Data Block (Elevation Data Constant Type)
# page 3-93
ELEVATION_DATA_BLOCK = OrderedDict(
[
("lrtup", UINT2),
("atmos", SINT2),
("refl_calib", FLT4),
]
)
LEN_ELEVATION_DATA_BLOCK = struct.calcsize(
_get_fmt_string(ELEVATION_DATA_BLOCK, byte_order=">")
)
# Table XVII-H Data Block (Radial Data Constant Type)
# pages 3-93
RADIAL_DATA_BLOCK = OrderedDict(
[
("lrtup", UINT2),
("unambig_range", SINT2),
("noise_h", FLT4),
("noise_v", FLT4),
("nyquist_vel", SINT2),
("spare", {"fmt": "2s"}),
]
)
LEN_RADIAL_DATA_BLOCK = struct.calcsize(
_get_fmt_string(RADIAL_DATA_BLOCK, byte_order=">")
)
DATA_BLOCK_CONSTANT_IDENTIFIERS = OrderedDict(
[
("VOL", VOLUME_DATA_BLOCK),
("ELV", ELEVATION_DATA_BLOCK),
("RAD", RADIAL_DATA_BLOCK),
]
)
DATA_BLOCK_VARIABLE_IDENTIFIERS = OrderedDict(
[
("REF", GENERIC_DATA_BLOCK),
("VEL", GENERIC_DATA_BLOCK),
("SW ", GENERIC_DATA_BLOCK),
("ZDR", GENERIC_DATA_BLOCK),
("PHI", GENERIC_DATA_BLOCK),
("RHO", GENERIC_DATA_BLOCK),
("CFP", GENERIC_DATA_BLOCK),
]
)
DATA_BLOCK_TYPE_IDENTIFIER = OrderedDict(
[
("R", DATA_BLOCK_CONSTANT_IDENTIFIERS),
("D", DATA_BLOCK_VARIABLE_IDENTIFIERS),
]
)
class NexradLevel2ArrayWrapper(BackendArray):
"""Wraps array of NexradLevel2 data."""
def __init__(self, datastore, name, var):
self.datastore = datastore
self.group = datastore._group
self.name = name
# get rays and bins
nrays = (
datastore.ds["record_end"]
- datastore.ds["record_number"]
+ 1
- len(datastore.ds["intermediate_records"])
)
nbins = max([v["ngates"] for k, v in datastore.ds["sweep_data"].items()])
word_size = datastore.ds["sweep_data"][name]["word_size"]
width = {8: 1, 16: 2}[word_size]
self.dtype = np.dtype(f">u{width}")
self.shape = (nrays, nbins)
def _getitem(self, key):
# read the data if not available
try:
data = self.datastore.ds["sweep_data"][self.name]["data"]
except KeyError:
self.datastore.root.get_data(self.group, self.name)
data = self.datastore.ds["sweep_data"][self.name]["data"]
# see 3.2.4.17.6 Table XVII-I Data Moment Characteristics and Conversion for Data Names
word_size = self.datastore.ds["sweep_data"][self.name]["word_size"]
if self.name == "PHI" and word_size == 16:
# 10 bit mask, but only for 2 byte data
x = np.uint16(0x3FF)
elif self.name == "ZDR" and word_size == 16:
# 11 bit mask, but only for 2 byte data
x = np.uint16(0x7FF)
else:
x = np.uint8(0xFF)
if len(data[0]) < self.shape[1]:
return np.pad(
np.vstack(data) & x,
((0, 0), (0, self.shape[1] - len(data[0]))),
mode="constant",
constant_values=0,
)[key]
else:
return (np.vstack(data) & x)[key]
def __getitem__(self, key):
return indexing.explicit_indexing_adapter(
key,
self.shape,
indexing.IndexingSupport.BASIC,
self._getitem,
)
class NexradLevel2Store(AbstractDataStore):
def __init__(self, manager, group=None):
self._manager = manager
self._group = int(group[6:])
self._filename = self.filename
@classmethod
def open(cls, filename, mode="r", group=None, **kwargs):
manager = CachingFileManager(
NEXRADLevel2File, filename, mode=mode, kwargs=kwargs
)
return cls(manager, group=group)
@property
def filename(self):
with self._manager.acquire_context(False) as root:
return root.filename
@property
def root(self):
with self._manager.acquire_context(False) as root:
return root
def _acquire(self, needs_lock=True):
with self._manager.acquire_context(needs_lock) as root:
root.get_sweep(self._group)
ds = root.data[self._group]
return ds
@property
def ds(self):
return self._acquire()
def open_store_variable(self, name, var):
dim = "azimuth"
data = indexing.LazilyOuterIndexedArray(
NexradLevel2ArrayWrapper(self, name, var)
)
encoding = {"group": self._group, "source": self._filename}
mname = nexrad_mapping.get(name, name)
mapping = sweep_vars_mapping.get(mname, {})
attrs = {key: mapping[key] for key in moment_attrs if key in mapping}
attrs["scale_factor"] = 1.0 / var["scale"]
attrs["add_offset"] = -var["offset"] / var["scale"]
attrs["coordinates"] = (
"elevation azimuth range latitude longitude altitude time"
)
return mname, Variable((dim, "range"), data, attrs, encoding)
def open_store_coordinates(self):
msg_31_header = self.root.msg_31_header[self._group]
# azimuth/elevation
azimuth = np.array([ms["azimuth_angle"] for ms in msg_31_header])
elevation = np.array([ms["elevation_angle"] for ms in msg_31_header])
# time
# date is 1-based (1 -> 1970-01-01T00:00:00), so we need to subtract 1
date = (np.array([ms["collect_date"] for ms in msg_31_header]) - 1) * 86400e3
milliseconds = np.array([ms["collect_ms"] for ms in msg_31_header])
rtime = date + milliseconds
time_prefix = "milli"
rtime_attrs = get_time_attrs(date_unit=f"{time_prefix}seconds")
# site coords
vol = self.ds["sweep_constant_data"]["VOL"]
lon, lat, alt = vol["lon"], vol["lat"], vol["height"] + vol["feedhorn_height"]
# range
sweep_data = list(self.ds["sweep_data"].values())[0]
first_gate = sweep_data["first_gate"]
gate_spacing = sweep_data["gate_spacing"]
ngates = max([v["ngates"] for k, v in self.ds["sweep_data"].items()])
last_gate = first_gate + gate_spacing * (ngates - 0.5)
rng = np.arange(first_gate, last_gate, gate_spacing, "float32")
range_attrs = get_range_attrs(rng)
encoding = {"group": self._group}
dim = "azimuth"
sweep_mode = "azimuth_surveillance"
sweep_number = self._group
prt_mode = "not_set"
follow_mode = "not_set"
fixed_angle = self.root.msg_5["elevation_data"][self._group]["elevation_angle"]
coords = {
"azimuth": Variable((dim,), azimuth, get_azimuth_attrs(), encoding),
"elevation": Variable((dim,), elevation, get_elevation_attrs(), encoding),
"time": Variable((dim,), rtime, rtime_attrs, encoding),
"range": Variable(("range",), rng, range_attrs),
"longitude": Variable((), lon, get_longitude_attrs()),
"latitude": Variable((), lat, get_latitude_attrs()),
"altitude": Variable((), alt, get_altitude_attrs()),
"sweep_mode": Variable((), sweep_mode),
"sweep_number": Variable((), sweep_number),
"prt_mode": Variable((), prt_mode),
"follow_mode": Variable((), follow_mode),
"sweep_fixed_angle": Variable((), fixed_angle),
}
return coords
def get_variables(self):
return FrozenDict(
(k1, v1)
for k1, v1 in {
**dict(
self.open_store_variable(k, v)
for k, v in self.ds["sweep_data"].items()
),
**self.open_store_coordinates(),
}.items()
)
def get_attrs(self):
attributes = [("instrument_name", self.root.volume_header["icao"].decode())]
return FrozenDict(attributes)
[docs]
class NexradLevel2BackendEntrypoint(BackendEntrypoint):
"""
Xarray BackendEntrypoint for NEXRAD Level2 Data
"""
description = "Open NEXRAD Level2 files in Xarray"
url = "tbd"
def open_dataset(
self,
filename_or_obj,
*,
mask_and_scale=True,
decode_times=True,
concat_characters=True,
decode_coords=True,
drop_variables=None,
use_cftime=None,
decode_timedelta=None,
group=None,
first_dim="auto",
reindex_angle=False,
fix_second_angle=False,
site_coords=True,
optional=True,
):
store = NexradLevel2Store.open(
filename_or_obj,
group=group,
loaddata=False,
)
store_entrypoint = StoreBackendEntrypoint()
ds = store_entrypoint.open_dataset(
store,
mask_and_scale=mask_and_scale,
decode_times=decode_times,
concat_characters=concat_characters,
decode_coords=decode_coords,
drop_variables=drop_variables,
use_cftime=use_cftime,
decode_timedelta=decode_timedelta,
)
# reassign azimuth/elevation/time coordinates
ds = ds.assign_coords({"azimuth": ds.azimuth})
ds = ds.assign_coords({"elevation": ds.elevation})
ds = ds.assign_coords({"time": ds.time})
ds.encoding["engine"] = "nexradlevel2"
# handle duplicates and reindex
if decode_coords and reindex_angle is not False:
ds = ds.pipe(util.remove_duplicate_rays)
ds = ds.pipe(util.reindex_angle, **reindex_angle)
ds = ds.pipe(util.ipol_time, **reindex_angle)
# handling first dimension
dim0 = "elevation" if ds.sweep_mode.load() == "rhi" else "azimuth"
# todo: could be optimized
if first_dim == "time":
ds = ds.swap_dims({dim0: "time"})
ds = ds.sortby("time")
else:
ds = ds.sortby(dim0)
# assign geo-coords
if site_coords:
ds = ds.assign_coords(
{
"latitude": ds.latitude,
"longitude": ds.longitude,
"altitude": ds.altitude,
}
)
return ds
[docs]
def open_nexradlevel2_datatree(filename_or_obj, **kwargs):
"""Open NEXRAD Level2 dataset as :py:class:`xarray.DataTree`.
Parameters
----------
filename_or_obj : str, Path, file-like or DataStore
Strings and Path objects are interpreted as a path to a local or remote
radar file
Keyword Arguments
-----------------
sweep : int, list of int, optional
Sweep number(s) to extract, default to first sweep. If None, all sweeps are
extracted into a list.
first_dim : str
Can be ``time`` or ``auto`` first dimension. If set to ``auto``,
first dimension will be either ``azimuth`` or ``elevation`` depending on
type of sweep. Defaults to ``auto``.
reindex_angle : bool or dict
Defaults to False, no reindexing. Given dict should contain the kwargs to
reindex_angle. Only invoked if `decode_coord=True`.
fix_second_angle : bool
If True, fixes erroneous second angle data. Defaults to ``False``.
site_coords : bool
Attach radar site-coordinates to Dataset, defaults to ``True``.
kwargs : dict
Additional kwargs are fed to :py:func:`xarray.open_dataset`.
Returns
-------
dtree: xarray.DataTree
DataTree
"""
# handle kwargs, extract first_dim
backend_kwargs = kwargs.pop("backend_kwargs", {})
# first_dim = backend_kwargs.pop("first_dim", None)
sweep = kwargs.pop("sweep", None)
sweeps = []
kwargs["backend_kwargs"] = backend_kwargs
if isinstance(sweep, str):
sweeps = [sweep]
elif isinstance(sweep, int):
sweeps = [f"sweep_{sweep}"]
elif isinstance(sweep, list):
if isinstance(sweep[0], int):
sweeps = [f"sweep_{i}" for i in sweep]
else:
sweeps.extend(sweep)
else:
with NEXRADLevel2File(filename_or_obj, loaddata=False) as nex:
nsweeps = nex.msg_5["number_elevation_cuts"]
sweeps = [f"sweep_{i}" for i in range(nsweeps)]
engine = NexradLevel2BackendEntrypoint
# todo: only open once! Needs new xarray built in datatree!
ls_ds: list[xr.Dataset] = []
for swp in sweeps:
try:
dsx = xr.open_dataset(filename_or_obj, group=swp, engine=engine, **kwargs)
except IndexError:
break
else:
ls_ds.append(dsx)
ls_ds.insert(0, xr.Dataset())
dtree: dict = {
"/": _assign_root(ls_ds),
"/radar_parameters": _get_subgroup(ls_ds, radar_parameters_subgroup),
"/georeferencing_correction": _get_subgroup(
ls_ds, georeferencing_correction_subgroup
),
"/radar_calibration": _get_radar_calibration(ls_ds, radar_calibration_subgroup),
}
dtree = _attach_sweep_groups(dtree, ls_ds[1:])
return DataTree.from_dict(dtree)