Source code for pyiem.nws.products.mos

"""
Supports parsing of Textual Model Output Statistics files
"""

import re
from datetime import timedelta

from pyiem.util import utc
from pyiem.wmo import WMOProduct

# Renames applied to variable names parsed from the text products before
# they are used as column names in the INSERT built by MOSProduct.sql; any
# name not listed here is used unchanged.
REMAP_VARS = {"X_N": "N_X", "WND": "WSP", "WGS": "GST"}


def section_parser(sect):
    """Parse one guidance section into a dictionary.

    Args:
      sect (str): text of a single section whose lines are delimited by
        the ``;;;`` token (this function splits on it).

    Returns:
      dict: with keys ``station``, ``model``, ``data`` and ``initts``.
      ``data`` maps every computed forecast timestamp to a dict of variable
      name to stripped string value (``WDR`` is stored as an int multiplied
      by 10, ``T06``/``T12`` are split into ``_1`` and ``_2`` entries).

    Raises:
      IndexError: when the header regular expression finds no match or an
        expected line is missing from the section.
      AssertionError: when a computed timestamp disagrees with its hour
        label (LAV) or is earlier than the initialization time.
    """
    metadata = re.findall(
        (
            r"([A-Z0-9_]{3,10})\s+(....?) (V[0-9]\.[0-9] )?(....?) GUIDANCE\s+"
            r"([01]?[0-9])/([0-3][0-9])/([0-9]{4})\s+"
            r"([0-2][0-9][0-6][0-9]) UTC"
        ),
        sect,
    )
    (station, model, _bogus, mos, month, day, year, hhmm) = metadata[0]
    # Translate the labels found in the header into the model identifier
    # that is returned to the caller.
    if model == "NBM":
        model = mos
    if mos == "NBX":
        model = "NBE"
    if mos == "LAMP":
        model = "LAV"
    if model == "GFSX":
        model = "MEX"
    # We drop the minutes for the LAV, which has :30 after for some reason?
    initts = utc(int(year), int(month), int(day), int(hhmm[:2]))
    # times[0] is the initialization time, so column i maps to times[i + 1]
    times = [initts]
    data = {}
    lines = sect.split(";;;")
    # Index of the line carrying the hour labels depends on the model
    hrline = 2
    if model in ["MEX", "LAV"]:
        hrline = 1
    elif model in ["NBE", "NBS"]:
        hrline = 3
    hrs = lines[hrline].replace("|", " ").split()
    if hrs[0] == "DT":
        # Hack: the selected line was the DT line, so use line index 2
        hrs = lines[2].split()
    for i, hr in enumerate(hrs[1:]):
        if model == "LAV" and hrs[0] == "HR":
            # Labels are treated as hour offsets from initialization
            ts = initts + timedelta(hours=int(hr))
        elif model == "LAV":
            # One column per hour; the label must equal the computed hour.
            # Raised explicitly (not via ``assert``) so that the check is
            # still performed when Python runs with -O.
            ts = initts + timedelta(hours=i + 1)
            if ts.hour != int(hr):
                raise AssertionError(
                    f"Computed hour {ts.hour} does not match label {hr}"
                )
        elif model in ["MEX", "NBE", "NBS"]:
            ts = initts + timedelta(hours=int(hr))
        elif hr == "00":
            # Label 00 moves to hour zero of the day after the prior time
            ts = times[-1] + timedelta(days=1)
            ts = ts.replace(hour=0)
        else:
            # Otherwise the label is an hour on the same date as the
            # previously computed time
            ts = times[-1].replace(hour=int(hr))
        times.append(ts)
        data[ts] = {}
    # Double check
    for ts in data:
        if ts < initts:
            raise AssertionError(f"Computed ts of {ts} < initts {initts}")
    # Column width (three or four characters) and the offsets at which the
    # data lines / data columns begin all vary by product
    chars = "(...)" if model not in ["MEX", "NBE"] else "(....)"
    startline = 2 if model in ["LAV"] else 3
    startlinepos = 4 if model not in ["NBE"] else 5
    if mos == "NBX" or model == "MEX":
        startlinepos = 3
    for line in lines[startline:]:
        if len(line) < 20:
            continue
        line = line.replace("|", " ")
        vname = line[:3].replace("/", "_").strip()
        vals = re.findall(chars, line[startlinepos:])
        for i, val in enumerate(vals):
            # Some products have more data than columns :(
            if i >= len(data):
                continue
            row = data[times[i + 1]]
            if vname == "T06" and times[i + 1].hour in [0, 6, 12, 18]:
                # The value pair spans the previous and the current column.
                # NOTE(review): for i == 0 ``vals[i - 1]`` wraps around to
                # the last column -- confirm this cannot happen in practice.
                row["T06_1"] = vals[i - 1].replace("/", "").strip()
                row["T06_2"] = val.replace("/", "").strip()
            elif vname == "T06":
                pass
            elif vname == "T12" and times[i + 1].hour in [0, 12]:
                row["T12_1"] = vals[i - 1].replace("/", "").strip()
                row["T12_2"] = val.replace("/", "").strip()
            elif vname == "T12":
                pass
            elif vname == "WDR" and val.strip() != "":
                row[vname] = int(val.strip()) * 10
            else:
                row[vname] = val.strip()
    return dict(station=station, model=model, data=data, initts=initts)
