From d0a8d4408213e7704d7002f30d6f4c18ee3fb6c3 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Sun, 3 May 2026 01:09:48 +0200 Subject: [PATCH] Fix async ingest wait package key lookup --- .../AbstractImportExecutionService.java | 201 ++++++++++++++++-- 1 file changed, 180 insertions(+), 21 deletions(-) diff --git a/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java b/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java index 3e2ffca..ed5444a 100644 --- a/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java +++ b/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java @@ -1,12 +1,16 @@ package at.procon.eventhub.importing; +import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.ImportRunStatus; import at.procon.eventhub.importing.persistence.ImportCursorRepository; import at.procon.eventhub.importing.persistence.ImportRunRepository; +import at.procon.eventhub.persistence.DataPackageRepository.CamelBatchGroupStatus; import at.procon.eventhub.persistence.DataPackageRepository; import at.procon.eventhub.persistence.EventSourceRepository; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -22,23 +26,31 @@ import org.slf4j.LoggerFactory; */ public abstract class AbstractImportExecutionService { + private static final Duration ASYNC_INGEST_AWAIT_TIMEOUT = Duration.ofHours(6); + private static final Duration ASYNC_INGEST_POLL_INTERVAL = Duration.ofSeconds(2); + private static final Duration ASYNC_INGEST_FAILURE_GRACE_PERIOD = Duration.ofSeconds(90); + private static final Duration ASYNC_INGEST_PROGRESS_LOG_INTERVAL = Duration.ofSeconds(30); + private final Logger log = LoggerFactory.getLogger(getClass()); private final EventSourceRepository eventSourceRepository; private final ImportRunRepository importRunRepository; private final DataPackageRepository dataPackageRepository; private final ImportCursorRepository importCursorRepository; + private final EventHubProperties eventHubProperties; protected AbstractImportExecutionService( EventSourceRepository eventSourceRepository, ImportRunRepository importRunRepository, DataPackageRepository dataPackageRepository, - ImportCursorRepository importCursorRepository + ImportCursorRepository importCursorRepository, + EventHubProperties eventHubProperties ) { this.eventSourceRepository = eventSourceRepository; this.importRunRepository = importRunRepository; this.dataPackageRepository = dataPackageRepository; this.importCursorRepository = importCursorRepository; + this.eventHubProperties = eventHubProperties; } protected ImportRunResultDto createImportRun(R request, boolean executeImmediately) { @@ -165,26 +177,32 @@ public abstract class AbstractImportExecutionService results = new ArrayList<>(); for (PlannedPackage plannedPackage : plannedPackages) { dataPackageRepository.markImporting(plannedPackage.packageId()); - B result = executeBatch( - importRunId, - plannedPackage.packageId(), - plannedPackage.eventSourceId(), - request, - plannedPackage.planItem(), - plannedPackage.chunk() - ); - results.add(result); - dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted()); - if (result.executed()) { - importCursorRepository.advanceCursor( - request.tenantKey(), + try { + B result = executeBatch( + importRunId, + plannedPackage.packageId(), plannedPackage.eventSourceId(), - request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(), - plannedPackage.planItem().eventFamily(), - plannedPackage.planItem().sourceKind(), - request.acquisitionStrategy(), - result + request, + plannedPackage.planItem(), + plannedPackage.chunk() ); + awaitAsyncIngestCompletion(importRunId, request, plannedPackage, result); + results.add(result); + dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted()); + if (result.executed()) { + importCursorRepository.advanceCursor( + request.tenantKey(), + plannedPackage.eventSourceId(), + request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(), + plannedPackage.planItem().eventFamily(), + plannedPackage.planItem().sourceKind(), + request.acquisitionStrategy(), + result + ); + } + } catch (RuntimeException ex) { + dataPackageRepository.markFailed(plannedPackage.packageId(), ex.getMessage()); + throw ex; } } importRunRepository.markCompleted(importRunId); @@ -196,6 +214,88 @@ public abstract class AbstractImportExecutionService= expectedCamelBatches && state.successCount() >= expectedCamelBatches) { + return; + } + + boolean stateChanged = previousState == null || !previousState.equals(state); + if (state.failedCount() > 0 && stateChanged) { + failedStateObservedAt = Instant.now(); + } else if (state.failedCount() == 0) { + failedStateObservedAt = null; + } + + if (state.failedCount() > 0 + && failedStateObservedAt != null + && Instant.now().isAfter(failedStateObservedAt.plus(ASYNC_INGEST_FAILURE_GRACE_PERIOD))) { + throw new IllegalStateException( + "Async EventHub ingest failed for importRunId=" + importRunId + + " aggregatePackageKey=" + aggregatePackageKey + + " expectedCamelBatches=" + expectedCamelBatches + + " observedCamelBatches=" + state.totalCount() + + " failedCamelBatches=" + state.failedCount() + + " failedMessage=" + state.failedMessage() + ); + } + + if (Instant.now().isAfter(nextProgressLogAt)) { + log.info("Waiting for async EventHub ingest provider={} importRunId={} extractionPackageId={} aggregatePackageKey={} expectedCamelBatches={} observedCamelBatches={} successfulCamelBatches={} failedCamelBatches={} importingCamelBatches={}", + providerPackagePrefix(), + importRunId, + plannedPackage.packageId(), + aggregatePackageKey, + expectedCamelBatches, + state.totalCount(), + state.successCount(), + state.failedCount(), + state.importingCount()); + nextProgressLogAt = Instant.now().plus(ASYNC_INGEST_PROGRESS_LOG_INTERVAL); + } + + previousState = state; + sleepQuietly(ASYNC_INGEST_POLL_INTERVAL); + } + + CamelBatchGroupStatus finalState = dataPackageRepository.findCamelBatchGroupStatus( + plannedPackage.eventSourceId(), + request.tenantKey(), + aggregatePackageKey + ); + throw new IllegalStateException( + "Timed out waiting for async EventHub ingest for importRunId=" + importRunId + + " aggregatePackageKey=" + aggregatePackageKey + + " expectedCamelBatches=" + expectedCamelBatches + + " observedCamelBatches=" + finalState.totalCount() + + " successfulCamelBatches=" + finalState.successCount() + + " failedCamelBatches=" + finalState.failedCount() + ); + } + private EventHubPackageRequest packageRequestFor( R request, EventSourceDto itemEventSource, @@ -206,13 +306,72 @@ public abstract class AbstractImportExecutionService