Building data pipelines using natural language with Data Prep Kit (DPK) - IBM Developer
Large language models (LLMs) are revolutionizing various industries, but building and deploying them effectively requires robust data preparation pipelines. Data Prep Kit (DPK) provides a powerful and flexible framework for building these data pipelines.
In this tutorial, we explore two experimental notebooks that demonstrate how agentic workflows can support the creation of data pipelines using the Data Prep Kit based on user input provided in natural language:
The first notebook, Planning_DPK_agent.ipynb, demonstrates how to use a Planning DPK Agent to generate a Python script that processes PDF documents by identifying and removing inappropriate content and eliminating duplicates.
The second notebook, dpk_as_tools.ipynb, demonstrates how to integrate DPK’s data transformation capabilities as tools within popular frameworks like LangChain and LlamaIndex.
Prerequisites
To follow along, you’ll need:
A local Python development environment. You can use an Anaconda environment or a standard Python virtual environment running Python 3.11.
A (free) account at Replicate to run LLMs. Replicate is the default LLM provider used in our notebooks, though alternatives are also available (see the Configure LLM Access step below). Use this invite to add some credit to your Replicate account. The free tier provides a few API calls, which is enough for this tutorial. After you sign up, create a token by going to your account, selecting API tokens, and creating a new token named rag-1 (you can use any name), as described in this section of the docs.
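The notebooks read configuration from a local .env file (loaded later with dotenv_values). As a minimal sketch, assuming Replicate is the provider and the token is expected under a key like REPLICATE_API_TOKEN (the exact variable name is an assumption; check the repository's sample .env), the file can look like this:
# .env (illustrative -- the variable name is an assumption)
REPLICATE_API_TOKEN=r8_xxxxxxxxxxxxxxxxxxxxxxxx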
Building data pipelines using natural language with the DPK Planning Agent
The DPK Planning Agent (Planning_DPK_agent.ipynb) is an intelligent assistant that helps data engineers and users efficiently build data processing pipelines using natural language instructions. Instead of manually coding complex data transformation pipelines, you can describe your requirements in plain language, and then the DPK Planning agent will create an executable pipeline for you.
The DPK Planning Agent uses the LLM’s capabilities to understand your requirements and generate executable code that implements the necessary data transformations.
Key benefits include:
Reduced development time for data pipelines
Accessibility for users without extensive coding experience
Consistent application of best practices through the judge component
Flexible adaptation to various data processing needs
In this tutorial, you use the DPK Planning Agent to generate a Python script that processes PDF documents, identifies and removes inappropriate content, and eliminates duplicate documents.
Overview of the Agent's Pipeline
The DPK Planning Agent uses a LangGraph workflow driven by LLMs to:
Understand your task requirements in natural language
Plan a series of data transformations using DPK tools
Review and refine the plan through a judge component
Generate executable Python code that implements the pipeline
Understanding the components
The DPK Planning Agent consists of several key components:
Tools (DPK transforms): The DPK agent uses transforms as its tools. Each tool is a data prep kit transform that can be applied to your data. Some key transforms include:
Pdf2Parquet: Generates parquet files containing the converted documents.
DocQuality: Calculates and annotates several metrics that are useful for assessing document quality.
Ededup: Exact data deduplication, used to identify (and remove) redundant records.
DocID: Adds document identifiers (unique integers and content hashes), which can later be used in deduplication operations.
LangID: Adds a column containing the language of the document.
Filter: Provides SQL-based expressions for filtering rows and, optionally, removing columns from parquet files.
Planner: The planner component takes your natural language task and uses an LLM to generate a structured plan using the available tools. It follows these steps:
Parse the task description
Identify appropriate transforms to use
Determine the correct order of transforms
Create a JSON-formatted plan with detailed parameters (an illustrative plan is sketched after this component list)
Judge: The judge component uses an LLM to evaluate the plan produced by the planner to ensure:
It properly addresses the task requirements
It follows all specified constraints
It doesn't include unnecessary steps
The inputs for each transform are properly specified
If the judge identifies issues, it provides feedback to the planner to refine the plan.
Code Generator: Once the plan is approved, the code generator uses an LLM to create an executable Python script that:
Imports the necessary Python modules, including the chosen transforms
Sets up command-line arguments for configurable parameters
Implements the workflow with proper error handling
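For illustration, the kind of JSON-formatted plan the planner might produce for the PDF task in this tutorial could look roughly like the following. The step ordering mirrors the transforms described above, but the exact schema, field names, and parameter spelling are assumptions; the real format is defined by the notebook's prompts and tools.py.
# Illustrative plan only -- field names, parameter names, and the filter expression are assumptions
plan = [
    {"tool": "pdf2parquet", "parameters": {"input_folder": "input", "output_folder": "out_1"}},
    {"tool": "doc_quality", "parameters": {"input_folder": "out_1", "output_folder": "out_2",
                                           "docq_bad_word_filepath": "path/to/bad_words.txt"}},
    {"tool": "filter", "parameters": {"input_folder": "out_2", "output_folder": "out_3",
                                      "criteria": "docq_contain_bad_word = False"}},
    {"tool": "doc_id", "parameters": {"input_folder": "out_3", "output_folder": "out_4"}},
    {"tool": "ededup", "parameters": {"input_folder": "out_4", "output_folder": "out_folder"}},
]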
Now let's walk through the process of using the Planning DPK Agent:
Step 1: Define Your Task
First, we define the task we want to accomplish:
from IPython.display import HTML
task = "Process the provided PDF dataset to identify and extract only documents that don't contain inappropriate language. Remove the duplications."
HTML(f"<p><span style='color:blue; font-weight:bold; font-size:14.0pt;'>TASK: {task}</span></p>")
Step 2: Configure LLM Access
The DPK Planning Agent can work with various LLM providers. You'll need to configure access to the provider you choose and select the models to use:
config = dotenv_values("./.env")

# Choose the models you want to use
model_id1 = "ibm-granite/granite-3.1-8b-instruct"
model_id2 = "meta-llama/llama-3-1-70b-instruct"
model_id3 = "mistralai/mixtral-8x7B-instruct-v0.1"

# getChatLLM is provided by the notebook's llm_utils helpers; replace <LLM_PROVIDER> with your provider (for example, Replicate)
llm_plan = getChatLLM("<LLM_PROVIDER>", model_id1, config)
llm_judge = getChatLLM("<LLM_PROVIDER>", model_id2, config)
llm_generate = getChatLLM("<LLM_PROVIDER>", model_id3, config)
Note: Different agents can use different models.
Step 3: Set Up the Agent Workflow
The agent workflow is built using LangGraph, which coordinates the different components:
from langgraph.graph import StateGraph, END
from llm_utils.agent_helpers import *
from llm_utils.prompts.planner_prompt import *
from llm_utils.prompts.judge_prompt import *
from llm_utils.prompts.generate_prompt import *
from llm_utils.dpk.tools import *
from llm_utils.dpk.examples import *
from llm_utils.dpk.constraints import *
from functools import partial
# Create the graph
workflow = StateGraph(State)
# Add nodes
workflow.add_node("planner", partial(planner, prompt=planner_prompt_str, tools=tools_json, example=example_task1, context=constraints, llm=llm_plan))
workflow.add_node("judge", partial(judge, prompt=judge_prompt_str_dpk, tools=tools_json, context=constraints, llm=llm_judge))
workflow.add_node("user_review", get_user_review)
workflow.add_node("code generator", partial(generator, prompt=generate_prompt_str_with_example, llm=llm_generate))
workflow.add_node("code validator", code_validator_noop)
# Add edges and conditional logic
workflow.set_entry_point("planner")
workflow.add_edge("code generator", "code validator")
workflow.add_edge("code validator", END)
# Add conditional edges
workflow.add_conditional_edges(
    "judge",
    is_plan_OK,
    {
        False: "planner",     # If the plan needs revision, go back to the planner
        True: "user_review",  # If the plan is good, proceed to user review
    },
)
workflow.add_conditional_edges(
    "planner",
    need_judge,
    {
        True: "judge",         # If the plan needs review, send it to the judge
        False: "user_review",  # Otherwise, proceed directly to user review
    },
)
workflow.add_conditional_edges(
    "user_review",
    is_user_review_OK,
    {
        False: "planner",        # If the user requests changes, go back to the planner
        True: "code generator",  # If the user approves, proceed to code generation
    },
)
Step 4: Run the Agent
Finally, we compile and run the agent:
app = workflow.compile()

# Display the workflow graph
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))

# Run the graph
initial_state = {
    "task": task,
    "context": "",
    "plan": ["still no plan"],
    "planning_attempts": 0,
    "feedback": "Still no review",
    "needs_revision": "",
    "need_judge": True,
}

state = initial_state
for output in app.stream(state):
    pass
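Instead of discarding the streamed updates, you can keep them to inspect what each node produced. A minimal sketch, assuming you replace the loop above (the exact contents of each update depend on the notebook's State definition):
# Keep the fields returned by each node as the graph streams (illustrative)
final_update = {}
for output in app.stream(state):
    final_update.update(output)   # each update maps a node name to the fields that node returned

print(list(final_update.keys()))  # e.g., ['planner', 'judge', 'user_review', 'code generator', 'code validator']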
Understanding the generated plan
When the agent runs, it creates a plan (rendered in the notebook output) whose steps are configured with parameters such as:
out_folder: the folder where the output data will be stored
docq_bad_word_filepath: the path to the list of "bad" words used by the document quality transform
Customizing the DPK Planning Agent
You can customize the DPK Planning Agent for your specific use cases:
Modify the task: Change the natural language description to describe different data processing requirements.
Add constraints: Specify additional constraints in the constraints.py file.
Add examples: Provide example plans in the examples.py file to guide the planner toward similar solutions.
Extend available tools: Add new DPK transforms to the tools.py file.
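For example, a new entry added to tools.py could look roughly like the sketch below. The field names here are hypothetical; mirror the schema of the existing entries in tools_json rather than this sketch.
# Hypothetical tool description -- copy the structure of the existing entries in tools.py
new_tool = {
    "name": "lang_id",
    "description": "Adds a column containing the language of the document.",
    "parameters": {
        "input_folder": "Path to the input parquet files",
        "output_folder": "Path where the annotated parquet files are written",
    },
}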
Create data pipelines using LLMs by defining DPK transforms as LangChain or LlamaIndex tools
The dpk_as_tools.ipynb notebook explores a novel approach: creating and executing DPK pipelines using LLMs by defining DPK transforms as LangChain and LlamaIndex tools. It shows how DPK can be seamlessly integrated with these popular frameworks to simplify and accelerate LLM data preparation, making complex data preparation tasks easier and pipeline construction more intuitive for AI developers.
Step 1: Defining the DPK transforms pipeline in natural language
In this step, the LLM is used to generate a DPK pipeline based on natural language input.
Setting Up the Environment
The initial cells in the notebook install the Python dependencies listed in requirements.txt and dpk-requirements.txt. Please make sure to run these cells.
Define the agentic framework to use
DPK transforms are wrapped as LangChain or LlamaIndex tools. This involves creating classes that inherit from the respective framework's Tool interface.
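As a rough sketch of what such a wrapper can look like on the LangChain side (this is not the notebook's actual implementation; the class name, fields, and return value are illustrative):
from langchain_core.tools import BaseTool

class Pdf2ParquetTool(BaseTool):
    """Illustrative wrapper; the notebook's DataPrepKitToolkit provides the real implementations."""
    name: str = "pdf2parquet"
    description: str = "Converts PDF documents into parquet files."

    def _run(self, input_folder: str, output_folder: str) -> str:
        # Here the wrapper would invoke the DPK pdf2parquet transform with these folders.
        return f"pdf2parquet wrote parquet files to {output_folder}"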
Set define_dpk_as_langchain_tools to True to use DPK tools defined as LangChain tools. Set it to False to use them as LlamaIndex tools instead.
# Use langchain or llama-index
# Set to True to define DPK transforms as langchain tools; otherwise they will be defined as llama-index tools
define_dpk_as_langchain_tools = False
Step 1.3 Provide the input task in natural language
The user provides a description of the desired data transformations pipeline in plain English (e.g., “Execute pdf2parquet, doc_chunk, doc_id, ededup transforms”).
# Define the input task
# Set to True to execute the transforms on the local Ray cluster; otherwise, the Python implementation is used.
run_with_local_ray = False

ray_text = ""
if run_with_local_ray:
    ray_text = "on a local ray cluster "

task = f"Execute pdf2parquet, doc_chunk, doc_id, ededup, text_encoder transforms {ray_text} one after the other where the input to a transform is the output of the previous transform run."
Step 1.4 Set input/output paths
Executing this cell automatically sets the input and output paths and clears any content left in the output folder from previous executions.
In this initial version of the notebook, non-default transform parameters must be explicitly set in the input task. In future versions, we expect to support describing these parameters using natural language as well. Note that only the input directory for the first transform and the output directory for the final transform are explicitly specified. All intermediate directories for the remaining transforms will be generated by the LLM, based on examples included in the input prompt (as shown later).
This section outputs the list of transform descriptions from their agentic framework implementation—either LlamaIndex or LangChain. This list will be included as part of the input to the LLM agent.
# List DPK transforms
if define_dpk_as_langchain_tools:
    from llm_utils.dpk.langchain_tools.agent_toolkit.toolkit import DataPrepKitToolkit

    toolkit = DataPrepKitToolkit()
    tools = toolkit.get_tools()
    print("-- DPK tools: --")
    print(tools)
else:
    from llm_utils.dpk.llama_index_tools.llama_index_tools_dpk.llama_index_dpk.tools.dpk.base import DPKTransformsToolSpec

    dpk_spec = DPKTransformsToolSpec()
    tools = dpk_spec.to_tool_list()
    print("-- DPK tools: --")
    for t in tools:
        print(t.metadata.name)
Step 1.8 Define the prompt
In this cell, the prompt template is defined. The prompt includes:
A request to process the task.
Examples of valid pipeline specifications, emphasizing the requirement that each transform’s input must be the output of the previous transform.
Clear instructions on how to interpret the user's input.
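As a minimal sketch of the idea (the actual template text lives in the notebook's prompt definitions; the variable names below are assumptions), such a prompt can be built by filling a template with the task, the tool descriptions, and an example pipeline:
# Illustrative prompt template -- the notebook's real template differs; names are assumptions
prompt_template = """You are a data engineering assistant.
Available DPK transforms (tools):
{tool_descriptions}

Example of a valid pipeline (each transform's input folder must be the
output folder of the previous transform):
{example_pipeline}

User task:
{task}

Return the pipeline as an ordered list of tool calls with their parameters."""

prompt = prompt_template.format(
    tool_descriptions=tools,  # the tool list produced in the earlier cell
    example_pipeline="pdf2parquet -> doc_chunk -> doc_id -> ededup -> text_encoder",
    task=task,
)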
Step 1.9 Invoke the agent and create the plan
In this cell, the LLM processes the input and, using its understanding of DPK transforms, generates a pipeline in the correct format, ensuring that the output of one transform becomes the input to the next.
Once the pipeline is generated, the notebook dynamically invokes each transform using the LangChain or LlamaIndex tool interface:
import os

# Execute the transforms by calling their tool definition
def run_tool(match) -> str:
    def contains_parquet_files(dir_path):
        return any(file.endswith(".parquet") for file in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, file)))

    tool_name = match[0]
    tool_to_use = find_tool_by_name(tools, tool_name)  # helper defined earlier in the notebook
    tool_input = "{" + match[1] + "}"
    tool_input_dict = load_from_json(tool_input)       # helper defined earlier in the notebook
    print("=======================================================")
    print(f"🏃🏼 RUNNING {tool_name} with params: {tool_input_dict}")
    print("=======================================================")
    if define_dpk_as_langchain_tools:
        tool_result = tool_to_use.run(tool_input_dict)
    else:
        tool_result = tool_to_use.call(**tool_input_dict)
    if not contains_parquet_files(tool_input_dict["output_folder"]):
        out_dir = tool_input_dict["output_folder"]
        raise Exception(f"The {out_dir} directory is unexpectedly empty, indicating the job failed.")
    print(f"✅ {tool_result}")
    return tool_result
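For illustration, assuming the generated pipeline has been parsed into (tool_name, arguments) pairs as in the notebook's parsing step, each step can then be executed in order. The tool names and folders below are hypothetical:
# Hypothetical parsed plan steps: (tool_name, JSON body without the surrounding braces)
parsed_steps = [
    ("pdf2parquet", '"input_folder": "input", "output_folder": "output_1"'),
    ("doc_chunk", '"input_folder": "output_1", "output_folder": "output_2"'),
]

for step in parsed_steps:
    run_tool(step)   # runs each transform and raises if its output folder contains no parquet files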
The last notebook cell includes code to inspect the generated output files. You will see a column called embeddings added at the end; this is the text content converted into vector embeddings, generated with the model sentence-transformers/all-MiniLM-L6-v2.
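As a minimal sketch of such an inspection (the path below is an assumption; point it at the output folder of the final text_encoder step), you can load the generated parquet files and look at the embeddings column:
import pandas as pd

# Path is illustrative -- use the final transform's output folder
df = pd.read_parquet("output_final")
print(df.columns.tolist())            # should include an 'embeddings' column
print(len(df["embeddings"].iloc[0]))  # embedding dimension; 384 for all-MiniLM-L6-v2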
Summary
In this tutorial, we explore two notebooks that showcase an experimental approach to generating DPK (Data Prep Kit) transform pipelines using agentic workflows. Although still in the early stages of development, this method shows promising potential to streamline data preparation and make it more accessible to a broader range of users, including those without deep technical expertise. Leveraging natural language input and LLM-powered planning opens the door to more intuitive and automated pipeline construction.