diff --git a/README_PATCH.md b/README_PATCH.md index 389b0f4..dadf9f7 100644 --- a/README_PATCH.md +++ b/README_PATCH.md @@ -1,33 +1,45 @@ -# Patch: Tachograph file-session support for runtime event mixing +# EventHub runtime event-mixing refactor -This patch extends the existing `event-evidence-mixing` implementation so it also supports events produced by uploaded tachograph file sessions. +This patch refactors the previous targeted card/VU duplicate handling into a first-class runtime event-mixing subsystem. -## What changed +## New architecture components -- `RuntimeEventMixingService` now treats `TACHOGRAPH_FILE_SESSION` and `COMPOSITE_TACHOGRAPH_FILE_SESSION` events as tachograph runtime evidence. -- File-session events do not always have a DB-style `extractionCode` such as `CARD_ACTIVITY` or `VU_ACTIVITY` in the raw payload. -- The mixing service now derives the equivalent extraction code from: - - source kind: `DRIVER_CARD` or `VEHICLE_UNIT` - - event domain: `DRIVER_ACTIVITY`, `POSITION`, `PLACE`, `BORDER_CROSSING`, `DRIVER_CARD`, etc. - - file-session external id / source-package kind. +- `RuntimeEventMixingModule` +- `RuntimeEventMixingService` +- `RuntimeEventDescriptor` +- `RuntimeEventDescriptorFactory` +- `RuntimeEventSourceProfile` +- `RuntimeEventMixingRule` +- `RuntimeEventMixingRuleRegistry` +- `RuntimeEventMixingDecisionDto` +- `RuntimeMixedEventBundle` +- `RuntimeResolvedEvent` +- `RuntimeResolvedEventRole` +- `RuntimeEventMixingChannel` -## Supported derived mappings +## Current configured rules -| File-session event | Driver-card source | Vehicle-unit source | -|---|---|---| -| `DRIVER_ACTIVITY` | `CARD_ACTIVITY` | `VU_ACTIVITY` | -| `POSITION` | `CARD_POSITION` | `VU_POSITION` | -| `PLACE` | `CARD_PLACE` | `VU_PLACE` | -| `BORDER_CROSSING` | `CARD_BORDER_CROSSING` | `VU_BORDER_CROSSING` | -| `DRIVER_CARD` insert/withdraw | `CARD_VEHICLES_USED` | `IW_CYCLE` | +The rule registry currently applies these tachograph same-source rules: -## Important behavior +1. `tachograph.activity.card-vu.same-event-key` +2. `tachograph.activity.card-vu.compatible-activity-key` +3. `tachograph.support.card-vu.same-event-key` +4. `tachograph.support.card-vu.compatible-support-key` -- Duplicate file-session `CARD_ACTIVITY` / `VU_ACTIVITY` events are mixed the same way as persistent tachograph DB events. -- Duplicate file-session position/place/border events are also mixed the same way as persistent tachograph DB support evidence. -- `CARD_VEHICLES_USED` and `IW_CYCLE` are still not mixed; they remain accepted for separate vehicle-usage processing. +The activity rules collapse duplicate `CARD_ACTIVITY`/`VU_ACTIVITY` points before activity intervalization. -## Modified files +The support rules collapse duplicate card/VU support evidence for: -- `src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java` -- `src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingServiceTest.java` +- `CARD_POSITION` / `VU_POSITION` +- `CARD_PLACE` / `VU_PLACE` +- `CARD_BORDER_CROSSING` / `VU_BORDER_CROSSING` + +The card-side event remains the primary event. The VU-side event is suppressed from the processing channel but remains visible through `suppressedEvents`, `resolvedEvents`, and `eventMixingDecisions`. + +## Still intentionally unchanged + +`CARD_VEHICLES_USED` and `IW_CYCLE` are still not mixed. They remain fully accepted in `vehicleUsageEvents` because they need a separate vehicle-usage rule later. + +## TACHOGRAPH_FILE_SESSION support + +The descriptor factory recognizes `TACHOGRAPH_FILE_SESSION` and `COMPOSITE_TACHOGRAPH_FILE_SESSION` events and derives card/VU extraction codes from `sourceKind` and event domain when no explicit `extractionCode` is present. diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java index cf069a6..22a938b 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java @@ -1,24 +1,17 @@ package at.procon.eventhub.processing.eventprocessing.mixing; -import at.procon.eventhub.dto.EventDomain; import at.procon.eventhub.dto.EventHubEventDto; -import at.procon.eventhub.dto.EventLifecycle; -import at.procon.eventhub.dto.GeoPointDto; import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto; -import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver; -import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver; -import com.fasterxml.jackson.databind.JsonNode; -import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Comparator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @@ -37,37 +30,58 @@ public class RuntimeEventMixingService { public static final String RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY = "tachograph.support.card-vu.compatible-support-key"; + private final RuntimeEventDescriptorFactory descriptorFactory; + private final RuntimeEventMixingRuleRegistry ruleRegistry; + + @Autowired + public RuntimeEventMixingService( + RuntimeEventDescriptorFactory descriptorFactory, + RuntimeEventMixingRuleRegistry ruleRegistry + ) { + this.descriptorFactory = descriptorFactory; + this.ruleRegistry = ruleRegistry; + } + + /** Compatibility constructor used by unit tests and local registries. */ + public RuntimeEventMixingService() { + this(new RuntimeEventDescriptorFactory(), new RuntimeEventMixingRuleRegistry()); + } + public RuntimeMixedEventBundle mix(List events, String requestedMode) { - List rawEvents = sort(events); String mode = normalizeMode(requestedMode); + List descriptors = descriptorFactory.describeSorted(events); + List rawEvents = descriptors.stream().map(RuntimeEventDescriptor::event).toList(); if (MODE_OFF.equals(mode)) { - return unchanged(rawEvents, "Runtime event mixing is disabled by eventMixingMode=OFF."); + return unchanged(rawEvents, descriptors, "Runtime event mixing is disabled by eventMixingMode=OFF."); } - MixingState state = new MixingState(rawEvents); - applyTachographCardVuActivityMixing(state); - applyTachographCardVuSupportEvidenceMixing(state); + MixingState state = new MixingState(descriptors); + for (RuntimeEventMixingRule rule : ruleRegistry.rulesForMode(mode)) { + applyRule(state, rule); + } List driverPartitionEvents = rawEvents.stream() .filter(event -> !state.isSuppressed(event)) .map(state::effectiveEvent) .toList(); List activityTimelineEvents = driverPartitionEvents.stream() - .filter(RuntimeEventMixingService::isDriverActivityPoint) + .filter(descriptorFactory::isDriverActivityPoint) .toList(); // Vehicle-usage events are intentionally not mixed here. CARD_VEHICLES_USED and IW_CYCLE // are kept as separate input evidence because they must be processed by their own rules later. List vehicleUsageEvents = rawEvents.stream() - .filter(RuntimeEventMixingService::isDriverCardUsagePoint) + .filter(descriptorFactory::isDriverCardUsagePoint) .toList(); List supportEvidenceEvents = driverPartitionEvents.stream() - .filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event)) + .filter(event -> !descriptorFactory.isDriverActivityPoint(event) && !descriptorFactory.isDriverCardUsagePoint(event)) .toList(); + List resolvedEvents = buildResolvedEvents(state, rawEvents); List notes = new ArrayList<>(); notes.add("Runtime event mixing inspected " + rawEvents.size() + " event(s)."); + notes.add("Runtime event mixing applied " + ruleRegistry.rulesForMode(mode).size() + " configured rule(s) in mode " + mode + "."); notes.add("Runtime event mixing suppressed " + state.suppressedEvents().size() + " duplicate source event(s) from activity/support evidence channels."); notes.add("Runtime event mixing keeps CARD_POSITION, CARD_PLACE, and CARD_BORDER_CROSSING as primary when matching VU support evidence describes the same semantic event."); @@ -79,188 +93,156 @@ public class RuntimeEventMixingService { vehicleUsageEvents, supportEvidenceEvents, state.suppressedEvents(), + resolvedEvents, state.decisions(), notes, state.warnings() ); } - private RuntimeMixedEventBundle unchanged(List rawEvents, String note) { + private RuntimeMixedEventBundle unchanged(List rawEvents, List descriptors, String note) { return new RuntimeMixedEventBundle( rawEvents, rawEvents, - rawEvents.stream().filter(RuntimeEventMixingService::isDriverActivityPoint).toList(), - rawEvents.stream().filter(RuntimeEventMixingService::isDriverCardUsagePoint).toList(), - rawEvents.stream().filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event)).toList(), + rawEvents.stream().filter(descriptorFactory::isDriverActivityPoint).toList(), + rawEvents.stream().filter(descriptorFactory::isDriverCardUsagePoint).toList(), + rawEvents.stream().filter(event -> !descriptorFactory.isDriverActivityPoint(event) && !descriptorFactory.isDriverCardUsagePoint(event)).toList(), List.of(), + descriptors.stream().map(this::defaultResolvedEvent).toList(), List.of(), List.of(note), List.of() ); } - private void applyTachographCardVuActivityMixing(MixingState state) { - LinkedHashMap> exactGroups = new LinkedHashMap<>(); - for (EventHubEventDto event : state.rawEvents()) { - if (!isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) { + private void applyRule(MixingState state, RuntimeEventMixingRule rule) { + LinkedHashMap> groups = new LinkedHashMap<>(); + for (RuntimeEventDescriptor descriptor : state.descriptors()) { + if (state.isSuppressed(descriptor) || !rule.matches(descriptor)) { continue; } - exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>()) - .add(event); - } - for (Map.Entry> entry : exactGroups.entrySet()) { - fuseCardAndVuDuplicates( - state, - entry.getKey(), - "EXACT_EVENT_KEY", - RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY, - "ACTIVITY_TIMELINE", - entry.getValue(), - "CARD_ACTIVITY", - "VU_ACTIVITY", - "CARD_ACTIVITY and VU_ACTIVITY describe the same driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization." - ); - } - - LinkedHashMap> compatibleGroups = new LinkedHashMap<>(); - for (EventHubEventDto event : state.rawEvents()) { - if (state.isSuppressed(event) || !isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) { + String key = descriptor.keyFor(rule.equivalenceType()); + if (key == null || key.isBlank()) { continue; } - compatibleGroups.computeIfAbsent(compatibleActivityKey(event), ignored -> new ArrayList<>()) - .add(event); + groups.computeIfAbsent(key, ignored -> new ArrayList<>()).add(descriptor); } - for (Map.Entry> entry : compatibleGroups.entrySet()) { - fuseCardAndVuDuplicates( - state, - entry.getKey(), - "COMPATIBLE_ACTIVITY_KEY", - RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY, - "ACTIVITY_TIMELINE", - entry.getValue(), - "CARD_ACTIVITY", - "VU_ACTIVITY", - "CARD_ACTIVITY and VU_ACTIVITY describe a compatible driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization." - ); + for (Map.Entry> entry : groups.entrySet()) { + fuseDuplicateGroup(state, rule, entry.getKey(), entry.getValue()); } } - private void applyTachographCardVuSupportEvidenceMixing(MixingState state) { - LinkedHashMap> exactGroups = new LinkedHashMap<>(); - for (EventHubEventDto event : state.rawEvents()) { - if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) { - continue; - } - exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>()) - .add(event); - } - for (Map.Entry> entry : exactGroups.entrySet()) { - fuseCardAndVuSupportDuplicates( - state, - entry.getKey(), - "EXACT_EVENT_KEY", - RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY, - entry.getValue() - ); - } - - LinkedHashMap> compatibleGroups = new LinkedHashMap<>(); - for (EventHubEventDto event : state.rawEvents()) { - if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) { - continue; - } - compatibleGroups.computeIfAbsent(compatibleSupportEvidenceKey(event), ignored -> new ArrayList<>()) - .add(event); - } - for (Map.Entry> entry : compatibleGroups.entrySet()) { - fuseCardAndVuSupportDuplicates( - state, - entry.getKey(), - "COMPATIBLE_SUPPORT_KEY", - RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY, - entry.getValue() - ); - } - } - - private void fuseCardAndVuSupportDuplicates( + private void fuseDuplicateGroup( MixingState state, + RuntimeEventMixingRule rule, String eventKey, - String equivalenceType, - String ruleId, - List group + List group ) { - Map.Entry pair = cardVuSupportPair(group); - if (pair == null) { - return; - } - fuseCardAndVuDuplicates( - state, - eventKey, - equivalenceType, - ruleId, - "SUPPORT_EVIDENCE", - group, - pair.getKey(), - pair.getValue(), - pair.getKey() + " and " + pair.getValue() + " describe the same support evidence event. " - + pair.getKey() + " is kept as primary support evidence; " + pair.getValue() - + " is suppressed from support-evidence normalization. Vehicle/VIN identity from the VU event is copied to the primary event when the card event has weaker vehicle identity." - ); - } - - private void fuseCardAndVuDuplicates( - MixingState state, - String eventKey, - String equivalenceType, - String ruleId, - String channel, - List group, - String primaryExtractionCode, - String secondaryExtractionCode, - String reason - ) { - List primaries = group.stream() - .filter(event -> Objects.equals(primaryExtractionCode, extractionCode(event))) - .sorted(eventComparator()) + List primaries = group.stream() + .filter(rule::isPrimary) + .sorted(descriptorComparator()) .toList(); - List secondaries = group.stream() - .filter(event -> Objects.equals(secondaryExtractionCode, extractionCode(event))) - .sorted(eventComparator()) + List secondaries = group.stream() + .filter(rule::isSecondary) + .sorted(descriptorComparator()) .toList(); if (primaries.isEmpty() || secondaries.isEmpty()) { return; } - EventHubEventDto primary = primaries.get(0); - List newlySuppressed = secondaries.stream() - .filter(event -> !state.isSuppressed(event)) + RuntimeEventDescriptor primary = primaries.getFirst(); + List newlySuppressed = secondaries.stream() + .filter(descriptor -> !state.isSuppressed(descriptor)) .toList(); if (newlySuppressed.isEmpty()) { return; } - EventHubEventDto enrichedPrimary = enrichPrimaryVehicleRef(primary, newlySuppressed); - if (enrichedPrimary != primary) { + + EventHubEventDto enrichedPrimary = enrichPrimaryVehicleRef( + primary.event(), + newlySuppressed.stream().map(RuntimeEventDescriptor::event).toList() + ); + if (enrichedPrimary != primary.event()) { state.replace(primary, enrichedPrimary); } - newlySuppressed.forEach(state::suppress); + newlySuppressed.forEach(descriptor -> state.suppress(descriptor, rule, primary, eventKey)); + state.markPrimary(primary, rule, eventKey, newlySuppressed); state.decisions().add(new RuntimeEventMixingDecisionDto( - ruleId, - equivalenceType, + rule.ruleId(), + rule.equivalenceType(), eventKey, - "FUSED_PRIMARY_SELECTED", - channel, - primary.externalSourceEventId(), - extractionCode(primary), - newlySuppressed.stream().map(EventHubEventDto::externalSourceEventId).toList(), - newlySuppressed.stream().map(RuntimeEventMixingService::extractionCode).toList(), - primary.occurredAt(), + rule.decision(), + rule.channel().name(), + primary.event().externalSourceEventId(), + primary.extractionCode(), + newlySuppressed.stream().map(descriptor -> descriptor.event().externalSourceEventId()).toList(), + newlySuppressed.stream().map(RuntimeEventDescriptor::extractionCode).toList(), + primary.event().occurredAt(), primary.eventDomain() == null ? null : primary.eventDomain().name(), primary.eventType() == null ? null : primary.eventType().name(), primary.lifecycle() == null ? null : primary.lifecycle().name(), - reason + rule.reason() )); } + private List buildResolvedEvents(MixingState state, List rawEvents) { + List resolved = new ArrayList<>(); + for (EventHubEventDto event : rawEvents) { + RuntimeResolvedEvent explicit = state.resolvedEvent(event); + if (explicit != null) { + resolved.add(explicit); + continue; + } + RuntimeEventDescriptor descriptor = state.descriptor(event); + if (descriptor != null) { + resolved.add(defaultResolvedEvent(descriptor)); + } + } + return List.copyOf(resolved); + } + + private RuntimeResolvedEvent defaultResolvedEvent(RuntimeEventDescriptor descriptor) { + RuntimeEventMixingChannel channel = defaultChannel(descriptor); + RuntimeResolvedEventRole role = switch (channel) { + case ACTIVITY_TIMELINE -> RuntimeResolvedEventRole.PRIMARY; + case VEHICLE_USAGE -> RuntimeResolvedEventRole.VEHICLE_USAGE_INPUT; + case SUPPORT_EVIDENCE -> RuntimeResolvedEventRole.SUPPORT_ONLY; + default -> RuntimeResolvedEventRole.PRIMARY; + }; + return new RuntimeResolvedEvent( + descriptor.event(), + channel, + role, + null, + null, + descriptor.eventKey(), + descriptor.event() == null ? null : descriptor.event().externalSourceEventId(), + List.of(), + "No event-mixing rule changed this event." + ); + } + + private RuntimeEventMixingChannel defaultChannel(RuntimeEventDescriptor descriptor) { + if (descriptor == null) { + return RuntimeEventMixingChannel.AUDIT; + } + if (descriptor.driverActivityPoint()) { + return RuntimeEventMixingChannel.ACTIVITY_TIMELINE; + } + if (descriptor.driverCardUsagePoint()) { + return RuntimeEventMixingChannel.VEHICLE_USAGE; + } + return RuntimeEventMixingChannel.SUPPORT_EVIDENCE; + } + + private Comparator descriptorComparator() { + return Comparator.comparing(RuntimeEventDescriptor::occurredAt, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(descriptor -> descriptor.eventDomain() == null ? "" : descriptor.eventDomain().name()) + .thenComparing(descriptor -> descriptor.eventType() == null ? "" : descriptor.eventType().name()) + .thenComparing(descriptor -> descriptor.lifecycle() == null ? "" : descriptor.lifecycle().name()) + .thenComparing(RuntimeEventDescriptor::extractionCode, Comparator.nullsLast(String::compareTo)) + .thenComparing(descriptor -> descriptor.event() == null ? null : descriptor.event().externalSourceEventId(), Comparator.nullsLast(String::compareTo)); + } + private static EventHubEventDto enrichPrimaryVehicleRef(EventHubEventDto primary, List secondaries) { if (primary == null || secondaries == null || secondaries.isEmpty()) { return primary; @@ -329,215 +311,10 @@ public class RuntimeEventMixingService { ); } - private static Map.Entry cardVuSupportPair(List group) { - if (group == null || group.isEmpty()) { - return null; - } - Set extractionCodes = group.stream() - .map(RuntimeEventMixingService::extractionCode) - .filter(Objects::nonNull) - .collect(java.util.stream.Collectors.toCollection(LinkedHashSet::new)); - if (extractionCodes.contains("CARD_POSITION") && extractionCodes.contains("VU_POSITION")) { - return Map.entry("CARD_POSITION", "VU_POSITION"); - } - if (extractionCodes.contains("CARD_PLACE") && extractionCodes.contains("VU_PLACE")) { - return Map.entry("CARD_PLACE", "VU_PLACE"); - } - if (extractionCodes.contains("CARD_BORDER_CROSSING") && extractionCodes.contains("VU_BORDER_CROSSING")) { - return Map.entry("CARD_BORDER_CROSSING", "VU_BORDER_CROSSING"); - } - return null; - } - - private static boolean isTachographCardOrVuActivity(EventHubEventDto event) { - RuntimeEventSourceProfile profile = sourceProfile(event); - return isTachographRuntimeSource(profile) - && (Objects.equals("CARD_ACTIVITY", profile.extractionCode()) - || Objects.equals("VU_ACTIVITY", profile.extractionCode())); - } - - private static boolean isTachographCardOrVuSupportEvidence(EventHubEventDto event) { - if (event == null || isDriverActivityPoint(event) || isDriverCardUsagePoint(event)) { - return false; - } - RuntimeEventSourceProfile profile = sourceProfile(event); - if (!isTachographRuntimeSource(profile)) { - return false; - } - return switch (nullToEmpty(profile.extractionCode())) { - case "CARD_POSITION", "VU_POSITION", "CARD_PLACE", "VU_PLACE", "CARD_BORDER_CROSSING", "VU_BORDER_CROSSING" -> true; - default -> false; - }; - } - - private static boolean isDriverActivityPoint(EventHubEventDto event) { - return event != null - && event.eventDomain() == EventDomain.DRIVER_ACTIVITY - && (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.END) - && event.occurredAt() != null; - } - - private static boolean isDriverCardUsagePoint(EventHubEventDto event) { - return event != null - && event.eventDomain() == EventDomain.DRIVER_CARD - && (event.lifecycle() == EventLifecycle.INSERT || event.lifecycle() == EventLifecycle.WITHDRAW) - && event.occurredAt() != null; - } - - private static RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) { - JsonNode raw = rawPayload(event); - String sourceKind = firstNonBlank(text(raw, "sourceKind"), sourceKind(event)); - String extractionCode = firstNonBlank( - text(raw, "extractionCode"), - fileSessionExtractionCode(event, sourceKind), - extractionCodeFromExternalSourceEventId(event) - ); - String sourceSystem = firstNonBlank( - text(raw, "sourceSystem"), - sourceProvider(event), - sourceSystemFromExternalSourceEventId(event) - ); - if (sourceSystem == null && (extractionCode != null || isTachographFileSessionEvent(event))) { - sourceSystem = "TACHOGRAPH"; - } - return new RuntimeEventSourceProfile( - normalizeUpper(sourceSystem), - normalizeUpper(sourceKind), - normalizeUpper(extractionCode) - ); - } - - private static String extractionCode(EventHubEventDto event) { - return sourceProfile(event).extractionCode(); - } - private static boolean isTachographRuntimeSource(RuntimeEventSourceProfile profile) { - if (profile == null) { - return false; - } - return switch (nullToEmpty(profile.sourceSystem())) { - case "TACHOGRAPH", "TACHOGRAPH_FILE_SESSION", "COMPOSITE_TACHOGRAPH_FILE_SESSION" -> true; - default -> false; - }; - } - - private static String fileSessionExtractionCode(EventHubEventDto event, String sourceKind) { - if (!isTachographFileSessionEvent(event)) { - return null; - } - String normalizedSourceKind = normalizeUpper(sourceKind); - if (normalizedSourceKind == null) { - return null; - } - if (event != null && event.eventDomain() == EventDomain.DRIVER_ACTIVITY) { - return switch (normalizedSourceKind) { - case "DRIVER_CARD" -> "CARD_ACTIVITY"; - case "VEHICLE_UNIT" -> "VU_ACTIVITY"; - default -> null; - }; - } - if (event != null && event.eventDomain() == EventDomain.DRIVER_CARD) { - return switch (normalizedSourceKind) { - case "DRIVER_CARD" -> "CARD_VEHICLES_USED"; - case "VEHICLE_UNIT" -> "IW_CYCLE"; - default -> null; - }; - } - if (event == null || event.eventDomain() == null) { - return null; - } - String prefix = switch (normalizedSourceKind) { - case "DRIVER_CARD" -> "CARD"; - case "VEHICLE_UNIT" -> "VU"; - default -> null; - }; - if (prefix == null) { - return null; - } - return switch (event.eventDomain()) { - case POSITION -> prefix + "_POSITION"; - case PLACE -> prefix + "_PLACE"; - case BORDER_CROSSING -> prefix + "_BORDER_CROSSING"; - case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null; - default -> null; - }; - } - - private static boolean isTachographFileSessionEvent(EventHubEventDto event) { - if (event == null) { - return false; - } - String packageKind = event.sourcePackageRef() == null ? null : normalizeUpper(event.sourcePackageRef().packageKind()); - if (Objects.equals("TACHOGRAPH_FILE_SESSION", packageKind) - || Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", packageKind)) { - return true; - } - String externalId = event.externalSourceEventId(); - return externalId != null - && (externalId.startsWith("TACHOGRAPH_FILE_SESSION:") - || externalId.startsWith("COMPOSITE_TACHOGRAPH_FILE_SESSION:")); - } - - - private static String compatibleActivityKey(EventHubEventDto event) { - JsonNode raw = rawPayload(event); - return String.join("|", - "ACTIVITY_COMPATIBLE", - nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()), - nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)), - nullToEmpty(event.eventDomain() == null ? null : event.eventDomain().name()), - nullToEmpty(event.eventType() == null ? null : event.eventType().name()), - nullToEmpty(event.lifecycle() == null ? null : event.lifecycle().name()), - normalizeTime(event.occurredAt()), - nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)), - nullToEmpty(firstNonBlank(text(raw, "startedAt"), text(raw, "intervalStartedAt"))), - nullToEmpty(firstNonBlank(text(raw, "endedAt"), text(raw, "intervalEndedAt"))), - nullToEmpty(firstNonBlank(text(raw, "slot"), text(raw, "cardSlot"))), - nullToEmpty(firstNonBlank(text(raw, "cardStatus"))), - nullToEmpty(firstNonBlank(text(raw, "drivingStatus"))) - ); - } - - private static String compatibleSupportEvidenceKey(EventHubEventDto event) { - JsonNode raw = rawPayload(event); - GeoPointDto position = event == null ? null : event.position(); - return String.join("|", - "SUPPORT_COMPATIBLE", - nullToEmpty(event == null || event.packageInfo() == null ? null : event.packageInfo().tenantKey()), - nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)), - nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()), - nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()), - nullToEmpty(event == null || event.lifecycle() == null ? null : event.lifecycle().name()), - normalizeTime(event == null ? null : event.occurredAt()), - nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)), - nullToEmpty(firstNonBlank(text(raw, "latitude"), position == null || position.latitude() == null ? null : position.latitude().toPlainString())), - nullToEmpty(firstNonBlank(text(raw, "longitude"), position == null || position.longitude() == null ? null : position.longitude().toPlainString())), - nullToEmpty(firstNonBlank(text(raw, "odometerM"), event == null || event.odometerM() == null ? null : String.valueOf(event.odometerM()))), - nullToEmpty(firstNonBlank(text(raw, "country"), detailText(event, "country"))), - nullToEmpty(firstNonBlank(text(raw, "region"), detailText(event, "region"))), - nullToEmpty(firstNonBlank(text(raw, "countryFrom"), detailText(event, "countryFrom"))), - nullToEmpty(firstNonBlank(text(raw, "countryTo"), detailText(event, "countryTo"))), - nullToEmpty(firstNonBlank(text(raw, "operation"), detailText(event, "operation"))) - ); - } - - private static Comparator eventComparator() { - return Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder())) - .thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name()) - .thenComparing(event -> event.eventType() == null ? "" : event.eventType().name()) - .thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name()) - .thenComparing(event -> sourceProfile(event).extractionCode(), Comparator.nullsLast(String::compareTo)) - .thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo)); - } - - private static List sort(List events) { - return (events == null ? List.of() : events).stream() - .filter(Objects::nonNull) - .sorted(eventComparator()) - .toList(); - } - private static String normalizeMode(String requestedMode) { - String value = normalizeUpper(requestedMode); + String value = requestedMode == null || requestedMode.isBlank() + ? null + : requestedMode.trim().toUpperCase(java.util.Locale.ROOT); if (value == null) { return MODE_TACHOGRAPH_SAME_SOURCE; } @@ -547,62 +324,8 @@ public class RuntimeEventMixingService { }; } - private static JsonNode rawPayload(EventHubEventDto event) { - return RuntimeEntityReferenceResolver.rawPayload(event); - } - - private static String sourceKind(EventHubEventDto event) { - return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null - ? null - : event.packageInfo().eventSource().sourceKind(); - } - - private static String sourceProvider(EventHubEventDto event) { - return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null - ? null - : event.packageInfo().eventSource().providerKey(); - } - - private static String sourceSystemFromExternalSourceEventId(EventHubEventDto event) { - String externalId = event == null ? null : event.externalSourceEventId(); - if (externalId == null || externalId.isBlank()) { - return null; - } - String[] parts = externalId.split(":"); - return parts.length >= 1 ? parts[0] : null; - } - - private static String extractionCodeFromExternalSourceEventId(EventHubEventDto event) { - String externalId = event == null ? null : event.externalSourceEventId(); - if (externalId == null || externalId.isBlank()) { - return null; - } - String[] parts = externalId.split(":"); - return parts.length >= 2 ? parts[1] : null; - } - - private static String detailText(EventHubEventDto event, String field) { - if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null || field == null) { - return null; - } - JsonNode value = event.eventDetails().attributes().get(field); - if (value == null || value.isNull()) { - return null; - } - String text = value.asText(null); - return text == null || text.isBlank() ? null : text.trim(); - } - - private static String text(JsonNode node, String field) { - if (node == null || field == null) { - return null; - } - JsonNode value = node.get(field); - if (value == null || value.isNull()) { - return null; - } - String text = value.asText(null); - return text == null || text.isBlank() ? null : text.trim(); + private static boolean notBlank(String value) { + return value != null && !value.isBlank(); } private static String firstNonBlank(String... values) { @@ -617,56 +340,120 @@ public class RuntimeEventMixingService { return null; } - private static String normalizeUpper(String value) { - return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT); - } - - private static String nullToEmpty(Object value) { - return value == null ? "" : String.valueOf(value); - } - - private static String normalizeTime(OffsetDateTime value) { - return value == null ? "" : value.toInstant().toString(); - } - - private static boolean notBlank(String value) { - return value != null && !value.isBlank(); - } - private static final class MixingState { - private final List rawEvents; + private final List descriptors; + private final Map descriptorsByEventId = new LinkedHashMap<>(); private final Set suppressedEventIds = new LinkedHashSet<>(); private final Map replacementsByEventId = new LinkedHashMap<>(); + private final Map resolvedEventsByEventId = new LinkedHashMap<>(); private final List suppressedEvents = new ArrayList<>(); private final List decisions = new ArrayList<>(); private final List warnings = new ArrayList<>(); - private MixingState(List rawEvents) { - this.rawEvents = rawEvents; + private MixingState(List descriptors) { + this.descriptors = descriptors == null ? List.of() : List.copyOf(descriptors); + for (RuntimeEventDescriptor descriptor : this.descriptors) { + descriptorsByEventId.put(descriptor.eventIdentityKey(), descriptor); + } } - private List rawEvents() { - return rawEvents; + private List descriptors() { + return descriptors; + } + + private boolean isSuppressed(RuntimeEventDescriptor descriptor) { + return descriptor != null && suppressedEventIds.contains(descriptor.eventIdentityKey()); } private boolean isSuppressed(EventHubEventDto event) { - return suppressedEventIds.contains(eventId(event)); + RuntimeEventDescriptor descriptor = descriptor(event); + return descriptor != null && isSuppressed(descriptor); } private EventHubEventDto effectiveEvent(EventHubEventDto event) { - return replacementsByEventId.getOrDefault(eventId(event), event); + RuntimeEventDescriptor descriptor = descriptor(event); + if (descriptor == null) { + return event; + } + return replacementsByEventId.getOrDefault(descriptor.eventIdentityKey(), event); } - private void replace(EventHubEventDto original, EventHubEventDto replacement) { + private RuntimeEventDescriptor descriptor(EventHubEventDto event) { + if (event == null) { + return null; + } + String externalId = event.externalSourceEventId(); + if (externalId != null && descriptorsByEventId.containsKey(externalId)) { + return descriptorsByEventId.get(externalId); + } + return descriptors.stream() + .filter(descriptor -> descriptor.event() == event || Objects.equals(descriptor.event(), event)) + .findFirst() + .orElse(null); + } + + private void replace(RuntimeEventDescriptor original, EventHubEventDto replacement) { if (original != null && replacement != null) { - replacementsByEventId.put(eventId(original), replacement); + replacementsByEventId.put(original.eventIdentityKey(), replacement); } } - private void suppress(EventHubEventDto event) { - if (suppressedEventIds.add(eventId(event))) { - suppressedEvents.add(event); + private void markPrimary( + RuntimeEventDescriptor primary, + RuntimeEventMixingRule rule, + String eventKey, + List secondaries + ) { + if (primary == null) { + return; } + EventHubEventDto effectivePrimary = replacementsByEventId.getOrDefault(primary.eventIdentityKey(), primary.event()); + resolvedEventsByEventId.put(primary.eventIdentityKey(), new RuntimeResolvedEvent( + effectivePrimary, + rule.channel(), + rule.primaryRole(), + rule.ruleId(), + rule.equivalenceType(), + eventKey, + primary.event().externalSourceEventId(), + secondaries.stream().map(descriptor -> descriptor.event().externalSourceEventId()).toList(), + rule.reason() + )); + } + + private void suppress( + RuntimeEventDescriptor descriptor, + RuntimeEventMixingRule rule, + RuntimeEventDescriptor primary, + String eventKey + ) { + if (descriptor == null || descriptor.event() == null) { + return; + } + if (suppressedEventIds.add(descriptor.eventIdentityKey())) { + suppressedEvents.add(descriptor.event()); + resolvedEventsByEventId.put(descriptor.eventIdentityKey(), new RuntimeResolvedEvent( + descriptor.event(), + rule.channel(), + rule.secondaryRole(), + rule.ruleId(), + rule.equivalenceType(), + eventKey, + primary == null || primary.event() == null ? null : primary.event().externalSourceEventId(), + primary == null || primary.event() == null + ? List.of() + : List.of(primary.event().externalSourceEventId()), + rule.reason() + )); + } + } + + private RuntimeResolvedEvent resolvedEvent(EventHubEventDto event) { + RuntimeEventDescriptor descriptor = descriptor(event); + if (descriptor == null) { + return null; + } + return resolvedEventsByEventId.get(descriptor.eventIdentityKey()); } private List suppressedEvents() { @@ -680,12 +467,5 @@ public class RuntimeEventMixingService { private List warnings() { return warnings; } - - private String eventId(EventHubEventDto event) { - if (event == null) { - return ""; - } - return firstNonBlank(event.externalSourceEventId(), event.eventId() == null ? null : event.eventId().toString(), RuntimeEventIdentityResolver.canonicalEventKey(event)); - } } } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventSourceProfile.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventSourceProfile.java index b179099..0591e6c 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventSourceProfile.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventSourceProfile.java @@ -5,4 +5,10 @@ public record RuntimeEventSourceProfile( String sourceKind, String extractionCode ) { + public boolean isTachographRuntimeSource() { + return switch (sourceSystem == null ? "" : sourceSystem) { + case "TACHOGRAPH", "TACHOGRAPH_FILE_SESSION", "COMPOSITE_TACHOGRAPH_FILE_SESSION" -> true; + default -> false; + }; + } } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeMixedEventBundle.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeMixedEventBundle.java index 7e482c9..af62827 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeMixedEventBundle.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeMixedEventBundle.java @@ -10,6 +10,7 @@ public record RuntimeMixedEventBundle( List vehicleUsageEvents, List supportEvidenceEvents, List suppressedEvents, + List resolvedEvents, List eventMixingDecisions, List notes, List warnings @@ -21,6 +22,7 @@ public record RuntimeMixedEventBundle( vehicleUsageEvents = vehicleUsageEvents == null ? List.of() : List.copyOf(vehicleUsageEvents); supportEvidenceEvents = supportEvidenceEvents == null ? List.of() : List.copyOf(supportEvidenceEvents); suppressedEvents = suppressedEvents == null ? List.of() : List.copyOf(suppressedEvents); + resolvedEvents = resolvedEvents == null ? List.of() : List.copyOf(resolvedEvents); eventMixingDecisions = eventMixingDecisions == null ? List.of() : List.copyOf(eventMixingDecisions); notes = notes == null ? List.of() : List.copyOf(notes); warnings = warnings == null ? List.of() : List.copyOf(warnings); diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java index 8b2c227..36dbf96 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java @@ -39,9 +39,9 @@ public class EventEvidenceMixingModule implements RuntimeProcessingModule { return new RuntimeProcessingModuleDescriptorDto( moduleKey(), "Event evidence mixing", - "Applies source-aware runtime evidence rules before intervalization. The initial rule collapses duplicate tachograph CARD_ACTIVITY/VU_ACTIVITY events by normalized eventKey while keeping CARD_VEHICLES_USED unchanged for vehicle-usage processing.", + "Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged for vehicle-usage processing.", "JAVA", - Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto") + Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto") ); } @@ -59,6 +59,7 @@ public class EventEvidenceMixingModule implements RuntimeProcessingModule { metadata.put("vehicleUsageEventCount", mixed.vehicleUsageEvents().size()); metadata.put("supportEvidenceEventCount", mixed.supportEvidenceEvents().size()); metadata.put("suppressedEventCount", mixed.suppressedEvents().size()); + metadata.put("resolvedEventCount", mixed.resolvedEvents().size()); metadata.put("eventMixingDecisionCount", mixed.eventMixingDecisions().size()); metadata.put("eventMixingMode", eventMixingMode(context)); return new RuntimeProcessingModuleResult( diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java index 6407fe0..0652df5 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java @@ -117,9 +117,9 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing descriptors.add(new RuntimeProcessingModuleDescriptorDto( DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING, "Event evidence mixing", - "Applies source-aware runtime evidence rules before intervalization. Initially collapses same-event-key CARD_ACTIVITY/VU_ACTIVITY duplicates and leaves CARD_VEHICLES_USED unchanged.", + "Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged.", "JAVA", - Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto") + Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto") )); } descriptors.addAll(List.of(