astro.databases.databricks.delta

Module Contents

Classes

DeltaDatabase

Represents all database interactions with Delta tables on Databricks.

class astro.databases.databricks.delta.DeltaDatabase(conn_id, table=None, load_options=None)

Bases: astro.databases.base.BaseDatabase

Represents all database interactions with Delta tables on Databricks.

The goal is to be able to support new databases by adding a new module to the astro/databases directory, without needing to change other modules and classes.

The exception is if the Airflow connection type does not match the new Database module name. In that case, we should update the dictionary CUSTOM_CONN_TYPE_TO_MODULE_PATH available at astro/databases/__init__.py.

Parameters:
  • conn_id (str) – Airflow connection ID of the Databricks connection to use

  • table – Default table for database operations, if any

  • load_options – Load options (DeltaLoadOptions) used when loading files into Delta tables
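
A minimal instantiation sketch (the connection ID and table name below are illustrative, not part of the API):

    from astro.databases.databricks.delta import DeltaDatabase
    from astro.table import Table

    # "databricks_conn" is an assumed Airflow connection ID that points at a
    # Databricks workspace; replace it with your own connection.
    database = DeltaDatabase(conn_id="databricks_conn")

    # A table handle that the methods below can create, load, and query.
    table = Table(name="my_delta_table", conn_id="databricks_conn")
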
property api_client: databricks_cli.sdk.api_client.ApiClient

Returns the databricks API client. Used for interacting with databricks services like DBFS, Jobs, etc.

Returns:

A databricks ApiClient

Return type:

databricks_cli.sdk.api_client.ApiClient

property sql_type
property hook: airflow.providers.databricks.hooks.databricks_sql.DatabricksSqlHook

Return the hook for the relevant databricks conn_id

Returns:

a DatabricksSqlHook with metadata

Return type:

airflow.providers.databricks.hooks.databricks_sql.DatabricksSqlHook

abstract property sqlalchemy_engine: sqlalchemy.engine.base.Engine

Return Sqlalchemy engine.

Return type:

sqlalchemy.engine.base.Engine

property default_metadata: astro.table.Metadata

Extract the metadata available within the Airflow connection associated with self.conn_id.

Returns:

a Metadata instance

Return type:

astro.table.Metadata

LOAD_OPTIONS_CLASS_NAME = ('DeltaLoadOptions',)
populate_table_metadata(table)

Given a table, populates the “metadata” field with what we would consider as “defaults”. These defaults are determined based on environment variables and the connection settings.

Parameters:

table (astro.table.BaseTable) – table to be populated

Returns:

the table with its “metadata” field populated

Return type:

astro.table.BaseTable
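
For example (a sketch reusing the assumed "databricks_conn" connection), a table created without an explicit schema picks up defaults from the connection and the environment:

    from astro.databases.databricks.delta import DeltaDatabase
    from astro.table import Table

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID
    table = database.populate_table_metadata(Table(name="my_delta_table"))
    print(table.metadata)  # now holds the connection/environment defaults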

abstract create_table_using_native_schema_autodetection(table, file)

Create a SQL table, automatically inferring the schema using the given file via native database support.

Parameters:
  • table (astro.table.BaseTable) – The table to be created.

  • file (astro.files.File) – File used to infer the new table columns.

Return type:

None

schema_exists(schema)

Checks if a schema exists in the database

Parameters:

schema (str) – DB schema: a namespace that contains named objects such as tables, functions, etc.

Return type:

bool

create_schema_if_applicable(schema, assume_exists=ASSUME_SCHEMA_EXISTS)

This function checks if the expected schema exists in the database. If the schema does not exist, it will attempt to create it.

Parameters:
  • schema (str | None) – DB schema: a namespace that contains named objects such as tables, functions, etc.

  • assume_exists (bool) – If True, assume the schema exists and do not check for it or attempt to create it

Return type:

None
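
A short sketch combining the two schema helpers (the schema name "analytics" and the connection ID are assumptions):

    from astro.databases.databricks.delta import DeltaDatabase

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID

    # Create the schema only if it is missing; assume_exists=True would skip the check.
    database.create_schema_if_applicable("analytics", assume_exists=False)
    assert database.schema_exists("analytics")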

fetch_all_rows(table, row_limit=-1)

Fetches all rows for a table and returns them as a list. This is needed because some databases have different cursors that require different methods to fetch rows.

Parameters:
  • table (astro.table.BaseTable) – The table metadata needed to fetch the rows

  • row_limit (int) – Limit the number of rows returned, by default return all rows.

Returns:

a list of rows

Return type:

list
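
Usage sketch (the table and connection names are assumptions):

    from astro.databases.databricks.delta import DeltaDatabase
    from astro.table import Table

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID

    # Fetch at most 10 rows; omit row_limit (or pass -1) to fetch every row.
    rows = database.fetch_all_rows(Table(name="my_delta_table"), row_limit=10)
    for row in rows:
        print(row)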

load_file_to_table(input_file, output_table, normalize_config=None, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE, use_native_support=True, native_support_kwargs=None, columns_names_capitalization='original', enable_native_fallback=None, assume_schema_exists=ASSUME_SCHEMA_EXISTS, databricks_job_name='', **kwargs)

Load the content of one or more files into output_table. Multiple files are sourced from the file path, which can also be a path pattern.

Parameters:
  • databricks_job_name (str) – Create a consistent job name so that we don’t clutter the Databricks jobs screen. This should be <dag_id>_<task_id>

  • input_file (astro.files.File) – File path and conn_id for object stores

  • output_table (astro.table.BaseTable) – Table to create

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to apply if the output table already exists (“replace” or “append”)

  • chunk_size (int) – Specify the number of records in each batch to be written at a time

  • use_native_support (bool) – Use native support for data transfer if available on the destination

  • normalize_config (dict | None) – pandas json_normalize params config

  • native_support_kwargs (dict | None) – kwargs to be used by the method involved in the native support flow

  • columns_names_capitalization (astro.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase in the resulting dataframe

  • enable_native_fallback (bool | None) – Use enable_native_fallback=True to fall back to default transfer

  • assume_schema_exists (bool) – If True, skips check to see if output_table schema exists
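
A hedged sketch of loading a CSV from S3 into a Delta table. The connection IDs, bucket path, and job name are assumptions; depending on your deployment you may also need to pass Delta load options (see LOAD_OPTIONS_CLASS_NAME above) via the load_options constructor argument:

    from astro.databases.databricks.delta import DeltaDatabase
    from astro.files import File
    from astro.table import Table

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID

    database.load_file_to_table(
        input_file=File(path="s3://my-bucket/data.csv", conn_id="aws_default"),
        output_table=Table(name="my_delta_table", conn_id="databricks_conn"),
        if_exists="replace",
        databricks_job_name="example_dag_load_task",  # <dag_id>_<task_id> convention
    )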

openlineage_dataset_name(table)

Returns the open lineage dataset name as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

Parameters:

table (astro.table.BaseTable) –

Return type:

str

openlineage_dataset_namespace()

Returns the open lineage dataset namespace as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

Return type:

str

openlineage_dataset_uri(table)

Returns the open lineage dataset uri as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

Parameters:

table (astro.table.BaseTable) –

Return type:

str

create_table_from_select_statement(statement, target_table, parameters=None, query_modifier=QueryModifier())

Create a Delta table from a SQL SELECT statement.

Parameters:
  • statement (str) – Statement that will return a table

  • target_table (astro.table.BaseTable) – The table in which the result of the SQL statement will be placed

  • parameters (dict | None) – Parameters to pass to Databricks

Returns:

None

Return type:

None
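
Usage sketch (the statement and table names are illustrative):

    from astro.databases.databricks.delta import DeltaDatabase
    from astro.table import Table

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID

    # Materialize the result of a SELECT into a new Delta table.
    database.create_table_from_select_statement(
        statement="SELECT id, amount FROM source_table WHERE amount > 0",
        target_table=Table(name="filtered_amounts", conn_id="databricks_conn"),
    )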

parameterize_variable(variable)

Parameterize a variable in a way that the Databricks SQL API can recognize.

Parameters:

variable (str) – Variable to parameterize

Returns:

the parameterized variable

Return type:

str

row_count(table)

Returns the number of rows in a table.

Parameters:

table (astro.table.BaseTable) – table to count

Returns:

The number of rows in the table

run_sql(sql='', parameters=None, handler=None, query_modifier=QueryModifier(), **kwargs)

Run SQL against a Delta table using Spark SQL.

Parameters:
  • query_modifier (astro.query_modifier.QueryModifier) –

  • sql (str | sqlalchemy.sql.ClauseElement) – SQL query to run on the Delta table

  • parameters (dict | None) – parameters to pass to Delta

  • handler (Callable | None) – function that takes a Databricks cursor as an argument

  • kwargs

Returns:

None if there is no handler, otherwise return result of handler function

Return type:

Any
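
Usage sketch with a handler (the query and table name are assumptions):

    from astro.databases.databricks.delta import DeltaDatabase

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID

    # Without a handler, run_sql returns nothing; with one, the handler's result is returned.
    count = database.run_sql(
        sql="SELECT COUNT(*) FROM my_delta_table",
        handler=lambda cursor: cursor.fetchone(),
    )
    print(count)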

table_exists(table)

Queries Databricks to check if a table exists.

Since the Databricks SQL API raises an exception if the table does not exist, we look out for the relevant exception.

Parameters:

table (astro.table.BaseTable) – Table that may or may not exist

Returns:

True if the table exists, False if it does not

Return type:

bool
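
Usage sketch combining table_exists with row_count documented above (the names used are assumptions):

    from astro.databases.databricks.delta import DeltaDatabase
    from astro.table import Table

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID
    table = Table(name="my_delta_table")

    if database.table_exists(table):
        print(database.row_count(table))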

create_table_using_columns(table)

Create a SQL table using the table columns provided by the user.

Parameters:

table (astro.table.BaseTable) – The table to be created.

Return type:

None

append_table(source_table, target_table, source_to_target_columns_map)

Append the source table rows into a destination table.

Parameters:
  • source_table (astro.table.BaseTable) – Contains the rows to be appended to the target_table

  • target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be appended

  • source_to_target_columns_map (dict[str, str]) – Dict of source_table column names to target_table column names

Return type:

None
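
Usage sketch (the table names and column map are assumptions):

    from astro.databases.databricks.delta import DeltaDatabase
    from astro.table import Table

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID

    database.append_table(
        source_table=Table(name="staging_orders"),
        target_table=Table(name="orders"),
        # Maps source column names to target column names.
        source_to_target_columns_map={"id": "id", "amount": "amount"},
    )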

export_table_to_pandas_dataframe(source_table, select_kwargs=None)

Converts a Delta table into a pandas dataframe that can be processed locally.

Please note that this is a local pandas dataframe and not a Spark dataframe. Be careful of the size of the dataframe you return.

Parameters:
  • source_table (astro.table.BaseTable) – Delta table to convert to a dataframe

  • select_kwargs (dict | None) – Unused in this function

Returns:

the table contents as a pandas dataframe

Return type:

pandas.DataFrame
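
Usage sketch (the table name is an assumption):

    from astro.databases.databricks.delta import DeltaDatabase
    from astro.table import Table

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID

    # The whole Delta table is pulled into local memory as a pandas dataframe,
    # so only do this for tables that comfortably fit on the worker.
    df = database.export_table_to_pandas_dataframe(Table(name="my_delta_table"))
    print(df.head())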

load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.

Parameters:
  • source_dataframe (pandas.DataFrame) – Pandas dataframe whose contents will be loaded into the table

  • target_table (astro.table.BaseTable) – Table in which the dataframe will be loaded

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.

  • chunk_size (int) – Specify the number of rows in each batch to be written at a time.

Return type:

None
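
Usage sketch (the dataframe contents and table name are assumptions):

    import pandas as pd

    from astro.databases.databricks.delta import DeltaDatabase
    from astro.table import Table

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID
    df = pd.DataFrame({"id": [1, 2], "amount": [10.5, 20.0]})

    # "append" keeps existing rows; "replace" (the default) overwrites the table.
    database.load_pandas_dataframe_to_table(
        source_dataframe=df,
        target_table=Table(name="my_delta_table"),
        if_exists="append",
    )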

static get_merge_initialization_query(parameters)

Handles database-specific logic for setting up constraints before a merge. Setting constraints is not required for merging tables in Delta, so we simply return “1”.

Parameters:

parameters (tuple) –

Return type:

str

merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')

Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.

Parameters:
  • source_table (astro.table.BaseTable) – Contains the rows to be merged to the target_table

  • target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be merged

  • source_to_target_columns_map (dict[str, str]) – Dict of source_table column names to target_table column names

  • target_conflict_columns (list[str]) – List of columns where we expect a conflict while combining

  • if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.
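
Usage sketch (the table names, column map, and conflict columns are assumptions):

    from astro.databases.databricks.delta import DeltaDatabase
    from astro.table import Table

    database = DeltaDatabase(conn_id="databricks_conn")  # assumed connection ID

    # Rows that conflict on "id" are updated in place; if_conflicts="exception"
    # (the default) would raise instead, and "ignore" would skip them.
    database.merge_table(
        source_table=Table(name="staging_orders"),
        target_table=Table(name="orders"),
        source_to_target_columns_map={"id": "id", "amount": "amount"},
        target_conflict_columns=["id"],
        if_conflicts="update",
    )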