Add YellowFox D8 ingestion pipeline

This commit is contained in:
trifonovt 2026-05-05 12:54:49 +02:00
parent 32e9535bff
commit 900bfa5918
23 changed files with 1354 additions and 71 deletions

View File

@ -0,0 +1,93 @@
package at.procon.eventhub.yellowfox.api;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.SchedulerTriggerMode;
import at.procon.eventhub.yellowfox.dto.ConfiguredYellowFoxD8ImportPlanDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRunResultDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportTriggerResultDto;
import at.procon.eventhub.yellowfox.service.YellowFoxD8ConfiguredImportPlanService;
import at.procon.eventhub.yellowfox.service.YellowFoxD8ImportExecutionService;
import at.procon.eventhub.yellowfox.service.YellowFoxD8ImportPlanService;
import jakarta.validation.Valid;
import java.time.OffsetDateTime;
import java.util.List;
import org.apache.camel.ProducerTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/eventhub/acquisition/yellowfox/d8")
public class YellowFoxD8ImportController {
private final ProducerTemplate producerTemplate;
private final YellowFoxD8ImportPlanService importPlanService;
private final YellowFoxD8ConfiguredImportPlanService configuredImportPlanService;
private final YellowFoxD8ImportExecutionService importExecutionService;
public YellowFoxD8ImportController(
ProducerTemplate producerTemplate,
YellowFoxD8ImportPlanService importPlanService,
YellowFoxD8ConfiguredImportPlanService configuredImportPlanService,
YellowFoxD8ImportExecutionService importExecutionService
) {
this.producerTemplate = producerTemplate;
this.importPlanService = importPlanService;
this.configuredImportPlanService = configuredImportPlanService;
this.importExecutionService = importExecutionService;
}
@PostMapping("/imports/plan")
public ResponseEntity<?> planYellowFoxD8Import(@Valid @RequestBody YellowFoxD8ImportRequest request) {
return ResponseEntity.ok(importPlanService.createPlan(request));
}
@PostMapping("/imports/start")
public ResponseEntity<YellowFoxD8ImportRunResultDto> startYellowFoxD8Import(
@Valid @RequestBody YellowFoxD8ImportRequest request,
@RequestParam(defaultValue = "false") boolean execute
) {
YellowFoxD8ImportRunResultDto result = execute
? importExecutionService.startAndExecuteImport(request)
: producerTemplate.requestBody("direct:yellowfox-d8-import-start", request, YellowFoxD8ImportRunResultDto.class);
return ResponseEntity.accepted().body(result);
}
@GetMapping("/imports/configured-plans")
public ResponseEntity<List<ConfiguredYellowFoxD8ImportPlanDto>> listConfiguredYellowFoxPlans() {
return ResponseEntity.ok(configuredImportPlanService.listPlans());
}
@GetMapping("/imports/configured-plans/{planKey}")
public ResponseEntity<ConfiguredYellowFoxD8ImportPlanDto> getConfiguredYellowFoxPlan(@PathVariable String planKey) {
return ResponseEntity.ok(configuredImportPlanService.getPlan(planKey));
}
@PostMapping("/imports/configured-plans/{planKey}/start")
public ResponseEntity<YellowFoxD8ImportTriggerResultDto> startConfiguredYellowFoxPlan(
@PathVariable String planKey,
@RequestParam(required = false) ImportMode mode,
@RequestParam(required = false) AcquisitionStrategy strategy,
@RequestParam(defaultValue = "PLAN_ONLY") SchedulerTriggerMode triggerMode
) {
YellowFoxD8ImportRequest request = configuredImportPlanService.createRequest(planKey, mode, strategy);
YellowFoxD8ImportRunResultDto result = triggerMode == SchedulerTriggerMode.EXECUTE
? importExecutionService.startAndExecuteImport(request)
: importExecutionService.startImport(request);
return ResponseEntity.accepted().body(new YellowFoxD8ImportTriggerResultDto(
planKey,
request.mode(),
request.acquisitionStrategy(),
triggerMode,
OffsetDateTime.now(),
result
));
}
}

View File

@ -1,6 +1,11 @@
package at.procon.eventhub.yellowfox.camel; package at.procon.eventhub.yellowfox.camel;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import at.procon.eventhub.yellowfox.service.YellowFoxD8BookingEventMapper; import at.procon.eventhub.yellowfox.service.YellowFoxD8BookingEventMapper;
import at.procon.eventhub.yellowfox.service.YellowFoxD8IgnitionTransitionDetector;
import java.util.ArrayList;
import java.util.List;
import org.apache.camel.builder.RouteBuilder; import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -8,17 +13,34 @@ import org.springframework.stereotype.Component;
public class YellowFoxD8BookingInputRoute extends RouteBuilder { public class YellowFoxD8BookingInputRoute extends RouteBuilder {
private final YellowFoxD8BookingEventMapper mapper; private final YellowFoxD8BookingEventMapper mapper;
private final YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector;
public YellowFoxD8BookingInputRoute(YellowFoxD8BookingEventMapper mapper) { public YellowFoxD8BookingInputRoute(
YellowFoxD8BookingEventMapper mapper,
YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector
) {
this.mapper = mapper; this.mapper = mapper;
this.ignitionTransitionDetector = ignitionTransitionDetector;
} }
@Override @Override
public void configure() { public void configure() {
from("direct:yellowfox-d8-booking-input") from("direct:yellowfox-d8-booking-input")
.routeId("yellowfox-d8-booking-input-route") .routeId("yellowfox-d8-booking-input-route")
.process(exchange -> {
List<YellowFoxD8BookingDto> bookings = exchange.getMessage().getBody(List.class);
YellowFoxD8IgnitionTransitionDetector.Session ignitionSession = ignitionTransitionDetector.newSession(false);
List<EventHubEventDto> events = new ArrayList<>();
for (YellowFoxD8BookingDto booking : bookings) {
events.add(mapper.map(booking));
EventHubEventDto ignitionEvent = ignitionSession.detect(booking);
if (ignitionEvent != null) {
events.add(ignitionEvent);
}
}
exchange.getMessage().setBody(events);
})
.split(body()) .split(body())
.bean(mapper, "map")
.to("direct:eventhub-normalized-input") .to("direct:eventhub-normalized-input")
.end(); .end();
} }

View File

@ -0,0 +1,33 @@
package at.procon.eventhub.yellowfox.camel;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import at.procon.eventhub.yellowfox.service.YellowFoxD8ImportExecutionService;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class YellowFoxD8ImportRequestRoute extends RouteBuilder {
private static final Logger log = LoggerFactory.getLogger(YellowFoxD8ImportRequestRoute.class);
private final YellowFoxD8ImportExecutionService executionService;
public YellowFoxD8ImportRequestRoute(YellowFoxD8ImportExecutionService executionService) {
this.executionService = executionService;
}
@Override
public void configure() {
from("direct:yellowfox-d8-import-start")
.routeId("yellowfox-d8-import-start-route")
.process(exchange -> {
YellowFoxD8ImportRequest request = exchange.getMessage().getBody(YellowFoxD8ImportRequest.class);
var result = executionService.startImport(request);
log.info("Prepared YellowFox D8 import run importRunId={} plannedPackages={} status={}",
result.importRunId(), result.plannedPackageCount(), result.status());
exchange.getMessage().setBody(result);
});
}
}

View File

@ -0,0 +1,96 @@
package at.procon.eventhub.yellowfox.config;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
@Configuration
@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.yellow-fox.datasource.jdbc-url:}')")
public class YellowFoxDataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "eventhub.yellow-fox.datasource")
public YellowFoxDataSourceProperties yellowFoxDataSourceProperties() {
return new YellowFoxDataSourceProperties();
}
@Bean(defaultCandidate = false)
public DataSource yellowFoxDataSource(YellowFoxDataSourceProperties config) {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setUrl(validateJdbcUrl(config));
dataSource.setUsername(config.getUsername());
dataSource.setPassword(config.getPassword());
String driverClassName = trimToNull(config.getDriverClassName());
if (driverClassName != null) {
dataSource.setDriverClassName(driverClassName);
}
return dataSource;
}
@Bean
public NamedParameterJdbcTemplate yellowFoxNamedParameterJdbcTemplate(
@Qualifier("yellowFoxDataSource") DataSource yellowFoxDataSource
) {
return new NamedParameterJdbcTemplate(yellowFoxDataSource);
}
private String validateJdbcUrl(YellowFoxDataSourceProperties config) {
String jdbcUrl = trimToNull(config.getJdbcUrl());
if (jdbcUrl == null) {
throw new IllegalStateException("eventhub.yellow-fox.datasource.jdbc-url must not be empty");
}
return jdbcUrl;
}
private String trimToNull(String value) {
if (value == null) {
return null;
}
String trimmedValue = value.trim();
return trimmedValue.isEmpty() ? null : trimmedValue;
}
public static class YellowFoxDataSourceProperties {
private String jdbcUrl;
private String username;
private String password;
private String driverClassName;
public String getJdbcUrl() {
return jdbcUrl;
}
public void setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getDriverClassName() {
return driverClassName;
}
public void setDriverClassName(String driverClassName) {
this.driverClassName = driverClassName;
}
}
}

