From e3ffa56932441b9e629a8f5ecd774d43dd77791b Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Thu, 30 Apr 2026 14:26:20 +0200 Subject: [PATCH] Generalize import scheduling and extraction --- .../AbstractConfiguredImportPlanService.java | 120 +++++++++++ .../importing/AbstractImportScheduler.java | 95 ++++++++ .../importing/ConfiguredImportPlanDto.java | 30 +++ .../importing/ImportChunkPlanner.java | 42 ++++ .../AbstractJdbcExtractionBatchExecutor.java | 203 ++++++++++++++++++ .../AbstractNoopExtractionBatchExecutor.java | 35 +++ .../extraction/ExtractionBatchExecutor.java | 19 ++ .../extraction/ExtractionContext.java} | 8 +- .../extraction/ExtractionDefinition.java | 14 ++ .../ExtractionDefinitionRegistry.java | 27 +++ .../extraction/ExtractionRowMapper.java | 11 + .../AbstractTachographActivityRowMapper.java | 11 +- ...JdbcTachographExtractionBatchExecutor.java | 161 +++----------- ...NoopTachographExtractionBatchExecutor.java | 21 +- ...TachographConfiguredImportPlanService.java | 108 +++------- .../TachographExtractionBatchExecutor.java | 5 +- .../TachographExtractionDefinition.java | 13 -- ...achographExtractionDefinitionRegistry.java | 30 +-- .../TachographExtractionRowMapper.java | 10 - .../service/TachographImportPlanService.java | 43 +--- .../service/TachographImportScheduler.java | 104 ++++----- 21 files changed, 734 insertions(+), 376 deletions(-) create mode 100644 src/main/java/at/procon/eventhub/importing/AbstractConfiguredImportPlanService.java create mode 100644 src/main/java/at/procon/eventhub/importing/AbstractImportScheduler.java create mode 100644 src/main/java/at/procon/eventhub/importing/ConfiguredImportPlanDto.java create mode 100644 src/main/java/at/procon/eventhub/importing/ImportChunkPlanner.java create mode 100644 src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java create mode 100644 src/main/java/at/procon/eventhub/importing/extraction/AbstractNoopExtractionBatchExecutor.java create mode 100644 src/main/java/at/procon/eventhub/importing/extraction/ExtractionBatchExecutor.java rename src/main/java/at/procon/eventhub/{tachograph/service/TachographExtractionContext.java => importing/extraction/ExtractionContext.java} (70%) create mode 100644 src/main/java/at/procon/eventhub/importing/extraction/ExtractionDefinition.java create mode 100644 src/main/java/at/procon/eventhub/importing/extraction/ExtractionDefinitionRegistry.java create mode 100644 src/main/java/at/procon/eventhub/importing/extraction/ExtractionRowMapper.java delete mode 100644 src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionDefinition.java delete mode 100644 src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionRowMapper.java diff --git a/src/main/java/at/procon/eventhub/importing/AbstractConfiguredImportPlanService.java b/src/main/java/at/procon/eventhub/importing/AbstractConfiguredImportPlanService.java new file mode 100644 index 0000000..8daae46 --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/AbstractConfiguredImportPlanService.java @@ -0,0 +1,120 @@ +package at.procon.eventhub.importing; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import java.util.List; +import java.util.Locale; +import java.util.NoSuchElementException; +import java.util.function.Supplier; + +public abstract class AbstractConfiguredImportPlanService { + + private final Supplier> planSupplier; + private final String providerName; + + protected AbstractConfiguredImportPlanService( + Supplier> planSupplier, + String providerName + ) { + this.planSupplier = planSupplier; + this.providerName = providerName; + } + + public List listPlans() { + return planSupplier.get().stream() + .map(this::toDto) + .toList(); + } + + public D getPlan(String planKey) { + return planByKey(planKey); + } + + public R createRequest(String planKey, ImportMode modeOverride, AcquisitionStrategy strategyOverride) { + EventHubProperties.ConfiguredImportPlan plan = rawPlanByKey(planKey); + return createRequest(plan, modeOverride, strategyOverride, false); + } + + public R createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan) { + return createRequest(plan, plan.getScheduledMode(), plan.getScheduledStrategy(), false); + } + + public R createInitialRequest(EventHubProperties.ConfiguredImportPlan plan) { + return createRequest(plan, plan.getInitialMode(), plan.getInitialStrategy(), true); + } + + protected abstract D toDto(EventHubProperties.ConfiguredImportPlan plan); + + protected abstract R buildRequest( + EventHubProperties.ConfiguredImportPlan plan, + ImportMode mode, + AcquisitionStrategy strategy, + ImportScopeDto scope + ); + + private R createRequest( + EventHubProperties.ConfiguredImportPlan plan, + ImportMode modeOverride, + AcquisitionStrategy strategyOverride, + boolean applyInitialOccurredWindow + ) { + ImportMode mode = modeOverride == null ? plan.getScheduledMode() : modeOverride; + AcquisitionStrategy strategy = strategyOverride == null + ? (mode == ImportMode.INCREMENTAL_UPDATE ? plan.getScheduledStrategy() : plan.getInitialStrategy()) + : strategyOverride; + return buildRequest(plan, mode, strategy, scopedForRequest(plan, applyInitialOccurredWindow)); + } + + private ImportScopeDto scopedForRequest(EventHubProperties.ConfiguredImportPlan plan, boolean applyInitialOccurredWindow) { + ImportScopeDto scope = plan.getImportScope(); + if (applyInitialOccurredWindow && scope != null + && (plan.getInitialOccurredFrom() != null || plan.getInitialOccurredTo() != null)) { + return new ImportScopeDto( + scope.type(), + scope.rootSourceOrganisation(), + scope.includeChildren(), + plan.getInitialOccurredFrom() == null ? scope.occurredFrom() : plan.getInitialOccurredFrom(), + plan.getInitialOccurredTo() == null ? scope.occurredTo() : plan.getInitialOccurredTo() + ); + } + return scope; + } + + private D planByKey(String planKey) { + return toDto(rawPlanByKey(planKey)); + } + + private EventHubProperties.ConfiguredImportPlan rawPlanByKey(String planKey) { + return planSupplier.get().stream() + .filter(plan -> normalize(plan.getPlanKey()).equals(normalize(planKey))) + .findFirst() + .orElseThrow(() -> new NoSuchElementException("No configured " + providerName + " import plan found for key " + planKey)); + } + + protected ConfiguredImportPlanDto genericDto(EventHubProperties.ConfiguredImportPlan plan) { + return new ConfiguredImportPlanDto( + plan.getPlanKey(), + plan.isEnabled(), + plan.getCron(), + plan.getTenantKey(), + plan.getEventSource(), + plan.getSourceGroup(), + plan.getImportScope(), + plan.getEventFamilies(), + plan.getInitialMode(), + plan.getScheduledMode(), + plan.getInitialStrategy(), + plan.getScheduledStrategy(), + plan.isRefreshMasterDataFirst(), + plan.getInitialOccurredFrom(), + plan.getInitialOccurredTo(), + plan.isRunInitialOnStartup() + ); + } + + private String normalize(String value) { + return value == null ? "" : value.trim().toLowerCase(Locale.ROOT); + } +} diff --git a/src/main/java/at/procon/eventhub/importing/AbstractImportScheduler.java b/src/main/java/at/procon/eventhub/importing/AbstractImportScheduler.java new file mode 100644 index 0000000..0836628 --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/AbstractImportScheduler.java @@ -0,0 +1,95 @@ +package at.procon.eventhub.importing; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.SchedulerTriggerMode; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.support.CronExpression; + +public abstract class AbstractImportScheduler { + + private final Logger log = LoggerFactory.getLogger(getClass()); + private final Map nextRunByPlan = new ConcurrentHashMap<>(); + private final Map runningByPlan = new ConcurrentHashMap<>(); + + protected void triggerInitialPlansOnStartup() { + if (!schedulerEnabled()) { + return; + } + for (EventHubProperties.ConfiguredImportPlan plan : configuredPlans()) { + if (plan.isEnabled() && hasPlanKey(plan) && plan.isRunInitialOnStartup()) { + triggerPlan(plan, true, schedulerTriggerMode()); + } + } + } + + protected void pollConfiguredPlans() { + if (!schedulerEnabled()) { + return; + } + ZonedDateTime now = ZonedDateTime.now(ZoneId.systemDefault()); + for (EventHubProperties.ConfiguredImportPlan plan : configuredPlans()) { + if (!plan.isEnabled() || !hasPlanKey(plan) || plan.getCron() == null || plan.getCron().isBlank()) { + continue; + } + String key = plan.getPlanKey(); + ZonedDateTime next = nextRunByPlan.computeIfAbsent(key, ignored -> CronExpression.parse(plan.getCron()).next(now)); + if (next != null && !next.isAfter(now)) { + triggerPlan(plan, false, schedulerTriggerMode()); + nextRunByPlan.put(key, CronExpression.parse(plan.getCron()).next(now.plusSeconds(1))); + } + } + } + + protected abstract boolean schedulerEnabled(); + + protected abstract List configuredPlans(); + + protected abstract SchedulerTriggerMode schedulerTriggerMode(); + + protected abstract R createInitialRequest(EventHubProperties.ConfiguredImportPlan plan); + + protected abstract R createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan); + + protected abstract void startImport(R request); + + protected abstract void startAndExecuteImport(R request); + + protected String providerName() { + return "provider"; + } + + private void triggerPlan(EventHubProperties.ConfiguredImportPlan plan, boolean initial, SchedulerTriggerMode triggerMode) { + String key = plan.getPlanKey(); + AtomicBoolean running = runningByPlan.computeIfAbsent(key, ignored -> new AtomicBoolean(false)); + if (!running.compareAndSet(false, true)) { + log.info("Skipping {} import plan={} because a previous run is still active", providerName(), key); + return; + } + try { + R request = initial ? createInitialRequest(plan) : createScheduledRequest(plan); + log.info("Triggering {} import plan={} initial={} mode={} strategy={} triggerMode={} at={}", + providerName(), key, initial, request.mode(), request.acquisitionStrategy(), triggerMode, OffsetDateTime.now()); + if (triggerMode == SchedulerTriggerMode.EXECUTE) { + startAndExecuteImport(request); + } else { + startImport(request); + } + } catch (RuntimeException ex) { + log.error("{} import plan={} failed to trigger", providerName(), key, ex); + } finally { + running.set(false); + } + } + + private boolean hasPlanKey(EventHubProperties.ConfiguredImportPlan plan) { + return plan.getPlanKey() != null && !plan.getPlanKey().isBlank(); + } +} diff --git a/src/main/java/at/procon/eventhub/importing/ConfiguredImportPlanDto.java b/src/main/java/at/procon/eventhub/importing/ConfiguredImportPlanDto.java new file mode 100644 index 0000000..dcae551 --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/ConfiguredImportPlanDto.java @@ -0,0 +1,30 @@ +package at.procon.eventhub.importing; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupRefDto; +import java.time.OffsetDateTime; +import java.util.Set; + +public record ConfiguredImportPlanDto( + String planKey, + boolean enabled, + String cron, + String tenantKey, + EventSourceDto eventSource, + SourceGroupRefDto sourceGroup, + ImportScopeDto importScope, + Set eventFamilies, + ImportMode initialMode, + ImportMode scheduledMode, + AcquisitionStrategy initialStrategy, + AcquisitionStrategy scheduledStrategy, + boolean refreshMasterDataFirst, + OffsetDateTime initialOccurredFrom, + OffsetDateTime initialOccurredTo, + boolean runInitialOnStartup +) { +} diff --git a/src/main/java/at/procon/eventhub/importing/ImportChunkPlanner.java b/src/main/java/at/procon/eventhub/importing/ImportChunkPlanner.java new file mode 100644 index 0000000..ddd1300 --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/ImportChunkPlanner.java @@ -0,0 +1,42 @@ +package at.procon.eventhub.importing; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.List; +import org.springframework.stereotype.Component; + +@Component +public class ImportChunkPlanner { + + public List chunksFor(ImportRunRequest request, int defaultChunkDays) { + ImportScopeDto scope = request.importScope(); + OffsetDateTime from = scope == null ? null : scope.occurredFrom(); + OffsetDateTime to = scope == null ? null : scope.occurredTo(); + + if (request.mode() == ImportMode.INCREMENTAL_UPDATE + && request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) { + return List.of(new ImportTimeChunkDto(1, from, to)); + } + + if (from == null || to == null) { + return List.of(new ImportTimeChunkDto(1, from, to)); + } + + List chunks = new ArrayList<>(); + int days = Math.max(1, defaultChunkDays); + OffsetDateTime cursor = from; + int sequence = 1; + while (cursor.isBefore(to)) { + OffsetDateTime next = cursor.plusDays(days); + if (next.isAfter(to)) { + next = to; + } + chunks.add(new ImportTimeChunkDto(sequence++, cursor, next)); + cursor = next; + } + return chunks.isEmpty() ? List.of(new ImportTimeChunkDto(1, from, to)) : chunks; + } +} diff --git a/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java new file mode 100644 index 0000000..338ff26 --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java @@ -0,0 +1,203 @@ +package at.procon.eventhub.importing.extraction; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportCursorStateDto; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.importing.ExtractionBatchResult; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportRunRequest; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.persistence.ImportCursorRepository; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.apache.camel.ProducerTemplate; +import org.springframework.core.io.Resource; +import org.springframework.core.io.ResourceLoader; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.util.StreamUtils; + +public abstract class AbstractJdbcExtractionBatchExecutor + implements ExtractionBatchExecutor { + + private final NamedParameterJdbcTemplate jdbcTemplate; + private final ProducerTemplate producerTemplate; + private final ResourceLoader resourceLoader; + private final ImportCursorRepository importCursorRepository; + + protected AbstractJdbcExtractionBatchExecutor( + NamedParameterJdbcTemplate jdbcTemplate, + ProducerTemplate producerTemplate, + ResourceLoader resourceLoader, + ImportCursorRepository importCursorRepository + ) { + this.jdbcTemplate = jdbcTemplate; + this.producerTemplate = producerTemplate; + this.resourceLoader = resourceLoader; + this.importCursorRepository = importCursorRepository; + } + + @Override + public B execute( + UUID importRunId, + UUID packageId, + int eventSourceId, + R request, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk + ) { + ExtractionDefinition definition = findDefinition(planItem.extractionCode()) + .orElseThrow(() -> new IllegalArgumentException("No extraction definition for " + planItem.extractionCode())); + + ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk); + EventHubPackageRequest packageInfo = packageInfo(importRunId, request, planItem, chunk, chunkScope); + ExtractionContext context = new ExtractionContext<>( + importRunId, + packageId, + eventSourceId, + request, + planItem, + chunk, + packageInfo.eventSource(), + packageInfo + ); + + String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(); + ImportCursorStateDto cursor = importCursorRepository.findCursor( + request.tenantKey(), + eventSourceId, + scopeHash, + planItem.eventFamily(), + planItem.sourceKind(), + request.acquisitionStrategy() + ); + + Map params = parameters(request, chunkScope, cursor); + String sql = loadSql(definition.sqlResource()); + List events = jdbcTemplate.query(sql, params, (rs, rowNum) -> definition.rowMapper().map(rs, rowNum, context)); + events.forEach(event -> producerTemplate.sendBody(normalizedInputUri(), event)); + + return resultFor(packageId, planItem, chunk, cursor, events); + } + + protected abstract Optional> findDefinition(String code); + + protected abstract EventSourceDto eventSourceFor(R request, ImportPlanItemDto planItem); + + protected abstract B resultFor( + UUID packageId, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk, + ImportCursorStateDto cursor, + List events + ); + + protected String providerPackagePrefix() { + return "SOURCE"; + } + + protected String normalizedInputUri() { + return "direct:eventhub-normalized-input"; + } + + protected Map parameters(R request, ImportScopeDto scope, ImportCursorStateDto cursor) { + Map params = new HashMap<>(); + params.put("tenantKey", request.tenantKey()); + params.put("occurredFrom", scope == null ? null : scope.occurredFrom()); + params.put("occurredTo", scope == null ? null : scope.occurredTo()); + params.put("rootOrganisationId", scope == null || scope.rootSourceOrganisation() == null ? null : scope.rootSourceOrganisation().sourceEntityId()); + params.put("includeChildren", scope != null && scope.includeChildren()); + params.put("lastSourcePackageImportedAt", cursor == null ? null : cursor.lastSourcePackageImportedAt()); + params.put("lastSourcePackageId", cursor == null ? null : cursor.lastSourcePackageId()); + params.put("lastSourceRowUpdatedAt", cursor == null ? null : cursor.lastSourceRowUpdatedAt()); + params.put("lastOccurredTo", cursor == null ? null : cursor.lastOccurredTo()); + return params; + } + + protected OffsetDateTime lastSourcePackageImportedAt(List events, ImportCursorStateDto cursor) { + return events.stream() + .map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().importedIntoSourceAt()) + .filter(value -> value != null) + .max(OffsetDateTime::compareTo) + .orElse(cursor == null ? null : cursor.lastSourcePackageImportedAt()); + } + + protected String lastSourcePackageId(List events, ImportCursorStateDto cursor) { + return events.stream() + .filter(event -> event.sourcePackageRef() != null && event.sourcePackageRef().importedIntoSourceAt() != null) + .max((left, right) -> left.sourcePackageRef().importedIntoSourceAt().compareTo(right.sourcePackageRef().importedIntoSourceAt())) + .map(event -> event.sourcePackageRef().sourcePackageId()) + .orElseGet(() -> events.stream() + .map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().sourcePackageId()) + .filter(value -> value != null && !value.isBlank()) + .max(this::compareSourcePackageId) + .orElse(cursor == null ? null : cursor.lastSourcePackageId())); + } + + private EventHubPackageRequest packageInfo( + UUID importRunId, + R request, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk, + ImportScopeDto chunkScope + ) { + return new EventHubPackageRequest( + request.tenantKey(), + eventSourceFor(request, planItem), + request.sourceGroup(), + chunkScope, + planItem.eventFamily().name(), + chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(), + providerPackagePrefix() + ":" + planItem.sourceKind() + ":" + planItem.extractionCode() + ":RUN-" + importRunId + ":CHUNK-" + chunk.sequence() + ); + } + + private ImportScopeDto chunkScope(ImportScopeDto scope, ImportTimeChunkDto chunk) { + if (scope == null) { + return ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo()); + } + return new ImportScopeDto( + scope.type(), + scope.rootSourceOrganisation(), + scope.includeChildren(), + chunk.occurredFrom(), + chunk.occurredTo() + ); + } + + private String loadSql(String location) { + Resource resource = resourceLoader.getResource(location); + try (var inputStream = resource.getInputStream()) { + return StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new IllegalStateException("Cannot load extraction SQL resource " + location, e); + } + } + + private int compareSourcePackageId(String left, String right) { + Integer leftInt = parseInteger(left); + Integer rightInt = parseInteger(right); + if (leftInt != null && rightInt != null) { + return leftInt.compareTo(rightInt); + } + return left.compareTo(right); + } + + private Integer parseInteger(String value) { + if (value == null || value.isBlank()) { + return null; + } + try { + return Integer.parseInt(value.trim()); + } catch (NumberFormatException ignored) { + return null; + } + } +} diff --git a/src/main/java/at/procon/eventhub/importing/extraction/AbstractNoopExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/importing/extraction/AbstractNoopExtractionBatchExecutor.java new file mode 100644 index 0000000..a0e359a --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/extraction/AbstractNoopExtractionBatchExecutor.java @@ -0,0 +1,35 @@ +package at.procon.eventhub.importing.extraction; + +import at.procon.eventhub.importing.ExtractionBatchResult; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportRunRequest; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractNoopExtractionBatchExecutor + implements ExtractionBatchExecutor { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Override + public B execute( + UUID importRunId, + UUID packageId, + int eventSourceId, + R request, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk + ) { + log.warn("No concrete {} SQL extractor configured. importRunId={} packageId={} extractionCode={} sourceKind={} chunk={}", + providerName(), importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(), chunk.sequence()); + return emptyResult(packageId, planItem, chunk); + } + + protected abstract B emptyResult(UUID packageId, ImportPlanItemDto planItem, ImportTimeChunkDto chunk); + + protected String providerName() { + return "provider"; + } +} diff --git a/src/main/java/at/procon/eventhub/importing/extraction/ExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/importing/extraction/ExtractionBatchExecutor.java new file mode 100644 index 0000000..eda5910 --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/extraction/ExtractionBatchExecutor.java @@ -0,0 +1,19 @@ +package at.procon.eventhub.importing.extraction; + +import at.procon.eventhub.importing.ExtractionBatchResult; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportRunRequest; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import java.util.UUID; + +public interface ExtractionBatchExecutor { + + B execute( + UUID importRunId, + UUID packageId, + int eventSourceId, + R request, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk + ); +} diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionContext.java b/src/main/java/at/procon/eventhub/importing/extraction/ExtractionContext.java similarity index 70% rename from src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionContext.java rename to src/main/java/at/procon/eventhub/importing/extraction/ExtractionContext.java index 9abd397..89fe066 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionContext.java +++ b/src/main/java/at/procon/eventhub/importing/extraction/ExtractionContext.java @@ -1,17 +1,17 @@ -package at.procon.eventhub.tachograph.service; +package at.procon.eventhub.importing.extraction; import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportRunRequest; import at.procon.eventhub.importing.ImportTimeChunkDto; -import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.util.UUID; -public record TachographExtractionContext( +public record ExtractionContext( UUID importRunId, UUID packageId, int eventSourceId, - TachographImportRequest request, + R request, ImportPlanItemDto planItem, ImportTimeChunkDto chunk, EventSourceDto eventSource, diff --git a/src/main/java/at/procon/eventhub/importing/extraction/ExtractionDefinition.java b/src/main/java/at/procon/eventhub/importing/extraction/ExtractionDefinition.java new file mode 100644 index 0000000..7329eda --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/extraction/ExtractionDefinition.java @@ -0,0 +1,14 @@ +package at.procon.eventhub.importing.extraction; + +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.importing.ImportRunRequest; + +public record ExtractionDefinition( + String code, + EventFamily eventFamily, + String sourceKind, + String entityAxis, + String sqlResource, + ExtractionRowMapper rowMapper +) { +} diff --git a/src/main/java/at/procon/eventhub/importing/extraction/ExtractionDefinitionRegistry.java b/src/main/java/at/procon/eventhub/importing/extraction/ExtractionDefinitionRegistry.java new file mode 100644 index 0000000..aee786f --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/extraction/ExtractionDefinitionRegistry.java @@ -0,0 +1,27 @@ +package at.procon.eventhub.importing.extraction; + +import at.procon.eventhub.importing.ImportRunRequest; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class ExtractionDefinitionRegistry { + + private final Map> definitionsByCode; + + protected ExtractionDefinitionRegistry(List> definitions) { + this.definitionsByCode = definitions.stream() + .collect(Collectors.toUnmodifiableMap(definition -> normalize(definition.code()), Function.identity())); + } + + public Optional> findByCode(String code) { + return Optional.ofNullable(definitionsByCode.get(normalize(code))); + } + + private String normalize(String value) { + return value == null ? "" : value.trim().toUpperCase(Locale.ROOT); + } +} diff --git a/src/main/java/at/procon/eventhub/importing/extraction/ExtractionRowMapper.java b/src/main/java/at/procon/eventhub/importing/extraction/ExtractionRowMapper.java new file mode 100644 index 0000000..0176072 --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/extraction/ExtractionRowMapper.java @@ -0,0 +1,11 @@ +package at.procon.eventhub.importing.extraction; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.importing.ImportRunRequest; +import java.sql.ResultSet; +import java.sql.SQLException; + +public interface ExtractionRowMapper { + + EventHubEventDto map(ResultSet rs, int rowNum, ExtractionContext context) throws SQLException; +} diff --git a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java index f0b6516..163549f 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java @@ -12,7 +12,10 @@ import at.procon.eventhub.dto.EventType; import at.procon.eventhub.dto.SourcePackageRefDto; import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.importing.extraction.ExtractionContext; +import at.procon.eventhub.importing.extraction.ExtractionRowMapper; import at.procon.eventhub.service.EventDetailsFactory; +import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; @@ -23,7 +26,7 @@ import java.util.Locale; import java.util.Map; import java.util.UUID; -abstract class AbstractTachographActivityRowMapper implements TachographExtractionRowMapper { +abstract class AbstractTachographActivityRowMapper implements ExtractionRowMapper { private final EventDetailsFactory detailsFactory; @@ -32,7 +35,7 @@ abstract class AbstractTachographActivityRowMapper implements TachographExtracti } @Override - public EventHubEventDto map(ResultSet rs, int rowNum, TachographExtractionContext context) throws SQLException { + public EventHubEventDto map(ResultSet rs, int rowNum, ExtractionContext context) throws SQLException { OffsetDateTime occurredAt = offsetDateTime(rs, "occurred_at"); SourcePackageRefDto sourcePackageRef = sourcePackageRef(rs); DriverRefDto driverRef = driverRef(rs); @@ -97,7 +100,7 @@ abstract class AbstractTachographActivityRowMapper implements TachographExtracti ); } - private Map payload(ResultSet rs, TachographExtractionContext context) throws SQLException { + private Map payload(ResultSet rs, ExtractionContext context) throws SQLException { Map raw = new LinkedHashMap<>(); raw.put("extractionCode", context.planItem().extractionCode()); raw.put("sourceKind", context.planItem().sourceKind()); @@ -122,7 +125,7 @@ abstract class AbstractTachographActivityRowMapper implements TachographExtracti } private String defaultExternalSourceEventId( - TachographExtractionContext context, + ExtractionContext context, int rowNum, OffsetDateTime occurredAt, SourcePackageRefDto sourcePackageRef, diff --git a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java index 6b72903..54a5c65 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java @@ -1,38 +1,34 @@ package at.procon.eventhub.tachograph.service; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.ImportCursorStateDto; -import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.extraction.AbstractJdbcExtractionBatchExecutor; +import at.procon.eventhub.importing.extraction.ExtractionDefinition; import at.procon.eventhub.importing.persistence.ImportCursorRepository; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.time.OffsetDateTime; -import java.util.HashMap; -import java.util.Map; +import java.util.List; +import java.util.Optional; import java.util.UUID; import org.apache.camel.ProducerTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Service; -import org.springframework.util.StreamUtils; @Service @ConditionalOnBean(name = "tachographNamedParameterJdbcTemplate") @ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''") -public class JdbcTachographExtractionBatchExecutor implements TachographExtractionBatchExecutor { +public class JdbcTachographExtractionBatchExecutor + extends AbstractJdbcExtractionBatchExecutor + implements TachographExtractionBatchExecutor { - private final NamedParameterJdbcTemplate tachographJdbcTemplate; - private final ProducerTemplate producerTemplate; - private final ResourceLoader resourceLoader; private final TachographExtractionDefinitionRegistry definitionRegistry; - private final ImportCursorRepository importCursorRepository; public JdbcTachographExtractionBatchExecutor( @Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate tachographJdbcTemplate, @@ -41,76 +37,23 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti TachographExtractionDefinitionRegistry definitionRegistry, ImportCursorRepository importCursorRepository ) { - this.tachographJdbcTemplate = tachographJdbcTemplate; - this.producerTemplate = producerTemplate; - this.resourceLoader = resourceLoader; + super(tachographJdbcTemplate, producerTemplate, resourceLoader, importCursorRepository); this.definitionRegistry = definitionRegistry; - this.importCursorRepository = importCursorRepository; } @Override - public TachographExtractionBatchResultDto execute( - UUID importRunId, + protected Optional> findDefinition(String code) { + return definitionRegistry.findByCode(code); + } + + @Override + protected TachographExtractionBatchResultDto resultFor( UUID packageId, - int eventSourceId, - TachographImportRequest request, ImportPlanItemDto planItem, - ImportTimeChunkDto chunk + ImportTimeChunkDto chunk, + ImportCursorStateDto cursor, + List events ) { - TachographExtractionDefinition definition = definitionRegistry.findByCode(planItem.extractionCode()) - .orElseThrow(() -> new IllegalArgumentException("No tachograph extraction definition for " + planItem.extractionCode())); - - ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk); - var packageInfo = new at.procon.eventhub.dto.EventHubPackageRequest( - request.tenantKey(), - eventSourceFor(request, planItem), - request.sourceGroup(), - chunkScope, - planItem.eventFamily().name(), - chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(), - "TACHOGRAPH:" + planItem.sourceKind() + ":" + planItem.extractionCode() + ":RUN-" + importRunId + ":CHUNK-" + chunk.sequence() - ); - TachographExtractionContext context = new TachographExtractionContext( - importRunId, - packageId, - eventSourceId, - request, - planItem, - chunk, - packageInfo.eventSource(), - packageInfo - ); - - String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(); - ImportCursorStateDto cursor = importCursorRepository.findCursor( - request.tenantKey(), - eventSourceId, - scopeHash, - planItem.eventFamily(), - planItem.sourceKind(), - request.acquisitionStrategy() - ); - - Map params = parameters(request, chunkScope, cursor); - String sql = loadSql(definition.sqlResource()); - var events = tachographJdbcTemplate.query(sql, params, (rs, rowNum) -> definition.rowMapper().map(rs, rowNum, context)); - events.forEach(event -> producerTemplate.sendBody("direct:eventhub-normalized-input", event)); - - OffsetDateTime lastSourcePackageImportedAt = events.stream() - .map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().importedIntoSourceAt()) - .filter(value -> value != null) - .max(OffsetDateTime::compareTo) - .orElse(cursor == null ? null : cursor.lastSourcePackageImportedAt()); - String lastSourcePackageId = events.stream() - .filter(event -> event.sourcePackageRef() != null && event.sourcePackageRef().importedIntoSourceAt() != null) - .max((left, right) -> left.sourcePackageRef().importedIntoSourceAt().compareTo(right.sourcePackageRef().importedIntoSourceAt())) - .map(event -> event.sourcePackageRef().sourcePackageId()) - .orElseGet(() -> events.stream() - .map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().sourcePackageId()) - .filter(value -> value != null && !value.isBlank()) - .max(this::compareSourcePackageId) - .orElse(cursor == null ? null : cursor.lastSourcePackageId())); - return new TachographExtractionBatchResultDto( packageId, planItem.extractionCode(), @@ -120,47 +63,21 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti events.size(), 0, true, - lastSourcePackageImportedAt, - lastSourcePackageId, + lastSourcePackageImportedAt(events, cursor), + lastSourcePackageId(events, cursor), null, chunk.occurredTo() ); } - private Map parameters(TachographImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) { - Map params = new HashMap<>(); - params.put("tenantKey", request.tenantKey()); - params.put("occurredFrom", scope == null ? null : scope.occurredFrom()); - params.put("occurredTo", scope == null ? null : scope.occurredTo()); - params.put("rootOrganisationId", scope == null || scope.rootSourceOrganisation() == null ? null : scope.rootSourceOrganisation().sourceEntityId()); - params.put("includeChildren", scope != null && scope.includeChildren()); - params.put("lastSourcePackageImportedAt", cursor == null ? null : cursor.lastSourcePackageImportedAt()); - params.put("lastSourcePackageId", cursor == null ? null : cursor.lastSourcePackageId()); - params.put("lastSourceRowUpdatedAt", cursor == null ? null : cursor.lastSourceRowUpdatedAt()); - params.put("lastOccurredTo", cursor == null ? null : cursor.lastOccurredTo()); - return params; - } - - private ImportScopeDto chunkScope(ImportScopeDto scope, ImportTimeChunkDto chunk) { - if (scope == null) { - return ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo()); - } - return new ImportScopeDto( - scope.type(), - scope.rootSourceOrganisation(), - scope.includeChildren(), - chunk.occurredFrom(), - chunk.occurredTo() - ); - } - - private at.procon.eventhub.dto.EventSourceDto eventSourceFor(TachographImportRequest request, ImportPlanItemDto planItem) { + @Override + protected EventSourceDto eventSourceFor(TachographImportRequest request, ImportPlanItemDto planItem) { String sourceKey = switch (planItem.sourceKind()) { case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT"; case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD"; default -> request.eventSource().sourceKey(); }; - return new at.procon.eventhub.dto.EventSourceDto( + return new EventSourceDto( request.eventSource().providerKey(), planItem.sourceKind(), sourceKey, @@ -170,32 +87,8 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti ); } - private String loadSql(String location) { - Resource resource = resourceLoader.getResource(location); - try (var inputStream = resource.getInputStream()) { - return StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8); - } catch (IOException e) { - throw new IllegalStateException("Cannot load tachograph extraction SQL resource " + location, e); - } - } - - private int compareSourcePackageId(String left, String right) { - Integer leftInt = parseInteger(left); - Integer rightInt = parseInteger(right); - if (leftInt != null && rightInt != null) { - return leftInt.compareTo(rightInt); - } - return left.compareTo(right); - } - - private Integer parseInteger(String value) { - if (value == null || value.isBlank()) { - return null; - } - try { - return Integer.parseInt(value.trim()); - } catch (NumberFormatException ignored) { - return null; - } + @Override + protected String providerPackagePrefix() { + return "TACHOGRAPH"; } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java index d57272a..fb4a161 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java @@ -2,13 +2,12 @@ package at.procon.eventhub.tachograph.service; import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.extraction.AbstractNoopExtractionBatchExecutor; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.util.UUID; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; /** @@ -21,21 +20,16 @@ import org.springframework.stereotype.Service; @Service @ConditionalOnMissingBean(TachographExtractionBatchExecutor.class) @ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' == ''") -public class NoopTachographExtractionBatchExecutor implements TachographExtractionBatchExecutor { - - private static final Logger log = LoggerFactory.getLogger(NoopTachographExtractionBatchExecutor.class); +public class NoopTachographExtractionBatchExecutor + extends AbstractNoopExtractionBatchExecutor + implements TachographExtractionBatchExecutor { @Override - public TachographExtractionBatchResultDto execute( - UUID importRunId, + protected TachographExtractionBatchResultDto emptyResult( UUID packageId, - int eventSourceId, - TachographImportRequest request, ImportPlanItemDto planItem, ImportTimeChunkDto chunk ) { - log.warn("No concrete tachograph SQL extractor configured. importRunId={} packageId={} extractionCode={} sourceKind={} chunk={}", - importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(), chunk.sequence()); return new TachographExtractionBatchResultDto( packageId, planItem.extractionCode(), @@ -51,4 +45,9 @@ public class NoopTachographExtractionBatchExecutor implements TachographExtracti null ); } + + @Override + protected String providerName() { + return "tachograph"; + } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographConfiguredImportPlanService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographConfiguredImportPlanService.java index 32db412..9631d5c 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographConfiguredImportPlanService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographConfiguredImportPlanService.java @@ -2,75 +2,29 @@ package at.procon.eventhub.tachograph.service; import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.dto.AcquisitionStrategy; -import at.procon.eventhub.tachograph.dto.ConfiguredTachographImportPlanDto; import at.procon.eventhub.dto.ImportMode; import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.importing.AbstractConfiguredImportPlanService; +import at.procon.eventhub.importing.ConfiguredImportPlanDto; +import at.procon.eventhub.tachograph.dto.ConfiguredTachographImportPlanDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; -import java.util.List; -import java.util.Locale; -import java.util.NoSuchElementException; import org.springframework.stereotype.Service; @Service -public class TachographConfiguredImportPlanService { - - private final EventHubProperties properties; +public class TachographConfiguredImportPlanService + extends AbstractConfiguredImportPlanService { public TachographConfiguredImportPlanService(EventHubProperties properties) { - this.properties = properties; + super(() -> properties.getTachograph().getImportPlans(), "tachograph"); } - public List listPlans() { - return properties.getTachograph().getImportPlans().stream() - .map(this::toDto) - .toList(); - } - - public ConfiguredTachographImportPlanDto getPlan(String planKey) { - return properties.getTachograph().getImportPlans().stream() - .filter(plan -> normalize(plan.getPlanKey()).equals(normalize(planKey))) - .findFirst() - .map(this::toDto) - .orElseThrow(() -> new NoSuchElementException("No configured tachograph import plan found for key " + planKey)); - } - - public TachographImportRequest createRequest(String planKey, ImportMode modeOverride, AcquisitionStrategy strategyOverride) { - EventHubProperties.ConfiguredImportPlan plan = properties.getTachograph().getImportPlans().stream() - .filter(candidate -> normalize(candidate.getPlanKey()).equals(normalize(planKey))) - .findFirst() - .orElseThrow(() -> new NoSuchElementException("No configured tachograph import plan found for key " + planKey)); - return createRequest(plan, modeOverride, strategyOverride, false); - } - - public TachographImportRequest createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan) { - return createRequest(plan, plan.getScheduledMode(), plan.getScheduledStrategy(), false); - } - - public TachographImportRequest createInitialRequest(EventHubProperties.ConfiguredImportPlan plan) { - return createRequest(plan, plan.getInitialMode(), plan.getInitialStrategy(), true); - } - - private TachographImportRequest createRequest( + @Override + protected TachographImportRequest buildRequest( EventHubProperties.ConfiguredImportPlan plan, - ImportMode modeOverride, - AcquisitionStrategy strategyOverride, - boolean applyInitialOccurredWindow + ImportMode mode, + AcquisitionStrategy strategy, + ImportScopeDto scope ) { - ImportMode mode = modeOverride == null ? plan.getScheduledMode() : modeOverride; - AcquisitionStrategy strategy = strategyOverride == null - ? (mode == ImportMode.INCREMENTAL_UPDATE ? plan.getScheduledStrategy() : plan.getInitialStrategy()) - : strategyOverride; - ImportScopeDto scope = plan.getImportScope(); - if (applyInitialOccurredWindow && scope != null - && (plan.getInitialOccurredFrom() != null || plan.getInitialOccurredTo() != null)) { - scope = new ImportScopeDto( - scope.type(), - scope.rootSourceOrganisation(), - scope.includeChildren(), - plan.getInitialOccurredFrom() == null ? scope.occurredFrom() : plan.getInitialOccurredFrom(), - plan.getInitialOccurredTo() == null ? scope.occurredTo() : plan.getInitialOccurredTo() - ); - } return new TachographImportRequest( plan.getTenantKey(), plan.getEventSource(), @@ -83,28 +37,26 @@ public class TachographConfiguredImportPlanService { ); } - private ConfiguredTachographImportPlanDto toDto(EventHubProperties.ConfiguredImportPlan plan) { + @Override + protected ConfiguredTachographImportPlanDto toDto(EventHubProperties.ConfiguredImportPlan plan) { + ConfiguredImportPlanDto dto = genericDto(plan); return new ConfiguredTachographImportPlanDto( - plan.getPlanKey(), - plan.isEnabled(), - plan.getCron(), - plan.getTenantKey(), - plan.getEventSource(), - plan.getSourceGroup(), - plan.getImportScope(), - plan.getEventFamilies(), - plan.getInitialMode(), - plan.getScheduledMode(), - plan.getInitialStrategy(), - plan.getScheduledStrategy(), - plan.isRefreshMasterDataFirst(), - plan.getInitialOccurredFrom(), - plan.getInitialOccurredTo(), - plan.isRunInitialOnStartup() + dto.planKey(), + dto.enabled(), + dto.cron(), + dto.tenantKey(), + dto.eventSource(), + dto.sourceGroup(), + dto.importScope(), + dto.eventFamilies(), + dto.initialMode(), + dto.scheduledMode(), + dto.initialStrategy(), + dto.scheduledStrategy(), + dto.refreshMasterDataFirst(), + dto.initialOccurredFrom(), + dto.initialOccurredTo(), + dto.runInitialOnStartup() ); } - - private String normalize(String value) { - return value == null ? "" : value.trim().toLowerCase(Locale.ROOT); - } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionBatchExecutor.java index fae4ce5..afd72bc 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionBatchExecutor.java @@ -1,13 +1,16 @@ package at.procon.eventhub.tachograph.service; +import at.procon.eventhub.importing.extraction.ExtractionBatchExecutor; import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.util.UUID; -public interface TachographExtractionBatchExecutor { +public interface TachographExtractionBatchExecutor + extends ExtractionBatchExecutor { + @Override TachographExtractionBatchResultDto execute( UUID importRunId, UUID packageId, diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionDefinition.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionDefinition.java deleted file mode 100644 index c5cf46c..0000000 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionDefinition.java +++ /dev/null @@ -1,13 +0,0 @@ -package at.procon.eventhub.tachograph.service; - -import at.procon.eventhub.dto.EventFamily; - -public record TachographExtractionDefinition( - String code, - EventFamily eventFamily, - String sourceKind, - String entityAxis, - String sqlResource, - TachographExtractionRowMapper rowMapper -) { -} diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionDefinitionRegistry.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionDefinitionRegistry.java index 05e50d3..10cb896 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionDefinitionRegistry.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionDefinitionRegistry.java @@ -1,25 +1,21 @@ package at.procon.eventhub.tachograph.service; import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.importing.extraction.ExtractionDefinition; +import at.procon.eventhub.importing.extraction.ExtractionDefinitionRegistry; +import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; import org.springframework.stereotype.Component; @Component -public class TachographExtractionDefinitionRegistry { - - private final Map definitionsByCode; +public class TachographExtractionDefinitionRegistry extends ExtractionDefinitionRegistry { public TachographExtractionDefinitionRegistry( CardActivityRowMapper cardActivityRowMapper, VuActivityRowMapper vuActivityRowMapper ) { - List definitions = List.of( - new TachographExtractionDefinition( + super(List.of( + new ExtractionDefinition<>( "CARD_ACTIVITY", EventFamily.DRIVER_ACTIVITY, "DRIVER_CARD", @@ -27,7 +23,7 @@ public class TachographExtractionDefinitionRegistry { "classpath:sql/tachograph/card-activity.sql", cardActivityRowMapper ), - new TachographExtractionDefinition( + new ExtractionDefinition<>( "VU_ACTIVITY", EventFamily.DRIVER_ACTIVITY, "VEHICLE_UNIT", @@ -35,16 +31,6 @@ public class TachographExtractionDefinitionRegistry { "classpath:sql/tachograph/vu-activity.sql", vuActivityRowMapper ) - ); - this.definitionsByCode = definitions.stream() - .collect(Collectors.toUnmodifiableMap(definition -> normalize(definition.code()), Function.identity())); - } - - public Optional findByCode(String code) { - return Optional.ofNullable(definitionsByCode.get(normalize(code))); - } - - private String normalize(String value) { - return value == null ? "" : value.trim().toUpperCase(Locale.ROOT); + )); } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionRowMapper.java deleted file mode 100644 index 6f64784..0000000 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographExtractionRowMapper.java +++ /dev/null @@ -1,10 +0,0 @@ -package at.procon.eventhub.tachograph.service; - -import at.procon.eventhub.dto.EventHubEventDto; -import java.sql.ResultSet; -import java.sql.SQLException; - -public interface TachographExtractionRowMapper { - - EventHubEventDto map(ResultSet rs, int rowNum, TachographExtractionContext context) throws SQLException; -} diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java index a64238d..97a07bf 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java @@ -3,13 +3,10 @@ package at.procon.eventhub.tachograph.service; import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.EventFamily; -import at.procon.eventhub.dto.ImportMode; -import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.importing.ImportChunkPlanner; import at.procon.eventhub.importing.ImportPlanDto; import at.procon.eventhub.importing.ImportPlanItemDto; -import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; -import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; import org.springframework.stereotype.Service; @@ -18,9 +15,11 @@ import org.springframework.stereotype.Service; public class TachographImportPlanService { private final EventHubProperties properties; + private final ImportChunkPlanner chunkPlanner; - public TachographImportPlanService(EventHubProperties properties) { + public TachographImportPlanService(EventHubProperties properties, ImportChunkPlanner chunkPlanner) { this.properties = properties; + this.chunkPlanner = chunkPlanner; } public ImportPlanDto createPlan(TachographImportRequest request) { @@ -36,43 +35,11 @@ public class TachographImportPlanService { request.importScope(), request.sourceGroup(), request.eventSource(), - chunksFor(request), + chunkPlanner.chunksFor(request, properties.getTachograph().getDefaultChunkDays()), items ); } - private List chunksFor(TachographImportRequest request) { - ImportScopeDto scope = request.importScope(); - OffsetDateTime from = scope == null ? null : scope.occurredFrom(); - OffsetDateTime to = scope == null ? null : scope.occurredTo(); - - // Source-package driven increments discover original card/VU packages by source-package - // watermark. The occurred window may be null because package period and imported-at - // timestamps are used later by the extractor. - if (request.mode() == ImportMode.INCREMENTAL_UPDATE - && request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) { - return List.of(new ImportTimeChunkDto(1, from, to)); - } - - if (from == null || to == null) { - return List.of(new ImportTimeChunkDto(1, from, to)); - } - - List chunks = new ArrayList<>(); - int days = Math.max(1, properties.getTachograph().getDefaultChunkDays()); - OffsetDateTime cursor = from; - int sequence = 1; - while (cursor.isBefore(to)) { - OffsetDateTime next = cursor.plusDays(days); - if (next.isAfter(to)) { - next = to; - } - chunks.add(new ImportTimeChunkDto(sequence++, cursor, next)); - cursor = next; - } - return chunks.isEmpty() ? List.of(new ImportTimeChunkDto(1, from, to)) : chunks; - } - private List itemsFor(EventFamily family, AcquisitionStrategy strategy) { return switch (family) { case DRIVER_ACTIVITY -> List.of( diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportScheduler.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportScheduler.java index a6eaab2..0ade919 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportScheduler.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportScheduler.java @@ -1,33 +1,21 @@ package at.procon.eventhub.tachograph.service; import at.procon.eventhub.config.EventHubProperties; -import at.procon.eventhub.dto.ImportMode; import at.procon.eventhub.dto.SchedulerTriggerMode; +import at.procon.eventhub.importing.AbstractImportScheduler; import at.procon.eventhub.tachograph.dto.TachographImportRequest; -import java.time.OffsetDateTime; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.List; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.scheduling.support.CronExpression; import org.springframework.stereotype.Service; @Service -public class TachographImportScheduler { - - private static final Logger log = LoggerFactory.getLogger(TachographImportScheduler.class); +public class TachographImportScheduler extends AbstractImportScheduler { private final EventHubProperties properties; private final TachographConfiguredImportPlanService configuredPlanService; private final TachographImportExecutionService executionService; - private final Map nextRunByPlan = new ConcurrentHashMap<>(); - private final Map runningByPlan = new ConcurrentHashMap<>(); public TachographImportScheduler( EventHubProperties properties, @@ -41,57 +29,51 @@ public class TachographImportScheduler { @EventListener(ApplicationReadyEvent.class) public void triggerInitialPlansOnStartup() { - if (!properties.getTachograph().isSchedulerEnabled()) { - return; - } - for (EventHubProperties.ConfiguredImportPlan plan : properties.getTachograph().getImportPlans()) { - if (plan.isEnabled() && plan.getPlanKey() != null && !plan.getPlanKey().isBlank() && plan.isRunInitialOnStartup()) { - triggerPlan(plan, true, properties.getTachograph().getSchedulerTriggerMode()); - } - } + super.triggerInitialPlansOnStartup(); } @Scheduled(fixedDelayString = "${eventhub.tachograph.scheduler-poll-interval-ms:60000}") public void pollConfiguredPlans() { - if (!properties.getTachograph().isSchedulerEnabled()) { - return; - } - ZonedDateTime now = ZonedDateTime.now(ZoneId.systemDefault()); - for (EventHubProperties.ConfiguredImportPlan plan : properties.getTachograph().getImportPlans()) { - if (!plan.isEnabled() || plan.getPlanKey() == null || plan.getPlanKey().isBlank() || plan.getCron() == null || plan.getCron().isBlank()) { - continue; - } - String key = plan.getPlanKey(); - ZonedDateTime next = nextRunByPlan.computeIfAbsent(key, ignored -> CronExpression.parse(plan.getCron()).next(now)); - if (next != null && !next.isAfter(now)) { - triggerPlan(plan, false, properties.getTachograph().getSchedulerTriggerMode()); - nextRunByPlan.put(key, CronExpression.parse(plan.getCron()).next(now.plusSeconds(1))); - } - } + super.pollConfiguredPlans(); } - private void triggerPlan(EventHubProperties.ConfiguredImportPlan plan, boolean initial, SchedulerTriggerMode triggerMode) { - String key = plan.getPlanKey(); - AtomicBoolean running = runningByPlan.computeIfAbsent(key, ignored -> new AtomicBoolean(false)); - if (!running.compareAndSet(false, true)) { - log.info("Skipping tachograph import plan={} because a previous run is still active", key); - return; - } - try { - TachographImportRequest request = initial - ? configuredPlanService.createInitialRequest(plan) - : configuredPlanService.createScheduledRequest(plan); - log.info("Triggering tachograph import plan={} initial={} mode={} strategy={} triggerMode={} at={}", - key, initial, request.mode(), request.acquisitionStrategy(), triggerMode, OffsetDateTime.now()); - if (triggerMode == SchedulerTriggerMode.EXECUTE) { - executionService.startAndExecuteImport(request); - } else { - executionService.startImport(request); - } - } catch (RuntimeException ex) { - log.error("Tachograph import plan={} failed to trigger", key, ex); - } finally { - running.set(false); - } + @Override + protected boolean schedulerEnabled() { + return properties.getTachograph().isSchedulerEnabled(); + } + + @Override + protected List configuredPlans() { + return properties.getTachograph().getImportPlans(); + } + + @Override + protected SchedulerTriggerMode schedulerTriggerMode() { + return properties.getTachograph().getSchedulerTriggerMode(); + } + + @Override + protected TachographImportRequest createInitialRequest(EventHubProperties.ConfiguredImportPlan plan) { + return configuredPlanService.createInitialRequest(plan); + } + + @Override + protected TachographImportRequest createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan) { + return configuredPlanService.createScheduledRequest(plan); + } + + @Override + protected void startImport(TachographImportRequest request) { + executionService.startImport(request); + } + + @Override + protected void startAndExecuteImport(TachographImportRequest request) { + executionService.startAndExecuteImport(request); + } + + @Override + protected String providerName() { + return "tachograph"; } }