Skip to content

Drift

mercury.monitoring.drift

auto_encoder_drift_detector

AutoEncoderDriftDetector(p_val=0.05, bootstrap_samples=30, bootstrap_sample_size=10000, autoencoder_kwargs=None, custom_reconstruction_error='mse', fitted_models=None, reference_errors=None)

This class uses several autoencoders for testing whether the joint distributions of two datasets are the same. In order to do this, it makes bootstrap samples from a source dataset (X_src), trains an autoencoder on each one of them (using training/test splits) and it stores the distribution of reconstruction errors (i.e. the error distribution reconstructing the source dataset). The default reconstruction error is the MSE, although you can specify your own.

Then, when target data is available (X_target), it calculates the error distribution on the new environment and compares it with the one obtained on X_src by using a Mann-Withney-U statistical test under the null hypothesis (H0) that no drift exists (that is, the two error samples come from the same distribution).

Parameters:

Name Type Description Default
p_val float

Significance level for rejecting the null hypothesis. Default value is 0.05.

0.05
bootstrap_samples int

How many bootstrap samples to make

30
bootstrap_sample_size int

The size of each bootstrap sample.

10000
autoencoder_kwargs dict

Dictionary with arguments for training the internal n autoencoders.

None
custom_reconstruction_error callable or string

Custom function for measuring reconstruction quality. This is only used for the drift calculation, not for training the autoencoders (use autoencoder_kwargs for that). This can be a string in (mse, rmse, mae), meaning (Mean Squared Error, Root Mean Squared Error, Mean Absolute Loss error). Alternatively, you can also provide a custom function with the form custom_fn(ytrue, ypred) that returns the reconstruction error without reductions. That is, a matrix with the same shape as ytrue/ypred. Default is Mean squared error.

'mse'
fitted_models list

List with trained autoencoders (tf.keras.Model). Use it only if you want to "reload" an already trained detector.

None
reference_errors List[float]

List with the errors obtained at the time of training the detector. Use it only if you want to "reload" an already trained detector.

None
Example
>>> from mercury.monitoring.drift.auto_encoder_drift_detector import AutoEncoderDriftDetector
# Create example datasets
>>> X_src = np.array([np.random.normal(0, 1, 1000), np.random.normal(0, 1, 1000)]).T
>>> X_target = np.array([np.random.normal(0, 1, 500) + 2, np.random.normal(0, 1, 500)]).T
# Train the detector on the source dataset
>>> drift_detector = AutoEncoderDriftDetector()
>>> drift_detector = drift_detector.fit(X_src)
# Use the detector for checking whether drift exists on X_target
>>> drift_detector.calculate_drift(X_target)
"{'pval': 7.645836962858183e-06, 'drift': True}"
Source code in mercury/monitoring/drift/auto_encoder_drift_detector.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __init__(self,
             p_val: float = 0.05,
             bootstrap_samples: int = 30,
             bootstrap_sample_size: int = 10_000,
             autoencoder_kwargs: dict = None,
             custom_reconstruction_error: Union[str, Callable[[np.ndarray, np.ndarray], np.ndarray]] = 'mse',
             fitted_models: list = None,
             reference_errors: List[float] = None):

    self.bootstrap_samples = bootstrap_samples
    self.bootstrap_sample_size = bootstrap_sample_size
    self.pval = p_val
    self.ae_args = autoencoder_kwargs if autoencoder_kwargs else dict()
    self.reconstruction_metric_fn = self._get_reconstruction_err_fn(custom_reconstruction_error)

    # Support case when user "reloads" the detector
    if fitted_models is not None:
        if reference_errors is None or len(reference_errors) != len(fitted_models):
            raise ValueError("""If you pass a list of recovered fitted models you must also provide a list of """
                             """reference_errors of the same length as `fitted_models`.""")
        self.fitted_models = fitted_models
        self.reference_distribution = reference_errors
    if reference_errors is not None and fitted_models is None:
        raise ValueError("For reloading the detector you must pass both `reference_errors` and `fitted_models`.")

    if not isinstance(self.ae_args, dict):
        raise ValueError("Error. autoencoer_kwargs must be a dictionary.")

    self._init_autoencoder_params()
is_fitted: bool property

Whether the detector has been fitted and it's ready for calculating drift

calculate_drift(X, return_errors=False)

Checks whether drift exists on a given dataset using X_src as reference:

Parameters:

Name Type Description Default
X np.ndarray

the target dataset.

required
return_errors bool

Whether to include the reconstruction errors in the result.

False

Returns:

Type Description
dict

Dictionary containing the result of the test and, optionally, the reference

dict

and target reconstruction errors.

Source code in mercury/monitoring/drift/auto_encoder_drift_detector.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def calculate_drift(self, X: np.ndarray, return_errors: bool = False) -> dict:
    """ Checks whether drift exists on a given dataset using X_src as reference:

    Args:
        X: the target dataset.
        return_errors: Whether to include the reconstruction errors in the result.

    Returns:
        Dictionary containing the result of the test and, optionally, the reference
        and target reconstruction errors.

    """
    if not self.is_fitted:
        raise RuntimeError("Error. This detector han't been trained yet. Call `fit` first.")

    self.target_distribution = self._get_real_error_distr(X)

    test = mannwhitneyu(self.mse_reference['mses'], self.mse_target['mses'], alternative='less')

    result = {
        'pval': test.pvalue,
        'drift': test.pvalue < self.pval
    }

    if return_errors:
        result['source_errors'] = self.mse_reference['mses']
        result['target_errors'] = self.mse_target['mses']

    return result
fit(X_src)

Trains the detector using a source (reference) dataset.

Parameters:

Name Type Description Default
X_src np.ndarray

the reference dataset.

required

Returns:

Type Description
AutoEncoderDriftDetector

the detector itself.

Source code in mercury/monitoring/drift/auto_encoder_drift_detector.py
197
198
199
200
201
202
203
204
205
206
207
def fit(self, X_src: np.ndarray) -> "AutoEncoderDriftDetector":
    """ Trains the detector using a source (reference) dataset.

    Args:
        X_src: the reference dataset.

    Returns:
        the detector itself.
    """
    self.reference_distribution = self._get_simulation_error_distr(X_src)
    return self

base

BaseBatchDriftDetector(X_src=None, X_target=None, distr_src=None, distr_target=None, features=None, correction='bonferroni', p_val=0.05)

Bases: ABC

Base class for batch drift detection. The class can be extended to create a method for batch drift detection over a source dataset X_src and target dataset X_target, or a source dataset distributions distr_src and a target dataset distributions distr_target The main method to implement is the calculate_drift() method.

Source code in mercury/monitoring/drift/base.py
17
18
19
20
21
22
23
24
25
26
27
def __init__(
    self, X_src=None, X_target=None, distr_src=None, distr_target=None, features=None, correction="bonferroni", p_val=0.05
):
    self.X_src = X_src
    self.X_target = X_target
    self.distr_src = distr_src
    self.distr_target = distr_target
    self.features = features
    self.correction = correction
    self.p_val = p_val
    self.drift_metrics = None
calculate_drift() abstractmethod

Method to implement in extended classes

Source code in mercury/monitoring/drift/base.py
82
83
84
85
86
87
@abstractmethod
def calculate_drift(self):
    """
    Method to implement in extended classes
    """
    raise NotImplementedError()
get_drifted_features(return_as_indices=False)

Gets the features that are considered to have drift. The method calculate_drift needs to be called first. The parameter return_as_indices controls whether to return the features as indices or names.

Parameters:

Name Type Description Default
return_as_indices boolean

whether to return the features as indices or names.

False

Returns:

Type Description
list

features with drift.

Source code in mercury/monitoring/drift/base.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def get_drifted_features(self, return_as_indices=False):
    """
    Gets the features that are considered to have drift. The method `calculate_drift` needs to be called first.
    The parameter `return_as_indices` controls whether to return the features as indices or names.

    Args:
        return_as_indices (boolean): whether to return the features as indices or names.

    Returns:
        (list): features with drift.
    """
    if "p_vals" in self.drift_metrics.keys() and "threshold" in self.drift_metrics.keys():
        drifted_features = []
        for i in range(len(self.drift_metrics["scores"])):
            if self.drift_metrics["p_vals"][i] < self.drift_metrics["threshold"]:
                drifted_features.append(i)
    else:
        raise ValueError(
            "individual p-values need to be calculated to obtain features features. "
            "Call calculate_drit() first or use a drift method that calculates p-values per feature.")

    if return_as_indices:
        return drifted_features
    else:
        if self.features is None:
            raise ValueError("Feature indices are not setted")
        else:
            return [self.features[idx] for idx in drifted_features]
plot_distribution_drifted_features(figsize=(8, 4))

