A comprehensive, scalable PDF processing system with support for Google Cloud Storage (GCS) and Google Drive backends, featuring resume capability, distributed locking, and production-ready deployment options.
- Resume Capability: Can resume from where it left off after crashes or interruptions
- Concurrent Processing: File-level and page-level concurrency with intelligent backpressure
- Multi-Storage Backends: Support for both GCS and Google Drive via a pluggable storage interface
- Distributed Locking: Prevents duplicate processing across multiple instances
- Comprehensive Logging: JSON logs, dead letter queue, and Supabase integration
- PDF Validation: Validates PDF integrity before processing (see the validation sketch after this list)
- Rate Limiting: Global Gemini API throttling and storage operation limits
- Graceful Shutdown: Proper cleanup on termination signals
- Health Monitoring: Built-in health checks and monitoring endpoints
- Auto-scaling: Kubernetes HPA for dynamic scaling
- Container Ready: Docker and Kubernetes deployment configurations
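The validation step itself isn't spelled out in this README; as a rough illustration, a minimal integrity check could look like the sketch below, which uses pypdf (the library, function name, and `sample.pdf` path are assumptions, not necessarily what the project uses):

```python
# Hypothetical sketch of a pre-processing integrity check using pypdf.
# The project's own validation logic is internal; everything here is illustrative.
from pypdf import PdfReader
from pypdf.errors import PdfReadError

def is_valid_pdf(path: str) -> bool:
    """Return True if the file parses as a PDF and contains at least one page."""
    try:
        return len(PdfReader(path).pages) > 0
    except (PdfReadError, OSError):
        return False

if __name__ == "__main__":
    print(is_valid_pdf("sample.pdf"))
```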
The system consists of:
- Unified Worker: Single worker supporting both GCS and Google Drive backends
- Storage Interface: Pluggable storage abstraction layer
- OCR Engine: Gemini API integration with intelligent rate limiting
- Resume System: Persistent progress tracking and resume capability
- Distributed Locking: Redis-based or file-based locking to prevent duplicates (see the locking sketch after this list)
- Comprehensive Logging: Multi-output logging system with structured JSON logs
- Health Monitoring: Built-in health checks and metrics endpoints
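The locking component is internal to the worker, but the idea behind the Redis-based variant can be sketched as follows. The `pdf_processing:` key prefix matches the troubleshooting section further down; the TTL and function names are illustrative assumptions:

```python
# Illustrative sketch of Redis-based per-file locking; not the project's
# exact implementation. SET ... NX EX gives an atomic "claim with expiry".
import redis

r = redis.Redis.from_url("redis://localhost:6379/0")

def try_acquire_lock(file_id: str, worker_id: str, ttl_seconds: int = 600) -> bool:
    """Claim a file for this worker; False means another instance holds it."""
    return bool(r.set(f"pdf_processing:{file_id}", worker_id, nx=True, ex=ttl_seconds))

def release_lock(file_id: str) -> None:
    """Drop the claim once the file has been fully processed."""
    r.delete(f"pdf_processing:{file_id}")
```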
- Python 3.11+
- Google Cloud Storage bucket OR Google Drive folders
- Gemini API key
- Service account credentials (GCS) OR OAuth2 credentials (Drive)
- Redis instance (for distributed locking)
# Install from PyPI
pip install dist-gcs-pdf-processing==2.0.0
# Or install from source
git clone <repository-url>
cd gcs-pdf-processing
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
pip install -e .
Create a `.env` file with your settings:
# API Keys
GEMINI_API_KEY=your_gemini_api_key
# Google Cloud Storage (for GCS backend)
GOOGLE_APPLICATION_CREDENTIALS=secrets/gcs-service-account.json
GCS_BUCKET_NAME=your-bucket-name
GCS_SOURCE_PREFIX=source/
GCS_DEST_PREFIX=processed/
# Google Drive (for Drive backend)
GOOGLE_DRIVE_CREDENTIALS=secrets/drive-oauth2-credentials.json
DRIVE_SOURCE_FOLDER_ID=your_source_folder_id
DRIVE_DEST_FOLDER_ID=your_dest_folder_id
# Redis (for distributed locking)
REDIS_URL=redis://localhost:6379/0
# Supabase (optional, for persistent error logging)
SUPABASE_URL=your_supabase_url
SUPABASE_API_KEY=your_supabase_api_key
# Worker Configuration
POLL_INTERVAL=30
MAX_CONCURRENT_FILES=3
MAX_CONCURRENT_WORKERS=8
GEMINI_GLOBAL_CONCURRENCY=10
MAX_RETRIES=3
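The package's `config.py` and `env.py` read these values at startup; purely as a sketch of what that amounts to (the dataclass below is illustrative, not the project's actual interface; variable names match those documented in this README):

```python
# Illustrative sketch: reading the worker settings from the environment.
# Not part of the package's public API.
import os
from dataclasses import dataclass, field

@dataclass
class WorkerSettings:
    storage_backend: str = field(default_factory=lambda: os.getenv("STORAGE_BACKEND", "gcs"))
    poll_interval: int = field(default_factory=lambda: int(os.getenv("POLL_INTERVAL", "30")))
    max_concurrent_files: int = field(default_factory=lambda: int(os.getenv("MAX_CONCURRENT_FILES", "3")))
    max_concurrent_workers: int = field(default_factory=lambda: int(os.getenv("MAX_CONCURRENT_WORKERS", "8")))
    gemini_global_concurrency: int = field(default_factory=lambda: int(os.getenv("GEMINI_GLOBAL_CONCURRENCY", "10")))
    max_retries: int = field(default_factory=lambda: int(os.getenv("MAX_RETRIES", "3")))
    redis_url: str | None = field(default_factory=lambda: os.getenv("REDIS_URL"))

settings = WorkerSettings()  # reads the environment at construction time
```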
# Run GCS worker
dist-gcs-worker
# Run Drive worker
dist-drive-worker
# Run API server
dist-gcs-api
# Build and run with Docker Compose
docker-compose up -d
# Scale workers
docker-compose up -d --scale pdf-worker-gcs=3 --scale pdf-worker-drive=2
# Deploy to Kubernetes
kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/secrets.yaml
kubectl apply -f k8s/redis-deployment.yaml
kubectl apply -f k8s/worker-deployment.yaml
kubectl apply -f k8s/api-deployment.yaml
kubectl apply -f k8s/hpa.yaml
├── src/dist_gcs_pdf_processing/
│   ├── unified_worker.py       # Main unified worker
│   ├── storage_interface.py    # Storage abstraction layer
│   ├── gcs_utils.py            # GCS operations
│   ├── drive_utils_oauth2.py   # Drive operations
│   ├── ocr.py                  # OCR processing
│   ├── config.py               # Configuration
│   ├── env.py                  # Environment setup
│   └── shared.py               # Shared utilities
├── k8s/                        # Kubernetes manifests
├── docker-compose.yml          # Docker Compose config
├── Dockerfile                  # Docker configuration
└── tests/                      # Test suite
Variable | Description | Default | Notes |
---|---|---|---|
`STORAGE_BACKEND` | Storage backend (`gcs`/`drive`) | `gcs` | Determines which storage to use |
`POLL_INTERVAL` | Polling interval in seconds | 30 | How often to check for new files |
`MAX_CONCURRENT_FILES` | Max concurrent files | 3 | Files processed simultaneously |
`MAX_CONCURRENT_WORKERS` | Max concurrent workers | 8 | Pages processed simultaneously |
`GEMINI_GLOBAL_CONCURRENCY` | Global Gemini API concurrency | 10 | Global API rate limiting |
`MAX_RETRIES` | Max retries per page | 3 | Retry failed pages |
`REDIS_URL` | Redis connection URL | None | For distributed locking |
`WORKER_INSTANCE_ID` | Unique worker instance ID | Auto-generated | For logging and locking |
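The three concurrency settings layer on top of each other: `MAX_CONCURRENT_FILES` caps files in flight, `MAX_CONCURRENT_WORKERS` caps pages in flight, and `GEMINI_GLOBAL_CONCURRENCY` caps Gemini calls across all of them. A rough asyncio sketch of that layering (not the unified worker's actual code):

```python
# Rough sketch of layering the three limits with asyncio semaphores; the
# unified worker's internals may differ. The Gemini call is a placeholder.
import asyncio

file_sem = asyncio.Semaphore(3)     # MAX_CONCURRENT_FILES
page_sem = asyncio.Semaphore(8)     # MAX_CONCURRENT_WORKERS
gemini_sem = asyncio.Semaphore(10)  # GEMINI_GLOBAL_CONCURRENCY

async def ocr_page(page: bytes) -> str:
    async with page_sem, gemini_sem:   # backpressure on pages and API calls
        await asyncio.sleep(0.1)       # stand-in for the real Gemini request
        return "page text"

async def process_file(pages: list[bytes]) -> list[str]:
    async with file_sem:               # at most 3 files processed at once
        return list(await asyncio.gather(*(ocr_page(p) for p in pages)))

asyncio.run(process_file([b"%PDF-page-1", b"%PDF-page-2"]))
```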
- Worker Health: Checks for log file existence
- API Health: HTTP endpoint at `/health`
- Redis Health: Redis ping command
- Structured Logs: JSON format in `logs/json/`
- Dead Letter Queue: Failed files in `logs/dead_letter/`
- Progress Tracking: Resume state in `logs/progress/` (a rough sketch follows this list)
- Supabase Integration: Persistent error logging
- Prometheus Metrics: Available at the `/metrics` endpoint
- Resource Usage: CPU, memory, network
- Processing Metrics: Files processed, pages processed, errors
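The exact layout of the resume-state files is internal to the worker; as a rough idea of what per-file progress tracking involves (file name, field names, and the page-indexing scheme below are assumptions):

```python
# Hypothetical sketch of resume-state files under logs/progress/; the real
# worker's file layout and field names are not documented here.
import json
from pathlib import Path

PROGRESS_DIR = Path("logs/progress")

def save_progress(file_id: str, completed_pages: list[int], total_pages: int) -> None:
    """Persist which pages of a file have already been processed."""
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    state = {"file_id": file_id, "completed_pages": completed_pages, "total_pages": total_pages}
    (PROGRESS_DIR / f"{file_id}.json").write_text(json.dumps(state))

def pages_remaining(file_id: str, total_pages: int) -> list[int]:
    """On restart, return only the pages that still need OCR."""
    path = PROGRESS_DIR / f"{file_id}.json"
    if not path.exists():
        return list(range(total_pages))
    done = set(json.loads(path.read_text())["completed_pages"])
    return [p for p in range(total_pages) if p not in done]
```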
# Start all services
docker-compose up -d
# View logs
docker-compose logs -f
# Scale workers
docker-compose up -d --scale pdf-worker-gcs=3 --scale pdf-worker-drive=2
# Stop services
docker-compose down
# Deploy to Kubernetes
kubectl apply -f k8s/
# Check deployment status
kubectl get pods -n pdf-processing
# View logs
kubectl logs -f deployment/pdf-worker-gcs -n pdf-processing
# Scale manually
kubectl scale deployment pdf-worker-gcs --replicas=5 -n pdf-processing
# Run GCS worker
docker run -d \
--name pdf-worker-gcs \
--env-file .env \
-v ./secrets:/app/secrets:ro \
-v ./logs:/app/logs \
pdf-worker:latest \
dist-gcs-worker
# Run Drive worker
docker run -d \
--name pdf-worker-drive \
--env-file .env \
-v ./secrets:/app/secrets:ro \
-v ./logs:/app/logs \
pdf-worker:latest \
dist-drive-worker
# Run API server
docker run -d \
--name pdf-api \
--env-file .env \
-p 8000:8000 \
-v ./secrets:/app/secrets:ro \
-v ./logs:/app/logs \
pdf-worker:latest \
dist-gcs-api
- Redis Connection Failed
  # Check Redis status
  kubectl get pods -l app=redis -n pdf-processing
- Authentication Errors
  # Check secrets
  kubectl get secret pdf-worker-secrets -n pdf-processing -o yaml
- Duplicate Processing
  # Check Redis locks
  redis-cli keys "pdf_processing:*"
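For the Duplicate Processing case, the same keys can also be inspected from Python with redis-py (read-only; point the URL at whatever `REDIS_URL` is set to):

```python
# Inspect active processing locks and their TTLs; read-only companion to the
# redis-cli command above. Adjust the URL to match REDIS_URL.
import redis

r = redis.Redis.from_url("redis://localhost:6379/0", decode_responses=True)
for key in r.scan_iter("pdf_processing:*"):
    print(key, "held by", r.get(key), "expires in", r.ttl(key), "s")
```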
# Check worker status
kubectl describe pod <pod-name> -n pdf-processing
# View logs
kubectl logs -f <pod-name> -n pdf-processing
# Execute shell in pod
kubectl exec -it <pod-name> -n pdf-processing -- /bin/bash
- Kubernetes HPA: Automatic scaling based on CPU/memory
- Manual Scaling: `kubectl scale deployment`
- Docker Compose: `docker-compose up --scale`
- Resource Limits: Adjust CPU/memory limits
- Concurrency: Increase `MAX_CONCURRENT_FILES`
- Workers: Increase `MAX_CONCURRENT_WORKERS`
- Secrets Management: Use Kubernetes secrets or external secret management
- Network Policies: Implement network segmentation
- RBAC: Configure proper role-based access control
- Image Security: Scan images for vulnerabilities
- Resource Limits: Prevent resource exhaustion attacks
- Create a GCS Bucket:
  - Go to Google Cloud Console
  - Navigate to Cloud Storage
  - Create a new bucket
  - Note the bucket name
- Create a Service Account:
  - Go to "IAM & Admin" → "Service Accounts"
  - Click "Create Service Account"
  - Provide a name and description
  - Grant the "Storage Admin" role
  - Create and download a JSON key as `secrets/gcs-service-account.json`
- Configure Environment:
  GOOGLE_APPLICATION_CREDENTIALS=secrets/gcs-service-account.json
  GCS_BUCKET_NAME=your-bucket-name
  GCS_SOURCE_PREFIX=source/
  GCS_DEST_PREFIX=processed/
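With those settings in place, a quick way to confirm the service account can actually see the bucket is to list a few objects under the source prefix (a sketch using the official `google-cloud-storage` client; run it with the same `.env` loaded):

```python
# Sanity check: the service account should be able to list objects under the
# source prefix. Requires GOOGLE_APPLICATION_CREDENTIALS and GCS_BUCKET_NAME.
import os
from google.cloud import storage

client = storage.Client()  # picks up GOOGLE_APPLICATION_CREDENTIALS
blobs = client.list_blobs(
    os.environ["GCS_BUCKET_NAME"],
    prefix=os.getenv("GCS_SOURCE_PREFIX", "source/"),
    max_results=5,
)
for blob in blobs:
    print(blob.name)
```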
- Create a Google Cloud Project:
  - Go to Google Cloud Console
  - Create a new project or select an existing one
  - Enable the Google Drive API
- Create OAuth2 Credentials:
  - Go to "Credentials" in the Google Cloud Console
  - Click "Create Credentials" → "OAuth 2.0 Client IDs"
  - Choose "Desktop application"
  - Download the JSON file as `secrets/drive-oauth2-credentials.json`
- Set up OAuth2 Flow (a rough equivalent of this flow is sketched after this list):
  # Run the OAuth2 setup (one-time)
  python -c "
  from dist_gcs_pdf_processing.drive_utils_oauth2 import setup_oauth2_credentials
  setup_oauth2_credentials()
  "
- Create Drive Folders:
  - Create source and destination folders in Google Drive
  - Copy the folder IDs from the folder URLs
  - Configure the environment:
    GOOGLE_DRIVE_CREDENTIALS=secrets/drive-oauth2-credentials.json
    DRIVE_SOURCE_FOLDER_ID=your_source_folder_id
    DRIVE_DEST_FOLDER_ID=your_dest_folder_id
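`setup_oauth2_credentials()` performs the one-time consent flow for you; for reference, the equivalent flow with `google-auth-oauthlib` looks roughly like the sketch below (the scope and the token output path are assumptions, not necessarily what the package uses):

```python
# Roughly what the one-time desktop OAuth2 consent flow amounts to; the
# package's setup_oauth2_credentials() wraps equivalent steps. The scope and
# token path below are illustrative assumptions.
from google_auth_oauthlib.flow import InstalledAppFlow

SCOPES = ["https://www.googleapis.com/auth/drive"]

flow = InstalledAppFlow.from_client_secrets_file(
    "secrets/drive-oauth2-credentials.json", SCOPES
)
creds = flow.run_local_server(port=0)  # opens a browser for user consent
with open("secrets/drive-token.json", "w") as fh:
    fh.write(creds.to_json())          # persist the refresh/access token
```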
# Install Redis
# Ubuntu/Debian
sudo apt-get install redis-server
# macOS
brew install redis
# Start Redis
redis-server
docker run -d --name redis -p 6379:6379 redis:alpine
kubectl apply -f k8s/redis-deployment.yaml
- `GET /` - Basic health check
- `GET /health` - Detailed health status
- `GET /status` - Worker status and metrics
- `GET /metrics` - Prometheus metrics
- `POST /process-file` - Process a specific file
- `POST /drive-event` - Process files from a Drive webhook
- `GET /logs` - Recent processing logs
- `GET /config` - Current configuration
- `POST /config` - Update configuration (restart required)
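The response schemas aren't documented here, but the monitoring endpoints can be exercised with a few GET requests. The base URL below assumes the API container published on port 8000, as in the `docker run` example above, and JSON responses for `/health` and `/status` are an assumption:

```python
# Poll the monitoring endpoints; port 8000 matches the docker run example
# above, and JSON bodies for /health and /status are assumed.
import requests

BASE = "http://localhost:8000"

print(requests.get(f"{BASE}/health", timeout=10).json())
print(requests.get(f"{BASE}/status", timeout=10).json())
print(requests.get(f"{BASE}/metrics", timeout=10).text[:500])  # Prometheus text format
```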
# Run all tests
pytest tests/
# Run specific test file
pytest tests/test_worker.py
# Run with coverage
pytest --cov=src/dist_gcs_pdf_processing tests/
# Run integration tests
pytest tests/test_integration.py -v
MIT License - see LICENSE file for details.
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Documentation: Wiki
See CONTRIBUTING.md for development guidelines and contribution instructions.