Unlock the Power of Your Data with RAG using Vertex AI and Elasticsearch
The world is drowning in data, but are we truly harnessing its potential? That's where Retrieval Augmented Generation (RAG) comes in, revolutionizing how we interact with information. This two-part blog series will equip you with the tools to transform raw data into actionable insights using the powerful combination of Elasticsearch and RAG.
Part 1 will guide you through efficiently ingesting your data into Elasticsearch, setting the stage for a robust and scalable knowledge base that provides AI models with relevant context from private, domain-specific, and up-to-date customer data. Part 2 dives into the exciting world of RAG pipelines, demonstrating how to build a question-answering system using cutting-edge tools like Vertex AI and LangChain. Get ready to unlock the power of your data and take your applications to the next level!
What is RAG?
RAG pipelines leverage the power of retrieval and generative AI models to produce insightful responses grounded in a vast knowledge base. They work in two steps, sketched in the short example after this list:
- Retrieving Relevant Information: Using an embedding model, RAG pipelines analyze your query and find relevant documents from a curated dataset.
- Generating Concise Answers: A language model like Gemini Pro then utilizes the retrieved information to craft a comprehensive and informative answer.
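Here is a minimal, illustrative sketch of those two steps in Python. The names embed, vector_search, and llm_generate are placeholders standing in for an embedding model, a vector store query, and an LLM call; they are not part of any specific library.
def answer(question, embed, vector_search, llm_generate):
    # 1. Retrieve: embed the query and find the most relevant documents
    query_vector = embed(question)
    context_docs = vector_search(query_vector, k=5)
    # 2. Generate: let the language model answer using the retrieved context
    prompt = f"Answer the question using this context:\n{context_docs}\n\nQuestion: {question}"
    return llm_generate(prompt)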
Elastic and Google Cloud: a perfect match for enterprise GenAI
Combining Elastic and Google Cloud technologies for Generative AI significantly enhances AI-driven applications. Elastic's vector and hybrid search capabilities offer advanced, scalable solutions for managing, searching, and analyzing large datasets. The platform efficiently handles both structured and unstructured data, making it a perfect fit for GenAI models. Elastic's semantic search facilitates deeper contextual understanding, boosting the accuracy and relevance of AI-generated responses. These capabilities are important for use cases where specific knowledge is key – such as personalized recommendations, intelligent content and product search, conversations relying on users’ behavior and profile, and many more.
Not all vector databases are equal. While many claim to offer advanced vector features, most are limited to merely storing and, in limited ways, searching vectors. Most offer minimal customization, and often lack core requirements for secure adoption in real-life production use cases beyond a proof of concept. Elastic stands out by offering a robust, customizable vector search platform that scales for complex scenarios and provides plug-and-play options to reduce go-to-market timelines. With Elastic, you can create embeddings at ingest or query time, use built-in transformer models, integrate Vertex AI models, or build your own. Elastic's enterprise-grade security features ensure meticulous control of data access, supporting Google's Responsible AI principles.
These capabilities align well with Google Cloud's Gemini models and Vertex AI, which excel in multimodal interactions across text, images, video, and more. Gemini's ability to handle diverse data formats complements Elastic's versatile vector search. This enables innovative applications, like a fashion AI assistant for a retail website that understands and interacts with both product descriptions and visual content.
An example Ingestion Pipeline
This blog post explores the creation of a RAG pipeline specifically designed for extracting data from images. We’ll be using Google Cloud Platform resources for this endeavor, providing a scalable and robust solution.
1. Setting Up Your Environment
First, we need to assemble the essential tools for our RAG pipeline. We will use Google Gemini Pro as our generative AI model, leveraging the Vertex AI platform for its powerful capabilities. Elasticsearch will serve as our vector database, enabling efficient storage and retrieval of image data.
!pip install --upgrade google-cloud-aiplatform elasticsearch
# Automatically restart the kernel after installs so that your environment can access the new packages
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
import vertexai
from vertexai.language_models import TextEmbeddingModel
from vertexai.generative_models import GenerativeModel, Part, FinishReason
import vertexai.preview.generative_models as generative_models
import base64
import json
import os
import time
# Elasticsearch
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError
Next, we establish the environment variables and authenticate:
# GCP Parameters
PROJECT = "your-project" #@param {type:"string"}
LOCATION = "us-central1" #@param {type:"string"}
vertexai.init(project=PROJECT, location=LOCATION)
model = GenerativeModel("gemini-1.5-pro-002")
# Elasticsearch parameters
ES_URL = "your-elastic-cluster-url" #@param {type:"string"}
ES_INDEX_NAME = "your-index-name" #@param {type:"string"}
ES_API_KEY = "your-api-key" #@param {type:"string"}
THRESHOLD = 0.86 #@param {type:"number"}
from google.colab import auth as google_auth
google_auth.authenticate_user()
2. Image Processing
The processing code defines the directory containing the catalog image files, initializes a counter, sorts the files, and iterates through each one that ends with ".jpg".
For each image file, it loads the image with get_image, extracts information with the generate function, parses the response into a JSON object, and appends the extracted data (including the source filename for reference) to the items list.
To avoid API throttling, the code pauses for 30 seconds after every 5 processed items. Finally, it moves each processed image to a "processed" directory and updates the counter.
The loop continues until all ".jpg" files in the directory have been handled, then prints the total number of extracted items, showing how to apply the pipeline to a collection of images in a systematic and organized way. The full snippet appears below, after the helper functions get_image and generate are defined.
def get_image(jpg_filepath):
    """Opens a JPG file and wraps it in a Part object.

    Args:
        jpg_filepath (str): The path to the JPG file.

    Returns:
        Part: A Vertex AI Part object containing the JPEG bytes,
        ready to be passed to GenerativeModel.generate_content.
    """
    with open(jpg_filepath, "rb") as image_file:
        image_data = image_file.read()  # Read the binary data of the image
    image = Part.from_data(
        mime_type="image/jpeg",
        data=image_data,
    )
    return image
To process images and extract meaningful data, we create a function called generate that takes an image as input. This function acts as the core of our image data extraction process, leveraging the power of Google Gemini Pro to analyze the image and return structured information.
The generate function uses a carefully crafted prompt to guide Gemini Pro in its analysis. This prompt, split across the prompt_pt1 and prompt_pt2 variables that wrap the image, instructs the model to act as an "expert musical instrument technician" with a specific task: extracting descriptive details about each product depicted in an image.
prompt_pt1 = """You are an expert musical instrument technician your job is to extract for every page a description of the product depicted.
To achieve your task, carefully follow the instructions.
<format>
[{
\"item_name\": ,
\"item_type\": ,
\"color\": ,
\"model\": ,
\"description_from_text\": ,
\"description_from_vision\": ,
\"fretboard_options\":[] ,
\"color_options\": [],
\"part_number\": ONLY IF AVAILABLE
},
{
\"item_name\": ,
\"item_type\": ,
\"color\": ,
\"model\": ,
\"description_from_text\": ,
\"description_from_vision\": ,
\"fretboard_options\":[] ,
\"color_options\": [],
\"part_number\": ONLY IF AVAILABLE
}]
</format>
<catalog_page>"""
prompt_pt2="""
</catalog_page>
<instructions>
1. Carefully analyze every page of the catalog, you will create a single entry per item presented.
2. Look at the item image and carefully read the text.
3. Always think step by step, and NEVER make up information.
4. Return the results as well-formatted JSON.
</instructions>
output:
"""
The prompt provides a step-by-step guide for the model:
- Analyze the image: The model is instructed to carefully analyze each image, identifying the individual product presented.
- Extract information: The model should extract information from both the image itself and any accompanying text (if available).
- Format the output: The output should be formatted in a well-structured JSON format, ensuring clarity and consistency.
- Think critically: The model is encouraged to think step by step and avoid making assumptions or fabricating information.
The prompt also includes a specific format for the JSON output, defining the fields that should be included:
{
"item_name": ,
"item_type": ,
"color": ,
"model": ,
"description_from_text": ,
"description_from_vision": ,
"fretboard_options":[] ,
"color_options": [],
"part_number": ONLY IF AVAILABLE
}
This structured format ensures that the extracted information is consistent and easily usable for further analysis or integration with other systems.
The function then uses the model.generate_content method to pass the image, prompt, and configuration settings to Gemini Pro. This method instructs the model to process the image, apply the provided instructions, and generate the structured JSON output.
Finally, the generate function returns the generated response, providing the extracted information about the image in a readily accessible format.
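The call below also references generation_config and safety_settings, which aren't shown elsewhere in this post. Here is a minimal sketch of what they could look like; the specific values are illustrative and should be tuned for your own data and risk profile.
# Illustrative defaults -- adjust for your own use case
generation_config = {
    "max_output_tokens": 8192,  # leave room for multi-item catalog pages
    "temperature": 0.0,         # deterministic, extraction-style output
    "top_p": 0.95,
}

safety_settings = {
    generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
    generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
    generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
    generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_ONLY_HIGH,
}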
def generate(image):
    responses = model.generate_content(
        [prompt_pt1, image, prompt_pt2],
        generation_config=generation_config,
        safety_settings=safety_settings,
        stream=False,
    )
    if responses.candidates[0].finish_reason.value == 1:  # FinishReason.STOP
        text = responses.candidates[0].content.parts[0].text
        if text.startswith("```json") and text.endswith("```"):
            # Slice to remove the Markdown code-fence markers
            return text[7:-3].strip()  # Remove 7 chars at start, 3 at end, strip whitespace
        else:
            # Return the original string if the markers are missing
            return text
    else:
        return f"Content has been blocked for {responses.candidates[0].finish_reason.name} reasons."
With both helper functions in place, all the files in the dataset are processed by the following snippet, which iterates through a directory of image files, extracts data from each one with the generate function, stores the extracted information in a list called items, and moves processed images to a separate directory.
!mkdir processed
items = []
directory = "catalog_pages"  # Replace with the actual directory path
counter = 0

files = os.listdir(directory)
files.sort()

for filename in files:
    # Pause every 5 items to prevent throttling
    if counter % 5 == 0:
        time.sleep(30)
    if filename.endswith(".jpg"):
        print(filename)
        image = get_image(f"{directory}/{filename}")
        result = generate(image)
        print(result)
        json_result = json.loads(result)
        for item in json_result:
            item['catalog_page'] = filename
            items.append(item)
        # Move processed files
        !mv {directory}/{filename} processed
        counter = counter + 1

print(f"Processed {len(items)} items")
3. Generating and Storing Embeddings
Once the generate function has extracted data from an image and returned it in structured JSON, the descriptive text for each item is used to create embeddings. These embeddings are numerical representations of the extracted text, allowing us to capture the meaning of the data in a form that can be easily compared and searched.
# Split items into text and metadata
embeddings = []
metadatas = []

for item in items:
    item_text = item['description_from_text'] + "\n" + item['description_from_vision']
    embeddings.append(text_embedding(item_text))
    metadatas.append(item)

print(len(embeddings))
To generate these embeddings, we utilize the text_embedding function, which leverages the Vertex AI text-embedding-004 model. This model takes the extracted text as input and transforms it into a vector of numerical values. The resulting embedding captures the semantic meaning of the extracted text, enabling us to search for similar content based on its meaning rather than just its literal form.
def text_embedding(query):
    """Generates a text embedding with the Vertex AI text-embedding-004 model."""
    embedding_model = TextEmbeddingModel.from_pretrained("text-embedding-004")
    # Working with a single query at a time
    embeddings = embedding_model.get_embeddings([query])
    return embeddings[0].values
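Calling the embedding model once per item works, but since get_embeddings accepts a list of texts, you could also batch several descriptions per request to cut down on round-trips. A hedged sketch, where the batch size of 20 is an arbitrary example value:
def text_embeddings_batch(texts, batch_size=20):
    """Embeds a list of texts in batches; batch_size is an illustrative value."""
    embedding_model = TextEmbeddingModel.from_pretrained("text-embedding-004")
    vectors = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        vectors.extend(e.values for e in embedding_model.get_embeddings(batch))
    return vectors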
With the embeddings generated, the next step is to ingest them into the Elasticsearch index. This process is handled by the store_embeddings_with_metadata function, which takes the generated embeddings, along with their associated metadata (like the original catalog page name and the other extracted fields), and adds them to Elasticsearch.
The store_embeddings_with_metadata function ensures that each embedding is stored with its corresponding metadata, allowing us to retrieve both the embedding and its associated information when performing searches. This metadata is crucial for understanding the context of each embedding and for providing more relevant results during searches.
def store_embeddings_with_metadata(
    client,
    index_name,
    embeddings,
    metadatas,
    vector_query_field,
):
    """
    Stores embeddings and metadata in an Elasticsearch cluster.

    Args:
        client: Elasticsearch client instance.
        index_name: Name of the Elasticsearch index.
        embeddings: List of embeddings to store.
        metadatas: List of metadata dictionaries corresponding to the embeddings.
        vector_query_field: Name of the field to store the embedding vectors in.
    """
    requests = []
    for embedding, metadata in zip(embeddings, metadatas):
        request = {
            "_op_type": "index",
            "_index": index_name,
            vector_query_field: embedding,
            "metadata": metadata,
        }
        requests.append(request)

    if len(requests) > 0:
        try:
            # refresh=True makes the new documents searchable immediately
            success, failed = bulk(
                client, requests, stats_only=True, refresh=True
            )
            print(f"Added {success} and failed to add {failed} documents to index")
        except BulkIndexError as e:
            print(f"Error adding documents: {e}")
            raise e
    else:
        print("No documents to add to index")
By storing the embeddings and metadata in Elasticsearch, we leverage a powerful vector database that enables efficient search and retrieval of image-related information based on semantic similarity. This enables us to find relevant images based on the meaning of the extracted data, opening up possibilities for advanced image search and retrieval capabilities.
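None of the snippets above actually create the Elasticsearch client or the index, so here is a minimal, hypothetical example of how you might wire everything together using the ES_URL, ES_API_KEY, and ES_INDEX_NAME parameters defined earlier. The text_embedding field name and the mapping are illustrative; text-embedding-004 produces 768-dimensional vectors.
# Connect to the cluster using the parameters defined earlier
es_client = Elasticsearch(hosts=[ES_URL], api_key=ES_API_KEY)

# Create the index with a dense_vector field for the embeddings (illustrative mapping)
if not es_client.indices.exists(index=ES_INDEX_NAME):
    es_client.indices.create(
        index=ES_INDEX_NAME,
        mappings={
            "properties": {
                "text_embedding": {"type": "dense_vector", "dims": 768},
                "metadata": {"type": "object"},
            }
        },
    )

# Index the embeddings generated above together with their metadata
store_embeddings_with_metadata(
    client=es_client,
    index_name=ES_INDEX_NAME,
    embeddings=embeddings,
    metadatas=metadatas,
    vector_query_field="text_embedding",
)

# Quick sanity check: retrieve the items most similar to a natural-language query
response = es_client.search(
    index=ES_INDEX_NAME,
    knn={
        "field": "text_embedding",
        "query_vector": text_embedding("a sunburst electric guitar"),
        "k": 3,
        "num_candidates": 50,
    },
)
for hit in response["hits"]["hits"]:
    print(hit["_score"], hit["_source"]["metadata"]["item_name"])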
Conclusion
In the first part of this blog series, we've explored how to extract valuable data from images and store it in an Elasticsearch index. Next time, we'll put together a sample application to interact with this data.
Curious to learn more about the power of Elastic and Google Cloud for your GenAI applications? Explore cutting-edge integrations and unlock the full potential of your data by visiting the Elastic Search Labs at https://www.elastic.co/search-labs/integrations/google.
Ready to try this out on your own? Start a free trial or use this self-paced hands-on learning for Search AI.
Elasticsearch has integrations for tools from LangChain, Cohere and more. Join our advanced semantic search webinar to build your next GenAI app!