Source code for pyiem.nws.products.lsr

"""NWS Local Storm Report (LSR) Parsing."""

import math
import re
from datetime import datetime, timedelta, timezone

from shapely.geometry import Point as ShapelyPoint

from pyiem import reference
from pyiem.nws.lsr import LSR, _icestorm_remark
from pyiem.nws.product import TextProduct, TextProductException
from pyiem.util import utc

# Don't permit LSRs that are more than 1 hour newer than the product time
# or more than 1 hour into the future of the current time
FUTURE_THRESHOLD = timedelta(hours=1)
# Splits the product text into individual LSR entries.
#   group 1: two consecutive lines that each start with a digit (parse_lsr
#            reads the time from the first and the date from the second)
#   group 2: at least one following line, matched lazily until the next line
#            that starts with a digit, an empty line, or the end of the text
SPLITTER = re.compile(
    r"(^[0-9].+?\n^[0-9].+?\n)((?:.*?\n)+?)(?=^[0-9]|$)", re.MULTILINE
)


class LSRProductException(TextProductException):
    """Exception type to raise for problems with an LSR product."""
class LSRProduct(TextProduct):
    """Represents a text product of the Local Storm Report (LSR) variety."""

    def __init__(
        self, text, utcnow=None, ugc_provider=None, nwsli_provider=None
    ):
        """Construct the product by delegating to ``TextProduct``.

        Args:
            text: the product text, handed to ``TextProduct``.
            utcnow: passed through to ``TextProduct``.
            ugc_provider: passed through to ``TextProduct``.
            nwsli_provider: passed through to ``TextProduct``.
        """
        super().__init__(
            text,
            utcnow=utcnow,
            ugc_provider=ugc_provider,
            nwsli_provider=nwsli_provider,
        )
        # The LSR entries of this product; starts out empty.
        self.lsrs: list[LSR] = []
        # Number of duplicated reports; starts at zero and is only read
        # (never updated) by the methods of this class.
        self.duplicates = 0

    def get_temporal_domain(self) -> tuple[datetime | None, datetime | None]:
        """Return the min and max timestamps of the LSRs.

        Returns:
            ``(min, max)`` over every ``lsr.valid`` converted to UTC, or
            ``(None, None)`` when the product holds no LSRs.
        """
        if not self.lsrs:
            return None, None
        valids = [lsr.valid.astimezone(timezone.utc) for lsr in self.lsrs]
        return min(valids), max(valids)

    def is_summary(self):
        """Return whether this LSR product is a summary.

        The product counts as a summary when its ``unixtext`` contains
        ``...SUMMARY`` (compared case-insensitively) anywhere, including at
        the very start of the text.
        """
        return "...SUMMARY" in self.unixtext.upper()

    def get_url(self, baseuri: str) -> str:
        """Get the URL of this product.

        Args:
            baseuri: the URL prefix that the query string is appended to.

        Returns:
            ``baseuri`` plus a query string holding the WFO (``self.source``
            without its first character) and the temporal domain formatted
            as ``%Y%m%d%H%M``.

        Raises:
            TypeError: when the product has no LSRs, as the temporal domain
                is then ``(None, None)`` and can not be formatted.
        """
        min_time, max_time = self.get_temporal_domain()
        wfo = self.source[1:]
        return (
            f"{baseuri}?by=wfo&wfo={wfo}&sts={min_time:%Y%m%d%H%M}"
            f"&ets={max_time:%Y%m%d%H%M}"
        )

    def get_jabbers(self, uri, _uri2=None):
        """Return a text and html variant for Jabber stuff.

        Args:
            uri: base URI used for the product URL and handed to each
                LSR's own ``get_jabbers``.
            _uri2: unused.

        Returns:
            A list that is empty when there are no LSRs.  With fewer than
            five LSRs, it holds the ``get_jabbers`` result of each LSR that
            is not flagged as a duplicate.  When the product is a summary or
            has five or more LSRs, a ``[text, html, xtra]`` summary entry is
            appended as well.
        """
        res = []
        if not self.lsrs:
            return res
        wfo = self.source[1:]
        url = self.get_url(uri)
        if len(self.lsrs) < 5:
            res.extend(
                mylsr.get_jabbers(uri)
                for mylsr in self.lsrs
                if not mylsr.duplicate
            )
        if self.is_summary() or len(self.lsrs) >= 5:
            extra_text = " "
            if self.duplicates > 0:
                extra_text = (
                    f", {self.duplicates} out of {len(self.lsrs)} reports "
                    "were previously sent and not repeated here. "
                )
            text = f"{wfo} issues Summary Local Storm Report{extra_text}{url}"
            html = (
                f"<p>{wfo} issues <a href='{url}'>"
                f"Summary Local Storm Report</a>{extra_text}</p>"
            )
            product_id = self.get_product_id()
            xtra = {
                "product_id": product_id,
                "channels": f"LSR{wfo}",
                "twitter": text,
                "twitter_media": (
                    "https://mesonet.agron.iastate.edu/plotting/auto/plot/242/"
                    f"pid:{product_id}.png"
                ),
            }
            res.append([text, html, xtra])
        return res