View File

@ -0,0 +1,30 @@
package at.procon.eventhub.yellowfox.dto;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import java.time.OffsetDateTime;
import java.util.Set;
public record ConfiguredYellowFoxD8ImportPlanDto(
String planKey,
boolean enabled,
String cron,
String tenantKey,
EventSourceDto eventSource,
SourceGroupRefDto sourceGroup,
ImportScopeDto importScope,
Set<EventFamily> eventFamilies,
ImportMode initialMode,
ImportMode scheduledMode,
AcquisitionStrategy initialStrategy,
AcquisitionStrategy scheduledStrategy,
boolean refreshMasterDataFirst,
OffsetDateTime initialOccurredFrom,
OffsetDateTime initialOccurredTo,
boolean runInitialOnStartup
) {
}

View File

@ -11,8 +11,10 @@ public record YellowFoxD8BookingDto(
String sourceInstanceKey, String sourceInstanceKey,
String tenantProviderSettingKey, String tenantProviderSettingKey,
String externalFleetKey, String externalFleetKey,
String externalFleetName,
String eventId, String eventId,
String key, String key,
Integer ignition,
Integer eventType, Integer eventType,
Integer state, Integer state,
OffsetDateTime occurredAt, OffsetDateTime occurredAt,

View File

@ -0,0 +1,27 @@
package at.procon.eventhub.yellowfox.dto;
import at.procon.eventhub.importing.ExtractionBatchResult;
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.UUID;
public record YellowFoxD8ExtractionBatchResultDto(
UUID packageId,
String extractionCode,
String sourceKind,
int sourceRowsRead,
int eventsMapped,
int eventsInserted,
int alreadyImported,
boolean executed,
OffsetDateTime lastSourcePackageImportedAt,
String lastSourcePackageId,
OffsetDateTime lastSourceRowUpdatedAt,
OffsetDateTime lastOccurredTo,
Map<String, Integer> eventTypeCounts
) implements ExtractionBatchResult {
public YellowFoxD8ExtractionBatchResultDto {
eventTypeCounts = eventTypeCounts == null ? Map.of() : Map.copyOf(eventTypeCounts);
}
}

View File

@ -0,0 +1,48 @@
package at.procon.eventhub.yellowfox.dto;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.importing.ImportRunRequest;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.util.EnumSet;
import java.util.Set;
public record YellowFoxD8ImportRequest(
@NotBlank String tenantKey,
@Valid @NotNull EventSourceDto eventSource,
@Valid SourceGroupRefDto sourceGroup,
@Valid @NotNull ImportScopeDto importScope,
Set<EventFamily> eventFamilies,
ImportMode mode,
boolean refreshMasterDataFirst,
AcquisitionStrategy acquisitionStrategy
) implements ImportRunRequest {
public YellowFoxD8ImportRequest {
tenantKey = tenantKey == null ? null : tenantKey.trim();
if (eventSource == null) {
eventSource = new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", null, null, null);
}
if (importScope == null) {
importScope = ImportScopeDto.tenantAll(null, null);
}
if (eventFamilies == null || eventFamilies.isEmpty()) {
eventFamilies = EnumSet.of(EventFamily.DRIVER_ACTIVITY, EventFamily.DRIVER_CARD);
} else {
eventFamilies = EnumSet.copyOf(eventFamilies);
}
if (mode == null) {
mode = ImportMode.INITIAL_BACKFILL;
}
if (acquisitionStrategy == null || acquisitionStrategy == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) {
acquisitionStrategy = mode == ImportMode.INCREMENTAL_UPDATE
? AcquisitionStrategy.SOURCE_ROW_WATERMARK
: AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP;
}
}
}

View File

@ -0,0 +1,15 @@
package at.procon.eventhub.yellowfox.dto;
import at.procon.eventhub.dto.ImportRunStatus;
import at.procon.eventhub.importing.ImportPlanDto;
import java.util.List;
import java.util.UUID;
public record YellowFoxD8ImportRunResultDto(
UUID importRunId,
ImportRunStatus status,
int plannedPackageCount,
ImportPlanDto plan,
List<UUID> plannedPackageIds
) {
}

View File

@ -0,0 +1,16 @@
package at.procon.eventhub.yellowfox.dto;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.SchedulerTriggerMode;
import java.time.OffsetDateTime;
public record YellowFoxD8ImportTriggerResultDto(
String planKey,
ImportMode mode,
AcquisitionStrategy acquisitionStrategy,
SchedulerTriggerMode triggerMode,
OffsetDateTime triggeredAt,
YellowFoxD8ImportRunResultDto result
) {
}

View File

@ -0,0 +1,238 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupType;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ExtractionBatchResultDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.camel.ProducerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StreamUtils;
@Service
@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.yellow-fox.datasource.jdbc-url:}')")
public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD8ExtractionBatchExecutor {
private static final Logger log = LoggerFactory.getLogger(JdbcYellowFoxD8BookingExtractionBatchExecutor.class);
private static final String EXTRACTION_CODE = "YELLOWFOX_D8_BOOKING";
private static final int PROGRESS_LOG_INTERVAL = 5000;
private final NamedParameterJdbcTemplate jdbcTemplate;
private final ProducerTemplate producerTemplate;
private final ResourceLoader resourceLoader;
private final ImportCursorRepository importCursorRepository;
private final EventHubProperties properties;
private final YellowFoxD8BookingRowMapper rowMapper;
private final YellowFoxD8BookingEventMapper eventMapper;
private final YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector;
public JdbcYellowFoxD8BookingExtractionBatchExecutor(
@Qualifier("yellowFoxNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate,
ProducerTemplate producerTemplate,
ResourceLoader resourceLoader,
ImportCursorRepository importCursorRepository,
EventHubProperties properties,
YellowFoxD8BookingRowMapper rowMapper,
YellowFoxD8BookingEventMapper eventMapper,
YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector
) {
this.jdbcTemplate = jdbcTemplate;
this.producerTemplate = producerTemplate;
this.resourceLoader = resourceLoader;
this.importCursorRepository = importCursorRepository;
this.properties = properties;
this.rowMapper = rowMapper;
this.eventMapper = eventMapper;
this.ignitionTransitionDetector = ignitionTransitionDetector;
}
@Override
public YellowFoxD8ExtractionBatchResultDto execute(
UUID importRunId,
UUID packageId,
int eventSourceId,
YellowFoxD8ImportRequest request,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
) {
if (!EXTRACTION_CODE.equals(planItem.extractionCode())) {
throw new IllegalArgumentException("Unsupported YellowFox extraction code " + planItem.extractionCode());
}
ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk);
ImportCursorStateDto cursor = findCursor(eventSourceId, request, planItem);
Map<String, Object> params = parameters(request, chunkScope, cursor);
Stats stats = new Stats();
YellowFoxD8IgnitionTransitionDetector.Session ignitionSession = ignitionTransitionDetector
.newSession(properties.getYellowFox().isEmitInitialIgnitionSnapshot());
log.info("Reading YellowFox D8 bookings tenant={} importRunId={} packageId={} chunk={} occurredFrom={} occurredTo={} fleetId={} strategy={}",
request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), params.get("fleetId"), request.acquisitionStrategy());
jdbcTemplate.query(loadSql(), params, rs -> {
stats.sourceRowsRead++;
YellowFoxD8BookingDto booking = rowMapper.map(
rs,
request.tenantKey(),
request.eventSource().sourceInstanceKey(),
request.eventSource().tenantProviderSettingKey()
);
stats.acceptSourceRow(booking);
if (!hasEventReference(booking)) {
stats.skippedRows++;
return;
}
EventHubEventDto primaryEvent = eventMapper.map(booking);
send(primaryEvent, stats);
EventHubEventDto ignitionEvent = ignitionSession.detect(booking);
if (ignitionEvent != null) {
send(ignitionEvent, stats);
}
if (stats.sourceRowsRead % PROGRESS_LOG_INTERVAL == 0) {
log.info("YellowFox D8 extraction progress tenant={} importRunId={} packageId={} rows={} events={} byType={}",
request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsSent, stats.eventTypeCounts);
}
});
log.info("Finished YellowFox D8 extraction tenant={} importRunId={} packageId={} rows={} events={} skippedRows={} byType={}",
request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsSent, stats.skippedRows, stats.eventTypeCounts);
return new YellowFoxD8ExtractionBatchResultDto(
packageId,
planItem.extractionCode(),
planItem.sourceKind(),
stats.sourceRowsRead,
stats.eventsSent,
stats.eventsSent,
stats.skippedRows,
true,
null,
stats.lastEventId,
stats.lastOccurredAt,
stats.lastOccurredAt,
stats.eventTypeCounts
);
}
private ImportCursorStateDto findCursor(int eventSourceId, YellowFoxD8ImportRequest request, ImportPlanItemDto planItem) {
String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey();
return importCursorRepository.findCursor(
request.tenantKey(),
eventSourceId,
scopeHash,
planItem.eventFamily(),
planItem.sourceKind(),
request.acquisitionStrategy()
);
}
private Map<String, Object> parameters(YellowFoxD8ImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) {
Map<String, Object> params = new HashMap<>();
params.put("occurredFrom", scope == null ? null : scope.occurredFrom());
params.put("occurredTo", scope == null ? null : scope.occurredTo());
params.put("fleetId", fleetId(request));
if (request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_ROW_WATERMARK && cursor != null) {
OffsetDateTime lastOccurredTo = cursor.lastOccurredTo();
String lastSourceRowId = cursor.lastSourcePackageId();
if (lastOccurredTo != null && properties.getYellowFox().getOccurredAtOverlap() != null
&& !properties.getYellowFox().getOccurredAtOverlap().isZero()) {
lastOccurredTo = lastOccurredTo.minus(properties.getYellowFox().getOccurredAtOverlap());
lastSourceRowId = null;
}
params.put("lastOccurredTo", lastOccurredTo);
params.put("lastSourceRowId", lastSourceRowId);
} else {
params.put("lastOccurredTo", null);
params.put("lastSourceRowId", null);
}
return params;
}
private Integer fleetId(YellowFoxD8ImportRequest request) {
if (request.sourceGroup() == null || request.sourceGroup().type() != SourceGroupType.FLEET) {
return null;
}
String value = request.sourceGroup().sourceEntityId() == null
? request.sourceGroup().code()
: request.sourceGroup().sourceEntityId();
if (value == null || value.isBlank()) {
return null;
}
try {
return Integer.parseInt(value.trim());
} catch (NumberFormatException ex) {
throw new IllegalArgumentException("YellowFox D8 fleet sourceEntityId/code must be numeric for JDBC import: " + value, ex);
}
}
private ImportScopeDto chunkScope(ImportScopeDto scope, ImportTimeChunkDto chunk) {
if (scope == null) {
return ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo());
}
return new ImportScopeDto(scope.type(), scope.rootSourceOrganisation(), scope.includeChildren(), chunk.occurredFrom(), chunk.occurredTo());
}
private boolean hasEventReference(YellowFoxD8BookingDto booking) {
return (booking.driverRef() != null && booking.driverRef().hasAnyReference())
|| (booking.vehicleRef() != null && booking.vehicleRef().hasAnyReference());
}
private void send(EventHubEventDto event, Stats stats) {
producerTemplate.sendBody("direct:eventhub-normalized-input", event);
stats.acceptEvent(event);
}
private String loadSql() {
Resource resource = resourceLoader.getResource("classpath:sql/yellowfox/d8-bookings.sql");
try (var in = resource.getInputStream()) {
return StreamUtils.copyToString(in, StandardCharsets.UTF_8);
} catch (IOException ex) {
throw new IllegalStateException("Failed to load YellowFox D8 extraction SQL", ex);
}
}
private static class Stats {
private int sourceRowsRead;
private int eventsSent;
private int skippedRows;
private OffsetDateTime lastOccurredAt;
private String lastEventId;
private final Map<String, Integer> eventTypeCounts = new LinkedHashMap<>();
private void acceptSourceRow(YellowFoxD8BookingDto booking) {
if (booking.occurredAt() != null) {
lastOccurredAt = booking.occurredAt();
lastEventId = booking.eventId();
}
}
private void acceptEvent(EventHubEventDto event) {
eventsSent++;
EventType type = event.eventType();
String key = event.eventDomain().name() + "/" + (type == null ? "UNKNOWN" : type.name());
eventTypeCounts.merge(key, 1, Integer::sum);
}
}
}

