Tuesday, September 3, 2024

Building a Proof-of-Concept RAG System in an Afternoon

Abdellatif Abdelfattah

Retrieval-Augmented Generation (RAG) has transformed how organizations leverage their internal knowledge with AI. By connecting language models to your documents, RAG enables accurate, contextual answers grounded in your specific information.

While production RAG systems can be complex, building a proof-of-concept (POC) to demonstrate value doesn't have to be. This guide will walk you through creating a functional RAG system in just a few hours.

What You'll Need

  • Python 3.8+
  • OpenAI API key (or another LLM provider)
  • A collection of documents (PDFs, text files, etc.)
  • Basic Python knowledge

The High-Level Architecture

Our RAG system will consist of four main components:

  1. Document loader: Imports and processes documents
  2. Text chunker: Splits documents into manageable pieces
  3. Embedding engine: Converts text chunks into vector representations
  4. Retriever + Generator: Finds relevant chunks and generates answers
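
Before writing any files, here's how the pieces fit together end to end. This is a conceptual sketch using the function names we'll define in the steps below:

# Conceptual pipeline -- each function is built in Steps 3-6
documents = load_documents()              # Step 3: read PDFs into dicts
chunks = chunk_documents(documents)       # Step 4: split into overlapping chunks
collection = create_vectorstore(chunks)   # Step 5: embed chunks and store them
answer = answer_question("What is our remote work policy?")  # Step 6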

Step 1: Set Up Your Environment

First, create a new directory and set up a virtual environment:

mkdir rag-poc
cd rag-poc
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

Install the necessary packages:

pip install openai pypdf langchain chromadb tiktoken

Step 2: Create a Configuration File

Create a config.py file to store your settings:

# config.py
import os

# API Keys
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")

# Paths
DATA_PATH = "data/"
DB_PATH = "db/"

# LLM Settings
LLM_MODEL = "gpt-3.5-turbo"
EMBEDDING_MODEL = "text-embedding-3-small"

# Chunking Settings (sizes are in characters, not tokens)
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

Step 3: Build the Document Loader

Now let's create a script to load and process PDF documents:

# document_loader.py
import os
from config import DATA_PATH
import pypdf

def load_documents(directory=DATA_PATH):
    """Load all PDF documents from the specified directory"""
    documents = []
    
    # Ensure directory exists
    if not os.path.exists(directory):
        os.makedirs(directory)
        print(f"Created directory {directory} - please add PDF files to it")
        return documents
    
    # Load each PDF file
    for filename in os.listdir(directory):
        if filename.endswith('.pdf'):
            filepath = os.path.join(directory, filename)
            print(f"Loading {filepath}")
            
            try:
                pdf = pypdf.PdfReader(filepath)
                
                # Extract text from each page (extract_text can return None)
                text = ""
                for page in pdf.pages:
                    text += (page.extract_text() or "") + "\n"
                
                # Add document to collection
                documents.append({
                    "filename": filename,
                    "text": text
                })
                print(f"  Successfully loaded {filename} ({len(text)} characters)")
            except Exception as e:
                print(f"  Error loading {filename}: {e}")
    
    print(f"Loaded {len(documents)} documents")
    return documents
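
You can smoke-test the loader on its own before wiring up the rest of the system (assuming your PDFs are already in data/):

python -c "from document_loader import load_documents; load_documents()"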

Step 4: Implement Text Chunking

Next, create a text chunker to split documents into smaller pieces:

# text_chunker.py
from config import CHUNK_SIZE, CHUNK_OVERLAP

def chunk_document(document, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
    """Split a document into overlapping chunks of text"""
    text = document["text"]
    chunks = []
    
    # Split text into paragraphs
    paragraphs = text.split('\n\n')
    
    current_chunk = ""
    
    for paragraph in paragraphs:
        # If adding this paragraph would exceed chunk size and we already have content,
        # save the current chunk and start a new one with overlap
        if current_chunk and len(current_chunk) + len(paragraph) > chunk_size:
            chunks.append({
                "text": current_chunk,
                "source": document["filename"]
            })
            
            # Carry the last CHUNK_OVERLAP characters into the next chunk
            # as overlap, trimmed to a word boundary
            if len(current_chunk) > chunk_overlap:
                overlap = current_chunk[-chunk_overlap:]
                first_space = overlap.find(' ')
                current_chunk = overlap[first_space + 1:] if first_space != -1 else overlap
            else:
                current_chunk = ""
        
        # Add the paragraph to the current chunk
        if current_chunk:
            current_chunk += "\n\n" + paragraph
        else:
            current_chunk = paragraph
    
    # Add the final chunk if it contains content
    if current_chunk:
        chunks.append({
            "text": current_chunk,
            "source": document["filename"]
        })
    
    return chunks

def chunk_documents(documents):
    """Process a list of documents into chunks"""
    all_chunks = []
    
    for document in documents:
        document_chunks = chunk_document(document)
        all_chunks.extend(document_chunks)
        print(f"Created {len(document_chunks)} chunks from {document['filename']}")
    
    return all_chunks
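
The hand-rolled chunker above keeps the logic transparent, but since we installed langchain in Step 1, you could swap in its ready-made splitter instead. A minimal sketch, assuming a langchain version that still exposes the classic import path (newer releases move it to the langchain_text_splitters package):

# alternative_chunker.py (optional drop-in for chunk_documents)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from config import CHUNK_SIZE, CHUNK_OVERLAP

def chunk_documents_langchain(documents):
    """Split documents using LangChain's recursive splitter."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
    )
    all_chunks = []
    for document in documents:
        for text in splitter.split_text(document["text"]):
            all_chunks.append({"text": text, "source": document["filename"]})
    return all_chunks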

Step 5: Set Up the Vector Store

Now we need to create a vector store for our document chunks:

# vectorstore.py
import os
import chromadb
from openai import OpenAI
from config import OPENAI_API_KEY, DB_PATH, EMBEDDING_MODEL

# Initialize OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

def get_embedding(text):
    """Generate an embedding for the given text"""
    response = client.embeddings.create(
        model=EMBEDDING_MODEL,
        input=text
    )
    return response.data[0].embedding

def create_vectorstore(chunks):
    """Create a vector store from document chunks"""
    # Ensure the DB directory exists
    if not os.path.exists(DB_PATH):
        os.makedirs(DB_PATH)
    
    # Initialize ChromaDB
    chroma_client = chromadb.PersistentClient(path=DB_PATH)
    
    # Create the collection if it doesn't exist, otherwise reuse it
    # (get_or_create_collection already handles both cases)
    collection = chroma_client.get_or_create_collection("documents")
    
    # Add documents to the collection
    documents = [chunk["text"] for chunk in chunks]
    metadatas = [{"source": chunk["source"]} for chunk in chunks]
    ids = [f"chunk_{i}" for i in range(len(chunks))]
    
    # Generate embeddings (batching to avoid overwhelming the API)
    batch_size = 100
    for i in range(0, len(documents), batch_size):
        end = min(i + batch_size, len(documents))
        batch_docs = documents[i:end]
        batch_metadatas = metadatas[i:end]
        batch_ids = ids[i:end]
        
        # Generate embeddings for the whole batch in a single API call
        response = client.embeddings.create(model=EMBEDDING_MODEL, input=batch_docs)
        embeddings = [item.embedding for item in response.data]
        
        # Upsert so re-running initialization doesn't fail on existing ids
        collection.upsert(
            embeddings=embeddings,
            documents=batch_docs,
            metadatas=batch_metadatas,
            ids=batch_ids
        )
        
        print(f"Added {len(batch_docs)} chunks to vector store (batch {i//batch_size + 1})")
    
    return collection

def query_vectorstore(query_text, n_results=5):
    """Query the vector store for similar documents"""
    # Initialize ChromaDB
    chroma_client = chromadb.PersistentClient(path=DB_PATH)
    
    # Get collection
    collection = chroma_client.get_collection("documents")
    
    # Generate embedding for query
    query_embedding = get_embedding(query_text)
    
    # Query the collection
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results
    )
    
    return results
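
One caveat: text-embedding-3-small rejects inputs longer than 8,191 tokens, and character-based chunking doesn't strictly guarantee you stay under that limit. A small optional helper using tiktoken (installed in Step 1) lets you check chunks before embedding; if your tiktoken version doesn't recognize the model name, fall back to tiktoken.get_encoding("cl100k_base"):

# token_check.py (optional)
import tiktoken
from config import EMBEDDING_MODEL

def count_tokens(text):
    """Count tokens using the embedding model's tokenizer."""
    encoding = tiktoken.encoding_for_model(EMBEDDING_MODEL)
    return len(encoding.encode(text))

# Example: flag chunks the embedding API would reject
# oversized = [c for c in chunks if count_tokens(c["text"]) > 8191]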

Step 6: Create the RAG System

Now, let's connect everything to create our RAG system:

# rag_system.py
from openai import OpenAI
from config import OPENAI_API_KEY, LLM_MODEL
from document_loader import load_documents
from text_chunker import chunk_documents
from vectorstore import create_vectorstore, query_vectorstore

# Initialize OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

def initialize_system():
    """Initialize the RAG system by loading and processing documents"""
    print("Initializing RAG system...")
    
    # Load documents
    documents = load_documents()
    if not documents:
        print("No documents found. Please add PDF files to the data directory.")
        return False
    
    # Chunk documents
    chunks = chunk_documents(documents)
    
    # Create vector store
    create_vectorstore(chunks)
    
    print("RAG system initialized successfully!")
    return True

def answer_question(question, n_docs=3):
    """Answer a question using RAG"""
    # Query the vector store
    results = query_vectorstore(question, n_results=n_docs)
    
    # Extract the retrieved documents
    contexts = results["documents"][0]
    sources = [meta["source"] for meta in results["metadatas"][0]]
    
    # Build the prompt with retrieved context
    context_block = "\n".join(
        f"[Document {i+1}]: {doc}" for i, doc in enumerate(contexts)
    )
    prompt = f"""Answer the following question based on the provided context.
If the context doesn't contain enough information to provide a complete answer,
say so rather than making up information.

Context:
{context_block}

Question: {question}

Answer:"""
    
    # Generate the answer
    response = client.chat.completions.create(
        model=LLM_MODEL,
        messages=[
            {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided context."},
            {"role": "user", "content": prompt}
        ]
    )
    
    answer = response.choices[0].message.content
    
    # Format the response with deduplicated sources, preserving order
    sources_text = "\n".join(f"- {source}" for source in dict.fromkeys(sources))
    full_response = f"{answer}\n\nSources:\n{sources_text}"
    
    return full_response

def interactive_session():
    """Run an interactive Q&A session"""
    print("\nRAG Q&A System")
    print("Type 'exit' to quit\n")
    
    while True:
        question = input("Question: ")
        if question.lower() in ["exit", "quit"]:
            break
        
        if not question.strip():
            continue
        
        print("\nSearching documents and generating answer...\n")
        answer = answer_question(question)
        print(f"Answer:\n{answer}\n")

if __name__ == "__main__":
    if initialize_system():
        interactive_session()

Step 7: Create a Main Entry Point

Finally, create a main.py file to run the system:

# main.py
import os
from config import OPENAI_API_KEY, DATA_PATH
from rag_system import initialize_system, interactive_session

def check_setup():
    """Check if the system is properly set up"""
    if not OPENAI_API_KEY:
        print("Error: OPENAI_API_KEY is not set")
        print("Please set the OPENAI_API_KEY environment variable")
        return False
    
    if not os.path.exists(DATA_PATH):
        os.makedirs(DATA_PATH)
        print(f"Created data directory at {DATA_PATH}")
        print("Please add PDF files to this directory before running again")
        return False
    
    if not any(file.endswith('.pdf') for file in os.listdir(DATA_PATH)):
        print(f"No PDF files found in {DATA_PATH}")
        print("Please add PDF files to this directory before running again")
        return False
    
    return True

if __name__ == "__main__":
    print("RAG System POC")
    print("==============")
    
    if check_setup():
        if initialize_system():
            interactive_session()

Step 8: Run the System

Now you can run your RAG system:

# Set your OpenAI API key
export OPENAI_API_KEY=your_api_key_here

# Run the system
python main.py

Before running, add some PDF documents to the data/ directory. When you start the system, it will:

  1. Load and chunk your documents
  2. Create embeddings and store them in a vector database
  3. Start an interactive session where you can ask questions

Example Usage

Here's what a conversation might look like:

RAG System POC
==============
Initializing RAG system...
Loading data/company_handbook.pdf
  Successfully loaded company_handbook.pdf (52489 characters)
Loading data/quarterly_report.pdf
  Successfully loaded quarterly_report.pdf (38721 characters)
Loaded 2 documents
Created 18 chunks from company_handbook.pdf
Created 13 chunks from quarterly_report.pdf
Added 31 chunks to vector store (batch 1)
RAG system initialized successfully!

RAG Q&A System
Type 'exit' to quit

Question: What is our company's remote work policy?

Searching documents and generating answer...

Answer:
According to the provided context, the company has a flexible remote work policy. Employees are allowed to work remotely up to 3 days per week, with the expectation that they are in the office on Tuesdays and Thursdays for team meetings and collaboration. Remote work arrangements need to be approved by direct managers, and employees must maintain regular working hours and be available for virtual meetings when working remotely. The policy also notes that some roles may require more in-office presence depending on job responsibilities.

Sources:
- company_handbook.pdf

Question: exit

Enhancing Your POC

Once you have a working POC, consider these enhancements:

  1. Web interface: Add a simple Flask or Streamlit web interface (see the sketch after this list)
  2. Document formats: Expand to handle more document types (DOCX, HTML, etc.)
  3. Metadata filtering: Allow filtering by document type or date
  4. Improved chunking: Implement more sophisticated text splitting strategies
  5. Answer evaluation: Add self-evaluation to rate answer quality
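
For the web interface, a Streamlit front end can be remarkably small. Here's a hypothetical sketch (it assumes pip install streamlit and reuses the functions from rag_system.py; adjust to taste):

# app.py (hypothetical Streamlit interface)
import streamlit as st
from rag_system import answer_question, initialize_system

st.title("RAG Q&A System")

# Build the index once per session, not on every rerun
if "initialized" not in st.session_state:
    st.session_state.initialized = initialize_system()

question = st.text_input("Ask a question about your documents")
if question and st.session_state.initialized:
    with st.spinner("Searching documents and generating answer..."):
        st.markdown(answer_question(question))

Run it with streamlit run app.py to get a shareable demo in the browser.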

Conclusion

You've now built a working RAG system that can answer questions based on your documents. This simple POC demonstrates the core concepts of retrieval-augmented generation and can be used to showcase the potential value of AI-powered document search in your organization.

While this system is intentionally simple for demonstration purposes, it provides a solid foundation that you can build upon for more advanced use cases. The most important next steps would be to improve document processing, implement proper evaluation metrics, and optimize the retrieval and generation components for your specific document types and use cases.