Coverage for kcpdi/kcp_ds.py: 84% (69 statements)
1"""
2This module, `kcp_ds` (Kernel Change Point Detect Select), encapsulates functions from the Ruptures
3package and includes a function for penalized variable selection.
5Specific details and remarks:
61. The function outputs Python indices of the columns of "data" corresponding to the detected
7change points. Real-world times associated with the data are not considered in this function.
82. The ruptures class KernelCPD has a minimum allowed segment size (min_size). It is currently set
9to 2 by default and is not an input parameter for kcp_ds.
103. The default kernel is "linear," with options for "cosine" and "rbf" kernels. To cover all bases,
11an input params is required, which defaults to None but can be modified for the rbf kernel (e.g.,
12params = {"gamma": value_of_gamma}).
134. Due to memory constraints, the algorithm processes offline time-series data longer than max_n_time_points
14in segments of length max_n_time_points, and then outputs the full set of change points.
16Note: Please handle real-world times associated with the data independently as this module focuses
17on change point detection and variable selection.
18"""
from typing import Any, Dict, List, Literal, Optional, Sequence, Tuple
import math

import numpy as np
import ruptures as rpt
from sklearn.linear_model import LinearRegression

from kcpdi.utils import get_sum_of_cost
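
# `get_sum_of_cost` is assumed to follow the helper from the ruptures documentation
# gallery, i.e. roughly equivalent to the sketch below (see kcpdi.utils for the
# actual implementation):
#
#     def get_sum_of_cost(algo, n_bkps):
#         """Sum of segment costs for the best segmentation with n_bkps breakpoints."""
#         bkps = algo.predict(n_bkps=n_bkps)
#         return algo.cost.sum_of_costs(bkps)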


def kcp_ds(
    data: np.ndarray,
    kernel: Literal["linear", "cosine", "rbf"] = "linear",
    params: Optional[Dict[str, Any]] = None,
    max_n_time_points: int = 2000,
    min_n_time_points: int = 10,
    expected_frac_anomaly: float = 1 / 1000,
) -> Tuple[List[int], List[int]]:
38 """Return a set of important change points change points after
39 running kernel change point detection followed by penalized model
40 selection using the slope heuristic method.
42 Since the criterion is responding to noise/overfitting, penalized
43 variable selection is performed in order to obtain a final decision
44 as to how many of the detected change points are truly significant
45 and not simply adjusted noise.
    Args:
        data: array of dimension (number of time points) x (number of
            time series) containing the data points.
        kernel: kernel of the kernel change point detection. If it is
            rbf, params must include a "gamma" parameter with a
            positive real value.
        params: parameters for the kernel instance.
        max_n_time_points: maximum number of time points processed at
            once; internally the method handles matrices of size
            (max_n_time_points) x (max_n_time_points), so this should
            be sized to what the computer system can process quickly.
        min_n_time_points: minimum number of time points in the current
            dataset for which it makes sense to run the detection
            algorithm again. If there are fewer than min_n_time_points
            points in the dataset, no computations will be run and the
            outputs will be empty.
        expected_frac_anomaly: encodes users' prior knowledge of how
            often anomalies might occur. Results can be quite dependent
            on the choice of this parameter, so choose carefully!

    Returns:
        The list of detected change points (Python row indices into data).
        The list of end points of the processed intervals, needed by the
        explainability function that can be run after this one.
    """
    if max_n_time_points < min_n_time_points:
        raise ValueError(
            "max_n_time_points should be greater than or equal to min_n_time_points."
        )

    if expected_frac_anomaly > 1 / 2:
        raise ValueError("expected_frac_anomaly should be at most 1/2.")
    n_total_time_points = np.shape(data)[0]

    current_data = np.copy(data)  # new variable for internal manipulation

    n_current_time_points = np.shape(current_data)[0]
    curr_n_remaining_t_points = n_current_time_points

    # Initialise an empty list to store the end points of the intervals of length
    # max_n_time_points (except for the last interval, which could be shorter). This
    # variable will be required by the explainability function that can be run after
    # the current function:
    interval_end_points = []

    current_zero = 0  # remember the original Python indices

    detected_change_points = []
    while curr_n_remaining_t_points > max_n_time_points:
        # Get the block of data to plug into the ruptures setting:
        data_use = current_data[:max_n_time_points, :]

        # Store the true index of the time point at the end of this interval, which
        # will be needed in the explainability function later:
        interval_end_points.append(current_zero + max_n_time_points)

        detected_change_points, next_ch_pts = _detect_changepoints_from_kernelcpd(
            kernel=kernel,
            params=params,
            data_use=data_use,
            expected_frac_anomaly=expected_frac_anomaly,
            current_zero=current_zero,
            detected_change_points=detected_change_points,
        )

        # Concatenate any current data rows left after the last change point found
        # with the remaining data (if there is any left):
        current_data = data[(next_ch_pts[-1] - 1):, :]

        current_zero = next_ch_pts[-1] - 1

        n_current_time_points = np.shape(current_data)[0]
        curr_n_remaining_t_points = n_current_time_points
    # Deal with any residual block of data that is shorter than (or equal to)
    # max_n_time_points but still larger than min_n_time_points:
    if (curr_n_remaining_t_points <= max_n_time_points) and (
        curr_n_remaining_t_points >= min_n_time_points
    ):
        # Store the true index of the time point at the end of this interval, which
        # will be needed in the explainability function later. Unlike in the above
        # while loop, here the time point at the end of the interval is the last
        # time point in the dataset:
        interval_end_points.append(n_total_time_points)

        detected_change_points, next_ch_pts = _detect_changepoints_from_kernelcpd(
            kernel=kernel,
            params=params,
            data_use=current_data,
            expected_frac_anomaly=expected_frac_anomaly,
            current_zero=current_zero,
            detected_change_points=detected_change_points,
        )

    # We need to remove any "detected" change points which were actually ends of
    # intervals. First we add 1 to each detected change point:
    temp_detec = [cp + 1 for cp in detected_change_points]
    # Now look for matches with values in interval_end_points:
    index_matches = [
        i
        for i in range(len(detected_change_points))
        if temp_detec[i] in interval_end_points
    ]
    detected_change_points = [
        cp for j, cp in enumerate(detected_change_points) if j not in index_matches
    ]

    return detected_change_points, interval_end_points


def _detect_changepoints_from_kernelcpd(
    kernel: Literal["linear", "cosine", "rbf"],
    params: Optional[Dict[str, Any]],
    data_use: np.ndarray,
    expected_frac_anomaly: float,
    current_zero: int,
    detected_change_points: List[int],
) -> Tuple[List[int], List[int]]:
160 """Apply kernel change point detection and update
161 detected_change_points.
163 Auxiliary function to kcp_ds. Update the detected_change_points
164 variable in place.
166 Args:
167 kernel: Kernel of the kernet change_point detection. If it is
168 rbf, params must include a "gamma" parameters, with a
169 positive real value.
170 params: parameters for the kernel instance
171 data_use: chunk of data on which to apply the detection
172 expected_frac_anomaly: This parameter encodes prior knowledge
173 of users as to how often anomalies might occur. Results can
174 be quite dependent on the choice of this parameter, so
175 choose carefully!
176 current_zero: index of the first point of the current chunk of
177 the dataset
178 detected_change_points: all breakpoints detected so far
180 Returns:
181 The list of all points detected so far.
182 The list of points newly detected during the last iteration.
183 """
    # Associate the data with the class KernelCPD from the ruptures library:
    algo = rpt.KernelCPD(kernel=kernel, params=params)
    algo.fit(data_use)

    # Get a value for the maximum number of change points to look for; essentially,
    # this is set to 2 * (fraction of expected anomalies) * (number of current time
    # points):
    n_bkps_max = max(
        1, int(2 * np.floor(expected_frac_anomaly * np.shape(data_use)[0]))
    )
    array_of_n_bkps = np.arange(1, n_bkps_max + 1)
    algo_costs = [
        get_sum_of_cost(algo=algo, n_bkps=n_bkps) for n_bkps in array_of_n_bkps
    ]  # value of the criterion for each choice of 1 up to n_bkps_max change points
    # Since adding an extra change point can never increase the criterion,
    # algo_costs is a non-increasing list.
    # For the linear kernel we want to concatenate onto the FRONT of this list the
    # value of the criterion when there are ZERO change points. The problem is that
    # the ruptures package assumes that there is at least one change point, while the
    # model selection of Arlot et al. (2019) includes the case of ZERO change points.
    # We emphasize that the matrix multiplication below is only for the linear kernel.
    linear_prod = np.dot(data_use, data_use.T)  # Gram matrix of the linear kernel
    diag_sum = np.trace(linear_prod)
    crit_linear_zero_cp = diag_sum - np.mean(linear_prod)
    # Concatenate this value onto the FRONT of the algo_costs list:
    algo_costs.insert(0, crit_linear_zero_cp)
    n_bkps_pen = _model_select_n_bp(
        data=data_use,
        algo_costs=algo_costs,
        array_of_n_bkps=array_of_n_bkps,
        n_bkps_max=n_bkps_max,
    )  # penalized selection
    # Extract the solution:
    if n_bkps_pen > 0:
        next_ch_pts = algo.predict(n_bkps=n_bkps_pen)
        # Remove the last element, which is the end of the signal and not a true
        # change point:
        del next_ch_pts[-1]
    else:
        # If NO change points were detected, by convention we instead output the
        # end-point index of the interval AS IF it were a change point. Note that
        # this (index + 1) was also already stored in the variable
        # "interval_end_points", so we will be able to weed out this change point at
        # the end for the final list and for the eventual visualization steps.
        next_ch_pts = algo.predict(n_bkps=1)
        next_ch_pts = [next_ch_pts[-1] - 1]

    # Add the current zero index (in order to get the true row/time point in the data):
    next_ch_pts = [x + current_zero for x in next_ch_pts]

    # Concatenate these with any found in previous loops:
    detected_change_points = detected_change_points + list(next_ch_pts)

    return detected_change_points, next_ch_pts


def _model_select_n_bp(
    data: np.ndarray,
    algo_costs: Sequence[float],
    array_of_n_bkps: Sequence[int],
    n_bkps_max: int,
) -> int:
249 """Calculate the number of change points after model selection using
250 the slope heuristic method.
252 This function takes the output set of candidate change points from
253 the Ruptures library method and refines them to a subset that is
254 likely to represent signal rather than (over)fitted noise. It
255 implements the penalized variable selection method described
256 in Arlot et al. (2019).
258 Args:
259 data: signal data points in which to calculate the optimal
260 number of change points
261 algo_costs: the (non-penalized) costs associated with each
262 number of change points
263 array_of_n_bkps: array listing the number of change points to be
264 considered
265 n_bkps_max: maximum number of change points to be detected
267 Returns:
268 The number of change points after penalized variable selection
269 using the slope heuristic method in Arlot et al. (2019).
270 """
    lower_D = math.ceil(0.6 * n_bkps_max)
    n = np.shape(data)[0]

    X1 = [
        (1 / n) * math.log(math.comb(n - 1, D - 1))
        for D in np.arange(lower_D, n_bkps_max + 1)
    ]
    X2 = [(1 / n) * D for D in np.arange(lower_D, n_bkps_max + 1)]

    X = np.transpose(np.array([X1, X2]))
    y = np.array(
        [(1 / n) * algo_costs[D - 1] for D in np.arange(lower_D, n_bkps_max + 1)]
    )
    # Linear regression:
    algo_reg = LinearRegression().fit(X, y)
    s1_hat = algo_reg.coef_[0]
    s2_hat = algo_reg.coef_[1]

    # Final parameters (the slope heuristic multiplies the fitted slopes by -2):
    c1 = -2 * s1_hat
    c2 = -2 * s2_hat

    # Use these two parameters to do the model selection of Arlot et al. (2019),
    # i.e. minimize cost(i) + c1 * log(C(n - 1, i - 1)) + c2 * i over i:
    algo_pen = [
        algo_costs[i - 1]
        + c1 * math.log(math.comb(n - 1, i - 1))
        + c2 * i
        for i in array_of_n_bkps
    ]
    min_val = min(algo_pen)
    all_min_indices = [i for i, x in enumerate(algo_pen) if x == min_val]

    # There could be more than one item in this list. In that (unlikely) event we
    # choose the smallest, for parsimony reasons:
    min_index = all_min_indices[0]

    n_bkps_pen = min_index

    return n_bkps_pen
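

# Illustrative usage sketch (an assumption, not part of the kcpdi API): run kcp_ds on
# a synthetic two-regime signal. The signal shape, the gamma value, and the anomaly
# fraction below are arbitrary choices for demonstration purposes.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    # 300 time points, 2 series, with a mean shift at t = 150:
    signal = rng.normal(0.0, 1.0, size=(300, 2))
    signal[150:, :] += 3.0

    change_points, interval_ends = kcp_ds(
        signal,
        kernel="rbf",
        params={"gamma": 0.1},  # the rbf kernel requires a positive "gamma"
        expected_frac_anomaly=1 / 100,
    )
    print("Detected change points:", change_points)
    print("Interval end points:", interval_ends)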