This is a cache of https://developer.ibm.com/tutorials/dpk-data-pipelines/. It is a snapshot of the page as it appeared on 2025-11-14T13:48:18.332+0000.
Building data pipelines using natural language with Data Prep Kit (DPK) - IBM Developer

Tutorial

Building data pipelines using natural language with Data Prep Kit (DPK)

From words to workflows

By

Mohammad Nasser,

Revital Sur,

Alexey Roytman,

Aanchal Goyal

Large language models (LLMs) are revolutionizing various industries, but building and deploying them effectively requires robust data preparation pipelines. Data Prep Kit (DPK) provides a powerful and flexible framework for building these data pipelines.

In this tutorial, we explore two experimental notebooks that demonstrate how agentic workflows can support the creation of data pipelines using the Data Prep Kit based on user input provided in natural language:

  • The first notebook, Planning_DPK_agent.ipynb, demonstrates how to use a Planning DPK Agent to generate a Python script that processes PDF documents by identifying and removing inappropriate content and eliminating duplicates.
  • The second notebook, dpk_as_tools.ipynb, demonstrates how to integrate DPK’s data transformation capabilities as tools within popular frameworks like LangChain and LlamaIndex.

Prerequisites

To follow along, you’ll need:

  • A local Python development environment. While you can use an Anaconda Python environment, you can also use a Python virtual environment on Python v3.11.
  • A (free) account at Replicate to run LLMs. Replicate is the default LLM provider used in our notebooks, though alternatives are also available (see the next Configure LLM Access section). Use this invite to add some credit to your Replicate account. The free account will give you a few API calls for free. That is enough for this tutorial. Once you sign up, create a token by going to your account, selecting API tokens, and creating a new token, called rag-1 (you can use any name) as described in this section of the docs.
  • Clone the Data Prep Kit repo, and you'll find these two notebooks in the examples/agentic directory.

Building data pipelines using natural language with the DPK Planning Agent

The DPK Planning Agent (Planning_DPK_agent.ipynb) is an intelligent assistant that helps data engineers and users efficiently build data processing pipelines using natural language instructions. Instead of manually coding complex data transformation pipelines, you can describe your requirements in plain language, and then the DPK Planning agent will create an executable pipeline for you.

The DPK Planning Agent uses the LLM’s capabilities to understand your requirements and generate executable code that implements the necessary data transformations.

Key benefits include:

  • Reduced development time for data pipelines
  • Accessibility for users without extensive coding experience
  • Ensuring consistent application of best practices through the judge component
  • Flexible adaptation to various data processing needs

In this tutorial, you use the DPK Planning Agent to generate a Python script that processes PDF documents, identifies and removes inappropriate content, and eliminates duplicate documents.

Overview of the Agent's Pipeline

The DPK Planning Agent uses a Langflow workflow based on LLMs to:

  1. Understand your task requirements in natural language
  2. Plan a series of data transformations using DPK tools
  3. Review and refine the plan through a judge component
  4. Generate executable Python code that implements the pipeline

flow diagram

Understanding the components

The DPK Planning Agent consists of several key components:

  • Tools (DPK transforms): The DPK agent uses transforms as its tools. Each tool is a data prep kit transform that can be applied to your data. Some key transforms include:

    • Pdf2Parquet: Generates parquet files containing the converted document.
    • DocQuality: Calculates and annotates several metrics which are useful to assess the quality of the document.
    • Ededup: Exact data deduplication is used to identify (and remove) redundant records.
    • DocID: Adds a document identification (unique integers and content hashes), which later can be used in de-duplication operations.
    • LangID: Adds a column containing the language of the document.
    • Filter: Provides SQL-based expressions for filtering rows and optionally column removal from parquet files.
  • Planner: The planner component takes your natural language task and uses an LLM to generate a structured plan using the available tools. It follows these steps:

    • Parse the task description
    • Identify appropriate transforms to use
    • Determine the correct order of transforms
    • Create a JSON-formatted plan with detailed parameters
  • Judge: The judge component uses an LLM to evaluate the plan produced by the planner to ensure:

    • It properly addresses the task requirements
    • It follows all specified constraints
    • It doesn't include unnecessary steps
    • The inputs for each transform are properly specified

If the judge identifies issues, it provides feedback to the planner to refine the plan.

  • Code Generator: Once the plan is approved, the code generator uses an LLM to create an executable Python script that:

    • Imports the necessary Python modules, including the chosen transforms
    • Sets up command-line arguments for configurable parameters
    • Implements the workflow with proper error handling
    • Manages the flow of data between transforms

