# Vektorisierung mit Apache Camel ## Übersicht Die Vektorisierung erfolgt vollständig asynchron über **Apache Camel Routes** und nutzt einen externen **Python Embedding Service** über REST. ## Architektur ``` ┌─────────────────────────────────────────────────────────────────┐ │ Vektorisierungs-Pipeline │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌─────────────────┐ ┌───────────────┐ │ │ │ XML File │───▶│ TedDocumentRoute│───▶│ Document │ │ │ │ Processing │ │ │ │ Saved to DB │ │ │ └──────────────┘ └────────┬────────┘ └───────┬───────┘ │ │ │ │ │ │ │ wireTap │ │ │ ▼ │ │ │ ┌──────────────────────────────────────────────────┼──────┐ │ │ │ direct:vectorize (Trigger) │ │ │ │ └──────────────────────────┬───────────────────────┘ │ │ │ │ │ │ │ ▼ │ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ │ seda:vectorize-async (4 concurrent workers) │ │ │ │ │ │ │ │ │ │ 1. Load document from DB │ │ │ │ │ 2. Extract text_content (includes Lots!) │ │ │ │ │ 3. Set status = PROCESSING │ │ │ │ │ 4. Add "passage: " prefix │ │ │ │ │ 5. Call REST API │ │ │ │ │ 6. Update content_vector │ │ │ │ └──────────────┬───────────────────────────────────┘ │ │ │ │ │ │ │ ▼ │ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ │ Python Embedding Service (Port 8001) │ │ │ │ │ POST /embed │ │ │ │ │ Model: intfloat/multilingual-e5-large │ │ │ │ │ Returns: [1024 floats] │ │ │ │ └──────────────────────────────────────────────────┘ │ │ │ │ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ │ Timer Route (every 60s) │◀─────┘ │ │ │ │ │ │ │ SELECT * FROM procurement_document │ │ │ │ WHERE vectorization_status = 'PENDING' │ │ │ │ LIMIT 16 │ │ │ │ │ │ │ │ → Trigger vectorization for each │ │ │ └──────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ## Apache Camel Routes ### 1. Trigger Route (`direct:vectorize`) **Route-ID:** `vectorization-trigger` **Funktion:** Empfängt `documentId` und leitet an async Queue weiter **Integration:** Wird von `TedDocumentRoute` per `wireTap` aufgerufen (non-blocking) ```java from("direct:vectorize") .to("seda:vectorize-async?concurrentConsumers=4&waitForTaskToComplete=Never"); ``` ### 2. Async Processor Route (`seda:vectorize-async`) **Route-ID:** `vectorization-processor` **Concurrent Workers:** 4 (konfigurierbar) **Ablauf:** 1. ✅ Load document from DB via `documentId` 2. ✅ Update status → `PROCESSING` 3. ✅ Extract `text_content` (enthält Dokument + Lots!) 4. ✅ Truncate wenn > `max-text-length` (8192 chars) 5. ✅ Add prefix: `"passage: " + text` 6. ✅ POST → `http://localhost:8001/embed` mit JSON body 7. ✅ Parse JSON response → `float[1024]` 8. ✅ Update `content_vector` in DB 9. ✅ Update status → `COMPLETED` **Error Handling:** - Max 2 Retries mit 2s Delay - Bei Fehler: Status → `FAILED` mit Error-Message ### 3. Scheduler Route (`timer:vectorization-scheduler`) **Route-ID:** `vectorization-scheduler` **Interval:** 60 Sekunden (nach 5s Delay beim Start) **Funktion:** Verarbeitet noch nicht vektorisierte Dokumente aus der DB **Ablauf:** ```java from("timer:vectorization-scheduler?period=60000&delay=5000") .process(exchange -> { // Load PENDING documents from DB List pending = documentRepository.findByVectorizationStatus(PENDING, PageRequest.of(0, 16)); }) .split(body()) .to("direct:vectorize") // Trigger für jedes Dokument .end(); ``` ## Text-Inhalt für Vektorisierung Der `text_content` wird in `XmlParserService.generateTextContent()` erstellt und enthält: ``` Title: Mission de maitrise d'œuvre pour la création... Description: Désignation d'une équipe de maîtrise d'œuvre... Contracting Authority: Société Publique Locale, Bannalec (FRA) Contract Type: SERVICES Procedure: OTHER CPV Codes: 71200000 Lots (1): - LOT-0001: Mission de maîtrise d'œuvre... - Désignation d'une équipe... ``` **Alle Lot-Titel und Beschreibungen werden einbezogen!** ## REST API: Python Embedding Service ### Endpoint **POST** `http://localhost:8001/embed` ### Request ```json { "text": "passage: Title: Mission de maitrise d'œuvre..." } ``` ### Response ```json [0.123, -0.456, 0.789, ..., 0.321] ``` **Format:** JSON Array mit 1024 Floats ### Model - **Name:** `intfloat/multilingual-e5-large` - **Dimensions:** 1024 - **Languages:** 100+ (Mehrsprachig) - **Normalization:** L2-normalized für Cosine Similarity ### E5 Model Prefixes | Typ | Prefix | Verwendung | |-----|--------|------------| | **Dokumente** | `passage: ` | Beim Speichern in DB | | **Queries** | `query: ` | Bei Suchanfragen | ## Konfiguration **application.yml:** ```yaml ted: vectorization: enabled: true # Aktivierung use-http-api: true # REST statt Subprocess api-url: http://localhost:8001 # Embedding Service URL model-name: intfloat/multilingual-e5-large dimensions: 1024 batch-size: 16 # Scheduler batch size max-text-length: 8192 # Max chars für Vektorisierung ``` ## Python Embedding Service Starten ### Option 1: Docker Compose ```bash docker-compose up -d embedding-service ``` ### Option 2: Standalone Python **Datei:** `embedding_service.py` ```python from flask import Flask, request, jsonify from sentence_transformers import SentenceTransformer app = Flask(__name__) model = SentenceTransformer('intfloat/multilingual-e5-large') @app.route('/embed', methods=['POST']) def embed(): data = request.json text = data['text'] # Generate embedding embedding = model.encode(text, normalize_embeddings=True) return jsonify(embedding.tolist()) @app.route('/health', methods=['GET']) def health(): return jsonify({"status": "ok"}) if __name__ == '__main__': app.run(host='0.0.0.0', port=8001) ``` **Start:** ```bash pip install flask sentence-transformers python embedding_service.py ``` ## Monitoring ### Vektorisierungs-Status prüfen ```sql SELECT vectorization_status, COUNT(*) as count FROM ted.procurement_document GROUP BY vectorization_status; ``` **Mögliche Status:** - `PENDING` - Wartet auf Vektorisierung - `PROCESSING` - Wird gerade vektorisiert - `COMPLETED` - Erfolgreich vektorisiert - `FAILED` - Fehler bei Vektorisierung - `SKIPPED` - Kein Text-Inhalt vorhanden ### Admin REST API **GET** `/api/v1/admin/vectorization/status` ```json { "enabled": true, "pending": 42, "completed": 1523, "failed": 3 } ``` **POST** `/api/v1/admin/vectorization/process-pending?batchSize=100` Trigger manuelle Verarbeitung von PENDING Dokumenten ### Camel Routes Status **Actuator Endpoint:** `http://localhost:8888/api/actuator/camel` Zeigt Status aller Camel Routes: - `vectorization-trigger` - `vectorization-processor` - `vectorization-scheduler` ## Error Handling ### Retry-Strategie ```java onException(Exception.class) .maximumRedeliveries(2) .redeliveryDelay(2000) .handled(true) .process(exchange -> { // Update status to FAILED in database }); ``` **Retries:** 2x mit 2 Sekunden Pause **Bei endgültigem Fehler:** - Status → `FAILED` - Error-Message in `vectorization_error` Spalte gespeichert - Dokument erscheint nicht mehr im Scheduler (nur PENDING) ### Häufige Fehler | Fehler | Ursache | Lösung | |--------|---------|--------| | Connection refused | Embedding Service läuft nicht | Service starten | | Invalid dimension | Falsches Model | Konfiguration prüfen | | Timeout | Service überlastet | `concurrentConsumers` reduzieren | | No text content | Dokument leer | Wird automatisch als SKIPPED markiert | ## Performance ### Durchsatz **Concurrent Workers:** 4 - **Pro Worker:** ~2-3 Sekunden pro Dokument - **Gesamt:** ~60-90 Dokumente/Minute **Optimierung:** ```yaml vectorization: thread-pool-size: 8 # Mehr concurrent workers ``` ### Memory **E5-Large Model:** - ~2 GB RAM - Läuft auf CPU oder GPU - Einmalig beim Service-Start geladen ### Netzwerk **Request Size:** ~8 KB (8192 chars max) **Response Size:** ~4 KB (1024 floats) ## Best Practices ✅ **DO:** - Embedding Service separat laufen lassen - Service-Health über `/health` endpoint prüfen - Batch-Size an Server-Kapazität anpassen - Failed Dokumente regelmäßig prüfen und retry ❌ **DON'T:** - Nicht mehr als 8 concurrent workers (überlastet Service) - Nicht zu große `max-text-length` (>10000 chars) - Service nicht ohne Health-Check deployen ## Semantic Search Nach erfolgreicher Vektorisierung sind Dokumente über Semantic Search auffindbar: ```bash curl "http://localhost:8888/api/v1/documents/semantic-search?query=medical+equipment" ``` **Technologie:** - PostgreSQL pgvector Extension - Cosine Similarity (`1 - (vec1 <=> vec2)`) - IVFFlat Index für schnelle Suche ## Troubleshooting ### Dokumente werden nicht vektorisiert 1. ✅ Check Embedding Service: `curl http://localhost:8001/health` 2. ✅ Check Logs: `vectorization-processor` Route 3. ✅ Check DB: `SELECT * FROM procurement_document WHERE vectorization_status = 'FAILED'` 4. ✅ Check Config: `vectorization.enabled = true` ### Embedding Service antwortet nicht ```bash # Service Status curl http://localhost:8001/health # Test embedding curl -X POST http://localhost:8001/embed \ -H "Content-Type: application/json" \ -d '{"text": "passage: test"}' ``` ### Camel Route läuft nicht ```bash # Actuator Camel Routes curl http://localhost:8888/api/actuator/camel/routes ``` Prüfen ob Route `vectorization-processor` Status `Started` hat.