def _mylowercase(text):
    """Specialized lowercase function.

    The text is split on whitespace and tokens longer than three characters
    are converted with ``str.title``.  Shorter tokens, which covers compass
    abbreviations like ``NNE``, are kept verbatim.

    Args:
        text: the string to convert.

    Returns:
        The converted tokens joined by single spaces.
    """
    return " ".join(t.title() if len(t) > 3 else t for t in text.split())
def parse_lsr(prod, text):
    """Emit a LSR object based on this text!

    The first two lines are read by fixed column position, for example::

        0914 PM     HAIL             SHAW                    33.60N 90.77W
        04/29/2005  1.00 INCH        BOLIVAR            MS   EMERGENCY MNGR

    Any further lines are collapsed into the remark.

    Args:
        prod: the product being parsed; supplies ``tz``, ``z``, ``valid``
            and ``source`` and collects messages in ``prod.warnings``.
        text: the text of one report.

    Returns:
        The populated ``LSR``, or ``None`` (with a warning appended to
        ``prod.warnings``) when the text is too short, the time is more than
        ``FUTURE_THRESHOLD`` past the product time or the current time, the
        event type is unknown, the location is out of bounds or the
        magnitude is NaN.
    """
    lines = text.split("\n")
    if len(lines) < 2:
        flattened = text.replace("\n", "<NL>")
        prod.warnings.append(f"LSR text is too short |{flattened}|\n{text}")
        return None
    lsr = LSR()
    lsr.product = prod
    lsr.text = text

    # The first token is the 12 hour clock time without a separator (HMM or
    # HHMM), the second is AM/PM and the date leads the second line.
    tokens = lines[0].split()
    h12 = tokens[0][:-2]
    mm = tokens[0][-2:]
    ampm = tokens[1]
    dstr = f"{h12}:{mm} {ampm} {lines[1][:10]}"
    lsr.valid = datetime.strptime(dstr, "%I:%M %p %m/%d/%Y")
    lsr.assign_timezone(prod.tz, prod.z)
    # Check that we are within bounds
    if lsr.utcvalid > (prod.valid + FUTURE_THRESHOLD) or lsr.utcvalid > (
        utc() + FUTURE_THRESHOLD
    ):
        prod.warnings.append(
            "LSR is from the future!\n"
            f"prod.valid: {prod.valid} lsr.valid: {lsr.valid}\n"
            f"{text}\n"
        )
        return None

    lsr.wfo = prod.source[1:]

    lsr.typetext = lines[0][12:29].strip()
    if lsr.typetext.upper() not in reference.lsr_events:
        prod.warnings.append(f"Unknown lsr.typetext |{lsr.typetext}|\n{text}")
        return None

    lsr.city = lines[0][29:53].strip()

    # Latitude and longitude carry a trailing hemisphere letter.  Longitude
    # is taken as west (negative) unless it is flagged with an E.
    tokens = lines[0][53:].strip().split()
    lat = float(tokens[0][:-1])
    lat_sign = tokens[0][-1]
    if lat_sign == "S":
        lat = 0 - lat
    lon = 0 - float(tokens[1][:-1])
    lon_sign = tokens[1][-1]
    if lon_sign == "E":
        lon = 0 - lon
    if lon <= -180 or lon >= 180 or lat >= 90 or lat <= -90:
        prod.warnings.append(f"Invalid Geometry Lat: {lat} Lon: {lon}\n{text}")
        return None
    lsr.geometry = ShapelyPoint((lon, lat))

    lsr.consume_magnitude(lines[1][12:29].strip())
    if lsr.magnitude_f is not None and math.isnan(lsr.magnitude_f):
        prod.warnings.append(f"LSR has NAN magnitude\n{text}")
        return None

    # Condition for a specific spacing case we want to workaround: when
    # columns 47 and 52 are both occupied, a space is inserted at column 47
    # to shift the rest of the line right by one.  The length guard keeps a
    # short second line from raising IndexError, so that it reaches the
    # empty county/state/source warnings below instead.
    if len(lines[1]) > 52 and lines[1][47] != " " and lines[1][52] != " ":
        prod.warnings.append(f"Workaround LSR spacing for |{lines[1][47:53]}|")
        lines[1] = lines[1][:47] + " " + lines[1][47:]

    lsr.county = lines[1][29:48].strip()
    if lsr.county == "":
        prod.warnings.append(f"LSR has empty county\n{text}")
    lsr.state = lines[1][48:50].strip()
    if lsr.state == "":
        prod.warnings.append(f"LSR has empty state\n{text}")
    lsr.source = lines[1][53:].strip()
    if lsr.source == "":
        prod.warnings.append(f"LSR has empty source\n{text}")

    if len(lines) > 2:
        meat = " ".join(lines[2:]).strip()
        if meat != "":
            # Collapse all runs of whitespace within the remark
            lsr.remark = " ".join(meat.split())
            # An ice storm without a magnitude may have one in its remark
            if lsr.typetext.upper() == "ICE STORM" and lsr.magnitude_f is None:
                val = _icestorm_remark(lsr.remark)
                if val is not None:
                    lsr.magnitude_f = val
                    lsr.magnitude_qualifier = "U"
                    lsr.magnitude_units = "INCH"
    return lsr
