diff --git a/README.md b/README.md index 44270d8..8cab9f9 100644 --- a/README.md +++ b/README.md @@ -711,3 +711,98 @@ planned data_package -> send to direct:eventhub-normalized-input -> only advance eventhub.import_cursor after successful import ``` + +## Configurable scheduled tachograph imports + +The project now supports configuration-driven tachograph import plans. A configured plan describes: + +```text +tenant +EventSource +optional sourceGroup, e.g. tachograph root organisation +ImportScope, including organisation subtree and occurred-time filter +event families +initial backfill strategy +scheduled incremental strategy +cron schedule +``` + +Example configuration is included in `application.yml` under: + +```yaml +eventhub: + tachograph: + scheduler-enabled: false + scheduler-poll-interval-ms: 60000 + scheduler-trigger-mode: PLAN_ONLY + import-plans: + - plan-key: kralowetz-tachograph-org-147 + enabled: false + cron: "0 15 * * * *" + tenant-key: kralowetz + event-source: + provider-key: TACHOGRAPH + source-kind: MIXED + source-key: TACHOGRAPH_DB + source-instance-key: tachograph-prod-db + tenant-provider-setting-key: kralowetz-tachograph-prod + import-scope: + type: SOURCE_ORGANISATION_SUBTREE + root-source-organisation: + type: ORGANISATION + source-entity-id: "147" + include-children: true + occurred-from: null + occurred-to: null + event-families: + - DRIVER_ACTIVITY + - DRIVER_CARD + - POSITION + initial-mode: INITIAL_BACKFILL + scheduled-mode: INCREMENTAL_UPDATE + initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP + scheduled-strategy: SOURCE_PACKAGE_WATERMARK + refresh-master-data-first: true + initial-occurred-from: "2025-01-01T00:00:00+01:00" + run-initial-on-startup: false +``` + +`PLAN_ONLY` creates an `import_run` plus planned extraction `data_package` rows. `EXECUTE` also invokes the configured `TachographExtractionBatchExecutor`. The generated project provides a no-op executor as an extension point; replace it with a SQL/JDBC extractor that reads the real tachograph DB. + +Configured plan endpoints: + +```http +GET /api/eventhub/acquisition/tachograph/imports/configured-plans +GET /api/eventhub/acquisition/tachograph/imports/configured-plans/{planKey} +POST /api/eventhub/acquisition/tachograph/imports/configured-plans/{planKey}/start?triggerMode=PLAN_ONLY +POST /api/eventhub/acquisition/tachograph/imports/configured-plans/{planKey}/start?triggerMode=EXECUTE +``` + +Manual start from a configured plan can override mode and strategy: + +```http +POST /api/eventhub/acquisition/tachograph/imports/configured-plans/kralowetz-tachograph-org-147/start?mode=INCREMENTAL_UPDATE&strategy=SOURCE_PACKAGE_WATERMARK&triggerMode=PLAN_ONLY +``` + +## Concrete extraction extension point + +The scheduler and import-run service are now implemented, but the generated skeleton still does not know the real tachograph DB SQL. The extension point is: + +```java +TachographExtractionBatchExecutor +``` + +Replace `NoopTachographExtractionBatchExecutor` with an implementation that: + +```text +1. receives importRunId, packageId, TachographImportRequest, planItem and time chunk +2. uses planItem.extractionCode to select the SQL statement +3. applies importScope organisation and occurred-time filters +4. applies source-package watermark or source-row watermark for incremental updates +5. maps rows to EventHubEventDto +6. sets sourcePackageRef when the row can be traced to an original card/VU package +7. sends events to direct:eventhub-normalized-input or EventHubIngestionService +8. returns TachographExtractionBatchResultDto with cursor watermarks +``` + +The import cursor is advanced only when the executor reports `executed=true`. The default no-op executor returns `executed=false`, so it does not move cursors accidentally. diff --git a/src/main/java/at/procon/eventhub/EventHubIngestionApplication.java b/src/main/java/at/procon/eventhub/EventHubIngestionApplication.java index 62bbeda..ae0660d 100644 --- a/src/main/java/at/procon/eventhub/EventHubIngestionApplication.java +++ b/src/main/java/at/procon/eventhub/EventHubIngestionApplication.java @@ -4,9 +4,11 @@ import at.procon.eventhub.config.EventHubProperties; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableConfigurationProperties(EventHubProperties.class) +@EnableScheduling public class EventHubIngestionApplication { public static void main(String[] args) { diff --git a/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java b/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java index 25ccacc..e47a851 100644 --- a/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java +++ b/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java @@ -1,21 +1,32 @@ package at.procon.eventhub.api; +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.ConfiguredTachographImportPlanDto; import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubPackageIngestRequest; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.SchedulerTriggerMode; import at.procon.eventhub.dto.TachographImportRequest; import at.procon.eventhub.dto.TachographImportRunResultDto; +import at.procon.eventhub.dto.TachographImportTriggerResultDto; import at.procon.eventhub.dto.source.TachographActivityDto; import at.procon.eventhub.dto.source.TelematicsPositionDto; import at.procon.eventhub.dto.source.YellowFoxD8BookingDto; +import at.procon.eventhub.service.TachographConfiguredImportPlanService; +import at.procon.eventhub.service.TachographImportExecutionService; import at.procon.eventhub.service.TachographImportPlanService; import jakarta.validation.Valid; +import java.time.OffsetDateTime; import java.util.List; import java.util.Map; import org.apache.camel.ProducerTemplate; import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @@ -24,10 +35,19 @@ public class EventHubIngestionController { private final ProducerTemplate producerTemplate; private final TachographImportPlanService tachographImportPlanService; + private final TachographConfiguredImportPlanService configuredImportPlanService; + private final TachographImportExecutionService tachographImportExecutionService; - public EventHubIngestionController(ProducerTemplate producerTemplate, TachographImportPlanService tachographImportPlanService) { + public EventHubIngestionController( + ProducerTemplate producerTemplate, + TachographImportPlanService tachographImportPlanService, + TachographConfiguredImportPlanService configuredImportPlanService, + TachographImportExecutionService tachographImportExecutionService + ) { this.producerTemplate = producerTemplate; this.tachographImportPlanService = tachographImportPlanService; + this.configuredImportPlanService = configuredImportPlanService; + this.tachographImportExecutionService = tachographImportExecutionService; } @PostMapping("/yellowfox/d8-bookings") @@ -54,15 +74,47 @@ public class EventHubIngestionController { } @PostMapping("/tachograph/imports/start") - public ResponseEntity startTachographImport(@Valid @RequestBody TachographImportRequest request) { - TachographImportRunResultDto result = producerTemplate.requestBody( - "direct:tachograph-import-start", - request, - TachographImportRunResultDto.class - ); + public ResponseEntity startTachographImport( + @Valid @RequestBody TachographImportRequest request, + @RequestParam(defaultValue = "false") boolean execute + ) { + TachographImportRunResultDto result = execute + ? tachographImportExecutionService.startAndExecuteImport(request) + : producerTemplate.requestBody("direct:tachograph-import-start", request, TachographImportRunResultDto.class); return ResponseEntity.accepted().body(result); } + @GetMapping("/tachograph/imports/configured-plans") + public ResponseEntity> listConfiguredTachographPlans() { + return ResponseEntity.ok(configuredImportPlanService.listPlans()); + } + + @GetMapping("/tachograph/imports/configured-plans/{planKey}") + public ResponseEntity getConfiguredTachographPlan(@PathVariable String planKey) { + return ResponseEntity.ok(configuredImportPlanService.getPlan(planKey)); + } + + @PostMapping("/tachograph/imports/configured-plans/{planKey}/start") + public ResponseEntity startConfiguredTachographPlan( + @PathVariable String planKey, + @RequestParam(required = false) ImportMode mode, + @RequestParam(required = false) AcquisitionStrategy strategy, + @RequestParam(defaultValue = "PLAN_ONLY") SchedulerTriggerMode triggerMode + ) { + TachographImportRequest request = configuredImportPlanService.createRequest(planKey, mode, strategy); + TachographImportRunResultDto result = triggerMode == SchedulerTriggerMode.EXECUTE + ? tachographImportExecutionService.startAndExecuteImport(request) + : tachographImportExecutionService.startImport(request); + return ResponseEntity.accepted().body(new TachographImportTriggerResultDto( + planKey, + request.mode(), + request.acquisitionStrategy(), + triggerMode, + OffsetDateTime.now(), + result + )); + } + @PostMapping("/packages") public ResponseEntity> ingestPackage(@Valid @RequestBody EventHubPackageIngestRequest request) { producerTemplate.sendBody("direct:eventhub-package-input", request); diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index 088f29d..640c678 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -1,6 +1,19 @@ package at.procon.eventhub.config; import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SchedulerTriggerMode; +import at.procon.eventhub.dto.SourceGroupRefDto; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "eventhub") @@ -48,6 +61,17 @@ public class EventHubProperties { /** Overlap used by occurred-at fallback incremental imports. */ private Duration occurredAtOverlap = Duration.ofDays(7); + /** Regular scheduler scan interval; each configured plan still uses its own cron. */ + private Duration schedulerPollInterval = Duration.ofMinutes(1); + + /** Whether scheduled tachograph imports are enabled. */ + private boolean schedulerEnabled = false; + + private SchedulerTriggerMode schedulerTriggerMode = SchedulerTriggerMode.PLAN_ONLY; + + /** Configured tenant/source import plans. */ + private List importPlans = new ArrayList<>(); + public int getDefaultChunkDays() { return defaultChunkDays; } @@ -63,5 +87,186 @@ public class EventHubProperties { public void setOccurredAtOverlap(Duration occurredAtOverlap) { this.occurredAtOverlap = occurredAtOverlap; } + + public Duration getSchedulerPollInterval() { + return schedulerPollInterval; + } + + public void setSchedulerPollInterval(Duration schedulerPollInterval) { + this.schedulerPollInterval = schedulerPollInterval; + } + + public boolean isSchedulerEnabled() { + return schedulerEnabled; + } + + public void setSchedulerEnabled(boolean schedulerEnabled) { + this.schedulerEnabled = schedulerEnabled; + } + + public SchedulerTriggerMode getSchedulerTriggerMode() { + return schedulerTriggerMode; + } + + public void setSchedulerTriggerMode(SchedulerTriggerMode schedulerTriggerMode) { + this.schedulerTriggerMode = schedulerTriggerMode == null ? SchedulerTriggerMode.PLAN_ONLY : schedulerTriggerMode; + } + + public List getImportPlans() { + return importPlans; + } + + public void setImportPlans(List importPlans) { + this.importPlans = importPlans == null ? new ArrayList<>() : importPlans; + } + } + + public static class ConfiguredImportPlan { + private String planKey; + private boolean enabled = true; + private String cron; + private String tenantKey; + private EventSourceDto eventSource; + private SourceGroupRefDto sourceGroup; + private ImportScopeDto importScope; + private Set eventFamilies = EnumSet.allOf(EventFamily.class); + private ImportMode initialMode = ImportMode.INITIAL_BACKFILL; + private ImportMode scheduledMode = ImportMode.INCREMENTAL_UPDATE; + private AcquisitionStrategy initialStrategy = AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP; + private AcquisitionStrategy scheduledStrategy = AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK; + private boolean refreshMasterDataFirst = true; + private OffsetDateTime initialOccurredFrom; + private OffsetDateTime initialOccurredTo; + private boolean runInitialOnStartup = false; + + public String getPlanKey() { + return planKey; + } + + public void setPlanKey(String planKey) { + this.planKey = planKey; + } + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getCron() { + return cron; + } + + public void setCron(String cron) { + this.cron = cron; + } + + public String getTenantKey() { + return tenantKey; + } + + public void setTenantKey(String tenantKey) { + this.tenantKey = tenantKey; + } + + public EventSourceDto getEventSource() { + return eventSource; + } + + public void setEventSource(EventSourceDto eventSource) { + this.eventSource = eventSource; + } + + public SourceGroupRefDto getSourceGroup() { + return sourceGroup; + } + + public void setSourceGroup(SourceGroupRefDto sourceGroup) { + this.sourceGroup = sourceGroup; + } + + public ImportScopeDto getImportScope() { + return importScope; + } + + public void setImportScope(ImportScopeDto importScope) { + this.importScope = importScope; + } + + public Set getEventFamilies() { + return eventFamilies; + } + + public void setEventFamilies(Set eventFamilies) { + this.eventFamilies = eventFamilies == null || eventFamilies.isEmpty() + ? EnumSet.allOf(EventFamily.class) + : EnumSet.copyOf(eventFamilies); + } + + public ImportMode getInitialMode() { + return initialMode; + } + + public void setInitialMode(ImportMode initialMode) { + this.initialMode = initialMode; + } + + public ImportMode getScheduledMode() { + return scheduledMode; + } + + public void setScheduledMode(ImportMode scheduledMode) { + this.scheduledMode = scheduledMode; + } + + public AcquisitionStrategy getInitialStrategy() { + return initialStrategy; + } + + public void setInitialStrategy(AcquisitionStrategy initialStrategy) { + this.initialStrategy = initialStrategy; + } + + public AcquisitionStrategy getScheduledStrategy() { + return scheduledStrategy; + } + + public void setScheduledStrategy(AcquisitionStrategy scheduledStrategy) { + this.scheduledStrategy = scheduledStrategy; + } + + public boolean isRefreshMasterDataFirst() { + return refreshMasterDataFirst; + } + + public void setRefreshMasterDataFirst(boolean refreshMasterDataFirst) { + this.refreshMasterDataFirst = refreshMasterDataFirst; + } + + public OffsetDateTime getInitialOccurredFrom() { + return initialOccurredFrom; + } + + public void setInitialOccurredFrom(OffsetDateTime initialOccurredFrom) { + this.initialOccurredFrom = initialOccurredFrom; + } + + public OffsetDateTime getInitialOccurredTo() { + return initialOccurredTo; + } + + public void setInitialOccurredTo(OffsetDateTime initialOccurredTo) { + this.initialOccurredTo = initialOccurredTo; + } + + public boolean isRunInitialOnStartup() { + return runInitialOnStartup; + } + + public void setRunInitialOnStartup(boolean runInitialOnStartup) { + this.runInitialOnStartup = runInitialOnStartup; + } } } diff --git a/src/main/java/at/procon/eventhub/dto/ConfiguredTachographImportPlanDto.java b/src/main/java/at/procon/eventhub/dto/ConfiguredTachographImportPlanDto.java new file mode 100644 index 0000000..a751367 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/ConfiguredTachographImportPlanDto.java @@ -0,0 +1,24 @@ +package at.procon.eventhub.dto; + +import java.time.OffsetDateTime; +import java.util.Set; + +public record ConfiguredTachographImportPlanDto( + String planKey, + boolean enabled, + String cron, + String tenantKey, + EventSourceDto eventSource, + SourceGroupRefDto sourceGroup, + ImportScopeDto importScope, + Set eventFamilies, + ImportMode initialMode, + ImportMode scheduledMode, + AcquisitionStrategy initialStrategy, + AcquisitionStrategy scheduledStrategy, + boolean refreshMasterDataFirst, + OffsetDateTime initialOccurredFrom, + OffsetDateTime initialOccurredTo, + boolean runInitialOnStartup +) { +} diff --git a/src/main/java/at/procon/eventhub/dto/SchedulerTriggerMode.java b/src/main/java/at/procon/eventhub/dto/SchedulerTriggerMode.java new file mode 100644 index 0000000..0a08541 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/SchedulerTriggerMode.java @@ -0,0 +1,9 @@ +package at.procon.eventhub.dto; + +public enum SchedulerTriggerMode { + /** Scheduler creates import runs and extraction packages only. A worker/extractor can execute them later. */ + PLAN_ONLY, + + /** Scheduler creates the import run and immediately invokes the configured extraction executor. */ + EXECUTE +} diff --git a/src/main/java/at/procon/eventhub/dto/TachographExtractionBatchResultDto.java b/src/main/java/at/procon/eventhub/dto/TachographExtractionBatchResultDto.java new file mode 100644 index 0000000..a65ca92 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/TachographExtractionBatchResultDto.java @@ -0,0 +1,20 @@ +package at.procon.eventhub.dto; + +import java.time.OffsetDateTime; +import java.util.UUID; + +public record TachographExtractionBatchResultDto( + UUID packageId, + String extractionCode, + String sourceKind, + int sourceRowsRead, + int eventsMapped, + int eventsInserted, + int alreadyImported, + boolean executed, + OffsetDateTime lastSourcePackageImportedAt, + String lastSourcePackageId, + OffsetDateTime lastSourceRowUpdatedAt, + OffsetDateTime lastOccurredTo +) { +} diff --git a/src/main/java/at/procon/eventhub/dto/TachographImportTriggerResultDto.java b/src/main/java/at/procon/eventhub/dto/TachographImportTriggerResultDto.java new file mode 100644 index 0000000..2ccc324 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/TachographImportTriggerResultDto.java @@ -0,0 +1,13 @@ +package at.procon.eventhub.dto; + +import java.time.OffsetDateTime; + +public record TachographImportTriggerResultDto( + String planKey, + ImportMode mode, + AcquisitionStrategy acquisitionStrategy, + SchedulerTriggerMode triggerMode, + OffsetDateTime triggeredAt, + TachographImportRunResultDto runResult +) { +} diff --git a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java index 7fef2f9..cbbfbc3 100644 --- a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java @@ -170,6 +170,18 @@ public class DataPackageRepository { ); } + public void markImporting(UUID packageId) { + jdbcTemplate.update( + """ + update eventhub.data_package + set status = ? + where id = ? + """, + DataPackageStatus.IMPORTING.name(), + packageId + ); + } + public void markImported(UUID packageId, int insertedCount) { jdbcTemplate.update( """ diff --git a/src/main/java/at/procon/eventhub/persistence/ImportCursorRepository.java b/src/main/java/at/procon/eventhub/persistence/ImportCursorRepository.java new file mode 100644 index 0000000..68615bc --- /dev/null +++ b/src/main/java/at/procon/eventhub/persistence/ImportCursorRepository.java @@ -0,0 +1,56 @@ +package at.procon.eventhub.persistence; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.TachographExtractionBatchResultDto; +import java.util.UUID; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +@Repository +public class ImportCursorRepository { + + private final JdbcTemplate jdbcTemplate; + + public ImportCursorRepository(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + } + + public void advanceCursor( + String tenantKey, + int eventSourceId, + String scopeHash, + EventFamily eventFamily, + String sourceKind, + AcquisitionStrategy strategy, + TachographExtractionBatchResultDto result + ) { + jdbcTemplate.update( + """ + insert into eventhub.import_cursor( + id, tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type, + last_source_package_imported_at, last_source_package_id, + last_source_row_updated_at, last_occurred_to, updated_at + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now()) + on conflict (tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type) + do update set + last_source_package_imported_at = coalesce(excluded.last_source_package_imported_at, eventhub.import_cursor.last_source_package_imported_at), + last_source_package_id = coalesce(excluded.last_source_package_id, eventhub.import_cursor.last_source_package_id), + last_source_row_updated_at = coalesce(excluded.last_source_row_updated_at, eventhub.import_cursor.last_source_row_updated_at), + last_occurred_to = coalesce(excluded.last_occurred_to, eventhub.import_cursor.last_occurred_to), + updated_at = now() + """, + UUID.randomUUID(), + tenantKey, + eventSourceId, + scopeHash, + eventFamily.name(), + sourceKind, + strategy.name(), + result.lastSourcePackageImportedAt(), + result.lastSourcePackageId(), + result.lastSourceRowUpdatedAt(), + result.lastOccurredTo() + ); + } +} diff --git a/src/main/java/at/procon/eventhub/service/NoopTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/service/NoopTachographExtractionBatchExecutor.java new file mode 100644 index 0000000..33343bf --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/NoopTachographExtractionBatchExecutor.java @@ -0,0 +1,49 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.TachographExtractionBatchResultDto; +import at.procon.eventhub.dto.TachographImportPlanItemDto; +import at.procon.eventhub.dto.TachographImportRequest; +import at.procon.eventhub.dto.TimeChunkDto; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +/** + * Default executor used by the generated skeleton. It marks the route contract + * and scheduling/execution lifecycle but intentionally does not read the real + * tachograph DB. Replace this bean with an implementation that executes the + * SQL for each extractionCode and sends mapped EventHubEventDto records to + * direct:eventhub-normalized-input. + */ +@Service +public class NoopTachographExtractionBatchExecutor implements TachographExtractionBatchExecutor { + + private static final Logger log = LoggerFactory.getLogger(NoopTachographExtractionBatchExecutor.class); + + @Override + public TachographExtractionBatchResultDto execute( + UUID importRunId, + UUID packageId, + TachographImportRequest request, + TachographImportPlanItemDto planItem, + TimeChunkDto chunk + ) { + log.warn("No concrete tachograph SQL extractor configured. importRunId={} packageId={} extractionCode={} sourceKind={} chunk={}", + importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(), chunk.sequence()); + return new TachographExtractionBatchResultDto( + packageId, + planItem.extractionCode(), + planItem.sourceKind(), + 0, + 0, + 0, + 0, + false, + null, + null, + null, + null + ); + } +} diff --git a/src/main/java/at/procon/eventhub/service/TachographConfiguredImportPlanService.java b/src/main/java/at/procon/eventhub/service/TachographConfiguredImportPlanService.java new file mode 100644 index 0000000..8d24867 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographConfiguredImportPlanService.java @@ -0,0 +1,110 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.ConfiguredTachographImportPlanDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.TachographImportRequest; +import java.util.List; +import java.util.Locale; +import java.util.NoSuchElementException; +import org.springframework.stereotype.Service; + +@Service +public class TachographConfiguredImportPlanService { + + private final EventHubProperties properties; + + public TachographConfiguredImportPlanService(EventHubProperties properties) { + this.properties = properties; + } + + public List listPlans() { + return properties.getTachograph().getImportPlans().stream() + .map(this::toDto) + .toList(); + } + + public ConfiguredTachographImportPlanDto getPlan(String planKey) { + return properties.getTachograph().getImportPlans().stream() + .filter(plan -> normalize(plan.getPlanKey()).equals(normalize(planKey))) + .findFirst() + .map(this::toDto) + .orElseThrow(() -> new NoSuchElementException("No configured tachograph import plan found for key " + planKey)); + } + + public TachographImportRequest createRequest(String planKey, ImportMode modeOverride, AcquisitionStrategy strategyOverride) { + EventHubProperties.ConfiguredImportPlan plan = properties.getTachograph().getImportPlans().stream() + .filter(candidate -> normalize(candidate.getPlanKey()).equals(normalize(planKey))) + .findFirst() + .orElseThrow(() -> new NoSuchElementException("No configured tachograph import plan found for key " + planKey)); + return createRequest(plan, modeOverride, strategyOverride, false); + } + + public TachographImportRequest createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan) { + return createRequest(plan, plan.getScheduledMode(), plan.getScheduledStrategy(), false); + } + + public TachographImportRequest createInitialRequest(EventHubProperties.ConfiguredImportPlan plan) { + return createRequest(plan, plan.getInitialMode(), plan.getInitialStrategy(), true); + } + + private TachographImportRequest createRequest( + EventHubProperties.ConfiguredImportPlan plan, + ImportMode modeOverride, + AcquisitionStrategy strategyOverride, + boolean applyInitialOccurredWindow + ) { + ImportMode mode = modeOverride == null ? plan.getScheduledMode() : modeOverride; + AcquisitionStrategy strategy = strategyOverride == null + ? (mode == ImportMode.INCREMENTAL_UPDATE ? plan.getScheduledStrategy() : plan.getInitialStrategy()) + : strategyOverride; + ImportScopeDto scope = plan.getImportScope(); + if (applyInitialOccurredWindow && scope != null + && (plan.getInitialOccurredFrom() != null || plan.getInitialOccurredTo() != null)) { + scope = new ImportScopeDto( + scope.type(), + scope.rootSourceOrganisation(), + scope.includeChildren(), + plan.getInitialOccurredFrom() == null ? scope.occurredFrom() : plan.getInitialOccurredFrom(), + plan.getInitialOccurredTo() == null ? scope.occurredTo() : plan.getInitialOccurredTo() + ); + } + return new TachographImportRequest( + plan.getTenantKey(), + plan.getEventSource(), + plan.getSourceGroup(), + scope, + plan.getEventFamilies(), + mode, + plan.isRefreshMasterDataFirst(), + strategy + ); + } + + private ConfiguredTachographImportPlanDto toDto(EventHubProperties.ConfiguredImportPlan plan) { + return new ConfiguredTachographImportPlanDto( + plan.getPlanKey(), + plan.isEnabled(), + plan.getCron(), + plan.getTenantKey(), + plan.getEventSource(), + plan.getSourceGroup(), + plan.getImportScope(), + plan.getEventFamilies(), + plan.getInitialMode(), + plan.getScheduledMode(), + plan.getInitialStrategy(), + plan.getScheduledStrategy(), + plan.isRefreshMasterDataFirst(), + plan.getInitialOccurredFrom(), + plan.getInitialOccurredTo(), + plan.isRunInitialOnStartup() + ); + } + + private String normalize(String value) { + return value == null ? "" : value.trim().toLowerCase(Locale.ROOT); + } +} diff --git a/src/main/java/at/procon/eventhub/service/TachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/service/TachographExtractionBatchExecutor.java new file mode 100644 index 0000000..4ad9ebf --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographExtractionBatchExecutor.java @@ -0,0 +1,18 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.TachographExtractionBatchResultDto; +import at.procon.eventhub.dto.TachographImportPlanItemDto; +import at.procon.eventhub.dto.TachographImportRequest; +import at.procon.eventhub.dto.TimeChunkDto; +import java.util.UUID; + +public interface TachographExtractionBatchExecutor { + + TachographExtractionBatchResultDto execute( + UUID importRunId, + UUID packageId, + TachographImportRequest request, + TachographImportPlanItemDto planItem, + TimeChunkDto chunk + ); +} diff --git a/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java b/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java index 1aa85b6..dd775f3 100644 --- a/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java +++ b/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java @@ -4,6 +4,7 @@ import at.procon.eventhub.dto.DataPackageType; import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.ImportRunStatus; +import at.procon.eventhub.dto.TachographExtractionBatchResultDto; import at.procon.eventhub.dto.TachographImportPlanDto; import at.procon.eventhub.dto.TachographImportPlanItemDto; import at.procon.eventhub.dto.TachographImportRequest; @@ -12,6 +13,7 @@ import at.procon.eventhub.dto.TimeChunkDto; import at.procon.eventhub.persistence.DataPackageRepository; import at.procon.eventhub.persistence.EventSourceRepository; import at.procon.eventhub.persistence.ImportRunRepository; +import at.procon.eventhub.persistence.ImportCursorRepository; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -25,10 +27,9 @@ import org.springframework.transaction.annotation.Transactional; /** * Creates import runs and extraction data packages for tachograph acquisition. * - * This service deliberately creates EventHub packages for extraction batches. The - * original tachograph card/VU package is not treated as the EventHub package; it - * is preserved later as SourcePackageRefDto on acquired events or in batch - * metadata when an extractor processes one concrete source package. + * EventHub data packages are extraction batches. The original tachograph card/VU + * package is preserved later as SourcePackageRefDto on acquired events or in + * batch metadata when an extractor processes one concrete source package. */ @Service public class TachographImportExecutionService { @@ -39,29 +40,51 @@ public class TachographImportExecutionService { private final EventSourceRepository eventSourceRepository; private final ImportRunRepository importRunRepository; private final DataPackageRepository dataPackageRepository; + private final ImportCursorRepository importCursorRepository; + private final TachographMasterDataRefreshService masterDataRefreshService; + private final TachographExtractionBatchExecutor extractionBatchExecutor; public TachographImportExecutionService( TachographImportPlanService planService, EventSourceRepository eventSourceRepository, ImportRunRepository importRunRepository, - DataPackageRepository dataPackageRepository + DataPackageRepository dataPackageRepository, + ImportCursorRepository importCursorRepository, + TachographMasterDataRefreshService masterDataRefreshService, + TachographExtractionBatchExecutor extractionBatchExecutor ) { this.planService = planService; this.eventSourceRepository = eventSourceRepository; this.importRunRepository = importRunRepository; this.dataPackageRepository = dataPackageRepository; + this.importCursorRepository = importCursorRepository; + this.masterDataRefreshService = masterDataRefreshService; + this.extractionBatchExecutor = extractionBatchExecutor; } @Transactional public TachographImportRunResultDto startImport(TachographImportRequest request) { + return createImportRun(request, false); + } + + @Transactional + public TachographImportRunResultDto startAndExecuteImport(TachographImportRequest request) { + return createImportRun(request, true); + } + + private TachographImportRunResultDto createImportRun(TachographImportRequest request, boolean executeImmediately) { TachographImportPlanDto plan = planService.createPlan(request); int baseEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), request.eventSource()); UUID importRunId = importRunRepository.createPlannedRun(baseEventSourceId, request, Map.of( - "note", "Created tachograph import run and planned extraction packages. SQL extraction is handled by event-family routes.", - "packageModel", "EventHub data packages are extraction batches; original tachograph packages are SourcePackageRefDto." + "note", executeImmediately + ? "Created tachograph import run and executing planned extraction packages." + : "Created tachograph import run and planned extraction packages.", + "packageModel", "EventHub data packages are extraction batches; original tachograph packages are SourcePackageRefDto.", + "executeImmediately", executeImmediately )); List packageIds = new ArrayList<>(); + List plannedPackages = new ArrayList<>(); int batchNo = 1; try { for (TachographImportPlanItemDto item : plan.items()) { @@ -81,12 +104,18 @@ public class TachographImportExecutionService { metadata(request, item, chunk, importRunId) ); packageIds.add(packageId); + plannedPackages.add(new PlannedPackage(packageId, itemEventSourceId, item, chunk)); batchNo++; } } importRunRepository.markPlannedPackages(importRunId, packageIds.size()); - log.info("Created tachograph import run importRunId={} plannedPackages={} tenant={} mode={} strategy={}", - importRunId, packageIds.size(), request.tenantKey(), request.mode(), request.acquisitionStrategy()); + log.info("Created tachograph import run importRunId={} plannedPackages={} tenant={} mode={} strategy={} executeImmediately={}", + importRunId, packageIds.size(), request.tenantKey(), request.mode(), request.acquisitionStrategy(), executeImmediately); + + if (executeImmediately) { + executePlannedPackages(importRunId, request, plannedPackages); + return new TachographImportRunResultDto(importRunId, ImportRunStatus.COMPLETED, packageIds.size(), plan, List.copyOf(packageIds)); + } return new TachographImportRunResultDto(importRunId, ImportRunStatus.PLANNED, packageIds.size(), plan, List.copyOf(packageIds)); } catch (RuntimeException ex) { importRunRepository.markFailed(importRunId, ex.getMessage()); @@ -94,6 +123,41 @@ public class TachographImportExecutionService { } } + private void executePlannedPackages(UUID importRunId, TachographImportRequest request, List plannedPackages) { + importRunRepository.markRunning(importRunId); + masterDataRefreshService.refreshIfRequested(request); + List results = new ArrayList<>(); + for (PlannedPackage plannedPackage : plannedPackages) { + dataPackageRepository.markImporting(plannedPackage.packageId()); + TachographExtractionBatchResultDto result = extractionBatchExecutor.execute( + importRunId, + plannedPackage.packageId(), + request, + plannedPackage.planItem(), + plannedPackage.chunk() + ); + results.add(result); + dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted()); + if (result.executed()) { + importCursorRepository.advanceCursor( + request.tenantKey(), + plannedPackage.eventSourceId(), + request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(), + plannedPackage.planItem().eventFamily(), + plannedPackage.planItem().sourceKind(), + request.acquisitionStrategy(), + result + ); + } + } + importRunRepository.markCompleted(importRunId); + log.info("Completed tachograph import run importRunId={} packages={} insertedEvents={} executedBatches={}", + importRunId, + plannedPackages.size(), + results.stream().mapToInt(TachographExtractionBatchResultDto::eventsInserted).sum(), + results.stream().filter(TachographExtractionBatchResultDto::executed).count()); + } + private EventSourceDto eventSourceForItem(EventSourceDto base, TachographImportPlanItemDto item) { String sourceKind = item.sourceKind(); String sourceKey = switch (sourceKind) { @@ -161,4 +225,7 @@ public class TachographImportExecutionService { metadata.put("sourcePackageRefPolicy", "Original tachograph card/VU package is preserved per acquired event when SQL extraction returns it."); return metadata; } + + private record PlannedPackage(UUID packageId, int eventSourceId, TachographImportPlanItemDto planItem, TimeChunkDto chunk) { + } } diff --git a/src/main/java/at/procon/eventhub/service/TachographImportScheduler.java b/src/main/java/at/procon/eventhub/service/TachographImportScheduler.java new file mode 100644 index 0000000..946b044 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographImportScheduler.java @@ -0,0 +1,97 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.SchedulerTriggerMode; +import at.procon.eventhub.dto.TachographImportRequest; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.support.CronExpression; +import org.springframework.stereotype.Service; + +@Service +public class TachographImportScheduler { + + private static final Logger log = LoggerFactory.getLogger(TachographImportScheduler.class); + + private final EventHubProperties properties; + private final TachographConfiguredImportPlanService configuredPlanService; + private final TachographImportExecutionService executionService; + private final Map nextRunByPlan = new ConcurrentHashMap<>(); + private final Map runningByPlan = new ConcurrentHashMap<>(); + + public TachographImportScheduler( + EventHubProperties properties, + TachographConfiguredImportPlanService configuredPlanService, + TachographImportExecutionService executionService + ) { + this.properties = properties; + this.configuredPlanService = configuredPlanService; + this.executionService = executionService; + } + + @EventListener(ApplicationReadyEvent.class) + public void triggerInitialPlansOnStartup() { + if (!properties.getTachograph().isSchedulerEnabled()) { + return; + } + for (EventHubProperties.ConfiguredImportPlan plan : properties.getTachograph().getImportPlans()) { + if (plan.isEnabled() && plan.getPlanKey() != null && !plan.getPlanKey().isBlank() && plan.isRunInitialOnStartup()) { + triggerPlan(plan, true, properties.getTachograph().getSchedulerTriggerMode()); + } + } + } + + @Scheduled(fixedDelayString = "${eventhub.tachograph.scheduler-poll-interval-ms:60000}") + public void pollConfiguredPlans() { + if (!properties.getTachograph().isSchedulerEnabled()) { + return; + } + ZonedDateTime now = ZonedDateTime.now(ZoneId.systemDefault()); + for (EventHubProperties.ConfiguredImportPlan plan : properties.getTachograph().getImportPlans()) { + if (!plan.isEnabled() || plan.getPlanKey() == null || plan.getPlanKey().isBlank() || plan.getCron() == null || plan.getCron().isBlank()) { + continue; + } + String key = plan.getPlanKey(); + ZonedDateTime next = nextRunByPlan.computeIfAbsent(key, ignored -> CronExpression.parse(plan.getCron()).next(now)); + if (next != null && !next.isAfter(now)) { + triggerPlan(plan, false, properties.getTachograph().getSchedulerTriggerMode()); + nextRunByPlan.put(key, CronExpression.parse(plan.getCron()).next(now.plusSeconds(1))); + } + } + } + + private void triggerPlan(EventHubProperties.ConfiguredImportPlan plan, boolean initial, SchedulerTriggerMode triggerMode) { + String key = plan.getPlanKey(); + AtomicBoolean running = runningByPlan.computeIfAbsent(key, ignored -> new AtomicBoolean(false)); + if (!running.compareAndSet(false, true)) { + log.info("Skipping tachograph import plan={} because a previous run is still active", key); + return; + } + try { + TachographImportRequest request = initial + ? configuredPlanService.createInitialRequest(plan) + : configuredPlanService.createScheduledRequest(plan); + log.info("Triggering tachograph import plan={} initial={} mode={} strategy={} triggerMode={} at={}", + key, initial, request.mode(), request.acquisitionStrategy(), triggerMode, OffsetDateTime.now()); + if (triggerMode == SchedulerTriggerMode.EXECUTE) { + executionService.startAndExecuteImport(request); + } else { + executionService.startImport(request); + } + } catch (RuntimeException ex) { + log.error("Tachograph import plan={} failed to trigger", key, ex); + } finally { + running.set(false); + } + } +} diff --git a/src/main/java/at/procon/eventhub/service/TachographMasterDataRefreshService.java b/src/main/java/at/procon/eventhub/service/TachographMasterDataRefreshService.java new file mode 100644 index 0000000..a1e5f5e --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographMasterDataRefreshService.java @@ -0,0 +1,27 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.TachographImportRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +/** + * Hook for refreshing tachograph master data before event extraction. + * + * The generated project does not yet know the concrete tachograph master-data + * schema. Replace/extend this service with SQL readers for organisations, + * vehicles, vehicle registrations, drivers, and driver cards. + */ +@Service +public class TachographMasterDataRefreshService { + + private static final Logger log = LoggerFactory.getLogger(TachographMasterDataRefreshService.class); + + public void refreshIfRequested(TachographImportRequest request) { + if (!request.refreshMasterDataFirst()) { + return; + } + log.info("Tachograph master-data refresh requested for tenant={} source={}. Concrete SQL refresh is a project-specific extension point.", + request.tenantKey(), request.eventSource().stableKey()); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 227d06e..6e5e5e0 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -32,3 +32,56 @@ eventhub: tachograph: default-chunk-days: 1 occurred-at-overlap: 7d + + # Enables the scheduler that regularly triggers configured tachograph import plans. + scheduler-enabled: false + scheduler-poll-interval-ms: 60000 + + # PLAN_ONLY creates import_run + planned extraction packages. + # EXECUTE also invokes the configured TachographExtractionBatchExecutor. + scheduler-trigger-mode: PLAN_ONLY + + # Example plan. Keep disabled until the tachograph datasource/extractor is wired. + import-plans: + - plan-key: kralowetz-tachograph-org-147 + enabled: false + cron: "0 15 * * * *" # hourly at minute 15 + tenant-key: kralowetz + event-source: + provider-key: TACHOGRAPH + source-kind: MIXED + source-key: TACHOGRAPH_DB + source-instance-key: tachograph-prod-db + tenant-provider-setting-key: kralowetz-tachograph-prod + source-group: + type: ORGANISATION + source-entity-id: "147" + code: "147" + name: Kralowetz root organisation + import-scope: + type: SOURCE_ORGANISATION_SUBTREE + root-source-organisation: + type: ORGANISATION + source-entity-id: "147" + code: "147" + name: Kralowetz root organisation + include-children: true + occurred-from: null + occurred-to: null + event-families: + - DRIVER_ACTIVITY + - DRIVER_CARD + - POSITION + - BORDER_CROSSING + - LOAD_UNLOAD + - PLACE + - SPECIFIC_CONDITION + - SPEEDING + initial-mode: INITIAL_BACKFILL + scheduled-mode: INCREMENTAL_UPDATE + initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP + scheduled-strategy: SOURCE_PACKAGE_WATERMARK + refresh-master-data-first: true + initial-occurred-from: "2025-01-01T00:00:00+01:00" + initial-occurred-to: null + run-initial-on-startup: false