"""Support library for the IEM Reanalysis code."""
from datetime import datetime, timezone
from typing import Optional, Tuple
from zoneinfo import ZoneInfo
import numpy as np
import pyproj
import xarray as xr
from affine import Affine
from psycopg.sql import SQL, Identifier
from rasterio.warp import Resampling, reproject
from pyiem.database import get_dbconn
from pyiem.util import LOG, utc
# Legacy constants prior to addition of other IEMRE domains
# 1/8 degree grid
# This is the center of the grid cells at the corners of the grid
SOUTH = 23.0
WEST = -126.0
NORTH = 49.875
EAST = -65.125
# These are the outside edges of the domain
SOUTH_EDGE = 22.9375
WEST_EDGE = -126.0625
NORTH_EDGE = 49.9375
EAST_EDGE = -65.0625
DX = 0.125
DY = 0.125
NX = 488
NY = 216
XAXIS = np.linspace(WEST, EAST, NX)
YAXIS = np.linspace(SOUTH, NORTH, NY)
AFFINE = Affine(DX, 0.0, WEST_EDGE, 0.0, 0 - DY, NORTH_EDGE)
AFFINE_NATIVE = Affine(DX, 0.0, WEST_EDGE, 0.0, DY, SOUTH_EDGE)
# Definition of analysis domains for IEMRE
DOMAINS = {
"conus": {
"west": WEST,
"east": EAST,
"south": SOUTH,
"north": NORTH,
"south_edge": SOUTH - DY / 2.0,
"north_edge": NORTH + DY / 2.0,
"west_edge": WEST - DX / 2.0,
"east_edge": EAST + DX / 2.0,
"nx": NX,
"ny": NY,
"affine": AFFINE,
"affine_native": AFFINE_NATIVE,
"tzinfo": ZoneInfo("America/Chicago"),
},
"china": {
"west": 70,
"east": 139.875,
"south": 15,
"north": 54.875,
"west_edge": 69.9375,
"east_edge": 139.9375,
"south_edge": 14.9375,
"north_edge": 54.9375,
"nx": 560,
"ny": 320,
"affine": Affine(DX, 0.0, 69.9375, 0.0, 0 - DY, 54.9375),
"affine_native": Affine(DX, 0.0, 69.9375, 0.0, DY, 14.9375),
"tzinfo": ZoneInfo("Asia/Shanghai"),
},
"europe": {
"west": -10,
"east": 39.875,
"south": 35,
"north": 69.875,
"west_edge": -10.0625,
"east_edge": 39.9375,
"south_edge": 34.9375,
"north_edge": 69.9375,
"nx": 400,
"ny": 280,
"affine": Affine(DX, 0.0, -10.0625, 0.0, 0 - DY, 69.9375),
"affine_native": Affine(DX, 0.0, -10.0625, 0.0, DY, 34.9375),
"tzinfo": ZoneInfo("Europe/Paris"),
},
"sa": {
"west": -81.5,
"east": -34.125,
"south": -55.875,
"north": 12.5,
"west_edge": -81.5625,
"east_edge": -34.0625,
"south_edge": -55.9375,
"north_edge": 12.5625,
"nx": 380,
"ny": 548,
"affine": Affine(DX, 0.0, -81.5625, 0.0, 0 - DY, 12.5625),
"affine_native": Affine(DX, 0.0, -81.5625, 0.0, DY, -55.9375),
"tzinfo": ZoneInfo("America/Sao_Paulo"),
},
}
[docs]
def d2l(val) -> str:
"""Convert a domain label to a string used within filenames."""
if val is None or val in ["", "conus"]:
return "iemre"
return f"iemre_{val}"
[docs]
def get_table(valid):
"""Figure out which table should be used for given valid.
Args:
valid (datetime or date): which time is of interest
Returns:
str tablename
"""
# careful here, a datetime is not an instance of date
if isinstance(valid, datetime):
table = f"iemre_hourly_{valid.astimezone(timezone.utc):%Y%m}"
else:
table = f"iemre_daily_{valid.year}"
return table
[docs]
def set_grids(valid, ds, table: str | None = None, domain: str = "conus"):
"""Update the database with a given ``xarray.Dataset``.
Args:
valid (datetime or date): If datetime, save hourly, if date, save daily
ds (xarray.Dataset): The xarray dataset to save
table (str,optional): hard coded database table to use to set the data
on. Usually dynamically computed.
domain (str,optional): IEMRE domain to save data to
"""
table = Identifier(table if table is not None else get_table(valid))
dom = DOMAINS[domain]
pgconn = get_dbconn(d2l(domain))
cursor = pgconn.cursor()
# Do we currently have database entries?
cursor.execute(
SQL("SELECT valid from {} WHERE valid = %s LIMIT 1").format(table),
(valid,),
)
if cursor.rowcount == 0:
# Create entries
cursor.execute(
SQL(
"insert into {}(gid, valid) select gid, %s from iemre_grid"
).format(table),
(valid,),
)
cursor.close()
pgconn.commit()
# NB: whatever reason, postgresql is having a hard time with the query
# plan on these newly inserted rows, so we force a vacuum analyze
pgconn.autocommit = True
pgconn.execute(SQL("VACUUM ANALYZE {}").format(table))
pgconn.autocommit = False
cursor = pgconn.cursor()
# Now we do our update.
query = SQL("update {} set {} where valid = %s and gid = %s").format(
table,
SQL(",".join(f"{col} = %s" for col in ds)),
)
# Implementation notes: seems quite fast
pig = {v: ds[v].values.ravel().tolist() for v in ds}
pig["valid"] = [f"{valid:%Y-%m-%d}"] * (dom["nx"] * dom["ny"])
pig["gid"] = list(range(dom["nx"] * dom["ny"]))
sts = utc()
cursor.executemany(query, zip(*pig.values(), strict=False))
cursor.close()
pgconn.commit()
LOG.info(
"timing %.2f/s", dom["nx"] * dom["ny"] / (utc() - sts).total_seconds()
)
[docs]
def get_grids(
valid, varnames=None, cursor=None, table=None, domain: str = "conus"
):
"""Fetch grid(s) from the database, returning xarray.
Args:
valid (datetime or date): If datetime, load hourly, if date, load daily
varnames (str or list,optional): Which variables to fetch from database,
defaults to all available
cursor (database cursor,optional): cursor to use for query
table (str,optional): Hard coded table to fetch data from, useful in the
case of forecast data.
domain (str,optional): IEMRE domain to fetch data from
Returns:
``xarray.Dataset``"""
table = table if table is not None else get_table(valid)
dom = DOMAINS[domain]
if cursor is None:
pgconn = get_dbconn(d2l(domain))
cursor = pgconn.cursor()
# rectify varnames
if isinstance(varnames, str):
varnames = [varnames]
# Compute variable names
cursor.execute(
"SELECT column_name FROM information_schema.columns "
"WHERE table_schema = 'public' AND table_name = %s and "
"column_name not in ('gid', 'valid')",
(table,),
)
use_columns = []
for row in cursor:
if not varnames or row[0] in varnames:
use_columns.append(row[0]) # noqa
colsql = ",".join(use_columns)
cursor.execute(
f"SELECT (gid / %s)::int as y, gid %% %s as x, {colsql} "
f"from {table} WHERE valid = %s",
(dom["nx"], dom["nx"], valid),
)
data = {
key: np.full((dom["ny"], dom["nx"]), np.nan) for key in use_columns
}
for row in cursor:
for i, col in enumerate(use_columns):
data[col][row[0], row[1]] = row[2 + i]
return xr.Dataset(
dict((key, (["y", "x"], data[key])) for key in data),
coords={
"lon": (["x"], np.arange(dom["west"], dom["east"] + 0.001, DX)),
"lat": (["y"], np.arange(dom["south"], dom["north"] + 0.001, DY)),
},
)
[docs]
def get_dailyc_ncname(domain: str = "conus") -> str:
"""Return the filename of the daily climatology netcdf file"""
return f"/mesonet/data/{d2l(domain)}/{d2l(domain)}_dailyc.nc"
[docs]
def get_daily_ncname(year, domain: str = "conus") -> str:
"""Get the daily netcdf filename for the given year"""
return f"/mesonet/data/{d2l(domain)}/{year}_{d2l(domain)}_daily.nc"
[docs]
def get_dailyc_mrms_ncname():
"""Get the MRMS daily climatology filename"""
return "/mesonet/data/mrms/mrms_dailyc.nc"
[docs]
def get_daily_mrms_ncname(year):
"""Get the daily netcdf MRMS filename for the given year"""
return f"/mesonet/data/mrms/{year}_mrms_daily.nc"
[docs]
def get_hourly_ncname(year, domain: str = "conus") -> str:
"""Get the daily netcdf filename for the given year"""
return f"/mesonet/data/{d2l(domain)}/{year}_{d2l(domain)}_hourly.nc"
[docs]
def daily_offset(ts):
"""Compute the timestamp index in the netcdf file"""
# In case ts is passed here as a datetime.date object
ts = datetime(ts.year, ts.month, ts.day)
base = ts.replace(
month=1, day=1, hour=0, minute=0, second=0, microsecond=0
)
days = (ts - base).days
return int(days)
[docs]
def hourly_offset(dtobj):
"""Return time index for given timestamp
Args:
dtobj (datetime): datetime, if no tzinfo, we assume it is UTC
Returns:
int time index in the netcdf file
"""
if dtobj.tzinfo and dtobj.tzinfo != timezone.utc:
dtobj = dtobj.astimezone(timezone.utc)
base = dtobj.replace(
month=1, day=1, hour=0, minute=0, second=0, microsecond=0
)
seconds = (dtobj - base).total_seconds()
return int(seconds / 3600.0)
[docs]
def find_ij(
lon: float, lat: float, domain: str = "conus"
) -> Tuple[Optional[int], Optional[int]]:
"""Return the i, j grid indices (based 0) for given lat/lon."""
dom = DOMAINS[domain]
if (
lon < dom["west_edge"]
or lon >= dom["east_edge"]
or lat < dom["south_edge"]
or lat >= dom["north_edge"]
):
return None, None
i = int((lon - dom["west_edge"]) / DX)
j = int((lat - dom["south_edge"]) / DY)
return i, j
[docs]
def get_domain(lon: float, lat: float) -> Optional[str]:
"""Compute the domain that contains the given point."""
for domain, dom in DOMAINS.items():
if (
dom["west_edge"] <= lon < dom["east_edge"]
and dom["south_edge"] <= lat < dom["north_edge"]
):
return domain
return None
[docs]
def get_gid(lon, lat, domain: str = "conus") -> Optional[int]:
"""Compute the grid id for the given location."""
i, j = find_ij(lon, lat, domain)
if i is None:
return None
return j * DOMAINS[domain]["nx"] + i
[docs]
def grb2iemre(grb, resampling=None, domain: str = "conus") -> np.ndarray:
"""Reproject a grib message onto the IEMRE grid.
A helper frontend to ``reproject2iemre``.
Args:
grb (pygrib.gribmessage): single message to reproject
resampling (rasterio.warp.Resampling,optional): defaults to nearest
domain (str): IEMRE domain to reproject onto
Returns:
numpy.ma.array of reprojected grid oriented S to N like IEMRE
"""
pparams = grb.projparams
lat1 = grb["latitudeOfFirstGridPointInDegrees"]
lon1 = grb["longitudeOfFirstGridPointInDegrees"]
llx, lly = pyproj.Proj(pparams)(lon1, lat1)
# The reprojected first grid cell is the centroid, not the outer edge
aff = Affine(
grb["DxInMetres"],
0.0,
llx - grb["DxInMetres"] / 2.0,
0.0,
-grb["DyInMetres"],
lly + grb["DyInMetres"] * grb["Ny"] + grb["DyInMetres"] / 2.0,
)
return reproject2iemre(
np.flipud(grb.values), aff, pparams, resampling, domain
)
[docs]
def reproject2iemre(
grid, affine_in, crs_in: str, resampling=None, domain: str = "conus"
):
"""Reproject the given grid to IEMRE grid, returning S to N oriented grid.
Note: If the affine_in is dy > 0 then the grid is assumed to be S to N.
Args:
grid (numpy.array): 2D grid to reproject
affine_in (affine.Affine): affine transform of input grid
crs_in (pyproj.Proj): projection of input grid
resampling (rasterio.warp.Resampling,optional): defaults to nearest
domain (str): IEMRE domain to reproject onto
Returns:
numpy.ma.array of reprojected grid oriented S to N like IEMRE
"""
dom = DOMAINS[domain]
data = np.zeros((dom["ny"], dom["nx"]), float)
# If source is a masked array, we need to fill it
src_is_masked = hasattr(grid, "mask")
if src_is_masked:
grid = grid.filled(np.nan)
reproject(
grid,
data,
src_transform=affine_in,
src_crs=crs_in,
dst_transform=(
dom["affine"] if affine_in.e < 0 else dom["affine_native"]
),
dst_crs={"init": "EPSG:4326"},
dst_nodata=np.nan,
resampling=(
resampling if resampling is not None else Resampling.nearest
),
)
data = np.ma.array(data, mask=np.isnan(data))
return data if affine_in.e > 0 else np.flipud(data)