Add country trip segmentation runtime module

This commit is contained in:
trifonovt 2026-06-17 10:48:02 +02:00
parent 5a10558612
commit c91d3cc1c4
22 changed files with 2119 additions and 76 deletions

View File

@ -1,6 +1,6 @@
# NDI HOME / NOT_HOME classification implementation
# NDI HOME / NOT_HOME classification and country trip segmentation
This patch implements the HOME / NOT_HOME part of `docs/ndi_home_classification_en.md` as a dedicated runtime processing plan while reusing the existing driver-working-time pipeline.
This patch implements the HOME / NOT_HOME classification and the country-trip segmentation described in `docs/ndi_home_classification_en.md`. It reuses the existing driver-working-time pipeline and adds configurable Nominatim reverse geocoding only where source country evidence is missing.
## Public processing plan
@ -10,19 +10,20 @@ Use:
driver-home-classification-v1
```
The plan delegates to the shared `driver-working-time-v1` pipeline and explicitly inserts:
The dedicated plan delegates to the shared `driver-working-time-v1` pipeline and explicitly inserts:
```text
support-evidence-normalization
-> ndi-home-classification
-> country-trip-segmentation
-> driving-derived-projections
```
The original `driver-working-time-v1` plan does not run the optional NDI module by default. It can opt in by explicitly requesting `ndi-home-classification`.
The normal `driver-working-time-v1` plan keeps both modules optional. They can also be requested explicitly as `ndi-home-classification` and `country-trip-segmentation`.
## Reused projection structures
`DriverWorkingTimeReusableProjectionBuilder.buildAllNonDrivingIntervalCoverage(...)` runs the existing Esper interruption/card-absence/GNSS enrichment pipeline with a zero rest-candidate threshold. It therefore creates enriched evidence for every positive non-driving interruption without changing the legacy daily/weekly-rest threshold or outputs.
`DriverWorkingTimeReusableProjectionBuilder.buildAllNonDrivingIntervalCoverage(...)` runs the existing Esper interruption/card-absence/GNSS enrichment pipeline with a zero rest-candidate threshold. It creates enriched evidence for every positive non-driving interruption without changing the legacy daily/weekly-rest threshold or outputs.
The implementation reuses `DriverWorkingTimeRestCoverageInterval` as the enriched NDI evidence model. It provides:
@ -32,7 +33,7 @@ The implementation reuses `DriverWorkingTimeRestCoverageInterval` as the enriche
- begin/end boundary GNSS evidence;
- boundary odometer and movement evidence.
## Implemented classification rules
## HOME / NOT_HOME classification
The rules are evaluated in the document order:
@ -48,89 +49,113 @@ Every classification contains a `DriverNdiHomeClassificationReason`, so the firs
## Location learning and clustering
Only NDIs longer than 7.5 hours with a position are added to the corpus.
Position selection follows the document through the existing boundary-evidence resolver:
```text
resolved begin-boundary evidence for the previous driving/vehicle context,
otherwise resolved end-boundary evidence for the next driving/vehicle context
```
The selected evidence is the closest eligible support-position event within the configured boundary lookup window, so it is an approximation when no event exists exactly at the driving boundary.
Only NDIs longer than 7.5 hours with a position are added to the corpus. Position selection uses the existing resolved begin-boundary evidence and falls back to resolved end-boundary evidence.
The in-memory cache:
- accumulates observations across one or more file-session executions;
- deduplicates the same NDI across repeated/overlapping sessions;
- retains the source session IDs as provenance;
- retains source-session provenance;
- stores the driver key on every observation;
- does not permanently mark a driver as "actual" or "other".
For each result driver, the same cached corpus is viewed as:
```text
actual-driver observations
other-driver observations
```
This makes the distinction request-relative and allows the corpus to be reused for another driver.
- calculates actual-driver and other-driver views per request.
Clustering uses Java DBSCAN with Haversine distance. Defaults are 150 metres and three points. Noise observations remain in the denominator for visit-share calculations but are never home clusters.
## File-session learning scope
## Country trip segmentation
The dedicated plan defaults `ndiLearnAllFileSessionDrivers` to `true`.
`DriverCountryTripSegmentationService` builds country segments over driving intervals.
For a request with explicit canonical driver keys, the plan internally loads all drivers from the selected file sessions for location learning and filters the response back to the originally requested drivers.
Evidence precedence is:
The scope is not broadened when:
1. explicit tachograph border-crossing event (`countryFrom` / `countryTo`);
2. country code already present on a positioned support event;
3. Nominatim reverse lookup for a positioned event without a usable country code.
- the source selection is mixed or database-only;
- the option is disabled;
- the request uses only alternate card/source selectors and cannot be filtered safely by canonical driver key.
Country values are normalized to ISO 3166-1 alpha-2 where a mapping is known. Segment boundaries retain their evidence source:
## Configuration
```text
EXPLICIT_BORDER_CROSSING
GNSS_SOURCE_COUNTRY_CHANGE
NOMINATIM_COUNTRY_CHANGE
VEHICLE_CHANGE
FINAL
```
The defaults are under:
The result includes segment counts, explicit-border counts, remote lookup counts, cache-hit counts, unresolved-coordinate counts, warnings, and OpenStreetMap attribution.
## Nominatim integration
The client uses the reverse endpoint with:
```text
format=jsonv2
zoom=3
addressdetails=1
layer=address
```
Only `address.country_code` is required by the classification/segmentation logic. Failures do not fail the whole processing plan; the coordinate remains unresolved and a diagnostic warning is returned.
Safeguards:
- identifying configurable `User-Agent`;
- optional identifying email;
- shared coordinate cache with TTL and maximum size;
- coordinate quantization for cache reuse;
- one execution-level remote lookup budget;
- fully serialized remote calls;
- configurable minimum interval;
- enforced minimum one-second interval for `nominatim.openstreetmap.org`;
- public OSM endpoint disabled unless deliberately opted in;
- configurable endpoint so a self-hosted or contracted Nominatim service can be substituted without code changes.
### Configuration
```yaml
eventhub:
tachograph-file-session:
processing:
ndi-long-minutes: 450
ndi-very-long-minutes: 1440
ndi-card-removal-percent: 80
ndi-visit-share-percent: 25
ndi-dbscan-eps-meters: 150
ndi-dbscan-min-points: 3
ndi-location-cache-ttl: 4h
ndi-location-cache-max-observations: 100000
ndi-location-cache-namespace: default
reverse-geocoding:
enabled: true
provider: NOMINATIM
nominatim:
base-url: https://nominatim.openstreetmap.org
public-service-enabled: false
user-agent: eventhub-tachograph/0.1 (Nominatim reverse geocoding)
email: ""
accept-language: en
connect-timeout: 10s
read-timeout: 20s
minimum-request-interval: 1s
cache-ttl: 30d
cache-max-entries: 100000
coordinate-decimal-places: 4
max-remote-lookups-per-execution: 25
```
For tenantless uploaded sessions, configure a namespace that prevents unrelated operational contexts from sharing a corpus. Explicit tenant keys always create tenant-scoped corpora.
Environment variables use the `NOMINATIM_*` names shown in `application.yml`.
## Response extension
For a self-hosted endpoint, set `NOMINATIM_BASE_URL`; `public-service-enabled` is not needed. For deliberately selected, policy-compliant, low-volume use of the donated public endpoint, additionally set:
Each driver partition can now contain:
```text
NOMINATIM_PUBLIC_SERVICE_ENABLED=true
NOMINATIM_USER_AGENT=<application/version and contact identifier>
NOMINATIM_EMAIL=<contact email when appropriate>
```
Production or recurring tachograph batch processing should use a self-hosted instance or a provider whose terms cover the expected workload. Coordinates may reveal vehicle or driver movements; do not send confidential or personal-location data to a public endpoint without an appropriate legal and privacy basis.
## File-session learning scope
The dedicated plan defaults `ndiLearnAllFileSessionDrivers` to `true`. For a request with explicit canonical driver keys, it internally loads all drivers from selected file sessions for location learning and filters the response back to the originally requested drivers.
The scope is not broadened when the source is mixed/database-only, the option is disabled, or the result cannot safely be filtered by canonical driver key.
## Response extensions
Each driver partition can contain:
```text
ndiHomeClassification
countryTripSegmentation
```
It includes:
- all NDI classifications;
- company and driver home cluster IDs;
- cluster centroids and visit statistics;
- actual-driver versus other-driver cached observation counts;
- diagnostics and notes.
The field is omitted when the optional module was not executed, preserving the existing JSON shape for normal `driver-working-time-v1` calls.
## Current implementation boundary
This patch implements sections 1-4 of the document: NDI derivation/enrichment, location clustering, home-location determination, and HOME / NOT_HOME classification.
Section 5, border-crossing/country trip segmentation, is intentionally not included yet. It needs a separate country-resolution abstraction and a decision between local geographic data, PostGIS, or an external reverse-geocoding provider.
The fields are omitted when their optional modules were not executed, preserving the existing JSON shape for normal `driver-working-time-v1` calls.

View File

@ -33,6 +33,7 @@ public class EventHubProperties {
private final RuntimeProcessing runtimeProcessing = new RuntimeProcessing();
private final TachographFileSession tachographFileSession = new TachographFileSession(runtimeProcessing);
private final EsperPoc esperPoc = new EsperPoc();
private final ReverseGeocoding reverseGeocoding = new ReverseGeocoding();
private final YellowFox yellowFox = new YellowFox();
public Batch getBatch() {
@ -55,10 +56,174 @@ public class EventHubProperties {
return esperPoc;
}
public ReverseGeocoding getReverseGeocoding() {
return reverseGeocoding;
}
public YellowFox getYellowFox() {
return yellowFox;
}
public static class ReverseGeocoding {
private boolean enabled = true;
private String provider = "NOMINATIM";
private final Nominatim nominatim = new Nominatim();
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getProvider() {
return provider;
}
public void setProvider(String provider) {
if (provider != null && !provider.isBlank()) {
this.provider = provider.trim().toUpperCase(java.util.Locale.ROOT);
}
}
public Nominatim getNominatim() {
return nominatim;
}
}
public static class Nominatim {
private String baseUrl = "https://nominatim.openstreetmap.org";
private boolean publicServiceEnabled = false;
private String userAgent = "eventhub-tachograph/0.1 (Nominatim reverse geocoding)";
private String email;
private String acceptLanguage = "en";
private Duration connectTimeout = Duration.ofSeconds(10);
private Duration readTimeout = Duration.ofSeconds(20);
private Duration minimumRequestInterval = Duration.ofSeconds(1);
private Duration cacheTtl = Duration.ofDays(30);
private int cacheMaxEntries = 100000;
private int coordinateDecimalPlaces = 4;
private int maxRemoteLookupsPerExecution = 25;
public String getBaseUrl() {
return baseUrl;
}
public void setBaseUrl(String baseUrl) {
if (baseUrl == null || baseUrl.isBlank()) {
return;
}
String normalized = baseUrl.trim();
while (normalized.endsWith("/")) {
normalized = normalized.substring(0, normalized.length() - 1);
}
if (!normalized.isBlank()) {
this.baseUrl = normalized;
}
}
public boolean isPublicServiceEnabled() {
return publicServiceEnabled;
}
public void setPublicServiceEnabled(boolean publicServiceEnabled) {
this.publicServiceEnabled = publicServiceEnabled;
}
public String getUserAgent() {
return userAgent;
}
public void setUserAgent(String userAgent) {
if (userAgent != null && !userAgent.isBlank()) {
this.userAgent = userAgent.trim();
}
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email == null || email.isBlank() ? null : email.trim();
}
public String getAcceptLanguage() {
return acceptLanguage;
}
public void setAcceptLanguage(String acceptLanguage) {
if (acceptLanguage != null && !acceptLanguage.isBlank()) {
this.acceptLanguage = acceptLanguage.trim();
}
}
public Duration getConnectTimeout() {
return connectTimeout;
}
public void setConnectTimeout(Duration connectTimeout) {
if (connectTimeout != null && !connectTimeout.isNegative() && !connectTimeout.isZero()) {
this.connectTimeout = connectTimeout;
}
}
public Duration getReadTimeout() {
return readTimeout;
}
public void setReadTimeout(Duration readTimeout) {
if (readTimeout != null && !readTimeout.isNegative() && !readTimeout.isZero()) {
this.readTimeout = readTimeout;
}
}
public Duration getMinimumRequestInterval() {
return minimumRequestInterval;
}
public void setMinimumRequestInterval(Duration minimumRequestInterval) {
if (minimumRequestInterval != null && !minimumRequestInterval.isNegative()) {
this.minimumRequestInterval = minimumRequestInterval;
}
}
public Duration getCacheTtl() {
return cacheTtl;
}
public void setCacheTtl(Duration cacheTtl) {
if (cacheTtl != null && !cacheTtl.isNegative() && !cacheTtl.isZero()) {
this.cacheTtl = cacheTtl;
}
}
public int getCacheMaxEntries() {
return cacheMaxEntries;
}
public void setCacheMaxEntries(int cacheMaxEntries) {
this.cacheMaxEntries = Math.max(100, cacheMaxEntries);
}
public int getCoordinateDecimalPlaces() {
return coordinateDecimalPlaces;
}
public void setCoordinateDecimalPlaces(int coordinateDecimalPlaces) {
this.coordinateDecimalPlaces = Math.max(0, Math.min(7, coordinateDecimalPlaces));
}
public int getMaxRemoteLookupsPerExecution() {
return maxRemoteLookupsPerExecution;
}
public void setMaxRemoteLookupsPerExecution(int maxRemoteLookupsPerExecution) {
this.maxRemoteLookupsPerExecution = Math.max(0, maxRemoteLookupsPerExecution);
}
}
public static class EsperPoc {
private EsperActivityMergeMode activityMergeMode = EsperActivityMergeMode.JAVA;
private EsperShiftResolutionMode shiftResolutionMode = EsperShiftResolutionMode.JAVA;

View File

@ -0,0 +1,39 @@
package at.procon.eventhub.geocoding.model;
import java.math.BigDecimal;
public record GeoCountryResolution(
GeoCountryResolutionStatus status,
BigDecimal latitude,
BigDecimal longitude,
String countryCode,
String countryName,
String displayName,
String provider,
String attribution,
boolean cacheHit,
boolean remoteRequestPerformed,
String errorMessage
) {
public boolean resolved() {
return status == GeoCountryResolutionStatus.RESOLVED
&& countryCode != null
&& !countryCode.isBlank();
}
public GeoCountryResolution asCacheHit(BigDecimal requestedLatitude, BigDecimal requestedLongitude) {
return new GeoCountryResolution(
status,
requestedLatitude,
requestedLongitude,
countryCode,
countryName,
displayName,
provider,
attribution,
true,
false,
errorMessage
);
}
}

View File

@ -0,0 +1,9 @@
package at.procon.eventhub.geocoding.model;
public enum GeoCountryResolutionStatus {
RESOLVED,
NOT_FOUND,
DISABLED,
REMOTE_LOOKUP_NOT_ALLOWED,
ERROR
}

View File

@ -0,0 +1,19 @@
package at.procon.eventhub.geocoding.service;
import at.procon.eventhub.geocoding.model.GeoCountryResolution;
import java.math.BigDecimal;
/**
* Resolves a WGS84 coordinate to an ISO country code.
*
* <p>The {@code allowRemoteLookup} flag lets a caller enforce a per-execution
* request budget while still benefiting from shared cached results.</p>
*/
public interface GeoCountryResolver {
GeoCountryResolution resolve(
BigDecimal latitude,
BigDecimal longitude,
boolean allowRemoteLookup
);
}

View File

@ -0,0 +1,426 @@
package at.procon.eventhub.geocoding.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.geocoding.model.GeoCountryResolution;
import at.procon.eventhub.geocoding.model.GeoCountryResolutionStatus;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* Blocking Nominatim reverse-geocoding client with a shared coordinate cache
* and a process-wide request gate.
*
* <p>The public OpenStreetMap Nominatim service permits at most one request per
* second. The default configuration therefore uses a one-second minimum
* interval. Self-hosted Nominatim installations may configure another value.</p>
*/
@Component
public class NominatimGeoCountryResolver implements GeoCountryResolver {
private static final Logger LOG = LoggerFactory.getLogger(NominatimGeoCountryResolver.class);
private static final String PROVIDER = "NOMINATIM";
private final EventHubProperties properties;
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final Clock clock;
private final Map<CoordinateKey, CacheEntry> cache = new ConcurrentHashMap<>();
private final AtomicLong nextRemoteRequestNanos = new AtomicLong(0L);
private final Object requestGate = new Object();
public NominatimGeoCountryResolver(
EventHubProperties properties,
ObjectMapper objectMapper
) {
this(
properties,
objectMapper,
HttpClient.newBuilder()
.connectTimeout(properties.getReverseGeocoding().getNominatim().getConnectTimeout())
.build(),
Clock.systemUTC()
);
}
NominatimGeoCountryResolver(
EventHubProperties properties,
ObjectMapper objectMapper,
HttpClient httpClient,
Clock clock
) {
this.properties = Objects.requireNonNull(properties, "properties must not be null");
this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper must not be null");
this.httpClient = Objects.requireNonNull(httpClient, "httpClient must not be null");
this.clock = Objects.requireNonNull(clock, "clock must not be null");
}
@Override
public GeoCountryResolution resolve(
BigDecimal latitude,
BigDecimal longitude,
boolean allowRemoteLookup
) {
if (!validCoordinate(latitude, longitude)) {
return result(
GeoCountryResolutionStatus.ERROR,
latitude,
longitude,
null,
null,
null,
false,
false,
"Latitude/longitude is missing or outside the WGS84 range."
);
}
EventHubProperties.ReverseGeocoding reverseConfig = properties.getReverseGeocoding();
EventHubProperties.Nominatim config = reverseConfig.getNominatim();
CoordinateKey key = CoordinateKey.of(latitude, longitude, config.getCoordinateDecimalPlaces());
CacheEntry cached = cache.get(key);
Instant now = clock.instant();
if (cached != null && cached.expiresAt().isAfter(now)) {
return cached.resolution().asCacheHit(latitude, longitude);
}
if (cached != null) {
cache.remove(key, cached);
}
if (!reverseConfig.isEnabled() || !"NOMINATIM".equalsIgnoreCase(reverseConfig.getProvider())) {
return result(
GeoCountryResolutionStatus.DISABLED,
latitude,
longitude,
null,
null,
null,
false,
false,
"Nominatim reverse geocoding is disabled."
);
}
if (isPublicService(config) && !config.isPublicServiceEnabled()) {
return result(
GeoCountryResolutionStatus.DISABLED,
latitude,
longitude,
null,
null,
null,
false,
false,
"The public nominatim.openstreetmap.org service requires explicit opt-in. "
+ "Set eventhub.reverse-geocoding.nominatim.public-service-enabled=true "
+ "only for policy-compliant low-volume use, or configure another Nominatim endpoint."
);
}
if (!allowRemoteLookup) {
return result(
GeoCountryResolutionStatus.REMOTE_LOOKUP_NOT_ALLOWED,
latitude,
longitude,
null,
null,
null,
false,
false,
"The reverse-geocoding budget for this processing execution was exhausted."
);
}
GeoCountryResolution resolved = resolveRemote(latitude, longitude, config);
if (resolved.status() == GeoCountryResolutionStatus.RESOLVED
|| resolved.status() == GeoCountryResolutionStatus.NOT_FOUND) {
putCache(key, resolved, now.plus(config.getCacheTtl()), config.getCacheMaxEntries());
}
return resolved;
}
private GeoCountryResolution resolveRemote(
BigDecimal latitude,
BigDecimal longitude,
EventHubProperties.Nominatim config
) {
try {
synchronized (requestGate) {
awaitRequestPermit(effectiveMinimumRequestInterval(config));
URI uri = reverseUri(latitude, longitude, config);
HttpRequest request = HttpRequest.newBuilder(uri)
.timeout(config.getReadTimeout())
.header("Accept", "application/json")
.header("Accept-Language", config.getAcceptLanguage())
.header("User-Agent", config.getUserAgent())
.GET()
.build();
HttpResponse<String> response = httpClient.send(
request,
HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)
);
if (response.statusCode() == 404) {
return result(
GeoCountryResolutionStatus.NOT_FOUND,
latitude,
longitude,
null,
null,
null,
false,
true,
"Nominatim returned no address for the coordinate."
);
}
if (response.statusCode() / 100 != 2) {
return result(
GeoCountryResolutionStatus.ERROR,
latitude,
longitude,
null,
null,
null,
false,
true,
"Nominatim reverse lookup failed with HTTP status " + response.statusCode() + "."
);
}
JsonNode root = objectMapper.readTree(response.body());
if (root.hasNonNull("error")) {
return result(
GeoCountryResolutionStatus.NOT_FOUND,
latitude,
longitude,
null,
null,
text(root, "display_name"),
false,
true,
root.get("error").asText()
);
}
JsonNode address = root.path("address");
String countryCode = normalizeCountryCode(text(address, "country_code"));
String countryName = text(address, "country");
if (countryCode == null) {
return result(
GeoCountryResolutionStatus.NOT_FOUND,
latitude,
longitude,
null,
countryName,
text(root, "display_name"),
false,
true,
"Nominatim response did not contain address.country_code."
);
}
return new GeoCountryResolution(
GeoCountryResolutionStatus.RESOLVED,
latitude,
longitude,
countryCode,
countryName,
text(root, "display_name"),
PROVIDER,
text(root, "licence"),
false,
true,
null
);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return result(
GeoCountryResolutionStatus.ERROR,
latitude,
longitude,
null,
null,
null,
false,
true,
"Nominatim reverse lookup was interrupted."
);
} catch (IOException | RuntimeException ex) {
LOG.warn("Nominatim reverse lookup failed for {},{}: {}", latitude, longitude, ex.getMessage());
return result(
GeoCountryResolutionStatus.ERROR,
latitude,
longitude,
null,
null,
null,
false,
true,
"Nominatim reverse lookup failed: " + ex.getMessage()
);
}
}
private URI reverseUri(
BigDecimal latitude,
BigDecimal longitude,
EventHubProperties.Nominatim config
) {
StringBuilder query = new StringBuilder(config.getBaseUrl())
.append("/reverse?format=jsonv2")
.append("&lat=").append(url(latitude.toPlainString()))
.append("&lon=").append(url(longitude.toPlainString()))
.append("&zoom=3")
.append("&addressdetails=1")
.append("&layer=address")
.append("&accept-language=").append(url(config.getAcceptLanguage()));
if (config.getEmail() != null && !config.getEmail().isBlank()) {
query.append("&email=").append(url(config.getEmail()));
}
return URI.create(query.toString());
}
private Duration effectiveMinimumRequestInterval(EventHubProperties.Nominatim config) {
Duration configured = config.getMinimumRequestInterval() == null
? Duration.ZERO
: config.getMinimumRequestInterval();
try {
if (isPublicService(config)
&& configured.compareTo(Duration.ofSeconds(1)) < 0) {
return Duration.ofSeconds(1);
}
} catch (RuntimeException ignored) {
}
return configured;
}
private boolean isPublicService(EventHubProperties.Nominatim config) {
try {
return "nominatim.openstreetmap.org".equalsIgnoreCase(
URI.create(config.getBaseUrl()).getHost()
);
} catch (RuntimeException ignored) {
return false;
}
}
private void awaitRequestPermit(Duration minimumInterval) throws InterruptedException {
long intervalNanos = minimumInterval == null ? 0L : Math.max(0L, minimumInterval.toNanos());
long now = System.nanoTime();
long allowedAt = nextRemoteRequestNanos.get();
long waitNanos = allowedAt - now;
if (waitNanos > 0L) {
long millis = waitNanos / 1_000_000L;
int nanos = (int) (waitNanos % 1_000_000L);
Thread.sleep(millis, nanos);
}
nextRemoteRequestNanos.set(System.nanoTime() + intervalNanos);
}
private void putCache(
CoordinateKey key,
GeoCountryResolution resolution,
Instant expiresAt,
int maxEntries
) {
cache.put(key, new CacheEntry(resolution, expiresAt, clock.instant()));
if (cache.size() <= maxEntries) {
return;
}
Instant now = clock.instant();
cache.entrySet().removeIf(entry -> !entry.getValue().expiresAt().isAfter(now));
while (cache.size() > maxEntries) {
cache.entrySet().stream()
.min(Comparator.comparing(entry -> entry.getValue().createdAt()))
.map(Map.Entry::getKey)
.ifPresent(cache::remove);
}
}
private boolean validCoordinate(BigDecimal latitude, BigDecimal longitude) {
return latitude != null
&& longitude != null
&& latitude.compareTo(BigDecimal.valueOf(-90)) >= 0
&& latitude.compareTo(BigDecimal.valueOf(90)) <= 0
&& longitude.compareTo(BigDecimal.valueOf(-180)) >= 0
&& longitude.compareTo(BigDecimal.valueOf(180)) <= 0;
}
private String normalizeCountryCode(String value) {
if (value == null || value.isBlank()) {
return null;
}
String normalized = value.trim().toUpperCase(Locale.ROOT);
return normalized.length() == 2 ? normalized : null;
}
private String text(JsonNode node, String field) {
if (node == null || !node.has(field) || node.get(field).isNull()) {
return null;
}
String value = node.get(field).asText();
return value == null || value.isBlank() ? null : value.trim();
}
private String url(String value) {
return URLEncoder.encode(value == null ? "" : value, StandardCharsets.UTF_8);
}
private GeoCountryResolution result(
GeoCountryResolutionStatus status,
BigDecimal latitude,
BigDecimal longitude,
String countryCode,
String countryName,
String displayName,
boolean cacheHit,
boolean remoteRequestPerformed,
String errorMessage
) {
return new GeoCountryResolution(
status,
latitude,
longitude,
countryCode,
countryName,
displayName,
PROVIDER,
null,
cacheHit,
remoteRequestPerformed,
errorMessage
);
}
private record CoordinateKey(BigDecimal latitude, BigDecimal longitude) {
static CoordinateKey of(BigDecimal latitude, BigDecimal longitude, int decimalPlaces) {
return new CoordinateKey(
latitude.setScale(decimalPlaces, RoundingMode.HALF_UP),
longitude.setScale(decimalPlaces, RoundingMode.HALF_UP)
);
}
}
private record CacheEntry(
GeoCountryResolution resolution,
Instant expiresAt,
Instant createdAt
) {
}
}

View File

@ -0,0 +1,25 @@
package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model;
import java.math.BigDecimal;
import java.time.OffsetDateTime;
public record DriverCountryTripSegment(
String segmentId,
String driverKey,
String registrationKey,
String vehicleKey,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
String countryFrom,
String countryTo,
BigDecimal latitudeFrom,
BigDecimal longitudeFrom,
BigDecimal latitudeTo,
BigDecimal longitudeTo,
String positionFromEventId,
String positionToEventId,
DriverCountryTripSegmentBoundarySource endBoundarySource,
String boundaryEventId,
boolean boundaryCountryReverseGeocoded
) {
}

View File

@ -0,0 +1,9 @@
package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model;
public enum DriverCountryTripSegmentBoundarySource {
EXPLICIT_BORDER_CROSSING,
GNSS_SOURCE_COUNTRY_CHANGE,
NOMINATIM_COUNTRY_CHANGE,
VEHICLE_CHANGE,
FINAL
}

View File

@ -0,0 +1,24 @@
package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model;
import java.util.List;
public record DriverCountryTripSegmentationResult(
String driverKey,
int drivingIntervalCount,
int supportingGeoEventCount,
int explicitBorderCrossingCount,
int reverseGeocodingRemoteRequestCount,
int reverseGeocodingCacheHitCount,
int unresolvedCoordinateCount,
int segmentCount,
String reverseGeocodingAttribution,
List<DriverCountryTripSegment> segments,
List<String> notes,
List<String> warnings
) {
public DriverCountryTripSegmentationResult {
segments = segments == null ? List.of() : List.copyOf(segments);
notes = notes == null ? List.of() : List.copyOf(notes);
warnings = warnings == null ? List.of() : List.copyOf(warnings);
}
}

View File

@ -0,0 +1,29 @@
package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public record DriverCountryTripSegmentationScopeResult(
int driverCount,
int segmentCount,
int reverseGeocodingRemoteRequestCount,
int reverseGeocodingCacheHitCount,
int unresolvedCoordinateCount,
String reverseGeocodingAttribution,
Map<String, DriverCountryTripSegmentationResult> driverResults,
List<String> notes,
List<String> warnings
) {
public DriverCountryTripSegmentationScopeResult {
driverResults = driverResults == null
? Map.of()
: java.util.Collections.unmodifiableMap(new LinkedHashMap<>(driverResults));
notes = notes == null ? List.of() : List.copyOf(notes);
warnings = warnings == null ? List.of() : List.copyOf(warnings);
}
public DriverCountryTripSegmentationResult resultForDriver(String driverKey) {
return driverKey == null ? null : driverResults.get(driverKey);
}
}

View File

@ -0,0 +1,105 @@
package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.service;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
final class CountryCodeNormalizer {
private static final Map<String, String> TACHOGRAPH_TO_ISO2 = buildTachographMap();
private CountryCodeNormalizer() {
}
static String normalizeTachograph(String value) {
if (value == null || value.isBlank()) {
return null;
}
String normalized = value.trim().toUpperCase(Locale.ROOT);
String mapped = TACHOGRAPH_TO_ISO2.get(normalized);
if (mapped != null) {
return mapped;
}
return normalizeIso(normalized);
}
static String normalizeIso(String value) {
if (value == null || value.isBlank()) {
return null;
}
String normalized = value.trim().toUpperCase(Locale.ROOT);
if (normalized.length() == 2) {
return normalized;
}
for (String iso2 : Locale.getISOCountries()) {
Locale locale = Locale.of("", iso2);
try {
if (locale.getISO3Country().equalsIgnoreCase(normalized)) {
return iso2;
}
} catch (java.util.MissingResourceException ignored) {
}
}
return null;
}
private static Map<String, String> buildTachographMap() {
Map<String, String> values = new LinkedHashMap<>();
values.put("A", "AT");
values.put("AL", "AL");
values.put("AND", "AD");
values.put("ARM", "AM");
values.put("AZ", "AZ");
values.put("B", "BE");
values.put("BG", "BG");
values.put("BIH", "BA");
values.put("BY", "BY");
values.put("CH", "CH");
values.put("CY", "CY");
values.put("CZ", "CZ");
values.put("D", "DE");
values.put("DK", "DK");
values.put("E", "ES");
values.put("EST", "EE");
values.put("F", "FR");
values.put("FIN", "FI");
values.put("FL", "LI");
values.put("FR", "FO");
values.put("UK", "GB");
values.put("GE", "GE");
values.put("GR", "GR");
values.put("H", "HU");
values.put("HR", "HR");
values.put("I", "IT");
values.put("IRL", "IE");
values.put("IS", "IS");
values.put("KZ", "KZ");
values.put("L", "LU");
values.put("LT", "LT");
values.put("LV", "LV");
values.put("M", "MT");
values.put("MC", "MC");
values.put("MD", "MD");
values.put("MK", "MK");
values.put("N", "NO");
values.put("NL", "NL");
values.put("P", "PT");
values.put("PL", "PL");
values.put("RO", "RO");
values.put("RSM", "SM");
values.put("RUS", "RU");
values.put("S", "SE");
values.put("SK", "SK");
values.put("SLO", "SI");
values.put("TM", "TM");
values.put("TR", "TR");
values.put("UA", "UA");
values.put("V", "VA");
values.put("YU", "RS");
values.put("MNE", "ME");
values.put("SRB", "RS");
values.put("UZ", "UZ");
values.put("TJ", "TJ");
return Map.copyOf(values);
}
}

View File

@ -0,0 +1,593 @@
package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.geocoding.model.GeoCountryResolution;
import at.procon.eventhub.geocoding.model.GeoCountryResolutionStatus;
import at.procon.eventhub.geocoding.service.GeoCountryResolver;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegment;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentBoundarySource;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationResult;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationScopeResult;
import at.procon.eventhub.processing.eventprocessing.support.RuntimeSupportEvidenceEvent;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.springframework.stereotype.Service;
@Service
public class DriverCountryTripSegmentationService {
private final GeoCountryResolver countryResolver;
private final EventHubProperties properties;
public DriverCountryTripSegmentationService(
GeoCountryResolver countryResolver,
EventHubProperties properties
) {
this.countryResolver = countryResolver;
this.properties = properties;
}
public DriverCountryTripSegmentationScopeResult segmentPreparedInputs(
Map<String, DriverWorkingTimePreparedInput> preparedInputs
) {
int maxRemoteLookups = properties.getReverseGeocoding()
.getNominatim()
.getMaxRemoteLookupsPerExecution();
LookupBudget budget = new LookupBudget(maxRemoteLookups);
LinkedHashMap<String, DriverCountryTripSegmentationResult> driverResults = new LinkedHashMap<>();
List<String> scopeWarnings = new ArrayList<>();
if (preparedInputs != null) {
preparedInputs.entrySet().stream()
.filter(entry -> entry.getKey() != null
&& !entry.getKey().isBlank()
&& entry.getValue() != null
&& entry.getValue().processingInput() != null)
.sorted(Map.Entry.comparingByKey())
.forEach(entry -> {
DriverCountryTripSegmentationResult result = segmentDriver(
entry.getKey(),
entry.getValue(),
budget
);
driverResults.put(entry.getKey(), result);
scopeWarnings.addAll(result.warnings());
});
}
int segmentCount = driverResults.values().stream()
.mapToInt(DriverCountryTripSegmentationResult::segmentCount)
.sum();
int cacheHitCount = driverResults.values().stream()
.mapToInt(DriverCountryTripSegmentationResult::reverseGeocodingCacheHitCount)
.sum();
int unresolvedCount = driverResults.values().stream()
.mapToInt(DriverCountryTripSegmentationResult::unresolvedCoordinateCount)
.sum();
List<String> notes = List.of(
"Country trip segmentation preferred explicit tachograph border-crossing events and existing source country codes before Nominatim.",
"Nominatim remote lookups performed: " + budget.usedRemoteLookups() + " of configured maximum "
+ maxRemoteLookups + ".",
"Nominatim requests use a shared coordinate cache and the configured minimum request interval."
);
return new DriverCountryTripSegmentationScopeResult(
driverResults.size(),
segmentCount,
budget.usedRemoteLookups(),
cacheHitCount,
unresolvedCount,
"Data © OpenStreetMap contributors, ODbL 1.0",
driverResults,
notes,
distinctLimited(scopeWarnings, 50)
);
}
private DriverCountryTripSegmentationResult segmentDriver(
String driverKey,
DriverWorkingTimePreparedInput preparedInput,
LookupBudget budget
) {
List<DriverWorkingTimeActivityInterval> drivingIntervals = preparedInput.processingInput()
.activityIntervals()
.stream()
.filter(Objects::nonNull)
.filter(interval -> "DRIVE".equalsIgnoreCase(interval.activityType()))
.filter(interval -> interval.startedAt() != null
&& interval.endedAt() != null
&& interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator
.comparing(DriverWorkingTimeActivityInterval::startedAt)
.thenComparing(DriverWorkingTimeActivityInterval::endedAt))
.toList();
List<RuntimeSupportEvidenceEvent> supportEvents = preparedInput.processingInput()
.supportEvidenceEvents()
.stream()
.filter(Objects::nonNull)
.filter(event -> event.occurredAt() != null)
.sorted(Comparator
.comparing(RuntimeSupportEvidenceEvent::occurredAt)
.thenComparing(event -> nullToEmpty(event.eventId())))
.toList();
if (drivingIntervals.isEmpty()) {
return new DriverCountryTripSegmentationResult(
driverKey,
0,
0,
0,
0,
0,
0,
0,
"Data © OpenStreetMap contributors, ODbL 1.0",
List.of(),
List.of("No driving intervals were available for country trip segmentation."),
List.of()
);
}
DriverStats stats = new DriverStats();
List<String> warnings = new ArrayList<>();
List<DriverCountryTripSegment> segments = new ArrayList<>();
DriverWorkingTimeActivityInterval firstDrive = drivingIntervals.getFirst();
SegmentState state = new SegmentState(
firstDrive.startedAt(),
firstDrive.registrationKey(),
firstDrive.vehicleKey()
);
DriverWorkingTimeActivityInterval previousDrive = null;
for (DriverWorkingTimeActivityInterval drive : drivingIntervals) {
List<RuntimeSupportEvidenceEvent> driveEvents = supportEvents.stream()
.filter(event -> within(event.occurredAt(), drive.startedAt(), drive.endedAt()))
.filter(event -> compatibleVehicle(event, drive))
.toList();
stats.supportingGeoEventCount += (int) driveEvents.stream()
.filter(this::hasCoordinateOrBorderEvidence)
.count();
stats.explicitBorderCrossingCount += (int) driveEvents.stream()
.filter(this::isExplicitBorderCrossing)
.count();
if (previousDrive != null && vehicleChanged(previousDrive, drive)) {
String previousCountry = state.country;
addSegment(
segments,
driverKey,
state,
previousDrive.endedAt(),
previousCountry,
previousCountry,
state.lastPositionEvent,
DriverCountryTripSegmentBoundarySource.VEHICLE_CHANGE,
null,
false
);
state = new SegmentState(
drive.startedAt(),
drive.registrationKey(),
drive.vehicleKey()
);
state.country = previousCountry;
}
for (RuntimeSupportEvidenceEvent event : driveEvents) {
if (!hasCoordinateOrBorderEvidence(event)) {
continue;
}
if (state.startLatitude == null && hasCoordinate(event)) {
state.setStartPosition(event);
}
if (isExplicitBorderCrossing(event)) {
processExplicitBorderCrossing(
segments,
driverKey,
state,
event,
budget,
stats,
warnings
);
} else if (hasCoordinate(event)) {
processPosition(
segments,
driverKey,
state,
event,
budget,
stats,
warnings
);
}
}
previousDrive = drive;
}
DriverWorkingTimeActivityInterval lastDrive = drivingIntervals.getLast();
addSegment(
segments,
driverKey,
state,
lastDrive.endedAt(),
state.country,
state.country,
state.lastPositionEvent,
DriverCountryTripSegmentBoundarySource.FINAL,
state.lastPositionEvent == null ? null : state.lastPositionEvent.eventId(),
false
);
List<String> notes = List.of(
"Built country trip segments from " + drivingIntervals.size() + " driving interval(s) and "
+ stats.supportingGeoEventCount + " supporting geo/border event(s).",
"Explicit border crossings are authoritative; Nominatim is used only when a positioned event has no usable source country code.",
"Country codes in the result are normalized to ISO 3166-1 alpha-2 where a mapping is known."
);
if (budget.exhausted()) {
warnings.add("The configured Nominatim remote-lookup budget was exhausted; later uncached coordinates remained unresolved.");
}
return new DriverCountryTripSegmentationResult(
driverKey,
drivingIntervals.size(),
stats.supportingGeoEventCount,
stats.explicitBorderCrossingCount,
stats.remoteRequestCount,
stats.cacheHitCount,
stats.unresolvedCoordinateCount,
segments.size(),
"Data © OpenStreetMap contributors, ODbL 1.0",
segments,
notes,
distinctLimited(warnings, 20)
);
}
private void processExplicitBorderCrossing(
List<DriverCountryTripSegment> segments,
String driverKey,
SegmentState state,
RuntimeSupportEvidenceEvent event,
LookupBudget budget,
DriverStats stats,
List<String> warnings
) {
String countryFrom = CountryCodeNormalizer.normalizeTachograph(event.countryFrom());
String countryTo = CountryCodeNormalizer.normalizeTachograph(event.countryTo());
ResolvedEventCountry positionedCountry = null;
if (countryTo == null && hasCoordinate(event)) {
positionedCountry = resolveEventCountry(event, budget, stats, warnings);
countryTo = positionedCountry.countryCode();
}
if (state.country == null) {
state.country = firstNonBlank(countryFrom, countryTo);
}
String effectiveFrom = firstNonBlank(state.country, countryFrom);
String effectiveTo = firstNonBlank(countryTo, effectiveFrom);
if (effectiveFrom == null || effectiveTo == null || Objects.equals(effectiveFrom, effectiveTo)) {
state.lastPositionEvent = hasCoordinate(event) ? event : state.lastPositionEvent;
return;
}
addSegment(
segments,
driverKey,
state,
event.occurredAt(),
effectiveFrom,
effectiveTo,
event,
DriverCountryTripSegmentBoundarySource.EXPLICIT_BORDER_CROSSING,
event.eventId(),
positionedCountry != null && positionedCountry.reverseGeocoded()
);
state.restartAt(event, effectiveTo);
}
private void processPosition(
List<DriverCountryTripSegment> segments,
String driverKey,
SegmentState state,
RuntimeSupportEvidenceEvent event,
LookupBudget budget,
DriverStats stats,
List<String> warnings
) {
ResolvedEventCountry resolved = resolveEventCountry(event, budget, stats, warnings);
state.lastPositionEvent = event;
if (resolved.countryCode() == null) {
return;
}
if (state.country == null) {
state.country = resolved.countryCode();
return;
}
if (Objects.equals(state.country, resolved.countryCode())) {
return;
}
DriverCountryTripSegmentBoundarySource source = resolved.reverseGeocoded()
? DriverCountryTripSegmentBoundarySource.NOMINATIM_COUNTRY_CHANGE
: DriverCountryTripSegmentBoundarySource.GNSS_SOURCE_COUNTRY_CHANGE;
addSegment(
segments,
driverKey,
state,
event.occurredAt(),
state.country,
resolved.countryCode(),
event,
source,
event.eventId(),
resolved.reverseGeocoded()
);
state.restartAt(event, resolved.countryCode());
}
private ResolvedEventCountry resolveEventCountry(
RuntimeSupportEvidenceEvent event,
LookupBudget budget,
DriverStats stats,
List<String> warnings
) {
String sourceCountry = CountryCodeNormalizer.normalizeTachograph(event.countryCode());
if (sourceCountry != null) {
return new ResolvedEventCountry(sourceCountry, false);
}
if (!hasCoordinate(event)) {
return ResolvedEventCountry.unresolved();
}
GeoCountryResolution resolution = countryResolver.resolve(
event.latitude(),
event.longitude(),
budget.remoteLookupAllowed()
);
if (resolution.remoteRequestPerformed()) {
budget.recordRemoteLookup();
stats.remoteRequestCount++;
}
if (resolution.cacheHit()) {
stats.cacheHitCount++;
}
if (resolution.resolved()) {
return new ResolvedEventCountry(
CountryCodeNormalizer.normalizeIso(resolution.countryCode()),
true
);
}
stats.unresolvedCoordinateCount++;
if (resolution.status() == GeoCountryResolutionStatus.ERROR
|| resolution.status() == GeoCountryResolutionStatus.DISABLED
|| resolution.status() == GeoCountryResolutionStatus.REMOTE_LOOKUP_NOT_ALLOWED) {
warnings.add("Country could not be resolved for support event "
+ nullToEmpty(event.eventId()) + ": " + nullToEmpty(resolution.errorMessage()));
}
return ResolvedEventCountry.unresolved();
}
private void addSegment(
List<DriverCountryTripSegment> segments,
String driverKey,
SegmentState state,
OffsetDateTime endedAt,
String countryFrom,
String countryTo,
RuntimeSupportEvidenceEvent endPosition,
DriverCountryTripSegmentBoundarySource boundarySource,
String boundaryEventId,
boolean reverseGeocodedBoundary
) {
if (state.startedAt == null || endedAt == null || !endedAt.isAfter(state.startedAt)) {
return;
}
int index = segments.size();
segments.add(new DriverCountryTripSegment(
segmentId(driverKey, state.startedAt, endedAt, index),
driverKey,
state.registrationKey,
state.vehicleKey,
state.startedAt,
endedAt,
countryFrom,
countryTo,
state.startLatitude,
state.startLongitude,
endPosition == null ? null : endPosition.latitude(),
endPosition == null ? null : endPosition.longitude(),
state.startPositionEventId,
endPosition == null ? null : endPosition.eventId(),
boundarySource,
boundaryEventId,
reverseGeocodedBoundary
));
}
private boolean within(OffsetDateTime value, OffsetDateTime start, OffsetDateTime end) {
return value != null
&& start != null
&& end != null
&& !value.isBefore(start)
&& !value.isAfter(end);
}
private boolean compatibleVehicle(
RuntimeSupportEvidenceEvent event,
DriverWorkingTimeActivityInterval interval
) {
if (event.vehicleKey() != null
&& interval.vehicleKey() != null
&& !event.vehicleKey().equals(interval.vehicleKey())) {
return false;
}
return event.registrationKey() == null
|| interval.registrationKey() == null
|| event.registrationKey().equals(interval.registrationKey());
}
private boolean vehicleChanged(
DriverWorkingTimeActivityInterval previous,
DriverWorkingTimeActivityInterval next
) {
if (previous.vehicleKey() != null && next.vehicleKey() != null) {
return !previous.vehicleKey().equals(next.vehicleKey());
}
if (previous.registrationKey() != null && next.registrationKey() != null) {
return !previous.registrationKey().equals(next.registrationKey());
}
return false;
}
private boolean hasCoordinateOrBorderEvidence(RuntimeSupportEvidenceEvent event) {
return hasCoordinate(event) || isExplicitBorderCrossing(event);
}
private boolean hasCoordinate(RuntimeSupportEvidenceEvent event) {
return event.latitude() != null && event.longitude() != null;
}
private boolean isExplicitBorderCrossing(RuntimeSupportEvidenceEvent event) {
return "BORDER_CROSSING".equalsIgnoreCase(event.eventDomain())
|| event.countryFrom() != null
|| event.countryTo() != null;
}
private String segmentId(
String driverKey,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
int index
) {
String raw = nullToEmpty(driverKey) + "|" + startedAt + "|" + endedAt + "|" + index;
try {
byte[] digest = MessageDigest.getInstance("SHA-256")
.digest(raw.getBytes(StandardCharsets.UTF_8));
return "TRIP_COUNTRY|" + java.util.HexFormat.of().formatHex(digest, 0, 12);
} catch (NoSuchAlgorithmException ex) {
throw new IllegalStateException("SHA-256 is unavailable.", ex);
}
}
private String firstNonBlank(String... values) {
if (values == null) {
return null;
}
for (String value : values) {
if (value != null && !value.isBlank()) {
return value.trim().toUpperCase(Locale.ROOT);
}
}
return null;
}
private String nullToEmpty(String value) {
return value == null ? "" : value;
}
private List<String> distinctLimited(List<String> values, int limit) {
Set<String> distinct = new LinkedHashSet<>();
if (values != null) {
values.stream()
.filter(value -> value != null && !value.isBlank())
.map(String::trim)
.forEach(distinct::add);
}
return distinct.stream().limit(Math.max(0, limit)).toList();
}
private static final class SegmentState {
private OffsetDateTime startedAt;
private String registrationKey;
private String vehicleKey;
private String country;
private BigDecimal startLatitude;
private BigDecimal startLongitude;
private String startPositionEventId;
private RuntimeSupportEvidenceEvent lastPositionEvent;
private SegmentState(
OffsetDateTime startedAt,
String registrationKey,
String vehicleKey
) {
this.startedAt = startedAt;
this.registrationKey = registrationKey;
this.vehicleKey = vehicleKey;
}
private void setStartPosition(RuntimeSupportEvidenceEvent event) {
if (event == null || event.latitude() == null || event.longitude() == null) {
return;
}
this.startLatitude = event.latitude();
this.startLongitude = event.longitude();
this.startPositionEventId = event.eventId();
this.lastPositionEvent = event;
}
private void restartAt(RuntimeSupportEvidenceEvent event, String newCountry) {
this.startedAt = event.occurredAt();
this.country = newCountry;
this.startLatitude = event.latitude();
this.startLongitude = event.longitude();
this.startPositionEventId = event.eventId();
this.lastPositionEvent = event;
}
}
private static final class LookupBudget {
private final int maximum;
private int used;
private LookupBudget(int maximum) {
this.maximum = Math.max(0, maximum);
}
private boolean remoteLookupAllowed() {
return used < maximum;
}
private void recordRemoteLookup() {
used++;
}
private int usedRemoteLookups() {
return used;
}
private boolean exhausted() {
return maximum > 0 && used >= maximum;
}
}
private static final class DriverStats {
private int supportingGeoEventCount;
private int explicitBorderCrossingCount;
private int remoteRequestCount;
private int cacheHitCount;
private int unresolvedCoordinateCount;
}
private record ResolvedEventCountry(String countryCode, boolean reverseGeocoded) {
private static ResolvedEventCountry unresolved() {
return new ResolvedEventCountry(null, false);
}
}
}

View File

@ -2,6 +2,7 @@ package at.procon.eventhub.processing.dto;
import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationResult;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationResult;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import com.fasterxml.jackson.annotation.JsonInclude;
@ -19,7 +20,9 @@ public record UnifiedRuntimeDerivedProjectionResultDto(
RuntimeSupportEvidenceNormalizationDebugDto supportEvidenceNormalization,
RuntimeDriverPartitionDebugDto partitionDebug,
@JsonInclude(JsonInclude.Include.NON_NULL)
DriverNdiHomeClassificationResult ndiHomeClassification
DriverNdiHomeClassificationResult ndiHomeClassification,
@JsonInclude(JsonInclude.Include.NON_NULL)
DriverCountryTripSegmentationResult countryTripSegmentation
) {
public UnifiedRuntimeDerivedProjectionResultDto {
discoveredVehicles = discoveredVehicles == null ? List.of() : List.copyOf(discoveredVehicles);
@ -47,6 +50,7 @@ public record UnifiedRuntimeDerivedProjectionResultDto(
notes,
null,
null,
null,
null
);
}
@ -73,6 +77,7 @@ public record UnifiedRuntimeDerivedProjectionResultDto(
notes,
supportEvidenceNormalization,
null,
null,
null
);
}
@ -100,6 +105,7 @@ public record UnifiedRuntimeDerivedProjectionResultDto(
notes,
supportEvidenceNormalization,
partitionDebug,
null,
null
);
}
@ -116,7 +122,8 @@ public record UnifiedRuntimeDerivedProjectionResultDto(
notes,
supportEvidenceNormalization,
debug,
ndiHomeClassification
ndiHomeClassification,
countryTripSegmentation
);
}
}

View File

@ -0,0 +1,70 @@
package at.procon.eventhub.processing.eventprocessing.module;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationScopeResult;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.service.DriverCountryTripSegmentationService;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.springframework.stereotype.Component;
@Component
public class DriverCountryTripSegmentationModule implements RuntimeProcessingModule {
private final DriverCountryTripSegmentationService segmentationService;
public DriverCountryTripSegmentationModule(
DriverCountryTripSegmentationService segmentationService
) {
this.segmentationService = segmentationService;
}
@Override
public String moduleKey() {
return DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION;
}
@Override
public RuntimeProcessingModuleDescriptorDto descriptor() {
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Country trip segmentation",
"Builds per-driver country trip segments from explicit tachograph border crossings and GNSS country changes, using cached/rate-limited Nominatim reverse geocoding when source country codes are absent.",
"JAVA+HTTP",
Set.of(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION),
Set.of("Map<String, DriverWorkingTimePreparedInput>"),
Set.of("DriverCountryTripSegmentationScopeResult")
);
}
@Override
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
DriverCountryTripSegmentationScopeResult result = segmentationService.segmentPreparedInputs(
preparedInputs(context)
);
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("driverCount", result.driverCount());
metadata.put("segmentCount", result.segmentCount());
metadata.put("reverseGeocodingRemoteRequestCount", result.reverseGeocodingRemoteRequestCount());
metadata.put("reverseGeocodingCacheHitCount", result.reverseGeocodingCacheHitCount());
metadata.put("unresolvedCoordinateCount", result.unresolvedCoordinateCount());
metadata.put("reverseGeocodingAttribution", result.reverseGeocodingAttribution());
return new RuntimeProcessingModuleResult(
moduleKey(),
RuntimeProcessingModuleStatus.SUCCESS,
result,
metadata,
result.warnings()
);
}
@SuppressWarnings("unchecked")
private Map<String, DriverWorkingTimePreparedInput> preparedInputs(RuntimeProcessingModuleContext context) {
Object output = context.requireResult(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION).output();
if (output instanceof Map<?, ?> map) {
return (Map<String, DriverWorkingTimePreparedInput>) map;
}
return Map.of();
}
}

View File

@ -2,6 +2,7 @@ package at.procon.eventhub.processing.eventprocessing.module;
import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto;
import at.procon.eventhub.processing.driverworkingtime.homeclassification.model.DriverNdiHomeClassificationScopeResult;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationScopeResult;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput;
import at.procon.eventhub.processing.dto.UnifiedRuntimeDerivedProjectionResultDto;
import at.procon.eventhub.processing.dto.UnifiedRuntimeDriverWorkingTimeScopeResultDto;
@ -66,7 +67,7 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Driving-derived projections",
"Executes the shared driver working-time core from typed per-driver module outputs for driving interruptions, rest candidates, card-absence coverage, overnight candidates, and trip candidates.",
"Executes the shared driver working-time core from typed per-driver module outputs for driving interruptions, rest candidates, card-absence coverage, overnight candidates, and trip candidates; optional NDI and country-trip module results are attached when present.",
"ESPER+JAVA",
Set.of(
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS,
@ -75,7 +76,7 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
),
Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval",
"Map<String, DriverWorkingTimePreparedInput>"),
Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto")
Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto", "DriverNdiHomeClassificationResult", "DriverCountryTripSegmentationResult")
);
}
@ -90,6 +91,8 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
Map<String, DriverWorkingTimePreparedInput> preparedInputs = preparedInputs(context);
DriverNdiHomeClassificationScopeResult ndiHomeClassificationScope =
optionalNdiHomeClassificationScope(context);
DriverCountryTripSegmentationScopeResult countryTripSegmentationScope =
optionalCountryTripSegmentationScope(context);
LinkedHashMap<String, UnifiedRuntimeDerivedProjectionResultDto> driverResults = new LinkedHashMap<>();
List<String> warnings = new ArrayList<>();
@ -126,7 +129,10 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
preparedInput.partition().partitionDebug(),
ndiHomeClassificationScope == null
? null
: ndiHomeClassificationScope.resultForDriver(preparedInput.driverKey())
: ndiHomeClassificationScope.resultForDriver(preparedInput.driverKey()),
countryTripSegmentationScope == null
? null
: countryTripSegmentationScope.resultForDriver(preparedInput.driverKey())
));
}
@ -136,6 +142,10 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
if (ndiHomeClassificationScope != null) {
notes.addAll(ndiHomeClassificationScope.notes());
}
if (countryTripSegmentationScope != null) {
notes.addAll(countryTripSegmentationScope.notes());
warnings.addAll(countryTripSegmentationScope.warnings());
}
UnifiedRuntimeDriverWorkingTimeScopeResultDto result = new UnifiedRuntimeDriverWorkingTimeScopeResultDto(
broadBundle.request(),
@ -215,6 +225,17 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
return null;
}
private DriverCountryTripSegmentationScopeResult optionalCountryTripSegmentationScope(
RuntimeProcessingModuleContext context
) {
RuntimeProcessingModuleResult result =
context.previousResults().get(DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION);
if (result != null && result.output() instanceof DriverCountryTripSegmentationScopeResult scopeResult) {
return scopeResult;
}
return null;
}
private UnifiedRuntimeProcessingApiRequest scopeRequest(RuntimeProcessingModuleContext context) {
Object value = context.attributes().get("runtimeScopeApiRequest");
if (value instanceof UnifiedRuntimeProcessingApiRequest request) {

View File

@ -11,6 +11,7 @@ public final class DriverWorkingTimeModuleKeys {
public static final String VEHICLE_EVIDENCE_ATTACHMENT = "vehicle-evidence-attachment";
public static final String SUPPORT_EVIDENCE_NORMALIZATION = "support-evidence-normalization";
public static final String NDI_HOME_CLASSIFICATION = "ndi-home-classification";
public static final String COUNTRY_TRIP_SEGMENTATION = "country-trip-segmentation";
public static final String DRIVING_DERIVED_PROJECTIONS = "driving-derived-projections";
private DriverWorkingTimeModuleKeys() {

View File

@ -44,8 +44,7 @@ public class DriverHomeClassificationRuntimeProcessingPlan implements RuntimePro
@Override
public String description() {
return "Builds enriched non-driving intervals, learns company and driver home locations "
+ "from one or more tachograph file sessions, and applies ordered HOME/NOT_HOME rules.";
return "Builds enriched non-driving intervals, learns company and driver home locations, applies ordered HOME/NOT_HOME rules, and creates country trip segments using explicit border events plus Nominatim-backed GNSS country resolution.";
}
@Override
@ -97,8 +96,10 @@ public class DriverHomeClassificationRuntimeProcessingPlan implements RuntimePro
.forEach(modules::add);
}
modules.removeIf(moduleKey -> DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION.equals(moduleKey)
|| DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION.equals(moduleKey)
|| DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS.equals(moduleKey));
modules.add(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION);
modules.add(DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION);
modules.add(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS);
Map<String, Object> parameters = new LinkedHashMap<>();

View File

@ -205,10 +205,19 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
Set.of("Map<String, DriverWorkingTimePreparedInput>"),
Set.of("DriverNdiHomeClassificationScopeResult")
),
new RuntimeProcessingModuleDescriptorDto(
DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION,
"Country trip segmentation",
"Builds country trip segments from explicit border crossings and GNSS country changes; missing countries are resolved through cached, rate-limited Nominatim reverse geocoding.",
"JAVA+HTTP",
Set.of(DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION),
Set.of("Map<String, DriverWorkingTimePreparedInput>"),
Set.of("DriverCountryTripSegmentationScopeResult")
),
new RuntimeProcessingModuleDescriptorDto(
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS,
"Driving-derived projections",
"Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates; attaches NDI HOME/NOT_HOME classification when that optional module was requested.",
"Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates; attaches optional NDI HOME/NOT_HOME and country-trip results when requested.",
"ESPER+JAVA",
Set.of(
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS,
@ -216,12 +225,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION
),
Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map<String, DriverWorkingTimePreparedInput>"),
Set.of("DriverWorkingTimeProcessingResultDto", "DriverNdiHomeClassificationResult")
Set.of("DriverWorkingTimeProcessingResultDto", "DriverNdiHomeClassificationResult", "DriverCountryTripSegmentationResult")
)
));
if (!includeRuntimeEventAssemblyModule) {
descriptors.removeIf(descriptor -> DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION
.equals(descriptor.moduleKey()));
.equals(descriptor.moduleKey())
|| DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION.equals(descriptor.moduleKey()));
}
return List.copyOf(descriptors);
}
@ -505,6 +515,24 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
)
);
}
if (driverResult.countryTripSegmentation() != null) {
results.put(
DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION,
new RuntimeProcessingModuleResult(
DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION,
RuntimeProcessingModuleStatus.SUCCESS,
driverResult.countryTripSegmentation(),
Map.of(
"segmentCount", driverResult.countryTripSegmentation().segmentCount(),
"explicitBorderCrossingCount", driverResult.countryTripSegmentation().explicitBorderCrossingCount(),
"reverseGeocodingRemoteRequestCount", driverResult.countryTripSegmentation().reverseGeocodingRemoteRequestCount(),
"reverseGeocodingCacheHitCount", driverResult.countryTripSegmentation().reverseGeocodingCacheHitCount(),
"unresolvedCoordinateCount", driverResult.countryTripSegmentation().unresolvedCoordinateCount()
),
driverResult.countryTripSegmentation().warnings()
)
);
}
results.put(
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS,
new RuntimeProcessingModuleResult(
@ -545,6 +573,15 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"driverHomeClusterCount", driverResult.ndiHomeClassification().driverHomeClusterCount()
));
}
if (driverResult.countryTripSegmentation() != null) {
metadata.put("countryTripSegmentation", Map.of(
"segmentCount", driverResult.countryTripSegmentation().segmentCount(),
"explicitBorderCrossingCount", driverResult.countryTripSegmentation().explicitBorderCrossingCount(),
"reverseGeocodingRemoteRequestCount", driverResult.countryTripSegmentation().reverseGeocodingRemoteRequestCount(),
"reverseGeocodingCacheHitCount", driverResult.countryTripSegmentation().reverseGeocodingCacheHitCount(),
"unresolvedCoordinateCount", driverResult.countryTripSegmentation().unresolvedCoordinateCount()
));
}
return metadata;
}
@ -564,7 +601,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
driverResult.notes(),
includeSupportEvidenceNormalization ? driverResult.supportEvidenceNormalization() : null,
includePartitionDebug ? driverResult.partitionDebug() : null,
driverResult.ndiHomeClassification()
driverResult.ndiHomeClassification(),
driverResult.countryTripSegmentation()
);
}
@ -760,6 +798,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
}
boolean includeNdiHomeClassification =
requested.remove(DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION) != null;
boolean includeCountryTripSegmentation =
requested.remove(DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION) != null;
requested.remove(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS);
if (includeNdiHomeClassification) {
requested.put(
@ -767,6 +807,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION
);
}
if (includeCountryTripSegmentation) {
requested.put(
DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION,
DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION
);
}
requested.put(
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS,
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS
@ -776,6 +822,7 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
return modules().stream()
.map(RuntimeProcessingModuleDescriptorDto::moduleKey)
.filter(moduleKey -> !DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION.equals(moduleKey))
.filter(moduleKey -> !DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION.equals(moduleKey))
.toList();
}

View File

@ -50,6 +50,26 @@ eventhub:
concurrent-consumers: 4
block-when-full: true
queue-offer-timeout: 5m
reverse-geocoding:
enabled: ${NOMINATIM_ENABLED:true}
provider: NOMINATIM
nominatim:
# Public OSM Nominatim requires an identifying User-Agent and no more than one request/second.
# Configure a self-hosted endpoint here when higher throughput is required.
base-url: ${NOMINATIM_BASE_URL:https://nominatim.openstreetmap.org}
# Deliberate opt-in is required for the donated public service. Prefer a self-hosted or contracted endpoint for production/bulk processing.
public-service-enabled: ${NOMINATIM_PUBLIC_SERVICE_ENABLED:false}
user-agent: ${NOMINATIM_USER_AGENT:eventhub-tachograph/0.1 (Nominatim reverse geocoding)}
email: ${NOMINATIM_EMAIL:}
accept-language: ${NOMINATIM_ACCEPT_LANGUAGE:en}
connect-timeout: ${NOMINATIM_CONNECT_TIMEOUT:10s}
read-timeout: ${NOMINATIM_READ_TIMEOUT:20s}
minimum-request-interval: ${NOMINATIM_MIN_REQUEST_INTERVAL:1s}
cache-ttl: ${NOMINATIM_CACHE_TTL:30d}
cache-max-entries: ${NOMINATIM_CACHE_MAX_ENTRIES:100000}
coordinate-decimal-places: ${NOMINATIM_COORDINATE_DECIMAL_PLACES:4}
max-remote-lookups-per-execution: ${NOMINATIM_MAX_REMOTE_LOOKUPS_PER_EXECUTION:25}
tachograph:
default-chunk-days: 1
occurred-at-overlap: 7d

View File

@ -0,0 +1,144 @@
package at.procon.eventhub.geocoding.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.geocoding.model.GeoCountryResolution;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.net.http.HttpClient;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
class NominatimGeoCountryResolverTest {
private HttpServer server;
@AfterEach
void stopServer() {
if (server != null) {
server.stop(0);
}
}
@Test
void resolvesCountryWithRequiredHeadersAndCachesCoordinate() throws Exception {
AtomicInteger requestCount = new AtomicInteger();
AtomicReference<String> userAgent = new AtomicReference<>();
AtomicReference<String> query = new AtomicReference<>();
server = HttpServer.create(new InetSocketAddress(0), 0);
server.createContext("/reverse", exchange -> respond(exchange, requestCount, userAgent, query));
server.start();
EventHubProperties properties = new EventHubProperties();
EventHubProperties.Nominatim config = properties.getReverseGeocoding().getNominatim();
config.setBaseUrl("http://localhost:" + server.getAddress().getPort());
config.setUserAgent("eventhub-test/1.0");
config.setMinimumRequestInterval(Duration.ZERO);
config.setCoordinateDecimalPlaces(4);
NominatimGeoCountryResolver resolver = new NominatimGeoCountryResolver(
properties,
new ObjectMapper(),
HttpClient.newHttpClient(),
Clock.systemUTC()
);
GeoCountryResolution first = resolver.resolve(
new BigDecimal("48.208200"),
new BigDecimal("16.373800"),
true
);
GeoCountryResolution second = resolver.resolve(
new BigDecimal("48.208200"),
new BigDecimal("16.373800"),
false
);
assertThat(first.resolved()).isTrue();
assertThat(first.countryCode()).isEqualTo("AT");
assertThat(first.remoteRequestPerformed()).isTrue();
assertThat(first.cacheHit()).isFalse();
assertThat(second.resolved()).isTrue();
assertThat(second.cacheHit()).isTrue();
assertThat(second.remoteRequestPerformed()).isFalse();
assertThat(requestCount).hasValue(1);
assertThat(userAgent).hasValue("eventhub-test/1.0");
assertThat(query.get())
.contains("format=jsonv2")
.contains("zoom=3")
.contains("layer=address")
.contains("lat=48.208200")
.contains("lon=16.373800");
}
@Test
void requiresExplicitOptInForPublicOsmEndpoint() {
EventHubProperties properties = new EventHubProperties();
NominatimGeoCountryResolver resolver = new NominatimGeoCountryResolver(
properties,
new ObjectMapper(),
HttpClient.newHttpClient(),
Clock.systemUTC()
);
GeoCountryResolution result = resolver.resolve(
new BigDecimal("48.2082"),
new BigDecimal("16.3738"),
true
);
assertThat(result.resolved()).isFalse();
assertThat(result.remoteRequestPerformed()).isFalse();
assertThat(result.errorMessage()).contains("explicit opt-in");
}
@Test
void doesNotCallRemoteServiceWhenDisabled() {
EventHubProperties properties = new EventHubProperties();
properties.getReverseGeocoding().setEnabled(false);
NominatimGeoCountryResolver resolver = new NominatimGeoCountryResolver(
properties,
new ObjectMapper(),
HttpClient.newHttpClient(),
Clock.systemUTC()
);
GeoCountryResolution result = resolver.resolve(
new BigDecimal("48.2082"),
new BigDecimal("16.3738"),
true
);
assertThat(result.resolved()).isFalse();
assertThat(result.remoteRequestPerformed()).isFalse();
}
private void respond(
HttpExchange exchange,
AtomicInteger requestCount,
AtomicReference<String> userAgent,
AtomicReference<String> query
) throws IOException {
requestCount.incrementAndGet();
userAgent.set(exchange.getRequestHeaders().getFirst("User-Agent"));
query.set(exchange.getRequestURI().getRawQuery());
byte[] body = ("{\"place_id\":1,\"display_name\":\"Vienna, Austria\","
+ "\"licence\":\"Data © OpenStreetMap contributors, ODbL 1.0\","
+ "\"address\":{\"country\":\"Austria\",\"country_code\":\"at\"}}")
.getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(200, body.length);
exchange.getResponseBody().write(body);
exchange.close();
}
}

View File

@ -0,0 +1,263 @@
package at.procon.eventhub.processing.driverworkingtime.tripsegmentation.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.geocoding.model.GeoCountryResolution;
import at.procon.eventhub.geocoding.model.GeoCountryResolutionStatus;
import at.procon.eventhub.geocoding.service.GeoCountryResolver;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDriverPartition;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeProcessingInput;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentBoundarySource;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationResult;
import at.procon.eventhub.processing.driverworkingtime.tripsegmentation.model.DriverCountryTripSegmentationScopeResult;
import at.procon.eventhub.processing.eventprocessing.support.RuntimeSupportEvidenceEvent;
import java.math.BigDecimal;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
class DriverCountryTripSegmentationServiceTest {
private static final String DRIVER = "13:DRIVER-1";
private static final String VEHICLE = "VIN-1";
private static final String REGISTRATION = "W-12345";
@Test
void prefersExplicitBorderCrossingAndExistingCountryCodes() {
AtomicInteger resolverCalls = new AtomicInteger();
GeoCountryResolver resolver = (latitude, longitude, allowRemoteLookup) -> {
resolverCalls.incrementAndGet();
return unresolved(latitude, longitude);
};
DriverCountryTripSegmentationService service = new DriverCountryTripSegmentationService(
resolver,
properties()
);
List<RuntimeSupportEvidenceEvent> supportEvents = List.of(
position("p-at", "2026-05-01T08:05:00Z", "48.2082", "16.3738", "A"),
border("b-at-de", "2026-05-01T10:00:00Z", "48.75", "13.84", "A", "D"),
position("p-de", "2026-05-01T11:00:00Z", "48.90", "13.40", "D")
);
DriverCountryTripSegmentationScopeResult scope = service.segmentPreparedInputs(
Map.of(DRIVER, preparedInput(supportEvents))
);
DriverCountryTripSegmentationResult result = scope.resultForDriver(DRIVER);
assertThat(resolverCalls).hasValue(0);
assertThat(result.segmentCount()).isEqualTo(2);
assertThat(result.explicitBorderCrossingCount()).isEqualTo(1);
assertThat(result.segments().get(0).countryFrom()).isEqualTo("AT");
assertThat(result.segments().get(0).countryTo()).isEqualTo("DE");
assertThat(result.segments().get(0).endBoundarySource())
.isEqualTo(DriverCountryTripSegmentBoundarySource.EXPLICIT_BORDER_CROSSING);
assertThat(result.segments().get(1).countryFrom()).isEqualTo("DE");
assertThat(result.segments().get(1).countryTo()).isEqualTo("DE");
}
@Test
void usesNominatimWhenPositionCountryIsMissing() {
AtomicInteger resolverCalls = new AtomicInteger();
GeoCountryResolver resolver = (latitude, longitude, allowRemoteLookup) -> {
resolverCalls.incrementAndGet();
String code = longitude.compareTo(new BigDecimal("15")) > 0 ? "AT" : "DE";
return new GeoCountryResolution(
GeoCountryResolutionStatus.RESOLVED,
latitude,
longitude,
code,
null,
null,
"NOMINATIM",
"Data © OpenStreetMap contributors, ODbL 1.0",
false,
true,
null
);
};
DriverCountryTripSegmentationService service = new DriverCountryTripSegmentationService(
resolver,
properties()
);
List<RuntimeSupportEvidenceEvent> supportEvents = List.of(
position("p-at", "2026-05-01T08:05:00Z", "48.2082", "16.3738", null),
position("p-de", "2026-05-01T10:00:00Z", "48.90", "13.40", null)
);
DriverCountryTripSegmentationResult result = service.segmentPreparedInputs(
Map.of(DRIVER, preparedInput(supportEvents))
).resultForDriver(DRIVER);
assertThat(resolverCalls).hasValue(2);
assertThat(result.reverseGeocodingRemoteRequestCount()).isEqualTo(2);
assertThat(result.segmentCount()).isEqualTo(2);
assertThat(result.segments().get(0).countryFrom()).isEqualTo("AT");
assertThat(result.segments().get(0).countryTo()).isEqualTo("DE");
assertThat(result.segments().get(0).endBoundarySource())
.isEqualTo(DriverCountryTripSegmentBoundarySource.NOMINATIM_COUNTRY_CHANGE);
assertThat(result.segments().get(0).boundaryCountryReverseGeocoded()).isTrue();
}
private EventHubProperties properties() {
EventHubProperties properties = new EventHubProperties();
properties.getReverseGeocoding().getNominatim().setMaxRemoteLookupsPerExecution(10);
return properties;
}
private DriverWorkingTimePreparedInput preparedInput(List<RuntimeSupportEvidenceEvent> supportEvents) {
UUID sessionId = UUID.randomUUID();
DriverWorkingTimeActivityInterval drive = new DriverWorkingTimeActivityInterval(
sessionId,
DRIVER,
"drive-1",
"DRIVE",
"1",
"INSERTED",
"SINGLE",
REGISTRATION,
VEHICLE,
"TACHOGRAPH_FILE_SESSION",
"drive-start",
"drive-end",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T12:00:00Z"),
OffsetDateTime.parse("2026-05-01T08:00:00Z").toEpochSecond(),
OffsetDateTime.parse("2026-05-01T12:00:00Z").toEpochSecond(),
4 * 60 * 60,
List.of("drive-1"),
false,
false,
"RAW"
);
DriverWorkingTimeProcessingInput processingInput = new DriverWorkingTimeProcessingInput(
sessionId,
DRIVER,
"TACHOGRAPH_FILE_SESSION",
drive.startedAt(),
drive.endedAt(),
drive.startedAt(),
drive.endedAt(),
3,
720,
List.of(drive),
List.of(),
supportEvents,
List.of()
);
DriverWorkingTimeDriverPartition partition = new DriverWorkingTimeDriverPartition(
DRIVER,
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
null,
supportEvents,
null,
List.of(),
List.of()
);
return new DriverWorkingTimePreparedInput(DRIVER, partition, processingInput);
}
private RuntimeSupportEvidenceEvent position(
String eventId,
String occurredAt,
String latitude,
String longitude,
String countryCode
) {
return supportEvent(
eventId,
occurredAt,
"POSITION",
"POSITION_RECORDED",
latitude,
longitude,
countryCode,
null,
null
);
}
private RuntimeSupportEvidenceEvent border(
String eventId,
String occurredAt,
String latitude,
String longitude,
String countryFrom,
String countryTo
) {
return supportEvent(
eventId,
occurredAt,
"BORDER_CROSSING",
"BORDER_OUTBOUND",
latitude,
longitude,
null,
countryFrom,
countryTo
);
}
private RuntimeSupportEvidenceEvent supportEvent(
String eventId,
String occurredAt,
String eventDomain,
String eventType,
String latitude,
String longitude,
String countryCode,
String countryFrom,
String countryTo
) {
OffsetDateTime occurred = OffsetDateTime.parse(occurredAt);
return new RuntimeSupportEvidenceEvent(
eventId,
"TACHOGRAPH_FILE_SESSION",
"VEHICLE_UNIT",
eventDomain,
eventType,
"SNAPSHOT",
DRIVER,
VEHICLE,
REGISTRATION,
occurred,
occurred.toEpochSecond(),
new BigDecimal(latitude),
new BigDecimal(longitude),
countryCode,
null,
countryFrom,
countryTo,
null,
null,
null,
null,
Map.of()
);
}
private GeoCountryResolution unresolved(BigDecimal latitude, BigDecimal longitude) {
return new GeoCountryResolution(
GeoCountryResolutionStatus.NOT_FOUND,
latitude,
longitude,
null,
null,
null,
"NOMINATIM",
null,
false,
false,
"not found"
);
}
}

View File

@ -239,6 +239,7 @@ class DriverWorkingTimeRuntimeProcessingPlanTest {
assertThat(delegated.processingPlanKey()).isEqualTo(DriverWorkingTimeRuntimeProcessingPlan.PLAN_KEY);
assertThat(delegated.modules()).endsWith(
DriverWorkingTimeModuleKeys.NDI_HOME_CLASSIFICATION,
DriverWorkingTimeModuleKeys.COUNTRY_TRIP_SEGMENTATION,
DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS
);
assertThat(delegated.parameters())