astro.databases.postgres

Postgres database implementation.

Module Contents

Classes

PostgresDatabase

Handle interactions with Postgres databases.

Attributes

DEFAULT_CONN_ID

astro.databases.postgres.DEFAULT_CONN_ID
class astro.databases.postgres.PostgresDatabase(conn_id=DEFAULT_CONN_ID)

Bases: astro.databases.base.BaseDatabase

Handle interactions with Postgres databases. This class aims to encapsulate all Postgres-specific logic, so that none is needed in other parts of the code-base.

Parameters

conn_id (str) – Airflow connection ID used to connect to the Postgres database.
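A minimal usage sketch ("my_postgres_conn" is a hypothetical Airflow connection ID, assumed to be configured separately)::

    from astro.databases.postgres import PostgresDatabase

    # Falls back to DEFAULT_CONN_ID when no conn_id is given.
    db = PostgresDatabase()

    # Or bind the class to a specific Airflow connection.
    db = PostgresDatabase(conn_id="my_postgres_conn")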

DEFAULT_SCHEMA
illegal_column_name_chars: list[str] = ['.']
illegal_column_name_chars_replacement: list[str] = ['_']
property sql_type
Return type

str

property hook

Retrieve Airflow hook to interface with the Postgres database.

Return type

airflow.providers.postgres.hooks.postgres.PostgresHook
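For illustration, a sketch of using the hook for raw SQL access ("my_postgres_conn" is a hypothetical connection ID; get_records is a standard Airflow DbApiHook method)::

    from astro.databases.postgres import PostgresDatabase

    db = PostgresDatabase(conn_id="my_postgres_conn")

    # The property returns a PostgresHook bound to the class's conn_id.
    hook = db.hook
    records = hook.get_records("SELECT version()")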

property default_metadata

Fill in default metadata values for table objects addressing Postgres databases.

Currently, the schema is not fetched from the Airflow connection for Postgres because, in Postgres, databases and schemas are different concepts: https://www.postgresql.org/docs/current/ddl-schemas.html

The PostgresHook only exposes schema: https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_api/airflow/providers/postgres/hooks/postgres/index.html

However, implementation-wise, it seems that if the PostgresHook receives a schema during initialization, it uses it as the database in the connection to Postgres: https://github.com/apache/airflow/blob/main/airflow/providers/postgres/hooks/postgres.py#L96

Return type

astro.sql.table.Metadata
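A sketch of inspecting the default metadata (assuming, per the note above, that the hook's schema field carries the database name and is exposed as Metadata.database)::

    from astro.databases.postgres import PostgresDatabase

    db = PostgresDatabase(conn_id="my_postgres_conn")

    # For Postgres, the metadata carries the database name taken from the hook.
    metadata = db.default_metadata
    print(metadata.database)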

schema_exists(schema)

Checks if a schema exists in the database

Parameters

schema – DB schema: a namespace that contains named objects such as tables, functions, etc.

Return type

bool
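A usage sketch ("reporting" is an arbitrary example schema name)::

    from astro.databases.postgres import PostgresDatabase

    db = PostgresDatabase(conn_id="my_postgres_conn")

    # Returns True when the schema is present in the connected database.
    if not db.schema_exists("reporting"):
        print("Schema 'reporting' does not exist")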

load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.

Parameters
  • source_dataframe (pandas.DataFrame) – Pandas dataframe whose contents will be loaded into the target table

  • target_table (astro.sql.table.Table) – Table into which the dataframe will be loaded

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.

  • chunk_size (int) – Specify the number of rows in each batch to be written at a time.

Return type

None
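A sketch of loading a small dataframe (constructing a Table with a name and Metadata(schema=...) is an assumption here; the table and schema names are arbitrary examples)::

    import pandas as pd

    from astro.databases.postgres import PostgresDatabase
    from astro.sql.table import Metadata, Table

    db = PostgresDatabase(conn_id="my_postgres_conn")
    df = pd.DataFrame({"id": [1, 2], "name": ["alice", "bob"]})

    # "replace" (the default) overwrites existing content; "append" adds to it.
    db.load_pandas_dataframe_to_table(
        source_dataframe=df,
        target_table=Table(name="users", metadata=Metadata(schema="public")),
        if_exists="replace",
    )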

static get_table_qualified_name(table)

Return the table's qualified name. This is database-specific: in SQLite, for instance, it is just the table name, whereas in Snowflake it combines the database, schema, and table name.

Parameters

table (astro.sql.table.Table) – The table we want to retrieve the qualified name for.

Return type

str
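A sketch of the static helper (the exact output format shown in the comment is an assumption; for Postgres it would typically include the schema)::

    from astro.databases.postgres import PostgresDatabase
    from astro.sql.table import Metadata, Table

    table = Table(name="users", metadata=Metadata(schema="public"))

    # Static method: no database instance or live connection is needed.
    print(PostgresDatabase.get_table_qualified_name(table))  # e.g. "public.users"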

table_exists(table)

Check if a table exists in the database

Parameters

table (astro.sql.table.Table) – The table whose existence we want to check

Return type

bool
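A usage sketch, mirroring schema_exists above (the Table construction is the same assumption as in the earlier examples)::

    from astro.databases.postgres import PostgresDatabase
    from astro.sql.table import Metadata, Table

    db = PostgresDatabase(conn_id="my_postgres_conn")
    table = Table(name="users", metadata=Metadata(schema="public"))

    # True only when the table is present in the connected database.
    if db.table_exists(table):
        print("Table 'users' already exists")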

merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')

Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.

Parameters
  • source_table (astro.sql.table.Table) – Contains the rows to be merged into the target_table

  • target_table (astro.sql.table.Table) – Destination table into which the rows will be merged

  • source_to_target_columns_map (dict[str, str]) – Dict of target_table column names to source_table column names

  • target_conflict_columns (list[str]) – List of columns where we expect a conflict while merging

  • if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.

Return type

None
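An upsert-style sketch (assuming "update" is an accepted MergeConflictStrategy value alongside the default "exception"; table and column names are arbitrary examples)::

    from astro.databases.postgres import PostgresDatabase
    from astro.sql.table import Metadata, Table

    db = PostgresDatabase(conn_id="my_postgres_conn")
    source = Table(name="users_staging", metadata=Metadata(schema="public"))
    target = Table(name="users", metadata=Metadata(schema="public"))

    # Rows whose "id" collides with an existing row update that row
    # instead of raising an error.
    db.merge_table(
        source_table=source,
        target_table=target,
        source_to_target_columns_map={"id": "id", "name": "name"},
        target_conflict_columns=["id"],
        if_conflicts="update",
    )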