View File

@ -0,0 +1,47 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.extraction.AbstractNoopExtractionBatchExecutor;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ExtractionBatchResultDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.util.UUID;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.stereotype.Service;
@Service
@ConditionalOnMissingBean(YellowFoxD8ExtractionBatchExecutor.class)
@ConditionalOnExpression("!T(org.springframework.util.StringUtils).hasText('${eventhub.yellow-fox.datasource.jdbc-url:}')")
public class NoopYellowFoxD8ExtractionBatchExecutor
extends AbstractNoopExtractionBatchExecutor<YellowFoxD8ImportRequest, YellowFoxD8ExtractionBatchResultDto>
implements YellowFoxD8ExtractionBatchExecutor {
@Override
protected YellowFoxD8ExtractionBatchResultDto emptyResult(
UUID packageId,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
) {
return new YellowFoxD8ExtractionBatchResultDto(
packageId,
planItem.extractionCode(),
planItem.sourceKind(),
0,
0,
0,
0,
false,
null,
null,
null,
null,
java.util.Map.of()
);
}
@Override
protected String providerName() {
return "yellowfox-d8";
}
}

View File

@ -2,7 +2,8 @@ package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.dto.CardSlot; import at.procon.eventhub.dto.CardSlot;
import at.procon.eventhub.dto.CardStatus; import at.procon.eventhub.dto.CardStatus;
import at.procon.eventhub.dto.DrivingStatus; import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDetailsDto;
import at.procon.eventhub.dto.EventDomain; import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventHubPackageRequest;
@ -13,6 +14,7 @@ import at.procon.eventhub.dto.GeoPointDto;
import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto; import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.dto.SourceGroupType; import at.procon.eventhub.dto.SourceGroupType;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.service.EventDetailsFactory; import at.procon.eventhub.service.EventDetailsFactory;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import java.time.LocalDate; import java.time.LocalDate;
@ -25,6 +27,8 @@ import org.springframework.stereotype.Component;
@Component @Component
public class YellowFoxD8BookingEventMapper { public class YellowFoxD8BookingEventMapper {
public static final String DETAIL_SOURCE = "YELLOWFOX_D8";
private final EventDetailsFactory detailsFactory; private final EventDetailsFactory detailsFactory;
public YellowFoxD8BookingEventMapper(EventDetailsFactory detailsFactory) { public YellowFoxD8BookingEventMapper(EventDetailsFactory detailsFactory) {
@ -33,30 +37,42 @@ public class YellowFoxD8BookingEventMapper {
public EventHubEventDto map(YellowFoxD8BookingDto source) { public EventHubEventDto map(YellowFoxD8BookingDto source) {
NormalizedEvent normalized = normalize(source.eventType(), source.state()); NormalizedEvent normalized = normalize(source.eventType(), source.state());
Map<String, Object> payload = new LinkedHashMap<>(); return event(source, sourceEventId(source, "D8_BOOKING"), normalized, detailsFor(source, normalized), false);
if (source.payload() != null) { }
payload.putAll(source.payload());
}
payload.put("provider", "YELLOWFOX");
payload.put("sourceKind", "TELEMATICS_PLATFORM");
payload.put("yellowFoxEventType", source.eventType());
payload.put("yellowFoxState", source.state());
payload.put("yellowFoxKey", source.key());
EventSourceDto eventSource = new EventSourceDto( public EventHubEventDto mapIgnitionTransition(YellowFoxD8BookingDto source, Integer previousIgnitionState) {
"YELLOWFOX", if (source.ignition() == null) {
"TELEMATICS_PLATFORM", return null;
"YELLOWFOX_D8", }
source.sourceInstanceKey(), boolean ignitionOn = source.ignition() == 1;
source.tenantProviderSettingKey(), NormalizedEvent normalized = new NormalizedEvent(
source.externalFleetKey() EventDomain.IGNITION,
ignitionOn ? EventType.IGNITION_ON : EventType.IGNITION_OFF,
ignitionOn ? EventLifecycle.ON : EventLifecycle.OFF,
null,
null
); );
Map<String, Object> attributes = baseAttributes(source, normalized);
attributes.put("previousIgnitionState", previousIgnitionState);
attributes.put("previousIgnitionMeaning", ignitionMeaning(previousIgnitionState));
attributes.put("derivedFrom", "YELLOWFOX_D8_BOOKING");
attributes.put("sourceEventId", sourceEventId(source, "D8_BOOKING"));
EventDetailsDto details = new EventDetailsDto("IGNITION", detailsFactory.payloadFromMap(attributes));
return event(source, sourceEventId(source, "D8_IGNITION") + ":" + (ignitionOn ? "ON" : "OFF"), normalized, details, true);
}
private EventHubEventDto event(
YellowFoxD8BookingDto source,
String externalSourceEventId,
NormalizedEvent normalized,
EventDetailsDto details,
boolean derived
) {
EventSourceDto eventSource = eventSource(source);
LocalDate businessDate = source.occurredAt().toLocalDate(); LocalDate businessDate = source.occurredAt().toLocalDate();
var occurredFrom = businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(); var occurredFrom = businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime();
var occurredTo = businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(); var occurredTo = businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime();
SourceGroupRefDto sourceGroup = source.externalFleetKey() == null || source.externalFleetKey().isBlank() SourceGroupRefDto sourceGroup = sourceGroup(source);
? null
: new SourceGroupRefDto(SourceGroupType.FLEET, source.externalFleetKey(), source.externalFleetKey(), null);
EventHubPackageRequest packageInfo = new EventHubPackageRequest( EventHubPackageRequest packageInfo = new EventHubPackageRequest(
tenantOrDefault(source.tenantKey()), tenantOrDefault(source.tenantKey()),
eventSource, eventSource,
@ -69,8 +85,8 @@ public class YellowFoxD8BookingEventMapper {
return new EventHubEventDto( return new EventHubEventDto(
UUID.randomUUID(), UUID.randomUUID(),
source.eventId(), externalSourceEventId,
source.driverRef(), derived && normalized.domain() == EventDomain.IGNITION ? null : source.driverRef(),
source.vehicleRef(), source.vehicleRef(),
source.occurredAt(), source.occurredAt(),
source.receivedPartnerAt(), source.receivedPartnerAt(),
@ -79,60 +95,170 @@ public class YellowFoxD8BookingEventMapper {
normalized.type(), normalized.type(),
normalized.lifecycle(), normalized.lifecycle(),
source.odometerM(), source.odometerM(),
new GeoPointDto(source.latitude(), source.longitude()), point(source),
detailsFor(normalized), details,
null, null,
detailsFactory.payloadFromMap(payload), detailsFactory.payloadFromMap(payload(source, derived)),
false, false,
packageInfo packageInfo
); );
} }
public EventSourceDto eventSource(YellowFoxD8BookingDto source) {
return new EventSourceDto(
"YELLOWFOX",
"TELEMATICS_PLATFORM",
"YELLOWFOX_D8",
source.sourceInstanceKey(),
source.tenantProviderSettingKey(),
source.externalFleetKey()
);
}
private SourceGroupRefDto sourceGroup(YellowFoxD8BookingDto source) {
if (source.externalFleetKey() == null || source.externalFleetKey().isBlank()) {
return null;
}
return new SourceGroupRefDto(
SourceGroupType.FLEET,
source.externalFleetKey(),
source.externalFleetKey(),
source.externalFleetName() == null ? source.externalFleetKey() : source.externalFleetName()
);
}
private GeoPointDto point(YellowFoxD8BookingDto source) {
if (source.latitude() == null && source.longitude() == null) {
return null;
}
return new GeoPointDto(source.latitude(), source.longitude());
}
private String tenantOrDefault(String value) { private String tenantOrDefault(String value) {
return value == null || value.isBlank() ? "default" : value.trim(); return value == null || value.isBlank() ? "default" : value.trim();
} }
private at.procon.eventhub.dto.EventDetailsDto detailsFor(NormalizedEvent normalized) { private String sourceEventId(YellowFoxD8BookingDto source, String prefix) {
if (normalized.domain() == EventDomain.DRIVER_ACTIVITY) { String eventId = source.eventId() == null || source.eventId().isBlank() ? "UNKNOWN_EVENT" : source.eventId().trim();
return detailsFactory.driverActivity(CardSlot.DRIVER, CardStatus.INSERTED, DrivingStatus.UNKNOWN); return prefix + ":" + eventId + ":" + source.occurredAt();
}
private Map<String, Object> payload(YellowFoxD8BookingDto source, boolean derived) {
Map<String, Object> payload = new LinkedHashMap<>();
if (source.payload() != null) {
payload.putAll(source.payload());
} }
if (normalized.domain() == EventDomain.DRIVER_CARD) { payload.put("provider", "YELLOWFOX");
CardStatus status = normalized.lifecycle() == EventLifecycle.INSERT ? CardStatus.INSERTED : CardStatus.NOT_INSERTED; payload.put("sourceKind", "TELEMATICS_PLATFORM");
return detailsFactory.driverCard(CardSlot.DRIVER, status); payload.put("sourceKey", "YELLOWFOX_D8");
} payload.put("derived", derived);
return null; payload.put("yellowFoxEventType", source.eventType());
payload.put("yellowFoxState", source.state());
payload.put("yellowFoxIgnition", source.ignition());
payload.put("yellowFoxKey", source.key());
return payload;
}
private EventDetailsDto detailsFor(YellowFoxD8BookingDto source, NormalizedEvent normalized) {
String type = switch (normalized.domain()) {
case DRIVER_ACTIVITY -> "DRIVER_ACTIVITY";
case DRIVER_CARD -> "DRIVER_CARD";
default -> "YELLOWFOX_D8_BOOKING";
};
return new EventDetailsDto(type, detailsFactory.payloadFromMap(baseAttributes(source, normalized)));
}
private Map<String, Object> baseAttributes(YellowFoxD8BookingDto source, NormalizedEvent normalized) {
Map<String, Object> attributes = new LinkedHashMap<>();
attributes.put("source", DETAIL_SOURCE);
attributes.put("yellowFoxEventType", source.eventType());
attributes.put("yellowFoxState", source.state());
attributes.put("yellowFoxStateMeaning", stateMeaning(source.eventType(), source.state()));
attributes.put("yellowFoxKey", source.key());
attributes.put("ignitionState", source.ignition());
attributes.put("ignitionMeaning", ignitionMeaning(source.ignition()));
attributes.put("odometer", source.odometerM());
attributes.put("slot", normalized.slot() == null ? null : normalized.slot().name());
attributes.put("slotNumber", normalized.slotNumber());
attributes.put("cardSlot", normalized.slot() == null ? null : normalized.slot().name());
attributes.put("eventDomain", normalized.domain().name());
attributes.put("eventType", normalized.type().name());
attributes.put("lifecycle", normalized.lifecycle().name());
removeNullValues(attributes);
return attributes;
}
private void removeNullValues(Map<String, Object> attributes) {
attributes.entrySet().removeIf(entry -> entry.getValue() == null);
} }
private NormalizedEvent normalize(Integer eventType, Integer state) { private NormalizedEvent normalize(Integer eventType, Integer state) {
if (eventType == null || state == null) { if (eventType == null || state == null) {
return new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT); return new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT, null, null);
} }
return switch (eventType) { return switch (eventType) {
case 0, 1 -> normalizeCardActivity(state); case 0 -> normalizeCardActivity(CardSlot.DRIVER, 1, state);
case 2, 3 -> normalizeDriverActivity(state); case 1 -> normalizeCardActivity(CardSlot.CO_DRIVER, 2, state);
default -> new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT); case 2 -> normalizeDriverActivity(CardSlot.DRIVER, 1, state);
case 3 -> normalizeDriverActivity(CardSlot.CO_DRIVER, 2, state);
default -> new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT, null, null);
}; };
} }
private NormalizedEvent normalizeCardActivity(Integer state) { private NormalizedEvent normalizeCardActivity(CardSlot slot, int slotNumber, Integer state) {
return switch (state) { return switch (state) {
case 0 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW); case 0 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW, slot, slotNumber);
case 1 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT); case 1 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT, slot, slotNumber);
default -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT); default -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT, slot, slotNumber);
}; };
} }
private NormalizedEvent normalizeDriverActivity(Integer state) { private NormalizedEvent normalizeDriverActivity(CardSlot slot, int slotNumber, Integer state) {
return switch (state) { return switch (state) {
case 0 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.BREAK_REST, EventLifecycle.SNAPSHOT); case 0 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.BREAK_REST, EventLifecycle.START, slot, slotNumber);
case 1 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.AVAILABILITY, EventLifecycle.SNAPSHOT); case 1 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.AVAILABILITY, EventLifecycle.START, slot, slotNumber);
case 2 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.WORK, EventLifecycle.SNAPSHOT); case 2 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.WORK, EventLifecycle.START, slot, slotNumber);
case 3 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.SNAPSHOT); case 3 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.START, slot, slotNumber);
default -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.UNKNOWN_ACTIVITY, EventLifecycle.SNAPSHOT); default -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.UNKNOWN_ACTIVITY, EventLifecycle.SNAPSHOT, slot, slotNumber);
}; };
} }
private record NormalizedEvent(EventDomain domain, EventType type, EventLifecycle lifecycle) { private String stateMeaning(Integer eventType, Integer state) {
if (eventType == null || state == null) {
return null;
}
if (eventType == 0 || eventType == 1) {
return switch (state) {
case 0 -> "CARD_REMOVED";
case 1 -> "CARD_INSERTED";
default -> "UNKNOWN_CARD_STATE";
};
}
if (eventType == 2 || eventType == 3) {
return switch (state) {
case 0 -> "PAUSE_OR_REST";
case 1 -> "STANDBY_TIME";
case 2 -> "WORK_TIME";
case 3 -> "STEERING_TIME";
default -> "UNKNOWN_ACTIVITY_STATE";
};
}
return "UNKNOWN_EVENT_TYPE";
}
private String ignitionMeaning(Integer ignition) {
if (ignition == null) {
return null;
}
return ignition == 1 ? "ON" : ignition == 0 ? "OFF" : "UNKNOWN";
}
private record NormalizedEvent(
EventDomain domain,
EventType type,
EventLifecycle lifecycle,
CardSlot slot,
Integer slotNumber
) {
} }
} }