def make_null(val):
    """Convert the placeholders ``""`` and ``"NG"`` (and ``None``) to None.

    Any other value is handed back untouched.  Hmmm, perhaps we should set
    999 as null too?
    """
    is_missing = val in ("", "NG") or val is None
    return None if is_missing else val
class MOSProduct(WMOProduct):
    """
    Container for a parsed Model Output Statistics text product.

    After construction ``data`` holds one dictionary per section located in
    the text, each being the return value of ``section_parser``.
    """

    def __init__(
        self, text, utcnow=None, ugc_provider=None, nwsli_provider=None
    ):
        """Initialise the base product, keep the providers and parse."""
        super().__init__(text, utcnow)
        self.ugc_provider = ugc_provider
        self.nwsli_provider = nwsli_provider
        self.data = []
        self.parse_data()

    def sql(self, txn):
        """Write every non-empty forecast row to the database

        Args:
          txn: Database cursor, used through ``txn.execute(sql, args)``

        Returns:
          int number of inserts made to the database
        """
        # variables we don't wish to database
        skipped = ("FHR", "HR", "UTC")
        executed = 0
        for sect in self.data:
            for ts, row in sect["data"].items():
                # 'Empty' MOS products have timestamps without any values
                if not row:
                    continue
                head = (
                    f"INSERT into t{sect['initts'].year} "
                    "(station, model, runtime, ftime, "
                )
                args = [sect["station"], sect["model"], sect["initts"], ts]
                # NOTE(review): column names originate from the parsed
                # product text and are interpolated into the statement;
                # only the values are passed as query parameters.
                columns = []
                for vname, value in row.items():
                    if vname in skipped:
                        continue
                    # save some database space :/
                    columns.append(f" {REMAP_VARS.get(vname, vname)}")
                    args.append(make_null(value))
                if not columns:
                    # No data was found
                    continue
                col_sql = ",".join(columns)
                val_sql = "VALUES(%s,%s,%s,%s," + "%s," * len(columns)
                txn.execute(head + col_sql + ") " + val_sql[:-1] + ")", args)
                executed += 1
        return executed

    def parse_data(self):
        """Split the product text into sections and parse each one.

        The result is stored on ``self.data``.

        Raises:
          Exception: when no section could be located in the text
        """
        # Trim whitespace on every line and swap newlines for ``;;;`` so
        # that a blank line shows up as six consecutive semicolons.
        trimmed = [piece.strip() for piece in self.unixtext.split("\n")]
        flat = ("\n".join(trimmed) + "\n").replace("\n", ";;;")
        flat = flat.replace("\x1e", "")
        pattern = (
            r"([A-Z0-9_]{3,10}\s+....? V?[0-9]?\.?[0-9]? ?"
            r"....? GUIDANCE .*?);;;;;;"
        )
        sections = re.findall(pattern, flat)
        self.data = [section_parser(section) for section in sections]
        if not sections:
            raise Exception("Failed to split MOS Product")
def parser(text, utcnow=None, ugc_provider=None, nwsli_provider=None):
    """Convenience factory returning a ``MOSProduct`` built from ``text``.

    All arguments are forwarded unchanged to the ``MOSProduct`` constructor.
    """
    return MOSProduct(
        text,
        utcnow=utcnow,
        ugc_provider=ugc_provider,
        nwsli_provider=nwsli_provider,
    )