outrank.algorithms.synthetic_data_generators.cc_generator

from __future__ import annotations

from typing import Callable
from typing import Literal
from typing import Optional

import numpy as np
from numpy.typing import ArrayLike
from scipy.linalg import qr
from scipy.stats import norm
from sklearn.cluster import KMeans
from sklearn.utils import resample


class CategoricalClassification:

    def __init__(self, seed: int = 42):
        np.random.seed(seed)
        self.dataset_info = {
            'general': {},
            'combinations': [],
            'correlations': [],
            'duplicates': [],
            'labels': {},
            'noise': [],
        }

    def __repr__(self):
        return f"CategoricalClassification(dataset_info={self.dataset_info})"

    def generate_data(
        self,
        n_features: int,
        n_samples: int,
        cardinality: int = 5,
        structure: list | ArrayLike | None = None,
        ensure_rep: bool = False,
        random_values: bool | None = False,
        low: int | None = 0,
        high: int | None = 1000,
        seed: int = 42,
    ) -> np.ndarray:

        """
        Generates a dataset based on the given parameters
        :param n_features: number of generated features
        :param n_samples: number of generated samples
        :param cardinality: default cardinality of the dataset
        :param structure: structure of the dataset
        :param ensure_rep: flag, ensures all given values are represented
        :param random_values: flag, enables random (integer) feature values from the range [low, high]
        :param low: sets lower bound of random feature values
        :param high: sets upper bound of random feature values
        :param seed: sets seed of numpy random
        :return: X, 2D dataset
        """

        self.dataset_info.update({
            'general': {
                'n_features': n_features,
                'n_samples': n_samples,
                'cardinality': cardinality,
                'structure': structure,
                'ensure_rep': ensure_rep,
                'seed': seed,
            },
        })

        np.random.seed(seed)
        X = np.empty([n_features, n_samples])

        # No specific structure parameter passed
        if structure is None:
            for i in range(n_features):
                x = self._generate_feature(
                    n_samples,
                    cardinality=cardinality,
                    ensure_rep=ensure_rep,
                    random_values=random_values,
                    low=low,
                    high=high,
                )
                X[i] = x
        # Structure parameter passed, building based on structure
        else:
            ix = 0
            for data in structure:

                # Data in structure is a tuple of (feature index (integer), feature attributes)
                if not isinstance(data[0], (list, np.ndarray)):
                    feature_ix, feature_attributes = data

                    # Filling out the dataset up to column index feature_ix
                    if ix < feature_ix:
                        for i in range(ix, feature_ix):
                            x = self._generate_feature(
                                n_samples,
                                cardinality=cardinality,
                                ensure_rep=ensure_rep,
                                random_values=random_values,
                                low=low,
                                high=high,
                            )
                            X[ix] = x
                            ix += 1

                    x = self._configure_generate_feature(
                        feature_attributes,
                        n_samples,
                        ensure_rep=ensure_rep,
                        random_values=random_values,
                        low=low,
                        high=high,
                    )
                    X[ix] = x
                    ix += 1

                # Data in structure is a tuple of (list of feature indexes, feature attributes)
                else:
                    feature_ixs, feature_attributes = data

                    # Filling out the dataset up to each feature_ix
                    for feature_ix in feature_ixs:
                        if ix < feature_ix:
                            for i in range(ix, feature_ix):
                                x = self._generate_feature(
                                    n_samples,
                                    cardinality=cardinality,
                                    ensure_rep=ensure_rep,
                                    random_values=random_values,
                                    low=low,
                                    high=high,
                                )
                                X[ix] = x
                                ix += 1

                        x = self._configure_generate_feature(
                            feature_attributes,
                            n_samples,
                            ensure_rep=ensure_rep,
                            random_values=random_values,
                            low=low,
                            high=high,
                        )

                        X[ix] = x
                        ix += 1

            # Fill out the rest of the dataset
            if ix < n_features:
                for i in range(ix, n_features):
                    x = self._generate_feature(
                        n_samples,
                        cardinality=cardinality,
                        ensure_rep=ensure_rep,
                        random_values=random_values,
                        low=low,
                        high=high,
                    )
                    X[i] = x

        return X.T

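    # Illustrative usage sketch (assumed values, not part of the original module):
    # the `structure` parameter mixes plain cardinalities, explicit value domains,
    # and (value_domain, value_frequencies) pairs, keyed by feature index.
    #
    #   >>> cc = CategoricalClassification(seed=42)
    #   >>> X = cc.generate_data(
    #   ...     n_features=5,
    #   ...     n_samples=100,
    #   ...     structure=[
    #   ...         (0, 3),                        # feature 0: cardinality 3
    #   ...         (2, [10, 20, 30]),             # feature 2: explicit value domain
    #   ...         (4, [[0, 1], [0.9, 0.1]]),     # feature 4: domain + frequencies
    #   ...     ],
    #   ... )
    #   >>> X.shape
    #   (100, 5)
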
    def _configure_generate_feature(
        self,
        feature_attributes: int | list | ArrayLike,
        n_samples: int,
        ensure_rep: bool = False,
        random_values: bool | None = False,
        low: int | None = 0,
        high: int | None = 1000,
    ) -> np.ndarray:

        """
        Helper function, calls _generate_feature with appropriate parameters based on feature_attributes
        :param feature_attributes: either integer (cardinality) or list of feature attributes
        :param n_samples: number of samples in dataset
        :param ensure_rep: ensures all values are represented at least once in the feature vector
        :param random_values: randomly picked values for vec if true, otherwise values range over [low, low + cardinality) in steps of 1
        :param low: lower bound of random feature vector values
        :param high: upper bound of random feature vector values
        :return: feature vector
        """

        # feature_attributes is just an integer cardinality: generate feature either
        # with random values or with values in [low, low + cardinality)
        if not isinstance(feature_attributes, (list, np.ndarray)):
            x = self._generate_feature(
                n_samples,
                cardinality=feature_attributes,
                ensure_rep=ensure_rep,
                random_values=random_values,
                low=low,
                high=high,
            )
        # feature_attributes is a list of [value_domain, value_frequencies]
        else:
            if isinstance(feature_attributes[0], (list, np.ndarray)):
                value_domain, value_frequencies = feature_attributes
                x = self._generate_feature(
                    n_samples,
                    vec=value_domain,
                    ensure_rep=ensure_rep,
                    p=value_frequencies,
                )
            # feature_attributes is value_domain (list of values for feature)
            else:
                value_domain = feature_attributes
                x = self._generate_feature(
                    n_samples,
                    vec=value_domain,
                    ensure_rep=ensure_rep,
                )

        return x

    def _generate_feature(
        self,
        size: int,
        vec: list[int] | ArrayLike | None = None,
        cardinality: int = 5,
        ensure_rep: bool = False,
        random_values: bool | None = False,
        low: int | None = 0,
        high: int | None = 1000,
        p: list[float] | np.ndarray | None = None,
    ) -> np.ndarray:
        """
        Generates a feature vector of length size. The default probability density distribution is approximately normal, centred around a randomly picked value.
        :param size: length of feature vector
        :param vec: list of feature values
        :param cardinality: single value cardinality
        :param ensure_rep: ensures all values are represented at least once in the feature vector
        :param random_values: randomly picked values for vec if true, otherwise values range over [low, low + cardinality) in steps of 1
        :param low: lower bound of random feature vector values
        :param high: upper bound of random feature vector values
        :param p: list of probabilities of each value
        :return: feature vector x
        """

        if vec is None:
            if random_values:
                vec = np.random.choice(range(low, high + 1), cardinality, replace=False)
            else:
                vec = np.arange(low, low + cardinality, 1)
        else:
            vec = np.array(vec)

        if p is None:
            # Centre an approximately normal distribution on a randomly picked value
            v_shift = vec - vec[np.random.randint(len(vec))]
            p = norm.pdf(v_shift, scale=3)
        else:
            p = np.array(p)

        p = p / p.sum()

        if ensure_rep and len(vec) < size:
            sampled_values = np.random.choice(vec, size=(size - len(vec)), p=p)
            sampled_values = np.append(sampled_values, vec)
        else:
            sampled_values = np.random.choice(vec, size=size, p=p)

        np.random.shuffle(sampled_values)
        return sampled_values

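    # Quick sketch of the default sampling behaviour (illustrative only):
    # with p=None the values are drawn from a discretised normal centred on a
    # random element of vec, so one value dominates and the rest taper off.
    #
    #   >>> cc = CategoricalClassification(seed=42)
    #   >>> x = cc._generate_feature(1000, cardinality=5)
    #   >>> np.unique(x, return_counts=True)  # counts peak around one value
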
    def generate_combinations(
        self,
        X: ArrayLike,
        feature_indices: list[int] | ArrayLike,
        combination_function: Optional[Callable] = None,
        combination_type: Literal['linear', 'nonlinear'] = 'linear',
    ) -> np.ndarray:
        """
        Generates linear, nonlinear, or custom combinations within feature vectors in given dataset X
        :param X: dataset
        :param feature_indices: indices of features to be in combination
        :param combination_function: optional custom function for combining feature vectors
        :param combination_type: string flag, either linear or nonlinear, defining combination type
        :return: X with added resultant feature
        """

        selected_features = X[:, feature_indices]

        if combination_function is None:
            if combination_type == 'linear':
                combination_function = lambda x: np.sum(x, axis=1)
            elif combination_type == 'nonlinear':
                combination_function = lambda x: np.sin(np.sum(x, axis=1))
        else:
            combination_type = str(combination_function.__name__)

        combination_result = combination_function(selected_features)

        combination_ix = len(X[0])

        self.dataset_info['combinations'].append({
            'feature_indices': feature_indices,
            'combination_type': combination_type,
            'combination_ix': combination_ix,
        })

        return np.column_stack((X, combination_result))

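    # Illustrative usage sketch (assumed shapes, not part of the original module):
    #
    #   >>> cc = CategoricalClassification(seed=42)
    #   >>> X = cc.generate_data(n_features=4, n_samples=100)
    #   >>> X = cc.generate_combinations(X, [0, 1], combination_type='nonlinear')
    #   >>> X.shape  # one resultant feature appended as the last column
    #   (100, 5)
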
    def _xor(self, arr: list[int] | ArrayLike) -> np.ndarray:
        """
        Performs bitwise XOR operation on two or more integer arrays
        :param arr: features to perform XOR operation on
        :return: bitwise XOR result
        """
        arrT = arr.T
        arrT = arrT.astype(int)
        out = np.bitwise_xor(arrT[0], arrT[1])
        if len(arrT) > 2:
            for i in range(2, len(arrT)):
                out = np.bitwise_xor(out, arrT[i])

        return out.T

    def _and(self, arr: list[int] | ArrayLike) -> np.ndarray:
        """
        Performs bitwise AND operation on two or more integer arrays
        :param arr: features to perform AND operation on
        :return: bitwise AND result
        """
        arrT = arr.T
        arrT = arrT.astype(int)
        # Seed the accumulator with the AND of the first two columns
        out = np.bitwise_and(arrT[0], arrT[1])
        if len(arrT) > 2:
            for i in range(2, len(arrT)):
                out = np.bitwise_and(out, arrT[i])

        return out.T

    def _or(self, arr: list[int] | ArrayLike) -> np.ndarray:
        """
        Performs bitwise OR operation on two or more integer arrays
        :param arr: features to perform OR operation on
        :return: bitwise OR result
        """
        arrT = arr.T
        arrT = arrT.astype(int)
        # Seed the accumulator with the OR of the first two columns
        out = np.bitwise_or(arrT[0], arrT[1])
        if len(arrT) > 2:
            for i in range(2, len(arrT)):
                out = np.bitwise_or(out, arrT[i])

        return out.T

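    # Sketch of a custom combination via the bitwise helpers (illustrative only):
    # generate_combinations passes the selected columns to the callable, which
    # _xor transposes and reduces row-by-row.
    #
    #   >>> cc = CategoricalClassification(seed=42)
    #   >>> X = cc.generate_data(n_features=4, n_samples=100)
    #   >>> X = cc.generate_combinations(X, [0, 1, 2], combination_function=cc._xor)
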
    def generate_correlated(
        self,
        X: ArrayLike,
        feature_indices: list[int] | ArrayLike,
        r: float = 0.8,
    ) -> np.ndarray:

        """
        Generates correlated features using the given feature indices. Correlation is based on the cosine of the angle between vectors with mean 0.
        :param X: dataset
        :param feature_indices: indices of features to generate correlated features to
        :param r: (Pearson) correlation factor
        :return: X with generated correlated features
        """

        if not isinstance(feature_indices, (list, np.ndarray)):
            feature_indices = np.array([feature_indices])

        if len(feature_indices) > 1:
            correlated_ixs = np.arange(len(X[0]), (len(X[0]) + len(feature_indices)), 1)
        else:
            correlated_ixs = len(X[0])

        selected_features = X[:, feature_indices]
        transposed = np.transpose(selected_features)
        correlated_features = []

        for t in transposed:
            theta = np.arccos(r)
            t_standard = (t - np.mean(t)) / (np.std(t) + 1e-10)

            rand = np.random.normal(0, 1, len(t_standard))
            rand = (rand - np.mean(rand)) / (np.std(rand) + 1e-10)

            M = np.column_stack((t_standard, rand))
            M_centred = (M - np.mean(M, axis=0))

            # Project the random vector onto the orthogonal complement of t_standard,
            # then mix the two components so their angle equals theta = arccos(r)
            Id = np.eye(len(t))
            Q = qr(M_centred[:, [0]], mode='economic')[0]
            P = np.dot(Q, Q.T)
            orthogonal_projection = np.dot(Id - P, M_centred[:, 1])
            M_orthogonal = np.column_stack((M_centred[:, 0], orthogonal_projection))

            Y = np.dot(M_orthogonal, np.diag(1 / np.sqrt(np.sum(M_orthogonal ** 2, axis=0))))
            corr = Y[:, 1] + (1 / np.tan(theta)) * Y[:, 0]

            correlated_features.append(corr)

        correlated_features = np.transpose(correlated_features)

        self.dataset_info['correlations'].append({
            'feature_indices': feature_indices,
            'correlated_indices': correlated_ixs,
            'correlation_factor': r,
        })

        return np.column_stack((X, correlated_features))

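    # Sanity-check sketch for the correlation construction (illustrative only):
    # Pearson correlation is invariant to standardisation, so the appended column
    # should correlate with the source column at roughly r.
    #
    #   >>> cc = CategoricalClassification(seed=42)
    #   >>> X = cc.generate_data(n_features=3, n_samples=1000)
    #   >>> X = cc.generate_correlated(X, [0], r=0.8)
    #   >>> np.corrcoef(X[:, 0], X[:, 3])[0, 1]  # approximately 0.8
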
    def generate_duplicates(
        self,
        X: ArrayLike,
        feature_indices: list[int] | ArrayLike,
    ) -> np.ndarray:
        """
        Generates duplicate features
        :param X: dataset
        :param feature_indices: indices of features to duplicate
        :return: dataset with duplicated features
        """
        if not isinstance(feature_indices, (list, np.ndarray)):
            feature_indices = np.array([feature_indices])

        # One duplicate column is appended per selected feature
        duplicated_ixs = np.arange(len(X[0]), (len(X[0]) + len(feature_indices)), 1)

        selected_features = X[:, feature_indices]

        self.dataset_info['duplicates'].append({
            'feature_indices': feature_indices,
            'duplicate_indices': duplicated_ixs,
        })

        return np.column_stack((X, selected_features))

    def generate_labels(
        self,
        X: ArrayLike,
        n: int = 2,
        p: float | list[float] | ArrayLike = 0.5,
        k: int | float = 2,
        decision_function: Optional[Callable] = None,
        class_relation: Literal['linear', 'nonlinear', 'cluster'] = 'linear',
        balance: bool = False,
        random_state: int = 42,
    ):
        """
        Generates labels for dataset X
        :param X: dataset
        :param n: number of class labels
        :param p: class distribution
        :param k: constant
        :param decision_function: optional user-defined decision function
        :param class_relation: string, either 'linear', 'nonlinear', or 'cluster'
        :param balance: boolean, whether to balance clustering class labels
        :param random_state: seed for KMeans clustering, defaults to 42
        :return: array of labels, corresponding to dataset X
        """

        if isinstance(p, (list, np.ndarray)):
            if sum(p) > 1: raise ValueError('sum of values in p must not exceed 1.0')
            if len(p) != n: raise ValueError('length of p must equal n')
        elif p > 1: raise ValueError('p must not exceed 1.0')

        n_samples, n_features = X.shape

        if decision_function is None:
            if class_relation == 'linear':
                decision_function = lambda x: np.sum(2 * x + 3, axis=1)
            elif class_relation == 'nonlinear':
                decision_function = lambda x: np.sum(k * np.sin(x) + k * np.cos(x), axis=1)
            elif class_relation == 'cluster':
                decision_function = None
        else:
            class_relation = str(decision_function.__name__)

        y = []
        if decision_function is not None:
            if n > 2:
                if not isinstance(p, (list, np.ndarray)):
                    # Uniform class distribution: evenly spaced percentile boundaries
                    p = 1 / n
                    percentiles = [p * 100]
                    for i in range(1, n - 1):
                        percentiles.append(percentiles[i - 1] + (p * 100))

                    decision_boundary = decision_function(X)
                    p_points = np.percentile(decision_boundary, percentiles)

                    y = np.zeros_like(decision_boundary, dtype=int)
                    for p_point in p_points:
                        y += (decision_boundary > p_point)
                else:
                    # User-supplied class distribution: cumulative percentile boundaries
                    decision_boundary = decision_function(X)
                    percentiles = [x * 100 for x in p]

                    for i in range(1, len(percentiles) - 1):
                        percentiles[i] += percentiles[i - 1]

                    percentiles.insert(0, 0)
                    percentiles.pop()

                    p_points = np.percentile(decision_boundary, percentiles)

                    y = np.zeros_like(decision_boundary, dtype=int)
                    for i in range(1, n):
                        y += (decision_boundary > p_points[i])
            else:
                decision_boundary = decision_function(X)
                p_point = np.percentile(decision_boundary, p * 100)
                y = np.where(decision_boundary > p_point, 1, 0)
        else:
            if not isinstance(p, (list, np.ndarray)):
                if p == 0.5:
                    p = 1.0
                else:
                    p = [p, 1 - p]
            y = self._cluster_data(X, n, p=p, balance=balance, random_state=random_state)

        self.dataset_info.update({
            'labels': {
                'class_relation': class_relation,
                'n_class': n,
            },
        })

        return y

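    # Illustrative usage sketch with a custom decision function (assumed names):
    # the callable receives the full dataset and must return one score per sample;
    # its __name__ is recorded as the class relation.
    #
    #   >>> cc = CategoricalClassification(seed=42)
    #   >>> X = cc.generate_data(n_features=4, n_samples=100)
    #   >>> def my_rule(x):
    #   ...     return x[:, 0] * 2 - x[:, 1]
    #   >>> y = cc.generate_labels(X, n=2, decision_function=my_rule)
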
    def _cluster_data(
        self,
        X: ArrayLike,
        n: int,
        p: float | list[float] | ArrayLike | None = 1.0,
        balance: bool = False,
        random_state: int = 42,
    ) -> np.ndarray:
        """
        Cluster data using KMeans
        :param X: dataset
        :param n: number of clusters
        :param p: class distribution
        :param balance: balance the clusters according to p
        :param random_state: seed for KMeans clustering, defaults to 42
        :return: array of labels, corresponding to dataset X
        """

        kmeans = KMeans(n_clusters=n, random_state=random_state)

        kmeans.fit(X)

        cluster_labels = kmeans.labels_

        # Fully balanced clusters
        if not isinstance(p, (list, np.ndarray)):
            samples_per_cluster = [len(X) // n] * n
        # Cluster sizes according to the given class distribution
        elif len(p) == n:
            samples = len(X)
            samples_per_cluster = [int(samples * val) for val in p]
        else:
            raise Exception('Length of balance parameter must equal number of clusters.')

        # Adjust cluster sizes
        if balance:
            adjustments = []
            overflow_samples = []
            overflow_indices = []
            for i in range(n):
                cluster_size = np.sum(cluster_labels == i)

                adjustment = samples_per_cluster[i] - cluster_size
                adjustments.append(adjustment)

                # Cluster is too large
                if adjustment < 0:
                    centroid = kmeans.cluster_centers_[i]
                    # Indices of samples in dataset
                    dataset_indices = np.where(cluster_labels == i)[0]
                    cluster_samples = np.copy(X[dataset_indices])

                    distances = np.linalg.norm(
                        cluster_samples - centroid,
                        axis=1,
                    )  # Distances of cluster samples to cluster centroid
                    cluster_sample_indices = np.argsort(distances)
                    dataset_indices_sorted = dataset_indices[
                        cluster_sample_indices
                    ]  # Indices of samples sorted by sample distance to cluster centroid

                    overflow_sample_indices = cluster_sample_indices[samples_per_cluster[i]:]  # Overflow samples
                    dataset_indices_sorted = dataset_indices_sorted[
                        samples_per_cluster[i]:
                    ]  # Dataset indices of overflow samples

                    # Use a separate index variable so the outer cluster loop's i is not clobbered
                    for j in range(len(overflow_sample_indices)):
                        overflow_samples.append(cluster_samples[overflow_sample_indices[j]])
                        overflow_indices.append(dataset_indices_sorted[j])

            overflow_samples = np.array(overflow_samples)
            overflow_indices = np.array(overflow_indices)

            # Making adjustments
            for i in range(n):

                if adjustments[i] > 0:
                    centroid = kmeans.cluster_centers_[i]
                    distances = np.linalg.norm(overflow_samples - centroid, axis=1)

                    closest_sample_indices = np.argsort(distances)

                    overflow_indices_sorted = overflow_indices[closest_sample_indices]

                    sample_indices_slice = closest_sample_indices[:adjustments[i]]
                    overflow_indices_slice = overflow_indices_sorted[:adjustments[i]]

                    cluster_labels[overflow_indices_slice] = i

                    overflow_samples = np.delete(overflow_samples, sample_indices_slice, axis=0)
                    overflow_indices = np.delete(overflow_indices, sample_indices_slice, axis=0)

        return np.array(cluster_labels)

    def generate_noise(
        self,
        X: ArrayLike,
        y: list[int] | ArrayLike,
        p: float = 0.2,
        type: Literal['categorical', 'missing'] = 'categorical',
        missing_val: str | int | float = float('-inf'),
    ) -> np.ndarray:

        """
        Simulates noise on given dataset X
        :param X: dataset to apply noise to
        :param y: required target labels for categorical noise generation
        :param p: amount of noise to apply. Defaults to 0.2
        :param type: type of noise to apply, either categorical or missing
        :param missing_val: value to simulate missing values. Defaults to float('-inf')
        :return: X with noise applied
        """

        self.dataset_info['noise'].append({
            'type': type,
            'amount': p,
        })

        if type == 'categorical':
            label_values, label_count = np.unique(y, return_counts=True)
            n_labels = len(label_values)

            inds = y.argsort()
            y_sort = y[inds]
            X_sort = X[inds]

            Xs_T = X_sort.T
            n = Xs_T.shape[1]
            n_flip = int(n * p)

            # Cumulative offsets of each label block in the label-sorted arrays
            offsets = np.concatenate(([0], np.cumsum(label_count)))

            for feature in Xs_T:
                unique_per_label = {}

                for i in range(n_labels):
                    unique = np.unique(feature[offsets[i]:offsets[i + 1]])
                    unique_per_label[label_values[i]] = set(unique)

                ixs = np.random.choice(n, n_flip, replace=False)

                for ix in ixs:
                    current_label = y_sort[ix]
                    possible_labels = label_values[label_values != current_label]

                    # find all unique values from labels != current label
                    values = set()
                    for key in possible_labels:
                        values = values.union(unique_per_label[key])

                    # remove any overlapping values, ensuring replacement values are unique & from a target label !=
                    # current label
                    for val in unique_per_label[current_label] & values:
                        values.remove(val)

                    if len(values) > 0:
                        val = np.random.choice(list(values))
                    else:
                        key = possible_labels[np.random.randint(len(possible_labels))]
                        values = unique_per_label[key]
                        val = np.random.choice(list(values))

                    feature[ix] = val

            rev_ind = inds.argsort()
            X_noise = Xs_T.T
            X_noise = X_noise[rev_ind]

            return X_noise

        elif type == 'missing':
            X_noise = np.copy(X)
            Xn_T = X_noise.T
            n = Xn_T.shape[1]
            n_missing = int(n * p)

            for feature in Xn_T:
                ixs = np.random.choice(n, n_missing, replace=False)

                for ix in ixs:
                    feature[ix] = missing_val

            return Xn_T.T

        else:
            raise ValueError(f'Type {type} not supported')

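    # Illustrative usage sketch (assumed shapes; y must be a numpy array here):
    #
    #   >>> cc = CategoricalClassification(seed=42)
    #   >>> X = cc.generate_data(n_features=4, n_samples=100)
    #   >>> y = cc.generate_labels(X, n=2)
    #   >>> X_noisy = cc.generate_noise(X, y, p=0.1, type='missing')
    #   >>> np.isinf(X_noisy).sum() > 0  # missing entries are marked with -inf
    #   True
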
    def downsample_dataset(
        self,
        X: ArrayLike,
        y: list[int] | ArrayLike,
        N: int | None = None,
        seed: int = 42,
        reshuffle: bool = False,
    ) -> tuple[np.ndarray, np.ndarray]:

        """
        Downsamples dataset X according to N or the number of samples in the minority class, resulting in a balanced dataset.
        :param X: dataset to downsample
        :param y: labels corresponding to X
        :param N: optional number of samples per class to downsample to
        :param seed: seed for random state of resample function
        :param reshuffle: reshuffle the dataset after downsampling
        :return: balanced X and y after downsampling
        """

        original_shape = X.shape

        values, counts = np.unique(y, return_counts=True)
        if N is None:
            N = min(counts)

        if N > min(counts):
            raise ValueError('N must be equal to or less than the number of samples in minority class')

        X_arrays_list = []
        y_downsampled = []
        for label in values:
            X_label = [X[i] for i in range(len(y)) if y[i] == label]
            # Sample without replacement so each original row appears at most once
            X_label_downsample = resample(
                X_label,
                replace=False,
                n_samples=N,
                random_state=seed,
            )
            X_arrays_list.append(X_label_downsample)
            ys = [label] * N
            y_downsampled = np.concatenate((y_downsampled, ys), axis=0)

        X_downsampled = np.concatenate(X_arrays_list, axis=0)

        if reshuffle:
            indices = np.arange(len(X_downsampled))
            np.random.shuffle(indices)
            X_downsampled = X_downsampled[indices]
            y_downsampled = y_downsampled[indices]

        downsampled_shape = X_downsampled.shape

        self.dataset_info.update({
            'downsampling': {
                'original_shape': original_shape,
                'downsampled_shape': downsampled_shape,
            },
        })

        return X_downsampled, y_downsampled

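    # Illustrative usage sketch (assumed class imbalance via p=0.3):
    #
    #   >>> cc = CategoricalClassification(seed=42)
    #   >>> X = cc.generate_data(n_features=4, n_samples=100)
    #   >>> y = cc.generate_labels(X, n=2, p=0.3)
    #   >>> Xb, yb = cc.downsample_dataset(X, y, reshuffle=True)
    #   >>> np.unique(yb, return_counts=True)  # classes now equally represented
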
    def print_dataset(
        self,
        X: ArrayLike,
        y: ArrayLike,
    ):
        """
        Prints given dataset
        :param X: dataset
        :param y: labels
        :return:
        """

        n_samples, n_features = X.shape
        n = 0
        for arr in X:
            print('[', end='')
            for i in range(n_features):
                if i == n_features - 1:
                    print(arr[i], end='')
                else:
                    print(arr[i], end=', ')
            print(f'], Label: {y[n]}')
            n += 1

    """
    def summarize(self):
        # TODO: Logging function
    """
class CategoricalClassification:
 18class CategoricalClassification:
 19
 20    def __init__(self, seed: int = 42):
 21        np.random.seed(seed)
 22        self.dataset_info = {
 23            'general': {},
 24            'combinations': [],
 25            'correlations': [],
 26            'duplicates': [],
 27            'labels': {},
 28            'noise': [],
 29        }
 30
 31    def __repr__(self):
 32        return f"CategoricalClassification(dataset_info={self.dataset_info})"
 33
 34    def generate_data(
 35        self,
 36        n_features: int,
 37        n_samples: int,
 38        cardinality: int = 5,
 39        structure: list | ArrayLike | None = None,
 40        ensure_rep: bool = False,
 41        random_values: bool | None = False,
 42        low: int | None = 0,
 43        high: int | None = 1000,
 44        seed: int = 42,
 45    ) -> np.ndarray:
 46
 47        """
 48        Generates dataset based on given parameters
 49        :param n_features: number of generated features
 50        :param n_samples: number of generated samples
 51        :param cardinality: default cardinality of the dataset
 52        :param structure: structure of the dataset
 53        :param ensure_rep: flag, ensures all given values represented
 54        :param random_values: flag, enables random (integer) feature values from set [low, high]
 55        :param low: sets lower bound of random feature values
 56        :param high: sets high bound of random feature values
 57        :param seed: sets seed of numpy random
 58        :return: X, 2D dataset
 59        """
 60
 61        self.dataset_info.update({
 62            'general': {
 63                'n_features': n_features,
 64                'n_samples': n_samples,
 65                'cardinality': cardinality,
 66                'structure': structure,
 67                'ensure_rep': ensure_rep,
 68                'seed': seed,
 69            },
 70        })
 71
 72        np.random.seed(seed)
 73        X = np.empty([n_features, n_samples])
 74
 75        # No specific structure parameter passed
 76        if structure is None:
 77            for i in range(n_features):
 78                x = self._generate_feature(
 79                    n_samples,
 80                    cardinality=cardinality,
 81                    ensure_rep=ensure_rep,
 82                    random_values=random_values,
 83                    low=low,
 84                    high=high,
 85                )
 86                X[i] = x
 87        # Structure parameter passed, building based on structure
 88        else:
 89            ix = 0
 90            for data in structure:
 91
 92                # Data in structure is a tuple of (feature index (integer), feature attributes)
 93                if not isinstance(data[0], (list, np.ndarray)):
 94                    feature_ix, feature_attributes = data
 95
 96                    # Filling out the dataset up to column index feature_ix
 97                    if ix < feature_ix:
 98                        for i in range(ix, feature_ix):
 99                            x = self._generate_feature(
100                                n_samples,
101                                cardinality=cardinality,
102                                ensure_rep=ensure_rep,
103                                random_values=random_values,
104                                low=low,
105                                high=high,
106                            )
107                            X[ix] = x
108                            ix += 1
109
110                    x = self._configure_generate_feature(
111                        feature_attributes,
112                        n_samples,
113                        ensure_rep=ensure_rep,
114                        random_values=random_values,
115                        low=low,
116                        high=high,
117                    )
118                    X[ix] = x
119                    ix += 1
120
121                # Data in structure is a tuple of (list of feature indexes, feature attributes)
122                else:
123                    feature_ixs, feature_attributes = data
124
125                    # Filling out the dataset up to feature_ix
126                    for feature_ix in feature_ixs:
127                        if ix < feature_ix:
128                            for i in range(ix, feature_ix):
129                                x = self._generate_feature(
130                                    n_samples,
131                                    cardinality=cardinality,
132                                    ensure_rep=ensure_rep,
133                                    random_values=random_values,
134                                    low=low,
135                                    high=high,
136                                )
137                                X[ix] = x
138                                ix += 1
139
140                        x = self._configure_generate_feature(
141                            feature_attributes,
142                            n_samples,
143                            ensure_rep=ensure_rep,
144                            random_values=random_values,
145                            low=low,
146                            high=high,
147                        )
148
149                        X[ix] = x
150                        ix += 1
151
152            # Fill out the rest of the dataset
153            if ix < n_features:
154                for i in range(ix, n_features):
155                    x = self._generate_feature(
156                        n_samples,
157                        cardinality=cardinality,
158                        ensure_rep=ensure_rep,
159                        random_values=random_values,
160                        low=low,
161                        high=high,
162                    )
163                    X[i] = x
164
165        return X.T
166
167    def _configure_generate_feature(
168        self,
169        feature_attributes: int | list | ArrayLike,
170        n_samples: int,
171        ensure_rep: bool = False,
172        random_values: bool | None = False,
173        low: int | None = 0,
174        high: int | None = 1000,
175    ) -> np.ndarray:
176
177        """
178        Helper function, calls _generate_feature with appropriate parameters based on feature_attributes
179        :param feature_attributes: either integer (cardinality) or list of feature attributes
180        :param n_samples: number of samples in dataset
181        :param ensure_rep: ensures all values are represented at least once in the feature vector
182        :param random_values: randomly picked values for vec if true, otherwise values range from [low, cardinality] with by 1
183        :param low: lower bound of random feature vector values
184        :param high: upper bound of random feature vector values
185        :return: feature vector
186        """
187
188        # feature_cardinality is just an integer, generate feature either with random values or
189        # [low, low+cardinality]
190        if not isinstance(feature_attributes, (list, np.ndarray)):
191            x = self._generate_feature(
192                n_samples,
193                cardinality=feature_attributes,
194                ensure_rep=ensure_rep,
195                random_values=random_values,
196                low=low,
197                high=high,
198            )
199        # feature_cardinality is a list of [value_domain, value_frequencies]
200        else:
201            if isinstance(feature_attributes[0], (list, np.ndarray)):
202                value_domain, value_frequencies = feature_attributes
203                x = self._generate_feature(
204                    n_samples,
205                    vec=value_domain,
206                    ensure_rep=ensure_rep,
207                    p=value_frequencies,
208                )
209            # feature_cardinality is value_domain (list of values for feature)
210            else:
211                value_domain = feature_attributes
212                x = self._generate_feature(
213                    n_samples,
214                    vec=value_domain,
215                    ensure_rep=ensure_rep,
216                )
217
218        return x
219
220    def _generate_feature(
221        self,
222        size: int,
223        vec: list[int] | ArrayLike | None = None,
224        cardinality: int = 5,
225        ensure_rep: bool = False,
226        random_values: bool | None = False,
227        low: int | None = 0,
228        high: int | None = 1000,
229        p: list[float] | np.ndarray | None = None,
230    ) -> np.ndarray:
231        """
232        Generates feature vector of length size. Default probability density distribution is approximately normal, centred around a randomly picked value.
233        :param vec: list of feature values
234        :param cardinality: single value cardinality
235        :param size: length of feature vector
236        :param ensure_rep: ensures all values are represented at least once in the feature vector
237        :param random_values: randomly picked values for vec if true, otherwise values range from [low, cardinality] with by 1
238        :param low: lower bound of random feature vector values
239        :param high: upper bound of random feature vector values
240        :param p: list of probabilities of each value
241        :return: feature vector x
242        """
243
244        if vec is None:
245            if random_values:
246                vec = np.random.choice(range(low, high + 1), cardinality, replace=False)
247            else:
248                vec = np.arange(low, low + cardinality, 1)
249        else:
250            vec = np.array(vec)
251
252        if p is None:
253            v_shift = vec - vec[np.random.randint(len(vec))]
254            p = norm.pdf(v_shift, scale=3)
255        else:
256            p = np.array(p)
257
258        p = p / p.sum()
259
260        if ensure_rep and len(vec) < size:
261            sampled_values = np.random.choice(vec, size=(size - len(vec)), p=p)
262            sampled_values = np.append(sampled_values, vec)
263        else:
264            sampled_values = np.random.choice(vec, size=size, p=p)
265
266        np.random.shuffle(sampled_values)
267        return sampled_values
268
269    def generate_combinations(
270        self,
271        X: ArrayLike,
272        feature_indices: list[int] | ArrayLike,
273        combination_function: Optional = None,
274        combination_type: Literal['linear', 'nonlinear'] = 'linear',
275    ) -> np.ndarray:
276        """
277        Generates linear, nonlinear, or custom combinations within feature vectors in given dataset X
278        :param X: dataset
279        :param feature_indices: indexes of features to be in combination
280        :param combination_function: optional custom function for combining feature vectors
281        :param combination_type: string flag, either liner or nonlinear, defining combination type
282        :return: X with added resultant feature
283        """
284
285        selected_features = X[:, feature_indices]
286
287        if combination_function is None:
288            if combination_type == 'linear':
289                combination_function = lambda x: np.sum(x, axis=1)
290            elif combination_type == 'nonlinear':
291                combination_function = lambda x: np.sin(np.sum(x, axis=1))
292        else:
293            combination_type = str(combination_function.__name__)
294
295        combination_result = combination_function(selected_features)
296
297        combination_ix = len(X[0])
298
299        self.dataset_info['combinations'].append({
300            'feature_indices': feature_indices,
301            'combination_type': combination_type,
302            'combination_ix': combination_ix,
303        })
304
305        return np.column_stack((X, combination_result))
306
307    def _xor(self, arr: list[int] | ArrayLike) -> np.ndarray:
308        """
309        Performs bitwise XOR operation on two integer arrays
310        :param arr: features to perform XOR operation on
311        :return: bitwise XOR result
312        """
313        arrT = arr.T
314        arrT = arrT.astype(int)
315        out = np.bitwise_xor(arrT[0], arrT[1])
316        if len(arrT) > 2:
317            for i in range(2, len(arrT)):
318                out = np.bitwise_xor(out, arrT[i])
319
320        return out.T
321
322    def _and(self, arr: list[int] | ArrayLike) -> np.ndarray:
323        """
324        Performs bitwise AND operation on two integer arrays
325        :param arr: features to perform AND operation on
326        :return: bitwise AND result
327        """
328        arrT = arr.T
329        arrT = arrT.astype(int)
330        out = np.bitwise_xor(arrT[0], arrT[1])
331        if len(arrT) > 2:
332            for i in range(2, len(arrT)):
333                out = np.bitwise_and(out, arrT[i])
334
335        return out.T
336
337    def _or(self, arr: list[int] | ArrayLike) -> np.ndarray:
338        """
339        Performs bitwise OR operation on two integer arrays
340        :param arr: features to perform OR operation on
341        :return: bitwise OR result
342        """
343        arrT = arr.T
344        arrT = arrT.astype(int)
345        out = np.bitwise_xor(arrT[0], arrT[1])
346        if len(arrT) > 2:
347            for i in range(2, len(arrT)):
348                out = np.bitwise_or(out, arrT[i])
349
350        return out.T
351
352    def generate_correlated(
353        self,
354        X: ArrayLike,
355        feature_indices: list[int] | ArrayLike,
356        r: float = 0.8,
357    ) -> np.ndarray:
358
359        """
360        Generates correlated features using the given feature indices. Correlation is based on cosine of angle between vectors with mean 0.
361        :param X: dataset
362        :param feature_indices: indices of features to generate correlated feature to
363        :param r: (Pearson) correlation factor
364        :return: X with generated correlated  features
365        """
366
367        if not isinstance(feature_indices, (list, np.ndarray)):
368            feature_indices = np.array([feature_indices])
369
370        if len(feature_indices) > 1:
371            correlated_ixs = np.arange(len(X[0]), (len(X[0]) + len(feature_indices)), 1)
372        else:
373            correlated_ixs = len(X[0])
374
375        selected_features = X[:, feature_indices]
376        transposed = np.transpose(selected_features)
377        correlated_features = []
378
379        for t in transposed:
380            theta = np.arccos(r)
381            t_standard = (t - np.mean(t)) / (np.std(t) + 1e-10)
382
383            rand = np.random.normal(0, 1, len(t_standard))
384            rand = (rand - np.mean(rand)) / (np.std(rand) + 1e-10)
385
386            M = np.column_stack((t_standard, rand))
387            M_centred = (M - np.mean(M, axis=0))
388
389            Id = np.eye(len(t))
390            Q = qr(M_centred[:, [0]], mode='economic')[0]
391            P = np.dot(Q, Q.T)
392            orthogonal_projection = np.dot(Id - P, M_centred[:, 1])
393            M_orthogonal = np.column_stack((M_centred[:, 0], orthogonal_projection))
394
395            Y = np.dot(M_orthogonal, np.diag(1 / np.sqrt(np.sum(M_orthogonal ** 2, axis=0))))
396            corr = Y[:, 1] + (1 / np.tan(theta)) * Y[:, 0]
397
398            correlated_features.append(corr)
399
400        correlated_features = np.transpose(correlated_features)
401
402        self.dataset_info['correlations'].append({
403            'feature_indices': feature_indices,
404            'correlated_indices': correlated_ixs,
405            'correlation_factor': r,
406        })
407
408        return np.column_stack((X, correlated_features))
409
410    def generate_duplicates(
411        self,
412        X: ArrayLike,
413        feature_indices: list[int] | ArrayLike,
414    ) -> np.ndarray:
415        """
416        Generates duplicate features
417        :param X: dataset
418        :param feature_indices: indices of features to duplicate
419        :return: dataset with duplicated features
420        """
421        if not isinstance(feature_indices, (list, np.ndarray)):
422            feature_indices = np.array([feature_indices])
423
424        duplicated_ixs = np.arange(len(X[0]), (len(X[0]) + len(feature_indices) - 1), 1)
425
426        selected_features = X[:, feature_indices]
427
428        self.dataset_info['duplicates'].append({
429            'feature_indices': feature_indices,
430            'duplicate_indices': duplicated_ixs,
431        })
432
433        return np.column_stack((X, selected_features))
434
435    def generate_labels(
436        self,
437        X: ArrayLike,
438        n: int = 2,
439        p: float | list[float] | ArrayLike = 0.5,
440        k: int | float = 2,
441        decision_function: Optional = None,
442        class_relation: Literal['linear', 'nonlinear', 'cluster'] = 'linear',
443        balance: bool = False,
444        random_state: int = 42,
445    ):
446        """
447        Generates labels for dataset X
448        :param X: dataset
449        :param n: number of class labels
450        :param p: class distribution
451        :param k: constant
452        :param decision_function: optional user-defined decision function
453        :param class_relation: string, either 'linear', 'nonlinear', or 'cluster'
454        :param balance: boolean, whether to balance clustering class labels
455        :param random_state: seed for KMeans clustering, defaults to 42
456        :return: array of labels, corresponding to dataset X
457        """
458
459        if isinstance(p, (list, np.ndarray)):
460            if sum(p) > 1: raise ValueError('sum of values in must be less than 1.0')
461            if len(p) > n: raise ValueError('length of p must equal n')
462
463        if p > 1: raise ValueError('p must be less than 1.0')
464
465        n_samples, n_features = X.shape
466
467        if decision_function is None:
468            if class_relation == 'linear':
469                decision_function = lambda x: np.sum(2 * x + 3, axis=1)
470            elif class_relation == 'nonlinear':
471                decision_function = lambda x: np.sum(k * np.sin(x) + k * np.cos(x), axis=1)
472            elif class_relation == 'cluster':
473                decision_function = None
474        else:
475            class_relation = str(decision_function.__name__)
476
477        y = []
478        if decision_function is not None:
479            if n > 2:
480                if type(p) != list:
481                    p = 1 / n
482                    percentiles = [p * 100]
483                    for i in range(1, n - 1):
484                        percentiles.append(percentiles[i - 1] + (p * 100))
485
486                    decision_boundary = decision_function(X)
487                    p_points = np.percentile(decision_boundary, percentiles)
488
489                    y = np.zeros_like(decision_boundary, dtype=int)
490                    for p_point in p_points:
491                        y += (decision_boundary > p_point)
492                else:
493                    decision_boundary = decision_function(X)
494                    percentiles = [x * 100 for x in p]
495
496                    for i in range(1, len(percentiles) - 1):
497                        percentiles[i] += percentiles[i - 1]
498
499                    percentiles.insert(0, 0)
500                    percentiles.pop()
501                    print(percentiles)
502
503                    p_points = np.percentile(decision_boundary, percentiles)
504                    print(p_points)
505
506                    y = np.zeros_like(decision_boundary, dtype=int)
507                    for i in range(1, n):
508                        p_point = p_points[i]
509                        for j in range(len(decision_boundary)):
510                            if decision_boundary[j] > p_point:
511                                y[j] += 1
512            else:
513                decision_boundary = decision_function(X)
514                p_point = np.percentile(decision_boundary, p * 100)
515                y = np.where(decision_boundary > p_point, 1, 0)
516        else:
517            if p == 0.5:
518                p = 1.0
519            else:
520                p = [p, 1 - p]
521            y = self._cluster_data(X, n, p=p, balance=balance, random_state=random_state)
522
523        self.dataset_info.update({
524            'labels': {
525                'class_relation': class_relation,
526                'n_class': n,
527            },
528        })
529
530        return y
531
532    def _cluster_data(
533        self,
534        X: ArrayLike,
535        n: int,
536        p: float | list[float] | ArrayLike | None = 1.0,
537        balance: bool = False,
538        random_state: int = 42,
539    ) -> np.ndarray:
540        """
541        Cluster data using kmeans
542        :param X: dataset
543        :param n: number of clusters
544        :param p: class distribution
545        :param balance: balance the clusters according to p
546        :random_state: seed for KMeans clustering, defaults to 42
547        :return: array of labels, corresponding to dataset X
548        """
549
550        kmeans = KMeans(n_clusters=n, random_state=random_state)
551
552        kmeans.fit(X)
553
554        cluster_labels = kmeans.labels_
555
556        # Fully balanced clusters
557        if not isinstance(p, (list, np.ndarray)):
558            samples_per_cluster = [len(X) // n] * n
559        else:
560            samples = len(X)
561            samples_per_cluster = []
562            if not isinstance(p, (list, np.ndarray)):
563                samples_per_cluster.append(int(samples * p) // n)
564                samples_per_cluster.append(int(samples * (1 - p)) // n)
565            else:
566                if len(p) == n:
567                    for val in p:
568                        samples_per_cluster.append(int(samples * val))
569                else:
570                    raise Exception('Length of balance parameter must equal number of clusters.')
571
572        # Adjust cluster sizes
573        if balance:
574            adjustments = []
575            overflow_samples = []
576            overflow_indices = []
577            for i in range(n):
578                cluster_size = np.sum(cluster_labels == i)
579
580                adjustment = samples_per_cluster[i] - cluster_size
581                adjustments.append(adjustment)
582
583                # Cluster is too large
584                if adjustment < 0:
585                    centroid = kmeans.cluster_centers_[i]
586                    # Indices of samples in dataset
587                    dataset_indices = np.where(cluster_labels == i)[0]
588                    cluster_samples = np.copy(X[dataset_indices])
589
590                    distances = np.linalg.norm(
591                        cluster_samples - centroid,
592                        axis=1,
593                    )  # Distances of cluster samples to cluster centroid
594                    cluster_sample_indices = np.argsort(distances)
595                    dataset_indices_sorted = dataset_indices[
596                        cluster_sample_indices
597                    ]  # Indices of samples sorted by sample distance to cluster centroid
598
599                    overflow_sample_indices = cluster_sample_indices[samples_per_cluster[i]:]  # Overflow samples
600                    dataset_indices_sorted = dataset_indices_sorted[
601                                             samples_per_cluster[i]:
602                    ]  # Dataset indices of overflow samples
603
604                    for i in range(len(overflow_sample_indices)):
605                        overflow_samples.append(cluster_samples[overflow_sample_indices[i]])
606                        overflow_indices.append(dataset_indices_sorted[i])
607
608            overflow_samples = np.array(overflow_samples)
609            overflow_indices = np.array(overflow_indices)
610
611            # Making adjustments
612            for i in range(n):
613
614                if adjustments[i] > 0:
615                    centroid = kmeans.cluster_centers_[i]
616                    distances = np.linalg.norm(overflow_samples - centroid, axis=1)
617
618                    closest_sample_indices = np.argsort(distances)
619
620                    overflow_indices_sorted = overflow_indices[closest_sample_indices]
621
622                    sample_indices_slice = closest_sample_indices[:adjustments[i]]
623                    overflow_indices_slice = overflow_indices_sorted[:adjustments[i]]
624
625                    cluster_labels[overflow_indices_slice] = i
626
627                    overflow_samples = np.delete(overflow_samples, sample_indices_slice, axis=0)
628                    overflow_indices = np.delete(overflow_indices, sample_indices_slice, axis=0)
629
630        return np.array(cluster_labels)
631
632    def generate_noise(
633        self,
634        X: ArrayLike,
635        y: list[int] | ArrayLike,
636        p: float = 0.2,
637        type: Literal['categorical', 'missing'] = 'categorical',
638        missing_val: str | int | float = float('-inf'),
639    ) -> np.ndarray:
640
641        """
642        Simulates noise on given dataset X
643        :param X: dataset to apply noise to
644        :param y: required target labels for categorical noise generation
645        :param p: amount of noise to apply. Defaults to 0.2
646        :param type: type of noise to apply, either categorical or missing
647        :param missing_val: value to simulate missing values. Defaults to float('-inf')
648        :return: X with noise applied
649        """
650
651        self.dataset_info['noise'].append({
652            'type': type,
653            'amount': p,
654        })
655
656        if type == 'categorical':
657            label_values, label_count = np.unique(y, return_counts=True)
658            n_labels = len(label_values)
659            y, X = np.asarray(y), np.asarray(X)  # accept list inputs as documented
660            inds = y.argsort()
661            y_sort = y[inds]
662            X_sort = X[inds]
663
664            Xs_T = X_sort.T
665            n = Xs_T.shape[1]
666            n_flip = int(n * p)
667
668            for feature in Xs_T:
669                unique_per_label = {}
670
671                # Cumulative label counts delimit each label's contiguous block in the sorted arrays
672                offsets = np.concatenate(([0], np.cumsum(label_count)))
673                for i in range(n_labels):
674                    unique = np.unique(feature[offsets[i]:offsets[i + 1]])
675                    unique_per_label[label_values[i]] = set(unique)
678
679                ixs = np.random.choice(n, n_flip, replace=False)
680
681                for ix in ixs:
682                    current_label = y_sort[ix]
683                    possible_labels = np.where(label_values != current_label)[0]
684
685                    # find all unique values from labels != current label
686                    values = set()
687                    for key in possible_labels:
688                        values = values.union(unique_per_label[key])
689
690                    # Drop values that also occur under the current label, so every
691                    # replacement comes from a different label
692                    values -= unique_per_label[current_label]
694
695                    if len(values) > 0:
696                        val = np.random.choice(list(values))
697                    else:
699                        key = possible_labels[np.random.randint(len(possible_labels))]
700                        values = unique_per_label[key]
701                        val = np.random.choice(list(values))
702
703                    feature[ix] = val
704
705            rev_ind = inds.argsort()
706            X_noise = Xs_T.T
707            X_noise = X_noise[rev_ind]
708
709            return X_noise
710
711        elif type == 'missing':
712            X_noise = np.copy(X)
713            Xn_T = X_noise.T
714            n = Xn_T.shape[1]
715            n_missing = int(n * p)
716            #print("n to delete:", n_missing)
717
718            for feature in Xn_T:
719                ixs = np.random.choice(n, n_missing, replace=False)
720
721                for ix in ixs:
722                    feature[ix] = missing_val
723
724            return Xn_T.T
725
726        else:
727            raise ValueError(f'Type {type} not supported')
728
729    def downsample_dataset(
730        self,
731        X: ArrayLike,
732        y: list[int] | ArrayLike,
733        N: int | None = None,
734        seed: int = 42,
735        reshuffle: bool = False,
736    ) -> tuple[np.ndarray, np.ndarray]:
737
738        """
739        Downsamples dataset X according to N or the number of samples in minority class, resulting in a balanced dataset.
740        :param X: Dataset to downsample
741        :param y: Labels corresponding to X
742        :param N: Optional number of samples per class to downsample to
743        :param seed: Seed for random state of resample function
744        :param reshuffle: Reshuffle the dataset after downsampling
745        :return: Balanced X and y after downsampling
746        """
747
748        original_shape = X.shape
749
750        values, counts = np.unique(y, return_counts=True)
751        if N is None:
752            N = min(counts)
753
754        if N > min(counts):
755            raise ValueError('N must be equal to or less than the number of samples in minority class')
756
757        X_arrays_list = []
758        y_downsampled = []
759        for label in values:
760            X_label = [X[i] for i in range(len(y)) if y[i] == label]
761            X_label_downsample = resample(
762                X_label,
763                replace=True,
764                n_samples=N,
765                random_state=seed,
766            )
767            X_arrays_list.append(X_label_downsample)
768            ys = [label] * N
769            y_downsampled = np.concatenate((y_downsampled, ys), axis=0)
770
771        X_downsampled = np.concatenate(X_arrays_list, axis=0)
772
773        if reshuffle:
774            indices = np.arange(len(X_downsampled))
775            np.random.shuffle(indices)
776            X_downsampled = X_downsampled[indices]
777            y_downsampled = y_downsampled[indices]
778
779        downsampled_shape = X_downsampled.shape
780
781        self.dataset_info.update({
782            'downsampling': {
783                'original_shape': original_shape,
784                'downsampled_shape': downsampled_shape,
785            },
786        })
787
788        return X_downsampled, y_downsampled
789
790    def print_dataset(
791        self,
792        X: ArrayLike,
793        y: ArrayLike,
794    ):
795        """
796        Prints given dataset
797        :param X: dataset
798        :param y: labels
799        :return:
800        """
801
802        n_samples, n_features = X.shape
803        for n, arr in enumerate(X):
804            row = ', '.join(str(arr[i]) for i in range(n_features))
805            print(f'[{row}], Label: {y[n]}')
813
814    """
815    def summarize(self):
816        # TODO: Logging function
817    """
CategoricalClassification(seed: int = 42)
20    def __init__(self, seed: int = 42):
21        np.random.seed(seed)
22        self.dataset_info = {
23            'general': {},
24            'combinations': [],
25            'correlations': [],
26            'duplicates': [],
27            'labels': {},
28            'noise': [],
29        }
dataset_info
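
The dataset_info dictionary accumulates a record of every generation step applied by the instance (combinations, correlations, duplicates, labels, noise, downsampling), so a generated dataset can be audited after the fact.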
def generate_data(self, n_features: int, n_samples: int, cardinality: int = 5, structure: list | ArrayLike | None = None, ensure_rep: bool = False, random_values: bool | None = False, low: int | None = 0, high: int | None = 1000, seed: int = 42) -> np.ndarray:
 34    def generate_data(
 35        self,
 36        n_features: int,
 37        n_samples: int,
 38        cardinality: int = 5,
 39        structure: list | ArrayLike | None = None,
 40        ensure_rep: bool = False,
 41        random_values: bool | None = False,
 42        low: int | None = 0,
 43        high: int | None = 1000,
 44        seed: int = 42,
 45    ) -> np.ndarray:
 46
 47        """
 48        Generates dataset based on given parameters
 49        :param n_features: number of generated features
 50        :param n_samples: number of generated samples
 51        :param cardinality: default cardinality of the dataset
 52        :param structure: structure of the dataset
 53        :param ensure_rep: flag, ensures all given values represented
 54        :param random_values: flag, enables random (integer) feature values from the interval [low, high]
 55        :param low: sets lower bound of random feature values
 56        :param high: sets upper bound of random feature values
 57        :param seed: sets seed of numpy random
 58        :return: X, 2D dataset
 59        """
 60
 61        self.dataset_info.update({
 62            'general': {
 63                'n_features': n_features,
 64                'n_samples': n_samples,
 65                'cardinality': cardinality,
 66                'structure': structure,
 67                'ensure_rep': ensure_rep,
 68                'seed': seed,
 69            },
 70        })
 71
 72        np.random.seed(seed)
 73        X = np.empty([n_features, n_samples])
 74
 75        # No specific structure parameter passed
 76        if structure is None:
 77            for i in range(n_features):
 78                x = self._generate_feature(
 79                    n_samples,
 80                    cardinality=cardinality,
 81                    ensure_rep=ensure_rep,
 82                    random_values=random_values,
 83                    low=low,
 84                    high=high,
 85                )
 86                X[i] = x
 87        # Structure parameter passed, building based on structure
 88        else:
 89            ix = 0
 90            for data in structure:
 91
 92                # Data in structure is a tuple of (feature index (integer), feature attributes)
 93                if not isinstance(data[0], (list, np.ndarray)):
 94                    feature_ix, feature_attributes = data
 95
 96                    # Filling out the dataset up to column index feature_ix
 97                    if ix < feature_ix:
 98                        for i in range(ix, feature_ix):
 99                            x = self._generate_feature(
100                                n_samples,
101                                cardinality=cardinality,
102                                ensure_rep=ensure_rep,
103                                random_values=random_values,
104                                low=low,
105                                high=high,
106                            )
107                            X[ix] = x
108                            ix += 1
109
110                    x = self._configure_generate_feature(
111                        feature_attributes,
112                        n_samples,
113                        ensure_rep=ensure_rep,
114                        random_values=random_values,
115                        low=low,
116                        high=high,
117                    )
118                    X[ix] = x
119                    ix += 1
120
121                # Data in structure is a tuple of (list of feature indexes, feature attributes)
122                else:
123                    feature_ixs, feature_attributes = data
124
125                    # Filling out the dataset up to feature_ix
126                    for feature_ix in feature_ixs:
127                        if ix < feature_ix:
128                            for i in range(ix, feature_ix):
129                                x = self._generate_feature(
130                                    n_samples,
131                                    cardinality=cardinality,
132                                    ensure_rep=ensure_rep,
133                                    random_values=random_values,
134                                    low=low,
135                                    high=high,
136                                )
137                                X[ix] = x
138                                ix += 1
139
140                        x = self._configure_generate_feature(
141                            feature_attributes,
142                            n_samples,
143                            ensure_rep=ensure_rep,
144                            random_values=random_values,
145                            low=low,
146                            high=high,
147                        )
148
149                        X[ix] = x
150                        ix += 1
151
152            # Fill out the rest of the dataset
153            if ix < n_features:
154                for i in range(ix, n_features):
155                    x = self._generate_feature(
156                        n_samples,
157                        cardinality=cardinality,
158                        ensure_rep=ensure_rep,
159                        random_values=random_values,
160                        low=low,
161                        high=high,
162                    )
163                    X[i] = x
164
165        return X.T

Generates dataset based on given parameters

Parameters
  • n_features: number of generated features
  • n_samples: number of generated samples
  • cardinality: default cardinality of the dataset
  • structure: structure of the dataset
  • ensure_rep: flag, ensures all given values represented
  • random_values: flag, enables random (integer) feature values from the interval [low, high]
  • low: sets lower bound of random feature values
  • high: sets upper bound of random feature values
  • seed: sets seed of numpy random
Returns

X, 2D dataset
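
A minimal usage sketch (assuming the module is importable under the package path above); the later sketches reuse this cc instance and the numpy import:

    import numpy as np
    from outrank.algorithms.synthetic_data_generators.cc_generator import CategoricalClassification

    cc = CategoricalClassification(seed=42)
    X = cc.generate_data(n_features=10, n_samples=100, cardinality=5)
    print(X.shape)  # (100, 10) -- samples are rows, since generate_data returns X.T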

def generate_combinations(self, X: ArrayLike, feature_indices: list[int] | ArrayLike, combination_function: Optional = None, combination_type: Literal['linear', 'nonlinear'] = 'linear') -> np.ndarray:
269    def generate_combinations(
270        self,
271        X: ArrayLike,
272        feature_indices: list[int] | ArrayLike,
273        combination_function: Optional = None,
274        combination_type: Literal['linear', 'nonlinear'] = 'linear',
275    ) -> np.ndarray:
276        """
277        Generates linear, nonlinear, or custom combinations of feature vectors in the given dataset X
278        :param X: dataset
279        :param feature_indices: indexes of features to be in combination
280        :param combination_function: optional custom function for combining feature vectors
281        :param combination_type: string flag, either linear or nonlinear, defining the combination type
282        :return: X with added resultant feature
283        """
284
285        selected_features = X[:, feature_indices]
286
287        if combination_function is None:
288            if combination_type == 'linear':
289                combination_function = lambda x: np.sum(x, axis=1)
290            elif combination_type == 'nonlinear':
291                combination_function = lambda x: np.sin(np.sum(x, axis=1))
292        else:
293            combination_type = str(combination_function.__name__)
294
295        combination_result = combination_function(selected_features)
296
297        combination_ix = len(X[0])
298
299        self.dataset_info['combinations'].append({
300            'feature_indices': feature_indices,
301            'combination_type': combination_type,
302            'combination_ix': combination_ix,
303        })
304
305        return np.column_stack((X, combination_result))

Generates linear, nonlinear, or custom combinations of feature vectors in the given dataset X

Parameters
  • X: dataset
  • feature_indices: indexes of features to be in combination
  • combination_function: optional custom function for combining feature vectors
  • combination_type: string flag, either linear or nonlinear, defining the combination type
Returns

X with added resultant feature
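
A sketch of both built-in and custom combinations, continuing the generate_data sketch above; product_combination is an illustrative user-defined function, not part of the API:

    X = cc.generate_data(n_features=5, n_samples=100)
    X = cc.generate_combinations(X, [0, 1], combination_type='nonlinear')

    def product_combination(x):
        # receives the selected columns as a 2D array, returns one value per row
        return np.prod(x, axis=1)

    X = cc.generate_combinations(X, [0, 1], combination_function=product_combination)
    print(X.shape)  # (100, 7): one column appended per call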

def generate_correlated(self, X: ArrayLike, feature_indices: list[int] | ArrayLike, r: float = 0.8) -> np.ndarray:
352    def generate_correlated(
353        self,
354        X: ArrayLike,
355        feature_indices: list[int] | ArrayLike,
356        r: float = 0.8,
357    ) -> np.ndarray:
358
359        """
360        Generates correlated features using the given feature indices. Correlation is based on cosine of angle between vectors with mean 0.
361        :param X: dataset
362        :param feature_indices: indices of features to generate correlated feature to
363        :param r: (Pearson) correlation factor
364        :return: X with generated correlated features
365        """
366
367        if not isinstance(feature_indices, (list, np.ndarray)):
368            feature_indices = np.array([feature_indices])
369
370        if len(feature_indices) > 1:
371            correlated_ixs = np.arange(len(X[0]), (len(X[0]) + len(feature_indices)), 1)
372        else:
373            correlated_ixs = len(X[0])
374
375        selected_features = X[:, feature_indices]
376        transposed = np.transpose(selected_features)
377        correlated_features = []
378
379        for t in transposed:
380            theta = np.arccos(r)
381            t_standard = (t - np.mean(t)) / (np.std(t) + 1e-10)
382
383            rand = np.random.normal(0, 1, len(t_standard))
384            rand = (rand - np.mean(rand)) / (np.std(rand) + 1e-10)
385
386            M = np.column_stack((t_standard, rand))
387            M_centred = (M - np.mean(M, axis=0))
388
389            Id = np.eye(len(t))
390            Q = qr(M_centred[:, [0]], mode='economic')[0]
391            P = np.dot(Q, Q.T)
392            orthogonal_projection = np.dot(Id - P, M_centred[:, 1])
393            M_orthogonal = np.column_stack((M_centred[:, 0], orthogonal_projection))
394
395            Y = np.dot(M_orthogonal, np.diag(1 / np.sqrt(np.sum(M_orthogonal ** 2, axis=0))))
396            corr = Y[:, 1] + (1 / np.tan(theta)) * Y[:, 0]
397
398            correlated_features.append(corr)
399
400        correlated_features = np.transpose(correlated_features)
401
402        self.dataset_info['correlations'].append({
403            'feature_indices': feature_indices,
404            'correlated_indices': correlated_ixs,
405            'correlation_factor': r,
406        })
407
408        return np.column_stack((X, correlated_features))

Generates correlated features using the given feature indices. Correlation is based on cosine of angle between vectors with mean 0.

Parameters
  • X: dataset
  • feature_indices: indices of features to generate correlated feature to
  • r: (Pearson) correlation factor
Returns

X with generated correlated features
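
A sketch, continuing the example above, that appends a feature correlated with column 0 and checks the achieved correlation (the empirical value only approximates r):

    X = cc.generate_data(n_features=5, n_samples=1000)
    X = cc.generate_correlated(X, [0], r=0.8)
    print(np.corrcoef(X[:, 0], X[:, -1])[0, 1])  # roughly 0.8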

def generate_duplicates(self, X: ArrayLike, feature_indices: list[int] | ArrayLike) -> np.ndarray:
410    def generate_duplicates(
411        self,
412        X: ArrayLike,
413        feature_indices: list[int] | ArrayLike,
414    ) -> np.ndarray:
415        """
416        Generates duplicate features
417        :param X: dataset
418        :param feature_indices: indices of features to duplicate
419        :return: dataset with duplicated features
420        """
421        if not isinstance(feature_indices, (list, np.ndarray)):
422            feature_indices = np.array([feature_indices])
423
424        duplicated_ixs = np.arange(len(X[0]), len(X[0]) + len(feature_indices), 1)
425
426        selected_features = X[:, feature_indices]
427
428        self.dataset_info['duplicates'].append({
429            'feature_indices': feature_indices,
430            'duplicate_indices': duplicated_ixs,
431        })
432
433        return np.column_stack((X, selected_features))

Generates duplicate features

Parameters
  • X: dataset
  • feature_indices: indices of features to duplicate
Returns

dataset with duplicated features
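
A sketch duplicating a single column, continuing the example above:

    X = cc.generate_data(n_features=5, n_samples=100)
    X = cc.generate_duplicates(X, [2])
    print(np.array_equal(X[:, 2], X[:, -1]))  # True: the appended column copies column 2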

def generate_labels(self, X: ArrayLike, n: int = 2, p: float | list[float] | ArrayLike = 0.5, k: int | float = 2, decision_function: Optional = None, class_relation: Literal['linear', 'nonlinear', 'cluster'] = 'linear', balance: bool = False, random_state: int = 42):
435    def generate_labels(
436        self,
437        X: ArrayLike,
438        n: int = 2,
439        p: float | list[float] | ArrayLike = 0.5,
440        k: int | float = 2,
441        decision_function: Optional = None,
442        class_relation: Literal['linear', 'nonlinear', 'cluster'] = 'linear',
443        balance: bool = False,
444        random_state: int = 42,
445    ):
446        """
447        Generates labels for dataset X
448        :param X: dataset
449        :param n: number of class labels
450        :param p: class distribution
451        :param k: constant factor used by the default nonlinear decision function
452        :param decision_function: optional user-defined decision function
453        :param class_relation: string, either 'linear', 'nonlinear', or 'cluster'
454        :param balance: boolean, whether to balance clustering class labels
455        :param random_state: seed for KMeans clustering, defaults to 42
456        :return: array of labels, corresponding to dataset X
457        """
458
459        if isinstance(p, (list, np.ndarray)):
460            if sum(p) > 1: raise ValueError('sum of values in p must not exceed 1.0')
461            if len(p) > n: raise ValueError('length of p must not exceed n')
462        elif p > 1:
463            raise ValueError('p must not exceed 1.0')
464
465        n_samples, n_features = X.shape
466
467        if decision_function is None:
468            if class_relation == 'linear':
469                decision_function = lambda x: np.sum(2 * x + 3, axis=1)
470            elif class_relation == 'nonlinear':
471                decision_function = lambda x: np.sum(k * np.sin(x) + k * np.cos(x), axis=1)
472            elif class_relation == 'cluster':
473                decision_function = None
474        else:
475            class_relation = str(decision_function.__name__)
476
477        y = []
478        if decision_function is not None:
479            if n > 2:
480                if not isinstance(p, (list, np.ndarray)):
481                    p = 1 / n
482                    percentiles = [p * 100]
483                    for i in range(1, n - 1):
484                        percentiles.append(percentiles[i - 1] + (p * 100))
485
486                    decision_boundary = decision_function(X)
487                    p_points = np.percentile(decision_boundary, percentiles)
488
489                    y = np.zeros_like(decision_boundary, dtype=int)
490                    for p_point in p_points:
491                        y += (decision_boundary > p_point)
492                else:
493                    decision_boundary = decision_function(X)
494                    percentiles = [x * 100 for x in p]
495
496                    for i in range(1, len(percentiles) - 1):
497                        percentiles[i] += percentiles[i - 1]
498
499                    percentiles.insert(0, 0)
500                    percentiles.pop()
502
503                    p_points = np.percentile(decision_boundary, percentiles)
505
506                    y = np.zeros_like(decision_boundary, dtype=int)
507                    for i in range(1, n):
508                        p_point = p_points[i]
509                        for j in range(len(decision_boundary)):
510                            if decision_boundary[j] > p_point:
511                                y[j] += 1
512            else:
513                decision_boundary = decision_function(X)
514                p_point = np.percentile(decision_boundary, p * 100)
515                y = np.where(decision_boundary > p_point, 1, 0)
516        else:
517            # A scalar p of 0.5 requests balanced clusters; any other scalar becomes the split [p, 1 - p]
518            if not isinstance(p, (list, np.ndarray)):
519                p = 1.0 if p == 0.5 else [p, 1 - p]
520            y = self._cluster_data(X, n, p=p, balance=balance, random_state=random_state)
522
523        self.dataset_info.update({
524            'labels': {
525                'class_relation': class_relation,
526                'n_class': n,
527            },
528        })
529
530        return y

Generates labels for dataset X

Parameters
  • X: dataset
  • n: number of class labels
  • p: class distribution
  • k: constant factor used by the default nonlinear decision function
  • decision_function: optional user-defined decision function
  • class_relation: string, either 'linear', 'nonlinear', or 'cluster'
  • balance: boolean, whether to balance clustering class labels
  • random_state: seed for KMeans clustering, defaults to 42
Returns

array of labels, corresponding to dataset X
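
A sketch of the linear and cluster modes, continuing the example above (cluster labels come from KMeans, so their ordering is arbitrary):

    X = cc.generate_data(n_features=5, n_samples=100)
    y_linear = cc.generate_labels(X, n=2, p=0.5)                      # threshold on the default linear function
    y_cluster = cc.generate_labels(X, n=3, class_relation='cluster')  # KMeans-derived labels
    print(np.unique(y_linear), np.unique(y_cluster))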

def generate_noise(self, X: ArrayLike, y: list[int] | ArrayLike, p: float = 0.2, type: Literal['categorical', 'missing'] = 'categorical', missing_val: str | int | float = float('-inf')) -> np.ndarray:
632    def generate_noise(
633        self,
634        X: ArrayLike,
635        y: list[int] | ArrayLike,
636        p: float = 0.2,
637        type: Literal['categorical', 'missing'] = 'categorical',
638        missing_val: str | int | float = float('-inf'),
639    ) -> np.ndarray:
640
641        """
642        Simulates noise on given dataset X
643        :param X: dataset to apply noise to
644        :param y: required target labels for categorical noise generation
645        :param p: amount of noise to apply. Defaults to 0.2
646        :param type: type of noise to apply, either categorical or missing
647        :param missing_val: value to simulate missing values. Defaults to float('-inf')
648        :return: X with noise applied
649        """
650
651        self.dataset_info['noise'].append({
652            'type': type,
653            'amount': p,
654        })
655
656        if type == 'categorical':
657            label_values, label_count = np.unique(y, return_counts=True)
658            n_labels = len(label_values)
659            y, X = np.asarray(y), np.asarray(X)  # accept list inputs as documented
660            inds = y.argsort()
661            y_sort = y[inds]
662            X_sort = X[inds]
663
664            Xs_T = X_sort.T
665            n = Xs_T.shape[1]
666            n_flip = int(n * p)
667
668            for feature in Xs_T:
669                unique_per_label = {}
670
671                # Cumulative label counts delimit each label's contiguous block in the sorted arrays
672                offsets = np.concatenate(([0], np.cumsum(label_count)))
673                for i in range(n_labels):
674                    unique = np.unique(feature[offsets[i]:offsets[i + 1]])
675                    unique_per_label[label_values[i]] = set(unique)
678
679                ixs = np.random.choice(n, n_flip, replace=False)
680
681                for ix in ixs:
682                    current_label = y_sort[ix]
683                    possible_labels = np.where(label_values != current_label)[0]
684
685                    # find all unique values from labels != current label
686                    values = set()
687                    for key in possible_labels:
688                        values = values.union(unique_per_label[key])
689
690                    # Drop values that also occur under the current label, so every
691                    # replacement comes from a different label
692                    values -= unique_per_label[current_label]
694
695                    if len(values) > 0:
696                        val = np.random.choice(list(values))
697                    else:
699                        key = possible_labels[np.random.randint(len(possible_labels))]
700                        values = unique_per_label[key]
701                        val = np.random.choice(list(values))
702
703                    feature[ix] = val
704
705            rev_ind = inds.argsort()
706            X_noise = Xs_T.T
707            X_noise = X_noise[rev_ind]
708
709            return X_noise
710
711        elif type == 'missing':
712            X_noise = np.copy(X)
713            Xn_T = X_noise.T
714            n = Xn_T.shape[1]
715            n_missing = int(n * p)
716            #print("n to delete:", n_missing)
717
718            for feature in Xn_T:
719                ixs = np.random.choice(n, n_missing, replace=False)
720
721                for ix in ixs:
722                    feature[ix] = missing_val
723
724            return Xn_T.T
725
726        else:
727            raise ValueError(f'Type {type} not supported')

Simulates noise on given dataset X

Parameters
  • X: dataset to apply noise to
  • y: required target labels for categorical noise generation
  • p: amount of noise to apply. Defaults to 0.2
  • type: type of noise to apply, either categorical or missing
  • missing_val: value to simulate missing values. Defaults to float('-inf')
Returns

X with noise applied
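
A sketch simulating missing values, continuing the example above; with p=0.2 each feature column has 20% of its entries replaced by the sentinel:

    X = cc.generate_data(n_features=5, n_samples=100)
    y = cc.generate_labels(X)
    X_missing = cc.generate_noise(X, y, p=0.2, type='missing')
    print(np.sum(X_missing == float('-inf')))  # 100 = 20 entries in each of 5 columns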

def downsample_dataset(self, X: ArrayLike, y: list[int] | ArrayLike, N: int | None = None, seed: int = 42, reshuffle: bool = False) -> tuple[np.ndarray, np.ndarray]:
729    def downsample_dataset(
730        self,
731        X: ArrayLike,
732        y: list[int] | ArrayLike,
733        N: int | None = None,
734        seed: int = 42,
735        reshuffle: bool = False,
736    ) -> tuple[np.ndarray, np.ndarray]:
737
738        """
739        Downsamples dataset X according to N or the number of samples in minority class, resulting in a balanced dataset.
740        :param X: Dataset to downsample
741        :param y: Labels corresponding to X
742        :param N: Optional number of samples per class to downsample to
743        :param seed: Seed for random state of resample function
744        :param reshuffle: Reshuffle the dataset after downsampling
745        :return: Balanced X and y after downsampling
746        """
747
748        original_shape = X.shape
749
750        values, counts = np.unique(y, return_counts=True)
751        if N is None:
752            N = min(counts)
753
754        if N > min(counts):
755            raise ValueError('N must be equal to or less than the number of samples in minority class')
756
757        X_arrays_list = []
758        y_downsampled = []
759        for label in values:
760            X_label = [X[i] for i in range(len(y)) if y[i] == label]
761            X_label_downsample = resample(
762                X_label,
763                replace=True,
764                n_samples=N,
765                random_state=seed,
766            )
767            X_arrays_list.append(X_label_downsample)
768            ys = [label] * N
769            y_downsampled = np.concatenate((y_downsampled, ys), axis=0)
770
771        X_downsampled = np.concatenate(X_arrays_list, axis=0)
772
773        if reshuffle:
774            indices = np.arange(len(X_downsampled))
775            np.random.shuffle(indices)
776            X_downsampled = X_downsampled[indices]
777            y_downsampled = y_downsampled[indices]
778
779        downsampled_shape = X_downsampled.shape
780
781        self.dataset_info.update({
782            'downsampling': {
783                'original_shape': original_shape,
784                'downsampled_shape': downsampled_shape,
785            },
786        })
787
788        return X_downsampled, y_downsampled

Downsamples dataset X according to N or the number of samples in minority class, resulting in a balanced dataset.

Parameters
  • X: Dataset to downsample
  • y: Labels corresponding to X
  • N: Optional number of samples per class to downsample to
  • seed: Seed for random state of resample function
  • reshuffle: Reshuffle the dataset after downsampling
Returns

Balanced X and y after downsampling
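
A sketch balancing a 70/30 labelling down to the minority-class size, continuing the example above:

    X = cc.generate_data(n_features=5, n_samples=100)
    y = cc.generate_labels(X, n=2, p=0.7)
    X_bal, y_bal = cc.downsample_dataset(X, y, reshuffle=True)
    print(np.unique(y_bal, return_counts=True))  # equal counts for both classes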

def print_dataset(self, X: ArrayLike, y: ArrayLike):
790    def print_dataset(
791        self,
792        X: ArrayLike,
793        y: ArrayLike,
794    ):
795        """
796        Prints given dataset
797        :param X: dataset
798        :param y: labels
799        :return:
800        """
801
802        n_samples, n_features = X.shape
803        for n, arr in enumerate(X):
804            row = ', '.join(str(arr[i]) for i in range(n_features))
805            print(f'[{row}], Label: {y[n]}')

Prints given dataset

Parameters
  • X: dataset
  • y: labels
Returns
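
A sketch printing a small dataset, continuing the example above; it emits one "[v1, ..., vk], Label: l" line per sample and returns nothing:

    X = cc.generate_data(n_features=3, n_samples=5)
    y = cc.generate_labels(X)
    cc.print_dataset(X, y)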