From 44995cebf7bf72428f01a2065c54bfa6d8256a20 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 23 Mar 2026 17:03:54 +0100 Subject: [PATCH] runtime-split-patch-b --- docs/architecture/RUNTIME_SPLIT_PATCH_B.md | 59 +++++++++++++++++++ .../EmbeddingJobSchedulingConfiguration.java | 12 ++++ .../embedding/config/EmbeddingProperties.java | 1 + .../embedding/job/EmbeddingJobScheduler.java | 32 ++++++++++ .../service/GenericDocumentImportService.java | 47 +++++++++++---- src/main/resources/application-new.yml | 6 ++ 6 files changed, 144 insertions(+), 13 deletions(-) create mode 100644 docs/architecture/RUNTIME_SPLIT_PATCH_B.md create mode 100644 src/main/java/at/procon/dip/embedding/config/EmbeddingJobSchedulingConfiguration.java create mode 100644 src/main/java/at/procon/dip/embedding/job/EmbeddingJobScheduler.java diff --git a/docs/architecture/RUNTIME_SPLIT_PATCH_B.md b/docs/architecture/RUNTIME_SPLIT_PATCH_B.md new file mode 100644 index 0000000..1446501 --- /dev/null +++ b/docs/architecture/RUNTIME_SPLIT_PATCH_B.md @@ -0,0 +1,59 @@ +# Runtime split Patch B + +Patch B builds on Patch A and makes the NEW runtime actually process embedding jobs. + +## What changes + +### 1. New embedding job scheduler +Adds: + +- `EmbeddingJobScheduler` +- `EmbeddingJobSchedulingConfiguration` + +Behavior: +- enabled only in `NEW` runtime mode +- active only when `dip.embedding.jobs.enabled=true` +- periodically calls: + - `RepresentationEmbeddingOrchestrator.processNextReadyBatch()` + +### 2. 
Generic import hands off to the new embedding job path +`GenericDocumentImportService` is updated so that in `NEW` mode it: + +- resolves `dip.embedding.default-document-model` +- ensures the model is registered in `DOC.doc_embedding_model` +- creates embedding jobs through: + - `RepresentationEmbeddingOrchestrator.enqueueRepresentation(...)` + +It no longer creates legacy-style pending embeddings as the primary handoff for the NEW runtime path. + +## Notes + +- This patch assumes Patch A has already introduced: + - `RuntimeMode` + - `RuntimeModeProperties` + - `@ConditionalOnRuntimeMode` +- This patch does not yet remove the legacy vectorization runtime. + That remains the job of subsequent cutover steps. + +## Expected runtime behavior in NEW mode + +- `GenericDocumentImportService` persists new generic representations +- selected representations are queued into `DOC.doc_embedding_job` +- scheduler processes pending jobs +- vectors are persisted through the new embedding subsystem + +## New config + +Example: + +```yaml +dip: + runtime: + mode: NEW + + embedding: + enabled: true + jobs: + enabled: true + scheduler-delay-ms: 5000 +``` diff --git a/src/main/java/at/procon/dip/embedding/config/EmbeddingJobSchedulingConfiguration.java b/src/main/java/at/procon/dip/embedding/config/EmbeddingJobSchedulingConfiguration.java new file mode 100644 index 0000000..cead9a8 --- /dev/null +++ b/src/main/java/at/procon/dip/embedding/config/EmbeddingJobSchedulingConfiguration.java @@ -0,0 +1,12 @@ +package at.procon.dip.embedding.config; + +import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode; +import at.procon.dip.runtime.config.RuntimeMode; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +/** Marker configuration: carries {@code @EnableScheduling} and is conditional on {@code RuntimeMode.NEW}; it declares no beans of its own. */ +@Configuration +@EnableScheduling +@ConditionalOnRuntimeMode(RuntimeMode.NEW) +public class EmbeddingJobSchedulingConfiguration { +} diff --git
a/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java b/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java index bc47cc0..ace2954 100644 --- a/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java +++ b/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java @@ -62,5 +62,6 @@ public class EmbeddingProperties { private int maxRetries = 5; private Duration initialRetryDelay = Duration.ofSeconds(30); private Duration maxRetryDelay = Duration.ofHours(6); + private long schedulerDelayMs = 5000; } } diff --git a/src/main/java/at/procon/dip/embedding/job/EmbeddingJobScheduler.java b/src/main/java/at/procon/dip/embedding/job/EmbeddingJobScheduler.java new file mode 100644 index 0000000..907efc1 --- /dev/null +++ b/src/main/java/at/procon/dip/embedding/job/EmbeddingJobScheduler.java @@ -0,0 +1,32 @@ +package at.procon.dip.embedding.job; + +import at.procon.dip.embedding.service.RepresentationEmbeddingOrchestrator; +import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode; +import at.procon.dip.runtime.config.RuntimeMode; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +/** Scheduled trigger that asks the {@link RepresentationEmbeddingOrchestrator} to process the next ready batch of embedding jobs; conditional on {@code RuntimeMode.NEW} and on {@code dip.embedding.jobs.enabled=true}. */ +@Component +@RequiredArgsConstructor +@Slf4j +@ConditionalOnRuntimeMode(RuntimeMode.NEW) +@ConditionalOnProperty(prefix = "dip.embedding.jobs", name = "enabled", havingValue = "true") +public class EmbeddingJobScheduler { + + private final RepresentationEmbeddingOrchestrator orchestrator; + /** Processes one batch per run. The fixed delay is read from the {@code dip.embedding.jobs.scheduler-delay-ms} placeholder (default 5000). Any exception is logged at WARN and not rethrown. */ + @Scheduled(fixedDelayString = "${dip.embedding.jobs.scheduler-delay-ms:5000}") + public void processNextBatch() { + try { + int processed = orchestrator.processNextReadyBatch(); + if (processed > 0) { + log.debug("NEW runtime embedding job scheduler processed {} job(s)", processed); + } + } catch (Exception ex) { + log.warn("NEW
runtime embedding job scheduler failed: {}", ex.getMessage(), ex); + } + } +} diff --git a/src/main/java/at/procon/dip/ingestion/service/GenericDocumentImportService.java b/src/main/java/at/procon/dip/ingestion/service/GenericDocumentImportService.java index f0cadae..ad4cb38 100644 --- a/src/main/java/at/procon/dip/ingestion/service/GenericDocumentImportService.java +++ b/src/main/java/at/procon/dip/ingestion/service/GenericDocumentImportService.java @@ -38,6 +38,11 @@ import at.procon.dip.normalization.spi.RepresentationBuildRequest; import at.procon.dip.normalization.spi.TextRepresentationDraft; import at.procon.dip.processing.spi.DocumentProcessingPolicy; import at.procon.dip.processing.spi.StructuredProcessingRequest; +import at.procon.dip.embedding.config.EmbeddingProperties; +import at.procon.dip.embedding.service.EmbeddingModelCatalogService; +import at.procon.dip.embedding.service.RepresentationEmbeddingOrchestrator; +import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode; +import at.procon.dip.runtime.config.RuntimeMode; import at.procon.ted.config.TedProcessorProperties; import at.procon.ted.util.HashUtils; import java.nio.charset.StandardCharsets; @@ -60,6 +65,7 @@ import org.springframework.util.StringUtils; @Service @RequiredArgsConstructor @Slf4j +@ConditionalOnRuntimeMode(RuntimeMode.NEW) public class GenericDocumentImportService { private final TedProcessorProperties properties; @@ -71,6 +77,9 @@ public class GenericDocumentImportService { private final DocumentContentService documentContentService; private final DocumentRepresentationService documentRepresentationService; private final DocumentEmbeddingService documentEmbeddingService; + private final EmbeddingProperties embeddingProperties; + private final EmbeddingModelCatalogService embeddingModelCatalogService; + private final RepresentationEmbeddingOrchestrator representationEmbeddingOrchestrator; private final DocumentClassificationService classificationService; private final
DocumentExtractionService extractionService; private final TextRepresentationBuildService representationBuildService; @@ -395,19 +404,13 @@ public class GenericDocumentImportService { return; } - DocumentEmbeddingModel model = null; - if (properties.getVectorization().isEnabled() && properties.getVectorization().isGenericPipelineEnabled()) { - model = documentEmbeddingService.registerModel(new RegisterEmbeddingModelCommand( - properties.getVectorization().getModelName(), - properties.getVectorization().getEmbeddingProvider(), - properties.getVectorization().getModelName(), - properties.getVectorization().getDimensions(), - null, - false, - true - )); + String embeddingModelKey = resolveNewRuntimeEmbeddingModelKey(); + if (embeddingModelKey != null) { + embeddingModelCatalogService.ensureRegistered(embeddingModelKey); } + java.util.List<UUID> queuedRepresentationIds = new java.util.ArrayList<>(); + for (TextRepresentationDraft draft : drafts) { if (!StringUtils.hasText(draft.textBody())) { continue; @@ -431,13 +434,31 @@ public class GenericDocumentImportService { draft.textBody() )); - if (model != null && shouldQueueEmbedding(draft)) { - documentEmbeddingService.ensurePendingEmbedding(document.getId(), representation.getId(), model.getId()); + if (embeddingModelKey != null && shouldQueueEmbedding(draft)) { + queuedRepresentationIds.add(representation.getId()); + } + } + + if (embeddingModelKey != null) { + for (UUID representationId : queuedRepresentationIds) { + representationEmbeddingOrchestrator.enqueueRepresentation(document.getId(), representationId, embeddingModelKey); } } + documentService.updateStatus(document.getId(), DocumentStatus.REPRESENTED); } + /** Returns the configured default document model key, or {@code null} when embedding or embedding jobs are disabled, or when no model key is configured (that last case logs a warning). */ private String resolveNewRuntimeEmbeddingModelKey() { + if (!embeddingProperties.isEnabled() || !embeddingProperties.getJobs().isEnabled()) { + return null; + } + if (!StringUtils.hasText(embeddingProperties.getDefaultDocumentModel())) { + log.warn("NEW runtime embedding is enabled, but
dip.embedding.default-document-model is not configured; skipping embedding job creation"); + return null; + } + return embeddingProperties.getDefaultDocumentModel(); + } + private DocumentContent resolveLinkedContent(TextRepresentationDraft draft, DocumentContent originalContent, Map derivedContent) { diff --git a/src/main/resources/application-new.yml b/src/main/resources/application-new.yml index efd8661..533aafd 100644 --- a/src/main/resources/application-new.yml +++ b/src/main/resources/application-new.yml @@ -1,3 +1,9 @@ dip: runtime: mode: NEW + + embedding: + enabled: true + jobs: + enabled: true + scheduler-delay-ms: 5000