Generalize import execution infrastructure

This commit is contained in:
trifonovt 2026-04-30 13:55:08 +02:00
parent 7ec0a512bd
commit 3e96308c3f
19 changed files with 400 additions and 227 deletions

View File

@ -0,0 +1,218 @@
package at.procon.eventhub.importing;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportRunStatus;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.importing.persistence.ImportRunRepository;
import at.procon.eventhub.persistence.DataPackageRepository;
import at.procon.eventhub.persistence.EventSourceRepository;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Shared import-run orchestration for source providers. Provider services supply
* the plan and the concrete batch executor; this class owns the common EventHub
* run, package, and cursor lifecycle.
*/
public abstract class AbstractImportExecutionService<R extends ImportRunRequest, B extends ExtractionBatchResult> {
private final Logger log = LoggerFactory.getLogger(getClass());
private final EventSourceRepository eventSourceRepository;
private final ImportRunRepository importRunRepository;
private final DataPackageRepository dataPackageRepository;
private final ImportCursorRepository importCursorRepository;
protected AbstractImportExecutionService(
EventSourceRepository eventSourceRepository,
ImportRunRepository importRunRepository,
DataPackageRepository dataPackageRepository,
ImportCursorRepository importCursorRepository
) {
this.eventSourceRepository = eventSourceRepository;
this.importRunRepository = importRunRepository;
this.dataPackageRepository = dataPackageRepository;
this.importCursorRepository = importCursorRepository;
}
protected ImportRunResultDto createImportRun(R request, boolean executeImmediately) {
ImportPlanDto plan = createPlan(request);
int baseEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), request.eventSource());
UUID importRunId = importRunRepository.createPlannedRun(
baseEventSourceId,
request,
importRunMetadata(request, executeImmediately)
);
List<UUID> packageIds = new ArrayList<>();
List<PlannedPackage> plannedPackages = new ArrayList<>();
int batchNo = 1;
try {
for (ImportPlanItemDto item : plan.items()) {
EventSourceDto itemEventSource = eventSourceForItem(request.eventSource(), item);
int itemEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), itemEventSource);
for (ImportTimeChunkDto chunk : plan.chunks()) {
EventHubPackageRequest packageInfo = packageRequestFor(request, itemEventSource, item, chunk);
String packageKey = packageKey(importRunId, packageInfo, item, chunk, batchNo);
UUID packageId = dataPackageRepository.createPlannedExtractionPackage(
importRunId,
itemEventSourceId,
packageKey,
packageInfo,
item.extractionCode(),
item.sourceKind(),
item.entityAxis(),
chunk == null ? null : chunk.occurredFrom(),
chunk == null ? null : chunk.occurredTo(),
batchNo,
packageMetadata(request, item, chunk, importRunId)
);
packageIds.add(packageId);
plannedPackages.add(new PlannedPackage(packageId, itemEventSourceId, item, chunk));
batchNo++;
}
}
importRunRepository.markPlannedPackages(importRunId, packageIds.size());
log.info("Created import run provider={} importRunId={} plannedPackages={} tenant={} mode={} strategy={} executeImmediately={}",
providerPackagePrefix(), importRunId, packageIds.size(), request.tenantKey(), request.mode(), request.acquisitionStrategy(), executeImmediately);
if (executeImmediately) {
executePlannedPackages(importRunId, request, plannedPackages);
return new ImportRunResultDto(importRunId, ImportRunStatus.COMPLETED, packageIds.size(), plan, List.copyOf(packageIds));
}
return new ImportRunResultDto(importRunId, ImportRunStatus.PLANNED, packageIds.size(), plan, List.copyOf(packageIds));
} catch (RuntimeException ex) {
importRunRepository.markFailed(importRunId, ex.getMessage());
throw ex;
}
}
protected abstract ImportPlanDto createPlan(R request);
protected abstract B executeBatch(
UUID importRunId,
UUID packageId,
int eventSourceId,
R request,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
);
protected void beforeExecute(R request) {
}
protected EventSourceDto eventSourceForItem(EventSourceDto base, ImportPlanItemDto item) {
return base;
}
protected Map<String, Object> importRunMetadata(R request, boolean executeImmediately) {
return Map.of(
"note", executeImmediately
? "Created import run and executing planned extraction packages."
: "Created import run and planned extraction packages.",
"packageModel", "EventHub data packages are extraction batches; original source packages are SourcePackageRefDto.",
"executeImmediately", executeImmediately
);
}
protected Map<String, Object> packageMetadata(R request, ImportPlanItemDto item, ImportTimeChunkDto chunk, UUID importRunId) {
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("importRunId", importRunId.toString());
metadata.put("mode", request.mode().name());
metadata.put("acquisitionStrategy", request.acquisitionStrategy().name());
metadata.put("refreshMasterDataFirst", request.refreshMasterDataFirst());
metadata.put("eventFamily", item.eventFamily().name());
metadata.put("sourceKind", item.sourceKind());
metadata.put("extractionCode", item.extractionCode());
metadata.put("sourceTables", item.sourceTables());
metadata.put("entityAxis", item.entityAxis());
metadata.put("chunkSequence", chunk.sequence());
metadata.put("chunkOccurredFrom", chunk.occurredFrom() == null ? null : chunk.occurredFrom().toString());
metadata.put("chunkOccurredTo", chunk.occurredTo() == null ? null : chunk.occurredTo().toString());
metadata.put("sourcePackageRefPolicy", "Original source package is preserved per acquired event when extraction returns it.");
return metadata;
}
protected String providerPackagePrefix() {
return "SOURCE";
}
protected String externalPackageId(R request, ImportPlanItemDto item, ImportTimeChunkDto chunk) {
String scope = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey();
return providerPackagePrefix() + ":" + item.sourceKind() + ":" + item.extractionCode() + ":" + item.eventFamily()
+ ":" + scope + ":CHUNK-" + chunk.sequence();
}
protected String packageKey(UUID importRunId, EventHubPackageRequest packageInfo, ImportPlanItemDto item, ImportTimeChunkDto chunk, int batchNo) {
return packageInfo.tenantKey()
+ ":" + packageInfo.eventSource().stableKey()
+ ":" + item.eventFamily()
+ ":" + item.extractionCode()
+ ":RUN-" + importRunId
+ ":CHUNK-" + chunk.sequence()
+ ":BATCH-" + batchNo;
}
private void executePlannedPackages(UUID importRunId, R request, List<PlannedPackage> plannedPackages) {
importRunRepository.markRunning(importRunId);
beforeExecute(request);
List<B> results = new ArrayList<>();
for (PlannedPackage plannedPackage : plannedPackages) {
dataPackageRepository.markImporting(plannedPackage.packageId());
B result = executeBatch(
importRunId,
plannedPackage.packageId(),
plannedPackage.eventSourceId(),
request,
plannedPackage.planItem(),
plannedPackage.chunk()
);
results.add(result);
dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted());
if (result.executed()) {
importCursorRepository.advanceCursor(
request.tenantKey(),
plannedPackage.eventSourceId(),
request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(),
plannedPackage.planItem().eventFamily(),
plannedPackage.planItem().sourceKind(),
request.acquisitionStrategy(),
result
);
}
}
importRunRepository.markCompleted(importRunId);
log.info("Completed import run provider={} importRunId={} packages={} insertedEvents={} executedBatches={}",
providerPackagePrefix(),
importRunId,
plannedPackages.size(),
results.stream().mapToInt(ExtractionBatchResult::eventsInserted).sum(),
results.stream().filter(ExtractionBatchResult::executed).count());
}
private EventHubPackageRequest packageRequestFor(
R request,
EventSourceDto itemEventSource,
ImportPlanItemDto item,
ImportTimeChunkDto chunk
) {
return new EventHubPackageRequest(
request.tenantKey(),
itemEventSource,
request.sourceGroup(),
request.importScope(),
item.eventFamily().name(),
null,
externalPackageId(request, item, chunk)
);
}
private record PlannedPackage(UUID packageId, int eventSourceId, ImportPlanItemDto planItem, ImportTimeChunkDto chunk) {
}
}

