Refine reusable driver working time projections

This commit is contained in:
trifonovt 2026-06-15 13:14:43 +02:00
parent cdec89aa69
commit e45fe29d3f
5 changed files with 284 additions and 49 deletions

View File

@ -1,25 +1,60 @@
# Fix: cardAbsentCoveragePercent above 100% on reused Esper runtime
# Patch: Reusable Esper runtime execution-state cleanup
## Root cause
## Purpose
`DriverWorkingTimeReusableProjectionBuilder` pools Esper runtimes. The EPL used
`VuCardAbsentInterval#keepall` as a statement-local data window, but the runtime
cleanup did not clear that retained state before the next execution.
Prevent retained Esper state from leaking between pipeline executions when `DriverWorkingTimeReusableProjectionBuilder` reuses a pooled runtime.
When the same pooled runtime processed a second request, the previous execution's
card-absent intervals remained in the overlap calculation. New intervals were
added again, so `cardAbsentDurationSeconds` was doubled while the output listener
still reported only the newly emitted `VuCardAbsentInterval` events.
## Runtime lifecycle
This is source-independent. It appeared in the DB result because that request was
executed after the file-session request on the same pooled runtime.
A pooled runtime now follows this lifecycle:
## Changes
```text
acquire runtime
-> clear execution state before sending events
-> execute the complete derived-projection module
-> detach result listeners
-> clear execution state again in finally
-> return only a clean runtime to the pool
```
- Added public named window `VuCardAbsentIntervalWindow#keepall`.
- Routed generated `VuCardAbsentInterval` events into the named window.
- Changed rest-coverage overlap calculations to read from the named window.
- Added `delete from VuCardAbsentIntervalWindow` to reusable-runtime cleanup.
- Applied the same EPL structure to the legacy/reference projection bundle.
- Added a regression test that executes the same input twice on the same builder
and verifies coverage does not double and remains at or below 100%.
If execution or cleanup fails, the runtime is marked unsafe and destroyed instead of being pooled.
## Resettable input retention
The statement-local usages of:
```epl
TachographVehicleUsageIntervalInputEvent#keepall
```
have been replaced with the public named window:
```epl
TachographVehicleUsageIntervalInputWindow#keepall
```
The window is populated from `TachographVehicleUsageIntervalInputEvent` and is part of the cleanup contract. This prevents odometer and vehicle-usage evidence from a previous execution from participating in a later execution.
The earlier `VuCardAbsentIntervalWindow` fix is retained.
## Cleanup contract
All public named windows in `driver-working-time-derived-projections.epl`, including the context-scoped `PreviousVehicleUsageInterval`, have corresponding fire-and-forget delete queries.
A regression test scans the EPL and fails when a newly introduced public named window is not added to the cleanup contract.
## Metrics
Runtime logging now reports separate values:
- `runtimeResetBeforeMs`
- `runtimeResetAfterMs`
## Tests
Added/retained tests for:
- identical results when a warm runtime is reused;
- no doubled card-absent coverage and coverage not exceeding 100%;
- no retained vehicle-usage/odometer evidence in the next execution;
- cleanup-query coverage for every public named window.

View File

