"""
Eh, I am able to parse the SPC PTS now, so why not add more pain.
Weather Prediction Center Excessive Rainfall Outlook.
"""
import math
import re
import tempfile
from datetime import datetime, timezone
from metpy.units import units
from shapely.geometry import MultiPolygon
from pyiem.nws.product import TextProduct
from pyiem.nws.products.spcpts import SPCOutlook as Outlook
from pyiem.nws.products.spcpts import SPCOutlookCollection as OutlookCollection
# Local
from pyiem.reference import txt2drct
from pyiem.util import LOG, load_geodf
from ._outlook_util import (
CONUS,
THRESHOLD2TEXT,
compute_layers,
condition_segment,
convert_segments,
load_conus_data,
quality_control,
sql_day_collect,
winding_logic,
)
RISK_RE = re.compile(
r"^(?P<cat>MARGINAL|SLIGHT|MODERATE|HIGH)",
re.I,
)
VALID_RE = re.compile(
r"Valid\s+(?P<start>[0-9]{1,4})Z?\s+...\s+(?P<smonth>[A-Z]{3})\s+"
r"(?P<sday>\d+)\s+(?P<syear>\d\d\d\d)\s+\-\s+"
r"(?P<end>[0-9]{1,4})Z?\s+...\s+(?P<emonth>[A-Z]{3})\s+"
r"(?P<eday>\d+)\s+(?P<eyear>\d\d\d\d)",
re.IGNORECASE,
)
TEXT2THRESHOLD = {
"MARGINAL": "MRGL",
"SLIGHT": "SLGT",
"ENHANCED": "ENH",
"MODERATE": "MDT",
"HIGH": "HIGH",
}
[docs]
def init_days(prod):
"""Figure out which days this product should have based on the AFOS."""
day = 1
if prod.afos == "RBG98E":
day = 2
elif prod.afos == "RBG99E":
day = 3
return day, {day: OutlookCollection(prod.issue, prod.expire, day)}
def _compute_cycle(prod):
"""Figure out an integer cycle that identifies this product."""
if prod.day == 1:
if prod.valid.hour in range(0, 6):
return 1
if prod.valid.hour in range(6, 13):
return 8
if prod.valid.hour in range(13, 20):
return 16
if prod.valid.hour in range(4, 12):
return 8
if prod.valid.hour in range(17, 22):
return 20
return -1
[docs]
def compute_loc(lon, lat, dist, bearing):
"""Estimate a lat/lon"""
meters = (units("mile") * dist).to(units("meter")).m
northing = meters * math.cos(math.radians(bearing)) / 111111.0
easting = (
meters
* math.sin(math.radians(bearing))
/ math.cos(math.radians(lat))
/ 111111.0
)
return lon + easting, lat + northing
[docs]
def sid_rectify(sid):
"""Ensure it matches our nomenclature."""
if sid.startswith("K") and len(sid) == 4:
return sid[1:]
return sid
[docs]
def meat2segment(meat):
"""Convert into a list of points."""
asos = load_geodf("sfstns")
tokens = meat.split()
sz = len(tokens)
i = 0
pts = []
gc = "geometry"
while i < sz:
token = tokens[i]
if token.isdigit() and (i + 2) < sz:
miles = float(token)
drct = txt2drct.get(tokens[i + 1])
sid = sid_rectify(tokens[i + 2])
row = asos.loc[sid]
pts.append(compute_loc(row[gc].x, row[gc].y, miles, drct))
i += 3
continue
sid = sid_rectify(tokens[i])
row = asos.loc[sid]
pts.append([row[gc].x, row[gc].y])
i += 1
return pts
[docs]
def jabber_messages(valid, outlook_collections) -> list:
"""Generate a list of Jabber messages to send.
Args:
valid (datetime): the valid time of the outlook
outlook_collections (dict): a list of OutlookCollection objects
Returns:
list: a list of Jabber messages [(plain, html, xtradict), ...]
"""
res = []
url = (
"https://www.wpc.ncep.noaa.gov/qpf/excessive_rainfall_outlook_ero.php"
)
product_descript = "Excessive Rainfall Outlook"
jdict = {
"title": product_descript,
"name": "The Weather Prediction Center",
"tstamp": valid.strftime("%b %-d, %-H:%Mz"),
"url": url,
"wfo": "DMX", # autoplot expects something valid here
"cat": "E",
"t220": "cwa",
}
twmedia = (
"https://mesonet.agron.iastate.edu/plotting/auto/plot/220/"
"cat:categorical::which:%(day)s%(cat)s::t:%(t220)s::network:WFO::"
"wfo:%(wfo)s::_r:86::"
f"csector:conus::valid:{valid.strftime('%Y-%m-%d %H%M')}"
".png"
).replace(" ", "%%20")
max_category = None
for day, collect in outlook_collections.items():
wfos = {
"MRGL": [],
"SLGT": [],
"ENH": [],
"MDT": [],
"HIGH": [],
}
for outlook in collect.outlooks:
_d = wfos.setdefault(outlook.threshold, [])
_d.extend(outlook.wfos)
jdict["day"] = day
wfomsgs = {}
# We order in least to greatest, so that the highest threshold
# overwrites lower ones
for cat in [
"MRGL",
"SLGT",
"ENH",
"MDT",
"HIGH",
]:
jdict["ttext"] = f"{THRESHOLD2TEXT[cat]} Risk {product_descript}"
for wfo in wfos[cat]:
max_category = cat
jdict["wfo"] = wfo
wfomsgs[wfo] = [
(
"%(name)s issues Day %(day)s %(ttext)s "
"at %(tstamp)s for portions of %(wfo)s %(url)s"
)
% jdict,
(
"<p>%(name)s issues "
'<a href="%(url)s">Day %(day)s %(ttext)s</a> '
"at %(tstamp)s for portions of %(wfo)s's area</p>"
)
% jdict,
{
"channels": [
wfo,
f"{wfo}.ERODY{day}",
f"{wfo}.ERODY{day}.{cat}",
],
"twitter_media": twmedia % jdict,
"twitter": (
"WPC issues Day %(day)s %(ttext)s "
"at %(tstamp)s for %(wfo)s %(url)s"
)
% jdict,
},
]
keys = list(wfomsgs.keys())
keys.sort()
for wfo in keys:
res.append(wfomsgs[wfo]) # noqa
# Generic for WPC
jdict["t220"] = "conus"
jdict["title2"] = "%(name)s issues Day %(day)s %(title)s" % jdict
jdict["catmsg"] = (
""
if max_category is None
else f" (Max Risk: {THRESHOLD2TEXT[max_category]})"
)
res.append(
[
"%(title2)s%(catmsg)s at %(tstamp)s %(url)s" % jdict,
(
'<p>%(name)s issues <a href="%(url)s">Day %(day)s '
"%(title)s</a>%(catmsg)s at %(tstamp)s</p>"
)
% jdict,
{
"channels": ["WPC", "ERODY%(day)s" % jdict],
"twitter_media": twmedia % jdict,
"twitter": "%(title2)s%(catmsg)s at %(tstamp)s %(url)s"
% jdict,
},
],
)
return res
[docs]
def compute_wfos(outlook_collections):
"""Compute the WFOs affected for each outlook in collection."""
geodf = load_geodf("cwa")
for day, collect in outlook_collections.items():
for outlook in collect.outlooks:
df2 = geodf[geodf["geom"].intersects(outlook.geometry_layers)]
outlook.wfos = df2.index.to_list()
LOG.info(
"Day: %s Category: %s Threshold: %s #WFOS: %s %s",
day,
outlook.category,
outlook.threshold,
len(outlook.wfos),
",".join(outlook.wfos),
)
[docs]
class ERO(TextProduct):
"""A class representing the polygons and metadata in WPC ERO Product"""
def __init__(
self, text, utcnow=None, ugc_provider=None, nwsli_provider=None
):
"""Constructor
Args:
text (string): the raw PTS product that is to be parsed
utcnow (datetime, optional): in case of ambuigity with time
ugc_provider (dict, optional): unused in this class
nwsli_provider (dict, optional): unused in this class
"""
super().__init__(text, utcnow, ugc_provider, nwsli_provider)
LOG.info("==== ERO Processing: %s", self.get_product_id())
load_conus_data()
self.issue = None
self.expire = None
self.outlook_type = "E"
self.find_issue_expire()
self.day, self.outlook_collections = init_days(self)
self.find_outlooks()
quality_control(self)
compute_layers(self)
compute_wfos(self.outlook_collections)
self.cycle = _compute_cycle(self)
[docs]
def get_outlookcollection(self, day):
"""Returns the OutlookCollection for a given day"""
return self.outlook_collections.get(day)
[docs]
def get_outlook(self, category, threshold, day):
"""Get an outlook by category and threshold"""
if day not in self.outlook_collections:
return None
for outlook in self.outlook_collections[day].outlooks:
if outlook.category == category and outlook.threshold == threshold:
return outlook
return None
[docs]
def draw_outlooks(self):
"""For debugging, draw the outlooks on a simple map for inspection!"""
# pylint: disable=import-outside-toplevel
from pyiem.plot.use_agg import figure
for day, collect in self.outlook_collections.items():
for outlook in collect.outlooks:
fig = figure(figsize=(12, 8))
ax = fig.add_subplot(111)
# pylint: disable=unsubscriptable-object
ax.plot(
CONUS["line"][:, 0],
CONUS["line"][:, 1],
color="b",
label="Conus",
)
for poly in outlook.geometry_layers.geoms:
ax.plot(
poly.exterior.xy[0],
poly.exterior.xy[1],
lw=2,
color="r",
)
ax.set_title(
f"Day {day} Category {outlook.category} "
f"Threshold {outlook.threshold}"
)
ax.legend(loc=3)
fn = (
f"{tempfile.gettempdir()}/{day}_{self.issue:%Y%m%d%H%M}_"
f"{outlook.category}_{outlook.threshold}.png"
).replace(" ", "_")
LOG.info(":: creating plot %s", fn)
fig.savefig(fn)
[docs]
def find_issue_expire(self):
"""
Determine the period this product is valid for
"""
m = VALID_RE.findall(self.text)[0]
if len(m[0]) >= 3:
hour1 = int(m[0][:2])
minute1 = int(m[0][-2:])
else:
hour1 = int(m[0])
minute1 = 0
if len(m[4]) >= 3:
hour2 = int(m[4][:2])
minute2 = int(m[4][-2:])
else:
hour2 = int(m[4])
minute2 = 0
self.issue = datetime.strptime(
f"{hour1}:{minute1} {m[1]} {m[2]} {m[3]}",
"%H:%M %b %d %Y",
).replace(tzinfo=timezone.utc)
self.expire = datetime.strptime(
f"{hour2}:{minute2} {m[5]} {m[6]} {m[7]}",
"%H:%M %b %d %Y",
).replace(tzinfo=timezone.utc)
[docs]
def find_outlooks(self):
"""Find the outlook sections within the text product!"""
text = "\n".join([x.strip() for x in self.text.split("\n")])
tokens = text.strip().split("\n\n")
point_data = {}
for token in tokens:
section = token.strip().replace("\n", " ")
m = RISK_RE.match(section)
if m is None:
continue
threshold = TEXT2THRESHOLD[m.groupdict()["cat"]]
arr = point_data.setdefault(threshold, [])
meat = section.split(" FROM ", 1)[1].replace(".", "")
arr.append(meat2segment(meat))
collect = self.get_outlookcollection(self.day)
for threshold, pdata in point_data.items():
segments = []
for segment in pdata:
res = condition_segment(segment)
if res:
segments.extend(res)
polygons, _interiors, linestrings = convert_segments(segments)
# we do our winding logic now
polygons.extend(winding_logic(linestrings))
mp = MultiPolygon(polygons)
collect.outlooks.append(Outlook("CATEGORICAL", threshold, mp))
[docs]
def sql(self, txn):
"""Do database work
Args:
txn (psycopg.cursor): database cursor
"""
for day, collect in self.outlook_collections.items():
sql_day_collect(self, txn, day, collect)
[docs]
def get_jabbers(self, uri, _uri2=None):
"""Wordsmith the Jabber/Twitter Messaging"""
return jabber_messages(self.valid, self.outlook_collections)
[docs]
def parser(text, utcnow=None, ugc_provider=None, nwsli_provider=None):
"""Parse this text!"""
return ERO(text, utcnow, ugc_provider, nwsli_provider)