## File to DataFrame Loader
The `DataLoader` module is used to load data file content into a DataFrame using a custom loader function. This loader function can be configured to use different processing methods, such as "single_thread", "dask", or "dask_thread", as determined by the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable. When the download method is "dask" or "dask_thread", a Dask client is created to process the files; otherwise, a single thread is used.

After processing, the resulting DataFrame is cached using a hash of the file paths. In addition to loading data from disk, this loader can also load file content from S3 buckets.
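The dispatch and caching behavior described above can be pictured with a short sketch. This is illustrative only, not the module's actual implementation: the helpers `_single_thread_to_df` and `_dask_to_df` are hypothetical, and the mapping of "dask_thread" to a thread-based Dask client is an assumption based on the method names.

```python
import hashlib
import os

import pandas as pd


def _single_thread_to_df(paths: list[str]) -> pd.DataFrame:
    # Hypothetical helper: read the files sequentially on one thread.
    return pd.concat((pd.read_json(p, lines=True) for p in paths), ignore_index=True)


def _dask_to_df(paths: list[str]) -> pd.DataFrame:
    # Hypothetical helper: read the files in parallel with Dask.
    import dask.dataframe as dd
    return dd.read_json(paths, lines=True).compute()


def load_files_to_df(file_paths: list[str], cache_dir: str) -> pd.DataFrame:
    # The processing method is selected via an environment variable.
    method = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", "dask_thread")

    # The resulting DataFrame is cached under a hash of the file paths.
    cache_key = hashlib.md5(",".join(sorted(file_paths)).encode()).hexdigest()
    cache_path = os.path.join(cache_dir, f"{cache_key}.pkl")
    if os.path.exists(cache_path):
        return pd.read_pickle(cache_path)

    if method in ("dask", "dask_thread"):
        # A Dask client is created to process the files; "dask_thread" is
        # assumed here to mean an in-process, thread-based cluster.
        from dask.distributed import Client
        with Client(processes=(method == "dask")):
            df = _dask_to_df(file_paths)
    else:
        # Otherwise a single thread is used.
        df = _single_thread_to_df(file_paths)

    os.makedirs(cache_dir, exist_ok=True)
    df.to_pickle(cache_path)
    return df
```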
### Example Loader Configuration
Using the configuration below when loading the `DataLoader` module specifies that it should use the `file_to_df` loader when loading files into a DataFrame.
```json
{
	"loaders": [{
		"id": "file_to_df"
	}]
}
```
Note: Loaders can receive configuration from the load task via a control message at runtime.
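For reference, the same loader selection can be expressed in Python when assembling the module configuration for a pipeline. The snippet below is a minimal sketch; the `module_id`, `module_name`, and `namespace` values are assumptions and may differ between Morpheus versions, only the `loaders` list is documented in this section.

```python
# Minimal sketch of embedding the loader configuration in a DataLoader
# module config; the keys surrounding "loaders" are assumed, not documented here.
module_config = {
    "module_id": "DataLoader",     # assumed registered module name
    "module_name": "file_loader",  # arbitrary instance name
    "namespace": "morpheus",       # assumed module namespace
    "loaders": [
        {"id": "file_to_df"},      # select the file_to_df loader
    ],
}
```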
### Task Configurable Parameters
The following parameters can be configured for this specific loader at the load task level:
| Parameter | Type | Description | Example Value | Default Value |
|---|---|---|---|---|
| `batcher_config` | dictionary | Options for batching | Refer Below | - |
| `files` | array | List of files to load | `["/path/to/input/files"]` | - |
| `loader_id` | string | Unique identifier for the loader | `"file_to_df"` | - |
#### batcher_config
| Key | Type | Description | Example Value | Default Value |
|---|---|---|---|---|
| `cache_dir` | string | Directory to cache the rolling window data | `"/path/to/cache"` | - |
| `file_type` | string | Type of the input file | `"JSON"` | - |
| `filter_null` | boolean | Whether to filter out null values | `false` | - |
| `parser_kwargs` | dictionary | Keyword arguments to pass to the parser | `{"delimiter": ","}` | - |
| `schema` | dictionary | Schema of the input data | Refer Below | - |
| `timestamp_column_name` | string | Name of the timestamp column | `"timestamp"` | - |
### Example Load Task Configuration
The JSON configuration below specifies how to pass additional configuration to the loader through a control message task at runtime.
```json
{
	"type": "load",
	"properties": {
		"loader_id": "file_to_df",
		"files": ["/path/to/input/files"],
		"batcher_config": {
			"timestamp_column_name": "timestamp_column_name",
			"schema": "string",
			"file_type": "JSON",
			"filter_null": false,
			"parser_kwargs": {
				"delimiter": ","
			},
			"cache_dir": "/path/to/cache"
		}
	}
}
```
Note: The `file_batcher` module currently generates tasks internally, assigns them to control messages, and then sends them to the `DataLoader` module, which uses the `file_to_df` loader. As a result, this loader configuration is obtained from the `file_batcher` module configuration.
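For illustration, such a load task can be attached to a control message in Python. The sketch below assumes the `morpheus.messages.ControlMessage` API with its `add_task` method; in the flow described above, the `file_batcher` module constructs this task internally rather than user code.

```python
from morpheus.messages import ControlMessage

msg = ControlMessage()

# Attach a "load" task mirroring the JSON configuration above. The
# DataLoader module dispatches the task to the loader named by "loader_id".
msg.add_task(
    "load",
    {
        "loader_id": "file_to_df",
        "files": ["/path/to/input/files"],
        "batcher_config": {
            "timestamp_column_name": "timestamp_column_name",
            "schema": "string",
            "file_type": "JSON",
            "filter_null": False,
            "parser_kwargs": {"delimiter": ","},
            "cache_dir": "/path/to/cache",
        },
    },
)
```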