# -*- coding: utf-8 -*-
# pylint: disable=unbalanced-tuple-unpacking
"""Utility functions for pyIEM package
This module contains utility functions used by various parts of the codebase.
"""
import logging
import os
import random
import re
import subprocess
import sys
import tempfile
import time
import warnings
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from html import escape
# !important
# Lazy import everything possible here, since this module is imported by
# most everything.
import httpx
import numpy as np # used too many places
# NB: careful with circular imports!
from pyiem import database
# Matches the LDM sequence number line of a NOAAPort product, e.g. "000 "
SEQNUM = re.compile(r"^[0-9]{3}\s?$")
# Setup a default logging instance for this module
LOG = logging.getLogger("pyiem")
LOG.addHandler(logging.NullHandler())
# WFOs that use four character station identifiers
WFO_FOURCHAR = ["AFC", "AFG", "AJK", "GUM", "HFO"]
[docs]
def get_autoplot_context(fdict, cfg, enforce_optional=False, **kwargs):
    """Consider migrating to ``pyiem.autoplot.get_autoplot_context``."""
    # Deprecated shim: delegate to the canonical implementation
    from .autoplot import get_autoplot_context as _impl

    return _impl(fdict, cfg, enforce_optional=enforce_optional, **kwargs)
[docs]
def ddhhmm2datetime(ddhhmm: str, utcnow: datetime) -> datetime:
    """Do the nasty WMO header timestamp conversion.

    Args:
        ddhhmm (str): six digit day-of-month, hour, minute string.
        utcnow (datetime): reference timestamp providing year/month context.
    Returns:
        datetime: the WMO header time resolved against ``utcnow``.
    """
    day = int(ddhhmm[:2])
    valid = utcnow.replace(
        hour=int(ddhhmm[2:4]),
        minute=int(ddhhmm[4:]),
        second=0,
        microsecond=0,
    )
    if day == utcnow.day:
        return valid
    # Near a month boundary, nudge into the right month first so that
    # replace(day=...) lands on a valid day-of-month
    if day > 25 and utcnow.day < 15:  # previous month
        valid -= timedelta(days=10)
    elif day < 5 and utcnow.day >= 15:  # next month
        valid += timedelta(days=10)
    return valid.replace(day=day)
[docs]
def web2ldm(url, ldm_product_name, md5_from_name=False, pqinsert="pqinsert"):
    """Download a URL and insert into LDM.

    Implements a common IEM workflow whereby a web resource is downloaded,
    saved to a temporary file, and then inserted into LDM.

    Args:
        url (str): Web resource to download.
        ldm_product_name (str): LDM product ID to use when inserting.
        md5_from_name (bool): Should `pqinsert -i` be used, which causes LDM
          to compute the MD5 value from the product name instead of data
          bytes.
        pqinsert (str): pqinsert command.
    Returns:
        bool - success of this workflow.
    """
    resp = httpx.get(url, timeout=60)
    if resp.status_code != 200:
        return False
    with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmp:
        tmp.write(resp.content)
    args = [pqinsert, "-p", ldm_product_name, tmp.name]
    if md5_from_name:
        args.insert(1, "-i")
    try:
        # subprocess.run collects both pipes concurrently, avoiding the
        # deadlock possible when reading stderr while stdout's buffer fills
        proc = subprocess.run(args, capture_output=True, check=False)
        res = True
        if proc.stderr != b"":
            LOG.info("pqinsert stderr result %s", proc.stderr)
            res = False
    except FileNotFoundError:
        LOG.info("Failed to find `pqinsert` in $PATH")
        res = False
    finally:
        # always clean up the temporary file, even on unexpected failure
        os.unlink(tmp.name)
    return res
[docs]
def load_geodf(dataname: str, epsg: int = 4326):
    """Load a given bundled GeoDataFrame.

    Args:
        dataname (str): The name of the dataset name to load.
        epsg (int): The projection EPSG code, default 4326.
    Returns:
        GeoDataFrame
    """
    import geopandas as gpd

    # Prefer a pre-projected copy staged on local disk, when present
    cached = (
        f"/opt/miniconda3/pyiem_data/parquet/{epsg}/geodf/{dataname}.parquet"
    )
    if os.path.isfile(cached):
        datafn = cached
    else:
        # Fall back to the copy bundled with the package
        datafn = os.path.join(
            os.path.dirname(__file__), "data", "geodf", f"{dataname}.parquet"
        )
        if not os.path.isfile(datafn):
            LOG.info("load_geodf(%s) failed, file is missing!", datafn)
            return gpd.GeoDataFrame()
    return gpd.read_parquet(datafn)
