astro.databases.base
Module Contents
Classes
Base class to represent all the Database interactions.
- class astro.databases.base.BaseDatabase(conn_id)
Bases: abc.ABC
Base class to represent all the Database interactions.
The goal is to support new databases by adding a new module to the astro/databases directory, without needing to change other modules and classes.
The exception is when the Airflow connection type does not match the new database module name. In that case, update the dictionary CUSTOM_CONN_TYPE_TO_MODULE_PATH in astro/databases/__init__.py.
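A minimal sketch of the lookup described above, assuming the module path is derived from the connection type unless an override entry exists. The function `resolve_database_module_path` and the dictionary entry below are hypothetical illustrations, not the SDK's actual implementation.

```python
# Hypothetical override table: connection types whose names differ from the
# module that implements them (the entry is illustrative only).
CUSTOM_CONN_TYPE_TO_MODULE_PATH = {
    "gcpbigquery": "astro.databases.google.bigquery",
}


def resolve_database_module_path(conn_type: str) -> str:
    """Return the dotted module path assumed to implement ``conn_type``."""
    # Prefer an explicit override; otherwise assume the module is named
    # after the connection type inside astro/databases.
    return CUSTOM_CONN_TYPE_TO_MODULE_PATH.get(conn_type, f"astro.databases.{conn_type}")


print(resolve_database_module_path("sqlite"))       # astro.databases.sqlite
print(resolve_database_module_path("gcpbigquery"))  # astro.databases.google.bigquery
```

With this shape, adding a database whose module name matches its connection type needs no change to the lookup at all; only mismatched names need an override entry.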
- Parameters
conn_id (str) – ID of the Airflow connection used to access the database
- _create_schema_statement: str = 'CREATE SCHEMA IF NOT EXISTS {}'
- _drop_table_statement: str = 'DROP TABLE IF EXISTS {}'
- _create_table_statement: str = 'CREATE TABLE IF NOT EXISTS {} AS {}'
- illegal_column_name_chars: List[str] = []
- illegal_column_name_chars_replacement: List[str] = []
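The statement attributes are plain format strings, so a subclass can fill the `{}` placeholders with `str.format`. The sketch below also assumes that `illegal_column_name_chars` and `illegal_column_name_chars_replacement` are paired by position (the base class leaves both empty). `ExampleDatabase`, its characters and `sanitize_column_name` are hypothetical.

```python
from typing import List


class ExampleDatabase:
    """Hypothetical class mirroring the BaseDatabase class attributes."""

    _create_schema_statement: str = "CREATE SCHEMA IF NOT EXISTS {}"
    _drop_table_statement: str = "DROP TABLE IF EXISTS {}"
    _create_table_statement: str = "CREATE TABLE IF NOT EXISTS {} AS {}"
    # Assumption: each illegal character is replaced by the entry at the same index.
    illegal_column_name_chars: List[str] = ["."]
    illegal_column_name_chars_replacement: List[str] = ["_"]

    def sanitize_column_name(self, name: str) -> str:
        # Apply each (illegal char, replacement) pair in order.
        for char, replacement in zip(
            self.illegal_column_name_chars, self.illegal_column_name_chars_replacement
        ):
            name = name.replace(char, replacement)
        return name


db = ExampleDatabase()
print(db._create_table_statement.format("my_schema.new_table", "SELECT * FROM src"))
# CREATE TABLE IF NOT EXISTS my_schema.new_table AS SELECT * FROM src
print(db.sanitize_column_name("order.id"))  # order_id
```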
- __repr__()
Return repr(self).
- property sql_type
- property hook
Return an instance of the database-specific Airflow hook.
- Return type
airflow.hooks.dbapi.DbApiHook
- property connection
Return a SQLAlchemy connection object for the given database.
- Return type
sqlalchemy.engine.base.Connection
- property sqlalchemy_engine
Return the SQLAlchemy engine.
- Return type
sqlalchemy.engine.base.Engine
- run_sql(sql_statement, parameters=None)
Return the results of running a SQL statement.
Whenever possible, this method should be implemented using Airflow hooks, since this simplifies integration with async operators.
- Parameters
sql_statement (Union[str, sqlalchemy.sql.ClauseElement]) – SQL query to be run against the database
parameters (Optional[dict]) – Optional parameters to be used to render the query
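To illustrate how `parameters` render a query without string interpolation, here is a minimal sketch that uses Python's built-in `sqlite3` DB-API driver in place of an Airflow hook. `run_sql_sketch` is a hypothetical helper; the SDK's own implementation may differ.

```python
import sqlite3
from typing import Any, List, Optional


def run_sql_sketch(
    conn: sqlite3.Connection, sql_statement: str, parameters: Optional[dict] = None
) -> List[Any]:
    # Named placeholders (:name) are bound by the driver rather than
    # string-formatted, which keeps values out of the SQL text.
    cursor = conn.execute(sql_statement, parameters or {})
    return cursor.fetchall()


conn = sqlite3.connect(":memory:")
run_sql_sketch(conn, "CREATE TABLE users (id INTEGER, name TEXT)")
run_sql_sketch(conn, "INSERT INTO users VALUES (:id, :name)", {"id": 1, "name": "ada"})
print(run_sql_sketch(conn, "SELECT name FROM users WHERE id = :id", {"id": 1}))  # [('ada',)]
```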
- table_exists(table)
Check if a table exists in the database.
- Parameters
table (astro.sql.table.Table) – The table whose existence should be checked
- Return type
bool
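A SQLite-flavoured sketch of a table-existence check, assuming the table is identified by name alone. Other databases typically query `information_schema` or use SQLAlchemy's inspector instead; `table_exists_sketch` is hypothetical.

```python
import sqlite3


def table_exists_sketch(conn: sqlite3.Connection, table_name: str) -> bool:
    # sqlite_master lists every table in the main database.
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (table_name,)
    ).fetchone()
    return row is not None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER)")
print(table_exists_sketch(conn, "events"))   # True
print(table_exists_sketch(conn, "missing"))  # False
```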
- static get_merge_initialization_query(parameters)
Handle database-specific logic for constraints, keeping the calling code database-agnostic.
- Parameters
parameters (Tuple) –
- Return type
str
- static get_table_qualified_name(table)
Return the table's qualified name. This is database-specific: for instance, in SQLite it is just the table name, whereas in Snowflake it is the database, schema and table name.
- Parameters
table (astro.sql.table.Table) – The table we want to retrieve the qualified name for.
- Return type
str
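The difference between databases can be sketched with a simplified table description. `SimpleTable` is a hypothetical stand-in for astro.sql.table.Table (which carries more fields), and per-database quoting rules are deliberately left out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SimpleTable:
    """Hypothetical stand-in for astro.sql.table.Table."""

    name: str
    database: Optional[str] = None
    schema: Optional[str] = None


def sqlite_qualified_name(table: SimpleTable) -> str:
    # SQLite: the table name alone identifies the table.
    return table.name


def snowflake_style_qualified_name(table: SimpleTable) -> str:
    # Snowflake-style: database.schema.table (this sketch skips missing parts).
    parts = [table.database, table.schema, table.name]
    return ".".join(part for part in parts if part)


t = SimpleTable(name="orders", database="analytics", schema="public")
print(sqlite_qualified_name(t))           # orders
print(snowflake_style_qualified_name(t))  # analytics.public.orders
```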
- property default_metadata
Extract the metadata available within the Airflow connection associated with self.conn_id.
- Returns
a Metadata instance
- Return type
- populate_table_metadata(table)
Given a table, check whether it has metadata. If the table's metadata is missing and the database has default metadata, assign it to the table. If the table schema is still undefined at the end, fall back to the user-defined schema. This method modifies the table in place and also returns it.
- Parameters
table (astro.sql.table.Table) – Table whose metadata may be changed
- Returns
The modified table
- Return type
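A minimal sketch of the in-place behaviour described above. The `Metadata` and `Table` dataclasses, `populate_table_metadata_sketch` and the `DEFAULT_SCHEMA` value are hypothetical stand-ins; `DEFAULT_SCHEMA` plays the role of the user-defined schema.

```python
from dataclasses import dataclass, field
from typing import Optional

DEFAULT_SCHEMA = "tmp_astro"  # hypothetical user-configured fallback schema


@dataclass
class Metadata:
    schema: Optional[str] = None
    database: Optional[str] = None

    def is_empty(self) -> bool:
        return self.schema is None and self.database is None


@dataclass
class Table:
    name: str
    metadata: Metadata = field(default_factory=Metadata)


def populate_table_metadata_sketch(table: Table, default_metadata: Optional[Metadata]) -> Table:
    # Use the database-level metadata only when the table has none of its own.
    # A copy is assigned so later in-place edits don't leak into the default.
    if table.metadata.is_empty() and default_metadata is not None:
        table.metadata = Metadata(default_metadata.schema, default_metadata.database)
    # If no schema is set by the end, fall back to the user-defined default.
    if not table.metadata.schema:
        table.metadata.schema = DEFAULT_SCHEMA
    return table  # the same object, modified in place


t = Table(name="orders")
result = populate_table_metadata_sketch(t, Metadata(database="analytics"))
print(result is t, t.metadata)
# True Metadata(schema='tmp_astro', database='analytics')
```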
- create_table(table)
Create a SQL table. For this method to work, the table instance must contain columns.
- Parameters
table (astro.sql.table.Table) – The table to be created.
- Return type
None
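Because the table must contain columns, one way to picture `create_table` is generating a `CREATE TABLE` statement from column definitions. This SQLite sketch uses hypothetical `(name, type)` tuples rather than the column objects a real Table would hold, and `create_table_sketch` is hypothetical.

```python
import sqlite3
from typing import List, Tuple


def create_table_sketch(conn: sqlite3.Connection, name: str, columns: List[Tuple[str, str]]) -> None:
    if not columns:
        # Mirrors the documented requirement: without columns there is no table.
        raise ValueError(f"Table {name!r} has no columns defined")
    # Identifiers are assumed to be trusted here; they are quoted, not bound.
    column_sql = ", ".join(f'"{col}" {col_type}' for col, col_type in columns)
    conn.execute(f'CREATE TABLE "{name}" ({column_sql})')


conn = sqlite3.connect(":memory:")
create_table_sketch(conn, "people", [("id", "INTEGER"), ("name", "TEXT")])
print([row[1] for row in conn.execute('PRAGMA table_info("people")')])  # ['id', 'name']
```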
- create_table_from_select_statement(statement, target_table, parameters=None)
Export the result rows of a query statement into another table.
- Parameters
statement (str) – SQL query statement
target_table (astro.sql.table.Table) – Destination table where results will be recorded.
parameters (Optional[dict]) – (Optional) parameters to be used to render the SQL query
- Return type
None
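A sketch of how a `CREATE TABLE ... AS` template like `_create_table_statement` could combine with a parameterised query, run here against SQLite. `create_table_from_select_sketch` is hypothetical.

```python
import sqlite3
from typing import Optional

CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS {} AS {}"


def create_table_from_select_sketch(
    conn: sqlite3.Connection, statement: str, target_table: str, parameters: Optional[dict] = None
) -> None:
    # The target name is formatted into the template (assumed to be a trusted
    # identifier); query values still go through driver-side binding.
    conn.execute(CREATE_TABLE_STATEMENT.format(target_table, statement), parameters or {})


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (region TEXT, amount INTEGER)")
conn.executemany("INSERT INTO sales VALUES (?, ?)", [("eu", 10), ("us", 30), ("eu", 5)])
create_table_from_select_sketch(
    conn, "SELECT region, amount FROM sales WHERE region = :region", "eu_sales", {"region": "eu"}
)
print(conn.execute("SELECT SUM(amount) FROM eu_sales").fetchone())  # (15,)
```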
- drop_table(table)
Delete a SQL table, if it exists.
- Parameters
table (astro.sql.table.Table) – The table to be deleted.
- Return type
None
- load_file_to_table(source_file, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)
Load the content of the source file to the target database table. If the table already exists, append or replace the content, depending on the value of if_exists.
- Parameters
source_file (astro.files.File) – Local or remote filepath (e.g. File("/tmp/sample_data.csv"))
target_table (astro.sql.table.Table) – Table in which the file will be loaded
if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists
chunk_size (int) – Number of rows to write in each batch.
- Return type
None
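A stdlib-only sketch of chunked loading with replace/append semantics, reading a CSV with the `csv` module and writing to SQLite. It assumes every column can be stored as TEXT and that `if_exists` takes the string values `"replace"` and `"append"`; `load_csv_to_table_sketch` is hypothetical, and the SDK itself supports more file types and locations.

```python
import csv
import io
import sqlite3
from itertools import islice
from typing import TextIO


def load_csv_to_table_sketch(
    conn: sqlite3.Connection, source: TextIO, table: str, if_exists: str = "replace", chunk_size: int = 1000
) -> None:
    reader = csv.reader(source)
    header = next(reader)
    if if_exists == "replace":
        conn.execute(f'DROP TABLE IF EXISTS "{table}"')
    elif if_exists != "append":
        raise ValueError(f"Unsupported if_exists value: {if_exists!r}")
    columns = ", ".join(f'"{col}" TEXT' for col in header)
    conn.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({columns})')
    placeholders = ", ".join("?" for _ in header)
    # Write at most chunk_size rows per executemany call.
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            break
        conn.executemany(f'INSERT INTO "{table}" VALUES ({placeholders})', chunk)
    conn.commit()


data = "id,name\n1,ada\n2,grace\n3,linus\n"
conn = sqlite3.connect(":memory:")
load_csv_to_table_sketch(conn, io.StringIO(data), "people", chunk_size=2)
load_csv_to_table_sketch(conn, io.StringIO(data), "people", if_exists="append", chunk_size=2)
print(conn.execute('SELECT COUNT(*) FROM "people"').fetchone())  # (6,)
```

Reading in fixed-size chunks keeps memory bounded by `chunk_size` rather than by the file size.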
- load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)
Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.
- Parameters
source_dataframe (pandas.DataFrame) – The dataframe whose contents will be loaded
target_table (astro.sql.table.Table) – Table into which the dataframe will be loaded
if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.
chunk_size (int) – Number of rows to write in each batch.
- Return type
None
- append_table(source_table, target_table, source_to_target_columns_map)
Append the source table rows into a destination table.
- Parameters
source_table (astro.sql.table.Table) – Contains the rows to be appended to the target_table
target_table (astro.sql.table.Table) – Contains the destination table in which the rows will be appended
source_to_target_columns_map (Dict[str, str]) – Dict mapping source_table column names to target_table column names
- Return type
None
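Appending with a column map can be expressed as a single `INSERT ... SELECT`. This SQLite sketch is hypothetical, assumes trusted identifiers, and reads `source_to_target_columns_map` as source column names mapped to target column names.

```python
import sqlite3
from typing import Dict


def append_table_sketch(
    conn: sqlite3.Connection, source_table: str, target_table: str, source_to_target_columns_map: Dict[str, str]
) -> None:
    # Dict order is preserved, so keys and values line up column by column.
    source_cols = ", ".join(f'"{col}"' for col in source_to_target_columns_map)
    target_cols = ", ".join(f'"{col}"' for col in source_to_target_columns_map.values())
    conn.execute(
        f'INSERT INTO "{target_table}" ({target_cols}) SELECT {source_cols} FROM "{source_table}"'
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging (user_id INTEGER, user_name TEXT)")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO staging VALUES (1, 'ada')")
append_table_sketch(conn, "staging", "users", {"user_id": "id", "user_name": "name"})
print(conn.execute("SELECT id, name FROM users").fetchall())  # [(1, 'ada')]
```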
- abstract merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')
Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.
- Parameters
source_table (astro.sql.table.Table) – Contains the rows to be merged into the target_table
target_table (astro.sql.table.Table) – Contains the destination table into which the rows will be merged
source_to_target_columns_map (Dict[str, str]) – Dict mapping source_table column names to target_table column names
target_conflict_columns (List[str]) – List of target_table columns on which conflicts are expected while merging
if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.
- Return type
None
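One way a SQLite-backed subclass could implement `merge_table` is an UPSERT (`INSERT ... ON CONFLICT`), which needs SQLite 3.24 or later and a unique constraint on the conflict columns. The sketch assumes the strategies `"ignore"` and `"update"` exist alongside the default `"exception"`; treat those names, and the helper `merge_table_sketch`, as assumptions rather than the SDK's implementation.

```python
import sqlite3
from typing import Dict, List


def merge_table_sketch(
    conn: sqlite3.Connection,
    source_table: str,
    target_table: str,
    source_to_target_columns_map: Dict[str, str],
    target_conflict_columns: List[str],
    if_conflicts: str = "exception",
) -> None:
    source_cols = ", ".join(source_to_target_columns_map)
    target_cols = list(source_to_target_columns_map.values())
    # "WHERE true" avoids SQLite's parsing ambiguity between a join's ON and ON CONFLICT.
    sql = (
        f"INSERT INTO {target_table} ({', '.join(target_cols)}) "
        f"SELECT {source_cols} FROM {source_table} WHERE true"
    )
    conflict = ", ".join(target_conflict_columns)
    updates = ", ".join(
        f"{col} = excluded.{col}" for col in target_cols if col not in target_conflict_columns
    )
    if if_conflicts == "ignore" or (if_conflicts == "update" and not updates):
        sql += f" ON CONFLICT ({conflict}) DO NOTHING"
    elif if_conflicts == "update":
        sql += f" ON CONFLICT ({conflict}) DO UPDATE SET {updates}"
    elif if_conflicts != "exception":
        raise ValueError(f"Unsupported if_conflicts value: {if_conflicts!r}")
    # With "exception" there is no ON CONFLICT clause, so a duplicate key
    # raises sqlite3.IntegrityError.
    conn.execute(sql)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("CREATE TABLE source (sid INTEGER, sname TEXT)")
conn.execute("INSERT INTO target VALUES (1, 'old')")
conn.executemany("INSERT INTO source VALUES (?, ?)", [(1, "new"), (2, "fresh")])
merge_table_sketch(conn, "source", "target", {"sid": "id", "sname": "name"}, ["id"], "update")
print(conn.execute("SELECT id, name FROM target ORDER BY id").fetchall())
# [(1, 'new'), (2, 'fresh')]
```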
- get_sqla_table(table)
Return a SQLAlchemy table instance.
- Parameters
table (astro.sql.table.Table) – Astro Table to be converted to SQLAlchemy table instance
- Return type
sqlalchemy.sql.schema.Table
- export_table_to_pandas_dataframe(source_table)
Copy the content of a table to an in-memory Pandas dataframe.
- Parameters
source_table (astro.sql.table.Table) – An existing table in the database
- Return type
pandas.DataFrame
- export_table_to_file(source_table, target_file, if_exists='exception')
Copy the content of a table to a target file of supported type, in a supported location.
- Parameters
source_table (astro.sql.table.Table) – An existing table in the database
target_file (astro.files.File) – The file to which the table content will be written
if_exists (astro.constants.ExportExistsStrategy) – Strategy to be used if the target file already exists. Defaults to 'exception'.
- Return type
None
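A stdlib sketch of exporting a table to a local CSV file while honouring an `if_exists` strategy. It assumes the strategy values `"exception"` (the documented default) and `"replace"`; `export_table_to_csv_sketch` is hypothetical, and the SDK supports other file types and remote locations.

```python
import csv
import os
import sqlite3
import tempfile


def export_table_to_csv_sketch(
    conn: sqlite3.Connection, table: str, path: str, if_exists: str = "exception"
) -> None:
    if os.path.exists(path) and if_exists == "exception":
        raise FileExistsError(f"{path} already exists")
    cursor = conn.execute(f'SELECT * FROM "{table}"')
    header = [description[0] for description in cursor.description]
    # Mode "w" truncates an existing file, which implements "replace".
    with open(path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        writer.writerows(cursor)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (player TEXT, points INTEGER)")
conn.execute("INSERT INTO scores VALUES ('ada', 3)")
path = os.path.join(tempfile.mkdtemp(), "scores.csv")
export_table_to_csv_sketch(conn, "scores", path)
export_table_to_csv_sketch(conn, "scores", path, if_exists="replace")
with open(path, newline="") as handle:
    print(handle.read().splitlines())  # ['player,points', 'ada,3']
```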
- create_schema_if_needed(schema)
Check whether the expected schema exists in the database and, if it does not, attempt to create it.
- Parameters
schema (Optional[str]) – Database schema: a namespace that contains named objects (tables, functions, etc.)
- Return type
None
- abstract schema_exists(schema)
Check whether a schema exists in the database.
- Parameters
schema (str) – Database schema: a namespace that contains named objects (tables, functions, etc.)
- Return type
bool
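The split between a concrete `create_schema_if_needed` and an abstract `schema_exists` follows the usual `abc.ABC` pattern. The sketch below is a hypothetical, heavily simplified base class (`MiniDatabase`) plus an in-memory subclass (`RecordingDatabase`) that records the SQL it would run; it is not the SDK's code.

```python
from abc import ABC, abstractmethod
from typing import List, Optional, Set


class MiniDatabase(ABC):
    _create_schema_statement: str = "CREATE SCHEMA IF NOT EXISTS {}"

    @abstractmethod
    def schema_exists(self, schema: str) -> bool:
        """Subclasses decide how to look a schema up."""

    @abstractmethod
    def run_sql(self, sql_statement: str) -> None:
        """Subclasses decide how to execute SQL."""

    def create_schema_if_needed(self, schema: Optional[str]) -> None:
        # Only issue CREATE SCHEMA when a schema was given and is missing.
        if schema and not self.schema_exists(schema):
            self.run_sql(self._create_schema_statement.format(schema))


class RecordingDatabase(MiniDatabase):
    def __init__(self, existing: Set[str]) -> None:
        self.existing = existing
        self.executed: List[str] = []

    def schema_exists(self, schema: str) -> bool:
        return schema in self.existing

    def run_sql(self, sql_statement: str) -> None:
        self.executed.append(sql_statement)


db = RecordingDatabase(existing={"public"})
db.create_schema_if_needed("public")   # already exists: nothing executed
db.create_schema_if_needed("staging")
db.create_schema_if_needed(None)       # no schema given: nothing executed
print(db.executed)  # ['CREATE SCHEMA IF NOT EXISTS staging']
```

Because `MiniDatabase` declares abstract methods, instantiating it directly raises `TypeError`; only subclasses that implement every abstract method can be created.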
- get_sqlalchemy_template_table_identifier_and_parameter(table, jinja_table_identifier)
When converting a Jinja-templated SQL query to a SQLAlchemy query, a Jinja table identifier must be converted into a safe, SQLAlchemy-compatible table identifier.
- For example, the query:
sql_statement = "SELECT * FROM {{input_table}};"
parameters = {"input_table": Table(name="user_defined_table", metadata=Metadata(schema="some_schema"))}
- Can become (depending on the database):
"SELECT * FROM some_schema.user_defined_table;"
parameters = {"input_table": "user_defined_table"}
Because the table value is templated, there is a safety concern (e.g. SQL injection). We recommend consulting the database's documentation for its best practices; for example, for Snowflake: https://docs.snowflake.com/en/sql-reference/identifier-literal.html
- Parameters
table (astro.sql.table.Table) – The table object we want to generate a safe table identifier for
jinja_table_identifier (str) – The name used within the Jinja template to represent this table
- Returns
The value that replaces the table identifier in the query, and the value that should be used for it
- Return type
Tuple[str, str]
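As a hedged illustration of the safety concern, the sketch below substitutes the Jinja placeholder with a qualified name only after checking each part against a conservative identifier pattern, and returns the rendered query with its parameters to mirror the example above (not the method's exact return value). The pattern and `render_table_identifier_sketch` are assumptions for illustration; real implementations may instead rely on bind parameters or database-specific constructs such as Snowflake's `IDENTIFIER()`.

```python
import re
from typing import Dict, Optional, Tuple

# Conservative, hypothetical allow-list: letters, digits and underscores only.
SAFE_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def render_table_identifier_sketch(
    sql_statement: str, jinja_table_identifier: str, table_name: str, schema: Optional[str] = None
) -> Tuple[str, Dict[str, str]]:
    parts = [part for part in (schema, table_name) if part]
    for part in parts:
        # Reject anything that is not a plain identifier (quotes, spaces, semicolons...).
        if not SAFE_IDENTIFIER.fullmatch(part):
            raise ValueError(f"Unsafe identifier: {part!r}")
    qualified = ".".join(parts)
    # Match "{{input_table}}" with or without inner spaces.
    placeholder = re.compile(r"\{\{\s*" + re.escape(jinja_table_identifier) + r"\s*\}\}")
    rendered = placeholder.sub(lambda _match: qualified, sql_statement)
    return rendered, {jinja_table_identifier: table_name}


query, params = render_table_identifier_sketch(
    "SELECT * FROM {{input_table}};", "input_table", "user_defined_table", "some_schema"
)
print(query)   # SELECT * FROM some_schema.user_defined_table;
print(params)  # {'input_table': 'user_defined_table'}
```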