"""Parser and object storage of information within NWS CLI Product."""
import re
from datetime import date, datetime
from pyiem.exceptions import CLIException
from pyiem.nws.product import TextProduct
from pyiem.observation import Observation
from pyiem.reference import TRACE_VALUE, StationAttributes
from pyiem.util import LOG
# Matches times like " 5:34 PM" so the colon can be stripped (bad encoding
# introduced by newer NWS software, see find_sections()).
AMPM_COLON = re.compile(r"\s\d?\d:\d\d\s[AP]M")
# Matches the product headline, capturing: station name, FOR|FROM, the
# date text, and an optional trailing " CORRECTION" flag.
HEADLINE_RE = re.compile(
    (
        r"\.\.\.THE ([A-Z_\.\-\(\)\/\,\s]+) "
        r"CLIMATE SUMMARY (FOR|FROM)\s+"
        r"([A-Z]+\s[0-9]+\s+[0-9]{4})( CORRECTION)?\.\.\."
    )
)
# Matches wind phrases such as "HIGHEST WIND SPEED"; the three groups are
# joined with "_" to form the storage key (e.g. highest_wind_speed).
WIND_RE = re.compile(
    r"(HIGHEST|AVERAGE|RESULTANT)\s(WIND|GUST)\s(SPEED|DIRECTION)"
)
# Known "WEATHER ITEM" header dictions.  Each entry maps POSITIONALLY onto
# the same-index row of COLS below, so the two lists must stay in lockstep.
# compute_diction() resolves a header via REGIMES.index(), which returns the
# FIRST match.
# NOTE(review): several entries appear textually identical (e.g. indices 0
# and 2); the originals likely differed by internal run-of-spaces alignment
# that may have been collapsed -- verify against upstream, as written the
# later duplicate COLS rows are unreachable.
REGIMES = [
    "WEATHER ITEM OBSERVED TIME RECORD YEAR NORMAL DEPARTURE LAST",
    "WEATHER ITEM OBSERVED TIME NORMAL DEPARTURE LAST",
    "WEATHER ITEM OBSERVED TIME RECORD YEAR NORMAL DEPARTURE LAST",
    "WEATHER ITEM OBSERVED RECORD YEAR NORMAL DEPARTURE LAST",
    "WEATHER ITEM OBSERVED TIME RECORD YEAR",
    "WEATHER ITEM OBSERVED TIME RECORD YEAR NORMAL DEPARTURE",
    "WEATHER ITEM OBSERVED RECORD YEAR NORMAL DEPARTURE",
    "WEATHER ITEM OBSERVED",
    "WEATHER ITEM OBSERVED RECORD YEAR NORMAL",
    "WEATHER ITEM OBSERVED TIME RECORD YEAR NORMAL LAST",
    "WEATHER ITEM OBSERVED TIME LAST",
    "WEATHER ITEM OBSERVED NORMAL DEPARTURE LAST",
    "WEATHER ITEM OBSERVED TIME NORMAL LAST",
    "WEATHER ITEM OBSERVED TIME RECORD YEAR LAST",
    "WEATHER ITEM OBSERVED TIME",
    "WEATHER ITEM OBSERVED TIME NORMAL DEPARTURE",
    "WEATHER ITEM OBSERVED NORMAL DEPARTURE",
    "WEATHER ITEM OBSERVED TIME RECORD NORMAL DEPARTURE LAST",
    (
        "WEATHER ITEM OBSERVED TIME RECORD YEAR NORMAL "
        "DEPARTURE LAST"
    ),
    "WEATHER ITEM OBSERVED RECORD YEAR LAST",
    (
        "WEATHER ITEM OBSERVED TIME RECORD YEAR NORMAL "
        "DEPARTURE LAST"
    ),
    "WEATHER ITEM OBSERVED LAST",
    (
        "WEATHER ITEM OBSERVED TIME RECORD YEAR NORMAL "
        "DEPARTURE LAST"
    ),
]
# Column end-positions per regime, consumed by make_tokens().  Order is:
# label, value, time, record, year, normal, departure, last
# A None entry means that regime has no such column (token becomes None).
COLS = [
    [16, 23, 30, 37, 42, 49, 56, 65],
    [16, 23, 30, None, None, 37, 44, 53],
    [16, 22, 31, 37, 43, 50, 58, 65],
    [16, 23, None, 30, 35, 42, 49, 58],
    [16, 23, 25, 37, 42, None, None, None],
    [16, 23, 30, 37, 42, 49, 56, None],
    [16, 23, None, 30, 35, 42, 49, None],
    [16, 23, None, None, None, None, None, None],
    [16, 23, None, 30, 37, None, None, None],
    [16, 23, 30, 37, 42, 49, None, 57],
    [16, 23, 30, None, None, None, None, 39],
    [16, 23, None, None, None, 30, 37, 46],
    [16, 23, 30, None, None, 37, None, 45],
    [16, 23, 30, 37, 42, None, None, 51],
    [16, 23, 30, None, None, None, None, None],
    [16, 23, 30, None, None, 37, 44, None],
    [16, 23, None, None, None, 30, 37, None],
    [16, 23, 30, 37, None, 44, 51, 60],
    [16, 23, 34, 40, 50, 58, 67, 80],
    [16, 23, None, 30, 35, None, None, 44],
    [16, 23, 33, 40, 51, 59, 69, 79],
    [16, 23, None, None, None, None, None, 33],
    [16, 23, 31, 37, 46, 52, 61, 72],
]
# Allow manual provision of IDS
HARDCODED = {
    "DODGE CITY KS": "KDDC",  # Comes as CLIDGC
}
def update_iemaccess(txn, entry):
    """Synchronize one parsed CLI entry into the IEM Access database.

    Args:
      txn: database transaction/cursor to use.
      entry (dict): a parsed CLI entry with access_station, access_network,
        cli_valid and the parsed `data` dict.

    Returns:
      False when no access network mapping exists, True when nothing needed
      updating, otherwise the result of Observation.save().
    """
    if entry["access_network"] is None:
        return False
    ob = Observation(
        entry["access_station"], entry["access_network"], entry["cli_valid"]
    )
    ob.load(txn)
    current = ob.data
    data = entry["data"]
    logmsg = []
    # (CLI data key, iemaccess column, log label, converter)
    syncmap = [
        ("temperature_maximum", "max_tmpf", "MaxT", int),
        ("temperature_minimum", "min_tmpf", "MinT", int),
        ("precip_month", "pmonth", "PMonth", None),
        ("precip_today", "pday", "PDay", None),
        ("snow_today", "snow", "snow", None),
        ("snowdepth", "snowd", "snowd", None),
    ]
    for dkey, ikey, label, conv in syncmap:
        newval = data.get(dkey)
        if newval is None:
            continue
        if conv is not None:
            newval = conv(newval)
        # A None current value always compares unequal, so it gets set too
        if newval != current[ikey]:
            logmsg.append(f"{label} O:{current[ikey]} N:{newval}")
            current[ikey] = newval
    if not logmsg:
        return True
    res = ob.save(txn, skip_current=True)
    LOG.warning(
        "%s (%s) %s ob.save: %s",
        entry["access_station"],
        entry["cli_valid"].strftime("%y%m%d"),
        ",".join(logmsg),
        res,
    )
    return res
