Kickstage Logo
Default post hero

Implementing a Custom RAG Pipeline for Technical Documents

Backend
Franz Philip Brandmüller
,

Technical Document Retrieval using Qdrant, PyMuPDF and DSPy

Many of our customers manage a large corpus of technical documents for machines, detailing operation procedures and extensive usage guidelines. These documents are critical for daily operations, but their length and complexity make it challenging and time-consuming to manually search for specific information.

Out-of-the-box RAG solutions frequently fall short in delivering the quality of results needed, as they are typically designed to handle a broad range of document types and use cases, often lacking specialized domain knowledge. Custom solutions that leverage open-source tools and are tailored to the specific needs of the domain usually offer significantly better results.

They often include images, diagrams and tables essential for understanding the full context. While text-based searches in vector databases can find relevant passages, they usually overlook the context provided by these visuals.

Introduction

In this article, we explore Qdrant for efficient storage and retrieval of embeddings, integrating gpt4o-mini for Object Character Recognition (OCR) to extract additional context from images and combine our ground truth data into DSPy as our Retrieval-Augmented Generation (RAG) pipeline.

Why Qdrant?

When developing real world applications, speed is crucial. By using the Hierarchical Navigable Small World (HNSW) indexing technique, Qdrant ensures rapid search times, even as the volume of data increases significantly. HNSW optimizes the search process through a multi-layered graph structure, reducing the time complexity of locating similar vectors. This allows Qdrant to scale efficiently to millions of vectors while maintaining swift query responses.

Qdrant further enhances performance through advanced methods like vector quantization, which minimize memory usage without compromising speed.

In fact, Qdrant achieves up to 15 times higher query throughput compared to pgvector, all while delivering high accuracy. These outcomes highlight why Qdrant has gained popularity among many organizations in their attempt to build efficient generative AI applications.

Now when it comes to retrieval systems, we also need to choose an embedding model that fits our task.

How do I choose the right embedding model?

A great place to start is the Massive Text Embedding Benchmark (MTEB) Leaderboard which provides an overview of embedding models, including both proprietary and open-source options.

The leaderboard ranks models based on their performance across a range of tasks, such as Reranking, Summarization and Retrieval. By selecting a model that performs well across these tasks, you can ensure that your embeddings are both accurate and versatile.

mteb_leaderboard

For our task, we have chosen the text-embedding-3-large model from OpenAI, which is well-suited for our document retrieval use case. It provides high-quality embeddings and ease of use through the Embeddings API.

Now let's see how all of this comes together.

Prerequisites

  • Installing Dependencies:
1# Databases
2pip install qdrant-client fastembed pymongo
3
4# PDF processing
5pip install pymupdf pymupdf4llm
6
7# Text embedding
8pip install openai tiktoken
9
10# Utilities
11pip install python-dotenv uuid6 tqdm


  • Qdrant Setup: To get started we set up the Qdrant Server. We can use the official Docker image provided by Qdrant.
  • MongoDB Setup: We set up MongoDB as our image storage.

We create the following docker-compose.yml:

1services:
2 qdrant:
3 container_name: qdrant
4 image: qdrant/qdrant:latest
5 ports:
6 - "6333:6333"
7 - "6334:6334"
8 env_file:
9 - .env
10 mongodb:
11 container_name: mongodb
12 image: mongo:latest
13 hostname: mongodb
14 ports:
15 - "27017:27017"
  • Environment Variables: We store the required environment variables in a .env file, like this:
1HOST=host.docker.internal
2QDRANT_PORT=6333
3MONGODB_PORT=27017
4OPENAI_API_KEY=<your openai api key>
  • Once your configuration is ready, we can start up both the Qdrant and MongoDB containers:
1docker-compose up -d --build
  • Setup MongoDB Class: We store our images separately in a MongoDB collection.

    A multi-modal approach, though helpful in capturing both text and images in a vector space, might lack the exact matching between the two. For simplicity, and ensuring contextual alignment we opted to store all images separately in a MongoDB collection, matching them both by their UUID.
