diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index a132a34..c5fa55a 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -1,6 +1,7 @@ package at.procon.eventhub.config; import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode; +import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode; import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode; import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode; import java.time.Duration; @@ -80,6 +81,7 @@ public class EventHubProperties { private int mergeGapSeconds = 0; private int gapDetectionToleranceSeconds = 0; private EsperUnknownTreatmentMode unknownTreatmentMode = EsperUnknownTreatmentMode.AS_BREAK_REST; + private EsperOperatingPeriodEngineMode engineMode = EsperOperatingPeriodEngineMode.STREAM_COLLECTOR; public int getOperatingSplitIdleHours() { return operatingSplitIdleHours; @@ -122,6 +124,16 @@ public class EventHubProperties { ? EsperUnknownTreatmentMode.AS_BREAK_REST : unknownTreatmentMode; } + + public EsperOperatingPeriodEngineMode getEngineMode() { + return engineMode; + } + + public void setEngineMode(EsperOperatingPeriodEngineMode engineMode) { + this.engineMode = engineMode == null + ? EsperOperatingPeriodEngineMode.STREAM_COLLECTOR + : engineMode; + } } public static class Batch { diff --git a/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java b/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java index 0bd93ab..5315971 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java +++ b/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java @@ -1,6 +1,7 @@ package at.procon.eventhub.esperpoc.api; import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode; +import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode; import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest; import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto; import at.procon.eventhub.esperpoc.dto.EsperPocRequest; @@ -76,7 +77,8 @@ public class EsperPocController { @RequestParam(required = false) Integer significantDrivingMinutes, @RequestParam(required = false) Integer mergeGapSeconds, @RequestParam(required = false) Integer gapDetectionToleranceSeconds, - @RequestParam(required = false) EsperUnknownTreatmentMode unknownTreatmentMode + @RequestParam(required = false) EsperUnknownTreatmentMode unknownTreatmentMode, + @RequestParam(required = false) EsperOperatingPeriodEngineMode engineMode ) { EsperOperatingPeriodRequest request = new EsperOperatingPeriodRequest( tenantKey, @@ -88,7 +90,8 @@ public class EsperPocController { significantDrivingMinutes, mergeGapSeconds, gapDetectionToleranceSeconds, - unknownTreatmentMode + unknownTreatmentMode, + engineMode ); return ResponseEntity.ok(operatingPeriodEvaluationService.evaluate(request)); } diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodEngineMode.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodEngineMode.java new file mode 100644 index 0000000..40106d5 --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodEngineMode.java @@ -0,0 +1,6 @@ +package at.procon.eventhub.esperpoc.dto; + +public enum EsperOperatingPeriodEngineMode { + STREAM_COLLECTOR, + FULL_EPL +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodRequest.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodRequest.java index 59420f4..2c8b846 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodRequest.java +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodRequest.java @@ -15,7 +15,8 @@ public record EsperOperatingPeriodRequest( Integer significantDrivingMinutes, Integer mergeGapSeconds, Integer gapDetectionToleranceSeconds, - EsperUnknownTreatmentMode unknownTreatmentMode + EsperUnknownTreatmentMode unknownTreatmentMode, + EsperOperatingPeriodEngineMode engineMode ) { public EsperOperatingPeriodRequest { if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) { diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodResultDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodResultDto.java index 01048f1..525a312 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodResultDto.java +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperOperatingPeriodResultDto.java @@ -27,6 +27,7 @@ public record EsperOperatingPeriodResultDto( int mergeGapSeconds, int gapDetectionToleranceSeconds, EsperUnknownTreatmentMode unknownTreatmentMode, + EsperOperatingPeriodEngineMode engineMode, List rawEvents, List resolvedKnownIntervals, List evaluationIntervals, diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEngine.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEngine.java index 0e51e0e..80868e8 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEngine.java +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEngine.java @@ -1,6 +1,7 @@ package at.procon.eventhub.esperpoc.service; import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto; +import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode; import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto; import com.espertech.esper.common.client.EPCompiled; import com.espertech.esper.common.client.EventBean; @@ -12,37 +13,66 @@ import com.espertech.esper.runtime.client.EPDeployException; import com.espertech.esper.runtime.client.EPDeployment; import com.espertech.esper.runtime.client.EPRuntime; import com.espertech.esper.runtime.client.EPRuntimeProvider; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.time.Duration; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import org.springframework.core.io.ClassPathResource; import org.springframework.stereotype.Component; +import org.springframework.util.StreamUtils; @Component public class EsperOperatingPeriodEngine { private static final AtomicLong RUNTIME_COUNTER = new AtomicLong(); + // Minimal stream-only mode: Esper preserves event ordering, Java owns the period state machine. private static final String INPUT_STREAM_EPL = """ @name('operatingPeriodIntervalStream') select * from OperatingPeriodIntervalInputEvent """; + // Full-EPL mode: Esper owns the operating-period state machine and emits periodized intervals/closures. + private static final String FULL_EPL_TEMPLATE = loadResource("esper/operating-period-state-machine.epl"); public EsperOperatingPeriodEvaluation evaluate( List intervals, - Duration operatingSplitIdleThreshold + Duration operatingSplitIdleThreshold, + EsperOperatingPeriodEngineMode mode ) { List sorted = sortedPositiveIntervals(intervals); if (sorted.isEmpty()) { return new EsperOperatingPeriodEvaluation(List.of(), List.of()); } + if (mode == EsperOperatingPeriodEngineMode.FULL_EPL) { + return evaluateFullEpl(sorted, operatingSplitIdleThreshold); + } + return evaluateStreamCollector(sorted, operatingSplitIdleThreshold); + } + + public EsperOperatingPeriodEvaluation evaluate( + List intervals, + Duration operatingSplitIdleThreshold + ) { + return evaluate(intervals, operatingSplitIdleThreshold, EsperOperatingPeriodEngineMode.STREAM_COLLECTOR); + } + + private EsperOperatingPeriodEvaluation evaluateStreamCollector( + List sorted, + Duration operatingSplitIdleThreshold + ) { + // In stream-collector mode Esper only serializes the stream; period transitions are evaluated in Java. PeriodizationCollector collector = new PeriodizationCollector(operatingSplitIdleThreshold); executeWithRuntime( configuration -> configuration.getCommon().addEventType("OperatingPeriodIntervalInputEvent", EsperOperatingPeriodIntervalInputEvent.class), INPUT_STREAM_EPL, - "operatingPeriodIntervalStream", - newData -> collectInputIntervals(newData, collector), + List.of("operatingPeriodIntervalStream"), + (statementName, newData) -> collectInputIntervals(newData, collector), runtime -> { for (ActivityIntervalDto interval : sorted) { runtime.getEventService().sendEventBean(toInputEvent(interval), "OperatingPeriodIntervalInputEvent"); @@ -52,11 +82,72 @@ public class EsperOperatingPeriodEngine { return collector.finish(); } + private EsperOperatingPeriodEvaluation evaluateFullEpl( + List sorted, + Duration operatingSplitIdleThreshold + ) { + // The full-EPL script is parameterized per request so the idle-split threshold matches the request/config. + String epl = FULL_EPL_TEMPLATE.replace("${operatingSplitIdleMs}", Long.toString(operatingSplitIdleThreshold.toMillis())); + List periodizedIntervals = new ArrayList<>(); + List closedPeriods = new ArrayList<>(); + executeWithRuntime( + configuration -> { + // Full-EPL mode uses explicit map schemas so EPL can own the whole state machine without + // relying on Java bean-property resolution during compilation. + Map inputDefinition = new LinkedHashMap<>(); + inputDefinition.put("driverId", java.util.UUID.class); + inputDefinition.put("vehicleId", java.util.UUID.class); + inputDefinition.put("vehicleRegistrationId", java.util.UUID.class); + inputDefinition.put("activityType", String.class); + inputDefinition.put("cardSlot", String.class); + inputDefinition.put("cardStatus", String.class); + inputDefinition.put("drivingStatus", String.class); + inputDefinition.put("sourceKind", String.class); + inputDefinition.put("startTs", long.class); + inputDefinition.put("endTs", long.class); + inputDefinition.put("durationMs", long.class); + inputDefinition.put("sourceRowId", String.class); + inputDefinition.put("sourceRowIds", java.util.List.class); + inputDefinition.put("clippedToRequestedPeriod", boolean.class); + inputDefinition.put("synthetic", boolean.class); + configuration.getCommon().addEventType("OperatingPeriodInputMap", inputDefinition); + configuration.getCommon().addEventType("OperatingPeriodFlushEvent", Map.of("reason", String.class)); + }, + epl, + List.of("periodizedActivityIntervals", "operatingPeriodClosed"), + (statementName, newData) -> { + if ("periodizedActivityIntervals".equals(statementName)) { + collectPeriodizedOutputs(newData, periodizedIntervals); + } else if ("operatingPeriodClosed".equals(statementName)) { + collectClosedOutputs(newData, closedPeriods); + } + }, + runtime -> { + // Historical evaluation sends the complete interval timeline first and then a single flush event + // so the EPL state machine can emit the final still-open operating period. + for (ActivityIntervalDto interval : sorted) { + runtime.getEventService().sendEventMap(toInputMap(interval), "OperatingPeriodInputMap"); + } + runtime.getEventService().sendEventMap(Map.of("reason", "HISTORICAL_EVALUATION"), "OperatingPeriodFlushEvent"); + } + ); + return new EsperOperatingPeriodEvaluation( + periodizedIntervals.stream() + .sorted(Comparator.comparing(OperatingPeriodActivityIntervalDto::startedAt) + .thenComparing(OperatingPeriodActivityIntervalDto::endedAt) + .thenComparing(OperatingPeriodActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo))) + .toList(), + closedPeriods.stream() + .sorted(Comparator.comparing(EsperClosedOperatingPeriod::startedAt)) + .toList() + ); + } + private void executeWithRuntime( java.util.function.Consumer configurationSetup, String epl, - String statementName, - java.util.function.Consumer listener, + List statementNames, + StatementListener listener, java.util.function.Consumer sender ) { EPRuntime runtime = null; @@ -69,9 +160,12 @@ public class EsperOperatingPeriodEngine { CompilerArguments arguments = new CompilerArguments(configuration); EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments); EPDeployment deployment = runtime.getDeploymentService().deploy(compiled); - runtime.getDeploymentService() - .getStatement(deployment.getDeploymentId(), statementName) - .addListener((newData, oldData, statement, rt) -> listener.accept(newData)); + // Multiple statements may emit outputs from a single deployment; we dispatch by statement name. + for (String statementName : statementNames) { + runtime.getDeploymentService() + .getStatement(deployment.getDeploymentId(), statementName) + .addListener((newData, oldData, statement, rt) -> listener.accept(statementName, newData)); + } sender.accept(runtime); } catch (EPCompileException | EPDeployException e) { throw new IllegalStateException("Cannot compile/deploy Esper operating-period EPL", e); @@ -87,10 +181,66 @@ public class EsperOperatingPeriodEngine { return; } for (EventBean event : newData) { + // Stream-collector mode receives the ordered interval stream back from Esper and applies the + // deterministic Java state machine to it. collector.accept((EsperOperatingPeriodIntervalInputEvent) event.getUnderlying()); } } + private void collectPeriodizedOutputs(EventBean[] newData, List target) { + if (newData == null) { + return; + } + for (EventBean event : newData) { + target.add(new OperatingPeriodActivityIntervalDto( + (java.util.UUID) event.get("driverId"), + (java.util.UUID) event.get("vehicleId"), + (java.util.UUID) event.get("vehicleRegistrationId"), + (String) event.get("activityType"), + (String) event.get("cardSlot"), + (String) event.get("cardStatus"), + (String) event.get("drivingStatus"), + (String) event.get("sourceKind"), + OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) event.get("startedAtTs")), java.time.ZoneOffset.UTC), + OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) event.get("endedAtTs")), java.time.ZoneOffset.UTC), + ((Long) event.get("durationMs")) / 1000L, + (String) event.get("sourceRowId"), + castSourceRowIds(event.get("sourceRowIds")), + (Boolean) event.get("clippedToRequestedPeriod"), + "PERIODIZED_ACTIVITY", + (Long) event.get("operatingPeriodNo"), + OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) event.get("operatingPeriodStartedAtTs")), java.time.ZoneOffset.UTC), + (Boolean) event.get("newOperatingPeriod"), + nullableMillisToSeconds((Long) event.get("gapSincePreviousActivityMs")), + (Boolean) event.get("synthetic") + )); + } + } + + private void collectClosedOutputs(EventBean[] newData, List target) { + if (newData == null) { + return; + } + for (EventBean event : newData) { + target.add(new EsperClosedOperatingPeriod( + (Long) event.get("operatingPeriodNo"), + OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) event.get("operatingPeriodStartedAtTs")), java.time.ZoneOffset.UTC), + OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) event.get("operatingPeriodEndedAtTs")), java.time.ZoneOffset.UTC), + ((Long) event.get("durationMs")) / 1000L, + (String) event.get("closedBy") + )); + } + } + + @SuppressWarnings("unchecked") + private List castSourceRowIds(Object value) { + return value == null ? List.of() : List.copyOf((List) value); + } + + private Long nullableMillisToSeconds(Long value) { + return value == null ? null : value / 1000L; + } + private EsperOperatingPeriodIntervalInputEvent toInputEvent(ActivityIntervalDto interval) { return new EsperOperatingPeriodIntervalInputEvent( interval.driverEntityId(), @@ -111,6 +261,26 @@ public class EsperOperatingPeriodEngine { ); } + private Map toInputMap(ActivityIntervalDto interval) { + Map map = new LinkedHashMap<>(); + map.put("driverId", interval.driverEntityId()); + map.put("vehicleId", interval.vehicleId()); + map.put("vehicleRegistrationId", interval.vehicleRegistrationId()); + map.put("activityType", interval.activityType()); + map.put("cardSlot", interval.cardSlot()); + map.put("cardStatus", interval.cardStatus()); + map.put("drivingStatus", interval.drivingStatus()); + map.put("sourceKind", interval.sourceKind()); + map.put("startTs", interval.startedAt().toInstant().toEpochMilli()); + map.put("endTs", interval.endedAt().toInstant().toEpochMilli()); + map.put("durationMs", interval.durationSeconds() * 1000L); + map.put("sourceRowId", interval.sourceRowId()); + map.put("sourceRowIds", interval.sourceRowIds()); + map.put("clippedToRequestedPeriod", interval.clippedToRequestedPeriod()); + map.put("synthetic", "UNKNOWN_GAP".equals(interval.level())); + return map; + } + private List sortedPositiveIntervals(List intervals) { if (intervals == null || intervals.isEmpty()) { return List.of(); @@ -129,6 +299,18 @@ public class EsperOperatingPeriodEngine { ) { } + private interface StatementListener { + void accept(String statementName, EventBean[] newData); + } + + private static String loadResource(String path) { + try { + return StreamUtils.copyToString(new ClassPathResource(path).getInputStream(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new IllegalStateException("Cannot load Esper resource " + path, e); + } + } + private static final class PeriodizationCollector { private final Duration operatingSplitIdleThreshold; private final List periodizedIntervals = new ArrayList<>(); @@ -163,12 +345,16 @@ public class EsperOperatingPeriodEngine { if ("UNKNOWN".equals(dto.activityType())) { if (!hasOpenPeriod) { + // Unknown time before the first known activity does not belong to any operating period. return; } if (dto.durationSeconds() >= operatingSplitIdleThreshold.getSeconds()) { + // Long UNKNOWN behaves like a closing gap: close the current period and wait for the next + // known activity to reopen a new period number. closeCurrent("UNKNOWN_GAP"); return; } + // Short UNKNOWN stays inside the current period as explicit uncertainty. periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized( dto, operatingPeriodNo, @@ -180,6 +366,7 @@ public class EsperOperatingPeriodEngine { } if (!hasOpenPeriod) { + // First known activity, or first activity after a long closing gap, opens a new operating period. operatingPeriodNo = operatingPeriodNo < 1 ? 1 : operatingPeriodNo + 1; hasOpenPeriod = true; operatingPeriodStartedAt = dto.startedAt(); @@ -196,6 +383,7 @@ public class EsperOperatingPeriodEngine { long gapSeconds = Math.max(0, Duration.between(lastKnownActivityEndAt, dto.startedAt()).getSeconds()); if (gapSeconds >= operatingSplitIdleThreshold.getSeconds()) { + // Long idle time between known activities closes the current period and starts the next one. closeCurrent("IDLE_GAP"); operatingPeriodNo++; hasOpenPeriod = true; @@ -211,6 +399,7 @@ public class EsperOperatingPeriodEngine { return; } + // Normal forward continuity inside the same period. periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized( dto, operatingPeriodNo, @@ -225,6 +414,7 @@ public class EsperOperatingPeriodEngine { private EsperOperatingPeriodEvaluation finish() { if (hasOpenPeriod) { + // Historical evaluation has no future event to close the final period, so emit it explicitly. closeCurrent("FLUSH"); } return new EsperOperatingPeriodEvaluation( @@ -244,6 +434,7 @@ public class EsperOperatingPeriodEngine { hasOpenPeriod = false; return; } + // A closed period always ends at the last known non-rest activity end, never at the synthetic UNKNOWN. closedPeriods.add(new EsperClosedOperatingPeriod( operatingPeriodNo, operatingPeriodStartedAt, diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationService.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationService.java index e6a2bb2..51b6718 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationService.java +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationService.java @@ -3,6 +3,7 @@ package at.procon.eventhub.esperpoc.service; import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto; import at.procon.eventhub.esperpoc.dto.DrivingInterruptionDto; +import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode; import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest; import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto; import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode; @@ -68,6 +69,7 @@ public class EsperOperatingPeriodEvaluationService { Duration mergeGapTolerance = Duration.ofSeconds(resolveMergeGapSeconds(request)); Duration gapDetectionTolerance = Duration.ofSeconds(resolveGapDetectionToleranceSeconds(request)); EsperUnknownTreatmentMode unknownTreatmentMode = resolveUnknownTreatmentMode(request); + EsperOperatingPeriodEngineMode engineMode = resolveEngineMode(request); long dbStartedNanos = System.nanoTime(); List rawEvents = activityRepository.findDriverActivityEvents( @@ -105,7 +107,8 @@ public class EsperOperatingPeriodEvaluationService { long periodizeStartedNanos = System.nanoTime(); EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation evaluation = operatingPeriodEngine.evaluate( evaluationLoadedIntervals, - splitIdleThreshold + splitIdleThreshold, + engineMode ); long periodizeElapsedMs = elapsedMillis(periodizeStartedNanos); @@ -150,7 +153,7 @@ public class EsperOperatingPeriodEvaluationService { ); long totalElapsedMs = elapsedMillis(startedNanos); - log.info("Esper operating-period evaluation tenant={} driverId={} requestedFrom={} requestedTo={} loadedFrom={} loadedTo={} unknownMode={} rawEvents={} cardRawEvents={} vuRawEvents={} cardIntervals={} vuIntervals={} resolvedKnownIntervals={} evaluationIntervals={} periodizedIntervals={} mergedIntervals={} nonDrivingIntervals={} operatingPeriods={} timingsMs={{dbRetrieve={}, cardIntervalEsper={}, vuIntervalEsper={}, vuGapFill={}, synthUnknown={}, periodizeEsper={}, merge={}, nonDriving={}, total={}}}", + log.info("Esper operating-period evaluation tenant={} driverId={} requestedFrom={} requestedTo={} loadedFrom={} loadedTo={} unknownMode={} engineMode={} rawEvents={} cardRawEvents={} vuRawEvents={} cardIntervals={} vuIntervals={} resolvedKnownIntervals={} evaluationIntervals={} periodizedIntervals={} mergedIntervals={} nonDrivingIntervals={} operatingPeriods={} timingsMs={{dbRetrieve={}, cardIntervalEsper={}, vuIntervalEsper={}, vuGapFill={}, synthUnknown={}, periodizeEsper={}, merge={}, nonDriving={}, total={}}}", request.tenantKey(), request.driverId(), requestedFrom, @@ -158,6 +161,7 @@ public class EsperOperatingPeriodEvaluationService { loadedFrom, loadedTo, unknownTreatmentMode, + engineMode, rawEvents.size(), driverCardRawEvents.size(), vehicleUnitRawEvents.size(), @@ -202,6 +206,7 @@ public class EsperOperatingPeriodEvaluationService { resolveMergeGapSeconds(request), resolveGapDetectionToleranceSeconds(request), unknownTreatmentMode, + engineMode, rawEvents, resolvedKnownLoadedIntervals, evaluationLoadedIntervals, @@ -210,6 +215,7 @@ public class EsperOperatingPeriodEvaluationService { nonDrivingIntervals, operatingPeriods, notes( + engineMode, unknownTreatmentMode, resolveOperatingSplitIdleHours(request), resolveSignificantDrivingMinutes(request), @@ -708,7 +714,17 @@ public class EsperOperatingPeriodEvaluationService { : properties.getEsperPoc().getOperatingPeriodEvaluation().getUnknownTreatmentMode(); } + private EsperOperatingPeriodEngineMode resolveEngineMode(EsperOperatingPeriodRequest request) { + if (request.engineMode() != null) { + return request.engineMode(); + } + return properties == null + ? EsperOperatingPeriodEngineMode.STREAM_COLLECTOR + : properties.getEsperPoc().getOperatingPeriodEvaluation().getEngineMode(); + } + private List notes( + EsperOperatingPeriodEngineMode engineMode, EsperUnknownTreatmentMode unknownTreatmentMode, int operatingSplitIdleHours, int significantDrivingMinutes, @@ -719,6 +735,7 @@ public class EsperOperatingPeriodEvaluationService { "BREAK_REST events are ignored for activity evaluation but still prevent synthetic UNKNOWN intervals from being created over covered rest spans.", "Synthetic UNKNOWN intervals are created only for uncovered gaps between non-rest activities.", "UNKNOWN treatment mode is " + unknownTreatmentMode + ".", + "Operating-period engine mode is " + engineMode + ".", "Operating periods split after " + operatingSplitIdleHours + " hours of no non-rest activity; significant driving closes non-driving intervals from " + significantDrivingMinutes + " minutes onward.", "Synthetic UNKNOWN gaps are only emitted when uncovered time exceeds " + gapDetectionToleranceSeconds + " seconds." ); diff --git a/src/main/resources/esper/operating-period-state-machine.epl b/src/main/resources/esper/operating-period-state-machine.epl new file mode 100644 index 0000000..5453946 --- /dev/null +++ b/src/main/resources/esper/operating-period-state-machine.epl @@ -0,0 +1,355 @@ +create variable long operatingSplitIdleMs = ${operatingSplitIdleMs}; + +/* + * Full-EPL operating-period state machine. + * + * Input contract: + * - Java sends already-resolved intervals, not raw START/END boundaries. + * - DRIVER_CARD remains authoritative and VU is only used to fill uncovered gaps before events reach this EPL. + * - Synthetic uncovered gaps arrive as activityType = 'UNKNOWN'. + * + * Output contract: + * - PeriodizedActivityInterval: every input interval assigned to an operating period + * - OperatingPeriodClosed: closed operating periods, including the final FLUSH period + */ + +create schema KnownOperatingInput( + driverId java.util.UUID, + vehicleId java.util.UUID, + vehicleRegistrationId java.util.UUID, + activityType string, + cardSlot string, + cardStatus string, + drivingStatus string, + sourceKind string, + startTs long, + endTs long, + durationMs long, + sourceRowId string, + sourceRowIds java.util.List, + clippedToRequestedPeriod boolean, + synthetic boolean +); + +create schema UnknownOperatingInput( + driverId java.util.UUID, + vehicleId java.util.UUID, + vehicleRegistrationId java.util.UUID, + activityType string, + cardSlot string, + cardStatus string, + drivingStatus string, + sourceKind string, + startTs long, + endTs long, + durationMs long, + sourceRowId string, + sourceRowIds java.util.List, + clippedToRequestedPeriod boolean, + synthetic boolean +); + +create schema PeriodizedActivityInterval( + driverId java.util.UUID, + vehicleId java.util.UUID, + vehicleRegistrationId java.util.UUID, + activityType string, + cardSlot string, + cardStatus string, + drivingStatus string, + sourceKind string, + startedAtTs long, + endedAtTs long, + durationMs long, + sourceRowId string, + sourceRowIds java.util.List, + clippedToRequestedPeriod boolean, + operatingPeriodNo long, + operatingPeriodStartedAtTs long, + newOperatingPeriod boolean, + gapSincePreviousActivityMs java.lang.Long, + synthetic boolean +); + +create schema OperatingPeriodClosed( + driverId java.util.UUID, + operatingPeriodNo long, + operatingPeriodStartedAtTs long, + operatingPeriodEndedAtTs long, + durationMs long, + closedBy string +); + +create window OperatingPeriodState#unique(driverId) as ( + driverId java.util.UUID, + hasOpen boolean, + operatingPeriodNo long, + operatingPeriodStartedAtTs long, + lastKnownActivityEndTs long +); + +/* Split the timeline into known activities and synthetic UNKNOWN gaps. */ +insert into KnownOperatingInput +select * from OperatingPeriodInputMap as i where i.activityType != 'UNKNOWN'; + +insert into UnknownOperatingInput +select * from OperatingPeriodInputMap as i where i.activityType = 'UNKNOWN'; + +/* First known activity for a driver opens operating period 1. */ +@Priority(200) +on KnownOperatingInput as i +insert into PeriodizedActivityInterval +select + i.driverId as driverId, + i.vehicleId as vehicleId, + i.vehicleRegistrationId as vehicleRegistrationId, + i.activityType as activityType, + i.cardSlot as cardSlot, + i.cardStatus as cardStatus, + i.drivingStatus as drivingStatus, + i.sourceKind as sourceKind, + i.startTs as startedAtTs, + i.endTs as endedAtTs, + i.durationMs as durationMs, + i.sourceRowId as sourceRowId, + i.sourceRowIds as sourceRowIds, + i.clippedToRequestedPeriod as clippedToRequestedPeriod, + 1L as operatingPeriodNo, + i.startTs as operatingPeriodStartedAtTs, + true as newOperatingPeriod, + cast(null, java.lang.Long) as gapSincePreviousActivityMs, + i.synthetic as synthetic +where not exists (select * from OperatingPeriodState as s where s.driverId = i.driverId); + +@Priority(190) +on KnownOperatingInput as i +insert into OperatingPeriodState +select + i.driverId as driverId, + true as hasOpen, + 1L as operatingPeriodNo, + i.startTs as operatingPeriodStartedAtTs, + i.endTs as lastKnownActivityEndTs +where not exists (select * from OperatingPeriodState as s where s.driverId = i.driverId); + +/* A long forward gap between known activities closes the current period with reason IDLE_GAP. */ +@Priority(180) +on KnownOperatingInput as i +insert into OperatingPeriodClosed +select + s.driverId as driverId, + s.operatingPeriodNo as operatingPeriodNo, + s.operatingPeriodStartedAtTs as operatingPeriodStartedAtTs, + s.lastKnownActivityEndTs as operatingPeriodEndedAtTs, + s.lastKnownActivityEndTs - s.operatingPeriodStartedAtTs as durationMs, + 'IDLE_GAP' as closedBy +from OperatingPeriodState as s +where s.driverId = i.driverId + and s.hasOpen = true + and i.startTs - s.lastKnownActivityEndTs >= operatingSplitIdleMs; + +/* After a long idle gap, the next known interval is emitted as the first interval of the next period. */ +@Priority(170) +on KnownOperatingInput as i +insert into PeriodizedActivityInterval +select + i.driverId as driverId, + i.vehicleId as vehicleId, + i.vehicleRegistrationId as vehicleRegistrationId, + i.activityType as activityType, + i.cardSlot as cardSlot, + i.cardStatus as cardStatus, + i.drivingStatus as drivingStatus, + i.sourceKind as sourceKind, + i.startTs as startedAtTs, + i.endTs as endedAtTs, + i.durationMs as durationMs, + i.sourceRowId as sourceRowId, + i.sourceRowIds as sourceRowIds, + i.clippedToRequestedPeriod as clippedToRequestedPeriod, + s.operatingPeriodNo + 1 as operatingPeriodNo, + i.startTs as operatingPeriodStartedAtTs, + true as newOperatingPeriod, + cast(i.startTs - s.lastKnownActivityEndTs, java.lang.Long) as gapSincePreviousActivityMs, + i.synthetic as synthetic +from OperatingPeriodState as s +where s.driverId = i.driverId + and s.hasOpen = true + and i.startTs - s.lastKnownActivityEndTs >= operatingSplitIdleMs; + +/* Update the window state to the newly opened period after an IDLE_GAP close. */ +@Priority(160) +on KnownOperatingInput as i +update OperatingPeriodState as s +set + hasOpen = true, + operatingPeriodNo = s.operatingPeriodNo + 1, + operatingPeriodStartedAtTs = i.startTs, + lastKnownActivityEndTs = i.endTs +where s.driverId = i.driverId + and s.hasOpen = true + and i.startTs - s.lastKnownActivityEndTs >= operatingSplitIdleMs; + +/* After a long UNKNOWN gap we keep the counter but mark the period closed. The next known activity reopens + * the next period number using the retained state row. */ +@Priority(155) +on KnownOperatingInput as i +insert into PeriodizedActivityInterval +select + i.driverId as driverId, + i.vehicleId as vehicleId, + i.vehicleRegistrationId as vehicleRegistrationId, + i.activityType as activityType, + i.cardSlot as cardSlot, + i.cardStatus as cardStatus, + i.drivingStatus as drivingStatus, + i.sourceKind as sourceKind, + i.startTs as startedAtTs, + i.endTs as endedAtTs, + i.durationMs as durationMs, + i.sourceRowId as sourceRowId, + i.sourceRowIds as sourceRowIds, + i.clippedToRequestedPeriod as clippedToRequestedPeriod, + s.operatingPeriodNo + 1 as operatingPeriodNo, + i.startTs as operatingPeriodStartedAtTs, + true as newOperatingPeriod, + cast(null, java.lang.Long) as gapSincePreviousActivityMs, + i.synthetic as synthetic +from OperatingPeriodState as s +where s.driverId = i.driverId + and s.hasOpen = false; + +@Priority(145) +on KnownOperatingInput as i +update OperatingPeriodState as s +set + hasOpen = true, + operatingPeriodNo = s.operatingPeriodNo + 1, + operatingPeriodStartedAtTs = i.startTs, + lastKnownActivityEndTs = i.endTs +where s.driverId = i.driverId + and s.hasOpen = false; + +/* Normal same-period continuity: the gap is forward, non-negative, and still below the split threshold. */ +@Priority(150) +on KnownOperatingInput as i +insert into PeriodizedActivityInterval +select + i.driverId as driverId, + i.vehicleId as vehicleId, + i.vehicleRegistrationId as vehicleRegistrationId, + i.activityType as activityType, + i.cardSlot as cardSlot, + i.cardStatus as cardStatus, + i.drivingStatus as drivingStatus, + i.sourceKind as sourceKind, + i.startTs as startedAtTs, + i.endTs as endedAtTs, + i.durationMs as durationMs, + i.sourceRowId as sourceRowId, + i.sourceRowIds as sourceRowIds, + i.clippedToRequestedPeriod as clippedToRequestedPeriod, + s.operatingPeriodNo as operatingPeriodNo, + s.operatingPeriodStartedAtTs as operatingPeriodStartedAtTs, + false as newOperatingPeriod, + cast(i.startTs - s.lastKnownActivityEndTs, java.lang.Long) as gapSincePreviousActivityMs, + i.synthetic as synthetic +from OperatingPeriodState as s +where s.driverId = i.driverId + and s.hasOpen = true + and i.startTs - s.lastKnownActivityEndTs >= 0 + and i.startTs - s.lastKnownActivityEndTs < operatingSplitIdleMs; + +@Priority(140) +on KnownOperatingInput as i +update OperatingPeriodState as s +set + lastKnownActivityEndTs = case + when i.endTs > s.lastKnownActivityEndTs then i.endTs + else s.lastKnownActivityEndTs + end +where s.driverId = i.driverId + and s.hasOpen = true + and i.startTs - s.lastKnownActivityEndTs >= 0 + and i.startTs - s.lastKnownActivityEndTs < operatingSplitIdleMs; + +/* A long UNKNOWN interval closes the current period, but the state row remains so the next known activity + * can reopen with the next period number. */ +@Priority(130) +on UnknownOperatingInput as i +insert into OperatingPeriodClosed +select + s.driverId as driverId, + s.operatingPeriodNo as operatingPeriodNo, + s.operatingPeriodStartedAtTs as operatingPeriodStartedAtTs, + s.lastKnownActivityEndTs as operatingPeriodEndedAtTs, + s.lastKnownActivityEndTs - s.operatingPeriodStartedAtTs as durationMs, + 'UNKNOWN_GAP' as closedBy +from OperatingPeriodState as s +where s.driverId = i.driverId + and s.hasOpen = true + and i.durationMs >= operatingSplitIdleMs; + +@Priority(120) +on UnknownOperatingInput as i +update OperatingPeriodState as s +set hasOpen = false +where s.driverId = i.driverId + and s.hasOpen = true + and i.durationMs >= operatingSplitIdleMs; + +/* Short UNKNOWN stays inside the open period as explicit uncertainty, without changing period state. */ +@Priority(110) +on UnknownOperatingInput as i +insert into PeriodizedActivityInterval +select + i.driverId as driverId, + i.vehicleId as vehicleId, + i.vehicleRegistrationId as vehicleRegistrationId, + i.activityType as activityType, + i.cardSlot as cardSlot, + i.cardStatus as cardStatus, + i.drivingStatus as drivingStatus, + i.sourceKind as sourceKind, + i.startTs as startedAtTs, + i.endTs as endedAtTs, + i.durationMs as durationMs, + i.sourceRowId as sourceRowId, + i.sourceRowIds as sourceRowIds, + i.clippedToRequestedPeriod as clippedToRequestedPeriod, + s.operatingPeriodNo as operatingPeriodNo, + s.operatingPeriodStartedAtTs as operatingPeriodStartedAtTs, + false as newOperatingPeriod, + cast(i.startTs - s.lastKnownActivityEndTs, java.lang.Long) as gapSincePreviousActivityMs, + i.synthetic as synthetic +from OperatingPeriodState as s +where s.driverId = i.driverId + and s.hasOpen = true + and i.durationMs < operatingSplitIdleMs; + +/* Historical evaluation ends with a flush event so the final still-open period is emitted. */ +@Priority(100) +on OperatingPeriodFlushEvent as f +insert into OperatingPeriodClosed +select + s.driverId as driverId, + s.operatingPeriodNo as operatingPeriodNo, + s.operatingPeriodStartedAtTs as operatingPeriodStartedAtTs, + s.lastKnownActivityEndTs as operatingPeriodEndedAtTs, + s.lastKnownActivityEndTs - s.operatingPeriodStartedAtTs as durationMs, + 'FLUSH' as closedBy +from OperatingPeriodState as s +where s.hasOpen = true; + +@Priority(90) +on OperatingPeriodFlushEvent as f +update OperatingPeriodState as s +set hasOpen = false +where s.hasOpen = true; + +/* Listener-facing output statements consumed by Java. */ +@name('periodizedActivityIntervals') +select * from PeriodizedActivityInterval; + +@name('operatingPeriodClosed') +select * from OperatingPeriodClosed; diff --git a/src/test/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationServiceTest.java b/src/test/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationServiceTest.java index fe5c442..bebb916 100644 --- a/src/test/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationServiceTest.java +++ b/src/test/java/at/procon/eventhub/esperpoc/service/EsperOperatingPeriodEvaluationServiceTest.java @@ -3,6 +3,7 @@ package at.procon.eventhub.esperpoc.service; import static org.assertj.core.api.Assertions.assertThat; import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto; +import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode; import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode; import at.procon.eventhub.esperpoc.dto.NonDrivingIntervalDto; import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto; @@ -53,7 +54,8 @@ class EsperOperatingPeriodEvaluationServiceTest { EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation evaluation = operatingPeriodEngine.evaluate( evaluationIntervals, - Duration.ofHours(7) + Duration.ofHours(7), + EsperOperatingPeriodEngineMode.STREAM_COLLECTOR ); assertThat(evaluation.periodizedIntervals()).extracting(OperatingPeriodActivityIntervalDto::activityType) @@ -99,6 +101,37 @@ class EsperOperatingPeriodEvaluationServiceTest { assertThat(nonDrivingIntervals.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T09:30:00Z")); } + @Test + void fullEplModeMatchesStreamCollectorMode() { + UUID driverId = UUID.randomUUID(); + List evaluationIntervals = List.of( + activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1", "DRIVER_CARD"), + activity(driverId, "AVAILABILITY", "2026-04-01T10:00:00Z", "2026-04-01T11:00:00Z", "a1", "DRIVER_CARD"), + unknown(driverId, "2026-04-01T11:00:00Z", "2026-04-01T11:30:00Z"), + activity(driverId, "WORK", "2026-04-01T11:30:00Z", "2026-04-01T12:00:00Z", "w2", "DRIVER_CARD"), + unknown(driverId, "2026-04-01T12:00:00Z", "2026-04-01T20:00:00Z"), + activity(driverId, "DRIVE", "2026-04-01T20:00:00Z", "2026-04-01T20:30:00Z", "d1", "DRIVER_CARD") + ); + + EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation collectorEvaluation = operatingPeriodEngine.evaluate( + evaluationIntervals, + Duration.ofHours(7), + EsperOperatingPeriodEngineMode.STREAM_COLLECTOR + ); + EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation fullEplEvaluation = operatingPeriodEngine.evaluate( + evaluationIntervals, + Duration.ofHours(7), + EsperOperatingPeriodEngineMode.FULL_EPL + ); + + assertThat(fullEplEvaluation.periodizedIntervals()) + .usingRecursiveComparison() + .isEqualTo(collectorEvaluation.periodizedIntervals()); + assertThat(fullEplEvaluation.closedPeriods()) + .usingRecursiveComparison() + .isEqualTo(collectorEvaluation.closedPeriods()); + } + private ActivityIntervalDto activity( UUID driverId, String activity,