Source code for pyiem.nws.products.metarcollect

"""Encapsulates a text product holding METARs."""

import re
from datetime import timedelta, timezone
from typing import Tuple
from zoneinfo import ZoneInfo

from metar.Metar import Metar
from metar.Metar import ParserError as MetarParserError

from pyiem import datatypes
from pyiem.nws.product import TextProduct
from pyiem.observation import Observation
from pyiem.reference import TRACE_VALUE, TWEET_CHARS
from pyiem.util import LOG, drct2text

# Whitespace (or newline) followed by NIL; split_and_parse skips any token
# in which this matches.
NIL_RE = re.compile(r"[\s\n]NIL")
# Captures, as group ``msg``, the unparsed text quoted inside a parser error
# message; to_metar strips those groups from the text before retrying.
ERROR_RE = re.compile("Unparsed groups in body '(?P<msg>.*)' while processing")
# Searched against the raw METAR text in get_jabbers to flag a tornado
TORNADO_RE = re.compile(r" \+FC |TORNADO")
# Searched against the raw METAR text in get_jabbers to flag a funnel cloud
FUNNEL_RE = re.compile(r" FC |FUNNEL")
# Match what looks like SA formatted messages
# NOTE(review): not referenced by any code visible in this module
SA_RE = re.compile(r"^[A-Z]{3}\sSA")
# Sites we should route to Jabber
# Keyed by station_id; get_jabbers stores the last routed mtr.time as the
# value and only routes again when the time differs.  Starts out empty here.
JABBER_SITES = {}
# Keep track of Wind alerts to prevent dups
# Keys are the "station_id;sknt;time" strings built by wind_message
WIND_ALERTS = {}
# Wind speed threshold in kts for alerting
WIND_ALERT_THRESHOLD_KTS = 50.0
# Per site thresholds to govern mapping to channels
# Keyed by station_id; get_jabbers falls back to WIND_ALERT_THRESHOLD_KTS
WIND_ALERT_THRESHOLD_KTS_BY_ICAO = {}


def normalize_temp(val):
    """Collapse a temperature onto a whole number when it is nearly one.

    Args:
      val (float): the temperature value to tidy up.

    Returns:
      An ``int`` when ``val`` lies within 0.199 of ``round(val, 0)``,
      otherwise ``val`` rounded to a single decimal place.
    """
    nearest = round(val, 0)
    if abs(val - nearest) < 0.199:
        return int(nearest)
    return round(val, 1)
def normid(station_id: str) -> str:
    """Normalize a station identifier.

    Args:
      station_id (str): the identifier as found in the METAR.

    Returns:
      The identifier without its leading ``K`` when it is four characters
      long and starts with ``K``; any other identifier is returned as given.
    """
    is_four_char_k = len(station_id) == 4 and station_id.startswith("K")
    return station_id[1:] if is_four_char_k else station_id
def wind_logic(iem, mtr: Metar):
    """Copy the wind groups of a METAR into ``iem.data`` and track maxima.

    Args:
      iem: object exposing a dict-like ``data`` attribute that is updated
        in place.
      mtr (Metar): the parsed METAR; its ``peak_wind_time`` may be rewritten
        in place (see below).
    """

    def _kts_or_zero(key):
        # a missing entry and a stored None both count as zero
        return iem.data.get(key, 0) or 0

    # Straight copies of whatever wind groups were reported
    for reading, key in (
        (mtr.wind_speed, "sknt"),
        (mtr.wind_gust, "gust"),
    ):
        if reading:
            iem.data[key] = reading.value("KT")
    if mtr.wind_dir:
        iem.data["drct"] = float(mtr.wind_dir.value())
    peak_speed = mtr.wind_speed_peak
    peak_dir = mtr.wind_dir_peak
    if peak_speed:
        iem.data["peak_wind_gust"] = peak_speed.value("KT")
    if peak_dir:
        iem.data["peak_wind_drct"] = peak_dir.value()
    if mtr.peak_wind_time:
        # python-metar has an edge case for events crossing a month, so a
        # peak time later than the ob is pulled into the ob's year/month
        if mtr.peak_wind_time > mtr.time:
            mtr.peak_wind_time = mtr.peak_wind_time.replace(
                year=mtr.time.year, month=mtr.time.month
            )
        iem.data["peak_wind_time"] = mtr.peak_wind_time.replace(
            tzinfo=timezone.utc
        )

    # Does this ob establish a new max_drct?
    prior_max = max(_kts_or_zero("max_sknt"), _kts_or_zero("max_gust"))
    current_max = max(_kts_or_zero("sknt"), _kts_or_zero("gust"))
    if current_max > prior_max:
        # the sustained wind or gust is a new max, so adopt its direction
        iem.data["max_drct"] = iem.data.get("drct", 0)

    # PK WND only wins when it beats both the stored and the current winds
    # TODO: PK WND potentially could be from last hour / thus yesterday?
    if not (peak_speed and peak_dir):
        return
    peak_kts = peak_speed.value("KT")
    if peak_kts > prior_max and peak_kts > current_max:
        iem.data["max_drct"] = peak_dir.value()
        iem.data["max_gust_ts"] = mtr.peak_wind_time.replace(
            tzinfo=timezone.utc
        )
        iem.data["max_gust"] = peak_kts