[docs]
def convert_value(val, units_in, units_out):
    """DRY Helper to return magnitude of a metpy unit conversion.

    Args:
        val (mixed): something with values.
        units_in (str): What units those values have.
        units_out (str): What values we want with given magnitude.
    Returns:
        mixed: magnitude of val with unit conversion applied
    """
    from metpy.units import masked_array, units

    quantity = masked_array(val, units(units_in))
    return quantity.to(units(units_out)).m
[docs]
def c2f(val):
    """Helper to return magnitude of Celsius to Fahrenheit conversion.

    Args:
        val (mixed): something with values in C
    Returns:
        val: something with values in F
    """
    return convert_value(val, "degC", "degF")
[docs]
def mm2inch(val):
    """Helper to return magnitude of millimeters to inch conversion.

    Args:
        val (mixed): something with values in mm
    Returns:
        val: something with values in inch
    """
    return convert_value(val, "mm", "inch")
[docs]
def html_escape(val):
    """Wrapper around cgi.escape deprecation.

    Args:
        val (str): the string to HTML-escape
    Returns:
        str: escaped text safe for HTML embedding
    """
    return escape(val)
[docs]
def get_test_filepath(name: str) -> str:
    """Helper to get a testing filename, full path."""
    # test fixtures live relative to the current working directory
    basedir = os.getcwd()
    return f"{basedir}/data/product_examples/{name}"
[docs]
def get_test_file(name):
    """Helper to get data for test usage."""
    filepath = get_test_filepath(name)
    with open(filepath, "rb") as fh:
        payload = fh.read()
    return payload.decode("utf-8")
[docs]
def logger(name="pyiem", level=None):
    """Get pyiem's logger with a stream handler attached.

    Args:
        name (str): The name of the logger to get, default pyiem
        level (logging.LEVEL): The log level for this pyiem logger, default is
          WARNING for non interactive sessions, INFO otherwise
    Returns:
        logger instance
    """
    handler = logging.StreamHandler()
    handler.setFormatter(CustomFormatter())
    mylog = logging.getLogger(name)
    mylog.addHandler(handler)
    # Interactive (TTY) sessions default to INFO, everything else WARNING
    if level is None:
        level = logging.INFO if sys.stdout.isatty() else logging.WARNING
    mylog.setLevel(level)
    return mylog
[docs]
def find_ij(lons, lats, lon, lat):
    """Compute the i,j closest cell.

    Args:
        lons (np.array): grid of longitudes
        lats (np.array): grid of latitudes
        lon (float): longitude of interest
        lat (float): latitude of interest
    Returns:
        tuple of indices into the grid for the nearest cell
    """
    # Euclidean distance in degree space suffices for nearest-cell lookup
    err = np.hypot(lons - lon, lats - lat)
    return np.unravel_index(err.argmin(), err.shape)
[docs]
def ssw(mixedobj):
    """python23 wrapper for sys.stdout.write

    Args:
        mixedobj (str or bytes): what content we want to send
    """
    # Write raw bytes to the underlying buffer when one exists
    out = getattr(sys.stdout, "buffer", sys.stdout)
    if isinstance(mixedobj, str):
        payload = mixedobj.encode("utf-8")
    else:
        payload = mixedobj
    out.write(payload)
[docs]
def ncopen(ncfn, mode="r", timeout=60, _sleep=5):
    """Safely open netcdf files

    The issue here is that we can only have the following situation for a
    given NetCDF file.
    1. Only 1 or more readers
    2. Only 1 appender

    The netcdf is being accessed over NFS and perhaps local disk, so writing
    lock files is problematic.

    Args:
        ncfn (str): The netCDF filename
        mode (str,optional): The netCDF4.Dataset open mode, default 'r'
        timeout (int): The total time in seconds to attempt a read, default 60
        _sleep (int): seconds to wait between open attempts
    Returns:
        `netCDF4.Dataset` or `None`
    """
    import netCDF4

    if mode != "w" and not os.path.isfile(ncfn):
        raise IOError(f"No such file {ncfn}")
    deadline = datetime.now(timezone.utc) + timedelta(seconds=timeout)
    nc = None
    while datetime.now(timezone.utc) < deadline:
        try:
            nc = netCDF4.Dataset(ncfn, mode)
            nc.set_auto_scale(True)
            break
        except OSError as exp:  # NB: IOError is an alias of OSError
            LOG.debug(exp)
            time.sleep(_sleep)
    return nc
