"""Storm Prediction Center PTS Product Parser.
My life was not supposed to end like this, what a brutal format.
"""
import re
import tempfile
from datetime import timedelta
from typing import Optional
from shapely.geometry import (
MultiPolygon,
Polygon,
)
# Local
from pyiem.nws.product import TextProduct
from pyiem.util import LOG, load_geodf, utc
from ._outlook_util import (
CONUS,
THRESHOLD2TEXT,
compute_layers,
condition_segment,
convert_segments,
load_conus_data,
quality_control,
sql_day_collect,
winding_logic,
)
DAYRE = re.compile(
r"SEVERE WEATHER OUTLOOK POINTS DAY\s+(?P<day>[0-9])", re.IGNORECASE
)
DMATCH = re.compile(r"D(?P<day1>[0-9])\-?(?P<day2>[0-9])?")
THRESHOLD_ORDER = (
"0.02 0.05 0.10 0.15 0.25 0.30 0.35 0.40 0.45 0.60 TSTM MRGL SLGT ENH "
"MDT HIGH IDRT SDRT ELEV CRIT EXTM"
).split()
def imgsrc_from_row(row: dict) -> Optional[str]:
    """Compute the SPC image source for a given database row.

    Args:
        row (dict): database row with ``cycle``, ``day``, ``category`` and
            ``product_issue`` (datetime) keys.

    Returns:
        Optional[str]: the image URL, or None when the cycle is unknown.
    """
    cycle = row["cycle"]
    if cycle is None or cycle == -1:
        # No canonical cycle, so no image to point at
        return None
    issued = row["product_issue"]
    day = row["day"]
    if day > 3:
        # Le Sigh, days 4-8 live in the experimental archive at a fixed 12z
        return (
            "https://www.spc.noaa.gov/products/exper/day4-8/archive/"
            f"{issued.year}/day{day}prob_{issued:%Y%m%d}_1200.gif"
        )
    # Pick the product flavor used within the day folder
    if row["category"] == "CATEGORICAL":
        flavor = "otlk"
    elif day == 3:
        flavor = "prob"
    else:
        flavor = "probotlk"
    # Known cycle -> archive HHMM label; otherwise format the hour directly
    conv = {
        1: {6: "1200", 16: "1630"},
        2: {7: "0600", 17: "1730"},
        3: {8: "0730", 20: "1930"},
    }.get(day, {})
    hhmm = conv.get(cycle, f"{cycle:02.0f}00")
    # year in the URL is based on the issue date
    res = (
        "https://www.spc.noaa.gov/products/outlook/archive/"
        f"{issued.year}/day{day}{flavor}_{issued:%Y%m%d}_{hhmm}_"
    )
    if row["category"] in ("TORNADO", "HAIL", "WIND"):
        res += f"{row['category'].lower()[:4]}_"
    return f"{res}prt.gif"
def compute_times(afos, issue, expire, day):
    """Compute the actual issue and expire time for a given outlook day.

    The multi-day products carry a range of dates that need to be
    translated into a per-day issue and expire time.

    Returns
    -------
    issue (datetime)
    expire (datetime)
    """
    if afos not in ("PTSD48", "PFWF38"):
        # Single day products already have the right times
        return issue, expire
    # PFWF38 begins at day 3, PTSD48 begins at day 4
    baseday = 3 if afos == "PFWF38" else 4
    start = issue + timedelta(days=day - baseday)
    # Each day covers a 24 hour window
    return start, start + timedelta(hours=24)
def get_day(prod, text):
    """Figure out which outlook day this product/text covers.

    Single-day AFOS identifiers encode the day in their final character;
    otherwise the day is pulled from the text itself.
    """
    if prod.afos in ["PTSDY1", "PTSDY2", "PTSDY3", "PFWFD1", "PFWFD2"]:
        return int(prod.afos[5])
    found = DAYRE.search(text)
    return None if found is None else int(found.groupdict()["day"])
def get_segments_from_text(text):
    """Return list of line segments found within this text.

    Coordinates come as 8 digit tokens (lat*100 then lon*100); the token
    ``99999999`` is a sentinel that ends the current segment.
    """
    tokens = re.findall("([0-9]{8})", text.replace("\n", ""))
    segments = []
    current = []
    for token in tokens:
        if token == "99999999":
            # Sentinel: close out the running segment, if it has substance
            if len(current) > 1:
                segments.append(current)
            current = []
            continue
        lat = float(token[:4]) / 100.0
        lon = 0 - (float(token[-4:]) / 100.0)
        # Longitudes east of -30 have an implied extra 100 degrees west
        if lon > -30:
            lon -= 100.0
        current.append([lon, lat])
    # Don't lose a trailing segment lacking the sentinel
    if len(current) > 1:
        segments.append(current)
    return segments
