Add NDI home classification runtime pipeline

This commit is contained in:
trifonovt 2026-06-17 09:13:31 +02:00
parent 34e6c6f236
commit 5a10558612
26 changed files with 2474 additions and 19 deletions

View File

@ -0,0 +1,136 @@
# NDI HOME / NOT_HOME classification implementation
This patch implements the HOME / NOT_HOME part of `docs/ndi_home_classification_en.md` as a dedicated runtime processing plan while reusing the existing driver-working-time pipeline.
## Public processing plan
Use:
```text
driver-home-classification-v1
```
The plan delegates to the shared `driver-working-time-v1` pipeline and explicitly inserts:
```text
support-evidence-normalization
-> ndi-home-classification
-> driving-derived-projections
```
The original `driver-working-time-v1` plan does not run the optional NDI module by default. It can opt in by explicitly requesting `ndi-home-classification`.
## Reused projection structures
`DriverWorkingTimeReusableProjectionBuilder.buildAllNonDrivingIntervalCoverage(...)` runs the existing Esper interruption/card-absence/GNSS enrichment pipeline with a zero rest-candidate threshold. It therefore creates enriched evidence for every positive non-driving interruption without changing the legacy daily/weekly-rest threshold or outputs.
The implementation reuses `DriverWorkingTimeRestCoverageInterval` as the enriched NDI evidence model. It provides:
- previous and next driving/vehicle identities;
- NDI start, end, and duration;
- card-absence duration and percentage;
- begin/end boundary GNSS evidence;
- boundary odometer and movement evidence.
## Implemented classification rules
The rules are evaluated in the document order:
1. previous and next vehicles differ -> `HOME`;
2. card absent for more than 80% -> `HOME`;
3. NDI longer than 24 hours -> `HOME`;
4. no position: NDI longer than 7.5 hours -> `HOME`, otherwise `NOT_HOME`;
5. positioned long NDI in a company or driver home cluster -> `HOME`;
6. positioned long NDI outside those clusters -> `NOT_HOME`;
7. remaining short NDI -> `NOT_HOME`.
Every classification contains a `DriverNdiHomeClassificationReason`, so the first matching rule remains visible in the API response.
## Location learning and clustering
Only NDIs longer than 7.5 hours with a position are added to the corpus.
Position selection follows the document through the existing boundary-evidence resolver:
```text
resolved begin-boundary evidence for the previous driving/vehicle context,
otherwise resolved end-boundary evidence for the next driving/vehicle context
```
The selected evidence is the closest eligible support-position event within the configured boundary lookup window, so it is an approximation when no event exists exactly at the driving boundary.
The in-memory cache:
- accumulates observations across one or more file-session executions;
- deduplicates the same NDI across repeated/overlapping sessions;
- retains the source session IDs as provenance;
- stores the driver key on every observation;
- does not permanently mark a driver as "actual" or "other".
For each result driver, the same cached corpus is viewed as:
```text
actual-driver observations
other-driver observations
```
This makes the distinction request-relative and allows the corpus to be reused for another driver.
Clustering uses Java DBSCAN with Haversine distance. Defaults are 150 metres and three points. Noise observations remain in the denominator for visit-share calculations but are never home clusters.
## File-session learning scope
The dedicated plan defaults `ndiLearnAllFileSessionDrivers` to `true`.
For a request with explicit canonical driver keys, the plan internally loads all drivers from the selected file sessions for location learning and filters the response back to the originally requested drivers.
The scope is not broadened when:
- the source selection is mixed or database-only;
- the option is disabled;
- the request uses only alternate card/source selectors and cannot be filtered safely by canonical driver key.
## Configuration
The defaults are under:
```yaml
eventhub:
tachograph-file-session:
processing:
ndi-long-minutes: 450
ndi-very-long-minutes: 1440
ndi-card-removal-percent: 80
ndi-visit-share-percent: 25
ndi-dbscan-eps-meters: 150
ndi-dbscan-min-points: 3
ndi-location-cache-ttl: 4h
ndi-location-cache-max-observations: 100000
ndi-location-cache-namespace: default
```
For tenantless uploaded sessions, configure a namespace that prevents unrelated operational contexts from sharing a corpus. Explicit tenant keys always create tenant-scoped corpora.
## Response extension
Each driver partition can now contain:
```text
ndiHomeClassification
```
It includes:
- all NDI classifications;
- company and driver home cluster IDs;
- cluster centroids and visit statistics;
- actual-driver versus other-driver cached observation counts;
- diagnostics and notes.
The field is omitted when the optional module was not executed, preserving the existing JSON shape for normal `driver-working-time-v1` calls.
## Current implementation boundary
This patch implements sections 1-4 of the document: NDI derivation/enrichment, location clustering, home-location determination, and HOME / NOT_HOME classification.
Section 5, border-crossing/country trip segmentation, is intentionally not included yet. It needs a separate country-resolution abstraction and a decision between local geographic data, PostGIS, or an external reverse-geocoding provider.

View File

@ -381,6 +381,15 @@ public class EventHubProperties {
private int restCandidateGeoLookaheadMinutes = 180; private int restCandidateGeoLookaheadMinutes = 180;
private int restCandidateGeoStationaryMaxMeters = 500; private int restCandidateGeoStationaryMaxMeters = 500;
private int restCandidateGeoMinorMovementMaxMeters = 2000; private int restCandidateGeoMinorMovementMaxMeters = 2000;
private int ndiLongMinutes = 450;
private int ndiVeryLongMinutes = 1440;
private double ndiCardRemovalPercent = 80.0d;
private double ndiVisitSharePercent = 25.0d;
private int ndiDbscanEpsMeters = 150;
private int ndiDbscanMinPoints = 3;
private Duration ndiLocationCacheTtl = Duration.ofHours(4);
private int ndiLocationCacheMaxObservations = 100000;
private String ndiLocationCacheNamespace = "default";
private int mergeGapSeconds = 0; private int mergeGapSeconds = 0;
private int gapDetectionToleranceSeconds = 0; private int gapDetectionToleranceSeconds = 0;
@ -461,6 +470,83 @@ public class EventHubProperties {
Math.max(this.restCandidateGeoStationaryMaxMeters, restCandidateGeoMinorMovementMaxMeters); Math.max(this.restCandidateGeoStationaryMaxMeters, restCandidateGeoMinorMovementMaxMeters);
} }
public int getNdiLongMinutes() {
return ndiLongMinutes;
}
public void setNdiLongMinutes(int ndiLongMinutes) {
this.ndiLongMinutes = Math.max(1, ndiLongMinutes);
this.ndiVeryLongMinutes = Math.max(this.ndiLongMinutes, this.ndiVeryLongMinutes);
}
public int getNdiVeryLongMinutes() {
return ndiVeryLongMinutes;
}
public void setNdiVeryLongMinutes(int ndiVeryLongMinutes) {
this.ndiVeryLongMinutes = Math.max(this.ndiLongMinutes, ndiVeryLongMinutes);
}
public double getNdiCardRemovalPercent() {
return ndiCardRemovalPercent;
}
public void setNdiCardRemovalPercent(double ndiCardRemovalPercent) {
this.ndiCardRemovalPercent = Math.max(0.0d, Math.min(100.0d, ndiCardRemovalPercent));
}
public double getNdiVisitSharePercent() {
return ndiVisitSharePercent;
}
public void setNdiVisitSharePercent(double ndiVisitSharePercent) {
this.ndiVisitSharePercent = Math.max(0.0d, Math.min(100.0d, ndiVisitSharePercent));
}
public int getNdiDbscanEpsMeters() {
return ndiDbscanEpsMeters;
}
public void setNdiDbscanEpsMeters(int ndiDbscanEpsMeters) {
this.ndiDbscanEpsMeters = Math.max(1, ndiDbscanEpsMeters);
}
public int getNdiDbscanMinPoints() {
return ndiDbscanMinPoints;
}
public void setNdiDbscanMinPoints(int ndiDbscanMinPoints) {
this.ndiDbscanMinPoints = Math.max(1, ndiDbscanMinPoints);
}
public Duration getNdiLocationCacheTtl() {
return ndiLocationCacheTtl;
}
public void setNdiLocationCacheTtl(Duration ndiLocationCacheTtl) {
if (ndiLocationCacheTtl != null && !ndiLocationCacheTtl.isNegative() && !ndiLocationCacheTtl.isZero()) {
this.ndiLocationCacheTtl = ndiLocationCacheTtl;
}
}
public int getNdiLocationCacheMaxObservations() {
return ndiLocationCacheMaxObservations;
}
public void setNdiLocationCacheMaxObservations(int ndiLocationCacheMaxObservations) {
this.ndiLocationCacheMaxObservations = Math.max(100, ndiLocationCacheMaxObservations);
}
public String getNdiLocationCacheNamespace() {
return ndiLocationCacheNamespace;
}
public void setNdiLocationCacheNamespace(String ndiLocationCacheNamespace) {
if (ndiLocationCacheNamespace != null && !ndiLocationCacheNamespace.isBlank()) {
this.ndiLocationCacheNamespace = ndiLocationCacheNamespace.trim();
}
}
public int getMergeGapSeconds() { public int getMergeGapSeconds() {
return mergeGapSeconds; return mergeGapSeconds;
} }

View File

@ -0,0 +1,16 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.model;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeRestCoverageInterval;
public record DriverNdiHomeClassification(
String intervalId,
DriverWorkingTimeRestCoverageInterval evidence,
Double latitude,
Double longitude,
String locationSource,
String clusterId,
boolean clusterNoise,
DriverNdiHomeStatus status,
DriverNdiHomeClassificationReason reason
) {
}

View File

@ -0,0 +1,13 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.model;
public enum DriverNdiHomeClassificationReason {
VEHICLE_CHANGED,
CARD_REMOVED_OVER_THRESHOLD,
REST_OVER_VERY_LONG_THRESHOLD,
NO_POSITION_LONG_REST,
NO_POSITION_SHORT_REST,
COMPANY_HOME_CLUSTER,
DRIVER_HOME_CLUSTER,
LONG_REST_OUTSIDE_HOME_CLUSTER,
SHORT_REST
}

View File

@ -0,0 +1,33 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.model;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
public record DriverNdiHomeClassificationResult(
String driverKey,
int nonDrivingIntervalCount,
int currentLongLocationObservationCount,
int cachedActualDriverObservationCount,
int cachedOtherDriverObservationCount,
int companyHomeClusterCount,
int driverHomeClusterCount,
List<DriverNdiHomeClassification> classifications,
List<DriverNdiLocationCluster> clusters,
Set<String> companyHomeClusterIds,
Set<String> driverHomeClusterIds,
List<String> notes
) {
public DriverNdiHomeClassificationResult {
classifications = classifications == null ? List.of() : List.copyOf(classifications);
clusters = clusters == null ? List.of() : List.copyOf(clusters);
companyHomeClusterIds = companyHomeClusterIds == null
? Set.of()
: Collections.unmodifiableSet(new LinkedHashSet<>(companyHomeClusterIds));
driverHomeClusterIds = driverHomeClusterIds == null
? Set.of()
: Collections.unmodifiableSet(new LinkedHashSet<>(driverHomeClusterIds));
notes = notes == null ? List.of() : List.copyOf(notes);
}
}

View File

@ -0,0 +1,26 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.model;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public record DriverNdiHomeClassificationScopeResult(
String corpusKey,
int currentObservationCount,
int cachedObservationCount,
int clusterCount,
Map<String, DriverNdiHomeClassificationResult> driverResults,
List<String> notes
) {
public DriverNdiHomeClassificationScopeResult {
driverResults = driverResults == null
? Map.of()
: Collections.unmodifiableMap(new LinkedHashMap<>(driverResults));
notes = notes == null ? List.of() : List.copyOf(notes);
}
public DriverNdiHomeClassificationResult resultForDriver(String driverKey) {
return driverResults.get(driverKey);
}
}

View File

@ -0,0 +1,6 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.model;
public enum DriverNdiHomeStatus {
HOME,
NOT_HOME
}

View File

@ -0,0 +1,16 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.model;
public record DriverNdiLocationCluster(
String clusterId,
double centroidLatitude,
double centroidLongitude,
int totalVisitCount,
int distinctDriverCount,
int actualDriverVisitCount,
int otherDriverVisitCount,
double companyVisitSharePercent,
double actualDriverVisitSharePercent,
boolean companyHome,
boolean actualDriverHome
) {
}

View File

@ -0,0 +1,18 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.model;
import java.time.Instant;
import java.util.List;
public record DriverNdiLocationCorpusSnapshot(
String corpusKey,
int observationCountBeforeMerge,
int addedObservationCount,
int observationCountAfterMerge,
List<DriverNdiLocationObservation> observations,
Instant updatedAt,
Instant expiresAt
) {
public DriverNdiLocationCorpusSnapshot {
observations = observations == null ? List.of() : List.copyOf(observations);
}
}

