embedding model prefixes support
This commit is contained in:
parent
1cd8ebe066
commit
439a06d633
|
|
@ -67,3 +67,33 @@ Notes:
|
|||
- non-batch-capable models still fall back to single-item execution
|
||||
- `parallel-batch-count` controls how many claimed job batches may be started in parallel
|
||||
- `execution-batch-size` controls how many texts are sent in one `/vectorize-batch` request inside each claimed job batch
|
||||
|
||||
|
||||
## E5 prefix handling
|
||||
|
||||
For models such as `intfloat/multilingual-e5-large`, configure prefix handling on the model:
|
||||
|
||||
```yaml
|
||||
dip:
|
||||
embedding:
|
||||
models:
|
||||
e5-default:
|
||||
provider-config-key: vector-sync-e5
|
||||
provider-model-key: intfloat/multilingual-e5-large
|
||||
dimensions: 1024
|
||||
supports-batch: true
|
||||
prefix-mode: CLIENT
|
||||
query-prefix: "query: "
|
||||
document-prefix: "passage: "
|
||||
```
|
||||
|
||||
Supported modes:
|
||||
- `OFF` - DIP sends raw text
|
||||
- `CLIENT` - DIP prepends the configured prefix before calling the provider
|
||||
- `EXTERNAL` - DIP assumes the external service applies the prefixing itself
|
||||
|
||||
For persisted document embeddings, the produced prefix provenance is stored in `doc.doc_embedding`:
|
||||
- `prefix_mode`
|
||||
- `applied_prefix`
|
||||
|
||||
This makes it possible to identify whether indexed vectors were created with raw text, DIP-side prefixing, or externally handled prefixing before deciding on re-embedding.
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package at.procon.dip.domain.document.entity;
|
|||
|
||||
import at.procon.dip.architecture.SchemaNames;
|
||||
import at.procon.dip.domain.document.EmbeddingStatus;
|
||||
import at.procon.dip.embedding.model.EmbeddingPrefixMode;
|
||||
import jakarta.persistence.Column;
|
||||
import jakarta.persistence.Entity;
|
||||
import jakarta.persistence.EnumType;
|
||||
|
|
@ -37,7 +38,8 @@ import lombok.Setter;
|
|||
@Index(name = "idx_doc_embedding_repr", columnList = "representation_id"),
|
||||
@Index(name = "idx_doc_embedding_model", columnList = "model_id"),
|
||||
@Index(name = "idx_doc_embedding_status", columnList = "embedding_status"),
|
||||
@Index(name = "idx_doc_embedding_embedded_at", columnList = "embedded_at")
|
||||
@Index(name = "idx_doc_embedding_embedded_at", columnList = "embedded_at"),
|
||||
@Index(name = "idx_doc_embedding_prefix_mode", columnList = "prefix_mode")
|
||||
})
|
||||
@Getter
|
||||
@Setter
|
||||
|
|
@ -79,6 +81,15 @@ public class DocumentEmbedding {
|
|||
@Column(name = "embedded_at")
|
||||
private OffsetDateTime embeddedAt;
|
||||
|
||||
@Enumerated(EnumType.STRING)
|
||||
@Column(name = "prefix_mode", nullable = false, length = 32)
|
||||
@Builder.Default
|
||||
private EmbeddingPrefixMode prefixMode = EmbeddingPrefixMode.OFF;
|
||||
|
||||
@Column(name = "applied_prefix", length = 64)
|
||||
private String appliedPrefix;
|
||||
|
||||
|
||||
@Builder.Default
|
||||
@Column(name = "created_at", nullable = false, updatable = false)
|
||||
private OffsetDateTime createdAt = OffsetDateTime.now();
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package at.procon.dip.domain.document.repository;
|
|||
|
||||
import at.procon.dip.domain.document.EmbeddingStatus;
|
||||
import at.procon.dip.domain.document.entity.DocumentEmbedding;
|
||||
import at.procon.dip.embedding.model.EmbeddingPrefixMode;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
|
@ -32,15 +33,25 @@ public interface DocumentEmbeddingRepository extends JpaRepository<DocumentEmbed
|
|||
"WHERE e.id = :embeddingId")
|
||||
Optional<DocumentEmbedding> findDetailedById(@Param("embeddingId") UUID embeddingId);
|
||||
|
||||
default int updateEmbeddingVector(@Param("id") UUID id,
|
||||
@Param("vectorData") float[] vectorData,
|
||||
@Param("tokenCount") Integer tokenCount,
|
||||
@Param("dimensions") Integer dimensions) {
|
||||
return updateEmbeddingVector(id, vectorData, tokenCount, dimensions, EmbeddingPrefixMode.OFF.name(), null);
|
||||
}
|
||||
|
||||
@Modifying
|
||||
@Query(value = "UPDATE doc.doc_embedding SET embedding_vector = CAST(:vectorData AS vector), " +
|
||||
"embedding_status = 'COMPLETED', embedded_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP, " +
|
||||
"error_message = NULL, token_count = :tokenCount, embedding_dimensions = :dimensions WHERE id = :id",
|
||||
"error_message = NULL, token_count = :tokenCount, embedding_dimensions = :dimensions, " +
|
||||
"prefix_mode = :prefixMode, applied_prefix = :appliedPrefix WHERE id = :id",
|
||||
nativeQuery = true)
|
||||
int updateEmbeddingVector(@Param("id") UUID id,
|
||||
@Param("vectorData") float[] vectorData,
|
||||
@Param("tokenCount") Integer tokenCount,
|
||||
@Param("dimensions") Integer dimensions);
|
||||
@Param("dimensions") Integer dimensions,
|
||||
@Param("prefixMode") String prefixMode,
|
||||
@Param("appliedPrefix") String appliedPrefix);
|
||||
|
||||
@Modifying
|
||||
@Query("UPDATE DocumentEmbedding e SET e.embeddingStatus = :status, e.errorMessage = :errorMessage, " +
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package at.procon.dip.embedding.config;
|
||||
|
||||
import at.procon.dip.domain.document.DistanceMetric;
|
||||
import at.procon.dip.embedding.model.EmbeddingPrefixMode;
|
||||
import java.time.Duration;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
|
@ -50,6 +51,9 @@ public class EmbeddingProperties {
|
|||
private boolean supportsBatch = false;
|
||||
private Integer maxInputChars;
|
||||
private boolean active = true;
|
||||
private EmbeddingPrefixMode prefixMode = EmbeddingPrefixMode.OFF;
|
||||
private String queryPrefix = "query: ";
|
||||
private String documentPrefix = "passage: ";
|
||||
}
|
||||
|
||||
@Data
|
||||
|
|
|
|||
|
|
@ -0,0 +1,35 @@
|
|||
package at.procon.dip.embedding.model;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public final class EmbeddingInputPrefixing {
|
||||
|
||||
private EmbeddingInputPrefixing() {
|
||||
}
|
||||
|
||||
public static PrefixedTexts apply(EmbeddingModelDescriptor model, EmbeddingUseCase useCase, List<String> texts) {
|
||||
EmbeddingPrefixMode prefixMode = model.prefixMode() == null ? EmbeddingPrefixMode.OFF : model.prefixMode();
|
||||
return switch (prefixMode) {
|
||||
case OFF -> new PrefixedTexts(texts, EmbeddingPrefixMode.OFF, null);
|
||||
case EXTERNAL -> new PrefixedTexts(texts, EmbeddingPrefixMode.EXTERNAL, null);
|
||||
case CLIENT -> {
|
||||
String prefix = useCase == EmbeddingUseCase.QUERY ? model.queryPrefix() : model.documentPrefix();
|
||||
if (prefix == null || prefix.isBlank()) {
|
||||
throw new IllegalStateException("Prefix mode CLIENT requires a non-blank prefix for use case " + useCase);
|
||||
}
|
||||
yield new PrefixedTexts(
|
||||
texts.stream().map(text -> prefix + text).toList(),
|
||||
EmbeddingPrefixMode.CLIENT,
|
||||
prefix
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public record PrefixedTexts(
|
||||
List<String> texts,
|
||||
EmbeddingPrefixMode prefixMode,
|
||||
String appliedPrefix
|
||||
) {
|
||||
}
|
||||
}
|
||||
|
|
@ -11,6 +11,33 @@ public record EmbeddingModelDescriptor(
|
|||
boolean supportsQueryEmbeddingMode,
|
||||
boolean supportsBatch,
|
||||
Integer maxInputChars,
|
||||
boolean active
|
||||
boolean active,
|
||||
EmbeddingPrefixMode prefixMode,
|
||||
String queryPrefix,
|
||||
String documentPrefix
|
||||
) {
|
||||
public EmbeddingModelDescriptor(String modelKey,
|
||||
String providerConfigKey,
|
||||
String providerModelKey,
|
||||
int dimensions,
|
||||
DistanceMetric distanceMetric,
|
||||
boolean supportsQueryEmbeddingMode,
|
||||
boolean supportsBatch,
|
||||
Integer maxInputChars,
|
||||
boolean active) {
|
||||
this(
|
||||
modelKey,
|
||||
providerConfigKey,
|
||||
providerModelKey,
|
||||
dimensions,
|
||||
distanceMetric,
|
||||
supportsQueryEmbeddingMode,
|
||||
supportsBatch,
|
||||
maxInputChars,
|
||||
active,
|
||||
EmbeddingPrefixMode.OFF,
|
||||
"query: ",
|
||||
"passage: "
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,7 @@
|
|||
package at.procon.dip.embedding.model;
|
||||
|
||||
public enum EmbeddingPrefixMode {
|
||||
OFF,
|
||||
CLIENT,
|
||||
EXTERNAL
|
||||
}
|
||||
|
|
@ -7,6 +7,15 @@ public record EmbeddingProviderResult(
|
|||
List<float[]> vectors,
|
||||
List<String> warnings,
|
||||
String providerRequestId,
|
||||
Integer tokenCount
|
||||
Integer tokenCount,
|
||||
EmbeddingPrefixMode prefixMode,
|
||||
String appliedPrefix
|
||||
) {
|
||||
public EmbeddingProviderResult(EmbeddingModelDescriptor model,
|
||||
List<float[]> vectors,
|
||||
List<String> warnings,
|
||||
String providerRequestId,
|
||||
Integer tokenCount) {
|
||||
this(model, vectors, warnings, providerRequestId, tokenCount, EmbeddingPrefixMode.OFF, null);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package at.procon.dip.embedding.provider.http;
|
||||
|
||||
import at.procon.dip.embedding.model.EmbeddingInputPrefixing;
|
||||
import at.procon.dip.embedding.model.EmbeddingModelDescriptor;
|
||||
import at.procon.dip.embedding.model.EmbeddingProviderResult;
|
||||
import at.procon.dip.embedding.model.EmbeddingRequest;
|
||||
|
|
@ -12,17 +13,19 @@ import java.io.IOException;
|
|||
import java.net.http.HttpResponse;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Existing HTTP/JSON embedding provider using the /embed contract.
|
||||
* Compatibility provider for the currently running external Python vectorization service.
|
||||
*
|
||||
* It calls /embed with the legacy payload shape { text, isQuery }.
|
||||
*/
|
||||
@Component
|
||||
public class ExternalHttpEmbeddingProvider extends AbstractHttpEmbeddingProviderSupport implements EmbeddingProvider {
|
||||
|
||||
private static final String PROVIDER_TYPE = "http-json";
|
||||
|
||||
public ExternalHttpEmbeddingProvider(ObjectMapper objectMapper, ObjectMapper mapper) {
|
||||
public ExternalHttpEmbeddingProvider(ObjectMapper objectMapper) {
|
||||
super(objectMapper);
|
||||
}
|
||||
|
||||
|
|
@ -40,29 +43,32 @@ public class ExternalHttpEmbeddingProvider extends AbstractHttpEmbeddingProvider
|
|||
public EmbeddingProviderResult embedDocuments(ResolvedEmbeddingProviderConfig providerConfig,
|
||||
EmbeddingModelDescriptor model,
|
||||
EmbeddingRequest request) {
|
||||
return execute(providerConfig, request, EmbeddingUseCase.DOCUMENT);
|
||||
return execute(providerConfig, model, request, EmbeddingUseCase.DOCUMENT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EmbeddingProviderResult embedQuery(ResolvedEmbeddingProviderConfig providerConfig,
|
||||
EmbeddingModelDescriptor model,
|
||||
EmbeddingRequest request) {
|
||||
return execute(providerConfig, request, EmbeddingUseCase.QUERY);
|
||||
return execute(providerConfig, model, request, EmbeddingUseCase.QUERY);
|
||||
}
|
||||
|
||||
private EmbeddingProviderResult execute(ResolvedEmbeddingProviderConfig providerConfig,
|
||||
EmbeddingModelDescriptor model,
|
||||
EmbeddingRequest request,
|
||||
EmbeddingUseCase useCase) {
|
||||
if (request.texts() == null || request.texts().isEmpty()) {
|
||||
throw new IllegalArgumentException("Embedding request texts must not be empty");
|
||||
}
|
||||
|
||||
EmbeddingInputPrefixing.PrefixedTexts prefixedTexts = EmbeddingInputPrefixing.apply(model, useCase, request.texts());
|
||||
|
||||
try {
|
||||
HttpResponse<String> response = postJson(
|
||||
providerConfig,
|
||||
"/embed",
|
||||
Map.of(
|
||||
"text", request.texts().getFirst(),
|
||||
"text", prefixedTexts.texts().getFirst(),
|
||||
"isQuery", useCase == EmbeddingUseCase.QUERY
|
||||
)
|
||||
);
|
||||
|
|
@ -73,11 +79,13 @@ public class ExternalHttpEmbeddingProvider extends AbstractHttpEmbeddingProvider
|
|||
}
|
||||
|
||||
return new EmbeddingProviderResult(
|
||||
null,
|
||||
model,
|
||||
List.of(parsed.embedding),
|
||||
List.of(),
|
||||
null,
|
||||
parsed.tokenCount
|
||||
parsed.tokenCount,
|
||||
prefixedTexts.prefixMode(),
|
||||
prefixedTexts.appliedPrefix()
|
||||
);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
|
|
|
|||
|
|
@ -1,8 +1,10 @@
|
|||
package at.procon.dip.embedding.provider.http;
|
||||
|
||||
import at.procon.dip.embedding.model.EmbeddingInputPrefixing;
|
||||
import at.procon.dip.embedding.model.EmbeddingModelDescriptor;
|
||||
import at.procon.dip.embedding.model.EmbeddingProviderResult;
|
||||
import at.procon.dip.embedding.model.EmbeddingRequest;
|
||||
import at.procon.dip.embedding.model.EmbeddingUseCase;
|
||||
import at.procon.dip.embedding.model.ResolvedEmbeddingProviderConfig;
|
||||
import at.procon.dip.embedding.provider.EmbeddingProvider;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
|
@ -73,27 +75,30 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid
|
|||
public EmbeddingProviderResult embedDocuments(ResolvedEmbeddingProviderConfig providerConfig,
|
||||
EmbeddingModelDescriptor model,
|
||||
EmbeddingRequest request) {
|
||||
return execute(providerConfig, model, request);
|
||||
return execute(providerConfig, model, request, EmbeddingUseCase.DOCUMENT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EmbeddingProviderResult embedQuery(ResolvedEmbeddingProviderConfig providerConfig,
|
||||
EmbeddingModelDescriptor model,
|
||||
EmbeddingRequest request) {
|
||||
return execute(providerConfig, model, request);
|
||||
return execute(providerConfig, model, request, EmbeddingUseCase.QUERY);
|
||||
}
|
||||
|
||||
private EmbeddingProviderResult execute(ResolvedEmbeddingProviderConfig providerConfig,
|
||||
EmbeddingModelDescriptor model,
|
||||
EmbeddingRequest request) {
|
||||
EmbeddingRequest request,
|
||||
EmbeddingUseCase useCase) {
|
||||
if (request.texts() == null || request.texts().isEmpty()) {
|
||||
throw new IllegalArgumentException("Embedding request texts must not be empty");
|
||||
}
|
||||
|
||||
EmbeddingInputPrefixing.PrefixedTexts prefixedTexts = EmbeddingInputPrefixing.apply(model, useCase, request.texts());
|
||||
|
||||
try {
|
||||
return request.texts().size() == 1
|
||||
? executeSingle(providerConfig, model, request.texts().getFirst())
|
||||
: executeBatch(providerConfig, model, request);
|
||||
return prefixedTexts.texts().size() == 1
|
||||
? executeSingle(providerConfig, model, prefixedTexts)
|
||||
: executeBatch(providerConfig, model, request, prefixedTexts);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException("Embedding provider call interrupted", e);
|
||||
|
|
@ -104,11 +109,11 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid
|
|||
|
||||
private EmbeddingProviderResult executeSingle(ResolvedEmbeddingProviderConfig providerConfig,
|
||||
EmbeddingModelDescriptor model,
|
||||
String text) throws IOException, InterruptedException {
|
||||
EmbeddingInputPrefixing.PrefixedTexts prefixedTexts) throws IOException, InterruptedException {
|
||||
HttpResponse<String> response = postJson(
|
||||
providerConfig,
|
||||
"/vector-sync",
|
||||
new VectorSyncRequest(model.providerModelKey(), text)
|
||||
new VectorSyncRequest(model.providerModelKey(), prefixedTexts.texts().getFirst())
|
||||
);
|
||||
|
||||
VectorSyncResponse parsed = objectMapper.readValue(response.body(), VectorSyncResponse.class);
|
||||
|
|
@ -119,13 +124,16 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid
|
|||
List.of(vector),
|
||||
List.of(),
|
||||
null,
|
||||
parsed.tokenCount
|
||||
parsed.tokenCount,
|
||||
prefixedTexts.prefixMode(),
|
||||
prefixedTexts.appliedPrefix()
|
||||
);
|
||||
}
|
||||
|
||||
private EmbeddingProviderResult executeBatch(ResolvedEmbeddingProviderConfig providerConfig,
|
||||
EmbeddingModelDescriptor model,
|
||||
EmbeddingRequest request) throws IOException, InterruptedException {
|
||||
EmbeddingRequest request,
|
||||
EmbeddingInputPrefixing.PrefixedTexts prefixedTexts) throws IOException, InterruptedException {
|
||||
BatchRequestSettings settings = resolveBatchRequestSettings(providerConfig, request.providerOptions());
|
||||
|
||||
if (settings.truncateLength() <= 0) {
|
||||
|
|
@ -135,10 +143,10 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid
|
|||
throw new IllegalArgumentException("Batch chunk size must be > 0");
|
||||
}
|
||||
|
||||
List<String> requestOrder = new ArrayList<>(request.texts().size());
|
||||
List<VectorizeBatchItemRequest> items = new ArrayList<>(request.texts().size());
|
||||
List<String> requestOrder = new ArrayList<>(prefixedTexts.texts().size());
|
||||
List<VectorizeBatchItemRequest> items = new ArrayList<>(prefixedTexts.texts().size());
|
||||
|
||||
for (String text : request.texts()) {
|
||||
for (String text : prefixedTexts.texts()) {
|
||||
String id = UUID.randomUUID().toString();
|
||||
requestOrder.add(id);
|
||||
items.add(new VectorizeBatchItemRequest(id, text));
|
||||
|
|
@ -167,7 +175,7 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid
|
|||
resultById.put(result.id, result);
|
||||
}
|
||||
|
||||
List<float[]> vectors = new ArrayList<>(request.texts().size());
|
||||
List<float[]> vectors = new ArrayList<>(prefixedTexts.texts().size());
|
||||
int totalTokenCount = 0;
|
||||
boolean hasAnyTokenCount = false;
|
||||
|
||||
|
|
@ -190,7 +198,9 @@ public class VectorSyncHttpEmbeddingProvider extends AbstractHttpEmbeddingProvid
|
|||
vectors,
|
||||
List.of(),
|
||||
null,
|
||||
hasAnyTokenCount ? totalTokenCount : null
|
||||
hasAnyTokenCount ? totalTokenCount : null,
|
||||
prefixedTexts.prefixMode(),
|
||||
prefixedTexts.appliedPrefix()
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -57,7 +57,10 @@ public class EmbeddingModelRegistry {
|
|||
model.isSupportsQueryEmbeddingMode(),
|
||||
model.isSupportsBatch(),
|
||||
model.getMaxInputChars(),
|
||||
model.isActive()
|
||||
model.isActive(),
|
||||
model.getPrefixMode(),
|
||||
model.getQueryPrefix(),
|
||||
model.getDocumentPrefix()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package at.procon.dip.embedding.service;
|
||||
|
||||
import at.procon.dip.embedding.job.service.EmbeddingJobService;
|
||||
import at.procon.dip.embedding.model.EmbeddingPrefixMode;
|
||||
import at.procon.dip.embedding.model.EmbeddingProviderResult;
|
||||
import java.util.UUID;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
|
@ -37,7 +38,18 @@ public class EmbeddingJobExecutionPersistenceService {
|
|||
Integer tokenCount,
|
||||
UUID jobId,
|
||||
String providerRequestId) {
|
||||
embeddingPersistenceService.saveCompleted(embeddingId, vector, tokenCount);
|
||||
completeJob(embeddingId, vector, tokenCount, jobId, providerRequestId, EmbeddingPrefixMode.OFF, null);
|
||||
}
|
||||
|
||||
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
||||
public void completeJob(UUID embeddingId,
|
||||
float[] vector,
|
||||
Integer tokenCount,
|
||||
UUID jobId,
|
||||
String providerRequestId,
|
||||
EmbeddingPrefixMode prefixMode,
|
||||
String appliedPrefix) {
|
||||
embeddingPersistenceService.saveCompleted(embeddingId, vector, tokenCount, prefixMode, appliedPrefix);
|
||||
jobService.markCompleted(jobId, providerRequestId);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import at.procon.dip.domain.document.entity.DocumentTextRepresentation;
|
|||
import at.procon.dip.domain.document.repository.DocumentEmbeddingRepository;
|
||||
import at.procon.dip.domain.document.repository.DocumentTextRepresentationRepository;
|
||||
import at.procon.dip.domain.document.service.DocumentEmbeddingService;
|
||||
import at.procon.dip.embedding.model.EmbeddingPrefixMode;
|
||||
import at.procon.dip.embedding.model.EmbeddingProviderResult;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.UUID;
|
||||
|
|
@ -43,10 +44,18 @@ public class EmbeddingPersistenceService {
|
|||
if (result.vectors() == null || result.vectors().isEmpty()) {
|
||||
throw new IllegalArgumentException("Embedding provider result contains no vectors");
|
||||
}
|
||||
saveCompleted(embeddingId, result.vectors().getFirst(), result.tokenCount());
|
||||
saveCompleted(embeddingId, result.vectors().getFirst(), result.tokenCount(), result.prefixMode(), result.appliedPrefix());
|
||||
}
|
||||
|
||||
public void saveCompleted(UUID embeddingId, float[] vector, Integer tokenCount) {
|
||||
saveCompleted(embeddingId, vector, tokenCount, EmbeddingPrefixMode.OFF, null);
|
||||
}
|
||||
|
||||
public void saveCompleted(UUID embeddingId,
|
||||
float[] vector,
|
||||
Integer tokenCount,
|
||||
EmbeddingPrefixMode prefixMode,
|
||||
String appliedPrefix) {
|
||||
if (vector == null || vector.length == 0) {
|
||||
throw new IllegalArgumentException("Embedding vector must not be empty");
|
||||
}
|
||||
|
|
@ -54,7 +63,9 @@ public class EmbeddingPersistenceService {
|
|||
embeddingId,
|
||||
vector,
|
||||
tokenCount,
|
||||
vector.length
|
||||
vector.length,
|
||||
(prefixMode == null ? EmbeddingPrefixMode.OFF : prefixMode).name(),
|
||||
appliedPrefix
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -285,7 +285,9 @@ public class RepresentationEmbeddingOrchestrator {
|
|||
result.vectors().get(i),
|
||||
null,
|
||||
prepared.job().getId(),
|
||||
result.providerRequestId()
|
||||
result.providerRequestId(),
|
||||
result.prefixMode(),
|
||||
result.appliedPrefix()
|
||||
);
|
||||
}
|
||||
} catch (RuntimeException ex) {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package at.procon.dip.embedding.startup;
|
||||
|
||||
import at.procon.dip.embedding.config.EmbeddingProperties;
|
||||
import at.procon.dip.embedding.model.EmbeddingPrefixMode;
|
||||
import at.procon.dip.embedding.registry.EmbeddingModelRegistry;
|
||||
import at.procon.dip.embedding.registry.EmbeddingProviderConfigResolver;
|
||||
import at.procon.dip.embedding.registry.EmbeddingProviderRegistry;
|
||||
|
|
@ -34,8 +35,16 @@ public class EmbeddingSubsystemStartupValidator implements ApplicationRunner {
|
|||
modelRegistry.getActiveModels().forEach(model -> {
|
||||
var providerConfig = providerConfigResolver.resolve(model.providerConfigKey());
|
||||
providerRegistry.getRequired(providerConfig.providerType());
|
||||
log.info("Validated embedding model {} -> provider {} ({})",
|
||||
model.modelKey(), model.providerConfigKey(), providerConfig.providerType());
|
||||
if (model.prefixMode() == EmbeddingPrefixMode.CLIENT) {
|
||||
if (model.queryPrefix() == null || model.queryPrefix().isBlank()) {
|
||||
throw new IllegalStateException("Embedding model " + model.modelKey() + " uses CLIENT prefix mode but query-prefix is blank");
|
||||
}
|
||||
if (model.documentPrefix() == null || model.documentPrefix().isBlank()) {
|
||||
throw new IllegalStateException("Embedding model " + model.modelKey() + " uses CLIENT prefix mode but document-prefix is blank");
|
||||
}
|
||||
}
|
||||
log.info("Validated embedding model {} -> provider {} ({}, prefixMode={})",
|
||||
model.modelKey(), model.providerConfigKey(), providerConfig.providerType(), model.prefixMode());
|
||||
});
|
||||
|
||||
if (properties.getDefaultDocumentModel() != null && !properties.getDefaultDocumentModel().isBlank()) {
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ ted:
|
|||
# Polling delay in milliseconds (1 minute)
|
||||
delay: 60000
|
||||
# Max messages per poll
|
||||
max-messages-per-poll: 100
|
||||
max-messages-per-poll: 10
|
||||
# Output directory for processed attachments
|
||||
attachment-output-directory: /ted.europe/mail-attachments
|
||||
# Enable/disable MIME file input processing
|
||||
|
|
|
|||
|
|
@ -39,11 +39,11 @@ dip:
|
|||
embedding:
|
||||
enabled: true
|
||||
jobs:
|
||||
enabled: true
|
||||
parallel-batch-count: 2
|
||||
enabled: false
|
||||
parallel-batch-count: 1
|
||||
process-in-batches: true
|
||||
batch-size: 48
|
||||
execution-batch-size: 48
|
||||
batch-size: 16
|
||||
execution-batch-size: 16
|
||||
|
||||
default-document-model: e5-default
|
||||
default-query-model: e5-default
|
||||
|
|
@ -56,7 +56,7 @@ dip:
|
|||
|
||||
external-e5:
|
||||
type: http-json
|
||||
base-url: http://172.20.20.6:8001
|
||||
base-url: http://172.20.241.55:8001
|
||||
connect-timeout: 5s
|
||||
read-timeout: 60s
|
||||
batch-request:
|
||||
|
|
@ -66,7 +66,7 @@ dip:
|
|||
|
||||
vector-sync-e5:
|
||||
type: http-vector-sync
|
||||
base-url: http://172.20.20.6:8001
|
||||
base-url: http://172.20.241.55:8001
|
||||
connect-timeout: 30s
|
||||
read-timeout: 300s
|
||||
headers:
|
||||
|
|
@ -93,6 +93,9 @@ dip:
|
|||
distance-metric: COSINE
|
||||
supports-query-embedding-mode: true
|
||||
supports-batch: true
|
||||
prefix-mode: CLIENT
|
||||
query-prefix: "query: "
|
||||
document-prefix: "passage: "
|
||||
active: true
|
||||
|
||||
profiles:
|
||||
|
|
@ -290,6 +293,18 @@ dip:
|
|||
# Delete tar.gz after ingestion
|
||||
delete-after-ingestion: true
|
||||
|
||||
|
||||
time:
|
||||
enabled: false
|
||||
leitstand:
|
||||
enabled: false
|
||||
import-batch-id: time-leitstand
|
||||
reconcile-lookback-days: 7
|
||||
toggl-track:
|
||||
enabled: false
|
||||
import-batch-id: time-toggl
|
||||
reconcile-lookback-days: 7
|
||||
|
||||
ted: # Phase 3 TED projection configuration
|
||||
projection:
|
||||
# Enable/disable dual-write into the TED projection model on top of DOC.doc_document
|
||||
|
|
|
|||
|
|
@ -14,9 +14,12 @@ spring:
|
|||
name: document-intelligence-platform
|
||||
|
||||
datasource:
|
||||
url: jdbc:postgresql://localhost:5432/RELM
|
||||
#url: jdbc:postgresql://localhost:5432/RELM
|
||||
#username: ${DB_USERNAME:postgres}
|
||||
#password: ${DB_PASSWORD:P54!pcd#Wi}
|
||||
url: jdbc:postgresql://94.130.218.54:32333/RELM
|
||||
username: ${DB_USERNAME:postgres}
|
||||
password: ${DB_PASSWORD:P54!pcd#Wi}
|
||||
password: ${DB_PASSWORD:PDmXRx0Rbk9OFOn9qO5Gm/mPCfqW8zwbZ+/YIU1lySc=}
|
||||
driver-class-name: org.postgresql.Driver
|
||||
hikari:
|
||||
maximum-pool-size: 5
|
||||
|
|
@ -28,7 +31,7 @@ spring:
|
|||
|
||||
jpa:
|
||||
hibernate:
|
||||
ddl-auto: update
|
||||
ddl-auto: validate
|
||||
show-sql: false
|
||||
open-in-view: false
|
||||
properties:
|
||||
|
|
|
|||
|
|
@ -0,0 +1,8 @@
|
|||
ALTER TABLE doc.doc_embedding
|
||||
ADD COLUMN IF NOT EXISTS prefix_mode VARCHAR(32) NOT NULL DEFAULT 'OFF';
|
||||
|
||||
ALTER TABLE doc.doc_embedding
|
||||
ADD COLUMN IF NOT EXISTS applied_prefix VARCHAR(64);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_doc_embedding_prefix_mode
|
||||
ON doc.doc_embedding(prefix_mode);
|
||||
|
|
@ -2,6 +2,7 @@ package at.procon.dip.architecture;
|
|||
|
||||
import at.procon.dip.domain.ted.config.TedProjectionProperties;
|
||||
import at.procon.dip.ingestion.config.DipIngestionProperties;
|
||||
import at.procon.dip.domain.time.config.TimeDomainProperties;
|
||||
import at.procon.dip.search.config.DipSearchProperties;
|
||||
import at.procon.ted.config.TedProcessorProperties;
|
||||
import java.lang.reflect.Constructor;
|
||||
|
|
@ -22,11 +23,8 @@ class NewRuntimeMustNotDependOnTedProcessorPropertiesTest {
|
|||
List<Class<?>> newRuntimeClasses = List.of(
|
||||
at.procon.dip.ingestion.service.GenericDocumentImportService.class,
|
||||
at.procon.dip.ingestion.camel.GenericFileSystemIngestionRoute.class,
|
||||
at.procon.dip.ingestion.camel.GenericMailIngestionRoute.class,
|
||||
at.procon.dip.ingestion.controller.GenericDocumentImportController.class,
|
||||
at.procon.dip.ingestion.adapter.MailDocumentIngestionAdapter.class,
|
||||
at.procon.dip.ingestion.service.MailMetadataPersistenceService.class,
|
||||
at.procon.dip.ingestion.mail.MailImportIdentityResolver.class,
|
||||
at.procon.dip.ingestion.adapter.TedPackageDocumentIngestionAdapter.class,
|
||||
at.procon.dip.ingestion.service.TedPackageChildImportProcessor.class,
|
||||
at.procon.dip.domain.ted.service.TedNoticeProjectionService.class,
|
||||
|
|
@ -52,6 +50,7 @@ class NewRuntimeMustNotDependOnTedProcessorPropertiesTest {
|
|||
assertThat(DipSearchProperties.class).isNotNull();
|
||||
assertThat(DipIngestionProperties.class).isNotNull();
|
||||
assertThat(TedProjectionProperties.class).isNotNull();
|
||||
assertThat(TimeDomainProperties.class).isNotNull();
|
||||
}
|
||||
|
||||
private boolean hasDependency(Class<?> owner, Class<?> dependency) {
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
|||
|
||||
import at.procon.dip.domain.document.DistanceMetric;
|
||||
import at.procon.dip.embedding.model.EmbeddingModelDescriptor;
|
||||
import at.procon.dip.embedding.model.EmbeddingPrefixMode;
|
||||
import at.procon.dip.embedding.model.EmbeddingRequest;
|
||||
import at.procon.dip.embedding.model.EmbeddingUseCase;
|
||||
import at.procon.dip.embedding.model.ResolvedEmbeddingProviderConfig;
|
||||
|
|
@ -27,6 +28,7 @@ class VectorSyncHttpEmbeddingProviderTest {
|
|||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private HttpServer server;
|
||||
private final AtomicReference<String> lastBatchBody = new AtomicReference<>();
|
||||
private final AtomicReference<String> lastSingleBody = new AtomicReference<>();
|
||||
|
||||
@AfterEach
|
||||
void tearDown() {
|
||||
|
|
@ -75,6 +77,53 @@ class VectorSyncHttpEmbeddingProviderTest {
|
|||
assertThat(result.vectors()).hasSize(1);
|
||||
assertThat(result.vectors().getFirst()).containsExactly(0.1f, 0.2f, 0.3f);
|
||||
assertThat(result.tokenCount()).isEqualTo(9);
|
||||
assertThat(result.prefixMode()).isEqualTo(EmbeddingPrefixMode.OFF);
|
||||
assertThat(result.appliedPrefix()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldPrefixTextsInClientModeForDocuments() throws Exception {
|
||||
server = HttpServer.create(new InetSocketAddress(0), 0);
|
||||
server.createContext("/vector-sync", this::handleVectorSync);
|
||||
server.start();
|
||||
|
||||
var provider = new VectorSyncHttpEmbeddingProvider(objectMapper);
|
||||
var config = ResolvedEmbeddingProviderConfig.builder()
|
||||
.key("vector-sync-local")
|
||||
.providerType("http-vector-sync")
|
||||
.baseUrl("http://localhost:" + server.getAddress().getPort())
|
||||
.readTimeout(Duration.ofSeconds(5))
|
||||
.headers(Map.of("X-Client", "dip-test"))
|
||||
.batchTruncateText(false)
|
||||
.batchTruncateLength(512)
|
||||
.batchChunkSize(20)
|
||||
.build();
|
||||
var model = new EmbeddingModelDescriptor(
|
||||
"e5-default",
|
||||
"vector-sync-local",
|
||||
"intfloat/multilingual-e5-large",
|
||||
3,
|
||||
DistanceMetric.COSINE,
|
||||
true,
|
||||
false,
|
||||
8192,
|
||||
true,
|
||||
EmbeddingPrefixMode.CLIENT,
|
||||
"query: ",
|
||||
"passage: "
|
||||
);
|
||||
var request = EmbeddingRequest.builder()
|
||||
.modelKey("e5-default")
|
||||
.useCase(EmbeddingUseCase.DOCUMENT)
|
||||
.texts(List.of("This is a sample text to vectorize"))
|
||||
.providerOptions(Map.of())
|
||||
.build();
|
||||
|
||||
var result = provider.embedDocuments(config, model, request);
|
||||
|
||||
assertThat(result.prefixMode()).isEqualTo(EmbeddingPrefixMode.CLIENT);
|
||||
assertThat(result.appliedPrefix()).isEqualTo("passage: ");
|
||||
assertThat(lastSingleBody.get()).contains("passage: This is a sample text to vectorize");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -118,6 +167,7 @@ class VectorSyncHttpEmbeddingProviderTest {
|
|||
assertThat(result.vectors().get(0)).containsExactly(0.1f, 0.2f, 0.3f);
|
||||
assertThat(result.vectors().get(1)).containsExactly(0.4f, 0.5f, 0.6f);
|
||||
assertThat(result.tokenCount()).isEqualTo(12);
|
||||
assertThat(result.prefixMode()).isEqualTo(EmbeddingPrefixMode.OFF);
|
||||
|
||||
JsonNode requestBody = objectMapper.readTree(lastBatchBody.get());
|
||||
assertThat(requestBody.get("truncate_text").asBoolean()).isTrue();
|
||||
|
|
@ -177,9 +227,10 @@ class VectorSyncHttpEmbeddingProviderTest {
|
|||
body = new String(in.readAllBytes(), StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
lastSingleBody.set(body);
|
||||
assertThat(exchange.getRequestMethod()).isEqualTo("POST");
|
||||
assertThat(body).contains("\"model\":\"intfloat/multilingual-e5-large\"");
|
||||
assertThat(body).contains("\"text\":\"This is a sample text to vectorize\"");
|
||||
assertThat(body).contains("This is a sample text to vectorize");
|
||||
assertThat(exchange.getRequestHeaders().getFirst("X-Client")).isEqualTo("dip-test");
|
||||
|
||||
respondJson(exchange, """
|
||||
|
|
|
|||
|
|
@ -175,8 +175,8 @@ class RepresentationEmbeddingOrchestratorTest {
|
|||
ArgumentCaptor<List<String>> textsCaptor = ArgumentCaptor.forClass(List.class);
|
||||
verify(executionService, times(1)).embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), textsCaptor.capture());
|
||||
assertThat(textsCaptor.getValue()).containsExactly("alpha", "beta");
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1"));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1"));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1"), eq(EmbeddingPrefixMode.OFF), eq(null));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1"), eq(EmbeddingPrefixMode.OFF), eq(null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -220,7 +220,7 @@ class RepresentationEmbeddingOrchestratorTest {
|
|||
orchestrator.processNextReadyBatch();
|
||||
|
||||
verify(executionService, times(1)).embedTexts(eq("mock-search"), eq(EmbeddingUseCase.DOCUMENT), eq(List.of("gamma")));
|
||||
verify(executionPersistenceService, never()).completeJob(eq(embeddingId), any(float[].class), eq(null), eq(job.getId()), anyString());
|
||||
verify(executionPersistenceService, never()).completeJob(eq(embeddingId), any(float[].class), eq(null), eq(job.getId()), anyString(), any(), any());
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId), any(EmbeddingProviderResult.class), eq(job.getId()), eq("req-2"));
|
||||
}
|
||||
|
||||
|
|
@ -305,11 +305,11 @@ class RepresentationEmbeddingOrchestratorTest {
|
|||
assertThat(processed).isEqualTo(6);
|
||||
verify(jobService, times(4)).claimNextReadyJobs(2);
|
||||
verify(executionService, times(3)).embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), any());
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1"));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1"));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId3), aryEq(new float[]{0.7f, 0.8f, 0.9f}), eq(null), eq(job3.getId()), eq("batch-req-2"));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId4), aryEq(new float[]{1.0f, 1.1f, 1.2f}), eq(null), eq(job4.getId()), eq("batch-req-2"));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId5), aryEq(new float[]{1.3f, 1.4f, 1.5f}), eq(null), eq(job5.getId()), eq("batch-req-3"));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId6), aryEq(new float[]{1.6f, 1.7f, 1.8f}), eq(null), eq(job6.getId()), eq("batch-req-3"));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1"), eq(EmbeddingPrefixMode.OFF), eq(null));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1"), eq(EmbeddingPrefixMode.OFF), eq(null));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId3), aryEq(new float[]{0.7f, 0.8f, 0.9f}), eq(null), eq(job3.getId()), eq("batch-req-2"), eq(EmbeddingPrefixMode.OFF), eq(null));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId4), aryEq(new float[]{1.0f, 1.1f, 1.2f}), eq(null), eq(job4.getId()), eq("batch-req-2"), eq(EmbeddingPrefixMode.OFF), eq(null));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId5), aryEq(new float[]{1.3f, 1.4f, 1.5f}), eq(null), eq(job5.getId()), eq("batch-req-3"), eq(EmbeddingPrefixMode.OFF), eq(null));
|
||||
verify(executionPersistenceService).completeJob(eq(embeddingId6), aryEq(new float[]{1.6f, 1.7f, 1.8f}), eq(null), eq(job6.getId()), eq("batch-req-3"), eq(EmbeddingPrefixMode.OFF), eq(null));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue