Add Esper operating-period evaluation PoC

This commit is contained in:
trifonovt 2026-05-06 10:24:53 +02:00
parent 818009555a
commit 94767bc161
15 changed files with 1530 additions and 1 deletions

View File

@ -57,6 +57,68 @@
]
}
}
},
{
"name": "Evaluate tachograph operating periods",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{baseUrl}}/api/eventhub/esper-poc/tachograph/operating-period-evaluation?tenantKey={{tenantKey}}&driverId={{driverId}}&occurredFrom={{occurredFrom}}&occurredTo={{occurredTo}}&guardHours=24&operatingSplitIdleHours=7&significantDrivingMinutes=3&mergeGapSeconds=0&gapDetectionToleranceSeconds=0&unknownTreatmentMode=AS_BREAK_REST",
"host": [
"{{baseUrl}}"
],
"path": [
"api",
"eventhub",
"esper-poc",
"tachograph",
"operating-period-evaluation"
],
"query": [
{
"key": "tenantKey",
"value": "{{tenantKey}}"
},
{
"key": "driverId",
"value": "{{driverId}}"
},
{
"key": "occurredFrom",
"value": "{{occurredFrom}}"
},
{
"key": "occurredTo",
"value": "{{occurredTo}}"
},
{
"key": "guardHours",
"value": "24"
},
{
"key": "operatingSplitIdleHours",
"value": "7"
},
{
"key": "significantDrivingMinutes",
"value": "3"
},
{
"key": "mergeGapSeconds",
"value": "0"
},
{
"key": "gapDetectionToleranceSeconds",
"value": "0"
},
{
"key": "unknownTreatmentMode",
"value": "AS_BREAK_REST"
}
]
}
}
}
],
"variable": [
@ -72,6 +134,10 @@
"key": "driverEntityId",
"value": "00000000-0000-0000-0000-000000000000"
},
{
"key": "driverId",
"value": "00000000-0000-0000-0000-000000000000"
},
{
"key": "occurredFrom",
"value": "2026-04-01T00:00:00Z"

View File

@ -1,6 +1,7 @@
package at.procon.eventhub.config;
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode;
import java.time.Duration;
import java.time.OffsetDateTime;
@ -50,6 +51,7 @@ public class EventHubProperties {
public static class EsperPoc {
private EsperActivityMergeMode activityMergeMode = EsperActivityMergeMode.JAVA;
private EsperShiftResolutionMode shiftResolutionMode = EsperShiftResolutionMode.JAVA;
private final OperatingPeriodEvaluation operatingPeriodEvaluation = new OperatingPeriodEvaluation();
public EsperActivityMergeMode getActivityMergeMode() {
return activityMergeMode;
@ -66,6 +68,60 @@ public class EventHubProperties {
public void setShiftResolutionMode(EsperShiftResolutionMode shiftResolutionMode) {
this.shiftResolutionMode = shiftResolutionMode == null ? EsperShiftResolutionMode.JAVA : shiftResolutionMode;
}
public OperatingPeriodEvaluation getOperatingPeriodEvaluation() {
return operatingPeriodEvaluation;
}
}
public static class OperatingPeriodEvaluation {
private int operatingSplitIdleHours = 7;
private int significantDrivingMinutes = 3;
private int mergeGapSeconds = 0;
private int gapDetectionToleranceSeconds = 0;
private EsperUnknownTreatmentMode unknownTreatmentMode = EsperUnknownTreatmentMode.AS_BREAK_REST;
public int getOperatingSplitIdleHours() {
return operatingSplitIdleHours;
}
public void setOperatingSplitIdleHours(int operatingSplitIdleHours) {
this.operatingSplitIdleHours = Math.max(1, operatingSplitIdleHours);
}
public int getSignificantDrivingMinutes() {
return significantDrivingMinutes;
}
public void setSignificantDrivingMinutes(int significantDrivingMinutes) {
this.significantDrivingMinutes = Math.max(1, significantDrivingMinutes);
}
public int getMergeGapSeconds() {
return mergeGapSeconds;
}
public void setMergeGapSeconds(int mergeGapSeconds) {
this.mergeGapSeconds = Math.max(0, mergeGapSeconds);
}
public int getGapDetectionToleranceSeconds() {
return gapDetectionToleranceSeconds;
}
public void setGapDetectionToleranceSeconds(int gapDetectionToleranceSeconds) {
this.gapDetectionToleranceSeconds = Math.max(0, gapDetectionToleranceSeconds);
}
public EsperUnknownTreatmentMode getUnknownTreatmentMode() {
return unknownTreatmentMode;
}
public void setUnknownTreatmentMode(EsperUnknownTreatmentMode unknownTreatmentMode) {
this.unknownTreatmentMode = unknownTreatmentMode == null
? EsperUnknownTreatmentMode.AS_BREAK_REST
: unknownTreatmentMode;
}
}
public static class Batch {

View File

@ -1,9 +1,13 @@
package at.procon.eventhub.esperpoc.api;
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto;
import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
import at.procon.eventhub.esperpoc.dto.EsperPocResultDto;
import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode;
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
import at.procon.eventhub.esperpoc.service.EsperOperatingPeriodEvaluationService;
import at.procon.eventhub.esperpoc.service.EsperPocDriverCardActivityService;
import java.time.OffsetDateTime;
import java.util.UUID;
@ -19,9 +23,14 @@ import org.springframework.web.bind.annotation.RestController;
public class EsperPocController {
private final EsperPocDriverCardActivityService service;
private final EsperOperatingPeriodEvaluationService operatingPeriodEvaluationService;
public EsperPocController(EsperPocDriverCardActivityService service) {
public EsperPocController(
EsperPocDriverCardActivityService service,
EsperOperatingPeriodEvaluationService operatingPeriodEvaluationService
) {
this.service = service;
this.operatingPeriodEvaluationService = operatingPeriodEvaluationService;
}
@GetMapping("/tachograph/driver-card-activities")
@ -55,4 +64,32 @@ public class EsperPocController {
);
return ResponseEntity.ok(service.evaluate(request));
}
@GetMapping("/tachograph/operating-period-evaluation")
public ResponseEntity<EsperOperatingPeriodResultDto> evaluateOperatingPeriods(
@RequestParam String tenantKey,
@RequestParam UUID driverId,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime occurredFrom,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime occurredTo,
@RequestParam(defaultValue = "24") Integer guardHours,
@RequestParam(required = false) Integer operatingSplitIdleHours,
@RequestParam(required = false) Integer significantDrivingMinutes,
@RequestParam(required = false) Integer mergeGapSeconds,
@RequestParam(required = false) Integer gapDetectionToleranceSeconds,
@RequestParam(required = false) EsperUnknownTreatmentMode unknownTreatmentMode
) {
EsperOperatingPeriodRequest request = new EsperOperatingPeriodRequest(
tenantKey,
driverId,
occurredFrom,
occurredTo,
guardHours,
operatingSplitIdleHours,
significantDrivingMinutes,
mergeGapSeconds,
gapDetectionToleranceSeconds,
unknownTreatmentMode
);
return ResponseEntity.ok(operatingPeriodEvaluationService.evaluate(request));
}
}

View File

@ -0,0 +1,30 @@
package at.procon.eventhub.esperpoc.dto;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.time.OffsetDateTime;
import java.util.UUID;
public record EsperOperatingPeriodRequest(
@NotBlank String tenantKey,
@NotNull UUID driverId,
@NotNull OffsetDateTime occurredFrom,
@NotNull OffsetDateTime occurredTo,
Integer guardHours,
Integer operatingSplitIdleHours,
Integer significantDrivingMinutes,
Integer mergeGapSeconds,
Integer gapDetectionToleranceSeconds,
EsperUnknownTreatmentMode unknownTreatmentMode
) {
public EsperOperatingPeriodRequest {
if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) {
throw new IllegalArgumentException("occurredFrom must be before occurredTo");
}
guardHours = guardHours == null ? 24 : Math.max(0, guardHours);
operatingSplitIdleHours = operatingSplitIdleHours == null ? null : Math.max(1, operatingSplitIdleHours);
significantDrivingMinutes = significantDrivingMinutes == null ? null : Math.max(1, significantDrivingMinutes);
mergeGapSeconds = mergeGapSeconds == null ? null : Math.max(0, mergeGapSeconds);
gapDetectionToleranceSeconds = gapDetectionToleranceSeconds == null ? null : Math.max(0, gapDetectionToleranceSeconds);
}
}

View File

@ -0,0 +1,39 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
public record EsperOperatingPeriodResultDto(
String tenantKey,
UUID driverId,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo,
OffsetDateTime loadedFrom,
OffsetDateTime loadedTo,
int rawEventCount,
int driverCardRawEventCount,
int vehicleUnitRawEventCount,
int driverCardIntervalCount,
int vehicleUnitIntervalCount,
int resolvedKnownIntervalCount,
int evaluationIntervalCount,
int periodizedIntervalCount,
int mergedIntervalCount,
int nonDrivingIntervalCount,
int operatingPeriodCount,
int operatingSplitIdleHours,
int significantDrivingMinutes,
int mergeGapSeconds,
int gapDetectionToleranceSeconds,
EsperUnknownTreatmentMode unknownTreatmentMode,
List<RawActivityEventDto> rawEvents,
List<ActivityIntervalDto> resolvedKnownIntervals,
List<ActivityIntervalDto> evaluationIntervals,
List<OperatingPeriodActivityIntervalDto> periodizedIntervals,
List<OperatingPeriodActivityIntervalDto> mergedIntervals,
List<NonDrivingIntervalDto> nonDrivingIntervals,
List<OperatingPeriodDto> operatingPeriods,
List<String> notes
) {
}

View File

@ -0,0 +1,6 @@
package at.procon.eventhub.esperpoc.dto;
public enum EsperUnknownTreatmentMode {
AS_BREAK_REST,
AS_UNKNOWN_NON_BREAK
}

View File

@ -0,0 +1,21 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.UUID;
public record NonDrivingIntervalDto(
UUID driverId,
UUID vehicleId,
UUID vehicleRegistrationId,
String cardSlot,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
long operatingPeriodNo,
OffsetDateTime operatingPeriodStartedAt,
String closedBy,
boolean containsUnknown,
long unknownSeconds,
boolean clippedToRequestedPeriod
) {
}

View File

@ -0,0 +1,110 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
public record OperatingPeriodActivityIntervalDto(
UUID driverId,
UUID vehicleId,
UUID vehicleRegistrationId,
String activityType,
String cardSlot,
String cardStatus,
String drivingStatus,
String sourceKind,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
String sourceRowId,
List<String> sourceRowIds,
boolean clippedToRequestedPeriod,
String level,
long operatingPeriodNo,
OffsetDateTime operatingPeriodStartedAt,
boolean newOperatingPeriod,
Long gapSincePreviousActivitySeconds,
boolean synthetic
) {
public static OperatingPeriodActivityIntervalDto periodized(
ActivityIntervalDto interval,
long operatingPeriodNo,
OffsetDateTime operatingPeriodStartedAt,
boolean newOperatingPeriod,
Long gapSincePreviousActivitySeconds
) {
return new OperatingPeriodActivityIntervalDto(
interval.driverEntityId(),
interval.vehicleId(),
interval.vehicleRegistrationId(),
interval.activityType(),
interval.cardSlot(),
interval.cardStatus(),
interval.drivingStatus(),
interval.sourceKind(),
interval.startedAt(),
interval.endedAt(),
interval.durationSeconds(),
interval.sourceRowId(),
interval.sourceRowIds(),
interval.clippedToRequestedPeriod(),
"PERIODIZED_ACTIVITY",
operatingPeriodNo,
operatingPeriodStartedAt,
newOperatingPeriod,
gapSincePreviousActivitySeconds,
"UNKNOWN_GAP".equals(interval.level())
);
}
public OperatingPeriodActivityIntervalDto withTime(OffsetDateTime newStartedAt, OffsetDateTime newEndedAt, boolean clipped) {
return new OperatingPeriodActivityIntervalDto(
driverId,
vehicleId,
vehicleRegistrationId,
activityType,
cardSlot,
cardStatus,
drivingStatus,
sourceKind,
newStartedAt,
newEndedAt,
Duration.between(newStartedAt, newEndedAt).getSeconds(),
sourceRowId,
sourceRowIds,
clipped,
level,
operatingPeriodNo,
operatingPeriodStartedAt,
newOperatingPeriod,
gapSincePreviousActivitySeconds,
synthetic
);
}
public OperatingPeriodActivityIntervalDto asMerged(OffsetDateTime newEndedAt, List<String> mergedSourceRowIds) {
return new OperatingPeriodActivityIntervalDto(
driverId,
vehicleId,
vehicleRegistrationId,
activityType,
cardSlot,
cardStatus,
drivingStatus,
sourceKind,
startedAt,
newEndedAt,
Duration.between(startedAt, newEndedAt).getSeconds(),
sourceRowId,
mergedSourceRowIds,
clippedToRequestedPeriod,
"MERGED_ACTIVITY",
operatingPeriodNo,
operatingPeriodStartedAt,
newOperatingPeriod,
gapSincePreviousActivitySeconds,
synthetic
);
}
}

View File

@ -0,0 +1,18 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
public record OperatingPeriodDto(
long operatingPeriodNo,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
String closedBy,
long drivingSeconds,
long workSeconds,
long availabilitySeconds,
long unknownSeconds,
int intervalCount,
boolean clippedToRequestedPeriod
) {
}

View File

@ -0,0 +1,12 @@
package at.procon.eventhub.esperpoc.service;
import java.time.OffsetDateTime;
record EsperClosedOperatingPeriod(
long operatingPeriodNo,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
String closedBy
) {
}

View File

@ -0,0 +1,257 @@
package at.procon.eventhub.esperpoc.service;
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto;
import com.espertech.esper.common.client.EPCompiled;
import com.espertech.esper.common.client.EventBean;
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompileException;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.EPDeployException;
import com.espertech.esper.runtime.client.EPDeployment;
import com.espertech.esper.runtime.client.EPRuntime;
import com.espertech.esper.runtime.client.EPRuntimeProvider;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.stereotype.Component;
@Component
public class EsperOperatingPeriodEngine {
private static final AtomicLong RUNTIME_COUNTER = new AtomicLong();
private static final String INPUT_STREAM_EPL = """
@name('operatingPeriodIntervalStream')
select * from OperatingPeriodIntervalInputEvent
""";
public EsperOperatingPeriodEvaluation evaluate(
List<ActivityIntervalDto> intervals,
Duration operatingSplitIdleThreshold
) {
List<ActivityIntervalDto> sorted = sortedPositiveIntervals(intervals);
if (sorted.isEmpty()) {
return new EsperOperatingPeriodEvaluation(List.of(), List.of());
}
PeriodizationCollector collector = new PeriodizationCollector(operatingSplitIdleThreshold);
executeWithRuntime(
configuration -> configuration.getCommon().addEventType("OperatingPeriodIntervalInputEvent", EsperOperatingPeriodIntervalInputEvent.class),
INPUT_STREAM_EPL,
"operatingPeriodIntervalStream",
newData -> collectInputIntervals(newData, collector),
runtime -> {
for (ActivityIntervalDto interval : sorted) {
runtime.getEventService().sendEventBean(toInputEvent(interval), "OperatingPeriodIntervalInputEvent");
}
}
);
return collector.finish();
}
private void executeWithRuntime(
java.util.function.Consumer<Configuration> configurationSetup,
String epl,
String statementName,
java.util.function.Consumer<EventBean[]> listener,
java.util.function.Consumer<EPRuntime> sender
) {
EPRuntime runtime = null;
try {
Configuration configuration = new Configuration();
configurationSetup.accept(configuration);
String runtimeUri = "eventhub-esper-operating-period-" + RUNTIME_COUNTER.incrementAndGet();
runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration);
CompilerArguments arguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments);
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
runtime.getDeploymentService()
.getStatement(deployment.getDeploymentId(), statementName)
.addListener((newData, oldData, statement, rt) -> listener.accept(newData));
sender.accept(runtime);
} catch (EPCompileException | EPDeployException e) {
throw new IllegalStateException("Cannot compile/deploy Esper operating-period EPL", e);
} finally {
if (runtime != null) {
runtime.destroy();
}
}
}
private void collectInputIntervals(EventBean[] newData, PeriodizationCollector collector) {
if (newData == null) {
return;
}
for (EventBean event : newData) {
collector.accept((EsperOperatingPeriodIntervalInputEvent) event.getUnderlying());
}
}
private EsperOperatingPeriodIntervalInputEvent toInputEvent(ActivityIntervalDto interval) {
return new EsperOperatingPeriodIntervalInputEvent(
interval.driverEntityId(),
interval.vehicleId(),
interval.vehicleRegistrationId(),
interval.activityType(),
interval.cardSlot(),
interval.cardStatus(),
interval.drivingStatus(),
interval.sourceKind(),
interval.startedAt().toInstant().toEpochMilli(),
interval.endedAt().toInstant().toEpochMilli(),
interval.durationSeconds() * 1000L,
interval.sourceRowId(),
interval.sourceRowIds(),
interval.clippedToRequestedPeriod(),
"UNKNOWN_GAP".equals(interval.level())
);
}
private List<ActivityIntervalDto> sortedPositiveIntervals(List<ActivityIntervalDto> intervals) {
if (intervals == null || intervals.isEmpty()) {
return List.of();
}
return intervals.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt)
.thenComparing(ActivityIntervalDto::endedAt)
.thenComparing(ActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo)))
.toList();
}
public record EsperOperatingPeriodEvaluation(
List<OperatingPeriodActivityIntervalDto> periodizedIntervals,
List<EsperClosedOperatingPeriod> closedPeriods
) {
}
private static final class PeriodizationCollector {
private final Duration operatingSplitIdleThreshold;
private final List<OperatingPeriodActivityIntervalDto> periodizedIntervals = new ArrayList<>();
private final List<EsperClosedOperatingPeriod> closedPeriods = new ArrayList<>();
private boolean hasOpenPeriod;
private long operatingPeriodNo;
private OffsetDateTime operatingPeriodStartedAt;
private OffsetDateTime lastKnownActivityEndAt;
private PeriodizationCollector(Duration operatingSplitIdleThreshold) {
this.operatingSplitIdleThreshold = operatingSplitIdleThreshold;
}
private void accept(EsperOperatingPeriodIntervalInputEvent interval) {
ActivityIntervalDto dto = new ActivityIntervalDto(
interval.driverId(),
interval.vehicleId(),
interval.vehicleRegistrationId(),
interval.activityType(),
interval.cardSlot(),
interval.cardStatus(),
interval.drivingStatus(),
interval.sourceKind(),
OffsetDateTime.ofInstant(java.time.Instant.ofEpochMilli(interval.startTs()), java.time.ZoneOffset.UTC),
OffsetDateTime.ofInstant(java.time.Instant.ofEpochMilli(interval.endTs()), java.time.ZoneOffset.UTC),
interval.durationMs() / 1000L,
interval.sourceRowId(),
interval.sourceRowIds(),
interval.clippedToRequestedPeriod(),
interval.synthetic() ? "UNKNOWN_GAP" : "RAW_INTERVAL"
);
if ("UNKNOWN".equals(dto.activityType())) {
if (!hasOpenPeriod) {
return;
}
if (dto.durationSeconds() >= operatingSplitIdleThreshold.getSeconds()) {
closeCurrent("UNKNOWN_GAP");
return;
}
periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized(
dto,
operatingPeriodNo,
operatingPeriodStartedAt,
false,
Math.max(0, Duration.between(lastKnownActivityEndAt, dto.startedAt()).getSeconds())
));
return;
}
if (!hasOpenPeriod) {
operatingPeriodNo = operatingPeriodNo < 1 ? 1 : operatingPeriodNo + 1;
hasOpenPeriod = true;
operatingPeriodStartedAt = dto.startedAt();
lastKnownActivityEndAt = dto.endedAt();
periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized(
dto,
operatingPeriodNo,
operatingPeriodStartedAt,
true,
null
));
return;
}
long gapSeconds = Math.max(0, Duration.between(lastKnownActivityEndAt, dto.startedAt()).getSeconds());
if (gapSeconds >= operatingSplitIdleThreshold.getSeconds()) {
closeCurrent("IDLE_GAP");
operatingPeriodNo++;
hasOpenPeriod = true;
operatingPeriodStartedAt = dto.startedAt();
lastKnownActivityEndAt = dto.endedAt();
periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized(
dto,
operatingPeriodNo,
operatingPeriodStartedAt,
true,
gapSeconds
));
return;
}
periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized(
dto,
operatingPeriodNo,
operatingPeriodStartedAt,
false,
gapSeconds
));
if (dto.endedAt().isAfter(lastKnownActivityEndAt)) {
lastKnownActivityEndAt = dto.endedAt();
}
}
private EsperOperatingPeriodEvaluation finish() {
if (hasOpenPeriod) {
closeCurrent("FLUSH");
}
return new EsperOperatingPeriodEvaluation(
periodizedIntervals.stream()
.sorted(Comparator.comparing(OperatingPeriodActivityIntervalDto::startedAt)
.thenComparing(OperatingPeriodActivityIntervalDto::endedAt)
.thenComparing(OperatingPeriodActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo)))
.toList(),
closedPeriods.stream()
.sorted(Comparator.comparing(EsperClosedOperatingPeriod::startedAt))
.toList()
);
}
private void closeCurrent(String closedBy) {
if (!hasOpenPeriod || operatingPeriodStartedAt == null || lastKnownActivityEndAt == null) {
hasOpenPeriod = false;
return;
}
closedPeriods.add(new EsperClosedOperatingPeriod(
operatingPeriodNo,
operatingPeriodStartedAt,
lastKnownActivityEndAt,
Duration.between(operatingPeriodStartedAt, lastKnownActivityEndAt).getSeconds(),
closedBy
));
hasOpenPeriod = false;
}
}
}

