astro.databases.postgres
Postgres database implementation.
Module Contents
Classes
PostgresDatabase – Handle interactions with Postgres databases.
Attributes
- astro.databases.postgres.DEFAULT_CONN_ID
- class astro.databases.postgres.PostgresDatabase(conn_id=DEFAULT_CONN_ID, table=None, load_options=None)
Bases:
astro.databases.base.BaseDatabase
Handle interactions with Postgres databases. If this class is successful, we should not have any Postgres-specific logic in other parts of our code-base.
- Parameters:
conn_id (str) –
table (astro.table.BaseTable | None) –
load_options (astro.options.LoadOptions | None) –
- property sql_type: str
- Return type:
str
- property default_metadata: astro.table.Metadata
Fill in default metadata values for table objects addressing Postgres databases.
Currently, the schema is not fetched from the Airflow connection for Postgres because, in Postgres, databases and schemas are different concepts: https://www.postgresql.org/docs/current/ddl-schemas.html
The PostgresHook only exposes schema:
airflow.providers.postgres.hooks.postgres.PostgresHook
However, implementation-wise, it seems that when the PostgresHook receives a schema during initialization, it uses that value as the database in the connection to Postgres: https://github.com/apache/airflow/blob/main/airflow/providers/postgres/hooks/postgres.py#L96
- Return type:
astro.table.Metadata
- DEFAULT_SCHEMA
- illegal_column_name_chars: list[str] = ['.']
- illegal_column_name_chars_replacement: list[str] = ['_']
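The two attributes above appear to be paired by position: each entry in illegal_column_name_chars is swapped for the entry at the same index in illegal_column_name_chars_replacement. A minimal sketch of that substitution follows. The helper name sanitize_column_names and the module-level constants are hypothetical and are not part of astro.

```python
# Values copied from the class attributes documented above.
ILLEGAL_COLUMN_NAME_CHARS = ["."]
ILLEGAL_COLUMN_NAME_CHARS_REPLACEMENT = ["_"]


def sanitize_column_names(columns):
    """Hypothetical helper: swap each illegal character for its replacement."""
    cleaned = []
    for name in columns:
        # Assumption: the two lists are paired index by index.
        for char, replacement in zip(
            ILLEGAL_COLUMN_NAME_CHARS, ILLEGAL_COLUMN_NAME_CHARS_REPLACEMENT
        ):
            name = name.replace(char, replacement)
        cleaned.append(name)
    return cleaned
```

For example, a dataframe column named address.city would be written to Postgres as address_city under this assumption.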
- hook()
Retrieve Airflow hook to interface with the Postgres database.
- schema_exists(schema)
Check whether a schema exists in the database.
- Parameters:
schema – Database schema: a namespace that contains named objects (tables, functions, etc.)
- Return type:
bool
- load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)
Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.
- Parameters:
source_dataframe (pandas.DataFrame) – Dataframe whose contents will be loaded into the table
target_table (astro.table.BaseTable) – Table into which the dataframe will be loaded
if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.
chunk_size (int) – Specify the number of rows in each batch to be written at a time.
- Return type:
None
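To illustrate how if_exists and chunk_size interact, here is a minimal sketch that swaps in the standard-library sqlite3 module for Postgres and a list of tuples for the dataframe. The function load_rows is hypothetical and is not the astro implementation. It assumes the two strategies are 'replace' (drop and recreate the table) and 'append' (keep existing rows).

```python
import sqlite3


def load_rows(conn, table, columns, rows, if_exists="replace", chunk_size=2):
    """Hypothetical stand-in for the load step, written against sqlite3."""
    cur = conn.cursor()
    exists = cur.execute(
        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (table,)
    ).fetchone() is not None
    # 'replace' discards the old table; 'append' keeps it and adds rows.
    if exists and if_exists == "replace":
        cur.execute(f'DROP TABLE "{table}"')
        exists = False
    if not exists:
        col_defs = ", ".join(f'"{c}"' for c in columns)
        cur.execute(f'CREATE TABLE "{table}" ({col_defs})')
    placeholders = ", ".join("?" for _ in columns)
    insert = f'INSERT INTO "{table}" VALUES ({placeholders})'
    # Write the rows in batches of chunk_size rows each.
    for start in range(0, len(rows), chunk_size):
        cur.executemany(insert, rows[start:start + chunk_size])
    conn.commit()


conn = sqlite3.connect(":memory:")
rows = [(1, "a"), (2, "b"), (3, "c")]
load_rows(conn, "t", ["id", "label"], rows, if_exists="replace")
load_rows(conn, "t", ["id", "label"], rows, if_exists="append")
count_after_append = conn.execute('SELECT COUNT(*) FROM "t"').fetchone()[0]
load_rows(conn, "t", ["id", "label"], rows, if_exists="replace")
count_after_replace = conn.execute('SELECT COUNT(*) FROM "t"').fetchone()[0]
```

Appending the same three rows doubles the row count, while a subsequent replace resets the table to exactly the rows of the last load.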
- static get_table_qualified_name(table)
Return the qualified name of the table. This is database-specific. For instance, in SQLite this is the table name. In Snowflake, however, it is the database, schema, and table.
- Parameters:
table (astro.table.BaseTable) – The table we want to retrieve the qualified name for.
- Return type:
str
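A minimal sketch of how a schema-qualified name could be built for Postgres, assuming the name is schema.table when the table metadata carries a schema and the bare table name otherwise. The Metadata and Table dataclasses are simplified stand-ins for astro.table.Metadata and astro.table.BaseTable, not the real classes.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Metadata:
    """Stand-in for astro.table.Metadata with only the field used here."""
    schema: Optional[str] = None


@dataclass
class Table:
    """Stand-in for astro.table.BaseTable with only the fields used here."""
    name: str
    metadata: Metadata = field(default_factory=Metadata)


def get_table_qualified_name(table):
    # Assumption: Postgres qualifies a table by its schema when one is set.
    if table.metadata and table.metadata.schema:
        return f"{table.metadata.schema}.{table.name}"
    return table.name
```

With these stand-ins, Table("users", Metadata("public")) yields public.users, and Table("users") yields users.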
- table_exists(table)
Check whether a table exists in the database.
- Parameters:
table (astro.table.BaseTable) – Details of the table whose existence we want to check
- Return type:
bool
- merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')
Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.
- Parameters:
source_table (astro.table.BaseTable) – Contains the rows to be merged to the target_table
target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be merged
source_to_target_columns_map (dict[str, str]) – Dict mapping source_table column names to target_table column names
target_conflict_columns (list[str]) – List of target_table columns on which conflicts are expected during the merge
if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.
- Return type:
None
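The merge described above maps naturally onto the Postgres INSERT ... ON CONFLICT syntax. The sketch below builds such a statement as a string. It is not the astro implementation: build_merge_statement is a hypothetical helper, the strategy names 'ignore' and 'update' (alongside the documented default 'exception') are assumptions, and the mapping is assumed to have source columns as keys and target columns as values, as the parameter name suggests.

```python
def build_merge_statement(
    source_table,
    target_table,
    source_to_target_columns_map,
    target_conflict_columns,
    if_conflicts="exception",
):
    """Hypothetical helper that renders a Postgres upsert statement.

    Identifiers are interpolated unquoted for brevity; real code should
    quote them safely instead of formatting them into the string.
    """
    source_columns = ", ".join(source_to_target_columns_map.keys())
    target_columns = ", ".join(source_to_target_columns_map.values())
    statement = (
        f"INSERT INTO {target_table} ({target_columns}) "
        f"SELECT {source_columns} FROM {source_table}"
    )
    conflict_target = ", ".join(target_conflict_columns)
    if if_conflicts == "ignore":
        # Keep the existing target row and skip the conflicting source row.
        statement += f" ON CONFLICT ({conflict_target}) DO NOTHING"
    elif if_conflicts == "update":
        # Overwrite the target row with the incoming values.
        assignments = ", ".join(
            f"{col} = EXCLUDED.{col}"
            for col in source_to_target_columns_map.values()
        )
        statement += (
            f" ON CONFLICT ({conflict_target}) DO UPDATE SET {assignments}"
        )
    # With "exception" no ON CONFLICT clause is added, so a duplicate key
    # makes the database raise an error.
    return statement
```

The conflict columns must be covered by a unique constraint or index on the target table for ON CONFLICT to apply.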
- static get_dataframe_from_file(file)
Get a pandas dataframe from a file.
- Parameters:
file (astro.files.File) – File path and conn_id for object stores
- openlineage_dataset_name(table)
Return the OpenLineage dataset name as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
Example: schema_name.table_name
- Parameters:
table (astro.table.BaseTable) –
- Return type:
str
- openlineage_dataset_namespace()
Return the OpenLineage dataset namespace as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
Example: postgresql://localhost:5432
- Return type:
str
- openlineage_dataset_uri(table)
Return the OpenLineage dataset URI as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
- Parameters:
table (astro.table.BaseTable) –
- Return type:
str
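The three OpenLineage methods fit together roughly as sketched below. The namespace and name formats follow the examples given in this reference (postgresql://localhost:5432 and schema_name.table_name). Joining them with a slash to form the URI is an assumption, and the free functions with explicit host, port, and schema arguments are hypothetical stand-ins for the methods, which read those values from the connection and the table.

```python
def openlineage_dataset_namespace(host, port):
    # Format taken from the documented example: postgresql://localhost:5432
    return f"postgresql://{host}:{port}"


def openlineage_dataset_name(schema, table_name):
    # Format taken from the documented example: schema_name.table_name
    return f"{schema}.{table_name}"


def openlineage_dataset_uri(host, port, schema, table_name):
    # Assumption: the URI is the namespace and the name joined by a slash.
    namespace = openlineage_dataset_namespace(host, port)
    name = openlineage_dataset_name(schema, table_name)
    return f"{namespace}/{name}"
```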