Add runtime tachograph aggregation and parity tests

This commit is contained in:
trifonovt 2026-06-15 12:22:23 +02:00
parent c1b4847cf0
commit 729e6fb261
9 changed files with 1076 additions and 182 deletions

View File

@ -0,0 +1,38 @@
# Tachograph DB / file-session runtime parity
The runtime pipeline treats direct tachograph file-session events and events loaded from the tachograph database as two representations of the same tachograph facts.
## Canonical semantic boundary
`RuntimeTachographEventSemantics` normalizes representation-only differences without modifying the original event:
- file-session and DB source systems are exposed as `TACHOGRAPH`;
- extraction codes are read from DB raw metadata or inferred from source kind and event domain;
- `PLACE START` and `PLACE BEGIN` share the semantic lifecycle `BEGIN` for mixing.
The raw lifecycle, source package, payload, and external source event ID remain unchanged for audit and provenance.
## Runtime aggregation
`RuntimeEventAggregationService` is shared by:
- `TachographFileSessionRuntimeEventLoader`;
- `TachographDbRuntimeEventLoader`;
- `UnifiedRuntimeEventAssemblyService`.
Aggregation removes:
- repeated reads of the same source record;
- duplicate serialized representations of the same extraction observation.
It deliberately preserves evidence pairs that are resolved downstream:
- `CARD_ACTIVITY` / `VU_ACTIVITY`;
- card/VU support evidence;
- `CARD_VEHICLES_USED` / `IW_CYCLE`.
The first two are handled by `RuntimeEventMixingService`. Vehicle usage remains source-distinct until interval-level reconciliation.
## Regression coverage
`RuntimeTachographRepresentationParityTest` verifies equal source profiles and mixing outcomes for DB and file-session representations. `RuntimeEventAggregationServiceTest` verifies that technical duplicates are reduced while card/VU and CVU/IW evidence remains available to later modules.

View File

