You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
386 lines
13 KiB
Markdown
386 lines
13 KiB
Markdown
# Vektorisierung mit Apache Camel
|
|
|
|
## Übersicht
|
|
|
|
Die Vektorisierung erfolgt vollständig asynchron über **Apache Camel Routes** und nutzt einen externen **Python Embedding Service** über REST.
|
|
|
|
## Architektur
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|
│ Vektorisierungs-Pipeline │
|
|
├─────────────────────────────────────────────────────────────────┤
|
|
│ │
|
|
│ ┌──────────────┐ ┌─────────────────┐ ┌───────────────┐ │
|
|
│ │ XML File │───▶│ TedDocumentRoute│───▶│ Document │ │
|
|
│ │ Processing │ │ │ │ Saved to DB │ │
|
|
│ └──────────────┘ └────────┬────────┘ └───────┬───────┘ │
|
|
│ │ │ │
|
|
│ │ wireTap │ │
|
|
│ ▼ │ │
|
|
│ ┌──────────────────────────────────────────────────┼──────┐ │
|
|
│ │ direct:vectorize (Trigger) │ │ │
|
|
│ └──────────────────────────┬───────────────────────┘ │ │
|
|
│ │ │ │
|
|
│ ▼ │ │
|
|
│ ┌──────────────────────────────────────────────────┐ │ │
|
|
│ │ seda:vectorize-async (4 concurrent workers) │ │ │
|
|
│ │ │ │ │
|
|
│ │ 1. Load document from DB │ │ │
|
|
│ │ 2. Extract text_content (includes Lots!) │ │ │
|
|
│ │ 3. Set status = PROCESSING │ │ │
|
|
│ │ 4. Add "passage: " prefix │ │ │
|
|
│ │ 5. Call REST API │ │ │
|
|
│ │ 6. Update content_vector │ │ │
|
|
│ └──────────────┬───────────────────────────────────┘ │ │
|
|
│ │ │ │
|
|
│ ▼ │ │
|
|
│ ┌──────────────────────────────────────────────────┐ │ │
|
|
│ │ Python Embedding Service (Port 8001) │ │ │
|
|
│ │ POST /embed │ │ │
|
|
│ │ Model: intfloat/multilingual-e5-large │ │ │
|
|
│ │ Returns: [1024 floats] │ │ │
|
|
│ └──────────────────────────────────────────────────┘ │ │
|
|
│ │ │
|
|
│ ┌──────────────────────────────────────────────────┐ │ │
|
|
│ │ Timer Route (every 60s) │◀─────┘ │
|
|
│ │ │ │
|
|
│ │ SELECT * FROM procurement_document │ │
|
|
│ │ WHERE vectorization_status = 'PENDING' │ │
|
|
│ │ LIMIT 16 │ │
|
|
│ │ │ │
|
|
│ │ → Trigger vectorization for each │ │
|
|
│ └──────────────────────────────────────────────────┘ │
|
|
│ │
|
|
└─────────────────────────────────────────────────────────────────┘
|
|
```
|
|
|
|
## Apache Camel Routes
|
|
|
|
### 1. Trigger Route (`direct:vectorize`)
|
|
|
|
**Route-ID:** `vectorization-trigger`
|
|
|
|
**Funktion:** Empfängt `documentId` und leitet an async Queue weiter
|
|
|
|
**Integration:** Wird von `TedDocumentRoute` per `wireTap` aufgerufen (non-blocking)
|
|
|
|
```java
|
|
from("direct:vectorize")
|
|
.to("seda:vectorize-async?concurrentConsumers=4&waitForTaskToComplete=Never");
|
|
```
|
|
|
|
### 2. Async Processor Route (`seda:vectorize-async`)
|
|
|
|
**Route-ID:** `vectorization-processor`
|
|
|
|
**Concurrent Workers:** 4 (konfigurierbar)
|
|
|
|
**Ablauf:**
|
|
1. ✅ Load document from DB via `documentId`
|
|
2. ✅ Update status → `PROCESSING`
|
|
3. ✅ Extract `text_content` (enthält Dokument + Lots!)
|
|
4. ✅ Truncate wenn > `max-text-length` (8192 chars)
|
|
5. ✅ Add prefix: `"passage: " + text`
|
|
6. ✅ POST → `http://localhost:8001/embed` mit JSON body
|
|
7. ✅ Parse JSON response → `float[1024]`
|
|
8. ✅ Update `content_vector` in DB
|
|
9. ✅ Update status → `COMPLETED`
|
|
|
|
**Error Handling:**
|
|
- Max 2 Retries mit 2s Delay
|
|
- Bei Fehler: Status → `FAILED` mit Error-Message
|
|
|
|
### 3. Scheduler Route (`timer:vectorization-scheduler`)
|
|
|
|
**Route-ID:** `vectorization-scheduler`
|
|
|
|
**Interval:** 60 Sekunden (nach 5s Delay beim Start)
|
|
|
|
**Funktion:** Verarbeitet noch nicht vektorisierte Dokumente aus der DB
|
|
|
|
**Ablauf:**
|
|
```java
|
|
from("timer:vectorization-scheduler?period=60000&delay=5000")
|
|
.process(exchange -> {
|
|
// Load PENDING documents from DB
|
|
List<ProcurementDocument> pending =
|
|
documentRepository.findByVectorizationStatus(PENDING, PageRequest.of(0, 16));
|
|
})
|
|
.split(body())
|
|
.to("direct:vectorize") // Trigger für jedes Dokument
|
|
.end();
|
|
```
|
|
|
|
## Text-Inhalt für Vektorisierung
|
|
|
|
Der `text_content` wird in `XmlParserService.generateTextContent()` erstellt und enthält:
|
|
|
|
```
|
|
Title: Mission de maitrise d'œuvre pour la création...
|
|
|
|
Description: Désignation d'une équipe de maîtrise d'œuvre...
|
|
|
|
Contracting Authority: Société Publique Locale, Bannalec (FRA)
|
|
|
|
Contract Type: SERVICES
|
|
Procedure: OTHER
|
|
CPV Codes: 71200000
|
|
|
|
Lots (1):
|
|
- LOT-0001: Mission de maîtrise d'œuvre... - Désignation d'une équipe...
|
|
```
|
|
|
|
**Alle Lot-Titel und Beschreibungen werden einbezogen!**
|
|
|
|
## REST API: Python Embedding Service
|
|
|
|
### Endpoint
|
|
|
|
**POST** `http://localhost:8001/embed`
|
|
|
|
### Request
|
|
|
|
```json
|
|
{
|
|
"text": "passage: Title: Mission de maitrise d'œuvre..."
|
|
}
|
|
```
|
|
|
|
### Response
|
|
|
|
```json
|
|
[0.123, -0.456, 0.789, ..., 0.321]
|
|
```
|
|
|
|
**Format:** JSON Array mit 1024 Floats
|
|
|
|
### Model
|
|
|
|
- **Name:** `intfloat/multilingual-e5-large`
|
|
- **Dimensions:** 1024
|
|
- **Languages:** 100+ (Mehrsprachig)
|
|
- **Normalization:** L2-normalized für Cosine Similarity
|
|
|
|
### E5 Model Prefixes
|
|
|
|
| Typ | Prefix | Verwendung |
|
|
|-----|--------|------------|
|
|
| **Dokumente** | `passage: ` | Beim Speichern in DB |
|
|
| **Queries** | `query: ` | Bei Suchanfragen |
|
|
|
|
## Konfiguration
|
|
|
|
**application.yml:**
|
|
|
|
```yaml
|
|
ted:
|
|
vectorization:
|
|
enabled: true # Aktivierung
|
|
use-http-api: true # REST statt Subprocess
|
|
api-url: http://localhost:8001 # Embedding Service URL
|
|
model-name: intfloat/multilingual-e5-large
|
|
dimensions: 1024
|
|
batch-size: 16 # Scheduler batch size
|
|
max-text-length: 8192 # Max chars für Vektorisierung
|
|
```
|
|
|
|
## Python Embedding Service Starten
|
|
|
|
### Option 1: Docker Compose
|
|
|
|
```bash
|
|
docker-compose up -d embedding-service
|
|
```
|
|
|
|
### Option 2: Standalone Python
|
|
|
|
**Datei:** `embedding_service.py`
|
|
|
|
```python
|
|
from flask import Flask, request, jsonify
|
|
from sentence_transformers import SentenceTransformer
|
|
|
|
app = Flask(__name__)
|
|
model = SentenceTransformer('intfloat/multilingual-e5-large')
|
|
|
|
@app.route('/embed', methods=['POST'])
|
|
def embed():
|
|
data = request.json
|
|
text = data['text']
|
|
|
|
# Generate embedding
|
|
embedding = model.encode(text, normalize_embeddings=True)
|
|
|
|
return jsonify(embedding.tolist())
|
|
|
|
@app.route('/health', methods=['GET'])
|
|
def health():
|
|
return jsonify({"status": "ok"})
|
|
|
|
if __name__ == '__main__':
|
|
app.run(host='0.0.0.0', port=8001)
|
|
```
|
|
|
|
**Start:**
|
|
```bash
|
|
pip install flask sentence-transformers
|
|
python embedding_service.py
|
|
```
|
|
|
|
## Monitoring
|
|
|
|
### Vektorisierungs-Status prüfen
|
|
|
|
```sql
|
|
SELECT
|
|
vectorization_status,
|
|
COUNT(*) as count
|
|
FROM ted.procurement_document
|
|
GROUP BY vectorization_status;
|
|
```
|
|
|
|
**Mögliche Status:**
|
|
- `PENDING` - Wartet auf Vektorisierung
|
|
- `PROCESSING` - Wird gerade vektorisiert
|
|
- `COMPLETED` - Erfolgreich vektorisiert
|
|
- `FAILED` - Fehler bei Vektorisierung
|
|
- `SKIPPED` - Kein Text-Inhalt vorhanden
|
|
|
|
### Admin REST API
|
|
|
|
**GET** `/api/v1/admin/vectorization/status`
|
|
```json
|
|
{
|
|
"enabled": true,
|
|
"pending": 42,
|
|
"completed": 1523,
|
|
"failed": 3
|
|
}
|
|
```
|
|
|
|
**POST** `/api/v1/admin/vectorization/process-pending?batchSize=100`
|
|
|
|
Trigger manuelle Verarbeitung von PENDING Dokumenten
|
|
|
|
### Camel Routes Status
|
|
|
|
**Actuator Endpoint:** `http://localhost:8888/api/actuator/camel`
|
|
|
|
Zeigt Status aller Camel Routes:
|
|
- `vectorization-trigger`
|
|
- `vectorization-processor`
|
|
- `vectorization-scheduler`
|
|
|
|
## Error Handling
|
|
|
|
### Retry-Strategie
|
|
|
|
```java
|
|
onException(Exception.class)
|
|
.maximumRedeliveries(2)
|
|
.redeliveryDelay(2000)
|
|
.handled(true)
|
|
.process(exchange -> {
|
|
// Update status to FAILED in database
|
|
});
|
|
```
|
|
|
|
**Retries:** 2x mit 2 Sekunden Pause
|
|
|
|
**Bei endgültigem Fehler:**
|
|
- Status → `FAILED`
|
|
- Error-Message in `vectorization_error` Spalte gespeichert
|
|
- Dokument erscheint nicht mehr im Scheduler (nur PENDING)
|
|
|
|
### Häufige Fehler
|
|
|
|
| Fehler | Ursache | Lösung |
|
|
|--------|---------|--------|
|
|
| Connection refused | Embedding Service läuft nicht | Service starten |
|
|
| Invalid dimension | Falsches Model | Konfiguration prüfen |
|
|
| Timeout | Service überlastet | `concurrentConsumers` reduzieren |
|
|
| No text content | Dokument leer | Wird automatisch als SKIPPED markiert |
|
|
|
|
## Performance
|
|
|
|
### Durchsatz
|
|
|
|
**Concurrent Workers:** 4
|
|
- **Pro Worker:** ~2-3 Sekunden pro Dokument
|
|
- **Gesamt:** ~60-90 Dokumente/Minute
|
|
|
|
**Optimierung:**
|
|
```yaml
|
|
vectorization:
|
|
thread-pool-size: 8 # Mehr concurrent workers
|
|
```
|
|
|
|
### Memory
|
|
|
|
**E5-Large Model:**
|
|
- ~2 GB RAM
|
|
- Läuft auf CPU oder GPU
|
|
- Einmalig beim Service-Start geladen
|
|
|
|
### Netzwerk
|
|
|
|
**Request Size:** ~8 KB (8192 chars max)
|
|
**Response Size:** ~4 KB (1024 floats)
|
|
|
|
## Best Practices
|
|
|
|
✅ **DO:**
|
|
- Embedding Service separat laufen lassen
|
|
- Service-Health über `/health` endpoint prüfen
|
|
- Batch-Size an Server-Kapazität anpassen
|
|
- Failed Dokumente regelmäßig prüfen und retry
|
|
|
|
❌ **DON'T:**
|
|
- Nicht mehr als 8 concurrent workers (überlastet Service)
|
|
- Nicht zu große `max-text-length` (>10000 chars)
|
|
- Service nicht ohne Health-Check deployen
|
|
|
|
## Semantic Search
|
|
|
|
Nach erfolgreicher Vektorisierung sind Dokumente über Semantic Search auffindbar:
|
|
|
|
```bash
|
|
curl "http://localhost:8888/api/v1/documents/semantic-search?query=medical+equipment"
|
|
```
|
|
|
|
**Technologie:**
|
|
- PostgreSQL pgvector Extension
|
|
- Cosine Similarity (`1 - (vec1 <=> vec2)`)
|
|
- IVFFlat Index für schnelle Suche
|
|
|
|
## Troubleshooting
|
|
|
|
### Dokumente werden nicht vektorisiert
|
|
|
|
1. ✅ Check Embedding Service: `curl http://localhost:8001/health`
|
|
2. ✅ Check Logs: `vectorization-processor` Route
|
|
3. ✅ Check DB: `SELECT * FROM procurement_document WHERE vectorization_status = 'FAILED'`
|
|
4. ✅ Check Config: `vectorization.enabled = true`
|
|
|
|
### Embedding Service antwortet nicht
|
|
|
|
```bash
|
|
# Service Status
|
|
curl http://localhost:8001/health
|
|
|
|
# Test embedding
|
|
curl -X POST http://localhost:8001/embed \
|
|
-H "Content-Type: application/json" \
|
|
-d '{"text": "passage: test"}'
|
|
```
|
|
|
|
### Camel Route läuft nicht
|
|
|
|
```bash
|
|
# Actuator Camel Routes
|
|
curl http://localhost:8888/api/actuator/camel/routes
|
|
```
|
|
|
|
Prüfen ob Route `vectorization-processor` Status `Started` hat.
|