merge operator

When to use the merge operator

Unlike the append operator, which expects data to be unique and conflict-free, the merge operator lets you add data to an existing table using conflict resolution techniques such as ignore and update.

Prerequisites

The merge operator runs different SQL queries behind the scenes based on the database used. Some databases only allow you to run these queries if there are constraints on the columns specified in the parameter target_conflict_columns. Below is the list of supported databases and their constraint requirements.

Database     Constraint required
Bigquery     No
Postgres     Yes
Snowflake    Yes
SQLite       Yes
MS SQL       No
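To see why a constraint can be required, the following minimal sketch uses Python's built-in sqlite3 module directly. It does not show the SQL that the merge operator actually generates; the upsert statement and the table names no_constraint and with_pk are assumptions made for illustration. It assumes SQLite 3.24 or later, which is when the ON CONFLICT upsert clause was introduced.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Two hypothetical targets: one without a constraint on "id", one with a primary key.
conn.execute("CREATE TABLE no_constraint (id INTEGER, name TEXT)")
conn.execute("CREATE TABLE with_pk (id INTEGER PRIMARY KEY, name TEXT)")

# Illustrative upsert statement; not taken from the merge operator itself.
UPSERT = (
    "INSERT INTO {table} (id, name) VALUES (1, 'a') "
    "ON CONFLICT (id) DO UPDATE SET name = excluded.name"
)


def try_upsert(table):
    """Run the upsert; return None on success or the database error message."""
    try:
        conn.execute(UPSERT.format(table=table))
        return None
    except sqlite3.Error as exc:
        return str(exc)


error_without = try_upsert("no_constraint")  # rejected: "id" has no constraint
error_with = try_upsert("with_pk")           # accepted: "id" is the primary key
print("without constraint:", error_without)
print("with primary key:", error_with)
```

SQLite rejects the first statement because the conflict target does not match any PRIMARY KEY or UNIQUE constraint, which is the kind of restriction the table above summarizes.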

You can create and add constraints on a table by providing them in the columns parameter of the load_file operator. Refer to the Parameters to use when loading a file to a database table section for details.

    target_table_1 = aql.load_file(
        input_file=File(DATA_DIR + "sample.csv"),
        output_table=Table(
            conn_id="bigquery",
            metadata=Metadata(schema="first_table_schema"),
            columns=[
                Column(name="id", type_=types.Integer, primary_key=True),
                Column(name="name", type_=types.String),
            ],
        ),
    )

When tables have the same schema

If the source and target tables have the same schema, you should pass the complete list or tuple of columns to the columns parameter. This is required so the merge function can determine which columns might have conflicts. You can also select a subset of columns to be part of the merge.

    aql.merge(
        target_table=target_table_1,
        source_table=source_table,
        target_conflict_columns=["id"],
        columns=["id", "name"],
        if_conflicts="update",
    )

When tables have different schemas

If the source and target tables have different schemas, you should pass a dict object to the columns parameter, mapping the source_table columns to the target_table columns.

    aql.merge(
        target_table=target_table_2,
        source_table=source_table,
        target_conflict_columns=["id"],
        columns={"id": "id", "name": "name"},
        if_conflicts="update",
    )
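The two forms of the columns parameter described above can be thought of as one source-to-target mapping. The sketch below illustrates that idea in plain Python. The helper to_column_mapping and the target column names user_id and full_name are hypothetical and are not part of the merge operator's API.

```python
from typing import Dict, List, Tuple, Union

Columns = Union[List[str], Tuple[str, ...], Dict[str, str]]


def to_column_mapping(columns: Columns) -> Dict[str, str]:
    """Hypothetical helper: return a source -> target column mapping."""
    if isinstance(columns, dict):
        # Different schemas: the caller already supplied source -> target names.
        return dict(columns)
    # Same schema: every column keeps its name in the target table.
    return {name: name for name in columns}


same_schema = to_column_mapping(["id", "name"])
different_schema = to_column_mapping({"id": "user_id", "name": "full_name"})
print(same_schema)
print(different_schema)
```

Under this reading, a list or tuple is shorthand for a mapping in which each column maps to itself.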

Conflicts

Conflicts arise when the target_table has constraints on certain columns and the source_table contains rows that violate them. For example, a row in source_table may have the same primary key as a row in target_table. There are three strategies for resolving merge conflicts.

  1. ignore

    This method leaves a conflicting row in the target table unchanged. Source rows that do not conflict are still added.

    To use this method, you should specify if_conflicts='ignore' and provide the column(s) with constraints to target_conflict_columns. For this example, target_conflict_columns=['A'].

    Target

    A    B
    1    2
    3    4

    Source

    A    B
    8    2
    3    9

    post-merge Target table

    A    B
    1    2
    3    4
    8    2

  2. update

    This method overwrites a conflicting row in the target table with the source table data. Source rows that do not conflict are still added.

    To use this method, you should specify if_conflicts='update' and provide the column(s) with constraints to target_conflict_columns. For this example, target_conflict_columns=['A'].

    Target

    A    B
    1    2
    3    4

    Source

    A    B
    8    2
    3    9

    post-merge Target table

    A    B
    1    2
    3    9
    8    2

  3. exception

    This method will raise an exception if there are any conflicts at the time of merging.

    To use this method, you should specify if_conflicts='exception'.
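The three strategies can be reproduced on the example tables with a small sketch that uses Python's built-in sqlite3 module. The SQL statements here are an assumption chosen to mirror the behavior described above, not the queries the merge operator runs, and the function merged_target is hypothetical. It assumes SQLite 3.24 or later for the ON CONFLICT clause.

```python
import sqlite3


def merged_target(if_conflicts):
    """Merge Source into Target in a fresh in-memory database; return Target rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE target (A INTEGER PRIMARY KEY, B INTEGER)")
    conn.execute("CREATE TABLE source (A INTEGER, B INTEGER)")
    conn.executemany("INSERT INTO target VALUES (?, ?)", [(1, 2), (3, 4)])
    conn.executemany("INSERT INTO source VALUES (?, ?)", [(8, 2), (3, 9)])

    # "WHERE 1" avoids a parsing ambiguity SQLite has with INSERT ... SELECT ... ON CONFLICT.
    statement = "INSERT INTO target (A, B) SELECT A, B FROM source WHERE 1"
    if if_conflicts == "ignore":
        statement += " ON CONFLICT (A) DO NOTHING"
    elif if_conflicts == "update":
        statement += " ON CONFLICT (A) DO UPDATE SET B = excluded.B"
    # "exception": a plain INSERT, so the duplicate key raises sqlite3.IntegrityError.

    conn.execute(statement)
    return conn.execute("SELECT A, B FROM target ORDER BY A").fetchall()


print(merged_target("ignore"))
print(merged_target("update"))
try:
    merged_target("exception")
except sqlite3.IntegrityError as exc:
    print("exception:", exc)
```

The rows returned for ignore and update match the post-merge Target tables shown in the two examples, and the exception case fails on the row where A is 3.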

Default Datasets

  • Input dataset - Source table for the operator.

  • Output dataset - Target table of the operator.