from __future__ import annotations
import itertools
import logging
import numpy as np
import pandas as pd
from hydrobricks._exceptions import DataError
from hydrobricks.time_series import TimeSeries1D
from hydrobricks.trainer import evaluate
logger = logging.getLogger(__name__)
[docs]
class Observations(TimeSeries1D):
"""Class for observation time series data"""
def __init__(self) -> None:
    """Set up the instance by running the parent-class initializer."""
    super().__init__()
[docs]
def compute_reference_metric(
    self,
    metric: str,
    start_date: str | None = None,
    end_date: str | None = None,
    with_exclusion: bool = False,
    mean_discharge: bool = False,
    all_combinations: bool = False,
    n_evals: int = 100,
) -> float:
    """
    Compute a reference value of a goodness-of-fit metric for the observations.

    By default the observed series is block bootstrapped year by year:
    whole years are resampled, each resampled series is scored against the
    observations with the requested metric, and the scores are averaged.
    With ``mean_discharge=True`` no resampling takes place; the metric is
    instead computed once between the per-calendar-day mean of the
    observations and the observations themselves.

    Parameters
    ----------
    metric
        The abbreviation of the function as defined in HydroErr
        (https://hydroerr.readthedocs.io/en/stable/list_of_metrics.html)
        Examples: 'nse', 'kge_2012', 'rmse', etc.
    start_date
        Start date string for period of interest (format: 'YYYY-MM-DD').
        If None, uses full time series. Default: None
    end_date
        End date string for period of interest (format: 'YYYY-MM-DD').
        If None, uses full time series. Default: None
    with_exclusion
        If True, reject any bootstrap sample that places a year at the
        position of that same year in the comparison period.
        Default: False
    mean_discharge
        If True, computes the average on the discharge directly rather than
        on the result of the HydroErr function. The bootstrapping options
        (with_exclusion, all_combinations, n_evals) are not used in that
        case. Default: False
    all_combinations
        If True, evaluates every possible combination of years.
        If False, randomly samples n_evals combinations. Default: False
    n_evals
        Number of random evaluations to perform (ignored if
        all_combinations=True). Default: 100

    Returns
    -------
    float
        The mean of the metric values obtained from the bootstrap samples,
        or the single metric value when mean_discharge is True.

    Raises
    ------
    DataError
        If bootstrapping is requested with fewer than two years of data.

    Notes
    -----
    Exceptions raised while evaluating the metric are not caught here and
    propagate to the caller.

    Examples
    --------
    >>> obs = Observations()
    >>> obs.load_from_csv('data.csv', 'date', '%Y-%m-%d', {'discharge': 'Q'})
    >>> ref_metric = obs.compute_reference_metric('nse', n_evals=100)
    """
    base_df, all_years = self._prepare_dataframe()

    # The day-of-year mean variant involves no resampling, so it is handled
    # before the minimum-years check that only bootstrapping requires.
    if mean_discharge:
        return self._compute_mean_discharge_metric(
            base_df, metric, start_date, end_date
        )

    self._validate_years_for_bootstrapping(all_years)

    yearly_df, target_years, target_df = self._prepare_comparison_data(
        base_df, all_years, start_date, end_date
    )

    # Both bootstrapping strategies take the same leading arguments.
    shared_args = (
        yearly_df,
        all_years,
        target_years,
        target_df,
        metric,
        with_exclusion,
    )
    if all_combinations:
        scores = self._compute_all_combinations(*shared_args)
    else:
        scores = self._compute_random_sampling(*shared_args, n_evals)

    ref_metric = float(np.mean(scores))
    logger.info(f"Reference metric computed: {ref_metric}")
    return ref_metric
def _prepare_dataframe(self) -> tuple[pd.DataFrame, np.ndarray]:
"""
Prepare the base dataframe with time series data and years.
Returns
-------
tuple[pd.DataFrame, np.ndarray]
DataFrame with data indexed by date, and array of unique years.
"""
df = self.time.to_frame(name="date").copy()
df["data"] = self.data[0]
df["year"] = pd.DatetimeIndex(df["date"]).year
df = df.set_index("date")
# Remove February 29 to ensure all years have 365 days
df = df[df.index.strftime("%m-%d") != "02-29"]
years = df.year.unique()
return df, years
def _compute_mean_discharge_metric(
    self,
    df: pd.DataFrame,
    metric: str,
    start_date: str | None,
    end_date: str | None,
) -> float:
    """
    Evaluate the metric between the per-calendar-day mean and the data.

    For every row, the mean of ``data`` over all rows sharing the same
    month and day is computed from the whole DataFrame. The optional date
    window is applied afterwards, so the means are based on the full
    record even when only part of it is scored.

    The input DataFrame is not modified.

    Parameters
    ----------
    df
        DataFrame with a datetime index and a ``data`` column.
    metric
        Metric name passed on to ``evaluate``.
    start_date
        Optional inclusive lower bound on the index; ignored when falsy.
    end_date
        Optional inclusive upper bound on the index; ignored when falsy.

    Returns
    -------
    float
        Value returned by ``evaluate`` for the day-of-year means (first
        argument) against the data (second argument).
    """
    # Group on a standalone month-day key rather than on helper columns
    # written into `df`, so the caller's DataFrame keeps its original shape.
    day_key = df.index.strftime("%m-%d")
    day_means = df.groupby(day_key)["data"].transform("mean")

    # Each bound is optional and applied independently of the other.
    in_period = np.ones(len(df), dtype=bool)
    if start_date:
        in_period &= df.index >= start_date
    if end_date:
        in_period &= df.index <= end_date

    return evaluate(
        day_means.values[in_period], df["data"].values[in_period], metric
    )
