mercury.graph.ml

mercury.graph.ml.LouvainCommunities(min_modularity_gain=0.001, max_pass=2, max_iter=10, resolution=1, all_partitions=True, verbose=True)

Bases: BaseClass

Class that runs a PySpark implementation of the Louvain algorithm to find the partition that maximizes the modularity of an undirected graph (as in [1]).

This version of the algorithm differs from [1] in that the reassignment of nodes to new communities is calculated in parallel, not sequentially. That is, all nodes are reassigned at the same time and conflicts (e.g., node 1 -> C2 and node 2 -> C1) are resolved with a simple tie-breaking rule. This version also introduces the resolution parameter gamma, as in [2].

Contributed by Arturo Soberon Cedillo, Jose Antonio Guzman Vazquez and Isaac Dodanim Hernandez Garcia.


  1. Blondel V D, Guillaume J-L, Lambiotte R and Lefebvre E (2008). Fast unfolding of communities in large networks. Journal of Statistical Mechanics: Theory and Experiment, 2008. https://doi.org/10.1088/1742-5468/2008/10/p10008 

  2. Aynaud T, Blondel V D, Guillaume J-L and Lambiotte R (2013). Multilevel local optimization of modularity. Graph Partitioning (315--345), 2013. 

Parameters:

Name Type Description Default
min_modularity_gain float

Modularity gain threshold between each pass. The algorithm stops if the gain in modularity between the current pass and the previous one is less than the given threshold.

0.001
max_pass int

Maximum number of passes.

2
max_iter int

Maximum number of iterations within each pass.

10
resolution float

The resolution parameter gamma. Its value must be greater than or equal to zero. If resolution is less than 1, modularity favors larger communities, while values greater than 1 favor smaller communities.

1
all_partitions bool

If True, the function will return all the partitions found at each step of the algorithm (i.e., pass0, pass1, pass2, ..., pass20). If False, only the last (and best) partition will be returned.

True
verbose bool

If True, print progress information during the Louvain algorithm execution. Defaults to True.

True
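
Example:

A minimal usage sketch. It assumes a running SparkSession (spark) and the graphframes package, and that a mercury.graph.core.Graph can wrap a GraphFrame (as the methods below do internally); the toy edge data is made up.

from graphframes import GraphFrame
from mercury.graph.core import Graph
from mercury.graph.ml import LouvainCommunities

# checkpoint() is used internally, so a Spark checkpoint directory should be set beforehand
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

# Hypothetical toy graph: undirected edges given as src, dst, weight
edges = spark.createDataFrame(
    [("a", "b", 1.0), ("b", "c", 1.0), ("c", "a", 1.0), ("c", "d", 1.0)],
    ["src", "dst", "weight"],
)
vertices = (
    edges.selectExpr("src as id")
    .unionByName(edges.selectExpr("dst as id"))
    .distinct()
)

g = Graph(GraphFrame(vertices, edges))

louvain = LouvainCommunities(max_pass=2, max_iter=10, resolution=1, all_partitions=False)
louvain.fit(g)

louvain.labels_.show()      # node_id / cluster (since all_partitions=False)
print(louvain.modularity_)  # modularity of the final partition
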
Source code in mercury/graph/ml/louvain.py
def __init__(
    self,
    min_modularity_gain=1e-03,
    max_pass=2,
    max_iter=10,
    resolution: Union[float, int] = 1,
    all_partitions=True,
    verbose=True,
):
    self.min_modularity_gain = min_modularity_gain
    self.max_pass = max_pass
    self.max_iter = max_iter
    self.resolution = resolution
    self.all_partitions = all_partitions
    self.verbose = verbose

    # Check resolution
    if resolution < 0:
        exceptionMsg = f"Resolution value is {resolution} and cannot be < 0."
        raise ValueError(exceptionMsg)

_calculate_m(edges)

Get the weighted size of an undirected graph (where \(m\) is defined as \(m = \frac{1}{2} \sum_{ij} A_{ij}\)).

Parameters:

Name Type Description Default
edges DataFrame

A pyspark dataframe representing the edges of an undirected graph. It must have src and dst as its columns. The user may also specify the weight of each edge via the additional weight column (optional).

required

Returns:

Type Description
int

Returns the weighted size of the graph.

Source code in mercury/graph/ml/louvain.py
def _calculate_m(self, edges) -> int:
    """Get the weighted size of an undirected graph (where $m$ is
    defined as $m = \\frac{1}{2} \\sum_{ij} A_{ij}$).

    Args:
        edges (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the edges of an undirected graph.
            It must have `src` and `dst` as its columns. The user may also
            specify the weight of each edge via the additional `weight` column
            (optional).

    Returns:
        (int): Returns the weighted size of the graph.
    """

    m = edges.select(F.sum("weight")).collect()[0][0]

    return int(m)

_calculate_modularity(edges, partition, m=None)

This function calculates the modularity of a partition.

Parameters:

Name Type Description Default
edges DataFrame

A pyspark dataframe representing the edges of an undirected graph. It must have src and dst as its columns. The user may also specify the weight of each edge via the additional weight column (optional).

required
partition DataFrame

A pyspark dataframe representing the partition of an undirected graph (i.e., a table that indicates the community that each node belongs to). The dataframe must have columns id (indicating each node's ID) and c (indicating each node's assigned community).

required
m int

The weighted size of the graph (the output of _calculate_m()).

None

Returns:

Type Description
float

Bound between -1 and 1 representing the modularity of a partition. The output may exceed these bounds depending on the value of resolution (which is set to 1 by default).
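
For reference, the value returned can be written as

\(Q = \frac{1}{m} \sum_{c} e_c - \gamma \left( \frac{1}{2m} \right)^2 \sum_{c} k_c^2\)

where \(e_c\) is the total weight of the edges inside community \(c\) (each undirected edge counted once), \(k_c\) is the sum of the weighted degrees of the nodes in \(c\), \(m\) is the weighted size of the graph and \(\gamma\) is the resolution parameter.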

Source code in mercury/graph/ml/louvain.py
def _calculate_modularity(self, edges, partition, m=None) -> float:
    """This function calculates the modularity of a partition.

    Args:
        edges (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the edges of an undirected graph.
            It must have `src` and `dst` as its columns. The user may also
            specify the weight of each edge via the additional `weight` column
            (optional).

        partition (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the partition of an undirected
            graph (i.e., a table that indicates the community that each node
            belongs to). The dataframe must have columns `id` (indicating each
            node's ID) and `c` (indicating each node's assigned community).

        m (int):
            The weighted size of the graph (the output of `_calculate_m()`).

    Returns:
        (float):
            Bound between -1 and 1 representing the modularity of a
            partition. The output may exceed these bounds depending on the value of
            `resolution` (which is set to 1 by default).
    """

    # Calculate m (if necessary) and norm
    m = self._calculate_m(edges) if m is None else m
    norm = 1 / (2 * m)

    # Declare basic inputs
    labeledEdges = self._label_edges(edges, partition)
    labeledDegrees = self._label_degrees(edges, partition)

    # Get term on LHS
    k_in = (labeledEdges.where("cSrc = cDst").select(F.sum("weight"))).collect()[0][
        0
    ]

    # Handle NoneType
    k_in = 0 if k_in is None else k_in

    # Get term on RHS
    k_out = (
        labeledDegrees.groupby("c")
        .agg(F.sum("degree").alias("kC"))
        .selectExpr(f"{self.resolution} * sum(kC * kC)")
    ).collect()[0][0]

    # Return modularity
    return (k_in / m) - (norm**2 * float(k_out))

_label_degrees(edges, partition)

This function uses the edges of a graph to calculate the weighted degrees of each node and joins the result with the partition passed by the user.

Parameters:

Name Type Description Default
edges DataFrame

A pyspark dataframe representing the edges of an undirected graph. It must have src and dst as its columns. The user may also specify the weight of each edge via the additional weight column (optional).

required
partition DataFrame

A pyspark dataframe representing the partition of an undirected graph (i.e., a table that indicates the community that each node belongs to). The dataframe must have columns id (indicating each node's ID) and c (indicating each node's assigned community).

required

Returns:

Type Description
DataFrame

This function returns a dataframe with columns id (representing the ID of each node in the graph), c (representing each node's community) and degree (representing each node's degree).

Source code in mercury/graph/ml/louvain.py
def _label_degrees(self, edges, partition):
    """
    This function uses the edges of a graph to calculate the weighted degrees
    of each node and joins the result with the partition passed by the user.

    Args:
        edges (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the edges of an undirected graph.
            It must have `src` and `dst` as its columns. The user may also
            specify the weight of each edge via the additional `weight` column
            (optional).

        partition (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the partition of an undirected
            graph (i.e., a table that indicates the community that each node
            belongs to). The dataframe must have columns `id` (indicating each
            node's ID) and `c` (indicating each node's assigned community).

    Returns:
        (Dataframe):
            This function returns a dataframe with columns `id` (representing the ID
            of each node in the graph), `c` (representing each node's community) and
            `degree` (representing each node's degree).
    """

    # Get id, community and weighted degree
    ret = (
        partition.join(
            # Unite sources and destinations to avoid double join
            other=(
                edges.selectExpr("src as id", "weight")
                .unionByName(edges.selectExpr("dst as id", "weight"))
                .groupBy("id")
                .agg(F.sum("weight").alias("degree"))
            ),
            on="id",
            how="inner",
        )
        .select("id", "c", "degree")
        .checkpoint()
    )

    return ret

_label_edges(edges, partition)

This function uses partition to add two columns to edges. The added columns cSrc and cDst indicate the community that the source and destination nodes belong to.

Parameters:

Name Type Description Default
edges DataFrame

A pyspark dataframe representing the edges of an undirected graph. It must have src and dst as its columns. The user may also specify the weight of each edge via the additional weight column (optional).

required
partition DataFrame

A pyspark dataframe representing the partition of an undirected graph (i.e., a table that indicates the community that each node belongs to). The dataframe must have columns id (indicating each node's ID) and c (indicating each node's assigned community).

required

Returns:

Type Description
DataFrame

This function returns edges with two additional columns: the community that the source node belongs to (cSrc) and the community that the destination node belongs to (cDst).

Source code in mercury/graph/ml/louvain.py
def _label_edges(self, edges, partition):
    """This function uses `partition` to add two columns to `edges`. The added
    columns `cSrc` and `cDst` indicate the community that the source and
    destination nodes belong to.

    Args:
        edges (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the edges of an undirected graph.
            It must have `src` and `dst` as its columns. The user may also
            specify the weight of each edge via the additional `weight` column
            (optional).

        partition (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the partition of an undirected
            graph (i.e., a table that indicates the community that each node
            belongs to). The dataframe must have columns `id` (indicating each
            node's ID) and `c` (indicating each node's assigned community).

    Returns:
        (pyspark.sql.dataframe.DataFrame):
            This function returns `edges` with two additional columns: the community
            that the source node belongs to (`cSrc`) and the community that the
            destination node belongs to (`cDst`).
    """

    # Get communities
    ret = (
        edges
        # Start off with src, dst and weight
        .select("src", "dst", "weight")
        # Source destination
        .join(
            other=partition.selectExpr("id as src", "c as cSrc"),
            on="src",
            how="left",
        )
        # Destination community
        .join(
            other=partition.selectExpr("id as dst", "c as cDst"),
            on="dst",
            how="left",
        ).checkpoint()
    )

    return ret

_last_pass(df)

Returns the column name of the last pass.

Parameters:

Name Type Description Default
df DataFrame

A pyspark dataframe representing the series of partitions made by LouvainCommunities (a dataframe with columns 'id', 'pass0', 'pass1', 'pass2', 'pass3', etc.).

required
Source code in mercury/graph/ml/louvain.py
def _last_pass(self, df):
    """Returns the column name of the last pass.

    Args:
        df (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the series of partitions made by
            `LouvainCommunities` (a dataframe with columns 'id', 'pass0',
            'pass1', 'pass2', 'pass3', etc.).
    """

    # Get all `passX` columns as list
    cols = [col for col in df.columns if "pass" in col]

    # Get last pass as int
    _max = max([int(col.split("pass")[1]) for col in cols])

    # Return last pass as string
    return f"pass{_max}"

_reassign_all(edges, partition, m=None)

This function simultaneously reassigns all the nodes in a graph to their corresponding optimal neighboring communities.

Parameters:

Name Type Description Default
edges DataFrame

A pyspark dataframe representing the edges of an undirected graph. It must have src and dst as its columns. The user may also specify the weight of each edge via the additional weight column (optional).

required
partition DataFrame

A pyspark dataframe representing the partition of an undirected graph (i.e., a table that indicates the community that each node belongs to). The dataframe must have columns id (indicating each node's ID) and c (indicating each node's assigned community).

required
m int

The weighted size of the graph (the output of _calculate_m()).

None

Returns:

Type Description
DataFrame

A pyspark dataframe with the same number of rows as there are vertices. Columns cx and cj represent each node's current and optimal neighboring community (respectively).
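
For reference, the modularity gain of moving node \(x\) from its community \(C_x\) to a neighboring community \(C_j\) (the mdq column in the source below) can be written as

\(\Delta q_{x \to C_j} = \sum_{i \in C_j} A_{ix} - \sum_{i \in C_x \setminus \{x\}} A_{ix} - \frac{k_x}{2m} \left( \sum_{i \in C_j} k_i - \sum_{i \in C_x} k_i + k_x \right)\)

where \(k_i\) is the weighted degree of node \(i\) and \(m\) is the weighted size of the graph. Only moves with a gain above a small threshold are kept, and symmetric swaps between a pair of communities are broken by keeping the move with the largest gain.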

Source code in mercury/graph/ml/louvain.py
def _reassign_all(self, edges, partition, m=None):
    """This function simultaneously reassigns all the nodes in a graph to their
    corresponding optimal neighboring communities.

    Args:
        edges (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the edges of an undirected graph.
            It must have `src` and `dst` as its columns. The user may also
            specify the weight of each edge via the additional `weight` column
            (optional).

        partition (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the partition of an undirected
            graph (i.e., a table that indicates the community that each node
            belongs to). The dataframe must have columns `id` (indicating each
            node's ID) and `c` (indicating each node's assigned community).

        m (int):
            The weighted size of the graph (the output of `_calculate_m()`).

    Returns:
        (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe with the same number of rows as there are vertices.
            Columns `cx` and `cj` represent each node's current and optimal
            neighboring community (respectively).
    """

    # Calculate m if necessary
    m = self._calculate_m(edges) if m is None else m

    # Label edges and degrees here to avoid long lineages
    labeledDegrees = self._label_degrees(edges, partition)
    labeledEdges = self._label_edges(edges, partition)

    # Add sum(ki) for i in C to labeledDegrees
    dq = (
        labeledDegrees.withColumn(
            "cx_sum_ki", F.sum("degree").over(Window.partitionBy("c"))
        )
        # Get sum(Aix) for i in Cx\{x}
        .join(
            other=(
                labeledEdges.where("(src != dst) and (cSrc = cDst)")
                .selectExpr("src as id", "weight")
                .unionByName(
                    labeledEdges.where("(src != dst) and (cSrc = cDst)").selectExpr(
                        "dst as id", "weight"
                    )
                )
                .groupBy("id")
                .agg(F.sum("weight").alias("cx_sum_aix"))
            ),
            on="id",
            how="left",
        )
        # Get sum(Aix) for i in Cj (relationship 1:J)
        .join(
            other=(
                labeledEdges.where("cSrc != cDst")
                .selectExpr("src as id", "cDst as cj", "weight")
                .unionByName(
                    labeledEdges.where("cSrc != cDst").selectExpr(
                        "dst as id", "cSrc as cj", "weight"
                    )
                )
                .groupBy("id", "cj")
                .agg(F.sum("weight").alias("cj_sum_aix"))
            ),
            on="id",
            how="left",
        )
        # Get sum(ki) for i in Cj
        .join(
            other=(
                labeledDegrees.withColumnRenamed("c", "cj")
                .groupBy("cj")
                .agg(F.sum("degree").alias("cj_sum_ki"))
            ),
            on="cj",
            how="left",
        )
        # Calculate modularity change of each possible switch (Cx -> {x} -> Cj)
        .withColumn(
            "mdq",
            F.coalesce("cj_sum_aix", F.lit(0))
            - F.coalesce("cx_sum_aix", F.lit(0))
            - (
                F.col("degree")
                / F.lit(2 * m)
                * (F.col("cj_sum_ki") - F.col("cx_sum_ki") + F.col("degree"))
            ),
        )
        # Rank mdq(x) in descending order
        .select(
            F.col("id"),
            F.col("c"),
            F.coalesce("cj", F.col("c")).alias("cj"),  # Trapped nodes: Cx == Cj
            F.col("mdq"),
            F.row_number()
            .over(Window.partitionBy("id").orderBy(F.desc("mdq")))
            .alias("mdq_rank"),
        )
        # Keep best (or first) change
        .where(F.col("mdq_rank") == 1)
    )

    # Break symmetric swaps (only in first iteration?)
    dq = (
        dq.withColumn(
            "sym_rank",
            F.row_number().over(
                Window.partitionBy(
                    F.sort_array(F.array(F.col("c"), F.col("cj")))
                ).orderBy(F.desc("mdq"))
            ),
        )
        # Select best switch (cStar) and break symmetric swaps with sym_rank
        .withColumn(
            "cStar",
            F.when(
                ((F.col("mdq") > F.lit(1e-04)) & (F.col("sym_rank") == 1)),
                F.col("cj"),
            ).otherwise(F.col("c")),
        ).selectExpr("id", "c as cx", "cStar as cj")
    )

    return dq

_sort_passes(res)

Takes the output of LouvainCommunities and returns a list containing its columns ordered by their integer part in ascending order. For example, if the columns returned by LouvainCommunities are ['pass2', 'id', 'pass1', 'pass0'], this function will turn the list into ['id', 'pass0', 'pass1', 'pass2']. This function also supports cases where max_pass > 10.

Parameters:

Name Type Description Default
res DataFrame

A pyspark dataframe representing the output of LouvainCommunities. res must have columns 'id', 'pass0', 'pass1', 'pass2', etc.

required
Source code in mercury/graph/ml/louvain.py
def _sort_passes(self, res) -> list:
    """Takes the output of `LouvainCommunities` and returns a list containing
    its columns ordered by their integer part in ascending order.
    For example, if the columns returned by `LouvainCommunities` are
    `['pass2', 'id', 'pass1', 'pass0']`, this function will turn the list to
    `['id', 'pass0', 'pass1', 'pass2']`.
    This function also supports cases where `max_pass > 10`.

    Args:
        res (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the output of `LouvainCommunities`.
            `res` must have columns 'id', 'pass0', 'pass1', 'pass2', etc.
    """

    # Get pass-columns and sort them by their integer part
    cols = [col for col in res.columns if "pass" in col]
    ints = sorted([int(col.replace("pass", "")) for col in cols])
    cols_sorted = ["id"] + ["pass" + str(i) for i in ints]

    return cols_sorted

_verify_data(df, expected_cols_grouping, expected_cols_others)

Checks whether df meets the format expected by LouvainCommunities.

Parameters:

Name Type Description Default
df DataFrame

A pyspark dataframe representing the edges of an undirected graph. It must have src and dst as its columns. The user may also specify the weight of each edge via the additional weight column (optional).

required
expected_cols_grouping list

A list of strings representing the columns that must be present in df to group the data.

required
expected_cols_others list

A list of strings representing the columns that must be present in df but are not used for grouping.

required
Source code in mercury/graph/ml/louvain.py
def _verify_data(self, df, expected_cols_grouping, expected_cols_others):
    """Checks if `edges` meets the format expected by `LouvainCommunities`.

    Args:
        df (pyspark.sql.dataframe.DataFrame):
            A pyspark dataframe representing the edges of an undirected graph.
            It must have `src` and `dst` as its columns. The user may also
            specify the weight of each edge via the additional `weight` column
            (optional).

        expected_cols_grouping (list):
            A list of strings representing the columns that must be present in
            `df` to group the data.

        expected_cols_others (list):
            A list of strings representing the columns that must be present in
            `df` but are not used for grouping.
    """

    cols = df.columns
    expected_cols = expected_cols_grouping + expected_cols_others

    # Check type
    if not isinstance(df, DataFrame):
        raise TypeError("Input data must be a pyspark DataFrame.")

    # Check missing columns
    msg = "Input data is missing expected column '{}'."
    for col in expected_cols:
        if col not in cols:
            raise ValueError(msg.format(col))

    # Check for duplicates
    dup = (
        df.groupBy(*expected_cols_grouping)
        .agg(F.count(F.lit(1)).alias("count"))
        .where("count > 1")
        .count()
    )
    if dup > 0:
        raise ValueError("Data has duplicated entries.")

fit(g)

Parameters:

Name Type Description Default
g Graph

A mercury graph structure.

required

Returns:

Type Description
self

Fitted self (or raises an error).

Source code in mercury/graph/ml/louvain.py
def fit(self, g: Graph):
    """
    Args:
        g (Graph): A mercury graph structure.

    Returns:
        (self): Fitted self (or raises an error).
    """
    edges = g.graphframe.edges

    # Verify edges input
    self._verify_data(
        df=edges,
        expected_cols_grouping=["src", "dst"],
        expected_cols_others=["weight"],
    )

    # Init dataframe to be returned
    ret = (
        edges.selectExpr("src as id")
        .unionByName(edges.selectExpr("dst as id"))
        .distinct()
        .withColumn("pass0", F.row_number().over(Window.orderBy("id")))
    ).checkpoint()

    # Convert edges to anonymized src's and dst's
    edges = (
        edges.selectExpr("src as src0", "dst as dst0", "weight")
        .join(other=ret.selectExpr("id as src0", "pass0 as src"), on="src0")
        .join(other=ret.selectExpr("id as dst0", "pass0 as dst"), on="dst0")
        .select("src", "dst", "weight")
    ).checkpoint()

    # Calculate m and initialize modularity
    m = self._calculate_m(edges)
    modularity0 = -1.0

    # Begin pass
    canPass, _pass = True, 0
    while canPass:

        # Declare naive partition
        p1 = (
            edges.selectExpr("src as id")
            .unionByName(edges.selectExpr("dst as id"))
            .distinct()
            .withColumn("c", F.col("id"))
        )

        # Begin iterations within pass
        canIter, _iter = True, 0
        # Carry reference to previously cached p2 to call unpersist()
        prev_p2 = None
        while canIter:

            if _iter >= self.max_iter:
                break

            # Print progress
            if self.verbose:
                print(f"Starting Pass {_pass} Iteration {_iter}.")

            # Create new partition and check if movements were made
            p2 = self._reassign_all(edges, p1)
            # Break complex lineage caused by loops first
            p2 = p2.checkpoint()
            p2.cache()

            canIter = len(p2.where("cx != cj").take(1)) > 0
            if canIter:
                p1 = p2.selectExpr("id", "cj as c")
            if prev_p2 is not None:
                prev_p2.unpersist()
            prev_p2 = p2
            _iter += 1

        # Calculate new modularity and update pass counter
        modularity1 = self._calculate_modularity(edges=edges, partition=p1, m=m)

        # Declare stopping criterion and update old modularity
        canPass = (modularity1 - modularity0 > self.min_modularity_gain) and (
            _pass < self.max_pass
        )
        modularity0 = modularity1

        self.modularity_ = modularity0

        # Update ret and compress graph
        if canPass:
            ret = ret.join(
                other=p1.selectExpr(f"id as pass{_pass}", f"c as pass{_pass + 1}"),
                on=f"pass{_pass}",
            ).checkpoint()

            edges = (
                self._label_edges(edges, p1)
                .select("cSrc", "cDst", "weight")
                .groupBy("cSrc", "cDst")
                .agg(F.sum("weight").alias("weight"))
                .selectExpr("cSrc as src", "cDst as dst", "weight")
            ).checkpoint()

        prev_p2.unpersist()
        _pass += 1

    # Return final dataframe with sorted columns
    if self.all_partitions:

        # Return sorted columns
        cols = self._sort_passes(ret)
        ret = ret.select(cols)

    # Return final dataframe with id & community
    else:
        _last = self._last_pass(ret)
        ret = ret.selectExpr("id as node_id", f"{_last} as cluster")

    self.labels_ = ret

    return self

mercury.graph.ml.SparkRandomWalker(num_epochs=10, batch_size=1, n_sampling_edges=None)

Bases: BaseClass

Class to perform random walks from a specific source_id node within a given Graph

Parameters:

Name Type Description Default
num_epochs int

Number of epochs. This is the total number of steps the iteration goes through.

10
batch_size int

This forces caching of the random walks computed so far and breaks query planning each time this number of epochs is reached. A high value prevents this from triggering at all. In really large jobs you may want to set this parameter to avoid possible overflows, even if it can add some extra time to the process. Note that with a high number of epochs and nodes, resource requirements for the active part of your random walks can be high. This allows a "cache and continue" approach, so to speak.

1
n_sampling_edges int

Setting this parameter limits, at each timestep, the number of new paths opened from each node. This is useful when the graph contains nodes with very high out-degree, where running the algorithm for several epochs is not feasible. When using this parameter, at most n_sampling_edges outgoing edges are considered at each epoch for each path. If the last node of the path has more than n_sampling_edges outgoing edges, the selected edges are sampled using their weights.

None
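
Example:

A minimal usage sketch, reusing a hypothetical mercury Graph g built from a GraphFrame (as in the LouvainCommunities example above); the source node ids are made up.

from mercury.graph.ml import SparkRandomWalker

rw = SparkRandomWalker(num_epochs=5, batch_size=2)
rw.fit(g, source_id=["a", "b"])   # start the walks from nodes "a" and "b"

rw.paths_.show(truncate=False)    # random_walks arrays plus the path weights
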
Source code in mercury/graph/ml/spark_randomwalker.py
def __init__(self, num_epochs=10, batch_size=1, n_sampling_edges=None):
    """
    Class to perform random walks from a specific source_id node within a given Graph

    Args:
        num_epochs (int): Number of epochs. This is the total number of steps the iteration goes through.
        batch_size (int): This forces caching the random walks computed so far and breaks planning each time this number of epochs
            is reached. The default value is a high number to avoid this entering at all. In really large jobs, you may want to
            set this parameter to avoid possible overflows even if it can add some extra time to the process. Note that with a high
            number of epochs and nodes resource requirements for the active part of your random walks can be high. This allows to
            "cache a continue" so to say.
        n_sampling_edges (int): by setting this parameter you can limit at each timestep the number of new paths opened from each node.
            This is useful when the graph contains nodes with very high out-degree, where running the algorithm several epochs is
            not feasible. When using this parameter, the graph will consider only at most `edge_sampling` outgoing edges at each
            epoch for each path. If the last node of the path contains more than `edge_sampling` the selected edges are sampled
            using its weight.
    """
    self.num_epochs = num_epochs
    self.batch_size = batch_size
    self.n_sampling_edges = n_sampling_edges

fit(G, source_id)

Perform random walks from a specific source_id node within a given Graph

Parameters:

Name Type Description Default
G mercury.graph Graph asset

A mercury.graph Graph

required
source_id int / str / list

The source vertex, or a list of vertices, from which to start the random walks.

required

Returns:

Type Description
self

Fitted self (or raises an error)

Attribute paths_ contains a Spark DataFrame with a column random_walks containing an array of the elements of the path walked, and another column with the corresponding weights. The weights represent the probability of following that specific path starting from source_id.

Source code in mercury/graph/ml/spark_randomwalker.py
def fit(self, G: Graph, source_id):
    """
    Perform random walks from a specific source_id node within a given Graph

    Args:
        G (mercury.graph Graph asset): A `mercury.graph` Graph
        source_id (int/str/list): the source vertex or list for vertices to start the random walks.

    Returns:
        (self): Fitted self (or raises an error)

    Attribute `paths_` contains a Spark Dataframe with a columns `random_walks` containing an array of the elements
    of the path walked and another column with the corresponding weights. The weights represent the probability of
    following that specific path starting from source_id.
    """
    self.paths_ = self._run_rw(G, source_id)

    return self

mercury.graph.ml.SparkSpreadingActivation(attribute='influence', spreading_factor=0.2, transfer_function='weighted', steps=1, influenced_by=False)

Bases: BaseClass

This class is a model that represents a “word-of-mouth” scenario where a node influences its neighbors, from where the influence spreads to other neighbors, and so on.

At the end of the diffusion process, we inspect the amount of influence received by each node. Using a threshold-based technique, a node that is currently not influenced can be declared to be a potential future one, based on the influence that has been accumulated.

The diffusion model is based on Spreading Activation (SPA) techniques proposed in cognitive psychology and later used for trust metric computations. For more details, please see the paper entitled "Social Ties and their Relevance to Churn in Mobile Telecom Networks".

Parameters:

Name Type Description Default
attribute str

Column name which will store the amount of influence spread

'influence'
spreading_factor float

Percentage of influence to distribute. Low values favor influence proximity to the source of injection, while high values allow the influence to also reach nodes which are further away. It must be a value in the range (0,1). Default value is 0.2

0.2
transfer_function str

Allowed values: "weighted" or "unweighted". Once a node decides what fraction of energy to distribute, the next step is to decide what fraction of the energy is transferred to each neighbor. This is controlled by the transfer function. If "weighted", the energy distributed along a directed edge depends on its relative weight compared to the sum of weights of all outgoing edges of the source node. If "unweighted", the energy distributed along the edge is independent of its relative weight.

'weighted'
steps int

Number of steps to perform

1
influenced_by bool

If True, an extra column "influenced_by" is calculated which contains the seed nodes that have spread some influence to a given node. When True, the ids of the nodes cannot contain commas ",". Note that seed nodes will have at least their own (remaining) influence.

False
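
Example:

A minimal usage sketch, again reusing a hypothetical mercury Graph g with weighted directed edges; the seed node ids are made up.

from mercury.graph.ml import SparkSpreadingActivation

spa = SparkSpreadingActivation(
    attribute="influence",
    spreading_factor=0.2,
    transfer_function="weighted",
    steps=2,
    influenced_by=True,
)
spa.fit(g, seed_nodes=["a", "b"])

spa.influences_.show()  # id / influence after the diffusion
# spa.fitted_graph_ holds the Graph with the updated vertex attributes
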
Source code in mercury/graph/ml/spark_spreadactivation.py
def __init__(
    self,
    attribute: str = "influence",
    spreading_factor: float = 0.2,
    transfer_function: str = "weighted",
    steps: int = 1,
    influenced_by: bool = False,
):
    self.attribute = attribute
    self.spreading_factor = spreading_factor
    self.transfer_function = transfer_function
    self.steps = steps
    self.influenced_by = influenced_by

_compute_degrees(g)

Compute weighted and unweighted in- and out-degrees of the graph. Re-declares the graph to add the following attributes: inDegree, outDegree, w_inDegree, w_outDegree.

Parameters:

Name Type Description Default
g Graph

graphframe object, network

required
Source code in mercury/graph/ml/spark_spreadactivation.py
def _compute_degrees(self, g: Graph):
    """
    Compute weighted and unweighted in and out degrees in graph. Re-declares graph to add the following
    attributes: inDegree, outDegree, w_inDegree, w_outDegree.

    Args:
        g: graphframe object, network
    """
    g_vertices = g.graphframe.vertices
    g_edges = g.graphframe.edges

    # Get unweighted degrees
    indeg = g.graphframe.inDegrees
    outdeg = g.graphframe.outDegrees

    # Get weighted degrees
    w_indeg = (
        g_edges.groupby("dst").agg(f.sum("weight").alias("w_inDegree"))
    ).selectExpr("dst as id", "w_inDegree as w_inDegree")
    w_outdeg = (
        g_edges.groupby("src").agg(f.sum("weight").alias("w_outDegree"))
    ).selectExpr("src as id", "w_outDegree as w_outDegree")

    # Update vertices attribute
    new_v = g_vertices.join(indeg, "id", "left_outer")
    new_v = new_v.join(outdeg, "id", "left_outer")
    new_v = new_v.join(w_indeg, "id", "left_outer")
    new_v = new_v.join(w_outdeg, "id", "left_outer")
    new_v = new_v.na.fill(0)

    # Update graph
    return Graph(GraphFrame(new_v, g_edges))

_set_seed_nodes(g, seed_nodes=None)

Set the seed nodes which are the source of influence, using a pyspark dataframe.

Parameters:

Name Type Description Default
g Graph

A mercury.graph Graph object.

required
seed_nodes Union[List, DataFrame]

Collection of nodes that are the source from which to spread the influence. It must be a pyspark dataframe with a column 'id', or a python list.

None
Source code in mercury/graph/ml/spark_spreadactivation.py
def _set_seed_nodes(
    self,
    g: Graph,
    seed_nodes: Union[List, "pyspark.sql.DataFrame"] = None,
):
    """
    Set seed nodes which are the source of influence using pyspark dataframe.

    Args:
        g (mercury.graph.core.Graph): A `mercury.graph` Graph object.
        seed_nodes (Union[List, pyspark.sql.DataFrame]): Collection of nodes that are the source to spread
            the influence. It must be pyspark dataframe with column 'id' or python list.
    """

    seed_nodes_dataframe = seed_nodes

    # Convert list to dataframe
    if isinstance(seed_nodes, list):
        rdd_list = SparkInterface().spark.sparkContext.parallelize(seed_nodes)
        row_rdd_list = rdd_list.map(lambda x: Row(x))
        field_list = [StructField("id", StringType(), True)]
        schema_list = StructType(field_list)
        seed_nodes_dataframe = SparkInterface().spark.createDataFrame(
            row_rdd_list, schema_list
        )

    # Create column for influence attribute containing 1's
    seed_nodes_dataframe = seed_nodes_dataframe.withColumn(
        self.attribute, f.lit(1.0)
    )
    self.seed_nodes_ = seed_nodes_dataframe

    # Merge to original vertices of graph
    orig_vertices = g.graphframe.vertices.select("id")
    orig_edges = g.graphframe.edges
    new_vertices = orig_vertices.join(
        seed_nodes_dataframe, "id", "left_outer"
    ).na.fill(0)

    # If influenced_by flag is set, then initialize the seed nodes
    if self.influenced_by:
        new_vertices = new_vertices.withColumn(
            "influenced_by",
            f.when(
                new_vertices[self.attribute] == 1,
                f.split(new_vertices["id"], pattern=","),
            ).otherwise(f.array().cast("array<string>")),
        )

    # Update graph
    return Graph(GraphFrame(new_vertices, orig_edges))

_spread_activation_step(g)

One step in the spread activation model.

Parameters:

Name Type Description Default
g Graph

graphframe object, network

required

Returns:

Type Description
Graphframe

New network with the newly calculated value of attribute on its vertices.
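
For reference, the messages aggregated in the source below can be read as follows: with spreading factor \(\alpha\), a node \(x\) holding influence \(I_x\) keeps \((1 - \alpha) I_x\) and transfers to each neighbor \(y\)

\(I_{x \to y} = \alpha \, I_x \, \frac{w_{xy}}{\sum_{z} w_{xz}}\) (weighted) or \(I_{x \to y} = \alpha \, I_x \, / \, \mathrm{outdeg}(x)\) (unweighted),

while nodes with no outgoing edges additionally retain the influence they had already accumulated.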

Source code in mercury/graph/ml/spark_spreadactivation.py
def _spread_activation_step(self, g: Graph):
    """
    One step in the spread activation model.

    Args:
        g: graphframe object, network

    Returns:
        (Graphframe): new network with updated new calculation of attribute in vertices
    """

    # Pass influence/message to neighboring nodes (weighted/unweighted option)
    if self.transfer_function == "unweighted":
        msg_to_src = (AM.src[self.attribute] / AM.src["outDegree"]) * (
            1 - self.spreading_factor
        )
        msg_to_dst = f.when(
            AM.dst["outDegree"] != 0,
            (
                (AM.src[self.attribute] / AM.src["outDegree"])
                * self.spreading_factor
            ),
        ).otherwise(
            ((1 / AM.dst["inDegree"]) * AM.dst[self.attribute])
            + (
                (AM.src[self.attribute] / AM.src["outDegree"])
                * self.spreading_factor
            )
        )

    elif self.transfer_function == "weighted":
        weight = AM.edge["weight"] / AM.src["w_outDegree"]
        msg_to_src = (AM.src[self.attribute] / AM.src["outDegree"]) * (
            1 - self.spreading_factor
        )
        msg_to_dst = f.when(
            AM.dst["outDegree"] != 0,
            ((AM.src[self.attribute]) * (self.spreading_factor * weight)),
        ).otherwise(
            ((1 / AM.dst["inDegree"]) * AM.dst[self.attribute])
            + ((AM.src[self.attribute]) * (self.spreading_factor * weight))
        )

    # Aggregate messages
    agg = g.graphframe.aggregateMessages(
        f.sum(AM.msg).alias(self.attribute),
        sendToSrc=msg_to_src,
        sendToDst=msg_to_dst,
    )

    # Create a new cached copy of the dataFrame to get new calculated attribute
    cached_new_vertices = AM.getCachedDataFrame(agg)

    if self.influenced_by:
        to_join = g.graphframe.vertices.select(
            "id",
            "inDegree",
            "outDegree",
            "w_inDegree",
            "w_outDegree",
            "influenced_by",
        )
    else:
        to_join = g.graphframe.vertices.select(
            "id", "inDegree", "outDegree", "w_inDegree", "w_outDegree"
        )
    new_cached_new_vertices = cached_new_vertices.join(to_join, "id", "left_outer")
    new_cached_new_vertices = new_cached_new_vertices.na.fill(0)

    # If influenced_by flag is set, compute new seed nodes influencing
    if self.influenced_by:
        new_cached_new_vertices = self._calculate_influenced_by(
            g, new_cached_new_vertices
        )

    # Return graph with new calculated attribute
    return Graph(GraphFrame(new_cached_new_vertices, g.graphframe.edges))

fit(g, seed_nodes)

Perform all iterations of spread_activation

Parameters:

Name Type Description Default
g Graph

A mercury.graph Graph object.

required
seed_nodes Union[List, DataFrame]

Collection of nodes that are the "seed", i.e., the source from which to spread the influence. It must be a pyspark dataframe with a column 'id', or a python list.

required

Returns:

Type Description
self

Fitted self

Source code in mercury/graph/ml/spark_spreadactivation.py
def fit(
    self,
    g: Graph,
    seed_nodes: Union[List, "pyspark.sql.DataFrame"],
):
    """
    Perform all iterations of spread_activation

    Args:
        g (mercury.graph.core.Graph): A `mercury.graph` Graph object.
        seed_nodes (Union[List, pyspark.sql.DataFrame]): Collection of nodes that are the "seed" or are the source to spread
            the influence. It must be pyspark dataframe with column 'id' or python list

    Returns:
        (self): Fitted self
    """

    # Set seed nodes which are the source of influence
    g = self._set_seed_nodes(g, seed_nodes)

    # Compute degrees
    g = self._compute_degrees(g)

    # Number of iterations specified for spread activation
    for _ in range(0, self.steps, 1):
        g = self._spread_activation_step(
            g,
        )

    # Graph with updated attributes
    self.fitted_graph_ = g
    # Influences as DataFrame
    self.influences_ = self.fitted_graph_.nodes_as_dataframe().select(
        "id", "influence"
    )

    return self

mercury.graph.ml.SpectralClustering(n_clusters=2, mode='networkx', max_iterations=10, random_state=0)

Bases: BaseClass

Implementation of the spectral clustering algorithm, which detects communities inside a graph.

Contributed by Gibran Gabriel Otazo Sanchez.

Parameters:

Name Type Description Default
n_clusters int

The number of clusters that you want to detect.

2
random_state int

Seed for reproducibility

0
mode str

Calculation mode. Pass 'networkx' to use pandas + networkx, or 'spark' to use spark + graphframes.

'networkx'
max_iterations int

Max iterations parameter (only used if mode==spark)

10
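
Example:

A minimal usage sketch, reusing a hypothetical mercury Graph g (the spark mode expects a weight column on the edges).

from mercury.graph.ml import SpectralClustering

sc = SpectralClustering(n_clusters=2, mode="spark", max_iterations=10, random_state=0)
sc.fit(g)

sc.labels_.show()      # node_id / cluster assignments
print(sc.modularity_)  # modularity of the resulting clustering
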
Source code in mercury/graph/ml/spectral.py
def __init__(
    self, n_clusters=2, mode="networkx", max_iterations=10, random_state=0
):
    self.n_clusters = n_clusters
    self.mode = mode
    self.max_iterations = max_iterations
    self.random_state = random_state

    if self.mode not in ("networkx", "spark"):
        raise ValueError("Error: Mode must be either 'networkx' or 'spark'")

_fit_networkx(graph)

Spectral clustering but using networkx (local mode implementation)

Parameters:

Name Type Description Default
graph Graph

A mercury graph structure.

required

Returns:

Type Description
self

Fitted self (or raises an error)

Source code in mercury/graph/ml/spectral.py
def _fit_networkx(self, graph: Graph):
    """
    Spectral clustering but using networkx (local mode implementation)

    Args:
        graph (Graph): A mercury graph structure.

    Returns:
        (self): Fitted self (or raises an error)
    """
    gnx = graph.networkx.to_undirected()

    L = normalized_laplacian_matrix(gnx).todense()

    if not np.allclose(L, L.T):
        raise ValueError("Normalized Laplacian matrix of the undirected graph should be symmetric")

    w, v = eigh(L)

    U = v[:, : self.n_clusters]
    U = asarray(U)

    kmeans = KMeans(
        n_clusters=self.n_clusters, random_state=self.random_state, n_init="auto"
    ).fit(U)

    self.labels_ = DataFrame({"node_id": gnx.nodes(), "cluster": kmeans.labels_})

    cluster_nodes = self.labels_.groupby("cluster")["node_id"].apply(list)
    self.modularity_ = nx_modularity(gnx, cluster_nodes)

_fit_spark(graph)

Spectral clustering but using pyspark

Parameters:

Name Type Description Default
graph Graph

A mercury graph structure.

required

Returns:

Type Description
self

Fitted self (or raises an error)

Source code in mercury/graph/ml/spectral.py
def _fit_spark(self, graph: Graph):
    """
    Spectral clustering but using pyspark

    Args:
        graph (Graph): A mercury graph structure.

    Returns:
        (self): Fitted self (or raises an error)
    """

    graph_frames_graph = graph.graphframe

    pic = PowerIterationClustering(k=self.n_clusters, weightCol="weight")
    pic.setMaxIter(self.max_iterations)

    # Node ids can be strings, with this we ensure IDs are always converted to
    # integers (needed by PowerIterationClustering)
    vertices_mapping = graph_frames_graph.vertices.withColumn(
        "idx", F.monotonically_increasing_id()
    )

    mapped_node_ids = (
        graph_frames_graph.edges.join(
            vertices_mapping, graph_frames_graph.edges.src == vertices_mapping.id
        )
        .withColumnRenamed("idx", "src_mapped")
        .drop("id", "src")
    )

    mapped_node_ids = (
        mapped_node_ids.join(
            vertices_mapping, mapped_node_ids.dst == vertices_mapping.id
        )
        .withColumnRenamed("idx", "dst_mapped")
        .drop("id", "dst")
        .withColumnRenamed("src_mapped", "src")
        .withColumnRenamed("dst_mapped", "dst")
    )
    assignments = pic.assignClusters(mapped_node_ids)

    self.labels_ = (
        vertices_mapping.join(assignments, vertices_mapping.idx == assignments.id)
        .drop(assignments.id)
        .selectExpr(["id as node_id", "cluster"])
    )

    self.modularity_ = self._spark_modularity(
        graph_frames_graph.edges, graph_frames_graph.degrees
    )

_spark_modularity(edges, degrees, resolution=1)

Computes modularity using the same approximation as networkx: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.community.quality.modularity.html
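
Concretely, each cluster \(c\) contributes

\(q_c = \frac{L_c}{m} - \gamma \left( \frac{k_c}{2m} \right)^2\)

where \(L_c\) is the number of intra-cluster edges, \(k_c\) is the sum of the degrees of the nodes in \(c\), \(m\) is the total number of edges and \(\gamma\) is the resolution; the reported modularity is \(\sum_c q_c\).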

Source code in mercury/graph/ml/spectral.py
def _spark_modularity(self, edges, degrees, resolution=1):
    """Computes modularity using the same approximation as networkx:
    https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.community.quality.modularity.html
    """

    edge_nb = edges.count()
    q = []

    for i in range(self.n_clusters):
        nids = self.labels_[self.labels_.cluster == i]
        nodeids = [row["node_id"] for row in nids.select("node_id").collect()]

        l_c = edges.filter(
            edges.src.isin(nodeids) & edges.dst.isin(nodeids)
        ).count()

        k_c = (
            nids.join(degrees.withColumnRenamed("id", "node_id"), on="node_id")
            .select(F.sum("degree"))
            .collect()[0][0]
        )

        qi = (l_c / edge_nb) - resolution * (k_c / (2 * edge_nb)) ** 2
        q.append(qi)

    return np.sum(q)

fit(graph)

Find the optimal clusters of a given graph. The function saves the clusters and the modularity in the fitted object and returns self.

Parameters:

Name Type Description Default
graph Graph

A mercury graph structure.

required

Returns:

Type Description
self

Fitted self (or raises an error)

Source code in mercury/graph/ml/spectral.py
def fit(self, graph: Graph):
    """
    Find the optimal clusters of a given graph. The function returns nothing, but saves the clusters and
    the modularity in the object self.

    Args:
        graph (Graph): A mercury graph structure.

    Returns:
        (self): Fitted self (or raises an error)

    """
    if self.mode == "networkx":
        self._fit_networkx(graph)
    else:
        self._fit_spark(graph)

    return self

mercury.graph.ml.Transition()

Bases: BaseClass

Create an interface class to manage the adjacency matrix of a directed graph as a transition matrix. This enables computing distributions of probabilities over the nodes after a given number of iterations.

Source code in mercury/graph/ml/transition.py
def __init__(self):
    self.fitted_graph_ = None

fit(G)

Converts the adjacency matrix into a transition matrix. Transition matrices are used to compute the distribution of probability of being in each of the nodes (or states) of a directed graph (or Markov process). The distribution for state s is:

  • \(s_t = T*s_{t-1}\)

Where:

  • T is the transition matrix. After calling .fit(), the adjacency matrix is the transition matrix. You can use .to_pandas() to see it.

  • \(s_{t-1}\) is the previous state.

What .fit() does is scale the non-zero rows so that they sum to 1 (since they are probability distributions) and turn the zero rows into recurrent states. A recurrent state is a final state: a state whose next state is itself.

Parameters:

Name Type Description Default
G Graph

A mercury.graph Graph.

required

Returns:

Type Description
self

Fitted self (or raises an error).

Note

If the graph was created using NetworkX directly, the weight attribute must be named 'weight' and its values must be positive. The recommended way to create the graph is using .set_row(), which always names the weight 'weight' but does not check the value.

Source code in mercury/graph/ml/transition.py
def fit(self, G: Graph):
    """
    Converts the adjacency matrix into a transition matrix. Transition matrices are used to compute the distribution of probability
    of being in each of the nodes (or states) of a directed graph (or Markov process). The distribution for state s is:

    * $s_t = T*s_{t-1}$

    Where:

    T is the transition matrix. After calling.fit(), the adjacency matrix is the transition matrix. You can use .to_pandas() to see it.
    $s_{t-1}$ is the previous state.

    What .fit() does is scaling the non-zero rows to make them sum 1 as they are probability distributions and make the zero rows
    recurrent states. A recurrent state is a final state, a state whose next state is itself.

    Args:
        G (Graph): A `mercury.graph` Graph.

    Returns:
        (self): Fitted self (or raises an error).

    Note:
        If created using NetworkX directly, the name of the weight must be 'weight' and must be positive. The recommended way
        to create the graph is using .set_row() which will always name the weight as 'weight' but does not check the value.

    """
    names = list(G.networkx.nodes)
    adj_m = nx.adjacency_matrix(G.networkx, weight="weight", dtype=float)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")

        for i in range(adj_m.shape[0]):
            row = adj_m[[i], :]
            tot = row.sum()

            if tot == 0:
                row[0, i] = 1
            else:
                row = row / tot

            adj_m[[i], :] = row

    df = pd.DataFrame(adj_m.todense(), index=names, columns=names)
    self.fitted_graph_ = Graph(nx.from_pandas_adjacency(df, create_using=nx.DiGraph))

    return self

to_pandas(num_iterations=1)

Returns the adjacency matrix (which is the transition matrix after fit() was called) for a given number of iterations as a pandas dataframe with labeled rows and columns.

Parameters:

Name Type Description Default
num_iterations int

If you want to compute the matrix for a different number of iterations, k, you can use this argument to raise the matrix to any non-negative integer power, since \(s_{t+k} = T^k*s_t\).

1

Returns:

Type Description
DataFrame

The transition matrix for num_iterations.

Note

This method does not automatically call fit(). This allows inspecting the adjacency matrix as a pandas dataframe. The result of computing num_iterations will not make sense if fit() has not been called before to_pandas().
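
Example:

A minimal usage sketch. It assumes Graph accepts a networkx graph (as fit() itself does when it rebuilds the fitted graph); the toy edges are made up.

import networkx as nx

from mercury.graph.core import Graph
from mercury.graph.ml import Transition

nxg = nx.DiGraph()
nxg.add_edge("a", "b", weight=2.0)
nxg.add_edge("a", "c", weight=1.0)
nxg.add_edge("b", "c", weight=1.0)   # "c" has no outgoing edges -> recurrent state

t = Transition().fit(Graph(nxg))

print(t.to_pandas())                  # one-step transition matrix (rows sum to 1)
print(t.to_pandas(num_iterations=2))  # T^2, the two-step transition matrix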

Source code in mercury/graph/ml/transition.py
def to_pandas(self, num_iterations=1):
    """
    Returns the adjacency (which is the transition matrix after `fit()` was called) for a given number of iterations as a pandas
    dataframe with labeled rows and columns.

    Args:
        num_iterations (int): If you want to compute the matrix for a different number of iterations, k, you can use this argument to
            raise the matrix to any non negative integer, since $s_{t+k} = T^k*s_t$

    Returns:
        (pd.DataFrame): The transition matrix for num_iterations.

    Note:
        This method does not automatically call `fit()`. This allows inspecting the adjacency matrix as a pandas dataframe.
        The result of computing num_iterations will not make sense if `fit()` has not been called before `to_pandas()`.

    """
    if self.fitted_graph_ is None:
        raise ValueError("Error: fit() must be called first.")

    names = list(self.fitted_graph_.networkx.nodes)
    adj_m = nx.adjacency_matrix(self.fitted_graph_.networkx, weight="weight").todense()

    if num_iterations != 1:
        adj_m = matrix_power(adj_m, num_iterations)

    return pd.DataFrame(adj_m, index=names, columns=names)