"""Parsing of Storm Prediction Center SAW Product
This does not process the legacy SAW products that did not have LAT...LON
"""
import re
from datetime import timedelta
from typing import Optional
from shapely.geometry import MultiPolygon
from shapely.geometry import Polygon as ShapelyPolygon
from pyiem.exceptions import SAWException
from pyiem.nws.product import TextProduct
from pyiem.util import load_geodf, utc
LATLON = re.compile(r"LAT\.\.\.LON\s+((?:[0-9]{8}\s+)+)")
NUM_RE = re.compile(
r"WW ([0-9]*) (TEST)?\s?(SEVERE TSTM|TORNADO|SEVERE THUNDERSTORM)"
)
REPLACES_RE = re.compile("REPLACES WW ([0-9]*)")
DBTYPES = ["TOR", "SVR"]
TYPE2STRING = ["Tornado", "Severe Thunderstorm"]
SPCURL = "https://www.spc.noaa.gov/products/watch"
[docs]
class SAWProduct(TextProduct):
"""Class representing a SAW Product"""
(TORNADO, SEVERE_THUNDERSTORM) = range(2)
(ISSUES, CANCELS) = range(2)
def __init__(self, text, utcnow=None):
"""Constructor
Args:
text (str): text to parse
"""
super().__init__(text, utcnow=utcnow, ugc_provider={})
self.saw = int(self.afos[3:].strip())
self.action = self.find_action()
self.geometry = self.find_polygon()
self.ww_num = self.find_ww_num()
(self.sts, self.ets) = self.find_time()
self.ww_type = self.find_ww_type()
self.replaces_num = self.find_replaces()
self.affected_wfos = []
[docs]
def find_replaces(self) -> Optional[int]:
"""Figure out what this watch replaces."""
tokens = REPLACES_RE.findall(self.unixtext)
if not tokens:
return None
return int(tokens[0])
[docs]
def find_action(self):
"""Figure out if this is an issuance or cancells statement
Return:
(int): either ISSUES or CANCELS
"""
if re.findall("CANCELLED", self.unixtext):
return self.CANCELS
return self.ISSUES
[docs]
def compute_wfos(self, _txn=None):
"""Figure out who is impacted by this watch"""
if self.geometry is None:
return
gdf = load_geodf("cwa")
gdf = gdf[gdf.intersects(self.geometry)]
self.affected_wfos.extend(gdf.index.to_list())
[docs]
def sql(self, txn):
"""Do the necessary database work
Args:
(psycopg.transaction): a database transaction
"""
if self.action == self.ISSUES:
# Ensure we have a watch to update
txn.execute(
"select 1 from watches WHERE num = %s and "
"extract(year from issued at time zone 'UTC') = %s",
(self.ww_num, self.sts.year),
)
if txn.rowcount == 0:
txn.execute(
"INSERT into watches (num, issued) VALUES (%s, %s)",
(self.ww_num, self.sts),
)
# Insert into the main watches table
giswkt = f"SRID=4326;{MultiPolygon([self.geometry]).wkt}"
sql = (
"UPDATE watches SET sel = %s, issued = %s, expired = %s, "
"type = %s, geom = %s, product_id_saw = %s "
"WHERE num = %s and "
"extract(year from issued at time zone 'UTC') = %s"
)
args = (
f"SEL{self.saw}",
self.sts,
self.ets,
DBTYPES[self.ww_type],
giswkt,
self.get_product_id(),
self.ww_num,
self.sts.year,
)
txn.execute(sql, args)
# Update the watches_current table
sql = (
"UPDATE watches_current SET issued = %s, expired = %s, "
"type = %s, geom = %s, num = %s WHERE sel = %s"
)
args = (
self.sts,
self.ets,
DBTYPES[self.ww_type],
giswkt,
self.ww_num,
f"SEL{self.saw}",
)
txn.execute(sql, args)
# Is this a replacement?
if self.replaces_num is not None:
txn.execute(
"UPDATE watches SET expired = %s "
"WHERE num = %s and extract(year from expired) = %s",
(self.valid, self.replaces_num, self.sts.year),
)
elif self.action == self.CANCELS:
for table in ("watches", "watches_current"):
txn.execute(
f"UPDATE {table} SET expired = %s "
"WHERE num = %s and extract(year from expired) = %s",
(self.valid, self.ww_num, self.valid.year),
)
if table == "watches" and txn.rowcount != 1:
self.warnings.append(
"Expiration of watch resulted in "
f"update of {txn.rowcount} rows, instead of 1."
)
[docs]
def find_time(self):
"""Find the start and end valid time of this watch
Returns:
(datetime, datetime): representing the time of this watch
"""
if self.action == self.CANCELS:
return (None, None)
tokens = re.findall(
"([0-3][0-9])([0-2][0-9])([0-6][0-9])Z - "
"([0-3][0-9])([0-2][0-9])([0-6][0-9])Z",
self.unixtext,
)
day1 = int(tokens[0][0])
hour1 = int(tokens[0][1])
minute1 = int(tokens[0][2])
day2 = int(tokens[0][3])
hour2 = int(tokens[0][4])
minute2 = int(tokens[0][5])
sts = utc(self.utcnow.year, self.utcnow.month, day1, hour1, minute1)
ets = utc(self.utcnow.year, self.utcnow.month, day2, hour2, minute2)
# If we are near the end of the month and the day1 is 1, add 1 month
if self.utcnow.day > 27 and day1 == 1:
sts += timedelta(days=+35)
sts = sts.replace(day=1)
if self.utcnow.day > 27 and day2 == 1:
ets += timedelta(days=+35)
ets = ets.replace(day=1)
return (sts, ets)
[docs]
def find_ww_num(self):
"""Find the Weather Watch Number
Returns:
(int): The Weather Watch Number
"""
tokens = NUM_RE.findall(self.unixtext)
if not tokens:
raise SAWException("Could not locate Weather Watch Number")
return int(tokens[0][0])
[docs]
def is_test(self):
"""Is this a test watch?
Returns:
boolean if this SAW is a test or not
"""
tokens = NUM_RE.findall(self.unixtext)
if not tokens:
raise SAWException("Could not locate Weather Watch Number")
return tokens[0][1] == "TEST"
[docs]
def find_ww_type(self):
"""Find the Weather Watch Type
Returns:
(int): The Weather Watch Type
"""
tokens = NUM_RE.findall(self.unixtext)
if not tokens:
raise SAWException("Could not locate Weather Watch Type")
if tokens[0][2] == "TORNADO":
return self.TORNADO
return self.SEVERE_THUNDERSTORM
[docs]
def find_polygon(self):
"""Search out the text for the LAT...LON polygon
Returns:
(str): Well Known Text (WKT) representation
"""
if self.action == self.CANCELS:
return None
tokens = LATLON.findall(self.unixtext.replace("\n", " "))
if not tokens:
raise SAWException("Could not parse LAT...LON geometry")
pts = []
for pair in tokens[0].split():
lat = float(pair[:4]) / 100.0
lon = 0 - float(pair[4:]) / 100.0
if lon > -40:
lon = lon - 100.0
pts.append((lon, lat))
return ShapelyPolygon(pts)
[docs]
def get_jabbers(self, uri, _uri2=None, **kwargs):
"""Generate the jabber messages for this product.
NOTE: In the past, the messages generated here have tripped twitter's
spam logic, so we are careful to craft unique messages
NOTE: Since interesting watch information comes within three products
from SPC, there is some magic in pyWWA/watch_parser.py that awaits
the arrival of all three.
Args:
uri (str): link to IEM Watch Overview page.
_uri2 (str): unused in this context.
wwpprod (WWPProduct): the WWPProduct object.
selprod (SELProduct): the SELProduct object.
"""
selprod = kwargs.get("selprod")
wwpprod = kwargs.get("wwpprod")
res = []
url = f"{SPCURL}/{self.valid.year}/ww{self.ww_num:04.0f}.html"
spc_channels = f"SPC,SPC.{DBTYPES[self.ww_type]}WATCH"
pds = False
if wwpprod is not None:
pds = wwpprod.data.is_pds
if pds:
spc_channels += f",{DBTYPES[self.ww_type][:2]}.PDS"
product_id = self.get_product_id()
if selprod is not None: # SEL is prettier
product_id = selprod.get_product_id()
html = "%s"
plain = "%s"
if self.action == self.CANCELS:
plain = (
"Storm Prediction Center cancels Weather Watch Number "
f"{self.ww_num} {url}"
)
html = (
f'<p>Storm Prediction Center cancels <a href="{url}">'
f"Weather Watch Number {self.ww_num}</a></p>"
)
res.append(
[plain, html, dict(channels=spc_channels, twitter=plain)]
)
# Now create templates
plain = (
"Storm Prediction Center cancels Weather Watch Number "
f"{self.ww_num} for portions of %s {url}"
)
html = (
f'<p>Storm Prediction Center cancels <a href="{url}">'
f"Weather Watch Number {self.ww_num}</a> "
"for portions of %s</p>"
)
elif self.action == self.ISSUES:
pds_extra = " (Particularly Dangerous Situation) " if pds else ""
plain = (
f"SPC issues {TYPE2STRING[self.ww_type]} Watch {self.ww_num} "
f"{pds_extra}till {self.ets:%-H:%M}Z"
)
html = (
"<p>Storm Prediction Center issues "
'<a href="https://www.spc.noaa.gov/products/watch/'
f'ww{self.ww_num:04.0f}.html">{TYPE2STRING[self.ww_type]} '
f"Watch {self.ww_num}</a> {pds_extra}"
f"till {self.ets:%-H:%M} UTC"
)
if self.replaces_num is not None:
plain += f", new watch replaces WW {self.replaces_num}"
html += f", new watch replaces WW {self.replaces_num}"
plain2 = f"{plain} {url}"
plain2 = " ".join(plain2.split())
xtra = {
"channels": spc_channels,
"twitter": f"#{plain2}",
"product_id": product_id,
}
html2 = html + (
f' (<a href="{uri}?year={self.sts.year}&num={self.ww_num}"'
">Watch "
"Quickview</a>)</p>"
)
res.append([plain2, html2, xtra])
# Now create templates
plain += f" for portions of %s {url}"
html += (
" for portions of %s "
f'(<a href="{uri}?year={self.sts.year}&num={self.ww_num}"'
">Watch Quickview</a>)</p>"
)
plain = " ".join(plain.split())
for wfo in self.affected_wfos:
xtra = {
"channels": f"{wfo}",
"twitter": plain % (wfo,),
"product_id": product_id,
}
res.append([plain % (wfo,), html % (wfo,), xtra])
return res
[docs]
def parser(text, utcnow=None, _ugc_provider=None, _nwsli_provider=None):
"""parser of raw SPC SAW Text
Args:
text (str): the raw text to parse
utcnow (datetime): the current datetime with timezone set!
Returns:
SAWProduct instance
"""
return SAWProduct(text, utcnow=utcnow)