def parser(text, utcnow=None, ugc_provider=None, nwsli_provider=None):
    """Convert raw text into an ``LSRProduct`` holding its parsed LSRs.

    Args:
        text: the raw product text.
        utcnow: passed through to ``LSRProduct``.
        ugc_provider: passed through to ``LSRProduct``; an empty dict is
            substituted for ``None``.
        nwsli_provider: passed through to ``LSRProduct``.

    Returns:
        The ``LSRProduct``.  When the product has no timezone (``prod.z`` is
        ``None``) a warning is recorded and no LSRs are parsed.  Otherwise
        each ``SPLITTER`` match is handed to ``parse_lsr`` and every
        non-``None`` result is appended to ``prod.lsrs``.
    """
    prod = LSRProduct(
        text,
        utcnow,
        ugc_provider={} if ugc_provider is None else ugc_provider,
        nwsli_provider=nwsli_provider,
    )
    if prod.z is None:
        prod.warnings.append("Abort parsing as no timezone was found.")
        return prod

    # Each match's groups are re-joined into the text of a single report
    chunks = ("".join(m.groups()) for m in SPLITTER.finditer(prod.unixtext))
    for chunk in chunks:
        lsr = parse_lsr(prod, chunk)
        if lsr is not None:
            prod.lsrs.append(lsr)
    return prod