"""TAF Parsing"""
# stdlib
import re
from datetime import datetime, timedelta
from pyiem import reference
from pyiem.models.taf import SkyCondition, TAFForecast, TAFReport, WindShear
# local
from pyiem.nws.product import TextProduct
# Validity window of a qualifier group in the ddhh/ddhh form.
TEMPO_TIME = re.compile(r"^(?P<ddhh1>\d{4})/(?P<ddhh2>\d{4}) ")
# Older window form: four digits with no day component.
TEMPO_TIME_LEGACY = re.compile(r"^(?P<hrhr2>\d{4}) ")
# Station identifier followed by the ddhhmi issuance timestamp.
STID_VALID = re.compile(
    r"^(?P<station>[A-Z0-9]{3,4}) (AMD|TAF)?\s?(?P<ddhhmi>\d{6})Z? ",
    re.MULTILINE,
)
# Wind group: direction, speed and optional gust, terminated by KT.
WIND_RE = re.compile(r"(?P<dir>\d{3})(?P<sknt>\d{2,3})G?(?P<gust>\d{2,3})?KT")
# Visibility group: optional P prefix, whole miles and/or a fraction, then SM.
VIS_RE = re.compile(r" (?P<over>P?)(?P<miles>[1-6])?\s?(?P<frac>\d/\d+)?SM")
# A token made only of capital letters, plus and minus signs.
WX_RE = re.compile(r"^([\-\+A-Z]+)$")
# Cloud layer: coverage code followed by a three digit level.
CLOUD_RE = re.compile(r" (?P<skyc>SCT|OVC|VV|BKN|FEW)(?P<skyl>\d{3})")
# Wind shear group: level, direction and speed.
SHEAR_RE = re.compile(
    r" WS(?P<level>\d{3})/(?P<drct>\d{3})(?P<sknt>\d{2,3})KT"
)
# Integer code per forecast group type; this duplicates the values the
# database uses for its ftype column.
FTYPE = {
    label: code
    for code, label in enumerate(
        ("OB", "FM", "TEMPO", "PROB30", "PROB40", "BECMG")
    )
}
[docs]
def add_forecast_info(fx, text):
    """Fill in the weather elements shared by every forecast group.

    Wind, visibility, present weather, sky condition and wind shear are
    pulled out of ``text`` and written onto ``fx`` in place.

    Args:
        fx: forecast object to update; its ``sky`` attribute is appended to.
        text: raw text of a single forecast group.
    """
    wind = WIND_RE.search(text)
    if wind is not None:
        fx.sknt = int(wind.group("sknt"))
        fx.drct = int(wind.group("dir"))
        fx.gust = int(wind.group("gust") or 0)

    vis = VIS_RE.search(text)
    if vis is not None:
        whole_miles = vis.group("miles")
        fraction = vis.group("frac")
        fx.visibility = int(whole_miles or 0)
        # "P6SM" is stored as a dedicated sentinel rather than as 6.
        if vis.group("over") == "P" and fx.visibility == 6:
            fx.visibility = reference.TAF_VIS_OVER_6SM
        if fraction is not None:
            numerator, denominator = fraction.split("/")
            fx.visibility += float(numerator) / float(denominator)

    # Possibly too clever: every token without a digit counts as present wx.
    fx.presentwx = list(filter(WX_RE.match, text.split()))
    if "SKC" not in fx.presentwx:
        for amount, hundreds in CLOUD_RE.findall(text):
            fx.sky.append(
                SkyCondition(amount=amount, level=int(hundreds) * 100)
            )
    else:
        # SKC is a sky condition, so move it out of the weather list.
        fx.presentwx.remove("SKC")
        fx.sky.append(SkyCondition(amount="SKC", level=None))

    # When several shear groups are present the last one wins.
    for level, drct, sknt in SHEAR_RE.findall(text):
        fx.shear = WindShear(
            level=int(level) * 100,
            drct=int(drct),
            sknt=int(sknt),
        )
