From 3bfe05b64e3966ff0f65a281829a45a912e1e300 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:00:54 +0200 Subject: [PATCH] Add runtime processing performance tracing --- .../DriverWorkingTimeProcessingCore.java | 30 +- ...rWorkingTimeReusableProjectionBuilder.java | 271 +++++++++++++++--- .../RuntimeEventProcessingService.java | 32 ++- ...erWorkingTimeDerivedProjectionsModule.java | 34 +++ .../RuntimeProcessingPipelineExecutor.java | 58 +++- .../module/epl/RuntimeEplModuleExecutor.java | 144 +++++++++- .../RuntimeProcessingExecutionService.java | 40 ++- ...DriverVehicleUsageIntervalsModuleTest.java | 59 ++++ ...rkingTimeDerivedProjectionsModuleTest.java | 3 + ...RuntimeProcessingPipelineExecutorTest.java | 119 ++++++++ 10 files changed, 730 insertions(+), 60 deletions(-) create mode 100644 src/test/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutorTest.java diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeProcessingCore.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeProcessingCore.java index 7491f2e..f465dd1 100644 --- a/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeProcessingCore.java +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeProcessingCore.java @@ -19,6 +19,8 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; /** @@ -32,6 +34,8 @@ import org.springframework.stereotype.Service; @Service public class DriverWorkingTimeProcessingCore { + private static final Logger LOG = LoggerFactory.getLogger(DriverWorkingTimeProcessingCore.class); + private final DriverWorkingTimeDerivedProjectionEngine derivedProjectionEngine; public DriverWorkingTimeProcessingCore( @@ -42,6 +46,7 @@ public class DriverWorkingTimeProcessingCore { public DriverWorkingTimeProcessingResultDto process(DriverWorkingTimeProcessingInput input) { Objects.requireNonNull(input, "input must not be null"); + long startedAtNanos = System.nanoTime(); OffsetDateTime loadedFrom = input.loadedFrom(); OffsetDateTime loadedTo = input.loadedTo(); String driverKey = input.driverKey(); @@ -51,6 +56,7 @@ public class DriverWorkingTimeProcessingCore { throw new IllegalArgumentException("occurredTo must not be before occurredFrom."); } + long activityClipStartedAtNanos = System.nanoTime(); List activityIntervals = clipActivityIntervals( input.activityIntervals(), requestedFrom, @@ -63,9 +69,13 @@ public class DriverWorkingTimeProcessingCore { requestedFrom, requestedTo ); + long activityClipMs = elapsedMillis(activityClipStartedAtNanos); + long derivedProjectionBuildStartedAtNanos = System.nanoTime(); DriverWorkingTimeDerivedProjectionBundle derivedProjectionBundle = derivedProjectionEngine.build(input); + long derivedProjectionBuildMs = elapsedMillis(derivedProjectionBuildStartedAtNanos); + long derivedIntervalClipStartedAtNanos = System.nanoTime(); List drivingInterruptionIntervals = clipDrivingInterruptionIntervals( derivedProjectionBundle.drivingInterruptionIntervals(), @@ -131,8 +141,9 @@ public class DriverWorkingTimeProcessingCore { requestedFrom, requestedTo ); + long derivedIntervalClipMs = elapsedMillis(derivedIntervalClipStartedAtNanos); - return new DriverWorkingTimeProcessingResultDto( + DriverWorkingTimeProcessingResultDto result = new DriverWorkingTimeProcessingResultDto( input.sessionId(), driverKey, input.sourceKind(), @@ -168,6 +179,19 @@ public class DriverWorkingTimeProcessingCore { supportGeoEvents, combinedNotes(input.notes()) ); + LOG.info( + "Driver working-time core processed driver {} in {} ms (activityClipMs: {}, derivedProjectionBuildMs: {}, derivedIntervalClipMs: {}, activityIntervals: {}, drivingIntervals: {}, vehicleUsageIntervals: {}, supportGeoEvents: {})", + driverKey, + elapsedMillis(startedAtNanos), + activityClipMs, + derivedProjectionBuildMs, + derivedIntervalClipMs, + result.activityIntervalCount(), + result.drivingIntervalCount(), + result.vehicleUsageIntervalCount(), + result.supportGeoEventCount() + ); + return result; } public DriverWorkingTimeProcessingResultDto processDriverWorkingTime(DriverWorkingTimeProcessingInput input) { @@ -649,4 +673,8 @@ public class DriverWorkingTimeProcessingCore { } return left.isBefore(right) ? left : right; } + + private long elapsedMillis(long startedAtNanos) { + return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); + } } diff --git a/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java b/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java index d71cac0..2e56bc9 100644 --- a/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java +++ b/src/main/java/at/procon/eventhub/processing/driverworkingtime/service/DriverWorkingTimeReusableProjectionBuilder.java @@ -28,13 +28,18 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Comparator; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.ClassPathResource; import org.springframework.stereotype.Service; @@ -43,11 +48,17 @@ import org.springframework.util.StreamUtils; @Service public class DriverWorkingTimeReusableProjectionBuilder { + private static final Logger LOG = LoggerFactory.getLogger(DriverWorkingTimeReusableProjectionBuilder.class); private static final AtomicLong RUNTIME_COUNTER = new AtomicLong(); private static final String DRIVING_DERIVED_PROJECTION_BUNDLE_EPL_TEMPLATE = loadResource("esper/driver-working-time-derived-projections.epl"); + private static final Map ACTIVITY_INTERVAL_INPUT_DEFINITION = activityIntervalInputDefinitionStatic(); + private static final Map VEHICLE_USAGE_INTERVAL_INPUT_DEFINITION = vehicleUsageIntervalInputDefinitionStatic(); + private static final Map SUPPORT_GEO_EVIDENCE_INPUT_DEFINITION = supportGeoEvidenceInputDefinitionStatic(); private final EventHubProperties properties; + private final ConcurrentMap preparedDefinitions = + new ConcurrentHashMap<>(); @Autowired public DriverWorkingTimeReusableProjectionBuilder( @@ -78,6 +89,7 @@ public class DriverWorkingTimeReusableProjectionBuilder { int significantDrivingMinutes, int minimumRestPeriodMinutes ) { + long startedAtNanos = System.nanoTime(); if ((activityInputEvents == null || activityInputEvents.isEmpty()) && (vehicleUsageInputEvents == null || vehicleUsageInputEvents.isEmpty())) { return emptyDerivedProjectionBundle(); @@ -93,22 +105,9 @@ public class DriverWorkingTimeReusableProjectionBuilder { List potentialInVehicleOvernightStayIntervals = new ArrayList<>(); List potentialInVehicleTripIntervals = new ArrayList<>(); - executeWithRuntime( - configuration -> { - configuration.getCommon().addEventType( - "TachographActivityIntervalInputEvent", - activityIntervalInputDefinition() - ); - configuration.getCommon().addEventType( - "TachographVehicleUsageIntervalInputEvent", - vehicleUsageIntervalInputDefinition() - ); - configuration.getCommon().addEventType( - "TachographSupportGeoEvidenceInputEvent", - supportGeoEvidenceInputDefinition() - ); - }, - renderDrivingDerivedProjectionBundleEpl(significantDrivingMinutes, minimumRestPeriodMinutes), + DerivedProjectionRuntimeExecutionMetrics runtimeMetrics = executeWithRuntime( + significantDrivingMinutes, + minimumRestPeriodMinutes, Map.of( "drivingInterruptionIntervals", newData -> collectDrivingInterruptionIntervalModels(newData, drivingInterruptionIntervals), @@ -129,34 +128,26 @@ public class DriverWorkingTimeReusableProjectionBuilder { "potentialInVehicleTripIntervals", newData -> collectPotentialInVehicleTripIntervalModels(newData, potentialInVehicleTripIntervals) ), - runtime -> { + sender -> { if (supportGeoInputEvents != null) { for (Map supportGeoEvidence : supportGeoInputEvents) { - runtime.getEventService().sendEventMap( - supportGeoEvidence, - "TachographSupportGeoEvidenceInputEvent" - ); + sender.sendSupportGeoEvent(supportGeoEvidence); } } if (vehicleUsageInputEvents != null) { for (Map interval : vehicleUsageInputEvents) { - runtime.getEventService().sendEventMap( - interval, - "TachographVehicleUsageIntervalInputEvent" - ); + sender.sendVehicleUsageEvent(interval); } } if (activityInputEvents != null) { for (Map interval : activityInputEvents) { - runtime.getEventService().sendEventMap( - interval, - "TachographActivityIntervalInputEvent" - ); + sender.sendActivityEvent(interval); } } } ); + long sortOutputStartedAtNanos = System.nanoTime(); List sortedDailyWeeklyRestCandidateCoverageIntervals = sortDailyWeeklyRestCandidateCoverageIntervalsCommon(dailyWeeklyRestCandidateCoverageIntervals); List sortedUnclassifiedDailyWeeklyRestCandidateCoverageIntervals = @@ -165,6 +156,25 @@ public class DriverWorkingTimeReusableProjectionBuilder { sortPotentialHomeOvernightStayIntervalsCommon(potentialHomeOvernightStayIntervals); List sortedPotentialInVehicleOvernightStayIntervals = sortPotentialInVehicleOvernightStayIntervalsCommon(potentialInVehicleOvernightStayIntervals); + long sortOutputMs = elapsedMillis(sortOutputStartedAtNanos); + + LOG.info( + "Driver working-time derived projection bundle built in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, sendSupportGeoMs: {}, sendVehicleUsageMs: {}, sendActivityMs: {}, destroyMs: {}, sortOutputMs: {}, activityInputEvents: {}, vehicleUsageInputEvents: {}, supportGeoInputEvents: {})", + elapsedMillis(startedAtNanos), + runtimeMetrics.definitionCacheHit(), + runtimeMetrics.definitionPreparationMs(), + runtimeMetrics.runtimeInitMs(), + runtimeMetrics.deployMs(), + runtimeMetrics.listenerRegistrationMs(), + runtimeMetrics.sendSupportGeoMs(), + runtimeMetrics.sendVehicleUsageMs(), + runtimeMetrics.sendActivityMs(), + runtimeMetrics.destroyMs(), + sortOutputMs, + safeList(activityInputEvents).size(), + safeList(vehicleUsageInputEvents).size(), + safeList(supportGeoInputEvents).size() + ); return new DriverWorkingTimeDerivedProjectionBundle( sortDrivingInterruptionIntervalsCommon(drivingInterruptionIntervals), @@ -237,39 +247,134 @@ public class DriverWorkingTimeReusableProjectionBuilder { ); } - private void executeWithRuntime( - Consumer configurationSetup, - String epl, + private DerivedProjectionRuntimeExecutionMetrics executeWithRuntime( + int significantDrivingMinutes, + int minimumRestPeriodMinutes, Map> listeners, - Consumer sender + Consumer sender ) { EPRuntime runtime = null; + long definitionPreparationMs = 0L; + boolean definitionCacheHit = false; + long runtimeInitMs = 0L; + long deployMs = 0L; + long listenerRegistrationMs = 0L; + long sendSupportGeoMs = 0L; + long sendVehicleUsageMs = 0L; + long sendActivityMs = 0L; + long destroyMs = 0L; try { - Configuration configuration = new Configuration(); - configurationSetup.accept(configuration); + PreparedProjectionDefinitionCacheKey cacheKey = new PreparedProjectionDefinitionCacheKey( + significantDrivingMinutes, + minimumRestPeriodMinutes, + Math.max(1, properties.getRuntimeProcessing().getRestCandidateGeoLookbackMinutes()), + Math.max(1, properties.getRuntimeProcessing().getRestCandidateGeoLookaheadMinutes()), + Math.max(0, properties.getRuntimeProcessing().getRestCandidateGeoStationaryMaxMeters()), + Math.max( + properties.getRuntimeProcessing().getRestCandidateGeoStationaryMaxMeters(), + properties.getRuntimeProcessing().getRestCandidateGeoMinorMovementMaxMeters() + ) + ); + PreparedProjectionDefinition preparedDefinition = preparedDefinitions.get(cacheKey); + definitionCacheHit = preparedDefinition != null; + if (!definitionCacheHit) { + long definitionPreparationStartedAtNanos = System.nanoTime(); + PreparedProjectionDefinition candidate = prepareProjectionDefinition( + significantDrivingMinutes, + minimumRestPeriodMinutes + ); + PreparedProjectionDefinition existing = preparedDefinitions.putIfAbsent(cacheKey, candidate); + preparedDefinition = existing == null ? candidate : existing; + definitionCacheHit = existing != null; + definitionPreparationMs = definitionCacheHit ? 0L : elapsedMillis(definitionPreparationStartedAtNanos); + } + + long runtimeInitStartedAtNanos = System.nanoTime(); + Configuration configuration = createRuntimeConfiguration(); String runtimeUri = "eventhub-driver-working-time-reusable-projection-" + RUNTIME_COUNTER.incrementAndGet(); runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration); + runtimeInitMs = elapsedMillis(runtimeInitStartedAtNanos); - CompilerArguments arguments = new CompilerArguments(configuration); - EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments); - EPDeployment deployment = runtime.getDeploymentService().deploy(compiled); + long deployStartedAtNanos = System.nanoTime(); + EPDeployment deployment = runtime.getDeploymentService().deploy(preparedDefinition.compiled()); + deployMs = elapsedMillis(deployStartedAtNanos); + + long listenerRegistrationStartedAtNanos = System.nanoTime(); for (Map.Entry> entry : listeners.entrySet()) { runtime.getDeploymentService() .getStatement(deployment.getDeploymentId(), entry.getKey()) .addListener((newData, oldData, statement, rt) -> entry.getValue().accept(newData)); } + listenerRegistrationMs = elapsedMillis(listenerRegistrationStartedAtNanos); - sender.accept(runtime); + DerivedProjectionEventSender timedSender = new DerivedProjectionEventSender(runtime); + sender.accept(timedSender); + sendSupportGeoMs = timedSender.sendSupportGeoMs(); + sendVehicleUsageMs = timedSender.sendVehicleUsageMs(); + sendActivityMs = timedSender.sendActivityMs(); } catch (EPCompileException | EPDeployException e) { throw new IllegalStateException("Cannot compile/deploy reusable driver working-time projection EPL bundle", e); } finally { if (runtime != null) { + long destroyStartedAtNanos = System.nanoTime(); runtime.destroy(); + destroyMs = elapsedMillis(destroyStartedAtNanos); } } + return new DerivedProjectionRuntimeExecutionMetrics( + definitionCacheHit, + definitionPreparationMs, + runtimeInitMs, + deployMs, + listenerRegistrationMs, + sendSupportGeoMs, + sendVehicleUsageMs, + sendActivityMs, + destroyMs + ); } private Map activityIntervalInputDefinition() { + return ACTIVITY_INTERVAL_INPUT_DEFINITION; + } + + private Map vehicleUsageIntervalInputDefinition() { + return VEHICLE_USAGE_INTERVAL_INPUT_DEFINITION; + } + + private Map supportGeoEvidenceInputDefinition() { + return SUPPORT_GEO_EVIDENCE_INPUT_DEFINITION; + } + + private PreparedProjectionDefinition prepareProjectionDefinition( + int significantDrivingMinutes, + int minimumRestPeriodMinutes + ) throws EPCompileException { + Configuration configuration = createRuntimeConfiguration(); + String epl = renderDrivingDerivedProjectionBundleEpl(significantDrivingMinutes, minimumRestPeriodMinutes); + CompilerArguments arguments = new CompilerArguments(configuration); + EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments); + return new PreparedProjectionDefinition(compiled); + } + + private Configuration createRuntimeConfiguration() { + Configuration configuration = new Configuration(); + configuration.getCommon().addEventType( + "TachographActivityIntervalInputEvent", + activityIntervalInputDefinition() + ); + configuration.getCommon().addEventType( + "TachographVehicleUsageIntervalInputEvent", + vehicleUsageIntervalInputDefinition() + ); + configuration.getCommon().addEventType( + "TachographSupportGeoEvidenceInputEvent", + supportGeoEvidenceInputDefinition() + ); + return configuration; + } + + private static Map activityIntervalInputDefinitionStatic() { Map definition = new LinkedHashMap<>(); definition.put("sessionId", UUID.class); definition.put("driverKey", String.class); @@ -292,10 +397,10 @@ public class DriverWorkingTimeReusableProjectionBuilder { definition.put("synthetic", boolean.class); definition.put("clippedToRequestedPeriod", boolean.class); definition.put("level", String.class); - return definition; + return Collections.unmodifiableMap(definition); } - private Map vehicleUsageIntervalInputDefinition() { + private static Map vehicleUsageIntervalInputDefinitionStatic() { Map definition = new LinkedHashMap<>(); definition.put("sessionId", UUID.class); definition.put("driverKey", String.class); @@ -313,10 +418,10 @@ public class DriverWorkingTimeReusableProjectionBuilder { definition.put("vehicleKey", String.class); definition.put("sourceKind", String.class); definition.put("sourceIntervalIds", java.util.List.class); - return definition; + return Collections.unmodifiableMap(definition); } - private Map supportGeoEvidenceInputDefinition() { + private static Map supportGeoEvidenceInputDefinitionStatic() { Map definition = new LinkedHashMap<>(); definition.put("sessionId", UUID.class); definition.put("driverKey", String.class); @@ -330,7 +435,7 @@ public class DriverWorkingTimeReusableProjectionBuilder { definition.put("longitude", Double.class); definition.put("odometerKm", Long.class); definition.put("priority", int.class); - return definition; + return Collections.unmodifiableMap(definition); } private Map toActivityIntervalInputMap( @@ -758,6 +863,10 @@ public class DriverWorkingTimeReusableProjectionBuilder { return value instanceof Long number ? number : null; } + private long elapsedMillis(long startedAtNanos) { + return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); + } + private static String loadResource(String path) { try { ClassPathResource resource = new ClassPathResource(path); @@ -766,4 +875,78 @@ public class DriverWorkingTimeReusableProjectionBuilder { throw new IllegalStateException("Cannot load EPL resource: " + path, e); } } + + private record PreparedProjectionDefinition( + EPCompiled compiled + ) { + } + + private record PreparedProjectionDefinitionCacheKey( + int significantDrivingMinutes, + int minimumRestPeriodMinutes, + int restGeoLookbackMinutes, + int restGeoLookaheadMinutes, + int restGeoStationaryMaxMeters, + int restGeoMinorMovementMaxMeters + ) { + } + + private record DerivedProjectionRuntimeExecutionMetrics( + boolean definitionCacheHit, + long definitionPreparationMs, + long runtimeInitMs, + long deployMs, + long listenerRegistrationMs, + long sendSupportGeoMs, + long sendVehicleUsageMs, + long sendActivityMs, + long destroyMs + ) { + } + + private static final class DerivedProjectionEventSender { + + private final EPRuntime delegate; + private long sendSupportGeoMs; + private long sendVehicleUsageMs; + private long sendActivityMs; + + private DerivedProjectionEventSender(EPRuntime delegate) { + this.delegate = delegate; + } + + private void sendSupportGeoEvent(Map event) { + long startedAtNanos = System.nanoTime(); + delegate.getEventService().sendEventMap(event, "TachographSupportGeoEvidenceInputEvent"); + sendSupportGeoMs += elapsedMillisStatic(startedAtNanos); + } + + private void sendVehicleUsageEvent(Map event) { + long startedAtNanos = System.nanoTime(); + delegate.getEventService().sendEventMap(event, "TachographVehicleUsageIntervalInputEvent"); + sendVehicleUsageMs += elapsedMillisStatic(startedAtNanos); + } + + private void sendActivityEvent(Map event) { + long startedAtNanos = System.nanoTime(); + delegate.getEventService().sendEventMap(event, "TachographActivityIntervalInputEvent"); + sendActivityMs += elapsedMillisStatic(startedAtNanos); + } + + private long sendSupportGeoMs() { + return sendSupportGeoMs; + } + + private long sendVehicleUsageMs() { + return sendVehicleUsageMs; + } + + private long sendActivityMs() { + return sendActivityMs; + } + + private static long elapsedMillisStatic(long startedAtNanos) { + return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); + } + } } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/RuntimeEventProcessingService.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/RuntimeEventProcessingService.java index 9ca260b..aac14c6 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/RuntimeEventProcessingService.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/RuntimeEventProcessingService.java @@ -3,13 +3,17 @@ package at.procon.eventhub.processing.eventprocessing; import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingApiRequest; import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingResultDto; import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingProfileDescriptorDto; -import java.util.List; import at.procon.eventhub.processing.eventprocessing.profile.RuntimeEventProcessingProfileRegistry; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @Service public class RuntimeEventProcessingService { + private static final Logger LOG = LoggerFactory.getLogger(RuntimeEventProcessingService.class); + private final RuntimeEventProcessingProfileRegistry profileRegistry; public RuntimeEventProcessingService(RuntimeEventProcessingProfileRegistry profileRegistry) { @@ -17,10 +21,34 @@ public class RuntimeEventProcessingService { } public RuntimeEventProcessingResultDto process(RuntimeEventProcessingApiRequest request) { - return profileRegistry.require(request.profileKey()).process(request); + long startedAtNanos = System.nanoTime(); + try { + RuntimeEventProcessingResultDto result = profileRegistry.require(request.profileKey()).process(request); + LOG.info( + "Runtime event processing profile {} completed in {} ms (partitions: {}, inputEvents: {}, warnings: {})", + request.profileKey(), + elapsedMillis(startedAtNanos), + result.selectedPartitionCount(), + result.inputEventCount(), + result.warnings().size() + ); + return result; + } catch (RuntimeException ex) { + LOG.error( + "Runtime event processing profile {} failed after {} ms", + request == null ? null : request.profileKey(), + elapsedMillis(startedAtNanos), + ex + ); + throw ex; + } } public List listProfiles() { return profileRegistry.profileDescriptors(); } + + private long elapsedMillis(long startedAtNanos) { + return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); + } } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java index afd19cd..bba8372 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java @@ -15,12 +15,19 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcessingModule { + private static final Logger LOG = LoggerFactory.getLogger(DriverWorkingTimeDerivedProjectionsModule.class); + private static final String DRIVER_PROCESSING_TOTAL_MS_METADATA_KEY = "driverProcessingTotalMs"; + private static final String SLOWEST_DRIVER_KEY_METADATA_KEY = "slowestDriverKey"; + private static final String SLOWEST_DRIVER_PROCESSING_MS_METADATA_KEY = "slowestDriverProcessingMs"; + private final DriverWorkingTimeProcessingCore workingTimeProcessingCore; private final RuntimeDriverWorkingTimeScopeProcessingService legacyScopeProcessingService; @@ -60,20 +67,31 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess if (legacyScopeProcessingService != null) { return executeLegacy(context); } + long startedAtNanos = System.nanoTime(); UnifiedRuntimeEventBundle broadBundle = runtimeEventBundle(context); UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context); Map preparedInputs = preparedInputs(context); LinkedHashMap driverResults = new LinkedHashMap<>(); List warnings = new ArrayList<>(); + long driverProcessingTotalMs = 0L; + String slowestDriverKey = null; + long slowestDriverProcessingMs = -1L; for (Map.Entry entry : preparedInputs.entrySet()) { DriverWorkingTimePreparedInput preparedInput = entry.getValue(); + long driverStartedAtNanos = System.nanoTime(); DriverWorkingTimeProcessingResultDto projection = workingTimeProcessingCore.process(preparedInput.processingInput()) .withIncludedIntervals( scopeRequest.includeActivityIntervalsOrDefault(), scopeRequest.includeDrivingIntervalsOrDefault() ); + long driverProcessingMs = elapsedMillis(driverStartedAtNanos); + driverProcessingTotalMs += driverProcessingMs; + if (driverProcessingMs > slowestDriverProcessingMs) { + slowestDriverProcessingMs = driverProcessingMs; + slowestDriverKey = preparedInput.driverKey(); + } warnings.addAll(preparedInput.partition().warnings()); UnifiedRuntimeProcessingRequest driverRequest = broadBundle.request().withDriverKey(preparedInput.driverKey()); driverResults.put(preparedInput.driverKey(), new UnifiedRuntimeDerivedProjectionResultDto( @@ -110,6 +128,18 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess metadata.put("selectedDriverCount", result.selectedDriverCount()); metadata.put("discoveredVehicleCount", result.discoveredVehicleCount()); metadata.put("driverResultCount", result.driverResults().size()); + metadata.put(DRIVER_PROCESSING_TOTAL_MS_METADATA_KEY, driverProcessingTotalMs); + metadata.put(SLOWEST_DRIVER_KEY_METADATA_KEY, slowestDriverKey); + metadata.put(SLOWEST_DRIVER_PROCESSING_MS_METADATA_KEY, Math.max(0L, slowestDriverProcessingMs)); + LOG.info( + "Driving-derived projections processed {} drivers in {} ms (driverProcessingTotalMs: {}, slowestDriverKey: {}, slowestDriverProcessingMs: {}, warnings: {})", + result.driverResults().size(), + elapsedMillis(startedAtNanos), + driverProcessingTotalMs, + slowestDriverKey, + Math.max(0L, slowestDriverProcessingMs), + result.warnings().size() + ); return new RuntimeProcessingModuleResult( moduleKey(), RuntimeProcessingModuleStatus.SUCCESS, @@ -187,4 +217,8 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess } return fallback; } + + private long elapsedMillis(long startedAtNanos) { + return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); + } } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutor.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutor.java index db179bf..3c559c5 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutor.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutor.java @@ -4,11 +4,17 @@ import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecu import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @Service public class RuntimeProcessingPipelineExecutor { + public static final String EXECUTION_DURATION_MS_METADATA_KEY = "executionDurationMs"; + + private static final Logger LOG = LoggerFactory.getLogger(RuntimeProcessingPipelineExecutor.class); + private final RuntimeProcessingModuleRegistry moduleRegistry; public RuntimeProcessingPipelineExecutor(RuntimeProcessingModuleRegistry moduleRegistry) { @@ -29,8 +35,17 @@ public class RuntimeProcessingPipelineExecutor { continue; } RuntimeProcessingModule module = moduleRegistry.require(moduleKey.trim()); - RuntimeProcessingModuleResult result = module.execute(context); + long startedAtNanos = System.nanoTime(); + RuntimeProcessingModuleResult result; + try { + result = withExecutionDuration(module.execute(context), startedAtNanos); + } catch (RuntimeException ex) { + long durationMs = elapsedMillis(startedAtNanos); + LOG.error("Runtime processing module {} failed after {} ms", module.moduleKey(), durationMs, ex); + throw ex; + } results.put(module.moduleKey(), result); + logModuleCompletion(result); context = new RuntimeProcessingModuleContext( request, context.events(), @@ -43,4 +58,45 @@ public class RuntimeProcessingPipelineExecutor { } return Map.copyOf(results); } + + private RuntimeProcessingModuleResult withExecutionDuration( + RuntimeProcessingModuleResult result, + long startedAtNanos + ) { + long durationMs = elapsedMillis(startedAtNanos); + RuntimeProcessingModuleResult safeResult = result == null + ? new RuntimeProcessingModuleResult( + "UNKNOWN", + RuntimeProcessingModuleStatus.FAILED, + null, + Map.of(), + List.of("Runtime processing module returned null result.") + ) + : result; + LinkedHashMap metadata = new LinkedHashMap<>(safeResult.metadata()); + metadata.put(EXECUTION_DURATION_MS_METADATA_KEY, durationMs); + return new RuntimeProcessingModuleResult( + safeResult.moduleKey(), + safeResult.status(), + safeResult.output(), + metadata, + safeResult.warnings() + ); + } + + private void logModuleCompletion(RuntimeProcessingModuleResult result) { + Object durationValue = result.metadata().get(EXECUTION_DURATION_MS_METADATA_KEY); + long durationMs = durationValue instanceof Number number ? number.longValue() : -1L; + LOG.info( + "Runtime processing module {} completed with status {} in {} ms (warnings: {})", + result.moduleKey(), + result.status(), + durationMs, + result.warnings().size() + ); + } + + private long elapsedMillis(long startedAtNanos) { + return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); + } } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/epl/RuntimeEplModuleExecutor.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/epl/RuntimeEplModuleExecutor.java index 0557f72..5d76805 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/epl/RuntimeEplModuleExecutor.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/epl/RuntimeEplModuleExecutor.java @@ -13,10 +13,15 @@ import com.espertech.esper.runtime.client.EPRuntimeProvider; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.core.io.ClassPathResource; import org.springframework.stereotype.Service; import org.springframework.util.StreamUtils; @@ -24,24 +29,63 @@ import org.springframework.util.StreamUtils; @Service public class RuntimeEplModuleExecutor { + public static final String DEFINITION_CACHE_HIT_METADATA_KEY = "eplDefinitionCacheHit"; + public static final String DEFINITION_PREPARATION_MS_METADATA_KEY = "eplDefinitionPreparationMs"; + public static final String RUNTIME_INIT_MS_METADATA_KEY = "eplRuntimeInitMs"; + public static final String DEPLOY_MS_METADATA_KEY = "eplDeployMs"; + public static final String LISTENER_REGISTRATION_MS_METADATA_KEY = "eplListenerRegistrationMs"; + public static final String EVENT_SEND_MS_METADATA_KEY = "eplEventSendMs"; + public static final String DESTROY_MS_METADATA_KEY = "eplDestroyMs"; + + private static final Logger LOG = LoggerFactory.getLogger(RuntimeEplModuleExecutor.class); private static final AtomicLong RUNTIME_COUNTER = new AtomicLong(); + private final ConcurrentMap preparedDefinitions = new ConcurrentHashMap<>(); + private final ConcurrentMap eplResourceContents = new ConcurrentHashMap<>(); + public RuntimeEplModuleExecutionResult execute(RuntimeEplModuleDefinition definition) { EPRuntime runtime = null; + Map>> outputs = null; + LinkedHashMap metadata = null; + PreparedDefinition preparedDefinition = null; + boolean definitionCacheHit = false; + long definitionPreparationMs = 0L; + long runtimeInitMs = 0L; + long deployMs = 0L; + long listenerRegistrationMs = 0L; + long eventSendMs = 0L; + long destroyMs = 0L; + long executionStartedAtNanos = System.nanoTime(); try { - Configuration configuration = new Configuration(); - definition.eventTypes().forEach((eventTypeName, eventTypeDefinition) -> - configuration.getCommon().addEventType(eventTypeName, eventTypeDefinition)); + PreparedDefinitionCacheKey cacheKey = PreparedDefinitionCacheKey.from(definition); + preparedDefinition = preparedDefinitions.get(cacheKey); + definitionCacheHit = preparedDefinition != null; + if (!definitionCacheHit) { + long definitionPreparationStartedAtNanos = System.nanoTime(); + PreparedDefinition candidate = prepareDefinition(definition); + PreparedDefinition existing = preparedDefinitions.putIfAbsent(cacheKey, candidate); + preparedDefinition = existing == null ? candidate : existing; + definitionCacheHit = existing != null; + definitionPreparationMs = definitionCacheHit + ? 0L + : elapsedMillis(definitionPreparationStartedAtNanos); + } + + long runtimeInitStartedAtNanos = System.nanoTime(); + Configuration configuration = createConfiguration(preparedDefinition.eventTypes()); String runtimeUri = "eventhub-runtime-epl-module-" + definition.moduleKey().replaceAll("[^A-Za-z0-9_-]", "-") + "-" + RUNTIME_COUNTER.incrementAndGet(); runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration); + runtimeInitMs = elapsedMillis(runtimeInitStartedAtNanos); - EPCompiled compiled = EPCompilerProvider.getCompiler().compile(renderEpl(definition.eplResources()), new CompilerArguments(configuration)); - EPDeployment deployment = runtime.getDeploymentService().deploy(compiled); + long deployStartedAtNanos = System.nanoTime(); + EPDeployment deployment = runtime.getDeploymentService().deploy(preparedDefinition.compiled()); + deployMs = elapsedMillis(deployStartedAtNanos); - Map>> outputs = new LinkedHashMap<>(); + long listenerRegistrationStartedAtNanos = System.nanoTime(); + outputs = new LinkedHashMap<>(); for (String statementName : definition.outputStatementNames()) { outputs.put(statementName, new ArrayList<>()); var statement = runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), statementName); @@ -49,18 +93,28 @@ public class RuntimeEplModuleExecutor { throw new IllegalStateException("EPL module " + definition.moduleKey() + " did not deploy expected statement '" + statementName + "'."); } - statement.addListener((newData, oldData, stmt, rt) -> collect(newData, outputs.get(statementName))); + List> statementOutputs = outputs.get(statementName); + statement.addListener((newData, oldData, stmt, rt) -> collect(newData, statementOutputs)); } + listenerRegistrationMs = elapsedMillis(listenerRegistrationStartedAtNanos); + long eventSendStartedAtNanos = System.nanoTime(); for (RuntimeEplInputEventStream inputStream : definition.inputStreams()) { for (Map event : inputStream.events()) { runtime.getEventService().sendEventMap(event, inputStream.eventTypeName()); } } + eventSendMs = elapsedMillis(eventSendStartedAtNanos); - Map metadata = new LinkedHashMap<>(); + metadata = new LinkedHashMap<>(); metadata.put("engine", "EPL"); metadata.put("eplResources", definition.eplResources()); + metadata.put(DEFINITION_CACHE_HIT_METADATA_KEY, definitionCacheHit); + metadata.put(DEFINITION_PREPARATION_MS_METADATA_KEY, definitionPreparationMs); + metadata.put(RUNTIME_INIT_MS_METADATA_KEY, runtimeInitMs); + metadata.put(DEPLOY_MS_METADATA_KEY, deployMs); + metadata.put(LISTENER_REGISTRATION_MS_METADATA_KEY, listenerRegistrationMs); + metadata.put(EVENT_SEND_MS_METADATA_KEY, eventSendMs); Map inputCounts = new LinkedHashMap<>(); for (RuntimeEplInputEventStream inputStream : definition.inputStreams()) { inputCounts.merge(inputStream.eventTypeName(), inputStream.events().size(), Integer::sum); @@ -69,14 +123,48 @@ public class RuntimeEplModuleExecutor { Map outputCounts = new LinkedHashMap<>(); outputs.forEach((statement, events) -> outputCounts.put(statement, events.size())); metadata.put("outputCounts", outputCounts); - return new RuntimeEplModuleExecutionResult(outputs, metadata); } catch (EPCompileException | EPDeployException e) { throw new IllegalStateException("Cannot compile/deploy runtime EPL module " + definition.moduleKey(), e); } finally { if (runtime != null) { + long destroyStartedAtNanos = System.nanoTime(); runtime.destroy(); + destroyMs = elapsedMillis(destroyStartedAtNanos); + } + if (metadata != null) { + metadata.put(DESTROY_MS_METADATA_KEY, destroyMs); + LOG.info( + "Runtime EPL module {} completed in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, eventSendMs: {}, destroyMs: {})", + definition.moduleKey(), + elapsedMillis(executionStartedAtNanos), + definitionCacheHit, + definitionPreparationMs, + runtimeInitMs, + deployMs, + listenerRegistrationMs, + eventSendMs, + destroyMs + ); } } + return new RuntimeEplModuleExecutionResult( + outputs == null ? Map.of() : outputs, + metadata == null ? Map.of() : metadata + ); + } + + private PreparedDefinition prepareDefinition(RuntimeEplModuleDefinition definition) throws EPCompileException { + Configuration configuration = createConfiguration(definition.eventTypes()); + String epl = renderEpl(definition.eplResources()); + EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, new CompilerArguments(configuration)); + return new PreparedDefinition(definition.eventTypes(), compiled); + } + + private Configuration createConfiguration(Map> eventTypes) { + Configuration configuration = new Configuration(); + eventTypes.forEach((eventTypeName, eventTypeDefinition) -> + configuration.getCommon().addEventType(eventTypeName, eventTypeDefinition)); + return configuration; } private String renderEpl(List resources) { @@ -88,7 +176,7 @@ public class RuntimeEplModuleExecutor { if (!builder.isEmpty()) { builder.append("\n\n"); } - builder.append(loadResource(resource.trim())); + builder.append(eplResourceContents.computeIfAbsent(resource.trim(), RuntimeEplModuleExecutor::loadResource)); } return builder.toString(); } @@ -117,4 +205,40 @@ public class RuntimeEplModuleExecutor { target.add(values); } } + + private long elapsedMillis(long startedAtNanos) { + return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); + } + + private record PreparedDefinition( + Map> eventTypes, + EPCompiled compiled + ) { + private PreparedDefinition { + eventTypes = eventTypes == null ? Map.of() : immutableNestedMap(eventTypes); + } + } + + private record PreparedDefinitionCacheKey( + String moduleKey, + Map> eventTypes, + List eplResources + ) { + private static PreparedDefinitionCacheKey from(RuntimeEplModuleDefinition definition) { + return new PreparedDefinitionCacheKey( + definition.moduleKey(), + immutableNestedMap(definition.eventTypes()), + List.copyOf(definition.eplResources()) + ); + } + } + + private static Map> immutableNestedMap(Map> source) { + LinkedHashMap> copy = new LinkedHashMap<>(); + source.forEach((key, value) -> copy.put( + key, + value == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(value)) + )); + return Collections.unmodifiableMap(copy); + } } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionService.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionService.java index 4dfce32..a292871 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionService.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingExecutionService.java @@ -1,11 +1,17 @@ package at.procon.eventhub.processing.eventprocessing.plan; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleResult; +import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingPipelineExecutor; import org.springframework.stereotype.Service; @Service public class RuntimeProcessingExecutionService { + private static final Logger LOG = LoggerFactory.getLogger(RuntimeProcessingExecutionService.class); + private final RuntimeProcessingPlanRegistry planRegistry; public RuntimeProcessingExecutionService(RuntimeProcessingPlanRegistry planRegistry) { @@ -13,12 +19,42 @@ public class RuntimeProcessingExecutionService { } public RuntimeProcessingExecutionResultDto execute(RuntimeProcessingExecutionApiRequest request) { + long startedAtNanos = System.nanoTime(); RuntimeProcessingPlan plan = planRegistry.require(request.processingPlanKey()); - plan.validatePartitioning(request.partitioning()); - return plan.execute(request); + try { + plan.validatePartitioning(request.partitioning()); + RuntimeProcessingExecutionResultDto result = plan.execute(request); + RuntimeProcessingModuleResult slowestModule = result.moduleResults().values().stream() + .filter(moduleResult -> moduleResult.metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY) instanceof Number) + .max(java.util.Comparator.comparingLong(moduleResult -> + ((Number) moduleResult.metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())) + .orElse(null); + LOG.info( + "Runtime processing plan {} completed in {} ms (modules: {}, slowestModule: {}, slowestModuleMs: {}, warnings: {})", + request.processingPlanKey(), + elapsedMillis(startedAtNanos), + result.executedModules().size(), + slowestModule == null ? null : slowestModule.moduleKey(), + slowestModule == null ? null : slowestModule.metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY), + result.warnings().size() + ); + return result; + } catch (RuntimeException ex) { + LOG.error( + "Runtime processing plan {} failed after {} ms", + request == null ? null : request.processingPlanKey(), + elapsedMillis(startedAtNanos), + ex + ); + throw ex; + } } public List listPlans() { return planRegistry.planDescriptors(); } + + private long elapsedMillis(long startedAtNanos) { + return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L); + } } diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModuleTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModuleTest.java index c05742d..ee4388f 100644 --- a/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModuleTest.java +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModuleTest.java @@ -15,6 +15,7 @@ import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval; import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; +import at.procon.eventhub.processing.eventprocessing.module.epl.RuntimeEplModuleExecutor; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest; import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; import com.fasterxml.jackson.databind.node.JsonNodeFactory; @@ -78,6 +79,64 @@ class DriverVehicleUsageIntervalsModuleTest { assertThat(intervals.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-05-01T10:10:00Z")); } + @Test + void reusesPreparedEplDefinitionAcrossExecutions() { + DriverVehicleUsageIntervalsModule module = new DriverVehicleUsageIntervalsModule(new RuntimeEplModuleExecutor()); + RuntimeProcessingModuleContext context = new RuntimeProcessingModuleContext( + new RuntimeProcessingExecutionApiRequest( + "driver-working-time-v1", + runtimeScope(), + null, + List.of(), + Map.of() + ), + List.of( + vehicleUsageEvent( + "CVU-1", + EventType.CARD_INSERTED, + EventLifecycle.INSERT, + "2026-05-01T07:50:00Z", + "2026-05-01T07:50:00Z", + "2026-05-01T10:10:00Z", + 100_000L, + "INSERTED" + ), + vehicleUsageEvent( + "CVU-99", + EventType.CARD_WITHDRAWN, + EventLifecycle.WITHDRAW, + "2026-05-01T10:10:00Z", + "2026-05-01T07:50:00Z", + "2026-05-01T10:10:00Z", + 140_000L, + "NOT_INSERTED" + ) + ), + Map.of(), + Map.of() + ); + + RuntimeProcessingModuleResult first = module.execute(context); + RuntimeProcessingModuleResult second = module.execute(context); + + assertThat(first.metadata()) + .containsEntry(RuntimeEplModuleExecutor.DEFINITION_CACHE_HIT_METADATA_KEY, false) + .containsKey(RuntimeEplModuleExecutor.DEFINITION_PREPARATION_MS_METADATA_KEY) + .containsKey(RuntimeEplModuleExecutor.RUNTIME_INIT_MS_METADATA_KEY) + .containsKey(RuntimeEplModuleExecutor.DEPLOY_MS_METADATA_KEY) + .containsKey(RuntimeEplModuleExecutor.LISTENER_REGISTRATION_MS_METADATA_KEY) + .containsKey(RuntimeEplModuleExecutor.EVENT_SEND_MS_METADATA_KEY) + .containsKey(RuntimeEplModuleExecutor.DESTROY_MS_METADATA_KEY); + assertThat(second.metadata()) + .containsEntry(RuntimeEplModuleExecutor.DEFINITION_CACHE_HIT_METADATA_KEY, true) + .containsEntry(RuntimeEplModuleExecutor.DEFINITION_PREPARATION_MS_METADATA_KEY, 0L) + .containsKey(RuntimeEplModuleExecutor.RUNTIME_INIT_MS_METADATA_KEY) + .containsKey(RuntimeEplModuleExecutor.DEPLOY_MS_METADATA_KEY) + .containsKey(RuntimeEplModuleExecutor.LISTENER_REGISTRATION_MS_METADATA_KEY) + .containsKey(RuntimeEplModuleExecutor.EVENT_SEND_MS_METADATA_KEY) + .containsKey(RuntimeEplModuleExecutor.DESTROY_MS_METADATA_KEY); + } + private UnifiedRuntimeProcessingApiRequest runtimeScope() { return new UnifiedRuntimeProcessingApiRequest( UUID.randomUUID(), diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModuleTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModuleTest.java index 714efd8..d359b27 100644 --- a/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModuleTest.java +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModuleTest.java @@ -211,6 +211,9 @@ class DriverWorkingTimeDerivedProjectionsModuleTest { (UnifiedRuntimeDriverWorkingTimeScopeResultDto) result.output(); UnifiedRuntimeDerivedProjectionResultDto driverResult = scopeResult.driverResults().get("12:123"); + assertThat(result.metadata()).containsKey("driverProcessingTotalMs"); + assertThat(result.metadata()).containsEntry("slowestDriverKey", "12:123"); + assertThat(result.metadata()).containsKey("slowestDriverProcessingMs"); assertThat(driverResult.projection().activityIntervalCount()).isEqualTo(1); assertThat(driverResult.projection().drivingIntervalCount()).isEqualTo(1); assertThat(driverResult.projection().activityIntervals()).isEmpty(); diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutorTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutorTest.java new file mode 100644 index 0000000..6b723e0 --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutorTest.java @@ -0,0 +1,119 @@ +package at.procon.eventhub.processing.eventprocessing.module; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest; +import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class RuntimeProcessingPipelineExecutorTest { + + @Test + void addsExecutionDurationMetadataToEachModuleResult() { + RuntimeProcessingPipelineExecutor executor = new RuntimeProcessingPipelineExecutor( + new RuntimeProcessingModuleRegistry(List.of( + new TestModule("first"), + new TestModule("second") + )) + ); + + Map results = executor.execute( + new RuntimeProcessingExecutionApiRequest( + "driver-working-time-v1", + new at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest( + java.util.UUID.randomUUID(), + List.of(), + null, + null, + java.util.Set.of(at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION), + null, + null, + "12:123", + java.util.Set.of(), + false, + java.util.Set.of(), + false, + null, + null, + null, + java.time.OffsetDateTime.parse("2026-05-01T00:00:00Z"), + java.time.OffsetDateTime.parse("2026-05-01T01:00:00Z"), + true, + 0, + null, + null, + null, + null, + null + ), + new at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest( + RuntimeEventPartitioningStrategy.DRIVER, + null, + false, + null, + false, + null, + false, + null, + null, + null + ), + List.of(), + Map.of() + ), + List.of("first", "second"), + null + ); + + assertThat(results).containsKeys("first", "second"); + assertThat(results.get("first").metadata()).containsEntry("marker", "first"); + assertThat(results.get("first").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY); + assertThat(results.get("second").metadata()).containsEntry("marker", "second"); + assertThat(results.get("second").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY); + assertThat(((Number) results.get("first").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue()) + .isGreaterThanOrEqualTo(0L); + assertThat(((Number) results.get("second").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue()) + .isGreaterThanOrEqualTo(0L); + } + + private static final class TestModule implements RuntimeProcessingModule { + + private final String key; + + private TestModule(String key) { + this.key = key; + } + + @Override + public String moduleKey() { + return key; + } + + @Override + public at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto descriptor() { + return new at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto( + key, + key, + "test", + "JAVA", + java.util.Set.of() + ); + } + + @Override + public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) { + if ("second".equals(key)) { + assertThat(context.previousResults()).containsKey("first"); + } + return new RuntimeProcessingModuleResult( + key, + RuntimeProcessingModuleStatus.SUCCESS, + null, + Map.of("marker", key), + List.of() + ); + } + } +}