Parsing PDF Documents at Scale
PDF documents are ubiquitous in business, academia, and government, containing vast amounts of valuable information. However, extracting structured data from PDFs presents unique challenges, especially when dealing with thousands or millions of documents. This guide explores strategies and tools for parsing PDFs at scale.
The PDF Parsing Challenge
PDFs were designed for consistent visual presentation, not for data extraction. This creates several challenges:
- Complex structure: PDFs combine text, images, tables, and forms
- No standardized layout: Each PDF can have a unique format
- Loss of semantic information: Original document structure is often lost
- Content variety: Text can flow across columns, pages, and around images
- Performance concerns: Large collections must be processed efficiently, which demands parallelism and careful resource management
Core Components of a PDF Processing Pipeline
An effective large-scale PDF processing system typically includes:
1. Document Ingestion and Triage
Before parsing, documents need to be categorized and prepared:
import os
from PyPDF2 import PdfReader
import hashlib
class DocumentProcessor:
def __init__(self, input_dir, output_dir):
self.input_dir = input_dir
self.output_dir = output_dir
# Create output directories if they don't exist
os.makedirs(output_dir, exist_ok=True)
os.makedirs(os.path.join(output_dir, "processed"), exist_ok=True)
os.makedirs(os.path.join(output_dir, "failed"), exist_ok=True)
def process_directory(self):
"""Process all PDF files in the input directory"""
results = {
"total": 0,
"processed": 0,
"failed": 0
}
for filename in os.listdir(self.input_dir):
if not filename.lower().endswith('.pdf'):
continue
filepath = os.path.join(self.input_dir, filename)
results["total"] += 1
try:
# Get basic document info
doc_info = self.extract_document_info(filepath)
# Determine document type
doc_type = self.classify_document(doc_info)
# Log document for processing
print(f"Processing {filename}: {doc_type} document, {doc_info['page_count']} pages")
# Process based on document type (handled elsewhere)
results["processed"] += 1
except Exception as e:
print(f"Failed to process {filename}: {str(e)}")
results["failed"] += 1
# Move to failed directory for manual review
os.rename(
filepath,
os.path.join(self.output_dir, "failed", filename)
)
return results
def extract_document_info(self, filepath):
"""Extract basic document metadata"""
with open(filepath, 'rb') as file:
# Calculate file hash for deduplication
file_hash = hashlib.md5(file.read()).hexdigest()
# Read PDF metadata
reader = PdfReader(filepath)
# Extract basic info
info = {
"filename": os.path.basename(filepath),
"file_size": os.path.getsize(filepath),
"file_hash": file_hash,
"page_count": len(reader.pages),
"metadata": reader.metadata
}
# Extract first page text for classification
if len(reader.pages) > 0:
first_page = reader.pages[0].extract_text()
info["first_page_text"] = first_page
return info
def classify_document(self, doc_info):
"""Simple rule-based document classification"""
# This could be replaced with a more sophisticated ML-based classifier
first_page = doc_info.get("first_page_text", "").lower()
if "invoice" in first_page or "bill" in first_page:
return "invoice"
elif "agreement" in first_page or "contract" in first_page:
return "contract"
elif "report" in first_page:
return "report"
else:
return "general"
2. Text and Layout Extraction
Different tools excel at different aspects of PDF parsing:
import pdfplumber
from pdf2image import convert_from_path
import pytesseract
import numpy as np
class PDFExtractor:
def __init__(self, ocr_enabled=True):
self.ocr_enabled = ocr_enabled
def extract_text(self, pdf_path):
"""Extract text content with layout awareness"""
with pdfplumber.open(pdf_path) as pdf:
all_text = []
for i, page in enumerate(pdf.pages):
# Extract text with position information
text = page.extract_text(x_tolerance=3, y_tolerance=3)
# If text extraction failed and OCR is enabled, try OCR
if not text and self.ocr_enabled:
text = self.perform_ocr_on_page(pdf_path, i)
                all_text.append(text or "")  # guard against None when a page has no extractable text
return "\n\n".join(all_text)
def perform_ocr_on_page(self, pdf_path, page_num):
"""Extract text using OCR for image-based PDFs"""
# Convert PDF page to image
images = convert_from_path(pdf_path, first_page=page_num+1, last_page=page_num+1)
if not images:
return ""
# Apply OCR to the image
text = pytesseract.image_to_string(images[0])
return text
def extract_tables(self, pdf_path):
"""Extract tables from PDF"""
with pdfplumber.open(pdf_path) as pdf:
all_tables = []
for page in pdf.pages:
# Find tables in the page
tables = page.extract_tables()
if tables:
all_tables.extend(tables)
return all_tables
def extract_structured_data(self, pdf_path, doc_type):
"""Extract structured data based on document type"""
if doc_type == "invoice":
return self.extract_invoice_data(pdf_path)
elif doc_type == "contract":
return self.extract_contract_data(pdf_path)
else:
# Default to basic text extraction
return {"text": self.extract_text(pdf_path)}
def extract_invoice_data(self, pdf_path):
"""Extract structured data from invoice"""
# This would be a specialized parser for invoices
# Typically using regex patterns, position-based extraction, or ML models
# For demonstration purposes, we'll just extract basic text
text = self.extract_text(pdf_path)
# Example of simple pattern matching (would be more sophisticated in practice)
import re
# Try to find invoice number
invoice_number_match = re.search(r'Invoice\s+#?:?\s*([A-Z0-9\-]+)', text)
invoice_number = invoice_number_match.group(1) if invoice_number_match else None
# Try to find date
date_match = re.search(r'Date:?\s*(\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{2,4})', text)
date = date_match.group(1) if date_match else None
# Try to find total amount
amount_match = re.search(r'Total:?\s*\$?\s*(\d+[,\.]?\d*)', text)
total_amount = amount_match.group(1) if amount_match else None
return {
"document_type": "invoice",
"invoice_number": invoice_number,
"date": date,
"total_amount": total_amount,
"raw_text": text
}
3. Content Normalization
Converting extracted content into a standardized format:
import re
import json
import dateutil.parser
class ContentNormalizer:
def __init__(self):
# Load common patterns for data types
self.date_patterns = [
r'\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{2,4}', # MM/DD/YYYY or DD/MM/YYYY
r'\d{4}[\/\-\.]\d{1,2}[\/\-\.]\d{1,2}', # YYYY/MM/DD
r'[A-Z][a-z]{2}\s+\d{1,2},?\s+\d{4}' # Month DD, YYYY
]
def normalize_text(self, text):
"""Basic text normalization"""
if not text:
return text
        # Normalize line breaks first, then collapse runs of spaces and tabs
        text = re.sub(r'[\r\n]+', '\n', text)
        text = re.sub(r'[ \t]+', ' ', text)
        return text.strip()
def normalize_date(self, date_str):
"""Convert various date formats to ISO format"""
if not date_str:
return None
try:
# Use dateutil to parse various formats
parsed_date = dateutil.parser.parse(date_str)
return parsed_date.strftime('%Y-%m-%d')
        except (ValueError, OverflowError):
            # Fall back to the original string when parsing fails
            return date_str
def normalize_currency(self, amount_str):
"""Normalize currency amounts"""
if not amount_str:
return None
# Remove currency symbols and commas
amount_str = re.sub(r'[$£€,]', '', amount_str.strip())
try:
# Convert to float
return float(amount_str)
        except ValueError:
            # Return the cleaned string when it is not a plain number
            return amount_str
def normalize_document(self, doc_data):
"""Normalize extracted document data"""
normalized = {}
# Copy original data
for key, value in doc_data.items():
if key == "raw_text":
normalized[key] = self.normalize_text(value)
elif key.endswith("_date") or key == "date":
normalized[key] = self.normalize_date(value)
elif key.endswith("_amount") or key == "amount" or key == "total":
normalized[key] = self.normalize_currency(value)
else:
normalized[key] = value
return normalized
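A quick usage example (the invoice values here are made up) shows how the normalizer standardizes dates and amounts while leaving other fields untouched:

normalizer = ContentNormalizer()
sample = {
    "document_type": "invoice",
    "invoice_number": "INV-0042",
    "date": "March 5, 2024",
    "total_amount": "$1,234.50",
    "raw_text": "Invoice   INV-0042\n\nDate: March 5, 2024\nTotal: $1,234.50",
}
result = normalizer.normalize_document(sample)
# result["date"] becomes "2024-03-05" and result["total_amount"] becomes 1234.5
print(result)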
4. Distributed Processing
For handling large document collections efficiently:
from concurrent.futures import ProcessPoolExecutor
import os
import json
import time
class PDFBatchProcessor:
def __init__(self, input_dir, output_dir, max_workers=None):
self.input_dir = input_dir
self.output_dir = output_dir
self.max_workers = max_workers # None means use all available cores
# Initialize components
self.extractor = PDFExtractor(ocr_enabled=True)
self.normalizer = ContentNormalizer()
# Create output directory
os.makedirs(output_dir, exist_ok=True)
def process_file(self, filepath):
"""Process a single PDF file"""
try:
# Get filename for output
filename = os.path.basename(filepath)
output_path = os.path.join(self.output_dir, f"{os.path.splitext(filename)[0]}.json")
# Skip if already processed
if os.path.exists(output_path):
return {"status": "skipped", "file": filename}
            # Extract document info and classify (reuse the triage component)
            doc_processor = DocumentProcessor(self.input_dir, self.output_dir)
doc_info = doc_processor.extract_document_info(filepath)
doc_type = doc_processor.classify_document(doc_info)
# Extract structured data
extracted_data = self.extractor.extract_structured_data(filepath, doc_type)
# Normalize data
normalized_data = self.normalizer.normalize_document(extracted_data)
# Add metadata
normalized_data.update({
"source_file": filename,
"file_size": doc_info["file_size"],
"page_count": doc_info["page_count"],
"processed_at": time.strftime("%Y-%m-%d %H:%M:%S")
})
# Save processed data
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(normalized_data, f, indent=2, ensure_ascii=False)
return {"status": "success", "file": filename}
except Exception as e:
return {"status": "error", "file": filepath, "error": str(e)}
def process_batch(self):
"""Process all PDF files using parallel execution"""
# Get all PDF files in the directory
pdf_files = [
os.path.join(self.input_dir, f)
for f in os.listdir(self.input_dir)
if f.lower().endswith('.pdf')
]
total_files = len(pdf_files)
if total_files == 0:
return {"status": "complete", "processed": 0, "errors": 0, "message": "No PDF files found"}
print(f"Processing {total_files} PDF files...")
# Process files in parallel
results = []
with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
for result in executor.map(self.process_file, pdf_files):
results.append(result)
# Print progress
processed = len(results)
if processed % 10 == 0 or processed == total_files:
success = sum(1 for r in results if r["status"] == "success")
errors = sum(1 for r in results if r["status"] == "error")
skipped = sum(1 for r in results if r["status"] == "skipped")
print(f"Progress: {processed}/{total_files} ({success} succeeded, {errors} failed, {skipped} skipped)")
# Summarize results
success_count = sum(1 for r in results if r["status"] == "success")
error_count = sum(1 for r in results if r["status"] == "error")
skipped_count = sum(1 for r in results if r["status"] == "skipped")
return {
"status": "complete",
"total": total_files,
"processed": success_count,
"errors": error_count,
"skipped": skipped_count
}
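In practice the batch processor is driven from a small script. The if __name__ == "__main__" guard matters because ProcessPoolExecutor spawns worker processes that re-import the module; the paths below are placeholders:

if __name__ == "__main__":
    # Adjust max_workers to the number of available CPU cores
    processor = PDFBatchProcessor("/path/to/input", "/path/to/output", max_workers=4)
    summary = processor.process_batch()
    print(summary)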
5. Quality Control
Implementing validation and error handling:
import json
import os
import re

from jsonschema import validate, ValidationError
class QualityController:
def __init__(self, schemas_dir):
# Load schema definitions for different document types
self.schemas = {}
for filename in os.listdir(schemas_dir):
if filename.endswith('.json'):
schema_type = os.path.splitext(filename)[0]
with open(os.path.join(schemas_dir, filename)) as f:
self.schemas[schema_type] = json.load(f)
def validate_document(self, document_data):
"""Validate document data against schema"""
doc_type = document_data.get("document_type", "general")
# If we have a schema for this document type, validate against it
if doc_type in self.schemas:
try:
validate(instance=document_data, schema=self.schemas[doc_type])
return {"valid": True, "errors": []}
except ValidationError as e:
return {"valid": False, "errors": [str(e)]}
# No schema available
return {"valid": True, "warnings": ["No schema available for validation"]}
def check_data_quality(self, document_data):
"""Perform quality checks on extracted data"""
quality_issues = []
# Check for empty fields
for key, value in document_data.items():
if key != "raw_text" and (value is None or value == ""):
quality_issues.append(f"Empty value for field: {key}")
# Check for raw text extraction
if "raw_text" in document_data:
raw_text = document_data["raw_text"]
if not raw_text or len(raw_text) < 50:
quality_issues.append("Limited or no text extracted")
# Check for date format consistency
date_fields = [k for k in document_data.keys() if k.endswith("_date") or k == "date"]
for field in date_fields:
value = document_data.get(field)
if value and not re.match(r'^\d{4}-\d{2}-\d{2}$', value):
quality_issues.append(f"Date field {field} not in YYYY-MM-DD format: {value}")
return {
"quality_score": 1.0 - (len(quality_issues) * 0.1), # Simple scoring
"issues": quality_issues
}
def process_directory(self, processed_dir, quality_threshold=0.7):
"""Process a directory of extracted document data"""
results = {
"total": 0,
"passed": 0,
"failed_validation": 0,
"low_quality": 0
}
for filename in os.listdir(processed_dir):
if not filename.endswith('.json'):
continue
results["total"] += 1
filepath = os.path.join(processed_dir, filename)
with open(filepath, 'r', encoding='utf-8') as f:
try:
document_data = json.load(f)
# Validate against schema
validation = self.validate_document(document_data)
# Check data quality
quality = self.check_data_quality(document_data)
# Update document with quality info
document_data["_quality"] = {
"validation": validation,
"quality_check": quality
}
# Write updated document
with open(filepath, 'w', encoding='utf-8') as outf:
json.dump(document_data, outf, indent=2, ensure_ascii=False)
# Update results
if not validation["valid"]:
results["failed_validation"] += 1
elif quality["quality_score"] < quality_threshold:
results["low_quality"] += 1
else:
results["passed"] += 1
except Exception as e:
print(f"Error processing {filename}: {str(e)}")
return results
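The validator expects one JSON Schema file per document type in schemas_dir. As a reference point, a minimal invoice schema might look like the sketch below; the field names match the invoice extractor shown earlier, but which fields you mark as required is an assumption you should adapt to your own output:

import json
import os

invoice_schema = {
    "type": "object",
    "required": ["document_type", "invoice_number", "date", "total_amount"],
    "properties": {
        "document_type": {"const": "invoice"},
        "invoice_number": {"type": ["string", "null"]},
        # The quality checks expect ISO dates, so the schema enforces YYYY-MM-DD
        "date": {"type": ["string", "null"], "pattern": r"^\d{4}-\d{2}-\d{2}$"},
        "total_amount": {"type": ["number", "string", "null"]},
    },
}

os.makedirs("schemas", exist_ok=True)
with open("schemas/invoice.json", "w") as f:
    json.dump(invoice_schema, f, indent=2)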
Handling Different PDF Types
Different document types require specialized approaches:
Scanned Documents (Image-Based PDFs)
For PDFs that are essentially images:
from pdf2image import convert_from_path
import pytesseract
import cv2
import numpy as np
class ScannedPDFProcessor:
def __init__(self):
pass
def preprocess_image(self, image):
"""Improve image quality for better OCR results"""
# Convert to grayscale
gray = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2GRAY)
# Apply adaptive thresholding
thresh = cv2.adaptiveThreshold(
gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)
# Noise removal
kernel = np.ones((1, 1), np.uint8)
opening = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
return opening
def extract_text_from_scan(self, pdf_path, preprocess=True, deskew=True):
"""Extract text from scanned PDF using OCR"""
# Convert PDF pages to images
images = convert_from_path(pdf_path)
all_text = []
for i, image in enumerate(images):
# Preprocess image if requested
if preprocess:
image_processed = self.preprocess_image(image)
else:
image_processed = np.array(image)
# Deskew if requested
if deskew:
try:
                    # Find the skew angle from the dark (text) pixels left by thresholding
                    coords = np.column_stack(np.where(image_processed == 0))
                    angle = cv2.minAreaRect(coords)[-1]
                    # Adjust the angle (note: minAreaRect's angle convention changed in OpenCV 4.5+)
                    if angle < -45:
                        angle = -(90 + angle)
                    else:
                        angle = -angle
# Rotate image
(h, w) = image_processed.shape[:2]
center = (w // 2, h // 2)
M = cv2.getRotationMatrix2D(center, angle, 1.0)
image_processed = cv2.warpAffine(
image_processed, M, (w, h),
flags=cv2.INTER_CUBIC,
borderMode=cv2.BORDER_REPLICATE
)
                except Exception:
                    # Skip deskewing if angle estimation fails
                    pass
# Apply OCR
text = pytesseract.image_to_string(image_processed)
all_text.append(text)
return "\n\n".join(all_text)
Form-Based PDFs
For documents with structured forms:
import fitz # PyMuPDF
import re
class FormExtractor:
def __init__(self):
pass
def extract_form_fields(self, pdf_path):
"""Extract form fields and values from PDF"""
doc = fitz.open(pdf_path)
form_fields = {}
# Check if document has form fields
for page in doc:
widgets = page.widgets()
for widget in widgets:
field_name = widget.field_name
field_value = widget.field_value
field_type = widget.field_type
# Store field info
form_fields[field_name] = {
"value": field_value,
"type": field_type,
"page": page.number
}
# If there are no form fields, try to extract them by position
if not form_fields:
form_fields = self.extract_fields_by_position(doc)
doc.close()
return form_fields
def extract_fields_by_position(self, doc):
"""Extract form fields based on text positions and patterns"""
fields = {}
# Common form field labels
field_patterns = [
r"Name:\s*(.*?)(?:\n|$)",
r"Address:\s*(.*?)(?:\n|$)",
r"Email:\s*(.*?)(?:\n|$)",
r"Phone:\s*(.*?)(?:\n|$)",
r"Date:\s*(.*?)(?:\n|$)",
# Add more patterns as needed
]
for page in doc:
text = page.get_text()
# Try to extract fields based on patterns
for pattern in field_patterns:
matches = re.findall(pattern, text)
if matches:
# Extract field name from pattern
field_name = pattern.split(":")[0]
field_value = matches[0].strip()
fields[field_name] = {
"value": field_value,
"page": page.number,
"extraction_method": "pattern_match"
}
return fields
Tables in PDFs
Specialized extraction for tabular data:
import camelot
import pandas as pd
class TableExtractor:
def __init__(self):
pass
def extract_tables(self, pdf_path, pages="all"):
"""Extract tables from PDF using Camelot"""
# Extract tables
tables = camelot.read_pdf(pdf_path, pages=pages, flavor="lattice")
extracted_tables = []
for i, table in enumerate(tables):
# Convert to DataFrame
df = table.df
# Basic cleaning
df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)
# Use first row as header if it looks like one
if df.shape[0] > 0:
if all(cell and not cell.isdigit() for cell in df.iloc[0].values):
df.columns = df.iloc[0]
df = df.iloc[1:].reset_index(drop=True)
# Get table metadata
metadata = {
"table_number": i + 1,
"page": table.page,
"accuracy": table.accuracy,
"whitespace": table.whitespace,
"shape": df.shape
}
extracted_tables.append({
"data": df.to_dict(orient="records"),
"metadata": metadata
})
return extracted_tables
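Camelot's "lattice" flavor only detects tables drawn with ruled lines; for borderless tables the "stream" flavor usually works better. A small fallback sketch (the accuracy threshold of 80 is an arbitrary assumption):

import camelot

def extract_tables_with_fallback(pdf_path, pages="all"):
    """Try lattice first, then fall back to stream if nothing usable is found"""
    tables = camelot.read_pdf(pdf_path, pages=pages, flavor="lattice")
    # If lattice found nothing, or only low-accuracy tables, retry with stream
    if tables.n == 0 or all(t.accuracy < 80 for t in tables):
        tables = camelot.read_pdf(pdf_path, pages=pages, flavor="stream")
    return tables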
Creating a Complete Processing Pipeline
Putting everything together:
import os
import json
import time
import logging
from concurrent.futures import ProcessPoolExecutor
import traceback
class PDFProcessingPipeline:
def __init__(self, config):
self.config = config
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(config["log_file"]),
logging.StreamHandler()
]
)
self.logger = logging.getLogger("PDFProcessor")
# Create directories
os.makedirs(config["input_dir"], exist_ok=True)
os.makedirs(config["output_dir"], exist_ok=True)
os.makedirs(config["error_dir"], exist_ok=True)
os.makedirs(config["processed_dir"], exist_ok=True)
# Initialize components
self.document_processor = DocumentProcessor(
config["input_dir"], config["processed_dir"]
)
self.extractor = PDFExtractor(ocr_enabled=config.get("ocr_enabled", True))
self.table_extractor = TableExtractor()
self.form_extractor = FormExtractor()
self.scanned_processor = ScannedPDFProcessor()
self.normalizer = ContentNormalizer()
self.quality_controller = QualityController(config["schemas_dir"])
def process_pdf(self, filepath):
"""Process a single PDF file through the entire pipeline"""
filename = os.path.basename(filepath)
output_path = os.path.join(self.config["output_dir"], f"{os.path.splitext(filename)[0]}.json")
try:
self.logger.info(f"Processing {filename}")
# 1. Extract document info and classify
doc_info = self.document_processor.extract_document_info(filepath)
doc_type = self.document_processor.classify_document(doc_info)
self.logger.info(f"Classified {filename} as {doc_type}")
# 2. Extract data based on document type
extracted_data = {"document_type": doc_type}
# Basic text extraction
extracted_data["text"] = self.extractor.extract_text(filepath)
# If document is primarily scanned/image-based
if len(extracted_data["text"]) < 100:
self.logger.info(f"{filename} appears to be a scanned document, applying OCR")
extracted_data["text"] = self.scanned_processor.extract_text_from_scan(filepath)
            # Extract tables if present (a failed table pass should not fail the whole document)
            try:
                tables = self.table_extractor.extract_tables(filepath)
                if tables:
                    extracted_data["tables"] = tables
                    self.logger.info(f"Extracted {len(tables)} tables from {filename}")
            except Exception as table_error:
                self.logger.warning(f"Table extraction failed for {filename}: {table_error}")
# Extract form fields if present
form_fields = self.form_extractor.extract_form_fields(filepath)
if form_fields:
extracted_data["form_fields"] = form_fields
self.logger.info(f"Extracted {len(form_fields)} form fields from {filename}")
# Type-specific extraction
if doc_type == "invoice":
extracted_data.update(self.extractor.extract_invoice_data(filepath))
# 3. Normalize data
normalized_data = self.normalizer.normalize_document(extracted_data)
# Add metadata
normalized_data.update({
"source_file": filename,
"file_size": doc_info["file_size"],
"file_hash": doc_info["file_hash"],
"page_count": doc_info["page_count"],
"processed_at": time.strftime("%Y-%m-%d %H:%M:%S")
})
# 4. Quality control
validation = self.quality_controller.validate_document(normalized_data)
quality = self.quality_controller.check_data_quality(normalized_data)
normalized_data["_quality"] = {
"validation": validation,
"quality_check": quality
}
# 5. Save processed data
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(normalized_data, f, indent=2, ensure_ascii=False)
# Move to processed directory
os.rename(
filepath,
os.path.join(self.config["processed_dir"], filename)
)
self.logger.info(f"Successfully processed {filename}")
return {"status": "success", "file": filename}
except Exception as e:
self.logger.error(f"Error processing {filename}: {str(e)}")
self.logger.error(traceback.format_exc())
# Move to error directory
os.rename(
filepath,
os.path.join(self.config["error_dir"], filename)
)
return {"status": "error", "file": filename, "error": str(e)}
def run_pipeline(self, max_workers=None):
"""Run the processing pipeline on all files in input directory"""
# Get all PDF files in the input directory
pdf_files = [
os.path.join(self.config["input_dir"], f)
for f in os.listdir(self.config["input_dir"])
if f.lower().endswith('.pdf')
]
total_files = len(pdf_files)
self.logger.info(f"Starting pipeline for {total_files} PDF files")
if total_files == 0:
return {"status": "complete", "processed": 0, "errors": 0}
# Process files in parallel
results = []
with ProcessPoolExecutor(max_workers=max_workers) as executor:
for result in executor.map(self.process_pdf, pdf_files):
results.append(result)
# Summarize results
success_count = sum(1 for r in results if r["status"] == "success")
error_count = sum(1 for r in results if r["status"] == "error")
summary = {
"status": "complete",
"total": total_files,
"processed": success_count,
"errors": error_count
}
self.logger.info(f"Pipeline completed: {success_count}/{total_files} processed successfully")
# Save summary
with open(os.path.join(self.config["output_dir"], "processing_summary.json"), 'w') as f:
json.dump(summary, f, indent=2)
return summary
Using the Pipeline
Here's how to use this pipeline in practice:
# Example usage
config = {
"input_dir": "/path/to/input",
"output_dir": "/path/to/output",
"processed_dir": "/path/to/processed",
"error_dir": "/path/to/errors",
"schemas_dir": "/path/to/schemas",
"log_file": "/path/to/processing.log",
"ocr_enabled": True
}
# Initialize and run pipeline
pipeline = PDFProcessingPipeline(config)
results = pipeline.run_pipeline(max_workers=4) # Adjust based on available CPU cores
print(f"Processing complete: {results['processed']}/{results['total']} files processed successfully")
Best Practices for PDF Processing at Scale
Based on experience working with large document collections:
1. Document Preprocessing
- Normalize PDF files: Repair or re-save corrupted documents before they enter the pipeline
- Remove password protection: Detect and decrypt encrypted PDFs up front (see the sketch below)
- Optimize scanned documents: Enhance image quality (binarization, deskewing) for better OCR
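For the encrypted-PDF case, here is a minimal sketch using PyPDF2, assuming you either know the password or the file only carries an empty owner password; real collections usually need a small password registry:

from PyPDF2 import PdfReader, PdfWriter

def decrypt_pdf(input_path, output_path, password=""):
    """Write a decrypted copy of a password-protected PDF, if the password works"""
    reader = PdfReader(input_path)
    if reader.is_encrypted:
        # decrypt() returns a falsy value when the password is wrong
        if not reader.decrypt(password):
            raise ValueError(f"Could not decrypt {input_path} with the supplied password")
        # AES-encrypted files may additionally require the pycryptodome package
    writer = PdfWriter()
    for page in reader.pages:
        writer.add_page(page)
    with open(output_path, "wb") as f:
        writer.write(f)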
2. Performance Optimization
- Batch processing: Group smaller PDFs into batches to amortize worker start-up costs
- Resource monitoring: Track memory usage, since a single very large PDF can exhaust a worker
- Caching: Store intermediate results keyed by content hash to avoid reprocessing (see the sketch below)
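A simple way to implement caching is to key outputs by content hash rather than filename, so renamed duplicates are skipped as well. A sketch, with the cache directory layout as an assumption:

import hashlib
import json
import os

def cached_process(filepath, cache_dir, process_fn):
    """Process a PDF only if its content hash has not been seen before"""
    os.makedirs(cache_dir, exist_ok=True)
    with open(filepath, "rb") as f:
        file_hash = hashlib.md5(f.read()).hexdigest()
    cache_path = os.path.join(cache_dir, f"{file_hash}.json")
    if os.path.exists(cache_path):
        # Reuse the previously extracted result
        with open(cache_path, "r", encoding="utf-8") as f:
            return json.load(f)
    result = process_fn(filepath)
    with open(cache_path, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False)
    return result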
3. Quality Management
- Sampling: Regularly sample outputs for manual review
- Feedback loops: Update extraction rules based on recurring error patterns
- Confidence scoring: Assign confidence levels to extracted fields so low-confidence results can be routed for review (see the sketch below)
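Confidence scoring can start out as a simple heuristic before graduating to model-based scores. A sketch for the invoice fields used earlier; the weights and rules are illustrative assumptions:

import re

def score_invoice_fields(data):
    """Attach a rough per-field confidence to extracted invoice data"""
    confidences = {}
    # Fields that match a strict pattern get higher confidence than loose matches
    if data.get("invoice_number"):
        confidences["invoice_number"] = 0.9 if re.fullmatch(r"[A-Z0-9\-]{4,}", data["invoice_number"]) else 0.6
    if data.get("date"):
        confidences["date"] = 0.9 if re.fullmatch(r"\d{4}-\d{2}-\d{2}", data["date"]) else 0.5
    if isinstance(data.get("total_amount"), (int, float)):
        confidences["total_amount"] = 0.8
    # Overall document confidence: mean of field confidences, 0 if nothing was extracted
    overall = sum(confidences.values()) / len(confidences) if confidences else 0.0
    return {"fields": confidences, "overall": round(overall, 2)}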
Advanced Techniques
For the most challenging PDF processing tasks:
1. Machine Learning for Layout Analysis
Using ML models to understand document structure:
# Pseudocode for ML-based layout analysis
# layout_detection_model and region_classifier stand in for trained models
# (e.g. a Mask R-CNN or YOLO layout detector plus a region-type classifier)
def analyze_layout_with_ml(pdf_path):
    # Convert PDF pages to images
    page_images = convert_pdf_to_images(pdf_path)
    processed_document = []
    for img in page_images:
        # Preprocess image
        preprocessed = preprocess_image(img)
        # Apply the layout detection model to find candidate regions
        layout_regions = layout_detection_model.predict(preprocessed)
        # Classify each region (text, table, image, etc.)
        classified_regions = []
        for region in layout_regions:
            region_img = crop_region(preprocessed, region)
            region_type = region_classifier.predict(region_img)
            classified_regions.append({
                "bbox": region,
                "type": region_type
            })
        # Process each region according to its type
        page_content = []
        for region in classified_regions:
            if region["type"] == "text":
                region["content"] = None  # run OCR or native text extraction here
            elif region["type"] == "table":
                region["content"] = None  # run table-structure extraction here
            # images, charts, etc. can be saved or skipped
            page_content.append(region)
        processed_document.append(page_content)
    return processed_document
2. Template-Based Extraction
For documents with consistent layouts:
import os
import json
import re
import pdfplumber

class TemplateBasedExtractor:
def __init__(self, templates_dir):
# Load template definitions
self.templates = {}
for filename in os.listdir(templates_dir):
if filename.endswith('.json'):
template_name = os.path.splitext(filename)[0]
with open(os.path.join(templates_dir, filename)) as f:
self.templates[template_name] = json.load(f)
def match_template(self, pdf_path):
"""Determine which template matches this document"""
# Extract first page text for matching
with pdfplumber.open(pdf_path) as pdf:
if len(pdf.pages) > 0:
                first_page_text = pdf.pages[0].extract_text() or ""
else:
return None
# Try to match against known templates
best_match = None
best_score = 0
for name, template in self.templates.items():
# Check for required text markers
markers = template.get("markers", [])
match_count = sum(1 for marker in markers if marker in first_page_text)
if match_count > best_score:
best_score = match_count
best_match = name
# Require at least 2 markers to match
if best_score >= 2:
return best_match
return None
def extract_with_template(self, pdf_path, template_name):
"""Extract data using a specific template"""
template = self.templates.get(template_name)
if not template:
return None
# Open PDF
with pdfplumber.open(pdf_path) as pdf:
extracted_data = {}
# Process each field definition
for field_name, field_def in template.get("fields", {}).items():
page_num = field_def.get("page", 0)
if page_num < len(pdf.pages):
page = pdf.pages[page_num]
# Extract based on field type
if field_def["type"] == "text_at_position":
# Extract text from a specific area
x0, y0, x1, y1 = field_def["bbox"]
crop = page.crop((x0, y0, x1, y1))
extracted_data[field_name] = crop.extract_text()
elif field_def["type"] == "table_at_position":
# Extract table from a specific area
x0, y0, x1, y1 = field_def["bbox"]
crop = page.crop((x0, y0, x1, y1))
table = crop.extract_table()
if table:
extracted_data[field_name] = table
elif field_def["type"] == "regex_pattern":
# Extract text using regex pattern
text = page.extract_text()
pattern = field_def["pattern"]
match = re.search(pattern, text)
if match and match.groups():
extracted_data[field_name] = match.group(1)
return extracted_data
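For reference, a template definition the extractor above would understand might look like the sketch below, saved into templates_dir. The vendor name, marker strings, and bounding boxes are made-up examples; coordinates are in PDF points, as used by pdfplumber's crop:

import json
import os

vendor_a_invoice_template = {
    "markers": ["ACME Supplies Ltd", "INVOICE"],
    "fields": {
        "invoice_number": {"type": "regex_pattern", "page": 0, "pattern": r"Invoice\s+#?:?\s*([A-Z0-9\-]+)"},
        "total_amount": {"type": "text_at_position", "page": 0, "bbox": [400, 700, 560, 730]},
        "line_items": {"type": "table_at_position", "page": 0, "bbox": [40, 300, 560, 680]},
    },
}

os.makedirs("templates", exist_ok=True)
with open("templates/vendor_a_invoice.json", "w") as f:
    json.dump(vendor_a_invoice_template, f, indent=2)

# Matching and extraction would then look like this ("sample_invoice.pdf" is a placeholder path):
extractor = TemplateBasedExtractor("templates")
template_name = extractor.match_template("sample_invoice.pdf")
if template_name:
    data = extractor.extract_with_template("sample_invoice.pdf", template_name)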
Conclusion
Building a scalable PDF processing system requires a thoughtful combination of different techniques, tools, and quality control mechanisms. By implementing a pipeline like the one described here, you can efficiently extract structured data from large volumes of PDF documents, transforming unstructured content into valuable, machine-readable information.
The approaches outlined in this guide provide a foundation that can be adapted to specific document types and use cases. Remember that PDF parsing is often an iterative process—continuously improving extraction rules and processing techniques based on the specific documents you encounter will yield the best results.