diff --git a/README_NDI_HOME_CLASSIFICATION.md b/README_NDI_HOME_CLASSIFICATION.md index b223227..930dcac 100644 --- a/README_NDI_HOME_CLASSIFICATION.md +++ b/README_NDI_HOME_CLASSIFICATION.md @@ -1,6 +1,6 @@ -# NDI HOME / NOT_HOME classification implementation +# NDI HOME / NOT_HOME classification and country trip segmentation -This patch implements the HOME / NOT_HOME part of `docs/ndi_home_classification_en.md` as a dedicated runtime processing plan while reusing the existing driver-working-time pipeline. +This patch implements the HOME / NOT_HOME classification and the country-trip segmentation described in `docs/ndi_home_classification_en.md`. It reuses the existing driver-working-time pipeline and adds configurable Nominatim reverse geocoding only where source country evidence is missing. ## Public processing plan @@ -10,19 +10,20 @@ Use: driver-home-classification-v1 ``` -The plan delegates to the shared `driver-working-time-v1` pipeline and explicitly inserts: +The dedicated plan delegates to the shared `driver-working-time-v1` pipeline and explicitly inserts: ```text support-evidence-normalization -> ndi-home-classification +-> country-trip-segmentation -> driving-derived-projections ``` -The original `driver-working-time-v1` plan does not run the optional NDI module by default. It can opt in by explicitly requesting `ndi-home-classification`. +The normal `driver-working-time-v1` plan keeps both modules optional. They can also be requested explicitly as `ndi-home-classification` and `country-trip-segmentation`. ## Reused projection structures -`DriverWorkingTimeReusableProjectionBuilder.buildAllNonDrivingIntervalCoverage(...)` runs the existing Esper interruption/card-absence/GNSS enrichment pipeline with a zero rest-candidate threshold. It therefore creates enriched evidence for every positive non-driving interruption without changing the legacy daily/weekly-rest threshold or outputs. 
+`DriverWorkingTimeReusableProjectionBuilder.buildAllNonDrivingIntervalCoverage(...)` runs the existing Esper interruption/card-absence/GNSS enrichment pipeline with a zero rest-candidate threshold. It creates enriched evidence for every positive non-driving interruption without changing the legacy daily/weekly-rest threshold or outputs. The implementation reuses `DriverWorkingTimeRestCoverageInterval` as the enriched NDI evidence model. It provides: @@ -32,7 +33,7 @@ The implementation reuses `DriverWorkingTimeRestCoverageInterval` as the enriche - begin/end boundary GNSS evidence; - boundary odometer and movement evidence. -## Implemented classification rules +## HOME / NOT_HOME classification The rules are evaluated in the document order: @@ -48,89 +49,113 @@ Every classification contains a `DriverNdiHomeClassificationReason`, so the firs ## Location learning and clustering -Only NDIs longer than 7.5 hours with a position are added to the corpus. - -Position selection follows the document through the existing boundary-evidence resolver: - -```text -resolved begin-boundary evidence for the previous driving/vehicle context, -otherwise resolved end-boundary evidence for the next driving/vehicle context -``` - -The selected evidence is the closest eligible support-position event within the configured boundary lookup window, so it is an approximation when no event exists exactly at the driving boundary. +Only NDIs longer than 7.5 hours with a position are added to the corpus. Position selection uses the existing resolved begin-boundary evidence and falls back to resolved end-boundary evidence. The in-memory cache: - accumulates observations across one or more file-session executions; - deduplicates the same NDI across repeated/overlapping sessions; -- retains the source session IDs as provenance; +- retains source-session provenance; - stores the driver key on every observation; -- does not permanently mark a driver as "actual" or "other". 
- -For each result driver, the same cached corpus is viewed as: - -```text -actual-driver observations -other-driver observations -``` - -This makes the distinction request-relative and allows the corpus to be reused for another driver. +- calculates actual-driver and other-driver views per request. Clustering uses Java DBSCAN with Haversine distance. Defaults are 150 metres and three points. Noise observations remain in the denominator for visit-share calculations but are never home clusters. -## File-session learning scope +## Country trip segmentation -The dedicated plan defaults `ndiLearnAllFileSessionDrivers` to `true`. +`DriverCountryTripSegmentationService` builds country segments over driving intervals. -For a request with explicit canonical driver keys, the plan internally loads all drivers from the selected file sessions for location learning and filters the response back to the originally requested drivers. +Evidence precedence is: -The scope is not broadened when: +1. explicit tachograph border-crossing event (`countryFrom` / `countryTo`); +2. country code already present on a positioned support event; +3. Nominatim reverse lookup for a positioned event without a usable country code. -- the source selection is mixed or database-only; -- the option is disabled; -- the request uses only alternate card/source selectors and cannot be filtered safely by canonical driver key. +Country values are normalized to ISO 3166-1 alpha-2 where a mapping is known. Segment boundaries retain their evidence source: -## Configuration +```text +EXPLICIT_BORDER_CROSSING +GNSS_SOURCE_COUNTRY_CHANGE +NOMINATIM_COUNTRY_CHANGE +VEHICLE_CHANGE +FINAL +``` -The defaults are under: +The result includes segment counts, explicit-border counts, remote lookup counts, cache-hit counts, unresolved-coordinate counts, warnings, and OpenStreetMap attribution. 
+ +## Nominatim integration + +The client uses the reverse endpoint with: + +```text +format=jsonv2 +zoom=3 +addressdetails=1 +layer=address +``` + +Only `address.country_code` is required by the classification/segmentation logic. Failures do not fail the whole processing plan; the coordinate remains unresolved and a diagnostic warning is returned. + +Safeguards: + +- identifying configurable `User-Agent`; +- optional identifying email; +- shared coordinate cache with TTL and maximum size; +- coordinate quantization for cache reuse; +- one execution-level remote lookup budget; +- fully serialized remote calls; +- configurable minimum interval; +- enforced minimum one-second interval for `nominatim.openstreetmap.org`; +- public OSM endpoint disabled unless deliberately opted in; +- configurable endpoint so a self-hosted or contracted Nominatim service can be substituted without code changes. + +### Configuration ```yaml eventhub: - tachograph-file-session: - processing: - ndi-long-minutes: 450 - ndi-very-long-minutes: 1440 - ndi-card-removal-percent: 80 - ndi-visit-share-percent: 25 - ndi-dbscan-eps-meters: 150 - ndi-dbscan-min-points: 3 - ndi-location-cache-ttl: 4h - ndi-location-cache-max-observations: 100000 - ndi-location-cache-namespace: default + reverse-geocoding: + enabled: true + provider: NOMINATIM + nominatim: + base-url: https://nominatim.openstreetmap.org + public-service-enabled: false + user-agent: eventhub-tachograph/0.1 (Nominatim reverse geocoding) + email: "" + accept-language: en + connect-timeout: 10s + read-timeout: 20s + minimum-request-interval: 1s + cache-ttl: 30d + cache-max-entries: 100000 + coordinate-decimal-places: 4 + max-remote-lookups-per-execution: 25 ``` -For tenantless uploaded sessions, configure a namespace that prevents unrelated operational contexts from sharing a corpus. Explicit tenant keys always create tenant-scoped corpora. +Environment variables use the `NOMINATIM_*` names shown in `application.yml`. 
-## Response extension +For a self-hosted endpoint, set `NOMINATIM_BASE_URL`; `public-service-enabled` is not needed. For deliberately selected, policy-compliant, low-volume use of the donated public endpoint, additionally set: -Each driver partition can now contain: +```text +NOMINATIM_PUBLIC_SERVICE_ENABLED=true +NOMINATIM_USER_AGENT= +NOMINATIM_EMAIL= +``` + +Production or recurring tachograph batch processing should use a self-hosted instance or a provider whose terms cover the expected workload. Coordinates may reveal vehicle or driver movements; do not send confidential or personal-location data to a public endpoint without an appropriate legal and privacy basis. + +## File-session learning scope + +The dedicated plan defaults `ndiLearnAllFileSessionDrivers` to `true`. For a request with explicit canonical driver keys, it internally loads all drivers from selected file sessions for location learning and filters the response back to the originally requested drivers. + +The scope is not broadened when the source is mixed/database-only, the option is disabled, or the result cannot safely be filtered by canonical driver key. + +## Response extensions + +Each driver partition can contain: ```text ndiHomeClassification +countryTripSegmentation ``` -It includes: - -- all NDI classifications; -- company and driver home cluster IDs; -- cluster centroids and visit statistics; -- actual-driver versus other-driver cached observation counts; -- diagnostics and notes. - -The field is omitted when the optional module was not executed, preserving the existing JSON shape for normal `driver-working-time-v1` calls. - -## Current implementation boundary - -This patch implements sections 1-4 of the document: NDI derivation/enrichment, location clustering, home-location determination, and HOME / NOT_HOME classification. - -Section 5, border-crossing/country trip segmentation, is intentionally not included yet. 
It needs a separate country-resolution abstraction and a decision between local geographic data, PostGIS, or an external reverse-geocoding provider. +The fields are omitted when their optional modules were not executed, preserving the existing JSON shape for normal `driver-working-time-v1` calls. diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index 09a8460..2cb79b7 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -33,6 +33,7 @@ public class EventHubProperties { private final RuntimeProcessing runtimeProcessing = new RuntimeProcessing(); private final TachographFileSession tachographFileSession = new TachographFileSession(runtimeProcessing); private final EsperPoc esperPoc = new EsperPoc(); + private final ReverseGeocoding reverseGeocoding = new ReverseGeocoding(); private final YellowFox yellowFox = new YellowFox(); public Batch getBatch() { @@ -55,10 +56,174 @@ public class EventHubProperties { return esperPoc; } + public ReverseGeocoding getReverseGeocoding() { + return reverseGeocoding; + } + public YellowFox getYellowFox() { return yellowFox; } + public static class ReverseGeocoding { + private boolean enabled = true; + private String provider = "NOMINATIM"; + private final Nominatim nominatim = new Nominatim(); + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getProvider() { + return provider; + } + + public void setProvider(String provider) { + if (provider != null && !provider.isBlank()) { + this.provider = provider.trim().toUpperCase(java.util.Locale.ROOT); + } + } + + public Nominatim getNominatim() { + return nominatim; + } + } + + public static class Nominatim { + private String baseUrl = "https://nominatim.openstreetmap.org"; + private boolean 
publicServiceEnabled = false; + private String userAgent = "eventhub-tachograph/0.1 (Nominatim reverse geocoding)"; + private String email; + private String acceptLanguage = "en"; + private Duration connectTimeout = Duration.ofSeconds(10); + private Duration readTimeout = Duration.ofSeconds(20); + private Duration minimumRequestInterval = Duration.ofSeconds(1); + private Duration cacheTtl = Duration.ofDays(30); + private int cacheMaxEntries = 100000; + private int coordinateDecimalPlaces = 4; + private int maxRemoteLookupsPerExecution = 25; + + public String getBaseUrl() { + return baseUrl; + } + + public void setBaseUrl(String baseUrl) { + if (baseUrl == null || baseUrl.isBlank()) { + return; + } + String normalized = baseUrl.trim(); + while (normalized.endsWith("/")) { + normalized = normalized.substring(0, normalized.length() - 1); + } + if (!normalized.isBlank()) { + this.baseUrl = normalized; + } + } + + public boolean isPublicServiceEnabled() { + return publicServiceEnabled; + } + + public void setPublicServiceEnabled(boolean publicServiceEnabled) { + this.publicServiceEnabled = publicServiceEnabled; + } + + public String getUserAgent() { + return userAgent; + } + + public void setUserAgent(String userAgent) { + if (userAgent != null && !userAgent.isBlank()) { + this.userAgent = userAgent.trim(); + } + } + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email == null || email.isBlank() ? 
null : email.trim(); + } + + public String getAcceptLanguage() { + return acceptLanguage; + } + + public void setAcceptLanguage(String acceptLanguage) { + if (acceptLanguage != null && !acceptLanguage.isBlank()) { + this.acceptLanguage = acceptLanguage.trim(); + } + } + + public Duration getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(Duration connectTimeout) { + if (connectTimeout != null && !connectTimeout.isNegative() && !connectTimeout.isZero()) { + this.connectTimeout = connectTimeout; + } + } + + public Duration getReadTimeout() { + return readTimeout; + } + + public void setReadTimeout(Duration readTimeout) { + if (readTimeout != null && !readTimeout.isNegative() && !readTimeout.isZero()) { + this.readTimeout = readTimeout; + } + } + + public Duration getMinimumRequestInterval() { + return minimumRequestInterval; + } + + public void setMinimumRequestInterval(Duration minimumRequestInterval) { + if (minimumRequestInterval != null && !minimumRequestInterval.isNegative()) { + this.minimumRequestInterval = minimumRequestInterval; + } + } + + public Duration getCacheTtl() { + return cacheTtl; + } + + public void setCacheTtl(Duration cacheTtl) { + if (cacheTtl != null && !cacheTtl.isNegative() && !cacheTtl.isZero()) { + this.cacheTtl = cacheTtl; + } + } + + public int getCacheMaxEntries() { + return cacheMaxEntries; + } + + public void setCacheMaxEntries(int cacheMaxEntries) { + this.cacheMaxEntries = Math.max(100, cacheMaxEntries); + } + + public int getCoordinateDecimalPlaces() { + return coordinateDecimalPlaces; + } + + public void setCoordinateDecimalPlaces(int coordinateDecimalPlaces) { + this.coordinateDecimalPlaces = Math.max(0, Math.min(7, coordinateDecimalPlaces)); + } + + public int getMaxRemoteLookupsPerExecution() { + return maxRemoteLookupsPerExecution; + } + + public void setMaxRemoteLookupsPerExecution(int maxRemoteLookupsPerExecution) { + this.maxRemoteLookupsPerExecution = Math.max(0, 
maxRemoteLookupsPerExecution); + } + } + public static class EsperPoc { private EsperActivityMergeMode activityMergeMode = EsperActivityMergeMode.JAVA; private EsperShiftResolutionMode shiftResolutionMode = EsperShiftResolutionMode.JAVA; diff --git a/src/main/java/at/procon/eventhub/geocoding/model/GeoCountryResolution.java b/src/main/java/at/procon/eventhub/geocoding/model/GeoCountryResolution.java new file mode 100644 index 0000000..088f748 --- /dev/null +++ b/src/main/java/at/procon/eventhub/geocoding/model/GeoCountryResolution.java @@ -0,0 +1,39 @@ +package at.procon.eventhub.geocoding.model; + +import java.math.BigDecimal; + +public record GeoCountryResolution( + GeoCountryResolutionStatus status, + BigDecimal latitude, + BigDecimal longitude, + String countryCode, + String countryName, + String displayName, + String provider, + String attribution, + boolean cacheHit, + boolean remoteRequestPerformed, + String errorMessage +) { + public boolean resolved() { + return status == GeoCountryResolutionStatus.RESOLVED + && countryCode != null + && !countryCode.isBlank(); + } + + public GeoCountryResolution asCacheHit(BigDecimal requestedLatitude, BigDecimal requestedLongitude) { + return new GeoCountryResolution( + status, + requestedLatitude, + requestedLongitude, + countryCode, + countryName, + displayName, + provider, + attribution, + true, + false, + errorMessage + ); + } +} diff --git a/src/main/java/at/procon/eventhub/geocoding/model/GeoCountryResolutionStatus.java b/src/main/java/at/procon/eventhub/geocoding/model/GeoCountryResolutionStatus.java new file mode 100644 index 0000000..50205bd --- /dev/null +++ b/src/main/java/at/procon/eventhub/geocoding/model/GeoCountryResolutionStatus.java @@ -0,0 +1,9 @@ +package at.procon.eventhub.geocoding.model; + +public enum GeoCountryResolutionStatus { + RESOLVED, + NOT_FOUND, + DISABLED, + REMOTE_LOOKUP_NOT_ALLOWED, + ERROR +} diff --git a/src/main/java/at/procon/eventhub/geocoding/service/GeoCountryResolver.java 
b/src/main/java/at/procon/eventhub/geocoding/service/GeoCountryResolver.java new file mode 100644 index 0000000..1057ad0 --- /dev/null +++ b/src/main/java/at/procon/eventhub/geocoding/service/GeoCountryResolver.java @@ -0,0 +1,19 @@ +package at.procon.eventhub.geocoding.service; + +import at.procon.eventhub.geocoding.model.GeoCountryResolution; +import java.math.BigDecimal; + +/** + * Resolves a WGS84 coordinate to an ISO country code. + * + *

<p>The {@code allowRemoteLookup} flag lets a caller enforce a per-execution + * request budget while still benefiting from shared cached results.</p>

+ */ +public interface GeoCountryResolver { + + GeoCountryResolution resolve( + BigDecimal latitude, + BigDecimal longitude, + boolean allowRemoteLookup + ); +} diff --git a/src/main/java/at/procon/eventhub/geocoding/service/NominatimGeoCountryResolver.java b/src/main/java/at/procon/eventhub/geocoding/service/NominatimGeoCountryResolver.java new file mode 100644 index 0000000..dca2d09 --- /dev/null +++ b/src/main/java/at/procon/eventhub/geocoding/service/NominatimGeoCountryResolver.java @@ -0,0 +1,426 @@ +package at.procon.eventhub.geocoding.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.geocoding.model.GeoCountryResolution; +import at.procon.eventhub.geocoding.model.GeoCountryResolutionStatus; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.net.URI; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Comparator; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Blocking Nominatim reverse-geocoding client with a shared coordinate cache + * and a process-wide request gate. + * + *

<p>The public OpenStreetMap Nominatim service permits at most one request per + * second. The default configuration therefore uses a one-second minimum + * interval. Self-hosted Nominatim installations may configure another value.</p>

+ */ +@Component +public class NominatimGeoCountryResolver implements GeoCountryResolver { + + private static final Logger LOG = LoggerFactory.getLogger(NominatimGeoCountryResolver.class); + private static final String PROVIDER = "NOMINATIM"; + + private final EventHubProperties properties; + private final ObjectMapper objectMapper; + private final HttpClient httpClient; + private final Clock clock; + private final Map<CoordinateKey, CacheEntry> cache = new ConcurrentHashMap<>(); + private final AtomicLong nextRemoteRequestNanos = new AtomicLong(0L); + private final Object requestGate = new Object(); + + public NominatimGeoCountryResolver( + EventHubProperties properties, + ObjectMapper objectMapper + ) { + this( + properties, + objectMapper, + HttpClient.newBuilder() + .connectTimeout(properties.getReverseGeocoding().getNominatim().getConnectTimeout()) + .build(), + Clock.systemUTC() + ); + } + + NominatimGeoCountryResolver( + EventHubProperties properties, + ObjectMapper objectMapper, + HttpClient httpClient, + Clock clock + ) { + this.properties = Objects.requireNonNull(properties, "properties must not be null"); + this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper must not be null"); + this.httpClient = Objects.requireNonNull(httpClient, "httpClient must not be null"); + this.clock = Objects.requireNonNull(clock, "clock must not be null"); + } + + @Override + public GeoCountryResolution resolve( + BigDecimal latitude, + BigDecimal longitude, + boolean allowRemoteLookup + ) { + if (!validCoordinate(latitude, longitude)) { + return result( + GeoCountryResolutionStatus.ERROR, + latitude, + longitude, + null, + null, + null, + false, + false, + "Latitude/longitude is missing or outside the WGS84 range." 
+ ); + } + + EventHubProperties.ReverseGeocoding reverseConfig = properties.getReverseGeocoding(); + EventHubProperties.Nominatim config = reverseConfig.getNominatim(); + CoordinateKey key = CoordinateKey.of(latitude, longitude, config.getCoordinateDecimalPlaces()); + CacheEntry cached = cache.get(key); + Instant now = clock.instant(); + if (cached != null && cached.expiresAt().isAfter(now)) { + return cached.resolution().asCacheHit(latitude, longitude); + } + if (cached != null) { + cache.remove(key, cached); + } + + if (!reverseConfig.isEnabled() || !"NOMINATIM".equalsIgnoreCase(reverseConfig.getProvider())) { + return result( + GeoCountryResolutionStatus.DISABLED, + latitude, + longitude, + null, + null, + null, + false, + false, + "Nominatim reverse geocoding is disabled." + ); + } + if (isPublicService(config) && !config.isPublicServiceEnabled()) { + return result( + GeoCountryResolutionStatus.DISABLED, + latitude, + longitude, + null, + null, + null, + false, + false, + "The public nominatim.openstreetmap.org service requires explicit opt-in. " + + "Set eventhub.reverse-geocoding.nominatim.public-service-enabled=true " + + "only for policy-compliant low-volume use, or configure another Nominatim endpoint." + ); + } + if (!allowRemoteLookup) { + return result( + GeoCountryResolutionStatus.REMOTE_LOOKUP_NOT_ALLOWED, + latitude, + longitude, + null, + null, + null, + false, + false, + "The reverse-geocoding budget for this processing execution was exhausted." 
+ ); + } + + GeoCountryResolution resolved = resolveRemote(latitude, longitude, config); + if (resolved.status() == GeoCountryResolutionStatus.RESOLVED + || resolved.status() == GeoCountryResolutionStatus.NOT_FOUND) { + putCache(key, resolved, now.plus(config.getCacheTtl()), config.getCacheMaxEntries()); + } + return resolved; + } + + private GeoCountryResolution resolveRemote( + BigDecimal latitude, + BigDecimal longitude, + EventHubProperties.Nominatim config + ) { + try { + synchronized (requestGate) { + awaitRequestPermit(effectiveMinimumRequestInterval(config)); + URI uri = reverseUri(latitude, longitude, config); + HttpRequest request = HttpRequest.newBuilder(uri) + .timeout(config.getReadTimeout()) + .header("Accept", "application/json") + .header("Accept-Language", config.getAcceptLanguage()) + .header("User-Agent", config.getUserAgent()) + .GET() + .build(); + HttpResponse<String> response = httpClient.send( + request, + HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8) + ); + if (response.statusCode() == 404) { + return result( + GeoCountryResolutionStatus.NOT_FOUND, + latitude, + longitude, + null, + null, + null, + false, + true, + "Nominatim returned no address for the coordinate." + ); + } + if (response.statusCode() / 100 != 2) { + return result( + GeoCountryResolutionStatus.ERROR, + latitude, + longitude, + null, + null, + null, + false, + true, + "Nominatim reverse lookup failed with HTTP status " + response.statusCode() + "." 
+ ); + } + + JsonNode root = objectMapper.readTree(response.body()); + if (root.hasNonNull("error")) { + return result( + GeoCountryResolutionStatus.NOT_FOUND, + latitude, + longitude, + null, + null, + text(root, "display_name"), + false, + true, + root.get("error").asText() + ); + } + JsonNode address = root.path("address"); + String countryCode = normalizeCountryCode(text(address, "country_code")); + String countryName = text(address, "country"); + if (countryCode == null) { + return result( + GeoCountryResolutionStatus.NOT_FOUND, + latitude, + longitude, + null, + countryName, + text(root, "display_name"), + false, + true, + "Nominatim response did not contain address.country_code." + ); + } + return new GeoCountryResolution( + GeoCountryResolutionStatus.RESOLVED, + latitude, + longitude, + countryCode, + countryName, + text(root, "display_name"), + PROVIDER, + text(root, "licence"), + false, + true, + null + ); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return result( + GeoCountryResolutionStatus.ERROR, + latitude, + longitude, + null, + null, + null, + false, + true, + "Nominatim reverse lookup was interrupted." 
+ ); + } catch (IOException | RuntimeException ex) { + LOG.warn("Nominatim reverse lookup failed for {},{}: {}", latitude, longitude, ex.getMessage()); + return result( + GeoCountryResolutionStatus.ERROR, + latitude, + longitude, + null, + null, + null, + false, + true, + "Nominatim reverse lookup failed: " + ex.getMessage() + ); + } + } + + private URI reverseUri( + BigDecimal latitude, + BigDecimal longitude, + EventHubProperties.Nominatim config + ) { + StringBuilder query = new StringBuilder(config.getBaseUrl()) + .append("/reverse?format=jsonv2") + .append("&lat=").append(url(latitude.toPlainString())) + .append("&lon=").append(url(longitude.toPlainString())) + .append("&zoom=3") + .append("&addressdetails=1") + .append("&layer=address") + .append("&accept-language=").append(url(config.getAcceptLanguage())); + if (config.getEmail() != null && !config.getEmail().isBlank()) { + query.append("&email=").append(url(config.getEmail())); + } + return URI.create(query.toString()); + } + + private Duration effectiveMinimumRequestInterval(EventHubProperties.Nominatim config) { + Duration configured = config.getMinimumRequestInterval() == null + ? Duration.ZERO + : config.getMinimumRequestInterval(); + try { + if (isPublicService(config) + && configured.compareTo(Duration.ofSeconds(1)) < 0) { + return Duration.ofSeconds(1); + } + } catch (RuntimeException ignored) { + } + return configured; + } + + private boolean isPublicService(EventHubProperties.Nominatim config) { + try { + return "nominatim.openstreetmap.org".equalsIgnoreCase( + URI.create(config.getBaseUrl()).getHost() + ); + } catch (RuntimeException ignored) { + return false; + } + } + + private void awaitRequestPermit(Duration minimumInterval) throws InterruptedException { + long intervalNanos = minimumInterval == null ? 
0L : Math.max(0L, minimumInterval.toNanos()); + long now = System.nanoTime(); + long allowedAt = nextRemoteRequestNanos.get(); + long waitNanos = allowedAt - now; + if (waitNanos > 0L) { + long millis = waitNanos / 1_000_000L; + int nanos = (int) (waitNanos % 1_000_000L); + Thread.sleep(millis, nanos); + } + nextRemoteRequestNanos.set(System.nanoTime() + intervalNanos); + } + + private void putCache( + CoordinateKey key, + GeoCountryResolution resolution, + Instant expiresAt, + int maxEntries + ) { + cache.put(key, new CacheEntry(resolution, expiresAt, clock.instant())); + if (cache.size() <= maxEntries) { + return; + } + Instant now = clock.instant(); + cache.entrySet().removeIf(entry -> !entry.getValue().expiresAt().isAfter(now)); + while (cache.size() > maxEntries) { + cache.entrySet().stream() + .min(Comparator.comparing(entry -> entry.getValue().createdAt())) + .map(Map.Entry::getKey) + .ifPresent(cache::remove); + } + } + + private boolean validCoordinate(BigDecimal latitude, BigDecimal longitude) { + return latitude != null + && longitude != null + && latitude.compareTo(BigDecimal.valueOf(-90)) >= 0 + && latitude.compareTo(BigDecimal.valueOf(90)) <= 0 + && longitude.compareTo(BigDecimal.valueOf(-180)) >= 0 + && longitude.compareTo(BigDecimal.valueOf(180)) <= 0; + } + + private String normalizeCountryCode(String value) { + if (value == null || value.isBlank()) { + return null; + } + String normalized = value.trim().toUpperCase(Locale.ROOT); + return normalized.length() == 2 ? normalized : null; + } + + private String text(JsonNode node, String field) { + if (node == null || !node.has(field) || node.get(field).isNull()) { + return null; + } + String value = node.get(field).asText(); + return value == null || value.isBlank() ? null : value.trim(); + } + + private String url(String value) { + return URLEncoder.encode(value == null ? 
"" : value, StandardCharsets.UTF_8); + } + + private GeoCountryResolution result( + GeoCountryResolutionStatus status, + BigDecimal latitude, + BigDecimal longitude, + String countryCode, + String countryName, + String displayName, + boolean cacheHit, + boolean remoteRequestPerformed, + String errorMessage + ) { + return new GeoCountryResolution( + status, + latitude, + longitude, + countryCode, + countryName, + displayName, + PROVIDER, + null, + cacheHit, + remoteRequestPerformed, + errorMessage + ); + } + + private record CoordinateKey(BigDecimal latitude, BigDecimal longitude) { + static CoordinateKey of(BigDecimal latitude, BigDecimal longitude, int decimalPlaces) { + return new CoordinateKey( + latitude.setScale(decimalPlaces, RoundingMode.HALF_UP), + longitude.setScale(decimalPlaces, RoundingMode.HALF_UP) + ); + } + } + + private record CacheEntry( + GeoCountryResolution resolution, + Instant expiresAt, + Instant createdAt + ) { + } +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegment.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegment.java new file mode 100644 index 0000000..a2afeb1 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegment.java @@ -0,0 +1,25 @@ +package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model; + +import java.math.BigDecimal; +import java.time.OffsetDateTime; + +public record DriverCountryTripSegment( + String segmentId, + String driverKey, + String registrationKey, + String vehicleKey, + OffsetDateTime startedAt, + OffsetDateTime endedAt, + String countryFrom, + String countryTo, + BigDecimal latitudeFrom, + BigDecimal longitudeFrom, + BigDecimal latitudeTo, + BigDecimal longitudeTo, + String positionFromEventId, + String positionToEventId, + DriverCountryTripSegmentBoundarySource endBoundarySource, + 
String boundaryEventId, + boolean boundaryCountryReverseGeocoded +) { +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegmentBoundarySource.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegmentBoundarySource.java new file mode 100644 index 0000000..1383cda --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegmentBoundarySource.java @@ -0,0 +1,9 @@ +package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model; + +public enum DriverCountryTripSegmentBoundarySource { + EXPLICIT_BORDER_CROSSING, + GNSS_SOURCE_COUNTRY_CHANGE, + NOMINATIM_COUNTRY_CHANGE, + VEHICLE_CHANGE, + FINAL +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegmentationResult.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegmentationResult.java new file mode 100644 index 0000000..27a9049 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegmentationResult.java @@ -0,0 +1,24 @@ +package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model; + +import java.util.List; + +public record DriverCountryTripSegmentationResult( + String driverKey, + int drivingIntervalCount, + int supportingGeoEventCount, + int explicitBorderCrossingCount, + int reverseGeocodingRemoteRequestCount, + int reverseGeocodingCacheHitCount, + int unresolvedCoordinateCount, + int segmentCount, + String reverseGeocodingAttribution, + List<DriverCountryTripSegment> segments, + List<String> notes, + List<String> warnings +) { + public DriverCountryTripSegmentationResult { + segments = segments == null ? List.of() : List.copyOf(segments); + notes = notes == null ? List.of() : List.copyOf(notes); + warnings = warnings == null ? 
List.of() : List.copyOf(warnings); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegmentationScopeResult.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegmentationScopeResult.java new file mode 100644 index 0000000..02bb894 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/model/DriverCountryTripSegmentationScopeResult.java @@ -0,0 +1,29 @@ +package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public record DriverCountryTripSegmentationScopeResult( + int driverCount, + int segmentCount, + int reverseGeocodingRemoteRequestCount, + int reverseGeocodingCacheHitCount, + int unresolvedCoordinateCount, + String reverseGeocodingAttribution, + Map driverResults, + List notes, + List warnings +) { + public DriverCountryTripSegmentationScopeResult { + driverResults = driverResults == null + ? Map.of() + : java.util.Collections.unmodifiableMap(new LinkedHashMap<>(driverResults)); + notes = notes == null ? List.of() : List.copyOf(notes); + warnings = warnings == null ? List.of() : List.copyOf(warnings); + } + + public DriverCountryTripSegmentationResult resultForDriver(String driverKey) { + return driverKey == null ? 
null : driverResults.get(driverKey); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/service/CountryCodeNormalizer.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/service/CountryCodeNormalizer.java new file mode 100644 index 0000000..f7fb716 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/service/CountryCodeNormalizer.java @@ -0,0 +1,105 @@ +package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.service; + +import java.util.LinkedHashMap; +import java.util.Locale; +import java.util.Map; + +final class CountryCodeNormalizer { + + private static final Map TACHOGRAPH_TO_ISO2 = buildTachographMap(); + + private CountryCodeNormalizer() { + } + + static String normalizeTachograph(String value) { + if (value == null || value.isBlank()) { + return null; + } + String normalized = value.trim().toUpperCase(Locale.ROOT); + String mapped = TACHOGRAPH_TO_ISO2.get(normalized); + if (mapped != null) { + return mapped; + } + return normalizeIso(normalized); + } + + static String normalizeIso(String value) { + if (value == null || value.isBlank()) { + return null; + } + String normalized = value.trim().toUpperCase(Locale.ROOT); + if (normalized.length() == 2) { + return normalized; + } + for (String iso2 : Locale.getISOCountries()) { + Locale locale = Locale.of("", iso2); + try { + if (locale.getISO3Country().equalsIgnoreCase(normalized)) { + return iso2; + } + } catch (java.util.MissingResourceException ignored) { + } + } + return null; + } + + private static Map buildTachographMap() { + Map values = new LinkedHashMap<>(); + values.put("A", "AT"); + values.put("AL", "AL"); + values.put("AND", "AD"); + values.put("ARM", "AM"); + values.put("AZ", "AZ"); + values.put("B", "BE"); + values.put("BG", "BG"); + values.put("BIH", "BA"); + values.put("BY", "BY"); + values.put("CH", "CH"); + values.put("CY", "CY"); + values.put("CZ", 
"CZ"); + values.put("D", "DE"); + values.put("DK", "DK"); + values.put("E", "ES"); + values.put("EST", "EE"); + values.put("F", "FR"); + values.put("FIN", "FI"); + values.put("FL", "LI"); + values.put("FR", "FO"); + values.put("UK", "GB"); + values.put("GE", "GE"); + values.put("GR", "GR"); + values.put("H", "HU"); + values.put("HR", "HR"); + values.put("I", "IT"); + values.put("IRL", "IE"); + values.put("IS", "IS"); + values.put("KZ", "KZ"); + values.put("L", "LU"); + values.put("LT", "LT"); + values.put("LV", "LV"); + values.put("M", "MT"); + values.put("MC", "MC"); + values.put("MD", "MD"); + values.put("MK", "MK"); + values.put("N", "NO"); + values.put("NL", "NL"); + values.put("P", "PT"); + values.put("PL", "PL"); + values.put("RO", "RO"); + values.put("RSM", "SM"); + values.put("RUS", "RU"); + values.put("S", "SE"); + values.put("SK", "SK"); + values.put("SLO", "SI"); + values.put("TM", "TM"); + values.put("TR", "TR"); + values.put("UA", "UA"); + values.put("V", "VA"); + values.put("YU", "RS"); + values.put("MNE", "ME"); + values.put("SRB", "RS"); + values.put("UZ", "UZ"); + values.put("TJ", "TJ"); + return Map.copyOf(values); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/service/DriverCountryTripSegmentationService.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/service/DriverCountryTripSegmentationService.java new file mode 100644 index 0000000..33bb927 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/service/DriverCountryTripSegmentationService.java @@ -0,0 +1,593 @@ +package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.geocoding.model.GeoCountryResolution; +import at.procon.eventhub.geocoding.model.GeoCountryResolutionStatus; +import at.procon.eventhub.geocoding.service.GeoCountryResolver; +import 
at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval; +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput; +import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegment; +import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentBoundarySource; +import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationResult; +import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationScopeResult; +import at.procon.eventhub.processing.eventprocessing.support.RuntimeSupportEvidenceEvent; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.springframework.stereotype.Service; + +@Service +public class DriverCountryTripSegmentationService { + + private final GeoCountryResolver countryResolver; + private final EventHubProperties properties; + + public DriverCountryTripSegmentationService( + GeoCountryResolver countryResolver, + EventHubProperties properties + ) { + this.countryResolver = countryResolver; + this.properties = properties; + } + + public DriverCountryTripSegmentationScopeResult segmentPreparedInputs( + Map preparedInputs + ) { + int maxRemoteLookups = properties.getReverseGeocoding() + .getNominatim() + .getMaxRemoteLookupsPerExecution(); + LookupBudget budget = new LookupBudget(maxRemoteLookups); + LinkedHashMap driverResults = new LinkedHashMap<>(); + List scopeWarnings = new ArrayList<>(); + + if (preparedInputs != null) { 
+ preparedInputs.entrySet().stream() + .filter(entry -> entry.getKey() != null + && !entry.getKey().isBlank() + && entry.getValue() != null + && entry.getValue().processingInput() != null) + .sorted(Map.Entry.comparingByKey()) + .forEach(entry -> { + DriverCountryTripSegmentationResult result = segmentDriver( + entry.getKey(), + entry.getValue(), + budget + ); + driverResults.put(entry.getKey(), result); + scopeWarnings.addAll(result.warnings()); + }); + } + + int segmentCount = driverResults.values().stream() + .mapToInt(DriverCountryTripSegmentationResult::segmentCount) + .sum(); + int cacheHitCount = driverResults.values().stream() + .mapToInt(DriverCountryTripSegmentationResult::reverseGeocodingCacheHitCount) + .sum(); + int unresolvedCount = driverResults.values().stream() + .mapToInt(DriverCountryTripSegmentationResult::unresolvedCoordinateCount) + .sum(); + List notes = List.of( + "Country trip segmentation preferred explicit tachograph border-crossing events and existing source country codes before Nominatim.", + "Nominatim remote lookups performed: " + budget.usedRemoteLookups() + " of configured maximum " + + maxRemoteLookups + ".", + "Nominatim requests use a shared coordinate cache and the configured minimum request interval." 
+ ); + return new DriverCountryTripSegmentationScopeResult( + driverResults.size(), + segmentCount, + budget.usedRemoteLookups(), + cacheHitCount, + unresolvedCount, + "Data © OpenStreetMap contributors, ODbL 1.0", + driverResults, + notes, + distinctLimited(scopeWarnings, 50) + ); + } + + private DriverCountryTripSegmentationResult segmentDriver( + String driverKey, + DriverWorkingTimePreparedInput preparedInput, + LookupBudget budget + ) { + List drivingIntervals = preparedInput.processingInput() + .activityIntervals() + .stream() + .filter(Objects::nonNull) + .filter(interval -> "DRIVE".equalsIgnoreCase(interval.activityType())) + .filter(interval -> interval.startedAt() != null + && interval.endedAt() != null + && interval.endedAt().isAfter(interval.startedAt())) + .sorted(Comparator + .comparing(DriverWorkingTimeActivityInterval::startedAt) + .thenComparing(DriverWorkingTimeActivityInterval::endedAt)) + .toList(); + List supportEvents = preparedInput.processingInput() + .supportEvidenceEvents() + .stream() + .filter(Objects::nonNull) + .filter(event -> event.occurredAt() != null) + .sorted(Comparator + .comparing(RuntimeSupportEvidenceEvent::occurredAt) + .thenComparing(event -> nullToEmpty(event.eventId()))) + .toList(); + + if (drivingIntervals.isEmpty()) { + return new DriverCountryTripSegmentationResult( + driverKey, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + "Data © OpenStreetMap contributors, ODbL 1.0", + List.of(), + List.of("No driving intervals were available for country trip segmentation."), + List.of() + ); + } + + DriverStats stats = new DriverStats(); + List warnings = new ArrayList<>(); + List segments = new ArrayList<>(); + + DriverWorkingTimeActivityInterval firstDrive = drivingIntervals.getFirst(); + SegmentState state = new SegmentState( + firstDrive.startedAt(), + firstDrive.registrationKey(), + firstDrive.vehicleKey() + ); + DriverWorkingTimeActivityInterval previousDrive = null; + + for (DriverWorkingTimeActivityInterval drive : 
drivingIntervals) { + List driveEvents = supportEvents.stream() + .filter(event -> within(event.occurredAt(), drive.startedAt(), drive.endedAt())) + .filter(event -> compatibleVehicle(event, drive)) + .toList(); + stats.supportingGeoEventCount += (int) driveEvents.stream() + .filter(this::hasCoordinateOrBorderEvidence) + .count(); + stats.explicitBorderCrossingCount += (int) driveEvents.stream() + .filter(this::isExplicitBorderCrossing) + .count(); + + if (previousDrive != null && vehicleChanged(previousDrive, drive)) { + String previousCountry = state.country; + addSegment( + segments, + driverKey, + state, + previousDrive.endedAt(), + previousCountry, + previousCountry, + state.lastPositionEvent, + DriverCountryTripSegmentBoundarySource.VEHICLE_CHANGE, + null, + false + ); + state = new SegmentState( + drive.startedAt(), + drive.registrationKey(), + drive.vehicleKey() + ); + state.country = previousCountry; + } + + for (RuntimeSupportEvidenceEvent event : driveEvents) { + if (!hasCoordinateOrBorderEvidence(event)) { + continue; + } + if (state.startLatitude == null && hasCoordinate(event)) { + state.setStartPosition(event); + } + + if (isExplicitBorderCrossing(event)) { + processExplicitBorderCrossing( + segments, + driverKey, + state, + event, + budget, + stats, + warnings + ); + } else if (hasCoordinate(event)) { + processPosition( + segments, + driverKey, + state, + event, + budget, + stats, + warnings + ); + } + } + previousDrive = drive; + } + + DriverWorkingTimeActivityInterval lastDrive = drivingIntervals.getLast(); + addSegment( + segments, + driverKey, + state, + lastDrive.endedAt(), + state.country, + state.country, + state.lastPositionEvent, + DriverCountryTripSegmentBoundarySource.FINAL, + state.lastPositionEvent == null ? 
null : state.lastPositionEvent.eventId(), + false + ); + + List notes = List.of( + "Built country trip segments from " + drivingIntervals.size() + " driving interval(s) and " + + stats.supportingGeoEventCount + " supporting geo/border event(s).", + "Explicit border crossings are authoritative; Nominatim is used only when a positioned event has no usable source country code.", + "Country codes in the result are normalized to ISO 3166-1 alpha-2 where a mapping is known." + ); + if (budget.exhausted()) { + warnings.add("The configured Nominatim remote-lookup budget was exhausted; later uncached coordinates remained unresolved."); + } + return new DriverCountryTripSegmentationResult( + driverKey, + drivingIntervals.size(), + stats.supportingGeoEventCount, + stats.explicitBorderCrossingCount, + stats.remoteRequestCount, + stats.cacheHitCount, + stats.unresolvedCoordinateCount, + segments.size(), + "Data © OpenStreetMap contributors, ODbL 1.0", + segments, + notes, + distinctLimited(warnings, 20) + ); + } + + private void processExplicitBorderCrossing( + List segments, + String driverKey, + SegmentState state, + RuntimeSupportEvidenceEvent event, + LookupBudget budget, + DriverStats stats, + List warnings + ) { + String countryFrom = CountryCodeNormalizer.normalizeTachograph(event.countryFrom()); + String countryTo = CountryCodeNormalizer.normalizeTachograph(event.countryTo()); + ResolvedEventCountry positionedCountry = null; + if (countryTo == null && hasCoordinate(event)) { + positionedCountry = resolveEventCountry(event, budget, stats, warnings); + countryTo = positionedCountry.countryCode(); + } + if (state.country == null) { + state.country = firstNonBlank(countryFrom, countryTo); + } + String effectiveFrom = firstNonBlank(state.country, countryFrom); + String effectiveTo = firstNonBlank(countryTo, effectiveFrom); + if (effectiveFrom == null || effectiveTo == null || Objects.equals(effectiveFrom, effectiveTo)) { + state.lastPositionEvent = hasCoordinate(event) ? 
event : state.lastPositionEvent; + return; + } + + addSegment( + segments, + driverKey, + state, + event.occurredAt(), + effectiveFrom, + effectiveTo, + event, + DriverCountryTripSegmentBoundarySource.EXPLICIT_BORDER_CROSSING, + event.eventId(), + positionedCountry != null && positionedCountry.reverseGeocoded() + ); + state.restartAt(event, effectiveTo); + } + + private void processPosition( + List segments, + String driverKey, + SegmentState state, + RuntimeSupportEvidenceEvent event, + LookupBudget budget, + DriverStats stats, + List warnings + ) { + ResolvedEventCountry resolved = resolveEventCountry(event, budget, stats, warnings); + state.lastPositionEvent = event; + if (resolved.countryCode() == null) { + return; + } + if (state.country == null) { + state.country = resolved.countryCode(); + return; + } + if (Objects.equals(state.country, resolved.countryCode())) { + return; + } + + DriverCountryTripSegmentBoundarySource source = resolved.reverseGeocoded() + ? DriverCountryTripSegmentBoundarySource.NOMINATIM_COUNTRY_CHANGE + : DriverCountryTripSegmentBoundarySource.GNSS_SOURCE_COUNTRY_CHANGE; + addSegment( + segments, + driverKey, + state, + event.occurredAt(), + state.country, + resolved.countryCode(), + event, + source, + event.eventId(), + resolved.reverseGeocoded() + ); + state.restartAt(event, resolved.countryCode()); + } + + private ResolvedEventCountry resolveEventCountry( + RuntimeSupportEvidenceEvent event, + LookupBudget budget, + DriverStats stats, + List warnings + ) { + String sourceCountry = CountryCodeNormalizer.normalizeTachograph(event.countryCode()); + if (sourceCountry != null) { + return new ResolvedEventCountry(sourceCountry, false); + } + if (!hasCoordinate(event)) { + return ResolvedEventCountry.unresolved(); + } + + GeoCountryResolution resolution = countryResolver.resolve( + event.latitude(), + event.longitude(), + budget.remoteLookupAllowed() + ); + if (resolution.remoteRequestPerformed()) { + budget.recordRemoteLookup(); + 
stats.remoteRequestCount++; + } + if (resolution.cacheHit()) { + stats.cacheHitCount++; + } + if (resolution.resolved()) { + return new ResolvedEventCountry( + CountryCodeNormalizer.normalizeIso(resolution.countryCode()), + true + ); + } + + stats.unresolvedCoordinateCount++; + if (resolution.status() == GeoCountryResolutionStatus.ERROR + || resolution.status() == GeoCountryResolutionStatus.DISABLED + || resolution.status() == GeoCountryResolutionStatus.REMOTE_LOOKUP_NOT_ALLOWED) { + warnings.add("Country could not be resolved for support event " + + nullToEmpty(event.eventId()) + ": " + nullToEmpty(resolution.errorMessage())); + } + return ResolvedEventCountry.unresolved(); + } + + private void addSegment( + List segments, + String driverKey, + SegmentState state, + OffsetDateTime endedAt, + String countryFrom, + String countryTo, + RuntimeSupportEvidenceEvent endPosition, + DriverCountryTripSegmentBoundarySource boundarySource, + String boundaryEventId, + boolean reverseGeocodedBoundary + ) { + if (state.startedAt == null || endedAt == null || !endedAt.isAfter(state.startedAt)) { + return; + } + int index = segments.size(); + segments.add(new DriverCountryTripSegment( + segmentId(driverKey, state.startedAt, endedAt, index), + driverKey, + state.registrationKey, + state.vehicleKey, + state.startedAt, + endedAt, + countryFrom, + countryTo, + state.startLatitude, + state.startLongitude, + endPosition == null ? null : endPosition.latitude(), + endPosition == null ? null : endPosition.longitude(), + state.startPositionEventId, + endPosition == null ? 
null : endPosition.eventId(), + boundarySource, + boundaryEventId, + reverseGeocodedBoundary + )); + } + + private boolean within(OffsetDateTime value, OffsetDateTime start, OffsetDateTime end) { + return value != null + && start != null + && end != null + && !value.isBefore(start) + && !value.isAfter(end); + } + + private boolean compatibleVehicle( + RuntimeSupportEvidenceEvent event, + DriverWorkingTimeActivityInterval interval + ) { + if (event.vehicleKey() != null + && interval.vehicleKey() != null + && !event.vehicleKey().equals(interval.vehicleKey())) { + return false; + } + return event.registrationKey() == null + || interval.registrationKey() == null + || event.registrationKey().equals(interval.registrationKey()); + } + + private boolean vehicleChanged( + DriverWorkingTimeActivityInterval previous, + DriverWorkingTimeActivityInterval next + ) { + if (previous.vehicleKey() != null && next.vehicleKey() != null) { + return !previous.vehicleKey().equals(next.vehicleKey()); + } + if (previous.registrationKey() != null && next.registrationKey() != null) { + return !previous.registrationKey().equals(next.registrationKey()); + } + return false; + } + + private boolean hasCoordinateOrBorderEvidence(RuntimeSupportEvidenceEvent event) { + return hasCoordinate(event) || isExplicitBorderCrossing(event); + } + + private boolean hasCoordinate(RuntimeSupportEvidenceEvent event) { + return event.latitude() != null && event.longitude() != null; + } + + private boolean isExplicitBorderCrossing(RuntimeSupportEvidenceEvent event) { + return "BORDER_CROSSING".equalsIgnoreCase(event.eventDomain()) + || event.countryFrom() != null + || event.countryTo() != null; + } + + private String segmentId( + String driverKey, + OffsetDateTime startedAt, + OffsetDateTime endedAt, + int index + ) { + String raw = nullToEmpty(driverKey) + "|" + startedAt + "|" + endedAt + "|" + index; + try { + byte[] digest = MessageDigest.getInstance("SHA-256") + .digest(raw.getBytes(StandardCharsets.UTF_8)); 
+ return "TRIP_COUNTRY|" + java.util.HexFormat.of().formatHex(digest, 0, 12); + } catch (NoSuchAlgorithmException ex) { + throw new IllegalStateException("SHA-256 is unavailable.", ex); + } + } + + private String firstNonBlank(String... values) { + if (values == null) { + return null; + } + for (String value : values) { + if (value != null && !value.isBlank()) { + return value.trim().toUpperCase(Locale.ROOT); + } + } + return null; + } + + private String nullToEmpty(String value) { + return value == null ? "" : value; + } + + private List distinctLimited(List values, int limit) { + Set distinct = new LinkedHashSet<>(); + if (values != null) { + values.stream() + .filter(value -> value != null && !value.isBlank()) + .map(String::trim) + .forEach(distinct::add); + } + return distinct.stream().limit(Math.max(0, limit)).toList(); + } + + private static final class SegmentState { + private OffsetDateTime startedAt; + private String registrationKey; + private String vehicleKey; + private String country; + private BigDecimal startLatitude; + private BigDecimal startLongitude; + private String startPositionEventId; + private RuntimeSupportEvidenceEvent lastPositionEvent; + + private SegmentState( + OffsetDateTime startedAt, + String registrationKey, + String vehicleKey + ) { + this.startedAt = startedAt; + this.registrationKey = registrationKey; + this.vehicleKey = vehicleKey; + } + + private void setStartPosition(RuntimeSupportEvidenceEvent event) { + if (event == null || event.latitude() == null || event.longitude() == null) { + return; + } + this.startLatitude = event.latitude(); + this.startLongitude = event.longitude(); + this.startPositionEventId = event.eventId(); + this.lastPositionEvent = event; + } + + private void restartAt(RuntimeSupportEvidenceEvent event, String newCountry) { + this.startedAt = event.occurredAt(); + this.country = newCountry; + this.startLatitude = event.latitude(); + this.startLongitude = event.longitude(); + this.startPositionEventId = 
event.eventId(); + this.lastPositionEvent = event; + } + } + + private static final class LookupBudget { + private final int maximum; + private int used; + + private LookupBudget(int maximum) { + this.maximum = Math.max(0, maximum); + } + + private boolean remoteLookupAllowed() { + return used < maximum; + } + + private void recordRemoteLookup() { + used++; + } + + private int usedRemoteLookups() { + return used; + } + + private boolean exhausted() { + return maximum > 0 && used >= maximum; + } + } + + private static final class DriverStats { + private int supportingGeoEventCount; + private int explicitBorderCrossingCount; + private int remoteRequestCount; + private int cacheHitCount; + private int unresolvedCoordinateCount; + } + + private record ResolvedEventCountry(String countryCode, boolean reverseGeocoded) { + private static ResolvedEventCountry unresolved() { + return new ResolvedEventCountry(null, false); + } + } +} diff --git a/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeDerivedProjectionResultDto.java b/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeDerivedProjectionResultDto.java index bc51db8..96c2608 100644 --- a/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeDerivedProjectionResultDto.java +++ b/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeDerivedProjectionResultDto.java @@ -2,6 +2,7 @@ package at.procon.eventhub.processing.dto; import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto; import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationResult; +import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationResult; import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; import com.fasterxml.jackson.annotation.JsonInclude; @@ -19,7 +20,9 @@ public record 
UnifiedRuntimeDerivedProjectionResultDto( RuntimeSupportEvidenceNormalizationDebugDto supportEvidenceNormalization, RuntimeDriverPartitionDebugDto partitionDebug, @JsonInclude(JsonInclude.Include.NON_NULL) - DriverNdiHomeClassificationResult ndiHomeClassification + DriverNdiHomeClassificationResult ndiHomeClassification, + @JsonInclude(JsonInclude.Include.NON_NULL) + DriverCountryTripSegmentationResult countryTripSegmentation ) { public UnifiedRuntimeDerivedProjectionResultDto { discoveredVehicles = discoveredVehicles == null ? List.of() : List.copyOf(discoveredVehicles); @@ -47,6 +50,7 @@ public record UnifiedRuntimeDerivedProjectionResultDto( notes, null, null, + null, null ); } @@ -73,6 +77,7 @@ public record UnifiedRuntimeDerivedProjectionResultDto( notes, supportEvidenceNormalization, null, + null, null ); } @@ -100,6 +105,7 @@ public record UnifiedRuntimeDerivedProjectionResultDto( notes, supportEvidenceNormalization, partitionDebug, + null, null ); } @@ -116,7 +122,8 @@ public record UnifiedRuntimeDerivedProjectionResultDto( notes, supportEvidenceNormalization, debug, - ndiHomeClassification + ndiHomeClassification, + countryTripSegmentation ); } } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverCountryTripSegmentationModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverCountryTripSegmentationModule.java new file mode 100644 index 0000000..1c8cac8 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverCountryTripSegmentationModule.java @@ -0,0 +1,70 @@ +package at.procon.eventhub.processing.eventprocessing.module; + +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput; +import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationScopeResult; +import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.service.DriverCountryTripSegmentationService; 
+import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import org.springframework.stereotype.Component; + +@Component +public class DriverCountryTripSegmentationModule implements RuntimeProcessingModule { + + private final DriverCountryTripSegmentationService segmentationService; + + public DriverCountryTripSegmentationModule( + DriverCountryTripSegmentationService segmentationService + ) { + this.segmentationService = segmentationService; + } + + @Override + public String moduleKey() { + return DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION; + } + + @Override + public RuntimeProcessingModuleDescriptorDto descriptor() { + return new RuntimeProcessingModuleDescriptorDto( + moduleKey(), + "Country trip segmentation", + "Builds per-driver country trip segments from explicit tachograph border crossings and GNSS country changes, using cached/rate-limited Nominatim reverse geocoding when source country codes are absent.", + "JAVA+HTTP", + Set.of(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION), + Set.of("Map"), + Set.of("DriverCountryTripSegmentationScopeResult") + ); + } + + @Override + public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) { + DriverCountryTripSegmentationScopeResult result = segmentationService.segmentPreparedInputs( + preparedInputs(context) + ); + Map metadata = new LinkedHashMap<>(); + metadata.put("driverCount", result.driverCount()); + metadata.put("segmentCount", result.segmentCount()); + metadata.put("reverseGeocodingRemoteRequestCount", result.reverseGeocodingRemoteRequestCount()); + metadata.put("reverseGeocodingCacheHitCount", result.reverseGeocodingCacheHitCount()); + metadata.put("unresolvedCoordinateCount", result.unresolvedCoordinateCount()); + metadata.put("reverseGeocodingAttribution", result.reverseGeocodingAttribution()); + return new RuntimeProcessingModuleResult( + 
moduleKey(), + RuntimeProcessingModuleStatus.SUCCESS, + result, + metadata, + result.warnings() + ); + } + + @SuppressWarnings("unchecked") + private Map preparedInputs(RuntimeProcessingModuleContext context) { + Object output = context.requireResult(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION).output(); + if (output instanceof Map map) { + return (Map) map; + } + return Map.of(); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java index 3e0292d..22a539f 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java @@ -2,6 +2,7 @@ package at.procon.eventhub.processing.eventprocessing.module; import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto; import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult; +import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationScopeResult; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput; import at.procon.eventhub.processing.dto.UnifiedRuntimeDerivedProjectionResultDto; import at.procon.eventhub.processing.dto.UnifiedRuntimeDriverWorkingTimeScopeResultDto; @@ -66,7 +67,7 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess return new RuntimeProcessingModuleDescriptorDto( moduleKey(), "Driving-derived projections", - "Executes the shared driver working-time core from typed per-driver module outputs for driving interruptions, rest candidates, card-absence coverage, overnight candidates, and trip candidates.", + "Executes 
the shared driver working-time core from typed per-driver module outputs for driving interruptions, rest candidates, card-absence coverage, overnight candidates, and trip candidates; optional NDI and country-trip module results are attached when present.", "ESPER+JAVA", Set.of( DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS, @@ -75,7 +76,7 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess ), Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map"), - Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto") + Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto", "DriverNdiHomeClassificationResult", "DriverCountryTripSegmentationResult") ); } @@ -90,6 +91,8 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess Map preparedInputs = preparedInputs(context); DriverNdiHomeClassificationScopeResult ndiHomeClassificationScope = optionalNdiHomeClassificationScope(context); + DriverCountryTripSegmentationScopeResult countryTripSegmentationScope = + optionalCountryTripSegmentationScope(context); LinkedHashMap driverResults = new LinkedHashMap<>(); List warnings = new ArrayList<>(); @@ -126,7 +129,10 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess preparedInput.partition().partitionDebug(), ndiHomeClassificationScope == null ? null - : ndiHomeClassificationScope.resultForDriver(preparedInput.driverKey()) + : ndiHomeClassificationScope.resultForDriver(preparedInput.driverKey()), + countryTripSegmentationScope == null + ? 
null + : countryTripSegmentationScope.resultForDriver(preparedInput.driverKey()) )); } @@ -136,6 +142,10 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess if (ndiHomeClassificationScope != null) { notes.addAll(ndiHomeClassificationScope.notes()); } + if (countryTripSegmentationScope != null) { + notes.addAll(countryTripSegmentationScope.notes()); + warnings.addAll(countryTripSegmentationScope.warnings()); + } UnifiedRuntimeDriverWorkingTimeScopeResultDto result = new UnifiedRuntimeDriverWorkingTimeScopeResultDto( broadBundle.request(), @@ -215,6 +225,17 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess return null; } + private DriverCountryTripSegmentationScopeResult optionalCountryTripSegmentationScope( + RuntimeProcessingModuleContext context + ) { + RuntimeProcessingModuleResult result = + context.previousResults().get(DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION); + if (result != null && result.output() instanceof DriverCountryTripSegmentationScopeResult scopeResult) { + return scopeResult; + } + return null; + } + private UnifiedRuntimeProcessingApiRequest scopeRequest(RuntimeProcessingModuleContext context) { Object value = context.attributes().get("runtimeScopeApiRequest"); if (value instanceof UnifiedRuntimeProcessingApiRequest request) { diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java index 9f284f3..e3ce4ad 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeModuleKeys.java @@ -11,6 +11,7 @@ public final class DriverWorkingTimeModuleKeys { public static final String VEHICLE_EVIDENCE_ATTACHMENT = "vehicle-evidence-attachment"; public static final String 
SUPPORT_EVIDENCE_NORMALIZATION = "support-evidence-normalization";
     public static final String NDI_HOME_CLASSIFICATION = "ndi-home-classification";
+    public static final String COUNTRY_TRIP_SEGMENTATION = "country-trip-segmentation";
     public static final String DRIVING_DERIVED_PROJECTIONS = "driving-derived-projections";
     private DriverWorkingTimeModuleKeys() {
diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverHomeClassificationRuntimeProcessingPlan.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverHomeClassificationRuntimeProcessingPlan.java
index fc7cfee..d000949 100644
--- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverHomeClassificationRuntimeProcessingPlan.java
+++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverHomeClassificationRuntimeProcessingPlan.java
@@ -44,8 +44,7 @@ public class DriverHomeClassificationRuntimeProcessingPlan implements RuntimePro
     @Override
     public String description() {
-        return "Builds enriched non-driving intervals, learns company and driver home locations "
-                + "from one or more tachograph file sessions, and applies ordered HOME/NOT_HOME rules.";
+        return "Builds enriched non-driving intervals, learns company and driver home locations, applies ordered HOME/NOT_HOME rules, and creates country trip segments using explicit border events plus Nominatim-backed GNSS country resolution.";
     }
     @Override
@@ -97,8 +96,10 @@ public class DriverHomeClassificationRuntimeProcessingPlan implements RuntimePro
                     .forEach(modules::add);
         }
         modules.removeIf(moduleKey -> DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION.equals(moduleKey)
+                || DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION.equals(moduleKey)
                 || DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS.equals(moduleKey));
         modules.add(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION);
+        modules.add(DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION);
modules.add(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS);
         Map parameters = new LinkedHashMap<>();
diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java
index 16a085c..b28caf2 100644
--- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java
+++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java
@@ -205,10 +205,19 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
                         Set.of("Map"),
                         Set.of("DriverNdiHomeClassificationScopeResult")
                 ),
+                new RuntimeProcessingModuleDescriptorDto(
+                        DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION,
+                        "Country trip segmentation",
+                        "Builds country trip segments from explicit border crossings and GNSS country changes; missing countries are resolved through cached, rate-limited Nominatim reverse geocoding.",
+                        "JAVA+HTTP",
+                        Set.of(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION),
+                        Set.of("Map"),
+                        Set.of("DriverCountryTripSegmentationScopeResult")
+                ),
                 new RuntimeProcessingModuleDescriptorDto(
                         DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS,
                         "Driving-derived projections",
-                        "Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates; attaches NDI HOME/NOT_HOME classification when that optional module was requested.",
+                        "Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates; attaches optional NDI HOME/NOT_HOME and country-trip results when requested.",
                         "ESPER+JAVA",
                         Set.of(
                                 DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS,
@@ -216,12 +225,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements
RuntimeProcessing
                                 DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION
                         ),
                         Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map"),
-                        Set.of("DriverWorkingTimeProcessingResultDto", "DriverNdiHomeClassificationResult")
+                        Set.of("DriverWorkingTimeProcessingResultDto", "DriverNdiHomeClassificationResult", "DriverCountryTripSegmentationResult")
                 )
         ));
         if (!includeRuntimeEventAssemblyModule) {
             descriptors.removeIf(descriptor -> DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION
-                    .equals(descriptor.moduleKey()));
+                    .equals(descriptor.moduleKey())
+                    || DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION.equals(descriptor.moduleKey()));
         }
         return List.copyOf(descriptors);
     }