[docs]
def make_qualifier(prod: TextProduct, text: str, ftype_str: str):
    """Build the forecast for a qualifier group such as TEMPO or PROB30.

    Args:
        prod: product being parsed; ``prod.valid`` anchors the timestamps.
        text: group text, with or without the leading qualifier keyword.
        ftype_str: qualifier keyword, used as a key into ``FTYPE``.

    Returns:
        The populated ``TAFForecast``, or ``None`` when no validity window
        can be found at the start of the text.
    """
    text = text.replace(f"{ftype_str} ", "")
    window = TEMPO_TIME.search(text)
    if window is not None:
        # ddhh/ddhh form: pad each side with zero minutes.
        begin = ddhhmi2valid(prod, window.group("ddhh1") + "00", prod.valid)
        end = ddhhmi2valid(prod, window.group("ddhh2") + "00", prod.valid)
    else:
        legacy = TEMPO_TIME_LEGACY.search(text)
        if legacy is None:
            return None
        # Four digit form: first two digits start, last two digits end.
        hours = legacy.group("hrhr2")
        begin = ambiguous_hour_magic(hours[:2] + "00", prod.valid)
        end = ambiguous_hour_magic(hours[2:] + "00", prod.valid)
    body = " ".join(text.split()).replace("=", "").strip()
    forecast = TAFForecast(
        ftype=FTYPE[ftype_str],
        valid=begin,
        end_valid=end,
        raw=f"{ftype_str} {body}",
    )
    add_forecast_info(forecast, text)
    return forecast
[docs]
def make_forecast(
    prod: TextProduct, text: str, obvalid: datetime
) -> TAFForecast:
    """Create the forecast entry for a single ``FM`` group.

    Args:
        prod: product being parsed.
        text: group text beginning with the two character ``FM`` marker.
        obvalid: reference time handed to the timestamp resolver.

    Returns:
        A ``TAFForecast`` of type ``FM`` with its weather elements set.
    """
    # The timestamp sits right behind the two character marker.
    timestamp = text[2:8]
    cleaned = " ".join(text.split()).replace("=", "").strip()
    forecast = TAFForecast(
        valid=ddhhmi2valid(prod, timestamp, obvalid),
        raw=cleaned,
        ftype=FTYPE["FM"],
    )
    add_forecast_info(forecast, text)
    return forecast
[docs]
def ambiguous_hour_magic(text: str, obvalid: datetime) -> datetime:
    """Resolve an ``HHMM`` string that carries no day against a reference.

    Args:
        text: at least four characters, hour then minute.
        obvalid: reference time supplying the date.

    Returns:
        ``obvalid`` with the hour and minute replaced, moved one day ahead
        when the hour is 24 or lies before the reference hour.
    """
    hour, minute = int(text[:2]), int(text[2:4])
    one_day = timedelta(days=1)
    if hour == 24:
        # Hour 24 is the midnight that closes the reference day.
        return obvalid.replace(hour=0, minute=minute) + one_day
    resolved = obvalid.replace(hour=hour, minute=minute)
    # An hour earlier than the reference hour is taken to mean tomorrow.
    return resolved + one_day if resolved.hour < obvalid.hour else resolved
[docs]
def ddhhmi2valid(prod: TextProduct, text: str, obvalid: datetime) -> datetime:
    """Turn a TAF ``ddhhmi`` timestamp into a full datetime.

    The month and year come from ``prod.valid``. When the day of month in
    ``text`` falls on the other side of a month boundary from the product
    time, the result is moved into the neighbouring month. Hour 24 is
    treated as 00 of the following day, and the month boundary logic is
    applied to it too (previously it was skipped, so ``0124`` in a product
    issued on the 31st resolved into the wrong month).

    Args:
        prod: product being parsed; ``prod.valid`` is the anchor time.
        text: timestamp text. If its fifth character is a space, the first
            four characters are treated as a bare ``hhmi`` value.
        obvalid: reference time, used only for the bare ``hhmi`` form.

    Returns:
        The resolved datetime.

    Raises:
        ValueError: if the hour is outside 0-24, or the day or minute
            cannot be placed on the calendar.
    """
    # Account for 4 character timestamp, likely hour and minute
    if text[4] == " ":
        return ambiguous_hour_magic(text[:4], obvalid)
    dd = int(text[:2])
    hr = int(text[2:4])
    mi = int(text[4:6])
    if hr < 0 or hr > 24:
        raise ValueError(f"Found invalid hr: {hr} from '{text}'")
    # Hour 24 is handled as 00Z plus one day, added once the day is set.
    valid = prod.valid.replace(hour=0 if hr == 24 else hr, minute=mi)
    # Jump two weeks so that replacing the day lands in the right month.
    if valid.day > 20 and dd < 3:
        # Next month
        valid += timedelta(days=14)
    elif valid.day == 1 and dd > 20:
        # Previous month
        valid -= timedelta(days=14)
    valid = valid.replace(day=dd)
    if hr == 24:
        valid += timedelta(days=1)
    return valid
