# Source code for pyiem.mrms

"""Multi-RADAR Multi-Sensor (MRMS) Helper Functions.

Hopefully useful functions to help with the processing of MRMS data
"""

import gzip
import os
import struct
from datetime import datetime, timezone
from typing import Optional

import httpx
import numpy as np
from affine import Affine

from pyiem.util import LOG

# NOTE: This is the info for the MRMS grib products, NOT the IEM netcdf
# This is the center of the corner pixels
WEST = -129.995
NORTH = 54.995
SOUTH = 20.005
EAST = -60.005
# Grid cell size in degrees
DX = 0.01
DY = 0.01
# Grid dimensions: NX columns (west-east), NY rows (south-north)
NX = 7000
NY = 3500

# These are the edges of the domain
WEST_EDGE = -130.0
EAST_EDGE = -60.0
NORTH_EDGE = 55.0
SOUTH_EDGE = 20.0
# Cell-center coordinate axes; note YAXIS ascends from south to north
XAXIS = np.linspace(WEST, EAST, NX)
YAXIS = np.linspace(SOUTH, NORTH, NY)
# North-up affine transform (row 0 is at the northern edge, hence -0.01 dy)
AFFINE = Affine(0.01, 0.0, WEST_EDGE, 0.0, -0.01, NORTH_EDGE)

# NOTE: These are the values for the IEM netcdf files, a smaller domain
MRMS4IEMRE_WEST_EDGE = -126.0
MRMS4IEMRE_EAST_EDGE = -65.0
MRMS4IEMRE_NORTH_EDGE = 50.0
MRMS4IEMRE_SOUTH_EDGE = 23.0
# Centers of the corner pixels for the IEMRE subdomain
MRMS4IEMRE_WEST = -125.995
MRMS4IEMRE_EAST = -65.005
MRMS4IEMRE_NORTH = 49.995
MRMS4IEMRE_SOUTH = 23.005
MRMS4IEMRE_DX = 0.01
MRMS4IEMRE_DY = 0.01
MRMS4IEMRE_NX = 6100
MRMS4IEMRE_NY = 2700
# North-up affine transform for the IEMRE subdomain
MRMS4IEMRE_AFFINE = Affine(
    DX, 0.0, MRMS4IEMRE_WEST_EDGE, 0.0, -DY, MRMS4IEMRE_NORTH_EDGE
)
# How the netcdf files are stored
MRMS4IEMRE_AFFINE_NC = Affine(
    DX, 0.0, MRMS4IEMRE_WEST_EDGE, 0.0, DY, MRMS4IEMRE_SOUTH_EDGE
)
MRMS4IEMRE_XAXIS = np.linspace(MRMS4IEMRE_WEST, MRMS4IEMRE_EAST, MRMS4IEMRE_NX)
MRMS4IEMRE_YAXIS = np.linspace(
    MRMS4IEMRE_SOUTH, MRMS4IEMRE_NORTH, MRMS4IEMRE_NY
)


def find_ij(lon: float, lat: float) -> tuple[Optional[int], Optional[int]]:
    """CAUTION: This is for the netcdf files, not grib2 files.

    Compute the (i, j) grid cell containing the given point on the
    MRMS4IEMRE netcdf grid (south-to-north row ordering).

    Args:
      lon (float): longitude in degrees east
      lat (float): latitude in degrees north

    Returns:
      tuple[Optional[int], Optional[int]]: (i, j) zero-based indices, or
      (None, None) when the point falls outside the IEMRE domain.
    """
    # Points on the east/north edge are out-of-bounds, west/south edge is in
    if (
        lon < MRMS4IEMRE_WEST_EDGE
        or lon >= MRMS4IEMRE_EAST_EDGE
        or lat < MRMS4IEMRE_SOUTH_EDGE
        or lat >= MRMS4IEMRE_NORTH_EDGE
    ):
        return None, None
    # Use the IEMRE-specific cell sizes for consistency with the constants
    # this function documents (identical value to DX/DY, so no behavior change)
    j = int((lat - MRMS4IEMRE_SOUTH_EDGE) / MRMS4IEMRE_DY)
    i = int((lon - MRMS4IEMRE_WEST_EDGE) / MRMS4IEMRE_DX)
    return i, j
