
mercury.explainability.explainers

run_until_timeout(timeout, fn, *args, **kwargs)

Timeout wrapper that stops the execution of `fn` in case it takes longer than `timeout` seconds: after `timeout` seconds it raises an exception.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `timeout` |  | Number of seconds until the exception is raised. | required |
| `fn` |  | Function to execute. | required |
| `*args` |  | Positional arguments passed to `fn`. | `()` |
| `**kwargs` |  | Keyword arguments passed to `fn`. | `{}` |
Example

explanation = run_until_timeout(timeout, explainer.explain, data=data)
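
A fuller sketch of handling the timeout; `slow_explain` is a hypothetical stand-in for any long-running call. Note that the mechanism relies on `SIGALRM`, so it is POSIX-only and must run in the main thread:

try:
    explanation = run_until_timeout(5, slow_explain, data=data)
except Exception:
    explanation = None  # aborted: the call took longer than 5 seconds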

Source code in mercury/explainability/explainers/__init__.py
def run_until_timeout(timeout, fn, *args, **kwargs):
    """
    Timeout function to stop the execution in case it takes longer than the timeout argument.
    After timeout seconds it will raise an exception.

    Args:
        timeout: Number of seconds until the Exception is raised.
        fn: Function to execute.
        *args: args of the function fn.
        **kwargs: keyword args passed to fn.

    Example:
        >>> explanation = run_until_timeout(timeout, explainer.explain, data=data)

    """
    def signal_handler(signum, frame):
        raise Exception('Timeout: explanation took too long...')

    if timeout > 0:
        signal.signal(signal.SIGALRM, signal_handler)
        signal.alarm(timeout)
    else:
        signal.alarm(0)
    return fn(*args, **kwargs)

ale

ALEExplainer(predictor, target_names)

Bases: Explainer, MercuryExplainer

Accumulated Local Effects (ALE) for tabular datasets. The current implementation supports first-order feature effects for numerical features.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `predictor` | `Callable` | A callable that takes an NxF array as input and outputs an NxT array (N: number of data points, F: number of features, T: number of outputs/targets, e.g. 1 for single-output regression, >=2 for classification). | required |
| `target_names` | `Union[List[str], str]` | A list of target/output names used for displaying results. | required |
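
Example

A minimal usage sketch; `model` and `df` are hypothetical stand-ins for a fitted classifier exposing `predict_proba` and a pandas DataFrame of numerical features:

>>> explainer = ALEExplainer(model.predict_proba, target_names=['no', 'yes'])
>>> explanation = explainer.explain(df)
>>> axes = plot_ale(explanation)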
Source code in mercury/explainability/explainers/ale.py
def __init__(self, predictor: TP.Callable, target_names: TP.Union[TP.List[str], str]) -> None:
    super().__init__(meta=copy.deepcopy(DEFAULT_META_ALE))

    if (not isinstance(target_names, list) and
            not isinstance(target_names, str)):
        raise AttributeError('The attribute target_names should be a string or list.')

    if type(target_names) == str:
        target_names = [target_names]

    self.predictor = predictor
    self.target_names = target_names
explain(X, min_bin_points=4, ignore_features=[])

Calculate the ALE curves for each feature with respect to the dataset X.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `X` | `DataFrame` | An NxF tabular dataset used to calculate the ALE curves. This is typically the training dataset or a representative sample. | required |
| `min_bin_points` | `int` | Minimum number of points each discretized interval should contain, to ensure a more precise ALE estimation. | `4` |
| `ignore_features` | `list` | Features to ignore while computing the ALE curves. Useful for reducing computation time if there are predictors we don't care about. | `[]` |

Returns:

| Type | Description |
|------|-------------|
| `Explanation` | An `Explanation` object containing the data and the metadata of the calculated ALE curves. |

Source code in mercury/explainability/explainers/ale.py
def explain(self, X: pd.DataFrame, min_bin_points: int = 4, ignore_features: list = []) -> Explanation:
    """
    Calculate the ALE curves for each feature with respect to the dataset `X`.

    Args:
        X:
            An NxF tabular dataset used to calculate the ALE curves. This is typically the training dataset
            or a representative sample.
        min_bin_points:
            Minimum number of points each discretized interval should contain to ensure more precise
            ALE estimation.
        ignore_features:
            Features that will be ignored while computing the ALE curves. Useful for reducing computing time
            if there are predictors we don't care about.

    Returns:
        An `Explanation` object containing the data and the metadata of the calculated ALE curves.

    """
    self.meta['params'].update(min_bin_points=min_bin_points)

    if not isinstance(X, pd.DataFrame):
        raise ValueError('X must be a pandas DataFrame')

    features = list(X.columns)
    n_features = len(features)

    self.feature_names = np.array(features)
    self.target_names = np.array(self.target_names)

    feature_values = []
    ale_values = []
    ale0 = []
    feature_deciles = []

    X = X[features].values

    # TODO: use joblib to parallelise?
    for feature, feat_name in enumerate(self.feature_names):
        if feat_name not in ignore_features:
            q, ale, a0 = ale_num(
                self.predictor,
                X=X,
                feature=feature,
                min_bin_points=min_bin_points
            )
            deciles = get_quantiles(X[:, feature], num_quantiles=11)

            feature_values.append(q)
            ale_values.append(ale)
            ale0.append(a0)
            feature_deciles.append(deciles)

    constant_value = self.predictor(X).mean()
    # TODO: an ALE plot ideally requires a rugplot to gauge density of instances in the feature space.
    # I've replaced this with feature deciles which is coarser but has constant space complexity
    # as opposed to a rugplot. Alternatively, could consider subsampling to produce a rug with some
    # maximum number of points.
    return self._build_explanation(
        ale_values=ale_values,
        ale0=ale0,
        constant_value=constant_value,
        feature_values=feature_values,
        feature_deciles=feature_deciles,
        ignore_features=ignore_features
    )
save(filename)

Overwrite to ensure that we use MercuryExplainer.save

Source code in mercury/explainability/explainers/ale.py
def save(self, filename):
    """Overwrite to ensure that we use MercuryExplainer.save"""
    MercuryExplainer.save(self, filename=filename)

plot_ale(exp, features='all', targets='all', n_cols=3, sharey='all', constant=False, ax=None, line_kw=None, fig_kw=None)

Plot ALE curves on matplotlib axes.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `exp` | `Explanation` | An `Explanation` object produced by a call to the `ALE.explain` method. | required |
| `features` | `Union[List[Union[int, str]], str]` | A list of features for which to plot the ALE curves, or `'all'` for all features. Can be a mix of integers denoting feature index and strings denoting entries in `exp.feature_names`. Defaults to 'all'. | `'all'` |
| `targets` | `Union[List[Union[int, str]], str]` | A list of targets for which to plot the ALE curves, or `'all'` for all targets. Can be a mix of integers denoting target index and strings denoting entries in `exp.target_names`. Defaults to 'all'. | `'all'` |
| `n_cols` | `int` | Number of columns to organize the resulting plot into. | `3` |
| `sharey` | `str` | Specifies whether the y-axis of the ALE curves should be on the same scale for several features. Possible values are `'all'`, `'row'`, `None`. | `'all'` |
| `constant` | `bool` | Specifies whether the constant zeroth-order effects should be added to the first-order ALE effects. | `False` |
| `ax` | `Union[Axes, ndarray, None]` | A matplotlib axes object or a numpy array of matplotlib axes to plot on. | `None` |
| `line_kw` | `Optional[dict]` | Keyword arguments passed to the `plt.plot` function. | `None` |
| `fig_kw` | `Optional[dict]` | Keyword arguments passed to the `fig.set` function. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `ndarray` | An array of matplotlib axes with the resulting ALE plots. |
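
Example

A hedged sketch; `explanation` is assumed to come from `ALEExplainer.explain` as in the example above, and the feature names are hypothetical:

>>> axes = plot_ale(explanation, features=['age', 'income'], n_cols=2, sharey='row')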

Source code in mercury/explainability/explainers/ale.py
@no_type_check
def plot_ale(exp: Explanation,
             features: TP.Union[TP.List[TP.Union[int, str]], str] = 'all',
             targets: TP.Union[TP.List[TP.Union[int, str]], str] = 'all',
             n_cols: int = 3,
             sharey: str = 'all',
             constant: bool = False,
             ax: TP.Union['plt.Axes', np.ndarray, None] = None,
             line_kw: TP.Optional[dict] = None,
             fig_kw: TP.Optional[dict] = None) -> 'np.ndarray':
    """
    Plot ALE curves on matplotlib axes.

    Args:
        exp: An `Explanation` object produced by a call to the `ALE.explain` method.
        features: A list of features for which to plot the ALE curves or `all` for all features.
            Can be a mix of integers denoting feature index or strings denoting entries in
            `exp.feature_names`. Defaults to 'all'.
        targets: A list of targets for which to plot the ALE curves or `all` for all targets.
            Can be a mix of integers denoting target index or strings denoting entries in
            `exp.target_names`. Defaults to 'all'.
        n_cols: Number of columns to organize the resulting plot into.
        sharey: A parameter specifying whether the y-axis of the ALE curves should be on the same scale
            for several features. Possible values are `all`, `row`, `None`.
        constant: A parameter specifying whether the constant zeroth order effects should be added to the
            ALE first order effects.
        ax: A `matplotlib` axes object or a numpy array of `matplotlib` axes to plot on.
        line_kw: Keyword arguments passed to the `plt.plot` function.
        fig_kw: Keyword arguments passed to the `fig.set` function.

    Returns:
        An array of matplotlib axes with the resulting ALE plots.

    """
    # line_kw and fig_kw values
    default_line_kw = {'markersize': 3, 'marker': 'o', 'label': None}
    if line_kw is None:
        line_kw = {}
    line_kw = {**default_line_kw, **line_kw}

    default_fig_kw = {'tight_layout': 'tight'}
    if fig_kw is None:
        fig_kw = {}
    fig_kw = {**default_fig_kw, **fig_kw}

    if features == 'all':
        selected_features = range(0, len(exp.feature_names))
    else:
        selected_features = []
        for ix, f in enumerate(features):
            if isinstance(f, str):
                try:
                    selected_features.append(exp.feature_names.index(f))
                except ValueError:
                    raise ValueError("Feature name {} does not exist.".format(f))
            elif isinstance(f, int):
                selected_features.append(f)
    n_features = len(selected_features)

    if targets == 'all':
        targets = range(0, len(exp.target_names))
    else:
        for ix, t in enumerate(targets):
            if isinstance(t, str):
                try:
                    t = np.argwhere(exp.target_names == t).item()
                except ValueError:
                    raise ValueError("Target name {} does not exist.".format(t))
            targets[ix] = t

    # make axes
    if ax is None:
        fig, ax = plt.subplots()

    if isinstance(ax, plt.Axes) and n_features != 1:
        ax.set_axis_off()  # treat passed axis as a canvas for subplots
        fig = ax.figure
        n_cols = min(n_cols, n_features)
        n_rows = math.ceil(n_features / n_cols)

        axes = np.empty((n_rows, n_cols), dtype=np.object_)
        axes_ravel = axes.ravel()
        # gs = GridSpecFromSubplotSpec(n_rows, n_cols, subplot_spec=ax.get_subplotspec())
        gs = GridSpec(n_rows, n_cols)
        for i, spec in zip(range(n_features), gs):
            # determine which y-axes should be shared
            if sharey == 'all':
                cond = i != 0
            elif sharey == 'row':
                cond = i % n_cols != 0
            else:
                cond = False

            if cond:
                axes_ravel[i] = fig.add_subplot(spec, sharey=axes_ravel[i - 1])
                continue
            axes_ravel[i] = fig.add_subplot(spec)

    else:  # array-like
        if isinstance(ax, plt.Axes):
            ax = np.array(ax)
        if ax.size < n_features:
            raise ValueError("Expected ax to have {} axes, got {}".format(n_features, ax.size))
        axes = np.atleast_2d(ax)
        axes_ravel = axes.ravel()
        fig = axes_ravel[0].figure

    # make plots
    for ix, feature, ax_ravel in \
            zip(count(), selected_features, axes_ravel):
        _ = _plot_one_ale_num(exp=exp,
                              feature=feature,
                              targets=targets,
                              constant=constant,
                              ax=ax_ravel,
                              legend=not ix,  # only one legend
                              line_kw=line_kw)

    # if explicit labels passed, handle the legend here as the axis passed might be repeated
    if line_kw['label'] is not None:
        axes_ravel[0].legend()

    fig.set(**fig_kw)
    # TODO: should we return just axes or ax + axes
    return axes

anchors

AnchorsWithImportanceExplainer(predict_fn, train_data, categorical_names={}, disc_perc=(25, 50, 75), *args, **kwargs)

Bases: AnchorTabular, MercuryExplainer

Extending Alibi's AnchorTabular implementation, this module allows the computation of feature importance by calculating several anchors. The constructor initializes the anchor tabular explainer.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `predict_fn` | `Callable` | Model prediction function. | required |
| `train_data` | `DataFrame` | Pandas DataFrame with the features. | required |
| `categorical_names` | `Dict[str, List]` | Dictionary where keys are feature columns and values are the categories for the feature. | `{}` |
| `disc_perc` | `Tuple[Union[int, float], ...]` | List or tuple with percentiles (int) used for discretization. | `(25, 50, 75)` |

Raises:

| Type | Description |
|------|-------------|
| `AttributeError` | If `categorical_names` is not a dict. |
| `AttributeError` | If `train_data` is not a `pd.DataFrame`. |

Example
>>> explain_data = pd.read_csv('./test/explain_data.csv')
>>> model = MyModel() # (Trained) model prediction function (has to be callable)
>>> explainer = AnchorsWithImportanceExplainer(model, explain_data)
>>> explanation = explainer.explain(explain_data.head(10).values) # For the first 10 samples
>>> explanation.interpret_explanations(n_important_features=2)
# We can also get the feature importances for the first 10 samples.
>>> explainer.get_feature_importance(explain_data=explain_data.head(10))
Source code in mercury/explainability/explainers/anchors.py
def __init__(
    self,
    predict_fn: TP.Callable,
    train_data: pd.DataFrame,
    categorical_names: TP.Dict[str, TP.List] = {},
    disc_perc: TP.Tuple[TP.Union[int, float], ...] = (25, 50, 75),
    *args, **kwargs
) -> None:
    if not isinstance(categorical_names, dict):
        raise AttributeError("""
            The attribute categorical_names should be a dictionary
            where the keys are the categorical feature names and the
            values are the categories for each categorical feature.
        """)

    if not isinstance(train_data, pd.DataFrame):
        raise AttributeError("""
            train_data should be a pandas DataFrame.
        """)

    super().__init__(predict_fn, list(train_data.columns), categorical_names)
    self.categorical_names = categorical_names

    super().fit(
        train_data=train_data.values,
        disc_perc=disc_perc,
        *args, **kwargs
    )
explain(X, threshold=0.95, delta=0.1, tau=0.15, batch_size=100, coverage_samples=10000, beam_size=1, stop_on_first=False, max_anchor_size=None, min_samples_start=100, n_covered_ex=10, binary_cache_size=10000, cache_margin=1000, verbose=False, verbose_every=1, **kwargs)

Explain prediction made by classifier on instance X.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `X` | `ndarray` | Instance to be explained. | required |
| `threshold` | `float` | Minimum precision threshold. | `0.95` |
| `delta` | `float` | Used to compute `beta`. | `0.1` |
| `tau` | `float` | Margin between lower confidence bound and minimum precision or upper bound. | `0.15` |
| `batch_size` | `int` | Batch size used for sampling. | `100` |
| `coverage_samples` | `int` | Number of samples used to estimate coverage during result search. | `10000` |
| `beam_size` | `int` | The number of anchors extended at each step of new anchor construction. | `1` |
| `stop_on_first` | `bool` | If True, the beam search algorithm will return the first anchor that satisfies the probability constraint. | `False` |
| `max_anchor_size` | `Optional[int]` | Maximum number of features in the result. | `None` |
| `min_samples_start` | `int` | Minimum number of initial samples. | `100` |
| `n_covered_ex` | `int` | How many examples where anchors apply to store for each anchor sampled during search (examples where the prediction on samples agrees/disagrees with `desired_label` are both stored). | `10` |
| `binary_cache_size` | `int` | The result search pre-allocates `binary_cache_size` batches for storing the binary arrays returned during sampling. | `10000` |
| `cache_margin` | `int` | When only `max(cache_margin, batch_size)` positions in the binary cache remain empty, a new cache of the same size is pre-allocated to continue buffering samples. | `1000` |
| `verbose` | `bool` | Display updates during the anchor search iterations. | `False` |
| `verbose_every` | `int` | Frequency of displayed iterations during the anchor search process. | `1` |

Returns:

| Type | Description |
|------|-------------|
| `Explanation` | An `Explanation` object containing the result explaining the instance, with additional metadata as attributes. See usage at the [AnchorTabular examples](https://docs.seldon.io/projects/alibi/en/latest/methods/Anchors.html) for details. |
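
Example

A minimal sketch, reusing `explainer` and `explain_data` from the class-level example above; the `anchor` attribute follows Alibi's `Explanation` API:

>>> explanation = explainer.explain(explain_data.iloc[0].values, threshold=0.9)
>>> explanation.anchor  # list of predicate strings defining the anchor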

Source code in mercury/explainability/explainers/anchors.py
def explain(self,
            X: np.ndarray,
            threshold: float = 0.95,
            delta: float = 0.1,
            tau: float = 0.15,
            batch_size: int = 100,
            coverage_samples: int = 10000,
            beam_size: int = 1,
            stop_on_first: bool = False,
            max_anchor_size: TP.Optional[int] = None,
            min_samples_start: int = 100,
            n_covered_ex: int = 10,
            binary_cache_size: int = 10000,
            cache_margin: int = 1000,
            verbose: bool = False,
            verbose_every: int = 1,
            **kwargs: TP.Any) -> Explanation:
    """
    Explain prediction made by classifier on instance `X`.

    Args:
        X: Instance to be explained.
        threshold: Minimum precision threshold.
        delta: Used to compute `beta`.
        tau: Margin between lower confidence bound and minimum precision or upper bound.
        batch_size: Batch size used for sampling.
        coverage_samples: Number of samples used to estimate coverage from during result search.
        beam_size: The number of anchors extended at each step of new anchors construction.
            stop_on_first: If ``True``, the beam search algorithm will return the
                            first anchor that satisfies the probability constraint.
        max_anchor_size: Maximum number of features in result.
        min_samples_start: Min number of initial samples.
        n_covered_ex: How many examples where anchors apply to store for each anchor sampled during search
                        (both examples where prediction on samples agrees/disagrees with `desired_label` are stored).
        binary_cache_size: The result search pre-allocates `binary_cache_size` batches for storing the binary arrays
                            returned during sampling.
        cache_margin: When only ``max(cache_margin, batch_size)`` positions in the binary cache remain empty, a new cache
                        of the same size is pre-allocated to continue buffering samples.
        verbose: Display updates during the anchor search iterations.
        verbose_every: Frequency of displayed iterations during anchor search process.

    Returns:
        explanation
            `Explanation` object containing the result explaining the instance with additional metadata as attributes.
            See usage at `AnchorTabular examples`_ for details.
            .. _AnchorTabular examples:
                https://docs.seldon.io/projects/alibi/en/latest/methods/Anchors.html
    """
    exp = super().explain(
        X=X,
        threshold=threshold,
        delta=delta,
        tau=tau,
        batch_size=batch_size,
        coverage_samples=coverage_samples,
        beam_size=beam_size,
        stop_on_first=stop_on_first,
        max_anchor_size=max_anchor_size,
        min_samples_start=min_samples_start,
        n_covered_ex=n_covered_ex,
        binary_cache_size=binary_cache_size,
        cache_margin=cache_margin,
        verbose=verbose,
        verbose_every=verbose_every
    )

    # This attribute makes pickle serialization crash, so we delete it.
    if hasattr(self, "mab"):
        delattr(self, "mab")

    return exp
get_feature_importance(explain_data, threshold=0.95, print_every=0, print_explanations=False, n_important_features=3, tau=0.15, timeout=0)

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `explain_data` | `DataFrame` | Pandas DataFrame containing all the instances for which to find an anchor and therefore obtain feature importances. | required |
| `threshold` | `float` | Passed down to the anchor explainer as defined in Alibi's documentation. Controls the minimum precision desired when looking for anchors. Defaults to 0.95. | `0.95` |
| `print_every` | `int` | Print progress every `print_every` processed observations. Defaults to 0 (no logging). | `0` |
| `print_explanations` | `bool` | Whether to print the explanations at the end of the method. Defaults to False. | `False` |
| `n_important_features` | `int` | Number of top features that will be printed. Defaults to 3. | `3` |
| `tau` | `float` | Passed down to the anchor explainer as defined in Alibi's documentation. Used within the multi-armed bandit part of the optimisation problem. Defaults to 0.15. | `0.15` |
| `timeout` | `int` | Maximum time in seconds to spend looking for an anchor. A value of 0 means that no timeout is set. Defaults to 0. | `0` |

Returns:

| Type | Description |
|------|-------------|
| `AnchorsWithImportanceExplanation` | An `AnchorsWithImportanceExplanation` containing all the explanations. |
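
Example

A hedged sketch, reusing `explainer` and `explain_data` from the class-level example above:

>>> importances = explainer.get_feature_importance(explain_data=explain_data.head(10), timeout=60)
>>> importances.interpret_explanations(n_important_features=3)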

Source code in mercury/explainability/explainers/anchors.py
def get_feature_importance(
    self,
    explain_data: pd.DataFrame,
    threshold: float = 0.95,
    print_every: int = 0,
    print_explanations: bool = False,
    n_important_features: int = 3,
    tau: float = 0.15,
    timeout: int = 0) -> AnchorsWithImportanceExplanation:
    """
    Args:
        explain_data:
            Pandas dataframe containing all the instances for which to find an anchor and therefore
            obtain feature importances.
        threshold: To be used in and passed down to the anchor explainer as defined on Alibi's documentation.
            Controls the minimum precision desired when looking for anchors.
            Defaults to 0.95.
        print_every:
            Logging information.
            Defaults to 0 - No logging
        print_explanations:
            Boolean that determines whether to print the explanations at the end of the method or not.
            Defaults to False.
        n_important_features:
            Number of top features that will be printed.
            Defaults to 3.
        tau:
            To be used in and passed down to the anchor explainer as defined in Alibi's documentation.
            Used within the multi-armed bandit part of the optimisation problem.
            Defaults to 0.15
        timeout:
            Maximum time to be spent looking for an Anchor in seconds. A value of 0 means that no timeout
            is set.
            Defaults to 0.

    Returns:
        A list containing all the explanations.
    """

    explanations = []
    if print_every > 0:
        print('Looking for a total of {} explanations'.format(
            len(explain_data))
        )
    for explain_datum_idx, explain_datum in explain_data.iterrows():
        try:
            explanation = run_until_timeout(timeout,
                              self.explain,
                              explain_datum.values,
                              threshold=threshold,
                              tau=tau)
            explanations.append(explanation)
        except Exception:
            if print_every > 0:
                print('No anchor found for observation {}'
                                            .format(explain_datum_idx))
            explanations.append('No explanation')

        # Unset timeout
        signal.alarm(0)

        if print_every > 0:
            if len(explanations) % print_every == 0:
                print(
                    ("""A total of {} observations have been processed """ +
                        """for explaining""").format(len(explanations)))
                print("{} anchors have already been found".format(
                    sum([1 for explan in explanations
                        if not isinstance(explan, str)])
                ))
    # Here we have a list with all the anchors explanations that we've been able to find.
    anchorsExtendedExplanation = AnchorsWithImportanceExplanation(
        explain_data=explain_data,
        explanations=explanations,
        categorical=self.categorical_names
    )
    if print_explanations:
        anchorsExtendedExplanation.interpret_explanations(
            n_important_features=n_important_features
        )
    return anchorsExtendedExplanation
save(filename)

Overwrite to ensure that we use MercuryExplainer.save

Source code in mercury/explainability/explainers/anchors.py
def save(self, filename):
    """Overwrite to ensure that we use MercuryExplainer.save"""
    MercuryExplainer.save(self, filename=filename)
translate(explanation)

Translates an explanation into simple words

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `explanation` | `Explanation` | Alibi explanation object. | required |
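
Example

A hedged sketch; the return value is illustrative but follows the format string visible in the source below:

>>> explainer.translate(explanation)
'[GOOD explanation] This anchor explains a 25.0% of all records of its class with 96.0% confidence.'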
Source code in mercury/explainability/explainers/anchors.py
def translate(self, explanation: Explanation) -> str:
    """
    Translates an explanation into simple words

    Args:
        explanation: Alibi explanation object

    """
    coverage = explanation['data']['coverage']
    if type(explanation['data']['precision']) is np.ndarray:
        precision = explanation['data']['precision'][0]
    else:
        precision = explanation['data']['precision']

    if coverage * precision < 0.1:
        quality = "POOR"
    elif 0.1 <= coverage * precision < 0.4:
        quality = "GOOD"
    else:
        quality = "GREAT"

    return "[{} explanation] This anchor explains a {}% of all records of its class with {}% confidence.".format(
        quality,
        round(100 * coverage, 2),
        round(100 * precision, 2)
    )

cf_strategies

Optimization strategies for simple counterfactual explanations.

Backtracking(state, bounds, step, fn, class_idx=1, threshold=0.0, kernel=None, report=False, keep_explored_points=True)

Bases: Strategy

Backtracking strategy.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `state` | `ndarray` | Initial state (initial starting point). | required |
| `bounds` | `ndarray` | Bounds to be used when moving around the probability space defined by `fn`. | required |
| `step` | `ndarray` | Step-size values used when moving around the probability space defined by `fn`. Lower values may take more time/steps to find a solution, while too-large values may make it impossible to find one. | required |
| `fn` | `Callable[[Union[ndarray, DataFrame]], ndarray]` | Classifier `predict_proba`-like function. | required |
| `class_idx` | `int` | Class to be explained (e.g. 1 for binary classifiers). Default value is 1. | `1` |
| `threshold` | `float` | Probability to be achieved (if a path is found). Default value is 0.0. | `0.0` |
| `kernel` | `Optional[ndarray]` | Used to penalize certain dimensions when trying to move around the probability space (some dimensions may be more difficult to explain, hence don't move along them). | `None` |
| `report` | `bool` | Whether to display probability updates during the space search. | `False` |
| `keep_explored_points` | `bool` | Whether to keep the points the algorithm explores. Setting it to False decreases computation time and memory usage in some cases. Default value is True. | `True` |
Source code in mercury/explainability/explainers/cf_strategies.py
def __init__(self,
             state: 'np.ndarray',
             bounds: 'np.ndarray',
             step: 'np.ndarray',
             fn: TP.Callable[[TP.Union['np.ndarray', pd.DataFrame]], 'np.ndarray'],
             class_idx: int = 1,
             threshold: float = 0.,
             kernel: TP.Optional['np.ndarray'] = None,
             report: bool = False,
             keep_explored_points: bool = True,
    ) -> None:
    super().__init__(state, bounds, step, fn, class_idx, threshold, kernel, report)
    self.keep_explored_points = keep_explored_points
run(*args, **kwargs)

Kick off the backtracking.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `**kwargs` |  | Backtracking-specific arguments, e.g. `max_iter`: max number of iterations. | `{}` |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `result` | `Tuple[ndarray, float, ndarray, Optional[ndarray]]` | Tuple containing the found solution, the probability achieved, the points visited and the corresponding energies (i.e. probabilities). |
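
Example

A minimal sketch on a toy two-feature problem; `clf` is a hypothetical fitted binary classifier exposing `predict_proba`:

>>> import numpy as np
>>> bounds = np.array([[0., 1.], [0., 1.]])
>>> step = (bounds[:, 1] - bounds[:, 0]) / 200
>>> bt = Backtracking(state=np.array([.2, .8]), bounds=bounds, step=step,
...                   fn=clf.predict_proba, class_idx=1, threshold=0.7)
>>> solution, prob, visited, explored = bt.run(max_iter=1000)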

Source code in mercury/explainability/explainers/cf_strategies.py
def run(self, *args, **kwargs) -> TP.Tuple['np.ndarray', float, 'np.ndarray', TP.Optional['np.ndarray']]:
    """ Kick off the backtracking.

    Args:
        **kwargs: Backtracking specific arguments
                    - max_iter: max number of iterations

    Returns:
        result (TP.Tuple['np.ndarray', float, 'np.ndarray', TP.Optional['np.ndarray']]):
            Tuple containing found solution, probability achieved, points visited and
            corresponding energies (i.e. probabilities).
    """

    # Backtracking specific arguments
    if 'max_iter' in kwargs:
        max_iter = kwargs['max_iter']
    else:
        max_iter = 0
    if 'limit' in kwargs:
        limit = kwargs['limit']
    else:
        limit = None
    if 'shuffle_limit' in kwargs:
        shuffle_limit = kwargs['shuffle_limit']
    else:
        shuffle_limit = False

    # Backtracking starting point.
    point = self.state.reshape(1, -1)
    curr_prob = self.fn(point)[0, self.class_idx]
    best_point = point.copy()
    best_prob = curr_prob

    # Define condition (>= or <=) from current point and given threshold.
    # and the priority modifier (ascending or descending order) using prio_mod.
    if self.threshold > curr_prob:
        condition = float.__ge__
        prio_mod = lambda x: -x
    else:
        condition = float.__le__
        prio_mod = lambda x: x
        self.kernel = 1.1 - self.kernel

    visited = np.hstack([point, [[curr_prob]]])
    explored = np.hstack([point, [[curr_prob]]]) if self.keep_explored_points else np.array([])

    q = MyPriorityQueue()  # type: ignore
    # Avoid duplicates and querying known points.
    cache = {str(point.tolist()): curr_prob}
    points = [point]
    counter = 0
    while condition(self.threshold, curr_prob) and (not max_iter or counter < max_iter):
        # Explore neighbours.
        neighbours = np.zeros((0, self.step.size))
        neighbours_l = []
        neighbours_kernel = []
        local_cache = set()
        for p_ in points:
            for i, delta in enumerate(self.step):
                # In every direction.
                for sign in (-1, 1):
                    aux = p_.copy()
                    new_aux_i = aux[0, i] + sign * delta
                    if new_aux_i >= self.bounds[i][0] and new_aux_i <= self.bounds[i][1]:
                        aux[0, i] = new_aux_i
                        aux_l = aux.tolist()
                        str_aux_l = str(aux_l)
                        if str_aux_l not in cache and str_aux_l not in local_cache:
                            # If point is not in cache (visited) -> enqueue.
                            neighbours = np.vstack([neighbours, aux])
                            neighbours_l.append(aux_l)
                            neighbours_kernel.append(self.kernel[i])
                            local_cache.add(str_aux_l)
        if len(neighbours_l):
            assert neighbours.shape[0] == len(neighbours_kernel) == len(neighbours_l), \
                'Number of neighbours should match.'
            probs = self.fn(neighbours)[:, self.class_idx]
            for n_idx, kernel, pt in zip(
                    range(probs.shape[0]), neighbours_kernel, neighbours_l):
                prob = float(probs[n_idx])
                prio = prob * kernel
                q.put((prio_mod(prio), prob, pt))
                if self.keep_explored_points:
                    explored = np.vstack(
                        [explored,
                        np.hstack([neighbours[n_idx].reshape(1, -1), [[prob]]])])
                cache[str(pt)] = prob
        try:
            elements = q.get_same_priority(limit=limit, block=False, shuffle_limit=shuffle_limit)
            curr_prob = max([x[1] for x in elements])
            points = [np.array(x[2]) for x in elements]
        except queue.Empty:
            self.visited = visited
            self.explored = explored
            return best_point[0], best_prob, self.visited, self.explored
        if condition(curr_prob, best_prob):
            best_prob = curr_prob
            best_point = points[0]
        for point in points:
            visited = np.vstack([visited, np.hstack([point,
                                                     [[curr_prob]]])])

        if self.report:

            iter_string = f"{counter}/{max_iter}"
            update_string = f"\r{iter_string}\t\t{round(best_prob, 2)}"
            if counter == 0:
                print('\rIteration\tBest Prob', file=sys.stderr)
                print(update_string, file=sys.stderr, end="")
            else:
                print(update_string, file=sys.stderr, end="")
            sys.stderr.flush()

        counter += 1 if max_iter else 0

    self.explored = explored
    self.visited = visited
    return best_point[0], best_prob, self.visited, self.explored

SimulatedAnnealing(state, bounds, step, fn, class_idx=1, threshold=0.0, kernel=None, report=False)

Bases: Strategy, Annealer

Simulated Annealing strategy.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `state` | `ndarray` | Initial state (initial starting point). | required |
| `bounds` | `ndarray` | Bounds to be used when moving around the probability space defined by `fn`. | required |
| `step` | `ndarray` | Step-size values used when moving around the probability space defined by `fn`. Lower values may take more time/steps to find a solution, while too-large values may make it impossible to find one. | required |
| `fn` | `Callable[[Union[ndarray, DataFrame]], ndarray]` | Classifier `predict_proba`-like function. | required |
| `class_idx` | `int` | Class to be explained (e.g. 1 for binary classifiers). Default value is 1. | `1` |
| `threshold` | `float` | Probability to be achieved (if a path is found). Default value is 0.0. | `0.0` |
| `kernel` | `Optional[ndarray]` | Used to penalize certain dimensions when trying to move around the probability space (some dimensions may be more difficult to explain, hence don't move along them). | `None` |
| `report` | `bool` | Whether to display probability updates during the space search. | `False` |
Source code in mercury/explainability/explainers/cf_strategies.py
def __init__(self,
             state: 'np.ndarray',
             bounds: 'np.ndarray',
             step: 'np.ndarray',
             fn: TP.Callable[[TP.Union['np.ndarray', pd.DataFrame]], 'np.ndarray'],
             class_idx: int = 1,
             threshold: float = 0.,
             kernel: TP.Optional['np.ndarray'] = None,
             report: bool = False) -> None:
    Strategy.__init__(self, state, bounds, step, fn, class_idx, threshold, kernel, report)
    # Annealer's __init__ uses signal, see Annealers's init:
    # https://github.com/perrygeo/simanneal/blob/b2576eb75d88f8b8c91d959a44dd708706bb108e/simanneal/anneal.py#L52
    # This may break execution (depending on where simanneal runs, e.g. threads)
    # Overload it!
    self.state = Annealer.copy_state(self, state)
best_solution(n=3)

Returns the n best solutions found during the Simulated Annealing.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `n` | `int` | Number of solutions to be retrieved. Default value is 3. | `3` |
Source code in mercury/explainability/explainers/cf_strategies.py
def best_solution(self, n: int = 3) -> 'np.ndarray':
    """ Returns the n best solutions found during the Simulated Annealing.

    Args:
        n (int): Number of solutions to be retrieved. Default value is 3.
    """

    ps = self.fn(np.array(self.explored))[:, self.class_idx]
    sorted_ps_idxs = np.argsort(ps)
    if not self.min:
        sorted_ps_idxs = sorted_ps_idxs[::-1]
    return np.array(self.explored)[sorted_ps_idxs[:n]]
energy()

Energy step in Simulated Annealing.

Source code in mercury/explainability/explainers/cf_strategies.py
def energy(self) -> float:
    """ Energy step in Simulated Annealing. """
    p = self.fn(np.array(self.state.reshape(1, -1)))[0, self.class_idx]
    # energy check
    if self.min:
        if self.threshold is not None and p < self.threshold:
            p = self.threshold
    else:
        if self.threshold is not None and p > self.threshold:
            p = self.threshold
    self.energies.append(p)
    value = p if self.min else -p
    return value
move()

Move step in Simulated Annealing.

Source code in mercury/explainability/explainers/cf_strategies.py
def move(self) -> None:
    """ Move step in Simulated Annealing. """
    self.state = np.random.uniform(-0.1, 0.1, size=self.state.shape[0]) * self.kernel * self.step + self.state
    for i, bound in enumerate(self.bounds):
        if self.kernel[i] != 0:
            if self.state[i] < bound[0]:
                self.state[i] = bound[0]
            elif self.state[i] > bound[1]:
                self.state[i] = bound[1]
    self.explored.append(self.state)
run(*args, **kwargs)

Kick off Simulated Annealing.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `**kwargs` |  | Simulated Annealing specific arguments: `tmin` (min temperature), `tmax` (max temperature), `steps` (number of iterations). | `{}` |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `result` | `Tuple[ndarray, float, ndarray, Optional[ndarray]]` | Tuple containing the found solution, the probability achieved, the points visited and the corresponding energies (i.e. probabilities). |
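
Example

A hedged sketch, reusing the toy setup from the Backtracking example above:

>>> sa = SimulatedAnnealing(state=np.array([.2, .8]), bounds=bounds, step=step,
...                         fn=clf.predict_proba, class_idx=1, threshold=0.7)
>>> solution, prob, explored, energies = sa.run(tmin=0.1, tmax=10.0, steps=500)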

Source code in mercury/explainability/explainers/cf_strategies.py
def run(self, *args, **kwargs) -> TP.Tuple['np.ndarray', float, 'np.ndarray', TP.Optional['np.ndarray']]:
    """ Kick off Simulated Annealing.

    Args:
        **kwargs:
            Simulated Annealing specific arguments
                - tmin: min temperature
                - tmax: max temperature
                - steps: number of iterations

    Returns:
        result (TP.Tuple['np.ndarray', float, 'np.ndarray', TP.Optional['np.ndarray']]):
            Tuple containing found solution, probability achieved, points visited and
            corresponding energies (i.e. probabilities).
    """

    if 'tmin' in kwargs:
        self.Tmin = kwargs['tmin']
    if 'tmax' in kwargs:
        self.Tmax = kwargs['tmax']
    if 'steps' in kwargs:
        self.steps = kwargs['steps']
        self.updates = self.steps
        # self.updates = 100
    sol, p = self.anneal()  # type: TP.Tuple['np.ndarray', float]
    return sol, p, np.array(self.explored), np.array(self.energies)

Strategy(state, bounds, step, fn, class_idx=1, threshold=0.0, kernel=None, report=False)

Base class for explanation strategies.

Source code in mercury/explainability/explainers/cf_strategies.py
def __init__(self,
             state: 'np.ndarray',
             bounds: 'np.ndarray',
             step: 'np.ndarray',
             fn: TP.Callable[[TP.Union['np.ndarray', pd.DataFrame]], 'np.ndarray'],
             class_idx: int = 1,
             threshold: float = 0.,
             kernel: TP.Optional['np.ndarray'] = None,
             report: bool = False) -> None:

    # Starting point
    self.state = state
    # Dimension bounds
    assert bounds.shape[0] == state.shape[0]
    self.bounds = bounds
    assert step.shape[0] == bounds.shape[0]
    self.step = step
    # Classifier fn
    self.fn = fn
    # Class
    self.class_idx = class_idx
    # Explored points
    self.explored = [self.state]
    # Probability threshold
    assert threshold >= 0. and threshold <= 1.
    self.threshold = threshold
    # Energies
    self.energies = [self.fn(self.state.reshape(1, -1))[:, self.class_idx][0]]
    # Kernel
    if kernel is None:
        kernel = np.ones(self.state.shape[0])
    # minimization or maximization problem?
    cur_prob = self.fn(self.state.reshape(1, -1))[0, self.class_idx]
    if cur_prob > self.threshold:
        self.min = True
    else:
        self.min = False

    self.kernel = kernel
    self.report = report

clustering_tree_explainer

ClusteringTreeExplainer(clustering_model, max_leaves, verbose=0, base_tree='IMM', n_jobs=None, random_state=None)

Bases: MercuryExplainer

ClusteringTreeExplainer explains a clustering model using a DecisionTree.

It is based on the Iterative Mistake Minimization (IMM) method. The high-level idea is to build a decision tree with the same number of leaves as the number of clusters. The tree is fitted on the clusters that a previously fitted clustering model (such as k-means) predicts for a dataset, using those clusters as labels. At each step, a node containing two or more reference centres is split so that the resulting split sends at least one reference centre to each side and produces the fewest mistakes, that is, it separates the minimum number of points from their corresponding centres.

There is also the option to create a decision tree with more leaves than clusters. This is based on the ExKMC method, an extension of the IMM algorithm. The goal in this case is to achieve fewer mistakes in the resulting decision tree, at the cost of it being less explainable. You can find more details of the methods in the papers referenced below.

In this implementation, the clustering solution can be created before using the ClusteringTreeExplainer. Otherwise, a k-means with default parameters is fitted before building the decision tree.

References

"Explainable k-Means and k-Medians Clustering": (http://proceedings.mlr.press/v119/moshkovitz20a/moshkovitz20a.pdf) "ExKMC: Expanding Explainable k-Means Clustering": (https://arxiv.org/pdf/2006.02399.pdf)

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `clustering_model` | `Union[BaseEstimator, Model]` | The fitted clustering model. It supports sklearn and pyspark. When using sklearn, the model must be a sklearn `BaseEstimator` with a `predict` method and a `cluster_centers_` attribute (like `KMeans`). Alternatively, you can provide a fitted pipeline whose last stage is the clustering algorithm. When using pyspark, `clustering_model` must be a fitted pyspark Estimator with a `clusterCenters()` method (like pyspark `KMeans`). Alternatively, you can provide a fitted pyspark `PipelineModel` whose last stage contains the clustering algorithm and the `clusterCenters()` method. | required |
| `max_leaves` | `int` | The maximum number of leaves. Here `k`, the number of clusters, is derived from the centers of `clustering_model`. If `max_leaves == k`, the method is Iterative Mistake Minimization (IMM). If `max_leaves > k`, the tree is expanded beyond `k` leaves with the ExKMC method. `max_leaves < k` is not allowed. | required |
| `verbose` | `int` | Whether to show messages during the process. If 0, no messages are shown; if >= 1, messages are shown. | `0` |
| `base_tree` | `str` | Method used to build the tree. If 'IMM', it builds a tree with `k` leaves using the IMM method and then expands it using the ExKMC method if `max_leaves > k`. If `None`, it uses the ExKMC method of splitting nodes from the root node. The default and recommended option is 'IMM'. | `'IMM'` |
| `n_jobs` | `int` | Number of jobs. | `None` |
| `random_state` | `int` | Random seed. | `None` |
Example
>>> from mercury.explainability.explainers.clustering_tree_explainer import ClusteringTreeExplainer
>>> from sklearn.cluster import KMeans
>>> k = 4
>>> kmeans = KMeans(k, random_state=42)
>>> kmeans.fit(df)
>>> clustering_tree_explainer = ClusteringTreeExplainer(clustering_model=kmeans, max_leaves=k)
>>> explanation = clustering_tree_explainer.explain(df)
>>> plot_explanation = explanation.plot(filename="explanation")
>>> plot_explanation
Source code in mercury/explainability/explainers/clustering_tree_explainer.py
def __init__(
    self,
    clustering_model: TP.Union["sklearn.base.BaseEstimator", "pyspark.ml.base.Model"],  # noqa: F821
    max_leaves: int,
    verbose: int = 0,
    base_tree:str = 'IMM',
    n_jobs: int = None,
    random_state:int = None
):

    self.clustering_model = clustering_model
    self.all_centers = self._get_cluster_centers()
    self.k = len(self.all_centers)
    self.max_leaves = self.k if max_leaves is None else max_leaves
    if self.max_leaves < self.k:
        raise Exception('max_leaves must be greater or equal to number of clusters [%d < %d]' % (self.max_leaves, self.k))
    self.verbose = verbose
    if base_tree not in BASE_TREE:
        raise Exception(base_tree + ' is not a supported base tree')
    self.base_tree = base_tree
    self.n_jobs = n_jobs if n_jobs is not None else 1
    self.random_state = random_state

    self.tree = None
    self._feature_importance = None
explain(X, subsample=None)

Create explanation for clustering algorithm.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `X` | `Union[pandas.DataFrame, pyspark.sql.DataFrame]` | Inputs of the data that we are clustering. | required |
| `subsample` | `float` | Percentage of `X` to subsample (only when using pyspark). | `None` |

Returns:

| Type | Description |
|------|-------------|
| `ClusteringTreeExplanation` | A `ClusteringTreeExplanation` object, which contains a `plot()` method to display the built decision tree. |

Source code in mercury/explainability/explainers/clustering_tree_explainer.py
def explain(
    self,
    X: TP.Union["pandas.DataFrame", "pyspark.sql.DataFrame"],  # noqa: F821
    subsample: float = None,
) -> ClusteringTreeExplanation:

    """
    Create explanation for clustering algorithm.

    Args:
        X: inputs of the data that we are clustering
        subsample: percentage of `X` to subsample (only when using pyspark)

    Returns:
        ClusteringTreeExplanation object, which contains plot() method to display the built decision tree
    """

    feature_names = X.columns

    if isinstance(X, pd.DataFrame):
        X, y = self._get_cluster_labels_pandas(X)
    else:
        X, y = self._get_cluster_labels_pyspark(X, subsample)

    self.tree = Tree(
        base_tree=self.base_tree, max_leaves=self.max_leaves, all_centers=self.all_centers,
        verbose=self.verbose, n_jobs=self.n_jobs, k=self.k
    )
    self.tree.fit(X, y)

    return ClusteringTreeExplanation(self.tree.root, feature_names=feature_names)
score(X)

Return the k-means cost of `X`: the sum of squared distances of each point to the mean of the points associated with its cluster.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `X` | `Union[DataFrame, array]` | The input samples. | required |

Returns:

| Type | Description |
|------|-------------|
|  | The k-means cost of `X`. |
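
Example

A hedged sketch, reusing the fitted `clustering_tree_explainer` from the class-level example; the inequality follows from the definitions (the surrogate cost uses the fixed k-means centers rather than each cluster's own mean):

>>> tree_cost = clustering_tree_explainer.score(df)
>>> surrogate_cost = clustering_tree_explainer.surrogate_score(df)
>>> tree_cost <= surrogate_cost
True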

Source code in mercury/explainability/explainers/clustering_tree_explainer.py
def score(self, X: TP.Union[pd.DataFrame, np.array]):
    """
    Return the k-means cost of X.
    The k-means cost is the sum of squared distances of each point to the mean of points associated with the cluster.
    Args:
        X: The input samples.
    Returns:
        k-means cost of X.
    """
    X = _convert_input(X)
    clusters = self.tree.predict(X)
    cost = 0
    for c in range(self.k):
        cluster_data = X[clusters == c, :]
        if cluster_data.shape[0] > 0:
            center = cluster_data.mean(axis=0)
            cost += np.linalg.norm(cluster_data - center) ** 2
    return cost
surrogate_score(X)

Return the k-means surrogate cost of `X`: the sum of squared distances of each point to the closest center of the k-means given (or trained) in the fit method. The k-means surrogate cost is greater than the k-means cost, as the k-means cost is computed with respect to the optimal centers.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `X` | `Union[DataFrame, array]` | The input samples. | required |

Returns:

| Type | Description |
|------|-------------|
|  | The k-means surrogate cost of `X`. |

Source code in mercury/explainability/explainers/clustering_tree_explainer.py
def surrogate_score(self, X: TP.Union[pd.DataFrame, np.array]):
    """
    Return the k-means surrogate cost of X.
    The k-means surrogate cost is the sum of squared distances of each point to the closest center of the kmeans given (or trained)
    in the fit method. k-means surrogate cost > k-means cost, as k-means cost is computed with respect to the optimal centers.
    Args:
        X: The input samples.
    Returns:
        k-means surrogate cost of X.
    """
    X = _convert_input(X)
    clusters = self.tree.predict(X)
    cost = 0
    for c in range(self.k):
        cluster_data = X[clusters == c, :]
        if cluster_data.shape[0] > 0:
            center = self.all_centers[c]
            cost += np.linalg.norm(cluster_data - center) ** 2
    return cost

Tree(base_tree, k, max_leaves, all_centers, verbose, n_jobs)

Source code in mercury/explainability/explainers/clustering_tree_explainer.py
def __init__(self, base_tree, k, max_leaves, all_centers, verbose, n_jobs):
    self.root = None
    self.base_tree = base_tree
    self.k = k
    self.max_leaves = max_leaves
    self.all_centers = all_centers
    self.verbose = verbose
    self.n_jobs = n_jobs
    self._leaves_data = {}
__max_depth__(node)

Return the depth of the subtree rooted by node.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `node` | `Node` | Root of a subtree. | required |

Returns:

| Type | Description |
|------|-------------|
|  | The depth of the subtree rooted by `node`. |

Source code in mercury/explainability/explainers/clustering_tree_explainer.py
def __max_depth__(self, node: Node):
    """
    Return the depth of the subtree rooted by node.

    Args:
        node: root of a subtree.

    Returns:
        The depth of the subtree rooted by node.
    """
    if node is None:
        return -1
    else:
        dl = self.__max_depth__(node.left)
        dr = self.__max_depth__(node.right)
        return 1 + max(dl, dr)
__size__(node)

Return the number of nodes in the subtree rooted by `node`.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `node` | `Node` | Root of a subtree. | required |

Returns:

| Type | Description |
|------|-------------|
|  | The number of nodes in the subtree rooted by `node`. |

Source code in mercury/explainability/explainers/clustering_tree_explainer.py
def __size__(self, node: Node):
    """
    Return the number of nodes in the subtree rooted by node.
    Args:
        node: root of a subtree.
    Returns
        the number of nodes in the subtree rooted by node.
    """
    if node is None:
        return 0
    else:
        sl = self.__size__(node.left)
        sr = self.__size__(node.right)
        return 1 + sl + sr
predict(X)

Predict clusters for `X`.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `X` | `Union[DataFrame, array]` | The input samples. | required |

Returns:

| Type | Description |
|------|-------------|
|  | The predicted clusters. |

Source code in mercury/explainability/explainers/clustering_tree_explainer.py
def predict(self, X: TP.Union[pd.DataFrame, np.array]):
    """
    Predict clusters for X.
        X: The input samples.
    Returns:
        The predicted clusters.
    """
    X = _convert_input(X)
    return self._predict_subtree(self.root, X)

counter_fact_basic

CounterFactualExplainerBasic(train, fn, labels=[], bounds=None, n_steps=200)

Bases: MercuryExplainer

Explains predictions on tabular (i.e. matrix) data for binary/multiclass classifiers. Two main strategies are currently implemented: backtracking and a probabilistic process (simulated annealing).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `train` | `Union[ndarray, DataFrame]` | Training dataset to extract feature bounds from. | required |
| `fn` | `Callable[[Union[ndarray, DataFrame]], Union[float, ndarray]]` | Classifier `predict_proba`-like function. Note that the returned probabilities must be valid, i.e. values between 0 and 1. | required |
| `labels` | `List[str]` | List of labels to be used when plotting results. If a DataFrame is used, labels are taken from its column names. Default is an empty list. | `[]` |
| `bounds` | `Optional[ndarray]` | Feature bounds used when no train data is provided (shape must match that of `labels`). Default is None. | `None` |
| `n_steps` | `int` | Indicates how small/large steps should be when exploring the space (default is 200). | `200` |

Raises:

| Type | Description |
|------|-------------|
| `AssertionError` | If no train data is provided and `bounds.size <= 0`, `bounds.ndim != 2`, or `bounds.shape[1] != 2`; or if `bounds.shape[0] != len(labels)`. |
| `TypeError` | If `train` is not a DataFrame or numpy array. |
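
Example

A minimal sketch; `train_df` and `model` are hypothetical stand-ins for a training DataFrame and a fitted binary classifier exposing `predict_proba`:

>>> explainer = CounterFactualExplainerBasic(train=train_df, fn=model.predict_proba)
>>> explanation = explainer.explain(from_=train_df.iloc[0].values, threshold=0.8,
...                                 strategy='backtracking', max_iter=1000)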

Source code in mercury/explainability/explainers/counter_fact_basic.py
def __init__(self,
             train: TP.Union[TP.Optional['np.ndarray'], TP.Optional[pd.DataFrame]],
             fn: TP.Callable[[TP.Union['np.ndarray', pd.DataFrame]], TP.Union[float, 'np.ndarray']],
             labels: TP.List[str] = [],
             bounds: TP.Optional['np.ndarray'] = None,
             n_steps: int = 200) -> None:
    if train is None:
        # If data is not provided, labels and bounds are required
        assert bounds.size > 0, 'Bounds are required if no data is provided'
        assert bounds.ndim == 2 and bounds.shape[1] == 2, 'min/max values are required for each feature'
        assert len(labels) == bounds.shape[0], \
            'Labels and bound shapes must match, got {} and {} respectively' \
                .format(len(labels), bounds.shape[0])
        # min/max values for each feature
        self.labels = labels
        self.bounds = bounds
    else:
        # Compute bounds
        if isinstance(train, pd.DataFrame):
            self.labels = train.columns.tolist()
            self.bounds = train.describe().loc[['min', 'max']].values.T
            assert len(self.labels) == self.bounds.shape[0], \
                'Labels and bound shapes must match, got {} and {} respectively' \
                    .format(len(self.labels), self.bounds.shape[0])
        elif isinstance(train, np.ndarray):
            self.labels = labels
            self.bounds = np.stack([
                np.apply_along_axis(np.min, 0, train),
                np.apply_along_axis(np.max, 0, train)], axis=1)
            assert len(self.labels) == self.bounds.shape[0], \
                'Labels and bound shapes must match, got {} and {} respectively' \
                    .format(len(self.labels), self.bounds.shape[0])
        else:
            raise TypeError('Invalid type for argument train, got {} but expected numpy array or pandas dataframe'.
                            format(type(train)))

    # Compute steps
    self.n_steps = n_steps
    self.step = (self.bounds[:, 1] - self.bounds[:, 0]) / self.n_steps

    # Function to be evaluated on optimization
    self.fn = fn
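A minimal construction sketch (hypothetical model and column names, not part of the library): any predict_proba-like callable returning probabilities in [0, 1] can be plugged in.

# Toy training data and a scikit-learn classifier (an assumption for illustration).
>>> import pandas as pd
>>> from sklearn.linear_model import LogisticRegression
>>> train = pd.DataFrame({'age': [22, 35, 58, 41], 'income': [18.0, 42.5, 60.1, 39.9]})
>>> y = [0, 1, 1, 0]
>>> model = LogisticRegression().fit(train, y)
# Bounds and labels are derived from the DataFrame; fn must return valid probabilities.
>>> explainer = CounterFactualExplainerBasic(train=train, fn=model.predict_proba, n_steps=200)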
explain(from_, threshold, class_idx=1, kernel=None, bounds=None, step=None, strategy='backtracking', report=False, keep_explored_points=True, **kwargs)

Roll the panellet down the valley and find an explanation.

Parameters:

Name Type Description Default
from_ ndarray

Starting point.

required
threshold float

Probability to be achieved (if path is found).

required
class_idx int

Class to be explained (e.g. 1 for binary classifiers).

1
kernel Optional[ndarray]

Used to penalize certain dimensions when trying to move around the probability space (some dimensions may be more difficult to explain, hence don't move along them). Default is np.ones(n), meaning all dimensions can be used to move around the space (each kernel value must be between 0 and 1).

None
bounds Optional[ndarray]

Feature bound values to be used when exploring the probability space. If not specified, the ones extracted from the training data are used instead.

None
step Optional[ndarray]

Step values to be used when moving around the probability space. If not specified, the training bounds are divided by 200 (an arbitrary value) and used as the step values.

None
strategy str

If 'backtracking', the backtracking strategy is used to move around the probability space. If 'simanneal', the simulated annealing strategy is used to move around the probability space.

'backtracking'
report bool

Whether to report the algorithm progress during the execution.

False
keep_explored_points bool

Whether to keep the points that the algorithm explores. Setting it to False will decrease the computation time and memory usage in some cases. Default value is True.

True

Raises:

Type Description
AssertionError

If from_ number of dimensions is != 1 | If from_ shape does not match bounds shape | If bounds shape is not valid | If step shape does not match bounds shape

ValueError

if strategy is not 'backtracking' or 'simanneal'.

Returns:

Name Type Description
explanation CounterfactualBasicExplanation

CounterfactualBasicExplanation with the solution found and how it differs from the starting point.

Source code in mercury/explainability/explainers/counter_fact_basic.py
def explain(self,
            from_: 'np.ndarray',
            threshold: float,
            class_idx: int = 1,
            kernel: TP.Optional['np.ndarray'] = None,
            bounds: TP.Optional['np.ndarray'] = None,
            step: TP.Optional['np.ndarray'] = None,
            strategy: str = 'backtracking',
            report: bool = False,
            keep_explored_points: bool = True,
            **kwargs) -> CounterfactualBasicExplanation:
    """
    Roll the panellet down the valley and find an explanation.

    Args:
        from_ ('np.ndarray'):
            Starting point.
        threshold (float):
            Probability to be achieved (if path is found).
        class_idx (int):
            Class to be explained (e.g. 1 for binary classifiers).
        kernel (TP.Optional['np.ndarray']):
            Used to penalize certain dimensions when trying to move around
            the probability space (some dimensions may be more difficult to explain,
            hence don't move along them). Default is np.ones(n), meaning all dimensions
            can be used to move around the space (each value must be between 0 and 1).
        bounds (TP.Optional['np.ndarray']):
            Feature bound values to be used when exploring the probability space. If not
            specified, the ones extracted from the training data are used instead.
        step (TP.Optional['np.ndarray']):
            Step values to be used when moving around the probability space. If not specified,
            training bounds are divided by 200 (arbitrary value) and these are used as step value.
        strategy (str):
            If 'backtracking', the backtracking strategy is used to move around the probability space.
            If 'simanneal', the simulated annealing strategy is used to move around the probability space.
        report (bool):
            Whether to report the algorithm progress during the execution.
        keep_explored_points (bool):
            Whether to keep the points that the algorithm explores. Setting it to False will decrease
            the computation time and memory usage in some cases. Default value is True.

    Raises:
        AssertionError:
            If `from_` number of dimensions is != 1 |
            If `from_` shape does not match `bounds` shape |
            If `bounds` shape is not valid |
            If `step` shape does not match `bounds` shape
        ValueError:
            if strategy is not 'backtracking' or 'simanneal'.

    Returns:
        explanation (CounterfactualBasicExplanation):
            CounterfactualBasicExplanation with the solution found and how it differs from the starting point.
    """

    if kernel is None:
        self.kernel = np.ones(from_.shape[0])
    else:
        self.kernel = kernel

    assert from_.ndim == 1, \
        'Invalid starting point shape, got {} but expected unidimensional vector'.format(from_.shape)

    if bounds is not None:
        assert from_.shape[0] == bounds.shape[0], \
            'Starting point and bounds shapes should match, got {} and {}'.format(from_.shape, bounds.shape[0])

    # Update bounds based on reference point
    l_bounds = self.bounds.copy()
    for i, bound in enumerate(l_bounds):
        new_min = bound[0]
        new_max = bound[1]
        if from_[i] < bound[0]:
            new_min = from_[i]
        elif from_[i] > bound[1]:
            new_max = from_[i]
        l_bounds[i, 0] = new_min
        l_bounds[i, 1] = new_max

    # Update bounds, if new_bounds are specified
    if bounds is not None:
        assert bounds.shape == l_bounds.shape, \
            'Invalid dimensions for new bounds, got {} but expected {}'.format(bounds.shape, l_bounds.shape)
        for i, bound in enumerate(bounds):
            # Update bound only if starting point is within it in this dimension
            if bound[0] <= from_[i] and bound[1] >= from_[i]:
                l_bounds[i] = bounds[i]

    if step is not None:
        assert step.shape[0] == l_bounds.shape[0], \
            'Invalid step shape, got {} but expected {}'.format(step.shape, l_bounds.shape[0])
        self.step = step

    if strategy == 'backtracking':
        # Backtracking strategy
        sol, p, visited, explored = Backtracking(from_, l_bounds, self.step, self.fn, class_idx,
                                                       threshold=threshold, kernel=self.kernel, report=report,
                                                       keep_explored_points=keep_explored_points).run(
            **kwargs)
        ps = visited[:, -1]
        visited = visited[:, :-1]
        if keep_explored_points:
            explored_points = explored[:, :-1]
            explored_ps = explored[:, -1]
        else:
            explored_points = np.array([])
            explored_ps = np.array([])
        return CounterfactualBasicExplanation(
            from_, sol, p, visited, ps, l_bounds, explored_points,
            explored_ps, labels=self.labels)
    elif strategy == 'simanneal':
        # Simulated Annealing strategy
        sol, p, visited, energies = SimulatedAnnealing(from_, l_bounds, self.step, self.fn, class_idx,
                                                             threshold=threshold, kernel=self.kernel,
                                                             report=report).run(**kwargs)
        return CounterfactualBasicExplanation(from_, sol, abs(p), visited, energies[:-1], l_bounds,
                                               labels=self.labels)
    else:
        raise ValueError('Invalid strategy')
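A hypothetical continuation of the sketch above, showing a single explain call with the backtracking strategy; values are illustrative.

>>> import numpy as np
>>> from_ = np.array([30.0, 25.0])  # starting point, one value per feature
>>> explanation = explainer.explain(
...     from_,
...     threshold=0.8,            # target probability for class_idx
...     class_idx=1,              # class to be explained
...     strategy='backtracking',  # or 'simanneal'
... )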

counter_fact_importance

CounterFactualExplainerBase(feature_names, drop_features=[])

Bases: ABC

Abstract base class for counterfactual explainers which holds common functionality. You should extend this if you are implementing a custom counterfactual explainer (see the illustrative skeleton after the source code below).

Parameters:

Name Type Description Default
feature_names List[str]

List of names of the features used in the model.

required
drop_features List[Any]

List with the names, as specified in feature_names, or with the indexes of the variables to leave out while looking for explanations.

[]
Source code in mercury/explainability/explainers/counter_fact_importance.py
def __init__(self, feature_names: TP.List[str],
             drop_features: TP.List[TP.Any] = []):
    self.feature_names = feature_names
    self.drop_features = drop_features
    # Each subclass will use its internal Alibi member (CounterFactual or
    # CounterFactualProto)
    self.cf_super = None
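A hedged skeleton of a possible subclass; the explain method and the wrapped backend are assumptions for illustration, not a prescribed interface.

>>> class MyCounterfactualExplainer(CounterFactualExplainerBase):
...     def __init__(self, predict_fn, feature_names, drop_features=[]):
...         super().__init__(feature_names, drop_features)
...         self.predict_fn = predict_fn
...         # the built-in subclasses store an Alibi explainer here
...         self.cf_super = None
...     def explain(self, explain_data):
...         # delegate to whatever search backend the subclass wraps
...         return self.cf_super.explain(explain_data)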
get_feature_importance(explain_data, n_important_features=3, print_every=5, threshold_probability=0.5, threshold_incr=0.01, init_threshold=0.1, max_iterations=10000, get_report=True)

This method computes the feature importance for a set of observations.

Parameters:

Name Type Description Default
explain_data DataFrame

Pandas DataFrame containing all the instances for which to find an anchor and therefore obtain feature importances.

required
n_important_features int

Number of top features that will be printed. Defaults to 3.

3
print_every int

Print progress every print_every attempted explanations (default is 5).

5
threshold_probability float

Minimum probability that the regularized counterfactual must retain for the target class (passed as min_prob to the regularizer).

0.5
threshold_incr float

The increment of the threshold to be used in the regularizer so as to bring the differences between a counterfactual instance and the original one to zero.

0.01
init_threshold float

The initial and minimum value to be used in the regularizer to bring the differences between a counterfactual instance and the original one to zero.

0.1
max_iterations int

Maximum number of iterations for the regularizer. This parameter gets passed down to the regularize_counterfactual method.

10000
get_report bool

Boolean determining whether to print the explanation or not.

True

Returns:

Type Description
CounterfactualWithImportanceExplanation

A CounterfactualWithImportanceExplanation object that contains all explanations as well as their interpretation.

Source code in mercury/explainability/explainers/counter_fact_importance.py
def get_feature_importance(
    self,
    explain_data: pd.DataFrame,
    n_important_features: int = 3,
    print_every: int = 5,
    threshold_probability: float = 0.5,
    threshold_incr: float = 0.01,
    init_threshold: float = 0.1,
    max_iterations: int = 10000,
    get_report: bool = True
) -> CounterfactualWithImportanceExplanation:
    """
    This method computes the feature importance for a set of observations.

    Args:
        explain_data:
            Pandas DataFrame containing all the instances for which to find an anchor and therefore
            obtain feature importances.
        n_important_features:
            Number of top features that will be printed. Defaults to 3.
        print_every:
            Print progress every `print_every` attempted explanations (default is 5).
        threshold_probability:
            Minimum probability that the regularized counterfactual must retain for the
            target class (passed as `min_prob` to the regularizer).
        threshold_incr:
            The increment of the threshold to be used in the regularizer so as to bring the differences
            between a counterfactual instance and the original one to zero.
        init_threshold:
            The initial and minimum value to be used in the regularizer to bring the differences
            between a counterfactual instance and the original one to zero.
        max_iterations:
            Maximum number of iterations for the regularizer. This parameter gets passed
            down to the regularize_counterfactual method.
        get_report:
            Boolean determining whether to print the explanation or not.

    Returns:
        A CounterfactualWithImportanceExplanation object that contains all explanations as well as
        their interpretation.
    """
    failed_explanations = 0
    counterfactuals = []
    attempted_explanations = 0
    for item_idx, item in explain_data.iterrows():
        try:
            attempted_explanations += 1
            cf_explanation = self.explain(
                item.values.reshape(1, -1)
            )
            cf_regularized, diff, new_proba = self._regularize_counterfactual(
                self.predict_fn,
                item.values.reshape(1, -1),
                cf_explanation.data['cf']['X'],
                min_prob=threshold_probability,
                threshold_incr=threshold_incr,
                max_iterations=max_iterations,
                init_threshold=init_threshold
            )
            cf_explanation = {
                'cf_original': cf_explanation.data['cf']['X'],
                'cf_class': cf_explanation.data['cf']['class'],
                'cf_proba': cf_explanation.data['cf']['proba'][0][
                    cf_explanation.data['cf']['class']
                ],
                'cf_regularized': cf_regularized,
                'cf_reg_proba': new_proba,
                'diff_orig_reg': diff
            }
        except Exception:
            failed_explanations += 1
            print('''
            There's been a problem while computing the explanation for observation: {}.
            If this observation is critical to you try with some other hyperparameters.
            '''.format(item_idx))
            cf_explanation = {
                'cf_original': None,
                'cf_class': None,
                'cf_proba': None,
                'cf_regularized': None,
                'cf_reg_proba': None,
                'diff_orig_reg': None
            }
        if attempted_explanations % print_every == 0:
            print('{} counterfactuals already computed. Found a solution for {} of them'.format(
                attempted_explanations, attempted_explanations - failed_explanations
            ))
        counterfactuals.append(cf_explanation)
    print('All counterfactuals have been computed and regularized')

    count_differences = dict(
        (feat_name, 0.0) for feat_name in self.feature_names
    )
    total_diffs = np.zeros(len(self.feature_names))

    if len(counterfactuals) > 0:
        for counterfactual_i in counterfactuals:
            if counterfactual_i['cf_original'] is not None:
                total_diffs += counterfactual_i['diff_orig_reg'][0]
                diffs_dict = dict(zip(self.feature_names, counterfactual_i['diff_orig_reg'][0]))
                for k, v in diffs_dict.items():
                    if np.abs(v) > 0.0:
                        count_differences[k] += 1

    max_diff = np.abs(total_diffs).max(axis=0)
    if max_diff > 0:  # guard against division by zero when no feature changed
        total_diffs /= max_diff

    importances = [
        (name, value, direction)
        for value, direction, name
        in sorted(
            zip(np.abs(total_diffs), np.sign(total_diffs), self.feature_names),
            reverse=True
        )
    ]

    count_differences_norm = dict(
        (key, value / (explain_data.shape[0] - failed_explanations))
        for key, value
        in count_differences.items()
    )
    count_differences_norm = {
        key: value
        for key, value
        in sorted(
            count_differences_norm.items(),
            key=lambda item: item[1],
            reverse=True
        )
    }

    n_important_features = min(n_important_features, len(self.feature_names))

    counterfactualExtendedExplanation = CounterfactualWithImportanceExplanation(
        explain_data,
        counterfactuals,
        importances,
        count_differences,
        count_differences_norm
    )

    if get_report:
        if n_important_features == len(self.feature_names):
            print('The total number of important features was too large and therefore all will be shown')
        if failed_explanations > 0:
            print('There were a total of {:d} fails'.format(failed_explanations))
        counterfactualExtendedExplanation.interpret_explanations(
            n_important_features
        )
    return counterfactualExtendedExplanation
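An illustrative sketch of the call; explainer stands for any concrete subclass instance (such as CounterfactualExplainer below) and X_explain for a small DataFrame of instances to explain.

>>> importance_explanation = explainer.get_feature_importance(
...     explain_data=X_explain,   # one row per instance to explain
...     n_important_features=3,   # top features to report
...     threshold_incr=0.01,
...     max_iterations=10000,
...     get_report=True,          # also print the interpretation
... )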

CounterfactualExplainer(predict_fn, feature_names, shape=None, drop_features=[], distance_fn='l1', target_proba=1.0, target_class='other', max_iter=1000, early_stop=50, lam_init=0.1, max_lam_steps=10, tol=0.05, learning_rate_init=0.1, feature_range=(-10000000000.0, 10000000000.0), eps=0.01, init='identity', decay=True, write_dir=None, debug=False, sess=None)

Bases: CounterFactualExplainerBase

Backed by Alibi's CounterFactual, this class extends its functionality to allow for the computation of Feature Importances by means of the computation of several Counterfactuals.

Parameters:

Name Type Description Default
predict_fn Callable[[Union[ndarray, DataFrame]], Union[float, ndarray]]

Model prediction function. This function should return the probabilities of belonging to each class.

required
feature_names List[str]

List of names of the features used in the model.

required
shape Tuple[Any]

Shape of input data starting with batch size. By default it is inferred from feature_names.

None
drop_features List[Any]

List with the names, as specified in feature_names, or with the indexes of the variables to leave out while looking for explanations (in get_feature_importance method).

[]
distance_fn str

Distance function to use in the loss term

'l1'
target_proba float

Target probability for the counterfactual to reach

1.0
target_class Union[str, int]

Target class for the counterfactual to reach, one of 'other', 'same' or an integer denoting desired class membership for the counterfactual instance

'other'
max_iter int

Maximum number of iterations to run the gradient descent for (inner loop)

1000
early_stop int

Number of steps after which to terminate gradient descent if all or none of found instances are solutions

50
lam_init float

Initial regularization constant for the prediction part of the Wachter loss

0.1
max_lam_steps int

Maximum number of times to adjust the regularization constant (outer loop) before terminating the search

10
tol float

Tolerance for the counterfactual target probability

0.05
learning_rate_init float

Initial learning rate for each outer loop of lambda

0.1
feature_range Union[Tuple[Any], str]

Tuple with min and max ranges to allow for perturbed instances. Min and max ranges can be floats or numpy arrays with dimension (1 x nb of features) for feature-wise ranges

(-10000000000.0, 10000000000.0)
eps Union[float, ndarray]

Gradient step sizes used in calculating numerical gradients, defaults to a single value for all features, but can be passed an array for feature-wise step sizes

0.01
init str

Initialization method for the search of counterfactuals, currently must be 'identity'

'identity'
decay bool

Flag to decay learning rate to zero for each outer loop over lambda

True
write_dir str

Directory to write Tensorboard files to

None
debug bool

Flag to write Tensorboard summaries for debugging

False
sess Session

Optional Tensorflow session that will be used if passed instead of creating or inferring one internally

None
Source code in mercury/explainability/explainers/counter_fact_importance.py
def __init__(
    self,
    predict_fn: TP.Callable[[TP.Union['np.ndarray', pd.DataFrame]], TP.Union[float, 'np.ndarray']],
    feature_names: TP.List[str],
    shape: TP.Tuple[TP.Any] = None,
    drop_features: TP.List[TP.Any] = [],
    distance_fn: str = 'l1',
    target_proba: float = 1.0,
    target_class: TP.Union[str, int] = 'other',
    max_iter: int = 1000,
    early_stop: int = 50,
    lam_init: float = 1e-1,
    max_lam_steps: int = 10,
    tol: float = 0.05,
    learning_rate_init: float = 0.1,
    feature_range: TP.Union[TP.Tuple[TP.Any], str] = (-1e10, 1e10),
    eps: TP.Union[float, 'np.ndarray'] = 0.01,
    init: str = 'identity',
    decay: bool = True,
    write_dir: str = None,
    debug: bool = False,
    sess: tf.compat.v1.Session = None
) -> None:
    super(CounterfactualExplainer,
          self).__init__(feature_names, drop_features)

    self.predict_fn = predict_fn
    self.shape = shape if shape else (1, len(feature_names))
    self.distance_fn = distance_fn
    self.target_proba = target_proba
    self.target_class = target_class
    self.max_iter = max_iter
    self.early_stop = early_stop
    self.lam_init = lam_init
    self.max_lam_steps = max_lam_steps
    self.tol = tol
    self.learning_rate_init = learning_rate_init
    self.feature_range = feature_range
    self.eps = eps
    self.init = init
    self.decay = decay
    self.write_dir = write_dir
    self.debug = debug
    self.sess = sess
    self.cat_vars = None

    # Instantiate counterfactual
    self.cf_super = CounterFactual(
        predict_fn=self.predict_fn, shape=self.shape,
        distance_fn=self.distance_fn, target_proba=self.target_proba,
        target_class=self.target_class, max_iter=self.max_iter,
        early_stop=self.early_stop, lam_init=self.lam_init,
        max_lam_steps=self.max_lam_steps, tol=self.tol,
        learning_rate_init=self.learning_rate_init,
        feature_range=self.feature_range, eps=self.eps,
        init=self.init, decay=self.decay, write_dir=self.write_dir,
        debug=self.debug, sess=self.sess)
explain(explain_data)

This method serves two purposes: firstly, as an interface between the user and the CounterFactual class explain method; secondly, if there are features that shouldn't be taken into account, the method recomputes feature_range to account for this and reinstantiates the wrapped class (CounterFactual). It explains an instance and returns the counterfactual with metadata.

Parameters:

Name Type Description Default
explain_data ndarray

Instance to be explained

required

Returns:

Type Description
Explanation

Explanation object as specified in Alibi's original explain method.

Source code in mercury/explainability/explainers/counter_fact_importance.py
def explain(self, explain_data: 'np.ndarray') -> Explanation:
    """
    This method serves two purposes: firstly, as an interface between the user and the CounterFactual class
    explain method; secondly, if there are features that shouldn't be taken into account, the method
    recomputes feature_range to account for this and reinstantiates the wrapped class (CounterFactual).
    It explains an instance and returns the counterfactual with metadata.

    Args:
        explain_data (np.ndarray): Instance to be explained

    Returns:
        Explanation object as specified in Alibi's original explain method.
    """
    return self.cf_super.explain(
        explain_data
    )
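A minimal, hypothetical end-to-end sketch; the model and feature names are illustrative, not part of the library.

>>> import numpy as np
>>> explainer = CounterfactualExplainer(
...     predict_fn=model.predict_proba,  # probabilities for each class
...     feature_names=['age', 'income'],
...     target_proba=0.9,                # probability the counterfactual should reach
... )
>>> explanation = explainer.explain(np.array([[30.0, 25.0]]))  # shape (1, n_features)
>>> explanation.data['cf']['X']  # the counterfactual instance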

CounterfactualProtoExplainer(predict_fn, train_data, feature_names=None, shape=None, drop_features=[], kappa=0.0, beta=0.1, feature_range=None, gamma=0.0, ae_model=None, enc_model=None, theta=5.0, cat_vars=None, ohe=False, use_kdtree=False, learning_rate_init=0.01, max_iterations=1000, c_init=10.0, c_steps=10, eps=(0.001, 0.001), clip=(-1000, 1000), update_num_grad=1, write_dir=None, sess=None, trustscore_kwargs=None, d_type='abdm', w=None, disc_perc=[25, 50, 75], standardize_cat_vars=False, smooth=1.0, center=True, update_feature_range=True)

Bases: CounterFactualExplainerBase

Backed by Alibi's CounterFactualProto, this class extends its functionality to allow for the computation of Feature Importances by means of the computation of several Counterfactuals.

Parameters:

Name Type Description Default
predict_fn Callable[[Union[ndarray, DataFrame]], Union[float, ndarray]]

Model prediction function. This function should return the probabilities of belonging to each class.

required
train_data Union[ndarray, DataFrame]

Numpy array or Pandas dataframe with the training examples

required
feature_names List[str]

List of names of the features used in the model. Will be inferred by default from train_data

None
shape Tuple[Any]

Shape of input data starting with batch size. Will be inferred by default from train_data

None
drop_features List[Any]

List with the names, as specified in feature_names, or with the indexes of the variables to leave out while looking for explanations.

[]
beta float

Regularization constant for L1 loss term

0.1
kappa float

Confidence parameter for the attack loss term

0.0
feature_range Union[Tuple[Any], str]

Tuple with min and max ranges to allow for perturbed instances. Min and max ranges can be floats or numpy arrays with dimension (1x nb of features) for feature-wise ranges. Will be inferred by default from train_data

None
gamma float

Regularization constant for optional auto-encoder loss term

0.0
ae_model Model

Optional auto-encoder model used for loss regularization

None
enc_model Model

Optional encoder model used to guide instance perturbations towards a class prototype

None
theta float

Constant for the prototype search loss term. Default is 5. Set it to zero to disable it.

5.0
cat_vars dict

Dict with as keys the categorical columns and as values the number of categories per categorical variable.

None
ohe bool

Whether the categorical variables are one-hot encoded (OHE) or not. If not OHE, they are assumed to have ordinal encodings.

False
use_kdtree bool

Whether to use k-d trees for the prototype loss term if no encoder is available

False
learning_rate_init float

Initial learning rate of optimizer

0.01
max_iterations int

Maximum number of iterations for finding a counterfactual

1000
c_init float

Initial value to scale the attack loss term. To speed up the computation, this parameter should take a small value (0 or 1).

10.0
c_steps int

Number of iterations to adjust the constant scaling the attack loss term. To speed up the computation, this parameter should take a fairly small value (between 1 and 5).

10
eps Union[float, ndarray]

If numerical gradients are used to compute dL/dx = (dL/dp) * (dp/dx), then eps[0] is used to calculate dL/dp and eps[1] is used for dp/dx. eps[0] and eps[1] can be a combination of float values and numpy arrays. For eps[0], the array dimension should be (1x nb of prediction categories) and for eps[1] it should be (1x nb of features)

(0.001, 0.001)
clip Tuple[float]

Tuple with min and max clip ranges for both the numerical gradients and the gradients obtained from the TensorFlow graph

(-1000, 1000)
update_num_grad int

If numerical gradients are used, they will be updated every update_num_grad iterations. To speed up the computation, this parameter should take a fairly large value (between 100 and 250).

1
write_dir str

Directory to write tensorboard files to

None
sess Session

Optional Tensorflow session that will be used if passed instead of creating or inferring one internally

None
trustscore_kwargs dict

Keyword arguments for the trust score object used to define the k-d trees for each class (see the original Alibi docs on counterfactuals guided by prototypes).

None
d_type str

Distance metric. Options are "abdm", "mvdm" or "abdm-mvdm".

'abdm'
w float

If the combined metric "abdm-mvdm" is used, w is the weight (between 0 and 1) given to abdm.

None
disc_perc Union[List[Union[int, float]], Tuple[Union[int, float]]]

List with percentiles (int) used for discretization

[25, 50, 75]
standardize_cat_vars bool

Whether to return the standardized values for the numerical distances of each categorical feature.

False
smooth float

If the difference in the distances between the categorical variables is too large, a lower value of the smooth argument (0, 1) can smooth out this difference. This would only be relevant if one categorical variable has significantly larger differences between its categories than others. As a result, the counterfactual search process will likely leave that categorical variable unchanged.

1.0
center bool

Whether to center the numerical distances of the categorical variables between the min and max feature ranges.

True
update_feature_range bool

Whether to update the feature_range parameter for the categorical variables based on the min and max values computed in the constructor.

True
Source code in mercury/explainability/explainers/counter_fact_importance.py
def __init__(self,
    predict_fn: TP.Callable[[TP.Union['np.ndarray', pd.DataFrame]], TP.Union[float, 'np.ndarray']],
    train_data: TP.Union[np.ndarray, pd.DataFrame],
    feature_names: TP.List[str] = None,
    shape: TP.Tuple[TP.Any] = None,
    drop_features: TP.List[TP.Any] = [],
    kappa: float = 0.,
    beta: float = .1,
    feature_range: TP.Union[TP.Tuple[TP.Any], str] = None,
    gamma: float = 0.,
    ae_model: tf.keras.Model = None,
    enc_model: tf.keras.Model = None,
    theta: float = 5.0,
    cat_vars: dict = None,
    ohe: bool = False,
    use_kdtree: bool = False,
    learning_rate_init: float = 1e-2,
    max_iterations: int = 1000,
    c_init: float = 10.,
    c_steps: int = 10,
    eps: TP.Union[float,'np.ndarray'] = (1e-3, 1e-3),
    clip: TP.Tuple[float] = (-1000, 1000),
    update_num_grad: int = 1,
    write_dir: str = None,
    sess: tf.compat.v1.Session = None,
    trustscore_kwargs: dict = None,
    d_type: str = 'abdm',
    w: float = None,
    disc_perc: TP.Union[TP.List[TP.Union[int, float]], TP.Tuple[TP.Union[int, float]]] = [25, 50, 75],
    standardize_cat_vars: bool = False,
    smooth: float = 1.,
    center: bool = True,
    update_feature_range: bool = True
) -> None:
    if feature_names is None and type(train_data) is not pd.DataFrame:
        raise ValueError("If feature_names is not present, train_data "
                         "should be a Pandas DataFrame.")

    super(CounterfactualProtoExplainer,
          self).__init__(feature_names, drop_features)

    self.fitted = False
    self.feature_range = feature_range
    self.predict_fn = predict_fn
    self.shape = shape if shape else (1, train_data.shape[-1])  # honor an explicit shape if given
    self.feature_names = feature_names
    self.kappa = kappa
    self.beta = beta
    self.gamma = gamma
    self.ae_model = ae_model
    self.enc_model = enc_model
    self.theta = theta
    self.cat_vars = cat_vars
    self.ohe = ohe
    self.use_kdtree = True if not enc_model and not ae_model else False
    self.learning_rate_init = learning_rate_init
    self.max_iterations = max_iterations
    self.c_init = c_init
    self.c_steps = c_steps
    self.eps = eps
    self.clip = clip
    self.update_num_grad = update_num_grad
    self.write_dir = write_dir
    self.sess = sess

    self.train_data = train_data

    if isinstance(train_data, pd.DataFrame):
        self.feature_names = list(train_data.columns)
        self.feature_range = \
            (train_data.min().values, train_data.max().values) \
                if not feature_range else feature_range  # keep a user-provided range
        self.train_data = train_data.values

    self.cf_super = CounterFactualProto(
        predict=predict_fn, shape=self.shape, kappa=self.kappa, beta=self.beta,
        feature_range=self.feature_range, gamma=self.gamma, ae_model=self.ae_model,
        enc_model=self.enc_model, theta=self.theta, cat_vars=self.cat_vars, ohe=self.ohe,
        use_kdtree=self.use_kdtree, learning_rate_init=self.learning_rate_init,
        max_iterations=self.max_iterations, c_init=self.c_init, c_steps=self.c_steps,
        eps=self.eps, clip=self.clip, update_num_grad=self.update_num_grad,
        write_dir=self.write_dir, sess=self.sess
    )

    self.cf_super = self.cf_super.fit(self.train_data, trustscore_kwargs=trustscore_kwargs, d_type=d_type,
        w=w, disc_perc=disc_perc, standardize_cat_vars=standardize_cat_vars,
        smooth=smooth, center=center, update_feature_range=update_feature_range)
explain(explain_data, Y=None, target_class=None, k=None, k_type='mean', threshold=0.0, verbose=False, print_every=100, log_every=100)

This method serves three purposes: firstly, as an interface between the user and the CounterFactualProto class explain method; secondly, if there are features that shouldn't be taken into account, the method recomputes feature_range to account for this and reinstantiates the wrapped class (CounterFactualProto); and thirdly, if a new CounterFactualProto instance has been created, it is fitted. It explains an instance and returns the counterfactual with metadata.

Parameters:

Name Type Description Default
explain_data ndarray

Instances to explain

required
Y ndarray

Labels for X as one-hot-encoding

None
target_class list

List with target classes used to find closest prototype. If None, the nearest prototype except for the predict class on the instance is used.

None
k int

Number of nearest instances used to define the prototype for a class. Defaults to using all instances belonging to the class if an encoder is used and to 1 for k-d trees.

None
k_type str

Use either the average encoding of the k nearest instances in a class (k_type='mean') or the k-nearest encoding in the class (k_type='point') to define the prototype of that class. Only relevant if an encoder is used to define the prototypes.

'mean'
threshold float

Threshold level for the ratio between the distance of the counterfactual to the prototype of the predicted class for the original instance over the distance to the prototype of the predicted class for the counterfactual. If the trust score is below the threshold, the proposed counterfactual does not meet the requirements.

0.0
verbose bool

Print intermediate results of optimization if True

False
print_every int

Print frequency if verbose is True

100
log_every int

Tensorboard log frequency if write directory is specified

100

Returns:

Type Description
Explanation

Explanation object containing the counterfactual with additional metadata

Source code in mercury/explainability/explainers/counter_fact_importance.py
def explain(
    self,
    explain_data: 'np.ndarray',
    Y: 'np.ndarray' = None,
    target_class: list = None,
    k: int = None,
    k_type: str = 'mean',
    threshold: float = 0.,
    verbose: bool = False,
    print_every: int = 100,
    log_every: int = 100) -> Explanation:
    """
    This method serves three purposes: firstly, as an interface between the user and the CounterFactualProto
    class explain method; secondly, if there are features that shouldn't be taken into account, the method
    recomputes feature_range to account for this and reinstantiates the wrapped class (CounterFactualProto);
    and thirdly, if a new CounterFactualProto instance has been created, it is fitted.
    It explains an instance and returns the counterfactual with metadata.

    Args:
        explain_data:
            Instances to explain
        Y:
            Labels for X as one-hot-encoding
        target_class:
            List with target classes used to find closest prototype. If None, the nearest prototype
            except for the predict class on the instance is used.
        k:
            Number of nearest instances used to define the prototype for a class. Defaults to using all
            instances belonging to the class if an encoder is used and to 1 for k-d trees.
        k_type:
            Use either the average encoding of the k nearest instances in a class (k_type='mean') or
            the k-nearest encoding in the class (k_type='point') to define the prototype of that class.
            Only relevant if an encoder is used to define the prototypes.
        threshold:
            Threshold level for the ratio between the distance of the counterfactual to the prototype of the
            predicted class for the original instance over the distance to the prototype of the predicted class
            for the counterfactual. If the trust score is below the threshold, the proposed counterfactual does
            not meet the requirements.
        verbose:
            Print intermediate results of optimization if True
        print_every:
            Print frequency if verbose is True
        log_every:
            Tensorboard log frequency if write directory is specified

    Returns:
        Explanation object containing the counterfactual with additional metadata.
    """

    return self.cf_super.explain(
        explain_data,
        Y,
        target_class,
        k,
        k_type,
        threshold,
        verbose,
        print_every,
        log_every
    )
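A hypothetical sketch (model and train are illustrative): the constructor both builds and fits the underlying CounterFactualProto, so a single explain call suffices afterwards.

>>> explainer = CounterfactualProtoExplainer(
...     predict_fn=model.predict_proba,
...     train_data=train,   # pandas DataFrame: feature names, shape and ranges are inferred
...     max_iterations=500,
...     c_init=1.0,         # small values speed up the search
...     c_steps=5,
... )
>>> explanation = explainer.explain(train.iloc[[0]].values)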

explainer

MercuryExplainer

Bases: ABC

load(filename='explainer.pkl') classmethod

Loads a previously saved explainer, with its internal state, from a file.

Parameters:

Name Type Description Default
filename str

Path where the explainer is stored

'explainer.pkl'
Source code in mercury/explainability/explainers/explainer.py
@classmethod
def load(cls, filename: str = "explainer.pkl"):
    """
    Loads a previously saved explainer, with its internal state, from a file.

    Args:
        filename (str): Path where the explainer is stored
    """
    assert os.path.isfile(filename), "File does not exist or not a valid file"
    with open(filename, 'rb') as f:
        return dill.load(f)
save(filename='explainer.pkl')

Saves the explainer with its internal state to a file.

Parameters:

Name Type Description Default
filename str

Path where the explainer will be saved

'explainer.pkl'
Source code in mercury/explainability/explainers/explainer.py
def save(self, filename: str = "explainer.pkl"):
    """
    Saves the explainer with its internal state to a file.

    Args:
        filename (str): Path where the explainer will be saved
    """
    with open(filename, 'wb') as f:
        dill.dump(self, f)
    assert os.path.isfile(filename), "Error storing file"
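A round-trip sketch (the filename is illustrative): any explainer inheriting from MercuryExplainer can be persisted and restored, since both methods rely on dill.

>>> explainer.save('my_explainer.pkl')                    # serialize internal state
>>> restored = MercuryExplainer.load('my_explainer.pkl')  # classmethod; returns the explainer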

partial_dependence

PartialDependenceExplainer(predict_fn, output_col='prediction', max_categorical_thresh=5, quantiles=0.05, resolution=50, verbose=False)

Bases: MercuryExplainer

This explainer will calculate the partial dependences for a ML model. Also contains a distributed (pyspark) implementation which allows PySpark transformers/pipelines to be explained via PD.

Parameters:

Name Type Description Default
predict_fn Callable

Inference function. This function will take a DataFrame (Pandas or PySpark) and must return the output of your estimator (usually a NumPy array for a 'plain' python model or a DataFrame in case of PySpark).

required
output_col str

Name of the output column of the PySpark transformer model. This will only be used in case the passed explain data is of type pyspark.DataFrame

'prediction'
max_categorical_thresh int

If a column contains less unique values than this threshold, it will be auto-considered categorical.

5
quantiles float or tuple

Calculate the quantiles of the model predictions. If type==float, (quantiles, 1-quantiles) range will be calculated. If type==tuple, range will be quantiles. If None, the quantile calculation will be disabled (useful for saving time).

0.05
resolution int

Number of different values to test for each non-categorical variable. Lowering this value will increase speed but reduce resolution in the plots.

50
verbose bool

Print progress status. Default is False.

False
Example
# "Plain python" example
>>> features = dataset.loc[:, FEATURE_NAMES] # DataFrame with only features
# You can create a custom inference function.
>>> def my_inference_fn(feats):
...     return my_fitted_model.predict(feats)
# You can also pass anything as long as it is callable and receives the predictors (e.g. my_fitted_model.predict / predict_proba).
>>> explainer = PartialDependenceExplainer(my_fitted_model.predict)
>>> explanation = explainer.explain(features)
# Plot a summary of partial dependences for all the features.
>>> explanation.plot()
# Plot the partial dependence of a single variable.
>>> fig, ax = plt.subplots()
>>> explanation.plot_single('FEATURE_1', ax=ax)

# Pyspark Example
# Dataset with ONLY feature columns. (Assumed an already trained model)
>>> features = dataset.drop('target')
# Inference function
>>> def my_pred_fn(data):
...     temp_df = assembler.transform(data)
...     return my_transformer.transform(temp_df)
# We can tell the explainer not to calculate partial dependences for certain features. This will save time.
>>> features_to_ignore = ['FEATURE_4', 'FEATURE_88']
# We pass our custom inference function and also, tell the explainer which column will hold the transformer's output (necessary only when explaining pyspark models).
>>> explainer = PartialDependenceExplainer(my_pred_fn, output_col='probability')
# Explain the model ignoring features.
>>> explanation = explainer.explain(features, ignore_feats=features_to_ignore)
Source code in mercury/explainability/explainers/partial_dependence.py
def __init__(self,
        predict_fn: TP.Callable[[TP.Union["pandas.DataFrame", "pyspark.sql.DataFrame"]],  # noqa: F821
                             TP.Union[np.ndarray, "pyspark.sql.DataFrame"]],  # noqa: F821
        output_col:str = 'prediction',
        max_categorical_thresh:int = 5,
        quantiles:TP.Union[float ,tuple] = 0.05,
        resolution: int = 50,
        verbose:bool = False):
    self.predict_fn = predict_fn
    self.output_col = output_col
    # If the number of different values of a given feature is less
    # than this, we will treat it as categorical
    self._max_categorical_values = max_categorical_thresh
    self.verbose = verbose
    self.resolution = resolution

    if type(quantiles) == float:
        self.compute_quantiles = (quantiles, 1 - quantiles)
    else:
        self.compute_quantiles = quantiles
__base_impl(features, feat_names, categoricals)

Contains the logic for the base implementation, i.e. the one which uses np.ndarrays/pd.DataFrames and undistributed models.

Parameters:

Name Type Description Default
features DataFrame

pandas.DataFrame with ONLY the predictors.

required
feat_names list[str]

List containing the names of the features that will be explained

required
categoricals set[str]

Set with the feature names that will be forced to be categorical.

required

Returns:

Type Description
dict

Dictionary with the partial dependences of each selected feature

dict

and whether that feature is categorical or not.

Source code in mercury/explainability/explainers/partial_dependence.py
def __base_impl(self, features:pd.DataFrame, feat_names:list, categoricals:set)->dict:
    """
    Contains the logic for the base implementation, i.e. the one
    which uses np.ndarrays/pd.DataFrames and undistributed models.

    Args:
        features (pd.DataFrame): pandas.DataFrame with ONLY the predictors.
        feat_names (list[str]):
            List containing the names of the features that will be explained
        categoricals (set[str]):
            Set with the feature names that will be forced to be categorical.

    Returns:
        Dictionary with the partial dependences of each selected feature
        and whether that feature is categorical or not.
    """
    data = {}
    for colname in feat_names:
        start = time.time()
        data[colname] = {}
        uniques = features[colname].unique()
        nb_uniques = len(uniques)

        if features.loc[:,colname].dtype == np.dtype('O') or colname in categoricals:
            is_categorical = True
        else:
            is_categorical = False

        if not is_categorical and type(features.loc[:,colname].iloc[0].item()) == float and nb_uniques > self._max_categorical_values:
            grid = np.linspace(features[colname].min(), features[colname].max())
        elif not is_categorical and nb_uniques > self._max_categorical_values and type(features.loc[:,colname].iloc[0].item()) == int:
            step_size = max(1, np.abs((features[colname].max() - features[colname].min()) // self.resolution))
            grid = np.arange(features[colname].min(), features[colname].max(), step=step_size)
        else:
            is_categorical = True
            grid = uniques

        if self.verbose:
            type_msg = 'categorical' if is_categorical else 'continuous'
            print(f"Starting calculation for feature {colname} - Type: {type_msg}")
            if is_categorical:
                print(f"\tValues to test: {grid}")
            else:
                print(f"\tValue range to test: [{grid[0]}, {grid[-1]}] (only on {self.resolution} items)")

        data[colname]['values'] = grid
        pdep_means, pdep_lquant, pdep_uquant = self.__partial_dep_base(colname, features, grid)
        data[colname]['preds'] = pdep_means
        data[colname]['lower_quantile'] = pdep_lquant
        data[colname]['upper_quantile'] = pdep_uquant
        data[colname]['categorical'] = is_categorical

        elapsed = time.time() - start
        if self.verbose:
            print(f"Partial dependence for feature {colname} calculated. Took {elapsed:.2f} seconds")

    return data
__partial_dep_base(feat_name, data, grid)

Logic for computing the partial dependence of one feature for the base implementation (undistributed models)

Source code in mercury/explainability/explainers/partial_dependence.py
def __partial_dep_base(self,
        feat_name:str,
        data: "pandas.DataFrame",  # noqa: F821
        grid: TP.Union[list, "np.array"])->"np.ndarray":
    """ Logic for computing the partial dependence of one feature for the base
    implementation (undistributed models)
    """
    preds = []
    lower_quantiles = []
    upper_quantiles = []
    data = data.copy()
    for val in grid:
        data[feat_name] = val
        model_preds = self.predict_fn(data)
        preds.append(np.mean(model_preds, axis=0))
        if self.compute_quantiles:
            lower_quantiles.append(np.quantile(model_preds, q=self.compute_quantiles[0],axis=0))
            upper_quantiles.append(np.quantile(model_preds, q=self.compute_quantiles[1],axis=0))
    return np.array(preds), np.array(lower_quantiles), np.array(upper_quantiles)
__partial_dep_pyspark(feat_name, data, grid, is_categorical=False)

Helper method for the PySpark implementation (distributed models). Computes the partial dependences of one feature given its type (categorical or not)

Source code in mercury/explainability/explainers/partial_dependence.py
def __partial_dep_pyspark(self,
        feat_name:str,
        data: TP.Union["pandas.DataFrame", "pyspark.sql.DataFrame"],  # noqa: F821
        grid: TP.Union[list, "np.array"],
        is_categorical:bool = False)->"np.ndarray":
    """ Helper method for the PySpark implementation (distributed models).
    Computes the partial dependences of one feature given its type (categorical or not)
    """
    from pyspark.ml.stat import Summarizer
    from pyspark.sql.functions import col
    from pyspark.sql import functions as sqlf

    mean_preds = []
    lower_quantiles = []
    upper_quantiles = []

    for v in grid:
        if type(v) == str:
            temp_df = data.withColumn(feat_name, sqlf.lit(v))
        else:
            temp_df = data.withColumn(feat_name, sqlf.lit(float(v)))
        temp_df = self.predict_fn(temp_df)

        if temp_df.schema[self.output_col].simpleString().split(':')[-1] == 'vector':
            # Aggregate all the probability distributions
            mean_preds.append(temp_df.agg(Summarizer.mean(col(self.output_col))).collect()[0][0].values.tolist())

            if self.compute_quantiles:
                # Tricky to calculate quantiles, as Spark doesn't support this for Vector columns
                output_col = self.output_col  # avoid hardcoding the column name inside the closure

                def _to_cols(row):
                    return tuple(row[output_col].toArray().tolist())

                pred_cols = temp_df.rdd.map(_to_cols).toDF()
                quants = np.array(
                    pred_cols.approxQuantile(pred_cols.columns, self.compute_quantiles, 0.15)
                )
                lower_quantiles.append(quants[:, 0])
                upper_quantiles.append(quants[:, 1])
        else:
            # Aggregate all the prediction probabilities/targets
            mean_preds.append(temp_df.agg({self.output_col: "mean"}).collect()[0][0])
            if self.compute_quantiles:
                quants =temp_df.approxQuantile(self.output_col, self.compute_quantiles, 0.15)
                lower_quantiles.append(quants[0])
                upper_quantiles.append(quants[1])

    return np.array(mean_preds), np.array(lower_quantiles), np.array(upper_quantiles)
__pyspark_impl(features, feat_names, categoricals)

Contains the logic for the pyspark implementation, i.e. the one which uses pyspark.DataFrames and distributed models.

Parameters:

Name Type Description Default
features DataFrame

pyspark.DataFrame with ONLY the predictors.

required
feat_names list

list[str] List containing the names of the features that will be explained

required
categoricals set

set[str] Set with the feature names that will be forced to be categorical.

required

Returns:

Type Description
dict

Dictionary with the partial dependences of each selected feature

dict

and whether that feature is categorical or not.

Source code in mercury/explainability/explainers/partial_dependence.py
def __pyspark_impl(self, features: "pyspark.DataFrame", feat_names: list, categoricals: set) -> dict:  # noqa: F821
    """ Contains the logic for the pyspark implementation, i.e. the  one
    which uses pyspark.DataFrames and undistributed models.

    Args:
        features: pyspark.DataFrame with ONLY the predictors.
        feat_names: list[str]
            List containing the names of the features that will be explained
        categoricals: set[str]
            Set with the feature names that will be forced to be categorical.

    Returns:
        Dictionary with the partial dependences of each selected feature
        and whether that feature is categorical or not.
    """
    from pyspark.sql.functions import col
    from pyspark.sql import functions as sqlf

    from pyspark.sql.types import (
        FloatType,
        DoubleType,
        DecimalType,
        LongType,
        IntegerType,
        ShortType,
        ByteType
    )

    # Pyspark names of float and integer datatypes
    float_pyspark_types = (FloatType, DoubleType, DecimalType)
    int_pyspark_types = (FloatType, DoubleType, LongType, IntegerType, ShortType, ByteType)

    data = {}

    # Calculate unique values for disjoint set of columns from categoricals
    cols_to_check = [n for n in feat_names if n not in categoricals]
    # Calculate all statistics at once to save collects
    num_distinct_values = features.agg(
        *(sqlf.countDistinct(col(c)).alias(c) for c in cols_to_check)
    ).collect()[0]
    minimums = features.select([sqlf.min(c).alias(c) for c in features.columns]).collect()[0]
    maximums = features.select([sqlf.max(c).alias(c) for c in features.columns]).collect()[0]

    for colname in feat_names:
        start = time.time()

        data[colname] = {}
        # Determine whether this column is discrete, real or categorical
        is_categorical = False
        if isinstance(features.schema[colname].dataType, float_pyspark_types) and colname not in categoricals:
            grid = np.linspace(minimums[colname], maximums[colname])
        elif isinstance(features.schema[colname].dataType, int_pyspark_types) \
                and num_distinct_values[colname] > self._max_categorical_values:
            step_size = max(
                1, np.abs((maximums[colname] - minimums[colname]) // self.resolution)
            )
            grid = np.arange(minimums[colname], maximums[colname], step=step_size)
        else:
            grid = features.select(colname).distinct().rdd.flatMap(lambda x: x).collect()
            is_categorical=True

        if self.verbose:
            type_msg = 'categorical' if is_categorical else 'continuous'
            print(f"Starting calculation for feature {colname} - Type: {type_msg}")
            if is_categorical:
                print(f"\tValues to test: {grid}")
            else:
                print(f"\tValue range to test: [{grid[0]}, {grid[-1]}] (only on {self.resolution} items)")

        data[colname]['values'] = grid
        pdep_means, pdep_lquant, pdep_uquant = self.__partial_dep_pyspark(colname, features, grid, is_categorical)
        data[colname]['preds'] = pdep_means
        data[colname]['lower_quantile'] = pdep_lquant
        data[colname]['upper_quantile'] = pdep_uquant
        data[colname]['categorical'] = is_categorical

        elapsed = time.time() - start
        if self.verbose:
            print(f"Partial dependence for feature {colname} calculated. Took {elapsed:.2f} seconds")

    return data
explain(features, ignore_feats=None, categoricals=None)

This method will compute the partial dependences for a ML model. This explainer also contains a distributed (pyspark) implementation which allows PySpark transformers/pipelines to be explained via PD.

Parameters:

Name Type Description Default
features DataFrame or DataFrame

DataFrame with only the features needed by the model. This dataframe should ONLY contain the features consumed by the model and, in the PySpark case, the vector-assembled column should be generated inside of the predict_fn.

required
ignore_feats List

Feature names which won't be explained

None
categoricals List

List of feature names that will be forced to be treated as categorical. If it's empty, the explainer will guess which columns are categorical

None

Returns:

Type Description
PartialDependenceExplanation

PartialDependenceExplanation containing the explanation results.

Source code in mercury/explainability/explainers/partial_dependence.py
def explain(self,
        features: TP.Union["pandas.DataFrame", "pyspark.sql.DataFrame"],  # noqa: F821
        ignore_feats: TP.List[str] = None,
        categoricals: TP.List[str] = None)->PartialDependenceExplanation:
    """
    This method computes the partial dependences for an ML model.
    This explainer also contains a distributed (PySpark) implementation which
    allows PySpark transformers/pipelines to be explained via partial dependence.

    Args:
        features (pandas.DataFrame or pyspark.sql.DataFrame):
            DataFrame with only the features needed by the model.
            This dataframe should ONLY contain the features consumed by the model and,
            in the PySpark case, the vector-assembled column should be generated inside
            the predict_fn.
        ignore_feats (List):
            Feature names which won't be explained.
        categoricals (List):
            Feature names that will be forced to be treated as
            categorical. If empty, the explainer will guess which
            columns are categorical.

    Returns:
        PartialDependenceExplanation containing the explanation results.
    """
    ignore_feats = ignore_feats if ignore_feats else []
    categoricals = categoricals if categoricals else []

    partial_dep_impl = self.__base_impl
    categoricals = set(categoricals)
    if type(features) != pd.DataFrame:
        partial_dep_impl = self.__pyspark_impl

    feat_names = [f for f in list(features.columns) if f not in set(ignore_feats)]

    dependences = partial_dep_impl(features, feat_names, categoricals)

    return PartialDependenceExplanation(dependences)
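
Because the features DataFrame must not already contain the vector-assembled column, a PySpark prediction function typically performs the assembly itself. A hypothetical sketch (VectorAssembler is the standard pyspark.ml.feature utility; FEATURE_NAMES, fitted_model and the output column name are illustrative, not part of the library):

from pyspark.ml.feature import VectorAssembler

FEATURE_NAMES = ["age", "income"]  # hypothetical raw feature columns

assembler = VectorAssembler(inputCols=FEATURE_NAMES, outputCol="features_vec")

def predict_fn(df):
    # Assemble the raw feature columns into a vector inside the prediction
    # function, so the explainer can perturb the raw columns directly.
    # fitted_model stands for an already-trained pyspark.ml Transformer.
    assembled = assembler.transform(df)
    return fitted_model.transform(assembled)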

shuffle_importance

ShuffleImportanceExplainer(eval_fn, normalize=True)

Bases: MercuryExplainer

This explainer estimates the feature importance of each predictor for a given black-box model. The strategy consists of randomly shuffling one variable at a time and, at each step, measuring how much a given metric worsens. The features whose shuffling degrades performance the most are the most important ones.

Parameters:

Name Type Description Default
eval_fn Callable

Custom evaluation function. It will receive a DataFrame with features and a NumPy array with the target outputs for each instance. It must implement an inference process and return a metric scoring the model's performance on the given data. This metric must be a real number. If the metric is one where higher values mean better performance (like accuracy) and normalize=True (the default), it is recommended to return the negative of that metric from eval_fn so the results are more intuitive. In the case of PySpark explanations, the function will receive a PySpark DataFrame as its first argument, already containing the target column, while the second argument will be None.

required
normalize bool

Whether to scale the feature importances between 0 and 1. If True, the explanation shows the relative importance of the features. If False, the feature importances will be the raw values of the metric returned by eval_fn when shuffling each feature. Default value is True.

True
Example
# "Plain python" example
>>> features = pd.read_csv(PATH_DATA)
>>> targets = features['target']  # Targets
>>> features = features.loc[:, FEATURE_NAMES] # DataFrame with only features
>>> def my_inference_function(features, targets):
...     predictions = model.predict(features)
...     return mean_squared_error(targets, predictions)
>>> explainer = ShuffleImportanceExplainer(my_inference_function)
>>> explanation = explainer.explain(features, targets)
>>> explanation.plot()

# Explain a pyspark model (or pipeline)
>>> features = sess.createDataFrame(pandas_dataframe)
>>> target_colname = "target"  # Column name with the ground truth labels
>>> def my_inference_function(features, targets):
...     model_inp = vectorAssembler.transform(features)
...     model_out = my_pyspark_transformer.transform(model_inp)
...     return my_evaluator.evaluate(model_out)
>>> explainer = ShuffleImportanceExplainer(my_inference_function)
>>> explanation = explainer.explain(features, target_colname)
>>> explanation.plot()
Source code in mercury/explainability/explainers/shuffle_importance.py
def __init__(self,
             eval_fn: Callable[[Union["pd.DataFrame", "pyspark.sql.DataFrame"], Union["np.ndarray", str]], float],  # noqa: F821
             normalize: bool = True
    ):
    self.eval_fn = eval_fn
    self.normalize = normalize
explain(predictors, target)

Explains the model given the data.

Parameters:

Name Type Description Default
predictors Union[pandas.DataFrame, pyspark.sql.DataFrame]

DataFrame with the features the model needs and that will be explained. In the case of PySpark, this dataframe must also contain a column with the target.

required
target Union[ndarray, str]

The ground-truth target for each one of the instances. In the case of PySpark, this should be the name of the column in the DataFrame which holds the target.

required

Raises:

Type Description
ValueError

if predictors is a pyspark.sql.DataFrame and target is not a str

ValueError

if predictors is a pyspark.sql.DataFrame and target is not the name of a column in predictors

Returns:

Type Description
FeatureImportanceExplanation

FeatureImportanceExplanation with the per-feature performances of the model.
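
The per-feature loop in the source code below dispatches to a backend-specific private implementation (__impl_base or __impl_pyspark, not shown here). Conceptually, each step is a single permutation-importance trial. A hypothetical pandas sketch of one such step (shuffle_and_score is illustrative, not the library's API):

import numpy as np
import pandas as pd

def shuffle_and_score(predictors: pd.DataFrame, target: np.ndarray,
                      col: str, eval_fn) -> float:
    # Permute one column, keep the rest intact, and re-score the model with
    # the user-supplied eval_fn; a worse metric => a more important feature.
    shuffled = predictors.copy()
    shuffled[col] = np.random.permutation(shuffled[col].values)
    return eval_fn(shuffled, target)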

Source code in mercury/explainability/explainers/shuffle_importance.py
def explain(self,
            predictors: Union["pd.DataFrame", "pyspark.sql.DataFrame"],  # noqa: F821
            target: Union["np.ndarray", str]
    ) -> FeatureImportanceExplanation:
    """
    Explains the model given the data.

    Args:
        predictors (Union[pandas.DataFrame, pyspark.sql.DataFrame]):
            DataFrame with the features the model needs and that will be explained.
            In the case of PySpark, this dataframe must also contain a column
            with the target.
        target (Union[numpy.ndarray, str]):
            The ground-truth target for each one of the instances. In the case of
            PySpark, this should be the name of the column in the DataFrame which
            holds the target.

    Raises:
        ValueError: if predictors is a pyspark.sql.DataFrame and target is not a str
        ValueError: if predictors is a pyspark.sql.DataFrame and target is not in predictors.columns

    Returns:
        FeatureImportanceExplanation with the per-feature performances of the model
    """

    implementation = self.__impl_base
    feature_names = []
    # Cheap way to check whether predictors is a pyspark.sql.DataFrame (without importing pyspark).
    if hasattr(type(predictors), 'toPandas'):
        if type(target) != str:
            raise ValueError("""If predictors is a Spark DataFrame, target should be the name \
                             of the target column (a str)""")
        implementation = self.__impl_pyspark
        feature_names = list(filter(lambda x: x!=target, predictors.columns))
        if len(feature_names) == len(predictors.columns):
            raise ValueError(f"""`target` must be the name of the target column in the DataFrame. \
                             Value passed: {target}""")
    else:
        feature_names = list(predictors.columns)
        if type(target) == str:
            feature_names = list(filter(lambda x: x!=target, feature_names))

    metrics = {}
    for col in feature_names:
        metrics[col] = implementation(predictors, target, col)
    if self.normalize:
        metrics = self._normalize_importances(metrics)
    return FeatureImportanceExplanation(metrics)
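
The _normalize_importances helper invoked at the end of explain is not shown above. A plausible min-max sketch of what such scaling could look like (purely illustrative; the library's actual helper may differ):

def normalize_importances(metrics: dict) -> dict:
    # Hypothetical min-max scaling of the per-feature metric values into [0, 1];
    # not the library's implementation.
    lo, hi = min(metrics.values()), max(metrics.values())
    span = (hi - lo) or 1.0  # guard against all-equal metric values
    return {feat: (val - lo) / span for feat, val in metrics.items()}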