def str2multipolygon(s):
    """Convert string PTS data into a MultiPolygon.

    Args:
        s (str): the cryptic string that we attempt to make valid polygons from
    """
    # Raw line segments straight from the text, no conditioning done yet
    candidates = get_segments_from_text(s)
    # QC pass, naughty segments that cross twice get split apart
    segments = []
    for candidate in candidates:
        fixed = condition_segment(candidate)
        if fixed:
            segments.extend(fixed)
    # Classify each segment: closed exterior, interior (hole), or open line
    polygons, interiors, linestrings = convert_segments(segments)
    # Open linestrings get resolved into polygons via winding logic
    polygons.extend(winding_logic(linestrings))
    # Attach each hole to every polygon it intersects
    for hole in interiors:
        for idx, poly in enumerate(polygons):
            if poly.intersection(hole).is_empty:
                continue
            rings = list(poly.interiors)
            rings.append(hole)
            polygons[idx] = Polygon(poly.exterior, rings)
    # Repair any invalid polygons with the buffer(0) trick
    for idx, poly in enumerate(polygons):
        if not poly.is_valid:
            LOG.warning(" polygon %s is invalid, buffer(0)", idx)
            polygons[idx] = poly.buffer(0)
    return MultiPolygon(polygons)
def init_days(prod) -> dict[int, "SPCOutlookCollection"]:
    """Figure out which days this product should have based on the AFOS.

    Returns:
        dict[int, SPCOutlookCollection]: keyed by outlook day number.
    """

    def _make(day):
        """Build one collection with the computed times for this day."""
        issue, expire = compute_times(prod.afos, prod.issue, prod.expire, day)
        return SPCOutlookCollection(issue, expire, day)

    if prod.afos == "PTSD48":
        return {day: _make(day) for day in range(4, 9)}
    if prod.afos == "PFWF38":
        return {day: _make(day) for day in range(3, 9)}
    # Single day product, day encoded in the AFOS identifier
    day = int(prod.afos[5])
    return {day: _make(day)}
def _compute_cycle(prod):
    """Figure out an integer cycle that identifies this product.

    Returns -1 when the product does not map onto a canonical cycle.
    """
    # Extended ones are easy
    if prod.afos in ("PTSD48", "PFWF38"):
        return 21 if prod.outlook_type == "F" else 10
    day = int(prod.afos[5])
    hour = prod.valid.hour
    if day == 3:  # has to be convective
        # the 20z cycle only counts after 2024-08-20 per the code's cutover
        if 18 <= hour <= 22 and prod.valid > utc(2024, 8, 20):
            return 20
        return 8
    # Day 2 are based on the product issuance time
    if day == 2:
        if prod.outlook_type == "F":
            if 4 <= hour <= 12:
                return 8
            if 14 <= hour <= 22:
                return 18
        if prod.outlook_type == "C":
            if 4 <= hour <= 12:
                return 7
            if 14 <= hour <= 22:
                return 17
        return -1
    # We are left with day 1, keyed off the collection's issuance time
    hhmi = prod.get_outlookcollection(day).issue.strftime("%H%M")
    if prod.outlook_type == "C":
        return {"1200": 6, "1300": 13, "1630": 16, "2000": 20, "0100": 1}.get(
            hhmi, -1
        )
    return {"1200": 7, "1700": 17}.get(hhmi, -1)
class SPCOutlookCollection:
    """A collection of outlooks for a single 'day'."""

    def __init__(self, issue, expire, day):
        """Construct.

        Args:
            issue (datetime): when this day's outlook begins.
            expire (datetime): when this day's outlook ends.
            day (int): the outlook day number.
        """
        self.issue = issue
        self.expire = expire
        self.day = day
        # Kept ordered from least to greatest threshold level
        self.outlooks = []

    def add_outlook(self, outlook):
        """Insert an outlook, keeping the list ordered by level."""
        # Unrankable outlooks (or an empty list) simply go on the end
        if not self.outlooks or outlook.level is None:
            self.outlooks.append(outlook)
            return
        # Fast path: ranks above the current last entry
        tail = self.outlooks[-1]
        if tail.level is not None and outlook.level > tail.level:
            self.outlooks.append(outlook)
            return
        # Walk backwards to the first entry we outrank
        for pos in range(-1, -1 - len(self.outlooks), -1):
            other = self.outlooks[pos]
            if other.level is None:
                continue
            if outlook.level > other.level:
                self.outlooks.insert(pos + 1, outlook)
                return
        # Outranked by everything, so it belongs at the front
        self.outlooks.insert(0, outlook)

    def get_categories(self):
        """Return list of categories covered in this outlook."""
        # Ordered de-duplication of the category labels
        return list(dict.fromkeys(ol.category for ol in self.outlooks))

    def difference_geometries(self):
        """Do the difference work to figure out actual geometries."""
        # Our outlooks are ordered, so hopefully this works
        for cat in self.get_categories():
            subset = [ol for ol in self.outlooks if ol.category == cat]
            for bigger, nested in zip(subset, subset[1:]):
                if (
                    bigger.level is None
                    or nested.level is None
                    or bigger.threshold in ["SDRT", "IDRT"]  # One Off
                ):
                    bigger.geometry = bigger.geometry_layers
                    continue
                bigger.geometry = bigger.geometry_layers.difference(
                    nested.geometry_layers
                )
                # Ensure multipolygon
                if not isinstance(bigger.geometry, MultiPolygon):
                    bigger.geometry = MultiPolygon([bigger.geometry])
            # Last polygon needs duplicated
            if subset:
                subset[-1].geometry = subset[-1].geometry_layers
class SPCOutlook:
    """A class holding what we store for a single outlook."""

    def __init__(self, category, threshold, multipoly):
        """Create a new outlook.

        Args:
            category (str): the label of this category
            threshold (str): the threshold associated with the category
            multipoly (MultiPolygon): the geometry
        """
        self.category = category
        self.threshold = threshold
        # Rank within THRESHOLD_ORDER; None when the threshold is unknown
        try:
            self.level = THRESHOLD_ORDER.index(threshold)
        except ValueError:
            self.level = None
        self.geometry_layers = multipoly
        # The differenced geometry, computed later
        self.geometry = None
        # Impacted WFO identifiers, computed later
        self.wfos = []
