"""Pilot Reports (PIREP)
This module attempts to process and store atomic data from PIREPs. These are
encoded products that look like so:
UBUS01 KMSC 221700
EAU UA /OV EAU360030/TM 1715/FL350/TP B737/TB CONT LGT-MOD CHOP =
EHY UA /OV MBW253036 /TM 1729 /FL105 /TP C206 /SK FEW250 /TA M06
/TB NEG /RM SMTH=
Unfortunately, there is not much documentation of this format and the feed of
this data contains a bunch of formatting errors.
"""
import math
import re
from datetime import timedelta
from typing import List
from metpy.units import units
from pyiem.models.pirep import PilotReport, Priority
from pyiem.nws.product import TextProduct
from pyiem.util import LOG, html_escape
FLIGHT_LEVEL = re.compile(r"FL(?P<levelx100>[0-9]{3})")
OV_LATLON = re.compile(
(
r"\s?(?P<lat>[0-9]{2,4})(?P<latsign>[NS])"
r"\s?(?P<lon>[0-9]{2,5})(?P<lonsign>[EW])"
)
)
OV_LOCDIR = re.compile(
r".*?(?P<loc>[A-Z0-9]{3,4})\s?(?P<dir>[0-9]{3})(?P<dist>[0-9]{3})"
)
OV_TWOLOC = re.compile(
r"(?P<loc1>[A-Z0-9]{3,4})\s?-\s?(?P<loc2>[A-Z0-9]{3,4})"
)
OV_OFFSET = re.compile(
(
r"(?P<dist>[0-9]{1,3})\s?"
"(?P<dir>NORTH|EAST|SOUTH|WEST|N|NNE|NE|ENE|E|ESE|"
r"SE|SSE|S|SSW|SW|WSW|W|WNW|NW|NNW)\s+(OF )?(?P<loc>[A-Z0-9]{3,4})"
)
)
DRCT2DIR = {
"N": 0,
"NNE": 22.5,
"NE": 45,
"ENE": 67.5,
"E": 90,
"ESE": 112.5,
"SE": 135,
"SSE": 157.5,
"S": 180,
"SSW": 202.5,
"SW": 225,
"WSW": 247.5,
"W": 270,
"WNW": 292.5,
"NW": 305,
"NNW": 327.5,
"NORTH": 0,
"EAST": 90,
"SOUTH": 180,
"WEST": 270,
}
def _rectify_identifier(station, textprod):
"""Rectify the station identifer to IEM Nomenclature."""
station = station.strip()
if len(station) == 4 and station.startswith("K"):
return station[1:]
if len(station) == 3 and not textprod.source.startswith("K"):
return textprod.source[0] + station
return station
def _parse_lonlat(text):
"""Convert string into lon, lat values"""
# 2500N07000W
# -or- 25N070W -or- 25N70W
# FMH-12 says this is in degrees and minutes!
d = re.match(OV_LATLON, text).groupdict()
if len(d["lat"]) == 2 and len(d["lon"]) <= 3:
# We have integer values :/
lat = int(d["lat"])
lon = int(d["lon"])
else:
# We have Degrees and minutes
_d = int(float(d["lat"][-2:]) / 60.0 * 10000.0)
lat = float(f"{d['lat'][:-2]}.{_d:.0f}")
_d = int(float(d["lon"][-2:]) / 60.0 * 10000.0)
lon = float(f"{d['lon'][:-2]}.{_d:.0f}")
if d["latsign"] == "S":
lat *= -1
if d["lonsign"] == "W":
lon *= -1
return lon, lat
[docs]
class Pirep(TextProduct):
"""Class for parsing and representing PIREPs found in NWS text products."""
def __init__(
self, text, utcnow=None, ugc_provider=None, nwsli_provider=None
):
"""constructor"""
super().__init__(
text,
utcnow=utcnow,
ugc_provider=ugc_provider,
nwsli_provider=nwsli_provider,
)
if self.afos is None:
self.afos = "PIREP"
self.reports: List[PilotReport] = []
self.parse_reports()
[docs]
def parse_reports(self):
"""Actually do the parsing of the product that generates the reports
stored within the self.reports list"""
# unixtext is conditioned at this point, so LDM, WMO and perhaps AFOS
lines = self.unixtext.split("\n")
# There may be an AWIPSID in line 3 or silly aviation control char
pos = 3 if len(lines[2]) < 10 or lines[2].startswith("\x1e") else 2
meat = " ".join(lines[pos:])
for report in meat.split("="):
if report.strip() == "":
continue
res = self.process_pirep(" ".join(report.strip().split()))
if res is not None and res.valid is not None:
self.reports.append(res)
[docs]
def process_pirep(self, report):
"""Convert this report text into an actual PIREP object"""
_pr = PilotReport()
_pr.text = report
for i, token in enumerate(report.split("/")):
token = token.strip()
# First token is always priority
if i == 0:
if len(token) > 10:
LOG.warning("Aborting as not-PIREP? |%s|", report)
return None
if token.find(" UUA") > 0:
_pr.priority = Priority.UUA
else:
_pr.priority = Priority.UA
parts = token.split()
if len(parts) == 2:
_pr.base_loc = parts[0]
if len(_pr.base_loc) == 4 and _pr.base_loc[0] == "K":
_pr.base_loc = _pr.base_loc[1:]
continue
# Flight Level
if token.startswith("FL"):
m = re.match(FLIGHT_LEVEL, token)
if m:
_pr.flight_level = int(m.group("levelx100")) * 100
continue
# Aircraft Type
if token.startswith("TP "):
_pr.aircraft_type = token[3:]
continue
# Location
if token.startswith("OV "):
dist = 0
bearing = 0
therest = token[3:]
if len(therest) == 3:
loc = _rectify_identifier(therest, self)
elif therest.startswith("FINAL RWY"):
loc = _rectify_identifier(report[:8].split()[0], self)
elif len(therest) == 4:
loc = _rectify_identifier(therest, self)
elif re.match(OV_OFFSET, therest):
d = re.match(OV_OFFSET, therest).groupdict()
loc = _rectify_identifier(d["loc"], self)
dist = int(d["dist"])
bearing = DRCT2DIR[d["dir"]]
elif therest.find("-") > 0 and re.match(OV_TWOLOC, therest):
d = re.match(OV_TWOLOC, therest).groupdict()
numbers = re.findall("[0-9]{6}", therest)
if numbers:
bearing = int(numbers[0][:3])
dist = int(numbers[0][3:])
loc = _rectify_identifier(d["loc2"], self)
else:
# Split the distance between the two points
lats = []
lons = []
for loc in [d["loc1"], d["loc2"]]:
loc = _rectify_identifier(loc, self)
if loc not in self.nwsli_provider:
self.warnings.append(
f"Unknown location: {loc} '{report}'"
)
else:
lats.append(self.nwsli_provider[loc]["lat"])
lons.append(self.nwsli_provider[loc]["lon"])
if len(lats) == 2:
_pr.latitude = sum(lats) / 2.0
_pr.longitude = sum(lons) / 2.0
continue
elif re.match(OV_LOCDIR, therest):
# KFAR330008
d = re.match(OV_LOCDIR, therest).groupdict()
loc = _rectify_identifier(d["loc"], self)
bearing = int(d["dir"])
dist = int(d["dist"])
elif re.match(OV_LATLON, therest):
_pr.longitude, _pr.latitude = _parse_lonlat(therest)
continue
elif therest == "O":
# Use the first part of the report in this case
loc = _rectify_identifier(report[:3], self)
else:
loc = _rectify_identifier(therest[:3], self)
if loc not in self.nwsli_provider:
if _pr.base_loc is None:
self.warnings.append(
f"Unknown location: {loc} '{report}'"
)
else:
loc = _pr.base_loc
if loc not in self.nwsli_provider:
self.warnings.append(
f"Double-unknown location: {report}"
)
# So we discard the offset when we go back to the base
dist = 0
bearing = 0
_pr.longitude, _pr.latitude = self.compute_loc(
loc, dist, bearing
)
continue
# Time
if token.startswith("TM "):
numbers = re.findall("[0-9]{4}", token)
if len(numbers) != 1:
self.warnings.append(f"TM parse failed {report}")
return None
hour = int(numbers[0][:2])
minute = int(numbers[0][2:])
_pr.valid = self.compute_pirep_valid(hour, minute)
continue
return _pr
[docs]
def compute_loc(self, loc, dist, bearing):
"""Figure out the lon/lat for this location"""
if loc is None or loc not in self.nwsli_provider:
return None, None
lat = self.nwsli_provider[loc]["lat"]
lon = self.nwsli_provider[loc]["lon"]
# shortcut
if dist == 0:
return lon, lat
# Air distances in PIREPs are in nautical miles!
meters = (units("nautical_mile") * float(dist)).to(units("meter")).m
northing = meters * math.cos(math.radians(bearing)) / 111111.0
easting = (
meters
* math.sin(math.radians(bearing))
/ math.cos(math.radians(lat))
/ 111111.0
)
return lon + easting, lat + northing
[docs]
def compute_pirep_valid(self, hour, minute):
"""Based on what utcnow is set to, compute when this is valid"""
res = self.utcnow.replace(
hour=hour, minute=minute, second=0, microsecond=0
)
if hour > self.utcnow.hour:
res -= timedelta(hours=24)
return res
[docs]
def sql(self, txn):
"""Save the reports to the database via the transaction"""
for report in self.reports:
if report.is_duplicate:
continue
if report.longitude is None:
geom = "POINT EMPTY"
artcc = "null"
else:
geom = f"SRID=4326;POINT({report.longitude} {report.latitude})"
artcc = (
"(select ident from airspaces where st_dwithin(geom, "
f"ST_MakePoint({report.longitude}, {report.latitude}), 0) "
"and type_code = 'ARTCC' LIMIT 1)"
)
txn.execute(
f"""
INSERT into pireps(valid, geom, is_urgent, aircraft_type,
report, artcc, product_id, flight_level) VALUES (%s,
ST_GeographyFromText(%s), %s, %s, %s, {artcc}, %s, %s)
""",
(
report.valid,
geom,
report.priority == Priority.UUA,
report.aircraft_type,
report.text,
self.get_product_id(),
report.flight_level,
),
)
[docs]
def assign_cwsu(self, txn):
"""Use this transaction object to assign CWSUs for the pireps"""
for report in self.reports:
if report.latitude is None:
continue
txn.execute(
"select id from cwsu WHERE "
"st_contains(geom, ST_Point(%s, %s, 4326))",
(report.longitude, report.latitude),
)
if txn.rowcount > 0:
report.cwsu = txn.fetchone()["id"]
[docs]
def get_jabbers(self, _uri, _uri2=None):
"""get jabber messages"""
res = []
for report in self.reports:
if report.is_duplicate:
continue
jmsg = {
"priority": "Urgent"
if report.priority == Priority.UUA
else "Routine",
"ts": report.valid.strftime("%H%M"),
"report": html_escape(report.text),
"color": (
"#ff0000" if report.priority == Priority.UUA else "#00ff00"
),
}
plain = "%(priority)s pilot report at %(ts)sZ: %(report)s" % jmsg
html = (
"<span style='color:%(color)s;'>%(priority)s pilot "
"report</span> at %(ts)sZ: %(report)s"
) % jmsg
xtra = {
"channels": (
f"{report.priority}.{report.cwsu},{report.priority}.PIREP"
),
"ptype": report.priority,
"category": "PIREP",
"twitter": plain[:140],
"valid": report.valid.strftime("%Y%m%dT%H:%M:00"),
}
if report.latitude is not None:
xtra["geometry"] = (
f"POINT({report.longitude} {report.latitude})"
)
res.append([plain, html, xtra])
return res
[docs]
def parser(buf, utcnow=None, ugc_provider=None, nwsli_provider=None):
"""A parser implementation"""
return Pirep(
buf,
utcnow=utcnow,
ugc_provider=ugc_provider,
nwsli_provider=nwsli_provider,
)