
API Reference

This part of the project documentation focuses on an information-oriented approach. Use it as a reference for the technical implementation of the rdsa-utils codebase.

General

rdsa_utils.exceptions

Common custom exceptions that can be raised in pipelines.

The purpose of these is to give a clearer indication of why an error is being raised than the standard built-in errors do (e.g. ColumnNotInDataframeError vs ValueError).

ColumnNotInDataframeError

Bases: Exception

Custom exception to raise when a column is not present in dataframe.

ConfigError

Bases: Exception

Custom exception to raise when there is an issue in a config object.

DataframeEmptyError

Bases: Exception

Custom exception to raise when a dataframe is empty.

InvalidBucketNameError

Bases: Exception

Custom exception to raise when an AWS S3 or GCS bucket name is invalid.

InvalidS3FilePathError

Bases: Exception

Custom exception to raise when an AWS S3 file path is invalid.

PipelineError

Bases: Exception

Custom exception to raise when there is a generic pipeline issue.

TableNotFoundError

Bases: Exception

Custom exception to raise when a table to be read is not found.
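
For illustration, a minimal sketch of raising one of these exceptions in pipeline code (the helper function and column check are hypothetical):

from rdsa_utils.exceptions import ColumnNotInDataframeError

def require_column(df, column: str) -> None:
    # Raise a clearer, more specific error than a bare ValueError.
    if column not in df.columns:
        raise ColumnNotInDataframeError(
            f"Column '{column}' not found in dataframe."
        )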

rdsa_utils.logging

Contains the logging configuration for files and the methods used to initialise it.

add_warning_message_to_function(_func: Callable = None, *, message: Optional[str] = None) -> Callable

Apply decorator to log a warning message.

If a message is passed, this decorator adds a warning log of the form function_name: message

Parameters:

Name Type Description Default
message Optional[str]

The message to be logged along with the function name.

None
Notes

Explainer on complex decorators (and template for decorator structure): https://realpython.com/primer-on-python-decorators/#both-please-but-never-mind-the-bread

Usage

To use decorator to log a warning:

>>> @add_warning_message_to_function(message='here be dragons...')
>>> def my_func(some_args, some_other_args):
>>>    ...
>>>
>>> some_output = my_func(...)

Warning my_func: here be dragons...

init_logger_advanced(log_level: int, handlers: Optional[List[logging.Handler]] = None, log_format: str = None, date_format: str = None) -> None

Instantiate a logger with provided handlers.

This function allows the logger to be used across modules. Logs can be handled by any number of handlers, e.g., FileHandler, StreamHandler, etc., provided in the handlers list.

Parameters:

Name Type Description Default
log_level int

The level of logging to be recorded. Can be defined either as an integer level or using the logging module's level constants (e.g. logging.INFO), in line with the definitions of the logging module. (see - https://docs.python.org/3/library/logging.html#levels)

required
handlers Optional[List[Handler]]

List of handler instances to be added to the logger. Each handler instance must be a subclass of logging.Handler. Default is an empty list, and in this case, basicConfig with log_level, log_format, and date_format is used.

None
log_format str

The format of the log message. If not provided, a default format '%(asctime)s %(levelname)s %(name)s: %(message)s' is used.

None
date_format str

The format of the date in the log message. If not provided, a default format '%Y-%m-%d %H:%M:%S' is used.

None

Returns:

Type Description
None

The logger created by this function is available in any other modules by using logging.getLogger(__name__) at the global scope level in a module (i.e., below imports, not in a function).

Raises:

Type Description
ValueError

If any item in the handlers list is not an instance of logging.Handler.

Examples:

>>> file_handler = logging.FileHandler('logfile.log')
>>> rich_handler = RichHandler()
>>> init_logger_advanced(
...     logging.DEBUG,
...     [file_handler, rich_handler],
...     "%(levelname)s: %(message)s",
...     "%H:%M:%S"
... )

init_logger_basic(log_level: int) -> None

Instantiate a basic logger object to be used across modules.

By using this function to instantiate the logger, you also have access to logger.dev for log_level=15, as this is defined in the same module scope as this function.

Parameters:

Name Type Description Default
log_level int

The level of logging to be recorded. Can be defined either as an integer level or using the logging module's level constants (e.g. logging.INFO), in line with the definitions of the logging module (see - https://docs.python.org/3/library/logging.html#levels)

required

Returns:

Type Description
None

The logger created by this function is available in any other modules by using logger = logging.getLogger(__name__) at the global scope level in a module (i.e. below imports, not in a function).
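
A minimal usage sketch (the module-level logger and messages are illustrative):

import logging

from rdsa_utils.logging import init_logger_basic

logger = logging.getLogger(__name__)  # at module scope, below imports

init_logger_basic(logging.INFO)

logger.info("pipeline started")
# logger.dev("extra detail")  # custom DEV level (15) described above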

log_dev(self, message, *args, **kwargs)

Create a custom log level between INFO and DEBUG named DEV.

This is lifted from: https://stackoverflow.com/a/13638084

log_rows_in_spark_df(func: Callable) -> Callable

Apply decorator to log dataframe row count before and after a function.

Requires that the function being decorated has a parameter called df and that the function is called with df being a keyword argument (e.g. df=df). If not, the decorator will report back that it could not count the number of rows of the dataframe before running the decorated function.

Usage
@log_rows_in_spark_df
def my_func_that_changes_no_rows(some_args, df, some_other_args):
   ...
   return final_df

some_df = my_func_that_changes_no_rows(
    some_args='hello',
    df=input_df,
    some_other_args='world'
)

>>> Rows in dataframe before my_func_that_changes_no_rows : 12345
>>> Rows in dataframe after my_func_that_changes_no_rows  : 6789
Warning:

.count() is an expensive spark operation to perform. Overuse of this decorator can be detrimental to performance. This decorator will cache the input dataframe prior to running the count and decorated function, as well as persisting the output dataframe prior to counting. The input dataframe is also unpersisted from memory prior to the decorator completing.

log_spark_df_schema(_func: Callable = None, *, log_schema_on_input: bool = True) -> Callable

Apply decorator to log dataframe schema before and after a function.

If you use the df.printSchema() method directly in a print/log statement, the schema is processed and printed regardless of the logging level. Instead you need to capture the output and pass this to the logger. See the explanation here - https://stackoverflow.com/a/59935109

Requires that the function being decorated has a parameter called df and that the function is called with df being a keyword argument (e.g. df=df). If not, the decorator will report back that it could not log the schema of the dataframe before running the decorated function.

Parameters:

Name Type Description Default
log_schema_on_input bool

If set to False, no attempt is made to print the schema of the decorated function's input. This is useful, for instance, where the function has no df input but does return one (such as when reading a table).

True
Notes

Explainer on complex decorators (and template for decorator structure): https://realpython.com/primer-on-python-decorators/#both-please-but-never-mind-the-bread

Usage

To use decorator to record input and output schema:

>>> @log_spark_df_schema
>>> def my_func_that_changes_some_columns(some_args, df, some_other_args):
>>>    ...
>>>    return final_df
>>>
>>> some_df = my_func_that_changes_some_columns(
>>>     some_args='hello',
>>>     df=input_df,
>>>     some_other_args='world'
>>> )

Schema of dataframe before my_func_that_changes_some_columns:
root
|-- price: double (nullable = true)
|-- quantity: long (nullable = true)

Schema of dataframe after my_func_that_changes_some_columns:
root
|-- price: double (nullable = true)
|-- quantity: long (nullable = true)
|-- expenditure: double (nullable = true)

To use decorator to record output schema only:

>>> @log_spark_df_schema(log_schema_on_input=False)
>>> def my_func_that_changes_some_columns(some_args, df, some_other_args):
>>>    ...
>>>    return final_df
>>>
>>> some_df = my_func_that_changes_some_columns(
>>>     some_args='hello',
>>>     df=input_df,
>>>     some_other_args='world'
>>> )

Not printing schema of dataframe before my_func_that_changes_some_columns

Schema of dataframe after my_func_that_changes_some_columns:
root
|-- price: double (nullable = true)
|-- quantity: long (nullable = true)
|-- expenditure: double (nullable = true)

print_full_table_and_raise_error(df: pd.DataFrame, message: str, stop_pipeline: bool = False, show_records: bool = False) -> None

Output dataframe records to logger.

The purpose of this function is to enable a user to output a message to the logger, with the added functionality of stopping the pipeline and showing the dataframe records in a table format. It may be used, for instance, if a user wants to check the records in a dataframe when it is expected to be empty.

Parameters:

Name Type Description Default
df DataFrame

The dataframe to display records from.

required
message str

The message to output to the logger.

required
stop_pipeline bool

Switch for the user to stop the pipeline and raise an error.

False
show_records bool

Switch to show records in a dataframe.

False

Returns:

Type Description
None

Displays the message to the user; however, nothing is returned from the function.

Raises:

Type Description
ValueError

Raises error and stops pipeline if switch applied.
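
A minimal sketch of calling this when a dataframe that was expected to be empty is not (the dataframe and message are illustrative):

import pandas as pd

from rdsa_utils.logging import print_full_table_and_raise_error

unexpected = pd.DataFrame({"id": [1, 2], "status": ["failed", "failed"]})

print_full_table_and_raise_error(
    df=unexpected,
    message="Expected no failed records but some were found.",
    stop_pipeline=True,   # raises ValueError after logging
    show_records=True,    # logs the records in table format
)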

timer_args(name: str, logger: Optional[Callable[[str], None]] = logger.info) -> Dict[str, str]

Initialise timer args as a workaround for the 'text' argument in the codetiming package.

Works with codetiming==1.4.0

Parameters:

Name Type Description Default
name str

The name of the specific timer log.

required
logger Optional[Callable[[str], None]]

Optional logger function that can accept a string argument.

info

Returns:

Type Description
Dict[str, str]

Dictionary of arguments to pass to the specific codetiming package Timer.
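
A minimal sketch of passing the returned arguments to a codetiming Timer (the step name is illustrative):

from codetiming import Timer

from rdsa_utils.logging import timer_args

with Timer(**timer_args("load input tables")):
    ...  # the pipeline step being timed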

rdsa_utils.test_utils

Functions and fixtures used with test suites.

Case(label: Optional[str] = None, marks: Optional[MarkDecorator] = None, **kwargs)

Container for a test case, with optional test ID.

The Case class is to be used in conjunction with parametrize_cases.

Attributes:

Name Type Description
label
Optional test ID. Will be displayed for each test when running `pytest -v`.

marks
Optional pytest marks to denote any tests to skip etc.

kwargs
Parameters used for the test cases.

Examples:

>>> Case(label="some test name", foo=10, bar="some value")

>>> Case(
>>>     label="some test name",
>>>     marks=pytest.mark.skip(reason='not implemented'),
>>>     foo=10,
>>>     bar="some value"
>>> )
See Also

Modified from https://github.com/ckp95/pytest-parametrize-cases to allow pytest mark usage.

Initialise objects.

__repr__() -> str

Return string.

create_dataframe(data: List[Tuple[str]], **kwargs) -> pd.DataFrame

Create pandas df from tuple data with a header.
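
A minimal sketch (the column names and values are illustrative), assuming the first tuple provides the header:

from rdsa_utils.test_utils import create_dataframe

df = create_dataframe([
    ("column1", "column2"),
    ("a", 1),
    ("b", 2),
])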

create_spark_df(spark_session)

Create Spark DataFrame from tuple data with first row as schema.

Example:

create_spark_df([
    ('column1', 'column2', 'column3'),
    ('aaaa', 1, 1.1)
])

Can specify the schema alongside the column names:

create_spark_df([
    ('column1 STRING, column2 INT, column3 DOUBLE'),
    ('aaaa', 1, 1.1)
])

parametrize_cases(*cases: Case)

More user-friendly parametrized case testing.

Utilise as a decorator on top of test function.

Examples:

@parametrize_cases(
    Case(
        label="some test name",
        foo=10,
        bar="some value"
    ),
    Case(
        label="some test name #2",
        foo=20,
        bar="some other value"
    ),
)
def test(foo, bar):
    ...
See Also

Source: https://github.com/ckp95/pytest-parametrize-cases

spark_session()

Set up spark session fixture.

suppress_py4j_logging()

Suppress spark logging.

to_date(dt: str) -> datetime.date

Convert date string to datetime.date type.

to_datetime(dt: str) -> datetime.datetime

Convert datetime string to datetime.datetime type.
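
Minimal sketches of both converters; the string formats shown are assumptions about what the helpers accept:

from rdsa_utils.test_utils import to_date, to_datetime

some_date = to_date("2021-01-01")                   # datetime.date
some_datetime = to_datetime("2021-01-01 12:30:00")  # datetime.datetime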

to_spark(spark_session)

Convert pandas df to spark.

rdsa_utils.typing

Contains custom types for type hinting.

rdsa_utils.validation

Functions that support the use of pydantic validators.

allowed_date_format(date: str) -> str

Ensure that the date string can be converted to a usable datetime.

Parameters:

Name Type Description Default
date str

The specified date string.

required

Returns:

Type Description
str

The input date.

Raises:

Type Description
ValueError

If the date is not one of the predefined allowed formats.

apply_validation(config: Mapping[str, Any], Validator: Optional[BaseModel]) -> Mapping[str, Any]

Apply validation model to config.

If no Validator is passed, then a warning will be logged and the input config returned without validation. This mechanism allows the function to be used consistently while helping to track which config sections are unvalidated.

Parameters:

Name Type Description Default
config Mapping[str, Any]

The config for validating.

required
Validator Optional[BaseModel]

Validator class for the config.

required

Returns:

Type Description
Mapping[str, Any]

The input config after being passed through the validator.
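
A minimal sketch using a hypothetical pydantic model (the model and config keys are illustrative):

from pydantic import BaseModel

from rdsa_utils.validation import apply_validation

class InputConfig(BaseModel):
    table_name: str
    drop_duplicates: bool = False

config = {"table_name": "my_database.my_table"}

validated = apply_validation(config, InputConfig)

# Passing None logs a warning and returns the config unvalidated.
unvalidated = apply_validation(config, None)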

list_convert_validator(*args, **kwargs) -> Callable

Wrapper to set kwargs for list_convert validator.

CDP

rdsa_utils.cdp.helpers.s3_utils

Utility functions for interacting with AWS S3.

To initialise a boto3 client for S3 and configure it with Ranger RAZ and an SSL certificate, you can use the following code snippet:


import boto3
import raz_client

ssl_file_path = "/path/to/your/ssl_certificate.crt"

# Create a boto3 client for S3
client = boto3.client("s3")

# Configure the client with RAZ and SSL certificate
raz_client.configure_ranger_raz(client, ssl_file=ssl_file_path)

Note:
- The `raz-client` library is required only when running in a
  managed Cloudera environment.
- You can install it using `pip install raz-client` when needed.

copy_file(client: boto3.client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str, overwrite: bool = False) -> bool

Copy a file from one AWS S3 bucket to another.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
source_bucket_name str

The name of the source bucket.

required
source_object_name str

The S3 object name of the source file.

required
destination_bucket_name str

The name of the destination bucket.

required
destination_object_name str

The S3 object name of the destination file.

required
overwrite bool

If True, overwrite the destination file if it already exists.

False

Returns:

Type Description
bool

True if the file was copied successfully, otherwise False.

Examples:

>>> client = boto3.client('s3')
>>> copy_file(
...     client,
...     'source-bucket',
...     'source_file.txt',
...     'destination-bucket',
...     'destination_file.txt'
... )
True

create_folder_on_s3(client: boto3.client, bucket_name: str, folder_path: str) -> bool

Create a folder in an AWS S3 bucket if it doesn't already exist.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the bucket where the folder will be created.

required
folder_path str

The name of the folder to create.

required

Returns:

Type Description
bool

True if the folder was created successfully or already exists, otherwise False.

Examples:

>>> client = boto3.client('s3')
>>> create_folder_on_s3(client, 'mybucket', 'new_folder/')
True

delete_file(client: boto3.client, bucket_name: str, object_name: str, overwrite: bool = False) -> bool

Delete a file from an AWS S3 bucket.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the bucket from which the file will be deleted.

required
object_name str

The S3 object name of the file to delete.

required
overwrite bool

If False, the function will not delete the file if it does not exist; set to True to ignore non-existence on delete.

False

Returns:

Type Description
bool

True if the file was deleted successfully, otherwise False.

Examples:

>>> client = boto3.client('s3')
>>> delete_file(client, 'mybucket', 'folder/s3_file.txt')
True

delete_folder(client: boto3.client, bucket_name: str, folder_path: str) -> bool

Delete a folder in an AWS S3 bucket.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the S3 bucket.

required
folder_path str

The path of the folder to delete.

required

Returns:

Type Description
bool

True if the folder was deleted successfully, otherwise False.

Examples:

>>> client = boto3.client('s3')
>>> delete_folder(client, 'mybucket', 'path/to/folder/')
True

download_file(client: boto3.client, bucket_name: str, object_name: str, local_path: str, overwrite: bool = False) -> bool

Download a file from an AWS S3 bucket to a local directory.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the S3 bucket from which to download the file.

required
object_name str

The S3 object name of the file to download.

required
local_path str

The local file path where the downloaded file will be saved.

required
overwrite bool

If True, overwrite the local file if it exists.

False

Returns:

Type Description
bool

True if the file was downloaded successfully, False otherwise.

Examples:

>>> client = boto3.client('s3')
>>> download_file(
...     client,
...     'mybucket',
...     'folder/s3_file.txt',
...     '/path/to/download.txt'
... )
True

download_folder(client: boto3.client, bucket_name: str, prefix: str, local_path: str, overwrite: bool = False) -> bool

Download a folder from an AWS S3 bucket to a local directory.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the S3 bucket from which to download the folder.

required
prefix str

The S3 prefix of the folder to download.

required
local_path str

The local directory path where the downloaded folder will be saved.

required
overwrite bool

If True, overwrite existing local files if they exist.

False

Returns:

Type Description
bool

True if the folder was downloaded successfully, False otherwise.

Examples:

>>> client = boto3.client('s3')
>>> download_folder(
...     client,
...     'mybucket',
...     'folder/subfolder/',
...     '/path/to/local_folder',
...     overwrite=False
... )
True

file_exists(client: boto3.client, bucket_name: str, object_name: str) -> bool

Check if a specific file exists in an AWS S3 bucket.

Parameters:

Name Type Description Default
client client

The boto3 S3 client.

required
bucket_name str

The name of the bucket.

required
object_name str

The S3 object name to check for existence.

required

Returns:

Type Description
bool

True if the file exists, otherwise False.

Examples:

>>> client = boto3.client('s3')
>>> file_exists(client, 'mybucket', 'folder/file.txt')
True

is_s3_directory(client: boto3.client, bucket_name: str, object_name: str) -> bool

Check if an AWS S3 key is a directory by listing its contents.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the S3 bucket.

required
object_name str

The S3 object name to check.

required

Returns:

Type Description
bool

True if the key represents a directory, False otherwise.
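
For illustration, following the pattern of the other examples (the bucket and prefix are hypothetical):

>>> client = boto3.client('s3')
>>> is_s3_directory(client, 'mybucket', 'folder/subfolder/')
True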

list_files(client: boto3.client, bucket_name: str, prefix: str = '') -> List[str]

List files in an AWS S3 bucket that match a specific prefix.

Parameters:

Name Type Description Default
client client

The boto3 S3 client.

required
bucket_name str

The name of the bucket.

required
prefix str

The prefix to filter files, by default "".

''

Returns:

Type Description
List[str]

A list of S3 object keys matching the prefix.

Examples:

>>> client = boto3.client('s3')
>>> list_files(client, 'mybucket', 'folder_prefix/')
['folder_prefix/file1.txt', 'folder_prefix/file2.txt']

load_csv(client: boto3.client, bucket_name: str, filepath: str, keep_columns: Optional[List[str]] = None, rename_columns: Optional[Dict[str, str]] = None, drop_columns: Optional[List[str]] = None, **kwargs) -> pd.DataFrame

Load a CSV file from an S3 bucket into a Pandas DataFrame.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the S3 bucket.

required
filepath str

The key (full path and filename) of the CSV file in the S3 bucket.

required
keep_columns Optional[List[str]]

A list of column names to keep in the DataFrame, dropping all others. Default value is None.

None
rename_columns Optional[Dict[str, str]]

A dictionary to rename columns where keys are existing column names and values are new column names. Default value is None.

None
drop_columns Optional[List[str]]

A list of column names to drop from the DataFrame. Default value is None.

None
kwargs

Additional keyword arguments to pass to the pd.read_csv method.

{}

Returns:

Type Description
DataFrame

Pandas DataFrame containing the data from the CSV file.

Raises:

Type Description
InvalidBucketNameError

If the bucket name does not meet AWS specifications.

InvalidS3FilePathError

If the file_path contains an S3 URI scheme like 's3://' or 's3a://'.

Exception

If there is an error loading the file.

ValueError

If a column specified in rename_columns, drop_columns, or keep_columns is not found in the DataFrame.

Notes

Transformation order:

1. Columns are kept according to keep_columns.
2. Columns are dropped according to drop_columns.
3. Columns are renamed according to rename_columns.

Examples:

Load a CSV file and rename columns:

>>> df = load_csv(
        client,
        "my-bucket",
        "path/to/file.csv",
        rename_columns={"old_name": "new_name"}
    )

Load a CSV file and keep only specific columns:

>>> df = load_csv(
        client,
        "my-bucket",
        "path/to/file.csv",
        keep_columns=["col1", "col2"]
    )

Load a CSV file and drop specific columns:

>>> df = load_csv(
        client,
        "my-bucket",
        "path/to/file.csv",
        drop_columns=["col1", "col2"]
    )

Load a CSV file with custom delimiter:

>>> df = load_csv(
        client,
        "my-bucket",
        "path/to/file.csv",
        sep=";"
    )

load_json(client: boto3.client, bucket_name: str, filepath: str, encoding: Optional[str] = 'utf-8') -> Dict

Load a JSON file from an S3 bucket.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the S3 bucket.

required
filepath str

The key (full path and filename) of the JSON file in the S3 bucket.

required
encoding Optional[str]

The encoding of the JSON file. Default is 'utf-8'.

'utf-8'

Returns:

Type Description
Dict

Dictionary containing the data from the JSON file.

Raises:

Type Description
InvalidBucketNameError

If the bucket name is invalid according to AWS rules.

Exception

If there is an error loading the file from S3 or parsing the JSON.

Examples:

>>> client = boto3.client('s3')
>>> data = load_json(client, 'my-bucket', 'path/to/file.json')
>>> print(data)
{
    "name": "John",
    "age": 30,
    "city": "Manchester"
}

move_file(client: boto3.client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str) -> bool

Move a file within or between AWS S3 buckets.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
source_bucket_name str

The name of the source S3 bucket.

required
source_object_name str

The S3 object name of the source file.

required
destination_bucket_name str

The name of the destination S3 bucket.

required
destination_object_name str

The S3 object name of the destination file.

required

Returns:

Type Description
bool

True if the file was moved successfully, False otherwise.

Examples:

>>> client = boto3.client('s3')
>>> move_file(
...     client,
...     'sourcebucket',
...     'source_folder/file.txt',
...     'destbucket',
...     'dest_folder/file.txt'
... )
True

remove_leading_slash(text: str) -> str

Remove the leading forward slash from a string if present.

Parameters:

Name Type Description Default
text str

The text from which the leading slash will be removed.

required

Returns:

Type Description
str

The text stripped of its leading slash.

Examples:

>>> remove_leading_slash('/example/path')
'example/path'

upload_file(client: boto3.client, bucket_name: str, local_path: str, object_name: Optional[str] = None, overwrite: bool = False) -> bool

Upload a file to an Amazon S3 bucket from a local directory.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the target S3 bucket.

required
local_path str

The file path on the local system to upload.

required
object_name Optional[str]

The target S3 object name. If None, uses the base name of the local file path.

None
overwrite bool

If True, the existing file on S3 will be overwritten.

False

Returns:

Type Description
bool

True if the file was uploaded successfully, False otherwise.

Examples:

>>> client = boto3.client('s3')
>>> upload_file(
...     client,
...     'mybucket',
...     '/path/to/file.txt',
...     'folder/s3_file.txt'
... )
True

upload_folder(client: boto3.client, bucket_name: str, local_path: str, prefix: str = '', overwrite: bool = False) -> bool

Upload an entire folder from the local file system to an AWS S3 bucket.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the bucket to which the folder will be uploaded.

required
local_path str

The path to the local folder to upload.

required
prefix str

The prefix to prepend to each object name when uploading to S3.

''
overwrite bool

If True, overwrite existing files in the bucket.

False

Returns:

Type Description
bool

True if the folder was uploaded successfully, otherwise False.

Examples:

>>> client = boto3.client('s3')
>>> upload_folder(
...     client,
...     'mybucket',
...     '/path/to/local/folder',
...     'folder_prefix',
...     True
... )
True

validate_bucket_name(bucket_name: str) -> str

Validate the format of an AWS S3 bucket name according to AWS rules.

Parameters:

Name Type Description Default
bucket_name str

The name of the bucket to validate.

required

Returns:

Type Description
str

The validated bucket name if valid.

Raises:

Type Description
InvalidBucketNameError

If the bucket name does not meet AWS specifications.

Examples:

>>> validate_bucket_name('valid-bucket-name')
'valid-bucket-name'
>>> validate_bucket_name('Invalid_Bucket_Name')
InvalidBucketNameError: Bucket name must not contain underscores.

validate_s3_file_path(file_path: str, allow_s3_scheme: bool) -> str

Validate the file path based on the S3 URI scheme.

If allow_s3_scheme is True, the file path must contain an S3 URI scheme (either 's3://' or 's3a://').

If allow_s3_scheme is False, the file path should not contain an S3 URI scheme.

Parameters:

Name Type Description Default
file_path str

The file path to validate.

required
allow_s3_scheme bool

Whether or not to allow an S3 URI scheme in the file path.

required

Returns:

Type Description
str

The validated file path if valid.

Raises:

Type Description
InvalidS3FilePathError

If the validation fails based on the value of allow_s3_scheme.

Examples:

>>> validate_s3_file_path('data_folder/data.csv', allow_s3_scheme=False)
'data_folder/data.csv'
>>> validate_s3_file_path('s3a://bucket-name/data.csv', allow_s3_scheme=True)
's3a://bucket-name/data.csv'
>>> validate_s3_file_path('s3a://bucket-name/data.csv', allow_s3_scheme=False)
InvalidS3FilePathError: The file_path should not contain an S3 URI scheme
like 's3://' or 's3a://'.

write_csv(client: boto3.client, bucket_name: str, data: pd.DataFrame, filepath: str, **kwargs) -> bool

Write a Pandas DataFrame to CSV in an S3 bucket.

Uses the StringIO library as a RAM buffer: Pandas first writes the data to the buffer, the buffer is rewound to the beginning, and its contents are then sent to the S3 bucket using the boto3 put_object method.

Parameters:

Name Type Description Default
client client

The boto3 S3 client instance.

required
bucket_name str

The name of the S3 bucket.

required
data DataFrame

The dataframe to write to the specified path.

required
filepath str

The filepath to save the dataframe to.

required
kwargs

Optional dictionary of Pandas to_csv arguments.

{}

Returns:

Type Description
bool

True if the dataframe is written successfully. False if it was not possible to serialise or write the file.

Raises:

Type Description
Exception

If there is an error writing the file to S3.

Examples:

>>> client = boto3.client('s3')
>>> data = pd.DataFrame({
>>>     'column1': [1, 2, 3],
>>>     'column2': ['a', 'b', 'c']
>>> })
>>> write_csv(client, 'my_bucket', data, 'path/to/file.csv')
True

rdsa_utils.cdp.helpers.hdfs_utils

Utility functions for interacting with HDFS.
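
A minimal sketch of typical usage (the paths are illustrative); as noted in the individual functions below, success is reported via the underlying command's return code:

from rdsa_utils.cdp.helpers import hdfs_utils

path = "/user/some_user/data"

if not hdfs_utils.is_dir(path):
    hdfs_utils.create_dir(path)

hdfs_utils.copy_local_to_hdfs("/tmp/local_file.csv", path)

for item in hdfs_utils.read_dir(path):
    print(item)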

change_permissions(path: str, permission: str, recursive: bool = False) -> bool

Change directory and file permissions in HDFS.

Parameters:

Name Type Description Default
path str

The path to the file or directory in HDFS.

required
permission str

The permission to be set, e.g., 'go+rwx' or '777'.

required
recursive bool

If True, changes permissions for all subdirectories and files within a directory.

False

Returns:

Type Description
bool

True if the operation was successful (command return code 0), otherwise False.

copy(from_path: str, to_path: str, overwrite: bool = False) -> bool

Copy a file in HDFS.

Parameters:

Name Type Description Default
from_path str

The source path of the file in HDFS.

required
to_path str

The target path of the file in HDFS.

required
overwrite bool

If True, the existing file at the target path will be overwritten, default is False.

False

Returns:

Type Description
bool

True if the operation was successful (command return code 0), otherwise False.

Raises:

Type Description
TimeoutExpired

If the process does not complete within the default timeout.

copy_local_to_hdfs(from_path: str, to_path: str) -> bool

Copy a local file to HDFS.

Parameters:

Name Type Description Default
from_path str

The path to the local file.

required
to_path str

The path to the HDFS directory where the file will be copied.

required

Returns:

Type Description
bool

True if the operation was successful (command return code 0), otherwise False.

create_dir(path: str) -> bool

Create a directory in HDFS.

Parameters:

Name Type Description Default
path str

The HDFS path where the directory should be created.

required

Returns:

Type Description
bool

True if the operation is successful (directory created), otherwise False.

create_txt_from_string(path: str, string_to_write: str, replace: Optional[bool] = False) -> None

Create and populate a text file in HDFS.

Parameters:

Name Type Description Default
path str

The path to the new file to be created, for example, '/some/directory/newfile.txt'.

required
string_to_write str

The string that will populate the new text file.

required
replace Optional[bool]

Flag determining whether an existing file should be replaced. Defaults to False.

False

Returns:

Type Description
None

This function doesn't return anything; it's used for its side effect of creating a text file.

Raises:

Type Description
FileNotFoundError

If replace is False and the file already exists.
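
A minimal sketch (the path and contents are illustrative):

create_txt_from_string(
    path="/some/directory/newfile.txt",
    string_to_write="pipeline run completed",
    replace=True,
)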

delete_dir(path: str) -> bool

Delete an empty directory from HDFS.

This function attempts to delete an empty directory in HDFS. If the directory is not empty, the deletion will fail.

Parameters:

Name Type Description Default
path str

The HDFS path to the directory to be deleted.

required

Returns:

Type Description
bool

True if the operation is successful (directory deleted), otherwise False.

Note

This function will only succeed if the directory is empty. To delete directories containing files or other directories, consider using delete_path instead.

delete_file(path: str) -> bool

Delete a specific file in HDFS.

This function is used to delete a single file located at the specified HDFS path. If the path points to a directory, the command will fail.

Parameters:

Name Type Description Default
path str

The path to the file in HDFS to be deleted.

required

Returns:

Type Description
bool

True if the file was successfully deleted (command return code 0), otherwise False.

Raises:

Type Description
TimeoutExpired

If the process does not complete within the default timeout.

Note

This function is intended for files only. For directory deletions, use delete_dir or delete_path.

delete_path(path: str) -> bool

Delete a file or directory in HDFS, including non-empty directories.

This function is capable of deleting both files and directories. When applied to directories, it will recursively delete all contents within the directory, making it suitable for removing directories regardless of whether they are empty or contain files or other directories.

Parameters:

Name Type Description Default
path str

The path to the file or directory in HDFS to be deleted.

required

Returns:

Type Description
bool

True if the file or directory was successfully deleted (command return code 0), otherwise False.

Raises:

Type Description
TimeoutExpired

If the process does not complete within the default timeout.

Warning

Use with caution: applying this function to a directory will remove all contained files and subdirectories without confirmation.

file_exists(path: str) -> bool

Check whether a file exists in HDFS.

Parameters:

Name Type Description Default
path str

The path to the file in HDFS to be checked for existence.

required

Returns:

Type Description
bool

True if the file exists (command return code 0), otherwise False.

Raises:

Type Description
TimeoutExpired

If the process does not complete within the default timeout.

get_date_modified(filepath: str) -> str

Return the last modified date of a file in HDFS.

Parameters:

Name Type Description Default
filepath str

The path to the file in HDFS.

required

Returns:

Type Description
str

The date the file was last modified.

is_dir(path: str) -> bool

Test if a directory exists in HDFS.

Parameters:

Name Type Description Default
path str

The HDFS path to the directory to be tested.

required

Returns:

Type Description
bool

True if the operation is successful (directory exists), otherwise False.

move_local_to_hdfs(from_path: str, to_path: str) -> bool

Move a local file to HDFS.

Parameters:

Name Type Description Default
from_path str

The path to the local file.

required
to_path str

The path to the HDFS directory where the file will be moved.

required

Returns:

Type Description
bool

True if the operation was successful (command return code 0), otherwise False.

read_dir(path: str) -> List[str]

Read the contents of a directory in HDFS.

Parameters:

Name Type Description Default
path str

The path to the directory in HDFS.

required

Returns:

Type Description
List[str]

A list of full paths of the items found in the directory.

read_dir_files(path: str) -> List[str]

Read the filenames in a directory in HDFS.

Parameters:

Name Type Description Default
path str

The path to the directory in HDFS.

required

Returns:

Type Description
List[str]

A list of filenames in the directory.

read_dir_files_recursive(path: str, return_path: bool = True) -> List[str]

Recursively reads the contents of a directory in HDFS.

Parameters:

Name Type Description Default
path str

The path to the directory in HDFS.

required
return_path bool

If True, returns the full path of the files, otherwise just the filename.

True

Returns:

Type Description
List[str]

A list of files in the directory.

rename(from_path: str, to_path: str, overwrite: bool = False) -> bool

Rename (i.e., move using full path) a file in HDFS.

Parameters:

Name Type Description Default
from_path str

The source path of the file in HDFS.

required
to_path str

The target path of the file in HDFS.

required
overwrite bool

If True, the existing file at the target path will be overwritten, default is False.

False

Returns:

Type Description
bool

True if the operation was successful (command return code 0), otherwise False.

Raises:

Type Description
TimeoutExpired

If the process does not complete within the default timeout.

rdsa_utils.cdp.helpers.impala

Utilities for working with Impala.

invalidate_impala_metadata(table: str, impalad_address_port: str, impalad_ca_cert: str, keep_stderr: Optional[bool] = False)

Automate the invalidation of a table's metadata using impala-shell.

This function uses the impala-shell command with the given impalad_address_port and impalad_ca_cert to invalidate a specified table's metadata.

It proves useful during a data pipeline's execution after writing to an intermediate Hive table. Using the Impala Query Editor in Hue, end-users often need to run the "INVALIDATE METADATA" command to refresh a table's metadata. However, this manual step can be missed, leading to the potential use of outdated metadata.

The function automates the "INVALIDATE METADATA" command for a given table, ensuring up-to-date metadata for future queries. This reduces manual intervention, making outdated metadata issues less likely to occur.

Parameters:

Name Type Description Default
table str

Name of the table for metadata invalidation.

required
impalad_address_port str

'address:port' of the impalad instance.

required
impalad_ca_cert str

Path to impalad's CA certificate file.

required
keep_stderr Optional[bool]

If True, will print impala-shell command's stderr output.

False

Returns:

Type Description
None

Examples:

>>> invalidate_impala_metadata(
...     'my_table',
...     'localhost:21050',
...     '/path/to/ca_cert.pem'
... )
>>> invalidate_impala_metadata(
...     'my_table',
...     'localhost:21050',
...     '/path/to/ca_cert.pem',
...     keep_stderr=True
... )

rdsa_utils.cdp.io.pipeline_runlog

Utilities for managing a Pipeline Runlog using Hive Tables.

add_runlog_entry(spark: SparkSession, desc: str, version: str, config: Union[ConfigParser, Dict[str, str]], pipeline: Optional[str] = None, log_table: str = 'pipeline_runlog', run_id: Optional[int] = None) -> DataFrame

Add an entry to a target runlog.

Parameters:

Name Type Description Default
spark SparkSession

A running spark session.

required
desc str

Description to attach to the log entry.

required
version str

Version of the pipeline.

required
config Union[ConfigParser, Dict[str, str]]

Configuration object for the run.

required
pipeline Optional[str]

Pipeline name. If None, uses the spark application name.

None
log_table str

Target runlog table. If the database is not set, this should include the database.

'pipeline_runlog'
run_id Optional[int]

Run id to use if already reserved. If not specified, a new one is generated.

None

Returns:

Type Description
DataFrame

The log entry returned as a spark dataframe.
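
A minimal sketch (the description, version, config, and table names are illustrative), assuming spark is an active SparkSession and the runlog table already exists (see create_runlog_table below):

from rdsa_utils.cdp.io.pipeline_runlog import add_runlog_entry

config = {"input_table": "my_database.my_table"}

entry = add_runlog_entry(
    spark,
    desc="Monthly run of my pipeline",
    version="1.2.3",
    config=config,
    pipeline="my_pipeline",
    log_table="my_database.pipeline_runlog",
)
entry.show()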

create_runlog_entry(spark: SparkSession, run_id: int, desc: str, version: str, config: Union[ConfigParser, Dict[str, str]], pipeline: Optional[str] = None) -> DataFrame

Create an entry for the runlog.

Parameters:

Name Type Description Default
spark SparkSession

A running spark session.

required
run_id int

Entry run id.

required
desc str

Description to attach to the log entry.

required
version str

Version of the pipeline.

required
config Union[ConfigParser, Dict[str, str]]

Configuration object for the run.

required
pipeline Optional[str]

Pipeline name. If None, derives from spark app name.

None

Returns:

Type Description
DataFrame

The log entry returned as a spark dataframe.

create_runlog_table(spark: SparkSession, database: str, tablename: Optional[str] = 'pipeline_runlog') -> None

Create runlog and _reserved_ids tables in the target database if needed.

This function executes two SQL queries to create two tables, if they do not already exist in the target database. The first table's structure includes columns for run_id, desc, user, datetime, pipeline_name, pipeline_version, and config, while the second table includes run_id and reserved_date.

Parameters:

Name Type Description Default
spark SparkSession

A running spark session which will be used to execute SQL queries.

required
database str

The name of the target database where tables will be created.

required
tablename Optional[str]

The name of the main table to be created (default is "pipeline_runlog"). The associated _reserved_ids table will be suffixed with this name.

'pipeline_runlog'

Returns:

Type Description
None

Examples:

>>> spark = SparkSession.builder.appName("test_session").getOrCreate()
>>> create_runlog_table(spark, "test_db", "test_table")

get_last_run_id(spark: SparkSession, pipeline: Optional[str] = None, log_table: str = 'pipeline_runlog') -> Optional[int]

Retrieve the last run_id, either in general or for a specific pipeline.

Parameters:

Name Type Description Default
spark SparkSession

A running Spark session.

required
pipeline Optional[str]

If specified, the result will be for the listed pipeline only.

None
log_table str

The target runlog table. If the database is not set, this should include the database.

'pipeline_runlog'

Returns:

Type Description
int or None

The id of the last run. Returns None if the log table is empty.

get_penultimate_run_id(spark: SparkSession, pipeline: Optional[str] = None, log_table: str = 'pipeline_runlog') -> Optional[int]

Retrieve the penultimate run_id, either in general or for a specific pipeline.

Parameters:

Name Type Description Default
spark SparkSession

A running Spark session.

required
pipeline Optional[str]

If specified, the result will be for the listed pipeline only.

None
log_table str

The target runlog table. If the database is not set, this should include the database.

'pipeline_runlog'

Returns:

Type Description
int or None

The id of the penultimate run. Returns None if the log table is empty or has fewer than two entries.

reserve_id(spark: SparkSession, log_table: Optional[str] = 'pipeline_runlog') -> int

Reserve a run id in the reserved ids table linked to the runlog table.

The function reads the last run id from the reserved ids table, increments it to create a new id, and writes the new id with the current timestamp to the reserved ids table.

Parameters:

Name Type Description Default
spark SparkSession

A running SparkSession instance.

required
log_table Optional[str]

The name of the main pipeline runlog table associated with this reserved id table, by default "pipeline_runlog".

'pipeline_runlog'

Returns:

Type Description
int

The new run id.

write_runlog_file(spark: SparkSession, runlog_table: str, runlog_id: int, path: str) -> None

Write metadata from runlog entry to a text file.

Parameters:

Name Type Description Default
spark SparkSession

A running SparkSession instance.

required
runlog_table str

The name of the table containing the runlog entries.

required
runlog_id int

The id of the desired entry.

required
path str

The HDFS path where the file will be written.

required

Returns:

Type Description
None

This function doesn't return anything; it's used for its side effect of creating a text file.

rdsa_utils.cdp.io.input

Read inputs on CDP.

extract_database_name(spark: SparkSession, long_table_name: str) -> Tuple[str, str]

Extract the database component and table name from a compound table name.

This function can handle multiple scenarios:

  1. For GCP's naming format '<project>.<database>.<table>', the function will return the database and table name.

  2. If the name is formatted as 'db_name.table_name', the function will extract and return the database and table names.

  3. If the long_table_name contains only the table name (e.g., 'table_name'), the function will use the current database of the SparkSession.

  4. For any other incorrectly formatted names, the function will raise a ValueError.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
long_table_name str

Full name of the table, which can include the GCP project and/or database name.

required

Returns:

Type Description
Tuple[str, str]

A tuple containing the name of the database and the table name.

Raises:

Type Description
ValueError

If the table name doesn't match any of the expected formats.
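
For illustration, a minimal sketch following the scenarios above (the names are hypothetical and spark is an active SparkSession):

>>> extract_database_name(spark, "my_database.my_table")
('my_database', 'my_table')
>>> extract_database_name(spark, "my_project.my_database.my_table")
('my_database', 'my_table')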

get_current_database(spark: SparkSession) -> str

Retrieve the current database from the active SparkSession.

get_tables_in_database(spark: SparkSession, database_name: str) -> List[str]

Get a list of tables in a given database.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
database_name str

The name of the database from which to list tables.

required

Returns:

Type Description
List[str]

A list of table names in the specified database.

Raises:

Type Description
ValueError

If there is an error fetching tables from the specified database.

Examples:

>>> tables = get_tables_in_database(spark, "default")
>>> print(tables)
['table1', 'table2', 'table3']

load_and_validate_table(spark: SparkSession, table_name: str, skip_validation: bool = False, err_msg: str = None, filter_cond: str = None, keep_columns: Optional[List[str]] = None, rename_columns: Optional[Dict[str, str]] = None, drop_columns: Optional[List[str]] = None) -> SparkDF

Load a table, apply transformations, and validate if it is not empty.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
table_name str

Name of the table to load.

required
skip_validation bool

If True, skips validation step, by default False.

False
err_msg str

Error message to return if table is empty, by default None.

None
filter_cond str

Condition to apply to SparkDF once read, by default None.

None
keep_columns Optional[List[str]]

A list of column names to keep in the DataFrame, dropping all others. Default value is None.

None
rename_columns Optional[Dict[str, str]]

A dictionary to rename columns where keys are existing column names and values are new column names. Default value is None.

None
drop_columns Optional[List[str]]

A list of column names to drop from the DataFrame. Default value is None.

None

Returns:

Type Description
DataFrame

Loaded SparkDF if validated, subject to options above.

Raises:

Type Description
PermissionError

If there's an issue accessing the table or if the table does not exist in the specified database.

ValueError

If the table is empty after loading, becomes empty after applying a filter condition, or if columns specified in keep_columns, drop_columns, or rename_columns do not exist in the DataFrame.

Notes

Transformation order:

1. Columns are kept according to keep_columns.
2. Columns are dropped according to drop_columns.
3. Columns are renamed according to rename_columns.

Examples:

Load a table, apply a filter, and validate it:

>>> df = load_and_validate_table(
        spark=spark,
        table_name="my_table",
        filter_cond="age > 21"
    )

Load a table and keep only specific columns:

>>> df = load_and_validate_table(
        spark=spark,
        table_name="my_table",
        keep_columns=["name", "age", "city"]
    )

Load a table, drop specific columns, and rename a column:

>>> df = load_and_validate_table(
        spark=spark,
        table_name="my_table",
        drop_columns=["extra_column"],
        rename_columns={"name": "full_name"}
    )

Load a table, skip validation, and apply all transformations:

>>> df = load_and_validate_table(
        spark=spark,
        table_name="my_table",
        skip_validation=True,
        keep_columns=["name", "age", "city"],
        drop_columns=["extra_column"],
        rename_columns={"name": "full_name"},
        filter_cond="age > 21"
    )

rdsa_utils.cdp.io.output

Write outputs on CDP.

insert_df_to_hive_table(spark: SparkSession, df: SparkDF, table_name: str, overwrite: bool = False, fill_missing_cols: bool = False, repartition_data_by: Union[int, str, None] = None) -> None

Write SparkDF to Hive table with optional configuration.

This function writes data from a SparkDF into a Hive table, handling missing columns and optional repartitioning. It ensures the table's column order matches the DataFrame and manages different overwrite behaviors for partitioned and non-partitioned data.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
df DataFrame

SparkDF containing data to be written.

required
table_name str

Name of the Hive table to write data into.

required
overwrite bool

Controls how existing data is handled, default is False:

For non-partitioned data:
- True: Replaces entire table with DataFrame data.
- False: Appends DataFrame data to existing table.

For partitioned data:
- True: Replaces data only in partitions present in DataFrame.
- False: Appends data to existing partitions or creates new ones.

False
fill_missing_cols bool

If True, adds missing columns as NULL values. If False, raises an error on schema mismatch, default is False.

  • Explicitly casts DataFrame columns to match the Hive table schema to avoid type mismatch errors.
  • Adds missing columns as NULL values when fill_missing_cols is True, regardless of their data type (e.g., String, Integer, Double, Boolean, etc.).
False
repartition_data_by Union[int, str, None]

Controls data repartitioning, default is None:
- int: Sets target number of partitions.
- str: Specifies column to repartition by.
- None: No repartitioning performed.

None
Notes

When using repartition with a number:
- Affects physical file structure but preserves Hive partitioning scheme.
- Controls number of output files per write operation per Hive partition.
- Maintains partition-based query optimization.

When repartitioning by column:
- Helps balance file sizes across Hive partitions.
- Reduces creation of small files.

Raises:

Type Description
AnalysisException

If there's an error reading the table. This can occur if the table doesn't exist or if there's no access to it.

ValueError

If the SparkDF schema does not match the Hive table schema and 'fill_missing_cols' is set to False.

DataframeEmptyError

If input DataFrame is empty.

Exception

For other general exceptions when writing data to the table.

Examples:

Write a DataFrame to a Hive table without overwriting:

>>> insert_df_to_hive_table(
...     spark=spark,
...     df=df,
...     table_name="my_database.my_table"
... )

Overwrite an existing table with a DataFrame:

>>> insert_df_to_hive_table(
...     spark=spark,
...     df=df,
...     table_name="my_database.my_table",
...     overwrite=True
... )

Write a DataFrame to a Hive table with missing columns filled:

>>> insert_df_to_hive_table(
...     spark=spark,
...     df=df,
...     table_name="my_database.my_table",
...     fill_missing_cols=True
... )

Repartition by column before writing to Hive:

>>> insert_df_to_hive_table(
...     spark=spark,
...     df=df,
...     table_name="my_database.my_table",
...     repartition_data_by="partition_column"
... )

Repartition into a fixed number of partitions before writing:

>>> insert_df_to_hive_table(
...     spark=spark,
...     df=df,
...     table_name="my_database.my_table",
...     repartition_data_by=10
... )

save_csv_to_hdfs(df: SparkDF, file_name: str, file_path: str, overwrite: bool = True) -> None

Save DataFrame as CSV on HDFS, coalescing to a single partition.

This function saves a PySpark DataFrame to HDFS in CSV format. By coalescing the DataFrame into a single partition before saving, it accomplishes two main objectives:

  1. Single Part File: The output is a single CSV file rather than multiple part files. This method reduces complexity and cuts through the clutter of multi-part files, offering users and systems a more cohesive and hassle-free experience.

  2. Preserving Row Order: Coalescing into a single partition maintains the order of rows as they appear in the DataFrame. This is essential when the row order matters for subsequent processing or analysis. It's important to note, however, that coalescing can have performance implications for very large DataFrames by concentrating all data processing on a single node.

Parameters:

Name Type Description Default
df DataFrame

PySpark DataFrame to be saved.

required
file_name str

Name of the CSV file. Must include the ".csv" extension.

required
file_path str

HDFS path where the CSV file should be saved.

required
overwrite bool

If True, overwrite any existing file with the same name. If False and the file exists, the function will raise an error.

True

Raises:

Type Description
ValueError

If the file_name does not end with ".csv".

IOError

If overwrite is False and the target file already exists.

Examples:

Saving to an S3 bucket using the s3a:// scheme:

# Assume `df` is a pre-defined PySpark DataFrame
file_name = "data_output.csv"
file_path = "s3a://my-bucket/data_folder/"
save_csv_to_hdfs(df, file_name, file_path, overwrite=True)

Saving to a normal HDFS path:

# Assume `df` is a pre-defined PySpark DataFrame
file_name = "data_output.csv"
file_path = "/user/hdfs/data_folder/"
save_csv_to_hdfs(df, file_name, file_path, overwrite=True)

save_csv_to_s3(df: SparkDF, bucket_name: str, file_name: str, file_path: str, s3_client: boto3.client, overwrite: bool = True) -> None

Save DataFrame as CSV on S3, coalescing to a single partition.

This function saves a PySpark DataFrame to S3 in CSV format. By coalescing the DataFrame into a single partition before saving, it accomplishes two main objectives:

  1. Single Part File: The output is a single CSV file rather than multiple part files. This method reduces complexity and cuts through the clutter of multi-part files, offering users and systems a more cohesive and hassle-free experience.

  2. Preserving Row Order: Coalescing into a single partition maintains the order of rows as they appear in the DataFrame. This is essential when the row order matters for subsequent processing or analysis. It's important to note, however, that coalescing can have performance implications for very large DataFrames by concentrating all data processing on a single node.

Parameters:

Name Type Description Default
df DataFrame

PySpark DataFrame to be saved.

required
bucket_name str

The name of the S3 bucket where the CSV file should be saved.

required
file_name str

Name of the CSV file. Must include the ".csv" extension.

required
file_path str

S3 path where the CSV file should be saved.

required
s3_client client

The boto3 S3 client instance.

required
overwrite bool

If True, overwrite any existing file with the same name. If False and the file exists, the function will raise an error.

True

Raises:

Type Description
ValueError

If the file_name does not end with ".csv".

InvalidBucketNameError

If the bucket name does not meet AWS specifications.

InvalidS3FilePathError

If the file_path contains an S3 URI scheme like 's3://' or 's3a://'.

IOError

If overwrite is False and the target file already exists.

Examples:

Saving to an S3 bucket:

# Assume `df` is a pre-defined PySpark DataFrame
file_name = "data_output.csv"
file_path = "data_folder/"
s3_client = boto3.client('s3')
save_csv_to_s3(
    df,
    'my-bucket',
    file_name,
    file_path,
    s3_client,
    overwrite=True
)

write_and_read_hive_table(spark: SparkSession, df: SparkDF, table_name: str, database: str, filter_id: Union[int, str], filter_col: str = 'run_id', fill_missing_cols: bool = False) -> SparkDF

Write a SparkDF to an existing Hive table and then read it back.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
df DataFrame

The SparkDF to be written to the Hive table.

required
table_name str

The name of the Hive table to write to and read from.

required
database str

The Hive database name.

required
filter_id Union[int, str]

The identifier to filter on when reading data back from the Hive table.

required
filter_col str

The column name to use for filtering data when reading back from the Hive table, by default 'run_id'.

'run_id'
fill_missing_cols bool

If True, missing columns in the DataFrame will be filled with nulls when writing to the Hive table, by default False.

False

Returns:

Type Description
DataFrame

The DataFrame read from the Hive table.

Raises:

Type Description
ValueError

If the specified Hive table does not exist in the given database or if the provided DataFrame doesn't contain the specified filter column.

Exception

For general exceptions encountered during execution.

Notes

This function assumes the Hive table already exists. The DataFrame df should have the same schema as the Hive table for the write to succeed.

The function allows for more effective memory management when dealing with large PySpark DataFrames by leveraging Hive's on-disk storage.

Predicate pushdown is used when reading the data back into a PySpark DataFrame, minimizing the memory usage and optimizing the read operation.

As part of the design, there is always a column called filter_col in the DataFrame and Hive table to track pipeline runs.

The Hive table contains all the runs, and we only read back the run that we just wrote to the Hive Table using the filter_id parameter. If no filter_col is specified, 'run_id' is used as default.
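For illustration, a minimal usage sketch (hedged; the table, database, and run identifier names below are placeholders):

# Assume `spark` is an active SparkSession and `df` is a PySpark DataFrame
# whose schema matches the existing Hive table.
df_read_back = write_and_read_hive_table(
    spark,
    df,
    table_name="my_table",
    database="my_database",
    filter_id="run_2024_01",
    filter_col="run_id",
)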

GCP

rdsa_utils.gcp.helpers.gcp_utils

Utility functions for interacting with Google Cloud Storage.

To initialise a client for GCS and configure it with a service account JSON key file, you can use the following code snippet:

from google.cloud import storage

# Create a GCS client
client = storage.Client.from_service_account_json('path/to/keyfile.json')

copy_file(client: storage.Client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str, overwrite: bool = False) -> bool

Copy a file from one GCS bucket to another.

Parameters:

Name Type Description Default
client Client

The GCS client instance.

required
source_bucket_name str

The name of the source bucket.

required
source_object_name str

The GCS object name of the source file.

required
destination_bucket_name str

The name of the destination bucket.

required
destination_object_name str

The GCS object name of the destination file.

required
overwrite bool

If True, overwrite the destination file if it already exists.

False

Returns:

Type Description
bool

True if the file was copied successfully, otherwise False.

Examples:

>>> client = storage.Client()
>>> copy_file(
...     client,
...     'source-bucket',
...     'source_file.txt',
...     'destination-bucket',
...     'destination_file.txt'
... )
True

create_folder_on_gcs(client: storage.Client, bucket_name: str, folder_path: str) -> bool

Create a folder in a GCS bucket if it doesn't already exist.

Parameters:

Name Type Description Default
client Client

The GCS client instance.

required
bucket_name str

The name of the bucket where the folder will be created.

required
folder_path str

The name of the folder to create.

required

Returns:

Type Description
bool

True if the folder was created successfully or already exists, otherwise False.

Examples:

>>> client = storage.Client()
>>> create_folder_on_gcs(client, 'mybucket', 'new_folder/')
True

delete_file(client: storage.Client, bucket_name: str, object_name: str) -> bool

Delete a file from a GCS bucket.

Parameters:

Name Type Description Default
client Client

The GCS client instance.

required
bucket_name str

The name of the bucket from which the file will be deleted.

required
object_name str

The GCS object name of the file to delete.

required

Returns:

Type Description
bool

True if the file was deleted successfully, otherwise False.

Examples:

>>> client = storage.Client()
>>> delete_file(client, 'mybucket', 'folder/gcs_file.txt')
True

delete_folder(client: storage.Client, bucket_name: str, folder_path: str) -> bool

Delete a folder in a GCS bucket.

Parameters:

Name Type Description Default
client Client

The GCS client instance.

required
bucket_name str

The name of the GCS bucket.

required
folder_path str

The path of the folder to delete.

required

Returns:

Type Description
bool

True if the folder was deleted successfully, otherwise False.

Examples:

>>> client = storage.Client()
>>> delete_folder(client, 'mybucket', 'path/to/folder/')
True

download_file(client: storage.Client, bucket_name: str, object_name: str, local_path: str, overwrite: bool = False) -> bool

Download a file from a GCS bucket to a local directory.

Parameters:

Name Type Description Default
client Client

The GCS client instance.

required
bucket_name str

The name of the GCS bucket from which to download the file.

required
object_name str

The GCS object name of the file to download.

required
local_path str

The local file path where the downloaded file will be saved.

required
overwrite bool

If True, overwrite the local file if it exists.

False

Returns:

Type Description
bool

True if the file was downloaded successfully, False otherwise.

Examples:

>>> client = storage.Client()
>>> download_file(
...     client,
...     'mybucket',
...     'folder/gcs_file.txt',
...     '/path/to/download.txt'
... )
True

download_folder(client: storage.Client, bucket_name: str, prefix: str, local_path: str, overwrite: bool = False) -> bool

Download a folder from a GCS bucket to a local directory.

Parameters:

Name Type Description Default
client Client

The GCS client instance.

required
bucket_name str

The name of the GCS bucket from which to download the folder.

required
prefix str

The GCS prefix of the folder to download.

required
local_path str

The local directory path where the downloaded folder will be saved.

required
overwrite bool

If True, overwrite existing local files if they exist.

False

Returns:

Type Description
bool

True if the folder was downloaded successfully, False otherwise.

Examples:

>>> client = storage.Client()
>>> download_folder(
...     client,
...     'mybucket',
...     'folder/subfolder/',
...     '/path/to/local_folder',
...     overwrite=False
... )
True

file_exists(client: storage.Client, bucket_name: str, object_name: str) -> bool

Check if a specific file exists in a GCS bucket.

Parameters:

Name Type Description Default
client Client

The GCS client.

required
bucket_name str

The name of the bucket.

required
object_name str

The GCS object name to check for existence.

required

Returns:

Type Description
bool

True if the file exists, otherwise False.

Examples:

>>> client = storage.Client()
>>> file_exists(client, 'mybucket', 'folder/file.txt')
True

get_table_columns(table_path) -> List[str]

Return the column names for given bigquery table.
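For illustration (hedged; the table path and returned column names are placeholders):

>>> get_table_columns('database_name.table_name')
['column_a', 'column_b']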

is_gcs_directory(client: storage.Client, bucket_name: str, object_name: str) -> bool

Check if a GCS key is a directory by listing its contents.

Parameters:

Name Type Description Default
client Client

The GCS client instance.

required
bucket_name str

The name of the GCS bucket.

required
object_name str

The GCS object name to check.

required

Returns:

Type Description
bool

True if the key represents a directory, False otherwise.
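Example (illustrative; the bucket and object names are placeholders):

>>> client = storage.Client()
>>> is_gcs_directory(client, 'mybucket', 'folder/subfolder/')
True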

list_files(client: storage.Client, bucket_name: str, prefix: str = '') -> List[str]

List files in a GCS bucket that match a specific prefix.

Parameters:

Name Type Description Default
client Client

The GCS client.

required
bucket_name str

The name of the bucket.

required
prefix str

The prefix to filter files, by default "".

''

Returns:

Type Description
List[str]

A list of GCS object keys matching the prefix.

Examples:

>>> client = storage.Client()
>>> list_files(client, 'mybucket', 'folder_prefix/')
['folder_prefix/file1.txt', 'folder_prefix/file2.txt']

load_config_gcp(config_path: str) -> Tuple[Dict, Dict]

Load the config and dev_config files to dictionaries.

Parameters:

Name Type Description Default
config_path str

The path of the config file in a yaml format.

required

Returns:

Type Description
Tuple[Dict, Dict]

The contents of the config files.

move_file(client: storage.Client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str) -> bool

Move a file within or between GCS buckets.

Parameters:

Name Type Description Default
client Client

The GCS client instance.

required
source_bucket_name str

The name of the source GCS bucket.

required
source_object_name str

The GCS object name of the source file.

required
destination_bucket_name str

The name of the destination GCS bucket.

required
destination_object_name str

The GCS object name of the destination file.

required

Returns:

Type Description
bool

True if the file was moved successfully, False otherwise.

Examples:

>>> client = storage.Client()
>>> move_file(
...     client,
...     'sourcebucket',
...     'source_folder/file.txt',
...     'destbucket',
...     'dest_folder/file.txt'
... )
True

remove_leading_slash(text: str) -> str

Remove the leading forward slash from a string if present.

Parameters:

Name Type Description Default
text str

The text from which the leading slash will be removed.

required

Returns:

Type Description
str

The text stripped of its leading slash.

Examples:

>>> remove_leading_slash('/example/path')
'example/path'

run_bq_query(query: str) -> bigquery.QueryJob

Run an SQL query in BigQuery.
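For illustration, a minimal sketch using the standard QueryJob API on the returned job (the query itself is a placeholder):

>>> job = run_bq_query('SELECT 1 AS one')
>>> [row.one for row in job.result()]
[1]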

table_exists(table_path: TablePath) -> bool

Check the big query catalogue to see if a table exists.

Returns True if a table exists. See code sample explanation here: https://cloud.google.com/bigquery/docs/samples/bigquery-table-exists#bigquery_table_exists-python

Parameters:

Name Type Description Default
table_path TablePath

The target BigQuery table name of form: ..

required

Returns:

Type Description
bool

Returns True if table exists and False if table does not exist.
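Example (illustrative; the table path is a placeholder):

>>> table_exists('database_name.table_name')
True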

upload_file(client: storage.Client, bucket_name: str, local_path: str, object_name: Optional[str] = None, overwrite: bool = False) -> bool

Upload a file to a GCS bucket from local directory.

Parameters:

Name Type Description Default
client Client

The GCS client instance.

required
bucket_name str

The name of the target GCS bucket.

required
local_path str

The file path on the local system to upload.

required
object_name Optional[str]

The target GCS object name. If None, uses the base name of the local file path.

None
overwrite bool

If True, the existing file on GCS will be overwritten.

False

Returns:

Type Description
bool

True if the file was uploaded successfully, False otherwise.

Examples:

>>> client = storage.Client()
>>> upload_file(
...     client,
...     'mybucket',
...     '/path/to/file.txt',
...     'folder/gcs_file.txt'
... )
True

upload_folder(client: storage.Client, bucket_name: str, local_path: str, prefix: str = '', overwrite: bool = False) -> bool

Upload an entire folder from the local file system to a GCS bucket.

Parameters:

Name Type Description Default
client Client

The GCS client instance.

required
bucket_name str

The name of the bucket to which the folder will be uploaded.

required
local_path str

The path to the local folder to upload.

required
prefix str

The prefix to prepend to each object name when uploading to GCS.

''
overwrite bool

If True, overwrite existing files in the bucket.

False

Returns:

Type Description
bool

True if the folder was uploaded successfully, otherwise False.

Examples:

>>> client = storage.Client()
>>> upload_folder(
...     client,
...     'mybucket',
...     '/path/to/local/folder',
...     'folder_prefix',
...     True
... )
True

validate_bucket_name(bucket_name: str) -> str

Validate the format of a GCS bucket name according to GCS rules.

Parameters:

Name Type Description Default
bucket_name str

The name of the bucket to validate.

required

Returns:

Type Description
str

The validated bucket name if valid.

Raises:

Type Description
InvalidBucketNameError

If the bucket name does not meet GCS specifications.

Examples:

>>> validate_bucket_name('valid-bucket-name')
'valid-bucket-name'
>>> validate_bucket_name('Invalid_Bucket_Name')
InvalidBucketNameError: Bucket name must not contain underscores.

rdsa_utils.gcp.io.inputs

Read from BigQuery.

build_sql_query(table_path: TablePath, columns: Optional[Sequence[str]] = None, date_column: Optional[str] = None, date_range: Optional[Sequence[str]] = None, column_filter_dict: Optional[Dict[str, Sequence[str]]] = None, partition_column: Optional[str] = None, partition_type: Optional[str] = None, partition_value: Optional[Union[Tuple[str, str], str]] = None) -> str

Create SQL query to load data with the specified filter conditions.

Parameters:

Name Type Description Default
table_path TablePath

BigQuery table path in format "database_name.table_name".

required
columns Optional[Sequence[str]]

The column selection. Selects all columns if None passed.

None
date_column Optional[str]

The name of the column to be used to filter the date range on.

None
date_range Optional[Sequence[str]]

Sequence with two values, a lower and upper value for dates to load in.

None
column_filter_dict Optional[Dict[str, Sequence[str]]]

A dictionary containing column: [values] where the values correspond to terms in the column that are to be filtered by.

None
partition_column Optional[str]

The name of the column that the table is partitioned by.

None
partition_type Optional[str]

The unit of time the table is partitioned by; must be one of: 'hour', 'day', 'month' or 'year'.

None
partition_value Optional[Union[Tuple[str, str], str]]

The value or pair of values for filtering the partition column to.

None

Returns:

Type Description
str

The string containing the SQL query.
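For illustration, a hedged sketch of building a query (the table path, column names, and dates are placeholders; the exact SQL string produced depends on the filters supplied):

>>> query = build_sql_query(
...     table_path='database_name.table_name',
...     columns=['id', 'price', 'date'],
...     date_column='date',
...     date_range=['2021-01-01', '2021-12-31'],
... )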

read_table(spark: SparkSession, table_path: TablePath, columns: Optional[Sequence[str]] = None, date_column: Optional[str] = None, date_range: Optional[Sequence[str]] = None, column_filter_dict: Optional[Dict[str, Sequence[str]]] = None, run_id_column: Optional[str] = 'run_id', run_id: Optional[str] = None, flatten_struct_cols: bool = False, partition_column: Optional[str] = None, partition_type: Optional[BigQueryTimePartitions] = None, partition_value: Optional[Union[Tuple[str, str], str]] = None) -> SparkDF

Read BigQuery table given table path and column selection.

Parameters:

Name Type Description Default
spark SparkSession

Spark session.

required
table_path TablePath

The target BigQuery table name of form: ..

required
columns Optional[Sequence[str]]

The column selection. Selects all columns if None passed.

None
date_column Optional[str]

The name of the column to be used to filter the date range on.

None
date_range Optional[Sequence[str]]

Sequence with two values, a lower and upper value for dates to load in.

None
column_filter_dict Optional[Dict[str, Sequence[str]]]

A dictionary containing column: [values] where the values correspond to terms in the column that are to be filtered by.

None
run_id_column Optional[str]

The name of the column to be used to filter to the specified run_id.

'run_id'
run_id Optional[str]

The unique identifier for a run within the table that the read data is filtered to.

None
partition_column Optional[str]

The name of the column that the table is partitioned by.

None
partition_type Optional[BigQueryTimePartitions]

The unit of time the table is partitioned by; must be one of: 'hour', 'day', 'month' or 'year'.

None
partition_value Optional[Union[Tuple[str, str], str]]

The value or pair of values for filtering the partition column to.

None
flatten_struct_cols bool

When true, any struct type columns in the loaded dataframe are replaced with individual columns for each of the fields in the structs.

False

Returns:

Type Description
DataFrame
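For illustration, a hedged usage sketch (the table path, column names, and run identifier are placeholders):

>>> df = read_table(
...     spark,
...     'database_name.table_name',
...     columns=['id', 'price', 'date'],
...     run_id_column='run_id',
...     run_id='run_2024_01',
... )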

rdsa_utils.gcp.io.outputs

Write outputs to GCP.

write_table(df: Union[PandasDF, SparkDF], table_name: TablePath, mode: Literal['append', 'error', 'ignore', 'overwrite'] = 'error', partition_col: Optional[str] = None, partition_type: Optional[BigQueryTimePartitions] = None, partition_expiry_days: Optional[float] = None, clustered_fields: Optional[Union[str, List[str]]] = None) -> None

Write dataframe out to a Google BigQuery table.

In the case the table already exists, the behaviour of this function depends on the save mode, specified by the mode parameter (which defaults to throwing an exception). When mode is 'overwrite', the schema of the DataFrame does not need to be the same as that of the existing table (nor does the column order need to match).

If you use the df.printSchema() method directly in a print/log statement, the schema is evaluated and printed regardless of logging level. Instead, capture the output and pass it to the logger. See the explanation here: https://stackoverflow.com/a/59935109

To learn more about the partitioning of tables and how to use them in BigQuery: https://cloud.google.com/bigquery/docs/partitioned-tables

To learn more about the clustering of tables and how to use them in BigQuery: https://cloud.google.com/bigquery/docs/clustered-tables

To learn more about how spark dataframes are saved to BigQuery: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/blob/master/README.md

Parameters:

Name Type Description Default
df Union[DataFrame, DataFrame]

The dataframe to be saved.

required
table_name TablePath

The target BigQuery table name of form: ..

required
mode Literal['append', 'error', 'ignore', 'overwrite']

Whether to overwrite or append to the BigQuery table.
* append: Append contents of this DataFrame to the table.
* overwrite: Overwrite existing data.
* error: Throw an exception if data already exists.
* ignore: Silently ignore this operation if data already exists.

'error'
partition_col Optional[str]

A date or timestamp type column in the dataframe to use for the table partitioning.

None
partition_type Optional[BigQueryTimePartitions]

The unit of time to partition the table by; must be one of: 'hour', 'day', 'month' or 'year'.

If partition_col is specified and partition_type = None then BigQuery will default to using day partition type.

If partition_type is specified and partition_col = None then the table will be partitioned by the ingestion time pseudo column, and can be referenced in BigQuery via either '_PARTITIONTIME as pt' or '_PARTITIONDATE as pd'.

See https://cloud.google.com/bigquery/docs/querying-partitioned-tables for more information on querying partitioned tables.

None
partition_expiry_days Optional[float]

If specified, this is the number of days (any decimal values are converted to that proportion of a day) that BigQuery keeps the data in each partition.

None
clustered_fields Optional[Union[str, List[str]]]

If specified, the columns (up to four) in the dataframe to cluster the data by when outputting. The order in which the columns are specified is important, as it determines the ordering of the clustering on the BigQuery table.

See: https://cloud.google.com/bigquery/docs/querying-clustered-tables for more information on querying clustered tables.

None

Returns:

Type Description
None
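For illustration, a hedged usage sketch (the table path and column names are placeholders):

# Assume `df` is a PySpark or pandas DataFrame.
write_table(
    df,
    'database_name.table_name',
    mode='append',
    partition_col='date',
    partition_type='day',
)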

Helpers

rdsa_utils.helpers.pyspark

A selection of helper functions for building in pyspark.

calc_median_price(groups: Union[str, Sequence[str]], price_col: str = 'price') -> SparkCol

Calculate the median price per grouping level.

Parameters:

Name Type Description Default
groups Union[str, Sequence[str]]

The grouping levels for calculating the average price.

required
price_col str

Column name containing the product prices.

'price'

Returns:

Type Description
Column

A single entry for each grouping level, and its median price.

convert_cols_to_struct_col(df: SparkDF, struct_col_name: str, struct_cols: Optional[Sequence[str]], no_struct_col_type: T.DataTypeSingleton = T.BooleanType(), no_struct_col_value: Any = None) -> SparkDF

Convert specified selection of columns to a single struct column.

As BigQuery tables do not accept an empty struct column being appended to them, this function creates a placeholder column to put into the struct column if no column names to combine are passed.

Parameters:

Name Type Description Default
df DataFrame

The input dataframe that contains the columns for combining.

required
struct_col_name str

The name of the resulting struct column.

required
struct_cols Optional[Sequence[str]]

A sequence of columns present in df for combining.

required
no_struct_col_type DataTypeSingleton

If no struct_cols are present, this is the type that the dummy column to place in the struct will be, default = BooleanType.

BooleanType()
no_struct_col_value Any

If no struct_cols are present, this is the value that will be used in the dummy column, default = None.

None

Returns:

Type Description
The input dataframe with the specified struct_cols dropped and replaced

with a single struct type column containing those columns.

Raises:

Type Description
ValueError

If not all the specified struct_cols are present in df.
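For illustration (hedged; the struct and column names are placeholders):

>>> df_out = convert_cols_to_struct_col(
...     df,
...     struct_col_name='attributes',
...     struct_cols=['colour', 'size'],
... )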

convert_struc_col_to_columns(df: SparkDF, convert_nested_structs: bool = False) -> SparkDF

Flatten struct columns in pyspark dataframe to individual columns.

Parameters:

Name Type Description Default
df DataFrame

Dataframe that may or may not contain struct type columns.

required
convert_nested_structs bool

If True, the function calls itself recursively until no structs are left. Conversely, when False, only top-level structs are flattened; any structs nested within them remain.

False

Returns:

Type Description
The input dataframe but with any struct type columns dropped, and in

its place the individual fields within the struct column as individual columns.
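For illustration (hedged; assumes df contains one or more struct type columns):

>>> df_flat = convert_struc_col_to_columns(df, convert_nested_structs=True)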

create_colname_to_value_map(cols: Sequence[str]) -> SparkCol

Create a column name to value MapType column.

create_spark_session(app_name: Optional[str] = None, size: Optional[Literal['small', 'medium', 'large', 'extra-large']] = None, extra_configs: Optional[Dict[str, str]] = None) -> SparkSession

Create a PySpark Session based on the specified size.

This function creates a PySpark session with different configurations based on the size specified.

The size can be 'small', 'medium', 'large', or 'extra-large'. Extra Spark configurations can be passed as a dictionary. If no size is given, then a basic default Spark session is spun up.

Parameters:

Name Type Description Default
app_name Optional[str]

The spark session app name.

None
size Optional[Literal['small', 'medium', 'large', 'extra-large']]

The size of the spark session to be created. It can be 'small', 'medium', 'large', or 'extra-large'.

None
extra_configs Optional[Dict[str, str]]

Mapping of additional spark session config settings and the desired value for it. Will override any default settings.

None

Returns:

Type Description
SparkSession

The created PySpark session.

Raises:

Type Description
ValueError

If the specified 'size' parameter is not one of the valid options: 'small', 'medium', 'large', or 'extra-large'.

Exception

If any other error occurs during the Spark session creation process.

Examples:

>>> spark = create_spark_session(size='medium', extra_configs={'spark.ui.enabled': 'false'})

Session Details:

'small': This is the smallest session that will realistically be used. It uses only 1g of memory, 3 executors, and only 1 core. The number of partitions is limited to 12, which can improve performance with smaller data. It's recommended for simple data exploration of small survey data or for training and demonstrations when several people need to run Spark sessions simultaneously.

'medium': A standard session used for analysing survey or synthetic datasets. Also used for some Production pipelines based on survey and/or smaller administrative data. It uses 6g of memory, 3 executors, and 3 cores. The number of partitions is limited to 18, which can improve performance with smaller data.

'large': Session designed for running Production pipelines on large administrative data, rather than just survey data. It uses 10g of memory, 5 executors, 1g of memory overhead, and 5 cores. It uses the default number of 200 partitions.

'extra-large': Used for the most complex pipelines, with huge administrative data sources and complex calculations. It uses 20g of memory, 12 executors, 2g of memory overhead, and 5 cores. It uses 240 partitions; not significantly higher than the default of 200, but it is best for these to be a multiple of cores and executors.

References

The session sizes and their details are taken directly from the following resource: "https://best-practice-and-impact.github.io/ons-spark/spark-overview/example-spark-sessions.html"

cut_lineage(df: SparkDF) -> SparkDF

Convert the SparkDF to a Java RDD and back again.

This function is helpful in instances where the Catalyst optimizer is causing memory errors or problems, as it only tries to optimize up to the conversion point.

Note: This uses internal members and may break between versions.

Parameters:

Name Type Description Default
df DataFrame

SparkDF to convert.

required

Returns:

Type Description
DataFrame

New SparkDF created from Java RDD.

Raises:

Type Description
Exception

If any error occurs during the lineage cutting process, particularly during conversion between SparkDF and Java RDD or accessing internal members.

Examples:

>>> df = rdd.toDF()
>>> new_df = cut_lineage(df)
>>> new_df.count()
3

find_spark_dataframes(locals_dict: Dict[str, Union[SparkDF, Dict]]) -> Dict[str, Union[SparkDF, Dict]]

Extract SparkDF objects from a given dictionary.

This function scans the dictionary and returns another containing only entries where the value is a SparkDF. It also handles dictionaries within the input, including them in the output if their first item is a SparkDF.

Designed to be used with locals() in Python, allowing extraction of all SparkDF variables in a function's local scope.

Parameters:

Name Type Description Default
locals_dict Dict[str, Union[DataFrame, Dict]]

A dictionary usually returned by locals(), with variable names as keys and their corresponding objects as values.

required

Returns:

Type Description
Dict

A dictionary with entries from locals_dict where the value is a SparkDF or a dictionary with a SparkDF as its first item.

Examples:

>>> dfs = find_spark_dataframes(locals())

get_window_spec(partition_cols: Optional[Union[str, Sequence[str]]] = None, order_cols: Optional[Union[str, Sequence[str]]] = None) -> WindowSpec

Return ordered and partitioned WindowSpec, defaulting to whole df.

Particularly useful when you don't know if the variable being used for partition_cols will contain values or not in advance.

Parameters:

Name Type Description Default
partition_cols Optional[Union[str, Sequence[str]]]

If present the columns to partition a spark dataframe on.

None
order_cols Optional[Union[str, Sequence[str]]]

If present the columns to order a spark dataframe on (where order in sequence is order that orderBy is applied).

None

Returns:

Type Description
WindowSpec

The WindowSpec object to be applied.

Usage

window_spec = get_window_spec(...)

F.sum(values).over(window_spec)

is_df_empty(df: SparkDF) -> bool

Check whether a spark dataframe contains any records.

load_csv(spark: SparkSession, filepath: str, keep_columns: Optional[List[str]] = None, rename_columns: Optional[Dict[str, str]] = None, drop_columns: Optional[List[str]] = None, **kwargs) -> SparkDF

Load a CSV file into a PySpark DataFrame.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
filepath str

The full path and filename of the CSV file to load.

required
keep_columns Optional[List[str]]

A list of column names to keep in the DataFrame, dropping all others.

None
rename_columns Optional[Dict[str, str]]

A dictionary to rename columns where keys are existing column names and values are new column names.

None
drop_columns Optional[List[str]]

A list of column names to drop from the DataFrame.

None
**kwargs

Additional keyword arguments to pass to the spark.read.csv method.

{}

Returns:

Type Description
DataFrame

PySpark DataFrame containing the data from the CSV file.

Notes

Transformation order:

1. Columns are kept according to keep_columns.
2. Columns are dropped according to drop_columns.
3. Columns are renamed according to rename_columns.

Raises:

Type Description
Exception

If there is an error loading the file.

ValueError

If a column specified in rename_columns, drop_columns, or keep_columns is not found in the DataFrame.

Examples:

Load a CSV file with multiline and rename columns:

>>> df = load_csv(
...     spark,
...     "/path/to/file.csv",
...     multiLine=True,
...     rename_columns={"old_name": "new_name"}
... )

Load a CSV file with a specific encoding:

>>> df = load_csv(spark, "/path/to/file.csv", encoding="ISO-8859-1")

Load a CSV file and keep only specific columns:

>>> df = load_csv(spark, "/path/to/file.csv", keep_columns=["col1", "col2"])

Load a CSV file and drop specific columns:

>>> df = load_csv(spark, "/path/to/file.csv", drop_columns=["col1", "col2"])

Load a CSV file with custom delimiter and multiline:

>>> df = load_csv(spark, "/path/to/file.csv", sep=";", multiLine=True)

map_column_names(df: SparkDF, mapper: Mapping[str, str]) -> SparkDF

Map column names to the given values in the mapper.

If the column name is not in the mapper the name doesn't change.
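For illustration (hedged; the column names are placeholders):

>>> df = map_column_names(df, {'old_name': 'new_name'})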

melt(df: SparkDF, id_vars: Union[str, Sequence[str]], value_vars: Union[str, Sequence[str]], var_name: str = 'variable', value_name: str = 'value') -> SparkDF

Melt a spark dataframe in a pandas like fashion.

Parameters:

Name Type Description Default
df DataFrame

The pyspark dataframe to melt.

required
id_vars Union[str, Sequence[str]]

The names of the columns to use as identifier variables.

required
value_vars Union[str, Sequence[str]]

The names of the columns containing the data to unpivot.

required
var_name str

The name of the target column containing variable names (i.e. the original column names).

'variable'
value_name str

The name of the target column containing the unpivoted data.

'value'

Returns:

Type Description
DataFrame

The "melted" input data as a pyspark data frame.

Examples:

>>> df = spark.createDataFrame(
...     [[1, 2, 3, 4],
...      [5, 6, 7, 8],
...      [9, 10, 11, 12]],
...     ["col1", "col2", "col3", "col4"])
>>> melt(df=df, id_vars="col1", value_vars=["col2", "col3"]).show()
+----+--------+-----+
|col1|variable|value|
+----+--------+-----+
|   1|    col2|    2|
|   1|    col3|    3|
|   5|    col2|    6|
|   5|    col3|    7|
|   9|    col2|   10|
|   9|    col3|   11|
+----+--------+-----+
>>> melt(df=df, id_vars=["col1", "col2"], value_vars=["col3", "col4"]
... ).show()
+----+----+--------+-----+
|col1|col2|variable|value|
+----+----+--------+-----+
|   1|   2|    col3|    3|
|   1|   2|    col4|    4|
|   5|   6|    col3|    7|
|   5|   6|    col4|    8|
|   9|  10|    col3|   11|
|   9|  10|    col4|   12|
+----+----+--------+-----+

rank_numeric(numeric: Union[str, Sequence[str]], group: Union[str, Sequence[str]], ascending: bool = False) -> SparkCol

Rank a numeric and assign a unique value to each row.

The F.row_number() method has been selected as the ranking method, as it gives a unique number to each row. Other methods such as F.rank() and F.dense_rank() do not assign unique values per row.

Parameters:

Name Type Description Default
numeric Union[str, Sequence[str]]

The column name or list of column names containing values which will be ranked.

required
group Union[str, Sequence[str]]

The grouping levels to rank the numeric column or columns over.

required
ascending bool

Dictates whether high or low values are ranked as the top value.

False

Returns:

Type Description
Column

Contains a rank for the row in its grouping level.
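For illustration, a hedged sketch attaching the returned ranking column to a dataframe (the column names are placeholders):

>>> df = df.withColumn('rank', rank_numeric('price', 'item_group', ascending=False))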

select_first_obs_appearing_in_group(df: SparkDF, group: Sequence[str], date_col: str, ascending: bool) -> SparkDF

Rank and select observation in group based on earliest or latest date.

Given that there can be multiple observations per group, select the observation that appears first or last (depending on whether ascending is set to True or False, respectively).

Parameters:

Name Type Description Default
df DataFrame

The input dataframe that contains the group and date_col.

required
group Sequence[str]

The grouping levels required to find the observation that appears first or last (depending on whether ascending is set to True or False, respectively)

required
date_col str

Column name containing the dates of each observation.

required
ascending bool

Dictates whether first or last observation within a grouping is selected (depending on whether ascending is set to True or False, respectively).

required

Returns:

Type Description
DataFrame

The input dataframe that contains each observation per group that appeared first or last (depending on whether ascending is set to True or False, respectively) according to date_col.
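For illustration (hedged; the column names are placeholders):

>>> df_first = select_first_obs_appearing_in_group(
...     df,
...     group=['id'],
...     date_col='date',
...     ascending=True,
... )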

set_df_columns_nullable(df: SparkDF, column_list: List[str], nullable: Optional[bool] = True) -> SparkDF

Change specified columns nullable value.

Sometimes Spark creates columns with the nullable attribute set to False, which can cause issues if the dataframe is saved to a table, as the schema for that column will then not allow missing values.

Changing this attribute for a column appears to be very difficult (and also potentially costly; see the comments on the linked Stack Overflow answer) - SO USE ONLY IF NEEDED.

The solution implemented is taken from this Stack Overflow post: https://stackoverflow.com/a/51821437

Parameters:

Name Type Description Default
df DataFrame

The dataframe with columns to have nullable attribute changed.

required
column_list List[str]

List of columns to change nullable attribute.

required
nullable Optional[bool]

The value to set the nullable attribute to for the specified columns.

True

Returns:

Type Description
DataFrame

The input dataframe but with nullable attribute changed for specified columns.
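For illustration (hedged; the column names are placeholders):

>>> df = set_df_columns_nullable(df, ['col1', 'col2'], nullable=True)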

to_list(df: SparkDF) -> List[Union[Any, List[Any]]]

Convert Spark DF to a list.

Returns:

Type Description
list or list of lists

If the input DataFrame has a single column then a list of column values will be returned. If the DataFrame has multiple columns then a list of row data as lists will be returned.

to_spark_col(_func=None, *, exclude: Sequence[str] = None) -> Callable

Convert str args to Spark Column if not already.

Usage

Use as a decorator on a function.

To convert all string arguments to spark columns:

>>> @to_spark_col
>>> def my_func(arg1, arg2):
>>>    ...

To exclude a string argument from being converted to a spark column:

>>> @to_spark_col(exclude=['arg2'])
>>> def my_func(arg1, arg2):
>>>    ...

transform(self, f, *args, **kwargs)

Chain Pyspark function.

truncate_external_hive_table(spark: SparkSession, table_identifier: str) -> None

Truncate an External Hive table stored on S3 or HDFS.

Parameters:

Name Type Description Default
spark SparkSession

Active SparkSession.

required
table_identifier str

The name of the Hive table to truncate. This can either be in the format 'database.table' or simply 'table' if the current Spark session has a database set.

required

Returns:

Type Description
None

This function does not return any value. It performs an action of truncating the table.

Raises:

Type Description
ValueError

If the table name is incorrectly formatted, the database is not provided when required, or if the table does not exist.

AnalysisException

If there is an issue with partition operations or SQL queries.

Exception

If there is a general failure during the truncation process.

Examples:

Truncate a Hive table named 'my_database.my_table':

>>> truncate_external_hive_table(spark, 'my_database.my_table')

Or, if the current Spark session already has a database set:

>>> spark.catalog.setCurrentDatabase('my_database')
>>> truncate_external_hive_table(spark, 'my_table')

unpack_list_col(df: SparkDF, list_col: str, unpacked_col: str) -> SparkDF

Unpack a spark column containing a list into multiple rows.

Parameters:

Name Type Description Default
df DataFrame

Contains the list column to unpack.

required
list_col str

The name of the column which contains lists.

required
unpacked_col str

The name of the column containing the unpacked list items.

required

Returns:

Type Description
DataFrame

Contains a new row for each unpacked list item.
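For illustration (hedged; the column names are placeholders):

>>> df_unpacked = unpack_list_col(df, list_col='tags', unpacked_col='tag')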

rdsa_utils.helpers.python

Miscellaneous helper functions for Python.

always_iterable_local(obj: Any) -> Callable

Supplement more-itertools always_iterable to also exclude dicts.

By default it would convert a dictionary to an iterable of just its keys, dropping all the values. This change makes it so dictionaries are not altered (similar to how strings aren't broken down).

calc_product_of_dict_values(**kwargs: Mapping[str, Union[str, float, Iterable]]) -> Mapping[str, any]

Create cartesian product of values for each kwarg.

In order to create product of values, the values are converted to a list so that product of values can be derived.

Yields:

Type Description
Next result of cartesian product of kwargs values.
Example

>>> my_dict = {'key1': 1, 'key2': [2, 3, 4]}
>>> list(calc_product_of_dict_values(**my_dict))
[{'key1': 1, 'key2': 2}, {'key1': 1, 'key2': 3}, {'key1': 1, 'key2': 4}]

Notes

Modified from: https://stackoverflow.com/a/5228294

convert_date_strings_to_datetimes(start_date: str, end_date: str) -> Tuple[pd.Timestamp, pd.Timestamp]

Convert start and end dates from strings to timestamps.

Parameters:

Name Type Description Default
start_date str

Datetime like object which is used to define the start date for the filter. Acceptable string formats include (but are not limited to): MMMM YYYY, YYYY-MM, YYYY-MM-DD, DD MMM YYYY etc. If only month and year are specified the start_date is set as the first day of the month.

required
end_date str

Datetime like object which is used to define the end date for the filter. Acceptable string formats include (but are not limited to): MMMM YYYY, YYYY-MM, YYYY-MM-DD, DD MMM YYYY etc. If only month and year are specified the end_date is set as the final day of the month.

required

Returns:

Type Description
tuple[Timestamp, Timestamp]

Tuple where the first value is the start date and the second the end date.
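For illustration, based on the behaviour described above, passing month-and-year strings resolves the start to the first day of the month and the end to the final day of the month (the dates are placeholders):

>>> start, end = convert_date_strings_to_datetimes('January 2021', 'March 2021')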

extend_lists(sections: List[List[str]], elements_to_add: List[str]) -> None

Check list elements are unique then append to existing list.

Note that the .extend method in Python updates each section in place. There is no need to assign the result of this function to a variable; the sections will update automatically.

The function can be used with the load_config function to extend a value list in a config yaml file. For example with a config.yaml file as per below:

input_columns:
    - col_a
    - col_b

output_columns:
    - col_b

To add column col_c the function can be utilised as follows:

config = load_config("config.yaml")

sections = [config['input_columns'], config['output_columns']]
elements_to_add = ['col_c']

extend_lists(sections, elements_to_add)

The output will be as follows.

input_columns:
    - col_a
    - col_b
    - col_c

output_columns:
    - col_b
    - col_c

Parameters:

Name Type Description Default
sections List[List[str]]

The section to be updated with the extra elements.

required
elements_to_add List[str]

The new elements to add to the specified sections.

required

Returns:

Type Description
None

Note that the .extend method in Python updates the sections in place. There is no need to assign the result of this function to a variable; the sections will update automatically.

list_convert(obj: Any) -> List[Any]

Convert object to list using more-itertools' always_iterable.

overwrite_dictionary(base_dict: Mapping[str, Any], override_dict: Mapping[str, Any]) -> Dict[str, Any]

Overwrite dictionary values with user defined values.

The following restrictions are in place:
* If base_dict and override_dict have the same key and the value is not a dictionary, then override_dict has priority.
* If base_dict contains a dictionary and override_dict contains a value (e.g. string or list) with the same key, priority is given to base_dict and the override_dict value is ignored.
* If a key is in override_dict but not in base_dict then an exception is raised and the code stops.
* Any other restrictions will require code changes.

Parameters:

Name Type Description Default
base_dict Mapping[str, Any]

Dictionary containing existing key value pairs.

required
override_dict Mapping[str, Any]

Dictionary containing new keys/values to insert into base_dict.

required

Returns:

Type Description
Dict[str, Any]

The base_dict with any keys matching the override_dict being replaced. Any keys not present in base_dict are appended.

Example

>>> dic1 = {"var1": "value1", "var2": {"var3": 1.1, "var4": 4.4}, "var5": [1, 2, 3]}
>>> dic2 = {"var2": {"var3": 9.9}}
>>> overwrite_dictionary(dic1, dic2)
{'var1': 'value1', 'var2': {'var3': 9.9, 'var4': 4.4}, 'var5': [1, 2, 3]}

>>> dic3 = {"var2": {"var3": 9.9}, "var6": -1}
>>> overwrite_dictionary(dic1, dic3)
ERROR main: ('var6', -1) not in base_dict

Notes

Modified from: https://stackoverflow.com/a/58742155

Warning

Due to the recursive nature of the function, it will modify the base_dict object that is passed into the original function call.

Raises:

Type Description
ValueError

If a key is present in override_dict but not base_dict.

tuple_convert(obj: Any) -> Tuple[Any]

Convert object to tuple using more-itertools' always_iterable.

IO

rdsa_utils.io.config

Module for code relating to loading config files.

LoadConfig(config_path: Union[CloudPath, Path], config_overrides: Optional[Config] = None, config_type: Optional[Literal['json', 'toml', 'yaml']] = None, config_validators: Optional[Dict[str, BaseModel]] = None)

Class for loading and storing a configuration file.

Attributes:

Name Type Description
config

The loaded config stored as a dictionary.

config_dir

The logical parent directory of loaded config_path.

config_file

The file name of the loaded config_path.

config_original

The configuration dictionary as initially loaded, prior to applying any overrides or validation.

config_overrides

The configuration override dictionary, if provided.

config_path

The path of the loaded config file.

config_type

The file type of the loaded config file.

config_validators

The validators used to validate the loaded config, if provided.

**attrs

Every top level key in the loaded config is also set as an attribute to allow simpler access to each config section.

Init method.

Parameters:

Name Type Description Default
config_path Union[CloudPath, Path]

The path of the config file to be loaded.

required
config_overrides Optional[Config]

A dictionary containing a subset of the keys and values of the config file that is initially loaded, by default None. If values are provided that are not in the initial config then a ConfigError is raised.

None
config_type Optional[Literal['json', 'toml', 'yaml']]

The file type of the config file being loaded, by default None. If not specified then this is inferred from the config_path.

None
config_validators Optional[Dict[str, BaseModel]]

A dictionary made up of key, value pairs where the keys refer to the top level sections of the loaded config, and the values are a pydantic validation class for the section, by default None. If only some of the keys are specified with validators, warnings are raised to alert that they have not been validated.

None
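For illustration, a minimal usage sketch (hedged; the config path and the input_columns section name are placeholders and assume the loaded file defines such a top-level section):

from pathlib import Path

from rdsa_utils.io.config import LoadConfig

config = LoadConfig(Path('config.yaml'))
config.config         # the full loaded config as a dictionary
config.input_columns  # a top-level section exposed as an attribute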

rdsa_utils.io.input

Module containing generic input functionality code.

parse_json(data: str) -> Config

Parse JSON formatted string into a dictionary.

Parameters:

Name Type Description Default
data str

String containing standard JSON-formatted data.

required

Returns:

Type Description
Config

A dictionary containing the parsed data.

Raises:

Type Description
JSONDecodeError

If the provided string cannot be decoded by json.loads (i.e. converted to a dictionary).

parse_toml(data: str) -> Config

Parse TOML formatted string into a dictionary.

Parameters:

Name Type Description Default
data str

String containing standard TOML-formatted data.

required

Returns:

Type Description
Config

A dictionary containing the parsed data.

parse_yaml(data: str) -> Config

Parse YAML formatted string into a dictionary.

Parameters:

Name Type Description Default
data str

String containing standard YAML-formatted data.

required

Returns:

Type Description
Config

A dictionary containing the parsed data.
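For illustration, a minimal example of parsing a YAML-formatted string (the keys are placeholders):

>>> parse_yaml("input_columns:\n  - col_a\n  - col_b")
{'input_columns': ['col_a', 'col_b']}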

read_file(file: Union[CloudPath, Path]) -> str

Load contents of specified file.

Parameters:

Name Type Description Default
file Union[CloudPath, Path]

The absolute file path of the file to be read.

required

Returns:

Type Description
str

The contents of the provided file.

Raises:

Type Description
FileNotFoundError

If the provided file does not exist.

rdsa_utils.io.output

Module containing generic output functionality code.

zip_folder(source_dir: str, output_filename: str, overwrite: bool = False) -> bool

Zip the contents of the specified directory.

Parameters:

Name Type Description Default
source_dir str

The directory whose contents are to be zipped.

required
output_filename str

The output zip file name. It must end with '.zip'.

required
overwrite bool

If True, overwrite the existing zip file if it exists. Default is False.

False

Returns:

Type Description
bool

True if the directory was zipped successfully, False otherwise.

Examples:

>>> zip_folder('/path/to/source_dir', 'output.zip', overwrite=True)
True

Methods

rdsa_utils.methods.averaging_methods

Weighted and unweighted averaging functions.

get_weight_shares(weights: str, levels: Optional[Union[str, Sequence[str]]] = None) -> SparkCol

Divide weights by sum of weights for each group.

unweighted_arithmetic_average(val: str) -> SparkCol

Calculate the unweighted arithmetic average.

unweighted_geometric_average(val: str) -> SparkCol

Calculate the unweighted geometric average.

weighted_arithmetic_average(val: str, weight: str) -> SparkCol

Calculate the weighted arithmetic average.

weighted_geometric_average(val: str, weight: str) -> SparkCol

Calculate the weighted geometric average.