@@ -505,6 +515,24 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
                     )
             );
         }
+        if (driverResult.countryTripSegmentation() != null) {
+            results.put(
+                    DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION,
+                    new RuntimeProcessingModuleResult(
+                            DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION,
+                            RuntimeProcessingModuleStatus.SUCCESS,
+                            driverResult.countryTripSegmentation(),
+                            Map.of(
+                                    "segmentCount", driverResult.countryTripSegmentation().segmentCount(),
+                                    "explicitBorderCrossingCount", driverResult.countryTripSegmentation().explicitBorderCrossingCount(),
+                                    "reverseGeocodingRemoteRequestCount", driverResult.countryTripSegmentation().reverseGeocodingRemoteRequestCount(),
+                                    "reverseGeocodingCacheHitCount", driverResult.countryTripSegmentation().reverseGeocodingCacheHitCount(),
+                                    "unresolvedCoordinateCount", driverResult.countryTripSegmentation().unresolvedCoordinateCount()
+                            ),
+                            driverResult.countryTripSegmentation().warnings()
+                    )
+            );
+        }
         results.put(
                 DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS,
                 new RuntimeProcessingModuleResult(
@@ -545,6 +573,15 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
                     "driverHomeClusterCount", driverResult.ndiHomeClassification().driverHomeClusterCount()
             ));
         }