[docs]
def parse_prod(prod: TextProduct, segtext: str) -> TAFReport:
    """Generate a data object from one TAF segment of this product.

    The text after the station/timestamp header is folded into one string
    per forecast group. The first string holds the initial conditions and
    becomes the report's observation; ``FM`` groups and qualifier groups
    (TEMPO, PROB30, PROB40, BECMG) become entries in ``forecasts``.

    Only the text ahead of the first qualifier keyword on the first line
    is used for the observation, so weather belonging to a TEMPO/PROB/BECMG
    group is not attributed to the initial conditions. Every qualifier
    group found on that line is parsed, not just the first one.

    Args:
        prod: product being parsed; failures to parse a qualifier group are
            appended to ``prod.warnings``.
        segtext: text of a single forecast, with or without a trailing "=".

    Returns:
        The populated ``TAFReport``.

    Raises:
        AttributeError: if no station/timestamp header is found.
        IndexError: if nothing follows the header.
    """
    m = STID_VALID.search(segtext)
    d = m.groupdict()
    if "=" not in segtext:
        segtext += "="
    meat = segtext[m.end() : segtext.find("=")]

    # Fold continuation lines into the group that they belong to.
    group_starts = ("FM", "TEMPO", "BECMG", "PROB")
    lines = []
    accum = ""
    for token in (x.strip() for x in meat.splitlines()):
        if token.startswith(group_starts):
            if accum != "":
                lines.append(accum)
            accum = token
        else:
            accum += f" {token}"
    if accum != "":
        lines.append(accum)

    # Deal with the observation
    valid = ddhhmi2valid(prod, d["ddhhmi"], prod.valid)
    # The capture group keeps each keyword in the split result.
    qualifier_re = re.compile(r"(TEMPO|PROB30|PROB40|BECMG)")
    # Qualifier groups may share the first line with the initial conditions.
    parts = qualifier_re.split(lines[0])
    base = parts[0]
    data = TAFReport(
        station=d["station"] if len(d["station"]) == 4 else f"K{d['station']}",
        valid=valid,
        product_id=prod.get_product_id(),
        observation=TAFForecast(
            valid=valid,
            raw=" ".join(base.split()),
            ftype=FTYPE["OB"],
        ),
    )
    add_forecast_info(data.observation, base)
    if len(parts) > 1:
        # Queue everything from the first keyword on for the loop below.
        lines.insert(1, "".join(parts[1:]))

    for token in lines[1:]:
        # Keyword waiting for the text that follows it.
        diction = None
        for part in qualifier_re.split(token):
            if part == "":
                continue
            if part.startswith("FM"):
                forecast = make_forecast(
                    prod, part.strip(), data.observation.valid
                )
                if forecast is not None:
                    data.forecasts.append(forecast)
                continue
            if diction is None:
                diction = part.strip()
                continue
            forecast = make_qualifier(prod, part.strip(), diction)
            if forecast is None:
                prod.warnings.append(
                    f"Failed to parse {part.strip()} with {diction}"
                )
            else:
                data.forecasts.append(forecast)
            diction = None
    return data