View File

@ -0,0 +1,117 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.stereotype.Component;
@Component
public class YellowFoxD8BookingRowMapper {
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {
};
private final ObjectMapper objectMapper;
public YellowFoxD8BookingRowMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
public YellowFoxD8BookingDto map(ResultSet rs, String tenantKey, String sourceInstanceKey, String tenantProviderSettingKey) throws SQLException {
String eventId = rs.getString("eventid");
OffsetDateTime occurredAt = rs.getObject("utc", OffsetDateTime.class);
Integer vehicleId = getInteger(rs, "vehicle_id");
Integer driverId = getInteger(rs, "driver_id");
String vehicleVrn = rs.getString("vehicle_vrn");
String vehicleVin = rs.getString("vehicle_vin");
String driverCard = rs.getString("driver_card_number");
Integer fleetId = getInteger(rs, "fleet_id");
String fleetName = rs.getString("fleet_name");
Integer odometer = getInteger(rs, "odometer");
Map<String, Object> payload = payload(rs.getString("payload"));
put(payload, "yellowFoxEventId", eventId);
put(payload, "yellowFoxOdometerRaw", odometer);
put(payload, "vehicleVrn", vehicleVrn);
put(payload, "vehicleVin", vehicleVin);
put(payload, "driverCardNumber", driverCard);
put(payload, "driverFirstName", rs.getString("driver_firstname"));
put(payload, "driverLastName", rs.getString("driver_lastname"));
put(payload, "fleetId", fleetId);
put(payload, "fleetName", fleetName);
put(payload, "telematicProviderId", getInteger(rs, "telematic_provider_id"));
put(payload, "telematicProviderName", rs.getString("telematic_provider_name"));
DriverRefDto driverRef = driverId == null && isBlank(driverCard)
? null
: new DriverRefDto(driverId == null ? null : driverId.toString(), new DriverCardRefDto(null, driverCard));
VehicleRefDto vehicleRef = vehicleId == null && isBlank(vehicleVin) && isBlank(vehicleVrn)
? null
: new VehicleRefDto(
vehicleId == null ? null : vehicleId.toString(),
vehicleVin,
vehicleId == null ? null : vehicleId.toString(),
new VehicleRegistrationRefDto(null, vehicleVrn)
);
return new YellowFoxD8BookingDto(
tenantKey,
sourceInstanceKey,
tenantProviderSettingKey,
fleetId == null ? null : fleetId.toString(),
fleetName,
eventId,
rs.getString("key"),
getInteger(rs, "ignition"),
getInteger(rs, "eventtype"),
getInteger(rs, "state"),
occurredAt,
null,
driverRef,
vehicleRef,
odometer == null ? null : odometer.longValue() * 1000L,
rs.getBigDecimal("latitude"),
rs.getBigDecimal("longitude"),
payload
);
}
private Integer getInteger(ResultSet rs, String column) throws SQLException {
int value = rs.getInt(column);
return rs.wasNull() ? null : value;
}
private Map<String, Object> payload(String json) {
if (json == null || json.isBlank()) {
return new LinkedHashMap<>();
}
try {
return new LinkedHashMap<>(objectMapper.readValue(json, MAP_TYPE));
} catch (Exception ex) {
Map<String, Object> fallback = new LinkedHashMap<>();
fallback.put("rawPayload", json);
fallback.put("payloadParseError", ex.getMessage());
return fallback;
}
}
private void put(Map<String, Object> target, String key, Object value) {
if (value != null) {
target.put(key, value instanceof BigDecimal bd ? bd.stripTrailingZeros().toPlainString() : value);
}
}
private boolean isBlank(String value) {
return value == null || value.isBlank();
}
}

