Extract driver working time Esper contract names

This commit is contained in:
trifonovt 2026-06-15 13:47:01 +02:00
parent e45fe29d3f
commit e24df88736
8 changed files with 146 additions and 93 deletions

View File

@ -1,60 +1,35 @@
# Patch: Reusable Esper runtime execution-state cleanup
# Driver-working-time Esper contract refactoring
## Purpose
This patch removes tachograph-prefixed Esper contract names from the active source-neutral driver-working-time pipeline without changing processing behavior.
Prevent retained Esper state from leaking between pipeline executions when `DriverWorkingTimeReusableProjectionBuilder` reuses a pooled runtime.
## New canonical Esper contracts
## Runtime lifecycle
- `DriverWorkingTimeActivityPointInputEvent`
- `DriverWorkingTimeVehicleUsagePointInputEvent`
- `DriverWorkingTimeActivityIntervalInputEvent`
- `DriverWorkingTimeVehicleUsageIntervalInputEvent`
- `DriverWorkingTimeSupportEvidenceInputEvent`
- `DriverWorkingTimeProjectionFinalizeEvent`
- `DriverWorkingTimeVehicleUsageIntervalInputWindow`
A pooled runtime now follows this lifecycle:
`DriverWorkingTimeEsperContractNames` centralizes these names for Java-side registration, event sending, cleanup, and future common modules.
```text
acquire runtime
-> clear execution state before sending events
-> execute the complete derived-projection module
-> detach result listeners
-> clear execution state again in finally
-> return only a clean runtime to the pool
```
## Updated active common components
If execution or cleanup fails, the runtime is marked unsafe and destroyed instead of being pooled.
- `DriverWorkingTimeReusableProjectionBuilder`
- `driver-working-time-derived-projections.epl`
- `runtime-driver-event-interval-preprocessor.epl`
- runtime cleanup query for the vehicle-usage input window
- contract and cleanup regression tests
- runtime-processing documentation
## Resettable input retention
## Compatibility
The statement-local usages of:
The tachograph-specific legacy resources and file-session compatibility code are intentionally not renamed. They remain isolated compatibility artifacts. No public request/response DTO, module key, plan key, or business rule is changed.
```epl
TachographVehicleUsageIntervalInputEvent#keepall
```
## Validation
have been replaced with the public named window:
```epl
TachographVehicleUsageIntervalInputWindow#keepall
```
The window is populated from `TachographVehicleUsageIntervalInputEvent` and is part of the cleanup contract. This prevents odometer and vehicle-usage evidence from a previous execution from participating in a later execution.
The earlier `VuCardAbsentIntervalWindow` fix is retained.
## Cleanup contract
All public named windows in `driver-working-time-derived-projections.epl`, including the context-scoped `PreviousVehicleUsageInterval`, have corresponding fire-and-forget delete queries.
A regression test scans the EPL and fails when a newly introduced public named window is not added to the cleanup contract.
## Metrics
Runtime logging now reports separate values:
- `runtimeResetBeforeMs`
- `runtimeResetAfterMs`
## Tests
Added/retained tests for:
- identical results when a warm runtime is reused;
- no doubled card-absent coverage and coverage not exceeding 100%;
- no retained vehicle-usage/odometer evidence in the next execution;
- cleanup-query coverage for every public named window.
- Verified that active common Java/EPL files contain no legacy tachograph input-contract names.
- Verified that all public named windows still have cleanup coverage.
- Compiled the new contract-name class with `javac`.
- Maven tests were not run because Maven and a Maven wrapper are unavailable in the environment.

View File

@ -79,6 +79,20 @@ tachograph-driving-derived-projection-bundle.epl
New runtime-processing code should use the driver-working-time names.
The common Esper contract is source-neutral as well:
```text
DriverWorkingTimeActivityPointInputEvent
DriverWorkingTimeVehicleUsagePointInputEvent
DriverWorkingTimeActivityIntervalInputEvent
DriverWorkingTimeVehicleUsageIntervalInputEvent
DriverWorkingTimeSupportEvidenceInputEvent
DriverWorkingTimeProjectionFinalizeEvent
DriverWorkingTimeVehicleUsageIntervalInputWindow
```
Tachograph-prefixed Esper types remain only inside the compatibility resources listed above.
## EPL-backed phase modules
The driver working-time plan now contains first-class EPL-backed phase modules for event-to-interval conversion:

View File

