get_file_list

When to use the get_file_list operator

You can use get_file_list to retrieve a list of available files based on a storage path and an Airflow connection. You can then use this list to generate tasks dynamically, one task per file available in your storage.
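In outline, the call looks like this (the bucket path and connection ID below are placeholders, not part of the example further down); get_file_list runs as its own task and returns the file list via XCom, so a downstream task can map over it with expand:

from astro.files import get_file_list

# Placeholder path and connection ID for illustration only.
# get_file_list returns a reference to the list of matching files,
# which a downstream task can map over with .expand().
input_files = get_file_list(path="gs://my-bucket/my-prefix/", conn_id="google_cloud_default")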

The supported filesystems are listed in Supported File Location.

Warning

Fetching a large number of files with this method can overload XCom. It can also create a large number of parallel tasks when the result is used with dynamic task mapping's expand method.
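If the listing can grow large, one way to bound the fan-out is to cap how many mapped task instances run at once. The sketch below uses the standard Airflow max_active_tis_per_dag task argument; the DAG ID, task ID, and temporary output table are placeholders rather than part of the original example.

import os
from datetime import datetime

from airflow import DAG

from astro.files import get_file_list
from astro.sql.operators.load_file import LoadFileOperator as LoadFile
from astro.table import Table

GCS_BUCKET = os.getenv("GCS_BUCKET", "gs://dag-authoring/dynamic_task/")
ASTRO_GCP_CONN_ID = os.getenv("ASTRO_GCP_CONN_ID", "google_cloud_default")

with DAG(
    dag_id="example_bounded_fan_out",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:
    LoadFile.partial(
        task_id="load_gcs_to_bq_bounded",
        # Table() without a name loads into a temporary table.
        output_table=Table(conn_id=ASTRO_GCP_CONN_ID),
        # Standard Airflow task argument: at most 8 mapped instances of this
        # task run concurrently, even if the bucket lists many files.
        max_active_tis_per_dag=8,
    ).expand(input_file=get_file_list(path=GCS_BUCKET, conn_id=ASTRO_GCP_CONN_ID))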

The following example retrieves a file list from a GCS bucket and dynamically generates tasks using expand to upload each listed file to a BigQuery table.


import os
from datetime import datetime

from airflow import DAG

from astro.files import get_file_list
from astro.sql.operators.load_file import LoadFileOperator as LoadFile
from astro.table import Metadata, Table

GCS_BUCKET = os.getenv("GCS_BUCKET", "gs://dag-authoring/dynamic_task/")
ASTRO_GCP_CONN_ID = os.getenv("ASTRO_GCP_CONN_ID", "google_cloud_default")
ASTRO_BIGQUERY_DATASET = os.getenv("ASTRO_BIGQUERY_DATASET", "dag_authoring")

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": 0,
}

with DAG(
    dag_id="example_dynamic_task_template",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["airflow_version:2.3.0"],
    default_args=default_args,
) as dag:
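    # Create one mapped load_gcs_to_bq task instance per file returned by get_file_list.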
    LoadFile.partial(
        task_id="load_gcs_to_bq",
        output_table=Table(
            metadata=Metadata(
                schema=ASTRO_BIGQUERY_DATASET,
            ),
            conn_id=ASTRO_GCP_CONN_ID,
        ),
        use_native_support=True,
    ).expand(input_file=get_file_list(path=GCS_BUCKET, conn_id=ASTRO_GCP_CONN_ID))
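
At runtime, get_file_list executes first as its own task, and each file it returns becomes one mapped instance of load_gcs_to_bq, so the number of load tasks matches the number of files found under GCS_BUCKET.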