View File

@ -0,0 +1,62 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.importing.AbstractConfiguredImportPlanService;
import at.procon.eventhub.importing.ConfiguredImportPlanDto;
import at.procon.eventhub.yellowfox.dto.ConfiguredYellowFoxD8ImportPlanDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import org.springframework.stereotype.Service;
@Service
public class YellowFoxD8ConfiguredImportPlanService
extends AbstractConfiguredImportPlanService<YellowFoxD8ImportRequest, ConfiguredYellowFoxD8ImportPlanDto> {
public YellowFoxD8ConfiguredImportPlanService(EventHubProperties properties) {
super(() -> properties.getYellowFox().getImportPlans(), "yellowfox-d8");
}
@Override
protected YellowFoxD8ImportRequest buildRequest(
EventHubProperties.ConfiguredImportPlan plan,
ImportMode mode,
AcquisitionStrategy strategy,
ImportScopeDto scope
) {
return new YellowFoxD8ImportRequest(
plan.getTenantKey(),
plan.getEventSource(),
plan.getSourceGroup(),
scope,
plan.getEventFamilies(),
mode,
false,
strategy
);
}
@Override
protected ConfiguredYellowFoxD8ImportPlanDto toDto(EventHubProperties.ConfiguredImportPlan plan) {
ConfiguredImportPlanDto dto = genericDto(plan);
return new ConfiguredYellowFoxD8ImportPlanDto(
dto.planKey(),
dto.enabled(),
dto.cron(),
dto.tenantKey(),
dto.eventSource(),
dto.sourceGroup(),
dto.importScope(),
dto.eventFamilies(),
dto.initialMode(),
dto.scheduledMode(),
dto.initialStrategy(),
dto.scheduledStrategy(),
false,
dto.initialOccurredFrom(),
dto.initialOccurredTo(),
dto.runInitialOnStartup()
);
}
}