def trace(pobj):
    """Turn a precipitation object into the number IEM stores for it.

    Args:
      pobj: object with a ``value("IN")`` method, or None.

    Returns:
      None when ``pobj`` is None, ``TRACE_VALUE`` when the amount in inches
      equals zero (the IEM denotation of trace), otherwise the amount.
    """
    if pobj is None:
        return None
    inches = pobj.value("IN")
    return TRACE_VALUE if inches == 0 else inches
def to_metar(textprod, text) -> Metar:
    """Create a METAR object, if possible.

    The text is sanitized and then parsed up to five times.  After a parser
    failure the groups the parser reported as unparsed are stripped from the
    text, and a "day is out of range for month" failure early in a month
    moves the assumed month back by one before the next attempt.

    Args:
      textprod: the product being processed; its ``valid`` supplies the
        month/year handed to the parser and its ``utcnow`` bounds how far
        into the future an observation may be.
      text (str): raw text of a single METAR.

    Returns:
      The parsed ``Metar`` with ``code`` set to the sanitized text, or None
      when the text is too short, cannot be parsed, lacks a station id or
      time, or carries a time that cannot be reconciled with ``utcnow``.
    """
    # Do some cleaning and whitespace trimming
    text = sanitize(text)
    if len(text) < 14:  # arb
        return None
    attempt = 1
    mtr = None
    original_text = text
    valid = textprod.valid
    while attempt < 6 and mtr is None:
        try:
            mtr = Metar(text, month=valid.month, year=valid.year)
        except MetarParserError as inst:
            tokens = ERROR_RE.findall(str(inst))
            if tokens:
                # Nothing parseable would remain, so give up
                if tokens[0] == text or text.startswith(tokens[0]):
                    return None
                # So tokens contains a series of groups that needs updated
                newtext = text
                for token in tokens[0].split():
                    newtext = newtext.replace(f" {token}", "")
                if newtext != text:
                    text = newtext
            if str(inst).find("day is out of range for month") > -1:
                # Early in the month, retry against the previous month
                if valid.day < 10:
                    valid = valid.replace(day=1) - timedelta(days=1)
        attempt += 1
    if mtr is not None:
        # Attempt to figure out more things
        if mtr.station_id is None:
            LOG.warning("Aborting due to station_id being None |%s|", text)
            return None
        if mtr.time is None:
            LOG.warning("Aborting due to time being None |%s|", text)
            return None
        # don't allow data more than an hour into the future
        ceiling = (textprod.utcnow + timedelta(hours=1)).replace(tzinfo=None)
        if mtr.time > ceiling:
            # careful, we may have obs from the previous month
            if ceiling.day < 5 and mtr.time.day > 15:
                prevmonth = ceiling - timedelta(days=10)
                try:
                    mtr.time = mtr.time.replace(
                        year=prevmonth.year, month=prevmonth.month
                    )
                except ValueError:
                    # The reported day does not exist in the previous month
                    # (e.g. day 30 seen in early March), so the timestamp
                    # cannot be salvaged.
                    LOG.warning(
                        "Aborting due to day invalid for previous month "
                        "ceiling: %s mtr.time: %s",
                        ceiling,
                        mtr.time,
                    )
                    return None
            else:
                LOG.warning(
                    "Aborting due to time in the future "
                    "ceiling: %s mtr.time: %s",
                    ceiling,
                    mtr.time,
                )
                return None
        mtr.code = original_text
    return mtr
def sanitize(text):
    """Tidy up a raw METAR string before it is handed to the parser.

    Args:
      text (str): the raw METAR text.

    Returns:
      The text with carriage returns turned into spaces, the 0xA0, 0x01 and
      0x03 bytes of its UTF-8 form replaced or dropped, whitespace runs
      collapsed to single spaces and, when the result starts with a digit,
      its first word removed.
    """
    text = re.sub("\015", " ", text)
    # Scrub the bad bytes from the UTF-8 encoded form
    raw = text.encode("utf-8", "ignore")
    for unwanted, replacement in (
        (b"\xa0", b" "),
        (b"\001", b""),
        (b"\003", b""),
    ):
        raw = raw.replace(unwanted, replacement)
    # Splitting on whitespace and rejoining collapses any repeated blanks
    words = raw.decode("utf-8", errors="ignore").split()
    text = " ".join(words)
    # A METAR should start with A-Z, so drop a leading numeric word
    if re.match("^[0-9]", text):
        text = " ".join(words[1:])
    return text
def _is_same_day(valid, tzname, hours=6): """Can we trust a six hour total?""" try: tzinfo = ZoneInfo(tzname) except Exception: return False lts = valid.astimezone(tzinfo) # TODO we should likely somehow compute this in standard time, shrug return lts.day == (lts - timedelta(hours=hours)).day
def wind_message(mtr: Metar) -> Tuple[str, int]:
    """Build the Jabber style text describing the strongest reported gust.

    The gust group is the starting point and the peak wind remark replaces
    it when its speed is greater.  Every alert is remembered in
    ``WIND_ALERTS`` so the same station/speed/time is only reported once.

    Args:
      mtr (Metar): the parsed METAR.

    Returns:
      ``(message, speed in whole knots)``, or ``(None, None)`` when this
      exact alert was already generated.
    """
    top_kts = 0
    top_drct = 0
    top_time = mtr.time.replace(tzinfo=timezone.utc)
    if mtr.wind_gust:
        top_kts = mtr.wind_gust.value("KT")
        if mtr.wind_dir:
            top_drct = mtr.wind_dir.value()
    if mtr.wind_speed_peak:
        peak_kts = mtr.wind_speed_peak.value("KT")
        peak_drct = mtr.wind_dir_peak.value()
        peak_time = mtr.peak_wind_time.replace(tzinfo=timezone.utc)
        if peak_kts > top_kts:
            top_kts, top_drct, top_time = peak_kts, peak_drct, peak_time

    # Bail out if this very alert has been produced before
    dedup_key = f"{mtr.station_id};{top_kts};{top_time}"
    if dedup_key in WIND_ALERTS:
        return None, None
    WIND_ALERTS[dedup_key] = 1

    speed = datatypes.speed(top_kts, "KT")
    knots = speed.value("KT")
    mph = speed.value("MPH")
    msg = (
        f"gust of {knots:.0f} knots "
        f"({mph:.1f} mph) from {drct2text(top_drct)} @ {top_time:%H%M}Z"
    )
    return msg, int(knots)
def over_wind_threshold(mtr: Metar) -> bool:
    """Report whether this METAR is windy enough to alert on.

    Args:
      mtr (Metar): the parsed METAR.

    Returns:
      True when either the gust or the peak wind speed, in knots, is at or
      above ``WIND_ALERT_THRESHOLD_KTS``.
    """
    gust = mtr.wind_gust
    if gust and gust.value("KT") >= WIND_ALERT_THRESHOLD_KTS:
        return True
    peak = mtr.wind_speed_peak
    return bool(peak and peak.value("KT") >= WIND_ALERT_THRESHOLD_KTS)