def trace_r(val):
    """Render a stored precip/snow value as a human-readable string."""
    if val is None or val == "M":
        return "Missing"
    return "Trace" if val == TRACE_VALUE else f'{val}"'
def get_number_year(text):
    """Convert text to a number and reject implausible year values.

    Returns None unless the value parses and falls in [1700, next year].
    """
    year = get_number(text)
    if year is not None and 1700 <= year <= date.today().year + 1:
        return year
    return None
def get_number(text):
    """Convert a string into a number, preferable a float!

    Args:
      text (str or None): raw token from the product.

    Returns:
      None for missing sentinels (None, "", "MM") or when no single number
      is found, TRACE_VALUE for "T", otherwise an int or float.
    """
    if text is None:
        return None
    text = text.strip()
    if text in ("", "MM"):
        return None
    if text == "T":
        return TRACE_VALUE
    numbers = re.findall(r"[\-\+]?\d*\.\d+|[\-\+]?\d+", text)
    if len(numbers) != 1:
        LOG.warning("get_number() failed for |%s|", text)
        return None
    # Decide int vs float from the matched token itself; checking
    # text.find(".") > 0 misclassified values like ".5" and then
    # crashed on int(".5").
    return float(numbers[0]) if "." in numbers[0] else int(numbers[0])
