outrank.algorithms.synthetic_data_generators.cc_generator
from __future__ import annotations

from typing import Callable
from typing import Literal

import numpy as np
from numpy.typing import ArrayLike
from scipy.linalg import qr
from scipy.stats import norm
from sklearn.cluster import KMeans
from sklearn.utils import resample


class CategoricalClassification:

    def __init__(self, seed: int = 42):
        np.random.seed(seed)
        self.dataset_info = {
            'general': {},
            'combinations': [],
            'correlations': [],
            'duplicates': [],
            'labels': {},
            'noise': [],
        }

    def __repr__(self):
        return f'CategoricalClassification(dataset_info={self.dataset_info})'

    def generate_data(
        self,
        n_features: int,
        n_samples: int,
        cardinality: int = 5,
        structure: list | ArrayLike | None = None,
        ensure_rep: bool = False,
        random_values: bool | None = False,
        low: int | None = 0,
        high: int | None = 1000,
        seed: int = 42,
    ) -> np.ndarray:

        """
        Generates a dataset based on the given parameters
        :param n_features: number of generated features
        :param n_samples: number of generated samples
        :param cardinality: default cardinality of the dataset
        :param structure: optional dataset structure, a list of (feature index, feature attributes) tuples
        :param ensure_rep: flag, ensures all given values are represented
        :param random_values: flag, enables random (integer) feature values from the range [low, high]
        :param low: sets lower bound of random feature values
        :param high: sets upper bound of random feature values
        :param seed: seeds numpy's random generator
        :return: X, 2D dataset of shape (n_samples, n_features)
        """

        self.dataset_info.update({
            'general': {
                'n_features': n_features,
                'n_samples': n_samples,
                'cardinality': cardinality,
                'structure': structure,
                'ensure_rep': ensure_rep,
                'seed': seed,
            },
        })

        np.random.seed(seed)
        X = np.empty([n_features, n_samples])

        # No specific structure parameter passed
        if structure is None:
            for i in range(n_features):
                x = self._generate_feature(
                    n_samples,
                    cardinality=cardinality,
                    ensure_rep=ensure_rep,
                    random_values=random_values,
                    low=low,
                    high=high,
                )
                X[i] = x
        # Structure parameter passed, building based on structure
        else:
            ix = 0
            for data in structure:

                # Data in structure is a tuple of (feature index (integer), feature attributes)
                if not isinstance(data[0], (list, np.ndarray)):
                    feature_ix, feature_attributes = data

                    # Filling out the dataset up to column index feature_ix
                    if ix < feature_ix:
                        for i in range(ix, feature_ix):
                            x = self._generate_feature(
                                n_samples,
                                cardinality=cardinality,
                                ensure_rep=ensure_rep,
                                random_values=random_values,
                                low=low,
                                high=high,
                            )
                            X[ix] = x
                            ix += 1

                    x = self._configure_generate_feature(
                        feature_attributes,
                        n_samples,
                        ensure_rep=ensure_rep,
                        random_values=random_values,
                        low=low,
                        high=high,
                    )
                    X[ix] = x
                    ix += 1

                # Data in structure is a tuple of (list of feature indexes, feature attributes)
                else:
                    feature_ixs, feature_attributes = data

                    # Filling out the dataset up to each feature_ix
                    for feature_ix in feature_ixs:
                        if ix < feature_ix:
                            for i in range(ix, feature_ix):
                                x = self._generate_feature(
                                    n_samples,
                                    cardinality=cardinality,
                                    ensure_rep=ensure_rep,
                                    random_values=random_values,
                                    low=low,
                                    high=high,
                                )
                                X[ix] = x
                                ix += 1

                        x = self._configure_generate_feature(
                            feature_attributes,
                            n_samples,
                            ensure_rep=ensure_rep,
                            random_values=random_values,
                            low=low,
                            high=high,
                        )

                        X[ix] = x
                        ix += 1

            # Fill out the rest of the dataset
            if ix < n_features:
                for i in range(ix, n_features):
                    x = self._generate_feature(
                        n_samples,
                        cardinality=cardinality,
                        ensure_rep=ensure_rep,
                        random_values=random_values,
                        low=low,
                        high=high,
                    )
                    X[i] = x

        return X.T

    def _configure_generate_feature(
        self,
        feature_attributes: int | list | ArrayLike,
        n_samples: int,
        ensure_rep: bool = False,
        random_values: bool | None = False,
        low: int | None = 0,
        high: int | None = 1000,
    ) -> np.ndarray:

        """
        Helper function, calls _generate_feature with appropriate parameters based on feature_attributes
        :param feature_attributes: either an integer (cardinality) or a list of feature attributes
        :param n_samples: number of samples in dataset
        :param ensure_rep: ensures all values are represented at least once in the feature vector
        :param random_values: randomly picked values for vec if true, otherwise values range over [low, low + cardinality) with step 1
        :param low: lower bound of random feature vector values
        :param high: upper bound of random feature vector values
        :return: feature vector
        """

        # feature_attributes is just an integer, generate feature either with random values or
        # values from [low, low + cardinality)
        if not isinstance(feature_attributes, (list, np.ndarray)):
            x = self._generate_feature(
                n_samples,
                cardinality=feature_attributes,
                ensure_rep=ensure_rep,
                random_values=random_values,
                low=low,
                high=high,
            )
        # feature_attributes is a list of [value_domain, value_frequencies]
        else:
            if isinstance(feature_attributes[0], (list, np.ndarray)):
                value_domain, value_frequencies = feature_attributes
                x = self._generate_feature(
                    n_samples,
                    vec=value_domain,
                    ensure_rep=ensure_rep,
                    p=value_frequencies,
                )
            # feature_attributes is value_domain (list of values for feature)
            else:
                value_domain = feature_attributes
                x = self._generate_feature(
                    n_samples,
                    vec=value_domain,
                    ensure_rep=ensure_rep,
                )

        return x

    def _generate_feature(
        self,
        size: int,
        vec: list[int] | ArrayLike | None = None,
        cardinality: int = 5,
        ensure_rep: bool = False,
        random_values: bool | None = False,
        low: int | None = 0,
        high: int | None = 1000,
        p: list[float] | np.ndarray | None = None,
    ) -> np.ndarray:
        """
        Generates a feature vector of length size. The default probability density distribution is approximately normal, centred around a randomly picked value.
        :param size: length of feature vector
        :param vec: list of feature values
        :param cardinality: single value cardinality
        :param ensure_rep: ensures all values are represented at least once in the feature vector
        :param random_values: randomly picked values for vec if true, otherwise values range over [low, low + cardinality) with step 1
        :param low: lower bound of random feature vector values
        :param high: upper bound of random feature vector values
        :param p: list of probabilities of each value
        :return: feature vector x
        """

        if vec is None:
            if random_values:
                vec = np.random.choice(range(low, high + 1), cardinality, replace=False)
            else:
                vec = np.arange(low, low + cardinality, 1)
        else:
            vec = np.array(vec)

        if p is None:
            # Centre an approximately normal distribution on a randomly picked value
            v_shift = vec - vec[np.random.randint(len(vec))]
            p = norm.pdf(v_shift, scale=3)
        else:
            p = np.array(p)

        p = p / p.sum()

        if ensure_rep and len(vec) < size:
            sampled_values = np.random.choice(vec, size=(size - len(vec)), p=p)
            sampled_values = np.append(sampled_values, vec)
        else:
            sampled_values = np.random.choice(vec, size=size, p=p)

        np.random.shuffle(sampled_values)
        return sampled_values

    def generate_combinations(
        self,
        X: ArrayLike,
        feature_indices: list[int] | ArrayLike,
        combination_function: Callable | None = None,
        combination_type: Literal['linear', 'nonlinear'] = 'linear',
    ) -> np.ndarray:
        """
        Generates linear, nonlinear, or custom combinations within feature vectors in the given dataset X
        :param X: dataset
        :param feature_indices: indices of features to combine
        :param combination_function: optional custom function for combining feature vectors
        :param combination_type: string flag, either 'linear' or 'nonlinear', defining combination type
        :return: X with the resultant feature appended
        """

        selected_features = X[:, feature_indices]

        if combination_function is None:
            if combination_type == 'linear':
                combination_function = lambda x: np.sum(x, axis=1)
            elif combination_type == 'nonlinear':
                combination_function = lambda x: np.sin(np.sum(x, axis=1))
        else:
            combination_type = str(combination_function.__name__)

        combination_result = combination_function(selected_features)

        combination_ix = len(X[0])

        self.dataset_info['combinations'].append({
            'feature_indices': feature_indices,
            'combination_type': combination_type,
            'combination_ix': combination_ix,
        })

        return np.column_stack((X, combination_result))

    def _xor(self, arr: list[int] | ArrayLike) -> np.ndarray:
        """
        Performs bitwise XOR operation on two or more integer arrays
        :param arr: features to perform XOR operation on
        :return: bitwise XOR result
        """
        arrT = arr.T
        arrT = arrT.astype(int)
        out = np.bitwise_xor(arrT[0], arrT[1])
        if len(arrT) > 2:
            for i in range(2, len(arrT)):
                out = np.bitwise_xor(out, arrT[i])

        return out.T

    def _and(self, arr: list[int] | ArrayLike) -> np.ndarray:
        """
        Performs bitwise AND operation on two or more integer arrays
        :param arr: features to perform AND operation on
        :return: bitwise AND result
        """
        arrT = arr.T
        arrT = arrT.astype(int)
        out = np.bitwise_and(arrT[0], arrT[1])
        if len(arrT) > 2:
            for i in range(2, len(arrT)):
                out = np.bitwise_and(out, arrT[i])

        return out.T

    def _or(self, arr: list[int] | ArrayLike) -> np.ndarray:
        """
        Performs bitwise OR operation on two or more integer arrays
        :param arr: features to perform OR operation on
        :return: bitwise OR result
        """
        arrT = arr.T
        arrT = arrT.astype(int)
        out = np.bitwise_or(arrT[0], arrT[1])
        if len(arrT) > 2:
            for i in range(2, len(arrT)):
                out = np.bitwise_or(out, arrT[i])

        return out.T

    def generate_correlated(
        self,
        X: ArrayLike,
        feature_indices: list[int] | ArrayLike,
        r: float = 0.8,
    ) -> np.ndarray:

        """
        Generates correlated features using the given feature indices. Correlation is based on the cosine of the angle between vectors with mean 0.
        :param X: dataset
        :param feature_indices: indices of features to generate correlated features for
        :param r: (Pearson) correlation factor
        :return: X with the generated correlated features appended
        """

        if not isinstance(feature_indices, (list, np.ndarray)):
            feature_indices = np.array([feature_indices])

        if len(feature_indices) > 1:
            correlated_ixs = np.arange(len(X[0]), (len(X[0]) + len(feature_indices)), 1)
        else:
            correlated_ixs = len(X[0])

        selected_features = X[:, feature_indices]
        transposed = np.transpose(selected_features)
        correlated_features = []

        for t in transposed:
            theta = np.arccos(r)
            t_standard = (t - np.mean(t)) / (np.std(t) + 1e-10)

            rand = np.random.normal(0, 1, len(t_standard))
            rand = (rand - np.mean(rand)) / (np.std(rand) + 1e-10)

            M = np.column_stack((t_standard, rand))
            M_centred = (M - np.mean(M, axis=0))

            Id = np.eye(len(t))
            Q = qr(M_centred[:, [0]], mode='economic')[0]
            P = np.dot(Q, Q.T)
            # Project the random vector onto the orthogonal complement of t
            orthogonal_projection = np.dot(Id - P, M_centred[:, 1])
            M_orthogonal = np.column_stack((M_centred[:, 0], orthogonal_projection))

            Y = np.dot(M_orthogonal, np.diag(1 / np.sqrt(np.sum(M_orthogonal ** 2, axis=0))))
            corr = Y[:, 1] + (1 / np.tan(theta)) * Y[:, 0]

            correlated_features.append(corr)

        correlated_features = np.transpose(correlated_features)

        self.dataset_info['correlations'].append({
            'feature_indices': feature_indices,
            'correlated_indices': correlated_ixs,
            'correlation_factor': r,
        })

        return np.column_stack((X, correlated_features))

    def generate_duplicates(
        self,
        X: ArrayLike,
        feature_indices: list[int] | ArrayLike,
    ) -> np.ndarray:
        """
        Generates duplicate features
        :param X: dataset
        :param feature_indices: indices of features to duplicate
        :return: dataset with duplicated features appended
        """
        if not isinstance(feature_indices, (list, np.ndarray)):
            feature_indices = np.array([feature_indices])

        duplicated_ixs = np.arange(len(X[0]), (len(X[0]) + len(feature_indices)), 1)

        selected_features = X[:, feature_indices]

        self.dataset_info['duplicates'].append({
            'feature_indices': feature_indices,
            'duplicate_indices': duplicated_ixs,
        })

        return np.column_stack((X, selected_features))

    def generate_labels(
        self,
        X: ArrayLike,
        n: int = 2,
        p: float | list[float] | ArrayLike = 0.5,
        k: int | float = 2,
        decision_function: Callable | None = None,
        class_relation: Literal['linear', 'nonlinear', 'cluster'] = 'linear',
        balance: bool = False,
        random_state: int = 42,
    ):
        """
        Generates labels for dataset X
        :param X: dataset
        :param n: number of class labels
        :param p: class distribution
        :param k: constant used by the nonlinear decision function
        :param decision_function: optional user-defined decision function
        :param class_relation: string, either 'linear', 'nonlinear', or 'cluster'
        :param balance: boolean, whether to balance clustering class labels
        :param random_state: seed for KMeans clustering, defaults to 42
        :return: array of labels corresponding to dataset X
        """

        if isinstance(p, (list, np.ndarray)):
            if sum(p) > 1: raise ValueError('sum of values in p must not exceed 1.0')
            if len(p) != n: raise ValueError('length of p must equal n')
        elif p > 1:
            raise ValueError('p must be less than 1.0')

        n_samples, n_features = X.shape

        if decision_function is None:
            if class_relation == 'linear':
                decision_function = lambda x: np.sum(2 * x + 3, axis=1)
            elif class_relation == 'nonlinear':
                decision_function = lambda x: np.sum(k * np.sin(x) + k * np.cos(x), axis=1)
            elif class_relation == 'cluster':
                decision_function = None
        else:
            class_relation = str(decision_function.__name__)

        y = []
        if decision_function is not None:
            if n > 2:
                # Scalar p: split the decision boundary into n equally sized classes
                if not isinstance(p, list):
                    p = 1 / n
                    percentiles = [p * 100]
                    for i in range(1, n - 1):
                        percentiles.append(percentiles[i - 1] + (p * 100))

                    decision_boundary = decision_function(X)
                    p_points = np.percentile(decision_boundary, percentiles)

                    y = np.zeros_like(decision_boundary, dtype=int)
                    for p_point in p_points:
                        y += (decision_boundary > p_point)
                # List p: split according to the given class distribution
                else:
                    decision_boundary = decision_function(X)
                    percentiles = [x * 100 for x in p]

                    for i in range(1, len(percentiles) - 1):
                        percentiles[i] += percentiles[i - 1]

                    percentiles.insert(0, 0)
                    percentiles.pop()

                    p_points = np.percentile(decision_boundary, percentiles)

                    y = np.zeros_like(decision_boundary, dtype=int)
                    for i in range(1, n):
                        p_point = p_points[i]
                        for j in range(len(decision_boundary)):
                            if decision_boundary[j] > p_point:
                                y[j] += 1
            else:
                decision_boundary = decision_function(X)
                p_point = np.percentile(decision_boundary, p * 100)
                y = np.where(decision_boundary > p_point, 1, 0)
        else:
            if isinstance(p, (list, np.ndarray)):
                pass  # use the given class distribution as-is
            elif p == 0.5:
                p = 1.0
            else:
                p = [p, 1 - p]
            y = self._cluster_data(X, n, p=p, balance=balance, random_state=random_state)

        self.dataset_info.update({
            'labels': {
                'class_relation': class_relation,
                'n_class': n,
            },
        })

        return y

    def _cluster_data(
        self,
        X: ArrayLike,
        n: int,
        p: float | list[float] | ArrayLike | None = 1.0,
        balance: bool = False,
        random_state: int = 42,
    ) -> np.ndarray:
        """
        Clusters data using KMeans
        :param X: dataset
        :param n: number of clusters
        :param p: class distribution
        :param balance: balance the clusters according to p
        :param random_state: seed for KMeans clustering, defaults to 42
        :return: array of labels corresponding to dataset X
        """

        kmeans = KMeans(n_clusters=n, random_state=random_state)

        kmeans.fit(X)

        cluster_labels = kmeans.labels_

        # Scalar p: fully balanced clusters
        if not isinstance(p, (list, np.ndarray)):
            samples_per_cluster = [len(X) // n] * n
        # List p: cluster sizes according to the given distribution
        else:
            samples = len(X)
            if len(p) == n:
                samples_per_cluster = [int(samples * val) for val in p]
            else:
                raise ValueError('Length of balance parameter must equal number of clusters.')

        # Adjust cluster sizes
        if balance:
            adjustments = []
            overflow_samples = []
            overflow_indices = []
            for i in range(n):
                cluster_size = np.sum(cluster_labels == i)

                adjustment = samples_per_cluster[i] - cluster_size
                adjustments.append(adjustment)

                # Cluster is too large
                if adjustment < 0:
                    centroid = kmeans.cluster_centers_[i]
                    # Indices of samples in dataset
                    dataset_indices = np.where(cluster_labels == i)[0]
                    cluster_samples = np.copy(X[dataset_indices])

                    distances = np.linalg.norm(
                        cluster_samples - centroid,
                        axis=1,
                    )  # Distances of cluster samples to cluster centroid
                    cluster_sample_indices = np.argsort(distances)
                    dataset_indices_sorted = dataset_indices[
                        cluster_sample_indices
                    ]  # Indices of samples sorted by sample distance to cluster centroid

                    overflow_sample_indices = cluster_sample_indices[samples_per_cluster[i]:]  # Overflow samples
                    dataset_indices_sorted = dataset_indices_sorted[
                        samples_per_cluster[i]:
                    ]  # Dataset indices of overflow samples

                    for j in range(len(overflow_sample_indices)):
                        overflow_samples.append(cluster_samples[overflow_sample_indices[j]])
                        overflow_indices.append(dataset_indices_sorted[j])

            overflow_samples = np.array(overflow_samples)
            overflow_indices = np.array(overflow_indices)

            # Making adjustments: reassign overflow samples to undersized clusters
            for i in range(n):

                if adjustments[i] > 0:
                    centroid = kmeans.cluster_centers_[i]
                    distances = np.linalg.norm(overflow_samples - centroid, axis=1)

                    closest_sample_indices = np.argsort(distances)

                    overflow_indices_sorted = overflow_indices[closest_sample_indices]

                    sample_indices_slice = closest_sample_indices[:adjustments[i]]
                    overflow_indices_slice = overflow_indices_sorted[:adjustments[i]]

                    cluster_labels[overflow_indices_slice] = i

                    overflow_samples = np.delete(overflow_samples, sample_indices_slice, axis=0)
                    overflow_indices = np.delete(overflow_indices, sample_indices_slice, axis=0)

        return np.array(cluster_labels)

    def generate_noise(
        self,
        X: ArrayLike,
        y: list[int] | ArrayLike,
        p: float = 0.2,
        type: Literal['categorical', 'missing'] = 'categorical',
        missing_val: str | int | float = float('-inf'),
    ) -> np.ndarray:

        """
        Simulates noise on given dataset X
        :param X: dataset to apply noise to
        :param y: target labels, required for categorical noise generation
        :param p: amount of noise to apply, defaults to 0.2
        :param type: type of noise to apply, either 'categorical' or 'missing'
        :param missing_val: value used to simulate missing values, defaults to float('-inf')
        :return: X with noise applied
        """

        self.dataset_info['noise'].append({
            'type': type,
            'amount': p,
        })

        if type == 'categorical':
            label_values, label_count = np.unique(y, return_counts=True)
            n_labels = len(label_values)

            # Sort samples by label so each label occupies a contiguous block
            inds = y.argsort()
            y_sort = y[inds]
            X_sort = X[inds]

            Xs_T = X_sort.T
            n = Xs_T.shape[1]
            n_flip = int(n * p)

            # Offsets of each label block within the sorted arrays
            offsets = np.concatenate(([0], np.cumsum(label_count)))

            for feature in Xs_T:
                unique_per_label = {}

                for i in range(n_labels):
                    unique = np.unique(feature[offsets[i]:offsets[i + 1]])
                    unique_per_label[label_values[i]] = set(unique)

                ixs = np.random.choice(n, n_flip, replace=False)

                for ix in ixs:
                    current_label = y_sort[ix]
                    possible_labels = label_values[label_values != current_label]

                    # find all unique values from labels != current label
                    values = set()
                    for key in possible_labels:
                        values = values.union(unique_per_label[key])

                    # remove any overlapping values, ensuring replacement values are unique & from a target label !=
                    # current label
                    for val in unique_per_label[current_label] & values:
                        values.remove(val)

                    if len(values) > 0:
                        val = np.random.choice(list(values))
                    else:
                        key = possible_labels[np.random.randint(len(possible_labels))]
                        values = unique_per_label[key]
                        val = np.random.choice(list(values))

                    feature[ix] = val

            # Restore the original sample order
            rev_ind = inds.argsort()
            X_noise = Xs_T.T
            X_noise = X_noise[rev_ind]

            return X_noise

        elif type == 'missing':
            X_noise = np.copy(X)
            Xn_T = X_noise.T
            n = Xn_T.shape[1]
            n_missing = int(n * p)

            for feature in Xn_T:
                ixs = np.random.choice(n, n_missing, replace=False)

                for ix in ixs:
                    feature[ix] = missing_val

            return Xn_T.T

        else:
            raise ValueError(f'Type {type} not supported')

    def downsample_dataset(
        self,
        X: ArrayLike,
        y: list[int] | ArrayLike,
        N: int | None = None,
        seed: int = 42,
        reshuffle: bool = False,
    ) -> tuple[np.ndarray, np.ndarray]:

        """
        Downsamples dataset X according to N or the number of samples in the minority class, resulting in a balanced dataset.
        :param X: dataset to downsample
        :param y: labels corresponding to X
        :param N: optional number of samples per class to downsample to
        :param seed: seed for the random state of the resample function
        :param reshuffle: reshuffle the dataset after downsampling
        :return: balanced X and y after downsampling
        """

        original_shape = X.shape

        values, counts = np.unique(y, return_counts=True)
        if N is None:
            N = min(counts)

        if N > min(counts):
            raise ValueError('N must be equal to or less than the number of samples in minority class')

        X_arrays_list = []
        y_downsampled = []
        for label in values:
            X_label = [X[i] for i in range(len(y)) if y[i] == label]
            X_label_downsample = resample(
                X_label,
                replace=False,  # sample without replacement so no row is duplicated
                n_samples=N,
                random_state=seed,
            )
            X_arrays_list.append(X_label_downsample)
            ys = [label] * N
            y_downsampled = np.concatenate((y_downsampled, ys), axis=0)

        X_downsampled = np.concatenate(X_arrays_list, axis=0)

        if reshuffle:
            indices = np.arange(len(X_downsampled))
            np.random.shuffle(indices)
            X_downsampled = X_downsampled[indices]
            y_downsampled = y_downsampled[indices]

        downsampled_shape = X_downsampled.shape

        self.dataset_info.update({
            'downsampling': {
                'original_shape': original_shape,
                'downsampled_shape': downsampled_shape,
            },
        })

        return X_downsampled, y_downsampled

    def print_dataset(
        self,
        X: ArrayLike,
        y: ArrayLike,
    ):
        """
        Prints the given dataset row by row with its label
        :param X: dataset
        :param y: labels
        :return:
        """

        _, n_features = X.shape
        n = 0
        for arr in X:
            print('[', end='')
            for i in range(n_features):
                if i == n_features - 1:
                    print(arr[i], end='')
                else:
                    print(arr[i], end=', ')
            print(f'], Label: {y[n]}')
            n += 1

    """
    def summarize(self):
        # TODO: Logging function
    """
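A minimal end-to-end sketch of how the generator is typically driven; all sizes, indices, and noise levels below are illustrative choices, not library defaults:

import numpy as np

from outrank.algorithms.synthetic_data_generators.cc_generator import CategoricalClassification

cc = CategoricalClassification(seed=42)

# Generate a base dataset: rows are samples, columns are features.
X = cc.generate_data(n_features=10, n_samples=1000, cardinality=5)

# Binary labels split at the median of the default linear decision function.
y = cc.generate_labels(X, n=2, p=0.5)

# Simulate missing values on 10% of each feature, then rebalance the classes.
X_noisy = cc.generate_noise(X, y, p=0.1, type='missing', missing_val=np.nan)
X_bal, y_bal = cc.downsample_dataset(X_noisy, y, reshuffle=True)

print(X_bal.shape, y_bal.shape)
print(cc.dataset_info['general'])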
class CategoricalClassification:
def generate_data(self, n_features: int, n_samples: int, cardinality: int = 5, structure: list | ArrayLike | None = None, ensure_rep: bool = False, random_values: bool | None = False, low: int | None = 0, high: int | None = 1000, seed: int = 42) -> np.ndarray:
Generates a dataset based on the given parameters
Parameters
- n_features: number of generated features
- n_samples: number of generated samples
- cardinality: default cardinality of the dataset
- structure: optional dataset structure, a list of (feature index, feature attributes) tuples
- ensure_rep: flag, ensures all given values are represented
- random_values: flag, enables random (integer) feature values from the range [low, high]
- low: sets lower bound of random feature values
- high: sets upper bound of random feature values
- seed: seeds numpy's random generator
Returns
X, 2D dataset of shape (n_samples, n_features)
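For example, a structured call might look like this; the indices, cardinalities, and value domain below are arbitrary illustrations:

cc = CategoricalClassification(seed=42)

# Feature 2 gets cardinality 10; feature 5 draws from an explicit value domain;
# all remaining features use the default cardinality of 5.
X = cc.generate_data(
    n_features=8,
    n_samples=100,
    cardinality=5,
    structure=[(2, 10), (5, [1, 3, 7, 9])],
    ensure_rep=True,
)

print(X.shape)  # (100, 8) -- samples are rows, features are columns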
def generate_combinations(self, X: ArrayLike, feature_indices: list[int] | ArrayLike, combination_function: Callable | None = None, combination_type: Literal['linear', 'nonlinear'] = 'linear') -> np.ndarray:
Generates linear, nonlinear, or custom combinations within feature vectors in the given dataset X
Parameters
- X: dataset
- feature_indices: indices of features to combine
- combination_function: optional custom function for combining feature vectors
- combination_type: string flag, either 'linear' or 'nonlinear', defining combination type
Returns
X with the resultant feature appended
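A short usage sketch, assuming X is a dataset produced by generate_data as above:

# Append sin(x0 + x1) as a new last column.
X = cc.generate_combinations(X, [0, 1], combination_type='nonlinear')

# A custom combination function is applied as-is to the selected columns.
X = cc.generate_combinations(X, [0, 1], combination_function=lambda x: np.prod(x, axis=1))

print(cc.dataset_info['combinations'])  # records indices, type, and new column index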
def generate_duplicates(self, X: ArrayLike, feature_indices: list[int] | ArrayLike) -> np.ndarray:
Generates duplicate features
Parameters
- X: dataset
- feature_indices: indices of features to duplicate
Returns
dataset with duplicated features
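For instance, continuing with X from the sketches above:

# Append copies of features 0 and 3 as the last two columns.
X = cc.generate_duplicates(X, [0, 3])
print(cc.dataset_info['duplicates'][-1]['duplicate_indices'])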
def generate_labels(self, X: ArrayLike, n: int = 2, p: float | list[float] | ArrayLike = 0.5, k: int | float = 2, decision_function: Callable | None = None, class_relation: Literal['linear', 'nonlinear', 'cluster'] = 'linear', balance: bool = False, random_state: int = 42):
def generate_labels(
    self,
    X: ArrayLike,
    n: int = 2,
    p: float | list[float] | ArrayLike = 0.5,
    k: int | float = 2,
    decision_function: Optional = None,
    class_relation: Literal['linear', 'nonlinear', 'cluster'] = 'linear',
    balance: bool = False,
    random_state: int = 42,
):
    """
    Generates labels for dataset X
    :param X: dataset
    :param n: number of class labels
    :param p: class distribution, given as a scalar or a list of per-class proportions
    :param k: constant scaling the nonlinear decision function
    :param decision_function: optional user-defined decision function
    :param class_relation: string, either 'linear', 'nonlinear', or 'cluster'
    :param balance: boolean, whether to balance clustering class labels
    :param random_state: seed for KMeans clustering, defaults to 42
    :return: array of labels corresponding to dataset X
    """

    if isinstance(p, (list, np.ndarray)):
        if sum(p) > 1:
            raise ValueError('sum of values in p must be less than 1.0')
        if len(p) != n:
            raise ValueError('length of p must equal n')
    elif p > 1:
        raise ValueError('p must be less than 1.0')

    n_samples, n_features = X.shape

    if decision_function is None:
        if class_relation == 'linear':
            decision_function = lambda x: np.sum(2 * x + 3, axis=1)
        elif class_relation == 'nonlinear':
            decision_function = lambda x: np.sum(k * np.sin(x) + k * np.cos(x), axis=1)
        elif class_relation == 'cluster':
            decision_function = None
    else:
        class_relation = str(decision_function.__name__)

    y = []
    if decision_function is not None:
        decision_boundary = decision_function(X)
        if n > 2:
            if not isinstance(p, (list, np.ndarray)):
                # Scalar p with n classes: cut the decision boundary into n equal bins
                p = 1 / n
                percentiles = [p * 100]
                for i in range(1, n - 1):
                    percentiles.append(percentiles[i - 1] + (p * 100))

                p_points = np.percentile(decision_boundary, percentiles)

                y = np.zeros_like(decision_boundary, dtype=int)
                for p_point in p_points:
                    y += (decision_boundary > p_point)
            else:
                # List p: cumulative proportions define the percentile cut points
                percentiles = [x * 100 for x in p]
                for i in range(1, len(percentiles) - 1):
                    percentiles[i] += percentiles[i - 1]

                percentiles.insert(0, 0)
                percentiles.pop()

                p_points = np.percentile(decision_boundary, percentiles)

                y = np.zeros_like(decision_boundary, dtype=int)
                for i in range(1, n):
                    p_point = p_points[i]
                    for j in range(len(decision_boundary)):
                        if decision_boundary[j] > p_point:
                            y[j] += 1
        else:
            p_point = np.percentile(decision_boundary, p * 100)
            y = np.where(decision_boundary > p_point, 1, 0)
    else:
        if p == 0.5:
            p = 1.0
        else:
            p = [p, 1 - p]
        y = self._cluster_data(X, n, p=p, balance=balance, random_state=random_state)

    self.dataset_info.update({
        'labels': {
            'class_relation': class_relation,
            'n_class': n,
        },
    })

    return y
Generates labels for dataset X
Parameters
- X: dataset
- n: number of class labels
- p: class distribution, given as a scalar or a list of per-class proportions
- k: constant scaling the nonlinear decision function
- decision_function: optional user-defined decision function
- class_relation: string, either 'linear', 'nonlinear', or 'cluster'
- balance: boolean, whether to balance clustering class labels
- random_state: seed for KMeans clustering, defaults to 42
Returns
array of labels corresponding to dataset X
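Example usage, a minimal sketch; the class counts and proportions are illustrative:

from outrank.algorithms.synthetic_data_generators.cc_generator import CategoricalClassification

cc = CategoricalClassification(seed=42)
X = cc.generate_data(n_features=5, n_samples=100)

# Binary labels from the default linear decision function, split at the median
y = cc.generate_labels(X, n=2, p=0.5)

# Three classes with explicit proportions and a nonlinear boundary
y3 = cc.generate_labels(X, n=3, p=[0.2, 0.3, 0.5], class_relation='nonlinear')

# Cluster-based labels via KMeans
yc = cc.generate_labels(X, n=2, class_relation='cluster', balance=True)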
def generate_noise(self, X: ArrayLike, y: list[int] | ArrayLike, p: float = 0.2, type: Literal['categorical', 'missing'] = 'categorical', missing_val: str | int | float = float('-inf')) -> np.ndarray:
def generate_noise(
    self,
    X: ArrayLike,
    y: list[int] | ArrayLike,
    p: float = 0.2,
    type: Literal['categorical', 'missing'] = 'categorical',
    missing_val: str | int | float = float('-inf'),
) -> np.ndarray:

    """
    Simulates noise on given dataset X
    :param X: dataset to apply noise to
    :param y: required target labels for categorical noise generation
    :param p: amount of noise to apply. Defaults to 0.2
    :param type: type of noise to apply, either 'categorical' or 'missing'
    :param missing_val: value to simulate missing values. Defaults to float('-inf')
    :return: X with noise applied
    """

    self.dataset_info['noise'].append({
        'type': type,
        'amount': p,
    })

    if type == 'categorical':
        label_values, label_count = np.unique(y, return_counts=True)
        n_labels = len(label_values)

        # Sort samples by label so each feature can be sliced per class
        inds = y.argsort()
        y_sort = y[inds]
        X_sort = X[inds]

        Xs_T = X_sort.T
        n = Xs_T.shape[1]
        n_flip = int(n * p)

        # Class boundaries within the sorted arrays
        label_bounds = np.concatenate(([0], np.cumsum(label_count)))

        for feature in Xs_T:
            # Unique feature values observed per class label
            unique_per_label = {}
            for i in range(n_labels):
                unique = np.unique(feature[label_bounds[i]:label_bounds[i + 1]])
                unique_per_label[label_values[i]] = set(unique)

            ixs = np.random.choice(n, n_flip, replace=False)

            for ix in ixs:
                current_label = y_sort[ix]
                possible_labels = label_values[label_values != current_label]

                # find all unique values from labels != current label
                values = set()
                for key in possible_labels:
                    values = values.union(unique_per_label[key])

                # remove any overlapping values, ensuring replacement values are
                # unique to a target label != current label
                values -= unique_per_label[current_label]

                if len(values) > 0:
                    val = np.random.choice(list(values))
                else:
                    key = possible_labels[np.random.randint(len(possible_labels))]
                    values = unique_per_label[key]
                    val = np.random.choice(list(values))

                feature[ix] = val

        # Restore the original sample order
        rev_ind = inds.argsort()
        X_noise = Xs_T.T
        X_noise = X_noise[rev_ind]

        return X_noise

    elif type == 'missing':
        X_noise = np.copy(X)
        Xn_T = X_noise.T
        n = Xn_T.shape[1]
        n_missing = int(n * p)

        # Blank out n_missing entries per feature
        for feature in Xn_T:
            ixs = np.random.choice(n, n_missing, replace=False)
            for ix in ixs:
                feature[ix] = missing_val

        return Xn_T.T

    else:
        raise ValueError(f'Type {type} not supported')
Simulates noise on given dataset X
Parameters
- X: dataset to apply noise to
- y: required target labels for categorical noise generation
- p: amount of noise to apply. Defaults to 0.2
- type: type of noise to apply, either 'categorical' or 'missing'
- missing_val: value to simulate missing values. Defaults to float('-inf')
Returns
X with noise applied
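A minimal usage sketch following on from generate_labels; the noise levels and the missing-value marker are illustrative:

from outrank.algorithms.synthetic_data_generators.cc_generator import CategoricalClassification

cc = CategoricalClassification(seed=42)
X = cc.generate_data(n_features=5, n_samples=100)
y = cc.generate_labels(X, n=2)

# Replace roughly 20% of each feature's values with values typical of other classes
X_noisy = cc.generate_noise(X, y, p=0.2, type='categorical')

# Or mark roughly 10% of entries per feature as missing
X_missing = cc.generate_noise(X, y, p=0.1, type='missing', missing_val=-1)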
def downsample_dataset(self, X: ArrayLike, y: list[int] | ArrayLike, N: int | None = None, seed: int = 42, reshuffle: bool = False) -> tuple[np.ndarray, np.ndarray]:
def downsample_dataset(
    self,
    X: ArrayLike,
    y: list[int] | ArrayLike,
    N: int | None = None,
    seed: int = 42,
    reshuffle: bool = False,
) -> tuple[np.ndarray, np.ndarray]:

    """
    Downsamples dataset X according to N or the number of samples in minority class, resulting in a balanced dataset.
    :param X: Dataset to downsample
    :param y: Labels corresponding to X
    :param N: Optional number of samples per class to downsample to
    :param seed: Seed for random state of resample function
    :param reshuffle: Reshuffle the dataset after downsampling
    :return: Balanced X and y after downsampling
    """

    original_shape = X.shape

    values, counts = np.unique(y, return_counts=True)
    if N is None:
        N = min(counts)

    if N > min(counts):
        raise ValueError('N must be equal to or less than the number of samples in minority class')

    X_arrays_list = []
    y_downsampled = []
    for label in values:
        X_label = [X[i] for i in range(len(y)) if y[i] == label]
        X_label_downsample = resample(
            X_label,
            replace=True,
            n_samples=N,
            random_state=seed,
        )
        X_arrays_list.append(X_label_downsample)
        ys = [label] * N
        y_downsampled = np.concatenate((y_downsampled, ys), axis=0)

    X_downsampled = np.concatenate(X_arrays_list, axis=0)

    if reshuffle:
        indices = np.arange(len(X_downsampled))
        np.random.shuffle(indices)
        X_downsampled = X_downsampled[indices]
        y_downsampled = y_downsampled[indices]

    downsampled_shape = X_downsampled.shape

    self.dataset_info.update({
        'downsampling': {
            'original_shape': original_shape,
            'downsampled_shape': downsampled_shape,
        },
    })

    return X_downsampled, y_downsampled
Downsamples dataset X according to N or the number of samples in minority class, resulting in a balanced dataset.
Parameters
- X: Dataset to downsample
- y: Labels corresponding to X
- N: Optional number of samples per class to downsample to
- seed: Seed for random state of resample function
- reshuffle: Reshuffle the dataset after downsampling
Returns
Balanced X and y after downsampling
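A minimal usage sketch; the imbalanced split and the per-class cap are illustrative:

from outrank.algorithms.synthetic_data_generators.cc_generator import CategoricalClassification

cc = CategoricalClassification(seed=42)
X = cc.generate_data(n_features=5, n_samples=100)
y = cc.generate_labels(X, n=2, p=0.7)  # imbalanced, roughly a 70/30 split

# Balance both classes at the minority-class count and shuffle the result
X_bal, y_bal = cc.downsample_dataset(X, y, reshuffle=True)

# Or cap every class at 20 samples
X_20, y_20 = cc.downsample_dataset(X, y, N=20)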
def print_dataset(self, X: ArrayLike, y: ArrayLike):
def print_dataset(
    self,
    X: ArrayLike,
    y: ArrayLike,
):
    """
    Prints given dataset
    :param X: dataset
    :param y: labels
    """

    n_samples, n_features = X.shape
    n = 0
    for arr in X:
        print('[', end='')
        for i in range(n_features):
            if i == n_features - 1:
                print(arr[i], end='')
            else:
                print(arr[i], end=', ')
        print(f'], Label: {y[n]}')
        n += 1
Prints given dataset
Parameters
- X: dataset
- y: labels
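Example usage, a minimal sketch; one bracketed row is printed per sample together with its label:

from outrank.algorithms.synthetic_data_generators.cc_generator import CategoricalClassification

cc = CategoricalClassification(seed=42)
X = cc.generate_data(n_features=3, n_samples=5)
y = cc.generate_labels(X, n=2)

# Prints lines of the form [1.0, 4.0, 2.0], Label: 0
cc.print_dataset(X, y)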