Plots the distributions of the features that are considered to have drift. The method calculate_drift needs to be called first.

Parameters:

Name Type Description Default
figsize tuple

figsize to use if ax is not specified.

(8, 4)
Source code in mercury/monitoring/drift/base.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def plot_distribution_drifted_features(self, figsize=(8, 4)):
    """
    Plots the distributions of the features that are considered to have drift. The method `calculate_drift`
    needs to be called first.

    Args:
        figsize (tuple):
            figsize to use if `ax` is not specified.

    """

    drifted_features = self.get_drifted_features(return_as_indices=True)
    for idx in drifted_features:
        ax = self.plot_distribution_feature(idx_feature=idx, figsize=figsize)
        if self.features:
            ax.set_title(self.features[idx])
        else:
            ax.set_title(idx)
plot_distribution_feature(idx_feature=None, name_feature=None, ax=None, figsize=(8, 4))

Plots the distribution of a given feature for the source and the target datasets. The feature can be indicated by the index using the idx_feature parameter, or by the name using the name_feature parameter (only possible if attribute features was specified.

Parameters:

Name Type Description Default
idx_feature int

index of the feature

None
name_feature string

name of the feature

None
ax matplotlib.axes._subplots.AxesSubplot

Axes object on which the data will be plotted. If not specified it creates one.

None
figsize tuple

figsize to use if ax is not specified.

(8, 4)

Returns:

Type Description

The axes

Source code in mercury/monitoring/drift/base.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def plot_distribution_feature(self, idx_feature=None, name_feature=None, ax=None, figsize=(8, 4)):
    """
    Plots the distribution of a given feature for the source and the target datasets.
    The feature can be indicated by the index using the `idx_feature` parameter, or by the name using the
    `name_feature` parameter (only possible if attribute `features` was specified.
    Args:
        idx_feature (int):
            index of the feature
        name_feature (string):
            name of the feature
        ax (matplotlib.axes._subplots.AxesSubplot):
            Axes object on which the data will be plotted. If not specified it creates one.
        figsize (tuple):
            figsize to use if `ax` is not specified.
    Returns:
        The axes
    """
    self._check_datasets()
    idx_feature = self._get_index_feature(idx_feature, name_feature)
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)

    palette = itertools.cycle(sns.color_palette())
    axes = sns.histplot(self.X_src[:, idx_feature], kde=True, color=next(palette), ax=ax, label="dataset source")
    axes = sns.histplot(self.X_target[:, idx_feature], kde=True, color=next(palette), ax=ax, label="dataset target")
    axes.legend()
    return axes
plot_feature_drift_scores(top_k=None, ax=None, figsize=(8, 4))

Plots a summary of the scores for each feature. The meaning of the score depends on the subclass that is used.

Parameters:

Name Type Description Default
top_k int

If specified, plots only the top_k features with higher scores.

None
ax matplotlib.axes._subplots.AxesSubplot

Axes object on which the data will be plotted. If not specified it creates one.

None
figsize tuple

Size of the plotted figure

(8, 4)
Source code in mercury/monitoring/drift/base.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def plot_feature_drift_scores(self, top_k=None, ax=None, figsize=(8, 4)):
    """
    Plots a summary of the scores for each feature. The meaning of the score depends on the subclass that is used.

    Args:
        top_k (int):
            If specified, plots only the top_k features with higher scores.
        ax (matplotlib.axes._subplots.AxesSubplot):
            Axes object on which the data will be plotted. If not specified it creates one.
        figsize (tuple):
            Size of the plotted figure
    """
    if self.drift_metrics is None:
        raise ValueError("Drift must be calculated first to calculate feature drift scores")

    if "scores" not in self.drift_metrics.keys():
        raise ValueError("This drift methods doesn't have the drift scores calculated yet")

    if self.features is None:
        # if feature names are not given, we use the indices
        features = list(range(len(self.drift_metrics["scores"])))
    else:
        features = self.features

    # Sort features
    sorted_pairs = sorted(zip(self.drift_metrics["scores"], features))
    sorted_importances, sorted_features = [list(tuple) for tuple in zip(*sorted_pairs)]

    # Get top k if specified
    if top_k:
        sorted_features = sorted_features[-top_k:]
        sorted_importances = sorted_importances[-top_k:]

    # Create plot
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)
    ax.barh([f for f in sorted_features], [imp for imp in sorted_importances])
    ax.set_title("feature drift scores")

    return ax
plot_histograms_drifted_features(figsize=(8, 4))

Plots the histograms of the features that are considered to have drift. The method calculate_drift needs to be called first.

Parameters:

Name Type Description Default
figsize tuple

figsize to use if ax is not specified.

(8, 4)
Source code in mercury/monitoring/drift/base.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def plot_histograms_drifted_features(self, figsize=(8, 4)):
    """
    Plots the histograms of the features that are considered to have drift. The method `calculate_drift`
    needs to be called first.

    Args:
        figsize (tuple):
            figsize to use if `ax` is not specified.

    """

    drifted_features = self.get_drifted_features(return_as_indices=True)
    for idx in drifted_features:
        axes = self.plot_histograms_feature(idx_feature=idx, figsize=figsize)
        if self.features:
            axes[0].set_title("histogram source " + self.features[idx])
            axes[1].set_title("histogram target " + self.features[idx])
        else:
            axes[0].set_title("histogram source " + str(idx))
            axes[1].set_title("histogram target " + str(idx))
plot_histograms_feature(idx_feature=None, name_feature=None, figsize=(8, 4))

