
Intake

reels.Intake

AccumulatorParam()

A dummy stand-in to avoid a parsing error when pyspark is not installed. In that case it is never called.

Source code in reels/Intake.py
def __init__(self):
    pass
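
For context, a plausible sketch of the import guard this dummy supports; the try/except layout is an assumption, not the module's verbatim code (the PYSPARK flag does appear in Intake.__init__ below):

# Hypothetical layout of the guard at the top of reels/Intake.py.
try:
    import pyspark
    from pyspark.accumulators import AccumulatorParam

    PYSPARK = True
except ImportError:
    PYSPARK = False

    class AccumulatorParam:
        """Dummy so the AccumulatorParam subclasses below still parse."""

        def __init__(self):
            pass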

Intake(dataframe, spark_method='local_iterator')

Utility class to efficiently populate any reels object with data from either pandas or pyspark dataframes.

This object implements data-populating methods (named in the plural) that call the equivalent single-row methods (named in the singular) over a complete dataframe.

  • insert_rows() is Events.insert_row() for each row
  • define_events() is Events.define_event() for each row
  • scan_events() is Clips.scan_event() for each row
  • insert_targets() is Targets.insert_target() for each row

Parameters:

  dataframe (pd.DataFrame, required):
      Either a pandas or a pyspark dataframe with the data to be loaded into reels objects.

  spark_method (str, default 'local_iterator'):
      Only applies to pyspark dataframes. It accepts two values: 'local_iterator' (the default), which is the safest and consumes the least driver RAM, and 'accumulator', which is more efficient but also more experimental. Try 'accumulator' when your environment has many workers (so the load benefits from parallelism) and the driver has enough RAM to hold a list of tuples with all the values you want to load.
Source code in reels/Intake.py
def __init__(self, dataframe: pd.DataFrame, spark_method: str = 'local_iterator'):
    self.pd_data = None
    self.sp_data = None

    if isinstance(dataframe, pd.DataFrame):
        self.pd_data = dataframe

        return

    if PYSPARK and isinstance(dataframe, pyspark.sql.dataframe.DataFrame):
        self.sp_data = dataframe

        # The accumulator path additionally requires an active Spark session.
        self.use_accumulator = (spark_method == 'accumulator') and SPARK is not None

        return

    raise ValueError('dataframe must be a pandas or a pyspark DataFrame')
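
A minimal usage sketch, assuming the package exposes Events and Intake at these import paths and that Events() takes no arguments (both are assumptions, not confirmed by this page):

import pandas as pd

from reels import Events, Intake   # import paths assumed

df = pd.DataFrame({
    'emitter':     ['e1', 'e1', 'e2'],
    'description': ['click', 'view', 'click'],
    'weight':      [1.0, 0.5, 1.0],
})

events = Events()            # assumed default constructor
intake = Intake(df)          # pandas input: spark_method is ignored
intake.insert_rows(events)   # calls events.insert_row() once per row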

define_events(events, columns=None)

Populate an Events object by calling events.define_event() over the entire dataframe.

Parameters:

  events (object, required):
      The Events object to be filled with data.

  columns (list, default None):
      A list with the names of the four columns containing (emitter, description, weight, code) in the dataframe. The default value is ['emitter', 'description', 'weight', 'code'].
Source code in reels/Intake.py
def define_events(self, events: object, columns: list = None):
    """Populate an Events object by calling events.define_event() over the entire dataframe.

    Args:
        events:  The Events object to be filled with data.
        columns: A list with the names of the four columns containing (emitter, description, weight, code) in the dataframe.
                 The default value is ['emitter', 'description', 'weight', 'code'].
    """

    if columns is None:
        columns = ['emitter', 'description', 'weight', 'code']

    lambda_f = lambda row: events.define_event(     # noqa: E731
        str(row[columns[0]]),
        str(row[columns[1]]),
        float(row[columns[2]]),
        int(row[columns[3]]),
    )

    if self.pd_data is not None:
        self.pd_data.apply(lambda_f, axis=1)
    else:
        for row in self.sp_data.rdd.toLocalIterator():
            lambda_f(row)
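
For example, with non-default column names (the dataframe below is illustrative, and events is an Events object as in the earlier sketch):

df = pd.DataFrame({
    'src':  ['e1', 'e2'],
    'desc': ['click', 'view'],
    'w':    [1.0, 0.5],
    'id':   [1, 2],
})

# columns maps (emitter, description, weight, code), in that order,
# to the actual dataframe column names.
Intake(df).define_events(events, columns=['src', 'desc', 'w', 'id'])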

insert_rows(events, columns=None)

Populate an Events object by calling events.insert_row() over the entire dataframe.

Parameters:

  events (object, required):
      The Events object to be filled with data.

  columns (list, default None):
      A list with the names of the three columns containing (emitter, description, weight) in the dataframe. The default value is ['emitter', 'description', 'weight'].
Source code in reels/Intake.py
def insert_rows(self, events: object, columns: list = None):
    """Populate an Events object by calling events.insert_row() over the entire dataframe.

    Args:
        events:  The Events object to be filled with data.
        columns: A list with the names of the three columns containing (emitter, description, weight) in the dataframe.
                 The default value is ['emitter', 'description', 'weight'].
    """

    if columns is None:
        columns = ['emitter', 'description', 'weight']

    if self.sp_data is not None and self.use_accumulator:
        # events_acc is a module-level pyspark accumulator backed by
        # SparkEventsAcc (described below); seed it with the destination object.
        events_acc.value = events

        def f(row):
            global events_acc

            tup = (
                str(row[columns[0]]),
                str(row[columns[1]]),
                float(row[columns[2]]),
            )

            events_acc += tup

        self.sp_data.foreach(f)

        return

    lambda_f = lambda row: events.insert_row(                               # noqa: E731
        str(row[columns[0]]), str(row[columns[1]]), float(row[columns[2]])
    )

    if self.pd_data is not None:
        self.pd_data.apply(lambda_f, axis=1)
    else:
        for row in self.sp_data.rdd.toLocalIterator():
            lambda_f(row)
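
A sketch of the two pyspark paths, assuming spark_df is an existing pyspark dataframe with the default column names:

# Safe, low-RAM path: rows are fetched to the driver one partition at a time.
Intake(spark_df, spark_method='local_iterator').insert_rows(events)

# Experimental path: each worker emits (emitter, description, weight)
# tuples into an accumulator that is reduced into `events` on the driver.
Intake(spark_df, spark_method='accumulator').insert_rows(events)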

insert_targets(targets, columns=None)

Populate a Targets object by calling targets.insert_target() over the entire dataframe.

Parameters:

  targets (object, required):
      The Targets object to be filled with data.

  columns (list, default None):
      A list with the names of the two columns containing (client, time) in the dataframe. The default value is ['client', 'time'].
Source code in reels/Intake.py
def insert_targets(self, targets: object, columns: list = None):
    """Populate a Targets object by calling targets.insert_target() over the entire dataframe.

    Args:
        targets: The Targets object to be filled with data.
        columns: A list with the names of the two columns containing (client, time) in the dataframe.
                 The default value is ['client', 'time'].
    """

    if columns is None:
        columns = ['client', 'time']

    lambda_f = lambda row: targets.insert_target(   # noqa: E731
        str(row[columns[0]]), str(row[columns[1]])
    )

    if self.pd_data is not None:
        self.pd_data.apply(lambda_f, axis=1)
    else:
        for row in self.sp_data.rdd.toLocalIterator():
            lambda_f(row)
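
A small illustrative example; the Targets import path, its constructor, and the time format are assumptions (the method casts both columns with str()):

from reels import Targets   # assumed import path

targets_df = pd.DataFrame({
    'client': ['c1', 'c2'],
    'time':   ['2023-01-01 10:00:00', '2023-01-02 11:30:00'],
})

targets = Targets()   # assumed default constructor
Intake(targets_df).insert_targets(targets)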

scan_events(clips, columns=None)

Populate a Clips object by calling clips.scan_event() over the entire dataframe.

Parameters:

  clips (object, required):
      The Clips object to be filled with data.

  columns (list, default None):
      A list with the names of the five columns containing (emitter, description, weight, client, time) in the dataframe. The default value is ['emitter', 'description', 'weight', 'client', 'time'].
Source code in reels/Intake.py
def scan_events(self, clips: object, columns: list = None):
    """Populate a Clips object by calling clips.scan_event() over the entire dataframe.

    Args:
        clips:   The Clips object to be filled with data.
        columns: A list with the names of the five columns containing (emitter, description, weight, client, time) in the dataframe.
                 The default value is ['emitter', 'description', 'weight', 'client', 'time'].
    """

    if columns is None:
        columns = ['emitter', 'description', 'weight', 'client', 'time']

    if self.sp_data is not None and self.use_accumulator:
        # clips_acc is a module-level pyspark accumulator backed by
        # SparkClipsAcc (described below); seed it with the destination object.
        clips_acc.value = clips

        def f(row):
            global clips_acc

            tup = (
                str(row[columns[0]]),
                str(row[columns[1]]),
                float(row[columns[2]]),
                str(row[columns[3]]),
                str(row[columns[4]]),
            )

            clips_acc += tup

        self.sp_data.foreach(f)

        return

    lambda_f = lambda row: clips.scan_event(    # noqa: E731
        str(row[columns[0]]),
        str(row[columns[1]]),
        float(row[columns[2]]),
        str(row[columns[3]]),
        str(row[columns[4]]),
    )

    if self.pd_data is not None:
        self.pd_data.apply(lambda_f, axis=1)
    else:
        for row in self.sp_data.rdd.toLocalIterator():
            lambda_f(row)
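
An illustrative pandas example with the default column names (the Clips import path and constructor are assumptions):

from reels import Clips   # assumed import path

clips_df = pd.DataFrame({
    'emitter':     ['e1', 'e2'],
    'description': ['click', 'view'],
    'weight':      [1.0, 0.5],
    'client':      ['c1', 'c1'],
    'time':        ['2023-01-01 10:00:00', '2023-01-01 10:05:00'],
})

clips = Clips()   # assumed default constructor
Intake(clips_df).scan_events(clips)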

SparkClipsAcc

Bases: AccumulatorParam

This is an internal AccumulatorParam descendant that propagates the tuples created by a user-defined function (inside the Intake) applied via .foreach() in the workers. It is the reducer in a map/reduce scheme, producing the complete reduced result in the driver.

This specific one manages a reels.Clips object and feeds the tuples to it via its .scan_event() method.

CAVEAT!!: This object collects all the tuples in lists before they are applied to a single reels.Clips object in the driver. It is intended for environments where the data is not too big compared to the RAM available in the driver and the number of workers is high, so the process benefits from parallelism. In any other setting you should not use this class. This class is created when you construct an Intake object with spark_method == 'accumulator'. The much safer (and possibly slower) spark_method == 'local_iterator' does not require much RAM.

SparkEventsAcc

Bases: AccumulatorParam

This is an internal AccumulatorParam descendant that propagates the tuples created by a user-defined function (inside the Intake) applied via .foreach() in the workers. It is the reducer in a map/reduce scheme, producing the complete reduced result in the driver.

This specific one manages a reels.Events object and feeds the tuples to it via its .insert_row() method.

CAVEAT!!: This object collects all the tuples in lists before they are applied to a single reels.Events object in the driver. It is intended for environments where the data is not too big compared to the RAM available in the driver and the number of workers is high, so the process benefits from parallelism. In any other setting you should not use this class. This class is created when you construct an Intake object with spark_method == 'accumulator'. The much safer (and possibly slower) spark_method == 'local_iterator' does not require much RAM.
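
To make the mechanism concrete, here is a plausible sketch of the pattern (hypothetical code, not the library's implementation): a pyspark AccumulatorParam defines zero() and addInPlace(); workers collect tuples into lists, and the driver-side merge feeds them to the reels object seeded via the accumulator's value.

from pyspark.accumulators import AccumulatorParam


class SparkEventsAccSketch(AccumulatorParam):    # hypothetical name
    def zero(self, value):
        # Identity element used on the workers: an empty list of tuples.
        return []

    def addInPlace(self, v1, v2):
        # `acc += tup` passes a single tuple; merges pass lists.
        new = v2 if isinstance(v2, list) else [v2]

        if isinstance(v1, list):
            # Worker-side merge: just collect the pending tuples.
            v1.extend(new)
            return v1

        # Driver-side merge: v1 is the Events object stored in acc.value.
        for tup in new:
            v1.insert_row(*tup)
        return v1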