diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index 038d6d0..4a0e690 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -19,8 +19,14 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "eventhub") public class EventHubProperties { + public enum JdbcExtractionIngestMode { + SYNC_DIRECT, + CAMEL_ROUTE + } + private final Batch batch = new Batch(); private final Tachograph tachograph = new Tachograph(); + private final YellowFox yellowFox = new YellowFox(); public Batch getBatch() { return batch; @@ -30,6 +36,10 @@ public class EventHubProperties { return tachograph; } + public YellowFox getYellowFox() { + return yellowFox; + } + public static class Batch { /** Number of events collected before a package is persisted. */ private int completionSize = 5000; @@ -115,6 +125,9 @@ public class EventHubProperties { private SchedulerTriggerMode schedulerTriggerMode = SchedulerTriggerMode.PLAN_ONLY; + /** How JDBC extraction batches are handed over to the ingest pipeline. */ + private JdbcExtractionIngestMode jdbcExtractionIngestMode = JdbcExtractionIngestMode.SYNC_DIRECT; + /** Configured tenant/source import plans. */ private List importPlans = new ArrayList<>(); @@ -161,6 +174,16 @@ public class EventHubProperties { this.schedulerTriggerMode = schedulerTriggerMode == null ? SchedulerTriggerMode.PLAN_ONLY : schedulerTriggerMode; } + public JdbcExtractionIngestMode getJdbcExtractionIngestMode() { + return jdbcExtractionIngestMode; + } + + public void setJdbcExtractionIngestMode(JdbcExtractionIngestMode jdbcExtractionIngestMode) { + this.jdbcExtractionIngestMode = jdbcExtractionIngestMode == null + ? JdbcExtractionIngestMode.SYNC_DIRECT + : jdbcExtractionIngestMode; + } + public List getImportPlans() { return importPlans; } @@ -217,6 +240,138 @@ public class EventHubProperties { } } + public static class YellowFox { + /** Default chunk size for initial/backfill D8 booking imports. */ + private int defaultChunkDays = 1; + + /** Overlap used by incremental utc/eventid cursor imports to catch late rows and ignition transitions. */ + private Duration occurredAtOverlap = Duration.ofHours(2); + + /** Regular scheduler scan interval; each configured plan still uses its own cron. */ + private Duration schedulerPollInterval = Duration.ofMinutes(1); + + /** Whether scheduled YellowFox imports are enabled. */ + private boolean schedulerEnabled = false; + + private SchedulerTriggerMode schedulerTriggerMode = SchedulerTriggerMode.PLAN_ONLY; + + /** Emit a first ignition snapshot per vehicle if no previous D8 ignition state exists in the imported window. */ + private boolean emitInitialIgnitionSnapshot = false; + + /** Configured tenant/source import plans. */ + private List importPlans = new ArrayList<>(); + + /** Optional external YellowFox mirror datasource. If absent, the no-op extractor is used. */ + private YellowFoxDataSource datasource = new YellowFoxDataSource(); + + public int getDefaultChunkDays() { + return defaultChunkDays; + } + + public void setDefaultChunkDays(int defaultChunkDays) { + this.defaultChunkDays = Math.max(1, defaultChunkDays); + } + + public Duration getOccurredAtOverlap() { + return occurredAtOverlap; + } + + public void setOccurredAtOverlap(Duration occurredAtOverlap) { + if (occurredAtOverlap != null && !occurredAtOverlap.isNegative()) { + this.occurredAtOverlap = occurredAtOverlap; + } + } + + public Duration getSchedulerPollInterval() { + return schedulerPollInterval; + } + + public void setSchedulerPollInterval(Duration schedulerPollInterval) { + if (schedulerPollInterval != null && !schedulerPollInterval.isNegative()) { + this.schedulerPollInterval = schedulerPollInterval; + } + } + + public boolean isSchedulerEnabled() { + return schedulerEnabled; + } + + public void setSchedulerEnabled(boolean schedulerEnabled) { + this.schedulerEnabled = schedulerEnabled; + } + + public SchedulerTriggerMode getSchedulerTriggerMode() { + return schedulerTriggerMode; + } + + public void setSchedulerTriggerMode(SchedulerTriggerMode schedulerTriggerMode) { + this.schedulerTriggerMode = schedulerTriggerMode == null ? SchedulerTriggerMode.PLAN_ONLY : schedulerTriggerMode; + } + + public boolean isEmitInitialIgnitionSnapshot() { + return emitInitialIgnitionSnapshot; + } + + public void setEmitInitialIgnitionSnapshot(boolean emitInitialIgnitionSnapshot) { + this.emitInitialIgnitionSnapshot = emitInitialIgnitionSnapshot; + } + + public List getImportPlans() { + return importPlans; + } + + public void setImportPlans(List importPlans) { + this.importPlans = importPlans == null ? new ArrayList<>() : importPlans; + } + + public YellowFoxDataSource getDatasource() { + return datasource; + } + + public void setDatasource(YellowFoxDataSource datasource) { + this.datasource = datasource == null ? new YellowFoxDataSource() : datasource; + } + } + + public static class YellowFoxDataSource { + private String jdbcUrl; + private String username; + private String password; + private String driverClassName; + + public String getJdbcUrl() { + return jdbcUrl; + } + + public void setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getDriverClassName() { + return driverClassName; + } + + public void setDriverClassName(String driverClassName) { + this.driverClassName = driverClassName; + } + } + public static class ConfiguredImportPlan { private String planKey; private boolean enabled = true; diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportExecutionService.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportExecutionService.java new file mode 100644 index 0000000..8e925c7 --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportExecutionService.java @@ -0,0 +1,119 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.importing.AbstractImportExecutionService; +import at.procon.eventhub.importing.ImportPlanDto; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportRunResultDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.persistence.ImportCursorRepository; +import at.procon.eventhub.importing.persistence.ImportRunRepository; +import at.procon.eventhub.persistence.DataPackageRepository; +import at.procon.eventhub.persistence.EventSourceRepository; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ExtractionBatchResultDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRunResultDto; +import java.util.Map; +import java.util.UUID; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Service +public class YellowFoxD8ImportExecutionService + extends AbstractImportExecutionService { + + private final YellowFoxD8ImportPlanService planService; + private final YellowFoxD8ExtractionBatchExecutor extractionBatchExecutor; + + public YellowFoxD8ImportExecutionService( + YellowFoxD8ImportPlanService planService, + EventSourceRepository eventSourceRepository, + ImportRunRepository importRunRepository, + DataPackageRepository dataPackageRepository, + ImportCursorRepository importCursorRepository, + YellowFoxD8ExtractionBatchExecutor extractionBatchExecutor + ) { + super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository); + this.planService = planService; + this.extractionBatchExecutor = extractionBatchExecutor; + } + + @Transactional + public YellowFoxD8ImportRunResultDto startImport(YellowFoxD8ImportRequest request) { + return toYellowFoxResult(createImportRun(request, false)); + } + + public YellowFoxD8ImportRunResultDto startAndExecuteImport(YellowFoxD8ImportRequest request) { + return toYellowFoxResult(createImportRun(request, true)); + } + + @Override + protected ImportPlanDto createPlan(YellowFoxD8ImportRequest request) { + return planService.createPlan(request); + } + + @Override + protected YellowFoxD8ExtractionBatchResultDto executeBatch( + UUID importRunId, + UUID packageId, + int eventSourceId, + YellowFoxD8ImportRequest request, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk + ) { + return extractionBatchExecutor.execute(importRunId, packageId, eventSourceId, request, planItem, chunk); + } + + @Override + protected EventSourceDto eventSourceForItem(EventSourceDto base, ImportPlanItemDto item) { + return new EventSourceDto( + "YELLOWFOX", + item.sourceKind(), + "YELLOWFOX_D8", + base.sourceInstanceKey(), + base.tenantProviderSettingKey(), + base.externalFleetKey() + ); + } + + @Override + protected Map importRunMetadata(YellowFoxD8ImportRequest request, boolean executeImmediately) { + return Map.of( + "note", executeImmediately + ? "Created YellowFox D8 import run and executing planned extraction packages." + : "Created YellowFox D8 import run and planned extraction packages.", + "packageModel", "EventHub data packages are YellowFox D8 extraction batches; source rows are idempotent by EVENTID+UTC.", + "executeImmediately", executeImmediately, + "officialD8Semantics", "eventtype 0/1 card slot DRIVER/CO_DRIVER; eventtype 2/3 activity slot DRIVER/CO_DRIVER; ignition is a state column." + ); + } + + @Override + protected String providerPackagePrefix() { + return "YELLOWFOX_D8"; + } + + @Override + protected Map packageMetadata( + YellowFoxD8ImportRequest request, + ImportPlanItemDto item, + ImportTimeChunkDto chunk, + UUID importRunId + ) { + Map metadata = super.packageMetadata(request, item, chunk, importRunId); + metadata.put("cursor", "SOURCE_ROW_WATERMARK using utc + eventid"); + metadata.put("slotPolicy", "eventtype 0/2 = DRIVER, eventtype 1/3 = CO_DRIVER"); + metadata.put("ignitionPolicy", "Store ignition state on every D8 detail; emit separate ignition event only on state change."); + return metadata; + } + + private YellowFoxD8ImportRunResultDto toYellowFoxResult(ImportRunResultDto result) { + return new YellowFoxD8ImportRunResultDto( + result.importRunId(), + result.status(), + result.plannedPackageCount(), + result.plan(), + result.plannedPackageIds() + ); + } +}