View File

@ -0,0 +1,694 @@
package at.procon.eventhub.esperpoc.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto;
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
import at.procon.eventhub.esperpoc.dto.NonDrivingIntervalDto;
import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto;
import at.procon.eventhub.esperpoc.dto.OperatingPeriodDto;
import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
import at.procon.eventhub.esperpoc.persistence.EsperPocActivityRepository;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class EsperOperatingPeriodEvaluationService {
private static final Logger log = LoggerFactory.getLogger(EsperOperatingPeriodEvaluationService.class);
private final EsperPocActivityRepository activityRepository;
private final EsperDriverActivityEngine activityEngine;
private final EsperOperatingPeriodEngine operatingPeriodEngine;
private final EventHubProperties properties;
public EsperOperatingPeriodEvaluationService(
EsperPocActivityRepository activityRepository,
EsperDriverActivityEngine activityEngine,
EsperOperatingPeriodEngine operatingPeriodEngine
) {
this(activityRepository, activityEngine, operatingPeriodEngine, null);
}
@Autowired
public EsperOperatingPeriodEvaluationService(
EsperPocActivityRepository activityRepository,
EsperDriverActivityEngine activityEngine,
EsperOperatingPeriodEngine operatingPeriodEngine,
EventHubProperties properties
) {
this.activityRepository = activityRepository;
this.activityEngine = activityEngine;
this.operatingPeriodEngine = operatingPeriodEngine;
this.properties = properties;
}
public EsperOperatingPeriodResultDto evaluate(EsperOperatingPeriodRequest request) {
long startedNanos = System.nanoTime();
OffsetDateTime requestedFrom = utc(request.occurredFrom());
OffsetDateTime requestedTo = utc(request.occurredTo());
OffsetDateTime loadedFrom = requestedFrom.minusHours(request.guardHours());
OffsetDateTime loadedTo = requestedTo.plusHours(request.guardHours());
Duration splitIdleThreshold = Duration.ofHours(resolveOperatingSplitIdleHours(request));
Duration significantDrivingThreshold = Duration.ofMinutes(resolveSignificantDrivingMinutes(request));
Duration mergeGapTolerance = Duration.ofSeconds(resolveMergeGapSeconds(request));
Duration gapDetectionTolerance = Duration.ofSeconds(resolveGapDetectionToleranceSeconds(request));
EsperUnknownTreatmentMode unknownTreatmentMode = resolveUnknownTreatmentMode(request);
long dbStartedNanos = System.nanoTime();
List<RawActivityEventDto> rawEvents = activityRepository.findDriverActivityEvents(
request.tenantKey(),
request.driverId(),
loadedFrom,
loadedTo
);
long dbElapsedMs = elapsedMillis(dbStartedNanos);
List<RawActivityEventDto> driverCardRawEvents = rawEvents.stream()
.filter(event -> "DRIVER_CARD".equals(event.sourceKind()))
.toList();
List<RawActivityEventDto> vehicleUnitRawEvents = rawEvents.stream()
.filter(event -> "VEHICLE_UNIT".equals(event.sourceKind()))
.toList();
long cardIntervalsStartedNanos = System.nanoTime();
List<ActivityIntervalDto> driverCardRawIntervals = activityEngine.buildIntervals(driverCardRawEvents);
long cardIntervalsElapsedMs = elapsedMillis(cardIntervalsStartedNanos);
long vuIntervalsStartedNanos = System.nanoTime();
List<ActivityIntervalDto> vehicleUnitRawIntervals = activityEngine.buildIntervals(vehicleUnitRawEvents);
long vuIntervalsElapsedMs = elapsedMillis(vuIntervalsStartedNanos);
long vuGapFillStartedNanos = System.nanoTime();
List<ActivityIntervalDto> resolvedKnownLoadedIntervals = resolveVuFillGaps(driverCardRawIntervals, vehicleUnitRawIntervals);
long vuGapFillElapsedMs = elapsedMillis(vuGapFillStartedNanos);
long unknownGapStartedNanos = System.nanoTime();
List<ActivityIntervalDto> evaluationLoadedIntervals = synthesizeUnknownGaps(
resolvedKnownLoadedIntervals,
gapDetectionTolerance
);
long unknownGapElapsedMs = elapsedMillis(unknownGapStartedNanos);
long periodizeStartedNanos = System.nanoTime();
EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation evaluation = operatingPeriodEngine.evaluate(
evaluationLoadedIntervals,
splitIdleThreshold
);
long periodizeElapsedMs = elapsedMillis(periodizeStartedNanos);
long mergeStartedNanos = System.nanoTime();
List<OperatingPeriodActivityIntervalDto> mergedLoadedIntervals = mergeConsecutiveActivities(
evaluation.periodizedIntervals(),
mergeGapTolerance
);
long mergeElapsedMs = elapsedMillis(mergeStartedNanos);
long nonDrivingStartedNanos = System.nanoTime();
List<NonDrivingIntervalDto> nonDrivingLoadedIntervals = buildNonDrivingIntervals(
mergedLoadedIntervals,
significantDrivingThreshold,
unknownTreatmentMode
);
long nonDrivingElapsedMs = elapsedMillis(nonDrivingStartedNanos);
List<OperatingPeriodActivityIntervalDto> periodizedIntervals = clipOperatingIntervals(
evaluation.periodizedIntervals(),
requestedFrom,
requestedTo
);
List<OperatingPeriodActivityIntervalDto> mergedIntervals = clipOperatingIntervals(
mergedLoadedIntervals,
requestedFrom,
requestedTo
);
List<NonDrivingIntervalDto> nonDrivingIntervals = clipNonDrivingIntervals(
nonDrivingLoadedIntervals,
requestedFrom,
requestedTo
);
List<OperatingPeriodDto> operatingPeriods = buildOperatingPeriods(
evaluation.closedPeriods(),
periodizedIntervals,
requestedFrom,
requestedTo
);
long totalElapsedMs = elapsedMillis(startedNanos);
log.info("Esper operating-period evaluation tenant={} driverId={} requestedFrom={} requestedTo={} loadedFrom={} loadedTo={} unknownMode={} rawEvents={} cardRawEvents={} vuRawEvents={} cardIntervals={} vuIntervals={} resolvedKnownIntervals={} evaluationIntervals={} periodizedIntervals={} mergedIntervals={} nonDrivingIntervals={} operatingPeriods={} timingsMs={{dbRetrieve={}, cardIntervalEsper={}, vuIntervalEsper={}, vuGapFill={}, synthUnknown={}, periodizeEsper={}, merge={}, nonDriving={}, total={}}}",
request.tenantKey(),
request.driverId(),
requestedFrom,
requestedTo,
loadedFrom,
loadedTo,
unknownTreatmentMode,
rawEvents.size(),
driverCardRawEvents.size(),
vehicleUnitRawEvents.size(),
driverCardRawIntervals.size(),
vehicleUnitRawIntervals.size(),
resolvedKnownLoadedIntervals.size(),
evaluationLoadedIntervals.size(),
periodizedIntervals.size(),
mergedIntervals.size(),
nonDrivingIntervals.size(),
operatingPeriods.size(),
dbElapsedMs,
cardIntervalsElapsedMs,
vuIntervalsElapsedMs,
vuGapFillElapsedMs,
unknownGapElapsedMs,
periodizeElapsedMs,
mergeElapsedMs,
nonDrivingElapsedMs,
totalElapsedMs);
return new EsperOperatingPeriodResultDto(
request.tenantKey(),
request.driverId(),
requestedFrom,
requestedTo,
loadedFrom,
loadedTo,
rawEvents.size(),
driverCardRawEvents.size(),
vehicleUnitRawEvents.size(),
driverCardRawIntervals.size(),
vehicleUnitRawIntervals.size(),
resolvedKnownLoadedIntervals.size(),
evaluationLoadedIntervals.size(),
periodizedIntervals.size(),
mergedIntervals.size(),
nonDrivingIntervals.size(),
operatingPeriods.size(),
resolveOperatingSplitIdleHours(request),
resolveSignificantDrivingMinutes(request),
resolveMergeGapSeconds(request),
resolveGapDetectionToleranceSeconds(request),
unknownTreatmentMode,
rawEvents,
resolvedKnownLoadedIntervals,
evaluationLoadedIntervals,
periodizedIntervals,
mergedIntervals,
nonDrivingIntervals,
operatingPeriods,
notes(
unknownTreatmentMode,
resolveOperatingSplitIdleHours(request),
resolveSignificantDrivingMinutes(request),
resolveGapDetectionToleranceSeconds(request)
)
);
}
List<ActivityIntervalDto> synthesizeUnknownGaps(
List<ActivityIntervalDto> resolvedKnownLoadedIntervals,
Duration gapDetectionTolerance
) {
List<ActivityIntervalDto> allKnown = sortedPositiveIntervals(resolvedKnownLoadedIntervals);
List<ActivityIntervalDto> nonRestActivities = allKnown.stream()
.filter(interval -> !"BREAK_REST".equals(interval.activityType()))
.toList();
if (nonRestActivities.isEmpty()) {
return List.of();
}
List<ActivityIntervalDto> result = new ArrayList<>();
for (int index = 0; index < nonRestActivities.size(); index++) {
ActivityIntervalDto current = nonRestActivities.get(index);
result.add(current);
if (index + 1 >= nonRestActivities.size()) {
continue;
}
ActivityIntervalDto next = nonRestActivities.get(index + 1);
if (!next.startedAt().isAfter(current.endedAt())) {
continue;
}
OffsetDateTime gapStart = current.endedAt();
OffsetDateTime gapEnd = next.startedAt();
List<ActivityIntervalDto> uncoveredGapSegments = subtractCoverage(
unknownGapTemplate(current, next, gapStart, gapEnd),
allKnown
);
for (ActivityIntervalDto gap : uncoveredGapSegments) {
if (gap.durationSeconds() > gapDetectionTolerance.getSeconds()) {
result.add(gap);
}
}
}
return result.stream()
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt)
.thenComparing(ActivityIntervalDto::endedAt)
.thenComparing(ActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo)))
.toList();
}
List<OperatingPeriodActivityIntervalDto> mergeConsecutiveActivities(
List<OperatingPeriodActivityIntervalDto> intervals,
Duration mergeGapTolerance
) {
if (intervals == null || intervals.isEmpty()) {
return List.of();
}
List<OperatingPeriodActivityIntervalDto> sorted = intervals.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(OperatingPeriodActivityIntervalDto::startedAt)
.thenComparing(OperatingPeriodActivityIntervalDto::endedAt)
.thenComparing(OperatingPeriodActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo)))
.toList();
List<OperatingPeriodActivityIntervalDto> result = new ArrayList<>();
OperatingPeriodActivityIntervalDto current = null;
List<String> currentSources = new ArrayList<>();
for (OperatingPeriodActivityIntervalDto next : sorted) {
if (current == null) {
current = next;
currentSources = new ArrayList<>(next.sourceRowIds());
continue;
}
if (canMerge(current, next, mergeGapTolerance)) {
currentSources.addAll(next.sourceRowIds());
current = current.asMerged(max(current.endedAt(), next.endedAt()), List.copyOf(currentSources));
} else {
result.add(current.asMerged(current.endedAt(), List.copyOf(currentSources)));
current = next;
currentSources = new ArrayList<>(next.sourceRowIds());
}
}
if (current != null) {
result.add(current.asMerged(current.endedAt(), List.copyOf(currentSources)));
}
return result;
}
List<NonDrivingIntervalDto> buildNonDrivingIntervals(
List<OperatingPeriodActivityIntervalDto> mergedIntervals,
Duration significantDrivingThreshold,
EsperUnknownTreatmentMode unknownTreatmentMode
) {
if (mergedIntervals == null || mergedIntervals.isEmpty()) {
return List.of();
}
List<NonDrivingIntervalDto> result = new ArrayList<>();
NonDrivingAccumulator open = null;
for (OperatingPeriodActivityIntervalDto interval : mergedIntervals) {
if (open != null && open.operatingPeriodNo != interval.operatingPeriodNo()) {
result.add(open.close("NEW_OPERATING_PERIOD"));
open = null;
}
if (startsOrExtendsNonDriving(interval, unknownTreatmentMode)) {
open = open == null ? NonDrivingAccumulator.open(interval) : open.extend(interval);
continue;
}
if ("UNKNOWN".equals(interval.activityType()) && unknownTreatmentMode == EsperUnknownTreatmentMode.AS_UNKNOWN_NON_BREAK) {
if (open != null) {
result.add(open.close("UNKNOWN_GAP"));
open = null;
}
continue;
}
if ("DRIVE".equals(interval.activityType())) {
if (interval.durationSeconds() >= significantDrivingThreshold.getSeconds()) {
if (open != null) {
result.add(open.close("SIGNIFICANT_DRIVING"));
open = null;
}
} else if (open != null) {
open = open.extend(interval);
}
}
}
if (open != null) {
result.add(open.close("FLUSH"));
}
return result;
}
private boolean startsOrExtendsNonDriving(
OperatingPeriodActivityIntervalDto interval,
EsperUnknownTreatmentMode unknownTreatmentMode
) {
if ("WORK".equals(interval.activityType()) || "AVAILABILITY".equals(interval.activityType())) {
return true;
}
return "UNKNOWN".equals(interval.activityType())
&& unknownTreatmentMode == EsperUnknownTreatmentMode.AS_BREAK_REST;
}
private boolean canMerge(
OperatingPeriodActivityIntervalDto left,
OperatingPeriodActivityIntervalDto right,
Duration mergeGapTolerance
) {
long gapSeconds = Duration.between(left.endedAt(), right.startedAt()).getSeconds();
return Objects.equals(left.driverId(), right.driverId())
&& left.operatingPeriodNo() == right.operatingPeriodNo()
&& Objects.equals(left.activityType(), right.activityType())
&& Objects.equals(left.cardSlot(), right.cardSlot())
&& Objects.equals(left.cardStatus(), right.cardStatus())
&& Objects.equals(left.drivingStatus(), right.drivingStatus())
&& Objects.equals(left.sourceKind(), right.sourceKind())
&& left.synthetic() == right.synthetic()
&& gapSeconds <= mergeGapTolerance.getSeconds();
}
private List<OperatingPeriodDto> buildOperatingPeriods(
List<EsperClosedOperatingPeriod> closedPeriods,
List<OperatingPeriodActivityIntervalDto> clippedPeriodizedIntervals,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo
) {
Map<Long, List<OperatingPeriodActivityIntervalDto>> intervalsByPeriod = new LinkedHashMap<>();
for (OperatingPeriodActivityIntervalDto interval : clippedPeriodizedIntervals) {
intervalsByPeriod.computeIfAbsent(interval.operatingPeriodNo(), ignored -> new ArrayList<>()).add(interval);
}
List<OperatingPeriodDto> result = new ArrayList<>();
for (EsperClosedOperatingPeriod closedPeriod : closedPeriods) {
if (!closedPeriod.endedAt().isAfter(requestedFrom) || !closedPeriod.startedAt().isBefore(requestedTo)) {
continue;
}
OffsetDateTime start = max(closedPeriod.startedAt(), requestedFrom);
OffsetDateTime end = min(closedPeriod.endedAt(), requestedTo);
if (!end.isAfter(start)) {
continue;
}
List<OperatingPeriodActivityIntervalDto> intervals = intervalsByPeriod.getOrDefault(closedPeriod.operatingPeriodNo(), List.of());
long drivingSeconds = sumActivitySeconds(intervals, "DRIVE");
long workSeconds = sumActivitySeconds(intervals, "WORK");
long availabilitySeconds = sumActivitySeconds(intervals, "AVAILABILITY");
long unknownSeconds = sumActivitySeconds(intervals, "UNKNOWN");
result.add(new OperatingPeriodDto(
closedPeriod.operatingPeriodNo(),
start,
end,
Duration.between(start, end).getSeconds(),
closedPeriod.closedBy(),
drivingSeconds,
workSeconds,
availabilitySeconds,
unknownSeconds,
intervals.size(),
!start.equals(closedPeriod.startedAt()) || !end.equals(closedPeriod.endedAt())
));
}
return result;
}
private long sumActivitySeconds(List<OperatingPeriodActivityIntervalDto> intervals, String activityType) {
return intervals.stream()
.filter(interval -> activityType.equals(interval.activityType()))
.mapToLong(OperatingPeriodActivityIntervalDto::durationSeconds)
.sum();
}
private List<OperatingPeriodActivityIntervalDto> clipOperatingIntervals(
List<OperatingPeriodActivityIntervalDto> intervals,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo
) {
return intervals.stream()
.map(interval -> {
OffsetDateTime start = max(interval.startedAt(), requestedFrom);
OffsetDateTime end = min(interval.endedAt(), requestedTo);
if (!end.isAfter(start)) {
return null;
}
boolean clipped = interval.clippedToRequestedPeriod()
|| !start.equals(interval.startedAt())
|| !end.equals(interval.endedAt());
return interval.withTime(start, end, clipped);
})
.filter(Objects::nonNull)
.toList();
}
private List<NonDrivingIntervalDto> clipNonDrivingIntervals(
List<NonDrivingIntervalDto> intervals,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo
) {
return intervals.stream()
.map(interval -> {
OffsetDateTime start = max(interval.startedAt(), requestedFrom);
OffsetDateTime end = min(interval.endedAt(), requestedTo);
if (!end.isAfter(start)) {
return null;
}
return new NonDrivingIntervalDto(
interval.driverId(),
interval.vehicleId(),
interval.vehicleRegistrationId(),
interval.cardSlot(),
start,
end,
Duration.between(start, end).getSeconds(),
interval.operatingPeriodNo(),
interval.operatingPeriodStartedAt(),
interval.closedBy(),
interval.containsUnknown(),
interval.unknownSeconds(),
!start.equals(interval.startedAt()) || !end.equals(interval.endedAt())
);
})
.filter(Objects::nonNull)
.toList();
}
private ActivityIntervalDto unknownGapTemplate(
ActivityIntervalDto previous,
ActivityIntervalDto next,
OffsetDateTime gapStart,
OffsetDateTime gapEnd
) {
return new ActivityIntervalDto(
previous.driverEntityId(),
Objects.equals(previous.vehicleId(), next.vehicleId()) ? previous.vehicleId() : null,
Objects.equals(previous.vehicleRegistrationId(), next.vehicleRegistrationId()) ? previous.vehicleRegistrationId() : null,
"UNKNOWN",
Objects.equals(previous.cardSlot(), next.cardSlot()) ? previous.cardSlot() : null,
previous.cardStatus(),
"UNKNOWN",
"SYNTHETIC_GAP",
gapStart,
gapEnd,
Duration.between(gapStart, gapEnd).getSeconds(),
null,
List.of(),
false,
"UNKNOWN_GAP"
);
}
private List<ActivityIntervalDto> resolveVuFillGaps(
List<ActivityIntervalDto> driverCardRawIntervals,
List<ActivityIntervalDto> vehicleUnitRawIntervals
) {
List<ActivityIntervalDto> driverCard = sortedPositiveIntervals(driverCardRawIntervals);
List<ActivityIntervalDto> vehicleUnit = sortedPositiveIntervals(vehicleUnitRawIntervals);
if (driverCard.isEmpty()) {
return vehicleUnit;
}
if (vehicleUnit.isEmpty()) {
return driverCard;
}
List<ActivityIntervalDto> resolved = new ArrayList<>(driverCard);
for (ActivityIntervalDto vuInterval : vehicleUnit) {
resolved.addAll(subtractCoverage(vuInterval, driverCard));
}
return resolved.stream()
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt)
.thenComparing(ActivityIntervalDto::endedAt)
.thenComparing(ActivityIntervalDto::sourceKind, Comparator.nullsLast(String::compareTo)))
.toList();
}
private List<ActivityIntervalDto> subtractCoverage(
ActivityIntervalDto candidate,
List<ActivityIntervalDto> coverage
) {
List<ActivityIntervalDto> result = new ArrayList<>();
OffsetDateTime cursor = candidate.startedAt();
for (ActivityIntervalDto covered : coverage) {
if (!covered.endedAt().isAfter(cursor)) {
continue;
}
if (!covered.startedAt().isBefore(candidate.endedAt())) {
break;
}
OffsetDateTime overlapStart = max(cursor, covered.startedAt());
if (overlapStart.isAfter(cursor)) {
result.add(candidate.withTime(cursor, overlapStart, candidate.clippedToRequestedPeriod()));
}
if (covered.endedAt().isAfter(cursor)) {
cursor = max(cursor, covered.endedAt());
}
if (!candidate.endedAt().isAfter(cursor)) {
break;
}
}
if (candidate.endedAt().isAfter(cursor)) {
result.add(candidate.withTime(cursor, candidate.endedAt(), candidate.clippedToRequestedPeriod()));
}
return result.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.toList();
}
private List<ActivityIntervalDto> sortedPositiveIntervals(List<ActivityIntervalDto> intervals) {
if (intervals == null || intervals.isEmpty()) {
return List.of();
}
return intervals.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt)
.thenComparing(ActivityIntervalDto::endedAt)
.thenComparing(ActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo)))
.toList();
}
private int resolveOperatingSplitIdleHours(EsperOperatingPeriodRequest request) {
if (request.operatingSplitIdleHours() != null) {
return request.operatingSplitIdleHours();
}
return properties == null ? 7 : properties.getEsperPoc().getOperatingPeriodEvaluation().getOperatingSplitIdleHours();
}
private int resolveSignificantDrivingMinutes(EsperOperatingPeriodRequest request) {
if (request.significantDrivingMinutes() != null) {
return request.significantDrivingMinutes();
}
return properties == null ? 3 : properties.getEsperPoc().getOperatingPeriodEvaluation().getSignificantDrivingMinutes();
}
private int resolveMergeGapSeconds(EsperOperatingPeriodRequest request) {
if (request.mergeGapSeconds() != null) {
return request.mergeGapSeconds();
}
return properties == null ? 0 : properties.getEsperPoc().getOperatingPeriodEvaluation().getMergeGapSeconds();
}
private int resolveGapDetectionToleranceSeconds(EsperOperatingPeriodRequest request) {
if (request.gapDetectionToleranceSeconds() != null) {
return request.gapDetectionToleranceSeconds();
}
return properties == null ? 0 : properties.getEsperPoc().getOperatingPeriodEvaluation().getGapDetectionToleranceSeconds();
}
private EsperUnknownTreatmentMode resolveUnknownTreatmentMode(EsperOperatingPeriodRequest request) {
if (request.unknownTreatmentMode() != null) {
return request.unknownTreatmentMode();
}
return properties == null
? EsperUnknownTreatmentMode.AS_BREAK_REST
: properties.getEsperPoc().getOperatingPeriodEvaluation().getUnknownTreatmentMode();
}
private List<String> notes(
EsperUnknownTreatmentMode unknownTreatmentMode,
int operatingSplitIdleHours,
int significantDrivingMinutes,
int gapDetectionToleranceSeconds
) {
return List.of(
"This endpoint runs in parallel to the existing working-shift PoC and does not change its semantics.",
"BREAK_REST events are ignored for activity evaluation but still prevent synthetic UNKNOWN intervals from being created over covered rest spans.",
"Synthetic UNKNOWN intervals are created only for uncovered gaps between non-rest activities.",
"UNKNOWN treatment mode is " + unknownTreatmentMode + ".",
"Operating periods split after " + operatingSplitIdleHours + " hours of no non-rest activity; significant driving closes non-driving intervals from " + significantDrivingMinutes + " minutes onward.",
"Synthetic UNKNOWN gaps are only emitted when uncovered time exceeds " + gapDetectionToleranceSeconds + " seconds."
);
}
private OffsetDateTime utc(OffsetDateTime value) {
return value == null ? null : value.withOffsetSameInstant(java.time.ZoneOffset.UTC);
}
private OffsetDateTime max(OffsetDateTime left, OffsetDateTime right) {
return left.isAfter(right) ? left : right;
}
private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) {
return left.isBefore(right) ? left : right;
}
private long elapsedMillis(long startedNanos) {
return Math.round((System.nanoTime() - startedNanos) / 1_000_000.0d);
}
private static final class NonDrivingAccumulator {
private final UUID driverId;
private UUID vehicleId;
private UUID vehicleRegistrationId;
private final String cardSlot;
private final OffsetDateTime startedAt;
private OffsetDateTime endedAt;
private final long operatingPeriodNo;
private final OffsetDateTime operatingPeriodStartedAt;
private boolean containsUnknown;
private long unknownSeconds;
private NonDrivingAccumulator(OperatingPeriodActivityIntervalDto interval) {
this.driverId = interval.driverId();
this.vehicleId = interval.vehicleId();
this.vehicleRegistrationId = interval.vehicleRegistrationId();
this.cardSlot = interval.cardSlot();
this.startedAt = interval.startedAt();
this.endedAt = interval.endedAt();
this.operatingPeriodNo = interval.operatingPeriodNo();
this.operatingPeriodStartedAt = interval.operatingPeriodStartedAt();
this.containsUnknown = "UNKNOWN".equals(interval.activityType());
this.unknownSeconds = "UNKNOWN".equals(interval.activityType()) ? interval.durationSeconds() : 0L;
}
private static NonDrivingAccumulator open(OperatingPeriodActivityIntervalDto interval) {
return new NonDrivingAccumulator(interval);
}
private NonDrivingAccumulator extend(OperatingPeriodActivityIntervalDto interval) {
if (interval.vehicleId() != null) {
this.vehicleId = interval.vehicleId();
}
if (interval.vehicleRegistrationId() != null) {
this.vehicleRegistrationId = interval.vehicleRegistrationId();
}
if (interval.endedAt().isAfter(this.endedAt)) {
this.endedAt = interval.endedAt();
}
if ("UNKNOWN".equals(interval.activityType())) {
this.containsUnknown = true;
this.unknownSeconds += interval.durationSeconds();
}
return this;
}
private NonDrivingIntervalDto close(String closedBy) {
return new NonDrivingIntervalDto(
driverId,
vehicleId,
vehicleRegistrationId,
cardSlot,
startedAt,
endedAt,
Duration.between(startedAt, endedAt).getSeconds(),
operatingPeriodNo,
operatingPeriodStartedAt,
closedBy,
containsUnknown,
unknownSeconds,
false
);
}
}
}

