diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java index 2e56bc9..61cd92b 100644 --- a/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java @@ -26,9 +26,11 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Comparator; import java.util.Collections; +import java.util.Deque; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -55,10 +57,34 @@ public class DriverWorkingTimeReusableProjectionBuilder { private static final Map ACTIVITY_INTERVAL_INPUT_DEFINITION = activityIntervalInputDefinitionStatic(); private static final Map VEHICLE_USAGE_INTERVAL_INPUT_DEFINITION = vehicleUsageIntervalInputDefinitionStatic(); private static final Map SUPPORT_GEO_EVIDENCE_INPUT_DEFINITION = supportGeoEvidenceInputDefinitionStatic(); + private static final int MAX_IDLE_RUNTIMES_PER_DEFINITION = 2; + private static final List REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES = List.of( + "delete from PreviousRestCandidateCoverageInterval", + "delete from OpenPotentialInVehicleTripState", + "delete from SupportGeoEvidenceWindow", + "delete from DailyWeeklyRestCandidateBeginGeoEvidenceCandidateWindow", + "delete from DailyWeeklyRestCandidateBeginGeoEvidenceBestScoreWindow", + "delete from DailyWeeklyRestCandidateBeginGeoEvidenceResolvedWindow", + "delete from DailyWeeklyRestCandidateEndGeoEvidenceCandidateWindow", + "delete from DailyWeeklyRestCandidateEndGeoEvidenceBestScoreWindow", + "delete from DailyWeeklyRestCandidateEndGeoEvidenceResolvedWindow", + "delete from DailyWeeklyRestCandidateBeginBoundaryOdometerCandidateWindow", + "delete from DailyWeeklyRestCandidateBeginBoundaryOdometerBestScoreWindow", + "delete from DailyWeeklyRestCandidateBeginBoundaryOdometerResolvedWindow", + "delete from DailyWeeklyRestCandidateEndBoundaryOdometerCandidateWindow", + "delete from DailyWeeklyRestCandidateEndBoundaryOdometerBestScoreWindow", + "delete from DailyWeeklyRestCandidateEndBoundaryOdometerResolvedWindow", + "delete from DailyWeeklyRestCandidateCoverageCardResolvedIntervalWindow", + "delete from DailyWeeklyRestCandidateCoverageEmittedKeyWindow", + "delete from PreviousSignificantDrivingInterval", + "context PerDriver delete from PreviousVehicleUsageInterval" + ); private final EventHubProperties properties; private final ConcurrentMap preparedDefinitions = new ConcurrentHashMap<>(); + private final ConcurrentMap reusableRuntimePools = + new ConcurrentHashMap<>(); @Autowired public DriverWorkingTimeReusableProjectionBuilder( @@ -159,13 +185,15 @@ public class DriverWorkingTimeReusableProjectionBuilder { long sortOutputMs = elapsedMillis(sortOutputStartedAtNanos); LOG.info( - "Driver working-time derived projection bundle built in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, sendSupportGeoMs: {}, sendVehicleUsageMs: {}, sendActivityMs: {}, destroyMs: {}, sortOutputMs: {}, activityInputEvents: {}, vehicleUsageInputEvents: {}, supportGeoInputEvents: {})", + "Driver working-time derived projection bundle built in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimePoolHit: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, runtimeResetMs: {}, sendSupportGeoMs: {}, sendVehicleUsageMs: {}, sendActivityMs: {}, destroyMs: {}, sortOutputMs: {}, activityInputEvents: {}, vehicleUsageInputEvents: {}, supportGeoInputEvents: {})", elapsedMillis(startedAtNanos), runtimeMetrics.definitionCacheHit(), runtimeMetrics.definitionPreparationMs(), + runtimeMetrics.runtimePoolHit(), runtimeMetrics.runtimeInitMs(), runtimeMetrics.deployMs(), runtimeMetrics.listenerRegistrationMs(), + runtimeMetrics.runtimeResetMs(), runtimeMetrics.sendSupportGeoMs(), runtimeMetrics.sendVehicleUsageMs(), runtimeMetrics.sendActivityMs(), @@ -253,16 +281,20 @@ public class DriverWorkingTimeReusableProjectionBuilder { Map> listeners, Consumer sender ) { - EPRuntime runtime = null; long definitionPreparationMs = 0L; boolean definitionCacheHit = false; + boolean runtimePoolHit = false; long runtimeInitMs = 0L; long deployMs = 0L; long listenerRegistrationMs = 0L; + long runtimeResetMs = 0L; long sendSupportGeoMs = 0L; long sendVehicleUsageMs = 0L; long sendActivityMs = 0L; long destroyMs = 0L; + ReusableProjectionRuntimePool runtimePool = null; + ReusableProjectionRuntime reusableRuntime = null; + boolean discardRuntime = false; try { PreparedProjectionDefinitionCacheKey cacheKey = new PreparedProjectionDefinitionCacheKey( significantDrivingMinutes, @@ -289,44 +321,41 @@ public class DriverWorkingTimeReusableProjectionBuilder { definitionPreparationMs = definitionCacheHit ? 0L : elapsedMillis(definitionPreparationStartedAtNanos); } - long runtimeInitStartedAtNanos = System.nanoTime(); - Configuration configuration = createRuntimeConfiguration(); - String runtimeUri = "eventhub-driver-working-time-reusable-projection-" + RUNTIME_COUNTER.incrementAndGet(); - runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration); - runtimeInitMs = elapsedMillis(runtimeInitStartedAtNanos); + runtimePool = reusableRuntimePools.computeIfAbsent(cacheKey, ignored -> new ReusableProjectionRuntimePool()); + reusableRuntime = runtimePool.acquire(preparedDefinition, listeners); + runtimePoolHit = reusableRuntime.poolHit(); + runtimeInitMs = reusableRuntime.runtimeInitMs(); + deployMs = reusableRuntime.deployMs(); + listenerRegistrationMs = reusableRuntime.listenerRegistrationMs(); - long deployStartedAtNanos = System.nanoTime(); - EPDeployment deployment = runtime.getDeploymentService().deploy(preparedDefinition.compiled()); - deployMs = elapsedMillis(deployStartedAtNanos); - - long listenerRegistrationStartedAtNanos = System.nanoTime(); - for (Map.Entry> entry : listeners.entrySet()) { - runtime.getDeploymentService() - .getStatement(deployment.getDeploymentId(), entry.getKey()) - .addListener((newData, oldData, statement, rt) -> entry.getValue().accept(newData)); - } - listenerRegistrationMs = elapsedMillis(listenerRegistrationStartedAtNanos); - - DerivedProjectionEventSender timedSender = new DerivedProjectionEventSender(runtime); - sender.accept(timedSender); - sendSupportGeoMs = timedSender.sendSupportGeoMs(); - sendVehicleUsageMs = timedSender.sendVehicleUsageMs(); - sendActivityMs = timedSender.sendActivityMs(); + ReusableProjectionRuntimeExecution execution = reusableRuntime.execute(listeners, sender); + runtimeResetMs = execution.runtimeResetMs(); + sendSupportGeoMs = execution.sendSupportGeoMs(); + sendVehicleUsageMs = execution.sendVehicleUsageMs(); + sendActivityMs = execution.sendActivityMs(); } catch (EPCompileException | EPDeployException e) { + discardRuntime = true; throw new IllegalStateException("Cannot compile/deploy reusable driver working-time projection EPL bundle", e); + } catch (RuntimeException e) { + discardRuntime = true; + throw e; } finally { - if (runtime != null) { - long destroyStartedAtNanos = System.nanoTime(); - runtime.destroy(); - destroyMs = elapsedMillis(destroyStartedAtNanos); + if (reusableRuntime != null) { + if (discardRuntime || runtimePool == null) { + destroyMs = reusableRuntime.destroy(); + } else { + destroyMs = runtimePool.release(reusableRuntime); + } } } return new DerivedProjectionRuntimeExecutionMetrics( definitionCacheHit, definitionPreparationMs, + runtimePoolHit, runtimeInitMs, deployMs, listenerRegistrationMs, + runtimeResetMs, sendSupportGeoMs, sendVehicleUsageMs, sendActivityMs, @@ -334,6 +363,51 @@ public class DriverWorkingTimeReusableProjectionBuilder { ); } + private ReusableProjectionRuntime createReusableRuntime( + PreparedProjectionDefinition preparedDefinition, + Map> listeners + ) throws EPDeployException, EPCompileException { + EPRuntime runtime = null; + try { + long runtimeInitStartedAtNanos = System.nanoTime(); + Configuration configuration = createRuntimeConfiguration(); + String runtimeUri = "eventhub-driver-working-time-reusable-projection-" + RUNTIME_COUNTER.incrementAndGet(); + runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration); + long runtimeInitMs = elapsedMillis(runtimeInitStartedAtNanos); + + long deployStartedAtNanos = System.nanoTime(); + EPDeployment deployment = runtime.getDeploymentService().deploy(preparedDefinition.compiled()); + long deployMs = elapsedMillis(deployStartedAtNanos); + + ReusableProjectionRuntime reusableRuntime = new ReusableProjectionRuntime(runtime, runtimeInitMs, deployMs, false); + + long listenerRegistrationStartedAtNanos = System.nanoTime(); + for (String statementName : listeners.keySet()) { + runtime.getDeploymentService() + .getStatement(deployment.getDeploymentId(), statementName) + .addListener((newData, oldData, statement, rt) -> reusableRuntime.onStatementEvents(statement.getName(), newData)); + } + reusableRuntime.listenerRegistrationMs(elapsedMillis(listenerRegistrationStartedAtNanos)); + reusableRuntime.cleanupQueries(compileCleanupQueries(runtime)); + return reusableRuntime; + } catch (EPDeployException | EPCompileException | RuntimeException ex) { + if (runtime != null && !runtime.isDestroyed()) { + runtime.destroy(); + } + throw ex; + } + } + + private List compileCleanupQueries(EPRuntime runtime) throws EPCompileException { + CompilerArguments arguments = new CompilerArguments(runtime.getConfigurationDeepCopy()); + arguments.getPath().add(runtime.getRuntimePath()); + List compiledQueries = new ArrayList<>(REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES.size()); + for (String cleanupQuery : REUSABLE_RUNTIME_STATE_CLEANUP_QUERIES) { + compiledQueries.add(EPCompilerProvider.getCompiler().compileQuery(cleanupQuery, arguments)); + } + return List.copyOf(compiledQueries); + } + private Map activityIntervalInputDefinition() { return ACTIVITY_INTERVAL_INPUT_DEFINITION; } @@ -894,9 +968,11 @@ public class DriverWorkingTimeReusableProjectionBuilder { private record DerivedProjectionRuntimeExecutionMetrics( boolean definitionCacheHit, long definitionPreparationMs, + boolean runtimePoolHit, long runtimeInitMs, long deployMs, long listenerRegistrationMs, + long runtimeResetMs, long sendSupportGeoMs, long sendVehicleUsageMs, long sendActivityMs, @@ -904,6 +980,152 @@ public class DriverWorkingTimeReusableProjectionBuilder { ) { } + private final class ReusableProjectionRuntimePool { + + private final Deque idleRuntimes = new ArrayDeque<>(); + + private synchronized ReusableProjectionRuntime acquire( + PreparedProjectionDefinition preparedDefinition, + Map> listeners + ) throws EPDeployException, EPCompileException { + ReusableProjectionRuntime runtime = idleRuntimes.pollFirst(); + if (runtime != null) { + return runtime.reused(); + } + return createReusableRuntime(preparedDefinition, listeners); + } + + private long release(ReusableProjectionRuntime runtime) { + if (runtime == null) { + return 0L; + } + synchronized (this) { + if (idleRuntimes.size() < MAX_IDLE_RUNTIMES_PER_DEFINITION) { + runtime.poolHit(false); + idleRuntimes.addLast(runtime); + return 0L; + } + } + return runtime.destroy(); + } + } + + private static final class ReusableProjectionRuntime { + + private final EPRuntime runtime; + private final long runtimeInitMs; + private final long deployMs; + private volatile long listenerRegistrationMs; + private volatile List cleanupQueries = List.of(); + private volatile boolean poolHit; + private ExecutionListeners currentExecutionListeners; + + private ReusableProjectionRuntime( + EPRuntime runtime, + long runtimeInitMs, + long deployMs, + boolean poolHit + ) { + this.runtime = runtime; + this.runtimeInitMs = runtimeInitMs; + this.deployMs = deployMs; + this.poolHit = poolHit; + } + + private synchronized ReusableProjectionRuntimeExecution execute( + Map> listeners, + Consumer sender + ) { + currentExecutionListeners = new ExecutionListeners(listeners); + long runtimeResetStartedAtNanos = System.nanoTime(); + for (EPCompiled cleanupQuery : cleanupQueries) { + runtime.getFireAndForgetService().executeQuery(cleanupQuery); + } + long runtimeResetMs = elapsedMillisStatic(runtimeResetStartedAtNanos); + try { + DerivedProjectionEventSender timedSender = new DerivedProjectionEventSender(runtime); + sender.accept(timedSender); + return new ReusableProjectionRuntimeExecution( + runtimeResetMs, + timedSender.sendSupportGeoMs(), + timedSender.sendVehicleUsageMs(), + timedSender.sendActivityMs() + ); + } finally { + currentExecutionListeners = null; + } + } + + private void onStatementEvents(String statementName, EventBean[] newData) { + ExecutionListeners listeners = currentExecutionListeners; + if (listeners == null || newData == null || statementName == null) { + return; + } + Consumer consumer = listeners.listeners().get(statementName); + if (consumer != null) { + consumer.accept(newData); + } + } + + private long destroy() { + if (runtime == null || runtime.isDestroyed()) { + return 0L; + } + long destroyStartedAtNanos = System.nanoTime(); + runtime.destroy(); + return elapsedMillisStatic(destroyStartedAtNanos); + } + + private ReusableProjectionRuntime reused() { + poolHit = true; + return this; + } + + private void cleanupQueries(List value) { + cleanupQueries = value == null ? List.of() : value; + } + + private void listenerRegistrationMs(long value) { + listenerRegistrationMs = value; + } + + private void poolHit(boolean value) { + poolHit = value; + } + + private boolean poolHit() { + return poolHit; + } + + private long runtimeInitMs() { + return poolHit ? 0L : runtimeInitMs; + } + + private long deployMs() { + return poolHit ? 0L : deployMs; + } + + private long listenerRegistrationMs() { + return poolHit ? 0L : listenerRegistrationMs; + } + } + + private record ExecutionListeners( + Map> listeners + ) { + private ExecutionListeners { + listeners = listeners == null ? Map.of() : Map.copyOf(listeners); + } + } + + private record ReusableProjectionRuntimeExecution( + long runtimeResetMs, + long sendSupportGeoMs, + long sendVehicleUsageMs, + long sendActivityMs + ) { + } + private static final class DerivedProjectionEventSender { private final EPRuntime delegate; @@ -949,4 +1171,8 @@ public class DriverWorkingTimeReusableProjectionBuilder { return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); } } + + private static long elapsedMillisStatic(long startedAtNanos) { + return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); + } } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/dto/RuntimeEventProcessingApiRequest.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/dto/RuntimeEventProcessingApiRequest.java index 98de486..9384710 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/dto/RuntimeEventProcessingApiRequest.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/dto/RuntimeEventProcessingApiRequest.java @@ -40,7 +40,7 @@ public record RuntimeEventProcessingApiRequest( scope != null ? scope.includeAllDrivers() : null, scope != null ? scope.vehicleKeys() : null, scope != null ? scope.includeAllVehicles() : null, - null, + scope != null ? scope.expandVehicleEvents() : null, scope != null ? scope.vehicleExpansionPaddingMinutes() : null, null ), diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeEventAssemblyModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeEventAssemblyModule.java index 335037f..a845eef 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeEventAssemblyModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeEventAssemblyModule.java @@ -39,6 +39,8 @@ public class RuntimeEventAssemblyModule implements RuntimeProcessingModule { UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context); UnifiedRuntimeEventBundle bundle = eventAssemblyService.assembleDriverScopedEvents(scopeRequest.toRuntimeRequest()); Map metadata = new LinkedHashMap<>(); + metadata.put("expandVehicleEvents", scopeRequest.expandVehicleEvents() == null || scopeRequest.expandVehicleEvents()); + metadata.put("vehicleExpansionPaddingMinutes", scopeRequest.vehicleExpansionPaddingMinutes() == null ? 0 : scopeRequest.vehicleExpansionPaddingMinutes()); metadata.put("driverSeedEventCount", bundle.driverSeedEvents().size()); metadata.put("expandedVehicleEventCount", bundle.expandedVehicleEvents().size()); metadata.put("mergedEventCount", bundle.mergedEvents().size()); diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java index 871be89..8de698f 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java @@ -85,8 +85,16 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule directDriverEvents, broadBundle.mergedEvents(), driverVehicleUsageIntervals, - scopeRequest.expandVehicleEvents() == null || scopeRequest.expandVehicleEvents(), - scopeRequest.vehicleExpansionPaddingMinutes() == null ? 0 : scopeRequest.vehicleExpansionPaddingMinutes(), + booleanAttribute( + context, + at.procon.eventhub.processing.eventprocessing.plan.DriverWorkingTimeRuntimeProcessingPlan.ATTACH_VEHICLE_ONLY_EVENTS_ATTRIBUTE, + scopeRequest.expandVehicleEvents() == null || scopeRequest.expandVehicleEvents() + ), + integerAttribute( + context, + at.procon.eventhub.processing.eventprocessing.plan.DriverWorkingTimeRuntimeProcessingPlan.VEHICLE_EVIDENCE_PADDING_MINUTES_ATTRIBUTE, + scopeRequest.vehicleExpansionPaddingMinutes() == null ? 0 : scopeRequest.vehicleExpansionPaddingMinutes() + ), includePartitionDebug ); for (EventHubEventDto attachedEvent : attachmentResult.attachedVehicleEvidenceEvents()) { @@ -291,4 +299,23 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule } return fallback; } + + private int integerAttribute(RuntimeProcessingModuleContext context, String key, int fallback) { + Object value = context.attributes().get(key); + if (value instanceof Number number) { + return Math.max(0, number.intValue()); + } + if (value == null) { + return Math.max(0, fallback); + } + String text = value.toString(); + if (text.isBlank()) { + return Math.max(0, fallback); + } + try { + return Math.max(0, Integer.parseInt(text.trim())); + } catch (NumberFormatException ignored) { + return Math.max(0, fallback); + } + } } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java index 2c51e43..cd48003 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java @@ -30,6 +30,8 @@ import org.springframework.stereotype.Component; public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessingPlan { public static final String PLAN_KEY = "driver-working-time-v1"; + public static final String ATTACH_VEHICLE_ONLY_EVENTS_ATTRIBUTE = "attachVehicleOnlyEvents"; + public static final String VEHICLE_EVIDENCE_PADDING_MINUTES_ATTRIBUTE = "vehicleEvidencePaddingMinutes"; private final RuntimeProcessingPipelineExecutor pipelineExecutor; private final boolean includeRuntimeEventAssemblyModule; @@ -180,6 +182,16 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "includePartitionDebug", request.partitioning() != null && request.partitioning().includeDebugOrDefault() ); + boolean attachVehicleOnlyEvents = resolveAttachVehicleOnlyEvents( + request.sourceSelection(), + request.partitioning(), + request.parameters() + ); + int vehicleEvidencePaddingMinutes = resolveVehicleEvidencePaddingMinutes( + request.sourceSelection(), + request.partitioning(), + request.parameters() + ); UnifiedRuntimeProcessingApiRequest scopeRequest = applyExecutionRequest( request.sourceSelection(), request.partitioning(), @@ -189,6 +201,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing Map attributes = new LinkedHashMap<>(); attributes.put("runtimeScopeApiRequest", scopeRequest); attributes.put("includePartitionDebug", includePartitionDebug); + attributes.put(ATTACH_VEHICLE_ONLY_EVENTS_ATTRIBUTE, attachVehicleOnlyEvents); + attributes.put(VEHICLE_EVIDENCE_PADDING_MINUTES_ATTRIBUTE, vehicleEvidencePaddingMinutes); RuntimeProcessingModuleContext initialContext = new RuntimeProcessingModuleContext( request, List.of(), @@ -373,12 +387,6 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing sourceSelection.includeActivityIntervalsOrDefault()); boolean includeDrivingIntervals = booleanParameter(parameters, "includeDrivingIntervals", sourceSelection.includeDrivingIntervalsOrDefault()); - boolean attachVehicleOnlyEvents = booleanParameter(parameters, "attachVehicleOnlyEvents", - partitioning == null ? sourceSelection.expandVehicleEvents() == null || sourceSelection.expandVehicleEvents() : partitioning.attachVehicleEvidenceOrDefault()); - Integer vehicleEvidencePaddingMinutes = nonNegativeIntegerParameter(parameters, "vehicleEvidencePaddingMinutes", - partitioning == null - ? sourceSelection.vehicleExpansionPaddingMinutes() - : partitioning.vehicleEvidencePaddingMinutesOrDefault(sourceSelection.vehicleExpansionPaddingMinutes() == null ? 0 : sourceSelection.vehicleExpansionPaddingMinutes())); return new UnifiedRuntimeProcessingApiRequest( sourceSelection.sessionId(), @@ -398,8 +406,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing sourceSelection.driverCardNumber(), sourceSelection.occurredFrom(), sourceSelection.occurredTo(), - attachVehicleOnlyEvents, - vehicleEvidencePaddingMinutes, + sourceSelection.expandVehicleEvents(), + sourceSelection.vehicleExpansionPaddingMinutes(), sourceSelection.includeIntersectingIntervals(), significantDrivingMinutes, minimumRestPeriodMinutes, @@ -409,6 +417,37 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing ); } + private boolean resolveAttachVehicleOnlyEvents( + UnifiedRuntimeProcessingApiRequest sourceSelection, + RuntimeEventPartitioningApiRequest partitioning, + Map parameters + ) { + return booleanParameter( + parameters, + "attachVehicleOnlyEvents", + partitioning == null + ? sourceSelection.expandVehicleEvents() == null || sourceSelection.expandVehicleEvents() + : partitioning.attachVehicleEvidenceOrDefault() + ); + } + + private int resolveVehicleEvidencePaddingMinutes( + UnifiedRuntimeProcessingApiRequest sourceSelection, + RuntimeEventPartitioningApiRequest partitioning, + Map parameters + ) { + Integer resolved = nonNegativeIntegerParameter( + parameters, + "vehicleEvidencePaddingMinutes", + partitioning == null + ? sourceSelection.vehicleExpansionPaddingMinutes() + : partitioning.vehicleEvidencePaddingMinutesOrDefault( + sourceSelection.vehicleExpansionPaddingMinutes() == null ? 0 : sourceSelection.vehicleExpansionPaddingMinutes() + ) + ); + return resolved == null ? 0 : resolved; + } + private List requestedOrDefaultModules(List requestedModules) { if (requestedModules != null && !requestedModules.isEmpty()) { LinkedHashMap requested = new LinkedHashMap<>(); diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionApiRequest.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionApiRequest.java index 71ce22c..f56054f 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionApiRequest.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionApiRequest.java @@ -48,7 +48,7 @@ public record RuntimeProcessingExecutionApiRequest( sourceSelection != null ? sourceSelection.includeAllDrivers() : null, sourceSelection != null ? sourceSelection.vehicleKeys() : null, sourceSelection != null ? sourceSelection.includeAllVehicles() : null, - null, + sourceSelection != null ? sourceSelection.expandVehicleEvents() : null, sourceSelection != null ? sourceSelection.vehicleExpansionPaddingMinutes() : null, null ), diff --git a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java index bf8e752..00f4ed9 100644 --- a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java +++ b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java @@ -14,11 +14,15 @@ import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @Service public class UnifiedRuntimeEventAssemblyService { + private static final Logger LOG = LoggerFactory.getLogger(UnifiedRuntimeEventAssemblyService.class); + private final List driverEventLoaders; private final List vehicleEventLoaders; @@ -34,10 +38,13 @@ public class UnifiedRuntimeEventAssemblyService { List sourceInputs = request.normalizedSourceInputs(); List driverSeedEvents = loadDriverSeedEvents(request); List discoveredVehicles = discoverVehicles(driverSeedEvents); - List expandedVehicleEvents = request.expandVehicleEvents() + boolean expandVehicleEvents = request.expandVehicleEvents(); + List expandedVehicleEvents = expandVehicleEvents ? loadExpandedVehicleEvents(request, discoveredVehicles) : List.of(); - List mergedEvents = deduplicateAndSort(driverSeedEvents, expandedVehicleEvents); + List mergedEvents = expandVehicleEvents + ? deduplicateAndSort(driverSeedEvents, expandedVehicleEvents) + : driverSeedEvents; List notes = new ArrayList<>(); boolean includesEventHub = sourceInputs.stream() @@ -63,12 +70,21 @@ public class UnifiedRuntimeEventAssemblyService { notes.add("Tachograph file-session events were loaded from session " + sourceInput.sessionId() + "."); } } - if (request.expandVehicleEvents()) { + if (expandVehicleEvents) { notes.add("Vehicle expansion loaded additional events for vehicles discovered in the driver seed set."); notes.add("Vehicle expansion padding minutes: " + request.vehicleExpansionPaddingMinutes() + "."); } else { notes.add("Vehicle expansion was disabled for this runtime request."); } + LOG.info( + "Runtime event assembly completed (expandVehicleEvents: {}, sourceInputs: {}, driverSeedEvents: {}, discoveredVehicles: {}, expandedVehicleEvents: {}, mergedEvents: {})", + expandVehicleEvents, + sourceInputs.size(), + driverSeedEvents.size(), + discoveredVehicles.size(), + expandedVehicleEvents.size(), + mergedEvents.size() + ); return new UnifiedRuntimeEventBundle( request, driverSeedEvents, diff --git a/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionUnifiedDriverEventSource.java b/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionUnifiedDriverEventSource.java index 3c278ff..8aae7e2 100644 --- a/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionUnifiedDriverEventSource.java +++ b/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionUnifiedDriverEventSource.java @@ -9,7 +9,7 @@ import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession; import at.procon.eventhub.tachographfilesession.model.TachographFileSession; import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle; import at.procon.eventhub.tachographfilesession.service.DriverNotFoundInSessionException; -import at.procon.eventhub.tachographfilesession.service.DriverTimelineEventBuilder; +import at.procon.eventhub.tachographfilesession.service.RawSourceDriverTimelineEventBuilder; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionNotFoundException; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; import java.util.List; @@ -19,14 +19,14 @@ import org.springframework.stereotype.Component; public class TachographFileSessionUnifiedDriverEventSource implements UnifiedDriverEventSource { private final TachographFileSessionRepository repository; - private final DriverTimelineEventBuilder eventBuilder; + private final RawSourceDriverTimelineEventBuilder rawSourceEventBuilder; public TachographFileSessionUnifiedDriverEventSource( TachographFileSessionRepository repository, - DriverTimelineEventBuilder eventBuilder + RawSourceDriverTimelineEventBuilder rawSourceEventBuilder ) { this.repository = repository; - this.eventBuilder = eventBuilder; + this.rawSourceEventBuilder = rawSourceEventBuilder; } @Override @@ -56,7 +56,7 @@ public class TachographFileSessionUnifiedDriverEventSource implements UnifiedDri DriverExtractionSession driver, UnifiedDriverEventsRequest request ) { - TachographTimelineEventBundle bundle = eventBuilder.buildEventBundle(session, driver); + TachographTimelineEventBundle bundle = rawSourceEventBuilder.buildRawEventBundle(session, driver); return TachographTimelineEventBundle.fromRuntimeBundle( RuntimeIntervalEventWindowSelector.filterBundle( bundle == null ? null : bundle.toRuntimeBundle(), diff --git a/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionUnifiedVehicleEventSource.java b/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionUnifiedVehicleEventSource.java index db7a0ef..f60d5e2 100644 --- a/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionUnifiedVehicleEventSource.java +++ b/src/main/java/at/procon/eventhub/tachographfilesession/processing/service/TachographFileSessionUnifiedVehicleEventSource.java @@ -10,7 +10,7 @@ import at.procon.eventhub.reference.TachographNationRegistry; import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession; import at.procon.eventhub.tachographfilesession.model.TachographFileSession; import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle; -import at.procon.eventhub.tachographfilesession.service.DriverTimelineEventBuilder; +import at.procon.eventhub.tachographfilesession.service.RawSourceDriverTimelineEventBuilder; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionNotFoundException; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; import java.util.List; @@ -20,14 +20,14 @@ import org.springframework.stereotype.Component; public class TachographFileSessionUnifiedVehicleEventSource implements UnifiedVehicleEventSource { private final TachographFileSessionRepository repository; - private final DriverTimelineEventBuilder eventBuilder; + private final RawSourceDriverTimelineEventBuilder rawSourceEventBuilder; public TachographFileSessionUnifiedVehicleEventSource( TachographFileSessionRepository repository, - DriverTimelineEventBuilder eventBuilder + RawSourceDriverTimelineEventBuilder rawSourceEventBuilder ) { this.repository = repository; - this.eventBuilder = eventBuilder; + this.rawSourceEventBuilder = rawSourceEventBuilder; } @Override @@ -52,7 +52,7 @@ public class TachographFileSessionUnifiedVehicleEventSource implements UnifiedVe DriverExtractionSession driver, UnifiedVehicleEventsRequest request ) { - TachographTimelineEventBundle bundle = eventBuilder.buildEventBundle(session, driver); + TachographTimelineEventBundle bundle = rawSourceEventBuilder.buildRawEventBundle(session, driver); return TachographTimelineEventBundle.fromRuntimeBundle( RuntimeIntervalEventWindowSelector.filterBundle( bundle == null ? null : bundle.toRuntimeBundle(), diff --git a/src/main/resources/esper/driver-working-time-derived-projections.epl b/src/main/resources/esper/driver-working-time-derived-projections.epl index c02e4ed..5c72d15 100644 --- a/src/main/resources/esper/driver-working-time-derived-projections.epl +++ b/src/main/resources/esper/driver-working-time-derived-projections.epl @@ -288,7 +288,7 @@ create schema DailyWeeklyRestCandidateCoverageEmittedKey( endedAtEpochSecond long ); -create context PerDriver partition by driverKey from TachographVehicleUsageIntervalInputEvent; +@public create context PerDriver partition by driverKey from TachographVehicleUsageIntervalInputEvent; create schema VuCardAbsentInterval( sessionId java.util.UUID, @@ -438,27 +438,27 @@ create schema PotentialInVehicleTripInterval( lastNextDrivingSourceIntervalId string ); -create window PreviousRestCandidateCoverageInterval#unique(driverKey) as DailyWeeklyRestCandidateCoverageInterval; +@public create window PreviousRestCandidateCoverageInterval#unique(driverKey) as DailyWeeklyRestCandidateCoverageInterval; -create window OpenPotentialInVehicleTripState#unique(driverKey) as PotentialInVehicleTripState; +@public create window OpenPotentialInVehicleTripState#unique(driverKey) as PotentialInVehicleTripState; -create window SupportGeoEvidenceWindow#keepall as SupportGeoEvidence; +@public create window SupportGeoEvidenceWindow#keepall as SupportGeoEvidence; -create window DailyWeeklyRestCandidateBeginGeoEvidenceCandidateWindow#keepall as DailyWeeklyRestCandidateBeginGeoEvidenceCandidate; -create window DailyWeeklyRestCandidateBeginGeoEvidenceBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginGeoEvidenceBestScore; -create window DailyWeeklyRestCandidateBeginGeoEvidenceResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginGeoEvidenceResolved; +@public create window DailyWeeklyRestCandidateBeginGeoEvidenceCandidateWindow#keepall as DailyWeeklyRestCandidateBeginGeoEvidenceCandidate; +@public create window DailyWeeklyRestCandidateBeginGeoEvidenceBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginGeoEvidenceBestScore; +@public create window DailyWeeklyRestCandidateBeginGeoEvidenceResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginGeoEvidenceResolved; -create window DailyWeeklyRestCandidateEndGeoEvidenceCandidateWindow#keepall as DailyWeeklyRestCandidateEndGeoEvidenceCandidate; -create window DailyWeeklyRestCandidateEndGeoEvidenceBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndGeoEvidenceBestScore; -create window DailyWeeklyRestCandidateEndGeoEvidenceResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndGeoEvidenceResolved; -create window DailyWeeklyRestCandidateBeginBoundaryOdometerCandidateWindow#keepall as DailyWeeklyRestCandidateBeginBoundaryOdometerCandidate; -create window DailyWeeklyRestCandidateBeginBoundaryOdometerBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginBoundaryOdometerBestScore; -create window DailyWeeklyRestCandidateBeginBoundaryOdometerResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginBoundaryOdometerResolved; -create window DailyWeeklyRestCandidateEndBoundaryOdometerCandidateWindow#keepall as DailyWeeklyRestCandidateEndBoundaryOdometerCandidate; -create window DailyWeeklyRestCandidateEndBoundaryOdometerBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndBoundaryOdometerBestScore; -create window DailyWeeklyRestCandidateEndBoundaryOdometerResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndBoundaryOdometerResolved; -create window DailyWeeklyRestCandidateCoverageCardResolvedIntervalWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateCoverageCardResolvedInterval; -create window DailyWeeklyRestCandidateCoverageEmittedKeyWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateCoverageEmittedKey; +@public create window DailyWeeklyRestCandidateEndGeoEvidenceCandidateWindow#keepall as DailyWeeklyRestCandidateEndGeoEvidenceCandidate; +@public create window DailyWeeklyRestCandidateEndGeoEvidenceBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndGeoEvidenceBestScore; +@public create window DailyWeeklyRestCandidateEndGeoEvidenceResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndGeoEvidenceResolved; +@public create window DailyWeeklyRestCandidateBeginBoundaryOdometerCandidateWindow#keepall as DailyWeeklyRestCandidateBeginBoundaryOdometerCandidate; +@public create window DailyWeeklyRestCandidateBeginBoundaryOdometerBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginBoundaryOdometerBestScore; +@public create window DailyWeeklyRestCandidateBeginBoundaryOdometerResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateBeginBoundaryOdometerResolved; +@public create window DailyWeeklyRestCandidateEndBoundaryOdometerCandidateWindow#keepall as DailyWeeklyRestCandidateEndBoundaryOdometerCandidate; +@public create window DailyWeeklyRestCandidateEndBoundaryOdometerBestScoreWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndBoundaryOdometerBestScore; +@public create window DailyWeeklyRestCandidateEndBoundaryOdometerResolvedWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateEndBoundaryOdometerResolved; +@public create window DailyWeeklyRestCandidateCoverageCardResolvedIntervalWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateCoverageCardResolvedInterval; +@public create window DailyWeeklyRestCandidateCoverageEmittedKeyWindow#unique(driverKey, startedAtEpochSecond, endedAtEpochSecond) as DailyWeeklyRestCandidateCoverageEmittedKey; insert into SupportGeoEvidenceWindow select @@ -488,7 +488,7 @@ select vehicleKey from TachographActivityIntervalInputEvent(activityType = 'DRIVE', durationSeconds > ${SIGNIFICANT_DRIVING_THRESHOLD_SECONDS}); -create window PreviousSignificantDrivingInterval#unique(driverKey) as SignificantDrivingInterval; +@public create window PreviousSignificantDrivingInterval#unique(driverKey) as SignificantDrivingInterval; on SignificantDrivingInterval as next insert into DrivingInterruptionInterval @@ -1483,7 +1483,7 @@ select endedAtEpochSecond from DailyWeeklyRestCandidateCoverageInterval; -context PerDriver +@public context PerDriver create window PreviousVehicleUsageInterval#lastevent as TachographVehicleUsageIntervalInputEvent; @Priority(30) diff --git a/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java b/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java new file mode 100644 index 0000000..de30639 --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilderTest.java @@ -0,0 +1,115 @@ +package at.procon.eventhub.processing.driverworkingtime.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval; +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDerivedProjectionBundle; +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeProcessingInput; +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class DriverWorkingTimeReusableProjectionBuilderTest { + + @Test + void reusesWarmRuntimeWithoutLeakingPreviousState() { + DriverWorkingTimeReusableProjectionBuilder builder = + new DriverWorkingTimeReusableProjectionBuilder(new EventHubProperties()); + UUID sessionId = UUID.randomUUID(); + OffsetDateTime from = OffsetDateTime.parse("2026-05-01T08:00:00Z"); + OffsetDateTime firstDriveEnd = OffsetDateTime.parse("2026-05-01T09:00:00Z"); + OffsetDateTime secondDriveStart = OffsetDateTime.parse("2026-05-01T10:00:00Z"); + OffsetDateTime to = OffsetDateTime.parse("2026-05-01T11:00:00Z"); + + DriverWorkingTimeProcessingInput input = new DriverWorkingTimeProcessingInput( + sessionId, + "12:123", + "DRIVER_CARD", + from, + to, + from, + to, + 15, + 30, + List.of( + new DriverWorkingTimeActivityInterval( + sessionId, + "12:123", + "ACT-1", + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-1", + "VIN-1", + "DRIVER_CARD", + "ACT-1", + "ACT-1", + from, + firstDriveEnd, + from.toEpochSecond(), + firstDriveEnd.toEpochSecond(), + firstDriveEnd.toEpochSecond() - from.toEpochSecond(), + List.of("ACT-1"), + false, + false, + "RAW_INTERVAL" + ), + new DriverWorkingTimeActivityInterval( + sessionId, + "12:123", + "ACT-2", + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-1", + "VIN-1", + "DRIVER_CARD", + "ACT-2", + "ACT-2", + secondDriveStart, + to, + secondDriveStart.toEpochSecond(), + to.toEpochSecond(), + to.toEpochSecond() - secondDriveStart.toEpochSecond(), + List.of("ACT-2"), + false, + false, + "RAW_INTERVAL" + ) + ), + List.of( + new DriverWorkingTimeVehicleUsageInterval( + sessionId, + "12:123", + "VU-1", + "VU-1", + "VU-1", + from, + to, + from.toEpochSecond(), + to.toEpochSecond(), + to.toEpochSecond() - from.toEpochSecond(), + 100L, + 150L, + "12:REG-1", + "VIN-1", + "DRIVER_CARD", + List.of("VU-1") + ) + ), + List.of(), + List.of() + ); + + DriverWorkingTimeDerivedProjectionBundle first = builder.buildDerivedProjectionBundle(input); + DriverWorkingTimeDerivedProjectionBundle second = builder.buildDerivedProjectionBundle(input); + + assertThat(second).isEqualTo(first); + assertThat(second.drivingInterruptionIntervals()).hasSize(1); + } +} diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/dto/RuntimeEventProcessingApiRequestTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/dto/RuntimeEventProcessingApiRequestTest.java new file mode 100644 index 0000000..a2fc1fc --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/dto/RuntimeEventProcessingApiRequestTest.java @@ -0,0 +1,49 @@ +package at.procon.eventhub.processing.eventprocessing.dto; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class RuntimeEventProcessingApiRequestTest { + + @Test + void driverWorkingTimePreservesDisabledVehicleExpansion() { + UnifiedRuntimeProcessingApiRequest scope = new UnifiedRuntimeProcessingApiRequest( + UUID.randomUUID(), + List.of(), + null, + null, + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION), + null, + null, + "12:123", + Set.of(), + false, + Set.of(), + false, + null, + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + false, + 0, + true, + null, + null, + null, + null, + List.of() + ); + + RuntimeEventProcessingApiRequest request = RuntimeEventProcessingApiRequest.driverWorkingTime(scope); + + assertThat(request.partitioning().attachVehicleEvidence()).isFalse(); + } +} diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModuleTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModuleTest.java new file mode 100644 index 0000000..d6818e6 --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModuleTest.java @@ -0,0 +1,137 @@ +package at.procon.eventhub.processing.eventprocessing.module; + +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDriverPartition; +import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval; +import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; +import at.procon.eventhub.processing.eventprocessing.plan.DriverWorkingTimeRuntimeProcessingPlan; +import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest; +import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import at.procon.eventhub.processing.service.RuntimeDriverVehicleEvidenceAttachmentService; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class VehicleEvidenceAttachmentModuleTest { + + @Test + void usesAttachmentAttributeInsteadOfScopeExpansionFlag() { + RuntimeDriverVehicleEvidenceAttachmentService service = + org.mockito.Mockito.mock(RuntimeDriverVehicleEvidenceAttachmentService.class); + VehicleEvidenceAttachmentModule module = new VehicleEvidenceAttachmentModule(service); + + UnifiedRuntimeProcessingApiRequest scope = new UnifiedRuntimeProcessingApiRequest( + UUID.randomUUID(), + List.of(), + null, + null, + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION), + null, + null, + "12:123", + Set.of(), + false, + Set.of(), + false, + null, + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 15, + true, + null, + null, + null, + null, + List.of() + ); + UnifiedRuntimeProcessingRequest runtimeRequest = scope.toRuntimeRequest(); + UnifiedRuntimeEventBundle bundle = new UnifiedRuntimeEventBundle( + runtimeRequest, + List.of(), + List.of(), + List.of(), + List.of(), + List.of() + ); + + when(service.attachVehicleEvidence( + eq("12:123"), + eq(List.of()), + eq(List.of()), + eq(List.of()), + eq(false), + eq(3), + anyBoolean() + )).thenReturn(new RuntimeDriverVehicleEvidenceAttachmentResult( + "12:123", + List.of(), + List.of(), + List.of(), + 0, + 0, + 0, + List.of(), + List.of(), + List.of(), + List.of() + )); + + RuntimeProcessingModuleContext context = new RuntimeProcessingModuleContext( + new RuntimeProcessingExecutionApiRequest("driver-working-time-v1", scope, null, List.of(), Map.of()), + List.of(), + Map.of( + "runtimeScopeApiRequest", scope, + DriverWorkingTimeRuntimeProcessingPlan.ATTACH_VEHICLE_ONLY_EVENTS_ATTRIBUTE, false, + DriverWorkingTimeRuntimeProcessingPlan.VEHICLE_EVIDENCE_PADDING_MINUTES_ATTRIBUTE, 3 + ), + Map.of( + DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY, + new RuntimeProcessingModuleResult( + DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY, + RuntimeProcessingModuleStatus.SUCCESS, + bundle, + Map.of(), + List.of() + ), + DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE, + new RuntimeProcessingModuleResult( + DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE, + RuntimeProcessingModuleStatus.SUCCESS, + List.of(), + Map.of(), + List.of() + ) + ) + ); + + RuntimeProcessingModuleResult result = module.execute(context); + + verify(service).attachVehicleEvidence( + eq("12:123"), + eq(List.of()), + eq(List.of()), + eq(List.of()), + eq(false), + eq(3), + eq(false) + ); + @SuppressWarnings("unchecked") + Map partitions = + (Map) result.output(); + org.assertj.core.api.Assertions.assertThat(partitions).containsKey("12:123"); + } +} diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java new file mode 100644 index 0000000..fea3d5f --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlanTest.java @@ -0,0 +1,72 @@ +package at.procon.eventhub.processing.eventprocessing.plan; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; +import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest; +import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.service.RuntimeDriverWorkingTimeScopeProcessingService; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class DriverWorkingTimeRuntimeProcessingPlanTest { + + @Test + void applyExecutionRequestDoesNotRewriteScopeExpansionFromAttachmentFlag() { + DriverWorkingTimeRuntimeProcessingPlan plan = new DriverWorkingTimeRuntimeProcessingPlan( + org.mockito.Mockito.mock(RuntimeDriverWorkingTimeScopeProcessingService.class) + ); + UnifiedRuntimeProcessingApiRequest scope = new UnifiedRuntimeProcessingApiRequest( + UUID.randomUUID(), + List.of(), + null, + null, + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION), + null, + null, + "12:123", + Set.of(), + false, + Set.of(), + false, + null, + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 15, + true, + null, + null, + null, + null, + List.of() + ); + + UnifiedRuntimeProcessingApiRequest resolved = plan.applyExecutionRequest( + scope, + new RuntimeEventPartitioningApiRequest( + RuntimeEventPartitioningStrategy.DRIVER, + Set.of(), + false, + Set.of(), + false, + Set.of(), + false, + false, + 0, + false + ), + Map.of("attachVehicleOnlyEvents", false) + ); + + assertThat(resolved.expandVehicleEvents()).isTrue(); + assertThat(resolved.vehicleExpansionPaddingMinutes()).isEqualTo(15); + } +} diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionApiRequestTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionApiRequestTest.java new file mode 100644 index 0000000..963494c --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionApiRequestTest.java @@ -0,0 +1,49 @@ +package at.procon.eventhub.processing.eventprocessing.plan; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class RuntimeProcessingExecutionApiRequestTest { + + @Test + void driverWorkingTimePreservesDisabledVehicleExpansion() { + UnifiedRuntimeProcessingApiRequest scope = new UnifiedRuntimeProcessingApiRequest( + UUID.randomUUID(), + List.of(), + null, + null, + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION), + null, + null, + "12:123", + Set.of(), + false, + Set.of(), + false, + null, + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + false, + 0, + true, + null, + null, + null, + null, + List.of() + ); + + RuntimeProcessingExecutionApiRequest request = RuntimeProcessingExecutionApiRequest.driverWorkingTime(scope); + + assertThat(request.partitioning().attachVehicleEvidence()).isFalse(); + } +} diff --git a/src/test/java/at/procon/eventhub/processing/service/TachographFileSessionRuntimeEventLoaderTest.java b/src/test/java/at/procon/eventhub/processing/service/TachographFileSessionRuntimeEventLoaderTest.java index 2ede05a..2dcd3e7 100644 --- a/src/test/java/at/procon/eventhub/processing/service/TachographFileSessionRuntimeEventLoaderTest.java +++ b/src/test/java/at/procon/eventhub/processing/service/TachographFileSessionRuntimeEventLoaderTest.java @@ -28,6 +28,7 @@ import at.procon.eventhub.tachographfilesession.model.TachographCompositeSession import at.procon.eventhub.tachographfilesession.service.InMemoryTachographCompositeSessionRepository; import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder; +import at.procon.eventhub.tachographfilesession.service.RawSourceDriverTimelineEventBuilder; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory; import com.fasterxml.jackson.databind.ObjectMapper; @@ -53,9 +54,10 @@ class TachographFileSessionRuntimeEventLoaderTest { new VehicleKeyFactory(), new EventDetailsFactory(new ObjectMapper()) ); + RawSourceDriverTimelineEventBuilder rawSourceEventBuilder = new RawSourceDriverTimelineEventBuilder(eventBuilder); TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader( - new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, eventBuilder))), - new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, eventBuilder))), + new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, rawSourceEventBuilder))), + new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, rawSourceEventBuilder))), compositeRepository, new EventAcquisitionRecordKeyService(), new EventHubEventSorter() @@ -90,9 +92,10 @@ class TachographFileSessionRuntimeEventLoaderTest { new VehicleKeyFactory(), new EventDetailsFactory(new ObjectMapper()) ); + RawSourceDriverTimelineEventBuilder rawSourceEventBuilder = new RawSourceDriverTimelineEventBuilder(eventBuilder); TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader( - new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, eventBuilder))), - new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, eventBuilder))), + new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, rawSourceEventBuilder))), + new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, rawSourceEventBuilder))), compositeRepository, new EventAcquisitionRecordKeyService(), new EventHubEventSorter() @@ -135,9 +138,10 @@ class TachographFileSessionRuntimeEventLoaderTest { new VehicleKeyFactory(), new EventDetailsFactory(new ObjectMapper()) ); + RawSourceDriverTimelineEventBuilder rawSourceEventBuilder = new RawSourceDriverTimelineEventBuilder(eventBuilder); TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader( - new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, eventBuilder))), - new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, eventBuilder))), + new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, rawSourceEventBuilder))), + new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, rawSourceEventBuilder))), compositeRepository, new EventAcquisitionRecordKeyService(), new EventHubEventSorter() diff --git a/src/test/java/at/procon/eventhub/processing/service/UnifiedDriverEventSourceServiceTest.java b/src/test/java/at/procon/eventhub/processing/service/UnifiedDriverEventSourceServiceTest.java index e023b11..3122166 100644 --- a/src/test/java/at/procon/eventhub/processing/service/UnifiedDriverEventSourceServiceTest.java +++ b/src/test/java/at/procon/eventhub/processing/service/UnifiedDriverEventSourceServiceTest.java @@ -23,6 +23,7 @@ import at.procon.eventhub.tachographfilesession.service.DriverKeyFactory; import at.procon.eventhub.tachographfilesession.service.DriverTimelineBuilder; import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder; +import at.procon.eventhub.tachographfilesession.service.RawSourceDriverTimelineEventBuilder; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory; import com.fasterxml.jackson.databind.ObjectMapper; @@ -45,12 +46,12 @@ class UnifiedDriverEventSourceServiceTest { UnifiedDriverEventSourceService service = new UnifiedDriverEventSourceService(List.of( new TachographFileSessionUnifiedDriverEventSource( repository, - new IntervalBackedDriverTimelineEventBuilder( + new RawSourceDriverTimelineEventBuilder(new IntervalBackedDriverTimelineEventBuilder( timelineBuilder, new DriverKeyFactory(), new VehicleKeyFactory(), new EventDetailsFactory(new ObjectMapper()) - ) + )) ) )); diff --git a/src/test/java/at/procon/eventhub/processing/service/UnifiedVehicleEventSourceServiceTest.java b/src/test/java/at/procon/eventhub/processing/service/UnifiedVehicleEventSourceServiceTest.java index 91015f4..c88a0c9 100644 --- a/src/test/java/at/procon/eventhub/processing/service/UnifiedVehicleEventSourceServiceTest.java +++ b/src/test/java/at/procon/eventhub/processing/service/UnifiedVehicleEventSourceServiceTest.java @@ -23,6 +23,7 @@ import at.procon.eventhub.tachographfilesession.service.DriverKeyFactory; import at.procon.eventhub.tachographfilesession.service.DriverTimelineBuilder; import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder; +import at.procon.eventhub.tachographfilesession.service.RawSourceDriverTimelineEventBuilder; import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory; import com.fasterxml.jackson.databind.ObjectMapper; @@ -42,15 +43,16 @@ class UnifiedVehicleEventSourceServiceTest { EventHubProperties properties = new EventHubProperties(); TachographFileSessionRepository repository = new InMemoryTachographFileSessionRepository(properties); DriverTimelineBuilder timelineBuilder = new DriverTimelineBuilder(); + IntervalBackedDriverTimelineEventBuilder eventBuilder = new IntervalBackedDriverTimelineEventBuilder( + timelineBuilder, + new DriverKeyFactory(), + new VehicleKeyFactory(), + new EventDetailsFactory(new ObjectMapper()) + ); UnifiedVehicleEventSourceService service = new UnifiedVehicleEventSourceService(List.of( new TachographFileSessionUnifiedVehicleEventSource( repository, - new IntervalBackedDriverTimelineEventBuilder( - timelineBuilder, - new DriverKeyFactory(), - new VehicleKeyFactory(), - new EventDetailsFactory(new ObjectMapper()) - ) + new RawSourceDriverTimelineEventBuilder(eventBuilder) ) ));