"""Processing of NCEI IGRA2.2 data.
---------------------
Header Record Format:
---------------------
---------------------------------
Variable Columns Type
---------------------------------
HEADREC 1- 1 Character
ID 2- 12 Character
YEAR 14- 17 Integer
MONTH 19- 20 Integer
DAY 22- 23 Integer
HOUR 25- 26 Integer
RELTIME 28- 31 Integer
NUMLEV 33- 36 Integer
P_SRC 38- 45 Character
NP_SRC 47- 54 Character
LAT 56- 62 Integer
LON 64- 71 Integer
-------------------------------
---------------------
Data Record Format:
---------------------
-------------------------------
Variable Columns Type
-------------------------------
LVLTYP1 1- 1 Integer
LVLTYP2 2- 2 Integer
ETIME 4- 8 Integer
PRESS 10- 15 Integer
PFLAG 16- 16 Character
GPH 17- 21 Integer
ZFLAG 22- 22 Character
TEMP 23- 27 Integer
TFLAG 28- 28 Character
RH 29- 33 Integer
DPDP 35- 39 Integer
WDIR 41- 45 Integer
WSPD 47- 51 Integer
-------------------------------
"""
from datetime import datetime, timedelta
from typing import Optional
from pydantic import ValidationError
from shapely.geometry import Point
from pyiem.models.igra import SoundingHeader, SoundingModel, SoundingRecord
from pyiem.reference import igra2icao
from pyiem.util import LOG, utc
[docs]
class Sounding:
"""Encapsulate a sounding."""
def __init__(self, header: SoundingHeader, records: list[SoundingRecord]):
"""Create a Sounding object."""
self.model: SoundingModel = SoundingModel(
header=header,
records=records,
)
[docs]
def sql(self, txn, overwrite=False):
"""Do the database insert.
Args:
txn (psycopg2.cursor): Database cursor
overwrite (bool): Should we overwrite existing data?
"""
icao = igra2icao[self.model.header.station]
txn.execute(
"select fid from raob_flights where station = %s and valid = %s",
(icao, self.model.header.valid),
)
if txn.rowcount == 0:
txn.execute(
"INSERT into raob_flights(station, valid) VALUES (%s, %s) "
"RETURNING fid",
(icao, self.model.header.valid),
)
elif not overwrite:
LOG.info(
"Skipping %s[%s] as record exists",
icao,
self.model.header.valid,
)
return
fid = txn.fetchone()["fid"]
# Delete any existing data
table = f"raob_profile_{self.model.header.valid.year}"
txn.execute(
f"DELETE from {table} where fid = %s",
(fid,),
)
tropo_level = None
for record in self.model.records:
# levelcode came from the rucsounding days
# 9 is surface data
# 4 is mandatory level
# 5 is unsure
levelcode = 5
if record.lvltyp2 == 1:
levelcode = 9
if record.lvltyp1 == 1:
levelcode = 4
if record.lvltyp2 == 2: # tropopause
tropo_level = record.press
txn.execute(
f"""
INSERT into {table} (fid, ts, levelcode, pressure, height, tmpc, dwpc,
drct, smps) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
(
fid,
record.valid,
levelcode,
record.press,
record.gph,
record.temp,
record.dewp,
record.wdir,
record.wspd,
),
)
txn.execute(
"""
UPDATE raob_flights SET release_time = %s, ingested_at = now(),
tropo_level = %s, computed = 'f' WHERE fid = %s""",
(self.model.header.release_valid, tropo_level, fid),
)
LOG.info(
"Added %s records for %s[%s][fid:%s]",
len(self.model.records),
icao,
self.model.header.valid,
fid,
)
[docs]
def compute_time(release_valid: datetime, etime: str):
"""Compute the valid time."""
# etime is MMMSS with a negative number being missing
if etime[0] == "-":
return None
seconds = int(etime[-2:])
minutes = 0
if len(etime) > 2:
minutes = int(etime[:-2])
return release_valid + timedelta(minutes=minutes, seconds=seconds)
[docs]
def convert_pressure(text: str) -> float:
"""Convert the pressure value."""
pressure = float(text) / 100.0
if pressure < 0:
return None
return pressure
[docs]
def convert_height(text: str) -> int:
"""Convert the height value."""
height = int(text)
if height < 0:
return None
return height
[docs]
def calc_dewp(tmpc: Optional[float], text: str) -> Optional[float]:
"""Compute the dewpoint from the dew point depression."""
dpdp = convert_float(text)
if tmpc is None or dpdp is None:
return None
return tmpc - dpdp
[docs]
def convert_float(
text: str, gt: Optional[float] = None, lt: Optional[float] = None
) -> Optional[float]:
"""Convert the temperature value."""
val = float(text) / 10.0
if (
val < -100
or (gt is not None and val <= gt)
or (lt is not None and val >= lt)
):
return None
return val
[docs]
def convert_wind(text: str) -> Optional[int]:
"""Convert the wind value."""
wind = int(text)
if wind < 0:
return None
return wind
[docs]
def process_sounding(text: str) -> Sounding:
"""Process the IGRA sounding text."""
header = None
records = []
for line in text.strip().split("\n"):
if header is None:
header = parse_header(line)
continue
tmpc = convert_float(line[22:27].strip())
record = {
"lvltyp1": int(line[0:1]),
"lvltyp2": int(line[1:2]),
"valid": compute_time(header.release_valid, line[3:8].strip()),
"press": convert_pressure(line[9:15]),
"pflag": line[15:16],
"gph": convert_height(line[16:21].strip()),
"zflag": line[21:22],
"temp": tmpc,
"tflag": line[27:28],
"rh": convert_float(line[28:33].strip(), gt=0, lt=104),
"dewp": calc_dewp(tmpc, line[34:39].strip()),
"wdir": convert_wind(line[40:45].strip()),
"wspd": convert_float(line[46:51].strip()),
}
try:
records.append(SoundingRecord(**record))
except ValidationError as exp:
LOG.info("Record: %s failed valdiation: %s", record, exp)
return Sounding(header, records)
[docs]
def process_ytd(filename: str):
"""Process the YTD file on NCEI's webserver."""
msg = ""
with open(filename) as fh:
for line in fh:
if line.strip() == "":
continue
if line.startswith("#"):
if msg:
yield process_sounding(msg)
msg = line
else:
msg += line
if msg:
yield process_sounding(msg)