Optimize runtime event processing assembly and projection paths

This commit is contained in:
trifonovt 2026-06-08 17:20:28 +02:00
parent 3bfe05b64e
commit a9b62d807d
18 changed files with 826 additions and 87 deletions

View File

@ -26,9 +26,11 @@ import java.nio.charset.StandardCharsets;
import java.time.Instant; import java.time.Instant;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.Collections; import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -55,10 +57,34 @@ public class DriverWorkingTimeReusableProjectionBuilder {
private static final Map<String, Object> ACTIVITY_INTERVAL_INPUT_DEFINITION = activityIntervalInputDefinitionStatic(); private static final Map<String, Object> ACTIVITY_INTERVAL_INPUT_DEFINITION = activityIntervalInputDefinitionStatic();
private static final Map<String, Object> VEHICLE_USAGE_INTERVAL_INPUT_DEFINITION = vehicleUsageIntervalInputDefinitionStatic(); private static final Map<String, Object> VEHICLE_USAGE_INTERVAL_INPUT_DEFINITION = vehicleUsageIntervalInputDefinitionStatic();
private static final Map<String, Object> SUPPORT_GEO_EVIDENCE_INPUT_DEFINITION = supportGeoEvidenceInputDefinitionStatic(); private static final Map<String, Object> SUPPORT_GEO_EVIDENCE_INPUT_DEFINITION = supportGeoEvidenceInputDefinitionStatic();
private static final int MAX_IDLE_RUNTIMES_PER_DEFINITION = 2;
private static final List<String> REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES = List.of(
"delete from PreviousRestCandidateCoverageInterval",
"delete from OpenPotentialInVehicleTripState",
"delete from SupportGeoEvidenceWindow",
"delete from DailyWeeklyRestCandidateBeginGeoEvidenceCandidateWindow",
"delete from DailyWeeklyRestCandidateBeginGeoEvidenceBestScoreWindow",
"delete from DailyWeeklyRestCandidateBeginGeoEvidenceResolvedWindow",
"delete from DailyWeeklyRestCandidateEndGeoEvidenceCandidateWindow",
"delete from DailyWeeklyRestCandidateEndGeoEvidenceBestScoreWindow",
"delete from DailyWeeklyRestCandidateEndGeoEvidenceResolvedWindow",
"delete from DailyWeeklyRestCandidateBeginBoundaryOdometerCandidateWindow",
"delete from DailyWeeklyRestCandidateBeginBoundaryOdometerBestScoreWindow",
"delete from DailyWeeklyRestCandidateBeginBoundaryOdometerResolvedWindow",
"delete from DailyWeeklyRestCandidateEndBoundaryOdometerCandidateWindow",
"delete from DailyWeeklyRestCandidateEndBoundaryOdometerBestScoreWindow",
"delete from DailyWeeklyRestCandidateEndBoundaryOdometerResolvedWindow",
"delete from DailyWeeklyRestCandidateCoverageCardResolvedIntervalWindow",
"delete from DailyWeeklyRestCandidateCoverageEmittedKeyWindow",
"delete from PreviousSignificantDrivingInterval",
"context PerDriver delete from PreviousVehicleUsageInterval"
);
private final EventHubProperties properties; private final EventHubProperties properties;
private final ConcurrentMap<PreparedProjectionDefinitionCacheKey, PreparedProjectionDefinition> preparedDefinitions = private final ConcurrentMap<PreparedProjectionDefinitionCacheKey, PreparedProjectionDefinition> preparedDefinitions =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private final ConcurrentMap<PreparedProjectionDefinitionCacheKey, ReusableProjectionRuntimePool> reusableRuntimePools =
new ConcurrentHashMap<>();
@Autowired @Autowired
public DriverWorkingTimeReusableProjectionBuilder( public DriverWorkingTimeReusableProjectionBuilder(
@ -159,13 +185,15 @@ public class DriverWorkingTimeReusableProjectionBuilder {
long sortOutputMs = elapsedMillis(sortOutputStartedAtNanos); long sortOutputMs = elapsedMillis(sortOutputStartedAtNanos);
LOG.info( LOG.info(
"Driver working-time derived projection bundle built in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, sendSupportGeoMs: {}, sendVehicleUsageMs: {}, sendActivityMs: {}, destroyMs: {}, sortOutputMs: {}, activityInputEvents: {}, vehicleUsageInputEvents: {}, supportGeoInputEvents: {})", "Driver working-time derived projection bundle built in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimePoolHit: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, runtimeResetMs: {}, sendSupportGeoMs: {}, sendVehicleUsageMs: {}, sendActivityMs: {}, destroyMs: {}, sortOutputMs: {}, activityInputEvents: {}, vehicleUsageInputEvents: {}, supportGeoInputEvents: {})",
elapsedMillis(startedAtNanos), elapsedMillis(startedAtNanos),
runtimeMetrics.definitionCacheHit(), runtimeMetrics.definitionCacheHit(),
runtimeMetrics.definitionPreparationMs(), runtimeMetrics.definitionPreparationMs(),
runtimeMetrics.runtimePoolHit(),
runtimeMetrics.runtimeInitMs(), runtimeMetrics.runtimeInitMs(),
runtimeMetrics.deployMs(), runtimeMetrics.deployMs(),
runtimeMetrics.listenerRegistrationMs(), runtimeMetrics.listenerRegistrationMs(),
runtimeMetrics.runtimeResetMs(),
runtimeMetrics.sendSupportGeoMs(), runtimeMetrics.sendSupportGeoMs(),
runtimeMetrics.sendVehicleUsageMs(), runtimeMetrics.sendVehicleUsageMs(),
runtimeMetrics.sendActivityMs(), runtimeMetrics.sendActivityMs(),
@ -253,16 +281,20 @@ public class DriverWorkingTimeReusableProjectionBuilder {
Map<String, Consumer<EventBean[]>> listeners, Map<String, Consumer<EventBean[]>> listeners,
Consumer<DerivedProjectionEventSender> sender Consumer<DerivedProjectionEventSender> sender
) { ) {
EPRuntime runtime = null;
long definitionPreparationMs = 0L; long definitionPreparationMs = 0L;
boolean definitionCacheHit = false; boolean definitionCacheHit = false;
boolean runtimePoolHit = false;
long runtimeInitMs = 0L; long runtimeInitMs = 0L;
long deployMs = 0L; long deployMs = 0L;
long listenerRegistrationMs = 0L; long listenerRegistrationMs = 0L;
long runtimeResetMs = 0L;
long sendSupportGeoMs = 0L; long sendSupportGeoMs = 0L;
long sendVehicleUsageMs = 0L; long sendVehicleUsageMs = 0L;
long sendActivityMs = 0L; long sendActivityMs = 0L;
long destroyMs = 0L; long destroyMs = 0L;
ReusableProjectionRuntimePool runtimePool = null;
ReusableProjectionRuntime reusableRuntime = null;
boolean discardRuntime = false;
try { try {
PreparedProjectionDefinitionCacheKey cacheKey = new PreparedProjectionDefinitionCacheKey( PreparedProjectionDefinitionCacheKey cacheKey = new PreparedProjectionDefinitionCacheKey(
significantDrivingMinutes, significantDrivingMinutes,
@ -289,44 +321,41 @@ public class DriverWorkingTimeReusableProjectionBuilder {
definitionPreparationMs = definitionCacheHit ? 0L : elapsedMillis(definitionPreparationStartedAtNanos); definitionPreparationMs = definitionCacheHit ? 0L : elapsedMillis(definitionPreparationStartedAtNanos);
} }
long runtimeInitStartedAtNanos = System.nanoTime(); runtimePool = reusableRuntimePools.computeIfAbsent(cacheKey, ignored -> new ReusableProjectionRuntimePool());
Configuration configuration = createRuntimeConfiguration(); reusableRuntime = runtimePool.acquire(preparedDefinition, listeners);
String runtimeUri = "eventhub-driver-working-time-reusable-projection-" + RUNTIME_COUNTER.incrementAndGet(); runtimePoolHit = reusableRuntime.poolHit();
runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration); runtimeInitMs = reusableRuntime.runtimeInitMs();
runtimeInitMs = elapsedMillis(runtimeInitStartedAtNanos); deployMs = reusableRuntime.deployMs();
listenerRegistrationMs = reusableRuntime.listenerRegistrationMs();
long deployStartedAtNanos = System.nanoTime(); ReusableProjectionRuntimeExecution execution = reusableRuntime.execute(listeners, sender);
EPDeployment deployment = runtime.getDeploymentService().deploy(preparedDefinition.compiled()); runtimeResetMs = execution.runtimeResetMs();
deployMs = elapsedMillis(deployStartedAtNanos); sendSupportGeoMs = execution.sendSupportGeoMs();
sendVehicleUsageMs = execution.sendVehicleUsageMs();
long listenerRegistrationStartedAtNanos = System.nanoTime(); sendActivityMs = execution.sendActivityMs();
for (Map.Entry<String, Consumer<EventBean[]>> entry : listeners.entrySet()) {
runtime.getDeploymentService()
.getStatement(deployment.getDeploymentId(), entry.getKey())
.addListener((newData, oldData, statement, rt) -> entry.getValue().accept(newData));
}
listenerRegistrationMs = elapsedMillis(listenerRegistrationStartedAtNanos);
DerivedProjectionEventSender timedSender = new DerivedProjectionEventSender(runtime);
sender.accept(timedSender);
sendSupportGeoMs = timedSender.sendSupportGeoMs();
sendVehicleUsageMs = timedSender.sendVehicleUsageMs();
sendActivityMs = timedSender.sendActivityMs();
} catch (EPCompileException | EPDeployException e) { } catch (EPCompileException | EPDeployException e) {
discardRuntime = true;
throw new IllegalStateException("Cannot compile/deploy reusable driver working-time projection EPL bundle", e); throw new IllegalStateException("Cannot compile/deploy reusable driver working-time projection EPL bundle", e);
} catch (RuntimeException e) {
discardRuntime = true;
throw e;
} finally { } finally {
if (runtime != null) { if (reusableRuntime != null) {
long destroyStartedAtNanos = System.nanoTime(); if (discardRuntime || runtimePool == null) {
runtime.destroy(); destroyMs = reusableRuntime.destroy();
destroyMs = elapsedMillis(destroyStartedAtNanos); } else {
destroyMs = runtimePool.release(reusableRuntime);
}
} }
} }
return new DerivedProjectionRuntimeExecutionMetrics( return new DerivedProjectionRuntimeExecutionMetrics(
definitionCacheHit, definitionCacheHit,
definitionPreparationMs, definitionPreparationMs,
runtimePoolHit,
runtimeInitMs, runtimeInitMs,
deployMs, deployMs,
listenerRegistrationMs, listenerRegistrationMs,
runtimeResetMs,
sendSupportGeoMs, sendSupportGeoMs,
sendVehicleUsageMs, sendVehicleUsageMs,
sendActivityMs, sendActivityMs,
@ -334,6 +363,51 @@ public class DriverWorkingTimeReusableProjectionBuilder {
); );
} }
private ReusableProjectionRuntime createReusableRuntime(
PreparedProjectionDefinition preparedDefinition,
Map<String, Consumer<EventBean[]>> listeners
) throws EPDeployException, EPCompileException {
EPRuntime runtime = null;
try {
long runtimeInitStartedAtNanos = System.nanoTime();
Configuration configuration = createRuntimeConfiguration();
String runtimeUri = "eventhub-driver-working-time-reusable-projection-" + RUNTIME_COUNTER.incrementAndGet();
runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration);
long runtimeInitMs = elapsedMillis(runtimeInitStartedAtNanos);
long deployStartedAtNanos = System.nanoTime();
EPDeployment deployment = runtime.getDeploymentService().deploy(preparedDefinition.compiled());
long deployMs = elapsedMillis(deployStartedAtNanos);
ReusableProjectionRuntime reusableRuntime = new ReusableProjectionRuntime(runtime, runtimeInitMs, deployMs, false);
long listenerRegistrationStartedAtNanos = System.nanoTime();
for (String statementName : listeners.keySet()) {
runtime.getDeploymentService()
.getStatement(deployment.getDeploymentId(), statementName)
.addListener((newData, oldData, statement, rt) -> reusableRuntime.onStatementEvents(statement.getName(), newData));
}
reusableRuntime.listenerRegistrationMs(elapsedMillis(listenerRegistrationStartedAtNanos));
reusableRuntime.cleanupQueries(compileCleanupQueries(runtime));
return reusableRuntime;
} catch (EPDeployException | EPCompileException | RuntimeException ex) {
if (runtime != null && !runtime.isDestroyed()) {
runtime.destroy();
}
throw ex;
}
}
private List<EPCompiled> compileCleanupQueries(EPRuntime runtime) throws EPCompileException {
CompilerArguments arguments = new CompilerArguments(runtime.getConfigurationDeepCopy());
arguments.getPath().add(runtime.getRuntimePath());
List<EPCompiled> compiledQueries = new ArrayList<>(REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES.size());
for (String cleanupQuery : REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES) {
compiledQueries.add(EPCompilerProvider.getCompiler().compileQuery(cleanupQuery, arguments));
}
return List.copyOf(compiledQueries);
}
private Map<String, Object> activityIntervalInputDefinition() { private Map<String, Object> activityIntervalInputDefinition() {
return ACTIVITY_INTERVAL_INPUT_DEFINITION; return ACTIVITY_INTERVAL_INPUT_DEFINITION;
} }
@ -894,9 +968,11 @@ public class DriverWorkingTimeReusableProjectionBuilder {
private record DerivedProjectionRuntimeExecutionMetrics( private record DerivedProjectionRuntimeExecutionMetrics(
boolean definitionCacheHit, boolean definitionCacheHit,
long definitionPreparationMs, long definitionPreparationMs,
boolean runtimePoolHit,
long runtimeInitMs, long runtimeInitMs,
long deployMs, long deployMs,
long listenerRegistrationMs, long listenerRegistrationMs,
long runtimeResetMs,
long sendSupportGeoMs, long sendSupportGeoMs,
long sendVehicleUsageMs, long sendVehicleUsageMs,
long sendActivityMs, long sendActivityMs,
@ -904,6 +980,152 @@ public class DriverWorkingTimeReusableProjectionBuilder {
) { ) {
} }
private final class ReusableProjectionRuntimePool {
private final Deque<ReusableProjectionRuntime> idleRuntimes = new ArrayDeque<>();
private synchronized ReusableProjectionRuntime acquire(
PreparedProjectionDefinition preparedDefinition,
Map<String, Consumer<EventBean[]>> listeners
) throws EPDeployException, EPCompileException {
ReusableProjectionRuntime runtime = idleRuntimes.pollFirst();
if (runtime != null) {
return runtime.reused();
}
return createReusableRuntime(preparedDefinition, listeners);
}
private long release(ReusableProjectionRuntime runtime) {
if (runtime == null) {
return 0L;
}
synchronized (this) {
if (idleRuntimes.size() < MAX_IDLE_RUNTIMES_PER_DEFINITION) {
runtime.poolHit(false);
idleRuntimes.addLast(runtime);
return 0L;
}
}
return runtime.destroy();
}
}
private static final class ReusableProjectionRuntime {
private final EPRuntime runtime;
private final long runtimeInitMs;
private final long deployMs;
private volatile long listenerRegistrationMs;
private volatile List<EPCompiled> cleanupQueries = List.of();
private volatile boolean poolHit;
private ExecutionListeners currentExecutionListeners;
private ReusableProjectionRuntime(
EPRuntime runtime,
long runtimeInitMs,
long deployMs,
boolean poolHit
) {
this.runtime = runtime;
this.runtimeInitMs = runtimeInitMs;
this.deployMs = deployMs;
this.poolHit = poolHit;
}
private synchronized ReusableProjectionRuntimeExecution execute(
Map<String, Consumer<EventBean[]>> listeners,
Consumer<DerivedProjectionEventSender> sender
) {
currentExecutionListeners = new ExecutionListeners(listeners);
long runtimeResetStartedAtNanos = System.nanoTime();
for (EPCompiled cleanupQuery : cleanupQueries) {
runtime.getFireAndForgetService().executeQuery(cleanupQuery);
}
long runtimeResetMs = elapsedMillisStatic(runtimeResetStartedAtNanos);
try {
DerivedProjectionEventSender timedSender = new DerivedProjectionEventSender(runtime);
sender.accept(timedSender);
return new ReusableProjectionRuntimeExecution(
runtimeResetMs,
timedSender.sendSupportGeoMs(),
timedSender.sendVehicleUsageMs(),
timedSender.sendActivityMs()
);
} finally {
currentExecutionListeners = null;
}
}
private void onStatementEvents(String statementName, EventBean[] newData) {
ExecutionListeners listeners = currentExecutionListeners;
if (listeners == null || newData == null || statementName == null) {
return;
}
Consumer<EventBean[]> consumer = listeners.listeners().get(statementName);
if (consumer != null) {
consumer.accept(newData);
}
}
private long destroy() {
if (runtime == null || runtime.isDestroyed()) {
return 0L;
}
long destroyStartedAtNanos = System.nanoTime();
runtime.destroy();
return elapsedMillisStatic(destroyStartedAtNanos);
}
private ReusableProjectionRuntime reused() {
poolHit = true;
return this;
}
private void cleanupQueries(List<EPCompiled> value) {
cleanupQueries = value == null ? List.of() : value;
}
private void listenerRegistrationMs(long value) {
listenerRegistrationMs = value;
}
private void poolHit(boolean value) {
poolHit = value;
}
private boolean poolHit() {
return poolHit;
}
private long runtimeInitMs() {
return poolHit ? 0L : runtimeInitMs;
}
private long deployMs() {
return poolHit ? 0L : deployMs;
}
private long listenerRegistrationMs() {
return poolHit ? 0L : listenerRegistrationMs;
}
}
private record ExecutionListeners(
Map<String, Consumer<EventBean[]>> listeners
) {
private ExecutionListeners {
listeners = listeners == null ? Map.of() : Map.copyOf(listeners);
}
}
private record ReusableProjectionRuntimeExecution(
long runtimeResetMs,
long sendSupportGeoMs,
long sendVehicleUsageMs,
long sendActivityMs
) {
}
private static final class DerivedProjectionEventSender { private static final class DerivedProjectionEventSender {
private final EPRuntime delegate; private final EPRuntime delegate;
@ -949,4 +1171,8 @@ public class DriverWorkingTimeReusableProjectionBuilder {
return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
} }
} }
private static long elapsedMillisStatic(long startedAtNanos) {
return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
}
} }

View File

@ -40,7 +40,7 @@ public record RuntimeEventProcessingApiRequest(
scope != null ? scope.includeAllDrivers() : null, scope != null ? scope.includeAllDrivers() : null,
scope != null ? scope.vehicleKeys() : null, scope != null ? scope.vehicleKeys() : null,
scope != null ? scope.includeAllVehicles() : null, scope != null ? scope.includeAllVehicles() : null,
null, scope != null ? scope.expandVehicleEvents() : null,
scope != null ? scope.vehicleExpansionPaddingMinutes() : null, scope != null ? scope.vehicleExpansionPaddingMinutes() : null,
null null
), ),

View File

@ -39,6 +39,8 @@ public class RuntimeEventAssemblyModule implements RuntimeProcessingModule {
UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context); UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context);
UnifiedRuntimeEventBundle bundle = eventAssemblyService.assembleDriverScopedEvents(scopeRequest.toRuntimeRequest()); UnifiedRuntimeEventBundle bundle = eventAssemblyService.assembleDriverScopedEvents(scopeRequest.toRuntimeRequest());
Map<String, Object> metadata = new LinkedHashMap<>(); Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("expandVehicleEvents", scopeRequest.expandVehicleEvents() == null || scopeRequest.expandVehicleEvents());
metadata.put("vehicleExpansionPaddingMinutes", scopeRequest.vehicleExpansionPaddingMinutes() == null ? 0 : scopeRequest.vehicleExpansionPaddingMinutes());
metadata.put("driverSeedEventCount", bundle.driverSeedEvents().size()); metadata.put("driverSeedEventCount", bundle.driverSeedEvents().size());
metadata.put("expandedVehicleEventCount", bundle.expandedVehicleEvents().size()); metadata.put("expandedVehicleEventCount", bundle.expandedVehicleEvents().size());
metadata.put("mergedEventCount", bundle.mergedEvents().size()); metadata.put("mergedEventCount", bundle.mergedEvents().size());

View File

@ -85,8 +85,16 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
directDriverEvents, directDriverEvents,
broadBundle.mergedEvents(), broadBundle.mergedEvents(),
driverVehicleUsageIntervals, driverVehicleUsageIntervals,
scopeRequest.expandVehicleEvents() == null || scopeRequest.expandVehicleEvents(), booleanAttribute(
scopeRequest.vehicleExpansionPaddingMinutes() == null ? 0 : scopeRequest.vehicleExpansionPaddingMinutes(), context,
at.procon.eventhub.processing.eventprocessing.plan.DriverWorkingTimeRuntimeProcessingPlan.ATTACH_VEHICLE_ONLY_EVENTS_ATTRIBUTE,
scopeRequest.expandVehicleEvents() == null || scopeRequest.expandVehicleEvents()
),
integerAttribute(
context,
at.procon.eventhub.processing.eventprocessing.plan.DriverWorkingTimeRuntimeProcessingPlan.VEHICLE_EVIDENCE_PADDING_MINUTES_ATTRIBUTE,
scopeRequest.vehicleExpansionPaddingMinutes() == null ? 0 : scopeRequest.vehicleExpansionPaddingMinutes()
),
includePartitionDebug includePartitionDebug
); );
for (EventHubEventDto attachedEvent : attachmentResult.attachedVehicleEvidenceEvents()) { for (EventHubEventDto attachedEvent : attachmentResult.attachedVehicleEvidenceEvents()) {
@ -291,4 +299,23 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
} }
return fallback; return fallback;
} }
private int integerAttribute(RuntimeProcessingModuleContext context, String key, int fallback) {
Object value = context.attributes().get(key);
if (value instanceof Number number) {
return Math.max(0, number.intValue());
}
if (value == null) {
return Math.max(0, fallback);
}
String text = value.toString();
if (text.isBlank()) {
return Math.max(0, fallback);
}
try {
return Math.max(0, Integer.parseInt(text.trim()));
} catch (NumberFormatException ignored) {
return Math.max(0, fallback);
}
}
} }

