ted legacy data migration
parent
152d9739af
commit
61d163f8fe
@ -0,0 +1,35 @@
|
||||
package at.procon.dip.migration.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
 * Configuration properties for the legacy TED -> DOC/projection backfill subsystem,
 * bound from the {@code dip.migration.legacy-ted} prefix.
 */
@Configuration
@ConfigurationProperties(prefix = "dip.migration.legacy-ted")
@Data
public class LegacyTedBackfillProperties {

    /** Enable the TED legacy -> DOC/projection backfill subsystem. */
    private boolean enabled = false;

    /** Run the backfill automatically on application startup in NEW runtime. */
    private boolean startupEnabled = false;

    /** Number of legacy TED documents to process per fetch batch. */
    private int batchSize = 100;

    /**
     * Optional cap for a single run. 0 or negative means unlimited until the full legacy set is backfilled.
     * Useful to migrate incrementally in controlled slices.
     */
    private long maxDocumentsPerRun = 0;

    /** Resume the latest STOPPED/FAILED run from its persisted cursor. */
    private boolean resumeLatestIncompleteRun = true;

    /** Import batch id written to DOC.doc_source rows created by the migration. */
    private String importBatchId = "legacy-ted-backfill";

    /** Queue embeddings for migrated TED representations after the DOC/projection backfill. */
    private boolean queueEmbeddings = false;
}
|
||||
@ -0,0 +1,75 @@
|
||||
package at.procon.dip.migration.entity;
|
||||
|
||||
import at.procon.dip.architecture.SchemaNames;
|
||||
import jakarta.persistence.Column;
|
||||
import jakarta.persistence.Entity;
|
||||
import jakarta.persistence.FetchType;
|
||||
import jakarta.persistence.GeneratedValue;
|
||||
import jakarta.persistence.GenerationType;
|
||||
import jakarta.persistence.Id;
|
||||
import jakarta.persistence.Index;
|
||||
import jakarta.persistence.JoinColumn;
|
||||
import jakarta.persistence.ManyToOne;
|
||||
import jakarta.persistence.PrePersist;
|
||||
import jakarta.persistence.Table;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.UUID;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
|
||||
@Entity
|
||||
@Table(schema = SchemaNames.DOC, name = "doc_legacy_ted_migration_checkpoint", indexes = {
|
||||
@Index(name = "idx_doc_legacy_ted_mig_ckpt_run", columnList = "run_id"),
|
||||
@Index(name = "idx_doc_legacy_ted_mig_ckpt_batch", columnList = "batch_number")
|
||||
})
|
||||
@Getter
|
||||
@Setter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class LegacyTedMigrationCheckpoint {
|
||||
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.UUID)
|
||||
private UUID id;
|
||||
|
||||
@ManyToOne(fetch = FetchType.LAZY, optional = false)
|
||||
@JoinColumn(name = "run_id", nullable = false)
|
||||
private LegacyTedMigrationRun run;
|
||||
|
||||
@Column(name = "batch_number", nullable = false)
|
||||
private int batchNumber;
|
||||
|
||||
@Column(name = "batch_processed_count", nullable = false)
|
||||
private int batchProcessedCount;
|
||||
|
||||
@Column(name = "cumulative_processed_count", nullable = false)
|
||||
private long cumulativeProcessedCount;
|
||||
|
||||
@Column(name = "last_legacy_created_at")
|
||||
private OffsetDateTime lastLegacyCreatedAt;
|
||||
|
||||
@Column(name = "last_legacy_document_id")
|
||||
private UUID lastLegacyDocumentId;
|
||||
|
||||
@Column(name = "last_doc_document_id")
|
||||
private UUID lastDocDocumentId;
|
||||
|
||||
@Column(name = "last_projection_id")
|
||||
private UUID lastProjectionId;
|
||||
|
||||
@Column(name = "note", columnDefinition = "TEXT")
|
||||
private String note;
|
||||
|
||||
@Builder.Default
|
||||
@Column(name = "created_at", nullable = false, updatable = false)
|
||||
private OffsetDateTime createdAt = OffsetDateTime.now();
|
||||
|
||||
@PrePersist
|
||||
protected void onCreate() {
|
||||
createdAt = OffsetDateTime.now();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,103 @@
|
||||
package at.procon.dip.migration.entity;
|
||||
|
||||
import at.procon.dip.architecture.SchemaNames;
|
||||
import jakarta.persistence.Column;
|
||||
import jakarta.persistence.Entity;
|
||||
import jakarta.persistence.EnumType;
|
||||
import jakarta.persistence.Enumerated;
|
||||
import jakarta.persistence.GeneratedValue;
|
||||
import jakarta.persistence.GenerationType;
|
||||
import jakarta.persistence.Id;
|
||||
import jakarta.persistence.Index;
|
||||
import jakarta.persistence.PrePersist;
|
||||
import jakarta.persistence.PreUpdate;
|
||||
import jakarta.persistence.Table;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.UUID;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
 * Persistent state of one legacy TED -> DOC/projection backfill run: its status,
 * effective configuration snapshot (batch size, caps, embedding flag), progress
 * counters, and the resume cursor (last legacy/DOC/projection ids processed).
 */
@Entity
@Table(schema = SchemaNames.DOC, name = "doc_legacy_ted_migration_run", indexes = {
        @Index(name = "idx_doc_legacy_ted_mig_run_status", columnList = "status"),
        @Index(name = "idx_doc_legacy_ted_mig_run_started", columnList = "started_at DESC")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class LegacyTedMigrationRun {

    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private UUID id;

    // RUNNING / STOPPED / COMPLETED / FAILED — stored as string for readability.
    @Enumerated(EnumType.STRING)
    @Column(name = "status", nullable = false, length = 32)
    private LegacyTedMigrationRunStatus status;

    // Configuration snapshot taken when the run was created (see service resolveRun()).
    @Column(name = "import_batch_id", length = 255)
    private String importBatchId;

    @Column(name = "queue_embeddings", nullable = false)
    private boolean queueEmbeddings;

    @Column(name = "batch_size", nullable = false)
    private int batchSize;

    // Null means "no per-run cap" (the service stores null when the configured cap is <= 0).
    @Column(name = "max_documents_per_run")
    private Long maxDocumentsPerRun;

    // Progress counters, advanced per successfully migrated document.
    @Column(name = "processed_count", nullable = false)
    @Builder.Default
    private long processedCount = 0;

    @Column(name = "success_count", nullable = false)
    @Builder.Default
    private long successCount = 0;

    @Column(name = "failed_count", nullable = false)
    @Builder.Default
    private long failedCount = 0;

    // Resume cursor: the migration batch query continues strictly after this position.
    @Column(name = "last_legacy_created_at")
    private OffsetDateTime lastLegacyCreatedAt;

    @Column(name = "last_legacy_document_id")
    private UUID lastLegacyDocumentId;

    @Column(name = "last_doc_document_id")
    private UUID lastDocDocumentId;

    @Column(name = "last_projection_id")
    private UUID lastProjectionId;

    // Last failure/stop message; cleared on successful progress and completion.
    @Column(name = "last_error", columnDefinition = "TEXT")
    private String lastError;

    @Builder.Default
    @Column(name = "started_at", nullable = false, updatable = false)
    private OffsetDateTime startedAt = OffsetDateTime.now();

    @Builder.Default
    @Column(name = "updated_at", nullable = false)
    private OffsetDateTime updatedAt = OffsetDateTime.now();

    @Column(name = "completed_at")
    private OffsetDateTime completedAt;

    @PrePersist
    protected void onCreate() {
        // Preserve an explicitly supplied start timestamp; always stamp updatedAt.
        startedAt = startedAt == null ? OffsetDateTime.now() : startedAt;
        updatedAt = OffsetDateTime.now();
    }

    @PreUpdate
    protected void onUpdate() {
        updatedAt = OffsetDateTime.now();
    }
}
|
||||
@ -0,0 +1,8 @@
|
||||
package at.procon.dip.migration.entity;
|
||||
|
||||
/** Lifecycle states of a legacy TED backfill run. */
public enum LegacyTedMigrationRunStatus {
    /** Run is (or was last known to be) actively processing batches. */
    RUNNING,
    /** Run halted before exhausting the legacy set (e.g. per-run cap reached); resumable. */
    STOPPED,
    /** All legacy TED documents after the cursor were backfilled. */
    COMPLETED,
    /** A document failed to migrate; the run is resumable from its cursor. */
    FAILED
}
|
||||
@ -0,0 +1,10 @@
|
||||
package at.procon.dip.migration.repository;
|
||||
|
||||
import at.procon.dip.migration.entity.LegacyTedMigrationCheckpoint;
|
||||
import java.util.UUID;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
/** Spring Data repository for {@link LegacyTedMigrationCheckpoint} rows. */
public interface LegacyTedMigrationCheckpointRepository extends JpaRepository<LegacyTedMigrationCheckpoint, UUID> {

    /** Number of checkpoints already written for the given run (used to continue batch numbering on resume). */
    long countByRun_Id(UUID runId);
}
|
||||
@ -0,0 +1,23 @@
|
||||
package at.procon.dip.migration.repository;
|
||||
|
||||
import at.procon.dip.migration.entity.LegacyTedMigrationRun;
|
||||
import at.procon.dip.migration.entity.LegacyTedMigrationRunStatus;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
/** Spring Data repository for {@link LegacyTedMigrationRun} rows. */
public interface LegacyTedMigrationRunRepository extends JpaRepository<LegacyTedMigrationRun, UUID> {

    /**
     * Returns runs whose status is in {@code statuses}, most recently started first.
     * Callers supply a {@link Pageable} (e.g. {@code PageRequest.of(0, 1)}) to cap
     * the result size; the JPQL {@code order by} keeps the newest run first.
     */
    @Query("""
            select r
            from LegacyTedMigrationRun r
            where r.status in :statuses
            order by r.startedAt desc
            """)
    List<LegacyTedMigrationRun> findLatestByStatuses(@Param("statuses") Collection<LegacyTedMigrationRunStatus> statuses,
                                                     Pageable pageable);
}
|
||||
@ -0,0 +1,187 @@
|
||||
package at.procon.dip.migration.service;
|
||||
|
||||
import at.procon.dip.migration.config.LegacyTedBackfillProperties;
|
||||
import at.procon.dip.migration.entity.LegacyTedMigrationCheckpoint;
|
||||
import at.procon.dip.migration.entity.LegacyTedMigrationRun;
|
||||
import at.procon.dip.migration.entity.LegacyTedMigrationRunStatus;
|
||||
import at.procon.dip.migration.repository.LegacyTedMigrationCheckpointRepository;
|
||||
import at.procon.dip.migration.repository.LegacyTedMigrationRunRepository;
|
||||
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
||||
import at.procon.dip.runtime.config.RuntimeMode;
|
||||
import at.procon.ted.repository.LegacyTedMigrationCursor;
|
||||
import at.procon.ted.repository.ProcurementDocumentRepository;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
/**
 * Orchestrates the legacy TED -> DOC/projection backfill: resolves (or resumes) a
 * migration run, pulls legacy documents in cursor-ordered batches, delegates each
 * document to {@link LegacyTedBackfillWorker#backfill}, and records progress as run
 * state plus per-batch checkpoints. Only active in the NEW runtime mode.
 *
 * NOTE(review): the {@code @Transactional} protected methods below are invoked via
 * {@code this} from {@link #runBackfill()}. With Spring's default proxy-based
 * transaction management, self-invocation bypasses the transactional proxy, so these
 * annotations may have no effect here — confirm AspectJ mode or an enclosing
 * transaction is in place, or move these methods behind an injected collaborator.
 */
@Service
@ConditionalOnRuntimeMode(RuntimeMode.NEW)
@RequiredArgsConstructor
@Slf4j
public class LegacyTedBackfillMigrationService {

    private final LegacyTedBackfillProperties properties;
    private final ProcurementDocumentRepository procurementDocumentRepository;
    private final LegacyTedMigrationRunRepository runRepository;
    private final LegacyTedMigrationCheckpointRepository checkpointRepository;
    private final LegacyTedBackfillWorker worker;

    /**
     * Executes the backfill until the legacy set is exhausted (COMPLETED), the per-run
     * document cap is hit (STOPPED), or a document fails (FAILED; exception rethrown).
     *
     * @return the id of the run that was executed, or {@code null} when the subsystem
     *         is disabled via configuration
     */
    public UUID runBackfill() {
        if (!properties.isEnabled()) {
            log.info("Legacy TED backfill is disabled");
            return null;
        }

        LegacyTedMigrationRun run = resolveRun();
        log.info("Starting legacy TED -> DOC/projection backfill run {} (batchSize={}, maxDocumentsPerRun={}, queueEmbeddings={})",
                run.getId(), run.getBatchSize(), run.getMaxDocumentsPerRun(), run.isQueueEmbeddings());

        // Continue batch numbering after any checkpoints left by a resumed run.
        long existingCheckpointCount = checkpointRepository.countByRun_Id(run.getId());
        // Defensive clamp into int range for the batch counter.
        int batchNumber = existingCheckpointCount >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) existingCheckpointCount;
        long processedInThisInvocation = 0;

        while (true) {
            int limit = effectiveBatchLimit(run, processedInThisInvocation);
            if (limit <= 0) {
                // Per-run cap reached: leave the run resumable (STOPPED), not COMPLETED.
                markStopped(run, "Stopped because maxDocumentsPerRun was reached");
                createCheckpoint(run, ++batchNumber, 0, "Invocation limit reached");
                return run.getId();
            }

            // Fetch the next slice strictly after the persisted (createdAt, id) cursor.
            List<LegacyTedMigrationCursor> batch = procurementDocumentRepository.findNextMigrationBatch(
                    run.getLastLegacyCreatedAt(),
                    run.getLastLegacyDocumentId(),
                    limit
            );

            if (batch.isEmpty()) {
                markCompleted(run);
                createCheckpoint(run, ++batchNumber, 0, "Backfill completed - no more legacy TED documents after current cursor");
                return run.getId();
            }

            int processedInBatch = 0;
            for (LegacyTedMigrationCursor cursor : batch) {
                try {
                    // Worker migrates one document in its own REQUIRES_NEW transaction.
                    LegacyTedBackfillWorker.BackfillOutcome outcome = worker.backfill(
                            cursor.getId(),
                            run.getImportBatchId(),
                            run.isQueueEmbeddings()
                    );
                    advanceRun(run, cursor, outcome);
                    processedInBatch++;
                    processedInThisInvocation++;
                } catch (RuntimeException ex) {
                    // Record the failure and a checkpoint, then propagate: the run is
                    // resumable from the last successfully advanced cursor.
                    markFailed(run, cursor, ex);
                    createCheckpoint(run, ++batchNumber, processedInBatch,
                            "Failed at legacy document %s: %s".formatted(cursor.getId(), ex.getMessage()));
                    throw ex;
                }
            }
            createCheckpoint(run, ++batchNumber, processedInBatch,
                    "Processed %d legacy TED documents in batch".formatted(processedInBatch));
        }
    }

    /**
     * Resumes the most recent RUNNING/STOPPED/FAILED run (when configured to do so),
     * resetting it to RUNNING and clearing its error/completion markers; otherwise
     * creates a fresh run from a snapshot of the current configuration.
     */
    @Transactional
    protected LegacyTedMigrationRun resolveRun() {
        if (properties.isResumeLatestIncompleteRun()) {
            List<LegacyTedMigrationRun> resumable = runRepository.findLatestByStatuses(
                    EnumSet.of(LegacyTedMigrationRunStatus.RUNNING, LegacyTedMigrationRunStatus.STOPPED, LegacyTedMigrationRunStatus.FAILED),
                    PageRequest.of(0, 1)
            );
            if (!resumable.isEmpty()) {
                LegacyTedMigrationRun run = resumable.get(0);
                run.setStatus(LegacyTedMigrationRunStatus.RUNNING);
                run.setLastError(null);
                run.setCompletedAt(null);
                return runRepository.save(run);
            }
        }

        // New run: normalize batchSize to >= 1 and store "no cap" as null.
        return runRepository.save(LegacyTedMigrationRun.builder()
                .status(LegacyTedMigrationRunStatus.RUNNING)
                .importBatchId(properties.getImportBatchId())
                .queueEmbeddings(properties.isQueueEmbeddings())
                .batchSize(Math.max(1, properties.getBatchSize()))
                .maxDocumentsPerRun(properties.getMaxDocumentsPerRun() > 0 ? properties.getMaxDocumentsPerRun() : null)
                .build());
    }

    /** Counts one successfully migrated document and advances the resume cursor past it. */
    @Transactional
    protected void advanceRun(LegacyTedMigrationRun run,
                              LegacyTedMigrationCursor cursor,
                              LegacyTedBackfillWorker.BackfillOutcome outcome) {
        run.setStatus(LegacyTedMigrationRunStatus.RUNNING);
        run.setProcessedCount(run.getProcessedCount() + 1);
        run.setSuccessCount(run.getSuccessCount() + 1);
        run.setLastLegacyCreatedAt(cursor.getCreatedAt());
        run.setLastLegacyDocumentId(cursor.getId());
        run.setLastDocDocumentId(outcome.documentId());
        run.setLastProjectionId(outcome.projectionId());
        run.setLastError(null);
        runRepository.save(run);
    }

    /** Writes a checkpoint snapshotting the run's current cumulative state and cursor. */
    @Transactional
    protected void createCheckpoint(LegacyTedMigrationRun run, int batchNumber, int processedInBatch, String note) {
        checkpointRepository.save(LegacyTedMigrationCheckpoint.builder()
                .run(run)
                .batchNumber(batchNumber)
                .batchProcessedCount(processedInBatch)
                .cumulativeProcessedCount(run.getProcessedCount())
                .lastLegacyCreatedAt(run.getLastLegacyCreatedAt())
                .lastLegacyDocumentId(run.getLastLegacyDocumentId())
                .lastDocDocumentId(run.getLastDocDocumentId())
                .lastProjectionId(run.getLastProjectionId())
                .note(note)
                .build());
    }

    /** Marks the run COMPLETED (no more legacy documents after the cursor). */
    @Transactional
    protected void markCompleted(LegacyTedMigrationRun run) {
        run.setStatus(LegacyTedMigrationRunStatus.COMPLETED);
        run.setCompletedAt(OffsetDateTime.now());
        run.setLastError(null);
        runRepository.save(run);
        log.info("Legacy TED backfill run {} completed successfully - processed {} documents",
                run.getId(), run.getProcessedCount());
    }

    /** Marks the run STOPPED (resumable), recording the reason in lastError. */
    @Transactional
    protected void markStopped(LegacyTedMigrationRun run, String message) {
        run.setStatus(LegacyTedMigrationRunStatus.STOPPED);
        run.setLastError(message);
        runRepository.save(run);
        log.info("Legacy TED backfill run {} stopped after {} processed documents: {}",
                run.getId(), run.getProcessedCount(), message);
    }

    /**
     * Marks the run FAILED. Note: failedCount is incremented but processedCount is
     * not — only successes advance processedCount (see advanceRun).
     */
    @Transactional
    protected void markFailed(LegacyTedMigrationRun run, LegacyTedMigrationCursor cursor, RuntimeException ex) {
        run.setStatus(LegacyTedMigrationRunStatus.FAILED);
        run.setFailedCount(run.getFailedCount() + 1);
        run.setLastError(ex.getMessage());
        runRepository.save(run);
        log.error("Legacy TED backfill run {} failed at legacy document {}: {}",
                run.getId(), cursor.getId(), ex.getMessage(), ex);
    }

    /**
     * Computes the fetch size for the next batch: the configured batch size when the
     * run has no cap; otherwise min(batchSize, remaining-under-cap), or 0 once the
     * cap for this invocation is exhausted (which stops the run).
     */
    private int effectiveBatchLimit(LegacyTedMigrationRun run, long processedInThisInvocation) {
        long maxPerRun = run.getMaxDocumentsPerRun() != null ? run.getMaxDocumentsPerRun() : 0L;
        if (maxPerRun <= 0) {
            return Math.max(1, run.getBatchSize());
        }
        long remaining = maxPerRun - processedInThisInvocation;
        if (remaining <= 0) {
            return 0;
        }
        return (int) Math.max(1L, Math.min(run.getBatchSize(), remaining));
    }
}
|
||||
@ -0,0 +1,372 @@
|
||||
package at.procon.dip.migration.service;
|
||||
|
||||
import at.procon.dip.classification.spi.DetectionResult;
|
||||
import at.procon.dip.domain.document.ContentRole;
|
||||
import at.procon.dip.domain.document.DocumentFamily;
|
||||
import at.procon.dip.domain.document.DocumentStatus;
|
||||
import at.procon.dip.domain.document.DocumentType;
|
||||
import at.procon.dip.domain.document.RepresentationType;
|
||||
import at.procon.dip.domain.document.SourceType;
|
||||
import at.procon.dip.domain.document.StorageType;
|
||||
import at.procon.dip.domain.document.entity.Document;
|
||||
import at.procon.dip.domain.document.entity.DocumentContent;
|
||||
import at.procon.dip.domain.document.entity.DocumentSource;
|
||||
import at.procon.dip.domain.document.entity.DocumentTextRepresentation;
|
||||
import at.procon.dip.domain.document.repository.DocumentContentRepository;
|
||||
import at.procon.dip.domain.document.repository.DocumentSourceRepository;
|
||||
import at.procon.dip.domain.document.repository.DocumentTextRepresentationRepository;
|
||||
import at.procon.dip.domain.document.service.DocumentContentService;
|
||||
import at.procon.dip.domain.document.service.DocumentRepresentationService;
|
||||
import at.procon.dip.domain.document.service.DocumentService;
|
||||
import at.procon.dip.domain.document.service.command.AddDocumentContentCommand;
|
||||
import at.procon.dip.domain.document.service.command.AddDocumentSourceCommand;
|
||||
import at.procon.dip.domain.document.service.command.AddDocumentTextRepresentationCommand;
|
||||
import at.procon.dip.domain.ted.service.TedGenericDocumentRootService;
|
||||
import at.procon.dip.domain.ted.service.TedNoticeProjectionService;
|
||||
import at.procon.dip.embedding.config.EmbeddingProperties;
|
||||
import at.procon.dip.embedding.service.RepresentationEmbeddingOrchestrator;
|
||||
import at.procon.dip.extraction.spi.ExtractedStructuredPayload;
|
||||
import at.procon.dip.extraction.spi.ExtractionResult;
|
||||
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
|
||||
import at.procon.dip.ingestion.spi.SourceDescriptor;
|
||||
import at.procon.dip.normalization.service.TextRepresentationBuildService;
|
||||
import at.procon.dip.normalization.spi.RepresentationBuildRequest;
|
||||
import at.procon.dip.normalization.spi.TextRepresentationDraft;
|
||||
import at.procon.dip.search.service.DocumentLexicalIndexService;
|
||||
import at.procon.ted.model.entity.ProcurementDocument;
|
||||
import at.procon.ted.repository.ProcurementDocumentRepository;
|
||||
import at.procon.ted.util.HashUtils;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class LegacyTedBackfillWorker {
|
||||
|
||||
// MIME types recorded on the ORIGINAL (XML) and NORMALIZED_TEXT content rows.
private static final String XML_MIME_TYPE = "application/xml";
private static final String TEXT_MIME_TYPE = "text/plain";
// Charset name stored on DB_TEXT content rows.
private static final String CHARSET_UTF8 = StandardCharsets.UTF_8.name();

// Legacy (TED-schema) side: source of the documents being migrated.
private final ProcurementDocumentRepository procurementDocumentRepository;
private final TedGenericDocumentRootService tedGenericDocumentRootService;
private final TedNoticeProjectionService tedNoticeProjectionService;
// DOC-model side: document root, sources, contents, representations.
private final DocumentService documentService;
private final DocumentSourceRepository sourceRepository;
private final at.procon.dip.domain.document.service.DocumentSourceService sourceService;
private final DocumentContentRepository contentRepository;
private final DocumentContentService documentContentService;
private final DocumentTextRepresentationRepository representationRepository;
private final DocumentRepresentationService documentRepresentationService;
// Search / normalization / embedding pipeline collaborators.
private final DocumentLexicalIndexService lexicalIndexService;
private final TextRepresentationBuildService textRepresentationBuildService;
private final RepresentationEmbeddingOrchestrator embeddingOrchestrator;
private final EmbeddingProperties embeddingProperties;
||||
|
||||
/**
 * Migrates one legacy TED procurement document into the DOC model: ensures the
 * generic document root, the MIGRATION source row, the ORIGINAL (XML) and
 * NORMALIZED_TEXT contents, rebuilds text representations, refreshes the TED
 * notice projection, marks the document REPRESENTED, and optionally queues
 * embeddings for the saved representations.
 *
 * Runs in its own transaction (REQUIRES_NEW) so one document's failure does not
 * roll back previously migrated documents.
 *
 * @param legacyProcurementDocumentId id of the legacy row to migrate
 * @param importBatchId               batch id stamped on the created source row
 * @param queueEmbeddings             whether to enqueue embedding jobs afterwards
 * @return ids of the resulting DOC document and TED notice projection
 * @throws IllegalArgumentException when no legacy document exists for the id
 */
@Transactional(propagation = Propagation.REQUIRES_NEW)
public BackfillOutcome backfill(UUID legacyProcurementDocumentId, String importBatchId, boolean queueEmbeddings) {
    ProcurementDocument legacyDocument = procurementDocumentRepository.findById(legacyProcurementDocumentId)
            .orElseThrow(() -> new IllegalArgumentException("Unknown legacy TED procurement document id: " + legacyProcurementDocumentId));

    // Idempotent upserts: root document, source link, then both content roles.
    Document document = tedGenericDocumentRootService.ensureGenericTedDocument(legacyDocument);
    ensureSource(document, legacyDocument, importBatchId);
    DocumentContent originalContent = ensureOriginalXmlContent(document, legacyDocument);
    DocumentContent normalizedTextContent = ensureNormalizedTextContent(document, legacyDocument);

    // Rebuild text representations through the standard pipeline and upsert them.
    List<TextRepresentationDraft> drafts = buildDrafts(legacyDocument);
    List<DocumentTextRepresentation> savedRepresentations = ensureRepresentations(document, originalContent, normalizedTextContent, drafts);

    UUID projectionId = tedNoticeProjectionService.registerOrRefreshProjection(legacyDocument, document.getId());
    documentService.updateStatus(document.getId(), DocumentStatus.REPRESENTED);

    if (queueEmbeddings) {
        queueEmbeddings(document.getId(), savedRepresentations);
    }

    return new BackfillOutcome(document.getId(), projectionId);
}
|
||||
|
||||
/**
 * Idempotent upsert of the MIGRATION source row linking the DOC document back to
 * its legacy TED procurement document.
 *
 * NOTE(review): the filter reuses any source that is already MIGRATION-typed OR
 * already carries this legacy id. A MIGRATION source pointing at a *different*
 * legacy id would also match and get its externalSourceId overwritten below —
 * confirm a document can only ever have one migration source.
 */
private void ensureSource(Document document, ProcurementDocument legacyDocument, String importBatchId) {
    String externalSourceId = legacyDocument.getId().toString();
    DocumentSource existing = sourceRepository.findByDocument_Id(document.getId()).stream()
            .filter(candidate -> candidate.getSourceType() == SourceType.MIGRATION || Objects.equals(candidate.getExternalSourceId(), externalSourceId))
            .findFirst()
            .orElse(null);
    if (existing == null) {
        // No matching source yet: create one, falling back to "now" for receivedAt
        // when the legacy row carries no creation timestamp.
        sourceService.addSource(new AddDocumentSourceCommand(
                document.getId(),
                SourceType.MIGRATION,
                externalSourceId,
                legacyDocument.getSourcePath(),
                legacyDocument.getSourceFilename(),
                null,
                importBatchId,
                legacyDocument.getCreatedAt() != null ? legacyDocument.getCreatedAt() : OffsetDateTime.now()
        ));
        return;
    }

    // Refresh the matched source in place; keep its existing receivedAt when the
    // legacy row has no creation timestamp.
    existing.setSourceType(SourceType.MIGRATION);
    existing.setExternalSourceId(externalSourceId);
    existing.setSourceUri(legacyDocument.getSourcePath());
    existing.setSourceFilename(legacyDocument.getSourceFilename());
    existing.setImportBatchId(importBatchId);
    existing.setReceivedAt(legacyDocument.getCreatedAt() != null ? legacyDocument.getCreatedAt() : existing.getReceivedAt());
    sourceRepository.save(existing);
}
|
||||
|
||||
private DocumentContent ensureOriginalXmlContent(Document document, ProcurementDocument legacyDocument) {
|
||||
String xml = legacyDocument.getXmlDocument();
|
||||
long sizeBytes = legacyDocument.getFileSizeBytes() != null
|
||||
? legacyDocument.getFileSizeBytes()
|
||||
: (xml == null ? 0L : (long) xml.getBytes(StandardCharsets.UTF_8).length);
|
||||
return ensureContent(document, ContentRole.ORIGINAL, XML_MIME_TYPE, xml, legacyDocument.getDocumentHash(), sizeBytes);
|
||||
}
|
||||
|
||||
private DocumentContent ensureNormalizedTextContent(Document document, ProcurementDocument legacyDocument) {
|
||||
String normalizedText = resolveNormalizedText(legacyDocument);
|
||||
String contentHash = HashUtils.computeSha256(legacyDocument.getDocumentHash() + ":NORMALIZED_TEXT:" + normalizedText);
|
||||
return ensureContent(document, ContentRole.NORMALIZED_TEXT, TEXT_MIME_TYPE, normalizedText, contentHash, (long) normalizedText.length());
|
||||
}
|
||||
|
||||
private DocumentContent ensureContent(Document document,
|
||||
ContentRole role,
|
||||
String mimeType,
|
||||
String text,
|
||||
String contentHash,
|
||||
Long sizeBytes) {
|
||||
List<DocumentContent> existing = contentRepository.findByDocument_IdAndContentRole(document.getId(), role);
|
||||
if (!existing.isEmpty()) {
|
||||
DocumentContent content = existing.get(0);
|
||||
content.setStorageType(StorageType.DB_TEXT);
|
||||
content.setMimeType(mimeType);
|
||||
content.setCharsetName(CHARSET_UTF8);
|
||||
content.setTextContent(text);
|
||||
content.setBinaryContent(null);
|
||||
content.setBinaryRef(null);
|
||||
content.setContentHash(contentHash);
|
||||
content.setSizeBytes(sizeBytes);
|
||||
return contentRepository.save(content);
|
||||
}
|
||||
|
||||
return documentContentService.addContent(new AddDocumentContentCommand(
|
||||
document.getId(),
|
||||
role,
|
||||
StorageType.DB_TEXT,
|
||||
mimeType,
|
||||
CHARSET_UTF8,
|
||||
text,
|
||||
null,
|
||||
null,
|
||||
contentHash,
|
||||
sizeBytes
|
||||
));
|
||||
}
|
||||
|
||||
/**
 * Builds text-representation drafts for a legacy document by assembling a synthetic
 * ingestion triple (source descriptor, detection result, extraction result) and
 * feeding it through the standard representation build pipeline — so migrated
 * documents get the same representation shapes as freshly ingested ones.
 */
private List<TextRepresentationDraft> buildDrafts(ProcurementDocument legacyDocument) {
    String normalizedText = resolveNormalizedText(legacyDocument);

    // The only derived text available from the legacy row is the normalized plain text.
    Map<ContentRole, String> derivedText = new LinkedHashMap<>();
    derivedText.put(ContentRole.NORMALIZED_TEXT, normalizedText);

    // Synthetic MIGRATION source carrying the raw XML; the legacy id is kept both as
    // the external source id and as a metadata attribute for traceability.
    SourceDescriptor sourceDescriptor = new SourceDescriptor(
            null,
            SourceType.MIGRATION,
            legacyDocument.getId().toString(),
            legacyDocument.getSourcePath(),
            legacyDocument.getSourceFilename(),
            XML_MIME_TYPE,
            null,
            legacyDocument.getXmlDocument(),
            legacyDocument.getCreatedAt(),
            OriginalContentStoragePolicy.DEFAULT,
            Map.of("legacyProcurementDocumentId", legacyDocument.getId().toString())
    );

    // Legacy TED rows are always TED notices in the procurement family.
    DetectionResult detectionResult = new DetectionResult(
            DocumentType.TED_NOTICE,
            DocumentFamily.PROCUREMENT,
            XML_MIME_TYPE,
            legacyDocument.getLanguageCode(),
            Map.of()
    );

    ExtractionResult extractionResult = new ExtractionResult(
            derivedText,
            List.of(new ExtractedStructuredPayload("ted-notice", buildStructuredAttributes(legacyDocument))),
            List.of()
    );

    return textRepresentationBuildService.build(new RepresentationBuildRequest(
            sourceDescriptor,
            detectionResult,
            extractionResult
    ));
}
|
||||
|
||||
/**
 * Upserts text representations from the built drafts: an existing representation
 * with the same (type, chunk index, primary flag) is rewritten in place, otherwise
 * a new one is created through the representation service. Drafts with a blank
 * body are skipped. Returns the saved representations in draft order.
 *
 * NOTE(review): tokenCount is set to the character count (same value as charCount)
 * — confirm that is intended and not a placeholder for a real tokenizer count.
 * NOTE(review): only the *update* path explicitly calls
 * lexicalIndexService.indexRepresentation(); confirm addRepresentation() indexes
 * newly created rows itself.
 */
private List<DocumentTextRepresentation> ensureRepresentations(Document document,
                                                               DocumentContent originalContent,
                                                               DocumentContent normalizedTextContent,
                                                               List<TextRepresentationDraft> drafts) {
    if (drafts == null || drafts.isEmpty()) {
        return List.of();
    }

    // Sort by creation time (nulls last) so findMatchingRepresentation reuses the
    // oldest matching row first.
    List<DocumentTextRepresentation> existing = new ArrayList<>(representationRepository.findByDocument_Id(document.getId()));
    existing.sort(Comparator.comparing(DocumentTextRepresentation::getCreatedAt, Comparator.nullsLast(Comparator.naturalOrder())));

    List<DocumentTextRepresentation> saved = new ArrayList<>();
    for (TextRepresentationDraft draft : drafts) {
        if (!StringUtils.hasText(draft.textBody())) {
            continue;
        }

        DocumentTextRepresentation representation = findMatchingRepresentation(existing, draft)
                .orElseGet(DocumentTextRepresentation::new);

        // Link each representation to the content role it was derived from.
        DocumentContent linkedContent = draft.sourceContentRole() == ContentRole.NORMALIZED_TEXT
                ? normalizedTextContent
                : originalContent;

        boolean isNew = representation.getId() == null;
        representation.setDocument(document);
        representation.setContent(linkedContent);
        representation.setRepresentationType(draft.representationType());
        representation.setBuilderKey(draft.builderKey());
        representation.setLanguageCode(draft.languageCode());
        representation.setTokenCount(draft.textBody().length());
        representation.setCharCount(draft.textBody().length());
        representation.setChunkIndex(draft.chunkIndex());
        representation.setChunkStartOffset(draft.chunkStartOffset());
        representation.setChunkEndOffset(draft.chunkEndOffset());
        representation.setPrimaryRepresentation(draft.primary());
        representation.setTextBody(draft.textBody());

        DocumentTextRepresentation savedRepresentation;
        if (isNew) {
            savedRepresentation = documentRepresentationService.addRepresentation(new AddDocumentTextRepresentationCommand(
                    document.getId(),
                    linkedContent == null ? null : linkedContent.getId(),
                    draft.representationType(),
                    draft.builderKey(),
                    draft.languageCode(),
                    draft.textBody().length(),
                    draft.chunkIndex(),
                    draft.chunkStartOffset(),
                    draft.chunkEndOffset(),
                    draft.primary(),
                    draft.textBody()
            ));
            // Register the new row so later drafts in this loop can match against it.
            existing.add(savedRepresentation);
        } else {
            savedRepresentation = representationRepository.save(representation);
            lexicalIndexService.indexRepresentation(savedRepresentation.getId());
        }
        saved.add(savedRepresentation);
    }
    return saved;
}
|
||||
|
||||
private java.util.Optional<DocumentTextRepresentation> findMatchingRepresentation(List<DocumentTextRepresentation> existing,
|
||||
TextRepresentationDraft draft) {
|
||||
return existing.stream()
|
||||
.filter(candidate -> candidate.getRepresentationType() == draft.representationType())
|
||||
.filter(candidate -> Objects.equals(candidate.getChunkIndex(), draft.chunkIndex()))
|
||||
.filter(candidate -> Objects.equals(candidate.isPrimaryRepresentation(), draft.primary()))
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
private void queueEmbeddings(UUID documentId, List<DocumentTextRepresentation> representations) {
|
||||
if (!embeddingProperties.isEnabled() || !StringUtils.hasText(embeddingProperties.getDefaultDocumentModel())) {
|
||||
log.debug("Skipping embedding queue for migrated document {} because no default document model is configured", documentId);
|
||||
return;
|
||||
}
|
||||
|
||||
for (DocumentTextRepresentation representation : representations) {
|
||||
RepresentationType type = representation.getRepresentationType();
|
||||
boolean queue = switch (type) {
|
||||
case SEMANTIC_TEXT -> true;
|
||||
case CHUNK -> true;
|
||||
default -> false;
|
||||
};
|
||||
if (!queue) {
|
||||
continue;
|
||||
}
|
||||
embeddingOrchestrator.enqueueRepresentation(documentId, representation.getId(), embeddingProperties.getDefaultDocumentModel());
|
||||
}
|
||||
}
|
||||
|
||||
private String resolveNormalizedText(ProcurementDocument legacyDocument) {
|
||||
if (StringUtils.hasText(legacyDocument.getTextContent())) {
|
||||
return legacyDocument.getTextContent().trim();
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
appendLine(sb, legacyDocument.getProjectTitle());
|
||||
appendBlank(sb, legacyDocument.getProjectDescription());
|
||||
appendLine(sb, legacyDocument.getBuyerName());
|
||||
appendLine(sb, joinArray(legacyDocument.getCpvCodes()));
|
||||
appendLine(sb, joinArray(legacyDocument.getNutsCodes()));
|
||||
String fallback = sb.toString().trim();
|
||||
return StringUtils.hasText(fallback) ? fallback : legacyDocument.getDocumentHash();
|
||||
}
|
||||
|
||||
private Map<String, Object> buildStructuredAttributes(ProcurementDocument legacyDocument) {
|
||||
Map<String, Object> attrs = new LinkedHashMap<>();
|
||||
putIfText(attrs, "title", legacyDocument.getProjectTitle());
|
||||
putIfText(attrs, "description", legacyDocument.getProjectDescription());
|
||||
putIfText(attrs, "buyerName", legacyDocument.getBuyerName());
|
||||
putIfText(attrs, "cpvCodes", joinArray(legacyDocument.getCpvCodes()));
|
||||
putIfText(attrs, "nutsCodes", joinArray(legacyDocument.getNutsCodes()));
|
||||
putIfText(attrs, "publicationId", legacyDocument.getPublicationId());
|
||||
return attrs;
|
||||
}
|
||||
|
||||
private void putIfText(Map<String, Object> target, String key, String value) {
|
||||
if (StringUtils.hasText(value)) {
|
||||
target.put(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
private String joinArray(String[] values) {
|
||||
if (values == null || values.length == 0) {
|
||||
return null;
|
||||
}
|
||||
return String.join(", ", values);
|
||||
}
|
||||
|
||||
private void appendLine(StringBuilder sb, String value) {
|
||||
if (StringUtils.hasText(value)) {
|
||||
if (sb.length() > 0) {
|
||||
sb.append('\n');
|
||||
}
|
||||
sb.append(value.trim());
|
||||
}
|
||||
}
|
||||
|
||||
private void appendBlank(StringBuilder sb, String value) {
|
||||
if (StringUtils.hasText(value)) {
|
||||
if (sb.length() > 0) {
|
||||
sb.append("\n\n");
|
||||
}
|
||||
sb.append(value.trim());
|
||||
}
|
||||
}
|
||||
|
||||
    /**
     * Result of backfilling one legacy TED document.
     *
     * @param documentId   id of the DOC document written by the backfill
     * @param projectionId id of the TED notice projection linked to that document
     */
    public record BackfillOutcome(UUID documentId, UUID projectionId) {
    }
|
||||
|
||||
}
|
||||
@ -0,0 +1,31 @@
|
||||
package at.procon.dip.migration.startup;
|
||||
|
||||
import at.procon.dip.migration.config.LegacyTedBackfillProperties;
|
||||
import at.procon.dip.migration.service.LegacyTedBackfillMigrationService;
|
||||
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
||||
import at.procon.dip.runtime.config.RuntimeMode;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@ConditionalOnRuntimeMode(RuntimeMode.NEW)
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class LegacyTedBackfillStartupRunner implements ApplicationRunner {
|
||||
|
||||
private final LegacyTedBackfillProperties properties;
|
||||
private final LegacyTedBackfillMigrationService migrationService;
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) {
|
||||
if (!properties.isEnabled() || !properties.isStartupEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Startup-triggered legacy TED backfill is enabled");
|
||||
migrationService.runBackfill();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,9 @@
|
||||
package at.procon.ted.repository;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
 * Read-only cursor view over a legacy TED document, exposing the columns needed
 * to resume a backfill run.
 * NOTE(review): shaped like a Spring Data interface projection over
 * TED.procurement_document — confirm against the repository query that returns it.
 */
public interface LegacyTedMigrationCursor {
    /** Primary key of the legacy document at the cursor position. */
    UUID getId();

    /** Creation timestamp of the legacy document; presumably the primary cursor ordering key — confirm. */
    OffsetDateTime getCreatedAt();
}
|
||||
@ -0,0 +1,21 @@
|
||||
-- Store the TED daily package identifier directly on the Phase 3 TED notice projection.
-- This makes migration, audit, and repair flows package-aware without having to derive the
-- package membership from source paths at query time.

SET search_path TO TED, DOC, public;

-- Package identifiers are 9 digits: a 19xx/20xx year prefix plus a sequence number.
ALTER TABLE IF EXISTS TED.ted_notice_projection
    ADD COLUMN IF NOT EXISTS package_identifier VARCHAR(20);

CREATE INDEX IF NOT EXISTS idx_ted_notice_projection_package_identifier
    ON TED.ted_notice_projection(package_identifier);

-- Backfill from the linked legacy TED document's source path when the path contains the
-- extracted package directory, e.g. .../extracted/202600003/123.xml
-- The pattern anchors the 9-digit token to a full path segment ([\/] matches either
-- slash flavour, or start/end of string), so digit runs inside filenames are not picked up.
-- Only rows without a package identifier are touched, making the statement re-runnable.
UPDATE TED.ted_notice_projection projection
SET package_identifier = substring(legacy.source_path from '(?:^|[\\/])((?:19|20)[0-9]{7})(?:[\\/]|$)')
FROM TED.procurement_document legacy
WHERE projection.legacy_procurement_document_id = legacy.id
  AND projection.package_identifier IS NULL
  AND legacy.source_path IS NOT NULL
  AND substring(legacy.source_path from '(?:^|[\\/])((?:19|20)[0-9]{7})(?:[\\/]|$)') IS NOT NULL;
||||
@ -0,0 +1,72 @@
|
||||
-- Consolidated Phase 3 migration: package identifier on the TED notice projection,
-- city columns widened to TEXT, and DOC-side run/checkpoint tables for the
-- legacy TED backfill.

SET search_path TO TED, DOC, public;

-- NOTE(review): another migration creates this column as VARCHAR(20); with
-- IF NOT EXISTS this ALTER is a no-op when the column already exists, so the
-- effective length may differ per environment depending on migration order — confirm.
ALTER TABLE IF EXISTS TED.ted_notice_projection
    ADD COLUMN IF NOT EXISTS package_identifier VARCHAR(32);

CREATE INDEX IF NOT EXISTS idx_ted_notice_projection_package_identifier
    ON TED.ted_notice_projection(package_identifier);

-- Widen city columns; presumably legacy imports contain values longer than the
-- previous limits — confirm against import failures.
ALTER TABLE IF EXISTS TED.organization
    ALTER COLUMN city TYPE TEXT;

ALTER TABLE IF EXISTS TED.procurement_document
    ALTER COLUMN buyer_city TYPE TEXT;

ALTER TABLE IF EXISTS TED.ted_notice_organization
    ALTER COLUMN city TYPE TEXT;

ALTER TABLE IF EXISTS TED.ted_notice_projection
    ALTER COLUMN buyer_city TYPE TEXT;

-- Backfill the package identifier from the legacy document's source path (or
-- filename as fallback): the first 9-digit token starting with 20xx.
-- NOTE(review): unlike the path-segment-anchored variant in the sibling migration,
-- this pattern matches anywhere in the string and only 20xx years — confirm intended.
UPDATE TED.ted_notice_projection p
SET package_identifier = substring(coalesce(d.source_path, d.source_filename) from '(20[0-9]{2}[0-9]{5})')
FROM TED.procurement_document d
WHERE p.legacy_procurement_document_id = d.id
  AND p.package_identifier IS NULL
  AND substring(coalesce(d.source_path, d.source_filename) from '(20[0-9]{2}[0-9]{5})') IS NOT NULL;

SET search_path TO DOC, public;

-- One row per backfill run: status, configuration snapshot, progress counters,
-- and the resume cursor (last_legacy_created_at / last_legacy_document_id).
CREATE TABLE IF NOT EXISTS DOC.doc_legacy_ted_migration_run (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    status VARCHAR(32) NOT NULL,
    import_batch_id VARCHAR(255),
    queue_embeddings BOOLEAN NOT NULL DEFAULT FALSE,
    batch_size INTEGER NOT NULL DEFAULT 100,
    max_documents_per_run BIGINT,
    processed_count BIGINT NOT NULL DEFAULT 0,
    success_count BIGINT NOT NULL DEFAULT 0,
    failed_count BIGINT NOT NULL DEFAULT 0,
    last_legacy_created_at TIMESTAMP WITH TIME ZONE,
    last_legacy_document_id UUID,
    last_doc_document_id UUID,
    last_projection_id UUID,
    last_error TEXT,
    started_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP WITH TIME ZONE
);

CREATE INDEX IF NOT EXISTS idx_doc_legacy_ted_mig_run_status
    ON DOC.doc_legacy_ted_migration_run(status);
CREATE INDEX IF NOT EXISTS idx_doc_legacy_ted_mig_run_started
    ON DOC.doc_legacy_ted_migration_run(started_at DESC);

-- One row per processed batch within a run; checkpoints support auditing and
-- mid-run resume, and are removed together with their run (ON DELETE CASCADE).
CREATE TABLE IF NOT EXISTS DOC.doc_legacy_ted_migration_checkpoint (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    run_id UUID NOT NULL REFERENCES DOC.doc_legacy_ted_migration_run(id) ON DELETE CASCADE,
    batch_number INTEGER NOT NULL,
    batch_processed_count INTEGER NOT NULL DEFAULT 0,
    cumulative_processed_count BIGINT NOT NULL DEFAULT 0,
    last_legacy_created_at TIMESTAMP WITH TIME ZONE,
    last_legacy_document_id UUID,
    last_doc_document_id UUID,
    last_projection_id UUID,
    note TEXT,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_doc_legacy_ted_mig_ckpt_run
    ON DOC.doc_legacy_ted_migration_checkpoint(run_id);
CREATE INDEX IF NOT EXISTS idx_doc_legacy_ted_mig_ckpt_batch
    ON DOC.doc_legacy_ted_migration_checkpoint(batch_number);
|
||||
Loading…
Reference in New Issue