diff --git a/postman/eventhub-esper-poc.postman_collection.json b/postman/eventhub-esper-poc.postman_collection.json index a665d02..0d9a95b 100644 --- a/postman/eventhub-esper-poc.postman_collection.json +++ b/postman/eventhub-esper-poc.postman_collection.json @@ -57,6 +57,68 @@ ] } } + }, + { + "name": "Evaluate tachograph operating periods", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/api/eventhub/esper-poc/tachograph/operating-period-evaluation?tenantKey={{tenantKey}}&driverId={{driverId}}&occurredFrom={{occurredFrom}}&occurredTo={{occurredTo}}&guardHours=24&operatingSplitIdleHours=7&significantDrivingMinutes=3&mergeGapSeconds=0&gapDetectionToleranceSeconds=0&unknownTreatmentMode=AS_BREAK_REST", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "eventhub", + "esper-poc", + "tachograph", + "operating-period-evaluation" + ], + "query": [ + { + "key": "tenantKey", + "value": "{{tenantKey}}" + }, + { + "key": "driverId", + "value": "{{driverId}}" + }, + { + "key": "occurredFrom", + "value": "{{occurredFrom}}" + }, + { + "key": "occurredTo", + "value": "{{occurredTo}}" + }, + { + "key": "guardHours", + "value": "24" + }, + { + "key": "operatingSplitIdleHours", + "value": "7" + }, + { + "key": "significantDrivingMinutes", + "value": "3" + }, + { + "key": "mergeGapSeconds", + "value": "0" + }, + { + "key": "gapDetectionToleranceSeconds", + "value": "0" + }, + { + "key": "unknownTreatmentMode", + "value": "AS_BREAK_REST" + } + ] + } + } } ], "variable": [ @@ -72,6 +134,10 @@ "key": "driverEntityId", "value": "00000000-0000-0000-0000-000000000000" }, + { + "key": "driverId", + "value": "00000000-0000-0000-0000-000000000000" + }, { "key": "occurredFrom", "value": "2026-04-01T00:00:00Z" diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index c2c9700..a132a34 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -1,6 +1,7 @@ package at.procon.eventhub.config; import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode; +import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode; import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode; import java.time.Duration; import java.time.OffsetDateTime; @@ -50,6 +51,7 @@ public class EventHubProperties { public static class EsperPoc { private EsperActivityMergeMode activityMergeMode = EsperActivityMergeMode.JAVA; private EsperShiftResolutionMode shiftResolutionMode = EsperShiftResolutionMode.JAVA; + private final OperatingPeriodEvaluation operatingPeriodEvaluation = new OperatingPeriodEvaluation(); public EsperActivityMergeMode getActivityMergeMode() { return activityMergeMode; @@ -66,6 +68,60 @@ public class EventHubProperties { public void setShiftResolutionMode(EsperShiftResolutionMode shiftResolutionMode) { this.shiftResolutionMode = shiftResolutionMode == null ? EsperShiftResolutionMode.JAVA : shiftResolutionMode; } + + public OperatingPeriodEvaluation getOperatingPeriodEvaluation() { + return operatingPeriodEvaluation; + } + } + + public static class OperatingPeriodEvaluation { + private int operatingSplitIdleHours = 7; + private int significantDrivingMinutes = 3; + private int mergeGapSeconds = 0; + private int gapDetectionToleranceSeconds = 0; + private EsperUnknownTreatmentMode unknownTreatmentMode = EsperUnknownTreatmentMode.AS_BREAK_REST; + + public int getOperatingSplitIdleHours() { + return operatingSplitIdleHours; + } + + public void setOperatingSplitIdleHours(int operatingSplitIdleHours) { + this.operatingSplitIdleHours = Math.max(1, operatingSplitIdleHours); + } + + public int getSignificantDrivingMinutes() { + return significantDrivingMinutes; + } + + public void setSignificantDrivingMinutes(int significantDrivingMinutes) { + this.significantDrivingMinutes = Math.max(1, significantDrivingMinutes); + } + + public int getMergeGapSeconds() { + return mergeGapSeconds; + } + + public void setMergeGapSeconds(int mergeGapSeconds) { + this.mergeGapSeconds = Math.max(0, mergeGapSeconds); + } + + public int getGapDetectionToleranceSeconds() { + return gapDetectionToleranceSeconds; + } + + public void setGapDetectionToleranceSeconds(int gapDetectionToleranceSeconds) { + this.gapDetectionToleranceSeconds = Math.max(0, gapDetectionToleranceSeconds); + } + + public EsperUnknownTreatmentMode getUnknownTreatmentMode() { + return unknownTreatmentMode; + } + + public void setUnknownTreatmentMode(EsperUnknownTreatmentMode unknownTreatmentMode) { + this.unknownTreatmentMode = unknownTreatmentMode == null + ? EsperUnknownTreatmentMode.AS_BREAK_REST + : unknownTreatmentMode; + } } public static class Batch { diff --git a/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java b/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java index 4e0c97a..0bd93ab 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java +++ b/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java @@ -1,9 +1,13 @@ package at.procon.eventhub.esperpoc.api; import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode; +import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest; +import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto; import at.procon.eventhub.esperpoc.dto.EsperPocRequest; import at.procon.eventhub.esperpoc.dto.EsperPocResultDto; import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode; +import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode; +import at.procon.eventhub.esperpoc.service.EsperOperatingPeriodEvaluationService; import at.procon.eventhub.esperpoc.service.EsperPocDriverCardActivityService; import java.time.OffsetDateTime; import java.util.UUID; @@ -19,9 +23,14 @@ import org.springframework.web.bind.annotation.RestController; public class EsperPocController { private final EsperPocDriverCardActivityService service; + private final EsperOperatingPeriodEvaluationService operatingPeriodEvaluationService; - public EsperPocController(EsperPocDriverCardActivityService service) { + public EsperPocController( + EsperPocDriverCardActivityService service, + EsperOperatingPeriodEvaluationService operatingPeriodEvaluationService + ) { this.service = service; + this.operatingPeriodEvaluationService = operatingPeriodEvaluationService; } @GetMapping("/tachograph/driver-card-activities") @@ -55,4 +64,32 @@ public class EsperPocController { ); return ResponseEntity.ok(service.evaluate(request)); } + + @GetMapping("/tachograph/operating-period-evaluation") + public ResponseEntity evaluateOperatingPeriods( + @RequestParam String tenantKey, + @RequestParam UUID driverId, + @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime occurredFrom, + @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime occurredTo, + @RequestParam(defaultValue = "24") Integer guardHours, + @RequestParam(required = false) Integer operatingSplitIdleHours, + @RequestParam(required = false) Integer significantDrivingMinutes, + @RequestParam(required = false) Integer mergeGapSeconds, + @RequestParam(required = false) Integer gapDetectionToleranceSeconds, + @RequestParam(required = false) EsperUnknownTreatmentMode unknownTreatmentMode + ) { + EsperOperatingPeriodRequest request = new EsperOperatingPeriodRequest( + tenantKey, + driverId, + occurredFrom, + occurredTo, + guardHours, + operatingSplitIdleHours, + significantDrivingMinutes, + mergeGapSeconds, + gapDetectionToleranceSeconds, + unknownTreatmentMode + ); + return ResponseEntity.ok(operatingPeriodEvaluationService.evaluate(request)); + } } diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodRequest.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodRequest.java new file mode 100644 index 0000000..59420f4 --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodRequest.java @@ -0,0 +1,30 @@ +package at.procon.eventhub.esperpoc.dto; + +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import java.time.OffsetDateTime; +import java.util.UUID; + +public record EsperOperatingPeriodRequest( + @NotBlank String tenantKey, + @NotNull UUID driverId, + @NotNull OffsetDateTime occurredFrom, + @NotNull OffsetDateTime occurredTo, + Integer guardHours, + Integer operatingSplitIdleHours, + Integer significantDrivingMinutes, + Integer mergeGapSeconds, + Integer gapDetectionToleranceSeconds, + EsperUnknownTreatmentMode unknownTreatmentMode +) { + public EsperOperatingPeriodRequest { + if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) { + throw new IllegalArgumentException("occurredFrom must be before occurredTo"); + } + guardHours = guardHours == null ? 24 : Math.max(0, guardHours); + operatingSplitIdleHours = operatingSplitIdleHours == null ? null : Math.max(1, operatingSplitIdleHours); + significantDrivingMinutes = significantDrivingMinutes == null ? null : Math.max(1, significantDrivingMinutes); + mergeGapSeconds = mergeGapSeconds == null ? null : Math.max(0, mergeGapSeconds); + gapDetectionToleranceSeconds = gapDetectionToleranceSeconds == null ? null : Math.max(0, gapDetectionToleranceSeconds); + } +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodResultDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodResultDto.java new file mode 100644 index 0000000..01048f1 --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodResultDto.java @@ -0,0 +1,39 @@ +package at.procon.eventhub.esperpoc.dto; + +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; + +public record EsperOperatingPeriodResultDto( + String tenantKey, + UUID driverId, + OffsetDateTime requestedFrom, + OffsetDateTime requestedTo, + OffsetDateTime loadedFrom, + OffsetDateTime loadedTo, + int rawEventCount, + int driverCardRawEventCount, + int vehicleUnitRawEventCount, + int driverCardIntervalCount, + int vehicleUnitIntervalCount, + int resolvedKnownIntervalCount, + int evaluationIntervalCount, + int periodizedIntervalCount, + int mergedIntervalCount, + int nonDrivingIntervalCount, + int operatingPeriodCount, + int operatingSplitIdleHours, + int significantDrivingMinutes, + int mergeGapSeconds, + int gapDetectionToleranceSeconds, + EsperUnknownTreatmentMode unknownTreatmentMode, + List rawEvents, + List resolvedKnownIntervals, + List evaluationIntervals, + List periodizedIntervals, + List mergedIntervals, + List nonDrivingIntervals, + List operatingPeriods, + List notes +) { +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperUnknownTreatmentMode.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperUnknownTreatmentMode.java new file mode 100644 index 0000000..09dcaf1 --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperUnknownTreatmentMode.java @@ -0,0 +1,6 @@ +package at.procon.eventhub.esperpoc.dto; + +public enum EsperUnknownTreatmentMode { + AS_BREAK_REST, + AS_UNKNOWN_NON_BREAK +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/NonDrivingIntervalDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/NonDrivingIntervalDto.java new file mode 100644 index 0000000..c1e24bb --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/NonDrivingIntervalDto.java @@ -0,0 +1,21 @@ +package at.procon.eventhub.esperpoc.dto; + +import java.time.OffsetDateTime; +import java.util.UUID; + +public record NonDrivingIntervalDto( + UUID driverId, + UUID vehicleId, + UUID vehicleRegistrationId, + String cardSlot, + OffsetDateTime startedAt, + OffsetDateTime endedAt, + long durationSeconds, + long operatingPeriodNo, + OffsetDateTime operatingPeriodStartedAt, + String closedBy, + boolean containsUnknown, + long unknownSeconds, + boolean clippedToRequestedPeriod +) { +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/OperatingPeriodActivityIntervalDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/OperatingPeriodActivityIntervalDto.java new file mode 100644 index 0000000..3bec3ff --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/OperatingPeriodActivityIntervalDto.java @@ -0,0 +1,110 @@ +package at.procon.eventhub.esperpoc.dto; + +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; + +public record OperatingPeriodActivityIntervalDto( + UUID driverId, + UUID vehicleId, + UUID vehicleRegistrationId, + String activityType, + String cardSlot, + String cardStatus, + String drivingStatus, + String sourceKind, + OffsetDateTime startedAt, + OffsetDateTime endedAt, + long durationSeconds, + String sourceRowId, + List sourceRowIds, + boolean clippedToRequestedPeriod, + String level, + long operatingPeriodNo, + OffsetDateTime operatingPeriodStartedAt, + boolean newOperatingPeriod, + Long gapSincePreviousActivitySeconds, + boolean synthetic +) { + public static OperatingPeriodActivityIntervalDto periodized( + ActivityIntervalDto interval, + long operatingPeriodNo, + OffsetDateTime operatingPeriodStartedAt, + boolean newOperatingPeriod, + Long gapSincePreviousActivitySeconds + ) { + return new OperatingPeriodActivityIntervalDto( + interval.driverEntityId(), + interval.vehicleId(), + interval.vehicleRegistrationId(), + interval.activityType(), + interval.cardSlot(), + interval.cardStatus(), + interval.drivingStatus(), + interval.sourceKind(), + interval.startedAt(), + interval.endedAt(), + interval.durationSeconds(), + interval.sourceRowId(), + interval.sourceRowIds(), + interval.clippedToRequestedPeriod(), + "PERIODIZED_ACTIVITY", + operatingPeriodNo, + operatingPeriodStartedAt, + newOperatingPeriod, + gapSincePreviousActivitySeconds, + "UNKNOWN_GAP".equals(interval.level()) + ); + } + + public OperatingPeriodActivityIntervalDto withTime(OffsetDateTime newStartedAt, OffsetDateTime newEndedAt, boolean clipped) { + return new OperatingPeriodActivityIntervalDto( + driverId, + vehicleId, + vehicleRegistrationId, + activityType, + cardSlot, + cardStatus, + drivingStatus, + sourceKind, + newStartedAt, + newEndedAt, + Duration.between(newStartedAt, newEndedAt).getSeconds(), + sourceRowId, + sourceRowIds, + clipped, + level, + operatingPeriodNo, + operatingPeriodStartedAt, + newOperatingPeriod, + gapSincePreviousActivitySeconds, + synthetic + ); + } + + public OperatingPeriodActivityIntervalDto asMerged(OffsetDateTime newEndedAt, List mergedSourceRowIds) { + return new OperatingPeriodActivityIntervalDto( + driverId, + vehicleId, + vehicleRegistrationId, + activityType, + cardSlot, + cardStatus, + drivingStatus, + sourceKind, + startedAt, + newEndedAt, + Duration.between(startedAt, newEndedAt).getSeconds(), + sourceRowId, + mergedSourceRowIds, + clippedToRequestedPeriod, + "MERGED_ACTIVITY", + operatingPeriodNo, + operatingPeriodStartedAt, + newOperatingPeriod, + gapSincePreviousActivitySeconds, + synthetic + ); + } +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/OperatingPeriodDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/OperatingPeriodDto.java new file mode 100644 index 0000000..e3a176d --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/OperatingPeriodDto.java @@ -0,0 +1,18 @@ +package at.procon.eventhub.esperpoc.dto; + +import java.time.OffsetDateTime; + +public record OperatingPeriodDto( + long operatingPeriodNo, + OffsetDateTime startedAt, + OffsetDateTime endedAt, + long durationSeconds, + String closedBy, + long drivingSeconds, + long workSeconds, + long availabilitySeconds, + long unknownSeconds, + int intervalCount, + boolean clippedToRequestedPeriod +) { +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperClosedOperatingPeriod.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperClosedOperatingPeriod.java new file mode 100644 index 0000000..f552b0a --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperClosedOperatingPeriod.java @@ -0,0 +1,12 @@ +package at.procon.eventhub.esperpoc.service; + +import java.time.OffsetDateTime; + +record EsperClosedOperatingPeriod( + long operatingPeriodNo, + OffsetDateTime startedAt, + OffsetDateTime endedAt, + long durationSeconds, + String closedBy +) { +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEngine.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEngine.java new file mode 100644 index 0000000..0e51e0e --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEngine.java @@ -0,0 +1,257 @@ +package at.procon.eventhub.esperpoc.service; + +import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto; +import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto; +import com.espertech.esper.common.client.EPCompiled; +import com.espertech.esper.common.client.EventBean; +import com.espertech.esper.common.client.configuration.Configuration; +import com.espertech.esper.compiler.client.CompilerArguments; +import com.espertech.esper.compiler.client.EPCompileException; +import com.espertech.esper.compiler.client.EPCompilerProvider; +import com.espertech.esper.runtime.client.EPDeployException; +import com.espertech.esper.runtime.client.EPDeployment; +import com.espertech.esper.runtime.client.EPRuntime; +import com.espertech.esper.runtime.client.EPRuntimeProvider; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import org.springframework.stereotype.Component; + +@Component +public class EsperOperatingPeriodEngine { + + private static final AtomicLong RUNTIME_COUNTER = new AtomicLong(); + private static final String INPUT_STREAM_EPL = """ + @name('operatingPeriodIntervalStream') + select * from OperatingPeriodIntervalInputEvent + """; + + public EsperOperatingPeriodEvaluation evaluate( + List intervals, + Duration operatingSplitIdleThreshold + ) { + List sorted = sortedPositiveIntervals(intervals); + if (sorted.isEmpty()) { + return new EsperOperatingPeriodEvaluation(List.of(), List.of()); + } + PeriodizationCollector collector = new PeriodizationCollector(operatingSplitIdleThreshold); + executeWithRuntime( + configuration -> configuration.getCommon().addEventType("OperatingPeriodIntervalInputEvent", EsperOperatingPeriodIntervalInputEvent.class), + INPUT_STREAM_EPL, + "operatingPeriodIntervalStream", + newData -> collectInputIntervals(newData, collector), + runtime -> { + for (ActivityIntervalDto interval : sorted) { + runtime.getEventService().sendEventBean(toInputEvent(interval), "OperatingPeriodIntervalInputEvent"); + } + } + ); + return collector.finish(); + } + + private void executeWithRuntime( + java.util.function.Consumer configurationSetup, + String epl, + String statementName, + java.util.function.Consumer listener, + java.util.function.Consumer sender + ) { + EPRuntime runtime = null; + try { + Configuration configuration = new Configuration(); + configurationSetup.accept(configuration); + String runtimeUri = "eventhub-esper-operating-period-" + RUNTIME_COUNTER.incrementAndGet(); + runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration); + + CompilerArguments arguments = new CompilerArguments(configuration); + EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments); + EPDeployment deployment = runtime.getDeploymentService().deploy(compiled); + runtime.getDeploymentService() + .getStatement(deployment.getDeploymentId(), statementName) + .addListener((newData, oldData, statement, rt) -> listener.accept(newData)); + sender.accept(runtime); + } catch (EPCompileException | EPDeployException e) { + throw new IllegalStateException("Cannot compile/deploy Esper operating-period EPL", e); + } finally { + if (runtime != null) { + runtime.destroy(); + } + } + } + + private void collectInputIntervals(EventBean[] newData, PeriodizationCollector collector) { + if (newData == null) { + return; + } + for (EventBean event : newData) { + collector.accept((EsperOperatingPeriodIntervalInputEvent) event.getUnderlying()); + } + } + + private EsperOperatingPeriodIntervalInputEvent toInputEvent(ActivityIntervalDto interval) { + return new EsperOperatingPeriodIntervalInputEvent( + interval.driverEntityId(), + interval.vehicleId(), + interval.vehicleRegistrationId(), + interval.activityType(), + interval.cardSlot(), + interval.cardStatus(), + interval.drivingStatus(), + interval.sourceKind(), + interval.startedAt().toInstant().toEpochMilli(), + interval.endedAt().toInstant().toEpochMilli(), + interval.durationSeconds() * 1000L, + interval.sourceRowId(), + interval.sourceRowIds(), + interval.clippedToRequestedPeriod(), + "UNKNOWN_GAP".equals(interval.level()) + ); + } + + private List sortedPositiveIntervals(List intervals) { + if (intervals == null || intervals.isEmpty()) { + return List.of(); + } + return intervals.stream() + .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) + .sorted(Comparator.comparing(ActivityIntervalDto::startedAt) + .thenComparing(ActivityIntervalDto::endedAt) + .thenComparing(ActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo))) + .toList(); + } + + public record EsperOperatingPeriodEvaluation( + List periodizedIntervals, + List closedPeriods + ) { + } + + private static final class PeriodizationCollector { + private final Duration operatingSplitIdleThreshold; + private final List periodizedIntervals = new ArrayList<>(); + private final List closedPeriods = new ArrayList<>(); + private boolean hasOpenPeriod; + private long operatingPeriodNo; + private OffsetDateTime operatingPeriodStartedAt; + private OffsetDateTime lastKnownActivityEndAt; + + private PeriodizationCollector(Duration operatingSplitIdleThreshold) { + this.operatingSplitIdleThreshold = operatingSplitIdleThreshold; + } + + private void accept(EsperOperatingPeriodIntervalInputEvent interval) { + ActivityIntervalDto dto = new ActivityIntervalDto( + interval.driverId(), + interval.vehicleId(), + interval.vehicleRegistrationId(), + interval.activityType(), + interval.cardSlot(), + interval.cardStatus(), + interval.drivingStatus(), + interval.sourceKind(), + OffsetDateTime.ofInstant(java.time.Instant.ofEpochMilli(interval.startTs()), java.time.ZoneOffset.UTC), + OffsetDateTime.ofInstant(java.time.Instant.ofEpochMilli(interval.endTs()), java.time.ZoneOffset.UTC), + interval.durationMs() / 1000L, + interval.sourceRowId(), + interval.sourceRowIds(), + interval.clippedToRequestedPeriod(), + interval.synthetic() ? "UNKNOWN_GAP" : "RAW_INTERVAL" + ); + + if ("UNKNOWN".equals(dto.activityType())) { + if (!hasOpenPeriod) { + return; + } + if (dto.durationSeconds() >= operatingSplitIdleThreshold.getSeconds()) { + closeCurrent("UNKNOWN_GAP"); + return; + } + periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized( + dto, + operatingPeriodNo, + operatingPeriodStartedAt, + false, + Math.max(0, Duration.between(lastKnownActivityEndAt, dto.startedAt()).getSeconds()) + )); + return; + } + + if (!hasOpenPeriod) { + operatingPeriodNo = operatingPeriodNo < 1 ? 1 : operatingPeriodNo + 1; + hasOpenPeriod = true; + operatingPeriodStartedAt = dto.startedAt(); + lastKnownActivityEndAt = dto.endedAt(); + periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized( + dto, + operatingPeriodNo, + operatingPeriodStartedAt, + true, + null + )); + return; + } + + long gapSeconds = Math.max(0, Duration.between(lastKnownActivityEndAt, dto.startedAt()).getSeconds()); + if (gapSeconds >= operatingSplitIdleThreshold.getSeconds()) { + closeCurrent("IDLE_GAP"); + operatingPeriodNo++; + hasOpenPeriod = true; + operatingPeriodStartedAt = dto.startedAt(); + lastKnownActivityEndAt = dto.endedAt(); + periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized( + dto, + operatingPeriodNo, + operatingPeriodStartedAt, + true, + gapSeconds + )); + return; + } + + periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized( + dto, + operatingPeriodNo, + operatingPeriodStartedAt, + false, + gapSeconds + )); + if (dto.endedAt().isAfter(lastKnownActivityEndAt)) { + lastKnownActivityEndAt = dto.endedAt(); + } + } + + private EsperOperatingPeriodEvaluation finish() { + if (hasOpenPeriod) { + closeCurrent("FLUSH"); + } + return new EsperOperatingPeriodEvaluation( + periodizedIntervals.stream() + .sorted(Comparator.comparing(OperatingPeriodActivityIntervalDto::startedAt) + .thenComparing(OperatingPeriodActivityIntervalDto::endedAt) + .thenComparing(OperatingPeriodActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo))) + .toList(), + closedPeriods.stream() + .sorted(Comparator.comparing(EsperClosedOperatingPeriod::startedAt)) + .toList() + ); + } + + private void closeCurrent(String closedBy) { + if (!hasOpenPeriod || operatingPeriodStartedAt == null || lastKnownActivityEndAt == null) { + hasOpenPeriod = false; + return; + } + closedPeriods.add(new EsperClosedOperatingPeriod( + operatingPeriodNo, + operatingPeriodStartedAt, + lastKnownActivityEndAt, + Duration.between(operatingPeriodStartedAt, lastKnownActivityEndAt).getSeconds(), + closedBy + )); + hasOpenPeriod = false; + } + } +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationService.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationService.java new file mode 100644 index 0000000..5977ffb --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationService.java @@ -0,0 +1,694 @@ +package at.procon.eventhub.esperpoc.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto; +import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest; +import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto; +import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode; +import at.procon.eventhub.esperpoc.dto.NonDrivingIntervalDto; +import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto; +import at.procon.eventhub.esperpoc.dto.OperatingPeriodDto; +import at.procon.eventhub.esperpoc.dto.RawActivityEventDto; +import at.procon.eventhub.esperpoc.persistence.EsperPocActivityRepository; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class EsperOperatingPeriodEvaluationService { + + private static final Logger log = LoggerFactory.getLogger(EsperOperatingPeriodEvaluationService.class); + + private final EsperPocActivityRepository activityRepository; + private final EsperDriverActivityEngine activityEngine; + private final EsperOperatingPeriodEngine operatingPeriodEngine; + private final EventHubProperties properties; + + public EsperOperatingPeriodEvaluationService( + EsperPocActivityRepository activityRepository, + EsperDriverActivityEngine activityEngine, + EsperOperatingPeriodEngine operatingPeriodEngine + ) { + this(activityRepository, activityEngine, operatingPeriodEngine, null); + } + + @Autowired + public EsperOperatingPeriodEvaluationService( + EsperPocActivityRepository activityRepository, + EsperDriverActivityEngine activityEngine, + EsperOperatingPeriodEngine operatingPeriodEngine, + EventHubProperties properties + ) { + this.activityRepository = activityRepository; + this.activityEngine = activityEngine; + this.operatingPeriodEngine = operatingPeriodEngine; + this.properties = properties; + } + + public EsperOperatingPeriodResultDto evaluate(EsperOperatingPeriodRequest request) { + long startedNanos = System.nanoTime(); + OffsetDateTime requestedFrom = utc(request.occurredFrom()); + OffsetDateTime requestedTo = utc(request.occurredTo()); + OffsetDateTime loadedFrom = requestedFrom.minusHours(request.guardHours()); + OffsetDateTime loadedTo = requestedTo.plusHours(request.guardHours()); + Duration splitIdleThreshold = Duration.ofHours(resolveOperatingSplitIdleHours(request)); + Duration significantDrivingThreshold = Duration.ofMinutes(resolveSignificantDrivingMinutes(request)); + Duration mergeGapTolerance = Duration.ofSeconds(resolveMergeGapSeconds(request)); + Duration gapDetectionTolerance = Duration.ofSeconds(resolveGapDetectionToleranceSeconds(request)); + EsperUnknownTreatmentMode unknownTreatmentMode = resolveUnknownTreatmentMode(request); + + long dbStartedNanos = System.nanoTime(); + List rawEvents = activityRepository.findDriverActivityEvents( + request.tenantKey(), + request.driverId(), + loadedFrom, + loadedTo + ); + long dbElapsedMs = elapsedMillis(dbStartedNanos); + List driverCardRawEvents = rawEvents.stream() + .filter(event -> "DRIVER_CARD".equals(event.sourceKind())) + .toList(); + List vehicleUnitRawEvents = rawEvents.stream() + .filter(event -> "VEHICLE_UNIT".equals(event.sourceKind())) + .toList(); + + long cardIntervalsStartedNanos = System.nanoTime(); + List driverCardRawIntervals = activityEngine.buildIntervals(driverCardRawEvents); + long cardIntervalsElapsedMs = elapsedMillis(cardIntervalsStartedNanos); + long vuIntervalsStartedNanos = System.nanoTime(); + List vehicleUnitRawIntervals = activityEngine.buildIntervals(vehicleUnitRawEvents); + long vuIntervalsElapsedMs = elapsedMillis(vuIntervalsStartedNanos); + + long vuGapFillStartedNanos = System.nanoTime(); + List resolvedKnownLoadedIntervals = resolveVuFillGaps(driverCardRawIntervals, vehicleUnitRawIntervals); + long vuGapFillElapsedMs = elapsedMillis(vuGapFillStartedNanos); + + long unknownGapStartedNanos = System.nanoTime(); + List evaluationLoadedIntervals = synthesizeUnknownGaps( + resolvedKnownLoadedIntervals, + gapDetectionTolerance + ); + long unknownGapElapsedMs = elapsedMillis(unknownGapStartedNanos); + + long periodizeStartedNanos = System.nanoTime(); + EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation evaluation = operatingPeriodEngine.evaluate( + evaluationLoadedIntervals, + splitIdleThreshold + ); + long periodizeElapsedMs = elapsedMillis(periodizeStartedNanos); + + long mergeStartedNanos = System.nanoTime(); + List mergedLoadedIntervals = mergeConsecutiveActivities( + evaluation.periodizedIntervals(), + mergeGapTolerance + ); + long mergeElapsedMs = elapsedMillis(mergeStartedNanos); + + long nonDrivingStartedNanos = System.nanoTime(); + List nonDrivingLoadedIntervals = buildNonDrivingIntervals( + mergedLoadedIntervals, + significantDrivingThreshold, + unknownTreatmentMode + ); + long nonDrivingElapsedMs = elapsedMillis(nonDrivingStartedNanos); + + List periodizedIntervals = clipOperatingIntervals( + evaluation.periodizedIntervals(), + requestedFrom, + requestedTo + ); + List mergedIntervals = clipOperatingIntervals( + mergedLoadedIntervals, + requestedFrom, + requestedTo + ); + List nonDrivingIntervals = clipNonDrivingIntervals( + nonDrivingLoadedIntervals, + requestedFrom, + requestedTo + ); + List operatingPeriods = buildOperatingPeriods( + evaluation.closedPeriods(), + periodizedIntervals, + requestedFrom, + requestedTo + ); + long totalElapsedMs = elapsedMillis(startedNanos); + + log.info("Esper operating-period evaluation tenant={} driverId={} requestedFrom={} requestedTo={} loadedFrom={} loadedTo={} unknownMode={} rawEvents={} cardRawEvents={} vuRawEvents={} cardIntervals={} vuIntervals={} resolvedKnownIntervals={} evaluationIntervals={} periodizedIntervals={} mergedIntervals={} nonDrivingIntervals={} operatingPeriods={} timingsMs={{dbRetrieve={}, cardIntervalEsper={}, vuIntervalEsper={}, vuGapFill={}, synthUnknown={}, periodizeEsper={}, merge={}, nonDriving={}, total={}}}", + request.tenantKey(), + request.driverId(), + requestedFrom, + requestedTo, + loadedFrom, + loadedTo, + unknownTreatmentMode, + rawEvents.size(), + driverCardRawEvents.size(), + vehicleUnitRawEvents.size(), + driverCardRawIntervals.size(), + vehicleUnitRawIntervals.size(), + resolvedKnownLoadedIntervals.size(), + evaluationLoadedIntervals.size(), + periodizedIntervals.size(), + mergedIntervals.size(), + nonDrivingIntervals.size(), + operatingPeriods.size(), + dbElapsedMs, + cardIntervalsElapsedMs, + vuIntervalsElapsedMs, + vuGapFillElapsedMs, + unknownGapElapsedMs, + periodizeElapsedMs, + mergeElapsedMs, + nonDrivingElapsedMs, + totalElapsedMs); + + return new EsperOperatingPeriodResultDto( + request.tenantKey(), + request.driverId(), + requestedFrom, + requestedTo, + loadedFrom, + loadedTo, + rawEvents.size(), + driverCardRawEvents.size(), + vehicleUnitRawEvents.size(), + driverCardRawIntervals.size(), + vehicleUnitRawIntervals.size(), + resolvedKnownLoadedIntervals.size(), + evaluationLoadedIntervals.size(), + periodizedIntervals.size(), + mergedIntervals.size(), + nonDrivingIntervals.size(), + operatingPeriods.size(), + resolveOperatingSplitIdleHours(request), + resolveSignificantDrivingMinutes(request), + resolveMergeGapSeconds(request), + resolveGapDetectionToleranceSeconds(request), + unknownTreatmentMode, + rawEvents, + resolvedKnownLoadedIntervals, + evaluationLoadedIntervals, + periodizedIntervals, + mergedIntervals, + nonDrivingIntervals, + operatingPeriods, + notes( + unknownTreatmentMode, + resolveOperatingSplitIdleHours(request), + resolveSignificantDrivingMinutes(request), + resolveGapDetectionToleranceSeconds(request) + ) + ); + } + + List synthesizeUnknownGaps( + List resolvedKnownLoadedIntervals, + Duration gapDetectionTolerance + ) { + List allKnown = sortedPositiveIntervals(resolvedKnownLoadedIntervals); + List nonRestActivities = allKnown.stream() + .filter(interval -> !"BREAK_REST".equals(interval.activityType())) + .toList(); + if (nonRestActivities.isEmpty()) { + return List.of(); + } + + List result = new ArrayList<>(); + for (int index = 0; index < nonRestActivities.size(); index++) { + ActivityIntervalDto current = nonRestActivities.get(index); + result.add(current); + if (index + 1 >= nonRestActivities.size()) { + continue; + } + ActivityIntervalDto next = nonRestActivities.get(index + 1); + if (!next.startedAt().isAfter(current.endedAt())) { + continue; + } + OffsetDateTime gapStart = current.endedAt(); + OffsetDateTime gapEnd = next.startedAt(); + List uncoveredGapSegments = subtractCoverage( + unknownGapTemplate(current, next, gapStart, gapEnd), + allKnown + ); + for (ActivityIntervalDto gap : uncoveredGapSegments) { + if (gap.durationSeconds() > gapDetectionTolerance.getSeconds()) { + result.add(gap); + } + } + } + return result.stream() + .sorted(Comparator.comparing(ActivityIntervalDto::startedAt) + .thenComparing(ActivityIntervalDto::endedAt) + .thenComparing(ActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo))) + .toList(); + } + + List mergeConsecutiveActivities( + List intervals, + Duration mergeGapTolerance + ) { + if (intervals == null || intervals.isEmpty()) { + return List.of(); + } + List sorted = intervals.stream() + .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) + .sorted(Comparator.comparing(OperatingPeriodActivityIntervalDto::startedAt) + .thenComparing(OperatingPeriodActivityIntervalDto::endedAt) + .thenComparing(OperatingPeriodActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo))) + .toList(); + List result = new ArrayList<>(); + OperatingPeriodActivityIntervalDto current = null; + List currentSources = new ArrayList<>(); + for (OperatingPeriodActivityIntervalDto next : sorted) { + if (current == null) { + current = next; + currentSources = new ArrayList<>(next.sourceRowIds()); + continue; + } + if (canMerge(current, next, mergeGapTolerance)) { + currentSources.addAll(next.sourceRowIds()); + current = current.asMerged(max(current.endedAt(), next.endedAt()), List.copyOf(currentSources)); + } else { + result.add(current.asMerged(current.endedAt(), List.copyOf(currentSources))); + current = next; + currentSources = new ArrayList<>(next.sourceRowIds()); + } + } + if (current != null) { + result.add(current.asMerged(current.endedAt(), List.copyOf(currentSources))); + } + return result; + } + + List buildNonDrivingIntervals( + List mergedIntervals, + Duration significantDrivingThreshold, + EsperUnknownTreatmentMode unknownTreatmentMode + ) { + if (mergedIntervals == null || mergedIntervals.isEmpty()) { + return List.of(); + } + List result = new ArrayList<>(); + NonDrivingAccumulator open = null; + for (OperatingPeriodActivityIntervalDto interval : mergedIntervals) { + if (open != null && open.operatingPeriodNo != interval.operatingPeriodNo()) { + result.add(open.close("NEW_OPERATING_PERIOD")); + open = null; + } + if (startsOrExtendsNonDriving(interval, unknownTreatmentMode)) { + open = open == null ? NonDrivingAccumulator.open(interval) : open.extend(interval); + continue; + } + if ("UNKNOWN".equals(interval.activityType()) && unknownTreatmentMode == EsperUnknownTreatmentMode.AS_UNKNOWN_NON_BREAK) { + if (open != null) { + result.add(open.close("UNKNOWN_GAP")); + open = null; + } + continue; + } + if ("DRIVE".equals(interval.activityType())) { + if (interval.durationSeconds() >= significantDrivingThreshold.getSeconds()) { + if (open != null) { + result.add(open.close("SIGNIFICANT_DRIVING")); + open = null; + } + } else if (open != null) { + open = open.extend(interval); + } + } + } + if (open != null) { + result.add(open.close("FLUSH")); + } + return result; + } + + private boolean startsOrExtendsNonDriving( + OperatingPeriodActivityIntervalDto interval, + EsperUnknownTreatmentMode unknownTreatmentMode + ) { + if ("WORK".equals(interval.activityType()) || "AVAILABILITY".equals(interval.activityType())) { + return true; + } + return "UNKNOWN".equals(interval.activityType()) + && unknownTreatmentMode == EsperUnknownTreatmentMode.AS_BREAK_REST; + } + + private boolean canMerge( + OperatingPeriodActivityIntervalDto left, + OperatingPeriodActivityIntervalDto right, + Duration mergeGapTolerance + ) { + long gapSeconds = Duration.between(left.endedAt(), right.startedAt()).getSeconds(); + return Objects.equals(left.driverId(), right.driverId()) + && left.operatingPeriodNo() == right.operatingPeriodNo() + && Objects.equals(left.activityType(), right.activityType()) + && Objects.equals(left.cardSlot(), right.cardSlot()) + && Objects.equals(left.cardStatus(), right.cardStatus()) + && Objects.equals(left.drivingStatus(), right.drivingStatus()) + && Objects.equals(left.sourceKind(), right.sourceKind()) + && left.synthetic() == right.synthetic() + && gapSeconds <= mergeGapTolerance.getSeconds(); + } + + private List buildOperatingPeriods( + List closedPeriods, + List clippedPeriodizedIntervals, + OffsetDateTime requestedFrom, + OffsetDateTime requestedTo + ) { + Map> intervalsByPeriod = new LinkedHashMap<>(); + for (OperatingPeriodActivityIntervalDto interval : clippedPeriodizedIntervals) { + intervalsByPeriod.computeIfAbsent(interval.operatingPeriodNo(), ignored -> new ArrayList<>()).add(interval); + } + List result = new ArrayList<>(); + for (EsperClosedOperatingPeriod closedPeriod : closedPeriods) { + if (!closedPeriod.endedAt().isAfter(requestedFrom) || !closedPeriod.startedAt().isBefore(requestedTo)) { + continue; + } + OffsetDateTime start = max(closedPeriod.startedAt(), requestedFrom); + OffsetDateTime end = min(closedPeriod.endedAt(), requestedTo); + if (!end.isAfter(start)) { + continue; + } + List intervals = intervalsByPeriod.getOrDefault(closedPeriod.operatingPeriodNo(), List.of()); + long drivingSeconds = sumActivitySeconds(intervals, "DRIVE"); + long workSeconds = sumActivitySeconds(intervals, "WORK"); + long availabilitySeconds = sumActivitySeconds(intervals, "AVAILABILITY"); + long unknownSeconds = sumActivitySeconds(intervals, "UNKNOWN"); + result.add(new OperatingPeriodDto( + closedPeriod.operatingPeriodNo(), + start, + end, + Duration.between(start, end).getSeconds(), + closedPeriod.closedBy(), + drivingSeconds, + workSeconds, + availabilitySeconds, + unknownSeconds, + intervals.size(), + !start.equals(closedPeriod.startedAt()) || !end.equals(closedPeriod.endedAt()) + )); + } + return result; + } + + private long sumActivitySeconds(List intervals, String activityType) { + return intervals.stream() + .filter(interval -> activityType.equals(interval.activityType())) + .mapToLong(OperatingPeriodActivityIntervalDto::durationSeconds) + .sum(); + } + + private List clipOperatingIntervals( + List intervals, + OffsetDateTime requestedFrom, + OffsetDateTime requestedTo + ) { + return intervals.stream() + .map(interval -> { + OffsetDateTime start = max(interval.startedAt(), requestedFrom); + OffsetDateTime end = min(interval.endedAt(), requestedTo); + if (!end.isAfter(start)) { + return null; + } + boolean clipped = interval.clippedToRequestedPeriod() + || !start.equals(interval.startedAt()) + || !end.equals(interval.endedAt()); + return interval.withTime(start, end, clipped); + }) + .filter(Objects::nonNull) + .toList(); + } + + private List clipNonDrivingIntervals( + List intervals, + OffsetDateTime requestedFrom, + OffsetDateTime requestedTo + ) { + return intervals.stream() + .map(interval -> { + OffsetDateTime start = max(interval.startedAt(), requestedFrom); + OffsetDateTime end = min(interval.endedAt(), requestedTo); + if (!end.isAfter(start)) { + return null; + } + return new NonDrivingIntervalDto( + interval.driverId(), + interval.vehicleId(), + interval.vehicleRegistrationId(), + interval.cardSlot(), + start, + end, + Duration.between(start, end).getSeconds(), + interval.operatingPeriodNo(), + interval.operatingPeriodStartedAt(), + interval.closedBy(), + interval.containsUnknown(), + interval.unknownSeconds(), + !start.equals(interval.startedAt()) || !end.equals(interval.endedAt()) + ); + }) + .filter(Objects::nonNull) + .toList(); + } + + private ActivityIntervalDto unknownGapTemplate( + ActivityIntervalDto previous, + ActivityIntervalDto next, + OffsetDateTime gapStart, + OffsetDateTime gapEnd + ) { + return new ActivityIntervalDto( + previous.driverEntityId(), + Objects.equals(previous.vehicleId(), next.vehicleId()) ? previous.vehicleId() : null, + Objects.equals(previous.vehicleRegistrationId(), next.vehicleRegistrationId()) ? previous.vehicleRegistrationId() : null, + "UNKNOWN", + Objects.equals(previous.cardSlot(), next.cardSlot()) ? previous.cardSlot() : null, + previous.cardStatus(), + "UNKNOWN", + "SYNTHETIC_GAP", + gapStart, + gapEnd, + Duration.between(gapStart, gapEnd).getSeconds(), + null, + List.of(), + false, + "UNKNOWN_GAP" + ); + } + + private List resolveVuFillGaps( + List driverCardRawIntervals, + List vehicleUnitRawIntervals + ) { + List driverCard = sortedPositiveIntervals(driverCardRawIntervals); + List vehicleUnit = sortedPositiveIntervals(vehicleUnitRawIntervals); + if (driverCard.isEmpty()) { + return vehicleUnit; + } + if (vehicleUnit.isEmpty()) { + return driverCard; + } + + List resolved = new ArrayList<>(driverCard); + for (ActivityIntervalDto vuInterval : vehicleUnit) { + resolved.addAll(subtractCoverage(vuInterval, driverCard)); + } + return resolved.stream() + .sorted(Comparator.comparing(ActivityIntervalDto::startedAt) + .thenComparing(ActivityIntervalDto::endedAt) + .thenComparing(ActivityIntervalDto::sourceKind, Comparator.nullsLast(String::compareTo))) + .toList(); + } + + private List subtractCoverage( + ActivityIntervalDto candidate, + List coverage + ) { + List result = new ArrayList<>(); + OffsetDateTime cursor = candidate.startedAt(); + for (ActivityIntervalDto covered : coverage) { + if (!covered.endedAt().isAfter(cursor)) { + continue; + } + if (!covered.startedAt().isBefore(candidate.endedAt())) { + break; + } + OffsetDateTime overlapStart = max(cursor, covered.startedAt()); + if (overlapStart.isAfter(cursor)) { + result.add(candidate.withTime(cursor, overlapStart, candidate.clippedToRequestedPeriod())); + } + if (covered.endedAt().isAfter(cursor)) { + cursor = max(cursor, covered.endedAt()); + } + if (!candidate.endedAt().isAfter(cursor)) { + break; + } + } + if (candidate.endedAt().isAfter(cursor)) { + result.add(candidate.withTime(cursor, candidate.endedAt(), candidate.clippedToRequestedPeriod())); + } + return result.stream() + .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) + .toList(); + } + + private List sortedPositiveIntervals(List intervals) { + if (intervals == null || intervals.isEmpty()) { + return List.of(); + } + return intervals.stream() + .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) + .sorted(Comparator.comparing(ActivityIntervalDto::startedAt) + .thenComparing(ActivityIntervalDto::endedAt) + .thenComparing(ActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo))) + .toList(); + } + + private int resolveOperatingSplitIdleHours(EsperOperatingPeriodRequest request) { + if (request.operatingSplitIdleHours() != null) { + return request.operatingSplitIdleHours(); + } + return properties == null ? 7 : properties.getEsperPoc().getOperatingPeriodEvaluation().getOperatingSplitIdleHours(); + } + + private int resolveSignificantDrivingMinutes(EsperOperatingPeriodRequest request) { + if (request.significantDrivingMinutes() != null) { + return request.significantDrivingMinutes(); + } + return properties == null ? 3 : properties.getEsperPoc().getOperatingPeriodEvaluation().getSignificantDrivingMinutes(); + } + + private int resolveMergeGapSeconds(EsperOperatingPeriodRequest request) { + if (request.mergeGapSeconds() != null) { + return request.mergeGapSeconds(); + } + return properties == null ? 0 : properties.getEsperPoc().getOperatingPeriodEvaluation().getMergeGapSeconds(); + } + + private int resolveGapDetectionToleranceSeconds(EsperOperatingPeriodRequest request) { + if (request.gapDetectionToleranceSeconds() != null) { + return request.gapDetectionToleranceSeconds(); + } + return properties == null ? 0 : properties.getEsperPoc().getOperatingPeriodEvaluation().getGapDetectionToleranceSeconds(); + } + + private EsperUnknownTreatmentMode resolveUnknownTreatmentMode(EsperOperatingPeriodRequest request) { + if (request.unknownTreatmentMode() != null) { + return request.unknownTreatmentMode(); + } + return properties == null + ? EsperUnknownTreatmentMode.AS_BREAK_REST + : properties.getEsperPoc().getOperatingPeriodEvaluation().getUnknownTreatmentMode(); + } + + private List notes( + EsperUnknownTreatmentMode unknownTreatmentMode, + int operatingSplitIdleHours, + int significantDrivingMinutes, + int gapDetectionToleranceSeconds + ) { + return List.of( + "This endpoint runs in parallel to the existing working-shift PoC and does not change its semantics.", + "BREAK_REST events are ignored for activity evaluation but still prevent synthetic UNKNOWN intervals from being created over covered rest spans.", + "Synthetic UNKNOWN intervals are created only for uncovered gaps between non-rest activities.", + "UNKNOWN treatment mode is " + unknownTreatmentMode + ".", + "Operating periods split after " + operatingSplitIdleHours + " hours of no non-rest activity; significant driving closes non-driving intervals from " + significantDrivingMinutes + " minutes onward.", + "Synthetic UNKNOWN gaps are only emitted when uncovered time exceeds " + gapDetectionToleranceSeconds + " seconds." + ); + } + + private OffsetDateTime utc(OffsetDateTime value) { + return value == null ? null : value.withOffsetSameInstant(java.time.ZoneOffset.UTC); + } + + private OffsetDateTime max(OffsetDateTime left, OffsetDateTime right) { + return left.isAfter(right) ? left : right; + } + + private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) { + return left.isBefore(right) ? left : right; + } + + private long elapsedMillis(long startedNanos) { + return Math.round((System.nanoTime() - startedNanos) / 1_000_000.0d); + } + + private static final class NonDrivingAccumulator { + private final UUID driverId; + private UUID vehicleId; + private UUID vehicleRegistrationId; + private final String cardSlot; + private final OffsetDateTime startedAt; + private OffsetDateTime endedAt; + private final long operatingPeriodNo; + private final OffsetDateTime operatingPeriodStartedAt; + private boolean containsUnknown; + private long unknownSeconds; + + private NonDrivingAccumulator(OperatingPeriodActivityIntervalDto interval) { + this.driverId = interval.driverId(); + this.vehicleId = interval.vehicleId(); + this.vehicleRegistrationId = interval.vehicleRegistrationId(); + this.cardSlot = interval.cardSlot(); + this.startedAt = interval.startedAt(); + this.endedAt = interval.endedAt(); + this.operatingPeriodNo = interval.operatingPeriodNo(); + this.operatingPeriodStartedAt = interval.operatingPeriodStartedAt(); + this.containsUnknown = "UNKNOWN".equals(interval.activityType()); + this.unknownSeconds = "UNKNOWN".equals(interval.activityType()) ? interval.durationSeconds() : 0L; + } + + private static NonDrivingAccumulator open(OperatingPeriodActivityIntervalDto interval) { + return new NonDrivingAccumulator(interval); + } + + private NonDrivingAccumulator extend(OperatingPeriodActivityIntervalDto interval) { + if (interval.vehicleId() != null) { + this.vehicleId = interval.vehicleId(); + } + if (interval.vehicleRegistrationId() != null) { + this.vehicleRegistrationId = interval.vehicleRegistrationId(); + } + if (interval.endedAt().isAfter(this.endedAt)) { + this.endedAt = interval.endedAt(); + } + if ("UNKNOWN".equals(interval.activityType())) { + this.containsUnknown = true; + this.unknownSeconds += interval.durationSeconds(); + } + return this; + } + + private NonDrivingIntervalDto close(String closedBy) { + return new NonDrivingIntervalDto( + driverId, + vehicleId, + vehicleRegistrationId, + cardSlot, + startedAt, + endedAt, + Duration.between(startedAt, endedAt).getSeconds(), + operatingPeriodNo, + operatingPeriodStartedAt, + closedBy, + containsUnknown, + unknownSeconds, + false + ); + } + } +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodIntervalInputEvent.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodIntervalInputEvent.java new file mode 100644 index 0000000..ef9e9a6 --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodIntervalInputEvent.java @@ -0,0 +1,23 @@ +package at.procon.eventhub.esperpoc.service; + +import java.util.List; +import java.util.UUID; + +public record EsperOperatingPeriodIntervalInputEvent( + UUID driverId, + UUID vehicleId, + UUID vehicleRegistrationId, + String activityType, + String cardSlot, + String cardStatus, + String drivingStatus, + String sourceKind, + long startTs, + long endTs, + long durationMs, + String sourceRowId, + List sourceRowIds, + boolean clippedToRequestedPeriod, + boolean synthetic +) { +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3c53752..5405526 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -123,6 +123,12 @@ eventhub: esper-poc: activity-merge-mode: JAVA shift-resolution-mode: JAVA + operating-period-evaluation: + operating-split-idle-hours: 7 + significant-driving-minutes: 3 + merge-gap-seconds: 0 + gap-detection-tolerance-seconds: 0 + unknown-treatment-mode: AS_BREAK_REST yellow-fox: default-chunk-days: 1 diff --git a/src/test/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationServiceTest.java b/src/test/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationServiceTest.java new file mode 100644 index 0000000..fe5c442 --- /dev/null +++ b/src/test/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationServiceTest.java @@ -0,0 +1,154 @@ +package at.procon.eventhub.esperpoc.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto; +import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode; +import at.procon.eventhub.esperpoc.dto.NonDrivingIntervalDto; +import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class EsperOperatingPeriodEvaluationServiceTest { + + private final EsperOperatingPeriodEngine operatingPeriodEngine = new EsperOperatingPeriodEngine(); + private final EsperOperatingPeriodEvaluationService service = new EsperOperatingPeriodEvaluationService( + null, + new EsperDriverActivityEngine(), + operatingPeriodEngine + ); + + @Test + void synthesizeUnknownGapsSkipsCoveredBreakRestAndCreatesOnlyUncoveredUnknown() { + UUID driverId = UUID.randomUUID(); + List intervals = List.of( + activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1", "DRIVER_CARD"), + activity(driverId, "BREAK_REST", "2026-04-01T09:00:00Z", "2026-04-01T10:00:00Z", "r1", "DRIVER_CARD"), + activity(driverId, "AVAILABILITY", "2026-04-01T10:00:00Z", "2026-04-01T11:00:00Z", "a1", "DRIVER_CARD"), + activity(driverId, "WORK", "2026-04-01T12:00:00Z", "2026-04-01T13:00:00Z", "w2", "DRIVER_CARD") + ); + + List evaluationIntervals = service.synthesizeUnknownGaps(intervals, Duration.ZERO); + + assertThat(evaluationIntervals).extracting(ActivityIntervalDto::activityType) + .containsExactly("WORK", "AVAILABILITY", "UNKNOWN", "WORK"); + assertThat(evaluationIntervals.get(2).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T11:00:00Z")); + assertThat(evaluationIntervals.get(2).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T12:00:00Z")); + } + + @Test + void periodizationSplitsAfterLongUnknownGapAndShortDrivingDoesNotCloseNonDriving() { + UUID driverId = UUID.randomUUID(); + List evaluationIntervals = List.of( + activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1", "DRIVER_CARD"), + activity(driverId, "AVAILABILITY", "2026-04-01T10:00:00Z", "2026-04-01T11:00:00Z", "a1", "DRIVER_CARD"), + activity(driverId, "DRIVE", "2026-04-01T11:00:00Z", "2026-04-01T11:02:00Z", "d1", "DRIVER_CARD"), + activity(driverId, "WORK", "2026-04-01T11:02:00Z", "2026-04-01T12:00:00Z", "w2", "DRIVER_CARD"), + unknown(driverId, "2026-04-01T12:00:00Z", "2026-04-01T20:00:00Z"), + activity(driverId, "DRIVE", "2026-04-01T20:00:00Z", "2026-04-01T20:30:00Z", "d2", "DRIVER_CARD") + ); + + EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation evaluation = operatingPeriodEngine.evaluate( + evaluationIntervals, + Duration.ofHours(7) + ); + + assertThat(evaluation.periodizedIntervals()).extracting(OperatingPeriodActivityIntervalDto::activityType) + .containsExactly("WORK", "AVAILABILITY", "DRIVE", "WORK", "DRIVE"); + assertThat(evaluation.closedPeriods()).hasSize(2); + assertThat(evaluation.closedPeriods().get(0).closedBy()).isEqualTo("UNKNOWN_GAP"); + assertThat(evaluation.closedPeriods().get(1).closedBy()).isEqualTo("FLUSH"); + + List mergedIntervals = service.mergeConsecutiveActivities( + evaluation.periodizedIntervals(), + Duration.ZERO + ); + List nonDrivingIntervals = service.buildNonDrivingIntervals( + mergedIntervals, + Duration.ofMinutes(3), + EsperUnknownTreatmentMode.AS_BREAK_REST + ); + + assertThat(nonDrivingIntervals).hasSize(1); + assertThat(nonDrivingIntervals.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T08:00:00Z")); + assertThat(nonDrivingIntervals.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T12:00:00Z")); + assertThat(nonDrivingIntervals.get(0).closedBy()).isEqualTo("NEW_OPERATING_PERIOD"); + } + + @Test + void unknownCanCloseNonDrivingWhenConfiguredAsNonBreak() { + UUID driverId = UUID.randomUUID(); + List mergedIntervals = List.of( + periodized(activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1", "DRIVER_CARD"), 1), + periodized(unknown(driverId, "2026-04-01T09:00:00Z", "2026-04-01T09:30:00Z"), 1), + periodized(activity(driverId, "AVAILABILITY", "2026-04-01T09:30:00Z", "2026-04-01T10:00:00Z", "a1", "DRIVER_CARD"), 1) + ); + + List nonDrivingIntervals = service.buildNonDrivingIntervals( + mergedIntervals, + Duration.ofMinutes(3), + EsperUnknownTreatmentMode.AS_UNKNOWN_NON_BREAK + ); + + assertThat(nonDrivingIntervals).hasSize(2); + assertThat(nonDrivingIntervals.get(0).closedBy()).isEqualTo("UNKNOWN_GAP"); + assertThat(nonDrivingIntervals.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T09:00:00Z")); + assertThat(nonDrivingIntervals.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T09:30:00Z")); + } + + private ActivityIntervalDto activity( + UUID driverId, + String activity, + String from, + String to, + String sourceRowId, + String sourceKind + ) { + return ActivityIntervalDto.raw( + driverId, + null, + null, + activity, + "DRIVER", + "INSERTED", + "KNOWN", + sourceKind, + OffsetDateTime.parse(from), + OffsetDateTime.parse(to), + sourceRowId + ); + } + + private ActivityIntervalDto unknown(UUID driverId, String from, String to) { + return new ActivityIntervalDto( + driverId, + null, + null, + "UNKNOWN", + "DRIVER", + null, + "UNKNOWN", + "SYNTHETIC_GAP", + OffsetDateTime.parse(from), + OffsetDateTime.parse(to), + Duration.between(OffsetDateTime.parse(from), OffsetDateTime.parse(to)).getSeconds(), + null, + List.of(), + false, + "UNKNOWN_GAP" + ); + } + + private OperatingPeriodActivityIntervalDto periodized(ActivityIntervalDto interval, long operatingPeriodNo) { + return OperatingPeriodActivityIntervalDto.periodized( + interval, + operatingPeriodNo, + interval.startedAt(), + false, + 0L + ); + } +}