def convert_key(text):
    """Map a CLI report row label onto the short key used for storage."""
    if text is None:
        return None
    lookup = {"YESTERDAY": "today", "TODAY": "today", "MONTH TO DATE": "month"}
    if text in lookup:
        return lookup[text]
    if text.startswith("SINCE "):
        # e.g. "SINCE JUN 1" -> "jun1"
        return text.replace("SINCE ", "").replace(" ", "").lower()
    LOG.warning("convert_key() failed for |%s|", text)
    return "fail"
def make_tokens(regime, line):
    """Chop a report line into column tokens using the regime's layout.

    Args:
      regime (int): index into COLS selecting the column end-positions.
      line (str): one raw line of the report.

    Returns:
      list of str/None tokens, one per logical column.
    """
    tokens = []
    pos = 0
    for edge in COLS[regime]:
        if edge is None:
            # This regime has no such column
            tokens.append(None)
            continue
        piece = line[pos:edge].strip()
        tokens.append(piece if piece else None)
        pos = edge
    # Strip any record-flag ("R ") markers embedded in the tokens
    return [
        tok.replace("R ", "")
        if tok is not None and tok.startswith("R ")
        else tok
        for tok in tokens
    ]
[docs]
def parse_snowfall(regime, lines, data):
    """Parse the snowfall data

    Fills snow_* keys in `data` from the SNOWFALL section lines; the
    header line is assumed to have been removed by the caller.
    """
    for linenum, line in enumerate(lines):
        # Too short to hold a data row, skip it
        if len(line.strip()) < 14:
            continue
        tokens = make_tokens(regime, line)
        key = tokens[0].strip()
        # SNOW DEPTH is a single observed value with no record/normal cols
        if key.startswith("SNOW DEPTH"):
            data["snowdepth"] = get_number(tokens[1])
            continue
        key = convert_key(key)
        data[f"snow_{key}"] = get_number(tokens[1])
        data[f"snow_{key}_record"] = get_number(tokens[3])
        yeartest = get_number_year(tokens[4])
        if yeartest is not None:
            data[f"snow_{key}_record_years"] = [yeartest]
        data[f"snow_{key}_normal"] = get_number(tokens[5])
        data[f"snow_{key}_departure"] = get_number(tokens[6])
        data[f"snow_{key}_last"] = get_number(tokens[7])
        if (
            key == "today"
            and yeartest is not None
            and data[f"snow_{key}_record_years"][0] is not None
        ):
            # Continuation lines holding extra record years are exactly
            # four characters once stripped.
            # NOTE(review): linenum += 1 does not advance the outer for
            # loop; the consumed lines are revisited later but then skipped
            # by the length check above -- confirm this is intentional.
            while (linenum + 1) < len(lines) and len(
                lines[linenum + 1].strip()
            ) == 4:
                n = get_number_year(lines[linenum + 1])
                if n is not None:
                    data.setdefault("snow_today_record_years", []).append(n)
                linenum += 1
[docs]
def parse_precipitation(regime, lines, data):
    """Parse the precipitation data

    Fills precip_* keys in `data` from the PRECIPITATION section lines;
    the header line is assumed to have been removed by the caller.
    """
    for linenum, line in enumerate(lines):
        # Too short to hold a data row, skip it
        if len(line.strip()) < 20:
            continue
        tokens = make_tokens(regime, line)
        key = convert_key(tokens[0])
        if key is None:
            continue
        data[f"precip_{key}"] = get_number(tokens[1])
        data[f"precip_{key}_record"] = get_number(tokens[3])
        yeartest = get_number_year(tokens[4])
        if yeartest is not None:
            data[f"precip_{key}_record_years"] = [yeartest]
        data[f"precip_{key}_normal"] = get_number(tokens[5])
        data[f"precip_{key}_departure"] = get_number(tokens[6])
        data[f"precip_{key}_last"] = get_number(tokens[7])
        if (
            key == "today"
            and yeartest is not None
            and data[f"precip_{key}_record_years"][0] is not None
        ):
            # Continuation lines holding extra record years are exactly
            # four characters once stripped.
            # NOTE(review): linenum += 1 does not advance the outer for
            # loop; the consumed lines are revisited later but then skipped
            # by the length check above -- confirm this is intentional.
            while (linenum + 1) < len(lines) and len(
                lines[linenum + 1].strip()
            ) == 4:
                n = get_number_year(lines[linenum + 1])
                if n is not None:
                    data.setdefault("precip_today_record_years", []).append(n)
                linenum += 1
