Add configured tachograph import scheduler

This commit is contained in:
trifonovt 2026-04-30 12:46:14 +02:00
parent c52712f881
commit 21a4fe12fb
17 changed files with 925 additions and 16 deletions

View File

@ -711,3 +711,98 @@ planned data_package
-> send to direct:eventhub-normalized-input
-> only advance eventhub.import_cursor after successful import
```
## Configurable scheduled tachograph imports
The project now supports configuration-driven tachograph import plans. A configured plan describes:
```text
tenant
EventSource
optional sourceGroup, e.g. tachograph root organisation
ImportScope, including organisation subtree and occurred-time filter
event families
initial backfill strategy
scheduled incremental strategy
cron schedule
```
Example configuration is included in `application.yml` under:
```yaml
eventhub:
tachograph:
scheduler-enabled: false
scheduler-poll-interval-ms: 60000
scheduler-trigger-mode: PLAN_ONLY
import-plans:
- plan-key: kralowetz-tachograph-org-147
enabled: false
cron: "0 15 * * * *"
tenant-key: kralowetz
event-source:
provider-key: TACHOGRAPH
source-kind: MIXED
source-key: TACHOGRAPH_DB
source-instance-key: tachograph-prod-db
tenant-provider-setting-key: kralowetz-tachograph-prod
import-scope:
type: SOURCE_ORGANISATION_SUBTREE
root-source-organisation:
type: ORGANISATION
source-entity-id: "147"
include-children: true
occurred-from: null
occurred-to: null
event-families:
- DRIVER_ACTIVITY
- DRIVER_CARD
- POSITION
initial-mode: INITIAL_BACKFILL
scheduled-mode: INCREMENTAL_UPDATE
initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP
scheduled-strategy: SOURCE_PACKAGE_WATERMARK
refresh-master-data-first: true
initial-occurred-from: "2025-01-01T00:00:00+01:00"
run-initial-on-startup: false
```
`PLAN_ONLY` creates an `import_run` plus planned extraction `data_package` rows. `EXECUTE` also invokes the configured `TachographExtractionBatchExecutor`. The generated project provides a no-op executor as an extension point; replace it with a SQL/JDBC extractor that reads the real tachograph DB.
Configured plan endpoints:
```http
GET /api/eventhub/acquisition/tachograph/imports/configured-plans
GET /api/eventhub/acquisition/tachograph/imports/configured-plans/{planKey}
POST /api/eventhub/acquisition/tachograph/imports/configured-plans/{planKey}/start?triggerMode=PLAN_ONLY
POST /api/eventhub/acquisition/tachograph/imports/configured-plans/{planKey}/start?triggerMode=EXECUTE
```
Manual start from a configured plan can override mode and strategy:
```http
POST /api/eventhub/acquisition/tachograph/imports/configured-plans/kralowetz-tachograph-org-147/start?mode=INCREMENTAL_UPDATE&strategy=SOURCE_PACKAGE_WATERMARK&triggerMode=PLAN_ONLY
```
## Concrete extraction extension point
The scheduler and import-run service are now implemented, but the generated skeleton still does not know the real tachograph DB SQL. The extension point is:
```java
TachographExtractionBatchExecutor
```
Replace `NoopTachographExtractionBatchExecutor` with an implementation that:
```text
1. receives importRunId, packageId, TachographImportRequest, planItem and time chunk
2. uses planItem.extractionCode to select the SQL statement
3. applies importScope organisation and occurred-time filters
4. applies source-package watermark or source-row watermark for incremental updates
5. maps rows to EventHubEventDto
6. sets sourcePackageRef when the row can be traced to an original card/VU package
7. sends events to direct:eventhub-normalized-input or EventHubIngestionService
8. returns TachographExtractionBatchResultDto with cursor watermarks
```
The import cursor is advanced only when the executor reports `executed=true`. The default no-op executor returns `executed=false`, so it does not move cursors accidentally.

View File

@ -4,9 +4,11 @@ import at.procon.eventhub.config.EventHubProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableConfigurationProperties(EventHubProperties.class)
@EnableScheduling
public class EventHubIngestionApplication {
public static void main(String[] args) {

View File

@ -1,21 +1,32 @@
package at.procon.eventhub.api;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.ConfiguredTachographImportPlanDto;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageIngestRequest;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.SchedulerTriggerMode;
import at.procon.eventhub.dto.TachographImportRequest;
import at.procon.eventhub.dto.TachographImportRunResultDto;
import at.procon.eventhub.dto.TachographImportTriggerResultDto;
import at.procon.eventhub.dto.source.TachographActivityDto;
import at.procon.eventhub.dto.source.TelematicsPositionDto;
import at.procon.eventhub.dto.source.YellowFoxD8BookingDto;
import at.procon.eventhub.service.TachographConfiguredImportPlanService;
import at.procon.eventhub.service.TachographImportExecutionService;
import at.procon.eventhub.service.TachographImportPlanService;
import jakarta.validation.Valid;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import org.apache.camel.ProducerTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@ -24,10 +35,19 @@ public class EventHubIngestionController {
private final ProducerTemplate producerTemplate;
private final TachographImportPlanService tachographImportPlanService;
private final TachographConfiguredImportPlanService configuredImportPlanService;
private final TachographImportExecutionService tachographImportExecutionService;
public EventHubIngestionController(ProducerTemplate producerTemplate, TachographImportPlanService tachographImportPlanService) {
public EventHubIngestionController(
ProducerTemplate producerTemplate,
TachographImportPlanService tachographImportPlanService,
TachographConfiguredImportPlanService configuredImportPlanService,
TachographImportExecutionService tachographImportExecutionService
) {
this.producerTemplate = producerTemplate;
this.tachographImportPlanService = tachographImportPlanService;
this.configuredImportPlanService = configuredImportPlanService;
this.tachographImportExecutionService = tachographImportExecutionService;
}
@PostMapping("/yellowfox/d8-bookings")
@ -54,15 +74,47 @@ public class EventHubIngestionController {
}
@PostMapping("/tachograph/imports/start")
public ResponseEntity<TachographImportRunResultDto> startTachographImport(@Valid @RequestBody TachographImportRequest request) {
TachographImportRunResultDto result = producerTemplate.requestBody(
"direct:tachograph-import-start",
request,
TachographImportRunResultDto.class
);
public ResponseEntity<TachographImportRunResultDto> startTachographImport(
@Valid @RequestBody TachographImportRequest request,
@RequestParam(defaultValue = "false") boolean execute
) {
TachographImportRunResultDto result = execute
? tachographImportExecutionService.startAndExecuteImport(request)
: producerTemplate.requestBody("direct:tachograph-import-start", request, TachographImportRunResultDto.class);
return ResponseEntity.accepted().body(result);
}
@GetMapping("/tachograph/imports/configured-plans")
public ResponseEntity<List<ConfiguredTachographImportPlanDto>> listConfiguredTachographPlans() {
return ResponseEntity.ok(configuredImportPlanService.listPlans());
}
@GetMapping("/tachograph/imports/configured-plans/{planKey}")
public ResponseEntity<ConfiguredTachographImportPlanDto> getConfiguredTachographPlan(@PathVariable String planKey) {
return ResponseEntity.ok(configuredImportPlanService.getPlan(planKey));
}
@PostMapping("/tachograph/imports/configured-plans/{planKey}/start")
public ResponseEntity<TachographImportTriggerResultDto> startConfiguredTachographPlan(
@PathVariable String planKey,
@RequestParam(required = false) ImportMode mode,
@RequestParam(required = false) AcquisitionStrategy strategy,
@RequestParam(defaultValue = "PLAN_ONLY") SchedulerTriggerMode triggerMode
) {
TachographImportRequest request = configuredImportPlanService.createRequest(planKey, mode, strategy);
TachographImportRunResultDto result = triggerMode == SchedulerTriggerMode.EXECUTE
? tachographImportExecutionService.startAndExecuteImport(request)
: tachographImportExecutionService.startImport(request);
return ResponseEntity.accepted().body(new TachographImportTriggerResultDto(
planKey,
request.mode(),
request.acquisitionStrategy(),
triggerMode,
OffsetDateTime.now(),
result
));
}
@PostMapping("/packages")
public ResponseEntity<Map<String, Object>> ingestPackage(@Valid @RequestBody EventHubPackageIngestRequest request) {
producerTemplate.sendBody("direct:eventhub-package-input", request);

View File

@ -1,6 +1,19 @@
package at.procon.eventhub.config;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SchedulerTriggerMode;
import at.procon.eventhub.dto.SourceGroupRefDto;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "eventhub")
@ -48,6 +61,17 @@ public class EventHubProperties {
/** Overlap used by occurred-at fallback incremental imports. */
private Duration occurredAtOverlap = Duration.ofDays(7);
/** Regular scheduler scan interval; each configured plan still uses its own cron. */
private Duration schedulerPollInterval = Duration.ofMinutes(1);
/** Whether scheduled tachograph imports are enabled. */
private boolean schedulerEnabled = false;
private SchedulerTriggerMode schedulerTriggerMode = SchedulerTriggerMode.PLAN_ONLY;
/** Configured tenant/source import plans. */
private List<ConfiguredImportPlan> importPlans = new ArrayList<>();
public int getDefaultChunkDays() {
return defaultChunkDays;
}
@ -63,5 +87,186 @@ public class EventHubProperties {
public void setOccurredAtOverlap(Duration occurredAtOverlap) {
this.occurredAtOverlap = occurredAtOverlap;
}
public Duration getSchedulerPollInterval() {
return schedulerPollInterval;
}
public void setSchedulerPollInterval(Duration schedulerPollInterval) {
this.schedulerPollInterval = schedulerPollInterval;
}
public boolean isSchedulerEnabled() {
return schedulerEnabled;
}
public void setSchedulerEnabled(boolean schedulerEnabled) {
this.schedulerEnabled = schedulerEnabled;
}
public SchedulerTriggerMode getSchedulerTriggerMode() {
return schedulerTriggerMode;
}
public void setSchedulerTriggerMode(SchedulerTriggerMode schedulerTriggerMode) {
this.schedulerTriggerMode = schedulerTriggerMode == null ? SchedulerTriggerMode.PLAN_ONLY : schedulerTriggerMode;
}
public List<ConfiguredImportPlan> getImportPlans() {
return importPlans;
}
public void setImportPlans(List<ConfiguredImportPlan> importPlans) {
this.importPlans = importPlans == null ? new ArrayList<>() : importPlans;
}
}
public static class ConfiguredImportPlan {
private String planKey;
private boolean enabled = true;
private String cron;
private String tenantKey;
private EventSourceDto eventSource;
private SourceGroupRefDto sourceGroup;
private ImportScopeDto importScope;
private Set<EventFamily> eventFamilies = EnumSet.allOf(EventFamily.class);
private ImportMode initialMode = ImportMode.INITIAL_BACKFILL;
private ImportMode scheduledMode = ImportMode.INCREMENTAL_UPDATE;
private AcquisitionStrategy initialStrategy = AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP;
private AcquisitionStrategy scheduledStrategy = AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK;
private boolean refreshMasterDataFirst = true;
private OffsetDateTime initialOccurredFrom;
private OffsetDateTime initialOccurredTo;
private boolean runInitialOnStartup = false;
public String getPlanKey() {
return planKey;
}
public void setPlanKey(String planKey) {
this.planKey = planKey;
}
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getCron() {
return cron;
}
public void setCron(String cron) {
this.cron = cron;
}
public String getTenantKey() {
return tenantKey;
}
public void setTenantKey(String tenantKey) {
this.tenantKey = tenantKey;
}
public EventSourceDto getEventSource() {
return eventSource;
}
public void setEventSource(EventSourceDto eventSource) {
this.eventSource = eventSource;
}
public SourceGroupRefDto getSourceGroup() {
return sourceGroup;
}
public void setSourceGroup(SourceGroupRefDto sourceGroup) {
this.sourceGroup = sourceGroup;
}
public ImportScopeDto getImportScope() {
return importScope;
}
public void setImportScope(ImportScopeDto importScope) {
this.importScope = importScope;
}
public Set<EventFamily> getEventFamilies() {
return eventFamilies;
}
public void setEventFamilies(Set<EventFamily> eventFamilies) {
this.eventFamilies = eventFamilies == null || eventFamilies.isEmpty()
? EnumSet.allOf(EventFamily.class)
: EnumSet.copyOf(eventFamilies);
}
public ImportMode getInitialMode() {
return initialMode;
}
public void setInitialMode(ImportMode initialMode) {
this.initialMode = initialMode;
}
public ImportMode getScheduledMode() {
return scheduledMode;
}
public void setScheduledMode(ImportMode scheduledMode) {
this.scheduledMode = scheduledMode;
}
public AcquisitionStrategy getInitialStrategy() {
return initialStrategy;
}
public void setInitialStrategy(AcquisitionStrategy initialStrategy) {
this.initialStrategy = initialStrategy;
}
public AcquisitionStrategy getScheduledStrategy() {
return scheduledStrategy;
}
public void setScheduledStrategy(AcquisitionStrategy scheduledStrategy) {
this.scheduledStrategy = scheduledStrategy;
}
public boolean isRefreshMasterDataFirst() {
return refreshMasterDataFirst;
}
public void setRefreshMasterDataFirst(boolean refreshMasterDataFirst) {
this.refreshMasterDataFirst = refreshMasterDataFirst;
}
public OffsetDateTime getInitialOccurredFrom() {
return initialOccurredFrom;
}
public void setInitialOccurredFrom(OffsetDateTime initialOccurredFrom) {
this.initialOccurredFrom = initialOccurredFrom;
}
public OffsetDateTime getInitialOccurredTo() {
return initialOccurredTo;
}
public void setInitialOccurredTo(OffsetDateTime initialOccurredTo) {
this.initialOccurredTo = initialOccurredTo;
}
public boolean isRunInitialOnStartup() {
return runInitialOnStartup;
}
public void setRunInitialOnStartup(boolean runInitialOnStartup) {
this.runInitialOnStartup = runInitialOnStartup;
}
}
}

View File

@ -0,0 +1,24 @@
package at.procon.eventhub.dto;
import java.time.OffsetDateTime;
import java.util.Set;
public record ConfiguredTachographImportPlanDto(
String planKey,
boolean enabled,
String cron,
String tenantKey,
EventSourceDto eventSource,
SourceGroupRefDto sourceGroup,
ImportScopeDto importScope,
Set<EventFamily> eventFamilies,
ImportMode initialMode,
ImportMode scheduledMode,
AcquisitionStrategy initialStrategy,
AcquisitionStrategy scheduledStrategy,
boolean refreshMasterDataFirst,
OffsetDateTime initialOccurredFrom,
OffsetDateTime initialOccurredTo,
boolean runInitialOnStartup
) {
}

View File

@ -0,0 +1,9 @@
package at.procon.eventhub.dto;
public enum SchedulerTriggerMode {
/** Scheduler creates import runs and extraction packages only. A worker/extractor can execute them later. */
PLAN_ONLY,
/** Scheduler creates the import run and immediately invokes the configured extraction executor. */
EXECUTE
}

View File

@ -0,0 +1,20 @@
package at.procon.eventhub.dto;
import java.time.OffsetDateTime;
import java.util.UUID;
public record TachographExtractionBatchResultDto(
UUID packageId,
String extractionCode,
String sourceKind,
int sourceRowsRead,
int eventsMapped,
int eventsInserted,
int alreadyImported,
boolean executed,
OffsetDateTime lastSourcePackageImportedAt,
String lastSourcePackageId,
OffsetDateTime lastSourceRowUpdatedAt,
OffsetDateTime lastOccurredTo
) {
}

View File

@ -0,0 +1,13 @@
package at.procon.eventhub.dto;
import java.time.OffsetDateTime;
public record TachographImportTriggerResultDto(
String planKey,
ImportMode mode,
AcquisitionStrategy acquisitionStrategy,
SchedulerTriggerMode triggerMode,
OffsetDateTime triggeredAt,
TachographImportRunResultDto runResult
) {
}

View File

@ -170,6 +170,18 @@ public class DataPackageRepository {
);
}
public void markImporting(UUID packageId) {
jdbcTemplate.update(
"""
update eventhub.data_package
set status = ?
where id = ?
""",
DataPackageStatus.IMPORTING.name(),
packageId
);
}
public void markImported(UUID packageId, int insertedCount) {
jdbcTemplate.update(
"""

View File

@ -0,0 +1,56 @@
package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.TachographExtractionBatchResultDto;
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class ImportCursorRepository {
private final JdbcTemplate jdbcTemplate;
public ImportCursorRepository(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public void advanceCursor(
String tenantKey,
int eventSourceId,
String scopeHash,
EventFamily eventFamily,
String sourceKind,
AcquisitionStrategy strategy,
TachographExtractionBatchResultDto result
) {
jdbcTemplate.update(
"""
insert into eventhub.import_cursor(
id, tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type,
last_source_package_imported_at, last_source_package_id,
last_source_row_updated_at, last_occurred_to, updated_at
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now())
on conflict (tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type)
do update set
last_source_package_imported_at = coalesce(excluded.last_source_package_imported_at, eventhub.import_cursor.last_source_package_imported_at),
last_source_package_id = coalesce(excluded.last_source_package_id, eventhub.import_cursor.last_source_package_id),
last_source_row_updated_at = coalesce(excluded.last_source_row_updated_at, eventhub.import_cursor.last_source_row_updated_at),
last_occurred_to = coalesce(excluded.last_occurred_to, eventhub.import_cursor.last_occurred_to),
updated_at = now()
""",
UUID.randomUUID(),
tenantKey,
eventSourceId,
scopeHash,
eventFamily.name(),
sourceKind,
strategy.name(),
result.lastSourcePackageImportedAt(),
result.lastSourcePackageId(),
result.lastSourceRowUpdatedAt(),
result.lastOccurredTo()
);
}
}

View File

@ -0,0 +1,49 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.dto.TachographImportPlanItemDto;
import at.procon.eventhub.dto.TachographImportRequest;
import at.procon.eventhub.dto.TimeChunkDto;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* Default executor used by the generated skeleton. It marks the route contract
* and scheduling/execution lifecycle but intentionally does not read the real
* tachograph DB. Replace this bean with an implementation that executes the
* SQL for each extractionCode and sends mapped EventHubEventDto records to
* direct:eventhub-normalized-input.
*/
@Service
public class NoopTachographExtractionBatchExecutor implements TachographExtractionBatchExecutor {
private static final Logger log = LoggerFactory.getLogger(NoopTachographExtractionBatchExecutor.class);
@Override
public TachographExtractionBatchResultDto execute(
UUID importRunId,
UUID packageId,
TachographImportRequest request,
TachographImportPlanItemDto planItem,
TimeChunkDto chunk
) {
log.warn("No concrete tachograph SQL extractor configured. importRunId={} packageId={} extractionCode={} sourceKind={} chunk={}",
importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(), chunk.sequence());
return new TachographExtractionBatchResultDto(
packageId,
planItem.extractionCode(),
planItem.sourceKind(),
0,
0,
0,
0,
false,
null,
null,
null,
null
);
}
}

View File

@ -0,0 +1,110 @@
package at.procon.eventhub.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.ConfiguredTachographImportPlanDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.TachographImportRequest;
import java.util.List;
import java.util.Locale;
import java.util.NoSuchElementException;
import org.springframework.stereotype.Service;
@Service
public class TachographConfiguredImportPlanService {
private final EventHubProperties properties;
public TachographConfiguredImportPlanService(EventHubProperties properties) {
this.properties = properties;
}
public List<ConfiguredTachographImportPlanDto> listPlans() {
return properties.getTachograph().getImportPlans().stream()
.map(this::toDto)
.toList();
}
public ConfiguredTachographImportPlanDto getPlan(String planKey) {
return properties.getTachograph().getImportPlans().stream()
.filter(plan -> normalize(plan.getPlanKey()).equals(normalize(planKey)))
.findFirst()
.map(this::toDto)
.orElseThrow(() -> new NoSuchElementException("No configured tachograph import plan found for key " + planKey));
}
public TachographImportRequest createRequest(String planKey, ImportMode modeOverride, AcquisitionStrategy strategyOverride) {
EventHubProperties.ConfiguredImportPlan plan = properties.getTachograph().getImportPlans().stream()
.filter(candidate -> normalize(candidate.getPlanKey()).equals(normalize(planKey)))
.findFirst()
.orElseThrow(() -> new NoSuchElementException("No configured tachograph import plan found for key " + planKey));
return createRequest(plan, modeOverride, strategyOverride, false);
}
public TachographImportRequest createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan) {
return createRequest(plan, plan.getScheduledMode(), plan.getScheduledStrategy(), false);
}
public TachographImportRequest createInitialRequest(EventHubProperties.ConfiguredImportPlan plan) {
return createRequest(plan, plan.getInitialMode(), plan.getInitialStrategy(), true);
}
private TachographImportRequest createRequest(
EventHubProperties.ConfiguredImportPlan plan,
ImportMode modeOverride,
AcquisitionStrategy strategyOverride,
boolean applyInitialOccurredWindow
) {
ImportMode mode = modeOverride == null ? plan.getScheduledMode() : modeOverride;
AcquisitionStrategy strategy = strategyOverride == null
? (mode == ImportMode.INCREMENTAL_UPDATE ? plan.getScheduledStrategy() : plan.getInitialStrategy())
: strategyOverride;
ImportScopeDto scope = plan.getImportScope();
if (applyInitialOccurredWindow && scope != null
&& (plan.getInitialOccurredFrom() != null || plan.getInitialOccurredTo() != null)) {
scope = new ImportScopeDto(
scope.type(),
scope.rootSourceOrganisation(),
scope.includeChildren(),
plan.getInitialOccurredFrom() == null ? scope.occurredFrom() : plan.getInitialOccurredFrom(),
plan.getInitialOccurredTo() == null ? scope.occurredTo() : plan.getInitialOccurredTo()
);
}
return new TachographImportRequest(
plan.getTenantKey(),
plan.getEventSource(),
plan.getSourceGroup(),
scope,
plan.getEventFamilies(),
mode,
plan.isRefreshMasterDataFirst(),
strategy
);
}
private ConfiguredTachographImportPlanDto toDto(EventHubProperties.ConfiguredImportPlan plan) {
return new ConfiguredTachographImportPlanDto(
plan.getPlanKey(),
plan.isEnabled(),
plan.getCron(),
plan.getTenantKey(),
plan.getEventSource(),
plan.getSourceGroup(),
plan.getImportScope(),
plan.getEventFamilies(),
plan.getInitialMode(),
plan.getScheduledMode(),
plan.getInitialStrategy(),
plan.getScheduledStrategy(),
plan.isRefreshMasterDataFirst(),
plan.getInitialOccurredFrom(),
plan.getInitialOccurredTo(),
plan.isRunInitialOnStartup()
);
}
private String normalize(String value) {
return value == null ? "" : value.trim().toLowerCase(Locale.ROOT);
}
}

View File

@ -0,0 +1,18 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.dto.TachographImportPlanItemDto;
import at.procon.eventhub.dto.TachographImportRequest;
import at.procon.eventhub.dto.TimeChunkDto;
import java.util.UUID;
public interface TachographExtractionBatchExecutor {
TachographExtractionBatchResultDto execute(
UUID importRunId,
UUID packageId,
TachographImportRequest request,
TachographImportPlanItemDto planItem,
TimeChunkDto chunk
);
}

View File

@ -4,6 +4,7 @@ import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportRunStatus;
import at.procon.eventhub.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.dto.TachographImportPlanDto;
import at.procon.eventhub.dto.TachographImportPlanItemDto;
import at.procon.eventhub.dto.TachographImportRequest;
@ -12,6 +13,7 @@ import at.procon.eventhub.dto.TimeChunkDto;
import at.procon.eventhub.persistence.DataPackageRepository;
import at.procon.eventhub.persistence.EventSourceRepository;
import at.procon.eventhub.persistence.ImportRunRepository;
import at.procon.eventhub.persistence.ImportCursorRepository;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@ -25,10 +27,9 @@ import org.springframework.transaction.annotation.Transactional;
/**
* Creates import runs and extraction data packages for tachograph acquisition.
*
* This service deliberately creates EventHub packages for extraction batches. The
* original tachograph card/VU package is not treated as the EventHub package; it
* is preserved later as SourcePackageRefDto on acquired events or in batch
* metadata when an extractor processes one concrete source package.
* EventHub data packages are extraction batches. The original tachograph card/VU
* package is preserved later as SourcePackageRefDto on acquired events or in
* batch metadata when an extractor processes one concrete source package.
*/
@Service
public class TachographImportExecutionService {
@ -39,29 +40,51 @@ public class TachographImportExecutionService {
private final EventSourceRepository eventSourceRepository;
private final ImportRunRepository importRunRepository;
private final DataPackageRepository dataPackageRepository;
private final ImportCursorRepository importCursorRepository;
private final TachographMasterDataRefreshService masterDataRefreshService;
private final TachographExtractionBatchExecutor extractionBatchExecutor;
public TachographImportExecutionService(
TachographImportPlanService planService,
EventSourceRepository eventSourceRepository,
ImportRunRepository importRunRepository,
DataPackageRepository dataPackageRepository
DataPackageRepository dataPackageRepository,
ImportCursorRepository importCursorRepository,
TachographMasterDataRefreshService masterDataRefreshService,
TachographExtractionBatchExecutor extractionBatchExecutor
) {
this.planService = planService;
this.eventSourceRepository = eventSourceRepository;
this.importRunRepository = importRunRepository;
this.dataPackageRepository = dataPackageRepository;
this.importCursorRepository = importCursorRepository;
this.masterDataRefreshService = masterDataRefreshService;
this.extractionBatchExecutor = extractionBatchExecutor;
}
@Transactional
public TachographImportRunResultDto startImport(TachographImportRequest request) {
return createImportRun(request, false);
}
@Transactional
public TachographImportRunResultDto startAndExecuteImport(TachographImportRequest request) {
return createImportRun(request, true);
}
private TachographImportRunResultDto createImportRun(TachographImportRequest request, boolean executeImmediately) {
TachographImportPlanDto plan = planService.createPlan(request);
int baseEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), request.eventSource());
UUID importRunId = importRunRepository.createPlannedRun(baseEventSourceId, request, Map.of(
"note", "Created tachograph import run and planned extraction packages. SQL extraction is handled by event-family routes.",
"packageModel", "EventHub data packages are extraction batches; original tachograph packages are SourcePackageRefDto."
"note", executeImmediately
? "Created tachograph import run and executing planned extraction packages."
: "Created tachograph import run and planned extraction packages.",
"packageModel", "EventHub data packages are extraction batches; original tachograph packages are SourcePackageRefDto.",
"executeImmediately", executeImmediately
));
List<UUID> packageIds = new ArrayList<>();
List<PlannedPackage> plannedPackages = new ArrayList<>();
int batchNo = 1;
try {
for (TachographImportPlanItemDto item : plan.items()) {
@ -81,12 +104,18 @@ public class TachographImportExecutionService {
metadata(request, item, chunk, importRunId)
);
packageIds.add(packageId);
plannedPackages.add(new PlannedPackage(packageId, itemEventSourceId, item, chunk));
batchNo++;
}
}
importRunRepository.markPlannedPackages(importRunId, packageIds.size());
log.info("Created tachograph import run importRunId={} plannedPackages={} tenant={} mode={} strategy={}",
importRunId, packageIds.size(), request.tenantKey(), request.mode(), request.acquisitionStrategy());
log.info("Created tachograph import run importRunId={} plannedPackages={} tenant={} mode={} strategy={} executeImmediately={}",
importRunId, packageIds.size(), request.tenantKey(), request.mode(), request.acquisitionStrategy(), executeImmediately);
if (executeImmediately) {
executePlannedPackages(importRunId, request, plannedPackages);
return new TachographImportRunResultDto(importRunId, ImportRunStatus.COMPLETED, packageIds.size(), plan, List.copyOf(packageIds));
}
return new TachographImportRunResultDto(importRunId, ImportRunStatus.PLANNED, packageIds.size(), plan, List.copyOf(packageIds));
} catch (RuntimeException ex) {
importRunRepository.markFailed(importRunId, ex.getMessage());
@ -94,6 +123,41 @@ public class TachographImportExecutionService {
}
}
private void executePlannedPackages(UUID importRunId, TachographImportRequest request, List<PlannedPackage> plannedPackages) {
importRunRepository.markRunning(importRunId);
masterDataRefreshService.refreshIfRequested(request);
List<TachographExtractionBatchResultDto> results = new ArrayList<>();
for (PlannedPackage plannedPackage : plannedPackages) {
dataPackageRepository.markImporting(plannedPackage.packageId());
TachographExtractionBatchResultDto result = extractionBatchExecutor.execute(
importRunId,
plannedPackage.packageId(),
request,
plannedPackage.planItem(),
plannedPackage.chunk()
);
results.add(result);
dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted());
if (result.executed()) {
importCursorRepository.advanceCursor(
request.tenantKey(),
plannedPackage.eventSourceId(),
request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(),
plannedPackage.planItem().eventFamily(),
plannedPackage.planItem().sourceKind(),
request.acquisitionStrategy(),
result
);
}
}
importRunRepository.markCompleted(importRunId);
log.info("Completed tachograph import run importRunId={} packages={} insertedEvents={} executedBatches={}",
importRunId,
plannedPackages.size(),
results.stream().mapToInt(TachographExtractionBatchResultDto::eventsInserted).sum(),
results.stream().filter(TachographExtractionBatchResultDto::executed).count());
}
private EventSourceDto eventSourceForItem(EventSourceDto base, TachographImportPlanItemDto item) {
String sourceKind = item.sourceKind();
String sourceKey = switch (sourceKind) {
@ -161,4 +225,7 @@ public class TachographImportExecutionService {
metadata.put("sourcePackageRefPolicy", "Original tachograph card/VU package is preserved per acquired event when SQL extraction returns it.");
return metadata;
}
private record PlannedPackage(UUID packageId, int eventSourceId, TachographImportPlanItemDto planItem, TimeChunkDto chunk) {
}
}

View File

@ -0,0 +1,97 @@
package at.procon.eventhub.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.SchedulerTriggerMode;
import at.procon.eventhub.dto.TachographImportRequest;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Service;
@Service
public class TachographImportScheduler {
private static final Logger log = LoggerFactory.getLogger(TachographImportScheduler.class);
private final EventHubProperties properties;
private final TachographConfiguredImportPlanService configuredPlanService;
private final TachographImportExecutionService executionService;
private final Map<String, ZonedDateTime> nextRunByPlan = new ConcurrentHashMap<>();
private final Map<String, AtomicBoolean> runningByPlan = new ConcurrentHashMap<>();
public TachographImportScheduler(
EventHubProperties properties,
TachographConfiguredImportPlanService configuredPlanService,
TachographImportExecutionService executionService
) {
this.properties = properties;
this.configuredPlanService = configuredPlanService;
this.executionService = executionService;
}
@EventListener(ApplicationReadyEvent.class)
public void triggerInitialPlansOnStartup() {
if (!properties.getTachograph().isSchedulerEnabled()) {
return;
}
for (EventHubProperties.ConfiguredImportPlan plan : properties.getTachograph().getImportPlans()) {
if (plan.isEnabled() && plan.getPlanKey() != null && !plan.getPlanKey().isBlank() && plan.isRunInitialOnStartup()) {
triggerPlan(plan, true, properties.getTachograph().getSchedulerTriggerMode());
}
}
}
@Scheduled(fixedDelayString = "${eventhub.tachograph.scheduler-poll-interval-ms:60000}")
public void pollConfiguredPlans() {
if (!properties.getTachograph().isSchedulerEnabled()) {
return;
}
ZonedDateTime now = ZonedDateTime.now(ZoneId.systemDefault());
for (EventHubProperties.ConfiguredImportPlan plan : properties.getTachograph().getImportPlans()) {
if (!plan.isEnabled() || plan.getPlanKey() == null || plan.getPlanKey().isBlank() || plan.getCron() == null || plan.getCron().isBlank()) {
continue;
}
String key = plan.getPlanKey();
ZonedDateTime next = nextRunByPlan.computeIfAbsent(key, ignored -> CronExpression.parse(plan.getCron()).next(now));
if (next != null && !next.isAfter(now)) {
triggerPlan(plan, false, properties.getTachograph().getSchedulerTriggerMode());
nextRunByPlan.put(key, CronExpression.parse(plan.getCron()).next(now.plusSeconds(1)));
}
}
}
private void triggerPlan(EventHubProperties.ConfiguredImportPlan plan, boolean initial, SchedulerTriggerMode triggerMode) {
String key = plan.getPlanKey();
AtomicBoolean running = runningByPlan.computeIfAbsent(key, ignored -> new AtomicBoolean(false));
if (!running.compareAndSet(false, true)) {
log.info("Skipping tachograph import plan={} because a previous run is still active", key);
return;
}
try {
TachographImportRequest request = initial
? configuredPlanService.createInitialRequest(plan)
: configuredPlanService.createScheduledRequest(plan);
log.info("Triggering tachograph import plan={} initial={} mode={} strategy={} triggerMode={} at={}",
key, initial, request.mode(), request.acquisitionStrategy(), triggerMode, OffsetDateTime.now());
if (triggerMode == SchedulerTriggerMode.EXECUTE) {
executionService.startAndExecuteImport(request);
} else {
executionService.startImport(request);
}
} catch (RuntimeException ex) {
log.error("Tachograph import plan={} failed to trigger", key, ex);
} finally {
running.set(false);
}
}
}

View File

@ -0,0 +1,27 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.TachographImportRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* Hook for refreshing tachograph master data before event extraction.
*
* The generated project does not yet know the concrete tachograph master-data
* schema. Replace/extend this service with SQL readers for organisations,
* vehicles, vehicle registrations, drivers, and driver cards.
*/
@Service
public class TachographMasterDataRefreshService {
private static final Logger log = LoggerFactory.getLogger(TachographMasterDataRefreshService.class);
public void refreshIfRequested(TachographImportRequest request) {
if (!request.refreshMasterDataFirst()) {
return;
}
log.info("Tachograph master-data refresh requested for tenant={} source={}. Concrete SQL refresh is a project-specific extension point.",
request.tenantKey(), request.eventSource().stableKey());
}
}

View File

@ -32,3 +32,56 @@ eventhub:
tachograph:
default-chunk-days: 1
occurred-at-overlap: 7d
# Enables the scheduler that regularly triggers configured tachograph import plans.
scheduler-enabled: false
scheduler-poll-interval-ms: 60000
# PLAN_ONLY creates import_run + planned extraction packages.
# EXECUTE also invokes the configured TachographExtractionBatchExecutor.
scheduler-trigger-mode: PLAN_ONLY
# Example plan. Keep disabled until the tachograph datasource/extractor is wired.
import-plans:
- plan-key: kralowetz-tachograph-org-147
enabled: false
cron: "0 15 * * * *" # hourly at minute 15
tenant-key: kralowetz
event-source:
provider-key: TACHOGRAPH
source-kind: MIXED
source-key: TACHOGRAPH_DB
source-instance-key: tachograph-prod-db
tenant-provider-setting-key: kralowetz-tachograph-prod
source-group:
type: ORGANISATION
source-entity-id: "147"
code: "147"
name: Kralowetz root organisation
import-scope:
type: SOURCE_ORGANISATION_SUBTREE
root-source-organisation:
type: ORGANISATION
source-entity-id: "147"
code: "147"
name: Kralowetz root organisation
include-children: true
occurred-from: null
occurred-to: null
event-families:
- DRIVER_ACTIVITY
- DRIVER_CARD
- POSITION
- BORDER_CROSSING
- LOAD_UNLOAD
- PLACE
- SPECIFIC_CONDITION
- SPEEDING
initial-mode: INITIAL_BACKFILL
scheduled-mode: INCREMENTAL_UPDATE
initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP
scheduled-strategy: SOURCE_PACKAGE_WATERMARK
refresh-master-data-first: true
initial-occurred-from: "2025-01-01T00:00:00+01:00"
initial-occurred-to: null
run-initial-on-startup: false