View File

@ -0,0 +1,18 @@
package at.procon.eventhub.importing;
import java.time.OffsetDateTime;
public interface ExtractionBatchResult {
int eventsInserted();
boolean executed();
OffsetDateTime lastSourcePackageImportedAt();
String lastSourcePackageId();
OffsetDateTime lastSourceRowUpdatedAt();
OffsetDateTime lastOccurredTo();
}

View File

@ -1,14 +1,13 @@
package at.procon.eventhub.tachograph.dto;
package at.procon.eventhub.importing;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import java.util.List;
public record TachographImportPlanDto(
public record ImportPlanDto(
String tenantKey,
ImportMode mode,
AcquisitionStrategy acquisitionStrategy,
@ -16,7 +15,7 @@ public record TachographImportPlanDto(
ImportScopeDto importScope,
SourceGroupRefDto sourceGroup,
EventSourceDto eventSource,
List<TimeChunkDto> chunks,
List<TachographImportPlanItemDto> items
List<ImportTimeChunkDto> chunks,
List<ImportPlanItemDto> items
) {
}

View File

@ -1,11 +1,10 @@
package at.procon.eventhub.tachograph.dto;
package at.procon.eventhub.importing;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import java.util.List;
public record TachographImportPlanItemDto(
public record ImportPlanItemDto(
EventFamily eventFamily,
String sourceKind,
String extractionCode,

View File

@ -0,0 +1,28 @@
package at.procon.eventhub.importing;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import java.util.Set;
public interface ImportRunRequest {
String tenantKey();
EventSourceDto eventSource();
SourceGroupRefDto sourceGroup();
ImportScopeDto importScope();
Set<EventFamily> eventFamilies();
ImportMode mode();
boolean refreshMasterDataFirst();
AcquisitionStrategy acquisitionStrategy();
}

View File

@ -0,0 +1,14 @@
package at.procon.eventhub.importing;
import at.procon.eventhub.dto.ImportRunStatus;
import java.util.List;
import java.util.UUID;
public record ImportRunResultDto(
UUID importRunId,
ImportRunStatus status,
int plannedPackageCount,
ImportPlanDto plan,
List<UUID> plannedPackageIds
) {
}

View File

@ -1,8 +1,8 @@
package at.procon.eventhub.tachograph.dto;
package at.procon.eventhub.importing;
import java.time.OffsetDateTime;
public record TimeChunkDto(
public record ImportTimeChunkDto(
int sequence,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo

View File

@ -1,9 +1,9 @@
package at.procon.eventhub.tachograph.persistence;
package at.procon.eventhub.importing.persistence;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.importing.ExtractionBatchResult;
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@ -63,7 +63,7 @@ public class ImportCursorRepository {
EventFamily eventFamily,
String sourceKind,
AcquisitionStrategy strategy,
TachographExtractionBatchResultDto result
ExtractionBatchResult result
) {
jdbcTemplate.update(
"""

View File

@ -1,10 +1,9 @@
package at.procon.eventhub.tachograph.persistence;
package at.procon.eventhub.importing.persistence;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportRunStatus;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import at.procon.eventhub.importing.ImportRunRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Array;
@ -26,12 +25,11 @@ public class ImportRunRepository {
this.objectMapper = objectMapper;
}
public UUID createPlannedRun(int eventSourceId, TachographImportRequest request, Map<String, Object> metadata) {
public UUID createPlannedRun(int eventSourceId, ImportRunRequest request, Map<String, Object> metadata) {
UUID id = UUID.randomUUID();
SourceGroupRefDto sourceGroup = request.sourceGroup();
ImportScopeDto importScope = request.importScope();
SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation();
EventSourceDto eventSource = request.eventSource();
jdbcTemplate.update(con -> {
var ps = con.prepareStatement("""
@ -84,7 +82,7 @@ public class ImportRunRepository {
jdbcTemplate.update("update eventhub.import_run set status = ?, error_message = ?, finished_at = now() where id = ?", ImportRunStatus.FAILED.name(), errorMessage, id);
}
private Array eventFamilyArray(Connection con, TachographImportRequest request) throws SQLException {
private Array eventFamilyArray(Connection con, ImportRunRequest request) throws SQLException {
String[] values = request.eventFamilies().stream().map(Enum::name).toArray(String[]::new);
return con.createArrayOf("text", values);
}

View File

@ -1,5 +1,6 @@
package at.procon.eventhub.tachograph.dto;
import at.procon.eventhub.importing.ExtractionBatchResult;
import java.time.OffsetDateTime;
import java.util.UUID;
@ -16,5 +17,5 @@ public record TachographExtractionBatchResultDto(
String lastSourcePackageId,
OffsetDateTime lastSourceRowUpdatedAt,
OffsetDateTime lastOccurredTo
) {
) implements ExtractionBatchResult {
}

View File

@ -6,6 +6,7 @@ import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.importing.ImportRunRequest;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
@ -27,7 +28,7 @@ public record TachographImportRequest(
ImportMode mode,
boolean refreshMasterDataFirst,
AcquisitionStrategy acquisitionStrategy
) {
) implements ImportRunRequest {
public TachographImportRequest {
tenantKey = tenantKey == null ? null : tenantKey.trim();
if (importScope == null) {

View File

@ -1,6 +1,7 @@
package at.procon.eventhub.tachograph.dto;
import at.procon.eventhub.dto.ImportRunStatus;
import at.procon.eventhub.importing.ImportPlanDto;
import java.util.List;
import java.util.UUID;
@ -9,7 +10,7 @@ public record TachographImportRunResultDto(
UUID importRunId,
ImportRunStatus status,
int plannedPackageCount,
TachographImportPlanDto plan,
ImportPlanDto plan,
List<UUID> plannedPackageIds
) {
}

View File

@ -2,11 +2,11 @@ package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import at.procon.eventhub.tachograph.dto.TimeChunkDto;
import at.procon.eventhub.tachograph.persistence.ImportCursorRepository;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
@ -54,8 +54,8 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti
UUID packageId,
int eventSourceId,
TachographImportRequest request,
TachographImportPlanItemDto planItem,
TimeChunkDto chunk
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
) {
TachographExtractionDefinition definition = definitionRegistry.findByCode(planItem.extractionCode())
.orElseThrow(() -> new IllegalArgumentException("No tachograph extraction definition for " + planItem.extractionCode()));
@ -141,7 +141,7 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti
return params;
}
private ImportScopeDto chunkScope(ImportScopeDto scope, TimeChunkDto chunk) {
private ImportScopeDto chunkScope(ImportScopeDto scope, ImportTimeChunkDto chunk) {
if (scope == null) {
return ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo());
}
@ -154,7 +154,7 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti
);
}
private at.procon.eventhub.dto.EventSourceDto eventSourceFor(TachographImportRequest request, TachographImportPlanItemDto planItem) {
private at.procon.eventhub.dto.EventSourceDto eventSourceFor(TachographImportRequest request, ImportPlanItemDto planItem) {
String sourceKey = switch (planItem.sourceKind()) {
case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT";
case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD";

View File

@ -1,9 +1,9 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import at.procon.eventhub.tachograph.dto.TimeChunkDto;
import java.util.UUID;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -31,8 +31,8 @@ public class NoopTachographExtractionBatchExecutor implements TachographExtracti
UUID packageId,
int eventSourceId,
TachographImportRequest request,
TachographImportPlanItemDto planItem,
TimeChunkDto chunk
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
) {
log.warn("No concrete tachograph SQL extractor configured. importRunId={} packageId={} extractionCode={} sourceKind={} chunk={}",
importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(), chunk.sequence());

View File

@ -1,9 +1,9 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import at.procon.eventhub.tachograph.dto.TimeChunkDto;
import java.util.UUID;
public interface TachographExtractionBatchExecutor {
@ -13,7 +13,7 @@ public interface TachographExtractionBatchExecutor {
UUID packageId,
int eventSourceId,
TachographImportRequest request,
TachographImportPlanItemDto planItem,
TimeChunkDto chunk
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
);
}

View File

@ -2,9 +2,9 @@ package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import at.procon.eventhub.tachograph.dto.TimeChunkDto;
import java.util.UUID;
public record TachographExtractionContext(
@ -12,8 +12,8 @@ public record TachographExtractionContext(
UUID packageId,
int eventSourceId,
TachographImportRequest request,
TachographImportPlanItemDto planItem,
TimeChunkDto chunk,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
EventSourceDto eventSource,
EventHubPackageRequest packageInfo
) {

View File

@ -1,46 +1,28 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportRunStatus;
import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.tachograph.dto.TachographImportPlanDto;
import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import at.procon.eventhub.tachograph.dto.TachographImportRunResultDto;
import at.procon.eventhub.tachograph.dto.TimeChunkDto;
import at.procon.eventhub.importing.AbstractImportExecutionService;
import at.procon.eventhub.importing.ImportPlanDto;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportRunResultDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.importing.persistence.ImportRunRepository;
import at.procon.eventhub.persistence.DataPackageRepository;
import at.procon.eventhub.persistence.EventSourceRepository;
import at.procon.eventhub.tachograph.persistence.ImportCursorRepository;
import at.procon.eventhub.tachograph.persistence.ImportRunRepository;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import at.procon.eventhub.tachograph.dto.TachographImportRunResultDto;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* Creates import runs and extraction data packages for tachograph acquisition.
*
* EventHub data packages are extraction batches. The original tachograph card/VU
* package is preserved later as SourcePackageRefDto on acquired events or in
* batch metadata when an extractor processes one concrete source package.
*/
@Service
public class TachographImportExecutionService {
private static final Logger log = LoggerFactory.getLogger(TachographImportExecutionService.class);
public class TachographImportExecutionService
extends AbstractImportExecutionService<TachographImportRequest, TachographExtractionBatchResultDto> {
private final TachographImportPlanService planService;
private final EventSourceRepository eventSourceRepository;
private final ImportRunRepository importRunRepository;
private final DataPackageRepository dataPackageRepository;
private final ImportCursorRepository importCursorRepository;
private final TachographMasterDataRefreshService masterDataRefreshService;
private final TachographExtractionBatchExecutor extractionBatchExecutor;
@ -53,116 +35,46 @@ public class TachographImportExecutionService {
TachographMasterDataRefreshService masterDataRefreshService,
TachographExtractionBatchExecutor extractionBatchExecutor
) {
super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository);
this.planService = planService;
this.eventSourceRepository = eventSourceRepository;
this.importRunRepository = importRunRepository;
this.dataPackageRepository = dataPackageRepository;
this.importCursorRepository = importCursorRepository;
this.masterDataRefreshService = masterDataRefreshService;
this.extractionBatchExecutor = extractionBatchExecutor;
}
@Transactional
public TachographImportRunResultDto startImport(TachographImportRequest request) {
return createImportRun(request, false);
return toTachographResult(createImportRun(request, false));
}
@Transactional
public TachographImportRunResultDto startAndExecuteImport(TachographImportRequest request) {
return createImportRun(request, true);
return toTachographResult(createImportRun(request, true));
}
private TachographImportRunResultDto createImportRun(TachographImportRequest request, boolean executeImmediately) {
TachographImportPlanDto plan = planService.createPlan(request);
int baseEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), request.eventSource());
UUID importRunId = importRunRepository.createPlannedRun(baseEventSourceId, request, Map.of(
"note", executeImmediately
? "Created tachograph import run and executing planned extraction packages."
: "Created tachograph import run and planned extraction packages.",
"packageModel", "EventHub data packages are extraction batches; original tachograph packages are SourcePackageRefDto.",
"executeImmediately", executeImmediately
));
List<UUID> packageIds = new ArrayList<>();
List<PlannedPackage> plannedPackages = new ArrayList<>();
int batchNo = 1;
try {
for (TachographImportPlanItemDto item : plan.items()) {
EventSourceDto itemEventSource = eventSourceForItem(request.eventSource(), item);
int itemEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), itemEventSource);
for (TimeChunkDto chunk : plan.chunks()) {
EventHubPackageRequest packageInfo = packageRequestFor(request, itemEventSource, item, chunk);
String packageKey = packageKey(importRunId, packageInfo, item, chunk, batchNo);
UUID packageId = dataPackageRepository.createPlannedExtractionPackage(
importRunId,
itemEventSourceId,
packageKey,
packageInfo,
item.extractionCode(),
item.sourceKind(),
item.entityAxis(),
chunk == null ? null : chunk.occurredFrom(),
chunk == null ? null : chunk.occurredTo(),
batchNo,
metadata(request, item, chunk, importRunId)
);
packageIds.add(packageId);
plannedPackages.add(new PlannedPackage(packageId, itemEventSourceId, item, chunk));
batchNo++;
}
}
importRunRepository.markPlannedPackages(importRunId, packageIds.size());
log.info("Created tachograph import run importRunId={} plannedPackages={} tenant={} mode={} strategy={} executeImmediately={}",
importRunId, packageIds.size(), request.tenantKey(), request.mode(), request.acquisitionStrategy(), executeImmediately);
if (executeImmediately) {
executePlannedPackages(importRunId, request, plannedPackages);
return new TachographImportRunResultDto(importRunId, ImportRunStatus.COMPLETED, packageIds.size(), plan, List.copyOf(packageIds));
}
return new TachographImportRunResultDto(importRunId, ImportRunStatus.PLANNED, packageIds.size(), plan, List.copyOf(packageIds));
} catch (RuntimeException ex) {
importRunRepository.markFailed(importRunId, ex.getMessage());
throw ex;
}
@Override
protected ImportPlanDto createPlan(TachographImportRequest request) {
return planService.createPlan(request);
}
private void executePlannedPackages(UUID importRunId, TachographImportRequest request, List<PlannedPackage> plannedPackages) {
importRunRepository.markRunning(importRunId);
@Override
protected void beforeExecute(TachographImportRequest request) {
masterDataRefreshService.refreshIfRequested(request);
List<TachographExtractionBatchResultDto> results = new ArrayList<>();
for (PlannedPackage plannedPackage : plannedPackages) {
dataPackageRepository.markImporting(plannedPackage.packageId());
TachographExtractionBatchResultDto result = extractionBatchExecutor.execute(
importRunId,
plannedPackage.packageId(),
plannedPackage.eventSourceId(),
request,
plannedPackage.planItem(),
plannedPackage.chunk()
);
results.add(result);
dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted());
if (result.executed()) {
importCursorRepository.advanceCursor(
request.tenantKey(),
plannedPackage.eventSourceId(),
request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(),
plannedPackage.planItem().eventFamily(),
plannedPackage.planItem().sourceKind(),
request.acquisitionStrategy(),
result
);
}
}
importRunRepository.markCompleted(importRunId);
log.info("Completed tachograph import run importRunId={} packages={} insertedEvents={} executedBatches={}",
importRunId,
plannedPackages.size(),
results.stream().mapToInt(TachographExtractionBatchResultDto::eventsInserted).sum(),
results.stream().filter(TachographExtractionBatchResultDto::executed).count());
}
private EventSourceDto eventSourceForItem(EventSourceDto base, TachographImportPlanItemDto item) {
@Override
protected TachographExtractionBatchResultDto executeBatch(
UUID importRunId,
UUID packageId,
int eventSourceId,
TachographImportRequest request,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
) {
return extractionBatchExecutor.execute(importRunId, packageId, eventSourceId, request, planItem, chunk);
}
@Override
protected EventSourceDto eventSourceForItem(EventSourceDto base, ImportPlanItemDto item) {
String sourceKind = item.sourceKind();
String sourceKey = switch (sourceKind) {
case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT";
@ -179,57 +91,41 @@ public class TachographImportExecutionService {
);
}
private EventHubPackageRequest packageRequestFor(
TachographImportRequest request,
EventSourceDto itemEventSource,
TachographImportPlanItemDto item,
TimeChunkDto chunk
) {
return new EventHubPackageRequest(
request.tenantKey(),
itemEventSource,
request.sourceGroup(),
request.importScope(),
item.eventFamily().name(),
null,
externalPackageId(request, item, chunk)
@Override
protected Map<String, Object> importRunMetadata(TachographImportRequest request, boolean executeImmediately) {
return Map.of(
"note", executeImmediately
? "Created tachograph import run and executing planned extraction packages."
: "Created tachograph import run and planned extraction packages.",
"packageModel", "EventHub data packages are extraction batches; original tachograph packages are SourcePackageRefDto.",
"executeImmediately", executeImmediately
);
}
private String externalPackageId(TachographImportRequest request, TachographImportPlanItemDto item, TimeChunkDto chunk) {
String scope = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey();
return "TACHOGRAPH:" + item.sourceKind() + ":" + item.extractionCode() + ":" + item.eventFamily()
+ ":" + scope + ":CHUNK-" + chunk.sequence();
@Override
protected String providerPackagePrefix() {
return "TACHOGRAPH";
}
private String packageKey(UUID importRunId, EventHubPackageRequest packageInfo, TachographImportPlanItemDto item, TimeChunkDto chunk, int batchNo) {
return packageInfo.tenantKey()
+ ":" + packageInfo.eventSource().stableKey()
+ ":" + item.eventFamily()
+ ":" + item.extractionCode()
+ ":RUN-" + importRunId
+ ":CHUNK-" + chunk.sequence()
+ ":BATCH-" + batchNo;
}
private Map<String, Object> metadata(TachographImportRequest request, TachographImportPlanItemDto item, TimeChunkDto chunk, UUID importRunId) {
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("importRunId", importRunId.toString());
metadata.put("mode", request.mode().name());
metadata.put("acquisitionStrategy", request.acquisitionStrategy().name());
metadata.put("refreshMasterDataFirst", request.refreshMasterDataFirst());
metadata.put("eventFamily", item.eventFamily().name());
metadata.put("sourceKind", item.sourceKind());
metadata.put("extractionCode", item.extractionCode());
metadata.put("sourceTables", item.sourceTables());
metadata.put("entityAxis", item.entityAxis());
metadata.put("chunkSequence", chunk.sequence());
metadata.put("chunkOccurredFrom", chunk.occurredFrom() == null ? null : chunk.occurredFrom().toString());
metadata.put("chunkOccurredTo", chunk.occurredTo() == null ? null : chunk.occurredTo().toString());
@Override
protected Map<String, Object> packageMetadata(
TachographImportRequest request,
ImportPlanItemDto item,
ImportTimeChunkDto chunk,
UUID importRunId
) {
Map<String, Object> metadata = super.packageMetadata(request, item, chunk, importRunId);
metadata.put("sourcePackageRefPolicy", "Original tachograph card/VU package is preserved per acquired event when SQL extraction returns it.");
return metadata;
}
private record PlannedPackage(UUID packageId, int eventSourceId, TachographImportPlanItemDto planItem, TimeChunkDto chunk) {
private TachographImportRunResultDto toTachographResult(ImportRunResultDto result) {
return new TachographImportRunResultDto(
result.importRunId(),
result.status(),
result.plannedPackageCount(),
result.plan(),
result.plannedPackageIds()
);
}
}

View File

@ -5,10 +5,10 @@ import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.tachograph.dto.TachographImportPlanDto;
import at.procon.eventhub.tachograph.dto.TachographImportPlanItemDto;
import at.procon.eventhub.importing.ImportPlanDto;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import at.procon.eventhub.tachograph.dto.TimeChunkDto;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
@ -23,12 +23,12 @@ public class TachographImportPlanService {
this.properties = properties;
}
public TachographImportPlanDto createPlan(TachographImportRequest request) {
List<TachographImportPlanItemDto> items = new ArrayList<>();
public ImportPlanDto createPlan(TachographImportRequest request) {
List<ImportPlanItemDto> items = new ArrayList<>();
for (EventFamily family : request.eventFamilies()) {
items.addAll(itemsFor(family, request.acquisitionStrategy()));
}
return new TachographImportPlanDto(
return new ImportPlanDto(
request.tenantKey(),
request.mode(),
request.acquisitionStrategy(),
@ -41,7 +41,7 @@ public class TachographImportPlanService {
);
}
private List<TimeChunkDto> chunksFor(TachographImportRequest request) {
private List<ImportTimeChunkDto> chunksFor(TachographImportRequest request) {
ImportScopeDto scope = request.importScope();
OffsetDateTime from = scope == null ? null : scope.occurredFrom();
OffsetDateTime to = scope == null ? null : scope.occurredTo();
@ -51,14 +51,14 @@ public class TachographImportPlanService {
// timestamps are used later by the extractor.
if (request.mode() == ImportMode.INCREMENTAL_UPDATE
&& request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) {
return List.of(new TimeChunkDto(1, from, to));
return List.of(new ImportTimeChunkDto(1, from, to));
}
if (from == null || to == null) {
return List.of(new TimeChunkDto(1, from, to));
return List.of(new ImportTimeChunkDto(1, from, to));
}
List<TimeChunkDto> chunks = new ArrayList<>();
List<ImportTimeChunkDto> chunks = new ArrayList<>();
int days = Math.max(1, properties.getTachograph().getDefaultChunkDays());
OffsetDateTime cursor = from;
int sequence = 1;
@ -67,13 +67,13 @@ public class TachographImportPlanService {
if (next.isAfter(to)) {
next = to;
}
chunks.add(new TimeChunkDto(sequence++, cursor, next));
chunks.add(new ImportTimeChunkDto(sequence++, cursor, next));
cursor = next;
}
return chunks.isEmpty() ? List.of(new TimeChunkDto(1, from, to)) : chunks;
return chunks.isEmpty() ? List.of(new ImportTimeChunkDto(1, from, to)) : chunks;
}
private List<TachographImportPlanItemDto> itemsFor(EventFamily family, AcquisitionStrategy strategy) {
private List<ImportPlanItemDto> itemsFor(EventFamily family, AcquisitionStrategy strategy) {
return switch (family) {
case DRIVER_ACTIVITY -> List.of(
item(family, "VEHICLE_UNIT", "VU_ACTIVITY", List.of("VUActivity"), "VEHICLE", "Vehicle-unit driver activity point events", strategy),
@ -109,7 +109,7 @@ public class TachographImportPlanService {
};
}
private TachographImportPlanItemDto item(
private ImportPlanItemDto item(
EventFamily family,
String sourceKind,
String extractionCode,
@ -118,6 +118,6 @@ public class TachographImportPlanService {
String description,
AcquisitionStrategy strategy
) {
return new TachographImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description, strategy);
return new ImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description, strategy);
}
}

View File

@ -18,9 +18,9 @@ create table if not exists eventhub.event_source (
constraint ux_event_source unique (tenant_key, provider_key, source_kind, source_key, source_instance_key)
);
-- One execution of a tachograph acquisition job. A run may create many EventHub
-- extraction packages. The original tachograph card/VU packages are preserved
-- separately as source_package_* references, not used as EventHub package identity.
-- One execution of a provider acquisition job. A run may create many EventHub
-- extraction packages. Original provider packages are preserved separately as
-- source_package_* references, not used as EventHub package identity.
create table if not exists eventhub.import_run (
id uuid primary key,
tenant_key text not null,
@ -73,8 +73,8 @@ create table if not exists eventhub.import_cursor (
);
-- One EventHub acquisition package. It represents an extraction batch, not
-- necessarily one original tachograph package. A single extraction batch can
-- contain rows from multiple original card/VU packages; those are preserved on
-- necessarily one original provider package. A single extraction batch can
-- contain rows from multiple original provider packages; those are preserved on
-- acquired_event.source_package_* where available.
create table if not exists eventhub.data_package (
id uuid primary key,
@ -110,7 +110,7 @@ create table if not exists eventhub.data_package (
chunk_to timestamptz,
-- Optional package-level source package reference. Usually null for extraction
-- batches, because the original tachograph package is stored per acquired event.
-- batches, because the original source package is stored per acquired event.
source_package_kind text,
source_package_id text,
source_package_entity_id text,