diff --git a/docs/runtime-tachograph-representation-parity.md b/docs/runtime-tachograph-representation-parity.md new file mode 100644 index 0000000..6e94575 --- /dev/null +++ b/docs/runtime-tachograph-representation-parity.md @@ -0,0 +1,38 @@ +# Tachograph DB / file-session runtime parity + +The runtime pipeline treats direct tachograph file-session events and events loaded from the tachograph database as two representations of the same tachograph facts. + +## Canonical semantic boundary + +`RuntimeTachographEventSemantics` normalizes representation-only differences without modifying the original event: + +- file-session and DB source systems are exposed as `TACHOGRAPH`; +- extraction codes are read from DB raw metadata or inferred from source kind and event domain; +- `PLACE START` and `PLACE BEGIN` share the semantic lifecycle `BEGIN` for mixing. + +The raw lifecycle, source package, payload, and external source event ID remain unchanged for audit and provenance. + +## Runtime aggregation + +`RuntimeEventAggregationService` is shared by: + +- `TachographFileSessionRuntimeEventLoader`; +- `TachographDbRuntimeEventLoader`; +- `UnifiedRuntimeEventAssemblyService`. + +Aggregation removes: + +- repeated reads of the same source record; +- duplicate serialized representations of the same extraction observation. + +It deliberately preserves evidence pairs that are resolved downstream: + +- `CARD_ACTIVITY` / `VU_ACTIVITY`; +- card/VU support evidence; +- `CARD_VEHICLES_USED` / `IW_CYCLE`. + +The first two are handled by `RuntimeEventMixingService`. Vehicle usage remains source-distinct until interval-level reconciliation. + +## Regression coverage + +`RuntimeTachographRepresentationParityTest` verifies equal source profiles and mixing outcomes for DB and file-session representations. `RuntimeEventAggregationServiceTest` verifies that technical duplicates are reduced while card/VU and CVU/IW evidence remains available to later modules. diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventDescriptorFactory.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventDescriptorFactory.java index 105cff7..7182abc 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventDescriptorFactory.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventDescriptorFactory.java @@ -10,13 +10,25 @@ import com.fasterxml.jackson.databind.JsonNode; import java.time.OffsetDateTime; import java.util.Comparator; import java.util.List; -import java.util.Locale; import java.util.Objects; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class RuntimeEventDescriptorFactory { + private final RuntimeTachographEventSemantics tachographSemantics; + + @Autowired + public RuntimeEventDescriptorFactory(RuntimeTachographEventSemantics tachographSemantics) { + this.tachographSemantics = tachographSemantics; + } + + /** Compatibility constructor used by unit tests. */ + public RuntimeEventDescriptorFactory() { + this(new RuntimeTachographEventSemantics()); + } + public List describeSorted(List events) { return sort(events).stream() .map(this::describe) @@ -64,26 +76,7 @@ public class RuntimeEventDescriptorFactory { } public RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) { - JsonNode raw = rawPayload(event); - String sourceKind = firstNonBlank(text(raw, "sourceKind"), sourceKind(event)); - String extractionCode = firstNonBlank( - text(raw, "extractionCode"), - fileSessionExtractionCode(event, sourceKind), - extractionCodeFromExternalSourceEventId(event) - ); - String sourceSystem = firstNonBlank( - text(raw, "sourceSystem"), - sourceProvider(event), - sourceSystemFromExternalSourceEventId(event) - ); - if (sourceSystem == null && (extractionCode != null || isTachographFileSessionEvent(event))) { - sourceSystem = "TACHOGRAPH"; - } - return new RuntimeEventSourceProfile( - normalizeUpper(sourceSystem), - normalizeUpper(sourceKind), - normalizeUpper(extractionCode) - ); + return tachographSemantics.sourceProfile(event); } public String eventIdentityKey(EventHubEventDto event) { @@ -106,77 +99,6 @@ public class RuntimeEventDescriptorFactory { .thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo)); } - private String fileSessionExtractionCode(EventHubEventDto event, String sourceKind) { - if (!isTachographFileSessionEvent(event)) { - return null; - } - String normalizedSourceKind = normalizeUpper(sourceKind); - if (normalizedSourceKind == null) { - return null; - } - if (event != null && event.eventDomain() == EventDomain.DRIVER_ACTIVITY) { - return switch (normalizedSourceKind) { - case "DRIVER_CARD" -> "CARD_ACTIVITY"; - case "VEHICLE_UNIT" -> "VU_ACTIVITY"; - default -> null; - }; - } - if (event != null && event.eventDomain() == EventDomain.DRIVER_CARD) { - return switch (normalizedSourceKind) { - case "DRIVER_CARD" -> "CARD_VEHICLES_USED"; - case "VEHICLE_UNIT" -> "IW_CYCLE"; - default -> null; - }; - } - if (event == null || event.eventDomain() == null) { - return null; - } - String prefix = switch (normalizedSourceKind) { - case "DRIVER_CARD" -> "CARD"; - case "VEHICLE_UNIT" -> "VU"; - default -> null; - }; - if (prefix == null) { - return null; - } - return switch (event.eventDomain()) { - case POSITION -> prefix + "_POSITION"; - case PLACE -> prefix + "_PLACE"; - case BORDER_CROSSING -> prefix + "_BORDER_CROSSING"; - case LOAD_UNLOAD -> prefix + "_LOAD_UNLOAD"; - case SPECIFIC_CONDITION -> prefix + "_SPECIFIC_CONDITION"; - case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null; - default -> null; - }; - } - - private boolean isTachographFileSessionEvent(EventHubEventDto event) { - if (event == null) { - return false; - } - String packageKind = event.sourcePackageRef() == null ? null : normalizeUpper(event.sourcePackageRef().packageKind()); - if (Objects.equals("TACHOGRAPH_FILE_SESSION", packageKind) - || Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", packageKind)) { - return true; - } - String provider = sourceProvider(event); - if (Objects.equals("TACHOGRAPH_FILE_SESSION", normalizeUpper(provider)) - || Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", normalizeUpper(provider))) { - return true; - } - String sourceKey = event.packageInfo() == null || event.packageInfo().eventSource() == null - ? null - : normalizeUpper(event.packageInfo().eventSource().sourceKey()); - if (Objects.equals("TACHOGRAPH_FILE_SESSION", sourceKey) - || Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", sourceKey)) { - return true; - } - String externalId = event.externalSourceEventId(); - return externalId != null - && (externalId.startsWith("TACHOGRAPH_FILE_SESSION:") - || externalId.startsWith("COMPOSITE_TACHOGRAPH_FILE_SESSION:")); - } - private String compatibleActivityKey(EventHubEventDto event) { JsonNode raw = rawPayload(event); return String.join("|", @@ -220,50 +142,13 @@ public class RuntimeEventDescriptorFactory { } private String semanticSupportLifecycle(EventHubEventDto event) { - if (event == null || event.lifecycle() == null) { - return null; - } - if (event.eventDomain() == EventDomain.PLACE - && (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.BEGIN)) { - return EventLifecycle.BEGIN.name(); - } - return event.lifecycle().name(); + return tachographSemantics.semanticLifecycle(event); } private JsonNode rawPayload(EventHubEventDto event) { return RuntimeEntityReferenceResolver.rawPayload(event); } - private String sourceKind(EventHubEventDto event) { - return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null - ? null - : event.packageInfo().eventSource().sourceKind(); - } - - private String sourceProvider(EventHubEventDto event) { - return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null - ? null - : event.packageInfo().eventSource().providerKey(); - } - - private String sourceSystemFromExternalSourceEventId(EventHubEventDto event) { - String externalId = event == null ? null : event.externalSourceEventId(); - if (externalId == null || externalId.isBlank()) { - return null; - } - String[] parts = externalId.split(":"); - return parts.length >= 1 ? parts[0] : null; - } - - private String extractionCodeFromExternalSourceEventId(EventHubEventDto event) { - String externalId = event == null ? null : event.externalSourceEventId(); - if (externalId == null || externalId.isBlank()) { - return null; - } - String[] parts = externalId.split(":"); - return parts.length >= 2 ? parts[1] : null; - } - private String detailText(EventHubEventDto event, String field) { if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null || field == null) { return null; @@ -300,10 +185,6 @@ public class RuntimeEventDescriptorFactory { return null; } - private String normalizeUpper(String value) { - return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT); - } - private String nullToEmpty(Object value) { return value == null ? "" : String.valueOf(value); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeTachographEventSemantics.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeTachographEventSemantics.java new file mode 100644 index 0000000..caa1879 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeTachographEventSemantics.java @@ -0,0 +1,249 @@ +package at.procon.eventhub.processing.eventprocessing.mixing; + +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver; +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Locale; +import java.util.Objects; +import java.util.Set; +import org.springframework.stereotype.Component; + +/** + * Canonical semantic view of tachograph runtime events. + * + *