def is_gzipped(text):
    """Check that we have gzipped content.

    Args:
      text (bytes): payload to inspect

    Returns:
      bool: does the payload start with the gzip magic number
    """
    magic = b"\x1f\x8b"
    return text[: len(magic)] == magic
def get_url(center, valid, product):
    """Return the URL given the provided options.

    Args:
      center (str): either "mtarchive" or an NCEP IDP center suffix
        ("", "-bldr", "-cprk")
      valid (datetime): timestamp of the desired product
      product (str): MRMS product name

    Returns:
      str: the computed URL
    """
    fn = f"{product}_00.00_{valid:%Y%m%d-%H%M}00.grib2.gz"
    # Non-archive requests go straight to the NCEP operational servers
    if center != "mtarchive":
        return f"https://mrms{center}.ncep.noaa.gov/2D/{product}/MRMS_{fn}"
    # The mtarchive stores the PrecipRate reanalysis in its own tree
    subdir = "ncep"
    if 2000 < valid.year < 2012 and product == "PrecipRate":
        subdir = "reanalysis"
    return (
        f"https://mtarchive.geol.iastate.edu/{valid:%Y/%m/%d}"
        f"/mrms/{subdir}/{product}/{fn}"
    )
def fetch(product, valid: datetime, tmpdir="/mesonet/tmp"):
    """Get a desired MRMS product.

    Applies the following logic:

    - does the file exist in `tmpdir`?
    - can I fetch it from mtarchive?
    - if recent, can I fetch it from NOAA MRMS website

    Args:
      product(str): MRMS product type
      valid(datetime): Datetime object for desired timestamp
      tmpdir(str,optional): location to check/place the downloaded file

    Returns:
      Optional[str]: path to the downloaded file, or None on failure
    """
    fn = f"{product}_00.00_{valid:%Y%m%d-%H%M}00.grib2.gz"
    tmpfn = os.path.join(tmpdir, fn)
    # Option 1, we have this file already in cache!
    if os.path.isfile(tmpfn):
        LOG.info("Found %s in tmpdir cache", tmpfn)
        return tmpfn
    # Option 2, go fetch it from mtarchive
    if _download(get_url("mtarchive", valid, product), tmpfn):
        return tmpfn
    # Option 3, we go look at MRMS website, if timestamp is recent
    utcnow = datetime.now(timezone.utc)
    if valid.tzinfo is None:
        # Compare naive-to-naive to avoid a TypeError on subtraction
        utcnow = utcnow.replace(tzinfo=None)
    if (utcnow - valid).total_seconds() > 86400:
        # Can't do option 3!
        return None
    # Loop over all IDP data centers
    for center in ["", "-bldr", "-cprk"]:
        if _download(get_url(center, valid, product), tmpfn):
            return tmpfn
    return None


def _download(url, tmpfn) -> bool:
    """Fetch `url` and persist it to `tmpfn` when it is valid gzip content.

    Best-effort: any HTTP/network failure is logged and reported as False.

    Args:
      url (str): URL to fetch
      tmpfn (str): local filename to write the payload to

    Returns:
      bool: was the download successful and written out
    """
    try:
        resp = httpx.get(url, timeout=30)
        resp.raise_for_status()
    except Exception:
        LOG.info("Failed to fetch %s", url)
        return False
    # Guard against HTML error pages or truncated payloads
    if not is_gzipped(resp.content):
        return False
    with open(tmpfn, "wb") as fd:
        fd.write(resp.content)
    return True
