astro.databases.snowflake

Snowflake database implementation.

Module Contents

Functions

wrap_identifier(inp)

is_valid_snow_identifier(name)

Because Snowflake does not allow using Identifier for inserts or updates, we need to make reasonable attempts to

ensure_internal_quotes_closed(name)

ensure_only_valid_characters(name)

Classes

SnowflakeDatabase

Handle interactions with snowflake databases. If this class is successful, we should not have any snowflake-specific

Attributes

DEFAULT_CONN_ID

astro.databases.snowflake.wrap_identifier(inp)
Parameters

inp (str) –

Return type

str

astro.databases.snowflake.is_valid_snow_identifier(name)

Because Snowflake does not allow using Identifier for inserts or updates, we need to make reasonable attempts to ensure that no one can perform a SQL injection using this method. The following method ensures that a string follows the expected identifier syntax https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html

Parameters

name (str) –

Return type

bool

astro.databases.snowflake.ensure_internal_quotes_closed(name)
Parameters

name (str) –

Return type

bool

astro.databases.snowflake.ensure_only_valid_characters(name)
Parameters

name (str) –

Return type

bool

astro.databases.snowflake.DEFAULT_CONN_ID
class astro.databases.snowflake.SnowflakeDatabase(conn_id=DEFAULT_CONN_ID)

Bases: astro.databases.base.BaseDatabase

Handle interactions with snowflake databases. If this class is successful, we should not have any snowflake-specific logic in other parts of our code-base.

Parameters

conn_id (str) –

property hook

Retrieve Airflow hook to interface with the snowflake database.

Return type

airflow.providers.snowflake.hooks.snowflake.SnowflakeHook

property sql_type
Return type

str

property default_metadata

Fill in default metadata values for table objects addressing snowflake databases

Return type

astro.sql.table.Metadata

static get_table_qualified_name(table)

Return table qualified name. In Snowflake, it is the database, schema and table

Parameters

table (astro.sql.table.Table) – The table we want to retrieve the qualified name for.

Return type

str

load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.

Parameters
  • source_dataframe (pandas.DataFrame) – Local or remote filepath

  • target_table (astro.sql.table.Table) – Table in which the file will be loaded

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.

  • chunk_size (int) – Specify the number of rows in each batch to be written at a time.

Return type

None

get_sqlalchemy_template_table_identifier_and_parameter(table, jinja_table_identifier)

During the conversion from a Jinja-templated SQL query to a SQLAlchemy query, there is the need to convert a Jinja table identifier to a safe SQLAlchemy-compatible table identifier.

For Snowflake, the query:

sql_statement = “SELECT * FROM {{input_table}};” parameters = {“input_table”: Table(name=”user_defined_table”, metadata=Metadata(schema=”some_schema”))}

Will become

“SELECT * FROM IDENTIFIER(:input_table);” parameters = {“input_table”: “some_schema.user_defined_table”}

Example of usage:
jinja_table_identifier, jinja_table_parameter_value = get_sqlalchemy_template_table_identifier_and_parameter(

Table(name=”user_defined_table”, metadata=Metadata(schema=”some_schema”), “input_table”

)

assert jinja_table_identifier == “IDENTIFIER(:input_table)” assert jinja_table_parameter_value == “some_schema.user_defined_table”

Since the table value is templated, there is a safety concern (e.g. SQL injection). We recommend looking into the documentation of the database and seeing what are the best practices. This is the Snowflake documentation: https://docs.snowflake.com/en/sql-reference/identifier-literal.html

Parameters
  • table (astro.sql.table.Table) – The table object we want to generate a safe table identifier for

  • jinja_table_identifier (str) – The name used within the Jinja template to represent this table

Returns

value to replace the table identifier in the query and the value that should be used to replace it

Return type

Tuple[str, str]

schema_exists(schema)

Checks if a schema exists in the database

Parameters

schema (str) – DB Schema - a namespace that contains named objects like (tables, functions, etc)

Return type

bool

merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')

Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.

Parameters
  • source_table (astro.sql.table.Table) – Contains the rows to be merged to the target_table

  • target_table (astro.sql.table.Table) – Contains the destination table in which the rows will be merged

  • source_to_target_columns_map (Dict[str, str]) – Dict of target_table columns names to source_table columns names

  • target_conflict_columns (List[str]) – List of cols where we expect to have a conflict while combining

  • if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.

Return type

None

_build_merge_sql(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')

Build the SQL statement for Merge operation

Parameters
  • source_table (astro.sql.table.Table) –

  • target_table (astro.sql.table.Table) –

  • source_to_target_columns_map (Dict[str, str]) –

  • target_conflict_columns (List[str]) –

  • if_conflicts (astro.constants.MergeConflictStrategy) –