Refine runtime event aggregation

This commit is contained in:
trifonovt 2026-06-15 12:53:54 +02:00
parent 729e6fb261
commit dd5c32f44f
3 changed files with 177 additions and 204 deletions

View File

@ -1,109 +1,16 @@
# Patch: Vehicle Usage Interval Reconciliation
# Card-place aggregation regression fix
This patch extends the already introduced runtime event-mixing architecture with an interval-level reconciliation step for tachograph vehicle-usage evidence.
This patch changes `RuntimeEventAggregationService` so that it removes only repeated reads of the same physical source record.
## New module
## Root cause
Added runtime module:
The parity implementation performed a second reduction by canonical semantic event key. Distinct same-source support records could therefore be collapsed merely because their normalized event content was equal. File-session card-place identifiers such as `CARDPLACE-1` may also repeat in separate XML `Places` sections, so generated identifiers alone are not a safe physical-record key.
```text
vehicle-usage-reconciliation
```
## Fix
It runs after:
```text
event-to-vehicle-usage-intervals
```
and before:
```text
vehicle-usage-merge
```
## Main behavior
The module intentionally does not mix `CARD_VEHICLES_USED` and `IW_CYCLE` at event level. Instead, it reconciles the completed vehicle-usage intervals.
Processing phases:
1. Split raw vehicle-usage intervals by source type:
- `CARD_VEHICLES_USED`
- `IW_CYCLE`
- `OTHER`
2. Normalize `CARD_VEHICLES_USED` technical midnight splits.
3. Reconcile normalized `CARD_VEHICLES_USED` intervals with `IW_CYCLE` intervals.
4. Produce effective vehicle-usage intervals for downstream processing.
## CVU technical midnight split
The technical midnight split is handled only for `CARD_VEHICLES_USED` / CVU intervals, not for `IW_CYCLE`.
Pattern:
```text
CARD_VEHICLES_USED interval A ends at 23:59:59
CARD_VEHICLES_USED interval B starts at 00:00:00
same driver
same registration / compatible vehicle
max gap: 1 second
```
Result:
```text
A + B => one normalized CARD_VEHICLES_USED interval
```
## CVU vs IW reconciliation
After CVU normalization:
```text
normalized CARD_VEHICLES_USED interval
vs
IW_CYCLE interval
```
Rule:
```text
IW_CYCLE is primary for effective vehicle-usage identity.
CARD_VEHICLES_USED is fallback or corroborating evidence.
```
Matching currently supports exact or compatible start/end boundaries with a 60-second tolerance.
## New classes
```text
src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleUsageReconciliationModule.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalDescriptor.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalDescriptorFactory.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalRole.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalSourceType.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationDecisionDto.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationResult.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationService.java
src/test/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationServiceTest.java
```
## Modified existing files
```text
src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java
src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageMergeModule.java
src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java
```
## Notes
`vehicle-usage-merge` now consumes the effective intervals from `vehicle-usage-reconciliation` when that module has run. If the reconciliation module is omitted from a custom module list, `vehicle-usage-merge` falls back to raw `event-to-vehicle-usage-intervals` output.
Tests were added for:
- CVU technical midnight split coalescing
- CVU + IW reconciliation with IW as primary
- CVU fallback when IW is missing
- IW primary when CVU is missing
- Prefer `raw.rawRecordPath` as the physical identity for file-session records.
- Fall back to `raw.sourceRowId`, `raw.supportEventId`, `externalSourceEventId`, event UUID, then canonical key.
- Include domain, type, semantic lifecycle and timestamp so START/END points of one interval remain separate.
- Remove canonical semantic reduction from aggregation.
- Preserve all card/VU evidence for downstream mixing and all CVU/IW evidence for interval reconciliation.
- Add regression tests for repeated `CARDPLACE-*` identifiers across XML sections and semantically equal but physically distinct place records.

View File

