Estimation

mercury.monitoring.estimation

performance_predictor

PerformancePredictor(model, metric_fn, corruptions=None, percentiles=None, performance_predictor=None, param_grid=None, K_cv=5, random_state=None, store_train_data=False)

This class allows us to estimate the performance of a model on an unlabeled dataset, for example to monitor performance on production data before the labels are available. The method is based on the paper Learning to Validate the Predictions of Black Box Classifiers on Unseen Data. In a nutshell, the steps of the method are:

1) Apply corruptions to a held-out (labeled) dataset.
2) Obtain percentiles of the model outputs and the performance of the model under each corruption.
3) Train a regressor to predict model performance. The training samples for this regressor are the percentiles and performances obtained in 2).
4) Use the trained regressor to estimate the performance on unlabeled serving data.
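
The four steps can be sketched end to end with scikit-learn alone. This is a minimal illustration under stated assumptions, not the library's implementation: Gaussian noise of increasing scale stands in for the drift generators, and the dataset, the LogisticRegression model and the helper `output_percentiles` are hypothetical.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
X, y = make_classification(n_samples=2000, n_features=5, random_state=0)
X_train, X_held, y_train, y_held = train_test_split(X, y, test_size=0.5, random_state=0)
model = LogisticRegression(max_iter=1000).fit(X_train, y_train)

percentiles = np.arange(0, 101, 5)


def output_percentiles(X_batch):
    # Percentiles of the positive-class probability summarise the output distribution
    return np.percentile(model.predict_proba(X_batch)[:, 1], q=percentiles)


# Steps 1 and 2: corrupt the held-out set (assumption: additive Gaussian noise)
# and record the output percentiles together with the true score
features, scores = [], []
for scale in np.linspace(0.0, 3.0, 30):
    X_corrupt = X_held + rng.normal(0.0, scale, size=X_held.shape)
    features.append(output_percentiles(X_corrupt))
    scores.append(accuracy_score(y_held, model.predict(X_corrupt)))

# Step 3: train the regressor that maps percentiles to performance
regressor = RandomForestRegressor(n_estimators=15, random_state=0).fit(features, scores)

# Step 4: estimate the performance on unlabeled serving data
X_serving = X_held + rng.normal(0.0, 1.5, size=X_held.shape)
estimated_accuracy = regressor.predict(output_percentiles(X_serving).reshape(1, -1))[0]
```

The estimate is only as good as the match between the simulated corruptions and the shift actually present in the serving data, which is the caveat the paper stresses.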

According to the paper, the method works well when: 1) the data undergoes covariate shift (changes in the input data distribution) and 2) we know in advance what kind of covariate shift to expect in the serving data. However, in our experiments we have found that in some situations the method still works when the data also suffers from label shift. At the same time, it is important to note that the method is not 100% accurate and cannot detect a performance drop in all cases.

Original paper: https://ssc.io/pdf/mod0077s.pdf

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model | BaseEstimator | The model whose performance we want to estimate. | required |
| metric_fn | Callable | Function that calculates the metric we want to estimate. It should accept the true labels as its first argument and the predictions as its second argument. For example, you can use functions from the sklearn.metrics module. | required |
| corruptions | List[Tuple] | Optional list of corruptions to apply to the dataset passed to the fit method. If specified, it must be a list of tuples where each tuple has two elements: 1) a string with the type of drift to apply, which can be any method available in the mercury.monitoring.drift.drift_simulation.BatchDriftGenerator class; 2) a dictionary with the arguments of that drift method. See the class tutorial or the BatchDriftGenerator documentation for more details. If not specified, the corruptions are created in the fit() method according to the drift detected. | None |
| percentiles | Union[List, array] | np.array or list with the percentiles of the model outputs to use as features in the regressor. By default, the percentiles are [0, 5, 10, ..., 95, 100]. | None |
| performance_predictor | BaseEstimator | (Unfitted) model to use as regressor. By default, a RandomForestRegressor with n_estimators=15. | None |
| param_grid | dict | Dictionary with the hyperparameter grid used in the grid search when training the regressor. By default, only the max_depth of the RandomForestRegressor is tuned. | None |
| K_cv | int | Number of folds to use in the grid-search cross-validation when training the regressor. | 5 |
| random_state | int | Random state to use in the RandomForestRegressor. | None |
| store_train_data | bool | Whether to store the data used to train the regressor in the attributes X_train_regressor and y_train_regressor. This can be useful for analysis when experimenting with the method. | False |

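
As an illustration of the `corruptions` format, the sketch below builds a list of `(drift_name, arguments)` tuples. The drift names and argument keys are the ones used by the helper methods listed on this page; the column names `age` and `income` and the validator `check_corruptions` are hypothetical and not part of the library.

```python
# Column names "age" and "income" are hypothetical
corruptions = [
    ("scale_drift", {"cols": ["age"], "mean": 1.5}),
    ("shift_drift", {"cols": ["income"], "force": 2.0, "noise": 0.5}),
    (
        "outliers_drift",
        {
            "cols": ["income"],
            "method": "percentile",
            "method_params": {"percentile": 0.95, "proportion": 0.25},
        },
    ),
]


def check_corruptions(corruptions):
    """Hypothetical helper: verify that every entry is a (str, dict) pair."""
    for entry in corruptions:
        if len(entry) != 2 or not isinstance(entry[0], str) or not isinstance(entry[1], dict):
            raise ValueError(f"Invalid corruption specification: {entry!r}")
    return True


check_corruptions(corruptions)
```

A list like this would be passed as the `corruptions` argument of the constructor.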
Example
>>> model.fit(X_train, y_train)
>>> from mercury.monitoring.estimation.performance_predictor import PerformancePredictor
>>> from sklearn.metrics import accuracy_score
>>> performance_predictor = PerformancePredictor(model, metric_fn=accuracy_score, random_state=42)
>>> performance_predictor.fit(X=df_test[features], y=df_test[label], X_serving=df_serving[features])
Source code in mercury/monitoring/estimation/performance_predictor.py
def __init__(
    self,
    model: "BaseEstimator",  # noqa: F821
    metric_fn: Callable,
    corruptions: List[Tuple] = None,
    percentiles: Union[List, np.array] = None,
    performance_predictor: "BaseEstimator" = None,  # noqa: F821
    param_grid: dict = None,
    K_cv: int = 5,
    random_state: int = None,
    store_train_data: bool = False
):
    """
    This class allows us to estimate the performance of a model on an unlabeled dataset, for example to monitor performance
    on production data before the labels are available. The method is based on the paper
    Learning to Validate the Predictions of Black Box Classifiers on Unseen Data. In a nutshell, the steps of the method are:

    1) Apply corruptions to a held-out (labeled) dataset
    2) Obtain percentiles of model outputs and the performance of the model when applying these corruptions
    3) Train a regressor model to predict model performance. The samples to train this regressor model are the percentiles and
        performances obtained in 2)
    4) Use the trained regressor to estimate the performance on serving unlabeled data

    According to the paper, the method works well when: 1) We have a situation of covariate shift (changes in input data distributions) and
    2) We know in advance what kind of covariate shift we can find in our serving data. However, in our experiments we have found
    that in some situations the method still works when the data also suffers from label shift.
    At the same time, it is important to mention that the method is not 100% accurate and cannot detect performance drop in all cases.

    Original paper:
    https://ssc.io/pdf/mod0077s.pdf

    Args:
        model: The model whose performance we want to estimate
        metric_fn: Function that calculates the metric that we want to estimate. The function should accept the true labels as
            first argument and the predictions as the second argument. For example, you can use functions from sklearn.metrics module.
        corruptions: Optional list of corruptions to apply in the dataset specified in `fit` method.
            If we specify them, we use a list of tuples where each tuple has two elements:

            1) A string with the type of drift to apply.
            2) A dictionary with the parameters of the drift to apply. For the first element you can use any method available in
            mercury.monitoring.drift.drift_simulation.BatchDriftGenerator class. In the second element, the parameters are the
            arguments of the drift function. You can see the tutorial of class or the BatchDriftGenerator documentation for more
            details. If not specified the corruptions will be added in the `fit()` method according to the drift detected.

        percentiles: np.array or list with percentiles to calculate in model outputs to be used as features in the regressor.
            By default, the calculated percentiles are [0, 5, 10, ..., 95, 100]
        performance_predictor: (unfitted) model to use as regressor. By default it will be a RandomForestRegressor with n_estimators=15
        param_grid: dictionary with the hyperparameters grid that will be used when doing a grid search when training the regressor.
            By default just the max_depth of the RandomForestRegressor is tuned.
        K_cv: Number of folds to use when doing the GridSearch cross-validation to train the regressor. By default 5 will be used
        random_state: random state to use in the RandomForestRegressor. By default is None.
        store_train_data: whether to store the data to train the regressor in the attributes `X_train_regressor` and
            `y_train_regressor`. This can be useful for analysis when performing some experiments of the method. By default is False.

    Example:
        ```python
        >>> model.fit(X_train, y_train)
        >>> from mercury.monitoring.estimation.performance_predictor import PerformancePredictor
        >>> from sklearn.metrics import accuracy_score
        >>> performance_predictor = PerformancePredictor(model, metric_fn=accuracy_score, random_state=42)
        >>> performance_predictor.fit(X=df_test[features], y=df_test[label], X_serving=df_serving[features])
        ```
    """
    self.model = model
    self.metric_fn = metric_fn
    self.corruptions = [] if corruptions is None else corruptions
    self.percentiles = np.arange(0, 101, 5) if percentiles is None else percentiles
    if performance_predictor is None:
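        # 'mae' was renamed to 'absolute_error' in scikit-learn 1.0; this matches the default param_grid below
        self.performance_predictor_unfitted = RandomForestRegressor(n_estimators=15, criterion='absolute_error', random_state=random_state)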
    else:
        self.performance_predictor_unfitted = performance_predictor
    self.param_grid = {'max_depth': np.arange(3, 16, 1), 'criterion': ['absolute_error']} if param_grid is None else param_grid
    self.K_cv = K_cv
    self.store_train_data = store_train_data
    self.performance_predictor = None
_apply_corruption(X, corruption_fn, corruption_args)

Applies the corruption corruption_fn to X, passing corruption_args as its arguments

Source code in mercury/monitoring/estimation/performance_predictor.py
def _apply_corruption(self, X, corruption_fn, corruption_args):
    """
    Applies the corruption `corruption_fn` to `X`, passing `corruption_args` as its arguments
    """

    corruption_generator = BatchDriftGenerator(X=X.copy(), schema=self.dataset_schema)
    corruption_gen_fun = getattr(corruption_generator, corruption_fn, None)
    if not callable(corruption_gen_fun):
        raise RuntimeError(
            "corruption_fn = %s must be a method of BatchDriftGenerator." % corruption_fn
        )
    corrupted_gen = corruption_gen_fun(**corruption_args)
    return corrupted_gen.data
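
The name-based dispatch used here can be sketched in isolation. `ToyDriftGenerator` below is a hypothetical stand-in, not the real BatchDriftGenerator; it only shows how `getattr` resolves a method from its name and why a non-callable result is rejected.

```python
class ToyDriftGenerator:
    """Hypothetical stand-in for BatchDriftGenerator; not the library class."""

    def __init__(self, data):
        self.data = list(data)

    def scale_drift(self, mean=1.0):
        # Multiply every value by `mean` and return the generator itself
        self.data = [value * mean for value in self.data]
        return self


def apply_corruption(data, corruption_fn, corruption_args):
    generator = ToyDriftGenerator(data)
    # Look the method up by name; fall back to None if it does not exist
    method = getattr(generator, corruption_fn, None)
    if not callable(method):
        raise RuntimeError(
            f"corruption_fn = {corruption_fn} must be a method of ToyDriftGenerator."
        )
    return method(**corruption_args).data


scaled = apply_corruption([1.0, 2.0], "scale_drift", {"mean": 2.0})
```

Checking `callable` rather than only the existence of the attribute also rejects names such as `data`, which exist on the object but are not methods.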
_create_corruptions_from_data_drift(X_source, X_target)

Creates corruptions by detecting drift between X_source and X_target

Source code in mercury/monitoring/estimation/performance_predictor.py
def _create_corruptions_from_data_drift(self, X_source, X_target):
    """
    Creates corruptions by detecting drift between `X_source` and `X_target`
    """

    corruptions = []

    # Numerical Features: Get Features with drift with KSDrift
    continous_feats = self.dataset_schema.continuous_feats + self.dataset_schema.discrete_feats
    if len(continous_feats) > 0:
        ks_drift = KSDrift(X_src=X_source[continous_feats].values, X_target=X_target[continous_feats].values, features=continous_feats)
        drift_result = ks_drift.calculate_drift()
        # Apply different kinds of drift for continuous features
        for feat in ks_drift.get_drifted_features():

            # Shift Drift
            corruptions.extend(self._create_shift_drift(X_source, X_target, feat))

            # Scale Drift
            corruptions.extend(self._create_scale_drift(feat))

            # Outliers Drift
            corruptions.extend(self._create_outliers_drift(feat))

        # Hyperplane Rotation Drift
        if len(ks_drift.get_drifted_features()) > 0:
            corruptions.extend(self._create_hyperplane_rotation_drift(ks_drift.get_drifted_features()))

    # Categorical Features: Chi-Square Drift
    cat_feats = self.dataset_schema.binary_feats + self.dataset_schema.categorical_feats
    if len(cat_feats) > 0:
        src_histograms, tgt_histograms = _get_histogram_categoricals(
            X_source[cat_feats], X_target[cat_feats], cat_feats
        )
        chi2_drift = Chi2Drift(
            distr_src=src_histograms,
            distr_target=tgt_histograms,
            features=cat_feats
        )
        drift_result = chi2_drift.calculate_drift()
        for feat in chi2_drift.get_drifted_features():

            # Recodification drift
            for i in range(min(X_source[feat].nunique(), 10)):
                corruptions.append(('recodification_drift', {'cols': [feat]}))

    return corruptions
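
The idea of selecting drifted numerical features with a Kolmogorov-Smirnov test can be sketched with SciPy. This is an assumption-laden illustration: `scipy.stats.ks_2samp` stands in for the library's KSDrift class, and the feature names, the helper `drifted_features` and the 0.01 threshold are hypothetical.

```python
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(0)
# Two hypothetical features: one with the same distribution, one shifted by 3
source = {"stable": rng.normal(0, 1, 1000), "shifted": rng.normal(0, 1, 1000)}
target = {"stable": rng.normal(0, 1, 1000), "shifted": rng.normal(3, 1, 1000)}


def drifted_features(source, target, p_value=0.01):
    # A feature is flagged when the two-sample KS test rejects equality
    return [
        name
        for name in source
        if ks_2samp(source[name], target[name]).pvalue < p_value
    ]


drifted = drifted_features(source, target)
```

Only the flagged features would then receive shift, scale and outlier corruptions, which keeps the simulated drift close to what the serving data actually shows.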
_create_hyperplane_rotation_drift(features, num_drifts=20)

Returns list with hyperplane drift specifications

Source code in mercury/monitoring/estimation/performance_predictor.py
def _create_hyperplane_rotation_drift(self, features, num_drifts=20):
    """
    Returns list with hyperplane drift specifications
    """
    hyperplane_drift_corruptions = []
    for force in np.linspace(0, 90, num=num_drifts):
        drift_args = {
            'cols': features,
            'force': force
        }
        hyperplane_drift_corruptions.append(('hyperplane_rotation_drift', drift_args))
    return hyperplane_drift_corruptions
_create_outliers_drift(feature)

Returns list with outliers drift specifications

Source code in mercury/monitoring/estimation/performance_predictor.py
def _create_outliers_drift(self, feature):
    """
    Returns list with outliers drift specifications
    """
    outliers_drift_corruptions = []
    for perc in [0.05, 0.95]:
        for proportion in [0.25, 0.5, 0.75]:
            drift_args = {
                'cols': [feature],
                'method': 'percentile',
                'method_params': {
                    'percentile': perc,
                    'proportion': proportion
                }
            }
            outliers_drift_corruptions.append(('outliers_drift', drift_args))
    return outliers_drift_corruptions
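
The nested loops produce one specification per (percentile, proportion) pair, six in total. A compact equivalent using `itertools.product` is sketched below; the function name `outlier_specs` and the column name `income` are hypothetical.

```python
from itertools import product


def outlier_specs(feature):
    """Hypothetical helper mirroring the nested loops with itertools.product."""
    return [
        (
            "outliers_drift",
            {
                "cols": [feature],
                "method": "percentile",
                "method_params": {"percentile": perc, "proportion": proportion},
            },
        )
        for perc, proportion in product([0.05, 0.95], [0.25, 0.5, 0.75])
    ]


specs = outlier_specs("income")
```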
_create_scale_drift(feature)

Returns list with scale drift specifications

Source code in mercury/monitoring/estimation/performance_predictor.py
def _create_scale_drift(self, feature):
    """
    Returns list with scale drift specifications
    """
    scale_drift_corruptions = []
    forces = [0.1, 0.5, 0.8, 1.2, 1.5, 2, 2.5, 3, 5, 10, 20, 100]
    for f in forces:
        drift_args = {
            'cols': [feature],
            'mean': f
        }
        scale_drift_corruptions.append(('scale_drift', drift_args))
    return scale_drift_corruptions
_create_shift_drift(X_source, X_target, feature)

Returns list with shift drift specifications

Source code in mercury/monitoring/estimation/performance_predictor.py
def _create_shift_drift(self, X_source, X_target, feature):
    """
    Returns list with shift drift specifications
    """
    shift_drift_corruptions = []

    diff_q95 = X_target[feature].quantile(0.95) - X_source[feature].median()
    diff_q05 = X_target[feature].quantile(0.05) - X_source[feature].median()
    forces_neg = np.linspace(diff_q05, 0, num=10)
    forces_pos = np.linspace(0, diff_q95, num=10)
    forces = list(forces_neg) + list(forces_pos)
    noises = [X_source[feature].std()] * len(forces)
    for force, noise in zip(forces, noises):
        drift_args = {
            'cols': [feature],
            'force': force,
            'noise': noise
        }
        shift_drift_corruptions.append(('shift_drift', drift_args))

    return shift_drift_corruptions
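
To make the force computation concrete, the sketch below reproduces it with NumPy on toy data. The helper `shift_forces` and the arrays are hypothetical; the source uses pandas Series methods, for which `np.quantile` and `np.median` are used here as equivalents.

```python
import numpy as np


def shift_forces(source_values, target_values, num=10):
    # Distances from the source median to the target's 5th and 95th percentiles
    median = np.median(source_values)
    diff_q05 = np.quantile(target_values, 0.05) - median
    diff_q95 = np.quantile(target_values, 0.95) - median
    forces_neg = np.linspace(diff_q05, 0, num=num)
    forces_pos = np.linspace(0, diff_q95, num=num)
    return list(forces_neg) + list(forces_pos)


source = np.arange(0.0, 101.0)    # median 50
target = np.arange(100.0, 201.0)  # 5th percentile 105, 95th percentile 195
forces = shift_forces(source, target)
```

In this toy case the target lies entirely above the source median, so both ranges are positive (55 down to 0, then 0 up to 145). The force 0 appears twice, once at the end of each range.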
_fit_performance_predictor(X, y)

Fit the performance predictor using the GridSearchCV

Source code in mercury/monitoring/estimation/performance_predictor.py
def _fit_performance_predictor(self, X, y):
    """
    Fit the performance predictor using the GridSearchCV
    """

    self.performance_predictor = GridSearchCV(
        self.performance_predictor_unfitted,
        param_grid=self.param_grid,
        cv=self.K_cv,
        scoring='neg_mean_absolute_error')\
        .fit(X, y)
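
A standalone sketch of the same grid search on synthetic data follows. The data and the reduced `max_depth` grid are assumptions chosen to keep the example fast; they are not the library defaults.

```python
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV

rng = np.random.default_rng(0)
# 60 synthetic samples with 21 features each, standing in for percentile vectors
X = rng.uniform(size=(60, 21))
y = X.mean(axis=1)

search = GridSearchCV(
    RandomForestRegressor(n_estimators=15, random_state=0),
    param_grid={"max_depth": [3, 5, 8]},
    cv=5,
    scoring="neg_mean_absolute_error",
).fit(X, y)

best_depth = search.best_params_["max_depth"]
prediction = search.predict(X[:1])
```

Each training sample corresponds to one corruption, and k-fold cross-validation needs at least as many samples as folds. This is presumably why `fit` adds scale-drift corruptions when the list has no more than `K_cv` entries.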
_generate_schema(X, dataset_schema, names_categorical=None)

Generates the dataset schema if not specified and stores it in the dataset_schema attribute

Source code in mercury/monitoring/estimation/performance_predictor.py
def _generate_schema(self, X, dataset_schema, names_categorical=None):
    """
    Generates the dataset schema if not specified and stores it in the `dataset_schema` attribute
    """
    if dataset_schema is not None:
        self.dataset_schema = dataset_schema
    elif names_categorical is not None:
        self.dataset_schema = DataSchema().generate_manual(
            dataframe=X,
            categ_columns=names_categorical,
            discrete_columns=[],
            binary_columns=[]
        )
    else:
        self.dataset_schema = DataSchema().generate(X, verbose=False)
_get_statistics_model_outputs(X)

Obtains percentiles of model outputs

Source code in mercury/monitoring/estimation/performance_predictor.py
def _get_statistics_model_outputs(self, X):
    """
    Obtains percentiles of model outputs
    """

    try:
        # Try predict_proba.
        y_hat = self.model.predict_proba(X)
    except AttributeError:
        # If predict_proba is not defined in model, do predict (eg. for regression and tf models)
        y_hat = self.model.predict(X)

    if len(y_hat.shape) == 2:
        # classification case
        if y_hat.shape[1] == 2:
            # Binary Classification. Just take percentiles of positive class
            statistics = np.percentile(y_hat[:, 1], q=self.percentiles)
        else:
            # Compute Percentiles for all classes
            l_percentiles = []
            for i in range(y_hat.shape[1]):
                l_percentiles.append(
                    np.percentile(y_hat[:, i], q=self.percentiles)
                )
            statistics = np.array(l_percentiles).flatten()
    else:
        statistics = np.percentile(y_hat, q=self.percentiles)

    return statistics
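
The three branches (binary probabilities, multiclass probabilities, one-dimensional outputs) yield feature vectors of different lengths. The NumPy sketch below illustrates this with a hypothetical helper `output_statistics` and synthetic outputs; it is a vectorised paraphrase, not the library code.

```python
import numpy as np


def output_statistics(y_hat, percentiles):
    y_hat = np.asarray(y_hat)
    if y_hat.ndim == 2 and y_hat.shape[1] == 2:
        # Binary classification: percentiles of the positive class only
        return np.percentile(y_hat[:, 1], q=percentiles)
    if y_hat.ndim == 2:
        # Multiclass: percentiles per class, concatenated class by class
        return np.percentile(y_hat, q=percentiles, axis=0).T.flatten()
    # One-dimensional outputs, for example regression predictions
    return np.percentile(y_hat, q=percentiles)


rng = np.random.default_rng(0)
percentiles = np.arange(0, 101, 5)

p = rng.uniform(size=100)
binary = output_statistics(np.column_stack([1 - p, p]), percentiles)
multiclass = output_statistics(rng.dirichlet(np.ones(3), size=100), percentiles)
regression = output_statistics(rng.normal(size=100), percentiles)
```

With the 21 default percentiles, the binary and one-dimensional cases give 21 features, while a three-class model gives 63.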
fit(X, y, dataset_schema=None, names_categorical=None, X_serving=None)

Fits the regressor to predict the performance using a dataset not used as training data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| X | DataFrame | Pandas dataframe with the inputs of our model. It should be a held-out dataset not used to train the model. | required |
| y | Union[DataFrame, array] | Labels corresponding to X. | required |
| dataset_schema | DataSchema | A DataSchema object. If not passed, it is created automatically. | None |
| names_categorical | list | List of categorical columns. Only used if dataset_schema is not specified, in which case these columns are treated as categorical. | None |
| X_serving | DataFrame | Optional dataframe with the serving data (without labels). If specified, drift between X and X_serving is detected and corruptions are added based on that drift. | None |

Source code in mercury/monitoring/estimation/performance_predictor.py
def fit(
    self,
    X: "pandas.DataFrame",  # noqa: F821
    y: Union["pandas.DataFrame", "np.array"],  # noqa: F821
    dataset_schema: "mercury.dataschema.DataSchema" = None,  # noqa: F821
    names_categorical: list = None,
    X_serving: "pandas.DataFrame" = None  # noqa: F821
):
    """
    Fits the regressor to predict the performance using a dataset not used as training data.

    Args:
        X: Pandas dataframe with the inputs of our model. It should be a held-out dataset not used to train the model
        y: corresponding labels of `X`
        dataset_schema: a DataSchema object. If not passed, it is created automatically
        names_categorical: list of categorical columns. Only used if `dataset_schema` is not specified. In that case, it
            will take this list as categorical columns
        X_serving: optional dataframe with the serving data (without labels). If specified, it will detect drift between
            `X` and `X_serving` and the corruptions will be added based on that drift.
    """

    # Generate Schema
    self._generate_schema(X, dataset_schema, names_categorical)

    # if X_serving is passed, then add new error generators based on data drift
    if X_serving is not None:
        self.corruptions.extend(self._create_corruptions_from_data_drift(X, X_serving))

    # if we have very few corruptions (at most K_cv), raise a warning and create scale drift
    if len(self.corruptions) <= self.K_cv:
        warnings.warn(
            "Very few corruptions have been specified or created from data drift. "
            "Scale drift will be added for all features individually."
        )
        continous_feats = self.dataset_schema.continuous_feats + self.dataset_schema.discrete_feats
        for f in continous_feats:
            self.corruptions.extend(self._create_scale_drift(feature=f))

    X_train_regressor = []
    y_train_regressor = []
    for corruption in self.corruptions:

        corruption_fn = corruption[0]
        corruption_args = corruption[1]
        # Apply drift generator
        X_corrupt = self._apply_corruption(X, corruption_fn, corruption_args)

        # Score on corrupted examples
        y_hat_corrupt = self.model.predict(X_corrupt)
        score_corrupt = self.metric_fn(y, y_hat_corrupt)

        # Statistics of model outputs (percentiles)
        statistics_outputs = self._get_statistics_model_outputs(X_corrupt)

        # Add data point to samples for performance predictor
        X_train_regressor.append(statistics_outputs)
        y_train_regressor.append(score_corrupt)

    # Train performance predictor regressor
    self._fit_performance_predictor(X_train_regressor, y_train_regressor)

    # Store performance predictor training data if specified
    if self.store_train_data:
        self.X_train_regressor = X_train_regressor
        self.y_train_regressor = y_train_regressor

    return self
predict(X_serving)

Returns the estimated performance on X_serving

Source code in mercury/monitoring/estimation/performance_predictor.py
def predict(self, X_serving):
    """
    Returns the estimated performance on `X_serving`
    """

    # Statistics Model Outputs
    statistics_outputs = self._get_statistics_model_outputs(X_serving)

    # Predict Score on Serving Data
    predicted_score = self.performance_predictor.predict(statistics_outputs.reshape(1, -1))
    return predicted_score