From 61d163f8fe44f069414a5fe126b84ba2dd43fdd0 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 13 Apr 2026 16:28:20 +0200 Subject: [PATCH] ted legacy data migration --- ...cumentIntelligencePlatformApplication.java | 4 +- .../ted/entity/TedNoticeProjection.java | 6 +- .../TedNoticeProjectionRepository.java | 4 +- .../service/TedNoticeProjectionService.java | 27 ++ .../config/LegacyTedBackfillProperties.java | 35 ++ .../entity/LegacyTedMigrationCheckpoint.java | 75 ++++ .../entity/LegacyTedMigrationRun.java | 103 +++++ .../entity/LegacyTedMigrationRunStatus.java | 8 + ...egacyTedMigrationCheckpointRepository.java | 10 + .../LegacyTedMigrationRunRepository.java | 23 ++ .../LegacyTedBackfillMigrationService.java | 187 +++++++++ .../service/LegacyTedBackfillWorker.java | 372 ++++++++++++++++++ .../LegacyTedBackfillStartupRunner.java | 31 ++ .../repository/LegacyTedMigrationCursor.java | 9 + .../ProcurementDocumentRepository.java | 17 + src/main/resources/application-legacy.yml | 6 +- src/main/resources/application-new.yml | 39 +- src/main/resources/application.yml | 8 +- ..._ted_projection_add_package_identifier.sql | 21 + ...projection_package_and_legacy_backfill.sql | 72 ++++ 20 files changed, 1033 insertions(+), 24 deletions(-) create mode 100644 src/main/java/at/procon/dip/migration/config/LegacyTedBackfillProperties.java create mode 100644 src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationCheckpoint.java create mode 100644 src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationRun.java create mode 100644 src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationRunStatus.java create mode 100644 src/main/java/at/procon/dip/migration/repository/LegacyTedMigrationCheckpointRepository.java create mode 100644 src/main/java/at/procon/dip/migration/repository/LegacyTedMigrationRunRepository.java create mode 100644 
src/main/java/at/procon/dip/migration/service/LegacyTedBackfillMigrationService.java create mode 100644 src/main/java/at/procon/dip/migration/service/LegacyTedBackfillWorker.java create mode 100644 src/main/java/at/procon/dip/migration/startup/LegacyTedBackfillStartupRunner.java create mode 100644 src/main/java/at/procon/ted/repository/LegacyTedMigrationCursor.java create mode 100644 src/main/resources/db/migration/V12__ted_projection_add_package_identifier.sql create mode 100644 src/main/resources/db/migration/V20__ted_projection_package_and_legacy_backfill.sql diff --git a/src/main/java/at/procon/dip/DocumentIntelligencePlatformApplication.java b/src/main/java/at/procon/dip/DocumentIntelligencePlatformApplication.java index 1a475f7..f45bbce 100644 --- a/src/main/java/at/procon/dip/DocumentIntelligencePlatformApplication.java +++ b/src/main/java/at/procon/dip/DocumentIntelligencePlatformApplication.java @@ -16,8 +16,8 @@ import org.springframework.scheduling.annotation.EnableAsync; */ @SpringBootApplication(scanBasePackages = {"at.procon.dip", "at.procon.ted"}) @EnableAsync -@EntityScan(basePackages = {"at.procon.ted.model.entity", "at.procon.dip.domain.document.entity", "at.procon.dip.domain.tenant.entity", "at.procon.dip.domain.ted.entity", "at.procon.dip.embedding.job.entity", "at.procon.dip.migration.audit.entity"}) -@EnableJpaRepositories(basePackages = {"at.procon.ted.repository", "at.procon.dip.domain.document.repository", "at.procon.dip.domain.tenant.repository", "at.procon.dip.domain.ted.repository", "at.procon.dip.embedding.job.repository", "at.procon.dip.migration.audit.repository"}) +@EntityScan(basePackages = {"at.procon.ted.model.entity", "at.procon.dip.domain.document.entity", "at.procon.dip.domain.tenant.entity", "at.procon.dip.domain.ted.entity", "at.procon.dip.embedding.job.entity", "at.procon.dip.migration.audit.entity", "at.procon.dip.migration.entity"}) +@EnableJpaRepositories(basePackages = {"at.procon.ted.repository", 
"at.procon.dip.domain.document.repository", "at.procon.dip.domain.tenant.repository", "at.procon.dip.domain.ted.repository", "at.procon.dip.embedding.job.repository", "at.procon.dip.migration.audit.repository", "at.procon.dip.migration.repository"}) public class DocumentIntelligencePlatformApplication { public static void main(String[] args) { diff --git a/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeProjection.java b/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeProjection.java index 704b5b9..e7d83e9 100644 --- a/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeProjection.java +++ b/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeProjection.java @@ -38,6 +38,7 @@ import org.hibernate.type.SqlTypes; @Table(schema = SchemaNames.TED, name = "ted_notice_projection", indexes = { @Index(name = "idx_ted_proj_document", columnList = "document_id"), @Index(name = "idx_ted_proj_legacy_doc", columnList = "legacy_procurement_document_id"), + @Index(name = "idx_ted_proj_package_identifier", columnList = "package_identifier"), @Index(name = "idx_ted_proj_publication_id", columnList = "publication_id"), @Index(name = "idx_ted_proj_notice_type", columnList = "notice_type"), @Index(name = "idx_ted_proj_buyer_country", columnList = "buyer_country_code"), @@ -61,13 +62,16 @@ public class TedNoticeProjection { @Column(name = "legacy_procurement_document_id", unique = true) private UUID legacyProcurementDocumentId; + @Column(name = "package_identifier", length = 32) + private String packageIdentifier; + @Column(name = "notice_id", length = 100) private String noticeId; @Column(name = "publication_id", length = 50) private String publicationId; - @Column(name = "notice_url", length = 255) + @Column(name = "notice_url", columnDefinition = "TEXT") private String noticeUrl; @Column(name = "ojs_id", length = 20) diff --git a/src/main/java/at/procon/dip/domain/ted/repository/TedNoticeProjectionRepository.java 
b/src/main/java/at/procon/dip/domain/ted/repository/TedNoticeProjectionRepository.java index dcc16cc..af2c517 100644 --- a/src/main/java/at/procon/dip/domain/ted/repository/TedNoticeProjectionRepository.java +++ b/src/main/java/at/procon/dip/domain/ted/repository/TedNoticeProjectionRepository.java @@ -9,7 +9,9 @@ public interface TedNoticeProjectionRepository extends JpaRepository findByDocument_Id(UUID documentId); - Optional findByLegacyProcurementDocumentId(UUID legacyProcurementDocumentId); + Optional findByLegacyProcurementDocumentId(UUID legacyProcurementDocumentId); + + Optional findByPackageIdentifierAndPublicationId(String packageIdentifier, String publicationId); boolean existsByLegacyProcurementDocumentId(UUID legacyProcurementDocumentId); } diff --git a/src/main/java/at/procon/dip/domain/ted/service/TedNoticeProjectionService.java b/src/main/java/at/procon/dip/domain/ted/service/TedNoticeProjectionService.java index 0172c73..534eabb 100644 --- a/src/main/java/at/procon/dip/domain/ted/service/TedNoticeProjectionService.java +++ b/src/main/java/at/procon/dip/domain/ted/service/TedNoticeProjectionService.java @@ -17,6 +17,8 @@ import at.procon.ted.model.entity.ProcurementLot; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -31,6 +33,8 @@ import org.springframework.transaction.annotation.Transactional; @Slf4j public class TedNoticeProjectionService { + private static final Pattern PACKAGE_IDENTIFIER_PATTERN = Pattern.compile("(? DOC/projection backfill subsystem. */ + private boolean enabled = false; + + /** Run the backfill automatically on application startup in NEW runtime. */ + private boolean startupEnabled = false; + + /** Number of legacy TED documents to process per fetch batch. 
*/ + private int batchSize = 100; + + /** + * Optional cap for a single run. 0 or negative means unlimited until the full legacy set is backfilled. + * Useful to migrate incrementally in controlled slices. + */ + private long maxDocumentsPerRun = 0; + + /** Resume the latest STOPPED/FAILED run from its persisted cursor. */ + private boolean resumeLatestIncompleteRun = true; + + /** Import batch id written to DOC.doc_source rows created by the migration. */ + private String importBatchId = "legacy-ted-backfill"; + + /** Queue embeddings for migrated TED representations after the DOC/projection backfill. */ + private boolean queueEmbeddings = false; +} diff --git a/src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationCheckpoint.java b/src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationCheckpoint.java new file mode 100644 index 0000000..943bb88 --- /dev/null +++ b/src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationCheckpoint.java @@ -0,0 +1,75 @@ +package at.procon.dip.migration.entity; + +import at.procon.dip.architecture.SchemaNames; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.FetchType; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Index; +import jakarta.persistence.JoinColumn; +import jakarta.persistence.ManyToOne; +import jakarta.persistence.PrePersist; +import jakarta.persistence.Table; +import java.time.OffsetDateTime; +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Entity +@Table(schema = SchemaNames.DOC, name = "doc_legacy_ted_migration_checkpoint", indexes = { + @Index(name = "idx_doc_legacy_ted_mig_ckpt_run", columnList = "run_id"), + @Index(name = "idx_doc_legacy_ted_mig_ckpt_batch", columnList = "batch_number") +}) +@Getter +@Setter 
+@NoArgsConstructor +@AllArgsConstructor +@Builder +public class LegacyTedMigrationCheckpoint { + + @Id + @GeneratedValue(strategy = GenerationType.UUID) + private UUID id; + + @ManyToOne(fetch = FetchType.LAZY, optional = false) + @JoinColumn(name = "run_id", nullable = false) + private LegacyTedMigrationRun run; + + @Column(name = "batch_number", nullable = false) + private int batchNumber; + + @Column(name = "batch_processed_count", nullable = false) + private int batchProcessedCount; + + @Column(name = "cumulative_processed_count", nullable = false) + private long cumulativeProcessedCount; + + @Column(name = "last_legacy_created_at") + private OffsetDateTime lastLegacyCreatedAt; + + @Column(name = "last_legacy_document_id") + private UUID lastLegacyDocumentId; + + @Column(name = "last_doc_document_id") + private UUID lastDocDocumentId; + + @Column(name = "last_projection_id") + private UUID lastProjectionId; + + @Column(name = "note", columnDefinition = "TEXT") + private String note; + + @Builder.Default + @Column(name = "created_at", nullable = false, updatable = false) + private OffsetDateTime createdAt = OffsetDateTime.now(); + + @PrePersist + protected void onCreate() { + createdAt = OffsetDateTime.now(); + } +} diff --git a/src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationRun.java b/src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationRun.java new file mode 100644 index 0000000..4b25771 --- /dev/null +++ b/src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationRun.java @@ -0,0 +1,103 @@ +package at.procon.dip.migration.entity; + +import at.procon.dip.architecture.SchemaNames; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Index; +import jakarta.persistence.PrePersist; +import 
jakarta.persistence.PreUpdate; +import jakarta.persistence.Table; +import java.time.OffsetDateTime; +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Entity +@Table(schema = SchemaNames.DOC, name = "doc_legacy_ted_migration_run", indexes = { + @Index(name = "idx_doc_legacy_ted_mig_run_status", columnList = "status"), + @Index(name = "idx_doc_legacy_ted_mig_run_started", columnList = "started_at DESC") +}) +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class LegacyTedMigrationRun { + + @Id + @GeneratedValue(strategy = GenerationType.UUID) + private UUID id; + + @Enumerated(EnumType.STRING) + @Column(name = "status", nullable = false, length = 32) + private LegacyTedMigrationRunStatus status; + + @Column(name = "import_batch_id", length = 255) + private String importBatchId; + + @Column(name = "queue_embeddings", nullable = false) + private boolean queueEmbeddings; + + @Column(name = "batch_size", nullable = false) + private int batchSize; + + @Column(name = "max_documents_per_run") + private Long maxDocumentsPerRun; + + @Column(name = "processed_count", nullable = false) + @Builder.Default + private long processedCount = 0; + + @Column(name = "success_count", nullable = false) + @Builder.Default + private long successCount = 0; + + @Column(name = "failed_count", nullable = false) + @Builder.Default + private long failedCount = 0; + + @Column(name = "last_legacy_created_at") + private OffsetDateTime lastLegacyCreatedAt; + + @Column(name = "last_legacy_document_id") + private UUID lastLegacyDocumentId; + + @Column(name = "last_doc_document_id") + private UUID lastDocDocumentId; + + @Column(name = "last_projection_id") + private UUID lastProjectionId; + + @Column(name = "last_error", columnDefinition = "TEXT") + private String lastError; + + @Builder.Default + @Column(name = "started_at", nullable = false, updatable = false) 
+ private OffsetDateTime startedAt = OffsetDateTime.now(); + + @Builder.Default + @Column(name = "updated_at", nullable = false) + private OffsetDateTime updatedAt = OffsetDateTime.now(); + + @Column(name = "completed_at") + private OffsetDateTime completedAt; + + @PrePersist + protected void onCreate() { + startedAt = startedAt == null ? OffsetDateTime.now() : startedAt; + updatedAt = OffsetDateTime.now(); + } + + @PreUpdate + protected void onUpdate() { + updatedAt = OffsetDateTime.now(); + } +} diff --git a/src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationRunStatus.java b/src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationRunStatus.java new file mode 100644 index 0000000..2e9f938 --- /dev/null +++ b/src/main/java/at/procon/dip/migration/entity/LegacyTedMigrationRunStatus.java @@ -0,0 +1,8 @@ +package at.procon.dip.migration.entity; + +public enum LegacyTedMigrationRunStatus { + RUNNING, + STOPPED, + COMPLETED, + FAILED +} diff --git a/src/main/java/at/procon/dip/migration/repository/LegacyTedMigrationCheckpointRepository.java b/src/main/java/at/procon/dip/migration/repository/LegacyTedMigrationCheckpointRepository.java new file mode 100644 index 0000000..a84b6e4 --- /dev/null +++ b/src/main/java/at/procon/dip/migration/repository/LegacyTedMigrationCheckpointRepository.java @@ -0,0 +1,10 @@ +package at.procon.dip.migration.repository; + +import at.procon.dip.migration.entity.LegacyTedMigrationCheckpoint; +import java.util.UUID; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface LegacyTedMigrationCheckpointRepository extends JpaRepository { + + long countByRun_Id(UUID runId); +} diff --git a/src/main/java/at/procon/dip/migration/repository/LegacyTedMigrationRunRepository.java b/src/main/java/at/procon/dip/migration/repository/LegacyTedMigrationRunRepository.java new file mode 100644 index 0000000..d361bab --- /dev/null +++ 
b/src/main/java/at/procon/dip/migration/repository/LegacyTedMigrationRunRepository.java @@ -0,0 +1,23 @@ +package at.procon.dip.migration.repository; + +import at.procon.dip.migration.entity.LegacyTedMigrationRun; +import at.procon.dip.migration.entity.LegacyTedMigrationRunStatus; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +public interface LegacyTedMigrationRunRepository extends JpaRepository { + + @Query(""" + select r + from LegacyTedMigrationRun r + where r.status in :statuses + order by r.startedAt desc + """) + List findLatestByStatuses(@Param("statuses") Collection statuses, + Pageable pageable); +} diff --git a/src/main/java/at/procon/dip/migration/service/LegacyTedBackfillMigrationService.java b/src/main/java/at/procon/dip/migration/service/LegacyTedBackfillMigrationService.java new file mode 100644 index 0000000..ea2c985 --- /dev/null +++ b/src/main/java/at/procon/dip/migration/service/LegacyTedBackfillMigrationService.java @@ -0,0 +1,187 @@ +package at.procon.dip.migration.service; + +import at.procon.dip.migration.config.LegacyTedBackfillProperties; +import at.procon.dip.migration.entity.LegacyTedMigrationCheckpoint; +import at.procon.dip.migration.entity.LegacyTedMigrationRun; +import at.procon.dip.migration.entity.LegacyTedMigrationRunStatus; +import at.procon.dip.migration.repository.LegacyTedMigrationCheckpointRepository; +import at.procon.dip.migration.repository.LegacyTedMigrationRunRepository; +import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode; +import at.procon.dip.runtime.config.RuntimeMode; +import at.procon.ted.repository.LegacyTedMigrationCursor; +import at.procon.ted.repository.ProcurementDocumentRepository; +import java.time.OffsetDateTime; +import java.util.EnumSet; 
+import java.util.List; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.domain.PageRequest; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Service +@ConditionalOnRuntimeMode(RuntimeMode.NEW) +@RequiredArgsConstructor +@Slf4j +public class LegacyTedBackfillMigrationService { + + private final LegacyTedBackfillProperties properties; + private final ProcurementDocumentRepository procurementDocumentRepository; + private final LegacyTedMigrationRunRepository runRepository; + private final LegacyTedMigrationCheckpointRepository checkpointRepository; + private final LegacyTedBackfillWorker worker; + + public UUID runBackfill() { + if (!properties.isEnabled()) { + log.info("Legacy TED backfill is disabled"); + return null; + } + + LegacyTedMigrationRun run = resolveRun(); + log.info("Starting legacy TED -> DOC/projection backfill run {} (batchSize={}, maxDocumentsPerRun={}, queueEmbeddings={})", + run.getId(), run.getBatchSize(), run.getMaxDocumentsPerRun(), run.isQueueEmbeddings()); + + long existingCheckpointCount = checkpointRepository.countByRun_Id(run.getId()); + int batchNumber = existingCheckpointCount >= Integer.MAX_VALUE ? 
Integer.MAX_VALUE : (int) existingCheckpointCount; + long processedInThisInvocation = 0; + + while (true) { + int limit = effectiveBatchLimit(run, processedInThisInvocation); + if (limit <= 0) { + markStopped(run, "Stopped because maxDocumentsPerRun was reached"); + createCheckpoint(run, ++batchNumber, 0, "Invocation limit reached"); + return run.getId(); + } + + List batch = procurementDocumentRepository.findNextMigrationBatch( + run.getLastLegacyCreatedAt(), + run.getLastLegacyDocumentId(), + limit + ); + + if (batch.isEmpty()) { + markCompleted(run); + createCheckpoint(run, ++batchNumber, 0, "Backfill completed - no more legacy TED documents after current cursor"); + return run.getId(); + } + + int processedInBatch = 0; + for (LegacyTedMigrationCursor cursor : batch) { + try { + LegacyTedBackfillWorker.BackfillOutcome outcome = worker.backfill( + cursor.getId(), + run.getImportBatchId(), + run.isQueueEmbeddings() + ); + advanceRun(run, cursor, outcome); + processedInBatch++; + processedInThisInvocation++; + } catch (RuntimeException ex) { + markFailed(run, cursor, ex); + createCheckpoint(run, ++batchNumber, processedInBatch, + "Failed at legacy document %s: %s".formatted(cursor.getId(), ex.getMessage())); + throw ex; + } + } + createCheckpoint(run, ++batchNumber, processedInBatch, + "Processed %d legacy TED documents in batch".formatted(processedInBatch)); + } + } + + @Transactional + protected LegacyTedMigrationRun resolveRun() { + if (properties.isResumeLatestIncompleteRun()) { + List resumable = runRepository.findLatestByStatuses( + EnumSet.of(LegacyTedMigrationRunStatus.RUNNING, LegacyTedMigrationRunStatus.STOPPED, LegacyTedMigrationRunStatus.FAILED), + PageRequest.of(0, 1) + ); + if (!resumable.isEmpty()) { + LegacyTedMigrationRun run = resumable.get(0); + run.setStatus(LegacyTedMigrationRunStatus.RUNNING); + run.setLastError(null); + run.setCompletedAt(null); + return runRepository.save(run); + } + } + + return 
runRepository.save(LegacyTedMigrationRun.builder() + .status(LegacyTedMigrationRunStatus.RUNNING) + .importBatchId(properties.getImportBatchId()) + .queueEmbeddings(properties.isQueueEmbeddings()) + .batchSize(Math.max(1, properties.getBatchSize())) + .maxDocumentsPerRun(properties.getMaxDocumentsPerRun() > 0 ? properties.getMaxDocumentsPerRun() : null) + .build()); + } + + @Transactional + protected void advanceRun(LegacyTedMigrationRun run, + LegacyTedMigrationCursor cursor, + LegacyTedBackfillWorker.BackfillOutcome outcome) { + run.setStatus(LegacyTedMigrationRunStatus.RUNNING); + run.setProcessedCount(run.getProcessedCount() + 1); + run.setSuccessCount(run.getSuccessCount() + 1); + run.setLastLegacyCreatedAt(cursor.getCreatedAt()); + run.setLastLegacyDocumentId(cursor.getId()); + run.setLastDocDocumentId(outcome.documentId()); + run.setLastProjectionId(outcome.projectionId()); + run.setLastError(null); + runRepository.save(run); + } + + @Transactional + protected void createCheckpoint(LegacyTedMigrationRun run, int batchNumber, int processedInBatch, String note) { + checkpointRepository.save(LegacyTedMigrationCheckpoint.builder() + .run(run) + .batchNumber(batchNumber) + .batchProcessedCount(processedInBatch) + .cumulativeProcessedCount(run.getProcessedCount()) + .lastLegacyCreatedAt(run.getLastLegacyCreatedAt()) + .lastLegacyDocumentId(run.getLastLegacyDocumentId()) + .lastDocDocumentId(run.getLastDocDocumentId()) + .lastProjectionId(run.getLastProjectionId()) + .note(note) + .build()); + } + + @Transactional + protected void markCompleted(LegacyTedMigrationRun run) { + run.setStatus(LegacyTedMigrationRunStatus.COMPLETED); + run.setCompletedAt(OffsetDateTime.now()); + run.setLastError(null); + runRepository.save(run); + log.info("Legacy TED backfill run {} completed successfully - processed {} documents", + run.getId(), run.getProcessedCount()); + } + + @Transactional + protected void markStopped(LegacyTedMigrationRun run, String message) { + 
run.setStatus(LegacyTedMigrationRunStatus.STOPPED); + run.setLastError(message); + runRepository.save(run); + log.info("Legacy TED backfill run {} stopped after {} processed documents: {}", + run.getId(), run.getProcessedCount(), message); + } + + @Transactional + protected void markFailed(LegacyTedMigrationRun run, LegacyTedMigrationCursor cursor, RuntimeException ex) { + run.setStatus(LegacyTedMigrationRunStatus.FAILED); + run.setFailedCount(run.getFailedCount() + 1); + run.setLastError(ex.getMessage()); + runRepository.save(run); + log.error("Legacy TED backfill run {} failed at legacy document {}: {}", + run.getId(), cursor.getId(), ex.getMessage(), ex); + } + + private int effectiveBatchLimit(LegacyTedMigrationRun run, long processedInThisInvocation) { + long maxPerRun = run.getMaxDocumentsPerRun() != null ? run.getMaxDocumentsPerRun() : 0L; + if (maxPerRun <= 0) { + return Math.max(1, run.getBatchSize()); + } + long remaining = maxPerRun - processedInThisInvocation; + if (remaining <= 0) { + return 0; + } + return (int) Math.max(1L, Math.min(run.getBatchSize(), remaining)); + } +} diff --git a/src/main/java/at/procon/dip/migration/service/LegacyTedBackfillWorker.java b/src/main/java/at/procon/dip/migration/service/LegacyTedBackfillWorker.java new file mode 100644 index 0000000..bf99ea9 --- /dev/null +++ b/src/main/java/at/procon/dip/migration/service/LegacyTedBackfillWorker.java @@ -0,0 +1,372 @@ +package at.procon.dip.migration.service; + +import at.procon.dip.classification.spi.DetectionResult; +import at.procon.dip.domain.document.ContentRole; +import at.procon.dip.domain.document.DocumentFamily; +import at.procon.dip.domain.document.DocumentStatus; +import at.procon.dip.domain.document.DocumentType; +import at.procon.dip.domain.document.RepresentationType; +import at.procon.dip.domain.document.SourceType; +import at.procon.dip.domain.document.StorageType; +import at.procon.dip.domain.document.entity.Document; +import 
at.procon.dip.domain.document.entity.DocumentContent; +import at.procon.dip.domain.document.entity.DocumentSource; +import at.procon.dip.domain.document.entity.DocumentTextRepresentation; +import at.procon.dip.domain.document.repository.DocumentContentRepository; +import at.procon.dip.domain.document.repository.DocumentSourceRepository; +import at.procon.dip.domain.document.repository.DocumentTextRepresentationRepository; +import at.procon.dip.domain.document.service.DocumentContentService; +import at.procon.dip.domain.document.service.DocumentRepresentationService; +import at.procon.dip.domain.document.service.DocumentService; +import at.procon.dip.domain.document.service.command.AddDocumentContentCommand; +import at.procon.dip.domain.document.service.command.AddDocumentSourceCommand; +import at.procon.dip.domain.document.service.command.AddDocumentTextRepresentationCommand; +import at.procon.dip.domain.ted.service.TedGenericDocumentRootService; +import at.procon.dip.domain.ted.service.TedNoticeProjectionService; +import at.procon.dip.embedding.config.EmbeddingProperties; +import at.procon.dip.embedding.service.RepresentationEmbeddingOrchestrator; +import at.procon.dip.extraction.spi.ExtractedStructuredPayload; +import at.procon.dip.extraction.spi.ExtractionResult; +import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy; +import at.procon.dip.ingestion.spi.SourceDescriptor; +import at.procon.dip.normalization.service.TextRepresentationBuildService; +import at.procon.dip.normalization.spi.RepresentationBuildRequest; +import at.procon.dip.normalization.spi.TextRepresentationDraft; +import at.procon.dip.search.service.DocumentLexicalIndexService; +import at.procon.ted.model.entity.ProcurementDocument; +import at.procon.ted.repository.ProcurementDocumentRepository; +import at.procon.ted.util.HashUtils; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import 
java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.StringUtils; + +@Service +@RequiredArgsConstructor +@Slf4j +public class LegacyTedBackfillWorker { + + private static final String XML_MIME_TYPE = "application/xml"; + private static final String TEXT_MIME_TYPE = "text/plain"; + private static final String CHARSET_UTF8 = StandardCharsets.UTF_8.name(); + + private final ProcurementDocumentRepository procurementDocumentRepository; + private final TedGenericDocumentRootService tedGenericDocumentRootService; + private final TedNoticeProjectionService tedNoticeProjectionService; + private final DocumentService documentService; + private final DocumentSourceRepository sourceRepository; + private final at.procon.dip.domain.document.service.DocumentSourceService sourceService; + private final DocumentContentRepository contentRepository; + private final DocumentContentService documentContentService; + private final DocumentTextRepresentationRepository representationRepository; + private final DocumentRepresentationService documentRepresentationService; + private final DocumentLexicalIndexService lexicalIndexService; + private final TextRepresentationBuildService textRepresentationBuildService; + private final RepresentationEmbeddingOrchestrator embeddingOrchestrator; + private final EmbeddingProperties embeddingProperties; + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public BackfillOutcome backfill(UUID legacyProcurementDocumentId, String importBatchId, boolean queueEmbeddings) { + ProcurementDocument legacyDocument = procurementDocumentRepository.findById(legacyProcurementDocumentId) + .orElseThrow(() -> 
new IllegalArgumentException("Unknown legacy TED procurement document id: " + legacyProcurementDocumentId)); + + Document document = tedGenericDocumentRootService.ensureGenericTedDocument(legacyDocument); + ensureSource(document, legacyDocument, importBatchId); + DocumentContent originalContent = ensureOriginalXmlContent(document, legacyDocument); + DocumentContent normalizedTextContent = ensureNormalizedTextContent(document, legacyDocument); + + List drafts = buildDrafts(legacyDocument); + List savedRepresentations = ensureRepresentations(document, originalContent, normalizedTextContent, drafts); + + UUID projectionId = tedNoticeProjectionService.registerOrRefreshProjection(legacyDocument, document.getId()); + documentService.updateStatus(document.getId(), DocumentStatus.REPRESENTED); + + if (queueEmbeddings) { + queueEmbeddings(document.getId(), savedRepresentations); + } + + return new BackfillOutcome(document.getId(), projectionId); + } + + private void ensureSource(Document document, ProcurementDocument legacyDocument, String importBatchId) { + String externalSourceId = legacyDocument.getId().toString(); + DocumentSource existing = sourceRepository.findByDocument_Id(document.getId()).stream() + .filter(candidate -> candidate.getSourceType() == SourceType.MIGRATION || Objects.equals(candidate.getExternalSourceId(), externalSourceId)) + .findFirst() + .orElse(null); + if (existing == null) { + sourceService.addSource(new AddDocumentSourceCommand( + document.getId(), + SourceType.MIGRATION, + externalSourceId, + legacyDocument.getSourcePath(), + legacyDocument.getSourceFilename(), + null, + importBatchId, + legacyDocument.getCreatedAt() != null ? 
legacyDocument.getCreatedAt() : OffsetDateTime.now() + )); + return; + } + + existing.setSourceType(SourceType.MIGRATION); + existing.setExternalSourceId(externalSourceId); + existing.setSourceUri(legacyDocument.getSourcePath()); + existing.setSourceFilename(legacyDocument.getSourceFilename()); + existing.setImportBatchId(importBatchId); + existing.setReceivedAt(legacyDocument.getCreatedAt() != null ? legacyDocument.getCreatedAt() : existing.getReceivedAt()); + sourceRepository.save(existing); + } + + private DocumentContent ensureOriginalXmlContent(Document document, ProcurementDocument legacyDocument) { + String xml = legacyDocument.getXmlDocument(); + long sizeBytes = legacyDocument.getFileSizeBytes() != null + ? legacyDocument.getFileSizeBytes() + : (xml == null ? 0L : (long) xml.getBytes(StandardCharsets.UTF_8).length); + return ensureContent(document, ContentRole.ORIGINAL, XML_MIME_TYPE, xml, legacyDocument.getDocumentHash(), sizeBytes); + } + + private DocumentContent ensureNormalizedTextContent(Document document, ProcurementDocument legacyDocument) { + String normalizedText = resolveNormalizedText(legacyDocument); + String contentHash = HashUtils.computeSha256(legacyDocument.getDocumentHash() + ":NORMALIZED_TEXT:" + normalizedText); + return ensureContent(document, ContentRole.NORMALIZED_TEXT, TEXT_MIME_TYPE, normalizedText, contentHash, (long) normalizedText.length()); + } + + private DocumentContent ensureContent(Document document, + ContentRole role, + String mimeType, + String text, + String contentHash, + Long sizeBytes) { + List existing = contentRepository.findByDocument_IdAndContentRole(document.getId(), role); + if (!existing.isEmpty()) { + DocumentContent content = existing.get(0); + content.setStorageType(StorageType.DB_TEXT); + content.setMimeType(mimeType); + content.setCharsetName(CHARSET_UTF8); + content.setTextContent(text); + content.setBinaryContent(null); + content.setBinaryRef(null); + content.setContentHash(contentHash); + 
content.setSizeBytes(sizeBytes); + return contentRepository.save(content); + } + + return documentContentService.addContent(new AddDocumentContentCommand( + document.getId(), + role, + StorageType.DB_TEXT, + mimeType, + CHARSET_UTF8, + text, + null, + null, + contentHash, + sizeBytes + )); + } + + private List buildDrafts(ProcurementDocument legacyDocument) { + String normalizedText = resolveNormalizedText(legacyDocument); + + Map derivedText = new LinkedHashMap<>(); + derivedText.put(ContentRole.NORMALIZED_TEXT, normalizedText); + + SourceDescriptor sourceDescriptor = new SourceDescriptor( + null, + SourceType.MIGRATION, + legacyDocument.getId().toString(), + legacyDocument.getSourcePath(), + legacyDocument.getSourceFilename(), + XML_MIME_TYPE, + null, + legacyDocument.getXmlDocument(), + legacyDocument.getCreatedAt(), + OriginalContentStoragePolicy.DEFAULT, + Map.of("legacyProcurementDocumentId", legacyDocument.getId().toString()) + ); + + DetectionResult detectionResult = new DetectionResult( + DocumentType.TED_NOTICE, + DocumentFamily.PROCUREMENT, + XML_MIME_TYPE, + legacyDocument.getLanguageCode(), + Map.of() + ); + + ExtractionResult extractionResult = new ExtractionResult( + derivedText, + List.of(new ExtractedStructuredPayload("ted-notice", buildStructuredAttributes(legacyDocument))), + List.of() + ); + + return textRepresentationBuildService.build(new RepresentationBuildRequest( + sourceDescriptor, + detectionResult, + extractionResult + )); + } + + private List ensureRepresentations(Document document, + DocumentContent originalContent, + DocumentContent normalizedTextContent, + List drafts) { + if (drafts == null || drafts.isEmpty()) { + return List.of(); + } + + List existing = new ArrayList<>(representationRepository.findByDocument_Id(document.getId())); + existing.sort(Comparator.comparing(DocumentTextRepresentation::getCreatedAt, Comparator.nullsLast(Comparator.naturalOrder()))); + + List saved = new ArrayList<>(); + for (TextRepresentationDraft draft 
: drafts) { + if (!StringUtils.hasText(draft.textBody())) { + continue; + } + + DocumentTextRepresentation representation = findMatchingRepresentation(existing, draft) + .orElseGet(DocumentTextRepresentation::new); + + DocumentContent linkedContent = draft.sourceContentRole() == ContentRole.NORMALIZED_TEXT + ? normalizedTextContent + : originalContent; + + boolean isNew = representation.getId() == null; + representation.setDocument(document); + representation.setContent(linkedContent); + representation.setRepresentationType(draft.representationType()); + representation.setBuilderKey(draft.builderKey()); + representation.setLanguageCode(draft.languageCode()); + representation.setTokenCount(draft.textBody().length()); + representation.setCharCount(draft.textBody().length()); + representation.setChunkIndex(draft.chunkIndex()); + representation.setChunkStartOffset(draft.chunkStartOffset()); + representation.setChunkEndOffset(draft.chunkEndOffset()); + representation.setPrimaryRepresentation(draft.primary()); + representation.setTextBody(draft.textBody()); + + DocumentTextRepresentation savedRepresentation; + if (isNew) { + savedRepresentation = documentRepresentationService.addRepresentation(new AddDocumentTextRepresentationCommand( + document.getId(), + linkedContent == null ? 
null : linkedContent.getId(), + draft.representationType(), + draft.builderKey(), + draft.languageCode(), + draft.textBody().length(), + draft.chunkIndex(), + draft.chunkStartOffset(), + draft.chunkEndOffset(), + draft.primary(), + draft.textBody() + )); + existing.add(savedRepresentation); + } else { + savedRepresentation = representationRepository.save(representation); + lexicalIndexService.indexRepresentation(savedRepresentation.getId()); + } + saved.add(savedRepresentation); + } + return saved; + } + + private java.util.Optional findMatchingRepresentation(List existing, + TextRepresentationDraft draft) { + return existing.stream() + .filter(candidate -> candidate.getRepresentationType() == draft.representationType()) + .filter(candidate -> Objects.equals(candidate.getChunkIndex(), draft.chunkIndex())) + .filter(candidate -> Objects.equals(candidate.isPrimaryRepresentation(), draft.primary())) + .findFirst(); + } + + private void queueEmbeddings(UUID documentId, List representations) { + if (!embeddingProperties.isEnabled() || !StringUtils.hasText(embeddingProperties.getDefaultDocumentModel())) { + log.debug("Skipping embedding queue for migrated document {} because no default document model is configured", documentId); + return; + } + + for (DocumentTextRepresentation representation : representations) { + RepresentationType type = representation.getRepresentationType(); + boolean queue = switch (type) { + case SEMANTIC_TEXT -> true; + case CHUNK -> true; + default -> false; + }; + if (!queue) { + continue; + } + embeddingOrchestrator.enqueueRepresentation(documentId, representation.getId(), embeddingProperties.getDefaultDocumentModel()); + } + } + + private String resolveNormalizedText(ProcurementDocument legacyDocument) { + if (StringUtils.hasText(legacyDocument.getTextContent())) { + return legacyDocument.getTextContent().trim(); + } + + StringBuilder sb = new StringBuilder(); + appendLine(sb, legacyDocument.getProjectTitle()); + appendBlank(sb, 
legacyDocument.getProjectDescription()); + appendLine(sb, legacyDocument.getBuyerName()); + appendLine(sb, joinArray(legacyDocument.getCpvCodes())); + appendLine(sb, joinArray(legacyDocument.getNutsCodes())); + String fallback = sb.toString().trim(); + return StringUtils.hasText(fallback) ? fallback : legacyDocument.getDocumentHash(); + } + + private Map buildStructuredAttributes(ProcurementDocument legacyDocument) { + Map attrs = new LinkedHashMap<>(); + putIfText(attrs, "title", legacyDocument.getProjectTitle()); + putIfText(attrs, "description", legacyDocument.getProjectDescription()); + putIfText(attrs, "buyerName", legacyDocument.getBuyerName()); + putIfText(attrs, "cpvCodes", joinArray(legacyDocument.getCpvCodes())); + putIfText(attrs, "nutsCodes", joinArray(legacyDocument.getNutsCodes())); + putIfText(attrs, "publicationId", legacyDocument.getPublicationId()); + return attrs; + } + + private void putIfText(Map target, String key, String value) { + if (StringUtils.hasText(value)) { + target.put(key, value); + } + } + + private String joinArray(String[] values) { + if (values == null || values.length == 0) { + return null; + } + return String.join(", ", values); + } + + private void appendLine(StringBuilder sb, String value) { + if (StringUtils.hasText(value)) { + if (sb.length() > 0) { + sb.append('\n'); + } + sb.append(value.trim()); + } + } + + private void appendBlank(StringBuilder sb, String value) { + if (StringUtils.hasText(value)) { + if (sb.length() > 0) { + sb.append("\n\n"); + } + sb.append(value.trim()); + } + } + + public record BackfillOutcome(UUID documentId, UUID projectionId) { + } + +} diff --git a/src/main/java/at/procon/dip/migration/startup/LegacyTedBackfillStartupRunner.java b/src/main/java/at/procon/dip/migration/startup/LegacyTedBackfillStartupRunner.java new file mode 100644 index 0000000..7f93d80 --- /dev/null +++ b/src/main/java/at/procon/dip/migration/startup/LegacyTedBackfillStartupRunner.java @@ -0,0 +1,31 @@ +package 
at.procon.dip.migration.startup; + +import at.procon.dip.migration.config.LegacyTedBackfillProperties; +import at.procon.dip.migration.service.LegacyTedBackfillMigrationService; +import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode; +import at.procon.dip.runtime.config.RuntimeMode; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +@Component +@ConditionalOnRuntimeMode(RuntimeMode.NEW) +@RequiredArgsConstructor +@Slf4j +public class LegacyTedBackfillStartupRunner implements ApplicationRunner { + + private final LegacyTedBackfillProperties properties; + private final LegacyTedBackfillMigrationService migrationService; + + @Override + public void run(ApplicationArguments args) { + if (!properties.isEnabled() || !properties.isStartupEnabled()) { + return; + } + + log.info("Startup-triggered legacy TED backfill is enabled"); + migrationService.runBackfill(); + } +} diff --git a/src/main/java/at/procon/ted/repository/LegacyTedMigrationCursor.java b/src/main/java/at/procon/ted/repository/LegacyTedMigrationCursor.java new file mode 100644 index 0000000..a89b42d --- /dev/null +++ b/src/main/java/at/procon/ted/repository/LegacyTedMigrationCursor.java @@ -0,0 +1,9 @@ +package at.procon.ted.repository; + +import java.time.OffsetDateTime; +import java.util.UUID; + +public interface LegacyTedMigrationCursor { + UUID getId(); + OffsetDateTime getCreatedAt(); +} diff --git a/src/main/java/at/procon/ted/repository/ProcurementDocumentRepository.java b/src/main/java/at/procon/ted/repository/ProcurementDocumentRepository.java index e35f16f..e6268b0 100644 --- a/src/main/java/at/procon/ted/repository/ProcurementDocumentRepository.java +++ b/src/main/java/at/procon/ted/repository/ProcurementDocumentRepository.java @@ -209,6 +209,23 @@ public interface ProcurementDocumentRepository extends 
nativeQuery = true) List findByTextContentContaining(@Param("query") String query, Pageable pageable); + + /** + * Lightweight cursor query for resumable legacy -> DOC/projection backfill. + */ + @Query(value = """ + SELECT p.id AS id, p.created_at AS createdAt + FROM ted.procurement_document p + WHERE (:lastCreatedAt IS NULL + OR p.created_at > :lastCreatedAt + OR (p.created_at = :lastCreatedAt AND CAST(p.id AS text) > CAST(:lastId AS text))) + ORDER BY p.created_at ASC, CAST(p.id AS text) ASC + LIMIT :limit + """, nativeQuery = true) + List findNextMigrationBatch(@Param("lastCreatedAt") OffsetDateTime lastCreatedAt, + @Param("lastId") UUID lastId, + @Param("limit") int limit); + /** * Delete all documents created before the specified date. * Cascading deletes will automatically remove related lots, organizations, and logs. diff --git a/src/main/resources/application-legacy.yml b/src/main/resources/application-legacy.yml index 05a7529..5b71835 100644 --- a/src/main/resources/application-legacy.yml +++ b/src/main/resources/application-legacy.yml @@ -55,7 +55,7 @@ ted: # Packages download configuration download: # Enable/disable automatic package download - enabled: true + enabled: false # User service-based camel route use-service-based: false # Base URL for TED Daily Packages @@ -91,9 +91,9 @@ ted: # Enable one-off repair / re-import of incomplete TED packages on startup enabled: false # Only list candidate packages without modifying data - dry-run: false + dry-run: true # Safety cap for one startup run - max-packages: 100 + max-packages: 200 # Optional explicit package identifiers to repair package-identifiers: [] # Optional inclusive package range diff --git a/src/main/resources/application-new.yml b/src/main/resources/application-new.yml index 1bb47af..4a6dcf2 100644 --- a/src/main/resources/application-new.yml +++ b/src/main/resources/application-new.yml @@ -223,7 +223,7 @@ dip: # ted packages download configuration ted-download: # Enable/disable automatic 
package download - enabled: false + enabled: true # Base URL for TED Daily Packages base-url: https://ted.europa.eu/packages/daily/ # Download directory for tar.gz files @@ -231,7 +231,7 @@ dip: # Start year for downloads start-year: 2026 # Polling interval (milliseconds) - 2 minutes - poll-interval: 3600000 + poll-interval: 120000 # Retry interval for tail NOT_FOUND packages - 6 hours not-found-retry-interval: 21600000 # Grace period after year end before a previous-year tail 404 is treated as final @@ -246,11 +246,18 @@ dip: delay-between-downloads: 5000 # Delete tar.gz after ingestion delete-after-ingestion: true - + ted: # Phase 3 TED projection configuration + projection: + # Enable/disable dual-write into the TED projection model on top of DOC.doc_document + enabled: true + # Optional startup backfill for legacy TED documents without a projection row yet + startup-backfill-enabled: false + # Maximum number of legacy TED documents to backfill during startup + startup-backfill-limit: 250 migration: legacy-audit: # Enable/disable the Wave 1 / Milestone A legacy integrity audit subsystem - enabled: true + enabled: false # Optional startup execution; the audit is read-only and only writes audit run/finding tables startup-run-enabled: true # Maximum number of legacy TED documents to scan during startup (0 = all) @@ -262,12 +269,18 @@ dip: # Maximum number of grouped duplicate samples captured for aggregate checks max-duplicate-samples: 100 - ted: # Phase 3 TED projection configuration - projection: - # Enable/disable dual-write into the TED projection model on top of DOC.doc_document - enabled: true - # Optional startup backfill for legacy TED documents without a projection row yet - startup-backfill-enabled: false - # Maximum number of legacy TED documents to backfill during startup - startup-backfill-limit: 250 - + legacy-ted: + # Enable the resumable legacy TED -> DOC/projection backfill subsystem + enabled: false + # Run the backfill automatically on 
NEW-runtime startup + startup-enabled: false + # Number of legacy TED documents fetched and processed per batch + batch-size: 100 + # Optional cap for a single invocation; 0 means migrate all remaining rows + max-documents-per-run: 0 + # Resume the latest STOPPED/FAILED run from its saved cursor + resume-latest-incomplete-run: true + # Import batch id written to DOC.doc_source rows created by the migration + import-batch-id: legacy-ted-backfill + # Keep false for Wave 1; embeddings can be backfilled later as a separate step + queue-embeddings: false \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 470a90c..3d903d2 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,7 +2,7 @@ # Author: Martin.Schweitzer@procon.co.at and claude.ai server: - port: 8889 + port: 8885 servlet: context-path: /api @@ -14,9 +14,9 @@ spring: name: document-intelligence-platform datasource: - url: jdbc:postgresql://94.130.218.54:32333/RELM + url: jdbc:postgresql://localhost:5432/RELM username: ${DB_USERNAME:postgres} - password: ${DB_PASSWORD:PDmXRx0Rbk9OFOn9qO5Gm/mPCfqW8zwbZ+/YIU1lySc=} + password: ${DB_PASSWORD:changeit} driver-class-name: org.postgresql.Driver hikari: maximum-pool-size: 5 @@ -28,7 +28,7 @@ spring: jpa: hibernate: - ddl-auto: validate + ddl-auto: validate show-sql: false open-in-view: false properties: diff --git a/src/main/resources/db/migration/V12__ted_projection_add_package_identifier.sql b/src/main/resources/db/migration/V12__ted_projection_add_package_identifier.sql new file mode 100644 index 0000000..ba5be04 --- /dev/null +++ b/src/main/resources/db/migration/V12__ted_projection_add_package_identifier.sql @@ -0,0 +1,21 @@ +-- Store the TED daily package identifier directly on the Phase 3 TED notice projection. +-- This makes migration, audit, and repair flows package-aware without having to derive the +-- package membership from source paths at query time.
+ +SET search_path TO TED, DOC, public; + +ALTER TABLE IF EXISTS TED.ted_notice_projection + ADD COLUMN IF NOT EXISTS package_identifier VARCHAR(32); + +CREATE INDEX IF NOT EXISTS idx_ted_notice_projection_package_identifier + ON TED.ted_notice_projection(package_identifier); + +-- Backfill from the linked legacy TED document's source path when the path contains the +-- extracted package directory, e.g. .../extracted/202600003/123.xml +UPDATE TED.ted_notice_projection projection +SET package_identifier = substring(legacy.source_path from '(?:^|[\\/])((?:19|20)[0-9]{7})(?:[\\/]|$)') +FROM TED.procurement_document legacy +WHERE projection.legacy_procurement_document_id = legacy.id + AND projection.package_identifier IS NULL + AND legacy.source_path IS NOT NULL + AND substring(legacy.source_path from '(?:^|[\\/])((?:19|20)[0-9]{7})(?:[\\/]|$)') IS NOT NULL; diff --git a/src/main/resources/db/migration/V20__ted_projection_package_and_legacy_backfill.sql b/src/main/resources/db/migration/V20__ted_projection_package_and_legacy_backfill.sql new file mode 100644 index 0000000..8893fee --- /dev/null +++ b/src/main/resources/db/migration/V20__ted_projection_package_and_legacy_backfill.sql @@ -0,0 +1,72 @@ +SET search_path TO TED, DOC, public; + +ALTER TABLE IF EXISTS TED.ted_notice_projection + ADD COLUMN IF NOT EXISTS package_identifier VARCHAR(32); + +CREATE INDEX IF NOT EXISTS idx_ted_notice_projection_package_identifier + ON TED.ted_notice_projection(package_identifier); + +ALTER TABLE IF EXISTS TED.organization + ALTER COLUMN city TYPE TEXT; + +ALTER TABLE IF EXISTS TED.procurement_document + ALTER COLUMN buyer_city TYPE TEXT; + +ALTER TABLE IF EXISTS TED.ted_notice_organization + ALTER COLUMN city TYPE TEXT; + +ALTER TABLE IF EXISTS TED.ted_notice_projection + ALTER COLUMN buyer_city TYPE TEXT; + +UPDATE TED.ted_notice_projection p +SET package_identifier = substring(coalesce(d.source_path, d.source_filename) from '(20[0-9]{2}[0-9]{5})') +FROM TED.procurement_document
d +WHERE p.legacy_procurement_document_id = d.id + AND p.package_identifier IS NULL + AND substring(coalesce(d.source_path, d.source_filename) from '(20[0-9]{2}[0-9]{5})') IS NOT NULL; + +SET search_path TO DOC, public; + +CREATE TABLE IF NOT EXISTS DOC.doc_legacy_ted_migration_run ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + status VARCHAR(32) NOT NULL, + import_batch_id VARCHAR(255), + queue_embeddings BOOLEAN NOT NULL DEFAULT FALSE, + batch_size INTEGER NOT NULL DEFAULT 100, + max_documents_per_run BIGINT, + processed_count BIGINT NOT NULL DEFAULT 0, + success_count BIGINT NOT NULL DEFAULT 0, + failed_count BIGINT NOT NULL DEFAULT 0, + last_legacy_created_at TIMESTAMP WITH TIME ZONE, + last_legacy_document_id UUID, + last_doc_document_id UUID, + last_projection_id UUID, + last_error TEXT, + started_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + completed_at TIMESTAMP WITH TIME ZONE +); + +CREATE INDEX IF NOT EXISTS idx_doc_legacy_ted_mig_run_status + ON DOC.doc_legacy_ted_migration_run(status); +CREATE INDEX IF NOT EXISTS idx_doc_legacy_ted_mig_run_started + ON DOC.doc_legacy_ted_migration_run(started_at DESC); + +CREATE TABLE IF NOT EXISTS DOC.doc_legacy_ted_migration_checkpoint ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + run_id UUID NOT NULL REFERENCES DOC.doc_legacy_ted_migration_run(id) ON DELETE CASCADE, + batch_number INTEGER NOT NULL, + batch_processed_count INTEGER NOT NULL DEFAULT 0, + cumulative_processed_count BIGINT NOT NULL DEFAULT 0, + last_legacy_created_at TIMESTAMP WITH TIME ZONE, + last_legacy_document_id UUID, + last_doc_document_id UUID, + last_projection_id UUID, + note TEXT, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_doc_legacy_ted_mig_ckpt_run + ON DOC.doc_legacy_ted_migration_checkpoint(run_id); +CREATE INDEX IF NOT EXISTS idx_doc_legacy_ted_mig_ckpt_batch + ON 
DOC.doc_legacy_ted_migration_checkpoint(batch_number);