mercury.graph.core

mercury.graph.core.Graph(data=None, keys=None, nodes=None)

This is the main class in mercury.graph.

This class abstracts the underlying technology used to represent the graph. You can create a graph by passing any of the following objects to the constructor:

  • A pandas DataFrame containing edges (with a keys dictionary to specify the columns and possibly a nodes DataFrame)
  • A pyspark DataFrame containing edges (with a keys dictionary to specify the columns and possibly a nodes DataFrame)
  • A networkx graph
  • A graphframes graph

Bear in mind that the graph object is immutable. This means that you can't modify the graph object once it has been created. If you want to modify it, you have to create a new graph object.

The graph object provides:

  • Properties to access the graph in different formats (networkx, graphframes, dgl)
  • Properties with metrics and summary information that are calculated on demand and technology independent.
  • A base class that other graph classes in mercury-graph inherit from to provide ML algorithms such as graph embedding, visualization, etc.

Using this class from the other classes in mercury-graph:

The other classes in mercury-graph define models or functionalities that are based on graphs. They use a Scikit-learn-like API to interact with the graph object. This means that the graph object is passed to the class constructor and the class follows the Scikit-learn conventions. It is recommended to follow the same conventions when creating your own classes to work with mercury-graph.

The conventions can be found here:

Parameters:

Name Type Description Default
data (DataFrame, Graph or DataFrame)

The data to create the graph from. It can be a pandas DataFrame, a networkx Graph, a pyspark DataFrame, or a GraphFrame. If data is already a graph (networkx or graphframes), the keys and nodes arguments are ignored.

None
keys dict

A dictionary with keys to specify the columns in the data DataFrame. The keys are:

  • 'src': The name of the column with the source node.
  • 'dst': The name of the column with the destination node.
  • 'id': The name of the column with the node id.
  • 'weight': The name of the column with the edge weight.
  • 'directed': A boolean to specify if the graph is directed. (Only for pyspark DataFrames)

When the keys argument is not provided or the key is missing, the default values are:

  • 'src': 'src'
  • 'dst': 'dst'
  • 'id': 'id'
  • 'weight': 'weight'
  • 'directed': True
None
nodes DataFrame

A pandas DataFrame or a pyspark DataFrame with the nodes data. It is only used when data is a pandas or pyspark DataFrame, and it must be of the same type as data. If not given, the nodes are inferred from the edges DataFrame.

None
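The fallback to default key names described above can be pictured as a plain dictionary merge. The sketch below is only an illustration: `DEFAULT_KEYS` and `resolve_keys` are hypothetical names that are not part of mercury-graph, and the library's actual implementation may differ.

```python
# Hypothetical defaults, copied from the list of default values in this page.
DEFAULT_KEYS = {
    'src': 'src',
    'dst': 'dst',
    'id': 'id',
    'weight': 'weight',
    'directed': True,
}


def resolve_keys(keys=None):
    """Return a full keys dictionary, filling missing entries with defaults."""
    resolved = dict(DEFAULT_KEYS)   # copy, so the defaults are never mutated
    if keys:
        resolved.update(keys)       # user-supplied entries win over defaults
    return resolved


# Only the source column is renamed; every other entry keeps its default.
partial = resolve_keys({'src': 'origin'})
print(partial['src'], partial['dst'], partial['directed'])
```

Under this reading, passing `keys={'src': 'origin'}` only renames the source column, while 'dst', 'id', 'weight' and 'directed' keep their defaults.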
Source code in mercury/graph/core/graph.py
def __init__(self, data = None, keys = None, nodes = None):
    self._as_networkx = None
    self._as_graphframe = None
    self._as_dgl = None
    self._degree = None
    self._in_degree = None
    self._out_degree = None
    self._closeness_centrality = None
    self._betweenness_centrality = None
    self._pagerank = None
    self._connected_components = None
    self._nodes_colnames = None
    self._edges_colnames = None

    self._number_of_nodes = 0
    self._number_of_edges = 0
    self._node_ix = 0
    self._is_directed = False
    self._is_weighted = False

    self._init_values = {k: v for k, v in locals().items() if k in inspect.signature(self.__init__).parameters}

    if type(data) == pd.core.frame.DataFrame:
        self._from_pandas(data, nodes, keys)
        return

    if isinstance(data, nx.Graph):      # This is the most general case, including: ...Graph, ...DiGraph and ...MultiGraph
        self._from_networkx(data)
        return

    spark_int = SparkInterface()

    if pyspark_installed and graphframes_installed:
        if type(data) == spark_int.type_spark_dataframe:
            self._from_dataframe(data, nodes, keys)
            return

        if type(data) == spark_int.type_graphframe:
            self._from_graphframes(data)
            return

    raise ValueError('Invalid input data. (Expected: pandas DataFrame, a networkx Graph, a pyspark DataFrame, a graphframes Graph.)')

betweenness_centrality property

Returns the betweenness centrality of each node in the graph as a Python dictionary.

closeness_centrality property

Returns the closeness centrality of each node in the graph as a Python dictionary.

connected_components property

Returns the connected components of each node in the graph as a Python dictionary.

degree property

Returns the degree of each node in the graph as a Python dictionary.

dgl property

Returns the graph as a DGL graph.

If the graph has not been converted to a DGL graph yet, it will be converted and cached for future use.

Returns:

Type Description
DGLGraph

The graph represented as a DGL graph.
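The convert-on-first-access behavior described for this property (and likewise for graphframe and networkx) follows a common lazy-caching pattern. The following is a minimal, library-independent sketch of that pattern; `LazyGraph`, `adjacency` and `_to_adjacency` are hypothetical names chosen for illustration, and a plain adjacency dictionary stands in for the real target format.

```python
class LazyGraph:
    """Toy graph that converts to another representation only on demand."""

    def __init__(self, edges):
        self._edges = list(edges)
        self._as_adjacency = None   # cache slot, empty until first access
        self.conversions = 0        # counts how often the conversion ran

    @property
    def adjacency(self):
        # Convert on first access, then reuse the cached result.
        if self._as_adjacency is None:
            self._as_adjacency = self._to_adjacency()
        return self._as_adjacency

    def _to_adjacency(self):
        self.conversions += 1
        adj = {}
        for src, dst in self._edges:
            adj.setdefault(src, []).append(dst)
            adj.setdefault(dst, [])   # make sure sink nodes appear too
        return adj


g = LazyGraph([('a', 'b'), ('b', 'c')])
first = g.adjacency
second = g.adjacency   # served from the cache, no second conversion
print(g.conversions)
```

Because the graph object is immutable, a cache of this kind never needs invalidating, which is presumably what makes the approach safe here.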

edges property

Returns an iterator over the edges in the graph.

Returns:

Type Description
EdgeIterator

An iterator object that allows iterating over the edges in the graph.

edges_colnames property

Returns the column names of the edges DataFrame.

graphframe property

Returns the graph as a GraphFrame.

If the graph has not been converted to a GraphFrame yet, it will be converted and cached for future use.

Returns:

Type Description
GraphFrame

The graph represented as a GraphFrame.

in_degree property

Returns the in-degree of each node in the graph as a Python dictionary.

is_directed property

Returns True if the graph is directed, False otherwise.

Note

Graphs created using graphframes are always directed. The workaround is to add the reverse edges to the graph. This can be done by creating the Graph from a pyspark DataFrame and setting the 'directed' key to False in the keys dictionary. Otherwise, the graph is considered directed, even if reverse edges were added by other means that this class cannot be aware of.
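To make the idea of "adding the reverse edges" concrete, here is a small sketch on a plain list of (src, dst) tuples. It is not how mercury-graph does it internally; `add_reverse_edges` is a hypothetical helper, and the de-duplication step is an assumption made for this example.

```python
def add_reverse_edges(edges):
    """Return the edges plus their reverses, without duplicates.

    Order of first appearance is preserved.
    """
    seen = set()
    result = []
    for src, dst in edges:
        # Emit the edge itself and its mirror image, skipping repeats.
        for edge in ((src, dst), (dst, src)):
            if edge not in seen:
                seen.add(edge)
                result.append(edge)
    return result


directed_edges = [('a', 'b'), ('b', 'c')]
print(add_reverse_edges(directed_edges))
```

A directed engine that stores both (a, b) and (b, a) then behaves like an undirected graph for traversal purposes, although, as the note says, the class can only know this when it added the reverse edges itself.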

is_weighted property

Returns True if the graph is weighted, False otherwise.

A graph is considered weighted if its edges DataFrame has a column named 'weight', or if the weight column has a different name and that name is passed as the 'weight' entry of the keys dictionary.
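The rule above can be expressed as a one-line membership check. This is a sketch of the stated rule only, with a hypothetical function name (`has_weight_column`) and a list of column names standing in for the edges DataFrame.

```python
def has_weight_column(edge_columns, keys=None):
    """Apply the rule: weighted if the configured weight column is present."""
    # Fall back to the default column name 'weight' when keys has no entry.
    weight_col = (keys or {}).get('weight', 'weight')
    return weight_col in edge_columns


print(has_weight_column(['src', 'dst', 'weight']))
print(has_weight_column(['src', 'dst', 'amount']))
print(has_weight_column(['src', 'dst', 'amount'], keys={'weight': 'amount'}))
```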

networkx property

Returns the graph representation as a NetworkX graph.

If the graph has not been converted to NetworkX format yet, it will be converted and cached for future use.

Returns:

Type Description
Graph

The graph representation as a NetworkX graph.

nodes property

Returns an iterator over all the nodes in the graph.

Returns:

Type Description
NodeIterator

An iterator that yields each node in the graph.

nodes_colnames property

Returns the column names of the nodes DataFrame.

number_of_edges property

Returns the number of edges in the graph.

Returns:

Type Description
int

The number of edges in the graph.

number_of_nodes property

Returns the number of nodes in the graph.

Returns:

Type Description
int

The number of nodes in the graph.

out_degree property

Returns the out-degree of each node in the graph as a Python dictionary.

pagerank property

Returns the PageRank of each node in the graph as a Python dictionary.

edges_as_dataframe()

Returns the edges as a pyspark DataFrame.

If the graph is represented as a graphframes graph, the edges are extracted from it. Otherwise, the edges are converted from the pandas DataFrame representation. The columns used as the source and destination nodes are always named 'src' and 'dst', respectively, regardless of the original column names passed to the constructor.

Source code in mercury/graph/core/graph.py
def edges_as_dataframe(self):
    """
    Returns the edges as a pyspark DataFrame.

    If the graph is represented as a graphframes graph, the edges are extracted from it. Otherwise, the edges are converted from the
    pandas DataFrame representation. The columns used as the source and destination nodes are always named 'src' and 'dst',
    respectively, regardless of the original column names passed to the constructor.
    """
    if self._as_graphframe is not None:
        return self._as_graphframe.edges

    return SparkInterface().spark.createDataFrame(self.edges_as_pandas())

edges_as_pandas()

Returns the edges as a pandas DataFrame.

If the graph is represented as a networkx graph, the edges are extracted from it. Otherwise, the graphframes graph will be used. The result may differ from the pandas DataFrame passed to the constructor (if any) in column names and column order. The columns used as the source and destination nodes are always named 'src' and 'dst', respectively.

Source code in mercury/graph/core/graph.py
def edges_as_pandas(self):
    """
    Returns the edges as a pandas DataFrame.

    If the graph is represented as a networkx graph, the edges are extracted from it. Otherwise, the graphframes graph will be used.
    This dataset may differ from possible pandas DataFrame passed to the constructor in the column names and order. The columns used
    as the source and destination nodes are always named 'src' and 'dst', respectively.
    """
    if self._as_networkx is not None:
        edges_data = self._as_networkx.edges(data = True)
        edges_df   = pd.DataFrame([(src, dst, attr) for src, dst, attr in edges_data], columns = ['src', 'dst', 'attributes'])

        attrs_df   = pd.json_normalize(edges_df['attributes'])

        return pd.concat([edges_df.drop('attributes', axis = 1), attrs_df], axis = 1)

    return self.graphframe.edges.toPandas()
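In the networkx branch above, each edge arrives as a (src, dst, attribute dict) triple and the attribute dicts are expanded into one column per attribute. The sketch below imitates that expansion in plain Python, without pandas, to show the resulting shape. `flatten_edges` is a hypothetical helper, it handles only flat attribute dicts, and it fills missing attributes with None, whereas the pandas-based code above would produce NaN.

```python
def flatten_edges(edges_data):
    """Turn (src, dst, attrs) triples into column names and row dicts."""
    edges_data = list(edges_data)

    # Collect attribute names in order of first appearance.
    columns = ['src', 'dst']
    for _, _, attrs in edges_data:
        for name in attrs:
            if name not in columns:
                columns.append(name)

    # Build one row per edge; absent attributes become None.
    rows = []
    for src, dst, attrs in edges_data:
        row = {'src': src, 'dst': dst}
        for name in columns[2:]:
            row[name] = attrs.get(name)
        rows.append(row)
    return columns, rows


columns, rows = flatten_edges([
    ('a', 'b', {'weight': 2.0}),
    ('b', 'c', {'weight': 1.0, 'kind': 'x'}),
])
print(columns)
for row in rows:
    print(row)
```

This also illustrates why the returned columns can differ from the ones originally passed to the constructor: the source and destination columns are renamed, and the remaining columns follow the attributes found on the edges.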

nodes_as_dataframe()

Returns the nodes as a pyspark DataFrame.

If the graph is represented as a graphframes graph, the nodes are extracted from it. Otherwise, the nodes are converted from the pandas DataFrame representation. The column used as the node id is always named 'id', regardless of the original column name passed to the constructor.

Source code in mercury/graph/core/graph.py
def nodes_as_dataframe(self):
    """
    Returns the nodes as a pyspark DataFrame.

    If the graph is represented as a graphframes graph, the nodes are extracted from it. Otherwise, the nodes are converted from the
    pandas DataFrame representation. The column used as the node id is always named 'id', regardless of the original column name passed
    to the constructor.
    """
    if self._as_graphframe is not None:
        return self._as_graphframe.vertices

    return SparkInterface().spark.createDataFrame(self.nodes_as_pandas())

nodes_as_pandas()

Returns the nodes as a pandas DataFrame.

If the graph is represented as a networkx graph, the nodes are extracted from it. Otherwise, the graphframes graph will be used. The result may differ from the pandas DataFrame passed to the constructor (if any) in column names and column order. The column used as the node id is always named 'id'.

Source code in mercury/graph/core/graph.py
def nodes_as_pandas(self):
    """
    Returns the nodes as a pandas DataFrame.

    If the graph is represented as a networkx graph, the nodes are extracted from it. Otherwise, the graphframes graph will be used.
    This dataset may differ from possible pandas DataFrame passed to the constructor in the column names and order. The column used
    as the node id is always named 'id'.
    """
    if self._as_networkx is not None:
        nodes_data = self._as_networkx.nodes(data = True)
        nodes_df   = pd.DataFrame([(node, attr) for node, attr in nodes_data], columns = ['id', 'attributes'])

        attrs_df = pd.json_normalize(nodes_df['attributes'])

        return pd.concat([nodes_df.drop('attributes', axis = 1), attrs_df], axis = 1)

    return self.graphframe.vertices.toPandas()

mercury.graph.core.SparkInterface(config=None, session=None)

A class that provides an interface for interacting with Apache Spark, graphframes and dgl.

Attributes:

Name Type Description
_spark_session SparkSession

The shared Spark session.

_graphframes module

The shared graphframes namespace.

Methods:

Name Description
_create_spark_session

Creates a Spark session.

spark

Property that returns the shared Spark session.

pyspark

Property that returns the pyspark namespace.

graphframes

Property that returns the shared graphframes namespace.

dgl

Property that returns the shared dgl namespace.

read_csv

Reads a CSV file into a DataFrame.

read_parquet

Reads a Parquet file into a DataFrame.

read_json

Reads a JSON file into a DataFrame.

read_text

Reads a text file into a DataFrame.

read

Reads a file into a DataFrame.

sql

Executes a SQL query.

udf

Registers a user-defined function (UDF).

stop

Stops the Spark session.

Parameters:

Name Type Description Default
config dict

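A dictionary of Spark configuration options. If not provided, the configuration in the global variable default_spark_config will be used. It is only used when a new Spark session has to be created.

None
session SparkSession

An existing Spark session to use as the shared session. It is only used if no shared session exists yet; in that case, config is ignored.

None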
Source code in mercury/graph/core/spark_interface.py
def __init__(self, config=None, session=None):
    if SparkInterface._spark_session is None:
        if session is not None:
            SparkInterface._spark_session = session
        else:
            SparkInterface._spark_session = self._create_spark_session(config)
            # Set checkpoint directory
            SparkInterface._spark_session.sparkContext.setCheckpointDir(".checkpoint")

    if SparkInterface._graphframes is None and graphframes_installed:
        SparkInterface._graphframes = gf

    if SparkInterface._dgl is None and dgl_installed:
        SparkInterface._dgl = dgl
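The constructor above stores the session on the class rather than on the instance, so every SparkInterface object shares one session and only the first construction decides how it is created. The pattern can be sketched without Spark as follows; `SharedResource` and its dictionary "session" are hypothetical stand-ins used purely for illustration.

```python
class SharedResource:
    """Toy version of a class-level shared session."""

    _session = None   # class attribute: shared by all instances

    def __init__(self, config=None, session=None):
        # Only the first construction has any effect on the shared session.
        if SharedResource._session is None:
            if session is not None:
                SharedResource._session = session
            else:
                SharedResource._session = self._create_session(config)

    @staticmethod
    def _create_session(config):
        # Stand-in for building a real session from configuration options.
        return {'config': dict(config or {})}

    @property
    def session(self):
        return SharedResource._session


first = SharedResource(config={'app': 'one'})
second = SharedResource(config={'app': 'two'})   # config is ignored here
print(first.session is second.session)
print(second.session['config'])
```

One consequence worth keeping in mind: arguments passed to later constructions are silently ignored once a shared session exists, so any custom configuration or session has to be supplied the first time the class is instantiated.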