@ -22,12 +22,6 @@ POST /api/eventhub/runtime-processing/event-processing
but it should be treated as a legacy profile adapter. New clients should use `processingPlanKey`, not `profileKey`.
## Assembly terminology
`runtime-event-assembly` collects driver seed events and, when enabled, additional events for vehicles discovered in the driver scope. The resulting collection is an **aggregated runtime scope**. It is only canonically de-duplicated and ordered at this stage; semantic card/VU evidence mixing and vehicle-usage interval reconciliation are performed by later modules.
For API compatibility, `UnifiedRuntimeEventBundle` still exposes this collection as `mergedEvents`. New internal assembly-stage code uses the alias `aggregatedEvents`, and assembly-module metadata exposes `aggregatedEventCount` while retaining `mergedEventCount` as a compatibility alias.
## First predefined plan
The first predefined plan is:
@ -174,6 +168,10 @@ RuntimeDriverWorkingTimeScopeProcessingService
Legacy tachograph names are kept as compatibility adapters for existing file-session endpoints and Postman requests. New runtime code should use the `driver-working-time-*` classes/resources and the `driver-working-time-v1` processing plan.
The active common EPL contracts use `DriverWorkingTime*InputEvent` and
`DriverWorkingTimeVehicleUsageIntervalInputWindow` names. Tachograph-prefixed Esper input
contracts are limited to legacy compatibility resources.
## Module execution results
`/api/eventhub/runtime-processing/executions` now exposes module execution metadata explicitly.

View File

@ -0,0 +1,28 @@
package at.procon.eventhub.processing.driverworkingtime.esper;
/**
* Canonical Esper type and state names used by the source-neutral driver-working-time pipeline.
*
* <p>Source-specific adapters may keep their own compatibility names, but common runtime modules
* must use these contracts after source events have been normalized.</p>
*/
public final class DriverWorkingTimeEsperContractNames {
public static final String ACTIVITY_POINT_INPUT_EVENT_TYPE =
"DriverWorkingTimeActivityPointInputEvent";
public static final String VEHICLE_USAGE_POINT_INPUT_EVENT_TYPE =
"DriverWorkingTimeVehicleUsagePointInputEvent";
public static final String ACTIVITY_INTERVAL_INPUT_EVENT_TYPE =
"DriverWorkingTimeActivityIntervalInputEvent";
public static final String VEHICLE_USAGE_INTERVAL_INPUT_EVENT_TYPE =
"DriverWorkingTimeVehicleUsageIntervalInputEvent";
public static final String SUPPORT_EVIDENCE_INPUT_EVENT_TYPE =
"DriverWorkingTimeSupportEvidenceInputEvent";
public static final String PROJECTION_FINALIZE_EVENT_TYPE =
"DriverWorkingTimeProjectionFinalizeEvent";
public static final String VEHICLE_USAGE_INTERVAL_INPUT_WINDOW =
"DriverWorkingTimeVehicleUsageIntervalInputWindow";
private DriverWorkingTimeEsperContractNames() {
}
}

View File