[docs]
def utc(year=None, month=1, day=1, hour=0, minute=0, second=0, microsecond=0):
    """Create a datetime instance with tzinfo=timezone.utc

    When no arguments are provided, returns `datetime.now(timezone.utc)`.

    Returns:
        datetime with tzinfo set
    """
    if year is None:
        return datetime.now(timezone.utc)
    return datetime(
        year,
        month,
        day,
        hour,
        minute,
        second,
        microsecond,
        tzinfo=timezone.utc,
    )
[docs]
def noaaport_text(text):
    """Make whatever text look like it is NOAAPort Pristine

    Args:
        text (string): the inbound text
    Returns:
        text that looks noaaportish
    """
    # Remove any stray control characters and carriage returns
    cleaned = re.sub("[\x01\x03\r]", "", text)
    # Trim right-hand whitespace from every line
    lines = [line.rstrip() for line in cleaned.split("\n")]
    # Drop leading and trailing blank lines
    for pos in (0, -1):
        while lines and lines[pos].strip() == "":
            lines.pop(pos)
    # Line 0 should be the start-of-product control character
    lines.insert(0, "\001")
    # Line 1 should be the LDM sequence number, 4 chars
    if re.match(r"^[0-9]{3}\s?$", lines[1]) is None:
        if len(lines[1]) > 5:
            lines.insert(1, "000 ")
        else:
            lines[1] = f"{lines[1][:3]} "
    # The last line should be the control-c, by itself
    lines.append("\003")
    return "\r\r\n".join(lines)
[docs]
def exponential_backoff(func, *args, **kwargs):
    """Exponentially backoff some function until it stops erroring

    Calls ``func`` up to five times, sleeping a random amount of time
    between failed attempts.  Returns ``None`` when every attempt fails.

    Args:
        func (callable): what to call, remaining args/kwargs are passed on
        _ebfactor (int,optional): Optional scale factor, allowing for faster
          tests
    Returns:
        whatever ``func`` returns, or ``None`` on total failure
    """
    ebfactor = float(kwargs.pop("_ebfactor", 5))
    msgs = []
    for attempt in range(5):
        try:
            return func(*args, **kwargs)
        except Exception as exp:
            msgs.append(f"{attempt + 1}/5 {func.__name__} traceback: {exp}")
            time.sleep(ebfactor * (random.randint(0, 1000) / 1000))
    # use the module logger for consistency with the rest of this module
    LOG.error("%s failure", func.__name__)
    LOG.error("\n".join(msgs))
    return None
[docs]
def delete_property(name, cursor=None):
    """Delete a property from the database.

    Args:
        name (str): The name of the property to delete
        cursor (psycopg2.cursor): Optional database cursor to use
    """
    # When no cursor is supplied, manage our own connection lifecycle
    owned = cursor is None
    if owned:
        pgconn, cursor = database.get_dbconnc("mesosite")
    cursor.execute("DELETE from properties WHERE propname = %s", (name,))
    if owned:
        pgconn.commit()
        pgconn.close()
[docs]
def get_properties(cursor=None):
    """Fetch the properties set

    Args:
        cursor (psycopg2.cursor): Optional database cursor to use
    Returns:
        dict: a dictionary of property names and values (both str)
    """
    # When no cursor is supplied, manage our own connection lifecycle
    owned = cursor is None
    if owned:
        pgconn, cursor = database.get_dbconnc("mesosite")
    cursor.execute("SELECT propname, propvalue from properties")
    props = {row["propname"]: row["propvalue"] for row in cursor}
    if owned:
        pgconn.close()
    return props
[docs]
def set_property(name, value, cursor=None):
    """
    Set a property value in the database.

    Args:
        name (str): The name of the property to set
        value (str,datetime): The value to set
        cursor (psycopg2.cursor): Optional database cursor to use
    """
    from pyiem.reference import ISO8601

    # When no cursor is supplied, manage our own connection lifecycle
    owned = cursor is None
    if owned:
        pgconn, cursor = database.get_dbconnc("mesosite")
    # auto convert datetime to ISO8601 string
    if isinstance(value, datetime):
        value = value.strftime(ISO8601)
    # Upsert: try an update first, insert when the property does not exist
    cursor.execute(
        "UPDATE properties SET propvalue = %s WHERE propname = %s",
        (value, name),
    )
    if cursor.rowcount == 0:
        cursor.execute(
            "INSERT into properties (propname, propvalue) VALUES (%s, %s)",
            (name, value),
        )
    if owned:
        pgconn.commit()
        pgconn.close()