Setting Up the Environment

First, let's install the required dependencies:

bash
pip install -r requirements.txt
pip install -r dpk-requirements.txt

Running the DPK Planning Agent

Now let's walk through the process of using the Planning DPK Agent:

Step 1: Define Your Task

First, we define the task we want to accomplish:

from IPython.display import HTML
task = "Process the provided PDF dataset to identify and extract only documents that don't contain inappropriate language. Remove the duplications."
HTML(f"<p><span style='color:blue; font-weight:bold; font-size:14.0pt;'>TASK: {task}</span></p>")

Step 2: Configure LLM Access

The DPK Planning Agent can work with various LLM providers. You'll need to configure access to one of the following:

config = dotenv_values("./.env")
# Choose the models you want to use
model_id1 = "ibm-granite/granite-3.1-8b-instruct"
model_id2 = "meta-llama/llama-3-1-70b-instruct"
model_id3 = "mistralai/mixtral-8x7B-instruct-v0.1"
llm_plan = getChatLLM(“<LLM_PROVIDER>”, model_replicate_id1, config)
llm_judge = getChatLLM(“<LLM_PROVIDER>”, model_replicate_id2, config)
llm_generate = getChatLLM(“<LLM_PROVIDER>”, model_replicate_id3, config)

Note: Different agents can use different models.

Step 3: Set Up the Agent Workflow

The agent workflow is built using LangGraph, which coordinates the different components:

from langgraph.graph import StateGraph, END
from llm_utils.agent_helpers import *
from llm_utils.prompts.planner_prompt import *
from llm_utils.prompts.judge_prompt import *
from llm_utils.prompts.generate_prompt import *
from llm_utils.dpk.tools import *
from llm_utils.dpk.examples import *
from llm_utils.dpk.constraints import *
from functools import partial

# Create the graph
workflow = StateGraph(State)
# Add nodes
workflow.add_node("planner", partial(planner, prompt=planner_prompt_str, tools=tools_json, example=example_task1, context=constraints, llm=llm_plan))
workflow.add_node("judge", partial(judge, prompt=judge_prompt_str_dpk, tools=tools_json, context=constraints, llm=llm_judge))
workflow.add_node("user_review", get_user_review)
workflow.add_node("code generator", partial(generator, prompt=generate_prompt_str_with_example, llm=llm_generate))
workflow.add_node("code validator", code_validator_noop)

# Add edges and conditional logic
workflow.set_entry_point("planner")
workflow.add_edge("code generator", "code validator")
workflow.add_edge("code validator", END)

# Add conditional edges
workflow.add_conditional_edges(
    "judge",
    is_plan_OK,
    {
        False: "planner",  # If needs revision, go back to planner
        True: "user_review"  # If plan is good, proceed to user review
    }
)

workflow.add_conditional_edges(
    "planner",
    need_judge,
    {
        True: "judge",  # If needs revision, go back to planner
        False: "user_review"  # If plan is good, proceed to user review
    }
)

workflow.add_conditional_edges(
    "user_review",
    is_user_review_OK,
    {
        False: "planner",  # If needs revision, go back to planner
        True: "code generator",
    }
)

Step 4: Run the Agent

Finally, we compile and run the agent:

app = workflow.compile()

# Display the workflow graph
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))

# Run the graph
initial_state = {
    "task": task,
    "context": "",
    "plan": ["still no plan"],
    "planning_attempts": 0,
    "feedback": "Still no review",
    "needs_revision": "",
    "need_judge": True,
}

state = initial_state

for output in app.stream(state):
    pass

Understanding the generated plan

When the agent runs, it will create a plan like this:

{"step_name": "Step #1 convert pdf to parquet", "tool_name": "Pdf2Parquet", "tool_input": [{"in_folder": "user_input", "out_folder": "user_input", "data_files_to_use": "['pdf']"}], "import": "from llm_utils.dpk.langchain_tools.tools.language.pdf2parquet import Pdf2parquetTransform", "step_ev": "#E1"}
{"step_name": "Step #2 remove duplicates", "tool_name": "exact_dedup", "tool_input": [{"in_folder": "#E1", "out_folder": "#E1"}], "import": "from llm_utils.dpk.langchain_tools.tools.universal.ededup import EdedupTransform", "step_ev": "#E2"}
{"step_name": "Step #3 calculate document quality", "tool_name": "doc_quality", "tool_input": [{"in_folder": "#E2", "out_folder": "#E2", "docq_bad_word_filepath": "user_input"}], "import": "from llm_utils.dpk.langchain_tools.tools.language.doc_quality import DocQualityTransform", "step_ev": "#E3"}
{"step_name": "Step #4 filter documents with inappropriate language", "tool_name": "filter_transform", "tool_input": [{"in_folder": "#E3", "out_folder": "#E3", "filter_criteria_list": "['docq_bad_word_count==0']"}], "import": "from llm_utils.dpk.langchain_tools.tools.universal.filter import FilterTransform", "step_ev": "#E4"}

