13 KiB
Vektorisierung mit Apache Camel
Übersicht
Die Vektorisierung erfolgt vollständig asynchron über Apache Camel Routes und nutzt einen externen Python Embedding Service über REST.
Architektur
┌─────────────────────────────────────────────────────────────────┐
│ Vektorisierungs-Pipeline │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌─────────────────┐ ┌───────────────┐ │
│ │ XML File │───▶│ TedDocumentRoute│───▶│ Document │ │
│ │ Processing │ │ │ │ Saved to DB │ │
│ └──────────────┘ └────────┬────────┘ └───────┬───────┘ │
│ │ │ │
│ │ wireTap │ │
│ ▼ │ │
│ ┌──────────────────────────────────────────────────┼──────┐ │
│ │ direct:vectorize (Trigger) │ │ │
│ └──────────────────────────┬───────────────────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────────────────────────────────────────┐ │ │
│ │ seda:vectorize-async (4 concurrent workers) │ │ │
│ │ │ │ │
│ │ 1. Load document from DB │ │ │
│ │ 2. Extract text_content (includes Lots!) │ │ │
│ │ 3. Set status = PROCESSING │ │ │
│ │ 4. Add "passage: " prefix │ │ │
│ │ 5. Call REST API │ │ │
│ │ 6. Update content_vector │ │ │
│ └──────────────┬───────────────────────────────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────────────────────────────────────────┐ │ │
│ │ Python Embedding Service (Port 8001) │ │ │
│ │ POST /embed │ │ │
│ │ Model: intfloat/multilingual-e5-large │ │ │
│ │ Returns: [1024 floats] │ │ │
│ └──────────────────────────────────────────────────┘ │ │
│ │ │
│ ┌──────────────────────────────────────────────────┐ │ │
│ │ Timer Route (every 60s) │◀─────┘ │
│ │ │ │
│ │ SELECT * FROM procurement_document │ │
│ │ WHERE vectorization_status = 'PENDING' │ │
│ │ LIMIT 16 │ │
│ │ │ │
│ │ → Trigger vectorization for each │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Apache Camel Routes
1. Trigger Route (direct:vectorize)
Route-ID: vectorization-trigger
Funktion: Empfängt documentId und leitet an async Queue weiter
Integration: Wird von TedDocumentRoute per wireTap aufgerufen (non-blocking)
from("direct:vectorize")
.to("seda:vectorize-async?concurrentConsumers=4&waitForTaskToComplete=Never");
2. Async Processor Route (seda:vectorize-async)
Route-ID: vectorization-processor
Concurrent Workers: 4 (konfigurierbar)
Ablauf:
- ✅ Load document from DB via
documentId - ✅ Update status →
PROCESSING - ✅ Extract
text_content(enthält Dokument + Lots!) - ✅ Truncate wenn >
max-text-length(8192 chars) - ✅ Add prefix:
"passage: " + text - ✅ POST →
http://localhost:8001/embedmit JSON body - ✅ Parse JSON response →
float[1024] - ✅ Update
content_vectorin DB - ✅ Update status →
COMPLETED
Error Handling:
- Max 2 Retries mit 2s Delay
- Bei Fehler: Status →
FAILEDmit Error-Message
3. Scheduler Route (timer:vectorization-scheduler)
Route-ID: vectorization-scheduler
Interval: 60 Sekunden (nach 5s Delay beim Start)
Funktion: Verarbeitet noch nicht vektorisierte Dokumente aus der DB
Ablauf:
from("timer:vectorization-scheduler?period=60000&delay=5000")
.process(exchange -> {
// Load PENDING documents from DB
List<ProcurementDocument> pending =
documentRepository.findByVectorizationStatus(PENDING, PageRequest.of(0, 16));
})
.split(body())
.to("direct:vectorize") // Trigger für jedes Dokument
.end();
Text-Inhalt für Vektorisierung
Der text_content wird in XmlParserService.generateTextContent() erstellt und enthält:
Title: Mission de maitrise d'œuvre pour la création...
Description: Désignation d'une équipe de maîtrise d'œuvre...
Contracting Authority: Société Publique Locale, Bannalec (FRA)
Contract Type: SERVICES
Procedure: OTHER
CPV Codes: 71200000
Lots (1):
- LOT-0001: Mission de maîtrise d'œuvre... - Désignation d'une équipe...
Alle Lot-Titel und Beschreibungen werden einbezogen!
REST API: Python Embedding Service
Endpoint
POST http://localhost:8001/embed
Request
{
"text": "passage: Title: Mission de maitrise d'œuvre..."
}
Response
[0.123, -0.456, 0.789, ..., 0.321]
Format: JSON Array mit 1024 Floats
Model
- Name:
intfloat/multilingual-e5-large - Dimensions: 1024
- Languages: 100+ (Mehrsprachig)
- Normalization: L2-normalized für Cosine Similarity
E5 Model Prefixes
| Typ | Prefix | Verwendung |
|---|---|---|
| Dokumente | passage: |
Beim Speichern in DB |
| Queries | query: |
Bei Suchanfragen |
Konfiguration
application.yml:
ted:
vectorization:
enabled: true # Aktivierung
use-http-api: true # REST statt Subprocess
api-url: http://localhost:8001 # Embedding Service URL
model-name: intfloat/multilingual-e5-large
dimensions: 1024
batch-size: 16 # Scheduler batch size
max-text-length: 8192 # Max chars für Vektorisierung
Python Embedding Service Starten
Option 1: Docker Compose
docker-compose up -d embedding-service
Option 2: Standalone Python
Datei: embedding_service.py
from flask import Flask, request, jsonify
from sentence_transformers import SentenceTransformer
app = Flask(__name__)
model = SentenceTransformer('intfloat/multilingual-e5-large')
@app.route('/embed', methods=['POST'])
def embed():
data = request.json
text = data['text']
# Generate embedding
embedding = model.encode(text, normalize_embeddings=True)
return jsonify(embedding.tolist())
@app.route('/health', methods=['GET'])
def health():
return jsonify({"status": "ok"})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8001)
Start:
pip install flask sentence-transformers
python embedding_service.py
Monitoring
Vektorisierungs-Status prüfen
SELECT
vectorization_status,
COUNT(*) as count
FROM ted.procurement_document
GROUP BY vectorization_status;
Mögliche Status:
PENDING- Wartet auf VektorisierungPROCESSING- Wird gerade vektorisiertCOMPLETED- Erfolgreich vektorisiertFAILED- Fehler bei VektorisierungSKIPPED- Kein Text-Inhalt vorhanden
Admin REST API
GET /api/v1/admin/vectorization/status
{
"enabled": true,
"pending": 42,
"completed": 1523,
"failed": 3
}
POST /api/v1/admin/vectorization/process-pending?batchSize=100
Trigger manuelle Verarbeitung von PENDING Dokumenten
Camel Routes Status
Actuator Endpoint: http://localhost:8888/api/actuator/camel
Zeigt Status aller Camel Routes:
vectorization-triggervectorization-processorvectorization-scheduler
Error Handling
Retry-Strategie
onException(Exception.class)
.maximumRedeliveries(2)
.redeliveryDelay(2000)
.handled(true)
.process(exchange -> {
// Update status to FAILED in database
});
Retries: 2x mit 2 Sekunden Pause
Bei endgültigem Fehler:
- Status →
FAILED - Error-Message in
vectorization_errorSpalte gespeichert - Dokument erscheint nicht mehr im Scheduler (nur PENDING)
Häufige Fehler
| Fehler | Ursache | Lösung |
|---|---|---|
| Connection refused | Embedding Service läuft nicht | Service starten |
| Invalid dimension | Falsches Model | Konfiguration prüfen |
| Timeout | Service überlastet | concurrentConsumers reduzieren |
| No text content | Dokument leer | Wird automatisch als SKIPPED markiert |
Performance
Durchsatz
Concurrent Workers: 4
- Pro Worker: ~2-3 Sekunden pro Dokument
- Gesamt: ~60-90 Dokumente/Minute
Optimierung:
vectorization:
thread-pool-size: 8 # Mehr concurrent workers
Memory
E5-Large Model:
- ~2 GB RAM
- Läuft auf CPU oder GPU
- Einmalig beim Service-Start geladen
Netzwerk
Request Size: ~8 KB (8192 chars max) Response Size: ~4 KB (1024 floats)
Best Practices
✅ DO:
- Embedding Service separat laufen lassen
- Service-Health über
/healthendpoint prüfen - Batch-Size an Server-Kapazität anpassen
- Failed Dokumente regelmäßig prüfen und retry
❌ DON'T:
- Nicht mehr als 8 concurrent workers (überlastet Service)
- Nicht zu große
max-text-length(>10000 chars) - Service nicht ohne Health-Check deployen
Semantic Search
Nach erfolgreicher Vektorisierung sind Dokumente über Semantic Search auffindbar:
curl "http://localhost:8888/api/v1/documents/semantic-search?query=medical+equipment"
Technologie:
- PostgreSQL pgvector Extension
- Cosine Similarity (
1 - (vec1 <=> vec2)) - IVFFlat Index für schnelle Suche
Troubleshooting
Dokumente werden nicht vektorisiert
- ✅ Check Embedding Service:
curl http://localhost:8001/health - ✅ Check Logs:
vectorization-processorRoute - ✅ Check DB:
SELECT * FROM procurement_document WHERE vectorization_status = 'FAILED' - ✅ Check Config:
vectorization.enabled = true
Embedding Service antwortet nicht
# Service Status
curl http://localhost:8001/health
# Test embedding
curl -X POST http://localhost:8001/embed \
-H "Content-Type: application/json" \
-d '{"text": "passage: test"}'
Camel Route läuft nicht
# Actuator Camel Routes
curl http://localhost:8888/api/actuator/camel/routes
Prüfen ob Route vectorization-processor Status Started hat.