Scaling data preparation workflows in Data Prep Kit (DPK) - IBM Developer
In this tutorial, we will continue with the sample digest transform that was created as a Python module in the previous tutorial and implement extensions to that module to create a Ray-scalable version of the same transform. This scaled version can run either on a single machine or on a Kubernetes cluster of machines. We will also discuss developing unit tests for our digest transform, integrating with CI/CD, and optionally integrating with Kubeflow Pipelines (KFP).
Prerequisites
Follow the steps in the previous tutorial to build the digest transform using the Python runtime.
Steps
Step 1. Create a submodule for the Ray implementation
For this tutorial, we will follow the convention of having ray as a submodule of the main transform module, and add the required code to ray/runtime.py in the digest/dpk_digest/ray directory. While we encourage you to follow this convention, we realize that there are cases where you might need to deviate from it and use a different name for the ray submodule, or even implement the Ray functionality as its own module. Recall the proposed folder structure from the previous tutorial, which now includes the ray subdirectory.
The dpk_digest/ray/runtime.py file implements the necessary API for integrating the digest transform with the Ray runtime orchestrator.
import sys
from data_processing.utils import ParamsUtils, get_logger
from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing_ray.runtime.ray.runtime_configuration import RayTransformRuntimeConfiguration
from dpk_digest.runtime import DigestConfiguration
logger = get_logger(__name__)
class DigestRayRuntime(RayTransformRuntimeConfiguration):
    def __init__(self):
        super().__init__(transform_config=DigestConfiguration())


if __name__ == "__main__":
    launcher = RayTransformLauncher(DigestRayRuntime())
    launcher.launch()
Step 2. Implement the Digest API for the Ray runtime
Similarly, we will implement the Digest API for the Ray submodule to define a simplified method for the API, enabling the user to use the Ray runtime while still being able to invoke the transform from a notebook.
class Digest:
    def __init__(self, **kwargs):
        self.params = {}
        for key in kwargs:
            self.params[key] = kwargs[key]
        # If input/output folders were given, fold them into the
        # data_local_config parameter expected by the launcher.
        try:
            local_conf = {k: self.params[k] for k in ("input_folder", "output_folder")}
            self.params["data_local_config"] = ParamsUtils.convert_to_ast(local_conf)
            del self.params["input_folder"], self.params["output_folder"]
        except:
            pass
        # Likewise, fold per-worker resource settings into runtime_worker_options.
        try:
            worker_options = {k: self.params[k] for k in ("num_cpus", "memory")}
            self.params["runtime_worker_options"] = ParamsUtils.convert_to_ast(worker_options)
            del self.params["num_cpus"], self.params["memory"]
        except:
            pass

    def transform(self):
        # Build a CLI-style argument list from the parameters and launch.
        sys.argv = ParamsUtils.dict_to_req(d=self.params)
        launcher = RayTransformLauncher(DigestRayRuntime())
        return_code = launcher.launch()
        return return_code
Step 3. Run the transform
You can run the transform in a notebook or via the CLI.
A simple notebook for the Ray version of our digest transform shows how to run it from the current folder.
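For example, a notebook cell like the following would run the transform end to end. This is a minimal sketch: the import path and the run_locally and digest_algorithm parameters are assumptions based on the wrapper shown above.

from dpk_digest.ray.runtime import Digest

Digest(
    input_folder="test-data/input",
    output_folder="output",
    run_locally=True,           # assumption: start a local Ray cluster for this run
    digest_algorithm="sha256",  # assumption: the transform's configuration key
).transform()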
To run the transform from the CLI, you need to set up a virtual environment (we will use venv) with the dependencies for testing, and then run the transform with the --help option:
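A typical sequence might look like the following sketch; the package and extra names are assumptions, so check the module's requirements.txt:

python -m venv venv && source venv/bin/activate
pip install 'data-prep-toolkit[ray]'      # assumption: pulls in the Ray runtime dependencies
python -m dpk_digest.ray.runtime --help

A local run could then pass the data configuration on the command line (flag values assumed):

python -m dpk_digest.ray.runtime \
    --run_locally True \
    --data_local_config "{'input_folder': 'test-data/input', 'output_folder': 'output'}" \
    --digest_algorithm sha256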
Then, to make sure that the run created the output files:
ls output
metadata.json test1.parquet
Step 4. Develop the unit tests
In this step, we will develop unit tests for the Python runtime of our transform. This could be extended to the Ray runtime, but we will skip that for simplicity in this tutorial. For our testing, we will need some initial data as input to the transform, which we will copy from an existing transform's test folder.
cd data-prep-kit/transforms/universal/digest
mkdir -p test-data/input
cp ../../language/doc_chunk/test-data/expected/*.parquet test-data/input
Create a virtual environment and run the transform against the input data to produce the expected output data. This will be used by the CI/CD code to verify that the logic of the transform always produces the same output for a given input.
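One way to generate the expected output is to run the Python runtime against the copied input. This is a sketch; the output path and the digest_algorithm parameter are assumptions:

make venv && source venv/bin/activate
python -m dpk_digest.runtime \
    --data_local_config "{'input_folder': 'test-data/input', 'output_folder': 'test-data/expected'}" \
    --digest_algorithm sha256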
Developers have some freedom in designing their unit tests. In this section, we show how developers can use the test fixture defined in the framework to rapidly create unit tests. Here is the code for test/test_digest.py:
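The following is a sketch of what test/test_digest.py could look like, using the AbstractTransformLauncherTest fixture from the framework; the folder names and the digest_algorithm parameter are assumptions:

import os

from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.test_support.launch.transform_test import AbstractTransformLauncherTest
from dpk_digest.runtime import DigestConfiguration


class TestPythonDigestTransform(AbstractTransformLauncherTest):
    # Each fixture tuple pairs a launcher and its parameters with an input
    # folder and the folder holding the expected output.
    def get_test_transform_fixtures(self) -> list[tuple]:
        basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data"))
        fixtures = [
            (
                PythonTransformLauncher(DigestConfiguration()),
                {"digest_algorithm": "sha256"},  # assumption: the transform's parameter
                basedir + "/input",
                basedir + "/expected",
            )
        ]
        return fixtures

The fixture runs the launcher on the input folder and compares the produced parquet and metadata files against the expected folder.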
Step 5. Integrate the transform with CI/CD
The repo implements a rich set of functionality for setting up the environment, running unit tests, publishing the transforms to PyPI, building the transforms as part of a Docker image, and running them with Kubeflow. For the purpose of this tutorial, we will explore only a portion of the capabilities that support this initial phase of the implementation.
We will first copy the Makefile template from the parent folder:
cd data-prep-kit/transforms/universal/digest
cp ../../Makefile.transform.template Makefile
cp ../../Dockerfile.python.template Dockerfile.python
The Makefile has a number of predefined targets that will be useful for testing and publishing the transform. To get a list of available targets, run the following command from the digest folder:
make
Below is a small list of available targets that may be useful at this stage:
Target      Description
------      -----------
clean       Clean up the virtual environment
venv        Create the virtual environment using requirements.txt
test-src    Create the virtual environment using requirements.txt and run the unit tests
image       Build the Docker image for the transform
test-image  Build and test the Docker image for the transform
publish     Publish the Docker image to the quay.io container registry
Create virtual environment with all preloaded dependencies:
make clean && make venv
Run unit tests and verify the proper operations of the code:
make test-src
Though not part of the CI/CD flow, as a last step, if we want to include our transform in the PyPI wheel that contains all our transforms, we edit the transforms/pyproject.toml file and add the module's requirements.txt, the name of the module, and its package location:
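Conceptually, that amounts to two additions. The snippet below is a hypothetical sketch; the exact table names depend on the current layout of transforms/pyproject.toml in the repo:

# assumption: an extra named "digest" pointing at the module's requirements file
[tool.setuptools.dynamic.optional-dependencies]
digest = { file = ["universal/digest/requirements.txt"] }

# assumption: mapping the module name to its package location
[tool.setuptools.package-dir]
dpk_digest = "universal/digest/dpk_digest"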
Step 6. Optionally, integrate the transform with Kubeflow Pipelines (KFP)
You might choose to build a Kubeflow Pipeline (KFP) that chains multiple transforms together. In this step, we will show what you need to do so that the ops team can create a pipeline tailored to their specific use case. We will only cover the artifacts that you need to produce to enable the integration of the digest transform in a KFP pipeline.
Create a folder to host KFP-related artifacts, in particular a Makefile.
cd data-prep-kit/transforms/universal/digest
mkdir -p kfp_ray
cp ../../Makefile.kfp.template kfp_ray/Makefile
Create a KFP definition file, kfp_ray/digest_wf.py. This file will be used to produce the KFP workflow YAML definition file. The full content of this file is available in the repository; we only highlight some of the key elements here.
This file defines the reference to the docker image for the transform (task_image) and entry point (EXEC_SCRIPT_NAME).
import os
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
task_image = "quay.io/dataprep1/data-prep-kit/digest-ray:latest"
# the name of the job script
EXEC_SCRIPT_NAME: str = "-m dpk_digest.ray.runtime"
# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
Then, it defines the list of configuration parameters that are required by the framework, returned as a dictionary structure that also carries the transform's own parameter (digest_algorithm):
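Below is an abbreviated sketch of that function, modeled on other DPK transforms; the real file accepts many more framework parameters, and the exact signature is an assumption:

def compute_exec_params_func(
    worker_options: dict,
    actor_options: dict,
    data_s3_config: str,
    digest_algorithm: str,
) -> dict:
    from runtime_utils import KFPUtils

    return {
        "data_s3_config": data_s3_config,
        # derive the number of Ray workers from cluster and actor resources
        "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options),
        "runtime_worker_options": str(actor_options),
        # the transform-specific parameter
        "digest_algorithm": digest_algorithm,
    }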
Finally, it defines the main entry point for compiling the YAML file required for running the pipeline in KFP.
if __name__ == "__main__":
    # Compiling the pipeline
    compiler.Compiler().compile(digest, __file__.replace(".py", ".yaml"))
Summary
In this tutorial, you learned how to build a Ray runtime, create unit tests, integrate with CI/CD, and add KFP extensions to existing transforms. The source code used for this tutorial can be found in my forked data-prep-kit repo.