The plan shows a sequence of transforms:

  1. Pdf2Parquet: Converts PDF files to Parquet format
  2. Exact Dedup: Removes exact duplicate documents based on the document hash
  3. DocQuality: Checks for inappropriate content using a list of bad words
  4. Filter: Removes documents that contain inappropriate content

plan with sequence of transforms

Understanding the generated code

Once the plan is approved, the agent generates a Python script that implements the pipeline. The script will include:

  • Imports for all necessary Python modules
  • A function to extract output folders from transform results
  • The main execution workflow that runs each transform in sequence
  • Command-line argument handling for configurable parameters

The generated script can be run from the command line like this:

python llm_plan_generated.py --in_folder /path/to/pdf/files --out_folder /path/to/output --docq_bad_word_filepath /path/to/bad_words_file

Where:

  • in_folder defines the input data folder,
  • out_folder is the folder where output data will be stored
  • docq_bad_word_filepath – is the path to the list of “bad” words, which is used by the document quality transform

Customizing the DPL Planning Agent

You can customize the DPK Planning Agent for your specific use cases:

  1. Modify the task: Change the natural language description to describe different data processing requirements.
  2. Add constraints: Specify additional constraints in the constraints.py file.
  3. Add examples: Provide example plans in the examples.py file to guide the planner toward similar solutions.
  4. Extend available tools: Add new DPK transforms to the tools.py file.

Create data pipelines using LLMs by defining DPK transforms as LangChain or LlamaIndex tools

The dpk_as_tools.ipynb notebook explores a novel approach, creating and executing DPK pipelines using LLMs by defining DPK transforms as LangChain and LlamaIndex tools. It shows how DPK can be seamlessly integrated with LangChain and LlamaIndex to simplify and accelerate LLM data preparation. This integration offers a compelling value proposition for AI developers, simplifying complex data preparation tasks and enabling more intuitive pipeline construction.

Step 1: Defining the DPK transforms pipeline in natural language

In this step, the LLM is used to generate a DPK pipeline based on natural language input.

Setting Up the Environment

The initial cells in the notebook install the Python dependencies listed in requirements.txt and dpk-requirements.txt. Please make sure to run these cells.

Define the agentic framework to use

DPK transforms are wrapped as LangChain or LlamaIndex tools. This involves creating classes that inherit from the respective framework's Tool interface.

Set define_dpk_as_langchain_tools to True to use DPK tools defined as LangChain tools. Set it to False to use them as LlamaIndex tools instead.

# Use langchain or llama-index
# Set to True to define DPK transforms as langchain tools; otherwise they will be defined as llama-index tools
define_dpk_as_langchain_tools=False

Step 1.3 Provide the input task in natural language

The user provides a description of the desired data transformations pipeline in plain English (e.g., “Execute pdf2parquet, doc_chunk, doc_id, ededup transforms”).

# Define the input task
# Set to True to execute the transforms on the local Ray cluster; otherwise, the Python implementation is used.
run_with_local_ray=False
ray_text=""
if run_with_local_ray:
    ray_text="on a local ray cluster "
task=f"Execute pdf2parquet, doc_chunk, doc_id, ededup, text_encoder transforms {ray_text} one after the other where the input to a transform is the output of the previous transform run."

Step 1.4 Set input/output paths

Executing this cell will automatically populate the input and output cells while clearing any content from previous executions in the output cell.

# Set input/output paths
import shutil
import os
cwd = os.getcwd()
output_base_path = f"{cwd}/output"
input_folder = f"{cwd}/test-data/input/"
output_folder =  f"{output_base_path}/final_1/"
shutil.rmtree(output_base_path, ignore_errors=True)
print (f"✅ Cleared {output_base_path} directory")

Step 1.5 Set the transform parameters

In this initial version of the notebook, non-default transform parameters must be explicitly set in the input task. In future versions, we expect to support describing these parameters using natural language as well. Note that only the input directory for the first transform and the output directory for the final transform are explicitly specified. All intermediate directories for the remaining transforms will be generated by the LLM, based on examples included in the input prompt (as shown later).

