OpenLineage Integration
OpenLineage is an open-source framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata.
Configure the OpenLineage and Astro Python SDK Integration
We'll need to specify where we want Astro Python SDK operators to send OpenLineage events. openlineage-airflow will use the OPENLINEAGE_URL environment variable to send OpenLineage events to Marquez. Optionally, we can also specify a namespace where the lineage events will be stored using the OPENLINEAGE_NAMESPACE environment variable.
To control whether source code is sent to OpenLineage, set the OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE environment variable to either True or False. By default it is set to True.
For example, to send OpenLineage events to a local instance of Marquez with the dev namespace, use:
AIRFLOW__LINEAGE__BACKEND=openlineage.lineage_backend.OpenLineageBackend
OPENLINEAGE_URL=http://localhost:5000
OPENLINEAGE_NAMESPACE="dev"
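Before triggering a DAG, it can help to confirm that these variables are actually visible to the Airflow process. The following is a minimal sketch and is not part of the Astro Python SDK or openlineage-airflow: the helper name read_openlineage_settings is hypothetical, and the fallback namespace "default" is an assumption made for illustration only.

```python
import os
from urllib.parse import urlparse


def read_openlineage_settings(env=None):
    """Collect the OpenLineage-related settings from an environment mapping.

    Hypothetical helper for illustration; it only reads values and does not
    call openlineage-airflow.
    """
    env = os.environ if env is None else env
    url = env.get("OPENLINEAGE_URL", "")
    parsed = urlparse(url)
    # Fail early if the URL is unset or is not an HTTP(S) address.
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"OPENLINEAGE_URL is missing or not an HTTP(S) URL: {url!r}")
    return {
        "url": url,
        # The fallback value is an assumption made for this sketch.
        "namespace": env.get("OPENLINEAGE_NAMESPACE", "default"),
        # Reported as the raw string; None means the variable is unset.
        "disable_source_code": env.get("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE"),
        "lineage_backend": env.get("AIRFLOW__LINEAGE__BACKEND"),
    }


if __name__ == "__main__":
    # Same values as the example configuration above.
    example_env = {
        "AIRFLOW__LINEAGE__BACKEND": "openlineage.lineage_backend.OpenLineageBackend",
        "OPENLINEAGE_URL": "http://localhost:5000",
        "OPENLINEAGE_NAMESPACE": "dev",
    }
    print(read_openlineage_settings(example_env))
```

Calling read_openlineage_settings() with no argument reads from the current process environment, so it could be run inside the scheduler or worker container to see what Airflow sees.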
With the environment variables described above set, run the following example DAG:
import os
import time
from datetime import datetime, timedelta

import pandas as pd
from airflow.decorators import dag
from astro import sql as aql
from astro.table import Metadata, Table

# Sample input data for the DAG.
variable_list = [["a", "b", "c"], ["AA", "BB", "CC"]]
dfList = pd.DataFrame(variable_list, columns=["COL_A", "COL_B", "COL_C"])


@aql.dataframe(columns_names_capitalization="original")
def aggregate_data(df: pd.DataFrame):
    # Lower-case the column names and return the dataframe.
    new_df = df
    new_df.columns = new_df.columns.str.lower()
    return new_df


@dag(
    start_date=datetime(2021, 1, 1),
    max_active_runs=1,
    schedule_interval="@daily",
    default_args={
        "email_on_failure": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    },
    catchup=False,
)
def example_simple_dataframe():
    aggregate_data(
        dfList,
        output_table=Table(
            # The timestamp suffix gives the output table a distinct name.
            name="aggregated_adoptions_" + str(int(time.time())),
            metadata=Metadata(
                schema=os.environ["SCHEMA"], database=os.environ["DATABASE"]
            ),
            conn_id="sqlite_default",
        ),
    )


# Instantiate the DAG so that Airflow registers it.
example_simple_dataframe()
After the DAG runs, the OpenLineage facets appear in the Marquez/OpenLineage UI.
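One way to check that events arrived without opening the UI is to ask Marquez for the jobs it has recorded in the namespace. The sketch below assumes that the Marquez REST API is reachable at the same address as OPENLINEAGE_URL and that it serves the /api/v1/namespaces/{namespace}/jobs endpoint. The function names are hypothetical, and the response is returned as parsed JSON without assuming anything about its exact fields.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def jobs_url(base_url, namespace):
    """Build the URL that lists jobs for a namespace (assumed Marquez endpoint)."""
    # Percent-encode the namespace so special characters stay inside one path segment.
    return f"{base_url.rstrip('/')}/api/v1/namespaces/{quote(namespace, safe='')}/jobs"


def fetch_jobs(base_url, namespace, timeout=10):
    """Return the parsed JSON response; requires a running Marquez instance."""
    with urlopen(jobs_url(base_url, namespace), timeout=timeout) as response:
        return json.load(response)
```

For the local setup used in this example, fetch_jobs("http://localhost:5000", "dev") would query the dev namespace. If the DAG ran and emitted events, a job for it should be present in the response.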
