OpenLineage Integration

OpenLineage is an open-source framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata.
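To make the specification more concrete, here is a minimal sketch of the core fields of an OpenLineage RunEvent, built as a plain Python dict. It is not the exact payload the Astro Python SDK emits. The producer URI is a hypothetical placeholder, the job name is borrowed from the example DAG later on this page, and spec fields such as schemaURL and facets are omitted for brevity.

```python
import json
import uuid
from datetime import datetime, timezone


def make_run_event(event_type: str, namespace: str, job_name: str, producer: str) -> dict:
    """Build a minimal OpenLineage-style RunEvent as a plain dict.

    Only core fields are included; inputs/outputs are left empty and
    schemaURL/facets are omitted in this sketch.
    """
    return {
        "eventType": event_type,  # e.g. "START" or "COMPLETE"
        "eventTime": datetime.now(timezone.utc).isoformat(),  # ISO 8601 timestamp in UTC
        "run": {"runId": str(uuid.uuid4())},  # each run is identified by a UUID
        "job": {"namespace": namespace, "name": job_name},  # job identity = namespace + name
        "inputs": [],  # datasets read by the run
        "outputs": [],  # datasets written by the run
        "producer": producer,  # URI identifying the integration that emitted the event
    }


event = make_run_event(
    "START",
    namespace="default",
    job_name="calculate_popular_movies.load_file",
    producer="https://example.com/hypothetical-producer",  # placeholder, not a real producer URI
)
print(json.dumps(event, indent=2))
```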

Configure the OpenLineage and Astro Python SDK Integration

We need to specify where Astro Python SDK operators should send OpenLineage events. openlineage-airflow uses the OPENLINEAGE_URL environment variable as the destination for OpenLineage events, for example a Marquez instance. Optionally, we can also use the OPENLINEAGE_NAMESPACE environment variable to specify the namespace in which the lineage events are stored.
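A minimal sketch of how these two variables fit together, using a hypothetical helper lineage_target() that is not part of openlineage-airflow. The fallback namespace "default" is an assumption of this sketch; openlineage-airflow applies its own defaults internally.

```python
import os
from typing import Mapping, Optional, Tuple


def lineage_target(env: Optional[Mapping[str, str]] = None) -> Tuple[str, str]:
    """Return (url, namespace) for OpenLineage events from environment variables.

    Hypothetical helper: the "default" namespace fallback is an assumption.
    """
    env = os.environ if env is None else env
    url = env.get("OPENLINEAGE_URL")
    if not url:
        # Without a URL there is no destination for lineage events.
        raise RuntimeError("OPENLINEAGE_URL is not set")
    namespace = env.get("OPENLINEAGE_NAMESPACE", "default")
    return url.rstrip("/"), namespace  # normalize away a trailing slash
```

For example, lineage_target({"OPENLINEAGE_URL": "http://localhost:3000/"}) returns ("http://localhost:3000", "default"). Passing the mapping explicitly keeps the helper easy to test without touching the real environment.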

Users can choose whether to send task source code to OpenLineage by setting the OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE environment variable to either True or False. By default it is set to True.

By default, we emit OpenLineage events for temporary tables. Users who do not find these events useful can disable them by setting the AIRFLOW__ASTRO_SDK__OPENLINEAGE_EMIT_TEMP_TABLE_EVENT config to False, as in the example below. More details can be found at Configuring to emit temp table event in openlineage.
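The temp-table setting follows Airflow's convention of overriding airflow.cfg options with environment variables named AIRFLOW__{SECTION}__{KEY}. The sketch below, built around a hypothetical helper airflow_env_to_config(), splits such a name into the section and key it overrides. Lowercasing mirrors how options are usually written in airflow.cfg.

```python
from typing import Tuple


def airflow_env_to_config(var_name: str) -> Tuple[str, str]:
    """Split an AIRFLOW__{SECTION}__{KEY} variable name into (section, key).

    Hypothetical helper illustrating Airflow's env-var naming convention.
    """
    prefix = "AIRFLOW__"
    if not var_name.startswith(prefix):
        raise ValueError(f"{var_name!r} is not an Airflow config variable")
    # The first double underscore after the prefix separates section from key;
    # single underscores (as in ASTRO_SDK) stay part of the section name.
    section, sep, key = var_name[len(prefix):].partition("__")
    if not sep or not section or not key:
        raise ValueError(f"{var_name!r} must look like AIRFLOW__SECTION__KEY")
    return section.lower(), key.lower()


print(airflow_env_to_config("AIRFLOW__ASTRO_SDK__OPENLINEAGE_EMIT_TEMP_TABLE_EVENT"))
```

This prints ('astro_sdk', 'openlineage_emit_temp_table_event'), so the same setting could equivalently be written as openlineage_emit_temp_table_event = False under an [astro_sdk] section in airflow.cfg.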

Note

Users need to trigger the DAG from the webserver; running it with airflow dags test does not work with OpenLineage.

Users can validate the OpenLineage events in the Marquez UI. Instructions for setting up Marquez locally can be found here.
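Besides the UI, Marquez exposes a REST API that can be queried from a script. The sketch below assumes the job listing endpoint /api/v1/namespaces/{namespace}/jobs and a JSON response containing a "jobs" list. Check both against the Marquez API reference for your version. marquez_jobs_url() and list_job_names() are hypothetical helpers, and api_base is assumed to be the same Marquez API address you use for OPENLINEAGE_URL.

```python
import json
import urllib.parse
import urllib.request
from typing import List


def marquez_jobs_url(api_base: str, namespace: str) -> str:
    """Build the (assumed) Marquez REST URL that lists jobs in a namespace."""
    encoded_ns = urllib.parse.quote(namespace, safe="")  # escape spaces, slashes, etc.
    return f"{api_base.rstrip('/')}/api/v1/namespaces/{encoded_ns}/jobs"


def list_job_names(api_base: str, namespace: str) -> List[str]:
    """Fetch job names from a running Marquez instance (needs network access).

    Assumes the response body is JSON with a top-level "jobs" list.
    """
    with urllib.request.urlopen(marquez_jobs_url(api_base, namespace)) as resp:
        payload = json.load(resp)
    return [job["name"] for job in payload.get("jobs", [])]
```

After the example DAG below has run, calling list_job_names with your Marquez address and the "default" namespace would be expected to include the DAG's task jobs, if the assumptions above hold.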

For example, to send OpenLineage events to a local instance of Marquez with the default namespace, use:

AIRFLOW__LINEAGE__BACKEND=openlineage.lineage_backend.OpenLineageBackend
OPENLINEAGE_URL=http://localhost:3000
OPENLINEAGE_NAMESPACE="default"
AIRFLOW__ASTRO_SDK__OPENLINEAGE_EMIT_TEMP_TABLE_EVENT=False

Set the environment variables described above and run the example DAG below:

from datetime import datetime

from airflow import DAG

from astro import sql as aql
from astro.files import File
from astro.table import Table


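# aql.transform runs the SQL string returned by this function and stores the
# result in a table; {{input_table}} is rendered as the input table's name.
@aql.transform()
def top_five_animations(input_table: Table):
    return """
        SELECT title, rating
        FROM {{input_table}}
        WHERE genre1='Animation'
        ORDER BY rating DESC
        LIMIT 5;
    """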


with DAG(
    "calculate_popular_movies",
    schedule_interval=None,
    start_date=datetime(2000, 1, 1),
    catchup=False,
) as dag:
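    # Load the IMDB CSV into an unnamed (temporary) table on the sqlite_default connection.
    imdb_movies = aql.load_file(
        File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv"),
        output_table=Table(conn_id="sqlite_default"),
    )
    # Run the transform on the loaded table and store the result as top_animation.
    top_five_animations(
        input_table=imdb_movies,
        output_table=Table(name="top_animation"),
    )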
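To check what top_five_animations computes without running Airflow, the sketch below runs the same SQL against an in-memory SQLite table. The rows are hypothetical and exist only to illustrate the filter, sort, and limit. The fixed table name imdb_movies stands in for whatever name the SDK renders for {{input_table}}.

```python
import sqlite3
from typing import List, Tuple

# Hypothetical rows using only the columns the query touches (title, rating, genre1).
ROWS = [
    ("Toon 1", 7.0, "Animation"),
    ("Toon 2", 9.0, "Animation"),
    ("Toon 3", 6.5, "Animation"),
    ("Toon 4", 8.0, "Animation"),
    ("Toon 5", 5.0, "Animation"),
    ("Toon 6", 8.5, "Animation"),
    ("Drama 1", 9.5, "Drama"),  # highest rating, but filtered out by genre1
]

# Same query as the transform, with a fixed table name in place of {{input_table}}.
QUERY = """
    SELECT title, rating
    FROM imdb_movies
    WHERE genre1='Animation'
    ORDER BY rating DESC
    LIMIT 5;
"""


def top_five(rows: List[Tuple[str, float, str]]) -> List[Tuple[str, float]]:
    """Load rows into a throwaway SQLite table and run the top-five query."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE imdb_movies (title TEXT, rating REAL, genre1 TEXT)")
        conn.executemany("INSERT INTO imdb_movies VALUES (?, ?, ?)", rows)
        return conn.execute(QUERY).fetchall()
    finally:
        conn.close()


print(top_five(ROWS))
```

Only the five highest-rated Animation rows come back, in descending order of rating; the Drama row and the lowest-rated Animation row are excluded.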

You should then see the OpenLineage facets in the Marquez UI under the default namespace.

[Figure: openlineage_1.png, Marquez UI showing the default namespace]

Lineage from the task can be viewed by clicking calculate_popular_movies.load_file.

[Figure: openlineage_2.png, lineage of calculate_popular_movies.load_file]