View File

@ -30,6 +30,8 @@ import org.springframework.stereotype.Component;
public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessingPlan { public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessingPlan {
public static final String PLAN_KEY = "driver-working-time-v1"; public static final String PLAN_KEY = "driver-working-time-v1";
public static final String ATTACH_VEHICLE_ONLY_EVENTS_ATTRIBUTE = "attachVehicleOnlyEvents";
public static final String VEHICLE_EVIDENCE_PADDING_MINUTES_ATTRIBUTE = "vehicleEvidencePaddingMinutes";
private final RuntimeProcessingPipelineExecutor pipelineExecutor; private final RuntimeProcessingPipelineExecutor pipelineExecutor;
private final boolean includeRuntimeEventAssemblyModule; private final boolean includeRuntimeEventAssemblyModule;
@ -180,6 +182,16 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"includePartitionDebug", "includePartitionDebug",
request.partitioning() != null && request.partitioning().includeDebugOrDefault() request.partitioning() != null && request.partitioning().includeDebugOrDefault()
); );
boolean attachVehicleOnlyEvents = resolveAttachVehicleOnlyEvents(
request.sourceSelection(),
request.partitioning(),
request.parameters()
);
int vehicleEvidencePaddingMinutes = resolveVehicleEvidencePaddingMinutes(
request.sourceSelection(),
request.partitioning(),
request.parameters()
);
UnifiedRuntimeProcessingApiRequest scopeRequest = applyExecutionRequest( UnifiedRuntimeProcessingApiRequest scopeRequest = applyExecutionRequest(
request.sourceSelection(), request.sourceSelection(),
request.partitioning(), request.partitioning(),
@ -189,6 +201,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
Map<String, Object> attributes = new LinkedHashMap<>(); Map<String, Object> attributes = new LinkedHashMap<>();
attributes.put("runtimeScopeApiRequest", scopeRequest); attributes.put("runtimeScopeApiRequest", scopeRequest);
attributes.put("includePartitionDebug", includePartitionDebug); attributes.put("includePartitionDebug", includePartitionDebug);
attributes.put(ATTACH_VEHICLE_ONLY_EVENTS_ATTRIBUTE, attachVehicleOnlyEvents);
attributes.put(VEHICLE_EVIDENCE_PADDING_MINUTES_ATTRIBUTE, vehicleEvidencePaddingMinutes);
RuntimeProcessingModuleContext initialContext = new RuntimeProcessingModuleContext( RuntimeProcessingModuleContext initialContext = new RuntimeProcessingModuleContext(
request, request,
List.of(), List.of(),
@ -373,12 +387,6 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
sourceSelection.includeActivityIntervalsOrDefault()); sourceSelection.includeActivityIntervalsOrDefault());
boolean includeDrivingIntervals = booleanParameter(parameters, "includeDrivingIntervals", boolean includeDrivingIntervals = booleanParameter(parameters, "includeDrivingIntervals",
sourceSelection.includeDrivingIntervalsOrDefault()); sourceSelection.includeDrivingIntervalsOrDefault());
boolean attachVehicleOnlyEvents = booleanParameter(parameters, "attachVehicleOnlyEvents",
partitioning == null ? sourceSelection.expandVehicleEvents() == null || sourceSelection.expandVehicleEvents() : partitioning.attachVehicleEvidenceOrDefault());
Integer vehicleEvidencePaddingMinutes = nonNegativeIntegerParameter(parameters, "vehicleEvidencePaddingMinutes",
partitioning == null
? sourceSelection.vehicleExpansionPaddingMinutes()
: partitioning.vehicleEvidencePaddingMinutesOrDefault(sourceSelection.vehicleExpansionPaddingMinutes() == null ? 0 : sourceSelection.vehicleExpansionPaddingMinutes()));
return new UnifiedRuntimeProcessingApiRequest( return new UnifiedRuntimeProcessingApiRequest(
sourceSelection.sessionId(), sourceSelection.sessionId(),
@ -398,8 +406,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
sourceSelection.driverCardNumber(), sourceSelection.driverCardNumber(),
sourceSelection.occurredFrom(), sourceSelection.occurredFrom(),
sourceSelection.occurredTo(), sourceSelection.occurredTo(),
attachVehicleOnlyEvents, sourceSelection.expandVehicleEvents(),
vehicleEvidencePaddingMinutes, sourceSelection.vehicleExpansionPaddingMinutes(),
sourceSelection.includeIntersectingIntervals(), sourceSelection.includeIntersectingIntervals(),
significantDrivingMinutes, significantDrivingMinutes,
minimumRestPeriodMinutes, minimumRestPeriodMinutes,
@ -409,6 +417,37 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
); );
} }
private boolean resolveAttachVehicleOnlyEvents(
UnifiedRuntimeProcessingApiRequest sourceSelection,
RuntimeEventPartitioningApiRequest partitioning,
Map<String, Object> parameters
) {
return booleanParameter(
parameters,
"attachVehicleOnlyEvents",
partitioning == null
? sourceSelection.expandVehicleEvents() == null || sourceSelection.expandVehicleEvents()
: partitioning.attachVehicleEvidenceOrDefault()
);
}
private int resolveVehicleEvidencePaddingMinutes(
UnifiedRuntimeProcessingApiRequest sourceSelection,
RuntimeEventPartitioningApiRequest partitioning,
Map<String, Object> parameters
) {
Integer resolved = nonNegativeIntegerParameter(
parameters,
"vehicleEvidencePaddingMinutes",
partitioning == null
? sourceSelection.vehicleExpansionPaddingMinutes()
: partitioning.vehicleEvidencePaddingMinutesOrDefault(
sourceSelection.vehicleExpansionPaddingMinutes() == null ? 0 : sourceSelection.vehicleExpansionPaddingMinutes()
)
);
return resolved == null ? 0 : resolved;
}
private List<String> requestedOrDefaultModules(List<String> requestedModules) { private List<String> requestedOrDefaultModules(List<String> requestedModules) {
if (requestedModules != null && !requestedModules.isEmpty()) { if (requestedModules != null && !requestedModules.isEmpty()) {
LinkedHashMap<String, String> requested = new LinkedHashMap<>(); LinkedHashMap<String, String> requested = new LinkedHashMap<>();

View File

@ -48,7 +48,7 @@ public record RuntimeProcessingExecutionApiRequest(
sourceSelection != null ? sourceSelection.includeAllDrivers() : null, sourceSelection != null ? sourceSelection.includeAllDrivers() : null,
sourceSelection != null ? sourceSelection.vehicleKeys() : null, sourceSelection != null ? sourceSelection.vehicleKeys() : null,
sourceSelection != null ? sourceSelection.includeAllVehicles() : null, sourceSelection != null ? sourceSelection.includeAllVehicles() : null,
null, sourceSelection != null ? sourceSelection.expandVehicleEvents() : null,
sourceSelection != null ? sourceSelection.vehicleExpansionPaddingMinutes() : null, sourceSelection != null ? sourceSelection.vehicleExpansionPaddingMinutes() : null,
null null
), ),

View File

@ -14,11 +14,15 @@ import java.util.Comparator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
public class UnifiedRuntimeEventAssemblyService { public class UnifiedRuntimeEventAssemblyService {
private static final Logger LOG = LoggerFactory.getLogger(UnifiedRuntimeEventAssemblyService.class);
private final List<RuntimeDriverEventLoader> driverEventLoaders; private final List<RuntimeDriverEventLoader> driverEventLoaders;
private final List<RuntimeVehicleEventLoader> vehicleEventLoaders; private final List<RuntimeVehicleEventLoader> vehicleEventLoaders;
@ -34,10 +38,13 @@ public class UnifiedRuntimeEventAssemblyService {
List<UnifiedRuntimeSourceInput> sourceInputs = request.normalizedSourceInputs(); List<UnifiedRuntimeSourceInput> sourceInputs = request.normalizedSourceInputs();
List<EventHubEventDto> driverSeedEvents = loadDriverSeedEvents(request); List<EventHubEventDto> driverSeedEvents = loadDriverSeedEvents(request);
List<UnifiedDiscoveredVehicleRef> discoveredVehicles = discoverVehicles(driverSeedEvents); List<UnifiedDiscoveredVehicleRef> discoveredVehicles = discoverVehicles(driverSeedEvents);
List<EventHubEventDto> expandedVehicleEvents = request.expandVehicleEvents() boolean expandVehicleEvents = request.expandVehicleEvents();
List<EventHubEventDto> expandedVehicleEvents = expandVehicleEvents
? loadExpandedVehicleEvents(request, discoveredVehicles) ? loadExpandedVehicleEvents(request, discoveredVehicles)
: List.of(); : List.of();
List<EventHubEventDto> mergedEvents = deduplicateAndSort(driverSeedEvents, expandedVehicleEvents); List<EventHubEventDto> mergedEvents = expandVehicleEvents
? deduplicateAndSort(driverSeedEvents, expandedVehicleEvents)
: driverSeedEvents;
List<String> notes = new ArrayList<>(); List<String> notes = new ArrayList<>();
boolean includesEventHub = sourceInputs.stream() boolean includesEventHub = sourceInputs.stream()
@ -63,12 +70,21 @@ public class UnifiedRuntimeEventAssemblyService {
notes.add("Tachograph file-session events were loaded from session " + sourceInput.sessionId() + "."); notes.add("Tachograph file-session events were loaded from session " + sourceInput.sessionId() + ".");
} }
} }
if (request.expandVehicleEvents()) { if (expandVehicleEvents) {
notes.add("Vehicle expansion loaded additional events for vehicles discovered in the driver seed set."); notes.add("Vehicle expansion loaded additional events for vehicles discovered in the driver seed set.");
notes.add("Vehicle expansion padding minutes: " + request.vehicleExpansionPaddingMinutes() + "."); notes.add("Vehicle expansion padding minutes: " + request.vehicleExpansionPaddingMinutes() + ".");
} else { } else {
notes.add("Vehicle expansion was disabled for this runtime request."); notes.add("Vehicle expansion was disabled for this runtime request.");
} }
LOG.info(
"Runtime event assembly completed (expandVehicleEvents: {}, sourceInputs: {}, driverSeedEvents: {}, discoveredVehicles: {}, expandedVehicleEvents: {}, mergedEvents: {})",
expandVehicleEvents,
sourceInputs.size(),
driverSeedEvents.size(),
discoveredVehicles.size(),
expandedVehicleEvents.size(),
mergedEvents.size()
);
return new UnifiedRuntimeEventBundle( return new UnifiedRuntimeEventBundle(
request, request,
driverSeedEvents, driverSeedEvents,

View File

@ -9,7 +9,7 @@ import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
import at.procon.eventhub.tachographfilesession.model.TachographFileSession; import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle; import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
import at.procon.eventhub.tachographfilesession.service.DriverNotFoundInSessionException; import at.procon.eventhub.tachographfilesession.service.DriverNotFoundInSessionException;
import at.procon.eventhub.tachographfilesession.service.DriverTimelineEventBuilder; import at.procon.eventhub.tachographfilesession.service.RawSourceDriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionNotFoundException; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionNotFoundException;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import java.util.List; import java.util.List;
@ -19,14 +19,14 @@ import org.springframework.stereotype.Component;
public class TachographFileSessionUnifiedDriverEventSource implements UnifiedDriverEventSource { public class TachographFileSessionUnifiedDriverEventSource implements UnifiedDriverEventSource {
private final TachographFileSessionRepository repository; private final TachographFileSessionRepository repository;
private final DriverTimelineEventBuilder eventBuilder; private final RawSourceDriverTimelineEventBuilder rawSourceEventBuilder;
public TachographFileSessionUnifiedDriverEventSource( public TachographFileSessionUnifiedDriverEventSource(
TachographFileSessionRepository repository, TachographFileSessionRepository repository,
DriverTimelineEventBuilder eventBuilder RawSourceDriverTimelineEventBuilder rawSourceEventBuilder
) { ) {
this.repository = repository; this.repository = repository;
this.eventBuilder = eventBuilder; this.rawSourceEventBuilder = rawSourceEventBuilder;
} }
@Override @Override
@ -56,7 +56,7 @@ public class TachographFileSessionUnifiedDriverEventSource implements UnifiedDri
DriverExtractionSession driver, DriverExtractionSession driver,
UnifiedDriverEventsRequest request UnifiedDriverEventsRequest request
) { ) {
TachographTimelineEventBundle bundle = eventBuilder.buildEventBundle(session, driver); TachographTimelineEventBundle bundle = rawSourceEventBuilder.buildRawEventBundle(session, driver);
return TachographTimelineEventBundle.fromRuntimeBundle( return TachographTimelineEventBundle.fromRuntimeBundle(
RuntimeIntervalEventWindowSelector.filterBundle( RuntimeIntervalEventWindowSelector.filterBundle(
bundle == null ? null : bundle.toRuntimeBundle(), bundle == null ? null : bundle.toRuntimeBundle(),

View File

@ -10,7 +10,7 @@ import at.procon.eventhub.reference.TachographNationRegistry;
import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession; import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
import at.procon.eventhub.tachographfilesession.model.TachographFileSession; import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle; import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
import at.procon.eventhub.tachographfilesession.service.DriverTimelineEventBuilder; import at.procon.eventhub.tachographfilesession.service.RawSourceDriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionNotFoundException; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionNotFoundException;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import java.util.List; import java.util.List;
@ -20,14 +20,14 @@ import org.springframework.stereotype.Component;
public class TachographFileSessionUnifiedVehicleEventSource implements UnifiedVehicleEventSource { public class TachographFileSessionUnifiedVehicleEventSource implements UnifiedVehicleEventSource {
private final TachographFileSessionRepository repository; private final TachographFileSessionRepository repository;
private final DriverTimelineEventBuilder eventBuilder; private final RawSourceDriverTimelineEventBuilder rawSourceEventBuilder;
public TachographFileSessionUnifiedVehicleEventSource( public TachographFileSessionUnifiedVehicleEventSource(
TachographFileSessionRepository repository, TachographFileSessionRepository repository,
DriverTimelineEventBuilder eventBuilder RawSourceDriverTimelineEventBuilder rawSourceEventBuilder
) { ) {
this.repository = repository; this.repository = repository;
this.eventBuilder = eventBuilder; this.rawSourceEventBuilder = rawSourceEventBuilder;
} }
@Override @Override
@ -52,7 +52,7 @@ public class TachographFileSessionUnifiedVehicleEventSource implements UnifiedVe
DriverExtractionSession driver, DriverExtractionSession driver,
UnifiedVehicleEventsRequest request UnifiedVehicleEventsRequest request
) { ) {
TachographTimelineEventBundle bundle = eventBuilder.buildEventBundle(session, driver); TachographTimelineEventBundle bundle = rawSourceEventBuilder.buildRawEventBundle(session, driver);
return TachographTimelineEventBundle.fromRuntimeBundle( return TachographTimelineEventBundle.fromRuntimeBundle(
RuntimeIntervalEventWindowSelector.filterBundle( RuntimeIntervalEventWindowSelector.filterBundle(
bundle == null ? null : bundle.toRuntimeBundle(), bundle == null ? null : bundle.toRuntimeBundle(),

View File

@ -288,7 +288,7 @@ create schema DailyWeeklyRestCandidateCoverageEmittedKey(
endedAtEpochSecond long endedAtEpochSecond long
); );
create context PerDriver partition by driverKey from TachographVehicleUsageIntervalInputEvent; @public create context PerDriver partition by driverKey from TachographVehicleUsageIntervalInputEvent;
create schema VuCardAbsentInterval( create schema VuCardAbsentInterval(
sessionId java.util.UUID, sessionId java.util.UUID,
@ -438,27 +438,27 @@ create schema PotentialInVehicleTripInterval(
lastNextDrivingSourceIntervalId string lastNextDrivingSourceIntervalId string
); );
create window PreviousRestCandidateCoverageInterval#unique(driverKey) as DailyWeeklyRestCandidateCoverageInterval; @public create window PreviousRestCandidateCoverageInterval#unique(driverKey) as DailyWeeklyRestCandidateCoverageInterval;
create window OpenPotentialInVehicleTripState#unique(driverKey) as PotentialInVehicleTripState; @public create window OpenPotentialInVehicleTripState#unique(driverKey) as PotentialInVehicleTripState;
create window SupportGeoEvidenceWindow#keepall as SupportGeoEvidence; @public create window SupportGeoEvidenceWindow#keepall as SupportGeoEvidence;
create window DailyWeeklyRestCandidateBeginGeoEvidenceCandidateWindow#keepall as DailyWeeklyRestCandidateBeginGeoEvidenceCandidate; @public create window DailyWeeklyRestCandidateBeginGeoEvidenceCandidateWindow#keepall as DailyWeeklyRestCandidateBeginGeoEvidenceCandidate;
create window DailyWeeklyRestCandidateBeginGeoEvidenceBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginGeoEvidenceBestScore; @public create window DailyWeeklyRestCandidateBeginGeoEvidenceBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginGeoEvidenceBestScore;
create window DailyWeeklyRestCandidateBeginGeoEvidenceResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginGeoEvidenceResolved; @public create window DailyWeeklyRestCandidateBeginGeoEvidenceResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginGeoEvidenceResolved;
create window DailyWeeklyRestCandidateEndGeoEvidenceCandidateWindow#keepall as DailyWeeklyRestCandidateEndGeoEvidenceCandidate; @public create window DailyWeeklyRestCandidateEndGeoEvidenceCandidateWindow#keepall as DailyWeeklyRestCandidateEndGeoEvidenceCandidate;
create window DailyWeeklyRestCandidateEndGeoEvidenceBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndGeoEvidenceBestScore; @public create window DailyWeeklyRestCandidateEndGeoEvidenceBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndGeoEvidenceBestScore;
create window DailyWeeklyRestCandidateEndGeoEvidenceResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndGeoEvidenceResolved; @public create window DailyWeeklyRestCandidateEndGeoEvidenceResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndGeoEvidenceResolved;
create window DailyWeeklyRestCandidateBeginBoundaryOdometerCandidateWindow#keepall as DailyWeeklyRestCandidateBeginBoundaryOdometerCandidate; @public create window DailyWeeklyRestCandidateBeginBoundaryOdometerCandidateWindow#keepall as DailyWeeklyRestCandidateBeginBoundaryOdometerCandidate;
create window DailyWeeklyRestCandidateBeginBoundaryOdometerBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginBoundaryOdometerBestScore; @public create window DailyWeeklyRestCandidateBeginBoundaryOdometerBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginBoundaryOdometerBestScore;
create window DailyWeeklyRestCandidateBeginBoundaryOdometerResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginBoundaryOdometerResolved; @public create window DailyWeeklyRestCandidateBeginBoundaryOdometerResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginBoundaryOdometerResolved;
create window DailyWeeklyRestCandidateEndBoundaryOdometerCandidateWindow#keepall as DailyWeeklyRestCandidateEndBoundaryOdometerCandidate; @public create window DailyWeeklyRestCandidateEndBoundaryOdometerCandidateWindow#keepall as DailyWeeklyRestCandidateEndBoundaryOdometerCandidate;
create window DailyWeeklyRestCandidateEndBoundaryOdometerBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndBoundaryOdometerBestScore; @public create window DailyWeeklyRestCandidateEndBoundaryOdometerBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndBoundaryOdometerBestScore;
create window DailyWeeklyRestCandidateEndBoundaryOdometerResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndBoundaryOdometerResolved; @public create window DailyWeeklyRestCandidateEndBoundaryOdometerResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndBoundaryOdometerResolved;
create window DailyWeeklyRestCandidateCoverageCardResolvedIntervalWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateCoverageCardResolvedInterval; @public create window DailyWeeklyRestCandidateCoverageCardResolvedIntervalWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateCoverageCardResolvedInterval;
create window DailyWeeklyRestCandidateCoverageEmittedKeyWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateCoverageEmittedKey; @public create window DailyWeeklyRestCandidateCoverageEmittedKeyWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateCoverageEmittedKey;
insert into SupportGeoEvidenceWindow insert into SupportGeoEvidenceWindow
select select
@ -488,7 +488,7 @@ select
vehicleKey vehicleKey
from TachographActivityIntervalInputEvent(activityType = 'DRIVE', durationSeconds > ${SIGNIFICANT_DRIVING_THRESHOLD_SECONDS}); from TachographActivityIntervalInputEvent(activityType = 'DRIVE', durationSeconds > ${SIGNIFICANT_DRIVING_THRESHOLD_SECONDS});
create window PreviousSignificantDrivingInterval#unique(driverKey) as SignificantDrivingInterval; @public create window PreviousSignificantDrivingInterval#unique(driverKey) as SignificantDrivingInterval;
on SignificantDrivingInterval as next on SignificantDrivingInterval as next
insert into DrivingInterruptionInterval insert into DrivingInterruptionInterval
@ -1483,7 +1483,7 @@ select
endedAtEpochSecond endedAtEpochSecond
from DailyWeeklyRestCandidateCoverageInterval; from DailyWeeklyRestCandidateCoverageInterval;
context PerDriver @public context PerDriver
create window PreviousVehicleUsageInterval#lastevent as TachographVehicleUsageIntervalInputEvent; create window PreviousVehicleUsageInterval#lastevent as TachographVehicleUsageIntervalInputEvent;
@Priority(30) @Priority(30)

View File

@ -0,0 +1,115 @@
package at.procon.eventhub.processing.driverworkingtime.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDerivedProjectionBundle;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeProcessingInput;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class DriverWorkingTimeReusableProjectionBuilderTest {
@Test
void reusesWarmRuntimeWithoutLeakingPreviousState() {
DriverWorkingTimeReusableProjectionBuilder builder =
new DriverWorkingTimeReusableProjectionBuilder(new EventHubProperties());
UUID sessionId = UUID.randomUUID();
OffsetDateTime from = OffsetDateTime.parse("2026-05-01T08:00:00Z");
OffsetDateTime firstDriveEnd = OffsetDateTime.parse("2026-05-01T09:00:00Z");
OffsetDateTime secondDriveStart = OffsetDateTime.parse("2026-05-01T10:00:00Z");
OffsetDateTime to = OffsetDateTime.parse("2026-05-01T11:00:00Z");
DriverWorkingTimeProcessingInput input = new DriverWorkingTimeProcessingInput(
sessionId,
"12:123",
"DRIVER_CARD",
from,
to,
from,
to,
15,
30,
List.of(
new DriverWorkingTimeActivityInterval(
sessionId,
"12:123",
"ACT-1",
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"DRIVER_CARD",
"ACT-1",
"ACT-1",
from,
firstDriveEnd,
from.toEpochSecond(),
firstDriveEnd.toEpochSecond(),
firstDriveEnd.toEpochSecond() - from.toEpochSecond(),
List.of("ACT-1"),
false,
false,
"RAW_INTERVAL"
),
new DriverWorkingTimeActivityInterval(
sessionId,
"12:123",
"ACT-2",
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"DRIVER_CARD",
"ACT-2",
"ACT-2",
secondDriveStart,
to,
secondDriveStart.toEpochSecond(),
to.toEpochSecond(),
to.toEpochSecond() - secondDriveStart.toEpochSecond(),
List.of("ACT-2"),
false,
false,
"RAW_INTERVAL"
)
),
List.of(
new DriverWorkingTimeVehicleUsageInterval(
sessionId,
"12:123",
"VU-1",
"VU-1",
"VU-1",
from,
to,
from.toEpochSecond(),
to.toEpochSecond(),
to.toEpochSecond() - from.toEpochSecond(),
100L,
150L,
"12:REG-1",
"VIN-1",
"DRIVER_CARD",
List.of("VU-1")
)
),
List.of(),
List.of()
);
DriverWorkingTimeDerivedProjectionBundle first = builder.buildDerivedProjectionBundle(input);
DriverWorkingTimeDerivedProjectionBundle second = builder.buildDerivedProjectionBundle(input);
assertThat(second).isEqualTo(first);
assertThat(second.drivingInterruptionIntervals()).hasSize(1);
}
}

View File

@ -0,0 +1,49 @@
package at.procon.eventhub.processing.eventprocessing.dto;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class RuntimeEventProcessingApiRequestTest {
@Test
void driverWorkingTimePreservesDisabledVehicleExpansion() {
UnifiedRuntimeProcessingApiRequest scope = new UnifiedRuntimeProcessingApiRequest(
UUID.randomUUID(),
List.of(),
null,
null,
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
null,
"12:123",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
0,
true,
null,
null,
null,
null,
List.of()
);
RuntimeEventProcessingApiRequest request = RuntimeEventProcessingApiRequest.driverWorkingTime(scope);
assertThat(request.partitioning().attachVehicleEvidence()).isFalse();
}
}

View File

@ -0,0 +1,137 @@
package at.procon.eventhub.processing.eventprocessing.module;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDriverPartition;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.eventprocessing.plan.DriverWorkingTimeRuntimeProcessingPlan;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest;
import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.service.RuntimeDriverVehicleEvidenceAttachmentService;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class VehicleEvidenceAttachmentModuleTest {
@Test
void usesAttachmentAttributeInsteadOfScopeExpansionFlag() {
RuntimeDriverVehicleEvidenceAttachmentService service =
org.mockito.Mockito.mock(RuntimeDriverVehicleEvidenceAttachmentService.class);
VehicleEvidenceAttachmentModule module = new VehicleEvidenceAttachmentModule(service);
UnifiedRuntimeProcessingApiRequest scope = new UnifiedRuntimeProcessingApiRequest(
UUID.randomUUID(),
List.of(),
null,
null,
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
null,
"12:123",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
15,
true,
null,
null,
null,
null,
List.of()
);
UnifiedRuntimeProcessingRequest runtimeRequest = scope.toRuntimeRequest();
UnifiedRuntimeEventBundle bundle = new UnifiedRuntimeEventBundle(
runtimeRequest,
List.of(),
List.of(),
List.of(),
List.of(),
List.of()
);
when(service.attachVehicleEvidence(
eq("12:123"),
eq(List.of()),
eq(List.of()),
eq(List.<DriverWorkingTimeVehicleUsageInterval>of()),
eq(false),
eq(3),
anyBoolean()
)).thenReturn(new RuntimeDriverVehicleEvidenceAttachmentResult(
"12:123",
List.of(),
List.of(),
List.of(),
0,
0,
0,
List.of(),
List.of(),
List.of(),
List.of()
));
RuntimeProcessingModuleContext context = new RuntimeProcessingModuleContext(
new RuntimeProcessingExecutionApiRequest("driver-working-time-v1", scope, null, List.of(), Map.of()),
List.of(),
Map.of(
"runtimeScopeApiRequest", scope,
DriverWorkingTimeRuntimeProcessingPlan.ATTACH_VEHICLE_ONLY_EVENTS_ATTRIBUTE, false,
DriverWorkingTimeRuntimeProcessingPlan.VEHICLE_EVIDENCE_PADDING_MINUTES_ATTRIBUTE, 3
),
Map.of(
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
new RuntimeProcessingModuleResult(
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
RuntimeProcessingModuleStatus.SUCCESS,
bundle,
Map.of(),
List.of()
),
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE,
new RuntimeProcessingModuleResult(
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE,
RuntimeProcessingModuleStatus.SUCCESS,
List.of(),
Map.of(),
List.of()
)
)
);
RuntimeProcessingModuleResult result = module.execute(context);
verify(service).attachVehicleEvidence(
eq("12:123"),
eq(List.of()),
eq(List.of()),
eq(List.<DriverWorkingTimeVehicleUsageInterval>of()),
eq(false),
eq(3),
eq(false)
);
@SuppressWarnings("unchecked")
Map<String, DriverWorkingTimeDriverPartition> partitions =
(Map<String, DriverWorkingTimeDriverPartition>) result.output();
org.assertj.core.api.Assertions.assertThat(partitions).containsKey("12:123");
}
}

View File

@ -0,0 +1,72 @@
package at.procon.eventhub.processing.eventprocessing.plan;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.service.RuntimeDriverWorkingTimeScopeProcessingService;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class DriverWorkingTimeRuntimeProcessingPlanTest {
@Test
void applyExecutionRequestDoesNotRewriteScopeExpansionFromAttachmentFlag() {
DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan(
org.mockito.Mockito.mock(RuntimeDriverWorkingTimeScopeProcessingService.class)
);
UnifiedRuntimeProcessingApiRequest scope = new UnifiedRuntimeProcessingApiRequest(
UUID.randomUUID(),
List.of(),
null,
null,
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
null,
"12:123",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
15,
true,
null,
null,
null,
null,
List.of()
);
UnifiedRuntimeProcessingApiRequest resolved = plan.applyExecutionRequest(
scope,
new RuntimeEventPartitioningApiRequest(
RuntimeEventPartitioningStrategy.DRIVER,
Set.of(),
false,
Set.of(),
false,
Set.of(),
false,
false,
0,
false
),
Map.of("attachVehicleOnlyEvents", false)
);
assertThat(resolved.expandVehicleEvents()).isTrue();
assertThat(resolved.vehicleExpansionPaddingMinutes()).isEqualTo(15);
}
}

View File

@ -0,0 +1,49 @@
package at.procon.eventhub.processing.eventprocessing.plan;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class RuntimeProcessingExecutionApiRequestTest {
@Test
void driverWorkingTimePreservesDisabledVehicleExpansion() {
UnifiedRuntimeProcessingApiRequest scope = new UnifiedRuntimeProcessingApiRequest(
UUID.randomUUID(),
List.of(),
null,
null,
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
null,
"12:123",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
0,
true,
null,
null,
null,
null,
List.of()
);
RuntimeProcessingExecutionApiRequest request = RuntimeProcessingExecutionApiRequest.driverWorkingTime(scope);
assertThat(request.partitioning().attachVehicleEvidence()).isFalse();
}
}

View File

@ -28,6 +28,7 @@ import at.procon.eventhub.tachographfilesession.model.TachographCompositeSession
import at.procon.eventhub.tachographfilesession.service.InMemoryTachographCompositeSessionRepository; import at.procon.eventhub.tachographfilesession.service.InMemoryTachographCompositeSessionRepository;
import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository;
import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder; import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.RawSourceDriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory; import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -53,9 +54,10 @@ class TachographFileSessionRuntimeEventLoaderTest {
new VehicleKeyFactory(), new VehicleKeyFactory(),
new EventDetailsFactory(new ObjectMapper()) new EventDetailsFactory(new ObjectMapper())
); );
RawSourceDriverTimelineEventBuilder rawSourceEventBuilder = new RawSourceDriverTimelineEventBuilder(eventBuilder);
TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader( TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader(
new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, eventBuilder))), new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, rawSourceEventBuilder))),
new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, eventBuilder))), new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, rawSourceEventBuilder))),
compositeRepository, compositeRepository,
new EventAcquisitionRecordKeyService(), new EventAcquisitionRecordKeyService(),
new EventHubEventSorter() new EventHubEventSorter()
@ -90,9 +92,10 @@ class TachographFileSessionRuntimeEventLoaderTest {
new VehicleKeyFactory(), new VehicleKeyFactory(),
new EventDetailsFactory(new ObjectMapper()) new EventDetailsFactory(new ObjectMapper())
); );
RawSourceDriverTimelineEventBuilder rawSourceEventBuilder = new RawSourceDriverTimelineEventBuilder(eventBuilder);
TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader( TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader(
new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, eventBuilder))), new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, rawSourceEventBuilder))),
new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, eventBuilder))), new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, rawSourceEventBuilder))),
compositeRepository, compositeRepository,
new EventAcquisitionRecordKeyService(), new EventAcquisitionRecordKeyService(),
new EventHubEventSorter() new EventHubEventSorter()
@ -135,9 +138,10 @@ class TachographFileSessionRuntimeEventLoaderTest {
new VehicleKeyFactory(), new VehicleKeyFactory(),
new EventDetailsFactory(new ObjectMapper()) new EventDetailsFactory(new ObjectMapper())
); );
RawSourceDriverTimelineEventBuilder rawSourceEventBuilder = new RawSourceDriverTimelineEventBuilder(eventBuilder);
TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader( TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader(
new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, eventBuilder))), new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, rawSourceEventBuilder))),
new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, eventBuilder))), new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, rawSourceEventBuilder))),
compositeRepository, compositeRepository,
new EventAcquisitionRecordKeyService(), new EventAcquisitionRecordKeyService(),
new EventHubEventSorter() new EventHubEventSorter()

