astro.databases.snowflake

Snowflake database implementation.

Module Contents

Classes

SnowflakeFileFormat

Dataclass which abstracts properties of a Snowflake File Format.

SnowflakeStage

Dataclass which abstracts properties of a Snowflake Stage.

SnowflakeDatabase

Handle interactions with Snowflake databases.

Functions

wrap_identifier(inp)

is_valid_snow_identifier(name)

Ensures that a string follows the expected Snowflake identifier syntax, to guard against SQL injection.

ensure_internal_quotes_closed(name)

ensure_only_valid_characters(name)

Attributes

DEFAULT_CONN_ID

ASTRO_SDK_TO_SNOWFLAKE_FILE_FORMAT_MAP

DEFAULT_STORAGE_INTEGRATION

NATIVE_LOAD_SUPPORTED_FILE_TYPES

NATIVE_LOAD_SUPPORTED_FILE_LOCATIONS

NATIVE_AUTODETECT_SCHEMA_SUPPORTED_FILE_TYPES

NATIVE_AUTODETECT_SCHEMA_SUPPORTED_FILE_LOCATIONS

COPY_INTO_COMMAND_FAIL_STATUS

astro.databases.snowflake.DEFAULT_CONN_ID
astro.databases.snowflake.ASTRO_SDK_TO_SNOWFLAKE_FILE_FORMAT_MAP
astro.databases.snowflake.DEFAULT_STORAGE_INTEGRATION
astro.databases.snowflake.NATIVE_LOAD_SUPPORTED_FILE_TYPES = ()
astro.databases.snowflake.NATIVE_LOAD_SUPPORTED_FILE_LOCATIONS = ()
astro.databases.snowflake.NATIVE_AUTODETECT_SCHEMA_SUPPORTED_FILE_TYPES
astro.databases.snowflake.NATIVE_AUTODETECT_SCHEMA_SUPPORTED_FILE_LOCATIONS
astro.databases.snowflake.COPY_INTO_COMMAND_FAIL_STATUS = 'LOAD_FAILED'
class astro.databases.snowflake.SnowflakeFileFormat

Dataclass which abstracts properties of a Snowflake File Format.

Snowflake File Formats are used to define the format of files stored in a stage.

Example:

snowflake_file_format = SnowflakeFileFormat(
    name="file_format",
    file_type="PARQUET",
)
name: str = ''
Return type:

str

file_type: str = ''
set_file_type_from_file(file)

Set Snowflake specific file format based on a given file.

Parameters:

file (astro.files.File) – File to use for file type mapping.

Return type:

None
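
For illustration, a minimal usage sketch (the file path and bucket are hypothetical):

from astro.files import File

file_format = SnowflakeFileFormat(name="file_format")
# Infer the Snowflake file type from the file's extension, using
# ASTRO_SDK_TO_SNOWFLAKE_FILE_FORMAT_MAP under the hood.
file_format.set_file_type_from_file(File(path="s3://bucket/prefix/data.parquet"))
print(file_format.file_type)  # e.g. "PARQUET"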

class astro.databases.snowflake.SnowflakeStage

Dataclass which abstracts properties of a Snowflake Stage.

Snowflake Stages are used for loading data from files into tables and unloading data from tables into files.

Example:

snowflake_stage = SnowflakeStage(
    name="stage_name",
    url="gcs://bucket/prefix",
    metadata=Metadata(database="SNOWFLAKE_DATABASE", schema="SNOWFLAKE_SCHEMA"),
)
property qualified_name: str

Return the stage qualified name. In Snowflake, it is composed of the database, schema and stage name.

Returns:

Snowflake stage qualified name (e.g. database.schema.stage_name)

Return type:

str

name: str = ''
Return type:

str

url: str = ''
metadata: astro.table.Metadata
set_url_from_file(file)

Given a file to be loaded into or unloaded from Snowflake, identifies its folder and sets it as self.url.

It is also responsible for adjusting any path specific requirements for Snowflake.

Parameters:

file (astro.files.File) – File to be loaded/unloaded to from Snowflake

Return type:

None
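
For illustration, a minimal usage sketch (the bucket and prefix are hypothetical):

from astro.files import File

stage = SnowflakeStage(name="stage_name")
stage.set_url_from_file(File(path="gcs://bucket/prefix/data.csv"))
print(stage.url)  # the file's enclosing folder, e.g. "gcs://bucket/prefix"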

class astro.databases.snowflake.SnowflakeDatabase(conn_id=DEFAULT_CONN_ID, table=None, load_options=None)

Bases: astro.databases.base.BaseDatabase

Handle interactions with Snowflake databases. If this class is successful, we should not have any Snowflake-specific logic in other parts of our code-base.

property sql_type: str
Return type:

str

property default_metadata: astro.table.Metadata

Fill in default metadata values for table objects addressing snowflake databases

Return type:

astro.table.Metadata

LOAD_OPTIONS_CLASS_NAME = ('SnowflakeLoadOptions',)
NATIVE_LOAD_EXCEPTIONS: Any = ()
DEFAULT_SCHEMA
METADATA_COLUMNS_DATATYPE
hook()

Retrieve the Airflow hook used to interface with the Snowflake database.

Return type:

airflow.providers.snowflake.hooks.snowflake.SnowflakeHook

static get_table_qualified_name(table)

Return the table qualified name. In Snowflake, it is composed of the database, schema and table name.

Parameters:

table (astro.table.BaseTable) – The table we want to retrieve the qualified name for.

Return type:

str
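
For illustration, a hedged sketch of the expected result (the database, schema and table names are hypothetical):

from astro.table import Metadata, Table

table = Table(name="users", metadata=Metadata(database="my_db", schema="my_schema"))
SnowflakeDatabase.get_table_qualified_name(table)  # -> "my_db.my_schema.users"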

create_file_format(file)

Create a new named file format.

Parameters:

file (astro.files.File) – File to use for file format creation.

Return type:

SnowflakeFileFormat

create_stage(file, storage_integration=None, metadata=None)

Creates a new named external stage to use for loading data from files into Snowflake tables and unloading data from tables into files.

At the moment, the following ways of authenticating to the backend are supported:
  • Google Cloud Storage (GCS): using a previously created storage_integration

  • Amazon (S3): one of the following: (i) using storage_integration or (ii) retrieving the AWS_KEY_ID and AWS_SECRET_KEY from the Airflow file connection

Parameters:
  • file (astro.files.File) – File to be copied from/to using stage

  • storage_integration (str | None) – Previously created Snowflake storage integration

  • metadata (astro.table.Metadata | None) – Contains Snowflake database and schema information

Returns:

Stage created

Return type:

SnowflakeStage
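
For illustration, a minimal sketch of the stage lifecycle (the connection ID, bucket, and integration name are hypothetical):

from astro.files import File

db = SnowflakeDatabase(conn_id="snowflake_default")
stage = db.create_stage(
    file=File(path="gcs://bucket/prefix/data.csv"),
    storage_integration="MY_GCS_INTEGRATION",
)
assert db.stage_exists(stage)
db.drop_stage(stage)  # clean up once the load/unload is done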

stage_exists(stage)

Checks if a Snowflake stage exists.

Parameters:

stage (SnowflakeStage) – Stage to check for existence

Returns:

True if the stage exists, False otherwise

Return type:

bool

drop_stage(stage)

Runs the Snowflake query to drop the stage if it exists.

Parameters:

stage (SnowflakeStage) – Stage to be dropped

Return type:

None

is_native_autodetect_schema_available(file)

Check if native auto detection of schema is available.

Parameters:

file (astro.files.File) – File whose type is checked to decide whether native schema auto-detection is available for it.

Return type:

bool

create_table(table, *args, **kwargs)

Override create_table to add metadata columns to the table if specified in load_options

Parameters:

table (astro.table.BaseTable) –

create_table_using_native_schema_autodetection(table, file)

Create a SQL table, automatically inferring the schema using the given file via native database support.

Parameters:
  • table (astro.table.BaseTable) – The table to be created.

  • file (astro.files.File) – File used to infer the new table columns.

Return type:

None

classmethod use_quotes(cols)

With snowflake identifier we have two cases,

  1. When Upper/Mixed case col names are used

    We are required to preserve the text casing of the column names by adding quotes around the identifiers.

  2. When lower case col names are used

    We can use them as is

This is done to be in sync with Snowflake SQLAlchemy dialect. https://docs.snowflake.com/en/user-guide/sqlalchemy.html#object-name-case-handling

Snowflake stores all case-insensitive object names in uppercase text. In contrast, SQLAlchemy considers all lowercase object names to be case-insensitive. Snowflake SQLAlchemy converts the object name case during schema-level communication (i.e. during table and index reflection). If you use uppercase object names, SQLAlchemy assumes they are case-sensitive and encloses the names with quotes. This behavior will cause mismatches against data dictionary data received from Snowflake, so unless identifier names have been truly created as case sensitive using quotes (e.g. “TestDb”), all lowercase names should be used on the SQLAlchemy side.

Parameters:

cols (Sequence[str]) – list of columns

Return type:

bool
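
For illustration, a hedged sketch of the expected behaviour (assuming the casing rules described above):

SnowflakeDatabase.use_quotes(["id", "name"])  # False: all lowercase, used as-is
SnowflakeDatabase.use_quotes(["Id", "NAME"])  # True: quotes needed to preserve casing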

create_table_using_schema_autodetection(table, file=None, dataframe=None, columns_names_capitalization='original')

Create a SQL table, automatically inferring the schema using the given file. Overrides the default behaviour and does not use prep_table, since prep_table does not allow adding quotes.

Parameters:
  • table (astro.table.BaseTable) – The table to be created.

  • file (astro.files.File | None) – File used to infer the new table columns.

  • dataframe (pandas.DataFrame | None) – Dataframe used to infer the new table columns if there is no file

  • columns_names_capitalization (astro.constants.ColumnCapitalization) –

Return type:

None

is_native_load_file_available(source_file, target_table)

Check if there is an optimised path from the source to the destination.

Parameters:
  • source_file (astro.files.File) – File from which we need to transfer data

  • target_table (astro.table.BaseTable) – Table that needs to be populated with file data

Return type:

bool

load_file_to_table_natively(source_file, target_table, if_exists='replace', native_support_kwargs=None, **kwargs)

Load the content of a file to an existing Snowflake table natively by:
  • Creating a Snowflake external stage

  • Using the Snowflake COPY INTO statement

Requirements:
  • The user must have permissions to create a STAGE in Snowflake.

  • If loading from GCP Cloud Storage, native_support_kwargs must define storage_integration.

  • If loading from AWS S3, the credentials for creating the stage may be retrieved from the Airflow connection or from the storage_integration attribute within native_support_kwargs.

Parameters:
  • source_file (astro.files.File) – File from which we need to transfer data

  • target_table (astro.table.BaseTable) – Table to which the content of the file will be loaded to

  • if_exists (astro.constants.LoadExistStrategy) – Strategy used to load (currently supported: “append” or “replace”)

  • native_support_kwargs (dict | None) – may be used for the stage creation, as described above.
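
For illustration, a minimal sketch (the connection ID, bucket, table, and integration names are hypothetical):

from astro.files import File
from astro.table import Table

db = SnowflakeDatabase(conn_id="snowflake_default")
db.load_file_to_table_natively(
    source_file=File(path="s3://bucket/prefix/data.csv"),
    target_table=Table(name="MY_TABLE"),
    if_exists="replace",
    native_support_kwargs={"storage_integration": "MY_S3_INTEGRATION"},
)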

static evaluate_results(rows)

Check the error state returned by Snowflake when running a COPY INTO query.

load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.

Parameters:
  • source_dataframe (pandas.DataFrame) – Dataframe whose contents will be loaded into the table

  • target_table (astro.table.BaseTable) – Table in which the file will be loaded

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.

  • chunk_size (int) – Specify the number of rows in each batch to be written at a time.

Return type:

None
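
For illustration, a minimal sketch (the connection ID and table name are hypothetical):

import pandas as pd

from astro.table import Table

db = SnowflakeDatabase(conn_id="snowflake_default")
df = pd.DataFrame({"id": [1, 2], "name": ["alice", "bob"]})
db.load_pandas_dataframe_to_table(
    source_dataframe=df,
    target_table=Table(name="USERS"),
    if_exists="append",
)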

get_sqlalchemy_template_table_identifier_and_parameter(table, jinja_table_identifier)

During the conversion from a Jinja-templated SQL query to a SQLAlchemy query, there is the need to convert a Jinja table identifier to a safe SQLAlchemy-compatible table identifier.

For Snowflake, the query:

sql_statement = "SELECT * FROM {{input_table}};"
parameters = {"input_table": Table(name="user_defined_table", metadata=Metadata(schema="some_schema"))}

Will become

"SELECT * FROM IDENTIFIER(:input_table);"
parameters = {"input_table": "some_schema.user_defined_table"}

Example of usage:

jinja_table_identifier, jinja_table_parameter_value = get_sqlalchemy_template_table_identifier_and_parameter(
    Table(name="user_defined_table", metadata=Metadata(schema="some_schema")),
    "input_table",
)
assert jinja_table_identifier == "IDENTIFIER(:input_table)"
assert jinja_table_parameter_value == "some_schema.user_defined_table"

Since the table value is templated, there is a safety concern (e.g. SQL injection). We recommend consulting the database's documentation for best practices.

Parameters:
  • table (astro.table.BaseTable) – The table object we want to generate a safe table identifier for

  • jinja_table_identifier (str) – The name used within the Jinja template to represent this table

Returns:

value to replace the table identifier in the query and the value that should be used to replace it

Return type:

tuple[str, str]

schema_exists(schema)

Checks if a schema exists in the database

Parameters:

schema (str) – DB schema - a namespace that contains named objects (tables, functions, etc.)

Return type:

bool

merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')

Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.

Parameters:
  • source_table (astro.table.BaseTable) – Contains the rows to be merged to the target_table

  • target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be merged

  • source_to_target_columns_map (dict[str, str]) – Dict of source_table column names to target_table column names

  • target_conflict_columns (list[str]) – List of cols where we expect to have a conflict while combining

  • if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.

Return type:

None
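
For illustration, a minimal sketch (the connection ID, table, and column names are hypothetical):

from astro.table import Table

db = SnowflakeDatabase(conn_id="snowflake_default")
db.merge_table(
    source_table=Table(name="STAGING_USERS"),
    target_table=Table(name="USERS"),
    source_to_target_columns_map={"id": "id", "email": "email"},
    target_conflict_columns=["id"],
    if_conflicts="update",
)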

append_table(source_table, target_table, source_to_target_columns_map)

Append the source table rows into a destination table.

Overriding the base method since we need to add quotes around the identifiers for Snowflake to preserve the case of the columns: Column(name=col, quote=True).

Parameters:
  • source_table (astro.table.BaseTable) – Contains the rows to be appended to the target_table

  • target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be appended

  • source_to_target_columns_map (dict[str, str]) – Dict of source_table column names to target_table column names

Return type:

None

classmethod get_merge_initialization_query(parameters)

Handles database-specific constraint logic, keeping the rest of the code database-agnostic.

Parameters:

parameters (tuple) –

Return type:

str

openlineage_dataset_name(table)

Returns the open lineage dataset name as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md Example: db_name.schema_name.table_name

Parameters:

table (astro.table.BaseTable) –

Return type:

str

openlineage_dataset_namespace()

Returns the open lineage dataset namespace as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md Example: snowflake://ACCOUNT

Return type:

str

openlineage_dataset_uri(table)

Returns the open lineage dataset uri as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

Parameters:

table (astro.table.BaseTable) –

Return type:

str

truncate_table(table)

Truncate table

astro.databases.snowflake.wrap_identifier(inp)
Parameters:

inp (str) –

Return type:

str

astro.databases.snowflake.is_valid_snow_identifier(name)

Because Snowflake does not allow using Identifier for inserts or updates, we need to make reasonable attempts to ensure that no one can perform a SQL injection using this method. The following method ensures that a string follows the expected identifier syntax.

Parameters:

name (str) –

Return type:

bool
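
For illustration, a hedged sketch of expected results (assuming the identifier rules described above):

is_valid_snow_identifier("my_table")        # True: plain identifier
is_valid_snow_identifier('"Quoted Name"')   # True: quoted identifiers are allowed (assumed)
is_valid_snow_identifier("users; drop")     # False: invalid characters, rejected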

astro.databases.snowflake.ensure_internal_quotes_closed(name)
Parameters:

name (str) –

Return type:

bool

astro.databases.snowflake.ensure_only_valid_characters(name)
Parameters:

name (str) –

Return type:

bool