You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

364 lines
17 KiB
Java

package at.procon.ted.camel;
import at.procon.ted.config.TedProcessorProperties;
import at.procon.ted.model.entity.ProcurementDocument;
import at.procon.ted.model.entity.VectorizationStatus;
import at.procon.ted.repository.ProcurementDocumentRepository;
import at.procon.ted.service.VectorizationProcessorService;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
import at.procon.dip.runtime.config.RuntimeMode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.UUID;
/**
* Apache Camel route for asynchronous document vectorization.
*
* Features:
* - Async vectorization triggered after document processing
* - Scheduled processing of pending vectorizations from database
* - Direct REST calls to Python embedding service
* - Error handling with retry mechanism
*
* @author Martin.Schweitzer@procon.co.at and claude.ai
*/
@Component
@ConditionalOnRuntimeMode(RuntimeMode.LEGACY)
@RequiredArgsConstructor
@Slf4j
public class VectorizationRoute extends RouteBuilder {
private static final String ROUTE_ID_TRIGGER = "vectorization-trigger";
private static final String ROUTE_ID_PROCESSOR = "vectorization-processor";
private static final String ROUTE_ID_SCHEDULER = "vectorization-scheduler";
private final TedProcessorProperties properties;
private final ProcurementDocumentRepository documentRepository;
private final VectorizationProcessorService vectorizationProcessorService;
private final ObjectMapper objectMapper;
/**
* Creates thread pool for vectorization with highest priority.
* Only 1 thread since only one embedding service is available.
*/
private java.util.concurrent.ExecutorService executorService() {
return java.util.concurrent.Executors.newFixedThreadPool(
1,
r -> {
Thread thread = new Thread(r);
thread.setName("ted-vectorization-" + thread.getId());
thread.setDaemon(true);
thread.setPriority(Thread.MAX_PRIORITY); // Highest priority
return thread;
}
);
}
@Override
public void configure() throws Exception {
if (!properties.getVectorization().isEnabled()) {
log.info("Vectorization is disabled, skipping route configuration");
return;
}
log.info("Configuring vectorization routes (enabled=true, apiUrl={}, connectTimeout={}ms, socketTimeout={}ms, maxRetries={}, scheduler every 6s)",
properties.getVectorization().getApiUrl(),
properties.getVectorization().getConnectTimeout(),
properties.getVectorization().getSocketTimeout(),
properties.getVectorization().getMaxRetries());
// Global error handler for unexpected exceptions (like NullPointer, Connection pool shutdown, etc.)
// Only catches severe exceptions that are not handled by route-specific doCatch
onException(NullPointerException.class, IllegalStateException.class)
.routeId("vectorization-error-handler")
.handled(true)
.process(exchange -> {
UUID documentId = exchange.getIn().getHeader("documentId", UUID.class);
Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
String errorMsg = exception != null ? exception.getClass().getSimpleName() + ": " + exception.getMessage() : "Unknown error";
// If connection pool is shut down, it's likely during application shutdown - just log warning
if (errorMsg.contains("Connection pool shut down")) {
log.warn("Vectorization aborted for document {} - connection pool shut down (application shutting down?)", documentId);
return;
}
log.error("Unexpected error in vectorization for document {}: {}", documentId, errorMsg, exception);
// Update document status to FAILED via service (transactional)
if (documentId != null) {
try {
vectorizationProcessorService.markAsFailed(documentId, errorMsg);
} catch (Exception e) {
log.warn("Failed to mark document {} as failed: {}", documentId, e.getMessage());
}
}
})
.to("log:vectorization-error?level=WARN");
// Trigger route: Receives document ID and queues for async processing
// Queue size limited to 1000 to prevent memory issues
from("direct:vectorize")
.routeId(ROUTE_ID_TRIGGER)
.doTry()
.to("seda:vectorize-async?waitForTaskToComplete=Never&size=1000&blockWhenFull=true&timeout=5000")
.doCatch(Exception.class)
.log(LoggingLevel.WARN, "Failed to queue document ${header.documentId} for vectorization (queue may be full or shutting down): ${exception.message}")
.end();
// Async processor route: Performs actual vectorization with highest priority
// Uses dedicated single-thread pool with MAX_PRIORITY (1 thread for 1 embedding service)
from("seda:vectorize-async?size=1000")
.routeId(ROUTE_ID_PROCESSOR)
.threads().executorService(executorService())
.process(exchange -> {
UUID documentId = exchange.getIn().getHeader("documentId", UUID.class);
log.debug("Starting vectorization for document: {}", documentId);
// Prepare document for vectorization (transactional)
VectorizationProcessorService.DocumentContent docContent =
vectorizationProcessorService.prepareDocumentForVectorization(documentId);
if (docContent == null) {
// Document was skipped (no content)
log.debug("Document {} has no content, skipping vectorization", documentId);
exchange.setProperty("skipVectorization", true);
return;
}
// Prepare request object
EmbedRequest embedRequest = new EmbedRequest();
embedRequest.text = docContent.textContent();
embedRequest.isQuery = false;
// Set headers and body for REST call
exchange.getIn().setHeader("documentId", documentId);
exchange.getIn().setHeader(Exchange.HTTP_METHOD, "POST");
exchange.getIn().setHeader(Exchange.CONTENT_TYPE, "application/json");
exchange.getIn().setBody(embedRequest);
})
.choice()
.when(exchangeProperty("skipVectorization").isEqualTo(true))
.log(LoggingLevel.DEBUG, "Skipping vectorization (no content): ${header.documentId}")
.otherwise()
// Marshal request to JSON
.marshal().json(JsonLibrary.Jackson)
// Initialize retry counter
.setProperty("retryCount", constant(0))
.setProperty("maxRetries", constant(properties.getVectorization().getMaxRetries()))
.setProperty("vectorizationSuccess", constant(false))
// Retry loop with exponential backoff
.loopDoWhile(simple("${exchangeProperty.vectorizationSuccess} == false && ${exchangeProperty.retryCount} < ${exchangeProperty.maxRetries}"))
.process(exchange -> {
Integer retryCount = exchange.getProperty("retryCount", Integer.class);
exchange.setProperty("retryCount", retryCount + 1);
// Exponential backoff: 2s, 4s, 8s, 16s, 32s
if (retryCount > 0) {
long backoffMs = (long) Math.pow(2, retryCount) * 1000;
UUID documentId = exchange.getIn().getHeader("documentId", UUID.class);
log.warn("Retry #{} for document {} after {}ms backoff", retryCount, documentId, backoffMs);
Thread.sleep(backoffMs);
}
})
.doTry()
// HTTP call with configurable timeouts
.toD(properties.getVectorization().getApiUrl() + "/embed?bridgeEndpoint=true&throwExceptionOnFailure=false&connectTimeout=" +
properties.getVectorization().getConnectTimeout() + "&socketTimeout=" +
properties.getVectorization().getSocketTimeout())
.process(exchange -> {
UUID documentId = exchange.getIn().getHeader("documentId", UUID.class);
Integer statusCode = exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
if (statusCode == null) {
log.error("No response from embedding service for document {} (service may be down!)", documentId);
throw new RuntimeException("Embedding service not reachable (no HTTP response)");
}
if (statusCode != 200) {
String responseBody = exchange.getIn().getBody(String.class);
String errorMsg = "HTTP " + statusCode + " from embedding service: " + responseBody;
log.error("Embedding service error for document {}: {}", documentId, errorMsg);
throw new RuntimeException(errorMsg);
}
})
.unmarshal().json(JsonLibrary.Jackson, EmbedResponse.class)
.process(exchange -> {
UUID documentId = exchange.getIn().getHeader("documentId", UUID.class);
EmbedResponse response = exchange.getIn().getBody(EmbedResponse.class);
if (response == null || response.embedding == null) {
throw new RuntimeException("Embedding service returned null response");
}
log.debug("Successfully vectorized document {}: {} dimensions, {} tokens",
documentId, response.dimensions, response.tokenCount);
// Save embedding with token count via service (transactional)
vectorizationProcessorService.saveEmbedding(documentId, response.embedding, response.tokenCount);
// Mark as successful to stop retry loop
exchange.setProperty("vectorizationSuccess", true);
})
.doCatch(Exception.class)
.process(exchange -> {
UUID documentId = exchange.getIn().getHeader("documentId", UUID.class);
Integer retryCount = exchange.getProperty("retryCount", Integer.class);
Integer maxRetries = exchange.getProperty("maxRetries", Integer.class);
Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
String errorMsg = exception != null ? exception.getMessage() : "Unknown error";
// Check if error is due to shutdown
if (errorMsg != null && errorMsg.contains("Connection pool shut down")) {
log.warn("Vectorization aborted for document {} - connection pool shut down (application shutting down)", documentId);
// Don't mark as failed - it will be retried on next startup
exchange.setProperty("vectorizationSuccess", true); // Stop retry loop
return;
}
if (retryCount >= maxRetries) {
log.error("Vectorization failed for document {} after {} retries: {}", documentId, maxRetries, errorMsg, exception);
try {
vectorizationProcessorService.markAsFailed(documentId, errorMsg);
} catch (Exception e) {
log.warn("Failed to mark document {} as failed (may be shutting down): {}", documentId, e.getMessage());
}
} else {
log.warn("Vectorization attempt #{} failed for document {}: {}", retryCount, documentId, errorMsg);
}
})
.end()
.end()
.end();
// Scheduled route: Process pending and failed vectorizations from database
// Runs every 6 seconds to catch documents that need (re-)vectorization
from("timer:vectorization-scheduler?period=6000&delay=500")
.routeId(ROUTE_ID_SCHEDULER)
.log(LoggingLevel.DEBUG, "Vectorization scheduler: Checking for pending/failed documents...")
.process(exchange -> {
int batchSize = properties.getVectorization().getBatchSize();
// First get PENDING documents (highest priority)
List<ProcurementDocument> pending = documentRepository.findByVectorizationStatus(
VectorizationStatus.PENDING,
PageRequest.of(0, batchSize)
);
// If no PENDING, get FAILED documents for retry
List<ProcurementDocument> failed = List.of();
if (pending.isEmpty()) {
failed = documentRepository.findByVectorizationStatus(
VectorizationStatus.FAILED,
PageRequest.of(0, batchSize)
);
}
List<ProcurementDocument> toProcess = !pending.isEmpty() ? pending : failed;
if (!toProcess.isEmpty()) {
String status = !pending.isEmpty() ? "PENDING" : "FAILED";
log.debug("Processing {} {} vectorizations from database", toProcess.size(), status);
exchange.getIn().setBody(toProcess);
} else {
exchange.setProperty("noPendingDocs", true);
}
})
.choice()
.when(exchangeProperty("noPendingDocs").isEqualTo(true))
.log(LoggingLevel.DEBUG, "Vectorization scheduler: No pending or failed vectorizations found")
.otherwise()
.split(body())
.process(exchange -> {
ProcurementDocument doc = exchange.getIn().getBody(ProcurementDocument.class);
exchange.getIn().setHeader("documentId", doc.getId());
})
.to("direct:vectorize")
.end()
.end();
}
/**
* Request model for embedding service.
* Matches Python FastAPI EmbedRequest model with snake_case field names.
*/
public static class EmbedRequest {
@JsonProperty("text")
public String text;
@JsonProperty("is_query")
public boolean isQuery;
public EmbedRequest() {}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
@JsonProperty("is_query")
public boolean isIsQuery() {
return isQuery;
}
@JsonProperty("is_query")
public void setIsQuery(boolean isQuery) {
this.isQuery = isQuery;
}
}
/**
* Response model for embedding service.
*/
public static class EmbedResponse {
public float[] embedding;
public int dimensions;
@JsonProperty("token_count")
public int tokenCount;
public EmbedResponse() {}
public float[] getEmbedding() {
return embedding;
}
public void setEmbedding(float[] embedding) {
this.embedding = embedding;
}
public int getDimensions() {
return dimensions;
}
public void setDimensions(int dimensions) {
this.dimensions = dimensions;
}
@JsonProperty("token_count")
public int getTokenCount() {
return tokenCount;
}
@JsonProperty("token_count")
public void setTokenCount(int tokenCount) {
this.tokenCount = tokenCount;
}
}
}