load_file operator
When to use the load_file operator
The load_file operator allows you to load data from files into your target transformation system. There are two uses of the load_file operator:
- Loading one or more files into a database table
- Loading one or more files into a Pandas dataframe
- Case 1: Load files into a database table
To load files into a database table, you need to provide the name and connection of the target table with the output_table parameter. The operator will return an instance of the table object passed in output_table. If the specified table does not already exist, it will be created. If it does already exist, it will be replaced, unless the if_exists parameter is modified.

my_homes_table = aql.load_file(
    input_file=File(path="s3://astro-sdk/python_sdk/example_dags/data/sample.csv"),
    output_table=Table(
        conn_id="postgres_conn",
    ),
)
- Case 2: Load files into a Pandas dataframe
If you don't provide an output_table to the load_file operator, it will convert the file into a Pandas dataframe and return a reference to the dataframe.

dataframe = aql.load_file(
    input_file=File(path="s3://astro-sdk/python_sdk/example_dags/data/sample.csv"),
)
Parameters to use when loading a file to a database table
if_exists - If the table you are trying to create already exists, you can specify whether you want to replace the table or append the new data by setting either if_exists='append' or if_exists='replace'.

new_table = aql.load_file(
    input_file=File(path="s3://astro-sdk/python_sdk/example_dags/data/sample.csv"),
    output_table=Table(
        conn_id="postgres_conn",
    ),
    if_exists="replace",
)
Note that if you use if_exists='replace', the existing table will be dropped and the schema of the new data will be used.

output_table - This parameter defines the output table to load data to, which should be an instance of astro.sql.table.Table. You can specify the schema of the table by providing a list of sqlalchemy.Column instances (https://docs.sqlalchemy.org/en/14/core/metadata.html#sqlalchemy.schema.Column) to the columns parameter. If you don't specify a schema, it will be inferred using Pandas.

custom_schema_table = aql.load_file(
    input_file=File(path="s3://astro-sdk/python_sdk/example_dags/data/sample.csv"),
    output_table=Table(
        conn_id="postgres_conn",
        columns=[
            sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
            sqlalchemy.Column("name", sqlalchemy.String(60), nullable=False, key="name"),
        ],
    ),
)
columns_names_capitalization - If you are working with a Snowflake database, with schema inference (see Inferring a Table Schema), and with if_exists='replace', you can control whether the column names of the output table are capitalized. The default is to convert all column names to lowercase. Valid inputs are lower (convert column names to lowercase), upper (convert to uppercase), and original (keep the column names as they appear in the file); an example follows the load_options entry below.

load_options - See LoadOptions.
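For example, a minimal sketch of forcing lowercase column names when replacing a table, based on the examples above (the snowflake_conn connection ID is a placeholder):

lowercase_cols_table = aql.load_file(
    input_file=File(path="s3://astro-sdk/python_sdk/example_dags/data/sample.csv"),
    output_table=Table(
        conn_id="snowflake_conn",  # placeholder Snowflake connection ID
    ),
    if_exists="replace",
    columns_names_capitalization="lower",
)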
ndjson_normalize_sep - If your input file type is NDJSON, you can use this parameter to normalize the data to two dimensions, which makes the data suitable for loading into a table. The parameter is used as the delimiter when combining nested column names, if required.
Example:

input JSON: {"a": {"b": "c"}}

output table:

| a_b |
| --- |
| c   |

Note - columns a and b are merged to form one column, a_b, and _ is used as the delimiter.

sample_table = aql.load_file(
    input_file=File(path="s3://astro-sdk/python_sdk/example_dags/data/sample.ndjson"),
    output_table=Table(
        conn_id="postgres_conn",
    ),
    ndjson_normalize_sep="__",
)
Inferring a Table Schema
There are three ways to infer the schema of the table to be created, listed by priority:
1. User-specified schema - You can specify the schema of the table to be created in the Table object, as described in the output_table entry under Parameters to use when loading a file to a database table.
2. Native auto schema detection - If available, this is used in preference to the Pandas auto schema detection below; it relies on the schema inference mechanism provided by the database.
3. Pandas auto schema detection - If you don't specify the schema in the Table object, load_file will infer the schema from the top rows of the file. By default the top 1000 rows are inspected, and this can be changed with the environment variable AIRFLOW__ASTRO_SDK_LOAD_TABLE_AUTODETECT_ROWS_COUNT.
Or within your Airflow config:

[astro_sdk]
load_table_autodetect_rows_count = 1000
Note - this only applies to the supported file types JSON, NDJSON, and CSV. PARQUET files contain type information, so schema inference is unnecessary.
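As an illustration, the same setting can also be supplied through the environment variable (the value 5000 is arbitrary; in practice this is usually set in the deployment environment rather than in DAG code):

import os

# Hypothetical illustration: raise the schema-autodetect sample size to 5000 rows.
os.environ["AIRFLOW__ASTRO_SDK_LOAD_TABLE_AUTODETECT_ROWS_COUNT"] = "5000"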
Parameters to use when loading a file to a Pandas dataframe
columns_names_capitalization - Use to control the capitalization of column names in the generated dataframe. The default value is original.

original - Remains the same as the input file
upper - Convert to uppercase
lower - Convert to lowercase

dataframe = aql.load_file(
    input_file=File(path="s3://astro-sdk/python_sdk/example_dags/data/sample.csv"),
    columns_names_capitalization="upper",
)
load_options - See LoadOptions.
Parameters for native transfer
Refer to How load_file Works for details on Native Path.
use_native_support: Native transfer support is available for some combinations of file source and database. If it is available for your systems, it is used by default. To leverage native transfer support, certain settings may need to be modified on your destination database. If you do not wish to use native transfers, you can turn off this behavior by specifying use_native_support=False, as shown in the example below.

aql.load_file(
    input_file=File("s3://tmp9/homes_main.csv", conn_id="aws_conn"),
    output_table=Table(conn_id="bigquery", metadata=Metadata(schema="astro")),
    use_native_support=False,
)
To check if the native transfer will be used for your combination of file location and database, refer to Supported native transfers.
When to turn off native transfer:

- Additional services sometimes need to be enabled on the target database to use native transfer; for example, see https://cloud.google.com/bigquery-transfer/docs/s3-transfer
- There may be additional costs associated with the services used to perform the native transfer.
- Native transfers can be overkill when you are transferring smaller files; it may take less time to load small files using the default approach.
native_support_kwargs: load_file supports multiple databases, which may require different parameters to process a file or control the error rate. You can specify those parameters in native_support_kwargs, and they will be passed to the destination database. Check the valid parameters for your combination of file location and database in Supported native transfers.

aql.load_file(
    input_file=File("s3://tmp9/homes_main.csv", conn_id="aws_conn"),
    output_table=Table(conn_id="bigquery", metadata=Metadata(schema="astro")),
    use_native_support=True,
    native_support_kwargs={
        "ignore_unknown_values": True,
        "allow_jagged_rows": True,
        "skip_leading_rows": "1",
    },
)
enable_native_fallback: When use_native_support is set to True, load_file will attempt to use native transfer. If this fails, the load_file task will fail unless you configure the native fallback mechanism (see Configuring the native fallback mechanism).

aql.load_file(
    input_file=File(
        "gs://astro-sdk/workspace/sample_pattern.csv",
        conn_id="bigquery",
        filetype=FileType.CSV,
    ),
    output_table=Table(conn_id="bigquery", metadata=Metadata(schema="astro")),
    use_native_support=True,
    native_support_kwargs={
        "ignore_unknown_values": True,
        "allow_jagged_rows": True,
        "skip_leading_rows": "1",
    },
    enable_native_fallback=True,
)
load_options - See LoadOptions.
Supported native transfers
| File Location | Database | native_support_kwargs params | Permission |
| --- | --- | --- | --- |
| S3 | Bigquery | https://cloud.google.com/bigquery-transfer/docs/s3-transfer#bq | https://cloud.google.com/bigquery/docs/s3-transfer#required_permissions |
| GCS | Bigquery | https://cloud.google.com/bigquery-transfer/docs/cloud-storage-transfer#bq | https://cloud.google.com/bigquery/docs/cloud-storage-transfer#required_permissions |
| S3 | Snowflake | https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html | |
| GCS | Snowflake | https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html | https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html |
| S3 | Redshift | https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum-create-role.html | |
Note
For loading from S3 to a Redshift database, although Redshift allows the two authorization options below, we only support the IAM Role option: if you pass CREDENTIALS in the query, they might get printed in logs and risk being leaked.

- IAM Role (supported)
- CREDENTIALS (not supported)

A reference on how to create such a role is available here: https://www.dataliftoff.com/iam-roles-for-loading-data-from-s3-into-redshift/
Patterns in file path
load_file can resolve patterns in the file path. Three types of patterns are supported, based on the Supported File Location:

Local - Glob patterns are supported for local files (see the glob doc).

my_homes_table = aql.load_file(
    input_file=File(path=str(CWD.parent) + "/tests/data/homes*", filetype=FileType.CSV),
    output_table=Table(
        conn_id="postgres_conn",
    ),
)
S3 - Prefixes in the file path are supported (see the S3 doc).

aql.load_file(
    input_file=File("s3://astro-sdk/sample_pattern", conn_id="aws_conn", filetype=FileType.CSV),
    output_table=Table(conn_id="bigquery", metadata=Metadata(schema="astro")),
    use_native_support=False,
)
GCS - Prefixes in the file path are supported (see the GCS doc).

aql.load_file(
    input_file=File(
        "gs://astro-sdk/workspace/sample_pattern.csv",
        conn_id="bigquery",
        filetype=FileType.CSV,
    ),
    output_table=Table(conn_id="bigquery", metadata=Metadata(schema="astro")),
    use_native_support=False,
)
GCS to Bigquery - only applicable when using the native path (for details, check How load_file Works).

When loading data from GCS to Bigquery, the native path is used by default, which is faster since the schema detection and pattern matching are handled directly by Bigquery. You can also process multiple files by passing a pattern; for valid patterns, check the Bigquery doc and look for the sourceUris field.
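For instance, a minimal sketch of loading multiple files over the native path with a wildcard pattern (the bucket path is illustrative; BigQuery's sourceUris accept a single * wildcard):

aql.load_file(
    input_file=File(
        "gs://astro-sdk/workspace/sample_pattern_*",  # illustrative path containing a wildcard
        conn_id="bigquery",
        filetype=FileType.CSV,
    ),
    output_table=Table(conn_id="bigquery", metadata=Metadata(schema="astro")),
    use_native_support=True,  # the default; shown explicitly for clarity
)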
Inferring file type
The Supported File Type will be inferred in one of two ways, with the following priority:
File object - If the user has passed the filetype parameter while declaring the astro.files.File object, that file type will be used. Valid values are listed in Supported File Type.

my_homes_table = aql.load_file(
    input_file=File(path=str(CWD.parent) + "/tests/data/homes*", filetype=FileType.CSV),
    output_table=Table(
        conn_id="postgres_conn",
    ),
)
Note - This parameter becomes mandatory when the file path doesn't have an extension.
From file extensions - When an astro.files.File object is created with a fully qualified path, the file extension is used to infer the file type. Here the file type is CSV.

new_table = aql.load_file(
    input_file=File(path="s3://astro-sdk/python_sdk/example_dags/data/sample.csv"),
    output_table=Table(
        conn_id="postgres_conn",
    ),
    if_exists="replace",
)
Loading data from HTTP API
Users can also load data from an HTTP API:
t1 = aql.load_file(
task_id="load_from_github_to_bq",
input_file=File(
path="https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv"
),
output_table=Table(name="imdb_movies", conn_id="bigquery", metadata=Metadata(schema="astro")),
)
Loading data from SFTP
Users can also load data from an SFTP server:
aql.load_file(
input_file=File(
path="sftp://upload/ADOPTION_CENTER_1_unquoted.csv", conn_id="sftp_conn", filetype=FileType.CSV
),
output_table=Table(
conn_id=SNOWFLAKE_CONN_ID,
metadata=Metadata(
database=os.environ["SNOWFLAKE_DATABASE"],
schema=os.environ["SNOWFLAKE_SCHEMA"],
),
),
)
Loading data from FTP
Users can also load data from an FTP server:
aql.load_file(
input_file=File(
path="ftp://upload/ADOPTION_CENTER_1_unquoted.csv",
conn_id="ftp_conn",
filetype=FileType.CSV,
),
output_table=Table(
conn_id=SNOWFLAKE_CONN_ID,
metadata=Metadata(
database=os.environ["SNOWFLAKE_DATABASE"],
schema=os.environ["SNOWFLAKE_SCHEMA"],
),
),
)
Default Datasets
If you are using the Astro Python SDK version 1.1 or later, you do not need to make any code updates to use Datasets. Datasets are automatically registered for any functions with output tables, and you do not need to define an outlets parameter.

Note that a default Dataset URI generated by the Astro Python SDK will be in the format astro://test_conn@?table=test_table&schema=schema&database=database.
For the load_file operator:

- Input Dataset - the source file for the operator.
- Output Dataset - the target table of the operator.
These Datasets can be used by Airflow 2.4 and above to show data dependencies between DAGs.
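As an illustration, a downstream DAG could be scheduled on the output table's Dataset using the default URI format shown above (a minimal sketch; the URI, table name, and DAG are hypothetical and require Airflow 2.4+):

import pendulum
from airflow.datasets import Dataset
from airflow.decorators import dag, task

# Hypothetical Dataset URI following the default format generated by the SDK.
imdb_movies_dataset = Dataset("astro://bigquery@?table=imdb_movies&schema=astro")

@dag(schedule=[imdb_movies_dataset], start_date=pendulum.datetime(2023, 1, 1), catchup=False)
def consume_imdb_movies():
    @task
    def report():
        # Placeholder task that would read the imdb_movies table.
        pass

    report()

consume_imdb_movies()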
Snowflake Identifier
Table
We create tables with uppercase names. This has an impact when we run raw SQL queries: for example, if we create a table with the name customer, Customer, or CUSTOMER, in the query we have to use the uppercase or lowercase name CUSTOMER or customer without quotes, for example Select * from CUSTOMER or Select * from customer.
Columns
When loading data from a file into a Snowflake table, there are three cases concerning column names:

- Uppercase: all your column names are in uppercase
- Lowercase: all your column names are in lowercase
- Mixed case: your column names are mixed case, for example List
Mixed
Run raw SQL
Warning
When we load data, we preserve the case by adding quotes to column names. We need to be aware of this behavior while running raw SQL queries: with mixed-case columns, you will have to add quotes around all the column names, like select "Name" from customer.

Example - if we try to load the below CSV file with load_file():

| Name | Age |
| ---- | --- |
| John | 20 |
| Sam  | 30 |

When we run a SQL query, we have to preserve the casing by passing the identifiers in quotes. For example:

SELECT "Name", "Age" FROM <table_name>
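A minimal sketch of running such a query through the SDK's run_raw_sql decorator (assuming the data was loaded into my_table; the function name and handler-free usage are illustrative):

@aql.run_raw_sql
def get_names(input_table: Table):
    # Mixed-case identifiers must be quoted to preserve their casing in Snowflake.
    return 'SELECT "Name", "Age" FROM {{input_table}}'

names = get_names(input_table=my_table)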
Dataframes
There is no impact when we try to convert the table into a dataframe.
Upper/Lower
Run raw SQL
For uppercase or lowercase column names there is no impact; we can simply run queries without adding quotes around column names.
Dataframe
Warning
We will have lowercase column names even for uppercase names in the file. For example, suppose we load the below file via the load_file() operator:

| NAME | AGE |
| ---- | --- |
| John | 20 |
| Sam  | 30 |

When we load this into a dataframe, you will get the columns in lowercase: name and age.
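If you need the uppercase names downstream, one option is a small transform step that renames the columns (a sketch assuming the aql.dataframe decorator; the function name is illustrative):

import pandas as pd

@aql.dataframe
def uppercase_columns(df: pd.DataFrame) -> pd.DataFrame:
    # Restore uppercase column names after the load lowercases them.
    df.columns = [col.upper() for col in df.columns]
    return df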
LoadOptions
If you want to provide load options (specific to a database or file location) while reading or loading a file, pass a list of LoadOptions. For example, a simple case is passing a delimiter to the pandas.read_csv() method while loading a CSV. The following load options are supported:
Note
load_options will replace the native_support_kwargs parameter.
PandasLoadOptions - Pandas load options for reading and loading files using the Pandas path, for various file types:

- PandasCsvLoadOptions - Pandas load options while reading and loading a CSV file.
- PandasJsonLoadOptions - Pandas load options while reading and loading a JSON file.
- PandasNdjsonLoadOptions - Pandas load options while reading and loading an NDJSON file.
- PandasParquetLoadOptions - Pandas load options while reading and loading a Parquet file.

aql.load_file(
    input_file=File("s3://tmp9/delimiter_dollar.csv", conn_id="aws_conn"),
    output_table=Table(
        conn_id=SNOWFLAKE_CONN_ID,
    ),
    use_native_support=False,
    load_options=[PandasCsvLoadOptions(delimiter="$")],
)
SnowflakeLoadOptions - Load options for loading a file to Snowflake using the native approach.

aql.load_file(
    input_file=File("s3://astro-sdk/python_sdk/example_dags/data/sample.csv", conn_id="aws_conn"),
    output_table=Table(
        conn_id=SNOWFLAKE_CONN_ID,
    ),
    load_options=[
        SnowflakeLoadOptions(
            file_options={"SKIP_HEADER": 1, "SKIP_BLANK_LINES": True},
            copy_options={"ON_ERROR": "CONTINUE"},
        )
    ],
)
DeltaLoadOptions - Load options for rendering options into Databricks COPY INTO SQL statements.

aql.load_file(
    input_file=File("s3://astro-sdk/python_sdk/example_dags/data/sample.csv", conn_id="aws_conn"),
    output_table=Table(
        conn_id=DATABRICKS_CONN_ID,
    ),
    load_options=[
        DeltaLoadOptions(
            copy_into_format_options={"header": "true", "inferSchema": "true"},
            copy_into_copy_options={"mergeSchema": "true"},
        )
    ],
)