+        if (driverResult.countryTripSegmentation() != null) {
+            metadata.put("countryTripSegmentation", Map.of(
+                    "segmentCount", driverResult.countryTripSegmentation().segmentCount(),
+                    "explicitBorderCrossingCount", driverResult.countryTripSegmentation().explicitBorderCrossingCount(),
+                    "reverseGeocodingRemoteRequestCount", driverResult.countryTripSegmentation().reverseGeocodingRemoteRequestCount(),
+                    "reverseGeocodingCacheHitCount", driverResult.countryTripSegmentation().reverseGeocodingCacheHitCount(),
+                    "unresolvedCoordinateCount", driverResult.countryTripSegmentation().unresolvedCoordinateCount()
+            ));
+        }
         return metadata;
     }
@@ -564,7 +601,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
                 driverResult.notes(),
                 includeSupportEvidenceNormalization ? driverResult.supportEvidenceNormalization() : null,
                 includePartitionDebug ? driverResult.partitionDebug() : null,
-                driverResult.ndiHomeClassification()
+                driverResult.ndiHomeClassification(),
+                driverResult.countryTripSegmentation()
         );
     }
@@ -760,6 +798,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
         }
         boolean includeNdiHomeClassification =
                 requested.remove(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION) != null;
+        boolean includeCountryTripSegmentation =
+                requested.remove(DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION) != null;
         requested.remove(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS);
         if (includeNdiHomeClassification) {
             requested.put(
@@ -767,6 +807,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
                     DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION
             );
         }