[docs]
def drct2text(drct):
    """Convert an degree value to text representation of direction.

    Args:
        drct (int or float): Value in degrees to convert to text
    Returns:
        str: String representation of the direction, could be `None`
    """
    if drct is None:
        return None
    # Convert the value into a float
    drct = float(drct)
    if drct < 0 or drct > 360:
        return None
    # The top of the compass wraps back around to north
    if drct >= 350:
        return "N"
    # Upper bound (exclusive) for each compass label, scanned in order
    table = (
        (13, "N"),
        (35, "NNE"),
        (57, "NE"),
        (80, "ENE"),
        (102, "E"),
        (127, "ESE"),
        (143, "SE"),
        (166, "SSE"),
        (190, "S"),
        (215, "SSW"),
        (237, "SW"),
        (260, "WSW"),
        (281, "W"),
        (304, "WNW"),
        (324, "NW"),
        (350, "NNW"),
    )
    for ceiling, label in table:
        if drct < ceiling:
            return label
    return None  # unreachable: drct is < 350 here
[docs]
def grid_bounds(lons, lats, bounds):
    """Figure out indices that we can truncate big grid

    Args:
        lons (np.array): grid lons
        lats (np.array): grid lats
        bounds (list): [x0, y0, x1, y1]
    Returns:
        [x0, y0, x1, y1]
    """
    west, south, east, north = bounds
    if lons.ndim == 1:
        # 1-d coordinate axes: simple bin placement
        (x0, x1) = np.digitize([west, east], lons)
        (y0, y1) = np.digitize([south, north], lats)
        szx = len(lons)
        szy = len(lats)
    else:
        # 2-d curvilinear grid: locate the cells nearest the four corners
        def _nearest(lon, lat):
            dist = np.hypot(lons - lon, lats - lat)
            return np.unravel_index(np.argmin(dist), lons.shape)

        (lly, llx) = _nearest(west, south)
        (ury, urx) = _nearest(east, north)
        (uly, ulx) = _nearest(west, north)
        (lry, lrx) = _nearest(east, south)
        x0 = min(llx, ulx)
        x1 = max(lrx, urx)
        y0 = min(lry, lly)
        y1 = max(uly, ury)
        (szy, szx) = lons.shape
    # clamp to the grid extent and return plain ints
    return [
        int(val)
        for val in (max(0, x0), max(0, y0), min(szx, x1), min(szy, y1))
    ]
[docs]
@contextmanager
def archive_fetch(
    partialpath: str,
    localdir: str = "/mesonet/ARCHIVE/data",
    method: str = "get",
):
    """
    Helper to fetch a file from the archive, by first looking at the filesystem
    and then going to the website. This returns a filename. If a temporary
    file is created, it is deleted after the context manager exits.

    Args:
        partialpath (str): Typically a path that starts with /YYYY/mm/dd
        localdir (str): Local directory to check first.
        method (str): HTTP method to use, default 'get', in the case of head,
          we only check the existence and return an empty string if found.
    Returns:
        str: filename of the file found and available for use
    """
    # Ensure partialpath does not start with a /
    if partialpath.startswith("/"):
        partialpath = partialpath[1:]
    localfn = os.path.join(localdir, partialpath)
    if os.path.isfile(localfn):
        yield localfn
        return
    url = f"http://mesonet.agron.iastate.edu/archive/data/{partialpath}"
    suffix = "." + os.path.basename(partialpath).split(".")[-1]
    # NB: Keep the try narrow (no yield inside).  Wrapping the yield in the
    # except block would intercept exceptions raised by the caller's body
    # and then yield a second time, raising RuntimeError from the
    # generator protocol.
    tmp = None
    try:
        resp = httpx.request(method.upper(), url, timeout=30)
        resp.raise_for_status()
        if method.lower() != "head":
            with tempfile.NamedTemporaryFile(
                delete=False, suffix=suffix
            ) as tmp:
                tmp.write(resp.content)
    except Exception as exp:
        LOG.info("archive_fetch(%s) failed: %s", url, exp)
        yield None
        return
    if method.lower() == "head":
        # existence confirmed, nothing to hand back
        yield ""
        return
    try:
        yield tmp.name
    finally:
        os.unlink(tmp.name)
[docs]
def __getattr__(name: str):
    """Shim to allow deprecation notices to start."""
    # These helpers moved into pyiem.database; warn, then delegate
    moved = name == "get_sqlalchemy_conn" or name.startswith("get_dbconn")
    if moved:
        warnings.warn(
            f"{name} has been moved to pyiem.database.{name}, removed in 1.26",
            DeprecationWarning,
            stacklevel=2,
        )
        return getattr(database, name)
    raise AttributeError(f"module {__name__} has no attribute {name}")