From 439a06d63380240f7b45ca84837715fed2550e62 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Tue, 21 Apr 2026 14:51:53 +0200 Subject: [PATCH] embedding model prefixes support --- docs/embedding/VECTOR_SYNC_HTTP_PROVIDER.md | 30 +++++++++++ .../document/entity/DocumentEmbedding.java | 13 ++++- .../DocumentEmbeddingRepository.java | 15 +++++- .../embedding/config/EmbeddingProperties.java | 4 ++ .../model/EmbeddingInputPrefixing.java | 35 ++++++++++++ .../model/EmbeddingModelDescriptor.java | 29 +++++++++- .../embedding/model/EmbeddingPrefixMode.java | 7 +++ .../model/EmbeddingProviderResult.java | 11 +++- .../http/ExternalHttpEmbeddingProvider.java | 24 ++++++--- .../http/VectorSyncHttpEmbeddingProvider.java | 40 ++++++++------ .../registry/EmbeddingModelRegistry.java | 5 +- ...beddingJobExecutionPersistenceService.java | 14 ++++- .../service/EmbeddingPersistenceService.java | 15 +++++- .../RepresentationEmbeddingOrchestrator.java | 4 +- .../EmbeddingSubsystemStartupValidator.java | 13 ++++- src/main/resources/application-legacy.yml | 2 +- src/main/resources/application-new.yml | 27 +++++++--- src/main/resources/application.yml | 9 ++-- .../V27__doc_embedding_prefix_provenance.sql | 8 +++ ...NotDependOnTedProcessorPropertiesTest.java | 5 +- .../VectorSyncHttpEmbeddingProviderTest.java | 53 ++++++++++++++++++- ...presentationEmbeddingOrchestratorTest.java | 18 +++---- 22 files changed, 323 insertions(+), 58 deletions(-) create mode 100644 src/main/java/at/procon/dip/embedding/model/EmbeddingInputPrefixing.java create mode 100644 src/main/java/at/procon/dip/embedding/model/EmbeddingPrefixMode.java create mode 100644 src/main/resources/db/migration/V27__doc_embedding_prefix_provenance.sql diff --git a/docs/embedding/VECTOR_SYNC_HTTP_PROVIDER.md b/docs/embedding/VECTOR_SYNC_HTTP_PROVIDER.md index dc8b182..ea40c3b 100644 --- a/docs/embedding/VECTOR_SYNC_HTTP_PROVIDER.md +++ b/docs/embedding/VECTOR_SYNC_HTTP_PROVIDER.md 
@@ -67,3 +67,33 @@ Notes: - non-batch-capable models still fall back to single-item execution - `parallel-batch-count` controls how many claimed job batches may be started in parallel - `execution-batch-size` controls how many texts are sent in one `/vectorize-batch` request inside each claimed job batch + + +## E5 prefix handling + +For models such as `intfloat/multilingual-e5-large`, configure prefix handling on the model: + +```yaml +dip: + embedding: + models: + e5-default: + provider-config-key: vector-sync-e5 + provider-model-key: intfloat/multilingual-e5-large + dimensions: 1024 + supports-batch: true + prefix-mode: CLIENT + query-prefix: "query: " + document-prefix: "passage: " +``` + +Supported modes: +- `OFF` - DIP sends raw text +- `CLIENT` - DIP prepends the configured prefix before calling the provider +- `EXTERNAL` - DIP assumes the external service applies the prefixing itself + +For persisted document embeddings, the produced prefix provenance is stored in `doc.doc_embedding`: +- `prefix_mode` +- `applied_prefix` + +This makes it possible to identify whether indexed vectors were created with raw text, DIP-side prefixing, or externally handled prefixing before deciding on re-embedding. 
diff --git a/src/main/java/at/procon/dip/domain/document/entity/DocumentEmbedding.java b/src/main/java/at/procon/dip/domain/document/entity/DocumentEmbedding.java index 07797d1..281a40e 100644 --- a/src/main/java/at/procon/dip/domain/document/entity/DocumentEmbedding.java +++ b/src/main/java/at/procon/dip/domain/document/entity/DocumentEmbedding.java @@ -2,6 +2,7 @@ package at.procon.dip.domain.document.entity; import at.procon.dip.architecture.SchemaNames; import at.procon.dip.domain.document.EmbeddingStatus; +import at.procon.dip.embedding.model.EmbeddingPrefixMode; import jakarta.persistence.Column; import jakarta.persistence.Entity; import jakarta.persistence.EnumType; @@ -37,7 +38,8 @@ import lombok.Setter; @Index(name = "idx_doc_embedding_repr", columnList = "representation_id"), @Index(name = "idx_doc_embedding_model", columnList = "model_id"), @Index(name = "idx_doc_embedding_status", columnList = "embedding_status"), - @Index(name = "idx_doc_embedding_embedded_at", columnList = "embedded_at") + @Index(name = "idx_doc_embedding_embedded_at", columnList = "embedded_at"), + @Index(name = "idx_doc_embedding_prefix_mode", columnList = "prefix_mode") }) @Getter @Setter @@ -79,6 +81,15 @@ public class DocumentEmbedding { @Column(name = "embedded_at") private OffsetDateTime embeddedAt; + @Enumerated(EnumType.STRING) + @Column(name = "prefix_mode", nullable = false, length = 32) + @Builder.Default + private EmbeddingPrefixMode prefixMode = EmbeddingPrefixMode.OFF; + + @Column(name = "applied_prefix", length = 64) + private String appliedPrefix; + + @Builder.Default @Column(name = "created_at", nullable = false, updatable = false) private OffsetDateTime createdAt = OffsetDateTime.now(); diff --git a/src/main/java/at/procon/dip/domain/document/repository/DocumentEmbeddingRepository.java b/src/main/java/at/procon/dip/domain/document/repository/DocumentEmbeddingRepository.java index 85e0b63..977df13 100644 --- 
a/src/main/java/at/procon/dip/domain/document/repository/DocumentEmbeddingRepository.java +++ b/src/main/java/at/procon/dip/domain/document/repository/DocumentEmbeddingRepository.java @@ -2,6 +2,7 @@ package at.procon.dip.domain.document.repository; import at.procon.dip.domain.document.EmbeddingStatus; import at.procon.dip.domain.document.entity.DocumentEmbedding; +import at.procon.dip.embedding.model.EmbeddingPrefixMode; import java.time.OffsetDateTime; import java.util.List; import java.util.Optional; @@ -32,15 +33,25 @@ public interface DocumentEmbeddingRepository extends JpaRepository findDetailedById(@Param("embeddingId") UUID embeddingId); + default int updateEmbeddingVector(@Param("id") UUID id, + @Param("vectorData") float[] vectorData, + @Param("tokenCount") Integer tokenCount, + @Param("dimensions") Integer dimensions) { + return updateEmbeddingVector(id, vectorData, tokenCount, dimensions, EmbeddingPrefixMode.OFF.name(), null); + } + @Modifying @Query(value = "UPDATE doc.doc_embedding SET embedding_vector = CAST(:vectorData AS vector), " + "embedding_status = 'COMPLETED', embedded_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP, " + - "error_message = NULL, token_count = :tokenCount, embedding_dimensions = :dimensions WHERE id = :id", + "error_message = NULL, token_count = :tokenCount, embedding_dimensions = :dimensions, " + + "prefix_mode = :prefixMode, applied_prefix = :appliedPrefix WHERE id = :id", nativeQuery = true) int updateEmbeddingVector(@Param("id") UUID id, @Param("vectorData") float[] vectorData, @Param("tokenCount") Integer tokenCount, - @Param("dimensions") Integer dimensions); + @Param("dimensions") Integer dimensions, + @Param("prefixMode") String prefixMode, + @Param("appliedPrefix") String appliedPrefix); @Modifying @Query("UPDATE DocumentEmbedding e SET e.embeddingStatus = :status, e.errorMessage = :errorMessage, " + diff --git a/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java 
b/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java index d022e5e..6abada7 100644 --- a/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java +++ b/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java @@ -1,6 +1,7 @@ package at.procon.dip.embedding.config; import at.procon.dip.domain.document.DistanceMetric; +import at.procon.dip.embedding.model.EmbeddingPrefixMode; import java.time.Duration; import java.util.LinkedHashMap; import java.util.Map; @@ -50,6 +51,9 @@ public class EmbeddingProperties { private boolean supportsBatch = false; private Integer maxInputChars; private boolean active = true; + private EmbeddingPrefixMode prefixMode = EmbeddingPrefixMode.OFF; + private String queryPrefix = "query: "; + private String documentPrefix = "passage: "; } @Data diff --git a/src/main/java/at/procon/dip/embedding/model/EmbeddingInputPrefixing.java b/src/main/java/at/procon/dip/embedding/model/EmbeddingInputPrefixing.java new file mode 100644 index 0000000..d85c900 --- /dev/null +++ b/src/main/java/at/procon/dip/embedding/model/EmbeddingInputPrefixing.java @@ -0,0 +1,35 @@ +package at.procon.dip.embedding.model; + +import java.util.List; + +public final class EmbeddingInputPrefixing { + + private EmbeddingInputPrefixing() { + } + + public static PrefixedTexts apply(EmbeddingModelDescriptor model, EmbeddingUseCase useCase, List texts) { + EmbeddingPrefixMode prefixMode = model.prefixMode() == null ? EmbeddingPrefixMode.OFF : model.prefixMode(); + return switch (prefixMode) { + case OFF -> new PrefixedTexts(texts, EmbeddingPrefixMode.OFF, null); + case EXTERNAL -> new PrefixedTexts(texts, EmbeddingPrefixMode.EXTERNAL, null); + case CLIENT -> { + String prefix = useCase == EmbeddingUseCase.QUERY ? 
model.queryPrefix() : model.documentPrefix(); + if (prefix == null || prefix.isBlank()) { + throw new IllegalStateException("Prefix mode CLIENT requires a non-blank prefix for use case " + useCase); + } + yield new PrefixedTexts( + texts.stream().map(text -> prefix + text).toList(), + EmbeddingPrefixMode.CLIENT, + prefix + ); + } + }; + } + + public record PrefixedTexts( + List texts, + EmbeddingPrefixMode prefixMode, + String appliedPrefix + ) { + } +} diff --git a/src/main/java/at/procon/dip/embedding/model/EmbeddingModelDescriptor.java b/src/main/java/at/procon/dip/embedding/model/EmbeddingModelDescriptor.java index 8982901..75d51b5 100644 --- a/src/main/java/at/procon/dip/embedding/model/EmbeddingModelDescriptor.java +++ b/src/main/java/at/procon/dip/embedding/model/EmbeddingModelDescriptor.java @@ -11,6 +11,33 @@ public record EmbeddingModelDescriptor( boolean supportsQueryEmbeddingMode, boolean supportsBatch, Integer maxInputChars, - boolean active + boolean active, + EmbeddingPrefixMode prefixMode, + String queryPrefix, + String documentPrefix ) { + public EmbeddingModelDescriptor(String modelKey, + String providerConfigKey, + String providerModelKey, + int dimensions, + DistanceMetric distanceMetric, + boolean supportsQueryEmbeddingMode, + boolean supportsBatch, + Integer maxInputChars, + boolean active) { + this( + modelKey, + providerConfigKey, + providerModelKey, + dimensions, + distanceMetric, + supportsQueryEmbeddingMode, + supportsBatch, + maxInputChars, + active, + EmbeddingPrefixMode.OFF, + "query: ", + "passage: " + ); + } } diff --git a/src/main/java/at/procon/dip/embedding/model/EmbeddingPrefixMode.java b/src/main/java/at/procon/dip/embedding/model/EmbeddingPrefixMode.java new file mode 100644 index 0000000..88e518f --- /dev/null +++ b/src/main/java/at/procon/dip/embedding/model/EmbeddingPrefixMode.java @@ -0,0 +1,7 @@ +package at.procon.dip.embedding.model; + +public enum EmbeddingPrefixMode { + OFF, + CLIENT, + EXTERNAL +} diff --git 
a/src/main/java/at/procon/dip/embedding/model/EmbeddingProviderResult.java b/src/main/java/at/procon/dip/embedding/model/EmbeddingProviderResult.java index d6ae13d..3fe649a 100644 --- a/src/main/java/at/procon/dip/embedding/model/EmbeddingProviderResult.java +++ b/src/main/java/at/procon/dip/embedding/model/EmbeddingProviderResult.java @@ -7,6 +7,15 @@ public record EmbeddingProviderResult( List vectors, List warnings, String providerRequestId, - Integer tokenCount + Integer tokenCount, + EmbeddingPrefixMode prefixMode, + String appliedPrefix ) { + public EmbeddingProviderResult(EmbeddingModelDescriptor model, + List vectors, + List warnings, + String providerRequestId, + Integer tokenCount) { + this(model, vectors, warnings, providerRequestId, tokenCount, EmbeddingPrefixMode.OFF, null); + } } diff --git a/src/main/java/at/procon/dip/embedding/provider/http/ExternalHttpEmbeddingProvider.java b/src/main/java/at/procon/dip/embedding/provider/http/ExternalHttpEmbeddingProvider.java index ac6b919..123929e 100644 --- a/src/main/java/at/procon/dip/embedding/provider/http/ExternalHttpEmbeddingProvider.java +++ b/src/main/java/at/procon/dip/embedding/provider/http/ExternalHttpEmbeddingProvider.java @@ -1,5 +1,6 @@ package at.procon.dip.embedding.provider.http; +import at.procon.dip.embedding.model.EmbeddingInputPrefixing; import at.procon.dip.embedding.model.EmbeddingModelDescriptor; import at.procon.dip.embedding.model.EmbeddingProviderResult; import at.procon.dip.embedding.model.EmbeddingRequest; @@ -12,17 +13,19 @@ import java.io.IOException; import java.net.http.HttpResponse; import java.util.List; import java.util.Map; -import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; /** - * Existing HTTP/JSON embedding provider using the /embed contract. + * Compatibility provider for the currently running external Python vectorization service. + * + * It calls /embed with the legacy payload shape { text, isQuery }. 
*/ @Component public class ExternalHttpEmbeddingProvider extends AbstractHttpEmbeddingProviderSupport implements EmbeddingProvider { + private static final String PROVIDER_TYPE = "http-json"; - public ExternalHttpEmbeddingProvider(ObjectMapper objectMapper, ObjectMapper mapper) { + public ExternalHttpEmbeddingProvider(ObjectMapper objectMapper) { super(objectMapper); } @@ -40,29 +43,32 @@ public class ExternalHttpEmbeddingProvider extends AbstractHttpEmbeddingProvider public EmbeddingProviderResult embedDocuments(ResolvedEmbeddingProviderConfig providerConfig, EmbeddingModelDescriptor model, EmbeddingRequest request) { - return execute(providerConfig, request, EmbeddingUseCase.DOCUMENT); + return execute(providerConfig, model, request, EmbeddingUseCase.DOCUMENT); } @Override public EmbeddingProviderResult embedQuery(ResolvedEmbeddingProviderConfig providerConfig, EmbeddingModelDescriptor model, EmbeddingRequest request) { - return execute(providerConfig, request, EmbeddingUseCase.QUERY); + return execute(providerConfig, model, request, EmbeddingUseCase.QUERY); } private EmbeddingProviderResult execute(ResolvedEmbeddingProviderConfig providerConfig, + EmbeddingModelDescriptor model, EmbeddingRequest request, EmbeddingUseCase useCase) { if (request.texts() == null || request.texts().isEmpty()) { throw new IllegalArgumentException("Embedding request texts must not be empty"); } + EmbeddingInputPrefixing.PrefixedTexts prefixedTexts = EmbeddingInputPrefixing.apply(model, useCase, request.texts()); + try { HttpResponse response = postJson( providerConfig, "/embed", Map.of( - "text", request.texts().getFirst(), + "text", prefixedTexts.texts().getFirst(), "isQuery", useCase == EmbeddingUseCase.QUERY ) ); @@ -73,11 +79,13 @@ public class ExternalHttpEmbeddingProvider extends AbstractHttpEmbeddingProvider } return new EmbeddingProviderResult( - null, + model, List.of(parsed.embedding), List.of(), null, - parsed.tokenCount + parsed.tokenCount, + prefixedTexts.prefixMode(), + 
prefixedTexts.appliedPrefix() ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/src/main/java/at/procon/dip/embedding/provider/http/VectorSyncHttpEmbeddingProvider.java b/src/main/java/at/procon/dip/embedding/provider/http/VectorSyncHttpEmbeddingProvider.java index c0f2804..fb60849 100644 --- a/src/main/java/at/procon/dip/embedding/provider/http/VectorSyncHttpEmbeddingProvider.java +++ b/src/main/java/at/procon/dip/embedding/provider/http/VectorSyncHttpEmbeddingProvider.java @@ -1,8 +1,10 @@ package at.procon.dip.embedding.provider.http; +import at.procon.dip.embedding.model.EmbeddingInputPrefixing; import at.procon.dip.embedding.model.EmbeddingModelDescriptor; import at.procon.dip.embedding.model.EmbeddingProviderResult; import at.procon.dip.embedding.model.EmbeddingRequest; +import at.procon.dip.embedding.model.EmbeddingUseCase; import at.procon.dip.embedding.model.ResolvedEmbeddingProviderConfig; import at.procon.dip.embedding.provider.EmbeddingProvider; import com.fasterxml.jackson.annotation.JsonProperty; @@ -73,27 +75,30 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid public EmbeddingProviderResult embedDocuments(ResolvedEmbeddingProviderConfig providerConfig, EmbeddingModelDescriptor model, EmbeddingRequest request) { - return execute(providerConfig, model, request); + return execute(providerConfig, model, request, EmbeddingUseCase.DOCUMENT); } @Override public EmbeddingProviderResult embedQuery(ResolvedEmbeddingProviderConfig providerConfig, EmbeddingModelDescriptor model, EmbeddingRequest request) { - return execute(providerConfig, model, request); + return execute(providerConfig, model, request, EmbeddingUseCase.QUERY); } private EmbeddingProviderResult execute(ResolvedEmbeddingProviderConfig providerConfig, EmbeddingModelDescriptor model, - EmbeddingRequest request) { + EmbeddingRequest request, + EmbeddingUseCase useCase) { if (request.texts() == null || request.texts().isEmpty()) 
{ throw new IllegalArgumentException("Embedding request texts must not be empty"); } + EmbeddingInputPrefixing.PrefixedTexts prefixedTexts = EmbeddingInputPrefixing.apply(model, useCase, request.texts()); + try { - return request.texts().size() == 1 - ? executeSingle(providerConfig, model, request.texts().getFirst()) - : executeBatch(providerConfig, model, request); + return prefixedTexts.texts().size() == 1 + ? executeSingle(providerConfig, model, prefixedTexts) + : executeBatch(providerConfig, model, request, prefixedTexts); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException("Embedding provider call interrupted", e); @@ -104,11 +109,11 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid private EmbeddingProviderResult executeSingle(ResolvedEmbeddingProviderConfig providerConfig, EmbeddingModelDescriptor model, - String text) throws IOException, InterruptedException { + EmbeddingInputPrefixing.PrefixedTexts prefixedTexts) throws IOException, InterruptedException { HttpResponse response = postJson( providerConfig, "/vector-sync", - new VectorSyncRequest(model.providerModelKey(), text) + new VectorSyncRequest(model.providerModelKey(), prefixedTexts.texts().getFirst()) ); VectorSyncResponse parsed = objectMapper.readValue(response.body(), VectorSyncResponse.class); @@ -119,13 +124,16 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid List.of(vector), List.of(), null, - parsed.tokenCount + parsed.tokenCount, + prefixedTexts.prefixMode(), + prefixedTexts.appliedPrefix() ); } private EmbeddingProviderResult executeBatch(ResolvedEmbeddingProviderConfig providerConfig, EmbeddingModelDescriptor model, - EmbeddingRequest request) throws IOException, InterruptedException { + EmbeddingRequest request, + EmbeddingInputPrefixing.PrefixedTexts prefixedTexts) throws IOException, InterruptedException { BatchRequestSettings settings = 
resolveBatchRequestSettings(providerConfig, request.providerOptions()); if (settings.truncateLength() <= 0) { @@ -135,10 +143,10 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid throw new IllegalArgumentException("Batch chunk size must be > 0"); } - List requestOrder = new ArrayList<>(request.texts().size()); - List items = new ArrayList<>(request.texts().size()); + List requestOrder = new ArrayList<>(prefixedTexts.texts().size()); + List items = new ArrayList<>(prefixedTexts.texts().size()); - for (String text : request.texts()) { + for (String text : prefixedTexts.texts()) { String id = UUID.randomUUID().toString(); requestOrder.add(id); items.add(new VectorizeBatchItemRequest(id, text)); @@ -167,7 +175,7 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid resultById.put(result.id, result); } - List vectors = new ArrayList<>(request.texts().size()); + List vectors = new ArrayList<>(prefixedTexts.texts().size()); int totalTokenCount = 0; boolean hasAnyTokenCount = false; @@ -190,7 +198,9 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid vectors, List.of(), null, - hasAnyTokenCount ? totalTokenCount : null + hasAnyTokenCount ? 
totalTokenCount : null, + prefixedTexts.prefixMode(), + prefixedTexts.appliedPrefix() ); } diff --git a/src/main/java/at/procon/dip/embedding/registry/EmbeddingModelRegistry.java b/src/main/java/at/procon/dip/embedding/registry/EmbeddingModelRegistry.java index 7b1a508..b08445c 100644 --- a/src/main/java/at/procon/dip/embedding/registry/EmbeddingModelRegistry.java +++ b/src/main/java/at/procon/dip/embedding/registry/EmbeddingModelRegistry.java @@ -57,7 +57,10 @@ public class EmbeddingModelRegistry { model.isSupportsQueryEmbeddingMode(), model.isSupportsBatch(), model.getMaxInputChars(), - model.isActive() + model.isActive(), + model.getPrefixMode(), + model.getQueryPrefix(), + model.getDocumentPrefix() ); } } diff --git a/src/main/java/at/procon/dip/embedding/service/EmbeddingJobExecutionPersistenceService.java b/src/main/java/at/procon/dip/embedding/service/EmbeddingJobExecutionPersistenceService.java index c176718..26e3d3e 100644 --- a/src/main/java/at/procon/dip/embedding/service/EmbeddingJobExecutionPersistenceService.java +++ b/src/main/java/at/procon/dip/embedding/service/EmbeddingJobExecutionPersistenceService.java @@ -1,6 +1,7 @@ package at.procon.dip.embedding.service; import at.procon.dip.embedding.job.service.EmbeddingJobService; +import at.procon.dip.embedding.model.EmbeddingPrefixMode; import at.procon.dip.embedding.model.EmbeddingProviderResult; import java.util.UUID; import lombok.RequiredArgsConstructor; @@ -37,7 +38,18 @@ public class EmbeddingJobExecutionPersistenceService { Integer tokenCount, UUID jobId, String providerRequestId) { - embeddingPersistenceService.saveCompleted(embeddingId, vector, tokenCount); + completeJob(embeddingId, vector, tokenCount, jobId, providerRequestId, EmbeddingPrefixMode.OFF, null); + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void completeJob(UUID embeddingId, + float[] vector, + Integer tokenCount, + UUID jobId, + String providerRequestId, + EmbeddingPrefixMode prefixMode, + String 
appliedPrefix) { + embeddingPersistenceService.saveCompleted(embeddingId, vector, tokenCount, prefixMode, appliedPrefix); jobService.markCompleted(jobId, providerRequestId); } diff --git a/src/main/java/at/procon/dip/embedding/service/EmbeddingPersistenceService.java b/src/main/java/at/procon/dip/embedding/service/EmbeddingPersistenceService.java index f26bb04..c8ac9af 100644 --- a/src/main/java/at/procon/dip/embedding/service/EmbeddingPersistenceService.java +++ b/src/main/java/at/procon/dip/embedding/service/EmbeddingPersistenceService.java @@ -7,6 +7,7 @@ import at.procon.dip.domain.document.entity.DocumentTextRepresentation; import at.procon.dip.domain.document.repository.DocumentEmbeddingRepository; import at.procon.dip.domain.document.repository.DocumentTextRepresentationRepository; import at.procon.dip.domain.document.service.DocumentEmbeddingService; +import at.procon.dip.embedding.model.EmbeddingPrefixMode; import at.procon.dip.embedding.model.EmbeddingProviderResult; import java.time.OffsetDateTime; import java.util.UUID; @@ -43,10 +44,18 @@ public class EmbeddingPersistenceService { if (result.vectors() == null || result.vectors().isEmpty()) { throw new IllegalArgumentException("Embedding provider result contains no vectors"); } - saveCompleted(embeddingId, result.vectors().getFirst(), result.tokenCount()); + saveCompleted(embeddingId, result.vectors().getFirst(), result.tokenCount(), result.prefixMode(), result.appliedPrefix()); } public void saveCompleted(UUID embeddingId, float[] vector, Integer tokenCount) { + saveCompleted(embeddingId, vector, tokenCount, EmbeddingPrefixMode.OFF, null); + } + + public void saveCompleted(UUID embeddingId, + float[] vector, + Integer tokenCount, + EmbeddingPrefixMode prefixMode, + String appliedPrefix) { if (vector == null || vector.length == 0) { throw new IllegalArgumentException("Embedding vector must not be empty"); } @@ -54,7 +63,9 @@ public class EmbeddingPersistenceService { embeddingId, vector, tokenCount, - 
vector.length + vector.length, + (prefixMode == null ? EmbeddingPrefixMode.OFF : prefixMode).name(), + appliedPrefix ); } diff --git a/src/main/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestrator.java b/src/main/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestrator.java index b0581fe..dba0b02 100644 --- a/src/main/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestrator.java +++ b/src/main/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestrator.java @@ -285,7 +285,9 @@ public class RepresentationEmbeddingOrchestrator { result.vectors().get(i), null, prepared.job().getId(), - result.providerRequestId() + result.providerRequestId(), + result.prefixMode(), + result.appliedPrefix() ); } } catch (RuntimeException ex) { diff --git a/src/main/java/at/procon/dip/embedding/startup/EmbeddingSubsystemStartupValidator.java b/src/main/java/at/procon/dip/embedding/startup/EmbeddingSubsystemStartupValidator.java index 6c63380..64a7194 100644 --- a/src/main/java/at/procon/dip/embedding/startup/EmbeddingSubsystemStartupValidator.java +++ b/src/main/java/at/procon/dip/embedding/startup/EmbeddingSubsystemStartupValidator.java @@ -1,6 +1,7 @@ package at.procon.dip.embedding.startup; import at.procon.dip.embedding.config.EmbeddingProperties; +import at.procon.dip.embedding.model.EmbeddingPrefixMode; import at.procon.dip.embedding.registry.EmbeddingModelRegistry; import at.procon.dip.embedding.registry.EmbeddingProviderConfigResolver; import at.procon.dip.embedding.registry.EmbeddingProviderRegistry; @@ -34,8 +35,16 @@ public class EmbeddingSubsystemStartupValidator implements ApplicationRunner { modelRegistry.getActiveModels().forEach(model -> { var providerConfig = providerConfigResolver.resolve(model.providerConfigKey()); providerRegistry.getRequired(providerConfig.providerType()); - log.info("Validated embedding model {} -> provider {} ({})", - model.modelKey(), model.providerConfigKey(), 
providerConfig.providerType()); + if (model.prefixMode() == EmbeddingPrefixMode.CLIENT) { + if (model.queryPrefix() == null || model.queryPrefix().isBlank()) { + throw new IllegalStateException("Embedding model " + model.modelKey() + " uses CLIENT prefix mode but query-prefix is blank"); + } + if (model.documentPrefix() == null || model.documentPrefix().isBlank()) { + throw new IllegalStateException("Embedding model " + model.modelKey() + " uses CLIENT prefix mode but document-prefix is blank"); + } + } + log.info("Validated embedding model {} -> provider {} ({}, prefixMode={})", + model.modelKey(), model.providerConfigKey(), providerConfig.providerType(), model.prefixMode()); }); if (properties.getDefaultDocumentModel() != null && !properties.getDefaultDocumentModel().isBlank()) { diff --git a/src/main/resources/application-legacy.yml b/src/main/resources/application-legacy.yml index eef1beb..d1c29d1 100644 --- a/src/main/resources/application-legacy.yml +++ b/src/main/resources/application-legacy.yml @@ -133,7 +133,7 @@ ted: # Polling delay in milliseconds (1 minute) delay: 60000 # Max messages per poll - max-messages-per-poll: 100 + max-messages-per-poll: 10 # Output directory for processed attachments attachment-output-directory: /ted.europe/mail-attachments # Enable/disable MIME file input processing diff --git a/src/main/resources/application-new.yml b/src/main/resources/application-new.yml index 78f68ac..0108c0a 100644 --- a/src/main/resources/application-new.yml +++ b/src/main/resources/application-new.yml @@ -39,11 +39,11 @@ dip: embedding: enabled: true jobs: - enabled: true - parallel-batch-count: 2 + enabled: false + parallel-batch-count: 1 process-in-batches: true - batch-size: 48 - execution-batch-size: 48 + batch-size: 16 + execution-batch-size: 16 default-document-model: e5-default default-query-model: e5-default @@ -56,7 +56,7 @@ dip: external-e5: type: http-json - base-url: http://172.20.20.6:8001 + base-url: http://172.20.241.55:8001 
connect-timeout: 5s read-timeout: 60s batch-request: @@ -66,7 +66,7 @@ dip: vector-sync-e5: type: http-vector-sync - base-url: http://172.20.20.6:8001 + base-url: http://172.20.241.55:8001 connect-timeout: 30s read-timeout: 300s headers: @@ -93,6 +93,9 @@ dip: distance-metric: COSINE supports-query-embedding-mode: true supports-batch: true + prefix-mode: CLIENT + query-prefix: "query: " + document-prefix: "passage: " active: true profiles: @@ -290,6 +293,18 @@ dip: # Delete tar.gz after ingestion delete-after-ingestion: true + + time: + enabled: false + leitstand: + enabled: false + import-batch-id: time-leitstand + reconcile-lookback-days: 7 + toggl-track: + enabled: false + import-batch-id: time-toggl + reconcile-lookback-days: 7 + ted: # Phase 3 TED projection configuration projection: # Enable/disable dual-write into the TED projection model on top of DOC.doc_document diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 1581780..4a00f13 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -14,9 +14,12 @@ spring: name: document-intelligence-platform datasource: - url: jdbc:postgresql://localhost:5432/RELM + #url: jdbc:postgresql://localhost:5432/RELM + #username: ${DB_USERNAME:postgres} + #password: ${DB_PASSWORD:P54!pcd#Wi} + url: jdbc:postgresql://94.130.218.54:32333/RELM username: ${DB_USERNAME:postgres} - password: ${DB_PASSWORD:P54!pcd#Wi} + password: ${DB_PASSWORD:PDmXRx0Rbk9OFOn9qO5Gm/mPCfqW8zwbZ+/YIU1lySc=} driver-class-name: org.postgresql.Driver hikari: maximum-pool-size: 5 @@ -28,7 +31,7 @@ spring: jpa: hibernate: - ddl-auto: update + ddl-auto: validate show-sql: false open-in-view: false properties: diff --git a/src/main/resources/db/migration/V27__doc_embedding_prefix_provenance.sql b/src/main/resources/db/migration/V27__doc_embedding_prefix_provenance.sql new file mode 100644 index 0000000..b0f883a --- /dev/null +++ 
b/src/main/resources/db/migration/V27__doc_embedding_prefix_provenance.sql @@ -0,0 +1,8 @@ +ALTER TABLE doc.doc_embedding + ADD COLUMN IF NOT EXISTS prefix_mode VARCHAR(32) NOT NULL DEFAULT 'OFF'; + +ALTER TABLE doc.doc_embedding + ADD COLUMN IF NOT EXISTS applied_prefix VARCHAR(64); + +CREATE INDEX IF NOT EXISTS idx_doc_embedding_prefix_mode + ON doc.doc_embedding(prefix_mode); diff --git a/src/test/java/at/procon/dip/architecture/NewRuntimeMustNotDependOnTedProcessorPropertiesTest.java b/src/test/java/at/procon/dip/architecture/NewRuntimeMustNotDependOnTedProcessorPropertiesTest.java index 2ad0629..4687ca5 100644 --- a/src/test/java/at/procon/dip/architecture/NewRuntimeMustNotDependOnTedProcessorPropertiesTest.java +++ b/src/test/java/at/procon/dip/architecture/NewRuntimeMustNotDependOnTedProcessorPropertiesTest.java @@ -2,6 +2,7 @@ package at.procon.dip.architecture; import at.procon.dip.domain.ted.config.TedProjectionProperties; import at.procon.dip.ingestion.config.DipIngestionProperties; +import at.procon.dip.domain.time.config.TimeDomainProperties; import at.procon.dip.search.config.DipSearchProperties; import at.procon.ted.config.TedProcessorProperties; import java.lang.reflect.Constructor; @@ -22,11 +23,8 @@ class NewRuntimeMustNotDependOnTedProcessorPropertiesTest { List> newRuntimeClasses = List.of( at.procon.dip.ingestion.service.GenericDocumentImportService.class, at.procon.dip.ingestion.camel.GenericFileSystemIngestionRoute.class, - at.procon.dip.ingestion.camel.GenericMailIngestionRoute.class, at.procon.dip.ingestion.controller.GenericDocumentImportController.class, at.procon.dip.ingestion.adapter.MailDocumentIngestionAdapter.class, - at.procon.dip.ingestion.service.MailMetadataPersistenceService.class, - at.procon.dip.ingestion.mail.MailImportIdentityResolver.class, at.procon.dip.ingestion.adapter.TedPackageDocumentIngestionAdapter.class, at.procon.dip.ingestion.service.TedPackageChildImportProcessor.class, 
at.procon.dip.domain.ted.service.TedNoticeProjectionService.class, @@ -52,6 +50,7 @@ class NewRuntimeMustNotDependOnTedProcessorPropertiesTest { assertThat(DipSearchProperties.class).isNotNull(); assertThat(DipIngestionProperties.class).isNotNull(); assertThat(TedProjectionProperties.class).isNotNull(); + assertThat(TimeDomainProperties.class).isNotNull(); } private boolean hasDependency(Class owner, Class dependency) { diff --git a/src/test/java/at/procon/dip/embedding/provider/http/VectorSyncHttpEmbeddingProviderTest.java b/src/test/java/at/procon/dip/embedding/provider/http/VectorSyncHttpEmbeddingProviderTest.java index 889e99a..16d45aa 100644 --- a/src/test/java/at/procon/dip/embedding/provider/http/VectorSyncHttpEmbeddingProviderTest.java +++ b/src/test/java/at/procon/dip/embedding/provider/http/VectorSyncHttpEmbeddingProviderTest.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThat; import at.procon.dip.domain.document.DistanceMetric; import at.procon.dip.embedding.model.EmbeddingModelDescriptor; +import at.procon.dip.embedding.model.EmbeddingPrefixMode; import at.procon.dip.embedding.model.EmbeddingRequest; import at.procon.dip.embedding.model.EmbeddingUseCase; import at.procon.dip.embedding.model.ResolvedEmbeddingProviderConfig; @@ -27,6 +28,7 @@ class VectorSyncHttpEmbeddingProviderTest { private final ObjectMapper objectMapper = new ObjectMapper(); private HttpServer server; private final AtomicReference lastBatchBody = new AtomicReference<>(); + private final AtomicReference lastSingleBody = new AtomicReference<>(); @AfterEach void tearDown() { @@ -75,6 +77,53 @@ class VectorSyncHttpEmbeddingProviderTest { assertThat(result.vectors()).hasSize(1); assertThat(result.vectors().getFirst()).containsExactly(0.1f, 0.2f, 0.3f); assertThat(result.tokenCount()).isEqualTo(9); + assertThat(result.prefixMode()).isEqualTo(EmbeddingPrefixMode.OFF); + assertThat(result.appliedPrefix()).isNull(); + } + + @Test + void 
shouldPrefixTextsInClientModeForDocuments() throws Exception { + server = HttpServer.create(new InetSocketAddress(0), 0); + server.createContext("/vector-sync", this::handleVectorSync); + server.start(); + + var provider = new VectorSyncHttpEmbeddingProvider(objectMapper); + var config = ResolvedEmbeddingProviderConfig.builder() + .key("vector-sync-local") + .providerType("http-vector-sync") + .baseUrl("http://localhost:" + server.getAddress().getPort()) + .readTimeout(Duration.ofSeconds(5)) + .headers(Map.of("X-Client", "dip-test")) + .batchTruncateText(false) + .batchTruncateLength(512) + .batchChunkSize(20) + .build(); + var model = new EmbeddingModelDescriptor( + "e5-default", + "vector-sync-local", + "intfloat/multilingual-e5-large", + 3, + DistanceMetric.COSINE, + true, + false, + 8192, + true, + EmbeddingPrefixMode.CLIENT, + "query: ", + "passage: " + ); + var request = EmbeddingRequest.builder() + .modelKey("e5-default") + .useCase(EmbeddingUseCase.DOCUMENT) + .texts(List.of("This is a sample text to vectorize")) + .providerOptions(Map.of()) + .build(); + + var result = provider.embedDocuments(config, model, request); + + assertThat(result.prefixMode()).isEqualTo(EmbeddingPrefixMode.CLIENT); + assertThat(result.appliedPrefix()).isEqualTo("passage: "); + assertThat(lastSingleBody.get()).contains("passage: This is a sample text to vectorize"); } @Test @@ -118,6 +167,7 @@ class VectorSyncHttpEmbeddingProviderTest { assertThat(result.vectors().get(0)).containsExactly(0.1f, 0.2f, 0.3f); assertThat(result.vectors().get(1)).containsExactly(0.4f, 0.5f, 0.6f); assertThat(result.tokenCount()).isEqualTo(12); + assertThat(result.prefixMode()).isEqualTo(EmbeddingPrefixMode.OFF); JsonNode requestBody = objectMapper.readTree(lastBatchBody.get()); assertThat(requestBody.get("truncate_text").asBoolean()).isTrue(); @@ -177,9 +227,10 @@ class VectorSyncHttpEmbeddingProviderTest { body = new String(in.readAllBytes(), StandardCharsets.UTF_8); } + lastSingleBody.set(body); 
assertThat(exchange.getRequestMethod()).isEqualTo("POST"); assertThat(body).contains("\"model\":\"intfloat/multilingual-e5-large\""); - assertThat(body).contains("\"text\":\"This is a sample text to vectorize\""); + assertThat(body).contains("This is a sample text to vectorize"); assertThat(exchange.getRequestHeaders().getFirst("X-Client")).isEqualTo("dip-test"); respondJson(exchange, """ diff --git a/src/test/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestratorTest.java b/src/test/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestratorTest.java index 873e6c0..2b440b1 100644 --- a/src/test/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestratorTest.java +++ b/src/test/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestratorTest.java @@ -175,8 +175,8 @@ class RepresentationEmbeddingOrchestratorTest { ArgumentCaptor<List<String>> textsCaptor = ArgumentCaptor.forClass(List.class); verify(executionService, times(1)).embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), textsCaptor.capture()); assertThat(textsCaptor.getValue()).containsExactly("alpha", "beta"); - verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1")); - verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1")); + verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1"), eq(EmbeddingPrefixMode.OFF), eq(null)); + verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1"), eq(EmbeddingPrefixMode.OFF), eq(null)); } @Test @@ -220,7 +220,7 @@ class RepresentationEmbeddingOrchestratorTest { orchestrator.processNextReadyBatch(); verify(executionService,
times(1)).embedTexts(eq("mock-search"), eq(EmbeddingUseCase.DOCUMENT), eq(List.of("gamma"))); - verify(executionPersistenceService, never()).completeJob(eq(embeddingId), any(float[].class), eq(null), eq(job.getId()), anyString()); + verify(executionPersistenceService, never()).completeJob(eq(embeddingId), any(float[].class), eq(null), eq(job.getId()), anyString(), any(), any()); verify(executionPersistenceService).completeJob(eq(embeddingId), any(EmbeddingProviderResult.class), eq(job.getId()), eq("req-2")); } @@ -305,11 +305,11 @@ class RepresentationEmbeddingOrchestratorTest { assertThat(processed).isEqualTo(6); verify(jobService, times(4)).claimNextReadyJobs(2); verify(executionService, times(3)).embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), any()); - verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1")); - verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1")); - verify(executionPersistenceService).completeJob(eq(embeddingId3), aryEq(new float[]{0.7f, 0.8f, 0.9f}), eq(null), eq(job3.getId()), eq("batch-req-2")); - verify(executionPersistenceService).completeJob(eq(embeddingId4), aryEq(new float[]{1.0f, 1.1f, 1.2f}), eq(null), eq(job4.getId()), eq("batch-req-2")); - verify(executionPersistenceService).completeJob(eq(embeddingId5), aryEq(new float[]{1.3f, 1.4f, 1.5f}), eq(null), eq(job5.getId()), eq("batch-req-3")); - verify(executionPersistenceService).completeJob(eq(embeddingId6), aryEq(new float[]{1.6f, 1.7f, 1.8f}), eq(null), eq(job6.getId()), eq("batch-req-3")); + verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1"), eq(EmbeddingPrefixMode.OFF), eq(null)); + verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), 
eq(null), eq(job2.getId()), eq("batch-req-1"), eq(EmbeddingPrefixMode.OFF), eq(null)); + verify(executionPersistenceService).completeJob(eq(embeddingId3), aryEq(new float[]{0.7f, 0.8f, 0.9f}), eq(null), eq(job3.getId()), eq("batch-req-2"), eq(EmbeddingPrefixMode.OFF), eq(null)); + verify(executionPersistenceService).completeJob(eq(embeddingId4), aryEq(new float[]{1.0f, 1.1f, 1.2f}), eq(null), eq(job4.getId()), eq("batch-req-2"), eq(EmbeddingPrefixMode.OFF), eq(null)); + verify(executionPersistenceService).completeJob(eq(embeddingId5), aryEq(new float[]{1.3f, 1.4f, 1.5f}), eq(null), eq(job5.getId()), eq("batch-req-3"), eq(EmbeddingPrefixMode.OFF), eq(null)); + verify(executionPersistenceService).completeJob(eq(embeddingId6), aryEq(new float[]{1.6f, 1.7f, 1.8f}), eq(null), eq(job6.getId()), eq("batch-req-3"), eq(EmbeddingPrefixMode.OFF), eq(null)); } }