View File

@ -0,0 +1,22 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.extraction.ExtractionBatchExecutor;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ExtractionBatchResultDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.util.UUID;
public interface YellowFoxD8ExtractionBatchExecutor
extends ExtractionBatchExecutor<YellowFoxD8ImportRequest, YellowFoxD8ExtractionBatchResultDto> {
@Override
YellowFoxD8ExtractionBatchResultDto execute(
UUID importRunId,
UUID packageId,
int eventSourceId,
YellowFoxD8ImportRequest request,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
);
}

View File

@ -0,0 +1,47 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import java.util.HashMap;
import java.util.Map;
import org.springframework.stereotype.Component;
@Component
public class YellowFoxD8IgnitionTransitionDetector {
private final YellowFoxD8BookingEventMapper mapper;
public YellowFoxD8IgnitionTransitionDetector(YellowFoxD8BookingEventMapper mapper) {
this.mapper = mapper;
}
public Session newSession(boolean emitInitialSnapshot) {
return new Session(mapper, emitInitialSnapshot);
}
public static class Session {
private final YellowFoxD8BookingEventMapper mapper;
private final boolean emitInitialSnapshot;
private final Map<String, Integer> lastIgnitionByVehicle = new HashMap<>();
private Session(YellowFoxD8BookingEventMapper mapper, boolean emitInitialSnapshot) {
this.mapper = mapper;
this.emitInitialSnapshot = emitInitialSnapshot;
}
public EventHubEventDto detect(YellowFoxD8BookingDto booking) {
if (booking == null || booking.ignition() == null || booking.vehicleRef() == null || !booking.vehicleRef().hasAnyReference()) {
return null;
}
String vehicleKey = booking.vehicleRef().stableKey();
Integer previous = lastIgnitionByVehicle.put(vehicleKey, booking.ignition());
if (previous == null) {
return emitInitialSnapshot ? mapper.mapIgnitionTransition(booking, null) : null;
}
if (!previous.equals(booking.ignition())) {
return mapper.mapIgnitionTransition(booking, previous);
}
return null;
}
}
}

