mdx.behavior_learning.ingestion module
- class Ingestion(config)
Bases: object
Data ingestion module for behavior data. Provides helper functions for data ingestion via Spark Structured Streaming.
- Parameters:
config (dict) – configuration for the module
ingestion = Ingestion(config)
- dedup_and_upsert_to_delta(micro_batch_output_df, batch_id)
Batch writer that upserts data into the Delta table. Data is deduplicated both within and across batches. If the table does not exist, a new one is created.
- Parameters:
micro_batch_output_df (pyspark.sql.DataFrame) – dataframe containing the micro batch to write.
batch_id (int) – id of the micro batch
- Returns:
None
query = formatted_df.writeStream.foreachBatch(ingestion.dedup_and_upsert_to_delta)
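The dedup-then-upsert behavior described above can be modelled in plain Python, with a dict standing in for the Delta table. This is only a sketch of the semantics, not the module's implementation: the key columns (`id`, `timestamp`) and the last-record-wins rule are assumptions made for illustration.

```python
from typing import Dict, Iterable, Tuple

Row = Dict[str, object]
Key = Tuple[object, ...]

# Hypothetical key columns; the module's real dedup keys are not documented here.
KEY_COLS = ("id", "timestamp")


def dedup_and_upsert(table: Dict[Key, Row], batch: Iterable[Row], key_cols=KEY_COLS) -> None:
    """Model of an upsert: dedup inside the batch, then merge into the table."""
    # Within-batch dedup: a later row with the same key replaces an earlier one.
    deduped: Dict[Key, Row] = {}
    for row in batch:
        deduped[tuple(row[c] for c in key_cols)] = row
    # Cross-batch dedup: a key already in the table is updated, not appended.
    table.update(deduped)


table: Dict[Key, Row] = {}
dedup_and_upsert(table, [
    {"id": "a", "timestamp": 1, "speed": 1.0},
    {"id": "a", "timestamp": 1, "speed": 1.0},  # duplicate within the batch
])
dedup_and_upsert(table, [
    {"id": "a", "timestamp": 1, "speed": 2.0},  # same key, arriving in a later batch
    {"id": "b", "timestamp": 1, "speed": 3.0},
])
```

In Delta Lake itself, this pattern is usually expressed as a `MERGE` on the key columns inside the function passed to `foreachBatch`.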
- format_data(df)
Formats the data to meet the requirements of dedup_and_upsert_to_delta(). Adds the column hourOfDay, which is required for creating partitions in the Delta table.
- Parameters:
df (pyspark.sql.DataFrame) – Spark dataframe to format
- Returns:
returns the formatted dataframe.
- Return type:
pyspark.sql.DataFrame
formatted_df = ingestion.format_data(df)
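As an illustration of what a partition column such as hourOfDay might hold, the sketch below derives the hour from an event timestamp in plain Python. The ISO-8601 `timestamp` field and the use of UTC are assumptions; the source does not say which column or time zone the module uses.

```python
from datetime import datetime, timezone


def hour_of_day(iso_timestamp: str) -> int:
    """Return the UTC hour (0-23) of a timezone-aware ISO-8601 timestamp."""
    # datetime.fromisoformat rejects a trailing "Z" before Python 3.11, so normalise it.
    ts = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
    return ts.astimezone(timezone.utc).hour


# Hypothetical record; the field names are placeholders.
record = {"id": "a", "timestamp": "2023-01-10T14:35:20.000Z"}
record["hourOfDay"] = hour_of_day(record["timestamp"])
```

Partitioning on a low-cardinality value like this keeps the number of partitions bounded at 24 per higher-level partition.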
- get_schema()
Returns the schema of the data. The schema is read from an internal JSON file.
- Returns:
returns the schema of the behavior data.
- Return type:
pyspark.sql.types.StructType
behavior_schema = ingestion.get_schema()
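For orientation, a schema stored as JSON in Spark's own `StructType` layout might look like the sketch below. The field names are placeholders, not the module's actual schema, and the internal file's exact format is an assumption.

```python
import json

# Hypothetical schema in the JSON layout Spark uses for StructType;
# the fields are placeholders, not the real behavior schema.
SCHEMA_JSON = """
{
  "type": "struct",
  "fields": [
    {"name": "id", "type": "string", "nullable": true, "metadata": {}},
    {"name": "timestamp", "type": "timestamp", "nullable": true, "metadata": {}}
  ]
}
"""

schema_dict = json.loads(SCHEMA_JSON)
field_names = [field["name"] for field in schema_dict["fields"]]
# With PySpark installed, StructType.fromJson(schema_dict) builds the StructType.
```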
- update_config(new_config)
Updates the config of the module with the user-provided config.
- Parameters:
new_config (dict) – new config for update.
- Returns:
None
ingestion.update_config(new_config)
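The source does not say whether the update replaces the config or merges into it. The sketch below shows one plausible behavior, a recursive merge that leaves unspecified keys untouched. The helper `merge_config` and the config keys are hypothetical.

```python
from copy import deepcopy


def merge_config(base: dict, new: dict) -> dict:
    """Return a copy of base with the values from new applied recursively."""
    merged = deepcopy(base)
    for key, value in new.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge them instead of overwriting.
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


# Hypothetical config keys, for illustration only.
config = {"delta": {"path": "/tmp/behavior", "retentionHours": 168}}
new_config = {"delta": {"retentionHours": 24}}
updated = merge_config(config, new_config)
```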
- vacuum_delta()
Vacuums the Delta table based on the data retention period set in the config.
- Returns:
None
ingestion.vacuum_delta()
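Vacuuming a Delta table removes data files that the table no longer references and that are older than the retention period. The selection rule can be sketched in plain Python; the file list and the `vacuum_candidates` helper are hypothetical, and the 168-hour value is only an example.

```python
from datetime import datetime, timedelta, timezone


def vacuum_candidates(unreferenced_files, retention_hours, now):
    """Return paths of unreferenced files older than the retention window."""
    cutoff = now - timedelta(hours=retention_hours)
    return [path for path, modified in unreferenced_files if modified < cutoff]


now = datetime(2023, 1, 10, 12, 0, tzinfo=timezone.utc)
# (path, last-modified time) pairs for files no longer referenced by the table.
files = [
    ("part-0001.parquet", now - timedelta(hours=200)),
    ("part-0002.parquet", now - timedelta(hours=2)),
]
stale = vacuum_candidates(files, retention_hours=168, now=now)
```

With the `delta-spark` package, the corresponding operation is `DeltaTable.vacuum(retentionHours)`.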