From 5a1055861244af36a23d8773082f2ebceef4c029 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Wed, 17 Jun 2026 09:13:31 +0200 Subject: [PATCH] Add NDI home classification runtime pipeline --- README_NDI_HOME_CLASSIFICATION.md | 136 +++++ .../eventhub/config/EventHubProperties.java | 86 +++ .../model/DriverNdiHomeClassification.java | 16 + .../DriverNdiHomeClassificationReason.java | 13 + .../DriverNdiHomeClassificationResult.java | 33 + ...riverNdiHomeClassificationScopeResult.java | 26 + .../model/DriverNdiHomeStatus.java | 6 + .../model/DriverNdiLocationCluster.java | 16 + .../DriverNdiLocationCorpusSnapshot.java | 18 + .../model/DriverNdiLocationObservation.java | 67 ++ .../service/DriverNdiDbscanClusterer.java | 203 +++++++ .../DriverNdiHomeClassificationService.java | 572 ++++++++++++++++++ .../service/DriverNdiLocationCorpusCache.java | 119 ++++ ...rWorkingTimeReusableProjectionBuilder.java | 23 +- ...fiedRuntimeDerivedProjectionResultDto.java | 40 +- ...erWorkingTimeDerivedProjectionsModule.java | 25 +- .../module/DriverWorkingTimeModuleKeys.java | 1 + .../module/NdiHomeClassificationModule.java | 79 +++ ...meClassificationRuntimeProcessingPlan.java | 121 ++++ ...riverWorkingTimeRuntimeProcessingPlan.java | 237 +++++++- src/main/resources/application.yml | 9 + .../service/DriverNdiDbscanClustererTest.java | 63 ++ ...riverNdiHomeClassificationServiceTest.java | 301 +++++++++ ...kingTimeReusableProjectionBuilderTest.java | 99 +++ ...rkingTimeDerivedProjectionsModuleTest.java | 16 + ...rWorkingTimeRuntimeProcessingPlanTest.java | 168 +++++ 26 files changed, 2474 insertions(+), 19 deletions(-) create mode 100644 README_NDI_HOME_CLASSIFICATION.md create mode 100644 src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeClassification.java create mode 100644 
src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeClassificationReason.java create mode 100644 src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeClassificationResult.java create mode 100644 src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeClassificationScopeResult.java create mode 100644 src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeStatus.java create mode 100644 src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiLocationCluster.java create mode 100644 src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiLocationCorpusSnapshot.java create mode 100644 src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiLocationObservation.java create mode 100644 src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiDbscanClusterer.java create mode 100644 src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiHomeClassificationService.java create mode 100644 src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiLocationCorpusCache.java create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/module/NdiHomeClassificationModule.java create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverHomeClassificationRuntimeProcessingPlan.java create mode 100644 src/test/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiDbscanClustererTest.java create mode 100644 src/test/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiHomeClassificationServiceTest.java diff --git a/README_NDI_HOME_CLASSIFICATION.md 
b/README_NDI_HOME_CLASSIFICATION.md new file mode 100644 index 0000000..b223227 --- /dev/null +++ b/README_NDI_HOME_CLASSIFICATION.md @@ -0,0 +1,136 @@ +# NDI HOME / NOT_HOME classification implementation + +This patch implements the HOME / NOT_HOME part of `docs/ndi_home_classification_en.md` as a dedicated runtime processing plan while reusing the existing driver-working-time pipeline. + +## Public processing plan + +Use: + +```text +driver-home-classification-v1 +``` + +The plan delegates to the shared `driver-working-time-v1` pipeline and explicitly inserts: + +```text +support-evidence-normalization +-> ndi-home-classification +-> driving-derived-projections +``` + +The original `driver-working-time-v1` plan does not run the optional NDI module by default. It can opt in by explicitly requesting `ndi-home-classification`. + +## Reused projection structures + +`DriverWorkingTimeReusableProjectionBuilder.buildAllNonDrivingIntervalCoverage(...)` runs the existing Esper interruption/card-absence/GNSS enrichment pipeline with a zero rest-candidate threshold. It therefore creates enriched evidence for every positive non-driving interruption without changing the legacy daily/weekly-rest threshold or outputs. + +The implementation reuses `DriverWorkingTimeRestCoverageInterval` as the enriched NDI evidence model. It provides: + +- previous and next driving/vehicle identities; +- NDI start, end, and duration; +- card-absence duration and percentage; +- begin/end boundary GNSS evidence; +- boundary odometer and movement evidence. + +## Implemented classification rules + +The rules are evaluated in the document order: + +1. previous and next vehicles differ -> `HOME`; +2. card absent for more than 80% -> `HOME`; +3. NDI longer than 24 hours -> `HOME`; +4. no position: NDI longer than 7.5 hours -> `HOME`, otherwise `NOT_HOME`; +5. positioned long NDI in a company or driver home cluster -> `HOME`; +6. positioned long NDI outside those clusters -> `NOT_HOME`; +7. 
remaining short NDI -> `NOT_HOME`. + +Every classification contains a `DriverNdiHomeClassificationReason`, so the first matching rule remains visible in the API response. + +## Location learning and clustering + +Only NDIs longer than 7.5 hours with a position are added to the corpus. + +Position selection follows the document through the existing boundary-evidence resolver: + +```text +resolved begin-boundary evidence for the previous driving/vehicle context, +otherwise resolved end-boundary evidence for the next driving/vehicle context +``` + +The selected evidence is the closest eligible support-position event within the configured boundary lookup window, so it is an approximation when no event exists exactly at the driving boundary. + +The in-memory cache: + +- accumulates observations across one or more file-session executions; +- deduplicates the same NDI across repeated/overlapping sessions; +- retains the source session IDs as provenance; +- stores the driver key on every observation; +- does not permanently mark a driver as "actual" or "other". + +For each result driver, the same cached corpus is viewed as: + +```text +actual-driver observations +other-driver observations +``` + +This makes the distinction request-relative and allows the corpus to be reused for another driver. + +Clustering uses Java DBSCAN with Haversine distance. Defaults are 150 metres and three points. Noise observations remain in the denominator for visit-share calculations but are never home clusters. + +## File-session learning scope + +The dedicated plan defaults `ndiLearnAllFileSessionDrivers` to `true`. + +For a request with explicit canonical driver keys, the plan internally loads all drivers from the selected file sessions for location learning and filters the response back to the originally requested drivers. 
+ +The scope is not broadened when: + +- the source selection is mixed or database-only; +- the option is disabled; +- the request uses only alternate card/source selectors and cannot be filtered safely by canonical driver key. + +## Configuration + +The defaults are under: + +```yaml +eventhub: + tachograph-file-session: + processing: + ndi-long-minutes: 450 + ndi-very-long-minutes: 1440 + ndi-card-removal-percent: 80 + ndi-visit-share-percent: 25 + ndi-dbscan-eps-meters: 150 + ndi-dbscan-min-points: 3 + ndi-location-cache-ttl: 4h + ndi-location-cache-max-observations: 100000 + ndi-location-cache-namespace: default +``` + +For tenantless uploaded sessions, configure a namespace that prevents unrelated operational contexts from sharing a corpus. Explicit tenant keys always create tenant-scoped corpora. + +## Response extension + +Each driver partition can now contain: + +```text +ndiHomeClassification +``` + +It includes: + +- all NDI classifications; +- company and driver home cluster IDs; +- cluster centroids and visit statistics; +- actual-driver versus other-driver cached observation counts; +- diagnostics and notes. + +The field is omitted when the optional module was not executed, preserving the existing JSON shape for normal `driver-working-time-v1` calls. + +## Current implementation boundary + +This patch implements sections 1-4 of the document: NDI derivation/enrichment, location clustering, home-location determination, and HOME / NOT_HOME classification. + +Section 5, border-crossing/country trip segmentation, is intentionally not included yet. It needs a separate country-resolution abstraction and a decision between local geographic data, PostGIS, or an external reverse-geocoding provider. 
diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index 21fb336..09a8460 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -381,6 +381,15 @@ public class EventHubProperties { private int restCandidateGeoLookaheadMinutes = 180; private int restCandidateGeoStationaryMaxMeters = 500; private int restCandidateGeoMinorMovementMaxMeters = 2000; + private int ndiLongMinutes = 450; + private int ndiVeryLongMinutes = 1440; + private double ndiCardRemovalPercent = 80.0d; + private double ndiVisitSharePercent = 25.0d; + private int ndiDbscanEpsMeters = 150; + private int ndiDbscanMinPoints = 3; + private Duration ndiLocationCacheTtl = Duration.ofHours(4); + private int ndiLocationCacheMaxObservations = 100000; + private String ndiLocationCacheNamespace = "default"; private int mergeGapSeconds = 0; private int gapDetectionToleranceSeconds = 0; @@ -461,6 +470,83 @@ public class EventHubProperties { Math.max(this.restCandidateGeoStationaryMaxMeters, restCandidateGeoMinorMovementMaxMeters); } + public int getNdiLongMinutes() { + return ndiLongMinutes; + } + + public void setNdiLongMinutes(int ndiLongMinutes) { + this.ndiLongMinutes = Math.max(1, ndiLongMinutes); + this.ndiVeryLongMinutes = Math.max(this.ndiLongMinutes, this.ndiVeryLongMinutes); + } + + public int getNdiVeryLongMinutes() { + return ndiVeryLongMinutes; + } + + public void setNdiVeryLongMinutes(int ndiVeryLongMinutes) { + this.ndiVeryLongMinutes = Math.max(this.ndiLongMinutes, ndiVeryLongMinutes); + } + + public double getNdiCardRemovalPercent() { + return ndiCardRemovalPercent; + } + + public void setNdiCardRemovalPercent(double ndiCardRemovalPercent) { + this.ndiCardRemovalPercent = Math.max(0.0d, Math.min(100.0d, ndiCardRemovalPercent)); + } + + public double getNdiVisitSharePercent() { + return ndiVisitSharePercent; + } + 
+ public void setNdiVisitSharePercent(double ndiVisitSharePercent) { + this.ndiVisitSharePercent = Math.max(0.0d, Math.min(100.0d, ndiVisitSharePercent)); + } + + public int getNdiDbscanEpsMeters() { + return ndiDbscanEpsMeters; + } + + public void setNdiDbscanEpsMeters(int ndiDbscanEpsMeters) { + this.ndiDbscanEpsMeters = Math.max(1, ndiDbscanEpsMeters); + } + + public int getNdiDbscanMinPoints() { + return ndiDbscanMinPoints; + } + + public void setNdiDbscanMinPoints(int ndiDbscanMinPoints) { + this.ndiDbscanMinPoints = Math.max(1, ndiDbscanMinPoints); + } + + public Duration getNdiLocationCacheTtl() { + return ndiLocationCacheTtl; + } + + public void setNdiLocationCacheTtl(Duration ndiLocationCacheTtl) { + if (ndiLocationCacheTtl != null && !ndiLocationCacheTtl.isNegative() && !ndiLocationCacheTtl.isZero()) { + this.ndiLocationCacheTtl = ndiLocationCacheTtl; + } + } + + public int getNdiLocationCacheMaxObservations() { + return ndiLocationCacheMaxObservations; + } + + public void setNdiLocationCacheMaxObservations(int ndiLocationCacheMaxObservations) { + this.ndiLocationCacheMaxObservations = Math.max(100, ndiLocationCacheMaxObservations); + } + + public String getNdiLocationCacheNamespace() { + return ndiLocationCacheNamespace; + } + + public void setNdiLocationCacheNamespace(String ndiLocationCacheNamespace) { + if (ndiLocationCacheNamespace != null && !ndiLocationCacheNamespace.isBlank()) { + this.ndiLocationCacheNamespace = ndiLocationCacheNamespace.trim(); + } + } + public int getMergeGapSeconds() { return mergeGapSeconds; } diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeClassification.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeClassification.java new file mode 100644 index 0000000..61668f8 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeClassification.java 
/**
 * Reason attached to every NDI HOME / NOT_HOME classification, so the rule that
 * decided the status stays visible in the API response.
 *
 * <p>The per-constant notes are inferred from the constant names and the rule
 * list in the README; the assigning code is not part of this file. Confirm
 * against the classification service before relying on them.
 */
public enum DriverNdiHomeClassificationReason {
    /** Presumably: previous and next vehicles differ. */
    VEHICLE_CHANGED,
    /** Presumably: card-absence share exceeds the configured percentage. */
    CARD_REMOVED_OVER_THRESHOLD,
    /** Presumably: NDI exceeds the "very long" duration threshold. */
    REST_OVER_VERY_LONG_THRESHOLD,
    /** Presumably: no position available and the NDI is long. */
    NO_POSITION_LONG_REST,
    /** Presumably: no position available and the NDI is not long. */
    NO_POSITION_SHORT_REST,
    /** Presumably: positioned long NDI inside a company home cluster. */
    COMPANY_HOME_CLUSTER,
    /** Presumably: positioned long NDI inside a driver home cluster. */
    DRIVER_HOME_CLUSTER,
    /** Presumably: positioned long NDI outside every home cluster. */
    LONG_REST_OUTSIDE_HOME_CLUSTER,
    /** Presumably: remaining short NDI. */
    SHORT_REST
}
+import java.util.Set; + +public record DriverNdiHomeClassificationResult( + String driverKey, + int nonDrivingIntervalCount, + int currentLongLocationObservationCount, + int cachedActualDriverObservationCount, + int cachedOtherDriverObservationCount, + int companyHomeClusterCount, + int driverHomeClusterCount, + List classifications, + List clusters, + Set companyHomeClusterIds, + Set driverHomeClusterIds, + List notes +) { + public DriverNdiHomeClassificationResult { + classifications = classifications == null ? List.of() : List.copyOf(classifications); + clusters = clusters == null ? List.of() : List.copyOf(clusters); + companyHomeClusterIds = companyHomeClusterIds == null + ? Set.of() + : Collections.unmodifiableSet(new LinkedHashSet<>(companyHomeClusterIds)); + driverHomeClusterIds = driverHomeClusterIds == null + ? Set.of() + : Collections.unmodifiableSet(new LinkedHashSet<>(driverHomeClusterIds)); + notes = notes == null ? List.of() : List.copyOf(notes); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeClassificationScopeResult.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeClassificationScopeResult.java new file mode 100644 index 0000000..8fa6144 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiHomeClassificationScopeResult.java @@ -0,0 +1,26 @@ +package at.procon.eventhub.processing.driverworkingtime.homeclassification.model; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public record DriverNdiHomeClassificationScopeResult( + String corpusKey, + int currentObservationCount, + int cachedObservationCount, + int clusterCount, + Map driverResults, + List notes +) { + public DriverNdiHomeClassificationScopeResult { + driverResults = driverResults == null + ? 
/**
 * Immutable description of one location cluster as reported to API clients:
 * identifier, centroid and visit statistics, plus the two home flags.
 *
 * <p>The values are supplied by the caller; this record performs no validation
 * or derivation. The "actual"/"other" driver split is presumably relative to
 * the driver the enclosing result was built for. Confirm against the
 * classification service.
 *
 * @param clusterId                     cluster identifier
 * @param centroidLatitude              centroid latitude
 * @param centroidLongitude             centroid longitude
 * @param totalVisitCount               number of visits in the cluster
 * @param distinctDriverCount           number of distinct drivers in the cluster
 * @param actualDriverVisitCount        visits by the actual driver
 * @param otherDriverVisitCount         visits by other drivers
 * @param companyVisitSharePercent      company visit share in percent
 * @param actualDriverVisitSharePercent actual-driver visit share in percent
 * @param companyHome                   whether the cluster is a company home
 * @param actualDriverHome              whether the cluster is the actual driver's home
 */
public record DriverNdiLocationCluster(
        String clusterId,
        double centroidLatitude,
        double centroidLongitude,
        int totalVisitCount,
        int distinctDriverCount,
        int actualDriverVisitCount,
        int otherDriverVisitCount,
        double companyVisitSharePercent,
        double actualDriverVisitSharePercent,
        boolean companyHome,
        boolean actualDriverHome
) {
}
file mode 100644 index 0000000..d9ef705 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiLocationCorpusSnapshot.java @@ -0,0 +1,18 @@ +package at.procon.eventhub.processing.driverworkingtime.homeclassification.model; + +import java.time.Instant; +import java.util.List; + +public record DriverNdiLocationCorpusSnapshot( + String corpusKey, + int observationCountBeforeMerge, + int addedObservationCount, + int observationCountAfterMerge, + List observations, + Instant updatedAt, + Instant expiresAt +) { + public DriverNdiLocationCorpusSnapshot { + observations = observations == null ? List.of() : List.copyOf(observations); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiLocationObservation.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiLocationObservation.java new file mode 100644 index 0000000..f8de244 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/model/DriverNdiLocationObservation.java @@ -0,0 +1,67 @@ +package at.procon.eventhub.processing.driverworkingtime.homeclassification.model; + +import java.time.OffsetDateTime; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.UUID; + +public record DriverNdiLocationObservation( + String observationId, + String tenantKey, + List sourceSessionIds, + UUID compositeSessionId, + String driverKey, + OffsetDateTime startedAt, + OffsetDateTime endedAt, + long durationSeconds, + double latitude, + double longitude, + String locationSource, + String geoEventId, + String geoEventDomain, + Long geoDistanceSeconds, + String previousDrivingSourceIntervalId, + String nextDrivingSourceIntervalId, + String previousRegistrationKey, + String nextRegistrationKey, + String previousVehicleKey, + String nextVehicleKey +) { + public DriverNdiLocationObservation { + sourceSessionIds 
= sourceSessionIds == null ? List.of() : List.copyOf(sourceSessionIds); + } + + public DriverNdiLocationObservation mergeProvenance(DriverNdiLocationObservation other) { + if (other == null || !observationId.equals(other.observationId())) { + return this; + } + LinkedHashSet mergedSessionIds = new LinkedHashSet<>(sourceSessionIds); + mergedSessionIds.addAll(other.sourceSessionIds()); + return new DriverNdiLocationObservation( + observationId, + other.tenantKey() != null ? other.tenantKey() : tenantKey, + List.copyOf(mergedSessionIds), + other.compositeSessionId() != null ? other.compositeSessionId() : compositeSessionId, + other.driverKey() != null ? other.driverKey() : driverKey, + other.startedAt() != null ? other.startedAt() : startedAt, + other.endedAt() != null ? other.endedAt() : endedAt, + other.durationSeconds(), + other.latitude(), + other.longitude(), + other.locationSource() != null ? other.locationSource() : locationSource, + other.geoEventId() != null ? other.geoEventId() : geoEventId, + other.geoEventDomain() != null ? other.geoEventDomain() : geoEventDomain, + other.geoDistanceSeconds() != null ? other.geoDistanceSeconds() : geoDistanceSeconds, + other.previousDrivingSourceIntervalId() != null + ? other.previousDrivingSourceIntervalId() + : previousDrivingSourceIntervalId, + other.nextDrivingSourceIntervalId() != null + ? other.nextDrivingSourceIntervalId() + : nextDrivingSourceIntervalId, + other.previousRegistrationKey() != null ? other.previousRegistrationKey() : previousRegistrationKey, + other.nextRegistrationKey() != null ? other.nextRegistrationKey() : nextRegistrationKey, + other.previousVehicleKey() != null ? other.previousVehicleKey() : previousVehicleKey, + other.nextVehicleKey() != null ? 
other.nextVehicleKey() : nextVehicleKey + ); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiDbscanClusterer.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiDbscanClusterer.java new file mode 100644 index 0000000..5088325 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiDbscanClusterer.java @@ -0,0 +1,203 @@ +package at.procon.eventhub.processing.driverworkingtime.homeclassification.service; + +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationObservation; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.springframework.stereotype.Service; + +@Service +public class DriverNdiDbscanClusterer { + + public static final String NOISE_CLUSTER_ID = "NOISE"; + private static final int UNVISITED = Integer.MIN_VALUE; + private static final int NOISE = -1; + private static final double EARTH_RADIUS_METERS = 6_371_008.8d; + + public ClusterResult cluster( + List observations, + double epsilonMeters, + int minimumPoints + ) { + List points = observations == null ? List.of() : List.copyOf(observations); + double effectiveEpsilonMeters = Double.isFinite(epsilonMeters) + ? 
Math.max(0.0d, epsilonMeters) + : 0.0d; + int effectiveMinimumPoints = Math.max(1, minimumPoints); + if (points.isEmpty()) { + return new ClusterResult(Map.of(), Map.of()); + } + + int[] labels = new int[points.size()]; + java.util.Arrays.fill(labels, UNVISITED); + int clusterNumber = 0; + for (int pointIndex = 0; pointIndex < points.size(); pointIndex++) { + if (labels[pointIndex] != UNVISITED) { + continue; + } + List neighbours = neighbours(points, pointIndex, effectiveEpsilonMeters); + if (neighbours.size() < effectiveMinimumPoints) { + labels[pointIndex] = NOISE; + continue; + } + expandCluster(points, labels, pointIndex, neighbours, clusterNumber, effectiveEpsilonMeters, effectiveMinimumPoints); + clusterNumber++; + } + + Map> numericClusters = new HashMap<>(); + for (int index = 0; index < points.size(); index++) { + if (labels[index] >= 0) { + numericClusters.computeIfAbsent(labels[index], ignored -> new ArrayList<>()).add(points.get(index)); + } + } + + List sortedClusters = numericClusters.entrySet().stream() + .map(entry -> RawCluster.of(entry.getKey(), entry.getValue())) + .sorted(Comparator + .comparingDouble(RawCluster::centroidLatitude) + .thenComparingDouble(RawCluster::centroidLongitude) + .thenComparingInt(RawCluster::numericId)) + .toList(); + + Map clusterIdByNumericId = new HashMap<>(); + LinkedHashMap clusters = new LinkedHashMap<>(); + for (int index = 0; index < sortedClusters.size(); index++) { + RawCluster raw = sortedClusters.get(index); + String clusterId = "CLUSTER-" + String.format("%03d", index + 1); + clusterIdByNumericId.put(raw.numericId(), clusterId); + clusters.put(clusterId, new ClusterMembers( + clusterId, + raw.centroidLatitude(), + raw.centroidLongitude(), + raw.members() + )); + } + + LinkedHashMap assignmentByObservationId = new LinkedHashMap<>(); + for (int index = 0; index < points.size(); index++) { + String clusterId = labels[index] == NOISE + ? 
NOISE_CLUSTER_ID + : clusterIdByNumericId.get(labels[index]); + assignmentByObservationId.put(points.get(index).observationId(), clusterId); + } + + return new ClusterResult(Map.copyOf(assignmentByObservationId), Map.copyOf(clusters)); + } + + private void expandCluster( + List points, + int[] labels, + int seedIndex, + List seedNeighbours, + int clusterNumber, + double epsilonMeters, + int minimumPoints + ) { + labels[seedIndex] = clusterNumber; + Deque queue = new ArrayDeque<>(); + Set queued = new LinkedHashSet<>(); + for (Integer neighbour : seedNeighbours) { + if (neighbour != seedIndex && queued.add(neighbour)) { + queue.addLast(neighbour); + } + } + + while (!queue.isEmpty()) { + int candidate = queue.removeFirst(); + if (labels[candidate] == NOISE) { + labels[candidate] = clusterNumber; + } + if (labels[candidate] != UNVISITED) { + continue; + } + labels[candidate] = clusterNumber; + List candidateNeighbours = neighbours(points, candidate, epsilonMeters); + if (candidateNeighbours.size() >= minimumPoints) { + for (Integer neighbour : candidateNeighbours) { + if (queued.add(neighbour)) { + queue.addLast(neighbour); + } + } + } + } + } + + private List neighbours( + List points, + int pointIndex, + double epsilonMeters + ) { + DriverNdiLocationObservation source = points.get(pointIndex); + List neighbours = new ArrayList<>(); + for (int candidateIndex = 0; candidateIndex < points.size(); candidateIndex++) { + DriverNdiLocationObservation candidate = points.get(candidateIndex); + if (haversineMeters( + source.latitude(), + source.longitude(), + candidate.latitude(), + candidate.longitude() + ) <= epsilonMeters) { + neighbours.add(candidateIndex); + } + } + return neighbours; + } + + static double haversineMeters( + double latitudeA, + double longitudeA, + double latitudeB, + double longitudeB + ) { + double latitudeDelta = Math.toRadians(latitudeB - latitudeA); + double longitudeDelta = Math.toRadians(longitudeB - longitudeA); + double latitudeARadians = 
Math.toRadians(latitudeA); + double latitudeBRadians = Math.toRadians(latitudeB); + double haversine = Math.sin(latitudeDelta / 2.0d) * Math.sin(latitudeDelta / 2.0d) + + Math.cos(latitudeARadians) * Math.cos(latitudeBRadians) + * Math.sin(longitudeDelta / 2.0d) * Math.sin(longitudeDelta / 2.0d); + double normalizedHaversine = Math.max(0.0d, Math.min(1.0d, haversine)); + double angularDistance = 2.0d * Math.atan2( + Math.sqrt(normalizedHaversine), + Math.sqrt(1.0d - normalizedHaversine) + ); + return EARTH_RADIUS_METERS * angularDistance; + } + + public record ClusterResult( + Map assignmentByObservationId, + Map clusters + ) { + } + + public record ClusterMembers( + String clusterId, + double centroidLatitude, + double centroidLongitude, + List members + ) { + public ClusterMembers { + members = members == null ? List.of() : List.copyOf(members); + } + } + + private record RawCluster( + int numericId, + double centroidLatitude, + double centroidLongitude, + List members + ) { + private static RawCluster of(int numericId, List members) { + double latitude = members.stream().mapToDouble(DriverNdiLocationObservation::latitude).average().orElse(0.0d); + double longitude = members.stream().mapToDouble(DriverNdiLocationObservation::longitude).average().orElse(0.0d); + return new RawCluster(numericId, latitude, longitude, List.copyOf(members)); + } + } +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiHomeClassificationService.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiHomeClassificationService.java new file mode 100644 index 0000000..7b7d9c9 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiHomeClassificationService.java @@ -0,0 +1,572 @@ +package at.procon.eventhub.processing.driverworkingtime.homeclassification.service; + +import at.procon.eventhub.config.EventHubProperties; 
+import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassification; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationReason; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationResult; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeStatus; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationCluster; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationCorpusSnapshot; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationObservation; +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput; +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeRestCoverageInterval; +import at.procon.eventhub.processing.driverworkingtime.service.DriverWorkingTimeReusableProjectionBuilder; +import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import org.springframework.stereotype.Service; + +@Service +public class DriverNdiHomeClassificationService { + + private static final String 
ALGORITHM_VERSION = "NDI_HOME_V1"; + + private final DriverWorkingTimeReusableProjectionBuilder projectionBuilder; + private final DriverNdiLocationCorpusCache locationCorpusCache; + private final DriverNdiDbscanClusterer clusterer; + private final EventHubProperties properties; + + public DriverNdiHomeClassificationService( + DriverWorkingTimeReusableProjectionBuilder projectionBuilder, + DriverNdiLocationCorpusCache locationCorpusCache, + DriverNdiDbscanClusterer clusterer, + EventHubProperties properties + ) { + this.projectionBuilder = projectionBuilder; + this.locationCorpusCache = locationCorpusCache; + this.clusterer = clusterer; + this.properties = properties; + } + + public DriverNdiHomeClassificationScopeResult classifyPreparedInputs( + UnifiedRuntimeProcessingApiRequest request, + Map preparedInputs + ) { + LinkedHashMap> evidenceByDriver = new LinkedHashMap<>(); + if (preparedInputs != null) { + preparedInputs.entrySet().stream() + .filter(entry -> entry.getKey() != null + && entry.getValue() != null + && entry.getValue().processingInput() != null) + .sorted(Map.Entry.comparingByKey()) + .forEach(entry -> evidenceByDriver.put( + entry.getKey(), + projectionBuilder.buildAllNonDrivingIntervalCoverage(entry.getValue().processingInput()) + )); + } + return classifyEvidence(request, evidenceByDriver); + } + + public DriverNdiHomeClassificationScopeResult classifyEvidence( + UnifiedRuntimeProcessingApiRequest request, + Map> evidenceByDriver + ) { + EventHubProperties.Processing settings = properties.getRuntimeProcessing(); + long longThresholdSeconds = settings.getNdiLongMinutes() * 60L; + long veryLongThresholdSeconds = settings.getNdiVeryLongMinutes() * 60L; + double cardRemovalThresholdPercent = settings.getNdiCardRemovalPercent(); + double visitShareThresholdPercent = settings.getNdiVisitSharePercent(); + + List sourceSessionIds = sourceSessionIds(request); + UUID compositeSessionId = request == null ? 
null : request.compositeSessionId(); + String tenantKey = normalizedTenantKey(request == null ? null : request.tenantKey()); + String corpusKey = corpusKey( + request, + tenantKey, + settings.getNdiLocationCacheNamespace(), + settings.getNdiLongMinutes() + ); + + LinkedHashMap> safeEvidenceByDriver = new LinkedHashMap<>(); + if (evidenceByDriver != null) { + evidenceByDriver.entrySet().stream() + .filter(entry -> entry.getKey() != null && !entry.getKey().isBlank()) + .sorted(Map.Entry.comparingByKey()) + .forEach(entry -> safeEvidenceByDriver.put( + entry.getKey(), + safeList(entry.getValue()).stream() + .filter(Objects::nonNull) + .sorted(Comparator + .comparing(DriverWorkingTimeRestCoverageInterval::startedAt, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(DriverWorkingTimeRestCoverageInterval::endedAt, Comparator.nullsLast(Comparator.naturalOrder()))) + .toList() + )); + } + + List currentObservations = new ArrayList<>(); + Map currentObservationByIntervalId = new HashMap<>(); + for (Map.Entry> entry : safeEvidenceByDriver.entrySet()) { + for (DriverWorkingTimeRestCoverageInterval evidence : entry.getValue()) { + if (evidence.durationSeconds() <= longThresholdSeconds) { + continue; + } + ResolvedPosition position = resolvePosition(evidence); + if (position == null) { + continue; + } + String intervalId = intervalId(evidence); + DriverNdiLocationObservation observation = new DriverNdiLocationObservation( + observationId(tenantKey, intervalId), + tenantKey, + observationSourceSessionIds(evidence.sessionId(), sourceSessionIds), + compositeSessionId, + entry.getKey(), + evidence.startedAt(), + evidence.endedAt(), + evidence.durationSeconds(), + position.latitude(), + position.longitude(), + position.source(), + position.geoEventId(), + position.geoEventDomain(), + position.geoDistanceSeconds(), + evidence.previousDrivingSourceIntervalId(), + evidence.nextDrivingSourceIntervalId(), + evidence.previousRegistrationKey(), + 
evidence.nextRegistrationKey(), + evidence.previousVehicleKey(), + evidence.nextVehicleKey() + ); + currentObservations.add(observation); + currentObservationByIntervalId.put(intervalId, observation); + } + } + + DriverNdiLocationCorpusSnapshot corpus = locationCorpusCache.merge(corpusKey, currentObservations); + DriverNdiDbscanClusterer.ClusterResult clusterResult = clusterer.cluster( + corpus.observations(), + settings.getNdiDbscanEpsMeters(), + settings.getNdiDbscanMinPoints() + ); + + Set companyHomeClusterIds = determineCompanyHomeClusters( + clusterResult, + corpus.observations().size(), + visitShareThresholdPercent + ); + + LinkedHashMap driverResults = new LinkedHashMap<>(); + for (Map.Entry> entry : safeEvidenceByDriver.entrySet()) { + String driverKey = entry.getKey(); + List cachedDriverObservations = corpus.observations().stream() + .filter(observation -> Objects.equals(driverKey, observation.driverKey())) + .toList(); + Set driverHomeClusterIds = determineDriverHomeClusters( + driverKey, + clusterResult, + cachedDriverObservations.size(), + companyHomeClusterIds, + visitShareThresholdPercent + ); + + List driverClusters = buildDriverClusterView( + driverKey, + clusterResult, + corpus.observations().size(), + cachedDriverObservations.size(), + companyHomeClusterIds, + driverHomeClusterIds + ); + + List classifications = entry.getValue().stream() + .map(evidence -> classifyInterval( + evidence, + currentObservationByIntervalId.get(intervalId(evidence)), + clusterResult.assignmentByObservationId(), + companyHomeClusterIds, + driverHomeClusterIds, + longThresholdSeconds, + veryLongThresholdSeconds, + cardRemovalThresholdPercent + )) + .toList(); + + int currentDriverObservationCount = (int) currentObservations.stream() + .filter(observation -> Objects.equals(driverKey, observation.driverKey())) + .count(); + int cachedOtherDriverObservationCount = corpus.observations().size() - cachedDriverObservations.size(); + List notes = List.of( + "NDI classification 
used ordered rules from ndi_home_classification_en.md.", + "Location learning used " + cachedDriverObservations.size() + " cached observation(s) for the actual driver and " + + cachedOtherDriverObservationCount + " observation(s) from other drivers.", + "DBSCAN parameters: eps=" + settings.getNdiDbscanEpsMeters() + "m, minPoints=" + + settings.getNdiDbscanMinPoints() + "." + ); + driverResults.put(driverKey, new DriverNdiHomeClassificationResult( + driverKey, + classifications.size(), + currentDriverObservationCount, + cachedDriverObservations.size(), + cachedOtherDriverObservationCount, + companyHomeClusterIds.size(), + driverHomeClusterIds.size(), + classifications, + driverClusters, + companyHomeClusterIds, + driverHomeClusterIds, + notes + )); + } + + List notes = new ArrayList<>(); + notes.add("NDI location corpus " + corpus.corpusKey() + " contains " + corpus.observationCountAfterMerge() + + " observation(s); " + corpus.addedObservationCount() + " were added by this execution."); + notes.add("Current execution contributed " + currentObservations.size() + " long NDI location observation(s) from " + + safeEvidenceByDriver.size() + " driver(s)."); + if (safeEvidenceByDriver.size() <= 1) { + notes.add("Only one driver was present in the current execution; company-home detection may still use cached observations from other drivers."); + } + + return new DriverNdiHomeClassificationScopeResult( + corpus.corpusKey(), + currentObservations.size(), + corpus.observationCountAfterMerge(), + clusterResult.clusters().size(), + driverResults, + notes + ); + } + + private DriverNdiHomeClassification classifyInterval( + DriverWorkingTimeRestCoverageInterval evidence, + DriverNdiLocationObservation observation, + Map assignmentByObservationId, + Set companyHomeClusterIds, + Set driverHomeClusterIds, + long longThresholdSeconds, + long veryLongThresholdSeconds, + double cardRemovalThresholdPercent + ) { + ResolvedPosition position = resolvePosition(evidence); + String clusterId 
= observation == null ? null : assignmentByObservationId.get(observation.observationId()); + boolean clusterNoise = DriverNdiDbscanClusterer.NOISE_CLUSTER_ID.equals(clusterId); + + DriverNdiHomeStatus status; + DriverNdiHomeClassificationReason reason; + if (vehicleChanged(evidence)) { + status = DriverNdiHomeStatus.HOME; + reason = DriverNdiHomeClassificationReason.VEHICLE_CHANGED; + } else if (evidence.cardAbsentCoveragePercent() > cardRemovalThresholdPercent) { + status = DriverNdiHomeStatus.HOME; + reason = DriverNdiHomeClassificationReason.CARD_REMOVED_OVER_THRESHOLD; + } else if (evidence.durationSeconds() > veryLongThresholdSeconds) { + status = DriverNdiHomeStatus.HOME; + reason = DriverNdiHomeClassificationReason.REST_OVER_VERY_LONG_THRESHOLD; + } else if (position == null) { + if (evidence.durationSeconds() > longThresholdSeconds) { + status = DriverNdiHomeStatus.HOME; + reason = DriverNdiHomeClassificationReason.NO_POSITION_LONG_REST; + } else { + status = DriverNdiHomeStatus.NOT_HOME; + reason = DriverNdiHomeClassificationReason.NO_POSITION_SHORT_REST; + } + } else if (evidence.durationSeconds() > longThresholdSeconds) { + if (clusterId != null && companyHomeClusterIds.contains(clusterId)) { + status = DriverNdiHomeStatus.HOME; + reason = DriverNdiHomeClassificationReason.COMPANY_HOME_CLUSTER; + } else if (clusterId != null && driverHomeClusterIds.contains(clusterId)) { + status = DriverNdiHomeStatus.HOME; + reason = DriverNdiHomeClassificationReason.DRIVER_HOME_CLUSTER; + } else { + status = DriverNdiHomeStatus.NOT_HOME; + reason = DriverNdiHomeClassificationReason.LONG_REST_OUTSIDE_HOME_CLUSTER; + } + } else { + status = DriverNdiHomeStatus.NOT_HOME; + reason = DriverNdiHomeClassificationReason.SHORT_REST; + } + + return new DriverNdiHomeClassification( + intervalId(evidence), + evidence, + position == null ? null : position.latitude(), + position == null ? null : position.longitude(), + position == null ? 
null : position.source(), + clusterId, + clusterNoise, + status, + reason + ); + } + + private Set determineCompanyHomeClusters( + DriverNdiDbscanClusterer.ClusterResult clusterResult, + int totalObservationCount, + double visitShareThresholdPercent + ) { + if (totalObservationCount <= 0) { + return Set.of(); + } + LinkedHashSet result = new LinkedHashSet<>(); + clusterResult.clusters().values().stream() + .sorted(Comparator.comparing(DriverNdiDbscanClusterer.ClusterMembers::clusterId)) + .forEach(cluster -> { + double share = cluster.members().size() * 100.0d / totalObservationCount; + if (share > visitShareThresholdPercent) { + result.add(cluster.clusterId()); + } + }); + return Collections.unmodifiableSet(result); + } + + private Set determineDriverHomeClusters( + String driverKey, + DriverNdiDbscanClusterer.ClusterResult clusterResult, + int driverObservationCount, + Set companyHomeClusterIds, + double visitShareThresholdPercent + ) { + if (driverObservationCount <= 0) { + return Set.of(); + } + LinkedHashSet result = new LinkedHashSet<>(); + clusterResult.clusters().values().stream() + .sorted(Comparator.comparing(DriverNdiDbscanClusterer.ClusterMembers::clusterId)) + .forEach(cluster -> { + long driverVisits = cluster.members().stream() + .filter(observation -> Objects.equals(driverKey, observation.driverKey())) + .count(); + double share = driverVisits * 100.0d / driverObservationCount; + if (share > visitShareThresholdPercent && !companyHomeClusterIds.contains(cluster.clusterId())) { + result.add(cluster.clusterId()); + } + }); + return Collections.unmodifiableSet(result); + } + + private List buildDriverClusterView( + String driverKey, + DriverNdiDbscanClusterer.ClusterResult clusterResult, + int totalObservationCount, + int driverObservationCount, + Set companyHomeClusterIds, + Set driverHomeClusterIds + ) { + return clusterResult.clusters().values().stream() + .sorted(Comparator.comparing(DriverNdiDbscanClusterer.ClusterMembers::clusterId)) + 
.map(cluster -> { + int actualDriverVisits = (int) cluster.members().stream() + .filter(observation -> Objects.equals(driverKey, observation.driverKey())) + .count(); + int totalVisits = cluster.members().size(); + int distinctDriverCount = (int) cluster.members().stream() + .map(DriverNdiLocationObservation::driverKey) + .filter(Objects::nonNull) + .distinct() + .count(); + return new DriverNdiLocationCluster( + cluster.clusterId(), + cluster.centroidLatitude(), + cluster.centroidLongitude(), + totalVisits, + distinctDriverCount, + actualDriverVisits, + totalVisits - actualDriverVisits, + totalObservationCount == 0 ? 0.0d : totalVisits * 100.0d / totalObservationCount, + driverObservationCount == 0 ? 0.0d : actualDriverVisits * 100.0d / driverObservationCount, + companyHomeClusterIds.contains(cluster.clusterId()), + driverHomeClusterIds.contains(cluster.clusterId()) + ); + }) + .toList(); + } + + private boolean vehicleChanged(DriverWorkingTimeRestCoverageInterval evidence) { + if (known(evidence.previousVehicleKey()) && known(evidence.nextVehicleKey())) { + return knownAndDifferent(evidence.previousVehicleKey(), evidence.nextVehicleKey()); + } + return knownAndDifferent(evidence.previousRegistrationKey(), evidence.nextRegistrationKey()); + } + + private boolean known(String value) { + return value != null && !value.isBlank(); + } + + private boolean knownAndDifferent(String left, String right) { + return left != null && !left.isBlank() + && right != null && !right.isBlank() + && !left.trim().equalsIgnoreCase(right.trim()); + } + + private ResolvedPosition resolvePosition(DriverWorkingTimeRestCoverageInterval evidence) { + if (validCoordinates(evidence.beginLatitude(), evidence.beginLongitude())) { + return new ResolvedPosition( + evidence.beginLatitude(), + evidence.beginLongitude(), + "PREVIOUS_DRIVE_END", + evidence.beginGeoEventId(), + evidence.beginGeoEventDomain(), + evidence.beginGeoDistanceSeconds() + ); + } + if (validCoordinates(evidence.endLatitude(), 
evidence.endLongitude())) { + return new ResolvedPosition( + evidence.endLatitude(), + evidence.endLongitude(), + "NEXT_DRIVE_START", + evidence.endGeoEventId(), + evidence.endGeoEventDomain(), + evidence.endGeoDistanceSeconds() + ); + } + return null; + } + + private boolean validCoordinates(Double latitude, Double longitude) { + return latitude != null + && longitude != null + && Double.isFinite(latitude) + && Double.isFinite(longitude) + && latitude >= -90.0d + && latitude <= 90.0d + && longitude >= -180.0d + && longitude <= 180.0d; + } + + private List observationSourceSessionIds( + UUID evidenceSessionId, + List requestSessionIds + ) { + if (evidenceSessionId != null) { + return List.of(evidenceSessionId); + } + return requestSessionIds == null ? List.of() : requestSessionIds; + } + + private List sourceSessionIds(UnifiedRuntimeProcessingApiRequest request) { + if (request == null) { + return List.of(); + } + LinkedHashSet values = new LinkedHashSet<>(); + if (request.sessionId() != null) { + values.add(request.sessionId()); + } + if (request.sessionIds() != null) { + request.sessionIds().stream().filter(Objects::nonNull).sorted().forEach(values::add); + } + if (request.sourceInputs() != null) { + request.sourceInputs().stream().filter(Objects::nonNull).forEach(sourceInput -> { + if (sourceInput.sessionId() != null) { + values.add(sourceInput.sessionId()); + } + if (sourceInput.sessionIds() != null) { + sourceInput.sessionIds().stream().filter(Objects::nonNull).sorted().forEach(values::add); + } + }); + } + return List.copyOf(values); + } + + private String corpusKey( + UnifiedRuntimeProcessingApiRequest request, + String tenantKey, + String cacheNamespace, + int longThresholdMinutes + ) { + boolean fileSessionCorpus = isFileSessionOnly(request); + if (!fileSessionCorpus) { + return tenantKey + "|RUNTIME|" + ALGORITHM_VERSION + "|LONG=" + longThresholdMinutes; + } + + String scope; + if (request != null && request.tenantKey() != null && 
!request.tenantKey().isBlank()) { + scope = "TENANT=" + tenantKey; + } else { + String normalizedNamespace = cacheNamespace == null || cacheNamespace.isBlank() + ? "default" + : cacheNamespace.trim(); + scope = "NAMESPACE=" + normalizedNamespace; + } + return scope + "|FILE_SESSION|" + ALGORITHM_VERSION + "|LONG=" + longThresholdMinutes; + } + + private boolean isFileSessionOnly(UnifiedRuntimeProcessingApiRequest request) { + if (request == null) { + return false; + } + if (request.sourceInputs() != null && !request.sourceInputs().isEmpty()) { + List sourceInputs = + request.sourceInputs().stream().filter(Objects::nonNull).toList(); + return !sourceInputs.isEmpty() && sourceInputs.stream().allMatch(sourceInput -> + sourceInput.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION); + } + if (request.sourceFamilies() != null && !request.sourceFamilies().isEmpty()) { + return request.sourceFamilies().stream().allMatch(sourceFamily -> + sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION); + } + return request.sessionId() != null + || (request.sessionIds() != null && !request.sessionIds().isEmpty()) + || request.compositeSessionId() != null; + } + + private String normalizedTenantKey(String tenantKey) { + return tenantKey == null || tenantKey.isBlank() ? 
"DEFAULT" : tenantKey.trim(); + } + + private String intervalId(DriverWorkingTimeRestCoverageInterval evidence) { + String raw = String.join("|", + nullSafe(evidence.driverKey()), + nullSafe(evidence.startedAt()), + nullSafe(evidence.endedAt()), + nullSafe(evidence.previousDrivingSourceIntervalId()), + nullSafe(evidence.nextDrivingSourceIntervalId()), + nullSafe(evidence.previousVehicleKey()), + nullSafe(evidence.nextVehicleKey()), + nullSafe(evidence.previousRegistrationKey()), + nullSafe(evidence.nextRegistrationKey()) + ); + return "NDI-" + shortHash(raw); + } + + private String observationId(String tenantKey, String intervalId) { + return "NDI-LOC-" + shortHash(tenantKey + "|" + intervalId); + } + + private String shortHash(String value) { + try { + byte[] digest = MessageDigest.getInstance("SHA-256") + .digest(value.getBytes(StandardCharsets.UTF_8)); + StringBuilder result = new StringBuilder(24); + for (int index = 0; index < 12; index++) { + result.append(String.format(Locale.ROOT, "%02x", digest[index])); + } + return result.toString(); + } catch (NoSuchAlgorithmException ex) { + throw new IllegalStateException("SHA-256 is not available", ex); + } + } + + private String nullSafe(Object value) { + return value == null ? "" : value.toString(); + } + + private List safeList(List values) { + return values == null ? 
List.of() : values; + } + + private record ResolvedPosition( + double latitude, + double longitude, + String source, + String geoEventId, + String geoEventDomain, + Long geoDistanceSeconds + ) { + } +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiLocationCorpusCache.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiLocationCorpusCache.java new file mode 100644 index 0000000..948f986 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiLocationCorpusCache.java @@ -0,0 +1,119 @@ +package at.procon.eventhub.processing.driverworkingtime.homeclassification.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationCorpusSnapshot; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationObservation; +import java.time.Duration; +import java.time.Instant; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.springframework.stereotype.Service; + +@Service +public class DriverNdiLocationCorpusCache { + + private final EventHubProperties properties; + private final ConcurrentMap corpora = new ConcurrentHashMap<>(); + + public DriverNdiLocationCorpusCache(EventHubProperties properties) { + this.properties = properties; + } + + public DriverNdiLocationCorpusSnapshot merge( + String corpusKey, + List newObservations + ) { + String normalizedKey = normalizeCorpusKey(corpusKey); + Instant now = Instant.now(); + Duration ttl = properties.getRuntimeProcessing().getNdiLocationCacheTtl(); + int maxObservations = properties.getRuntimeProcessing().getNdiLocationCacheMaxObservations(); + MergeMetrics 
metrics = new MergeMetrics(); + + CachedCorpus updated = corpora.compute(normalizedKey, (key, existing) -> { + LinkedHashMap observations = new LinkedHashMap<>(); + if (existing != null && existing.expiresAt().isAfter(now)) { + observations.putAll(existing.observationsById()); + } + metrics.before = observations.size(); + + for (DriverNdiLocationObservation observation : safeList(newObservations)) { + if (observation == null || observation.observationId() == null || observation.observationId().isBlank()) { + continue; + } + DriverNdiLocationObservation previous = observations.get(observation.observationId()); + if (previous == null) { + observations.put(observation.observationId(), observation); + metrics.added++; + } else { + observations.put(observation.observationId(), previous.mergeProvenance(observation)); + } + } + + if (observations.size() > maxObservations) { + List retained = observations.values().stream() + .sorted(Comparator + .comparing(DriverNdiLocationObservation::endedAt, Comparator.nullsFirst(Comparator.naturalOrder())) + .reversed() + .thenComparing(DriverNdiLocationObservation::observationId)) + .limit(maxObservations) + .toList(); + observations.clear(); + retained.stream() + .sorted(Comparator + .comparing(DriverNdiLocationObservation::startedAt, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(DriverNdiLocationObservation::observationId)) + .forEach(value -> observations.put(value.observationId(), value)); + } + + return new CachedCorpus( + Map.copyOf(observations), + now, + now.plus(ttl) + ); + }); + + List sorted = updated.observationsById().values().stream() + .sorted(Comparator + .comparing(DriverNdiLocationObservation::startedAt, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(DriverNdiLocationObservation::driverKey, Comparator.nullsLast(String::compareTo)) + .thenComparing(DriverNdiLocationObservation::observationId)) + .toList(); + return new DriverNdiLocationCorpusSnapshot( + normalizedKey, + 
metrics.before, + metrics.added, + sorted.size(), + sorted, + updated.updatedAt(), + updated.expiresAt() + ); + } + + public void clear() { + corpora.clear(); + } + + private String normalizeCorpusKey(String corpusKey) { + return corpusKey == null || corpusKey.isBlank() ? "DEFAULT|NDI_HOME_V1" : corpusKey.trim(); + } + + private List safeList(List values) { + return values == null ? List.of() : values; + } + + private record CachedCorpus( + Map observationsById, + Instant updatedAt, + Instant expiresAt + ) { + } + + private static final class MergeMetrics { + private int before; + private int added; + } +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java index ef46d54..f4faaf8 100644 --- a/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java @@ -111,6 +111,27 @@ public class DriverWorkingTimeReusableProjectionBuilder { ); } + /** + * Reuses the complete card-absence and boundary-GNSS enrichment pipeline for every + * positive non-driving interval. Legacy daily/weekly-rest projections keep their + * configured minimum-rest threshold and are not changed by this method. 
+ */ + public List buildAllNonDrivingIntervalCoverage( + DriverWorkingTimeProcessingInput input + ) { + if (input == null) { + return List.of(); + } + DriverWorkingTimeDerivedProjectionBundle allNonDrivingProjection = buildDerivedProjectionBundle( + buildActivityIntervalInputEvents(input.activityIntervals()), + buildVehicleUsageIntervalInputEventsCommon(input.vehicleUsageIntervals()), + buildSupportGeoInputEventsCommon(input.sessionId(), input.supportEvidenceEvents()), + input.significantDrivingMinutes(), + 0 + ); + return allNonDrivingProjection.dailyWeeklyRestCandidateCoverageIntervals(); + } + private DriverWorkingTimeDerivedProjectionBundle buildDerivedProjectionBundle( List> activityInputEvents, List> vehicleUsageInputEvents, @@ -910,7 +931,7 @@ public class DriverWorkingTimeReusableProjectionBuilder { ) .replace( "${MINIMUM_REST_PERIOD_THRESHOLD_SECONDS}", - Long.toString(Math.max(1, minimumRestPeriodMinutes) * 60L) + Long.toString(Math.max(0, minimumRestPeriodMinutes) * 60L) ) .replace( "${REST_GEO_LOOKBACK_SECONDS}", diff --git a/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeDerivedProjectionResultDto.java b/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeDerivedProjectionResultDto.java index d1c774a..bc51db8 100644 --- a/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeDerivedProjectionResultDto.java +++ b/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeDerivedProjectionResultDto.java @@ -1,8 +1,10 @@ package at.procon.eventhub.processing.dto; +import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationResult; import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; -import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto; 
+import com.fasterxml.jackson.annotation.JsonInclude; import java.util.List; public record UnifiedRuntimeDerivedProjectionResultDto( @@ -15,7 +17,9 @@ public record UnifiedRuntimeDerivedProjectionResultDto( DriverWorkingTimeProcessingResultDto projection, List notes, RuntimeSupportEvidenceNormalizationDebugDto supportEvidenceNormalization, - RuntimeDriverPartitionDebugDto partitionDebug + RuntimeDriverPartitionDebugDto partitionDebug, + @JsonInclude(JsonInclude.Include.NON_NULL) + DriverNdiHomeClassificationResult ndiHomeClassification ) { public UnifiedRuntimeDerivedProjectionResultDto { discoveredVehicles = discoveredVehicles == null ? List.of() : List.copyOf(discoveredVehicles); @@ -42,6 +46,7 @@ public record UnifiedRuntimeDerivedProjectionResultDto( projection, notes, null, + null, null ); } @@ -67,11 +72,37 @@ public record UnifiedRuntimeDerivedProjectionResultDto( projection, notes, supportEvidenceNormalization, + null, null ); } - + public UnifiedRuntimeDerivedProjectionResultDto( + UnifiedRuntimeProcessingRequest request, + int driverSeedEventCount, + int discoveredVehicleCount, + int expandedVehicleEventCount, + int mergedEventCount, + List discoveredVehicles, + DriverWorkingTimeProcessingResultDto projection, + List notes, + RuntimeSupportEvidenceNormalizationDebugDto supportEvidenceNormalization, + RuntimeDriverPartitionDebugDto partitionDebug + ) { + this( + request, + driverSeedEventCount, + discoveredVehicleCount, + expandedVehicleEventCount, + mergedEventCount, + discoveredVehicles, + projection, + notes, + supportEvidenceNormalization, + partitionDebug, + null + ); + } public UnifiedRuntimeDerivedProjectionResultDto withPartitionDebug(RuntimeDriverPartitionDebugDto debug) { return new UnifiedRuntimeDerivedProjectionResultDto( @@ -84,7 +115,8 @@ public record UnifiedRuntimeDerivedProjectionResultDto( projection, notes, supportEvidenceNormalization, - debug + debug, + ndiHomeClassification ); } } diff --git 
a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java index cf45a36..3e0292d 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java @@ -1,6 +1,7 @@ package at.procon.eventhub.processing.eventprocessing.module; import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput; import at.procon.eventhub.processing.dto.UnifiedRuntimeDerivedProjectionResultDto; import at.procon.eventhub.processing.dto.UnifiedRuntimeDriverWorkingTimeScopeResultDto; @@ -72,7 +73,8 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE, DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION ), - Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map"), + Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", + "Map"), Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto") ); } @@ -86,6 +88,8 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess UnifiedRuntimeEventBundle broadBundle = runtimeEventBundle(context); UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context); Map preparedInputs = preparedInputs(context); + DriverNdiHomeClassificationScopeResult ndiHomeClassificationScope = + optionalNdiHomeClassificationScope(context); LinkedHashMap driverResults = new LinkedHashMap<>(); List warnings = new 
ArrayList<>(); @@ -119,13 +123,19 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess projection, projection.notes(), preparedInput.partition().supportEvidenceNormalization(), - preparedInput.partition().partitionDebug() + preparedInput.partition().partitionDebug(), + ndiHomeClassificationScope == null + ? null + : ndiHomeClassificationScope.resultForDriver(preparedInput.driverKey()) )); } List notes = new ArrayList<>(broadBundle.notes()); notes.add("Runtime driver working-time processing used module-to-module dataflow for event assembly, activity intervalization, vehicle-usage intervalization, evidence attachment, support evidence normalization, and final derived projections."); notes.add("Selected driver partitions: " + driverResults.size() + "."); + if (ndiHomeClassificationScope != null) { + notes.addAll(ndiHomeClassificationScope.notes()); + } UnifiedRuntimeDriverWorkingTimeScopeResultDto result = new UnifiedRuntimeDriverWorkingTimeScopeResultDto( broadBundle.request(), @@ -194,6 +204,17 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess + " requires previous result " + DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY + "."); } + private DriverNdiHomeClassificationScopeResult optionalNdiHomeClassificationScope( + RuntimeProcessingModuleContext context + ) { + RuntimeProcessingModuleResult result = + context.previousResults().get(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION); + if (result != null && result.output() instanceof DriverNdiHomeClassificationScopeResult scopeResult) { + return scopeResult; + } + return null; + } + private UnifiedRuntimeProcessingApiRequest scopeRequest(RuntimeProcessingModuleContext context) { Object value = context.attributes().get("runtimeScopeApiRequest"); if (value instanceof UnifiedRuntimeProcessingApiRequest request) { diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java 
b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java index 2af8da4..9f284f3 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java @@ -10,6 +10,7 @@ public final class DriverWorkingTimeModuleKeys { public static final String VEHICLE_USAGE_MERGE = "vehicle-usage-merge"; public static final String VEHICLE_EVIDENCE_ATTACHMENT = "vehicle-evidence-attachment"; public static final String SUPPORT_EVIDENCE_NORMALIZATION = "support-evidence-normalization"; + public static final String NDI_HOME_CLASSIFICATION = "ndi-home-classification"; public static final String DRIVING_DERIVED_PROJECTIONS = "driving-derived-projections"; private DriverWorkingTimeModuleKeys() { diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/NdiHomeClassificationModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/NdiHomeClassificationModule.java new file mode 100644 index 0000000..4d62285 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/NdiHomeClassificationModule.java @@ -0,0 +1,79 @@ +package at.procon.eventhub.processing.eventprocessing.module; + +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.service.DriverNdiHomeClassificationService; +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput; +import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; +import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import org.springframework.stereotype.Component; + +@Component +public 
class NdiHomeClassificationModule implements RuntimeProcessingModule { + + private final DriverNdiHomeClassificationService classificationService; + + public NdiHomeClassificationModule(DriverNdiHomeClassificationService classificationService) { + this.classificationService = classificationService; + } + + @Override + public String moduleKey() { + return DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION; + } + + @Override + public RuntimeProcessingModuleDescriptorDto descriptor() { + return new RuntimeProcessingModuleDescriptorDto( + moduleKey(), + "NDI home classification", + "Builds enriched non-driving intervals, accumulates long-rest locations in a driver-aware cache, applies Haversine DBSCAN, and classifies HOME/NOT_HOME using the ordered NDI rules.", + "ESPER+JAVA", + Set.of(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION), + Set.of("Map"), + Set.of("DriverNdiHomeClassificationScopeResult") + ); + } + + @Override + public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) { + Map preparedInputs = preparedInputs(context); + UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context); + DriverNdiHomeClassificationScopeResult result = classificationService.classifyPreparedInputs( + scopeRequest, + preparedInputs + ); + Map metadata = new LinkedHashMap<>(); + metadata.put("corpusKey", result.corpusKey()); + metadata.put("currentObservationCount", result.currentObservationCount()); + metadata.put("cachedObservationCount", result.cachedObservationCount()); + metadata.put("clusterCount", result.clusterCount()); + metadata.put("driverResultCount", result.driverResults().size()); + return new RuntimeProcessingModuleResult( + moduleKey(), + RuntimeProcessingModuleStatus.SUCCESS, + result, + metadata, + java.util.List.of() + ); + } + + private UnifiedRuntimeProcessingApiRequest scopeRequest(RuntimeProcessingModuleContext context) { + Object value = context.attributes().get("runtimeScopeApiRequest"); + if (value 
instanceof UnifiedRuntimeProcessingApiRequest request) { + return request; + } + return context.request().sourceSelection(); + } + + @SuppressWarnings("unchecked") + private Map preparedInputs(RuntimeProcessingModuleContext context) { + Object output = context.requireResult(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION).output(); + if (output instanceof Map map) { + return (Map) map; + } + return Map.of(); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverHomeClassificationRuntimeProcessingPlan.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverHomeClassificationRuntimeProcessingPlan.java new file mode 100644 index 0000000..fc7cfee --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverHomeClassificationRuntimeProcessingPlan.java @@ -0,0 +1,121 @@ +package at.procon.eventhub.processing.eventprocessing.plan; + +import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeModuleKeys; +import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.springframework.stereotype.Component; + +/** + * Dedicated public processing-plan entry point for NDI HOME/NOT_HOME classification. + * + *

<p>The implementation deliberately delegates to the shared driver-working-time pipeline so + * activity intervalization, vehicle-usage reconciliation, support evidence normalization, and + * reusable NDI enrichment keep one source-neutral implementation.

+ */ +@Component +public class DriverHomeClassificationRuntimeProcessingPlan implements RuntimeProcessingPlan { + + public static final String PLAN_KEY = "driver-home-classification-v1"; + + private final DriverWorkingTimeRuntimeProcessingPlan delegate; + + public DriverHomeClassificationRuntimeProcessingPlan(DriverWorkingTimeRuntimeProcessingPlan delegate) { + this.delegate = delegate; + } + + @Override + public String processingPlanKey() { + return PLAN_KEY; + } + + @Override + public RuntimeEventPartitioningStrategy defaultPartitioningStrategy() { + return delegate.defaultPartitioningStrategy(); + } + + @Override + public String displayName() { + return "Driver NDI home classification"; + } + + @Override + public String description() { + return "Builds enriched non-driving intervals, learns company and driver home locations " + + "from one or more tachograph file sessions, and applies ordered HOME/NOT_HOME rules."; + } + + @Override + public List supportedPartitioningStrategies() { + return delegate.supportedPartitioningStrategies(); + } + + @Override + public List modules() { + return delegate.modules(); + } + + @Override + public Set requiredParameters() { + return delegate.requiredParameters(); + } + + @Override + public Set optionalParameters() { + return delegate.optionalParameters(); + } + + @Override + public RuntimeProcessingExecutionResultDto execute(RuntimeProcessingExecutionApiRequest request) { + RuntimeProcessingExecutionApiRequest delegatedRequest = prepareDelegatedRequest(request); + RuntimeProcessingExecutionResultDto result = delegate.execute(delegatedRequest); + return new RuntimeProcessingExecutionResultDto( + PLAN_KEY, + result.executedModules(), + result.partitioningStrategy(), + result.request(), + result.inputEventCount(), + result.selectedPartitionCount(), + result.discoveredVehicleCount(), + result.discoveredVehicles(), + result.moduleResults(), + result.partitionResults(), + result.notes(), + result.warnings() + ); + } + + 
RuntimeProcessingExecutionApiRequest prepareDelegatedRequest(RuntimeProcessingExecutionApiRequest request) { + List modules = new ArrayList<>(); + if (request.modules() != null) { + request.modules().stream() + .filter(value -> value != null && !value.isBlank()) + .map(String::trim) + .forEach(modules::add); + } + modules.removeIf(moduleKey -> DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION.equals(moduleKey) + || DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS.equals(moduleKey)); + modules.add(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION); + modules.add(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS); + + Map parameters = new LinkedHashMap<>(); + if (request.parameters() != null) { + parameters.putAll(request.parameters()); + } + parameters.putIfAbsent( + DriverWorkingTimeRuntimeProcessingPlan.NDI_LEARN_ALL_FILE_SESSION_DRIVERS_PARAMETER, + true + ); + + return new RuntimeProcessingExecutionApiRequest( + DriverWorkingTimeRuntimeProcessingPlan.PLAN_KEY, + request.sourceSelection(), + request.partitioning(), + List.copyOf(modules), + parameters + ); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java index dbe63cd..16a085c 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java @@ -20,6 +20,7 @@ import at.procon.eventhub.processing.eventprocessing.module.SupportEvidenceNorma import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeDerivedProjectionsModule; import at.procon.eventhub.processing.service.RuntimeDriverWorkingTimeScopeProcessingService; import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; +import 
at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -37,6 +38,7 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing public static final String INCLUDE_PARTITION_METADATA_PARAMETER = "includePartitionMetadata"; public static final String INCLUDE_PARTITION_MODULE_RESULTS_PARAMETER = "includePartitionModuleResults"; public static final String INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER = "includeSupportEvidenceNormalization"; + public static final String NDI_LEARN_ALL_FILE_SESSION_DRIVERS_PARAMETER = "ndiLearnAllFileSessionDrivers"; private final RuntimeProcessingPipelineExecutor pipelineExecutor; private final boolean includeRuntimeEventAssemblyModule; @@ -194,10 +196,19 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing Set.of("Map", "DriverActivityIntervalEvent"), Set.of("RuntimeSupportEvidenceNormalizationDebugDto") ), + new RuntimeProcessingModuleDescriptorDto( + DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION, + "NDI home classification", + "Builds enriched non-driving intervals, accumulates long-rest locations across file sessions, clusters them with Haversine DBSCAN, and applies ordered HOME/NOT_HOME rules.", + "ESPER+JAVA", + Set.of(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION), + Set.of("Map"), + Set.of("DriverNdiHomeClassificationScopeResult") + ), new RuntimeProcessingModuleDescriptorDto( DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS, "Driving-derived projections", - "Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates.", + "Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates; attaches NDI HOME/NOT_HOME classification when that optional module was requested.", "ESPER+JAVA", Set.of( 
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS, @@ -205,9 +216,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION ), Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map"), - Set.of("DriverWorkingTimeProcessingResultDto") + Set.of("DriverWorkingTimeProcessingResultDto", "DriverNdiHomeClassificationResult") ) )); + if (!includeRuntimeEventAssemblyModule) { + descriptors.removeIf(descriptor -> DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION + .equals(descriptor.moduleKey())); + } return List.copyOf(descriptors); } @@ -225,6 +240,7 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "includeDrivingIntervals", "includePartitionDebug", INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER, + NDI_LEARN_ALL_FILE_SESSION_DRIVERS_PARAMETER, "eventMixingMode" ); } @@ -266,11 +282,24 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing request.partitioning(), request.parameters() ); - UnifiedRuntimeProcessingApiRequest scopeRequest = applyExecutionRequest( + UnifiedRuntimeProcessingApiRequest requestedScopeRequest = applyExecutionRequest( request.sourceSelection(), request.partitioning(), request.parameters() ); + boolean ndiHomeClassificationRequested = requestsNdiHomeClassification(request.modules()); + boolean learnAllFileSessionDrivers = booleanParameter( + request.parameters(), + NDI_LEARN_ALL_FILE_SESSION_DRIVERS_PARAMETER, + false + ); + UnifiedRuntimeProcessingApiRequest scopeRequest = expandFileSessionLearningScope( + requestedScopeRequest, + ndiHomeClassificationRequested && learnAllFileSessionDrivers + ); + Set requestedOutputDriverKeys = requestedOutputDriverKeys(requestedScopeRequest); + boolean filterOutputDrivers = !Boolean.TRUE.equals(requestedScopeRequest.includeAllDrivers()) + && !requestedOutputDriverKeys.isEmpty(); Map attributes = new LinkedHashMap<>(); 
attributes.put("runtimeScopeApiRequest", scopeRequest); @@ -295,6 +324,9 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing Map partitionResults = new LinkedHashMap<>(); workingTimeResult.driverResults().forEach((driverKey, driverResult) -> { + if (filterOutputDrivers && !requestedOutputDriverKeys.contains(driverKey)) { + return; + } UnifiedRuntimeDerivedProjectionResultDto shapedDriverResult = shapeDriverResult( driverResult, includeSupportEvidenceNormalization, @@ -316,18 +348,31 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing ); }); + boolean learningScopeExpanded = !scopeRequest.equals(requestedScopeRequest); + List resultNotes = new java.util.ArrayList<>(workingTimeResult.notes()); + if (learningScopeExpanded) { + resultNotes.add("NDI location learning expanded the selected tachograph file sessions to all drivers; " + + "the response remains filtered to the originally requested driver partition(s)."); + } + List responseVehicles = learningScopeExpanded + ? selectedDiscoveredVehicles(partitionResults) + : workingTimeResult.discoveredVehicles(); + int responseInputEventCount = learningScopeExpanded + ? selectedInputEventCount(partitionResults) + : workingTimeResult.inputEventCount(); + return new RuntimeProcessingExecutionResultDto( processingPlanKey(), executedModules, RuntimeEventPartitioningStrategy.DRIVER, - workingTimeResult.request(), - workingTimeResult.inputEventCount(), - workingTimeResult.selectedDriverCount(), - workingTimeResult.discoveredVehicleCount(), - workingTimeResult.discoveredVehicles(), + requestedScopeRequest.toRuntimeRequest(), + responseInputEventCount, + partitionResults.size(), + responseVehicles.size(), + responseVehicles, includeExecutionModuleResults ? 
sanitizeExecutionModuleResults(moduleResults) : Map.of(), partitionResults, - workingTimeResult.notes(), + resultNotes, workingTimeResult.warnings() ); } @@ -352,6 +397,45 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing + (output == null ? "null" : output.getClass().getName())); } + private List selectedDiscoveredVehicles( + Map partitionResults + ) { + List vehicles = new java.util.ArrayList<>(); + for (RuntimeEventProcessingPartitionResultDto partition : partitionResults.values()) { + if (!(partition.result() instanceof UnifiedRuntimeDerivedProjectionResultDto driverResult)) { + continue; + } + for (UnifiedDiscoveredVehicleRef candidate : driverResult.discoveredVehicles()) { + boolean merged = false; + for (int index = 0; index < vehicles.size(); index++) { + UnifiedDiscoveredVehicleRef existing = vehicles.get(index); + if (existing.matches(candidate)) { + vehicles.set(index, existing.merge(candidate)); + merged = true; + break; + } + } + if (!merged) { + vehicles.add(candidate); + } + } + } + vehicles.sort(java.util.Comparator.comparing(UnifiedDiscoveredVehicleRef::stableKey)); + return List.copyOf(vehicles); + } + + private int selectedInputEventCount( + Map partitionResults + ) { + long count = partitionResults.values().stream() + .map(RuntimeEventProcessingPartitionResultDto::result) + .filter(UnifiedRuntimeDerivedProjectionResultDto.class::isInstance) + .map(UnifiedRuntimeDerivedProjectionResultDto.class::cast) + .mapToLong(UnifiedRuntimeDerivedProjectionResultDto::mergedEventCount) + .sum(); + return (int) Math.min(Integer.MAX_VALUE, Math.max(0L, count)); + } + private Map sanitizeExecutionModuleResults( Map moduleResults ) { @@ -403,6 +487,24 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing List.of() ) ); + if (driverResult.ndiHomeClassification() != null) { + results.put( + DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION, + new RuntimeProcessingModuleResult( + 
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION, + RuntimeProcessingModuleStatus.SUCCESS, + driverResult.ndiHomeClassification(), + Map.of( + "nonDrivingIntervalCount", driverResult.ndiHomeClassification().nonDrivingIntervalCount(), + "cachedActualDriverObservationCount", driverResult.ndiHomeClassification().cachedActualDriverObservationCount(), + "cachedOtherDriverObservationCount", driverResult.ndiHomeClassification().cachedOtherDriverObservationCount(), + "companyHomeClusterCount", driverResult.ndiHomeClassification().companyHomeClusterCount(), + "driverHomeClusterCount", driverResult.ndiHomeClassification().driverHomeClusterCount() + ), + List.of() + ) + ); + } results.put( DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS, new RuntimeProcessingModuleResult( @@ -433,6 +535,16 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing if (driverResult.partitionDebug() != null) { metadata.put("partitionDebug", driverResult.partitionDebug()); } + if (driverResult.ndiHomeClassification() != null) { + metadata.put("ndiHomeClassification", Map.of( + "nonDrivingIntervalCount", driverResult.ndiHomeClassification().nonDrivingIntervalCount(), + "currentLongLocationObservationCount", driverResult.ndiHomeClassification().currentLongLocationObservationCount(), + "cachedActualDriverObservationCount", driverResult.ndiHomeClassification().cachedActualDriverObservationCount(), + "cachedOtherDriverObservationCount", driverResult.ndiHomeClassification().cachedOtherDriverObservationCount(), + "companyHomeClusterCount", driverResult.ndiHomeClassification().companyHomeClusterCount(), + "driverHomeClusterCount", driverResult.ndiHomeClassification().driverHomeClusterCount() + )); + } return metadata; } @@ -451,10 +563,89 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing driverResult.projection(), driverResult.notes(), includeSupportEvidenceNormalization ? 
driverResult.supportEvidenceNormalization() : null, - includePartitionDebug ? driverResult.partitionDebug() : null + includePartitionDebug ? driverResult.partitionDebug() : null, + driverResult.ndiHomeClassification() ); } + UnifiedRuntimeProcessingApiRequest expandFileSessionLearningScope( + UnifiedRuntimeProcessingApiRequest request, + boolean enabled + ) { + if (!enabled || !includeRuntimeEventAssemblyModule || request == null || !isFileSessionOnly(request)) { + return request; + } + Set explicitDriverKeys = requestedOutputDriverKeys(request); + if (Boolean.TRUE.equals(request.includeAllDrivers()) && explicitDriverKeys.isEmpty()) { + return request; + } + // Without explicit canonical driver keys the response cannot be filtered safely after + // broadening the internal learning scope. Keep alternate card/source selectors unchanged. + if (explicitDriverKeys.isEmpty()) { + return request; + } + return new UnifiedRuntimeProcessingApiRequest( + request.sessionId(), + request.sessionIds(), + request.compositeSessionId(), + request.tenantKey(), + request.sourceFamilies(), + request.eventBackend(), + request.sourceKinds(), + null, + Set.of(), + true, + request.vehicleKeys(), + request.includeAllVehicles(), + null, + null, + null, + request.occurredFrom(), + request.occurredTo(), + request.expandVehicleEvents(), + request.vehicleExpansionPaddingMinutes(), + request.includeIntersectingIntervals(), + request.significantDrivingMinutes(), + request.minimumRestPeriodMinutes(), + request.includeActivityIntervals(), + request.includeDrivingIntervals(), + request.sourceInputs() + ); + } + + private boolean isFileSessionOnly(UnifiedRuntimeProcessingApiRequest request) { + if (request.sourceInputs() != null && !request.sourceInputs().isEmpty()) { + List sourceInputs = + request.sourceInputs().stream().filter(java.util.Objects::nonNull).toList(); + return !sourceInputs.isEmpty() && sourceInputs.stream().allMatch(input -> input.sourceFamily() + == 
at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION); + } + if (request.sourceFamilies() != null && !request.sourceFamilies().isEmpty()) { + return request.sourceFamilies().stream().allMatch(sourceFamily -> sourceFamily + == at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION); + } + return request.sessionId() != null + || (request.sessionIds() != null && !request.sessionIds().isEmpty()) + || request.compositeSessionId() != null; + } + + private Set requestedOutputDriverKeys(UnifiedRuntimeProcessingApiRequest request) { + java.util.LinkedHashSet result = new java.util.LinkedHashSet<>(); + if (request == null) { + return Set.of(); + } + if (request.driverKeys() != null) { + request.driverKeys().stream() + .filter(value -> value != null && !value.isBlank()) + .map(String::trim) + .forEach(result::add); + } + if (request.driverKey() != null && !request.driverKey().isBlank()) { + result.add(request.driverKey().trim()); + } + return Set.copyOf(result); + } + public UnifiedRuntimeProcessingApiRequest applyExecutionRequest( UnifiedRuntimeProcessingApiRequest sourceSelection, RuntimeEventPartitioningApiRequest partitioning, @@ -567,15 +758,37 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing requested.put(module.trim(), module.trim()); } } - requested.putIfAbsent(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS, - DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS); + boolean includeNdiHomeClassification = + requested.remove(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION) != null; + requested.remove(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS); + if (includeNdiHomeClassification) { + requested.put( + DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION, + DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION + ); + } + requested.put( + DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS, + DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS 
+ ); return List.copyOf(requested.values()); } return modules().stream() .map(RuntimeProcessingModuleDescriptorDto::moduleKey) + .filter(moduleKey -> !DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION.equals(moduleKey)) .toList(); } + private boolean requestsNdiHomeClassification(List requestedModules) { + if (requestedModules == null) { + return false; + } + return requestedModules.stream() + .filter(java.util.Objects::nonNull) + .map(String::trim) + .anyMatch(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION::equals); + } + private boolean booleanParameter(Map parameters, String key, boolean fallback) { if (parameters == null || !parameters.containsKey(key)) { return fallback; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 79e3eb1..14a779b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -128,6 +128,15 @@ eventhub: processing: operating-split-idle-hours: 7 significant-driving-minutes: 3 + ndi-long-minutes: 450 + ndi-very-long-minutes: 1440 + ndi-card-removal-percent: 80 + ndi-visit-share-percent: 25 + ndi-dbscan-eps-meters: 150 + ndi-dbscan-min-points: 3 + ndi-location-cache-ttl: 4h + ndi-location-cache-max-observations: 100000 + ndi-location-cache-namespace: default merge-gap-seconds: 0 gap-detection-tolerance-seconds: 0 timeline-input-mode: events diff --git a/src/test/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiDbscanClustererTest.java b/src/test/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiDbscanClustererTest.java new file mode 100644 index 0000000..0df17c8 --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiDbscanClustererTest.java @@ -0,0 +1,63 @@ +package at.procon.eventhub.processing.driverworkingtime.homeclassification.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import 
at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiLocationObservation; +import java.time.OffsetDateTime; +import java.util.List; +import org.junit.jupiter.api.Test; + +class DriverNdiDbscanClustererTest { + + private final DriverNdiDbscanClusterer clusterer = new DriverNdiDbscanClusterer(); + + @Test + void clustersThreeNearbyLocationsAndKeepsRemoteLocationAsNoise() { + List observations = List.of( + observation("A", "D1", 48.20820, 16.37380), + observation("B", "D2", 48.20835, 16.37390), + observation("C", "D3", 48.20810, 16.37405), + observation("D", "D4", 48.25000, 16.45000) + ); + + DriverNdiDbscanClusterer.ClusterResult result = clusterer.cluster(observations, 150.0d, 3); + + assertThat(result.clusters()).hasSize(1); + assertThat(result.assignmentByObservationId().get("A")) + .isEqualTo(result.assignmentByObservationId().get("B")) + .isEqualTo(result.assignmentByObservationId().get("C")); + assertThat(result.assignmentByObservationId().get("D")) + .isEqualTo(DriverNdiDbscanClusterer.NOISE_CLUSTER_ID); + } + + private DriverNdiLocationObservation observation( + String observationId, + String driverKey, + double latitude, + double longitude + ) { + OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z"); + return new DriverNdiLocationObservation( + observationId, + "TENANT", + List.of(), + null, + driverKey, + start, + start.plusHours(8), + 8 * 3600L, + latitude, + longitude, + "PREVIOUS_DRIVE_END", + null, + null, + null, + "P-" + observationId, + "N-" + observationId, + "REG-1", + "REG-1", + "VIN-1", + "VIN-1" + ); + } +} diff --git a/src/test/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiHomeClassificationServiceTest.java b/src/test/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiHomeClassificationServiceTest.java new file mode 100644 index 0000000..99daf8f --- /dev/null +++ 
b/src/test/java/at/procon/eventhub/processing/driverworkingtime/homeclassification/service/DriverNdiHomeClassificationServiceTest.java @@ -0,0 +1,301 @@ +package at.procon.eventhub.processing.driverworkingtime.homeclassification.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationReason; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationResult; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeStatus; +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeRestCoverageInterval; +import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import java.time.OffsetDateTime; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class DriverNdiHomeClassificationServiceTest { + + private DriverNdiLocationCorpusCache cache; + private DriverNdiHomeClassificationService service; + + @BeforeEach + void setUp() { + EventHubProperties properties = new EventHubProperties(); + cache = new DriverNdiLocationCorpusCache(properties); + service = new DriverNdiHomeClassificationService( + null, + cache, + new DriverNdiDbscanClusterer(), + properties + ); + } + + @Test + void accumulatesOtherDriverLocationsAcrossFileSessionsAndUsesThemAsCompanyHome() { + OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z"); + Map> firstSession = new LinkedHashMap<>(); + firstSession.put("D2", List.of(evidence("D2", start, 8, 
48.20820, 16.37380, 0.0d, "VIN-DEPOT", "VIN-DEPOT"))); + firstSession.put("D3", List.of(evidence("D3", start.plusDays(1), 8, 48.20825, 16.37385, 0.0d, "VIN-DEPOT", "VIN-DEPOT"))); + firstSession.put("D4", List.of(evidence("D4", start.plusDays(2), 8, 48.20815, 16.37375, 0.0d, "VIN-DEPOT", "VIN-DEPOT"))); + + service.classifyEvidence(request(UUID.randomUUID()), firstSession); + + Map> secondSession = Map.of( + "D1", + List.of(evidence("D1", start.plusDays(3), 8, 48.20822, 16.37382, 0.0d, "VIN-DEPOT", "VIN-DEPOT")) + ); + DriverNdiHomeClassificationScopeResult result = service.classifyEvidence( + request(UUID.randomUUID()), + secondSession + ); + DriverNdiHomeClassificationResult driver = result.resultForDriver("D1"); + + assertThat(driver.cachedActualDriverObservationCount()).isEqualTo(1); + assertThat(driver.cachedOtherDriverObservationCount()).isEqualTo(3); + assertThat(driver.companyHomeClusterCount()).isEqualTo(1); + assertThat(driver.classifications()).singleElement().satisfies(classification -> { + assertThat(classification.status()).isEqualTo(DriverNdiHomeStatus.HOME); + assertThat(classification.reason()).isEqualTo(DriverNdiHomeClassificationReason.COMPANY_HOME_CLUSTER); + }); + } + + @Test + void identifiesDriverPrivateHomeSeparatelyFromOtherDriverLocations() { + OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z"); + Map> evidenceByDriver = new LinkedHashMap<>(); + evidenceByDriver.put("D1", List.of( + evidence("D1", start, 8, 48.20820, 16.37380, 0.0d, "VIN-A", "VIN-A"), + evidence("D1", start.plusDays(1), 8, 48.20825, 16.37385, 0.0d, "VIN-A", "VIN-A"), + evidence("D1", start.plusDays(2), 8, 48.20815, 16.37375, 0.0d, "VIN-A", "VIN-A"), + evidence("D1", start.plusDays(3), 8, 48.30000, 16.50000, 0.0d, "VIN-A", "VIN-A") + )); + for (int index = 0; index < 9; index++) { + double latitude = 47.0d + index * 0.1d; + double longitude = 14.0d + index * 0.1d; + evidenceByDriver.put( + "OTHER-" + index, + List.of(evidence( + "OTHER-" + index, + 
start.plusDays(10L + index), + 8, + latitude, + longitude, + 0.0d, + "VIN-" + index, + "VIN-" + index + )) + ); + } + + DriverNdiHomeClassificationResult driver = service.classifyEvidence( + request(UUID.randomUUID()), + evidenceByDriver + ).resultForDriver("D1"); + + assertThat(driver.companyHomeClusterCount()).isZero(); + assertThat(driver.driverHomeClusterCount()).isEqualTo(1); + assertThat(driver.classifications().subList(0, 3)) + .allSatisfy(classification -> { + assertThat(classification.status()).isEqualTo(DriverNdiHomeStatus.HOME); + assertThat(classification.reason()).isEqualTo(DriverNdiHomeClassificationReason.DRIVER_HOME_CLUSTER); + }); + assertThat(driver.classifications().get(3).reason()) + .isEqualTo(DriverNdiHomeClassificationReason.LONG_REST_OUTSIDE_HOME_CLUSTER); + } + + @Test + void reprocessingTheSameNdiUpdatesLocationWithoutDuplicatingTheCachedVisit() { + OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z"); + service.classifyEvidence( + request(UUID.randomUUID()), + Map.of("D1", List.of(evidence("D1", start, 8, 48.20820, 16.37380, 0.0d, "VIN-A", "VIN-A"))) + ); + + DriverNdiHomeClassificationScopeResult result = service.classifyEvidence( + request(UUID.randomUUID()), + Map.of("D1", List.of(evidence("D1", start, 8, 48.20900, 16.37450, 0.0d, "VIN-A", "VIN-A"))) + ); + + assertThat(result.cachedObservationCount()).isEqualTo(1); + assertThat(result.resultForDriver("D1").cachedActualDriverObservationCount()).isEqualTo(1); + assertThat(result.resultForDriver("D1").classifications()) + .singleElement() + .satisfies(classification -> { + assertThat(classification.latitude()).isEqualTo(48.20900); + assertThat(classification.longitude()).isEqualTo(16.37450); + }); + } + + @Test + void doesNotTreatRegistrationChangeAsVehicleChangeWhenVinIsUnchanged() { + OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z"); + DriverWorkingTimeRestCoverageInterval interval = evidenceWithIdentity( + "D1", + start, + 2, + null, + null, + 0.0d, + 
"REG-A", + "REG-B", + "VIN-A", + "VIN-A" + ); + + DriverNdiHomeClassificationResult result = service.classifyEvidence( + request(UUID.randomUUID()), + Map.of("D1", List.of(interval)) + ).resultForDriver("D1"); + + assertThat(result.classifications()).singleElement().satisfies(classification -> { + assertThat(classification.status()).isEqualTo(DriverNdiHomeStatus.NOT_HOME); + assertThat(classification.reason()).isEqualTo(DriverNdiHomeClassificationReason.NO_POSITION_SHORT_REST); + }); + } + + @Test + void appliesRulesInDocumentOrderBeforeLocationClassification() { + OffsetDateTime start = OffsetDateTime.parse("2026-05-01T00:00:00Z"); + List evidence = List.of( + evidence("D1", start, 1, null, null, 0.0d, "VIN-A", "VIN-B"), + evidence("D1", start.plusDays(1), 2, null, null, 81.0d, "VIN-A", "VIN-A"), + evidence("D1", start.plusDays(2), 25, 48.5, 16.5, 0.0d, "VIN-A", "VIN-A"), + evidence("D1", start.plusDays(4), 8, null, null, 0.0d, "VIN-A", "VIN-A"), + evidence("D1", start.plusDays(5), 2, null, null, 0.0d, "VIN-A", "VIN-A") + ); + + DriverNdiHomeClassificationResult driver = service.classifyEvidence( + request(UUID.randomUUID()), + Map.of("D1", evidence) + ).resultForDriver("D1"); + + assertThat(driver.classifications()) + .extracting(value -> value.reason()) + .containsExactly( + DriverNdiHomeClassificationReason.VEHICLE_CHANGED, + DriverNdiHomeClassificationReason.CARD_REMOVED_OVER_THRESHOLD, + DriverNdiHomeClassificationReason.REST_OVER_VERY_LONG_THRESHOLD, + DriverNdiHomeClassificationReason.NO_POSITION_LONG_REST, + DriverNdiHomeClassificationReason.NO_POSITION_SHORT_REST + ); + assertThat(driver.classifications()) + .extracting(value -> value.status()) + .containsExactly( + DriverNdiHomeStatus.HOME, + DriverNdiHomeStatus.HOME, + DriverNdiHomeStatus.HOME, + DriverNdiHomeStatus.HOME, + DriverNdiHomeStatus.NOT_HOME + ); + } + + private UnifiedRuntimeProcessingApiRequest request(UUID sessionId) { + return new UnifiedRuntimeProcessingApiRequest( + sessionId, + 
List.of(sessionId), + null, + "TENANT", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION), + null, + Set.of("DRIVER_CARD", "VEHICLE_UNIT"), + null, + Set.of(), + true, + Set.of(), + true, + null, + null, + null, + null, + null, + true, + 0, + true, + 3, + 720, + true, + true + ); + } + + private DriverWorkingTimeRestCoverageInterval evidence( + String driverKey, + OffsetDateTime start, + int durationHours, + Double latitude, + Double longitude, + double cardAbsentCoveragePercent, + String previousVehicle, + String nextVehicle + ) { + return evidenceWithIdentity( + driverKey, + start, + durationHours, + latitude, + longitude, + cardAbsentCoveragePercent, + "REG-A", + previousVehicle.equals(nextVehicle) ? "REG-A" : "REG-B", + previousVehicle, + nextVehicle + ); + } + + private DriverWorkingTimeRestCoverageInterval evidenceWithIdentity( + String driverKey, + OffsetDateTime start, + int durationHours, + Double latitude, + Double longitude, + double cardAbsentCoveragePercent, + String previousRegistration, + String nextRegistration, + String previousVehicle, + String nextVehicle + ) { + OffsetDateTime end = start.plusHours(durationHours); + long durationSeconds = durationHours * 3600L; + return new DriverWorkingTimeRestCoverageInterval( + UUID.randomUUID(), + driverKey, + start, + end, + durationSeconds, + Math.round(durationSeconds * cardAbsentCoveragePercent / 100.0d), + cardAbsentCoveragePercent, + "DRIVE-BEFORE-" + start, + "DRIVE-AFTER-" + start, + previousRegistration, + nextRegistration, + previousVehicle, + nextVehicle, + null, + null, + latitude == null ? null : "GEO-" + start, + latitude == null ? null : "POSITION", + latitude == null ? null : start, + latitude, + longitude, + latitude == null ? 
null : 0L, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } +} diff --git a/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java b/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java index 1ba8df4..90af49d 100644 --- a/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java +++ b/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java @@ -423,4 +423,103 @@ class DriverWorkingTimeReusableProjectionBuilderTest { .isNull(); } + + @Test + void buildsEnrichedCoverageForNdiBelowLegacyRestThreshold() { + DriverWorkingTimeReusableProjectionBuilder builder = + new DriverWorkingTimeReusableProjectionBuilder(new EventHubProperties()); + UUID sessionId = UUID.randomUUID(); + OffsetDateTime from = OffsetDateTime.parse("2026-05-01T08:00:00Z"); + OffsetDateTime firstDriveEnd = OffsetDateTime.parse("2026-05-01T09:00:00Z"); + OffsetDateTime secondDriveStart = OffsetDateTime.parse("2026-05-01T10:00:00Z"); + OffsetDateTime to = OffsetDateTime.parse("2026-05-01T11:00:00Z"); + + DriverWorkingTimeProcessingInput input = new DriverWorkingTimeProcessingInput( + sessionId, + "12:123", + "DRIVER_CARD", + from, + to, + from, + to, + 3, + 720, + List.of( + new DriverWorkingTimeActivityInterval( + sessionId, + "12:123", + "ACT-1", + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-1", + "VIN-1", + "DRIVER_CARD", + "ACT-1", + "ACT-1", + from, + firstDriveEnd, + from.toEpochSecond(), + firstDriveEnd.toEpochSecond(), + firstDriveEnd.toEpochSecond() - from.toEpochSecond(), + List.of("ACT-1"), + false, + false, + "RAW_INTERVAL" + ), + new DriverWorkingTimeActivityInterval( + sessionId, + "12:123", + "ACT-2", + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-1", 
+ "VIN-1", + "DRIVER_CARD", + "ACT-2", + "ACT-2", + secondDriveStart, + to, + secondDriveStart.toEpochSecond(), + to.toEpochSecond(), + to.toEpochSecond() - secondDriveStart.toEpochSecond(), + List.of("ACT-2"), + false, + false, + "RAW_INTERVAL" + ) + ), + List.of( + new DriverWorkingTimeVehicleUsageInterval( + sessionId, + "12:123", + "VU-1", + "VU-START", + "VU-END", + from, + to, + from.toEpochSecond(), + to.toEpochSecond(), + to.toEpochSecond() - from.toEpochSecond(), + null, + null, + "12:REG-1", + "VIN-1", + "DRIVER_CARD", + List.of("VU-START", "VU-END") + ) + ), + List.of(), + List.of() + ); + + assertThat(builder.buildDerivedProjectionBundle(input).dailyWeeklyRestCandidateCoverageIntervals()).isEmpty(); + assertThat(builder.buildAllNonDrivingIntervalCoverage(input)) + .singleElement() + .satisfies(interval -> assertThat(interval.durationSeconds()).isEqualTo(3600L)); + } + } diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModuleTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModuleTest.java index d359b27..67e4ea7 100644 --- a/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModuleTest.java +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModuleTest.java @@ -5,6 +5,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto; +import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDriverPartition; import 
at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput; @@ -202,6 +203,21 @@ class DriverWorkingTimeDerivedProjectionsModuleTest { Map.of("12:123", preparedInput), Map.of(), List.of() + ), + DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION, + new RuntimeProcessingModuleResult( + DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION, + RuntimeProcessingModuleStatus.SUCCESS, + new DriverNdiHomeClassificationScopeResult( + "TENANT|FILE_SESSION|NDI_HOME_V1", + 0, + 0, + 0, + Map.of(), + List.of() + ), + Map.of(), + List.of() ) ) ); diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java index 0610cdb..2226e5a 100644 --- a/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java @@ -8,6 +8,8 @@ import at.procon.eventhub.processing.dto.UnifiedRuntimeDerivedProjectionResultDt import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; import at.procon.eventhub.processing.dto.UnifiedRuntimeDriverWorkingTimeScopeResultDto; import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest; +import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeModuleKeys; +import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingPipelineExecutor; import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; @@ -77,6 +79,172 @@ class DriverWorkingTimeRuntimeProcessingPlanTest { assertThat(resolved.vehicleExpansionPaddingMinutes()).isEqualTo(15); } 
+ @Test + void expandsSelectedFileSessionToAllDriversForNdiLearning() { + DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan( + Mockito.mock(RuntimeProcessingPipelineExecutor.class) + ); + + UnifiedRuntimeProcessingApiRequest expanded = plan.expandFileSessionLearningScope(sourceSelection(), true); + + assertThat(expanded.driverKey()).isNull(); + assertThat(expanded.driverKeys()).isEmpty(); + assertThat(expanded.includeAllDrivers()).isTrue(); + assertThat(expanded.sessionId()).isEqualTo(sourceSelection().sessionId()); + } + + @Test + void recognizesSessionOnlySelectionAsFileSessionLearningScope() { + DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan( + Mockito.mock(RuntimeProcessingPipelineExecutor.class) + ); + UnifiedRuntimeProcessingApiRequest source = sourceSelection(); + UnifiedRuntimeProcessingApiRequest sessionOnly = new UnifiedRuntimeProcessingApiRequest( + source.sessionId(), + source.sessionIds(), + source.compositeSessionId(), + source.tenantKey(), + Set.of(), + source.eventBackend(), + source.sourceKinds(), + source.driverKey(), + source.driverKeys(), + source.includeAllDrivers(), + source.vehicleKeys(), + source.includeAllVehicles(), + source.driverSourceEntityId(), + source.driverCardNation(), + source.driverCardNumber(), + source.occurredFrom(), + source.occurredTo(), + source.expandVehicleEvents(), + source.vehicleExpansionPaddingMinutes(), + source.includeIntersectingIntervals(), + source.significantDrivingMinutes(), + source.minimumRestPeriodMinutes(), + source.includeActivityIntervals(), + source.includeDrivingIntervals(), + source.sourceInputs() + ); + + UnifiedRuntimeProcessingApiRequest expanded = plan.expandFileSessionLearningScope(sessionOnly, true); + + assertThat(expanded.includeAllDrivers()).isTrue(); + assertThat(expanded.driverKey()).isNull(); + assertThat(expanded.sessionId()).isEqualTo(sessionOnly.sessionId()); + } + + @Test + void 
doesNotExpandAlternateDriverSelectorWithoutCanonicalDriverKey() { + DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan( + Mockito.mock(RuntimeProcessingPipelineExecutor.class) + ); + UnifiedRuntimeProcessingApiRequest selectedByCard = new UnifiedRuntimeProcessingApiRequest( + sourceSelection().sessionId(), + sourceSelection().sessionIds(), + sourceSelection().compositeSessionId(), + sourceSelection().tenantKey(), + sourceSelection().sourceFamilies(), + sourceSelection().eventBackend(), + sourceSelection().sourceKinds(), + null, + Set.of(), + false, + sourceSelection().vehicleKeys(), + sourceSelection().includeAllVehicles(), + "driver-source-1", + "A", + "CARD-1", + sourceSelection().occurredFrom(), + sourceSelection().occurredTo(), + sourceSelection().expandVehicleEvents(), + sourceSelection().vehicleExpansionPaddingMinutes(), + sourceSelection().includeIntersectingIntervals(), + sourceSelection().significantDrivingMinutes(), + sourceSelection().minimumRestPeriodMinutes(), + sourceSelection().includeActivityIntervals(), + sourceSelection().includeDrivingIntervals(), + sourceSelection().sourceInputs() + ); + + assertThat(plan.expandFileSessionLearningScope(selectedByCard, true)).isSameAs(selectedByCard); + } + + @Test + void clearsAlternateSelectorsWhenCanonicalDriverKeyAllowsSafeResponseFiltering() { + DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan( + Mockito.mock(RuntimeProcessingPipelineExecutor.class) + ); + UnifiedRuntimeProcessingApiRequest requested = new UnifiedRuntimeProcessingApiRequest( + sourceSelection().sessionId(), + sourceSelection().sessionIds(), + sourceSelection().compositeSessionId(), + sourceSelection().tenantKey(), + sourceSelection().sourceFamilies(), + sourceSelection().eventBackend(), + sourceSelection().sourceKinds(), + sourceSelection().driverKey(), + sourceSelection().driverKeys(), + false, + sourceSelection().vehicleKeys(), + 
sourceSelection().includeAllVehicles(), + "driver-source-1", + "A", + "CARD-1", + sourceSelection().occurredFrom(), + sourceSelection().occurredTo(), + sourceSelection().expandVehicleEvents(), + sourceSelection().vehicleExpansionPaddingMinutes(), + sourceSelection().includeIntersectingIntervals(), + sourceSelection().significantDrivingMinutes(), + sourceSelection().minimumRestPeriodMinutes(), + sourceSelection().includeActivityIntervals(), + sourceSelection().includeDrivingIntervals(), + sourceSelection().sourceInputs() + ); + + UnifiedRuntimeProcessingApiRequest expanded = plan.expandFileSessionLearningScope(requested, true); + + assertThat(expanded.includeAllDrivers()).isTrue(); + assertThat(expanded.driverSourceEntityId()).isNull(); + assertThat(expanded.driverCardNation()).isNull(); + assertThat(expanded.driverCardNumber()).isNull(); + } + + @Test + void canDisableAllDriverExpansionForNdiLearning() { + DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan( + Mockito.mock(RuntimeProcessingPipelineExecutor.class) + ); + UnifiedRuntimeProcessingApiRequest requested = sourceSelection(); + + assertThat(plan.expandFileSessionLearningScope(requested, false)).isSameAs(requested); + } + + @Test + void dedicatedHomePlanForcesNdiBeforeFinalProjectionAndEnablesAllDriverLearning() { + DriverHomeClassificationRuntimeProcessingPlan plan = new DriverHomeClassificationRuntimeProcessingPlan( + Mockito.mock(DriverWorkingTimeRuntimeProcessingPlan.class) + ); + RuntimeProcessingExecutionApiRequest request = new RuntimeProcessingExecutionApiRequest( + DriverHomeClassificationRuntimeProcessingPlan.PLAN_KEY, + sourceSelection(), + null, + List.of(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS), + Map.of() + ); + + RuntimeProcessingExecutionApiRequest delegated = plan.prepareDelegatedRequest(request); + + assertThat(delegated.processingPlanKey()).isEqualTo(DriverWorkingTimeRuntimeProcessingPlan.PLAN_KEY); + 
assertThat(delegated.modules()).endsWith( + DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION, + DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS + ); + assertThat(delegated.parameters()) + .containsEntry(DriverWorkingTimeRuntimeProcessingPlan.NDI_LEARN_ALL_FILE_SESSION_DRIVERS_PARAMETER, true); + } + @Test void executeCanOmitExtendedPartitionPayloads() { RuntimeDriverWorkingTimeScopeProcessingService scopeService = Mockito.mock(RuntimeDriverWorkingTimeScopeProcessingService.class);