1import os
2
3import pymongo
4from bson.json_util import dumps, loads
5from dotenv import load_dotenv
6from pymongo.errors import DuplicateKeyError
7
8load_dotenv()
9
10class MongoDBWrapper:
11 """Wrapper class for MongoDB Client."""
12
13 def __init__(self):
14 self.client = pymongo.MongoClient(
15 os.getenv("DOCKER_INTERNAL_HOST"), int(os.getenv("MONGODB_PORT"))
16 )
17 self.db_name = "my_pdfs"
18
19 self.db = self.client[self.db_name]
20 self.images_collection = self.db["images"]
21
22 # Create compound index for machine and page
23 self.images_collection.create_index({"machine": 1, "page": 1}, unique=True)
24
25 def upsert(
26 self,
27 uuid: str,
28 image: str,
29 machine: str,
30 page: int,
31 ) -> None:
32 """Upsert data in MongoDB."""
33 try:
34 self.images_collection.update_one(
35 {"id": uuid, "machine": machine, "page": page},
36 {"$set": {"image": image}},
37 upsert=True,
38 )
39 except DuplicateKeyError:
40 pass

Step-by-Step Guide Document Retrieval

  • Setup Chunk Class: We will use a simple Chunk class as a structured container to store the data extracted from PyMuPDF, organizing the elements we extract from PDF documents.
1class Chunk:
2 """Chunking Class for Qdrant to store data."""
3
4 def __init__(
5 self,
6 texts: Optional[list[str]] = [],
7 images: Optional[list[Image.Image]] = [],
8 path: Optional[str] = None,
9
10 ):
11 self.texts = texts
12 self.images = images
13 self.path = path
  • Qdrant Client Setup: We will initialize the Qdrant Client by the following class:
1import os
2
3from qdrant_client import QdrantClient
4
5# Load environment variables
6load_dotenv()
7
8class QdrantWrapper:
9 """Wrapper for Qdrant Client."""
10
11 def __init__(self):
12 self.url = (
13 f"http://{os.getenv('HOST')}:{os.getenv('QDRANT_PORT')}"
14 )
15 self.client = QdrantClient(url=self.url, timeout=10)
16 self.embedding_model_name = "text-embedding-3-large"
  • Document Preprocessing: We preprocess our document by chunking it into images and text using PyMuPDF and preparing the data for embedding:
1import re
2import logging
3
4import fitz
5from tqdm import tqdm
6from PIL import Image
7
8logging.basicConfig(
9 level=logging.INFO, format="%(asctime)s - %(message)s", datefmt="%d-%b-%y %H:%M:%S"
10)
11
12def chunk_document(
13 self,
14 file_path: str,
15 dpi: str = 600,
16 image_format: str = "png",
17 table_strategy: str = "lines",
18) -> list:
19 """
20 Extract text, tables and images from PDF file.
21
22 Args:
23 file_path (str): Path to the PDF file.
24 dpi (str): Resolution for image extraction (default: 600).
25 image_format (str): Format for extracted images (default: png).
26 table_strategy (str): Strategy for table extraction (default: lines).
27 """
28 chunks = []
29 doc = fitz.open(file_path)
30
31 try:
32 # Use PyMuPDF4LLM to read and convert PDF into markdown-like format
33 md_reader = pymupdf4llm.to_markdown(
34 doc,
35 page_chunks=True,
36 dpi=dpi,
37 image_format=image_format,
38 table_strategy=table_strategy,
39 )
40
41 # Iterate through pages and extract text, tables, and images
42 for i, page in tqdm(enumerate(doc), total=len(doc), desc=f"Scraping {file_path} with pymupdf4llm"):
43 page_data = md_reader[i]
44 text = page_data["text"]
45 cleaned_text = re.sub(r"\n{3,}", "\n\n", text).strip()
46
47 pix = page.get_pixmap()
48 img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
49 chunks.append(
50 Chunk([cleaned_text], [img], file_path)
51 )
52 except Exception as e:
53 logging.error(f"Failed to scrape {file_path} with pymupdf4llm. Falling back to PyMuPDF. Error: {e}")
54
55 # Fallback method using basic PyMuPDF
56 for i in tqdm(range(len(doc)), desc=f"Scraping {file_path} with PyMuPDF"):
57 page = doc[i]
58 text = page.get_text()
59 cleaned_text = re.sub(r"\n{3,}", "\n\n", text).strip()
60
61 pix = page.get_pixmap()
62 img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
63 chunks.append(
64 Chunk([cleaned_text], [img], file_path)
65 )
66 finally:
67 doc.close()
68 return chunks


