diff --git a/src/main/java/at/procon/eventhub/processing/model/UnifiedDiscoveredVehicleRef.java b/src/main/java/at/procon/eventhub/processing/model/UnifiedDiscoveredVehicleRef.java new file mode 100644 index 0000000..7242001 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/model/UnifiedDiscoveredVehicleRef.java @@ -0,0 +1,70 @@ +package at.procon.eventhub.processing.model; + +public record UnifiedDiscoveredVehicleRef( + String sourceVehicleEntityId, + String vin, + String registrationNation, + String registrationNumber +) { + public UnifiedDiscoveredVehicleRef { + sourceVehicleEntityId = normalize(sourceVehicleEntityId); + vin = normalizeUpper(vin); + registrationNation = normalizeUpper(registrationNation); + registrationNumber = normalize(registrationNumber); + } + + public boolean hasAnyReference() { + return sourceVehicleEntityId != null || vin != null || registrationNumber != null; + } + + public boolean matches(UnifiedDiscoveredVehicleRef other) { + if (other == null) { + return false; + } + return same(sourceVehicleEntityId, other.sourceVehicleEntityId) + || same(vin, other.vin) + || (same(registrationNumber, other.registrationNumber) + && same(registrationNation, other.registrationNation)); + } + + public UnifiedDiscoveredVehicleRef merge(UnifiedDiscoveredVehicleRef other) { + if (other == null) { + return this; + } + return new UnifiedDiscoveredVehicleRef( + firstNonBlank(sourceVehicleEntityId, other.sourceVehicleEntityId), + firstNonBlank(vin, other.vin), + firstNonBlank(registrationNation, other.registrationNation), + firstNonBlank(registrationNumber, other.registrationNumber) + ); + } + + public String stableKey() { + if (sourceVehicleEntityId != null) { + return "SOURCE_VEHICLE:" + sourceVehicleEntityId; + } + if (vin != null) { + return "VIN:" + vin; + } + return "REG:" + (registrationNation == null ? "" : registrationNation) + ":" + (registrationNumber == null ? "" : registrationNumber); + } + + private boolean same(String left, String right) { + return left != null && right != null && left.equals(right); + } + + private static String firstNonBlank(String first, String second) { + if (first != null && !first.isBlank()) { + return first; + } + return second != null && !second.isBlank() ? second : null; + } + + private static String normalize(String value) { + return value == null || value.isBlank() ? null : value.trim(); + } + + private static String normalizeUpper(String value) { + return value == null || value.isBlank() ? null : value.trim().toUpperCase(); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/model/UnifiedDriverEventsRequest.java b/src/main/java/at/procon/eventhub/processing/model/UnifiedDriverEventsRequest.java index f1269d7..1fab3d5 100644 --- a/src/main/java/at/procon/eventhub/processing/model/UnifiedDriverEventsRequest.java +++ b/src/main/java/at/procon/eventhub/processing/model/UnifiedDriverEventsRequest.java @@ -123,6 +123,31 @@ public record UnifiedDriverEventsRequest( ); } + public static UnifiedDriverEventsRequest forYellowFoxDbDriver( + String tenantKey, + String driverSourceEntityId, + String driverCardNation, + String driverCardNumber, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo + ) { + return new UnifiedDriverEventsRequest( + UnifiedEventSourceFamily.YELLOWFOX_DB, + null, + null, + tenantKey, + driverSourceEntityId, + driverCardNation, + driverCardNumber, + null, + null, + null, + null, + occurredFrom, + occurredTo + ); + } + public boolean hasDriverSelector() { return hasDriverSelector(driverSourceEntityId, driverCardNumber); } diff --git a/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeEventBackend.java b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeEventBackend.java new file mode 100644 index 0000000..88a2466 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeEventBackend.java @@ -0,0 +1,6 @@ +package at.procon.eventhub.processing.model; + +public enum UnifiedRuntimeEventBackend { + SOURCE_DB, + EVENTHUB_DB +} diff --git a/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeEventBundle.java b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeEventBundle.java new file mode 100644 index 0000000..39f65cf --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeEventBundle.java @@ -0,0 +1,21 @@ +package at.procon.eventhub.processing.model; + +import at.procon.eventhub.dto.EventHubEventDto; +import java.util.List; + +public record UnifiedRuntimeEventBundle( + UnifiedRuntimeProcessingRequest request, + List driverSeedEvents, + List discoveredVehicles, + List expandedVehicleEvents, + List mergedEvents, + List notes +) { + public UnifiedRuntimeEventBundle { + driverSeedEvents = driverSeedEvents == null ? List.of() : List.copyOf(driverSeedEvents); + discoveredVehicles = discoveredVehicles == null ? List.of() : List.copyOf(discoveredVehicles); + expandedVehicleEvents = expandedVehicleEvents == null ? List.of() : List.copyOf(expandedVehicleEvents); + mergedEvents = mergedEvents == null ? List.of() : List.copyOf(mergedEvents); + notes = notes == null ? List.of() : List.copyOf(notes); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java new file mode 100644 index 0000000..b64aab4 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java @@ -0,0 +1,156 @@ +package at.procon.eventhub.processing.model; + +import java.time.OffsetDateTime; +import java.util.Set; + +public record UnifiedRuntimeProcessingRequest( + String tenantKey, + Set sourceFamilies, + UnifiedRuntimeEventBackend eventBackend, + String driverSourceEntityId, + String driverCardNation, + String driverCardNumber, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo, + boolean expandVehicleEvents, + int vehicleExpansionPaddingMinutes +) { + public UnifiedRuntimeProcessingRequest { + tenantKey = normalize(tenantKey); + driverSourceEntityId = normalize(driverSourceEntityId); + driverCardNation = normalizeUpper(driverCardNation); + driverCardNumber = normalize(driverCardNumber); + if (tenantKey == null) { + throw new IllegalArgumentException("tenantKey must not be blank"); + } + if (sourceFamilies == null || sourceFamilies.isEmpty()) { + throw new IllegalArgumentException("sourceFamilies must not be empty"); + } + sourceFamilies = Set.copyOf(sourceFamilies); + eventBackend = eventBackend == null ? UnifiedRuntimeEventBackend.SOURCE_DB : eventBackend; + if (driverSourceEntityId == null && driverCardNumber == null) { + throw new IllegalArgumentException("At least one driver selector must be provided."); + } + if (occurredFrom != null && occurredTo != null && occurredTo.isBefore(occurredFrom)) { + throw new IllegalArgumentException("occurredTo must not be before occurredFrom"); + } + if (vehicleExpansionPaddingMinutes < 0) { + throw new IllegalArgumentException("vehicleExpansionPaddingMinutes must not be negative"); + } + } + + public static UnifiedRuntimeProcessingRequest forDriver( + String tenantKey, + Set sourceFamilies, + String driverSourceEntityId, + String driverCardNation, + String driverCardNumber, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo + ) { + return forDriver( + tenantKey, + sourceFamilies, + driverSourceEntityId, + driverCardNation, + driverCardNumber, + occurredFrom, + occurredTo, + UnifiedRuntimeEventBackend.SOURCE_DB, + true, + 0 + ); + } + + public static UnifiedRuntimeProcessingRequest forDriver( + String tenantKey, + Set sourceFamilies, + String driverSourceEntityId, + String driverCardNation, + String driverCardNumber, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo, + boolean expandVehicleEvents, + int vehicleExpansionPaddingMinutes + ) { + return forDriver( + tenantKey, + sourceFamilies, + driverSourceEntityId, + driverCardNation, + driverCardNumber, + occurredFrom, + occurredTo, + UnifiedRuntimeEventBackend.SOURCE_DB, + expandVehicleEvents, + vehicleExpansionPaddingMinutes + ); + } + + public static UnifiedRuntimeProcessingRequest forDriver( + String tenantKey, + Set sourceFamilies, + String driverSourceEntityId, + String driverCardNation, + String driverCardNumber, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo, + UnifiedRuntimeEventBackend eventBackend, + boolean expandVehicleEvents, + int vehicleExpansionPaddingMinutes + ) { + return new UnifiedRuntimeProcessingRequest( + tenantKey, + sourceFamilies, + eventBackend, + driverSourceEntityId, + driverCardNation, + driverCardNumber, + occurredFrom, + occurredTo, + expandVehicleEvents, + vehicleExpansionPaddingMinutes + ); + } + + public static UnifiedRuntimeProcessingRequest forDriverFromEventHub( + String tenantKey, + Set sourceFamilies, + String driverSourceEntityId, + String driverCardNation, + String driverCardNumber, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo, + boolean expandVehicleEvents, + int vehicleExpansionPaddingMinutes + ) { + return new UnifiedRuntimeProcessingRequest( + tenantKey, + sourceFamilies, + UnifiedRuntimeEventBackend.EVENTHUB_DB, + driverSourceEntityId, + driverCardNation, + driverCardNumber, + occurredFrom, + occurredTo, + expandVehicleEvents, + vehicleExpansionPaddingMinutes + ); + } + + public OffsetDateTime vehicleOccurredFrom() { + return occurredFrom == null ? null : occurredFrom.minusMinutes(vehicleExpansionPaddingMinutes); + } + + public OffsetDateTime vehicleOccurredTo() { + return occurredTo == null ? null : occurredTo.plusMinutes(vehicleExpansionPaddingMinutes); + } + + private static String normalize(String value) { + return value == null || value.isBlank() ? null : value.trim(); + } + + private static String normalizeUpper(String value) { + return value == null || value.isBlank() ? null : value.trim().toUpperCase(); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/EventHubRuntimeEventLoader.java b/src/main/java/at/procon/eventhub/processing/service/EventHubRuntimeEventLoader.java new file mode 100644 index 0000000..e684bca --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/service/EventHubRuntimeEventLoader.java @@ -0,0 +1,114 @@ +package at.procon.eventhub.processing.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; +import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import at.procon.eventhub.processing.model.UnifiedVehicleEventsRequest; +import java.util.List; +import org.springframework.stereotype.Component; + +@Component +public class EventHubRuntimeEventLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader { + + private final UnifiedDriverEventSourceService driverEventSourceService; + private final UnifiedVehicleEventSourceService vehicleEventSourceService; + + public EventHubRuntimeEventLoader( + UnifiedDriverEventSourceService driverEventSourceService, + UnifiedVehicleEventSourceService vehicleEventSourceService + ) { + this.driverEventSourceService = driverEventSourceService; + this.vehicleEventSourceService = vehicleEventSourceService; + } + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB + && (sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB + || sourceFamily == UnifiedEventSourceFamily.YELLOWFOX_DB); + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + throw new UnsupportedOperationException("Source family must be specified when loading EventHub-backed runtime driver events."); + } + + @Override + public List loadVehicleEvents( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef + ) { + throw new UnsupportedOperationException("Source family must be specified when loading EventHub-backed runtime vehicle events."); + } + + public List loadDriverEvents( + UnifiedRuntimeProcessingRequest request, + UnifiedEventSourceFamily sourceFamily + ) { + return driverEventSourceService.loadDriverEvents(driverRequest(request, sourceFamily)); + } + + public List loadVehicleEvents( + UnifiedRuntimeProcessingRequest request, + UnifiedEventSourceFamily sourceFamily, + UnifiedDiscoveredVehicleRef vehicleRef + ) { + return vehicleEventSourceService.loadVehicleEvents(vehicleRequest(request, sourceFamily, vehicleRef)); + } + + private UnifiedDriverEventsRequest driverRequest( + UnifiedRuntimeProcessingRequest request, + UnifiedEventSourceFamily sourceFamily + ) { + return switch (sourceFamily) { + case TACHOGRAPH_DB -> UnifiedDriverEventsRequest.forTachographDbDriver( + request.tenantKey(), + request.driverSourceEntityId(), + request.driverCardNation(), + request.driverCardNumber(), + request.occurredFrom(), + request.occurredTo() + ); + case YELLOWFOX_DB -> UnifiedDriverEventsRequest.forYellowFoxDbDriver( + request.tenantKey(), + request.driverSourceEntityId(), + request.driverCardNation(), + request.driverCardNumber(), + request.occurredFrom(), + request.occurredTo() + ); + default -> throw new IllegalArgumentException("EventHub runtime backend does not support source family " + sourceFamily + "."); + }; + } + + private UnifiedVehicleEventsRequest vehicleRequest( + UnifiedRuntimeProcessingRequest request, + UnifiedEventSourceFamily sourceFamily, + UnifiedDiscoveredVehicleRef vehicleRef + ) { + return switch (sourceFamily) { + case TACHOGRAPH_DB -> UnifiedVehicleEventsRequest.forTachographDb( + request.tenantKey(), + vehicleRef.sourceVehicleEntityId(), + vehicleRef.vin(), + vehicleRef.registrationNation(), + vehicleRef.registrationNumber(), + request.vehicleOccurredFrom(), + request.vehicleOccurredTo() + ); + case YELLOWFOX_DB -> UnifiedVehicleEventsRequest.forYellowFoxDb( + request.tenantKey(), + vehicleRef.sourceVehicleEntityId(), + vehicleRef.vin(), + vehicleRef.registrationNation(), + vehicleRef.registrationNumber(), + request.vehicleOccurredFrom(), + request.vehicleOccurredTo() + ); + default -> throw new IllegalArgumentException("EventHub runtime backend does not support source family " + sourceFamily + "."); + }; + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/RuntimeDriverEventLoader.java b/src/main/java/at/procon/eventhub/processing/service/RuntimeDriverEventLoader.java new file mode 100644 index 0000000..fe239bd --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/service/RuntimeDriverEventLoader.java @@ -0,0 +1,13 @@ +package at.procon.eventhub.processing.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import java.util.List; + +public interface RuntimeDriverEventLoader { + + boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily); + + List loadDriverEvents(UnifiedRuntimeProcessingRequest request); +} diff --git a/src/main/java/at/procon/eventhub/processing/service/RuntimeVehicleEventLoader.java b/src/main/java/at/procon/eventhub/processing/service/RuntimeVehicleEventLoader.java new file mode 100644 index 0000000..a3d5bc5 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/service/RuntimeVehicleEventLoader.java @@ -0,0 +1,17 @@ +package at.procon.eventhub.processing.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import java.util.List; + +public interface RuntimeVehicleEventLoader { + + boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily); + + List loadVehicleEvents( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef + ); +} diff --git a/src/main/java/at/procon/eventhub/processing/service/TachographDbRuntimeEventLoader.java b/src/main/java/at/procon/eventhub/processing/service/TachographDbRuntimeEventLoader.java new file mode 100644 index 0000000..a357117 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/service/TachographDbRuntimeEventLoader.java @@ -0,0 +1,254 @@ +package at.procon.eventhub.processing.service; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.extraction.ExtractionContext; +import at.procon.eventhub.importing.extraction.ExtractionDefinition; +import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import at.procon.eventhub.tachograph.dto.TachographImportRequest; +import at.procon.eventhub.tachograph.service.TachographExtractionDefinitionRegistry; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.core.io.Resource; +import org.springframework.core.io.ResourceLoader; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Component; +import org.springframework.util.StreamUtils; + +@Component +@ConditionalOnBean(name = "tachographNamedParameterJdbcTemplate") +@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.tachograph.datasource.jdbc-url:}')") +public class TachographDbRuntimeEventLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader { + + private final NamedParameterJdbcTemplate jdbcTemplate; + private final TachographExtractionDefinitionRegistry definitionRegistry; + private final ResourceLoader resourceLoader; + + public TachographDbRuntimeEventLoader( + @Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate, + TachographExtractionDefinitionRegistry definitionRegistry, + ResourceLoader resourceLoader + ) { + this.jdbcTemplate = jdbcTemplate; + this.definitionRegistry = definitionRegistry; + this.resourceLoader = resourceLoader; + } + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB + && sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB; + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + List result = new ArrayList<>(); + for (ExtractionDefinition definition : driverDefinitions()) { + result.addAll(queryDefinition( + definition, + request, + null, + request.occurredFrom(), + request.occurredTo() + )); + } + return List.copyOf(result); + } + + @Override + public List loadVehicleEvents( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef + ) { + List result = new ArrayList<>(); + for (ExtractionDefinition definition : vehicleDefinitions()) { + result.addAll(queryDefinition( + definition, + request, + vehicleRef, + request.vehicleOccurredFrom(), + request.vehicleOccurredTo() + )); + } + return List.copyOf(result); + } + + private List queryDefinition( + ExtractionDefinition definition, + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo + ) { + List events = new ArrayList<>(); + String sql = wrapSql(loadSql(definition.sqlResource()), request, vehicleRef); + Map params = parameters(request, vehicleRef, occurredFrom, occurredTo); + ExtractionContext context = context(definition, request, occurredFrom, occurredTo); + jdbcTemplate.query(sql, params, rs -> { + int rowNum = 0; + while (rs.next()) { + events.add(definition.rowMapper().map(rs, rowNum, context)); + rowNum++; + } + return null; + }); + return events; + } + + private List> driverDefinitions() { + return definitionRegistry.definitions().stream() + .filter(definition -> !"VEHICLE".equals(definition.entityAxis())) + .toList(); + } + + private List> vehicleDefinitions() { + return definitionRegistry.definitions().stream() + .filter(definition -> !"DRIVER".equals(definition.entityAxis())) + .toList(); + } + + private ExtractionContext context( + ExtractionDefinition definition, + UnifiedRuntimeProcessingRequest request, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo + ) { + EventSourceDto eventSource = eventSourceFor(definition.sourceKind()); + ImportScopeDto scope = ImportScopeDto.tenantAll(occurredFrom, occurredTo); + TachographImportRequest importRequest = new TachographImportRequest( + request.tenantKey(), + eventSource, + null, + scope, + EnumSet.of(definition.eventFamily()), + ImportMode.REPROCESS, + false, + AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP + ); + ImportPlanItemDto planItem = new ImportPlanItemDto( + definition.eventFamily(), + definition.sourceKind(), + definition.code(), + List.of(), + definition.entityAxis(), + "Runtime direct tachograph load", + AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP + ); + ImportTimeChunkDto chunk = new ImportTimeChunkDto(1, occurredFrom, occurredTo); + EventHubPackageRequest packageInfo = new EventHubPackageRequest( + request.tenantKey(), + eventSource, + null, + scope, + definition.eventFamily().name(), + occurredFrom == null ? null : occurredFrom.toLocalDate(), + "RUNTIME:TACHOGRAPH:" + definition.code() + ":" + UUID.randomUUID() + ); + return new ExtractionContext<>( + UUID.randomUUID(), + UUID.randomUUID(), + 0, + importRequest, + planItem, + chunk, + eventSource, + packageInfo + ); + } + + private EventSourceDto eventSourceFor(String sourceKind) { + String sourceKey = switch (sourceKind) { + case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT"; + case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD"; + default -> "TACHOGRAPH_" + sourceKind; + }; + return new EventSourceDto("TACHOGRAPH", sourceKind, sourceKey, null, null, null); + } + + private Map parameters( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo + ) { + Map params = new HashMap<>(); + params.put("occurredFrom", occurredFrom); + params.put("occurredTo", occurredTo); + params.put("organisationId", null); + params.put("sourcePackageWatermarkEnabled", 0); + params.put("lastSourcePackageImportedAt", null); + params.put("lastSourcePackageIdNumeric", null); + params.put("driverSourceEntityId", request.driverSourceEntityId()); + params.put("driverCardNation", request.driverCardNation()); + params.put("driverCardNumber", request.driverCardNumber()); + params.put("vehicleSourceEntityId", vehicleRef == null ? null : vehicleRef.sourceVehicleEntityId()); + params.put("vehicleVin", vehicleRef == null ? null : vehicleRef.vin()); + params.put("vehicleRegistrationNation", vehicleRef == null ? null : vehicleRef.registrationNation()); + params.put("vehicleRegistrationNumber", vehicleRef == null ? null : vehicleRef.registrationNumber()); + return params; + } + + private String wrapSql( + String baseSql, + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef + ) { + StringBuilder sql = new StringBuilder("select * from ("); + sql.append(baseSql); + sql.append("\n) runtime where 1 = 1"); + if (request.driverSourceEntityId() != null) { + sql.append("\n and runtime.driver_source_entity_id = :driverSourceEntityId"); + } + if (request.driverCardNumber() != null) { + sql.append("\n and runtime.driver_card_number = :driverCardNumber"); + if (request.driverCardNation() != null) { + sql.append("\n and runtime.driver_card_nation = :driverCardNation"); + } + } + if (vehicleRef != null) { + if (vehicleRef.sourceVehicleEntityId() != null) { + sql.append("\n and runtime.vehicle_source_entity_id = :vehicleSourceEntityId"); + } + if (vehicleRef.vin() != null) { + sql.append("\n and runtime.vehicle_vin = :vehicleVin"); + } + if (vehicleRef.registrationNumber() != null) { + sql.append("\n and runtime.vehicle_registration_number = :vehicleRegistrationNumber"); + if (vehicleRef.registrationNation() != null) { + sql.append("\n and runtime.vehicle_registration_nation = :vehicleRegistrationNation"); + } + } + } + sql.append("\norder by runtime.occurred_at, runtime.external_source_event_id"); + return sql.toString(); + } + + private String loadSql(String sqlResource) { + Resource resource = resourceLoader.getResource(sqlResource); + try (var in = resource.getInputStream()) { + return StreamUtils.copyToString(in, StandardCharsets.UTF_8); + } catch (IOException ex) { + throw new IllegalStateException("Failed to load tachograph runtime SQL " + sqlResource, ex); + } + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java new file mode 100644 index 0000000..0b03889 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java @@ -0,0 +1,165 @@ +package at.procon.eventhub.processing.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Objects; +import org.springframework.stereotype.Service; + +@Service +public class UnifiedRuntimeEventAssemblyService { + + private final List driverEventLoaders; + private final List vehicleEventLoaders; + + public UnifiedRuntimeEventAssemblyService( + List driverEventLoaders, + List vehicleEventLoaders + ) { + this.driverEventLoaders = List.copyOf(driverEventLoaders); + this.vehicleEventLoaders = List.copyOf(vehicleEventLoaders); + } + + public UnifiedRuntimeEventBundle assembleDriverScopedEvents(UnifiedRuntimeProcessingRequest request) { + List driverSeedEvents = loadDriverSeedEvents(request); + List discoveredVehicles = discoverVehicles(driverSeedEvents); + List expandedVehicleEvents = request.expandVehicleEvents() + ? loadExpandedVehicleEvents(request, discoveredVehicles) + : List.of(); + List mergedEvents = deduplicateAndSort(driverSeedEvents, expandedVehicleEvents); + + List notes = new ArrayList<>(); + notes.add(request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB + ? "Driver seed events were loaded from the local EventHub event store." + : "Driver seed events were loaded directly from the selected source databases."); + if (request.expandVehicleEvents()) { + notes.add("Vehicle expansion loaded additional events for vehicles discovered in the driver seed set."); + notes.add("Vehicle expansion padding minutes: " + request.vehicleExpansionPaddingMinutes() + "."); + } else { + notes.add("Vehicle expansion was disabled for this runtime request."); + } + return new UnifiedRuntimeEventBundle( + request, + driverSeedEvents, + discoveredVehicles, + expandedVehicleEvents, + mergedEvents, + notes + ); + } + + private List loadDriverSeedEvents(UnifiedRuntimeProcessingRequest request) { + List result = new ArrayList<>(); + for (UnifiedEventSourceFamily sourceFamily : request.sourceFamilies()) { + RuntimeDriverEventLoader loader = driverLoader(request, sourceFamily); + if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) { + result.addAll(eventHubLoader.loadDriverEvents(request, sourceFamily)); + } else { + result.addAll(loader.loadDriverEvents(request)); + } + } + return deduplicateAndSort(result, List.of()); + } + + private List loadExpandedVehicleEvents( + UnifiedRuntimeProcessingRequest request, + List discoveredVehicles + ) { + List result = new ArrayList<>(); + for (UnifiedDiscoveredVehicleRef vehicleRef : discoveredVehicles) { + for (UnifiedEventSourceFamily sourceFamily : request.sourceFamilies()) { + RuntimeVehicleEventLoader loader = vehicleLoader(request, sourceFamily); + if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) { + result.addAll(eventHubLoader.loadVehicleEvents(request, sourceFamily, vehicleRef)); + } else { + result.addAll(loader.loadVehicleEvents(request, vehicleRef)); + } + } + } + return deduplicateAndSort(result, List.of()); + } + + private List discoverVehicles(List events) { + List result = new ArrayList<>(); + for (EventHubEventDto event : events) { + VehicleRefDto vehicleRef = event.vehicleRef(); + if (vehicleRef == null || !vehicleRef.hasAnyReference()) { + continue; + } + UnifiedDiscoveredVehicleRef candidate = new UnifiedDiscoveredVehicleRef( + vehicleRef.sourceVehicleEntityId(), + vehicleRef.vin(), + vehicleRef.vehicleRegistration() == null ? null : vehicleRef.vehicleRegistration().nation(), + vehicleRef.vehicleRegistration() == null ? null : vehicleRef.vehicleRegistration().number() + ); + if (!candidate.hasAnyReference()) { + continue; + } + boolean merged = false; + for (int i = 0; i < result.size(); i++) { + UnifiedDiscoveredVehicleRef existing = result.get(i); + if (existing.matches(candidate)) { + result.set(i, existing.merge(candidate)); + merged = true; + break; + } + } + if (!merged) { + result.add(candidate); + } + } + result.sort(Comparator.comparing(UnifiedDiscoveredVehicleRef::stableKey)); + return List.copyOf(result); + } + + private List deduplicateAndSort( + List left, + List right + ) { + LinkedHashMap byKey = new LinkedHashMap<>(); + appendDeduplicated(byKey, left); + appendDeduplicated(byKey, right); + return byKey.values().stream() + .sorted(Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(event -> event.eventDomain().name()) + .thenComparing(event -> event.eventType().name()) + .thenComparing(event -> event.lifecycle().name()) + .thenComparing(EventHubEventDto::externalSourceEventId)) + .toList(); + } + + private void appendDeduplicated(LinkedHashMap byKey, List events) { + for (EventHubEventDto event : events) { + byKey.putIfAbsent(dedupKey(event), event); + } + } + + private String dedupKey(EventHubEventDto event) { + String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null + ? event.packageInfo().eventSource().stableKey() + : "NO_SOURCE"; + return sourceKey + "|" + event.externalSourceEventId(); + } + + private RuntimeDriverEventLoader driverLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return driverEventLoaders.stream() + .filter(loader -> loader.supports(request, sourceFamily)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No runtime driver event loader is registered for source family " + sourceFamily + ".")); + } + + private RuntimeVehicleEventLoader vehicleLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return vehicleEventLoaders.stream() + .filter(loader -> loader.supports(request, sourceFamily)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No runtime vehicle event loader is registered for source family " + sourceFamily + ".")); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/YellowFoxDbRuntimeEventLoader.java b/src/main/java/at/procon/eventhub/processing/service/YellowFoxDbRuntimeEventLoader.java new file mode 100644 index 0000000..efbc962 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/service/YellowFoxDbRuntimeEventLoader.java @@ -0,0 +1,181 @@ +package at.procon.eventhub.processing.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; +import at.procon.eventhub.yellowfox.service.YellowFoxD8BookingEventMapper; +import at.procon.eventhub.yellowfox.service.YellowFoxD8BookingRowMapper; +import at.procon.eventhub.yellowfox.service.YellowFoxD8IgnitionTransitionDetector; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.core.io.Resource; +import org.springframework.core.io.ResourceLoader; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Component; +import org.springframework.util.StreamUtils; + +@Component +@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.yellow-fox.datasource.jdbc-url:}')") +public class YellowFoxDbRuntimeEventLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader { + + private static final String SYNTHETIC_NATION = "YELLOWFOX"; + + private final NamedParameterJdbcTemplate jdbcTemplate; + private final ResourceLoader resourceLoader; + private final YellowFoxD8BookingRowMapper rowMapper; + private final YellowFoxD8BookingEventMapper eventMapper; + private final YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector; + private final EventHubProperties properties; + + public YellowFoxDbRuntimeEventLoader( + @Qualifier("yellowFoxNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate, + ResourceLoader resourceLoader, + YellowFoxD8BookingRowMapper rowMapper, + YellowFoxD8BookingEventMapper eventMapper, + YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector, + EventHubProperties properties + ) { + this.jdbcTemplate = jdbcTemplate; + this.resourceLoader = resourceLoader; + this.rowMapper = rowMapper; + this.eventMapper = eventMapper; + this.ignitionTransitionDetector = ignitionTransitionDetector; + this.properties = properties; + } + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB + && sourceFamily == UnifiedEventSourceFamily.YELLOWFOX_DB; + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + if (request.driverCardNation() != null && !SYNTHETIC_NATION.equalsIgnoreCase(request.driverCardNation())) { + return List.of(); + } + return query(request, null, request.occurredFrom(), request.occurredTo()); + } + + @Override + public List loadVehicleEvents( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef + ) { + if (vehicleRef.registrationNation() != null && !SYNTHETIC_NATION.equalsIgnoreCase(vehicleRef.registrationNation())) { + return List.of(); + } + return query(request, vehicleRef, request.vehicleOccurredFrom(), request.vehicleOccurredTo()); + } + + private List query( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo + ) { + String sql = applyFilters(loadSqlTemplate(), filters(request, vehicleRef, occurredFrom, occurredTo)); + Map params = parameters(request, vehicleRef, occurredFrom, occurredTo); + List events = new ArrayList<>(); + YellowFoxD8IgnitionTransitionDetector.Session ignitionSession = + ignitionTransitionDetector.newSession(properties.getYellowFox().isEmitInitialIgnitionSnapshot()); + + jdbcTemplate.query(sql, params, rs -> { + while (rs.next()) { + YellowFoxD8BookingDto booking = rowMapper.map(rs, request.tenantKey(), null, null); + if (!hasEventReference(booking)) { + continue; + } + events.add(eventMapper.map(booking)); + EventHubEventDto ignitionEvent = ignitionSession.detect(booking); + if (ignitionEvent != null) { + events.add(ignitionEvent); + } + } + return null; + }); + return List.copyOf(events); + } + + private String filters( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo + ) { + StringBuilder filters = new StringBuilder("where 1 = 1"); + if (occurredFrom != null) { + filters.append("\n and b.utc >= :occurredFrom"); + } + if (occurredTo != null) { + filters.append("\n and b.utc < :occurredTo"); + } + if (request.driverSourceEntityId() != null) { + filters.append("\n and cast(b.driver_id as varchar(128)) = :driverSourceEntityId"); + } + if (request.driverCardNumber() != null) { + filters.append("\n and left(trim(d.drivers_card), 14) = :driverCardNumber"); + } + if (vehicleRef != null) { + if (vehicleRef.sourceVehicleEntityId() != null) { + filters.append("\n and cast(b.vehicle_id as varchar(128)) = :vehicleSourceEntityId"); + } + if (vehicleRef.vin() != null) { + filters.append("\n and v.vin = :vehicleVin"); + } + if (vehicleRef.registrationNumber() != null) { + filters.append("\n and v.vrn = :vehicleRegistrationNumber"); + } + } + return filters.toString(); + } + + private Map parameters( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo + ) { + Map params = new HashMap<>(); + params.put("occurredFrom", occurredFrom); + params.put("occurredTo", occurredTo); + params.put("driverSourceEntityId", request.driverSourceEntityId()); + params.put("driverCardNumber", request.driverCardNumber()); + params.put("vehicleSourceEntityId", vehicleRef == null ? null : vehicleRef.sourceVehicleEntityId()); + params.put("vehicleVin", vehicleRef == null ? null : vehicleRef.vin()); + params.put("vehicleRegistrationNumber", vehicleRef == null ? null : vehicleRef.registrationNumber()); + return params; + } + + private boolean hasEventReference(YellowFoxD8BookingDto booking) { + return (booking.driverRef() != null && booking.driverRef().hasAnyReference()) + || (booking.vehicleRef() != null && booking.vehicleRef().hasAnyReference()); + } + + private String applyFilters(String sqlTemplate, String filters) { + if (!sqlTemplate.contains("/*__FILTERS__*/")) { + throw new IllegalStateException("YellowFox D8 runtime SQL template is missing filter marker"); + } + return sqlTemplate.replace("/*__FILTERS__*/", filters); + } + + private String loadSqlTemplate() { + Resource resource = resourceLoader.getResource("classpath:sql/yellowfox/d8-bookings.sql"); + try (var in = resource.getInputStream()) { + return StreamUtils.copyToString(in, StandardCharsets.UTF_8); + } catch (IOException ex) { + throw new IllegalStateException("Failed to load YellowFox D8 runtime SQL", ex); + } + } +} diff --git a/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java b/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java new file mode 100644 index 0000000..22e5d56 --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java @@ -0,0 +1,89 @@ +package at.procon.eventhub.processing.model; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.OffsetDateTime; +import java.util.Set; +import org.junit.jupiter.api.Test; + +class UnifiedRuntimeProcessingRequestTest { + + @Test + void buildsDriverScopedRuntimeRequest() { + UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forDriver( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB), + "DRIVER:42", + "AT", + "123", + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z") + ); + + assertThat(request.tenantKey()).isEqualTo("default"); + assertThat(request.sourceFamilies()).containsExactlyInAnyOrder( + UnifiedEventSourceFamily.TACHOGRAPH_DB, + UnifiedEventSourceFamily.YELLOWFOX_DB + ); + assertThat(request.driverSourceEntityId()).isEqualTo("DRIVER:42"); + assertThat(request.driverCardNation()).isEqualTo("AT"); + assertThat(request.driverCardNumber()).isEqualTo("123"); + assertThat(request.eventBackend()).isEqualTo(UnifiedRuntimeEventBackend.SOURCE_DB); + assertThat(request.expandVehicleEvents()).isTrue(); + assertThat(request.vehicleOccurredFrom()).isEqualTo(OffsetDateTime.parse("2026-05-01T00:00:00Z")); + } + + @Test + void canDisableVehicleExpansionExplicitly() { + UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forDriver( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB), + "DRIVER:42", + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + false, + 30 + ); + + assertThat(request.expandVehicleEvents()).isFalse(); + assertThat(request.vehicleExpansionPaddingMinutes()).isEqualTo(30); + } + + @Test + void canBuildEventHubBackedRuntimeRequest() { + UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forDriverFromEventHub( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB), + "DRIVER:42", + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 15 + ); + + assertThat(request.eventBackend()).isEqualTo(UnifiedRuntimeEventBackend.EVENTHUB_DB); + assertThat(request.vehicleExpansionPaddingMinutes()).isEqualTo(15); + } + + @Test + void rejectsRequestWithoutDriverSelector() { + assertThatThrownBy(() -> new UnifiedRuntimeProcessingRequest( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB), + UnifiedRuntimeEventBackend.SOURCE_DB, + null, + null, + null, + null, + null, + true, + 0 + )).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("At least one driver selector"); + } +} diff --git a/src/test/java/at/procon/eventhub/processing/service/EventHubRuntimeEventLoaderTest.java b/src/test/java/at/procon/eventhub/processing/service/EventHubRuntimeEventLoaderTest.java new file mode 100644 index 0000000..129b6d6 --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/service/EventHubRuntimeEventLoaderTest.java @@ -0,0 +1,159 @@ +package at.procon.eventhub.processing.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; +import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import at.procon.eventhub.processing.model.UnifiedVehicleEventsRequest; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.junit.jupiter.api.Test; + +class EventHubRuntimeEventLoaderTest { + + @Test + void supportsOnlyEventHubBackendForTachographAndYellowFox() { + EventHubRuntimeEventLoader loader = new EventHubRuntimeEventLoader( + new UnifiedDriverEventSourceService(List.of(new CapturingDriverSource())), + new UnifiedVehicleEventSourceService(List.of(new CapturingVehicleSource())) + ); + + UnifiedRuntimeProcessingRequest eventHubRequest = UnifiedRuntimeProcessingRequest.forDriverFromEventHub( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB), + "DRIVER:42", + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 15 + ); + UnifiedRuntimeProcessingRequest sourceDbRequest = UnifiedRuntimeProcessingRequest.forDriver( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB), + "DRIVER:42", + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z") + ); + + assertThat(loader.supports(eventHubRequest, UnifiedEventSourceFamily.TACHOGRAPH_DB)).isTrue(); + assertThat(loader.supports(eventHubRequest, UnifiedEventSourceFamily.YELLOWFOX_DB)).isTrue(); + assertThat(loader.supports(sourceDbRequest, UnifiedEventSourceFamily.TACHOGRAPH_DB)).isFalse(); + assertThat(loader.supports(eventHubRequest, UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION)).isFalse(); + } + + @Test + void buildsDriverRequestsForLocalEventHubSources() { + CapturingDriverSource driverSource = new CapturingDriverSource(); + EventHubRuntimeEventLoader loader = new EventHubRuntimeEventLoader( + new UnifiedDriverEventSourceService(List.of(driverSource)), + new UnifiedVehicleEventSourceService(List.of(new CapturingVehicleSource())) + ); + UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forDriverFromEventHub( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB), + "DRIVER:42", + "AT", + "123", + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 15 + ); + + loader.loadDriverEvents(request, UnifiedEventSourceFamily.TACHOGRAPH_DB); + loader.loadDriverEvents(request, UnifiedEventSourceFamily.YELLOWFOX_DB); + + assertThat(driverSource.requests).hasSize(2); + assertThat(driverSource.requests).extracting(UnifiedDriverEventsRequest::sourceFamily) + .containsExactly(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB); + assertThat(driverSource.requests).allSatisfy(driverRequest -> { + assertThat(driverRequest.tenantKey()).isEqualTo("default"); + assertThat(driverRequest.driverSourceEntityId()).isEqualTo("DRIVER:42"); + assertThat(driverRequest.driverCardNation()).isEqualTo("AT"); + assertThat(driverRequest.driverCardNumber()).isEqualTo("123"); + assertThat(driverRequest.occurredFrom()).isEqualTo(OffsetDateTime.parse("2026-05-01T00:00:00Z")); + assertThat(driverRequest.occurredTo()).isEqualTo(OffsetDateTime.parse("2026-05-02T00:00:00Z")); + }); + } + + @Test + void buildsVehicleRequestsForLocalEventHubSourcesWithPadding() { + CapturingVehicleSource vehicleSource = new CapturingVehicleSource(); + EventHubRuntimeEventLoader loader = new EventHubRuntimeEventLoader( + new UnifiedDriverEventSourceService(List.of(new CapturingDriverSource())), + new UnifiedVehicleEventSourceService(List.of(vehicleSource)) + ); + UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forDriverFromEventHub( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB), + "DRIVER:42", + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 15 + ); + UnifiedDiscoveredVehicleRef vehicleRef = new UnifiedDiscoveredVehicleRef("VEH-1", "VIN-1", "AT", "W-1"); + + loader.loadVehicleEvents(request, UnifiedEventSourceFamily.TACHOGRAPH_DB, vehicleRef); + loader.loadVehicleEvents(request, UnifiedEventSourceFamily.YELLOWFOX_DB, vehicleRef); + + assertThat(vehicleSource.requests).hasSize(2); + assertThat(vehicleSource.requests).extracting(UnifiedVehicleEventsRequest::sourceFamily) + .containsExactly(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB); + assertThat(vehicleSource.requests).allSatisfy(vehicleRequest -> { + assertThat(vehicleRequest.tenantKey()).isEqualTo("default"); + assertThat(vehicleRequest.vehicleSourceEntityId()).isEqualTo("VEH-1"); + assertThat(vehicleRequest.vin()).isEqualTo("VIN-1"); + assertThat(vehicleRequest.registrationNation()).isEqualTo("AT"); + assertThat(vehicleRequest.registrationNumber()).isEqualTo("W-1"); + assertThat(vehicleRequest.occurredFrom()).isEqualTo(OffsetDateTime.parse("2026-04-30T23:45:00Z")); + assertThat(vehicleRequest.occurredTo()).isEqualTo(OffsetDateTime.parse("2026-05-02T00:15:00Z")); + }); + } + + private static final class CapturingDriverSource implements UnifiedDriverEventSource { + + private final List requests = new ArrayList<>(); + + @Override + public boolean supports(UnifiedDriverEventsRequest request) { + return request.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_DB + || request.sourceFamily() == UnifiedEventSourceFamily.YELLOWFOX_DB; + } + + @Override + public List loadDriverEvents(UnifiedDriverEventsRequest request) { + requests.add(request); + return List.of(); + } + } + + private static final class CapturingVehicleSource implements UnifiedVehicleEventSource { + + private final List requests = new ArrayList<>(); + + @Override + public boolean supports(UnifiedVehicleEventsRequest request) { + return request.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_DB + || request.sourceFamily() == UnifiedEventSourceFamily.YELLOWFOX_DB; + } + + @Override + public List loadVehicleEvents(UnifiedVehicleEventsRequest request) { + requests.add(request); + return List.of(); + } + } +} diff --git a/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java b/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java new file mode 100644 index 0000000..e48710f --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java @@ -0,0 +1,164 @@ +package at.procon.eventhub.processing.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class UnifiedRuntimeEventAssemblyServiceTest { + + @Test + void assemblesSeedAndExpandedVehicleEventsInMemory() { + UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService( + List.of(new FakeLoader()), + List.of(new FakeLoader()) + ); + + UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents( + new UnifiedRuntimeProcessingRequest( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB), + UnifiedRuntimeEventBackend.SOURCE_DB, + "DRIVER:42", + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 15 + ) + ); + + assertThat(bundle.driverSeedEvents()).hasSize(2); + assertThat(bundle.discoveredVehicles()).hasSize(2); + assertThat(bundle.discoveredVehicles()).extracting(UnifiedDiscoveredVehicleRef::stableKey) + .containsExactly("SOURCE_VEHICLE:VEH-1", "VIN:VIN-2"); + assertThat(bundle.expandedVehicleEvents()).hasSize(2); + assertThat(bundle.mergedEvents()).hasSize(3); + assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("SEED-1", "VEHICLE-EXPANDED", "SEED-2"); + } + + @Test + void canSkipVehicleExpansion() { + UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService( + List.of(new FakeLoader()), + List.of(new FakeLoader()) + ); + + UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents( + UnifiedRuntimeProcessingRequest.forDriver( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB), + "DRIVER:42", + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + false, + 0 + ) + ); + + assertThat(bundle.driverSeedEvents()).hasSize(2); + assertThat(bundle.discoveredVehicles()).hasSize(2); + assertThat(bundle.expandedVehicleEvents()).isEmpty(); + assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("SEED-1", "SEED-2"); + } + + private static final class FakeLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader { + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB + && sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB; + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + return List.of( + event("SEED-1", OffsetDateTime.parse("2026-05-01T08:00:00Z"), "VEH-1", "VIN-1", "AT", "W-1"), + event("SEED-2", OffsetDateTime.parse("2026-05-01T09:00:00Z"), null, "VIN-2", "AT", "W-2") + ); + } + + @Override + public List loadVehicleEvents( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef + ) { + if ("SOURCE_VEHICLE:VEH-1".equals(vehicleRef.stableKey())) { + return List.of( + event("SEED-1", OffsetDateTime.parse("2026-05-01T08:00:00Z"), "VEH-1", "VIN-1", "AT", "W-1"), + event("VEHICLE-EXPANDED", OffsetDateTime.parse("2026-05-01T08:30:00Z"), "VEH-1", "VIN-1", "AT", "W-1") + ); + } + return List.of(); + } + + private EventHubEventDto event( + String externalId, + OffsetDateTime occurredAt, + String sourceVehicleId, + String vin, + String registrationNation, + String registrationNumber + ) { + EventSourceDto source = new EventSourceDto("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_DRIVER_CARD", null, null, null); + return new EventHubEventDto( + UUID.randomUUID(), + externalId, + new DriverRefDto("DRIVER:42", null), + new VehicleRefDto( + sourceVehicleId, + vin, + sourceVehicleId, + new VehicleRegistrationRefDto(registrationNation, registrationNumber) + ), + occurredAt, + null, + occurredAt, + EventDomain.DRIVER_ACTIVITY, + EventType.DRIVE, + EventLifecycle.START, + null, + null, + null, + null, + null, + false, + new at.procon.eventhub.dto.EventHubPackageRequest( + "default", + source, + null, + ImportScopeDto.tenantAll( + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z") + ), + EventDomain.DRIVER_ACTIVITY.name(), + LocalDate.parse("2026-05-01"), + source.stableKey() + ":2026-05-01" + ) + ); + } + } +}