From dd5c32f44f042247cb1762aace6dfae7b2e5a0c9 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 15 Jun 2026 12:53:54 +0200 Subject: [PATCH] Refine runtime event aggregation --- README_PATCH.md | 115 ++------------- .../RuntimeEventAggregationService.java | 133 +++++++----------- .../RuntimeEventAggregationServiceTest.java | 133 +++++++++++++++--- 3 files changed, 177 insertions(+), 204 deletions(-) diff --git a/README_PATCH.md b/README_PATCH.md index a37b85a..26ed6d5 100644 --- a/README_PATCH.md +++ b/README_PATCH.md @@ -1,109 +1,16 @@ -# Patch: Vehicle Usage Interval Reconciliation +# Card-place aggregation regression fix -This patch extends the already introduced runtime event-mixing architecture with an interval-level reconciliation step for tachograph vehicle-usage evidence. +This patch changes `RuntimeEventAggregationService` so that it removes only repeated reads of the same physical source record. -## New module +## Root cause -Added runtime module: +The parity implementation performed a second reduction by canonical semantic event key. Distinct same-source support records could therefore be collapsed merely because their normalized event content was equal. File-session card-place identifiers such as `CARDPLACE-1` may also repeat in separate XML `Places` sections, so generated identifiers alone are not a safe physical-record key. -```text -vehicle-usage-reconciliation -``` +## Fix -It runs after: - -```text -event-to-vehicle-usage-intervals -``` - -and before: - -```text -vehicle-usage-merge -``` - -## Main behavior - -The module intentionally does not mix `CARD_VEHICLES_USED` and `IW_CYCLE` at event level. Instead, it reconciles the completed vehicle-usage intervals. - -Processing phases: - -1. Split raw vehicle-usage intervals by source type: - - `CARD_VEHICLES_USED` - - `IW_CYCLE` - - `OTHER` -2. Normalize `CARD_VEHICLES_USED` technical midnight splits. -3. 
Reconcile normalized `CARD_VEHICLES_USED` intervals with `IW_CYCLE` intervals. -4. Produce effective vehicle-usage intervals for downstream processing. - -## CVU technical midnight split - -The technical midnight split is handled only for `CARD_VEHICLES_USED` / CVU intervals, not for `IW_CYCLE`. - -Pattern: - -```text -CARD_VEHICLES_USED interval A ends at 23:59:59 -CARD_VEHICLES_USED interval B starts at 00:00:00 -same driver -same registration / compatible vehicle -max gap: 1 second -``` - -Result: - -```text -A + B => one normalized CARD_VEHICLES_USED interval -``` - -## CVU vs IW reconciliation - -After CVU normalization: - -```text -normalized CARD_VEHICLES_USED interval -vs -IW_CYCLE interval -``` - -Rule: - -```text -IW_CYCLE is primary for effective vehicle-usage identity. -CARD_VEHICLES_USED is fallback or corroborating evidence. -``` - -Matching currently supports exact or compatible start/end boundaries with a 60-second tolerance. - -## New classes - -```text -src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleUsageReconciliationModule.java -src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalDescriptor.java -src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalDescriptorFactory.java -src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalRole.java -src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalSourceType.java -src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationDecisionDto.java -src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationResult.java -src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationService.java 
-src/test/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationServiceTest.java -``` - -## Modified existing files - -```text -src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java -src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageMergeModule.java -src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java -``` - -## Notes - -`vehicle-usage-merge` now consumes the effective intervals from `vehicle-usage-reconciliation` when that module has run. If the reconciliation module is omitted from a custom module list, `vehicle-usage-merge` falls back to raw `event-to-vehicle-usage-intervals` output. - -Tests were added for: - -- CVU technical midnight split coalescing -- CVU + IW reconciliation with IW as primary -- CVU fallback when IW is missing -- IW primary when CVU is missing +- Prefer `raw.rawRecordPath` as the physical identity for file-session records. +- Fall back to `raw.sourceRowId`, `raw.supportEventId`, `externalSourceEventId`, event UUID, then canonical key. +- Include domain, type, semantic lifecycle and timestamp so START/END points of one interval remain separate. +- Remove canonical semantic reduction from aggregation. +- Preserve all card/VU evidence for downstream mixing and all CVU/IW evidence for interval reconciliation. +- Add regression tests for repeated `CARDPLACE-*` identifiers across XML sections and semantically equal but physically distinct place records. 
diff --git a/src/main/java/at/procon/eventhub/processing/service/RuntimeEventAggregationService.java b/src/main/java/at/procon/eventhub/processing/service/RuntimeEventAggregationService.java index b3e1660..7b2dd61 100644 --- a/src/main/java/at/procon/eventhub/processing/service/RuntimeEventAggregationService.java +++ b/src/main/java/at/procon/eventhub/processing/service/RuntimeEventAggregationService.java @@ -4,44 +4,31 @@ import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.SourcePackageRefDto; import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeEventSourceProfile; import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeTachographEventSemantics; +import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver; import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver; -import java.util.ArrayList; +import com.fasterxml.jackson.databind.JsonNode; import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; -import java.util.Set; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * Aggregates runtime events before plan-specific semantic processing. * - *

The aggregation removes repeated reads of the same source record and duplicate serialized - * representations of the same extraction observation. Tachograph evidence pairs that must be - * resolved later remain distinct: CARD/VU activity and support events are preserved for - * {@code RuntimeEventMixingService}, while CARD_VEHICLES_USED/IW_CYCLE remain distinct for - * interval reconciliation.

+ *

This service removes only repeated reads of the same physical source record. It deliberately + * does not collapse merely semantically equal events. Card/VU observations must remain available + * for {@code RuntimeEventMixingService}, while CARD_VEHICLES_USED/IW_CYCLE observations must remain + * available for interval-level reconciliation.

+ * + *

A source record is identified from the strongest available provenance value. File-session + * support events use {@code rawRecordPath}; database events normally use {@code sourceRowId}. + * Generated external event IDs are only a fallback because they are presentation identifiers and + * are not guaranteed to be unique across extractor sections.

*/ @Service public class RuntimeEventAggregationService { - private static final Set DOWNSTREAM_RESOLVED_TACHOGRAPH_EXTRACTION_CODES = Set.of( - "CARD_ACTIVITY", - "VU_ACTIVITY", - "CARD_VEHICLES_USED", - "IW_CYCLE", - "CARD_POSITION", - "VU_POSITION", - "CARD_PLACE", - "VU_PLACE", - "CARD_BORDER_CROSSING", - "VU_BORDER_CROSSING", - "CARD_LOAD_UNLOAD", - "VU_LOAD_UNLOAD", - "CARD_SPECIFIC_CONDITION", - "VU_SPECIFIC_CONDITION" - ); - private final RuntimeTachographEventSemantics tachographSemantics; @Autowired @@ -62,18 +49,9 @@ public class RuntimeEventAggregationService { appendExactSourceRecords(exactSourceRecords, eventGroup); } } - - LinkedHashMap> canonicalGroups = new LinkedHashMap<>(); - for (EventHubEventDto event : exactSourceRecords.values()) { - canonicalGroups.computeIfAbsent( - canonicalAggregationKey(event), - ignored -> new ArrayList<>() - ).add(event); - } - - List aggregated = new ArrayList<>(); - canonicalGroups.values().forEach(group -> aggregated.addAll(reduceCanonicalGroup(group))); - return aggregated.stream().sorted(eventComparator()).toList(); + return exactSourceRecords.values().stream() + .sorted(eventComparator()) + .toList(); } /** Compatibility alias for the first implementation name. */ @@ -86,15 +64,11 @@ public class RuntimeEventAggregationService { if (event == null) { return "NULL_EVENT"; } + RuntimeEventSourceProfile profile = tachographSemantics.sourceProfile(event); SourcePackageRefDto sourcePackage = event.sourcePackageRef(); - String sourceIdentity = firstNonBlank( - event.externalSourceEventId(), - event.eventId() == null ? null : event.eventId().toString() - ); - if (sourceIdentity == null) { - sourceIdentity = RuntimeEventIdentityResolver.canonicalEventKey(event); - } + JsonNode raw = RuntimeEntityReferenceResolver.rawPayload(event); + return String.join("|", "SOURCE_RECORD", nullToEmpty(event.packageInfo() == null ? 
null : event.packageInfo().tenantKey()), @@ -104,43 +78,39 @@ public class RuntimeEventAggregationService { nullToEmpty(sourcePackage == null ? null : sourcePackage.packageKind()), nullToEmpty(sourcePackage == null ? null : sourcePackage.sourcePackageId()), nullToEmpty(sourcePackage == null ? null : sourcePackage.sourceEntityId()), - sourceIdentity + sourceRecordIdentity(event, raw), + nullToEmpty(event.eventDomain() == null ? null : event.eventDomain().name()), + nullToEmpty(event.eventType() == null ? null : event.eventType().name()), + nullToEmpty(tachographSemantics.semanticLifecycle(event)), + event.occurredAt() == null ? "" : event.occurredAt().toInstant().toString() ); } - private String canonicalAggregationKey(EventHubEventDto event) { - String canonicalKey = RuntimeEventIdentityResolver.canonicalEventKey(event); - String semanticLifecycle = tachographSemantics.semanticLifecycle(event); - if (event == null || event.lifecycle() == null || semanticLifecycle == null - || event.lifecycle().name().equals(semanticLifecycle)) { - return canonicalKey; - } - String[] parts = canonicalKey.split("\\|", -1); - if (parts.length > 4 && "EVENT".equals(parts[0])) { - parts[4] = semanticLifecycle; - return String.join("|", parts); - } - return canonicalKey; - } - - private List reduceCanonicalGroup(List group) { - if (group == null || group.size() <= 1) { - return group == null ? 
List.of() : List.copyOf(group); + private String sourceRecordIdentity(EventHubEventDto event, JsonNode raw) { + String rawRecordPath = text(raw, "rawRecordPath"); + if (rawRecordPath != null) { + return "RAW_PATH:" + rawRecordPath; } - LinkedHashMap tachographEvidenceByExtraction = new LinkedHashMap<>(); - for (EventHubEventDto event : group) { - RuntimeEventSourceProfile profile = tachographSemantics.sourceProfile(event); - if (profile.isTachographRuntimeSource() - && DOWNSTREAM_RESOLVED_TACHOGRAPH_EXTRACTION_CODES.contains(profile.extractionCode())) { - tachographEvidenceByExtraction.putIfAbsent(profile.extractionCode(), event); - } + String sourceRowId = text(raw, "sourceRowId"); + if (sourceRowId != null) { + return "SOURCE_ROW:" + sourceRowId; } - if (tachographEvidenceByExtraction.size() > 1) { - return List.copyOf(tachographEvidenceByExtraction.values()); + String supportEventId = text(raw, "supportEventId"); + if (supportEventId != null) { + return "SUPPORT_EVENT:" + supportEventId; } - return List.of(group.getFirst()); + + if (event.externalSourceEventId() != null && !event.externalSourceEventId().isBlank()) { + return "EXTERNAL:" + event.externalSourceEventId().trim(); + } + + if (event.eventId() != null) { + return "EVENT_ID:" + event.eventId(); + } + + return "CANONICAL:" + RuntimeEventIdentityResolver.canonicalEventKey(event); } private void appendExactSourceRecords( @@ -161,21 +131,18 @@ public class RuntimeEventAggregationService { return Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder())) .thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name()) .thenComparing(event -> event.eventType() == null ? "" : event.eventType().name()) - .thenComparing(event -> event.lifecycle() == null ? 
"" : event.lifecycle().name()) + .thenComparing(event -> tachographSemantics.semanticLifecycle(event), Comparator.nullsLast(String::compareTo)) .thenComparing(event -> tachographSemantics.sourceProfile(event).extractionCode(), Comparator.nullsLast(String::compareTo)) + .thenComparing(this::stableSourceIdentityForSort) .thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo)); } - private String firstNonBlank(String... values) { - if (values == null) { - return null; - } - for (String value : values) { - if (value != null && !value.isBlank()) { - return value.trim(); - } - } - return null; + private String stableSourceIdentityForSort(EventHubEventDto event) { + return sourceRecordIdentity(event, RuntimeEntityReferenceResolver.rawPayload(event)); + } + + private String text(JsonNode node, String field) { + return RuntimeEntityReferenceResolver.text(node, field); } private String nullToEmpty(Object value) { diff --git a/src/test/java/at/procon/eventhub/processing/service/RuntimeEventAggregationServiceTest.java b/src/test/java/at/procon/eventhub/processing/service/RuntimeEventAggregationServiceTest.java index 96f1847..9dfc16a 100644 --- a/src/test/java/at/procon/eventhub/processing/service/RuntimeEventAggregationServiceTest.java +++ b/src/test/java/at/procon/eventhub/processing/service/RuntimeEventAggregationServiceTest.java @@ -32,7 +32,10 @@ class RuntimeEventAggregationServiceTest { "TACHOGRAPH:CARD_POSITION:10", EventDomain.POSITION, EventType.POSITION_RECORDED, - EventLifecycle.SNAPSHOT + EventLifecycle.SNAPSHOT, + "10", + null, + OffsetDateTime.parse("2026-04-01T08:00:00Z") ); EventHubEventDto vuPosition = event( "VEHICLE_UNIT", @@ -40,7 +43,10 @@ class RuntimeEventAggregationServiceTest { "TACHOGRAPH:VU_POSITION:20", EventDomain.POSITION, EventType.POSITION_RECORDED, - EventLifecycle.SNAPSHOT + EventLifecycle.SNAPSHOT, + "20", + null, + OffsetDateTime.parse("2026-04-01T08:00:00Z") ); EventHubEventDto cvu = event( 
"DRIVER_CARD", @@ -48,7 +54,10 @@ class RuntimeEventAggregationServiceTest { "TACHOGRAPH:CARD_VEHICLES_USED:30:INSERT", EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, - EventLifecycle.INSERT + EventLifecycle.INSERT, + "30", + null, + OffsetDateTime.parse("2026-04-01T08:00:00Z") ); EventHubEventDto iwCycle = event( "VEHICLE_UNIT", @@ -56,7 +65,10 @@ class RuntimeEventAggregationServiceTest { "TACHOGRAPH:IW_CYCLE:40:INSERT", EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, - EventLifecycle.INSERT + EventLifecycle.INSERT, + "40", + null, + OffsetDateTime.parse("2026-04-01T08:00:00Z") ); List aggregated = service.aggregateRuntimeEvents( @@ -74,16 +86,18 @@ class RuntimeEventAggregationServiceTest { ); } - @Test - void collapsesDuplicateSerializedRepresentationsOfSameExtractionObservation() { + void deduplicatesRepresentationsSharingTheSameSourceRowIdentity() { EventHubEventDto first = event( "DRIVER_CARD", "CARD_POSITION", "TACHOGRAPH:CARD_POSITION:10", EventDomain.POSITION, EventType.POSITION_RECORDED, - EventLifecycle.SNAPSHOT + EventLifecycle.SNAPSHOT, + "10", + null, + OffsetDateTime.parse("2026-04-01T08:00:00Z") ); EventHubEventDto serializedCopy = event( "DRIVER_CARD", @@ -91,37 +105,107 @@ class RuntimeEventAggregationServiceTest { "TACHOGRAPH:CARD_POSITION:COPY-10", EventDomain.POSITION, EventType.POSITION_RECORDED, - EventLifecycle.SNAPSHOT + EventLifecycle.SNAPSHOT, + "10", + null, + OffsetDateTime.parse("2026-04-01T08:00:00Z") ); assertThat(service.aggregateRuntimeEvents(List.of(first, serializedCopy))) .containsExactly(first); } - @Test - void collapsesPlaceStartAndBeginRepresentationsForSameExtractionSource() { + void collapsesPlaceStartAndBeginRepresentationsForTheSamePhysicalRecord() { + String rawRecordPath = "/DriverCard/Places[1]/cardPlaceDailyWorkPeriod/placeRecords[1]"; EventHubEventDto dbStyle = event( "DRIVER_CARD", "CARD_PLACE", "TACHOGRAPH:CARD_PLACE:10", EventDomain.PLACE, EventType.WORKING_DAY_PLACE_RECORDED, - EventLifecycle.START + 
EventLifecycle.START, + "CARDPLACE-1", + rawRecordPath, + OffsetDateTime.parse("2026-04-01T08:00:00Z") ); EventHubEventDto fileStyle = event( "DRIVER_CARD", "CARD_PLACE", - "TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:place-10:BEGIN:2026-04-01T08:00:00Z", + "TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:CARDPLACE-1:BEGIN:2026-04-01T08:00:00Z", EventDomain.PLACE, EventType.WORKING_DAY_PLACE_RECORDED, - EventLifecycle.BEGIN + EventLifecycle.BEGIN, + "CARDPLACE-1", + rawRecordPath, + OffsetDateTime.parse("2026-04-01T08:00:00Z") ); assertThat(service.aggregateRuntimeEvents(List.of(dbStyle, fileStyle))) .containsExactly(dbStyle); } + @Test + void keepsDistinctCardPlaceRecordsWhenGeneratedIdsRepeatAcrossSections() { + String repeatedExternalId = "TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:CARDPLACE-1:BEGIN:2026-04-01T08:00:00Z"; + EventHubEventDto firstSection = event( + "DRIVER_CARD", + "CARD_PLACE", + repeatedExternalId, + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.BEGIN, + "CARDPLACE-1", + "/DriverCard/Places[1]/cardPlaceDailyWorkPeriod/placeRecords[1]", + OffsetDateTime.parse("2026-04-01T08:00:00Z") + ); + EventHubEventDto secondSection = event( + "DRIVER_CARD", + "CARD_PLACE", + repeatedExternalId, + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.BEGIN, + "CARDPLACE-1", + "/DriverCard/Places[2]/cardPlaceDailyWorkPeriod/placeRecords[1]", + OffsetDateTime.parse("2026-04-01T08:00:00Z") + ); + + assertThat(service.aggregateRuntimeEvents(List.of(firstSection, secondSection))) + .containsExactly(firstSection, secondSection); + assertThat(service.exactSourceRecordKey(firstSection)) + .isNotEqualTo(service.exactSourceRecordKey(secondSection)); + } + + @Test + void keepsSemanticallyEqualRecordsWhenTheirPhysicalSourceIdentityDiffers() { + EventHubEventDto first = event( + "DRIVER_CARD", + "CARD_PLACE", + "TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:CARDPLACE-1:BEGIN:2026-04-01T08:00:00Z", + EventDomain.PLACE, + 
EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.BEGIN, + "CARDPLACE-1", + "/DriverCard/Places[1]/cardPlaceDailyWorkPeriod/placeRecords[1]", + OffsetDateTime.parse("2026-04-01T08:00:00Z") + ); + EventHubEventDto second = event( + "DRIVER_CARD", + "CARD_PLACE", + "TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:CARDPLACE-2:BEGIN:2026-04-01T08:00:00Z", + EventDomain.PLACE, + EventType.WORKING_DAY_PLACE_RECORDED, + EventLifecycle.BEGIN, + "CARDPLACE-2", + "/DriverCard/Places[1]/cardPlaceDailyWorkPeriod/placeRecords[2]", + OffsetDateTime.parse("2026-04-01T08:00:00Z") + ); + + assertThat(service.aggregateRuntimeEvents(List.of(first, second))) + .containsExactly(first, second); + } + @Test void keepsDifferentExtractionSourcesEvenWhenSemanticEventDataIsEqual() { EventHubEventDto card = event( @@ -130,7 +214,10 @@ class RuntimeEventAggregationServiceTest { "TACHOGRAPH:CARD_ACTIVITY:10:START", EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, - EventLifecycle.START + EventLifecycle.START, + "10", + null, + OffsetDateTime.parse("2026-04-01T08:00:00Z") ); EventHubEventDto vu = event( "VEHICLE_UNIT", @@ -138,7 +225,10 @@ class RuntimeEventAggregationServiceTest { "TACHOGRAPH:VU_ACTIVITY:20:START", EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, - EventLifecycle.START + EventLifecycle.START, + "20", + null, + OffsetDateTime.parse("2026-04-01T08:00:00Z") ); assertThat(service.aggregateRuntimeEvents(List.of(card, vu))) @@ -153,7 +243,10 @@ class RuntimeEventAggregationServiceTest { String externalSourceEventId, EventDomain domain, EventType eventType, - EventLifecycle lifecycle + EventLifecycle lifecycle, + String sourceRowId, + String rawRecordPath, + OffsetDateTime occurredAt ) { EventSourceDto source = new EventSourceDto( "TACHOGRAPH", @@ -177,6 +270,12 @@ class RuntimeEventAggregationServiceTest { raw.put("extractionCode", extractionCode); raw.put("driverKey", "12:123"); raw.put("registrationKey", "12:REG-1"); + if (sourceRowId != null) { + raw.put("sourceRowId", sourceRowId); + } 
+ if (rawRecordPath != null) { + raw.put("rawRecordPath", rawRecordPath); + } ObjectNode payload = JsonNodeFactory.instance.objectNode(); payload.set("raw", raw); return new EventHubEventDto( @@ -184,7 +283,7 @@ class RuntimeEventAggregationServiceTest { externalSourceEventId, new DriverRefDto("driver-1", new DriverCardRefDto("12", "123")), new VehicleRefDto("vehicle-1", "VIN-1", new VehicleRegistrationRefDto("12", "REG-1")), - OffsetDateTime.parse("2026-04-01T08:00:00Z"), + occurredAt, null, null, domain,