def parse_temperature(prod, regime, lines, data):
    """Here we parse a temperature section

    Args:
      prod: the product being parsed; receives repair/invalid warnings.
      regime (int): index into COLS selecting the column layout.
      lines (list): section lines (header already removed).
      data (dict): storage for parsed temperature_* values.
    """
    for linenum, line in enumerate(lines):
        if len(line.strip()) < 18:
            continue
        # Repair a broken (E) product, see akrherz/pyIEM#08.  Use a slice
        # (line[38:39]) instead of line[38] so a short line containing
        # "(E)" can not raise IndexError; an out-of-range slice is just "".
        if line[20:23] == "(E)" and line[38:39] == " ":
            prod.warnings.append(f"Invalid line repaired |{line}|")
            line = line.replace("(E)", "E ")
        tokens = make_tokens(regime, line)
        key = tokens[0].strip().lower()
        if key.upper() not in ["MAXIMUM", "MINIMUM", "AVERAGE"]:
            continue
        data[f"temperature_{key}"] = get_number(tokens[1])
        # A long "time" token indicates a mangled column, so skip it
        if tokens[2] is not None and len(tokens[2]) < 8:
            data[f"temperature_{key}_time"] = tokens[2]
        if tokens[3] is not None:
            data[f"temperature_{key}_record"] = get_number(tokens[3])
        if tokens[4] is not None and tokens[4].strip() not in ["", "M", "MM"]:
            n = get_number_year(tokens[4])
            if n is not None:
                data[f"temperature_{key}_record_years"] = [n]
            else:
                prod.warnings.append(f"Found invalid year |{tokens[4]}|")
        if tokens[5] is not None:
            data[f"temperature_{key}_normal"] = get_number(tokens[5])
        # Continuation lines holding extra record years are exactly four
        # characters once stripped.
        # NOTE(review): linenum += 1 does not advance the outer for loop;
        # the consumed lines are revisited but skipped by the length check.
        while (linenum + 1) < len(lines) and len(
            lines[linenum + 1].strip()
        ) == 4:
            line2 = lines[linenum + 1].strip()
            n = get_number_year(line2)
            if n is not None:
                data.setdefault(
                    f"temperature_{key}_record_years",
                    [],
                ).append(n)
            else:
                prod.warnings.append(f"Found invalid year |{line2}|")
            linenum += 1
def parse_sky_coverage(lines, data):
    """Extract the average sky cover value, when present, into data."""
    label = "AVERAGE SKY COVER"
    for line in lines:
        idx = line.find(label)
        if idx == -1:
            continue
        try:
            data["average_sky_cover"] = float(line[idx:].replace(label, ""))
        except ValueError:
            LOG.debug("Convert '%s' to float failed", line)
def parse_headline(section):
    """Extract (valid date, station name) from the product headline."""
    tokens = HEADLINE_RE.findall(section.replace("\n", " "))
    datetext = tokens[0][2]
    # Three-letter first word means an abbreviated month name
    fmt = "%b %d %Y" if len(datetext.split()[0]) == 3 else "%B %d %Y"
    cli_valid = datetime.strptime(datetext, fmt).date()
    return cli_valid, tokens[0][0].strip().upper()
[docs]
def parse_wind(lines, data):
    """Parse any wind information.

    Fills keys like highest_wind_speed / resultant_wind_direction in data.
    """
    # hold your nose here: flatten the section (minus its header line) into
    # one single-space-separated string
    content = " ".join((" ".join(lines[1:])).strip().split())
    # Find label phrases like "HIGHEST WIND SPEED"
    tokens = WIND_RE.findall(content)
    for token in tokens:
        # Replace each label phrase with ";" so the values between labels
        # can be split out positionally
        content = content.replace(" ".join(token), ";")
    # content now starts with ";", so drop that char before splitting;
    # vals[i] is the value that followed tokens[i]
    vals = content[1:].split(";")
    for token, val in zip(tokens, vals, strict=False):
        # e.g. ("HIGHEST", "WIND", "SPEED") -> data["highest_wind_speed"]
        data[("_".join(token)).lower()] = get_number(val)
