astro.databases.google.bigquery

Google BigQuery table implementation.

Module Contents

Classes

BigqueryDatabase

Handle interactions with Bigquery databases. If this class is successful, we should not have any Bigquery-specific logic in other parts of our code-base.

S3ToBigqueryDataTransfer

Create and run a data transfer job from S3 to BigQuery

Attributes

DEFAULT_CONN_ID

NATIVE_PATHS_SUPPORTED_FILE_TYPES

BIGQUERY_WRITE_DISPOSITION

astro.databases.google.bigquery.DEFAULT_CONN_ID
astro.databases.google.bigquery.NATIVE_PATHS_SUPPORTED_FILE_TYPES
astro.databases.google.bigquery.BIGQUERY_WRITE_DISPOSITION
class astro.databases.google.bigquery.BigqueryDatabase(conn_id=DEFAULT_CONN_ID)

Bases: astro.databases.base.BaseDatabase

Handle interactions with Bigquery databases. If this class is successful, we should not have any Bigquery-specific logic in other parts of our code-base.

Parameters

conn_id (str) –

DEFAULT_SCHEMA
NATIVE_PATHS
illegal_column_name_chars :list[str] = ['.']
illegal_column_name_chars_replacement :list[str] = ['_']
NATIVE_LOAD_EXCEPTIONS :Any
property sql_type
property hook

Retrieve Airflow hook to interface with the BigQuery database.

Return type

airflow.providers.google.cloud.hooks.bigquery.BigQueryHook

property sqlalchemy_engine

Return SQLAlchemy engine.

Return type

sqlalchemy.engine.base.Engine

property default_metadata

Fill in default metadata values for table objects addressing BigQuery databases.

Return type

astro.sql.table.Metadata
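
The following is a minimal sketch of instantiating the class and reading the properties documented above. The connection id google_cloud_default is an assumption; replace it with an Airflow connection configured for BigQuery.

# Minimal sketch, assuming an Airflow connection named "google_cloud_default"
# that is configured for Google Cloud / BigQuery.
from astro.databases.google.bigquery import BigqueryDatabase

db = BigqueryDatabase(conn_id="google_cloud_default")

hook = db.hook                  # Airflow BigQueryHook used for all database calls
engine = db.sqlalchemy_engine   # SQLAlchemy engine for ad-hoc queries
metadata = db.default_metadata  # default Metadata (dataset) applied to tables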

schema_exists(schema)

Check whether a dataset exists in BigQuery.

Parameters

schema (str) – Bigquery namespace

Return type

bool

static get_merge_initialization_query(parameters)

Handle database-specific constraint logic for BigQuery. The only constraint that BigQuery supports is NOT NULL.

Parameters

parameters (tuple) –

Return type

str
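
A hedged sketch of the two helpers above. The dataset name my_dataset and the column tuple are placeholders, and the exact SQL returned by get_merge_initialization_query depends on the NOT NULL handling described above.

# Hedged sketch; "my_dataset" and the column names are placeholders.
from astro.databases.google.bigquery import BigqueryDatabase

db = BigqueryDatabase(conn_id="google_cloud_default")  # assumed connection id

if not db.schema_exists("my_dataset"):
    print("Dataset my_dataset does not exist yet")

# Build the merge initialization SQL for two columns; BigQuery only
# supports NOT NULL constraints, as noted above.
sql = BigqueryDatabase.get_merge_initialization_query(parameters=("id", "name"))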

load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.

Parameters
  • source_dataframe (pandas.DataFrame) – Pandas dataframe whose contents will be loaded into the table

  • target_table (astro.sql.table.Table) – Table in which the file will be loaded

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.

  • chunk_size (int) – Specify the number of rows in each batch to be written at a time.

Return type

None
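
A minimal sketch of a dataframe load; the dataset my_dataset, the table users, and the connection id are assumptions.

# Sketch: load a pandas dataframe into a BigQuery table.
import pandas as pd

from astro.databases.google.bigquery import BigqueryDatabase
from astro.sql.table import Metadata, Table

db = BigqueryDatabase(conn_id="google_cloud_default")  # assumed connection id
df = pd.DataFrame({"id": [1, 2], "name": ["alice", "bob"]})

target = Table(name="users", metadata=Metadata(schema="my_dataset"))
db.load_pandas_dataframe_to_table(
    source_dataframe=df,
    target_table=target,
    if_exists="replace",  # use "append" to add to an existing table
)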

merge_table(source_table, target_table, source_to_target_columns_map, target_conflict_columns, if_conflicts='exception')

Merge the source table rows into a destination table. The argument if_conflicts allows the user to define how to handle conflicts.

Parameters
  • source_table (astro.sql.table.Table) – Contains the rows to be merged to the target_table

  • target_table (astro.sql.table.Table) – Contains the destination table in which the rows will be merged

  • source_to_target_columns_map (dict[str, str]) – Dict of target_table columns names to source_table columns names

  • target_conflict_columns (list[str]) – List of cols where we expect to have a conflict while combining

  • if_conflicts (astro.constants.MergeConflictStrategy) – The strategy to be applied if there are conflicts.

Return type

None
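
A sketch of merging a staging table into a target table, assuming "update" is an accepted conflict strategy; table and dataset names are illustrative.

# Sketch: merge rows from a staging table into a target table, resolving
# conflicts on the "id" column.
from astro.databases.google.bigquery import BigqueryDatabase
from astro.sql.table import Metadata, Table

db = BigqueryDatabase(conn_id="google_cloud_default")
source = Table(name="users_staging", metadata=Metadata(schema="my_dataset"))
target = Table(name="users", metadata=Metadata(schema="my_dataset"))

db.merge_table(
    source_table=source,
    target_table=target,
    source_to_target_columns_map={"id": "id", "name": "name"},
    target_conflict_columns=["id"],
    if_conflicts="update",  # assumed strategy; the default is "exception"
)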

is_native_load_file_available(source_file, target_table)

Check if there is an optimised path for source to destination.

Parameters
  • source_file (astro.files.File) – File from which we need to transfer data

  • target_table (astro.sql.table.Table) – Table that needs to be populated with file data

Return type

bool
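
A sketch of guarding a load with the native-path check; the GCS path and table names are placeholders.

# Sketch: only use the optimised (native) load path when it is available
# for this file/table pair.
from astro.databases.google.bigquery import BigqueryDatabase
from astro.files import File
from astro.sql.table import Metadata, Table

db = BigqueryDatabase(conn_id="google_cloud_default")
source = File(path="gs://my-bucket/data.csv")  # hypothetical GCS object
target = Table(name="users", metadata=Metadata(schema="my_dataset"))

if db.is_native_load_file_available(source_file=source, target_table=target):
    db.load_file_to_table_natively(source_file=source, target_table=target)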

load_file_to_table_natively(source_file, target_table, if_exists='replace', native_support_kwargs=None, **kwargs)

Check whether an optimised path exists for transferring data from the file location to the database; if it does, run the transfer and return True, otherwise return False.

Parameters
  • source_file (astro.files.File) – File from which we need to transfer data

  • target_table (astro.sql.table.Table) – Table that needs to be populated with file data

  • if_exists (astro.constants.LoadExistStrategy) – Strategy to use if the target table already exists. Default ‘replace’

  • native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow

load_gs_file_to_table(source_file, target_table, if_exists='replace', native_support_kwargs=None, **kwargs)

Transfer data from GCS to BigQuery.

Parameters
  • source_file (astro.files.File) – Source file that is used as source of data

  • target_table (astro.sql.table.Table) – Table that will be created in BigQuery

  • if_exists (astro.constants.LoadExistStrategy) – Overwrite the table if it exists. Default ‘replace’

  • native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow
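
A hedged sketch of a direct GCS load. The bucket, object, and the native_support_kwargs key shown are illustrative only; the kwargs accepted depend on the underlying load job.

# Hedged sketch of a native GCS -> BigQuery load.
from astro.databases.google.bigquery import BigqueryDatabase
from astro.files import File
from astro.sql.table import Metadata, Table

db = BigqueryDatabase(conn_id="google_cloud_default")

db.load_gs_file_to_table(
    source_file=File(path="gs://my-bucket/data.csv"),  # hypothetical path
    target_table=Table(name="users", metadata=Metadata(schema="my_dataset")),
    if_exists="replace",
    native_support_kwargs={"skip_leading_rows": 1},  # illustrative kwarg only
)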

load_s3_file_to_table(source_file, target_table, native_support_kwargs=None, **kwargs)

Load the contents of multiple files in S3 into output_table in BigQuery using a data transfer job.

Note: to use this function you need to (1) enable the BigQuery API and (2) enable the BigQuery Data Transfer Service, which is a chargeable service. For more information see https://cloud.google.com/bigquery-transfer/docs/enable-transfer-service

Parameters
  • source_file (astro.files.File) – Source file that is used as source of data

  • target_table (astro.sql.table.Table) – Table that will be created in BigQuery

  • if_exists – Overwrite the table if it exists. Default ‘replace’

  • native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow
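
A hedged sketch of the S3 load. It assumes the Data Transfer Service is enabled and that credentials for the S3 bucket are resolvable; the paths and names are placeholders.

# Hedged sketch: load S3 files into BigQuery via the Data Transfer Service.
from astro.databases.google.bigquery import BigqueryDatabase
from astro.files import File
from astro.sql.table import Metadata, Table

db = BigqueryDatabase(conn_id="google_cloud_default")
target = Table(name="users", metadata=Metadata(schema="my_dataset"))

db.load_s3_file_to_table(
    source_file=File(path="s3://my-bucket/data/*.csv"),  # hypothetical S3 prefix
    target_table=target,
)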

get_project_id(target_table)

Get project id from the hook.

Parameters

target_table – table object that the hook is derived from.

Return type

str

load_local_file_to_table(source_file, target_table, if_exists='replace', native_support_kwargs=None, **kwargs)

Transfer data from a local file to BigQuery.

Parameters
  • source_file (astro.files.File) –

  • target_table (astro.sql.table.Table) –

  • if_exists (astro.constants.LoadExistStrategy) –

  • native_support_kwargs (dict | None) –
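
A sketch of a local file load; the file path and table names are placeholders.

# Sketch: load a local CSV file into a BigQuery table.
from astro.databases.google.bigquery import BigqueryDatabase
from astro.files import File
from astro.sql.table import Metadata, Table

db = BigqueryDatabase(conn_id="google_cloud_default")

db.load_local_file_to_table(
    source_file=File(path="/tmp/data.csv"),  # hypothetical local path
    target_table=Table(name="users", metadata=Metadata(schema="my_dataset")),
    if_exists="append",
)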

class astro.databases.google.bigquery.S3ToBigqueryDataTransfer(target_table, source_file, project_id, poll_duration=1, native_support_kwargs=None, **kwargs)

Create and run a data transfer job from S3 to BigQuery.

Parameters
  • source_file (astro.files.File) – Source file that is used as source of data

  • target_table (astro.sql.table.Table) – Table that will be created in BigQuery

  • project_id (str) – Bigquery project id

  • poll_duration (int) – Sleep duration, in seconds, between two consecutive job status checks. Default: 1 second.

  • native_support_kwargs (dict | None) – kwargs to be used by method involved in native support flow
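
A hedged sketch of driving the transfer helper directly; normally it is created by BigqueryDatabase.load_s3_file_to_table. The project id, paths, and table names here are placeholders.

# Hedged sketch: create and run the S3 -> BigQuery transfer helper directly.
from astro.databases.google.bigquery import BigqueryDatabase, S3ToBigqueryDataTransfer
from astro.files import File
from astro.sql.table import Metadata, Table

db = BigqueryDatabase(conn_id="google_cloud_default")
target = Table(name="users", metadata=Metadata(schema="my_dataset"))

transfer = S3ToBigqueryDataTransfer(
    target_table=target,
    source_file=File(path="s3://my-bucket/data/*.csv"),  # hypothetical S3 prefix
    project_id=db.get_project_id(target),                # documented helper
    poll_duration=5,  # check the job status every 5 seconds
)
transfer.run()  # create the transfer config, trigger it, and poll until it finishes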

run()

Run the S3 to BigQuery data transfer.

static get_transfer_config_id(config)

Extract transfer_config_id from TransferConfig object

Parameters

config (google.cloud.bigquery_datatransfer_v1.types.TransferConfig) –

Return type

str

static get_run_id(config)

Extract run_id from StartManualTransferRunsResponse object

Parameters

config (google.cloud.bigquery_datatransfer_v1.types.StartManualTransferRunsResponse) –

Return type

str

create_transfer_config()

Create a BigQuery transfer config on Google Cloud.

delete_transfer_config(transfer_config_id)

Delete the transfer config created on Google Cloud.

Parameters

transfer_config_id (str) –

run_transfer_now(transfer_config_id)

Run the transfer job on Google Cloud.

Parameters

transfer_config_id (str) –

get_transfer_info(run_id, transfer_config_id)

Get transfer job info

Parameters
  • run_id (str) –

  • transfer_config_id (str) –