astro.databases.base
Module Contents
Classes
Base class to represent all the Database interactions.
- class astro.databases.base.BaseDatabase(conn_id, table=None, load_options=None)
Bases:
abc.ABC
Base class to represent all the Database interactions.
The goal is to be able to support new databases by adding a new module to the astro/databases directory, without the need of changing other modules and classes.
The exception is if the Airflow connection type does not match the new Database module name. In that case, we should update the dictionary CUSTOM_CONN_TYPE_TO_MODULE_PATH available at astro/databases/__init__.py.
- Parameters:
conn_id (str) –
table (astro.table.BaseTable | None) –
load_options (astro.options.LoadOptions | None) –
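The extension model described above can be sketched with a minimal, self-contained ABC. These are simplified stand-ins, not the real astro classes, and DuckdbDatabase is a hypothetical new database module used only for illustration:

```python
from abc import ABC, abstractmethod


class BaseDatabase(ABC):
    """Simplified stand-in for astro.databases.base.BaseDatabase."""

    @property
    @abstractmethod
    def sql_type(self) -> str:
        ...


class DuckdbDatabase(BaseDatabase):
    """Hypothetical new database module: implement the abstract
    members without changing any existing module."""

    @property
    def sql_type(self) -> str:
        return "duckdb"


print(DuckdbDatabase().sql_type)  # duckdb
```

Adding support for a new database then amounts to dropping one such subclass into the astro/databases directory (plus a mapping entry when the connection type and module name differ).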
- abstract property sql_type
- property connection: sqlalchemy.engine.base.Connection
Return a SQLAlchemy connection object for the given database.
- Return type:
sqlalchemy.engine.base.Connection
- abstract property default_metadata: astro.table.Metadata
Extract the metadata available within the Airflow connection associated with self.conn_id.
- Returns:
a Metadata instance
- Return type:
astro.table.Metadata
- illegal_column_name_chars: list[str] = []
- illegal_column_name_chars_replacement: list[str] = []
- NATIVE_PATHS: dict[Any, Any]
- DEFAULT_SCHEMA
- NATIVE_LOAD_EXCEPTIONS: Any
- NATIVE_AUTODETECT_SCHEMA_CONFIG: Mapping[astro.constants.FileLocation, Mapping[str, list[astro.constants.FileType] | Callable]]
- FILE_PATTERN_BASED_AUTODETECT_SCHEMA_SUPPORTED: set[astro.constants.FileLocation]
- abstract hook()
Return an instance of the database-specific Airflow hook.
- Return type:
airflow.hooks.dbapi.DbApiHook
- sqlalchemy_engine()
Return a SQLAlchemy engine.
- Return type:
sqlalchemy.engine.base.Engine
- run_single_sql_query(sql='', parameters=None)
Return the results of running a SQL statement.
Whenever possible, this method should be implemented using Airflow Hooks, since this will simplify the integration with Async operators.
- Parameters:
sql (str | sqlalchemy.sql.ClauseElement) – Contains SQL query to be run against database
parameters (dict | None) – Optional parameters to be used to render the query
- Return type:
Any
- run_sql(sql='', parameters=None, handler=None, query_modifier=QueryModifier(), **kwargs)
Return the results of running a SQL statement.
Whenever possible, this method should be implemented using Airflow Hooks, since this will simplify the integration with Async operators.
- Parameters:
query_modifier (astro.query_modifier.QueryModifier) – a query modifier specifying queries to run before and after the main query.
sql (str | sqlalchemy.sql.ClauseElement) – Contains SQL query to be run against database
parameters (dict | None) – Optional parameters to be used to render the query
autocommit – Optional autocommit flag
handler (Callable | None) – function that takes in a cursor as an argument.
- Return type:
Any
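The pattern run_sql implements — executing a parameterized statement and returning the result — can be sketched with the stdlib sqlite3 driver standing in for an Airflow hook (this is an illustrative sketch, not the library's implementation):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'ada'), (2, 'alan')")

# Parameterized query: the driver renders :min_id safely,
# avoiding string interpolation and SQL injection.
sql = "SELECT name FROM users WHERE id >= :min_id ORDER BY id"
rows = conn.execute(sql, {"min_id": 2}).fetchall()
print(rows)  # [('alan',)]
```

Passing values through parameters rather than formatting them into the SQL string is the same reason run_sql accepts a separate parameters dict.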
- columns_exist(table, columns)
Check whether all the given columns exist in the table.
- Parameters:
table (astro.table.BaseTable) – The table to check in.
columns (list[str]) – The columns to check.
- Returns:
whether the columns exist in the table or not.
- Return type:
bool
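The check can be sketched against SQLite by inspecting the table's column list (a hypothetical standalone helper, not the library's implementation):

```python
from __future__ import annotations

import sqlite3


def columns_exist(conn: sqlite3.Connection, table: str, columns: list[str]) -> bool:
    # PRAGMA table_info returns one row per column; the column
    # name is at index 1. The table name is trusted in this sketch.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    return all(col in existing for col in columns)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, name TEXT)")
print(columns_exist(conn, "t", ["id", "name"]))   # True
print(columns_exist(conn, "t", ["id", "email"]))  # False
```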
- table_exists(table)
Check if a table exists in the database.
- Parameters:
table (astro.table.BaseTable) – Details of the table we want to check that exists
- Return type:
bool
- static get_merge_initialization_query(parameters)
Handle database-specific constraint logic while keeping the caller database-agnostic.
- Parameters:
parameters (tuple) –
- Return type:
str
- static get_table_qualified_name(table)
Return the table's qualified name. This is database-specific: in SQLite, for instance, it is just the table name; in Snowflake it combines the database, schema, and table names.
- Parameters:
table (astro.table.BaseTable) – The table we want to retrieve the qualified name for.
- Return type:
str
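The per-database qualification logic can be illustrated with a small sketch. Table and Metadata here are simplified stand-ins for the astro classes, and the join order is an assumption for illustration:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Metadata:
    schema: str | None = None
    database: str | None = None


@dataclass
class Table:
    name: str
    metadata: Metadata


def get_table_qualified_name(table: Table) -> str:
    # Join whichever of database, schema, and table name are set.
    parts = [table.metadata.database, table.metadata.schema, table.name]
    return ".".join(p for p in parts if p)


print(get_table_qualified_name(Table("users", Metadata())))
# users
print(get_table_qualified_name(Table("users", Metadata("public", "analytics"))))
# analytics.public.users
```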
- populate_table_metadata(table)
Given a table, check if it has metadata. If the metadata is missing and the database has metadata, assign it to the table. If the table schema was not defined by the end user, retrieve the user-defined schema. This method performs the changes in place and also returns the table.
- Parameters:
table (astro.table.BaseTable) – Table to potentially have its metadata changed
- Return table:
Return the modified table
- Return type:
astro.table.BaseTable
- create_table_using_columns(table)
Create a SQL table using the table columns.
- Parameters:
table (astro.table.BaseTable) – The table to be created.
- Return type:
None
- abstract create_table_using_native_schema_autodetection(table, file)
Create a SQL table, automatically inferring the schema using the given file via native database support.
- Parameters:
table (astro.table.BaseTable) – The table to be created.
file (astro.files.File) – File used to infer the new table columns.
- Return type:
None
- create_table_using_schema_autodetection(table, file=None, dataframe=None, columns_names_capitalization='original')
Create a SQL table, automatically inferring the schema using the given file.
- Parameters:
table (astro.table.BaseTable) – The table to be created.
file (astro.files.File | None) – File used to infer the new table columns.
dataframe (pandas.DataFrame | None) – Dataframe used to infer the new table columns if there is no file
columns_names_capitalization (astro.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase in the resulting dataframe
- Return type:
None
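Conceptually, schema autodetection samples values from the file or dataframe and maps each column to a SQL type. A toy sketch of that idea (not the library's actual inference logic, which delegates to pandas and per-database type mappings):

```python
# Map a column's sampled Python values to a SQL type name.
def infer_sql_type(values):
    if all(isinstance(v, int) for v in values):
        return "INTEGER"
    if all(isinstance(v, (int, float)) for v in values):
        return "FLOAT"
    return "TEXT"


rows = [
    {"id": 1, "score": 9.5, "name": "ada"},
    {"id": 2, "score": 7.0, "name": "alan"},
]
schema = {col: infer_sql_type([r[col] for r in rows]) for col in rows[0]}
print(schema)  # {'id': 'INTEGER', 'score': 'FLOAT', 'name': 'TEXT'}
```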
- is_native_autodetect_schema_available(file)
Check if native auto detection of schema is available.
- Parameters:
file (astro.files.File) – File used to check the file type of to decide whether there is a native auto detection available for it.
- Return type:
bool
- create_table(table, file=None, dataframe=None, columns_names_capitalization='original', use_native_support=True)
Create a table either using its explicitly defined columns or inferring its columns from a given file.
- Parameters:
table (astro.table.BaseTable) – The table to be created
file (astro.files.File | None) – (optional) File used to infer the table columns.
dataframe (pandas.DataFrame | None) – (optional) Dataframe used to infer the new table columns if there is no file
columns_names_capitalization (astro.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase in the resulting dataframe
use_native_support (bool) –
- Return type:
None
- create_table_from_select_statement(statement, target_table, parameters=None, query_modifier=QueryModifier())
Export the result rows of a query statement into another table.
- Parameters:
query_modifier (astro.query_modifier.QueryModifier) – a query modifier specifying queries to run before and after the main query.
statement (str) – SQL query statement
target_table (astro.table.BaseTable) – Destination table where results will be recorded.
parameters (dict | None) – (Optional) parameters to be used to render the SQL query
- Return type:
None
- drop_table(table)
Delete a SQL table, if it exists.
- Parameters:
table (astro.table.BaseTable) – The table to be deleted.
- Return type:
None
- create_table_if_needed(table, file, normalize_config=None, columns_names_capitalization='original', if_exists='replace', use_native_support=True)
Check whether native schema autodetection is available; otherwise create the schema and the table.
- Parameters:
table (astro.table.BaseTable) – Table to create
file (astro.files.File) – File path and conn_id for object stores
normalize_config (dict | None) – pandas json_normalize params config
columns_names_capitalization (astro.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase
if_exists (astro.constants.LoadExistStrategy) – Overwrite the table if it exists
use_native_support (bool) – Use native support for data transfer if available on the destination
- fetch_all_rows(table, row_limit=-1)
Fetch all rows of a table and return them as a list. This is needed because some databases have different cursors that require different methods to fetch rows.
- Parameters:
row_limit (int) – Limit the number of rows returned, by default return all rows.
table (astro.table.BaseTable) – The table metadata needed to fetch the rows
- Returns:
a list of rows
- Return type:
Any
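The row_limit semantics can be sketched against SQLite, where LIMIT -1 conveniently means "no limit" (a standalone sketch, not the library's cursor-specific implementation):

```python
import sqlite3


def fetch_all_rows(conn: sqlite3.Connection, table: str, row_limit: int = -1):
    # A negative limit means "return all rows"; in SQLite,
    # LIMIT -1 does exactly that.
    return conn.execute(f"SELECT * FROM {table} LIMIT {row_limit}").fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(1,), (2,), (3,)])
print(fetch_all_rows(conn, "t"))               # [(1,), (2,), (3,)]
print(fetch_all_rows(conn, "t", row_limit=2))  # [(1,), (2,)]
```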
- static check_for_minio_connection(input_file)
Automatically check whether the connection is MinIO or S3.
- Parameters:
input_file (astro.files.File) –
- Return type:
bool
- load_file_to_table(input_file, output_table, normalize_config=None, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE, use_native_support=True, native_support_kwargs=None, columns_names_capitalization='original', enable_native_fallback=LOAD_FILE_ENABLE_NATIVE_FALLBACK, assume_schema_exists=ASSUME_SCHEMA_EXISTS, **kwargs)
Load the content of one or multiple files into output_table. Multiple files are sourced from the file path, which can also be a path pattern.
- Parameters:
input_file (astro.files.File) – File path and conn_id for object stores
output_table (astro.table.BaseTable) – Table to create
if_exists (astro.constants.LoadExistStrategy) – Overwrite the table if it exists
chunk_size (int) – Specify the number of records in each batch to be written at a time
use_native_support (bool) – Use native support for data transfer if available on the destination
normalize_config (dict | None) – pandas json_normalize params config
native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow
columns_names_capitalization (astro.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase in the resulting dataframe
enable_native_fallback (bool | None) – Use enable_native_fallback=True to fall back to default transfer
assume_schema_exists (bool) – If True, do not check whether the output table schema exists or attempt to create it
- static get_dataframe_from_file(file)
Get a pandas dataframe from the file. We use export_to_dataframe() for BigQuery, Snowflake, and Redshift. For Postgres we override this method and use export_to_dataframe_via_byte_stream(), which copies the file into a buffer and then ingests data from that buffer. This approach gives a significant performance boost for Postgres.
- Parameters:
file (astro.files.File) – File path and conn_id for object stores
- load_file_to_table_using_pandas(input_file, output_table, normalize_config=None, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)
- Parameters:
input_file (astro.files.File) –
output_table (astro.table.BaseTable) –
normalize_config (dict | None) –
if_exists (astro.constants.LoadExistStrategy) –
chunk_size (int) –
- load_file_to_table_natively_with_fallback(source_file, target_table, if_exists='replace', normalize_config=None, native_support_kwargs=None, enable_native_fallback=LOAD_FILE_ENABLE_NATIVE_FALLBACK, chunk_size=DEFAULT_CHUNK_SIZE, **kwargs)
Load content of a file in output_table.
- Parameters:
source_file (astro.files.File) – File path and conn_id for object stores
target_table (astro.table.BaseTable) – Table to create
if_exists (astro.constants.LoadExistStrategy) – Overwrite the table if it exists
chunk_size (int) – Specify the number of records in each batch to be written at a time
native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow
enable_native_fallback (bool | None) – Use enable_native_fallback=True to fall back to default transfer
normalize_config (dict | None) – pandas json_normalize params config
- load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)
Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.
- Parameters:
source_dataframe (pandas.DataFrame) – Pandas dataframe containing the rows to be loaded
target_table (astro.table.BaseTable) – Table in which the file will be loaded
if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.
chunk_size (int) – Specify the number of rows in each batch to be written at a time.
- Return type:
None
- append_table(source_table, target_table, source_to_target_columns_map)
Append the source table rows into a destination table.
- Parameters:
source_table (astro.table.BaseTable) – Contains the rows to be appended to the target_table
target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be appended
source_to_target_columns_map (dict[str, str]) – Dict of source_table column names to target_table column names
- Return type:
None
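The append can be sketched as an INSERT ... SELECT statement driven by the column map, here against SQLite (an illustrative sketch, not the library's SQLAlchemy-based implementation):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE src (uid INTEGER, uname TEXT)")
conn.execute("CREATE TABLE dst (id INTEGER, name TEXT)")
conn.execute("INSERT INTO src VALUES (1, 'ada')")

# Map source column names to target column names, then append
# the source rows into the destination table in one statement.
column_map = {"uid": "id", "uname": "name"}
src_cols = ", ".join(column_map)
dst_cols = ", ".join(column_map.values())
conn.execute(f"INSERT INTO dst ({dst_cols}) SELECT {src_cols} FROM src")
print(conn.execute("SELECT * FROM dst").fetchall())  # [(1, 'ada')]
```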
- abstract merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')
Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.
- Parameters:
source_table (astro.table.BaseTable) – Contains the rows to be merged to the target_table
target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be merged
source_to_target_columns_map (dict[str, str]) – Dict of source_table column names to target_table column names
target_conflict_columns (list[str]) – List of cols where we expect to have a conflict while combining
if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.
- Return type:
None
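One common realization of this merge — sketched here with SQLite's upsert syntax — shows what an "update on conflict" strategy means; the actual SQL is database-specific and generated by each subclass:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO t VALUES (1, 'ada')")

# An 'update'-style conflict strategy: rows that collide on the
# conflict column overwrite the existing row instead of failing.
conn.execute(
    "INSERT INTO t (id, name) VALUES (?, ?) "
    "ON CONFLICT (id) DO UPDATE SET name = excluded.name",
    (1, "lovelace"),
)
print(conn.execute("SELECT * FROM t").fetchall())  # [(1, 'lovelace')]
```

An 'exception' strategy would instead let the primary-key violation propagate, and an 'ignore' strategy would use DO NOTHING.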
- get_sqla_table(table)
Return SQLAlchemy table instance
- Parameters:
table (astro.table.BaseTable) – Astro Table to be converted to SQLAlchemy table instance
- Return type:
sqlalchemy.sql.schema.Table
- export_table_to_pandas_dataframe(source_table, select_kwargs=None)
Copy the content of a table to an in-memory Pandas dataframe.
- Parameters:
source_table (astro.table.BaseTable) – An existing table in the database
select_kwargs (dict | None) – kwargs for select statement
- Return type:
pandas.DataFrame
- export_table_to_file(source_table, target_file, if_exists='exception')
Copy the content of a table to a target file of supported type, in a supported location.
- Parameters:
source_table (astro.table.BaseTable) – An existing table in the database
target_file (astro.files.File) – The path to the file to which we aim to dump the content of the database
if_exists (astro.constants.ExportExistsStrategy) – Strategy to use if the target file already exists. Defaults to 'exception'.
- Return type:
None
- create_schema_if_applicable(schema, assume_exists=ASSUME_SCHEMA_EXISTS)
This function checks if the expected schema exists in the database. If the schema does not exist, it will attempt to create it.
- Parameters:
schema (str | None) – DB Schema - a namespace that contains named objects like (tables, functions, etc)
assume_exists (bool) – If True, do not check for the schema or attempt to create it
- Return type:
None
- abstract schema_exists(schema)
Checks if a schema exists in the database
- Parameters:
schema (str) – DB Schema - a namespace that contains named objects like (tables, functions, etc)
- Return type:
bool
- get_sqlalchemy_template_table_identifier_and_parameter(table, jinja_table_identifier)
During the conversion from a Jinja-templated SQL query to a SQLAlchemy query, there is the need to convert a Jinja table identifier to a safe SQLAlchemy-compatible table identifier.
- For example, the query:
sql = "SELECT * FROM {{input_table}};" parameters = {"input_table": Table(name="user_defined_table", metadata=Metadata(schema="some_schema"))}
- Can become (depending on the database):
"SELECT * FROM some_schema.user_defined_table;" parameters = {"input_table": "user_defined_table"}
Since the table value is templated, there is a safety concern (e.g. SQL injection). We recommend consulting the database's documentation for best practices. For example, Snowflake: https://docs.snowflake.com/en/sql-reference/identifier-literal.html
- Parameters:
table (astro.table.BaseTable) – The table object we want to generate a safe table identifier for
jinja_table_identifier (str) – The name used within the Jinja template to represent this table
- Returns:
value to replace the table identifier in the query and the value that should be used to replace it
- Return type:
tuple[str, str]
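A toy sketch of the distinction this docstring draws: the table identifier is substituted into the SQL text itself, while ordinary values stay behind as bind parameters. render_identifier is a hypothetical helper, not part of the library:

```python
# Substitute a Jinja table placeholder with a qualified table name.
# Real implementations must validate/quote the identifier, since it
# cannot be passed as a bind parameter.
def render_identifier(sql: str, jinja_name: str, qualified_name: str) -> str:
    return sql.replace("{{" + jinja_name + "}}", qualified_name)


sql = "SELECT * FROM {{input_table}};"
print(render_identifier(sql, "input_table", "some_schema.user_defined_table"))
# SELECT * FROM some_schema.user_defined_table;
```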
- row_count(table)
Returns the number of rows in a table.
- Parameters:
table (astro.table.BaseTable) – table to count
- Returns:
The number of rows in the table
- parameterize_variable(variable)
While most databases use SQLAlchemy, we want to open up how we parameterize variables for databases that (a) do not use SQLAlchemy and (b) have different parameterization schemes (namely Delta).
- Parameters:
variable (str) – The variable to parameterize.
- Returns:
Variable with proper parameters
- is_native_load_file_available(source_file, target_table)
Check if there is an optimised path for source to destination.
- Parameters:
source_file (astro.files.File) – File from which we need to transfer data
target_table (astro.table.BaseTable) – Table that needs to be populated with file data
- Return type:
bool
- abstract load_file_to_table_natively(source_file, target_table, if_exists='replace', native_support_kwargs=None, **kwargs)
Check whether an optimised path exists for transferring data from the file location to the database; if it does, perform the transfer and return True, otherwise False.
- Parameters:
source_file (astro.files.File) – File from which we need to transfer data
target_table (astro.table.BaseTable) – Table that needs to be populated with file data
if_exists (astro.constants.LoadExistStrategy) – Overwrite the table if it exists. Defaults to 'replace'.
native_support_kwargs (dict | None) – kwargs to be used by native loading command
- check_schema_autodetection_is_supported(source_file)
Checks if schema autodetection is handled natively by the database
- Parameters:
source_file (astro.files.File) – File from which we need to transfer data
- Return type:
bool
- check_file_pattern_based_schema_autodetection_is_supported(source_file)
Checks if schema autodetection is handled natively by the database for file patterns and prefixes.
- Parameters:
source_file (astro.files.File) – File from which we need to transfer data
- Return type:
bool
- abstract openlineage_dataset_name(table)
Returns the OpenLineage dataset name as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
- Parameters:
table (astro.table.BaseTable) –
- Return type:
str
- abstract openlineage_dataset_namespace()
Returns the OpenLineage dataset namespace as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
- Return type:
str
- abstract openlineage_dataset_uri(table)
Returns the OpenLineage dataset URI as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
- Parameters:
table (astro.table.BaseTable) –
- Return type:
str