@ -10,13 +10,25 @@ import com.fasterxml.jackson.databind.JsonNode;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Objects; import java.util.Objects;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class RuntimeEventDescriptorFactory { public class RuntimeEventDescriptorFactory {
private final RuntimeTachographEventSemantics tachographSemantics;
@Autowired
public RuntimeEventDescriptorFactory(RuntimeTachographEventSemantics tachographSemantics) {
this.tachographSemantics = tachographSemantics;
}
/** Compatibility constructor used by unit tests. */
public RuntimeEventDescriptorFactory() {
this(new RuntimeTachographEventSemantics());
}
public List<RuntimeEventDescriptor> describeSorted(List<EventHubEventDto> events) { public List<RuntimeEventDescriptor> describeSorted(List<EventHubEventDto> events) {
return sort(events).stream() return sort(events).stream()
.map(this::describe) .map(this::describe)
@ -64,26 +76,7 @@ public class RuntimeEventDescriptorFactory {
} }
public RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) { public RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) {
JsonNode raw = rawPayload(event); return tachographSemantics.sourceProfile(event);
String sourceKind = firstNonBlank(text(raw, "sourceKind"), sourceKind(event));
String extractionCode = firstNonBlank(
text(raw, "extractionCode"),
fileSessionExtractionCode(event, sourceKind),
extractionCodeFromExternalSourceEventId(event)
);
String sourceSystem = firstNonBlank(
text(raw, "sourceSystem"),
sourceProvider(event),
sourceSystemFromExternalSourceEventId(event)
);
if (sourceSystem == null && (extractionCode != null || isTachographFileSessionEvent(event))) {
sourceSystem = "TACHOGRAPH";
}
return new RuntimeEventSourceProfile(
normalizeUpper(sourceSystem),
normalizeUpper(sourceKind),
normalizeUpper(extractionCode)
);
} }
public String eventIdentityKey(EventHubEventDto event) { public String eventIdentityKey(EventHubEventDto event) {
@ -106,77 +99,6 @@ public class RuntimeEventDescriptorFactory {
.thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo)); .thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo));
} }
private String fileSessionExtractionCode(EventHubEventDto event, String sourceKind) {
if (!isTachographFileSessionEvent(event)) {
return null;
}
String normalizedSourceKind = normalizeUpper(sourceKind);
if (normalizedSourceKind == null) {
return null;
}
if (event != null && event.eventDomain() == EventDomain.DRIVER_ACTIVITY) {
return switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD_ACTIVITY";
case "VEHICLE_UNIT" -> "VU_ACTIVITY";
default -> null;
};
}
if (event != null && event.eventDomain() == EventDomain.DRIVER_CARD) {
return switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD_VEHICLES_USED";
case "VEHICLE_UNIT" -> "IW_CYCLE";
default -> null;
};
}
if (event == null || event.eventDomain() == null) {
return null;
}
String prefix = switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD";
case "VEHICLE_UNIT" -> "VU";
default -> null;
};
if (prefix == null) {
return null;
}
return switch (event.eventDomain()) {
case POSITION -> prefix + "_POSITION";
case PLACE -> prefix + "_PLACE";
case BORDER_CROSSING -> prefix + "_BORDER_CROSSING";
case LOAD_UNLOAD -> prefix + "_LOAD_UNLOAD";
case SPECIFIC_CONDITION -> prefix + "_SPECIFIC_CONDITION";
case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null;
default -> null;
};
}
private boolean isTachographFileSessionEvent(EventHubEventDto event) {
if (event == null) {
return false;
}
String packageKind = event.sourcePackageRef() == null ? null : normalizeUpper(event.sourcePackageRef().packageKind());
if (Objects.equals("TACHOGRAPH_FILE_SESSION", packageKind)
|| Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", packageKind)) {
return true;
}
String provider = sourceProvider(event);
if (Objects.equals("TACHOGRAPH_FILE_SESSION", normalizeUpper(provider))
|| Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", normalizeUpper(provider))) {
return true;
}
String sourceKey = event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: normalizeUpper(event.packageInfo().eventSource().sourceKey());
if (Objects.equals("TACHOGRAPH_FILE_SESSION", sourceKey)
|| Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", sourceKey)) {
return true;
}
String externalId = event.externalSourceEventId();
return externalId != null
&& (externalId.startsWith("TACHOGRAPH_FILE_SESSION:")
|| externalId.startsWith("COMPOSITE_TACHOGRAPH_FILE_SESSION:"));
}
private String compatibleActivityKey(EventHubEventDto event) { private String compatibleActivityKey(EventHubEventDto event) {
JsonNode raw = rawPayload(event); JsonNode raw = rawPayload(event);
return String.join("|", return String.join("|",
@ -220,50 +142,13 @@ public class RuntimeEventDescriptorFactory {
} }
private String semanticSupportLifecycle(EventHubEventDto event) { private String semanticSupportLifecycle(EventHubEventDto event) {
if (event == null || event.lifecycle() == null) { return tachographSemantics.semanticLifecycle(event);
return null;
}
if (event.eventDomain() == EventDomain.PLACE
&& (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.BEGIN)) {
return EventLifecycle.BEGIN.name();
}
return event.lifecycle().name();
} }
private JsonNode rawPayload(EventHubEventDto event) { private JsonNode rawPayload(EventHubEventDto event) {
return RuntimeEntityReferenceResolver.rawPayload(event); return RuntimeEntityReferenceResolver.rawPayload(event);
} }
private String sourceKind(EventHubEventDto event) {
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().sourceKind();
}
private String sourceProvider(EventHubEventDto event) {
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().providerKey();
}
private String sourceSystemFromExternalSourceEventId(EventHubEventDto event) {
String externalId = event == null ? null : event.externalSourceEventId();
if (externalId == null || externalId.isBlank()) {
return null;
}
String[] parts = externalId.split(":");
return parts.length >= 1 ? parts[0] : null;
}
private String extractionCodeFromExternalSourceEventId(EventHubEventDto event) {
String externalId = event == null ? null : event.externalSourceEventId();
if (externalId == null || externalId.isBlank()) {
return null;
}
String[] parts = externalId.split(":");
return parts.length >= 2 ? parts[1] : null;
}
private String detailText(EventHubEventDto event, String field) { private String detailText(EventHubEventDto event, String field) {
if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null || field == null) { if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null || field == null) {
return null; return null;
@ -300,10 +185,6 @@ public class RuntimeEventDescriptorFactory {
return null; return null;
} }
private String normalizeUpper(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT);
}
private String nullToEmpty(Object value) { private String nullToEmpty(Object value) {
return value == null ? "" : String.valueOf(value); return value == null ? "" : String.valueOf(value);
} }

View File

@ -0,0 +1,249 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import org.springframework.stereotype.Component;
/**
* Canonical semantic view of tachograph runtime events.
*
* <p>The same tachograph package can be processed directly from an in-memory file session or
* loaded from the tachograph database after serialization. This component normalizes only the
* representation differences that are relevant to downstream mixing. It deliberately leaves the
* original {@link EventHubEventDto} untouched for provenance and audit purposes.</p>
*/
@Component
public class RuntimeTachographEventSemantics {
private static final Set<String> TACHOGRAPH_SOURCE_SYSTEMS = Set.of(
"TACHOGRAPH",
"TACHOGRAPH_FILE_SESSION",
"COMPOSITE_TACHOGRAPH_FILE_SESSION"
);
private static final Set<String> KNOWN_EXTRACTION_CODES = Set.of(
"CARD_ACTIVITY",
"VU_ACTIVITY",
"CARD_VEHICLES_USED",
"IW_CYCLE",
"CARD_POSITION",
"VU_POSITION",
"CARD_PLACE",
"VU_PLACE",
"CARD_BORDER_CROSSING",
"VU_BORDER_CROSSING",
"CARD_LOAD_UNLOAD",
"VU_LOAD_UNLOAD",
"CARD_SPECIFIC_CONDITION",
"VU_SPECIFIC_CONDITION",
"SPEEDING_EVENTS"
);
public RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) {
JsonNode raw = rawPayload(event);
String explicitExtractionCode = normalizeUpper(firstNonBlank(
text(raw, "extractionCode"),
extractionCodeFromExternalSourceEventId(event)
));
String sourceKind = normalizeUpper(firstNonBlank(
text(raw, "sourceKind"),
sourceKind(event),
sourceKindFromExtractionCode(explicitExtractionCode)
));
String extractionCode = normalizeUpper(firstNonBlank(
explicitExtractionCode,
inferExtractionCode(event, sourceKind)
));
String sourceSystemCandidate = normalizeUpper(firstNonBlank(
text(raw, "sourceSystem"),
sourceProvider(event),
sourceSystemFromExternalSourceEventId(event)
));
String sourceSystem = isTachographRepresentation(event, sourceSystemCandidate, extractionCode)
? "TACHOGRAPH"
: sourceSystemCandidate;
return new RuntimeEventSourceProfile(sourceSystem, sourceKind, extractionCode);
}
/**
* Returns a semantic lifecycle used only for equivalence matching.
* DB place events use START while file-session place events use BEGIN for the same fact.
*/
public String semanticLifecycle(EventHubEventDto event) {
if (event == null || event.lifecycle() == null) {
return null;
}
if (event.eventDomain() == EventDomain.PLACE
&& (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.BEGIN)) {
return EventLifecycle.BEGIN.name();
}
return event.lifecycle().name();
}
public String inferExtractionCode(EventHubEventDto event, String sourceKind) {
if (event == null || event.eventDomain() == null) {
return null;
}
String normalizedSourceKind = normalizeUpper(sourceKind);
if (normalizedSourceKind == null) {
return null;
}
if (event.eventDomain() == EventDomain.DRIVER_ACTIVITY) {
return switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD_ACTIVITY";
case "VEHICLE_UNIT" -> "VU_ACTIVITY";
default -> null;
};
}
if (event.eventDomain() == EventDomain.DRIVER_CARD) {
return switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD_VEHICLES_USED";
case "VEHICLE_UNIT" -> "IW_CYCLE";
default -> null;
};
}
String prefix = switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD";
case "VEHICLE_UNIT" -> "VU";
default -> null;
};
if (prefix == null) {
return null;
}
return switch (event.eventDomain()) {
case POSITION -> prefix + "_POSITION";
case PLACE -> prefix + "_PLACE";
case BORDER_CROSSING -> prefix + "_BORDER_CROSSING";
case LOAD_UNLOAD -> prefix + "_LOAD_UNLOAD";
case SPECIFIC_CONDITION -> prefix + "_SPECIFIC_CONDITION";
case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null;
default -> null;
};
}
public boolean isTachographRepresentation(EventHubEventDto event) {
RuntimeEventSourceProfile profile = sourceProfile(event);
return profile.isTachographRuntimeSource();
}
private boolean isTachographRepresentation(
EventHubEventDto event,
String sourceSystemCandidate,
String extractionCode
) {
if (TACHOGRAPH_SOURCE_SYSTEMS.contains(nullToEmpty(sourceSystemCandidate))) {
return true;
}
if (KNOWN_EXTRACTION_CODES.contains(nullToEmpty(extractionCode))) {
return true;
}
String packageKind = event == null || event.sourcePackageRef() == null
? null
: normalizeUpper(event.sourcePackageRef().packageKind());
if (TACHOGRAPH_SOURCE_SYSTEMS.contains(nullToEmpty(packageKind))) {
return true;
}
String sourceKey = event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: normalizeUpper(event.packageInfo().eventSource().sourceKey());
if (sourceKey != null && sourceKey.startsWith("TACHOGRAPH")) {
return true;
}
String externalId = event == null ? null : event.externalSourceEventId();
return externalId != null
&& (externalId.startsWith("TACHOGRAPH:")
|| externalId.startsWith("TACHOGRAPH_FILE_SESSION:")
|| externalId.startsWith("COMPOSITE_TACHOGRAPH_FILE_SESSION:"));
}
private JsonNode rawPayload(EventHubEventDto event) {
return RuntimeEntityReferenceResolver.rawPayload(event);
}
private String sourceKind(EventHubEventDto event) {
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().sourceKind();
}
private String sourceProvider(EventHubEventDto event) {
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().providerKey();
}
private String sourceSystemFromExternalSourceEventId(EventHubEventDto event) {
String externalId = event == null ? null : event.externalSourceEventId();
if (externalId == null || externalId.isBlank()) {
return null;
}
String[] parts = externalId.split(":");
return parts.length >= 1 ? parts[0] : null;
}
private String extractionCodeFromExternalSourceEventId(EventHubEventDto event) {
String externalId = event == null ? null : event.externalSourceEventId();
if (externalId == null || externalId.isBlank()) {
return null;
}
String[] parts = externalId.split(":");
if (parts.length < 2 || !"TACHOGRAPH".equals(normalizeUpper(parts[0]))) {
return null;
}
String candidate = normalizeUpper(parts[1]);
return KNOWN_EXTRACTION_CODES.contains(nullToEmpty(candidate)) ? candidate : null;
}
private String sourceKindFromExtractionCode(String extractionCode) {
String normalized = normalizeUpper(extractionCode);
if (normalized == null) {
return null;
}
if (normalized.startsWith("CARD_")) {
return "DRIVER_CARD";
}
if (normalized.startsWith("VU_") || Objects.equals("IW_CYCLE", normalized)
|| Objects.equals("SPEEDING_EVENTS", normalized)) {
return "VEHICLE_UNIT";
}
return null;
}
private String text(JsonNode node, String field) {
if (node == null || field == null) {
return null;
}
JsonNode value = node.get(field);
if (value == null || value.isNull()) {
return null;
}
String text = value.asText(null);
return text == null || text.isBlank() ? null : text.trim();
}
private String firstNonBlank(String... values) {
if (values == null) {
return null;
}
for (String value : values) {
if (value != null && !value.isBlank()) {
return value.trim();
}
}
return null;
}
private String normalizeUpper(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT);
}
private String nullToEmpty(String value) {
return value == null ? "" : value;
}
}

