runtime-split-patch-b
This commit is contained in:
parent
74609e481d
commit
44995cebf7
|
|
@ -0,0 +1,59 @@
|
|||
# Runtime split Patch B
|
||||
|
||||
Patch B builds on Patch A and makes the NEW runtime actually process embedding jobs.
|
||||
|
||||
## What changes
|
||||
|
||||
### 1. New embedding job scheduler
|
||||
Adds:
|
||||
|
||||
- `EmbeddingJobScheduler`
|
||||
- `EmbeddingJobSchedulingConfiguration`
|
||||
|
||||
Behavior:
|
||||
- enabled only in `NEW` runtime mode
|
||||
- active only when `dip.embedding.jobs.enabled=true`
|
||||
- periodically calls:
|
||||
- `RepresentationEmbeddingOrchestrator.processNextReadyBatch()`
|
||||
|
||||
### 2. Generic import hands off to the new embedding job path
|
||||
`GenericDocumentImportService` is updated so that in `NEW` mode it:
|
||||
|
||||
- resolves `dip.embedding.default-document-model`
|
||||
- ensures the model is registered in `DOC.doc_embedding_model`
|
||||
- creates embedding jobs through:
|
||||
- `RepresentationEmbeddingOrchestrator.enqueueRepresentation(...)`
|
||||
|
||||
It no longer creates legacy-style pending embeddings as the primary handoff for the NEW runtime path.
|
||||
|
||||
## Notes
|
||||
|
||||
- This patch assumes Patch A has already introduced:
|
||||
- `RuntimeMode`
|
||||
- `RuntimeModeProperties`
|
||||
- `@ConditionalOnRuntimeMode`
|
||||
- This patch does not yet remove the legacy vectorization runtime.
|
||||
That remains the job of subsequent cutover steps.
|
||||
|
||||
## Expected runtime behavior in NEW mode
|
||||
|
||||
- `GenericDocumentImportService` persists new generic representations
|
||||
- selected representations are queued into `DOC.doc_embedding_job`
|
||||
- scheduler processes pending jobs
|
||||
- vectors are persisted through the new embedding subsystem
|
||||
|
||||
## New config
|
||||
|
||||
Example:
|
||||
|
||||
```yaml
|
||||
dip:
|
||||
runtime:
|
||||
mode: NEW
|
||||
|
||||
embedding:
|
||||
enabled: true
|
||||
jobs:
|
||||
enabled: true
|
||||
scheduler-delay-ms: 5000
|
||||
```
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
package at.procon.dip.embedding.config;
|
||||
|
||||
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
||||
import at.procon.dip.runtime.config.RuntimeMode;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@Configuration
|
||||
@EnableScheduling
|
||||
@ConditionalOnRuntimeMode(RuntimeMode.NEW)
|
||||
public class EmbeddingJobSchedulingConfiguration {
|
||||
}
|
||||
|
|
@ -62,5 +62,6 @@ public class EmbeddingProperties {
|
|||
private int maxRetries = 5;
|
||||
private Duration initialRetryDelay = Duration.ofSeconds(30);
|
||||
private Duration maxRetryDelay = Duration.ofHours(6);
|
||||
private long schedulerDelayMs = 5000;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,32 @@
|
|||
package at.procon.dip.embedding.job;
|
||||
|
||||
import at.procon.dip.embedding.service.RepresentationEmbeddingOrchestrator;
|
||||
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
||||
import at.procon.dip.runtime.config.RuntimeMode;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
@ConditionalOnRuntimeMode(RuntimeMode.NEW)
|
||||
@ConditionalOnProperty(prefix = "dip.embedding.jobs", name = "enabled", havingValue = "true")
|
||||
public class EmbeddingJobScheduler {
|
||||
|
||||
private final RepresentationEmbeddingOrchestrator orchestrator;
|
||||
|
||||
@Scheduled(fixedDelayString = "${dip.embedding.jobs.scheduler-delay-ms:5000}")
|
||||
public void processNextBatch() {
|
||||
try {
|
||||
int processed = orchestrator.processNextReadyBatch();
|
||||
if (processed > 0) {
|
||||
log.debug("NEW runtime embedding job scheduler processed {} job(s)", processed);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
log.warn("NEW runtime embedding job scheduler failed: {}", ex.getMessage(), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -38,6 +38,11 @@ import at.procon.dip.normalization.spi.RepresentationBuildRequest;
|
|||
import at.procon.dip.normalization.spi.TextRepresentationDraft;
|
||||
import at.procon.dip.processing.spi.DocumentProcessingPolicy;
|
||||
import at.procon.dip.processing.spi.StructuredProcessingRequest;
|
||||
import at.procon.dip.embedding.config.EmbeddingProperties;
|
||||
import at.procon.dip.embedding.service.EmbeddingModelCatalogService;
|
||||
import at.procon.dip.embedding.service.RepresentationEmbeddingOrchestrator;
|
||||
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
||||
import at.procon.dip.runtime.config.RuntimeMode;
|
||||
import at.procon.ted.config.TedProcessorProperties;
|
||||
import at.procon.ted.util.HashUtils;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
|
@ -60,6 +65,7 @@ import org.springframework.util.StringUtils;
|
|||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
@ConditionalOnRuntimeMode(RuntimeMode.NEW)
|
||||
public class GenericDocumentImportService {
|
||||
|
||||
private final TedProcessorProperties properties;
|
||||
|
|
@ -71,6 +77,9 @@ public class GenericDocumentImportService {
|
|||
private final DocumentContentService documentContentService;
|
||||
private final DocumentRepresentationService documentRepresentationService;
|
||||
private final DocumentEmbeddingService documentEmbeddingService;
|
||||
private final EmbeddingProperties embeddingProperties;
|
||||
private final EmbeddingModelCatalogService embeddingModelCatalogService;
|
||||
private final RepresentationEmbeddingOrchestrator representationEmbeddingOrchestrator;
|
||||
private final DocumentClassificationService classificationService;
|
||||
private final DocumentExtractionService extractionService;
|
||||
private final TextRepresentationBuildService representationBuildService;
|
||||
|
|
@ -395,19 +404,13 @@ public class GenericDocumentImportService {
|
|||
return;
|
||||
}
|
||||
|
||||
DocumentEmbeddingModel model = null;
|
||||
if (properties.getVectorization().isEnabled() && properties.getVectorization().isGenericPipelineEnabled()) {
|
||||
model = documentEmbeddingService.registerModel(new RegisterEmbeddingModelCommand(
|
||||
properties.getVectorization().getModelName(),
|
||||
properties.getVectorization().getEmbeddingProvider(),
|
||||
properties.getVectorization().getModelName(),
|
||||
properties.getVectorization().getDimensions(),
|
||||
null,
|
||||
false,
|
||||
true
|
||||
));
|
||||
String embeddingModelKey = resolveNewRuntimeEmbeddingModelKey();
|
||||
if (embeddingModelKey != null) {
|
||||
embeddingModelCatalogService.ensureRegistered(embeddingModelKey);
|
||||
}
|
||||
|
||||
java.util.List<UUID> queuedRepresentationIds = new java.util.ArrayList<>();
|
||||
|
||||
for (TextRepresentationDraft draft : drafts) {
|
||||
if (!StringUtils.hasText(draft.textBody())) {
|
||||
continue;
|
||||
|
|
@ -431,13 +434,31 @@ public class GenericDocumentImportService {
|
|||
draft.textBody()
|
||||
));
|
||||
|
||||
if (model != null && shouldQueueEmbedding(draft)) {
|
||||
documentEmbeddingService.ensurePendingEmbedding(document.getId(), representation.getId(), model.getId());
|
||||
if (embeddingModelKey != null && shouldQueueEmbedding(draft)) {
|
||||
queuedRepresentationIds.add(representation.getId());
|
||||
}
|
||||
}
|
||||
|
||||
if (embeddingModelKey != null) {
|
||||
for (UUID representationId : queuedRepresentationIds) {
|
||||
representationEmbeddingOrchestrator.enqueueRepresentation(document.getId(), representationId, embeddingModelKey);
|
||||
}
|
||||
}
|
||||
|
||||
documentService.updateStatus(document.getId(), DocumentStatus.REPRESENTED);
|
||||
}
|
||||
|
||||
private String resolveNewRuntimeEmbeddingModelKey() {
|
||||
if (!embeddingProperties.isEnabled() || !embeddingProperties.getJobs().isEnabled()) {
|
||||
return null;
|
||||
}
|
||||
if (!StringUtils.hasText(embeddingProperties.getDefaultDocumentModel())) {
|
||||
log.warn("NEW runtime embedding is enabled, but dip.embedding.default-document-model is not configured; skipping embedding job creation");
|
||||
return null;
|
||||
}
|
||||
return embeddingProperties.getDefaultDocumentModel();
|
||||
}
|
||||
|
||||
private DocumentContent resolveLinkedContent(TextRepresentationDraft draft,
|
||||
DocumentContent originalContent,
|
||||
Map<ContentRole, DocumentContent> derivedContent) {
|
||||
|
|
|
|||
|
|
@ -1,3 +1,9 @@
|
|||
dip:
|
||||
runtime:
|
||||
mode: NEW
|
||||
|
||||
embedding:
|
||||
enabled: true
|
||||
jobs:
|
||||
enabled: true
|
||||
scheduler-delay-ms: 5000
|
||||
|
|
|
|||
Loading…
Reference in New Issue