Source code for pyiem.nws.products.hml

"""NWS Hydrological Markup Language

Attempt to break up the HML product into atomic data

"""

import re
from datetime import datetime, timezone

import defusedxml.ElementTree as ET
import pandas as pd

from pyiem.nws.product import TextProduct
from pyiem.util import LOG

# Regular expression matching the XML declaration that opens each embedded
# document; HML.parsing() splits the product text on it.
DELIMITER = r"""\<\?xml version="1.0" standalone="yes"\?\>"""


def no999(val):
    """Translate the missing-data sentinels into ``None``.

    Args:
      val: the raw value to inspect (may already be ``None``).

    Returns:
      ``None`` when ``val`` is ``None``, ``"-999"`` or ``"-9999"``;
      otherwise ``val`` itself, unchanged.
    """
    is_missing = val is None or val in ("-999", "-9999")
    return None if is_missing else val
def parseUTC(s):
    """Turn an ISO-like timestamp string into a UTC-aware datetime.

    Args:
      s (str): text beginning with ``YYYY-MM-DDTHH:MM:SS``, or ``None``.

    Returns:
      datetime: the parsed time with ``tzinfo`` set to UTC, or ``None``
      when ``s`` is ``None``.
    """
    if s is None:
        return None
    # Only the first 19 characters are parsed; anything after the seconds
    # (for example an offset suffix) is discarded, not interpreted.
    naive = datetime.strptime(s[:19], "%Y-%m-%dT%H:%M:%S")
    return naive.replace(tzinfo=timezone.utc)
def parse_xml(token):
    """Convert one XML document into an :class:`HMLData` object.

    The root element supplies the station metadata.  Each ``observed`` or
    ``forecast`` child element is turned into a DataFrame with the columns
    ``name``, ``valid``, ``primary`` and ``secondary``; every other child
    element is ignored.

    Args:
      token (str): the XML text to parse.

    Returns:
      HMLData: the populated data object.

    Raises:
      KeyError: if the root element has no ``id`` or ``generationtime``
        attribute.
      AttributeError: if a ``datum`` lacks a ``valid`` or ``primary`` child.
      Exception: whatever the XML parser raises on malformed input.
    """
    root = ET.fromstring(token)
    hml = HMLData()
    hml.station = root.attrib["id"]
    hml.stationname = root.attrib.get("name")
    hml.originator = root.attrib.get("originator")
    hml.generationtime = parseUTC(root.attrib["generationtime"])
    columns = ["name", "valid", "primary", "secondary"]
    for child in root:
        if child.tag not in ("observed", "forecast"):
            continue
        rows = []
        for datum in child.findall("datum"):
            secondary = datum.find("secondary")
            rows.append(
                {
                    "name": child.tag,
                    "valid": parseUTC(datum.find("valid").text),
                    "primary": no999(datum.find("primary").text),
                    "secondary": (
                        no999(secondary.text)
                        if secondary is not None
                        else None
                    ),
                }
            )
        # Name the columns explicitly: an element without any datum children
        # would otherwise yield a column-less frame and the lookups below
        # would raise KeyError, discarding the whole document.
        df = pd.DataFrame(rows, columns=columns)
        for col in ("primary", "secondary"):
            # Non-numeric text (and None) becomes NaN
            df[col] = pd.to_numeric(df[col], errors="coerce")
        section = hml.data[child.tag]
        section["dataframe"] = df
        section["issued"] = parseUTC(child.attrib.get("issued"))
        for attr in (
            "primaryName",
            "secondaryName",
            "primaryUnits",
            "secondaryUnits",
        ):
            section[attr] = child.attrib.get(attr)
    return hml
class HMLData:
    """Plain container for what was parsed out of one XML document."""

    def __init__(self):
        """Create the object with every field unset (``None``)."""
        self.station = None
        self.stationname = None
        self.originator = None
        self.generationtime = None
        # Both sections carry the same set of entries, all starting as None;
        # each section gets its own independent dict.
        section_fields = (
            "dataframe",
            "primaryUnits",
            "issued",
            "secondaryUnits",
            "primaryName",
            "secondaryName",
        )
        self.data = {
            section: dict.fromkeys(section_fields)
            for section in ("observed", "forecast")
        }