View File

@ -0,0 +1,184 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.SourcePackageRefDto;
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeEventSourceProfile;
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeTachographEventSemantics;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Aggregates runtime events before plan-specific semantic processing.
*
* <p>The aggregation removes repeated reads of the same source record and duplicate serialized
* representations of the same extraction observation. Tachograph evidence pairs that must be
* resolved later remain distinct: CARD/VU activity and support events are preserved for
* {@code RuntimeEventMixingService}, while CARD_VEHICLES_USED/IW_CYCLE remain distinct for
* interval reconciliation.</p>
*/
@Service
public class RuntimeEventAggregationService {
private static final Set<String> DOWNSTREAM_RESOLVED_TACHOGRAPH_EXTRACTION_CODES = Set.of(
"CARD_ACTIVITY",
"VU_ACTIVITY",
"CARD_VEHICLES_USED",
"IW_CYCLE",
"CARD_POSITION",
"VU_POSITION",
"CARD_PLACE",
"VU_PLACE",
"CARD_BORDER_CROSSING",
"VU_BORDER_CROSSING",
"CARD_LOAD_UNLOAD",
"VU_LOAD_UNLOAD",
"CARD_SPECIFIC_CONDITION",
"VU_SPECIFIC_CONDITION"
);
private final RuntimeTachographEventSemantics tachographSemantics;
@Autowired
public RuntimeEventAggregationService(RuntimeTachographEventSemantics tachographSemantics) {
this.tachographSemantics = tachographSemantics;
}
/** Compatibility constructor used by unit tests and direct loader construction. */
public RuntimeEventAggregationService() {
this(new RuntimeTachographEventSemantics());
}
@SafeVarargs
public final List<EventHubEventDto> aggregateRuntimeEvents(List<EventHubEventDto>... eventGroups) {
LinkedHashMap<String, EventHubEventDto> exactSourceRecords = new LinkedHashMap<>();
if (eventGroups != null) {
for (List<EventHubEventDto> eventGroup : eventGroups) {
appendExactSourceRecords(exactSourceRecords, eventGroup);
}
}
LinkedHashMap<String, List<EventHubEventDto>> canonicalGroups = new LinkedHashMap<>();
for (EventHubEventDto event : exactSourceRecords.values()) {
canonicalGroups.computeIfAbsent(
canonicalAggregationKey(event),
ignored -> new ArrayList<>()
).add(event);
}
List<EventHubEventDto> aggregated = new ArrayList<>();
canonicalGroups.values().forEach(group -> aggregated.addAll(reduceCanonicalGroup(group)));
return aggregated.stream().sorted(eventComparator()).toList();
}
/** Compatibility alias for the first implementation name. */
@SafeVarargs
public final List<EventHubEventDto> aggregateExactSourceRecords(List<EventHubEventDto>... eventGroups) {
return aggregateRuntimeEvents(eventGroups);
}
public String exactSourceRecordKey(EventHubEventDto event) {
if (event == null) {
return "NULL_EVENT";
}
RuntimeEventSourceProfile profile = tachographSemantics.sourceProfile(event);
SourcePackageRefDto sourcePackage = event.sourcePackageRef();
String sourceIdentity = firstNonBlank(
event.externalSourceEventId(),
event.eventId() == null ? null : event.eventId().toString()
);
if (sourceIdentity == null) {
sourceIdentity = RuntimeEventIdentityResolver.canonicalEventKey(event);
}
return String.join("|",
"SOURCE_RECORD",
nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
nullToEmpty(profile.sourceSystem()),
nullToEmpty(profile.sourceKind()),
nullToEmpty(profile.extractionCode()),
nullToEmpty(sourcePackage == null ? null : sourcePackage.packageKind()),
nullToEmpty(sourcePackage == null ? null : sourcePackage.sourcePackageId()),
nullToEmpty(sourcePackage == null ? null : sourcePackage.sourceEntityId()),
sourceIdentity
);
}
private String canonicalAggregationKey(EventHubEventDto event) {
String canonicalKey = RuntimeEventIdentityResolver.canonicalEventKey(event);
String semanticLifecycle = tachographSemantics.semanticLifecycle(event);
if (event == null || event.lifecycle() == null || semanticLifecycle == null
|| event.lifecycle().name().equals(semanticLifecycle)) {
return canonicalKey;
}
String[] parts = canonicalKey.split("\\|", -1);
if (parts.length > 4 && "EVENT".equals(parts[0])) {
parts[4] = semanticLifecycle;
return String.join("|", parts);
}
return canonicalKey;
}
private List<EventHubEventDto> reduceCanonicalGroup(List<EventHubEventDto> group) {
if (group == null || group.size() <= 1) {
return group == null ? List.of() : List.copyOf(group);
}
LinkedHashMap<String, EventHubEventDto> tachographEvidenceByExtraction = new LinkedHashMap<>();
for (EventHubEventDto event : group) {
RuntimeEventSourceProfile profile = tachographSemantics.sourceProfile(event);
if (profile.isTachographRuntimeSource()
&& DOWNSTREAM_RESOLVED_TACHOGRAPH_EXTRACTION_CODES.contains(profile.extractionCode())) {
tachographEvidenceByExtraction.putIfAbsent(profile.extractionCode(), event);
}
}
if (tachographEvidenceByExtraction.size() > 1) {
return List.copyOf(tachographEvidenceByExtraction.values());
}
return List.of(group.getFirst());
}
private void appendExactSourceRecords(
LinkedHashMap<String, EventHubEventDto> target,
List<EventHubEventDto> events
) {
if (events == null) {
return;
}
for (EventHubEventDto event : events) {
if (event != null) {
target.putIfAbsent(exactSourceRecordKey(event), event);
}
}
}
private Comparator<EventHubEventDto> eventComparator() {
return Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name())
.thenComparing(event -> event.eventType() == null ? "" : event.eventType().name())
.thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name())
.thenComparing(event -> tachographSemantics.sourceProfile(event).extractionCode(), Comparator.nullsLast(String::compareTo))
.thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo));
}
private String firstNonBlank(String... values) {
if (values == null) {
return null;
}
for (String value : values) {
if (value != null && !value.isBlank()) {
return value.trim();
}
}
return null;
}
private String nullToEmpty(Object value) {
return value == null ? "" : String.valueOf(value);
}
}