@ -58,7 +58,8 @@ public class DriverWorkingTimeReusableProjectionBuilder {
private static final Map<String, Object> VEHICLE_USAGE_INTERVAL_INPUT_DEFINITION = vehicleUsageIntervalInputDefinitionStatic();
private static final Map<String, Object> SUPPORT_GEO_EVIDENCE_INPUT_DEFINITION = supportGeoEvidenceInputDefinitionStatic();
private static final int MAX_IDLE_RUNTIMES_PER_DEFINITION = 2;
private static final List<String> REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES = List.of(
static final List<String> REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES = List.of(
"delete from TachographVehicleUsageIntervalInputWindow",
"delete from PreviousRestCandidateCoverageInterval",
"delete from OpenPotentialInVehicleTripState",
"delete from SupportGeoEvidenceWindow",
@ -186,7 +187,7 @@ public class DriverWorkingTimeReusableProjectionBuilder {
long sortOutputMs = elapsedMillis(sortOutputStartedAtNanos);
LOG.info(
"Driver working-time derived projection bundle built in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimePoolHit: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, runtimeResetMs: {}, sendSupportGeoMs: {}, sendVehicleUsageMs: {}, sendActivityMs: {}, destroyMs: {}, sortOutputMs: {}, activityInputEvents: {}, vehicleUsageInputEvents: {}, supportGeoInputEvents: {})",
"Driver working-time derived projection bundle built in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimePoolHit: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, runtimeResetBeforeMs: {}, runtimeResetAfterMs: {}, sendSupportGeoMs: {}, sendVehicleUsageMs: {}, sendActivityMs: {}, destroyMs: {}, sortOutputMs: {}, activityInputEvents: {}, vehicleUsageInputEvents: {}, supportGeoInputEvents: {})",
elapsedMillis(startedAtNanos),
runtimeMetrics.definitionCacheHit(),
runtimeMetrics.definitionPreparationMs(),
@ -194,7 +195,8 @@ public class DriverWorkingTimeReusableProjectionBuilder {
runtimeMetrics.runtimeInitMs(),
runtimeMetrics.deployMs(),
runtimeMetrics.listenerRegistrationMs(),
runtimeMetrics.runtimeResetMs(),
runtimeMetrics.runtimeResetBeforeMs(),
runtimeMetrics.runtimeResetAfterMs(),
runtimeMetrics.sendSupportGeoMs(),
runtimeMetrics.sendVehicleUsageMs(),
runtimeMetrics.sendActivityMs(),
@ -288,7 +290,8 @@ public class DriverWorkingTimeReusableProjectionBuilder {
long runtimeInitMs = 0L;
long deployMs = 0L;
long listenerRegistrationMs = 0L;
long runtimeResetMs = 0L;
long runtimeResetBeforeMs = 0L;
long runtimeResetAfterMs = 0L;
long sendSupportGeoMs = 0L;
long sendVehicleUsageMs = 0L;
long sendActivityMs = 0L;
@ -330,14 +333,15 @@ public class DriverWorkingTimeReusableProjectionBuilder {
listenerRegistrationMs = reusableRuntime.listenerRegistrationMs();
ReusableProjectionRuntimeExecution execution = reusableRuntime.execute(listeners, sender);
runtimeResetMs = execution.runtimeResetMs();
runtimeResetBeforeMs = execution.runtimeResetBeforeMs();
runtimeResetAfterMs = execution.runtimeResetAfterMs();
sendSupportGeoMs = execution.sendSupportGeoMs();
sendVehicleUsageMs = execution.sendVehicleUsageMs();
sendActivityMs = execution.sendActivityMs();
} catch (EPCompileException | EPDeployException e) {
discardRuntime = true;
throw new IllegalStateException("Cannot compile/deploy reusable driver working-time projection EPL bundle", e);
} catch (RuntimeException e) {
} catch (RuntimeException | Error e) {
discardRuntime = true;
throw e;
} finally {
@ -356,7 +360,8 @@ public class DriverWorkingTimeReusableProjectionBuilder {
runtimeInitMs,
deployMs,
listenerRegistrationMs,
runtimeResetMs,
runtimeResetBeforeMs,
runtimeResetAfterMs,
sendSupportGeoMs,
sendVehicleUsageMs,
sendActivityMs,
@ -973,7 +978,8 @@ public class DriverWorkingTimeReusableProjectionBuilder {
long runtimeInitMs,
long deployMs,
long listenerRegistrationMs,
long runtimeResetMs,
long runtimeResetBeforeMs,
long runtimeResetAfterMs,
long sendSupportGeoMs,
long sendVehicleUsageMs,
long sendActivityMs,
@ -1000,6 +1006,9 @@ public class DriverWorkingTimeReusableProjectionBuilder {
if (runtime == null) {
return 0L;
}
if (!runtime.cleanForReuse()) {
return runtime.destroy();
}
synchronized (this) {
if (idleRuntimes.size() < MAX_IDLE_RUNTIMES_PER_DEFINITION) {
runtime.poolHit(false);
@ -1019,6 +1028,7 @@ public class DriverWorkingTimeReusableProjectionBuilder {
private volatile long listenerRegistrationMs;
private volatile List<EPCompiled> cleanupQueries = List.of();
private volatile boolean poolHit;
private volatile boolean cleanForReuse = true;
private ExecutionListeners currentExecutionListeners;
private ReusableProjectionRuntime(
@ -1037,24 +1047,48 @@ public class DriverWorkingTimeReusableProjectionBuilder {
Map<String, Consumer<EventBean[]>> listeners,
Consumer<DerivedProjectionEventSender> sender
) {
cleanForReuse = false;
currentExecutionListeners = new ExecutionListeners(listeners);
long runtimeResetStartedAtNanos = System.nanoTime();
for (EPCompiled cleanupQuery : cleanupQueries) {
runtime.getFireAndForgetService().executeQuery(cleanupQuery);
}
long runtimeResetMs = elapsedMillisStatic(runtimeResetStartedAtNanos);
try {
long runtimeResetBeforeMs = 0L;
long runtimeResetAfterMs = 0L;
DerivedProjectionEventSender timedSender = new DerivedProjectionEventSender(runtime);
Throwable executionFailure = null;
try {
runtimeResetBeforeMs = resetExecutionState();
sender.accept(timedSender);
} catch (RuntimeException | Error failure) {
executionFailure = failure;
throw failure;
} finally {
// Cleanup must not feed delete/old-data callbacks into result collectors.
currentExecutionListeners = null;
try {
runtimeResetAfterMs = resetExecutionState();
cleanForReuse = true;
} catch (RuntimeException | Error cleanupFailure) {
cleanForReuse = false;
if (executionFailure != null) {
executionFailure.addSuppressed(cleanupFailure);
} else {
throw cleanupFailure;
}
}
}
return new ReusableProjectionRuntimeExecution(
runtimeResetMs,
runtimeResetBeforeMs,
runtimeResetAfterMs,
timedSender.sendSupportGeoMs(),
timedSender.sendVehicleUsageMs(),
timedSender.sendActivityMs()
);
} finally {
currentExecutionListeners = null;
}
private long resetExecutionState() {
long runtimeResetStartedAtNanos = System.nanoTime();
for (EPCompiled cleanupQuery : cleanupQueries) {
runtime.getFireAndForgetService().executeQuery(cleanupQuery);
}
return elapsedMillisStatic(runtimeResetStartedAtNanos);
}
private void onStatementEvents(String statementName, EventBean[] newData) {
@ -1098,6 +1132,10 @@ public class DriverWorkingTimeReusableProjectionBuilder {
return poolHit;
}
private boolean cleanForReuse() {
return cleanForReuse;
}
private long runtimeInitMs() {
return poolHit ? 0L : runtimeInitMs;
}
@ -1120,7 +1158,8 @@ public class DriverWorkingTimeReusableProjectionBuilder {
}
private record ReusableProjectionRuntimeExecution(
long runtimeResetMs,
long runtimeResetBeforeMs,
long runtimeResetAfterMs,
long sendSupportGeoMs,
long sendVehicleUsageMs,
long sendActivityMs

View File

@ -290,6 +290,12 @@ create schema DailyWeeklyRestCandidateCoverageEmittedKey(
@public create context PerDriver partition by driverKey from TachographVehicleUsageIntervalInputEvent;
@public create window TachographVehicleUsageIntervalInputWindow#keepall as TachographVehicleUsageIntervalInputEvent;
insert into TachographVehicleUsageIntervalInputWindow
select *
from TachographVehicleUsageIntervalInputEvent;
create schema VuCardAbsentInterval(
sessionId java.util.UUID,
driverKey string,
@ -651,7 +657,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputEvent#keepall as v
TachographVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.odometerBeginKm is not null
and v.startedAtEpochSecond >= c.startedAtEpochSecond - ${REST_GEO_LOOKBACK_SECONDS}
@ -698,7 +704,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputEvent#keepall as v
TachographVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.endedAtEpochSecond is not null
and v.odometerEndKm is not null
@ -774,7 +780,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputEvent#keepall as v
TachographVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.odometerBeginKm is not null
and v.startedAtEpochSecond >= c.endedAtEpochSecond - ${REST_GEO_LOOKBACK_SECONDS}
@ -821,7 +827,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputEvent#keepall as v
TachographVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.endedAtEpochSecond is not null
and v.odometerEndKm is not null

View File

@ -282,6 +282,12 @@ create schema DailyWeeklyRestCandidateCoverageEmittedKey(
create context PerDriver partition by driverKey from TachographVehicleUsageIntervalInputEvent;
@public create window TachographVehicleUsageIntervalInputWindow#keepall as TachographVehicleUsageIntervalInputEvent;
insert into TachographVehicleUsageIntervalInputWindow
select *
from TachographVehicleUsageIntervalInputEvent;
create schema VuCardAbsentInterval(
sessionId java.util.UUID,
driverKey string,
@ -643,7 +649,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputEvent#keepall as v
TachographVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.odometerBeginKm is not null
and v.startedAtEpochSecond >= c.startedAtEpochSecond - ${REST_GEO_LOOKBACK_SECONDS}
@ -690,7 +696,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputEvent#keepall as v
TachographVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.endedAtEpochSecond is not null
and v.odometerEndKm is not null
@ -766,7 +772,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputEvent#keepall as v
TachographVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.odometerBeginKm is not null
and v.startedAtEpochSecond >= c.endedAtEpochSecond - ${REST_GEO_LOOKBACK_SECONDS}
@ -813,7 +819,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputEvent#keepall as v
TachographVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.endedAtEpochSecond is not null
and v.odometerEndKm is not null

View File

@ -7,9 +7,17 @@ import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeAc
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDerivedProjectionBundle;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeProcessingInput;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.StreamUtils;
import org.junit.jupiter.api.Test;
class DriverWorkingTimeReusableProjectionBuilderTest {
@ -237,4 +245,145 @@ class DriverWorkingTimeReusableProjectionBuilderTest {
.isLessThanOrEqualTo(100.0d);
}
@Test
void cleanupContractCoversEveryPublicNamedWindow() throws IOException {
String epl = StreamUtils.copyToString(
new ClassPathResource("esper/driver-working-time-derived-projections.epl").getInputStream(),
StandardCharsets.UTF_8
);
Matcher matcher = Pattern.compile(
"(?m)@public(?:\\s+context\\s+[A-Za-z0-9_]+)?\\s*create\\s+window\\s+([A-Za-z0-9_]+)"
).matcher(epl);
Set<String> publicNamedWindows = new LinkedHashSet<>();
while (matcher.find()) {
publicNamedWindows.add(matcher.group(1));
}
assertThat(publicNamedWindows).isNotEmpty();
for (String windowName : publicNamedWindows) {
assertThat(DriverWorkingTimeReusableProjectionBuilder.REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES)
.anyMatch(query -> query.contains("delete from " + windowName));
}
}
@Test
void clearsRetainedVehicleUsageInputBetweenExecutions() {
DriverWorkingTimeReusableProjectionBuilder builder =
new DriverWorkingTimeReusableProjectionBuilder(new EventHubProperties());
UUID sessionId = UUID.randomUUID();
OffsetDateTime from = OffsetDateTime.parse("2026-05-01T08:00:00Z");
OffsetDateTime firstDriveEnd = OffsetDateTime.parse("2026-05-01T09:00:00Z");
OffsetDateTime secondDriveStart = OffsetDateTime.parse("2026-05-01T12:00:00Z");
OffsetDateTime to = OffsetDateTime.parse("2026-05-01T13:00:00Z");
List<DriverWorkingTimeActivityInterval> activities = List.of(
new DriverWorkingTimeActivityInterval(
sessionId,
"12:123",
"ACT-1",
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"DRIVER_CARD",
"ACT-1",
"ACT-1",
from,
firstDriveEnd,
from.toEpochSecond(),
firstDriveEnd.toEpochSecond(),
firstDriveEnd.toEpochSecond() - from.toEpochSecond(),
List.of("ACT-1"),
false,
false,
"RAW_INTERVAL"
),
new DriverWorkingTimeActivityInterval(
sessionId,
"12:123",
"ACT-2",
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"DRIVER_CARD",
"ACT-2",
"ACT-2",
secondDriveStart,
to,
secondDriveStart.toEpochSecond(),
to.toEpochSecond(),
to.toEpochSecond() - secondDriveStart.toEpochSecond(),
List.of("ACT-2"),
false,
false,
"RAW_INTERVAL"
)
);
DriverWorkingTimeProcessingInput withVehicleUsage = new DriverWorkingTimeProcessingInput(
sessionId,
"12:123",
"DRIVER_CARD",
from,
to,
from,
to,
15,
30,
activities,
List.of(new DriverWorkingTimeVehicleUsageInterval(
sessionId,
"12:123",
"VU-1",
"VU-1",
"VU-1",
from,
to,
from.toEpochSecond(),
to.toEpochSecond(),
to.toEpochSecond() - from.toEpochSecond(),
100L,
200L,
"12:REG-1",
"VIN-1",
"DRIVER_CARD",
List.of("VU-1")
)),
List.of(),
List.of()
);
DriverWorkingTimeProcessingInput withoutVehicleUsage = new DriverWorkingTimeProcessingInput(
sessionId,
"12:123",
"DRIVER_CARD",
from,
to,
from,
to,
15,
30,
activities,
List.of(),
List.of(),
List.of()
);
DriverWorkingTimeDerivedProjectionBundle first = builder.buildDerivedProjectionBundle(withVehicleUsage);
DriverWorkingTimeDerivedProjectionBundle second = builder.buildDerivedProjectionBundle(withoutVehicleUsage);
assertThat(first.dailyWeeklyRestCandidateCoverageIntervals()).hasSize(1);
assertThat(first.dailyWeeklyRestCandidateCoverageIntervals().get(0).beginBoundaryOdometerKm())
.isNotNull();
assertThat(second.dailyWeeklyRestCandidateCoverageIntervals()).hasSize(1);
assertThat(second.dailyWeeklyRestCandidateCoverageIntervals().get(0).beginBoundaryOdometerKm())
.isNull();
assertThat(second.dailyWeeklyRestCandidateCoverageIntervals().get(0).endBoundaryOdometerKm())
.isNull();
}
}