def to_iemaccess(
    txn,
    mtr: Metar,
    iemid: int,
    tzname: str,
    force_current_log=False,
    skip_current=False,
):
    """Copy a parsed METAR onto an Observation and save it to IEMAccess.

    Args:
      txn (psycopg.cursor): database cursor / transaction
      mtr (Metar): Metar instance
      iemid: The iem station identifier
      tzname (str): Local timezone of station.
      force_current_log (boolean): should this ob always go to current_log
      skip_current (boolean): should this ob always skip current table

    Returns:
      tuple of the Observation and the value returned by its ``save`` call.
    """
    utc_valid = mtr.time.replace(tzinfo=timezone.utc)
    iem = Observation(valid=utc_valid, iemid=iemid, tzname=tzname)
    # Pull in whatever is already stored for this exact time
    iem.load(txn)

    # For a duplicate ob, the temperatures and raw text are only replaced
    # when the incoming raw text is longer than what is stored
    stored_raw = iem.data["raw"]
    if stored_raw is None or len(stored_raw) < len(mtr.code):
        # Only temperatures inside reasonable bounds are saved
        for reading, key, lower, upper in (
            (mtr.temp, "tmpf", -90, 150),
            (mtr.dewpt, "dwpf", -150, 100),
        ):
            if not reading:
                continue
            fahrenheit = reading.value("F")
            if lower < fahrenheit < upper:
                iem.data[key] = normalize_temp(fahrenheit)
        # Database only allows len 254
        iem.data["raw"] = mtr.code[:254]
    # A correction always replaces the stored raw text
    if " COR " in mtr.code:
        iem.data["raw"] = mtr.code[:254]

    wind_logic(iem, mtr)

    # Six hour extremes, which also feed the conditional daily values when
    # the six hour window does not reach back into the previous local day
    for reading, key, cond_key in (
        (mtr.max_temp_6hr, "max_tmpf_6hr", "max_tmpf_cond"),
        (mtr.min_temp_6hr, "min_tmpf_6hr", "min_tmpf_cond"),
    ):
        if not reading:
            continue
        iem.data[key] = normalize_temp(reading.value("F"))
        if tzname and _is_same_day(iem.data["valid"], tzname):
            iem.data[cond_key] = iem.data[key]
    for reading, key in (
        (mtr.max_temp_24hr, "max_tmpf_24hr"),
        (mtr.min_temp_24hr, "min_tmpf_24hr"),
    ):
        if reading:
            iem.data[key] = normalize_temp(reading.value("F"))

    # We assume the hourly value is zero unless the METAR says otherwise
    iem.data["phour"] = 0
    for amount, key in (
        (mtr.precip_3hr, "p03i"),
        (mtr.precip_6hr, "p06i"),
        (mtr.precip_24hr, "p24i"),
        (mtr.precip_1hr, "phour"),
    ):
        if amount:
            iem.data[key] = trace(amount)

    # NOTE snowd is a summary variable that wants to be daily, the METAR
    # snowdepth is more instantaneous, so goes to current table
    for reading, key, units in (
        (mtr.snowdepth, "snowdepth", "IN"),
        (mtr.vis, "vsby", "SM"),
        (mtr.press, "alti", "IN"),
        (mtr.press_sea_level, "mslp", "MB"),
    ):
        if reading:
            iem.data[key] = reading.value(units)

    # Sanity check the sea level pressure against the altimeter
    if mtr.press_sea_level and mtr.press:
        alti = mtr.press.value("MB")
        mslp = mtr.press_sea_level.value("MB")
        if abs(alti - mslp) > 25:
            LOG.warning(
                "PRESSURE ERROR %s %s ALTI: %s MSLP: %s",
                mtr.station_id,
                iem.data["valid"],
                alti,
                mslp,
            )
            # Shift the stored value by 100 toward the altimeter
            iem.data["mslp"] += 100.0 if alti > mslp else -100.0

    # Sky coverage, one numbered pair of keys per reported layer
    for layer, (cover, height, _) in enumerate(mtr.sky, start=1):
        iem.data[f"skyc{layer}"] = cover
        if height is not None:
            iem.data[f"skyl{layer}"] = int(height.value("FT"))

    # Present weather, leaving out codes that are empty or all slashes
    if mtr.weather:
        joined = (
            "".join(part for part in wx if part is not None)
            for wx in mtr.weather
        )
        iem.data["wxcodes"] = [code for code in joined if code.strip("/")]

    # Ice Accretion
    for key in (
        "ice_accretion_1hr",
        "ice_accretion_3hr",
        "ice_accretion_6hr",
    ):
        iem.data[key] = trace(getattr(mtr, key))

    saved = iem.save(txn, force_current_log, skip_current)
    return iem, saved
class METARCollective(TextProduct):
    """
    A TextProduct containing METAR information

    The METARs that could be parsed out of the product text are kept, in
    product order, on ``self.metars``.
    """

    def __init__(
        self, text, utcnow=None, ugc_provider=None, nwsli_provider=None
    ):
        """Constructor

        Args:
          text (string): the raw string to process
          utcnow: passed through to TextProduct
          ugc_provider: passed through to TextProduct
          nwsli_provider: passed through to TextProduct; get_jabbers looks
            stations up in it by normalized station id
        """
        super().__init__(text, utcnow, ugc_provider, nwsli_provider)
        self.metars = []
        self.split_and_parse()

    def get_jabbers(self, uri, _uri2=None):
        """Make this into jabber messages

        Args:
          uri (str): prefix of the link included in alerts; the station's
            network is appended to it.
          _uri2: unused.

        Returns:
          list of ``[plain text, html, extra dict]`` entries.
        """
        jmsgs = []
        for mtr in self.metars:
            msg = None
            sknt = 0
            for weatheri in mtr.weather:
                for wx in weatheri:
                    if wx is not None and "GR" in wx:
                        msg = "Hail"
            # Tornado, funnel cloud and wind each take precedence over hail
            if TORNADO_RE.findall(mtr.code):
                msg = "Tornado"
            elif FUNNEL_RE.findall(mtr.code):
                msg = "Funnel Cloud"
            # Search for Peak wind gust info....
            elif over_wind_threshold(mtr):
                # yields (None, None) for an alert that was already sent
                msg, sknt = wind_message(mtr)

            # Sites set to always route to Jabber.
            if (
                mtr.station_id in JABBER_SITES
                and JABBER_SITES[mtr.station_id] != mtr.time
            ):
                JABBER_SITES[mtr.station_id] = mtr.time
                channels = [f"METAR.{mtr.station_id}"]
                if mtr.type == "SPECI":
                    channels.append(f"SPECI.{mtr.station_id}")
                mstr = f"{mtr.type} {mtr.code}"
                jmsgs.append([mstr, mstr, dict(channels=",".join(channels))])

            if msg is None:
                continue
            sid = normid(mtr.station_id)
            row = self.nwsli_provider.get(sid, {})
            wfo = row.get("wfo")
            if wfo is None or wfo == "":
                LOG.warning(
                    "Unknown WFO for %s, skipping alert", mtr.station_id
                )
                continue
            channels = [f"METAR.{mtr.station_id}"]
            if mtr.type == "SPECI":
                channels.append(f"SPECI.{mtr.station_id}")
            if sknt > 0:
                # Custom stuff for how this wind reports maps to channels
                if sknt >= WIND_ALERT_THRESHOLD_KTS_BY_ICAO.get(
                    mtr.station_id, WIND_ALERT_THRESHOLD_KTS
                ):
                    channels.append(wfo)
                # Thresholded channels
                for _sknt in range(
                    int(WIND_ALERT_THRESHOLD_KTS), sknt + 1, 10
                ):
                    channels.append(f"METAR.{mtr.station_id}.WIND{_sknt:.0f}")
            else:
                channels.append(wfo)
            st = row.get("state")
            nm = row.get("name")
            extra = ""
            if mtr.code.find("$") > 0:
                extra = "(Caution: Maintenance Check Indicator)"
            url = f"{uri}{row.get('network')}"
            jtxt = (
                f"{nm},{st} ({sid}) ASOS {extra} reports {msg}\n"
                f"{mtr.code} {url}"
            )
            jhtml = (
                f'<p><a href="{url}">{nm},{st}</a> ({sid}) ASOS '
                f"{extra} reports <strong>{msg}</strong>"
                f"<br/>{mtr.code}</p>"
            )
            xtra = {
                "channels": ",".join(channels),
                "lat": str(row.get("lat")),
                "long": str(row.get("lon")),
                "twitter": (
                    f"{nm},{st} ({sid}) ASOS reports {msg} -- {mtr.code}"
                )[:TWEET_CHARS],
            }
            jmsgs.append([jtxt, jhtml, xtra])
        return jmsgs

    def split_and_parse(self):
        """Create METAR objects as we find products in the text

        Each METAR that parses is appended to ``self.metars`` with its
        ``type`` set to "METAR" or "SPECI".
        """
        # unixtext is conditioned, so first line is LDM, WMO
        # the question is what is on the third line
        lines = self.unixtext.split("\n")
        if len(lines) < 3:
            # A product this short has no third line and thus no METARs;
            # indexing it below would raise IndexError.
            return
        # not METAR or SPECI, so take it
        linenum = 2 if len(lines[2].strip()) > 5 else 3
        content = "\n".join(lines[linenum:])
        # Tokenize on the '=', which splits a product with METARs
        tokens = content.split("=")
        for token in tokens:
            prefix = "METAR" if self.afos != "SPECI" else "SPECI"
            # Dump METARs that have NIL in them
            if NIL_RE.search(token):
                continue
            # Drop everything up to and including a METAR/SPECI keyword
            if token.find("METAR") > -1:
                token = token[(token.find("METAR") + 5) :]
            elif token.find("SPECI") > -1:
                token = token[(token.find("SPECI") + 5) :]
                prefix = "SPECI"
            elif len(token.strip()) < 5:
                continue
            res = to_metar(self, token)
            if res:
                res.type = prefix
                self.metars.append(res)
def parser(text, utcnow=None, ugc_provider=None, nwsli_provider=None):
    """Build a METARCollective from raw product text.

    Args:
      text (str): the raw string to process
      utcnow: passed through to METARCollective
      ugc_provider: passed through to METARCollective
      nwsli_provider: passed through to METARCollective

    Returns:
      METARCollective: the parsed product.
    """
    return METARCollective(
        text,
        utcnow=utcnow,
        ugc_provider=ugc_provider,
        nwsli_provider=nwsli_provider,
    )