@ -4,44 +4,31 @@ import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.SourcePackageRefDto;
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeEventSourceProfile;
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeTachographEventSemantics;
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import java.util.ArrayList;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Aggregates runtime events before plan-specific semantic processing.
*
* <p>The aggregation removes repeated reads of the same source record and duplicate serialized
* representations of the same extraction observation. Tachograph evidence pairs that must be
* resolved later remain distinct: CARD/VU activity and support events are preserved for
* {@code RuntimeEventMixingService}, while CARD_VEHICLES_USED/IW_CYCLE remain distinct for
* interval reconciliation.</p>
* <p>This service removes only repeated reads of the same physical source record. It deliberately
* does not collapse merely semantically equal events. Card/VU observations must remain available
* for {@code RuntimeEventMixingService}, while CARD_VEHICLES_USED/IW_CYCLE observations must remain
* available for interval-level reconciliation.</p>
*
* <p>A source record is identified from the strongest available provenance value. File-session
* support events use {@code rawRecordPath}; database events normally use {@code sourceRowId}.
* Generated external event IDs are only a fallback because they are presentation identifiers and
* are not guaranteed to be unique for every extractor section.</p>
*/
@Service
public class RuntimeEventAggregationService {
private static final Set<String> DOWNSTREAM_RESOLVED_TACHOGRAPH_EXTRACTION_CODES = Set.of(
"CARD_ACTIVITY",
"VU_ACTIVITY",
"CARD_VEHICLES_USED",
"IW_CYCLE",
"CARD_POSITION",
"VU_POSITION",
"CARD_PLACE",
"VU_PLACE",
"CARD_BORDER_CROSSING",
"VU_BORDER_CROSSING",
"CARD_LOAD_UNLOAD",
"VU_LOAD_UNLOAD",
"CARD_SPECIFIC_CONDITION",
"VU_SPECIFIC_CONDITION"
);
private final RuntimeTachographEventSemantics tachographSemantics;
@Autowired
@ -62,18 +49,9 @@ public class RuntimeEventAggregationService {
appendExactSourceRecords(exactSourceRecords, eventGroup);
}
}
LinkedHashMap<String, List<EventHubEventDto>> canonicalGroups = new LinkedHashMap<>();
for (EventHubEventDto event : exactSourceRecords.values()) {
canonicalGroups.computeIfAbsent(
canonicalAggregationKey(event),
ignored -> new ArrayList<>()
).add(event);
}
List<EventHubEventDto> aggregated = new ArrayList<>();
canonicalGroups.values().forEach(group -> aggregated.addAll(reduceCanonicalGroup(group)));
return aggregated.stream().sorted(eventComparator()).toList();
return exactSourceRecords.values().stream()
.sorted(eventComparator())
.toList();
}
/** Compatibility alias for the first implementation name. */
@ -86,15 +64,11 @@ public class RuntimeEventAggregationService {
if (event == null) {
return "NULL_EVENT";
}
RuntimeEventSourceProfile profile = tachographSemantics.sourceProfile(event);
SourcePackageRefDto sourcePackage = event.sourcePackageRef();
String sourceIdentity = firstNonBlank(
event.externalSourceEventId(),
event.eventId() == null ? null : event.eventId().toString()
);
if (sourceIdentity == null) {
sourceIdentity = RuntimeEventIdentityResolver.canonicalEventKey(event);
}
JsonNode raw = RuntimeEntityReferenceResolver.rawPayload(event);
return String.join("|",
"SOURCE_RECORD",
nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
@ -104,43 +78,39 @@ public class RuntimeEventAggregationService {
nullToEmpty(sourcePackage == null ? null : sourcePackage.packageKind()),
nullToEmpty(sourcePackage == null ? null : sourcePackage.sourcePackageId()),
nullToEmpty(sourcePackage == null ? null : sourcePackage.sourceEntityId()),
sourceIdentity
sourceRecordIdentity(event, raw),
nullToEmpty(event.eventDomain() == null ? null : event.eventDomain().name()),
nullToEmpty(event.eventType() == null ? null : event.eventType().name()),
nullToEmpty(tachographSemantics.semanticLifecycle(event)),
event.occurredAt() == null ? "" : event.occurredAt().toInstant().toString()
);
}
private String canonicalAggregationKey(EventHubEventDto event) {
String canonicalKey = RuntimeEventIdentityResolver.canonicalEventKey(event);
String semanticLifecycle = tachographSemantics.semanticLifecycle(event);
if (event == null || event.lifecycle() == null || semanticLifecycle == null
|| event.lifecycle().name().equals(semanticLifecycle)) {
return canonicalKey;
}
String[] parts = canonicalKey.split("\\|", -1);
if (parts.length > 4 && "EVENT".equals(parts[0])) {
parts[4] = semanticLifecycle;
return String.join("|", parts);
}
return canonicalKey;
}
private List<EventHubEventDto> reduceCanonicalGroup(List<EventHubEventDto> group) {
if (group == null || group.size() <= 1) {
return group == null ? List.of() : List.copyOf(group);
private String sourceRecordIdentity(EventHubEventDto event, JsonNode raw) {
String rawRecordPath = text(raw, "rawRecordPath");
if (rawRecordPath != null) {
return "RAW_PATH:" + rawRecordPath;
}
LinkedHashMap<String, EventHubEventDto> tachographEvidenceByExtraction = new LinkedHashMap<>();
for (EventHubEventDto event : group) {
RuntimeEventSourceProfile profile = tachographSemantics.sourceProfile(event);
if (profile.isTachographRuntimeSource()
&& DOWNSTREAM_RESOLVED_TACHOGRAPH_EXTRACTION_CODES.contains(profile.extractionCode())) {
tachographEvidenceByExtraction.putIfAbsent(profile.extractionCode(), event);
}
String sourceRowId = text(raw, "sourceRowId");
if (sourceRowId != null) {
return "SOURCE_ROW:" + sourceRowId;
}
if (tachographEvidenceByExtraction.size() > 1) {
return List.copyOf(tachographEvidenceByExtraction.values());
String supportEventId = text(raw, "supportEventId");
if (supportEventId != null) {
return "SUPPORT_EVENT:" + supportEventId;
}
return List.of(group.getFirst());
if (event.externalSourceEventId() != null && !event.externalSourceEventId().isBlank()) {
return "EXTERNAL:" + event.externalSourceEventId().trim();
}
if (event.eventId() != null) {
return "EVENT_ID:" + event.eventId();
}
return "CANONICAL:" + RuntimeEventIdentityResolver.canonicalEventKey(event);
}
private void appendExactSourceRecords(
@ -161,21 +131,18 @@ public class RuntimeEventAggregationService {
return Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name())
.thenComparing(event -> event.eventType() == null ? "" : event.eventType().name())
.thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name())
.thenComparing(event -> tachographSemantics.semanticLifecycle(event), Comparator.nullsLast(String::compareTo))
.thenComparing(event -> tachographSemantics.sourceProfile(event).extractionCode(), Comparator.nullsLast(String::compareTo))
.thenComparing(this::stableSourceIdentityForSort)
.thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo));
}
private String firstNonBlank(String... values) {
if (values == null) {
return null;
}
for (String value : values) {
if (value != null && !value.isBlank()) {
return value.trim();
}
}
return null;
private String stableSourceIdentityForSort(EventHubEventDto event) {
return sourceRecordIdentity(event, RuntimeEntityReferenceResolver.rawPayload(event));
}
private String text(JsonNode node, String field) {
return RuntimeEntityReferenceResolver.text(node, field);
}
private String nullToEmpty(Object value) {

View File

@ -32,7 +32,10 @@ class RuntimeEventAggregationServiceTest {
"TACHOGRAPH:CARD_POSITION:10",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT
EventLifecycle.SNAPSHOT,
"10",
null,
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
EventHubEventDto vuPosition = event(
"VEHICLE_UNIT",
@ -40,7 +43,10 @@ class RuntimeEventAggregationServiceTest {
"TACHOGRAPH:VU_POSITION:20",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT
EventLifecycle.SNAPSHOT,
"20",
null,
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
EventHubEventDto cvu = event(
"DRIVER_CARD",
@ -48,7 +54,10 @@ class RuntimeEventAggregationServiceTest {
"TACHOGRAPH:CARD_VEHICLES_USED:30:INSERT",
EventDomain.DRIVER_CARD,
EventType.CARD_INSERTED,
EventLifecycle.INSERT
EventLifecycle.INSERT,
"30",
null,
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
EventHubEventDto iwCycle = event(
"VEHICLE_UNIT",
@ -56,7 +65,10 @@ class RuntimeEventAggregationServiceTest {
"TACHOGRAPH:IW_CYCLE:40:INSERT",
EventDomain.DRIVER_CARD,
EventType.CARD_INSERTED,
EventLifecycle.INSERT
EventLifecycle.INSERT,
"40",
null,
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
List<EventHubEventDto> aggregated = service.aggregateRuntimeEvents(
@ -74,16 +86,18 @@ class RuntimeEventAggregationServiceTest {
);
}
@Test
void collapsesDuplicateSerializedRepresentationsOfSameExtractionObservation() {
void deduplicatesRepresentationsSharingTheSameSourceRowIdentity() {
EventHubEventDto first = event(
"DRIVER_CARD",
"CARD_POSITION",
"TACHOGRAPH:CARD_POSITION:10",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT
EventLifecycle.SNAPSHOT,
"10",
null,
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
EventHubEventDto serializedCopy = event(
"DRIVER_CARD",
@ -91,37 +105,107 @@ class RuntimeEventAggregationServiceTest {
"TACHOGRAPH:CARD_POSITION:COPY-10",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT
EventLifecycle.SNAPSHOT,
"10",
null,
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
assertThat(service.aggregateRuntimeEvents(List.of(first, serializedCopy)))
.containsExactly(first);
}
@Test
void collapsesPlaceStartAndBeginRepresentationsForSameExtractionSource() {
void collapsesPlaceStartAndBeginRepresentationsForTheSamePhysicalRecord() {
String rawRecordPath = "/DriverCard/Places[1]/cardPlaceDailyWorkPeriod/placeRecords[1]";
EventHubEventDto dbStyle = event(
"DRIVER_CARD",
"CARD_PLACE",
"TACHOGRAPH:CARD_PLACE:10",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.START
EventLifecycle.START,
"CARDPLACE-1",
rawRecordPath,
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
EventHubEventDto fileStyle = event(
"DRIVER_CARD",
"CARD_PLACE",
"TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:place-10:BEGIN:2026-04-01T08:00:00Z",
"TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:CARDPLACE-1:BEGIN:2026-04-01T08:00:00Z",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN
EventLifecycle.BEGIN,
"CARDPLACE-1",
rawRecordPath,
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
assertThat(service.aggregateRuntimeEvents(List.of(dbStyle, fileStyle)))
.containsExactly(dbStyle);
}
@Test
void keepsDistinctCardPlaceRecordsWhenGeneratedIdsRepeatAcrossSections() {
String repeatedExternalId = "TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:CARDPLACE-1:BEGIN:2026-04-01T08:00:00Z";
EventHubEventDto firstSection = event(
"DRIVER_CARD",
"CARD_PLACE",
repeatedExternalId,
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN,
"CARDPLACE-1",
"/DriverCard/Places[1]/cardPlaceDailyWorkPeriod/placeRecords[1]",
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
EventHubEventDto secondSection = event(
"DRIVER_CARD",
"CARD_PLACE",
repeatedExternalId,
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN,
"CARDPLACE-1",
"/DriverCard/Places[2]/cardPlaceDailyWorkPeriod/placeRecords[1]",
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
assertThat(service.aggregateRuntimeEvents(List.of(firstSection, secondSection)))
.containsExactly(firstSection, secondSection);
assertThat(service.exactSourceRecordKey(firstSection))
.isNotEqualTo(service.exactSourceRecordKey(secondSection));
}
@Test
void keepsSemanticallyEqualRecordsWhenTheirPhysicalSourceIdentityDiffers() {
EventHubEventDto first = event(
"DRIVER_CARD",
"CARD_PLACE",
"TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:CARDPLACE-1:BEGIN:2026-04-01T08:00:00Z",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN,
"CARDPLACE-1",
"/DriverCard/Places[1]/cardPlaceDailyWorkPeriod/placeRecords[1]",
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
EventHubEventDto second = event(
"DRIVER_CARD",
"CARD_PLACE",
"TACHOGRAPH_FILE_SESSION:session-1:SUPPORT:CARDPLACE-2:BEGIN:2026-04-01T08:00:00Z",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN,
"CARDPLACE-2",
"/DriverCard/Places[1]/cardPlaceDailyWorkPeriod/placeRecords[2]",
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
assertThat(service.aggregateRuntimeEvents(List.of(first, second)))
.containsExactly(first, second);
}
@Test
void keepsDifferentExtractionSourcesEvenWhenSemanticEventDataIsEqual() {
EventHubEventDto card = event(
@ -130,7 +214,10 @@ class RuntimeEventAggregationServiceTest {
"TACHOGRAPH:CARD_ACTIVITY:10:START",
EventDomain.DRIVER_ACTIVITY,
EventType.DRIVE,
EventLifecycle.START
EventLifecycle.START,
"10",
null,
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
EventHubEventDto vu = event(
"VEHICLE_UNIT",
@ -138,7 +225,10 @@ class RuntimeEventAggregationServiceTest {
"TACHOGRAPH:VU_ACTIVITY:20:START",
EventDomain.DRIVER_ACTIVITY,
EventType.DRIVE,
EventLifecycle.START
EventLifecycle.START,
"20",
null,
OffsetDateTime.parse("2026-04-01T08:00:00Z")
);
assertThat(service.aggregateRuntimeEvents(List.of(card, vu)))
@ -153,7 +243,10 @@ class RuntimeEventAggregationServiceTest {
String externalSourceEventId,
EventDomain domain,
EventType eventType,
EventLifecycle lifecycle
EventLifecycle lifecycle,
String sourceRowId,
String rawRecordPath,
OffsetDateTime occurredAt
) {
EventSourceDto source = new EventSourceDto(
"TACHOGRAPH",
@ -177,6 +270,12 @@ class RuntimeEventAggregationServiceTest {
raw.put("extractionCode", extractionCode);
raw.put("driverKey", "12:123");
raw.put("registrationKey", "12:REG-1");
if (sourceRowId != null) {
raw.put("sourceRowId", sourceRowId);
}
if (rawRecordPath != null) {
raw.put("rawRecordPath", rawRecordPath);
}
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
return new EventHubEventDto(
@ -184,7 +283,7 @@ class RuntimeEventAggregationServiceTest {
externalSourceEventId,
new DriverRefDto("driver-1", new DriverCardRefDto("12", "123")),
new VehicleRefDto("vehicle-1", "VIN-1", new VehicleRegistrationRefDto("12", "REG-1")),
OffsetDateTime.parse("2026-04-01T08:00:00Z"),
occurredAt,
null,
null,
domain,