get_file_list
When to use the get_file_list operator
You can use get_file_list to retrieve a list of available files based on a storage path and an Airflow connection. You can then use this list to generate tasks dynamically, one per file found in storage.
The supported filesystems are listed in Supported File Location.
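For example, in a minimal sketch like the one below (the bucket path, connection ID, and DAG and task names are placeholders, not part of the official example), get_file_list returns a reference that resolves to the list of files at runtime, so it can be passed directly to a mapped task's expand call:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from astro.files import get_file_list

with DAG("file_list_sketch", start_date=datetime(2022, 1, 1), schedule_interval=None):
    # Placeholder bucket path and connection ID, for illustration only.
    files = get_file_list(path="gs://my-bucket/raw/", conn_id="google_cloud_default")

    @task
    def report_file(input_file):
        # One mapped task instance runs per file found under the path.
        print(input_file.path)

    report_file.expand(input_file=files)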
Warning
Fetching a large number of files with this method can overload XCom. It can also create a large number of parallel tasks when used with the dynamic task mapping expand method.
The following example retrieves a file list from a GCS bucket and uses expand to dynamically generate one task per listed file, uploading each file to a BigQuery table.
import os
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

from astro import sql as aql
from astro.files import get_file_list
from astro.sql import get_value_list
from astro.sql.operators.load_file import LoadFileOperator as LoadFile
from astro.table import Metadata, Table

GCS_BUCKET = os.getenv("GCS_BUCKET", "gs://dag-authoring/dynamic_task/")
ASTRO_GCP_CONN_ID = os.getenv("ASTRO_GCP_CONN_ID", "google_cloud_default")
ASTRO_BIGQUERY_DATASET = os.getenv("ASTRO_BIGQUERY_DATASET", "dag_authoring")
# Example query statement (not used by the load task below).
QUERY_STATEMENT = os.getenv(
    "ASTRO_QUERY_STATEMENT",
    "SELECT rating FROM `astronomer-dag-authoring.dynamic_template.movie`",
)

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": 0,
}

with DAG(
    dag_id="example_dynamic_task_template",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["airflow_version:2.3.0"],
    default_args=default_args,
) as dag:
    # get_file_list returns the files found under GCS_BUCKET at runtime;
    # expand() creates one mapped load task per file, each writing to the
    # same BigQuery table.
    LoadFile.partial(
        task_id="load_gcs_to_bq",
        output_table=Table(
            metadata=Metadata(
                schema=ASTRO_BIGQUERY_DATASET,
            ),
            conn_id=ASTRO_GCP_CONN_ID,
        ),
        use_native_support=True,
    ).expand(input_file=get_file_list(path=GCS_BUCKET, conn_id=ASTRO_GCP_CONN_ID))
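As the warning above notes, a bucket with many files fans out into many parallel mapped tasks. One way to keep that fan-out in check (a sketch under assumptions, not part of the original example; the task ID and the limit of 8 are arbitrary) is to cap concurrent mapped instances with max_active_tis_per_dag in partial(), inside the same DAG block:

    # Sketch: same load task, but at most 8 mapped instances run at once.
    LoadFile.partial(
        task_id="load_gcs_to_bq_throttled",
        output_table=Table(
            metadata=Metadata(schema=ASTRO_BIGQUERY_DATASET),
            conn_id=ASTRO_GCP_CONN_ID,
        ),
        use_native_support=True,
        max_active_tis_per_dag=8,  # arbitrary example limit
    ).expand(input_file=get_file_list(path=GCS_BUCKET, conn_id=ASTRO_GCP_CONN_ID))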