def _compute_station_ids(prod, cli_station_name, is_multi):
    """Resolve the database and IEMAccess identifiers for a CLI site.

    Args:
      prod: product instance providing nwsli_provider, source, afos.
      cli_station_name (str): headline station name, upper-cased.
      is_multi (bool): does this product contain multiple reports?

    Returns:
      (station, access_station, access_network) tuple; the access pair is
      (None, None) when the station is unknown to the NWSCLI network.
    """
    # Consult the manual override table first
    if cli_station_name in HARDCODED:
        station = HARDCODED[cli_station_name]
    elif is_multi:
        # Can't use the AFOS id here, search the provider by station name
        station = next(
            (
                sid
                for sid, meta in prod.nwsli_provider.items()
                if meta["name"].upper() == cli_station_name
            ),
            None,
        )
        if station is None:
            raise CLIException(
                f"Unknown CLI Station Text: |{cli_station_name}|"
            )
    else:
        station = prod.source[0] + prod.afos[3:]
    # We have computed a four character station ID, is it known?
    if station not in prod.nwsli_provider:
        prod.warnings.append(
            f"Station not known to NWSCLI Network |{station}|"
        )
        return station, None, None
    access_station = None
    access_network = None
    # See if our network table provides an attribute that maps us to an ASOS
    mapped = (
        prod.nwsli_provider[station]
        .get("attributes", {})
        .get(StationAttributes.MAPS_TO)
    )
    if mapped is not None:
        parts = mapped.split("|")
        if len(parts) == 2:
            access_station, access_network = parts
    if access_station is None:
        # Fall back to the default mapping convention
        access_station = station[1:] if station.startswith("K") else station
        access_network = f"{prod.nwsli_provider[station].get('state')}_ASOS"
    return station, access_station, access_network
[docs]
def sql_data(prod, cursor, data):
    """Do an individual data entry.

    Replaces any existing cli_data row for this station/date, unless the
    stored row came from a newer product id.
    """
    # See what we currently have stored.
    cursor.execute(
        "SELECT product from cli_data where station = %s and valid = %s",
        (data["db_station"], data["cli_valid"]),
    )
    if cursor.rowcount == 1:
        row = cursor.fetchone()
        pid = row["product"]
        # Product ids compare lexically; keep the newer product's row
        if pid is not None and prod.get_product_id() < pid:
            return
        cursor.execute(
            "DELETE from cli_data WHERE station = %s and valid = %s",
            (data["db_station"], data["cli_valid"]),
        )
    dd = data["data"]
    # Column order below must match the parameter tuple exactly
    cursor.execute(
        """
        INSERT into cli_data(station, product, valid, high, high_normal,
        high_record, high_record_years, low, low_normal, low_record,
        low_record_years, precip, precip_month, precip_jan1, precip_jul1,
        precip_normal, precip_record, precip_record_years, precip_month_normal,
        snow, snow_month, snow_jun1, snow_jul1, snow_normal, snow_dec1,
        precip_dec1, precip_dec1_normal, precip_jan1_normal, high_time,
        low_time, snow_record_years, snow_record, snow_jun1_normal,
        snow_jul1_normal, snow_dec1_normal, snow_month_normal, precip_jun1,
        precip_jun1_normal, average_sky_cover, resultant_wind_speed,
        resultant_wind_direction, highest_wind_speed, highest_wind_direction,
        highest_gust_speed, highest_gust_direction, average_wind_speed,
        snowdepth)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """,
        (
            data["db_station"],
            prod.get_product_id(),
            data["cli_valid"],
            dd.get("temperature_maximum"),
            dd.get("temperature_maximum_normal"),
            dd.get("temperature_maximum_record"),
            dd.get("temperature_maximum_record_years", []),
            dd.get("temperature_minimum"),
            dd.get("temperature_minimum_normal"),
            dd.get("temperature_minimum_record"),
            dd.get("temperature_minimum_record_years", []),
            dd.get("precip_today"),
            dd.get("precip_month"),
            dd.get("precip_jan1"),
            dd.get("precip_jul1"),
            dd.get("precip_today_normal"),
            dd.get("precip_today_record"),
            dd.get("precip_today_record_years", []),
            dd.get("precip_month_normal"),
            dd.get("snow_today"),
            dd.get("snow_month"),
            dd.get("snow_jun1"),
            dd.get("snow_jul1"),
            dd.get("snow_today_normal"),
            dd.get("snow_dec1"),
            dd.get("precip_dec1"),
            dd.get("precip_dec1_normal"),
            dd.get("precip_jan1_normal"),
            dd.get("temperature_maximum_time"),
            dd.get("temperature_minimum_time"),
            dd.get("snow_today_record_years", []),
            dd.get("snow_today_record"),
            dd.get("snow_jun1_normal"),
            dd.get("snow_jul1_normal"),
            dd.get("snow_dec1_normal"),
            dd.get("snow_month_normal"),
            dd.get("precip_jun1"),
            dd.get("precip_jun1_normal"),
            dd.get("average_sky_cover"),
            dd.get("resultant_wind_speed"),
            dd.get("resultant_wind_direction"),
            dd.get("highest_wind_speed"),
            dd.get("highest_wind_direction"),
            dd.get("highest_gust_speed"),
            dd.get("highest_gust_direction"),
            dd.get("average_wind_speed"),
            dd.get("snowdepth"),
        ),
    )
