Add runtime response shaping and vehicle usage reconciliation

This commit is contained in:
trifonovt 2026-06-11 16:10:43 +02:00
parent 46a89ea5b5
commit 829dc2e06a
17 changed files with 1278 additions and 55 deletions

View File

@ -1,45 +1,109 @@
# EventHub runtime event-mixing refactor
# Patch: Vehicle Usage Interval Reconciliation
This patch refactors the previous targeted card/VU duplicate handling into a first-class runtime event-mixing subsystem.
This patch extends the already introduced runtime event-mixing architecture with an interval-level reconciliation step for tachograph vehicle-usage evidence.
## New architecture components
## New module
- `RuntimeEventMixingModule`
- `RuntimeEventMixingService`
- `RuntimeEventDescriptor`
- `RuntimeEventDescriptorFactory`
- `RuntimeEventSourceProfile`
- `RuntimeEventMixingRule`
- `RuntimeEventMixingRuleRegistry`
- `RuntimeEventMixingDecisionDto`
- `RuntimeMixedEventBundle`
- `RuntimeResolvedEvent`
- `RuntimeResolvedEventRole`
- `RuntimeEventMixingChannel`
Added runtime module:
## Current configured rules
```text
vehicle-usage-reconciliation
```
The rule registry currently applies these tachograph same-source rules:
It runs after:
1. `tachograph.activity.card-vu.same-event-key`
2. `tachograph.activity.card-vu.compatible-activity-key`
3. `tachograph.support.card-vu.same-event-key`
4. `tachograph.support.card-vu.compatible-support-key`
```text
event-to-vehicle-usage-intervals
```
The activity rules collapse duplicate `CARD_ACTIVITY`/`VU_ACTIVITY` points before activity intervalization.
and before:
The support rules collapse duplicate card/VU support evidence for:
```text
vehicle-usage-merge
```
- `CARD_POSITION` / `VU_POSITION`
- `CARD_PLACE` / `VU_PLACE`
- `CARD_BORDER_CROSSING` / `VU_BORDER_CROSSING`
## Main behavior
The card-side event remains the primary event. The VU-side event is suppressed from the processing channel but remains visible through `suppressedEvents`, `resolvedEvents`, and `eventMixingDecisions`.
The module intentionally does not mix `CARD_VEHICLES_USED` and `IW_CYCLE` at event level. Instead, it reconciles the completed vehicle-usage intervals.
## Still intentionally unchanged
Processing phases:
`CARD_VEHICLES_USED` and `IW_CYCLE` are still not mixed. They remain fully accepted in `vehicleUsageEvents` because they need a separate vehicle-usage rule later.
1. Split raw vehicle-usage intervals by source type:
- `CARD_VEHICLES_USED`
- `IW_CYCLE`
- `OTHER`
2. Normalize `CARD_VEHICLES_USED` technical midnight splits.
3. Reconcile normalized `CARD_VEHICLES_USED` intervals with `IW_CYCLE` intervals.
4. Produce effective vehicle-usage intervals for downstream processing.
## TACHOGRAPH_FILE_SESSION support
## CVU technical midnight split
The descriptor factory recognizes `TACHOGRAPH_FILE_SESSION` and `COMPOSITE_TACHOGRAPH_FILE_SESSION` events and derives card/VU extraction codes from `sourceKind` and event domain when no explicit `extractionCode` is present.
The technical midnight split is handled only for `CARD_VEHICLES_USED` / CVU intervals, not for `IW_CYCLE`.
Pattern:
```text
CARD_VEHICLES_USED interval A ends at 23:59:59
CARD_VEHICLES_USED interval B starts at 00:00:00
same driver
same registration / compatible vehicle
max gap: 1 second
```
Result:
```text
A + B => one normalized CARD_VEHICLES_USED interval
```
## CVU vs IW reconciliation
After CVU normalization:
```text
normalized CARD_VEHICLES_USED interval
vs
IW_CYCLE interval
```
Rule:
```text
IW_CYCLE is primary for effective vehicle-usage identity.
CARD_VEHICLES_USED is fallback or corroborating evidence.
```
Matching currently supports exact or compatible start/end boundaries with a 60-second tolerance.
## New classes
```text
src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleUsageReconciliationModule.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalDescriptor.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalDescriptorFactory.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalRole.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageIntervalSourceType.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationDecisionDto.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationResult.java
src/main/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationService.java
src/test/java/at/procon/eventhub/processing/eventprocessing/vehicleusage/RuntimeVehicleUsageReconciliationServiceTest.java
```
## Modified existing files
```text
src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java
src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageMergeModule.java
src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java
```
## Notes
`vehicle-usage-merge` now consumes the effective intervals from `vehicle-usage-reconciliation` when that module has run. If the reconciliation module is omitted from a custom module list, `vehicle-usage-merge` falls back to raw `event-to-vehicle-usage-intervals` output.
Tests were added for:
- CVU technical midnight split coalescing
- CVU + IW reconciliation with IW as primary
- CVU fallback when IW is missing
- IW primary when CVU is missing

View File