View File

@ -23,6 +23,7 @@ import at.procon.eventhub.tachographfilesession.service.DriverKeyFactory;
import at.procon.eventhub.tachographfilesession.service.DriverTimelineBuilder; import at.procon.eventhub.tachographfilesession.service.DriverTimelineBuilder;
import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository;
import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder; import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.RawSourceDriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory; import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -45,12 +46,12 @@ class UnifiedDriverEventSourceServiceTest {
UnifiedDriverEventSourceService service = new UnifiedDriverEventSourceService(List.of( UnifiedDriverEventSourceService service = new UnifiedDriverEventSourceService(List.of(
new TachographFileSessionUnifiedDriverEventSource( new TachographFileSessionUnifiedDriverEventSource(
repository, repository,
new IntervalBackedDriverTimelineEventBuilder( new RawSourceDriverTimelineEventBuilder(new IntervalBackedDriverTimelineEventBuilder(
timelineBuilder, timelineBuilder,
new DriverKeyFactory(), new DriverKeyFactory(),
new VehicleKeyFactory(), new VehicleKeyFactory(),
new EventDetailsFactory(new ObjectMapper()) new EventDetailsFactory(new ObjectMapper())
) ))
) )
)); ));

View File

@ -23,6 +23,7 @@ import at.procon.eventhub.tachographfilesession.service.DriverKeyFactory;
import at.procon.eventhub.tachographfilesession.service.DriverTimelineBuilder; import at.procon.eventhub.tachographfilesession.service.DriverTimelineBuilder;
import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository;
import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder; import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.RawSourceDriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory; import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -42,15 +43,16 @@ class UnifiedVehicleEventSourceServiceTest {
EventHubProperties properties = new EventHubProperties(); EventHubProperties properties = new EventHubProperties();
TachographFileSessionRepository repository = new InMemoryTachographFileSessionRepository(properties); TachographFileSessionRepository repository = new InMemoryTachographFileSessionRepository(properties);
DriverTimelineBuilder timelineBuilder = new DriverTimelineBuilder(); DriverTimelineBuilder timelineBuilder = new DriverTimelineBuilder();
UnifiedVehicleEventSourceService service = new UnifiedVehicleEventSourceService(List.of( IntervalBackedDriverTimelineEventBuilder eventBuilder = new IntervalBackedDriverTimelineEventBuilder(
new TachographFileSessionUnifiedVehicleEventSource(
repository,
new IntervalBackedDriverTimelineEventBuilder(
timelineBuilder, timelineBuilder,
new DriverKeyFactory(), new DriverKeyFactory(),
new VehicleKeyFactory(), new VehicleKeyFactory(),
new EventDetailsFactory(new ObjectMapper()) new EventDetailsFactory(new ObjectMapper())
) );
UnifiedVehicleEventSourceService service = new UnifiedVehicleEventSourceService(List.of(
new TachographFileSessionUnifiedVehicleEventSource(
repository,
new RawSourceDriverTimelineEventBuilder(eventBuilder)
) )
)); ));