Plots the histograms of a given feature for the source and the target distributions. The feature can be indicated by the index using the idx_feature parameter, or by the name using the name_feature parameter (only possible if attribute features was specified.

Parameters:

Name Type Description Default
idx_feature int

index of the feature

None
name_feature string

name of the feature

None
figsize tuple

figsize to use if ax is not specified.

(8, 4)

Returns:

Type Description

The axes

Source code in mercury/monitoring/drift/base.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def plot_histograms_feature(self, idx_feature=None, name_feature=None, figsize=(8, 4)):
    """
    Plots the histograms of a given feature for the source and the target distributions.
    The feature can be indicated by the index using the `idx_feature` parameter, or by the name using the
    `name_feature` parameter (only possible if attribute `features` was specified.
    Args:
        idx_feature (int):
            index of the feature
        name_feature (string):
            name of the feature
        figsize (tuple):
            figsize to use if `ax` is not specified.
    Returns:
        The axes
    """

    self._check_distributions()
    idx_feature = self._get_index_feature(idx_feature, name_feature)
    # Plot distributions using histograms in different plots
    fig, axes = plt.subplots(1, 2, figsize=figsize, constrained_layout=False)

    axes[0].bar(range(len(self.distr_src[idx_feature])), self.distr_src[idx_feature], width=0.8)
    axes[1].bar(range(len(self.distr_target[idx_feature])), self.distr_target[idx_feature], width=0.8)
    axes[0].set_title("histogram source")
    axes[1].set_title("histogram target")
    return axes

chi2_drift_detector

Chi2Drift(distr_src=None, distr_target=None, features=None, correction='bonferroni', p_val=0.05)

Bases: BaseBatchDriftDetector

The Chi2Drift class allows us to detect drift in the source dataset distributions distr_src and the target dataset distributions distr_target by performing a chi-square test of independence of variables in a contingency table for each feature. It does it separately for each feature, calculating the statistic and the p-value. The source and target distributions are specified when creating the object. Then, the method calculate_drift() is used to obtain a dict of metrics.

Parameters:

Name Type Description Default
distr_src list of np.arrays

The source dataset distributions. It's a list of np.arrays, where each np.array represents the counts of each bin for that feature. The features and its bins must be in the same order than the distr_target

None
distr_target list of np.arrays

The target dataset distributions. It's a list of np.arrays, where each np.array represents the counts of each bin for that feature. The features and its bins must be in the same order than the distr_src

None
features list

The name of the features of X_src and X_target.

None
p_val float

Threshold to be used with the p-value obtained to decide if there is drift or not (affects drift_detected metric when calling calculate_drift()) Lower values will be more conservative when indicating that drift exists in the dataset. Default value is 0.05.

0.05
correction string

Since multiple tests are performed, this parameter controls whether to perform a correction of the p-value or not (affects drift_detected metric when calling calculate_drift()). If None, it doesn't perform a correction Default value is "bonferroni", which makes the bonferroni correction taking into account the number of tests

'bonferroni'
Example
>>> from mercury.monitoring.drift.chi2_drift_detector import Chi2Drift
# Create example distributions
>>> histograms_src = [np.array([5, 5, 5]), np.array([3, 6, 9])]
>>> histograms_target = [np.array([4, 1, 8]), np.array([20, 3, 2])]
# Calculate drift
>>> drift_detector = Chi2Drift(
...     distr_src=histograms_src, distr_target=histograms_target, features=["f1", "f2"],
...     correction="bonferroni", p_val=0.01)
>>> drift_metrics = drift_detector.calculate_drift()
# Print the metrics
>>> print("Drift Score: ", drift_metrics["score"])
>>> print("Is drift detected? ", drift_metrics["drift_detected"])
# Plot feature drift scores
>>> ax = drift_detector.plot_feature_drift_scores(top_k=7)
# Get drifted features
>>> print(drift_detector.get_drifted_features())
# Plot histograms of drifted features
>>> drift_detector.plot_histograms_drifted_features()
Source code in mercury/monitoring/drift/chi2_drift_detector.py
59
60
61
62
def __init__(self, distr_src=None, distr_target=None, features=None, correction="bonferroni", p_val=0.05):
    super().__init__(
        distr_src=distr_src, distr_target=distr_target, features=features, correction=correction, p_val=p_val
    )
calculate_drift()

Computes drift by performing a chi-square test of independence between the source dataset distributions distr_src and the target dataset distributions distr_target. It performs the test individually for each feature and obtains the p-values and the value of the statistic for each. The returned dictionary contains the next metrics:

  • p_vals: np.array of the p-values returned by the Chi2 test for each feature.
  • scores: np.array of the chi2 statistics for each feature.
  • score: average of all the scores.
  • drift_detected: final decision if drift is detected. If any of the p-values of the features is lower than the threshold, then this is set to 1. Otherwise is 0. This will depend on the specified p_val and correction values.
  • threshold: final threshold applied to decide if drift is detected. It depends on the specified p_val and correction values.

Returns:

Type Description
dict

Dictionary with the drift metrics as specified.

Source code in mercury/monitoring/drift/chi2_drift_detector.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def calculate_drift(self):
    """
    Computes drift by performing a chi-square test of independence between the source dataset distributions
    `distr_src` and the target dataset distributions `distr_target`. It performs the test individually for each
    feature and obtains the p-values and the value of the statistic for each.
    The returned dictionary contains the next metrics:

    - p_vals: np.array of the p-values returned by the Chi2 test for each feature.
    - scores: np.array of the chi2 statistics for each feature.
    - score: average of all the scores.
    - drift_detected: final decision if drift is detected. If any of the p-values of the features is lower than
        the threshold, then this is set to 1. Otherwise is 0. This will depend on the specified `p_val` and
        `correction` values.
    - threshold: final threshold applied to decide if drift is detected. It depends on the specified
        `p_val` and `correction` values.

    Returns:
        (dict): Dictionary with the drift metrics as specified.
    """

    p_vals_test, distances = self._calculate_chi2()
    drift_pred, threshold = self._apply_correction(p_vals_test)
    self.drift_metrics = {
        "p_vals": p_vals_test,
        "score": np.mean(distances),
        "scores": distances,
        "drift_detected": drift_pred,
        "threshold": threshold
    }
    return self.drift_metrics

density_drift_detector

DensityDriftDetector(embedding_dim=2, vae=None, encoder=None, decoder=None)

This detector calculates the "probability" of a particular sample of being an anomaly (or presenting drift) with respect to a source reference dataset.

This detector works using a VAE for building embeddings of the input data. Then, with those source embeddings, the density is estimated in such a way that zones in the embeddings space with lots of samples have high density whereas zones with low density represent "less" probable samples.

After having trained the detector it can be used for checking whether a particular (new) sample is an anomaly.

Parameters:

Name Type Description Default
embedding_dim int

embedding dimensionality for the VAE (if no vae provided)

2
vae Model

Optional VAE architecture

None
encoder Model

encoder part from vae

None
decoder Model

decoder part from vae

None
Example
>>> from mercury.monitoring.drift.density_drift_detector import DensityDriftDetector
>>> detector = DensityDriftDetector().fit(X_source)
Source code in mercury/monitoring/drift/density_drift_detector.py
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(
    self,
    embedding_dim: int = 2,
    vae: "tf.keras.Model" = None,  # noqa: F821
    encoder: "tf.keras.Model" = None,  # noqa: F821
    decoder: "tf.keras.Model" = None  # noqa: F821
):

    if encoder or decoder or vae and all(i is None for i in [vae, encoder, decoder]):
        raise ValueError("All of vae, encoder and decoder must be defined.")

    self.vae, self.encoder, self.decoder = vae, encoder, decoder
    self.embedding_dim = encoder.outputs[0].shape[-1] if self.vae else embedding_dim
explain(x, ref_point=None)

Explains what features have to change in a certain sample (or batch of samples) x so it becomes a ref_point in the embedding space. You can use this mehod when you have "anomalies" and what to check what feeatures need to change for the point to be an inlier.

If ref_point is None, the considered reference point will be the point in the embedding space with highest density (i.e. the most normal one).

The returned explanation will be an array with dim(explanation) = dim(x) in which each item points the direction and amount each feature needs to change (also known as delta).

Parameters:

Name Type Description Default
x np.ndarray

datapoint (or batch of datapoints) to explain

required
ref_point Optional[np.ndarray]

Reference point coords in the embedding space

None

Returns:

Type Description
np.ndarray

numpy array(s) with explanation(s) per item.

Source code in mercury/monitoring/drift/density_drift_detector.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def explain(self, x: np.ndarray, ref_point: Optional[np.ndarray] = None) -> np.ndarray:
    """
    Explains what features have to change in a certain sample (or batch of samples)
    x so it becomes a `ref_point` in the embedding space. You can use this mehod when
    you have "anomalies" and what to check what feeatures need to change for the point
    to be an inlier.

    If `ref_point` is None, the considered reference point will be the point in the
    embedding space with highest density (i.e. the most normal one).

    The returned explanation will be an array with dim(explanation) = dim(x) in which
    each item points the direction and amount each feature needs to change (also known
    as delta).

    Args:
        x: datapoint (or batch of datapoints) to explain
        ref_point: Reference point coords in the embedding space

    Returns:
        numpy array(s) with explanation(s) per item.
    """
    reference = self.max_density_point if ref_point is None else ref_point
    reconstructed_reference = self.decoder(reference[np.newaxis, ...])
    delta = reconstructed_reference - x
    return delta.numpy()
fit(source, epochs=None, batch_size=128)

Fits the model

Parameters:

Name Type Description Default
source Union[ndarray, DataFrame]

Reference dataset

required
epochs int

Number of epochs to train the model. If None, early stopping will be used.

None
batch_size int

batch_size

128

Returns:

Type Description

self, detector trained

Source code in mercury/monitoring/drift/density_drift_detector.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def fit(self, source: Union["np.ndarray", "pd.DataFrame"],
        epochs: int = None, batch_size: int = 128):
    """
    Fits the model

    Args:
        source: Reference dataset
        epochs: Number of epochs to train the model. If None, early stopping will be used.
        batch_size: batch_size

    Returns:
        self, detector trained
    """
    import tensorflow as tf

    callbacks = []
    if epochs is None:
        epochs = 100
        callbacks.append(tf.keras.callbacks.EarlyStopping(
            monitor='loss',
            min_delta=0.1,
            patience=5,
        ))

    if not self.vae:
        self.vae, self.encoder, self.decoder = self._build_vae(source.shape[-1], self.embedding_dim)

    x = source.values if isinstance(source, pd.DataFrame) else source

    hist = self.vae.fit(x, epochs=epochs, batch_size=batch_size, callbacks=callbacks)
    self.hist_ = hist

    # fit gaussian kde to embeddings
    _, _, embeddings_nodrift = self.encoder(x)
    embeddings_nodrift = embeddings_nodrift.numpy()

    # TODO: Possible future idea -> For more complicated spaces, try IFs / OCSVM
    self.kde = gaussian_kde(embeddings_nodrift.T, 'silverman')

    # Train a surroggate for replacing the kde as it can be slow for inference (+ it
    # stores train datapoints)
    self.surrogate = self._build_surrogate(inp_dim=self.kde.covariance.shape[0])
    self._fit_surrogate()

    # At this point we could actually delete self.kde
    # del(self.kde)

    return self
predict(X)

Predicts the "probability" of each sample in X being an inlier. That is, its similarity to the reference points seen during training.

Parameters:

Name Type Description Default
X np.ndarray

dataset

required

Returns:

Type Description
np.ndarray

numpy array with densities

Source code in mercury/monitoring/drift/density_drift_detector.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def predict(self, X: np.ndarray) -> np.ndarray:
    """
    Predicts the "probability" of each sample in X being an inlier. That is, its similarity
    to the reference points seen during training.

    Args:
        X: dataset

    Returns:
        numpy array with densities
    """

    embeddings = self.predict_embeddings(X)
    densities = self.surrogate(embeddings).numpy()
    return densities
predict_embeddings(target)

Gets the embeddings predicted by the VAE's encoder.

Parameters:

Name Type Description Default
target

dataset

required

Returns:

Type Description
np.ndarray

numpy array with embeddings

Source code in mercury/monitoring/drift/density_drift_detector.py
139
140
141
142
143
144
145
146
147
148
149
150
151
def predict_embeddings(self, target) -> np.ndarray:
    """
    Gets the embeddings predicted by the VAE's encoder.

    Args:
        target: dataset

    Returns:
        numpy array with embeddings
    """
    x = target.values if isinstance(target, pd.DataFrame) else target
    _, _, embeddings = self.encoder(x)
    return embeddings.numpy()

domain_classifier_drift_detector

DomainClassifierDrift(X_src=None, X_target=None, features=None, p_val=0.05, test_size=0.3, n_runs=10, alpha=0.025, **kwargs)

Bases: BaseBatchDriftDetector

This class is a drift detector that allows us to train a classifier (Random Forest) to distinguish between a source dataset X_src and a target dataset X_target. The source and target datasets are specified when creating the object together with other available parameters. Then the method calculate_drift() is used to obtain a dict of metrics.

Parameters:

Name Type Description Default
X_src np.array

The source dataset.

None
X_target np.array

The target dataset.

None
features list

The name of the features of X_src and X_target.

None
p_val float

Threshold to be used with the p-value obtained to decide if there is drift or not (affects drift_detected metric when calling calculate_drift()) Lower values will be more conservative when indicating that drift exists in the dataset. Default value is 0.05.

0.05
test_size float

When training the domain classifier, this parameter indicates the size of the test set of the domain classifier. Since the metric drift_score is the auc of this test size, the drift_score won't be very accurate if the test_size is too small. Default value is 0.3

0.3
n_runs int

This is the number of times that the domain classifier is trained with different splits. The returned metrics correspond to the averages of different runs. A higher number of runs will produce more accurate metrics, but it will be computationally more expensive.

10
alpha float

This parameter impacts directly to the false drift detection rate. Concretely, it specifies how much better the AUC of the domain classifier has to be than 0.5 (as a random classifier) to consider that successfully distinguishes samples from the source dataset and target dataset. Increasing the value will be more conservative when detecting drift, so it will lower the false positive detection rate. However, higher values also can miss true drift detection. Default value is 0.025 which in general a good trade-off.

0.025
**kwargs

The rest of the parameters will be used in the Random forest trained as domain classifier. If not specified, then the default parameters will be used.

{}
Example
>>> from mercury.monitoring.drift.domain_classifier_drift import DomainClassifierDrift
# Create example datasets
>>> X_src = np.array([np.random.normal(0, 1, 1000), np.random.normal(0, 1, 1000)]).T
>>> X_target = np.array([np.random.normal(0, 1, 500) + 2, np.random.normal(0, 1, 500)]).T
# Calculate drift
>>> drift_detector = DomainClassifierDrift(
...     X_src, X_target, features=["f1", "f2"], p_val=0.01, test_size=0.3, n_runs=10)
>>> drift_metrics = drift_detector.calculate_drift()
# Print the metrics
>>> print("Drift Score: ", drift_metrics["score"])
>>> print("p_val: ", drift_metrics["p_val"])
>>> print("Is drift detected? ", drift_metrics["drift_detected"])
# Get drift scores per feature
>>> drift_detector.plot_feature_drift_scores(figsize=(8,4))
Source code in mercury/monitoring/drift/domain_classifier_drift_detector.py
67
68
69
70
71
72
73
def __init__(self, X_src=None, X_target=None, features=None, p_val=0.05, test_size=0.3, n_runs=10,
             alpha=0.025, **kwargs):
    super().__init__(X_src=X_src, X_target=X_target, features=features, p_val=p_val)
    self.test_size = test_size
    self.n_runs = n_runs
    self.alpha = alpha
    self.kwargs = kwargs
calculate_drift(return_drift_score_target=False)

Calculates the drift metrics. The returned drift metrics are in a dictionary with the next keys:

  • drift_detected: indicates if drift is detected or not, comparing the indicated p_val and the obtained p-value. The obtained p-value is based on a binomial test that takes into account the number of times that the AUC of the domain classifier was higher than random guesses.
  • drift_score: this is the average AUC obtained by the domain classifier over the different runs.

If the parameter return_drift_score_target is True, then it also returns a dataframe with a drift score for each sample in the target dataset. This score is the predicted probability by the domain classifier and can help to detect the drifted samples.

Parameters:

Name Type Description Default
return_drift_score_target boolean

Indicates if additionally return a dataframe with a drift score for each sample in the target dataset

False

Returns:

Type Description
dict

Dictionary with the drift metrics. If parameter return_drift_score_target is set to True, then it also

returns a dataframe with the drift scores for the target dataset.

Source code in mercury/monitoring/drift/domain_classifier_drift_detector.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def calculate_drift(self, return_drift_score_target=False):
    """
    Calculates the drift metrics. The returned drift metrics are in a dictionary with the next keys:

    - drift_detected: indicates if drift is detected or not, comparing the indicated `p_val` and the obtained
        p-value. The obtained p-value is based on a binomial test that takes into account the number of times
        that the AUC of the domain classifier was higher than random guesses.
    - drift_score: this is the average AUC obtained by the domain classifier over the different runs.

    If the parameter `return_drift_score_target` is True, then it also returns a dataframe with a drift score for
    each sample in the target dataset. This score is the predicted probability by the domain classifier and can
    help to detect the drifted samples.

    Args:
        return_drift_score_target (boolean):
            Indicates if additionally return a dataframe with a drift score for each sample in the target dataset

    Returns:
        (dict): Dictionary with the drift metrics. If parameter `return_drift_score_target` is set to True, then it also
        returns a dataframe with the drift scores for the target dataset.
    """

    metrics_domain_classifier = []
    X_target_hat = []
    for _ in range(self.n_runs):
        m_domain_classifier, model = self._train_domain_classifier(test_size=self.test_size)
        metrics_domain_classifier.append(m_domain_classifier)
        if return_drift_score_target:
            X_target_hat.append(model.predict_proba(self.X_target)[:, 1])

    self.drift_metrics = self._aggregate_domain_classifier_metrics(metrics_domain_classifier)
    self.drift_metrics["drift_detected"] = self.drift_metrics["p_val"] < self.p_val
    if return_drift_score_target:
        df_drift_scores = pd.DataFrame(self.X_target, columns=self.features)
        df_drift_scores["drift_score"] = np.mean(X_target_hat, axis=0)
        return self.drift_metrics, df_drift_scores
    else:
        return self.drift_metrics

drift_simulation

BatchDriftGenerator(X, schema=None)

This is a simulator for applying synthetic perturbations to a given dataset, so real life drift situations can be generated.

It allows generating several typical drift-like perturbations. In particular, it has the

following ones implemented

1) Shift drift: Each one of the numerical features will be shifted by an amount N. With this, you will be able to simulate situations where a particular feature increases of decreases. 2) Recodification drift: This operations simulates the situation where a particular feature changes its codification (due to some change in a data ingestion pipeline). For example, imagine a feature codified as [A, B, C] and, for some reason, category "C" becomes "A" and vice versa. 3) Missing value drift: Artificially increases the percentage of NaNs on a given feature or list of features. 4) Hyperplane rotation drift: This method operates by modifying a set of features at once but attempting to preserve the relationships they have. In other terms, it tries to change the joint distribution of N continuous variables at once. 5) Outliers drift: This method artificially introduces outliers in the specified columns. You can use one of the existing methods to inject outliers or create your own method. Check the method documentation for outliers_drift for further documentation

