morpheus.utils.sql_utils.SQLUtils
- class SQLUtils(sql_config, pool_size=5, max_overflow=5)
Bases: object
Utility class for working with SQL databases using SQLAlchemy.
- Parameters:
- sql_config : typing.Dict[any, any]
Configuration parameters for the SQL connection.
- pool_size : int
The number of connections to keep in the connection pool (default: 5).
- max_overflow : int
The maximum number of connections that can be created beyond pool_size (default: 5).
- Attributes:
- connection_string
Methods
- add_columns_if_not_exists(table_name, ...[, ...]): Adds columns to a table if they don't already exist.
- close(): Closes the database connection.
- connect(): Establishes a connection to the database using the specified SQL configuration.
- create_table(table_name, columns): Creates a table in the database.
- drop_table(table_name): Drops a table from the database if it exists.
- execute_queries_in_transaction(queries[, params]): Executes multiple SQL queries within a transaction.
- execute_query(query[, params]): Executes a SQL query without returning any result.
- execute_query_with_result(query[, params]): Executes a SQL query and returns the result as a list of rows.
- gen_placeholder_str(count): Generates the placeholder string for SQL query parameters.
- gen_sql_conn_str(sql_config): Generates the SQL connection string based on the SQL configuration.
- get_existing_column_rows(table_name): Retrieves the existing column names of a table.
- get_table_columns(table_name): Retrieves the column names of a table.
- insert_data(table_name, values[, columns]): Inserts data into a table.
- to_dataframe(query[, params]): Executes a SQL query and returns the result as a Pandas DataFrame.
- to_table(df, table_name[, if_exists, index]): Converts a Pandas DataFrame to a SQL table.
- add_columns_if_not_exists(table_name, columns, data_type, default_value=None)
Adds columns to a table if they don’t already exist.
- Parameters:
- table_name : str
The name of the table.
- columns : typing.List[str]
The column names to add.
- data_type : str
The data type of the columns.
- default_value : typing.Union[int, str], optional
The default value for the columns (default: None).
- Returns:
- new_col_added : bool
True if columns were added, False otherwise.
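The check-then-add behavior described above can be sketched with the standard-library `sqlite3` module. This is an illustration under stated assumptions, not the SQLUtils implementation: the helper name `add_columns_if_missing` is hypothetical, and the use of `PRAGMA table_info` is specific to SQLite.

```python
import sqlite3
from typing import List, Optional, Union


def add_columns_if_missing(conn: sqlite3.Connection,
                           table_name: str,
                           columns: List[str],
                           data_type: str,
                           default_value: Optional[Union[int, str]] = None) -> bool:
    """Hypothetical helper: add each missing column, return True if any was added."""
    # PRAGMA table_info yields one row per column; index 1 holds the column name.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table_name})")}
    added = False
    for column in columns:
        if column in existing:
            continue
        ddl = f"ALTER TABLE {table_name} ADD COLUMN {column} {data_type}"
        if default_value is not None:
            # DDL cannot take bound parameters, so the literal is rendered inline.
            # Strings containing quotes would need escaping in real code.
            if isinstance(default_value, str):
                ddl += f" DEFAULT '{default_value}'"
            else:
                ddl += f" DEFAULT {default_value}"
        conn.execute(ddl)
        added = True
    return added


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER)")
first = add_columns_if_missing(conn, "users", ["age", "score"], "INTEGER", 0)
second = add_columns_if_missing(conn, "users", ["age"], "INTEGER", 0)
```

The second call finds `age` already present, so it issues no DDL and returns False, mirroring the documented `new_col_added` return value.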
- create_table(table_name, columns)
Creates a table in the database.
- Parameters:
- table_name : str
The name of the table to create.
- columns : str
The column definitions for the table.
- drop_table(table_name)
Drops a table from the database if it exists.
- Parameters:
- table_name : str
The name of the table to drop.
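Because `columns` is typed as a single `str`, it presumably carries the full comma-separated column definitions. The sketch below shows equivalent raw SQL against an in-memory SQLite database. The exact statements SQLUtils emits are an assumption, and `table_exists` is a hypothetical helper used only to observe the result.

```python
import sqlite3


def table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """Hypothetical helper: check SQLite's catalog for a table with this name."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table_name,),
    ).fetchone()
    return row is not None


conn = sqlite3.connect(":memory:")

# Column definitions passed as one string, matching the `columns: str` parameter.
columns = "id INTEGER PRIMARY KEY, name TEXT NOT NULL"
conn.execute(f"CREATE TABLE events ({columns})")
created = table_exists(conn, "events")

# DROP TABLE IF EXISTS is safe to repeat; the second call is a no-op.
conn.execute("DROP TABLE IF EXISTS events")
conn.execute("DROP TABLE IF EXISTS events")
dropped = not table_exists(conn, "events")
```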
- execute_queries_in_transaction(queries, params=())
Executes multiple SQL queries within a transaction.
- Parameters:
- queries : typing.List[str]
The list of SQL queries to execute.
- params : typing.Tuple, optional
The query parameters (default: ()).
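The value of running queries in one transaction is that a failure undoes every statement in the batch. Here is a minimal sketch with `sqlite3`, assuming the same `params` tuple is passed to each query (the reference above does not say how `params` is distributed). `run_in_transaction` is a hypothetical stand-in, not the SQLUtils method.

```python
import sqlite3
from typing import List, Tuple


def run_in_transaction(conn: sqlite3.Connection,
                       queries: List[str],
                       params: Tuple = ()) -> None:
    """Hypothetical helper: run all queries atomically."""
    # `with conn` commits if the block succeeds and rolls back if it raises.
    with conn:
        for query in queries:
            conn.execute(query, params)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (name TEXT PRIMARY KEY, balance INTEGER)")
conn.commit()

run_in_transaction(conn, [
    "INSERT INTO accounts VALUES ('alice', 100)",
    "INSERT INTO accounts VALUES ('bob', 50)",
])

try:
    run_in_transaction(conn, [
        "INSERT INTO accounts VALUES ('carol', 10)",
        "INSERT INTO accounts VALUES ('alice', 999)",  # violates PRIMARY KEY
    ])
except sqlite3.IntegrityError:
    pass  # the whole second batch, including 'carol', is rolled back

names = [row[0] for row in conn.execute("SELECT name FROM accounts ORDER BY name")]
```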
- execute_query(query, params=())
Executes a SQL query without returning any result.
- Parameters:
- query : str
The SQL query to execute.
- params : typing.Tuple, optional
The query parameters (default: ()).
- execute_query_with_result(query, params=())
Executes a SQL query and returns the result as a list of rows.
- Parameters:
- query : str
The SQL query to execute.
- params : typing.Tuple, optional
The query parameters (default: ()).
- Returns:
- rows : typing.List[Row]
The result of the query as a list of rows.
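Passing values through `params` rather than formatting them into the query string keeps the query safe from SQL injection. The sketch below illustrates the query-plus-params pattern with `sqlite3`. The helper `query_rows` is hypothetical, and `?` placeholders are SQLite's parameter style, which may differ from what a given SQLAlchemy driver expects.

```python
import sqlite3
from typing import List, Tuple


def query_rows(conn: sqlite3.Connection, query: str, params: Tuple = ()) -> List[tuple]:
    """Hypothetical helper: execute a parameterized query and return all rows."""
    return conn.execute(query, params).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (level TEXT, message TEXT)")
conn.executemany(
    "INSERT INTO logs VALUES (?, ?)",
    [("INFO", "started"), ("ERROR", "disk full"), ("INFO", "stopped")],
)

# The value "INFO" is bound by the driver, never spliced into the SQL text.
rows = query_rows(
    conn,
    "SELECT message FROM logs WHERE level = ? ORDER BY message",
    ("INFO",),
)
```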
- gen_placeholder_str(count)
Generates the placeholder string for SQL query parameters.
- Parameters:
- count : int
The number of placeholders to generate.
- Returns:
- placeholders : str
The generated placeholder string.
- Raises:
- RuntimeError
If the count is invalid.
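As a rough illustration of what a placeholder generator does, the sketch below joins `?` markers with commas and treats any count below 1 as invalid. Both the marker style and the validity rule are assumptions; the reference above states only that a `RuntimeError` is raised for an invalid count.

```python
def gen_placeholders(count: int) -> str:
    """Hypothetical stand-in: build a comma-separated run of '?' markers."""
    if count < 1:
        # Assumed rule: at least one placeholder is required.
        raise RuntimeError("count must be at least 1")
    return ", ".join(["?"] * count)


# Typical use: size the VALUES clause to match the number of bound values.
values = (1, "bolt", 40)
statement = f"INSERT INTO items VALUES ({gen_placeholders(len(values))})"
```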
- classmethod gen_sql_conn_str(sql_config)
Generates the SQL connection string based on the SQL configuration.
- Parameters:
- sql_config : typing.Dict
Configuration parameters for the SQL connection.
- Returns:
- connection_string : str
The generated SQL connection string.
- Raises:
- RuntimeError
If required SQL parameters are missing or invalid.
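The reference does not list which keys `sql_config` must contain, so the following is only a sketch of the general idea: validate a dictionary, then assemble a URL of the form `driver://user:password@host:port/database`. Every key name here (`drivername`, `host`, `port`, `database`, `username`, `password`) and the function `build_conn_str` are hypothetical.

```python
from typing import Dict
from urllib.parse import quote_plus

# Hypothetical set of mandatory keys; the real requirements may differ.
REQUIRED_KEYS = ("drivername", "host", "database")


def build_conn_str(sql_config: Dict[str, str]) -> str:
    """Hypothetical stand-in: turn a config dict into a database URL."""
    missing = [key for key in REQUIRED_KEYS if not sql_config.get(key)]
    if missing:
        raise RuntimeError(f"Missing SQL parameters: {', '.join(missing)}")

    credentials = ""
    if sql_config.get("username"):
        # Percent-encode credentials so characters like '@' cannot break the URL.
        credentials = quote_plus(sql_config["username"])
        if sql_config.get("password"):
            credentials += ":" + quote_plus(sql_config["password"])
        credentials += "@"

    port = f":{sql_config['port']}" if sql_config.get("port") else ""
    return (f"{sql_config['drivername']}://{credentials}"
            f"{sql_config['host']}{port}/{sql_config['database']}")


url = build_conn_str({
    "drivername": "postgresql",
    "host": "db.local",
    "port": "5432",
    "database": "metrics",
    "username": "app",
    "password": "p@ss",
})
```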
- get_existing_column_rows(table_name)
Retrieves the existing column names of a table.
- Parameters:
- table_name : str
The name of the table.
- Returns:
- existing_columns : typing.List[str]
The existing column names.
- Raises:
- exc.SQLAlchemyError
If an error occurs while retrieving the existing columns.
- get_table_columns(table_name)
Retrieves the column names of a table.
- Parameters:
- table_name : str
The name of the table.
- Returns:
- columns : typing.List[str]
The column names.
- Raises:
- exc.SQLAlchemyError
If an error occurs while retrieving the columns.
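One driver-independent way to read column names is to run a query that returns no rows and inspect the cursor metadata. The sketch below does this with `sqlite3`. It is an illustration of the idea rather than how SQLUtils retrieves columns, and `table_columns` is a hypothetical helper. With plain `sqlite3` a missing table raises `sqlite3.OperationalError`, where SQLUtils is documented to raise `exc.SQLAlchemyError`.

```python
import sqlite3
from typing import List


def table_columns(conn: sqlite3.Connection, table_name: str) -> List[str]:
    """Hypothetical helper: list column names without fetching any rows."""
    cursor = conn.execute(f"SELECT * FROM {table_name} LIMIT 0")
    # Each entry of cursor.description is a 7-tuple; the name is at index 0.
    return [column[0] for column in cursor.description]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE contacts (id INTEGER, name TEXT, email TEXT)")
columns = table_columns(conn, "contacts")
```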
- insert_data(table_name, values, columns=None)
Inserts data into a table.
- Parameters:
- table_name : str
The name of the table to insert data into.
- values : typing.Tuple
The values to insert.
- columns : typing.List[str], optional
The column names to insert data into (default: None).
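The optional `columns` argument matters when a row supplies only some of a table's columns. The sketch below shows both forms with `sqlite3`, assuming that omitting `columns` means the values map to all columns in table order. `insert_row` is a hypothetical helper, not the SQLUtils method.

```python
import sqlite3
from typing import List, Optional, Tuple


def insert_row(conn: sqlite3.Connection,
               table_name: str,
               values: Tuple,
               columns: Optional[List[str]] = None) -> None:
    """Hypothetical helper: insert one row, optionally naming the target columns."""
    placeholders = ", ".join(["?"] * len(values))
    column_clause = f" ({', '.join(columns)})" if columns else ""
    conn.execute(
        f"INSERT INTO {table_name}{column_clause} VALUES ({placeholders})",
        values,
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER, name TEXT, qty INTEGER DEFAULT 1)")

# No column list: one value per table column, in table order.
insert_row(conn, "items", (1, "bolt", 40))
# Explicit column list: qty is omitted and falls back to its default.
insert_row(conn, "items", (2, "nut"), columns=["id", "name"])

rows = conn.execute("SELECT id, name, qty FROM items ORDER BY id").fetchall()
```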
- to_dataframe(query, params=None)
Executes a SQL query and returns the result as a Pandas DataFrame.
- Parameters:
- query : str
The SQL query to execute.
- params : typing.Tuple, optional
The query parameters (default: None).
- Returns:
- df : pd.DataFrame
The result of the query as a Pandas DataFrame.
- Raises:
- exc.SQLAlchemyError
If an error occurs while executing the query or converting the result to a DataFrame.
- to_table(df, table_name, if_exists='replace', index=False)
Converts a Pandas DataFrame to a SQL table.
- Parameters:
- df : pd.DataFrame
The DataFrame to convert to a table.
- table_name : str
The name of the table.
- if_exists : str, optional
Action to take if the table already exists (default: 'replace').
- index : bool, optional
Whether to include the DataFrame index as a column in the table (default: False).
- Raises:
- exc.SQLAlchemyError
If an error occurs while converting the DataFrame to a table.
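The `if_exists` and `index` parameters have the same names as those of pandas' own `DataFrame.to_sql`, where `if_exists` accepts `'fail'`, `'replace'`, or `'append'`. The round trip below uses pandas directly against an in-memory SQLite connection to show the DataFrame-to-table and query-to-DataFrame pattern. Whether SQLUtils delegates to these pandas calls is an assumption.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")
df = pd.DataFrame({"host": ["a", "b", "c"], "cpu": [0.25, 0.5, 0.75]})

# index=False keeps the DataFrame index out of the table.
df.to_sql("metrics", conn, if_exists="replace", index=False)
# With if_exists="replace", writing again recreates the table instead of failing.
df.to_sql("metrics", conn, if_exists="replace", index=False)

# Read back a filtered subset with a bound parameter.
result = pd.read_sql_query(
    "SELECT host, cpu FROM metrics WHERE cpu > ? ORDER BY host",
    conn,
    params=(0.3,),
)
```

With `if_exists="append"` the second write would instead have doubled the rows, which is why the default of `'replace'` is worth keeping in mind when a table already holds data.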