@ -11,6 +11,7 @@ import com.espertech.esper.runtime.client.EPDeployment;
import com.espertech.esper.runtime.client.EPRuntime;
import com.espertech.esper.runtime.client.EPRuntimeProvider;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.processing.driverworkingtime.esper.DriverWorkingTimeEsperContractNames;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDerivedProjectionBundle;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDrivingInterruptionInterval;
@ -59,7 +60,7 @@ public class DriverWorkingTimeReusableProjectionBuilder {
private static final Map<String, Object> SUPPORT_GEO_EVIDENCE_INPUT_DEFINITION = supportGeoEvidenceInputDefinitionStatic();
private static final int MAX_IDLE_RUNTIMES_PER_DEFINITION = 2;
static final List<String> REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES = List.of(
"delete from TachographVehicleUsageIntervalInputWindow",
"delete from " + DriverWorkingTimeEsperContractNames.VEHICLE_USAGE_INTERVAL_INPUT_WINDOW,
"delete from PreviousRestCandidateCoverageInterval",
"delete from OpenPotentialInVehicleTripState",
"delete from SupportGeoEvidenceWindow",
@ -440,15 +441,15 @@ public class DriverWorkingTimeReusableProjectionBuilder {
private Configuration createRuntimeConfiguration() {
Configuration configuration = new Configuration();
configuration.getCommon().addEventType(
"TachographActivityIntervalInputEvent",
DriverWorkingTimeEsperContractNames.ACTIVITY_INTERVAL_INPUT_EVENT_TYPE,
activityIntervalInputDefinition()
);
configuration.getCommon().addEventType(
"TachographVehicleUsageIntervalInputEvent",
DriverWorkingTimeEsperContractNames.VEHICLE_USAGE_INTERVAL_INPUT_EVENT_TYPE,
vehicleUsageIntervalInputDefinition()
);
configuration.getCommon().addEventType(
"TachographSupportGeoEvidenceInputEvent",
DriverWorkingTimeEsperContractNames.SUPPORT_EVIDENCE_INPUT_EVENT_TYPE,
supportGeoEvidenceInputDefinition()
);
return configuration;
@ -1179,19 +1180,19 @@ public class DriverWorkingTimeReusableProjectionBuilder {
private void sendSupportGeoEvent(Map<String, Object> event) {
long startedAtNanos = System.nanoTime();
delegate.getEventService().sendEventMap(event, "TachographSupportGeoEvidenceInputEvent");
delegate.getEventService().sendEventMap(event, DriverWorkingTimeEsperContractNames.SUPPORT_EVIDENCE_INPUT_EVENT_TYPE);
sendSupportGeoMs += elapsedMillisStatic(startedAtNanos);
}
private void sendVehicleUsageEvent(Map<String, Object> event) {
long startedAtNanos = System.nanoTime();
delegate.getEventService().sendEventMap(event, "TachographVehicleUsageIntervalInputEvent");
delegate.getEventService().sendEventMap(event, DriverWorkingTimeEsperContractNames.VEHICLE_USAGE_INTERVAL_INPUT_EVENT_TYPE);
sendVehicleUsageMs += elapsedMillisStatic(startedAtNanos);
}
private void sendActivityEvent(Map<String, Object> event) {
long startedAtNanos = System.nanoTime();
delegate.getEventService().sendEventMap(event, "TachographActivityIntervalInputEvent");
delegate.getEventService().sendEventMap(event, DriverWorkingTimeEsperContractNames.ACTIVITY_INTERVAL_INPUT_EVENT_TYPE);
sendActivityMs += elapsedMillisStatic(startedAtNanos);
}

View File

@ -288,13 +288,13 @@ create schema DailyWeeklyRestCandidateCoverageEmittedKey(
endedAtEpochSecond long
);
@public create context PerDriver partition by driverKey from TachographVehicleUsageIntervalInputEvent;
@public create context PerDriver partition by driverKey from DriverWorkingTimeVehicleUsageIntervalInputEvent;
@public create window TachographVehicleUsageIntervalInputWindow#keepall as TachographVehicleUsageIntervalInputEvent;
@public create window DriverWorkingTimeVehicleUsageIntervalInputWindow#keepall as DriverWorkingTimeVehicleUsageIntervalInputEvent;
insert into TachographVehicleUsageIntervalInputWindow
insert into DriverWorkingTimeVehicleUsageIntervalInputWindow
select *
from TachographVehicleUsageIntervalInputEvent;
from DriverWorkingTimeVehicleUsageIntervalInputEvent;
create schema VuCardAbsentInterval(
sessionId java.util.UUID,
@ -481,7 +481,7 @@ select
longitude,
odometerKm,
priority
from TachographSupportGeoEvidenceInputEvent;
from DriverWorkingTimeSupportEvidenceInputEvent;
insert into SignificantDrivingInterval
select
@ -494,7 +494,7 @@ select
durationSeconds,
registrationKey,
vehicleKey
from TachographActivityIntervalInputEvent(activityType = 'DRIVE', durationSeconds > ${SIGNIFICANT_DRIVING_THRESHOLD_SECONDS});
from DriverWorkingTimeActivityIntervalInputEvent(activityType = 'DRIVE', durationSeconds > ${SIGNIFICANT_DRIVING_THRESHOLD_SECONDS});
@public create window PreviousSignificantDrivingInterval#unique(driverKey) as SignificantDrivingInterval;
@ -657,7 +657,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputWindow as v
DriverWorkingTimeVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.odometerBeginKm is not null
and v.startedAtEpochSecond >= c.startedAtEpochSecond - ${REST_GEO_LOOKBACK_SECONDS}
@ -704,7 +704,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputWindow as v
DriverWorkingTimeVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.endedAtEpochSecond is not null
and v.odometerEndKm is not null
@ -780,7 +780,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputWindow as v
DriverWorkingTimeVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.odometerBeginKm is not null
and v.startedAtEpochSecond >= c.endedAtEpochSecond - ${REST_GEO_LOOKBACK_SECONDS}
@ -827,7 +827,7 @@ select
end
) as rankScore
from DailyWeeklyRestCandidateCoverageCardResolvedInterval as c unidirectional,
TachographVehicleUsageIntervalInputWindow as v
DriverWorkingTimeVehicleUsageIntervalInputWindow as v
where v.driverKey = c.driverKey
and v.endedAtEpochSecond is not null
and v.odometerEndKm is not null
@ -1492,11 +1492,11 @@ select
from DailyWeeklyRestCandidateCoverageInterval;
@public context PerDriver
create window PreviousVehicleUsageInterval#lastevent as TachographVehicleUsageIntervalInputEvent;
create window PreviousVehicleUsageInterval#lastevent as DriverWorkingTimeVehicleUsageIntervalInputEvent;
@Priority(30)
context PerDriver
on TachographVehicleUsageIntervalInputEvent as next
on DriverWorkingTimeVehicleUsageIntervalInputEvent as next
insert into VuCardAbsentInterval
select
priorInterval.sessionId as sessionId,
@ -1517,12 +1517,12 @@ where priorInterval.endedAt is not null
@Priority(20)
context PerDriver
on TachographVehicleUsageIntervalInputEvent
on DriverWorkingTimeVehicleUsageIntervalInputEvent
delete from PreviousVehicleUsageInterval;
@Priority(10)
context PerDriver
on TachographVehicleUsageIntervalInputEvent as current
on DriverWorkingTimeVehicleUsageIntervalInputEvent as current
insert into PreviousVehicleUsageInterval
select *;

View File

@ -1,8 +1,8 @@
/*
* Source-neutral event-input adapter for driver-working-time-derived-projections.epl.
*
* The old bundle consumes resolved interval input streams. This preprocessor lets the same
* derived rules consume EventHub point events by pairing START/END activity events and
* The projection bundle consumes resolved interval input streams. This preprocessor lets the
* same derived rules consume EventHub point events by pairing START/END activity events and
* INSERT/WITHDRAW card-vehicle usage events inside Esper.
*
* Vehicle-usage intervals are additionally coalesced in EPL before they are forwarded to
@ -11,15 +11,15 @@
* common midnight continuation 23:59:59 -> 00:00:00 on the next day.
*/
create window OpenActivityPoint#unique(driverKey, intervalId) as TachographActivityPointInputEvent;
create window OpenActivityPoint#unique(driverKey, intervalId) as DriverWorkingTimeActivityPointInputEvent;
insert into OpenActivityPoint
select *
from TachographActivityPointInputEvent(lifecycle = 'START');
from DriverWorkingTimeActivityPointInputEvent(lifecycle = 'START');
@Priority(20)
on TachographActivityPointInputEvent(lifecycle = 'END') as endEvent
insert into TachographActivityIntervalInputEvent
on DriverWorkingTimeActivityPointInputEvent(lifecycle = 'END') as endEvent
insert into DriverWorkingTimeActivityIntervalInputEvent
select
startEvent.sessionId as sessionId,
startEvent.driverKey as driverKey,
@ -48,7 +48,7 @@ where startEvent.driverKey = endEvent.driverKey
and endEvent.occurredAtEpochSecond > startEvent.occurredAtEpochSecond;
@Priority(10)
on TachographActivityPointInputEvent(lifecycle = 'END') as endEvent
on DriverWorkingTimeActivityPointInputEvent(lifecycle = 'END') as endEvent
delete from OpenActivityPoint as openEvent
where openEvent.driverKey = endEvent.driverKey
and openEvent.intervalId = endEvent.intervalId;
@ -72,14 +72,14 @@ create schema RawVehicleUsageInterval(
sourceIntervalIds java.util.List
);
create window OpenVehicleUsagePoint#unique(driverKey, intervalId) as TachographVehicleUsagePointInputEvent;
create window OpenVehicleUsagePoint#unique(driverKey, intervalId) as DriverWorkingTimeVehicleUsagePointInputEvent;
insert into OpenVehicleUsagePoint
select *
from TachographVehicleUsagePointInputEvent(lifecycle = 'INSERT');
from DriverWorkingTimeVehicleUsagePointInputEvent(lifecycle = 'INSERT');
@Priority(20)
on TachographVehicleUsagePointInputEvent(lifecycle = 'WITHDRAW') as withdrawEvent
on DriverWorkingTimeVehicleUsagePointInputEvent(lifecycle = 'WITHDRAW') as withdrawEvent
insert into RawVehicleUsageInterval
select
insertEvent.sessionId as sessionId,
@ -104,7 +104,7 @@ where insertEvent.driverKey = withdrawEvent.driverKey
and withdrawEvent.occurredAtEpochSecond > insertEvent.occurredAtEpochSecond;
@Priority(10)
on TachographVehicleUsagePointInputEvent(lifecycle = 'WITHDRAW') as withdrawEvent
on DriverWorkingTimeVehicleUsagePointInputEvent(lifecycle = 'WITHDRAW') as withdrawEvent
delete from OpenVehicleUsagePoint as openEvent
where openEvent.driverKey = withdrawEvent.driverKey
and openEvent.intervalId = withdrawEvent.intervalId;
@ -192,7 +192,7 @@ where current.driverKey = next.driverKey
*/
@Priority(70)
on RawVehicleUsageInterval as next
insert into TachographVehicleUsageIntervalInputEvent
insert into DriverWorkingTimeVehicleUsageIntervalInputEvent
select
current.sessionId as sessionId,
current.driverKey as driverKey,
@ -316,12 +316,12 @@ where candidate.driverKey = next.driverKey;
/*
* The last accumulated interval cannot be emitted by looking at the next interval, so Java
* sends one TachographProjectionFinalizeEvent per driver after all vehicle-usage point
* sends one DriverWorkingTimeProjectionFinalizeEvent per driver after all vehicle-usage point
* events and before activity point events.
*/
@Priority(20)
on TachographProjectionFinalizeEvent as finalizeEvent
insert into TachographVehicleUsageIntervalInputEvent
on DriverWorkingTimeProjectionFinalizeEvent as finalizeEvent
insert into DriverWorkingTimeVehicleUsageIntervalInputEvent
select
current.sessionId as sessionId,
current.driverKey as driverKey,
@ -343,6 +343,6 @@ from MergedVehicleUsageAccumulator as current
where current.driverKey = finalizeEvent.driverKey;
@Priority(10)
on TachographProjectionFinalizeEvent as finalizeEvent
on DriverWorkingTimeProjectionFinalizeEvent as finalizeEvent
delete from MergedVehicleUsageAccumulator as current
where current.driverKey = finalizeEvent.driverKey;

View File

@ -3,6 +3,7 @@ package at.procon.eventhub.processing.driverworkingtime.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.processing.driverworkingtime.esper.DriverWorkingTimeEsperContractNames;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDerivedProjectionBundle;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeProcessingInput;
@ -22,6 +23,42 @@ import org.junit.jupiter.api.Test;
class DriverWorkingTimeReusableProjectionBuilderTest {
@Test
void commonEplUsesSourceNeutralDriverWorkingTimeInputContracts() throws IOException {
String projectionEpl = StreamUtils.copyToString(
new ClassPathResource("esper/driver-working-time-derived-projections.epl").getInputStream(),
StandardCharsets.UTF_8
);
String preprocessorEpl = StreamUtils.copyToString(
new ClassPathResource("esper/runtime-driver-event-interval-preprocessor.epl").getInputStream(),
StandardCharsets.UTF_8
);
assertThat(projectionEpl)
.contains(DriverWorkingTimeEsperContractNames.ACTIVITY_INTERVAL_INPUT_EVENT_TYPE)
.contains(DriverWorkingTimeEsperContractNames.VEHICLE_USAGE_INTERVAL_INPUT_EVENT_TYPE)
.contains(DriverWorkingTimeEsperContractNames.SUPPORT_EVIDENCE_INPUT_EVENT_TYPE)
.contains(DriverWorkingTimeEsperContractNames.VEHICLE_USAGE_INTERVAL_INPUT_WINDOW)
.doesNotContain("TachographActivityIntervalInputEvent")
.doesNotContain("TachographVehicleUsageIntervalInputEvent")
.doesNotContain("TachographSupportGeoEvidenceInputEvent")
.doesNotContain("TachographVehicleUsageIntervalInputWindow");
assertThat(preprocessorEpl)
.contains(DriverWorkingTimeEsperContractNames.ACTIVITY_POINT_INPUT_EVENT_TYPE)
.contains(DriverWorkingTimeEsperContractNames.VEHICLE_USAGE_POINT_INPUT_EVENT_TYPE)
.contains(DriverWorkingTimeEsperContractNames.ACTIVITY_INTERVAL_INPUT_EVENT_TYPE)
.contains(DriverWorkingTimeEsperContractNames.VEHICLE_USAGE_INTERVAL_INPUT_EVENT_TYPE)
.contains(DriverWorkingTimeEsperContractNames.PROJECTION_FINALIZE_EVENT_TYPE)
.doesNotContain("TachographActivityPointInputEvent")
.doesNotContain("TachographVehicleUsagePointInputEvent")
.doesNotContain("TachographProjectionFinalizeEvent");
assertThat(DriverWorkingTimeReusableProjectionBuilder.REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES)
.contains("delete from "
+ DriverWorkingTimeEsperContractNames.VEHICLE_USAGE_INTERVAL_INPUT_WINDOW);
}
@Test
void reusesWarmRuntimeWithoutLeakingPreviousState() {
DriverWorkingTimeReusableProjectionBuilder builder =