Parameters:

Name Type Description Default
X Union[pd.DataFrame, np.ndarray]

Dataset features

required
names_categorical

An optional list with the names of the categorical variables. If given,

required
Source code in mercury/monitoring/drift/drift_simulation.py
39
40
41
42
43
44
45
46
47
48
def __init__(
        self, X: Union[pd.DataFrame, np.ndarray],
        schema: DataSchema = None,
):
    self.X = X

    if isinstance(self.X, np.ndarray):
        self.X = pd.DataFrame(self.X, columns=[f'col{i}' for i in range(self.X.shape[-1])])

    self._schema = schema
__get_rotation_matrix(n, deg)

Returns a rotation matrix for a R^n space.

Parameters:

Name Type Description Default
n int

Dimensionality of the space

required
deg float

amount of rotation to be applied. In degrees.

required

Returns:

Type Description
np.ndarray

nxn dimensional rotation matrix

Source code in mercury/monitoring/drift/drift_simulation.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def __get_rotation_matrix(self, n: int, deg: float) -> np.ndarray:
    """ Returns a rotation matrix for a R^n space.

    Args:
        n: Dimensionality of the space
        deg: amount of rotation to be applied. In degrees.

    Returns:
        nxn dimensional rotation matrix
    """
    # https://en.wikipedia.org/wiki/Orthogonal_matrix#:~:text=Canonical%20form%5Bedit%5D
    theta = deg * np.pi / 180
    rot = np.zeros(shape=(n, n))

    for i in range(0, n - 1, 2):
        rot[i, i] = np.cos(theta)
        rot[i, i + 1] = -np.sin(theta)
        rot[i + 1, i] = np.sin(theta)
        rot[i + 1, i + 1] = np.cos(theta)
    if n % 2 == 1:
        rot[-1, -1] = 1
    return rot
