merge operator
When to use the merge
operator
Unlike the append operator, which expects data to be unique and conflict free, the merge
operator allows you to to add data to an existing table with conflict resolution techniques like ignore
and update
.
Prerequisites
The merge
operator runs different SQL queries behind the scenes based on the database used. Some databases only allow you to run these queries if there are constraints on the columns specified in the parameter target_conflict_columns
. Below is the list of supported databases and their constraint requirements.
Database |
Constraint required |
---|---|
Bigquery |
No |
Postgres |
Yes |
Snowflake |
Yes |
SQLite |
Yes |
You can create and add constraints on a table by providing them in the columns
parameter of load_file operator. Refer to the Parameters to use when loading a file to a database table section for details.
target_table_1 = aql.load_file(
input_file=File(DATA_DIR + "sample.csv"),
output_table=Table(
conn_id="bigquery",
metadata=Metadata(schema="first_table_schema"),
columns=[
Column(name="id", type_=types.Integer, primary_key=True),
Column(name="name", type_=types.String),
],
),
)
When tables have the same schema
If the source and target tables have the same schema, you should specify the complete list
or tuple
of columns to the columns
parameter. This is required so the merge
function can determine which columns might have Conflicts. You can also select a subset of columns to be part of the merge.
aql.merge(
target_table=target_table_1,
source_table=source_table,
target_conflict_columns=["id"],
columns=["id", "name"],
if_conflicts="update",
)
When tables have different schemas
If the source and target tables have different schemas, you should pass a dict
object mapping the source_table
columns to the target_table
columns.
aql.merge(
target_table=target_table_2,
source_table=source_table,
target_conflict_columns=["id"],
columns={"id": "id", "name": "name"},
if_conflicts="update",
)
Conflicts
Conflicts arise due to the target_table
having constraints on certain columns. For example, your source_table
and target_table
may have duplicate primary keys. There are three strategies for resolving merge conflicts.
ignore
This method will not update the target table with the source table data if there is a conflict.
To use this method, you should specify
if_conflicts='ignore'
and provide the column(s) with constraints totarget_conflict_columns
. For this example,target_conflict_columns=['A']
.Target A
B
1
2
3
4
Source A
B
8
2
3
9
post-merge Target table A
B
1
2
3
4
8
2
update
This method will update the target table with the source table data if there is a conflict.
To use this method, you should specify
if_conflicts='update'
and provide the column(s) with constraints totarget_conflict_columns
. For this example,target_conflict_columns=['A']
.Target A
B
1
2
3
4
Source A
B
8
2
3
9
post-merge Target table A
B
1
2
3
9
8
2
- exception
This method will raise an exception if there are any conflicts at the time of merging.
To use this method, you should specify
if_conflicts='exception'
.
Default Datasets
Input dataset - Source table for the operator.
Output dataset - Target table of the operator.