"""Base Class encapsulating a NWS Text Product"""
import re
from collections import OrderedDict
from datetime import timedelta, timezone
from typing import Optional, Union
from shapely.geometry import MultiPolygon, Polygon
from shapely.wkt import dumps
from pyiem import reference
from pyiem.exceptions import InvalidPolygon, TextProductException
from pyiem.nws import hvtec, ugc
from pyiem.nws.vtec import VTEC
from pyiem.nws.vtec import parse as vtec_parse
from pyiem.util import LOG
from pyiem.wmo import WMOProduct
TIME_MOT_LOC = re.compile(
r"TIME\.\.\.MOT\.\.\.LOC\s+(?P<ztime>[0-9]{4})Z\s+"
r"(?P<dir>[0-9]{1,3})DEG\s+"
r"(?P<sknt>[0-9]{1,3})KT\s+(?P<loc>[0-9 ]+)"
)
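# A hypothetical storm-motion line this pattern targets (values invented):
#   TIME...MOT...LOC 2252Z 263DEG 35KT 4262 9567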
LAT_LON_PREFIX = re.compile(r"LAT\.\.\.LON", re.IGNORECASE)
LAT_LON = re.compile(r"([0-9]{4,8})\s+")
# This is a legacy tag that is no longer used, but we want to continue to
# parse it.
WINDHAIL = re.compile(
r".*WIND\.\.\.HAIL (?P<winddir>[><]?)(?P<wind>[0-9]+)"
"(?P<windunits>MPH|KTS) "
r"(?P<haildir>[><]?)(?P<hail>[0-9\.]+)IN"
)
HAILTAG = re.compile(
r".*(HAIL|MAX HAIL SIZE)\.\.\.(?P<haildir>[><]?)(?P<hail>[0-9\.]+)\s?IN"
)
WINDTAG = re.compile(
r".*(WIND|MAX WIND GUST)\.\.\.(?P<winddir>[><]?)\s?(?P<wind>[0-9]+)\s?"
"(?P<windunits>MPH|KTS)"
)
TORNADOTAG = re.compile(
r".*TORNADO\.\.\.(?P<tornado>RADAR INDICATED|OBSERVED|POSSIBLE)"
)
SPOUTTAG = re.compile(
r".*(?P<species>LAND|WATER)SPOUT\.\.\."
"(?P<spout>RADAR INDICATED|OBSERVED|POSSIBLE)"
)
DAMAGETAG = re.compile(
r".*(?P<species>TORNADO|THUNDERSTORM) DAMAGE THREAT\.\.\."
"(?P<damage>CONSIDERABLE|SIGNIFICANT|CATASTROPHIC|DESTRUCTIVE)"
)
THREATTAG = re.compile(
r"(?P<species>HAIL|WIND) THREAT\.\.\."
r"(?P<tag>RADAR INDICATED|OBSERVED|POSSIBLE)"
)
FLOOD_TAGS = re.compile(
r".*(?P<key>FLASH FLOOD|FLASH FLOOD DAMAGE THREAT|EXPECTED RAINFALL|"
r"DAM FAILURE|LEVEE FAILURE)\.\.\.(?P<value>.*?)\n"
)
TORNADO = re.compile(r"^AT |^\* AT", re.I)
RESENT = re.compile(r"\.\.\.(RESENT|RETRANSMITTED|CORRECTED)")
EMERGENCY_RE = re.compile(r"(TORNADO|FLASH\s+FLOOD)\s+EMERGENCY", re.I)
PDS_RE = re.compile(
r"THIS\s+IS\s+A\s+PARTICULARLY\s+DANGEROUS\s+SITUATION", re.I
)
SQUALLTAG = re.compile(
r".*SNOW\s?SQUALL\.\.\.(?P<tag>RADAR INDICATED|OBSERVED)"
)
SQUALLIMPACTTAG = re.compile(
r".*SNOW\s?SQUALL IMPACT\.\.\.(?P<tag>SIGNIFICANT|GENERAL)"
)
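# Hypothetical examples of the tag lines the patterns above target:
#   HAIL...1.75IN                 WIND...60MPH
#   TORNADO...RADAR INDICATED     SNOW SQUALL...OBSERVED
#   THUNDERSTORM DAMAGE THREAT...CONSIDERABLE
#   FLASH FLOOD DAMAGE THREAT...CATASTROPHIC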
EF_RE = re.compile(r"^Rating:\s*EF\s?\-?(?P<num>\d)\s*$", re.M | re.I)
ATTN_WFO = re.compile(
r"ATTN\.\.\.WFO\.\.\.([\.A-Z]*?)(?:LAT\.\.\.LON|ATTN\.\.\.RFC)"
)
ATTN_RFC = re.compile(r"ATTN\.\.\.RFC\.\.\.([\.A-Z]*)")
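# Hypothetical attention lines matched by ATTN_WFO and ATTN_RFC (newlines
# are stripped before matching, so the WFO list must run into ATTN...RFC
# or LAT...LON):
#   ATTN...WFO...DMX...FSD...OAX...
#   ATTN...RFC...MSR...ORN...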
def damage_survey_pns(prod, data):
"""Glean out things, hopefully."""
ffs = {}
for token in EF_RE.findall(prod.unixtext):
entry = ffs.setdefault(int(token), [])
entry.append(1)
data["maxf"] = ""
if ffs:
maxf = max(ffs.keys())
# More for testing than anything
if maxf > 5:
raise ValueError(f"Bad EF of {maxf} found")
data["maxf"] = f" (Max: EF{maxf})"
    # Approximate; note the headline already has a leading space :/
hdl = " " if len(data["headline"]) > 90 else f"{data['headline']} "
plain = (
f"{data['source']} issues Damage Survey PNS{data['maxf']} at "
f"{data['stamp']}{hdl}{data['url']}"
)
html = (
f'<p>{data["source"]} issues <a href="{data["url"]}">Damage Survey PNS'
f"</a>{data['maxf']} at {data['stamp']}{hdl}</p>"
)
return plain, html
def checker(lon, lat, strdata):
"""make sure our values are within physical bounds"""
if lat >= 90 or lat <= -90:
raise TextProductException(f"invalid latitude {lat} from {strdata}")
if lon > 180 or lon < -180:
raise TextProductException(f"invalid longitude {lon} from {strdata}")
return (lon, lat)
def str2polygon(strdata):
"""Convert some string data into a polygon"""
pts = []
partial = None
# If the last character is a digit, we need to add a space (legacy prods)
if strdata[-1].isdigit():
strdata += " "
# We have two potential formats, one with 4 or 5 places and one
# with eight!
vals = re.findall(LAT_LON, strdata)
for val in vals:
if len(val) == 8:
lat = float(val[:4]) / 100.00
lon = float(val[4:]) / 100.00
if lon < 40:
lon += 100.0
lon = 0 - lon
pts.append(checker(lon, lat, strdata))
else:
fval = float(val) / 100.00
if partial is None: # we have lat
partial = fval
continue
# we have a lon
if fval < 40:
fval += 100.0
fval = 0 - fval
pts.append(checker(fval, partial, strdata))
partial = None
if not pts:
return None
if pts[0][0] != pts[-1][0] and pts[0][1] != pts[-1][1]:
pts.append(pts[0])
if len(pts) < 3:
raise InvalidPolygon(f"Less than three points for polygon {pts}")
return Polygon(pts)
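# A minimal sketch of the two encodings str2polygon accepts (coordinates
# invented): "4100 9700 4200 9600 4000 9500" alternates lat then lon in
# hundredths of a degree, while "41009700 42009600 40009500" fuses each
# pair into eight digits; both decode to the same triangle near 41N 97W.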
def qc_is_emergency(seg):
"""Belt + Suspenders check that this segment is an emergency."""
ffdt = seg.flood_tags.get("FLASH FLOOD DAMAGE THREAT")
    # No-op: nothing here indicates an emergency
if (
not seg.is_emergency
and seg.damagetag != "CATASTROPHIC"
and ffdt != "CATASTROPHIC"
):
return
# Auto qualifier
tag_confirms = "CATASTROPHIC" in [seg.damagetag, ffdt]
if tag_confirms:
seg.is_emergency = True
return
    # Since 2023, the emergency must be confirmed by tags.
if seg.is_emergency and not tag_confirms and seg.tp.valid.year >= 2023:
seg.tp.warnings.append(
"Segment indicated emergency, but tags did not confirm and "
"product is >= 2023, so setting no emergency."
)
seg.is_emergency = False
return
# Oh, we have work to do
has_tags = seg.damagetag is not None or ffdt is not None
# tags do not confirm the emergency
if seg.is_emergency and has_tags and not tag_confirms:
seg.tp.warnings.append(
"Segment indicated emergency, but tags negated it.\n"
f"tag_confirms: {tag_confirms} damagetag: {seg.damagetag} "
f"ffdt: {ffdt}"
)
seg.is_emergency = False
return
    # The segment's sole VTEC action is CAN or EXP (an expiring event)
if seg.is_emergency and seg.vtec and seg.vtec[0].action in ["CAN", "EXP"]:
seg.tp.warnings.append(
"Segment indicated emergency, but VTEC action is expiring."
)
seg.is_emergency = False
class TextProductSegment:
"""A segment of a Text Product"""
def __init__(self, text, tp: "TextProduct"):
"""Constructor"""
        # Shadows the parent's unixtext name, but holds only this segment
self.unixtext = text
self.tp = tp # Reference to parent
self.vtec: list[VTEC] = vtec_parse(text)
self.ugcs, self.ugcexpire = ugc.parse(
text,
tp.valid,
ugc_provider=tp.ugc_provider,
is_firewx=any(v.phenomena == "FW" for v in self.vtec),
)
self.headlines = self.parse_headlines()
self.hvtec = hvtec.parse(text, tp=tp)
# TIME...MOT...LOC Stuff!
self.tml_giswkt = None
self.tml_valid = None
self.tml_sknt = None
self.tml_dir = None
self.process_time_mot_loc()
#
self.giswkt = None
try:
self.sbw = self.process_latlon()
except InvalidPolygon as exp:
tp.warnings.append(str(exp))
self.sbw = None
# tags
self.windtag = None
self.windtagunits = None
self.windthreat = None
self.hailtag = None
self.haildirtag = None
self.hailthreat = None
self.winddirtag = None
self.tornadotag = None
self.waterspouttag = None
self.landspouttag = None
self.damagetag = None
self.squalltag = None
# allows for deterministic testing of results
self.flood_tags = OrderedDict()
self.is_emergency = False
self.is_pds = False
self.process_tags()
qc_is_emergency(self)
self.bullets = self.process_bullets()
def get_ugcs_tuple(self):
"""Helper to return a tuple useful for SQL."""
return tuple(self.get_ugcs_list())
def get_ugcs_list(self):
"""Helper to return a list useful for SQL."""
return [str(u) for u in self.ugcs]
def get_hvtec_nwsli(self):
"""Return the first hvtec NWSLI entry, if it exists"""
if not self.hvtec:
return None
return self.hvtec[0].nwsli.id
def get_hvtec_cause(self):
"""Return the first hvtec cause entry, if it exists"""
if not self.hvtec:
return None
return self.hvtec[0].cause
def get_hvtec_severity(self):
"""Return the first hvtec severity entry, if it exists"""
if not self.hvtec:
return None
return self.hvtec[0].severity
def get_hvtec_record(self):
"""Return the first hvtec record entry, if it exists"""
if not self.hvtec:
return None
return self.hvtec[0].record
def svs_search(self):
"""Special search the product for special text"""
sections = self.unixtext.split("\n\n")
for section in sections:
if TORNADO.findall(section):
return " ".join(section.replace("\n", " ").split())
return ""
def process_bullets(self):
"""Figure out the bulleted segments"""
parts = re.findall(r"^\*([^\*]*)", self.unixtext, re.M | re.DOTALL)
bullets = []
for part in parts:
pos = part.find("\n\n")
if pos > 0:
part = part[:pos]
# look for subheadings :/
piece = ""
for line in part.split("\n"):
if line.strip().startswith("- "):
if piece != "":
bullets.append(piece)
piece = line.split("- ", 1)[1]
continue
if piece != "":
piece += f" {line} "
if piece != "":
bullets.append(piece)
bullets.append(" ".join(part.replace("\n", "").split()))
# Cleanup
return [" ".join(b.split()) for b in bullets]
def process_tags(self):
"""Find various tags in this segment"""
nolf = self.unixtext.replace("\n", " ")
res = EMERGENCY_RE.findall(nolf)
if res and self.vtec:
# Ensure that the emergency RE matches the segment phenomena
# We later double check this based on the found tags
if (
res[0].upper() == "FLASH FLOOD"
and self.vtec[0].phenomena == "FF"
):
self.is_emergency = True
if res[0].upper() == "TORNADO" and self.vtec[0].phenomena == "TO":
self.is_emergency = True
res = PDS_RE.findall(nolf)
if res:
self.is_pds = True
match = WINDHAIL.match(nolf)
if match:
gdict = match.groupdict()
self.windtag = gdict["wind"]
self.windtagunits = gdict["windunits"]
self.haildirtag = gdict["haildir"]
self.winddirtag = gdict["winddir"]
self.hailtag = gdict["hail"]
match = WINDTAG.match(nolf)
if match:
gdict = match.groupdict()
self.winddirtag = gdict["winddir"]
self.windtag = gdict["wind"]
self.windtagunits = gdict["windunits"]
match = HAILTAG.match(nolf)
if match:
gdict = match.groupdict()
self.haildirtag = gdict["haildir"]
self.hailtag = gdict["hail"]
match = TORNADOTAG.match(nolf)
if match:
gdict = match.groupdict()
self.tornadotag = gdict["tornado"]
match = DAMAGETAG.match(nolf)
if match:
gdict = match.groupdict()
self.damagetag = gdict["damage"]
for species, tag in THREATTAG.findall(nolf):
if species == "HAIL":
self.hailthreat = tag
elif species == "WIND":
self.windthreat = tag
match = SQUALLTAG.match(nolf)
if match:
gdict = match.groupdict()
self.squalltag = gdict["tag"]
match = SQUALLIMPACTTAG.match(nolf)
if match:
gdict = match.groupdict()
self.damagetag = gdict["tag"]
match = SPOUTTAG.match(nolf)
if match:
gdict = match.groupdict()
if gdict["species"] == "WATER":
self.waterspouttag = gdict["spout"]
else:
self.landspouttag = gdict["spout"]
for token in FLOOD_TAGS.findall(self.unixtext):
self.flood_tags[token[0]] = token[1]
def special_tags_to_text(self):
"""
Convert the special tags into a nice text
"""
if all(
[
self.windtag is None,
self.tornadotag is None,
self.hailtag is None,
self.damagetag is None,
self.waterspouttag is None,
self.landspouttag is None,
self.squalltag is None,
not self.flood_tags,
]
):
return ""
parts = []
if self.squalltag is not None:
parts.append(f"snow squall: {self.squalltag}")
if self.tornadotag is not None:
parts.append(f"tornado: {self.tornadotag}")
if self.waterspouttag is not None:
parts.append(f"waterspout: {self.waterspouttag}")
if self.landspouttag is not None:
parts.append(f"landspout: {self.landspouttag}")
if self.damagetag is not None:
if self.vtec and self.vtec[0].phenomena == "SQ":
parts.append(f"impact threat: {self.damagetag}")
else:
parts.append(f"damage threat: {self.damagetag}")
if self.windtag is not None:
            _p1 = self.winddirtag.replace(">", "&gt;").replace("<", "&lt;")
_p2 = "" if self.windthreat is None else f" ({self.windthreat})"
parts.append(f"wind: {_p1}{self.windtag} {self.windtagunits}{_p2}")
if self.hailtag is not None:
            _p1 = self.haildirtag.replace(">", "&gt;").replace("<", "&lt;")
_p2 = "" if self.hailthreat is None else f" ({self.hailthreat})"
parts.append(f"hail: {_p1}{self.hailtag} IN{_p2}")
for k, v in self.flood_tags.items():
parts.append(f"{k.lower()}: {v.lower()}")
return f" [{', '.join(parts)}] "
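    # Hypothetical rendering from special_tags_to_text for a segment with
    # tornado, wind, and hail tags:
    #   " [tornado: OBSERVED, wind: &gt;60 MPH, hail: 1.75 IN] "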
def process_latlon(self):
"""Parse the segment looking for the 'standard' LAT...LON encoding"""
data = self.unixtext.replace("\n", " ")
search = LAT_LON_PREFIX.search(data)
if search is None:
return None
pos = search.start()
newdata = data[pos + 9 :]
        # Find the next non-digit, non-space character; if one exists, the
        # string should be truncated there. This could likely be improved.
search = re.search(r"[^\s0-9]", newdata)
if search is not None:
pos2 = search.start()
newdata = newdata[:pos2]
poly = str2polygon(newdata)
if poly is None:
return None
# check 0, PGUM polygons are east longitude akrherz/pyIEM#74
if self.tp.source == "PGUM":
newpts = [[0 - pt[0], pt[1]] for pt in poly.exterior.coords]
poly = Polygon(newpts)
# check 1, is the polygon valid?
if not poly.is_valid:
poly = poly.buffer(0)
# Careful, this could return a multipolygon
if isinstance(poly, MultiPolygon):
if len(poly.geoms) > 2:
self.tp.warnings.append(
"LAT...LON buffer(0) returned 3+ polygon, culling."
)
return None
                # Judgment call: if we have a bowtie and one side is very
                # small, cull it and just use the larger side
polys = list(poly.geoms)
polys = sorted(polys, key=lambda x: x.area)
if (
polys[0].is_valid
and polys[1].is_valid
and polys[0].area * 10 < polys[1].area
):
self.tp.warnings.append(
"LAT...LON buffer(0) made 2 polys, taking biggest."
)
poly = polys[1]
else:
self.tp.warnings.append(
"LAT...LON buffer(0) made 2 polys with invalids."
)
return None
else:
self.tp.warnings.append(
"LAT...LON polygon is invalid, but buffer(0) fixed it!"
)
# check 2, is the exterior ring of the polygon clockwise?
if poly.exterior.is_ccw:
# No longer a warning as it was too much noise
LOG.debug(
"LAT...LON polygon exterior is CCW, reversing\n%s",
poly.exterior.xy,
)
poly = Polygon(
zip(
poly.exterior.xy[0][::-1],
poly.exterior.xy[1][::-1],
strict=False,
)
)
# NB: the encoding parsed above should always have just two decimals
self.giswkt = (
f"SRID=4326;{dumps(MultiPolygon([poly]), rounding_precision=2)}"
)
return poly
def process_time_mot_loc(self):
"""Try to parse the TIME...MOT...LOC"""
pos = self.unixtext.find("TIME...MOT...LOC")
if pos == -1:
return
if self.unixtext[pos - 1] != "\n":
self.tp.warnings.append(
"process_time_mot_loc segment likely has "
"poorly formatted TIME...MOT...LOC"
)
search = TIME_MOT_LOC.search(self.unixtext)
if not search:
self.tp.warnings.append(
"process_time_mot_loc segment find OK, but regex failed..."
)
return
gdict = search.groupdict()
if len(gdict["ztime"]) != 4 or self.ugcexpire is None:
return
hh = int(gdict["ztime"][:2])
mi = int(gdict["ztime"][2:])
self.tml_valid = self.ugcexpire.replace(hour=hh, minute=mi)
if hh > self.ugcexpire.hour:
self.tml_valid = self.tml_valid - timedelta(days=1)
self.tml_valid = self.tml_valid.replace(tzinfo=timezone.utc)
tokens = gdict["loc"].split()
lats = []
lons = []
if len(tokens) % 2 != 0:
tokens = tokens[:-1]
if not tokens:
return
for i in range(0, len(tokens), 2):
lats.append(float(tokens[i]) / 100.0)
lons.append(0 - float(tokens[i + 1]) / 100.0)
if len(lats) == 1:
self.tml_giswkt = f"SRID=4326;POINT({lons[0]} {lats[0]})"
else:
pairs = []
for lat, lon in zip(lats, lons, strict=False):
pairs.append(f"{lon} {lat}")
self.tml_giswkt = f"SRID=4326;LINESTRING({','.join(pairs)})"
self.tml_sknt = int(gdict["sknt"])
self.tml_dir = int(gdict["dir"])
def parse_headlines(self):
"""Find headlines in this segment"""
res = []
candidate = ""
for line in [s.strip() for s in self.unixtext.split("\n")]:
if line.startswith("..."):
candidate = line
elif candidate != "":
candidate += " " + line
if line.endswith("..."):
                    if 6 < len(candidate) < 160:  # arbitrary sane bounds
res.append(candidate.replace("...", ""))
candidate = ""
return res
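    # Headlines are the leading "...TEXT..." lines of a segment, e.g. the
    # hypothetical "...A TORNADO WATCH REMAINS IN EFFECT UNTIL 9 PM CDT..."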
def get_affected_wfos(self) -> list[str]:
"""Based on the ugc_provider, figure out which WFOs are impacted by
this product segment"""
affected_wfos = []
for _ugc in self.ugcs:
affected_wfos.extend(_ugc.wfos)
return list(set(affected_wfos))
class TextProduct(WMOProduct):
"""class representing a NWS Text Product"""
def __init__(
self,
text,
utcnow=None,
ugc_provider: Optional[Union[ugc.UGCProvider, dict]] = None,
nwsli_provider=None,
parse_segments=True,
):
"""
Constructor
@param text string single text product
        @param utcnow used to compute offsets for when this product may be valid
@param ugc_provider dict or UGCProvider instance
@param parse_segments should the segments be parsed as well? True
"""
super().__init__(text, utcnow=utcnow)
if isinstance(ugc_provider, dict):
ugc_provider = ugc.UGCProvider(legacy_dict=ugc_provider)
if ugc_provider is None:
ugc_provider = ugc.UGCProvider()
if nwsli_provider is None:
nwsli_provider = {}
self.ugc_provider = ugc_provider
self.nwsli_provider = nwsli_provider
self.sections = self.unixtext.split("\n\n")
self.segments: list[TextProductSegment] = []
self.geometry = None
if parse_segments:
self.parse_segments()
def suv_iter(self):
"""Yield [(segment, ugcs, vtec)] combos found in product."""
for segment in self.segments:
if not segment.ugcs or not segment.vtec:
continue
for _vtec in segment.vtec:
if _vtec.status == "T" or _vtec.action == "ROU":
continue
yield (segment, segment.ugcs, _vtec)
def is_resent(self):
"""Check to see if this product is a ...RESENT product"""
return self.unixtext.find("...RESENT") > 0
def is_correction(self):
"""Is this product a correction?
Sadly, this is not as easy as it should be. It turns out that some
products do not have a proper correction mechanism, so offices will
        just brute-force a note into the MND header. So we have to do
some further checking...
Returns:
bool: Is this product a correction?
"""
# OK, go looking for RESENT style tags, assume it happens within first
# 300 chars
if RESENT.search(self.text[:300]):
return True
        if not self.bbb:
return False
if self.bbb[0] in ["A", "C"]:
return True
return False
def parse_attn_rfc(self):
"""Figure out which RFCs this product is seeking attention"""
tokens = ATTN_RFC.findall(self.unixtext.replace("\n", ""))
if not tokens:
return []
rfcs = re.findall("([A-Z]{3,5})", tokens[0])
# le sigh
if "LAT" in rfcs:
rfcs = rfcs[: rfcs.index("LAT")]
return rfcs
def parse_attn_wfo(self):
"""Figure out which WFOs this product is seeking attention"""
tokens = ATTN_WFO.findall(self.unixtext.replace("\n", ""))
if not tokens:
return []
return re.findall("([A-Z]{3})", tokens[0])
def get_channels(self):
"""Return a list of channels"""
res = [self.afos, f"{self.afos[:3]}..."]
if self.afos[:3] in ["TCU", "TCD", "TCM", "TCP", "TWO"]:
res.append(self.afos[:5])
elif self.afos == "AHDNWC":
# Get the ATTN
res.extend(self.parse_attn_wfo())
res.extend(self.parse_attn_rfc())
return res
def get_nicedate(self):
"""Nicely format the issuance time of this product"""
if self.valid is None:
return "(unknown issuance time)"
localts = self.valid
fmt = "%b %-d, %H:%M UTC"
if self.tz is not None:
localts = self.valid.astimezone(self.tz)
# A bit of complexity as offices may not implement daylight saving
if self.z.endswith("ST") and localts.dst():
localts -= timedelta(hours=1)
fmt = "%b %-d, %-I:%M %p " + self.z
return localts.strftime(fmt)
def get_main_headline(self, default=""):
"""Return a string for the main headline, if it exists"""
for segment in self.segments:
if segment.headlines:
return segment.headlines[0]
return default
def get_jabbers(self, uri, _uri2=None):
"""Return a tuple of jabber messages [(plain, html, xtra_dict)]
Args:
uri (str): the base URI to use to construct links
Returns:
[(str, str, dict)]
"""
templates = [
"%(source)s issues %(name)s (%(aaa)s) at %(stamp)s%(headline)s ",
"%(source)s issues %(name)s (%(aaa)s) at %(stamp)s ",
]
aaa = "" if self.afos is None else self.afos[:3]
hdl = self.get_main_headline()
data = {
"headline": f" ...{hdl}..." if hdl != "" else "",
"source": self.source[1:],
"aaa": aaa,
"name": reference.prodDefinitions.get(
self.afos, reference.prodDefinitions.get(aaa, self.afos)
),
"stamp": self.get_nicedate(),
"url": f"{uri}?pid={self.get_product_id()}",
}
plain = (templates[0] + "%(url)s") % data
html = (
'<p>%(source)s issues <a href="%(url)s">%(name)s (%(aaa)s)</a>'
" at %(stamp)s</p>"
) % data
tweet = templates[0] % data
if (len(tweet) - 25) > reference.TWEET_CHARS:
tweet = templates[1] % data
tweet += data["url"]
xtra = {
"channels": ",".join(self.get_channels()),
"product_id": self.get_product_id(),
"twitter": f"#{tweet}",
}
if self.segments and self.segments[0].sbw:
# Generic product has a polygon, so we add special twitter message
xtra["twitter_media"] = (
"https://mesonet.agron.iastate.edu/plotting/auto/plot/227/"
f"pid:{self.get_product_id()}::segnum:0.png"
)
if (
self.afos is not None
and self.afos[:3] == "PNS"
and self.unixtext.upper().find("DAMAGE SURVEY") > -1
):
try:
plain, html = damage_survey_pns(self, data)
xtra["twitter"] = plain
xtra["channels"] += ",DAMAGEPNS"
except Exception as exp:
                self.warnings.append(f"Hit exception {exp} in damage_survey_pns")
return [(plain, html, xtra)]
def get_signature(self) -> Optional[str]:
"""Attempt to glean the free form text that is a signature."""
lines = [x.strip() for x in self.unixtext.split("\n") if x.strip()]
res = None
for line in lines[::-1][:3]:
# We found something that should have come before the signature
if line in ["$$", "&&"] or len(line) > 24:
break
if line not in ["$", "&"]:
res = line
break
return res
def parse_segments(self):
"""Split the product by its $$"""
segs = self.unixtext.split("$$")
for seg in segs:
self.segments.append(TextProductSegment(seg, self))
def get_affected_wfos(self):
"""Based on the ugc_provider, figure out which WFOs are impacted by
this product"""
affected_wfos = []
for segment in self.segments:
for ugcs in segment.ugcs:
for wfo in ugcs.wfos:
if wfo not in affected_wfos:
affected_wfos.append(wfo)
return affected_wfos
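# A minimal usage sketch (module path assumed; the filename and URI below
# are hypothetical):
#
#   from pyiem.nws.product import TextProduct
#
#   with open("SVRDMX.txt", encoding="utf-8") as fh:
#       prod = TextProduct(fh.read())
#   for seg in prod.segments:
#       print(seg.ugcs, seg.vtec, seg.windtag, seg.hailtag)
#   for plain, html, xtra in prod.get_jabbers("https://example.com/p.php"):
#       print(plain)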