def make_colorramp():
    """Make me a crude color ramp.

    Returns:
      tuple: 768 flattened RGB values (256 colors x 3 channels)
    """
    ramp = np.zeros((256, 3), int)
    # Blue channel: solid, ramp down, ramp up, solid
    ramp[:37, 2] = 255
    ramp[37:77, 2] = (77 - np.arange(37, 77)) * 6
    ramp[160:196, 2] = (np.arange(160, 196) - 160) * 6
    ramp[196:, 2] = 254
    # Green channel: ramp up, solid, ramp down, ramp up again
    ramp[:37, 1] = np.arange(37) * 6
    ramp[37:116, 1] = 254
    ramp[116:156, 1] = (156 - np.arange(116, 156)) * 6
    ramp[196:, 1] = (np.arange(196, 256) - 196) * 4
    # Red channel: ramp up, then solid
    ramp[77:117, 0] = (np.arange(77, 117) - 77) * 6
    ramp[117:, 0] = 254
    # Gray for missing
    ramp[255, :] = [144, 144, 144]
    # Black to remove, eventually
    ramp[0, :] = [0, 0, 0]
    return tuple(ramp.ravel())
def reader(fn):
    """Return metadata and the data.

    Parses the legacy MRMS gzipped binary format: an 80 byte header,
    per-level and radar bookkeeping sections, then int16 gridded data.

    Args:
      fn (str): gzipped binary filename to read

    Returns:
      (dict, numpy.ndarray): metadata and the (ny, nx) scaled data grid
    """
    metadata = {}
    # Context manager ensures the handle is closed even if parsing fails
    with gzip.open(fn, "rb") as fp:
        (
            year,
            month,
            day,
            hour,
            minute,
            second,
            nx,
            ny,
            nz,
            _,
            _,
            _,
            _,
            _,
            _,
            _,
            _,
            ul_lon_cc,
            ul_lat_cc,
            _,
            scale_lon,
            scale_lat,
            grid_scale,
        ) = struct.unpack("9i4c10i", fp.read(80))

        # Header stores scaled integers; recover cell-center coordinates
        metadata["ul_lon_cc"] = ul_lon_cc / float(scale_lon)
        metadata["ul_lat_cc"] = ul_lat_cc / float(scale_lat)
        # Calculate the lower-left cell center and the pixel edges
        metadata["ll_lon_cc"] = metadata["ul_lon_cc"]
        metadata["ll_lat_cc"] = metadata["ul_lat_cc"] - (
            (scale_lat / float(grid_scale)) * (ny - 1)
        )
        metadata["ll_lat"] = (
            metadata["ll_lat_cc"] - (scale_lat / float(grid_scale)) / 2.0
        )
        metadata["ul_lat"] = (
            metadata["ul_lat_cc"] - (scale_lat / float(grid_scale)) / 2.0
        )
        metadata["ll_lon"] = (
            metadata["ll_lon_cc"] - (scale_lon / float(grid_scale)) / 2.0
        )
        metadata["ul_lon"] = (
            metadata["ul_lon_cc"] - (scale_lon / float(grid_scale)) / 2.0
        )
        # Timestamps in this format are UTC
        metadata["valid"] = datetime(
            year, month, day, hour, minute, second
        ).replace(tzinfo=timezone.utc)

        # Skip sections we do not need, but must consume to stay aligned
        struct.unpack(f"{nz}i", fp.read(nz * 4))  # levels
        struct.unpack("i", fp.read(4))  # z_scale
        struct.unpack("10i", fp.read(40))  # bogus
        struct.unpack("20c", fp.read(20))  # varname
        metadata["unit"] = struct.unpack("6c", fp.read(6))
        var_scale, _, num_radars = struct.unpack("3i", fp.read(12))
        struct.unpack(f"{num_radars * 4}c", fp.read(num_radars * 4))  # rad_list
        # Data payload: int16 values divided by var_scale to get real units
        sz = nx * ny * nz
        data = struct.unpack(f"{sz}h", fp.read(sz * 2))
    data = np.reshape(np.array(data), (ny, nx)) / float(var_scale)
    return metadata, data
def write_worldfile(filename):
    """Write a worldfile to the given filename.

    Args:
      filename (str): filename to write the world file information to
    """
    # World file defines the center of the upper left pixel
    rows = ["0.01", "0.00", "0.00", "-0.01", str(WEST), str(NORTH)]
    with open(filename, "w", encoding="utf8") as fd:
        fd.write("\n".join(rows))