morpheus.utils.sql_utils.SQLUtils
- class SQLUtils(sql_config, pool_size=5, max_overflow=5)[source]
Bases:
object
Utility class for working with SQL databases using SQLAlchemy.
- Parameters
- sql_configtyping.Dict[any, any]
Configuration parameters for the SQL connection.
- pool_sizeint
The number of connections to keep in the connection pool (default: 5).
- max_overflowint
The maximum number of connections that can be created beyond the pool_size (default: 5).
- Attributes
- connection_string
Methods
add_columns_if_not_exists
(table_name, ...[, ...])Adds columns to a table if they don't already exist. close
()Closes the database connection. connect
()Establishes a connection to the database using the specified SQL configuration. create_table
(table_name, columns)Creates a table in the database. drop_table
(table_name)Drops a table from the database if it exists. execute_queries_in_transaction
(queries[, params])Executes multiple SQL queries within a transaction. execute_query
(query[, params])Executes a SQL query without returning any result. execute_query_with_result
(query[, params])Executes a SQL query and returns the result as a list of rows. gen_placeholder_str
(count)Generates the placeholder string for SQL query parameters. gen_sql_conn_str
(sql_config)Generates the SQL connection string based on the SQL configuration. get_existing_column_rows
(table_name)Retrieves the existing column names of a table. get_table_columns
(table_name)Retrieves the column names of a table. insert_data
(table_name, values[, columns])Inserts data into a table. to_dataframe
(query[, params])Executes a SQL query and returns the result as a Pandas DataFrame. to_table
(df, table_name[, if_exists, index])Converts a Pandas DataFrame to a SQL table. - add_columns_if_not_exists(table_name, columns, data_type, default_value=None)[source]
Adds columns to a table if they don’t already exist.
- Parameters
- table_namestr
The name of the table.
- columnstyping.List[str]
The column names to add.
- data_typestr
The data type of the columns.
- default_valuetyping.Union[int, str], optional
The default value for the columns (default: None).
- Returns
- new_col_addedbool
True if columns were added, False otherwise.
- close()[source]
Closes the database connection.
- connect()[source]
Establishes a connection to the database using the specified SQL configuration.
- create_table(table_name, columns)[source]
Creates a table in the database.
- Parameters
- table_namestr
The name of the table to create.
- columnsstr
The column definitions for the table.
- drop_table(table_name)[source]
Drops a table from the database if it exists.
- Parameters
- table_namestr
The name of the table to drop.
- execute_queries_in_transaction(queries, params=())[source]
Executes multiple SQL queries within a transaction.
- Parameters
- queriestyping.List[str]
The list of SQL queries to execute.
- paramstyping.Tuple, optional
The query parameters (default: ()).
- execute_query(query, params=())[source]
Executes a SQL query without returning any result.
- Parameters
- querystr
The SQL query to execute.
- paramstyping.Tuple, optional
The query parameters (default: ()).
- execute_query_with_result(query, params=())[source]
Executes a SQL query and returns the result as a list of rows.
- Parameters
- querystr
The SQL query to execute.
- paramstyping.Tuple, optional
The query parameters (default: ()).
- Returns
- rowstyping.List[Row]
The result of the query as a list of rows.
- gen_placeholder_str(count)[source]
Generates the placeholder string for SQL query parameters.
- Parameters
- countint
The number of placeholders to generate.
- Returns
- placeholdersstr
The generated placeholder string.
- Raises
- RuntimeError
If the count is invalid.
- classmethod gen_sql_conn_str(sql_config)[source]
Generates the SQL connection string based on the SQL configuration.
- Parameters
- sql_configtyping.Dict
Configuration parameters for the SQL connection.
- Returns
- connection_stringstr
The generated SQL connection string.
- Raises
- RuntimeError
If required SQL parameters are missing or invalid.
- get_existing_column_rows(table_name)[source]
Retrieves the existing column names of a table.
- Parameters
- table_namestr
The name of the table.
- Returns
- existing_columnstyping.List[str]
The existing column names.
- Raises
- exc.SQLAlchemyError
If an error occurs while retrieving the existing columns.
- get_table_columns(table_name)[source]
Retrieves the column names of a table.
- Parameters
- table_namestr
The name of the table.
- Returns
- columnstyping.List[str]
The column names.
- Raises
- exc.SQLAlchemyError
If an error occurs while retrieving the columns.
- insert_data(table_name, values, columns=None)[source]
Inserts data into a table.
- Parameters
- table_namestr
The name of the table to insert data into.
- valuestyping.Tuple
The values to insert.
- columnstyping.List[str], optional
The column names to insert data into (default: None).
- to_dataframe(query, params=None)[source]
Executes a SQL query and returns the result as a Pandas DataFrame.
- Parameters
- querystr
The SQL query to execute.
- paramstyping.Tuple, optional
The query parameters (default: None).
- Returns
- dfpd.DataFrame
The result of the query as a Pandas DataFrame.
- Raises
- exc.SQLAlchemyError
If an error occurs while executing the query or converting the result to a DataFrame.
- to_table(df, table_name, if_exists='replace', index=False)[source]
Converts a Pandas DataFrame to a SQL table.
- Parameters
- dfpd.DataFrame
The DataFrame to convert to a table.
- table_namestr
The name of the table.
- if_existsstr, optional
Action to take if the table already exists (default: “replace”).
- indexbool, optional
Whether to include the DataFrame index as a column in the table (default: False).
- Raises
- exc.SQLAlchemyError
If an error occurs while converting the DataFrame to a table.