astro.sql.operators.dataframe

Module Contents

Classes

DataframeOperator

Converts a SQL table into a dataframe. Users can then supply a Python function that takes a dataframe as one of its inputs and have it run as a task.

Functions

load_op_arg_table_into_dataframe(op_args, ...)

For dataframe-based functions, takes any Table objects from op_args and converts them into local dataframes.

load_op_kwarg_table_into_dataframe(op_kwargs, ...)

For dataframe-based functions, takes any Table objects from op_kwargs and converts them into local dataframes.

dataframe([python_callable, multiple_outputs, ...])

This decorator allows users to write Python functions while treating SQL tables as dataframes.

astro.sql.operators.dataframe.load_op_arg_table_into_dataframe(op_args, python_callable, columns_names_capitalization, log)

For dataframe-based functions, takes any Table objects from op_args and converts them into local dataframes that can be handled in the Python context.

Parameters:
  • op_args (tuple) –

  • python_callable (Callable) –

  • columns_names_capitalization (astro.constants.ColumnCapitalization) –

  • log (logging.Logger) –

Return type:

tuple

astro.sql.operators.dataframe.load_op_kwarg_table_into_dataframe(op_kwargs, python_callable, columns_names_capitalization, log)

For dataframe-based functions, takes any Table objects from op_kwargs and converts them into local dataframes that can be handled in the Python context.

Parameters:
  • op_kwargs (dict) –

  • python_callable (Callable) –

  • columns_names_capitalization (astro.constants.ColumnCapitalization) –

  • log (logging.Logger) –

Return type:

dict
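
Both helpers follow the same substitution pattern. The sketch below is an illustrative reimplementation, not the SDK's actual code: table_to_pandas and load_tables_sketch are hypothetical names, and the real helpers additionally use python_callable and columns_names_capitalization to decide how each Table is exported.

    import logging

    import pandas as pd

    from astro.sql.table import Table


    def table_to_pandas(table: Table) -> pd.DataFrame:
        # Hypothetical stand-in for the SDK's database-backed export of a
        # Table into a pandas DataFrame.
        raise NotImplementedError


    def load_tables_sketch(op_args: tuple, op_kwargs: dict, log: logging.Logger):
        """Swap every Table argument for a local dataframe; pass the rest through."""
        args = tuple(
            table_to_pandas(arg) if isinstance(arg, Table) else arg for arg in op_args
        )
        kwargs = {
            key: table_to_pandas(value) if isinstance(value, Table) else value
            for key, value in op_kwargs.items()
        }
        log.debug("Converted Table arguments to local dataframes")
        return args, kwargs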

class astro.sql.operators.dataframe.DataframeOperator(conn_id=None, database=None, schema=None, columns_names_capitalization='original', if_exists='replace', **kwargs)

Bases: astro.sql.operators.base_operator.AstroSQLBaseOperator, airflow.decorators.base.DecoratedOperator

Converts a SQL table into a dataframe. Users can then supply a Python function that takes a dataframe as one of its inputs and have that function run as a task. Once the function has completed, its result is accessible via the TaskFlow API.

Parameters:
  • conn_id (str | None) – Connection ID of the database to pull the table from

  • database (str | None) – Database for the input table

  • schema (str | None) – Schema for the input table

  • if_exists (astro.constants.LoadExistStrategy) – Overwrite the existing table when set to “replace”; append to it otherwise

  • warehouse – (Snowflake) Which warehouse to use for the input table

  • columns_names_capitalization (astro.constants.ColumnCapitalization) – Determines whether to convert all columns to lowercase/uppercase in the resulting dataframe

  • kwargs – Any keyword arguments supported by BaseOperator (e.g. queue, owner)

Returns:

If raw_sql is true, returns the result of the handler function; otherwise, returns the generated output_table.
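
Most users reach this operator through the dataframe decorator documented below. A direct instantiation is sketched here under stated assumptions: the task, table, and connection names (count_orders, orders, postgres_default) are illustrative, and op_args/op_kwargs come from the DecoratedOperator base class.

    import pandas as pd

    from astro.sql.operators.dataframe import DataframeOperator
    from astro.sql.table import Table


    def count_rows(df: pd.DataFrame) -> int:
        # By the time this runs, the Table passed in op_args has been
        # materialized as a pandas DataFrame.
        return len(df)


    count_orders = DataframeOperator(
        task_id="count_orders",
        python_callable=count_rows,
        op_args=[Table(name="orders", conn_id="postgres_default")],
        op_kwargs={},
        conn_id="postgres_default",
        if_exists="replace",
    )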

execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.

Parameters:

context (astro.utils.compat.typing.Context) –

Return type:

astro.sql.table.Table | pandas.DataFrame | list

get_openlineage_facets_on_complete(task_instance)

Collect the input, output, job and run facets for DataframeOperator

get_source_code(py_callable)

Return the source code for the lineage

Parameters:

py_callable (Callable) –

Return type:

str | None
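
The lineage source capture can be sketched with Python's standard inspect module; this is an illustrative reimplementation under that assumption, not necessarily the SDK's exact code.

    from __future__ import annotations

    import inspect
    from typing import Callable


    def get_source_code_sketch(py_callable: Callable) -> str | None:
        # inspect.getsource raises TypeError for built-ins/C callables and
        # OSError when no source file is available; returning None lets
        # lineage collection degrade gracefully instead of failing the task.
        try:
            return inspect.getsource(py_callable)
        except (TypeError, OSError):
            return None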

astro.sql.operators.dataframe.dataframe(python_callable=None, multiple_outputs=None, conn_id='', database=None, schema=None, columns_names_capitalization='original', if_exists='replace', **kwargs)

This decorator allows users to write Python functions while treating SQL tables as dataframes.

It lets a user run Python functions in Airflow with the added benefit that SQL tables are automatically turned into dataframes, and the resulting dataframes can automatically be used in astro.sql functions.

Parameters:
  • python_callable (Callable | None) – This parameter is filled in automatically when you use the dataframe function as a decorator. This is where the Python function gets passed to the wrapping function

  • multiple_outputs (bool | None) – If set to True, the decorated function’s return value will be unrolled to multiple XCom values. A dict will unroll to XCom values with its keys as XCom keys. Defaults to False.

  • conn_id (str) – Connection ID for the database you want to connect to. If you do not pass in a value, the connection ID is inferred from the first Table passed into the python_callable function (required if there are no table arguments)

  • database (str | None) – Database within the SQL instance you want to access. If left blank, we default to the table.metadata.database of the first Table passed to the function (required if there are no table arguments)

  • schema (str | None) – Schema within the SQL instance you want to access. If left blank, we default to the table.metadata.schema of the first Table passed to the function (required if there are no table arguments)

  • columns_names_capitalization (astro.constants.ColumnCapitalization) – Determines whether to convert all columns to lowercase/uppercase in the resulting dataframe

  • if_exists (astro.constants.LoadExistStrategy) – Overwrite the existing table when set to “replace”; append to it otherwise

  • kwargs (Any) – Any keyword arguments supported by BaseOperator (e.g. queue, owner)

Return type:

airflow.decorators.base.TaskDecorator
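
A minimal usage sketch follows; the table name, connection ID, and rating column are assumptions for illustration, not part of this page.

    import pandas as pd

    from astro import sql as aql
    from astro.sql.table import Table


    @aql.dataframe(columns_names_capitalization="original")
    def top_five(input_df: pd.DataFrame) -> pd.DataFrame:
        # input_df arrives as a dataframe even though the caller passes a Table.
        return input_df.sort_values(by="rating", ascending=False).head(5)


    # Inside a DAG: the Table is exported to a dataframe before the function
    # runs, and the returned dataframe is accessible via the TaskFlow API.
    best = top_five(input_df=Table(name="imdb_movies", conn_id="sqlite_default"))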