[docs]
class TAFProduct(TextProduct):
    """
    Represents a TAF

    The product text is split on "=" and each piece long enough to hold a
    forecast is parsed into a ``TAFReport`` stored in ``self.data``.
    """

    def __init__(
        self, text, utcnow=None, ugc_provider=None, nwsli_provider=None
    ):
        """Parse the product text into a list of reports.

        Args:
            text: raw product text.
            utcnow: passed through to ``TextProduct``.
            ugc_provider: passed through to ``TextProduct``; an empty dict
                is substituted for ``None``.
            nwsli_provider: passed through to ``TextProduct``.
        """
        # Prevent expensive and unnecessary dblookup
        if ugc_provider is None:
            ugc_provider = {}
        super().__init__(text, utcnow, ugc_provider, nwsli_provider)
        self.data: list[TAFReport] = []
        for token in self.unixtext.split("="):
            if len(token) < 10:  # arb
                continue
            # Drop a literal leading "TAF " only. str.lstrip("TAF ") strips
            # any run of the characters T, A, F and space, which also eats
            # the start of identifiers such as TJSJ.
            segment = token.strip().removeprefix("TAF ").lstrip()
            self.data.append(parse_prod(self, segment))

    def get_channels_for_report(self, report: TAFReport) -> list[str]:
        """Return a list of channels

        Args:
            report: report whose station identifier, minus its first
                character, names the station channel.
        """
        return [f"TAF{report.station[1:]}", "TAF...", f"{self.source}.TAF"]

    def sql(self, txn):
        """Persist to the database.

        One ``taf`` row is inserted per report, followed by one
        ``taf_forecast`` row for the observation and for each forecast.

        Args:
            txn: object providing ``execute`` and ``fetchone``; the fetched
                row is indexed by ``"id"``.
        """
        for taf in self.data:
            # Product corrections are not really accounted for here due to
            # performance worries
            # Create an entry
            txn.execute(
                "INSERT into taf(station, valid, product_id) "
                "VALUES (%s, %s, %s) RETURNING id",
                (taf.station, taf.valid, self.get_product_id()),
            )
            taf_id = txn.fetchone()["id"]
            # Insert obs / forecast
            for entry in [taf.observation, *taf.forecasts]:
                shear = entry.shear
                txn.execute(
                    "INSERT into taf_forecast(taf_id, valid, raw, "
                    "end_valid, sknt, drct, gust, visibility, presentwx, "
                    "skyc, skyl, ws_level, ws_drct, ws_sknt, ftype) VALUES "
                    "(%s, %s, %s, %s, "
                    "%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
                    (
                        taf_id,
                        entry.valid,
                        entry.raw,
                        entry.end_valid,
                        entry.sknt,
                        entry.drct,
                        entry.gust,
                        entry.visibility,
                        entry.presentwx,
                        [x.amount for x in entry.sky],
                        [x.level for x in entry.sky],
                        None if shear is None else shear.level,
                        None if shear is None else shear.drct,
                        None if shear is None else shear.sknt,
                        entry.ftype,
                    ),
                )

    def get_jabbers(self, uri, _uri2=None):
        """Get the jabber variant of this message

        Args:
            uri: base URL; the product id is appended as ``?pid=``.
            _uri2: unused.

        Returns:
            One ``(plain, html, xtra)`` tuple per report in ``self.data``.
        """
        res = []
        url = f"{uri}?pid={self.get_product_id()}"
        aaa = "TAF"
        nicedate = self.get_nicedate()
        label = reference.prodDefinitions.get(aaa, aaa)
        for taf in self.data:
            plain = (
                f"{self.source[1:]} issues {label} ({aaa}) at {nicedate} for "
                f"{taf.station[1:]} {url}"
            )
            html = (
                f"<p>{self.source[1:]} issues "
                f'<a href="{url}">{label} ({aaa})</a> '
                f"at {nicedate} for {taf.station[1:]}</p>"
            )
            xtra = {
                "channels": ",".join(self.get_channels_for_report(taf)),
                "product_id": self.get_product_id(),
                "twitter": plain,
                "twitter_media": (
                    "https://mesonet.agron.iastate.edu/plotting/auto/plot/219/"
                    f"station:{taf.station}::valid:"
                    f"{taf.valid.strftime('%Y-%m-%d%%20%H%M')}.png"
                ),
            }
            res.append((plain, html, xtra))
        return res
[docs]
def parser(text, utcnow=None, ugc_provider=None, nwsli_provider=None):
    """Build a ``TAFProduct`` from raw text, forwarding every argument."""
    product = TAFProduct(
        text,
        utcnow=utcnow,
        ugc_provider=ugc_provider,
        nwsli_provider=nwsli_provider,
    )
    return product