Coverage for kcpdi/kcp_ds.py: 84%

69 statements  

coverage.py v7.10.6, created at 2025-10-01 10:11 +0000

1""" 

2This module, `kcp_ds` (Kernel Change Point Detect Select), encapsulates functions from the Ruptures 

3package and includes a function for penalized variable selection. 

4 

5Specific details and remarks: 

61. The function outputs Python indices of the columns of "data" corresponding to the detected 

7change points. Real-world times associated with the data are not considered in this function. 

82. The ruptures class KernelCPD has a minimum allowed segment size (min_size). It is currently set 

9to 2 by default and is not an input parameter for kcp_ds. 

103. The default kernel is "linear," with options for "cosine" and "rbf" kernels. To cover all bases, 

11an input params is required, which defaults to None but can be modified for the rbf kernel (e.g., 

12params = {"gamma": value_of_gamma}). 

134. Due to memory constraints, the algorithm processes offline time-series data longer than max_n_time_points 

14in segments of length max_n_time_points, and then outputs the full set of change points. 

15 

16Note: Please handle real-world times associated with the data independently as this module focuses 

17on change point detection and variable selection. 

18""" 
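# Typical usage (illustrative sketch; `signal` stands for a hypothetical
# (n_time_points, n_series) NumPy array):
#
#     change_points, interval_ends = kcp_ds(signal)  # linear kernel (default)
#     change_points, interval_ends = kcp_ds(signal, kernel="cosine")
#     change_points, interval_ends = kcp_ds(
#         signal, kernel="rbf", params={"gamma": 0.1}  # gamma value is illustrative
#     )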

from typing import Any, Dict, List, Literal, Optional, Sequence, Tuple
import math

import numpy as np
import ruptures as rpt
from sklearn.linear_model import LinearRegression

from kcpdi.utils import get_sum_of_cost


def kcp_ds(
    data: np.ndarray,
    kernel: Literal["linear", "cosine", "rbf"] = "linear",
    params: Optional[Dict[str, Any]] = None,
    max_n_time_points: int = 2000,
    min_n_time_points: int = 10,
    expected_frac_anomaly: float = 1 / 1000,
) -> Tuple[List[int], List[int]]:

    """Return a set of important change points after running kernel
    change point detection followed by penalized model selection using
    the slope heuristic method.

    Since the detection criterion also responds to noise (i.e., it can
    overfit), penalized variable selection is performed in order to
    obtain a final decision as to how many of the detected change
    points are truly significant and not simply fitted noise.

    Args:
        data: array of dimension (number of time points) x (number of
            time-series) containing the data points.
        kernel: kernel of the kernel change point detection. If it is
            "rbf", params must include a "gamma" parameter with a
            positive real value.
        params: parameters for the kernel instance.
        max_n_time_points: maximum size (max_n_time_points) x
            (max_n_time_points) of matrices expected to be processed
            quickly by the computer system.
        min_n_time_points: minimum number of time points in the current
            dataset for which it makes sense to run the detection
            algorithm again. If there are fewer than min_n_time_points
            points in the dataset, no computations will be run and the
            outputs will be empty.
        expected_frac_anomaly: encodes users' prior knowledge of how
            often anomalies might occur. Results can depend strongly on
            this parameter, so choose it carefully!
    """
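    # For reference, the default expected_frac_anomaly = 1/1000 encodes the prior
    # belief that roughly one time point in a thousand is anomalous; doubling it
    # roughly doubles the candidate budget n_bkps_max computed in
    # _detect_changepoints_from_kernelcpd (a reading of the code below, not a
    # tuning recommendation).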

    if max_n_time_points < min_n_time_points:
        raise ValueError("max_n_time_points should be at least min_n_time_points.")

    if expected_frac_anomaly > 1 / 2:
        raise ValueError("expected_frac_anomaly should be at most 1/2.")

    n_total_time_points = np.shape(data)[0]

    current_data = np.copy(data)  # new variable for internal manipulation

    n_current_time_points = np.shape(current_data)[0]
    curr_n_remaining_t_points = n_current_time_points

    # Initialise an empty list to store the end points of the intervals of length
    # max_n_time_points (except for the last interval, which could be shorter). This
    # variable will be required by the explainability function that can be run after
    # the current function:
    interval_end_points = []

    current_zero = 0  # remember the original Python indices

    detected_change_points = []

    while curr_n_remaining_t_points > max_n_time_points:
        # Get the block of data to plug into the ruptures setting:
        data_use = current_data[:max_n_time_points, :]

        # Store the true index of the time point at the end of this interval, which
        # will be needed by the explainability function later:
        interval_end_points.append(current_zero + max_n_time_points)

        detected_change_points, next_ch_pts = _detect_changepoints_from_kernelcpd(
            kernel=kernel,
            params=params,
            data_use=data_use,
            expected_frac_anomaly=expected_frac_anomaly,
            current_zero=current_zero,
            detected_change_points=detected_change_points,
        )

        # Keep everything from just before the last change point found onward: any
        # rows of the current block after that point, plus all remaining data:
        current_data = data[(next_ch_pts[-1] - 1):, :]

        current_zero = next_ch_pts[-1] - 1

        n_current_time_points = np.shape(current_data)[0]
        curr_n_remaining_t_points = n_current_time_points

    # Deal with any residual block of data that is shorter than (or equal to)
    # max_n_time_points but contains at least min_n_time_points points:
    if (curr_n_remaining_t_points <= max_n_time_points) and (
        curr_n_remaining_t_points >= min_n_time_points
    ):
        # Store the true index of the time point at the end of this interval, which
        # will be needed by the explainability function later. Unlike in the while
        # loop above, here the time point at the end of the interval is the last
        # time point in the dataset:
        interval_end_points.append(n_total_time_points)

        detected_change_points, next_ch_pts = _detect_changepoints_from_kernelcpd(
            kernel=kernel,
            params=params,
            data_use=current_data,
            expected_frac_anomaly=expected_frac_anomaly,
            current_zero=current_zero,
            detected_change_points=detected_change_points,
        )

    # We need to remove any "detected" change points which were actually ends of
    # intervals. First, add 1 to each detected change point:
    temp_detec = [
        detected_change_points[i] + 1 for i in range(len(detected_change_points))
    ]
    # Now look for matches with values in interval_end_points:
    index_matches = [
        i
        for i in range(len(detected_change_points))
        if temp_detec[i] in interval_end_points
    ]
    detected_change_points = [
        i for j, i in enumerate(detected_change_points) if j not in index_matches
    ]

    return detected_change_points, interval_end_points

def _detect_changepoints_from_kernelcpd(
    kernel: Literal["linear", "cosine", "rbf"],
    params: Optional[Dict[str, Any]],
    data_use: np.ndarray,
    expected_frac_anomaly: float,
    current_zero: int,
    detected_change_points: List[int],
) -> Tuple[List[int], List[int]]:

    """Apply kernel change point detection and update
    detected_change_points.

    Auxiliary function to kcp_ds. Returns an updated copy of
    detected_change_points rather than mutating it in place.

    Args:
        kernel: kernel of the kernel change point detection. If it is
            "rbf", params must include a "gamma" parameter with a
            positive real value.
        params: parameters for the kernel instance.
        data_use: chunk of data on which to apply the detection.
        expected_frac_anomaly: encodes users' prior knowledge of how
            often anomalies might occur. Results can depend strongly on
            this parameter, so choose it carefully!
        current_zero: index of the first point of the current chunk of
            the dataset.
        detected_change_points: all breakpoints detected so far.

    Returns:
        The list of all change points detected so far.
        The list of change points newly detected during this call.
    """

    # Associate the data with the KernelCPD class from the ruptures library:
    algo = rpt.KernelCPD(kernel=kernel, params=params)
    algo.fit(data_use)

    # Get a value for the maximum number of change points to look for; essentially,
    # this is 2 * (fraction of expected anomalies) * (number of current time points):
    n_bkps_max = max(
        1, int(2 * np.floor(expected_frac_anomaly * np.shape(data_use)[0]))
    )
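    # For example (illustrative numbers): with 2000 time points and
    # expected_frac_anomaly = 1/1000, n_bkps_max = max(1, int(2 * floor(2.0))) = 4.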

    array_of_n_bkps = np.arange(1, n_bkps_max + 1)
    algo_costs = [
        get_sum_of_cost(algo=algo, n_bkps=n_bkps) for n_bkps in array_of_n_bkps
    ]  # criterion values when keeping 1 up to n_bkps_max change points
    # Since adding an extra change point can never increase the criterion,
    # algo_costs is a non-increasing list.

    # For the linear kernel we want to concatenate onto the FRONT of this list the
    # value of the criterion when there are ZERO change points. The problem is that
    # the ruptures package assumes there is at least one change point, while the
    # model selection of Arlot et al. (2019) includes the case of ZERO change
    # points. We emphasize that the computation below is only valid for the linear
    # kernel: with Gram matrix K = data_use @ data_use.T and n time points, the
    # single-segment cost is trace(K) - (1/n) * sum(K).
    linear_prod = np.dot(data_use, data_use.T)
    diag_sum = np.trace(linear_prod)
    crit_linear_zero_cp = diag_sum - np.sum(linear_prod) / np.shape(data_use)[0]
    # Concatenate this value onto the FRONT of the algo_costs list:
    algo_costs.insert(0, crit_linear_zero_cp)

    n_bkps_pen = _model_select_n_bp(
        data=data_use,
        algo_costs=algo_costs,
        array_of_n_bkps=array_of_n_bkps,
        n_bkps_max=n_bkps_max,
    )  # penalized selection

    # Extract the solution:
    if n_bkps_pen > 0:
        next_ch_pts = algo.predict(n_bkps=n_bkps_pen)
        # Remove the last element, which is the end of the signal (always appended
        # by ruptures) rather than a change point:
        del next_ch_pts[-1]
    else:
        # If NO change points were detected, by convention we instead output the
        # end-point index of the interval AS IF it were a change point. Note that
        # this (index + 1) was also already stored in the variable
        # "interval_end_points", so we will be able to weed out this change point
        # at the end, for the final list and for the eventual visualization steps.
        next_ch_pts = algo.predict(n_bkps=1)
        next_ch_pts = [next_ch_pts[-1] - 1]

    # Add the current zero index (in order to get the true row/time point in the data):
    next_ch_pts = [x + current_zero for x in next_ch_pts]

    # Concatenate these with any found in previous loops:
    detected_change_points = detected_change_points + list(next_ch_pts)

    return detected_change_points, next_ch_pts

def _model_select_n_bp(
    data: np.ndarray,
    algo_costs: Sequence[float],
    array_of_n_bkps: Sequence[int],
    n_bkps_max: int,
) -> int:

    """Calculate the number of change points after model selection using
    the slope heuristic method.

    This function takes the set of candidate change points output by the
    ruptures library and refines it to a subset that is likely to
    represent signal rather than (over)fitted noise. It implements the
    penalized variable selection method described in Arlot et al. (2019).

    Args:
        data: signal data points for which to calculate the optimal
            number of change points.
        algo_costs: the (non-penalized) costs associated with each
            number of change points.
        array_of_n_bkps: array listing the numbers of change points to
            be considered.
        n_bkps_max: maximum number of change points to be detected.

    Returns:
        The number of change points after penalized variable selection
        using the slope heuristic method of Arlot et al. (2019).
    """
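    # Sketch of the computation below (our reading of the slope heuristic of
    # Arlot et al. (2019); notation may differ from the paper): over the larger
    # model sizes D, regress cost(D)/n on log(C(n-1, D-1))/n and D/n, where n is
    # the number of time points. The fitted slopes s1_hat and s2_hat yield penalty
    # constants c1 = -2 * s1_hat and c2 = -2 * s2_hat, and the selected size
    # minimizes cost(D) + c1 * log(C(n-1, D-1)) + c2 * D.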

    lower_D = math.ceil(0.6 * n_bkps_max)

    X1 = [
        (1 / np.shape(data)[0]) * math.log(math.comb(np.shape(data)[0] - 1, D - 1))
        for D in np.arange(lower_D, n_bkps_max + 1)
    ]
    X2 = [(1 / np.shape(data)[0]) * D for D in np.arange(lower_D, n_bkps_max + 1)]

    X = np.transpose(np.array([X1, X2]))
    y = np.array(
        [
            (1 / np.shape(data)[0]) * algo_costs[D - 1]
            for D in np.arange(lower_D, n_bkps_max + 1)
        ]
    )

    # Linear regression:
    algo_reg = LinearRegression().fit(X, y)
    s1_hat = algo_reg.coef_[0]
    s2_hat = algo_reg.coef_[1]

    # Final penalty parameters:
    c1 = -2 * s1_hat
    c2 = -2 * s2_hat

    # Use these two parameters to do the model selection of Arlot et al. (2019):
    algo_pen = [
        algo_costs[i - 1]
        + c1 * math.log(math.comb(np.shape(data)[0] - 1, i - 1))
        + c2 * i
        for i in array_of_n_bkps
    ]

    min_val = min(algo_pen)
    all_min_indices = [i for i, x in enumerate(algo_pen) if x == min_val]

    # There could be more than one item in this list. In this (unlikely) event we
    # choose the smallest, for parsimony:
    min_index = all_min_indices[0]

    n_bkps_pen = min_index

    return n_bkps_pen
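# Minimal end-to-end sketch (illustrative, not part of the module's public API):
# build a synthetic three-series signal with one mean shift and run kcp_ds on it.
# The sizes, the shift, and the expected_frac_anomaly value below are hypothetical
# choices for demonstration only.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    # 1200 time points, 3 series, with a mean shift at t = 600:
    signal = np.concatenate(
        [rng.normal(0.0, 1.0, (600, 3)), rng.normal(4.0, 1.0, (600, 3))]
    )
    change_points, interval_ends = kcp_ds(
        signal, kernel="linear", expected_frac_anomaly=1 / 100
    )
    print("detected change points:", change_points)
    print("interval end points:", interval_ends)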