class HML(TextProduct):
    """Parser and container for a Hydrological Markup Language product.

    One text product may hold several XML documents.  Each document that
    parses successfully is stored as an :class:`HMLData` entry in
    ``self.data``; failures are recorded in ``self.warnings``.
    """

    def __init__(
        self, text, utcnow=None, ugc_provider=None, nwsli_provider=None
    ):
        """Build the product and immediately parse its XML content.

        Args:
          text (str): the raw product text.
          utcnow: forwarded to ``TextProduct``.
          ugc_provider: forwarded to ``TextProduct``.
          nwsli_provider: forwarded to ``TextProduct``.
        """
        super().__init__(
            text,
            utcnow=utcnow,
            ugc_provider=ugc_provider,
            nwsli_provider=nwsli_provider,
        )
        self.data = []
        self.parsing()

    def do_sql_observed(self, cursor, _hml):
        """Persist the observed portion of one site to the database.

        For each of the primary and secondary variables that has a name and
        at least one non-null value, any stored rows inside the time span of
        the new data are deleted and the new values inserted.

        Args:
          cursor: database cursor whose ``fetchone()`` rows can be indexed
            by column name.
          _hml (HMLData): the parsed site data.
        """
        ob = _hml.data["observed"]
        df = ob["dataframe"]
        if df is None or df.empty:
            return
        for col in ("primary", "secondary"):
            name = ob[f"{col}Name"]
            if name is None:
                continue
            units = ob[f"{col}Units"]
            key = f"{name}[{units}]"
            # Check that we have some non-null data
            df2 = df[pd.notnull(df[col])]
            if df2.empty:
                continue
            cursor.execute(
                """
                DELETE from hml_observed_data WHERE
                station = %s and valid >= %s and valid <= %s and
                key = get_hml_observed_key(%s)
                """,
                (_hml.station, df2["valid"].min(), df2["valid"].max(), key),
            )
            for _, row in df2.iterrows():
                # df2 only holds non-null values, so no per-row null check
                val = row[col]
                cursor.execute(
                    "INSERT into hml_observed_data (station, valid, key, "
                    "value) VALUES (%s, %s, get_hml_observed_key(%s), %s) "
                    "RETURNING key",
                    (_hml.station, row["valid"], key, val),
                )
                if cursor.fetchone()["key"] is not None:
                    continue
                # The label is not known yet, so the row was stored with a
                # null key.  Delete the bad row
                cursor.execute(
                    "DELETE from hml_observed_data WHERE station = %s and "
                    "valid = %s and key is null",
                    (_hml.station, row["valid"]),
                )
                # Need to create a new unit!
                cursor.execute(
                    "INSERT into hml_observed_keys(id, label) VALUES ("
                    "(SELECT coalesce(max(id) + 1, 0) from hml_observed_keys),"
                    "%s) RETURNING id",
                    (key,),
                )
                newid = cursor.fetchone()["id"]
                LOG.warning("Created key %s for %s", newid, key)
                # Store the observation again under the new key; previously
                # the value that triggered key creation was silently dropped.
                cursor.execute(
                    "INSERT into hml_observed_data (station, valid, key, "
                    "value) VALUES (%s, %s, %s, %s)",
                    (_hml.station, row["valid"], newid, val),
                )

    def do_sql_forecast(self, cursor, _hml):
        """Persist the forecast portion of one site to the database.

        A header row is inserted into ``hml_forecast`` and its id is used
        for the per-time rows written to the yearly
        ``hml_forecast_data_<year>`` table, where the year comes from the
        forecast's issued time.

        Args:
          cursor: database cursor whose ``fetchone()`` rows can be indexed
            by column name.
          _hml (HMLData): the parsed site data.
        """
        fx = _hml.data["forecast"]
        df = fx["dataframe"]
        if df is None or df.empty:
            return
        issued = fx["issued"]
        if issued is None:
            # The data table is selected by the issued year, so without an
            # issued time nothing can be stored.  Bail out before the header
            # row is written instead of failing with AttributeError later.
            LOG.warning(
                "Skipping forecast for %s as it has no issued time",
                _hml.station,
            )
            return
        # Get an id
        cursor.execute(
            """
            INSERT into hml_forecast(station, generationtime, originator,
            product_id, primaryname, secondaryname, primaryunits,
            secondaryunits, issued, forecast_sts, forecast_ets)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING id
            """,
            (
                _hml.station,
                _hml.generationtime,
                _hml.originator,
                self.get_product_id(),
                fx["primaryName"],
                fx["secondaryName"],
                fx["primaryUnits"],
                fx["secondaryUnits"],
                issued,
                df["valid"].min(),
                df["valid"].max(),
            ),
        )
        fid = cursor.fetchone()["id"]
        # Table partitioning is done by issued time
        table = f"hml_forecast_data_{issued.year}"
        for _, row in df.iterrows():
            cursor.execute(
                f"INSERT into {table} (hml_forecast_id, valid, primary_value, "
                "secondary_value) VALUES (%s, %s, %s, %s)",
                (fid, row["valid"], row["primary"], row["secondary"]),
            )

    def sql(self, cursor):
        """Persist every parsed site to the database.

        Args:
          cursor: database cursor handed to the ``do_sql_*`` methods.
        """
        for _hml in self.data:
            self.do_sql_forecast(cursor, _hml)
            self.do_sql_observed(cursor, _hml)

    def parsing(self):
        """Split the product text into XML documents and parse each one.

        Chunks without a closing ``</site>`` tag are skipped.  A chunk that
        fails to parse is reported in ``self.warnings`` rather than raised,
        so one bad document does not discard the others.
        """
        for token in re.split(DELIMITER, self.unixtext):
            if "</site>" not in token:
                continue
            content = token.strip()
            try:
                self.data.append(parse_xml(content))
            except Exception as exp:
                # Deliberately broad: any failure becomes a warning
                self.warnings.append(
                    f"Parsing {self.get_product_id()} resulted in {exp}\n"
                    f"{content}"
                )

    def __str__(self):
        """Return a short summary listing each parsed site."""
        parts = [f"HML {self.get_product_id()}\n"]
        parts.extend(
            f" + SID: {_hml.station} "
            f"generationTime: {_hml.generationtime}\n"
            for _hml in self.data
        )
        return "".join(parts)
def parser(buf, utcnow=None, ugc_provider=None, nwsli_provider=None):
    """Build an :class:`HML` object from the text of a product.

    The text may contain more than one XML document.

    Args:
      buf (str): the product text to parse.
      utcnow: handed on to :class:`HML`.
      ugc_provider: handed on to :class:`HML`.
      nwsli_provider: handed on to :class:`HML`.

    Returns:
      HML: the parsed product.
    """
    return HML(
        buf,
        utcnow=utcnow,
        ugc_provider=ugc_provider,
        nwsli_provider=nwsli_provider,
    )