__remap_categorical(df, colname)

Randomly shuffles the categories of a particular column

Source code in mercury/monitoring/drift/drift_simulation.py
243
244
245
246
247
248
249
250
def __remap_categorical(self, df: pd.DataFrame, colname: str):
    """ Randomly shuffles the categories of a particular column
    """
    original_ids = df[colname].dropna().unique()
    remaped_ids = np.array(original_ids)
    np.random.shuffle(remaped_ids)
    remap_dic = {x: y for x, y in zip(original_ids, remaped_ids)}
    df.loc[:, colname] = df.loc[:, colname].map(remap_dic)
hyperplane_rotation_drift(cols=None, force=10.0)

Adds drift to several features at once by rotating them.

Parameters:

Name Type Description Default
cols List[str]

columns which will be jointly rotated.

None
force float

rotation amount in degrees (0 or 360 = no transformation)

10.0

Returns:

Type Description
BatchDriftGenerator

An updated version of the simulator.

Source code in mercury/monitoring/drift/drift_simulation.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def hyperplane_rotation_drift(self, cols: List[str] = None, force: float = 10.0) -> "BatchDriftGenerator":
    """ Adds drift to several features at once by rotating them.

    Args:
        cols: columns which will be jointly rotated.
        force: rotation amount in degrees (0 or 360 = no transformation)
    Returns:
        An updated version of the simulator.
    """
    force = abs(force)
    columns = self.schema.continuous_feats if cols is None else cols
    rotmat = self.__get_rotation_matrix(len(columns), force)
    # Rows with NaNs wont receive the transformation
    nan_entries = self.X.loc[:, columns].isna().any(axis=1)
    self.X.loc[~nan_entries, columns] = self.X.loc[~nan_entries, columns].values @ rotmat
    return self
missing_val_drift(cols=None, percent=0.1)

Randomly injects NaN values in a column.

Parameters:

Name Type Description Default
cols List[str]

Columns where this operation will be performed

None
percent Union[float, List[float]]

Percentage of values which will become Null values.

0.1

Returns:

Type Description
BatchDriftGenerator

An updated version of the simulator.

Source code in mercury/monitoring/drift/drift_simulation.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def missing_val_drift(self, cols: List[str] = None,
                      percent: Union[float, List[float]] = .1) -> "BatchDriftGenerator":
    """ Randomly injects NaN values in a column.

    Args:
        cols: Columns where this operation will be performed
        percent: Percentage of values which will become Null values.
    Returns:
        An updated version of the simulator.
    """
    if type(cols) == list and type(percent) == list and len(cols) != len(percent):
        raise ValueError("If both cols and percent are lists, they must have the same size.")

    drifted_df = self.X
    cols = drifted_df.columns if cols is None else cols
    for i, col in enumerate(cols):
        p = percent[i] if type(percent) == list else percent
        mask = np.random.uniform(0, 1, len(self.X)) < p
        drifted_df.loc[mask, col] = np.nan

    return self
outliers_drift(cols=None, method='percentile', method_params=None)

Injects outliers in cols using the method specified in method. Parameters of the method can be specified using method_params.

Parameters:

Name Type Description Default
cols List[str]

List of columns names to generate outliers.

None
method Union[str, Callable]

string of the method to generate to use or custom function to generate outliers. If using string, the current available method are 'percentile' and 'value'. If using 'percentile' values in the dataframe are replaced by a percentile of that column. The percentile to use is specified in key 'percentile' in method_params dictionary, and the key 'proportion_outliers' specifies the proportion of outliers to set in the column. If using 'value', a custom value is set in the column. The value is specified with the key 'value' in method_params and the key proportion_outliers specifies the proportion of outliers to set in the column. When using a custom function, the interface that should follow is fn(X: np.array, params: dict = None), where X will be an array of shape==1 to apply the outlier generation and params is the dictionary where parameters of the method are passed.

'percentile'
method_params dict

dictionary containing the parameters to use in method

None

Returns:

Type Description
BatchDriftGenerator

An updated version of the simulator.

Source code in mercury/monitoring/drift/drift_simulation.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def outliers_drift(
    self, cols: List[str] = None, method: Union[str, Callable] = "percentile", method_params: dict = None
) -> "BatchDriftGenerator":
    """
    Injects outliers in `cols` using the method specified in `method`. Parameters of the method can
    be specified using `method_params`.

    Args:
          cols: List of columns names to generate outliers.
          method: string of the method to generate to use or custom function to generate outliers. If
            using string, the current available method are 'percentile' and 'value'. If using 'percentile'
            values in the dataframe are replaced by a percentile of that column. The percentile to use is
            specified in key 'percentile' in `method_params` dictionary, and the key 'proportion_outliers'
            specifies the proportion of outliers to set in the column. If using 'value', a custom value is
            set in the column. The value is specified with the key 'value' in `method_params` and the
            key `proportion_outliers` specifies the proportion of outliers to set in the column.
            When using a custom function, the interface that should follow is
            `fn(X: np.array, params: dict = None)`, where `X` will be an array of shape==1 to apply
            the outlier generation and `params` is the dictionary where parameters of the method
            are passed.
          method_params: dictionary containing the parameters to use in `method`

    Returns:
        An updated version of the simulator.
    """
    drifted_df = self.X

    if cols is None:
        cols = self.schema.continuous_feats + self.schema.discrete_feats

    if isinstance(method, Callable):
        generate_outliers_fn = method
    else:
        generate_outliers_fn = _get_outlier_generation_fn(method)

    for col in cols:
        outliers = generate_outliers_fn(drifted_df.loc[:, col].values, method_params)
        drifted_df.loc[:, col] = outliers

    return self
recodification_drift(cols=None)

Adds drift by randomly remapping the categories of cols. For example, if you have a column with domain: ["A", "B", "C"], then, this operation would swap all the "A" with "C" (and vice versa).

Note the swaps are random. The same remapping will not be repeated.

Parameters:

Name Type Description Default
cols List[str]

List of columns names to be recoded

None

Returns:

Type Description
BatchDriftGenerator

An updated version of the simulator.

Source code in mercury/monitoring/drift/drift_simulation.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def recodification_drift(self, cols: List[str] = None) -> "BatchDriftGenerator":
    """ Adds drift by randomly remapping the categories of cols.
    For example, if you have a column with domain: ["A", "B", "C"], then, this
    operation would swap all the "A" with "C" (and vice versa).

    Note the swaps are random. The same remapping will not be repeated.

    Args:
          cols: List of columns names to be recoded

    Returns:
        An updated version of the simulator.
    """
    drifted_df = self.X
    cols = self.schema.categorical_feats if cols is None else cols
    for col in cols:
        self.__remap_categorical(drifted_df, col)

    return self
scale_drift(cols=None, mean=1, sd=0.05, iqr=None)

Adds drift to one or several features by multiplying then by some random factor (normally close to 1).

Parameters:

Name Type Description Default
cols List[str]

List of columns names where the operation will be performed

None
mean

The mean value of the random scale (if iqr is not given). Default is 1.

1
sd

The standard deviation of the random scale (if iqr is not given). Default is 0.05.

0.05
iqr

An alternative way to define the random scale is by giving the interquartile range as a list of 2 elements. E.g. [1, 1.2]

None

Returns:

Type Description
BatchDriftGenerator

An updated version of the simulator.

Source code in mercury/monitoring/drift/drift_simulation.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def scale_drift(
    self, cols: List[str] = None, mean=1, sd=0.05, iqr=None
) -> "BatchDriftGenerator":
    """
    Adds drift to one or several features by multiplying then by some
    random factor (normally close to 1).

    Args:
        cols: List of columns names where the operation will be performed
        mean: The mean value of the random scale (if iqr is not given). Default is 1.
        sd: The standard deviation of the random scale (if iqr is not given). Default is 0.05.
        iqr: An alternative way to define the random scale is by giving the interquartile range as
            a list of 2 elements. E.g. [1, 1.2]

    Returns:
        An updated version of the simulator.
    """
    if iqr is not None:
        # q1, q3 = mean +/- .67449*sd
        q1, q3 = iqr[0], iqr[1]

        mean = (q1 + q3) / 2
        sd = (q3 - q1) / 1.34898

    drifted_df = self.X

    if cols is None:
        cols = self.schema.continuous_feats

    for col in cols:
        scale = np.random.normal(mean, sd, len(drifted_df))
        drifted_df.loc[:, col] *= scale

    return self
