astro.databases.mssql

Module Contents

Classes

MssqlDatabase

Base class to represent all the Database interactions.

Functions

wrap_identifier(inp)

Attributes

DEFAULT_CONN_ID

astro.databases.mssql.DEFAULT_CONN_ID
class astro.databases.mssql.MssqlDatabase(conn_id=DEFAULT_CONN_ID, table=None, load_options=None)

Bases: astro.databases.base.BaseDatabase

Base class to represent all the Database interactions.

The goal is to be able to support new databases by adding a new module to the astro/databases directory, without needing to change other modules and classes.

The exception is if the Airflow connection type does not match the new Database module name. In that case, we should update the dictionary CUSTOM_CONN_TYPE_TO_MODULE_PATH available at astro/databases/__init__.py.

Parameters:
  • conn_id (str) –

  • table (BaseTable | None) –

  • load_options (LoadOptions | None) –

property sql_type: str
Return type:

str

property default_metadata: astro.table.Metadata

Fill in default metadata values for table objects addressing MSSQL databases. Currently, the schema is not fetched from the Airflow connection for MSSQL because, in MSSQL, databases and schemas are different concepts, while the MssqlHook only exposes a schema field. Implementation-wise, if the MssqlHook receives a schema during initialization, it uses it as the database in the connection to MSSQL.

Return type:

astro.table.Metadata
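Given the schema/database mismatch described above, the derivation can be sketched as follows. This is a minimal illustration, not the library's code: the `Metadata` stand-in and the `hook_schema` parameter are hypothetical simplifications of `astro.table.Metadata` and the hook's connection attribute.

```python
from dataclasses import dataclass


# Hypothetical stand-in for astro.table.Metadata.
@dataclass
class Metadata:
    schema: str
    database: str


# Assumption: "dbo" is used as SQL Server's conventional default schema.
DEFAULT_SCHEMA = "dbo"


def default_metadata(hook_schema: str) -> Metadata:
    # The MssqlHook's "schema" field is effectively the database name,
    # so it maps to Metadata.database; the schema falls back to a default.
    return Metadata(schema=DEFAULT_SCHEMA, database=hook_schema)
```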

DEFAULT_SCHEMA
hook()

Retrieve Airflow hook to interface with the mssql database.

Return type:

airflow.providers.microsoft.mssql.hooks.mssql.MsSqlHook

table_exists(table)

Check if a table exists in the database.

Parameters:

table (astro.table.BaseTable) – Details of the table whose existence we want to check

Return type:

bool
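On SQL Server, a table-existence check is typically expressed against INFORMATION_SCHEMA. The helper below is illustrative only; the actual method takes a BaseTable object and may use SQLAlchemy's inspector instead of raw SQL.

```python
def table_exists_query(table_name: str, schema: str = "dbo") -> str:
    # Illustrative sketch: build a typical SQL Server existence check
    # against the INFORMATION_SCHEMA catalog views.
    return (
        "SELECT 1 FROM INFORMATION_SCHEMA.TABLES "
        f"WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{table_name}'"
    )
```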

static get_table_qualified_name(table)

Return the table qualified name.

Parameters:

table (astro.table.BaseTable) – The table we want to retrieve the qualified name for.

Return type:

str
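The qualified-name logic can be sketched as below. This is a simplified hypothetical signature taking plain strings; the real static method receives a BaseTable and reads the schema from its metadata.

```python
def get_table_qualified_name(name: str, schema: str = "") -> str:
    # Prefix the table name with its schema when one is set,
    # e.g. ("users", "dbo") -> "dbo.users".
    return f"{schema}.{name}" if schema else name
```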

schema_exists(schema)

Check if a schema exists in the database.

Parameters:

schema – DB schema: a namespace that contains named objects (tables, functions, etc.)

Return type:

bool

is_autocommit_required(sql)

Check if autocommit is required for a query.

Parameters:

sql – SQL query to be checked for the autocommit setting

Return type:

bool
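A sketch of the idea, under the assumption that the relevant cases are statements SQL Server refuses to run inside a transaction, such as CREATE DATABASE and DROP DATABASE; the library's actual criteria may differ.

```python
def is_autocommit_required(sql: str) -> bool:
    # Assumption: CREATE/DROP DATABASE cannot run inside a transaction
    # on SQL Server, so such statements need autocommit enabled.
    normalized = " ".join(sql.upper().split())
    return "CREATE DATABASE" in normalized or "DROP DATABASE" in normalized
```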

run_sql(sql='', parameters=None, **kwargs)

Return the results of running a SQL statement. Whenever possible, this method should be implemented using Airflow Hooks, since this will simplify the integration with Async operators.

Parameters:
  • sql (str | ClauseElement) – Contains SQL query to be run against database

  • parameters (dict | None) – Optional parameters to be used to render the query

Return type:

sqlalchemy.engine.cursor.CursorResult

create_schema_if_needed(schema)

This function checks if the expected schema exists in the database. If the schema does not exist, it will attempt to create it.

Parameters:

schema (str | None) – DB schema: a namespace that contains named objects (tables, functions, etc.)

Return type:

None
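In T-SQL, conditional schema creation usually combines an existence guard with an EXEC wrapper, because CREATE SCHEMA must be the only statement in its batch. A hypothetical sketch of the statement this method might issue:

```python
def create_schema_statement(schema: str) -> str:
    # CREATE SCHEMA must be the only statement in its batch on SQL Server,
    # hence the EXEC('...') wrapper inside the IF NOT EXISTS guard.
    return (
        "IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name = "
        f"'{schema}') EXEC('CREATE SCHEMA {schema}')"
    )
```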

create_table_from_select_statement(statement, target_table, parameters=None)

Export the result rows of a query statement into another table.

Parameters:
  • statement (str) – SQL query statement

  • target_table (astro.table.BaseTable) – Destination table where results will be recorded.

  • parameters (dict | None) – (Optional) parameters to be used to render the SQL query

Return type:

None
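SQL Server has no CREATE TABLE ... AS SELECT; the idiomatic equivalent injects an INTO clause before FROM. The naive string transform below illustrates the shape of the rewrite (it assumes a simple, uppercase query with a single FROM; the real method handles parameters and BaseTable targets):

```python
def select_into_statement(statement: str, target_table: str) -> str:
    # Turn "SELECT <cols> FROM <source>" into
    # "SELECT <cols> INTO <target> FROM <source>".
    # Naive sketch: splits on the first " FROM " only.
    before, _, after = statement.partition(" FROM ")
    return f"{before} INTO {target_table} FROM {after}"
```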

drop_table(table)

Delete a SQL table, if it exists.

Parameters:

table (astro.table.BaseTable) – The table to be deleted.

Return type:

None

fetch_all_rows(table, row_limit=-1)

Fetch all rows for a table and return them as a list. This is needed because some databases have different cursors that require different methods to fetch rows.

Parameters:
  • row_limit (int) – Limit the number of rows returned, by default return all rows.

  • table (astro.table.BaseTable) – The table metadata needed to fetch the rows

Returns:

a list of rows

Return type:

list
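SQL Server caps result sets with TOP rather than LIMIT, and the documented default of -1 means "no cap". A sketch of the query such a method might issue (hypothetical helper, not the library's code):

```python
def fetch_rows_query(table_name: str, row_limit: int = -1) -> str:
    # SQL Server limits rows with TOP rather than LIMIT;
    # a negative row_limit means return all rows.
    if row_limit >= 0:
        return f"SELECT TOP {row_limit} * FROM {table_name}"
    return f"SELECT * FROM {table_name}"
```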

load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.

Parameters:
  • source_dataframe (pandas.DataFrame) – Local or remote filepath

  • target_table (astro.table.BaseTable) – Table in which the file will be loaded

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.

  • chunk_size (int) – Specify the number of rows in each batch to be written at a time.

Return type:

None

merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')

Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.

Parameters:
  • source_table (astro.table.BaseTable) – Contains the rows to be merged to the target_table

  • target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be merged

  • source_to_target_columns_map (dict[str, str]) – Dict of target_table columns names to source_table columns names

target_conflict_columns (list[str]) – List of columns where we expect conflicts while merging

  • if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.

Return type:

None
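On SQL Server, this kind of upsert maps onto a T-SQL MERGE statement. The builder below is a simplified hypothetical sketch taking plain strings (the real method takes BaseTable objects, quotes identifiers, and supports the full set of conflict strategies); it assumes the map's keys are source columns and its values are target columns, and shows only the "update" branch of the conflict handling.

```python
def build_merge_statement(
    source_table: str,
    target_table: str,
    source_to_target_columns_map: dict,
    target_conflict_columns: list,
    if_conflicts: str = "exception",
) -> str:
    # Join condition on the columns where conflicts are expected.
    on_clause = " AND ".join(
        f"target.{col} = source.{col}" for col in target_conflict_columns
    )
    insert_cols = ", ".join(source_to_target_columns_map.values())
    insert_vals = ", ".join(
        f"source.{col}" for col in source_to_target_columns_map
    )
    update_clause = ""
    if if_conflicts == "update":
        # Overwrite matching rows with the source values.
        assignments = ", ".join(
            f"target.{tgt} = source.{src}"
            for src, tgt in source_to_target_columns_map.items()
        )
        update_clause = f" WHEN MATCHED THEN UPDATE SET {assignments}"
    return (
        f"MERGE {target_table} AS target USING {source_table} AS source "
        f"ON {on_clause}{update_clause} "
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) "
        f"VALUES ({insert_vals});"
    )
```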

static get_merge_initialization_query(parameters)

Handle database-specific constraint logic, keeping the calling code database-agnostic.

Parameters:

parameters (tuple) –

Return type:

str

astro.databases.mssql.wrap_identifier(inp)
Parameters:

inp (str) –

Return type:

str
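A plausible sketch of what this helper does, under the assumption that it applies SQL Server's square-bracket identifier quoting; the library's actual implementation may differ.

```python
def wrap_identifier(inp: str) -> str:
    # Assumption: quote an identifier SQL Server style, so that reserved
    # words and special characters are safe, e.g. "order" -> "[order]".
    return f"[{inp}]"
```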