File to DataFrame Loader
The DataLoader module loads the content of data files into a dataframe using a custom loader function. This loader function can be configured to use different processing methods, such as single-threaded, multiprocess, dask, or dask_thread, as determined by the MORPHEUS_FILE_DOWNLOAD_TYPE environment variable. When download_method starts with "dask", a dask client is created to process the files; otherwise, a single thread or multiple processes are used.
After processing, the resulting dataframe is cached using a hash of the file paths. In addition to loading data from disk, this loader can load file content from S3 buckets.
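The method selection and cache keying described above can be sketched roughly as follows. This is a minimal illustration, not the Morpheus implementation: the helper names, the "single_thread" fallback value, and the choice of SHA-256 are all assumptions made for the example.

```python
import hashlib
import os


def resolve_download_method(env=None, default="single_thread"):
    """Read the method name from MORPHEUS_FILE_DOWNLOAD_TYPE.

    The default value used here is an assumption for illustration.
    """
    env = os.environ if env is None else env
    return env.get("MORPHEUS_FILE_DOWNLOAD_TYPE", default)


def needs_dask_client(download_method):
    """Per the text, any method whose name starts with 'dask' uses a dask client."""
    return download_method.startswith("dask")


def cache_key(file_paths):
    """Hash the file paths so the same list of files maps to the same cache entry.

    SHA-256 and the NUL separator are assumptions; the separator keeps
    ["a", "bc"] and ["ab", "c"] from producing the same key.
    """
    digest = hashlib.sha256()
    for path in file_paths:
        digest.update(path.encode("utf-8"))
        digest.update(b"\0")
    return digest.hexdigest()


if __name__ == "__main__":
    method = resolve_download_method()
    print(method, needs_dask_client(method))
    print(cache_key(["/path/to/input/files"]))
```

In this sketch the key depends on the order of the paths. Sorting the paths first would make the key order-independent, if that is the behavior wanted.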
The following configuration, supplied when loading the DataLoader module, specifies that the module should use the file_to_df loader when loading files into a dataframe.
{
  "loaders": [{
    "id": "file_to_df"
  }]
}
Note: Loaders can receive configuration from the load task via a control message at runtime.
The following parameters can be configured for this loader at the load task level:
Parameter | Type | Description | Example Value | Default Value |
---|---|---|---|---|
batcher_config | dictionary | Options for batching | See below | |
files | array | List of files to load | ["/path/to/input/files"] | |
loader_id | string | Unique identifier for the loader | "file_to_df" | |
The batcher_config dictionary accepts the following keys:
Key | Type | Description | Example Value | Default Value |
---|---|---|---|---|
cache_dir | string | Directory to cache the rolling window data | "/path/to/cache" | |
file_type | string | Type of the input file | "csv" | |
filter_null | boolean | Whether to filter out null values | true | |
parser_kwargs | dictionary | Keyword arguments to pass to the parser | {"delimiter": ","} | |
schema | dictionary | Schema of the input data | See below | |
timestamp_column_name | string | Name of the timestamp column | "timestamp" | |
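To illustrate how options such as file_type, parser_kwargs, filter_null, and timestamp_column_name might interact, here is a small standard-library sketch. It is not the loader's actual code: the parse_rows function is hypothetical, JSON input is assumed to be one object per line, and "null" is interpreted here as a missing or empty timestamp value.

```python
import csv
import io
import json


def parse_rows(text, file_type="csv", parser_kwargs=None,
               filter_null=True, timestamp_column_name="timestamp"):
    """Parse text into a list of dict rows (hypothetical helper)."""
    parser_kwargs = parser_kwargs or {}
    kind = file_type.lower()
    if kind == "csv":
        # parser_kwargs (e.g. {"delimiter": ","}) is forwarded to the parser.
        rows = list(csv.DictReader(io.StringIO(text), **parser_kwargs))
    elif kind == "json":
        # Assumption: JSON lines, one object per non-empty line.
        rows = [json.loads(line) for line in text.splitlines() if line.strip()]
    else:
        raise ValueError(f"unsupported file_type: {file_type!r}")
    if filter_null:
        # Assumption: drop rows whose timestamp is missing or empty.
        rows = [r for r in rows
                if r.get(timestamp_column_name) not in (None, "")]
    return rows


if __name__ == "__main__":
    sample = "timestamp;user\n2023-01-01;alice\n;bob\n"
    print(parse_rows(sample, "csv", {"delimiter": ";"}))
```

Forwarding parser_kwargs unchanged keeps the sketch independent of any particular parser option. An unsupported file type raises an error rather than being silently ignored.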
The following JSON configuration shows how to pass additional configuration to the loader through a control message task at runtime.
{
  "type": "load",
  "properties": {
    "loader_id": "file_to_df",
    "files": ["/path/to/input/files"],
    "batcher_config": {
      "timestamp_column_name": "timestamp_column_name",
      "schema": "string",
      "file_type": "JSON",
      "filter_null": false,
      "parser_kwargs": {
        "delimiter": ","
      },
      "cache_dir": "/path/to/cache"
    }
  }
}
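A task payload of this shape can also be assembled programmatically. The sketch below builds the same structure as a plain Python dictionary and serializes it with the standard json module. The make_load_task helper is hypothetical and not part of Morpheus, and how the payload is attached to a control message is left out.

```python
import json


def make_load_task(files, batcher_config, loader_id="file_to_df"):
    """Build a load task dictionary in the shape shown above (hypothetical helper)."""
    if not files:
        raise ValueError("files must list at least one path")
    return {
        "type": "load",
        "properties": {
            "loader_id": loader_id,
            "files": list(files),
            "batcher_config": dict(batcher_config),
        },
    }


# Values copied from the JSON example in this document.
task = make_load_task(
    ["/path/to/input/files"],
    {
        "timestamp_column_name": "timestamp_column_name",
        "schema": "string",
        "file_type": "JSON",
        "filter_null": False,
        "parser_kwargs": {"delimiter": ","},
        "cache_dir": "/path/to/cache",
    },
)

if __name__ == "__main__":
    print(json.dumps(task, indent=2))
```

Building the dictionary in code makes it easy to vary the file list per task while keeping the batcher options fixed.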
Note: The file_batcher module currently generates tasks internally, assigns them to control messages, and sends them to the DataLoader module, which uses file_to_df_loader. As a result, this loader's configuration is obtained from the File Batcher module configuration.