astro.databases.databricks.api_utils

Module Contents

Functions

delete_secret_scope(scope_name, api_client)

Delete the scope we created to prevent littering the databricks secret store with one-time scopes.

create_secrets(scope_name, filesystem_secrets, api_client)

Uploads secrets to a scope.

generate_file(data_source_path, table_name, ...[, ...])

In order to run autoloader jobs in databricks, we need to generate a python file that creates a pyspark job.

load_file_to_dbfs(local_file_path, file_name, api_client)

Load a file into DBFS. Used to move a python file into DBFS, so we can run the jobs as pyspark jobs.

create_and_run_job(api_client, file_to_run, ...[, ...])

Creates a databricks job and runs it to completion.

Attributes

cwd

log

astro.databases.databricks.api_utils.cwd
astro.databases.databricks.api_utils.log
astro.databases.databricks.api_utils.delete_secret_scope(scope_name, api_client)

Delete the scope we created to prevent littering the databricks secret store with one-time scopes.

Parameters:
  • scope_name (str) – name of scope to delete

  • api_client (databricks_cli.sdk.api_client.ApiClient) – populated databricks client

Return type:

None
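
A minimal sketch of calling this helper directly. The workspace URL, token, and scope name below are placeholders; in normal use the scope name matches the one passed to create_secrets earlier in the same load.

    from databricks_cli.sdk.api_client import ApiClient

    from astro.databases.databricks.api_utils import delete_secret_scope

    # Placeholder credentials -- point these at your own workspace.
    api_client = ApiClient(
        host="https://<your-workspace>.cloud.databricks.com",
        token="<personal-access-token>",
    )

    # Remove the one-time scope created for this load.
    delete_secret_scope("astro-temp-scope", api_client)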

astro.databases.databricks.api_utils.create_secrets(scope_name, filesystem_secrets, api_client)

Uploads secrets to a scope. Before we can transfer data from external file sources (S3, GCS, etc.), we first need to upload the relevant secrets to databricks so we can use them in the autoloader config. This allows us to perform ad-hoc queries that are not dependent on existing settings.

Parameters:
  • scope_name (str) – the name of the secret scope. Placing all secrets in a single scope makes them easy to delete later

  • filesystem_secrets (Dict[str, str]) – a dictionary of secrets, where each key is the secret name and each value is the secret value

  • api_client (databricks_cli.sdk.api_client.ApiClient) – The databricks API client that has all necessary credentials

Return type:

None
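
A hedged sketch of uploading S3 credentials ahead of a load. The scope name and secret keys are illustrative, not names the library requires.

    from databricks_cli.sdk.api_client import ApiClient

    from astro.databases.databricks.api_utils import create_secrets

    api_client = ApiClient(
        host="https://<your-workspace>.cloud.databricks.com",
        token="<personal-access-token>",
    )

    # Illustrative secret names; each key/value pair becomes one secret in the scope.
    filesystem_secrets = {
        "aws_access_key_id": "<access-key-id>",
        "aws_secret_access_key": "<secret-access-key>",
    }

    create_secrets(
        scope_name="astro-temp-scope",
        filesystem_secrets=filesystem_secrets,
        api_client=api_client,
    )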

astro.databases.databricks.api_utils.generate_file(data_source_path, table_name, source_type, output_file_path, load_options, file_type='')

In order to run autoloader jobs in databricks, we need to generate a python file that creates a pyspark job. This function uses Jinja templating to generate a file with all necessary user inputs.

Parameters:
  • data_source_path (str) – The path where the data we’re ingesting lives (e.g. the s3 bucket URL)

  • table_name (str) – The delta table we would like to push the data into

  • source_type (str) – the source location (e.g. s3)

  • load_options (astro.databases.databricks.load_options.DeltaLoadOptions) – any additional load options the user would like to inject into their job

  • output_file_path (pathlib.Path) – where the generated file should be placed.

  • file_type (str) – when using the COPY INTO command, Spark requires explicitly stating the data type you are loading. For individual files we can infer the file type, but if you are loading a directory, please explicitly state the file type in the File object.

Returns:

output file path
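
A sketch of rendering the pyspark script to a local path. The bucket, table name, output path, and the way DeltaLoadOptions is constructed are assumptions; consult astro.databases.databricks.load_options for the fields your deployment requires.

    from pathlib import Path

    from astro.databases.databricks.api_utils import generate_file
    from astro.databases.databricks.load_options import DeltaLoadOptions

    # Assumed default construction -- DeltaLoadOptions may need fields not shown here.
    load_options = DeltaLoadOptions()

    script_path = generate_file(
        data_source_path="s3://my-bucket/raw/events/",   # placeholder bucket
        table_name="raw_events",                         # placeholder delta table
        source_type="s3",
        output_file_path=Path("/tmp/autoloader_job.py"),
        load_options=load_options,
        file_type="csv",  # required when loading a whole directory via COPY INTO
    )
    print(script_path)  # path of the rendered pyspark file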

astro.databases.databricks.api_utils.load_file_to_dbfs(local_file_path, file_name, api_client)

Load a file into DBFS. Used to move a python file into DBFS, so we can run the jobs as pyspark jobs. Currently saves the file to dbfs:/mnt/pyscripts/, but will eventually support more locations.

Parameters:
  • local_file_path (pathlib.Path) – path of the file to upload

  • api_client (databricks_cli.sdk.api_client.ApiClient) – Databricks API client

  • file_name (str) –

Returns:

path to the file in DBFS

Return type:

pathlib.Path
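
A sketch of pushing the generated script into DBFS. The local path and file name are placeholders; per the note above, the file currently lands under dbfs:/mnt/pyscripts/.

    from pathlib import Path

    from databricks_cli.sdk.api_client import ApiClient

    from astro.databases.databricks.api_utils import load_file_to_dbfs

    api_client = ApiClient(
        host="https://<your-workspace>.cloud.databricks.com",
        token="<personal-access-token>",
    )

    dbfs_path = load_file_to_dbfs(
        local_file_path=Path("/tmp/autoloader_job.py"),  # placeholder local script
        file_name="autoloader_job.py",
        api_client=api_client,
    )
    print(dbfs_path)  # e.g. dbfs:/mnt/pyscripts/autoloader_job.py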

astro.databases.databricks.api_utils.create_and_run_job(api_client, file_to_run, databricks_job_name, existing_cluster_id=None, new_cluster_specs=None)

Creates a databricks job and runs it to completion.

Parameters:
  • databricks_job_name (str) – Name of the job to submit

  • file_to_run (str) – path to file we will run with the job

  • api_client – databricks API client

  • existing_cluster_id – If you want to run this job on an existing cluster, you can give the cluster ID

  • new_cluster_specs – If you want to run this job on a new cluster, you can give the cluster specs as a python dictionary.

Return type:

None
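
A sketch of submitting the uploaded script, once against an existing cluster and once with a new-cluster spec. The cluster ID, Spark version, and node type are illustrative values, not ones the function prescribes.

    from databricks_cli.sdk.api_client import ApiClient

    from astro.databases.databricks.api_utils import create_and_run_job

    api_client = ApiClient(
        host="https://<your-workspace>.cloud.databricks.com",
        token="<personal-access-token>",
    )

    # Option 1: reuse an existing cluster (placeholder cluster ID).
    create_and_run_job(
        api_client=api_client,
        file_to_run="dbfs:/mnt/pyscripts/autoloader_job.py",
        databricks_job_name="astro-autoloader-job",
        existing_cluster_id="1234-567890-abcde123",
    )

    # Option 2: spin up a new cluster from a spec dictionary (illustrative values).
    new_cluster_specs = {
        "spark_version": "11.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 1,
    }
    create_and_run_job(
        api_client=api_client,
        file_to_run="dbfs:/mnt/pyscripts/autoloader_job.py",
        databricks_job_name="astro-autoloader-job",
        new_cluster_specs=new_cluster_specs,
    )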