View File

@ -8,14 +8,13 @@ import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput; import at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
@ -25,13 +24,25 @@ public class UnifiedRuntimeEventAssemblyService {
private final List<RuntimeDriverEventLoader> driverEventLoaders; private final List<RuntimeDriverEventLoader> driverEventLoaders;
private final List<RuntimeVehicleEventLoader> vehicleEventLoaders; private final List<RuntimeVehicleEventLoader> vehicleEventLoaders;
private final RuntimeEventAggregationService eventAggregationService;
@Autowired
public UnifiedRuntimeEventAssemblyService(
List<RuntimeDriverEventLoader> driverEventLoaders,
List<RuntimeVehicleEventLoader> vehicleEventLoaders,
RuntimeEventAggregationService eventAggregationService
) {
this.driverEventLoaders = List.copyOf(driverEventLoaders);
this.vehicleEventLoaders = List.copyOf(vehicleEventLoaders);
this.eventAggregationService = eventAggregationService;
}
/** Compatibility constructor retained for existing tests and direct construction. */
public UnifiedRuntimeEventAssemblyService( public UnifiedRuntimeEventAssemblyService(
List<RuntimeDriverEventLoader> driverEventLoaders, List<RuntimeDriverEventLoader> driverEventLoaders,
List<RuntimeVehicleEventLoader> vehicleEventLoaders List<RuntimeVehicleEventLoader> vehicleEventLoaders
) { ) {
this.driverEventLoaders = List.copyOf(driverEventLoaders); this(driverEventLoaders, vehicleEventLoaders, new RuntimeEventAggregationService());
this.vehicleEventLoaders = List.copyOf(vehicleEventLoaders);
} }
public UnifiedRuntimeEventBundle assembleDriverScopedEvents(UnifiedRuntimeProcessingRequest request) { public UnifiedRuntimeEventBundle assembleDriverScopedEvents(UnifiedRuntimeProcessingRequest request) {
@ -43,7 +54,7 @@ public class UnifiedRuntimeEventAssemblyService {
? loadExpandedVehicleEvents(request, discoveredVehicles) ? loadExpandedVehicleEvents(request, discoveredVehicles)
: List.of(); : List.of();
List<EventHubEventDto> aggregatedEvents = expandVehicleEvents List<EventHubEventDto> aggregatedEvents = expandVehicleEvents
? deduplicateAndSort(driverSeedEvents, expandedVehicleEvents) ? eventAggregationService.aggregateRuntimeEvents(driverSeedEvents, expandedVehicleEvents)
: driverSeedEvents; : driverSeedEvents;
List<String> notes = new ArrayList<>(); List<String> notes = new ArrayList<>();
@ -76,6 +87,7 @@ public class UnifiedRuntimeEventAssemblyService {
} else { } else {
notes.add("Vehicle expansion was disabled for this runtime request."); notes.add("Vehicle expansion was disabled for this runtime request.");
} }
notes.add("Runtime aggregation removes repeated reads and duplicate serialized representations of the same extraction observation while preserving card/VU equivalents and CARD_VEHICLES_USED/IW_CYCLE evidence for later processing.");
notes.add("The assembled event set is a broad aggregated runtime scope; semantic card/VU mixing and interval reconciliation are performed by later modules."); notes.add("The assembled event set is a broad aggregated runtime scope; semantic card/VU mixing and interval reconciliation are performed by later modules.");
LOG.info( LOG.info(
"Runtime event assembly completed (expandVehicleEvents: {}, sourceInputs: {}, driverSeedEvents: {}, discoveredVehicles: {}, expandedVehicleEvents: {}, aggregatedEvents: {})", "Runtime event assembly completed (expandVehicleEvents: {}, sourceInputs: {}, driverSeedEvents: {}, discoveredVehicles: {}, expandedVehicleEvents: {}, aggregatedEvents: {})",
@ -107,7 +119,7 @@ public class UnifiedRuntimeEventAssemblyService {
result.addAll(loader.loadDriverEvents(sourceRequest)); result.addAll(loader.loadDriverEvents(sourceRequest));
} }
} }
return deduplicateAndSort(result, List.of()); return eventAggregationService.aggregateRuntimeEvents(result);
} }
private List<EventHubEventDto> loadExpandedVehicleEvents( private List<EventHubEventDto> loadExpandedVehicleEvents(
@ -126,7 +138,7 @@ public class UnifiedRuntimeEventAssemblyService {
} }
} }
} }
return deduplicateAndSort(result, List.of()); return eventAggregationService.aggregateRuntimeEvents(result);
} }
private List<UnifiedDiscoveredVehicleRef> discoverVehicles(List<EventHubEventDto> events) { private List<UnifiedDiscoveredVehicleRef> discoverVehicles(List<EventHubEventDto> events) {
@ -166,28 +178,6 @@ public class UnifiedRuntimeEventAssemblyService {
return List.copyOf(result); return List.copyOf(result);
} }
private List<EventHubEventDto> deduplicateAndSort(
List<EventHubEventDto> left,
List<EventHubEventDto> right
) {
LinkedHashMap<String, EventHubEventDto> byKey = new LinkedHashMap<>();
appendDeduplicated(byKey, left);
appendDeduplicated(byKey, right);
return byKey.values().stream()
.sorted(Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name())
.thenComparing(event -> event.eventType() == null ? "" : event.eventType().name())
.thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name())
.thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo)))
.toList();
}
private void appendDeduplicated(LinkedHashMap<String, EventHubEventDto> byKey, List<EventHubEventDto> events) {
for (EventHubEventDto event : events) {
byKey.putIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), event);
}
}
private RuntimeDriverEventLoader driverLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { private RuntimeDriverEventLoader driverLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return driverEventLoaders.stream() return driverEventLoaders.stream()
.filter(loader -> loader.supports(request, sourceFamily)) .filter(loader -> loader.supports(request, sourceFamily))

View File

@ -16,6 +16,7 @@ import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.service.RuntimeDriverEventLoader; import at.procon.eventhub.processing.service.RuntimeDriverEventLoader;
import at.procon.eventhub.processing.service.RuntimeEventAggregationService;
import at.procon.eventhub.processing.service.RuntimeVehicleEventLoader; import at.procon.eventhub.processing.service.RuntimeVehicleEventLoader;
import at.procon.eventhub.reference.TachographNationRegistry; import at.procon.eventhub.reference.TachographNationRegistry;
import at.procon.eventhub.tachograph.dto.TachographImportRequest; import at.procon.eventhub.tachograph.dto.TachographImportRequest;
@ -30,6 +31,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
@ -45,15 +47,28 @@ public class TachographDbRuntimeEventLoader implements RuntimeDriverEventLoader,
private final NamedParameterJdbcTemplate jdbcTemplate; private final NamedParameterJdbcTemplate jdbcTemplate;
private final TachographExtractionDefinitionRegistry definitionRegistry; private final TachographExtractionDefinitionRegistry definitionRegistry;
private final ResourceLoader resourceLoader; private final ResourceLoader resourceLoader;
private final RuntimeEventAggregationService eventAggregationService;
@Autowired
public TachographDbRuntimeEventLoader( public TachographDbRuntimeEventLoader(
@Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate, @Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate,
TachographExtractionDefinitionRegistry definitionRegistry, TachographExtractionDefinitionRegistry definitionRegistry,
ResourceLoader resourceLoader ResourceLoader resourceLoader,
RuntimeEventAggregationService eventAggregationService
) { ) {
this.jdbcTemplate = jdbcTemplate; this.jdbcTemplate = jdbcTemplate;
this.definitionRegistry = definitionRegistry; this.definitionRegistry = definitionRegistry;
this.resourceLoader = resourceLoader; this.resourceLoader = resourceLoader;
this.eventAggregationService = eventAggregationService;
}
/** Compatibility constructor retained for direct construction. */
public TachographDbRuntimeEventLoader(
NamedParameterJdbcTemplate jdbcTemplate,
TachographExtractionDefinitionRegistry definitionRegistry,
ResourceLoader resourceLoader
) {
this(jdbcTemplate, definitionRegistry, resourceLoader, new RuntimeEventAggregationService());
} }
@Override @Override
@ -74,7 +89,7 @@ public class TachographDbRuntimeEventLoader implements RuntimeDriverEventLoader,
request.occurredTo() request.occurredTo()
)); ));
} }
return List.copyOf(result); return eventAggregationService.aggregateRuntimeEvents(result);
} }
@ -93,7 +108,7 @@ public class TachographDbRuntimeEventLoader implements RuntimeDriverEventLoader,
request.vehicleOccurredTo() request.vehicleOccurredTo()
)); ));
} }
return List.copyOf(result); return eventAggregationService.aggregateRuntimeEvents(result);
} }
private List<EventHubEventDto> queryDefinition( private List<EventHubEventDto> queryDefinition(

View File

@ -11,14 +11,15 @@ import at.procon.eventhub.processing.service.RuntimeDriverEventLoader;
import at.procon.eventhub.processing.service.RuntimeVehicleEventLoader; import at.procon.eventhub.processing.service.RuntimeVehicleEventLoader;
import at.procon.eventhub.processing.service.UnifiedDriverEventSourceService; import at.procon.eventhub.processing.service.UnifiedDriverEventSourceService;
import at.procon.eventhub.processing.service.UnifiedVehicleEventSourceService; import at.procon.eventhub.processing.service.UnifiedVehicleEventSourceService;
import at.procon.eventhub.processing.service.RuntimeEventAggregationService;
import at.procon.eventhub.service.EventAcquisitionRecordKeyService; import at.procon.eventhub.service.EventAcquisitionRecordKeyService;
import at.procon.eventhub.service.EventHubEventSorter; import at.procon.eventhub.service.EventHubEventSorter;
import at.procon.eventhub.tachographfilesession.service.TachographCompositeSessionNotFoundException; import at.procon.eventhub.tachographfilesession.service.TachographCompositeSessionNotFoundException;
import at.procon.eventhub.tachographfilesession.service.TachographCompositeSessionRepository; import at.procon.eventhub.tachographfilesession.service.TachographCompositeSessionRepository;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
@ -27,21 +28,35 @@ public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEve
private final UnifiedDriverEventSourceService driverEventSourceService; private final UnifiedDriverEventSourceService driverEventSourceService;
private final UnifiedVehicleEventSourceService vehicleEventSourceService; private final UnifiedVehicleEventSourceService vehicleEventSourceService;
private final TachographCompositeSessionRepository compositeSessionRepository; private final TachographCompositeSessionRepository compositeSessionRepository;
private final EventAcquisitionRecordKeyService eventKeyService; private final RuntimeEventAggregationService eventAggregationService;
private final EventHubEventSorter eventSorter;
@Autowired
public TachographFileSessionRuntimeEventLoader( public TachographFileSessionRuntimeEventLoader(
UnifiedDriverEventSourceService driverEventSourceService, UnifiedDriverEventSourceService driverEventSourceService,
UnifiedVehicleEventSourceService vehicleEventSourceService, UnifiedVehicleEventSourceService vehicleEventSourceService,
TachographCompositeSessionRepository compositeSessionRepository, TachographCompositeSessionRepository compositeSessionRepository,
EventAcquisitionRecordKeyService eventKeyService, RuntimeEventAggregationService eventAggregationService
EventHubEventSorter eventSorter
) { ) {
this.driverEventSourceService = driverEventSourceService; this.driverEventSourceService = driverEventSourceService;
this.vehicleEventSourceService = vehicleEventSourceService; this.vehicleEventSourceService = vehicleEventSourceService;
this.compositeSessionRepository = compositeSessionRepository; this.compositeSessionRepository = compositeSessionRepository;
this.eventKeyService = eventKeyService; this.eventAggregationService = eventAggregationService;
this.eventSorter = eventSorter; }
/** Compatibility constructor retained for existing tests and direct construction. */
public TachographFileSessionRuntimeEventLoader(
UnifiedDriverEventSourceService driverEventSourceService,
UnifiedVehicleEventSourceService vehicleEventSourceService,
TachographCompositeSessionRepository compositeSessionRepository,
EventAcquisitionRecordKeyService ignoredEventKeyService,
EventHubEventSorter ignoredEventSorter
) {
this(
driverEventSourceService,
vehicleEventSourceService,
compositeSessionRepository,
new RuntimeEventAggregationService()
);
} }
@Override @Override
@ -64,7 +79,7 @@ public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEve
) )
)); ));
} }
return deduplicateBySignatureAndSort(result); return eventAggregationService.aggregateRuntimeEvents(result);
} }
@Override @Override
@ -87,7 +102,7 @@ public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEve
) )
)); ));
} }
return deduplicateBySignatureAndSort(result); return eventAggregationService.aggregateRuntimeEvents(result);
} }
private List<UUID> resolveSessionIds(UnifiedRuntimeProcessingRequest request) { private List<UUID> resolveSessionIds(UnifiedRuntimeProcessingRequest request) {
@ -99,11 +114,5 @@ public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEve
return request.sessionIds(); return request.sessionIds();
} }
private List<EventHubEventDto> deduplicateBySignatureAndSort(List<EventHubEventDto> events) {
LinkedHashMap<String, EventHubEventDto> bySignature = new LinkedHashMap<>();
for (EventHubEventDto event : events) {
bySignature.putIfAbsent(eventKeyService.buildEventSignatureHash(event), event);
}
return eventSorter.sort(new ArrayList<>(bySignature.values()));
}
} }

