Add full-EPL operating period mode
This commit is contained in:
parent
519711b214
commit
ddc45f3c30
|
|
@ -1,6 +1,7 @@
|
||||||
package at.procon.eventhub.config;
|
package at.procon.eventhub.config;
|
||||||
|
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
|
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
|
||||||
|
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode;
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
|
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode;
|
import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
|
@ -80,6 +81,7 @@ public class EventHubProperties {
|
||||||
private int mergeGapSeconds = 0;
|
private int mergeGapSeconds = 0;
|
||||||
private int gapDetectionToleranceSeconds = 0;
|
private int gapDetectionToleranceSeconds = 0;
|
||||||
private EsperUnknownTreatmentMode unknownTreatmentMode = EsperUnknownTreatmentMode.AS_BREAK_REST;
|
private EsperUnknownTreatmentMode unknownTreatmentMode = EsperUnknownTreatmentMode.AS_BREAK_REST;
|
||||||
|
private EsperOperatingPeriodEngineMode engineMode = EsperOperatingPeriodEngineMode.STREAM_COLLECTOR;
|
||||||
|
|
||||||
public int getOperatingSplitIdleHours() {
|
public int getOperatingSplitIdleHours() {
|
||||||
return operatingSplitIdleHours;
|
return operatingSplitIdleHours;
|
||||||
|
|
@ -122,6 +124,16 @@ public class EventHubProperties {
|
||||||
? EsperUnknownTreatmentMode.AS_BREAK_REST
|
? EsperUnknownTreatmentMode.AS_BREAK_REST
|
||||||
: unknownTreatmentMode;
|
: unknownTreatmentMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public EsperOperatingPeriodEngineMode getEngineMode() {
|
||||||
|
return engineMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEngineMode(EsperOperatingPeriodEngineMode engineMode) {
|
||||||
|
this.engineMode = engineMode == null
|
||||||
|
? EsperOperatingPeriodEngineMode.STREAM_COLLECTOR
|
||||||
|
: engineMode;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Batch {
|
public static class Batch {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package at.procon.eventhub.esperpoc.api;
|
package at.procon.eventhub.esperpoc.api;
|
||||||
|
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
|
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
|
||||||
|
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode;
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest;
|
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest;
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto;
|
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto;
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
|
import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
|
||||||
|
|
@ -76,7 +77,8 @@ public class EsperPocController {
|
||||||
@RequestParam(required = false) Integer significantDrivingMinutes,
|
@RequestParam(required = false) Integer significantDrivingMinutes,
|
||||||
@RequestParam(required = false) Integer mergeGapSeconds,
|
@RequestParam(required = false) Integer mergeGapSeconds,
|
||||||
@RequestParam(required = false) Integer gapDetectionToleranceSeconds,
|
@RequestParam(required = false) Integer gapDetectionToleranceSeconds,
|
||||||
@RequestParam(required = false) EsperUnknownTreatmentMode unknownTreatmentMode
|
@RequestParam(required = false) EsperUnknownTreatmentMode unknownTreatmentMode,
|
||||||
|
@RequestParam(required = false) EsperOperatingPeriodEngineMode engineMode
|
||||||
) {
|
) {
|
||||||
EsperOperatingPeriodRequest request = new EsperOperatingPeriodRequest(
|
EsperOperatingPeriodRequest request = new EsperOperatingPeriodRequest(
|
||||||
tenantKey,
|
tenantKey,
|
||||||
|
|
@ -88,7 +90,8 @@ public class EsperPocController {
|
||||||
significantDrivingMinutes,
|
significantDrivingMinutes,
|
||||||
mergeGapSeconds,
|
mergeGapSeconds,
|
||||||
gapDetectionToleranceSeconds,
|
gapDetectionToleranceSeconds,
|
||||||
unknownTreatmentMode
|
unknownTreatmentMode,
|
||||||
|
engineMode
|
||||||
);
|
);
|
||||||
return ResponseEntity.ok(operatingPeriodEvaluationService.evaluate(request));
|
return ResponseEntity.ok(operatingPeriodEvaluationService.evaluate(request));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
package at.procon.eventhub.esperpoc.dto;
|
||||||
|
|
||||||
|
public enum EsperOperatingPeriodEngineMode {
|
||||||
|
STREAM_COLLECTOR,
|
||||||
|
FULL_EPL
|
||||||
|
}
|
||||||
|
|
@ -15,7 +15,8 @@ public record EsperOperatingPeriodRequest(
|
||||||
Integer significantDrivingMinutes,
|
Integer significantDrivingMinutes,
|
||||||
Integer mergeGapSeconds,
|
Integer mergeGapSeconds,
|
||||||
Integer gapDetectionToleranceSeconds,
|
Integer gapDetectionToleranceSeconds,
|
||||||
EsperUnknownTreatmentMode unknownTreatmentMode
|
EsperUnknownTreatmentMode unknownTreatmentMode,
|
||||||
|
EsperOperatingPeriodEngineMode engineMode
|
||||||
) {
|
) {
|
||||||
public EsperOperatingPeriodRequest {
|
public EsperOperatingPeriodRequest {
|
||||||
if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) {
|
if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) {
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ public record EsperOperatingPeriodResultDto(
|
||||||
int mergeGapSeconds,
|
int mergeGapSeconds,
|
||||||
int gapDetectionToleranceSeconds,
|
int gapDetectionToleranceSeconds,
|
||||||
EsperUnknownTreatmentMode unknownTreatmentMode,
|
EsperUnknownTreatmentMode unknownTreatmentMode,
|
||||||
|
EsperOperatingPeriodEngineMode engineMode,
|
||||||
List<RawActivityEventDto> rawEvents,
|
List<RawActivityEventDto> rawEvents,
|
||||||
List<ActivityIntervalDto> resolvedKnownIntervals,
|
List<ActivityIntervalDto> resolvedKnownIntervals,
|
||||||
List<ActivityIntervalDto> evaluationIntervals,
|
List<ActivityIntervalDto> evaluationIntervals,
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package at.procon.eventhub.esperpoc.service;
|
package at.procon.eventhub.esperpoc.service;
|
||||||
|
|
||||||
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
|
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
|
||||||
|
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode;
|
||||||
import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto;
|
import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto;
|
||||||
import com.espertech.esper.common.client.EPCompiled;
|
import com.espertech.esper.common.client.EPCompiled;
|
||||||
import com.espertech.esper.common.client.EventBean;
|
import com.espertech.esper.common.client.EventBean;
|
||||||
|
|
@ -12,37 +13,66 @@ import com.espertech.esper.runtime.client.EPDeployException;
|
||||||
import com.espertech.esper.runtime.client.EPDeployment;
|
import com.espertech.esper.runtime.client.EPDeployment;
|
||||||
import com.espertech.esper.runtime.client.EPRuntime;
|
import com.espertech.esper.runtime.client.EPRuntime;
|
||||||
import com.espertech.esper.runtime.client.EPRuntimeProvider;
|
import com.espertech.esper.runtime.client.EPRuntimeProvider;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Instant;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.StreamUtils;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
public class EsperOperatingPeriodEngine {
|
public class EsperOperatingPeriodEngine {
|
||||||
|
|
||||||
private static final AtomicLong RUNTIME_COUNTER = new AtomicLong();
|
private static final AtomicLong RUNTIME_COUNTER = new AtomicLong();
|
||||||
|
// Minimal stream-only mode: Esper preserves event ordering, Java owns the period state machine.
|
||||||
private static final String INPUT_STREAM_EPL = """
|
private static final String INPUT_STREAM_EPL = """
|
||||||
@name('operatingPeriodIntervalStream')
|
@name('operatingPeriodIntervalStream')
|
||||||
select * from OperatingPeriodIntervalInputEvent
|
select * from OperatingPeriodIntervalInputEvent
|
||||||
""";
|
""";
|
||||||
|
// Full-EPL mode: Esper owns the operating-period state machine and emits periodized intervals/closures.
|
||||||
|
private static final String FULL_EPL_TEMPLATE = loadResource("esper/operating-period-state-machine.epl");
|
||||||
|
|
||||||
public EsperOperatingPeriodEvaluation evaluate(
|
public EsperOperatingPeriodEvaluation evaluate(
|
||||||
List<ActivityIntervalDto> intervals,
|
List<ActivityIntervalDto> intervals,
|
||||||
Duration operatingSplitIdleThreshold
|
Duration operatingSplitIdleThreshold,
|
||||||
|
EsperOperatingPeriodEngineMode mode
|
||||||
) {
|
) {
|
||||||
List<ActivityIntervalDto> sorted = sortedPositiveIntervals(intervals);
|
List<ActivityIntervalDto> sorted = sortedPositiveIntervals(intervals);
|
||||||
if (sorted.isEmpty()) {
|
if (sorted.isEmpty()) {
|
||||||
return new EsperOperatingPeriodEvaluation(List.of(), List.of());
|
return new EsperOperatingPeriodEvaluation(List.of(), List.of());
|
||||||
}
|
}
|
||||||
|
if (mode == EsperOperatingPeriodEngineMode.FULL_EPL) {
|
||||||
|
return evaluateFullEpl(sorted, operatingSplitIdleThreshold);
|
||||||
|
}
|
||||||
|
return evaluateStreamCollector(sorted, operatingSplitIdleThreshold);
|
||||||
|
}
|
||||||
|
|
||||||
|
public EsperOperatingPeriodEvaluation evaluate(
|
||||||
|
List<ActivityIntervalDto> intervals,
|
||||||
|
Duration operatingSplitIdleThreshold
|
||||||
|
) {
|
||||||
|
return evaluate(intervals, operatingSplitIdleThreshold, EsperOperatingPeriodEngineMode.STREAM_COLLECTOR);
|
||||||
|
}
|
||||||
|
|
||||||
|
private EsperOperatingPeriodEvaluation evaluateStreamCollector(
|
||||||
|
List<ActivityIntervalDto> sorted,
|
||||||
|
Duration operatingSplitIdleThreshold
|
||||||
|
) {
|
||||||
|
// In stream-collector mode Esper only serializes the stream; period transitions are evaluated in Java.
|
||||||
PeriodizationCollector collector = new PeriodizationCollector(operatingSplitIdleThreshold);
|
PeriodizationCollector collector = new PeriodizationCollector(operatingSplitIdleThreshold);
|
||||||
executeWithRuntime(
|
executeWithRuntime(
|
||||||
configuration -> configuration.getCommon().addEventType("OperatingPeriodIntervalInputEvent", EsperOperatingPeriodIntervalInputEvent.class),
|
configuration -> configuration.getCommon().addEventType("OperatingPeriodIntervalInputEvent", EsperOperatingPeriodIntervalInputEvent.class),
|
||||||
INPUT_STREAM_EPL,
|
INPUT_STREAM_EPL,
|
||||||
"operatingPeriodIntervalStream",
|
List.of("operatingPeriodIntervalStream"),
|
||||||
newData -> collectInputIntervals(newData, collector),
|
(statementName, newData) -> collectInputIntervals(newData, collector),
|
||||||
runtime -> {
|
runtime -> {
|
||||||
for (ActivityIntervalDto interval : sorted) {
|
for (ActivityIntervalDto interval : sorted) {
|
||||||
runtime.getEventService().sendEventBean(toInputEvent(interval), "OperatingPeriodIntervalInputEvent");
|
runtime.getEventService().sendEventBean(toInputEvent(interval), "OperatingPeriodIntervalInputEvent");
|
||||||
|
|
@ -52,11 +82,72 @@ public class EsperOperatingPeriodEngine {
|
||||||
return collector.finish();
|
return collector.finish();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private EsperOperatingPeriodEvaluation evaluateFullEpl(
|
||||||
|
List<ActivityIntervalDto> sorted,
|
||||||
|
Duration operatingSplitIdleThreshold
|
||||||
|
) {
|
||||||
|
// The full-EPL script is parameterized per request so the idle-split threshold matches the request/config.
|
||||||
|
String epl = FULL_EPL_TEMPLATE.replace("${operatingSplitIdleMs}", Long.toString(operatingSplitIdleThreshold.toMillis()));
|
||||||
|
List<OperatingPeriodActivityIntervalDto> periodizedIntervals = new ArrayList<>();
|
||||||
|
List<EsperClosedOperatingPeriod> closedPeriods = new ArrayList<>();
|
||||||
|
executeWithRuntime(
|
||||||
|
configuration -> {
|
||||||
|
// Full-EPL mode uses explicit map schemas so EPL can own the whole state machine without
|
||||||
|
// relying on Java bean-property resolution during compilation.
|
||||||
|
Map<String, Object> inputDefinition = new LinkedHashMap<>();
|
||||||
|
inputDefinition.put("driverId", java.util.UUID.class);
|
||||||
|
inputDefinition.put("vehicleId", java.util.UUID.class);
|
||||||
|
inputDefinition.put("vehicleRegistrationId", java.util.UUID.class);
|
||||||
|
inputDefinition.put("activityType", String.class);
|
||||||
|
inputDefinition.put("cardSlot", String.class);
|
||||||
|
inputDefinition.put("cardStatus", String.class);
|
||||||
|
inputDefinition.put("drivingStatus", String.class);
|
||||||
|
inputDefinition.put("sourceKind", String.class);
|
||||||
|
inputDefinition.put("startTs", long.class);
|
||||||
|
inputDefinition.put("endTs", long.class);
|
||||||
|
inputDefinition.put("durationMs", long.class);
|
||||||
|
inputDefinition.put("sourceRowId", String.class);
|
||||||
|
inputDefinition.put("sourceRowIds", java.util.List.class);
|
||||||
|
inputDefinition.put("clippedToRequestedPeriod", boolean.class);
|
||||||
|
inputDefinition.put("synthetic", boolean.class);
|
||||||
|
configuration.getCommon().addEventType("OperatingPeriodInputMap", inputDefinition);
|
||||||
|
configuration.getCommon().addEventType("OperatingPeriodFlushEvent", Map.of("reason", String.class));
|
||||||
|
},
|
||||||
|
epl,
|
||||||
|
List.of("periodizedActivityIntervals", "operatingPeriodClosed"),
|
||||||
|
(statementName, newData) -> {
|
||||||
|
if ("periodizedActivityIntervals".equals(statementName)) {
|
||||||
|
collectPeriodizedOutputs(newData, periodizedIntervals);
|
||||||
|
} else if ("operatingPeriodClosed".equals(statementName)) {
|
||||||
|
collectClosedOutputs(newData, closedPeriods);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
runtime -> {
|
||||||
|
// Historical evaluation sends the complete interval timeline first and then a single flush event
|
||||||
|
// so the EPL state machine can emit the final still-open operating period.
|
||||||
|
for (ActivityIntervalDto interval : sorted) {
|
||||||
|
runtime.getEventService().sendEventMap(toInputMap(interval), "OperatingPeriodInputMap");
|
||||||
|
}
|
||||||
|
runtime.getEventService().sendEventMap(Map.of("reason", "HISTORICAL_EVALUATION"), "OperatingPeriodFlushEvent");
|
||||||
|
}
|
||||||
|
);
|
||||||
|
return new EsperOperatingPeriodEvaluation(
|
||||||
|
periodizedIntervals.stream()
|
||||||
|
.sorted(Comparator.comparing(OperatingPeriodActivityIntervalDto::startedAt)
|
||||||
|
.thenComparing(OperatingPeriodActivityIntervalDto::endedAt)
|
||||||
|
.thenComparing(OperatingPeriodActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo)))
|
||||||
|
.toList(),
|
||||||
|
closedPeriods.stream()
|
||||||
|
.sorted(Comparator.comparing(EsperClosedOperatingPeriod::startedAt))
|
||||||
|
.toList()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private void executeWithRuntime(
|
private void executeWithRuntime(
|
||||||
java.util.function.Consumer<Configuration> configurationSetup,
|
java.util.function.Consumer<Configuration> configurationSetup,
|
||||||
String epl,
|
String epl,
|
||||||
String statementName,
|
List<String> statementNames,
|
||||||
java.util.function.Consumer<EventBean[]> listener,
|
StatementListener listener,
|
||||||
java.util.function.Consumer<EPRuntime> sender
|
java.util.function.Consumer<EPRuntime> sender
|
||||||
) {
|
) {
|
||||||
EPRuntime runtime = null;
|
EPRuntime runtime = null;
|
||||||
|
|
@ -69,9 +160,12 @@ public class EsperOperatingPeriodEngine {
|
||||||
CompilerArguments arguments = new CompilerArguments(configuration);
|
CompilerArguments arguments = new CompilerArguments(configuration);
|
||||||
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments);
|
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments);
|
||||||
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
|
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
|
||||||
|
// Multiple statements may emit outputs from a single deployment; we dispatch by statement name.
|
||||||
|
for (String statementName : statementNames) {
|
||||||
runtime.getDeploymentService()
|
runtime.getDeploymentService()
|
||||||
.getStatement(deployment.getDeploymentId(), statementName)
|
.getStatement(deployment.getDeploymentId(), statementName)
|
||||||
.addListener((newData, oldData, statement, rt) -> listener.accept(newData));
|
.addListener((newData, oldData, statement, rt) -> listener.accept(statementName, newData));
|
||||||
|
}
|
||||||
sender.accept(runtime);
|
sender.accept(runtime);
|
||||||
} catch (EPCompileException | EPDeployException e) {
|
} catch (EPCompileException | EPDeployException e) {
|
||||||
throw new IllegalStateException("Cannot compile/deploy Esper operating-period EPL", e);
|
throw new IllegalStateException("Cannot compile/deploy Esper operating-period EPL", e);
|
||||||
|
|
@ -87,10 +181,66 @@ public class EsperOperatingPeriodEngine {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (EventBean event : newData) {
|
for (EventBean event : newData) {
|
||||||
|
// Stream-collector mode receives the ordered interval stream back from Esper and applies the
|
||||||
|
// deterministic Java state machine to it.
|
||||||
collector.accept((EsperOperatingPeriodIntervalInputEvent) event.getUnderlying());
|
collector.accept((EsperOperatingPeriodIntervalInputEvent) event.getUnderlying());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void collectPeriodizedOutputs(EventBean[] newData, List<OperatingPeriodActivityIntervalDto> target) {
|
||||||
|
if (newData == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (EventBean event : newData) {
|
||||||
|
target.add(new OperatingPeriodActivityIntervalDto(
|
||||||
|
(java.util.UUID) event.get("driverId"),
|
||||||
|
(java.util.UUID) event.get("vehicleId"),
|
||||||
|
(java.util.UUID) event.get("vehicleRegistrationId"),
|
||||||
|
(String) event.get("activityType"),
|
||||||
|
(String) event.get("cardSlot"),
|
||||||
|
(String) event.get("cardStatus"),
|
||||||
|
(String) event.get("drivingStatus"),
|
||||||
|
(String) event.get("sourceKind"),
|
||||||
|
OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) event.get("startedAtTs")), java.time.ZoneOffset.UTC),
|
||||||
|
OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) event.get("endedAtTs")), java.time.ZoneOffset.UTC),
|
||||||
|
((Long) event.get("durationMs")) / 1000L,
|
||||||
|
(String) event.get("sourceRowId"),
|
||||||
|
castSourceRowIds(event.get("sourceRowIds")),
|
||||||
|
(Boolean) event.get("clippedToRequestedPeriod"),
|
||||||
|
"PERIODIZED_ACTIVITY",
|
||||||
|
(Long) event.get("operatingPeriodNo"),
|
||||||
|
OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) event.get("operatingPeriodStartedAtTs")), java.time.ZoneOffset.UTC),
|
||||||
|
(Boolean) event.get("newOperatingPeriod"),
|
||||||
|
nullableMillisToSeconds((Long) event.get("gapSincePreviousActivityMs")),
|
||||||
|
(Boolean) event.get("synthetic")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void collectClosedOutputs(EventBean[] newData, List<EsperClosedOperatingPeriod> target) {
|
||||||
|
if (newData == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (EventBean event : newData) {
|
||||||
|
target.add(new EsperClosedOperatingPeriod(
|
||||||
|
(Long) event.get("operatingPeriodNo"),
|
||||||
|
OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) event.get("operatingPeriodStartedAtTs")), java.time.ZoneOffset.UTC),
|
||||||
|
OffsetDateTime.ofInstant(Instant.ofEpochMilli((Long) event.get("operatingPeriodEndedAtTs")), java.time.ZoneOffset.UTC),
|
||||||
|
((Long) event.get("durationMs")) / 1000L,
|
||||||
|
(String) event.get("closedBy")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private List<String> castSourceRowIds(Object value) {
|
||||||
|
return value == null ? List.of() : List.copyOf((List<String>) value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Long nullableMillisToSeconds(Long value) {
|
||||||
|
return value == null ? null : value / 1000L;
|
||||||
|
}
|
||||||
|
|
||||||
private EsperOperatingPeriodIntervalInputEvent toInputEvent(ActivityIntervalDto interval) {
|
private EsperOperatingPeriodIntervalInputEvent toInputEvent(ActivityIntervalDto interval) {
|
||||||
return new EsperOperatingPeriodIntervalInputEvent(
|
return new EsperOperatingPeriodIntervalInputEvent(
|
||||||
interval.driverEntityId(),
|
interval.driverEntityId(),
|
||||||
|
|
@ -111,6 +261,26 @@ public class EsperOperatingPeriodEngine {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> toInputMap(ActivityIntervalDto interval) {
|
||||||
|
Map<String, Object> map = new LinkedHashMap<>();
|
||||||
|
map.put("driverId", interval.driverEntityId());
|
||||||
|
map.put("vehicleId", interval.vehicleId());
|
||||||
|
map.put("vehicleRegistrationId", interval.vehicleRegistrationId());
|
||||||
|
map.put("activityType", interval.activityType());
|
||||||
|
map.put("cardSlot", interval.cardSlot());
|
||||||
|
map.put("cardStatus", interval.cardStatus());
|
||||||
|
map.put("drivingStatus", interval.drivingStatus());
|
||||||
|
map.put("sourceKind", interval.sourceKind());
|
||||||
|
map.put("startTs", interval.startedAt().toInstant().toEpochMilli());
|
||||||
|
map.put("endTs", interval.endedAt().toInstant().toEpochMilli());
|
||||||
|
map.put("durationMs", interval.durationSeconds() * 1000L);
|
||||||
|
map.put("sourceRowId", interval.sourceRowId());
|
||||||
|
map.put("sourceRowIds", interval.sourceRowIds());
|
||||||
|
map.put("clippedToRequestedPeriod", interval.clippedToRequestedPeriod());
|
||||||
|
map.put("synthetic", "UNKNOWN_GAP".equals(interval.level()));
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
private List<ActivityIntervalDto> sortedPositiveIntervals(List<ActivityIntervalDto> intervals) {
|
private List<ActivityIntervalDto> sortedPositiveIntervals(List<ActivityIntervalDto> intervals) {
|
||||||
if (intervals == null || intervals.isEmpty()) {
|
if (intervals == null || intervals.isEmpty()) {
|
||||||
return List.of();
|
return List.of();
|
||||||
|
|
@ -129,6 +299,18 @@ public class EsperOperatingPeriodEngine {
|
||||||
) {
|
) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private interface StatementListener {
|
||||||
|
void accept(String statementName, EventBean[] newData);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String loadResource(String path) {
|
||||||
|
try {
|
||||||
|
return StreamUtils.copyToString(new ClassPathResource(path).getInputStream(), StandardCharsets.UTF_8);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalStateException("Cannot load Esper resource " + path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static final class PeriodizationCollector {
|
private static final class PeriodizationCollector {
|
||||||
private final Duration operatingSplitIdleThreshold;
|
private final Duration operatingSplitIdleThreshold;
|
||||||
private final List<OperatingPeriodActivityIntervalDto> periodizedIntervals = new ArrayList<>();
|
private final List<OperatingPeriodActivityIntervalDto> periodizedIntervals = new ArrayList<>();
|
||||||
|
|
@ -163,12 +345,16 @@ public class EsperOperatingPeriodEngine {
|
||||||
|
|
||||||
if ("UNKNOWN".equals(dto.activityType())) {
|
if ("UNKNOWN".equals(dto.activityType())) {
|
||||||
if (!hasOpenPeriod) {
|
if (!hasOpenPeriod) {
|
||||||
|
// Unknown time before the first known activity does not belong to any operating period.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (dto.durationSeconds() >= operatingSplitIdleThreshold.getSeconds()) {
|
if (dto.durationSeconds() >= operatingSplitIdleThreshold.getSeconds()) {
|
||||||
|
// Long UNKNOWN behaves like a closing gap: close the current period and wait for the next
|
||||||
|
// known activity to reopen a new period number.
|
||||||
closeCurrent("UNKNOWN_GAP");
|
closeCurrent("UNKNOWN_GAP");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// Short UNKNOWN stays inside the current period as explicit uncertainty.
|
||||||
periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized(
|
periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized(
|
||||||
dto,
|
dto,
|
||||||
operatingPeriodNo,
|
operatingPeriodNo,
|
||||||
|
|
@ -180,6 +366,7 @@ public class EsperOperatingPeriodEngine {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hasOpenPeriod) {
|
if (!hasOpenPeriod) {
|
||||||
|
// First known activity, or first activity after a long closing gap, opens a new operating period.
|
||||||
operatingPeriodNo = operatingPeriodNo < 1 ? 1 : operatingPeriodNo + 1;
|
operatingPeriodNo = operatingPeriodNo < 1 ? 1 : operatingPeriodNo + 1;
|
||||||
hasOpenPeriod = true;
|
hasOpenPeriod = true;
|
||||||
operatingPeriodStartedAt = dto.startedAt();
|
operatingPeriodStartedAt = dto.startedAt();
|
||||||
|
|
@ -196,6 +383,7 @@ public class EsperOperatingPeriodEngine {
|
||||||
|
|
||||||
long gapSeconds = Math.max(0, Duration.between(lastKnownActivityEndAt, dto.startedAt()).getSeconds());
|
long gapSeconds = Math.max(0, Duration.between(lastKnownActivityEndAt, dto.startedAt()).getSeconds());
|
||||||
if (gapSeconds >= operatingSplitIdleThreshold.getSeconds()) {
|
if (gapSeconds >= operatingSplitIdleThreshold.getSeconds()) {
|
||||||
|
// Long idle time between known activities closes the current period and starts the next one.
|
||||||
closeCurrent("IDLE_GAP");
|
closeCurrent("IDLE_GAP");
|
||||||
operatingPeriodNo++;
|
operatingPeriodNo++;
|
||||||
hasOpenPeriod = true;
|
hasOpenPeriod = true;
|
||||||
|
|
@ -211,6 +399,7 @@ public class EsperOperatingPeriodEngine {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Normal forward continuity inside the same period.
|
||||||
periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized(
|
periodizedIntervals.add(OperatingPeriodActivityIntervalDto.periodized(
|
||||||
dto,
|
dto,
|
||||||
operatingPeriodNo,
|
operatingPeriodNo,
|
||||||
|
|
@ -225,6 +414,7 @@ public class EsperOperatingPeriodEngine {
|
||||||
|
|
||||||
private EsperOperatingPeriodEvaluation finish() {
|
private EsperOperatingPeriodEvaluation finish() {
|
||||||
if (hasOpenPeriod) {
|
if (hasOpenPeriod) {
|
||||||
|
// Historical evaluation has no future event to close the final period, so emit it explicitly.
|
||||||
closeCurrent("FLUSH");
|
closeCurrent("FLUSH");
|
||||||
}
|
}
|
||||||
return new EsperOperatingPeriodEvaluation(
|
return new EsperOperatingPeriodEvaluation(
|
||||||
|
|
@ -244,6 +434,7 @@ public class EsperOperatingPeriodEngine {
|
||||||
hasOpenPeriod = false;
|
hasOpenPeriod = false;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// A closed period always ends at the last known non-rest activity end, never at the synthetic UNKNOWN.
|
||||||
closedPeriods.add(new EsperClosedOperatingPeriod(
|
closedPeriods.add(new EsperClosedOperatingPeriod(
|
||||||
operatingPeriodNo,
|
operatingPeriodNo,
|
||||||
operatingPeriodStartedAt,
|
operatingPeriodStartedAt,
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package at.procon.eventhub.esperpoc.service;
|
||||||
import at.procon.eventhub.config.EventHubProperties;
|
import at.procon.eventhub.config.EventHubProperties;
|
||||||
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
|
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
|
||||||
import at.procon.eventhub.esperpoc.dto.DrivingInterruptionDto;
|
import at.procon.eventhub.esperpoc.dto.DrivingInterruptionDto;
|
||||||
|
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode;
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest;
|
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest;
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto;
|
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto;
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
|
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
|
||||||
|
|
@ -68,6 +69,7 @@ public class EsperOperatingPeriodEvaluationService {
|
||||||
Duration mergeGapTolerance = Duration.ofSeconds(resolveMergeGapSeconds(request));
|
Duration mergeGapTolerance = Duration.ofSeconds(resolveMergeGapSeconds(request));
|
||||||
Duration gapDetectionTolerance = Duration.ofSeconds(resolveGapDetectionToleranceSeconds(request));
|
Duration gapDetectionTolerance = Duration.ofSeconds(resolveGapDetectionToleranceSeconds(request));
|
||||||
EsperUnknownTreatmentMode unknownTreatmentMode = resolveUnknownTreatmentMode(request);
|
EsperUnknownTreatmentMode unknownTreatmentMode = resolveUnknownTreatmentMode(request);
|
||||||
|
EsperOperatingPeriodEngineMode engineMode = resolveEngineMode(request);
|
||||||
|
|
||||||
long dbStartedNanos = System.nanoTime();
|
long dbStartedNanos = System.nanoTime();
|
||||||
List<RawActivityEventDto> rawEvents = activityRepository.findDriverActivityEvents(
|
List<RawActivityEventDto> rawEvents = activityRepository.findDriverActivityEvents(
|
||||||
|
|
@ -105,7 +107,8 @@ public class EsperOperatingPeriodEvaluationService {
|
||||||
long periodizeStartedNanos = System.nanoTime();
|
long periodizeStartedNanos = System.nanoTime();
|
||||||
EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation evaluation = operatingPeriodEngine.evaluate(
|
EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation evaluation = operatingPeriodEngine.evaluate(
|
||||||
evaluationLoadedIntervals,
|
evaluationLoadedIntervals,
|
||||||
splitIdleThreshold
|
splitIdleThreshold,
|
||||||
|
engineMode
|
||||||
);
|
);
|
||||||
long periodizeElapsedMs = elapsedMillis(periodizeStartedNanos);
|
long periodizeElapsedMs = elapsedMillis(periodizeStartedNanos);
|
||||||
|
|
||||||
|
|
@ -150,7 +153,7 @@ public class EsperOperatingPeriodEvaluationService {
|
||||||
);
|
);
|
||||||
long totalElapsedMs = elapsedMillis(startedNanos);
|
long totalElapsedMs = elapsedMillis(startedNanos);
|
||||||
|
|
||||||
log.info("Esper operating-period evaluation tenant={} driverId={} requestedFrom={} requestedTo={} loadedFrom={} loadedTo={} unknownMode={} rawEvents={} cardRawEvents={} vuRawEvents={} cardIntervals={} vuIntervals={} resolvedKnownIntervals={} evaluationIntervals={} periodizedIntervals={} mergedIntervals={} nonDrivingIntervals={} operatingPeriods={} timingsMs={{dbRetrieve={}, cardIntervalEsper={}, vuIntervalEsper={}, vuGapFill={}, synthUnknown={}, periodizeEsper={}, merge={}, nonDriving={}, total={}}}",
|
log.info("Esper operating-period evaluation tenant={} driverId={} requestedFrom={} requestedTo={} loadedFrom={} loadedTo={} unknownMode={} engineMode={} rawEvents={} cardRawEvents={} vuRawEvents={} cardIntervals={} vuIntervals={} resolvedKnownIntervals={} evaluationIntervals={} periodizedIntervals={} mergedIntervals={} nonDrivingIntervals={} operatingPeriods={} timingsMs={{dbRetrieve={}, cardIntervalEsper={}, vuIntervalEsper={}, vuGapFill={}, synthUnknown={}, periodizeEsper={}, merge={}, nonDriving={}, total={}}}",
|
||||||
request.tenantKey(),
|
request.tenantKey(),
|
||||||
request.driverId(),
|
request.driverId(),
|
||||||
requestedFrom,
|
requestedFrom,
|
||||||
|
|
@ -158,6 +161,7 @@ public class EsperOperatingPeriodEvaluationService {
|
||||||
loadedFrom,
|
loadedFrom,
|
||||||
loadedTo,
|
loadedTo,
|
||||||
unknownTreatmentMode,
|
unknownTreatmentMode,
|
||||||
|
engineMode,
|
||||||
rawEvents.size(),
|
rawEvents.size(),
|
||||||
driverCardRawEvents.size(),
|
driverCardRawEvents.size(),
|
||||||
vehicleUnitRawEvents.size(),
|
vehicleUnitRawEvents.size(),
|
||||||
|
|
@ -202,6 +206,7 @@ public class EsperOperatingPeriodEvaluationService {
|
||||||
resolveMergeGapSeconds(request),
|
resolveMergeGapSeconds(request),
|
||||||
resolveGapDetectionToleranceSeconds(request),
|
resolveGapDetectionToleranceSeconds(request),
|
||||||
unknownTreatmentMode,
|
unknownTreatmentMode,
|
||||||
|
engineMode,
|
||||||
rawEvents,
|
rawEvents,
|
||||||
resolvedKnownLoadedIntervals,
|
resolvedKnownLoadedIntervals,
|
||||||
evaluationLoadedIntervals,
|
evaluationLoadedIntervals,
|
||||||
|
|
@ -210,6 +215,7 @@ public class EsperOperatingPeriodEvaluationService {
|
||||||
nonDrivingIntervals,
|
nonDrivingIntervals,
|
||||||
operatingPeriods,
|
operatingPeriods,
|
||||||
notes(
|
notes(
|
||||||
|
engineMode,
|
||||||
unknownTreatmentMode,
|
unknownTreatmentMode,
|
||||||
resolveOperatingSplitIdleHours(request),
|
resolveOperatingSplitIdleHours(request),
|
||||||
resolveSignificantDrivingMinutes(request),
|
resolveSignificantDrivingMinutes(request),
|
||||||
|
|
@ -708,7 +714,17 @@ public class EsperOperatingPeriodEvaluationService {
|
||||||
: properties.getEsperPoc().getOperatingPeriodEvaluation().getUnknownTreatmentMode();
|
: properties.getEsperPoc().getOperatingPeriodEvaluation().getUnknownTreatmentMode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private EsperOperatingPeriodEngineMode resolveEngineMode(EsperOperatingPeriodRequest request) {
|
||||||
|
if (request.engineMode() != null) {
|
||||||
|
return request.engineMode();
|
||||||
|
}
|
||||||
|
return properties == null
|
||||||
|
? EsperOperatingPeriodEngineMode.STREAM_COLLECTOR
|
||||||
|
: properties.getEsperPoc().getOperatingPeriodEvaluation().getEngineMode();
|
||||||
|
}
|
||||||
|
|
||||||
private List<String> notes(
|
private List<String> notes(
|
||||||
|
EsperOperatingPeriodEngineMode engineMode,
|
||||||
EsperUnknownTreatmentMode unknownTreatmentMode,
|
EsperUnknownTreatmentMode unknownTreatmentMode,
|
||||||
int operatingSplitIdleHours,
|
int operatingSplitIdleHours,
|
||||||
int significantDrivingMinutes,
|
int significantDrivingMinutes,
|
||||||
|
|
@ -719,6 +735,7 @@ public class EsperOperatingPeriodEvaluationService {
|
||||||
"BREAK_REST events are ignored for activity evaluation but still prevent synthetic UNKNOWN intervals from being created over covered rest spans.",
|
"BREAK_REST events are ignored for activity evaluation but still prevent synthetic UNKNOWN intervals from being created over covered rest spans.",
|
||||||
"Synthetic UNKNOWN intervals are created only for uncovered gaps between non-rest activities.",
|
"Synthetic UNKNOWN intervals are created only for uncovered gaps between non-rest activities.",
|
||||||
"UNKNOWN treatment mode is " + unknownTreatmentMode + ".",
|
"UNKNOWN treatment mode is " + unknownTreatmentMode + ".",
|
||||||
|
"Operating-period engine mode is " + engineMode + ".",
|
||||||
"Operating periods split after " + operatingSplitIdleHours + " hours of no non-rest activity; significant driving closes non-driving intervals from " + significantDrivingMinutes + " minutes onward.",
|
"Operating periods split after " + operatingSplitIdleHours + " hours of no non-rest activity; significant driving closes non-driving intervals from " + significantDrivingMinutes + " minutes onward.",
|
||||||
"Synthetic UNKNOWN gaps are only emitted when uncovered time exceeds " + gapDetectionToleranceSeconds + " seconds."
|
"Synthetic UNKNOWN gaps are only emitted when uncovered time exceeds " + gapDetectionToleranceSeconds + " seconds."
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,355 @@
|
||||||
|
create variable long operatingSplitIdleMs = ${operatingSplitIdleMs};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Full-EPL operating-period state machine.
|
||||||
|
*
|
||||||
|
* Input contract:
|
||||||
|
* - Java sends already-resolved intervals, not raw START/END boundaries.
|
||||||
|
* - DRIVER_CARD remains authoritative and VU is only used to fill uncovered gaps before events reach this EPL.
|
||||||
|
* - Synthetic uncovered gaps arrive as activityType = 'UNKNOWN'.
|
||||||
|
*
|
||||||
|
* Output contract:
|
||||||
|
* - PeriodizedActivityInterval: every input interval assigned to an operating period
|
||||||
|
* - OperatingPeriodClosed: closed operating periods, including the final FLUSH period
|
||||||
|
*/
|
||||||
|
|
||||||
|
create schema KnownOperatingInput(
|
||||||
|
driverId java.util.UUID,
|
||||||
|
vehicleId java.util.UUID,
|
||||||
|
vehicleRegistrationId java.util.UUID,
|
||||||
|
activityType string,
|
||||||
|
cardSlot string,
|
||||||
|
cardStatus string,
|
||||||
|
drivingStatus string,
|
||||||
|
sourceKind string,
|
||||||
|
startTs long,
|
||||||
|
endTs long,
|
||||||
|
durationMs long,
|
||||||
|
sourceRowId string,
|
||||||
|
sourceRowIds java.util.List,
|
||||||
|
clippedToRequestedPeriod boolean,
|
||||||
|
synthetic boolean
|
||||||
|
);
|
||||||
|
|
||||||
|
create schema UnknownOperatingInput(
|
||||||
|
driverId java.util.UUID,
|
||||||
|
vehicleId java.util.UUID,
|
||||||
|
vehicleRegistrationId java.util.UUID,
|
||||||
|
activityType string,
|
||||||
|
cardSlot string,
|
||||||
|
cardStatus string,
|
||||||
|
drivingStatus string,
|
||||||
|
sourceKind string,
|
||||||
|
startTs long,
|
||||||
|
endTs long,
|
||||||
|
durationMs long,
|
||||||
|
sourceRowId string,
|
||||||
|
sourceRowIds java.util.List,
|
||||||
|
clippedToRequestedPeriod boolean,
|
||||||
|
synthetic boolean
|
||||||
|
);
|
||||||
|
|
||||||
|
create schema PeriodizedActivityInterval(
|
||||||
|
driverId java.util.UUID,
|
||||||
|
vehicleId java.util.UUID,
|
||||||
|
vehicleRegistrationId java.util.UUID,
|
||||||
|
activityType string,
|
||||||
|
cardSlot string,
|
||||||
|
cardStatus string,
|
||||||
|
drivingStatus string,
|
||||||
|
sourceKind string,
|
||||||
|
startedAtTs long,
|
||||||
|
endedAtTs long,
|
||||||
|
durationMs long,
|
||||||
|
sourceRowId string,
|
||||||
|
sourceRowIds java.util.List,
|
||||||
|
clippedToRequestedPeriod boolean,
|
||||||
|
operatingPeriodNo long,
|
||||||
|
operatingPeriodStartedAtTs long,
|
||||||
|
newOperatingPeriod boolean,
|
||||||
|
gapSincePreviousActivityMs java.lang.Long,
|
||||||
|
synthetic boolean
|
||||||
|
);
|
||||||
|
|
||||||
|
create schema OperatingPeriodClosed(
|
||||||
|
driverId java.util.UUID,
|
||||||
|
operatingPeriodNo long,
|
||||||
|
operatingPeriodStartedAtTs long,
|
||||||
|
operatingPeriodEndedAtTs long,
|
||||||
|
durationMs long,
|
||||||
|
closedBy string
|
||||||
|
);
|
||||||
|
|
||||||
|
create window OperatingPeriodState#unique(driverId) as (
|
||||||
|
driverId java.util.UUID,
|
||||||
|
hasOpen boolean,
|
||||||
|
operatingPeriodNo long,
|
||||||
|
operatingPeriodStartedAtTs long,
|
||||||
|
lastKnownActivityEndTs long
|
||||||
|
);
|
||||||
|
|
||||||
|
/* Split the timeline into known activities and synthetic UNKNOWN gaps. */
|
||||||
|
insert into KnownOperatingInput
|
||||||
|
select * from OperatingPeriodInputMap as i where i.activityType != 'UNKNOWN';
|
||||||
|
|
||||||
|
insert into UnknownOperatingInput
|
||||||
|
select * from OperatingPeriodInputMap as i where i.activityType = 'UNKNOWN';
|
||||||
|
|
||||||
|
/* First known activity for a driver opens operating period 1. */
|
||||||
|
@Priority(200)
|
||||||
|
on KnownOperatingInput as i
|
||||||
|
insert into PeriodizedActivityInterval
|
||||||
|
select
|
||||||
|
i.driverId as driverId,
|
||||||
|
i.vehicleId as vehicleId,
|
||||||
|
i.vehicleRegistrationId as vehicleRegistrationId,
|
||||||
|
i.activityType as activityType,
|
||||||
|
i.cardSlot as cardSlot,
|
||||||
|
i.cardStatus as cardStatus,
|
||||||
|
i.drivingStatus as drivingStatus,
|
||||||
|
i.sourceKind as sourceKind,
|
||||||
|
i.startTs as startedAtTs,
|
||||||
|
i.endTs as endedAtTs,
|
||||||
|
i.durationMs as durationMs,
|
||||||
|
i.sourceRowId as sourceRowId,
|
||||||
|
i.sourceRowIds as sourceRowIds,
|
||||||
|
i.clippedToRequestedPeriod as clippedToRequestedPeriod,
|
||||||
|
1L as operatingPeriodNo,
|
||||||
|
i.startTs as operatingPeriodStartedAtTs,
|
||||||
|
true as newOperatingPeriod,
|
||||||
|
cast(null, java.lang.Long) as gapSincePreviousActivityMs,
|
||||||
|
i.synthetic as synthetic
|
||||||
|
where not exists (select * from OperatingPeriodState as s where s.driverId = i.driverId);
|
||||||
|
|
||||||
|
@Priority(190)
|
||||||
|
on KnownOperatingInput as i
|
||||||
|
insert into OperatingPeriodState
|
||||||
|
select
|
||||||
|
i.driverId as driverId,
|
||||||
|
true as hasOpen,
|
||||||
|
1L as operatingPeriodNo,
|
||||||
|
i.startTs as operatingPeriodStartedAtTs,
|
||||||
|
i.endTs as lastKnownActivityEndTs
|
||||||
|
where not exists (select * from OperatingPeriodState as s where s.driverId = i.driverId);
|
||||||
|
|
||||||
|
/* A long forward gap between known activities closes the current period with reason IDLE_GAP. */
|
||||||
|
@Priority(180)
|
||||||
|
on KnownOperatingInput as i
|
||||||
|
insert into OperatingPeriodClosed
|
||||||
|
select
|
||||||
|
s.driverId as driverId,
|
||||||
|
s.operatingPeriodNo as operatingPeriodNo,
|
||||||
|
s.operatingPeriodStartedAtTs as operatingPeriodStartedAtTs,
|
||||||
|
s.lastKnownActivityEndTs as operatingPeriodEndedAtTs,
|
||||||
|
s.lastKnownActivityEndTs - s.operatingPeriodStartedAtTs as durationMs,
|
||||||
|
'IDLE_GAP' as closedBy
|
||||||
|
from OperatingPeriodState as s
|
||||||
|
where s.driverId = i.driverId
|
||||||
|
and s.hasOpen = true
|
||||||
|
and i.startTs - s.lastKnownActivityEndTs >= operatingSplitIdleMs;
|
||||||
|
|
||||||
|
/* After a long idle gap, the next known interval is emitted as the first interval of the next period. */
|
||||||
|
@Priority(170)
|
||||||
|
on KnownOperatingInput as i
|
||||||
|
insert into PeriodizedActivityInterval
|
||||||
|
select
|
||||||
|
i.driverId as driverId,
|
||||||
|
i.vehicleId as vehicleId,
|
||||||
|
i.vehicleRegistrationId as vehicleRegistrationId,
|
||||||
|
i.activityType as activityType,
|
||||||
|
i.cardSlot as cardSlot,
|
||||||
|
i.cardStatus as cardStatus,
|
||||||
|
i.drivingStatus as drivingStatus,
|
||||||
|
i.sourceKind as sourceKind,
|
||||||
|
i.startTs as startedAtTs,
|
||||||
|
i.endTs as endedAtTs,
|
||||||
|
i.durationMs as durationMs,
|
||||||
|
i.sourceRowId as sourceRowId,
|
||||||
|
i.sourceRowIds as sourceRowIds,
|
||||||
|
i.clippedToRequestedPeriod as clippedToRequestedPeriod,
|
||||||
|
s.operatingPeriodNo + 1 as operatingPeriodNo,
|
||||||
|
i.startTs as operatingPeriodStartedAtTs,
|
||||||
|
true as newOperatingPeriod,
|
||||||
|
cast(i.startTs - s.lastKnownActivityEndTs, java.lang.Long) as gapSincePreviousActivityMs,
|
||||||
|
i.synthetic as synthetic
|
||||||
|
from OperatingPeriodState as s
|
||||||
|
where s.driverId = i.driverId
|
||||||
|
and s.hasOpen = true
|
||||||
|
and i.startTs - s.lastKnownActivityEndTs >= operatingSplitIdleMs;
|
||||||
|
|
||||||
|
/* Update the window state to the newly opened period after an IDLE_GAP close. */
|
||||||
|
@Priority(160)
|
||||||
|
on KnownOperatingInput as i
|
||||||
|
update OperatingPeriodState as s
|
||||||
|
set
|
||||||
|
hasOpen = true,
|
||||||
|
operatingPeriodNo = s.operatingPeriodNo + 1,
|
||||||
|
operatingPeriodStartedAtTs = i.startTs,
|
||||||
|
lastKnownActivityEndTs = i.endTs
|
||||||
|
where s.driverId = i.driverId
|
||||||
|
and s.hasOpen = true
|
||||||
|
and i.startTs - s.lastKnownActivityEndTs >= operatingSplitIdleMs;
|
||||||
|
|
||||||
|
/* After a long UNKNOWN gap we keep the counter but mark the period closed. The next known activity reopens
|
||||||
|
* the next period number using the retained state row. */
|
||||||
|
@Priority(155)
|
||||||
|
on KnownOperatingInput as i
|
||||||
|
insert into PeriodizedActivityInterval
|
||||||
|
select
|
||||||
|
i.driverId as driverId,
|
||||||
|
i.vehicleId as vehicleId,
|
||||||
|
i.vehicleRegistrationId as vehicleRegistrationId,
|
||||||
|
i.activityType as activityType,
|
||||||
|
i.cardSlot as cardSlot,
|
||||||
|
i.cardStatus as cardStatus,
|
||||||
|
i.drivingStatus as drivingStatus,
|
||||||
|
i.sourceKind as sourceKind,
|
||||||
|
i.startTs as startedAtTs,
|
||||||
|
i.endTs as endedAtTs,
|
||||||
|
i.durationMs as durationMs,
|
||||||
|
i.sourceRowId as sourceRowId,
|
||||||
|
i.sourceRowIds as sourceRowIds,
|
||||||
|
i.clippedToRequestedPeriod as clippedToRequestedPeriod,
|
||||||
|
s.operatingPeriodNo + 1 as operatingPeriodNo,
|
||||||
|
i.startTs as operatingPeriodStartedAtTs,
|
||||||
|
true as newOperatingPeriod,
|
||||||
|
cast(null, java.lang.Long) as gapSincePreviousActivityMs,
|
||||||
|
i.synthetic as synthetic
|
||||||
|
from OperatingPeriodState as s
|
||||||
|
where s.driverId = i.driverId
|
||||||
|
and s.hasOpen = false;
|
||||||
|
|
||||||
|
@Priority(145)
|
||||||
|
on KnownOperatingInput as i
|
||||||
|
update OperatingPeriodState as s
|
||||||
|
set
|
||||||
|
hasOpen = true,
|
||||||
|
operatingPeriodNo = s.operatingPeriodNo + 1,
|
||||||
|
operatingPeriodStartedAtTs = i.startTs,
|
||||||
|
lastKnownActivityEndTs = i.endTs
|
||||||
|
where s.driverId = i.driverId
|
||||||
|
and s.hasOpen = false;
|
||||||
|
|
||||||
|
/* Normal same-period continuity: the gap is forward, non-negative, and still below the split threshold. */
|
||||||
|
@Priority(150)
|
||||||
|
on KnownOperatingInput as i
|
||||||
|
insert into PeriodizedActivityInterval
|
||||||
|
select
|
||||||
|
i.driverId as driverId,
|
||||||
|
i.vehicleId as vehicleId,
|
||||||
|
i.vehicleRegistrationId as vehicleRegistrationId,
|
||||||
|
i.activityType as activityType,
|
||||||
|
i.cardSlot as cardSlot,
|
||||||
|
i.cardStatus as cardStatus,
|
||||||
|
i.drivingStatus as drivingStatus,
|
||||||
|
i.sourceKind as sourceKind,
|
||||||
|
i.startTs as startedAtTs,
|
||||||
|
i.endTs as endedAtTs,
|
||||||
|
i.durationMs as durationMs,
|
||||||
|
i.sourceRowId as sourceRowId,
|
||||||
|
i.sourceRowIds as sourceRowIds,
|
||||||
|
i.clippedToRequestedPeriod as clippedToRequestedPeriod,
|
||||||
|
s.operatingPeriodNo as operatingPeriodNo,
|
||||||
|
s.operatingPeriodStartedAtTs as operatingPeriodStartedAtTs,
|
||||||
|
false as newOperatingPeriod,
|
||||||
|
cast(i.startTs - s.lastKnownActivityEndTs, java.lang.Long) as gapSincePreviousActivityMs,
|
||||||
|
i.synthetic as synthetic
|
||||||
|
from OperatingPeriodState as s
|
||||||
|
where s.driverId = i.driverId
|
||||||
|
and s.hasOpen = true
|
||||||
|
and i.startTs - s.lastKnownActivityEndTs >= 0
|
||||||
|
and i.startTs - s.lastKnownActivityEndTs < operatingSplitIdleMs;
|
||||||
|
|
||||||
|
@Priority(140)
|
||||||
|
on KnownOperatingInput as i
|
||||||
|
update OperatingPeriodState as s
|
||||||
|
set
|
||||||
|
lastKnownActivityEndTs = case
|
||||||
|
when i.endTs > s.lastKnownActivityEndTs then i.endTs
|
||||||
|
else s.lastKnownActivityEndTs
|
||||||
|
end
|
||||||
|
where s.driverId = i.driverId
|
||||||
|
and s.hasOpen = true
|
||||||
|
and i.startTs - s.lastKnownActivityEndTs >= 0
|
||||||
|
and i.startTs - s.lastKnownActivityEndTs < operatingSplitIdleMs;
|
||||||
|
|
||||||
|
/* A long UNKNOWN interval closes the current period, but the state row remains so the next known activity
|
||||||
|
* can reopen with the next period number. */
|
||||||
|
@Priority(130)
|
||||||
|
on UnknownOperatingInput as i
|
||||||
|
insert into OperatingPeriodClosed
|
||||||
|
select
|
||||||
|
s.driverId as driverId,
|
||||||
|
s.operatingPeriodNo as operatingPeriodNo,
|
||||||
|
s.operatingPeriodStartedAtTs as operatingPeriodStartedAtTs,
|
||||||
|
s.lastKnownActivityEndTs as operatingPeriodEndedAtTs,
|
||||||
|
s.lastKnownActivityEndTs - s.operatingPeriodStartedAtTs as durationMs,
|
||||||
|
'UNKNOWN_GAP' as closedBy
|
||||||
|
from OperatingPeriodState as s
|
||||||
|
where s.driverId = i.driverId
|
||||||
|
and s.hasOpen = true
|
||||||
|
and i.durationMs >= operatingSplitIdleMs;
|
||||||
|
|
||||||
|
@Priority(120)
|
||||||
|
on UnknownOperatingInput as i
|
||||||
|
update OperatingPeriodState as s
|
||||||
|
set hasOpen = false
|
||||||
|
where s.driverId = i.driverId
|
||||||
|
and s.hasOpen = true
|
||||||
|
and i.durationMs >= operatingSplitIdleMs;
|
||||||
|
|
||||||
|
/* Short UNKNOWN stays inside the open period as explicit uncertainty, without changing period state. */
|
||||||
|
@Priority(110)
|
||||||
|
on UnknownOperatingInput as i
|
||||||
|
insert into PeriodizedActivityInterval
|
||||||
|
select
|
||||||
|
i.driverId as driverId,
|
||||||
|
i.vehicleId as vehicleId,
|
||||||
|
i.vehicleRegistrationId as vehicleRegistrationId,
|
||||||
|
i.activityType as activityType,
|
||||||
|
i.cardSlot as cardSlot,
|
||||||
|
i.cardStatus as cardStatus,
|
||||||
|
i.drivingStatus as drivingStatus,
|
||||||
|
i.sourceKind as sourceKind,
|
||||||
|
i.startTs as startedAtTs,
|
||||||
|
i.endTs as endedAtTs,
|
||||||
|
i.durationMs as durationMs,
|
||||||
|
i.sourceRowId as sourceRowId,
|
||||||
|
i.sourceRowIds as sourceRowIds,
|
||||||
|
i.clippedToRequestedPeriod as clippedToRequestedPeriod,
|
||||||
|
s.operatingPeriodNo as operatingPeriodNo,
|
||||||
|
s.operatingPeriodStartedAtTs as operatingPeriodStartedAtTs,
|
||||||
|
false as newOperatingPeriod,
|
||||||
|
cast(i.startTs - s.lastKnownActivityEndTs, java.lang.Long) as gapSincePreviousActivityMs,
|
||||||
|
i.synthetic as synthetic
|
||||||
|
from OperatingPeriodState as s
|
||||||
|
where s.driverId = i.driverId
|
||||||
|
and s.hasOpen = true
|
||||||
|
and i.durationMs < operatingSplitIdleMs;
|
||||||
|
|
||||||
|
/* Historical evaluation ends with a flush event so the final still-open period is emitted. */
|
||||||
|
@Priority(100)
|
||||||
|
on OperatingPeriodFlushEvent as f
|
||||||
|
insert into OperatingPeriodClosed
|
||||||
|
select
|
||||||
|
s.driverId as driverId,
|
||||||
|
s.operatingPeriodNo as operatingPeriodNo,
|
||||||
|
s.operatingPeriodStartedAtTs as operatingPeriodStartedAtTs,
|
||||||
|
s.lastKnownActivityEndTs as operatingPeriodEndedAtTs,
|
||||||
|
s.lastKnownActivityEndTs - s.operatingPeriodStartedAtTs as durationMs,
|
||||||
|
'FLUSH' as closedBy
|
||||||
|
from OperatingPeriodState as s
|
||||||
|
where s.hasOpen = true;
|
||||||
|
|
||||||
|
@Priority(90)
|
||||||
|
on OperatingPeriodFlushEvent as f
|
||||||
|
update OperatingPeriodState as s
|
||||||
|
set hasOpen = false
|
||||||
|
where s.hasOpen = true;
|
||||||
|
|
||||||
|
/* Listener-facing output statements consumed by Java. */
|
||||||
|
@name('periodizedActivityIntervals')
|
||||||
|
select * from PeriodizedActivityInterval;
|
||||||
|
|
||||||
|
@name('operatingPeriodClosed')
|
||||||
|
select * from OperatingPeriodClosed;
|
||||||
|
|
@ -3,6 +3,7 @@ package at.procon.eventhub.esperpoc.service;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
|
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
|
||||||
|
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode;
|
||||||
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
|
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
|
||||||
import at.procon.eventhub.esperpoc.dto.NonDrivingIntervalDto;
|
import at.procon.eventhub.esperpoc.dto.NonDrivingIntervalDto;
|
||||||
import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto;
|
import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto;
|
||||||
|
|
@ -53,7 +54,8 @@ class EsperOperatingPeriodEvaluationServiceTest {
|
||||||
|
|
||||||
EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation evaluation = operatingPeriodEngine.evaluate(
|
EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation evaluation = operatingPeriodEngine.evaluate(
|
||||||
evaluationIntervals,
|
evaluationIntervals,
|
||||||
Duration.ofHours(7)
|
Duration.ofHours(7),
|
||||||
|
EsperOperatingPeriodEngineMode.STREAM_COLLECTOR
|
||||||
);
|
);
|
||||||
|
|
||||||
assertThat(evaluation.periodizedIntervals()).extracting(OperatingPeriodActivityIntervalDto::activityType)
|
assertThat(evaluation.periodizedIntervals()).extracting(OperatingPeriodActivityIntervalDto::activityType)
|
||||||
|
|
@ -99,6 +101,37 @@ class EsperOperatingPeriodEvaluationServiceTest {
|
||||||
assertThat(nonDrivingIntervals.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T09:30:00Z"));
|
assertThat(nonDrivingIntervals.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T09:30:00Z"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void fullEplModeMatchesStreamCollectorMode() {
|
||||||
|
UUID driverId = UUID.randomUUID();
|
||||||
|
List<ActivityIntervalDto> evaluationIntervals = List.of(
|
||||||
|
activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1", "DRIVER_CARD"),
|
||||||
|
activity(driverId, "AVAILABILITY", "2026-04-01T10:00:00Z", "2026-04-01T11:00:00Z", "a1", "DRIVER_CARD"),
|
||||||
|
unknown(driverId, "2026-04-01T11:00:00Z", "2026-04-01T11:30:00Z"),
|
||||||
|
activity(driverId, "WORK", "2026-04-01T11:30:00Z", "2026-04-01T12:00:00Z", "w2", "DRIVER_CARD"),
|
||||||
|
unknown(driverId, "2026-04-01T12:00:00Z", "2026-04-01T20:00:00Z"),
|
||||||
|
activity(driverId, "DRIVE", "2026-04-01T20:00:00Z", "2026-04-01T20:30:00Z", "d1", "DRIVER_CARD")
|
||||||
|
);
|
||||||
|
|
||||||
|
EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation collectorEvaluation = operatingPeriodEngine.evaluate(
|
||||||
|
evaluationIntervals,
|
||||||
|
Duration.ofHours(7),
|
||||||
|
EsperOperatingPeriodEngineMode.STREAM_COLLECTOR
|
||||||
|
);
|
||||||
|
EsperOperatingPeriodEngine.EsperOperatingPeriodEvaluation fullEplEvaluation = operatingPeriodEngine.evaluate(
|
||||||
|
evaluationIntervals,
|
||||||
|
Duration.ofHours(7),
|
||||||
|
EsperOperatingPeriodEngineMode.FULL_EPL
|
||||||
|
);
|
||||||
|
|
||||||
|
assertThat(fullEplEvaluation.periodizedIntervals())
|
||||||
|
.usingRecursiveComparison()
|
||||||
|
.isEqualTo(collectorEvaluation.periodizedIntervals());
|
||||||
|
assertThat(fullEplEvaluation.closedPeriods())
|
||||||
|
.usingRecursiveComparison()
|
||||||
|
.isEqualTo(collectorEvaluation.closedPeriods());
|
||||||
|
}
|
||||||
|
|
||||||
private ActivityIntervalDto activity(
|
private ActivityIntervalDto activity(
|
||||||
UUID driverId,
|
UUID driverId,
|
||||||
String activity,
|
String activity,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue