"""Parser for the CF6 Product."""
import calendar
import re
from datetime import date, datetime, timedelta
from io import StringIO
import pandas as pd
from pyiem.nws.product import TextProduct
from pyiem.reference import TRACE_VALUE
MONTH_RE = re.compile(r"^MONTH:\s+(?P<month>[A-Z]+)$", re.I)
MONTH_RE_NUM = re.compile(r"^MONTH:\s+(?P<month>[0-9]+)$", re.I)
YEAR_RE = re.compile(r"^YEAR:\s+(?P<year>[0-9]{4})$", re.I)
COL_WIDTHS = [2, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 3, 4, 4, 5, 4, 7, 3, 4]
COL_NAMES = [
"dy",
"max",
"min",
"avg",
"dep",
"hdd",
"cdd",
"wtr",
"snw",
"dpth",
"avg_spd",
"max_spd",
"avg_dir",
"min_sun",
"psbl_sun",
"ss_sky",
"wx",
"gust_spd",
"gust_dir",
]
[docs]
class CF6Product(TextProduct):
"""
Represents a CF6 Product
"""
def __init__(
self, text, utcnow=None, ugc_provider=None, nwsli_provider=None
):
"""constructor"""
super().__init__(text, utcnow, ugc_provider, nwsli_provider)
# Hold our parsing results as an array of dicts
self.station = f"{self.source[0]}{self.afos[3:]}"
self.df = None
self.parser()
[docs]
def parser(self):
"""Do the parsing we need to do!"""
year = None
month = None
lines = self.unixtext.split("\n")
# Arbitrary pick to eliminate products that are likely this:
# Bad date: 12/2020. Last month in file is 11 / 2020 .
if len(lines) < 8:
return
for line in lines:
line = line.strip()
if line.startswith("MONTH:"):
m = MONTH_RE.match(line)
if m:
month = m.groupdict()["month"]
m = MONTH_RE_NUM.match(line)
if m:
month = calendar.month_name[int(m.groupdict()["month"])]
elif line.startswith("YEAR:"):
m = YEAR_RE.match(line)
if m:
year = m.groupdict()["year"]
if year is not None and month is not None:
break
if year is None or month is None:
raise ValueError("Failed to find required month and year values")
day1 = datetime.strptime(
f"{year} {month} 1",
"%Y %B %d" if len(month) > 3 else "%Y %b %d",
)
headercount = 0
sio = StringIO()
for line in self.unixtext.split("\n"):
if line.strip().startswith("================"):
headercount += 1
if headercount != 2:
continue
if len(line.strip()) > 70 and line.strip()[0].isdigit():
sio.write(line + "\n")
sio.seek(0)
df = pd.read_fwf(
sio,
widths=COL_WIDTHS,
names=COL_NAMES,
dtype={"wx": str},
)
df = df.replace("T", TRACE_VALUE)
for col in df.columns:
if col == "wx":
continue
df[col] = pd.to_numeric(df[col], errors="coerce")
df["valid"] = df["dy"].apply(
lambda x: date(day1.year, day1.month, int(x))
)
# Ensure we don't have data from utcnow + 1 day
ceiling = (self.utcnow + timedelta(days=1)).date()
indicies = pd.to_datetime(df["valid"]) > pd.Timestamp(ceiling)
if indicies.any():
self.warnings.append(f"{indicies.sum()} rows from the future")
df = df.loc[~indicies]
# Ensure rows that have maxt as nan for "today" or future are removed
df = df[~((df["valid"] >= self.utcnow.date()) & df["max"].isna())]
self.df = df.set_index("valid")
[docs]
def sql(self, cursor):
"""Send the data to the database."""
if self.df is None or self.df.empty:
return
# Prevent NaN numbers from going to the database.
_df = self.df.where(pd.notnull(self.df), None)
for valid, row in _df.iterrows():
cursor.execute(
"DELETE from cf6_data where station = %s and valid = %s",
(self.station, valid),
)
cursor.execute(
"INSERT into cf6_data(station, valid, product, high, low, "
"avg_temp, dep_temp, hdd, cdd, precip, snow, snowd_12z, "
"avg_smph, max_smph, avg_drct, minutes_sunshine, "
"possible_sunshine, cloud_ss, wxcodes, gust_smph, gust_drct, "
"updated) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, "
"%s, %s, %s, %s, %s, %s, %s, %s, now())",
(
self.station,
valid,
self.get_product_id(),
row[COL_NAMES[1]],
row[COL_NAMES[2]],
row[COL_NAMES[3]],
row[COL_NAMES[4]],
row[COL_NAMES[5]],
row[COL_NAMES[6]],
row[COL_NAMES[7]],
row[COL_NAMES[8]],
row[COL_NAMES[9]],
row[COL_NAMES[10]],
row[COL_NAMES[11]],
row[COL_NAMES[12]],
row[COL_NAMES[13]],
row[COL_NAMES[14]],
row[COL_NAMES[15]],
row[COL_NAMES[16]],
row[COL_NAMES[17]],
row[COL_NAMES[18]],
),
)
[docs]
def parser(text, utcnow=None, ugc_provider=None, nwsli_provider=None):
"""Provide back CF6 objects based on the parsing of this text"""
return CF6Product(text, utcnow, ugc_provider, nwsli_provider)