From b4289abea50eedefaf51eaf81f1e7518f2745cf7 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Thu, 11 Jun 2026 13:11:17 +0200 Subject: [PATCH] Add runtime event mixing project changes --- README_PATCH.md | 40 +- .../mixing/RuntimeEventMixingDecisionDto.java | 30 + .../mixing/RuntimeEventMixingService.java | 691 ++++++++++++++++++ .../mixing/RuntimeEventSourceProfile.java | 8 + .../mixing/RuntimeMixedEventBundle.java | 28 + .../module/DriverActivityIntervalsModule.java | 2 +- .../DriverVehicleUsageIntervalsModule.java | 2 +- .../module/DriverWorkingTimeModuleKeys.java | 1 + .../module/EventEvidenceMixingModule.java | 103 +++ .../VehicleEvidenceAttachmentModule.java | 10 +- .../epl/DriverWorkingTimeEplEventMapper.java | 25 + ...riverWorkingTimeRuntimeProcessingPlan.java | 10 +- .../mixing/RuntimeEventMixingServiceTest.java | 467 ++++++++++++ 13 files changed, 1399 insertions(+), 18 deletions(-) create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingDecisionDto.java create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventSourceProfile.java create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeMixedEventBundle.java create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java create mode 100644 src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingServiceTest.java diff --git a/README_PATCH.md b/README_PATCH.md index ea2abc8..389b0f4 100644 --- a/README_PATCH.md +++ b/README_PATCH.md @@ -1,17 +1,33 @@ -# EventHub fix-list patch +# Patch: Tachograph file-session support for runtime event mixing -This patch implements the requested remaining items from the fix list, excluding build verification and SQL Server 2008 SQL rewriting. +This patch extends the existing `event-evidence-mixing` implementation so it also supports events produced by uploaded tachograph file sessions. -## Apply notes +## What changed -1. Copy the files from this archive into the project root. -2. Delete the old migration files listed in `DELETE_FILES.txt`. -3. Run the test suite locally with Maven/Java 21. +- `RuntimeEventMixingService` now treats `TACHOGRAPH_FILE_SESSION` and `COMPOSITE_TACHOGRAPH_FILE_SESSION` events as tachograph runtime evidence. +- File-session events do not always have a DB-style `extractionCode` such as `CARD_ACTIVITY` or `VU_ACTIVITY` in the raw payload. +- The mixing service now derives the equivalent extraction code from: + - source kind: `DRIVER_CARD` or `VEHICLE_UNIT` + - event domain: `DRIVER_ACTIVITY`, `POSITION`, `PLACE`, `BORDER_CROSSING`, `DRIVER_CARD`, etc. + - file-session external id / source-package kind. -## Main changes +## Supported derived mappings -- Renumbered Flyway migrations to remove duplicate `V9` and `V10` versions. -- Removed the duplicated Timescale/Event source record migration. -- Switched local Docker Compose DB from plain PostgreSQL to a TimescaleDB/PostGIS-capable image. -- Added normalized raw tachograph payload metadata for DB-extracted EventHub events. -- Added tests for Flyway version uniqueness and tachograph DB mapper → timeline reconstruction metadata. +| File-session event | Driver-card source | Vehicle-unit source | +|---|---|---| +| `DRIVER_ACTIVITY` | `CARD_ACTIVITY` | `VU_ACTIVITY` | +| `POSITION` | `CARD_POSITION` | `VU_POSITION` | +| `PLACE` | `CARD_PLACE` | `VU_PLACE` | +| `BORDER_CROSSING` | `CARD_BORDER_CROSSING` | `VU_BORDER_CROSSING` | +| `DRIVER_CARD` insert/withdraw | `CARD_VEHICLES_USED` | `IW_CYCLE` | + +## Important behavior + +- Duplicate file-session `CARD_ACTIVITY` / `VU_ACTIVITY` events are mixed the same way as persistent tachograph DB events. +- Duplicate file-session position/place/border events are also mixed the same way as persistent tachograph DB support evidence. +- `CARD_VEHICLES_USED` and `IW_CYCLE` are still not mixed; they remain accepted for separate vehicle-usage processing. + +## Modified files + +- `src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java` +- `src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingServiceTest.java` diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingDecisionDto.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingDecisionDto.java new file mode 100644 index 0000000..4ca94cc --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingDecisionDto.java @@ -0,0 +1,30 @@ +package at.procon.eventhub.processing.eventprocessing.mixing; + +import java.time.OffsetDateTime; +import java.util.List; + +public record RuntimeEventMixingDecisionDto( + String ruleId, + String equivalenceType, + String eventKey, + String decision, + String channel, + String primaryExternalSourceEventId, + String primaryExtractionCode, + List secondaryExternalSourceEventIds, + List secondaryExtractionCodes, + OffsetDateTime occurredAt, + String eventDomain, + String eventType, + String lifecycle, + String reason +) { + public RuntimeEventMixingDecisionDto { + secondaryExternalSourceEventIds = secondaryExternalSourceEventIds == null + ? List.of() + : List.copyOf(secondaryExternalSourceEventIds); + secondaryExtractionCodes = secondaryExtractionCodes == null + ? List.of() + : List.copyOf(secondaryExtractionCodes); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java new file mode 100644 index 0000000..cf069a6 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java @@ -0,0 +1,691 @@ +package at.procon.eventhub.processing.eventprocessing.mixing; + +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.GeoPointDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver; +import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver; +import com.fasterxml.jackson.databind.JsonNode; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.springframework.stereotype.Service; + +@Service +public class RuntimeEventMixingService { + + public static final String MODE_OFF = "OFF"; + public static final String MODE_TACHOGRAPH_SAME_SOURCE = "TACHOGRAPH_SAME_SOURCE"; + public static final String MODE_FULL = "FULL"; + + public static final String RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY = + "tachograph.activity.card-vu.same-event-key"; + public static final String RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY = + "tachograph.activity.card-vu.compatible-activity-key"; + public static final String RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY = + "tachograph.support.card-vu.same-event-key"; + public static final String RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY = + "tachograph.support.card-vu.compatible-support-key"; + + public RuntimeMixedEventBundle mix(List events, String requestedMode) { + List rawEvents = sort(events); + String mode = normalizeMode(requestedMode); + if (MODE_OFF.equals(mode)) { + return unchanged(rawEvents, "Runtime event mixing is disabled by eventMixingMode=OFF."); + } + + MixingState state = new MixingState(rawEvents); + applyTachographCardVuActivityMixing(state); + applyTachographCardVuSupportEvidenceMixing(state); + + List driverPartitionEvents = rawEvents.stream() + .filter(event -> !state.isSuppressed(event)) + .map(state::effectiveEvent) + .toList(); + List activityTimelineEvents = driverPartitionEvents.stream() + .filter(RuntimeEventMixingService::isDriverActivityPoint) + .toList(); + + // Vehicle-usage events are intentionally not mixed here. CARD_VEHICLES_USED and IW_CYCLE + // are kept as separate input evidence because they must be processed by their own rules later. + List vehicleUsageEvents = rawEvents.stream() + .filter(RuntimeEventMixingService::isDriverCardUsagePoint) + .toList(); + + List supportEvidenceEvents = driverPartitionEvents.stream() + .filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event)) + .toList(); + + List notes = new ArrayList<>(); + notes.add("Runtime event mixing inspected " + rawEvents.size() + " event(s)."); + notes.add("Runtime event mixing suppressed " + state.suppressedEvents().size() + + " duplicate source event(s) from activity/support evidence channels."); + notes.add("Runtime event mixing keeps CARD_POSITION, CARD_PLACE, and CARD_BORDER_CROSSING as primary when matching VU support evidence describes the same semantic event."); + notes.add("Runtime event mixing kept all CARD_VEHICLES_USED and IW_CYCLE card-usage point events unchanged for vehicle-usage processing."); + return new RuntimeMixedEventBundle( + rawEvents, + driverPartitionEvents, + activityTimelineEvents, + vehicleUsageEvents, + supportEvidenceEvents, + state.suppressedEvents(), + state.decisions(), + notes, + state.warnings() + ); + } + + private RuntimeMixedEventBundle unchanged(List rawEvents, String note) { + return new RuntimeMixedEventBundle( + rawEvents, + rawEvents, + rawEvents.stream().filter(RuntimeEventMixingService::isDriverActivityPoint).toList(), + rawEvents.stream().filter(RuntimeEventMixingService::isDriverCardUsagePoint).toList(), + rawEvents.stream().filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event)).toList(), + List.of(), + List.of(), + List.of(note), + List.of() + ); + } + + private void applyTachographCardVuActivityMixing(MixingState state) { + LinkedHashMap> exactGroups = new LinkedHashMap<>(); + for (EventHubEventDto event : state.rawEvents()) { + if (!isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) { + continue; + } + exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>()) + .add(event); + } + for (Map.Entry> entry : exactGroups.entrySet()) { + fuseCardAndVuDuplicates( + state, + entry.getKey(), + "EXACT_EVENT_KEY", + RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY, + "ACTIVITY_TIMELINE", + entry.getValue(), + "CARD_ACTIVITY", + "VU_ACTIVITY", + "CARD_ACTIVITY and VU_ACTIVITY describe the same driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization." + ); + } + + LinkedHashMap> compatibleGroups = new LinkedHashMap<>(); + for (EventHubEventDto event : state.rawEvents()) { + if (state.isSuppressed(event) || !isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) { + continue; + } + compatibleGroups.computeIfAbsent(compatibleActivityKey(event), ignored -> new ArrayList<>()) + .add(event); + } + for (Map.Entry> entry : compatibleGroups.entrySet()) { + fuseCardAndVuDuplicates( + state, + entry.getKey(), + "COMPATIBLE_ACTIVITY_KEY", + RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY, + "ACTIVITY_TIMELINE", + entry.getValue(), + "CARD_ACTIVITY", + "VU_ACTIVITY", + "CARD_ACTIVITY and VU_ACTIVITY describe a compatible driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization." + ); + } + } + + private void applyTachographCardVuSupportEvidenceMixing(MixingState state) { + LinkedHashMap> exactGroups = new LinkedHashMap<>(); + for (EventHubEventDto event : state.rawEvents()) { + if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) { + continue; + } + exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>()) + .add(event); + } + for (Map.Entry> entry : exactGroups.entrySet()) { + fuseCardAndVuSupportDuplicates( + state, + entry.getKey(), + "EXACT_EVENT_KEY", + RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY, + entry.getValue() + ); + } + + LinkedHashMap> compatibleGroups = new LinkedHashMap<>(); + for (EventHubEventDto event : state.rawEvents()) { + if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) { + continue; + } + compatibleGroups.computeIfAbsent(compatibleSupportEvidenceKey(event), ignored -> new ArrayList<>()) + .add(event); + } + for (Map.Entry> entry : compatibleGroups.entrySet()) { + fuseCardAndVuSupportDuplicates( + state, + entry.getKey(), + "COMPATIBLE_SUPPORT_KEY", + RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY, + entry.getValue() + ); + } + } + + private void fuseCardAndVuSupportDuplicates( + MixingState state, + String eventKey, + String equivalenceType, + String ruleId, + List group + ) { + Map.Entry pair = cardVuSupportPair(group); + if (pair == null) { + return; + } + fuseCardAndVuDuplicates( + state, + eventKey, + equivalenceType, + ruleId, + "SUPPORT_EVIDENCE", + group, + pair.getKey(), + pair.getValue(), + pair.getKey() + " and " + pair.getValue() + " describe the same support evidence event. " + + pair.getKey() + " is kept as primary support evidence; " + pair.getValue() + + " is suppressed from support-evidence normalization. Vehicle/VIN identity from the VU event is copied to the primary event when the card event has weaker vehicle identity." + ); + } + + private void fuseCardAndVuDuplicates( + MixingState state, + String eventKey, + String equivalenceType, + String ruleId, + String channel, + List group, + String primaryExtractionCode, + String secondaryExtractionCode, + String reason + ) { + List primaries = group.stream() + .filter(event -> Objects.equals(primaryExtractionCode, extractionCode(event))) + .sorted(eventComparator()) + .toList(); + List secondaries = group.stream() + .filter(event -> Objects.equals(secondaryExtractionCode, extractionCode(event))) + .sorted(eventComparator()) + .toList(); + if (primaries.isEmpty() || secondaries.isEmpty()) { + return; + } + EventHubEventDto primary = primaries.get(0); + List newlySuppressed = secondaries.stream() + .filter(event -> !state.isSuppressed(event)) + .toList(); + if (newlySuppressed.isEmpty()) { + return; + } + EventHubEventDto enrichedPrimary = enrichPrimaryVehicleRef(primary, newlySuppressed); + if (enrichedPrimary != primary) { + state.replace(primary, enrichedPrimary); + } + newlySuppressed.forEach(state::suppress); + state.decisions().add(new RuntimeEventMixingDecisionDto( + ruleId, + equivalenceType, + eventKey, + "FUSED_PRIMARY_SELECTED", + channel, + primary.externalSourceEventId(), + extractionCode(primary), + newlySuppressed.stream().map(EventHubEventDto::externalSourceEventId).toList(), + newlySuppressed.stream().map(RuntimeEventMixingService::extractionCode).toList(), + primary.occurredAt(), + primary.eventDomain() == null ? null : primary.eventDomain().name(), + primary.eventType() == null ? null : primary.eventType().name(), + primary.lifecycle() == null ? null : primary.lifecycle().name(), + reason + )); + } + + private static EventHubEventDto enrichPrimaryVehicleRef(EventHubEventDto primary, List secondaries) { + if (primary == null || secondaries == null || secondaries.isEmpty()) { + return primary; + } + VehicleRefDto bestSecondary = secondaries.stream() + .map(EventHubEventDto::vehicleRef) + .filter(Objects::nonNull) + .filter(VehicleRefDto::hasAnyReference) + .filter(RuntimeEventMixingService::hasVehicleIdentity) + .findFirst() + .orElse(null); + if (bestSecondary == null || !shouldEnrichVehicleRef(primary.vehicleRef(), bestSecondary)) { + return primary; + } + VehicleRefDto merged = mergeVehicleRef(primary.vehicleRef(), bestSecondary); + if (Objects.equals(primary.vehicleRef(), merged)) { + return primary; + } + return new EventHubEventDto( + primary.eventId(), + primary.externalSourceEventId(), + primary.driverRef(), + merged, + primary.occurredAt(), + primary.receivedPartnerAt(), + primary.receivedHubAt(), + primary.eventDomain(), + primary.eventType(), + primary.lifecycle(), + primary.odometerM(), + primary.position(), + primary.eventDetails(), + primary.sourcePackageRef(), + primary.payload(), + primary.manualEntry(), + primary.packageInfo() + ); + } + + private static boolean shouldEnrichVehicleRef(VehicleRefDto primary, VehicleRefDto secondary) { + return secondary != null + && hasVehicleIdentity(secondary) + && (primary == null || !hasVehicleIdentity(primary)); + } + + private static boolean hasVehicleIdentity(VehicleRefDto vehicleRef) { + return vehicleRef != null + && (notBlank(vehicleRef.sourceVehicleEntityId()) || notBlank(vehicleRef.vin())); + } + + private static VehicleRefDto mergeVehicleRef(VehicleRefDto primary, VehicleRefDto secondary) { + if (primary == null) { + return secondary; + } + if (secondary == null) { + return primary; + } + VehicleRegistrationRefDto registration = primary.vehicleRegistration() != null && primary.vehicleRegistration().hasValue() + ? primary.vehicleRegistration() + : secondary.vehicleRegistration(); + return new VehicleRefDto( + firstNonBlank(primary.sourceVehicleEntityId(), secondary.sourceVehicleEntityId()), + firstNonBlank(primary.vin(), secondary.vin()), + firstNonBlank(primary.sourceRegistrationEntityId(), secondary.sourceRegistrationEntityId()), + registration + ); + } + + private static Map.Entry cardVuSupportPair(List group) { + if (group == null || group.isEmpty()) { + return null; + } + Set extractionCodes = group.stream() + .map(RuntimeEventMixingService::extractionCode) + .filter(Objects::nonNull) + .collect(java.util.stream.Collectors.toCollection(LinkedHashSet::new)); + if (extractionCodes.contains("CARD_POSITION") && extractionCodes.contains("VU_POSITION")) { + return Map.entry("CARD_POSITION", "VU_POSITION"); + } + if (extractionCodes.contains("CARD_PLACE") && extractionCodes.contains("VU_PLACE")) { + return Map.entry("CARD_PLACE", "VU_PLACE"); + } + if (extractionCodes.contains("CARD_BORDER_CROSSING") && extractionCodes.contains("VU_BORDER_CROSSING")) { + return Map.entry("CARD_BORDER_CROSSING", "VU_BORDER_CROSSING"); + } + return null; + } + + private static boolean isTachographCardOrVuActivity(EventHubEventDto event) { + RuntimeEventSourceProfile profile = sourceProfile(event); + return isTachographRuntimeSource(profile) + && (Objects.equals("CARD_ACTIVITY", profile.extractionCode()) + || Objects.equals("VU_ACTIVITY", profile.extractionCode())); + } + + private static boolean isTachographCardOrVuSupportEvidence(EventHubEventDto event) { + if (event == null || isDriverActivityPoint(event) || isDriverCardUsagePoint(event)) { + return false; + } + RuntimeEventSourceProfile profile = sourceProfile(event); + if (!isTachographRuntimeSource(profile)) { + return false; + } + return switch (nullToEmpty(profile.extractionCode())) { + case "CARD_POSITION", "VU_POSITION", "CARD_PLACE", "VU_PLACE", "CARD_BORDER_CROSSING", "VU_BORDER_CROSSING" -> true; + default -> false; + }; + } + + private static boolean isDriverActivityPoint(EventHubEventDto event) { + return event != null + && event.eventDomain() == EventDomain.DRIVER_ACTIVITY + && (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.END) + && event.occurredAt() != null; + } + + private static boolean isDriverCardUsagePoint(EventHubEventDto event) { + return event != null + && event.eventDomain() == EventDomain.DRIVER_CARD + && (event.lifecycle() == EventLifecycle.INSERT || event.lifecycle() == EventLifecycle.WITHDRAW) + && event.occurredAt() != null; + } + + private static RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) { + JsonNode raw = rawPayload(event); + String sourceKind = firstNonBlank(text(raw, "sourceKind"), sourceKind(event)); + String extractionCode = firstNonBlank( + text(raw, "extractionCode"), + fileSessionExtractionCode(event, sourceKind), + extractionCodeFromExternalSourceEventId(event) + ); + String sourceSystem = firstNonBlank( + text(raw, "sourceSystem"), + sourceProvider(event), + sourceSystemFromExternalSourceEventId(event) + ); + if (sourceSystem == null && (extractionCode != null || isTachographFileSessionEvent(event))) { + sourceSystem = "TACHOGRAPH"; + } + return new RuntimeEventSourceProfile( + normalizeUpper(sourceSystem), + normalizeUpper(sourceKind), + normalizeUpper(extractionCode) + ); + } + + private static String extractionCode(EventHubEventDto event) { + return sourceProfile(event).extractionCode(); + } + private static boolean isTachographRuntimeSource(RuntimeEventSourceProfile profile) { + if (profile == null) { + return false; + } + return switch (nullToEmpty(profile.sourceSystem())) { + case "TACHOGRAPH", "TACHOGRAPH_FILE_SESSION", "COMPOSITE_TACHOGRAPH_FILE_SESSION" -> true; + default -> false; + }; + } + + private static String fileSessionExtractionCode(EventHubEventDto event, String sourceKind) { + if (!isTachographFileSessionEvent(event)) { + return null; + } + String normalizedSourceKind = normalizeUpper(sourceKind); + if (normalizedSourceKind == null) { + return null; + } + if (event != null && event.eventDomain() == EventDomain.DRIVER_ACTIVITY) { + return switch (normalizedSourceKind) { + case "DRIVER_CARD" -> "CARD_ACTIVITY"; + case "VEHICLE_UNIT" -> "VU_ACTIVITY"; + default -> null; + }; + } + if (event != null && event.eventDomain() == EventDomain.DRIVER_CARD) { + return switch (normalizedSourceKind) { + case "DRIVER_CARD" -> "CARD_VEHICLES_USED"; + case "VEHICLE_UNIT" -> "IW_CYCLE"; + default -> null; + }; + } + if (event == null || event.eventDomain() == null) { + return null; + } + String prefix = switch (normalizedSourceKind) { + case "DRIVER_CARD" -> "CARD"; + case "VEHICLE_UNIT" -> "VU"; + default -> null; + }; + if (prefix == null) { + return null; + } + return switch (event.eventDomain()) { + case POSITION -> prefix + "_POSITION"; + case PLACE -> prefix + "_PLACE"; + case BORDER_CROSSING -> prefix + "_BORDER_CROSSING"; + case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null; + default -> null; + }; + } + + private static boolean isTachographFileSessionEvent(EventHubEventDto event) { + if (event == null) { + return false; + } + String packageKind = event.sourcePackageRef() == null ? null : normalizeUpper(event.sourcePackageRef().packageKind()); + if (Objects.equals("TACHOGRAPH_FILE_SESSION", packageKind) + || Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", packageKind)) { + return true; + } + String externalId = event.externalSourceEventId(); + return externalId != null + && (externalId.startsWith("TACHOGRAPH_FILE_SESSION:") + || externalId.startsWith("COMPOSITE_TACHOGRAPH_FILE_SESSION:")); + } + + + private static String compatibleActivityKey(EventHubEventDto event) { + JsonNode raw = rawPayload(event); + return String.join("|", + "ACTIVITY_COMPATIBLE", + nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()), + nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)), + nullToEmpty(event.eventDomain() == null ? null : event.eventDomain().name()), + nullToEmpty(event.eventType() == null ? null : event.eventType().name()), + nullToEmpty(event.lifecycle() == null ? null : event.lifecycle().name()), + normalizeTime(event.occurredAt()), + nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)), + nullToEmpty(firstNonBlank(text(raw, "startedAt"), text(raw, "intervalStartedAt"))), + nullToEmpty(firstNonBlank(text(raw, "endedAt"), text(raw, "intervalEndedAt"))), + nullToEmpty(firstNonBlank(text(raw, "slot"), text(raw, "cardSlot"))), + nullToEmpty(firstNonBlank(text(raw, "cardStatus"))), + nullToEmpty(firstNonBlank(text(raw, "drivingStatus"))) + ); + } + + private static String compatibleSupportEvidenceKey(EventHubEventDto event) { + JsonNode raw = rawPayload(event); + GeoPointDto position = event == null ? null : event.position(); + return String.join("|", + "SUPPORT_COMPATIBLE", + nullToEmpty(event == null || event.packageInfo() == null ? null : event.packageInfo().tenantKey()), + nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)), + nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()), + nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()), + nullToEmpty(event == null || event.lifecycle() == null ? null : event.lifecycle().name()), + normalizeTime(event == null ? null : event.occurredAt()), + nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)), + nullToEmpty(firstNonBlank(text(raw, "latitude"), position == null || position.latitude() == null ? null : position.latitude().toPlainString())), + nullToEmpty(firstNonBlank(text(raw, "longitude"), position == null || position.longitude() == null ? null : position.longitude().toPlainString())), + nullToEmpty(firstNonBlank(text(raw, "odometerM"), event == null || event.odometerM() == null ? null : String.valueOf(event.odometerM()))), + nullToEmpty(firstNonBlank(text(raw, "country"), detailText(event, "country"))), + nullToEmpty(firstNonBlank(text(raw, "region"), detailText(event, "region"))), + nullToEmpty(firstNonBlank(text(raw, "countryFrom"), detailText(event, "countryFrom"))), + nullToEmpty(firstNonBlank(text(raw, "countryTo"), detailText(event, "countryTo"))), + nullToEmpty(firstNonBlank(text(raw, "operation"), detailText(event, "operation"))) + ); + } + + private static Comparator eventComparator() { + return Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name()) + .thenComparing(event -> event.eventType() == null ? "" : event.eventType().name()) + .thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name()) + .thenComparing(event -> sourceProfile(event).extractionCode(), Comparator.nullsLast(String::compareTo)) + .thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo)); + } + + private static List sort(List events) { + return (events == null ? List.of() : events).stream() + .filter(Objects::nonNull) + .sorted(eventComparator()) + .toList(); + } + + private static String normalizeMode(String requestedMode) { + String value = normalizeUpper(requestedMode); + if (value == null) { + return MODE_TACHOGRAPH_SAME_SOURCE; + } + return switch (value) { + case MODE_OFF, MODE_TACHOGRAPH_SAME_SOURCE, MODE_FULL -> value; + default -> MODE_TACHOGRAPH_SAME_SOURCE; + }; + } + + private static JsonNode rawPayload(EventHubEventDto event) { + return RuntimeEntityReferenceResolver.rawPayload(event); + } + + private static String sourceKind(EventHubEventDto event) { + return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null + ? null + : event.packageInfo().eventSource().sourceKind(); + } + + private static String sourceProvider(EventHubEventDto event) { + return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null + ? null + : event.packageInfo().eventSource().providerKey(); + } + + private static String sourceSystemFromExternalSourceEventId(EventHubEventDto event) { + String externalId = event == null ? null : event.externalSourceEventId(); + if (externalId == null || externalId.isBlank()) { + return null; + } + String[] parts = externalId.split(":"); + return parts.length >= 1 ? parts[0] : null; + } + + private static String extractionCodeFromExternalSourceEventId(EventHubEventDto event) { + String externalId = event == null ? null : event.externalSourceEventId(); + if (externalId == null || externalId.isBlank()) { + return null; + } + String[] parts = externalId.split(":"); + return parts.length >= 2 ? parts[1] : null; + } + + private static String detailText(EventHubEventDto event, String field) { + if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null || field == null) { + return null; + } + JsonNode value = event.eventDetails().attributes().get(field); + if (value == null || value.isNull()) { + return null; + } + String text = value.asText(null); + return text == null || text.isBlank() ? null : text.trim(); + } + + private static String text(JsonNode node, String field) { + if (node == null || field == null) { + return null; + } + JsonNode value = node.get(field); + if (value == null || value.isNull()) { + return null; + } + String text = value.asText(null); + return text == null || text.isBlank() ? null : text.trim(); + } + + private static String firstNonBlank(String... values) { + if (values == null) { + return null; + } + for (String value : values) { + if (value != null && !value.isBlank()) { + return value.trim(); + } + } + return null; + } + + private static String normalizeUpper(String value) { + return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT); + } + + private static String nullToEmpty(Object value) { + return value == null ? "" : String.valueOf(value); + } + + private static String normalizeTime(OffsetDateTime value) { + return value == null ? "" : value.toInstant().toString(); + } + + private static boolean notBlank(String value) { + return value != null && !value.isBlank(); + } + + private static final class MixingState { + private final List rawEvents; + private final Set suppressedEventIds = new LinkedHashSet<>(); + private final Map replacementsByEventId = new LinkedHashMap<>(); + private final List suppressedEvents = new ArrayList<>(); + private final List decisions = new ArrayList<>(); + private final List warnings = new ArrayList<>(); + + private MixingState(List rawEvents) { + this.rawEvents = rawEvents; + } + + private List rawEvents() { + return rawEvents; + } + + private boolean isSuppressed(EventHubEventDto event) { + return suppressedEventIds.contains(eventId(event)); + } + + private EventHubEventDto effectiveEvent(EventHubEventDto event) { + return replacementsByEventId.getOrDefault(eventId(event), event); + } + + private void replace(EventHubEventDto original, EventHubEventDto replacement) { + if (original != null && replacement != null) { + replacementsByEventId.put(eventId(original), replacement); + } + } + + private void suppress(EventHubEventDto event) { + if (suppressedEventIds.add(eventId(event))) { + suppressedEvents.add(event); + } + } + + private List suppressedEvents() { + return suppressedEvents; + } + + private List decisions() { + return decisions; + } + + private List warnings() { + return warnings; + } + + private String eventId(EventHubEventDto event) { + if (event == null) { + return ""; + } + return firstNonBlank(event.externalSourceEventId(), event.eventId() == null ? null : event.eventId().toString(), RuntimeEventIdentityResolver.canonicalEventKey(event)); + } + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventSourceProfile.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventSourceProfile.java new file mode 100644 index 0000000..b179099 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventSourceProfile.java @@ -0,0 +1,8 @@ +package at.procon.eventhub.processing.eventprocessing.mixing; + +public record RuntimeEventSourceProfile( + String sourceSystem, + String sourceKind, + String extractionCode +) { +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeMixedEventBundle.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeMixedEventBundle.java new file mode 100644 index 0000000..7e482c9 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeMixedEventBundle.java @@ -0,0 +1,28 @@ +package at.procon.eventhub.processing.eventprocessing.mixing; + +import at.procon.eventhub.dto.EventHubEventDto; +import java.util.List; + +public record RuntimeMixedEventBundle( + List rawEvents, + List driverPartitionEvents, + List activityTimelineEvents, + List vehicleUsageEvents, + List supportEvidenceEvents, + List suppressedEvents, + List eventMixingDecisions, + List notes, + List warnings +) { + public RuntimeMixedEventBundle { + rawEvents = rawEvents == null ? List.of() : List.copyOf(rawEvents); + driverPartitionEvents = driverPartitionEvents == null ? List.of() : List.copyOf(driverPartitionEvents); + activityTimelineEvents = activityTimelineEvents == null ? List.of() : List.copyOf(activityTimelineEvents); + vehicleUsageEvents = vehicleUsageEvents == null ? List.of() : List.copyOf(vehicleUsageEvents); + supportEvidenceEvents = supportEvidenceEvents == null ? List.of() : List.copyOf(supportEvidenceEvents); + suppressedEvents = suppressedEvents == null ? List.of() : List.copyOf(suppressedEvents); + eventMixingDecisions = eventMixingDecisions == null ? List.of() : List.copyOf(eventMixingDecisions); + notes = notes == null ? List.of() : List.copyOf(notes); + warnings = warnings == null ? List.of() : List.copyOf(warnings); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverActivityIntervalsModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverActivityIntervalsModule.java index d46ad68..35e3e8c 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverActivityIntervalsModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverActivityIntervalsModule.java @@ -52,7 +52,7 @@ public class DriverActivityIntervalsModule implements RuntimeEplModule { @Override public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) { - List sourceEvents = DriverWorkingTimeEplEventMapper.sourceEvents(context); + List sourceEvents = DriverWorkingTimeEplEventMapper.activityTimelineEvents(context); List> pointEvents = DriverWorkingTimeEplEventMapper.activityPointEvents(sourceEvents); RuntimeEplModuleExecutionResult eplResult = eplModuleExecutor.execute(new RuntimeEplModuleDefinition( moduleKey(), diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModule.java index 5a26093..25301ea 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModule.java @@ -52,7 +52,7 @@ public class DriverVehicleUsageIntervalsModule implements RuntimeEplModule { @Override public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) { - List sourceEvents = DriverWorkingTimeEplEventMapper.sourceEvents(context); + List sourceEvents = DriverWorkingTimeEplEventMapper.vehicleUsageEvents(context); List> pointEvents = DriverWorkingTimeEplEventMapper.vehicleUsagePointEvents(sourceEvents); RuntimeEplModuleExecutionResult eplResult = eplModuleExecutor.execute(new RuntimeEplModuleDefinition( moduleKey(), diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java index 5a52668..2906f1f 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java @@ -3,6 +3,7 @@ package at.procon.eventhub.processing.eventprocessing.module; public final class DriverWorkingTimeModuleKeys { public static final String RUNTIME_EVENT_ASSEMBLY = "runtime-event-assembly"; + public static final String EVENT_EVIDENCE_MIXING = "event-evidence-mixing"; public static final String EVENT_TO_ACTIVITY_INTERVALS = "event-to-activity-intervals"; public static final String EVENT_TO_VEHICLE_USAGE_INTERVALS = "event-to-vehicle-usage-intervals"; public static final String VEHICLE_USAGE_MERGE = "vehicle-usage-merge"; diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java new file mode 100644 index 0000000..8b2c227 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java @@ -0,0 +1,103 @@ +package at.procon.eventhub.processing.eventprocessing.module; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeEventMixingService; +import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeMixedEventBundle; +import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class EventEvidenceMixingModule implements RuntimeProcessingModule { + + public static final String EVENT_MIXING_MODE_PARAMETER = "eventMixingMode"; + + private final RuntimeEventMixingService eventMixingService; + + @Autowired + public EventEvidenceMixingModule(RuntimeEventMixingService eventMixingService) { + this.eventMixingService = eventMixingService; + } + + /** Compatibility constructor for tests/local registries that do not wire the mixing service. */ + public EventEvidenceMixingModule() { + this.eventMixingService = null; + } + + @Override + public String moduleKey() { + return DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING; + } + + @Override + public RuntimeProcessingModuleDescriptorDto descriptor() { + return new RuntimeProcessingModuleDescriptorDto( + moduleKey(), + "Event evidence mixing", + "Applies source-aware runtime evidence rules before intervalization. The initial rule collapses duplicate tachograph CARD_ACTIVITY/VU_ACTIVITY events by normalized eventKey while keeping CARD_VEHICLES_USED unchanged for vehicle-usage processing.", + "JAVA", + Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto") + ); + } + + @Override + public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) { + if (eventMixingService == null) { + return delegatedPlaceholder(); + } + List sourceEvents = sourceEvents(context); + RuntimeMixedEventBundle mixed = eventMixingService.mix(sourceEvents, eventMixingMode(context)); + Map metadata = new LinkedHashMap<>(); + metadata.put("inputEventCount", mixed.rawEvents().size()); + metadata.put("driverPartitionEventCount", mixed.driverPartitionEvents().size()); + metadata.put("activityTimelineEventCount", mixed.activityTimelineEvents().size()); + metadata.put("vehicleUsageEventCount", mixed.vehicleUsageEvents().size()); + metadata.put("supportEvidenceEventCount", mixed.supportEvidenceEvents().size()); + metadata.put("suppressedEventCount", mixed.suppressedEvents().size()); + metadata.put("eventMixingDecisionCount", mixed.eventMixingDecisions().size()); + metadata.put("eventMixingMode", eventMixingMode(context)); + return new RuntimeProcessingModuleResult( + moduleKey(), + RuntimeProcessingModuleStatus.SUCCESS, + mixed, + metadata, + mixed.warnings() + ); + } + + private RuntimeProcessingModuleResult delegatedPlaceholder() { + Map metadata = new LinkedHashMap<>(); + metadata.put("executionModel", "delegated"); + metadata.put("note", "Event evidence mixing is not wired in this local/legacy module registry."); + return new RuntimeProcessingModuleResult( + moduleKey(), + RuntimeProcessingModuleStatus.SUCCESS, + null, + metadata, + List.of() + ); + } + + private List sourceEvents(RuntimeProcessingModuleContext context) { + RuntimeProcessingModuleResult assemblyResult = context.previousResults().get(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY); + if (assemblyResult != null && assemblyResult.output() instanceof UnifiedRuntimeEventBundle bundle) { + return bundle.mergedEvents(); + } + return context.events(); + } + + private String eventMixingMode(RuntimeProcessingModuleContext context) { + Object value = context.request() == null || context.request().parameters() == null + ? null + : context.request().parameters().get(EVENT_MIXING_MODE_PARAMETER); + if (value == null) { + value = context.attributes().get(EVENT_MIXING_MODE_PARAMETER); + } + return value == null ? RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE : value.toString(); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java index 8de698f..88214d5 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java @@ -7,6 +7,7 @@ import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDr import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval; import at.procon.eventhub.processing.dto.RuntimeDriverPartitionDebugDto; import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; +import at.procon.eventhub.processing.eventprocessing.module.epl.DriverWorkingTimeEplEventMapper; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto; import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult; import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; @@ -66,6 +67,7 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule return delegatedPlaceholder(); } UnifiedRuntimeEventBundle broadBundle = runtimeEventBundle(context); + List partitionSourceEvents = DriverWorkingTimeEplEventMapper.driverPartitionEvents(context); UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context); boolean includePartitionDebug = booleanAttribute(context, "includePartitionDebug", false); List mergedVehicleUsageIntervals = mergedVehicleUsageIntervals(context); @@ -73,8 +75,8 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule LinkedHashMap partitions = new LinkedHashMap<>(); LinkedHashMap> attachedVehicleEvidenceByEvent = new LinkedHashMap<>(); List warnings = new ArrayList<>(); - for (String driverKey : selectedDriverKeys(scopeRequest.toRuntimeRequest(), broadBundle.mergedEvents())) { - List directDriverEvents = broadBundle.mergedEvents().stream() + for (String driverKey : selectedDriverKeys(scopeRequest.toRuntimeRequest(), partitionSourceEvents)) { + List directDriverEvents = partitionSourceEvents.stream() .filter(event -> Objects.equals(driverKey(event), driverKey)) .toList(); List driverVehicleUsageIntervals = mergedVehicleUsageIntervals.stream() @@ -83,7 +85,7 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule RuntimeDriverVehicleEvidenceAttachmentResult attachmentResult = vehicleEvidenceAttachmentService.attachVehicleEvidence( driverKey, directDriverEvents, - broadBundle.mergedEvents(), + partitionSourceEvents, driverVehicleUsageIntervals, booleanAttribute( context, @@ -132,6 +134,8 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule metadata.put("attachedVehicleEvidenceEventCount", partitions.values().stream() .mapToInt(partition -> partition.attachedVehicleEvidenceEvents().size()) .sum()); + metadata.put("partitionSourceEventCount", partitionSourceEvents.size()); + metadata.put("rawMergedEventCount", broadBundle.mergedEvents().size()); return new RuntimeProcessingModuleResult( moduleKey(), RuntimeProcessingModuleStatus.SUCCESS, diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/epl/DriverWorkingTimeEplEventMapper.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/epl/DriverWorkingTimeEplEventMapper.java index 38ec905..1fe759c 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/epl/DriverWorkingTimeEplEventMapper.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/epl/DriverWorkingTimeEplEventMapper.java @@ -7,6 +7,7 @@ import at.procon.eventhub.dto.EventType; import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeModuleKeys; import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleContext; import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleResult; +import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeMixedEventBundle; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver; import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver; @@ -38,6 +39,30 @@ public final class DriverWorkingTimeEplEventMapper { return safeList(context.events()); } + public static List activityTimelineEvents(RuntimeProcessingModuleContext context) { + RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING); + if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) { + return safeList(mixed.activityTimelineEvents()); + } + return sourceEvents(context); + } + + public static List vehicleUsageEvents(RuntimeProcessingModuleContext context) { + RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING); + if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) { + return safeList(mixed.vehicleUsageEvents()); + } + return sourceEvents(context); + } + + public static List driverPartitionEvents(RuntimeProcessingModuleContext context) { + RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING); + if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) { + return safeList(mixed.driverPartitionEvents()); + } + return sourceEvents(context); + } + public static List> activityPointEvents(List sourceEvents) { return safeList(sourceEvents).stream() .map(DriverWorkingTimeEplEventMapper::toActivityPointEvent) diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java index cd48003..6407fe0 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java @@ -114,6 +114,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "JAVA", Set.of("UnifiedRuntimeEventBundle") )); + descriptors.add(new RuntimeProcessingModuleDescriptorDto( + DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING, + "Event evidence mixing", + "Applies source-aware runtime evidence rules before intervalization. Initially collapses same-event-key CARD_ACTIVITY/VU_ACTIVITY duplicates and leaves CARD_VEHICLES_USED unchanged.", + "JAVA", + Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto") + )); } descriptors.addAll(List.of( new RuntimeProcessingModuleDescriptorDto( @@ -171,7 +178,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "vehicleEvidencePaddingMinutes", "includeActivityIntervals", "includeDrivingIntervals", - "includePartitionDebug" + "includePartitionDebug", + "eventMixingMode" ); } diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingServiceTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingServiceTest.java new file mode 100644 index 0000000..b669258 --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingServiceTest.java @@ -0,0 +1,467 @@ +package at.procon.eventhub.processing.eventprocessing.mixing; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDetailsDto; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class RuntimeEventMixingServiceTest { + + private final RuntimeEventMixingService service = new RuntimeEventMixingService(); + + @Test + void suppressesVuActivityDuplicateFromActivityTimelineWhenCardActivityHasSameEventKey() { + EventHubEventDto card = activity("CARD_ACTIVITY", "DRIVER_CARD", "TACHOGRAPH:CARD_ACTIVITY:1:START"); + EventHubEventDto vu = activity("VU_ACTIVITY", "VEHICLE_UNIT", "TACHOGRAPH:VU_ACTIVITY:2:START"); + + RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE); + + assertThat(mixed.rawEvents()).hasSize(2); + assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH:CARD_ACTIVITY:1:START"); + assertThat(mixed.driverPartitionEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH:CARD_ACTIVITY:1:START"); + assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH:VU_ACTIVITY:2:START"); + assertThat(mixed.eventMixingDecisions()).hasSize(1); + assertThat(mixed.eventMixingDecisions().getFirst().ruleId()) + .isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY); + } + + @Test + void keepsVuActivityWhenNoMatchingCardActivityExists() { + EventHubEventDto vu = activity("VU_ACTIVITY", "VEHICLE_UNIT", "TACHOGRAPH:VU_ACTIVITY:2:START"); + + RuntimeMixedEventBundle mixed = service.mix(List.of(vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE); + + assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH:VU_ACTIVITY:2:START"); + assertThat(mixed.suppressedEvents()).isEmpty(); + assertThat(mixed.eventMixingDecisions()).isEmpty(); + } + + @Test + void suppressesTachographFileSessionVuActivityDuplicateWhenNoExtractionCodeIsPresent() { + EventHubEventDto card = fileSessionActivity( + "DRIVER_CARD", + "TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:ACTIVITY:card-interval-1:START:2026-04-01T00:00:00Z" + ); + EventHubEventDto vu = fileSessionActivity( + "VEHICLE_UNIT", + "TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:ACTIVITY:vu-interval-1:START:2026-04-01T00:00:00Z" + ); + + RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE); + + assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:ACTIVITY:card-interval-1:START:2026-04-01T00:00:00Z"); + assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:ACTIVITY:vu-interval-1:START:2026-04-01T00:00:00Z"); + assertThat(mixed.eventMixingDecisions()).hasSize(1); + assertThat(mixed.eventMixingDecisions().getFirst().secondaryExtractionCodes()) + .containsExactly("VU_ACTIVITY"); + } + + @Test + void keepsAllCardVehicleUsedAndIwCycleEventsForVehicleUsageProcessing() { + EventHubEventDto cardVehicleUsed = cardUsage( + "CARD_VEHICLES_USED", + "DRIVER_CARD", + "TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT", + EventType.CARD_INSERTED, + EventLifecycle.INSERT + ); + EventHubEventDto iwCycle = cardUsage( + "IW_CYCLE", + "VEHICLE_UNIT", + "TACHOGRAPH:IW_CYCLE:20:INSERT", + EventType.CARD_INSERTED, + EventLifecycle.INSERT + ); + + RuntimeMixedEventBundle mixed = service.mix(List.of(cardVehicleUsed, iwCycle), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE); + + assertThat(mixed.vehicleUsageEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT", "TACHOGRAPH:IW_CYCLE:20:INSERT"); + assertThat(mixed.suppressedEvents()).isEmpty(); + } + + + @Test + void suppressesVuPositionDuplicateFromSupportEvidenceAndCopiesVehicleIdentityToCardPrimary() { + EventHubEventDto card = supportEvidence( + "CARD_POSITION", + "DRIVER_CARD", + "TACHOGRAPH:CARD_POSITION:1", + EventDomain.POSITION, + EventType.POSITION_RECORDED, + EventLifecycle.SNAPSHOT, + false + ); + EventHubEventDto vu = supportEvidence( + "VU_POSITION", + "VEHICLE_UNIT", + "TACHOGRAPH:VU_POSITION:2", + EventDomain.POSITION, + EventType.POSITION_RECORDED, + EventLifecycle.SNAPSHOT, + true + ); + + RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE); + + assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH:CARD_POSITION:1"); + assertThat(mixed.supportEvidenceEvents().getFirst().vehicleRef().vin()) + .isEqualTo("WDB9634031L123456"); + assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH:VU_POSITION:2"); + assertThat(mixed.eventMixingDecisions()).hasSize(1); + assertThat(mixed.eventMixingDecisions().getFirst().ruleId()) + .isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY); + assertThat(mixed.eventMixingDecisions().getFirst().channel()) + .isEqualTo("SUPPORT_EVIDENCE"); + } + + @Test + void suppressesVuPlaceAndBorderDuplicateFromSupportEvidence() { + EventHubEventDto cardPlace = supportEvidence( + "CARD_PLACE", + "DRIVER_CARD", + "TACHOGRAPH:CARD_PLACE:1", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.SNAPSHOT, + false + ); + EventHubEventDto vuPlace = supportEvidence( + "VU_PLACE", + "VEHICLE_UNIT", + "TACHOGRAPH:VU_PLACE:2", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.SNAPSHOT, + true + ); + EventHubEventDto cardBorder = supportEvidence( + "CARD_BORDER_CROSSING", + "DRIVER_CARD", + "TACHOGRAPH:CARD_BORDER_CROSSING:3", + EventDomain.BORDER_CROSSING, + EventType.BORDER_OUTBOUND, + EventLifecycle.OUTBOUND, + false + ); + EventHubEventDto vuBorder = supportEvidence( + "VU_BORDER_CROSSING", + "VEHICLE_UNIT", + "TACHOGRAPH:VU_BORDER_CROSSING:4", + EventDomain.BORDER_CROSSING, + EventType.BORDER_OUTBOUND, + EventLifecycle.OUTBOUND, + true + ); + + RuntimeMixedEventBundle mixed = service.mix(List.of(cardPlace, vuPlace, cardBorder, vuBorder), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE); + + assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH:CARD_BORDER_CROSSING:3", "TACHOGRAPH:CARD_PLACE:1"); + assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH:VU_BORDER_CROSSING:4", "TACHOGRAPH:VU_PLACE:2"); + assertThat(mixed.eventMixingDecisions()).hasSize(2); + } + + @Test + void suppressesTachographFileSessionVuPositionDuplicateWhenNoExtractionCodeIsPresent() { + EventHubEventDto card = fileSessionSupportEvidence( + "DRIVER_CARD", + "TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-position-1:SNAPSHOT:2026-04-01T00:00:00Z", + EventDomain.POSITION, + EventType.POSITION_RECORDED, + EventLifecycle.SNAPSHOT, + false + ); + EventHubEventDto vu = fileSessionSupportEvidence( + "VEHICLE_UNIT", + "TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-position-1:SNAPSHOT:2026-04-01T00:00:00Z", + EventDomain.POSITION, + EventType.POSITION_RECORDED, + EventLifecycle.SNAPSHOT, + true + ); + + RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE); + + assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-position-1:SNAPSHOT:2026-04-01T00:00:00Z"); + assertThat(mixed.supportEvidenceEvents().getFirst().vehicleRef().vin()) + .isEqualTo("WDB9634031L123456"); + assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-position-1:SNAPSHOT:2026-04-01T00:00:00Z"); + assertThat(mixed.eventMixingDecisions()).hasSize(1); + assertThat(mixed.eventMixingDecisions().getFirst().secondaryExtractionCodes()) + .containsExactly("VU_POSITION"); + } + + private EventHubEventDto activity(String extractionCode, String sourceKind, String externalId) { + ObjectNode raw = baseRaw(extractionCode, sourceKind); + raw.put("activityType", "BREAK_REST"); + raw.put("cardSlot", "DRIVER"); + raw.put("cardStatus", "INSERTED"); + raw.put("drivingStatus", "SINGLE"); + raw.put("startedAt", "2026-03-31T15:43:00Z"); + raw.put("endedAt", "2026-04-01T00:00:00Z"); + raw.put("intervalId", "TACHOGRAPH:" + extractionCode + ":1"); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + ObjectNode attributes = JsonNodeFactory.instance.objectNode(); + attributes.put("cardSlot", "DRIVER"); + attributes.put("cardStatus", "INSERTED"); + attributes.put("drivingStatus", "SINGLE"); + return event( + externalId, + sourceKind, + EventDomain.DRIVER_ACTIVITY, + EventType.BREAK_REST, + EventLifecycle.START, + OffsetDateTime.parse("2026-04-01T00:00:00Z"), + new EventDetailsDto("DRIVER_ACTIVITY", attributes), + payload + ); + } + + private EventHubEventDto supportEvidence( + String extractionCode, + String sourceKind, + String externalId, + EventDomain domain, + EventType type, + EventLifecycle lifecycle, + boolean withVin + ) { + ObjectNode raw = baseRaw(extractionCode, sourceKind); + raw.put("latitude", "48.208174"); + raw.put("longitude", "16.373819"); + raw.put("odometerM", "123000"); + raw.put("country", "AT"); + raw.put("region", "W"); + raw.put("countryFrom", "AT"); + raw.put("countryTo", "DE"); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + ObjectNode attributes = JsonNodeFactory.instance.objectNode(); + attributes.put("country", "AT"); + attributes.put("region", "W"); + attributes.put("countryFrom", "AT"); + attributes.put("countryTo", "DE"); + VehicleRefDto vehicleRef = withVin + ? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE")) + : new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE")); + return event( + externalId, + sourceKind, + domain, + type, + lifecycle, + OffsetDateTime.parse("2026-04-01T00:00:00Z"), + new EventDetailsDto(domain.name(), attributes), + payload, + vehicleRef + ); + } + + private EventHubEventDto fileSessionActivity(String sourceKind, String externalId) { + ObjectNode raw = baseRawWithoutExtraction(sourceKind); + raw.put("activityType", "BREAK_REST"); + raw.put("cardSlot", "DRIVER"); + raw.put("cardStatus", "INSERTED"); + raw.put("drivingStatus", "SINGLE"); + raw.put("startedAt", "2026-03-31T15:43:00Z"); + raw.put("endedAt", "2026-04-01T00:00:00Z"); + raw.put("intervalId", "TACHOGRAPH_FILE_SESSION:ACTIVITY:1"); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + ObjectNode attributes = JsonNodeFactory.instance.objectNode(); + attributes.put("cardSlot", "DRIVER"); + attributes.put("cardStatus", "INSERTED"); + attributes.put("drivingStatus", "SINGLE"); + return event( + externalId, + sourceKind, + EventDomain.DRIVER_ACTIVITY, + EventType.BREAK_REST, + EventLifecycle.START, + OffsetDateTime.parse("2026-04-01T00:00:00Z"), + new EventDetailsDto("DRIVER_ACTIVITY", attributes), + payload + ); + } + + private EventHubEventDto fileSessionSupportEvidence( + String sourceKind, + String externalId, + EventDomain domain, + EventType type, + EventLifecycle lifecycle, + boolean withVin + ) { + ObjectNode raw = baseRawWithoutExtraction(sourceKind); + raw.put("latitude", "48.208174"); + raw.put("longitude", "16.373819"); + raw.put("odometerM", "123000"); + raw.put("country", "AT"); + raw.put("region", "W"); + raw.put("countryFrom", "AT"); + raw.put("countryTo", "DE"); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + ObjectNode attributes = JsonNodeFactory.instance.objectNode(); + attributes.put("country", "AT"); + attributes.put("region", "W"); + attributes.put("countryFrom", "AT"); + attributes.put("countryTo", "DE"); + VehicleRefDto vehicleRef = withVin + ? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE")) + : new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE")); + return event( + externalId, + sourceKind, + domain, + type, + lifecycle, + OffsetDateTime.parse("2026-04-01T00:00:00Z"), + new EventDetailsDto(domain.name(), attributes), + payload, + vehicleRef + ); + } + + private EventHubEventDto cardUsage( + String extractionCode, + String sourceKind, + String externalId, + EventType eventType, + EventLifecycle lifecycle + ) { + ObjectNode raw = baseRaw(extractionCode, sourceKind); + raw.put("intervalId", "TACHOGRAPH:" + extractionCode + ":1"); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + return event( + externalId, + sourceKind, + EventDomain.DRIVER_CARD, + eventType, + lifecycle, + OffsetDateTime.parse("2026-04-01T00:00:00Z"), + null, + payload + ); + } + + private ObjectNode baseRaw(String extractionCode, String sourceKind) { + ObjectNode raw = JsonNodeFactory.instance.objectNode(); + raw.put("sourceRowId", extractionCode + "-1"); + raw.put("sourceKind", sourceKind); + raw.put("extractionCode", extractionCode); + raw.put("driverKey", "1:10000000198490"); + raw.put("registrationKey", "1:LL-158TE"); + return raw; + } + + private ObjectNode baseRawWithoutExtraction(String sourceKind) { + ObjectNode raw = JsonNodeFactory.instance.objectNode(); + raw.put("sourceRowId", sourceKind + "-1"); + raw.put("sourceKind", sourceKind); + raw.put("driverKey", "1:10000000198490"); + raw.put("registrationKey", "1:LL-158TE"); + return raw; + } + + private EventHubEventDto event( + String externalId, + String sourceKind, + EventDomain domain, + EventType type, + EventLifecycle lifecycle, + OffsetDateTime occurredAt, + EventDetailsDto details, + ObjectNode payload + ) { + return event( + externalId, + sourceKind, + domain, + type, + lifecycle, + occurredAt, + details, + payload, + new VehicleRefDto(null, null, null, new VehicleRegistrationRefDto("1", 1, "LL-158TE")) + ); + } + + private EventHubEventDto event( + String externalId, + String sourceKind, + EventDomain domain, + EventType type, + EventLifecycle lifecycle, + OffsetDateTime occurredAt, + EventDetailsDto details, + ObjectNode payload, + VehicleRefDto vehicleRef + ) { + return new EventHubEventDto( + UUID.randomUUID(), + externalId, + new DriverRefDto("1:10000000198490", new DriverCardRefDto("1", 1, "10000000198490")), + vehicleRef, + occurredAt, + null, + occurredAt, + domain, + type, + lifecycle, + null, + null, + details, + null, + payload, + false, + packageInfo(sourceKind, domain, occurredAt.toLocalDate()) + ); + } + + private EventHubPackageRequest packageInfo(String sourceKind, EventDomain domain, LocalDate businessDate) { + EventSourceDto source = new EventSourceDto("TACHOGRAPH", sourceKind, "TACHOGRAPH_" + sourceKind, null, null, null); + OffsetDateTime from = businessDate.atStartOfDay().atOffset(java.time.ZoneOffset.UTC); + OffsetDateTime to = businessDate.plusDays(1).atStartOfDay().atOffset(java.time.ZoneOffset.UTC); + return new EventHubPackageRequest( + "default", + source, + null, + ImportScopeDto.tenantAll(from, to), + domain.name(), + businessDate, + source.stableKey() + ":" + domain.name() + ":" + businessDate + ); + } +}