diff --git a/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java b/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java new file mode 100644 index 0000000..3e2ffca --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java @@ -0,0 +1,218 @@ +package at.procon.eventhub.importing; + +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportRunStatus; +import at.procon.eventhub.importing.persistence.ImportCursorRepository; +import at.procon.eventhub.importing.persistence.ImportRunRepository; +import at.procon.eventhub.persistence.DataPackageRepository; +import at.procon.eventhub.persistence.EventSourceRepository; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Shared import-run orchestration for source providers. Provider services supply + * the plan and the concrete batch executor; this class owns the common EventHub + * run, package, and cursor lifecycle. + */ +public abstract class AbstractImportExecutionService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final EventSourceRepository eventSourceRepository; + private final ImportRunRepository importRunRepository; + private final DataPackageRepository dataPackageRepository; + private final ImportCursorRepository importCursorRepository; + + protected AbstractImportExecutionService( + EventSourceRepository eventSourceRepository, + ImportRunRepository importRunRepository, + DataPackageRepository dataPackageRepository, + ImportCursorRepository importCursorRepository + ) { + this.eventSourceRepository = eventSourceRepository; + this.importRunRepository = importRunRepository; + this.dataPackageRepository = dataPackageRepository; + this.importCursorRepository = importCursorRepository; + } + + protected ImportRunResultDto createImportRun(R request, boolean executeImmediately) { + ImportPlanDto plan = createPlan(request); + int baseEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), request.eventSource()); + UUID importRunId = importRunRepository.createPlannedRun( + baseEventSourceId, + request, + importRunMetadata(request, executeImmediately) + ); + + List packageIds = new ArrayList<>(); + List plannedPackages = new ArrayList<>(); + int batchNo = 1; + try { + for (ImportPlanItemDto item : plan.items()) { + EventSourceDto itemEventSource = eventSourceForItem(request.eventSource(), item); + int itemEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), itemEventSource); + for (ImportTimeChunkDto chunk : plan.chunks()) { + EventHubPackageRequest packageInfo = packageRequestFor(request, itemEventSource, item, chunk); + String packageKey = packageKey(importRunId, packageInfo, item, chunk, batchNo); + UUID packageId = dataPackageRepository.createPlannedExtractionPackage( + importRunId, + itemEventSourceId, + packageKey, + packageInfo, + item.extractionCode(), + item.sourceKind(), + item.entityAxis(), + chunk == null ? null : chunk.occurredFrom(), + chunk == null ? null : chunk.occurredTo(), + batchNo, + packageMetadata(request, item, chunk, importRunId) + ); + packageIds.add(packageId); + plannedPackages.add(new PlannedPackage(packageId, itemEventSourceId, item, chunk)); + batchNo++; + } + } + importRunRepository.markPlannedPackages(importRunId, packageIds.size()); + log.info("Created import run provider={} importRunId={} plannedPackages={} tenant={} mode={} strategy={} executeImmediately={}", + providerPackagePrefix(), importRunId, packageIds.size(), request.tenantKey(), request.mode(), request.acquisitionStrategy(), executeImmediately); + + if (executeImmediately) { + executePlannedPackages(importRunId, request, plannedPackages); + return new ImportRunResultDto(importRunId, ImportRunStatus.COMPLETED, packageIds.size(), plan, List.copyOf(packageIds)); + } + return new ImportRunResultDto(importRunId, ImportRunStatus.PLANNED, packageIds.size(), plan, List.copyOf(packageIds)); + } catch (RuntimeException ex) { + importRunRepository.markFailed(importRunId, ex.getMessage()); + throw ex; + } + } + + protected abstract ImportPlanDto createPlan(R request); + + protected abstract B executeBatch( + UUID importRunId, + UUID packageId, + int eventSourceId, + R request, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk + ); + + protected void beforeExecute(R request) { + } + + protected EventSourceDto eventSourceForItem(EventSourceDto base, ImportPlanItemDto item) { + return base; + } + + protected Map importRunMetadata(R request, boolean executeImmediately) { + return Map.of( + "note", executeImmediately + ? "Created import run and executing planned extraction packages." + : "Created import run and planned extraction packages.", + "packageModel", "EventHub data packages are extraction batches; original source packages are SourcePackageRefDto.", + "executeImmediately", executeImmediately + ); + } + + protected Map packageMetadata(R request, ImportPlanItemDto item, ImportTimeChunkDto chunk, UUID importRunId) { + Map metadata = new LinkedHashMap<>(); + metadata.put("importRunId", importRunId.toString()); + metadata.put("mode", request.mode().name()); + metadata.put("acquisitionStrategy", request.acquisitionStrategy().name()); + metadata.put("refreshMasterDataFirst", request.refreshMasterDataFirst()); + metadata.put("eventFamily", item.eventFamily().name()); + metadata.put("sourceKind", item.sourceKind()); + metadata.put("extractionCode", item.extractionCode()); + metadata.put("sourceTables", item.sourceTables()); + metadata.put("entityAxis", item.entityAxis()); + metadata.put("chunkSequence", chunk.sequence()); + metadata.put("chunkOccurredFrom", chunk.occurredFrom() == null ? null : chunk.occurredFrom().toString()); + metadata.put("chunkOccurredTo", chunk.occurredTo() == null ? null : chunk.occurredTo().toString()); + metadata.put("sourcePackageRefPolicy", "Original source package is preserved per acquired event when extraction returns it."); + return metadata; + } + + protected String providerPackagePrefix() { + return "SOURCE"; + } + + protected String externalPackageId(R request, ImportPlanItemDto item, ImportTimeChunkDto chunk) { + String scope = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(); + return providerPackagePrefix() + ":" + item.sourceKind() + ":" + item.extractionCode() + ":" + item.eventFamily() + + ":" + scope + ":CHUNK-" + chunk.sequence(); + } + + protected String packageKey(UUID importRunId, EventHubPackageRequest packageInfo, ImportPlanItemDto item, ImportTimeChunkDto chunk, int batchNo) { + return packageInfo.tenantKey() + + ":" + packageInfo.eventSource().stableKey() + + ":" + item.eventFamily() + + ":" + item.extractionCode() + + ":RUN-" + importRunId + + ":CHUNK-" + chunk.sequence() + + ":BATCH-" + batchNo; + } + + private void executePlannedPackages(UUID importRunId, R request, List plannedPackages) { + importRunRepository.markRunning(importRunId); + beforeExecute(request); + List results = new ArrayList<>(); + for (PlannedPackage plannedPackage : plannedPackages) { + dataPackageRepository.markImporting(plannedPackage.packageId()); + B result = executeBatch( + importRunId, + plannedPackage.packageId(), + plannedPackage.eventSourceId(), + request, + plannedPackage.planItem(), + plannedPackage.chunk() + ); + results.add(result); + dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted()); + if (result.executed()) { + importCursorRepository.advanceCursor( + request.tenantKey(), + plannedPackage.eventSourceId(), + request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(), + plannedPackage.planItem().eventFamily(), + plannedPackage.planItem().sourceKind(), + request.acquisitionStrategy(), + result + ); + } + } + importRunRepository.markCompleted(importRunId); + log.info("Completed import run provider={} importRunId={} packages={} insertedEvents={} executedBatches={}", + providerPackagePrefix(), + importRunId, + plannedPackages.size(), + results.stream().mapToInt(ExtractionBatchResult::eventsInserted).sum(), + results.stream().filter(ExtractionBatchResult::executed).count()); + } + + private EventHubPackageRequest packageRequestFor( + R request, + EventSourceDto itemEventSource, + ImportPlanItemDto item, + ImportTimeChunkDto chunk + ) { + return new EventHubPackageRequest( + request.tenantKey(), + itemEventSource, + request.sourceGroup(), + request.importScope(), + item.eventFamily().name(), + null, + externalPackageId(request, item, chunk) + ); + } + + private record PlannedPackage(UUID packageId, int eventSourceId, ImportPlanItemDto planItem, ImportTimeChunkDto chunk) { + } +} diff --git a/src/main/java/at/procon/eventhub/importing/ExtractionBatchResult.java b/src/main/java/at/procon/eventhub/importing/ExtractionBatchResult.java new file mode 100644 index 0000000..920c46d --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/ExtractionBatchResult.java @@ -0,0 +1,18 @@ +package at.procon.eventhub.importing; + +import java.time.OffsetDateTime; + +public interface ExtractionBatchResult { + + int eventsInserted(); + + boolean executed(); + + OffsetDateTime lastSourcePackageImportedAt(); + + String lastSourcePackageId(); + + OffsetDateTime lastSourceRowUpdatedAt(); + + OffsetDateTime lastOccurredTo(); +} diff --git a/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportPlanDto.java b/src/main/java/at/procon/eventhub/importing/ImportPlanDto.java similarity index 75% rename from src/main/java/at/procon/eventhub/tachograph/dto/TachographImportPlanDto.java rename to src/main/java/at/procon/eventhub/importing/ImportPlanDto.java index c6e28fb..8bdc0b2 100644 --- a/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportPlanDto.java +++ b/src/main/java/at/procon/eventhub/importing/ImportPlanDto.java @@ -1,14 +1,13 @@ -package at.procon.eventhub.tachograph.dto; +package at.procon.eventhub.importing; import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.ImportMode; import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.SourceGroupRefDto; - import java.util.List; -public record TachographImportPlanDto( +public record ImportPlanDto( String tenantKey, ImportMode mode, AcquisitionStrategy acquisitionStrategy, @@ -16,7 +15,7 @@ public record TachographImportPlanDto( ImportScopeDto importScope, SourceGroupRefDto sourceGroup, EventSourceDto eventSource, - List chunks, - List items + List chunks, + List items ) { } diff --git a/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportPlanItemDto.java b/src/main/java/at/procon/eventhub/importing/ImportPlanItemDto.java similarity index 80% rename from src/main/java/at/procon/eventhub/tachograph/dto/TachographImportPlanItemDto.java rename to src/main/java/at/procon/eventhub/importing/ImportPlanItemDto.java index 3240ec6..062c30a 100644 --- a/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportPlanItemDto.java +++ b/src/main/java/at/procon/eventhub/importing/ImportPlanItemDto.java @@ -1,11 +1,10 @@ -package at.procon.eventhub.tachograph.dto; +package at.procon.eventhub.importing; import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.EventFamily; - import java.util.List; -public record TachographImportPlanItemDto( +public record ImportPlanItemDto( EventFamily eventFamily, String sourceKind, String extractionCode, diff --git a/src/main/java/at/procon/eventhub/importing/ImportRunRequest.java b/src/main/java/at/procon/eventhub/importing/ImportRunRequest.java new file mode 100644 index 0000000..d5c54dc --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/ImportRunRequest.java @@ -0,0 +1,28 @@ +package at.procon.eventhub.importing; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupRefDto; +import java.util.Set; + +public interface ImportRunRequest { + + String tenantKey(); + + EventSourceDto eventSource(); + + SourceGroupRefDto sourceGroup(); + + ImportScopeDto importScope(); + + Set eventFamilies(); + + ImportMode mode(); + + boolean refreshMasterDataFirst(); + + AcquisitionStrategy acquisitionStrategy(); +} diff --git a/src/main/java/at/procon/eventhub/importing/ImportRunResultDto.java b/src/main/java/at/procon/eventhub/importing/ImportRunResultDto.java new file mode 100644 index 0000000..0fc7857 --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/ImportRunResultDto.java @@ -0,0 +1,14 @@ +package at.procon.eventhub.importing; + +import at.procon.eventhub.dto.ImportRunStatus; +import java.util.List; +import java.util.UUID; + +public record ImportRunResultDto( + UUID importRunId, + ImportRunStatus status, + int plannedPackageCount, + ImportPlanDto plan, + List plannedPackageIds +) { +} diff --git a/src/main/java/at/procon/eventhub/tachograph/dto/TimeChunkDto.java b/src/main/java/at/procon/eventhub/importing/ImportTimeChunkDto.java similarity index 83% rename from src/main/java/at/procon/eventhub/tachograph/dto/TimeChunkDto.java rename to src/main/java/at/procon/eventhub/importing/ImportTimeChunkDto.java index bc5115f..6ba351c 100644 --- a/src/main/java/at/procon/eventhub/tachograph/dto/TimeChunkDto.java +++ b/src/main/java/at/procon/eventhub/importing/ImportTimeChunkDto.java @@ -1,8 +1,8 @@ -package at.procon.eventhub.tachograph.dto; +package at.procon.eventhub.importing; import java.time.OffsetDateTime; -public record TimeChunkDto( +public record ImportTimeChunkDto( int sequence, OffsetDateTime occurredFrom, OffsetDateTime occurredTo diff --git a/src/main/java/at/procon/eventhub/tachograph/persistence/ImportCursorRepository.java b/src/main/java/at/procon/eventhub/importing/persistence/ImportCursorRepository.java similarity index 95% rename from src/main/java/at/procon/eventhub/tachograph/persistence/ImportCursorRepository.java rename to src/main/java/at/procon/eventhub/importing/persistence/ImportCursorRepository.java index d73d2c2..f23c0de 100644 --- a/src/main/java/at/procon/eventhub/tachograph/persistence/ImportCursorRepository.java +++ b/src/main/java/at/procon/eventhub/importing/persistence/ImportCursorRepository.java @@ -1,9 +1,9 @@ -package at.procon.eventhub.tachograph.persistence; +package at.procon.eventhub.importing.persistence; import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.EventFamily; import at.procon.eventhub.dto.ImportCursorStateDto; -import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; +import at.procon.eventhub.importing.ExtractionBatchResult; import java.util.UUID; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; @@ -63,7 +63,7 @@ public class ImportCursorRepository { EventFamily eventFamily, String sourceKind, AcquisitionStrategy strategy, - TachographExtractionBatchResultDto result + ExtractionBatchResult result ) { jdbcTemplate.update( """ diff --git a/src/main/java/at/procon/eventhub/tachograph/persistence/ImportRunRepository.java b/src/main/java/at/procon/eventhub/importing/persistence/ImportRunRepository.java similarity index 91% rename from src/main/java/at/procon/eventhub/tachograph/persistence/ImportRunRepository.java rename to src/main/java/at/procon/eventhub/importing/persistence/ImportRunRepository.java index 4e5029e..4c9ef45 100644 --- a/src/main/java/at/procon/eventhub/tachograph/persistence/ImportRunRepository.java +++ b/src/main/java/at/procon/eventhub/importing/persistence/ImportRunRepository.java @@ -1,10 +1,9 @@ -package at.procon.eventhub.tachograph.persistence; +package at.procon.eventhub.importing.persistence; -import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.ImportRunStatus; import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.SourceGroupRefDto; -import at.procon.eventhub.tachograph.dto.TachographImportRequest; +import at.procon.eventhub.importing.ImportRunRequest; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.sql.Array; @@ -26,12 +25,11 @@ public class ImportRunRepository { this.objectMapper = objectMapper; } - public UUID createPlannedRun(int eventSourceId, TachographImportRequest request, Map metadata) { + public UUID createPlannedRun(int eventSourceId, ImportRunRequest request, Map metadata) { UUID id = UUID.randomUUID(); SourceGroupRefDto sourceGroup = request.sourceGroup(); ImportScopeDto importScope = request.importScope(); SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation(); - EventSourceDto eventSource = request.eventSource(); jdbcTemplate.update(con -> { var ps = con.prepareStatement(""" @@ -84,7 +82,7 @@ public class ImportRunRepository { jdbcTemplate.update("update eventhub.import_run set status = ?, error_message = ?, finished_at = now() where id = ?", ImportRunStatus.FAILED.name(), errorMessage, id); } - private Array eventFamilyArray(Connection con, TachographImportRequest request) throws SQLException { + private Array eventFamilyArray(Connection con, ImportRunRequest request) throws SQLException { String[] values = request.eventFamilies().stream().map(Enum::name).toArray(String[]::new); return con.createArrayOf("text", values); } diff --git a/src/main/java/at/procon/eventhub/tachograph/dto/TachographExtractionBatchResultDto.java b/src/main/java/at/procon/eventhub/tachograph/dto/TachographExtractionBatchResultDto.java index c1fd71e..29557c9 100644 --- a/src/main/java/at/procon/eventhub/tachograph/dto/TachographExtractionBatchResultDto.java +++ b/src/main/java/at/procon/eventhub/tachograph/dto/TachographExtractionBatchResultDto.java @@ -1,5 +1,6 @@ package at.procon.eventhub.tachograph.dto; +import at.procon.eventhub.importing.ExtractionBatchResult; import java.time.OffsetDateTime; import java.util.UUID; @@ -16,5 +17,5 @@ public record TachographExtractionBatchResultDto( String lastSourcePackageId, OffsetDateTime lastSourceRowUpdatedAt, OffsetDateTime lastOccurredTo -) { +) implements ExtractionBatchResult { } diff --git a/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportRequest.java b/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportRequest.java index 43a98c0..4ea0cd8 100644 --- a/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportRequest.java +++ b/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportRequest.java @@ -6,6 +6,7 @@ import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.ImportMode; import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.SourceGroupRefDto; +import at.procon.eventhub.importing.ImportRunRequest; import jakarta.validation.Valid; import jakarta.validation.constraints.NotBlank; @@ -27,7 +28,7 @@ public record TachographImportRequest( ImportMode mode, boolean refreshMasterDataFirst, AcquisitionStrategy acquisitionStrategy -) { +) implements ImportRunRequest { public TachographImportRequest { tenantKey = tenantKey == null ? null : tenantKey.trim(); if (importScope == null) { diff --git a/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportRunResultDto.java b/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportRunResultDto.java index 18c6c37..b15ccd8 100644 --- a/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportRunResultDto.java +++ b/src/main/java/at/procon/eventhub/tachograph/dto/TachographImportRunResultDto.java @@ -1,6 +1,7 @@ package at.procon.eventhub.tachograph.dto; import at.procon.eventhub.dto.ImportRunStatus; +import at.procon.eventhub.importing.ImportPlanDto; import java.util.List; import java.util.UUID; @@ -9,7 +10,7 @@ public record TachographImportRunResultDto( UUID importRunId, ImportRunStatus status, int plannedPackageCount, - TachographImportPlanDto plan, + ImportPlanDto plan, List plannedPackageIds ) { } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java index 2f0485d..6b72903 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java @@ -2,11 +2,11 @@ package at.procon.eventhub.tachograph.service; import at.procon.eventhub.dto.ImportCursorStateDto; import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.persistence.ImportCursorRepository; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; -import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; -import at.procon.eventhub.tachograph.dto.TimeChunkDto; -import at.procon.eventhub.tachograph.persistence.ImportCursorRepository; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; @@ -54,8 +54,8 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti UUID packageId, int eventSourceId, TachographImportRequest request, - TachographImportPlanItemDto planItem, - TimeChunkDto chunk + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk ) { TachographExtractionDefinition definition = definitionRegistry.findByCode(planItem.extractionCode()) .orElseThrow(() -> new IllegalArgumentException("No tachograph extraction definition for " + planItem.extractionCode())); @@ -141,7 +141,7 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti return params; } - private ImportScopeDto chunkScope(ImportScopeDto scope, TimeChunkDto chunk) { + private ImportScopeDto chunkScope(ImportScopeDto scope, ImportTimeChunkDto chunk) { if (scope == null) { return ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo()); } @@ -154,7 +154,7 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti ); } - private at.procon.eventhub.dto.EventSourceDto eventSourceFor(TachographImportRequest request, TachographImportPlanItemDto planItem) { + private at.procon.eventhub.dto.EventSourceDto eventSourceFor(TachographImportRequest request, ImportPlanItemDto planItem) { String sourceKey = switch (planItem.sourceKind()) { case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT"; case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD"; diff --git a/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java index 450275c..d57272a 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java @@ -1,9 +1,9 @@ package at.procon.eventhub.tachograph.service; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; -import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; -import at.procon.eventhub.tachograph.dto.TimeChunkDto; import java.util.UUID; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -31,8 +31,8 @@ public class NoopTachographExtractionBatchExecutor implements TachographExtracti UUID packageId, int eventSourceId, TachographImportRequest request, - TachographImportPlanItemDto planItem, - TimeChunkDto chunk + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk ) { log.warn("No concrete tachograph SQL extractor configured. importRunId={} packageId={} extractionCode={} sourceKind={} chunk={}", importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(), chunk.sequence()); diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionBatchExecutor.java index b2a81fd..fae4ce5 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionBatchExecutor.java @@ -1,9 +1,9 @@ package at.procon.eventhub.tachograph.service; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; -import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; -import at.procon.eventhub.tachograph.dto.TimeChunkDto; import java.util.UUID; public interface TachographExtractionBatchExecutor { @@ -13,7 +13,7 @@ public interface TachographExtractionBatchExecutor { UUID packageId, int eventSourceId, TachographImportRequest request, - TachographImportPlanItemDto planItem, - TimeChunkDto chunk + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk ); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionContext.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionContext.java index e3e0e52..9abd397 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionContext.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionContext.java @@ -2,9 +2,9 @@ package at.procon.eventhub.tachograph.service; import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventSourceDto; -import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; -import at.procon.eventhub.tachograph.dto.TimeChunkDto; import java.util.UUID; public record TachographExtractionContext( @@ -12,8 +12,8 @@ public record TachographExtractionContext( UUID packageId, int eventSourceId, TachographImportRequest request, - TachographImportPlanItemDto planItem, - TimeChunkDto chunk, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk, EventSourceDto eventSource, EventHubPackageRequest packageInfo ) { diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java index f245faf..2d357a2 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java @@ -1,46 +1,28 @@ package at.procon.eventhub.tachograph.service; -import at.procon.eventhub.dto.DataPackageType; -import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventSourceDto; -import at.procon.eventhub.dto.ImportRunStatus; -import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; -import at.procon.eventhub.tachograph.dto.TachographImportPlanDto; -import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto; -import at.procon.eventhub.tachograph.dto.TachographImportRequest; -import at.procon.eventhub.tachograph.dto.TachographImportRunResultDto; -import at.procon.eventhub.tachograph.dto.TimeChunkDto; +import at.procon.eventhub.importing.AbstractImportExecutionService; +import at.procon.eventhub.importing.ImportPlanDto; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportRunResultDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.persistence.ImportCursorRepository; +import at.procon.eventhub.importing.persistence.ImportRunRepository; import at.procon.eventhub.persistence.DataPackageRepository; import at.procon.eventhub.persistence.EventSourceRepository; -import at.procon.eventhub.tachograph.persistence.ImportCursorRepository; -import at.procon.eventhub.tachograph.persistence.ImportRunRepository; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; +import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; +import at.procon.eventhub.tachograph.dto.TachographImportRequest; +import at.procon.eventhub.tachograph.dto.TachographImportRunResultDto; import java.util.Map; import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -/** - * Creates import runs and extraction data packages for tachograph acquisition. - * - * EventHub data packages are extraction batches. The original tachograph card/VU - * package is preserved later as SourcePackageRefDto on acquired events or in - * batch metadata when an extractor processes one concrete source package. - */ @Service -public class TachographImportExecutionService { - - private static final Logger log = LoggerFactory.getLogger(TachographImportExecutionService.class); +public class TachographImportExecutionService + extends AbstractImportExecutionService { private final TachographImportPlanService planService; - private final EventSourceRepository eventSourceRepository; - private final ImportRunRepository importRunRepository; - private final DataPackageRepository dataPackageRepository; - private final ImportCursorRepository importCursorRepository; private final TachographMasterDataRefreshService masterDataRefreshService; private final TachographExtractionBatchExecutor extractionBatchExecutor; @@ -53,116 +35,46 @@ public class TachographImportExecutionService { TachographMasterDataRefreshService masterDataRefreshService, TachographExtractionBatchExecutor extractionBatchExecutor ) { + super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository); this.planService = planService; - this.eventSourceRepository = eventSourceRepository; - this.importRunRepository = importRunRepository; - this.dataPackageRepository = dataPackageRepository; - this.importCursorRepository = importCursorRepository; this.masterDataRefreshService = masterDataRefreshService; this.extractionBatchExecutor = extractionBatchExecutor; } @Transactional public TachographImportRunResultDto startImport(TachographImportRequest request) { - return createImportRun(request, false); + return toTachographResult(createImportRun(request, false)); } @Transactional public TachographImportRunResultDto startAndExecuteImport(TachographImportRequest request) { - return createImportRun(request, true); + return toTachographResult(createImportRun(request, true)); } - private TachographImportRunResultDto createImportRun(TachographImportRequest request, boolean executeImmediately) { - TachographImportPlanDto plan = planService.createPlan(request); - int baseEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), request.eventSource()); - UUID importRunId = importRunRepository.createPlannedRun(baseEventSourceId, request, Map.of( - "note", executeImmediately - ? "Created tachograph import run and executing planned extraction packages." - : "Created tachograph import run and planned extraction packages.", - "packageModel", "EventHub data packages are extraction batches; original tachograph packages are SourcePackageRefDto.", - "executeImmediately", executeImmediately - )); - - List packageIds = new ArrayList<>(); - List plannedPackages = new ArrayList<>(); - int batchNo = 1; - try { - for (TachographImportPlanItemDto item : plan.items()) { - EventSourceDto itemEventSource = eventSourceForItem(request.eventSource(), item); - int itemEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), itemEventSource); - for (TimeChunkDto chunk : plan.chunks()) { - EventHubPackageRequest packageInfo = packageRequestFor(request, itemEventSource, item, chunk); - String packageKey = packageKey(importRunId, packageInfo, item, chunk, batchNo); - UUID packageId = dataPackageRepository.createPlannedExtractionPackage( - importRunId, - itemEventSourceId, - packageKey, - packageInfo, - item.extractionCode(), - item.sourceKind(), - item.entityAxis(), - chunk == null ? null : chunk.occurredFrom(), - chunk == null ? null : chunk.occurredTo(), - batchNo, - metadata(request, item, chunk, importRunId) - ); - packageIds.add(packageId); - plannedPackages.add(new PlannedPackage(packageId, itemEventSourceId, item, chunk)); - batchNo++; - } - } - importRunRepository.markPlannedPackages(importRunId, packageIds.size()); - log.info("Created tachograph import run importRunId={} plannedPackages={} tenant={} mode={} strategy={} executeImmediately={}", - importRunId, packageIds.size(), request.tenantKey(), request.mode(), request.acquisitionStrategy(), executeImmediately); - - if (executeImmediately) { - executePlannedPackages(importRunId, request, plannedPackages); - return new TachographImportRunResultDto(importRunId, ImportRunStatus.COMPLETED, packageIds.size(), plan, List.copyOf(packageIds)); - } - return new TachographImportRunResultDto(importRunId, ImportRunStatus.PLANNED, packageIds.size(), plan, List.copyOf(packageIds)); - } catch (RuntimeException ex) { - importRunRepository.markFailed(importRunId, ex.getMessage()); - throw ex; - } + @Override + protected ImportPlanDto createPlan(TachographImportRequest request) { + return planService.createPlan(request); } - private void executePlannedPackages(UUID importRunId, TachographImportRequest request, List plannedPackages) { - importRunRepository.markRunning(importRunId); + @Override + protected void beforeExecute(TachographImportRequest request) { masterDataRefreshService.refreshIfRequested(request); - List results = new ArrayList<>(); - for (PlannedPackage plannedPackage : plannedPackages) { - dataPackageRepository.markImporting(plannedPackage.packageId()); - TachographExtractionBatchResultDto result = extractionBatchExecutor.execute( - importRunId, - plannedPackage.packageId(), - plannedPackage.eventSourceId(), - request, - plannedPackage.planItem(), - plannedPackage.chunk() - ); - results.add(result); - dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted()); - if (result.executed()) { - importCursorRepository.advanceCursor( - request.tenantKey(), - plannedPackage.eventSourceId(), - request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(), - plannedPackage.planItem().eventFamily(), - plannedPackage.planItem().sourceKind(), - request.acquisitionStrategy(), - result - ); - } - } - importRunRepository.markCompleted(importRunId); - log.info("Completed tachograph import run importRunId={} packages={} insertedEvents={} executedBatches={}", - importRunId, - plannedPackages.size(), - results.stream().mapToInt(TachographExtractionBatchResultDto::eventsInserted).sum(), - results.stream().filter(TachographExtractionBatchResultDto::executed).count()); } - private EventSourceDto eventSourceForItem(EventSourceDto base, TachographImportPlanItemDto item) { + @Override + protected TachographExtractionBatchResultDto executeBatch( + UUID importRunId, + UUID packageId, + int eventSourceId, + TachographImportRequest request, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk + ) { + return extractionBatchExecutor.execute(importRunId, packageId, eventSourceId, request, planItem, chunk); + } + + @Override + protected EventSourceDto eventSourceForItem(EventSourceDto base, ImportPlanItemDto item) { String sourceKind = item.sourceKind(); String sourceKey = switch (sourceKind) { case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT"; @@ -179,57 +91,41 @@ public class TachographImportExecutionService { ); } - private EventHubPackageRequest packageRequestFor( - TachographImportRequest request, - EventSourceDto itemEventSource, - TachographImportPlanItemDto item, - TimeChunkDto chunk - ) { - return new EventHubPackageRequest( - request.tenantKey(), - itemEventSource, - request.sourceGroup(), - request.importScope(), - item.eventFamily().name(), - null, - externalPackageId(request, item, chunk) + @Override + protected Map importRunMetadata(TachographImportRequest request, boolean executeImmediately) { + return Map.of( + "note", executeImmediately + ? "Created tachograph import run and executing planned extraction packages." + : "Created tachograph import run and planned extraction packages.", + "packageModel", "EventHub data packages are extraction batches; original tachograph packages are SourcePackageRefDto.", + "executeImmediately", executeImmediately ); } - private String externalPackageId(TachographImportRequest request, TachographImportPlanItemDto item, TimeChunkDto chunk) { - String scope = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(); - return "TACHOGRAPH:" + item.sourceKind() + ":" + item.extractionCode() + ":" + item.eventFamily() - + ":" + scope + ":CHUNK-" + chunk.sequence(); + @Override + protected String providerPackagePrefix() { + return "TACHOGRAPH"; } - private String packageKey(UUID importRunId, EventHubPackageRequest packageInfo, TachographImportPlanItemDto item, TimeChunkDto chunk, int batchNo) { - return packageInfo.tenantKey() - + ":" + packageInfo.eventSource().stableKey() - + ":" + item.eventFamily() - + ":" + item.extractionCode() - + ":RUN-" + importRunId - + ":CHUNK-" + chunk.sequence() - + ":BATCH-" + batchNo; - } - - private Map metadata(TachographImportRequest request, TachographImportPlanItemDto item, TimeChunkDto chunk, UUID importRunId) { - Map metadata = new LinkedHashMap<>(); - metadata.put("importRunId", importRunId.toString()); - metadata.put("mode", request.mode().name()); - metadata.put("acquisitionStrategy", request.acquisitionStrategy().name()); - metadata.put("refreshMasterDataFirst", request.refreshMasterDataFirst()); - metadata.put("eventFamily", item.eventFamily().name()); - metadata.put("sourceKind", item.sourceKind()); - metadata.put("extractionCode", item.extractionCode()); - metadata.put("sourceTables", item.sourceTables()); - metadata.put("entityAxis", item.entityAxis()); - metadata.put("chunkSequence", chunk.sequence()); - metadata.put("chunkOccurredFrom", chunk.occurredFrom() == null ? null : chunk.occurredFrom().toString()); - metadata.put("chunkOccurredTo", chunk.occurredTo() == null ? null : chunk.occurredTo().toString()); + @Override + protected Map packageMetadata( + TachographImportRequest request, + ImportPlanItemDto item, + ImportTimeChunkDto chunk, + UUID importRunId + ) { + Map metadata = super.packageMetadata(request, item, chunk, importRunId); metadata.put("sourcePackageRefPolicy", "Original tachograph card/VU package is preserved per acquired event when SQL extraction returns it."); return metadata; } - private record PlannedPackage(UUID packageId, int eventSourceId, TachographImportPlanItemDto planItem, TimeChunkDto chunk) { + private TachographImportRunResultDto toTachographResult(ImportRunResultDto result) { + return new TachographImportRunResultDto( + result.importRunId(), + result.status(), + result.plannedPackageCount(), + result.plan(), + result.plannedPackageIds() + ); } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java index 4157861..a64238d 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java @@ -5,10 +5,10 @@ import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.EventFamily; import at.procon.eventhub.dto.ImportMode; import at.procon.eventhub.dto.ImportScopeDto; -import at.procon.eventhub.tachograph.dto.TachographImportPlanDto; -import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto; +import at.procon.eventhub.importing.ImportPlanDto; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; -import at.procon.eventhub.tachograph.dto.TimeChunkDto; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; @@ -23,12 +23,12 @@ public class TachographImportPlanService { this.properties = properties; } - public TachographImportPlanDto createPlan(TachographImportRequest request) { - List items = new ArrayList<>(); + public ImportPlanDto createPlan(TachographImportRequest request) { + List items = new ArrayList<>(); for (EventFamily family : request.eventFamilies()) { items.addAll(itemsFor(family, request.acquisitionStrategy())); } - return new TachographImportPlanDto( + return new ImportPlanDto( request.tenantKey(), request.mode(), request.acquisitionStrategy(), @@ -41,7 +41,7 @@ public class TachographImportPlanService { ); } - private List chunksFor(TachographImportRequest request) { + private List chunksFor(TachographImportRequest request) { ImportScopeDto scope = request.importScope(); OffsetDateTime from = scope == null ? null : scope.occurredFrom(); OffsetDateTime to = scope == null ? null : scope.occurredTo(); @@ -51,14 +51,14 @@ public class TachographImportPlanService { // timestamps are used later by the extractor. if (request.mode() == ImportMode.INCREMENTAL_UPDATE && request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) { - return List.of(new TimeChunkDto(1, from, to)); + return List.of(new ImportTimeChunkDto(1, from, to)); } if (from == null || to == null) { - return List.of(new TimeChunkDto(1, from, to)); + return List.of(new ImportTimeChunkDto(1, from, to)); } - List chunks = new ArrayList<>(); + List chunks = new ArrayList<>(); int days = Math.max(1, properties.getTachograph().getDefaultChunkDays()); OffsetDateTime cursor = from; int sequence = 1; @@ -67,13 +67,13 @@ public class TachographImportPlanService { if (next.isAfter(to)) { next = to; } - chunks.add(new TimeChunkDto(sequence++, cursor, next)); + chunks.add(new ImportTimeChunkDto(sequence++, cursor, next)); cursor = next; } - return chunks.isEmpty() ? List.of(new TimeChunkDto(1, from, to)) : chunks; + return chunks.isEmpty() ? List.of(new ImportTimeChunkDto(1, from, to)) : chunks; } - private List itemsFor(EventFamily family, AcquisitionStrategy strategy) { + private List itemsFor(EventFamily family, AcquisitionStrategy strategy) { return switch (family) { case DRIVER_ACTIVITY -> List.of( item(family, "VEHICLE_UNIT", "VU_ACTIVITY", List.of("VUActivity"), "VEHICLE", "Vehicle-unit driver activity point events", strategy), @@ -109,7 +109,7 @@ public class TachographImportPlanService { }; } - private TachographImportPlanItemDto item( + private ImportPlanItemDto item( EventFamily family, String sourceKind, String extractionCode, @@ -118,6 +118,6 @@ public class TachographImportPlanService { String description, AcquisitionStrategy strategy ) { - return new TachographImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description, strategy); + return new ImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description, strategy); } } diff --git a/src/main/resources/db/migration/V1__create_eventhub_schema.sql b/src/main/resources/db/migration/V1__create_eventhub_schema.sql index b4e5718..5bfcdc4 100644 --- a/src/main/resources/db/migration/V1__create_eventhub_schema.sql +++ b/src/main/resources/db/migration/V1__create_eventhub_schema.sql @@ -18,9 +18,9 @@ create table if not exists eventhub.event_source ( constraint ux_event_source unique (tenant_key, provider_key, source_kind, source_key, source_instance_key) ); --- One execution of a tachograph acquisition job. A run may create many EventHub --- extraction packages. The original tachograph card/VU packages are preserved --- separately as source_package_* references, not used as EventHub package identity. +-- One execution of a provider acquisition job. A run may create many EventHub +-- extraction packages. Original provider packages are preserved separately as +-- source_package_* references, not used as EventHub package identity. create table if not exists eventhub.import_run ( id uuid primary key, tenant_key text not null, @@ -73,8 +73,8 @@ create table if not exists eventhub.import_cursor ( ); -- One EventHub acquisition package. It represents an extraction batch, not --- necessarily one original tachograph package. A single extraction batch can --- contain rows from multiple original card/VU packages; those are preserved on +-- necessarily one original provider package. A single extraction batch can +-- contain rows from multiple original provider packages; those are preserved on -- acquired_event.source_package_* where available. create table if not exists eventhub.data_package ( id uuid primary key, @@ -110,7 +110,7 @@ create table if not exists eventhub.data_package ( chunk_to timestamptz, -- Optional package-level source package reference. Usually null for extraction - -- batches, because the original tachograph package is stored per acquired event. + -- batches, because the original source package is stored per acquired event. source_package_kind text, source_package_id text, source_package_entity_id text,