API Reference
This part of the project documentation focuses on an information-oriented approach. Use it as a
reference for the technical implementation of the `rdsa-utils` codebase.
General
rdsa_utils.exceptions
Common custom exceptions that can be raised in pipelines.
The purpose of these is to provide a clearer indication of why an error is
being raised than the standard built-in errors do (e.g. `ColumnNotInDataframeError`
vs `ValueError`).
ColumnNotInDataframeError
Bases: Exception
Custom exception to raise when a column is not present in dataframe.
ConfigError
Bases: Exception
Custom exception to raise when there is an issue in a config object.
DataframeEmptyError
Bases: Exception
Custom exception to raise when a dataframe is empty.
InvalidBucketNameError
Bases: Exception
Custom exception to raise when an AWS S3 or GCS bucket name is invalid.
InvalidS3FilePathError
Bases: Exception
Custom exception to raise when an AWS S3 file path is invalid.
PipelineError
Bases: Exception
Custom exception to raise when there is a generic pipeline issue.
TableNotFoundError
Bases: Exception
Custom exception to raise when a table to be read is not found.
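For example, a pipeline check might raise one of these exceptions in place of a generic built-in error. A minimal sketch (the `check_column_present` helper and the column names are illustrative, not part of the package):
import pandas as pd
from rdsa_utils.exceptions import ColumnNotInDataframeError

def check_column_present(df: pd.DataFrame, column: str) -> None:
    # Raise a named exception rather than a bare ValueError.
    if column not in df.columns:
        raise ColumnNotInDataframeError(f"Column '{column}' not in dataframe.")

df = pd.DataFrame({"price": [1.0, 2.0]})
check_column_present(df, "quantity")  # raises ColumnNotInDataframeError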
rdsa_utils.logging
Contains the logging configuration for files and method to initialise it.
add_warning_message_to_function(_func: Callable = None, *, message: Optional[str] = None) -> Callable
Apply decorator to log a warning message.
If a message is passed, this decorator adds a warning log of the form function_name: message
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Optional[str] | The message to be logged along with the function name. | None |
Notes
Explainer on complex decorators (and template for decorator structure): https://realpython.com/primer-on-python-decorators/#both-please-but-never-mind-the-bread
Usage
To use decorator to log a warning:
>>> @_add_warning_message_to_function(message='here be dragons...')
>>> def my_func(some_args, some_other_args):
>>> ...
>>>
>>> some_output = my_func(...)
Warning my_func: here be dragons...
init_logger_advanced(log_level: int, handlers: Optional[List[logging.Handler]] = None, log_format: str = None, date_format: str = None) -> None
Instantiate a logger with provided handlers.
This function allows the logger to be used across modules. Logs can be
handled by any number of handlers, e.g., FileHandler, StreamHandler, etc.,
provided in the `handlers` list.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_level | int | The level of logging to be recorded. Can be defined either as the integer level or the logging module level constant (e.g. logging.INFO). | required |
handlers | Optional[List[Handler]] | List of handler instances to be added to the logger. Each handler instance must be a subclass of logging.Handler. | None |
log_format | str | The format of the log message. If not provided, a default format is used. | None |
date_format | str | The format of the date in the log message. If not provided, a default format is used. | None |
Returns:
Type | Description |
---|---|
None | The logger created by this function is available in any other modules by using logging.getLogger. |
Raises:
Type | Description |
---|---|
ValueError | If any item in the handlers list is not an instance of logging.Handler. |
Examples:
>>> file_handler = logging.FileHandler('logfile.log')
>>> rich_handler = RichHandler()
>>> init_logger_advanced(
... logging.DEBUG,
... [file_handler, rich_handler],
... "%(levelname)s: %(message)s",
... "%H:%M:%S"
... )
init_logger_basic(log_level: int) -> None
Instantiate a basic logger object to be used across modules.
By using this function to instantiate the logger, you also have access to
`logger.dev` for `log_level=15`, as this is defined in the same module scope
as this function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_level | int | The level of logging to be recorded. Can be defined either as the integer level or the logging module level constant (e.g. logging.INFO). | required |
Returns:
Type | Description |
---|---|
None | The logger created by this function is available in any other modules by using logging.getLogger. |
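Usage sketch (the calling module and log message are illustrative):
>>> import logging
>>> from rdsa_utils.logging import init_logger_basic
>>> logger = logging.getLogger(__name__)
>>> init_logger_basic(logging.INFO)
>>> logger.info('pipeline started')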
log_dev(self, message, *args, **kwargs)
Create a custom log level between INFO and DEBUG named DEV.
This is lifted from: https://stackoverflow.com/a/13638084
log_rows_in_spark_df(func: Callable) -> Callable
Apply decorator to log dataframe row count before and after a function.
Requires that the function being decorated has a parameter called `df` and
that the function is called with `df` being a keyword argument (e.g.
`df=df`). If not, the decorator will report back that it could not count the
number of rows of the dataframe before running the decorated function.
Usage
@log_rows_in_spark_df
def my_func_that_changes_no_rows(some_args, df, some_other_args):
    ...
    return final_df

some_df = my_func_that_changes_no_rows(
    some_args='hello',
    df=input_df,
    some_other_args='world'
)
>>> Rows in dataframe before my_func_that_changes_no_rows : 12345
>>> Rows in dataframe after my_func_that_changes_no_rows : 6789
Warning:
`.count()` is an expensive Spark operation to perform. Overuse of this
decorator can be detrimental to performance. This decorator will cache the
input dataframe prior to running the count and decorated function, as well
as persisting the output dataframe prior to counting. The input dataframe
is also unpersisted from memory prior to the decorator completing.
log_spark_df_schema(_func: Callable = None, *, log_schema_on_input: bool = True) -> Callable
Apply decorator to log dataframe schema before and after a function.
If you use the `df.printSchema()` method directly in a print/log statement, the code is processed and printed regardless of logging level. Instead you need to capture the output and pass this to the logger. See the explanation here: https://stackoverflow.com/a/59935109
Requires that the function being decorated has a parameter called `df` and
that the function is called with `df` being a keyword argument (e.g.
`df=df`). If not, the decorator will report back that it could not count the
number of rows of the dataframe before running the decorated function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_schema_on_input | bool | If set to False, then no schema is attempted to be printed for the decorated function on input. This is useful for instances where the function has no df input but does return one (such as when reading a table). | True |
Notes
Explainer on complex decorators (and template for decorator structure): https://realpython.com/primer-on-python-decorators/#both-please-but-never-mind-the-bread
Usage
To use decorator to record input and output schema:
>>> @log_spark_df_schema
>>> def my_func_that_changes_some_columns(some_args, df, some_other_args):
>>>     ...
>>>     return final_df
>>>
>>> some_df = my_func_that_changes_some_columns(
>>> some_args='hello',
>>> df=input_df,
>>> some_other_args='world'
>>> )
Schema of dataframe before my_func_that_changes_some_columns:
root
|-- price: double (nullable = true)
|-- quantity: long (nullable = true)
Schema of dataframe after my_func_that_changes_some_columns:
root
|-- price: double (nullable = true)
|-- quantity: long (nullable = true)
|-- expenditure: double (nullable = true)
To use decorator to record output schema only:
>>> @log_spark_df_schema(log_schema_on_input=False)
>>> def my_func_that_changes_some_columns(some_args, df, some_other_args):
>>>     ...
>>>     return final_df
>>>
>>> some_df = my_func_that_changes_some_columns(
>>> some_args='hello',
>>> df=input_df,
>>> some_other_args='world'
>>> )
Not printing schema of dataframe before my_func_that_changes_some_columns
Schema of dataframe after my_func_that_changes_some_columns:
root
|-- price: double (nullable = true)
|-- quantity: long (nullable = true)
|-- expenditure: double (nullable = true)
print_full_table_and_raise_error(df: pd.DataFrame, message: str, stop_pipeline: bool = False, show_records: bool = False) -> None
Output dataframe records to logger.
The purpose of this function is to enable a user to output a message to the logger, with the added functionality of stopping the pipeline and showing dataframe records in a table format. It may be used, for instance, if a user wants to check the records in a dataframe when it is expected to be empty.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrame | The dataframe to display records from. | required |
message | str | The message to output to the logger. | required |
stop_pipeline | bool | Switch for the user to stop the pipeline and raise an error. | False |
show_records | bool | Switch to show records in a dataframe. | False |
Returns:
Type | Description |
---|---|
None | Displays the message to the user; nothing is returned from the function. |
Raises:
Type | Description |
---|---|
ValueError | Raises error and stops pipeline if switch applied. |
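Usage sketch, assuming a dataframe that was expected to be empty at this point in a pipeline (dataframe contents are illustrative); with stop_pipeline=True the call raises a ValueError after logging:
>>> orphans = pd.DataFrame({'id': [101, 102], 'status': ['missing', 'missing']})
>>> print_full_table_and_raise_error(
...     df=orphans,
...     message='Found orphan records after reconciliation.',
...     stop_pipeline=True,
...     show_records=True,
... )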
timer_args(name: str, logger: Optional[Callable[[str], None]] = logger.info) -> Dict[str, str]
Initialise timer args workaround for 'text' args in codetiming package.
Works with codetiming==1.4.0
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the specific timer log. | required |
logger | Optional[Callable[[str], None]] | Optional logger function that can accept a string argument. | logger.info |
Returns:
Type | Description |
---|---|
Dict[str, str] | Dictionary of arguments to pass to the specific codetiming package Timer. |
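A sketch of passing the returned arguments to the codetiming Timer (assumes the codetiming package is installed; the timer name is illustrative):
>>> from codetiming import Timer
>>> with Timer(**timer_args('load_input_tables')):
...     ...  # timed stage body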
rdsa_utils.test_utils
Functions and fixtures used with test suites.
Case(label: Optional[str] = None, marks: Optional[MarkDecorator] = None, **kwargs)
Container for a test case, with optional test ID.
The Case class is to be used in conjunction with `parametrize_cases`.
Attributes:
Name | Type | Description |
---|---|---|
label | Optional[str] | Optional test ID for the test case. |
marks | Optional[MarkDecorator] | Optional pytest marks to denote any tests to skip etc. |
kwargs | | Parameters used for the test cases. |
Examples:
>>> Case(label="some test name", foo=10, bar="some value")
>>> Case(
>>> label="some test name",
>>> marks=pytest.mark.skip(reason='not implemented'),
>>> foo=10,
>>> bar="some value"
>>> )
See Also
Modified from https://github.com/ckp95/pytest-parametrize-cases to allow pytest mark usage.
Initialise objects.
__repr__() -> str
Return string.
create_dataframe(data: List[Tuple[str]], **kwargs) -> pd.DataFrame
Create pandas df from tuple data with a header.
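A sketch of the expected call, where the first tuple supplies the column header (column names and values are illustrative):
>>> df = create_dataframe([
...     ('region', 'sales'),
...     ('north', 100),
...     ('south', 250),
... ])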
create_spark_df(spark_session)
Create Spark DataFrame from tuple data with first row as schema.
Example:
create_spark_df([
    ('column1', 'column2', 'column3'),
    ('aaaa', 1, 1.1)
])
Can specify the schema alongside the column names:
create_spark_df([
    ('column1 STRING, column2 INT, column3 DOUBLE'),
    ('aaaa', 1, 1.1)
])
parametrize_cases(*cases: Case)
More user-friendly parametrized-case testing.
Utilise as a decorator on top of a test function.
Examples:
@parametrize_cases(
Case(
label="some test name",
foo=10,
bar="some value"
),
Case(
label="some test name #2",
foo=20,
bar="some other value"
),
)
def test(foo, bar):
...
See Also
Source: https://github.com/ckp95/pytest-parametrize-cases
spark_session()
Set up spark session fixture.
suppress_py4j_logging()
Suppress spark logging.
to_date(dt: str) -> datetime.date
Convert date string to datetime.date type.
to_datetime(dt: str) -> datetime.datetime
Convert datetime string to datetime.datetime type.
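For example (the ISO-style input strings are an assumption about the accepted format):
>>> to_date('2024-01-31')
datetime.date(2024, 1, 31)
>>> to_datetime('2024-01-31 12:30:00')
datetime.datetime(2024, 1, 31, 12, 30)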
to_spark(spark_session)
Convert pandas df to spark.
rdsa_utils.typing
Contains custom types for type hinting.
rdsa_utils.validation
Functions that support the use of pydantic validators.
allowed_date_format(date: str) -> str
Ensure that the date string can be converted to a useable datetime.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
date | str | The specified date string. | required |
Returns:
Type | Description |
---|---|
str | The input date. |
Raises:
Type | Description |
---|---|
ValueError | If the date is not one of the predefined allowed formats. |
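For example, assuming an ISO-style date is among the predefined allowed formats; an unrecognised format raises ValueError instead of returning:
>>> allowed_date_format('2024-01-31')
'2024-01-31'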
apply_validation(config: Mapping[str, Any], Validator: Optional[BaseModel]) -> Mapping[str, Any]
Apply validation model to config.
If no Validator is passed, then a warning will be logged and the input config returned without validation. This mechanism is to allow the use of this function to aid in tracking config sections that are unvalidated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | Mapping[str, Any] | The config for validating. | required |
Validator | Optional[BaseModel] | Validator class for the config. | required |
Returns:
Type | Description |
---|---|
Mapping[str, Any] | The input config after being passed through the validator. |
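A sketch with a hypothetical pydantic model acting as the validator (field names and values are illustrative):
>>> from pydantic import BaseModel
>>> class InputConfig(BaseModel):
...     table_name: str
...     run_id: int
>>> config = {'table_name': 'my_db.sales', 'run_id': 42}
>>> validated = apply_validation(config, InputConfig)
>>> unvalidated = apply_validation(config, None)  # logs a warning, returns config unchanged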
list_convert_validator(*args, **kwargs) -> Callable
Wrapper to set kwargs for list_convert validator.
CDP
rdsa_utils.cdp.helpers.s3_utils
Utility functions for interacting with AWS S3.
To initialise a boto3 client for S3 and configure it with Ranger RAZ and SSL certificate, you can use the following code snippet:
import boto3
import raz_client
ssl_file_path = "/path/to/your/ssl_certificate.crt"
# Create a boto3 client for S3
client = boto3.client("s3")
# Configure the client with RAZ and SSL certificate
raz_client.configure_ranger_raz(client, ssl_file=ssl_file_path)
Note:
- The `raz-client` library is required only when running in a
managed Cloudera environment.
- You can install it using `pip install raz-client` when needed.
copy_file(client: boto3.client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str, overwrite: bool = False) -> bool
Copy a file from one AWS S3 bucket to another.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
source_bucket_name | str | The name of the source bucket. | required |
source_object_name | str | The S3 object name of the source file. | required |
destination_bucket_name | str | The name of the destination bucket. | required |
destination_object_name | str | The S3 object name of the destination file. | required |
overwrite | bool | If True, overwrite the destination file if it already exists. | False |
Returns:
Type | Description |
---|---|
bool | True if the file was copied successfully, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> copy_file(
... client,
... 'source-bucket',
... 'source_file.txt',
... 'destination-bucket',
... 'destination_file.txt'
... )
True
create_folder_on_s3(client: boto3.client, bucket_name: str, folder_path: str) -> bool
Create a folder in an AWS S3 bucket if it doesn't already exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the bucket where the folder will be created. | required |
folder_path | str | The name of the folder to create. | required |
Returns:
Type | Description |
---|---|
bool | True if the folder was created successfully or already exists, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> create_folder_on_s3(client, 'mybucket', 'new_folder/')
True
delete_file(client: boto3.client, bucket_name: str, object_name: str, overwrite: bool = False) -> bool
Delete a file from an AWS S3 bucket.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the bucket from which the file will be deleted. | required |
object_name | str | The S3 object name of the file to delete. | required |
overwrite | bool | If False, the function will not delete the file if it does not exist; set to True to ignore non-existence on delete. | False |
Returns:
Type | Description |
---|---|
bool | True if the file was deleted successfully, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> delete_file(client, 'mybucket', 'folder/s3_file.txt')
True
delete_folder(client: boto3.client, bucket_name: str, folder_path: str) -> bool
Delete a folder in an AWS S3 bucket.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the S3 bucket. | required |
folder_path | str | The path of the folder to delete. | required |
Returns:
Type | Description |
---|---|
bool | True if the folder was deleted successfully, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> delete_folder(client, 'mybucket', 'path/to/folder/')
True
download_file(client: boto3.client, bucket_name: str, object_name: str, local_path: str, overwrite: bool = False) -> bool
Download a file from an AWS S3 bucket to a local directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the S3 bucket from which to download the file. | required |
object_name | str | The S3 object name of the file to download. | required |
local_path | str | The local file path where the downloaded file will be saved. | required |
overwrite | bool | If True, overwrite the local file if it exists. | False |
Returns:
Type | Description |
---|---|
bool | True if the file was downloaded successfully, False otherwise. |
Examples:
>>> client = boto3.client('s3')
>>> download_file(
... client,
... 'mybucket',
... 'folder/s3_file.txt',
... '/path/to/download.txt'
... )
True
download_folder(client: boto3.client, bucket_name: str, prefix: str, local_path: str, overwrite: bool = False) -> bool
Download a folder from an AWS S3 bucket to a local directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the S3 bucket from which to download the folder. | required |
prefix | str | The S3 prefix of the folder to download. | required |
local_path | str | The local directory path where the downloaded folder will be saved. | required |
overwrite | bool | If True, overwrite existing local files if they exist. | False |
Returns:
Type | Description |
---|---|
bool | True if the folder was downloaded successfully, False otherwise. |
Examples:
>>> client = boto3.client('s3')
>>> download_folder(
... client,
... 'mybucket',
... 'folder/subfolder/',
... '/path/to/local_folder',
... overwrite=False
... )
True
file_exists(client: boto3.client, bucket_name: str, object_name: str) -> bool
Check if a specific file exists in an AWS S3 bucket.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client. | required |
bucket_name | str | The name of the bucket. | required |
object_name | str | The S3 object name to check for existence. | required |
Returns:
Type | Description |
---|---|
bool | True if the file exists, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> file_exists(client, 'mybucket', 'folder/file.txt')
True
is_s3_directory(client: boto3.client, bucket_name: str, object_name: str) -> bool
Check if an AWS S3 key is a directory by listing its contents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the S3 bucket. | required |
object_name | str | The S3 object name to check. | required |
Returns:
Type | Description |
---|---|
bool | True if the key represents a directory, False otherwise. |
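Example (bucket and key names are illustrative):
>>> client = boto3.client('s3')
>>> is_s3_directory(client, 'mybucket', 'folder/subfolder/')
True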
list_files(client: boto3.client, bucket_name: str, prefix: str = '') -> List[str]
List files in an AWS S3 bucket that match a specific prefix.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client. | required |
bucket_name | str | The name of the bucket. | required |
prefix | str | The prefix to filter files, by default "". | '' |
Returns:
Type | Description |
---|---|
List[str] | A list of S3 object keys matching the prefix. |
Examples:
>>> client = boto3.client('s3')
>>> list_files(client, 'mybucket', 'folder_prefix/')
['folder_prefix/file1.txt', 'folder_prefix/file2.txt']
load_csv(client: boto3.client, bucket_name: str, filepath: str, keep_columns: Optional[List[str]] = None, rename_columns: Optional[Dict[str, str]] = None, drop_columns: Optional[List[str]] = None, **kwargs) -> pd.DataFrame
Load a CSV file from an S3 bucket into a Pandas DataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the S3 bucket. | required |
filepath | str | The key (full path and filename) of the CSV file in the S3 bucket. | required |
keep_columns | Optional[List[str]] | A list of column names to keep in the DataFrame, dropping all others. Default value is None. | None |
rename_columns | Optional[Dict[str, str]] | A dictionary to rename columns where keys are existing column names and values are new column names. Default value is None. | None |
drop_columns | Optional[List[str]] | A list of column names to drop from the DataFrame. Default value is None. | None |
kwargs | | Additional keyword arguments to pass to the pandas read_csv method. | {} |
Returns:
Type | Description |
---|---|
DataFrame | Pandas DataFrame containing the data from the CSV file. |
Raises:
Type | Description |
---|---|
InvalidBucketNameError | If the bucket name does not meet AWS specifications. |
InvalidS3FilePathError | If the file_path contains an S3 URI scheme like 's3://' or 's3a://'. |
Exception | If there is an error loading the file. |
ValueError | If a column specified in rename_columns, drop_columns, or keep_columns is not found in the DataFrame. |
Notes
Transformation order:
1. Columns are kept according to `keep_columns`.
2. Columns are dropped according to `drop_columns`.
3. Columns are renamed according to `rename_columns`.
Examples:
Load a CSV file and rename columns:
>>> df = load_csv(
client,
"my-bucket",
"path/to/file.csv",
rename_columns={"old_name": "new_name"}
)
Load a CSV file and keep only specific columns:
>>> df = load_csv(
client,
"my-bucket",
"path/to/file.csv",
keep_columns=["col1", "col2"]
)
Load a CSV file and drop specific columns:
>>> df = load_csv(
client,
"my-bucket",
"path/to/file.csv",
drop_columns=["col1", "col2"]
)
Load a CSV file with custom delimiter:
>>> df = load_csv(
client,
"my-bucket",
"path/to/file.csv",
sep=";"
)
load_json(client: boto3.client, bucket_name: str, filepath: str, encoding: Optional[str] = 'utf-8') -> Dict
Load a JSON file from an S3 bucket.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the S3 bucket. | required |
filepath | str | The key (full path and filename) of the JSON file in the S3 bucket. | required |
encoding | Optional[str] | The encoding of the JSON file. Default is 'utf-8'. | 'utf-8' |
Returns:
Type | Description |
---|---|
Dict | Dictionary containing the data from the JSON file. |
Raises:
Type | Description |
---|---|
InvalidBucketNameError | If the bucket name is invalid according to AWS rules. |
Exception | If there is an error loading the file from S3 or parsing the JSON. |
Examples:
>>> client = boto3.client('s3')
>>> data = load_json(client, 'my-bucket', 'path/to/file.json')
>>> print(data)
{
"name": "John",
"age": 30,
"city": "Manchester"
}
move_file(client: boto3.client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str) -> bool
Move a file within or between AWS S3 buckets.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
source_bucket_name | str | The name of the source S3 bucket. | required |
source_object_name | str | The S3 object name of the source file. | required |
destination_bucket_name | str | The name of the destination S3 bucket. | required |
destination_object_name | str | The S3 object name of the destination file. | required |
Returns:
Type | Description |
---|---|
bool | True if the file was moved successfully, False otherwise. |
Examples:
>>> client = boto3.client('s3')
>>> move_file(
... client,
... 'sourcebucket',
... 'source_folder/file.txt',
... 'destbucket',
... 'dest_folder/file.txt'
... )
True
remove_leading_slash(text: str) -> str
Remove the leading forward slash from a string if present.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
text | str | The text from which the leading slash will be removed. | required |
Returns:
Type | Description |
---|---|
str | The text stripped of its leading slash. |
Examples:
>>> remove_leading_slash('/example/path')
'example/path'
upload_file(client: boto3.client, bucket_name: str, local_path: str, object_name: Optional[str] = None, overwrite: bool = False) -> bool
Upload a file to an Amazon S3 bucket from a local directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the target S3 bucket. | required |
local_path | str | The file path on the local system to upload. | required |
object_name | Optional[str] | The target S3 object name. If None, uses the base name of the local file path. | None |
overwrite | bool | If True, the existing file on S3 will be overwritten. | False |
Returns:
Type | Description |
---|---|
bool | True if the file was uploaded successfully, False otherwise. |
Examples:
>>> client = boto3.client('s3')
>>> upload_file(
... client,
... 'mybucket',
... '/path/to/file.txt',
... 'folder/s3_file.txt'
... )
True
upload_folder(client: boto3.client, bucket_name: str, local_path: str, prefix: str = '', overwrite: bool = False) -> bool
Upload an entire folder from the local file system to an AWS S3 bucket.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the bucket to which the folder will be uploaded. | required |
local_path | str | The path to the local folder to upload. | required |
prefix | str | The prefix to prepend to each object name when uploading to S3. | '' |
overwrite | bool | If True, overwrite existing files in the bucket. | False |
Returns:
Type | Description |
---|---|
bool | True if the folder was uploaded successfully, otherwise False. |
Examples:
>>> client = boto3.client('s3')
>>> upload_folder(
... client,
... 'mybucket',
... '/path/to/local/folder',
... 'folder_prefix',
... True
... )
True
validate_bucket_name(bucket_name: str) -> str
Validate the format of an AWS S3 bucket name according to AWS rules.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket_name | str | The name of the bucket to validate. | required |
Returns:
Type | Description |
---|---|
str | The validated bucket name if valid. |
Raises:
Type | Description |
---|---|
InvalidBucketNameError | If the bucket name does not meet AWS specifications. |
Examples:
>>> validate_bucket_name('valid-bucket-name')
'valid-bucket-name'
>>> validate_bucket_name('Invalid_Bucket_Name')
InvalidBucketNameError: Bucket name must not contain underscores.
validate_s3_file_path(file_path: str, allow_s3_scheme: bool) -> str
Validate the file path based on the S3 URI scheme.
If `allow_s3_scheme` is True, the file path must contain an S3 URI scheme
(either 's3://' or 's3a://'). If `allow_s3_scheme` is False, the file path
should not contain an S3 URI scheme.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | str | The file path to validate. | required |
allow_s3_scheme | bool | Whether or not to allow an S3 URI scheme in the file path. | required |
Returns:
Type | Description |
---|---|
str | The validated file path if valid. |
Raises:
Type | Description |
---|---|
InvalidS3FilePathError | If the validation fails based on the value of allow_s3_scheme. |
Examples:
>>> validate_s3_file_path('data_folder/data.csv', allow_s3_scheme=False)
'data_folder/data.csv'
>>> validate_s3_file_path('s3a://bucket-name/data.csv', allow_s3_scheme=True)
's3a://bucket-name/data.csv'
>>> validate_s3_file_path('s3a://bucket-name/data.csv', allow_s3_scheme=False)
InvalidS3FilePathError: The file_path should not contain an S3 URI scheme
like 's3://' or 's3a://'.
write_csv(client: boto3.client, bucket_name: str, data: pd.DataFrame, filepath: str, **kwargs) -> bool
Write a Pandas DataFrame to CSV in an S3 bucket.
Uses the StringIO library as an in-memory buffer: Pandas first writes the data to the buffer, the buffer is then rewound to the beginning, and its contents are sent to the S3 bucket using the boto3 put_object method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client | boto3.client | The boto3 S3 client instance. | required |
bucket_name | str | The name of the S3 bucket. | required |
data | DataFrame | The dataframe to write to the specified path. | required |
filepath | str | The filepath to save the dataframe to. | required |
kwargs | | Optional dictionary of Pandas to_csv arguments. | {} |
Returns:
Type | Description |
---|---|
bool | True if the dataframe is written successfully. False if it was not possible to serialise or write the file. |
Raises:
Type | Description |
---|---|
Exception | If there is an error writing the file to S3. |
Examples:
>>> client = boto3.client('s3')
>>> data = pd.DataFrame({
>>> 'column1': [1, 2, 3],
>>> 'column2': ['a', 'b', 'c']
>>> })
>>> write_csv(client, 'my_bucket', data, 'path/to/file.csv')
True
rdsa_utils.cdp.helpers.hdfs_utils
Utility functions for interacting with HDFS.
change_permissions(path: str, permission: str, recursive: bool = False) -> bool
Change directory and file permissions in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the file or directory in HDFS. | required |
permission | str | The permission to be set, e.g., 'go+rwx' or '777'. | required |
recursive | bool | If True, changes permissions for all subdirectories and files within a directory. | False |
Returns:
Type | Description |
---|---|
bool | True if the operation was successful (command return code 0), otherwise False. |
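Example (the path and permission string are illustrative):
>>> change_permissions('/user/me/project/output', '770', recursive=True)
True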
copy(from_path: str, to_path: str, overwrite: bool = False) -> bool
Copy a file in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path | str | The source path of the file in HDFS. | required |
to_path | str | The target path of the file in HDFS. | required |
overwrite | bool | If True, the existing file at the target path will be overwritten, default is False. | False |
Returns:
Type | Description |
---|---|
bool | True if the operation was successful (command return code 0), otherwise False. |
Raises:
Type | Description |
---|---|
TimeoutExpired | If the process does not complete within the default timeout. |
copy_local_to_hdfs(from_path: str, to_path: str) -> bool
Copy a local file to HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path | str | The path to the local file. | required |
to_path | str | The path to the HDFS directory where the file will be copied. | required |
Returns:
Type | Description |
---|---|
bool | True if the operation was successful (command return code 0), otherwise False. |
create_dir(path: str) -> bool
Create a directory in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The HDFS path where the directory should be created. | required |
Returns:
Type | Description |
---|---|
bool | True if the operation is successful (directory created), otherwise False. |
create_txt_from_string(path: str, string_to_write: str, replace: Optional[bool] = False) -> None
Create and populate a text file in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the new file to be created, for example, '/some/directory/newfile.txt'. | required |
string_to_write | str | The string that will populate the new text file. | required |
replace | Optional[bool] | Flag determining whether an existing file should be replaced. Defaults to False. | False |
Returns:
Type | Description |
---|---|
None | This function doesn't return anything; it's used for its side effect of creating a text file. |
Raises:
Type | Description |
---|---|
FileNotFoundError | If |
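Example (the path and file contents are illustrative):
>>> create_txt_from_string(
...     path='/some/directory/newfile.txt',
...     string_to_write='run completed successfully',
...     replace=True,
... )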
delete_dir(path: str) -> bool
Delete an empty directory from HDFS.
This function attempts to delete an empty directory in HDFS. If the directory is not empty, the deletion will fail.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The HDFS path to the directory to be deleted. | required |
Returns:
Type | Description |
---|---|
bool | True if the operation is successful (directory deleted), otherwise False. |
Note
This function will only succeed if the directory is empty.
To delete directories containing files or other directories,
consider using `delete_path` instead.
delete_file(path: str) -> bool
Delete a specific file in HDFS.
This function is used to delete a single file located at the specified HDFS path. If the path points to a directory, the command will fail.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the file in HDFS to be deleted. | required |
Returns:
Type | Description |
---|---|
bool | True if the file was successfully deleted (command return code 0), otherwise False. |
Raises:
Type | Description |
---|---|
TimeoutExpired | If the process does not complete within the default timeout. |
Note
This function is intended for files only. For directory deletions,
use `delete_dir` or `delete_path`.
delete_path(path: str) -> bool
Delete a file or directory in HDFS, including non-empty directories.
This function is capable of deleting both files and directories. When applied to directories, it will recursively delete all contents within the directory, making it suitable for removing directories regardless of whether they are empty or contain files or other directories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the file or directory in HDFS to be deleted. | required |
Returns:
Type | Description |
---|---|
bool | True if the file or directory was successfully deleted (command return code 0), otherwise False. |
Raises:
Type | Description |
---|---|
TimeoutExpired | If the process does not complete within the default timeout. |
Warning
Use with caution: applying this function to a directory will remove all contained files and subdirectories without confirmation.
file_exists(path: str) -> bool
Check whether a file exists in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the file in HDFS to be checked for existence. | required |
Returns:
Type | Description |
---|---|
bool | True if the file exists (command return code 0), otherwise False. |
Raises:
Type | Description |
---|---|
TimeoutExpired | If the process does not complete within the default timeout. |
get_date_modified(filepath: str) -> str
Return the last modified date of a file in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filepath | str | The path to the file in HDFS. | required |
Returns:
Type | Description |
---|---|
str | The date the file was last modified. |
is_dir(path: str) -> bool
Test if a directory exists in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The HDFS path to the directory to be tested. | required |
Returns:
Type | Description |
---|---|
bool | True if the operation is successful (directory exists), otherwise False. |
move_local_to_hdfs(from_path: str, to_path: str) -> bool
Move a local file to HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path | str | The path to the local file. | required |
to_path | str | The path to the HDFS directory where the file will be moved. | required |
Returns:
Type | Description |
---|---|
bool | True if the operation was successful (command return code 0), otherwise False. |
read_dir(path: str) -> List[str]
Read the contents of a directory in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the directory in HDFS. | required |
Returns:
Type | Description |
---|---|
List[str] | A list of full paths of the items found in the directory. |
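Example (directory contents are illustrative):
>>> read_dir('/user/me/project')
['/user/me/project/data.csv', '/user/me/project/logs']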
read_dir_files(path: str) -> List[str]
Read the filenames in a directory in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the directory in HDFS. | required |
Returns:
Type | Description |
---|---|
List[str] | A list of filenames in the directory. |
read_dir_files_recursive(path: str, return_path: bool = True) -> List[str]
Recursively reads the contents of a directory in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the directory in HDFS. | required |
return_path | bool | If True, returns the full path of the files, otherwise just the filename. | True |
Returns:
Type | Description |
---|---|
List[str] | A list of files in the directory. |
rename(from_path: str, to_path: str, overwrite: bool = False) -> bool
Rename (i.e., move using full path) a file in HDFS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path | str | The source path of the file in HDFS. | required |
to_path | str | The target path of the file in HDFS. | required |
overwrite | bool | If True, the existing file at the target path will be overwritten, default is False. | False |
Returns:
Type | Description |
---|---|
bool | True if the operation was successful (command return code 0), otherwise False. |
Raises:
Type | Description |
---|---|
TimeoutExpired | If the process does not complete within the default timeout. |
rdsa_utils.cdp.helpers.impala
Utilities for working with Impala.
invalidate_impala_metadata(table: str, impalad_address_port: str, impalad_ca_cert: str, keep_stderr: Optional[bool] = False)
Automate the invalidation of a table's metadata using impala-shell.
This function uses the impala-shell command with the given impalad_address_port and impalad_ca_cert, to invalidate a specified table's metadata.
It proves useful during a data pipeline's execution after writing to an intermediate Hive table. Using Impala Query Editor in Hue, end-users often need to run "INVALIDATE METADATA" command to refresh a table's metadata. However, this manual step can be missed, leading to potential use of outdated metadata.
The function automates the "INVALIDATE METADATA" command for a given table, ensuring up-to-date metadata for future queries. This reduces manual intervention, making outdated metadata issues less likely to occur.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table | str | Name of the table for metadata invalidation. | required |
impalad_address_port | str | 'address:port' of the impalad instance. | required |
impalad_ca_cert | str | Path to impalad's CA certificate file. | required |
keep_stderr | Optional[bool] | If True, will print impala-shell command's stderr output. | False |
Returns:
Type | Description |
---|---|
None | |
Examples:
>>> invalidate_impala_metadata(
... 'my_table',
... 'localhost:21050',
... '/path/to/ca_cert.pem'
... )
>>> invalidate_impala_metadata(
... 'my_table',
... 'localhost:21050',
... '/path/to/ca_cert.pem',
... keep_stderr=True
... )
rdsa_utils.cdp.io.pipeline_runlog
Utilities for managing a Pipeline Runlog using Hive Tables.
add_runlog_entry(spark: SparkSession, desc: str, version: str, config: Union[ConfigParser, Dict[str, str]], pipeline: Optional[str] = None, log_table: str = 'pipeline_runlog', run_id: Optional[int] = None) -> DataFrame
Add an entry to a target runlog.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | A running spark session. | required |
desc | str | Description to attach to the log entry. | required |
version | str | Version of the pipeline. | required |
config | Union[ConfigParser, Dict[str, str]] | Configuration object for the run. | required |
pipeline | Optional[str] | Pipeline name. If None, uses the spark application name. | None |
log_table | str | Target runlog table. If database not set, this should include the database. | 'pipeline_runlog' |
run_id | Optional[int] | Run id to use if already reserved. If not specified, a new one is generated. | None |
Returns:
Type | Description |
---|---|
DataFrame | The log entry returned as a spark dataframe. |
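A usage sketch (the description, version, config, and table names are illustrative):
>>> config = {'input_table': 'my_db.sales', 'output_table': 'my_db.results'}
>>> entry = add_runlog_entry(
...     spark,
...     desc='Monthly processing run',
...     version='1.2.0',
...     config=config,
...     log_table='my_db.pipeline_runlog',
... )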
create_runlog_entry(spark: SparkSession, run_id: int, desc: str, version: str, config: Union[ConfigParser, Dict[str, str]], pipeline: Optional[str] = None) -> DataFrame
Create an entry for the runlog.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | A running spark session. | required |
run_id | int | Entry run id. | required |
desc | str | Description to attach to the log entry. | required |
version | str | Version of the pipeline. | required |
config | Union[ConfigParser, Dict[str, str]] | Configuration object for the run. | required |
pipeline | Optional[str] | Pipeline name. If None, derives from spark app name. | None |
Returns:
Type | Description |
---|---|
DataFrame | The log entry returned as a spark dataframe. |
create_runlog_table(spark: SparkSession, database: str, tablename: Optional[str] = 'pipeline_runlog') -> None
Create runlog and _reserved_ids tables in the target database if needed.
This function executes two SQL queries to create two tables, if they do not already exist in the target database. The first table's structure includes columns for run_id, desc, user, datetime, pipeline_name, pipeline_version, and config, while the second table includes run_id and reserved_date.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | A running spark session which will be used to execute SQL queries. | required |
database | str | The name of the target database where tables will be created. | required |
tablename | Optional[str] | The name of the main table to be created (default is "pipeline_runlog"). The associated _reserved_ids table will be suffixed with this name. | 'pipeline_runlog' |
Returns:
Type | Description |
---|---|
None | |
Examples:
>>> spark = SparkSession.builder.appName("test_session").getOrCreate()
>>> create_runlog_table(spark, "test_db", "test_table")
get_last_run_id(spark: SparkSession, pipeline: Optional[str] = None, log_table: str = 'pipeline_runlog') -> Optional[int]
Retrieve the last run_id, either in general or for a specific pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | A running Spark session. | required |
pipeline | Optional[str] | If specified, the result will be for the listed pipeline only. | None |
log_table | str | The target runlog table. If the database is not set, this should include the database. | 'pipeline_runlog' |
Returns:
Type | Description |
---|---|
int or None | The id of the last run. Returns None if the log table is empty. |
get_penultimate_run_id(spark: SparkSession, pipeline: Optional[str] = None, log_table: str = 'pipeline_runlog') -> Optional[int]
Retrieve the penultimate run_id, either in general or for a specific pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | A running Spark session. | required |
pipeline | Optional[str] | If specified, the result will be for the listed pipeline only. | None |
log_table | str | The target runlog table. If the database is not set, this should include the database. | 'pipeline_runlog' |
Returns:
Type | Description |
---|---|
int or None | The id of the penultimate run. Returns None if the log table is empty or has less than two entries. |
reserve_id(spark: SparkSession, log_table: Optional[str] = 'pipeline_runlog') -> int
Reserve a run id in the reserved ids table linked to the runlog table.
The function reads the last run id from the reserved ids table, increments it to create a new id, and writes the new id with the current timestamp to the reserved ids table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | A running SparkSession instance. | required |
log_table | Optional[str] | The name of the main pipeline runlog table associated with this reserved id table, by default "pipeline_runlog". | 'pipeline_runlog' |
Returns:
Type | Description |
---|---|
int | The new run id. |
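A sketch of reserving an id up front and passing it to add_runlog_entry later (table and config names are illustrative):
>>> run_id = reserve_id(spark, log_table='my_db.pipeline_runlog')
>>> entry = add_runlog_entry(
...     spark,
...     desc='Monthly processing run',
...     version='1.2.0',
...     config=config,
...     log_table='my_db.pipeline_runlog',
...     run_id=run_id,
... )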
write_runlog_file(spark: SparkSession, runlog_table: str, runlog_id: int, path: str) -> None
Write metadata from runlog entry to a text file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | A running SparkSession instance. | required |
runlog_table | str | The name of the table containing the runlog entries. | required |
runlog_id | int | The id of the desired entry. | required |
path | str | The HDFS path where the file will be written. | required |
Returns:
Type | Description |
---|---|
None | This function doesn't return anything; it's used for its side effect of creating a text file. |
rdsa_utils.cdp.io.input
Read inputs on CDP.
extract_database_name(spark: SparkSession, long_table_name: str) -> Tuple[str, str]
Extract the database component and table name from a compound table name.
This function can handle multiple scenarios:
- For GCP's naming format '<project>.<database>.<table_name>', the function will return the database and table name.
- If the name is formatted as 'db_name.table_name', the function will extract and return the database and table names.
- If the long_table_name contains only the table name (e.g., 'table_name'), the function will use the current database of the SparkSession.
- For any other incorrectly formatted names, the function will raise a ValueError.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Active SparkSession. | required |
long_table_name | str | Full name of the table, which can include the GCP project and/or database name. | required |
Returns:
Type | Description |
---|---|
Tuple[str, str] | A tuple containing the name of the database and the table name. |
Raises:
Type | Description |
---|---|
ValueError | If the table name doesn't match any of the expected formats. |
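Examples (table names are illustrative; the second call assumes the session's current database is 'default'):
>>> extract_database_name(spark, 'my_db.my_table')
('my_db', 'my_table')
>>> extract_database_name(spark, 'my_table')
('default', 'my_table')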
get_current_database(spark: SparkSession) -> str
Retrieve the current database from the active SparkSession.
get_tables_in_database(spark: SparkSession, database_name: str) -> List[str]
Get a list of tables in a given database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Active SparkSession. | required |
database_name | str | The name of the database from which to list tables. | required |
Returns:
Type | Description |
---|---|
List[str] | A list of table names in the specified database. |
Raises:
Type | Description |
---|---|
ValueError | If there is an error fetching tables from the specified database. |
Examples:
>>> tables = get_tables_in_database(spark, "default")
>>> print(tables)
['table1', 'table2', 'table3']
load_and_validate_table(spark: SparkSession, table_name: str, skip_validation: bool = False, err_msg: str = None, filter_cond: str = None, keep_columns: Optional[List[str]] = None, rename_columns: Optional[Dict[str, str]] = None, drop_columns: Optional[List[str]] = None) -> SparkDF
Load a table, apply transformations, and validate if it is not empty.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Active SparkSession. | required |
table_name | str | Name of the table to load. | required |
skip_validation | bool | If True, skips validation step, by default False. | False |
err_msg | str | Error message to return if table is empty, by default None. | None |
filter_cond | str | Condition to apply to SparkDF once read, by default None. | None |
keep_columns | Optional[List[str]] | A list of column names to keep in the DataFrame, dropping all others. Default value is None. | None |
rename_columns | Optional[Dict[str, str]] | A dictionary to rename columns where keys are existing column names and values are new column names. Default value is None. | None |
drop_columns | Optional[List[str]] | A list of column names to drop from the DataFrame. Default value is None. | None |
Returns:
Type | Description |
---|---|
DataFrame | Loaded SparkDF if validated, subject to the options above. |
Raises:
Type | Description |
---|---|
PermissionError | If there's an issue accessing the table or if the table does not exist in the specified database. |
ValueError | If the table is empty after loading, becomes empty after applying a filter condition, or if columns specified in keep_columns, drop_columns, or rename_columns do not exist in the DataFrame. |
Notes
Transformation order:
1. Columns are kept according to `keep_columns`.
2. Columns are dropped according to `drop_columns`.
3. Columns are renamed according to `rename_columns`.
Examples:
Load a table, apply a filter, and validate it:
>>> df = load_and_validate_table(
...     spark=spark,
...     table_name="my_table",
...     filter_cond="age > 21",
... )
Load a table and keep only specific columns:
>>> df = load_and_validate_table(
...     spark=spark,
...     table_name="my_table",
...     keep_columns=["name", "age", "city"],
... )
Load a table, drop specific columns, and rename a column:
>>> df = load_and_validate_table(
...     spark=spark,
...     table_name="my_table",
...     drop_columns=["extra_column"],
...     rename_columns={"name": "full_name"},
... )
Load a table, skip validation, and apply all transformations:
>>> df = load_and_validate_table(
...     spark=spark,
...     table_name="my_table",
...     skip_validation=True,
...     keep_columns=["name", "age", "city"],
...     drop_columns=["extra_column"],
...     rename_columns={"name": "full_name"},
...     filter_cond="age > 21",
... )
rdsa_utils.cdp.io.output
Write outputs on CDP.
insert_df_to_hive_table(spark: SparkSession, df: SparkDF, table_name: str, overwrite: bool = False, fill_missing_cols: bool = False, repartition_data_by: Union[int, str, None] = None) -> None
Write SparkDF to Hive table with optional configuration.
This function writes data from a SparkDF into a Hive table, handling missing columns and optional repartitioning. It ensures the table's column order matches the DataFrame and manages different overwrite behaviors for partitioned and non-partitioned data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Active SparkSession. | required |
df | DataFrame | SparkDF containing data to be written. | required |
table_name | str | Name of the Hive table to write data into. | required |
overwrite | bool | Controls how existing data is handled, default is False. For non-partitioned data: True replaces the entire table with the DataFrame data; False appends the DataFrame data to the existing table. For partitioned data: True replaces data only in the partitions present in the DataFrame; False appends data to existing partitions or creates new ones. | False |
fill_missing_cols | bool | If True, adds missing columns as NULL values. If False, raises an error on schema mismatch, default is False. Explicitly casts DataFrame columns to match the Hive table schema to avoid type mismatch errors. Adds missing columns as NULL values when `fill_missing_cols` is True, regardless of their data type (e.g., String, Integer, Double, Boolean, etc.). | False |
repartition_data_by | Union[int, str, None] | Controls data repartitioning, default is None: an int sets the target number of partitions; a str specifies a column to repartition by; None means no repartitioning is performed. | None |
Notes
When using repartition with a number:
- Affects physical file structure but preserves Hive partitioning scheme.
- Controls number of output files per write operation per Hive partition.
- Maintains partition-based query optimization.
When repartitioning by column:
- Helps balance file sizes across Hive partitions.
- Reduces creation of small files.
Raises:
Type | Description |
---|---|
AnalysisException | If there's an error reading the table. This can occur if the table doesn't exist or if there's no access to it. |
ValueError | If the SparkDF schema does not match the Hive table schema and 'fill_missing_cols' is set to False. |
DataframeEmptyError | If the input DataFrame is empty. |
Exception | For other general exceptions when writing data to the table. |
Examples:
Write a DataFrame to a Hive table without overwriting:
>>> insert_df_to_hive_table(
...     spark=spark,
...     df=df,
...     table_name="my_database.my_table"
... )
Overwrite an existing table with a DataFrame:
>>> insert_df_to_hive_table(
...     spark=spark,
...     df=df,
...     table_name="my_database.my_table",
...     overwrite=True
... )
Write a DataFrame to a Hive table with missing columns filled:
>>> insert_df_to_hive_table(
...     spark=spark,
...     df=df,
...     table_name="my_database.my_table",
...     fill_missing_cols=True
... )
Repartition by column before writing to Hive:
>>> insert_df_to_hive_table(
...     spark=spark,
...     df=df,
...     table_name="my_database.my_table",
...     repartition_data_by="partition_column"
... )
Repartition into a fixed number of partitions before writing:
>>> insert_df_to_hive_table(
...     spark=spark,
...     df=df,
...     table_name="my_database.my_table",
...     repartition_data_by=10
... )
save_csv_to_hdfs(df: SparkDF, file_name: str, file_path: str, overwrite: bool = True) -> None
Save DataFrame as CSV on HDFS, coalescing to a single partition.
This function saves a PySpark DataFrame to HDFS in CSV format. By coalescing the DataFrame into a single partition before saving, it accomplishes two main objectives:
- Single Part File: The output is a single CSV file rather than multiple part files. This method reduces complexity and cuts through the clutter of multi-part files, offering users and systems a more cohesive and hassle-free experience.
- Preserving Row Order: Coalescing into a single partition maintains the order of rows as they appear in the DataFrame. This is essential when the row order matters for subsequent processing or analysis. It's important to note, however, that coalescing can have performance implications for very large DataFrames by concentrating all data processing on a single node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrame | PySpark DataFrame to be saved. | required |
file_name | str | Name of the CSV file. Must include the ".csv" extension. | required |
file_path | str | HDFS path where the CSV file should be saved. | required |
overwrite | bool | If True, overwrite any existing file with the same name. If False and the file exists, the function will raise an error. | True |
Raises:
Type | Description |
---|---|
ValueError | If the file_name does not end with ".csv". |
IOError | If overwrite is False and the target file already exists. |
Examples:
Saving to an S3 bucket using the `s3a://` scheme:
# Assume `df` is a pre-defined PySpark DataFrame
file_name = "data_output.csv"
file_path = "s3a://my-bucket/data_folder/"
save_csv_to_hdfs(df, file_name, file_path, overwrite=True)
Saving to a normal HDFS path:
# Assume `df` is a pre-defined PySpark DataFrame
file_name = "data_output.csv"
file_path = "/user/hdfs/data_folder/"
save_csv_to_hdfs(df, file_name, file_path, overwrite=True)
save_csv_to_s3(df: SparkDF, bucket_name: str, file_name: str, file_path: str, s3_client: boto3.client, overwrite: bool = True) -> None
Save DataFrame as CSV on S3, coalescing to a single partition.
This function saves a PySpark DataFrame to S3 in CSV format. By coalescing the DataFrame into a single partition before saving, it accomplishes two main objectives:
- Single Part File: The output is a single CSV file rather than multiple part files. This method reduces complexity and cuts through the clutter of multi-part files, offering users and systems a more cohesive and hassle-free experience.
- Preserving Row Order: Coalescing into a single partition maintains the order of rows as they appear in the DataFrame. This is essential when the row order matters for subsequent processing or analysis. It's important to note, however, that coalescing can have performance implications for very large DataFrames by concentrating all data processing on a single node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrame | PySpark DataFrame to be saved. | required |
bucket_name | str | The name of the S3 bucket where the CSV file should be saved. | required |
file_name | str | Name of the CSV file. Must include the ".csv" extension. | required |
file_path | str | S3 path where the CSV file should be saved. | required |
s3_client | client | The boto3 S3 client instance. | required |
overwrite | bool | If True, overwrite any existing file with the same name. If False and the file exists, the function will raise an error. | True |
Raises:
Type | Description |
---|---|
ValueError | If the file_name does not end with ".csv". |
InvalidBucketNameError | If the bucket name does not meet AWS specifications. |
InvalidS3FilePathError | If the file_path contains an S3 URI scheme like 's3://' or 's3a://'. |
IOError | If overwrite is False and the target file already exists. |
Examples:
Saving to an S3 bucket:
# Assume `df` is a pre-defined PySpark DataFrame
file_name = "data_output.csv"
file_path = "data_folder/"
s3_client = boto3.client('s3')
save_csv_to_s3(
    df,
    'my-bucket',
    file_name,
    file_path,
    s3_client,
    overwrite=True
)
write_and_read_hive_table(spark: SparkSession, df: SparkDF, table_name: str, database: str, filter_id: Union[int, str], filter_col: str = 'run_id', fill_missing_cols: bool = False) -> SparkDF
Write a SparkDF to an existing Hive table and then read it back.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark | SparkSession | Active SparkSession. | required |
df | DataFrame | The SparkDF to be written to the Hive table. | required |
table_name | str | The name of the Hive table to write to and read from. | required |
database | str | The Hive database name. | required |
filter_id | Union[int, str] | The identifier to filter on when reading data back from the Hive table. | required |
filter_col | str | The column name to use for filtering data when reading back from the Hive table, by default 'run_id'. | 'run_id' |
fill_missing_cols | bool | If True, missing columns in the DataFrame will be filled with nulls when writing to the Hive table, by default False. | False |
Returns:
Type Description DataFrame
The DataFrame read from the Hive table.
Raises:
Type Description ValueError
If the specified Hive table does not exist in the given database or if the provided DataFrame doesn't contain the specified filter column.
Exception
For general exceptions encountered during execution.
Notes
This function assumes the Hive table already exists. The DataFrame df should have the same schema as the Hive table for the write to succeed.
The function allows for more effective memory management when dealing with large PySpark DataFrames by leveraging Hive's on-disk storage.
Predicate pushdown is used when reading the data back into a PySpark DataFrame, minimizing the memory usage and optimizing the read operation.
As part of the design, there is always a column called filter_col in the DataFrame and Hive table to track pipeline runs.
The Hive table contains all the runs, and we only read back the run that we just wrote to the Hive table using the filter_id parameter. If no filter_col is specified, 'run_id' is used as the default.
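Illustrative usage sketch (not from the original docstring): assumes an active SparkSession spark, a SparkDF df containing a run_id column, and an existing Hive table my_database.my_table; all names and the run identifier are placeholders.
>>> # Write df to the existing table, then read back only this run
>>> df_out = write_and_read_hive_table(
...     spark,
...     df,
...     table_name='my_table',
...     database='my_database',
...     filter_id='run_2024_01',
... )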
GCP
rdsa_utils.gcp.helpers.gcp_utils
Utility functions for interacting with Google Cloud Storage.
To initialise a client for GCS and configure it with a service account JSON key file, you can use the following code snippet:
from google.cloud import storage

# Create a GCS client
client = storage.Client.from_service_account_json('path/to/keyfile.json')
copy_file(client: storage.Client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str, overwrite: bool = False) -> bool
Copy a file from one GCS bucket to another.
Parameters:
Name Type Description Default client
Client
The GCS client instance.
required source_bucket_name
str
The name of the source bucket.
required source_object_name
str
The GCS object name of the source file.
required destination_bucket_name
str
The name of the destination bucket.
required destination_object_name
str
The GCS object name of the destination file.
required overwrite
bool
If True, overwrite the destination file if it already exists.
False
Returns:
Type Description bool
True if the file was copied successfully, otherwise False.
Examples:
>>> client = storage.Client()
>>> copy_file(
...     client,
...     'source-bucket',
...     'source_file.txt',
...     'destination-bucket',
...     'destination_file.txt'
... )
True
create_folder_on_gcs(client: storage.Client, bucket_name: str, folder_path: str) -> bool
Create a folder in a GCS bucket if it doesn't already exist.
Parameters:
Name Type Description Default client
Client
The GCS client instance.
required bucket_name
str
The name of the bucket where the folder will be created.
required folder_path
str
The name of the folder to create.
required Returns:
Type Description bool
True if the folder was created successfully or already exists, otherwise False.
Examples:
>>> client = storage.Client()
>>> create_folder_on_gcs(client, 'mybucket', 'new_folder/')
True
delete_file(client: storage.Client, bucket_name: str, object_name: str) -> bool
Delete a file from a GCS bucket.
Parameters:
Name Type Description Default client
Client
The GCS client instance.
required bucket_name
str
The name of the bucket from which the file will be deleted.
required object_name
str
The GCS object name of the file to delete.
required Returns:
Type Description bool
True if the file was deleted successfully, otherwise False.
Examples:
>>> client = storage.Client()
>>> delete_file(client, 'mybucket', 'folder/gcs_file.txt')
True
delete_folder(client: storage.Client, bucket_name: str, folder_path: str) -> bool
Delete a folder in a GCS bucket.
Parameters:
Name Type Description Default client
Client
The GCS client instance.
required bucket_name
str
The name of the GCS bucket.
required folder_path
str
The path of the folder to delete.
required Returns:
Type Description bool
True if the folder was deleted successfully, otherwise False.
Examples:
>>> client = storage.Client()
>>> delete_folder(client, 'mybucket', 'path/to/folder/')
True
download_file(client: storage.Client, bucket_name: str, object_name: str, local_path: str, overwrite: bool = False) -> bool
Download a file from a GCS bucket to a local directory.
Parameters:
Name Type Description Default client
Client
The GCS client instance.
required bucket_name
str
The name of the GCS bucket from which to download the file.
required object_name
str
The GCS object name of the file to download.
required local_path
str
The local file path where the downloaded file will be saved.
required overwrite
bool
If True, overwrite the local file if it exists.
False
Returns:
Type Description bool
True if the file was downloaded successfully, False otherwise.
Examples:
>>> client = storage.Client()
>>> download_file(
...     client,
...     'mybucket',
...     'folder/gcs_file.txt',
...     '/path/to/download.txt'
... )
True
download_folder(client: storage.Client, bucket_name: str, prefix: str, local_path: str, overwrite: bool = False) -> bool
Download a folder from a GCS bucket to a local directory.
Parameters:
Name Type Description Default client
Client
The GCS client instance.
required bucket_name
str
The name of the GCS bucket from which to download the folder.
required prefix
str
The GCS prefix of the folder to download.
required local_path
str
The local directory path where the downloaded folder will be saved.
required overwrite
bool
If True, overwrite existing local files if they exist.
False
Returns:
Type Description bool
True if the folder was downloaded successfully, False otherwise.
Examples:
>>> client = storage.Client()
>>> download_folder(
...     client,
...     'mybucket',
...     'folder/subfolder/',
...     '/path/to/local_folder',
...     overwrite=False
... )
True
file_exists(client: storage.Client, bucket_name: str, object_name: str) -> bool
Check if a specific file exists in a GCS bucket.
Parameters:
Name Type Description Default client
Client
The GCS client.
required bucket_name
str
The name of the bucket.
required object_name
str
The GCS object name to check for existence.
required Returns:
Type Description bool
True if the file exists, otherwise False.
Examples:
>>> client = storage.Client()
>>> file_exists(client, 'mybucket', 'folder/file.txt')
True
get_table_columns(table_path) -> List[str]
Return the column names for a given BigQuery table.
is_gcs_directory(client: storage.Client, bucket_name: str, object_name: str) -> bool
Check if a GCS key is a directory by listing its contents.
Parameters:
Name Type Description Default client
Client
The GCS client instance.
required bucket_name
str
The name of the GCS bucket.
required object_name
str
The GCS object name to check.
required Returns:
Type Description bool
True if the key represents a directory, False otherwise.
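Illustrative usage sketch (not from the original docstring; bucket and object names are placeholders):
>>> client = storage.Client()
>>> is_gcs_directory(client, 'mybucket', 'folder/subfolder/')
True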
list_files(client: storage.Client, bucket_name: str, prefix: str = '') -> List[str]
List files in a GCS bucket that match a specific prefix.
Parameters:
Name Type Description Default client
Client
The GCS client.
required bucket_name
str
The name of the bucket.
required prefix
str
The prefix to filter files, by default "".
''
Returns:
Type Description List[str]
A list of GCS object keys matching the prefix.
Examples:
>>> client = storage.Client()
>>> list_files(client, 'mybucket', 'folder_prefix/')
['folder_prefix/file1.txt', 'folder_prefix/file2.txt']
load_config_gcp(config_path: str) -> Tuple[Dict, Dict]
Load the config and dev_config files to dictionaries.
Parameters:
Name Type Description Default config_path
str
The path of the config file in a yaml format.
required Returns:
Type Description Tuple[Dict, Dict]
The contents of the config files.
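Illustrative usage sketch (not from the original docstring; the config path is a placeholder and should point to a YAML file as described above):
>>> config, dev_config = load_config_gcp('config.yaml')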
move_file(client: storage.Client, source_bucket_name: str, source_object_name: str, destination_bucket_name: str, destination_object_name: str) -> bool
Move a file within or between GCS buckets.
Parameters:
Name Type Description Default client
Client
The GCS client instance.
required source_bucket_name
str
The name of the source GCS bucket.
required source_object_name
str
The GCS object name of the source file.
required destination_bucket_name
str
The name of the destination GCS bucket.
required destination_object_name
str
The GCS object name of the destination file.
required Returns:
Type Description bool
True if the file was moved successfully, False otherwise.
Examples:
>>> client = storage.Client()
>>> move_file(
...     client,
...     'sourcebucket',
...     'source_folder/file.txt',
...     'destbucket',
...     'dest_folder/file.txt'
... )
True
remove_leading_slash(text: str) -> str
Remove the leading forward slash from a string if present.
Parameters:
Name Type Description Default text
str
The text from which the leading slash will be removed.
required Returns:
Type Description str
The text stripped of its leading slash.
Examples:
>>> remove_leading_slash('/example/path')
'example/path'
run_bq_query(query: str) -> bigquery.QueryJob
Run an SQL query in BigQuery.
table_exists(table_path: TablePath) -> bool
Check the BigQuery catalogue to see if a table exists.
Returns True if a table exists. See code sample explanation here: https://cloud.google.com/bigquery/docs/samples/bigquery-table-exists#bigquery_table_exists-python
Parameters:
Name Type Description Default table_path
TablePath
The target BigQuery table name.
required Returns:
Type Description bool
Returns True if table exists and False if table does not exist.
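Illustrative usage sketch (not from the original docstring; the table path is a placeholder):
>>> table_exists('my_database.my_table')
True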
upload_file(client: storage.Client, bucket_name: str, local_path: str, object_name: Optional[str] = None, overwrite: bool = False) -> bool
Upload a file to a GCS bucket from local directory.
Parameters:
Name Type Description Default client
Client
The GCS client instance.
required bucket_name
str
The name of the target GCS bucket.
required local_path
str
The file path on the local system to upload.
required object_name
Optional[str]
The target GCS object name. If None, uses the base name of the local file path.
None
overwrite
bool
If True, the existing file on GCS will be overwritten.
False
Returns:
Type Description bool
True if the file was uploaded successfully, False otherwise.
Examples:
>>> client = storage.Client()
>>> upload_file(
...     client,
...     'mybucket',
...     '/path/to/file.txt',
...     'folder/gcs_file.txt'
... )
True
upload_folder(client: storage.Client, bucket_name: str, local_path: str, prefix: str = '', overwrite: bool = False) -> bool
Upload an entire folder from the local file system to a GCS bucket.
Parameters:
Name Type Description Default client
Client
The GCS client instance.
required bucket_name
str
The name of the bucket to which the folder will be uploaded.
required local_path
str
The path to the local folder to upload.
required prefix
str
The prefix to prepend to each object name when uploading to GCS.
''
overwrite
bool
If True, overwrite existing files in the bucket.
False
Returns:
Type Description bool
True if the folder was uploaded successfully, otherwise False.
Examples:
>>> client = storage.Client()
>>> upload_folder(
...     client,
...     'mybucket',
...     '/path/to/local/folder',
...     'folder_prefix',
...     True
... )
True
validate_bucket_name(bucket_name: str) -> str
Validate the format of a GCS bucket name according to GCS rules.
Parameters:
Name Type Description Default bucket_name
str
The name of the bucket to validate.
required Returns:
Type Description str
The validated bucket name if valid.
Raises:
Type Description InvalidBucketNameError
If the bucket name does not meet GCS specifications.
Examples:
>>> validate_bucket_name('valid-bucket-name')
'valid-bucket-name'
>>> validate_bucket_name('Invalid_Bucket_Name')
InvalidBucketNameError: Bucket name must not contain underscores.
rdsa_utils.gcp.io.inputs
Read from BigQuery.
build_sql_query(table_path: TablePath, columns: Optional[Sequence[str]] = None, date_column: Optional[str] = None, date_range: Optional[Sequence[str]] = None, column_filter_dict: Optional[Dict[str, Sequence[str]]] = None, partition_column: Optional[str] = None, partition_type: Optional[str] = None, partition_value: Optional[Union[Tuple[str, str], str]] = None) -> str
Create SQL query to load data with the specified filter conditions.
Parameters:
Name Type Description Default spark
Spark session.
required table_path
TablePath
BigQuery table path in format "database_name.table_name".
required columns
Optional[Sequence[str]]
The column selection. Selects all columns if None passed.
None
date_column
Optional[str]
The name of the column to be used to filter the date range on.
None
date_range
Optional[Sequence[str]]
Sequence with two values, a lower and upper value for dates to load in.
None
column_filter_dict
Optional[Dict[str, Sequence[str]]]
A dictionary containing column: [values] where the values correspond to terms in the column that are to be filtered by.
None
partition_column
Optional[str]
The name of the column that the table is partitioned by.
None
partition_type
Optional[str]
The unit of time the table is partitioned by, must be one of: hour, day, month, year.
None
partition_value
Optional[Union[Tuple[str, str], str]]
The value or pair of values for filtering the partition column to.
None
Returns:
Type Description str
The string containing the SQL query.
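Illustrative usage sketch (not from the original docstring; the table path follows the database_name.table_name format noted above, and all column names and dates are placeholders):
>>> query = build_sql_query(
...     table_path='my_database.my_table',
...     columns=['id', 'price', 'date'],
...     date_column='date',
...     date_range=['2021-01-01', '2021-12-31'],
... )
>>> # The resulting SQL string could then be executed, e.g. via run_bq_query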
read_table(spark: SparkSession, table_path: TablePath, columns: Optional[Sequence[str]] = None, date_column: Optional[str] = None, date_range: Optional[Sequence[str]] = None, column_filter_dict: Optional[Dict[str, Sequence[str]]] = None, run_id_column: Optional[str] = 'run_id', run_id: Optional[str] = None, flatten_struct_cols: bool = False, partition_column: Optional[str] = None, partition_type: Optional[BigQueryTimePartitions] = None, partition_value: Optional[Union[Tuple[str, str], str]] = None) -> SparkDF
Read BigQuery table given table path and column selection.
Parameters:
Name Type Description Default spark
SparkSession
Spark session.
required table_path
TablePath
The target BigQuery table name.
required columns
Optional[Sequence[str]]
The column selection. Selects all columns if None passed.
None
date_column
Optional[str]
The name of the column to be used to filter the date range on.
None
date_range
Optional[Sequence[str]]
Sequence with two values, a lower and upper value for dates to load in.
None
column_filter_dict
Optional[Dict[str, Sequence[str]]]
A dictionary containing column: [values] where the values correspond to terms in the column that are to be filtered by.
None
run_id_column
Optional[str]
The name of the column to be used to filter to the specified run_id.
'run_id'
run_id
Optional[str]
The unique identifier for a run within the table that the read data is filtered to.
None
partition_column
Optional[str]
The name of the column that the table is partitioned by.
None
partition_type
Optional[BigQueryTimePartitions]
The unit of time the table is partitioned by, must be one of: hour, day, month, year.
None
partition_value
Optional[Union[Tuple[str, str], str]]
The value or pair of values for filtering the partition column to.
None
flatten_struct_cols
bool
When true, any struct type columns in the loaded dataframe are replaced with individual columns for each of the fields in the structs.
False
Returns:
Type Description DataFrame
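Illustrative usage sketch (not from the original docstring; assumes an active SparkSession spark, with placeholder table, column, and run_id values):
>>> df = read_table(
...     spark,
...     'my_database.my_table',
...     columns=['id', 'price'],
...     run_id='run_2024_01',
... )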
rdsa_utils.gcp.io.outputs
Write outputs to GCP.
write_table(df: Union[PandasDF, SparkDF], table_name: TablePath, mode: Literal['append', 'error', 'ignore', 'overwrite'] = 'error', partition_col: Optional[str] = None, partition_type: Optional[BigQueryTimePartitions] = None, partition_expiry_days: Optional[float] = None, clustered_fields: Optional[Union[str, List[str]]] = None) -> None
Write dataframe out to a Google BigQuery table.
If the table already exists, the behaviour of this function depends on the save mode specified by the mode parameter (which defaults to throwing an exception). When mode is 'overwrite', the schema of the DataFrame does not need to be the same as that of the existing table (the column order does not need to be the same).
If you use the df.printSchema() method directly in a print/log statement, the code is processed and printed regardless of logging level. Instead, capture the output and pass it to the logger. See the explanation here: https://stackoverflow.com/a/59935109
To learn more about the partitioning of tables and how to use them in BigQuery: https://cloud.google.com/bigquery/docs/partitioned-tables
To learn more about the clustering of tables and how to use them in BigQuery: https://cloud.google.com/bigquery/docs/clustered-tables
To learn more about how spark dataframes are saved to BigQuery: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/blob/master/README.md
Parameters:
Name Type Description Default df
Union[DataFrame, DataFrame]
The dataframe to be saved.
required table_name
TablePath
The target BigQuery table name.
required mode
Literal['append', 'error', 'ignore', 'overwrite']
Whether to overwrite or append to the BigQuery table.
- append: Append contents of this DataFrame to the existing table.
- overwrite: Overwrite existing data.
- error: Throw an exception if data already exists.
- ignore: Silently ignore this operation if data already exists.
'error'
partition_col
Optional[str]
A date or timestamp type column in the dataframe to use for the table partitioning.
None
partition_type
Optional[BigQueryTimePartitions]
The unit of time to partition the table by, must be one of: hour, day, month, year.
If partition_col is specified and partition_type = None, then BigQuery will default to using the day partition type.
If partition_type is specified and partition_col = None, then the table will be partitioned by the ingestion time pseudo column, which can be referenced in BigQuery via either _PARTITIONTIME as pt or _PARTITIONDATE as pd.
See https://cloud.google.com/bigquery/docs/querying-partitioned-tables for more information on querying partitioned tables.
None
partition_expiry_days
Optional[float]
If specified, this is the number of days (any decimal values are converted to that proportion of a day) that BigQuery keeps the data in each partition.
None
clustered_fields
Optional[Union[str, List[str]]]
If specified, the columns (up to four) in the dataframe to cluster the data by when outputting. The order in which the columns are specified is important, as it determines the ordering of the clustering on the BigQuery table.
See: https://cloud.google.com/bigquery/docs/querying-clustered-tables for more information on querying clustered tables.
None
Returns:
Type Description None
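Illustrative usage sketch (not from the original docstring; assumes a pre-defined dataframe df with a date column, and uses placeholder table and column names):
>>> write_table(
...     df,
...     'my_database.my_table',
...     mode='overwrite',
...     partition_col='date',
...     partition_type='day',
... )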
Helpers
rdsa_utils.helpers.pyspark
A selection of helper functions for building in pyspark.
calc_median_price(groups: Union[str, Sequence[str]], price_col: str = 'price') -> SparkCol
Calculate the median price per grouping level.
Parameters:
Name Type Description Default groups
Union[str, Sequence[str]]
The grouping levels for calculating the average price.
required price_col
str
Column name containing the product prices.
'price'
Returns:
Type Description Column
A single entry for each grouping level, and its median price.
convert_cols_to_struct_col(df: SparkDF, struct_col_name: str, struct_cols: Optional[Sequence[str]], no_struct_col_type: T.DataTypeSingleton = T.BooleanType(), no_struct_col_value: Any = None) -> SparkDF
Convert specified selection of columns to a single struct column.
As BigQuery tables do not take to having an empty struct column appended to them, this function will create a placeholder column to put into the struct column if no column names to combine are passed.
Parameters:
Name Type Description Default df
DataFrame
The input dataframe that contains the columns for combining.
required struct_col_name
str
The name of the resulting struct column.
required struct_cols
Optional[Sequence[str]]
A sequence of columns present in df for combining.
required no_struct_col_type
DataTypeSingleton
If no struct_cols are present, this is the type that the dummy column to place in the struct will be, default = BooleanType.
BooleanType()
no_struct_col_value
Any
If no struct_cols are present, this is the value that will be used in the dummy column, default = None.
None
Returns:
Type Description The input dataframe with the specified struct_cols dropped and replaced
with a single struct type column containing those columns.
Raises:
Type Description ValueError
If not all the specified struct_cols are present in df.
convert_struc_col_to_columns(df: SparkDF, convert_nested_structs: bool = False) -> SparkDF
Flatten struct columns in pyspark dataframe to individual columns.
Parameters:
Name Type Description Default df
DataFrame
Dataframe that may or may not contain struct type columns.
required convert_nested_structs
bool
If true, function will recursively call until no structs are left. Inversely, when false, only top level structs are flattened; if these contain subsequent structs they would remain.
False
Returns:
Type Description The input dataframe but with any struct type columns dropped, and in
its place the individual fields within the struct column as individual columns.
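Illustrative usage sketch (not from the original docstring; assumes df is a pre-defined PySpark DataFrame that may contain struct columns):
>>> flat_df = convert_struc_col_to_columns(df)
>>> # Recursively flatten nested structs as well
>>> fully_flat_df = convert_struc_col_to_columns(df, convert_nested_structs=True)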
create_colname_to_value_map(cols: Sequence[str]) -> SparkCol
Create a column name to value MapType column.
create_spark_session(app_name: Optional[str] = None, size: Optional[Literal['small', 'medium', 'large', 'extra-large']] = None, extra_configs: Optional[Dict[str, str]] = None) -> SparkSession
Create a PySpark Session based on the specified size.
This function creates a PySpark session with different configurations based on the size specified.
The size can be 'default', 'small', 'medium', 'large', or 'extra-large'. Extra Spark configurations can be passed as a dictionary. If no size is given, then a basic Spark session is spun up.
Parameters:
Name Type Description Default app_name
Optional[str]
The spark session app name.
None
size
Optional[Literal['small', 'medium', 'large', 'extra-large']]
The size of the spark session to be created. It can be 'default', 'small', 'medium', 'large', or 'extra-large'.
None
extra_configs
Optional[Dict[str, str]]
Mapping of additional spark session config settings and the desired value for it. Will override any default settings.
None
Returns:
Type Description SparkSession
The created PySpark session.
Raises:
Type Description ValueError
If the specified 'size' parameter is not one of the valid options: 'small', 'medium', 'large', or 'extra-large'.
Exception
If any other error occurs during the Spark session creation process.
Examples:
>>> spark = create_spark_session(size='medium', extra_configs={'spark.ui.enabled': 'false'})
Session Details:
'small': This is the smallest session that will realistically be used. It uses only 1g of memory, 3 executors, and only 1 core. The number of partitions is limited to 12, which can improve performance with smaller data. It's recommended for simple data exploration of small survey data, or for training and demonstrations when several people need to run Spark sessions simultaneously.
'medium': A standard session used for analysing survey or synthetic datasets. Also used for some Production pipelines based on survey and/or smaller administrative data. It uses 6g of memory, 3 executors, and 3 cores. The number of partitions is limited to 18, which can improve performance with smaller data.
'large': Session designed for running Production pipelines on large administrative data, rather than just survey data. It uses 10g of memory, 5 executors, 1g of memory overhead, and 5 cores. It uses the default number of 200 partitions.
'extra-large': Used for the most complex pipelines, with huge administrative data sources and complex calculations. It uses 20g of memory, 12 executors, 2g of memory overhead, and 5 cores. It uses 240 partitions; not significantly higher than the default of 200, but it is best for these to be a multiple of cores and executors.
References
The session sizes and their details are taken directly from the following resource: "https://best-practice-and-impact.github.io/ons-spark/spark-overview/example-spark-sessions.html"
cut_lineage(df: SparkDF) -> SparkDF
Convert the SparkDF to a Java RDD and back again.
This function is helpful in instances where Catalyst optimizer is causing memory errors or problems, as it only tries to optimize till the conversion point.
Note: This uses internal members and may break between versions.
Parameters:
Name Type Description Default df
DataFrame
SparkDF to convert.
required Returns:
Type Description DataFrame
New SparkDF created from Java RDD.
Raises:
Type Description Exception
If any error occurs during the lineage cutting process, particularly during conversion between SparkDF and Java RDD or accessing internal members.
Examples:
>>> df = rdd.toDF()
>>> new_df = cut_lineage(df)
>>> new_df.count()
3
find_spark_dataframes(locals_dict: Dict[str, Union[SparkDF, Dict]]) -> Dict[str, Union[SparkDF, Dict]]
Extract SparkDF objects from a given dictionary.
This function scans the dictionary and returns another containing only entries where the value is a SparkDF. It also handles dictionaries within the input, including them in the output if their first item is a SparkDF.
Designed to be used with locals() in Python, allowing extraction of all SparkDF variables in a function's local scope.
Parameters:
Name Type Description Default locals_dict
Dict[str, Union[DataFrame, Dict]]
A dictionary usually returned by locals(), with variable names as keys and their corresponding objects as values.
required Returns:
Type Description Dict
A dictionary with entries from locals_dict where the value is a SparkDF or a dictionary with a SparkDF as its first item.
Examples:
>>> dfs = find_spark_dataframes(locals())
get_window_spec(partition_cols: Optional[Union[str, Sequence[str]]] = None, order_cols: Optional[Union[str, Sequence[str]]] = None) -> WindowSpec
Return ordered and partitioned WindowSpec, defaulting to whole df.
Particularly useful when you don't know if the variable being used for partition_cols will contain values or not in advance.
Parameters:
Name Type Description Default partition_cols
Optional[Union[str, Sequence[str]]]
If present the columns to partition a spark dataframe on.
None
order_cols
Optional[Union[str, Sequence[str]]]
If present the columns to order a spark dataframe on (where order in sequence is order that orderBy is applied).
None
Returns:
Type Description WindowSpec
The WindowSpec object to be applied.
Usage
window_spec = get_window_spec(...)
F.sum(values).over(window_spec)
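A fuller illustrative sketch (not from the original docstring; placeholder column names, and assumes pyspark.sql.functions is imported as F):
>>> window_spec = get_window_spec(partition_cols=['group'], order_cols=['date'])
>>> df = df.withColumn('group_total', F.sum('value').over(window_spec))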
is_df_empty(df: SparkDF) -> bool
Check whether a spark dataframe contains any records.
load_csv(spark: SparkSession, filepath: str, keep_columns: Optional[List[str]] = None, rename_columns: Optional[Dict[str, str]] = None, drop_columns: Optional[List[str]] = None, **kwargs) -> SparkDF
Load a CSV file into a PySpark DataFrame.
Parameters:
spark: Active SparkSession.
filepath: The full path and filename of the CSV file to load.
keep_columns: A list of column names to keep in the DataFrame, dropping all others. Default value is None.
rename_columns: A dictionary to rename columns where keys are existing column names and values are new column names. Default value is None.
drop_columns: A list of column names to drop from the DataFrame. Default value is None.
kwargs: Additional keyword arguments to pass to the spark.read.csv method.
Returns:
Type Description DataFrame
PySpark DataFrame containing the data from the CSV file.
Notes
Transformation order:
1. Columns are kept according to keep_columns.
2. Columns are dropped according to drop_columns.
3. Columns are renamed according to rename_columns.
Raises:
Type Description Exception
If there is an error loading the file.
ValueError
If a column specified in rename_columns, drop_columns, or keep_columns is not found in the DataFrame.
Examples:
Load a CSV file with multiline and rename columns:
>>> df = load_csv(
...     spark,
...     "/path/to/file.csv",
...     multiLine=True,
...     rename_columns={"old_name": "new_name"}
... )
Load a CSV file with a specific encoding:
>>> df = load_csv(spark, "/path/to/file.csv", encoding="ISO-8859-1")
Load a CSV file and keep only specific columns:
>>> df = load_csv(spark, "/path/to/file.csv", keep_columns=["col1", "col2"])
Load a CSV file and drop specific columns:
>>> df = load_csv(spark, "/path/to/file.csv", drop_columns=["col1", "col2"])
Load a CSV file with custom delimiter and multiline:
>>> df = load_csv(spark, "/path/to/file.csv", sep=";", multiLine=True)
map_column_names(df: SparkDF, mapper: Mapping[str, str]) -> SparkDF
Map column names to the given values in the mapper.
If the column name is not in the mapper the name doesn't change.
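Illustrative usage sketch (not from the original docstring; assumes df is a pre-defined SparkDF and the column names are placeholders):
>>> mapper = {'old_name': 'new_name'}
>>> df = map_column_names(df, mapper)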
melt(df: SparkDF, id_vars: Union[str, Sequence[str]], value_vars: Union[str, Sequence[str]], var_name: str = 'variable', value_name: str = 'value') -> SparkDF
Melt a spark dataframe in a pandas like fashion.
Parameters:
Name Type Description Default df
DataFrame
The pyspark dataframe to melt.
required id_vars
Union[str, Sequence[str]]
The names of the columns to use as identifier variables.
required value_vars
Union[str, Sequence[str]]
The names of the columns containing the data to unpivot.
required var_name
str
The name of the target column containing variable names (i.e. the original column names).
'variable'
value_name
str
The name of the target column containing the unpivoted data.
'value'
Returns:
Type Description DataFrame
The "melted" input data as a pyspark data frame.
Examples:
>>> df = spark.createDataFrame(
...     [[1, 2, 3, 4],
...      [5, 6, 7, 8],
...      [9, 10, 11, 12]],
...     ["col1", "col2", "col3", "col4"])
>>> melt(df=df, id_vars="col1", value_vars=["col2", "col3"]).show()
+----+--------+-----+
|col1|variable|value|
+----+--------+-----+
|   1|    col2|    2|
|   1|    col3|    3|
|   5|    col2|    6|
|   5|    col3|    7|
|   9|    col2|   10|
|   9|    col3|   11|
+----+--------+-----+
>>> melt(df=df, id_vars=["col1", "col2"], value_vars=["col3", "col4"]
... ).show()
+----+----+--------+-----+
|col1|col2|variable|value|
+----+----+--------+-----+
|   1|   2|    col3|    3|
|   1|   2|    col4|    4|
|   5|   6|    col3|    7|
|   5|   6|    col4|    8|
|   9|  10|    col3|   11|
|   9|  10|    col4|   12|
+----+----+--------+-----+
rank_numeric(numeric: Union[str, Sequence[str]], group: Union[str, Sequence[str]], ascending: bool = False) -> SparkCol
Rank a numeric and assign a unique value to each row.
The F.row_number() method has been selected as the ranking method, as it gives a unique number to each row. Other methods such as F.rank() and F.dense_rank() do not assign unique values per row.
Parameters:
Name Type Description Default numeric
Union[str, Sequence[str]]
The column name or list of column names containing values which will be ranked.
required group
Union[str, Sequence[str]]
The grouping levels to rank the numeric column or columns over.
required ascending
bool
Dictates whether high or low values are ranked as the top value.
False
Returns:
Type Description Column
Contains a rank for the row in its grouping level.
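Illustrative usage sketch (not from the original docstring; column names are placeholders, and attaching the returned column with withColumn is an assumed usage pattern):
>>> df = df.withColumn('price_rank', rank_numeric('price', 'item_group'))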
select_first_obs_appearing_in_group(df: SparkDF, group: Sequence[str], date_col: str, ascending: bool) -> SparkDF
Rank and select observation in group based on earliest or latest date.
Given that there can be multiple observations per group, select observation that appears first or last (depending on whether ascending is set to True or False, respectively).
Parameters:
Name Type Description Default df
DataFrame
The input dataframe that contains the group and date_col.
required group
Sequence[str]
The grouping levels required to find the observation that appears first or last (depending on whether ascending is set to True or False, respectively)
required date_col
str
Column name containing the dates of each observation.
required ascending
bool
Dictates whether first or last observation within a grouping is selected (depending on whether ascending is set to True or False, respectively).
required Returns:
Type Description DataFrame
The input dataframe that contains each observation per group that appeared first or last (depending on whether ascending is set to True or False, respectively) according to date_col.
set_df_columns_nullable(df: SparkDF, column_list: List[str], nullable: Optional[bool] = True) -> SparkDF
Change specified columns nullable value.
Sometimes Spark creates columns with the nullable attribute set to False, which can cause issues if the dataframe is saved to a table, as the schema for that column will then not allow missing values.
Changing this attribute for a column appears to be very difficult (and also potentially costly [see the Stack Overflow answer comments] - SO USE ONLY IF NEEDED).
The solution implemented is taken from Stack Overflow post: https://stackoverflow.com/a/51821437
Parameters:
Name Type Description Default df
DataFrame
The dataframe with columns to have nullable attribute changed.
required column_list
List[str]
List of columns to change nullable attribute.
required nullable
Optional[bool]
The value to set the nullable attribute to for the specified columns.
True
Returns:
Type Description DataFrame
The input dataframe but with nullable attribute changed for specified columns.
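Illustrative usage sketch (not from the original docstring; assumes df is a pre-defined SparkDF containing the listed columns):
>>> df = set_df_columns_nullable(df, ['col_a', 'col_b'], nullable=True)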
to_list(df: SparkDF) -> List[Union[Any, List[Any]]]
Convert Spark DF to a list.
Returns:
Type Description list or list of lists
If the input DataFrame has a single column then a list of column values will be returned. If the DataFrame has multiple columns then a list of row data as lists will be returned.
to_spark_col(_func=None, *, exclude: Sequence[str] = None) -> Callable
Convert str args to Spark Column if not already.
Usage
Use as a decorator on a function.
To convert all string arguments to spark columns:
@to_spark_col
def my_func(arg1, arg2)
To exclude a string argument from being converted to a spark column:
@to_spark_col(exclude=['arg2'])
def my_func(arg1, arg2)
transform(self, f, *args, **kwargs)
Chain Pyspark function.
truncate_external_hive_table(spark: SparkSession, table_identifier: str) -> None
Truncate an External Hive table stored on S3 or HDFS.
Parameters:
Name Type Description Default spark
SparkSession
Active SparkSession.
required table_identifier
str
The name of the Hive table to truncate. This can either be in the format '<database>.<table>' or simply '<table>' if the current Spark session has a database set.
required Returns:
Type Description None
This function does not return any value. It performs an action of truncating the table.
Raises:
Type Description ValueError
If the table name is incorrectly formatted, the database is not provided when required, or if the table does not exist.
AnalysisException
If there is an issue with partition operations or SQL queries.
Exception
If there is a general failure during the truncation process.
Examples:
Truncate a Hive table named 'my_database.my_table':
>>> truncate_external_hive_table(spark, 'my_database.my_table')
Or, if the current Spark session already has a database set:
>>> spark.catalog.setCurrentDatabase('my_database')
>>> truncate_external_hive_table(spark, 'my_table')
unpack_list_col(df: SparkDF, list_col: str, unpacked_col: str) -> SparkDF
Unpack a spark column containing a list into multiple rows.
Parameters:
Name Type Description Default df
DataFrame
Contains the list column to unpack.
required list_col
str
The name of the column which contains lists.
required unpacked_col
str
The name of the column containing the unpacked list items.
required Returns:
Type Description DataFrame
Contains a new row for each unpacked list item.
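Illustrative usage sketch (not from the original docstring; assumes df is a pre-defined SparkDF with a list-type column called 'tags'):
>>> exploded_df = unpack_list_col(df, list_col='tags', unpacked_col='tag')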
rdsa_utils.helpers.python
Miscellaneous helper functions for Python.
always_iterable_local(obj: Any) -> Callable
Supplement more-itertools' always_iterable to also exclude dicts.
By default it would convert a dictionary to an iterable of just its keys, dropping all the values. This change makes it so dictionaries are not altered (similar to how strings aren't broken down).
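Illustrative sketch of the behaviour described above (the input values are placeholders):
>>> list(always_iterable_local([1, 2, 3]))
[1, 2, 3]
>>> list(always_iterable_local('abc'))
['abc']
>>> list(always_iterable_local({'key': 'value'}))
[{'key': 'value'}]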
calc_product_of_dict_values(**kwargs: Mapping[str, Union[str, float, Iterable]]) -> Mapping[str, any]
Create cartesian product of values for each kwarg.
In order to create product of values, the values are converted to a list so that product of values can be derived.
Yields:
Type Description Next result of cartesian product of kwargs values.
Example
my_dict = { 'key1': 1, 'key2': [2, 3, 4] }
list(calc_product_of_dict_values(**my_dict))
[{'key1': 1, 'key2': 2}, {'key1': 1, 'key2': 3}, {'key1': 1, 'key2': 4}]
Notes
Modified from: https://stackoverflow.com/a/5228294
convert_date_strings_to_datetimes(start_date: str, end_date: str) -> Tuple[pd.Timestamp, pd.Timestamp]
Convert start and end dates from strings to timestamps.
Parameters:
Name Type Description Default start_date
str
Datetime like object which is used to define the start date for filter. Acceptable string formats include (but not limited to): MMMM YYYY, YYYY-MM, YYYY-MM-DD, DD MMM YYYY etc. If only month and year specified the start_date is set as first day of month.
required end_date
str
Datetime like object which is used to define the end date for the filter. Acceptable string formats include (but not limited to): MMMM YYYY, YYYY-MM, YYYY-MM-DD, DD MMM YYYY etc. If only month and year specified the end_date is set as final day of month.
required Returns:
Type Description tuple[Timestamp, Timestamp]
Tuple where the first value is the start date and the second the end date.
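Illustrative usage sketch (not from the original docstring; dates are placeholders):
>>> start, end = convert_date_strings_to_datetimes('January 2021', 'March 2021')
>>> # start is the first day of January 2021, end is the final day of March 2021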
extend_lists(sections: List[List[str]], elements_to_add: List[str]) -> None
Check list elements are unique then append to existing list.
Note that the .extend method in Python modifies each section in place. There is no need to assign a variable to this function; the section will update automatically.
The function can be used with the load_config function to extend a value list in a config yaml file. For example, with a config.yaml file as per below:
input_columns:
  - col_a
  - col_b
output_columns:
  - col_b
To add column col_c the function can be utilised as follows:
config = load_config("config.yaml")
sections = [config['input_columns'], config['output_columns']]
elements_to_add = ['col_c']
extend_lists(sections, elements_to_add)
The output will be as follows.
input_columns:
  - col_a
  - col_b
  - col_c
output_columns:
  - col_b
  - col_c
Parameters:
Name Type Description Default sections
List[List[str]]
The section to be updated with the extra elements.
required elements_to_add
List[str]
The new elements to add to the specified sections.
required Returns:
Type Description None
Note that the .extend method in Python modifies the sections in place. There is no need to assign a variable to this function; the section will update automatically.
list_convert(obj: Any) -> List[Any]
Convert object to list using more-itertools' always_iterable.
overwrite_dictionary(base_dict: Mapping[str, Any], override_dict: Mapping[str, Any]) -> Dict[str, Any]
Overwrite dictionary values with user defined values.
The following restrictions are in place:
- If base_dict and override_dict have the same key and the value is not a dictionary, then override_dict has priority.
- If base_dict contains a dictionary and override_dict contains a value (e.g. string or list) with the same key, priority is given to base_dict and the override_dict value is ignored.
- If a key is in override_dict but not in base_dict, then an Exception is raised and the code stops.
- Any other restrictions will require code changes.
Parameters:
Name Type Description Default base_dict
Mapping[str, Any]
Dictionary containing existing key value pairs.
required override_dict
Mapping[str, Any]
Dictionary containing new keys/values to inset into base_dict.
required Returns:
Type Description Dict[str, Any]
The base_dict with any keys matching the override_dict being replaced. Any keys not present in base_dict are appended.
Example
dic1 = {"var1": "value1", "var2": {"var3": 1.1, "var4": 4.4}, "var5": [1, 2, 3]} dic2 = {"var2": {"var3": 9.9}}
overwrite_dictionary(dic1, dic2) {'var1': 'value1', 'var2': {'var3': 9.9, 'var4': 4.4}, 'var5': [1, 2, 3]}
dic3 = {"var2": {"var3": 9.9}, "var6": -1} overwrite_dictionary(dic1, dic3) ERROR main: ('var6', -1) not in base_dict
Notes
Modified from: https://stackoverflow.com/a/58742155
Warning
Due to recursive nature of function, the function will overwrite the base_dict object that is passed into the original function.
Raises:
Type Description ValueError
If a key is present in override_dict but not base_dict.
tuple_convert(obj: Any) -> Tuple[Any]
Convert object to tuple using more-itertools' always_iterable.
IO
rdsa_utils.io.config
Module for code relating to loading config files.
LoadConfig(config_path: Union[CloudPath, Path], config_overrides: Optional[Config] = None, config_type: Optional[Literal['json', 'toml', 'yaml']] = None, config_validators: Optional[Dict[str, BaseModel]] = None)
Class for loading and storing a configuration file.
Attributes:
Name Type Description config
The loaded config stored as a dictionary.
config_dir
The logical parent directory of the loaded config_path.
config_file
The file name of the loaded config_path.
config_original
The configuration dictionary as initially loaded, prior to applying any overrides or validation.
config_overrides
The configuration override dictionary, if provided.
config_path
The path of the loaded config file.
config_type
The file type of the loaded config file.
config_validators
The validators used to validate the loaded config, if provided.
**attrs
Every top level key in the loaded config is also set as an attribute to allow simpler access to each config section.
Init method.
Parameters:
Name Type Description Default config_path
Union[CloudPath, Path]
The path of the config file to be loaded.
required config_overrides
Optional[Config]
A dictionary containing a subset of the keys and values of the config file that is initially loaded, by default None. If values are provided that are not in the initial config then a ConfigError is raised.
None
config_type
Optional[Literal['json', 'toml', 'yaml']]
The file type of the config file being loaded, by default None. If not specified then this is inferred from the config_path.
None
config_validators
Optional[Dict[str, BaseModel]]
A dictionary made up of key, value pairs where the keys refer to the top level sections of the loaded config, and the values are a pydantic validation class for the section, by default None. If only some of the keys are specified with validators, warnings are raised to alert that they have not been validated.
None
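Illustrative usage sketch (not from the original docstring; the config path and the override key are placeholders, and the override key must already exist in the loaded config):
>>> from pathlib import Path
>>> config = LoadConfig(
...     Path('config.yaml'),
...     config_overrides={'input_columns': ['col_a', 'col_b']},
... )
>>> config.config  # the full loaded config as a dictionary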
rdsa_utils.io.input
Module containing generic input functionality code.
parse_json(data: str) -> Config
Parse JSON formatted string into a dictionary.
Parameters:
Name Type Description Default data
str
String containing standard JSON-formatted data.
required Returns:
Type Description Config
A dictionary containing the parsed data.
Raises:
Type Description JSONDecodeError
If the string format of config_overrides cannot be decoded by json.loads (i.e. converted to a dictionary).
parse_toml(data: str) -> Config
Parse TOML formatted string into a dictionary.
Parameters:
Name Type Description Default data
str
String containing standard TOML-formatted data.
required Returns:
Type Description Config
A dictionary containing the parsed data.
parse_yaml(data: str) -> Config
Parse YAML formatted string into a dictionary.
Parameters:
Name Type Description Default data
str
String containing standard YAML-formatted data.
required Returns:
Type Description Config
A dictionary containing the parsed data.
read_file(file: Union[CloudPath, Path]) -> str
Load contents of specified file.
Parameters:
Name Type Description Default file
Union[CloudPath, Path]
The absolute file path of the file to be read.
required Returns:
Type Description str
The contents of the provided file.
Raises:
Type Description FileNotFoundError
If the provided file does not exist.
rdsa_utils.io.output
Module containing generic output functionality code.
zip_folder(source_dir: str, output_filename: str, overwrite: bool = False) -> bool
Zip the contents of the specified directory.
Parameters:
Name Type Description Default source_dir
str
The directory whose contents are to be zipped.
required output_filename
str
The output zip file name. It must end with '.zip'.
required overwrite
bool
If True, overwrite the existing zip file if it exists. Default is False.
False
Returns:
Type Description bool
True if the directory was zipped successfully, False otherwise.
Examples:
>>> zip_folder('/path/to/source_dir', 'output.zip', overwrite=True)
True
Methods
rdsa_utils.methods.averaging_methods
Weighted and unweighted averaging functions.
get_weight_shares(weights: str, levels: Optional[Union[str, Sequence[str]]] = None) -> SparkCol
Divide weights by sum of weights for each group.
unweighted_arithmetic_average(val: str) -> SparkCol
Calculate the unweighted arithmetic average.
unweighted_geometric_average(val: str) -> SparkCol
Calculate the unweighted geometric average.
weighted_arithmetic_average(val: str, weight: str) -> SparkCol
Calculate the weighted arithmetic average.
weighted_geometric_average(val: str, weight: str) -> SparkCol
Calculate the weighted geometric average.
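Illustrative sketch of how these helpers might be combined (not from the original docstrings; column names are placeholders, df is assumed to be a pre-defined SparkDF, and using the returned columns via withColumn and groupBy().agg() is an assumed usage pattern rather than documented behaviour):
>>> # Derive per-group weight shares, then aggregate prices per group
>>> df = df.withColumn('weight_share', get_weight_shares('weight', 'group'))
>>> summary = df.groupBy('group').agg(
...     unweighted_arithmetic_average('price').alias('mean_price'),
...     weighted_arithmetic_average('price', 'weight_share').alias('weighted_mean_price'),
... )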