Source code for pyiem.network
"""Network Table."""
from collections import OrderedDict
from pyiem.database import get_dbconnc
[docs]
class Table:
"""Our class"""
def __init__(self, network, cursor=None, only_online=True):
"""A class representing a network(s) of IEM metadata
Args:
network (str or list): A network identifier used by the IEM, this can
be either a string or a list of strings.
cursor (dbcursor,optional): A database cursor to use for the query
only_online (bool,otional): Should the listing of stations include
only those that are currently flagged as online.
"""
self.sts = OrderedDict()
if network is None:
return
if cursor is None:
dbconn, _cursor = get_dbconnc("mesosite")
else:
dbconn, _cursor = None, cursor
if isinstance(network, str):
network = [network]
if isinstance(network, tuple):
network = list(network)
online_extra = " and online " if only_online else ""
_cursor.execute(
f"""
WITH myattrs as (
SELECT a.iemid, array_agg(attr) as attrs,
array_agg(value) as attr_values from stations s JOIN
station_attributes a on (s.iemid = a.iemid) WHERE
s.network = any(%s) GROUP by a.iemid
), mythreading as (
SELECT a.iemid, array_agg(source_iemid) as threading_sources,
array_agg(begin_date) as threading_begin_dates,
array_agg(coalesce(end_date, 'TOMORROW'::date))
as threading_end_dates
from stations s JOIN
station_threading a on (s.iemid = a.iemid) WHERE
s.network = any(%s) GROUP by a.iemid
)
SELECT s.*, ST_x(geom) as lon, ST_y(geom) as lat,
a.attrs, a.attr_values, m.threading_sources,
m.threading_begin_dates, m.threading_end_dates
from stations s
LEFT JOIN myattrs a on (s.iemid = a.iemid)
LEFT JOIN mythreading m on (s.iemid = m.iemid)
WHERE network = any(%s) {online_extra} ORDER by name ASC
""",
(network, network, network),
)
for row in _cursor:
self.sts[row["id"]] = dict(row)
self.sts[row["id"]]["attributes"] = dict(
zip(row["attrs"] or [], row["attr_values"] or [], strict=False)
)
td = self.sts[row["id"]].setdefault("threading", [])
for i, s, e in zip(
row["threading_sources"] or [],
row["threading_begin_dates"] or [],
row["threading_end_dates"] or [],
strict=False,
):
td.append({"iemid": i, "begin_date": s, "end_date": e})
if cursor is None:
dbconn.close()
[docs]
def get_threading_id(self, sid, valid) -> str:
"""Return a station identifier (not iemid) based on threading.
Lookup what the threaded station identifier is based on this given
timestamp/date.
Args:
sid (str): station identifier to check threading for.
valid (datetime.date): lookup for comparison.
"""
entry = self.sts.get(sid)
if entry is None or not entry["threading"]:
return None
for tinfo in entry["threading"]:
if valid < tinfo["begin_date"] or valid >= tinfo["end_date"]:
continue
return self.get_id_by_key("iemid", tinfo["iemid"])
return None
[docs]
def get_id_by_key(self, key, value) -> str:
"""Find a station id by a given attribute = value.
Args:
key (str): attribute to lookup.
value (mixed): value to compare against
Returns:
station_id
"""
for sid in self.sts:
if self.sts[sid].get(key) == value:
return sid
return None