View File

@ -0,0 +1,52 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.importing.ImportChunkPlanner;
import at.procon.eventhub.importing.ImportPlanDto;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.util.List;
import org.springframework.stereotype.Service;
@Service
public class YellowFoxD8ImportPlanService {
public static final String SOURCE_KIND = "TELEMATICS_PLATFORM";
public static final String EXTRACTION_CODE = "YELLOWFOX_D8_BOOKING";
private final EventHubProperties properties;
private final ImportChunkPlanner chunkPlanner;
public YellowFoxD8ImportPlanService(EventHubProperties properties, ImportChunkPlanner chunkPlanner) {
this.properties = properties;
this.chunkPlanner = chunkPlanner;
}
public ImportPlanDto createPlan(YellowFoxD8ImportRequest request) {
return new ImportPlanDto(
request.tenantKey(),
request.mode(),
request.acquisitionStrategy(),
request.refreshMasterDataFirst(),
request.importScope(),
request.sourceGroup(),
request.eventSource(),
chunkPlanner.chunksFor(request, properties.getYellowFox().getDefaultChunkDays()),
List.of(item(request.acquisitionStrategy()))
);
}
private ImportPlanItemDto item(AcquisitionStrategy strategy) {
return new ImportPlanItemDto(
EventFamily.DRIVER_ACTIVITY,
SOURCE_KIND,
EXTRACTION_CODE,
List.of("data.d8_booking", "data.vehicle", "data.driver", "data.fleet", "data.telematic_provider"),
"VEHICLE",
"YellowFox D8 booking card/activity events with derived ignition transitions",
strategy
);
}
}

View File

@ -0,0 +1,79 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.SchedulerTriggerMode;
import at.procon.eventhub.importing.AbstractImportScheduler;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.util.List;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class YellowFoxD8ImportScheduler extends AbstractImportScheduler<YellowFoxD8ImportRequest> {
private final EventHubProperties properties;
private final YellowFoxD8ConfiguredImportPlanService configuredPlanService;
private final YellowFoxD8ImportExecutionService executionService;
public YellowFoxD8ImportScheduler(
EventHubProperties properties,
YellowFoxD8ConfiguredImportPlanService configuredPlanService,
YellowFoxD8ImportExecutionService executionService
) {
this.properties = properties;
this.configuredPlanService = configuredPlanService;
this.executionService = executionService;
}
@EventListener(ApplicationReadyEvent.class)
public void triggerInitialPlansOnStartup() {
super.triggerInitialPlansOnStartup();
}
@Scheduled(fixedDelayString = "${eventhub.yellow-fox.scheduler-poll-interval-ms:60000}")
public void pollConfiguredPlans() {
super.pollConfiguredPlans();
}
@Override
protected boolean schedulerEnabled() {
return properties.getYellowFox().isSchedulerEnabled();
}
@Override
protected List<EventHubProperties.ConfiguredImportPlan> configuredPlans() {
return properties.getYellowFox().getImportPlans();
}
@Override
protected SchedulerTriggerMode schedulerTriggerMode() {
return properties.getYellowFox().getSchedulerTriggerMode();
}
@Override
protected YellowFoxD8ImportRequest createInitialRequest(EventHubProperties.ConfiguredImportPlan plan) {
return configuredPlanService.createInitialRequest(plan);
}
@Override
protected YellowFoxD8ImportRequest createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan) {
return configuredPlanService.createScheduledRequest(plan);
}
@Override
protected void startImport(YellowFoxD8ImportRequest request) {
executionService.startImport(request);
}
@Override
protected void startAndExecuteImport(YellowFoxD8ImportRequest request) {
executionService.startAndExecuteImport(request);
}
@Override
protected String providerName() {
return "yellowfox-d8";
}
}

View File

@ -0,0 +1,15 @@
create index if not exists idx_event_detail_yellowfox_slot
on eventhub.event_detail(detail_type, (attributes ->> 'slot'), event_occurred_at)
where detail_type in ('DRIVER_ACTIVITY', 'DRIVER_CARD');
create index if not exists idx_event_detail_yellowfox_eventtype_state
on eventhub.event_detail(
(attributes ->> 'yellowFoxEventType'),
(attributes ->> 'yellowFoxState'),
event_occurred_at
)
where attributes ? 'yellowFoxEventType';
create index if not exists idx_event_detail_yellowfox_ignition
on eventhub.event_detail(detail_type, (attributes ->> 'ignitionState'), event_occurred_at)
where attributes ? 'ignitionState';

View File

@ -0,0 +1,10 @@
# YellowFox D8 SQL
Recommended source-side index for incremental import:
```sql
create index if not exists d8_booking_utc_eventid_idx
on data.d8_booking (utc asc, eventid asc);
```
The import uses SOURCE_ROW_WATERMARK with `(utc, eventid)`.

View File

