Fix async ingest wait package key lookup

This commit is contained in:
trifonovt 2026-05-03 01:09:48 +02:00
parent bd3620b9af
commit d0a8d44082
1 changed files with 180 additions and 21 deletions

View File

@ -1,12 +1,16 @@
package at.procon.eventhub.importing;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportRunStatus;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.importing.persistence.ImportRunRepository;
import at.procon.eventhub.persistence.DataPackageRepository.CamelBatchGroupStatus;
import at.procon.eventhub.persistence.DataPackageRepository;
import at.procon.eventhub.persistence.EventSourceRepository;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@ -22,23 +26,31 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractImportExecutionService<R extends ImportRunRequest, B extends ExtractionBatchResult> {
private static final Duration ASYNC_INGEST_AWAIT_TIMEOUT = Duration.ofHours(6);
private static final Duration ASYNC_INGEST_POLL_INTERVAL = Duration.ofSeconds(2);
private static final Duration ASYNC_INGEST_FAILURE_GRACE_PERIOD = Duration.ofSeconds(90);
private static final Duration ASYNC_INGEST_PROGRESS_LOG_INTERVAL = Duration.ofSeconds(30);
private final Logger log = LoggerFactory.getLogger(getClass());
private final EventSourceRepository eventSourceRepository;
private final ImportRunRepository importRunRepository;
private final DataPackageRepository dataPackageRepository;
private final ImportCursorRepository importCursorRepository;
private final EventHubProperties eventHubProperties;
protected AbstractImportExecutionService(
EventSourceRepository eventSourceRepository,
ImportRunRepository importRunRepository,
DataPackageRepository dataPackageRepository,
ImportCursorRepository importCursorRepository
ImportCursorRepository importCursorRepository,
EventHubProperties eventHubProperties
) {
this.eventSourceRepository = eventSourceRepository;
this.importRunRepository = importRunRepository;
this.dataPackageRepository = dataPackageRepository;
this.importCursorRepository = importCursorRepository;
this.eventHubProperties = eventHubProperties;
}
protected ImportRunResultDto createImportRun(R request, boolean executeImmediately) {
@ -165,6 +177,7 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
List<B> results = new ArrayList<>();
for (PlannedPackage plannedPackage : plannedPackages) {
dataPackageRepository.markImporting(plannedPackage.packageId());
try {
B result = executeBatch(
importRunId,
plannedPackage.packageId(),
@ -173,6 +186,7 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
plannedPackage.planItem(),
plannedPackage.chunk()
);
awaitAsyncIngestCompletion(importRunId, request, plannedPackage, result);
results.add(result);
dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted());
if (result.executed()) {
@ -186,6 +200,10 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
result
);
}
} catch (RuntimeException ex) {
dataPackageRepository.markFailed(plannedPackage.packageId(), ex.getMessage());
throw ex;
}
}
importRunRepository.markCompleted(importRunId);
log.info("Completed import run provider={} importRunId={} packages={} insertedEvents={} executedBatches={}",
@ -196,6 +214,88 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
results.stream().filter(ExtractionBatchResult::executed).count());
}
private void awaitAsyncIngestCompletion(UUID importRunId, R request, PlannedPackage plannedPackage, B result) {
int expectedCamelBatches = expectedCamelBatchCount(result.eventsInserted());
if (expectedCamelBatches <= 0) {
return;
}
EventHubPackageRequest packageInfo = extractionAggregatePackageInfo(
importRunId,
request,
eventSourceForItem(request.eventSource(), plannedPackage.planItem()),
plannedPackage.planItem(),
plannedPackage.chunk()
);
String aggregatePackageKey = aggregatePackageKey(packageInfo);
Instant deadline = Instant.now().plus(ASYNC_INGEST_AWAIT_TIMEOUT);
Instant nextProgressLogAt = Instant.now().plus(ASYNC_INGEST_PROGRESS_LOG_INTERVAL);
Instant failedStateObservedAt = null;
CamelBatchGroupStatus previousState = null;
while (Instant.now().isBefore(deadline)) {
CamelBatchGroupStatus state = dataPackageRepository.findCamelBatchGroupStatus(
plannedPackage.eventSourceId(),
request.tenantKey(),
aggregatePackageKey
);
if (state.totalCount() >= expectedCamelBatches && state.successCount() >= expectedCamelBatches) {
return;
}
boolean stateChanged = previousState == null || !previousState.equals(state);
if (state.failedCount() > 0 && stateChanged) {
failedStateObservedAt = Instant.now();
} else if (state.failedCount() == 0) {
failedStateObservedAt = null;
}
if (state.failedCount() > 0
&& failedStateObservedAt != null
&& Instant.now().isAfter(failedStateObservedAt.plus(ASYNC_INGEST_FAILURE_GRACE_PERIOD))) {
throw new IllegalStateException(
"Async EventHub ingest failed for importRunId=" + importRunId
+ " aggregatePackageKey=" + aggregatePackageKey
+ " expectedCamelBatches=" + expectedCamelBatches
+ " observedCamelBatches=" + state.totalCount()
+ " failedCamelBatches=" + state.failedCount()
+ " failedMessage=" + state.failedMessage()
);
}
if (Instant.now().isAfter(nextProgressLogAt)) {
log.info("Waiting for async EventHub ingest provider={} importRunId={} extractionPackageId={} aggregatePackageKey={} expectedCamelBatches={} observedCamelBatches={} successfulCamelBatches={} failedCamelBatches={} importingCamelBatches={}",
providerPackagePrefix(),
importRunId,
plannedPackage.packageId(),
aggregatePackageKey,
expectedCamelBatches,
state.totalCount(),
state.successCount(),
state.failedCount(),
state.importingCount());
nextProgressLogAt = Instant.now().plus(ASYNC_INGEST_PROGRESS_LOG_INTERVAL);
}
previousState = state;
sleepQuietly(ASYNC_INGEST_POLL_INTERVAL);
}
CamelBatchGroupStatus finalState = dataPackageRepository.findCamelBatchGroupStatus(
plannedPackage.eventSourceId(),
request.tenantKey(),
aggregatePackageKey
);
throw new IllegalStateException(
"Timed out waiting for async EventHub ingest for importRunId=" + importRunId
+ " aggregatePackageKey=" + aggregatePackageKey
+ " expectedCamelBatches=" + expectedCamelBatches
+ " observedCamelBatches=" + finalState.totalCount()
+ " successfulCamelBatches=" + finalState.successCount()
+ " failedCamelBatches=" + finalState.failedCount()
);
}
private EventHubPackageRequest packageRequestFor(
R request,
EventSourceDto itemEventSource,
@ -206,13 +306,72 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
request.tenantKey(),
itemEventSource,
request.sourceGroup(),
request.importScope(),
chunkScope(request.importScope(), chunk),
item.eventFamily().name(),
null,
chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(),
externalPackageId(request, item, chunk)
);
}
private EventHubPackageRequest extractionAggregatePackageInfo(
UUID importRunId,
R request,
EventSourceDto itemEventSource,
ImportPlanItemDto item,
ImportTimeChunkDto chunk
) {
return new EventHubPackageRequest(
request.tenantKey(),
itemEventSource,
request.sourceGroup(),
chunkScope(request.importScope(), chunk),
item.eventFamily().name(),
chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(),
providerPackagePrefix() + ":" + item.sourceKind() + ":" + item.extractionCode()
+ ":RUN-" + importRunId + ":CHUNK-" + chunk.sequence()
);
}
private int expectedCamelBatchCount(int extractedEventCount) {
if (extractedEventCount <= 0) {
return 0;
}
int completionSize = Math.max(1, eventHubProperties.getBatch().getCompletionSize());
return ((extractedEventCount - 1) / completionSize) + 1;
}
private String aggregatePackageKey(EventHubPackageRequest packageInfo) {
return packageInfo.tenantKey()
+ ":" + packageInfo.eventSource().stableKey()
+ ":" + (packageInfo.sourceGroup() == null ? "NO_GROUP" : packageInfo.sourceGroup().stableKey())
+ ":" + (packageInfo.importScope() == null ? "NO_SCOPE" : packageInfo.importScope().stableKey())
+ ":" + packageInfo.eventFamily()
+ ":" + (packageInfo.businessDate() == null ? "NO_DATE" : packageInfo.businessDate())
+ ":" + packageInfo.externalPackageId();
}
private at.procon.eventhub.dto.ImportScopeDto chunkScope(at.procon.eventhub.dto.ImportScopeDto scope, ImportTimeChunkDto chunk) {
if (scope == null) {
return at.procon.eventhub.dto.ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo());
}
return new at.procon.eventhub.dto.ImportScopeDto(
scope.type(),
scope.rootSourceOrganisation(),
scope.includeChildren(),
chunk.occurredFrom(),
chunk.occurredTo()
);
}
private void sleepQuietly(Duration duration) {
try {
Thread.sleep(duration.toMillis());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted while waiting for async EventHub ingest", ex);
}
}
private record PlannedPackage(UUID packageId, int eventSourceId, ImportPlanItemDto planItem, ImportTimeChunkDto chunk) {
}
}