# Source code for kcpdi.kcp_ss_learner

"""
This module, `kcp_ss_learner` (Kernel Change Point Sample Scorer Learner), includes a wrapper to
perform anomaly scoring as understood in the TADkit formalism, based on this library's change-point
detection methodology.

The kernel change-point detection algorithm outputs a final list of time indices at which it thinks
that "something happened".

Other methods output a score at __all__ time indices, where the higher the score is, the more we
believe that "something happened there".

In order to allow the kernel change-point method to be integrated into a Python package based
around score samples, we have implemented a function __kcp_ss__ which takes the output list of
change-points produced by __kcp_ds__ and turns it into scores at __all time indices__.

Remember that the kernel change-point detection algorithm truly believes that it has found the true
and only set of change-points. However, due to random noise, it could have been that a true
change-point was immediately before or immediately after a detected change-point, i.e., 1 or 2 or
maybe even 3 time indices before or after.

We give a score of 1 at each detected change-point. Other points are assigned a score that
decreases as they move away from the detected change-point, in a way parameterized by a decay
parameter (gamma).
"""
import bisect
from typing import Any, Dict, List, Literal, Optional, Sequence

import numpy as np
from sklearn.base import BaseEstimator, OutlierMixin

from kcpdi import kcp_ds


class KcpLearner(BaseEstimator, OutlierMixin):
    """Anomaly scorer built on top of the kernel change-point detector ``kcp_ds``.

    ``kcp_ds`` outputs a discrete list of time indices at which it believes
    "something happened".  This estimator converts that list into a score at
    *every* time index (see :meth:`_kcp_ss`) so it can be used through the
    standard scikit-learn ``score_samples`` interface, where *lower* values
    indicate *more anomalous* samples.
    """

    # No mandatory input properties for this learner.
    required_properties = []

    # Machine-readable description of the tunable hyper-parameters
    # (e.g. for an external hyper-parameter search tool).
    params_description = {
        "decay_param": {
            "description": "How fast the score decays around the change-points.",
            "family": "postprocessing",
            "value_type": "log_range",
            "log_start": -2,
            "log_step": 0.1,
            "log_stop": 2,
            "default": 1,
        }
    }

    def __init__(
        self,
        kernel: Literal["linear", "cosine", "rbf"] = "linear",
        params: Optional[Dict[str, Any]] = None,
        max_n_time_points: int = 2000,
        min_n_time_points: int = 10,
        expected_frac_anomaly: float = 1 / 1000,
        decay_param: float = 1.,
    ):
        """Store the configuration; no computation happens here.

        Args:
            kernel: Kernel passed on to ``kcp_ds``.
            params: Optional kernel parameters passed on to ``kcp_ds``.
            max_n_time_points: Forwarded to ``kcp_ds`` (presumably an upper
                bound on the number of time points handled — confirm against
                ``kcp_ds``'s documentation).
            min_n_time_points: Forwarded to ``kcp_ds`` (presumably a lower
                bound — confirm against ``kcp_ds``'s documentation).
            expected_frac_anomaly: Forwarded to ``kcp_ds``.
            decay_param: How fast the score decays around each detected
                change-point; must be >= 0.
        """
        self.kernel = kernel
        self.params = params
        self.max_n_time_points = max_n_time_points
        self.min_n_time_points = min_n_time_points
        self.expected_frac_anomaly = expected_frac_anomaly
        self.decay_param = decay_param

    def fit(self, X, y=None):
        """No-op: all the work happens in :meth:`score_samples`.

        Present only to satisfy the scikit-learn estimator API; returns
        ``self`` unchanged.
        """
        return self

    @staticmethod
    def _kcp_ss(
        detected_change_points: Sequence[int],
        n_time_points: int,
        decay_param: float = 1.,
    ) -> List[float]:
        """Turn a list of detected change-points into a score per time index.

        Each time index gets a score between 0 (low probability of an anomaly
        at that index) and 1 (high probability):

        * a detected change-point scores exactly 1;
        * before the first (resp. after the last) change-point the score is
          ``(1/2) ** (distance * decay_param)``, where ``distance`` is the
          gap to that change-point;
        * strictly between two change-points the score is the average of the
          decayed contributions of the two surrounding change-points.

        Args:
            detected_change_points: The detected change-points, typically the
                first output of ``kcp_ds``.  The input is not modified and
                need not be sorted.
            n_time_points: The number of time points in the input data.
                Usually something like ``np.shape(data)[0]``.
            decay_param: Parameter that defines how fast the score decays
                around the change-points.  The larger this value, the faster
                it decays.  Must be >= 0 (0 yields a constant score of 1
                everywhere).

        Returns:
            A list of ``n_time_points`` scores in ``[0, 1]``.

        Raises:
            ValueError: If ``decay_param`` is negative.
        """
        if decay_param < 0:
            # The check permits 0 (degenerate constant score); only strictly
            # negative values are rejected.
            raise ValueError(
                "decay_param has to be non-negative."
            )

        # With no detected change-points, every index keeps a score of 0.
        kcp_scores: List[float] = [0.0] * n_time_points
        if not detected_change_points:
            return kcp_scores

        # Work on a sorted private copy so the left/right neighbour lookup
        # below is valid even for an unordered input; keep a set for O(1)
        # membership tests.
        change_points = sorted(detected_change_points)
        change_point_set = set(change_points)
        first_cp, last_cp = change_points[0], change_points[-1]

        for i in range(n_time_points):
            if i in change_point_set:
                # Case 1: i is a detected change-point.
                kcp_scores[i] = 1
            elif i < first_cp:
                # Case 2: i lies before the first change-point; only the
                # distance to that change-point matters.
                kcp_scores[i] = (1 / 2) ** ((first_cp - i) * decay_param)
            elif i > last_cp:
                # Case 4: i lies after the last change-point.
                kcp_scores[i] = (1 / 2) ** ((i - last_cp) * decay_param)
            else:
                # Case 3: i lies strictly between two change-points; average
                # the decayed contributions of its two nearest neighbours.
                right_idx = bisect.bisect_left(change_points, i)
                left_cp = change_points[right_idx - 1]
                right_cp = change_points[right_idx]
                kcp_scores[i] = (1 / 2) * (
                    (1 / 2) ** ((i - left_cp) * decay_param)
                    + (1 / 2) ** ((right_cp - i) * decay_param)
                )

        return kcp_scores

    def score_samples(self, X):
        """Return an anomaly score for each time index of ``X``.

        Runs ``kcp_ds`` on the data, converts the detected change-points into
        per-index scores via :meth:`_kcp_ss`, and negates them so that the
        scikit-learn convention (lower score == more anomalous) is respected.

        Args:
            X: Time-indexed data; anything exposing a ``values`` attribute
                (e.g. a pandas DataFrame) is reduced to its underlying array.

        Returns:
            A 1-D numpy array of length ``len(X)`` with values in ``[-1, 0]``.
        """
        if hasattr(X, "values"):
            # Small hack to handle DataFrames: work on the raw ndarray.
            X = X.values
        detected_change_points, _ = kcp_ds(
            data=X,
            kernel=self.kernel,
            params=self.params,
            max_n_time_points=self.max_n_time_points,
            min_n_time_points=self.min_n_time_points,
            expected_frac_anomaly=self.expected_frac_anomaly,
        )
        scores = self._kcp_ss(
            detected_change_points=detected_change_points,
            n_time_points=len(X),
            decay_param=self.decay_param,
        )
        # Negate: _kcp_ss marks likely anomalies with HIGH values, whereas
        # score_samples must mark them with LOW values.
        return -np.asarray(scores, dtype=float)