Source code for tstrends.label_tuning.filtering

"""
Efficiency-based intra-trend weighting for tuned labels.

Within each non-neutral trend interval, weights emphasize timesteps where
forward-looking directional efficiency (net move over path length) is high.
"""

from itertools import pairwise

import numpy as np

from tstrends.label_tuning.base import BaseFilter

# Small positive constant added to denominators (efficiency path length and
# the normalization quantile) so that divisions never hit exactly zero.
_EPS = 1e-8


def _parse_window_pair(
    abs_val: int | None,
    rel_val: float | None,
    *,
    abs_param: str,
    rel_param: str,
) -> tuple[int | None, float | None]:
    """Validate mutually exclusive absolute / relative window and return the pair."""
    if abs_val is not None:
        if not isinstance(abs_val, int) or abs_val < 1:
            raise ValueError(f"{abs_param} must be an integer >= 1")
        return abs_val, None
    if rel_val is not None:
        fr = float(rel_val)
        if fr <= 0.0 or fr >= 1.0:
            raise ValueError(f"{rel_param} must be a float in (0, 1)")
        return None, fr
    raise ValueError(f"either {abs_param} or {rel_param} must be provided")


def _window_length_for_interval(
    abs_val: int | None,
    rel_val: float | None,
    interval_len: int,
) -> int:
    """Map stored abs/rel spec to an integer window for an interval of length ``n``."""
    if abs_val is not None:
        return abs_val
    n = interval_len
    return max(1, min(n - 1, int(round(rel_val * n))))


class ForwardLookingFilter(BaseFilter):
    """
    Per-timestep weights from a normalized forward-looking efficiency metric.

    For timestep ``t`` inside an interval (with horizon ``h``):

    .. math::

        e_t = \\frac{|x_{t+h} - x_t|}{\\sum_{k=1}^{h} |x_{t+k} - x_{t+k-1}| + \\epsilon}

    Weights are smoothed to avoid propagating time series noise into the
    labels and scaled by a high quantile (avoiding a max to add robustness)
    so that low-efficiency regions are downweighted but not zeroed.
    """

    def __init__(
        self,
        forward_window: int | None = None,
        forward_window_rel: float | None = None,
        smoothing_window: int | None = None,
        smoothing_window_rel: float | None = None,
        quantile: float = 0.95,
    ) -> None:
        """
        Args:
            forward_window: Absolute horizon ``h`` for efficiency (net vs path
                length).
            forward_window_rel: Relative horizon as a fraction of each trend
                interval length (mutually exclusive with ``forward_window``).
            smoothing_window: Length of the centered moving-average kernel on
                the filtering weights.
            smoothing_window_rel: Relative length of the centered
                moving-average kernel on the filtering weights, as a fraction
                of each trend interval length (mutually exclusive with
                ``smoothing_window``).
            quantile: High quantile for robust normalization denominator.

        Raises:
            ValueError: If a window pair is missing or out of range, or if
                ``quantile`` is not a number in ``(0, 1]``.
        """
        self._forward_spec = _parse_window_pair(
            forward_window,
            forward_window_rel,
            abs_param="forward_window",
            rel_param="forward_window_rel",
        )
        self._smooth_spec = _parse_window_pair(
            smoothing_window,
            smoothing_window_rel,
            abs_param="smoothing_window",
            rel_param="smoothing_window_rel",
        )
        if not isinstance(quantile, (int, float)) or not 0.0 < float(quantile) <= 1.0:
            raise ValueError("quantile must be in (0, 1]")
        self.quantile = float(quantile)

    def _compute_efficiency(self, x: np.ndarray, h: int) -> np.ndarray:
        """Raw efficiency ``e``; trailing ``h`` positions are zero (undefined horizon).

        The caller is expected to pass ``1 <= h < len(x)``.
        """
        n = len(x)
        e = np.zeros(n, dtype=float)
        abs_diff = np.abs(np.diff(x))
        # Prefix sums of absolute steps: cumsum[i] is the path length from
        # x[0] to x[i], so a difference of two entries is a windowed path.
        cumsum = np.concatenate(([0.0], np.cumsum(abs_diff)))
        path = cumsum[h:] - cumsum[:-h]
        net = np.abs(x[h:] - x[:-h])
        e[: n - h] = net / (path + _EPS)
        return e

    def _smooth_efficiency(self, e: np.ndarray) -> np.ndarray:
        """Centered moving average (same length as ``e``)."""
        window = _window_length_for_interval(*self._smooth_spec, len(e))
        # ``np.convolve(..., mode="same")`` returns max(len(e), len(kernel))
        # samples, so an absolute window longer than the interval would yield
        # an array that no longer fits the interval slice. Cap it to len(e).
        window = max(1, min(window, len(e)))
        kernel = np.ones(window, dtype=float) / window
        return np.convolve(e, kernel, mode="same")

    def _robust_normalize(self, e: np.ndarray) -> np.ndarray:
        """Scale by high quantile and clip to ``[0, 1]``."""
        denom = float(np.quantile(e, q=self.quantile))
        w = e / (denom + _EPS)
        return np.clip(w, 0.0, 1.0)

    def _weights_for_interval(self, interval_ts: np.ndarray) -> np.ndarray:
        """Full stabilization pipeline for one contiguous interval.

        Returns an empty array for an empty interval, and all-ones weights
        when the interval is not longer than the forward horizon (no
        efficiency value can be computed there).
        """
        x = np.asarray(interval_ts, dtype=float)
        n = len(x)
        if n == 0:
            return np.array([], dtype=float)
        h = _window_length_for_interval(*self._forward_spec, n)
        if n <= h:
            return np.ones(n, dtype=float)
        e = self._compute_efficiency(x, h)
        e_smooth = self._smooth_efficiency(e)
        return self._robust_normalize(e_smooth)

    def get_coefficients(
        self,
        time_series: list[float] | np.ndarray,
        labels: list[int] | np.ndarray,
    ) -> np.ndarray:
        """Return one weight per timestep of ``time_series``.

        Timesteps in neutral intervals (label ``0``) keep weight ``1.0``;
        every other interval receives the efficiency-based weights computed
        from its own slice of the series.

        Args:
            time_series: Series values, one per timestep.
            labels: Trend labels aligned with ``time_series``.

        Returns:
            Float array of the same length as ``time_series``.
        """
        # NOTE(review): ``_verify_inputs`` and ``_find_trend_intervals`` are
        # not defined in this class; presumably inherited from BaseFilter.
        self._verify_inputs(time_series, labels)
        ts_array = np.asarray(time_series, dtype=float)
        labels_array = np.asarray(labels)
        coefficients = np.ones(len(time_series), dtype=float)
        bounds = self._find_trend_intervals(labels)
        n_labels = len(labels_array)

        for start, end in pairwise(bounds):
            if labels_array[start] == 0:
                continue
            # ``end`` is the last index of the series for the final pair; otherwise it
            # is the first index of the next segment (exclusive upper bound).
            stop_exclusive = end + 1 if end == n_labels - 1 else end
            interval_slice = slice(start, stop_exclusive)
            interval_ts = ts_array[interval_slice]
            coefficients[interval_slice] = self._weights_for_interval(interval_ts)

        return coefficients