merge operator
When to use the merge operator
Unlike the append operator, which expects data to be unique and conflict free, the merge operator lets you add data to an existing table and resolve conflicts with strategies such as ignore and update.
Prerequisites
The merge operator runs different SQL queries behind the scenes based on the database used. Some databases only allow you to run these queries if there are constraints on the columns specified in the parameter target_conflict_columns. Below is the list of supported databases and their constraint requirements.
| Database | Constraint required |
|---|---|
| BigQuery | No |
| Postgres | Yes |
| Snowflake | Yes |
| SQLite | Yes |
| MS SQL | No |
| MySQL | Yes |
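To see why a constraint matters, the sketch below uses Python's built-in sqlite3 module rather than the SDK. The table names are hypothetical, and the SQL is only an illustration of a conflict-handling statement, not necessarily the query the merge operator generates. It assumes a SQLite version with upsert support (3.24 or later).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical tables: one without a constraint on "id", one with a primary key.
conn.execute("CREATE TABLE no_constraint (id INTEGER, name TEXT)")
conn.execute("CREATE TABLE with_constraint (id INTEGER PRIMARY KEY, name TEXT)")

upsert = (
    "INSERT INTO {table} (id, name) VALUES (1, 'first') "
    "ON CONFLICT (id) DO UPDATE SET name = excluded.name"
)

# Without a PRIMARY KEY or UNIQUE constraint on "id", SQLite rejects the statement.
try:
    conn.execute(upsert.format(table="no_constraint"))
    rejected = False
except sqlite3.OperationalError:
    rejected = True

# With the constraint in place, the same statement succeeds.
conn.execute(upsert.format(table="with_constraint"))
rows = conn.execute("SELECT id, name FROM with_constraint").fetchall()
```

Here the conflict target (id) has to match a primary key or unique constraint, which is the kind of requirement the table above records per database.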
You can create and add constraints on a table by providing them in the columns parameter of the load_file operator. Refer to the Parameters to use when loading a file to a database table section for details.
target_table_1 = aql.load_file(
    input_file=File(DATA_DIR + "sample.csv"),
    output_table=Table(
        conn_id=ASTRO_GCP_CONN_ID,
        metadata=Metadata(schema="first_table_schema"),
        columns=[
            Column(name="id", type_=types.Integer, primary_key=True),
            Column(name="name", type_=types.String),
        ],
    ),
)
When tables have the same schema
If the source and target tables have the same schema, pass the complete list or tuple of columns to the columns parameter. This is required so the merge function can determine which columns might have conflicts. You can also pass a subset of the columns to merge only those.
aql.merge(
    target_table=target_table_1,
    source_table=source_table,
    target_conflict_columns=["id"],
    columns=["id", "name"],
    if_conflicts="update",
)
When tables have different schemas
If the source and target tables have different schemas, you should pass a dict object mapping the source_table columns to the target_table columns.
aql.merge(
    target_table=target_table_2,
    source_table=source_table,
    target_conflict_columns=["id"],
    columns={"id": "id", "name": "name"},
    if_conflicts="update",
)
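One way to picture the two forms of the columns parameter is to treat a list or tuple as a mapping from each column name to itself, and a dict as an explicit source-to-target mapping. The sketch below is plain Python and is not the SDK's implementation. The helper names and the target column names user_id and full_name are hypothetical.

```python
def normalize_columns(columns):
    """Return a source-to-target column mapping (hypothetical helper)."""
    if isinstance(columns, (list, tuple)):
        # Same schema: every source column maps to the same target name.
        return {name: name for name in columns}
    # Different schemas: the dict already maps source names to target names.
    return dict(columns)


def rename_row(row, columns):
    """Keep only the mapped columns of a source row, under their target names."""
    mapping = normalize_columns(columns)
    return {target: row[source] for source, target in mapping.items()}


source_row = {"id": 1, "name": "first", "extra": "not merged"}

# List form: a subset of columns, names unchanged.
same_schema = rename_row(source_row, ["id", "name"])

# Dict form: source column names on the left, target column names on the right.
different_schema = rename_row(source_row, {"id": "user_id", "name": "full_name"})
```

In both cases, columns left out of the parameter (extra in this sketch) take no part in the merge.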
Conflicts
Conflicts arise due to the target_table having constraints on certain columns. For example, your source_table and target_table may have duplicate primary keys. There are three strategies for resolving merge conflicts.
ignore
This method will not update the target table with the source table data if there is a conflict.
To use this method, you should specify if_conflicts='ignore' and provide the column(s) with constraints to target_conflict_columns. For this example, target_conflict_columns=['A'].

Target

| A | B |
|---|---|
| 1 | 2 |
| 3 | 4 |

Source

| A | B |
|---|---|
| 8 | 2 |
| 3 | 9 |

post-merge Target table

| A | B |
|---|---|
| 1 | 2 |
| 3 | 4 |
| 8 | 2 |
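The ignore example can be reproduced outside the SDK with Python's built-in sqlite3 module. This is a minimal sketch under the assumption that column A carries a primary key. The SQL is illustrative and not necessarily the statement the merge operator runs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (A INTEGER PRIMARY KEY, B INTEGER)")
conn.execute("CREATE TABLE source (A INTEGER, B INTEGER)")
conn.executemany("INSERT INTO target VALUES (?, ?)", [(1, 2), (3, 4)])
conn.executemany("INSERT INTO source VALUES (?, ?)", [(8, 2), (3, 9)])

# "WHERE true" avoids a parsing ambiguity SQLite has with
# INSERT ... SELECT ... ON CONFLICT.
conn.execute(
    "INSERT INTO target (A, B) SELECT A, B FROM source WHERE true "
    "ON CONFLICT (A) DO NOTHING"
)

ignore_result = conn.execute("SELECT A, B FROM target ORDER BY A").fetchall()
print(ignore_result)  # [(1, 2), (3, 4), (8, 2)]
```

The row with A=3 already exists in the target, so the source row (3, 9) is skipped, while the new row (8, 2) is inserted.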
update
This method will update the target table with the source table data if there is a conflict.
To use this method, you should specify if_conflicts='update' and provide the column(s) with constraints to target_conflict_columns. For this example, target_conflict_columns=['A'].

Target

| A | B |
|---|---|
| 1 | 2 |
| 3 | 4 |

Source

| A | B |
|---|---|
| 8 | 2 |
| 3 | 9 |

post-merge Target table

| A | B |
|---|---|
| 1 | 2 |
| 3 | 9 |
| 8 | 2 |
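The update example can be sketched the same way with Python's built-in sqlite3 module, again assuming a primary key on column A. The SQL is illustrative and not necessarily the statement the merge operator runs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (A INTEGER PRIMARY KEY, B INTEGER)")
conn.execute("CREATE TABLE source (A INTEGER, B INTEGER)")
conn.executemany("INSERT INTO target VALUES (?, ?)", [(1, 2), (3, 4)])
conn.executemany("INSERT INTO source VALUES (?, ?)", [(8, 2), (3, 9)])

# On a conflict in A, overwrite B with the incoming value ("excluded" is the
# row that failed to insert).
conn.execute(
    "INSERT INTO target (A, B) SELECT A, B FROM source WHERE true "
    "ON CONFLICT (A) DO UPDATE SET B = excluded.B"
)

update_result = conn.execute("SELECT A, B FROM target ORDER BY A").fetchall()
print(update_result)  # [(1, 2), (3, 9), (8, 2)]
```

The only difference from the ignore strategy is the conflicting row: (3, 4) in the target becomes (3, 9).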
exception
This method will raise an exception if there are any conflicts at the time of merging.
To use this method, you should specify if_conflicts='exception'.
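As a rough analogy for the exception strategy, a plain INSERT with no conflict clause fails as soon as a source row collides with a constrained column. The sketch below uses Python's built-in sqlite3 module with hypothetical target and source tables holding the same rows as the earlier examples. It illustrates the idea only and does not show the exception type the SDK raises.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (A INTEGER PRIMARY KEY, B INTEGER)")
conn.execute("CREATE TABLE source (A INTEGER, B INTEGER)")
conn.executemany("INSERT INTO target VALUES (?, ?)", [(1, 2), (3, 4)])
conn.executemany("INSERT INTO source VALUES (?, ?)", [(8, 2), (3, 9)])

# No ON CONFLICT clause: the duplicate key A=3 makes the statement fail.
try:
    conn.execute("INSERT INTO target (A, B) SELECT A, B FROM source")
    conflict_raised = False
except sqlite3.IntegrityError as error:
    conflict_raised = True
    print(f"merge failed: {error}")
```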
Default Datasets
Input dataset - Source table for the operator.
Output dataset - Target table of the operator.