
mercury.graph.ml

mercury.graph.ml.LouvainCommunities(min_modularity_gain=0.001, max_pass=2, max_iter=10, resolution=1, all_partitions=True, verbose=True)

Bases: BaseClass

Class that defines the functions that run a PySpark implementation of the Louvain algorithm to find the partition that maximizes the modularity of an undirected graph (as in [1]).

This version of the algorithm differs from [1] in that the reassignment of nodes to new communities is calculated in parallel rather than sequentially. That is, all nodes are reassigned at the same time, and conflicts (e.g., node 1 moving to C2 while node 2 moves to C1) are resolved with a simple tie-breaking rule. This version also introduces the resolution parameter gamma, as in [2].

Contributed by Arturo Soberon Cedillo, Jose Antonio Guzman Vazquez and Isaac Dodanim Hernandez Garcia.


  1. Blondel V D, Guillaume J-L, Lambiotte R and Lefebvre E (2008). Fast unfolding of communities in large networks. Journal of Statistical Mechanics: Theory and Experiment, 2008. https://doi.org/10.1088/1742-5468/2008/10/p10008 

  2. Aynaud T, Blondel V D, Guillaume J-L and Lambiotte R (2013). Multilevel local optimization of modularity. In Graph Partitioning, 315--345.
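
For reference, the quantity being maximized is the resolution-adjusted modularity (a standard formulation, shown here for context; not copied from mercury's source):

  • \(Q = \frac{1}{2m} \sum_{ij} \left[ A_{ij} - \gamma \frac{k_i k_j}{2m} \right] \delta(c_i, c_j)\)

where \(A\) is the weighted adjacency matrix, \(k_i\) is the weighted degree of node i, \(m\) is the total edge weight, and \(\delta\) is 1 when nodes i and j share a community. With gamma = 1 this reduces to the classic modularity of [1]; gamma < 1 favors larger communities and gamma > 1 favors smaller ones.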

Parameters:

min_modularity_gain (float, default 0.001): Modularity gain threshold between passes. The algorithm stops if the gain in modularity between the current pass and the previous one is less than this threshold.

max_pass (int, default 2): Maximum number of passes.

max_iter (int, default 10): Maximum number of iterations within each pass.

resolution (float, default 1): The resolution parameter gamma. It must be greater than or equal to zero. If resolution is less than 1, modularity favors larger communities, while values greater than 1 favor smaller communities.

all_partitions (bool, default True): If True, return all the partitions found at each step of the algorithm (i.e., pass0, pass1, pass2, ...). If False, return only the last (and best) partition.

verbose (bool, default True): If True, print progress information during the Louvain algorithm execution.
Source code in mercury/graph/ml/louvain.py
def __init__(
    self,
    min_modularity_gain=1e-03,
    max_pass=2,
    max_iter=10,
    resolution: Union[float, int] = 1,
    all_partitions=True,
    verbose=True,
):
    self.min_modularity_gain = min_modularity_gain
    self.max_pass = max_pass
    self.max_iter = max_iter
    self.resolution = resolution
    self.all_partitions = all_partitions
    self.verbose = verbose

    # Check resolution
    if resolution < 0:
        exceptionMsg = f"Resolution value is {resolution} and cannot be < 0."
        raise ValueError(exceptionMsg)

fit(g)

Parameters:

g (Graph): A mercury graph structure.

Returns:

self: Fitted self (or raises an error).

Source code in mercury/graph/ml/louvain.py
def fit(self, g: Graph):
    """
    Args:
        g (Graph): A mercury graph structure.

    Returns:
        (self): Fitted self (or raises an error).
    """
    edges = g.graphframe.edges

    # Verify edges input
    self._verify_data(
        df=edges,
        expected_cols_grouping=["src", "dst"],
        expected_cols_others=["weight"],
    )

    # Init dataframe to be returned
    ret = (
        edges.selectExpr("src as id")
        .unionByName(edges.selectExpr("dst as id"))
        .distinct()
        .withColumn("pass0", F.row_number().over(Window.orderBy("id")))
    ).checkpoint()

    # Convert edges to anonymized src's and dst's
    edges = (
        edges.selectExpr("src as src0", "dst as dst0", "weight")
        .join(other=ret.selectExpr("id as src0", "pass0 as src"), on="src0")
        .join(other=ret.selectExpr("id as dst0", "pass0 as dst"), on="dst0")
        .select("src", "dst", "weight")
    ).checkpoint()

    # Calculate m and initialize modularity
    m = self._calculate_m(edges)
    modularity0 = -1.0

    # Begin pass
    canPass, _pass = True, 0
    while canPass:

        # Declare naive partition
        p1 = (
            edges.selectExpr("src as id")
            .unionByName(edges.selectExpr("dst as id"))
            .distinct()
            .withColumn("c", F.col("id"))
        )

        # Begin iterations within pass
        canIter, _iter = True, 0
        # Carry reference to previously cached p2 to call unpersist()
        prev_p2 = None
        while canIter:

            if _iter >= self.max_iter:
                break

            # Print progress
            if self.verbose:
                print(f"Starting Pass {_pass} Iteration {_iter}.")

            # Create new partition and check if movements were made
            p2 = self._reassign_all(edges, p1)
            # Break complex lineage caused by loops first
            p2 = p2.checkpoint()
            p2.cache()

            canIter = len(p2.where("cx != cj").take(1)) > 0
            if canIter:
                p1 = p2.selectExpr("id", "cj as c")
            if prev_p2 is not None:
                prev_p2.unpersist()
            prev_p2 = p2
            _iter += 1

        # Calculate new modularity and update pass counter
        modularity1 = self._calculate_modularity(edges=edges, partition=p1, m=m)

        # Declare stopping criterion and update old modularity
        canPass = (modularity1 - modularity0 > self.min_modularity_gain) and (
            _pass < self.max_pass
        )
        modularity0 = modularity1

        self.modularity_ = modularity0

        # Update ret and compress graph
        if canPass:
            ret = ret.join(
                other=p1.selectExpr(f"id as pass{_pass}", f"c as pass{_pass + 1}"),
                on=f"pass{_pass}",
            ).checkpoint()

            edges = (
                self._label_edges(edges, p1)
                .select("cSrc", "cDst", "weight")
                .groupBy("cSrc", "cDst")
                .agg(F.sum("weight").alias("weight"))
                .selectExpr("cSrc as src", "cDst as dst", "weight")
            ).checkpoint()

        if prev_p2 is not None:
            prev_p2.unpersist()
        _pass += 1

    # Return final dataframe with sorted columns
    if self.all_partitions:

        # Return sorted columns
        cols = self._sort_passes(ret)
        ret = ret.select(cols)

    # Return final dataframe with id & community
    else:
        _last = self._last_pass(ret)
        ret = ret.selectExpr("id as node_id", f"{_last} as cluster")

    self.labels_ = ret

    return self
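
The quantity fit() maximizes can be illustrated with a minimal, pure-Python sketch (illustrative only, not mercury's Spark implementation): the resolution-adjusted modularity of a partition on a toy undirected graph.

```python
def modularity(edges, partition, gamma=1.0):
    """Resolution-adjusted modularity of an undirected weighted graph.

    edges: list of (src, dst, weight) tuples, each undirected edge listed once.
    partition: dict mapping node -> community label.
    gamma: the resolution parameter (1.0 recovers classic modularity).
    """
    m = sum(w for _, _, w in edges)              # total edge weight
    k = {}                                       # weighted node degrees
    for u, v, w in edges:
        k[u] = k.get(u, 0.0) + w
        k[v] = k.get(v, 0.0) + w
    A = {}                                       # symmetric adjacency lookup
    for u, v, w in edges:
        A[(u, v)] = A.get((u, v), 0.0) + w
        A[(v, u)] = A.get((v, u), 0.0) + w
    q = 0.0
    for i in k:
        for j in k:
            if partition[i] == partition[j]:
                q += A.get((i, j), 0.0) - gamma * k[i] * k[j] / (2 * m)
    return q / (2 * m)

# Two triangles joined by a single bridge edge
edges = [(1, 2, 1), (2, 3, 1), (1, 3, 1),
         (4, 5, 1), (5, 6, 1), (4, 6, 1),
         (3, 4, 1)]
two_communities = {1: "a", 2: "a", 3: "a", 4: "b", 5: "b", 6: "b"}
print(modularity(edges, two_communities))   # 5/14, roughly 0.357
```

With gamma = 1, grouping each triangle into its own community scores higher than the singleton partition, which is the kind of improvement each Louvain pass looks for.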

mercury.graph.ml.SparkRandomWalker(num_epochs=10, batch_size=1, n_sampling_edges=None)

Bases: BaseClass

Class to perform random walks from a specific source_id node within a given Graph.

Parameters:

num_epochs (int, default 10): Number of epochs. This is the total number of steps the iteration goes through.

batch_size (int, default 1): Forces caching the random walks computed so far and breaks the execution plan each time this number of epochs is reached. In really large jobs, you may want to tune this parameter to avoid possible overflows, even if it can add some extra time to the process. Note that with a high number of epochs and nodes, the resource requirements for the active part of your random walks can be high. This allows a "cache and continue" strategy.

n_sampling_edges (int, default None): Limits, at each time step, the number of new paths opened from each node. This is useful when the graph contains nodes with very high out-degree, where running the algorithm for several epochs is not feasible. When this parameter is set, at most n_sampling_edges outgoing edges are considered at each epoch for each path. If the last node of a path has more than n_sampling_edges outgoing edges, the selected edges are sampled using their weights.
Source code in mercury/graph/ml/spark_randomwalker.py
def __init__(self, num_epochs=10, batch_size=1, n_sampling_edges=None):
    """
    Class to perform random walks from a specific source_id node within a given Graph

    Args:
        num_epochs (int): Number of epochs. This is the total number of steps the iteration goes through.
        batch_size (int): This forces caching the random walks computed so far and breaks the execution plan each time this number
            of epochs is reached. In really large jobs, you may want to tune this parameter to avoid possible overflows even if it
            can add some extra time to the process. Note that with a high number of epochs and nodes, the resource requirements for
            the active part of your random walks can be high. This allows a "cache and continue" strategy.
        n_sampling_edges (int): Limits, at each time step, the number of new paths opened from each node.
            This is useful when the graph contains nodes with very high out-degree, where running the algorithm for several epochs
            is not feasible. When this parameter is set, the graph will consider at most `n_sampling_edges` outgoing edges at each
            epoch for each path. If the last node of a path has more than `n_sampling_edges` outgoing edges, the selected edges
            are sampled using their weights.
    """
    self.num_epochs = num_epochs
    self.batch_size = batch_size
    self.n_sampling_edges = n_sampling_edges

fit(G, source_id)

Perform random walks from a specific source_id node within a given Graph

Parameters:

G (Graph): A mercury.graph Graph.

source_id (int / str / list): The source vertex, or a list of vertices, to start the random walks from.

Returns:

self: Fitted self (or raises an error).

The attribute paths_ contains a Spark DataFrame with a column random_walks containing an array of the elements of each path walked, and another column with the corresponding weights. The weights represent the probability of following that specific path starting from source_id.

Source code in mercury/graph/ml/spark_randomwalker.py
def fit(self, G: Graph, source_id):
    """
    Perform random walks from a specific source_id node within a given Graph

    Args:
        G (mercury.graph Graph asset): A `mercury.graph` Graph
        source_id (int/str/list): The source vertex, or a list of vertices, to start the random walks from.

    Returns:
        (self): Fitted self (or raises an error)

    Attribute `paths_` contains a Spark DataFrame with a column `random_walks` containing an array of the elements
    of each path walked, and another column with the corresponding weights. The weights represent the probability of
    following that specific path starting from source_id.
    """
    self.paths_ = self._run_rw(G, source_id)

    return self
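
The meaning of the weights column can be illustrated with a small, pure-Python sketch (illustrative only, not mercury's Spark implementation): enumerating every weighted random-walk path of a fixed length together with its probability.

```python
def walk_probabilities(edges, source, steps):
    """Enumerate all weighted random-walk paths of up to `steps` steps.

    edges: dict mapping node -> list of (neighbor, weight) pairs.
    Returns a list of (path, probability) pairs, where the probability is the
    chance of following exactly that path when every step picks an outgoing
    edge with probability proportional to its weight.
    """
    paths = [([source], 1.0)]
    for _ in range(steps):
        nxt = []
        for path, p in paths:
            out = edges.get(path[-1], [])
            if not out:                 # dead end: the path simply stops
                nxt.append((path, p))
                continue
            total = sum(w for _, w in out)
            for node, w in out:
                nxt.append((path + [node], p * w / total))
        paths = nxt
    return paths

# Tiny weighted digraph: "a" goes to "b" (weight 3) or "c" (weight 1)
edges = {"a": [("b", 3), ("c", 1)], "b": [("c", 1)], "c": []}
for path, p in walk_probabilities(edges, "a", 2):
    print(path, p)   # ['a', 'b', 'c'] with 0.75 and ['a', 'c'] with 0.25
```

The probabilities over all surviving paths always sum to 1, which is what the weights in paths_ represent for walks started at source_id.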

mercury.graph.ml.SparkSpreadingActivation(attribute='influence', spreading_factor=0.2, transfer_function='weighted', steps=1, influenced_by=False)

Bases: BaseClass

This class is a model that represents a “word-of-mouth” scenario where a node influences its neighbors, from which the influence spreads to other neighbors, and so on.

At the end of the diffusion process, we inspect the amount of influence received by each node. Using a threshold-based technique, a node that is currently not influenced can be declared to be a potential future one, based on the influence that has been accumulated.

The diffusion model is based on Spreading Activation (SPA) techniques proposed in cognitive psychology and later used for trust metric computations. For more details, see the paper "Social Ties and their Relevance to Churn in Mobile Telecom Networks".

Parameters:

attribute (str, default 'influence'): Name of the column that will store the amount of influence spread.

spreading_factor (float, default 0.2): Fraction of influence to distribute. Low values keep the influence close to the source of injection, while high values allow the influence to also reach nodes that are further away. It must be a value in the range (0, 1).

transfer_function (str, default 'weighted'): Allowed values: "weighted" or "unweighted". Once a node decides what fraction of its energy to distribute, the next step is to decide what fraction of that energy is transferred to each neighbor. This is controlled by the transfer function. If "weighted", the energy distributed along a directed edge depends on its weight relative to the sum of weights of all outgoing edges of the node. If "unweighted", the energy distributed along each edge is independent of its weight.

steps (int, default 1): Number of steps to perform.

influenced_by (bool, default False): If True, an extra column "influenced_by" is calculated, containing the seed nodes that have spread some influence to a given node. When True, node ids cannot contain commas (","). Note that seed nodes will retain at least their own (remaining) influence.
Source code in mercury/graph/ml/spark_spreadactivation.py
def __init__(
    self,
    attribute: str = "influence",
    spreading_factor: float = 0.2,
    transfer_function: str = "weighted",
    steps: int = 1,
    influenced_by: bool = False,
):
    self.attribute = attribute
    self.spreading_factor = spreading_factor
    self.transfer_function = transfer_function
    self.steps = steps
    self.influenced_by = influenced_by

fit(g, seed_nodes)

Perform all iterations of spread_activation

Parameters:

g (Graph): A mercury.graph Graph object.

seed_nodes (Union[List, DataFrame]): Collection of nodes that are the "seed", i.e., the source of the influence to spread. It must be a PySpark DataFrame with a column 'id', or a Python list.

Returns:

self: Fitted self.

Source code in mercury/graph/ml/spark_spreadactivation.py
def fit(
    self,
    g: Graph,
    seed_nodes: Union[List, "pyspark.sql.DataFrame"],
):
    """
    Perform all iterations of spread_activation

    Args:
        g (mercury.graph.core.Graph): A `mercury.graph` Graph object.
        seed_nodes (Union[List, pyspark.sql.DataFrame]): Collection of nodes that are the "seed", i.e., the source of the
            influence to spread. It must be a PySpark DataFrame with a column 'id', or a Python list.

    Returns:
        (self): Fitted self
    """

    # Set seed nodes which are the source of influence
    g = self._set_seed_nodes(g, seed_nodes)

    # Compute degrees
    g = self._compute_degrees(g)

    # Number of iterations specified for spread activation
    for _ in range(self.steps):
        g = self._spread_activation_step(
            g,
        )

    # Graph with updated attributes
    self.fitted_graph_ = g
    # Influences as DataFrame
    self.influences_ = self.fitted_graph_.nodes_as_dataframe().select(
        "id", "influence"
    )

    return self
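
One spreading step can be sketched in pure Python (a conceptual illustration of the "weighted" transfer function described above, not mercury's Spark implementation):

```python
def spread_step(influence, edges, spreading_factor=0.2):
    """One step of spreading activation with a weighted transfer function.

    influence: dict node -> current amount of influence.
    edges: dict node -> list of (neighbor, weight) outgoing edges.
    Each node keeps (1 - spreading_factor) of its influence and distributes
    the rest among its neighbors proportionally to the outgoing edge weights.
    """
    new = {n: 0.0 for n in influence}
    for node, x in influence.items():
        out = edges.get(node, [])
        if not out or x == 0:
            new[node] += x                        # nothing to spread
            continue
        new[node] += (1 - spreading_factor) * x   # retained influence
        total = sum(w for _, w in out)
        for nbr, w in out:
            new[nbr] = new.get(nbr, 0.0) + spreading_factor * x * w / total
    return new

# Seed node "a" holds all the influence; outgoing edges weighted 3 and 1
edges = {"a": [("b", 3), ("c", 1)]}
influence = {"a": 1.0, "b": 0.0, "c": 0.0}
print(spread_step(influence, edges))   # a keeps 0.8; b and c get ~0.15 and ~0.05
```

Repeating this step `steps` times, starting from influence 1 on the seed nodes, is the essence of the diffusion process; the total influence is conserved at every step.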

mercury.graph.ml.SpectralClustering(n_clusters=2, mode='networkx', max_iterations=10, random_state=0)

Bases: BaseClass

Implementation of the spectral clustering algorithm, which detects communities inside a graph.

Contributed by Gibran Gabriel Otazo Sanchez.

Parameters:

n_clusters (int, default 2): The number of clusters to detect.

mode (str, default 'networkx'): Calculation mode. Pass 'networkx' to use pandas + networkx, or 'spark' to use spark + graphframes.

max_iterations (int, default 10): Maximum number of iterations (only used if mode == 'spark').

random_state (int, default 0): Seed for reproducibility.
Source code in mercury/graph/ml/spectral.py
def __init__(
    self, n_clusters=2, mode="networkx", max_iterations=10, random_state=0
):
    self.n_clusters = n_clusters
    self.mode = mode
    self.max_iterations = max_iterations
    self.random_state = random_state

    if self.mode not in ("networkx", "spark"):
        raise ValueError("Error: Mode must be either 'networkx' or 'spark'")

fit(graph)

Find the optimal clusters of a given graph, saving the clusters and the modularity as attributes of the object.

Parameters:

graph (Graph): A mercury graph structure.

Returns:

self: Fitted self (or raises an error).

Source code in mercury/graph/ml/spectral.py
def fit(self, graph: Graph):
    """
    Find the optimal clusters of a given graph, saving the clusters and
    the modularity as attributes of the object.

    Args:
        graph (Graph): A mercury graph structure.

    Returns:
        (self): Fitted self (or raises an error)

    """
    if self.mode == "networkx":
        self._fit_networkx(graph)
    else:
        self._fit_spark(graph)

    return self
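
For context, the standard spectral clustering recipe that this class follows in spirit (details may differ in mercury's implementation) is:

  • Build the graph Laplacian \(L = D - A\), where \(A\) is the weighted adjacency matrix and \(D\) is the diagonal matrix of node degrees.
  • Compute the eigenvectors of \(L\) associated with the \(k\) smallest eigenvalues, with \(k\) = n_clusters, and stack them as the columns of a matrix \(U\).
  • Cluster the rows of \(U\) (one row per node) with k-means; the resulting row clusters are the node communities.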

mercury.graph.ml.Transition()

Bases: BaseClass

Create an interface class to manage the adjacency matrix of a directed graph as a transition matrix. This enables computing distributions of probabilities over the nodes after a given number of iterations.

Source code in mercury/graph/ml/transition.py
def __init__(self):
    self.fitted_graph_ = None

fit(G)

Converts the adjacency matrix into a transition matrix. Transition matrices are used to compute the distribution of probability of being in each of the nodes (or states) of a directed graph (or Markov process). The distribution for state s is:

  • \(s_t = T*s_{t-1}\)

Where:

  • T is the transition matrix. After calling .fit(), the adjacency matrix is the transition matrix. You can use .to_pandas() to see it.
  • \(s_{t-1}\) is the previous state.

What .fit() does is scale the non-zero rows so that they sum to 1 (they are probability distributions) and turn the zero rows into recurrent states. A recurrent state is a final state: a state whose next state is itself.

Parameters:

G (Graph): A mercury.graph Graph.

Returns:

self: Fitted self (or raises an error).

Note

If the graph was created using NetworkX directly, the weight attribute must be named 'weight' and must be positive. The recommended way to create the graph is using .set_row(), which always names the weight 'weight' but does not check its value.

Source code in mercury/graph/ml/transition.py
def fit(self, G: Graph):
    """
    Converts the adjacency matrix into a transition matrix. Transition matrices are used to compute the distribution of probability
    of being in each of the nodes (or states) of a directed graph (or Markov process). The distribution for state s is:

    * $s_t = T*s_{t-1}$

    Where:

    T is the transition matrix. After calling .fit(), the adjacency matrix is the transition matrix. You can use .to_pandas() to see it.
    $s_{t-1}$ is the previous state.

    What .fit() does is scale the non-zero rows so that they sum to 1 (they are probability distributions) and turn the zero rows
    into recurrent states. A recurrent state is a final state: a state whose next state is itself.

    Args:
        G (Graph): A `mercury.graph` Graph.

    Returns:
        (self): Fitted self (or raises an error).

    Note:
        If created using NetworkX directly, the name of the weight must be 'weight' and must be positive. The recommended way
        to create the graph is using .set_row() which will always name the weight as 'weight' but does not check the value.

    """
    names = list(G.networkx.nodes)
    adj_m = nx.adjacency_matrix(G.networkx, weight="weight", dtype=float)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")

        for i in range(adj_m.shape[0]):
            row = adj_m[[i], :]
            tot = row.sum()

            if tot == 0:
                row[0, i] = 1
            else:
                row = row / tot

            adj_m[[i], :] = row

    df = pd.DataFrame(adj_m.todense(), index=names, columns=names)
    self.fitted_graph_ = Graph(nx.from_pandas_adjacency(df, create_using=nx.DiGraph))

    return self

to_pandas(num_iterations=1)

Returns the adjacency matrix (which is the transition matrix after fit() has been called) for a given number of iterations, as a pandas DataFrame with labeled rows and columns.

Parameters:

num_iterations (int, default 1): If you want to compute the matrix for a different number of iterations, k, you can use this argument to raise the matrix to any non-negative integer power, since \(s_{t+k} = T^k*s_t\).

Returns:

DataFrame: The transition matrix for num_iterations.

Note

This method does not automatically call fit(). This allows inspecting the adjacency matrix as a pandas DataFrame. The result of computing num_iterations will not make sense if fit() has not been called before to_pandas().

Source code in mercury/graph/ml/transition.py
def to_pandas(self, num_iterations=1):
    """
    Returns the adjacency matrix (which is the transition matrix after `fit()` has been called) for a given number of iterations
    as a pandas dataframe with labeled rows and columns.

    Args:
        num_iterations (int): If you want to compute the matrix for a different number of iterations, k, you can use this argument to
            raise the matrix to any non-negative integer power, since $s_{t+k} = T^k*s_t$

    Returns:
        (pd.DataFrame): The transition matrix for num_iterations.

    Note:
        This method does not automatically call `fit()`. This allows inspecting the adjacency matrix as a pandas dataframe.
        The result of computing num_iterations will not make sense if `fit()` has not been called before `to_pandas()`.

    """
    if self.fitted_graph_ is None:
        raise ValueError("Error: fit() must be called first.")

    names = list(self.fitted_graph_.networkx.nodes)
    adj_m = nx.adjacency_matrix(self.fitted_graph_.networkx, weight="weight").todense()

    if num_iterations != 1:
        adj_m = matrix_power(adj_m, num_iterations)

    return pd.DataFrame(adj_m, index=names, columns=names)
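
The row normalization performed by fit() and the matrix power used by to_pandas(num_iterations) can be illustrated with a minimal pure-Python sketch (illustrative only; mercury's implementation uses NetworkX and pandas):

```python
def to_transition(adj):
    """Row-normalize an adjacency matrix (list of rows) into a transition
    matrix; all-zero rows become recurrent states (probability 1 on self)."""
    out = []
    for i, row in enumerate(adj):
        tot = sum(row)
        if tot == 0:
            out.append([1.0 if j == i else 0.0 for j in range(len(row))])
        else:
            out.append([w / tot for w in row])
    return out

def mat_pow(m, k):
    """Naive matrix power by repeated multiplication."""
    n = len(m)
    result = [[1.0 if i == j else 0.0 for j in range(n)] for i in range(n)]
    for _ in range(k):
        result = [[sum(result[i][a] * m[a][j] for a in range(n))
                   for j in range(n)] for i in range(n)]
    return result

adj = [[0, 2, 2],   # node 0 moves to 1 or 2 with equal probability
       [0, 0, 1],   # node 1 always moves to 2
       [0, 0, 0]]   # node 2 has no out-edges: it becomes a recurrent state
T = to_transition(adj)
print(T)             # [[0.0, 0.5, 0.5], [0.0, 0.0, 1.0], [0.0, 0.0, 1.0]]
print(mat_pow(T, 2))   # after two steps, every walk starting at 0 is at node 2
```

Raising T to the k-th power gives the k-step transition probabilities, which is exactly what to_pandas(num_iterations=k) returns with labeled rows and columns.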