shift_drift(cols=None, force=5.0, noise=0.0)

Adds drift to one or several features by adding or substracting a force quantity. You could also add random noise to these shifts via the noise parameter. The shifted amount will follow a gaussian distribution with mean force and std noise.

Parameters:

Name Type Description Default
cols List[str]

List of columns names where the operation will be performed

None
force

Amount shift each column will receive

5.0
noise

Amount of standard deviation the shift of each column will receive.

0.0

Returns:

Type Description
BatchDriftGenerator

An updated version of the simulator.

Source code in mercury/monitoring/drift/drift_simulation.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def shift_drift(self, cols: List[str] = None, force=5.0, noise=0.0) -> "BatchDriftGenerator":
    """ Adds drift to one or several features by adding or substracting a `force`
    quantity. You could also add random noise to these shifts via the `noise`
    parameter. The shifted amount will follow a gaussian distribution with mean
    `force` and std `noise`.

    Args:
        cols: List of columns names where the operation will be performed
        force: Amount shift each column will receive
        noise: Amount of standard deviation the shift of each column will receive.

    Returns:
        An updated version of the simulator.
    """
    drifted_df = self.X
    columns = self.schema.continuous_feats if cols is None else cols
    for i, col in enumerate(columns):
        f = force[i] if type(force) == list else force
        n = noise[i] if type(noise) == list else noise
        shifts = np.random.normal(f, n, len(drifted_df))
        drifted_df.loc[:, col] += shifts

    return self

histogram_distance_drift_detector

HistogramDistanceDrift(distr_src=None, distr_target=None, features=None, correction=None, p_val=None, n_permutations=50, distance_metric='hellinger')

Bases: BaseBatchDriftDetector

The HistogramDistanceDrift class allows us to detect drift in the source dataset distributions distr_src and the target dataset distributions distr_target by calculating distances between the distributions. For each feature, the distance between its distributions in distr_src and distr_target is calculated. The distance function to use is indicated by the distance_metric parameter. For each feature, a p-value is also calculated. This is calculated by doing a permutation test, where data from distr_src and distr_target are put together and sampled to calculate a new simulated distance n_permutations times. The p-value then is obtained by computing the fraction of simulated distances equal or greater than the observed distance. The source and target distributions are specified when creating the object. Then, the method calculate_drift() is used to obtain a dict of metrics.

Parameters:

Name Type Description Default
distr_src list of np.arrays

The source dataset distributions. It's a list of np.arrays, where each np.array represents the counts of each bin for that feature. The features and its bins must be in the same order than the distr_target

None
distr_target list of np.arrays

The target dataset distributions. It's a list of np.arrays, where each np.array represents the counts of each bin for that feature. The features and its bins must be in the same order than the distr_src

None
features list

The name of the features of X_src and X_target.

None
distance_metric string

The distance metric to use. Available metrics are "hellinger", "jeffreys" or "psi". Default value is "hellinger"

'hellinger'
p_val float

Threshold to be used with the p-value obtained to decide if there is drift or not (affects drift_detected metric when calling calculate_drift()) Lower values will be more conservative when indicating that drift exists in the dataset. Default value is 0.05.

None
correction string

Since multiple tests are performed, this parameter controls whether to perform a correction of the p-value or not (affects drift_detected metric when calling calculate_drift()). If None, it doesn't perform a correction Default value is "bonferroni", which makes the bonferroni correction taking into account the number of tests

None
n_permutations int

The number of permutations to perform to calculate the p-value. A higher number will calculate a more accurate p-value, but it will be computationally more expensive. Default value is 50

50
Example
>>> from mercury.monitoring.drift.histogram_distance_drift_detector import HistogramDistanceDrift
# Create example distributions
>>> histograms_src = [np.array([5, 5, 5]), np.array([3, 6, 9])]
>>> histograms_target = [np.array([4, 1, 8]), np.array([20, 3, 2])]
# Calculate drift
>>> drift_detector = HistogramDistanceDrift(
...     distr_src=histograms_src, distr_target=histograms_target, features=["f1", "f2"],
...     distance_metric="hellinger", correction="bonferroni", p_val=0.01, n_permutations=100)
>>> drift_metrics = drift_detector.calculate_drift()
# Print the metrics
>>> print("Drift Score: ", drift_metrics["score"])
>>> print("Is drift detected? ", drift_metrics["drift_detected"])
# Plot feature drift scores
>>> ax = drift_detector.plot_feature_drift_scores(top_k=7)
# Get drifted features
>>> print(drift_detector.get_drifted_features())
# Plot histograms of drifted features
>>> drift_detector.plot_histograms_drifted_features()
Source code in mercury/monitoring/drift/histogram_distance_drift_detector.py
69
70
71
72
73
74
75
76
77
def __init__(
    self, distr_src=None, distr_target=None, features=None,
    correction=None, p_val=None, n_permutations=50, distance_metric="hellinger"
):
    super().__init__(
        distr_src=distr_src, distr_target=distr_target, features=features, correction=correction, p_val=p_val
    )
    self.n_permutations = n_permutations
    self.distance_metric = distance_metric
calculate_drift()

Computes drift by calculating histogram-based distances between the source dataset distributions distr_src and the target dataset distributions distr_target. It calculates the distance individually for each feature and obtains the p-values and distance for each. The returned dictionary contains the next metrics:

  • p_vals: np.array of the p-values returned by the permutation test for each feature.
  • scores: np.array of the distances for each feature.
  • score: average of all the distances.
  • drift_detected: final decision if drift is detected. If any of the p-values of the features is lower than the threshold, then this is set to 1. Otherwise is 0. This will depend on the specified p_val and correction values.
  • threshold: final threshold applied to decide if drift is detected. It depends on the specified p_val and correction values.

Returns:

Type Description
dict

Dictionary with the drift metrics as specified.

Source code in mercury/monitoring/drift/histogram_distance_drift_detector.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def calculate_drift(self):
    """
    Computes drift by calculating histogram-based distances between the source dataset distributions `distr_src`
    and the target dataset distributions `distr_target`. It calculates the distance individually for each feature
    and obtains the p-values and distance for each.
    The returned dictionary contains the next metrics:

    - p_vals: np.array of the p-values returned by the permutation test for each feature.
    - scores: np.array of the distances for each feature.
    - score: average of all the distances.
    - drift_detected: final decision if drift is detected. If any of the p-values of the features is lower than
        the threshold, then this is set to 1. Otherwise is 0. This will depend on the specified `p_val` and
        `correction` values.
    - threshold: final threshold applied to decide if drift is detected. It depends on the specified
        `p_val` and `correction` values.

    Returns:
        (dict): Dictionary with the drift metrics as specified.
    """
    p_vals_estimated, distances = self._calculate_histogram_distance()
    drift_pred, threshold = self._apply_correction(p_vals_estimated)
    self.drift_metrics = {
        "p_vals": p_vals_estimated,
        "score": np.mean(distances),
        "scores": distances,
        "drift_detected": drift_pred,
        "threshold": threshold
    }
    return self.drift_metrics

ks_drift_detector

KSDrift(X_src=None, X_target=None, features=None, correction='bonferroni', p_val=0.05)

Bases: BaseBatchDriftDetector

This class is a drift detector that allows us to detect drift in a source dataset X_src and a target dataset X_target by calculating the Kolmogorov-Smirnov (KS) statistic. For each feature in the datasets, a Kolmogorov-Smirnov test is performed, which calculates the distance between the feature using the samples of each dataset. A p-value is also obtained when performing the tests, which are used to decide if exists drift in the datasets. The source and target datasets are specified when creating the object together with other available parameters. Then the method calculate_drift() is used to obtain a dict of metrics.

Parameters:

Name Type Description Default
X_src np.array

The source dataset.

None
X_target np.array

The target dataset.

None
features list

The name of the features of X_src and X_target.

None
p_val float

Threshold to be used with the p-value obtained to decide if there is drift or not (affects drift_detected metric when calling calculate_drift()). Lower values will be more conservative when indicating that drift exists in the dataset. Default value is 0.05.

0.05
correction string

Since multiple tests are performed, this parameter controls whether to perform a correction of the p-value or not (affects drift_detected metric when calling calculate_drift()). If None, it doesn't perform a correction Default value is "bonferroni", which makes the bonferroni correction taking into account the number of tests