@ -0,0 +1,49 @@
select
b.eventid,
b.utc,
b.vehicle_id,
b.driver_id,
b.key,
b.ignition,
b.eventtype,
b.state,
b.odometer,
st_y(b.geom) as latitude,
st_x(b.geom) as longitude,
b.payload,
v.vrn as vehicle_vrn,
v.vin as vehicle_vin,
v.fleet_id as vehicle_fleet_id,
d.firstname as driver_firstname,
d.name as driver_lastname,
d.drivers_card as driver_card_number,
d.fleet_id as driver_fleet_id,
f.id as fleet_id,
f.name as fleet_name,
tp.id as telematic_provider_id,
tp.name as telematic_provider_name
from data.d8_booking b
left join data.vehicle v
on v.id = b.vehicle_id
left join data.driver d
on d.id = b.driver_id
left join data.fleet f
on f.id = coalesce(v.fleet_id, d.fleet_id)
left join data.telematic_provider tp
on tp.id = v.telematic_provider_id
where (:occurredFrom is null or b.utc >= :occurredFrom)
and (:occurredTo is null or b.utc < :occurredTo)
and (:fleetId is null or f.id = :fleetId)
and (
:lastOccurredTo is null
or b.utc > :lastOccurredTo
or (
b.utc = :lastOccurredTo
and (:lastSourceRowId is null or b.eventid > :lastSourceRowId)
)
)
order by b.utc asc, b.eventid asc;

View File

@ -1,5 +1,6 @@
package at.procon.eventhub; package at.procon.eventhub;
import at.procon.eventhub.dto.CardSlot;
import at.procon.eventhub.dto.DriverCardRefDto; import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDomain; import at.procon.eventhub.dto.EventDomain;
@ -7,8 +8,8 @@ import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventType; import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import at.procon.eventhub.service.EventDetailsFactory; import at.procon.eventhub.service.EventDetailsFactory;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import at.procon.eventhub.yellowfox.service.YellowFoxD8BookingEventMapper; import at.procon.eventhub.yellowfox.service.YellowFoxD8BookingEventMapper;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
@ -23,16 +24,67 @@ class YellowFoxD8BookingEventMapperTest {
); );
@Test @Test
void mapsYellowFoxDrivingStateAsSnapshotWithEventSourceAndDetails() { void mapsYellowFoxDriverSlotActivityAsStartWithSlotDetails() {
YellowFoxD8BookingDto source = new YellowFoxD8BookingDto( var source = booking(2, 3, 1);
var result = mapper.map(source);
assertThat(result.eventDomain()).isEqualTo(EventDomain.DRIVER_ACTIVITY);
assertThat(result.eventType()).isEqualTo(EventType.DRIVE);
assertThat(result.lifecycle()).isEqualTo(EventLifecycle.START);
assertThat(result.externalSourceEventId()).startsWith("D8_BOOKING:event-1:");
assertThat(result.eventDetails().type()).isEqualTo("DRIVER_ACTIVITY");
assertThat(result.eventDetails().attributes().get("slot").asText()).isEqualTo(CardSlot.DRIVER.name());
assertThat(result.eventDetails().attributes().get("slotNumber").asInt()).isEqualTo(1);
assertThat(result.eventDetails().attributes().get("yellowFoxStateMeaning").asText()).isEqualTo("STEERING_TIME");
assertThat(result.eventDetails().attributes().get("ignitionState").asInt()).isEqualTo(1);
}
@Test
void mapsYellowFoxCoDriverSlotActivityAsStartWithSlotDetails() {
var result = mapper.map(booking(3, 2, 0));
assertThat(result.eventDomain()).isEqualTo(EventDomain.DRIVER_ACTIVITY);
assertThat(result.eventType()).isEqualTo(EventType.WORK);
assertThat(result.lifecycle()).isEqualTo(EventLifecycle.START);
assertThat(result.eventDetails().attributes().get("slot").asText()).isEqualTo(CardSlot.CO_DRIVER.name());
assertThat(result.eventDetails().attributes().get("slotNumber").asInt()).isEqualTo(2);
}
@Test
void mapsCardInsertedUsingSameTachographDomainTypeAndLifecycle() {
var result = mapper.map(booking(0, 1, 1));
assertThat(result.eventDomain()).isEqualTo(EventDomain.DRIVER_CARD);
assertThat(result.eventType()).isEqualTo(EventType.CARD_INSERTED);
assertThat(result.lifecycle()).isEqualTo(EventLifecycle.INSERT);
assertThat(result.eventDetails().attributes().get("slot").asText()).isEqualTo(CardSlot.DRIVER.name());
}
@Test
void mapsIgnitionTransitionOnlyWhenDetectorRequestsIt() {
var result = mapper.mapIgnitionTransition(booking(2, 3, 1), 0);
assertThat(result.eventDomain()).isEqualTo(EventDomain.IGNITION);
assertThat(result.eventType()).isEqualTo(EventType.IGNITION_ON);
assertThat(result.lifecycle()).isEqualTo(EventLifecycle.ON);
assertThat(result.externalSourceEventId()).contains("D8_IGNITION:event-1:");
assertThat(result.eventDetails().attributes().get("previousIgnitionState").asInt()).isEqualTo(0);
assertThat(result.eventDetails().attributes().get("ignitionMeaning").asText()).isEqualTo("ON");
}
private YellowFoxD8BookingDto booking(int eventType, int state, int ignition) {
return new YellowFoxD8BookingDto(
"tenant-1", "tenant-1",
"yellowfox-tenant-7", "yellowfox-tenant-7",
"tenant-yellowfox-7", "tenant-yellowfox-7",
"7", "7",
"Fleet 7",
"event-1", "event-1",
"key-1", "key-1",
2, ignition,
3, eventType,
state,
OffsetDateTime.parse("2026-04-29T08:15:00+02:00"), OffsetDateTime.parse("2026-04-29T08:15:00+02:00"),
null, null,
new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")), new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")),
@ -42,20 +94,5 @@ class YellowFoxD8BookingEventMapperTest {
null, null,
null null
); );
var result = mapper.map(source);
assertThat(result.eventDomain()).isEqualTo(EventDomain.DRIVER_ACTIVITY);
assertThat(result.eventType()).isEqualTo(EventType.DRIVE);
assertThat(result.lifecycle()).isEqualTo(EventLifecycle.SNAPSHOT);
assertThat(result.externalSourceEventId()).isEqualTo("event-1");
assertThat(result.packageInfo().tenantKey()).isEqualTo("tenant-1");
assertThat(result.packageInfo().eventSource().providerKey()).isEqualTo("YELLOWFOX");
assertThat(result.packageInfo().eventSource().sourceKey()).isEqualTo("YELLOWFOX_D8");
assertThat(result.driverRef().driverCard().nation()).isEqualTo("AT");
assertThat(result.vehicleRef().vehicleRegistration().nation()).isEqualTo("AT");
assertThat(result.eventDetails().type()).isEqualTo("DRIVER_ACTIVITY");
assertThat(result.payload().get("yellowFoxEventType").asInt()).isEqualTo(2);
assertThat(result.payload().get("yellowFoxState").asInt()).isEqualTo(3);
} }
} }