Drift
mercury.monitoring.drift
auto_encoder_drift_detector
AutoEncoderDriftDetector(p_val=0.05, bootstrap_samples=30, bootstrap_sample_size=10000, autoencoder_kwargs=None, custom_reconstruction_error='mse', fitted_models=None, reference_errors=None)
This class uses several autoencoders for testing whether the joint distributions of two datasets are the same. In order to do this, it makes bootstrap samples from a source dataset (X_src), trains an autoencoder on each one of them (using training/test splits) and it stores the distribution of reconstruction errors (i.e. the error distribution reconstructing the source dataset). The default reconstruction error is the MSE, although you can specify your own.
Then, when target data is available (X_target), it calculates the error distribution on the new environment and compares it with the one obtained on X_src by using a Mann-Withney-U statistical test under the null hypothesis (H0) that no drift exists (that is, the two error samples come from the same distribution).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
p_val |
float
|
Significance level for rejecting the null hypothesis. Default value is 0.05. |
0.05
|
bootstrap_samples |
int
|
How many bootstrap samples to make |
30
|
bootstrap_sample_size |
int
|
The size of each bootstrap sample. |
10000
|
autoencoder_kwargs |
dict
|
Dictionary with arguments for training the internal |
None
|
custom_reconstruction_error |
callable or string
|
Custom function for measuring reconstruction quality. This is only used for the
drift calculation, not for training the autoencoders (use autoencoder_kwargs for that). This
can be a string in (mse, rmse, mae), meaning (Mean Squared Error, Root Mean Squared Error,
Mean Absolute Loss error). Alternatively, you can also provide a custom function with the form
|
'mse'
|
fitted_models |
list
|
List with trained autoencoders (tf.keras.Model). Use it only if you want to "reload" an already trained detector. |
None
|
reference_errors |
List[float]
|
List with the errors obtained at the time of training the detector. Use it only if you want to "reload" an already trained detector. |
None
|
Example
>>> from mercury.monitoring.drift.auto_encoder_drift_detector import AutoEncoderDriftDetector
# Create example datasets
>>> X_src = np.array([np.random.normal(0, 1, 1000), np.random.normal(0, 1, 1000)]).T
>>> X_target = np.array([np.random.normal(0, 1, 500) + 2, np.random.normal(0, 1, 500)]).T
# Train the detector on the source dataset
>>> drift_detector = AutoEncoderDriftDetector()
>>> drift_detector = drift_detector.fit(X_src)
# Use the detector for checking whether drift exists on X_target
>>> drift_detector.calculate_drift(X_target)
"{'pval': 7.645836962858183e-06, 'drift': True}"
Source code in mercury/monitoring/drift/auto_encoder_drift_detector.py
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
|
is_fitted: bool
property
Whether the detector has been fitted and it's ready for calculating drift
calculate_drift(X, return_errors=False)
Checks whether drift exists on a given dataset using X_src as reference:
Parameters:
Name | Type | Description | Default |
---|---|---|---|
X |
np.ndarray
|
the target dataset. |
required |
return_errors |
bool
|
Whether to include the reconstruction errors in the result. |
False
|
Returns:
Type | Description |
---|---|
dict
|
Dictionary containing the result of the test and, optionally, the reference |
dict
|
and target reconstruction errors. |
Source code in mercury/monitoring/drift/auto_encoder_drift_detector.py
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
|
fit(X_src)
Trains the detector using a source (reference) dataset.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
X_src |
np.ndarray
|
the reference dataset. |
required |
Returns:
Type | Description |
---|---|
AutoEncoderDriftDetector
|
the detector itself. |
Source code in mercury/monitoring/drift/auto_encoder_drift_detector.py
197 198 199 200 201 202 203 204 205 206 207 |
|
base
BaseBatchDriftDetector(X_src=None, X_target=None, distr_src=None, distr_target=None, features=None, correction='bonferroni', p_val=0.05)
Bases: ABC
Base class for batch drift detection.
The class can be extended to create a method for batch drift detection over a source dataset X_src
and target
dataset X_target
, or a source dataset distributions distr_src
and a target dataset distributions distr_target
The main method to implement is the calculate_drift()
method.
Source code in mercury/monitoring/drift/base.py
17 18 19 20 21 22 23 24 25 26 27 |
|
calculate_drift()
abstractmethod
Method to implement in extended classes
Source code in mercury/monitoring/drift/base.py
82 83 84 85 86 87 |
|
get_drifted_features(return_as_indices=False)
Gets the features that are considered to have drift. The method calculate_drift
needs to be called first.
The parameter return_as_indices
controls whether to return the features as indices or names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
return_as_indices |
boolean
|
whether to return the features as indices or names. |
False
|
Returns:
Type | Description |
---|---|
list
|
features with drift. |
Source code in mercury/monitoring/drift/base.py
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 |
|
plot_distribution_drifted_features(figsize=(8, 4), kde=True, stat='density', common_bins=True, common_norm=False, **kwargs)
Plots the distributions of the features that are considered to have drift. The method calculate_drift
needs to be called first.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
figsize |
tuple
|
figsize to use if |
(8, 4)
|
kde |
bool
|
whether to show the smoothed distribution. Default is True |
True
|
stat |
str
|
Aggregate statistic to compute in each bin. By default is 'density'. Valid values can be checked in seaborn documentation |
'density'
|
common_bins |
bool
|
If True, use the same bins for the target and source dataset. By default is True |
True
|
common_norm |
bool
|
If True and using a normalized statistic, the normalization will apply over the full dataset (src and target). Otherwise, the normalization is applied to each set independently. Default is False. |
False
|
kwargs |
Extra key arguments that can be passed to seaborn |
{}
|
Source code in mercury/monitoring/drift/base.py
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
|
plot_distribution_feature(idx_feature=None, name_feature=None, ax=None, figsize=(8, 4), kde=True, stat='density', common_bins=True, common_norm=False, **kwargs)
Plots the distribution of a given feature for the source and the target datasets.
The feature can be indicated by the index using the idx_feature
parameter, or by the name using the
name_feature
parameter (only possible if attribute features
was specified.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idx_feature |
int
|
index of the feature |
None
|
name_feature |
string
|
name of the feature |
None
|
ax |
matplotlib.axes._subplots.AxesSubplot
|
Axes object on which the data will be plotted. If not specified it creates one. |
None
|
figsize |
tuple
|
figsize to use if |
(8, 4)
|
kde |
bool
|
whether to show the smoothed distribution. Default is True |
True
|
stat |
str
|
Aggregate statistic to compute in each bin. By default is 'density'. Valid values can be checked in seaborn documentation |
'density'
|
common_bins |
bool
|
If True, use the same bins for the target and source dataset. By default is True |
True
|
common_norm |
bool
|
If True and using a normalized statistic, the normalization will apply over the full dataset (src and target). Otherwise, the normalization is applied to each set independently. Default is False. |
False
|
kwargs |
Extra key arguments that can be passed to seaborn |
{}
|
Returns:
Type | Description |
---|---|
The axes |
Source code in mercury/monitoring/drift/base.py
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
|
plot_feature_drift_scores(top_k=None, ax=None, figsize=(8, 4))
Plots a summary of the scores for each feature. The meaning of the score depends on the subclass that is used.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top_k |
int
|
If specified, plots only the top_k features with higher scores. |
None
|
ax |
matplotlib.axes._subplots.AxesSubplot
|
Axes object on which the data will be plotted. If not specified it creates one. |
None
|
figsize |
tuple
|
Size of the plotted figure |
(8, 4)
|
Source code in mercury/monitoring/drift/base.py
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 |
|
plot_histograms_drifted_features(figsize=(8, 4))
Plots the histograms of the features that are considered to have drift. The method calculate_drift
needs to be called first.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
figsize |
tuple
|
figsize to use if |
(8, 4)
|
Source code in mercury/monitoring/drift/base.py
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
|
plot_histograms_feature(idx_feature=None, name_feature=None, figsize=(8, 4))
Plots the histograms of a given feature for the source and the target distributions.
The feature can be indicated by the index using the idx_feature
parameter, or by the name using the
name_feature
parameter (only possible if attribute features
was specified.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idx_feature |
int
|
index of the feature |
None
|
name_feature |
string
|
name of the feature |
None
|
figsize |
tuple
|
figsize to use if |
(8, 4)
|
Returns:
Type | Description |
---|---|
The axes |
Source code in mercury/monitoring/drift/base.py
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
|
chi2_drift_detector
Chi2Drift(distr_src=None, distr_target=None, features=None, correction='bonferroni', p_val=0.05)
Bases: BaseBatchDriftDetector
The Chi2Drift class allows us to detect drift in the source dataset distributions distr_src
and the
target dataset distributions distr_target
by performing a chi-square test of independence of variables in a
contingency table for each feature. It does it separately for each feature, calculating the statistic and the
p-value. The source and target distributions are specified when creating the object. Then, the method
calculate_drift()
is used to obtain a dict of metrics.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
distr_src |
list of np.arrays
|
The source dataset distributions. It's a list of np.arrays, where each np.array represents the counts of
each bin for that feature. The features and its bins must be in the same order than the |
None
|
distr_target |
list of np.arrays
|
The target dataset distributions. It's a list of np.arrays, where each np.array represents the counts of
each bin for that feature. The features and its bins must be in the same order than the |
None
|
features |
list
|
The name of the features of |
None
|
p_val |
float
|
Threshold to be used with the p-value obtained to decide if there is drift or not (affects
|
0.05
|
correction |
string
|
Since multiple tests are performed, this parameter controls whether to perform a correction of the p-value
or not (affects |
'bonferroni'
|
Example
>>> from mercury.monitoring.drift.chi2_drift_detector import Chi2Drift
# Create example distributions
>>> histograms_src = [np.array([5, 5, 5]), np.array([3, 6, 9])]
>>> histograms_target = [np.array([4, 1, 8]), np.array([20, 3, 2])]
# Calculate drift
>>> drift_detector = Chi2Drift(
... distr_src=histograms_src, distr_target=histograms_target, features=["f1", "f2"],
... correction="bonferroni", p_val=0.01)
>>> drift_metrics = drift_detector.calculate_drift()
# Print the metrics
>>> print("Drift Score: ", drift_metrics["score"])
>>> print("Is drift detected? ", drift_metrics["drift_detected"])
# Plot feature drift scores
>>> ax = drift_detector.plot_feature_drift_scores(top_k=7)
# Get drifted features
>>> print(drift_detector.get_drifted_features())
# Plot histograms of drifted features
>>> drift_detector.plot_histograms_drifted_features()
Source code in mercury/monitoring/drift/chi2_drift_detector.py
59 60 61 62 |
|
calculate_drift()
Computes drift by performing a chi-square test of independence between the source dataset distributions
distr_src
and the target dataset distributions distr_target
. It performs the test individually for each
feature and obtains the p-values and the value of the statistic for each.
The returned dictionary contains the next metrics:
- p_vals: np.array of the p-values returned by the Chi2 test for each feature.
- scores: np.array of the chi2 statistics for each feature.
- score: average of all the scores.
- drift_detected: final decision if drift is detected. If any of the p-values of the features is lower than
the threshold, then this is set to 1. Otherwise is 0. This will depend on the specified
p_val
andcorrection
values. - threshold: final threshold applied to decide if drift is detected. It depends on the specified
p_val
andcorrection
values.
Returns:
Type | Description |
---|---|
dict
|
Dictionary with the drift metrics as specified. |
Source code in mercury/monitoring/drift/chi2_drift_detector.py
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
|
density_drift_detector
DensityDriftDetector(embedding_dim=2, vae=None, encoder=None, decoder=None)
This detector calculates the "probability" of a particular sample of being an anomaly (or presenting drift) with respect to a source reference dataset.
This detector works using a VAE for building embeddings of the input data. Then, with those source embeddings, the density is estimated in such a way that zones in the embeddings space with lots of samples have high density whereas zones with low density represent "less" probable samples.
After having trained the detector it can be used for checking whether a particular (new) sample is an anomaly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
embedding_dim |
int
|
embedding dimensionality for the VAE (if no |
2
|
vae |
tf.keras.Model
|
Optional VAE architecture |
None
|
encoder |
tf.keras.Model
|
encoder part from |
None
|
decoder |
tf.keras.Model
|
decoder part from |
None
|
Example
>>> from mercury.monitoring.drift.density_drift_detector import DensityDriftDetector
>>> detector = DensityDriftDetector().fit(X_source)
Source code in mercury/monitoring/drift/density_drift_detector.py
35 36 37 38 39 40 41 42 43 44 45 46 47 |
|
explain(x, ref_point=None)
Explains what features have to change in a certain sample (or batch of samples)
x so it becomes a ref_point
in the embedding space. You can use this mehod when
you have "anomalies" and what to check what feeatures need to change for the point
to be an inlier.
If ref_point
is None, the considered reference point will be the point in the
embedding space with highest density (i.e. the most normal one).
The returned explanation will be an array with dim(explanation) = dim(x) in which each item points the direction and amount each feature needs to change (also known as delta).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x |
np.ndarray
|
datapoint (or batch of datapoints) to explain |
required |
ref_point |
Optional[np.ndarray]
|
Reference point coords in the embedding space |
None
|
Returns:
Type | Description |
---|---|
np.ndarray
|
numpy array(s) with explanation(s) per item. |
Source code in mercury/monitoring/drift/density_drift_detector.py
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
|
fit(source, epochs=None, batch_size=128)
Fits the model
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
Union[np.ndarray, pd.DataFrame]
|
Reference dataset |
required |
epochs |
int
|
Number of epochs to train the model. If None, early stopping will be used. |
None
|
batch_size |
int
|
batch_size |
128
|
Returns:
Type | Description |
---|---|
self, detector trained |
Source code in mercury/monitoring/drift/density_drift_detector.py
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
|
predict(X)
Predicts the "probability" of each sample in X being an inlier. That is, its similarity to the reference points seen during training.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
X |
np.ndarray
|
dataset |
required |
Returns:
Type | Description |
---|---|
np.ndarray
|
numpy array with densities |
Source code in mercury/monitoring/drift/density_drift_detector.py
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
|
predict_embeddings(target)
Gets the embeddings predicted by the VAE's encoder.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target |
dataset |
required |
Returns:
Type | Description |
---|---|
np.ndarray
|
numpy array with embeddings |
Source code in mercury/monitoring/drift/density_drift_detector.py
139 140 141 142 143 144 145 146 147 148 149 150 151 |
|
domain_classifier_drift_detector
DomainClassifierDrift(X_src=None, X_target=None, features=None, p_val=0.05, test_size=0.3, n_runs=10, alpha=0.025, **kwargs)
Bases: BaseBatchDriftDetector
This class is a drift detector that allows us to train a classifier (Random Forest) to distinguish between a
source dataset X_src
and a target dataset X_target
. The source and target datasets are specified when creating
the object together with other available parameters. Then the method calculate_drift()
is used to obtain a dict
of metrics.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
X_src |
np.array
|
The source dataset. |
None
|
X_target |
np.array
|
The target dataset. |
None
|
features |
list
|
The name of the features of |
None
|
p_val |
float
|
Threshold to be used with the p-value obtained to decide if there is drift or not (affects
|
0.05
|
test_size |
float
|
When training the domain classifier, this parameter indicates the size of the test set of the domain
classifier. Since the metric |
0.3
|
n_runs |
int
|
This is the number of times that the domain classifier is trained with different splits. The returned metrics correspond to the averages of different runs. A higher number of runs will produce more accurate metrics, but it will be computationally more expensive. |
10
|
alpha |
float
|
This parameter impacts directly to the false drift detection rate. Concretely, it specifies how much better the AUC of the domain classifier has to be than 0.5 (as a random classifier) to consider that successfully distinguishes samples from the source dataset and target dataset. Increasing the value will be more conservative when detecting drift, so it will lower the false positive detection rate. However, higher values also can miss true drift detection. Default value is 0.025 which in general a good trade-off. |
0.025
|
**kwargs |
The rest of the parameters will be used in the Random forest trained as domain classifier. If not specified, then the default parameters will be used. |
{}
|
Example
>>> from mercury.monitoring.drift.domain_classifier_drift import DomainClassifierDrift
# Create example datasets
>>> X_src = np.array([np.random.normal(0, 1, 1000), np.random.normal(0, 1, 1000)]).T
>>> X_target = np.array([np.random.normal(0, 1, 500) + 2, np.random.normal(0, 1, 500)]).T
# Calculate drift
>>> drift_detector = DomainClassifierDrift(
... X_src, X_target, features=["f1", "f2"], p_val=0.01, test_size=0.3, n_runs=10)
>>> drift_metrics = drift_detector.calculate_drift()
# Print the metrics
>>> print("Drift Score: ", drift_metrics["score"])
>>> print("p_val: ", drift_metrics["p_val"])
>>> print("Is drift detected? ", drift_metrics["drift_detected"])
# Get drift scores per feature
>>> drift_detector.plot_feature_drift_scores(figsize=(8,4))
Source code in mercury/monitoring/drift/domain_classifier_drift_detector.py
67 68 69 70 71 72 73 |
|
calculate_drift(return_drift_score_target=False)
Calculates the drift metrics. The returned drift metrics are in a dictionary with the next keys:
- drift_detected: indicates if drift is detected or not, comparing the indicated
p_val
and the obtained p-value. The obtained p-value is based on a binomial test that takes into account the number of times that the AUC of the domain classifier was higher than random guesses. - drift_score: this is the average AUC obtained by the domain classifier over the different runs.
If the parameter return_drift_score_target
is True, then it also returns a dataframe with a drift score for
each sample in the target dataset. This score is the predicted probability by the domain classifier and can
help to detect the drifted samples.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
return_drift_score_target |
boolean
|
Indicates if additionally return a dataframe with a drift score for each sample in the target dataset |
False
|
Returns:
Type | Description |
---|---|
dict
|
Dictionary with the drift metrics. If parameter |
returns a dataframe with the drift scores for the target dataset. |
Source code in mercury/monitoring/drift/domain_classifier_drift_detector.py
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
|
drift_simulation
BatchDriftGenerator(X, schema=None)
This is a simulator for applying synthetic perturbations to a given dataset, so real life drift situations can be generated.
It allows generating several typical drift-like perturbations. In particular, it has the
following ones implemented
1) Shift drift: Each one of the numerical features will be shifted by an amount N. With this,
you will be able to simulate situations where a particular feature increases of decreases.
2) Recodification drift: This operations simulates the situation where a particular feature
changes its codification (due to some change in a data ingestion pipeline). For example, imagine
a feature codified as [A, B, C] and, for some reason, category "C" becomes "A" and vice versa.
3) Missing value drift: Artificially increases the percentage of NaNs on a given feature or
list of features.
4) Hyperplane rotation drift: This method operates by modifying a set of features at once but
attempting to preserve the relationships they have. In other terms, it tries to change the
joint distribution of N continuous variables at once.
5) Outliers drift: This method artificially introduces outliers in the specified columns. You
can use one of the existing methods to inject outliers or create your own method. Check the
method documentation for outliers_drift
for further documentation
Parameters:
Name | Type | Description | Default |
---|---|---|---|
X |
Union[pd.DataFrame, np.ndarray]
|
Dataset features |
required |
names_categorical |
An optional list with the names of the categorical variables. If given, |
required |
Source code in mercury/monitoring/drift/drift_simulation.py
39 40 41 42 43 44 45 46 47 48 |
|
__get_rotation_matrix(n, deg)
Returns a rotation matrix for a R^n space.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n |
int
|
Dimensionality of the space |
required |
deg |
float
|
amount of rotation to be applied. In degrees. |
required |
Returns:
Type | Description |
---|---|
np.ndarray
|
nxn dimensional rotation matrix |
Source code in mercury/monitoring/drift/drift_simulation.py
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 |
|
__remap_categorical(df, colname)
Randomly shuffles the categories of a particular column
Source code in mercury/monitoring/drift/drift_simulation.py
243 244 245 246 247 248 249 250 |
|
hyperplane_rotation_drift(cols=None, force=10.0)
Adds drift to several features at once by rotating them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cols |
List[str]
|
columns which will be jointly rotated. |
None
|
force |
float
|
rotation amount in degrees (0 or 360 = no transformation) |
10.0
|
Returns:
Type | Description |
---|---|
BatchDriftGenerator
|
An updated version of the simulator. |
Source code in mercury/monitoring/drift/drift_simulation.py
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
|
missing_val_drift(cols=None, percent=0.1)
Randomly injects NaN values in a column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cols |
List[str]
|
Columns where this operation will be performed |
None
|
percent |
Union[float, List[float]]
|
Percentage of values which will become Null values. |
0.1
|
Returns:
Type | Description |
---|---|
BatchDriftGenerator
|
An updated version of the simulator. |
Source code in mercury/monitoring/drift/drift_simulation.py
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
|
outliers_drift(cols=None, method='percentile', method_params=None)
Injects outliers in cols
using the method specified in method
. Parameters of the method can
be specified using method_params
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cols |
List[str]
|
List of columns names to generate outliers. |
None
|
method |
Union[str, Callable]
|
string of the method to generate to use or custom function to generate outliers. If
using string, the current available method are 'percentile' and 'value'. If using 'percentile'
values in the dataframe are replaced by a percentile of that column. The percentile to use is
specified in key 'percentile' in |
'percentile'
|
method_params |
dict
|
dictionary containing the parameters to use in |
None
|
Returns:
Type | Description |
---|---|
BatchDriftGenerator
|
An updated version of the simulator. |
Source code in mercury/monitoring/drift/drift_simulation.py
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
|
recodification_drift(cols=None)
Adds drift by randomly remapping the categories of cols. For example, if you have a column with domain: ["A", "B", "C"], then, this operation would swap all the "A" with "C" (and vice versa).
Note the swaps are random. The same remapping will not be repeated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cols |
List[str]
|
List of columns names to be recoded |
None
|
Returns:
Type | Description |
---|---|
BatchDriftGenerator
|
An updated version of the simulator. |
Source code in mercury/monitoring/drift/drift_simulation.py
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
|
scale_drift(cols=None, mean=1, sd=0.05, iqr=None)
Adds drift to one or several features by multiplying then by some random factor (normally close to 1).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cols |
List[str]
|
List of columns names where the operation will be performed |
None
|
mean |
The mean value of the random scale (if iqr is not given). Default is 1. |
1
|
|
sd |
The standard deviation of the random scale (if iqr is not given). Default is 0.05. |
0.05
|
|
iqr |
An alternative way to define the random scale is by giving the interquartile range as a list of 2 elements. E.g. [1, 1.2] |
None
|
Returns:
Type | Description |
---|---|
BatchDriftGenerator
|
An updated version of the simulator. |
Source code in mercury/monitoring/drift/drift_simulation.py
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
|
shift_drift(cols=None, force=5.0, noise=0.0)
Adds drift to one or several features by adding or substracting a force
quantity. You could also add random noise to these shifts via the noise
parameter. The shifted amount will follow a gaussian distribution with mean
force
and std noise
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cols |
List[str]
|
List of columns names where the operation will be performed |
None
|
force |
Amount shift each column will receive |
5.0
|
|
noise |
Amount of standard deviation the shift of each column will receive. |
0.0
|
Returns:
Type | Description |
---|---|
BatchDriftGenerator
|
An updated version of the simulator. |
Source code in mercury/monitoring/drift/drift_simulation.py
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
|
histogram_distance_drift_detector
HistogramDistanceDrift(distr_src=None, distr_target=None, features=None, correction=None, p_val=None, n_permutations=50, distance_metric='hellinger')
Bases: BaseBatchDriftDetector
The HistogramDistanceDrift class allows us to detect drift in the source dataset distributions distr_src
and the
target dataset distributions distr_target
by calculating distances between the distributions. For each feature,
the distance between its distributions in distr_src
and distr_target
is calculated. The distance function to
use is indicated by the distance_metric
parameter. For each feature, a p-value is also calculated. This is
calculated by doing a permutation test, where data from distr_src
and distr_target
are put together and sampled
to calculate a new simulated distance n_permutations
times. The p-value then is obtained by computing
the fraction of simulated distances equal or greater than the observed distance. The source and target distributions
are specified when creating the object. Then, the method calculate_drift()
is used to obtain a dict of metrics.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
distr_src |
list of np.arrays
|
The source dataset distributions. It's a list of np.arrays, where each np.array represents the counts of
each bin for that feature. The features and its bins must be in the same order than the |
None
|
distr_target |
list of np.arrays
|
The target dataset distributions. It's a list of np.arrays, where each np.array represents the counts of
each bin for that feature. The features and its bins must be in the same order than the |
None
|
features |
list
|
The name of the features of |
None
|
distance_metric |
string
|
The distance metric to use. Available metrics are "hellinger", "jeffreys" or "psi". Default value is "hellinger" |
'hellinger'
|
p_val |
float
|
Threshold to be used with the p-value obtained to decide if there is drift or not (affects
|
None
|
correction |
string
|
Since multiple tests are performed, this parameter controls whether to perform a correction of the p-value
or not (affects |
None
|
n_permutations |
int
|
The number of permutations to perform to calculate the p-value. A higher number will calculate a more accurate p-value, but it will be computationally more expensive. Default value is 50 |
50
|
Example
>>> from mercury.monitoring.drift.histogram_distance_drift_detector import HistogramDistanceDrift
# Create example distributions
>>> histograms_src = [np.array([5, 5, 5]), np.array([3, 6, 9])]
>>> histograms_target = [np.array([4, 1, 8]), np.array([20, 3, 2])]
# Calculate drift
>>> drift_detector = HistogramDistanceDrift(
... distr_src=histograms_src, distr_target=histograms_target, features=["f1", "f2"],
... distance_metric="hellinger", correction="bonferroni", p_val=0.01, n_permutations=100)
>>> drift_metrics = drift_detector.calculate_drift()
# Print the metrics
>>> print("Drift Score: ", drift_metrics["score"])
>>> print("Is drift detected? ", drift_metrics["drift_detected"])
# Plot feature drift scores
>>> ax = drift_detector.plot_feature_drift_scores(top_k=7)
# Get drifted features
>>> print(drift_detector.get_drifted_features())
# Plot histograms of drifted features
>>> drift_detector.plot_histograms_drifted_features()
Source code in mercury/monitoring/drift/histogram_distance_drift_detector.py
69 70 71 72 73 74 75 76 77 |
|
calculate_drift()
Computes drift by calculating histogram-based distances between the source dataset distributions distr_src
and the target dataset distributions distr_target
. It calculates the distance individually for each feature
and obtains the p-values and distance for each.
The returned dictionary contains the next metrics:
- p_vals: np.array of the p-values returned by the permutation test for each feature.
- scores: np.array of the distances for each feature.
- score: average of all the distances.
- drift_detected: final decision if drift is detected. If any of the p-values of the features is lower than
the threshold, then this is set to 1. Otherwise is 0. This will depend on the specified
p_val
andcorrection
values. - threshold: final threshold applied to decide if drift is detected. It depends on the specified
p_val
andcorrection
values.
Returns:
Type | Description |
---|---|
dict
|
Dictionary with the drift metrics as specified. |
Source code in mercury/monitoring/drift/histogram_distance_drift_detector.py
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
|
ks_drift_detector
KSDrift(X_src=None, X_target=None, features=None, correction='bonferroni', p_val=0.05)
Bases: BaseBatchDriftDetector
This class is a drift detector that allows us to detect drift in a source dataset X_src
and a target dataset
X_target
by calculating the Kolmogorov-Smirnov (KS) statistic. For each feature in the datasets, a
Kolmogorov-Smirnov test is performed, which calculates the distance between the feature using the samples of each
dataset. A p-value is also obtained when performing the tests, which are used to decide if exists drift in the
datasets. The source and target datasets are specified when creating the object together with other available
parameters. Then the method calculate_drift()
is used to obtain a dict of metrics.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
X_src |
np.array
|
The source dataset. |
None
|
X_target |
np.array
|
The target dataset. |
None
|
features |
list
|
The name of the features of |
None
|
p_val |
float
|
Threshold to be used with the p-value obtained to decide if there is drift or not (affects
|
0.05
|
correction |
string
|
Since multiple tests are performed, this parameter controls whether to perform a correction of the p-value
or not (affects |
'bonferroni'
|
Example
>>> from mercury.monitoring.drift.ks_drift_detector import KSDrift
# Create example datasets
>>> X_src = np.array([np.random.normal(0, 1, 1000), np.random.normal(0, 1, 1000)]).T
>>> X_target = np.array([np.random.normal(0, 1, 500) + 2, np.random.normal(0, 1, 500)]).T
# Calculate drift
>>> drift_detector = KSDrift(X_src, X_target, features=["f1", "f2"], p_val=0.01, correction="bonferroni")
>>> drift_metrics = drift_detector.calculate_drift()
# Print the metrics
>>> print("Drift Score: ", drift_metrics["score"])
>>> print("Is drift detected? ", drift_metrics["drift_detected"])
# Plot feature drift scoress
>>> ax = drift_detector.plot_feature_drift_scores(top_k=7)
# Get drifted features
>>> print(drift_detector.get_drifted_features())
# Plot distributions of drifted features
>>> drift_detector.plot_distribution_drifted_features()
Source code in mercury/monitoring/drift/ks_drift_detector.py
56 57 |
|
calculate_drift()
Computes Drift using the Kolmogorov-Smirnov KS Test, which calculates the largest difference of the cumulative density functions of two variables. It performs the test individually for each feature and obtains the p-values and the statistics for each feature. The returned dictionary contains the next metrics:
- p_vals (np.array): the p-values returned by the KS test for each feature.
- scores (np.array): the distances (ks statistic) return by the KS test for each feature.
- score: average of all the distances.
- drift_detected: final decision if drift is detected. If any of the p-values of the features is lower than
the threshold, then this is set to 1. Otherwise is 0. This will depend on the specified
p_val
andcorrection
values. - threshold: final threshold applied to decide if drift is detected. It depends on the specified
p_val
andcorrection
values.
Returns:
Type | Description |
---|---|
dict
|
Dictionary with the drift metrics as specified. |
Source code in mercury/monitoring/drift/ks_drift_detector.py
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
|
metrics
bootstrap_eval_drift(y_true_src=None, y_pred_src=None, y_true_target=None, y_pred_target=None, eval_fn=None, resample_size_src=None, resample_size_target=None, num_resamples=50, drift_detector=None)
This function performs resampling with replacement for a source set and a target set to obtain
a distribution of some evaluation metric for both sets. Then, applies a drift detector to calculate
the drift of the metric between the source and target sets. More concretely, first performs resampling
with replacement in the arrays y_true_src
and y_pred_src
and calculates a metric using the
eval_fn
function in order to obtain a metric for each resample. Thus, we obtain a distribution of a
metric for the source set. Next, it performs the same procedure with the arrays y_true_target
and
y_pred_target
, which gives a distribution of the metric for the target dataset. Finally, applies
a drift detector (KSDrift if not specified) in order to calculate the drift between the source and
target obtained distributions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
y_true_src |
np.array
|
array which will be the first argument when calling |
None
|
y_pred_src |
np.array
|
array which will be the second argument when calling |
None
|
y_true_target |
np.array
|
array which will the first argument when calling |
None
|
y_pred_target |
np.array
|
array which will be the second argument when calling |
None
|
eval_fn |
Callable
|
function that must return a float. For the source dataset, receives a resamples
of |
None
|
resample_size_src |
int
|
size of each resample for the source dataset. If None, length of the source. |
None
|
resample_size_target |
int
|
size of each resample for the target dataset. If None, length of the target. |
None
|
num_resamples |
int
|
number of resamples |
50
|
drift_detector |
drift detector to use. If None it will create a KSDrift object |
None
|
Returns:
Name | Type | Description |
---|---|---|
drift_metrics |
dict
|
dictionary with the obtained drift metrics |
drift_detector |
BaseBatchDriftDetector
|
drift detector object used to calculate drift |
dist_src |
np.array
|
obtained metrics distribution for the source dataset |
dist_target |
np.array
|
obtained metrics distribution for the target dataset |
Example
>>> y_true_src = np.array([0,0,0,0,0,1,1,1,1,1])
>>> y_pred_src = np.array([0,0,0,0,1,0,1,1,1,1])
>>> y_true_target = np.array([0,0,0,0,0,1,1,1,1,1])
>>> y_pred_target = np.array([0,1,1,1,1,0,0,0,0,1])
>>> def eval_acc(y_true, y_pred):
... return np.sum(y_true == y_pred) / y_true.shape[0]
>>> drift_metrics, drift_detector, dist_src, dist_target = bootstrap_eval_drift(
... y_true_src=y_true_src, y_pred_src=y_pred_src, y_true_target=y_true_target, y_pred_target=y_pred_target,
... resample_size_src=8, resample_size_target=8, num_resamples=1000,
... eval_fn=eval_acc, drift_detector=None
... )
>>> drift_metrics["drift_detected]
True
>>> drift_metrics["score"]
0.943
Source code in mercury/monitoring/drift/metrics.py
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
|
hellinger_distance(p, q, normalize=True)
Computes Hellinger distance between two histograms as specified in this paper: http://users.rowan.edu/~polikar/RESEARCH/PUBLICATIONS/cidue11.pdf Both histograms are represented by numpy arrays with the same number of dimensions, where each dimension represents the counts for that particular bin. It is assumed that the bin edges are the same.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
p |
np.array
|
First histogram. Each dimension represents one bin and bin edges are assumed to be the same as in q. |
required |
q |
np.array
|
Second histogram. Each dimension represents one bin and bin edges are assumed to be the same as in p. |
required |
normalize |
bool
|
Whether to normalize the histograms. If True the proportions of each bin are calculated first and then the distance is calculated with the proportions |
True
|
Returns |
(float): float representing the Hellinger distance |
required |
Source code in mercury/monitoring/drift/metrics.py
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
|
jeffreys_divergence(p, q, normalize=True)
Computes Jeffreys divergence between two histograms as specified in this paper: https://www.cs.cmu.edu/~efros/courses/LBMV07/Papers/rubner-jcviu-00.pdf Both histograms are represented by numpy arrays with the same number of dimensions, where each dimension represents the counts for that particular bin. It is assumed that the bin edges are the same.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
p |
np.array
|
First histogram. Each dimension represents one bin and bin edges are assumed to be the same as in q. |
required |
q |
np.array
|
Second histogram. Each dimension represents one bin and bin edges are assumed to be the same as in p. |
required |
normalize |
bool
|
Whether to normalize the histograms. If True the proportions of each bin are calculated first and then the distance is calculated with the proportions |
True
|
Returns |
(float): float representing the hellinger distance |
required |
Source code in mercury/monitoring/drift/metrics.py
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
|
psi(p, q, normalize=True, eps=0.0001)
Calculates the Population Stability Index (PSI). The metric helps to measure the stability between two population samples. It assumes that the two population samples have already been splitted in bins, so the histograms are the input to this function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
p |
np.array
|
First histogram. Each dimension represents one bin and bin edges are assumed to be the same as in q. |
required |
q |
np.array
|
Second histogram. Each dimension represents one bin and bin edges are assumed to be the same as in p. |
required |
normalize |
bool
|
Whether to normalize the histograms. If True the proportions of each bin are calculated first and then the distance is calculated with the proportions |
True
|
Returns:
Type | Description |
---|---|
float
|
float representing the PSI |
Example
>>> a = np.array([12, 11, 14, 12, 12, 10, 12, 6, 6, 5])
>>> b = np.array([11, 11, 12, 13, 11, 11, 13, 5, 7, 6])
>>> psi = psi(a, b)
Source code in mercury/monitoring/drift/metrics.py
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
|