astro.sql.operators.load_file

Module Contents

Classes

LoadFileOperator

Load an S3/local file into either a database or a pandas dataframe.

Functions

load_file(input_file[, output_table, task_id, ...])

Load a file or bucket into either a SQL table or a pandas dataframe.

check_if_connection_exists(conn_id)

Given an Airflow connection ID, identify if it exists.

class astro.sql.operators.load_file.LoadFileOperator(input_file, output_table=None, chunk_size=DEFAULT_CHUNK_SIZE, if_exists='replace', ndjson_normalize_sep='_', use_native_support=True, native_support_kwargs=None, load_options=None, columns_names_capitalization='original', enable_native_fallback=LOAD_FILE_ENABLE_NATIVE_FALLBACK, **kwargs)

Bases: astro.sql.operators.base_operator.AstroSQLBaseOperator

Load an S3/local file into either a database or a pandas dataframe.

Parameters:
  • input_file (astro.files.File) – File path and conn_id for object stores

  • output_table (BaseTable | None) – Table to create

  • ndjson_normalize_sep (str) – separator used to normalize nested ndjson.

  • chunk_size (int) – Number of records to write in each batch.

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to apply if the output table already exists. Defaults to 'replace'.

  • use_native_support (bool) – Use native support for data transfer if available on the destination.

  • native_support_kwargs (dict | None) – Keyword arguments passed to the method used in the native support flow.

  • columns_names_capitalization (astro.constants.ColumnCapitalization) – Determines whether column names in the resulting dataframe are converted to lowercase/uppercase. Defaults to 'original', which leaves them unchanged.

  • enable_native_fallback (bool | None) – Set enable_native_fallback=True to fall back to the default transfer if the native transfer fails.

  • load_options (list[LoadOptions] | None) – Options used while reading and loading the file.

Returns:

If output_table is passed, this operator returns a Table object; otherwise, it returns a dataframe.
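The chunk_size and if_exists parameters can be illustrated with a minimal sketch that uses the standard-library sqlite3 module as a stand-in destination. This is not how astro-sdk writes tables: load_rows and chunks are hypothetical helpers, only the "replace" and "append" strategies are modelled, and the default batch size of 1000 is arbitrary rather than the library's DEFAULT_CHUNK_SIZE.

```python
import sqlite3
from itertools import islice
from typing import Any, Iterable, Iterator, List, Sequence


def chunks(rows: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Yield successive lists of at most ``size`` rows."""
    if size < 1:
        raise ValueError("chunk size must be at least 1")
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


def load_rows(
    conn: sqlite3.Connection,
    table: str,
    columns: Sequence[str],
    rows: Iterable[Sequence[Any]],
    chunk_size: int = 1000,  # arbitrary illustrative default (hypothetical)
    if_exists: str = "replace",
) -> int:
    """Write rows in batches and return the table's final row count.

    Table and column names are interpolated directly, so they must be trusted.
    """
    if if_exists not in ("replace", "append"):
        raise ValueError(f"unsupported if_exists: {if_exists!r}")
    if if_exists == "replace":
        # 'replace': discard the existing table and all of its rows.
        conn.execute(f"DROP TABLE IF EXISTS {table}")
    # 'append' reuses the table if it already exists.
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} ({', '.join(columns)})")
    placeholders = ", ".join("?" for _ in columns)
    for batch in chunks(rows, chunk_size):
        # One executemany call per batch of at most chunk_size rows.
        conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", batch)
    conn.commit()
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


conn = sqlite3.connect(":memory:")
first = load_rows(conn, "users", ["id", "name"], [(1, "ada"), (2, "bob"), (3, "cy")], chunk_size=2)
appended = load_rows(conn, "users", ["id", "name"], [(4, "dee")], if_exists="append")
replaced = load_rows(conn, "users", ["id", "name"], [(5, "eve")])
print(first, appended, replaced)  # 3 4 1
```

With chunk_size=2, the first call issues two inserts (two rows, then one row); "append" keeps those rows, while the default "replace" starts over with an empty table.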

template_fields = ['output_table', 'input_file']

execute(context)

Load an existing dataset from a supported file into a SQL table or a dataframe.

Parameters:

context (astro.utils.compat.typing.Context) – Airflow task context

Return type:

BaseTable | File

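load_data(input_file, context)

Load input_file into output_table if one was given; otherwise, load it into a pandas dataframe.

Parameters:
  • input_file (astro.files.File) – File to load

  • context (astro.utils.compat.typing.Context) – Airflow task context

Return type:

BaseTable | pd.DataFrame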
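The branching between a table result and a dataframe result can be sketched with the standard library only. load_data_sketch is a hypothetical helper, not astro-sdk code: a plain dict of row lists stands in for the database, and a list of dicts stands in for the pandas dataframe.

```python
import csv
import io
from typing import Dict, List, Optional, Union

Rows = List[Dict[str, str]]


def load_data_sketch(
    csv_text: str,
    output_table: Optional[str] = None,
    database: Optional[Dict[str, Rows]] = None,
) -> Union[str, Rows]:
    """Parse CSV text, then either persist it as a table or return it in memory."""
    rows: Rows = [dict(row) for row in csv.DictReader(io.StringIO(csv_text))]
    if output_table is None:
        # No table requested: return the rows themselves (the "dataframe" path).
        return rows
    if database is None:
        raise ValueError("a database is required when output_table is set")
    # Table requested: store the rows and return a handle to the table.
    database[output_table] = rows
    return output_table
```

The return type depends on whether output_table is set, which mirrors the BaseTable | pd.DataFrame annotation documented for load_data.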

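load_data_to_table(input_file, context)

Loads a CSV/Parquet file from local storage, S3, or GCS with pandas, infers the SQL database type from the connection, then loads the data into a table in that database.

Parameters:
  • input_file (astro.files.File) – File to load

  • context (astro.utils.compat.typing.Context) – Airflow task context

Return type:

astro.table.BaseTable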
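When use_native_support is enabled, a table load may first attempt a database-native transfer, and enable_native_fallback=True lets a failure there fall back to the default transfer. The generic try-then-fall-back pattern can be sketched as follows. This is not astro-sdk's implementation: native_load, default_load and broken_native are hypothetical callables, and the sketch catches any Exception, whereas the real operator may only fall back on specific errors.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def load_with_fallback(
    native_load: Callable[[], T],
    default_load: Callable[[], T],
    use_native_support: bool = True,
    enable_native_fallback: bool = True,
) -> T:
    """Try the native path first; optionally fall back to the default path on failure."""
    if not use_native_support:
        return default_load()
    try:
        return native_load()
    except Exception:
        if not enable_native_fallback:
            raise  # surface the native error unchanged
        return default_load()


def broken_native() -> str:
    # Hypothetical native transfer that always fails.
    raise RuntimeError("native transfer not permitted")


print(load_with_fallback(broken_native, lambda: "loaded via default path"))  # loaded via default path
```

With enable_native_fallback=False the RuntimeError from broken_native would propagate instead.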

load_data_to_dataframe(input_file)

Loads a CSV/Parquet file from local storage, S3, or GCS with pandas. Used when no SQL table was specified, so the data is returned as a dataframe.

Parameters:

input_file (astro.files.File) – File to load

Return type:

pd.DataFrame | None
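The effect of columns_names_capitalization on the resulting dataframe's column names can be illustrated with a small stdlib sketch. It assumes the accepted values are "upper", "lower" and the default "original" (only "original" appears in the signature; the other two are inferred from the parameter description), and apply_capitalization is a hypothetical helper, not part of astro-sdk.

```python
from typing import List


def apply_capitalization(columns: List[str], mode: str = "original") -> List[str]:
    """Return column names converted according to ``mode`` (hypothetical helper)."""
    if mode == "upper":
        return [c.upper() for c in columns]
    if mode == "lower":
        return [c.lower() for c in columns]
    if mode == "original":
        # Default: keep names exactly as they appear in the source file.
        return list(columns)
    raise ValueError(f"unknown capitalization mode: {mode!r}")


print(apply_capitalization(["Id", "userName"], "lower"))  # ['id', 'username']
```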

get_openlineage_facets_on_complete(task_instance)

Returns the OpenLineage data (datasets and facets) for the completed task instance.

astro.sql.operators.load_file.load_file(input_file, output_table=None, task_id=None, if_exists='replace', ndjson_normalize_sep='_', use_native_support=True, native_support_kwargs=None, columns_names_capitalization='original', enable_native_fallback=True, load_options=None, **kwargs)

Load a file or bucket into either a SQL table or a pandas dataframe.

Parameters:
  • input_file (astro.files.File) – File path and conn_id for object stores

  • output_table (BaseTable | None) – Table to create

  • task_id (str | None) – task id, optional

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to apply if the output table already exists; defaults to replacing it. Options: fail, replace, append

  • ndjson_normalize_sep (str) – Separator used to normalize nested ndjson. For example, with ndjson_normalize_sep="_", the record {"a": {"b": "c"}} produces a column named "a_b".

  • use_native_support (bool) – Use native support for data transfer if available on the destination.

  • native_support_kwargs (dict | None) – Keyword arguments passed to the method used in the native support flow.

  • columns_names_capitalization (astro.constants.ColumnCapitalization) – Determines whether column names in the resulting dataframe are converted to lowercase/uppercase. Defaults to 'original', which leaves them unchanged.

  • enable_native_fallback (bool | None) – Set enable_native_fallback=True to fall back to the default transfer if the native transfer fails.

  • load_options (list[LoadOptions] | None) – load options while reading and loading file

  • kwargs (Any) – Additional keyword arguments passed to LoadFileOperator

Return type:

airflow.models.xcom_arg.XComArg
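The ndjson_normalize_sep behaviour can be reproduced with a short recursive flattener. This is a minimal stdlib sketch; flatten is a hypothetical helper rather than astro-sdk code (pandas offers a comparable pandas.json_normalize function with its own sep argument).

```python
import json
from typing import Any, Dict


def flatten(record: Dict[str, Any], sep: str = "_", prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into one level, joining key paths with ``sep``."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse so deeper levels extend the current key path.
            flat.update(flatten(value, sep, name))
        else:
            flat[name] = value
    return flat


ndjson = '{"a": {"b": "c"}, "id": 1}\n{"a": {"b": "d"}, "id": 2}\n'
rows = [flatten(json.loads(line)) for line in ndjson.splitlines() if line.strip()]
print(rows)  # [{'a_b': 'c', 'id': 1}, {'a_b': 'd', 'id': 2}]
```

Each ndjson line is parsed independently, so the nested key "a" → "b" becomes the single column "a_b" in every row.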

astro.sql.operators.load_file.check_if_connection_exists(conn_id)

Given an Airflow connection ID, identify whether it exists. Return True if it does; raise an AirflowNotFoundException if it does not.

Parameters:

conn_id (str) – Airflow connection ID

Returns:

True if the connection exists.

Return type:

bool
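The "return True or raise" contract can be sketched without Airflow by looking the ID up in a plain mapping. check_connection_sketch, its registry argument and ConnectionNotFound are hypothetical stand-ins; in an Airflow deployment the lookup would typically go through BaseHook.get_connection, which raises airflow.exceptions.AirflowNotFoundException for unknown IDs.

```python
from typing import Mapping


class ConnectionNotFound(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowNotFoundException."""


def check_connection_sketch(conn_id: str, registry: Mapping[str, object]) -> bool:
    """Return True if ``conn_id`` is registered; otherwise raise ConnectionNotFound."""
    if conn_id not in registry:
        raise ConnectionNotFound(f"The conn_id `{conn_id}` isn't defined")
    # The function never returns False: a missing ID is always an error.
    return True
```

Because a missing connection raises rather than returning False, callers can invoke the check purely for its side effect before starting a load.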