diff --git a/src/main/java/at/procon/eventhub/yellowfox/api/YellowFoxD8ImportController.java b/src/main/java/at/procon/eventhub/yellowfox/api/YellowFoxD8ImportController.java new file mode 100644 index 0000000..17aa698 --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/api/YellowFoxD8ImportController.java @@ -0,0 +1,93 @@ +package at.procon.eventhub.yellowfox.api; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.SchedulerTriggerMode; +import at.procon.eventhub.yellowfox.dto.ConfiguredYellowFoxD8ImportPlanDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRunResultDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportTriggerResultDto; +import at.procon.eventhub.yellowfox.service.YellowFoxD8ConfiguredImportPlanService; +import at.procon.eventhub.yellowfox.service.YellowFoxD8ImportExecutionService; +import at.procon.eventhub.yellowfox.service.YellowFoxD8ImportPlanService; +import jakarta.validation.Valid; +import java.time.OffsetDateTime; +import java.util.List; +import org.apache.camel.ProducerTemplate; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/eventhub/acquisition/yellowfox/d8") +public class YellowFoxD8ImportController { + + private final ProducerTemplate producerTemplate; + private final YellowFoxD8ImportPlanService importPlanService; + private final YellowFoxD8ConfiguredImportPlanService configuredImportPlanService; + private final YellowFoxD8ImportExecutionService importExecutionService; + + public YellowFoxD8ImportController( + ProducerTemplate producerTemplate, + YellowFoxD8ImportPlanService importPlanService, + YellowFoxD8ConfiguredImportPlanService configuredImportPlanService, + YellowFoxD8ImportExecutionService importExecutionService + ) { + this.producerTemplate = producerTemplate; + this.importPlanService = importPlanService; + this.configuredImportPlanService = configuredImportPlanService; + this.importExecutionService = importExecutionService; + } + + @PostMapping("/imports/plan") + public ResponseEntity planYellowFoxD8Import(@Valid @RequestBody YellowFoxD8ImportRequest request) { + return ResponseEntity.ok(importPlanService.createPlan(request)); + } + + @PostMapping("/imports/start") + public ResponseEntity startYellowFoxD8Import( + @Valid @RequestBody YellowFoxD8ImportRequest request, + @RequestParam(defaultValue = "false") boolean execute + ) { + YellowFoxD8ImportRunResultDto result = execute + ? importExecutionService.startAndExecuteImport(request) + : producerTemplate.requestBody("direct:yellowfox-d8-import-start", request, YellowFoxD8ImportRunResultDto.class); + return ResponseEntity.accepted().body(result); + } + + @GetMapping("/imports/configured-plans") + public ResponseEntity> listConfiguredYellowFoxPlans() { + return ResponseEntity.ok(configuredImportPlanService.listPlans()); + } + + @GetMapping("/imports/configured-plans/{planKey}") + public ResponseEntity getConfiguredYellowFoxPlan(@PathVariable String planKey) { + return ResponseEntity.ok(configuredImportPlanService.getPlan(planKey)); + } + + @PostMapping("/imports/configured-plans/{planKey}/start") + public ResponseEntity startConfiguredYellowFoxPlan( + @PathVariable String planKey, + @RequestParam(required = false) ImportMode mode, + @RequestParam(required = false) AcquisitionStrategy strategy, + @RequestParam(defaultValue = "PLAN_ONLY") SchedulerTriggerMode triggerMode + ) { + YellowFoxD8ImportRequest request = configuredImportPlanService.createRequest(planKey, mode, strategy); + YellowFoxD8ImportRunResultDto result = triggerMode == SchedulerTriggerMode.EXECUTE + ? importExecutionService.startAndExecuteImport(request) + : importExecutionService.startImport(request); + return ResponseEntity.accepted().body(new YellowFoxD8ImportTriggerResultDto( + planKey, + request.mode(), + request.acquisitionStrategy(), + triggerMode, + OffsetDateTime.now(), + result + )); + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/camel/YellowFoxD8BookingInputRoute.java b/src/main/java/at/procon/eventhub/yellowfox/camel/YellowFoxD8BookingInputRoute.java index 6555e8f..8213ec7 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/camel/YellowFoxD8BookingInputRoute.java +++ b/src/main/java/at/procon/eventhub/yellowfox/camel/YellowFoxD8BookingInputRoute.java @@ -1,6 +1,11 @@ package at.procon.eventhub.yellowfox.camel; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; import at.procon.eventhub.yellowfox.service.YellowFoxD8BookingEventMapper; +import at.procon.eventhub.yellowfox.service.YellowFoxD8IgnitionTransitionDetector; +import java.util.ArrayList; +import java.util.List; import org.apache.camel.builder.RouteBuilder; import org.springframework.stereotype.Component; @@ -8,17 +13,34 @@ import org.springframework.stereotype.Component; public class YellowFoxD8BookingInputRoute extends RouteBuilder { private final YellowFoxD8BookingEventMapper mapper; + private final YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector; - public YellowFoxD8BookingInputRoute(YellowFoxD8BookingEventMapper mapper) { + public YellowFoxD8BookingInputRoute( + YellowFoxD8BookingEventMapper mapper, + YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector + ) { this.mapper = mapper; + this.ignitionTransitionDetector = ignitionTransitionDetector; } @Override public void configure() { from("direct:yellowfox-d8-booking-input") .routeId("yellowfox-d8-booking-input-route") + .process(exchange -> { + List bookings = exchange.getMessage().getBody(List.class); + YellowFoxD8IgnitionTransitionDetector.Session ignitionSession = ignitionTransitionDetector.newSession(false); + List events = new ArrayList<>(); + for (YellowFoxD8BookingDto booking : bookings) { + events.add(mapper.map(booking)); + EventHubEventDto ignitionEvent = ignitionSession.detect(booking); + if (ignitionEvent != null) { + events.add(ignitionEvent); + } + } + exchange.getMessage().setBody(events); + }) .split(body()) - .bean(mapper, "map") .to("direct:eventhub-normalized-input") .end(); } diff --git a/src/main/java/at/procon/eventhub/yellowfox/camel/YellowFoxD8ImportRequestRoute.java b/src/main/java/at/procon/eventhub/yellowfox/camel/YellowFoxD8ImportRequestRoute.java new file mode 100644 index 0000000..63a51d8 --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/camel/YellowFoxD8ImportRequestRoute.java @@ -0,0 +1,33 @@ +package at.procon.eventhub.yellowfox.camel; + +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import at.procon.eventhub.yellowfox.service.YellowFoxD8ImportExecutionService; +import org.apache.camel.builder.RouteBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class YellowFoxD8ImportRequestRoute extends RouteBuilder { + + private static final Logger log = LoggerFactory.getLogger(YellowFoxD8ImportRequestRoute.class); + + private final YellowFoxD8ImportExecutionService executionService; + + public YellowFoxD8ImportRequestRoute(YellowFoxD8ImportExecutionService executionService) { + this.executionService = executionService; + } + + @Override + public void configure() { + from("direct:yellowfox-d8-import-start") + .routeId("yellowfox-d8-import-start-route") + .process(exchange -> { + YellowFoxD8ImportRequest request = exchange.getMessage().getBody(YellowFoxD8ImportRequest.class); + var result = executionService.startImport(request); + log.info("Prepared YellowFox D8 import run importRunId={} plannedPackages={} status={}", + result.importRunId(), result.plannedPackageCount(), result.status()); + exchange.getMessage().setBody(result); + }); + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/config/YellowFoxDataSourceConfig.java b/src/main/java/at/procon/eventhub/yellowfox/config/YellowFoxDataSourceConfig.java new file mode 100644 index 0000000..344893d --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/config/YellowFoxDataSourceConfig.java @@ -0,0 +1,96 @@ +package at.procon.eventhub.yellowfox.config; + +import javax.sql.DataSource; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.datasource.DriverManagerDataSource; + +@Configuration +@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.yellow-fox.datasource.jdbc-url:}')") +public class YellowFoxDataSourceConfig { + + @Bean + @ConfigurationProperties(prefix = "eventhub.yellow-fox.datasource") + public YellowFoxDataSourceProperties yellowFoxDataSourceProperties() { + return new YellowFoxDataSourceProperties(); + } + + @Bean(defaultCandidate = false) + public DataSource yellowFoxDataSource(YellowFoxDataSourceProperties config) { + DriverManagerDataSource dataSource = new DriverManagerDataSource(); + dataSource.setUrl(validateJdbcUrl(config)); + dataSource.setUsername(config.getUsername()); + dataSource.setPassword(config.getPassword()); + String driverClassName = trimToNull(config.getDriverClassName()); + if (driverClassName != null) { + dataSource.setDriverClassName(driverClassName); + } + return dataSource; + } + + @Bean + public NamedParameterJdbcTemplate yellowFoxNamedParameterJdbcTemplate( + @Qualifier("yellowFoxDataSource") DataSource yellowFoxDataSource + ) { + return new NamedParameterJdbcTemplate(yellowFoxDataSource); + } + + private String validateJdbcUrl(YellowFoxDataSourceProperties config) { + String jdbcUrl = trimToNull(config.getJdbcUrl()); + if (jdbcUrl == null) { + throw new IllegalStateException("eventhub.yellow-fox.datasource.jdbc-url must not be empty"); + } + return jdbcUrl; + } + + private String trimToNull(String value) { + if (value == null) { + return null; + } + String trimmedValue = value.trim(); + return trimmedValue.isEmpty() ? null : trimmedValue; + } + + public static class YellowFoxDataSourceProperties { + private String jdbcUrl; + private String username; + private String password; + private String driverClassName; + + public String getJdbcUrl() { + return jdbcUrl; + } + + public void setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getDriverClassName() { + return driverClassName; + } + + public void setDriverClassName(String driverClassName) { + this.driverClassName = driverClassName; + } + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/dto/ConfiguredYellowFoxD8ImportPlanDto.java b/src/main/java/at/procon/eventhub/yellowfox/dto/ConfiguredYellowFoxD8ImportPlanDto.java new file mode 100644 index 0000000..071a97f --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/dto/ConfiguredYellowFoxD8ImportPlanDto.java @@ -0,0 +1,30 @@ +package at.procon.eventhub.yellowfox.dto; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupRefDto; +import java.time.OffsetDateTime; +import java.util.Set; + +public record ConfiguredYellowFoxD8ImportPlanDto( + String planKey, + boolean enabled, + String cron, + String tenantKey, + EventSourceDto eventSource, + SourceGroupRefDto sourceGroup, + ImportScopeDto importScope, + Set eventFamilies, + ImportMode initialMode, + ImportMode scheduledMode, + AcquisitionStrategy initialStrategy, + AcquisitionStrategy scheduledStrategy, + boolean refreshMasterDataFirst, + OffsetDateTime initialOccurredFrom, + OffsetDateTime initialOccurredTo, + boolean runInitialOnStartup +) { +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8BookingDto.java b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8BookingDto.java index 29abf70..9c84354 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8BookingDto.java +++ b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8BookingDto.java @@ -11,8 +11,10 @@ public record YellowFoxD8BookingDto( String sourceInstanceKey, String tenantProviderSettingKey, String externalFleetKey, + String externalFleetName, String eventId, String key, + Integer ignition, Integer eventType, Integer state, OffsetDateTime occurredAt, diff --git a/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ExtractionBatchResultDto.java b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ExtractionBatchResultDto.java new file mode 100644 index 0000000..761da8d --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ExtractionBatchResultDto.java @@ -0,0 +1,27 @@ +package at.procon.eventhub.yellowfox.dto; + +import at.procon.eventhub.importing.ExtractionBatchResult; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.UUID; + +public record YellowFoxD8ExtractionBatchResultDto( + UUID packageId, + String extractionCode, + String sourceKind, + int sourceRowsRead, + int eventsMapped, + int eventsInserted, + int alreadyImported, + boolean executed, + OffsetDateTime lastSourcePackageImportedAt, + String lastSourcePackageId, + OffsetDateTime lastSourceRowUpdatedAt, + OffsetDateTime lastOccurredTo, + Map eventTypeCounts +) implements ExtractionBatchResult { + + public YellowFoxD8ExtractionBatchResultDto { + eventTypeCounts = eventTypeCounts == null ? Map.of() : Map.copyOf(eventTypeCounts); + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ImportRequest.java b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ImportRequest.java new file mode 100644 index 0000000..189c79c --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ImportRequest.java @@ -0,0 +1,48 @@ +package at.procon.eventhub.yellowfox.dto; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupRefDto; +import at.procon.eventhub.importing.ImportRunRequest; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import java.util.EnumSet; +import java.util.Set; + +public record YellowFoxD8ImportRequest( + @NotBlank String tenantKey, + @Valid @NotNull EventSourceDto eventSource, + @Valid SourceGroupRefDto sourceGroup, + @Valid @NotNull ImportScopeDto importScope, + Set eventFamilies, + ImportMode mode, + boolean refreshMasterDataFirst, + AcquisitionStrategy acquisitionStrategy +) implements ImportRunRequest { + public YellowFoxD8ImportRequest { + tenantKey = tenantKey == null ? null : tenantKey.trim(); + if (eventSource == null) { + eventSource = new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", null, null, null); + } + if (importScope == null) { + importScope = ImportScopeDto.tenantAll(null, null); + } + if (eventFamilies == null || eventFamilies.isEmpty()) { + eventFamilies = EnumSet.of(EventFamily.DRIVER_ACTIVITY, EventFamily.DRIVER_CARD); + } else { + eventFamilies = EnumSet.copyOf(eventFamilies); + } + if (mode == null) { + mode = ImportMode.INITIAL_BACKFILL; + } + if (acquisitionStrategy == null || acquisitionStrategy == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) { + acquisitionStrategy = mode == ImportMode.INCREMENTAL_UPDATE + ? AcquisitionStrategy.SOURCE_ROW_WATERMARK + : AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP; + } + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ImportRunResultDto.java b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ImportRunResultDto.java new file mode 100644 index 0000000..c8a16f2 --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ImportRunResultDto.java @@ -0,0 +1,15 @@ +package at.procon.eventhub.yellowfox.dto; + +import at.procon.eventhub.dto.ImportRunStatus; +import at.procon.eventhub.importing.ImportPlanDto; +import java.util.List; +import java.util.UUID; + +public record YellowFoxD8ImportRunResultDto( + UUID importRunId, + ImportRunStatus status, + int plannedPackageCount, + ImportPlanDto plan, + List plannedPackageIds +) { +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ImportTriggerResultDto.java b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ImportTriggerResultDto.java new file mode 100644 index 0000000..b0bb1c6 --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8ImportTriggerResultDto.java @@ -0,0 +1,16 @@ +package at.procon.eventhub.yellowfox.dto; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.SchedulerTriggerMode; +import java.time.OffsetDateTime; + +public record YellowFoxD8ImportTriggerResultDto( + String planKey, + ImportMode mode, + AcquisitionStrategy acquisitionStrategy, + SchedulerTriggerMode triggerMode, + OffsetDateTime triggeredAt, + YellowFoxD8ImportRunResultDto result +) { +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java new file mode 100644 index 0000000..2a7d5da --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java @@ -0,0 +1,238 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.ImportCursorStateDto; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupType; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.persistence.ImportCursorRepository; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ExtractionBatchResultDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.camel.ProducerTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.core.io.Resource; +import org.springframework.core.io.ResourceLoader; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Service; +import org.springframework.util.StreamUtils; + +@Service +@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.yellow-fox.datasource.jdbc-url:}')") +public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD8ExtractionBatchExecutor { + + private static final Logger log = LoggerFactory.getLogger(JdbcYellowFoxD8BookingExtractionBatchExecutor.class); + private static final String EXTRACTION_CODE = "YELLOWFOX_D8_BOOKING"; + private static final int PROGRESS_LOG_INTERVAL = 5000; + + private final NamedParameterJdbcTemplate jdbcTemplate; + private final ProducerTemplate producerTemplate; + private final ResourceLoader resourceLoader; + private final ImportCursorRepository importCursorRepository; + private final EventHubProperties properties; + private final YellowFoxD8BookingRowMapper rowMapper; + private final YellowFoxD8BookingEventMapper eventMapper; + private final YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector; + + public JdbcYellowFoxD8BookingExtractionBatchExecutor( + @Qualifier("yellowFoxNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate, + ProducerTemplate producerTemplate, + ResourceLoader resourceLoader, + ImportCursorRepository importCursorRepository, + EventHubProperties properties, + YellowFoxD8BookingRowMapper rowMapper, + YellowFoxD8BookingEventMapper eventMapper, + YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector + ) { + this.jdbcTemplate = jdbcTemplate; + this.producerTemplate = producerTemplate; + this.resourceLoader = resourceLoader; + this.importCursorRepository = importCursorRepository; + this.properties = properties; + this.rowMapper = rowMapper; + this.eventMapper = eventMapper; + this.ignitionTransitionDetector = ignitionTransitionDetector; + } + + @Override + public YellowFoxD8ExtractionBatchResultDto execute( + UUID importRunId, + UUID packageId, + int eventSourceId, + YellowFoxD8ImportRequest request, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk + ) { + if (!EXTRACTION_CODE.equals(planItem.extractionCode())) { + throw new IllegalArgumentException("Unsupported YellowFox extraction code " + planItem.extractionCode()); + } + + ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk); + ImportCursorStateDto cursor = findCursor(eventSourceId, request, planItem); + Map params = parameters(request, chunkScope, cursor); + Stats stats = new Stats(); + YellowFoxD8IgnitionTransitionDetector.Session ignitionSession = ignitionTransitionDetector + .newSession(properties.getYellowFox().isEmitInitialIgnitionSnapshot()); + + log.info("Reading YellowFox D8 bookings tenant={} importRunId={} packageId={} chunk={} occurredFrom={} occurredTo={} fleetId={} strategy={}", + request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), params.get("fleetId"), request.acquisitionStrategy()); + + jdbcTemplate.query(loadSql(), params, rs -> { + stats.sourceRowsRead++; + YellowFoxD8BookingDto booking = rowMapper.map( + rs, + request.tenantKey(), + request.eventSource().sourceInstanceKey(), + request.eventSource().tenantProviderSettingKey() + ); + stats.acceptSourceRow(booking); + if (!hasEventReference(booking)) { + stats.skippedRows++; + return; + } + EventHubEventDto primaryEvent = eventMapper.map(booking); + send(primaryEvent, stats); + EventHubEventDto ignitionEvent = ignitionSession.detect(booking); + if (ignitionEvent != null) { + send(ignitionEvent, stats); + } + if (stats.sourceRowsRead % PROGRESS_LOG_INTERVAL == 0) { + log.info("YellowFox D8 extraction progress tenant={} importRunId={} packageId={} rows={} events={} byType={}", + request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsSent, stats.eventTypeCounts); + } + }); + + log.info("Finished YellowFox D8 extraction tenant={} importRunId={} packageId={} rows={} events={} skippedRows={} byType={}", + request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsSent, stats.skippedRows, stats.eventTypeCounts); + + return new YellowFoxD8ExtractionBatchResultDto( + packageId, + planItem.extractionCode(), + planItem.sourceKind(), + stats.sourceRowsRead, + stats.eventsSent, + stats.eventsSent, + stats.skippedRows, + true, + null, + stats.lastEventId, + stats.lastOccurredAt, + stats.lastOccurredAt, + stats.eventTypeCounts + ); + } + + private ImportCursorStateDto findCursor(int eventSourceId, YellowFoxD8ImportRequest request, ImportPlanItemDto planItem) { + String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(); + return importCursorRepository.findCursor( + request.tenantKey(), + eventSourceId, + scopeHash, + planItem.eventFamily(), + planItem.sourceKind(), + request.acquisitionStrategy() + ); + } + + private Map parameters(YellowFoxD8ImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) { + Map params = new HashMap<>(); + params.put("occurredFrom", scope == null ? null : scope.occurredFrom()); + params.put("occurredTo", scope == null ? null : scope.occurredTo()); + params.put("fleetId", fleetId(request)); + if (request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_ROW_WATERMARK && cursor != null) { + OffsetDateTime lastOccurredTo = cursor.lastOccurredTo(); + String lastSourceRowId = cursor.lastSourcePackageId(); + if (lastOccurredTo != null && properties.getYellowFox().getOccurredAtOverlap() != null + && !properties.getYellowFox().getOccurredAtOverlap().isZero()) { + lastOccurredTo = lastOccurredTo.minus(properties.getYellowFox().getOccurredAtOverlap()); + lastSourceRowId = null; + } + params.put("lastOccurredTo", lastOccurredTo); + params.put("lastSourceRowId", lastSourceRowId); + } else { + params.put("lastOccurredTo", null); + params.put("lastSourceRowId", null); + } + return params; + } + + private Integer fleetId(YellowFoxD8ImportRequest request) { + if (request.sourceGroup() == null || request.sourceGroup().type() != SourceGroupType.FLEET) { + return null; + } + String value = request.sourceGroup().sourceEntityId() == null + ? request.sourceGroup().code() + : request.sourceGroup().sourceEntityId(); + if (value == null || value.isBlank()) { + return null; + } + try { + return Integer.parseInt(value.trim()); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException("YellowFox D8 fleet sourceEntityId/code must be numeric for JDBC import: " + value, ex); + } + } + + private ImportScopeDto chunkScope(ImportScopeDto scope, ImportTimeChunkDto chunk) { + if (scope == null) { + return ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo()); + } + return new ImportScopeDto(scope.type(), scope.rootSourceOrganisation(), scope.includeChildren(), chunk.occurredFrom(), chunk.occurredTo()); + } + + private boolean hasEventReference(YellowFoxD8BookingDto booking) { + return (booking.driverRef() != null && booking.driverRef().hasAnyReference()) + || (booking.vehicleRef() != null && booking.vehicleRef().hasAnyReference()); + } + + private void send(EventHubEventDto event, Stats stats) { + producerTemplate.sendBody("direct:eventhub-normalized-input", event); + stats.acceptEvent(event); + } + + private String loadSql() { + Resource resource = resourceLoader.getResource("classpath:sql/yellowfox/d8-bookings.sql"); + try (var in = resource.getInputStream()) { + return StreamUtils.copyToString(in, StandardCharsets.UTF_8); + } catch (IOException ex) { + throw new IllegalStateException("Failed to load YellowFox D8 extraction SQL", ex); + } + } + + private static class Stats { + private int sourceRowsRead; + private int eventsSent; + private int skippedRows; + private OffsetDateTime lastOccurredAt; + private String lastEventId; + private final Map eventTypeCounts = new LinkedHashMap<>(); + + private void acceptSourceRow(YellowFoxD8BookingDto booking) { + if (booking.occurredAt() != null) { + lastOccurredAt = booking.occurredAt(); + lastEventId = booking.eventId(); + } + } + + private void acceptEvent(EventHubEventDto event) { + eventsSent++; + EventType type = event.eventType(); + String key = event.eventDomain().name() + "/" + (type == null ? "UNKNOWN" : type.name()); + eventTypeCounts.merge(key, 1, Integer::sum); + } + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/NoopYellowFoxD8ExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/yellowfox/service/NoopYellowFoxD8ExtractionBatchExecutor.java new file mode 100644 index 0000000..ebfecbb --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/NoopYellowFoxD8ExtractionBatchExecutor.java @@ -0,0 +1,47 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.extraction.AbstractNoopExtractionBatchExecutor; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ExtractionBatchResultDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import java.util.UUID; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.stereotype.Service; + +@Service +@ConditionalOnMissingBean(YellowFoxD8ExtractionBatchExecutor.class) +@ConditionalOnExpression("!T(org.springframework.util.StringUtils).hasText('${eventhub.yellow-fox.datasource.jdbc-url:}')") +public class NoopYellowFoxD8ExtractionBatchExecutor + extends AbstractNoopExtractionBatchExecutor + implements YellowFoxD8ExtractionBatchExecutor { + + @Override + protected YellowFoxD8ExtractionBatchResultDto emptyResult( + UUID packageId, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk + ) { + return new YellowFoxD8ExtractionBatchResultDto( + packageId, + planItem.extractionCode(), + planItem.sourceKind(), + 0, + 0, + 0, + 0, + false, + null, + null, + null, + null, + java.util.Map.of() + ); + } + + @Override + protected String providerName() { + return "yellowfox-d8"; + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingEventMapper.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingEventMapper.java index 70a7e14..647a339 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingEventMapper.java +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingEventMapper.java @@ -2,7 +2,8 @@ package at.procon.eventhub.yellowfox.service; import at.procon.eventhub.dto.CardSlot; import at.procon.eventhub.dto.CardStatus; -import at.procon.eventhub.dto.DrivingStatus; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDetailsDto; import at.procon.eventhub.dto.EventDomain; import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubPackageRequest; @@ -13,6 +14,7 @@ import at.procon.eventhub.dto.GeoPointDto; import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.SourceGroupRefDto; import at.procon.eventhub.dto.SourceGroupType; +import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.service.EventDetailsFactory; import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; import java.time.LocalDate; @@ -25,6 +27,8 @@ import org.springframework.stereotype.Component; @Component public class YellowFoxD8BookingEventMapper { + public static final String DETAIL_SOURCE = "YELLOWFOX_D8"; + private final EventDetailsFactory detailsFactory; public YellowFoxD8BookingEventMapper(EventDetailsFactory detailsFactory) { @@ -33,30 +37,42 @@ public class YellowFoxD8BookingEventMapper { public EventHubEventDto map(YellowFoxD8BookingDto source) { NormalizedEvent normalized = normalize(source.eventType(), source.state()); - Map payload = new LinkedHashMap<>(); - if (source.payload() != null) { - payload.putAll(source.payload()); - } - payload.put("provider", "YELLOWFOX"); - payload.put("sourceKind", "TELEMATICS_PLATFORM"); - payload.put("yellowFoxEventType", source.eventType()); - payload.put("yellowFoxState", source.state()); - payload.put("yellowFoxKey", source.key()); + return event(source, sourceEventId(source, "D8_BOOKING"), normalized, detailsFor(source, normalized), false); + } - EventSourceDto eventSource = new EventSourceDto( - "YELLOWFOX", - "TELEMATICS_PLATFORM", - "YELLOWFOX_D8", - source.sourceInstanceKey(), - source.tenantProviderSettingKey(), - source.externalFleetKey() + public EventHubEventDto mapIgnitionTransition(YellowFoxD8BookingDto source, Integer previousIgnitionState) { + if (source.ignition() == null) { + return null; + } + boolean ignitionOn = source.ignition() == 1; + NormalizedEvent normalized = new NormalizedEvent( + EventDomain.IGNITION, + ignitionOn ? EventType.IGNITION_ON : EventType.IGNITION_OFF, + ignitionOn ? EventLifecycle.ON : EventLifecycle.OFF, + null, + null ); + Map attributes = baseAttributes(source, normalized); + attributes.put("previousIgnitionState", previousIgnitionState); + attributes.put("previousIgnitionMeaning", ignitionMeaning(previousIgnitionState)); + attributes.put("derivedFrom", "YELLOWFOX_D8_BOOKING"); + attributes.put("sourceEventId", sourceEventId(source, "D8_BOOKING")); + EventDetailsDto details = new EventDetailsDto("IGNITION", detailsFactory.payloadFromMap(attributes)); + return event(source, sourceEventId(source, "D8_IGNITION") + ":" + (ignitionOn ? "ON" : "OFF"), normalized, details, true); + } + + private EventHubEventDto event( + YellowFoxD8BookingDto source, + String externalSourceEventId, + NormalizedEvent normalized, + EventDetailsDto details, + boolean derived + ) { + EventSourceDto eventSource = eventSource(source); LocalDate businessDate = source.occurredAt().toLocalDate(); var occurredFrom = businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(); var occurredTo = businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(); - SourceGroupRefDto sourceGroup = source.externalFleetKey() == null || source.externalFleetKey().isBlank() - ? null - : new SourceGroupRefDto(SourceGroupType.FLEET, source.externalFleetKey(), source.externalFleetKey(), null); + SourceGroupRefDto sourceGroup = sourceGroup(source); EventHubPackageRequest packageInfo = new EventHubPackageRequest( tenantOrDefault(source.tenantKey()), eventSource, @@ -69,8 +85,8 @@ public class YellowFoxD8BookingEventMapper { return new EventHubEventDto( UUID.randomUUID(), - source.eventId(), - source.driverRef(), + externalSourceEventId, + derived && normalized.domain() == EventDomain.IGNITION ? null : source.driverRef(), source.vehicleRef(), source.occurredAt(), source.receivedPartnerAt(), @@ -79,60 +95,170 @@ public class YellowFoxD8BookingEventMapper { normalized.type(), normalized.lifecycle(), source.odometerM(), - new GeoPointDto(source.latitude(), source.longitude()), - detailsFor(normalized), + point(source), + details, null, - detailsFactory.payloadFromMap(payload), + detailsFactory.payloadFromMap(payload(source, derived)), false, packageInfo ); } + public EventSourceDto eventSource(YellowFoxD8BookingDto source) { + return new EventSourceDto( + "YELLOWFOX", + "TELEMATICS_PLATFORM", + "YELLOWFOX_D8", + source.sourceInstanceKey(), + source.tenantProviderSettingKey(), + source.externalFleetKey() + ); + } + + private SourceGroupRefDto sourceGroup(YellowFoxD8BookingDto source) { + if (source.externalFleetKey() == null || source.externalFleetKey().isBlank()) { + return null; + } + return new SourceGroupRefDto( + SourceGroupType.FLEET, + source.externalFleetKey(), + source.externalFleetKey(), + source.externalFleetName() == null ? source.externalFleetKey() : source.externalFleetName() + ); + } + + private GeoPointDto point(YellowFoxD8BookingDto source) { + if (source.latitude() == null && source.longitude() == null) { + return null; + } + return new GeoPointDto(source.latitude(), source.longitude()); + } + private String tenantOrDefault(String value) { return value == null || value.isBlank() ? "default" : value.trim(); } - private at.procon.eventhub.dto.EventDetailsDto detailsFor(NormalizedEvent normalized) { - if (normalized.domain() == EventDomain.DRIVER_ACTIVITY) { - return detailsFactory.driverActivity(CardSlot.DRIVER, CardStatus.INSERTED, DrivingStatus.UNKNOWN); + private String sourceEventId(YellowFoxD8BookingDto source, String prefix) { + String eventId = source.eventId() == null || source.eventId().isBlank() ? "UNKNOWN_EVENT" : source.eventId().trim(); + return prefix + ":" + eventId + ":" + source.occurredAt(); + } + + private Map payload(YellowFoxD8BookingDto source, boolean derived) { + Map payload = new LinkedHashMap<>(); + if (source.payload() != null) { + payload.putAll(source.payload()); } - if (normalized.domain() == EventDomain.DRIVER_CARD) { - CardStatus status = normalized.lifecycle() == EventLifecycle.INSERT ? CardStatus.INSERTED : CardStatus.NOT_INSERTED; - return detailsFactory.driverCard(CardSlot.DRIVER, status); - } - return null; + payload.put("provider", "YELLOWFOX"); + payload.put("sourceKind", "TELEMATICS_PLATFORM"); + payload.put("sourceKey", "YELLOWFOX_D8"); + payload.put("derived", derived); + payload.put("yellowFoxEventType", source.eventType()); + payload.put("yellowFoxState", source.state()); + payload.put("yellowFoxIgnition", source.ignition()); + payload.put("yellowFoxKey", source.key()); + return payload; + } + + private EventDetailsDto detailsFor(YellowFoxD8BookingDto source, NormalizedEvent normalized) { + String type = switch (normalized.domain()) { + case DRIVER_ACTIVITY -> "DRIVER_ACTIVITY"; + case DRIVER_CARD -> "DRIVER_CARD"; + default -> "YELLOWFOX_D8_BOOKING"; + }; + return new EventDetailsDto(type, detailsFactory.payloadFromMap(baseAttributes(source, normalized))); + } + + private Map baseAttributes(YellowFoxD8BookingDto source, NormalizedEvent normalized) { + Map attributes = new LinkedHashMap<>(); + attributes.put("source", DETAIL_SOURCE); + attributes.put("yellowFoxEventType", source.eventType()); + attributes.put("yellowFoxState", source.state()); + attributes.put("yellowFoxStateMeaning", stateMeaning(source.eventType(), source.state())); + attributes.put("yellowFoxKey", source.key()); + attributes.put("ignitionState", source.ignition()); + attributes.put("ignitionMeaning", ignitionMeaning(source.ignition())); + attributes.put("odometer", source.odometerM()); + attributes.put("slot", normalized.slot() == null ? null : normalized.slot().name()); + attributes.put("slotNumber", normalized.slotNumber()); + attributes.put("cardSlot", normalized.slot() == null ? null : normalized.slot().name()); + attributes.put("eventDomain", normalized.domain().name()); + attributes.put("eventType", normalized.type().name()); + attributes.put("lifecycle", normalized.lifecycle().name()); + removeNullValues(attributes); + return attributes; + } + + private void removeNullValues(Map attributes) { + attributes.entrySet().removeIf(entry -> entry.getValue() == null); } private NormalizedEvent normalize(Integer eventType, Integer state) { if (eventType == null || state == null) { - return new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT); + return new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT, null, null); } - return switch (eventType) { - case 0, 1 -> normalizeCardActivity(state); - case 2, 3 -> normalizeDriverActivity(state); - default -> new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT); + case 0 -> normalizeCardActivity(CardSlot.DRIVER, 1, state); + case 1 -> normalizeCardActivity(CardSlot.CO_DRIVER, 2, state); + case 2 -> normalizeDriverActivity(CardSlot.DRIVER, 1, state); + case 3 -> normalizeDriverActivity(CardSlot.CO_DRIVER, 2, state); + default -> new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT, null, null); }; } - private NormalizedEvent normalizeCardActivity(Integer state) { + private NormalizedEvent normalizeCardActivity(CardSlot slot, int slotNumber, Integer state) { return switch (state) { - case 0 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW); - case 1 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT); - default -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT); + case 0 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW, slot, slotNumber); + case 1 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT, slot, slotNumber); + default -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT, slot, slotNumber); }; } - private NormalizedEvent normalizeDriverActivity(Integer state) { + private NormalizedEvent normalizeDriverActivity(CardSlot slot, int slotNumber, Integer state) { return switch (state) { - case 0 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.BREAK_REST, EventLifecycle.SNAPSHOT); - case 1 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.AVAILABILITY, EventLifecycle.SNAPSHOT); - case 2 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.WORK, EventLifecycle.SNAPSHOT); - case 3 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.SNAPSHOT); - default -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.UNKNOWN_ACTIVITY, EventLifecycle.SNAPSHOT); + case 0 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.BREAK_REST, EventLifecycle.START, slot, slotNumber); + case 1 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.AVAILABILITY, EventLifecycle.START, slot, slotNumber); + case 2 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.WORK, EventLifecycle.START, slot, slotNumber); + case 3 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.START, slot, slotNumber); + default -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.UNKNOWN_ACTIVITY, EventLifecycle.SNAPSHOT, slot, slotNumber); }; } - private record NormalizedEvent(EventDomain domain, EventType type, EventLifecycle lifecycle) { + private String stateMeaning(Integer eventType, Integer state) { + if (eventType == null || state == null) { + return null; + } + if (eventType == 0 || eventType == 1) { + return switch (state) { + case 0 -> "CARD_REMOVED"; + case 1 -> "CARD_INSERTED"; + default -> "UNKNOWN_CARD_STATE"; + }; + } + if (eventType == 2 || eventType == 3) { + return switch (state) { + case 0 -> "PAUSE_OR_REST"; + case 1 -> "STANDBY_TIME"; + case 2 -> "WORK_TIME"; + case 3 -> "STEERING_TIME"; + default -> "UNKNOWN_ACTIVITY_STATE"; + }; + } + return "UNKNOWN_EVENT_TYPE"; + } + + private String ignitionMeaning(Integer ignition) { + if (ignition == null) { + return null; + } + return ignition == 1 ? "ON" : ignition == 0 ? "OFF" : "UNKNOWN"; + } + + private record NormalizedEvent( + EventDomain domain, + EventType type, + EventLifecycle lifecycle, + CardSlot slot, + Integer slotNumber + ) { } } diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java new file mode 100644 index 0000000..6fa62b2 --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java @@ -0,0 +1,117 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.math.BigDecimal; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.util.LinkedHashMap; +import java.util.Map; +import org.springframework.stereotype.Component; + +@Component +public class YellowFoxD8BookingRowMapper { + + private static final TypeReference> MAP_TYPE = new TypeReference<>() { + }; + + private final ObjectMapper objectMapper; + + public YellowFoxD8BookingRowMapper(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + public YellowFoxD8BookingDto map(ResultSet rs, String tenantKey, String sourceInstanceKey, String tenantProviderSettingKey) throws SQLException { + String eventId = rs.getString("eventid"); + OffsetDateTime occurredAt = rs.getObject("utc", OffsetDateTime.class); + Integer vehicleId = getInteger(rs, "vehicle_id"); + Integer driverId = getInteger(rs, "driver_id"); + String vehicleVrn = rs.getString("vehicle_vrn"); + String vehicleVin = rs.getString("vehicle_vin"); + String driverCard = rs.getString("driver_card_number"); + Integer fleetId = getInteger(rs, "fleet_id"); + String fleetName = rs.getString("fleet_name"); + Integer odometer = getInteger(rs, "odometer"); + + Map payload = payload(rs.getString("payload")); + put(payload, "yellowFoxEventId", eventId); + put(payload, "yellowFoxOdometerRaw", odometer); + put(payload, "vehicleVrn", vehicleVrn); + put(payload, "vehicleVin", vehicleVin); + put(payload, "driverCardNumber", driverCard); + put(payload, "driverFirstName", rs.getString("driver_firstname")); + put(payload, "driverLastName", rs.getString("driver_lastname")); + put(payload, "fleetId", fleetId); + put(payload, "fleetName", fleetName); + put(payload, "telematicProviderId", getInteger(rs, "telematic_provider_id")); + put(payload, "telematicProviderName", rs.getString("telematic_provider_name")); + + DriverRefDto driverRef = driverId == null && isBlank(driverCard) + ? null + : new DriverRefDto(driverId == null ? null : driverId.toString(), new DriverCardRefDto(null, driverCard)); + VehicleRefDto vehicleRef = vehicleId == null && isBlank(vehicleVin) && isBlank(vehicleVrn) + ? null + : new VehicleRefDto( + vehicleId == null ? null : vehicleId.toString(), + vehicleVin, + vehicleId == null ? null : vehicleId.toString(), + new VehicleRegistrationRefDto(null, vehicleVrn) + ); + + return new YellowFoxD8BookingDto( + tenantKey, + sourceInstanceKey, + tenantProviderSettingKey, + fleetId == null ? null : fleetId.toString(), + fleetName, + eventId, + rs.getString("key"), + getInteger(rs, "ignition"), + getInteger(rs, "eventtype"), + getInteger(rs, "state"), + occurredAt, + null, + driverRef, + vehicleRef, + odometer == null ? null : odometer.longValue() * 1000L, + rs.getBigDecimal("latitude"), + rs.getBigDecimal("longitude"), + payload + ); + } + + private Integer getInteger(ResultSet rs, String column) throws SQLException { + int value = rs.getInt(column); + return rs.wasNull() ? null : value; + } + + private Map payload(String json) { + if (json == null || json.isBlank()) { + return new LinkedHashMap<>(); + } + try { + return new LinkedHashMap<>(objectMapper.readValue(json, MAP_TYPE)); + } catch (Exception ex) { + Map fallback = new LinkedHashMap<>(); + fallback.put("rawPayload", json); + fallback.put("payloadParseError", ex.getMessage()); + return fallback; + } + } + + private void put(Map target, String key, Object value) { + if (value != null) { + target.put(key, value instanceof BigDecimal bd ? bd.stripTrailingZeros().toPlainString() : value); + } + } + + private boolean isBlank(String value) { + return value == null || value.isBlank(); + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanService.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanService.java new file mode 100644 index 0000000..502de8f --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanService.java @@ -0,0 +1,62 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.importing.AbstractConfiguredImportPlanService; +import at.procon.eventhub.importing.ConfiguredImportPlanDto; +import at.procon.eventhub.yellowfox.dto.ConfiguredYellowFoxD8ImportPlanDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import org.springframework.stereotype.Service; + +@Service +public class YellowFoxD8ConfiguredImportPlanService + extends AbstractConfiguredImportPlanService { + + public YellowFoxD8ConfiguredImportPlanService(EventHubProperties properties) { + super(() -> properties.getYellowFox().getImportPlans(), "yellowfox-d8"); + } + + @Override + protected YellowFoxD8ImportRequest buildRequest( + EventHubProperties.ConfiguredImportPlan plan, + ImportMode mode, + AcquisitionStrategy strategy, + ImportScopeDto scope + ) { + return new YellowFoxD8ImportRequest( + plan.getTenantKey(), + plan.getEventSource(), + plan.getSourceGroup(), + scope, + plan.getEventFamilies(), + mode, + false, + strategy + ); + } + + @Override + protected ConfiguredYellowFoxD8ImportPlanDto toDto(EventHubProperties.ConfiguredImportPlan plan) { + ConfiguredImportPlanDto dto = genericDto(plan); + return new ConfiguredYellowFoxD8ImportPlanDto( + dto.planKey(), + dto.enabled(), + dto.cron(), + dto.tenantKey(), + dto.eventSource(), + dto.sourceGroup(), + dto.importScope(), + dto.eventFamilies(), + dto.initialMode(), + dto.scheduledMode(), + dto.initialStrategy(), + dto.scheduledStrategy(), + false, + dto.initialOccurredFrom(), + dto.initialOccurredTo(), + dto.runInitialOnStartup() + ); + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ExtractionBatchExecutor.java new file mode 100644 index 0000000..a8fe774 --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ExtractionBatchExecutor.java @@ -0,0 +1,22 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.extraction.ExtractionBatchExecutor; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ExtractionBatchResultDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import java.util.UUID; + +public interface YellowFoxD8ExtractionBatchExecutor + extends ExtractionBatchExecutor { + + @Override + YellowFoxD8ExtractionBatchResultDto execute( + UUID importRunId, + UUID packageId, + int eventSourceId, + YellowFoxD8ImportRequest request, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk + ); +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetector.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetector.java new file mode 100644 index 0000000..5eb536d --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetector.java @@ -0,0 +1,47 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; +import java.util.HashMap; +import java.util.Map; +import org.springframework.stereotype.Component; + +@Component +public class YellowFoxD8IgnitionTransitionDetector { + + private final YellowFoxD8BookingEventMapper mapper; + + public YellowFoxD8IgnitionTransitionDetector(YellowFoxD8BookingEventMapper mapper) { + this.mapper = mapper; + } + + public Session newSession(boolean emitInitialSnapshot) { + return new Session(mapper, emitInitialSnapshot); + } + + public static class Session { + private final YellowFoxD8BookingEventMapper mapper; + private final boolean emitInitialSnapshot; + private final Map lastIgnitionByVehicle = new HashMap<>(); + + private Session(YellowFoxD8BookingEventMapper mapper, boolean emitInitialSnapshot) { + this.mapper = mapper; + this.emitInitialSnapshot = emitInitialSnapshot; + } + + public EventHubEventDto detect(YellowFoxD8BookingDto booking) { + if (booking == null || booking.ignition() == null || booking.vehicleRef() == null || !booking.vehicleRef().hasAnyReference()) { + return null; + } + String vehicleKey = booking.vehicleRef().stableKey(); + Integer previous = lastIgnitionByVehicle.put(vehicleKey, booking.ignition()); + if (previous == null) { + return emitInitialSnapshot ? mapper.mapIgnitionTransition(booking, null) : null; + } + if (!previous.equals(booking.ignition())) { + return mapper.mapIgnitionTransition(booking, previous); + } + return null; + } + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportPlanService.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportPlanService.java new file mode 100644 index 0000000..ed18e60 --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportPlanService.java @@ -0,0 +1,52 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.importing.ImportChunkPlanner; +import at.procon.eventhub.importing.ImportPlanDto; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import java.util.List; +import org.springframework.stereotype.Service; + +@Service +public class YellowFoxD8ImportPlanService { + + public static final String SOURCE_KIND = "TELEMATICS_PLATFORM"; + public static final String EXTRACTION_CODE = "YELLOWFOX_D8_BOOKING"; + + private final EventHubProperties properties; + private final ImportChunkPlanner chunkPlanner; + + public YellowFoxD8ImportPlanService(EventHubProperties properties, ImportChunkPlanner chunkPlanner) { + this.properties = properties; + this.chunkPlanner = chunkPlanner; + } + + public ImportPlanDto createPlan(YellowFoxD8ImportRequest request) { + return new ImportPlanDto( + request.tenantKey(), + request.mode(), + request.acquisitionStrategy(), + request.refreshMasterDataFirst(), + request.importScope(), + request.sourceGroup(), + request.eventSource(), + chunkPlanner.chunksFor(request, properties.getYellowFox().getDefaultChunkDays()), + List.of(item(request.acquisitionStrategy())) + ); + } + + private ImportPlanItemDto item(AcquisitionStrategy strategy) { + return new ImportPlanItemDto( + EventFamily.DRIVER_ACTIVITY, + SOURCE_KIND, + EXTRACTION_CODE, + List.of("data.d8_booking", "data.vehicle", "data.driver", "data.fleet", "data.telematic_provider"), + "VEHICLE", + "YellowFox D8 booking card/activity events with derived ignition transitions", + strategy + ); + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportScheduler.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportScheduler.java new file mode 100644 index 0000000..dcccaff --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportScheduler.java @@ -0,0 +1,79 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.SchedulerTriggerMode; +import at.procon.eventhub.importing.AbstractImportScheduler; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import java.util.List; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +@Service +public class YellowFoxD8ImportScheduler extends AbstractImportScheduler { + + private final EventHubProperties properties; + private final YellowFoxD8ConfiguredImportPlanService configuredPlanService; + private final YellowFoxD8ImportExecutionService executionService; + + public YellowFoxD8ImportScheduler( + EventHubProperties properties, + YellowFoxD8ConfiguredImportPlanService configuredPlanService, + YellowFoxD8ImportExecutionService executionService + ) { + this.properties = properties; + this.configuredPlanService = configuredPlanService; + this.executionService = executionService; + } + + @EventListener(ApplicationReadyEvent.class) + public void triggerInitialPlansOnStartup() { + super.triggerInitialPlansOnStartup(); + } + + @Scheduled(fixedDelayString = "${eventhub.yellow-fox.scheduler-poll-interval-ms:60000}") + public void pollConfiguredPlans() { + super.pollConfiguredPlans(); + } + + @Override + protected boolean schedulerEnabled() { + return properties.getYellowFox().isSchedulerEnabled(); + } + + @Override + protected List configuredPlans() { + return properties.getYellowFox().getImportPlans(); + } + + @Override + protected SchedulerTriggerMode schedulerTriggerMode() { + return properties.getYellowFox().getSchedulerTriggerMode(); + } + + @Override + protected YellowFoxD8ImportRequest createInitialRequest(EventHubProperties.ConfiguredImportPlan plan) { + return configuredPlanService.createInitialRequest(plan); + } + + @Override + protected YellowFoxD8ImportRequest createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan) { + return configuredPlanService.createScheduledRequest(plan); + } + + @Override + protected void startImport(YellowFoxD8ImportRequest request) { + executionService.startImport(request); + } + + @Override + protected void startAndExecuteImport(YellowFoxD8ImportRequest request) { + executionService.startAndExecuteImport(request); + } + + @Override + protected String providerName() { + return "yellowfox-d8"; + } +} diff --git a/src/main/resources/db/migration/V12__add_yellowfox_d8_detail_indexes.sql b/src/main/resources/db/migration/V12__add_yellowfox_d8_detail_indexes.sql new file mode 100644 index 0000000..0942ad2 --- /dev/null +++ b/src/main/resources/db/migration/V12__add_yellowfox_d8_detail_indexes.sql @@ -0,0 +1,15 @@ +create index if not exists idx_event_detail_yellowfox_slot + on eventhub.event_detail(detail_type, (attributes ->> 'slot'), event_occurred_at) + where detail_type in ('DRIVER_ACTIVITY', 'DRIVER_CARD'); + +create index if not exists idx_event_detail_yellowfox_eventtype_state + on eventhub.event_detail( + (attributes ->> 'yellowFoxEventType'), + (attributes ->> 'yellowFoxState'), + event_occurred_at + ) + where attributes ? 'yellowFoxEventType'; + +create index if not exists idx_event_detail_yellowfox_ignition + on eventhub.event_detail(detail_type, (attributes ->> 'ignitionState'), event_occurred_at) + where attributes ? 'ignitionState'; diff --git a/src/main/resources/sql/yellowfox/README.md b/src/main/resources/sql/yellowfox/README.md new file mode 100644 index 0000000..2823773 --- /dev/null +++ b/src/main/resources/sql/yellowfox/README.md @@ -0,0 +1,10 @@ +# YellowFox D8 SQL + +Recommended source-side index for incremental import: + +```sql +create index if not exists d8_booking_utc_eventid_idx +on data.d8_booking (utc asc, eventid asc); +``` + +The import uses SOURCE_ROW_WATERMARK with `(utc, eventid)`. diff --git a/src/main/resources/sql/yellowfox/d8-bookings.sql b/src/main/resources/sql/yellowfox/d8-bookings.sql new file mode 100644 index 0000000..7700cbd --- /dev/null +++ b/src/main/resources/sql/yellowfox/d8-bookings.sql @@ -0,0 +1,49 @@ +select + b.eventid, + b.utc, + b.vehicle_id, + b.driver_id, + b.key, + b.ignition, + b.eventtype, + b.state, + b.odometer, + st_y(b.geom) as latitude, + st_x(b.geom) as longitude, + b.payload, + + v.vrn as vehicle_vrn, + v.vin as vehicle_vin, + v.fleet_id as vehicle_fleet_id, + + d.firstname as driver_firstname, + d.name as driver_lastname, + d.drivers_card as driver_card_number, + d.fleet_id as driver_fleet_id, + + f.id as fleet_id, + f.name as fleet_name, + + tp.id as telematic_provider_id, + tp.name as telematic_provider_name +from data.d8_booking b +left join data.vehicle v + on v.id = b.vehicle_id +left join data.driver d + on d.id = b.driver_id +left join data.fleet f + on f.id = coalesce(v.fleet_id, d.fleet_id) +left join data.telematic_provider tp + on tp.id = v.telematic_provider_id +where (:occurredFrom is null or b.utc >= :occurredFrom) + and (:occurredTo is null or b.utc < :occurredTo) + and (:fleetId is null or f.id = :fleetId) + and ( + :lastOccurredTo is null + or b.utc > :lastOccurredTo + or ( + b.utc = :lastOccurredTo + and (:lastSourceRowId is null or b.eventid > :lastSourceRowId) + ) + ) +order by b.utc asc, b.eventid asc; diff --git a/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java b/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java index 099ab4a..5af598f 100644 --- a/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java +++ b/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java @@ -1,5 +1,6 @@ package at.procon.eventhub; +import at.procon.eventhub.dto.CardSlot; import at.procon.eventhub.dto.DriverCardRefDto; import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.EventDomain; @@ -7,8 +8,8 @@ import at.procon.eventhub.dto.EventLifecycle; import at.procon.eventhub.dto.EventType; import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto; -import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; import at.procon.eventhub.service.EventDetailsFactory; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; import at.procon.eventhub.yellowfox.service.YellowFoxD8BookingEventMapper; import com.fasterxml.jackson.databind.ObjectMapper; import java.time.OffsetDateTime; @@ -23,16 +24,67 @@ class YellowFoxD8BookingEventMapperTest { ); @Test - void mapsYellowFoxDrivingStateAsSnapshotWithEventSourceAndDetails() { - YellowFoxD8BookingDto source = new YellowFoxD8BookingDto( + void mapsYellowFoxDriverSlotActivityAsStartWithSlotDetails() { + var source = booking(2, 3, 1); + + var result = mapper.map(source); + + assertThat(result.eventDomain()).isEqualTo(EventDomain.DRIVER_ACTIVITY); + assertThat(result.eventType()).isEqualTo(EventType.DRIVE); + assertThat(result.lifecycle()).isEqualTo(EventLifecycle.START); + assertThat(result.externalSourceEventId()).startsWith("D8_BOOKING:event-1:"); + assertThat(result.eventDetails().type()).isEqualTo("DRIVER_ACTIVITY"); + assertThat(result.eventDetails().attributes().get("slot").asText()).isEqualTo(CardSlot.DRIVER.name()); + assertThat(result.eventDetails().attributes().get("slotNumber").asInt()).isEqualTo(1); + assertThat(result.eventDetails().attributes().get("yellowFoxStateMeaning").asText()).isEqualTo("STEERING_TIME"); + assertThat(result.eventDetails().attributes().get("ignitionState").asInt()).isEqualTo(1); + } + + @Test + void mapsYellowFoxCoDriverSlotActivityAsStartWithSlotDetails() { + var result = mapper.map(booking(3, 2, 0)); + + assertThat(result.eventDomain()).isEqualTo(EventDomain.DRIVER_ACTIVITY); + assertThat(result.eventType()).isEqualTo(EventType.WORK); + assertThat(result.lifecycle()).isEqualTo(EventLifecycle.START); + assertThat(result.eventDetails().attributes().get("slot").asText()).isEqualTo(CardSlot.CO_DRIVER.name()); + assertThat(result.eventDetails().attributes().get("slotNumber").asInt()).isEqualTo(2); + } + + @Test + void mapsCardInsertedUsingSameTachographDomainTypeAndLifecycle() { + var result = mapper.map(booking(0, 1, 1)); + + assertThat(result.eventDomain()).isEqualTo(EventDomain.DRIVER_CARD); + assertThat(result.eventType()).isEqualTo(EventType.CARD_INSERTED); + assertThat(result.lifecycle()).isEqualTo(EventLifecycle.INSERT); + assertThat(result.eventDetails().attributes().get("slot").asText()).isEqualTo(CardSlot.DRIVER.name()); + } + + @Test + void mapsIgnitionTransitionOnlyWhenDetectorRequestsIt() { + var result = mapper.mapIgnitionTransition(booking(2, 3, 1), 0); + + assertThat(result.eventDomain()).isEqualTo(EventDomain.IGNITION); + assertThat(result.eventType()).isEqualTo(EventType.IGNITION_ON); + assertThat(result.lifecycle()).isEqualTo(EventLifecycle.ON); + assertThat(result.externalSourceEventId()).contains("D8_IGNITION:event-1:"); + assertThat(result.eventDetails().attributes().get("previousIgnitionState").asInt()).isEqualTo(0); + assertThat(result.eventDetails().attributes().get("ignitionMeaning").asText()).isEqualTo("ON"); + } + + private YellowFoxD8BookingDto booking(int eventType, int state, int ignition) { + return new YellowFoxD8BookingDto( "tenant-1", "yellowfox-tenant-7", "tenant-yellowfox-7", "7", + "Fleet 7", "event-1", "key-1", - 2, - 3, + ignition, + eventType, + state, OffsetDateTime.parse("2026-04-29T08:15:00+02:00"), null, new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")), @@ -42,20 +94,5 @@ class YellowFoxD8BookingEventMapperTest { null, null ); - - var result = mapper.map(source); - - assertThat(result.eventDomain()).isEqualTo(EventDomain.DRIVER_ACTIVITY); - assertThat(result.eventType()).isEqualTo(EventType.DRIVE); - assertThat(result.lifecycle()).isEqualTo(EventLifecycle.SNAPSHOT); - assertThat(result.externalSourceEventId()).isEqualTo("event-1"); - assertThat(result.packageInfo().tenantKey()).isEqualTo("tenant-1"); - assertThat(result.packageInfo().eventSource().providerKey()).isEqualTo("YELLOWFOX"); - assertThat(result.packageInfo().eventSource().sourceKey()).isEqualTo("YELLOWFOX_D8"); - assertThat(result.driverRef().driverCard().nation()).isEqualTo("AT"); - assertThat(result.vehicleRef().vehicleRegistration().nation()).isEqualTo("AT"); - assertThat(result.eventDetails().type()).isEqualTo("DRIVER_ACTIVITY"); - assertThat(result.payload().get("yellowFoxEventType").asInt()).isEqualTo(2); - assertThat(result.payload().get("yellowFoxState").asInt()).isEqualTo(3); } }