astro.databases.databricks.delta
Module Contents
Classes
Base class to represent all the Database interactions.
- class astro.databases.databricks.delta.DeltaDatabase(conn_id, table=None, load_options=None)
Bases:
astro.databases.base.BaseDatabase
Base class to represent all the Database interactions.
The goal is to be able to support new databases by adding a new module to the astro/databases directory, without needing to change other modules and classes.
The exception is if the Airflow connection type does not match the new Database module name. In that case, we should update the dictionary CUSTOM_CONN_TYPE_TO_MODULE_PATH available at astro/databases/__init__.py.
- Parameters
conn_id (str) –
table (BaseTable | None) –
load_options (LoadOptions | None) –
- property api_client: databricks_cli.sdk.api_client.ApiClient
Returns the Databricks API client, used for interacting with Databricks services like DBFS, Jobs, etc.
- Returns
A databricks ApiClient
- Return type
databricks_cli.sdk.api_client.ApiClient
- property sql_type
- property hook: airflow.providers.databricks.hooks.databricks_sql.DatabricksSqlHook
Return the hook for the relevant Databricks conn_id
- Returns
a DatabricksSqlHook with metadata
- Return type
airflow.providers.databricks.hooks.databricks_sql.DatabricksSqlHook
- abstract property sqlalchemy_engine: sqlalchemy.engine.base.Engine
Return Sqlalchemy engine.
- Return type
sqlalchemy.engine.base.Engine
- property default_metadata: astro.table.Metadata
Extract the metadata available within the Airflow connection associated with self.conn_id.
- Returns
a Metadata instance
- Return type
astro.table.Metadata
- LOAD_OPTIONS_CLASS_NAME = 'DeltaLoadOptions'
- populate_table_metadata(table)
Given a table, populates the “metadata” field with what we would consider “defaults”. These defaults are determined based on environment variables and the connection settings.
- Parameters
table (astro.table.BaseTable) – table to be populated
- Returns
the table with its metadata populated
- Return type
astro.table.BaseTable
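The defaulting behavior can be pictured as a fallback chain. This is a hypothetical sketch (the function name and arguments are illustrative, not astro-sdk's API); the real method derives defaults from environment variables and the Airflow connection settings:

```python
# Hypothetical sketch of metadata defaulting; not the actual astro-sdk code.
def pick_default_schema(table_schema, conn_schema, env_schema, fallback="default"):
    """Return the first schema that is set, mimicking how defaults cascade
    from the table itself, to the connection, to the environment."""
    for candidate in (table_schema, conn_schema, env_schema):
        if candidate:
            return candidate
    return fallback
```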
- abstract create_table_using_native_schema_autodetection(table, file)
Create a SQL table, automatically inferring the schema using the given file via native database support.
- Parameters
table (astro.table.BaseTable) – The table to be created.
file (astro.files.File) – File used to infer the new table columns.
- Return type
None
- schema_exists(schema)
Checks if a schema exists in the database
- Parameters
schema (str) – DB schema: a namespace that contains named objects such as tables and functions
- Return type
bool
- create_schema_if_needed(schema)
This function checks if the expected schema exists in the database. If the schema does not exist, it will attempt to create it.
- Parameters
schema (str | None) – DB schema: a namespace that contains named objects such as tables and functions
- Return type
None
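The documented flow is check-then-create. A minimal sketch of that logic, with `schema_exists` and `run_sql` passed in as stand-ins for the database's own methods (the real method calls them on `self`):

```python
def create_schema_if_needed(schema, schema_exists, run_sql):
    """Create the schema only when it is missing.

    schema_exists and run_sql are stand-ins for the database's own methods;
    the CREATE statement shown is an assumption about the generated SQL.
    """
    if schema is None:
        return  # nothing to do when no schema was requested
    if not schema_exists(schema):
        run_sql(f"CREATE SCHEMA IF NOT EXISTS {schema}")
```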
- fetch_all_rows(table, row_limit=-1)
Fetches all rows for a table and returns them as a list. This is needed because some databases have cursors that require different methods to fetch rows.
- Parameters
table (astro.table.BaseTable) – The table metadata needed to fetch the rows
row_limit (int) – Limit the number of rows returned, by default return all rows.
- Returns
a list of rows
- Return type
list
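The `row_limit` contract (`-1` means all rows) can be sketched as the query the fetch would run. The helper below is illustrative only; the library's actual SQL may differ:

```python
def build_fetch_query(table_name, row_limit=-1):
    """Build a SELECT for fetching rows; row_limit=-1 (the default) fetches
    all rows, any non-negative value adds a LIMIT clause. A sketch of the
    documented contract, not the library's generated SQL."""
    sql = f"SELECT * FROM {table_name}"
    if row_limit > -1:
        sql += f" LIMIT {row_limit}"
    return sql
```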
- load_file_to_table(input_file, output_table, normalize_config=None, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE, use_native_support=True, native_support_kwargs=None, columns_names_capitalization='original', enable_native_fallback=None, databricks_job_name='', **kwargs)
Load content of multiple files into output_table. Multiple files are sourced from the file path, which can also be a path pattern.
- Parameters
databricks_job_name (str) – Create a consistent job name so that we don’t litter the Databricks jobs screen. This should be <dag_id>_<task_id>
input_file (astro.files.File) – File path and conn_id for object stores
output_table (astro.table.BaseTable) – Table to create
if_exists (astro.constants.LoadExistStrategy) – Strategy to apply if the output table already exists (e.g. replace or append)
chunk_size (int) – Specify the number of records in each batch to be written at a time
use_native_support (bool) – Use native support for data transfer if available on the destination
normalize_config (dict | None) – pandas json_normalize params config
native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow
columns_names_capitalization (astro.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase in the resulting dataframe
enable_native_fallback (bool | None) – Use enable_native_fallback=True to fall back to default transfer
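The `<dag_id>_<task_id>` naming convention for `databricks_job_name` can be captured in a small helper. The helper and the commented call below are illustrative; only the naming convention comes from the docstring above:

```python
def databricks_job_name(dag_id, task_id):
    """Build the documented <dag_id>_<task_id> job name so repeated runs are
    easy to find (and don't litter) the Databricks jobs screen."""
    return f"{dag_id}_{task_id}"

# Hypothetical usage (requires a configured Databricks connection):
# db.load_file_to_table(
#     input_file=File("s3://bucket/data.csv"),
#     output_table=Table(name="sales"),
#     databricks_job_name=databricks_job_name("etl_dag", "load_sales"),
# )
```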
- openlineage_dataset_name(table)
Returns the OpenLineage dataset name as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
- Parameters
table (astro.table.BaseTable) –
- Return type
str
- openlineage_dataset_namespace()
Returns the OpenLineage dataset namespace as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
- Return type
str
- create_table_from_select_statement(statement, target_table, parameters=None)
Create a Delta table from a SQL SELECT statement.
- Parameters
statement (str) – Statement that will return a table
target_table (astro.table.BaseTable) – The table which the result of the SQL statement will be placed
parameters (dict | None) – Parameters to pass to databricks
- Returns
None
- Return type
None
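Creating a table from a SELECT is the classic CTAS (CREATE TABLE ... AS SELECT) pattern. A sketch of the statement shape such a call can compile to; the exact SQL the library emits may differ:

```python
def build_ctas(target_table, statement):
    """Sketch of a CREATE TABLE ... AS SELECT (CTAS) statement, the standard
    way a Delta table is materialized from a SELECT. The CREATE OR REPLACE
    form is an assumption, not confirmed library behavior."""
    return f"CREATE OR REPLACE TABLE {target_table} AS {statement}"
```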
- parameterize_variable(variable)
Parameterize a variable in a way that the Databricks SQL API can recognize.
- Parameters
variable (str) – Variable to parameterize
- Returns
the parameterized variable
- Return type
str
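One common marker style for named parameters is a leading colon (e.g. `:value`). The sketch below assumes that style; the real method emits whatever marker the Databricks SQL API expects, which may differ:

```python
def parameterize_variable(variable):
    """Hypothetical parameter-marker style (':name'); an assumption, not the
    confirmed output of the library's method."""
    return f":{variable}"

# Hypothetical usage inside a query template:
# sql = f"SELECT * FROM t WHERE id = {parameterize_variable('id')}"
```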
- row_count(table)
Returns the number of rows in a table.
- Parameters
table (astro.table.BaseTable) – table to count
- Returns
The number of rows in the table
- Return type
int
- run_sql(sql='', parameters=None, handler=None, **kwargs)
Run SQL against a Delta table using Spark SQL.
- Parameters
sql (str | ClauseElement) – SQL query to run on the Delta table
parameters (dict | None) – parameters to pass to Delta
handler – function that takes in a Databricks cursor as an argument
kwargs –
- Returns
None if there is no handler, otherwise return result of handler function
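The handler contract above (return `handler(cursor)` when given, else `None`) can be sketched with a stand-in cursor. `FakeCursor` is purely illustrative; the real object is a Databricks SQL cursor:

```python
def run_sql(cursor, sql, handler=None):
    """Sketch of the documented handler contract: execute the SQL, then return
    handler(cursor) if a handler was given, otherwise None."""
    cursor.execute(sql)
    return handler(cursor) if handler else None

class FakeCursor:
    """Stand-in for a Databricks cursor, used only to illustrate the flow."""
    def __init__(self, rows):
        self.rows = rows
        self.last_sql = None
    def execute(self, sql):
        self.last_sql = sql
    def fetchall(self):
        return self.rows
```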
- table_exists(table)
Queries databricks to check if a table exists.
Since the Databricks SQL API raises an exception if the table does not exist, we look out for the relevant exception.
- Parameters
table (astro.table.BaseTable) – Table that may or may not exist
- Returns
True if the table exists, False if it does not
- Return type
bool
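The check described above is a try/except around a probing call. A sketch, with `describe` standing in for whatever query the library issues against the table:

```python
def table_exists(describe):
    """Sketch of the documented check: probe the table and treat the
    'table not found' exception as False. `describe` stands in for the
    library's probing call; catching bare Exception is a simplification."""
    try:
        describe()
        return True
    except Exception:
        return False
```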
- create_table_using_columns(table)
Create a SQL table using the table columns provided by the user.
- Parameters
table (astro.table.BaseTable) – The table to be created.
- Return type
None
- append_table(source_table, target_table, source_to_target_columns_map)
Append the source table rows into a destination table.
- Parameters
source_table (astro.table.BaseTable) – Contains the rows to be appended to the target_table
target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be appended
source_to_target_columns_map (dict[str, str]) – Dict of source_table column names to target_table column names
- Return type
None
- export_table_to_pandas_dataframe(source_table, select_kwargs=None)
Converts a delta table into a pandas dataframe that can be processed locally.
Please note that this is a local pandas DataFrame and not a Spark DataFrame; be careful of the size of the DataFrame you return.
- Parameters
source_table (astro.table.BaseTable) – Delta table to convert to a DataFrame
select_kwargs (dict | None) – Unused in this function
- Returns
a pandas DataFrame with the table’s contents
- Return type
pandas.DataFrame
- load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)
Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.
- Parameters
source_dataframe (pandas.DataFrame) – DataFrame whose contents will be loaded into the target table
target_table (astro.table.BaseTable) – Table in which the file will be loaded
if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.
chunk_size (int) – Specify the number of rows in each batch to be written at a time.
- Return type
None
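The role of `chunk_size` above is to bound how many rows each write batch carries. A sketch of that batching step in isolation; the real method does this as part of the load, not as a separate helper:

```python
def chunk_rows(rows, chunk_size):
    """Split rows into batches of at most chunk_size, illustrating how the
    chunk_size parameter bounds each write. A sketch, not the library's code."""
    return [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]
```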
- static get_merge_initialization_query(parameters)
Handles database-specific constraint logic for Delta. Setting constraints is not required for merging tables in Delta, so we only return “1”.
- Parameters
parameters (tuple) –
- Return type
str
- merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')
Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.
- Parameters
source_table (astro.table.BaseTable) – Contains the rows to be merged to the target_table
target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be merged
source_to_target_columns_map (dict[str, str]) – Dict of source_table column names to target_table column names
target_conflict_columns (list[str]) – List of columns where we expect a conflict while combining
if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.
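Delta supports SQL `MERGE INTO`, which is the natural target for such a merge. The builder below is a hedged sketch of the statement shape only: the alias names, clause layout, and the mapping of `if_conflicts="update"` to `WHEN MATCHED` are assumptions, not the library's actual generated SQL:

```python
def build_merge_sql(source, target, column_map, conflict_columns, if_conflicts="exception"):
    """Sketch of the Delta MERGE INTO shape such a merge can compile to.
    column_map maps source column names to target column names."""
    on = " AND ".join(f"t.{c} = s.{c}" for c in conflict_columns)
    sets = ", ".join(f"t.{tgt} = s.{src}" for src, tgt in column_map.items())
    cols = ", ".join(column_map.values())
    vals = ", ".join(f"s.{src}" for src in column_map)
    sql = f"MERGE INTO {target} t USING {source} s ON {on}"
    if if_conflicts == "update":
        # Resolve conflicts by updating the matching target rows.
        sql += f" WHEN MATCHED THEN UPDATE SET {sets}"
    sql += f" WHEN NOT MATCHED THEN INSERT ({cols}) VALUES ({vals})"
    return sql
```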