View File

@ -0,0 +1,23 @@
package at.procon.eventhub.esperpoc.service;
import java.util.List;
import java.util.UUID;
public record EsperOperatingPeriodIntervalInputEvent(
UUID driverId,
UUID vehicleId,
UUID vehicleRegistrationId,
String activityType,
String cardSlot,
String cardStatus,
String drivingStatus,
String sourceKind,
long startTs,
long endTs,
long durationMs,
String sourceRowId,
List<String> sourceRowIds,
boolean clippedToRequestedPeriod,
boolean synthetic
) {
}

View File

@ -123,6 +123,12 @@ eventhub:
esper-poc:
activity-merge-mode: JAVA
shift-resolution-mode: JAVA
operating-period-evaluation:
operating-split-idle-hours: 7
significant-driving-minutes: 3
merge-gap-seconds: 0
gap-detection-tolerance-seconds: 0
unknown-treatment-mode: AS_BREAK_REST
yellow-fox:
default-chunk-days: 1

View File

@ -0,0 +1,154 @@
package at.procon.eventhub.esperpoc.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
import at.procon.eventhub.esperpoc.dto.NonDrivingIntervalDto;
import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class EsperOperatingPeriodEvaluationServiceTest {
private final EsperOperatingPeriodEngine operatingPeriodEngine = new EsperOperatingPeriodEngine();
private final EsperOperatingPeriodEvaluationService service = new EsperOperatingPeriodEvaluationService(
null,
new EsperDriverActivityEngine(),
operatingPeriodEngine
);
@Test
void synthesizeUnknownGapsSkipsCoveredBreakRestAndCreatesOnlyUncoveredUnknown() {
UUID driverId = UUID.randomUUID();
List<ActivityIntervalDto> intervals = List.of(
activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1", "DRIVER_CARD"),
activity(driverId, "BREAK_REST", "2026-04-01T09:00:00Z", "2026-04-01T10:00:00Z", "r1", "DRIVER_CARD"),
activity(driverId, "AVAILABILITY", "2026-04-01T10:00:00Z", "2026-04-01T11:00:00Z", "a1", "DRIVER_CARD"),
activity(driverId, "WORK", "2026-04-01T12:00:00Z", "2026-04-01T13:00:00Z", "w2", "DRIVER_CARD")
);
List<ActivityIntervalDto> evaluationIntervals = service.synthesizeUnknownGaps(intervals, Duration.ZERO);
assertThat(evaluationIntervals).extracting(ActivityIntervalDto::activityType)
.containsExactly("WORK", "AVAILABILITY", "UNKNOWN", "WORK");
assertThat(evaluationIntervals.get(2).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T11:00:00Z"));
assertThat(evaluationIntervals.get(2).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T12:00:00Z"));
}
@Test
void periodizationSplitsAfterLongUnknownGapAndShortDrivingDoesNotCloseNonDriving() {
UUID driverId = UUID.randomUUID();
List<ActivityIntervalDto> evaluationIntervals = List.of(
activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1", "DRIVER_CARD"),
activity(driverId, "AVAILABILITY", "2026-04-01T10:00:00Z", "2026-04-01T11:00:00Z", "a1", "DRIVER_CARD"),
activity(driverId, "DRIVE", "2026-04-01T11:00:00Z", "2026-04-01T11:02:00Z", "d1", "DRIVER_CARD"),
activity(driverId, "WORK", "2026-04-01T11:02:00Z", "2026-04-01T12:00:00Z", "w2", "DRIVER_CARD"),
unknown(driverId, "2026-04-01T12:00:00Z", "2026-04-01T20:00:00Z"),
activity(driverId, "DRIVE", "2026-04-01T20:00:00Z", "2026-04-01T20:30:00Z", "d2", "DRIVER_CARD")
);
EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation evaluation = operatingPeriodEngine.evaluate(
evaluationIntervals,
Duration.ofHours(7)
);
assertThat(evaluation.periodizedIntervals()).extracting(OperatingPeriodActivityIntervalDto::activityType)
.containsExactly("WORK", "AVAILABILITY", "DRIVE", "WORK", "DRIVE");
assertThat(evaluation.closedPeriods()).hasSize(2);
assertThat(evaluation.closedPeriods().get(0).closedBy()).isEqualTo("UNKNOWN_GAP");
assertThat(evaluation.closedPeriods().get(1).closedBy()).isEqualTo("FLUSH");
List<OperatingPeriodActivityIntervalDto> mergedIntervals = service.mergeConsecutiveActivities(
evaluation.periodizedIntervals(),
Duration.ZERO
);
List<NonDrivingIntervalDto> nonDrivingIntervals = service.buildNonDrivingIntervals(
mergedIntervals,
Duration.ofMinutes(3),
EsperUnknownTreatmentMode.AS_BREAK_REST
);
assertThat(nonDrivingIntervals).hasSize(1);
assertThat(nonDrivingIntervals.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T08:00:00Z"));
assertThat(nonDrivingIntervals.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T12:00:00Z"));
assertThat(nonDrivingIntervals.get(0).closedBy()).isEqualTo("NEW_OPERATING_PERIOD");
}
@Test
void unknownCanCloseNonDrivingWhenConfiguredAsNonBreak() {
UUID driverId = UUID.randomUUID();
List<OperatingPeriodActivityIntervalDto> mergedIntervals = List.of(
periodized(activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1", "DRIVER_CARD"), 1),
periodized(unknown(driverId, "2026-04-01T09:00:00Z", "2026-04-01T09:30:00Z"), 1),
periodized(activity(driverId, "AVAILABILITY", "2026-04-01T09:30:00Z", "2026-04-01T10:00:00Z", "a1", "DRIVER_CARD"), 1)
);
List<NonDrivingIntervalDto> nonDrivingIntervals = service.buildNonDrivingIntervals(
mergedIntervals,
Duration.ofMinutes(3),
EsperUnknownTreatmentMode.AS_UNKNOWN_NON_BREAK
);
assertThat(nonDrivingIntervals).hasSize(2);
assertThat(nonDrivingIntervals.get(0).closedBy()).isEqualTo("UNKNOWN_GAP");
assertThat(nonDrivingIntervals.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T09:00:00Z"));
assertThat(nonDrivingIntervals.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T09:30:00Z"));
}
private ActivityIntervalDto activity(
UUID driverId,
String activity,
String from,
String to,
String sourceRowId,
String sourceKind
) {
return ActivityIntervalDto.raw(
driverId,
null,
null,
activity,
"DRIVER",
"INSERTED",
"KNOWN",
sourceKind,
OffsetDateTime.parse(from),
OffsetDateTime.parse(to),
sourceRowId
);
}
private ActivityIntervalDto unknown(UUID driverId, String from, String to) {
return new ActivityIntervalDto(
driverId,
null,
null,
"UNKNOWN",
"DRIVER",
null,
"UNKNOWN",
"SYNTHETIC_GAP",
OffsetDateTime.parse(from),
OffsetDateTime.parse(to),
Duration.between(OffsetDateTime.parse(from), OffsetDateTime.parse(to)).getSeconds(),
null,
List.of(),
false,
"UNKNOWN_GAP"
);
}
private OperatingPeriodActivityIntervalDto periodized(ActivityIntervalDto interval, long operatingPeriodNo) {
return OperatingPeriodActivityIntervalDto.periodized(
interval,
operatingPeriodNo,
interval.startedAt(),
false,
0L
);
}
}