From 27b411e647ab099c7e9ebdcc4269fc7c7677a4e3 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 25 May 2026 23:12:49 +0200 Subject: [PATCH] Add runtime support evidence normalization --- docs/runtime-event-processing.md | 35 ++ ...ntime-tachograph-esper-scope-processing.md | 6 + ...verEsperRuntimeEventProcessingProfile.java | 3 +- .../support/RuntimeSupportEvidenceEvent.java | 40 ++ ...imeSupportEvidenceNormalizationResult.java | 19 + .../RuntimeSupportEvidenceNormalizer.java | 436 ++++++++++++++++++ ...nifiedRuntimeDerivedProjectionService.java | 25 +- .../RuntimeSupportEvidenceNormalizerTest.java | 161 +++++++ 8 files changed, 718 insertions(+), 7 deletions(-) create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceEvent.java create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizationResult.java create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizer.java create mode 100644 src/test/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizerTest.java diff --git a/docs/runtime-event-processing.md b/docs/runtime-event-processing.md index d395fb6..c92cf03 100644 --- a/docs/runtime-event-processing.md +++ b/docs/runtime-event-processing.md @@ -351,3 +351,38 @@ supportGeoEvents ``` For a single session, this is a direct parity check against the original file-session endpoint. For multiple sessions, the reference side is the sum of the individual file-session endpoint results per driver; runtime processing may intentionally deduplicate or merge across session boundaries, so differences should be reviewed with the debug/audit output. + +## Runtime support evidence normalization + +The tachograph profile now normalizes mixed-source support events before invoking the shared Esper core: + +```text +runtime partition events + -> RuntimeSupportEvidenceNormalizer + -> tachograph-consumable support evidence view + -> event-input Esper preprocessor / driving-derived bundle +``` + +The normalizer does not change driver activity events or driver-card usage events. It only adapts support/vehicle events that carry geo or odometer evidence. Provider-specific semantics are preserved in the payload under `raw.originalEventDomain`, `raw.originalEventType`, `raw.originalLifecycle`, `raw.supportEventDomain`, and `raw.supportEventType`. + +Examples: + +```text +IGNITION / IGNITION_ON with position + -> POSITION / POSITION_RECORDED / SNAPSHOT + -> raw.supportEventType = IGNITION_ON + +TELEMATICS_DATA with position + -> POSITION / POSITION_RECORDED / SNAPSHOT + -> raw.supportEventType = TELEMATICS_DATA + +BORDER_CROSSING with position + -> BORDER_CROSSING, preserving the original border event type/lifecycle + +LOAD_UNLOAD with position + -> LOAD_UNLOAD, preserving the original load/unload event type/lifecycle +``` + +This keeps the EPL rules provider-neutral. YellowFox, tachograph VU, or future telematics events are converted to the common support-evidence shape before the tachograph profile consumes them. + +Runtime result notes include how many events were inspected and how many support events were adapted. Use partition debug together with normalization notes when validating mixed-source attribution. diff --git a/docs/runtime-tachograph-esper-scope-processing.md b/docs/runtime-tachograph-esper-scope-processing.md index b06419c..967ba1c 100644 --- a/docs/runtime-tachograph-esper-scope-processing.md +++ b/docs/runtime-tachograph-esper-scope-processing.md @@ -52,3 +52,9 @@ Attachment is temporal: a vehicle-only event must match a reconstructed driver v ## Debugging vehicle evidence attachment Prefer the generic `/api/eventhub/runtime-processing/event-processing` endpoint with `partitioning.includeDebug=true` or `parameters.includePartitionDebug=true`. The compatibility response type has `partitionDebugByDriver`, but the generic endpoint is the preferred way to enable debug output explicitly. The generic response exposes debug data under `partitionResults[*].metadata.partitionDebug`. + +## Support evidence normalization + +The tachograph runtime profile is now implemented as a specialization of the generic runtime event-processing framework. Before calling the shared tachograph Esper core, mixed support events are normalized by `RuntimeSupportEvidenceNormalizer`. + +This means that attached vehicle-only evidence from sources such as YellowFox ignition/position events can be consumed by the tachograph profile as support geo evidence when the event contains a position or odometer value. The provider-specific event meaning is preserved in the raw payload, while the Esper-facing event domain is adapted to the common tachograph support evidence contract. diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/profile/TachographDriverEsperRuntimeEventProcessingProfile.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/profile/TachographDriverEsperRuntimeEventProcessingProfile.java index d46e963..0cd5c23 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/profile/TachographDriverEsperRuntimeEventProcessingProfile.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/profile/TachographDriverEsperRuntimeEventProcessingProfile.java @@ -45,7 +45,8 @@ public class TachographDriverEsperRuntimeEventProcessingProfile implements Runti @Override public String description() { return "Runs the shared tachograph driver Esper processing pipeline over Runtime Processing event scopes. " - + "The profile partitions mixed runtime events by driver before invoking the event-input EPL pipeline."; + + "The profile partitions mixed runtime events by driver, attaches vehicle evidence by vehicle/time, " + + "normalizes mixed-source support evidence, and then invokes the event-input EPL pipeline."; } @Override diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceEvent.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceEvent.java new file mode 100644 index 0000000..40ebed7 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceEvent.java @@ -0,0 +1,40 @@ +package at.procon.eventhub.processing.eventprocessing.support; + +import java.math.BigDecimal; +import java.time.OffsetDateTime; +import java.util.Map; + +/** + * Source-neutral support-evidence view used before adapting mixed provider + * events into a concrete processing profile. The current tachograph Esper + * profile consumes this as geo support evidence when latitude/longitude are + * available, while the original provider semantics remain in rawAttributes. + */ +public record RuntimeSupportEvidenceEvent( + String eventId, + String sourceFamily, + String sourceKind, + String eventDomain, + String eventType, + String lifecycle, + String driverKey, + String vehicleKey, + String registrationKey, + OffsetDateTime occurredAt, + Long occurredAtEpochSecond, + BigDecimal latitude, + BigDecimal longitude, + String countryCode, + String regionCode, + String countryFrom, + String countryTo, + String operation, + Long odometerKm, + BigDecimal speedKmh, + BigDecimal maxSpeedKmh, + Map rawAttributes +) { + public RuntimeSupportEvidenceEvent { + rawAttributes = rawAttributes == null ? Map.of() : Map.copyOf(rawAttributes); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizationResult.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizationResult.java new file mode 100644 index 0000000..05ad99f --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizationResult.java @@ -0,0 +1,19 @@ +package at.procon.eventhub.processing.eventprocessing.support; + +import at.procon.eventhub.dto.EventHubEventDto; +import java.util.List; + +public record RuntimeSupportEvidenceNormalizationResult( + List normalizedEvents, + int inputEventCount, + int normalizedSupportEvidenceEventCount, + int unchangedEventCount, + List notes +) { + public RuntimeSupportEvidenceNormalizationResult { + normalizedEvents = normalizedEvents == null ? List.of() : List.copyOf(normalizedEvents); + normalizedSupportEvidenceEventCount = Math.max(0, normalizedSupportEvidenceEventCount); + unchangedEventCount = Math.max(0, unchangedEventCount); + notes = notes == null ? List.of() : List.copyOf(notes); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizer.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizer.java new file mode 100644 index 0000000..96da6cc --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizer.java @@ -0,0 +1,436 @@ +package at.procon.eventhub.processing.eventprocessing.support; + +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDetailsDto; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.GeoPointDto; +import at.procon.eventhub.dto.VehicleRefDto; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import org.springframework.stereotype.Service; + +@Service +public class RuntimeSupportEvidenceNormalizer { + + private static final LinkedHashSet DIRECT_TACHOGRAPH_SUPPORT_DOMAINS = new LinkedHashSet<>(List.of( + EventDomain.POSITION, + EventDomain.PLACE, + EventDomain.BORDER_CROSSING, + EventDomain.LOAD_UNLOAD + )); + + private final ObjectMapper objectMapper; + + public RuntimeSupportEvidenceNormalizer(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + public RuntimeSupportEvidenceNormalizationResult normalizeForTachographDriver( + String driverKey, + List events + ) { + List safeEvents = events == null ? List.of() : List.copyOf(events); + List normalizedEvents = new ArrayList<>(safeEvents.size()); + int normalizedSupportEvidence = 0; + int unchanged = 0; + for (EventHubEventDto event : safeEvents) { + EventHubEventDto normalized = normalizeOneForTachographDriver(driverKey, event); + normalizedEvents.add(normalized); + if (normalized != event) { + normalizedSupportEvidence++; + } else { + unchanged++; + } + } + List notes = new ArrayList<>(); + notes.add("Runtime support evidence normalization inspected " + safeEvents.size() + " event(s)."); + notes.add("Runtime support evidence normalization adapted " + normalizedSupportEvidence + + " support/vehicle event(s) for the tachograph Esper profile."); + return new RuntimeSupportEvidenceNormalizationResult( + normalizedEvents, + safeEvents.size(), + normalizedSupportEvidence, + unchanged, + notes + ); + } + + public RuntimeSupportEvidenceEvent toSupportEvidenceEvent(String fallbackDriverKey, EventHubEventDto event) { + if (event == null || isDriverActivityOrCardUsage(event)) { + return null; + } + JsonNode raw = rawPayload(event); + GeoPointDto position = event.position(); + BigDecimal latitude = position == null ? decimal(raw, "latitude") : position.latitude(); + BigDecimal longitude = position == null ? decimal(raw, "longitude") : position.longitude(); + Long odometerKm = firstNonNull(longValue(raw, "odometerKm"), toKilometers(event.odometerM())); + return new RuntimeSupportEvidenceEvent( + firstNonBlank(text(raw, "supportEventId"), text(raw, "sourceRowId"), event.externalSourceEventId()), + sourceFamily(event), + firstNonBlank(text(raw, "sourceKind"), sourceKind(event)), + event.eventDomain() == null ? null : event.eventDomain().name(), + event.eventType() == null ? null : event.eventType().name(), + event.lifecycle() == null ? null : event.lifecycle().name(), + firstNonBlank(text(raw, "driverKey"), fallbackDriverKey, driverKey(event)), + firstNonBlank(text(raw, "vehicleKey"), vehicleKey(event)), + firstNonBlank(text(raw, "registrationKey"), registrationKey(event)), + event.occurredAt(), + event.occurredAt() == null ? null : event.occurredAt().toEpochSecond(), + latitude, + longitude, + firstNonBlank(text(raw, "country"), detailText(event, "country")), + firstNonBlank(text(raw, "region"), detailText(event, "region")), + firstNonBlank(text(raw, "countryFrom"), detailText(event, "countryFrom")), + firstNonBlank(text(raw, "countryTo"), detailText(event, "countryTo")), + firstNonBlank(text(raw, "operation"), detailText(event, "operation")), + odometerKm, + decimal(raw, "avgSpeedKmh"), + decimal(raw, "maxSpeedKmh"), + rawAttributes(event, raw) + ); + } + + private EventHubEventDto normalizeOneForTachographDriver(String fallbackDriverKey, EventHubEventDto event) { + if (event == null || isDriverActivityOrCardUsage(event)) { + return event; + } + RuntimeSupportEvidenceEvent support = toSupportEvidenceEvent(fallbackDriverKey, event); + if (support == null || !hasGeoOrOdometerEvidence(support)) { + return event; + } + EventDomain normalizedDomain = normalizedDomain(event); + EventType normalizedType = normalizedType(normalizedDomain, event.eventType()); + EventLifecycle normalizedLifecycle = normalizedLifecycle(normalizedDomain, event.lifecycle()); + JsonNode payload = normalizedPayload(fallbackDriverKey, event, support, normalizedDomain); + EventDetailsDto details = normalizedDetails(event, support, normalizedDomain); + DriverRefDto driverRef = event.driverRef(); + if ((driverRef == null || !driverRef.hasAnyReference()) && support.driverKey() != null) { + driverRef = new DriverRefDto(support.driverKey(), null); + } + return new EventHubEventDto( + event.eventId(), + event.externalSourceEventId(), + driverRef, + event.vehicleRef(), + event.occurredAt(), + event.receivedPartnerAt(), + event.receivedHubAt(), + normalizedDomain, + normalizedType, + normalizedLifecycle, + event.odometerM(), + normalizedPosition(event, support), + details, + event.sourcePackageRef(), + payload, + event.manualEntry(), + event.packageInfo() + ); + } + + private GeoPointDto normalizedPosition(EventHubEventDto original, RuntimeSupportEvidenceEvent support) { + if (original != null && original.position() != null) { + return original.position(); + } + if (support == null || support.latitude() == null || support.longitude() == null) { + return null; + } + return new GeoPointDto(support.latitude(), support.longitude()); + } + + private boolean isDriverActivityOrCardUsage(EventHubEventDto event) { + if (event == null) { + return true; + } + if (event.eventDomain() == EventDomain.DRIVER_ACTIVITY) { + return true; + } + return event.eventDomain() == EventDomain.DRIVER_CARD + && (event.eventType() == EventType.CARD_INSERTED || event.eventType() == EventType.CARD_WITHDRAWN); + } + + private boolean hasGeoOrOdometerEvidence(RuntimeSupportEvidenceEvent support) { + return support != null + && ((support.latitude() != null && support.longitude() != null) || support.odometerKm() != null); + } + + private EventDomain normalizedDomain(EventHubEventDto event) { + if (event.eventDomain() != null && DIRECT_TACHOGRAPH_SUPPORT_DOMAINS.contains(event.eventDomain())) { + return event.eventDomain(); + } + return EventDomain.POSITION; + } + + private EventType normalizedType(EventDomain normalizedDomain, EventType originalType) { + if (normalizedDomain == EventDomain.POSITION) { + return EventType.POSITION_RECORDED; + } + if (normalizedDomain == EventDomain.PLACE) { + return EventType.WORKING_DAY_PLACE_RECORDED; + } + if (normalizedDomain == EventDomain.BORDER_CROSSING) { + return originalType == null ? EventType.BORDER_INBOUND : originalType; + } + if (normalizedDomain == EventDomain.LOAD_UNLOAD) { + return originalType == null ? EventType.LOAD_UNLOAD : originalType; + } + return originalType == null ? EventType.UNKNOWN_EVENT : originalType; + } + + private EventLifecycle normalizedLifecycle(EventDomain normalizedDomain, EventLifecycle originalLifecycle) { + if (normalizedDomain == EventDomain.POSITION || normalizedDomain == EventDomain.PLACE) { + return EventLifecycle.SNAPSHOT; + } + return originalLifecycle == null ? EventLifecycle.SNAPSHOT : originalLifecycle; + } + + private EventDetailsDto normalizedDetails( + EventHubEventDto original, + RuntimeSupportEvidenceEvent support, + EventDomain normalizedDomain + ) { + Map attributes = new LinkedHashMap<>(); + put(attributes, "normalizedSupportEvidence", true); + put(attributes, "normalizedForProfile", "tachograph-driver-esper-v1"); + put(attributes, "originalEventDomain", support.eventDomain()); + put(attributes, "originalEventType", support.eventType()); + put(attributes, "originalLifecycle", support.lifecycle()); + put(attributes, "sourceFamily", support.sourceFamily()); + put(attributes, "sourceKind", support.sourceKind()); + put(attributes, "country", support.countryCode()); + put(attributes, "region", support.regionCode()); + put(attributes, "countryFrom", support.countryFrom()); + put(attributes, "countryTo", support.countryTo()); + put(attributes, "operation", support.operation()); + if (original.eventDetails() != null && original.eventDetails().attributes() != null) { + attributes.put("originalAttributes", original.eventDetails().attributes()); + } + String type = normalizedDomain == EventDomain.POSITION ? "POSITION" : normalizedDomain.name(); + return new EventDetailsDto(type, objectMapper.valueToTree(attributes)); + } + + private JsonNode normalizedPayload( + String fallbackDriverKey, + EventHubEventDto original, + RuntimeSupportEvidenceEvent support, + EventDomain normalizedDomain + ) { + ObjectNode root = objectMapper.createObjectNode(); + ObjectNode raw = root.putObject("raw"); + JsonNode originalRaw = rawPayload(original); + if (originalRaw != null && originalRaw.isObject()) { + originalRaw.fields().forEachRemaining(entry -> raw.set(entry.getKey(), entry.getValue())); + } + put(raw, "normalizedSupportEvidence", true); + put(raw, "normalizedForProfile", "tachograph-driver-esper-v1"); + put(raw, "supportEventId", support.eventId()); + put(raw, "supportEventDomain", support.eventDomain()); + put(raw, "supportEventType", support.eventType()); + put(raw, "supportEventLifecycle", support.lifecycle()); + put(raw, "originalEventDomain", support.eventDomain()); + put(raw, "originalEventType", support.eventType()); + put(raw, "originalLifecycle", support.lifecycle()); + put(raw, "normalizedEventDomain", normalizedDomain.name()); + put(raw, "driverKey", firstNonBlank(support.driverKey(), fallbackDriverKey)); + put(raw, "vehicleKey", support.vehicleKey()); + put(raw, "registrationKey", support.registrationKey()); + put(raw, "sourceFamily", support.sourceFamily()); + put(raw, "sourceKind", support.sourceKind()); + put(raw, "country", support.countryCode()); + put(raw, "region", support.regionCode()); + put(raw, "countryFrom", support.countryFrom()); + put(raw, "countryTo", support.countryTo()); + put(raw, "operation", support.operation()); + put(raw, "odometerKm", support.odometerKm()); + put(raw, "avgSpeedKmh", support.speedKmh()); + put(raw, "maxSpeedKmh", support.maxSpeedKmh()); + put(raw, "latitude", support.latitude()); + put(raw, "longitude", support.longitude()); + put(raw, "rawRecordPath", firstNonBlank(text(originalRaw, "rawRecordPath"), original.externalSourceEventId())); + return root; + } + + private Map rawAttributes(EventHubEventDto event, JsonNode raw) { + Map result = new LinkedHashMap<>(); + put(result, "externalSourceEventId", event == null ? null : event.externalSourceEventId()); + put(result, "eventDomain", event == null || event.eventDomain() == null ? null : event.eventDomain().name()); + put(result, "eventType", event == null || event.eventType() == null ? null : event.eventType().name()); + put(result, "lifecycle", event == null || event.lifecycle() == null ? null : event.lifecycle().name()); + if (raw != null) { + result.put("raw", raw); + } + return result; + } + + private String sourceFamily(EventHubEventDto event) { + if (event == null || event.packageInfo() == null) { + return null; + } + return event.packageInfo().eventFamily(); + } + + private String sourceKind(EventHubEventDto event) { + if (event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null) { + return null; + } + return event.packageInfo().eventSource().sourceKind(); + } + + private JsonNode rawPayload(EventHubEventDto event) { + if (event == null || event.payload() == null || event.payload().isNull() || event.payload().isMissingNode()) { + return null; + } + JsonNode raw = event.payload().get("raw"); + return raw == null || raw.isNull() ? event.payload() : raw; + } + + private String driverKey(EventHubEventDto event) { + if (event == null || event.driverRef() == null || !event.driverRef().hasAnyReference()) { + return null; + } + return event.driverRef().stableKey(); + } + + private String vehicleKey(EventHubEventDto event) { + JsonNode raw = rawPayload(event); + String rawVehicleKey = text(raw, "vehicleKey"); + if (rawVehicleKey != null) { + return rawVehicleKey; + } + VehicleRefDto vehicleRef = event == null ? null : event.vehicleRef(); + if (vehicleRef == null) { + return null; + } + if (vehicleRef.vin() != null) { + return vehicleRef.vin(); + } + if (vehicleRef.sourceVehicleEntityId() != null) { + return vehicleRef.sourceVehicleEntityId(); + } + return null; + } + + private String registrationKey(EventHubEventDto event) { + JsonNode raw = rawPayload(event); + String rawRegistrationKey = text(raw, "registrationKey"); + if (rawRegistrationKey != null) { + return rawRegistrationKey; + } + VehicleRefDto vehicleRef = event == null ? null : event.vehicleRef(); + if (vehicleRef == null) { + return null; + } + if (vehicleRef.vehicleRegistration() != null && vehicleRef.vehicleRegistration().hasValue()) { + return vehicleRef.vehicleRegistration().stableKey(); + } + return vehicleRef.sourceRegistrationEntityId(); + } + + private String detailText(EventHubEventDto event, String field) { + if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null) { + return null; + } + JsonNode value = event.eventDetails().attributes().get(field); + return value == null || value.isNull() ? null : value.asText(null); + } + + private String text(JsonNode node, String field) { + if (node == null || field == null) { + return null; + } + JsonNode value = node.get(field); + if (value == null || value.isNull()) { + return null; + } + String text = value.asText(null); + return text == null || text.isBlank() ? null : text.trim(); + } + + private Long longValue(JsonNode node, String field) { + if (node == null || field == null) { + return null; + } + JsonNode value = node.get(field); + if (value == null || value.isNull()) { + return null; + } + try { + return value.isNumber() ? value.asLong() : Long.parseLong(value.asText()); + } catch (NumberFormatException ex) { + return null; + } + } + + private BigDecimal decimal(JsonNode node, String field) { + if (node == null || field == null) { + return null; + } + JsonNode value = node.get(field); + if (value == null || value.isNull()) { + return null; + } + if (value.isNumber()) { + return value.decimalValue(); + } + String text = value.asText(null); + if (text == null || text.isBlank()) { + return null; + } + try { + return new BigDecimal(text.trim()); + } catch (NumberFormatException ex) { + return null; + } + } + + private Long toKilometers(Long meters) { + return meters == null ? null : meters / 1_000L; + } + + @SafeVarargs + private final T firstNonNull(T... values) { + if (values == null) { + return null; + } + for (T value : values) { + if (value != null) { + return value; + } + } + return null; + } + + private String firstNonBlank(String... values) { + if (values == null) { + return null; + } + for (String value : values) { + if (value != null && !value.isBlank()) { + return value.trim(); + } + } + return null; + } + + private void put(ObjectNode node, String field, Object value) { + if (node != null && field != null && value != null) { + node.set(field, objectMapper.valueToTree(value)); + } + } + + private void put(Map target, String field, Object value) { + if (target != null && field != null && value != null) { + target.put(field, value); + } + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDerivedProjectionService.java b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDerivedProjectionService.java index d5491d6..f7bd7ad 100644 --- a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDerivedProjectionService.java +++ b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDerivedProjectionService.java @@ -5,6 +5,8 @@ import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.processing.dto.UnifiedRuntimeDerivedProjectionResultDto; import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; +import at.procon.eventhub.processing.eventprocessing.support.RuntimeSupportEvidenceNormalizationResult; +import at.procon.eventhub.processing.eventprocessing.support.RuntimeSupportEvidenceNormalizer; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; import at.procon.eventhub.tachographfilesession.dto.TachographEsperDriverProcessingResultDto; @@ -43,6 +45,7 @@ public class UnifiedRuntimeDerivedProjectionService { private final DriverTimelineBuilder driverTimelineBuilder; private final DriverTimelineReusableProjectionBuilder reusableProjectionBuilder; private final TachographEsperProcessingCore esperProcessingCore; + private final RuntimeSupportEvidenceNormalizer supportEvidenceNormalizer; private final EventHubProperties properties; public UnifiedRuntimeDerivedProjectionService( @@ -50,7 +53,8 @@ public class UnifiedRuntimeDerivedProjectionService { UnifiedEventTimelineReconstructor timelineReconstructor, DriverTimelineBuilder driverTimelineBuilder, DriverTimelineReusableProjectionBuilder reusableProjectionBuilder, - EventHubProperties properties + EventHubProperties properties, + RuntimeSupportEvidenceNormalizer supportEvidenceNormalizer ) { this( runtimeEventAssemblyService, @@ -58,7 +62,8 @@ public class UnifiedRuntimeDerivedProjectionService { driverTimelineBuilder, reusableProjectionBuilder, properties, - new TachographEsperProcessingCore(driverTimelineBuilder, reusableProjectionBuilder, properties) + new TachographEsperProcessingCore(driverTimelineBuilder, reusableProjectionBuilder, properties), + supportEvidenceNormalizer ); } @@ -69,7 +74,8 @@ public class UnifiedRuntimeDerivedProjectionService { DriverTimelineBuilder driverTimelineBuilder, DriverTimelineReusableProjectionBuilder reusableProjectionBuilder, EventHubProperties properties, - TachographEsperProcessingCore esperProcessingCore + TachographEsperProcessingCore esperProcessingCore, + RuntimeSupportEvidenceNormalizer supportEvidenceNormalizer ) { this.runtimeEventAssemblyService = runtimeEventAssemblyService; this.timelineReconstructor = timelineReconstructor; @@ -77,6 +83,7 @@ public class UnifiedRuntimeDerivedProjectionService { this.reusableProjectionBuilder = reusableProjectionBuilder; this.properties = properties; this.esperProcessingCore = esperProcessingCore; + this.supportEvidenceNormalizer = supportEvidenceNormalizer; } public UnifiedRuntimeDerivedProjectionResultDto loadDriverDerivedProjections( @@ -96,10 +103,15 @@ public class UnifiedRuntimeDerivedProjectionService { String driverKey = explicitDriverKey == null ? resolveDriverKey(request, eventBundle.mergedEvents()) : explicitDriverKey; + RuntimeSupportEvidenceNormalizationResult normalizationResult = supportEvidenceNormalizer.normalizeForTachographDriver( + driverKey, + eventBundle.mergedEvents() + ); + List normalizedEvents = normalizationResult.normalizedEvents(); ResolvedDriverTimeline timeline = timelineReconstructor.reconstruct( runtimeSessionId(request), driverKey, - eventBundle.mergedEvents() + normalizedEvents ); OffsetDateTime requestedFrom = apiRequest.occurredFrom() == null @@ -120,7 +132,8 @@ public class UnifiedRuntimeDerivedProjectionService { : Math.max(1, apiRequest.minimumRestPeriodMinutes()); List notes = new ArrayList<>(eventBundle.notes()); - notes.add("Runtime derived projections were evaluated from the unified merged event stream using the shared tachograph Esper processing core."); + notes.addAll(normalizationResult.notes()); + notes.add("Runtime derived projections were evaluated from the unified merged event stream using normalized support evidence and the shared tachograph Esper processing core."); notes.add("Significant driving threshold minutes: " + significantDrivingMinutes + "."); notes.add("Minimum rest candidate period minutes: " + minimumRestPeriodMinutes + "."); if (request.occurredFrom() != null || request.occurredTo() != null) { @@ -131,7 +144,7 @@ public class UnifiedRuntimeDerivedProjectionService { runtimeSessionId(request), driverKey, timeline, - eventBundle.mergedEvents(), + normalizedEvents, requestedFrom, requestedTo, significantDrivingMinutes, diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizerTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizerTest.java new file mode 100644 index 0000000..79eee4e --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/support/RuntimeSupportEvidenceNormalizerTest.java @@ -0,0 +1,161 @@ +package at.procon.eventhub.processing.eventprocessing.support; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.GeoPointDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.math.BigDecimal; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class RuntimeSupportEvidenceNormalizerTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + private final RuntimeSupportEvidenceNormalizer normalizer = new RuntimeSupportEvidenceNormalizer(objectMapper); + + @Test + void normalizesVehicleOnlyIgnitionWithPositionAsTachographPositionSupportEvidence() { + EventHubEventDto ignition = vehicleOnlyEvent( + "ignition-on-1", + EventDomain.IGNITION, + EventType.IGNITION_ON, + EventLifecycle.ON, + "VIN-1", + "AT:W-1", + "2026-05-01T21:30:00Z", + new GeoPointDto(new BigDecimal("48.2082"), new BigDecimal("16.3738")) + ); + + RuntimeSupportEvidenceNormalizationResult result = normalizer.normalizeForTachographDriver( + "DRIVER-1", + List.of(ignition) + ); + + assertThat(result.normalizedSupportEvidenceEventCount()).isEqualTo(1); + EventHubEventDto normalized = result.normalizedEvents().getFirst(); + assertThat(normalized.eventDomain()).isEqualTo(EventDomain.POSITION); + assertThat(normalized.eventType()).isEqualTo(EventType.POSITION_RECORDED); + assertThat(normalized.lifecycle()).isEqualTo(EventLifecycle.SNAPSHOT); + assertThat(normalized.payload().path("raw").path("driverKey").asText()).isEqualTo("DRIVER-1"); + assertThat(normalized.payload().path("raw").path("supportEventType").asText()).isEqualTo("IGNITION_ON"); + assertThat(normalized.payload().path("raw").path("originalEventDomain").asText()).isEqualTo("IGNITION"); + assertThat(normalized.eventDetails().attributes().path("normalizedSupportEvidence").asBoolean()).isTrue(); + } + + @Test + void keepsTachographSupportDomainButAddsDriverAndNormalizedRawMetadata() { + EventHubEventDto border = vehicleOnlyEvent( + "border-1", + EventDomain.BORDER_CROSSING, + EventType.BORDER_INBOUND, + EventLifecycle.INBOUND, + "VIN-1", + "AT:W-1", + "2026-05-01T22:00:00Z", + new GeoPointDto(new BigDecimal("48.5"), new BigDecimal("16.5")) + ); + + EventHubEventDto normalized = normalizer.normalizeForTachographDriver("DRIVER-1", List.of(border)) + .normalizedEvents() + .getFirst(); + + assertThat(normalized.eventDomain()).isEqualTo(EventDomain.BORDER_CROSSING); + assertThat(normalized.eventType()).isEqualTo(EventType.BORDER_INBOUND); + assertThat(normalized.payload().path("raw").path("driverKey").asText()).isEqualTo("DRIVER-1"); + assertThat(normalized.payload().path("raw").path("supportEventType").asText()).isEqualTo("BORDER_INBOUND"); + } + + @Test + void doesNotNormalizeActivityOrCardUsageEvents() { + EventHubEventDto cardUsage = new EventHubEventDto( + UUID.randomUUID(), + "card-in-1", + null, + vehicleRef("VIN-1", "AT:W-1"), + OffsetDateTime.parse("2026-05-01T08:00:00Z"), + null, + OffsetDateTime.parse("2026-05-01T08:00:00Z"), + EventDomain.DRIVER_CARD, + EventType.CARD_INSERTED, + EventLifecycle.INSERT, + null, + null, + null, + null, + raw("DRIVER-1", "VIN-1", "AT:W-1"), + false, + null + ); + + RuntimeSupportEvidenceNormalizationResult result = normalizer.normalizeForTachographDriver( + "DRIVER-1", + List.of(cardUsage) + ); + + assertThat(result.normalizedSupportEvidenceEventCount()).isZero(); + assertThat(result.normalizedEvents().getFirst()).isSameAs(cardUsage); + } + + private EventHubEventDto vehicleOnlyEvent( + String externalId, + EventDomain domain, + EventType type, + EventLifecycle lifecycle, + String vehicleKey, + String registrationKey, + String occurredAt, + GeoPointDto position + ) { + return new EventHubEventDto( + UUID.randomUUID(), + externalId, + null, + vehicleRef(vehicleKey, registrationKey), + OffsetDateTime.parse(occurredAt), + null, + OffsetDateTime.parse(occurredAt), + domain, + type, + lifecycle, + 123_000L, + position, + null, + null, + raw(null, vehicleKey, registrationKey), + false, + null + ); + } + + private VehicleRefDto vehicleRef(String vehicleKey, String registrationKey) { + String[] registrationParts = registrationKey.split(":", 2); + return new VehicleRefDto( + "VIN:" + vehicleKey, + vehicleKey, + "VR:" + registrationKey, + new VehicleRegistrationRefDto(registrationParts[0], registrationParts[1]) + ); + } + + private JsonNode raw(String driverKey, String vehicleKey, String registrationKey) { + ObjectNode root = objectMapper.createObjectNode(); + ObjectNode raw = root.putObject("raw"); + if (driverKey != null) { + raw.put("driverKey", driverKey); + } + raw.put("vehicleKey", vehicleKey); + raw.put("registrationKey", registrationKey); + raw.put("sourceKind", "TEST"); + return root; + } +}