astro.databases.snowflake
Snowflake database implementation.
Module Contents
Classes
- astro.databases.snowflake.SnowflakeStage – Dataclass which abstracts properties of a Snowflake Stage.
- astro.databases.snowflake.SnowflakeDatabase – Handle interactions with Snowflake databases.
Functions
- astro.databases.snowflake.wrap_identifier
- astro.databases.snowflake.is_valid_snow_identifier – Because Snowflake does not allow using Identifier for inserts or updates, validates that a string follows the expected identifier syntax.
- astro.databases.snowflake.ensure_internal_quotes_closed
- astro.databases.snowflake.ensure_only_valid_characters
Attributes
- astro.databases.snowflake.DEFAULT_CONN_ID
- astro.databases.snowflake.ASTRO_SDK_TO_SNOWFLAKE_FILE_FORMAT_MAP
- astro.databases.snowflake.COPY_OPTIONS
- astro.databases.snowflake.DEFAULT_STORAGE_INTEGRATION
- astro.databases.snowflake.NATIVE_LOAD_SUPPORTED_FILE_TYPES
- astro.databases.snowflake.NATIVE_LOAD_SUPPORTED_FILE_LOCATIONS
- class astro.databases.snowflake.SnowflakeStage
Dataclass which abstracts properties of a Snowflake Stage.
Snowflake Stages are used to load data from files into tables and to unload data from tables into files.
Example:
snowflake_stage = SnowflakeStage(
    name="stage_name",
    url="gcs://bucket/prefix",
    metadata=Metadata(database="SNOWFLAKE_DATABASE", schema="SNOWFLAKE_SCHEMA"),
)
- name :str =
- Return type
str
- url :str =
- metadata :astro.sql.table.Metadata
- set_url_from_file(file)
Given a file to be loaded into or unloaded from Snowflake, identifies its folder and sets it as self.url.
It is also responsible for adjusting any path-specific requirements for Snowflake.
- Parameters
file (astro.files.File) – File to be loaded into or unloaded from Snowflake
- Return type
None
- property qualified_name
Return the stage qualified name. In Snowflake, it is the database, schema and stage name
- Returns
Snowflake stage qualified name (e.g. database.schema.stage_name)
- Return type
str
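The property can be pictured with a minimal stand-in (a sketch in plain Python, not the real class; `StageSketch` is a hypothetical name, and in the real dataclass the database and schema live in a Metadata object):

```python
from dataclasses import dataclass


@dataclass
class StageSketch:
    """Illustrative stand-in for SnowflakeStage (not the SDK class)."""

    name: str = ""
    url: str = ""
    database: str = ""  # held in a Metadata object in the real class
    schema: str = ""    # held in a Metadata object in the real class

    @property
    def qualified_name(self) -> str:
        # Join the non-empty qualifiers with dots: database.schema.name
        parts = [p for p in (self.database, self.schema, self.name) if p]
        return ".".join(parts)


stage = StageSketch(name="stage_name", database="SNOWFLAKE_DATABASE", schema="SNOWFLAKE_SCHEMA")
print(stage.qualified_name)  # SNOWFLAKE_DATABASE.SNOWFLAKE_SCHEMA.stage_name
```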
- class astro.databases.snowflake.SnowflakeDatabase(conn_id=DEFAULT_CONN_ID)
Bases:
astro.databases.base.BaseDatabase
Handle interactions with Snowflake databases. If this class is successful, we should not have any Snowflake-specific logic in other parts of our codebase.
- Parameters
conn_id (str) – Airflow connection ID for the Snowflake database
- NATIVE_LOAD_EXCEPTIONS :Any
- DEFAULT_SCHEMA
- property hook
Retrieve Airflow hook to interface with the snowflake database.
- Return type
airflow.providers.snowflake.hooks.snowflake.SnowflakeHook
- property sql_type
- Return type
str
- property default_metadata
Fill in default metadata values for table objects addressing snowflake databases
- Return type
astro.sql.table.Metadata
- static get_table_qualified_name(table)
Return table qualified name. In Snowflake, it is the database, schema and table
- Parameters
table (astro.sql.table.Table) – The table we want to retrieve the qualified name for.
- Return type
str
- create_stage(file, storage_integration=None, metadata=None)
Creates a new named external stage to use for loading data from files into Snowflake tables and unloading data from tables into files.
At the moment, the following ways of authenticating to the backend are supported:
- Google Cloud Storage (GCS): using a previously created storage_integration
- Amazon S3: one of the following: (i) using storage_integration or (ii) retrieving the AWS_KEY_ID and AWS_SECRET_KEY from the Airflow file connection
- Parameters
file (astro.files.File) – File to be copied from/to using stage
storage_integration (str | None) – Previously created Snowflake storage integration
metadata (Metadata | None) – Contains Snowflake database and schema information
- Returns
Stage created
- Return type
SnowflakeStage
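The DDL issued for stage creation has roughly the following shape (a hand-written sketch, not the exact statement the SDK generates; `render_create_stage` is a hypothetical helper, and file-format and copy-option clauses are omitted):

```python
from typing import Optional


def render_create_stage(qualified_name: str, url: str,
                        storage_integration: Optional[str] = None) -> str:
    """Sketch of the CREATE STAGE DDL; authentication clauses vary by backend."""
    sql = f"CREATE STAGE IF NOT EXISTS {qualified_name} URL='{url}'"
    if storage_integration:
        # GCS requires a storage integration; S3 may use one or key-based auth
        sql += f" STORAGE_INTEGRATION = {storage_integration}"
    return sql


print(render_create_stage("DB.SCHEMA.stage_name", "gcs://bucket/prefix", "my_integration"))
# CREATE STAGE IF NOT EXISTS DB.SCHEMA.stage_name URL='gcs://bucket/prefix' STORAGE_INTEGRATION = my_integration
```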
- stage_exists(stage)
Checks if a Snowflake stage exists.
- Parameters
stage (SnowflakeStage) – Stage to check for
- Returns
True if the stage exists, False otherwise
- Return type
bool
- drop_stage(stage)
Runs the Snowflake query to drop the stage, if it exists.
- Parameters
stage (SnowflakeStage) – Stage to be dropped
- Return type
None
- create_table_using_schema_autodetection(table, file=None, dataframe=None, columns_names_capitalization='lower')
Create a SQL table, automatically inferring the schema using the given file.
- Parameters
table (astro.sql.table.Table) – The table to be created.
file (File | None) – File used to infer the new table columns.
dataframe (pd.DataFrame | None) – Dataframe used to infer the new table columns if there is no file
columns_names_capitalization (astro.constants.ColumnCapitalization) –
- Return type
None
- is_native_load_file_available(source_file, target_table)
Check if there is an optimised path for source to destination.
- Parameters
source_file (astro.files.File) – File from which we need to transfer data
target_table (astro.sql.table.Table) – Table that needs to be populated with file data
- Return type
bool
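The check boils down to membership tests against the module's supported-type and supported-location sets. A sketch in plain Python, where the set contents are assumptions for illustration (the real values live in NATIVE_LOAD_SUPPORTED_FILE_TYPES and NATIVE_LOAD_SUPPORTED_FILE_LOCATIONS):

```python
# Hypothetical values for illustration; the SDK defines the real sets in
# NATIVE_LOAD_SUPPORTED_FILE_TYPES and NATIVE_LOAD_SUPPORTED_FILE_LOCATIONS.
SUPPORTED_FILE_TYPES = {"csv", "ndjson", "parquet"}
SUPPORTED_LOCATIONS = {"gs", "s3"}


def is_native_load_available(file_type: str, location_type: str) -> bool:
    """A native (COPY INTO) path exists only when both the file type
    and the file's location are supported."""
    return file_type in SUPPORTED_FILE_TYPES and location_type in SUPPORTED_LOCATIONS


print(is_native_load_available("csv", "s3"))     # True
print(is_native_load_available("csv", "local"))  # False
```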
- load_file_to_table_natively(source_file, target_table, if_exists='replace', native_support_kwargs=None, **kwargs)
Load the content of a file to an existing Snowflake table natively by:
- Creating a Snowflake external stage
- Using the Snowflake COPY INTO statement
Requirements:
- The user must have permissions to create a STAGE in Snowflake.
- If loading from GCP Cloud Storage, native_support_kwargs must define storage_integration.
- If loading from AWS S3, the credentials for creating the stage may be retrieved from the Airflow connection or from the storage_integration attribute within native_support_kwargs.
- Parameters
source_file (astro.files.File) – File from which we need to transfer data
target_table (astro.sql.table.Table) – Table to which the content of the file will be loaded to
if_exists (astro.constants.LoadExistStrategy) – Strategy used to load (currently supported: “append” or “replace”)
native_support_kwargs (dict | None) – may be used for the stage creation, as described above.
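The second step above can be pictured as the SQL it produces (an illustrative sketch with a hypothetical `render_native_load` helper; the real statement also carries file-format and copy options drawn from ASTRO_SDK_TO_SNOWFLAKE_FILE_FORMAT_MAP and COPY_OPTIONS):

```python
def render_native_load(stage_qualified_name: str, table_qualified_name: str,
                       file_name: str) -> str:
    """Sketch of the COPY INTO statement run once the external stage exists.
    The '@' prefix tells Snowflake the FROM clause refers to a stage."""
    return f"COPY INTO {table_qualified_name} FROM @{stage_qualified_name}/{file_name}"


print(render_native_load("DB.SCHEMA.stage_1", "DB.SCHEMA.users", "users.csv"))
# COPY INTO DB.SCHEMA.users FROM @DB.SCHEMA.stage_1/users.csv
```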
- load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)
Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.
- Parameters
source_dataframe (pandas.DataFrame) – Dataframe whose content will be loaded into the table
target_table (astro.sql.table.Table) – Table into which the dataframe will be loaded
if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.
chunk_size (int) – Specify the number of rows in each batch to be written at a time.
- Return type
None
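The role of chunk_size is to batch the rows written per round trip. The batching itself can be sketched without pandas (the `iter_chunks` helper below is hypothetical, shown only to illustrate the slicing):

```python
from typing import Iterator, List, Tuple


def iter_chunks(rows: List[Tuple], chunk_size: int) -> Iterator[List[Tuple]]:
    """Yield successive batches of at most chunk_size rows, mimicking how a
    dataframe's rows are written to the table in chunks."""
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


rows = [(i, f"name_{i}") for i in range(10)]
batches = list(iter_chunks(rows, chunk_size=4))
print([len(b) for b in batches])  # [4, 4, 2]
```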
- get_sqlalchemy_template_table_identifier_and_parameter(table, jinja_table_identifier)
During the conversion from a Jinja-templated SQL query to a SQLAlchemy query, there is the need to convert a Jinja table identifier to a safe SQLAlchemy-compatible table identifier.
- For Snowflake, the query:
sql_statement = "SELECT * FROM {{input_table}};"
parameters = {"input_table": Table(name="user_defined_table", metadata=Metadata(schema="some_schema"))}
- Will become
"SELECT * FROM IDENTIFIER(:input_table);"
parameters = {"input_table": "some_schema.user_defined_table"}
Example of usage:
jinja_table_identifier, jinja_table_parameter_value = get_sqlalchemy_template_table_identifier_and_parameter(
    Table(name="user_defined_table", metadata=Metadata(schema="some_schema")),
    "input_table",
)
assert jinja_table_identifier == "IDENTIFIER(:input_table)"
assert jinja_table_parameter_value == "some_schema.user_defined_table"
Since the table value is templated, there is a safety concern (e.g. SQL injection). We recommend reviewing the database's documentation for best practices.
- Parameters
table (astro.sql.table.Table) – The table object we want to generate a safe table identifier for
jinja_table_identifier (str) – The name used within the Jinja template to represent this table
- Returns
The identifier to use in the query in place of the Jinja reference, and the parameter value that should be bound to it
- Return type
tuple[str, str]
- schema_exists(schema)
Checks if a schema exists in the database
- Parameters
schema (str) – DB schema – a namespace that contains named objects (tables, functions, etc.)
- Return type
bool
- merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')
Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.
- Parameters
source_table (astro.sql.table.Table) – Contains the rows to be merged to the target_table
target_table (astro.sql.table.Table) – Contains the destination table in which the rows will be merged
source_to_target_columns_map (dict[str, str]) – Dict of target_table columns names to source_table columns names
target_conflict_columns (list[str]) – List of columns where a conflict may occur while merging
if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.
- Return type
None
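The arguments above map onto a Snowflake MERGE statement roughly as follows (a simplified sketch with a hypothetical `render_merge` helper; identifier quoting and the "exception" and "ignore" strategies are not modeled):

```python
from typing import Dict, List


def render_merge(source: str, target: str, col_map: Dict[str, str],
                 conflict_cols: List[str], if_conflicts: str = "exception") -> str:
    """Sketch of a Snowflake MERGE built from merge_table-style arguments.
    col_map maps target column names to source column names."""
    on_clause = " AND ".join(f"{target}.{c} = {source}.{c}" for c in conflict_cols)
    insert_cols = ", ".join(col_map.keys())
    insert_vals = ", ".join(f"{source}.{c}" for c in col_map.values())
    sql = (f"MERGE INTO {target} USING {source} ON {on_clause} "
           f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})")
    if if_conflicts == "update":
        # On conflict, overwrite the target row with the source values
        updates = ", ".join(f"{target}.{t} = {source}.{s}" for t, s in col_map.items())
        sql += f" WHEN MATCHED THEN UPDATE SET {updates}"
    return sql


print(render_merge("src", "tgt", {"id": "id", "name": "name"}, ["id"], "update"))
```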
- astro.databases.snowflake.wrap_identifier(inp)
- Parameters
inp (str) –
- Return type
str
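Judging by its name and the IDENTIFIER(...) example shown for get_sqlalchemy_template_table_identifier_and_parameter, this helper presumably wraps a bind-parameter name in Snowflake's IDENTIFIER() construct. A guessed one-liner (the exact output format is an assumption, not confirmed by the source):

```python
def wrap_identifier(inp: str) -> str:
    # Assumed behaviour: wrap a bind-parameter name in Snowflake's
    # IDENTIFIER() construct so the table name can be passed safely
    # as a query parameter.
    return f"IDENTIFIER(:{inp})"


print(wrap_identifier("input_table"))  # IDENTIFIER(:input_table)
```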
- astro.databases.snowflake.is_valid_snow_identifier(name)
Because Snowflake does not allow using Identifier for inserts or updates, we need to make reasonable attempts to ensure that no one can perform a SQL injection using this method. This method ensures that a string follows the expected identifier syntax.
- Parameters
name (str) –
- Return type
bool
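The rule described above can be approximated with regular expressions (a simplified sketch, not the SDK's exact pattern): an unquoted identifier starts with a letter or underscore and contains only word characters or `$`; anything else must be double-quoted, with internal double quotes doubled.

```python
import re

# Simplified approximation of Snowflake identifier syntax (not the SDK's regex).
_UNQUOTED = re.compile(r"^[A-Za-z_][A-Za-z0-9_$]*$")
_QUOTED = re.compile(r'^"(?:[^"]|"")*"$')  # internal quotes must be doubled


def is_valid_snow_identifier_sketch(name: str) -> bool:
    """Accept the name only if it matches one of the two identifier forms,
    rejecting anything that could smuggle SQL into the query."""
    return bool(_UNQUOTED.match(name) or _QUOTED.match(name))


print(is_valid_snow_identifier_sketch("my_table"))          # True
print(is_valid_snow_identifier_sketch('"weird ""name"""'))  # True
print(is_valid_snow_identifier_sketch("1; DROP TABLE x"))   # False
```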
- astro.databases.snowflake.ensure_internal_quotes_closed(name)
- Parameters
name (str) –
- Return type
bool
- astro.databases.snowflake.ensure_only_valid_characters(name)
- Parameters
name (str) –
- Return type
bool
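The two helpers above carry no docstrings in the source; judging by their names, they plausibly verify that internal double quotes come in closed pairs and that only legal identifier characters appear. A guessed sketch of both checks (the suffixed names and the exact rules are assumptions for illustration):

```python
def ensure_internal_quotes_closed_sketch(name: str) -> bool:
    """Guessed behaviour: inside a quoted identifier, double quotes must
    come in pairs, so the total count of '"' characters must be even."""
    return name.count('"') % 2 == 0


def ensure_only_valid_characters_sketch(name: str) -> bool:
    """Guessed behaviour: an unquoted identifier may only contain letters,
    digits, '_' and '$', and must not start with a digit."""
    return (bool(name) and not name[0].isdigit()
            and all(c.isalnum() or c in "_$" for c in name))


print(ensure_internal_quotes_closed_sketch('"a""b"'))   # True
print(ensure_only_valid_characters_sketch("col$1"))     # True
print(ensure_only_valid_characters_sketch("bad-name"))  # False
```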