View File

@ -0,0 +1,67 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.model;
import java.time.OffsetDateTime;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.UUID;
public record DriverNdiLocationObservation(
String observationId,
String tenantKey,
List<UUID> sourceSessionIds,
UUID compositeSessionId,
String driverKey,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
double latitude,
double longitude,
String locationSource,
String geoEventId,
String geoEventDomain,
Long geoDistanceSeconds,
String previousDrivingSourceIntervalId,
String nextDrivingSourceIntervalId,
String previousRegistrationKey,
String nextRegistrationKey,
String previousVehicleKey,
String nextVehicleKey
) {
public DriverNdiLocationObservation {
sourceSessionIds = sourceSessionIds == null ? List.of() : List.copyOf(sourceSessionIds);
}
public DriverNdiLocationObservation mergeProvenance(DriverNdiLocationObservation other) {
if (other == null || !observationId.equals(other.observationId())) {
return this;
}
LinkedHashSet<UUID> mergedSessionIds = new LinkedHashSet<>(sourceSessionIds);
mergedSessionIds.addAll(other.sourceSessionIds());
return new DriverNdiLocationObservation(
observationId,
other.tenantKey() != null ? other.tenantKey() : tenantKey,
List.copyOf(mergedSessionIds),
other.compositeSessionId() != null ? other.compositeSessionId() : compositeSessionId,
other.driverKey() != null ? other.driverKey() : driverKey,
other.startedAt() != null ? other.startedAt() : startedAt,
other.endedAt() != null ? other.endedAt() : endedAt,
other.durationSeconds(),
other.latitude(),
other.longitude(),
other.locationSource() != null ? other.locationSource() : locationSource,
other.geoEventId() != null ? other.geoEventId() : geoEventId,
other.geoEventDomain() != null ? other.geoEventDomain() : geoEventDomain,
other.geoDistanceSeconds() != null ? other.geoDistanceSeconds() : geoDistanceSeconds,
other.previousDrivingSourceIntervalId() != null
? other.previousDrivingSourceIntervalId()
: previousDrivingSourceIntervalId,
other.nextDrivingSourceIntervalId() != null
? other.nextDrivingSourceIntervalId()
: nextDrivingSourceIntervalId,
other.previousRegistrationKey() != null ? other.previousRegistrationKey() : previousRegistrationKey,
other.nextRegistrationKey() != null ? other.nextRegistrationKey() : nextRegistrationKey,
other.previousVehicleKey() != null ? other.previousVehicleKey() : previousVehicleKey,
other.nextVehicleKey() != null ? other.nextVehicleKey() : nextVehicleKey
);
}
}

View File

@ -0,0 +1,203 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.service;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationObservation;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.stereotype.Service;
@Service
public class DriverNdiDbscanClusterer {
public static final String NOISE_CLUSTER_ID = "NOISE";
private static final int UNVISITED = Integer.MIN_VALUE;
private static final int NOISE = -1;
private static final double EARTH_RADIUS_METERS = 6_371_008.8d;
public ClusterResult cluster(
List<DriverNdiLocationObservation> observations,
double epsilonMeters,
int minimumPoints
) {
List<DriverNdiLocationObservation> points = observations == null ? List.of() : List.copyOf(observations);
double effectiveEpsilonMeters = Double.isFinite(epsilonMeters)
? Math.max(0.0d, epsilonMeters)
: 0.0d;
int effectiveMinimumPoints = Math.max(1, minimumPoints);
if (points.isEmpty()) {
return new ClusterResult(Map.of(), Map.of());
}
int[] labels = new int[points.size()];
java.util.Arrays.fill(labels, UNVISITED);
int clusterNumber = 0;
for (int pointIndex = 0; pointIndex < points.size(); pointIndex++) {
if (labels[pointIndex] != UNVISITED) {
continue;
}
List<Integer> neighbours = neighbours(points, pointIndex, effectiveEpsilonMeters);
if (neighbours.size() < effectiveMinimumPoints) {
labels[pointIndex] = NOISE;
continue;
}
expandCluster(points, labels, pointIndex, neighbours, clusterNumber, effectiveEpsilonMeters, effectiveMinimumPoints);
clusterNumber++;
}
Map<Integer, List<DriverNdiLocationObservation>> numericClusters = new HashMap<>();
for (int index = 0; index < points.size(); index++) {
if (labels[index] >= 0) {
numericClusters.computeIfAbsent(labels[index], ignored -> new ArrayList<>()).add(points.get(index));
}
}
List<RawCluster> sortedClusters = numericClusters.entrySet().stream()
.map(entry -> RawCluster.of(entry.getKey(), entry.getValue()))
.sorted(Comparator
.comparingDouble(RawCluster::centroidLatitude)
.thenComparingDouble(RawCluster::centroidLongitude)
.thenComparingInt(RawCluster::numericId))
.toList();
Map<Integer, String> clusterIdByNumericId = new HashMap<>();
LinkedHashMap<String, ClusterMembers> clusters = new LinkedHashMap<>();
for (int index = 0; index < sortedClusters.size(); index++) {
RawCluster raw = sortedClusters.get(index);
String clusterId = "CLUSTER-" + String.format("%03d", index + 1);
clusterIdByNumericId.put(raw.numericId(), clusterId);
clusters.put(clusterId, new ClusterMembers(
clusterId,
raw.centroidLatitude(),
raw.centroidLongitude(),
raw.members()
));
}
LinkedHashMap<String, String> assignmentByObservationId = new LinkedHashMap<>();
for (int index = 0; index < points.size(); index++) {
String clusterId = labels[index] == NOISE
? NOISE_CLUSTER_ID
: clusterIdByNumericId.get(labels[index]);
assignmentByObservationId.put(points.get(index).observationId(), clusterId);
}
return new ClusterResult(Map.copyOf(assignmentByObservationId), Map.copyOf(clusters));
}
private void expandCluster(
List<DriverNdiLocationObservation> points,
int[] labels,
int seedIndex,
List<Integer> seedNeighbours,
int clusterNumber,
double epsilonMeters,
int minimumPoints
) {
labels[seedIndex] = clusterNumber;
Deque<Integer> queue = new ArrayDeque<>();
Set<Integer> queued = new LinkedHashSet<>();
for (Integer neighbour : seedNeighbours) {
if (neighbour != seedIndex && queued.add(neighbour)) {
queue.addLast(neighbour);
}
}
while (!queue.isEmpty()) {
int candidate = queue.removeFirst();
if (labels[candidate] == NOISE) {
labels[candidate] = clusterNumber;
}
if (labels[candidate] != UNVISITED) {
continue;
}
labels[candidate] = clusterNumber;
List<Integer> candidateNeighbours = neighbours(points, candidate, epsilonMeters);
if (candidateNeighbours.size() >= minimumPoints) {
for (Integer neighbour : candidateNeighbours) {
if (queued.add(neighbour)) {
queue.addLast(neighbour);
}
}
}
}
}
private List<Integer> neighbours(
List<DriverNdiLocationObservation> points,
int pointIndex,
double epsilonMeters
) {
DriverNdiLocationObservation source = points.get(pointIndex);
List<Integer> neighbours = new ArrayList<>();
for (int candidateIndex = 0; candidateIndex < points.size(); candidateIndex++) {
DriverNdiLocationObservation candidate = points.get(candidateIndex);
if (haversineMeters(
source.latitude(),
source.longitude(),
candidate.latitude(),
candidate.longitude()
) <= epsilonMeters) {
neighbours.add(candidateIndex);
}
}
return neighbours;
}
static double haversineMeters(
double latitudeA,
double longitudeA,
double latitudeB,
double longitudeB
) {
double latitudeDelta = Math.toRadians(latitudeB - latitudeA);
double longitudeDelta = Math.toRadians(longitudeB - longitudeA);
double latitudeARadians = Math.toRadians(latitudeA);
double latitudeBRadians = Math.toRadians(latitudeB);
double haversine = Math.sin(latitudeDelta / 2.0d) * Math.sin(latitudeDelta / 2.0d)
+ Math.cos(latitudeARadians) * Math.cos(latitudeBRadians)
* Math.sin(longitudeDelta / 2.0d) * Math.sin(longitudeDelta / 2.0d);
double normalizedHaversine = Math.max(0.0d, Math.min(1.0d, haversine));
double angularDistance = 2.0d * Math.atan2(
Math.sqrt(normalizedHaversine),
Math.sqrt(1.0d - normalizedHaversine)
);
return EARTH_RADIUS_METERS * angularDistance;
}
public record ClusterResult(
Map<String, String> assignmentByObservationId,
Map<String, ClusterMembers> clusters
) {
}
public record ClusterMembers(
String clusterId,
double centroidLatitude,
double centroidLongitude,
List<DriverNdiLocationObservation> members
) {
public ClusterMembers {
members = members == null ? List.of() : List.copyOf(members);
}
}
private record RawCluster(
int numericId,
double centroidLatitude,
double centroidLongitude,
List<DriverNdiLocationObservation> members
) {
private static RawCluster of(int numericId, List<DriverNdiLocationObservation> members) {
double latitude = members.stream().mapToDouble(DriverNdiLocationObservation::latitude).average().orElse(0.0d);
double longitude = members.stream().mapToDouble(DriverNdiLocationObservation::longitude).average().orElse(0.0d);
return new RawCluster(numericId, latitude, longitude, List.copyOf(members));
}
}
}

View File