View File

@ -0,0 +1,326 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDetailsDto;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourcePackageRefDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.OffsetDateTime;
import java.util.List;
import org.junit.jupiter.api.Test;
class RuntimeTachographRepresentationParityTest {
private static final OffsetDateTime OCCURRED_AT = OffsetDateTime.parse("2026-04-01T08:00:00Z");
private final RuntimeEventDescriptorFactory descriptorFactory = new RuntimeEventDescriptorFactory();
private final RuntimeEventMixingService mixingService = new RuntimeEventMixingService();
@Test
void producesSameCanonicalSourceProfileForDbAndFileSessionPlaceRepresentations() {
EventHubEventDto dbEvent = event(
Representation.DB,
"DRIVER_CARD",
"CARD_PLACE",
"TACHOGRAPH:CARD_PLACE:10",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.START
);
EventHubEventDto fileSessionEvent = event(
Representation.FILE_SESSION,
"DRIVER_CARD",
null,
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:place-10:BEGIN:2026-04-01T08:00:00Z",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN
);
RuntimeEventDescriptor dbDescriptor = descriptorFactory.describe(dbEvent);
RuntimeEventDescriptor fileDescriptor = descriptorFactory.describe(fileSessionEvent);
assertThat(dbDescriptor.sourceProfile()).isEqualTo(new RuntimeEventSourceProfile(
"TACHOGRAPH",
"DRIVER_CARD",
"CARD_PLACE"
));
assertThat(fileDescriptor.sourceProfile()).isEqualTo(dbDescriptor.sourceProfile());
assertThat(fileDescriptor.compatibleSupportEvidenceKey())
.isEqualTo(dbDescriptor.compatibleSupportEvidenceKey());
}
@Test
void infersSameVehicleUsageExtractionProfilesForDbAndFileSessionRepresentations() {
EventHubEventDto dbCvu = event(
Representation.DB,
"DRIVER_CARD",
"CARD_VEHICLES_USED",
"TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT",
EventDomain.DRIVER_CARD,
EventType.CARD_INSERTED,
EventLifecycle.INSERT
);
EventHubEventDto fileCvu = event(
Representation.FILE_SESSION,
"DRIVER_CARD",
null,
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:CARD_USAGE:cvu-10:INSERT:2026-04-01T08:00:00Z",
EventDomain.DRIVER_CARD,
EventType.CARD_INSERTED,
EventLifecycle.INSERT
);
EventHubEventDto dbIw = event(
Representation.DB,
"VEHICLE_UNIT",
"IW_CYCLE",
"TACHOGRAPH:IW_CYCLE:20:INSERT",
EventDomain.DRIVER_CARD,
EventType.CARD_INSERTED,
EventLifecycle.INSERT
);
EventHubEventDto fileIw = event(
Representation.FILE_SESSION,
"VEHICLE_UNIT",
null,
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:CARD_USAGE:iw-20:INSERT:2026-04-01T08:00:00Z",
EventDomain.DRIVER_CARD,
EventType.CARD_INSERTED,
EventLifecycle.INSERT
);
assertThat(descriptorFactory.sourceProfile(fileCvu))
.isEqualTo(descriptorFactory.sourceProfile(dbCvu));
assertThat(descriptorFactory.sourceProfile(fileIw))
.isEqualTo(descriptorFactory.sourceProfile(dbIw));
assertThat(descriptorFactory.sourceProfile(dbCvu).extractionCode())
.isEqualTo("CARD_VEHICLES_USED");
assertThat(descriptorFactory.sourceProfile(dbIw).extractionCode())
.isEqualTo("IW_CYCLE");
}
@Test
void producesEquivalentMixingOutcomeForDbAndFileSessionSupportPairs() {
RuntimeMixedEventBundle dbMixed = mixingService.mix(
List.of(
event(
Representation.DB,
"DRIVER_CARD",
"CARD_PLACE",
"TACHOGRAPH:CARD_PLACE:10",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.START
),
event(
Representation.DB,
"VEHICLE_UNIT",
"VU_PLACE",
"TACHOGRAPH:VU_PLACE:20",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.START
)
),
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
);
RuntimeMixedEventBundle fileMixed = mixingService.mix(
List.of(
event(
Representation.FILE_SESSION,
"DRIVER_CARD",
null,
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-place-10:BEGIN:2026-04-01T08:00:00Z",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN
),
event(
Representation.FILE_SESSION,
"VEHICLE_UNIT",
null,
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-place-20:BEGIN:2026-04-01T08:00:00Z",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN
)
),
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
);
assertThat(fileMixed.supportEvidenceEvents()).hasSameSizeAs(dbMixed.supportEvidenceEvents());
assertThat(fileMixed.suppressedEvents()).hasSameSizeAs(dbMixed.suppressedEvents());
assertThat(fileMixed.eventMixingDecisions()).hasSameSizeAs(dbMixed.eventMixingDecisions());
assertThat(fileMixed.eventMixingDecisions().getFirst().primaryExtractionCode())
.isEqualTo(dbMixed.eventMixingDecisions().getFirst().primaryExtractionCode());
assertThat(fileMixed.eventMixingDecisions().getFirst().secondaryExtractionCodes())
.isEqualTo(dbMixed.eventMixingDecisions().getFirst().secondaryExtractionCodes());
assertThat(fileMixed.eventMixingDecisions().getFirst().ruleId())
.isEqualTo(dbMixed.eventMixingDecisions().getFirst().ruleId());
}
@Test
void producesEquivalentMixingOutcomeForDbAndFileSessionActivityPairs() {
RuntimeMixedEventBundle dbMixed = mixingService.mix(
List.of(
event(Representation.DB, "DRIVER_CARD", "CARD_ACTIVITY",
"TACHOGRAPH:CARD_ACTIVITY:10:START",
EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.START),
event(Representation.DB, "VEHICLE_UNIT", "VU_ACTIVITY",
"TACHOGRAPH:VU_ACTIVITY:20:START",
EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.START)
),
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
);
RuntimeMixedEventBundle fileMixed = mixingService.mix(
List.of(
event(Representation.FILE_SESSION, "DRIVER_CARD", null,
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:ACTIVITY:card-10:START:2026-04-01T08:00:00Z",
EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.START),
event(Representation.FILE_SESSION, "VEHICLE_UNIT", null,
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:ACTIVITY:vu-20:START:2026-04-01T08:00:00Z",
EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.START)
),
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
);
assertThat(fileMixed.activityTimelineEvents()).hasSameSizeAs(dbMixed.activityTimelineEvents());
assertThat(fileMixed.suppressedEvents()).hasSameSizeAs(dbMixed.suppressedEvents());
assertThat(fileMixed.eventMixingDecisions()).hasSameSizeAs(dbMixed.eventMixingDecisions());
assertThat(fileMixed.eventMixingDecisions().getFirst().primaryExtractionCode())
.isEqualTo(dbMixed.eventMixingDecisions().getFirst().primaryExtractionCode());
assertThat(fileMixed.eventMixingDecisions().getFirst().secondaryExtractionCodes())
.isEqualTo(dbMixed.eventMixingDecisions().getFirst().secondaryExtractionCodes());
}
@Test
void preservesEquivalentCvuAndIwCycleInputsForBothRepresentations() {
List<EventHubEventDto> dbEvents = List.of(
event(Representation.DB, "DRIVER_CARD", "CARD_VEHICLES_USED",
"TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT",
EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT),
event(Representation.DB, "DRIVER_CARD", "CARD_VEHICLES_USED",
"TACHOGRAPH:CARD_VEHICLES_USED:10:WITHDRAW",
EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW),
event(Representation.DB, "VEHICLE_UNIT", "IW_CYCLE",
"TACHOGRAPH:IW_CYCLE:20:INSERT",
EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT),
event(Representation.DB, "VEHICLE_UNIT", "IW_CYCLE",
"TACHOGRAPH:IW_CYCLE:20:WITHDRAW",
EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW)
);
List<EventHubEventDto> fileEvents = List.of(
event(Representation.FILE_SESSION, "DRIVER_CARD", null,
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:CARD_USAGE:cvu-10:INSERT:2026-04-01T08:00:00Z",
EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT),
event(Representation.FILE_SESSION, "DRIVER_CARD", null,
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:CARD_USAGE:cvu-10:WITHDRAW:2026-04-01T08:00:00Z",
EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW),
event(Representation.FILE_SESSION, "VEHICLE_UNIT", null,
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:CARD_USAGE:iw-20:INSERT:2026-04-01T08:00:00Z",
EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT),
event(Representation.FILE_SESSION, "VEHICLE_UNIT", null,
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:CARD_USAGE:iw-20:WITHDRAW:2026-04-01T08:00:00Z",
EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW)
);
RuntimeMixedEventBundle dbMixed = mixingService.mix(
dbEvents, RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
RuntimeMixedEventBundle fileMixed = mixingService.mix(
fileEvents, RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(dbMixed.vehicleUsageEvents()).hasSize(4);
assertThat(fileMixed.vehicleUsageEvents()).hasSize(4);
assertThat(dbMixed.suppressedEvents()).isEmpty();
assertThat(fileMixed.suppressedEvents()).isEmpty();
assertThat(fileEvents).extracting(event -> descriptorFactory.sourceProfile(event).extractionCode())
.containsExactly("CARD_VEHICLES_USED", "CARD_VEHICLES_USED", "IW_CYCLE", "IW_CYCLE");
}
private EventHubEventDto event(
Representation representation,
String sourceKind,
String extractionCode,
String externalSourceEventId,
EventDomain domain,
EventType type,
EventLifecycle lifecycle
) {
String provider = representation == Representation.DB ? "TACHOGRAPH" : "TACHOGRAPH_FILE_SESSION";
EventSourceDto source = new EventSourceDto(
provider,
sourceKind,
provider + "_" + sourceKind,
null,
null,
null
);
EventHubPackageRequest packageInfo = new EventHubPackageRequest(
"default",
source,
null,
ImportScopeDto.tenantAll(null, null),
domain.name(),
null,
provider + ":package"
);
ObjectNode raw = JsonNodeFactory.instance.objectNode();
raw.put("sourceKind", sourceKind);
raw.put("driverKey", "12:123");
raw.put("registrationKey", "12:REG-1");
if (extractionCode != null) {
raw.put("extractionCode", extractionCode);
}
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
return new EventHubEventDto(
null,
externalSourceEventId,
new DriverRefDto("driver-1", new DriverCardRefDto("12", "123")),
new VehicleRefDto(
sourceKind.equals("VEHICLE_UNIT") ? "vehicle-1" : null,
sourceKind.equals("VEHICLE_UNIT") ? "VIN-1" : null,
new VehicleRegistrationRefDto("12", "REG-1")
),
OCCURRED_AT,
null,
null,
domain,
type,
lifecycle,
null,
null,
new EventDetailsDto(domain.name(), JsonNodeFactory.instance.objectNode()),
new SourcePackageRefDto(
representation == Representation.DB ? sourceKind : "TACHOGRAPH_FILE_SESSION",
representation == Representation.DB ? "package-1" : "session-1",
sourceKind.equals("VEHICLE_UNIT") ? "vehicle-1" : "card-1",
null,
null,
null
),
payload,
false,
packageInfo
);
}
private enum Representation {
DB,
FILE_SESSION
}
}

View File

@ -0,0 +1,202 @@
package at.procon.eventhub.processing.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDetailsDto;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.OffsetDateTime;
import java.util.List;
import org.junit.jupiter.api.Test;
class RuntimeEventAggregationServiceTest {
private final RuntimeEventAggregationService service = new RuntimeEventAggregationService();
@Test
void removesOnlyRepeatedReadsOfSameSourceRecord() {
EventHubEventDto cardPosition = event(
"DRIVER_CARD",
"CARD_POSITION",
"TACHOGRAPH:CARD_POSITION:10",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT
);
EventHubEventDto vuPosition = event(
"VEHICLE_UNIT",
"VU_POSITION",
"TACHOGRAPH:VU_POSITION:20",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT
);
EventHubEventDto cvu = event(
"DRIVER_CARD",
"CARD_VEHICLES_USED",
"TACHOGRAPH:CARD_VEHICLES_USED:30:INSERT",
EventDomain.DRIVER_CARD,
EventType.CARD_INSERTED,
EventLifecycle.INSERT
);
EventHubEventDto iwCycle = event(
"VEHICLE_UNIT",
"IW_CYCLE",
"TACHOGRAPH:IW_CYCLE:40:INSERT",
EventDomain.DRIVER_CARD,
EventType.CARD_INSERTED,
EventLifecycle.INSERT
);
List<EventHubEventDto> aggregated = service.aggregateRuntimeEvents(
List.of(cardPosition, vuPosition, cvu, iwCycle),
List.of(cardPosition)
);
assertThat(aggregated).hasSize(4);
assertThat(aggregated).extracting(EventHubEventDto::externalSourceEventId)
.containsExactlyInAnyOrder(
cardPosition.externalSourceEventId(),
vuPosition.externalSourceEventId(),
cvu.externalSourceEventId(),
iwCycle.externalSourceEventId()
);
}
@Test
void collapsesDuplicateSerializedRepresentationsOfSameExtractionObservation() {
EventHubEventDto first = event(
"DRIVER_CARD",
"CARD_POSITION",
"TACHOGRAPH:CARD_POSITION:10",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT
);
EventHubEventDto serializedCopy = event(
"DRIVER_CARD",
"CARD_POSITION",
"TACHOGRAPH:CARD_POSITION:COPY-10",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT
);
assertThat(service.aggregateRuntimeEvents(List.of(first, serializedCopy)))
.containsExactly(first);
}
@Test
void collapsesPlaceStartAndBeginRepresentationsForSameExtractionSource() {
EventHubEventDto dbStyle = event(
"DRIVER_CARD",
"CARD_PLACE",
"TACHOGRAPH:CARD_PLACE:10",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.START
);
EventHubEventDto fileStyle = event(
"DRIVER_CARD",
"CARD_PLACE",
"TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:place-10:BEGIN:2026-04-01T08:00:00Z",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN
);
assertThat(service.aggregateRuntimeEvents(List.of(dbStyle, fileStyle)))
.containsExactly(dbStyle);
}
@Test
void keepsDifferentExtractionSourcesEvenWhenSemanticEventDataIsEqual() {
EventHubEventDto card = event(
"DRIVER_CARD",
"CARD_ACTIVITY",
"TACHOGRAPH:CARD_ACTIVITY:10:START",
EventDomain.DRIVER_ACTIVITY,
EventType.DRIVE,
EventLifecycle.START
);
EventHubEventDto vu = event(
"VEHICLE_UNIT",
"VU_ACTIVITY",
"TACHOGRAPH:VU_ACTIVITY:20:START",
EventDomain.DRIVER_ACTIVITY,
EventType.DRIVE,
EventLifecycle.START
);
assertThat(service.aggregateRuntimeEvents(List.of(card, vu)))
.containsExactly(card, vu);
assertThat(service.exactSourceRecordKey(card))
.isNotEqualTo(service.exactSourceRecordKey(vu));
}
private EventHubEventDto event(
String sourceKind,
String extractionCode,
String externalSourceEventId,
EventDomain domain,
EventType eventType,
EventLifecycle lifecycle
) {
EventSourceDto source = new EventSourceDto(
"TACHOGRAPH",
sourceKind,
"TACHOGRAPH_" + sourceKind,
null,
null,
null
);
EventHubPackageRequest packageInfo = new EventHubPackageRequest(
"default",
source,
null,
ImportScopeDto.tenantAll(null, null),
domain.name(),
null,
"TACHOGRAPH:package"
);
ObjectNode raw = JsonNodeFactory.instance.objectNode();
raw.put("sourceKind", sourceKind);
raw.put("extractionCode", extractionCode);
raw.put("driverKey", "12:123");
raw.put("registrationKey", "12:REG-1");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
return new EventHubEventDto(
null,
externalSourceEventId,
new DriverRefDto("driver-1", new DriverCardRefDto("12", "123")),
new VehicleRefDto("vehicle-1", "VIN-1", new VehicleRegistrationRefDto("12", "REG-1")),
OffsetDateTime.parse("2026-04-01T08:00:00Z"),
null,
null,
domain,
eventType,
lifecycle,
null,
null,
new EventDetailsDto(domain.name(), JsonNodeFactory.instance.objectNode()),
null,
payload,
false,
packageInfo
);
}
}