'bonferroni'
Example
>>> from mercury.monitoring.drift.ks_drift_detector import KSDrift
# Create example datasets
>>> X_src = np.array([np.random.normal(0, 1, 1000), np.random.normal(0, 1, 1000)]).T
>>> X_target = np.array([np.random.normal(0, 1, 500) + 2, np.random.normal(0, 1, 500)]).T
# Calculate drift
>>> drift_detector = KSDrift(X_src, X_target, features=["f1", "f2"], p_val=0.01, correction="bonferroni")
>>> drift_metrics = drift_detector.calculate_drift()
# Print the metrics
>>> print("Drift Score: ", drift_metrics["score"])
>>> print("Is drift detected? ", drift_metrics["drift_detected"])
# Plot feature drift scoress
>>> ax = drift_detector.plot_feature_drift_scores(top_k=7)
# Get drifted features
>>> print(drift_detector.get_drifted_features())
# Plot distributions of drifted features
>>> drift_detector.plot_distribution_drifted_features()
Source code in mercury/monitoring/drift/ks_drift_detector.py
56
57
def __init__(self, X_src=None, X_target=None, features=None, correction="bonferroni", p_val=0.05):
    super().__init__(X_src=X_src, X_target=X_target, features=features, correction=correction, p_val=p_val)
calculate_drift()

Computes Drift using the Kolmogorov-Smirnov KS Test, which calculates the largest difference of the cumulative density functions of two variables. It performs the test individually for each feature and obtains the p-values and the statistics for each feature. The returned dictionary contains the next metrics:

  • p_vals (np.array): the p-values returned by the KS test for each feature.
  • scores (np.array): the distances (ks statistic) return by the KS test for each feature.
  • score: average of all the distances.
  • drift_detected: final decision if drift is detected. If any of the p-values of the features is lower than the threshold, then this is set to 1. Otherwise is 0. This will depend on the specified p_val and correction values.
  • threshold: final threshold applied to decide if drift is detected. It depends on the specified p_val and correction values.

Returns:

Type Description
dict

Dictionary with the drift metrics as specified.

Source code in mercury/monitoring/drift/ks_drift_detector.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def calculate_drift(self):
    """
    Computes Drift using the Kolmogorov-Smirnov KS Test, which calculates the largest difference of the cumulative
    density functions of two variables.
    It performs the test individually for each feature and obtains the p-values and the statistics for each feature.
    The returned dictionary contains the next metrics:

    - p_vals (np.array): the p-values returned by the KS test for each feature.
    - scores (np.array): the distances (ks statistic) return by the KS test for each feature.
    - score: average of all the distances.
    - drift_detected: final decision if drift is detected. If any of the p-values of the features is lower than
        the threshold, then this is set to 1. Otherwise is 0. This will depend on the specified `p_val` and
        `correction` values.
    - threshold: final threshold applied to decide if drift is detected. It depends on the specified
        `p_val` and `correction` values.

    Returns:
        (dict): Dictionary with the drift metrics as specified.

    """

    p_vals_test, distances = self._calculate_ks()
    drift_pred, threshold = self._apply_correction(p_vals_test)
    self.drift_metrics = {
        "p_vals": p_vals_test,
        "scores": distances,
        "score": np.mean(distances),
        "drift_detected": drift_pred,
        "threshold": threshold
    }
    return self.drift_metrics

metrics

bootstrap_eval_drift(y_true_src=None, y_pred_src=None, y_true_target=None, y_pred_target=None, eval_fn=None, resample_size_src=None, resample_size_target=None, num_resamples=50, drift_detector=None)

This function performs resampling with replacement for a source set and a target set to obtain a distribution of some evaluation metric for both sets. Then, applies a drift detector to calculate the drift of the metric between the source and target sets. More concretely, first performs resampling with replacement in the arrays y_true_src and y_pred_src and calculates a metric using the eval_fn function in order to obtain a metric for each resample. Thus, we obtain a distribution of a metric for the source set. Next, it performs the same procedure with the arrays y_true_target and y_pred_target, which gives a distribution of the metric for the target dataset. Finally, applies a drift detector (KSDrift if not specified) in order to calculate the drift between the source and target obtained distributions.

Parameters:

Name Type Description Default
y_true_src np.array

array which will be the first argument when calling eval_fn for the source dataset. It can contain the labels of the source dataset if eval_fn is defined to accept labels as the first argument.

None
y_pred_src np.array

array which will be the second argument when calling eval_fn for the source dataset. It can contain the source dataset model predictions if the eval_fn is defined to accept predictions as the second argument.

None
y_true_target np.array

array which will the first argument when calling eval_fn for the target dataset. It can contain the labels of the target dataset if eval_fn is defined to accept labels as the first argument.

None
y_pred_target np.array

array which will be the second argument when calling eval_fn for the target dataset. It can contain the target dataset model predictions if the eval_fn is defined to accept predictions as the second argument.

None
eval_fn Callable

function that must return a float. For the source dataset, receives a resamples of y_true_src as first parameter and resamples of y_pred as second parameter. Similarly, it uses resamples of y_true_target as first parameter and resamples of y_pred_target as second parameter for the target dataset. For example, it can be a function that calculates the accuracy given y_true and y_pred, or it can be a weighted average of different metrics.

None
resample_size_src int

size of each resample for the source dataset. If None, length of the source.

None
resample_size_target int

size of each resample for the target dataset. If None, length of the target.

None
num_resamples int

number of resamples

50
drift_detector

drift detector to use. If None it will create a KSDrift object

None

Returns:

Name Type Description
drift_metrics dict

dictionary with the obtained drift metrics

drift_detector BaseBatchDriftDetector

drift detector object used to calculate drift

dist_src np.array

obtained metrics distribution for the source dataset

dist_target np.array

obtained metrics distribution for the target dataset

Example
>>> y_true_src = np.array([0,0,0,0,0,1,1,1,1,1])
>>> y_pred_src = np.array([0,0,0,0,1,0,1,1,1,1])
>>> y_true_target = np.array([0,0,0,0,0,1,1,1,1,1])
>>> y_pred_target = np.array([0,1,1,1,1,0,0,0,0,1])
>>> def eval_acc(y_true, y_pred):
...     return np.sum(y_true == y_pred) / y_true.shape[0]

