astro.databases.databricks.api_utils
Module Contents
Functions
- delete_secret_scope – Delete the scope we created to prevent littering the Databricks secret store with one-time scopes.
- create_secrets – Uploads secrets to a scope.
- generate_file – Generates the Python file that creates a PySpark autoloader job.
- load_file_to_dbfs – Load a file into DBFS so we can run the jobs as PySpark jobs.
- create_and_run_job – Creates a Databricks job and runs it to completion.
Attributes
- astro.databases.databricks.api_utils.cwd
- astro.databases.databricks.api_utils.log
- astro.databases.databricks.api_utils.delete_secret_scope(scope_name, api_client)
Delete the scope we created to prevent littering the Databricks secret store with one-time scopes.
- Parameters:
scope_name (str) – name of scope to delete
api_client (databricks_cli.sdk.api_client.ApiClient) – populated databricks client
- Return type:
None
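For illustration, a minimal sketch of cleaning up a one-time scope after a transfer. The environment variable names and scope name below are hypothetical; substitute your own credential source.

```python
import os

from databricks_cli.sdk.api_client import ApiClient

from astro.databases.databricks.api_utils import delete_secret_scope

# Hypothetical environment variable names for the workspace credentials.
api_client = ApiClient(
    host=os.environ["DATABRICKS_HOST"],
    token=os.environ["DATABRICKS_TOKEN"],
)

# Remove the throwaway scope created for a single load so its secrets do not linger.
delete_secret_scope("astro-temp-scope", api_client=api_client)
```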
- astro.databases.databricks.api_utils.create_secrets(scope_name, filesystem_secrets, api_client)
Uploads secrets to a scope. Before we can transfer data from external file sources (S3, GCS, etc.), we first need to upload the relevant secrets to Databricks so we can use them in the autoloader config. This allows us to perform ad-hoc queries that are not dependent on existing settings.
- Parameters:
scope_name (str) – the name of the secret scope; placing all secrets in a single scope makes them easy to delete later
filesystem_secrets (Dict[str, str]) – a dictionary of k,v secrets where the key is the secret name and the value is the secret value
api_client (databricks_cli.sdk.api_client.ApiClient) – The databricks API client that has all necessary credentials
- Return type:
None
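A hedged sketch of uploading filesystem credentials into a one-time scope before a transfer. The scope name and secret keys shown are illustrative, not required by the function.

```python
import os

from databricks_cli.sdk.api_client import ApiClient

from astro.databases.databricks.api_utils import create_secrets

api_client = ApiClient(
    host=os.environ["DATABRICKS_HOST"],  # hypothetical env var names
    token=os.environ["DATABRICKS_TOKEN"],
)

# Keys and values here are illustrative; pass whatever filesystem secrets your
# autoloader config needs. Everything lands in a single scope so it can be
# deleted with one delete_secret_scope call later.
create_secrets(
    scope_name="astro-temp-scope",
    filesystem_secrets={
        "aws_access_key_id": os.environ["AWS_ACCESS_KEY_ID"],
        "aws_secret_access_key": os.environ["AWS_SECRET_ACCESS_KEY"],
    },
    api_client=api_client,
)
```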
- astro.databases.databricks.api_utils.generate_file(data_source_path, table_name, source_type, output_file_path, load_options, file_type='')
In order to run autoloader jobs in Databricks, we need to generate a Python file that creates a PySpark job. This function uses Jinja templating to generate a file with all necessary user inputs.
- Parameters:
data_source_path (str) – The path where the data we’re ingesting lives (e.g. the S3 bucket URL)
table_name (str) – The Delta table we would like to push the data into
source_type (str) – the source location (e.g. s3)
load_options (astro.databases.databricks.load_options.DeltaLoadOptions) – any additional load options the user would like to inject into their job
output_file_path (pathlib.Path) – where the generated file should be placed.
file_type (str) – when using the COPY INTO command, Spark requires explicitly stating the data type you are loading. For individual files we can infer the file type, but if you are loading a directory please explicitly state the file type in the File object.
- Returns:
output file path
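As a sketch, generating the PySpark script for an S3 directory load. The bucket URL, table name, and output path are placeholders, and the no-argument DeltaLoadOptions construction is an assumption; check the class definition for required fields.

```python
from pathlib import Path

from astro.databases.databricks.api_utils import generate_file
from astro.databases.databricks.load_options import DeltaLoadOptions

# Assumption: DeltaLoadOptions can be built with defaults; adjust fields as needed.
load_options = DeltaLoadOptions()

script_path = generate_file(
    data_source_path="s3://my-bucket/raw/events/",  # illustrative bucket URL
    table_name="raw_events",
    source_type="s3",
    output_file_path=Path("/tmp/autoloader_job.py"),
    load_options=load_options,
    file_type="csv",  # required when loading a directory via COPY INTO
)
print(script_path)  # path of the generated file
```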
- astro.databases.databricks.api_utils.load_file_to_dbfs(local_file_path, file_name, api_client)
Load a file into DBFS. Used to move a Python file into DBFS, so we can run the jobs as PySpark jobs. Currently saves the file to dbfs:/mnt/pyscripts/, but will eventually support more locations.
- Parameters:
local_file_path (pathlib.Path) – path of the file to upload
api_client (databricks_cli.sdk.api_client.ApiClient) – Databricks API client
file_name (str) – name to give the file once it is uploaded to DBFS
- Returns:
path to the file in DBFS
- Return type:
pathlib.Path
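A minimal sketch of pushing the generated script into DBFS (per the docstring above, it currently lands under dbfs:/mnt/pyscripts/). File paths and credential variable names are placeholders.

```python
import os
from pathlib import Path

from databricks_cli.sdk.api_client import ApiClient

from astro.databases.databricks.api_utils import load_file_to_dbfs

api_client = ApiClient(
    host=os.environ["DATABRICKS_HOST"],  # hypothetical env var names
    token=os.environ["DATABRICKS_TOKEN"],
)

dbfs_path = load_file_to_dbfs(
    local_file_path=Path("/tmp/autoloader_job.py"),
    file_name="autoloader_job.py",
    api_client=api_client,
)
print(dbfs_path)  # returned path to the file in DBFS
```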
- astro.databases.databricks.api_utils.create_and_run_job(api_client, file_to_run, databricks_job_name, existing_cluster_id=None, new_cluster_specs=None)
Creates a Databricks job and runs it to completion.
- Parameters:
databricks_job_name (str) – Name of the job to submit
file_to_run (str) – path to file we will run with the job
api_client – Databricks API client
existing_cluster_id – If you want to run this job on an existing cluster, you can give the cluster ID
new_cluster_specs – If you want to run this job on a new cluster, you can give the cluster specs as a python dictionary.
- Return type:
None
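Finally, a hedged sketch of submitting the uploaded script as a job on an existing cluster. The DBFS path, job name, and cluster ID are placeholders.

```python
import os

from databricks_cli.sdk.api_client import ApiClient

from astro.databases.databricks.api_utils import create_and_run_job

api_client = ApiClient(
    host=os.environ["DATABRICKS_HOST"],  # hypothetical env var names
    token=os.environ["DATABRICKS_TOKEN"],
)

# Runs the uploaded script to completion on an existing cluster. Alternatively,
# pass new_cluster_specs as a dictionary to run the job on a new cluster instead.
create_and_run_job(
    api_client=api_client,
    file_to_run="dbfs:/mnt/pyscripts/autoloader_job.py",  # placeholder DBFS path
    databricks_job_name="astro-sdk-autoloader-job",  # placeholder job name
    existing_cluster_id="1234-567890-abcde123",  # placeholder cluster ID
)
```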