diff --git a/README_PATCH.md b/README_PATCH.md index c904110..bf6c938 100644 --- a/README_PATCH.md +++ b/README_PATCH.md @@ -1,25 +1,60 @@ -# Fix: cardAbsentCoveragePercent above 100% on reused Esper runtime +# Patch: Reusable Esper runtime execution-state cleanup -## Root cause +## Purpose -`DriverWorkingTimeReusableProjectionBuilder` pools Esper runtimes. The EPL used -`VuCardAbsentInterval#keepall` as a statement-local data window, but the runtime -cleanup did not clear that retained state before the next execution. +Prevent retained Esper state from leaking between pipeline executions when `DriverWorkingTimeReusableProjectionBuilder` reuses a pooled runtime. -When the same pooled runtime processed a second request, the previous execution's -card-absent intervals remained in the overlap calculation. New intervals were -added again, so `cardAbsentDurationSeconds` was doubled while the output listener -still reported only the newly emitted `VuCardAbsentInterval` events. +## Runtime lifecycle -This is source-independent. It appeared in the DB result because that request was -executed after the file-session request on the same pooled runtime. +A pooled runtime now follows this lifecycle: -## Changes +```text +acquire runtime +-> clear execution state before sending events +-> execute the complete derived-projection module +-> detach result listeners +-> clear execution state again in finally +-> return only a clean runtime to the pool +``` -- Added public named window `VuCardAbsentIntervalWindow#keepall`. -- Routed generated `VuCardAbsentInterval` events into the named window. -- Changed rest-coverage overlap calculations to read from the named window. -- Added `delete from VuCardAbsentIntervalWindow` to reusable-runtime cleanup. -- Applied the same EPL structure to the legacy/reference projection bundle. -- Added a regression test that executes the same input twice on the same builder - and verifies coverage does not double and remains at or below 100%. +If execution or cleanup fails, the runtime is marked unsafe and destroyed instead of being pooled. + +## Resettable input retention + +The statement-local usages of: + +```epl +TachographVehicleUsageIntervalInputEvent#keepall +``` + +have been replaced with the public named window: + +```epl +TachographVehicleUsageIntervalInputWindow#keepall +``` + +The window is populated from `TachographVehicleUsageIntervalInputEvent` and is part of the cleanup contract. This prevents odometer and vehicle-usage evidence from a previous execution from participating in a later execution. + +The earlier `VuCardAbsentIntervalWindow` fix is retained. + +## Cleanup contract + +All public named windows in `driver-working-time-derived-projections.epl`, including the context-scoped `PreviousVehicleUsageInterval`, have corresponding fire-and-forget delete queries. + +A regression test scans the EPL and fails when a newly introduced public named window is not added to the cleanup contract. + +## Metrics + +Runtime logging now reports separate values: + +- `runtimeResetBeforeMs` +- `runtimeResetAfterMs` + +## Tests + +Added/retained tests for: + +- identical results when a warm runtime is reused; +- no doubled card-absent coverage and coverage not exceeding 100%; +- no retained vehicle-usage/odometer evidence in the next execution; +- cleanup-query coverage for every public named window. diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java index 8a3cb97..de94cd8 100644 --- a/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java @@ -58,7 +58,8 @@ public class DriverWorkingTimeReusableProjectionBuilder { private static final Map VEHICLE_USAGE_INTERVAL_INPUT_DEFINITION = vehicleUsageIntervalInputDefinitionStatic(); private static final Map SUPPORT_GEO_EVIDENCE_INPUT_DEFINITION = supportGeoEvidenceInputDefinitionStatic(); private static final int MAX_IDLE_RUNTIMES_PER_DEFINITION = 2; - private static final List REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES = List.of( + static final List REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES = List.of( + "delete from TachographVehicleUsageIntervalInputWindow", "delete from PreviousRestCandidateCoverageInterval", "delete from OpenPotentialInVehicleTripState", "delete from SupportGeoEvidenceWindow", @@ -186,7 +187,7 @@ public class DriverWorkingTimeReusableProjectionBuilder { long sortOutputMs = elapsedMillis(sortOutputStartedAtNanos); LOG.info( - "Driver working-time derived projection bundle built in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimePoolHit: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, runtimeResetMs: {}, sendSupportGeoMs: {}, sendVehicleUsageMs: {}, sendActivityMs: {}, destroyMs: {}, sortOutputMs: {}, activityInputEvents: {}, vehicleUsageInputEvents: {}, supportGeoInputEvents: {})", + "Driver working-time derived projection bundle built in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimePoolHit: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, runtimeResetBeforeMs: {}, runtimeResetAfterMs: {}, sendSupportGeoMs: {}, sendVehicleUsageMs: {}, sendActivityMs: {}, destroyMs: {}, sortOutputMs: {}, activityInputEvents: {}, vehicleUsageInputEvents: {}, supportGeoInputEvents: {})", elapsedMillis(startedAtNanos), runtimeMetrics.definitionCacheHit(), runtimeMetrics.definitionPreparationMs(), @@ -194,7 +195,8 @@ public class DriverWorkingTimeReusableProjectionBuilder { runtimeMetrics.runtimeInitMs(), runtimeMetrics.deployMs(), runtimeMetrics.listenerRegistrationMs(), - runtimeMetrics.runtimeResetMs(), + runtimeMetrics.runtimeResetBeforeMs(), + runtimeMetrics.runtimeResetAfterMs(), runtimeMetrics.sendSupportGeoMs(), runtimeMetrics.sendVehicleUsageMs(), runtimeMetrics.sendActivityMs(), @@ -288,7 +290,8 @@ public class DriverWorkingTimeReusableProjectionBuilder { long runtimeInitMs = 0L; long deployMs = 0L; long listenerRegistrationMs = 0L; - long runtimeResetMs = 0L; + long runtimeResetBeforeMs = 0L; + long runtimeResetAfterMs = 0L; long sendSupportGeoMs = 0L; long sendVehicleUsageMs = 0L; long sendActivityMs = 0L; @@ -330,14 +333,15 @@ public class DriverWorkingTimeReusableProjectionBuilder { listenerRegistrationMs = reusableRuntime.listenerRegistrationMs(); ReusableProjectionRuntimeExecution execution = reusableRuntime.execute(listeners, sender); - runtimeResetMs = execution.runtimeResetMs(); + runtimeResetBeforeMs = execution.runtimeResetBeforeMs(); + runtimeResetAfterMs = execution.runtimeResetAfterMs(); sendSupportGeoMs = execution.sendSupportGeoMs(); sendVehicleUsageMs = execution.sendVehicleUsageMs(); sendActivityMs = execution.sendActivityMs(); } catch (EPCompileException | EPDeployException e) { discardRuntime = true; throw new IllegalStateException("Cannot compile/deploy reusable driver working-time projection EPL bundle", e); - } catch (RuntimeException e) { + } catch (RuntimeException | Error e) { discardRuntime = true; throw e; } finally { @@ -356,7 +360,8 @@ public class DriverWorkingTimeReusableProjectionBuilder { runtimeInitMs, deployMs, listenerRegistrationMs, - runtimeResetMs, + runtimeResetBeforeMs, + runtimeResetAfterMs, sendSupportGeoMs, sendVehicleUsageMs, sendActivityMs, @@ -973,7 +978,8 @@ public class DriverWorkingTimeReusableProjectionBuilder { long runtimeInitMs, long deployMs, long listenerRegistrationMs, - long runtimeResetMs, + long runtimeResetBeforeMs, + long runtimeResetAfterMs, long sendSupportGeoMs, long sendVehicleUsageMs, long sendActivityMs, @@ -1000,6 +1006,9 @@ public class DriverWorkingTimeReusableProjectionBuilder { if (runtime == null) { return 0L; } + if (!runtime.cleanForReuse()) { + return runtime.destroy(); + } synchronized (this) { if (idleRuntimes.size() < MAX_IDLE_RUNTIMES_PER_DEFINITION) { runtime.poolHit(false); @@ -1019,6 +1028,7 @@ public class DriverWorkingTimeReusableProjectionBuilder { private volatile long listenerRegistrationMs; private volatile List cleanupQueries = List.of(); private volatile boolean poolHit; + private volatile boolean cleanForReuse = true; private ExecutionListeners currentExecutionListeners; private ReusableProjectionRuntime( @@ -1037,24 +1047,48 @@ public class DriverWorkingTimeReusableProjectionBuilder { Map> listeners, Consumer sender ) { + cleanForReuse = false; currentExecutionListeners = new ExecutionListeners(listeners); + long runtimeResetBeforeMs = 0L; + long runtimeResetAfterMs = 0L; + DerivedProjectionEventSender timedSender = new DerivedProjectionEventSender(runtime); + Throwable executionFailure = null; + try { + runtimeResetBeforeMs = resetExecutionState(); + sender.accept(timedSender); + } catch (RuntimeException | Error failure) { + executionFailure = failure; + throw failure; + } finally { + // Cleanup must not feed delete/old-data callbacks into result collectors. + currentExecutionListeners = null; + try { + runtimeResetAfterMs = resetExecutionState(); + cleanForReuse = true; + } catch (RuntimeException | Error cleanupFailure) { + cleanForReuse = false; + if (executionFailure != null) { + executionFailure.addSuppressed(cleanupFailure); + } else { + throw cleanupFailure; + } + } + } + return new ReusableProjectionRuntimeExecution( + runtimeResetBeforeMs, + runtimeResetAfterMs, + timedSender.sendSupportGeoMs(), + timedSender.sendVehicleUsageMs(), + timedSender.sendActivityMs() + ); + } + + private long resetExecutionState() { long runtimeResetStartedAtNanos = System.nanoTime(); for (EPCompiled cleanupQuery : cleanupQueries) { runtime.getFireAndForgetService().executeQuery(cleanupQuery); } - long runtimeResetMs = elapsedMillisStatic(runtimeResetStartedAtNanos); - try { - DerivedProjectionEventSender timedSender = new DerivedProjectionEventSender(runtime); - sender.accept(timedSender); - return new ReusableProjectionRuntimeExecution( - runtimeResetMs, - timedSender.sendSupportGeoMs(), - timedSender.sendVehicleUsageMs(), - timedSender.sendActivityMs() - ); - } finally { - currentExecutionListeners = null; - } + return elapsedMillisStatic(runtimeResetStartedAtNanos); } private void onStatementEvents(String statementName, EventBean[] newData) { @@ -1098,6 +1132,10 @@ public class DriverWorkingTimeReusableProjectionBuilder { return poolHit; } + private boolean cleanForReuse() { + return cleanForReuse; + } + private long runtimeInitMs() { return poolHit ? 0L : runtimeInitMs; } @@ -1120,7 +1158,8 @@ public class DriverWorkingTimeReusableProjectionBuilder { } private record ReusableProjectionRuntimeExecution( - long runtimeResetMs, + long runtimeResetBeforeMs, + long runtimeResetAfterMs, long sendSupportGeoMs, long sendVehicleUsageMs, long sendActivityMs diff --git a/src/main/resources/esper/driver-working-time-derived-projections.epl b/src/main/resources/esper/driver-working-time-derived-projections.epl index 553902e..02ee189 100644 --- a/src/main/resources/esper/driver-working-time-derived-projections.epl +++ b/src/main/resources/esper/driver-working-time-derived-projections.epl @@ -290,6 +290,12 @@ create schema DailyWeeklyRestCandidateCoverageEmittedKey( @public create context PerDriver partition by driverKey from TachographVehicleUsageIntervalInputEvent; +@public create window TachographVehicleUsageIntervalInputWindow#keepall as TachographVehicleUsageIntervalInputEvent; + +insert into TachographVehicleUsageIntervalInputWindow +select * +from TachographVehicleUsageIntervalInputEvent; + create schema VuCardAbsentInterval( sessionId java.util.UUID, driverKey string, @@ -651,7 +657,7 @@ select end ) as rankScore from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional, - TachographVehicleUsageIntervalInputEvent#keepall as v + TachographVehicleUsageIntervalInputWindow as v where v.driverKey = c.driverKey and v.odometerBeginKm is not null and v.startedAtEpochSecond >= c.startedAtEpochSecond - ${REST_GEO_LOOKBACK_SECONDS} @@ -698,7 +704,7 @@ select end ) as rankScore from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional, - TachographVehicleUsageIntervalInputEvent#keepall as v + TachographVehicleUsageIntervalInputWindow as v where v.driverKey = c.driverKey and v.endedAtEpochSecond is not null and v.odometerEndKm is not null @@ -774,7 +780,7 @@ select end ) as rankScore from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional, - TachographVehicleUsageIntervalInputEvent#keepall as v + TachographVehicleUsageIntervalInputWindow as v where v.driverKey = c.driverKey and v.odometerBeginKm is not null and v.startedAtEpochSecond >= c.endedAtEpochSecond - ${REST_GEO_LOOKBACK_SECONDS} @@ -821,7 +827,7 @@ select end ) as rankScore from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional, - TachographVehicleUsageIntervalInputEvent#keepall as v + TachographVehicleUsageIntervalInputWindow as v where v.driverKey = c.driverKey and v.endedAtEpochSecond is not null and v.odometerEndKm is not null diff --git a/src/main/resources/esper/tachograph-driving-derived-projection-bundle.epl b/src/main/resources/esper/tachograph-driving-derived-projection-bundle.epl index 5eca511..df349cd 100644 --- a/src/main/resources/esper/tachograph-driving-derived-projection-bundle.epl +++ b/src/main/resources/esper/tachograph-driving-derived-projection-bundle.epl @@ -282,6 +282,12 @@ create schema DailyWeeklyRestCandidateCoverageEmittedKey( create context PerDriver partition by driverKey from TachographVehicleUsageIntervalInputEvent; +@public create window TachographVehicleUsageIntervalInputWindow#keepall as TachographVehicleUsageIntervalInputEvent; + +insert into TachographVehicleUsageIntervalInputWindow +select * +from TachographVehicleUsageIntervalInputEvent; + create schema VuCardAbsentInterval( sessionId java.util.UUID, driverKey string, @@ -643,7 +649,7 @@ select end ) as rankScore from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional, - TachographVehicleUsageIntervalInputEvent#keepall as v + TachographVehicleUsageIntervalInputWindow as v where v.driverKey = c.driverKey and v.odometerBeginKm is not null and v.startedAtEpochSecond >= c.startedAtEpochSecond - ${REST_GEO_LOOKBACK_SECONDS} @@ -690,7 +696,7 @@ select end ) as rankScore from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional, - TachographVehicleUsageIntervalInputEvent#keepall as v + TachographVehicleUsageIntervalInputWindow as v where v.driverKey = c.driverKey and v.endedAtEpochSecond is not null and v.odometerEndKm is not null @@ -766,7 +772,7 @@ select end ) as rankScore from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional, - TachographVehicleUsageIntervalInputEvent#keepall as v + TachographVehicleUsageIntervalInputWindow as v where v.driverKey = c.driverKey and v.odometerBeginKm is not null and v.startedAtEpochSecond >= c.endedAtEpochSecond - ${REST_GEO_LOOKBACK_SECONDS} @@ -813,7 +819,7 @@ select end ) as rankScore from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional, - TachographVehicleUsageIntervalInputEvent#keepall as v + TachographVehicleUsageIntervalInputWindow as v where v.driverKey = c.driverKey and v.endedAtEpochSecond is not null and v.odometerEndKm is not null diff --git a/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java b/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java index 7c14a44..987d389 100644 --- a/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java +++ b/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java @@ -7,9 +7,17 @@ import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeAc import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDerivedProjectionBundle; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeProcessingInput; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.springframework.core.io.ClassPathResource; +import org.springframework.util.StreamUtils; import org.junit.jupiter.api.Test; class DriverWorkingTimeReusableProjectionBuilderTest { @@ -237,4 +245,145 @@ class DriverWorkingTimeReusableProjectionBuilderTest { .isLessThanOrEqualTo(100.0d); } + @Test + void cleanupContractCoversEveryPublicNamedWindow() throws IOException { + String epl = StreamUtils.copyToString( + new ClassPathResource("esper/driver-working-time-derived-projections.epl").getInputStream(), + StandardCharsets.UTF_8 + ); + Matcher matcher = Pattern.compile( + "(?m)@public(?:\\s+context\\s+[A-Za-z0-9_]+)?\\s*create\\s+window\\s+([A-Za-z0-9_]+)" + ).matcher(epl); + Set publicNamedWindows = new LinkedHashSet<>(); + while (matcher.find()) { + publicNamedWindows.add(matcher.group(1)); + } + + assertThat(publicNamedWindows).isNotEmpty(); + for (String windowName : publicNamedWindows) { + assertThat(DriverWorkingTimeReusableProjectionBuilder.REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES) + .anyMatch(query -> query.contains("delete from " + windowName)); + } + } + + @Test + void clearsRetainedVehicleUsageInputBetweenExecutions() { + DriverWorkingTimeReusableProjectionBuilder builder = + new DriverWorkingTimeReusableProjectionBuilder(new EventHubProperties()); + UUID sessionId = UUID.randomUUID(); + OffsetDateTime from = OffsetDateTime.parse("2026-05-01T08:00:00Z"); + OffsetDateTime firstDriveEnd = OffsetDateTime.parse("2026-05-01T09:00:00Z"); + OffsetDateTime secondDriveStart = OffsetDateTime.parse("2026-05-01T12:00:00Z"); + OffsetDateTime to = OffsetDateTime.parse("2026-05-01T13:00:00Z"); + + List activities = List.of( + new DriverWorkingTimeActivityInterval( + sessionId, + "12:123", + "ACT-1", + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-1", + "VIN-1", + "DRIVER_CARD", + "ACT-1", + "ACT-1", + from, + firstDriveEnd, + from.toEpochSecond(), + firstDriveEnd.toEpochSecond(), + firstDriveEnd.toEpochSecond() - from.toEpochSecond(), + List.of("ACT-1"), + false, + false, + "RAW_INTERVAL" + ), + new DriverWorkingTimeActivityInterval( + sessionId, + "12:123", + "ACT-2", + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-1", + "VIN-1", + "DRIVER_CARD", + "ACT-2", + "ACT-2", + secondDriveStart, + to, + secondDriveStart.toEpochSecond(), + to.toEpochSecond(), + to.toEpochSecond() - secondDriveStart.toEpochSecond(), + List.of("ACT-2"), + false, + false, + "RAW_INTERVAL" + ) + ); + + DriverWorkingTimeProcessingInput withVehicleUsage = new DriverWorkingTimeProcessingInput( + sessionId, + "12:123", + "DRIVER_CARD", + from, + to, + from, + to, + 15, + 30, + activities, + List.of(new DriverWorkingTimeVehicleUsageInterval( + sessionId, + "12:123", + "VU-1", + "VU-1", + "VU-1", + from, + to, + from.toEpochSecond(), + to.toEpochSecond(), + to.toEpochSecond() - from.toEpochSecond(), + 100L, + 200L, + "12:REG-1", + "VIN-1", + "DRIVER_CARD", + List.of("VU-1") + )), + List.of(), + List.of() + ); + DriverWorkingTimeProcessingInput withoutVehicleUsage = new DriverWorkingTimeProcessingInput( + sessionId, + "12:123", + "DRIVER_CARD", + from, + to, + from, + to, + 15, + 30, + activities, + List.of(), + List.of(), + List.of() + ); + + DriverWorkingTimeDerivedProjectionBundle first = builder.buildDerivedProjectionBundle(withVehicleUsage); + DriverWorkingTimeDerivedProjectionBundle second = builder.buildDerivedProjectionBundle(withoutVehicleUsage); + + assertThat(first.dailyWeeklyRestCandidateCoverageIntervals()).hasSize(1); + assertThat(first.dailyWeeklyRestCandidateCoverageIntervals().get(0).beginBoundaryOdometerKm()) + .isNotNull(); + assertThat(second.dailyWeeklyRestCandidateCoverageIntervals()).hasSize(1); + assertThat(second.dailyWeeklyRestCandidateCoverageIntervals().get(0).beginBoundaryOdometerKm()) + .isNull(); + assertThat(second.dailyWeeklyRestCandidateCoverageIntervals().get(0).endBoundaryOdometerKm()) + .isNull(); + } + }