class SPCPTS(TextProduct):
    """A class representing the polygons and metadata in SPC PTS Product."""

    def __init__(
        self, text, utcnow=None, ugc_provider=None, nwsli_provider=None
    ):
        """Create a new SPCPTS.

        Args:
          text (string): the raw PTS product that is to be parsed
          utcnow (datetime, optional): in case of ambiguity with time
          ugc_provider (dict, optional): unused in this class
          nwsli_provider (dict, optional): unused in this class
        """
        super().__init__(text, utcnow, ugc_provider, nwsli_provider)
        LOG.warning("==== SPCPTS Processing: %s", self.get_product_id())
        # Refresh the module level CONUS geometry for this product's time
        load_conus_data(self.valid)
        self.issue = None  # start of the period this product covers
        self.expire = None  # end of the period this product covers
        # compared against "C"/"F" in _compute_cycle
        # NOTE(review): set_metadata is not defined within this chunk;
        # presumably provided elsewhere in the class/module -- confirm
        self.outlook_type = None
        self.set_metadata()
        self.find_issue_expire()
        self.outlook_collections = init_days(self)
        self.find_outlooks()
        quality_control(self)
        compute_layers(self)
        self.compute_wfos()
        self.cycle = _compute_cycle(self)

    def get_outlookcollection(self, day: int) -> SPCOutlookCollection | None:
        """Return the SPCOutlookCollection for a given day."""
        return self.outlook_collections.get(day)

    def get_outlook(self, category, threshold, day):
        """Get an outlook by category and threshold.

        Returns None when the day or category/threshold pair is unknown.
        """
        if day not in self.outlook_collections:
            return None
        for outlook in self.outlook_collections[day].outlooks:
            if outlook.category == category and outlook.threshold == threshold:
                return outlook
        return None

    def draw_outlooks(self):
        """For debugging, draw the outlooks on a simple map for inspection."""
        # pylint: disable=import-outside-toplevel
        from pyiem.plot.use_agg import figure

        for day, collect in self.outlook_collections.items():
            for outlook in collect.outlooks:
                fig = figure(figsize=(12, 8))
                ax = fig.add_subplot(111)
                # pylint: disable=unsubscriptable-object
                ax.plot(
                    CONUS["line"][:, 0],
                    CONUS["line"][:, 1],
                    color="b",
                    label="Conus",
                )
                # the raw layered geometry in red
                for poly in outlook.geometry_layers.geoms:
                    ax.plot(
                        poly.exterior.xy[0],
                        poly.exterior.xy[1],
                        lw=2,
                        color="r",
                    )
                # holes of the differenced geometry in dashed green
                for poly in outlook.geometry.geoms:
                    for interior in poly.interiors:
                        ax.plot(
                            interior.xy[0],
                            interior.xy[1],
                            lw=2,
                            linestyle=":",
                            color="g",
                        )
                ax.set_title(
                    f"Day {day} Category {outlook.category} "
                    f"Threshold {outlook.threshold}"
                )
                ax.legend(loc=3)
                # spaces in labels are not filename friendly
                fn = (
                    f"{tempfile.gettempdir()}/{day}_{self.issue:%Y%m%d%H%M}_"
                    f"{outlook.category}_{outlook.threshold}.png"
                ).replace(" ", "_")
                LOG.warning(":: creating plot %s", fn)
                fig.savefig(fn)

    def find_issue_expire(self):
        """Determine the period this product is valid for."""
        # tokens are DDHHMM (day-of-month based) start/end pairs
        tokens = re.findall("VALID TIME ([0-9]{6})Z - ([0-9]{6})Z", self.text)
        day1 = int(tokens[0][0][:2])
        hour1 = int(tokens[0][0][2:4])
        min1 = int(tokens[0][0][4:])
        day2 = int(tokens[0][1][:2])
        hour2 = int(tokens[0][1][2:4])
        min2 = int(tokens[0][1][4:])
        issue = self.valid.replace(day=day1, hour=hour1, minute=min1)
        expire = self.valid.replace(day=day2, hour=hour2, minute=min2)
        # NB: outlooks can go out more than just one day
        # A smaller day-of-month implies we have crossed into the next
        # month: jump ahead ~a month, then pin the day-of-month
        if day1 < self.valid.day:
            issue = self.valid + timedelta(days=25)
            issue = issue.replace(day=day1, hour=hour1, minute=min1)
        if day2 < self.valid.day:
            expire = self.valid + timedelta(days=25)
            expire = expire.replace(day=day2, hour=hour2, minute=min2)
        self.issue = issue
        self.expire = expire

    def find_outlooks(self):
        """Find the outlook sections within the text product."""
        # Sections should be delimited by &&; inject them when missing
        if self.text.find("&&") == -1:
            self.text = self.text.replace("\n... ", "\n&&\n... ")
            self.text += "\n&& "
        for segment in self.text.split("&&")[:-1]:
            day = get_day(self, segment)
            # We need to figure out the probabilistic or category
            tokens = re.findall(r"\.\.\.\s+(.*)\s+\.\.\.", segment)
            if not tokens:
                continue
            category = tokens[0].strip()
            point_data = {}
            # Now we loop over the lines looking for data
            threshold = None
            for line in segment.split("\n"):
                if (
                    re.match(
                        (
                            r"^(D[3-8]\-?[3-8]?|EXTM|MRGL|ENH|SLGT|MDT|ELEV|"
                            r"HIGH|CRIT|TSTM|SIGN|IDRT|SDRT|0\.[0-9][0-9]) "
                        ),
                        line,
                    )
                    is not None
                ):
                    newthreshold = line.split()[0]
                    # Same threshold seen again: inject the segment
                    # delimiter sentinel used by get_segments_from_text
                    if threshold is not None and threshold == newthreshold:
                        point_data[threshold] += " 99999999 "
                    threshold = newthreshold
                if threshold is None:
                    continue
                if threshold not in point_data:
                    point_data[threshold] = ""
                point_data[threshold] += line.replace(threshold, " ")
            if day is not None:
                collect = self.get_outlookcollection(day)
            # We need to duplicate, in the case of day-day spans
            for threshold in list(point_data.keys()):
                match = DMATCH.match(threshold)
                if match:
                    data = match.groupdict()
                    if data.get("day2") is not None:
                        day1 = int(data["day1"])
                        day2 = int(data["day2"])
                        LOG.warning("Duplicating threshold %s-%s", day1, day2)
                        # copy the span's points onto each individual day
                        for i in range(day1, day2 + 1):
                            point_data[f"D{i}"] = point_data[threshold]
                        del point_data[threshold]
            for threshold, text in point_data.items():
                match = DMATCH.match(threshold)
                if match:
                    day = int(match.groupdict()["day1"])
                    collect = self.get_outlookcollection(day)
                LOG.warning(
                    "--> Start Day: %s Category: '%s' Threshold: '%s' =====",
                    day,
                    category,
                    threshold,
                )
                mp = str2multipolygon(text)
                # D* style thresholds get stored as the 0.15 probability
                if DMATCH.match(threshold):
                    threshold = "0.15"
                LOG.warning("----> End threshold is: %s", threshold)
                collect.add_outlook(SPCOutlook(category, threshold, mp))

    def compute_wfos(self, _txn=None):
        """Figure out which WFOs are impacted by this polygon.

        Args:
          _txn: unused; presumably retained for caller compatibility
        """
        geodf = load_geodf("cwa")
        for day, collect in self.outlook_collections.items():
            for outlook in collect.outlooks:
                df2 = geodf[geodf["geom"].intersects(outlook.geometry_layers)]
                outlook.wfos = df2.index.to_list()
                LOG.warning(
                    "Day: %s Category: %s Threshold: %s #WFOS: %s %s",
                    day,
                    outlook.category,
                    outlook.threshold,
                    len(outlook.wfos),
                    ",".join(outlook.wfos),
                )

    def sql(self, txn):
        """Do database work.

        Args:
          txn (psycopg.cursor): database cursor
        """
        for day, collect in self.outlook_collections.items():
            sql_day_collect(self, txn, day, collect)

    def get_descript_and_url(self):
        """Help to convert awips id into strings.

        Returns:
          (product description, SPC website URL, day label) tuple
        """
        product_descript = f"(({self.afos}))"
        url = "https://www.spc.noaa.gov"
        day = product_descript
        if self.afos == "PTSDY1":
            day = "Day 1"
            product_descript = "Convective"
            url = (
                "https://www.spc.noaa.gov/products/outlook/archive/"
                f"{self.valid.year}/day1otlk_{self.issue:%Y%m%d_%H%M}.html"
            )
        elif self.afos == "PTSDY2":
            day = "Day 2"
            product_descript = "Convective"
            hhmm = "1730" if self.valid.hour > 11 else "0600"
            url = (
                "https://www.spc.noaa.gov/products/outlook/archive/"
                f"{self.valid.year}/day2otlk_{self.valid:%Y%m%d}_{hhmm}.html"
            )
        elif self.afos == "PTSDY3":
            # 0730 when in CDT, 0830 when in CST
            hhmm = "0730" if self.z == "CDT" else "0830"
            day = "Day 3"
            if self.cycle == 20:
                hhmm = "1930"
            product_descript = "Convective"
            url = (
                "https://www.spc.noaa.gov/products/outlook/archive/"
                f"{self.valid.year}/day3otlk_{self.valid:%Y%m%d}_{hhmm}.html"
            )
        elif self.afos == "PTSD48":
            day = "Days 4-8"
            product_descript = "Convective"
            url = (
                "https://www.spc.noaa.gov/products/exper/day4-8/archive/"
                f"{self.valid.year}/day4-8_{self.valid:%Y%m%d}.html"
            )
        elif self.afos == "PFWFD1":
            day = "Day 1"
            product_descript = "Fire Weather"
            url = self.issue.strftime(
                (
                    "https://www.spc.noaa.gov/products/fire_wx/%Y/%y%m%d_%H%M"
                    "_fwdy1_print.html"
                )
            )
        elif self.afos == "PFWFD2":
            day = "Day 2"
            product_descript = "Fire Weather"
            url = self.issue.strftime(
                (
                    "https://www.spc.noaa.gov/products/fire_wx/%Y/%y%m%d_%H%M"
                    "_fwdy2_print.html"
                )
            )
        elif self.afos == "PFWF38":
            day = "Day 3-8"
            product_descript = "Fire Weather"
            url = self.issue.strftime(
                (
                    "https://www.spc.noaa.gov/products/exper/fire_wx/%Y/%y%m%d"
                    ".html"
                )
            )
        return product_descript, url, day

    def get_jabbers(self, uri, _uri2=None):
        """Wordsmith the Jabber/Twitter Messaging.

        Args:
          uri (str): unused
          _uri2 (str, optional): unused
        """
        res = []
        product_descript, url, title = self.get_descript_and_url()
        jdict = {
            "title": title,
            "name": "The Storm Prediction Center",
            "tstamp": self.valid.strftime("%b %-d, %-H:%Mz"),
            "outlooktype": product_descript,
            "url": url,
            "wfo": "DMX",  # autoplot expects something valid here
            "cat": self.outlook_type,
            "t220": "cwa",
        }
        # %%20 so the later %-interpolation leaves a literal %20 escape
        twmedia = (
            "https://mesonet.agron.iastate.edu/plotting/auto/plot/220/"
            "cat:categorical::which:%(day)s%(cat)s::t:%(t220)s::network:WFO::"
            "wfo:%(wfo)s::_r:86::"
            f"csector:conus::valid:{self.valid.strftime('%Y-%m-%d %H%M')}"
            ".png"
        ).replace(" ", "%%20")
        res = []
        max_category = None
        for day, collect in self.outlook_collections.items():
            # accumulate impacted WFOs per threshold label
            wfos = {
                "TSTM": [],
                "EXTM": [],
                "MRGL": [],
                "SLGT": [],
                "ENH": [],
                "CRIT": [],
                "MDT": [],
                "HIGH": [],
                "ELEV": [],
                "IDRT": [],
                "SDRT": [],
                "0.15": [],
                "0.30": [],
            }
            for outlook in collect.outlooks:
                _d = wfos.setdefault(outlook.threshold, [])
                _d.extend(outlook.wfos)
            jdict["day"] = day
            wfomsgs = {}
            # We order in least to greatest, so that the highest threshold
            # overwrites lower ones
            for cat in [
                "MRGL",
                "SLGT",
                "ENH",
                "MDT",
                "HIGH",
                "ELEV",
                "CRIT",
                "EXTM",
                "IDRT",
                "SDRT",
                "0.15",
                "0.30",
            ]:
                # hack
                if cat in ["0.15", "0.30"] and day < 4:
                    continue
                jdict["ttext"] = (
                    f"{THRESHOLD2TEXT[cat]} {product_descript} Risk"
                )
                for wfo in wfos[cat]:
                    max_category = cat
                    jdict["wfo"] = wfo
                    wfomsgs[wfo] = [
                        (
                            "%(name)s issues Day %(day)s %(ttext)s "
                            "at %(tstamp)s for portions of %(wfo)s %(url)s"
                        )
                        % jdict,
                        (
                            "<p>%(name)s issues "
                            '<a href="%(url)s">Day %(day)s %(ttext)s</a> '
                            "at %(tstamp)s for portions of %(wfo)s's area</p>"
                        )
                        % jdict,
                        {
                            "channels": [
                                wfo,
                                f"{wfo}.SPC{self.afos[3:]}",
                                f"{wfo}.SPC{self.afos[3:]}.{cat}",
                            ],
                            "product_id": self.get_product_id(),
                            "twitter_media": twmedia % jdict,
                            "twitter": (
                                "#SPC issues Day %(day)s %(ttext)s "
                                "at %(tstamp)s for %(wfo)s %(url)s"
                            )
                            % jdict,
                        },
                    ]
            # Emit per-WFO messages in alphabetical order
            keys = list(wfomsgs.keys())
            keys.sort()
            for wfo in keys:
                res.append(wfomsgs[wfo])
        # Generic for SPC
        jdict["t220"] = "conus"
        if len(self.outlook_collections) > 1:
            jdict["day"] = "0"
        jdict["catmsg"] = (
            ""
            if max_category is None
            else f" (Max Risk: {THRESHOLD2TEXT[max_category]})"
        )
        res.append(
            [
                (
                    "%(name)s issues %(title)s %(outlooktype)s Outlook"
                    "%(catmsg)s at %(tstamp)s %(url)s"
                )
                % jdict,
                (
                    '<p>%(name)s issues <a href="%(url)s">%(title)s '
                    "%(outlooktype)s Outlook</a>%(catmsg)s at %(tstamp)s</p>"
                )
                % jdict,
                {
                    "channels": ["SPC", f"SPC{self.afos[3:]}"],
                    "product_id": self.get_product_id(),
                    "twitter_media": twmedia % jdict,
                    "twitter": (
                        "%(name)s issues %(title)s "
                        "%(outlooktype)s Outlook%(catmsg)s "
                        "at %(tstamp)s %(url)s"
                    )
                    % jdict,
                },
            ]
        )
        return res
def parser(text, utcnow=None, ugc_provider=None, nwsli_provider=None):
    """Front-end: parse the given text into a SPCPTS instance."""
    return SPCPTS(
        text,
        utcnow=utcnow,
        ugc_provider=ugc_provider,
        nwsli_provider=nwsli_provider,
    )