mdx.behavior_learning.ingestion module

class Ingestion(config)

Bases: object

Data ingestion module for behavior data. Provides helper functions for ingesting data via Spark Structured Streaming.

Parameters:

config (dict) – configuration for the module

ingestion = Ingestion(config)
dedup_and_upsert_to_delta(micro_batch_output_df, batch_id)

Batch writer that upserts data into the Delta table. Data is deduplicated both within a micro-batch and across micro-batches. If the table does not exist, a new table is created.

Parameters:
  • micro_batch_output_df (pyspark.sql.DataFrame) – DataFrame containing the micro-batch to write.

  • batch_id (int) – ID of the micro-batch

Returns:

None

query = formatted_df.writeStream.foreachBatch(ingestion.dedup_and_upsert_to_delta).start()
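The deduplicate-then-upsert behavior described above can be illustrated without Spark. The following is a minimal pure-Python sketch, not the module's implementation: it assumes each record is identified by a hypothetical key column `event_id`, uses a dict as a stand-in for the Delta table, and assumes an upsert updates a row whose key matches and inserts a new row otherwise. In Spark, the two steps would typically correspond to `DataFrame.dropDuplicates` (within a batch) and a Delta `MERGE` (across batches).

```python
from typing import Dict, Iterable, List, Tuple

Row = Dict[str, object]
Key = Tuple[object, ...]

# Hypothetical key column(s) identifying a unique behavior event.
KEY_COLS: Tuple[str, ...] = ("event_id",)


def row_key(row: Row, key_cols: Tuple[str, ...] = KEY_COLS) -> Key:
    """Build the merge key for a row from its key columns."""
    return tuple(row[col] for col in key_cols)


def dedup_within_batch(rows: Iterable[Row], key_cols: Tuple[str, ...] = KEY_COLS) -> List[Row]:
    """Keep one row per key (the first one seen), similar in spirit to dropDuplicates."""
    seen = set()
    unique: List[Row] = []
    for row in rows:
        key = row_key(row, key_cols)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


def upsert_batch(table: Dict[Key, Row], rows: Iterable[Row], batch_id: int,
                 key_cols: Tuple[str, ...] = KEY_COLS) -> None:
    """Merge one micro-batch into the table: update on key match, insert otherwise.

    batch_id only mirrors the foreachBatch callback signature; this sketch does
    not use it. Because rows are stored by key, a key seen in an earlier batch
    is updated in place rather than duplicated.
    """
    for row in dedup_within_batch(rows, key_cols):
        table[row_key(row, key_cols)] = row
```

Keying the table on the event identifier is what makes cross-batch deduplication work here: replaying a batch with the same rows overwrites them with identical values instead of appending copies.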
format_data(df)

Formats the data to meet the requirements of dedup_and_upsert_to_delta(). Adds the column hourOfDay, which is required for partitioning the Delta table.

Parameters:

df (pyspark.sql.DataFrame) – Spark DataFrame to format

Returns:

The formatted DataFrame.

Return type:

pyspark.sql.DataFrame

formatted_df = ingestion.format_data(df)
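As a rough illustration of the hourOfDay column, the sketch below derives an integer hour (0-23) from an ISO-8601 event timestamp and shows the Hive-style `column=value` directory name that Spark uses for partitioned tables. It assumes a hypothetical input field named `timestamp` and assumes hourOfDay is simply the hour of that timestamp; the module may compute it differently (for example, in a particular time zone). In PySpark, the equivalent column expression would typically use `pyspark.sql.functions.hour`.

```python
from datetime import datetime
from typing import Dict

# Hypothetical name of the event-time field in the raw behavior records.
TS_FIELD = "timestamp"


def add_hour_of_day(record: Dict[str, object], ts_field: str = TS_FIELD) -> Dict[str, object]:
    """Return a copy of the record with an integer hourOfDay (0-23) column added."""
    ts = datetime.fromisoformat(str(record[ts_field]))
    return {**record, "hourOfDay": ts.hour}


def partition_dir(record: Dict[str, object]) -> str:
    """Hive-style partition directory name, as written for a table partitioned by hourOfDay."""
    return f"hourOfDay={record['hourOfDay']}"
```

Returning a new dict rather than mutating the input mirrors Spark's `withColumn`, which also produces a new DataFrame instead of modifying the existing one.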
get_schema()

Returns the schema of the behavior data. The schema is read from an internal JSON file.

Returns:

The schema of the behavior data.

Return type:

pyspark.sql.types.StructType

behavior_schema = ingestion.get_schema()
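Spark can serialize a `StructType` to JSON with `StructType.json()` and rebuild it from the parsed dict with `StructType.fromJson()`. Assuming the internal schema file uses that layout (the file's actual contents are not documented here), a loader could look like the sketch below. The two fields in the example are hypothetical placeholders, not the real behavior schema. The validation part is pure Python; the conversion step needs pyspark and imports it only when called.

```python
import json
from typing import Any, Dict, List

# Hypothetical schema file contents, in the layout produced by StructType.json().
EXAMPLE_SCHEMA_JSON = """
{
  "type": "struct",
  "fields": [
    {"name": "event_id", "type": "string", "nullable": false, "metadata": {}},
    {"name": "timestamp", "type": "timestamp", "nullable": true, "metadata": {}}
  ]
}
"""


def read_schema_dict(text: str) -> Dict[str, Any]:
    """Parse a Spark-style JSON schema and check that it is a top-level struct."""
    schema = json.loads(text)
    if schema.get("type") != "struct" or not isinstance(schema.get("fields"), list):
        raise ValueError("expected a top-level struct with a 'fields' list")
    return schema


def field_names(schema: Dict[str, Any]) -> List[str]:
    """Names of the top-level fields, in declaration order."""
    return [field["name"] for field in schema["fields"]]


def to_struct_type(schema: Dict[str, Any]):
    """Convert the parsed dict to pyspark.sql.types.StructType (requires pyspark)."""
    from pyspark.sql.types import StructType

    return StructType.fromJson(schema)
```

Keeping the schema in a file rather than inferring it lets a streaming source start with a fixed, explicit schema, which Structured Streaming generally expects for file-based sources.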
update_config(new_config)

Updates the module's configuration with the user-provided config.

Parameters:

new_config (dict) – new configuration to apply.

Returns:

None

ingestion.update_config(new_config)
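The description above does not say whether update_config replaces the whole configuration or merges into it. If it merges, a common approach is a recursive dictionary merge in which user-provided values override existing ones and unspecified keys are kept. The sketch below illustrates that approach under this assumption only; it is not taken from the module.

```python
import copy
from typing import Any, Dict


def merge_config(base: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new config with updates recursively applied over base.

    Nested dicts are merged key by key; any other value in updates replaces
    the corresponding value in base. Neither input is mutated.
    """
    merged = copy.deepcopy(base)
    for key, value in updates.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged
```

For example, with hypothetical keys, merging `{"delta": {"retention_hours": 240}}` into a config that also sets `{"delta": {"path": ...}}` changes only the retention and keeps the path, whereas a plain `dict.update` would drop the nested path.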
vacuum_delta()

Vacuums the Delta table based on the data retention period set in the config.

Returns:

None

ingestion.vacuum_delta()
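Delta Lake's VACUUM removes data files that are no longer referenced by the current table version and are older than the retention threshold, which defaults to 7 days (168 hours). With the delta-spark Python package, this is typically invoked as `DeltaTable.forPath(spark, path).vacuum(retention_hours)`; whether vacuum_delta uses that exact call is not documented here. The pure-Python sketch below only models the eligibility rule, and the file names, timestamps, and retention values in it are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Set

DEFAULT_RETENTION_HOURS = 168.0  # Delta Lake's default retention threshold (7 days)


def files_to_vacuum(
    file_mtimes: Dict[str, datetime],
    referenced: Set[str],
    now: datetime,
    retention_hours: float = DEFAULT_RETENTION_HOURS,
) -> List[str]:
    """Return the files VACUUM would treat as deletable under this simplified model.

    A file qualifies only if the current table version no longer references it
    and it was last modified before the retention cutoff.
    """
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(
        path
        for path, mtime in file_mtimes.items()
        if path not in referenced and mtime < cutoff
    )
```

A shorter retention frees storage sooner but also shortens how far back time travel and slow concurrent readers can go. By default, Delta refuses a retention shorter than 168 hours unless the safety check `spark.databricks.delta.retentionDurationCheck.enabled` is set to false.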