The same tachograph package can be processed directly from an in-memory file session or + * loaded from the tachograph database after serialization. This component normalizes only the + * representation differences that are relevant to downstream mixing. It deliberately leaves the + * original {@link EventHubEventDto} untouched for provenance and audit purposes.

+ */ +@Component +public class RuntimeTachographEventSemantics { + + private static final Set TACHOGRAPH_SOURCE_SYSTEMS = Set.of( + "TACHOGRAPH", + "TACHOGRAPH_FILE_SESSION", + "COMPOSITE_TACHOGRAPH_FILE_SESSION" + ); + + private static final Set KNOWN_EXTRACTION_CODES = Set.of( + "CARD_ACTIVITY", + "VU_ACTIVITY", + "CARD_VEHICLES_USED", + "IW_CYCLE", + "CARD_POSITION", + "VU_POSITION", + "CARD_PLACE", + "VU_PLACE", + "CARD_BORDER_CROSSING", + "VU_BORDER_CROSSING", + "CARD_LOAD_UNLOAD", + "VU_LOAD_UNLOAD", + "CARD_SPECIFIC_CONDITION", + "VU_SPECIFIC_CONDITION", + "SPEEDING_EVENTS" + ); + + public RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) { + JsonNode raw = rawPayload(event); + String explicitExtractionCode = normalizeUpper(firstNonBlank( + text(raw, "extractionCode"), + extractionCodeFromExternalSourceEventId(event) + )); + String sourceKind = normalizeUpper(firstNonBlank( + text(raw, "sourceKind"), + sourceKind(event), + sourceKindFromExtractionCode(explicitExtractionCode) + )); + String extractionCode = normalizeUpper(firstNonBlank( + explicitExtractionCode, + inferExtractionCode(event, sourceKind) + )); + String sourceSystemCandidate = normalizeUpper(firstNonBlank( + text(raw, "sourceSystem"), + sourceProvider(event), + sourceSystemFromExternalSourceEventId(event) + )); + String sourceSystem = isTachographRepresentation(event, sourceSystemCandidate, extractionCode) + ? "TACHOGRAPH" + : sourceSystemCandidate; + return new RuntimeEventSourceProfile(sourceSystem, sourceKind, extractionCode); + } + + /** + * Returns a semantic lifecycle used only for equivalence matching. + * DB place events use START while file-session place events use BEGIN for the same fact. + */ + public String semanticLifecycle(EventHubEventDto event) { + if (event == null || event.lifecycle() == null) { + return null; + } + if (event.eventDomain() == EventDomain.PLACE + && (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.BEGIN)) { + return EventLifecycle.BEGIN.name(); + } + return event.lifecycle().name(); + } + + public String inferExtractionCode(EventHubEventDto event, String sourceKind) { + if (event == null || event.eventDomain() == null) { + return null; + } + String normalizedSourceKind = normalizeUpper(sourceKind); + if (normalizedSourceKind == null) { + return null; + } + if (event.eventDomain() == EventDomain.DRIVER_ACTIVITY) { + return switch (normalizedSourceKind) { + case "DRIVER_CARD" -> "CARD_ACTIVITY"; + case "VEHICLE_UNIT" -> "VU_ACTIVITY"; + default -> null; + }; + } + if (event.eventDomain() == EventDomain.DRIVER_CARD) { + return switch (normalizedSourceKind) { + case "DRIVER_CARD" -> "CARD_VEHICLES_USED"; + case "VEHICLE_UNIT" -> "IW_CYCLE"; + default -> null; + }; + } + String prefix = switch (normalizedSourceKind) { + case "DRIVER_CARD" -> "CARD"; + case "VEHICLE_UNIT" -> "VU"; + default -> null; + }; + if (prefix == null) { + return null; + } + return switch (event.eventDomain()) { + case POSITION -> prefix + "_POSITION"; + case PLACE -> prefix + "_PLACE"; + case BORDER_CROSSING -> prefix + "_BORDER_CROSSING"; + case LOAD_UNLOAD -> prefix + "_LOAD_UNLOAD"; + case SPECIFIC_CONDITION -> prefix + "_SPECIFIC_CONDITION"; + case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null; + default -> null; + }; + } + + public boolean isTachographRepresentation(EventHubEventDto event) { + RuntimeEventSourceProfile profile = sourceProfile(event); + return profile.isTachographRuntimeSource(); + } + + private boolean isTachographRepresentation( + EventHubEventDto event, + String sourceSystemCandidate, + String extractionCode + ) { + if (TACHOGRAPH_SOURCE_SYSTEMS.contains(nullToEmpty(sourceSystemCandidate))) { + return true; + } + if (KNOWN_EXTRACTION_CODES.contains(nullToEmpty(extractionCode))) { + return true; + } + String packageKind = event == null || event.sourcePackageRef() == null + ? null + : normalizeUpper(event.sourcePackageRef().packageKind()); + if (TACHOGRAPH_SOURCE_SYSTEMS.contains(nullToEmpty(packageKind))) { + return true; + } + String sourceKey = event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null + ? null + : normalizeUpper(event.packageInfo().eventSource().sourceKey()); + if (sourceKey != null && sourceKey.startsWith("TACHOGRAPH")) { + return true; + } + String externalId = event == null ? null : event.externalSourceEventId(); + return externalId != null + && (externalId.startsWith("TACHOGRAPH:") + || externalId.startsWith("TACHOGRAPH_FILE_SESSION:") + || externalId.startsWith("COMPOSITE_TACHOGRAPH_FILE_SESSION:")); + } + + private JsonNode rawPayload(EventHubEventDto event) { + return RuntimeEntityReferenceResolver.rawPayload(event); + } + + private String sourceKind(EventHubEventDto event) { + return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null + ? null + : event.packageInfo().eventSource().sourceKind(); + } + + private String sourceProvider(EventHubEventDto event) { + return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null + ? null + : event.packageInfo().eventSource().providerKey(); + } + + private String sourceSystemFromExternalSourceEventId(EventHubEventDto event) { + String externalId = event == null ? null : event.externalSourceEventId(); + if (externalId == null || externalId.isBlank()) { + return null; + } + String[] parts = externalId.split(":"); + return parts.length >= 1 ? parts[0] : null; + } + + private String extractionCodeFromExternalSourceEventId(EventHubEventDto event) { + String externalId = event == null ? null : event.externalSourceEventId(); + if (externalId == null || externalId.isBlank()) { + return null; + } + String[] parts = externalId.split(":"); + if (parts.length < 2 || !"TACHOGRAPH".equals(normalizeUpper(parts[0]))) { + return null; + } + String candidate = normalizeUpper(parts[1]); + return KNOWN_EXTRACTION_CODES.contains(nullToEmpty(candidate)) ? candidate : null; + } + + private String sourceKindFromExtractionCode(String extractionCode) { + String normalized = normalizeUpper(extractionCode); + if (normalized == null) { + return null; + } + if (normalized.startsWith("CARD_")) { + return "DRIVER_CARD"; + } + if (normalized.startsWith("VU_") || Objects.equals("IW_CYCLE", normalized) + || Objects.equals("SPEEDING_EVENTS", normalized)) { + return "VEHICLE_UNIT"; + } + return null; + } + + private String text(JsonNode node, String field) { + if (node == null || field == null) { + return null; + } + JsonNode value = node.get(field); + if (value == null || value.isNull()) { + return null; + } + String text = value.asText(null); + return text == null || text.isBlank() ? null : text.trim(); + } + + private String firstNonBlank(String... values) { + if (values == null) { + return null; + } + for (String value : values) { + if (value != null && !value.isBlank()) { + return value.trim(); + } + } + return null; + } + + private String normalizeUpper(String value) { + return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT); + } + + private String nullToEmpty(String value) { + return value == null ? "" : value; + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/RuntimeEventAggregationService.java b/src/main/java/at/procon/eventhub/processing/service/RuntimeEventAggregationService.java new file mode 100644 index 0000000..b3e1660 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/service/RuntimeEventAggregationService.java @@ -0,0 +1,184 @@ +package at.procon.eventhub.processing.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.SourcePackageRefDto; +import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeEventSourceProfile; +import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeTachographEventSemantics; +import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Aggregates runtime events before plan-specific semantic processing. + * + *

The aggregation removes repeated reads of the same source record and duplicate serialized + * representations of the same extraction observation. Tachograph evidence pairs that must be + * resolved later remain distinct: CARD/VU activity and support events are preserved for + * {@code RuntimeEventMixingService}, while CARD_VEHICLES_USED/IW_CYCLE remain distinct for + * interval reconciliation.

+ */ +@Service +public class RuntimeEventAggregationService { + + private static final Set DOWNSTREAM_RESOLVED_TACHOGRAPH_EXTRACTION_CODES = Set.of( + "CARD_ACTIVITY", + "VU_ACTIVITY", + "CARD_VEHICLES_USED", + "IW_CYCLE", + "CARD_POSITION", + "VU_POSITION", + "CARD_PLACE", + "VU_PLACE", + "CARD_BORDER_CROSSING", + "VU_BORDER_CROSSING", + "CARD_LOAD_UNLOAD", + "VU_LOAD_UNLOAD", + "CARD_SPECIFIC_CONDITION", + "VU_SPECIFIC_CONDITION" + ); + + private final RuntimeTachographEventSemantics tachographSemantics; + + @Autowired + public RuntimeEventAggregationService(RuntimeTachographEventSemantics tachographSemantics) { + this.tachographSemantics = tachographSemantics; + } + + /** Compatibility constructor used by unit tests and direct loader construction. */ + public RuntimeEventAggregationService() { + this(new RuntimeTachographEventSemantics()); + } + + @SafeVarargs + public final List aggregateRuntimeEvents(List... eventGroups) { + LinkedHashMap exactSourceRecords = new LinkedHashMap<>(); + if (eventGroups != null) { + for (List eventGroup : eventGroups) { + appendExactSourceRecords(exactSourceRecords, eventGroup); + } + } + + LinkedHashMap> canonicalGroups = new LinkedHashMap<>(); + for (EventHubEventDto event : exactSourceRecords.values()) { + canonicalGroups.computeIfAbsent( + canonicalAggregationKey(event), + ignored -> new ArrayList<>() + ).add(event); + } + + List aggregated = new ArrayList<>(); + canonicalGroups.values().forEach(group -> aggregated.addAll(reduceCanonicalGroup(group))); + return aggregated.stream().sorted(eventComparator()).toList(); + } + + /** Compatibility alias for the first implementation name. */ + @SafeVarargs + public final List aggregateExactSourceRecords(List... eventGroups) { + return aggregateRuntimeEvents(eventGroups); + } + + public String exactSourceRecordKey(EventHubEventDto event) { + if (event == null) { + return "NULL_EVENT"; + } + RuntimeEventSourceProfile profile = tachographSemantics.sourceProfile(event); + SourcePackageRefDto sourcePackage = event.sourcePackageRef(); + String sourceIdentity = firstNonBlank( + event.externalSourceEventId(), + event.eventId() == null ? null : event.eventId().toString() + ); + if (sourceIdentity == null) { + sourceIdentity = RuntimeEventIdentityResolver.canonicalEventKey(event); + } + return String.join("|", + "SOURCE_RECORD", + nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()), + nullToEmpty(profile.sourceSystem()), + nullToEmpty(profile.sourceKind()), + nullToEmpty(profile.extractionCode()), + nullToEmpty(sourcePackage == null ? null : sourcePackage.packageKind()), + nullToEmpty(sourcePackage == null ? null : sourcePackage.sourcePackageId()), + nullToEmpty(sourcePackage == null ? null : sourcePackage.sourceEntityId()), + sourceIdentity + ); + } + + private String canonicalAggregationKey(EventHubEventDto event) { + String canonicalKey = RuntimeEventIdentityResolver.canonicalEventKey(event); + String semanticLifecycle = tachographSemantics.semanticLifecycle(event); + if (event == null || event.lifecycle() == null || semanticLifecycle == null + || event.lifecycle().name().equals(semanticLifecycle)) { + return canonicalKey; + } + String[] parts = canonicalKey.split("\\|", -1); + if (parts.length > 4 && "EVENT".equals(parts[0])) { + parts[4] = semanticLifecycle; + return String.join("|", parts); + } + return canonicalKey; + } + + private List reduceCanonicalGroup(List group) { + if (group == null || group.size() <= 1) { + return group == null ? List.of() : List.copyOf(group); + } + + LinkedHashMap tachographEvidenceByExtraction = new LinkedHashMap<>(); + for (EventHubEventDto event : group) { + RuntimeEventSourceProfile profile = tachographSemantics.sourceProfile(event); + if (profile.isTachographRuntimeSource() + && DOWNSTREAM_RESOLVED_TACHOGRAPH_EXTRACTION_CODES.contains(profile.extractionCode())) { + tachographEvidenceByExtraction.putIfAbsent(profile.extractionCode(), event); + } + } + + if (tachographEvidenceByExtraction.size() > 1) { + return List.copyOf(tachographEvidenceByExtraction.values()); + } + return List.of(group.getFirst()); + } + + private void appendExactSourceRecords( + LinkedHashMap target, + List events + ) { + if (events == null) { + return; + } + for (EventHubEventDto event : events) { + if (event != null) { + target.putIfAbsent(exactSourceRecordKey(event), event); + } + } + } + + private Comparator eventComparator() { + return Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name()) + .thenComparing(event -> event.eventType() == null ? "" : event.eventType().name()) + .thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name()) + .thenComparing(event -> tachographSemantics.sourceProfile(event).extractionCode(), Comparator.nullsLast(String::compareTo)) + .thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo)); + } + + private String firstNonBlank(String... values) { + if (values == null) { + return null; + } + for (String value : values) { + if (value != null && !value.isBlank()) { + return value.trim(); + } + } + return null; + } + + private String nullToEmpty(Object value) { + return value == null ? "" : String.valueOf(value); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java index 22a9c47..a89aa2f 100644 --- a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java +++ b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java @@ -8,14 +8,13 @@ import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; import at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput; -import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver; import java.util.ArrayList; import java.util.Comparator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @@ -25,13 +24,25 @@ public class UnifiedRuntimeEventAssemblyService { private final List driverEventLoaders; private final List vehicleEventLoaders; + private final RuntimeEventAggregationService eventAggregationService; + @Autowired + public UnifiedRuntimeEventAssemblyService( + List driverEventLoaders, + List vehicleEventLoaders, + RuntimeEventAggregationService eventAggregationService + ) { + this.driverEventLoaders = List.copyOf(driverEventLoaders); + this.vehicleEventLoaders = List.copyOf(vehicleEventLoaders); + this.eventAggregationService = eventAggregationService; + } + + /** Compatibility constructor retained for existing tests and direct construction. */ public UnifiedRuntimeEventAssemblyService( List driverEventLoaders, List vehicleEventLoaders ) { - this.driverEventLoaders = List.copyOf(driverEventLoaders); - this.vehicleEventLoaders = List.copyOf(vehicleEventLoaders); + this(driverEventLoaders, vehicleEventLoaders, new RuntimeEventAggregationService()); } public UnifiedRuntimeEventBundle assembleDriverScopedEvents(UnifiedRuntimeProcessingRequest request) { @@ -43,7 +54,7 @@ public class UnifiedRuntimeEventAssemblyService { ? loadExpandedVehicleEvents(request, discoveredVehicles) : List.of(); List aggregatedEvents = expandVehicleEvents - ? deduplicateAndSort(driverSeedEvents, expandedVehicleEvents) + ? eventAggregationService.aggregateRuntimeEvents(driverSeedEvents, expandedVehicleEvents) : driverSeedEvents; List notes = new ArrayList<>(); @@ -76,6 +87,7 @@ public class UnifiedRuntimeEventAssemblyService { } else { notes.add("Vehicle expansion was disabled for this runtime request."); } + notes.add("Runtime aggregation removes repeated reads and duplicate serialized representations of the same extraction observation while preserving card/VU equivalents and CARD_VEHICLES_USED/IW_CYCLE evidence for later processing."); notes.add("The assembled event set is a broad aggregated runtime scope; semantic card/VU mixing and interval reconciliation are performed by later modules."); LOG.info( "Runtime event assembly completed (expandVehicleEvents: {}, sourceInputs: {}, driverSeedEvents: {}, discoveredVehicles: {}, expandedVehicleEvents: {}, aggregatedEvents: {})", @@ -107,7 +119,7 @@ public class UnifiedRuntimeEventAssemblyService { result.addAll(loader.loadDriverEvents(sourceRequest)); } } - return deduplicateAndSort(result, List.of()); + return eventAggregationService.aggregateRuntimeEvents(result); } private List loadExpandedVehicleEvents( @@ -126,7 +138,7 @@ public class UnifiedRuntimeEventAssemblyService { } } } - return deduplicateAndSort(result, List.of()); + return eventAggregationService.aggregateRuntimeEvents(result); } private List discoverVehicles(List events) { @@ -166,28 +178,6 @@ public class UnifiedRuntimeEventAssemblyService { return List.copyOf(result); } - private List deduplicateAndSort( - List left, - List right - ) { - LinkedHashMap byKey = new LinkedHashMap<>(); - appendDeduplicated(byKey, left); - appendDeduplicated(byKey, right); - return byKey.values().stream() - .sorted(Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder())) - .thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name()) - .thenComparing(event -> event.eventType() == null ? "" : event.eventType().name()) - .thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name()) - .thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo))) - .toList(); - } - - private void appendDeduplicated(LinkedHashMap byKey, List events) { - for (EventHubEventDto event : events) { - byKey.putIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), event); - } - } - private RuntimeDriverEventLoader driverLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { return driverEventLoaders.stream() .filter(loader -> loader.supports(request, sourceFamily)) diff --git a/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographDbRuntimeEventLoader.java b/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographDbRuntimeEventLoader.java index 80b500c..583c8d3 100644 --- a/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographDbRuntimeEventLoader.java +++ b/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographDbRuntimeEventLoader.java @@ -16,6 +16,7 @@ import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; import at.procon.eventhub.processing.service.RuntimeDriverEventLoader; +import at.procon.eventhub.processing.service.RuntimeEventAggregationService; import at.procon.eventhub.processing.service.RuntimeVehicleEventLoader; import at.procon.eventhub.reference.TachographNationRegistry; import at.procon.eventhub.tachograph.dto.TachographImportRequest; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.core.io.Resource; @@ -45,15 +47,28 @@ public class TachographDbRuntimeEventLoader implements RuntimeDriverEventLoader, private final NamedParameterJdbcTemplate jdbcTemplate; private final TachographExtractionDefinitionRegistry definitionRegistry; private final ResourceLoader resourceLoader; + private final RuntimeEventAggregationService eventAggregationService; + @Autowired public TachographDbRuntimeEventLoader( @Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate, TachographExtractionDefinitionRegistry definitionRegistry, - ResourceLoader resourceLoader + ResourceLoader resourceLoader, + RuntimeEventAggregationService eventAggregationService ) { this.jdbcTemplate = jdbcTemplate; this.definitionRegistry = definitionRegistry; this.resourceLoader = resourceLoader; + this.eventAggregationService = eventAggregationService; + } + + /** Compatibility constructor retained for direct construction. */ + public TachographDbRuntimeEventLoader( + NamedParameterJdbcTemplate jdbcTemplate, + TachographExtractionDefinitionRegistry definitionRegistry, + ResourceLoader resourceLoader + ) { + this(jdbcTemplate, definitionRegistry, resourceLoader, new RuntimeEventAggregationService()); } @Override @@ -74,7 +89,7 @@ public class TachographDbRuntimeEventLoader implements RuntimeDriverEventLoader, request.occurredTo() )); } - return List.copyOf(result); + return eventAggregationService.aggregateRuntimeEvents(result); } @@ -93,7 +108,7 @@ public class TachographDbRuntimeEventLoader implements RuntimeDriverEventLoader, request.vehicleOccurredTo() )); } - return List.copyOf(result); + return eventAggregationService.aggregateRuntimeEvents(result); } private List queryDefinition( diff --git a/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionRuntimeEventLoader.java b/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionRuntimeEventLoader.java index ea7db06..cedcde3 100644 --- a/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionRuntimeEventLoader.java +++ b/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionRuntimeEventLoader.java @@ -11,14 +11,15 @@ import at.procon.eventhub.processing.service.RuntimeDriverEventLoader; import at.procon.eventhub.processing.service.RuntimeVehicleEventLoader; import at.procon.eventhub.processing.service.UnifiedDriverEventSourceService; import at.procon.eventhub.processing.service.UnifiedVehicleEventSourceService; +import at.procon.eventhub.processing.service.RuntimeEventAggregationService; import at.procon.eventhub.service.EventAcquisitionRecordKeyService; import at.procon.eventhub.service.EventHubEventSorter; import at.procon.eventhub.tachographfilesession.service.TachographCompositeSessionNotFoundException; import at.procon.eventhub.tachographfilesession.service.TachographCompositeSessionRepository; import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.List; import java.util.UUID; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @@ -27,21 +28,35 @@ public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEve private final UnifiedDriverEventSourceService driverEventSourceService; private final UnifiedVehicleEventSourceService vehicleEventSourceService; private final TachographCompositeSessionRepository compositeSessionRepository; - private final EventAcquisitionRecordKeyService eventKeyService; - private final EventHubEventSorter eventSorter; + private final RuntimeEventAggregationService eventAggregationService; + @Autowired public TachographFileSessionRuntimeEventLoader( UnifiedDriverEventSourceService driverEventSourceService, UnifiedVehicleEventSourceService vehicleEventSourceService, TachographCompositeSessionRepository compositeSessionRepository, - EventAcquisitionRecordKeyService eventKeyService, - EventHubEventSorter eventSorter + RuntimeEventAggregationService eventAggregationService ) { this.driverEventSourceService = driverEventSourceService; this.vehicleEventSourceService = vehicleEventSourceService; this.compositeSessionRepository = compositeSessionRepository; - this.eventKeyService = eventKeyService; - this.eventSorter = eventSorter; + this.eventAggregationService = eventAggregationService; + } + + /** Compatibility constructor retained for existing tests and direct construction. */ + public TachographFileSessionRuntimeEventLoader( + UnifiedDriverEventSourceService driverEventSourceService, + UnifiedVehicleEventSourceService vehicleEventSourceService, + TachographCompositeSessionRepository compositeSessionRepository, + EventAcquisitionRecordKeyService ignoredEventKeyService, + EventHubEventSorter ignoredEventSorter + ) { + this( + driverEventSourceService, + vehicleEventSourceService, + compositeSessionRepository, + new RuntimeEventAggregationService() + ); } @Override @@ -64,7 +79,7 @@ public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEve ) )); } - return deduplicateBySignatureAndSort(result); + return eventAggregationService.aggregateRuntimeEvents(result); } @Override @@ -87,7 +102,7 @@ public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEve ) )); } - return deduplicateBySignatureAndSort(result); + return eventAggregationService.aggregateRuntimeEvents(result); } private List resolveSessionIds(UnifiedRuntimeProcessingRequest request) { @@ -99,11 +114,5 @@ public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEve return request.sessionIds(); } - private List deduplicateBySignatureAndSort(List events) { - LinkedHashMap bySignature = new LinkedHashMap<>(); - for (EventHubEventDto event : events) { - bySignature.putIfAbsent(eventKeyService.buildEventSignatureHash(event), event); - } - return eventSorter.sort(new ArrayList<>(bySignature.values())); - } + } diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeTachographRepresentationParityTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeTachographRepresentationParityTest.java new file mode 100644 index 0000000..de21e09 --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeTachographRepresentationParityTest.java @@ -0,0 +1,326 @@ +package at.procon.eventhub.processing.eventprocessing.mixing; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDetailsDto; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourcePackageRefDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.time.OffsetDateTime; +import java.util.List; +import org.junit.jupiter.api.Test; + +class RuntimeTachographRepresentationParityTest { + + private static final OffsetDateTime OCCURRED_AT = OffsetDateTime.parse("2026-04-01T08:00:00Z"); + + private final RuntimeEventDescriptorFactory descriptorFactory = new RuntimeEventDescriptorFactory(); + private final RuntimeEventMixingService mixingService = new RuntimeEventMixingService(); + + @Test + void producesSameCanonicalSourceProfileForDbAndFileSessionPlaceRepresentations() { + EventHubEventDto dbEvent = event( + Representation.DB, + "DRIVER_CARD", + "CARD_PLACE", + "TACHOGRAPH:CARD_PLACE:10", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.START + ); + EventHubEventDto fileSessionEvent = event( + Representation.FILE_SESSION, + "DRIVER_CARD", + null, + "TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:place-10:BEGIN:2026-04-01T08:00:00Z", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.BEGIN + ); + + RuntimeEventDescriptor dbDescriptor = descriptorFactory.describe(dbEvent); + RuntimeEventDescriptor fileDescriptor = descriptorFactory.describe(fileSessionEvent); + + assertThat(dbDescriptor.sourceProfile()).isEqualTo(new RuntimeEventSourceProfile( + "TACHOGRAPH", + "DRIVER_CARD", + "CARD_PLACE" + )); + assertThat(fileDescriptor.sourceProfile()).isEqualTo(dbDescriptor.sourceProfile()); + assertThat(fileDescriptor.compatibleSupportEvidenceKey()) + .isEqualTo(dbDescriptor.compatibleSupportEvidenceKey()); + } + + @Test + void infersSameVehicleUsageExtractionProfilesForDbAndFileSessionRepresentations() { + EventHubEventDto dbCvu = event( + Representation.DB, + "DRIVER_CARD", + "CARD_VEHICLES_USED", + "TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT", + EventDomain.DRIVER_CARD, + EventType.CARD_INSERTED, + EventLifecycle.INSERT + ); + EventHubEventDto fileCvu = event( + Representation.FILE_SESSION, + "DRIVER_CARD", + null, + "TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:CARD_USAGE:cvu-10:INSERT:2026-04-01T08:00:00Z", + EventDomain.DRIVER_CARD, + EventType.CARD_INSERTED, + EventLifecycle.INSERT + ); + EventHubEventDto dbIw = event( + Representation.DB, + "VEHICLE_UNIT", + "IW_CYCLE", + "TACHOGRAPH:IW_CYCLE:20:INSERT", + EventDomain.DRIVER_CARD, + EventType.CARD_INSERTED, + EventLifecycle.INSERT + ); + EventHubEventDto fileIw = event( + Representation.FILE_SESSION, + "VEHICLE_UNIT", + null, + "TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:CARD_USAGE:iw-20:INSERT:2026-04-01T08:00:00Z", + EventDomain.DRIVER_CARD, + EventType.CARD_INSERTED, + EventLifecycle.INSERT + ); + + assertThat(descriptorFactory.sourceProfile(fileCvu)) + .isEqualTo(descriptorFactory.sourceProfile(dbCvu)); + assertThat(descriptorFactory.sourceProfile(fileIw)) + .isEqualTo(descriptorFactory.sourceProfile(dbIw)); + assertThat(descriptorFactory.sourceProfile(dbCvu).extractionCode()) + .isEqualTo("CARD_VEHICLES_USED"); + assertThat(descriptorFactory.sourceProfile(dbIw).extractionCode()) + .isEqualTo("IW_CYCLE"); + } + + @Test + void producesEquivalentMixingOutcomeForDbAndFileSessionSupportPairs() { + RuntimeMixedEventBundle dbMixed = mixingService.mix( + List.of( + event( + Representation.DB, + "DRIVER_CARD", + "CARD_PLACE", + "TACHOGRAPH:CARD_PLACE:10", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.START + ), + event( + Representation.DB, + "VEHICLE_UNIT", + "VU_PLACE", + "TACHOGRAPH:VU_PLACE:20", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.START + ) + ), + RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE + ); + RuntimeMixedEventBundle fileMixed = mixingService.mix( + List.of( + event( + Representation.FILE_SESSION, + "DRIVER_CARD", + null, + "TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-place-10:BEGIN:2026-04-01T08:00:00Z", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.BEGIN + ), + event( + Representation.FILE_SESSION, + "VEHICLE_UNIT", + null, + "TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-place-20:BEGIN:2026-04-01T08:00:00Z", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.BEGIN + ) + ), + RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE + ); + + assertThat(fileMixed.supportEvidenceEvents()).hasSameSizeAs(dbMixed.supportEvidenceEvents()); + assertThat(fileMixed.suppressedEvents()).hasSameSizeAs(dbMixed.suppressedEvents()); + assertThat(fileMixed.eventMixingDecisions()).hasSameSizeAs(dbMixed.eventMixingDecisions()); + assertThat(fileMixed.eventMixingDecisions().getFirst().primaryExtractionCode()) + .isEqualTo(dbMixed.eventMixingDecisions().getFirst().primaryExtractionCode()); + assertThat(fileMixed.eventMixingDecisions().getFirst().secondaryExtractionCodes()) + .isEqualTo(dbMixed.eventMixingDecisions().getFirst().secondaryExtractionCodes()); + assertThat(fileMixed.eventMixingDecisions().getFirst().ruleId()) + .isEqualTo(dbMixed.eventMixingDecisions().getFirst().ruleId()); + } + + + + @Test + void producesEquivalentMixingOutcomeForDbAndFileSessionActivityPairs() { + RuntimeMixedEventBundle dbMixed = mixingService.mix( + List.of( + event(Representation.DB, "DRIVER_CARD", "CARD_ACTIVITY", + "TACHOGRAPH:CARD_ACTIVITY:10:START", + EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.START), + event(Representation.DB, "VEHICLE_UNIT", "VU_ACTIVITY", + "TACHOGRAPH:VU_ACTIVITY:20:START", + EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.START) + ), + RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE + ); + RuntimeMixedEventBundle fileMixed = mixingService.mix( + List.of( + event(Representation.FILE_SESSION, "DRIVER_CARD", null, + "TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:ACTIVITY:card-10:START:2026-04-01T08:00:00Z", + EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.START), + event(Representation.FILE_SESSION, "VEHICLE_UNIT", null, + "TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:ACTIVITY:vu-20:START:2026-04-01T08:00:00Z", + EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.START) + ), + RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE + ); + + assertThat(fileMixed.activityTimelineEvents()).hasSameSizeAs(dbMixed.activityTimelineEvents()); + assertThat(fileMixed.suppressedEvents()).hasSameSizeAs(dbMixed.suppressedEvents()); + assertThat(fileMixed.eventMixingDecisions()).hasSameSizeAs(dbMixed.eventMixingDecisions()); + assertThat(fileMixed.eventMixingDecisions().getFirst().primaryExtractionCode()) + .isEqualTo(dbMixed.eventMixingDecisions().getFirst().primaryExtractionCode()); + assertThat(fileMixed.eventMixingDecisions().getFirst().secondaryExtractionCodes()) + .isEqualTo(dbMixed.eventMixingDecisions().getFirst().secondaryExtractionCodes()); + } + + @Test + void preservesEquivalentCvuAndIwCycleInputsForBothRepresentations() { + List dbEvents = List.of( + event(Representation.DB, "DRIVER_CARD", "CARD_VEHICLES_USED", + "TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT", + EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT), + event(Representation.DB, "DRIVER_CARD", "CARD_VEHICLES_USED", + "TACHOGRAPH:CARD_VEHICLES_USED:10:WITHDRAW", + EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW), + event(Representation.DB, "VEHICLE_UNIT", "IW_CYCLE", + "TACHOGRAPH:IW_CYCLE:20:INSERT", + EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT), + event(Representation.DB, "VEHICLE_UNIT", "IW_CYCLE", + "TACHOGRAPH:IW_CYCLE:20:WITHDRAW", + EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW) + ); + List fileEvents = List.of( + event(Representation.FILE_SESSION, "DRIVER_CARD", null, + "TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:CARD_USAGE:cvu-10:INSERT:2026-04-01T08:00:00Z", + EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT), + event(Representation.FILE_SESSION, "DRIVER_CARD", null, + "TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:CARD_USAGE:cvu-10:WITHDRAW:2026-04-01T08:00:00Z", + EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW), + event(Representation.FILE_SESSION, "VEHICLE_UNIT", null, + "TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:CARD_USAGE:iw-20:INSERT:2026-04-01T08:00:00Z", + EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT), + event(Representation.FILE_SESSION, "VEHICLE_UNIT", null, + "TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:CARD_USAGE:iw-20:WITHDRAW:2026-04-01T08:00:00Z", + EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW) + ); + + RuntimeMixedEventBundle dbMixed = mixingService.mix( + dbEvents, RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE); + RuntimeMixedEventBundle fileMixed = mixingService.mix( + fileEvents, RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE); + + assertThat(dbMixed.vehicleUsageEvents()).hasSize(4); + assertThat(fileMixed.vehicleUsageEvents()).hasSize(4); + assertThat(dbMixed.suppressedEvents()).isEmpty(); + assertThat(fileMixed.suppressedEvents()).isEmpty(); + assertThat(fileEvents).extracting(event -> descriptorFactory.sourceProfile(event).extractionCode()) + .containsExactly("CARD_VEHICLES_USED", "CARD_VEHICLES_USED", "IW_CYCLE", "IW_CYCLE"); + } + + private EventHubEventDto event( + Representation representation, + String sourceKind, + String extractionCode, + String externalSourceEventId, + EventDomain domain, + EventType type, + EventLifecycle lifecycle + ) { + String provider = representation == Representation.DB ? "TACHOGRAPH" : "TACHOGRAPH_FILE_SESSION"; + EventSourceDto source = new EventSourceDto( + provider, + sourceKind, + provider + "_" + sourceKind, + null, + null, + null + ); + EventHubPackageRequest packageInfo = new EventHubPackageRequest( + "default", + source, + null, + ImportScopeDto.tenantAll(null, null), + domain.name(), + null, + provider + ":package" + ); + ObjectNode raw = JsonNodeFactory.instance.objectNode(); + raw.put("sourceKind", sourceKind); + raw.put("driverKey", "12:123"); + raw.put("registrationKey", "12:REG-1"); + if (extractionCode != null) { + raw.put("extractionCode", extractionCode); + } + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + return new EventHubEventDto( + null, + externalSourceEventId, + new DriverRefDto("driver-1", new DriverCardRefDto("12", "123")), + new VehicleRefDto( + sourceKind.equals("VEHICLE_UNIT") ? "vehicle-1" : null, + sourceKind.equals("VEHICLE_UNIT") ? "VIN-1" : null, + new VehicleRegistrationRefDto("12", "REG-1") + ), + OCCURRED_AT, + null, + null, + domain, + type, + lifecycle, + null, + null, + new EventDetailsDto(domain.name(), JsonNodeFactory.instance.objectNode()), + new SourcePackageRefDto( + representation == Representation.DB ? sourceKind : "TACHOGRAPH_FILE_SESSION", + representation == Representation.DB ? "package-1" : "session-1", + sourceKind.equals("VEHICLE_UNIT") ? "vehicle-1" : "card-1", + null, + null, + null + ), + payload, + false, + packageInfo + ); + } + + private enum Representation { + DB, + FILE_SESSION + } +} diff --git a/src/test/java/at/procon/eventhub/processing/service/RuntimeEventAggregationServiceTest.java b/src/test/java/at/procon/eventhub/processing/service/RuntimeEventAggregationServiceTest.java new file mode 100644 index 0000000..96f1847 --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/service/RuntimeEventAggregationServiceTest.java @@ -0,0 +1,202 @@ +package at.procon.eventhub.processing.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDetailsDto; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.time.OffsetDateTime; +import java.util.List; +import org.junit.jupiter.api.Test; + +class RuntimeEventAggregationServiceTest { + + private final RuntimeEventAggregationService service = new RuntimeEventAggregationService(); + + @Test + void removesOnlyRepeatedReadsOfSameSourceRecord() { + EventHubEventDto cardPosition = event( + "DRIVER_CARD", + "CARD_POSITION", + "TACHOGRAPH:CARD_POSITION:10", + EventDomain.POSITION, + EventType.POSITION_RECORDED, + EventLifecycle.SNAPSHOT + ); + EventHubEventDto vuPosition = event( + "VEHICLE_UNIT", + "VU_POSITION", + "TACHOGRAPH:VU_POSITION:20", + EventDomain.POSITION, + EventType.POSITION_RECORDED, + EventLifecycle.SNAPSHOT + ); + EventHubEventDto cvu = event( + "DRIVER_CARD", + "CARD_VEHICLES_USED", + "TACHOGRAPH:CARD_VEHICLES_USED:30:INSERT", + EventDomain.DRIVER_CARD, + EventType.CARD_INSERTED, + EventLifecycle.INSERT + ); + EventHubEventDto iwCycle = event( + "VEHICLE_UNIT", + "IW_CYCLE", + "TACHOGRAPH:IW_CYCLE:40:INSERT", + EventDomain.DRIVER_CARD, + EventType.CARD_INSERTED, + EventLifecycle.INSERT + ); + + List aggregated = service.aggregateRuntimeEvents( + List.of(cardPosition, vuPosition, cvu, iwCycle), + List.of(cardPosition) + ); + + assertThat(aggregated).hasSize(4); + assertThat(aggregated).extracting(EventHubEventDto::externalSourceEventId) + .containsExactlyInAnyOrder( + cardPosition.externalSourceEventId(), + vuPosition.externalSourceEventId(), + cvu.externalSourceEventId(), + iwCycle.externalSourceEventId() + ); + } + + + @Test + void collapsesDuplicateSerializedRepresentationsOfSameExtractionObservation() { + EventHubEventDto first = event( + "DRIVER_CARD", + "CARD_POSITION", + "TACHOGRAPH:CARD_POSITION:10", + EventDomain.POSITION, + EventType.POSITION_RECORDED, + EventLifecycle.SNAPSHOT + ); + EventHubEventDto serializedCopy = event( + "DRIVER_CARD", + "CARD_POSITION", + "TACHOGRAPH:CARD_POSITION:COPY-10", + EventDomain.POSITION, + EventType.POSITION_RECORDED, + EventLifecycle.SNAPSHOT + ); + + assertThat(service.aggregateRuntimeEvents(List.of(first, serializedCopy))) + .containsExactly(first); + } + + + @Test + void collapsesPlaceStartAndBeginRepresentationsForSameExtractionSource() { + EventHubEventDto dbStyle = event( + "DRIVER_CARD", + "CARD_PLACE", + "TACHOGRAPH:CARD_PLACE:10", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.START + ); + EventHubEventDto fileStyle = event( + "DRIVER_CARD", + "CARD_PLACE", + "TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:place-10:BEGIN:2026-04-01T08:00:00Z", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.BEGIN + ); + + assertThat(service.aggregateRuntimeEvents(List.of(dbStyle, fileStyle))) + .containsExactly(dbStyle); + } + + @Test + void keepsDifferentExtractionSourcesEvenWhenSemanticEventDataIsEqual() { + EventHubEventDto card = event( + "DRIVER_CARD", + "CARD_ACTIVITY", + "TACHOGRAPH:CARD_ACTIVITY:10:START", + EventDomain.DRIVER_ACTIVITY, + EventType.DRIVE, + EventLifecycle.START + ); + EventHubEventDto vu = event( + "VEHICLE_UNIT", + "VU_ACTIVITY", + "TACHOGRAPH:VU_ACTIVITY:20:START", + EventDomain.DRIVER_ACTIVITY, + EventType.DRIVE, + EventLifecycle.START + ); + + assertThat(service.aggregateRuntimeEvents(List.of(card, vu))) + .containsExactly(card, vu); + assertThat(service.exactSourceRecordKey(card)) + .isNotEqualTo(service.exactSourceRecordKey(vu)); + } + + private EventHubEventDto event( + String sourceKind, + String extractionCode, + String externalSourceEventId, + EventDomain domain, + EventType eventType, + EventLifecycle lifecycle + ) { + EventSourceDto source = new EventSourceDto( + "TACHOGRAPH", + sourceKind, + "TACHOGRAPH_" + sourceKind, + null, + null, + null + ); + EventHubPackageRequest packageInfo = new EventHubPackageRequest( + "default", + source, + null, + ImportScopeDto.tenantAll(null, null), + domain.name(), + null, + "TACHOGRAPH:package" + ); + ObjectNode raw = JsonNodeFactory.instance.objectNode(); + raw.put("sourceKind", sourceKind); + raw.put("extractionCode", extractionCode); + raw.put("driverKey", "12:123"); + raw.put("registrationKey", "12:REG-1"); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + return new EventHubEventDto( + null, + externalSourceEventId, + new DriverRefDto("driver-1", new DriverCardRefDto("12", "123")), + new VehicleRefDto("vehicle-1", "VIN-1", new VehicleRegistrationRefDto("12", "REG-1")), + OffsetDateTime.parse("2026-04-01T08:00:00Z"), + null, + null, + domain, + eventType, + lifecycle, + null, + null, + new EventDetailsDto(domain.name(), JsonNodeFactory.instance.objectNode()), + null, + payload, + false, + packageInfo + ); + } +}