@ -0,0 +1,572 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassification;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationReason;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationResult;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeStatus;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationCluster;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationCorpusSnapshot;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationObservation;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeRestCoverageInterval;
import at.procon.eventhub.processing.driverworkingtime.service.DriverWorkingTimeReusableProjectionBuilder;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.springframework.stereotype.Service;
@Service
public class DriverNdiHomeClassificationService {
private static final String ALGORITHM_VERSION = "NDI_HOME_V1";
private final DriverWorkingTimeReusableProjectionBuilder projectionBuilder;
private final DriverNdiLocationCorpusCache locationCorpusCache;
private final DriverNdiDbscanClusterer clusterer;
private final EventHubProperties properties;
public DriverNdiHomeClassificationService(
DriverWorkingTimeReusableProjectionBuilder projectionBuilder,
DriverNdiLocationCorpusCache locationCorpusCache,
DriverNdiDbscanClusterer clusterer,
EventHubProperties properties
) {
this.projectionBuilder = projectionBuilder;
this.locationCorpusCache = locationCorpusCache;
this.clusterer = clusterer;
this.properties = properties;
}
public DriverNdiHomeClassificationScopeResult classifyPreparedInputs(
UnifiedRuntimeProcessingApiRequest request,
Map<String, DriverWorkingTimePreparedInput> preparedInputs
) {
LinkedHashMap<String, List<DriverWorkingTimeRestCoverageInterval>> evidenceByDriver = new LinkedHashMap<>();
if (preparedInputs != null) {
preparedInputs.entrySet().stream()
.filter(entry -> entry.getKey() != null
&& entry.getValue() != null
&& entry.getValue().processingInput() != null)
.sorted(Map.Entry.comparingByKey())
.forEach(entry -> evidenceByDriver.put(
entry.getKey(),
projectionBuilder.buildAllNonDrivingIntervalCoverage(entry.getValue().processingInput())
));
}
return classifyEvidence(request, evidenceByDriver);
}
public DriverNdiHomeClassificationScopeResult classifyEvidence(
UnifiedRuntimeProcessingApiRequest request,
Map<String, List<DriverWorkingTimeRestCoverageInterval>> evidenceByDriver
) {
EventHubProperties.Processing settings = properties.getRuntimeProcessing();
long longThresholdSeconds = settings.getNdiLongMinutes() * 60L;
long veryLongThresholdSeconds = settings.getNdiVeryLongMinutes() * 60L;
double cardRemovalThresholdPercent = settings.getNdiCardRemovalPercent();
double visitShareThresholdPercent = settings.getNdiVisitSharePercent();
List<UUID> sourceSessionIds = sourceSessionIds(request);
UUID compositeSessionId = request == null ? null : request.compositeSessionId();
String tenantKey = normalizedTenantKey(request == null ? null : request.tenantKey());
String corpusKey = corpusKey(
request,
tenantKey,
settings.getNdiLocationCacheNamespace(),
settings.getNdiLongMinutes()
);
LinkedHashMap<String, List<DriverWorkingTimeRestCoverageInterval>> safeEvidenceByDriver = new LinkedHashMap<>();
if (evidenceByDriver != null) {
evidenceByDriver.entrySet().stream()
.filter(entry -> entry.getKey() != null && !entry.getKey().isBlank())
.sorted(Map.Entry.comparingByKey())
.forEach(entry -> safeEvidenceByDriver.put(
entry.getKey(),
safeList(entry.getValue()).stream()
.filter(Objects::nonNull)
.sorted(Comparator
.comparing(DriverWorkingTimeRestCoverageInterval::startedAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(DriverWorkingTimeRestCoverageInterval::endedAt, Comparator.nullsLast(Comparator.naturalOrder())))
.toList()
));
}
List<DriverNdiLocationObservation> currentObservations = new ArrayList<>();
Map<String, DriverNdiLocationObservation> currentObservationByIntervalId = new HashMap<>();
for (Map.Entry<String, List<DriverWorkingTimeRestCoverageInterval>> entry : safeEvidenceByDriver.entrySet()) {
for (DriverWorkingTimeRestCoverageInterval evidence : entry.getValue()) {
if (evidence.durationSeconds() <= longThresholdSeconds) {
continue;
}
ResolvedPosition position = resolvePosition(evidence);
if (position == null) {
continue;
}
String intervalId = intervalId(evidence);
DriverNdiLocationObservation observation = new DriverNdiLocationObservation(
observationId(tenantKey, intervalId),
tenantKey,
observationSourceSessionIds(evidence.sessionId(), sourceSessionIds),
compositeSessionId,
entry.getKey(),
evidence.startedAt(),
evidence.endedAt(),
evidence.durationSeconds(),
position.latitude(),
position.longitude(),
position.source(),
position.geoEventId(),
position.geoEventDomain(),
position.geoDistanceSeconds(),
evidence.previousDrivingSourceIntervalId(),
evidence.nextDrivingSourceIntervalId(),
evidence.previousRegistrationKey(),
evidence.nextRegistrationKey(),
evidence.previousVehicleKey(),
evidence.nextVehicleKey()
);
currentObservations.add(observation);
currentObservationByIntervalId.put(intervalId, observation);
}
}
DriverNdiLocationCorpusSnapshot corpus = locationCorpusCache.merge(corpusKey, currentObservations);
DriverNdiDbscanClusterer.ClusterResult clusterResult = clusterer.cluster(
corpus.observations(),
settings.getNdiDbscanEpsMeters(),
settings.getNdiDbscanMinPoints()
);
Set<String> companyHomeClusterIds = determineCompanyHomeClusters(
clusterResult,
corpus.observations().size(),
visitShareThresholdPercent
);
LinkedHashMap<String, DriverNdiHomeClassificationResult> driverResults = new LinkedHashMap<>();
for (Map.Entry<String, List<DriverWorkingTimeRestCoverageInterval>> entry : safeEvidenceByDriver.entrySet()) {
String driverKey = entry.getKey();
List<DriverNdiLocationObservation> cachedDriverObservations = corpus.observations().stream()
.filter(observation -> Objects.equals(driverKey, observation.driverKey()))
.toList();
Set<String> driverHomeClusterIds = determineDriverHomeClusters(
driverKey,
clusterResult,
cachedDriverObservations.size(),
companyHomeClusterIds,
visitShareThresholdPercent
);
List<DriverNdiLocationCluster> driverClusters = buildDriverClusterView(
driverKey,
clusterResult,
corpus.observations().size(),
cachedDriverObservations.size(),
companyHomeClusterIds,
driverHomeClusterIds
);
List<DriverNdiHomeClassification> classifications = entry.getValue().stream()
.map(evidence -> classifyInterval(
evidence,
currentObservationByIntervalId.get(intervalId(evidence)),
clusterResult.assignmentByObservationId(),
companyHomeClusterIds,
driverHomeClusterIds,
longThresholdSeconds,
veryLongThresholdSeconds,
cardRemovalThresholdPercent
))
.toList();
int currentDriverObservationCount = (int) currentObservations.stream()
.filter(observation -> Objects.equals(driverKey, observation.driverKey()))
.count();
int cachedOtherDriverObservationCount = corpus.observations().size() - cachedDriverObservations.size();
List<String> notes = List.of(
"NDI classification used ordered rules from ndi_home_classification_en.md.",
"Location learning used " + cachedDriverObservations.size() + " cached observation(s) for the actual driver and "
+ cachedOtherDriverObservationCount + " observation(s) from other drivers.",
"DBSCAN parameters: eps=" + settings.getNdiDbscanEpsMeters() + "m, minPoints="
+ settings.getNdiDbscanMinPoints() + "."
);
driverResults.put(driverKey, new DriverNdiHomeClassificationResult(
driverKey,
classifications.size(),
currentDriverObservationCount,
cachedDriverObservations.size(),
cachedOtherDriverObservationCount,
companyHomeClusterIds.size(),
driverHomeClusterIds.size(),
classifications,
driverClusters,
companyHomeClusterIds,
driverHomeClusterIds,
notes
));
}
List<String> notes = new ArrayList<>();
notes.add("NDI location corpus " + corpus.corpusKey() + " contains " + corpus.observationCountAfterMerge()
+ " observation(s); " + corpus.addedObservationCount() + " were added by this execution.");
notes.add("Current execution contributed " + currentObservations.size() + " long NDI location observation(s) from "
+ safeEvidenceByDriver.size() + " driver(s).");
if (safeEvidenceByDriver.size() <= 1) {
notes.add("Only one driver was present in the current execution; company-home detection may still use cached observations from other drivers.");
}
return new DriverNdiHomeClassificationScopeResult(
corpus.corpusKey(),
currentObservations.size(),
corpus.observationCountAfterMerge(),
clusterResult.clusters().size(),
driverResults,
notes
);
}
private DriverNdiHomeClassification classifyInterval(
DriverWorkingTimeRestCoverageInterval evidence,
DriverNdiLocationObservation observation,
Map<String, String> assignmentByObservationId,
Set<String> companyHomeClusterIds,
Set<String> driverHomeClusterIds,
long longThresholdSeconds,
long veryLongThresholdSeconds,
double cardRemovalThresholdPercent
) {
ResolvedPosition position = resolvePosition(evidence);
String clusterId = observation == null ? null : assignmentByObservationId.get(observation.observationId());
boolean clusterNoise = DriverNdiDbscanClusterer.NOISE_CLUSTER_ID.equals(clusterId);
DriverNdiHomeStatus status;
DriverNdiHomeClassificationReason reason;
if (vehicleChanged(evidence)) {
status = DriverNdiHomeStatus.HOME;
reason = DriverNdiHomeClassificationReason.VEHICLE_CHANGED;
} else if (evidence.cardAbsentCoveragePercent() > cardRemovalThresholdPercent) {
status = DriverNdiHomeStatus.HOME;
reason = DriverNdiHomeClassificationReason.CARD_REMOVED_OVER_THRESHOLD;
} else if (evidence.durationSeconds() > veryLongThresholdSeconds) {
status = DriverNdiHomeStatus.HOME;
reason = DriverNdiHomeClassificationReason.REST_OVER_VERY_LONG_THRESHOLD;
} else if (position == null) {
if (evidence.durationSeconds() > longThresholdSeconds) {
status = DriverNdiHomeStatus.HOME;
reason = DriverNdiHomeClassificationReason.NO_POSITION_LONG_REST;
} else {
status = DriverNdiHomeStatus.NOT_HOME;
reason = DriverNdiHomeClassificationReason.NO_POSITION_SHORT_REST;
}
} else if (evidence.durationSeconds() > longThresholdSeconds) {
if (clusterId != null && companyHomeClusterIds.contains(clusterId)) {
status = DriverNdiHomeStatus.HOME;
reason = DriverNdiHomeClassificationReason.COMPANY_HOME_CLUSTER;
} else if (clusterId != null && driverHomeClusterIds.contains(clusterId)) {
status = DriverNdiHomeStatus.HOME;
reason = DriverNdiHomeClassificationReason.DRIVER_HOME_CLUSTER;
} else {
status = DriverNdiHomeStatus.NOT_HOME;
reason = DriverNdiHomeClassificationReason.LONG_REST_OUTSIDE_HOME_CLUSTER;
}
} else {
status = DriverNdiHomeStatus.NOT_HOME;
reason = DriverNdiHomeClassificationReason.SHORT_REST;
}
return new DriverNdiHomeClassification(
intervalId(evidence),
evidence,
position == null ? null : position.latitude(),
position == null ? null : position.longitude(),
position == null ? null : position.source(),
clusterId,
clusterNoise,
status,
reason
);
}
private Set<String> determineCompanyHomeClusters(
DriverNdiDbscanClusterer.ClusterResult clusterResult,
int totalObservationCount,
double visitShareThresholdPercent
) {
if (totalObservationCount <= 0) {
return Set.of();
}
LinkedHashSet<String> result = new LinkedHashSet<>();
clusterResult.clusters().values().stream()
.sorted(Comparator.comparing(DriverNdiDbscanClusterer.ClusterMembers::clusterId))
.forEach(cluster -> {
double share = cluster.members().size() * 100.0d / totalObservationCount;
if (share > visitShareThresholdPercent) {
result.add(cluster.clusterId());
}
});
return Collections.unmodifiableSet(result);
}
private Set<String> determineDriverHomeClusters(
String driverKey,
DriverNdiDbscanClusterer.ClusterResult clusterResult,
int driverObservationCount,
Set<String> companyHomeClusterIds,
double visitShareThresholdPercent
) {
if (driverObservationCount <= 0) {
return Set.of();
}
LinkedHashSet<String> result = new LinkedHashSet<>();
clusterResult.clusters().values().stream()
.sorted(Comparator.comparing(DriverNdiDbscanClusterer.ClusterMembers::clusterId))
.forEach(cluster -> {
long driverVisits = cluster.members().stream()
.filter(observation -> Objects.equals(driverKey, observation.driverKey()))
.count();
double share = driverVisits * 100.0d / driverObservationCount;
if (share > visitShareThresholdPercent && !companyHomeClusterIds.contains(cluster.clusterId())) {
result.add(cluster.clusterId());
}
});
return Collections.unmodifiableSet(result);
}
private List<DriverNdiLocationCluster> buildDriverClusterView(
String driverKey,
DriverNdiDbscanClusterer.ClusterResult clusterResult,
int totalObservationCount,
int driverObservationCount,
Set<String> companyHomeClusterIds,
Set<String> driverHomeClusterIds
) {
return clusterResult.clusters().values().stream()
.sorted(Comparator.comparing(DriverNdiDbscanClusterer.ClusterMembers::clusterId))
.map(cluster -> {
int actualDriverVisits = (int) cluster.members().stream()
.filter(observation -> Objects.equals(driverKey, observation.driverKey()))
.count();
int totalVisits = cluster.members().size();
int distinctDriverCount = (int) cluster.members().stream()
.map(DriverNdiLocationObservation::driverKey)
.filter(Objects::nonNull)
.distinct()
.count();
return new DriverNdiLocationCluster(
cluster.clusterId(),
cluster.centroidLatitude(),
cluster.centroidLongitude(),
totalVisits,
distinctDriverCount,
actualDriverVisits,
totalVisits - actualDriverVisits,
totalObservationCount == 0 ? 0.0d : totalVisits * 100.0d / totalObservationCount,
driverObservationCount == 0 ? 0.0d : actualDriverVisits * 100.0d / driverObservationCount,
companyHomeClusterIds.contains(cluster.clusterId()),
driverHomeClusterIds.contains(cluster.clusterId())
);
})
.toList();
}
private boolean vehicleChanged(DriverWorkingTimeRestCoverageInterval evidence) {
if (known(evidence.previousVehicleKey()) && known(evidence.nextVehicleKey())) {
return knownAndDifferent(evidence.previousVehicleKey(), evidence.nextVehicleKey());
}
return knownAndDifferent(evidence.previousRegistrationKey(), evidence.nextRegistrationKey());
}
private boolean known(String value) {
return value != null && !value.isBlank();
}
private boolean knownAndDifferent(String left, String right) {
return left != null && !left.isBlank()
&& right != null && !right.isBlank()
&& !left.trim().equalsIgnoreCase(right.trim());
}
private ResolvedPosition resolvePosition(DriverWorkingTimeRestCoverageInterval evidence) {
if (validCoordinates(evidence.beginLatitude(), evidence.beginLongitude())) {
return new ResolvedPosition(
evidence.beginLatitude(),
evidence.beginLongitude(),
"PREVIOUS_DRIVE_END",
evidence.beginGeoEventId(),
evidence.beginGeoEventDomain(),
evidence.beginGeoDistanceSeconds()
);
}
if (validCoordinates(evidence.endLatitude(), evidence.endLongitude())) {
return new ResolvedPosition(
evidence.endLatitude(),
evidence.endLongitude(),
"NEXT_DRIVE_START",
evidence.endGeoEventId(),
evidence.endGeoEventDomain(),
evidence.endGeoDistanceSeconds()
);
}
return null;
}
private boolean validCoordinates(Double latitude, Double longitude) {
return latitude != null
&& longitude != null
&& Double.isFinite(latitude)
&& Double.isFinite(longitude)
&& latitude >= -90.0d
&& latitude <= 90.0d
&& longitude >= -180.0d
&& longitude <= 180.0d;
}
private List<UUID> observationSourceSessionIds(
UUID evidenceSessionId,
List<UUID> requestSessionIds
) {
if (evidenceSessionId != null) {
return List.of(evidenceSessionId);
}
return requestSessionIds == null ? List.of() : requestSessionIds;
}
private List<UUID> sourceSessionIds(UnifiedRuntimeProcessingApiRequest request) {
if (request == null) {
return List.of();
}
LinkedHashSet<UUID> values = new LinkedHashSet<>();
if (request.sessionId() != null) {
values.add(request.sessionId());
}
if (request.sessionIds() != null) {
request.sessionIds().stream().filter(Objects::nonNull).sorted().forEach(values::add);
}
if (request.sourceInputs() != null) {
request.sourceInputs().stream().filter(Objects::nonNull).forEach(sourceInput -> {
if (sourceInput.sessionId() != null) {
values.add(sourceInput.sessionId());
}
if (sourceInput.sessionIds() != null) {
sourceInput.sessionIds().stream().filter(Objects::nonNull).sorted().forEach(values::add);
}
});
}
return List.copyOf(values);
}
private String corpusKey(
UnifiedRuntimeProcessingApiRequest request,
String tenantKey,
String cacheNamespace,
int longThresholdMinutes
) {
boolean fileSessionCorpus = isFileSessionOnly(request);
if (!fileSessionCorpus) {
return tenantKey + "|RUNTIME|" + ALGORITHM_VERSION + "|LONG=" + longThresholdMinutes;
}
String scope;
if (request != null && request.tenantKey() != null && !request.tenantKey().isBlank()) {
scope = "TENANT=" + tenantKey;
} else {
String normalizedNamespace = cacheNamespace == null || cacheNamespace.isBlank()
? "default"
: cacheNamespace.trim();
scope = "NAMESPACE=" + normalizedNamespace;
}
return scope + "|FILE_SESSION|" + ALGORITHM_VERSION + "|LONG=" + longThresholdMinutes;
}
private boolean isFileSessionOnly(UnifiedRuntimeProcessingApiRequest request) {
if (request == null) {
return false;
}
if (request.sourceInputs() != null && !request.sourceInputs().isEmpty()) {
List<at.procon.eventhub.processing.dto.UnifiedRuntimeSourceInputApiRequest> sourceInputs =
request.sourceInputs().stream().filter(Objects::nonNull).toList();
return !sourceInputs.isEmpty() && sourceInputs.stream().allMatch(sourceInput ->
sourceInput.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION);
}
if (request.sourceFamilies() != null && !request.sourceFamilies().isEmpty()) {
return request.sourceFamilies().stream().allMatch(sourceFamily ->
sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION);
}
return request.sessionId() != null
|| (request.sessionIds() != null && !request.sessionIds().isEmpty())
|| request.compositeSessionId() != null;
}
private String normalizedTenantKey(String tenantKey) {
return tenantKey == null || tenantKey.isBlank() ? "DEFAULT" : tenantKey.trim();
}
private String intervalId(DriverWorkingTimeRestCoverageInterval evidence) {
String raw = String.join("|",
nullSafe(evidence.driverKey()),
nullSafe(evidence.startedAt()),
nullSafe(evidence.endedAt()),
nullSafe(evidence.previousDrivingSourceIntervalId()),
nullSafe(evidence.nextDrivingSourceIntervalId()),
nullSafe(evidence.previousVehicleKey()),
nullSafe(evidence.nextVehicleKey()),
nullSafe(evidence.previousRegistrationKey()),
nullSafe(evidence.nextRegistrationKey())
);
return "NDI-" + shortHash(raw);
}
private String observationId(String tenantKey, String intervalId) {
return "NDI-LOC-" + shortHash(tenantKey + "|" + intervalId);
}
private String shortHash(String value) {
try {
byte[] digest = MessageDigest.getInstance("SHA-256")
.digest(value.getBytes(StandardCharsets.UTF_8));
StringBuilder result = new StringBuilder(24);
for (int index = 0; index < 12; index++) {
result.append(String.format(Locale.ROOT, "%02x", digest[index]));
}
return result.toString();
} catch (NoSuchAlgorithmException ex) {
throw new IllegalStateException("SHA-256 is not available", ex);
}
}
private String nullSafe(Object value) {
return value == null ? "" : value.toString();
}
private <T> List<T> safeList(List<T> values) {
return values == null ? List.of() : values;
}
private record ResolvedPosition(
double latitude,
double longitude,
String source,
String geoEventId,
String geoEventDomain,
Long geoDistanceSeconds
) {
}
}

View File

@ -0,0 +1,119 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationCorpusSnapshot;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationObservation;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.springframework.stereotype.Service;
@Service
public class DriverNdiLocationCorpusCache {
private final EventHubProperties properties;
private final ConcurrentMap<String, CachedCorpus> corpora = new ConcurrentHashMap<>();
public DriverNdiLocationCorpusCache(EventHubProperties properties) {
this.properties = properties;
}
public DriverNdiLocationCorpusSnapshot merge(
String corpusKey,
List<DriverNdiLocationObservation> newObservations
) {
String normalizedKey = normalizeCorpusKey(corpusKey);
Instant now = Instant.now();
Duration ttl = properties.getRuntimeProcessing().getNdiLocationCacheTtl();
int maxObservations = properties.getRuntimeProcessing().getNdiLocationCacheMaxObservations();
MergeMetrics metrics = new MergeMetrics();
CachedCorpus updated = corpora.compute(normalizedKey, (key, existing) -> {
LinkedHashMap<String, DriverNdiLocationObservation> observations = new LinkedHashMap<>();
if (existing != null && existing.expiresAt().isAfter(now)) {
observations.putAll(existing.observationsById());
}
metrics.before = observations.size();
for (DriverNdiLocationObservation observation : safeList(newObservations)) {
if (observation == null || observation.observationId() == null || observation.observationId().isBlank()) {
continue;
}
DriverNdiLocationObservation previous = observations.get(observation.observationId());
if (previous == null) {
observations.put(observation.observationId(), observation);
metrics.added++;
} else {
observations.put(observation.observationId(), previous.mergeProvenance(observation));
}
}
if (observations.size() > maxObservations) {
List<DriverNdiLocationObservation> retained = observations.values().stream()
.sorted(Comparator
.comparing(DriverNdiLocationObservation::endedAt, Comparator.nullsFirst(Comparator.naturalOrder()))
.reversed()
.thenComparing(DriverNdiLocationObservation::observationId))
.limit(maxObservations)
.toList();
observations.clear();
retained.stream()
.sorted(Comparator
.comparing(DriverNdiLocationObservation::startedAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(DriverNdiLocationObservation::observationId))
.forEach(value -> observations.put(value.observationId(), value));
}
return new CachedCorpus(
Map.copyOf(observations),
now,
now.plus(ttl)
);
});
List<DriverNdiLocationObservation> sorted = updated.observationsById().values().stream()
.sorted(Comparator
.comparing(DriverNdiLocationObservation::startedAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(DriverNdiLocationObservation::driverKey, Comparator.nullsLast(String::compareTo))
.thenComparing(DriverNdiLocationObservation::observationId))
.toList();
return new DriverNdiLocationCorpusSnapshot(
normalizedKey,
metrics.before,
metrics.added,
sorted.size(),
sorted,
updated.updatedAt(),
updated.expiresAt()
);
}
public void clear() {
corpora.clear();
}
private String normalizeCorpusKey(String corpusKey) {
return corpusKey == null || corpusKey.isBlank() ? "DEFAULT|NDI_HOME_V1" : corpusKey.trim();
}
private <T> List<T> safeList(List<T> values) {
return values == null ? List.of() : values;
}
private record CachedCorpus(
Map<String, DriverNdiLocationObservation> observationsById,
Instant updatedAt,
Instant expiresAt
) {
}
private static final class MergeMetrics {
private int before;
private int added;
}
}

View File

@ -111,6 +111,27 @@ public class DriverWorkingTimeReusableProjectionBuilder {
); );
} }
/**
* Reuses the complete card-absence and boundary-GNSS enrichment pipeline for every
* positive non-driving interval. Legacy daily/weekly-rest projections keep their
* configured minimum-rest threshold and are not changed by this method.
*/
public List<DriverWorkingTimeRestCoverageInterval> buildAllNonDrivingIntervalCoverage(
DriverWorkingTimeProcessingInput input
) {
if (input == null) {
return List.of();
}
DriverWorkingTimeDerivedProjectionBundle allNonDrivingProjection = buildDerivedProjectionBundle(
buildActivityIntervalInputEvents(input.activityIntervals()),
buildVehicleUsageIntervalInputEventsCommon(input.vehicleUsageIntervals()),
buildSupportGeoInputEventsCommon(input.sessionId(), input.supportEvidenceEvents()),
input.significantDrivingMinutes(),
0
);
return allNonDrivingProjection.dailyWeeklyRestCandidateCoverageIntervals();
}
private DriverWorkingTimeDerivedProjectionBundle buildDerivedProjectionBundle( private DriverWorkingTimeDerivedProjectionBundle buildDerivedProjectionBundle(
List<Map<String, Object>> activityInputEvents, List<Map<String, Object>> activityInputEvents,
List<Map<String, Object>> vehicleUsageInputEvents, List<Map<String, Object>> vehicleUsageInputEvents,
@ -910,7 +931,7 @@ public class DriverWorkingTimeReusableProjectionBuilder {
) )
.replace( .replace(
"${MINIMUM_REST_PERIOD_THRESHOLD_SECONDS}", "${MINIMUM_REST_PERIOD_THRESHOLD_SECONDS}",
Long.toString(Math.max(1, minimumRestPeriodMinutes) * 60L) Long.toString(Math.max(0, minimumRestPeriodMinutes) * 60L)
) )
.replace( .replace(
"${REST_GEO_LOOKBACK_SECONDS}", "${REST_GEO_LOOKBACK_SECONDS}",

View File

@ -1,8 +1,10 @@
package at.procon.eventhub.processing.dto; package at.procon.eventhub.processing.dto;
import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationResult;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto; import com.fasterxml.jackson.annotation.JsonInclude;
import java.util.List; import java.util.List;
public record UnifiedRuntimeDerivedProjectionResultDto( public record UnifiedRuntimeDerivedProjectionResultDto(
@ -15,7 +17,9 @@ public record UnifiedRuntimeDerivedProjectionResultDto(
DriverWorkingTimeProcessingResultDto projection, DriverWorkingTimeProcessingResultDto projection,
List<String> notes, List<String> notes,
RuntimeSupportEvidenceNormalizationDebugDto supportEvidenceNormalization, RuntimeSupportEvidenceNormalizationDebugDto supportEvidenceNormalization,
RuntimeDriverPartitionDebugDto partitionDebug RuntimeDriverPartitionDebugDto partitionDebug,
@JsonInclude(JsonInclude.Include.NON_NULL)
DriverNdiHomeClassificationResult ndiHomeClassification
) { ) {
public UnifiedRuntimeDerivedProjectionResultDto { public UnifiedRuntimeDerivedProjectionResultDto {
discoveredVehicles = discoveredVehicles == null ? List.of() : List.copyOf(discoveredVehicles); discoveredVehicles = discoveredVehicles == null ? List.of() : List.copyOf(discoveredVehicles);
@ -42,6 +46,7 @@ public record UnifiedRuntimeDerivedProjectionResultDto(
projection, projection,
notes, notes,
null, null,
null,
null null
); );
} }
@ -67,11 +72,37 @@ public record UnifiedRuntimeDerivedProjectionResultDto(
projection, projection,
notes, notes,
supportEvidenceNormalization, supportEvidenceNormalization,
null,
null null
); );
} }
public UnifiedRuntimeDerivedProjectionResultDto(
UnifiedRuntimeProcessingRequest request,
int driverSeedEventCount,
int discoveredVehicleCount,
int expandedVehicleEventCount,
int mergedEventCount,
List<UnifiedDiscoveredVehicleRef> discoveredVehicles,
DriverWorkingTimeProcessingResultDto projection,
List<String> notes,
RuntimeSupportEvidenceNormalizationDebugDto supportEvidenceNormalization,
RuntimeDriverPartitionDebugDto partitionDebug
) {
this(
request,
driverSeedEventCount,
discoveredVehicleCount,
expandedVehicleEventCount,
mergedEventCount,
discoveredVehicles,
projection,
notes,
supportEvidenceNormalization,
partitionDebug,
null
);
}
public UnifiedRuntimeDerivedProjectionResultDto withPartitionDebug(RuntimeDriverPartitionDebugDto debug) { public UnifiedRuntimeDerivedProjectionResultDto withPartitionDebug(RuntimeDriverPartitionDebugDto debug) {
return new UnifiedRuntimeDerivedProjectionResultDto( return new UnifiedRuntimeDerivedProjectionResultDto(
@ -84,7 +115,8 @@ public record UnifiedRuntimeDerivedProjectionResultDto(
projection, projection,
notes, notes,
supportEvidenceNormalization, supportEvidenceNormalization,
debug debug,
ndiHomeClassification
); );
} }
} }

View File

@ -1,6 +1,7 @@
package at.procon.eventhub.processing.eventprocessing.module; package at.procon.eventhub.processing.eventprocessing.module;
import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto; import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput;
import at.procon.eventhub.processing.dto.UnifiedRuntimeDerivedProjectionResultDto; import at.procon.eventhub.processing.dto.UnifiedRuntimeDerivedProjectionResultDto;
import at.procon.eventhub.processing.dto.UnifiedRuntimeDriverWorkingTimeScopeResultDto; import at.procon.eventhub.processing.dto.UnifiedRuntimeDriverWorkingTimeScopeResultDto;
@ -72,7 +73,8 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE, DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE,
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION
), ),
Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map<String, DriverWorkingTimePreparedInput>"), Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval",
"Map<String, DriverWorkingTimePreparedInput>"),
Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto") Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto")
); );
} }
@ -86,6 +88,8 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
UnifiedRuntimeEventBundle broadBundle = runtimeEventBundle(context); UnifiedRuntimeEventBundle broadBundle = runtimeEventBundle(context);
UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context); UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context);
Map<String, DriverWorkingTimePreparedInput> preparedInputs = preparedInputs(context); Map<String, DriverWorkingTimePreparedInput> preparedInputs = preparedInputs(context);
DriverNdiHomeClassificationScopeResult ndiHomeClassificationScope =
optionalNdiHomeClassificationScope(context);
LinkedHashMap<String, UnifiedRuntimeDerivedProjectionResultDto> driverResults = new LinkedHashMap<>(); LinkedHashMap<String, UnifiedRuntimeDerivedProjectionResultDto> driverResults = new LinkedHashMap<>();
List<String> warnings = new ArrayList<>(); List<String> warnings = new ArrayList<>();
@ -119,13 +123,19 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
projection, projection,
projection.notes(), projection.notes(),
preparedInput.partition().supportEvidenceNormalization(), preparedInput.partition().supportEvidenceNormalization(),
preparedInput.partition().partitionDebug() preparedInput.partition().partitionDebug(),
ndiHomeClassificationScope == null
? null
: ndiHomeClassificationScope.resultForDriver(preparedInput.driverKey())
)); ));
} }
List<String> notes = new ArrayList<>(broadBundle.notes()); List<String> notes = new ArrayList<>(broadBundle.notes());
notes.add("Runtime driver working-time processing used module-to-module dataflow for event assembly, activity intervalization, vehicle-usage intervalization, evidence attachment, support evidence normalization, and final derived projections."); notes.add("Runtime driver working-time processing used module-to-module dataflow for event assembly, activity intervalization, vehicle-usage intervalization, evidence attachment, support evidence normalization, and final derived projections.");
notes.add("Selected driver partitions: " + driverResults.size() + "."); notes.add("Selected driver partitions: " + driverResults.size() + ".");
if (ndiHomeClassificationScope != null) {
notes.addAll(ndiHomeClassificationScope.notes());
}
UnifiedRuntimeDriverWorkingTimeScopeResultDto result = new UnifiedRuntimeDriverWorkingTimeScopeResultDto( UnifiedRuntimeDriverWorkingTimeScopeResultDto result = new UnifiedRuntimeDriverWorkingTimeScopeResultDto(
broadBundle.request(), broadBundle.request(),
@ -194,6 +204,17 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
+ " requires previous result " + DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY + "."); + " requires previous result " + DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY + ".");
} }
private DriverNdiHomeClassificationScopeResult optionalNdiHomeClassificationScope(
RuntimeProcessingModuleContext context
) {
RuntimeProcessingModuleResult result =
context.previousResults().get(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION);
if (result != null && result.output() instanceof DriverNdiHomeClassificationScopeResult scopeResult) {
return scopeResult;
}
return null;
}
private UnifiedRuntimeProcessingApiRequest scopeRequest(RuntimeProcessingModuleContext context) { private UnifiedRuntimeProcessingApiRequest scopeRequest(RuntimeProcessingModuleContext context) {
Object value = context.attributes().get("runtimeScopeApiRequest"); Object value = context.attributes().get("runtimeScopeApiRequest");
if (value instanceof UnifiedRuntimeProcessingApiRequest request) { if (value instanceof UnifiedRuntimeProcessingApiRequest request) {

View File

@ -10,6 +10,7 @@ public final class DriverWorkingTimeModuleKeys {
public static final String VEHICLE_USAGE_MERGE = "vehicle-usage-merge"; public static final String VEHICLE_USAGE_MERGE = "vehicle-usage-merge";
public static final String VEHICLE_EVIDENCE_ATTACHMENT = "vehicle-evidence-attachment"; public static final String VEHICLE_EVIDENCE_ATTACHMENT = "vehicle-evidence-attachment";
public static final String SUPPORT_EVIDENCE_NORMALIZATION = "support-evidence-normalization"; public static final String SUPPORT_EVIDENCE_NORMALIZATION = "support-evidence-normalization";
public static final String NDI_HOME_CLASSIFICATION = "ndi-home-classification";
public static final String DRIVING_DERIVED_PROJECTIONS = "driving-derived-projections"; public static final String DRIVING_DERIVED_PROJECTIONS = "driving-derived-projections";
private DriverWorkingTimeModuleKeys() { private DriverWorkingTimeModuleKeys() {

View File

@ -0,0 +1,79 @@
package at.procon.eventhub.processing.eventprocessing.module;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.service.DriverNdiHomeClassificationService;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.springframework.stereotype.Component;
@Component
public class NdiHomeClassificationModule implements RuntimeProcessingModule {
private final DriverNdiHomeClassificationService classificationService;
public NdiHomeClassificationModule(DriverNdiHomeClassificationService classificationService) {
this.classificationService = classificationService;
}
@Override
public String moduleKey() {
return DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION;
}
@Override
public RuntimeProcessingModuleDescriptorDto descriptor() {
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"NDI home classification",
"Builds enriched non-driving intervals, accumulates long-rest locations in a driver-aware cache, applies Haversine DBSCAN, and classifies HOME/NOT_HOME using the ordered NDI rules.",
"ESPER+JAVA",
Set.of(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION),
Set.of("Map<String, DriverWorkingTimePreparedInput>"),
Set.of("DriverNdiHomeClassificationScopeResult")
);
}
@Override
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
Map<String, DriverWorkingTimePreparedInput> preparedInputs = preparedInputs(context);
UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context);
DriverNdiHomeClassificationScopeResult result = classificationService.classifyPreparedInputs(
scopeRequest,
preparedInputs
);
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("corpusKey", result.corpusKey());
metadata.put("currentObservationCount", result.currentObservationCount());
metadata.put("cachedObservationCount", result.cachedObservationCount());
metadata.put("clusterCount", result.clusterCount());
metadata.put("driverResultCount", result.driverResults().size());
return new RuntimeProcessingModuleResult(
moduleKey(),
RuntimeProcessingModuleStatus.SUCCESS,
result,
metadata,
java.util.List.of()
);
}
private UnifiedRuntimeProcessingApiRequest scopeRequest(RuntimeProcessingModuleContext context) {
Object value = context.attributes().get("runtimeScopeApiRequest");
if (value instanceof UnifiedRuntimeProcessingApiRequest request) {
return request;
}
return context.request().sourceSelection();
}
@SuppressWarnings("unchecked")
private Map<String, DriverWorkingTimePreparedInput> preparedInputs(RuntimeProcessingModuleContext context) {
Object output = context.requireResult(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION).output();
if (output instanceof Map<?, ?> map) {
return (Map<String, DriverWorkingTimePreparedInput>) map;
}
return Map.of();
}
}

View File

@ -0,0 +1,121 @@
package at.procon.eventhub.processing.eventprocessing.plan;
import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeModuleKeys;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.stereotype.Component;
/**
* Dedicated public processing-plan entry point for NDI HOME/NOT_HOME classification.
*
* <p>The implementation deliberately delegates to the shared driver-working-time pipeline so
* activity intervalization, vehicle-usage reconciliation, support evidence normalization, and
* reusable NDI enrichment keep one source-neutral implementation.</p>
*/
@Component
public class DriverHomeClassificationRuntimeProcessingPlan implements RuntimeProcessingPlan {
public static final String PLAN_KEY = "driver-home-classification-v1";
private final DriverWorkingTimeRuntimeProcessingPlan delegate;
public DriverHomeClassificationRuntimeProcessingPlan(DriverWorkingTimeRuntimeProcessingPlan delegate) {
this.delegate = delegate;
}
@Override
public String processingPlanKey() {
return PLAN_KEY;
}
@Override
public RuntimeEventPartitioningStrategy defaultPartitioningStrategy() {
return delegate.defaultPartitioningStrategy();
}
@Override
public String displayName() {
return "Driver NDI home classification";
}
@Override
public String description() {
return "Builds enriched non-driving intervals, learns company and driver home locations "
+ "from one or more tachograph file sessions, and applies ordered HOME/NOT_HOME rules.";
}
@Override
public List<RuntimeEventPartitioningStrategy> supportedPartitioningStrategies() {
return delegate.supportedPartitioningStrategies();
}
@Override
public List<RuntimeProcessingModuleDescriptorDto> modules() {
return delegate.modules();
}
@Override
public Set<String> requiredParameters() {
return delegate.requiredParameters();
}
@Override
public Set<String> optionalParameters() {
return delegate.optionalParameters();
}
@Override
public RuntimeProcessingExecutionResultDto execute(RuntimeProcessingExecutionApiRequest request) {
RuntimeProcessingExecutionApiRequest delegatedRequest = prepareDelegatedRequest(request);
RuntimeProcessingExecutionResultDto result = delegate.execute(delegatedRequest);
return new RuntimeProcessingExecutionResultDto(
PLAN_KEY,
result.executedModules(),
result.partitioningStrategy(),
result.request(),
result.inputEventCount(),
result.selectedPartitionCount(),
result.discoveredVehicleCount(),
result.discoveredVehicles(),
result.moduleResults(),
result.partitionResults(),
result.notes(),
result.warnings()
);
}
RuntimeProcessingExecutionApiRequest prepareDelegatedRequest(RuntimeProcessingExecutionApiRequest request) {
List<String> modules = new ArrayList<>();
if (request.modules() != null) {
request.modules().stream()
.filter(value -> value != null && !value.isBlank())
.map(String::trim)
.forEach(modules::add);
}
modules.removeIf(moduleKey -> DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION.equals(moduleKey)
|| DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS.equals(moduleKey));
modules.add(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION);
modules.add(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS);
Map<String, Object> parameters = new LinkedHashMap<>();
if (request.parameters() != null) {
parameters.putAll(request.parameters());
}
parameters.putIfAbsent(
DriverWorkingTimeRuntimeProcessingPlan.NDI_LEARN_ALL_FILE_SESSION_DRIVERS_PARAMETER,
true
);
return new RuntimeProcessingExecutionApiRequest(
DriverWorkingTimeRuntimeProcessingPlan.PLAN_KEY,
request.sourceSelection(),
request.partitioning(),
List.copyOf(modules),
parameters
);
}
}

View File

@ -20,6 +20,7 @@ import at.procon.eventhub.processing.eventprocessing.module.SupportEvidenceNorma
import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeDerivedProjectionsModule; import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeDerivedProjectionsModule;
import at.procon.eventhub.processing.service.RuntimeDriverWorkingTimeScopeProcessingService; import at.procon.eventhub.processing.service.RuntimeDriverWorkingTimeScopeProcessingService;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -37,6 +38,7 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
public static final String INCLUDE_PARTITION_METADATA_PARAMETER = "includePartitionMetadata"; public static final String INCLUDE_PARTITION_METADATA_PARAMETER = "includePartitionMetadata";
public static final String INCLUDE_PARTITION_MODULE_RESULTS_PARAMETER = "includePartitionModuleResults"; public static final String INCLUDE_PARTITION_MODULE_RESULTS_PARAMETER = "includePartitionModuleResults";
public static final String INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER = "includeSupportEvidenceNormalization"; public static final String INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER = "includeSupportEvidenceNormalization";
public static final String NDI_LEARN_ALL_FILE_SESSION_DRIVERS_PARAMETER = "ndiLearnAllFileSessionDrivers";
private final RuntimeProcessingPipelineExecutor pipelineExecutor; private final RuntimeProcessingPipelineExecutor pipelineExecutor;
private final boolean includeRuntimeEventAssemblyModule; private final boolean includeRuntimeEventAssemblyModule;
@ -194,10 +196,19 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
Set.of("Map<String, DriverWorkingTimeDriverPartition>", "DriverActivityIntervalEvent"), Set.of("Map<String, DriverWorkingTimeDriverPartition>", "DriverActivityIntervalEvent"),
Set.of("RuntimeSupportEvidenceNormalizationDebugDto") Set.of("RuntimeSupportEvidenceNormalizationDebugDto")
), ),
new RuntimeProcessingModuleDescriptorDto(
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION,
"NDI home classification",
"Builds enriched non-driving intervals, accumulates long-rest locations across file sessions, clusters them with Haversine DBSCAN, and applies ordered HOME/NOT_HOME rules.",
"ESPER+JAVA",
Set.of(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION),
Set.of("Map<String, DriverWorkingTimePreparedInput>"),
Set.of("DriverNdiHomeClassificationScopeResult")
),
new RuntimeProcessingModuleDescriptorDto( new RuntimeProcessingModuleDescriptorDto(
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS, DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS,
"Driving-derived projections", "Driving-derived projections",
"Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates.", "Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates; attaches NDI HOME/NOT_HOME classification when that optional module was requested.",
"ESPER+JAVA", "ESPER+JAVA",
Set.of( Set.of(
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS, DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS,
@ -205,9 +216,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION
), ),
Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map<String, DriverWorkingTimePreparedInput>"), Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map<String, DriverWorkingTimePreparedInput>"),
Set.of("DriverWorkingTimeProcessingResultDto") Set.of("DriverWorkingTimeProcessingResultDto", "DriverNdiHomeClassificationResult")
) )
)); ));
if (!includeRuntimeEventAssemblyModule) {
descriptors.removeIf(descriptor -> DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION
.equals(descriptor.moduleKey()));
}
return List.copyOf(descriptors); return List.copyOf(descriptors);
} }
@ -225,6 +240,7 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"includeDrivingIntervals", "includeDrivingIntervals",
"includePartitionDebug", "includePartitionDebug",
INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER, INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER,
NDI_LEARN_ALL_FILE_SESSION_DRIVERS_PARAMETER,
"eventMixingMode" "eventMixingMode"
); );
} }
@ -266,11 +282,24 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
request.partitioning(), request.partitioning(),
request.parameters() request.parameters()
); );
UnifiedRuntimeProcessingApiRequest scopeRequest = applyExecutionRequest( UnifiedRuntimeProcessingApiRequest requestedScopeRequest = applyExecutionRequest(
request.sourceSelection(), request.sourceSelection(),
request.partitioning(), request.partitioning(),
request.parameters() request.parameters()
); );
boolean ndiHomeClassificationRequested = requestsNdiHomeClassification(request.modules());
boolean learnAllFileSessionDrivers = booleanParameter(
request.parameters(),
NDI_LEARN_ALL_FILE_SESSION_DRIVERS_PARAMETER,
false
);
UnifiedRuntimeProcessingApiRequest scopeRequest = expandFileSessionLearningScope(
requestedScopeRequest,
ndiHomeClassificationRequested && learnAllFileSessionDrivers
);
Set<String> requestedOutputDriverKeys = requestedOutputDriverKeys(requestedScopeRequest);
boolean filterOutputDrivers = !Boolean.TRUE.equals(requestedScopeRequest.includeAllDrivers())
&& !requestedOutputDriverKeys.isEmpty();
Map<String, Object> attributes = new LinkedHashMap<>(); Map<String, Object> attributes = new LinkedHashMap<>();
attributes.put("runtimeScopeApiRequest", scopeRequest); attributes.put("runtimeScopeApiRequest", scopeRequest);
@ -295,6 +324,9 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
Map<String, RuntimeEventProcessingPartitionResultDto> partitionResults = new LinkedHashMap<>(); Map<String, RuntimeEventProcessingPartitionResultDto> partitionResults = new LinkedHashMap<>();
workingTimeResult.driverResults().forEach((driverKey, driverResult) -> { workingTimeResult.driverResults().forEach((driverKey, driverResult) -> {
if (filterOutputDrivers && !requestedOutputDriverKeys.contains(driverKey)) {
return;
}
UnifiedRuntimeDerivedProjectionResultDto shapedDriverResult = shapeDriverResult( UnifiedRuntimeDerivedProjectionResultDto shapedDriverResult = shapeDriverResult(
driverResult, driverResult,
includeSupportEvidenceNormalization, includeSupportEvidenceNormalization,
@ -316,18 +348,31 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
); );
}); });
boolean learningScopeExpanded = !scopeRequest.equals(requestedScopeRequest);
List<String> resultNotes = new java.util.ArrayList<>(workingTimeResult.notes());
if (learningScopeExpanded) {
resultNotes.add("NDI location learning expanded the selected tachograph file sessions to all drivers; "
+ "the response remains filtered to the originally requested driver partition(s).");
}
List<UnifiedDiscoveredVehicleRef> responseVehicles = learningScopeExpanded
? selectedDiscoveredVehicles(partitionResults)
: workingTimeResult.discoveredVehicles();
int responseInputEventCount = learningScopeExpanded
? selectedInputEventCount(partitionResults)
: workingTimeResult.inputEventCount();
return new RuntimeProcessingExecutionResultDto( return new RuntimeProcessingExecutionResultDto(
processingPlanKey(), processingPlanKey(),
executedModules, executedModules,
RuntimeEventPartitioningStrategy.DRIVER, RuntimeEventPartitioningStrategy.DRIVER,
workingTimeResult.request(), requestedScopeRequest.toRuntimeRequest(),
workingTimeResult.inputEventCount(), responseInputEventCount,
workingTimeResult.selectedDriverCount(), partitionResults.size(),
workingTimeResult.discoveredVehicleCount(), responseVehicles.size(),
workingTimeResult.discoveredVehicles(), responseVehicles,
includeExecutionModuleResults ? sanitizeExecutionModuleResults(moduleResults) : Map.of(), includeExecutionModuleResults ? sanitizeExecutionModuleResults(moduleResults) : Map.of(),
partitionResults, partitionResults,
workingTimeResult.notes(), resultNotes,
workingTimeResult.warnings() workingTimeResult.warnings()
); );
} }
@ -352,6 +397,45 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
+ (output == null ? "null" : output.getClass().getName())); + (output == null ? "null" : output.getClass().getName()));
} }
private List<UnifiedDiscoveredVehicleRef> selectedDiscoveredVehicles(
Map<String, RuntimeEventProcessingPartitionResultDto> partitionResults
) {
List<UnifiedDiscoveredVehicleRef> vehicles = new java.util.ArrayList<>();
for (RuntimeEventProcessingPartitionResultDto partition : partitionResults.values()) {
if (!(partition.result() instanceof UnifiedRuntimeDerivedProjectionResultDto driverResult)) {
continue;
}
for (UnifiedDiscoveredVehicleRef candidate : driverResult.discoveredVehicles()) {
boolean merged = false;
for (int index = 0; index < vehicles.size(); index++) {
UnifiedDiscoveredVehicleRef existing = vehicles.get(index);
if (existing.matches(candidate)) {
vehicles.set(index, existing.merge(candidate));
merged = true;
break;
}
}
if (!merged) {
vehicles.add(candidate);
}
}
}
vehicles.sort(java.util.Comparator.comparing(UnifiedDiscoveredVehicleRef::stableKey));
return List.copyOf(vehicles);
}
private int selectedInputEventCount(
Map<String, RuntimeEventProcessingPartitionResultDto> partitionResults
) {
long count = partitionResults.values().stream()
.map(RuntimeEventProcessingPartitionResultDto::result)
.filter(UnifiedRuntimeDerivedProjectionResultDto.class::isInstance)
.map(UnifiedRuntimeDerivedProjectionResultDto.class::cast)
.mapToLong(UnifiedRuntimeDerivedProjectionResultDto::mergedEventCount)
.sum();
return (int) Math.min(Integer.MAX_VALUE, Math.max(0L, count));
}
private Map<String, RuntimeProcessingModuleResult> sanitizeExecutionModuleResults( private Map<String, RuntimeProcessingModuleResult> sanitizeExecutionModuleResults(
Map<String, RuntimeProcessingModuleResult> moduleResults Map<String, RuntimeProcessingModuleResult> moduleResults
) { ) {
@ -403,6 +487,24 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
List.of() List.of()
) )
); );
if (driverResult.ndiHomeClassification() != null) {
results.put(
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION,
new RuntimeProcessingModuleResult(
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION,
RuntimeProcessingModuleStatus.SUCCESS,
driverResult.ndiHomeClassification(),
Map.of(
"nonDrivingIntervalCount", driverResult.ndiHomeClassification().nonDrivingIntervalCount(),
"cachedActualDriverObservationCount", driverResult.ndiHomeClassification().cachedActualDriverObservationCount(),
"cachedOtherDriverObservationCount", driverResult.ndiHomeClassification().cachedOtherDriverObservationCount(),
"companyHomeClusterCount", driverResult.ndiHomeClassification().companyHomeClusterCount(),
"driverHomeClusterCount", driverResult.ndiHomeClassification().driverHomeClusterCount()
),
List.of()
)
);
}
results.put( results.put(
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS, DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS,
new RuntimeProcessingModuleResult( new RuntimeProcessingModuleResult(
@ -433,6 +535,16 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
if (driverResult.partitionDebug() != null) { if (driverResult.partitionDebug() != null) {
metadata.put("partitionDebug", driverResult.partitionDebug()); metadata.put("partitionDebug", driverResult.partitionDebug());
} }
if (driverResult.ndiHomeClassification() != null) {
metadata.put("ndiHomeClassification", Map.of(
"nonDrivingIntervalCount", driverResult.ndiHomeClassification().nonDrivingIntervalCount(),
"currentLongLocationObservationCount", driverResult.ndiHomeClassification().currentLongLocationObservationCount(),
"cachedActualDriverObservationCount", driverResult.ndiHomeClassification().cachedActualDriverObservationCount(),
"cachedOtherDriverObservationCount", driverResult.ndiHomeClassification().cachedOtherDriverObservationCount(),
"companyHomeClusterCount", driverResult.ndiHomeClassification().companyHomeClusterCount(),
"driverHomeClusterCount", driverResult.ndiHomeClassification().driverHomeClusterCount()
));
}
return metadata; return metadata;
} }
@ -451,10 +563,89 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
driverResult.projection(), driverResult.projection(),
driverResult.notes(), driverResult.notes(),
includeSupportEvidenceNormalization ? driverResult.supportEvidenceNormalization() : null, includeSupportEvidenceNormalization ? driverResult.supportEvidenceNormalization() : null,
includePartitionDebug ? driverResult.partitionDebug() : null includePartitionDebug ? driverResult.partitionDebug() : null,
driverResult.ndiHomeClassification()
); );
} }
UnifiedRuntimeProcessingApiRequest expandFileSessionLearningScope(
UnifiedRuntimeProcessingApiRequest request,
boolean enabled
) {
if (!enabled || !includeRuntimeEventAssemblyModule || request == null || !isFileSessionOnly(request)) {
return request;
}
Set<String> explicitDriverKeys = requestedOutputDriverKeys(request);
if (Boolean.TRUE.equals(request.includeAllDrivers()) && explicitDriverKeys.isEmpty()) {
return request;
}
// Without explicit canonical driver keys the response cannot be filtered safely after
// broadening the internal learning scope. Keep alternate card/source selectors unchanged.
if (explicitDriverKeys.isEmpty()) {
return request;
}
return new UnifiedRuntimeProcessingApiRequest(
request.sessionId(),
request.sessionIds(),
request.compositeSessionId(),
request.tenantKey(),
request.sourceFamilies(),
request.eventBackend(),
request.sourceKinds(),
null,
Set.of(),
true,
request.vehicleKeys(),
request.includeAllVehicles(),
null,
null,
null,
request.occurredFrom(),
request.occurredTo(),
request.expandVehicleEvents(),
request.vehicleExpansionPaddingMinutes(),
request.includeIntersectingIntervals(),
request.significantDrivingMinutes(),
request.minimumRestPeriodMinutes(),
request.includeActivityIntervals(),
request.includeDrivingIntervals(),
request.sourceInputs()
);
}
private boolean isFileSessionOnly(UnifiedRuntimeProcessingApiRequest request) {
if (request.sourceInputs() != null && !request.sourceInputs().isEmpty()) {
List<at.procon.eventhub.processing.dto.UnifiedRuntimeSourceInputApiRequest> sourceInputs =
request.sourceInputs().stream().filter(java.util.Objects::nonNull).toList();
return !sourceInputs.isEmpty() && sourceInputs.stream().allMatch(input -> input.sourceFamily()
== at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION);
}
if (request.sourceFamilies() != null && !request.sourceFamilies().isEmpty()) {
return request.sourceFamilies().stream().allMatch(sourceFamily -> sourceFamily
== at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION);
}
return request.sessionId() != null
|| (request.sessionIds() != null && !request.sessionIds().isEmpty())
|| request.compositeSessionId() != null;
}
private Set<String> requestedOutputDriverKeys(UnifiedRuntimeProcessingApiRequest request) {
java.util.LinkedHashSet<String> result = new java.util.LinkedHashSet<>();
if (request == null) {
return Set.of();
}
if (request.driverKeys() != null) {
request.driverKeys().stream()
.filter(value -> value != null && !value.isBlank())
.map(String::trim)
.forEach(result::add);
}
if (request.driverKey() != null && !request.driverKey().isBlank()) {
result.add(request.driverKey().trim());
}
return Set.copyOf(result);
}
public UnifiedRuntimeProcessingApiRequest applyExecutionRequest( public UnifiedRuntimeProcessingApiRequest applyExecutionRequest(
UnifiedRuntimeProcessingApiRequest sourceSelection, UnifiedRuntimeProcessingApiRequest sourceSelection,
RuntimeEventPartitioningApiRequest partitioning, RuntimeEventPartitioningApiRequest partitioning,
@ -567,15 +758,37 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
requested.put(module.trim(), module.trim()); requested.put(module.trim(), module.trim());
} }
} }
requested.putIfAbsent(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS, boolean includeNdiHomeClassification =
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS); requested.remove(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION) != null;
requested.remove(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS);
if (includeNdiHomeClassification) {
requested.put(
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION,
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION
);
}
requested.put(
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS,
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS
);
return List.copyOf(requested.values()); return List.copyOf(requested.values());
} }
return modules().stream() return modules().stream()
.map(RuntimeProcessingModuleDescriptorDto::moduleKey) .map(RuntimeProcessingModuleDescriptorDto::moduleKey)
.filter(moduleKey -> !DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION.equals(moduleKey))
.toList(); .toList();
} }
private boolean requestsNdiHomeClassification(List<String> requestedModules) {
if (requestedModules == null) {
return false;
}
return requestedModules.stream()
.filter(java.util.Objects::nonNull)
.map(String::trim)
.anyMatch(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION::equals);
}
private boolean booleanParameter(Map<String, Object> parameters, String key, boolean fallback) { private boolean booleanParameter(Map<String, Object> parameters, String key, boolean fallback) {
if (parameters == null || !parameters.containsKey(key)) { if (parameters == null || !parameters.containsKey(key)) {
return fallback; return fallback;

View File

@ -128,6 +128,15 @@ eventhub:
processing: processing:
operating-split-idle-hours: 7 operating-split-idle-hours: 7
significant-driving-minutes: 3 significant-driving-minutes: 3
ndi-long-minutes: 450
ndi-very-long-minutes: 1440
ndi-card-removal-percent: 80
ndi-visit-share-percent: 25
ndi-dbscan-eps-meters: 150
ndi-dbscan-min-points: 3
ndi-location-cache-ttl: 4h
ndi-location-cache-max-observations: 100000
ndi-location-cache-namespace: default
merge-gap-seconds: 0 merge-gap-seconds: 0
gap-detection-tolerance-seconds: 0 gap-detection-tolerance-seconds: 0
timeline-input-mode: events timeline-input-mode: events

View File

@ -0,0 +1,63 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationObservation;
import java.time.OffsetDateTime;
import java.util.List;
import org.junit.jupiter.api.Test;
class DriverNdiDbscanClustererTest {
private final DriverNdiDbscanClusterer clusterer = new DriverNdiDbscanClusterer();
@Test
void clustersThreeNearbyLocationsAndKeepsRemoteLocationAsNoise() {
List<DriverNdiLocationObservation> observations = List.of(
observation("A", "D1", 48.20820, 16.37380),
observation("B", "D2", 48.20835, 16.37390),
observation("C", "D3", 48.20810, 16.37405),
observation("D", "D4", 48.25000, 16.45000)
);
DriverNdiDbscanClusterer.ClusterResult result = clusterer.cluster(observations, 150.0d, 3);
assertThat(result.clusters()).hasSize(1);
assertThat(result.assignmentByObservationId().get("A"))
.isEqualTo(result.assignmentByObservationId().get("B"))
.isEqualTo(result.assignmentByObservationId().get("C"));
assertThat(result.assignmentByObservationId().get("D"))
.isEqualTo(DriverNdiDbscanClusterer.NOISE_CLUSTER_ID);
}
private DriverNdiLocationObservation observation(
String observationId,
String driverKey,
double latitude,
double longitude
) {
OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z");
return new DriverNdiLocationObservation(
observationId,
"TENANT",
List.of(),
null,
driverKey,
start,
start.plusHours(8),
8 * 3600L,
latitude,
longitude,
"PREVIOUS_DRIVE_END",
null,
null,
null,
"P-" + observationId,
"N-" + observationId,
"REG-1",
"REG-1",
"VIN-1",
"VIN-1"
);
}
}

View File

@ -0,0 +1,301 @@
package at.procon.eventhub.processing.driverworkingtime.homeclassification.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationReason;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationResult;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeStatus;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeRestCoverageInterval;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class DriverNdiHomeClassificationServiceTest {
private DriverNdiLocationCorpusCache cache;
private DriverNdiHomeClassificationService service;
@BeforeEach
void setUp() {
EventHubProperties properties = new EventHubProperties();
cache = new DriverNdiLocationCorpusCache(properties);
service = new DriverNdiHomeClassificationService(
null,
cache,
new DriverNdiDbscanClusterer(),
properties
);
}
@Test
void accumulatesOtherDriverLocationsAcrossFileSessionsAndUsesThemAsCompanyHome() {
OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z");
Map<String, List<DriverWorkingTimeRestCoverageInterval>> firstSession = new LinkedHashMap<>();
firstSession.put("D2", List.of(evidence("D2", start, 8, 48.20820, 16.37380, 0.0d, "VIN-DEPOT", "VIN-DEPOT")));
firstSession.put("D3", List.of(evidence("D3", start.plusDays(1), 8, 48.20825, 16.37385, 0.0d, "VIN-DEPOT", "VIN-DEPOT")));
firstSession.put("D4", List.of(evidence("D4", start.plusDays(2), 8, 48.20815, 16.37375, 0.0d, "VIN-DEPOT", "VIN-DEPOT")));
service.classifyEvidence(request(UUID.randomUUID()), firstSession);
Map<String, List<DriverWorkingTimeRestCoverageInterval>> secondSession = Map.of(
"D1",
List.of(evidence("D1", start.plusDays(3), 8, 48.20822, 16.37382, 0.0d, "VIN-DEPOT", "VIN-DEPOT"))
);
DriverNdiHomeClassificationScopeResult result = service.classifyEvidence(
request(UUID.randomUUID()),
secondSession
);
DriverNdiHomeClassificationResult driver = result.resultForDriver("D1");
assertThat(driver.cachedActualDriverObservationCount()).isEqualTo(1);
assertThat(driver.cachedOtherDriverObservationCount()).isEqualTo(3);
assertThat(driver.companyHomeClusterCount()).isEqualTo(1);
assertThat(driver.classifications()).singleElement().satisfies(classification -> {
assertThat(classification.status()).isEqualTo(DriverNdiHomeStatus.HOME);
assertThat(classification.reason()).isEqualTo(DriverNdiHomeClassificationReason.COMPANY_HOME_CLUSTER);
});
}
@Test
void identifiesDriverPrivateHomeSeparatelyFromOtherDriverLocations() {
OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z");
Map<String, List<DriverWorkingTimeRestCoverageInterval>> evidenceByDriver = new LinkedHashMap<>();
evidenceByDriver.put("D1", List.of(
evidence("D1", start, 8, 48.20820, 16.37380, 0.0d, "VIN-A", "VIN-A"),
evidence("D1", start.plusDays(1), 8, 48.20825, 16.37385, 0.0d, "VIN-A", "VIN-A"),
evidence("D1", start.plusDays(2), 8, 48.20815, 16.37375, 0.0d, "VIN-A", "VIN-A"),
evidence("D1", start.plusDays(3), 8, 48.30000, 16.50000, 0.0d, "VIN-A", "VIN-A")
));
for (int index = 0; index < 9; index++) {
double latitude = 47.0d + index * 0.1d;
double longitude = 14.0d + index * 0.1d;
evidenceByDriver.put(
"OTHER-" + index,
List.of(evidence(
"OTHER-" + index,
start.plusDays(10L + index),
8,
latitude,
longitude,
0.0d,
"VIN-" + index,
"VIN-" + index
))
);
}
DriverNdiHomeClassificationResult driver = service.classifyEvidence(
request(UUID.randomUUID()),
evidenceByDriver
).resultForDriver("D1");
assertThat(driver.companyHomeClusterCount()).isZero();
assertThat(driver.driverHomeClusterCount()).isEqualTo(1);
assertThat(driver.classifications().subList(0, 3))
.allSatisfy(classification -> {
assertThat(classification.status()).isEqualTo(DriverNdiHomeStatus.HOME);
assertThat(classification.reason()).isEqualTo(DriverNdiHomeClassificationReason.DRIVER_HOME_CLUSTER);
});
assertThat(driver.classifications().get(3).reason())
.isEqualTo(DriverNdiHomeClassificationReason.LONG_REST_OUTSIDE_HOME_CLUSTER);
}
@Test
void reprocessingTheSameNdiUpdatesLocationWithoutDuplicatingTheCachedVisit() {
OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z");
service.classifyEvidence(
request(UUID.randomUUID()),
Map.of("D1", List.of(evidence("D1", start, 8, 48.20820, 16.37380, 0.0d, "VIN-A", "VIN-A")))
);
DriverNdiHomeClassificationScopeResult result = service.classifyEvidence(
request(UUID.randomUUID()),
Map.of("D1", List.of(evidence("D1", start, 8, 48.20900, 16.37450, 0.0d, "VIN-A", "VIN-A")))
);
assertThat(result.cachedObservationCount()).isEqualTo(1);
assertThat(result.resultForDriver("D1").cachedActualDriverObservationCount()).isEqualTo(1);
assertThat(result.resultForDriver("D1").classifications())
.singleElement()
.satisfies(classification -> {
assertThat(classification.latitude()).isEqualTo(48.20900);
assertThat(classification.longitude()).isEqualTo(16.37450);
});
}
@Test
void doesNotTreatRegistrationChangeAsVehicleChangeWhenVinIsUnchanged() {
OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z");
DriverWorkingTimeRestCoverageInterval interval = evidenceWithIdentity(
"D1",
start,
2,
null,
null,
0.0d,
"REG-A",
"REG-B",
"VIN-A",
"VIN-A"
);
DriverNdiHomeClassificationResult result = service.classifyEvidence(
request(UUID.randomUUID()),
Map.of("D1", List.of(interval))
).resultForDriver("D1");
assertThat(result.classifications()).singleElement().satisfies(classification -> {
assertThat(classification.status()).isEqualTo(DriverNdiHomeStatus.NOT_HOME);
assertThat(classification.reason()).isEqualTo(DriverNdiHomeClassificationReason.NO_POSITION_SHORT_REST);
});
}
@Test
void appliesRulesInDocumentOrderBeforeLocationClassification() {
OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z");
List<DriverWorkingTimeRestCoverageInterval> evidence = List.of(
evidence("D1", start, 1, null, null, 0.0d, "VIN-A", "VIN-B"),
evidence("D1", start.plusDays(1), 2, null, null, 81.0d, "VIN-A", "VIN-A"),
evidence("D1", start.plusDays(2), 25, 48.5, 16.5, 0.0d, "VIN-A", "VIN-A"),
evidence("D1", start.plusDays(4), 8, null, null, 0.0d, "VIN-A", "VIN-A"),
evidence("D1", start.plusDays(5), 2, null, null, 0.0d, "VIN-A", "VIN-A")
);
DriverNdiHomeClassificationResult driver = service.classifyEvidence(
request(UUID.randomUUID()),
Map.of("D1", evidence)
).resultForDriver("D1");
assertThat(driver.classifications())
.extracting(value -> value.reason())
.containsExactly(
DriverNdiHomeClassificationReason.VEHICLE_CHANGED,
DriverNdiHomeClassificationReason.CARD_REMOVED_OVER_THRESHOLD,
DriverNdiHomeClassificationReason.REST_OVER_VERY_LONG_THRESHOLD,
DriverNdiHomeClassificationReason.NO_POSITION_LONG_REST,
DriverNdiHomeClassificationReason.NO_POSITION_SHORT_REST
);
assertThat(driver.classifications())
.extracting(value -> value.status())
.containsExactly(
DriverNdiHomeStatus.HOME,
DriverNdiHomeStatus.HOME,
DriverNdiHomeStatus.HOME,
DriverNdiHomeStatus.HOME,
DriverNdiHomeStatus.NOT_HOME
);
}
private UnifiedRuntimeProcessingApiRequest request(UUID sessionId) {
return new UnifiedRuntimeProcessingApiRequest(
sessionId,
List.of(sessionId),
null,
"TENANT",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
Set.of("DRIVER_CARD", "VEHICLE_UNIT"),
null,
Set.of(),
true,
Set.of(),
true,
null,
null,
null,
null,
null,
true,
0,
true,
3,
720,
true,
true
);
}
private DriverWorkingTimeRestCoverageInterval evidence(
String driverKey,
OffsetDateTime start,
int durationHours,
Double latitude,
Double longitude,
double cardAbsentCoveragePercent,
String previousVehicle,
String nextVehicle
) {
return evidenceWithIdentity(
driverKey,
start,
durationHours,
latitude,
longitude,
cardAbsentCoveragePercent,
"REG-A",
previousVehicle.equals(nextVehicle) ? "REG-A" : "REG-B",
previousVehicle,
nextVehicle
);
}
private DriverWorkingTimeRestCoverageInterval evidenceWithIdentity(
String driverKey,
OffsetDateTime start,
int durationHours,
Double latitude,
Double longitude,
double cardAbsentCoveragePercent,
String previousRegistration,
String nextRegistration,
String previousVehicle,
String nextVehicle
) {
OffsetDateTime end = start.plusHours(durationHours);
long durationSeconds = durationHours * 3600L;
return new DriverWorkingTimeRestCoverageInterval(
UUID.randomUUID(),
driverKey,
start,
end,
durationSeconds,
Math.round(durationSeconds * cardAbsentCoveragePercent / 100.0d),
cardAbsentCoveragePercent,
"DRIVE-BEFORE-" + start,
"DRIVE-AFTER-" + start,
previousRegistration,
nextRegistration,
previousVehicle,
nextVehicle,
null,
null,
latitude == null ? null : "GEO-" + start,
latitude == null ? null : "POSITION",
latitude == null ? null : start,
latitude,
longitude,
latitude == null ? null : 0L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
}
}

View File

@ -423,4 +423,103 @@ class DriverWorkingTimeReusableProjectionBuilderTest {
.isNull(); .isNull();
} }
@Test
void buildsEnrichedCoverageForNdiBelowLegacyRestThreshold() {
DriverWorkingTimeReusableProjectionBuilder builder =
new DriverWorkingTimeReusableProjectionBuilder(new EventHubProperties());
UUID sessionId = UUID.randomUUID();
OffsetDateTime from = OffsetDateTime.parse("2026-05-01T08:00:00Z");
OffsetDateTime firstDriveEnd = OffsetDateTime.parse("2026-05-01T09:00:00Z");
OffsetDateTime secondDriveStart = OffsetDateTime.parse("2026-05-01T10:00:00Z");
OffsetDateTime to = OffsetDateTime.parse("2026-05-01T11:00:00Z");
DriverWorkingTimeProcessingInput input = new DriverWorkingTimeProcessingInput(
sessionId,
"12:123",
"DRIVER_CARD",
from,
to,
from,
to,
3,
720,
List.of(
new DriverWorkingTimeActivityInterval(
sessionId,
"12:123",
"ACT-1",
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"DRIVER_CARD",
"ACT-1",
"ACT-1",
from,
firstDriveEnd,
from.toEpochSecond(),
firstDriveEnd.toEpochSecond(),
firstDriveEnd.toEpochSecond() - from.toEpochSecond(),
List.of("ACT-1"),
false,
false,
"RAW_INTERVAL"
),
new DriverWorkingTimeActivityInterval(
sessionId,
"12:123",
"ACT-2",
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"DRIVER_CARD",
"ACT-2",
"ACT-2",
secondDriveStart,
to,
secondDriveStart.toEpochSecond(),
to.toEpochSecond(),
to.toEpochSecond() - secondDriveStart.toEpochSecond(),
List.of("ACT-2"),
false,
false,
"RAW_INTERVAL"
)
),
List.of(
new DriverWorkingTimeVehicleUsageInterval(
sessionId,
"12:123",
"VU-1",
"VU-START",
"VU-END",
from,
to,
from.toEpochSecond(),
to.toEpochSecond(),
to.toEpochSecond() - from.toEpochSecond(),
null,
null,
"12:REG-1",
"VIN-1",
"DRIVER_CARD",
List.of("VU-START", "VU-END")
)
),
List.of(),
List.of()
);
assertThat(builder.buildDerivedProjectionBundle(input).dailyWeeklyRestCandidateCoverageIntervals()).isEmpty();
assertThat(builder.buildAllNonDrivingIntervalCoverage(input))
.singleElement()
.satisfies(interval -> assertThat(interval.durationSeconds()).isEqualTo(3600L));
}
} }

View File

@ -5,6 +5,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto; import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDriverPartition; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDriverPartition;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput;
@ -202,6 +203,21 @@ class DriverWorkingTimeDerivedProjectionsModuleTest {
Map.of("12:123", preparedInput), Map.of("12:123", preparedInput),
Map.of(), Map.of(),
List.of() List.of()
),
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION,
new RuntimeProcessingModuleResult(
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION,
RuntimeProcessingModuleStatus.SUCCESS,
new DriverNdiHomeClassificationScopeResult(
"TENANT|FILE_SESSION|NDI_HOME_V1",
0,
0,
0,
Map.of(),
List.of()
),
Map.of(),
List.of()
) )
) )
); );