# Set transforms parameters
import json

def prepare_params(params: dict):
    params_json=json.dumps(params)
    # trim clurly braces
    return params_json[1:-1]
from data_processing.utils import GB, ParamsUtils
pdf2parquet_params_dict={"data_files_to_use": "['.pdf']", "input_folder":input_folder,  "pdf2parquet_contents_type": "application/json"}
doc_chunk_params_dict={}
doc_id_params_dict={"doc_id_hash_column": "chunk_hash", "doc_id_int_column": "chunk_id"}
ededup_params_dict={"ededup_doc_column": "contents", "ededup_doc_id_column": "chunk_hash"}
text_encoder_params_dict={"text_encoder_model_name": "sentence-transformers/all-MiniLM-L6-v2", "output_folder":output_folder}

Step 1.6: Configure access to LLMs

The Agent can work with various LLM providers. You'll need to configure access to one of the following:

Step 1.7: List DPK transforms

This section outputs the list of transform descriptions from their agentic framework implementation—either LlamaIndex or LangChain. This list will be included as part of the input to the LLM agent.

# List DPK transforms
if define_dpk_as_langchain_tools:
    from llm_utils.dpk.langchain_tools.agent_toolkit.toolkit import DataPrepKitToolkit

    toolkit = DataPrepKitToolkit()  
    tools = toolkit.get_tools()
    print("-- DPK tools: --")
    print(tools)
else:
    from llm_utils.dpk.llama_index_tools.llama_index_tools_dpk.llama_index_dpk.tools.dpk.base import DPKTransformsToolSpec

    dpk_spec = DPKTransformsToolSpec()
    tools = dpk_spec.to_tool_list()
    print("-- DPK tools: --")
    for t in tools:
        print(t.metadata.name)

Step 1.8 Define the prompt

In this cell, the prompt template is defined. The prompt includes:

  • A request to process the task.
  • Examples of valid pipeline specifications, emphasizing the requirement that each transform’s input must be the output of the previous transform.
  • Clear instructions on how to interpret the user's input.

Step 1.9 Invoke the agent and create the plan

In this cell, the LLM processes the input and, using its understanding of DPK transforms, generates a pipeline in the correct format, ensuring that the output of one transform becomes the input to the next.

# Invoke the agent to create the plan
from langchain.prompts import PromptTemplate
print(input)
agent = prompt_template | llm 
agent_step = ""
agent_step = agent.invoke(
            {
                "input": input,
                "tool_names": tool_names,
                "tools": tools_str,
            }
        )
   print(agent_step.content)

Next, the output plan is parsed.

Step 2: Execute the transforms pipeline

Once the pipeline is generated, the notebook dynamically invokes each transform using the LangChain or LlamaIndex tool interface:

# Execute the transfoms by calling their tool definition
def run_tool(match) -> str:
    def contains_parquet_files(dir_path):
      return any(file.endswith(".parquet") for file in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, file)))

    tool_name = match[0]
    tool_to_use = find_tool_by_name(tools, tool_name)
    tool_name = match[0]
    tool_input="{"+match[1]+"}"
    tool_input_dict = load_from_json(tool_input)
    print("=======================================================")
    print (f"🏃🏼 RUNNING {tool_name} with params: {tool_input_dict}")
    print("=======================================================")
    if define_dpk_as_langchain_tools:
        tool_result  = tool_to_use.run(tool_input_dict)
    else:
        tool_result  = tool_to_use.call(**tool_input_dict)
    if not contains_parquet_files(tool_input_dict["output_folder"]):
        out_dir=tool_input_dict["output_folder"]
        raise Exception (f"The {out_dir} directory is unexpectedly empty, indicating the job failed.")
    print (f"✅ {tool_result}")
        return tool_result

The last notebook cell includes code to inspect the generated output files. You will see a column called embeddings added at the end. This is the text content converted into vectors or embeddings as shown in the following figure. We used the model sentence-transformers/all-MiniLM-L6-v2.

embeddings in generated output files

Summary

In this tutorial, we explore two notebooks that showcase an experimental approach to generating DPK (Data Prep Kit) transform pipelines using agentic workflows. Although still in the early stages of development, this method shows promising potential to streamline data preparation and make it more accessible to a broader range of users, including those without deep technical expertise. Leveraging natural language input and LLM-powered planning opens the door to more intuitive and automated pipeline construction.