API Reference
This part of the project documentation focuses on an information-oriented approach. Use it as a
reference for the technical implementation of the rdsa-utils codebase.
General
rdsa_utils.exceptions
Common custom exceptions that can be raised in pipelines.
The purpose of these is to provide a clearer indication of why an error is
being raised over the standard builtin errors (e.g. ColumnNotInDataframeError
vs ValueError).
ColumnNotInDataframeError
Bases: Exception
Custom exception to raise when a column is not present in dataframe.
ConfigError
Bases: Exception
Custom exception to raise when there is an issue in a config object.
DataframeEmptyError
Bases: Exception
Custom exception to raise when a dataframe is empty.
InvalidBucketNameError
Bases: Exception
Custom exception to raise when an AWS S3 or GCS bucket name is invalid.
InvalidS3FilePathError
Bases: Exception
Custom exception to raise when an AWS S3 file path is invalid.
PipelineError
Bases: Exception
Custom exception to raise when there is a generic pipeline issue.
TableNotFoundError
Bases: Exception
Custom exception to raise when a table to be read is not found.
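A minimal usage sketch for these exceptions; the dataframe and the column check are illustrative, not part of the library:
>>> import pandas as pd
>>> from rdsa_utils.exceptions import ColumnNotInDataframeError
>>> df = pd.DataFrame({'price': [1.0, 2.0]})
>>> if 'quantity' not in df.columns:
...     raise ColumnNotInDataframeError("Column 'quantity' not in dataframe.")
ColumnNotInDataframeError: Column 'quantity' not in dataframe.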
rdsa_utils.logging
Contains the logging configuration for files and method to initialise it.
add_warning_message_to_function(_func: Callable = None, *, message: Optional[str] = None) -> Callable
Apply decorator to log a warning message.
If a message is passed, this decorator adds a warning log of the form function_name: message
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Optional[str] | The message to be logged along with the function name. | None |
Notes
Explainer on complex decorators (and template for decorator structure): https://realpython.com/primer-on-python-decorators/#both-please-but-never-mind-the-bread
Usage
To use decorator to log a warning:
>>> @_add_warning_message_to_function(message='here be dragons...')
>>> def my_func(some_args, some_other_args):
>>> ...
>>>
>>> some_output = my_func(...)
Warning my_func: here be dragons...
init_logger_advanced(log_level: int, handlers: Optional[List[logging.Handler]] = None, log_format: str = None, date_format: str = None) -> None
Set up the root logger with specified settings, avoiding duplicate handlers.
This function initialises the root logger at the desired log_level, applies a consistent message and date format, and attaches any provided handlers. If the root logger already has handlers (e.g., from a previous call or another module), it returns immediately to prevent duplicate setup.
Recommended usage: call this function once during application startup, ideally in your package's top-level __init__.py, to enforce consistent logging configuration across all modules.
After initialisation, modules should obtain their
own logger via: logger = logging.getLogger(__name__)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| log_level | int | The logging level (e.g., logging.INFO, logging.DEBUG) for all messages emitted by the root logger. | required |
| handlers | Optional[List[Handler]] | A list of handler instances (e.g., StreamHandler, FileHandler) to attach. If None or empty, logging.basicConfig is used instead. | None |
| log_format | str | A format string for log messages. | None |
| date_format | str | A strftime format string for timestamps. Defaults to: "%Y-%m-%d %H:%M:%S". | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If any item in handlers is not an instance of logging.Handler. |
Notes
- Checks root_logger.hasHandlers(): if True, exits without changes.
- Sets the root logger's level to log_level before adding handlers.
- If no handlers provided, calls logging.basicConfig after formatter setup.
Examples:
>>> import logging
>>> import sys
>>> from rdsa_utils.logging import init_logger_advanced
>>> # Initialise once in your package’s __init__.py
>>> init_logger_advanced(
... log_level=logging.INFO,
... handlers=[logging.StreamHandler(sys.stdout)],
... log_format="%(levelname)s: %(message)s",
... date_format="%H:%M:%S"
... )
>>> # In other modules, get a named logger
>>> logger = logging.getLogger(__name__)
init_logger_basic(log_level: int) -> None
Instantiate a basic logger object to be used across modules.
By using this function to instantiate the logger, you also have access to
logger.dev for log_level=15, as this is defined in the same module scope
as this function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| log_level | int | The level of logging to be recorded. Can be defined either as the integer level or the logging module constant (e.g., logging.INFO). | required |
Returns:
| Type | Description |
|---|---|
| None | The logger created by this function is available in any other modules by using logging.getLogger(__name__). |
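A minimal usage sketch:
>>> import logging
>>> from rdsa_utils.logging import init_logger_basic
>>> init_logger_basic(logging.INFO)
>>> logger = logging.getLogger(__name__)
>>> logger.info('pipeline started')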
log_dev(self, message, *args, **kwargs)
Create a custom log level between INFO and DEBUG named DEV.
This is lifted from: https://stackoverflow.com/a/13638084
log_rows_in_spark_df(func: Callable) -> Callable
Apply decorator to log dataframe row count before and after a function.
Requires that the function being decorated has a parameter called df and
that the function is called with df being a keyword argument (e.g.
df=df). If not the decorator will report back that it could not count the
number of rows of the dataframe before running the decorated function.
Usage
@log_rows_in_spark_df
def my_func_that_changes_no_rows(some_args, df, some_other_args):
...
returns final_df
some_df = my_func_that_changes_no_rows(
some_args='hello',
df=input_df,
some_other_args='world'
)
>>> Rows in dataframe before my_func_that_changes_no_rows : 12345
>>> Rows in dataframe after my_func_that_changes_no_rows : 6789
Warning:
.count() is an expensive spark operation to perform. Overuse of this
decorator can be detrimental to performance. This decorator will cache the
input dataframe prior to running the count and decorated function, as well
as persisting the output dataframe prior to counting. The input dataframe
is also unpersisted from memory prior to the decorator completing.
log_spark_df_schema(_func: Callable = None, *, log_schema_on_input: bool = True) -> Callable
Apply decorator to log dataframe schema before and after a function.
If you use the df.printSchema() method directly in a print/log statement, the code is processed and printed regardless of logging level. Instead you need to capture the output and pass this to the logger. See explanation here - https://stackoverflow.com/a/59935109
Requires that the function being decorated has a parameter called df and
that the function is called with df being a keyword argument (e.g.
df=df). If not the decorator will report back that it could not count the
number of rows of the dataframe before running the decorated function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| log_schema_on_input | bool | If set to False, then no schema is attempted to be printed for the decorated function on input. This is useful for instance where the function has no df input but does return one (such as when reading a table). | True |
Notes
Explainer on complex decorators (and template for decorator structure): https://realpython.com/primer-on-python-decorators/#both-please-but-never-mind-the-bread
Usage
To use decorator to record input and output schema:
>>> @log_spark_df_schema
>>> def my_func_that_changes_some_columns(some_args, df, some_other_args):
>>> ...
>>> returns final_df
>>>
>>> some_df = my_func_that_changes_some_columns(
>>> some_args='hello',
>>> df=input_df,
>>> some_other_args='world'
>>> )
Schema of dataframe before my_func_that_changes_some_columns:
root
|-- price: double (nullable = true)
|-- quantity: long (nullable = true)
Schema of dataframe after my_func_that_changes_some_columns:
root
|-- price: double (nullable = true)
|-- quantity: long (nullable = true)
|-- expenditure: double (nullable = true)
To use decorator to record output schema only:
>>> @log_spark_df_schema(log_schema_on_input=False)
>>> def my_func_that_changes_some_columns(some_args, df, some_other_args):
>>> ...
>>> returns final_df
>>>
>>> some_df = my_func_that_changes_some_columns(
>>> some_args='hello',
>>> df=input_df,
>>> some_other_args='world'
>>> )
Not printing schema of dataframe before my_func_that_changes_some_columns
Schema of dataframe after my_func_that_changes_some_columns:
root
|-- price: double (nullable = true)
|-- quantity: long (nullable = true)
|-- expenditure: double (nullable = true)
print_full_table_and_raise_error(df: pd.DataFrame, message: str, stop_pipeline: bool = False, show_records: bool = False) -> None
Output dataframe records to logger.
The purpose of this function is to enable a user to output a message to the logger with the added functionality of stopping the pipeline and showing dataframe records in a table format. It may be used, for instance, if a user wants to check the records in a dataframe when it is expected to be empty.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The dataframe to display records from. | required |
| message | str | The message to output to the logger. | required |
| stop_pipeline | bool | Switch for the user to stop the pipeline and raise an error. | False |
| show_records | bool | Switch to show records in a dataframe. | False |
Returns:
| Type | Description |
|---|---|
| None | Displays the message to the user; nothing is returned from the function. |
Raises:
| Type | Description |
|---|---|
| ValueError | Raises error and stops pipeline if switch applied. |
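A usage sketch; the dataframe contents and message are illustrative:
>>> import pandas as pd
>>> unexpected = pd.DataFrame({'ref': [101, 102]})
>>> print_full_table_and_raise_error(
...     df=unexpected,
...     message='Expected no residual records.',
...     stop_pipeline=False,
...     show_records=True,
... )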
timer_args(name: str, logger: Optional[Callable[[str], None]] = logger.info) -> Dict[str, str]
Initialise timer args workaround for 'text' args in codetiming package.
Works with codetiming==1.4.0
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the specific timer log. | required |
| logger | Optional[Callable[[str], None]] | Optional logger function that can accept a string argument. | logger.info |
Returns:
| Type | Description |
|---|---|
| Dict[str, str] | Dictionary of arguments to pass to the specific codetiming package Timer. |
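A usage sketch with codetiming's Timer; run_expensive_step is a placeholder for real pipeline work:
>>> from codetiming import Timer
>>> from rdsa_utils.logging import timer_args
>>> with Timer(**timer_args('load inputs')):
...     run_expensive_step()  # placeholder for pipeline work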
rdsa_utils.test_utils
Functions and fixtures used with test suites.
Case(label: Optional[str] = None, marks: Optional[MarkDecorator] = None, **kwargs)
Container for a test case, with optional test ID.
The Case class is to be used in conjunction with parametrize_cases.
Attributes:
| Name | Type | Description |
|---|---|---|
| label | Optional[str] | Optional label used as the test ID. |
| marks | Optional[MarkDecorator] | Optional pytest marks to denote any tests to skip etc. |
| kwargs | | Parameters used for the test cases. |
Examples:
>>> Case(label="some test name", foo=10, bar="some value")
>>> Case(
>>> label="some test name",
>>> marks=pytest.mark.skip(reason='not implemented'),
>>> foo=10,
>>> bar="some value"
>>> )
See Also
Modified from https://github.com/ckp95/pytest-parametrize-cases to allow pytest mark usage.
Initialise objects.
__repr__() -> str
Return string.
create_dataframe(data: List[Tuple[str]], **kwargs) -> pd.DataFrame
Create pandas df from tuple data with a header.
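A usage sketch, mirroring the tuple layout used by create_spark_df below:
>>> df = create_dataframe([
...     ('column1', 'column2', 'column3'),
...     ('aaaa', 1, 1.1),
... ])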
create_spark_df(spark_session)
Create Spark DataFrame from tuple data with first row as schema.
Example:
create_spark_df([ ('column1', 'column2', 'column3'), ('aaaa', 1, 1.1) ])
Can specify the schema alongside the column names: create_spark_df([ ('column1 STRING, column2 INT, column3 DOUBLE'), ('aaaa', 1, 1.1) ])
parametrize_cases(*cases: Case)
A more user-friendly way to parametrize test cases.
Utilise as a decorator on top of a test function.
Examples:
@parametrize_cases(
Case(
label="some test name",
foo=10,
bar="some value"
),
Case(
label="some test name #2",
foo=20,
bar="some other value"
),
)
def test(foo, bar):
...
See Also
Source: https://github.com/ckp95/pytest-parametrize-cases
spark_session()
Set up spark session fixture.
suppress_py4j_logging()
Suppress spark logging.
to_date(dt: str) -> datetime.date
Convert date string to datetime.date type.
to_datetime(dt: str) -> datetime.datetime
Convert datetime string to datetime.datetime type.
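Usage sketches, assuming ISO-formatted input strings:
>>> to_date('2024-01-31')
datetime.date(2024, 1, 31)
>>> to_datetime('2024-01-31 12:30:00')
datetime.datetime(2024, 1, 31, 12, 30)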
to_spark(spark_session)
Convert pandas df to spark.
rdsa_utils.typing
Contains custom types for type hinting.
rdsa_utils.validation
Functions that support the use of pydantic validators.
allowed_date_format(date: str) -> str
Ensure that the date string can be converted to a useable datetime.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| date | str | The specified date string. | required |
Returns:
| Type | Description |
|---|---|
| str | The input date. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the date is not one of the predefined allowed formats. |
apply_validation(config: Mapping[str, Any], Validator: Optional[BaseModel]) -> Mapping[str, Any]
Apply validation model to config.
If no Validator is passed, then a warning will be logged and the input config returned without validation. This mechanism allows the function to be used to track config sections that are unvalidated.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | Mapping[str, Any] | The config for validating. | required |
| Validator | Optional[BaseModel] | Validator class for the config. | required |
Returns:
| Type | Description |
|---|---|
| Mapping[str, Any] | The input config after being passed through the validator. |
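A sketch using a hypothetical pydantic model; the model and config keys are illustrative:
>>> from pydantic import BaseModel
>>> from rdsa_utils.validation import apply_validation
>>> class InputConfig(BaseModel):
...     table_name: str
...     run_date: str
>>> config = {'table_name': 'sales', 'run_date': '2024-01-01'}
>>> validated = apply_validation(config, InputConfig)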
list_convert_validator(*args, **kwargs) -> Callable
Wrapper to set kwargs for list_convert validator.
CDP
rdsa_utils.cdp.helpers.s3_utils
Utility functions for interacting with AWS S3.
To initialise a boto3 client for S3 and configure it with Ranger RAZ and SSL certificate, you can use the following code snippet:
import boto3
import raz_client
ssl_file_path = "/path/to/your/ssl_certificate.crt"
# Create a boto3 client for S3
client = boto3.client("s3")
# Configure the client with RAZ and SSL certificate
raz_client.configure_ranger_raz(client, ssl_file=ssl_file_path)
Note:
- The `raz-client` library is required only when running in a
managed Cloudera environment.
- You can install it using `pip install raz-client` when needed.
check_file(client: boto3.client, bucket_name: str, object_name: str) -> bool
Check if a file exists in an S3 bucket and meets specific criteria.
Verifies that the given path corresponds to a file in an S3 bucket, ensuring it exists, is not a directory, and has a size greater than 0.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client. | required |
| bucket_name | str | The name of the bucket. | required |
| object_name | str | The path to a file in the S3 bucket. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the file exists, is not a directory, and size > 0, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> check_file(client, 'mybucket', 'folder/file.txt')
True
>>> check_file(client, 'mybucket', 'folder/nonexistent_file.txt')
False
>>> check_file(client, 'mybucket', 'folder/')
False
copy_file(client: boto3.client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str, overwrite: bool = False) -> bool
Copy a file from one AWS S3 bucket to another.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| source_bucket_name | str | The name of the source bucket. | required |
| source_object_name | str | The S3 object name of the source file. | required |
| destination_bucket_name | str | The name of the destination bucket. | required |
| destination_object_name | str | The S3 object name of the destination file. | required |
| overwrite | bool | If True, overwrite the destination file if it already exists. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the file was copied successfully, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> copy_file(
... client,
... 'source-bucket',
... 'source_file.txt',
... 'destination-bucket',
... 'destination_file.txt'
... )
True
create_folder(client: boto3.client, bucket_name: str, folder_path: str) -> bool
Create a folder in an AWS S3 bucket if it doesn't already exist.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the bucket where the folder will be created. | required |
| folder_path | str | The name of the folder to create. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the folder was created successfully or already exists, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> create_folder(client, 'mybucket', 'new_folder/')
True
create_s3_uri(bucket: str, key: str, scheme: str = 's3') -> str
Create an S3 URI from a bucket, key, and scheme.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bucket | str | The S3 bucket name. | required |
| key | str | The S3 object key. | required |
| scheme | str | The URI scheme to use ('s3' or 's3a'). Default is "s3". | 's3' |
Returns:
| Type | Description |
|---|---|
| str | The formatted S3 URI. |
Examples:
>>> create_s3_uri("my-bucket", "folder/file.txt")
's3://my-bucket/folder/file.txt'
>>> create_s3_uri("my-bucket", "folder/file.txt", scheme="s3a")
's3a://my-bucket/folder/file.txt'
delete_file(client: boto3.client, bucket_name: str, object_name: str, overwrite: bool = False) -> bool
Delete a file from an AWS S3 bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the bucket from which the file will be deleted. | required |
| object_name | str | The S3 object name of the file to delete. | required |
| overwrite | bool | If False, the function will not delete the file if it does not exist; set to True to ignore non-existence on delete. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the file was deleted successfully, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> delete_file(client, 'mybucket', 'folder/s3_file.txt')
True
delete_folder(client: boto3.client, bucket_name: str, folder_path: str) -> bool
Delete a folder in an AWS S3 bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| folder_path | str | The path of the folder to delete. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the folder was deleted successfully, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> delete_folder(client, 'mybucket', 'path/to/folder/')
True
delete_old_objects_and_folders(client: boto3.client, bucket_name: str, prefix: str, age: str, dry_run: bool = False) -> bool
Delete objects and folders in an S3 bucket that are older than a specified age.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| prefix | str | The prefix to filter objects. | required |
| age | str | The age threshold for deleting objects. Supported formats: "1 day", "2 days", "1 week", "2 weeks", "1 month", "2 months", etc. | required |
| dry_run | bool | If True, the function will only log the objects and folders that would be deleted, without actually performing the deletion. Default is False. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the objects and folders were (or would be) deleted successfully, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> # This will actually delete objects:
>>> delete_old_objects_and_folders(client, 'mybucket', 'folder/', '1 week')
True
>>> # This will only log the objects/folders to be deleted:
>>> delete_old_objects_and_folders(
... client,
... 'mybucket',
... 'folder/',
... '1 week',
... dry_run=True
... )
True
download_file(client: boto3.client, bucket_name: str, object_name: str, local_path: str, overwrite: bool = False) -> bool
Download a file from an AWS S3 bucket to a local directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the S3 bucket from which to download the file. | required |
| object_name | str | The S3 object name of the file to download. | required |
| local_path | str | The local file path where the downloaded file will be saved. | required |
| overwrite | bool | If True, overwrite the local file if it exists. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the file was downloaded successfully, False otherwise. |
Examples:
>>> client = boto3.client('s3')
>>> download_file(
... client,
... 'mybucket',
... 'folder/s3_file.txt',
... '/path/to/download.txt'
... )
True
download_folder(client: boto3.client, bucket_name: str, prefix: str, local_path: str, overwrite: bool = False) -> bool
Download a folder from an AWS S3 bucket to a local directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the S3 bucket from which to download the folder. | required |
| prefix | str | The S3 prefix of the folder to download. | required |
| local_path | str | The local directory path where the downloaded folder will be saved. | required |
| overwrite | bool | If True, overwrite existing local files if they exist. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the folder was downloaded successfully, False otherwise. |
Examples:
>>> client = boto3.client('s3')
>>> download_folder(
... client,
... 'mybucket',
... 'folder/subfolder/',
... '/path/to/local_folder',
... overwrite=False
... )
True
file_exists(client: boto3.client, bucket_name: str, object_name: str) -> bool
Check if a specific file exists in an AWS S3 bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client. | required |
| bucket_name | str | The name of the bucket. | required |
| object_name | str | The S3 object name to check for existence. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the file exists, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> file_exists(client, 'mybucket', 'folder/file.txt')
True
file_size(client: boto3.client, bucket_name: str, object_name: str) -> int
Check the size of a file in an AWS S3 bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client. | required |
| bucket_name | str | The name of the bucket. | required |
| object_name | str | The S3 object name to check for size. | required |
Returns:
| Type | Description |
|---|---|
| int | An integer value indicating the size of the file in bytes. |
Examples:
>>> client = boto3.client('s3')
>>> file_size(client, 'mybucket', 'folder/file.txt')
8
is_s3_directory(client: boto3.client, bucket_name: str, object_name: str) -> bool
Check if an AWS S3 key is a directory by listing its contents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| object_name | str | The S3 object name to check. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the key represents a directory, False otherwise. |
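A usage sketch; the bucket and key names are illustrative:
>>> client = boto3.client('s3')
>>> is_s3_directory(client, 'mybucket', 'folder/subfolder/')
True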
list_files(client: boto3.client, bucket_name: str, prefix: str = '') -> List[str]
List files in an AWS S3 bucket that match a specific prefix.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client. | required |
| bucket_name | str | The name of the bucket. | required |
| prefix | str | The prefix to filter files, by default "". | '' |
Returns:
| Type | Description |
|---|---|
| List[str] | A list of S3 object keys matching the prefix. |
Examples:
>>> client = boto3.client('s3')
>>> list_files(client, 'mybucket', 'folder_prefix/')
['folder_prefix/file1.txt', 'folder_prefix/file2.txt']
load_csv(client: boto3.client, bucket_name: str, filepath: str, keep_columns: Optional[List[str]] = None, rename_columns: Optional[Dict[str, str]] = None, drop_columns: Optional[List[str]] = None, **kwargs) -> pd.DataFrame
Load a CSV file from an S3 bucket into a Pandas DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| filepath | str | The key (full path and filename) of the CSV file in the S3 bucket. | required |
| keep_columns | Optional[List[str]] | A list of column names to keep in the DataFrame, dropping all others. Default value is None. | None |
| rename_columns | Optional[Dict[str, str]] | A dictionary to rename columns where keys are existing column names and values are new column names. Default value is None. | None |
| drop_columns | Optional[List[str]] | A list of column names to drop from the DataFrame. Default value is None. | None |
| kwargs | | Additional keyword arguments to pass to the pandas.read_csv method. | {} |
Returns:
| Type | Description |
|---|---|
| DataFrame | Pandas DataFrame containing the data from the CSV file. |
Raises:
| Type | Description |
|---|---|
| InvalidBucketNameError | If the bucket name does not meet AWS specifications. |
| InvalidS3FilePathError | If the file_path contains an S3 URI scheme like 's3://' or 's3a://'. |
| Exception | If there is an error loading the file. |
| ValueError | If a column specified in rename_columns, drop_columns, or keep_columns is not found in the DataFrame. |
Notes
Transformation order:
1. Columns are kept according to keep_columns.
2. Columns are dropped according to drop_columns.
3. Columns are renamed according to rename_columns.
Examples:
Load a CSV file and rename columns:
>>> df = load_csv(
client,
"my-bucket",
"path/to/file.csv",
rename_columns={"old_name": "new_name"}
)
Load a CSV file and keep only specific columns:
>>> df = load_csv(
client,
"my-bucket",
"path/to/file.csv",
keep_columns=["col1", "col2"]
)
Load a CSV file and drop specific columns:
>>> df = load_csv(
client,
"my-bucket",
"path/to/file.csv",
drop_columns=["col1", "col2"]
)
Load a CSV file with custom delimiter:
>>> df = load_csv(
client,
"my-bucket",
"path/to/file.csv",
sep=";"
)
load_json(client: boto3.client, bucket_name: str, filepath: str, encoding: Optional[str] = 'utf-8', multi_line: bool = False) -> Union[Dict, List[Dict]]
Load a JSON file from an S3 bucket, with optional line-by-line parsing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| filepath | str | The key (full path and filename) of the JSON file in the S3 bucket. | required |
| encoding | Optional[str] | The encoding of the JSON file. | 'utf-8' |
| multi_line | bool | If True, reads the JSON file line by line, treating each line as a separate JSON object. | False |
Returns:
| Type | Description |
|---|---|
| Union[Dict, List[Dict]] | The parsed JSON content: a dictionary for a single JSON object, or a list of dictionaries when multi_line is True. |
Raises:
| Type | Description |
|---|---|
| InvalidBucketNameError | If the bucket name is invalid according to AWS rules. |
| Exception | If there is an error loading the file from S3 or parsing the JSON. |
Examples:
>>> client = boto3.client('s3')
>>> data = load_json(client, 'my-bucket', 'path/to/file.json')
>>> print(data)
{
"name": "John",
"age": 30,
"city": "Manchester"
}
>>> log_data = load_json(client, 'my-bucket', 'path/to/log.json', multi_line=True)
>>> print(log_data)
[{'event': 'start', 'timestamp': '2025-02-18T12:00:00Z'}, ...]
md5_sum(client: boto3.client, bucket_name: str, object_name: str) -> str
Get md5 hash of a specific object on s3.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client. | required |
| bucket_name | str | The name of the bucket. | required |
| object_name | str | The S3 object name to create the md5 hash from. | required |
Returns:
| Type | Description |
|---|---|
| str | A string value with the MD5 hash of the object data. |
Examples:
>>> client = boto3.client('s3')
>>> md5_sum(client, 'mybucket', 'folder/file.txt')
"d41d8cd98f00b204e9800998ecf8427e"
move_file(client: boto3.client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str) -> bool
Move a file within or between AWS S3 buckets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| source_bucket_name | str | The name of the source S3 bucket. | required |
| source_object_name | str | The S3 object name of the source file. | required |
| destination_bucket_name | str | The name of the destination S3 bucket. | required |
| destination_object_name | str | The S3 object name of the destination file. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the file was moved successfully, False otherwise. |
Examples:
>>> client = boto3.client('s3')
>>> move_file(
... client,
... 'sourcebucket',
... 'source_folder/file.txt',
... 'destbucket',
... 'dest_folder/file.txt'
... )
True
read_header(client: boto3.client, bucket_name: str, object_name: str) -> str
Read the first line of a file on s3.
Gets the entire file using boto3 get_object, converts its body into an input stream, reads the first line, and strips the trailing newline character from the end.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client. | required |
| bucket_name | str | The name of the bucket. | required |
| object_name | str | The S3 object name to read the header from. | required |
Returns:
| Type | Description |
|---|---|
| str | Returns the first line of the file. |
Examples:
>>> client = boto3.client('s3')
>>> read_header(client, 'mybucket', 'folder/file.txt')
"First line"
remove_leading_slash(text: str) -> str
Remove the leading forward slash from a string if present.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| text | str | The text from which the leading slash will be removed. | required |
Returns:
| Type | Description |
|---|---|
| str | The text stripped of its leading slash. |
Examples:
>>> remove_leading_slash('/example/path')
'example/path'
s3_walk(client: boto3.client, bucket_name: str, prefix: str) -> Dict
Traverse an S3 bucket and return its structure in a dictionary format.
Mimics the functionality of os.walk for an S3 bucket using full key names with slashes. Recursively goes through the keys and splits them into subdirectories and "files" (short file names).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client. | required |
| bucket_name | str | The name of the bucket. | required |
| prefix | str | The prefix of the object to start the walk from. | required |
Returns:
| Type | Description |
|---|---|
| Dict | A dictionary representing the bucket structure where keys are directory paths ending with '/' and values are tuples of (set(subdirectories), set(files)): subdirectories is a set of directory names ending with '/', and files is a set of file paths. |
Examples:
>>> client = boto3.client('s3')
>>> # For a bucket with files: file5.txt, folder1/file1.txt, folder1/file2.txt,
>>> # folder1/subfolder1/file3.txt, and folder2/file4.txt
>>> s3_walk(client, 'test-bucket', '')
{
'': ({"folder1/", "folder2/"}, {"file5.txt"}),
'folder1/': (set(), {"folder1/"}),
'folder2/': (set(), {"folder2/"})
}
>>> # When using a specific prefix
>>> s3_walk(client, 'test-bucket', 'folder1/')
{
'folder1/': ({"subfolder1/"}, {"folder1/file1.txt", "folder1/file2.txt"}),
'folder1/subfolder1/': (set(), {"folder1/subfolder1/"})
}
>>> # Empty bucket or nonexistent prefix
>>> s3_walk(client, 'test-bucket', 'nonexistent/')
{}
split_s3_uri(uri: str) -> Tuple[str, str]
Split an S3 URI into bucket and key.
Supports both s3:// and s3a:// schemes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| uri | str | The S3 URI to split, e.g., "s3://my-bucket/path/to/object.txt". | required |
Returns:
| Type | Description |
|---|---|
| Tuple[str, str] | A tuple containing the bucket name and the object key. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the URI is malformed or does not use the s3:// or s3a:// scheme. |
Examples:
>>> split_s3_uri("s3://my-bucket/data/file.csv")
('my-bucket', 'data/file.csv')
upload_file(client: boto3.client, bucket_name: str, local_path: str, object_name: Optional[str] = None, overwrite: bool = False) -> bool
Upload a file to an Amazon S3 bucket from local directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the target S3 bucket. | required |
| local_path | str | The file path on the local system to upload. | required |
| object_name | Optional[str] | The target S3 object name. If None, uses the base name of the local file path. | None |
| overwrite | bool | If True, the existing file on S3 will be overwritten. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the file was uploaded successfully, False otherwise. |
Examples:
>>> client = boto3.client('s3')
>>> upload_file(
... client,
... 'mybucket',
... '/path/to/file.txt',
... 'folder/s3_file.txt'
... )
True
upload_folder(client: boto3.client, bucket_name: str, local_path: str, prefix: str = '', overwrite: bool = False) -> bool
Upload an entire folder from the local file system to an AWS S3 bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the bucket to which the folder will be uploaded. | required |
| local_path | str | The path to the local folder to upload. | required |
| prefix | str | The prefix to prepend to each object name when uploading to S3. | '' |
| overwrite | bool | If True, overwrite existing files in the bucket. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the folder was uploaded successfully, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> upload_folder(
... client,
... 'mybucket',
... '/path/to/local/folder',
... 'folder_prefix',
... True
... )
True
validate_bucket_name(bucket_name: str) -> str
Validate the format of an AWS S3 bucket name according to AWS rules.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bucket_name | str | The name of the bucket to validate. | required |
Returns:
| Type | Description |
|---|---|
| str | The validated bucket name if valid. |
Raises:
| Type | Description |
|---|---|
| InvalidBucketNameError | If the bucket name does not meet AWS specifications. |
Examples:
>>> validate_bucket_name('valid-bucket-name')
'valid-bucket-name'
>>> validate_bucket_name('Invalid_Bucket_Name')
InvalidBucketNameError: Bucket name must not contain underscores.
validate_s3_file_path(file_path: str, allow_s3_scheme: bool) -> str
Validate the file path based on the S3 URI scheme.
If allow_s3_scheme is True, the file path must contain an S3 URI scheme
(either 's3://' or 's3a://').
If allow_s3_scheme is False, the file path should not contain an S3 URI scheme.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_path | str | The file path to validate. | required |
| allow_s3_scheme | bool | Whether or not to allow an S3 URI scheme in the file path. | required |
Returns:
| Type | Description |
|---|---|
| str | The validated file path if valid. |
Raises:
| Type | Description |
|---|---|
| InvalidS3FilePathError | If the validation fails based on the value of allow_s3_scheme. |
Examples:
>>> validate_s3_file_path('data_folder/data.csv', allow_s3_scheme=False)
'data_folder/data.csv'
>>> validate_s3_file_path('s3a://bucket-name/data.csv', allow_s3_scheme=True)
's3a://bucket-name/data.csv'
>>> validate_s3_file_path('s3a://bucket-name/data.csv', allow_s3_scheme=False)
InvalidS3FilePathError: The file_path should not contain an S3 URI scheme
like 's3://' or 's3a://'.
write_csv(client: boto3.client, bucket_name: str, data: pd.DataFrame, filepath: str, **kwargs) -> bool
Write a Pandas DataFrame to CSV in an S3 bucket.
Uses StringIO as an in-memory buffer: Pandas writes the data to the buffer, the buffer is rewound to the beginning, and the contents are then sent to the S3 bucket using the boto3 put_object method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| data | DataFrame | The dataframe to write to the specified path. | required |
| filepath | str | The filepath to save the dataframe to. | required |
| kwargs | | Optional dictionary of Pandas to_csv arguments. | {} |
Returns:
| Type | Description |
|---|---|
| bool | True if the dataframe is written successfully. False if it was not possible to serialise or write the file. |
Raises:
| Type | Description |
|---|---|
| Exception | If there is an error writing the file to S3. |
Examples:
>>> client = boto3.client('s3')
>>> data = pd.DataFrame({
>>> 'column1': [1, 2, 3],
>>> 'column2': ['a', 'b', 'c']
>>> })
>>> write_csv(client, 'my_bucket', data, 'path/to/file.csv')
True
write_excel(client: boto3.client, bucket_name: str, data: pd.DataFrame, filepath: str, **kwargs) -> bool
Write a Pandas DataFrame to an Excel file in an S3 bucket.
Uses BytesIO as a RAM buffer. Pandas writes data to the buffer, the buffer rewinds to the beginning, and then it is sent to S3 using the boto3.put_object method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| data | DataFrame | The dataframe to write to the specified path. | required |
| filepath | str | The filepath to save the dataframe to in the S3 bucket. | required |
| kwargs | dict | Optional dictionary of Pandas to_excel arguments. | {} |
Returns:
| Type | Description |
|---|---|
| bool | True if the dataframe is written successfully, False otherwise. |
Raises:
| Type | Description |
|---|---|
| Exception | If there is an error writing the file to S3. |
Examples:
>>> client = boto3.client('s3')
>>> data = pd.DataFrame({
>>> 'column1': [1, 2, 3],
>>> 'column2': ['a', 'b', 'c']
>>> })
>>> write_excel(client, 'my_bucket', data, 'path/to/file.xlsx')
True
write_string_to_file(client: boto3.client, bucket_name: str, object_name: str, object_content: bytes) -> None
Write a string into the specified object in the s3 bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client. | required |
| bucket_name | str | The name of the bucket. | required |
| object_name | str | The S3 object name to write into. | required |
| object_content | bytes | The content (str) to be written to "object_name". | required |
Returns:
| Type | Description |
|---|---|
| None | The outcome of this operation is the string written into the object in the S3 bucket. It will overwrite anything in the object. |
Examples:
>>> client = boto3.client('s3')
>>> write_string_to_file(client, 'mybucket', 'folder/file.txt', b'example content')
zip_local_directory_to_s3(client: boto3.client, local_directory_path: Union[str, Path], bucket_name: str, object_name: str, overwrite: bool = False) -> bool
Zips a local directory and uploads it to AWS S3.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | The boto3 S3 client instance. | required |
| local_directory_path | Union[str, Path] | Path to the local directory to be zipped. | required |
| bucket_name | str | Name of the S3 bucket. | required |
| object_name | str | S3 key (path) where the zip file will be saved. | required |
| overwrite | bool | If False, will not upload if the file already exists in S3. Defaults to False. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if upload was successful, False otherwise. |
Examples:
>>> import boto3
>>> from pathlib import Path
>>> client = boto3.client('s3')
>>> # Basic usage
>>> zip_local_directory_to_s3(
... client,
... '/path/to/local/dir',
... 'my-bucket',
... 'backups/mydir.zip'
... )
True
>>> # With overwrite parameter
>>> zip_local_directory_to_s3(
... client,
... Path('/path/to/local/dir'),
... 'my-bucket',
... 'backups/mydir.zip',
... overwrite=True
... )
True
zip_s3_directory_to_s3(client: boto3.client, source_bucket_name: str, source_prefix: str, destination_bucket_name: str, destination_object_name: str, overwrite: bool = False) -> bool
Zips a directory that exists in S3 and saves it to another location in S3.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | boto3.client | Initialised boto3 S3 client. | required |
| source_bucket_name | str | Name of the source S3 bucket. | required |
| source_prefix | str | Prefix (directory path) in the source bucket to zip. | required |
| destination_bucket_name | str | Name of the destination S3 bucket. | required |
| destination_object_name | str | S3 key (path) where the zip file will be saved. | required |
| overwrite | bool | If False, will not upload if the file already exists. Defaults to False. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if operation was successful, False otherwise. |
Examples:
>>> import boto3
>>> s3 = boto3.client('s3')
>>> # Basic usage
>>> zip_s3_directory_to_s3(
... s3,
... 'source-bucket',
... 'data/logs/',
... 'dest-bucket',
... 'archives/logs.zip'
... )
True
>>> # With overwrite parameter
>>> zip_s3_directory_to_s3(
... s3,
... 'source-bucket',
... 'data/logs/',
... 'dest-bucket',
... 'archives/logs.zip',
... overwrite=True
... )
True
rdsa_utils.cdp.helpers.hdfs_utils
Utility functions for interacting with HDFS.
change_permissions(path: str, permission: str, recursive: bool = False) -> bool
Change directory and file permissions in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The path to the file or directory in HDFS. | required |
| permission | str | The permission to be set, e.g., 'go+rwx' or '777'. | required |
| recursive | bool | If True, changes permissions for all subdirectories and files within a directory. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the operation was successful (command return code 0), otherwise False. |
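A usage sketch; the paths are illustrative:
>>> from rdsa_utils.cdp.helpers.hdfs_utils import change_permissions
>>> change_permissions('/user/me/data/output.csv', '777')
True
>>> change_permissions('/user/me/data', 'go+rwx', recursive=True)
True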
copy(from_path: str, to_path: str, overwrite: bool = False) -> bool
Copy a file in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| from_path | str | The source path of the file in HDFS. | required |
| to_path | str | The target path of the file in HDFS. | required |
| overwrite | bool | If True, the existing file at the target path will be overwritten, default is False. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the operation was successful (command return code 0), otherwise False. |
Raises:
| Type | Description |
|---|---|
| TimeoutExpired | If the process does not complete within the default timeout. |
copy_local_to_hdfs(from_path: str, to_path: str) -> bool
Copy a local file to HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| from_path | str | The path to the local file. | required |
| to_path | str | The path to the HDFS directory where the file will be copied. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the operation was successful (command return code 0), otherwise False. |
create_dir(path: str) -> bool
Create a directory in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The HDFS path where the directory should be created. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the operation is successful (directory created), otherwise False. |
create_txt_from_string(path: str, string_to_write: str, replace: Optional[bool] = False) -> None
Create and populate a text file in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The path to the new file to be created, for example, '/some/directory/newfile.txt'. | required |
| string_to_write | str | The string that will populate the new text file. | required |
| replace | Optional[bool] | Flag determining whether an existing file should be replaced. Defaults to False. | False |
Returns:
| Type | Description |
|---|---|
| None | This function doesn't return anything; it's used for its side effect of creating a text file. |
Raises:
| Type | Description |
|---|---|
| FileNotFoundError | If |
delete_dir(path: str) -> bool
Delete an empty directory from HDFS.
This function attempts to delete an empty directory in HDFS. If the directory is not empty, the deletion will fail.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The HDFS path to the directory to be deleted. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the operation is successful (directory deleted), otherwise False. |
Note
This function will only succeed if the directory is empty.
To delete directories containing files or other directories,
consider using delete_path instead.
delete_file(path: str) -> bool
Delete a specific file in HDFS.
This function is used to delete a single file located at the specified HDFS path. If the path points to a directory, the command will fail.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The path to the file in HDFS to be deleted. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the file was successfully deleted (command return code 0), otherwise False. |
Raises:
| Type | Description |
|---|---|
| TimeoutExpired | If the process does not complete within the default timeout. |
Note
This function is intended for files only. For directory deletions,
use delete_dir or delete_path.
delete_path(path: str) -> bool
Delete a file or directory in HDFS, including non-empty directories.
This function is capable of deleting both files and directories. When applied to directories, it will recursively delete all contents within the directory, making it suitable for removing directories regardless of whether they are empty or contain files or other directories.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The path to the file or directory in HDFS to be deleted. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the path was successfully deleted (command return code 0), otherwise False. |
Raises:
| Type | Description |
|---|---|
| TimeoutExpired | If the process does not complete within the default timeout. |
Warning
Use with caution: applying this function to a directory will remove all contained files and subdirectories without confirmation.
file_exists(path: str) -> bool
Check whether a file exists in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The path to the file in HDFS to be checked for existence. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the file exists (command return code 0), otherwise False. |
Raises:
| Type | Description |
|---|---|
| TimeoutExpired | If the process does not complete within the default timeout. |
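A usage sketch combining file_exists with delete_path; the paths are illustrative:
>>> from rdsa_utils.cdp.helpers.hdfs_utils import delete_path, file_exists
>>> if file_exists('/user/me/tmp/old_output.csv'):
...     delete_path('/user/me/tmp/old_output.csv')
True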
get_date_modified(filepath: str) -> str
Return the last modified date of a file in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| filepath | str | The path to the file in HDFS. | required |
Returns:
| Type | Description |
|---|---|
| str | The date the file was last modified. |
is_dir(path: str) -> bool
Test if a directory exists in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The HDFS path to the directory to be tested. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the operation is successful (directory exists), otherwise False. |
move_local_to_hdfs(from_path: str, to_path: str) -> bool
Move a local file to HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| from_path | str | The path to the local file. | required |
| to_path | str | The path to the HDFS directory where the file will be moved. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the operation was successful (command return code 0), otherwise False. |
read_dir(path: str) -> List[str]
Read the contents of a directory in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The path to the directory in HDFS. | required |
Returns:
| Type | Description |
|---|---|
| List[str] | A list of full paths of the items found in the directory. |
read_dir_files(path: str) -> List[str]
Read the filenames in a directory in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The path to the directory in HDFS. | required |
Returns:
| Type | Description |
|---|---|
| List[str] | A list of filenames in the directory. |
read_dir_files_recursive(path: str, return_path: bool = True) -> List[str]
Recursively reads the contents of a directory in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The path to the directory in HDFS. | required |
| return_path | bool | If True, returns the full path of the files, otherwise just the filename. | True |
Returns:
| Type | Description |
|---|---|
| List[str] | A list of files in the directory. |
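A usage sketch; the directory contents shown are illustrative:
>>> read_dir_files_recursive('/user/me/project', return_path=True)
['/user/me/project/a.csv', '/user/me/project/sub/b.csv']
>>> read_dir_files_recursive('/user/me/project', return_path=False)
['a.csv', 'b.csv']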
rename(from_path: str, to_path: str, overwrite: bool = False) -> bool
Rename (i.e., move using full path) a file in HDFS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| from_path | str | The source path of the file in HDFS. | required |
| to_path | str | The target path of the file in HDFS. | required |
| overwrite | bool | If True, the existing file at the target path will be overwritten, default is False. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the operation was successful (command return code 0), otherwise False. |
Raises:
| Type | Description |
|---|---|
| TimeoutExpired | If the process does not complete within the default timeout. |
rdsa_utils.cdp.helpers.impala
Utilities for working with Impala.
invalidate_impala_metadata(table: str, impalad_address_port: str, impalad_ca_cert: str, keep_stderr: Optional[bool] = False)
Automate the invalidation of a table's metadata using impala-shell.
This function uses the impala-shell command with the given impalad_address_port and impalad_ca_cert, to invalidate a specified table's metadata.
It proves useful during a data pipeline's execution after writing to an intermediate Hive table. Using Impala Query Editor in Hue, end-users often need to run "INVALIDATE METADATA" command to refresh a table's metadata. However, this manual step can be missed, leading to potential use of outdated metadata.
The function automates the "INVALIDATE METADATA" command for a given table, ensuring up-to-date metadata for future queries. This reduces manual intervention, making outdated metadata issues less likely to occur.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | str | Name of the table for metadata invalidation. | required |
| impalad_address_port | str | 'address:port' of the impalad instance. | required |
| impalad_ca_cert | str | Path to impalad's CA certificate file. | required |
| keep_stderr | Optional[bool] | If True, will print impala-shell command's stderr output. | False |
Returns:
| Type | Description |
|---|---|
| None | |
Examples:
>>> invalidate_impala_metadata(
... 'my_table',
... 'localhost:21050',
... '/path/to/ca_cert.pem'
... )
>>> invalidate_impala_metadata(
... 'my_table',
... 'localhost:21050',
... '/path/to/ca_cert.pem',
... keep_stderr=True
... )
rdsa_utils.cdp.io.pipeline_runlog
Utilities for managing a Pipeline Runlog using Hive Tables.
add_runlog_entry(spark: SparkSession, desc: str, version: str, config: Union[ConfigParser, Dict[str, str]], pipeline: Optional[str] = None, log_table: str = 'pipeline_runlog', run_id: Optional[int] = None) -> DataFrame
Add an entry to a target runlog.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | A running spark session. | required |
| desc | str | Description to attach to the log entry. | required |
| version | str | Version of the pipeline. | required |
| config | Union[ConfigParser, Dict[str, str]] | Configuration object for the run. | required |
| pipeline | Optional[str] | Pipeline name. If None, uses the spark application name. | None |
| log_table | str | Target runlog table. If database not set, this should include the database. | 'pipeline_runlog' |
| run_id | Optional[int] | Run id to use if already reserved. If not specified, a new one is generated. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | The log entry returned as a spark dataframe. |
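A usage sketch; the description, version, config and table name are illustrative:
>>> spark = SparkSession.builder.appName('my_pipeline').getOrCreate()
>>> config = {'input_table': 'my_db.sales', 'output_table': 'my_db.sales_clean'}
>>> entry = add_runlog_entry(
...     spark,
...     desc='Monthly sales run',
...     version='1.2.0',
...     config=config,
...     log_table='my_db.pipeline_runlog',
... )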
create_runlog_entry(spark: SparkSession, run_id: int, desc: str, version: str, config: Union[ConfigParser, Dict[str, str]], pipeline: Optional[str] = None) -> DataFrame
Create an entry for the runlog.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | A running spark session. | required |
| run_id | int | Entry run id. | required |
| desc | str | Description to attach to the log entry. | required |
| version | str | Version of the pipeline. | required |
| config | Union[ConfigParser, Dict[str, str]] | Configuration object for the run. | required |
| pipeline | Optional[str] | Pipeline name. If None, derives from spark app name. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | The log entry returned as a spark dataframe. |
create_runlog_table(spark: SparkSession, database: str, tablename: Optional[str] = 'pipeline_runlog') -> None
Create runlog and _reserved_ids tables in the target database if needed.
This function executes two SQL queries to create two tables, if they do not already exist in the target database. The first table's structure includes columns for run_id, desc, user, datetime, pipeline_name, pipeline_version, and config, while the second table includes run_id and reserved_date.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | A running spark session which will be used to execute SQL queries. | required |
| database | str | The name of the target database where tables will be created. | required |
| tablename | Optional[str] | The name of the main table to be created (default is "pipeline_runlog"). The associated _reserved_ids table will be suffixed with this name. | 'pipeline_runlog' |
Returns:
| Type | Description |
|---|---|
| None | |
Examples:
>>> spark = SparkSession.builder.appName("test_session").getOrCreate()
>>> create_runlog_table(spark, "test_db", "test_table")
get_last_run_id(spark: SparkSession, pipeline: Optional[str] = None, log_table: str = 'pipeline_runlog') -> Optional[int]
Retrieve the last run_id, either in general or for a specific pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | A running Spark session. | required |
| pipeline | Optional[str] | If specified, the result will be for the listed pipeline only. | None |
| log_table | str | The target runlog table. If the database is not set, this should include the database. | 'pipeline_runlog' |
Returns:
| Type | Description |
|---|---|
| int or None | The id of the last run. Returns None if the log table is empty. |
get_penultimate_run_id(spark: SparkSession, pipeline: Optional[str] = None, log_table: str = 'pipeline_runlog') -> Optional[int]
Retrieve the penultimate run_id, either in general or for a specific pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | A running Spark session. | required |
| pipeline | Optional[str] | If specified, the result will be for the listed pipeline only. | None |
| log_table | str | The target runlog table. If the database is not set, this should include the database. | 'pipeline_runlog' |
Returns:
| Type | Description |
|---|---|
| int or None | The id of the penultimate run. Returns None if the log table is empty or has fewer than two entries. |
reserve_id(spark: SparkSession, log_table: Optional[str] = 'pipeline_runlog') -> int
Reserve a run id in the reserved ids table linked to the runlog table.
The function reads the last run id from the reserved ids table, increments it to create a new id, and writes the new id with the current timestamp to the reserved ids table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | A running SparkSession instance. | required |
| log_table | Optional[str] | The name of the main pipeline runlog table associated with this reserved id table, by default "pipeline_runlog". | 'pipeline_runlog' |
Returns:
| Type | Description |
|---|---|
| int | The new run id. |
write_runlog_file(spark: SparkSession, runlog_table: str, runlog_id: int, path: str) -> None
Write metadata from runlog entry to a text file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
spark
|
SparkSession
|
A running SparkSession instance. |
required |
runlog_table
|
str
|
The name of the table containing the runlog entries. |
required |
runlog_id
|
int
|
The id of the desired entry. |
required |
path
|
str
|
The HDFS path where the file will be written. |
required |
Returns:
| Type | Description |
|---|---|
None
|
This function doesn't return anything; it's used for its side effect of creating a text file. |
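A minimal usage sketch (assuming an active SparkSession; the table name, run id, and HDFS path below are placeholders):
# Write the metadata for runlog entry 42 out to HDFS as a text file.
write_runlog_file(
    spark,
    runlog_table="my_db.pipeline_runlog",
    runlog_id=42,
    path="/user/my_user/runlog_42.txt",
)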
rdsa_utils.cdp.io.input
Read inputs on CDP.
extract_database_name(spark: SparkSession, long_table_name: str) -> Tuple[str, str]
Extract the database component and table name from a compound table name.
This function can handle multiple scenarios:
- For GCP's fully qualified naming format, where the project, database, and table names are separated by dots, the function will return the database and table name.
- If the name is formatted as 'db_name.table_name', the function will extract and return the database and table names.
- If the long_table_name contains only the table name (e.g., 'table_name'), the function will use the current database of the SparkSession.
- For any other incorrectly formatted names, the function will raise a ValueError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| long_table_name | str | Full name of the table, which can include the GCP project and/or database name. | required |
Returns:
| Type | Description |
|---|---|
| Tuple[str, str] | A tuple containing the name of the database and the table name. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the table name doesn't match any of the expected formats. |
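A sketch of the three accepted formats (the return values shown follow the behaviour described above and assume the session's current database is 'default'; all names are placeholders):
# Fully qualified GCP-style name: project, database, and table separated by dots.
extract_database_name(spark, "my_project.my_db.my_table")  # -> ("my_db", "my_table")

# Database-qualified name.
extract_database_name(spark, "my_db.my_table")              # -> ("my_db", "my_table")

# Bare table name: falls back to the SparkSession's current database.
extract_database_name(spark, "my_table")                     # -> ("default", "my_table")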
get_current_database(spark: SparkSession) -> str
Retrieve the current database from the active SparkSession.
get_tables_in_database(spark: SparkSession, database_name: str) -> List[str]
Get a list of tables in a given database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| database_name | str | The name of the database from which to list tables. | required |
Returns:
| Type | Description |
|---|---|
| List[str] | A list of table names in the specified database. |
Raises:
| Type | Description |
|---|---|
| ValueError | If there is an error fetching tables from the specified database. |
Examples:
>>> tables = get_tables_in_database(spark, "default")
>>> print(tables)
['table1', 'table2', 'table3']
load_and_validate_table(spark: SparkSession, table_name: str, skip_validation: bool = False, err_msg: str = None, filter_cond: str = None, keep_columns: Optional[List[str]] = None, rename_columns: Optional[Dict[str, str]] = None, drop_columns: Optional[List[str]] = None) -> SparkDF
Load a table, apply transformations, and validate if it is not empty.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| table_name | str | Name of the table to load. | required |
| skip_validation | bool | If True, skips validation step, by default False. | False |
| err_msg | str | Error message to return if table is empty, by default None. | None |
| filter_cond | str | Condition to apply to SparkDF once read, by default None. | None |
| keep_columns | Optional[List[str]] | A list of column names to keep in the DataFrame, dropping all others. Default value is None. | None |
| rename_columns | Optional[Dict[str, str]] | A dictionary to rename columns where keys are existing column names and values are new column names. Default value is None. | None |
| drop_columns | Optional[List[str]] | A list of column names to drop from the DataFrame. Default value is None. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | Loaded SparkDF if validated, subject to options above. |
Raises:
| Type | Description |
|---|---|
| PermissionError | If there's an issue accessing the table or if the table does not exist in the specified database. |
| ValueError | If the table is empty after loading, becomes empty after applying a filter condition, or if columns specified in keep_columns, drop_columns, or rename_columns do not exist in the DataFrame. |
Notes
Transformation order:
1. Columns are kept according to keep_columns.
2. Columns are dropped according to drop_columns.
3. Columns are renamed according to rename_columns.
Examples:
Load a table, apply a filter, and validate it:
>>> df = load_and_validate_table(spark=spark, table_name="my_table", filter_cond="age > 21")
Load a table and keep only specific columns:
>>> df = load_and_validate_table(spark=spark, table_name="my_table", keep_columns=["name", "age", "city"])
Load a table, drop specific columns, and rename a column:
>>> df = load_and_validate_table(spark=spark, table_name="my_table", drop_columns=["extra_column"], rename_columns={"name": "full_name"})
Load a table, skip validation, and apply all transformations:
>>> df = load_and_validate_table(spark=spark, table_name="my_table", skip_validation=True, keep_columns=["name", "age", "city"], drop_columns=["extra_column"], rename_columns={"name": "full_name"}, filter_cond="age > 21")
rdsa_utils.cdp.io.output
Write outputs on CDP.
insert_df_to_hive_table(spark: SparkSession, df: SparkDF, table_name: str, overwrite: bool = False, fill_missing_cols: bool = False, repartition_data_by: Union[int, str, None] = None) -> None
Write SparkDF to Hive table with optional configuration.
This function writes data from a SparkDF into a Hive table, handling missing columns and optional repartitioning. It ensures the table's column order matches the DataFrame and manages different overwrite behaviors for partitioned and non-partitioned data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| df | DataFrame | SparkDF containing data to be written. | required |
| table_name | str | Name of the Hive table to write data into. | required |
| overwrite | bool | Controls how existing data is handled, default is False. For non-partitioned data: True replaces the entire table with the DataFrame data; False appends the DataFrame data to the existing table. For partitioned data: True replaces data only in partitions present in the DataFrame; False appends data to existing partitions or creates new ones. | False |
| fill_missing_cols | bool | If True, adds missing columns as NULL values. If False, raises an error on schema mismatch, default is False. DataFrame columns are explicitly cast to match the Hive table schema to avoid type mismatch errors. Missing columns are added as NULL values when fill_missing_cols is True, regardless of their data type (e.g., String, Integer, Double, Boolean, etc.). | False |
| repartition_data_by | Union[int, str, None] | Controls data repartitioning, default is None. int: sets target number of partitions; str: specifies column to repartition by; None: no repartitioning performed. | None |
Notes
When using repartition with a number:
- Affects physical file structure but preserves the Hive partitioning scheme.
- Controls the number of output files per write operation per Hive partition.
- Maintains partition-based query optimisation.
When repartitioning by column:
- Helps balance file sizes across Hive partitions.
- Reduces creation of small files.
Raises:
| Type | Description |
|---|---|
| AnalysisException | If there's an error reading the table. This can occur if the table doesn't exist or if there's no access to it. |
| ValueError | If the SparkDF schema does not match the Hive table schema and 'fill_missing_cols' is set to False. |
| DataframeEmptyError | If the input DataFrame is empty. |
| Exception | For other general exceptions when writing data to the table. |
Examples:
Write a DataFrame to a Hive table without overwriting:
>>> insert_df_to_hive_table(spark=spark, df=df, table_name="my_database.my_table")
Overwrite an existing table with a DataFrame:
>>> insert_df_to_hive_table(spark=spark, df=df, table_name="my_database.my_table", overwrite=True)
Write a DataFrame to a Hive table with missing columns filled:
>>> insert_df_to_hive_table(spark=spark, df=df, table_name="my_database.my_table", fill_missing_cols=True)
Repartition by column before writing to Hive:
>>> insert_df_to_hive_table(spark=spark, df=df, table_name="my_database.my_table", repartition_data_by="partition_column")
Repartition into a fixed number of partitions before writing:
>>> insert_df_to_hive_table(spark=spark, df=df, table_name="my_database.my_table", repartition_data_by=10)
save_csv_to_hdfs(df: SparkDF, file_name: str, file_path: str, overwrite: bool = True) -> None
Save DataFrame as CSV on HDFS, coalescing to a single partition.
This function saves a PySpark DataFrame to HDFS in CSV format. By coalescing the DataFrame into a single partition before saving, it accomplishes two main objectives:
- Single Part File: The output is a single CSV file rather than multiple part files. This method reduces complexity and cuts through the clutter of multi-part files, offering users and systems a more cohesive and hassle-free experience.
- Preserving Row Order: Coalescing into a single partition maintains the order of rows as they appear in the DataFrame. This is essential when the row order matters for subsequent processing or analysis. It's important to note, however, that coalescing can have performance implications for very large DataFrames by concentrating all data processing on a single node.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | PySpark DataFrame to be saved. | required |
| file_name | str | Name of the CSV file. Must include the ".csv" extension. | required |
| file_path | str | HDFS path where the CSV file should be saved. | required |
| overwrite | bool | If True, overwrite any existing file with the same name. If False and the file exists, the function will raise an error. | True |
Raises:
| Type | Description |
|---|---|
| ValueError | If the file_name does not end with ".csv". |
| IOError | If overwrite is False and the target file already exists. |
Examples:
Saving to an S3 bucket using the s3a:// scheme:
# Assume `df` is a pre-defined PySpark DataFrame
file_name = "data_output.csv"
file_path = "s3a://my-bucket/data_folder/"
save_csv_to_hdfs(df, file_name, file_path, overwrite=True)
Saving to a normal HDFS path:
# Assume `df` is a pre-defined PySpark DataFrame
file_name = "data_output.csv"
file_path = "/user/hdfs/data_folder/"
save_csv_to_hdfs(df, file_name, file_path, overwrite=True)
save_csv_to_s3(df: SparkDF, bucket_name: str, file_name: str, file_path: str, s3_client: boto3.client, overwrite: bool = True) -> None
Save DataFrame as CSV on S3, coalescing to a single partition.
This function saves a PySpark DataFrame to S3 in CSV format. By coalescing the DataFrame into a single partition before saving, it accomplishes two main objectives:
- Single Part File: The output is a single CSV file rather than multiple part files. This method reduces complexity and cuts through the clutter of multi-part files, offering users and systems a more cohesive and hassle-free experience.
- Preserving Row Order: Coalescing into a single partition maintains the order of rows as they appear in the DataFrame. This is essential when the row order matters for subsequent processing or analysis. It's important to note, however, that coalescing can have performance implications for very large DataFrames by concentrating all data processing on a single node.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | PySpark DataFrame to be saved. | required |
| bucket_name | str | The name of the S3 bucket where the CSV file should be saved. | required |
| file_name | str | Name of the CSV file. Must include the ".csv" extension. | required |
| file_path | str | S3 path where the CSV file should be saved. | required |
| s3_client | client | The boto3 S3 client instance. | required |
| overwrite | bool | If True, overwrite any existing file with the same name. If False and the file exists, the function will raise an error. | True |
Raises:
| Type | Description |
|---|---|
| ValueError | If the file_name does not end with ".csv". |
| InvalidBucketNameError | If the bucket name does not meet AWS specifications. |
| InvalidS3FilePathError | If the file_path contains an S3 URI scheme like 's3://' or 's3a://'. |
| IOError | If overwrite is False and the target file already exists. |
Examples:
Saving to an S3 bucket:
# Assume `df` is a pre-defined PySpark DataFrame
file_name = "data_output.csv"
file_path = "data_folder/"
s3_client = boto3.client('s3')
save_csv_to_s3(df, 'my-bucket', file_name, file_path, s3_client, overwrite=True)
write_and_read_hive_table(spark: SparkSession, df: SparkDF, table_name: str, database: str, filter_id: Union[int, str], filter_col: str = 'run_id', fill_missing_cols: bool = False) -> SparkDF
Write a SparkDF to an existing Hive table and then read it back.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Active SparkSession. | required |
| df | DataFrame | The SparkDF to be written to the Hive table. | required |
| table_name | str | The name of the Hive table to write to and read from. | required |
| database | str | The Hive database name. | required |
| filter_id | Union[int, str] | The identifier to filter on when reading data back from the Hive table. | required |
| filter_col | str | The column name to use for filtering data when reading back from the Hive table, by default 'run_id'. | 'run_id' |
| fill_missing_cols | bool | If True, missing columns in the DataFrame will be filled with nulls when writing to the Hive table, by default False. | False |
Returns:
| Type | Description |
|---|---|
| DataFrame | The DataFrame read from the Hive table. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the specified Hive table does not exist in the given database or if the provided DataFrame doesn't contain the specified filter column. |
| Exception | For general exceptions encountered during execution. |
Notes
This function assumes the Hive table already exists. The DataFrame df should have the same schema as the Hive table for the write to succeed.
The function allows for more effective memory management when dealing with large PySpark DataFrames by leveraging Hive's on-disk storage.
Predicate pushdown is used when reading the data back into a PySpark DataFrame, minimizing the memory usage and optimising the read operation.
As part of the design, there is always a column called filter_col in the DataFrame and Hive table to track pipeline runs.
The Hive table contains all the runs, and we only read back the run that we just wrote to the Hive table using the filter_id parameter. If no filter_col is specified, 'run_id' is used as default.
GCP
rdsa_utils.gcp.helpers.gcp_utils
Utility functions for interacting with Google Cloud Storage.
To initialise a client for GCS and configure it with a service account JSON key file, you can use the following code snippet:
from google.cloud import storage

# Create a GCS client
client = storage.Client.from_service_account_json('path/to/keyfile.json')
copy_file(client: storage.Client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str, overwrite: bool = False) -> bool
Copy a file from one GCS bucket to another.
Parameters:
Name Type Description Default clientClientThe GCS client instance.
required source_bucket_namestrThe name of the source bucket.
required source_object_namestrThe GCS object name of the source file.
required destination_bucket_namestrThe name of the destination bucket.
required destination_object_namestrThe GCS object name of the destination file.
required overwriteboolIf True, overwrite the destination file if it already exists.
FalseReturns:
Type Description boolTrue if the file was copied successfully, otherwise False.
Examples:
>>> client = storage.Client() >>> copy_file( ... client, ... 'source-bucket', ... 'source_file.txt', ... 'destination-bucket', ... 'destination_file.txt' ... ) Truecreate_folder_on_gcs(client: storage.Client, bucket_name: str, folder_path: str) -> boolCreate a folder in a GCS bucket if it doesn't already exist.
Parameters:
Name Type Description Default clientClientThe GCS client instance.
required bucket_namestrThe name of the bucket where the folder will be created.
required folder_pathstrThe name of the folder to create.
required Returns:
Type Description boolTrue if the folder was created successfully or already exists, otherwise False.
Examples:
>>> client = storage.Client() >>> create_folder_on_gcs(client, 'mybucket', 'new_folder/') Truedelete_file(client: storage.Client, bucket_name: str, object_name: str) -> boolDelete a file from a GCS bucket.
Parameters:
Name Type Description Default clientClientThe GCS client instance.
required bucket_namestrThe name of the bucket from which the file will be deleted.
required object_namestrThe GCS object name of the file to delete.
required Returns:
Type Description boolTrue if the file was deleted successfully, otherwise False.
Examples:
>>> client = storage.Client() >>> delete_file(client, 'mybucket', 'folder/gcs_file.txt') Truedelete_folder(client: storage.Client, bucket_name: str, folder_path: str) -> boolDelete a folder in a GCS bucket.
Parameters:
Name Type Description Default clientClientThe GCS client instance.
required bucket_namestrThe name of the GCS bucket.
required folder_pathstrThe path of the folder to delete.
required Returns:
Type Description boolTrue if the folder was deleted successfully, otherwise False.
Examples:
>>> client = storage.Client() >>> delete_folder(client, 'mybucket', 'path/to/folder/') Truedownload_file(client: storage.Client, bucket_name: str, object_name: str, local_path: str, overwrite: bool = False) -> boolDownload a file from a GCS bucket to a local directory.
Parameters:
Name Type Description Default clientClientThe GCS client instance.
required bucket_namestrThe name of the GCS bucket from which to download the file.
required object_namestrThe GCS object name of the file to download.
required local_pathstrThe local file path where the downloaded file will be saved.
required overwriteboolIf True, overwrite the local file if it exists.
FalseReturns:
Type Description boolTrue if the file was downloaded successfully, False otherwise.
Examples:
>>> client = storage.Client() >>> download_file( ... client, ... 'mybucket', ... 'folder/gcs_file.txt', ... '/path/to/download.txt' ... ) Truedownload_folder(client: storage.Client, bucket_name: str, prefix: str, local_path: str, overwrite: bool = False) -> boolDownload a folder from a GCS bucket to a local directory.
Parameters:
Name Type Description Default clientClientThe GCS client instance.
required bucket_namestrThe name of the GCS bucket from which to download the folder.
required prefixstrThe GCS prefix of the folder to download.
required local_pathstrThe local directory path where the downloaded folder will be saved.
required overwriteboolIf True, overwrite existing local files if they exist.
FalseReturns:
Type Description boolTrue if the folder was downloaded successfully, False otherwise.
Examples:
>>> client = storage.Client() >>> download_folder( ... client, ... 'mybucket', ... 'folder/subfolder/', ... '/path/to/local_folder', ... overwrite=False ... ) Truefile_exists(client: storage.Client, bucket_name: str, object_name: str) -> boolCheck if a specific file exists in a GCS bucket.
Parameters:
Name Type Description Default clientClientThe GCS client.
required bucket_namestrThe name of the bucket.
required object_namestrThe GCS object name to check for existence.
required Returns:
Type Description boolTrue if the file exists, otherwise False.
Examples:
>>> client = storage.Client() >>> file_exists(client, 'mybucket', 'folder/file.txt') Trueget_table_columns(table_path) -> List[str]Return the column names for given bigquery table.
is_gcs_directory(client: storage.Client, bucket_name: str, object_name: str) -> boolCheck if a GCS key is a directory by listing its contents.
Parameters:
Name Type Description Default clientClientThe GCS client instance.
required bucket_namestrThe name of the GCS bucket.
required object_namestrThe GCS object name to check.
required Returns:
Type Description boolTrue if the key represents a directory, False otherwise.
list_files(client: storage.Client, bucket_name: str, prefix: str = '') -> List[str]List files in a GCS bucket that match a specific prefix.
Parameters:
Name Type Description Default clientClientThe GCS client.
required bucket_namestrThe name of the bucket.
required prefixstrThe prefix to filter files, by default "".
''Returns:
Type Description List[str]A list of GCS object keys matching the prefix.
Examples:
>>> client = storage.Client() >>> list_files(client, 'mybucket', 'folder_prefix/') ['folder_prefix/file1.txt', 'folder_prefix/file2.txt']load_config_gcp(config_path: str) -> Tuple[Dict, Dict]Load the config and dev_config files to dictionaries.
Parameters:
Name Type Description Default config_pathstrThe path of the config file in a yaml format.
required Returns:
Type Description Tuple[Dict, Dict]The contents of the config files.
move_file(client: storage.Client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str) -> boolMove a file within or between GCS buckets.
Parameters:
Name Type Description Default clientClientThe GCS client instance.
required source_bucket_namestrThe name of the source GCS bucket.
required source_object_namestrThe GCS object name of the source file.
required destination_bucket_namestrThe name of the destination GCS bucket.
required destination_object_namestrThe GCS object name of the destination file.
required Returns:
Type Description boolTrue if the file was moved successfully, False otherwise.
Examples:
>>> client = storage.Client() >>> move_file( ... client, ... 'sourcebucket', ... 'source_folder/file.txt', ... 'destbucket', ... 'dest_folder/file.txt' ... ) Trueremove_leading_slash(text: str) -> strRemove the leading forward slash from a string if present.
Parameters:
Name Type Description Default textstrThe text from which the leading slash will be removed.
required Returns:
Type Description strThe text stripped of its leading slash.
Examples:
>>> remove_leading_slash('/example/path')
'example/path'
run_bq_query(query: str) -> bigquery.QueryJob
Run an SQL query in BigQuery.
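A minimal usage sketch (the project, dataset, and table names are placeholders). Because run_bq_query returns a bigquery.QueryJob, results are obtained from the job object:
query = """
    SELECT COUNT(*) AS n_rows
    FROM `my_project.my_dataset.my_table`
"""
job = run_bq_query(query)

# QueryJob.result() waits for the query to finish and returns the rows.
for row in job.result():
    print(row["n_rows"])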
table_exists(table_path: TablePath) -> boolCheck the big query catalogue to see if a table exists.
Returns True if a table exists. See code sample explanation here: https://cloud.google.com/bigquery/docs/samples/bigquery-table-exists#bigquery_table_exists-python
Parameters:
Name Type Description Default table_pathTablePathThe target BigQuery table name of form:
. . required Returns:
Type Description boolReturns True if table exists and False if table does not exist.
upload_file(client: storage.Client, bucket_name: str, local_path: str, object_name: Optional[str] = None, overwrite: bool = False) -> boolUpload a file to a GCS bucket from local directory.
Parameters:
Name Type Description Default clientClientThe GCS client instance.
required bucket_namestrThe name of the target GCS bucket.
required local_pathstrThe file path on the local system to upload.
required object_nameOptional[str]The target GCS object name. If None, uses the base name of the local file path.
NoneoverwriteboolIf True, the existing file on GCS will be overwritten.
FalseReturns:
Type Description boolTrue if the file was uploaded successfully, False otherwise.
Examples:
>>> client = storage.Client() >>> upload_file( ... client, ... 'mybucket', ... '/path/to/file.txt', ... 'folder/gcs_file.txt' ... ) Trueupload_folder(client: storage.Client, bucket_name: str, local_path: str, prefix: str = '', overwrite: bool = False) -> boolUpload an entire folder from the local file system to a GCS bucket.
Parameters:
Name Type Description Default clientClientThe GCS client instance.
required bucket_namestrThe name of the bucket to which the folder will be uploaded.
required local_pathstrThe path to the local folder to upload.
required prefixstrThe prefix to prepend to each object name when uploading to GCS.
''overwriteboolIf True, overwrite existing files in the bucket.
FalseReturns:
Type Description boolTrue if the folder was uploaded successfully, otherwise False.
Examples:
>>> client = storage.Client() >>> upload_folder( ... client, ... 'mybucket', ... '/path/to/local/folder', ... 'folder_prefix', ... True ... ) Truevalidate_bucket_name(bucket_name: str) -> strValidate the format of a GCS bucket name according to GCS rules.
Parameters:
Name Type Description Default bucket_namestrThe name of the bucket to validate.
required Returns:
Type Description strThe validated bucket name if valid.
Raises:
Type Description InvalidBucketNameErrorIf the bucket name does not meet GCS specifications.
Examples:
>>> validate_bucket_name('valid-bucket-name')
'valid-bucket-name'
>>> validate_bucket_name('Invalid_Bucket_Name')
InvalidBucketNameError: Bucket name must not contain underscores.
rdsa_utils.gcp.io.inputs
Read from BigQuery.
build_sql_query(table_path: TablePath, columns: Optional[Sequence[str]] = None, date_column: Optional[str] = None, date_range: Optional[Sequence[str]] = None, column_filter_dict: Optional[Dict[str, Sequence[str]]] = None, partition_column: Optional[str] = None, partition_type: Optional[str] = None, partition_value: Optional[Union[Tuple[str, str], str]] = None) -> str
Create SQL query to load data with the specified filter conditions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | | Spark session. | required |
| table_path | TablePath | BigQuery table path in format "database_name.table_name". | required |
| columns | Optional[Sequence[str]] | The column selection. Selects all columns if None passed. | None |
| date_column | Optional[str] | The name of the column to be used to filter the date range on. | None |
| date_range | Optional[Sequence[str]] | Sequence with two values, a lower and upper value for dates to load in. | None |
| column_filter_dict | Optional[Dict[str, Sequence[str]]] | A dictionary containing column: [values] where the values correspond to terms in the column that are to be filtered by. | None |
| partition_column | Optional[str] | The name of the column that the table is partitioned by. | None |
| partition_type | Optional[str] | The unit of time the table is partitioned by, must be one of: hour, day, month, year. | None |
| partition_value | Optional[Union[Tuple[str, str], str]] | The value or pair of values for filtering the partition column to. | None |
Returns:
| Type | Description |
|---|---|
| str | The string containing the SQL query. |
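A sketch of how the filter arguments combine into a query string (the table and column names are placeholders, and the exact SQL produced may differ in detail):
sql = build_sql_query(
    table_path="my_db.prices",
    columns=["id", "price", "period"],
    date_column="period",
    date_range=["2021-01-01", "2021-12-31"],
    column_filter_dict={"region": ["north", "south"]},
)

# The resulting string can then be passed to the BigQuery connector or run_bq_query.
print(sql)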
read_table(spark: SparkSession, table_path: TablePath, columns: Optional[Sequence[str]] = None, date_column: Optional[str] = None, date_range: Optional[Sequence[str]] = None, column_filter_dict: Optional[Dict[str, Sequence[str]]] = None, run_id_column: Optional[str] = 'run_id', run_id: Optional[str] = None, flatten_struct_cols: bool = False, partition_column: Optional[str] = None, partition_type: Optional[BigQueryTimePartitions] = None, partition_value: Optional[Union[Tuple[str, str], str]] = None) -> SparkDFRead BigQuery table given table path and column selection.
Parameters:
Name Type Description Default sparkSparkSessionSpark session.
required table_pathTablePathThe target BigQuery table name of form:
. . required columnsOptional[Sequence[str]]The column selection. Selects all columns if None passed.
Nonedate_columnOptional[str]The name of the column to be used to filter the date range on.
Nonedate_rangeOptional[Sequence[str]]Sequence with two values, a lower and upper value for dates to load in.
Nonecolumn_filter_dictOptional[Dict[str, Sequence[str]]]A dictionary containing column: [values] where the values correspond to terms in the column that are to be filtered by.
Nonerun_id_columnOptional[str]The name of the column to be used to filter to the specified run_id.
'run_id'run_idOptional[str]The unique identifier for a run within the table that the read data is filtered to.
Nonepartition_columnOptional[str]The name of the column that the table is partitioned by.
Nonepartition_typeOptional[BigQueryTimePartitions]The unit of time the table is partitioned by, must be one of: *
hour*day*month*yearNonepartition_valueOptional[Union[Tuple[str, str], str]]The value or pair of values for filtering the partition column to.
Noneflatten_struct_colsboolWhen true, any struct type columns in the loaded dataframe are replaced with individual columns for each of the fields in the structs.
FalseReturns:
Type Description DataFramerdsa_utils.gcp.io.outputsWrite outputs to GCP.
write_table(df: Union[PandasDF, SparkDF], table_name: TablePath, mode: Literal['append', 'error', 'ignore', 'overwrite'] = 'error', partition_col: Optional[str] = None, partition_type: Optional[BigQueryTimePartitions] = None, partition_expiry_days: Optional[float] = None, clustered_fields: Optional[Union[str, List[str]]] = None) -> NoneWrite dataframe out to a Google BigQuery table.
If the table already exists, the behaviour of this function depends on the save mode specified by the mode parameter (which defaults to throwing an exception). When mode is overwrite, the schema of the DataFrame does not need to be the same as that of the existing table (the column order doesn't need to be the same).
If you use the df.printSchema() method directly in a print/log statement the code is processed and printed regardless of logging level. Instead you need to capture the output and pass this to the logger. See explanation here: https://stackoverflow.com/a/59935109
To learn more about the partitioning of tables and how to use them in BigQuery: https://cloud.google.com/bigquery/docs/partitioned-tables
To learn more about the clustering of tables and how to use them in BigQuery: https://cloud.google.com/bigquery/docs/clustered-tables
To learn more about how spark dataframes are saved to BigQuery: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/blob/master/README.md
Parameters:
Name Type Description Default dfUnion[DataFrame, DataFrame]The dataframe to be saved.
required table_nameTablePathThe target BigQuery table name of form:
. . required modeLiteral['append', 'error', 'ignore', 'overwrite']Whether to overwrite or append to the BigQuery table. *
append: Append contents of this :class:DataFrameto table. *overwrite: Overwrite existing data. *error: Throw exception if data already exists. *ignore: Silently ignore this operation if data already exists.'error'partition_colOptional[str]A date or timestamp type column in the dataframe to use for the table partitioning.
Nonepartition_typeOptional[BigQueryTimePartitions]The unit of time to partition the table by, must be one of: *
hour*day*month*yearIf
partition_colis specified andpartition_type = Nonethen BigQuery will default to usingdaypartition type.If 'partition_type
is specified andpartition_col = Nonethen the table will be partitioned by the ingestion time pseudo column, and can be referenced in BigQuery via either_PARTITIONTIME as ptor_PARTITIONDATE' as pd`.See https://cloud.google.com/bigquery/docs/querying-partitioned-tables for more information on querying partitioned tables.
Nonepartition_expiry_daysOptional[float]If specified, this is the number of days (any decimal values are converted to that proportion of a day) that BigQuery keeps the data in each partition.
Noneclustered_fieldsOptional[Union[str, List[str]]]If specified, the columns (up to four) in the dataframe to cluster the data by when outputting. The order the columns are specified is important as will be the ordering of the clustering on the BigQuery table.
See: https://cloud.google.com/bigquery/docs/querying-clustered-tables for more information on querying clustered tables.
NoneReturns:
Type Description NoneHelpers
rdsa_utils.helpers.pysparkA selection of helper functions for building in pyspark.
aggregate_col(df: SparkDF, col: str, operation: str) -> float
Aggregate (sum, max, min, or mean) a numeric PySpark column.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The PySpark DataFrame containing the column. | required |
| col | str | The name of the numeric column to aggregate. | required |
| operation | str | The type of aggregation to perform. Must be one of 'sum', 'max', 'min', or 'mean'. | required |
Returns:
| Type | Description |
|---|---|
| float | The result of the specified aggregation on the column. |
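A minimal usage sketch (assuming an active SparkSession):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ["price"])

aggregate_col(df, col="price", operation="mean")  # 2.0
aggregate_col(df, col="price", operation="sum")   # 6.0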
apply_col_func(df: SparkDF, cols: List[str], func: Callable[[SparkDF, str], SparkDF]) -> SparkDFApply a function to a list of columns in a PySpark DataFrame.
Parameters:
Name Type Description Default dfDataFrameThe PySpark DataFrame.
required colsList[str]List of column names to apply the function to.
required funcCallable[[DataFrame, str], DataFrame]The function to apply, which should accept two arguments: (df, col).
required Returns:
Type Description DataFrameThe SparkDF after applying the function to each column.
assert_same_distinct_values(df1: SparkDF, df2: SparkDF, col_name: str) -> NoneAssert that two DataFrames have an identical set of distinct values.
This function extracts the unique values from the specified column in each DataFrame and asserts that the two resulting sets are identical.
Parameters:
Name Type Description Default df1DataFrameThe first DataFrame for comparison.
required df2DataFrameThe second DataFrame for comparison.
required col_namestrThe name of the column whose distinct values will be compared.
required Returns:
Type Description NoneThe function completes successfully if the sets are identical.
Raises:
Type Description ValueError- If
col_nameis not found in eitherdf1ordf2. - If the sets of distinct values in the specified column are not identical.
Examples:
>>> from pyspark.sql import SparkSession, types as T >>> spark = SparkSession.builder.appName("DistinctExample").getOrCreate()>>> # --- Create Sample DataFrames --- >>> schema = T.StructType([T.StructField("category", T.StringType())]) >>> df_a = spark.createDataFrame([("A",), ("B",), ("A",)], schema) >>> df_b = spark.createDataFrame([("B",), ("C",)], schema) >>> df_c = spark.createDataFrame([("B",), ("A",)], schema)>>> # --- 1. Success Case: Identical Sets --- >>> # Assertion passes silently because the distinct values {'A', 'B'} are the same. >>> assert_same_distinct_values(df_a, df_c, col_name="category")>>> # --- 2. Failure Case: Different Sets --- >>> # This will raise a ValueError with a descriptive message. >>> try: ... assert_same_distinct_values(df_a, df_b, col_name="category") ... except ValueError as e: ... print(e) Column 'category' has different distinct values across DataFrames. Values only in first DataFrame: {'A'} Values only in second DataFrame: {'C'}>>> # --- 3. Failure Case: Column Not Found --- >>> # This will raise a ValueError because the column does not exist. >>> try: ... assert_same_distinct_values(df_a, df_b, col_name="product_id") ... except ValueError as e: ... print(e) Column 'product_id' not found in the first DataFrame. Available: ['category']cache_time_df(df: SparkDF) -> NoneCache a PySpark DataFrame and print the time taken to cache and count it.
Parameters:
Name Type Description Default dfDataFrameThe PySpark DataFrame to cache.
required Returns:
Type Description Nonecalc_median_price(groups: Union[str, Sequence[str]], price_col: str = 'price') -> SparkColCalculate the median price per grouping level.
Parameters:
Name Type Description Default groupsUnion[str, Sequence[str]]The grouping levels for calculating the average price.
required price_colstrColumn name containing the product prices.
'price'Returns:
Type Description ColumnA single entry for each grouping level, and its median price.
check_year_range(df: SparkDF, start_year: int, end_year: int, year_col: str) -> NoneCheck if a DataFrame contains all years within a given range.
This function verifies that every year from
start_yeartoend_year(inclusive) exists in the specified year column of the DataFrame. If any years are missing, it raises a ValueError.Parameters:
Name Type Description Default dfDataFrameThe input DataFrame to be checked.
required start_yearintThe starting year of the range (inclusive).
required end_yearintThe ending year of the range (inclusive).
required year_colstrThe name of the column containing integer year values.
required Returns:
Type Description NoneThe function completes successfully if all years within the specified range are present in the DataFrame.
Raises:
Type Description ValueError- If
start_yearis greater thanend_year. - If the specified
year_colis not found in the DataFrame. - If one or more years within the specified range are missing from
the DataFrame's
year_col.
Examples:
>>> from pyspark.sql import SparkSession >>> spark = SparkSession.builder.appName("YearCheckExample").getOrCreate()>>> # Example DataFrame >>> data = [(2018, "A"), (2019, "B"), (2020, "C"), (2021, "D")] >>> df = spark.createDataFrame(data, ["year", "data"])>>> # This will pass successfully and print log messages >>> check_year_range(df, start_year=2019, end_year=2021, year_col="year")>>> # This will raise a ValueError because 2022 is missing >>> try: ... check_year_range(df, start_year=2018, end_year=2022) ... except ValueError as e: ... print(f"ERROR: {e}")convert_cols_to_struct_col(df: SparkDF, struct_col_name: str, struct_cols: Optional[Sequence[str]], no_struct_col_type: T.DataTypeSingleton = T.BooleanType(), no_struct_col_value: Any = None) -> SparkDFConvert specified selection of columns to a single struct column.
As BigQuery tables do not accept an empty struct column being appended to them, this function will create a placeholder column to put into the struct column if no column names to combine are passed.
Parameters:
Name Type Description Default dfDataFrameThe input dataframe that contains the columns for combining.
required struct_col_namestrThe name of the resulting struct column.
required struct_colsOptional[Sequence[str]]A sequence of columns present in df for combining.
required no_struct_col_typeDataTypeSingletonIf no struct_cols are present, this is the type that the dummy column to place in the struct will be, default = BooleanType.
BooleanType()no_struct_col_valueAnyIf no struct_cols are present, this is the value that will be used in the dummy column, default = None.
NoneReturns:
Type Description The input dataframe with the specified struct_cols dropped and replacedwith a single struct type column containing those columns.
Raises:
Type Description ValueErrorIf not all the specified struct_cols are present in df.
convert_struc_col_to_columns(df: SparkDF, convert_nested_structs: bool = False) -> SparkDFFlatten struct columns in pyspark dataframe to individual columns.
Parameters:
Name Type Description Default dfDataFrameDataframe that may or may not contain struct type columns.
required convert_nested_structsboolIf true, function will recursively call until no structs are left. Inversely, when false, only top level structs are flattened; if these contain subsequent structs they would remain.
FalseReturns:
Type Description The input dataframe but with any struct type columns dropped, and inits place the individual fields within the struct column as individual columns.
count_nulls(df: SparkDF, subset_cols: Optional[Union[List[str], str]] = None) -> pd.DataFrame
Count the number of null values in the specified columns of a SparkDF.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The PySpark DataFrame to analyze. | required |
| subset_cols | Optional[Union[List[str], str]] | List of column names or a single column name as a string to count null values for. If not provided, counts are calculated for all columns. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | A Pandas DataFrame with the count of null values per column. |
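A minimal usage sketch (assuming an active SparkSession):
df = spark.createDataFrame(
    [(1, None), (2, "b"), (None, None)],
    ["id", "value"],
)

# Counts for all columns...
count_nulls(df)

# ...or restricted to a subset.
count_nulls(df, subset_cols=["value"])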
create_colname_to_value_map(cols: Sequence[str]) -> SparkColCreate a column name to value MapType column.
create_spark_session(app_name: Optional[str] = None, size: Optional[Literal['small', 'medium', 'large', 'extra-large']] = None, extra_configs: Optional[Dict[str, str]] = None) -> SparkSessionCreate a PySpark Session based on the specified size.
This function creates a PySpark session with different configurations based on the size specified.
The size can be 'default', 'small', 'medium', 'large', or 'extra-large'. Extra Spark configurations can be passed as a dictionary. If no size is given, then a basic Spark session is spun up.
Parameters:
Name Type Description Default app_nameOptional[str]The spark session app name.
NonesizeOptional[Literal['small', 'medium', 'large', 'extra-large']]The size of the spark session to be created. It can be 'default', 'small', 'medium', 'large', or 'extra-large'.
Noneextra_configsOptional[Dict[str, str]]Mapping of additional spark session config settings and the desired value for it. Will override any default settings.
NoneReturns:
Type Description SparkSessionThe created PySpark session.
Raises:
Type Description ValueErrorIf the specified 'size' parameter is not one of the valid options: 'small', 'medium', 'large', or 'extra-large'.
ExceptionIf any other error occurs during the Spark session creation process.
Examples:
>>> spark = create_spark_session('medium', {'spark.ui.enabled': 'false'})Session Details:
- 'small': This is the smallest session that will realistically be used. It uses only 1g of memory, 3 executors, and only 1 core. The number of partitions is limited to 12, which can improve performance with smaller data. It's recommended for simple data exploration of small survey data or for training and demonstrations when several people need to run Spark sessions simultaneously.
- 'medium': A standard session used for analysing survey or synthetic datasets. Also used for some Production pipelines based on survey and/or smaller administrative data. It uses 6g of memory, 3 executors, and 3 cores. The number of partitions is limited to 18, which can improve performance with smaller data.
- 'large': Session designed for running Production pipelines on large administrative data, rather than just survey data. It uses 10g of memory, 5 executors, 1g of memory overhead, and 5 cores. It uses the default number of 200 partitions.
- 'extra-large': Used for the most complex pipelines, with huge administrative data sources and complex calculations. It uses 20g of memory, 12 executors, 2g of memory overhead, and 5 cores. It uses 240 partitions; not significantly higher than the default of 200, but it is best for these to be a multiple of cores and executors.
References
The session sizes and their details are taken directly from the following resource: "https://best-practice-and-impact.github.io/ons-spark/spark-overview/example-spark-sessions.html"
cumulative_array(df: SparkDF, array_col: str, output_colname: str) -> SparkDFConvert a PySpark array column to a cumulative array column.
Parameters:
Name Type Description Default dfDataFrameThe PySpark DataFrame containing the array column.
required array_colstrThe name of the array column to convert.
required output_colnamestrThe name of the new column to store the cumulative array.
required Returns:
Type Description DataFrameThe SparkDF with the cumulative array column added.
cut_lineage(df: SparkDF) -> SparkDFConvert the SparkDF to a Java RDD and back again.
This function is helpful in instances where the Catalyst optimiser is causing memory errors or problems, as it then only tries to optimise up to the conversion point.
Note: This uses internal members and may break between versions.
Parameters:
Name Type Description Default dfDataFrameSparkDF to convert.
required Returns:
Type Description DataFrameNew SparkDF created from Java RDD.
Raises:
Type Description ExceptionIf any error occurs during the lineage cutting process, particularly during conversion between SparkDF and Java RDD or accessing internal members.
Examples:
>>> df = rdd.toDF()
>>> new_df = cut_lineage(df)
>>> new_df.count()
3
drop_duplicates_reproducible(df: SparkDF, col: str, id_col: Optional[str] = None) -> SparkDF
Remove duplicates from a PySpark DataFrame in a repeatable manner.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The PySpark DataFrame. | required |
| col | str | The column to partition by for removing duplicates. | required |
| id_col | Optional[str] | The column to use for ordering within each partition. If None, a unique ID column is generated. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | The SparkDF with duplicates removed. |
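A minimal usage sketch (assuming an active SparkSession): duplicates in 'key' are removed, and supplying an explicit id_col makes the ordering within each partition deterministic across runs:
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("b", 3)],
    ["key", "id"],
)

# Keep one row per 'key', ordering within each partition by 'id'.
deduped = drop_duplicates_reproducible(df, col="key", id_col="id")
deduped.show()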
filter_out_values(df: SparkDF, column: str, values_to_exclude: List[Union[str, int, float]], keep_nulls: bool = True) -> SparkDFExclude rows whose column value appears in the exclusion list.
Parameters:
Name Type Description Default dfDataFrameInput DataFrame.
required columnstrName of the column to filter on.
required values_to_excludeList[Union[str, int, float]]List of values to remove.
required keep_nullsboolWhether to preserve NULL values in the column.
TrueRaises:
Type Description ValueErrorIf
values_to_excludeis empty. Ifcolumnis not found indf.Returns:
Type Description DataFrameFiltered DataFrame with specified values excluded.
Notes
isinperforms exact matching. For reliable filtering of floating-point data, prefer defining the column as DoubleType.- FloatType columns may suffer from binary precision issues, causing literal comparisons to fail unexpectedly.
- If you must filter on FloatType with approximate values, consider:
- Rounding the column to a fixed precision:
python df = df.withColumn("col", F.round(F.col("col"), 2)) filter_out_values(df, "col", [1.23, 4.56]) - Filtering by range to capture an approximate match:
python df.filter(~((F.col("col") >= 1.229) & (F.col("col") <= 1.231)))
Examples:
Keep nulls (default)
>>> data = [(1, "apple"), (2, None), (3, "banana")] >>> df = spark.createDataFrame(data, ["id", "fruit"]) >>> filter_out_values(df, "fruit", ["apple"]).show() +---+------+ | id| fruit| +---+------+ | 2| null| | 3|banana| +---+------+Exclude nulls
>>> filter_out_values(df, "fruit", ["apple"], keep_nulls=False).show() +---+------+ | id| fruit| +---+------+ | 3|banana| +---+------+find_spark_dataframes(locals_dict: Dict[str, Union[SparkDF, Dict]]) -> Dict[str, Union[SparkDF, Dict]]Extract SparkDF's objects from a given dictionary.
This function scans the dictionary and returns another containing only entries where the value is a SparkDF. It also handles dictionaries within the input, including them in the output if their first item is a SparkDF.
Designed to be used with locals() in Python, allowing extraction of all SparkDF variables in a function's local scope.
Parameters:
Name Type Description Default locals_dictDict[str, Union[DataFrame, Dict]]A dictionary usually returned by locals(), with variable names as keys and their corresponding objects as values.
required Returns:
Type Description DictA dictionary with entries from locals_dict where the value is a SparkDF or a dictionary with a SparkDF as its first item.
Examples:
>>> dfs = find_spark_dataframes(locals())
get_unique(df: SparkDF, col: str, remove_null: bool = False, verbose: bool = True) -> List[Optional[Union[str, int, float]]]
Return a list of unique values in a PySpark DataFrame column.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The PySpark DataFrame containing the column. | required |
| col | str | The name of the column to analyze. | required |
| remove_null | bool | Whether to remove null values from output. Default is False. | False |
| verbose | bool | Whether to log the number of unique values. Default is True. | True |
Returns:
| Type | Description |
|---|---|
| List | A list of unique values from the specified column. |
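A minimal usage sketch (assuming an active SparkSession; the ordering of the returned values is not guaranteed):
df = spark.createDataFrame(
    [("apple",), ("banana",), ("apple",), (None,)],
    ["fruit"],
)

get_unique(df, col="fruit")                    # e.g. ['apple', 'banana', None]
get_unique(df, col="fruit", remove_null=True)  # e.g. ['apple', 'banana']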
get_window_spec(partition_cols: Optional[Union[str, Sequence[str]]] = None, order_cols: Optional[Union[str, Sequence[str]]] = None) -> WindowSpecReturn ordered and partitioned WindowSpec, defaulting to whole df.
Particularly useful when you don't know if the variable being used for partition_cols will contain values or not in advance.
Parameters:
Name Type Description Default partition_colsOptional[Union[str, Sequence[str]]]If present the columns to partition a spark dataframe on.
Noneorder_colsOptional[Union[str, Sequence[str]]]If present the columns to order a spark dataframe on (where order in sequence is order that orderBy is applied).
NoneReturns:
Type Description WindowSpecThe WindowSpec object to be applied.
Usage
window_spec = get_window_spec(...)
F.sum(values).over(window_spec)
has_no_nulls(df: SparkDF, column_name: str) -> boolCheck whether a specified column in a SparkDF contains no null values.
Parameters:
Name Type Description Default dfDataFrameSparkDF to check.
required column_namestrName of the column to inspect for null values.
required Returns:
Type Description boolTrue if the column contains no nulls, False otherwise.
is_df_empty(df: SparkDF) -> boolCheck whether a spark dataframe contains any records.
join_multi_dfs(df_list: List[SparkDF], on: Union[str, List[str]], how: str) -> SparkDFJoin multiple Spark SparkDFs together.
Parameters:
Name Type Description Default df_listList[DataFrame]List of Spark SparkDFs to join.
required onUnion[str, List[str]]Column(s) on which to join the SparkDFs.
required howstrType of join to perform (e.g., 'inner', 'outer', 'left', 'right').
required Returns:
Type Description DataFrameA SparkDF that is the result of joining all SparkDFs in the list.
load_csv(spark: SparkSession, filepath: str, keep_columns: Optional[List[str]] = None, rename_columns: Optional[Dict[str, str]] = None, drop_columns: Optional[List[str]] = None, **kwargs) -> SparkDFLoad a CSV file into a PySpark DataFrame.
spark Active SparkSession. filepath The full path and filename of the CSV file to load. keep_columns A list of column names to keep in the DataFrame, dropping all others. Default value is None. rename_columns A dictionary to rename columns where keys are existing column names and values are new column names. Default value is None. drop_columns A list of column names to drop from the DataFrame. Default value is None. kwargs Additional keyword arguments to pass to the
spark.read.csvmethod.Returns:
Type Description DataFramePySpark DataFrame containing the data from the CSV file.
Notes
Transformation order: 1. Columns are kept according to
keep_columns. 2. Columns are dropped according todrop_columns. 3. Columns are renamed according torename_columns.Raises:
Type Description ExceptionIf there is an error loading the file.
ValueErrorIf a column specified in rename_columns, drop_columns, or keep_columns is not found in the DataFrame.
Examples:
Load a CSV file with multiline and rename columns:
>>> df = load_csv( spark, "/path/to/file.csv", multiLine=True, rename_columns={"old_name": "new_name"} )Load a CSV file with a specific encoding:
>>> df = load_csv(spark, "/path/to/file.csv", encoding="ISO-8859-1")Load a CSV file and keep only specific columns:
>>> df = load_csv(spark, "/path/to/file.csv", keep_columns=["col1", "col2"])Load a CSV file and drop specific columns:
>>> df = load_csv(spark, "/path/to/file.csv", drop_columns=["col1", "col2"])Load a CSV file with custom delimiter and multiline:
>>> df = load_csv(spark, "/path/to/file.csv", sep=";", multiLine=True)map_column_names(df: SparkDF, mapper: Mapping[str, str]) -> SparkDFMap column names to the given values in the mapper.
If the column name is not in the mapper the name doesn't change.
map_column_values(df: SparkDF, dict_: Dict[str, str], input_col: str, output_col: Union[str, None] = None) -> SparkDF
Map PySpark column values to new values using a dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The PySpark DataFrame to modify. | required |
| dict_ | Dict[str, str] | Dictionary for mapping values in input_col to new values. | required |
| input_col | str | The name of the column to replace values in. | required |
| output_col | Union[str, None] | The name of the new column with replaced values. Defaults to input_col if not provided. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | The SparkDF with the new column added. |
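A minimal usage sketch (assuming an active SparkSession):
df = spark.createDataFrame([("m",), ("f",)], ["sex"])

# Map coded values to labels, writing the result to a new column.
mapped = map_column_values(
    df,
    dict_={"m": "male", "f": "female"},
    input_col="sex",
    output_col="sex_label",
)
mapped.show()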
melt(df: SparkDF, id_vars: Union[str, Sequence[str]], value_vars: Union[str, Sequence[str]], var_name: str = 'variable', value_name: str = 'value') -> SparkDFMelt a spark dataframe in a pandas like fashion.
Parameters:
Name Type Description Default dfDataFrameThe pyspark dataframe to melt.
required id_varsUnion[str, Sequence[str]]The names of the columns to use as identifier variables.
required value_varsUnion[str, Sequence[str]]The names of the columns containing the data to unpivot.
required var_namestrThe name of the target column containing variable names (i.e. the original column names).
'variable'value_namestrThe name of the target column containing the unpivoted data.
'value'Returns:
Type Description DataFrameThe "melted" input data as a pyspark data frame.
Examples:
>>> df = spark.createDataFrame( ... [[1, 2, 3, 4], ... [5, 6, 7, 8], ... [9, 10, 11, 12]], ... ["col1", "col2", "col3", "col4"]) >>> melt(df=df, id_vars="col1", value_vars=["col2", "col3"]).show() +----+--------+-----+ |col1|variable|value| +----+--------+-----+ | 1| col2| 2| | 1| col3| 3| | 5| col2| 6| | 5| col3| 7| | 9| col2| 10| | 9| col3| 11| +----+--------+-----+>>> melt(df=df, id_vars=["col1", "col2"], value_vars=["col3", "col4"] ... ).show() +----+----+--------+-----+ |col1|col2|variable|value| +----+----+--------+-----+ | 1| 2| col3| 3| | 1| 2| col4| 4| | 5| 6| col3| 7| | 5| 6| col4| 8| | 9| 10| col3| 11| | 9| 10| col4| 12| +----+----+--------+-----+pyspark_random_uniform(df: SparkDF, output_colname: str, lower_bound: float = 0, upper_bound: float = 1, seed: Optional[int] = None) -> SparkDFMimic numpy.random.uniform for PySpark.
Parameters:
Name Type Description Default dfDataFrameThe PySpark DataFrame to which the column will be added.
required output_colnamestrThe name of the new column to be created.
required lower_boundfloatThe lower bound of the uniform distribution. Defaults to 0.
0upper_boundfloatThe upper bound of the uniform distribution. Defaults to 1.
1seedOptional[int]Seed for random number generation. Defaults to None for non-deterministic results.
NoneReturns:
Type Description DataFrameThe SparkDF with the new column added.
rank_numeric(numeric: Union[str, Sequence[str]], group: Union[str, Sequence[str]], ascending: bool = False) -> SparkColRank a numeric and assign a unique value to each row.
The F.row_number() method has been selected as the ranking method because it gives a unique number to each row. Other methods such as F.rank() and F.dense_rank() do not assign unique values per row.
Parameters:
Name Type Description Default numericUnion[str, Sequence[str]]The column name or list of column names containing values which will be ranked.
required groupUnion[str, Sequence[str]]The grouping levels to rank the numeric column or columns over.
required ascendingboolDictates whether high or low values are ranked as the top value.
FalseReturns:
Type Description ColumnContains a rank for the row in its grouping level.
select_first_obs_appearing_in_group(df: SparkDF, group: Sequence[str], date_col: str, ascending: bool) -> SparkDFRank and select observation in group based on earliest or latest date.
Given that there can be multiple observations per group, select observation that appears first or last (depending on whether ascending is set to True or False, respectively).
Parameters:
Name Type Description Default dfDataFrameThe input dataframe that contains the group and date_col.
required groupSequence[str]The grouping levels required to find the observation that appears first or last (depending on whether ascending is set to True or False, respectively)
required date_colstrColumn name containing the dates of each observation.
required ascendingboolDictates whether first or last observation within a grouping is selected (depending on whether ascending is set to True or False, respectively).
required Returns:
Type Description DataFrameThe input dataframe that contains each observation per group that appeared first or last (depending on whether ascending is set to True or False, respectively) according to date_col.
set_df_columns_nullable(df: SparkDF, column_list: List[str], nullable: Optional[bool] = True) -> SparkDFChange specified columns nullable value.
Sometimes Spark creates columns with the nullable attribute set to False, which can cause issues if the dataframe is saved to a table, as the schema for that column will then not allow missing values.
Changing this attribute for a column appears to be very difficult (and also potentially costly; see the comments on the Stack Overflow answer below), so use only if needed.
The solution implemented is taken from Stack Overflow post: https://stackoverflow.com/a/51821437
Parameters:
Name Type Description Default dfDataFrameThe dataframe with columns to have nullable attribute changed.
required column_listList[str]List of columns to change nullable attribute.
required nullableOptional[bool]The value to set the nullable attribute to for the specified columns.
TrueReturns:
Type Description DataFrameThe input dataframe but with nullable attribute changed for specified columns.
set_nulls(df: SparkDF, column: str, values: Union[str, List[str]]) -> SparkDFReplace specified values with nulls in given column of PySpark df.
Parameters:
Name Type Description Default dfDataFrameThe PySpark DataFrame to modify.
required columnstrThe name of the column in which to replace values.
required valuesUnion[str, List[str]]The value(s) to replace with nulls.
required Returns:
Type Description DataFrameThe SparkDF with specified values replaced by nulls.
smart_coalesce(df: SparkDF, target_file_size_mb: int = 512) -> SparkDFCoalesce a Spark DataFrame to an appropriate number of partitions.
Coalesces a Spark DataFrame to an appropriate number of partitions based on its estimated size using Spark's Catalyst optimiser and a user-defined target file size.
This function helps to reduce the number of output files written when saving a DataFrame to storage systems such as Hive or Amazon S3 by adjusting the number of partitions using `.coalesce()`. It is especially useful for avoiding the "small files problem", which can negatively affect performance, metadata management, and query planning.
It leverages Spark Catalyst's query plan statistics to get a logical estimate of the DataFrame's size in bytes without triggering a full job or action. Based on the provided `target_file_size_mb`, it calculates how many output files are needed and reduces the number of partitions accordingly.
Parameters:
Name Type Description Default dfDataFrameThe input Spark DataFrame that will be written to storage.
required target_file_size_mbintThe desired maximum size of each output file in megabytes. This controls the number of output files by estimating how many are needed to approximately match the total data volume. Default is 512 MB.
512Returns:
Type Description DataFrame A Spark DataFrame with a reduced number of partitions, ready to be written to disk using `.write()`. The number of partitions is chosen to produce output files close to the target size.
Notes
- This function uses Spark Catalyst's logical plan statistics. These may be outdated or unavailable if statistics haven't been collected (e.g., ANALYZE TABLE not run).
- If the estimated size is zero or unavailable, it defaults to a single partition.
- This function uses `.coalesce()`, which avoids a shuffle but can cause skew if the data is unevenly distributed. For very large datasets, consider using `repartition()` instead.
- This function is best used as a final optimisation before writing output files, especially to S3, Hive, or HDFS.
Why Small Files Are a Problem
Writing many small files (e.g., thousands of files per partition) negatively impacts:
- Hive Metastore:
  - Hive must track every individual file in the metastore.
  - Too many files lead to slow table listings, metadata queries, and planning time.
- Spark Performance:
  - During reads, Spark spawns a task per file.
  - Thousands of tiny files = thousands of tasks = job scheduling overhead + slow query startup.
- S3 Performance:
  - S3 is object storage, not a filesystem. Each file written = one PUT request.
  - Too many files increase write latency and cost.
  - During reads, many GET requests slow down performance.
Examples:
Reduce number of output files for a moderate-sized DataFrame:
>>> coalesced_df = smart_coalesce(df, target_file_size_mb=200)
>>> coalesced_df.write.mode("overwrite").saveAsTable("my_optimised_table")
sum_columns(df: SparkDF, cols_to_sum: List[str], output_col: str) -> SparkDF
Calculate row-wise sum of specified PySpark columns.
Parameters:
Name Type Description Default dfDataFrameThe PySpark DataFrame to modify.
required cols_to_sumList[str]List of column names to sum together.
required output_colstrThe name of the new column to create with the sum.
required Returns:
Type Description DataFrameThe SparkDF with the new column added.
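For example, to add a row-wise total column (the column names are illustrative):
>>> df = sum_columns(df, cols_to_sum=['q1', 'q2', 'q3'], output_col='total')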
to_list(df: SparkDF) -> List[Union[Any, List[Any]]]
Convert Spark DF to a list.
Returns:
Type Description list or list of listsIf the input DataFrame has a single column then a list of column values will be returned. If the DataFrame has multiple columns then a list of row data as lists will be returned.
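A sketch of the two return shapes described above (the DataFrame and the values shown are illustrative):
>>> to_list(df.select('id'))
[1, 2, 3]
>>> to_list(df.select('id', 'name'))
[[1, 'a'], [2, 'b'], [3, 'c']]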
to_spark_col(_func=None, *, exclude: Sequence[str] = None) -> Callable
Convert str args to Spark Column if not already.
Usage
Use as a decorator on a function.
To convert all string arguments to spark column
@to_spark_col
def my_func(arg1, arg2):
    ...
To exclude a string arguments from being converted to a spark column
@to_spark_col(exclude=['arg2'])
def my_func(arg1, arg2):
    ...
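A minimal sketch of the effect (the function and column names are invented for illustration; it assumes the decorated function operates on Spark Columns):
>>> from pyspark.sql import functions as F
>>> @to_spark_col(exclude=['suffix'])
... def add_suffix(col_name, suffix):
...     # 'col_name' arrives as a Spark Column; 'suffix' stays a plain string.
...     return F.concat(col_name, F.lit(suffix))
>>> df = df.withColumn('labelled', add_suffix('name', '_2024'))
Here the string 'name' is converted to a Spark Column before the function body runs, while 'suffix' is left untouched because it is listed in exclude.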
transform(self, f, *args, **kwargs)
Chain Pyspark function.
truncate_external_hive_table(spark: SparkSession, table_identifier: str) -> None
Truncate an External Hive table stored on S3 or HDFS.
Parameters:
Name Type Description Default sparkSparkSessionActive SparkSession.
required table_identifier str The name of the Hive table to truncate. This can either be in the format '<database_name>.<table_name>' or simply '<table_name>' if the current Spark session has a database set.
required Returns:
Type Description NoneThis function does not return any value. It performs an action of truncating the table.
Raises:
Type Description ValueErrorIf the table name is incorrectly formatted, the database is not provided when required, or if the table does not exist.
AnalysisExceptionIf there is an issue with partition operations or SQL queries.
ExceptionIf there is a general failure during the truncation process.
Examples:
Truncate a Hive table named 'my_database.my_table':
>>> truncate_external_hive_table(spark, 'my_database.my_table')
Or, if the current Spark session already has a database set:
>>> spark.catalog.setCurrentDatabase('my_database')
>>> truncate_external_hive_table(spark, 'my_table')
union_mismatched_dfs(df1: SparkDF, df2: SparkDF) -> SparkDF
Perform a union between PySpark DataFrames with mismatched column names.
Parameters:
Name Type Description Default df1DataFrameThe first PySpark DataFrame.
required df2DataFrameThe second PySpark DataFrame.
required Returns:
Type Description DataFrameA SparkDF resulting from the union of df1 and df2, with missing columns filled with null values.
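A small illustrative sketch (the data and column names are invented, and an active SparkSession named spark is assumed):
>>> df1 = spark.createDataFrame([(1, 'a')], ['id', 'name'])
>>> df2 = spark.createDataFrame([(2, 3.5)], ['id', 'score'])
>>> combined = union_mismatched_dfs(df1, df2)
Per the Returns description, rows from df1 gain a null 'score' column and rows from df2 gain a null 'name' column in the combined result.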
union_multi_dfs(df_list: List[SparkDF]) -> SparkDF
Perform a union on all SparkDFs in the provided list.
Note
All SparkDFs must have the same columns.
Parameters:
Name Type Description Default df_listList[DataFrame]List of PySpark DataFrames to union.
required Returns:
Type Description DataFrameA SparkDF that is the result of the union of all SparkDFs in the list.
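For example, to stack several identically structured DataFrames (the names are illustrative):
>>> combined = union_multi_dfs([df_jan, df_feb, df_mar])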
unpack_list_col(df: SparkDF, list_col: str, unpacked_col: str) -> SparkDF
Unpack a spark column containing a list into multiple rows.
Parameters:
Name Type Description Default dfDataFrameContains the list column to unpack.
required list_colstrThe name of the column which contains lists.
required unpacked_colstrThe name of the column containing the unpacked list items.
required Returns:
Type Description DataFrameContains a new row for each unpacked list item.
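An illustrative sketch, assuming an active SparkSession named spark and a DataFrame with an array column called 'tags' (names invented for the example):
>>> df = spark.createDataFrame([(1, ['a', 'b'])], ['id', 'tags'])
>>> unpacked = unpack_list_col(df, list_col='tags', unpacked_col='tag')
Each element of 'tags' then becomes its own row in the 'tag' column, similar in spirit to F.explode().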
rdsa_utils.helpers.python
Miscellaneous helper functions for Python.
always_iterable_local(obj: Any) -> Callable
Supplement more-itertools' `always_iterable` to also exclude dicts.
By default it would convert a dictionary to an iterable of just its keys, dropping all the values. This change makes it so dictionaries are not altered (similar to how strings aren't broken down).
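A sketch of the expected behaviour based on the description above (exact return types depend on the implementation):
>>> list(always_iterable_local([1, 2]))
[1, 2]
>>> list(always_iterable_local('abc'))
['abc']
>>> list(always_iterable_local({'a': 1}))
[{'a': 1}]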
calc_product_of_dict_values(**kwargs: Mapping[str, Union[str, float, Iterable]]) -> Mapping[str, any]Create cartesian product of values for each kwarg.
In order to create product of values, the values are converted to a list so that product of values can be derived.
Yields:
Type Description Next result of cartesian product of kwargs values.Example
my_dict = { 'key1': 1, 'key2': [2, 3, 4] }
list(calc_product_of_dict_values(**my_dict))
[{'key1': 1, 'key2': 2}, {'key1': 1, 'key2': 3}, {'key1': 1, 'key2': 4}]
Notes
Modified from: https://stackoverflow.com/a/5228294
check_file(filepath: str) -> boolCheck if a file exists on the local file system and meets specific criteria.
This function checks whether the given path corresponds to a valid file on the local or network file system. It ensures the file exists, is not a directory, and its size is greater than zero bytes.
Parameters:
Name Type Description Default filepathstrThe path to a local/network file.
required Returns:
Type Description boolTrue if the file exists, is not a directory, and size > 0, otherwise False.
Example
check_file("folder/file.txt") True check_file("folder/file_0_bytes.txt") False
convert_date_strings_to_datetimes(start_date: str, end_date: str) -> Tuple[pd.Timestamp, pd.Timestamp]
Convert start and end dates from strings to timestamps.
Parameters:
Name Type Description Default start_datestrDatetime like object which is used to define the start date for filter. Acceptable string formats include (but not limited to): MMMM YYYY, YYYY-MM, YYYY-MM-DD, DD MMM YYYY etc. If only month and year specified the start_date is set as first day of month.
required end_date str Datetime like object which is used to define the end date for filter. Acceptable string formats include (but not limited to): MMMM YYYY, YYYY-MM, YYYY-MM-DD, DD MMM YYYY etc. If only month and year specified the end_date is set as final day of month.
required Returns:
Type Description tuple[Timestamp, Timestamp]Tuple where the first value is the start date and the second the end date.
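An illustrative sketch of the month-only behaviour described above (the dates are invented for the example):
>>> start, end = convert_date_strings_to_datetimes('2021-01', '2021-02')
Here start is expected to be the first day of January 2021 and end the final day of February 2021, per the parameter descriptions.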
convert_types_iterable(lst: Iterable, dtype: type = float) -> ListConvert the data type of elements in an iterable.
Parameters:
Name Type Description Default lstIterableThe iterable whose elements are to be converted.
required dtypetypeThe target data type to which elements in the iterable should be converted. Defaults to
float.floatReturns:
Type Description listA new list with elements converted to the specified data type.
Examples:
>>> convert_types_iterable([1, 2, 3])
[1.0, 2.0, 3.0]
>>> convert_types_iterable((10, 20, 30), dtype=str)
['10', '20', '30']
>>> convert_types_iterable({'a', 'b', 'c'}, dtype=ord)
[97, 98, 99]
>>> convert_types_iterable(['10', '20', '30'], dtype=int)
[10, 20, 30]
create_folder(dirpath: str) -> None
Create a directory on a local network drive.
Parameters:
Name Type Description Default dirpathstrThe path to the directory to create.
required Returns:
Type Description NoneThe directory will be created if it does not already exist, including parent directories.
Example
create_folder("example_folder/subfolder")
The directory "example_folder/subfolder" will be created if it does not exist.
directory_exists(dirpath: str) -> boolTest if given path is a directory on the local file system.
Parameters:
Name Type Description Default dirpathstrThe directory path to check exists.
required Returns:
Type Description boolTrue if the dirpath is a directory, False otherwise.
Example
directory_exists("folder") True directory_exists("non_existing_folder") dirpath='.../non_existing_folder' cannot be found. False
dump_environment_requirements(output_file: str, tool: str = 'pip', args: List[str] = ['list', '--format=freeze']) -> NoneDump the current Python environment dependencies to a text file.
Parameters:
Name Type Description Default output_filestrPath to the output text file where the list of dependencies will be saved. If the directory does not exist, it will be created.
required toolstrThe command-line tool to use for exporting dependencies (e.g. 'pip', 'poetry', or 'uv'). Default is 'pip'.
'pip'argsList[str]The arguments to pass to the selected tool. For pip, the default is ['list', '--format=freeze']. For poetry, a common option is ['export', '--without-hashes']. For uv, you might use ['pip', 'freeze'].
['list', '--format=freeze']Returns:
Type Description NoneThis function writes to the specified file and does not return anything.
Examples:
>>> dump_environment_requirements("requirements.txt")
>>> dump_environment_requirements(
...     "requirements.txt",
...     tool="pip",
...     args=["freeze"]
... )
>>> dump_environment_requirements(
...     "requirements.txt",
...     tool="poetry",
...     args=["export", "--without-hashes"]
... )
>>> dump_environment_requirements(
...     "requirements.txt",
...     tool="uv",
...     args=["pip", "freeze"]
... )
extend_lists(sections: List[List[str]], elements_to_add: List[str]) -> None
Check list elements are unique then append to existing list.
Note the `.extend` method in Python modifies lists in place. There is no need to assign the result of this function to a variable; the sections are updated automatically.
The function can be used with the `load_config` function to extend a value list in a config yaml file. For example, with a `config.yaml` file as per below:
input_columns:
  - col_a
  - col_b
output_columns:
  - col_b
To add column `col_c` the function can be utilised as follows:
config = load_config("config.yaml")
sections = [config['input_columns'], config['output_columns']]
elements_to_add = ['col_c']
extend_lists(sections, elements_to_add)
The output will be as follows:
input_columns:
  - col_a
  - col_b
  - col_c
output_columns:
  - col_b
  - col_c
Parameters:
Name Type Description Default sectionsList[List[str]]The section to be updated with the extra elements.
required elements_to_addList[str]The new elements to add to the specified sections.
required Returns:
Type Description None Note the `.extend` method in Python modifies the sections in place. There is no need to assign a variable to this function, the sections will update automatically.
file_exists(filepath: str) -> bool
Test if file exists on the local file system.
Parameters:
Name Type Description Default filepathstrFilepath of file check exists.
required Returns:
Type Description boolTrue if the file exists, else False.
Example
file_exists("folder/file.txt") True file_exists("folder/non_existing_file.txt") filepath='.../folder/non_existing_file.txt' cannot be found. False
file_size(filepath: str) -> intReturn the size of the file from the network drive in bytes.
Parameters:
Name Type Description Default filepathstrThe filepath of file to check for size.
required Returns:
Type Description intAn integer value indicating the size of the file in bytes
Raises:
Type Description FileNotFoundErrorIf the file does not exist.
Example
file_size("folder/file.txt") 90 file_size("folder/non_existing_file.txt") FileNotFoundError: filepath='.../folder/non_existing_file.txt' cannot be found.
flatten_iterable(iterable: Iterable, types_to_flatten: Union[type, Tuple] = (list, tuple)) -> ListFlatten an iterable.
Parameters:
Name Type Description Default iterableIterableAn iterable that may contain elements of various types.
required types_to_flattenUnion[type, Tuple]Data type(s) that should be flattened. Defaults to (list, tuple).
(list, tuple)Returns:
Type Description listA flattened list with all elements from the input iterable, with specified types unpacked.
Examples:
>>> flatten_iterable([1, [2, 3], (4, 5), 'abc']) [1, 2, 3, 4, 5, 'abc'] >>> flatten_iterable([1, [2, 3], (4, 5), 'abc'], types_to_flatten=list) [1, 2, 3, (4, 5), 'abc'] >>> flatten_iterable(['a', 'bc', ['d', 'e']], types_to_flatten=str) ['a', 'b', 'c', 'd', 'e'] >>> flatten_iterable((1, [2, 3], (4, 5), 'abc'), types_to_flatten=(list, tuple)) (1, 2, 3, 4, 5, 'abc')interleave_iterables(iterable1: Iterable, iterable2: Iterable) -> ListInterleave two iterables element by element.
Parameters:
Name Type Description Default iterable1IterableThe first iterable to interleave.
required iterable2IterableThe second iterable to interleave.
required Returns:
Type Description listA new list with elements from
iterable1anditerable2interleaved.Raises:
Type Description TypeErrorIf either of the inputs is not an iterable of types: list, tuple, string, or range.
ValueErrorIf the lengths of the two iterables do not match.
Examples:
>>> interleave_iterables([1, 2, 3], [4, 5, 6]) [1, 4, 2, 5, 3, 6]>>> interleave_iterables((1, 2, 3), ('a', 'b', 'c')) [1, 'a', 2, 'b', 3, 'c']>>> interleave_iterables('ABC', '123') ['A', '1', 'B', '2', 'C', '3']>>> interleave_iterables(range(3), range(10, 13)) [0, 10, 1, 11, 2, 12]list_convert(obj: Any) -> List[Any]Convert object to list using more-itertools'
always_iterable.
md5_sum(filepath: str) -> str
Get md5sum of a specific file on the local file system.
Parameters:
Name Type Description Default filepathstrFilepath of file to create md5 hash from.
required Returns:
Type Description strThe md5sum of the file.
Raises:
Type Description FileNotFoundErrorIf the file does not exist.
Example
md5_sum("folder/file.txt") "d41d8cd98f00b204e9800998ecf8427e" md5_sum("folder/non_existing_file.txt") FileNotFoundError: filepath='../folder/non_existing_file.txt' cannot be found.
merge_multi_dfs(df_list: list, on: Union[str, list], how: str, fillna_val: Union[None, object] = None) -> pd.DataFramePerform consecutive merges on a list of pandas DataFrames.
Parameters:
Name Type Description Default df_listlistA list of DataFrames to be merged.
required onUnion[str, list]Column name(s) to merge on.
required howstrType of merge to be performed. Must be one of 'left', 'right', 'outer', 'inner'.
required fillna_valUnion[None, object]Value to replace missing values with. Default is None.
NoneReturns:
Type Description DataFrameThe resulting DataFrame after merging and optional filling of missing values.
Raises:
Type Description TypeErrorIf
df_listis not a list of pandas DataFrames, oronis not a string or list of strings, orhowis not a string.ValueErrorIf the
howargument is not one of 'left', 'right', 'outer', or 'inner'.Examples:
>>> import pandas as pd >>> df1 = pd.DataFrame({'key': ['A', 'B', 'C'], 'value1': [1, 2, 3]}) >>> df2 = pd.DataFrame({'key': ['A', 'B'], 'value2': [4, 5]}) >>> df3 = pd.DataFrame({'key': ['A'], 'value3': [6]}) >>> merge_multi_dfs([df1, df2, df3], on='key', how='inner') key value1 value2 value3 0 A 1 4 6>>> df1 = pd.DataFrame({'key': ['A', 'B', 'C'], 'value1': [1, 2, 3]}) >>> df2 = pd.DataFrame({'key': ['A', 'B'], 'value2': [4, 5]}) >>> merge_multi_dfs([df1, df2], on='key', how='outer', fillna_val=0) key value1 value2 0 A 1 4 1 B 2 5 2 C 3 0overwrite_dictionary(base_dict: Mapping[str, Any], override_dict: Mapping[str, Any]) -> Dict[str, Any]Overwrite dictionary values with user defined values.
The following restrictions are in place:
- If base_dict and override_dict have the same key and the value is not a dictionary, then override_dict has priority.
- If base_dict contains a dictionary and override_dict contains a value (e.g. string or list) with the same key, priority is given to base_dict and the override_dict value is ignored.
- If a key is in override_dict but not in base_dict then an Exception is raised and code stops.
- Any other restrictions will require code changes.
Parameters:
Name Type Description Default base_dictMapping[str, Any]Dictionary containing existing key value pairs.
required override_dict Mapping[str, Any] Dictionary containing new keys/values to insert into base_dict.
required Returns:
Type Description Dict[str, Any]The base_dict with any keys matching the override_dict being replaced. Any keys not present in base_dict are appended.
Example
dic1 = {"var1": "value1", "var2": {"var3": 1.1, "var4": 4.4}, "var5": [1, 2, 3]} dic2 = {"var2": {"var3": 9.9}}
overwrite_dictionary(dic1, dic2) {'var1': 'value1', 'var2': {'var3': 9.9, 'var4': 4.4}, 'var5': [1, 2, 3]}
dic3 = {"var2": {"var3": 9.9}, "var6": -1} overwrite_dictionary(dic1, dic3) ERROR main: ('var6', -1) not in base_dict
Notes
Modified from: https://stackoverflow.com/a/58742155
Warning
Due to recursive nature of function, the function will overwrite the base_dict object that is passed into the original function.
Raises:
Type Description ValueErrorIf a key is present in override_dict but not base_dict.
pairwise_iterable(iterable: Iterable) -> zipReturn pairs of adjacent values from the input iterable.
Parameters:
Name Type Description Default iterableIterableAn iterable object (e.g., list, tuple, string) from which pairs of adjacent values will be generated.
required Returns:
Type Description zipAn iterator of tuples, each containing a pair of adjacent values from the input iterable.
Raises:
Type Description TypeErrorIf the input is not an iterable.
Examples:
>>> list(pairwise_iterable([1, 2, 3, 4])) [(1, 2), (2, 3), (3, 4)]>>> list(pairwise_iterable('abcde')) [('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e')]>>> list(pairwise_iterable((10, 20, 30))) [(10, 20), (20, 30)]parse_pyproject_metadata(pyproject_path: Path) -> Dict[str, Optional[str]]Parse project metadata from a
`pyproject.toml` file.
This function reads the TOML file at pyproject_path and extracts a subset of fields from the [project] table: the project name, required Python version, and package version.
Parameters:
Name Type Description Default pyproject_pathPathPath to the
pyproject.tomlfile.required Returns:
Type Description Dict[str, Optional[str]]A dictionary with the following keys: - name : str or None The project name. - requires_python : str or None The Python version specifier (from requires-python). - package_version : str or None The package version.
Raises:
Type Description FileNotFoundErrorIf the file does not exist.
TOMLDecodeErrorIf the file content is not valid TOML.
Examples:
>>> from pathlib import Path >>> meta = parse_pyproject_metadata(Path("pyproject.toml")) >>> meta["name"] 'my-package'read_header(filepath: str) -> strReturn the first line of a file on the local file system.
Reads the first line and removes the trailing newline/carriage-return characters.
Parameters:
Name Type Description Default filepathstrThe path to a local/network file.
required Returns:
Type Description strThe first line of the file as a string.
Raises:
Type Description FileNotFoundErrorIf the file does not exist.
Example
read_header("folder/file.txt") "This is the first line of the file." read_header("folder/non_existing_file.txt") FileNotFoundError: filepath='.../folder/non_existing_file.txt' cannot be found.
setdiff(a: Iterable, b: Iterable) -> List[Any]
Return a list of elements that are present in `a` but not in `b`.
Parameters:
Name Type Description Default aIterableThe first iterable from which elements are to be selected.
required bIterableThe second iterable containing elements to be excluded.
required Returns:
Type Description listA list of elements that are in
`a` but not in `b`.
Examples:
>>> setdiff([1, 2, 3, 4], [3, 4, 5, 6]) [1, 2] >>> setdiff('abcdef', 'bdf') ['a', 'c', 'e'] >>> setdiff({1, 2, 3}, {2, 3, 4}) [1] >>> setdiff(range(5), range(2, 7)) [0, 1]sha256_sum(filepath: str) -> strGet SHA256 hash of a specific file on the local file system.
Parameters:
Name Type Description Default filepathstrFilepath of file to create SHA256 hash from.
required Returns:
Type Description strThe SHA256 hash of the file.
Raises:
Type Description FileNotFoundErrorIf the file does not exist.
Example
sha256_sum("folder/file.txt") "9c56cc51b374c3b6e7b8e1e8b4e1e8b4e1e8b4e1e8b4e1e8b4e1e8b4e1e8b4e1e8" sha256_sum("folder/non_existing_file.txt") FileNotFoundError: filepath='../folder/non_existing_file.txt' cannot be found.
time_it(*timer_args, **timer_kwargs) -> CallableMeasure the execution time of a function, with options to configure Timer.
Parameters:
Name Type Description Default timer_argsPositional arguments to pass to the Timer object.
()timer_kwargsKeyword arguments to pass to the Timer object.
{}Returns:
Type Description CallableA wrapped function that includes timing measurement.
Example
@time_it()
def example_function():
    ...  # Function implementation
tuple_convert(obj: Any) -> Tuple[Any]Convert object to tuple using more-itertools'
always_iterable.
validate_env_vars(required_vars: List[str]) -> None
Validate that required environment variables are present and non-empty.
This function checks whether each name in
`required_vars` exists in the current process environment (os.environ) and has a non-empty value. Variable names are stripped of surrounding whitespace and de-duplicated before validation. If any variables are missing (unset or empty), the function logs an error and exits by raising `SystemExit`.
Parameters:
Name Type Description Default required_varsList[str]Environment variable names to validate.
required Returns:
Type Description NoneThis function is intended for its side effects (validation and logging).
Raises:
Type Description TypeErrorIf
required_varsis not a list of non-empty strings.SystemExitIf one or more required environment variables are missing or empty.
Examples:
Success case
>>> import os
>>> os.environ["DB_HOST"] = "localhost"
>>> os.environ["DB_PORT"] = "5432"
>>> validate_env_vars(["DB_HOST", "DB_PORT"])  # no exception
Missing variable
>>> import os
>>> os.environ["DB_HOST"] = "localhost"
>>> validate_env_vars(["DB_HOST", "DB_PORT"])
Traceback (most recent call last):
...
SystemExit: [ERROR] Missing environment variables: DB_PORT
Empty value counts as missing
>>> import os
>>> os.environ["API_KEY"] = ""  # empty string -> missing
>>> validate_env_vars(["API_KEY"])
Traceback (most recent call last):
...
SystemExit: [ERROR] Missing environment variables: API_KEY
write_string_to_file(content: bytes, filepath: str) -> None
Write a string into the specified file path on the local file system.
Parameters:
Name Type Description Default contentbytesThe content to write into the file.
required filepathstrThe path to the file where the content will be written. If the file already exists, it will be overwritten.
required Returns:
Type Description NoneExample
write_string_to_file(b"Hello, World!", "example.txt")
The content "Hello, World!" will be written to "example.txt"
IO
rdsa_utils.io.config
Module for code relating to loading config files.
LoadConfig(config_path: Union[CloudPath, Path], config_overrides: Optional[Config] = None, config_type: Optional[Literal['json', 'toml', 'yaml']] = None, config_validators: Optional[Dict[str, BaseModel]] = None)
Class for loading and storing a configuration file.
Attributes:
| Name | Type | Description |
|---|---|---|
| config | | The loaded config stored as a dictionary. |
| config_dir | | The logical parent directory of the loaded config_path. |
| config_file | | The file name of the loaded config_path. |
| config_original | | The configuration dictionary as initially loaded, prior to applying any overrides or validation. |
| config_overrides | | The configuration override dictionary, if provided. |
| config_path | | The path of the loaded config file. |
| config_type | | The file type of the loaded config file. |
| config_validators | | The validators used to validate the loaded config, if provided. |
| **attrs | | Every top level key in the loaded config is also set as an attribute to allow simpler access to each config section. |
Init method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_path | Union[CloudPath, Path] | The path of the config file to be loaded. | required |
| config_overrides | Optional[Config] | A dictionary containing a subset of the keys and values of the config file that is initially loaded, by default None. If values are provided that are not in the initial config then a ConfigError is raised. | None |
| config_type | Optional[Literal['json', 'toml', 'yaml']] | The file type of the config file being loaded, by default None. If not specified then this is inferred from the config_path. | None |
| config_validators | Optional[Dict[str, BaseModel]] | A dictionary made up of key, value pairs where the keys refer to the top level sections of the loaded config, and the values are a pydantic validation class for the section, by default None. If only some of the keys are specified with validators, warnings are raised to alert that they have not been validated. | None |
rdsa_utils.io.input
Module containing generic input functionality code.
parse_json(data: str) -> ConfigParse JSON formatted string into a dictionary.
Parameters:
Name Type Description Default datastrString containing standard JSON-formatted data.
required Returns:
Type Description ConfigA dictionary containing the parsed data.
Raises:
Type Description JSONDecodeErrorIf the string format of config_overrides cannot be decoded by json.loads (i.e. converted to a dictionary).
parse_toml(data: str) -> ConfigParse TOML formatted string into a dictionary.
Parameters:
Name Type Description Default datastrString containing standard TOML-formatted data.
required Returns:
Type Description ConfigA dictionary containing the parsed data.
parse_yaml(data: str) -> ConfigParse YAML formatted string into a dictionary.
Parameters:
Name Type Description Default datastrString containing standard YAML-formatted data.
required Returns:
Type Description ConfigA dictionary containing the parsed data.
read_file(file: Union[CloudPath, Path]) -> strLoad contents of specified file.
Parameters:
Name Type Description Default fileUnion[CloudPath, Path]The absolute file path of the file to be read.
required Returns:
Type Description strThe contents of the provided file.
Raises:
Type Description FileNotFoundErrorIf the provided file does not exist.
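A short sketch combining read_file with one of the parsers above (the file path is illustrative):
>>> from pathlib import Path
>>> text = read_file(Path("configs/pipeline.yaml"))
>>> config = parse_yaml(text)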
rdsa_utils.io.output
Module containing generic output functionality code.
zip_folder(source_dir: str, output_filename: str, overwrite: bool = False) -> bool
Zip the contents of the specified directory.
Parameters:
Name Type Description Default source_dirstrThe directory whose contents are to be zipped.
required output_filenamestrThe output zip file name. It must end with '.zip'.
required overwriteboolIf True, overwrite the existing zip file if it exists. Default is False.
FalseReturns:
Type Description boolTrue if the directory was zipped successfully, False otherwise.
Examples:
>>> zip_folder('/path/to/source_dir', 'output.zip', overwrite=True)
True
Methods
rdsa_utils.methods.averaging_methods
Weighted and unweighted averaging functions.
get_weight_shares(weights: str, levels: Optional[Union[str, Sequence[str]]] = None) -> SparkCol
Divide weights by sum of weights for each group.
unweighted_arithmetic_average(val: str) -> SparkCol
Calculate the unweighted arithmetic average.
unweighted_geometric_average(val: str) -> SparkCol
Calculate the unweighted geometric average.
weighted_arithmetic_average(val: str, weight: str) -> SparkCol
Calculate the weighted arithmetic average.
weighted_geometric_average(val: str, weight: str) -> SparkCol
Calculate the weighted geometric average.
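These helpers return SparkCol expressions, so a typical pattern is to use them inside an aggregation. A minimal sketch (the DataFrame and column names are illustrative, and the aggregation pattern is an assumption about intended usage rather than documented behaviour):
>>> result = (
...     df.groupBy('group')
...     .agg(
...         unweighted_arithmetic_average('price').alias('mean_price'),
...         weighted_arithmetic_average('price', 'weight').alias('weighted_mean_price')
...     )
... )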