"""Utilities for the Daily Erosion Project"""
import math
import re
from datetime import date, timedelta
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
# Local
from pyiem.database import get_sqlalchemy_conn, sql_helper
from pyiem.iemre import EAST, NORTH, SOUTH, WEST
YLD_CROPTYPE = re.compile(r"Crop Type #\s+(?P<num>\d+)\s+is (?P<name>[^\s]+)")
YLD_DATA = re.compile(
(
r"Crop Type #\s+(?P<num>\d+)\s+Date = (?P<doy>\d+)"
r" OFE #\s+(?P<ofe>\d+)\s+yield=\s+(?P<yield>[0-9\.]+)"
r" \(kg/m\*\*2\) year= (?P<year>\d+)"
)
)
# 9 values for 8 colors on the website
# NB: Keep the lowest value just above zero so that plots always show data
RAMPS = {
"english": [
[0.01, 0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 5.0],
[0.01, 0.5, 1.0, 2.0, 3.0, 5.0, 7.0, 10.0, 20.0],
[0.01, 1.0, 2.0, 5.0, 7.0, 10.0, 20.0, 30.0, 40.0],
],
"metric": [
[0.25, 1.0, 10.0, 15.0, 25.0, 35.0, 50.0, 100.0, 200.0],
[0.25, 2.0, 20.0, 30.0, 50.0, 70.0, 100.0, 200.0, 400.0],
[0.25, 25.0, 50.0, 125.0, 200.0, 300.0, 500.0, 750.0, 1000.0],
],
}
[docs]
def load_scenarios() -> pd.DataFrame:
"""Build a dataframe of DEP scenarios."""
with get_sqlalchemy_conn("idep") as conn:
return pd.read_sql(
sql_helper("SELECT * from scenarios ORDER by id ASC"),
conn,
index_col="id",
)
[docs]
def get_cli_fname(lon, lat, scenario=0):
"""Get the climate file name for the given lon, lat, and scenario"""
# The trouble here is relying on rounding is problematic, so we just
# truncate
lat = round(lat, 2)
lon = round(lon, 2)
if lat < SOUTH or lat > NORTH:
raise ValueError(f"lat: {lat} outside of bounds: {SOUTH} {NORTH}")
if lon < WEST or lon > EAST:
raise ValueError(f"lon: {lon} outside of bounds: {WEST} {EAST}")
return (
f"/i/{scenario}/cli/{math.floor(0 - lon):03.0f}x"
f"{math.floor(lat):03.0f}/{(0 - lon):06.2f}x{lat:06.2f}.cli"
)
[docs]
def read_yld(filename):
"""read WEPP yld file with some local mods to include a year
Args:
filename (str): Filename to read
Returns:
pandas.DataFrame
"""
with open(filename, encoding="utf8") as fh:
data = fh.read()
xref = {}
for cropcode, label in YLD_CROPTYPE.findall(data):
xref[cropcode] = label # noqa
rows = []
for cropcode, doy, ofe, yld, year in YLD_DATA.findall(data):
dt = date(int(year), 1, 1) + timedelta(days=int(doy) - 1)
rows.append(
dict(
valid=dt,
year=int(year),
yield_kgm2=float(yld),
crop=xref[cropcode],
ofe=int(ofe),
)
)
return pd.DataFrame(rows)
[docs]
def read_slp(filename):
"""read WEPP slp file.
Args:
filename (str): Filename to read
Returns:
list of slope profiles
"""
with open(filename, encoding="utf8") as fh:
lines = [a[: a.find("#")].strip() for a in fh]
segments = int(lines[5])
res = [None] * segments
xpos = 0
elev = 0
for seg in range(segments):
# first line is pts and x-length
(_pts, length) = [float(x) for x in lines[7 + seg * 2].split()]
# next line is the combo of x-position along length and slope at pt
line2 = lines[8 + seg * 2].replace(",", "")
tokens = np.array([float(x) for x in line2.split()])
# first value in each pair is the x-pos relative to the length
xs = xpos + tokens[::2] * length
# second value is the slope at that position?
slopes = tokens[1::2]
# initialize the y-position at the current elevation
ys = [elev]
for i in range(1, len(slopes)):
# dx * slope
elev -= (xs[i] - xs[i - 1]) * slopes[i - 1]
ys.append(elev)
res[seg] = {"x": xs, "slopes": slopes, "y": np.array(ys)}
xpos += length
return res
[docs]
def man2df(mandict: dict, year1: int = 1) -> pd.DataFrame:
"""Convert nasty dictionary returned from `read_man` into `pd.DataFrame`.
The DataFrame is oriented with OFE, year.
Args:
mandict (dict): Dictionary populated from read_man.
year1 (int,optional): What does WEPP year index 1 equate to in the
real world! The default of 1 just uses what WEPP does.
Returns:
pd.DataFrame
"""
rows = []
baseyear = year1 # roundabout
for iofe in range(mandict["iofe"]):
for iyear in range(mandict["inyr"]):
year = iyear + baseyear
scenyr = mandict["rotations"][iyear][iofe]["yearindex"]
ncrop = mandict["scens"][scenyr - 1]["ntype"]
tilseq = mandict["scens"][scenyr - 1]["tilseq"]
plant_date = None
for surfeff in mandict["surfeffects"][tilseq - 1]["tills"]:
op = surfeff["op"]
if (
mandict["operations"][op - 1]["scecomment"].find("Planter")
> -1
):
doy = surfeff["mdate"]
plant_date = date(year, 1, 1) + timedelta(days=doy - 1)
rows.append(
{
"year": year,
"ofe": iofe + 1,
"plant_date": plant_date,
"crop_name": mandict["crops"][ncrop - 1]["crpnam"],
}
)
return pd.DataFrame(rows)
[docs]
def read_man(filename):
"""Implements WEPP's INFILE.for for reading management file
Args:
filename (str): Filename to read
Returns:
dict of management info
"""
res = {}
# Step one make a array of any data
with open(filename, encoding="ascii") as fh:
lines = [a[: a.find("#")].strip() for a in fh]
res["manver"] = lines[0]
res["iofe"] = int(lines[6])
res["inyr"] = int(lines[7])
res["ncrop"] = int(lines[13])
res["crops"] = [None] * res["ncrop"]
linenum = 16
for ncrop in range(res["ncrop"]):
res["crops"][ncrop] = {
"crpnam": lines[linenum],
"crpcomment": "\n".join(lines[linenum + 1 : linenum + 4]),
"iplant": int(lines[linenum + 4]),
}
linenum += 5
if res["crops"][ncrop]["iplant"] == 1:
# TODO
linenum += 7
linenum += 4
res["nop"] = int(lines[linenum])
linenum += 3
res["operations"] = [None] * res["nop"]
for nop in range(res["nop"]):
res["operations"][nop] = {
"scenam": lines[linenum],
"scecomment": "\n".join(lines[linenum + 1 : linenum + 4]),
"iplant": int(lines[linenum + 4]),
}
linenum += 9
linenum += 6
res["nini"] = int(lines[linenum])
res["ini"] = [None] * res["nini"]
linenum += 3
for ini in range(res["nini"]):
res["ini"][ini] = {
"scenam": lines[linenum],
"scecomment": "\n".join(lines[linenum + 1 : linenum + 4]),
"iplant": int(lines[linenum + 4]),
}
linenum += 14
linenum += 6
res["nsurf"] = int(lines[linenum])
res["surfeffects"] = [None] * res["nsurf"]
linenum += 6
for surf in range(res["nsurf"]):
res["surfeffects"][surf] = {
"scenam": lines[linenum],
"scecomment": "\n".join(lines[linenum + 1 : linenum + 4]),
"iplant": int(lines[linenum + 4]),
"ntill": int(lines[linenum + 5]),
}
res["surfeffects"][surf]["tills"] = [None] * res["surfeffects"][surf][
"ntill"
]
linenum += 6
for till in range(res["surfeffects"][surf]["ntill"]):
res["surfeffects"][surf]["tills"][till] = {
"mdate": int(lines[linenum]),
"op": int(lines[linenum + 1]),
"depth": float(lines[linenum + 2]),
"type": int(lines[linenum + 3]),
}
linenum += 4
linenum += 4
linenum += 2
res["ncnt"] = int(lines[linenum])
linenum += 7
res["ndrain"] = int(lines[linenum])
linenum += 7
res["nmscen"] = int(lines[linenum])
linenum += 4
res["scens"] = [None] * res["nmscen"]
for scen in range(res["nmscen"]):
res["scens"][scen] = {
"scenam": lines[linenum],
"scecomment": "\n".join(lines[linenum + 1 : linenum + 4]),
"iplant": int(lines[linenum + 4]),
"ntype": int(lines[linenum + 5]),
"tilseq": int(lines[linenum + 6]),
"conseq": int(lines[linenum + 7]),
"drseq": int(lines[linenum + 8]),
"imngmt": int(lines[linenum + 9]),
}
if res["scens"][scen]["iplant"] == 1:
if res["scens"][scen]["imngmt"] in [1, 3]:
# Annual/Fallow Cropping system
res["scens"][scen]["jdharv"] = int(lines[linenum + 10])
res["scens"][scen]["jdplt"] = int(lines[linenum + 11])
res["scens"][scen]["r1"] = float(lines[linenum + 12])
res["scens"][scen]["resmgt"] = int(lines[linenum + 13])
if res["scens"][scen]["resmgt"] == 1:
res["scens"][scen]["jdherb"] = int(lines[linenum + 14])
linenum += 15
elif res["scens"][scen]["resmgt"] == 2:
res["scens"][scen]["jdburn"] = int(lines[linenum + 14])
res["scens"][scen]["fbrna1"] = float(
lines[linenum + 15].split()[0]
)
res["scens"][scen]["fbrno1"] = float(
lines[linenum + 15].split()[1]
)
linenum += 16
elif res["scens"][scen]["resmgt"] == 3:
res["scens"][scen]["jdslge"] = int(lines[linenum + 14])
linenum += 15
elif res["scens"][scen]["resmgt"] == 4:
res["scens"][scen]["jdcut"] = int(lines[linenum + 14])
res["scens"][scen]["frcu1"] = int(lines[linenum + 15])
linenum += 16
elif res["scens"][scen]["resmgt"] == 5:
res["scens"][scen]["jdmove"] = int(lines[linenum + 14])
res["scens"][scen]["frmov1"] = float(lines[linenum + 14])
linenum += 16
elif res["scens"][scen]["resmgt"] == 6:
linenum += 14
else:
# Perrenial Cropland
res["scens"][scen]["jdharv"] = int(lines[linenum + 10])
res["scens"][scen]["jdplt"] = int(lines[linenum + 11])
res["scens"][scen]["jdstop"] = int(lines[linenum + 12])
res["scens"][scen]["r1"] = float(lines[linenum + 13])
res["scens"][scen]["mgtopt"] = int(lines[linenum + 14])
if res["scens"][scen]["mgtopt"] == 1:
# Cutting
res["scens"][scen]["ncut"] = int(lines[linenum + 15])
res["scens"][scen]["cuts"] = [None] * res["scens"][scen][
"ncut"
]
linenum += 16
for cut in range(res["scens"][scen]["ncut"]):
res["scens"][scen]["cuts"][cut] = int(lines[linenum])
linenum += 1
elif res["scens"][scen]["mgtopt"] == 2:
# Grazing
res["scens"][scen]["ncycle"] = int(lines[linenum + 15])
res["scens"][scen]["cycles"] = [None] * res["scens"][scen][
"ncycle"
]
linenum += 16
for cycle in range(res["scens"][scen]["ncycle"]):
res["scens"][scen]["cycles"][cycle] = {
"arr": lines[linenum],
"gday": int(lines[linenum + 1]),
"gend": int(lines[linenum + 2]),
}
linenum += 1
elif res["scens"][scen]["mgtopt"] == 3:
linenum += 15
elif res["scens"][scen]["iplant"] == 2:
pass
linenum += 3
linenum += 3
res["mantitle"] = lines[linenum]
res["mandesc"] = "\n".join(lines[linenum + 1 : linenum + 4])
linenum += 4
res["nwsofe"] = int(lines[linenum])
res["inindx"] = [None] * res["nwsofe"]
linenum += 1
for idx in range(res["nwsofe"]):
res["inindx"][idx] = int(lines[linenum])
linenum += 1
res["nrots"] = int(lines[linenum])
linenum += 1
res["nyears"] = int(lines[linenum])
sz = res["nyears"] * res["nrots"]
res["rotations"] = [None] * sz
linenum += 6
yidx = 0
for _rot in range(res["nrots"]):
for _year in range(res["nyears"]):
res["rotations"][yidx] = [None] * res["nwsofe"]
for ofe in range(res["nwsofe"]):
res["rotations"][yidx][ofe] = {
"plant": int(lines[linenum]),
"yearindex": int(lines[linenum + 1]),
}
linenum += 3
yidx += 1
linenum += 4
return res
[docs]
def rfactor(times, points, return_rfactor_metric=True):
"""Compute the R-factor.
https://www.hydrol-earth-syst-sci.net/19/4113/2015/hess-19-4113-2015.pdf
It would appear that a strict implementation would need to have a six
hour dry period around events and require more then 12mm of precipitation.
Args:
times (list): List of decimal time values for a date.
points (list): list of accumulated precip values (mm).
return_rfactor_metric (bool, optional): Should this return a metric
(default) or english unit R value.
Returns:
rfactor (float): Units of MJ mm ha-1 h-1
"""
# No precip!
if not times:
return 0
# interpolate dataset into 30 minute bins
func = interp1d(
times,
points,
kind="linear",
fill_value=(0, points[-1]),
bounds_error=False,
)
accum = func(np.arange(0, 24.01, 0.5))
rate_mmhr = (accum[1:] - accum[0:-1]) * 2.0
# sum of E x I
# I is the 30 minute peak intensity (mm h-1), capped at 3 in/hr
Imax = min([3.0 * 25.4, np.max(rate_mmhr)])
# E is sum of e_r (MJ ha-1 mm-1) * p_r (mm)
e_r = 0.29 * (1.0 - 0.72 * np.exp(-0.082 * rate_mmhr))
# rate * times
p_r = rate_mmhr / 2.0
# MJ ha-1 * mm h-1 or MJ inch a-1 h-1
unitconv = 1.0 if return_rfactor_metric else (1.0 / 25.4 / 2.47105)
return np.sum(e_r * p_r) * Imax * unitconv
[docs]
def read_cli(filename, compute_rfactor=False, return_rfactor_metric=True):
"""Read WEPP CLI File, Return DataFrame
Args:
filename (str): Filename to read
compute_rfactor (bool, optional): Should the R-factor be computed as
well, adds computational expense and default is False.
return_rfactor_metric (bool, optional): should the R-factor be
computed as the common metric value. Default is True.
Returns:
pandas.DataFrame
"""
rows = []
dates = []
with open(filename, encoding="ascii") as fh:
lines = fh.readlines()
linenum = 15
while linenum < len(lines):
(da, mo, year, breakpoints, tmax, tmin, rad, wvl, wdir, tdew) = lines[
linenum
].split()
breakpoints = int(breakpoints)
accum = 0
times = []
points = []
for i in range(1, breakpoints + 1):
(ts, accum) = lines[linenum + i].split()
times.append(float(ts))
points.append(float(accum))
maxr = 0
for i in range(1, len(times)):
dt = times[i] - times[i - 1]
dr = points[i] - points[i - 1]
maxr = max(maxr, dr / dt)
linenum += breakpoints + 1
dates.append(date(int(year), int(mo), int(da)))
rows.append(
{
"tmax": float(tmax),
"tmin": float(tmin),
"rad": float(rad),
"wvl": float(wvl),
"wdir": float(wdir),
"tdew": float(tdew),
"maxr": maxr,
"bpcount": breakpoints,
"pcpn": float(accum),
"rfactor": (
np.nan
if not compute_rfactor
else rfactor(
times,
points,
return_rfactor_metric=return_rfactor_metric,
)
),
}
)
return pd.DataFrame(rows, index=pd.DatetimeIndex(dates))
[docs]
def read_env(filename, year0=2006):
"""Read WEPP .env file, return a dataframe
Args:
filename (str): Filename to read
year0 (int,optional): The simulation start year minus 1
Returns:
pd.DataFrame
"""
df = pd.read_csv(
filename,
skiprows=3,
index_col=False,
sep=r"\s+",
header=None,
na_values=["*******", "******", "*****"],
names=[
"day",
"month",
"year",
"precip",
"runoff",
"ir_det",
"av_det",
"mx_det",
"point",
"av_dep",
"max_dep",
"point2",
"sed_del",
"er",
],
)
if df.empty:
df["date"] = None
else:
# Faster than +=
df["year"] = df["year"] + year0
# Considerably faster than df.apply
df["date"] = pd.to_datetime(
dict(year=df["year"], month=df["month"], day=df["day"])
)
return df
[docs]
def read_ofe(filename, year0=2006):
"""Read OFE .ofe file, return a dataframe
Args:
filename (str): Filename to read
year0 (int,optional): The simulation start year minus 1
Returns:
pd.DataFrame
"""
df = pd.read_csv(
filename,
skiprows=2,
index_col=False,
sep=r"\s+",
header=None,
na_values=["*******", "******", "********"],
names=[
"ofe",
"day",
"month",
"year",
"precip",
"runoff",
"effint",
"peakro",
"effdur",
"enrich_ratio",
"keff",
"sm",
"leafarea",
"canhght",
"cancov",
"intcov",
"rilcov",
"livbio",
"deadbio",
"ki",
"kr",
"tcrit",
"rilwid",
"sedleave",
],
)
if df.empty:
df["date"] = None
else:
# Faster than +=
df["year"] = df["year"] + year0
# Considerably faster than df.apply
df["date"] = pd.to_datetime(
dict(year=df["year"], month=df["month"], day=df["day"])
)
return df
def _date_from_year_jday(df):
"""Create a date column based on year and jday columns."""
df["date"] = pd.to_datetime(
df["year"].astype(str) + " " + df["jday"].astype(str), format="%Y %j"
)
[docs]
def read_wb(filename):
"""Read a *custom* WEPP .wb file into Pandas Data Table"""
df = pd.read_csv(filename, sep=r"\s+", na_values=["*******", "******"])
# Considerably faster than df.apply
_date_from_year_jday(df)
return df
[docs]
def read_crop(filename):
"""Read WEPP's plant and residue output file.
Args:
filename (str): The file to read in.
Returns:
pandas.DataFrame
"""
df = pd.read_csv(
filename,
skiprows=13,
index_col=False,
sep=r"\s+",
header=None,
na_values=["*******", "******", "********"],
names=[
"ofe",
"jday",
"year",
"canopy_height_m",
"canopy_percent",
"lai",
"cover_rill_percent",
"cover_inter_percent",
"cover_inter_type",
"live_biomass_kgm2",
"standing_residue_kgm2",
"flat_residue_last_type",
"flat_residue_last_kgm2",
"flat_residue_prev_type",
"flat_residue_prev_kgm2",
"flat_residue_all_type",
"flat_residue_all_kgm2",
"buried_residue_last_kgm2",
"buried_residue_prev_kgm2",
"buried_residue_all_kgm2",
"deadroot_residue_last_type",
"deadroot_residue_last_kgm2",
"deadroot_residue_prev_type",
"deadroot_residue_prev_kgm2",
"deadroot_residue_all_type",
"deadroot_residue_all_kgm2",
"avg_temp_c",
],
)
# Convert jday into dates
_date_from_year_jday(df)
return df