astro.sql.operators.load_file

Module Contents

Functions

load_file(input_file[, output_table, task_id, ...])

Load a file or bucket into either a SQL table or a pandas dataframe.

Classes

LoadFile

Load S3/local file into either a database or a pandas dataframe

astro.sql.operators.load_file.load_file(input_file, output_table=None, task_id=None, if_exists='replace', ndjson_normalize_sep='_', **kwargs)

Load a file or bucket into either a SQL table or a pandas dataframe.

Parameters
  • input_file (astro.files.File) – File path and conn_id for object stores

  • output_table (Optional[astro.sql.table.Table]) – Table to create

  • task_id (Optional[str]) – task id, optional

  • if_exists (astro.constants.LoadExistStrategy) – default override an existing Table. Options: fail, replace, append

  • ndjson_normalize_sep (str) –

    separator used to normalize nested ndjson. ex - {“a”: {“b”:”c”}} will result in

    column - “a_b” where ndjson_normalize_sep = “_”

  • kwargs (Any) –

Return type

airflow.models.xcom_arg.XComArg

class astro.sql.operators.load_file.LoadFile(input_file, output_table=None, chunk_size=DEFAULT_CHUNK_SIZE, if_exists='replace', ndjson_normalize_sep='_', **kwargs)

Bases: airflow.models.BaseOperator

Load S3/local file into either a database or a pandas dataframe

Parameters
  • input_file (astro.files.File) – File path and conn_id for object stores

  • output_table (Optional[astro.sql.table.Table]) – Table to create

  • ndjson_normalize_sep (str) – separator used to normalize nested ndjson.

  • chunk_size (int) – Specify the number of records in each batch to be written at a time.

  • if_exists (astro.constants.LoadExistStrategy) – Overwrite file if exists. Default False.

Returns

If output_table is passed this operator returns a Table object. If not passed, returns a dataframe.

template_fields = ['output_table', 'input_file']
execute(context)

Load an existing dataset from a supported file into a SQL table or a Dataframe.

Parameters

context (Any) –

Return type

Union[astro.sql.table.Table, pandas.DataFrame]

load_data(input_file)
Parameters

input_file (astro.files.File) –

Return type

Union[astro.sql.table.Table, pandas.DataFrame]

load_data_to_table(input_file)

Loads csv/parquet table from local/S3/GCS with Pandas. Infers SQL database type based on connection then loads table to db.

Parameters

input_file (astro.files.File) –

Return type

astro.sql.table.Table

load_data_to_dataframe(input_file)

Loads csv/parquet file from local/S3/GCS with Pandas. Returns dataframe as no SQL table was specified

Parameters

input_file (astro.files.File) –

Return type

Optional[pandas.DataFrame]

static _populate_normalize_config(database, ndjson_normalize_sep='_')

Validate pandas json_normalize() parameter for databases, since default params result in invalid column name. Default parameter result in the columns name containing ‘.’ char.

Parameters
Return type

Dict[str, str]