[docs]
class CLIProduct(TextProduct):
    """
    Represents a CLI Daily Climate Report Product
    """

    def __init__(
        self, text, utcnow=None, ugc_provider=None, nwsli_provider=None
    ):
        """constructor"""
        super().__init__(text, utcnow, ugc_provider, nwsli_provider)
        # Hold our parsing results as an array of dicts
        self.data = []
        # Index into REGIMES/COLS describing this product's column layout;
        # re-computed per section by compute_diction()
        self.regime = None
        # Sometimes, we get products that are not really in CLI format but
        # are RER (record event reports) with a CLI AWIPS ID
        if self.wmo[:2] != "CD":
            LOG.warning(
                "Product %s skipped due to wrong header", self.get_product_id()
            )
            return
        sections = self.find_sections()
        for section in sections:
            # We have meat!
            self.compute_diction(section)
            entry = {}
            entry["cli_valid"], entry["cli_station"] = parse_headline(section)
            (
                entry["db_station"],
                entry["access_station"],
                entry["access_network"],
            ) = _compute_station_ids(
                self, entry["cli_station"], len(sections) > 1
            )
            entry["data"] = self.parse_data(section)
            self.data.append(entry)

    def find_sections(self):
        """Some trickery to figure out if we have multiple reports.

        Returns:
          list of text sections, one per climate report found
        """
        sections = []
        text = self.unixtext
        # Correct bad encoding of colons due to new NWS software
        for token in AMPM_COLON.findall(text):
            text = text.replace(token, " " + token.replace(":", ""))
        for section in text.split("&&"):
            # Skip sections lacking a CLI headline
            if not HEADLINE_RE.findall(section.replace("\n", " ")):
                continue
            tokens = re.findall(r"^\s?WEATHER ITEM.*$", section, re.M)
            if not tokens:
                raise CLIException("Could not find 'WEATHER ITEM' within text")
            if len(tokens) == 1:
                sections.append(section)
                continue
            # Uh oh, we need to do some manual splitting.  Newline->space
            # replacement preserves string length, so headline match
            # offsets remain valid indexes into the original section.
            pos = []
            for match in re.finditer(HEADLINE_RE, section.replace("\n", " ")):
                pos.append(match.start())  # noqa
            pos.append(len(section))
            for i, p in enumerate(pos[:-1]):
                # Back up 10 chars to keep the leading "...THE" intact
                sections.append(section[max([0, p - 10]) : pos[i + 1]])
        return sections

    def compute_diction(self, text):
        """Try to determine what we have for a format.

        Sets self.regime to the COLS index matching this section's
        "WEATHER ITEM" header line.
        """
        tokens = re.findall(r"^\s?WEATHER ITEM.*$", text, re.M)
        diction = tokens[0].strip()
        if diction not in REGIMES:
            raise CLIException(
                f"Unknown diction found in 'WEATHER ITEM'\n|{diction}|"
            )
        # index() returns the FIRST matching regime
        self.regime = REGIMES.index(diction)

    def get_jabbers(self, uri, _=None):
        """Override the jabber message formatter.

        Returns a list of [plain, html, xtra] message triples, one per
        parsed report.
        """
        url = f"{uri}?pid={self.get_product_id()}"
        res = []
        xtra = {
            "channels": self.get_channels(),
            "product_id": self.get_product_id(),
        }
        for data in self.data:
            msg = (
                f"High: {data['data'].get('temperature_maximum', 'M')} "
                f"Low: {data['data'].get('temperature_minimum', 'M')} "
                f"Precip: {trace_r(data['data'].get('precip_today', 'M'))} "
                f"Snow: {trace_r(data['data'].get('snow_today', 'M'))}"
            )
            sd = data["data"].get("snowdepth")
            if sd is not None:
                msg += f" Snow Depth: {trace_r(sd)}"
            # NOTE: %-d (no zero padding) is a POSIX-only strftime directive
            mess = (
                f"{data['cli_station']} {data['cli_valid']:%b %-d} "
                f"Climate Report: {msg} {url}"
            )
            htmlmess = (
                f'{data["cli_station"]} <a href="{url}">'
                f"{data['cli_valid']:%b %-d} Climate Report</a>: {msg}"
            )
            xtra["twitter_media"] = (
                "https://mesonet.agron.iastate.edu/plotting/auto/plot/218/"
                f"network:NWSCLI::station:{data['db_station']}::"
                f"date:{data['cli_valid'].strftime('%Y-%m-%d')}::"
                f"_:{self.get_product_id()}.png"
            )
            xtra["twitter"] = (
                f"{data['cli_station']} {data['cli_valid']:%b %-d} "
                f"Climate: {msg} {url}"
            )
            # Replace any literal trace sentinel values with "Trace"
            res.append(
                [
                    mess.replace(str(TRACE_VALUE), "Trace"),
                    htmlmess.replace(str(TRACE_VALUE), "Trace"),
                    xtra,
                ]
            )
        return res

    def parse_data(self, section):
        """Actually do the parsing of this silly format.

        Returns a dict of the values parsed from one report section.
        """
        data = {}
        # We need to first search down the section to look for where the
        # first TEMPERATURE section starts.
        regex = re.compile("^TEMPERATURE", re.M)
        search = regex.search(section)
        if search is None:
            raise CLIException("Failed to find TEMPERATURE, aborting")
        pos = search.start()
        # Strip extraneous spaces
        meat = "\n".join([s.rstrip() for s in section[pos:].split("\n")])
        # replace any 2+ \n with just two, so blank lines delimit subsections
        meat = re.sub(r"\n{2,}", "\n\n", meat)
        sections = meat.split("\n\n")
        for _section in sections:
            # Drop any lines containing tab characters
            lines = [ll for ll in _section.split("\n") if ll.find("\t") < 0]
            if lines[0].startswith("TEMPERATURE"):
                parse_temperature(self, self.regime, lines[1:], data)
            elif lines[0].startswith("PRECIPITATION"):
                parse_precipitation(self.regime, lines[1:], data)
            elif lines[0].startswith("SNOWFALL"):
                parse_snowfall(self.regime, lines[1:], data)
            elif lines[0] in ["SKY COVER"]:
                parse_sky_coverage(lines, data)
            elif lines[0] in ["WIND (MPH)"] and len(lines) > 1:
                parse_wind(lines, data)
        return data

    def sql(self, cursor):
        """Do the database update!

        Writes each parsed entry to cli_data and syncs IEM Access.
        """
        for entry in self.data:
            sql_data(self, cursor, entry)
            if not update_iemaccess(cursor, entry):
                self.warnings.append(
                    f"IEMAccess Update failed {entry['access_network']} "
                    f"{entry['access_station']} {entry['cli_valid']}"
                )
def parser(text, utcnow=None, ugc_provider=None, nwsli_provider=None):
    """Parse CLI Text Products.

    Args:
      nwsli_provider (dict): This dictionary provider in the form of the
        `pyiem.network.Table` object should contain additional attributes of
        `access_station` and `access_network` to map back to IEMAccess.
    """
    # Careful here, see if we have two CLIs in one product!
    return CLIProduct(
        text,
        utcnow=utcnow,
        ugc_provider=ugc_provider,
        nwsli_provider=nwsli_provider,
    )