@ -88,7 +88,11 @@ The important design point is that runtime processing is not tachograph-specific
"parameters": {
"significantDrivingMinutes": 3,
"minimumRestPeriodMinutes": 720,
"includePartitionDebug": true
"includePartitionDebug": true,
"includePartitionMetadata": true,
"includePartitionModuleResults": true,
"includeExecutionModuleResults": true,
"includeSupportEvidenceNormalization": true
}
}
```
@ -204,3 +208,58 @@ The first source-neutral EPL modules are:
| `event-to-vehicle-usage-intervals` | `EPL` | `esper/runtime-driver-vehicle-usage-intervals.epl` | `driverVehicleUsageIntervals` |
These modules operate on canonical EventHub runtime events, not on tachograph-specific source rows. They are currently used as first-class phase modules in `driver-working-time-v1`; the final `driving-derived-projections` module remains a compatibility adapter over the validated working-time projection service until the remaining projection stages are split into direct EPL modules.
## Response shaping
`driver-working-time-v1` supports a small set of optional request parameters to trim diagnostic payloads from `/api/eventhub/runtime-processing/executions` and the legacy `/api/eventhub/runtime-processing/event-processing` adapter.
All of these default to `true` to preserve current behavior:
```text
includeExecutionModuleResults
includePartitionMetadata
includePartitionModuleResults
includeSupportEvidenceNormalization
includePartitionDebug
```
Meaning:
- `includeExecutionModuleResults`: include top-level `moduleResults` on `/executions`
- `includePartitionMetadata`: include `partitionResults[*].metadata`
- `includePartitionModuleResults`: include `partitionResults[*].moduleResults`
- `includeSupportEvidenceNormalization`: include `supportEvidenceNormalization` in each partition result payload
- `includePartitionDebug`: include `partitionDebug` in each partition result payload
Example slim response request:
```json
{
"processingPlanKey": "driver-working-time-v1",
"sourceSelection": {
"tenantKey": "default",
"driverKey": "12:12345678901234",
"occurredFrom": "2026-05-01T00:00:00Z",
"occurredTo": "2026-05-31T23:59:59Z",
"sourceInputs": [
{
"sourceFamily": "TACHOGRAPH_FILE_SESSION",
"eventBackend": "SOURCE_DB",
"sessionIds": [
"11111111-1111-1111-1111-111111111111"
]
}
]
},
"partitioning": {
"strategy": "DRIVER"
},
"parameters": {
"includeExecutionModuleResults": false,
"includePartitionMetadata": false,
"includePartitionModuleResults": false,
"includeSupportEvidenceNormalization": false,
"includePartitionDebug": false
}
}
```

View File

@ -66,6 +66,12 @@ public record UnifiedRuntimeDriverWorkingTimeScopeResultDto(
Object debug = entry.getValue().metadata().get("partitionDebug");
if (debug instanceof RuntimeDriverPartitionDebugDto partitionDebug) {
debugByDriver.put(entry.getKey(), partitionDebug);
continue;
}
Object value = entry.getValue().result();
if (value instanceof UnifiedRuntimeDerivedProjectionResultDto projectionResult
&& projectionResult.partitionDebug() != null) {
debugByDriver.put(entry.getKey(), projectionResult.partitionDebug());
}
}
return debugByDriver;

View File

@ -2,6 +2,7 @@ package at.procon.eventhub.processing.eventprocessing.module;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto;
import at.procon.eventhub.processing.eventprocessing.vehicleusage.RuntimeVehicleUsageReconciliationResult;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
@ -26,7 +27,7 @@ public class DriverVehicleUsageMergeModule implements RuntimeProcessingModule {
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Vehicle usage merge",
"Merges adjacent or continuous same-driver/same-vehicle usage intervals, including 23:59:59 to 00:00:00 continuations.",
"Merges adjacent or continuous effective same-driver/same-vehicle usage intervals after source reconciliation.",
"JAVA",
Set.of("DriverWorkingTimeVehicleUsageInterval")
);
@ -34,9 +35,7 @@ public class DriverVehicleUsageMergeModule implements RuntimeProcessingModule {
@Override
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
Object output = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS) == null
? null
: context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS).output();
Object output = sourceOutput(context);
List<DriverWorkingTimeVehicleUsageInterval> intervals = castIntervals(output);
List<DriverWorkingTimeVehicleUsageInterval> merged = merge(intervals);
Map<String, Object> metadata = new LinkedHashMap<>();
@ -51,8 +50,20 @@ public class DriverVehicleUsageMergeModule implements RuntimeProcessingModule {
);
}
private Object sourceOutput(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult reconciled = context.previousResults().get(DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION);
if (reconciled != null) {
return reconciled.output();
}
RuntimeProcessingModuleResult raw = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS);
return raw == null ? null : raw.output();
}
@SuppressWarnings("unchecked")
private List<DriverWorkingTimeVehicleUsageInterval> castIntervals(Object output) {
if (output instanceof RuntimeVehicleUsageReconciliationResult result) {
return result.effectiveVehicleUsageIntervals();
}
return output instanceof List<?> list
? (List<DriverWorkingTimeVehicleUsageInterval>) list
: List.of();

View File

@ -6,6 +6,7 @@ public final class DriverWorkingTimeModuleKeys {
public static final String EVENT_EVIDENCE_MIXING = "event-evidence-mixing";
public static final String EVENT_TO_ACTIVITY_INTERVALS = "event-to-activity-intervals";
public static final String EVENT_TO_VEHICLE_USAGE_INTERVALS = "event-to-vehicle-usage-intervals";
public static final String VEHICLE_USAGE_RECONCILIATION = "vehicle-usage-reconciliation";
public static final String VEHICLE_USAGE_MERGE = "vehicle-usage-merge";
public static final String VEHICLE_EVIDENCE_ATTACHMENT = "vehicle-evidence-attachment";
public static final String SUPPORT_EVIDENCE_NORMALIZATION = "support-evidence-normalization";

View File

@ -0,0 +1,74 @@
package at.procon.eventhub.processing.eventprocessing.module;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto;
import at.procon.eventhub.processing.eventprocessing.vehicleusage.RuntimeVehicleUsageReconciliationResult;
import at.procon.eventhub.processing.eventprocessing.vehicleusage.RuntimeVehicleUsageReconciliationService;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class VehicleUsageReconciliationModule implements RuntimeProcessingModule {
private final RuntimeVehicleUsageReconciliationService reconciliationService;
@Autowired
public VehicleUsageReconciliationModule(RuntimeVehicleUsageReconciliationService reconciliationService) {
this.reconciliationService = reconciliationService;
}
/** Compatibility constructor for legacy tests that instantiate a local module registry. */
public VehicleUsageReconciliationModule() {
this(new RuntimeVehicleUsageReconciliationService());
}
@Override
public String moduleKey() {
return DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION;
}
@Override
public RuntimeProcessingModuleDescriptorDto descriptor() {
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Vehicle usage reconciliation",
"Normalizes CARD_VEHICLES_USED technical midnight splits, then reconciles normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary for effective vehicle usage; CARD_VEHICLES_USED remains fallback or corroborating evidence.",
"JAVA",
Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent")
);
}
@Override
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
List<DriverWorkingTimeVehicleUsageInterval> rawIntervals = vehicleUsageIntervals(context);
RuntimeVehicleUsageReconciliationResult result = reconciliationService.reconcile(rawIntervals);
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("inputVehicleUsageIntervalCount", result.rawVehicleUsageIntervals().size());
metadata.put("normalizedCardVehicleUsedIntervalCount", result.normalizedCardVehicleUsedIntervals().size());
metadata.put("effectiveVehicleUsageIntervalCount", result.effectiveVehicleUsageIntervals().size());
metadata.put("suppressedSecondaryIntervalCount", result.suppressedSecondaryIntervals().size());
metadata.put("vehicleUsageReconciliationDecisionCount", result.vehicleUsageReconciliationDecisions().size());
metadata.put("vehicleUsageReconciliationDecisions", result.vehicleUsageReconciliationDecisions());
metadata.put("notes", result.notes());
return new RuntimeProcessingModuleResult(
moduleKey(),
RuntimeProcessingModuleStatus.SUCCESS,
result,
metadata,
result.warnings()
);
}
@SuppressWarnings("unchecked")
private List<DriverWorkingTimeVehicleUsageInterval> vehicleUsageIntervals(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult result = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS);
if (result == null || !(result.output() instanceof List<?> list)) {
return List.of();
}
return (List<DriverWorkingTimeVehicleUsageInterval>) list;
}
}

View File

@ -14,6 +14,7 @@ import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingMod
import at.procon.eventhub.processing.eventprocessing.module.DriverActivityIntervalsModule;
import at.procon.eventhub.processing.eventprocessing.module.DriverVehicleUsageIntervalsModule;
import at.procon.eventhub.processing.eventprocessing.module.DriverVehicleUsageMergeModule;
import at.procon.eventhub.processing.eventprocessing.module.VehicleUsageReconciliationModule;
import at.procon.eventhub.processing.eventprocessing.module.VehicleEvidenceAttachmentModule;
import at.procon.eventhub.processing.eventprocessing.module.SupportEvidenceNormalizationModule;
import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeDerivedProjectionsModule;
@ -32,6 +33,10 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
public static final String PLAN_KEY = "driver-working-time-v1";
public static final String ATTACH_VEHICLE_ONLY_EVENTS_ATTRIBUTE = "attachVehicleOnlyEvents";
public static final String VEHICLE_EVIDENCE_PADDING_MINUTES_ATTRIBUTE = "vehicleEvidencePaddingMinutes";
public static final String INCLUDE_EXECUTION_MODULE_RESULTS_PARAMETER = "includeExecutionModuleResults";
public static final String INCLUDE_PARTITION_METADATA_PARAMETER = "includePartitionMetadata";
public static final String INCLUDE_PARTITION_MODULE_RESULTS_PARAMETER = "includePartitionModuleResults";
public static final String INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER = "includeSupportEvidenceNormalization";
private final RuntimeProcessingPipelineExecutor pipelineExecutor;
private final boolean includeRuntimeEventAssemblyModule;
@ -55,6 +60,7 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
this(new RuntimeProcessingPipelineExecutor(new RuntimeProcessingModuleRegistry(List.of(
new DriverActivityIntervalsModule(),
new DriverVehicleUsageIntervalsModule(),
new VehicleUsageReconciliationModule(),
new DriverVehicleUsageMergeModule(),
new VehicleEvidenceAttachmentModule(),
new SupportEvidenceNormalizationModule(),
@ -137,11 +143,18 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"ESPER",
Set.of("DriverVehicleUsageIntervalEvent")
),
new RuntimeProcessingModuleDescriptorDto(
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION,
"Vehicle usage reconciliation",
"Coalesces CARD_VEHICLES_USED technical midnight splits before reconciling normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary; CARD_VEHICLES_USED is fallback or corroborating evidence.",
"JAVA",
Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent")
),
new RuntimeProcessingModuleDescriptorDto(
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE,
"Vehicle usage merge",
"Merges adjacent same-driver/same-vehicle usage intervals, including 23:59:59 to 00:00:00 continuations. Currently delegated to the derived projections adapter.",
"JAVA/ESPER",
"Merges adjacent effective same-driver/same-vehicle usage intervals after vehicle-usage source reconciliation.",
"JAVA",
Set.of("DriverVehicleUsageIntervalEvent")
),
new RuntimeProcessingModuleDescriptorDto(
@ -176,9 +189,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"minimumRestPeriodMinutes",
"attachVehicleOnlyEvents",
"vehicleEvidencePaddingMinutes",
INCLUDE_EXECUTION_MODULE_RESULTS_PARAMETER,
INCLUDE_PARTITION_METADATA_PARAMETER,
INCLUDE_PARTITION_MODULE_RESULTS_PARAMETER,
"includeActivityIntervals",
"includeDrivingIntervals",
"includePartitionDebug",
INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER,
"eventMixingMode"
);
}
@ -195,6 +212,26 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
request.partitioning(),
request.parameters()
);
boolean includeExecutionModuleResults = booleanParameter(
request.parameters(),
INCLUDE_EXECUTION_MODULE_RESULTS_PARAMETER,
true
);
boolean includePartitionMetadata = booleanParameter(
request.parameters(),
INCLUDE_PARTITION_METADATA_PARAMETER,
true
);
boolean includePartitionModuleResults = booleanParameter(
request.parameters(),
INCLUDE_PARTITION_MODULE_RESULTS_PARAMETER,
true
);
boolean includeSupportEvidenceNormalization = booleanParameter(
request.parameters(),
INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER,
true
);
int vehicleEvidencePaddingMinutes = resolveVehicleEvidencePaddingMinutes(
request.sourceSelection(),
request.partitioning(),
@ -228,26 +265,23 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
Map<String, RuntimeEventProcessingPartitionResultDto> partitionResults = new LinkedHashMap<>();
workingTimeResult.driverResults().forEach((driverKey, driverResult) -> {
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("projectionResultType", driverResult.projection() == null ? "NONE" : "DriverWorkingTimeProcessingResultDto");
metadata.put("driverSeedEventCount", driverResult.driverSeedEventCount());
metadata.put("expandedVehicleEventCount", driverResult.expandedVehicleEventCount());
metadata.put("mergedEventCount", driverResult.mergedEventCount());
if (driverResult.supportEvidenceNormalization() != null) {
metadata.put("supportEvidenceNormalization", driverResult.supportEvidenceNormalization());
}
if (driverResult.partitionDebug() != null) {
metadata.put("partitionDebug", driverResult.partitionDebug());
}
UnifiedRuntimeDerivedProjectionResultDto shapedDriverResult = shapeDriverResult(
driverResult,
includeSupportEvidenceNormalization,
includePartitionDebug
);
Map<String, Object> metadata = includePartitionMetadata
? partitionMetadata(shapedDriverResult)
: Map.of();
partitionResults.put(
driverKey,
new RuntimeEventProcessingPartitionResultDto(
"DRIVER",
driverKey,
"UnifiedRuntimeDerivedProjectionResultDto",
driverResult,
shapedDriverResult,
metadata,
partitionModuleResults(driverResult)
includePartitionModuleResults ? partitionModuleResults(shapedDriverResult) : Map.of()
)
);
});
@ -261,7 +295,7 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
workingTimeResult.selectedDriverCount(),
workingTimeResult.discoveredVehicleCount(),
workingTimeResult.discoveredVehicles(),
sanitizeExecutionModuleResults(moduleResults),
includeExecutionModuleResults ? sanitizeExecutionModuleResults(moduleResults) : Map.of(),
partitionResults,
workingTimeResult.notes(),
workingTimeResult.warnings()
@ -355,6 +389,42 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
return results;
}
private Map<String, Object> partitionMetadata(
UnifiedRuntimeDerivedProjectionResultDto driverResult
) {
LinkedHashMap<String, Object> metadata = new LinkedHashMap<>();
metadata.put("projectionResultType", driverResult.projection() == null ? "NONE" : "DriverWorkingTimeProcessingResultDto");
metadata.put("driverSeedEventCount", driverResult.driverSeedEventCount());
metadata.put("expandedVehicleEventCount", driverResult.expandedVehicleEventCount());
metadata.put("mergedEventCount", driverResult.mergedEventCount());
if (driverResult.supportEvidenceNormalization() != null) {
metadata.put("supportEvidenceNormalization", driverResult.supportEvidenceNormalization());
}
if (driverResult.partitionDebug() != null) {
metadata.put("partitionDebug", driverResult.partitionDebug());
}
return metadata;
}
private UnifiedRuntimeDerivedProjectionResultDto shapeDriverResult(
UnifiedRuntimeDerivedProjectionResultDto driverResult,
boolean includeSupportEvidenceNormalization,
boolean includePartitionDebug
) {
return new UnifiedRuntimeDerivedProjectionResultDto(
driverResult.request(),
driverResult.driverSeedEventCount(),
driverResult.discoveredVehicleCount(),
driverResult.expandedVehicleEventCount(),
driverResult.mergedEventCount(),
driverResult.discoveredVehicles(),
driverResult.projection(),
driverResult.notes(),
includeSupportEvidenceNormalization ? driverResult.supportEvidenceNormalization() : null,
includePartitionDebug ? driverResult.partitionDebug() : null
);
}
public UnifiedRuntimeProcessingApiRequest applyExecutionRequest(
UnifiedRuntimeProcessingApiRequest sourceSelection,
RuntimeEventPartitioningApiRequest partitioning,

View File

@ -0,0 +1,19 @@
package at.procon.eventhub.processing.eventprocessing.vehicleusage;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import java.time.OffsetDateTime;
public record RuntimeVehicleUsageIntervalDescriptor(
DriverWorkingTimeVehicleUsageInterval interval,
RuntimeVehicleUsageIntervalSourceType sourceType,
String sourceKind,
String driverKey,
String registrationKey,
String vehicleKey,
OffsetDateTime startedAt,
OffsetDateTime endedAt
) {
public String intervalId() {
return interval == null ? null : interval.intervalId();
}
}

View File

@ -0,0 +1,64 @@
package at.procon.eventhub.processing.eventprocessing.vehicleusage;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.springframework.stereotype.Component;
@Component
public class RuntimeVehicleUsageIntervalDescriptorFactory {
public RuntimeVehicleUsageIntervalDescriptor describe(DriverWorkingTimeVehicleUsageInterval interval) {
if (interval == null) {
return null;
}
return new RuntimeVehicleUsageIntervalDescriptor(
interval,
sourceType(interval),
interval.sourceKind(),
interval.driverKey(),
interval.registrationKey(),
interval.vehicleKey(),
interval.startedAt(),
interval.endedAt()
);
}
private RuntimeVehicleUsageIntervalSourceType sourceType(DriverWorkingTimeVehicleUsageInterval interval) {
String sourceKind = normalize(interval.sourceKind());
List<String> identifiers = identifiers(interval).stream().map(this::normalize).toList();
if (identifiers.stream().anyMatch(value -> value.contains("CARD_VEHICLES_USED"))
|| "CARD_VEHICLES_USED".equals(sourceKind)
|| "DRIVER_CARD".equals(sourceKind)) {
return RuntimeVehicleUsageIntervalSourceType.CARD_VEHICLES_USED;
}
if (identifiers.stream().anyMatch(value -> value.contains("IW_CYCLE"))
|| "IW_CYCLE".equals(sourceKind)
|| "VEHICLE_UNIT".equals(sourceKind)) {
return RuntimeVehicleUsageIntervalSourceType.IW_CYCLE;
}
return RuntimeVehicleUsageIntervalSourceType.OTHER;
}
private List<String> identifiers(DriverWorkingTimeVehicleUsageInterval interval) {
List<String> result = new ArrayList<>();
add(result, interval.intervalId());
add(result, interval.firstSourceIntervalId());
add(result, interval.lastSourceIntervalId());
if (interval.sourceIntervalIds() != null) {
interval.sourceIntervalIds().forEach(value -> add(result, value));
}
return result;
}
private void add(List<String> values, String value) {
if (value != null && !value.isBlank()) {
values.add(value);
}
}
private String normalize(String value) {
return value == null ? "" : value.trim().toUpperCase(Locale.ROOT);
}
}

View File

@ -0,0 +1,11 @@
package at.procon.eventhub.processing.eventprocessing.vehicleusage;
public enum RuntimeVehicleUsageIntervalRole {
PRIMARY,
FUSED_PRIMARY,
FALLBACK_PRIMARY,
CORROBORATING_SECONDARY,
SUPPRESSED_DUPLICATE,
UNCHANGED,
CONFLICTING_EVIDENCE
}

View File

@ -0,0 +1,7 @@
package at.procon.eventhub.processing.eventprocessing.vehicleusage;
public enum RuntimeVehicleUsageIntervalSourceType {
CARD_VEHICLES_USED,
IW_CYCLE,
OTHER
}

View File

@ -0,0 +1,26 @@
package at.procon.eventhub.processing.eventprocessing.vehicleusage;
import java.time.OffsetDateTime;
import java.util.List;
public record RuntimeVehicleUsageReconciliationDecisionDto(
String ruleId,
String decision,
String equivalenceType,
String primaryIntervalId,
String primarySourceType,
List<String> secondaryIntervalIds,
List<String> secondarySourceTypes,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
Long startDeltaSeconds,
Long endDeltaSeconds,
String reason,
List<String> warnings
) {
public RuntimeVehicleUsageReconciliationDecisionDto {
secondaryIntervalIds = secondaryIntervalIds == null ? List.of() : List.copyOf(secondaryIntervalIds);
secondarySourceTypes = secondarySourceTypes == null ? List.of() : List.copyOf(secondarySourceTypes);
warnings = warnings == null ? List.of() : List.copyOf(warnings);
}
}

View File

@ -0,0 +1,24 @@
package at.procon.eventhub.processing.eventprocessing.vehicleusage;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import java.util.List;
public record RuntimeVehicleUsageReconciliationResult(
List<DriverWorkingTimeVehicleUsageInterval> rawVehicleUsageIntervals,
List<DriverWorkingTimeVehicleUsageInterval> normalizedCardVehicleUsedIntervals,
List<DriverWorkingTimeVehicleUsageInterval> effectiveVehicleUsageIntervals,
List<DriverWorkingTimeVehicleUsageInterval> suppressedSecondaryIntervals,
List<RuntimeVehicleUsageReconciliationDecisionDto> vehicleUsageReconciliationDecisions,
List<String> notes,
List<String> warnings
) {
public RuntimeVehicleUsageReconciliationResult {
rawVehicleUsageIntervals = rawVehicleUsageIntervals == null ? List.of() : List.copyOf(rawVehicleUsageIntervals);
normalizedCardVehicleUsedIntervals = normalizedCardVehicleUsedIntervals == null ? List.of() : List.copyOf(normalizedCardVehicleUsedIntervals);
effectiveVehicleUsageIntervals = effectiveVehicleUsageIntervals == null ? List.of() : List.copyOf(effectiveVehicleUsageIntervals);
suppressedSecondaryIntervals = suppressedSecondaryIntervals == null ? List.of() : List.copyOf(suppressedSecondaryIntervals);
vehicleUsageReconciliationDecisions = vehicleUsageReconciliationDecisions == null ? List.of() : List.copyOf(vehicleUsageReconciliationDecisions);
notes = notes == null ? List.of() : List.copyOf(notes);
warnings = warnings == null ? List.of() : List.copyOf(warnings);
}
}

View File

@ -0,0 +1,474 @@
package at.procon.eventhub.processing.eventprocessing.vehicleusage;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.springframework.stereotype.Service;
@Service
public class RuntimeVehicleUsageReconciliationService {
public static final String RULE_CVU_MIDNIGHT_CONTINUATION = "tachograph.vehicle-usage.card-vehicles-used.midnight-continuation";
public static final String RULE_CVU_IW_EXACT_OR_COMPATIBLE = "tachograph.vehicle-usage.card-vehicles-used-iw-cycle.exact-or-compatible";
public static final String RULE_CVU_FALLBACK = "tachograph.vehicle-usage.card-vehicles-used-fallback";
public static final String RULE_IW_PRIMARY = "tachograph.vehicle-usage.iw-cycle-primary";
private static final long MATCH_TOLERANCE_SECONDS = 60L;
private final RuntimeVehicleUsageIntervalDescriptorFactory descriptorFactory;
public RuntimeVehicleUsageReconciliationService() {
this(new RuntimeVehicleUsageIntervalDescriptorFactory());
}
public RuntimeVehicleUsageReconciliationService(RuntimeVehicleUsageIntervalDescriptorFactory descriptorFactory) {
this.descriptorFactory = descriptorFactory;
}
public RuntimeVehicleUsageReconciliationResult reconcile(List<DriverWorkingTimeVehicleUsageInterval> intervals) {
List<DriverWorkingTimeVehicleUsageInterval> raw = intervals == null ? List.of() : intervals.stream()
.filter(Objects::nonNull)
.sorted(intervalComparator())
.toList();
List<RuntimeVehicleUsageReconciliationDecisionDto> decisions = new ArrayList<>();
List<String> warnings = new ArrayList<>();
List<DriverWorkingTimeVehicleUsageInterval> cardIntervals = new ArrayList<>();
List<DriverWorkingTimeVehicleUsageInterval> iwIntervals = new ArrayList<>();
List<DriverWorkingTimeVehicleUsageInterval> otherIntervals = new ArrayList<>();
for (DriverWorkingTimeVehicleUsageInterval interval : raw) {
RuntimeVehicleUsageIntervalDescriptor descriptor = descriptorFactory.describe(interval);
RuntimeVehicleUsageIntervalSourceType sourceType = descriptor == null
? RuntimeVehicleUsageIntervalSourceType.OTHER
: descriptor.sourceType();
switch (sourceType) {
case CARD_VEHICLES_USED -> cardIntervals.add(interval);
case IW_CYCLE -> iwIntervals.add(interval);
case OTHER -> otherIntervals.add(interval);
}
}
List<DriverWorkingTimeVehicleUsageInterval> normalizedCardIntervals = normalizeCardVehicleUsed(cardIntervals, decisions);
ReconciliationStepResult reconciled = reconcileCardWithIw(normalizedCardIntervals, iwIntervals, decisions, warnings);
List<DriverWorkingTimeVehicleUsageInterval> effective = new ArrayList<>();
effective.addAll(reconciled.effectiveIntervals());
effective.addAll(otherIntervals);
effective.sort(intervalComparator());
List<String> notes = List.of(
"Vehicle usage reconciliation first coalesced CARD_VEHICLES_USED technical midnight splits, then reconciled normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals."
);
return new RuntimeVehicleUsageReconciliationResult(
raw,
normalizedCardIntervals,
effective,
reconciled.suppressedIntervals(),
decisions,
notes,
warnings
);
}
private List<DriverWorkingTimeVehicleUsageInterval> normalizeCardVehicleUsed(
List<DriverWorkingTimeVehicleUsageInterval> intervals,
List<RuntimeVehicleUsageReconciliationDecisionDto> decisions
) {
if (intervals == null || intervals.isEmpty()) {
return List.of();
}
List<DriverWorkingTimeVehicleUsageInterval> sorted = intervals.stream()
.filter(Objects::nonNull)
.sorted(intervalComparator())
.toList();
List<DriverWorkingTimeVehicleUsageInterval> normalized = new ArrayList<>();
for (DriverWorkingTimeVehicleUsageInterval next : sorted) {
if (normalized.isEmpty()) {
normalized.add(next);
continue;
}
DriverWorkingTimeVehicleUsageInterval current = normalized.get(normalized.size() - 1);
if (canMergeCardVehicleUsedTechnicalMidnightSplit(current, next)) {
DriverWorkingTimeVehicleUsageInterval merged = mergeCardVehicleUsedTechnicalSplit(current, next);
normalized.set(normalized.size() - 1, merged);
decisions.add(new RuntimeVehicleUsageReconciliationDecisionDto(
RULE_CVU_MIDNIGHT_CONTINUATION,
"COALESCED_TECHNICAL_MIDNIGHT_SPLIT",
"CARD_VEHICLES_USED_MIDNIGHT_CONTINUATION",
merged.intervalId(),
RuntimeVehicleUsageIntervalSourceType.CARD_VEHICLES_USED.name(),
List.of(current.intervalId(), next.intervalId()),
List.of(RuntimeVehicleUsageIntervalSourceType.CARD_VEHICLES_USED.name(), RuntimeVehicleUsageIntervalSourceType.CARD_VEHICLES_USED.name()),
merged.startedAt(),
merged.endedAt(),
deltaSeconds(current.endedAt(), next.startedAt()),
null,
"CARD_VEHICLES_USED contained a technical 23:59:59/00:00:00 split for the same driver and vehicle; it was normalized before IW_CYCLE reconciliation.",
List.of()
));
} else {
normalized.add(next);
}
}
return List.copyOf(normalized);
}
private ReconciliationStepResult reconcileCardWithIw(
List<DriverWorkingTimeVehicleUsageInterval> cardIntervals,
List<DriverWorkingTimeVehicleUsageInterval> iwIntervals,
List<RuntimeVehicleUsageReconciliationDecisionDto> decisions,
List<String> warnings
) {
List<DriverWorkingTimeVehicleUsageInterval> effective = new ArrayList<>();
List<DriverWorkingTimeVehicleUsageInterval> suppressed = new ArrayList<>();
Set<String> usedIwKeys = new LinkedHashSet<>();
Map<String, DriverWorkingTimeVehicleUsageInterval> iwByKey = new LinkedHashMap<>();
for (DriverWorkingTimeVehicleUsageInterval iw : iwIntervals == null ? List.<DriverWorkingTimeVehicleUsageInterval>of() : iwIntervals) {
iwByKey.put(intervalIdentity(iw), iw);
}
for (DriverWorkingTimeVehicleUsageInterval card : cardIntervals == null ? List.<DriverWorkingTimeVehicleUsageInterval>of() : cardIntervals) {
DriverWorkingTimeVehicleUsageInterval matchingIw = bestMatchingIw(card, iwIntervals, usedIwKeys);
if (matchingIw != null) {
usedIwKeys.add(intervalIdentity(matchingIw));
DriverWorkingTimeVehicleUsageInterval fused = fuseIwPrimary(matchingIw, card);
effective.add(fused);
suppressed.add(card);
decisions.add(new RuntimeVehicleUsageReconciliationDecisionDto(
RULE_CVU_IW_EXACT_OR_COMPATIBLE,
"FUSED_PRIMARY_SELECTED",
equivalenceType(card, matchingIw),
fused.intervalId(),
RuntimeVehicleUsageIntervalSourceType.IW_CYCLE.name(),
List.of(card.intervalId()),
List.of(RuntimeVehicleUsageIntervalSourceType.CARD_VEHICLES_USED.name()),
fused.startedAt(),
fused.endedAt(),
absoluteDeltaSeconds(card.startedAt(), matchingIw.startedAt()),
absoluteDeltaSeconds(card.endedAt(), matchingIw.endedAt()),
"Normalized CARD_VEHICLES_USED interval matches IW_CYCLE. IW_CYCLE is primary for vehicle usage identity; CARD_VEHICLES_USED remains corroborating source evidence.",
vehicleConflictWarnings(card, matchingIw)
));
warnings.addAll(vehicleConflictWarnings(card, matchingIw));
} else {
effective.add(card);
decisions.add(new RuntimeVehicleUsageReconciliationDecisionDto(
RULE_CVU_FALLBACK,
"FALLBACK_PRIMARY_SELECTED",
"NO_COMPATIBLE_IW_CYCLE",
card.intervalId(),
RuntimeVehicleUsageIntervalSourceType.CARD_VEHICLES_USED.name(),
List.of(),
List.of(),
card.startedAt(),
card.endedAt(),
null,
null,
"No compatible IW_CYCLE interval was found; CARD_VEHICLES_USED is kept as fallback vehicle-usage evidence.",
List.of()
));
}
}
iwByKey.forEach((key, iw) -> {
if (!usedIwKeys.contains(key)) {
effective.add(iw);
decisions.add(new RuntimeVehicleUsageReconciliationDecisionDto(
RULE_IW_PRIMARY,
"PRIMARY_SELECTED",
"NO_COMPATIBLE_CARD_VEHICLES_USED",
iw.intervalId(),
RuntimeVehicleUsageIntervalSourceType.IW_CYCLE.name(),
List.of(),
List.of(),
iw.startedAt(),
iw.endedAt(),
null,
null,
"No compatible CARD_VEHICLES_USED interval was found; IW_CYCLE is kept as primary vehicle-usage evidence.",
List.of()
));
}
});
effective.sort(intervalComparator());
suppressed.sort(intervalComparator());
return new ReconciliationStepResult(List.copyOf(effective), List.copyOf(suppressed));
}
private DriverWorkingTimeVehicleUsageInterval bestMatchingIw(
DriverWorkingTimeVehicleUsageInterval card,
List<DriverWorkingTimeVehicleUsageInterval> iwIntervals,
Set<String> usedIwKeys
) {
if (card == null || iwIntervals == null || iwIntervals.isEmpty()) {
return null;
}
return iwIntervals.stream()
.filter(Objects::nonNull)
.filter(iw -> !usedIwKeys.contains(intervalIdentity(iw)))
.filter(iw -> canReconcileCardWithIw(card, iw))
.min(Comparator.comparingLong(iw -> matchScore(card, iw)))
.orElse(null);
}
private boolean canReconcileCardWithIw(
DriverWorkingTimeVehicleUsageInterval card,
DriverWorkingTimeVehicleUsageInterval iw
) {
if (card == null || iw == null || card.startedAt() == null || card.endedAt() == null
|| iw.startedAt() == null || iw.endedAt() == null) {
return false;
}
if (!Objects.equals(card.driverKey(), iw.driverKey())) {
return false;
}
if (!compatibleVehicleIdentity(card, iw)) {
return false;
}
long startDelta = Math.abs(card.startedAt().toEpochSecond() - iw.startedAt().toEpochSecond());
long endDelta = Math.abs(card.endedAt().toEpochSecond() - iw.endedAt().toEpochSecond());
return startDelta <= MATCH_TOLERANCE_SECONDS && endDelta <= MATCH_TOLERANCE_SECONDS;
}
private boolean canMergeCardVehicleUsedTechnicalMidnightSplit(
DriverWorkingTimeVehicleUsageInterval left,
DriverWorkingTimeVehicleUsageInterval right
) {
if (left == null || right == null || left.endedAt() == null || right.startedAt() == null) {
return false;
}
if (!Objects.equals(left.driverKey(), right.driverKey())) {
return false;
}
if (!compatibleVehicleIdentity(left, right)) {
return false;
}
if (!isEndOfDay(left.endedAt()) || !isStartOfDay(right.startedAt())) {
return false;
}
long gap = right.startedAt().toEpochSecond() - left.endedAt().toEpochSecond();
return gap >= 0 && gap <= 1;
}
private DriverWorkingTimeVehicleUsageInterval mergeCardVehicleUsedTechnicalSplit(
DriverWorkingTimeVehicleUsageInterval left,
DriverWorkingTimeVehicleUsageInterval right
) {
OffsetDateTime end = right.endedAt() == null ? left.endedAt() : right.endedAt();
return new DriverWorkingTimeVehicleUsageInterval(
firstNonNull(left.sessionId(), right.sessionId()),
firstNonBlank(left.driverKey(), right.driverKey()),
mergedIntervalId("CVU_MERGED", left, right),
firstNonBlank(left.firstSourceIntervalId(), left.intervalId()),
firstNonBlank(right.lastSourceIntervalId(), right.intervalId(), left.lastSourceIntervalId()),
left.startedAt(),
end,
left.startedAtEpochSecond(),
end == null ? null : end.toEpochSecond(),
end == null ? left.durationSeconds() : end.toEpochSecond() - left.startedAtEpochSecond(),
firstNonNull(left.odometerBeginKm(), right.odometerBeginKm()),
firstNonNull(right.odometerEndKm(), left.odometerEndKm()),
firstNonBlank(left.registrationKey(), right.registrationKey()),
firstNonBlank(left.vehicleKey(), right.vehicleKey()),
firstNonBlank(left.sourceKind(), right.sourceKind()),
sourceIntervalIds(left, right)
);
}
private DriverWorkingTimeVehicleUsageInterval fuseIwPrimary(
DriverWorkingTimeVehicleUsageInterval iw,
DriverWorkingTimeVehicleUsageInterval card
) {
return new DriverWorkingTimeVehicleUsageInterval(
firstNonNull(iw.sessionId(), card.sessionId()),
firstNonBlank(iw.driverKey(), card.driverKey()),
iw.intervalId(),
firstNonBlank(iw.firstSourceIntervalId(), iw.intervalId()),
firstNonBlank(iw.lastSourceIntervalId(), iw.intervalId()),
iw.startedAt(),
iw.endedAt(),
iw.startedAtEpochSecond(),
iw.endedAtEpochSecond(),
iw.durationSeconds(),
firstNonNull(iw.odometerBeginKm(), card.odometerBeginKm()),
firstNonNull(iw.odometerEndKm(), card.odometerEndKm()),
firstNonBlank(iw.registrationKey(), card.registrationKey()),
firstNonBlank(iw.vehicleKey(), card.vehicleKey()),
firstNonBlank(iw.sourceKind(), RuntimeVehicleUsageIntervalSourceType.IW_CYCLE.name()),
sourceIntervalIds(iw, card)
);
}
private boolean compatibleVehicleIdentity(
DriverWorkingTimeVehicleUsageInterval left,
DriverWorkingTimeVehicleUsageInterval right
) {
boolean registrationCompatible = compatibleNullable(left.registrationKey(), right.registrationKey());
boolean vehicleCompatible = compatibleNullable(left.vehicleKey(), right.vehicleKey());
if (hasText(left.registrationKey()) && hasText(right.registrationKey())) {
return registrationCompatible && vehicleCompatible;
}
if (hasText(left.vehicleKey()) && hasText(right.vehicleKey())) {
return vehicleCompatible && registrationCompatible;
}
return registrationCompatible || vehicleCompatible;
}
private boolean compatibleNullable(String left, String right) {
return !hasText(left) || !hasText(right) || Objects.equals(left, right);
}
private List<String> vehicleConflictWarnings(
DriverWorkingTimeVehicleUsageInterval card,
DriverWorkingTimeVehicleUsageInterval iw
) {
List<String> result = new ArrayList<>();
if (hasText(card.registrationKey()) && hasText(iw.registrationKey())
&& !Objects.equals(card.registrationKey(), iw.registrationKey())) {
result.add("CARD_VEHICLES_USED and IW_CYCLE identify different registrations for overlapping vehicle usage intervals: "
+ card.registrationKey() + " vs " + iw.registrationKey() + ".");
}
if (hasText(card.vehicleKey()) && hasText(iw.vehicleKey())
&& !Objects.equals(card.vehicleKey(), iw.vehicleKey())) {
result.add("CARD_VEHICLES_USED and IW_CYCLE identify different vehicle keys for overlapping vehicle usage intervals: "
+ card.vehicleKey() + " vs " + iw.vehicleKey() + ".");
}
return List.copyOf(result);
}
private String equivalenceType(DriverWorkingTimeVehicleUsageInterval card, DriverWorkingTimeVehicleUsageInterval iw) {
Long start = absoluteDeltaSeconds(card.startedAt(), iw.startedAt());
Long end = absoluteDeltaSeconds(card.endedAt(), iw.endedAt());
if ((start == null || start == 0L) && (end == null || end == 0L)) {
return "EXACT_INTERVAL_BOUNDARIES";
}
return "COMPATIBLE_INTERVAL_BOUNDARIES";
}
private long matchScore(DriverWorkingTimeVehicleUsageInterval card, DriverWorkingTimeVehicleUsageInterval iw) {
Long start = absoluteDeltaSeconds(card.startedAt(), iw.startedAt());
Long end = absoluteDeltaSeconds(card.endedAt(), iw.endedAt());
return (start == null ? Long.MAX_VALUE / 4 : start) + (end == null ? Long.MAX_VALUE / 4 : end);
}
private Long absoluteDeltaSeconds(OffsetDateTime left, OffsetDateTime right) {
if (left == null || right == null) {
return null;
}
return Math.abs(left.toEpochSecond() - right.toEpochSecond());
}
private Long deltaSeconds(OffsetDateTime left, OffsetDateTime right) {
if (left == null || right == null) {
return null;
}
return right.toEpochSecond() - left.toEpochSecond();
}
private boolean isEndOfDay(OffsetDateTime value) {
return value != null && value.toLocalTime().equals(LocalTime.of(23, 59, 59));
}
private boolean isStartOfDay(OffsetDateTime value) {
return value != null && value.toLocalTime().equals(LocalTime.MIDNIGHT);
}
private Comparator<DriverWorkingTimeVehicleUsageInterval> intervalComparator() {
return Comparator
.comparing(DriverWorkingTimeVehicleUsageInterval::startedAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(DriverWorkingTimeVehicleUsageInterval::endedAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(DriverWorkingTimeVehicleUsageInterval::driverKey, Comparator.nullsLast(String::compareTo))
.thenComparing(DriverWorkingTimeVehicleUsageInterval::intervalId, Comparator.nullsLast(String::compareTo));
}
private List<String> sourceIntervalIds(DriverWorkingTimeVehicleUsageInterval... intervals) {
LinkedHashSet<String> values = new LinkedHashSet<>();
for (DriverWorkingTimeVehicleUsageInterval interval : intervals) {
if (interval == null) {
continue;
}
add(values, interval.intervalId());
add(values, interval.firstSourceIntervalId());
add(values, interval.lastSourceIntervalId());
if (interval.sourceIntervalIds() != null) {
interval.sourceIntervalIds().forEach(value -> add(values, value));
}
}
return List.copyOf(values);
}
private void add(Set<String> values, String value) {
if (value != null && !value.isBlank()) {
values.add(value);
}
}
private String mergedIntervalId(
String prefix,
DriverWorkingTimeVehicleUsageInterval left,
DriverWorkingTimeVehicleUsageInterval right
) {
String driver = firstNonBlank(left.driverKey(), right.driverKey(), "driver");
String registration = firstNonBlank(left.registrationKey(), right.registrationKey(), "vehicle");
String start = left.startedAt() == null ? "open" : left.startedAt().toString();
String end = right.endedAt() == null ? "open" : right.endedAt().toString();
return prefix + "|" + driver + "|" + registration + "|" + start + "|" + end;
}
private String intervalIdentity(DriverWorkingTimeVehicleUsageInterval interval) {
if (interval == null) {
return "null";
}
String id = interval.intervalId();
if (id != null && !id.isBlank()) {
return id;
}
return interval.driverKey() + "|" + interval.registrationKey() + "|" + interval.vehicleKey()
+ "|" + interval.startedAt() + "|" + interval.endedAt();
}
private boolean hasText(String value) {
return value != null && !value.isBlank();
}
@SafeVarargs
private final <T> T firstNonNull(T... values) {
if (values == null) {
return null;
}
for (T value : values) {
if (value != null) {
return value;
}
}
return null;
}
private String firstNonBlank(String... values) {
if (values == null) {
return null;
}
for (String value : values) {
if (value != null && !value.isBlank()) {
return value;
}
}
return null;
}
private record ReconciliationStepResult(
List<DriverWorkingTimeVehicleUsageInterval> effectiveIntervals,
List<DriverWorkingTimeVehicleUsageInterval> suppressedIntervals
) {
}
}

View File

@ -2,10 +2,15 @@ package at.procon.eventhub.processing.eventprocessing.plan;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.processing.dto.RuntimeDriverPartitionDebugDto;
import at.procon.eventhub.processing.dto.RuntimeSupportEvidenceNormalizationDebugDto;
import at.procon.eventhub.processing.dto.UnifiedRuntimeDerivedProjectionResultDto;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.dto.UnifiedRuntimeDriverWorkingTimeScopeResultDto;
import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.service.RuntimeDriverWorkingTimeScopeProcessingService;
import java.time.OffsetDateTime;
import java.util.List;
@ -13,13 +18,15 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
class DriverWorkingTimeRuntimeProcessingPlanTest {
@Test
void applyExecutionRequestDoesNotRewriteScopeExpansionFromAttachmentFlag() {
DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan(
org.mockito.Mockito.mock(RuntimeDriverWorkingTimeScopeProcessingService.class)
Mockito.mock(RuntimeDriverWorkingTimeScopeProcessingService.class)
);
UnifiedRuntimeProcessingApiRequest scope = new UnifiedRuntimeProcessingApiRequest(
UUID.randomUUID(),
@ -69,4 +76,143 @@ class DriverWorkingTimeRuntimeProcessingPlanTest {
assertThat(resolved.expandVehicleEvents()).isTrue();
assertThat(resolved.vehicleExpansionPaddingMinutes()).isEqualTo(15);
}
@Test
void executeCanOmitExtendedPartitionPayloads() {
RuntimeDriverWorkingTimeScopeProcessingService scopeService = Mockito.mock(RuntimeDriverWorkingTimeScopeProcessingService.class);
DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan(scopeService);
RuntimeDriverPartitionDebugDto partitionDebug = new RuntimeDriverPartitionDebugDto(
"12:DRIVER-1",
5,
2,
1,
1,
0,
6,
List.of(),
List.of(),
List.of("debug note"),
List.of()
);
RuntimeSupportEvidenceNormalizationDebugDto normalizationDebug = new RuntimeSupportEvidenceNormalizationDebugDto(
6,
2,
4,
List.of("normalization note")
);
UnifiedRuntimeProcessingRequest processedRequest = processedRequest();
UnifiedRuntimeDerivedProjectionResultDto driverResult = new UnifiedRuntimeDerivedProjectionResultDto(
processedRequest,
5,
1,
1,
6,
List.of(),
null,
List.of("driver processed"),
normalizationDebug,
partitionDebug
);
Mockito.when(scopeService.processScope(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean()))
.thenReturn(new UnifiedRuntimeDriverWorkingTimeScopeResultDto(
processedRequest,
6,
1,
1,
List.of(),
Map.of("12:DRIVER-1", driverResult),
Map.of("12:DRIVER-1", partitionDebug),
List.of("scope processed"),
List.of()
));
RuntimeProcessingExecutionResultDto result = plan.execute(new RuntimeProcessingExecutionApiRequest(
DriverWorkingTimeRuntimeProcessingPlan.PLAN_KEY,
sourceSelection(),
new RuntimeEventPartitioningApiRequest(
RuntimeEventPartitioningStrategy.DRIVER,
Set.of("12:DRIVER-1"),
false,
Set.of("12:DRIVER-1"),
false,
Set.of(),
false,
true,
15,
false
),
List.of(),
Map.of(
DriverWorkingTimeRuntimeProcessingPlan.INCLUDE_EXECUTION_MODULE_RESULTS_PARAMETER, false,
DriverWorkingTimeRuntimeProcessingPlan.INCLUDE_PARTITION_METADATA_PARAMETER, false,
DriverWorkingTimeRuntimeProcessingPlan.INCLUDE_PARTITION_MODULE_RESULTS_PARAMETER, false,
DriverWorkingTimeRuntimeProcessingPlan.INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER, false,
"includePartitionDebug", false
)
));
assertThat(result.moduleResults()).isEmpty();
assertThat(result.partitionResults()).containsOnlyKeys("12:DRIVER-1");
assertThat(result.partitionResults().get("12:DRIVER-1").metadata()).isEmpty();
assertThat(result.partitionResults().get("12:DRIVER-1").moduleResults()).isEmpty();
UnifiedRuntimeDerivedProjectionResultDto shapedResult =
(UnifiedRuntimeDerivedProjectionResultDto) result.partitionResults().get("12:DRIVER-1").result();
assertThat(shapedResult.supportEvidenceNormalization()).isNull();
assertThat(shapedResult.partitionDebug()).isNull();
}
private UnifiedRuntimeProcessingApiRequest sourceSelection() {
return new UnifiedRuntimeProcessingApiRequest(
UUID.fromString("11111111-1111-1111-1111-111111111111"),
List.of(),
null,
null,
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
null,
"12:DRIVER-1",
Set.of("12:DRIVER-1"),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-31T23:59:59Z"),
true,
15,
true,
null,
null,
null,
null,
List.of()
);
}
private UnifiedRuntimeProcessingRequest processedRequest() {
return new UnifiedRuntimeProcessingRequest(
UUID.fromString("11111111-1111-1111-1111-111111111111"),
List.of(UUID.fromString("11111111-1111-1111-1111-111111111111")),
null,
null,
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
null,
"12:DRIVER-1",
Set.of("12:DRIVER-1"),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-31T23:59:59Z"),
true,
15,
true
);
}
}

View File

@ -38,9 +38,14 @@ class DriverWorkingTimeRuntimeEventProcessingProfileTest {
"minimumRestPeriodMinutes",
"attachVehicleOnlyEvents",
"vehicleEvidencePaddingMinutes",
"includeExecutionModuleResults",
"includePartitionMetadata",
"includePartitionModuleResults",
"includeActivityIntervals",
"includeDrivingIntervals",
"includePartitionDebug"
"includePartitionDebug",
"includeSupportEvidenceNormalization",
"eventMixingMode"
);
}
@ -155,6 +160,6 @@ class DriverWorkingTimeRuntimeEventProcessingProfileTest {
assertThat(result.partitioningStrategy()).isEqualTo(RuntimeEventPartitioningStrategy.DRIVER);
assertThat(result.partitionResults()).containsOnlyKeys("12:DRIVER-1");
assertThat(result.partitionResults().get("12:DRIVER-1").partitionType()).isEqualTo("DRIVER");
assertThat(result.partitionResults().get("12:DRIVER-1").result()).isSameAs(driverResult);
assertThat(result.partitionResults().get("12:DRIVER-1").result()).isEqualTo(driverResult);
}
}

View File

@ -0,0 +1,162 @@
package at.procon.eventhub.processing.eventprocessing.vehicleusage;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class RuntimeVehicleUsageReconciliationServiceTest {
private final RuntimeVehicleUsageReconciliationService service = new RuntimeVehicleUsageReconciliationService();
@Test
void coalescesCardVehiclesUsedTechnicalMidnightSplitBeforeReconciliation() {
DriverWorkingTimeVehicleUsageInterval first = interval(
"TACHOGRAPH:CARD_VEHICLES_USED:1",
"CARD_VEHICLES_USED:first",
"CARD_VEHICLES_USED:first-end",
"2026-04-01T15:43:00Z",
"2026-04-01T23:59:59Z",
"DRIVER_CARD"
);
DriverWorkingTimeVehicleUsageInterval second = interval(
"TACHOGRAPH:CARD_VEHICLES_USED:2",
"CARD_VEHICLES_USED:second",
"CARD_VEHICLES_USED:second-end",
"2026-04-02T00:00:00Z",
"2026-04-02T10:30:00Z",
"DRIVER_CARD"
);
RuntimeVehicleUsageReconciliationResult result = service.reconcile(List.of(first, second));
assertThat(result.normalizedCardVehicleUsedIntervals()).hasSize(1);
assertThat(result.effectiveVehicleUsageIntervals()).hasSize(1);
DriverWorkingTimeVehicleUsageInterval normalized = result.effectiveVehicleUsageIntervals().getFirst();
assertThat(normalized.startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T15:43:00Z"));
assertThat(normalized.endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-02T10:30:00Z"));
assertThat(normalized.sourceIntervalIds()).contains(
"TACHOGRAPH:CARD_VEHICLES_USED:1",
"TACHOGRAPH:CARD_VEHICLES_USED:2",
"CARD_VEHICLES_USED:first",
"CARD_VEHICLES_USED:second-end"
);
assertThat(result.vehicleUsageReconciliationDecisions()).extracting(RuntimeVehicleUsageReconciliationDecisionDto::ruleId)
.contains(RuntimeVehicleUsageReconciliationService.RULE_CVU_MIDNIGHT_CONTINUATION);
}
@Test
void reconcilesNormalizedCardVehiclesUsedWithIwCycleAndKeepsIwAsPrimary() {
DriverWorkingTimeVehicleUsageInterval cvuFirst = interval(
"TACHOGRAPH:CARD_VEHICLES_USED:1",
"CARD_VEHICLES_USED:first",
"CARD_VEHICLES_USED:first-end",
"2026-04-01T15:43:00Z",
"2026-04-01T23:59:59Z",
"DRIVER_CARD"
);
DriverWorkingTimeVehicleUsageInterval cvuSecond = interval(
"TACHOGRAPH:CARD_VEHICLES_USED:2",
"CARD_VEHICLES_USED:second",
"CARD_VEHICLES_USED:second-end",
"2026-04-02T00:00:00Z",
"2026-04-02T10:30:00Z",
"DRIVER_CARD"
);
DriverWorkingTimeVehicleUsageInterval iw = interval(
"TACHOGRAPH:IW_CYCLE:10",
"IW_CYCLE:first",
"IW_CYCLE:last",
"2026-04-01T15:43:00Z",
"2026-04-02T10:30:00Z",
"VEHICLE_UNIT"
);
RuntimeVehicleUsageReconciliationResult result = service.reconcile(List.of(cvuFirst, cvuSecond, iw));
assertThat(result.effectiveVehicleUsageIntervals()).hasSize(1);
DriverWorkingTimeVehicleUsageInterval effective = result.effectiveVehicleUsageIntervals().getFirst();
assertThat(effective.intervalId()).isEqualTo("TACHOGRAPH:IW_CYCLE:10");
assertThat(effective.sourceKind()).isEqualTo("VEHICLE_UNIT");
assertThat(effective.sourceIntervalIds()).contains(
"TACHOGRAPH:IW_CYCLE:10",
"TACHOGRAPH:CARD_VEHICLES_USED:1",
"TACHOGRAPH:CARD_VEHICLES_USED:2"
);
assertThat(result.suppressedSecondaryIntervals()).hasSize(1);
assertThat(result.vehicleUsageReconciliationDecisions()).extracting(RuntimeVehicleUsageReconciliationDecisionDto::ruleId)
.contains(
RuntimeVehicleUsageReconciliationService.RULE_CVU_MIDNIGHT_CONTINUATION,
RuntimeVehicleUsageReconciliationService.RULE_CVU_IW_EXACT_OR_COMPATIBLE
);
}
@Test
void keepsCardVehiclesUsedAsFallbackWhenIwCycleIsMissing() {
DriverWorkingTimeVehicleUsageInterval card = interval(
"TACHOGRAPH:CARD_VEHICLES_USED:1",
"CARD_VEHICLES_USED:first",
"CARD_VEHICLES_USED:last",
"2026-04-01T08:00:00Z",
"2026-04-01T12:00:00Z",
"DRIVER_CARD"
);
RuntimeVehicleUsageReconciliationResult result = service.reconcile(List.of(card));
assertThat(result.effectiveVehicleUsageIntervals()).containsExactly(card);
assertThat(result.vehicleUsageReconciliationDecisions()).extracting(RuntimeVehicleUsageReconciliationDecisionDto::ruleId)
.containsExactly(RuntimeVehicleUsageReconciliationService.RULE_CVU_FALLBACK);
}
@Test
void keepsIwCycleAsPrimaryWhenCardVehiclesUsedIsMissing() {
DriverWorkingTimeVehicleUsageInterval iw = interval(
"TACHOGRAPH:IW_CYCLE:10",
"IW_CYCLE:first",
"IW_CYCLE:last",
"2026-04-01T08:00:00Z",
"2026-04-01T12:00:00Z",
"VEHICLE_UNIT"
);
RuntimeVehicleUsageReconciliationResult result = service.reconcile(List.of(iw));
assertThat(result.effectiveVehicleUsageIntervals()).containsExactly(iw);
assertThat(result.vehicleUsageReconciliationDecisions()).extracting(RuntimeVehicleUsageReconciliationDecisionDto::ruleId)
.containsExactly(RuntimeVehicleUsageReconciliationService.RULE_IW_PRIMARY);
}
private DriverWorkingTimeVehicleUsageInterval interval(
String intervalId,
String firstSourceIntervalId,
String lastSourceIntervalId,
String start,
String end,
String sourceKind
) {
OffsetDateTime startedAt = OffsetDateTime.parse(start);
OffsetDateTime endedAt = OffsetDateTime.parse(end);
return new DriverWorkingTimeVehicleUsageInterval(
UUID.fromString("11111111-1111-1111-1111-111111111111"),
"1:driver",
intervalId,
firstSourceIntervalId,
lastSourceIntervalId,
startedAt,
endedAt,
startedAt.toEpochSecond(),
endedAt.toEpochSecond(),
endedAt.toEpochSecond() - startedAt.toEpochSecond(),
null,
null,
"1:LL-158TE",
"VIN:WDB9634031L123456",
sourceKind,
List.of(firstSourceIntervalId, lastSourceIntervalId)
);
}
}