astro.databases.google.bigquery
Google BigQuery table implementation.
Module Contents
Classes
- astro.databases.google.bigquery.BigqueryDatabase – Handle interactions with BigQuery databases. If this class is successful, we should not have any BigQuery-specific logic in other parts of our code base.
- astro.databases.google.bigquery.S3ToBigqueryDataTransfer – Create and run a Data Transfer job from S3 to BigQuery.
Attributes
- astro.databases.google.bigquery.DEFAULT_CONN_ID
- astro.databases.google.bigquery.NATIVE_PATHS_SUPPORTED_FILE_TYPES
- astro.databases.google.bigquery.BIGQUERY_WRITE_DISPOSITION
- class astro.databases.google.bigquery.BigqueryDatabase(conn_id=DEFAULT_CONN_ID, table=None, load_options=None)
Bases:
astro.databases.base.BaseDatabase
Handle interactions with BigQuery databases. If this class is successful, we should not have any BigQuery-specific logic in other parts of our code base.
- Parameters:
conn_id (str) –
table (astro.table.BaseTable | None) –
load_options (astro.options.LoadOptions | None) –
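For illustration, a minimal instantiation sketch; the connection id `bigquery_default` and dataset `my_dataset` are hypothetical placeholders:

```python
from astro.databases.google.bigquery import BigqueryDatabase
from astro.table import Metadata, Table

# Hypothetical Airflow connection id and dataset; adjust to your environment.
db = BigqueryDatabase(conn_id="bigquery_default")
my_table = Table(name="my_table", metadata=Metadata(schema="my_dataset"))
```

The `db` and `my_table` objects above are reused in the usage sketches that follow.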
- property sql_type: str
- Return type:
str
- property default_metadata: astro.table.Metadata
Fill in default metadata values for table objects addressing BigQuery databases
- Returns:
Default metadata for BigQuery tables
- Return type:
astro.table.Metadata
- DEFAULT_SCHEMA
- NATIVE_PATHS
- NATIVE_AUTODETECT_SCHEMA_CONFIG: Mapping[astro.constants.FileLocation, Mapping[str, list[astro.constants.FileType] | Callable]]
- FILE_PATTERN_BASED_AUTODETECT_SCHEMA_SUPPORTED: set[astro.constants.FileLocation]
- illegal_column_name_chars: list[str] = ['.']
- illegal_column_name_chars_replacement: list[str] = ['_']
- NATIVE_LOAD_EXCEPTIONS: Any = ()
- hook()
Retrieve Airflow hook to interface with the BigQuery database.
- Return type:
airflow.providers.google.cloud.hooks.bigquery.BigQueryHook
- sqlalchemy_engine()
Return SQLAlchemy engine.
- Return type:
sqlalchemy.engine.base.Engine
- populate_table_metadata(table)
Populate the metadata of the given Table object from the Table used to instantiate the BigqueryDatabase, or from the default metadata (passed in configs).
- Parameters:
table (astro.table.BaseTable) – Table for which the metadata needs to be populated
- Returns:
Modified Table
- Return type:
astro.table.BaseTable
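A small sketch of filling in metadata for a table created without any, reusing the hypothetical `db` handler from the instantiation sketch above:

```python
from astro.table import Table

# The returned table has its metadata populated from the handler's defaults
# because none was supplied at construction time.
populated = db.populate_table_metadata(Table(name="my_table"))
```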
- schema_exists(schema)
Check whether a dataset exists in BigQuery.
- Parameters:
schema (str) – BigQuery dataset (namespace)
- Return type:
bool
- static get_merge_initialization_query(parameters)
Handle database-specific logic for constraints in BigQuery. The only constraint that BigQuery supports is NOT NULL.
- Parameters:
parameters (tuple) –
- Return type:
str
- load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)
Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.
- Parameters:
source_dataframe (pandas.DataFrame) – Dataframe whose contents will be loaded into the target table
target_table (astro.table.BaseTable) – Table into which the dataframe will be loaded
if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.
chunk_size (int) – Specify the number of rows in each batch to be written at a time.
- Return type:
None
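A usage sketch with a toy dataframe, reusing the hypothetical `db` and `my_table` from above:

```python
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "name": ["alice", "bob"]})

# Replace the table contents; pass if_exists="append" to add rows instead.
db.load_pandas_dataframe_to_table(
    source_dataframe=df,
    target_table=my_table,
    if_exists="replace",
)
```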
- create_schema_if_needed(schema)
This function checks if the expected schema exists in the database. If the schema does not exist, it will attempt to create it.
- Parameters:
schema (str | None) – DB schema: a namespace that contains named objects (tables, functions, etc.)
- Return type:
None
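A short sketch combining the two dataset helpers above (the dataset name is hypothetical):

```python
# schema_exists reports whether the dataset is present; create_schema_if_needed
# performs the same check itself before attempting creation.
if not db.schema_exists("my_dataset"):
    db.create_schema_if_needed("my_dataset")
```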
- merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')
Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.
- Parameters:
source_table (astro.table.BaseTable) – Contains the rows to be merged to the target_table
target_table (astro.table.BaseTable) – Contains the destination table in which the rows will be merged
source_to_target_columns_map (dict[str, str]) – Dict of target_table columns names to source_table columns names
target_conflict_columns (list[str]) – List of columns on which conflicts are expected when merging
if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.
- Return type:
None
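A usage sketch of merging a staging table into a target table on an `id` key; the table and column names are hypothetical:

```python
from astro.table import Metadata, Table

source = Table(name="staging_users", metadata=Metadata(schema="my_dataset"))
target = Table(name="users", metadata=Metadata(schema="my_dataset"))

# On an id conflict, update the existing row instead of raising an exception.
db.merge_table(
    source_table=source,
    target_table=target,
    source_to_target_columns_map={"id": "id", "name": "name"},
    target_conflict_columns=["id"],
    if_conflicts="update",
)
```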
- is_native_autodetect_schema_available(file)
Check if native schema auto-detection is available.
- Parameters:
file (astro.files.File) – File whose type is checked to decide whether native schema auto-detection is available for it.
- Return type:
bool
- create_table_using_native_schema_autodetection(table, file)
Create a SQL table, automatically inferring the schema using the given file via native database support.
- Parameters:
table (astro.table.BaseTable) – The table to be created.
file (astro.files.File) – File used to infer the new table columns.
- Return type:
None
- is_native_load_file_available(source_file, target_table)
Check if there is an optimised path for source to destination.
- Parameters:
source_file (astro.files.File) – File from which we need to transfer data
target_table (astro.table.BaseTable) – Table that needs to be populated with file data
- Return type:
bool
- load_file_to_table_natively(source_file, target_table, if_exists='replace', native_support_kwargs=None, **kwargs)
Check whether an optimised transfer path from the file location to the database exists; if it does, perform the transfer and return True, otherwise return False.
- Parameters:
source_file (astro.files.File) – File from which we need to transfer data
target_table (astro.table.BaseTable) – Table that needs to be populated with file data
if_exists (astro.constants.LoadExistStrategy) – Overwrite the table if it exists. Default 'replace'
native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow
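A sketch of guarding the native path behind the availability check; the GCS object path is hypothetical, and `db`/`my_table` come from the instantiation sketch above:

```python
from astro.files import File

source = File(path="gs://my-bucket/data.csv")

# Only take the native path when it is supported for this file/table pair.
if db.is_native_load_file_available(source_file=source, target_table=my_table):
    db.load_file_to_table_natively(
        source_file=source,
        target_table=my_table,
        if_exists="replace",
    )
```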
- load_gs_file_to_table(source_file, target_table, if_exists='replace', native_support_kwargs=None, **kwargs)
Transfer data from GCS to BigQuery.
- Parameters:
source_file (astro.files.File) – Source file that is used as source of data
target_table (astro.table.BaseTable) – Table that will be created in BigQuery
if_exists (astro.constants.LoadExistStrategy) – Overwrite table if exists. Default ‘replace’
native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow
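Calling the GCS path directly looks like this (the bucket and object are hypothetical):

```python
from astro.files import File

# Append to the existing table instead of replacing it.
db.load_gs_file_to_table(
    source_file=File(path="gs://my-bucket/events.parquet"),
    target_table=my_table,
    if_exists="append",
)
```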
- load_s3_file_to_table(source_file, target_table, native_support_kwargs=None, **kwargs)
Load the content of multiple files in S3 into output_table in BigQuery by using a data transfer job.
Note – to use this function you need to:
1. Enable the API on BigQuery
2. Enable the Data Transfer Service on BigQuery, which is a chargeable service
For more information refer to https://cloud.google.com/bigquery-transfer/docs/enable-transfer-service
- Parameters:
source_file (astro.files.File) – Source file that is used as source of data
target_table (astro.table.BaseTable) – Table that will be created in BigQuery
if_exists – Overwrite table if exists. Default ‘replace’
native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow
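A minimal sketch of the S3 path; the bucket, key, and AWS connection id are hypothetical, and the BigQuery Data Transfer Service must be enabled as noted above:

```python
from astro.files import File

db.load_s3_file_to_table(
    source_file=File(path="s3://my-bucket/exports/users.csv", conn_id="aws_default"),
    target_table=my_table,
)
```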
- get_project_id(target_table)
Get project id from the hook.
- Parameters:
target_table – table object that the hook is derived from.
- Return type:
str
- load_local_file_to_table(source_file, target_table, if_exists='replace', native_support_kwargs=None, **kwargs)
Transfer data from a local file to BigQuery.
- Parameters:
source_file (astro.files.File) –
target_table (astro.table.BaseTable) –
if_exists (astro.constants.LoadExistStrategy) –
native_support_kwargs (dict | None) –
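A short sketch for the local path (the file path is hypothetical):

```python
from astro.files import File

# Defaults to replacing the target table's contents.
db.load_local_file_to_table(
    source_file=File(path="/tmp/users.csv"),
    target_table=my_table,
)
```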
- openlineage_dataset_name(table)
Returns the OpenLineage dataset name as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md Example: PROJECT.dataset_name.table_name
- Parameters:
table (astro.table.BaseTable) –
- Return type:
str
- openlineage_dataset_namespace()
Returns the OpenLineage dataset namespace as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md Example: bigquery
- Return type:
str
- openlineage_dataset_uri(table)
Returns the OpenLineage dataset URI as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
- Parameters:
table (astro.table.BaseTable) –
- Return type:
str
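For illustration, the three OpenLineage helpers compose as follows; the commented values reflect the documented example formats, not real output:

```python
namespace = db.openlineage_dataset_namespace()       # e.g. "bigquery"
name = db.openlineage_dataset_name(table=my_table)   # e.g. "PROJECT.dataset_name.table_name"
uri = db.openlineage_dataset_uri(table=my_table)     # dataset URI per the OpenLineage naming spec
```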
- class astro.databases.google.bigquery.S3ToBigqueryDataTransfer(target_table, source_file, project_id, poll_duration=1, native_support_kwargs=None, **kwargs)
Create and run a Data Transfer job from S3 to BigQuery.
- Parameters:
source_file (astro.files.File) – Source file that is used as source of data
target_table (astro.table.BaseTable) – Table that will be created in BigQuery
project_id (str) – BigQuery project ID
poll_duration (int) – Sleep duration between two consecutive job status checks, in seconds. Default: 1 second.
native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow
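A usage sketch with hypothetical values; run() executes the transfer using the helper methods documented below (creating the transfer config, triggering a manual run, and polling its status):

```python
from astro.databases.google.bigquery import S3ToBigqueryDataTransfer
from astro.files import File
from astro.table import Metadata, Table

transfer = S3ToBigqueryDataTransfer(
    target_table=Table(name="users", metadata=Metadata(schema="my_dataset")),
    source_file=File(path="s3://my-bucket/exports/*.csv", conn_id="aws_default"),
    project_id="my-gcp-project",  # hypothetical GCP project id
    poll_duration=5,              # check job status every 5 seconds
)
transfer.run()
```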
- run()
Run the S3 to BigQuery data transfer job.
- static get_transfer_config_id(config)
Extract transfer_config_id from TransferConfig object
- Parameters:
config (google.cloud.bigquery_datatransfer_v1.types.TransferConfig) –
- Return type:
str
- static get_run_id(config)
Extract run_id from StartManualTransferRunsResponse object
- Parameters:
config (google.cloud.bigquery_datatransfer_v1.types.StartManualTransferRunsResponse) –
- Return type:
str
- create_transfer_config()
Create a BigQuery transfer config on Google Cloud
- delete_transfer_config(transfer_config_id)
Delete the transfer config created on Google Cloud
- Parameters:
transfer_config_id (str) –
- run_transfer_now(transfer_config_id)
Run the transfer job on Google Cloud
- Parameters:
transfer_config_id (str) –
- get_transfer_info(run_id, transfer_config_id)
Get transfer job info
- Parameters:
run_id (str) –
transfer_config_id (str) –