def _validate_years_for_bootstrapping(self, years: np.ndarray) -> None:
"""
Validate that sufficient years are available for bootstrapping.
Parameters
----------
years
Array of unique years in the dataset
Raises
------
DataError
If fewer than 2 years of data are available
"""
if len(years) < 2:
raise DataError(
"At least two years of data are required for block bootstrapping. "
f"Found only {len(years)} year(s).",
data_type="observations",
reason="Insufficient years for metric computation",
)
def _prepare_comparison_data(
self,
df: pd.DataFrame,
years: np.ndarray,
start_date: str | None,
end_date: str | None,
) -> tuple[pd.DataFrame, np.ndarray, pd.DataFrame]:
"""
Prepare data for comparison based on date range.
Parameters
----------
df
DataFrame indexed by date
years
Array of all available years
start_date
Optional start date
end_date
Optional end date
Returns
-------
tuple[pd.DataFrame, np.ndarray, pd.DataFrame]
Tuple of (df indexed by year, comparing years, comparison dataframe)
"""
df = df.reset_index().set_index("year")
if start_date and end_date:
date_range = pd.date_range(start_date, end_date)
comparing_years = np.unique(date_range.year)
comparing_df = df.set_index("date")
comparing_df = comparing_df[
(comparing_df.index >= start_date) & (comparing_df.index <= end_date)
]
else:
comparing_years = years
comparing_df = df
return df, comparing_years, comparing_df
def _should_exclude_sample(
self, sampled_years: np.ndarray, comparing_years: np.ndarray
) -> bool:
"""
Check if a sample should be excluded based on exclusion criteria.
Parameters
----------
sampled_years
Years selected in the current bootstrap sample
comparing_years
Years in the comparison dataset
Returns
-------
bool
True if sample should be excluded, False otherwise
"""
diff = sampled_years - comparing_years
return not np.all(diff)
def _compute_all_combinations(
self,
df: pd.DataFrame,
years: np.ndarray,
comparing_years: np.ndarray,
comparing_df: pd.DataFrame,
metric: str,
with_exclusion: bool,
) -> list[float]:
"""
Compute metrics for all possible year combinations.
Parameters
----------
df
DataFrame indexed by year
years
All available years
comparing_years
Years to compare against
comparing_df
DataFrame for comparison
metric
Metric to evaluate
with_exclusion
Whether to exclude self-selection
Returns
-------
list[float]
List of computed metric values
"""
metrics = []
year_combinations = itertools.product(years, repeat=len(comparing_years))
for sampled_years_tuple in year_combinations:
sampled_years = np.array(sampled_years_tuple)
if with_exclusion and self._should_exclude_sample(
sampled_years, comparing_years
):
continue
new_df = df.loc[sampled_years].copy()
value = evaluate(new_df.data.values, comparing_df.data.values, metric)
metrics.append(value)
return metrics
def _compute_random_sampling(
self,
df: pd.DataFrame,
years: np.ndarray,
comparing_years: np.ndarray,
comparing_df: pd.DataFrame,
metric: str,
with_exclusion: bool,
n_evals: int,
) -> np.ndarray:
"""
Compute metrics using random sampling of year combinations.
Parameters
----------
df
DataFrame indexed by year
years
All available years
comparing_years
Years to compare against
comparing_df
DataFrame for comparison
metric
Metric to evaluate
with_exclusion
Whether to exclude self-selection
n_evals
Number of random evaluations
Returns
-------
np.ndarray
Array of computed metric values
"""
metrics = []
while len(metrics) < n_evals:
sampled_years = np.random.choice(
years, size=len(comparing_years), replace=True
)
if with_exclusion and self._should_exclude_sample(
sampled_years, comparing_years
):
continue
new_df = df.loc[sampled_years].copy()
value = evaluate(new_df.data.values, comparing_df.data.values, metric)
metrics.append(value)
return np.array(metrics)