OpenLineage Integration
OpenLineage is an open-source framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata.
Configure the OpenLineage and Astro Python SDK Integration
We’ll need to specify where we want Astro Python SDK operators to send OpenLineage events. openlineage-airflow uses the OPENLINEAGE_URL environment variable to decide where to send OpenLineage events, for example to Marquez. Optionally, we can also specify the namespace in which the lineage events will be stored using the OPENLINEAGE_NAMESPACE environment variable.
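As a rough illustration of how these two variables fit together, the sketch below reads them with Python's standard library. The helper `resolve_openlineage_config` is hypothetical, and falling back to a `default` namespace when OPENLINEAGE_NAMESPACE is unset is an assumption about openlineage-airflow's behaviour, not something this page states.

```python
import os
from typing import Mapping, NamedTuple, Optional


class OpenLineageConfig(NamedTuple):
    url: str
    namespace: str


def resolve_openlineage_config(
    env: Optional[Mapping[str, str]] = None,
) -> OpenLineageConfig:
    """Hypothetical helper: read the OpenLineage endpoint and namespace.

    Assumption: the namespace falls back to "default" when
    OPENLINEAGE_NAMESPACE is not set.
    """
    env = os.environ if env is None else env
    url = env.get("OPENLINEAGE_URL")
    if not url:
        # Without an endpoint there is nowhere to send lineage events.
        raise RuntimeError("OPENLINEAGE_URL is not set")
    namespace = env.get("OPENLINEAGE_NAMESPACE", "default")
    return OpenLineageConfig(url=url, namespace=namespace)
```

Passing an explicit mapping instead of relying on `os.environ` keeps the helper easy to check in isolation.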
To control whether source code is sent to OpenLineage, set the OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE environment variable to either True or False. By default, it is set to True.
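Environment variables are always strings, so a value such as True has to be parsed before it can be used. The sketch below shows one way to do that while honouring the default of True described above. `source_code_disabled` is a hypothetical helper, and the accepted spellings are an assumption, not openlineage-airflow's own parsing logic.

```python
import os
from typing import Mapping, Optional

# Assumed accepted spellings (compared case-insensitively).
_TRUE_VALUES = {"true", "1", "yes"}
_FALSE_VALUES = {"false", "0", "no"}


def source_code_disabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """Hypothetical helper: interpret OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE.

    An unset variable is treated as True, matching the documented default.
    """
    env = os.environ if env is None else env
    raw = env.get("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE")
    if raw is None:
        return True  # documented default
    value = raw.strip().lower()
    if value in _TRUE_VALUES:
        return True
    if value in _FALSE_VALUES:
        return False
    # Fail loudly rather than silently guessing on a typo.
    raise ValueError(f"Expected True or False, got {raw!r}")
```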
By default, we emit OpenLineage events for temporary tables. Some users may not find these events useful; they can disable them by setting the AIRFLOW__ASTRO_SDK__OPENLINEAGE_EMIT_TEMP_TABLE_EVENT config to False. More details can be found at Configuring to emit temp table event in openlineage.
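The temporary-table setting is an ordinary Airflow configuration option, so it follows Airflow's AIRFLOW__{SECTION}__{KEY} convention for environment variables. The sketch below derives the variable name from the section astro_sdk and the key openlineage_emit_temp_table_event; the helper name `airflow_config_env_var` is hypothetical.

```python
import os


def airflow_config_env_var(section: str, key: str) -> str:
    """Build the environment variable Airflow reads for `[section] key`.

    Airflow checks AIRFLOW__{SECTION}__{KEY} (upper-cased, joined with
    double underscores) before falling back to airflow.cfg.
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


if __name__ == "__main__":
    name = airflow_config_env_var("astro_sdk", "openlineage_emit_temp_table_event")
    # Disables temp-table lineage events for this process and its children.
    os.environ[name] = "False"
    print(name)
```

The same convention explains AIRFLOW__LINEAGE__BACKEND, which corresponds to the backend key in the lineage section.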
Note
Disclaimer: Users need to trigger the DAG from the webserver. Running it with airflow dags test does not work with OpenLineage.
Users can validate the OpenLineage events in the Marquez UI. Instructions for setting up Marquez locally can be found here.
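Besides the UI, Marquez exposes a REST API that can be used to check that events arrived. The minimal standard-library sketch below assumes the API serves GET /api/v1/namespaces/{namespace}/jobs and returns a JSON object with a jobs list whose entries have a name field; verify both against your Marquez version. The function names are hypothetical, and the base URL must be wherever your Marquez API listens, which may differ from the UI address.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, List


def jobs_url(base_url: str, namespace: str) -> str:
    """URL of the (assumed) Marquez endpoint that lists jobs in a namespace."""
    quoted = urllib.parse.quote(namespace, safe="")
    return f"{base_url.rstrip('/')}/api/v1/namespaces/{quoted}/jobs"


def job_names(payload: Dict[str, Any]) -> List[str]:
    """Extract job names from a response shaped like {"jobs": [{"name": ...}]}."""
    return [job["name"] for job in payload.get("jobs", [])]


def list_jobs(base_url: str, namespace: str = "default") -> List[str]:
    """Fetch job names from a running Marquez API (performs a network call)."""
    with urllib.request.urlopen(jobs_url(base_url, namespace), timeout=10) as resp:
        return job_names(json.load(resp))
```

Once the Marquez API is reachable, calling `list_jobs` with its base URL returns the job names recorded in that namespace.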
For example, to send OpenLineage events to a local instance of Marquez in the default namespace, use:
AIRFLOW__LINEAGE__BACKEND=openlineage.lineage_backend.OpenLineageBackend
OPENLINEAGE_URL=http://localhost:3000
OPENLINEAGE_NAMESPACE="default"
AIRFLOW__ASTRO_SDK__OPENLINEAGE_EMIT_TEMP_TABLE_EVENT=False
With the environment variables described above set, run the example DAG below.
from datetime import datetime

from airflow import DAG
from astro import sql as aql
from astro.files import File
from astro.table import Table


# Returns a SQL query; the SDK renders {{input_table}} and runs it as a task.
@aql.transform()
def top_five_animations(input_table: Table):
    return """
        SELECT title, rating
        FROM {{input_table}}
        WHERE genre1='Animation'
        ORDER BY Rating desc
        LIMIT 5;
    """


with DAG(
    "calculate_popular_movies",
    schedule_interval=None,
    start_date=datetime(2000, 1, 1),
    catchup=False,
) as dag:
    # Load the IMDB CSV into a temporary (unnamed) table on sqlite_default.
    imdb_movies = aql.load_file(
        File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv"),
        output_table=Table(conn_id="sqlite_default"),
    )
    # Store the five highest-rated animations in the top_animation table.
    top_five_animations(
        input_table=imdb_movies,
        output_table=Table(name="top_animation"),
    )
You should then see the OpenLineage facets in the Marquez/OpenLineage UI under the default namespace.

Click calculate_popular_movies.load_file to view the lineage from that task.
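The node name calculate_popular_movies.load_file combines the DAG ID and the task ID. As a simplified picture of the kind of record Marquez stores, the sketch below assembles a minimal OpenLineage RunEvent skeleton for that task with the standard library. The producer and schemaURL values, the choice of fields, and the empty inputs and outputs are assumptions for illustration; events actually emitted by the Astro SDK carry datasets and facets.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict

# Assumed placeholder values; check the OpenLineage spec version you target.
PRODUCER = "https://example.com/hypothetical-producer"
SCHEMA_URL = "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"


def airflow_job_name(dag_id: str, task_id: str) -> str:
    """Assumed job naming for Airflow tasks: <dag_id>.<task_id>."""
    return f"{dag_id}.{task_id}"


def minimal_run_event(
    namespace: str, dag_id: str, task_id: str, event_type: str = "COMPLETE"
) -> Dict[str, Any]:
    """Build a stripped-down RunEvent dict with no datasets or facets."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": namespace, "name": airflow_job_name(dag_id, task_id)},
        "inputs": [],
        "outputs": [],
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


if __name__ == "__main__":
    event = minimal_run_event("default", "calculate_popular_movies", "load_file")
    print(json.dumps(event, indent=2))
```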