+        if (includeCountryTripSegmentation) {
+            requested.put(
+                    DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION,
+                    DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION
+            );
+        }
         requested.put(
                 DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS,
                 DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS
@@ -776,6 +822,7 @@
public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
         return modules().stream()
                 .map(RuntimeProcessingModuleDescriptorDto::moduleKey)
                 .filter(moduleKey -> !DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION.equals(moduleKey))
+                .filter(moduleKey -> !DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION.equals(moduleKey))
                 .toList();
     }
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 14a779b..fa59610 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -50,6 +50,26 @@ eventhub:
     concurrent-consumers: 4
     block-when-full: true
     queue-offer-timeout: 5m
+  reverse-geocoding:
+    enabled: ${NOMINATIM_ENABLED:true}
+    provider: NOMINATIM
+    nominatim:
+      # Public OSM Nominatim requires an identifying User-Agent and no more than one request/second.
+      # Configure a self-hosted endpoint here when higher throughput is required.
+      base-url: ${NOMINATIM_BASE_URL:https://nominatim.openstreetmap.org}
+      # Deliberate opt-in is required for the donated public service. Prefer a self-hosted or contracted endpoint for production/bulk processing.
+      public-service-enabled: ${NOMINATIM_PUBLIC_SERVICE_ENABLED:false}
+      user-agent: ${NOMINATIM_USER_AGENT:eventhub-tachograph/0.1 (Nominatim reverse geocoding)}
+      email: ${NOMINATIM_EMAIL:}
+      accept-language: ${NOMINATIM_ACCEPT_LANGUAGE:en}
+      connect-timeout: ${NOMINATIM_CONNECT_TIMEOUT:10s}
+      read-timeout: ${NOMINATIM_READ_TIMEOUT:20s}
+      minimum-request-interval: ${NOMINATIM_MIN_REQUEST_INTERVAL:1s}
+      cache-ttl: ${NOMINATIM_CACHE_TTL:30d}
+      cache-max-entries: ${NOMINATIM_CACHE_MAX_ENTRIES:100000}
+      coordinate-decimal-places: ${NOMINATIM_COORDINATE_DECIMAL_PLACES:4}
+      max-remote-lookups-per-execution: ${NOMINATIM_MAX_REMOTE_LOOKUPS_PER_EXECUTION:25}
+
   tachograph:
     default-chunk-days: 1
     occurred-at-overlap: 7d
diff --git a/src/test/java/at/procon/eventhub/geocoding/service/NominatimGeoCountryResolverTest.java b/src/test/java/at/procon/eventhub/geocoding/service/NominatimGeoCountryResolverTest.java
new file mode 100644
index 0000000..c693bc1
--- /dev/null
+++ b/src/test/java/at/procon/eventhub/geocoding/service/NominatimGeoCountryResolverTest.java
@@ -0,0 +1,144 @@
+package at.procon.eventhub.geocoding.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import at.procon.eventhub.config.EventHubProperties;
+import at.procon.eventhub.geocoding.model.GeoCountryResolution;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.InetSocketAddress;
+import java.net.http.HttpClient;
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+class NominatimGeoCountryResolverTest {
+
+    private HttpServer server;
+
+    @AfterEach
+    void stopServer() {
+        if (server != null) {
+            server.stop(0);
+        }
+    }
+
+    @Test
+    void resolvesCountryWithRequiredHeadersAndCachesCoordinate() throws Exception {
+        AtomicInteger requestCount = new AtomicInteger();
+        AtomicReference<String> userAgent = new AtomicReference<>();
+        AtomicReference<String> query = new AtomicReference<>();
+        server = HttpServer.create(new InetSocketAddress(0), 0);
+        server.createContext("/reverse", exchange -> respond(exchange, requestCount, userAgent, query));
+        server.start();
+
+        EventHubProperties properties = new EventHubProperties();
+        EventHubProperties.Nominatim config = properties.getReverseGeocoding().getNominatim();
+        config.setBaseUrl("http://localhost:" + server.getAddress().getPort());
+        config.setUserAgent("eventhub-test/1.0");
+        config.setMinimumRequestInterval(Duration.ZERO);
+        config.setCoordinateDecimalPlaces(4);
+
+        NominatimGeoCountryResolver resolver = new NominatimGeoCountryResolver(
+                properties,
+                new ObjectMapper(),
+                HttpClient.newHttpClient(),
+                Clock.systemUTC()
+        );
+
+        GeoCountryResolution first = resolver.resolve(
+                new BigDecimal("48.208200"),
+                new BigDecimal("16.373800"),
+                true
+        );
+        GeoCountryResolution second = resolver.resolve(
+                new BigDecimal("48.208200"),
+                new BigDecimal("16.373800"),
+                false
+        );
+
+        assertThat(first.resolved()).isTrue();
+        assertThat(first.countryCode()).isEqualTo("AT");
+        assertThat(first.remoteRequestPerformed()).isTrue();
+        assertThat(first.cacheHit()).isFalse();
+        assertThat(second.resolved()).isTrue();
+        assertThat(second.cacheHit()).isTrue();
+        assertThat(second.remoteRequestPerformed()).isFalse();
+        assertThat(requestCount).hasValue(1);
+        assertThat(userAgent).hasValue("eventhub-test/1.0");
+        assertThat(query.get())
+                .contains("format=jsonv2")
+                .contains("zoom=3")
+                .contains("layer=address")
+                .contains("lat=48.208200")
+                .contains("lon=16.373800");
+    }
+
+    @Test
+    void requiresExplicitOptInForPublicOsmEndpoint() {
+        EventHubProperties properties = new EventHubProperties();
+        NominatimGeoCountryResolver resolver = new
NominatimGeoCountryResolver(
+                properties,
+                new ObjectMapper(),
+                HttpClient.newHttpClient(),
+                Clock.systemUTC()
+        );
+
+        GeoCountryResolution result = resolver.resolve(
+                new BigDecimal("48.2082"),
+                new BigDecimal("16.3738"),
+                true
+        );
+
+        assertThat(result.resolved()).isFalse();
+        assertThat(result.remoteRequestPerformed()).isFalse();
+        assertThat(result.errorMessage()).contains("explicit opt-in");
+    }
+
+    @Test
+    void doesNotCallRemoteServiceWhenDisabled() {
+        EventHubProperties properties = new EventHubProperties();
+        properties.getReverseGeocoding().setEnabled(false);
+        NominatimGeoCountryResolver resolver = new NominatimGeoCountryResolver(
+                properties,
+                new ObjectMapper(),
+                HttpClient.newHttpClient(),
+                Clock.systemUTC()
+        );
+
+        GeoCountryResolution result = resolver.resolve(
+                new BigDecimal("48.2082"),
+                new BigDecimal("16.3738"),
+                true
+        );
+
+        assertThat(result.resolved()).isFalse();
+        assertThat(result.remoteRequestPerformed()).isFalse();
+    }
+
+    private void respond(
+            HttpExchange exchange,
+            AtomicInteger requestCount,
+            AtomicReference<String> userAgent,
+            AtomicReference<String> query
+    ) throws IOException {
+        requestCount.incrementAndGet();
+        userAgent.set(exchange.getRequestHeaders().getFirst("User-Agent"));
+        query.set(exchange.getRequestURI().getRawQuery());
+        byte[] body = ("{\"place_id\":1,\"display_name\":\"Vienna, Austria\","
+                + "\"licence\":\"Data © OpenStreetMap contributors, ODbL 1.0\","
+                + "\"address\":{\"country\":\"Austria\",\"country_code\":\"at\"}}")
+                .getBytes(StandardCharsets.UTF_8);
+        exchange.getResponseHeaders().add("Content-Type", "application/json");
+        exchange.sendResponseHeaders(200, body.length);
+        exchange.getResponseBody().write(body);
+        exchange.close();
+    }
+}
diff --git a/src/test/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/service/DriverCountryTripSegmentationServiceTest.java
b/src/test/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/service/DriverCountryTripSegmentationServiceTest.java
new file mode 100644
index 0000000..4d00757
--- /dev/null
+++ b/src/test/java/at/procon/eventhub/processing/driverworkingtime/tripsegmentation/service/DriverCountryTripSegmentationServiceTest.java
@@ -0,0 +1,263 @@
+package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import at.procon.eventhub.config.EventHubProperties;
+import at.procon.eventhub.geocoding.model.GeoCountryResolution;
+import at.procon.eventhub.geocoding.model.GeoCountryResolutionStatus;
+import at.procon.eventhub.geocoding.service.GeoCountryResolver;
+import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval;
+import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDriverPartition;
+import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput;
+import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeProcessingInput;
+import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentBoundarySource;
+import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationResult;
+import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationScopeResult;
+import at.procon.eventhub.processing.eventprocessing.support.RuntimeSupportEvidenceEvent;
+import java.math.BigDecimal;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.Test;
+
+class DriverCountryTripSegmentationServiceTest {
+
+    private static final String DRIVER = "13:DRIVER-1";
+    private static final String VEHICLE = "VIN-1";
+    private static final String
REGISTRATION = "W-12345";
+
+    @Test
+    void prefersExplicitBorderCrossingAndExistingCountryCodes() {
+        AtomicInteger resolverCalls = new AtomicInteger();
+        GeoCountryResolver resolver = (latitude, longitude, allowRemoteLookup) -> {
+            resolverCalls.incrementAndGet();
+            return unresolved(latitude, longitude);
+        };
+        DriverCountryTripSegmentationService service = new DriverCountryTripSegmentationService(
+                resolver,
+                properties()
+        );
+        List<RuntimeSupportEvidenceEvent> supportEvents = List.of(
+                position("p-at", "2026-05-01T08:05:00Z", "48.2082", "16.3738", "A"),
+                border("b-at-de", "2026-05-01T10:00:00Z", "48.75", "13.84", "A", "D"),
+                position("p-de", "2026-05-01T11:00:00Z", "48.90", "13.40", "D")
+        );
+
+        DriverCountryTripSegmentationScopeResult scope = service.segmentPreparedInputs(
+                Map.of(DRIVER, preparedInput(supportEvents))
+        );
+        DriverCountryTripSegmentationResult result = scope.resultForDriver(DRIVER);
+
+        assertThat(resolverCalls).hasValue(0);
+        assertThat(result.segmentCount()).isEqualTo(2);
+        assertThat(result.explicitBorderCrossingCount()).isEqualTo(1);
+        assertThat(result.segments().get(0).countryFrom()).isEqualTo("AT");
+        assertThat(result.segments().get(0).countryTo()).isEqualTo("DE");
+        assertThat(result.segments().get(0).endBoundarySource())
+                .isEqualTo(DriverCountryTripSegmentBoundarySource.EXPLICIT_BORDER_CROSSING);
+        assertThat(result.segments().get(1).countryFrom()).isEqualTo("DE");
+        assertThat(result.segments().get(1).countryTo()).isEqualTo("DE");
+    }
+
+    @Test
+    void usesNominatimWhenPositionCountryIsMissing() {
+        AtomicInteger resolverCalls = new AtomicInteger();
+        GeoCountryResolver resolver = (latitude, longitude, allowRemoteLookup) -> {
+            resolverCalls.incrementAndGet();
+            String code = longitude.compareTo(new BigDecimal("15")) > 0 ? "AT" : "DE";
+            return new GeoCountryResolution(
+                    GeoCountryResolutionStatus.RESOLVED,
+                    latitude,
+                    longitude,
+                    code,
+                    null,
+                    null,
+                    "NOMINATIM",
+                    "Data © OpenStreetMap contributors, ODbL 1.0",
+                    false,
+                    true,
+                    null
+            );
+        };
+        DriverCountryTripSegmentationService service = new DriverCountryTripSegmentationService(
+                resolver,
+                properties()
+        );
+        List<RuntimeSupportEvidenceEvent> supportEvents = List.of(
+                position("p-at", "2026-05-01T08:05:00Z", "48.2082", "16.3738", null),
+                position("p-de", "2026-05-01T10:00:00Z", "48.90", "13.40", null)
+        );
+
+        DriverCountryTripSegmentationResult result = service.segmentPreparedInputs(
+                Map.of(DRIVER, preparedInput(supportEvents))
+        ).resultForDriver(DRIVER);
+
+        assertThat(resolverCalls).hasValue(2);
+        assertThat(result.reverseGeocodingRemoteRequestCount()).isEqualTo(2);
+        assertThat(result.segmentCount()).isEqualTo(2);
+        assertThat(result.segments().get(0).countryFrom()).isEqualTo("AT");
+        assertThat(result.segments().get(0).countryTo()).isEqualTo("DE");
+        assertThat(result.segments().get(0).endBoundarySource())
+                .isEqualTo(DriverCountryTripSegmentBoundarySource.NOMINATIM_COUNTRY_CHANGE);
+        assertThat(result.segments().get(0).boundaryCountryReverseGeocoded()).isTrue();
+    }
+
+    private EventHubProperties properties() {
+        EventHubProperties properties = new EventHubProperties();
+        properties.getReverseGeocoding().getNominatim().setMaxRemoteLookupsPerExecution(10);
+        return properties;
+    }
+
+    private DriverWorkingTimePreparedInput preparedInput(List<RuntimeSupportEvidenceEvent> supportEvents) {
+        UUID sessionId = UUID.randomUUID();
+        DriverWorkingTimeActivityInterval drive = new DriverWorkingTimeActivityInterval(
+                sessionId,
+                DRIVER,
+                "drive-1",
+                "DRIVE",
+                "1",
+                "INSERTED",
+                "SINGLE",
+                REGISTRATION,
+                VEHICLE,
+                "TACHOGRAPH_FILE_SESSION",
+                "drive-start",
+                "drive-end",
+                OffsetDateTime.parse("2026-05-01T08:00:00Z"),
+                OffsetDateTime.parse("2026-05-01T12:00:00Z"),
+                OffsetDateTime.parse("2026-05-01T08:00:00Z").toEpochSecond(),
+
OffsetDateTime.parse("2026-05-01T12:00:00Z").toEpochSecond(),
+                4 * 60 * 60,
+                List.of("drive-1"),
+                false,
+                false,
+                "RAW"
+        );
+        DriverWorkingTimeProcessingInput processingInput = new DriverWorkingTimeProcessingInput(
+                sessionId,
+                DRIVER,
+                "TACHOGRAPH_FILE_SESSION",
+                drive.startedAt(),
+                drive.endedAt(),
+                drive.startedAt(),
+                drive.endedAt(),
+                3,
+                720,
+                List.of(drive),
+                List.of(),
+                supportEvents,
+                List.of()
+        );
+        DriverWorkingTimeDriverPartition partition = new DriverWorkingTimeDriverPartition(
+                DRIVER,
+                List.of(),
+                List.of(),
+                List.of(),
+                List.of(),
+                List.of(),
+                null,
+                supportEvents,
+                null,
+                List.of(),
+                List.of()
+        );
+        return new DriverWorkingTimePreparedInput(DRIVER, partition, processingInput);
+    }
+
+    private RuntimeSupportEvidenceEvent position(
+            String eventId,
+            String occurredAt,
+            String latitude,
+            String longitude,
+            String countryCode
+    ) {
+        return supportEvent(
+                eventId,
+                occurredAt,
+                "POSITION",
+                "POSITION_RECORDED",
+                latitude,
+                longitude,
+                countryCode,
+                null,
+                null
+        );
+    }
+
+    private RuntimeSupportEvidenceEvent border(
+            String eventId,
+            String occurredAt,
+            String latitude,
+            String longitude,
+            String countryFrom,
+            String countryTo
+    ) {
+        return supportEvent(
+                eventId,
+                occurredAt,
+                "BORDER_CROSSING",
+                "BORDER_OUTBOUND",
+                latitude,
+                longitude,
+                null,
+                countryFrom,
+                countryTo
+        );
+    }
+
+    private RuntimeSupportEvidenceEvent supportEvent(
+            String eventId,
+            String occurredAt,
+            String eventDomain,
+            String eventType,
+            String latitude,
+            String longitude,
+            String countryCode,
+            String countryFrom,
+            String countryTo
+    ) {
+        OffsetDateTime occurred = OffsetDateTime.parse(occurredAt);
+        return new RuntimeSupportEvidenceEvent(
+                eventId,
+                "TACHOGRAPH_FILE_SESSION",
+                "VEHICLE_UNIT",
+                eventDomain,
+                eventType,
+                "SNAPSHOT",
+                DRIVER,
+                VEHICLE,
+                REGISTRATION,
+                occurred,
+                occurred.toEpochSecond(),
+                new BigDecimal(latitude),
+                new BigDecimal(longitude),
+                countryCode,
+                null,
+                countryFrom,
+
countryTo,
+                null,
+                null,
+                null,
+                null,
+                Map.of()
+        );
+    }
+
+    private GeoCountryResolution unresolved(BigDecimal latitude, BigDecimal longitude) {
+        return new GeoCountryResolution(
+                GeoCountryResolutionStatus.NOT_FOUND,
+                latitude,
+                longitude,
+                null,
+                null,
+                null,
+                "NOMINATIM",
+                null,
+                false,
+                false,
+                "not found"
+        );
+    }
+}
diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java
index 2226e5a..8a0b15b 100644
--- a/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java
+++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java
@@ -239,6 +239,7 @@ class DriverWorkingTimeRuntimeProcessingPlanTest {
         assertThat(delegated.processingPlanKey()).isEqualTo(DriverWorkingTimeRuntimeProcessingPlan.PLAN_KEY);
         assertThat(delegated.modules()).endsWith(
                 DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION,
+                DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION,
                 DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS
         );
         assertThat(delegated.parameters())