Configuration

Configuring the database default schema

If users don’t define a specific Table (metadata) schema, the Astro SDK will fall back to the global default schema configuration.

There are two options to define the default schema:

1. At a global level, for all databases
2. At a database level, for each specific database

If the database-specific configuration is not set, the Astro SDK uses the global default schema, which falls back to tmp_astro if undefined. Example using an environment variable:

AIRFLOW__ASTRO_SDK__SQL_SCHEMA="tmp"

or by updating Airflow’s configuration

[astro_sdk]
schema = "tmp"

We can also configure the default schema per database type (for example, Snowflake, BigQuery, or Postgres). If both the global default and a database-specific schema are defined, the database-specific value takes precedence.

AIRFLOW__ASTRO_SDK__POSTGRES_DEFAULT_SCHEMA = "postgres_tmp"
AIRFLOW__ASTRO_SDK__BIGQUERY_DEFAULT_SCHEMA = "bigquery_tmp"
AIRFLOW__ASTRO_SDK__SNOWFLAKE_DEFAULT_SCHEMA = "snowflake_tmp"
AIRFLOW__ASTRO_SDK__REDSHIFT_DEFAULT_SCHEMA = "redshift_tmp"
AIRFLOW__ASTRO_SDK__MSSQL_DEFAULT_SCHEMA = "mssql_tmp"

or by updating Airflow’s configuration

[astro_sdk]
postgres_default_schema = "postgres_tmp"
bigquery_default_schema = "bigquery_tmp"
snowflake_default_schema = "snowflake_tmp"
redshift_default_schema = "redshift_tmp"
mssql_default_schema = "mssql_tmp"
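
Putting this together, here is a minimal sketch of how these defaults apply when declaring tables, assuming the Table and Metadata classes from astro.table and a hypothetical postgres_conn connection:

from astro.table import Metadata, Table

# No schema given: the SDK falls back to postgres_default_schema,
# or to the global sql_schema ("tmp_astro" if neither is set).
staging_table = Table(name="staging_data", conn_id="postgres_conn")

# Explicit schema given: the configured defaults are ignored.
reporting_table = Table(
    name="daily_report",
    conn_id="postgres_conn",
    metadata=Metadata(schema="reporting"),
)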

Configuring whether schema existence should be checked and whether the SDK should create schemas

By default, during aql.load_file and aql.transform, the SDK checks whether the schema of the target table exists and, if not, tries to create it. This check can be costly.

The configuration AIRFLOW__ASTRO_SDK__ASSUME_SCHEMA_EXISTS allows users to inform the SDK that the schema already exists, skipping this check for all load_file and transform tasks.

Users can also have more granular control by setting the load_file argument assume_schema_exists on a per-task basis, as in the sketch below.
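
A minimal sketch of such a per-task override, assuming a hypothetical S3 file and postgres_conn connection (File and Table come from astro.files and astro.table):

from astro import sql as aql
from astro.files import File
from astro.table import Table

load = aql.load_file(
    input_file=File("s3://my-bucket/data.csv"),  # hypothetical path
    output_table=Table(name="my_table", conn_id="postgres_conn"),
    assume_schema_exists=True,  # skip the schema existence check for this task only
)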

Example of how to disable the schema existence check using environment variables:

AIRFLOW__ASTRO_SDK__ASSUME_SCHEMA_EXISTS = True

Or using Airflow’s configuration file:

[astro_sdk]
assume_schema_exists = True

Configuring the unsafe dataframe storage

The dataframes generated by the dataframe and transform operators are pickled and stored in the XCom table in Airflow’s metadata database. Because these dataframes are user-defined and can be huge, they can potentially break Airflow’s metadata DB by using all the available resources. Hence, unsafe dataframe storage should be set to True only once you are aware of this risk and are OK with it. Alternatively, you can use a custom XCom backend to store the XCom data.
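
For context, here is a sketch of the kind of task whose return value ends up in XCom, using the aql.dataframe decorator (the function body and names are illustrative):

import pandas as pd
from astro import sql as aql

@aql.dataframe
def top_five_rows(input_df: pd.DataFrame) -> pd.DataFrame:
    # Without an output_table, the returned dataframe is serialized into XCom.
    return input_df.head(5)

To acknowledge the risk and allow such dataframes to be pickled into the metadata DB: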

AIRFLOW__ASTRO_SDK__DATAFRAME_ALLOW_UNSAFE_STORAGE = True

or by updating Airflow’s configuration

[astro_sdk]
dataframe_allow_unsafe_storage = True

Configuring the storage integration for Snowflake

A storage integration is a Snowflake object that stores a generated identity and access management (IAM) entity for your external cloud storage, along with an optional set of allowed or blocked storage locations (Amazon S3, Google Cloud Storage, or Microsoft Azure). Cloud provider administrators in your organization grant permissions on the storage locations to the generated entity. This option allows users to avoid supplying credentials when creating stages or when loading or unloading data.

Read more at: Snowflake storage integrations

AIRFLOW__ASTRO_SDK__SNOWFLAKE_STORAGE_INTEGRATION_AMAZON = "aws_integration"
AIRFLOW__ASTRO_SDK__SNOWFLAKE_STORAGE_INTEGRATION_GOOGLE = "gcp_integration"

or by updating Airflow’s configuration

[astro_sdk]
snowflake_storage_integration_amazon = "aws_integration"
snowflake_storage_integration_google = "gcp_integration"

Configuring the table autodetect row count

The following configuration indicates how many file rows should be loaded to infer the table column types. It defaults to 1000 rows.

AIRFLOW__ASTRO_SDK__LOAD_TABLE_AUTODETECT_ROWS_COUNT = 1000

or by updating Airflow’s configuration

[astro_sdk]
load_table_autodetect_rows_count = 1000

Configuring the RAW SQL maximum response size

This configuration reduces the size of responses returned by aql.run_raw_sql, to avoid overwhelming the Airflow metadata DB when the default BaseXCom backend is used.
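
For reference, a sketch of a raw SQL task whose handler result is pushed to XCom (the query and names are illustrative; handler is the run_raw_sql argument that processes the database result):

from astro import sql as aql
from astro.table import Table

@aql.run_raw_sql(handler=lambda result: result.fetchall())
def get_rows(input_table: Table):
    # The rows returned by the handler are pushed to XCom,
    # so their size is subject to this limit.
    return "SELECT * FROM {{ input_table }}"

The cap can be set via an environment variable: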

AIRFLOW__ASTRO_SDK__RUN_RAW_SQL_RESPONSE_SIZE = 1

or by updating Airflow’s configuration

[astro_sdk]
run_raw_sql_response_size = 1

Configuring the Dataset inlets/outlets

The Astro SDK automatically adds inlets and outlets for all operators when Datasets are supported (Airflow >= 2.4).

While users can override this at the task level by specifying inlets and outlets explicitly, doing so can be inconvenient for users who do not want to leverage data-aware scheduling. Such users can set the following config to False to disable the automatic addition of inlets and outlets.

AIRFLOW__ASTRO_SDK__AUTO_ADD_INLETS_OUTLETS = False

or by updating Airflow’s configuration

[astro_sdk]
auto_add_inlets_outlets = False

Configuring whether to emit temp table events in OpenLineage

The Astro SDK has the ability to create temporary tables (see: Tables).

By default, the SDK emits an event for temporary tables in OpenLineage.

This might not be useful for users who do not want to emit such events in OpenLineage. Such users can set the following config to False to disable it.

AIRFLOW__ASTRO_SDK__OPENLINEAGE_EMIT_TEMP_TABLE_EVENT = False

or by updating Airflow’s configuration

[astro_sdk]
openlineage_emit_temp_table_event = False

Configuring the native fallback mechanism

The LoadFileOperator has a fallback mechanism when loading data to the database from file storage as explained in How load_file Works.

This fallback can be configured at the task level using the enable_native_fallback param, as in the sketch below.
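
A minimal sketch of the per-task override, with hypothetical file path and connection names:

from astro import sql as aql
from astro.files import File
from astro.table import Table

load = aql.load_file(
    input_file=File("s3://my-bucket/data.csv"),
    output_table=Table(name="my_table", conn_id="snowflake_conn"),
    enable_native_fallback=True,  # fall back to the pandas path if the native load fails
)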

Users can also control this setting and override the default at a global level (for all tasks) with the following config: set it to True to allow falling back to the “pandas” path, or to False to disable the fallback.

AIRFLOW__ASTRO_SDK__LOAD_FILE_ENABLE_NATIVE_FALLBACK = False

or by updating Airflow’s configuration

[astro_sdk]
load_file_enable_native_fallback = False

Configuring the max memory limit for a dataframe to be stored in the XCom table

If you are using Astro SDK with Airflow >= 2.5, you no longer need to use pickling or a Custom XCom backend to store Astro SDK’s dataset class or dataframes. Airflow will take care of serializing and deserializing them if you have set the following:

AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES = airflow\.* astro\.*

or by updating airflow.cfg

[core]
allowed_deserialization_classes = airflow\.* astro\.*

The dataframes (generated by dataframe, transform, and other functions/operators where you don’t pass output_table) are stored in the XCom table if you are not using a custom XCom backend.

Since these dataframes are defined by the user and can be huge, they might potentially break Airflow’s metadata DB by using all the available resources.

Hence, the SDK limits the amount of data (in KB) stored in that table. This is controlled by the following setting:

AIRFLOW__ASTRO_SDK__MAX_DATAFRAME_MEM_FOR_XCOM_DB = 100

or by updating airflow.cfg

[astro_sdk]
max_dataframe_mem_for_xcom_db = 100

The value is expressed in KB; the default limit is 100 KB. If a dataframe is smaller than that, it is stored in the XCom table. If it is larger, it is stored in an object store defined by xcom_storage_conn_id and xcom_storage_url, as shown below:

[astro_sdk]
xcom_storage_conn_id = gcp_conn_id
xcom_storage_url = gs://astro_sdk/temp
max_dataframe_mem_for_xcom_db = 100

or

AIRFLOW__ASTRO_SDK__XCOM_STORAGE_CONN_ID = gcp_conn_id
AIRFLOW__ASTRO_SDK__XCOM_STORAGE_URL = gs://astro_sdk/temp
AIRFLOW__ASTRO_SDK__MAX_DATAFRAME_MEM_FOR_XCOM_DB = 100

If all of Airflow’s components are on a single machine, the xcom_storage_url defaults to the temp directory on the host, and you can skip setting xcom_storage_conn_id.