1import io
2import os
3
4import tiktoken
5
6def preprocess_chunks(self, chunks: list) -> tuple[list[str], list[str], list[bytes]]:
7 """Preprocesses data chunks."""
8
9 # Extract texts and images from chunks
10 texts = [chunk.texts[0].strip().replace("\n \n", "").replace("\n", " ") for chunk in chunks]
11 images = [chunk.images[0] for chunk in chunks]
12
13 def _convert_image_to_binary(image) -> bytes:
14 """Convert an image to binary data (PNG format)."""
15 buffer = io.BytesIO()
16 image.save(buffer, format="PNG")
17 binary_string = buffer.getvalue()
18 buffer.close()
19 return binary_string
20
21 def _num_tokens_from_string(string: str) -> int:
22 """Calculate and validate the number of tokens in a text string."""
23 encoding = tiktoken.get_encoding(
24 tiktoken.encoding_for_model(self.embedding_model_name).name
25 )
26 num_tokens = len(encoding.encode(string))
27 if num_tokens > 8192:
28 raise ValueError(f"Input is too long ({num_tokens} > 8192)")
29 return num_tokens
30
31 def _sanitize(text: str) -> str:
32 """Sanitize and return non-empty text."""
33 if not text:
34 return " "
35 return text
36
37 # Sanitize and validate texts
38 sanitized_texts = []
39 for text in texts:
40 _num_tokens_from_string(text)
41 sanitized_text = _sanitize(text)
42
43 if " \n" == sanitized_text:
44 sanitized_text = "Empty string \n"
45
46 sanitized_texts.append(sanitized_text)
47
48 # Convert images to binary format
49 binary_images = [_convert_image_to_binary(image) for image in images]
50 return sanitized_texts, binary_images
  • Embedding: We embed the preprocessed texts using OpenAI's text-embedding-3-large embedding model:
1import openai
2from dotenv import load_dotenv
3from qdrant_client.models import PointStruct
4from uuid6 import uuid7
5
6load_dotenv()
7
8openai_client = openai.Client(api_key=os.getenv("OPENAI_API_KEY"))
9
10def embed_and_upload(self, texts: list, images: list, machine: str) -> list[PointStruct]:
11 """Embeds the preprocessed texts and uploads images to MongoDB."""
12 # Initialize MongoDB wrapper
13 mongodb = MongoDBWrapper()
14
15 # Create embeddings using OpenAI's API
16 embeddings_response = openai_client.embeddings.create(
17 input=texts, model=self.embedding_model_name
18 )
19
20 points = []
21 for idx, (data, text, image) in enumerate(zip(embeddings_response.data, texts, images)):
22 # Generate a UUID for each point
23 uuid_str = uuid7().hex
24
25 # Calculate the page number
26 page = idx + 1
27
28 # Create a point structure for Qdrant
29 point = PointStruct(
30 id=uuid_str,
31 vector=data.embedding,
32 payload={"id": uuid_str, "page": page, "machine": machine, "text": text},
33 )
34 points.append(point)
35
36 # Upsert image data into MongoDB
37 mongodb.upsert(uuid=uuid_str, image_data=str(image), machine=machine, page=page)
38 return points
  • Upload to Qdrant: For the vector parameters we use size=3072 and distance=Distance.COSINE. To avoid timeout errors, we chunk the embeddings into a maximum of 200 points per request:
1from qdrant_client.models import Distance, VectorParams
2
3from qdrant_wrapper import QdrantWrapper
4
5# Initialize Qdrant wrapper
6qdrant = QdrantWrapper()
7collection_name = "pdf-manuals"
8
9# Create Qdrant collection
10qdrant.client.create_collection(
11 collection_name=collection_name,
12 vectors_config=VectorParams(size=3072, distance=Distance.COSINE)
13)
14
15# Get chunks
16chunks = qdrant.chunk_document(file_path="MLC.pdf")
17
18# Preprocess chunks
19texts, images = qdrant.preprocess_chunks(chunks)
20
21# Embed and upload embeddings
22points = qdrant.embed_and_upload(texts, images, machine="MLC")
23
24# Upload embeddings to Qdrant
25if len(points) >= 200:
26 list_of_points = [points[i : i + 200] for i in range(0, len(points), 200)]
27
28 for point in list_of_points:
29 qdrant.client.upsert(collection_name, point)
30else:
31 qdrant.client.upsert(collection_name, points)
  • Search in Qdrant: We want to implement a query filter to match the exact machine we're searching for. By applying a must filter, we ensure that only relevant technical documents are included, effectively preventing any overlap with unrelated PDF documents:
1from qdrant_client import models
2
3def search(
4 self,
5 collection_name: str,
6 query: str,
7 machine: str,
8 limit: int = 10,
9) -> list:
10 """Search in Qdrant collection."""
11 search_result = self.client.search(
12 collection_name=collection_name,
13 query_filter=models.Filter(
14 must=[
15 models.FieldCondition(
16 key="machine",
17 match=models.MatchValue(value=machine),
18 )
19 ]
20 ),
21 query_vector=openai_client.embeddings.create(
22 input=[query], model=self.embedding_model_name
23 )
24 .data[0]
25 .embedding,
26 limit=limit,
27 )
28 return search_result
29
30# Initialize Qdrant wrapper
31qdrant = QdrantWrapper()
32
33# Search example
34search_result = qdrant.search(
35 collection_name="pdf-manuals",
36 query="The signal lamp of my MLC machine is blinking green. What does that mean?",
37 machine="MLC"
38)
39top_result = search_result[0]
  • Output: We get the top result from the search:
1image = '<PIL.PngImagePlugin.PngImageFile image mode=RGB size=596x842 at 0x7FE5368C3E20>'
2machine = 'MLC'
3page = 4
4passage = '**1.1.1** **Schlüsselberechtigung** |Farbe|Anwender|Berechtigung / Funktion| |---|---|---| |grau (kein Schlüssel)|Weber (weaver)|Maschine in Produktion halten (minimale Geräteeinstellungen vornehmen, Produkti- onseinstellungen1) ansehen)| |blau|Einrichter (fitter)|Maschine mechanisch und textiltechnisch einrichten und verwalten (Geräte einstellen und Produktionseinstellungen1) vornehmen, Update und Diagnose durchführen, Zusatz- informationen ansehen)| |gelb|Vorgesetzter (supervisior)|Maschine statistisch und netzwerktechnisch einrichten und verwalten (Zeit und Schich- ten konfigurieren, Netzwerkeinstellungen vornehmen)| 1) Produktionseinstellungen = Auftrag, Artikel, Muster **1.2** **Signallampe** Die Maschine verfügt über folgende Störungsanzeigen: Multifunktions-Signalleuchte Störungsmeldungen MÜDATA-Display **RGB Multifunktions-Signalleuchte** Lampe leuchtet Lampe blink |Farbe|Muster|Fehler| |---|---|---| |dunkel||Maschine läuft| |weiss||System bereit, Stop| |weiss||Initialisierung läuft| |rot||No...'
5query = 'The signal lamp of my MLC machine is blinking green. What does that mean?'
6score = 0.46674937

Image to Text

To improve the retrieval accuracy, we could use OpenAI's vision capabilities to extract addional context from images.

Let's say we have the following table, where we want to query for specific information:

Our uploaded text-embeddings does not include the symbols (Muster) from the second column as context. Which makes it impossible to get an accurate result when we later feed the query to an LLM.

  • OpenAI Vision: We will use gpt-4o-mini to reformat this image to text as an OCR task:
1import io
2import os
3import base64
4import requests
5from PIL import Image
6from dotenv import load_dotenv
7
8load_dotenv()
9
10class GPT:
11 """Wrapper class to interact with OpenAI API Vision."""
12
13 def __call__(self, image: Image.Image):
14 """Analyze image with OpenAI API."""
15 buffered = io.BytesIO()
16 image.save(buffered, format="PNG")
17 image_bytes = buffered.getvalue()
18
19 # Encode image to base64
20 base64_image = base64.b64encode(image_bytes).decode("utf-8")
21
22 headers = {
23 "Content-Type": "application/json",
24 "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
25 }
26
27 payload = {
28 "model": "gpt-4o-mini",
29 "messages": [
30 {
31 "role": "user",
32 "content": [
33 {
34 "type": "text",
35 "text": "Format this image precisely to text. Be very particular with your formatting.",
36 },
37 {
38 "type": "image_url",
39 "image_url": {
40 "url": f"data:image/jpeg;base64,{base64_image}"
41 },
42 },
43 ],
44 },
45 ],
46 "max_tokens": 400,
47 "top_p": 0.1
48 }
49
50 response = requests.post(
51 "https://api.openai.com/v1/chat/completions", headers=headers, json=payload
52 )
53 response.raise_for_status()
54 return response.json().get("choices")[0].get("message").get("content")
55
56query = 'The signal lamp of my MLC machine is blinking green. What does that mean?'
57openai_vision = GPT()
58image_context = openai_vision(query, top_result.image)
  • Output: We did not got a perfect result, but we were able to improve the quality of the context as input to an LLM:
1**RGB Multifunktions-Signalleuchte**
2
3⊙ Lampe leuchtet
4⊗ Lampe blinkt
5
6| Farbe | Muster | Fehler |
7|-------|--------|--------|
8| dunkel | Maschine läuft | |
9| weiss | System bereit, Stop | |
10| weiss | Initialisierung läuft | |
11| rot | Not-HALT | |
12| rot | Webstellanabdeckung geöffnet | |
13| blau | Auftragsende | |
14| grün | Schussfadenbruch | |
15| grün | Kettfadenbruch / Scheibeltstopp | |
16| pink | Handling Mode / Webstellanabdeckung geschlossen | |
17| pink | Handling Mode und Webstellanabdeckung geöffnet | |
18| gelb | Hilfsfadenbruch | |
19| türkis | Aufwickelsicherung |
  • DSPy Module: We can use dspy to feed all the relevant information to gpt-4o-mini.

Let's first define a class to store the relevant information.

1import ast
2import io
3from typing import Optional
4
5from PIL import Image
6from qdrant_client.http.exceptions import UnexpectedResponse
7
8from qdrant_wrapper import QdrantWrapper
9from mongodb_wrapper import MongoDBWrapper
10
11class Response:
12 """Response object for Qdrant search."""
13
14 def __init__(
15 self,
16 query: str,
17 machine_type: str,
18 passage: Optional[str] = None,
19 image: Optional[Image.Image] = None,
20 page: Optional[int] = None,
21 score: Optional[float] = None,
22 ):
23 self.query = query
24 self.machine_type = machine_type
25 self.passage = passage
26 self.image = image
27 self.page = page
28 self.score = score
29
30 def process_query(self, k: int) -> list["QdrantResponse"]:
31 """Process the query and return the top k results."""
32 qdrant = QdrantWrapper()
33 mongodb = MongoDBWrapper()
34
35 # Vector search and show results
36 try:
37 search_results = qdrant.search(
38 "pdf-manuals", self.query, self.machine_type
39 )
40 except UnexpectedResponse as e:
41 raise ValueError(f"Something went wrong with the Qdrant API: {e}")
42
43 top_results = []
44 for result in search_results[:k]:
45 payload = result.payload
46 passage = payload["text"]
47 page = payload["page"]
48
49 # Get image with the same UUID
50 collection_item = mongodb.get(payload["id"])
51
52 byte_data = ast.literal_eval(collection_item["image"])
53 image = Image.open(io.BytesIO(byte_data)) if byte_data else None
54
55 top_results.append(
56 Response(
57 self.query,
58 self.machine_type,
59 passage,
60 image,
61 page,
62 result.score,
63 )
64 )
65 return top_results

Let's define our retriever model, the input and output schema and build out the RAG pipeline.

