TED Automated Download & Processing Pipeline

Overview

The complete automated pipeline for TED (Tenders Electronic Daily) procurement notices:

┌────────────────────────────────────────────────────────────────────────┐
│                   TED Automated Pipeline                               │
├────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────┐                                                   │
│  │  Timer (1h)     │  Check hourly for new packages                   │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  HTTP Download  │  https://ted.europa.eu/packages/daily/           │
│  │  Package        │  Format: YYYY-MM-DD_XXXX.tar.gz                  │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  Extract        │  tar.gz → thousands of XML files                 │
│  │  tar.gz         │  Extract to: D:/ted.europe/extracted             │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  XML Splitter   │  Parallel Processing (Streaming)                 │
│  │  (Parallel)     │  Each XML → direct:process-document              │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  XML Parser     │  XPath Parsing + Metadata Extraction             │
│  │  & Validator    │  Schema Validation (eForms SDK 1.13)             │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  SHA-256 Hash   │  Idempotent Processing                           │
│  │  Check          │  Skip if already imported                        │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  Save to DB     │  PostgreSQL (ted.procurement_document)           │
│  │  (PostgreSQL)   │  + Native XML + Metadata                         │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  wireTap        │  Non-blocking Trigger                            │
│  │  Vectorization  │  direct:vectorize (async)                        │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  SEDA Queue     │  4 Concurrent Workers                            │
│  │  (Async)        │  vectorize-async queue                           │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  Extract Text   │  Title + Description + Lots                      │
│  │  Content        │  Buyer Info + CPV Codes                          │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  POST to        │  http://localhost:8001/embed                     │
│  │  Embedding API  │  {"text": "...", "is_query": false}              │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  Python Service │  intfloat/multilingual-e5-large                  │
│  │  (FastAPI)      │  Returns: 1024-dimensional vector                │
│  └────────┬────────┘                                                   │
│           │                                                            │
│           ▼                                                            │
│  ┌─────────────────┐                                                   │
│  │  Save Vector    │  content_vector column (pgvector)                │
│  │  to Database    │  Status: COMPLETED                               │
│  └─────────────────┘                                                   │
│                                                                         │
└────────────────────────────────────────────────────────────────────────┘

Configuration

application.yml:

ted:
  # Input directory (points to extract directory)
  input:
    directory: D:/ted.europe/extracted
    pattern: "**/*.xml"            # Recursive scanning
    poll-interval: 5000            # Check every 5 seconds
    max-messages-per-poll: 100     # Process up to 100 XMLs per poll

  # Automatic download from ted.europa.eu
  download:
    enabled: true                  # ✅ ENABLED
    base-url: https://ted.europa.eu/packages/daily/
    download-directory: D:/ted.europe/downloads
    extract-directory: D:/ted.europe/extracted
    start-year: 2024               # Start downloading from 2024
    poll-interval: 3600000         # Check every 1 hour
    max-consecutive-404: 4         # Stop after 4 consecutive 404s
    delete-after-extraction: true  # Clean up tar.gz files

  # Vectorization (automatic after save)
  vectorization:
    enabled: true                  # ✅ ENABLED
    api-url: http://localhost:8001
    model-name: intfloat/multilingual-e5-large
    dimensions: 1024
    batch-size: 16
    max-text-length: 8192

Camel Routes

1. TedPackageDownloadCamelRoute (Download & Extract)

Route ID: ted-package-scheduler

Trigger: Timer, once per hour

Flow:

  1. Determines the next package (year + serial number)
  2. Checks whether it already exists (Idempotent Consumer)
  3. HTTP GET from https://ted.europa.eu/packages/daily/YYYY-MM-DD_XXXX.tar.gz
  4. Saves it to download-directory
  5. Extracts it to extract-directory
  6. Deletes the tar.gz (optional)
  7. Splits the XML files → direct:process-document

Enterprise Integration Patterns:

  • Timer Pattern
  • Idempotent Consumer
  • Content-Based Router
  • Splitter Pattern (Parallel + Streaming)
  • Dead Letter Channel
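
The extract and clean-up steps above can be sketched in Python. This is only an illustration under assumptions, not the Camel route itself: the function name `extract_package` and the one-sub-directory-per-package layout are hypothetical, and the `delete_after` flag merely mirrors `ted.download.delete-after-extraction`.

```python
import tarfile
from pathlib import Path


def extract_package(archive: Path, extract_dir: Path, delete_after: bool = True) -> list[Path]:
    """Extract a tar.gz package and return the XML files it contained."""
    # Assumption: each package gets its own sub-directory named after the archive.
    target = (extract_dir / archive.name.removesuffix(".tar.gz")).resolve()
    target.mkdir(parents=True, exist_ok=True)

    with tarfile.open(archive, "r:gz") as tar:
        # Refuse members whose path would escape the target directory.
        for member in tar.getmembers():
            if not (target / member.name).resolve().is_relative_to(target):
                raise ValueError(f"unsafe path in archive: {member.name}")
        tar.extractall(target)

    if delete_after:  # mirrors ted.download.delete-after-extraction
        archive.unlink()

    # Recursive scan, comparable to the "**/*.xml" input pattern.
    return sorted(target.rglob("*.xml"))
```

The returned list is what a splitter would then hand over, one file at a time, to the document-processing step.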

2. TedDocumentRoute (XML Processing)

Route ID: ted-document-processor

Trigger:

  • File watcher on D:/ted.europe/extracted
  • Direct call from the download route

Flow:

  1. Reads the XML file
  2. Parses it with XPath (eForms UBL schema)
  3. Extracts metadata
  4. Computes the SHA-256 hash
  5. Checks the DB for a duplicate
  6. Saves to ted.procurement_document
  7. wireTap → direct:vectorize (non-blocking!)
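
The hash-based duplicate check in steps 4 to 6 can be illustrated with a minimal Python sketch. The in-memory `store` dictionary is a stand-in for the lookup on `document_hash` in `ted.procurement_document`, and both function names are hypothetical.

```python
import hashlib


def document_hash(xml_bytes: bytes) -> str:
    """Return the SHA-256 digest as 64 hex characters (fits VARCHAR(64))."""
    return hashlib.sha256(xml_bytes).hexdigest()


def import_document(xml_bytes: bytes, store: dict[str, bytes]) -> bool:
    """Store the document unless its hash is already known.

    Returns True if the document was imported, False if it was skipped.
    """
    digest = document_hash(xml_bytes)
    if digest in store:  # stand-in for the duplicate check in the DB
        return False
    store[digest] = xml_bytes
    return True
```

Because the hash in this sketch is computed over the raw bytes, two files that differ only in whitespace or encoding would count as different documents.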

3. VectorizationRoute (Async Embedding)

Route ID: vectorization-processor

Trigger:

  • wireTap from TedDocumentRoute
  • Timer scheduler (every 60s for PENDING)

Flow:

  1. Load document from DB
  2. Extract text_content (Document + Lots)
  3. POST to Python Embedding Service
  4. Parse 1024-dimensional vector
  5. Save to content_vector column
  6. Update status → COMPLETED
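
Steps 3 and 4 can be sketched as a small Python client. The request body follows the format shown in the diagram; the response field name `embedding` and the character-based truncation to `max-text-length` are assumptions, since the document does not state either.

```python
import json
from urllib import request


def build_payload(text: str, max_text_length: int = 8192) -> bytes:
    """Build the JSON body for POST /embed (documents use is_query=false)."""
    # Assumption: max-text-length is counted in characters.
    return json.dumps({"text": text[:max_text_length], "is_query": False}).encode("utf-8")


def parse_embedding(body: bytes, dimensions: int = 1024) -> list[float]:
    """Parse the response and reject vectors of the wrong dimension."""
    vector = json.loads(body)["embedding"]  # assumed response field name
    if len(vector) != dimensions:
        raise ValueError(f"expected {dimensions} dimensions, got {len(vector)}")
    return [float(x) for x in vector]


def embed(text: str, api_url: str = "http://localhost:8001") -> list[float]:
    """Send one document text to the embedding service and return its vector."""
    req = request.Request(
        f"{api_url}/embed",
        data=build_payload(text),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with request.urlopen(req, timeout=30) as resp:
        return parse_embedding(resp.read())
```

Keeping the dimension check in `parse_embedding` means a misconfigured model surfaces as an explicit error rather than as a failed insert into the `vector(1024)` column.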

Queue: SEDA with 4 concurrent workers
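
As a rough analogy to the SEDA queue with four consumers, the following Python sketch uses a thread-safe queue and four worker threads. It is not how Camel implements SEDA; `run_workers` and the `handler` callback are hypothetical names used only for illustration.

```python
import queue
import threading
from typing import Callable, Iterable


def run_workers(doc_ids: Iterable[str], handler: Callable[[str], None],
                worker_count: int = 4) -> dict[str, str]:
    """Process document IDs concurrently and return a status per document."""
    tasks: queue.Queue = queue.Queue()
    statuses: dict[str, str] = {}
    lock = threading.Lock()

    def worker() -> None:
        while True:
            doc_id = tasks.get()
            if doc_id is None:  # sentinel: no more work for this worker
                break
            try:
                handler(doc_id)
                status = "COMPLETED"
            except Exception:
                status = "FAILED"
            with lock:
                statuses[doc_id] = status

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for doc_id in doc_ids:  # the producer only enqueues and is never blocked
        tasks.put(doc_id)
    for _ in threads:
        tasks.put(None)
    for t in threads:
        t.join()
    return statuses
```

The producer only puts IDs on the queue, which is the same decoupling the wireTap provides: saving a document does not wait for its embedding.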

Directory Structure

D:/ted.europe/
├── downloads/              # Temporary tar.gz downloads
│   ├── 2025-11-30_0001.tar.gz
│   └── 2025-11-30_0002.tar.gz
│
└── extracted/              # Extracted XML files
    ├── 2025-11-30/
    │   ├── 001/
    │   │   ├── 00123456_2025.xml
    │   │   └── 00123457_2025.xml
    │   └── 002/
    │       └── ...
    ├── .processed/         # Successfully processed XMLs
    └── .error/             # Failed XMLs

Database Tracking

ted_daily_package (download tracking)

| Column | Type | Description |
|---|---|---|
| id | UUID | Primary key |
| year | INT | Package year (2024, 2025) |
| serial_number | INT | Package number (1, 2, 3...) |
| package_id | VARCHAR | Format: 2025-11-30_0001 |
| download_url | VARCHAR | Full URL |
| download_status | VARCHAR | PENDING, DOWNLOADING, COMPLETED, NOT_FOUND, FAILED |
| downloaded_at | TIMESTAMP | Download timestamp |
| file_size_bytes | BIGINT | Size of the tar.gz |
| xml_file_count | INT | Number of extracted XMLs |
| processed_count | INT | Number of processed XMLs |
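
The `package_id` and `download_url` columns follow the formats shown above, which can be reproduced with a short Python sketch. The function names are hypothetical, and the date is taken as an input because the document does not state how the pipeline derives it from `year` and `serial_number`.

```python
from datetime import date

BASE_URL = "https://ted.europa.eu/packages/daily/"


def package_id(day: date, serial: int) -> str:
    """Format a package ID as YYYY-MM-DD_XXXX (four-digit, zero-padded serial)."""
    return f"{day.isoformat()}_{serial:04d}"


def download_url(day: date, serial: int, base_url: str = BASE_URL) -> str:
    """Build the full download URL for one package."""
    return f"{base_url}{package_id(day, serial)}.tar.gz"
```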

procurement_document (XML data)

| Column | Type | Description |
|---|---|---|
| id | UUID | Primary key |
| document_hash | VARCHAR(64) | SHA-256 for idempotency |
| publication_id | VARCHAR(50) | TED ID (00123456-2025) |
| notice_url | VARCHAR(255) | Auto-generated TED URL |
| xml_document | XML | Native PostgreSQL XML |
| text_content | TEXT | For vectorization |
| content_vector | vector(1024) | pgvector embedding |
| vectorization_status | VARCHAR | PENDING, PROCESSING, COMPLETED, FAILED |

Monitoring

Camel Routes Status

curl http://localhost:8888/api/actuator/camel/routes

Key routes:

  • ted-package-scheduler - Download Timer
  • ted-document-processor - XML Processing
  • vectorization-processor - Embedding Generation
  • vectorization-scheduler - PENDING Documents

Download Status

SELECT
    year,
    COUNT(*) FILTER (WHERE download_status = 'COMPLETED') as completed,
    COUNT(*) FILTER (WHERE download_status = 'NOT_FOUND') as not_found,
    COUNT(*) FILTER (WHERE download_status = 'FAILED') as failed,
    SUM(xml_file_count) as total_xmls,
    SUM(processed_count) as processed_xmls
FROM ted.ted_daily_package
GROUP BY year
ORDER BY year DESC;

Vectorization Status

SELECT
    COUNT(*) FILTER (WHERE vectorization_status = 'COMPLETED') as completed,
    COUNT(*) FILTER (WHERE vectorization_status = 'PENDING') as pending,
    COUNT(*) FILTER (WHERE vectorization_status = 'FAILED') as failed,
    COUNT(*) FILTER (WHERE content_vector IS NOT NULL) as has_vector
FROM ted.procurement_document;

Documents Processed Today

SELECT
    COUNT(*) as today_count,
    MIN(created_at) as first,
    MAX(created_at) as last
FROM ted.procurement_document
WHERE created_at::date = CURRENT_DATE;

Python Embedding Service

Start:

python embedding_service.py

Health Check:

curl http://localhost:8001/health

Expected Response:

{
  "status": "healthy",
  "model_name": "intfloat/multilingual-e5-large",
  "dimensions": 1024,
  "max_length": 512
}
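
A start-up script could compare this response with the values configured under `ted.vectorization`. The following Python sketch shows one way to do that; `check_health` is a hypothetical helper, and the field names are taken from the expected response above.

```python
import json


def check_health(body: str,
                 model_name: str = "intfloat/multilingual-e5-large",
                 dimensions: int = 1024) -> list[str]:
    """Return a list of mismatches between the health response and the config."""
    data = json.loads(body)
    problems = []
    if data.get("status") != "healthy":
        problems.append(f"status is {data.get('status')!r}")
    if data.get("model_name") != model_name:
        problems.append(f"model is {data.get('model_name')!r}")
    if data.get("dimensions") != dimensions:
        problems.append(f"dimensions are {data.get('dimensions')!r}")
    return problems
```

An empty list means the service reports the model and vector size that the pipeline expects.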

Starting the Pipeline

  1. Start the Python embedding service:

    python embedding_service.py
    
  2. Start the Spring Boot application:

    mvn spring-boot:run
    
  3. Watch the logs:

    INFO: Checking for new TED packages...
    INFO: Next package to download: 2025-11-30_0001
    INFO: Downloading from https://ted.europa.eu/packages/daily/...
    INFO: Extracting package 2025-11-30_0001...
    INFO: Processing 1247 XML files from package 2025-11-30_0001
    INFO: Document processed successfully: 00123456_2025.xml
    DEBUG: Queueing document for vectorization: xxx
    INFO: Successfully vectorized document: xxx
    

Throughput

Estimated performance:

| Phase | Speed | Note |
|---|---|---|
| Download | 1 package/hour | Timer-based |
| Extract | ~10 seconds | tar.gz → XMLs |
| XML Processing | ~100-200 XMLs/min | Depends on CPU |
| Vectorization | ~60-90 docs/min | 4 workers, Python service |

Per day:

  • ~24 packages downloaded
  • ~30,000-50,000 documents processed (depending on package size)
  • ~30,000-50,000 vectors generated
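
As a quick plausibility check, the estimated vectorization rate can be set against the daily volume. The short Python calculation below uses only the figures quoted above and assumes the rate is sustained over the whole run.

```python
def hours_needed(documents: int, docs_per_minute: float) -> float:
    """Hours required to process a number of documents at a steady rate."""
    return documents / docs_per_minute / 60


best = hours_needed(30_000, 90)   # smallest volume at the fastest rate
worst = hours_needed(50_000, 60)  # largest volume at the slowest rate
print(f"Vectorization needs {best:.1f} h to {worst:.1f} h per day")
# prints "Vectorization needs 5.6 h to 13.9 h per day"
```

Under these estimates the vectorization stage keeps up with the daily intake, with the slowest case using a bit more than half of the day.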

Error Handling

Download Errors

404 Not Found: the package does not exist (yet)

  • At most 4 consecutive 404s → switch to the previous year
  • Automatic retry after 1 hour

Network Error: temporary connection problems

  • 3 retries with a 10s delay
  • Dead Letter Channel
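
The retry policy for network errors can be sketched in Python as follows. The sketch assumes that "3 retries" means one initial attempt plus up to three further attempts, and that the caller forwards the final exception to its dead-letter handling; `with_retries` is a hypothetical helper, not part of Camel.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(action: Callable[[], T], retries: int = 3,
                 delay_seconds: float = 10.0,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Run an action, retrying on network-level errors with a fixed delay."""
    attempt = 0
    while True:
        try:
            return action()
        except OSError:  # urllib.error.URLError and ConnectionError derive from OSError
            if attempt >= retries:
                raise  # retries exhausted: the caller handles the dead letter
            attempt += 1
            sleep(delay_seconds)
```

The `sleep` parameter is injectable so that the policy can be exercised without actually waiting.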

Processing Errors

Duplicates: SHA-256 hash already present

  • Skipped (idempotent processing)
  • Log: "Duplicate document skipped"

XML Parsing Error: invalid XML

  • 3 Retries
  • Move to .error directory
  • Status: FAILED in DB

Vectorization Errors

Embedding service unreachable:

  • 2 retries with a 2s delay
  • Status: FAILED
  • Scheduler retries after 60s

Invalid Embedding Dimension:

  • Status: FAILED with an error message
  • Manual intervention required

Troubleshooting

Pipeline Not Running

# Check the Camel routes
curl http://localhost:8888/api/actuator/camel/routes | jq '.routes[] | {id: .id, status: .status}'

# Check the download route
tail -f logs/ted-procurement-processor.log | grep "ted-package"

# Check the vectorization route
tail -f logs/ted-procurement-processor.log | grep "vectoriz"

No Downloads

  1. Check that ted.download.enabled = true
  2. Check the internet connection
  3. Check that ted.europa.eu is reachable
  4. Check the logs for 404/403 errors

No Vectorization

  1. Check the embedding service: curl http://localhost:8001/health
  2. Check that ted.vectorization.enabled = true
  3. Check for PENDING documents in the DB
  4. Check the logs for HTTP 400/500 errors

After successful vectorization, documents are searchable:

# Semantic Search
curl "http://localhost:8888/api/v1/documents/semantic-search?query=medical+equipment"

# Combined Search (Semantic + Filters)
curl -X POST "http://localhost:8888/api/v1/documents/search" \
  -H "Content-Type: application/json" \
  -d '{
    "countryCodes": ["DEU", "AUT"],
    "semanticQuery": "software development",
    "similarityThreshold": 0.7
  }'
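
To illustrate what a `similarityThreshold` of 0.7 could mean, the following Python sketch filters documents by cosine similarity. That the search endpoint uses cosine similarity is an assumption (the document does not state the metric), and both function names are hypothetical.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def filter_by_similarity(query_vec: list[float],
                         documents: dict[str, list[float]],
                         threshold: float = 0.7) -> list[tuple[str, float]]:
    """Return (doc_id, score) pairs at or above the threshold, best match first."""
    scored = [(doc_id, cosine_similarity(query_vec, vec))
              for doc_id, vec in documents.items()]
    hits = [item for item in scored if item[1] >= threshold]
    return sorted(hits, key=lambda item: item[1], reverse=True)
```

In the database the same comparison would typically be pushed down to pgvector rather than computed in application code.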

Performance Tuning

Speeding Up Vectorization

ted:
  vectorization:
    thread-pool-size: 8  # More workers (default: 4)

Caution: more workers mean more load on the Python service!

Speeding Up XML Processing

ted:
  input:
    max-messages-per-poll: 200  # More files per poll (default: 100)

Parallelizing Downloads

ted:
  download:
    max-concurrent-downloads: 4  # More parallel downloads (default: 2)

Caution: respect ted.europa.eu rate limiting!

Summary

  • Fully automated pipeline from download to semantic search
  • Idempotent processing: no duplicates
  • Asynchronous vectorization: non-blocking
  • Enterprise Integration Patterns: production-ready
  • Error handling: retries and Dead Letter Channel
  • Monitoring: Actuator + SQL queries
  • Scalable: concurrent workers and parallel processing

The pipeline runs fully automatically 24/7 and processes all new TED procurement notices! 🚀