dataquality.utils package#

Subpackages#

Submodules#

dataquality.utils.arrow module#

save_arrow_file(location, file_name, data)#

Helper function to save a dictionary as an arrow file that can be read by vaex

Return type:

None
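
Example (a minimal usage sketch; the directory, file name, and data here are hypothetical):

import vaex
from dataquality.utils.arrow import save_arrow_file

save_arrow_file("/tmp/dq_run", "batch_0.arrow", {"id": [0, 1, 2], "text": ["a", "b", "c"]})
df = vaex.open("/tmp/dq_run/batch_0.arrow")  # vaex can read the saved file back as a dataframe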

dataquality.utils.auth module#

headers(token)#

Return authorization headers for API requests

Return type:

Dict[str, str]
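
Example (a minimal sketch; the token is hypothetical and the exact header keys are not specified above, only that a Dict[str, str] is returned):

from dataquality.utils.auth import headers

auth_headers = headers("my-jwt-token")  # pass the resulting dict to your HTTP client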

dataquality.utils.auto module#

sample_dataset_dict(dd, dataset_config, max_train_size=None)#

Samples the dataset dict to the max train size

A few important notes:

  • If max train size is greater than the dataset size, we don’t sample

  • If max train size is None we also don’t sample

  • We set max eval size to be 25% of max train size

  • Test and inference data are not sampled

Return type:

DatasetDict

get_meta_cols(cols, reserved_cols=None)#

Returns the meta columns of a dataset.

Return type:

List[str]

load_data_from_str(data)#

Loads string data from either hf or disk.

The string corresponds to either a path to a local file or a path to a remote huggingface Dataset that we load with load_dataset

We can tell which it is based on whether there is a file extension at the end of the path

Return type:

Union[DataFrame, Dataset]
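
Example (a minimal sketch illustrating the two documented cases; the local file name is hypothetical):

from dataquality.utils.auto import load_data_from_str

df = load_data_from_str("train.csv")  # has a file extension -> loaded from disk as a DataFrame
ds = load_data_from_str("imdb")       # no extension -> treated as a HuggingFace dataset and loaded with load_dataset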

add_val_data_if_missing(dd, task_type=None)#

Splits user provided training data if missing

We need validation data in order to train a model properly, and it’s required to enable early stopping. If the user didn’t provide that val data, simply split their train data 80/20 for validation data.

If there is test data, we can use the test as our val data (because that’s also pretty common)

Return type:

DatasetDict
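
Example (a minimal sketch with a toy DatasetDict that has no validation split):

from datasets import Dataset, DatasetDict
from dataquality.utils.auto import add_val_data_if_missing

dd = DatasetDict({"train": Dataset.from_dict({"text": ["a", "b", "c", "d"], "label": [0, 1, 0, 1]})})
dd = add_val_data_if_missing(dd)  # adds a validation split (an 80/20 split of train) if none exists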

run_name_from_hf_dataset(name)#
Return type:

str

get_task_type_from_data(hf_data=None, train_data=None)#

Determines the task type of the dataset by the dataset contents

Text classification will have text and label and NER will have tokens and tags/ner_tags

We know that one of these two parameters will be not None because that is validated before calling this function. See dq.auto

Return type:

TaskType

set_global_logging_level(level=40, prefices=None)#

Override logging levels of different modules based on their name as a prefix. It needs to be invoked after the modules have been loaded so that their loggers have been initialized.

Src: https://github.com/huggingface/transformers/issues/3050#issuecomment-682167272

Parameters:
  • level (-) – desired level. e.g. logging.INFO. Optional. Default is logging.ERROR

  • prefices (-) – list of one or more str prefices to match (e.g. [“transformers”, “torch”]). Optional. Default is [“”] to match all active loggers. The match is a case-sensitive module_name.startswith(prefix)

Return type:

None
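
Example (a minimal sketch mirroring the documented parameters):

import logging
from dataquality.utils.auto import set_global_logging_level

# silence all loggers whose module name starts with "transformers" or "datasets"
set_global_logging_level(logging.ERROR, prefices=["transformers", "datasets"])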

add_class_label_to_dataset(ds, labels=None)#

Map a ‘label’ column that is not a ClassLabel to a ClassLabel, if possible

Return type:

Dataset

dataquality.utils.auto_trainer module#

do_train(trainer, encoded_data, wait, create_data_embs=None)#
Return type:

Trainer

dataquality.utils.cuda module#

cuml_available()#
Return type:

bool

get_pca_embeddings(embs)#

Uses Cuda and GPUs to create the PCA embeddings before uploading

Should only be called if cuML is available (see cuml_available())

Returns the PCA embeddings, the components_ of the pca model, and the mean_ of the pca model

Return type:

Tuple[ndarray, ndarray, ndarray]

get_umap_embeddings(embs)#

Uses Cuda and GPUs to create the UMAP embeddings before uploading

Should only be called if cuML is available (see cuml_available())

Return type:

ndarray
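
Example (a minimal sketch; requires an NVIDIA GPU with cuML installed, and the embedding matrix here is random toy data):

import numpy as np
from dataquality.utils.cuda import cuml_available, get_pca_embeddings, get_umap_embeddings

embs = np.random.rand(10_000, 768).astype(np.float32)
if cuml_available():
    pca_embs, components, mean = get_pca_embeddings(embs)
    xy = get_umap_embeddings(pca_embs)  # 2-d coordinates for visualization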

dataquality.utils.cv module#

dataquality.utils.cv_smart_features module#

analyze_image_smart_features(image_path, hasher=None)#

Evaluate whether the image contains anomalies (too blurry, overexposed, etc.) and return a dictionary storing a quantitative value for every such feature. These values have to be further thresholded (in the next method) to return a boolean.

If no hasher is passed, the hash will be set to the empty string “”.

Return type:

Dict[str, Any]

analyze_image_smart_features_wrapper(hasher=None)#

Wrapper around analyze_image_smart_features to allow calling with only the path argument and making it easier to parallelize.

Return type:

Callable

generate_smart_features(in_frame, n_cores=-1)#

Create and return a dataframe containing the smart features on images (blurriness, contrast, etc).

Can run in parallel if n_cores is specified and different from 1. To use all available cores set n_cores = -1.

Return type:

DataFrame
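
Example (a minimal sketch; the image paths are hypothetical and the expected path column name is an assumption here):

import numpy as np
import vaex
from dataquality.utils.cv_smart_features import generate_smart_features

df = vaex.from_arrays(path=np.array(["images/0.jpg", "images/1.jpg"]))
features_df = generate_smart_features(df, n_cores=1)  # blurriness, contrast, etc. per image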

dataquality.utils.dq_logger module#

class CustomSplitAdapter(logger, extra=None)#

Bases: LoggerAdapter

This adapter appends the split to the message, if found. Otherwise, “None”

Adapted from https://docs.python.org/3/howto/logging-cookbook.html (CustomAdapter)

Initialize the adapter with a logger and a dict-like object which provides contextual information. This constructor signature allows easy stacking of LoggerAdapters, if so desired.

You can effectively pass keyword arguments as shown in the following example:

adapter = LoggerAdapter(someLogger, dict(p1=v1, p2="v2"))

process(msg, kwargs)#

Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.

Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.

Return type:

Tuple[str, Any]

get_dq_logger()#

Returns the dq logger for the current run_id

Return type:

CustomSplitAdapter

dq_log_file_path(run_id=None)#
Return type:

str

remove_dq_log_file(run_id=None)#
Return type:

None

dq_log_object_name(project_id, run_id)#

Returns the minio/s3 object name

Return type:

str

upload_dq_log_file()#
Return type:

None

get_dq_log_file(project_name=None, run_name=None)#
Return type:

Optional[str]

dataquality.utils.dqyolo module#

extract_value(arguments, key)#

Extract the value of the key from the arguments.

Parameters:
  • arguments (List[str]) – The arguments of the command line.

  • key (str) – The key to extract.

Return type:

Optional[str]

Returns:

The value of the key.

get_dataset_path(arguments)#

Extract the dataset path from the arguments of yolo.

Parameters:

arguments (List) – The arguments of ultralytics yolo.

Return type:

str

Returns:

The path to the dataset.

get_model_path(arguments)#

Extract the model path from the arguments of yolo.

Parameters:

arguments (List) – The arguments of ultralytics yolo.

Return type:

str

Returns:

The path to the model.

get_iou_thres(arguments)#

Extract the iou threshold from the arguments of yolo.

Parameters:

arguments (List) – The arguments of ultralytics yolo.

Return type:

float

Returns:

The iou threshold.

get_conf_thres(arguments)#

Extract the confidence threshold from the arguments of yolo.

Parameters:

arguments (List) – The arguments of ultralytics yolo.

Return type:

float

Returns:

The confidence threshold.

find_last_run(files_start, files_end)#

Find the path of the last run. This assumes that the path is the only thing that has changed.

Parameters:
  • files_start (List) – The list of files before the run.

  • files_end (List) – The list of files after the run.

Return type:

str

Returns:

The path to the last run.

validate_args(arguments)#

Validate the arguments of the command line.

Parameters:

arguments (List) – The arguments of the command line.

Return type:

None

dataquality.utils.emb module#

get_concat_emb_df(run_dir, split_epoch)#

Returns the concatenated embedding df for all available (non inf) splits

Return type:

DataFrame

save_processed_emb_dfs(df_emb, split_epoch, run_dir)#

Saves the concatenated embeddings to their respective split locations

The df has a column that has the split name, so we filter the dataframe on split, and write it back to where we read it from, but now containing the pca and xy embeddings

Return type:

None

apply_umap_to_embs(run_dir, last_epoch)#

In the event that the user has Nvidia cuml installed, we apply UMAP locally

We take advantage of the user’s GPU to apply UMAP significantly faster than if we were to do it on the Galileo server

We want to only apply UMAP (and PCA) to the final epoch of the split (or the last_epoch if specified).

Return type:

None

upload_umap_data_embs(project_id, run_id, input_data_dir, run_dir, last_epoch, data_embs_col)#

Given the location to _all_ input text, create and upload the data embs

Read in all of the text samples from all splits, create the data embeddings, apply PCA over all samples, then UMAP, then upload each split to its respective location in object storage.

Data embeddings are _always_ stored in the final epoch of the dataset, so we use get_last_epoch_for_splits to find that for each split. Similarly, for inference, we store it in the inference name. So when the split is inference, we further filter the dataframe by inference name and upload the df.

Return type:

None

np_to_pa(embs)#

Converts a numpy column to pyarrow array

Return type:

ListArray

convert_np_to_pa(df, col)#

Vaex safe conversion of np_to_pa above

This function allows us to safely convert a high dimensional numpy array to a pyarrow array. Since HDF5 files cannot store multi-dimensional numpy arrays, in certain cases, such as embeddings in Seq2Seq, we must store them as pyarrow arrays.

We convert the column in a memory safe way using a vaex register function.

Return type:

DataFrame

convert_pa_to_np(df, col)#

Converts a pyarrow array col to numpy column

This function allows us to safely convert a high dimensional pyarrow array to a numpy array. It is the inverse of convert_np_to_pa and is used when calculating things such as similar_to on the PCA embeddings, which assumes numpy arrays.

We convert the column in a memory safe way using a vaex register function.

While this function is built primarily for 2d arrays, it will work for arrays of any dimension.

Return type:

DataFrame

dataquality.utils.file module#

get_extension_for_dir(dir_)#

Returns the file extension of all files in the directory

Raises exception if there are no files in the directory or if there are multiple file extensions

Return type:

str

get_file_extension(path)#

Returns the file extension

Return type:

str

get_largest_epoch_for_split(split_dir, last_epoch)#

Gets the latest epoch that is largest in size

In the event of early stopping, the last epoch won’t be the same size as the others. This checks for that, and returns the last epoch if no early stopping occurred, otherwise the second to last epoch

Return type:

int

get_largest_epoch_for_splits(run_dir, last_epoch)#

For each available (non inf) split, return the largest epoch in terms of bytes

The ‘largest’ epoch is the last epoch in the split, unless early stopping occurred, in which case it’s the second to last epoch

Parameters:
  • run_dir (str) – The location on disk to the run data

  • last_epoch (Optional[int]) – The user can optionally tell us to only upload up to a specific epoch. If they did, this will indicate that

Return type:

Dict[str, int]

get_last_epoch_for_splits(run_dir, last_epoch)#

For each available (non inf) split, return the last epoch of training

If last_epoch is provided, consider that as the last (if it exists)

Return type:

Dict[str, int]

dataquality.utils.hdf5_store module#

class HDF5Store(datapath, dataset, shape, dtype=<class 'numpy.float32'>, compression=None, chunk_len=1)#

Bases: object

Simple class to append values to an hdf5 file on disk.

Used to concatenate HDF5 files for vaex

Params:

datapath: filepath of h5 file
dataset: dataset name within the file
shape: dataset shape (not counting main/batch axis)
dtype: numpy dtype

Usage:

hdf5_store = HDF5Store('/tmp/hdf5_store.h5', 'X', shape=(20, 20, 3))
x = np.random.random(hdf5_store.shape)
hdf5_store.append(x)
hdf5_store.append(x)

Adapted From https://gist.github.com/wassname/a0a75f133831eed1113d052c67cf8633

append(values)#
Return type:

None

concat_hdf5_files(location, prob_only)#

Concatenates all hdf5 in a directory using an HDF5 store

Vaex stores a dataframe as an hdf5 file in a predictable format using groups

Each column gets its own group, following “/table/columns/{col}/data”

We can exploit that by concatenating our datasets with that structure, so vaex can open the final file as a single dataframe

Parameters:
  • location (str) – The directory containing the files

  • prob_only (bool) – If True, only the id, prob, and gold columns will be concatenated

Return type:

List[str]

dataquality.utils.helpers module#

check_noop(func)#

Checks if GALILEO_DISABLED is set. If so, the function call is skipped

https://peps.python.org/pep-0612/

Return type:

Callable[[ParamSpec(P)], Optional[TypeVar(T)]]
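
Example (a minimal sketch; per the docstring above, setting GALILEO_DISABLED turns the decorated call into a no-op):

import os
from dataquality.utils.helpers import check_noop

@check_noop
def log_epoch() -> None:
    print("logging an epoch")

os.environ["GALILEO_DISABLED"] = "True"
log_epoch()  # skipped and returns None while Galileo is disabled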

galileo_disabled()#
Return type:

bool

disable_galileo()#
Return type:

None

enable_galileo()#
Return type:

None

galileo_verbose_logging()#
Return type:

bool

enable_galileo_verbose()#
Return type:

None

disable_galileo_verbose()#
Return type:

None

wrap_fn(exist_func, hook_fn)#

Hook a function to a function call

Parameters:
  • exist_func (Callable) – The function to hook

  • hook_fn (Callable) – The hook function

Return type:

Callable

Returns:

The hooked function

Example:

# example debugger
def myobserver(orig_func, *args, **kwargs):
    # -----------------------
    # Your logic goes here
    # -----------------------
    print("debugging xyz")
    return orig_func(*args, **kwargs)

# hook the function
example_class.func = hook(example_class.func, myobserver)

map_indices_to_ids(id_map, indices)#

Maps the indices to the ids

Parameters:
  • id_map (List) – The list used for mapping indices to ids

  • indices (List) – The indices to map

Return type:

List

Returns:

The ids
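
Example (a minimal sketch with toy ids):

from dataquality.utils.helpers import map_indices_to_ids

id_map = [101, 102, 103, 104]
map_indices_to_ids(id_map, [0, 2])  # -> [101, 103]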

open_console_url(link='')#

Tries to open the console url in the browser, if possible.

This will work in local environments like jupyter or a python script, but won’t work in colab (because colab is running on a server, so there’s no “browser” to interact with). This also prints out the link for users to click so even in those environments they still have something to interact with.

Return type:

None

gpu_available()#
Return type:

bool

mps_available()#

Checks for an MPS compatible GPU on Apple machines.

This will enable Metal acceleration for model training when supported.

Return type:

bool

has_len(arr)#

Checks if an array has length

Array can be a list, numpy array, or tensorflow tensor. Tensorflow tensors don’t let you call len(); they throw a TypeError, so we catch that here and check the shape instead https://github.com/tensorflow/tensorflow/blob/master/tensorflow/… python/framework/ops.py#L929

Return type:

bool

dataquality.utils.hf_images module#

process_hf_image_feature_for_logging(dataset, imgs_colname)#
Return type:

Dataset

process_hf_image_paths_for_logging(dataset, imgs_location_colname)#
Return type:

Dataset

dataquality.utils.hf_tokenizer module#

extract_gold_spans_at_word_level(gold_sequence, schema)#

Extracts span level words from a gold sequence

Given a gold sequence [O, O, B-PER, I-PER, I-PER, …] -> extracts out spans at the word level

gold_sequence = ['B-LOC', 'I-LOC', 'I-LOC', 'I-LOC', 'I-LOC']
gold_spans = extract_gold_spans_at_word_level(gold_sequence)
# gold_spans -> [{'start': 0, 'end': 5, 'label': 'LOC'}]

Return type:

List[Dict]

class LabelTokenizer(ds, tokenizer, schema, label_names)#

Bases: object

Class that allows Galileo users to directly tokenize their data using a provided HF tokenizer, infers the schema based on provided labels, and aligns the labels for the tokenized data. Galileo automatically extracts the required input data and logs it.

initialize_sample(k)#
Return type:

None

update_text_token_indices(k, w_index_bpe, wid)#
Return type:

bool

adjust_labels_bpe(wid, w_index_bpe)#
Return type:

None

update_totals_for_sample(k)#
Return type:

None

update_tokenized_samples()#
Return type:

None

dataquality.utils.imports module#

torch_available()#
Return type:

bool

hf_available()#
Return type:

bool

tf_available()#
Return type:

bool

dataquality.utils.jsl module#

dataquality.utils.keras module#

build_empty_model()#
Return type:

Model

generate_indices(x, **kwargs)#

Generates the indices for the training and validation data.

Parameters:
  • x_len – The length of the training or validation data.

  • kwargs (Any) – The arguments to the fit method. Kwargs needed are:

  • batch_size

  • epochs

  • steps_per_epoch

  • class_weight

  • sample_weight

  • initial_epoch

Return type:

Dict[str, Any]

Returns:

A dictionary of epochs/steps including the indices.

generate_split(x_len, kwargs)#
Return type:

Dict

combine_with_default_kwargs(func, args=(), kwargs={})#

Combines the default kwargs with the provided kwargs, while incorporating the args.

Parameters:
  • signature – The signature of the function.

  • args (Tuple) – The args to the function.

  • kwargs (Dict[str, Any]) – The kwargs to the function.

Return type:

Dict[str, Any]

Returns:

A dictionary of the combined kwargs.

get_x_len(x)#
Return type:

Optional[int]

patch_proxy_call(input, obj, *args, **kwargs)#

Call the patched layer method (before call and after call).

Parameters:
  • input (Tensor) – The input to the layer.

  • obj (Layer) – The layer object.

  • args (Tuple) – The arguments to the layer.

  • kwargs (Any) – The keyword arguments to the layer.

Return type:

Any

Returns:

The output of the layer.

patch_layer_call(layer, before_call=None, after_call=None)#

Patch the layer call method to add before and after call hooks.

Parameters:
  • layer (Layer) – The layer to patch.

  • before_call (Optional[Callable]) – The before call hook.

  • after_call (Optional[Callable]) – The after call hook.

Return type:

None

save_input(store, layer, input, *args, **kwargs)#

Save the input to the store.

Parameters:
  • store (Dict[str, Any]) – The store to save the input to.

  • layer (Layer) – The layer that is being called.

  • input (Tensor) – The input to the layer.

  • args (Tuple) – The arguments to the layer.

  • kwargs (Any) – The keyword arguments to the layer.

Return type:

None

save_output(store, layer, input, output)#

Save the output to the store.

Parameters:
  • store (Dict[str, Any]) – The store to save the output to.

  • layer (Layer) – The layer that is being called.

  • input (Tensor) – The input to the layer.

  • output (Tensor) – The output of the layer.

Return type:

None
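
Example (a minimal sketch of wiring save_input/save_output into patch_layer_call via functools.partial; the toy model is hypothetical, and that the hooks are invoked as (layer, input, …) is an assumption here):

from functools import partial

import tensorflow as tf
from dataquality.utils.keras import patch_layer_call, save_input, save_output

model = tf.keras.Sequential([tf.keras.layers.Dense(4, input_shape=(8,))])
store: dict = {}

# bind the shared store up front so the hooks match the documented signatures above
patch_layer_call(
    model.layers[-1],
    before_call=partial(save_input, store),
    after_call=partial(save_output, store),
)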

get_batch_size(dataset)#

Get the batch size of a dataset. If the dataset is nested, recursively check for _batch_size.

Return type:

Optional[int]

add_indices(dataset)#

Add indices to a TensorFlow dataset.

Parameters:

dataset (Union[DatasetV2, Tensor]) – The dataset to add indices to.

Return type:

DatasetV2

Returns:

A zipped dataset with indices in the second column.

class FixDistributedDatasetPatch#

Bases: Patch

Fixes the dataset patch in TensorFlow 2.13.0

dataquality.utils.ml module#

compute_confidence(probs)#

Compute the confidences for a prob array

Where confidence is the max prob for a given token

probs - [n_tokens, n_classes]

Return type:

ndarray

Returns:

  • confidences [n_tokens]: confidence per token
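
Example (a minimal sketch with a toy probability matrix of 2 tokens and 3 classes):

import numpy as np
from dataquality.utils.ml import compute_confidence

probs = np.array([[0.1, 0.7, 0.2],
                  [0.5, 0.25, 0.25]])
compute_confidence(probs)  # -> array([0.7, 0.5]), the max prob per token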

compute_nll_loss(probs, gold_labels)#

Compute the NLLLoss for each token in probs

Assumes for now that probs is a matrix of probability vectors per token, NOT the logits. Thus we have to take the log since Negative Log-Likelihood Loss expects log probs.

probs - [n_tokens, n_classes] gold_labels - [n_tokens], each element is index of gold label for that token

Return type:

ndarray

Returns:

  • loss [n_tokens]: The loss per NER token

select_span_token_for_prob(probs, method, gold_labels=None)#

Select the representative token for a span’s prob vector

Based on the method provided, compute that metric for each of the NER tokens and then select the “representative” token.

probs - [n_tokens, n_classes] method - NERProbMethod (confidence or loss) gold_labels - [n_tokens], each element is index of gold label for that token

Return type:

Tuple[ndarray, Optional[int]]

Returns:

  • prob_token: Probability vector for selected token - shape[n_classes]

  • gold_label: The gold label index for the selected token (if method is loss)

dataquality.utils.name module#

validate_name(name, assign_random=False)#

Validates project/run name ensuring only letters, numbers, space, - and _

If no name is provided, a random name is generated

Return type:

str
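
Example (a minimal sketch; that an empty name triggers random generation is inferred from the docstring above):

from dataquality.utils.name import validate_name

validate_name("my_project-2024")       # only allowed characters, returned unchanged
validate_name("", assign_random=True)  # no name provided, so a random name is returned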

random_name()#
Return type:

str

dataquality.utils.od module#

Utils for Object Detection

scale_boxes(bboxes, img_size)#

Normalizes boxes to image size

Return type:

ndarray

convert_cxywh_xyxy(bboxes)#

Converts center point xywh boxes to xyxy. Can be in either int coords or 0-1

Return type:

ndarray

convert_tlxywh_xyxy(bboxes)#

Converts top left xywh boxes to xyxy. Can be in either integer coords or 0-1

Return type:

ndarray
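
Example (a minimal sketch with toy boxes):

import numpy as np
from dataquality.utils.od import convert_cxywh_xyxy, convert_tlxywh_xyxy

tl_boxes = np.array([[10.0, 20.0, 30.0, 40.0]])  # top-left x, y, width, height
convert_tlxywh_xyxy(tl_boxes)                    # -> [[10., 20., 40., 60.]]

c_boxes = np.array([[25.0, 40.0, 30.0, 40.0]])   # center x, y, width, height
convert_cxywh_xyxy(c_boxes)                      # -> [[10., 20., 40., 60.]]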

filter_arrays_and_concatenate(arrays, empty_dim=None)#

Filters out empty arrays and concatenates them

Return type:

ndarray

dataquality.utils.patcher module#

class RefManager(cleanup_func)#

Bases: object

Manage weak references to objects and call a cleanup function when the object is garbage collected.

Initialize the cleanup manager.

Parameters:

cleanup_func (Callable) – The function to call when an object is garbage collected.

register(instance)#

Register an object with the cleanup manager.

Parameters:

instance (Any) – The object to register.

Return type:

None

class Cleanup(cleanup_manager)#

Bases: object

Base class for objects that need to be cleaned up when they are garbage collected.

Register this object with the cleanup manager.

Parameters:

cleanup_manager (RefManager) – The cleanup manager to register with.

class Borg#

Bases: object

Borg class making class attributes global

class PatchManager#

Bases: Borg

Class to manage patches

add_patch(patch)#

Add patch to list of patches

Parameters:

patch (Patch) – patch to add

Return type:

None

unpatch()#

Unpatch all patches

Return type:

None

class Patch#

Bases: ABC

is_patch = True#
manager = <dataquality.utils.patcher.PatchManager object>#
name = 'patch'#
patch()#

Patch function. Calls the _patch function and adds the patch to the manager.

Return type:

None

unpatch()#

Unpatch function. Calls the _unpatch function and removes the patch from the manager.

Return type:

None
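
Example (a minimal sketch of a hypothetical Patch subclass; it assumes subclasses implement _patch/_unpatch, which patch()/unpatch() call per the docs above):

from dataquality.utils.patcher import Patch

class PrintPatch(Patch):
    name = "print_patch"

    def __init__(self, obj, attr):
        # remember the original attribute so it can be restored on unpatch
        self.obj, self.attr = obj, attr
        self.orig = getattr(obj, attr)

    def _patch(self):
        def wrapped(*args, **kwargs):
            print(f"calling {self.attr}")
            return self.orig(*args, **kwargs)
        setattr(self.obj, self.attr, wrapped)
        return self

    def _unpatch(self):
        setattr(self.obj, self.attr, self.orig)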

dataquality.utils.profiler module#

parse_exception_ipython(etype, evalue, tb)#

Parse exception for IPython

Return type:

Dict

parse_exception(etype, evalue, tb)#

Parse exception for Python

Return type:

Dict

get_device_info()#

Get device info. For example, Operating system, Python version, etc.

Return type:

Dict

change_function(func, handle_request)#

Change function to hook into the exception in python

Return type:

Callable

exception_from_error(error)#

Get stacktrace from an exception

Return type:

Tuple[Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]]

dataquality.utils.setfit module#

class DataSampleLogArgs(split, inference_name=None, meta=<factory>, texts=<factory>, ids=<factory>, labels=<factory>)#

Bases: object

split: Split#
inference_name: Optional[str] = None#
meta: Dict#
texts: List[str]#
ids: List[int]#
labels: List#
clear()#

Resets the arrays of the class.

Return type:

None

log_preds_setfit(model, dataset, split, dq_store, batch_size, meta=None, inference_name=None, return_preds=False, epoch=None)#

Logs the data samples and model outputs for a SetFit model.

Parameters:
  • model (SetFitModel) – The SetFit model

  • dataset (Dataset) – The dataset in the form of a HuggingFace Dataset

  • split (Split) – The split of the data samples (for example “training”)

  • dq_store (Dict) – The dataquality store

  • batch_size (int) – The batch size

  • inference_name (Optional[str]) – The name of the inference (for example “inference_run_1”)

  • return_preds (bool) – Whether to return the predictions

Return type:

Tensor

Returns:

The predictions

validate_setfit(setfit, labels, batch_size=None, meta=None)#

Validates a SetFit model.

Parameters:
  • setfit (Union[SetFitModel, SetFitTrainer]) – The SetFit model or trainer

  • labels (List[str]) – The labels of the model

  • wait – Whether to wait for the run to finish

  • batch_size (Optional[int]) – The batch size

  • meta (Optional[List[str]]) – The meta columns

Return type:

None

class SetFitModelHook(setfit_model, store=None, func_name='predict_proba', n_labels=None)#

Bases: Patch

Hook to SetFit model to store input and output of predict_proba function.

Parameters:
  • setfit_model (Any) – SetFit model

  • store (Optional[Dict]) – dictionary to store input and output

  • func_name (str) – name of function to hook

  • n_labels (Optional[int]) – number of labels

name = 'setfit_predict'#
get_trainer(dd, model_checkpoint, training_args)#
Return type:

SetFitTrainer

dataquality.utils.task_helpers module#

get_task_type(task_type=None)#
Return type:

TaskType

dataquality.utils.tf module#

is_tf_2()#
Return type:

bool

dataquality.utils.thread_pool module#

class ThreadPoolManager#

Bases: object

A class for managing the threaded logging calls throughout dataquality

THREADS: List[Thread] = []#
MAX_THREADS = 4#
static add_thread(target, args=None)#

Start a new function in a thread and store that in the global list of threads

Parameters:
  • target (Callable) – The callable

  • args (Optional[Iterable[Any]]) – The arguments to the function

Return type:

None

static wait_for_threads()#

Joins all currently active threads and waits for all to be done

Return type:

None

static wait_for_thread()#

Waits for an open slot in the ThreadPool for another thread.

Return type:

None
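
Example (a minimal sketch of running work in the managed thread pool):

from dataquality.utils.thread_pool import ThreadPoolManager

def upload_batch(batch_id):
    print(f"uploading batch {batch_id}")

ThreadPoolManager.add_thread(target=upload_batch, args=(0,))
ThreadPoolManager.add_thread(target=upload_batch, args=(1,))
ThreadPoolManager.wait_for_threads()  # block until all background threads finish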

dataquality.utils.torch module#

cleanup_cuda(model=None, optimizer=None, tensors=None)#

Cleanup cuda memory

Delete unused variables to free CUDA memory to ensure that dq.finish() does not run into out-of-memory errors.

Return type:

None

class ModelHookManager#

Bases: Borg

Manages hooks for models. Has the ability to find the layer automatically. Otherwise the layer or the layer name needs to be provided.


get_embedding_layer_auto(model)#

Use a scoring algorithm to find the embedding layer automatically given a model. The higher the score the more likely it is the embedding layer.

Return type:

Module

get_layer_by_name(model, name)#

Iterate over each layer and stop once the layer name matches

Parameters:
  • model (Module) – Model

  • name – string

Return type:

Module

attach_hooks_to_model(model, hook_fn, model_layer=None)#

Attach hook and save it in our hook list

Return type:

RemovableHandle

attach_classifier_hook(model, classifier_hook, model_layer=None)#

Attach hook and save it in our hook list

Return type:

RemovableHandle

attach_hook(selected_layer, hook)#

Register a hook and save it in our hook list

Return type:

RemovableHandle

detach_hooks()#

Remove all hooks from the model

Return type:

None

class ModelOutputsStore(embs=None, logits=None, ids_queue=<factory>, ids=None)#

Bases: object

embs: Optional[Tensor] = None#
logits: Union[Tensor, Tuple[Tuple], None] = None#
ids_queue: List[List[int]]#
ids: Optional[List[int]] = None#
clear()#

Resets the arrays of the class.

Return type:

None

clear_all()#

Resets the arrays of the class.

Return type:

None

to_dict()#

Returns the class as a dictionary.

Return type:

Dict[str, Any]

class TorchHelper(model=None, hook_manager=None, model_outputs_store=<factory>, dl_next_idx_ids=<factory>, model_input=tensor([]), batch=<factory>, ids=<factory>, patches=<factory>)#

Bases: object

model: Optional[Any] = None#
hook_manager: Optional[ModelHookManager] = None#
model_outputs_store: ModelOutputsStore#
dl_next_idx_ids: List[Any]#
model_input: Tensor = tensor([])#
batch: Dict[str, Any]#
ids: List[Any]#
patches: List[Dict]#
clear()#

Resets the arrays of the class.

Return type:

None

class TorchBaseInstance#

Bases: object

embedding_dim: Union[int, slice, Tensor, List, Tuple, None]#
logits_dim: Union[int, slice, Tensor, List, Tuple, None]#
embedding_fn: Optional[Any] = None#
logits_fn: Optional[Any] = None#
task: TaskType#
torch_helper_data: TorchHelper#
store_batch_indices(store)#
Return type:

Callable

patch_iterator_with_store(store)#

Patches the iterator of the dataloader to return the indices

Return type:

Callable

validate_fancy_index_str(input_str='[:, 1:, :]')#

Validates a fancy index string.

Parameters:

input_str (str) – The string to validate, for example “[:, 1:, :]”

Return type:

bool

Returns:

True if the string is valid, False otherwise

convert_fancy_idx_str_to_slice(fstr='[:, 1:, :]')#

Converts a fancy index string to a slice.

Parameters:

fstr (str) – The fancy index string to convert, for example “[:, 1:, :]”

Return type:

Union[Tuple, slice, int]

Returns:

The slice, for example: (slice(None, None, None), slice(1, None, None), slice(None, None, None))
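
Example (a minimal sketch using the values from the docstrings above):

from dataquality.utils.torch import convert_fancy_idx_str_to_slice, validate_fancy_index_str

validate_fancy_index_str("[:, 1:, :]")        # -> True
convert_fancy_idx_str_to_slice("[:, 1:, :]")
# -> (slice(None, None, None), slice(1, None, None), slice(None, None, None))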

unpatch(patches=[])#

Unpatch all patched classes and instances

Parameters:

patches (List[Dict[str, Any]]) – list of patches

Return type:

None

remove_hook(child, all=False)#

Remove all forward hooks from a module.

Parameters:
  • child (Module) – The module to remove the hooks from.

  • all (bool) – If true, all hooks will be removed. If false, only the hooks starting with dq_ will be removed.

Return type:

None

remove_all_forward_hooks(model, all=False, start=True)#

Remove all forward hooks from a model.

Parameters:
  • model (Module) – The model to remove the hooks from.

  • all (bool) – If true, all hooks will be removed. If false, only the hooks starting with dq_ will be removed.

  • start (bool) – If true, the function will be called recursively on all submodules.

Return type:

None

find_dq_hook_by_name(model, name='_dq_', start=True)#

Find a hook by name in a model.

Parameters:
  • model (Module) – The model to search the hook in.

  • name (str) – The name of the hook.

Return type:

bool

Returns:

True if the hook was found, False otherwise.

class PatchSingleDataloaderIterator(dataloader_cls, store, fn_name='_get_iterator')#

Bases: Patch

Initializes the class with a collate function, columns to keep, and a store to save ids.

Parameters:
  • collate_fn – The collate function to wrap

  • keep_cols – The columns to keep

  • store (ModelOutputsStore) – The store to use to save the ids

name = 'patch_single_dataloader_iterator'#
class PatchSingleDataloaderNextIndex(dataloader_cls, store, fn_name='_get_iterator')#

Bases: Patch

Initializes the class with a collate function, columns to keep, and a store to save ids.

Parameters:
  • collate_fn – The collate function to wrap

  • keep_cols – The columns to keep

  • store (ModelOutputsStore) – The store to use to save the ids

name = 'patch_next_index'#
class PatchDataloadersGlobally(torch_helper)#

Bases: Patch

Patch the dataloaders to store the indices of the batches.

Parameters:

store – The store to save the indices to.

name = 'patch_dataloaders_globally'#
patch_dataloaders()#
Return type:

None

dataquality.utils.transformers module#

class RemoveIdCollatePatch(trainer_cls, keep_cols, store, fn_name='data_collator')#

Bases: Patch

Initializes the class with a collate function, columns to keep, and a store to save ids.

Parameters:
  • collate_fn – The collate function to wrap

  • keep_cols (List[str]) – The columns to keep

  • store (ModelOutputsStore) – The store to use to save the ids

name = 'remove_id_collate_patch'#
class SignatureColumnsPatch(trainer)#

Bases: Patch

Patches the trainer to add the id column to the signature columns, and adds a collate function to remove the id column during training. If the id is already in the signature columns, it will not be removed on unpatching.

Parameters:

trainer (Trainer) – The trainer to patch

name = 'signature_columns_patch'#
class ModelHook(model, hook_fn)#

Bases: Patch

name = 'dq_hook'#

dataquality.utils.ultralytics module#

box_label(image, box, label='', color=(128, 128, 128), txt_color=(255, 255, 255))#

Draw a labeled bounding box on an image.

Parameters:
  • image (Any) – The image to draw on.

  • box (Any) – The bounding box to draw. The format is (x1, y1, x2, y2).

  • label (str) – The label to draw.

  • color (Tuple[int, int, int]) – The color of the box and label.

  • txt_color (Tuple[int, int, int]) – The color of the text.

Return type:

None

plot_bboxes(image, boxes, labels=[], score=True, conf=None)#

Plot bounding boxes on image, with or without confidence score.

Return type:

None

Parameters:
  • image (Any) – The image to draw on.

  • boxes (Any) – The bounding boxes to draw. The format is (x1, y1, x2, y2, class, score).

  • labels (List) – The labels to draw.

  • score (bool) – Whether to draw the score.

  • conf (Optional[float]) – The confidence threshold.

non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False, multi_label=False, labels=(), max_det=300, nc=0, nm=None)#

Perform non-maximum suppression (NMS) on a set of boxes, with support for masks and multiple labels per box.

Parameters:
  • prediction (torch.Tensor) – A tensor of shape (batch_size, num_boxes, num_classes + 4 + num_masks) containing the predicted boxes, classes, and masks. The tensor should be in the format output by a model, such as YOLO.

  • conf_thres (float) – The confidence threshold below which boxes will be filtered out. Valid values are between 0.0 and 1.0.

  • iou_thres (float) – The IoU threshold below which boxes will be filtered out during NMS. Valid values are between 0.0 and 1.0.

  • classes (List[int]) – A list of class indices to consider. If None, all classes will be considered.

  • agnostic (bool) – If True, the model is agnostic to the number of classes, and all classes will be considered as one.

  • multi_label (bool) – If True, each box may have multiple labels.

  • labels (List[List[Union[int, float, torch.Tensor]]]) – A list of lists, where each inner list contains the apriori labels for a given image. The list should be in the format output by a dataloader, with each label being a tuple of (class_index, x1, y1, x2, y2).

  • max_det (int) – The maximum number of boxes to keep after NMS.

  • nc (int) – (optional) The number of classes output by the model. Any indices after this will be considered masks.

Returns:

A list of length batch_size,

where each element is a tensor of shape (num_boxes, 6 + num_masks) containing the kept boxes, with columns (x1, y1, x2, y2, confidence, class, mask1, mask2, …).

Return type:

(List[torch.Tensor])

denorm(in_images)#

Denormalize images.

Parameters:

in_images (Any) – Input images

Return type:

ndarray

process_batch_data(batch)#

Convert batch data to a dictionary of image index to image data. Also denormalizes the images.

Parameters:

batch (Dict) – Batch data

Return type:

Dict[int, Any]

temporary_cfg_for_val(cfg, split, ds_path='')#

Creates a temporary config file with the split set to the given split.

Parameters:
  • cfg_path – Path to the config file

  • split (Split) – Split to set the config to

Return type:

str

dataquality.utils.upload module#

class UploadDfWorker(request_queue, project_id, file_list, stop_val, export_format, export_cols, temp_dir, bucket, object_path, show_progress=True, pbar=None, step=None, use_data_md5_hash=True)#

Bases: Thread

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

run()#

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Return type:

None

chunk_load_then_upload_df(file_list, export_cols, temp_dir, bucket, project_id=None, object_path=None, parallel=False, step=50, num_workers=1, stop_val='END', export_format='arrow', show_progress=True, use_data_md5_hash=True)#
Return type:

None

dataquality.utils.upload_model module#

create_tar_archive(source_folder, output_filename)#

Creates a tar archive from a folder / model.

Parameters:
  • source_folder (str) – The folder to archive.

  • output_filename (str) – The name of the output tar file.

Return type:

None

upload_to_minio_using_presigned_url(presigned_url, file_path)#

Uploads a file to a presigned url.

Return type:

Tuple

upload_model_to_dq(model, model_parameters, model_kind, tokenizer)#

Uploads the model to the Galileo platform.

Return type:

None

Returns:

None

dataquality.utils.vaex module#

validate_unique_ids(df, epoch_or_inf_name)#

Helper function to validate the logged df has unique ids

Fail gracefully otherwise

Return type:

None

get_dup_ids(df)#

Gets the list of duplicate IDs in a dataframe, if any

Return type:

List

drop_empty_columns(df)#

Drops any columns that have no values

Return type:

DataFrame

filter_df(df, col_name, value)#

Filter vaex df on the value of a column

Drop any columns for this df that are empty (e.g. metadata logged for a different split)

Return type:

DataFrame

rename_df(df, columns)#

Renames a vaex df using a mapping

Return type:

DataFrame
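
Example (a minimal sketch on a toy vaex dataframe):

import numpy as np
import vaex
from dataquality.utils.vaex import filter_df, rename_df

df = vaex.from_arrays(id=np.array([0, 1, 2]), split=np.array(["training", "test", "training"]))
train_df = filter_df(df, "split", "training")        # keep only rows where split == "training"
train_df = rename_df(train_df, {"id": "sample_id"})  # rename columns via a mapping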

add_umap_pca_to_df(df, data_embs=False)#

Adds the PCA embeddings and UMAP xy embeddings if possible

If data_embs is True, the x and y values from umap will be named data_x and data_y

Return type:

DataFrame

create_data_embs_df(df, text_col, lazy=True)#

Runs sentence transformer on raw text to get off the shelf data embeddings

text_col can be passed in as “input” or “target” for Seq2Seq tasks. text_col can also be any given metadata text column.

Parameters:
  • df (DataFrame) – The dataframe to get data embeddings for. Must have the text_col

  • text_col (str) – The column to use for calculating data embeddings

  • lazy (bool) – If true, we lazily apply the model to encode the text

Return type:

DataFrame

get_output_df(dir_name, prob_only, split, epoch_or_inf)#

Creates the single hdf5 file for the output data of a split/epoch

Applies the necessary conversions post-concatenation of files (see concat_hdf5_files)

Return type:

DataFrame

add_pca_to_df(df, chunk_size=100000)#

Adds the ‘emb_pca’ column to the dataframe

Return type:

DataFrame

dataquality.utils.version module#

version_check()#

version_check.

Asserts that the dataquality python client and the api have matching major versions.

https://semver.org/#summary.

Return type:

None

Returns:

None

Module contents#

class tqdm(*_, **__)#

Bases: Comparable

Decorate an iterable object, returning an iterator which acts exactly like the original iterable, but prints a dynamically updating progressbar every time a value is requested.

Parameters:
  • iterable (iterable, optional) – Iterable to decorate with a progressbar. Leave blank to manually manage the updates.

  • desc (str, optional) – Prefix for the progressbar.

  • total (int or float, optional) – The number of expected iterations. If unspecified, len(iterable) is used if possible. If float(“inf”) or as a last resort, only basic progress statistics are displayed (no ETA, no progressbar). If gui is True and this parameter needs subsequent updating, specify an initial arbitrary large positive number, e.g. 9e9.

  • leave (bool, optional) – If [default: True], keeps all traces of the progressbar upon termination of iteration. If None, will leave only if position is 0.

  • file (io.TextIOWrapper or io.StringIO, optional) – Specifies where to output the progress messages (default: sys.stderr). Uses file.write(str) and file.flush() methods. For encoding, see write_bytes.

  • ncols (int, optional) – The width of the entire output message. If specified, dynamically resizes the progressbar to stay within this bound. If unspecified, attempts to use environment width. The fallback is a meter width of 10 and no limit for the counter and statistics. If 0, will not print any meter (only stats).

  • mininterval (float, optional) – Minimum progress display update interval [default: 0.1] seconds.

  • maxinterval (float, optional) – Maximum progress display update interval [default: 10] seconds. Automatically adjusts miniters to correspond to mininterval after long display update lag. Only works if dynamic_miniters or monitor thread is enabled.

  • miniters (int or float, optional) – Minimum progress display update interval, in iterations. If 0 and dynamic_miniters, will automatically adjust to equal mininterval (more CPU efficient, good for tight loops). If > 0, will skip display of specified number of iterations. Tweak this and mininterval to get very efficient loops. If your progress is erratic with both fast and slow iterations (network, skipping items, etc) you should set miniters=1.

  • ascii (bool or str, optional) – If unspecified or False, use unicode (smooth blocks) to fill the meter. The fallback is to use ASCII characters “ 123456789#”.

  • disable (bool, optional) – Whether to disable the entire progressbar wrapper [default: False]. If set to None, disable on non-TTY.

  • unit (str, optional) – String that will be used to define the unit of each iteration [default: it].

  • unit_scale (bool or int or float, optional) – If 1 or True, the number of iterations will be reduced/scaled automatically and a metric prefix following the International System of Units standard will be added (kilo, mega, etc.) [default: False]. If any other non-zero number, will scale total and n.

  • dynamic_ncols (bool, optional) – If set, constantly alters ncols and nrows to the environment (allowing for window resizes) [default: False].

  • smoothing (float, optional) – Exponential moving average smoothing factor for speed estimates (ignored in GUI mode). Ranges from 0 (average speed) to 1 (current/instantaneous speed) [default: 0.3].

  • bar_format (str, optional) –

    Specify a custom bar string formatting. May impact performance. [default: ‘{l_bar}{bar}{r_bar}’], where l_bar=’{desc}: {percentage:3.0f}%|’ and r_bar=’| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, ‘

    ’{rate_fmt}{postfix}]’

    Possible vars: l_bar, bar, r_bar, n, n_fmt, total, total_fmt,

    percentage, elapsed, elapsed_s, ncols, nrows, desc, unit, rate, rate_fmt, rate_noinv, rate_noinv_fmt, rate_inv, rate_inv_fmt, postfix, unit_divisor, remaining, remaining_s, eta.

    Note that a trailing “: “ is automatically removed after {desc} if the latter is empty.

  • initial (int or float, optional) – The initial counter value. Useful when restarting a progress bar [default: 0]. If using float, consider specifying {n:.3f} or similar in bar_format, or specifying unit_scale.

  • position (int, optional) – Specify the line offset to print this bar (starting from 0) Automatic if unspecified. Useful to manage multiple bars at once (eg, from threads).

  • postfix (dict or *, optional) – Specify additional stats to display at the end of the bar. Calls set_postfix(**postfix) if possible (dict).

  • unit_divisor (float, optional) – [default: 1000], ignored unless unit_scale is True.

  • write_bytes (bool, optional) – Whether to write bytes. If (default: False) will write unicode.

  • lock_args (tuple, optional) – Passed to refresh for intermediate output (initialisation, iterating, and updating).

  • nrows (int, optional) – The screen height. If specified, hides nested bars outside this bound. If unspecified, attempts to use environment height. The fallback is 20.

  • colour (str, optional) – Bar colour (e.g. ‘green’, ‘#00ff00’).

  • delay (float, optional) – Don’t display until [default: 0] seconds have elapsed.

  • gui (bool, optional) – WARNING: internal parameter - do not use. Use tqdm.gui.tqdm(…) instead. If set, will attempt to use matplotlib animations for a graphical output [default: False].

Returns:

out

Return type:

decorated iterator.

monitor_interval = 10#
monitor = None#
static format_sizeof(num, suffix='', divisor=1000)#

Formats a number (greater than unity) with SI Order of Magnitude prefixes.

Parameters:
  • num (float) – Number ( >= 1) to format.

  • suffix (str, optional) – Post-postfix [default: ‘’].

  • divisor (float, optional) – Divisor between prefixes [default: 1000].

Returns:

out – Number with Order of Magnitude SI unit postfix.

Return type:

str

static format_interval(t)#

Formats a number of seconds as a clock time, [H:]MM:SS

Parameters:

t (int) – Number of seconds.

Returns:

out – [H:]MM:SS

Return type:

str

static format_num(n)#

Intelligent scientific notation (.3g).

Parameters:

n (int or float or Numeric) – A Number.

Returns:

out – Formatted number.

Return type:

str

static status_printer(file)#

Manage the printing and in-place updating of a line of characters. Note that if the string is longer than a line, then in-place updating may not work (it will print a new line at each refresh).

static format_meter(n, total, elapsed, ncols=None, prefix='', ascii=False, unit='it', unit_scale=False, rate=None, bar_format=None, postfix=None, unit_divisor=1000, initial=0, colour=None, **extra_kwargs)#

Return a string-based progress bar given some parameters

Parameters:
  • n (int or float) – Number of finished iterations.

  • total (int or float) – The expected total number of iterations. If meaningless (None), only basic progress statistics are displayed (no ETA).

  • elapsed (float) – Number of seconds passed since start.

  • ncols (int, optional) – The width of the entire output message. If specified, dynamically resizes {bar} to stay within this bound [default: None]. If 0, will not print any bar (only stats). The fallback is {bar:10}.

  • prefix (str, optional) – Prefix message (included in total width) [default: ‘’]. Use as {desc} in bar_format string.

  • ascii (bool, optional or str, optional) – If not set, use unicode (smooth blocks) to fill the meter [default: False]. The fallback is to use ASCII characters “ 123456789#”.

  • unit (str, optional) – The iteration unit [default: ‘it’].

  • unit_scale (bool or int or float, optional) – If 1 or True, the number of iterations will be printed with an appropriate SI metric prefix (k = 10^3, M = 10^6, etc.) [default: False]. If any other non-zero number, will scale total and n.

  • rate (float, optional) – Manual override for iteration rate. If [default: None], uses n/elapsed.

  • bar_format (str, optional) –

    Specify a custom bar string formatting. May impact performance. [default: ‘{l_bar}{bar}{r_bar}’], where l_bar=’{desc}: {percentage:3.0f}%|’ and r_bar=’| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, ‘

    ’{rate_fmt}{postfix}]’

    Possible vars: l_bar, bar, r_bar, n, n_fmt, total, total_fmt,

    percentage, elapsed, elapsed_s, ncols, nrows, desc, unit, rate, rate_fmt, rate_noinv, rate_noinv_fmt, rate_inv, rate_inv_fmt, postfix, unit_divisor, remaining, remaining_s, eta.

    Note that a trailing “: “ is automatically removed after {desc} if the latter is empty.

  • postfix (*, optional) – Similar to prefix, but placed at the end (e.g. for additional stats). Note: postfix is usually a string (not a dict) for this method, and will if possible be set to postfix = ‘, ‘ + postfix. However other types are supported (#382).

  • unit_divisor (float, optional) – [default: 1000], ignored unless unit_scale is True.

  • initial (int or float, optional) – The initial counter value [default: 0].

  • colour (str, optional) – Bar colour (e.g. ‘green’, ‘#00ff00’).

Returns:

out

Return type:

Formatted meter and stats, ready to display.

classmethod write(s, file=None, end='\n', nolock=False)#

Print a message via tqdm (without overlap with bars).

classmethod external_write_mode(file=None, nolock=False)#

Disable tqdm within context and refresh tqdm when exits. Useful when writing to standard output stream

classmethod set_lock(lock)#

Set the global lock.

classmethod get_lock()#

Get the global lock. Construct it if it does not exist.

classmethod pandas(**tqdm_kwargs)#
Registers the current tqdm class with

pandas.core. ( frame.DataFrame | series.Series | groupby.(generic.)DataFrameGroupBy | groupby.(generic.)SeriesGroupBy ).progress_apply

A new instance will be created every time progress_apply is called, and each instance will automatically close() upon completion.

Parameters:

tqdm_kwargs (arguments for the tqdm instance) –

Examples

>>> import pandas as pd
>>> import numpy as np
>>> from tqdm import tqdm
>>> from tqdm.gui import tqdm as tqdm_gui
>>>
>>> df = pd.DataFrame(np.random.randint(0, 100, (100000, 6)))
>>> tqdm.pandas(ncols=50)  # can use tqdm_gui, optional kwargs, etc
>>> # Now you can use `progress_apply` instead of `apply`
>>> df.groupby(0).progress_apply(lambda x: x**2)

References

<https://stackoverflow.com/questions/18603270/progress-indicator-during-pandas-operations-python>

update(n=1)#

Manually update the progress bar, useful for streams such as reading files. E.g.:

>>> t = tqdm(total=filesize) # Initialise
>>> for current_buffer in stream:
...     ...
...     t.update(len(current_buffer))
>>> t.close()

The last line is highly recommended, but possibly not necessary if t.update() will be called in such a way that filesize will be exactly reached and printed.

Parameters:

n (int or float, optional) – Increment to add to the internal counter of iterations [default: 1]. If using float, consider specifying {n:.3f} or similar in bar_format, or specifying unit_scale.

Returns:

out – True if a display() was triggered.

Return type:

bool or None

close()#

Cleanup and (if leave=False) close the progressbar.

clear(nolock=False)#

Clear current bar display.

refresh(nolock=False, lock_args=None)#

Force refresh the display of this bar.

Parameters:
  • nolock (bool, optional) – If True, does not lock. If [default: False]: calls acquire() on internal lock.

  • lock_args (tuple, optional) – Passed to internal lock’s acquire(). If specified, will only display() if acquire() returns True.

unpause()#

Restart tqdm timer from last print time.

reset(total=None)#

Resets to 0 iterations for repeated use.

Consider combining with leave=True.

Parameters:

total (int or float, optional. Total to use for the new bar.) –

set_description(desc=None, refresh=True)#

Set/modify description of the progress bar.

Parameters:
  • desc (str, optional) –

  • refresh (bool, optional) – Forces refresh [default: True].

set_description_str(desc=None, refresh=True)#

Set/modify description without ‘: ‘ appended.

set_postfix(ordered_dict=None, refresh=True, **kwargs)#

Set/modify postfix (additional stats) with automatic formatting based on datatype.

Parameters:
  • ordered_dict (dict or OrderedDict, optional) –

  • refresh (bool, optional) – Forces refresh [default: True].

  • kwargs (dict, optional) –

set_postfix_str(s='', refresh=True)#

Postfix without dictionary expansion, similar to prefix handling.

moveto(n)#
property format_dict#

Public API for read-only member access.

display(msg=None, pos=None)#

Use self.sp to display msg in the specified pos.

Consider overloading this function when inheriting to use e.g.: self.some_frontend(**self.format_dict) instead of self.sp.

Parameters:
  • msg (str, optional. What to display (default: repr(self)).) –

  • pos (int, optional. Position to moveto) – (default: abs(self.pos)).

classmethod wrapattr(stream, method, total=None, bytes=True, **tqdm_kwargs)#

stream : file-like object.
method : str, “read” or “write”. The result of read() and the first argument of write() should have a len().

>>> with tqdm.wrapattr(file_obj, "read", total=file_obj.size) as fobj:
...     while True:
...         chunk = fobj.read(chunk_size)
...         if not chunk:
...             break