astro.databases.snowflake
Snowflake database implementation.
Module Contents
Functions
- wrap_identifier(inp)
- is_valid_snow_identifier(name): Check that a string follows Snowflake's identifier syntax, as a guard against SQL injection.
- ensure_internal_quotes_closed(name)
- ensure_only_valid_characters(name)
Classes
- SnowflakeDatabase: Handle interactions with Snowflake databases.
Attributes
- DEFAULT_CONN_ID
- astro.databases.snowflake.wrap_identifier(inp)
- Parameters
inp (str) –
- Return type
str
- astro.databases.snowflake.is_valid_snow_identifier(name)
Because Snowflake does not allow using IDENTIFIER for inserts or updates, we need to make a reasonable attempt to prevent SQL injection through table names. This function checks that a string follows the expected identifier syntax: https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html
- Parameters
name (str) –
- Return type
bool
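As an illustration of the kind of check described above, here is a minimal sketch that follows the rules in the linked Snowflake identifier documentation. The name `looks_like_snowflake_identifier` is hypothetical, and this is not the astro implementation. It validates a single identifier, not a dotted qualified name.

```python
import re

# Unquoted identifiers start with a letter or underscore and continue with
# letters, digits, underscores, or dollar signs.
_UNQUOTED = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def looks_like_snowflake_identifier(name: str) -> bool:
    """Hypothetical check; not the function documented above."""
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        inner = name[1:-1]
        # Inside a quoted identifier, a literal double quote must be doubled,
        # so no single quote may remain once the doubled pairs are removed.
        return bool(inner) and '"' not in inner.replace('""', "")
    return _UNQUOTED.fullmatch(name) is not None
```

A string such as `users; DROP TABLE users` fails the unquoted pattern, and `"bad"quote"` fails because of the unescaped inner quote.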
- astro.databases.snowflake.ensure_internal_quotes_closed(name)
- Parameters
name (str) –
- Return type
bool
- astro.databases.snowflake.ensure_only_valid_characters(name)
- Parameters
name (str) –
- Return type
bool
- astro.databases.snowflake.DEFAULT_CONN_ID
- class astro.databases.snowflake.SnowflakeDatabase(conn_id=DEFAULT_CONN_ID)
Bases: astro.databases.base.BaseDatabase
Handle interactions with Snowflake databases. If this class does its job, no Snowflake-specific logic should be needed in other parts of the code base.
- Parameters
conn_id (str) –
- property hook
Retrieve the Airflow hook used to interface with the Snowflake database.
- Return type
airflow.providers.snowflake.hooks.snowflake.SnowflakeHook
- property sql_type
- Return type
str
- property default_metadata
Fill in default metadata values for table objects that address Snowflake databases.
- Return type
- static get_table_qualified_name(table)
Return the qualified name of the table. In Snowflake, this is the database, schema, and table name.
- Parameters
table (astro.sql.table.Table) – The table we want to retrieve the qualified name for.
- Return type
str
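A minimal sketch of how such a qualified name could be assembled. The `Table` and `Metadata` dataclasses below are simplified, hypothetical stand-ins for the astro classes, and skipping unset parts is an assumption rather than documented behavior.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Metadata:  # hypothetical stand-in for astro's metadata object
    database: Optional[str] = None
    schema: Optional[str] = None


@dataclass
class Table:  # hypothetical stand-in for astro.sql.table.Table
    name: str
    metadata: Metadata = field(default_factory=Metadata)


def qualified_name(table: Table) -> str:
    """Join whichever of database, schema, and table name are set."""
    parts = [table.metadata.database, table.metadata.schema, table.name]
    # Assumption: parts that are unset are simply left out.
    return ".".join(part for part in parts if part)
```

With only a schema set, `qualified_name(Table("user_defined_table", Metadata(schema="some_schema")))` gives `some_schema.user_defined_table`.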
- load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)
Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.
- Parameters
source_dataframe (pandas.DataFrame) – Dataframe whose contents will be loaded into the table
target_table (astro.sql.table.Table) – Table into which the dataframe will be loaded
if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.
chunk_size (int) – Specify the number of rows in each batch to be written at a time.
- Return type
None
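The interplay of if_exists and chunk_size can be sketched without a Snowflake connection. The example below uses the standard-library sqlite3 module as a stand-in database and plain tuples instead of a dataframe. The function `load_rows` is hypothetical, and it assumes the strategy values are "replace" and "append", as the description above suggests.

```python
import sqlite3
from typing import Sequence, Tuple


def load_rows(
    conn: sqlite3.Connection,
    table: str,
    columns: Sequence[str],
    rows: Sequence[Tuple],
    if_exists: str = "replace",
    chunk_size: int = 2,
) -> None:
    """Hypothetical loader; sqlite3 stands in for Snowflake."""
    if if_exists not in ("replace", "append"):
        raise ValueError(f"unsupported if_exists value: {if_exists!r}")
    if if_exists == "replace":
        # "replace" discards any existing table before loading.
        conn.execute(f'DROP TABLE IF EXISTS "{table}"')
    cols = ", ".join(f'"{c}"' for c in columns)
    conn.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({cols})')
    placeholders = ", ".join("?" for _ in columns)
    insert = f'INSERT INTO "{table}" ({cols}) VALUES ({placeholders})'
    # Write the rows in batches of at most chunk_size.
    for start in range(0, len(rows), chunk_size):
        conn.executemany(insert, rows[start:start + chunk_size])
    conn.commit()
```

Calling it a second time with `if_exists="append"` adds rows to the existing table, while `if_exists="replace"` starts from an empty table.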
- get_sqlalchemy_template_table_identifier_and_parameter(table, jinja_table_identifier)
During the conversion from a Jinja-templated SQL query to a SQLAlchemy query, each Jinja table identifier must be converted to a safe, SQLAlchemy-compatible table identifier.
- For Snowflake, the query:
    sql_statement = "SELECT * FROM {{input_table}};"
    parameters = {"input_table": Table(name="user_defined_table", metadata=Metadata(schema="some_schema"))}
- Will become:
    "SELECT * FROM IDENTIFIER(:input_table);"
    parameters = {"input_table": "some_schema.user_defined_table"}
- Example of usage:
    jinja_table_identifier, jinja_table_parameter_value = get_sqlalchemy_template_table_identifier_and_parameter(
        Table(name="user_defined_table", metadata=Metadata(schema="some_schema")), "input_table"
    )
    assert jinja_table_identifier == "IDENTIFIER(:input_table)"
    assert jinja_table_parameter_value == "some_schema.user_defined_table"
Since the table value is templated, there is a safety concern (e.g. SQL injection). Consult the database's documentation for its recommended practices. For Snowflake, see: https://docs.snowflake.com/en/sql-reference/identifier-literal.html
- Parameters
table (astro.sql.table.Table) – The table object we want to generate a safe table identifier for
jinja_table_identifier (str) – The name used within the Jinja template to represent this table
- Returns
The expression that replaces the table identifier in the query, and the parameter value to bind to it
- Return type
Tuple[str, str]
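The query-level rewrite described for this method can be sketched with a regular expression. The helper `to_identifier_query` is hypothetical and takes already-qualified table names as plain strings. It only illustrates the substitution and is not how astro performs the conversion.

```python
import re
from typing import Dict, Tuple

# Matches {{ name }} placeholders, with optional inner whitespace.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def to_identifier_query(
    sql: str, tables: Dict[str, str]
) -> Tuple[str, Dict[str, str]]:
    """Hypothetical helper: swap {{name}} for IDENTIFIER(:name)."""
    parameters: Dict[str, str] = {}

    def replace(match: re.Match) -> str:
        name = match.group(1)
        if name not in tables:
            return match.group(0)  # leave unknown placeholders untouched
        # The table name travels as a bound parameter, not as SQL text.
        parameters[name] = tables[name]
        return f"IDENTIFIER(:{name})"

    return _PLACEHOLDER.sub(replace, sql), parameters
```

Passing the table name as a bound parameter keeps user-supplied text out of the SQL string itself, which is the point of the IDENTIFIER form.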
- schema_exists(schema)
Checks if a schema exists in the database
- Parameters
schema (str) – Database schema: a namespace that contains named objects such as tables and functions
- Return type
bool
- merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')
Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.
- Parameters
source_table (astro.sql.table.Table) – Contains the rows to be merged to the target_table
target_table (astro.sql.table.Table) – Contains the destination table in which the rows will be merged
source_to_target_columns_map (Dict[str, str]) – Dict mapping source_table column names to target_table column names
target_conflict_columns (List[str]) – List of columns on which conflicts are expected when merging
if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.
- Return type
None
- _build_merge_sql(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')
Build the SQL statement for the merge operation.
- Parameters
source_table (astro.sql.table.Table) –
target_table (astro.sql.table.Table) –
source_to_target_columns_map (Dict[str, str]) –
target_conflict_columns (List[str]) –
if_conflicts (astro.constants.MergeConflictStrategy) –
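To make the role of each argument concrete, here is a rough sketch of a Snowflake-style MERGE statement builder. The function `build_merge_sql` is hypothetical, and the strategy names "update" and "ignore" are assumptions; the default 'exception' strategy is not covered because the source does not describe how it is enforced. The sketch also assumes the map keys are source columns and the values are target columns.

```python
from typing import Dict, List


def build_merge_sql(
    source: str,
    target: str,
    source_to_target_columns_map: Dict[str, str],
    target_conflict_columns: List[str],
    if_conflicts: str = "update",
) -> str:
    """Hypothetical builder; identifiers must be validated by the caller."""
    if if_conflicts not in ("update", "ignore"):
        raise NotImplementedError(f"strategy not sketched: {if_conflicts!r}")
    # Reverse lookup so each target conflict column finds its source column.
    target_to_source = {t: s for s, t in source_to_target_columns_map.items()}
    on_clause = " AND ".join(
        f"{target}.{col} = {source}.{target_to_source.get(col, col)}"
        for col in target_conflict_columns
    )
    statement = f"MERGE INTO {target} USING {source} ON {on_clause}"
    if if_conflicts == "update":
        assignments = ", ".join(
            f"{target}.{t} = {source}.{s}"
            for s, t in source_to_target_columns_map.items()
        )
        statement += f" WHEN MATCHED THEN UPDATE SET {assignments}"
    insert_cols = ", ".join(source_to_target_columns_map.values())
    insert_vals = ", ".join(
        f"{source}.{s}" for s in source_to_target_columns_map
    )
    statement += (
        f" WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )
    return statement
```

With "ignore", the WHEN MATCHED clause is omitted, so rows that already exist in the target are left unchanged and only new rows are inserted.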