Configuration
Configuring the database default schema
If users don’t define a specific Table (metadata) schema, the Astro SDK falls back to the default schema configuration.
There are two options to define the default schema:
1. At a global level, for all databases
2. At a database level, for each specific database
If the user does not set a database-specific configuration, the Astro SDK uses the global default schema (which has the value tmp_astro if undefined). Example:
Using an environment variable:
AIRFLOW__ASTRO_SDK__SQL_SCHEMA="tmp"
or by updating Airflow’s configuration
[astro_sdk]
schema = "tmp"
The default schema can also be configured per database type (for example, Snowflake, BigQuery, or Postgres). If both the global default and a database-specific schema are defined, the database-specific value takes precedence.
AIRFLOW__ASTRO_SDK__POSTGRES_DEFAULT_SCHEMA = "postgres_tmp"
AIRFLOW__ASTRO_SDK__BIGQUERY_DEFAULT_SCHEMA = "bigquery_tmp"
AIRFLOW__ASTRO_SDK__SNOWFLAKE_DEFAULT_SCHEMA = "snowflake_tmp"
AIRFLOW__ASTRO_SDK__REDSHIFT_DEFAULT_SCHEMA = "redshift_tmp"
AIRFLOW__ASTRO_SDK__MSSQL_DEFAULT_SCHEMA = "mssql_tmp"
or by updating Airflow’s configuration
[astro_sdk]
postgres_default_schema = "postgres_tmp"
bigquery_default_schema = "bigquery_tmp"
snowflake_default_schema = "snowflake_tmp"
redshift_default_schema = "redshift_tmp"
mssql_default_schema = "mssql_tmp"
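As an illustration, here is a minimal sketch of a table declared without an explicit schema; the connection ID and file path are hypothetical. With the settings above, the table lands in the database-specific default schema (for example, postgres_tmp on Postgres) rather than in tmp_astro:
from astro import sql as aql
from astro.files import File
from astro.table import Table

# No Metadata/schema is passed, so the SDK falls back to the configured default schema.
orders = aql.load_file(
    input_file=File("s3://my-bucket/orders.csv"),                    # hypothetical path
    output_table=Table(name="orders", conn_id="postgres_default"),   # hypothetical conn_id
)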
Configuring whether schema existence should be checked and whether the SDK should create schemas
By default, during aql.load_file and aql.transform, the SDK checks if the schema of the target table exists, and if not, it tries to create it. This type of check can be costly.
The configuration AIRFLOW__ASTRO_SDK__ASSUME_SCHEMA_EXISTS allows users to inform the SDK that the schema already exists, skipping this check for all load_file and transform tasks.
Users can also have more granular control by setting the load_file argument assume_schema_exists on a per-task basis (see load_file).
Example of how to disable the schema existence check using environment variables:
AIRFLOW__ASTRO_SDK__ASSUME_SCHEMA_EXISTS = True
Or using Airflow’s configuration file:
[astro_sdk]
assume_schema_exists = True
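For reference, a minimal sketch of the per-task override (the connection ID and path are hypothetical); this single load skips the schema existence check regardless of the global setting:
from astro import sql as aql
from astro.files import File
from astro.table import Table

load = aql.load_file(
    input_file=File("s3://my-bucket/orders.csv"),                    # hypothetical path
    output_table=Table(name="orders", conn_id="postgres_default"),   # hypothetical conn_id
    assume_schema_exists=True,  # skip the schema existence check for this task only
)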
Configuring the unsafe dataframe storage
The dataframes (generated by the dataframe or transform operators) are stored in the XCom table of Airflow’s metadata database using pickling. Since these dataframes are defined by the user and can be huge, they might break Airflow’s metadata DB by using all the available resources. Hence, unsafe dataframe storage should only be set to True once you are aware of this risk and are OK with it. Alternatively, you could use a custom XCom backend to store the XCom data.
AIRFLOW__ASTRO_SDK__DATAFRAME_ALLOW_UNSAFE_STORAGE = True
or by updating Airflow’s configuration
[astro_sdk]
dataframe_allow_unsafe_storage = True
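To illustrate what this setting affects, here is a minimal sketch of a dataframe-returning task; without a custom XCom backend, its return value is pickled into Airflow’s metadata DB, which is only allowed once the flag above is enabled (the column name is hypothetical):
import pandas as pd
from astro import sql as aql

@aql.dataframe
def top_orders(df: pd.DataFrame) -> pd.DataFrame:
    # The returned DataFrame is pushed to XCom; with the default XCom backend
    # this requires dataframe_allow_unsafe_storage = True.
    return df.sort_values("amount", ascending=False).head(10)  # hypothetical column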
Configuring the storage integration for Snowflake
A storage integration is a Snowflake object that stores a generated identity and access management (IAM) entity for your external cloud storage, along with an optional set of allowed or blocked storage locations (Amazon S3, Google Cloud Storage, or Microsoft Azure). Cloud provider administrators in your organization grant permissions on the storage locations to the generated entity. This option allows users to avoid supplying credentials when creating stages or when loading or unloading data.
Read more at: Snowflake storage integrations
AIRFLOW__ASTRO_SDK__SNOWFLAKE_STORAGE_INTEGRATION_AMAZON = "aws_integration"
AIRFLOW__ASTRO_SDK__SNOWFLAKE_STORAGE_INTEGRATION_GOOGLE = "gcp_integration"
or by updating Airflow’s configuration
[astro_sdk]
snowflake_storage_integration_amazon = "aws_integration"
snowflake_storage_integration_google = "gcp_integration"
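As a sketch of how this is used (connection IDs and the path are hypothetical), a load from S3 into Snowflake can then create its stage through the configured storage integration instead of passing cloud credentials explicitly:
from astro import sql as aql
from astro.files import File
from astro.table import Table

# The Snowflake stage created for this load relies on the storage integration configured above.
load_to_snowflake = aql.load_file(
    input_file=File("s3://my-bucket/orders.csv", conn_id="aws_default"),  # hypothetical conn_id and path
    output_table=Table(name="orders", conn_id="snowflake_default"),       # hypothetical conn_id
)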
Configuring the table autodetect row count
The following configuration indicates how many file rows should be loaded to infer the table column types. This defaults to 1000 rows.
AIRFLOW__ASTRO_SDK__LOAD_TABLE_AUTODETECT_ROWS_COUNT = 1000
or by updating Airflow’s configuration
[astro_sdk]
load_table_autodetect_rows_count = 1000
Configuring the RAW SQL maximum response size
This setting limits the size of responses returned by aql.run_raw_sql to avoid overloading the Airflow metadata DB when the default BaseXCom backend is used.
AIRFLOW__ASTRO_SDK__RUN_RAW_SQL_RESPONSE_SIZE = 1
or by updating Airflow’s configuration
[astro_sdk]
run_raw_sql_response_size = 1
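For context, a minimal sketch of a run_raw_sql task whose handler result is pushed to XCom (the connection ID and table are hypothetical); the setting above caps how large that stored response can be:
from astro import sql as aql

@aql.run_raw_sql(conn_id="postgres_default", handler=lambda result: result.fetchall())
def sample_orders():
    # The handler's return value goes to XCom and is subject to the size limit above.
    return "SELECT * FROM orders LIMIT 5"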
Configuring the Dataset inlets/outlets
The Astro SDK automatically adds inlets and outlets for all operators if Datasets are supported (Airflow >= 2.4). While users can override this at the task level by adding inlets and outlets, it might be inconvenient for users who do not want to leverage data-aware scheduling. Such users can set the following config to False to disable the automatic addition of inlets and outlets.
AIRFLOW__ASTRO_SDK__AUTO_ADD_INLETS_OUTLETS = True
or by updating Airflow’s configuration
[astro_sdk]
auto_add_inlets_outlets = True
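As a sketch of the task-level override mentioned above (the Dataset URI and connection are hypothetical, and whether the decorator forwards outlets may depend on the SDK version), explicit outlets replace the auto-added ones for a single task:
from airflow.datasets import Dataset
from astro import sql as aql
from astro.table import Table

@aql.transform(outlets=[Dataset("postgres://warehouse/public/orders_summary")])  # hypothetical URI
def summarize(orders: Table):
    # Explicit outlets on this task take the place of the auto-added ones.
    return "SELECT status, COUNT(*) AS total FROM {{ orders }} GROUP BY status"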
Configuring whether to emit temp table events in OpenLineage
The Astro SDK has the ability to create temporary tables (see Tables). By default, it emits temporary table events in OpenLineage. This might not be useful for users who do not want to emit such events in OpenLineage. Such users can set the following config to False to disable it.
AIRFLOW__ASTRO_SDK__OPENLINEAGE_EMIT_TEMP_TABLE_EVENT = True
or by updating Airflow’s configuration
[astro_sdk]
openlineage_emit_temp_table_event = True
Configuring the native fallback mechanism
The LoadFileOperator has a fallback mechanism when loading data into the database from file storage, as explained in How load_file Works. This fallback can be configured at the task level using the enable_native_fallback param. Users can also control this setting and override the default at a global level (for all tasks) by setting the following config. Set it to True to allow falling back to the “pandas” path.
AIRFLOW__ASTRO_SDK__LOAD_FILE_ENABLE_NATIVE_FALLBACK = False
or by updating Airflow’s configuration
[astro_sdk]
load_file_enable_native_fallback = False
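For reference, a minimal sketch of the task-level counterpart (the connection ID and path are hypothetical): this load fails instead of falling back to the pandas path, regardless of the global default:
from astro import sql as aql
from astro.files import File
from astro.table import Table

strict_native_load = aql.load_file(
    input_file=File("gs://my-bucket/events.parquet"),               # hypothetical path
    output_table=Table(name="events", conn_id="bigquery_default"),  # hypothetical conn_id
    enable_native_fallback=False,  # do not fall back to pandas if the native load fails
)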
Configuring the max memory limit for a dataframe to be stored in the XCom table
If you are using Astro SDK with Airflow >= 2.5, you no longer need to use pickling or a Custom XCom backend to store Astro SDK’s dataset class or dataframes. Airflow will take care of serializing and deserializing them if you have set the following:
AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES = airflow\.* astro\.*
or by updating airflow.cfg
[core]
allowed_deserialization_classes = airflow\.* astro\.*
The dataframes (generated by dataframe, transform and other functions/operators where you don’t pass output_table) are stored in the XCom table if you are not using a custom XCom backend. Since these dataframes are defined by the user and can be huge, they might break Airflow’s metadata DB by using all the available resources. Hence, the SDK limits the amount of data (in KB) stored in that table. This is controlled by the following setting:
AIRFLOW__ASTRO_SDK__MAX_DATAFRAME_MEM_FOR_XCOM_DB = 100
or by updating airflow.cfg
[astro_sdk]
max_dataframe_mem_for_xcom_db = 100
The value is expressed in kilobytes; the default limit is 100 KB. If a dataframe is smaller than that, it is stored in the XCom table. If it is larger, it is stored in an object store defined by the xcom_storage_conn_id and xcom_storage_url as shown below:
[astro_sdk]
xcom_storage_conn_id = gcp_conn_id
xcom_storage_url = gs://astro_sdk/temp
max_dataframe_mem_for_xcom_db = 100
or
AIRFLOW__ASTRO_SDK__XCOM_STORAGE_CONN_ID = gcp_conn_id
AIRFLOW__ASTRO_SDK__XCOM_STORAGE_URL = gs://astro_sdk/temp
AIRFLOW__ASTRO_SDK__MAX_DATAFRAME_MEM_FOR_XCOM_DB = 100
If all of Airflow’s components are on a single machine, the xcom_storage_url defaults to the temp directory on the host, and you can omit passing the xcom_storage_conn_id.