
mercury.graph.embeddings

mercury.graph.embeddings.Embeddings(dimension, num_elements=0, mean=0, sd=1, learn_step=3, bidirectional=False)

Bases: BaseClass

This class holds a matrix object that is interpreted as the embeddings for any list of objects, not only the nodes of a graph. You can see this class as the internal object holding the embedding for other classes, such as GraphEmbedding.

Parameters:

    dimension (int): The number of columns in the embedding. See note below. (required)
    num_elements (int): The number of rows in the embedding. You can leave this empty on creation and then use initialize_as() to automatically match the nodes in a graph. Default: 0
    mean (float): The (expected) mean of the initial values. Default: 0
    sd (float): The (expected) standard deviation of the initial values. Default: 1
    learn_step (float): The size of the learning step by which elements are approached or moved away. Units are degrees of rotation along an ellipse. Default: 3
    bidirectional (bool): Should the changes apply only to the elements in the first column (False) or to both columns (True)? Default: False
Note

On dimension: The dimension cannot be zero (that is against the whole concept). Low-dimensional embeddings can only hold a few elements without introducing spurious correlations, through some form of 'birthday paradox' phenomenon, as the number of elements increases. Later, it is very hard to get rid of that spurious 'knowledge'.

Solution: With many elements, you have to go to a high enough dimension even if the structure is simple. Pretending to fit many embeddings in a low dimension without them being correlated is like pretending to plot a trillion random points in a square centimeter while keeping them 1 mm apart from each other: it is simply impossible!
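A minimal usage sketch (the sizes below are illustrative):

from mercury.graph.embeddings import Embeddings

# A 32-dimensional embedding for 100 elements, initialized from N(mean, sd).
emb = Embeddings(dimension=32, num_elements=100)

print(emb.as_numpy().shape)  # (100, 32)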

Source code in mercury/graph/embeddings/embeddings.py
def __init__(
    self, dimension, num_elements=0, mean=0, sd=1, learn_step=3, bidirectional=False
):
    self.dimension = dimension
    self.num_elements = num_elements
    self.mean = mean
    self.sd = sd
    self.learn_step = learn_step
    self.bidirectional = bidirectional

    if self.num_elements > 0:
        self.embeddings_matrix_ = np.random.normal(
            self.mean, self.sd, (self.num_elements, self.dimension)
        )

as_numpy()

Return the embedding as a numpy matrix where each row is an embedding.

Source code in mercury/graph/embeddings/embeddings.py
def as_numpy(self):
    """
    Return the embedding as a numpy matrix where each row is an embedding.
    """
    if not hasattr(self, "embeddings_matrix_"):
        return

    return self.embeddings_matrix_

fit(converge=None, diverge=None)

Apply a learning step to the embedding.

Parameters:

    converge (numpy matrix of two columns): A matrix of index pairs meaning that the elements in the first column should be approached to those in the second column. Default: None
    diverge (numpy matrix of two columns): A matrix of index pairs meaning that the elements in the first column should be moved away from those in the second column. Default: None

Returns:

    (self): Fitted self (or raises an error)

Note

Embeddings start randomly distributed and hold no structure other than spurious correlations. Each time you apply a learning step by calling this method, you are tweaking the embedding to approach some rows and/or move others away. You can use both converge and diverge, or just one of them, and call this as many times as you want with a varying learning step. A proxy of how much an embedding can learn can be estimated by measuring how row correlations converge towards some asymptotic values.
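A minimal sketch of one learning step, reusing the emb object created above (the index pairs are made up for illustration):

import numpy as np

converge = np.array([[0, 1]])  # row 0 is approached to row 1
diverge = np.array([[2, 3]])   # row 2 is moved away from row 3

emb.fit(converge=converge, diverge=diverge)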

Source code in mercury/graph/embeddings/embeddings.py
def fit(self, converge=None, diverge=None):
    """
    Apply a learning step to the embedding.

    Args:
        converge (numpy matrix of two columns): A matrix of indices to elements meaning (first column) should be approached to
            (second column).
        diverge (numpy matrix of two columns): A matrix of indices to elements meaning (first column) should be moved away from
            (second column).

    Returns:
        (self): Fitted self (or raises an error)

    Note:
        Embeddings start being randomly distributed and hold no structure other than spurious correlations. Each time you apply a
        learning step by calling this method, you are tweaking the embedding to approach some rows and/or move others away. You can use
        both converge and diverge or just one of them and call this as many times as you want with a varying learning step. A proxy of how
        much an embedding can learn can be estimated by measuring how row correlations are converging towards some asymptotic values.
    """

    w = self.learn_step * np.pi / 180

    cos_w = np.cos(w)
    sin_w = np.sin(w)

    if converge is not None:
        self.embeddings_matrix_ = _elliptic_rotate(
            self.embeddings_matrix_, converge[:, 0], converge[:, 1], cos_w, sin_w
        )

        if self.bidirectional:
            self.embeddings_matrix_ = _elliptic_rotate(
                self.embeddings_matrix_,
                converge[:, 1],
                converge[:, 0],
                cos_w,
                sin_w,
            )

    if diverge is not None:
        self.embeddings_matrix_ = _elliptic_rotate(
            self.embeddings_matrix_, diverge[:, 0], diverge[:, 1], cos_w, -sin_w
        )

        if self.bidirectional:
            self.embeddings_matrix_ = _elliptic_rotate(
                self.embeddings_matrix_, diverge[:, 1], diverge[:, 0], cos_w, -sin_w
            )

    return self

get_most_similar_embeddings(index, k=5, metric='cosine')

Given an index of a vector in the embedding matrix, returns the k most similar embeddings in the matrix.

Parameters:

    index (int): Index of the vector in the matrix for which we want to compute the most similar embeddings. (required)
    k (int): Number of most similar embeddings to return. Default: 5
    metric (str): Metric to use as a similarity. Default: 'cosine'

Returns:

    (tuple): A list of the k most similar embeddings as indices and a list of their similarities.
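For example, with the emb object from above:

indices, similarities = emb.get_most_similar_embeddings(index=0, k=5)
# indices: row numbers of the 5 nearest embeddings (the row itself is excluded)
# similarities: their cosine similarities, in descending order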

Source code in mercury/graph/embeddings/embeddings.py
def get_most_similar_embeddings(self, index, k=5, metric="cosine"):
    """
    Given an index of a vector in the embedding matrix, returns the k most similar embeddings in the matrix

    Args:
        index (int): index of the vector in the matrix that we want to compute the similar embeddings
        k (int): Number of most similar embeddings to return
        metric (str): metric to use as a similarity.

    Returns:
        (tuple): a list of the k most similar embeddings as indices and a list of their similarities
    """
    if metric == "cosine":
        similarities = (
            1
            - cdist(
                np.expand_dims(self.as_numpy()[index], axis=0),
                self.as_numpy(),
                "cosine",
            )[0]
        )

    elif metric == "euclidean":
        similarities = 1 / (
            1
            + cdist(
                np.expand_dims(self.as_numpy()[index], axis=0),
                self.as_numpy(),
                "euclidean",
            )[0]
        )

    else:
        raise ValueError("Unknown Distance Metric: %s" % metric)

    ordered_indices = np.argsort(similarities)[::-1][1 : (k + 1)]
    ordered_similarities = similarities[ordered_indices]

    return ordered_indices, ordered_similarities

mercury.graph.embeddings.GraphEmbedding(dimension=None, n_jumps=None, max_per_epoch=None, learn_step=3, bidirectional=False, load_file=None)

Bases: BaseClass

Create an embedding mapping the nodes of a graph.

Includes contributions by David Muelas Recuenco.

Parameters:

    dimension (int): The number of columns in the embedding. See the notes in Embeddings for details. (This parameter will be ignored when load_file is used.) Default: None
    n_jumps (int): Number of random jumps from node to node. Default: None
    max_per_epoch (int): Maximum number of consecutive random jumps without randomly jumping outside the edges. Note that normal random jumps are not going to explore outside a connected component. Default: None
    learn_step (float): The size of the learning step by which elements are approached or moved away. Units are degrees of rotation along an ellipse. Default: 3
    bidirectional (bool): Should the changes apply only to the elements in the first column (False) or to both columns (True)? Default: False
    load_file (str): (optional) The full path to a binary file containing a serialized GraphEmbedding object. This file must be created using GraphEmbedding.save(). Default: None

GraphEmbedding class constructor

Source code in mercury/graph/embeddings/graphembeddings.py
def __init__(
    self,
    dimension=None,
    n_jumps=None,
    max_per_epoch=None,
    learn_step=3,
    bidirectional=False,
    load_file=None,
):
    """GraphEmbedding class constructor"""
    if load_file is None and (dimension is None or n_jumps is None):
        raise ValueError(
            "Parameters dimension and n_jumps are required when load_file is None"
        )

    self.dimension = dimension
    self.n_jumps = n_jumps
    self.max_per_epoch = max_per_epoch
    self.learn_step = learn_step
    self.bidirectional = bidirectional
    self.load_file = load_file

    if self.load_file is not None:
        self._load(self.load_file)
        return

__getitem__(arg)

Method to access rows in the embedding by ID.

Parameters:

    arg (same as node ids in the graph): A node ID in the graph. (required)

Returns:

    (numpy.matrix): A numpy matrix of one row

Source code in mercury/graph/embeddings/graphembeddings.py
def __getitem__(self, arg):
    """
    Method to access rows in the embedding by ID.

    Args:
        arg (same as node ids in the graph): A node ID in the graph

    Returns:
        (numpy.matrix): A numpy matrix of one row

    """
    return self.embeddings_.embeddings_matrix_[self.node_ids.index(arg)]

embedding()

Return the internal Embeddings object.

Returns:

    (Embeddings): The embedding, which is a dense matrix of float that can be used with numpy functions.

Source code in mercury/graph/embeddings/graphembeddings.py
def embedding(self):
    """
    Return the internal Embeddings object.

    Returns:
        (mercury.graph.embeddings.Embeddings): The embedding which is a dense matrix of `float` that can be used with `numpy` functions.
    """
    if not hasattr(self, "embeddings_"):
        return

    return self.embeddings_

fit(g)

Train the embedding by doing random walks.

Parameters:

    g (mercury.graph Graph asset): A mercury.graph Graph object. The embedding will be created so that each row in the embedding maps a node ID in g. (required)

Returns:

    (self): Fitted self (or raises an error)

This does a number of random walks, starting from a random node and selecting the edges with a probability that is proportional to the weight of the edge. If the destination node also has outgoing edges, the next step will start from it; otherwise, a new random node will be selected. The edges visited (concordant pairs) will get some reinforcement in the embedding, while randomly selected non-existent edges will get divergence instead (discordant pairs).

Internally, this stores the node IDs of the nodes visited and calls Embeddings.fit() to transfer the structure to the embedding. Of course, it can be called many times on the same GraphEmbedding.
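A minimal end-to-end sketch. The mercury.graph.core.Graph import path comes from this method's signature; wrapping a networkx graph directly in it is an assumption here, so check the Graph documentation for the constructors your version supports:

import networkx as nx

from mercury.graph.core import Graph
from mercury.graph.embeddings import GraphEmbedding

g = Graph(nx.karate_club_graph())  # assumed constructor: wraps a networkx graph

ge = GraphEmbedding(dimension=16, n_jumps=10000)
ge.fit(g)

print(ge[0])  # embedding row for node 0, accessed by node ID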

Source code in mercury/graph/embeddings/graphembeddings.py
def fit(self, g: Graph):
    """
    Train the embedding by doing random walks.

    Args:
        g (mercury.graph Graph asset): A `mercury.graph` Graph object. The embedding will be created so that each row in the embedding maps
            a node ID in g.

    Returns:
        (self): Fitted self (or raises an error)

    This does a number of random walks, starting from a random node and selecting the edges with a probability that is proportional to
    the weight of the edge. If the destination node also has outgoing edges, the next step will start from it; otherwise, a new random
    node will be selected. The edges visited (concordant pairs) will get some reinforcement in the embedding, while randomly selected
    non-existent edges will get divergence instead (discordant pairs).

    Internally, this stores the node IDs of the nodes visited and calls Embeddings.fit() to transfer the structure to the embedding.
    Of course, it can be called many times on the same GraphEmbedding.

    """

    self.node_ids = list(g.networkx.nodes)

    j_matrix = nx.adjacency_matrix(g.networkx)

    N = j_matrix.shape[1]
    M = j_matrix.nnz

    self.r_ini = np.zeros(N, dtype=int)
    self.r_len = np.zeros(N, dtype=int)
    self.r_sum = np.zeros(N, dtype=float)
    self.r_col = np.zeros(M, dtype=int)
    self.r_wgt = np.zeros(M, dtype=float)

    i = 0
    for r in range(N):
        self.r_ini[r] = i

        i_col = j_matrix[[r], :].nonzero()[1]
        L = len(i_col)

        self.r_len[r] = L

        for k in range(L):
            c = i_col[k]
            w = j_matrix[r, c]

            self.r_sum[r] += w
            self.r_col[i] = c
            self.r_wgt[i] = w

            i += 1

    self.TotW = sum(self.r_sum)

    converge, diverge = _random_walks(
        self.r_ini,
        self.r_len,
        self.r_sum,
        self.r_col,
        self.r_wgt,
        self.TotW,
        self.n_jumps,
        self.max_per_epoch if self.max_per_epoch is not None else self.n_jumps,
    )

    self.embeddings_ = Embeddings(
        dimension=self.dimension,
        num_elements=len(self.node_ids),
        learn_step=self.learn_step,
        bidirectional=self.bidirectional,
    )
    self.embeddings_.fit(converge, diverge)

    return self

get_most_similar_nodes(node_id, k=5, metric='cosine', return_as_indices=False)

Returns the k most similar nodes and their similarities.

Parameters:

    node_id (object): ID of the node for which we want to search the most similar nodes. (required)
    k (int): Number of most similar nodes to return. Default: 5
    metric (str): Metric to use as a similarity. Default: 'cosine'
    return_as_indices (bool): Whether to return the nodes as indices (True) or as node IDs (False). Default: False

Returns:

    (DataFrame): The k most similar nodes and their similarities as a pd.DataFrame[word: string, similarity: double]. The word column holds node IDs, or indices if return_as_indices=True.
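For example, continuing the ge object fitted above:

df = ge.get_most_similar_nodes(0, k=5)
print(df)  # pd.DataFrame with columns 'word' (node IDs) and 'similarity'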

Source code in mercury/graph/embeddings/graphembeddings.py
def get_most_similar_nodes(
    self, node_id, k=5, metric="cosine", return_as_indices=False
):
    """
    Returns the k most similar nodes and the similarities

    Args:
        node_id (object): Id of the node that we want to search the similar nodes.
        k (int): Number of most similar nodes to return
        metric (str): metric to use as a similarity.
        return_as_indices (bool): whether to return the nodes as indices (True) or as node ids (False)

    Returns:
        (DataFrame): The k most similar nodes and their similarities as a `pd.DataFrame[word: string, similarity: double]`
    """
    node_index = self.node_ids.index(node_id)

    ordered_indices, ordered_similarities = (
        self.embeddings_.get_most_similar_embeddings(node_index, k, metric)
    )

    if not return_as_indices:
        nodes = list(np.array(self.node_ids)[ordered_indices])
    else:
        nodes = list(ordered_indices)

    return pd.DataFrame({"word": nodes, "similarity": ordered_similarities})

save(file_name, save_embedding=False)

Saves a GraphEmbedding to a compressed binary file with or without the embedding itself. It saves the graph's node names and the adjacency matrix as a sparse matrix.

Parameters:

    file_name (str): The name of the file to which the GraphEmbedding will be saved. (required)
    save_embedding (bool): Since the embedding can be big and, if not trained, is just a matrix of random numbers, it is possible to avoid saving it. If it is not saved, loading the file will create a new random embedding. This parameter controls whether the embedding is saved. Default: False
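A save/load round trip with the ge object from above (the file name is illustrative):

ge.save("graph_embedding.bin", save_embedding=True)

# Reload later; constructor parameters such as dimension are ignored when load_file is used.
ge2 = GraphEmbedding(load_file="graph_embedding.bin")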
Source code in mercury/graph/embeddings/graphembeddings.py
def save(self, file_name, save_embedding=False):
    """
    Saves a GraphEmbedding to a compressed binary file with or without the embedding itself. It saves the graph's node names
    and the adjacency matrix as a sparse matrix.

    Args:
        file_name (str): The name of the file to which the GraphEmbedding will be saved.
        save_embedding (bool): Since the embedding can be big and, if not trained, is just a matrix of random numbers, it is
            possible to avoid saving it. If it is not saved, loading the file will create a new random embedding. This parameter
            controls whether the embedding is saved (the default is not to save it).
    """
    with bz2.BZ2File(file_name, "w") as f:
        pickle.dump(GraphEmbedding.FILE_HEAD, f)
        pickle.dump(save_embedding, f)
        pickle.dump(self.embeddings_.dimension, f)

        pickle.dump(self.node_ids, f)

        np.save(f, self.r_ini)
        np.save(f, self.r_len)
        np.save(f, self.r_sum)
        np.save(f, self.r_col)
        np.save(f, self.r_wgt)

        pickle.dump(self.TotW, f)

        if save_embedding:
            np.save(f, self.embeddings_.embeddings_matrix_)

        pickle.dump(GraphEmbedding.FILE_END, f)

mercury.graph.embeddings.SparkNode2Vec(dimension=None, sampling_ratio=1.0, num_epochs=10, num_paths_per_node=1, batch_size=1000000, w2v_max_iter=1, w2v_num_partitions=1, w2v_step_size=0.025, w2v_min_count=5, path_cache=None, use_cached_rw=False, n_partitions_cache=10, load_file=None)

Bases: BaseClass

Create or reload a SparkNode2Vec embedding mapping the nodes of a graph.

Parameters:

    dimension (int): The number of columns in the embedding. See the notes in Embeddings for details. (This parameter will be ignored when load_file is used.) Default: None
    sampling_ratio (float): The proportion of the total number of nodes to be used in parallel at each step (whenever possible). Default: 1.0
    num_epochs (int): Number of epochs. This is the total number of steps the iteration goes through. At each step, a number of paths equal to sampling_ratio times the total number of nodes will be computed in parallel. Default: 10
    num_paths_per_node (int): The number of random walks to source from each node. Default: 1
    batch_size (int): This forces caching the random walks computed so far, and breaks planning, each time this number of epochs is reached. The default value is a high number, to avoid this happening at all. In really large jobs, you may want to set this parameter to avoid possible overflows, even if it can add some extra time to the process. Note that with a high number of epochs and nodes, resource requirements for the active part of your random walks can be high. This allows you to "cache and continue", so to say. Default: 1000000
    w2v_max_iter (int): This is the Spark Word2Vec parameter maxIter; the default value is the original default value. Default: 1
    w2v_num_partitions (int): This is the Spark Word2Vec parameter numPartitions; the default value is the original default value. Default: 1
    w2v_step_size (float): This is the Spark Word2Vec parameter stepSize; the default value is the original default value. Default: 0.025
    w2v_min_count (int): This is the Spark Word2Vec parameter minCount; the default value is the original default value (5). It is the minimum number of times a node has to appear to generate an embedding. Default: 5
    path_cache (str): Folder where random walks will be stored. The default value is None, which means random walks will not be stored. Default: None
    use_cached_rw (bool): Flag that indicates whether random walks should be read from disk (hence, they will not be computed again). Setting this parameter to True requires a valid path_cache. Default: False
    n_partitions_cache (int): Number of partitions that will be used when storing the random walks, to optimize read access. Default: 10
    load_file (str): (optional) The full path to a parquet file containing a serialized SparkNode2Vec object. This file must be created using SparkNode2Vec.save(). Default: None
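A minimal sketch, assuming g is a mercury.graph Graph (its construction is not covered on this page):

from mercury.graph.embeddings import SparkNode2Vec

n2v = SparkNode2Vec(dimension=64, num_epochs=5)
n2v.fit(g)

n2v.embedding().show(5)  # DataFrame[word: string, vector: vector]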
Source code in mercury/graph/embeddings/spark_node2vec.py
def __init__(
    self,
    dimension=None,
    sampling_ratio=1.0,
    num_epochs=10,
    num_paths_per_node=1,
    batch_size=1000000,
    w2v_max_iter=1,
    w2v_num_partitions=1,
    w2v_step_size=0.025,
    w2v_min_count=5,
    path_cache=None,
    use_cached_rw=False,
    n_partitions_cache=10,
    load_file=None,
):
    """
    Create or reload a SparkNode2Vec embedding mapping the nodes of a graph.

    Args:
        dimension (int): The number of columns in the embedding. See the notes in `Embeddings` for details. (This parameter will be
            ignored when `load_file` is used.)
        sampling_ratio (float): The proportion from the total number of nodes to be used in parallel at each step (whenever possible).
        num_epochs (int): Number of epochs. This is the total number of steps the iteration goes through. At each step, sampling_ratio
            times the total number of nodes paths will be computed in parallel.
        num_paths_per_node (int): The amount of random walks to source from each node.
        batch_size (int): This forces caching the random walks computed so far and breaks planning each time this number of epochs
            is reached. The default value is a high number to avoid this entering at all. In really large jobs, you may want to
            set this parameter to avoid possible overflows even if it can add some extra time to the process. Note that with a high
            number of epochs and nodes, resource requirements for the active part of your random walks can be high. This allows you to
            "cache and continue", so to say.
        w2v_max_iter (int): This is the Spark Word2Vec parameter maxIter, the default value is the original default value.
        w2v_num_partitions (int): This is the Spark Word2Vec parameter numPartitions, the default value is the original default value.
        w2v_step_size (float): This is the Spark Word2Vec parameter stepSize, the default value is the original default value.
        w2v_min_count (int): This is the Spark Word2Vec parameter minCount, the default value is the original default value (5). Is the
            minimum number of times that a node has to appear to generate an embedding.
        path_cache (str): Folder where random walks will be stored, the default value is None which entails that random walks will not
            be stored.
        use_cached_rw (bool): Flag that indicates if random walks should be read from disk (hence, they will not be computed again).
            Setting this parameter to True requires a valid path_cache.
        n_partitions_cache (int): Number of partitions that will be used when storing the random walks, to optimize read access.
            The default value is 10.
        load_file (str): (optional) The full path to a parquet file containing a serialized SparkNode2Vec object. This file must be created
            using SparkNode2Vec.save().
    """
    self.dimension = dimension
    self.sampling_ratio = sampling_ratio
    self.num_epochs = num_epochs
    self.num_paths_per_node = num_paths_per_node
    self.batch_size = batch_size
    self.w2v_max_iter = w2v_max_iter
    self.w2v_num_partitions = w2v_num_partitions
    self.w2v_step_size = w2v_step_size
    self.w2v_min_count = w2v_min_count
    self.path_cache = path_cache
    self.use_cached_rw = use_cached_rw
    self.n_partitions_cache = n_partitions_cache
    self.load_file = load_file

    if self.load_file is not None:
        self._load(self.load_file)
        return

embedding()

Return all embeddings.

Returns:

    (DataFrame): All embeddings as a DataFrame[word: string, vector: vector].

Source code in mercury/graph/embeddings/spark_node2vec.py
def embedding(self):
    """
    Return all embeddings.

    Returns:
        (DataFrame): All embeddings as a `DataFrame[word: string, vector: vector]`.
    """
    if not hasattr(self, "node2vec_"):
        return

    return self.node2vec_.getVectors()

fit(G)

Train the embedding by doing random walks.

Random walk paths are available in attribute paths_.

Parameters:

    G (Graph): A mercury.graph Graph object. The embedding will be created so that each row in the embedding maps a node ID in G. (This parameter will be ignored when load_file is used.) (required)

Returns:

    (self): Fitted self (or raises an error)

Source code in mercury/graph/embeddings/spark_node2vec.py
def fit(self, G: Graph):
    """
    Train the embedding by doing random walks.

    Random walk paths are available in attribute `paths_`.

    Args:
        G (mercury.graph.core.Graph): A `mercury.graph` Graph object. The embedding will be created so that each row in the embedding maps
            a node ID in G. (This parameter will be ignored when `load_file` is used.)

    Returns:
        (self): Fitted self (or raises an error)
    """

    if self.path_cache is None:
        if self.use_cached_rw:
            logging.warning(
                "Wrong options (use_cached_rw and no path_cache). "
                "Paths will be recomputed."
            )
        self.use_cached_rw = False

    if not self.use_cached_rw:
        paths = (
            self._run_rw(G)
            .withColumn("size", f.size("random_walks"))
            .where(f.col("size") > 1)
            .drop("size")
        )

        if self.path_cache is not None:
            (
                paths.repartition(self.n_partitions_cache)
                .write.mode("overwrite")
                .parquet("%s/block=0" % self.path_cache)
            )

        if self.num_paths_per_node > 1:
            for block_id in range(1, self.num_paths_per_node):
                new_paths = (
                    self._run_rw(G)
                    .withColumn("size", f.size("random_walks"))
                    .where(f.col("size") > 1)
                    .drop("size")
                )
                if self.path_cache is None:
                    paths = paths.unionByName(new_paths)
                else:
                    (
                        new_paths.repartition(self.n_partitions_cache)
                        .write.mode("overwrite")
                        .parquet("%s/block=%d" % (self.path_cache, block_id))
                    )
                    # With this, we clear the persisted dataframe
                    new_paths.unpersist()

    if self.path_cache is None:
        self.paths_ = paths.persist()
    else:
        self.paths_ = (
            SparkInterface()
            .read_parquet(self.path_cache)
            .drop("block")
            .repartition(self.n_partitions_cache)
            .persist()
        )

    w2v = Word2Vec(
        vectorSize=self.dimension,
        maxIter=self.w2v_max_iter,
        numPartitions=self.w2v_num_partitions,
        stepSize=self.w2v_step_size,
        inputCol="random_walks",
        outputCol="model",
        minCount=self.w2v_min_count,
    )

    self.node2vec_ = w2v.fit(self.paths_)

    return self

get_most_similar_nodes(node_id, k=5)

Returns the k most similar nodes and a similarity measure.

Parameters:

    node_id (str): ID of the node we want to search. (required)
    k (int): Number of most similar nodes to return. Default: 5

Returns:

    (DataFrame): The k most similar nodes (using cosine similarity) as a DataFrame[word: string, similarity: double]
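For instance, with the n2v object from above (the node ID is illustrative):

n2v.get_most_similar_nodes("node_1", k=5).show()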

Source code in mercury/graph/embeddings/spark_node2vec.py
def get_most_similar_nodes(self, node_id, k=5):
    """
    Returns the k most similar nodes and a similarity measure.

    Args:
        node_id (str): Id of the node we want to search.
        k (int): Number of most similar nodes to return

    Returns:
        (DataFrame): A list of k most similar nodes (using cosine similarity) as a `DataFrame[word: string, similarity: double]`
    """
    if not hasattr(self, "node2vec_"):
        return

    return self.node2vec_.findSynonyms(node_id, k)

model()

Returns the Spark Word2VecModel object.

Returns:

    (Word2VecModel): The Spark Word2VecModel of the embedding, to use its API directly.

Source code in mercury/graph/embeddings/spark_node2vec.py
def model(self):
    """
    Returns the Spark Word2VecModel object.

    Returns:
        (pyspark.ml.feature.Word2VecModel): The Spark Word2VecModel of the embedding to use its API directly.
    """
    if not hasattr(self, "node2vec_"):
        return

    return self.node2vec_

save(file_name)

Saves the internal Word2VecModel as human-readable (JSON) model metadata plus Parquet-formatted data files.

The model may be loaded using SparkNode2Vec(load_file='path/file')

Parameters:

    file_name (str): The name of the file to which the Word2VecModel will be saved. (required)
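A save/reload round trip (the path is illustrative):

n2v.save("n2v_model")

n2v_reloaded = SparkNode2Vec(load_file="n2v_model")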
Source code in mercury/graph/embeddings/spark_node2vec.py
def save(self, file_name):
    """
    Saves the internal Word2VecModel as human-readable (JSON) model metadata plus Parquet-formatted data files.

    The model may be loaded using SparkNode2Vec(load_file='path/file')

    Args:
        file_name (str): The name of the file to which the Word2VecModel will be saved.
    """
    if not hasattr(self, "node2vec_"):
        return

    return self.node2vec_.save(file_name)