View File

@ -8,6 +8,8 @@ import at.procon.eventhub.processing.dto.UnifiedRuntimeDerivedProjectionResultDt
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.dto.UnifiedRuntimeDriverWorkingTimeScopeResultDto; import at.procon.eventhub.processing.dto.UnifiedRuntimeDriverWorkingTimeScopeResultDto;
import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest; import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest;
import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeModuleKeys;
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingPipelineExecutor;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
@ -77,6 +79,172 @@ class DriverWorkingTimeRuntimeProcessingPlanTest {
assertThat(resolved.vehicleExpansionPaddingMinutes()).isEqualTo(15); assertThat(resolved.vehicleExpansionPaddingMinutes()).isEqualTo(15);
} }
@Test
void expandsSelectedFileSessionToAllDriversForNdiLearning() {
DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan(
Mockito.mock(RuntimeProcessingPipelineExecutor.class)
);
UnifiedRuntimeProcessingApiRequest expanded = plan.expandFileSessionLearningScope(sourceSelection(), true);
assertThat(expanded.driverKey()).isNull();
assertThat(expanded.driverKeys()).isEmpty();
assertThat(expanded.includeAllDrivers()).isTrue();
assertThat(expanded.sessionId()).isEqualTo(sourceSelection().sessionId());
}
@Test
void recognizesSessionOnlySelectionAsFileSessionLearningScope() {
DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan(
Mockito.mock(RuntimeProcessingPipelineExecutor.class)
);
UnifiedRuntimeProcessingApiRequest source = sourceSelection();
UnifiedRuntimeProcessingApiRequest sessionOnly = new UnifiedRuntimeProcessingApiRequest(
source.sessionId(),
source.sessionIds(),
source.compositeSessionId(),
source.tenantKey(),
Set.of(),
source.eventBackend(),
source.sourceKinds(),
source.driverKey(),
source.driverKeys(),
source.includeAllDrivers(),
source.vehicleKeys(),
source.includeAllVehicles(),
source.driverSourceEntityId(),
source.driverCardNation(),
source.driverCardNumber(),
source.occurredFrom(),
source.occurredTo(),
source.expandVehicleEvents(),
source.vehicleExpansionPaddingMinutes(),
source.includeIntersectingIntervals(),
source.significantDrivingMinutes(),
source.minimumRestPeriodMinutes(),
source.includeActivityIntervals(),
source.includeDrivingIntervals(),
source.sourceInputs()
);
UnifiedRuntimeProcessingApiRequest expanded = plan.expandFileSessionLearningScope(sessionOnly, true);
assertThat(expanded.includeAllDrivers()).isTrue();
assertThat(expanded.driverKey()).isNull();
assertThat(expanded.sessionId()).isEqualTo(sessionOnly.sessionId());
}
@Test
void doesNotExpandAlternateDriverSelectorWithoutCanonicalDriverKey() {
DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan(
Mockito.mock(RuntimeProcessingPipelineExecutor.class)
);
UnifiedRuntimeProcessingApiRequest selectedByCard = new UnifiedRuntimeProcessingApiRequest(
sourceSelection().sessionId(),
sourceSelection().sessionIds(),
sourceSelection().compositeSessionId(),
sourceSelection().tenantKey(),
sourceSelection().sourceFamilies(),
sourceSelection().eventBackend(),
sourceSelection().sourceKinds(),
null,
Set.of(),
false,
sourceSelection().vehicleKeys(),
sourceSelection().includeAllVehicles(),
"driver-source-1",
"A",
"CARD-1",
sourceSelection().occurredFrom(),
sourceSelection().occurredTo(),
sourceSelection().expandVehicleEvents(),
sourceSelection().vehicleExpansionPaddingMinutes(),
sourceSelection().includeIntersectingIntervals(),
sourceSelection().significantDrivingMinutes(),
sourceSelection().minimumRestPeriodMinutes(),
sourceSelection().includeActivityIntervals(),
sourceSelection().includeDrivingIntervals(),
sourceSelection().sourceInputs()
);
assertThat(plan.expandFileSessionLearningScope(selectedByCard, true)).isSameAs(selectedByCard);
}
@Test
void clearsAlternateSelectorsWhenCanonicalDriverKeyAllowsSafeResponseFiltering() {
DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan(
Mockito.mock(RuntimeProcessingPipelineExecutor.class)
);
UnifiedRuntimeProcessingApiRequest requested = new UnifiedRuntimeProcessingApiRequest(
sourceSelection().sessionId(),
sourceSelection().sessionIds(),
sourceSelection().compositeSessionId(),
sourceSelection().tenantKey(),
sourceSelection().sourceFamilies(),
sourceSelection().eventBackend(),
sourceSelection().sourceKinds(),
sourceSelection().driverKey(),
sourceSelection().driverKeys(),
false,
sourceSelection().vehicleKeys(),
sourceSelection().includeAllVehicles(),
"driver-source-1",
"A",
"CARD-1",
sourceSelection().occurredFrom(),
sourceSelection().occurredTo(),
sourceSelection().expandVehicleEvents(),
sourceSelection().vehicleExpansionPaddingMinutes(),
sourceSelection().includeIntersectingIntervals(),
sourceSelection().significantDrivingMinutes(),
sourceSelection().minimumRestPeriodMinutes(),
sourceSelection().includeActivityIntervals(),
sourceSelection().includeDrivingIntervals(),
sourceSelection().sourceInputs()
);
UnifiedRuntimeProcessingApiRequest expanded = plan.expandFileSessionLearningScope(requested, true);
assertThat(expanded.includeAllDrivers()).isTrue();
assertThat(expanded.driverSourceEntityId()).isNull();
assertThat(expanded.driverCardNation()).isNull();
assertThat(expanded.driverCardNumber()).isNull();
}
@Test
void canDisableAllDriverExpansionForNdiLearning() {
DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan(
Mockito.mock(RuntimeProcessingPipelineExecutor.class)
);
UnifiedRuntimeProcessingApiRequest requested = sourceSelection();
assertThat(plan.expandFileSessionLearningScope(requested, false)).isSameAs(requested);
}
@Test
void dedicatedHomePlanForcesNdiBeforeFinalProjectionAndEnablesAllDriverLearning() {
DriverHomeClassificationRuntimeProcessingPlan plan = new DriverHomeClassificationRuntimeProcessingPlan(
Mockito.mock(DriverWorkingTimeRuntimeProcessingPlan.class)
);
RuntimeProcessingExecutionApiRequest request = new RuntimeProcessingExecutionApiRequest(
DriverHomeClassificationRuntimeProcessingPlan.PLAN_KEY,
sourceSelection(),
null,
List.of(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS),
Map.of()
);
RuntimeProcessingExecutionApiRequest delegated = plan.prepareDelegatedRequest(request);
assertThat(delegated.processingPlanKey()).isEqualTo(DriverWorkingTimeRuntimeProcessingPlan.PLAN_KEY);
assertThat(delegated.modules()).endsWith(
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION,
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS
);
assertThat(delegated.parameters())
.containsEntry(DriverWorkingTimeRuntimeProcessingPlan.NDI_LEARN_ALL_FILE_SESSION_DRIVERS_PARAMETER, true);
}
@Test @Test
void executeCanOmitExtendedPartitionPayloads() { void executeCanOmitExtendedPartitionPayloads() {
RuntimeDriverWorkingTimeScopeProcessingService scopeService = Mockito.mock(RuntimeDriverWorkingTimeScopeProcessingService.class); RuntimeDriverWorkingTimeScopeProcessingService scopeService = Mockito.mock(RuntimeDriverWorkingTimeScopeProcessingService.class);