1from typing import Optional
2
3import dspy
4from pydantic import BaseModel, Field
5
6from gpt_wrapper import GPT
7from qdrant_wrapper import Response
8
9mini = dspy.OpenAI(model="gpt-4o-mini", max_tokens=500)
10dspy.configure(lm=mini, trace=["Test"])
11
12
13class QdrantRMClient(dspy.Retrieve):
14 """Custom Qdrant Retrieval Model Client."""
15
16 def __init__(self, machine_type: str, k: int = 3) -> None:
17 super().__init__(k=k)
18 self.machine_type = machine_type
19
20 def forward(self, query: str, k: int) -> Response:
21 """Forward pass of the QdrantRMClient."""
22 k = k if k else self.k
23 response = Response(query, self.machine_type).process_query(k)
24 return dspy.Prediction(passages=response)
25
26
27class Input(BaseModel):
28 """Input Schema for the RAG model."""
29
30 context: str = Field(description="May contain relevant facts")
31 question: str = Field()
32 image_context: Optional[str] = Field(description="May contain additional facts")
33
34
35class Output(BaseModel):
36 """Output Schema for the RAG model."""
37
38 answer: str = Field(description="The answer for the question")
39 pdf_path: str = Field()
40 image: bytes = Field()
41 page: str = Field()
42 machine_type: str = Field()
43
44
45class QASignature(dspy.Signature):
46 """Answer the question based on the context provided."""
47
48 input: Input = dspy.InputField()
49 output: Output = dspy.OutputField()
50
51
52class RAG(dspy.Module):
53 """PDF Manuals RAG Pipeline."""
54
55 def __init__(self, machine_type: str, max_hops: int = 3) -> None:
56 super().__init__()
57
58 self.predictor = dspy.TypedChainOfThought(QASignature)
59 self.qdrant_retrieve = QdrantRMClient(machine_type)
60 self.openai_vision_retrieve = GPT()
61 self.max_hops = max_hops
62
63 def forward(self, question: str) -> dspy.Prediction:
64 """Forward pass of the RAG model."""
65 retrieval = retrieval[0] # Get the top k passage
66 buffered = io.BytesIO()
67 retrieval.image.save(buffered, format="PNG")
68 image_bytes = buffered.getvalue()
69
70 input_fields = Input(
71 context=retrieval.passage,
72 question=question,
73 image_context=self.openai_vision_retrieve(retrieval.image),
74 )
75 prediction = self.predictor(input=input_fields)
76 return dspy.Prediction(
77 answer=prediction.output.answer,
78 pdf_path=retrieval.path,
79 image=image_bytes,
80 page=retrieval.page,
81 machine_type=retrieval.machine_type,
82 )
83
84if __name__ == "__main__":
85 machine_type = "MLC"
86 question = "The signal lamp of my MLC machine is blinking green. What does that mean?"
87 rag = RAG(machine_type)
88 pred = rag.forward(question)
89 print(pred.answer)


LLM Output

1'The blinking green signal lamp on your MLC machine indicates a shot thread break or a warp thread break/separation sheet stop.'

Limitations

OCR tools, including those used by language models, sometimes struggle with extracting non-standard characters, small icons, or special symbols (like circles in this case). If the symbols are rendered in a smaller or less clear font, OCR may fail to recognize them. The accuracy largely depends on the complexity and quality of the image you are analyzing, and how well the image is preprocessed if needed.

Conclusion

In this article, we have demonstrated how to build a technical document retrieval system using Qdrant for retrieval of text-based embeddings, OpenAI's gpt-4o-mini for OCR, PyMuPDF for PDF extraction, and DSPy for retrieving and augmenting the passages with context into a unified pipeline.

Now if we want to further enhance the retrieval of our documents, we could train a subset of our data for similarity search utilizing Quaterion, a framework designed for fine-tuning similarity learning models. This would allow us to better align our embeddings with the specific nuances of our dataset.

If you want to learn more, check out Quaterion's Quick Start Guide.


READ NEXT ARTICLES

Let’s Collaborate

Need to plan, create, or redefine a digital project so you can concentrate on your core business? We can help! Pop in for coffee at our office or simply reach us using the details below.

Office

Radnička cesta 47, 10000, Zagreb, Croatia

Contact

Valentin Topolovec

Social

Valentin Topolovec

Valentin Topolovec

Project Manager