# -*- coding: utf-8 -*-
# pylint: disable=unbalanced-tuple-unpacking
"""Utility functions for pyIEM package
This module contains utility functions used by various parts of the codebase.
"""
import logging
import os
import random
import re
import subprocess
import sys
import tempfile
import time
import warnings
from contextlib import contextmanager
from datetime import date, datetime, timedelta, timezone
from html import escape
# !important
# Lazily import everything possible here, since this module is imported by
# nearly everything else.
import httpx
import numpy as np # used too many places
# NB: careful with circular imports!
from pyiem import database
from pyiem.exceptions import IncompleteWebRequest, UnknownStationException
from pyiem.network import Table as NetworkTable
SEQNUM = re.compile(r"^[0-9]{3}\s?$")
# Set up a default logging instance for this module
LOG = logging.getLogger("pyiem")
LOG.addHandler(logging.NullHandler())
WFO_FOURCHAR = ["AFG", "GUM", "HFO", "AFC", "AJK"]
def _strptime(ins: str, fmt: str, rectify: bool = False) -> datetime:
"""Wrapper around strptime."""
# Forgive an encoded space
ins = ins.replace("+", " ")
try:
return datetime.strptime(ins, fmt)
except ValueError as exp:
if rectify:
return handle_date_err(exp, ins, fmt)
raise IncompleteWebRequest(
f"String provided: `{ins}` does not match format: `{fmt}`"
) from exp
def ddhhmm2datetime(ddhhmm: str, utcnow: datetime) -> datetime:
"""Do the nasty WMO header timestamp conversion."""
wmo_day = int(ddhhmm[:2])
wmo_hour = int(ddhhmm[2:4])
wmo_minute = int(ddhhmm[4:])
wmo_valid = utcnow.replace(
hour=wmo_hour, minute=wmo_minute, second=0, microsecond=0
)
if wmo_day != utcnow.day:
if wmo_day - utcnow.day == 1: # Tomorrow
wmo_valid = wmo_valid.replace(day=wmo_day)
elif wmo_day > 25 and utcnow.day < 15: # Previous month!
wmo_valid = wmo_valid + timedelta(days=-10)
wmo_valid = wmo_valid.replace(day=wmo_day)
elif wmo_day < 5 and utcnow.day >= 15: # next month
wmo_valid = wmo_valid + timedelta(days=10)
wmo_valid = wmo_valid.replace(day=wmo_day)
else:
wmo_valid = wmo_valid.replace(day=wmo_day)
return wmo_valid
def web2ldm(url, ldm_product_name, md5_from_name=False, pqinsert="pqinsert"):
"""Download a URL and insert into LDM.
Implements a common IEM workflow whereby a web resource is downloaded,
saved to a temporary file, and then inserted into LDM.
Args:
url (str): Web resource to download.
ldm_product_name (str): LDM product ID to use when inserting.
md5_from_name (bool): Should `pqinsert -i` be used, which causes LDM
to compute the MD5 value from the product name instead of data bytes.
pqinsert (str): pqinsert command.
Returns:
bool - success of this workflow.
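Example (a minimal sketch; the URL and LDM product name below are made up)::

    ok = web2ldm(
        "https://example.com/data/latest.png",
        "data/example/latest.png",
    )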
"""
resp = httpx.get(url, timeout=60)
if resp.status_code != 200:
return False
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmp:
tmp.write(resp.content)
args = [pqinsert, "-p", ldm_product_name, tmp.name]
if md5_from_name:
args.insert(1, "-i")
try:
with subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as proc:
stderr = proc.stderr.read()
res = True
if stderr != b"":
LOG.info("pqinsert stderr result %s", stderr)
res = False
except FileNotFoundError:
LOG.info("Failed to find `pqinsert` in $PATH")
res = False
os.unlink(tmp.name)
return res
def load_geodf(dataname: str, epsg: int = 4326):
"""Load a given bundled GeoDataFrame.
Args:
dataname (str): The name of the bundled dataset to load.
epsg (int): The EPSG code used to look up a pre-projected copy of the dataset, default 4326.
Returns:
GeoDataFrame
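Example (the dataset name here is hypothetical)::

    states = load_geodf("us_states")
    print(states.columns)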
"""
import geopandas as gpd
preproj = (
f"/opt/miniconda3/pyiem_data/parquet/{epsg}/geodf/{dataname}.parquet"
)
if os.path.isfile(preproj):
fn = preproj
else:
fn = os.path.join(
os.path.dirname(__file__), "data", "geodf", f"{dataname}.parquet"
)
if not os.path.isfile(fn):
LOG.info("load_geodf(%s) failed, file is missing!", fn)
return gpd.GeoDataFrame()
return gpd.read_parquet(fn)
def convert_value(val, units_in, units_out):
"""DRY Helper to return magnitude of a metpy unit conversion.
Args:
val (mixed): something with values.
units_in (str): What units those values have.
units_out (str): What values we want with given magnitude.
Returns:
mixed: magnitude of val with unit conversion applied
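Example (any unit strings that MetPy/pint understands will work)::

    wind_kt = convert_value(10.0, "meter / second", "knot")  # ~19.4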
"""
from metpy.units import masked_array, units
return masked_array(val, units(units_in)).to(units(units_out)).m
def c2f(val):
"""Helper to return magnitude of Celcius to Fahrenheit conversion.
Args:
val (mixed): something with values in C
Returns:
val: something with values in F
"""
return convert_value(val, "degC", "degF")
def mm2inch(val):
"""Helper to return magnitude of milimeters to inch conversion.
Args:
val (mixed): something with values in mm
Returns:
val: something with values in inch
"""
return convert_value(val, "mm", "inch")
def html_escape(val):
"""Wrapper around cgi.escape deprecation."""
return escape(val)
def get_test_filepath(name: str) -> str:
"""Helper to get a testing filename, full path."""
return f"{os.getcwd()}/data/product_examples/{name}"
def get_test_file(name):
"""Helper to get data for test usage."""
with open(get_test_filepath(name), "rb") as fp:
return fp.read().decode("utf-8")
def logger(name="pyiem", level=None):
"""Get pyiem's logger with a stream handler attached.
Args:
name (str): The name of the logger to get, default pyiem
level (logging.LEVEL): The log level for this pyiem logger, default is
WARNING for non-interactive sessions, INFO otherwise
Returns:
logger instance
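Example::

    log = logger()
    log.info("Processing started")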
"""
ch = logging.StreamHandler()
ch.setFormatter(CustomFormatter())
log = logging.getLogger(name)
log.addHandler(ch)
if level is None and sys.stdout.isatty():
level = logging.INFO
log.setLevel(level if level is not None else logging.WARNING)
return log
def find_ij(lons, lats, lon, lat):
"""Compute the i,j closest cell."""
dist = ((lons - lon) ** 2 + (lats - lat) ** 2) ** 0.5
(xidx, yidx) = np.unravel_index(dist.argmin(), dist.shape)
return xidx, yidx
def ssw(mixedobj):
"""python23 wrapper for sys.stdout.write
Args:
mixedobj (str or bytes): what content we want to send
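Example (both str and bytes are accepted)::

    ssw("Hello ")
    ssw(b"World")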
"""
stdout = getattr(sys.stdout, "buffer", sys.stdout)
if isinstance(mixedobj, str):
stdout.write(mixedobj.encode("utf-8"))
else:
stdout.write(mixedobj)
def ncopen(ncfn, mode="r", timeout=60, _sleep=5):
"""Safely open netcdf files
The issue here is that we can only have the following situation for a
given NetCDF file.
1. Only 1 or more readers
2. Only 1 appender
The netcdf is being accessed over NFS and perhaps local disk, so writing
lock files is problematic.
Args:
ncfn (str): The netCDF filename
mode (str,optional): The netCDF4.Dataset open mode, default 'r'
timeout (int): The total time in seconds to attempt a read, default 60
Returns:
`netCDF4.Dataset` or `None`
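Example (the filename below is hypothetical)::

    nc = ncopen("/tmp/example.nc", timeout=30)
    if nc is not None:
        print(nc.variables.keys())
        nc.close()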
"""
import netCDF4
if mode != "w" and not os.path.isfile(ncfn):
raise IOError(f"No such file {ncfn}")
sts = datetime.now(timezone.utc)
nc = None
while (datetime.now(timezone.utc) - sts).total_seconds() < timeout:
try:
nc = netCDF4.Dataset(ncfn, mode)
nc.set_auto_scale(True)
break
except (OSError, IOError) as exp:
LOG.debug(exp)
time.sleep(_sleep)
return nc
def utc(year=None, month=1, day=1, hour=0, minute=0, second=0, microsecond=0):
"""Create a datetime instance with tzinfo=timezone.utc
When no arguments are provided, returns `datetime.now(timezone.utc)`.
Returns:
datetime with tzinfo set
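Example::

    now = utc()  # current time, tzinfo=timezone.utc
    ts = utc(2024, 7, 4, 12)  # 2024-07-04 12:00 UTC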
"""
if year is None:
return datetime.now(timezone.utc)
return datetime(
year, month, day, hour, minute, second, microsecond
).replace(tzinfo=timezone.utc)
def noaaport_text(text):
"""Make whatever text look like it is NOAAPort Pristine
Args:
text (string): the inbound text
Returns:
text that looks noaaportish
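Example (``raw`` is whatever product text you have on hand)::

    cleaned = noaaport_text(raw)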
"""
# Rectify the text to remove any stray stuff
text = text.replace("\003", "").replace("\001", "").replace("\r", "")
# trim any right hand space
lines = [x.rstrip() for x in text.split("\n")]
# remove any beginning empty lines
for pos in [0, -1]:
while lines and lines[pos].strip() == "":
lines.pop(pos)
# line 0 should be start of product sequence
lines.insert(0, "\001")
# line 1 should be the LDM sequence number 4 chars
if not SEQNUM.match(lines[1]):
if len(lines[1]) > 5:
lines.insert(1, "000 ")
else:
lines[1] = f"{lines[1][:3]} "
# last line should be the control-c, by itself
lines.append("\003")
return "\r\r\n".join(lines)
def handle_date_err(exp, value, fmt):
"""Attempt to fix up a date string, when possible."""
if str(exp).find("day is out of range") == -1:
raise exp
tokens = value.split(" ")
datepart = tokens[0]
(yyyy, mm, _dd) = datepart.split("-")
# construct a new month date and then subtract one day
lastday = (date(int(yyyy), int(mm), 15) + timedelta(days=20)).replace(
day=1
) - timedelta(days=1)
# Reconstruct the date string
res = lastday.strftime("%Y-%m-%d")
if len(tokens) > 1:
res += " " + tokens[1]
# Careful here, we don't want a recursive loop
return _strptime(res, fmt, rectify=False)
def get_autoplot_context(fdict, cfg, enforce_optional=False, **kwargs):
"""Get the variables out of a dict of strings
This helper for IEM autoplot gets values out of a dictionary of strings,
as provided by CGI. It does some magic to get types right, defaults right
and so on. The typical way this is called
ctx = iemutils.get_context(fdict, get_description())
Args:
fdict (dictionary): what was likely provided by `cgi.FieldStorage()`
cfg (dictionary): autoplot value of get_description
enforce_optional (bool,optional): Should the `optional` flag be enforced
rectify_dates (bool,optional): Attempt to fix common date errors like
June 31. Default `False`.
Returns:
dictionary of variable names and values, with proper types!
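Example (a minimal sketch with a made-up autoplot configuration)::

    cfg = {
        "arguments": [
            dict(type="zstation", name="zstation", default="DSM",
                 network="IA_ASOS", label="Station"),
            dict(type="date", name="sdate", default="2024/01/01",
                 label="Start Date"),
        ],
    }
    ctx = get_autoplot_context(
        {"zstation": "AMW", "network": "IA_ASOS", "sdate": "2024-06-01"},
        cfg,
    )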
"""
import matplotlib
from pyiem.reference import state_names
def _float(val):
"""Convert string to float, if possible."""
try:
return float(val)
except ValueError as exp:
raise IncompleteWebRequest(f"Invalid float value: {val}") from exp
ctx = {}
# Check for DPI setting
val = fdict.get("dpi")
if val is not None:
ctx["dpi"] = int(val)
# Check for internal params
for key in filter(lambda x: x.startswith("_"), fdict.keys()):
ctx[key] = html_escape(fdict[key])
# Check over autoplot provided arguments
for opt in cfg.get("arguments", []):
name = opt.get("name")
default = opt.get("default")
typ = opt.get("type")
minval = opt.get("min")
maxval = opt.get("max")
optional = opt.get("optional", False)
value = fdict.get(name)
# vtec_ps is special since we have special logic to get its value
if (
optional
and typ != "vtec_ps"
and (
value is None
or (enforce_optional and fdict.get(f"_opt_{name}") != "on")
)
):
continue
if typ == "text":
pattern = opt.get("pattern")
if pattern is not None and not re.match(pattern, value):
value = default
elif typ in ["station", "zstation", "sid", "networkselect"]:
# A bit of hackery here if we have a name ending in a number
_n = name[-1] if name[-1] in ["1", "2", "3", "4", "5"] else ""
netname = f"network{_n}"
# The network variable tags along and within a non-PHP context,
# this variable is unset, so we do some more hackery here
ctx[netname] = fdict.get(netname, opt.get("network"))
# Convenience: load up the network metadata
ntname = f"_nt{_n}"
ctx[ntname] = NetworkTable(ctx[netname], only_online=False)
# stations starting with _ are virtual and should not error
if value is None:
value = default
if not value.startswith("_") and value not in ctx[ntname].sts:
# HACK for three/four char ugliness
if ctx[netname] == "WFO" and value in WFO_FOURCHAR:
value = f"P{value}"
elif ctx[netname] == "WFO" and value in ["JSJ", "SJU"]:
value = "TJSJ"
else:
raise UnknownStationException("Unknown station provided.")
# A helper to remove downstream boilerplate
sname = ctx[ntname].sts.get(value, {"name": f"(({value}))"})[
"name"
]
ctx[f"_sname{_n}"] = f"[{value}] {sname}"
elif typ == "cmap":
# Ensure that our value is a valid colormap known to matplotlib
if value not in matplotlib.colormaps:
value = default
elif typ in ["int", "month", "zhour", "hour", "day", "year"]:
if value is not None:
value = int(_float(value))
if default is not None:
default = int(_float(default))
elif typ == "float":
if value is not None:
value = _float(value)
if default is not None:
default = _float(default)
elif typ == "state":
if value is not None:
value = value.upper()
if value not in state_names and default is not None:
value = default
elif typ == "select":
options = opt.get("options", {})
# Allow for legacy variable aliases
alias = opt.get("alias", {})
# in case of multi, value could be a list
if value is None:
value = default
elif isinstance(value, str):
if value in alias:
value = alias[value]
if value not in options:
value = default
if opt.get("multiple"):
value = [value]
else:
newvalue = []
for subval in value:
if subval in alias:
subval = alias[subval]
if subval in options:
newvalue.append(subval)
value = newvalue
elif typ == "datetime":
# tricky here, php has YYYY/mm/dd and CGI has YYYY-mm-dd
if value is not None and value.strip() == "":
value = default
if default is not None:
default = _strptime(default, "%Y/%m/%d %H%M")
if minval is not None:
minval = _strptime(minval, "%Y/%m/%d %H%M")
if maxval is not None:
maxval = _strptime(maxval, "%Y/%m/%d %H%M")
if value is not None:
# A common problem is for the space to be missing
if value.find(" ") == -1:
if len(value) == 14:
value = f"{value[:10]} {value[10:]}"
else:
value += " 0000"
value = _strptime(
value[:15].replace("/", "-"),
"%Y-%m-%d %H%M",
rectify=kwargs.get("rectify_dates", False),
)
elif typ == "sday":
# supports legacy uris with yyyy-mm-dd, before migration to sday
if default is not None:
default = _strptime(f"2000{default}", "%Y%m%d").date()
if minval is not None:
minval = _strptime(f"2000{minval}", "%Y%m%d").date()
if maxval is not None:
maxval = _strptime(f"2000{maxval}", "%Y%m%d").date()
if value is not None:
if value.find("-") > -1:
value = _strptime(value, "%Y-%m-%d").date()
else:
value = _strptime(f"2000{value}", "%Y%m%d").date()
elif typ == "date":
# tricky here, php has YYYY/mm/dd and CGI has YYYY-mm-dd
if default is not None:
default = _strptime(default, "%Y/%m/%d").date()
if minval is not None:
minval = _strptime(minval, "%Y/%m/%d").date()
if maxval is not None:
maxval = _strptime(maxval, "%Y/%m/%d").date()
if value is not None:
value = _strptime(
value,
"%Y-%m-%d",
rectify=kwargs.get("rectify_dates", False),
).date()
elif typ == "vtec_ps":
# VTEC phenomena and significance
defaults = {}
# Only set a default value when the field is not optional
if default is not None and not optional:
tokens = default.split(".")
if (
len(tokens) == 2
and len(tokens[0]) == 2
and len(tokens[1]) == 1
):
defaults["phenomena"] = tokens[0]
defaults["significance"] = tokens[1]
for label in ["phenomena", "significance"]:
label2 = label + name
ctx[label2] = fdict.get(label2, defaults.get(label))
# Prevent empty strings from being set
if ctx[label2] is not None and ctx[label2] == "":
ctx[label2] = defaults.get(label)
continue
elif typ == "dat":
# Damage Assessment Toolkit
ctx["datglobalid"] = fdict.get("datglobalid")
# validation
if minval is not None and value is not None and value < minval:
value = default
if maxval is not None and value is not None and value > maxval:
value = default
ctx[name] = value if value is not None else default
# Ensure defaults are set, if they exist
for key in cfg.get("defaults", {}):
if key not in ctx:
ctx[key] = cfg["defaults"][key]
return ctx
def exponential_backoff(func, *args, **kwargs):
"""Exponentially backoff some function until it stops erroring
Args:
_ebfactor (int,optional): Optional scale factor, allowing for faster tests
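Example (retry a flaky call up to five times; the filename is made up)::

    nc = exponential_backoff(ncopen, "/tmp/example.nc", timeout=30)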
"""
ebfactor = float(kwargs.pop("_ebfactor", 5))
msgs = []
for i in range(5):
try:
return func(*args, **kwargs)
except Exception as exp:
msgs.append(f"{i + 1}/5 {func.__name__} traceback: {exp}")
time.sleep(ebfactor * (random.randint(0, 1000) / 1000))
logging.error("%s failure", func.__name__)
logging.error("\n".join(msgs))
return None
def delete_property(name, cursor=None):
"""Delete a property from the database.
Args:
name (str): The name of the property to delete
cursor (psycopg2.cursor): Optional database cursor to use
"""
if cursor is None:
pgconn, _cursor = database.get_dbconnc("mesosite")
else:
_cursor = cursor
_cursor.execute("DELETE from properties WHERE propname = %s", (name,))
if cursor is None:
pgconn.commit()
pgconn.close()
def get_properties(cursor=None):
"""Fetch the properties set
Returns:
dict: a dictionary of property names and values (both str)
"""
if cursor is None:
pgconn, _cursor = database.get_dbconnc("mesosite")
else:
_cursor = cursor
_cursor.execute("SELECT propname, propvalue from properties")
res = {}
for row in _cursor:
res[row["propname"]] = row["propvalue"]
if cursor is None:
pgconn.close()
return res
def set_property(name, value, cursor=None):
"""
Set a property value in the database.
Args:
name (str): The name of the property to set
value (str,datetime): The value to set
cursor (psycopg2.cursor): Optional database cursor to use
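Example (the property name is made up)::

    set_property("iem.example_flag", utc())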
"""
from pyiem.reference import ISO8601
if cursor is None:
pgconn, _cursor = database.get_dbconnc("mesosite")
else:
_cursor = cursor
# auto convert datetime to ISO8601 string
if isinstance(value, datetime):
value = value.strftime(ISO8601)
_cursor.execute(
"UPDATE properties SET propvalue = %s WHERE propname = %s",
(value, name),
)
if _cursor.rowcount == 0:
_cursor.execute(
"INSERT into properties (propname, propvalue) VALUES (%s, %s)",
(name, value),
)
if cursor is None:
pgconn.commit()
pgconn.close()
def drct2text(drct):
"""Convert an degree value to text representation of direction.
Args:
drct (int or float): Value in degrees to convert to text
Returns:
str: String representation of the direction, could be `None`
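Example::

    drct2text(360)  # "N"
    drct2text(92)   # "E"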
"""
if drct is None:
return None
# Convert the value into a float
drct = float(drct)
if drct > 360 or drct < 0:
return None
text = None
if drct >= 350 or drct < 13:
text = "N"
elif drct < 35:
text = "NNE"
elif drct < 57:
text = "NE"
elif drct < 80:
text = "ENE"
elif drct < 102:
text = "E"
elif drct < 127:
text = "ESE"
elif drct < 143:
text = "SE"
elif drct < 166:
text = "SSE"
elif drct < 190:
text = "S"
elif drct < 215:
text = "SSW"
elif drct < 237:
text = "SW"
elif drct < 260:
text = "WSW"
elif drct < 281:
text = "W"
elif drct < 304:
text = "WNW"
elif drct < 324:
text = "NW"
else:
text = "NNW"
return text
def grid_bounds(lons, lats, bounds):
"""Figure out indices that we can truncate big grid
Args:
lons (np.array): grid lons
lats (np.array): grid lats
bounds (list): [x0, y0, x1, y1]
Returns:
[x0, y0, x1, y1]
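Example (a simple 1-D lat/lon grid)::

    lons = np.arange(-104.0, -90.0, 0.01)
    lats = np.arange(40.0, 45.0, 0.01)
    (x0, y0, x1, y1) = grid_bounds(lons, lats, [-96.5, 42.1, -94.5, 43.4])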
"""
if len(lons.shape) == 1:
# Do 1-d work
(x0, x1) = np.digitize([bounds[0], bounds[2]], lons)
(y0, y1) = np.digitize([bounds[1], bounds[3]], lats)
szx = len(lons)
szy = len(lats)
else:
# Do 2-d work
diff = ((lons - bounds[0]) ** 2 + (lats - bounds[1]) ** 2) ** 0.5
(lly, llx) = np.unravel_index(np.argmin(diff), lons.shape)
diff = ((lons - bounds[2]) ** 2 + (lats - bounds[3]) ** 2) ** 0.5
(ury, urx) = np.unravel_index(np.argmin(diff), lons.shape)
diff = ((lons - bounds[0]) ** 2 + (lats - bounds[3]) ** 2) ** 0.5
(uly, ulx) = np.unravel_index(np.argmin(diff), lons.shape)
diff = ((lons - bounds[2]) ** 2 + (lats - bounds[1]) ** 2) ** 0.5
(lry, lrx) = np.unravel_index(np.argmin(diff), lons.shape)
x0 = min([llx, ulx])
x1 = max([lrx, urx])
y0 = min([lry, lly])
y1 = max([uly, ury])
(szy, szx) = lons.shape
return [
int(i)
for i in [max([0, x0]), max([0, y0]), min([szx, x1]), min([szy, y1])]
]
@contextmanager
def archive_fetch(
partialpath: str,
localdir: str = "/mesonet/ARCHIVE/data",
method: str = "get",
):
"""
Helper to fetch a file from the archive, by first looking at the filesystem
and then going to the website. This returns a filename. If a temporary
file is created, it is deleted after the context manager exits.
Args:
partialpath (str): Typically a path that starts with /YYYY/mm/dd.
localdir (str): Local directory to check first, default `/mesonet/ARCHIVE/data`.
method (str): HTTP method to use, default 'get', in the case of head,
we only check the existence and return an empty string if found.
Returns:
str: filename of the file found and available for use
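Example (the archive path shown is illustrative)::

    with archive_fetch("2024/06/01/GIS/uscomp/n0q_0000.png") as fn:
        if fn is not None:
            print(fn)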
"""
# Ensure partialpath does not start with a /
if partialpath.startswith("/"):
partialpath = partialpath[1:]
localfn = os.path.join(localdir, partialpath)
if os.path.isfile(localfn):
yield localfn
return
url = f"http://mesonet.agron.iastate.edu/archive/data/{partialpath}"
tmp = None
suffix = "." + os.path.basename(partialpath).split(".")[-1]
try:
resp = httpx.request(method.upper(), url, timeout=30)
resp.raise_for_status()
if method.lower() == "head":
yield ""
return
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
tmp.write(resp.content)
yield tmp.name
return
except Exception as exp:
LOG.info("archive_fetch(%s) failed: %s", url, exp)
yield None
finally:
if tmp is not None:
os.unlink(tmp.name)
def __getattr__(name: str):
"""Shim to allow deprecation notices to start."""
if name.startswith("get_dbconn") or name == "get_sqlalchemy_conn":
warnings.warn(
f"{name} has been moved to pyiem.database.{name}, removed in 1.26",
DeprecationWarning,
stacklevel=2,
)
return getattr(database, name)
raise AttributeError(f"module {__name__} has no attribute {name}")