>>> drift_metrics, drift_detector, dist_src, dist_target = bootstrap_eval_drift(
...        y_true_src=y_true_src, y_pred_src=y_pred_src, y_true_target=y_true_target, y_pred_target=y_pred_target,
...        resample_size_src=8, resample_size_target=8, num_resamples=1000,
...        eval_fn=eval_acc, drift_detector=None
...    )
>>> drift_metrics["drift_detected]
True
>>> drift_metrics["score"]
0.943
Source code in mercury/monitoring/drift/metrics.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def bootstrap_eval_drift(
    y_true_src=None, y_pred_src=None, y_true_target=None, y_pred_target=None, eval_fn=None,
    resample_size_src=None, resample_size_target=None, num_resamples=50, drift_detector=None
):

    """
    This function performs resampling with replacement for a source set and a target set to obtain
    a distribution of some evaluation metric for both sets. Then, applies a drift detector to calculate
    the drift of the metric between the source and target sets. More concretely, first performs resampling
    with replacement in the arrays `y_true_src` and `y_pred_src` and calculates a metric using the
    `eval_fn` function in order to obtain a metric for each resample. Thus, we obtain a distribution of a
    metric for the source set. Next, it performs the same procedure with the arrays `y_true_target` and
    `y_pred_target`, which gives a distribution of the metric for the target dataset. Finally, applies
    a drift detector (KSDrift if not specified) in order to calculate the drift between the source and
    target obtained distributions.

    Args:
        y_true_src (np.array): array which will be the first argument when calling `eval_fn` for the source
            dataset. It can contain the labels of the source dataset if `eval_fn` is defined to accept
            labels as the first argument.
        y_pred_src (np.array): array which will be the second argument when calling `eval_fn` for the source
            dataset. It can contain the source dataset model predictions if the `eval_fn` is defined to
            accept predictions as the second argument.
        y_true_target (np.array): array which will the first argument when calling `eval_fn` for the target
            dataset. It can contain the labels of the target dataset if `eval_fn` is defined to accept
            labels as the first argument.
        y_pred_target (np.array): array which will be the second argument when calling `eval_fn` for the
            target dataset. It can contain the target dataset model predictions if the `eval_fn` is defined
            to accept predictions as the second argument.
        eval_fn (Callable): function that must return a float. For the source dataset, receives a resamples
            of `y_true_src` as first parameter and resamples of `y_pred` as second parameter. Similarly, it
            uses resamples of `y_true_target` as first parameter and resamples of `y_pred_target` as second
            parameter for the target dataset. For example, it can be a function that calculates the accuracy
            given `y_true` and `y_pred`, or it can be a weighted average of different metrics.
        resample_size_src (int): size of each resample for the source dataset. If None, length of the source.
        resample_size_target (int): size of each resample for the target dataset. If None, length of the target.
        num_resamples (int): number of resamples
        drift_detector: drift detector to use. If None it will create a KSDrift object

    Returns:
            drift_metrics (dict): dictionary with the obtained drift metrics
            drift_detector (BaseBatchDriftDetector): drift detector object used to calculate drift
            dist_src (np.array): obtained metrics distribution for the source dataset
            dist_target (np.array): obtained metrics distribution for the target dataset

    Example:
        ```python
        >>> y_true_src = np.array([0,0,0,0,0,1,1,1,1,1])
        >>> y_pred_src = np.array([0,0,0,0,1,0,1,1,1,1])
        >>> y_true_target = np.array([0,0,0,0,0,1,1,1,1,1])
        >>> y_pred_target = np.array([0,1,1,1,1,0,0,0,0,1])
        >>> def eval_acc(y_true, y_pred):
        ...     return np.sum(y_true == y_pred) / y_true.shape[0]

        >>> drift_metrics, drift_detector, dist_src, dist_target = bootstrap_eval_drift(
        ...        y_true_src=y_true_src, y_pred_src=y_pred_src, y_true_target=y_true_target, y_pred_target=y_pred_target,
        ...        resample_size_src=8, resample_size_target=8, num_resamples=1000,
        ...        eval_fn=eval_acc, drift_detector=None
        ...    )
        >>> drift_metrics["drift_detected]
        True
        >>> drift_metrics["score"]
        0.943
        ```
    """

    if resample_size_src is None:
        resample_size_src = len(y_true_src)
    if resample_size_target is None:
        resample_size_target = len(y_true_target)

    # Get distributions for source dataset
    dist_src = bootstrap_eval(y_true_src, y_pred_src, eval_fn, num_resamples, resample_size_src)

    # Get distributions for target dataset
    dist_target = bootstrap_eval(y_true_target, y_pred_target, eval_fn, num_resamples, resample_size_target)

    # Apply drift detector. If it is not passed, use ks drift
    if drift_detector is None:
        drift_detector = KSDrift(
            X_src=dist_src.reshape(-1, 1), X_target=dist_target.reshape(-1, 1), p_val=0.001, correction="bonferroni"
        )
    else:
        drift_detector.set_datasets(X_src=dist_src.reshape(-1, 1), X_target=dist_target.reshape(-1, 1))
    drift_metrics = drift_detector.calculate_drift()

    return drift_metrics, drift_detector, dist_src, dist_target

hellinger_distance(p, q, normalize=True)

Computes Hellinger distance between two histograms as specified in this paper: http://users.rowan.edu/~polikar/RESEARCH/PUBLICATIONS/cidue11.pdf Both histograms are represented by numpy arrays with the same number of dimensions, where each dimension represents the counts for that particular bin. It is assumed that the bin edges are the same.

Parameters:

Name Type Description Default
p np.array

First histogram. Each dimension represents one bin and bin edges are assumed to be the same as in q.

required
q np.array

Second histogram. Each dimension represents one bin and bin edges are assumed to be the same as in p.

required
normalize bool

Whether to normalize the histograms. If True the proportions of each bin are calculated first and then the distance is calculated with the proportions

True
Returns

(float): float representing the Hellinger distance

required
Source code in mercury/monitoring/drift/metrics.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def hellinger_distance(p, q, normalize=True):
    """
    Computes Hellinger distance between two histograms as specified in this paper:
    http://users.rowan.edu/~polikar/RESEARCH/PUBLICATIONS/cidue11.pdf
    Both histograms are represented by numpy arrays with the same number of dimensions,
    where each dimension represents the counts for that particular bin.
    It is assumed that the bin edges are the same.

    Args:
        p (np.array): First histogram. Each dimension represents one bin and bin edges are assumed to be the same as in q.
        q (np.array): Second histogram. Each dimension represents one bin and bin edges are assumed to be the same as in p.
        normalize (bool): Whether to normalize the histograms. If True the proportions of each bin are calculated first
            and then the distance is calculated with the proportions

        Returns:
            (float): float representing the Hellinger distance

    """

    if len(p) != len(q):
        raise ValueError("p and q must have the same size and represent the same bins")

    if normalize:
        p = p / np.sum(p)
        q = q / np.sum(q)

    distances = (np.sqrt(p) - np.sqrt(q)) ** 2
    return np.sqrt(np.sum(distances))

jeffreys_divergence(p, q, normalize=True)

Computes Jeffreys divergence between two histograms as specified in this paper: https://www.cs.cmu.edu/~efros/courses/LBMV07/Papers/rubner-jcviu-00.pdf Both histograms are represented by numpy arrays with the same number of dimensions, where each dimension represents the counts for that particular bin. It is assumed that the bin edges are the same.

Parameters:

Name Type Description Default
p np.array

First histogram. Each dimension represents one bin and bin edges are assumed to be the same as in q.

required
q np.array

Second histogram. Each dimension represents one bin and bin edges are assumed to be the same as in p.

required
normalize bool

Whether to normalize the histograms. If True the proportions of each bin are calculated first and then the distance is calculated with the proportions

True
Returns

(float): float representing the hellinger distance

required
Source code in mercury/monitoring/drift/metrics.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def jeffreys_divergence(p, q, normalize=True):

    """
    Computes Jeffreys divergence between two histograms as specified in this paper:
    https://www.cs.cmu.edu/~efros/courses/LBMV07/Papers/rubner-jcviu-00.pdf
    Both histograms are represented by numpy arrays with the same number of dimensions,
    where each dimension represents the counts for that particular bin.
    It is assumed that the bin edges are the same.

    Args:
        p (np.array): First histogram. Each dimension represents one bin and bin edges are assumed to be the same as in q.
        q (np.array): Second histogram. Each dimension represents one bin and bin edges are assumed to be the same as in p.
        normalize (bool): Whether to normalize the histograms. If True the proportions of each bin are calculated first
            and then the distance is calculated with the proportions

        Returns:
            (float): float representing the hellinger distance

    """

    if len(p) != len(q):
        raise ValueError("p and q must have the same size and represent the same bins")

    if normalize:
        p = p / np.sum(p)
        q = q / np.sum(q)

    m = (p + q) / 2
    distances = rel_entr(p, m) + rel_entr(q, m)
    return np.sum(distances)

psi(p, q, normalize=True, eps=0.0001)

Calculates the Population Stability Index (PSI). The metric helps to measure the stability between two population samples. It assumes that the two population samples have already been splitted in bins, so the histograms are the input to this function.

Parameters:

Name Type Description Default
p np.array

First histogram. Each dimension represents one bin and bin edges are assumed to be the same as in q.

required
q np.array

Second histogram. Each dimension represents one bin and bin edges are assumed to be the same as in p.

required
normalize bool

Whether to normalize the histograms. If True the proportions of each bin are calculated first and then the distance is calculated with the proportions

True

Returns:

Type Description
float

float representing the PSI

Example
>>> a = np.array([12, 11, 14, 12, 12, 10, 12, 6, 6, 5])
>>> b = np.array([11, 11, 12, 13, 11, 11, 13, 5, 7, 6])
>>> psi = psi(a, b)
Source code in mercury/monitoring/drift/metrics.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def psi(p, q, normalize=True, eps=1e-4):

    """
    Calculates the Population Stability Index (PSI). The metric helps to measure the stability between two population
    samples. It assumes that the two population samples have already been splitted in bins, so the histograms are the
    input to this function.

    Args:
        p (np.array): First histogram. Each dimension represents one bin and bin edges are assumed to be the same as in q.
        q (np.array): Second histogram. Each dimension represents one bin and bin edges are assumed to be the same as in p.
        normalize (bool): Whether to normalize the histograms. If True the proportions of each bin are calculated first
            and then the distance is calculated with the proportions

    Returns:
        (float): float representing the PSI

    Example:
        ```python
        >>> a = np.array([12, 11, 14, 12, 12, 10, 12, 6, 6, 5])
        >>> b = np.array([11, 11, 12, 13, 11, 11, 13, 5, 7, 6])
        >>> psi = psi(a, b)
        ```
    """

    if len(p.shape) != 1 or len(q.shape) != 1:
        raise ValueError("p and q must be np.array with len(shape)==1")

    if len(p) != len(q):
        raise ValueError("p and q must have the same size and represent the same bins")

    if normalize is None:
        if np.any(p > 1) or np.any(q > 1):
            normalize = True
        else:
            normalize = False

    if normalize:
        p = p / np.sum(p)
        q = q / np.sum(q)

    # Replace 0's to avoid inf and nans
    p[p == 0] = eps
    q[q == 0] = eps

    psi = (p - q) * np.log(p / q)
    return np.sum(psi)