From c066c5c7778286f4f1d30a34ea3be0cf875c584b Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 15 Jun 2026 10:58:36 +0200 Subject: [PATCH] Add runtime processing dependency resolution --- .../module/DriverActivityIntervalsModule.java | 2 + .../DriverVehicleUsageIntervalsModule.java | 2 + .../module/DriverVehicleUsageMergeModule.java | 2 + ...erWorkingTimeDerivedProjectionsModule.java | 15 ++ .../module/EventEvidenceMixingModule.java | 2 + .../module/RuntimeEventAssemblyModule.java | 2 + ...imeProcessingDependencyCycleException.java | 8 ++ .../RuntimeProcessingDependencyException.java | 12 ++ .../module/RuntimeProcessingModule.java | 20 +++ .../RuntimeProcessingModuleContext.java | 23 +++ .../RuntimeProcessingPipelineExecutor.java | 8 +- .../module/RuntimeProcessingPlanResolver.java | 90 ++++++++++++ .../SupportEvidenceNormalizationModule.java | 5 + .../VehicleEvidenceAttachmentModule.java | 6 + .../VehicleUsageReconciliationModule.java | 2 + ...riverWorkingTimeRuntimeProcessingPlan.java | 45 +++++- .../RuntimeProcessingModuleDescriptorDto.java | 42 +++++- ...RuntimeProcessingPipelineExecutorTest.java | 131 ++++++++++++------ 18 files changed, 364 insertions(+), 53 deletions(-) create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingDependencyCycleException.java create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingDependencyException.java create mode 100644 src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPlanResolver.java diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverActivityIntervalsModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverActivityIntervalsModule.java index 35e3e8c..3a2d632 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverActivityIntervalsModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverActivityIntervalsModule.java @@ -46,6 +46,8 @@ public class DriverActivityIntervalsModule implements RuntimeEplModule { "Event to activity intervals", "EPL module that pairs canonical DRIVER_ACTIVITY START/END point events into source-neutral driver activity intervals.", engine(), + Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING), + Set.of("RuntimeMixedEventBundle.activityTimelineEvents"), Set.of("DriverActivityIntervalEvent") ); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModule.java index 25301ea..17b2aa8 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageIntervalsModule.java @@ -46,6 +46,8 @@ public class DriverVehicleUsageIntervalsModule implements RuntimeEplModule { "Event to vehicle usage intervals", "EPL module that pairs canonical driver/card INSERT/WITHDRAW point events into source-neutral driver vehicle-usage intervals.", engine(), + Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING), + Set.of("RuntimeMixedEventBundle.vehicleUsageEvents"), Set.of("DriverVehicleUsageIntervalEvent") ); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageMergeModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageMergeModule.java index 9e746bf..554c51c 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageMergeModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverVehicleUsageMergeModule.java @@ -29,6 +29,8 @@ public class DriverVehicleUsageMergeModule implements RuntimeProcessingModule { "Vehicle usage merge", "Merges adjacent or continuous effective same-driver/same-vehicle usage intervals after source reconciliation.", "JAVA", + Set.of(DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION), + Set.of("RuntimeVehicleUsageReconciliationResult.effectiveVehicleUsageIntervals"), Set.of("DriverWorkingTimeVehicleUsageInterval") ); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java index bba8372..e71b367 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/DriverWorkingTimeDerivedProjectionsModule.java @@ -53,11 +53,26 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess @Override public RuntimeProcessingModuleDescriptorDto descriptor() { + if (legacyScopeProcessingService != null) { + return new RuntimeProcessingModuleDescriptorDto( + moduleKey(), + "Driving-derived projections", + "Executes the legacy driver working-time scope adapter. This compatibility mode performs upstream assembly internally.", + "ESPER+JAVA", + Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto") + ); + } return new RuntimeProcessingModuleDescriptorDto( moduleKey(), "Driving-derived projections", "Executes the shared driver working-time core from typed per-driver module outputs for driving interruptions, rest candidates, card-absence coverage, overnight candidates, and trip candidates.", "ESPER+JAVA", + Set.of( + DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS, + DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE, + DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION + ), + Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map"), Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto") ); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java index 36dbf96..58ec56b 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/EventEvidenceMixingModule.java @@ -41,6 +41,8 @@ public class EventEvidenceMixingModule implements RuntimeProcessingModule { "Event evidence mixing", "Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged for vehicle-usage processing.", "JAVA", + Set.of(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY), + Set.of("UnifiedRuntimeEventBundle"), Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto") ); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeEventAssemblyModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeEventAssemblyModule.java index a845eef..457c371 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeEventAssemblyModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeEventAssemblyModule.java @@ -30,6 +30,8 @@ public class RuntimeEventAssemblyModule implements RuntimeProcessingModule { "Runtime event assembly", "Loads and merges canonical runtime events from the selected source scope before plan-specific processing.", "JAVA", + Set.of(), + Set.of("runtime source selection"), Set.of("UnifiedRuntimeEventBundle") ); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingDependencyCycleException.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingDependencyCycleException.java new file mode 100644 index 0000000..462e184 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingDependencyCycleException.java @@ -0,0 +1,8 @@ +package at.procon.eventhub.processing.eventprocessing.module; + +public class RuntimeProcessingDependencyCycleException extends RuntimeProcessingDependencyException { + + public RuntimeProcessingDependencyCycleException(String message) { + super(message); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingDependencyException.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingDependencyException.java new file mode 100644 index 0000000..864fceb --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingDependencyException.java @@ -0,0 +1,12 @@ +package at.procon.eventhub.processing.eventprocessing.module; + +public class RuntimeProcessingDependencyException extends IllegalArgumentException { + + public RuntimeProcessingDependencyException(String message) { + super(message); + } + + public RuntimeProcessingDependencyException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingModule.java index 65f23a5..e518965 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingModule.java @@ -16,5 +16,25 @@ public interface RuntimeProcessingModule { RuntimeProcessingModuleDescriptorDto descriptor(); + default java.util.Set requiredModuleKeys() { + RuntimeProcessingModuleDescriptorDto descriptor = descriptor(); + return descriptor == null ? java.util.Set.of() : descriptor.requiredModuleKeys(); + } + + default java.util.Set optionalModuleKeys() { + RuntimeProcessingModuleDescriptorDto descriptor = descriptor(); + return descriptor == null ? java.util.Set.of() : descriptor.optionalModuleKeys(); + } + + default java.util.Set consumedStreams() { + RuntimeProcessingModuleDescriptorDto descriptor = descriptor(); + return descriptor == null ? java.util.Set.of() : descriptor.consumedStreams(); + } + + default java.util.Set producedStreams() { + RuntimeProcessingModuleDescriptorDto descriptor = descriptor(); + return descriptor == null ? java.util.Set.of() : descriptor.producedStreams(); + } + RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingModuleContext.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingModuleContext.java index f9e8bbb..92c0dd6 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingModuleContext.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingModuleContext.java @@ -18,4 +18,27 @@ public record RuntimeProcessingModuleContext( attributes = attributes == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(attributes)); previousResults = previousResults == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(previousResults)); } + + public RuntimeProcessingModuleResult requireResult(String moduleKey) { + RuntimeProcessingModuleResult result = previousResults.get(moduleKey); + if (result == null) { + throw new RuntimeProcessingDependencyException("Required runtime processing module result is missing: " + moduleKey); + } + if (result.status() == RuntimeProcessingModuleStatus.FAILED) { + throw new RuntimeProcessingDependencyException("Required runtime processing module failed: " + moduleKey); + } + return result; + } + + public T requireOutput(String moduleKey, Class outputType) { + RuntimeProcessingModuleResult result = requireResult(moduleKey); + Object output = result.output(); + if (outputType.isInstance(output)) { + return outputType.cast(output); + } + throw new RuntimeProcessingDependencyException("Required runtime processing module " + moduleKey + + " did not produce " + outputType.getName() + ". Actual: " + + (output == null ? "null" : output.getClass().getName())); + } } + diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutor.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutor.java index 3c559c5..acbce04 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutor.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutor.java @@ -16,9 +16,11 @@ public class RuntimeProcessingPipelineExecutor { private static final Logger LOG = LoggerFactory.getLogger(RuntimeProcessingPipelineExecutor.class); private final RuntimeProcessingModuleRegistry moduleRegistry; + private final RuntimeProcessingPlanResolver planResolver; public RuntimeProcessingPipelineExecutor(RuntimeProcessingModuleRegistry moduleRegistry) { this.moduleRegistry = moduleRegistry; + this.planResolver = new RuntimeProcessingPlanResolver(moduleRegistry); } public Map execute( @@ -30,7 +32,9 @@ public class RuntimeProcessingPipelineExecutor { RuntimeProcessingModuleContext context = initialContext == null ? new RuntimeProcessingModuleContext(request, List.of(), Map.of(), Map.of()) : initialContext; - for (String moduleKey : moduleKeys == null ? List.of() : moduleKeys) { + List resolvedModuleKeys = planResolver.resolve(moduleKeys == null ? List.of() : moduleKeys); + LOG.debug("Resolved runtime processing modules {} to execution order {}", moduleKeys, resolvedModuleKeys); + for (String moduleKey : resolvedModuleKeys) { if (moduleKey == null || moduleKey.isBlank()) { continue; } @@ -56,7 +60,7 @@ public class RuntimeProcessingPipelineExecutor { break; } } - return Map.copyOf(results); + return java.util.Collections.unmodifiableMap(new LinkedHashMap<>(results)); } private RuntimeProcessingModuleResult withExecutionDuration( diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPlanResolver.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPlanResolver.java new file mode 100644 index 0000000..d2150a7 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPlanResolver.java @@ -0,0 +1,90 @@ +package at.procon.eventhub.processing.eventprocessing.module; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * Resolves a requested runtime module list into an executable dependency order. + * + *

The resolver intentionally treats module dependencies as execution contracts, not as UI hints: + * every required predecessor is inserted before the requested module, unknown modules fail fast, and + * dependency cycles are rejected before any module is executed.

+ */ +public class RuntimeProcessingPlanResolver { + + private final RuntimeProcessingModuleRegistry moduleRegistry; + + public RuntimeProcessingPlanResolver(RuntimeProcessingModuleRegistry moduleRegistry) { + this.moduleRegistry = moduleRegistry; + } + + public List resolve(List requestedModuleKeys) { + LinkedHashSet requested = normalize(requestedModuleKeys); + LinkedHashSet resolved = new LinkedHashSet<>(); + LinkedHashSet visiting = new LinkedHashSet<>(); + ArrayDeque path = new ArrayDeque<>(); + + for (String moduleKey : requested) { + visit(moduleKey, resolved, visiting, path); + } + return List.copyOf(resolved); + } + + private void visit( + String moduleKey, + LinkedHashSet resolved, + LinkedHashSet visiting, + ArrayDeque path + ) { + if (resolved.contains(moduleKey)) { + return; + } + RuntimeProcessingModule module; + try { + module = moduleRegistry.require(moduleKey); + } catch (IllegalArgumentException ex) { + throw new RuntimeProcessingDependencyException("Unknown runtime processing module requested or required: " + moduleKey, ex); + } + if (!visiting.add(moduleKey)) { + throw new RuntimeProcessingDependencyCycleException("Runtime processing module dependency cycle detected: " + + cycleDescription(path, moduleKey)); + } + path.addLast(moduleKey); + for (String requiredModuleKey : module.requiredModuleKeys()) { + visit(requiredModuleKey, resolved, visiting, path); + } + path.removeLast(); + visiting.remove(moduleKey); + resolved.add(moduleKey); + } + + private LinkedHashSet normalize(List moduleKeys) { + LinkedHashSet normalized = new LinkedHashSet<>(); + if (moduleKeys != null) { + for (String moduleKey : moduleKeys) { + if (moduleKey != null && !moduleKey.isBlank()) { + normalized.add(moduleKey.trim()); + } + } + } + return normalized; + } + + private String cycleDescription(ArrayDeque path, String repeatedModuleKey) { + List cycle = new ArrayList<>(); + boolean inCycle = false; + for (String element : path) { + if (element.equals(repeatedModuleKey)) { + inCycle = true; + } + if (inCycle) { + cycle.add(element); + } + } + cycle.add(repeatedModuleKey); + return String.join(" -> ", cycle); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/SupportEvidenceNormalizationModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/SupportEvidenceNormalizationModule.java index 96c92a3..6781f1d 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/SupportEvidenceNormalizationModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/SupportEvidenceNormalizationModule.java @@ -59,6 +59,11 @@ public class SupportEvidenceNormalizationModule implements RuntimeProcessingModu "Support evidence normalization", "Normalizes mixed-source support evidence into typed per-driver working-time inputs for the common processing core.", "JAVA", + Set.of( + DriverWorkingTimeModuleKeys.VEHICLE_EVIDENCE_ATTACHMENT, + DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS + ), + Set.of("Map", "DriverActivityIntervalEvent"), Set.of("Map") ); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java index 88214d5..1e49e5e 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleEvidenceAttachmentModule.java @@ -57,6 +57,12 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule "Vehicle evidence attachment", "Partitions the broad runtime scope by driver and attaches vehicle-only evidence using merged vehicle-usage intervals from the common pipeline.", "JAVA", + Set.of( + DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY, + DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING, + DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE + ), + Set.of("UnifiedRuntimeEventBundle", "RuntimeMixedEventBundle.driverPartitionEvents", "DriverWorkingTimeVehicleUsageInterval"), Set.of("Map") ); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleUsageReconciliationModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleUsageReconciliationModule.java index 9ad4138..549daee 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleUsageReconciliationModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/VehicleUsageReconciliationModule.java @@ -38,6 +38,8 @@ public class VehicleUsageReconciliationModule implements RuntimeProcessingModule "Vehicle usage reconciliation", "Normalizes CARD_VEHICLES_USED technical midnight splits, then reconciles normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary for effective vehicle usage; CARD_VEHICLES_USED remains fallback or corroborating evidence.", "JAVA", + Set.of(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS), + Set.of("DriverVehicleUsageIntervalEvent"), Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent") ); } diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java index 26e17cc..aa8de7a 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java @@ -118,6 +118,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "Runtime event assembly", "Loads and merges canonical runtime events from the selected source scope before plan-specific processing.", "JAVA", + Set.of(), + Set.of("runtime source selection"), Set.of("UnifiedRuntimeEventBundle") )); descriptors.add(new RuntimeProcessingModuleDescriptorDto( @@ -125,6 +127,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "Event evidence mixing", "Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged.", "JAVA", + Set.of(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY), + Set.of("UnifiedRuntimeEventBundle"), Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto") )); } @@ -134,6 +138,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "Event to activity intervals", "Pairs canonical DRIVER_ACTIVITY START/END point events into source-neutral activity intervals using a first-class EPL module.", "ESPER", + Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING), + Set.of("RuntimeMixedEventBundle.activityTimelineEvents"), Set.of("DriverActivityIntervalEvent") ), new RuntimeProcessingModuleDescriptorDto( @@ -141,6 +147,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "Event to vehicle usage intervals", "Pairs canonical driver/card INSERT/WITHDRAW point events into source-neutral vehicle-usage intervals using a first-class EPL module.", "ESPER", + Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING), + Set.of("RuntimeMixedEventBundle.vehicleUsageEvents"), Set.of("DriverVehicleUsageIntervalEvent") ), new RuntimeProcessingModuleDescriptorDto( @@ -148,6 +156,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "Vehicle usage reconciliation", "Coalesces CARD_VEHICLES_USED technical midnight splits before reconciling normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary; CARD_VEHICLES_USED is fallback or corroborating evidence.", "JAVA", + Set.of(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS), + Set.of("DriverVehicleUsageIntervalEvent"), Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent") ), new RuntimeProcessingModuleDescriptorDto( @@ -155,6 +165,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "Vehicle usage merge", "Merges adjacent effective same-driver/same-vehicle usage intervals after vehicle-usage source reconciliation.", "JAVA", + Set.of(DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION), + Set.of("RuntimeVehicleUsageReconciliationResult.effectiveVehicleUsageIntervals"), Set.of("DriverVehicleUsageIntervalEvent") ), new RuntimeProcessingModuleDescriptorDto( @@ -162,6 +174,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "Vehicle evidence attachment", "Attaches vehicle-only events to driver partitions when they overlap driver vehicle-usage intervals.", "JAVA", + Set.of( + DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY, + DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING, + DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE + ), + Set.of("UnifiedRuntimeEventBundle", "RuntimeMixedEventBundle.driverPartitionEvents", "DriverWorkingTimeVehicleUsageInterval"), Set.of("RuntimeDriverPartitionDebugDto") ), new RuntimeProcessingModuleDescriptorDto( @@ -169,6 +187,11 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "Support evidence normalization", "Normalizes attached mixed-source support evidence for driver working-time processing.", "JAVA", + Set.of( + DriverWorkingTimeModuleKeys.VEHICLE_EVIDENCE_ATTACHMENT, + DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS + ), + Set.of("Map", "DriverActivityIntervalEvent"), Set.of("RuntimeSupportEvidenceNormalizationDebugDto") ), new RuntimeProcessingModuleDescriptorDto( @@ -176,6 +199,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing "Driving-derived projections", "Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates.", "ESPER+JAVA", + Set.of( + DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS, + DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE, + DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION + ), + Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map"), Set.of("DriverWorkingTimeProcessingResultDto") ) )); @@ -215,22 +244,22 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing boolean includeExecutionModuleResults = booleanParameter( request.parameters(), INCLUDE_EXECUTION_MODULE_RESULTS_PARAMETER, - false + true ); boolean includePartitionMetadata = booleanParameter( request.parameters(), INCLUDE_PARTITION_METADATA_PARAMETER, - false + true ); boolean includePartitionModuleResults = booleanParameter( request.parameters(), INCLUDE_PARTITION_MODULE_RESULTS_PARAMETER, - false + true ); boolean includeSupportEvidenceNormalization = booleanParameter( request.parameters(), INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER, - false + true ); int vehicleEvidencePaddingMinutes = resolveVehicleEvidencePaddingMinutes( request.sourceSelection(), @@ -255,12 +284,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing Map.of() ); - List executedModules = requestedOrDefaultModules(request.modules()); + List requestedModules = requestedOrDefaultModules(request.modules()); Map moduleResults = pipelineExecutor.execute( request, - executedModules, + requestedModules, initialContext ); + List executedModules = List.copyOf(moduleResults.keySet()); UnifiedRuntimeDriverWorkingTimeScopeResultDto workingTimeResult = extractWorkingTimeResult(moduleResults); Map partitionResults = new LinkedHashMap<>(); @@ -527,6 +557,9 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing } private List requestedOrDefaultModules(List requestedModules) { + if (!includeRuntimeEventAssemblyModule) { + return List.of(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS); + } if (requestedModules != null && !requestedModules.isEmpty()) { LinkedHashMap requested = new LinkedHashMap<>(); for (String module : requestedModules) { diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingModuleDescriptorDto.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingModuleDescriptorDto.java index e7b44cb..f8732b6 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingModuleDescriptorDto.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/RuntimeProcessingModuleDescriptorDto.java @@ -7,7 +7,11 @@ public record RuntimeProcessingModuleDescriptorDto( String displayName, String description, String engine, - Set producedStreams + Set requiredModuleKeys, + Set optionalModuleKeys, + Set consumedStreams, + Set producedStreams, + String version ) { public RuntimeProcessingModuleDescriptorDto { if (moduleKey == null || moduleKey.isBlank()) { @@ -17,6 +21,42 @@ public record RuntimeProcessingModuleDescriptorDto( displayName = displayName == null || displayName.isBlank() ? moduleKey : displayName.trim(); description = description == null ? "" : description; engine = engine == null || engine.isBlank() ? "UNKNOWN" : engine.trim(); + requiredModuleKeys = normalizeKeys(requiredModuleKeys); + optionalModuleKeys = normalizeKeys(optionalModuleKeys); + consumedStreams = consumedStreams == null ? Set.of() : Set.copyOf(consumedStreams); producedStreams = producedStreams == null ? Set.of() : Set.copyOf(producedStreams); + version = version == null || version.isBlank() ? "1" : version.trim(); + } + + public RuntimeProcessingModuleDescriptorDto( + String moduleKey, + String displayName, + String description, + String engine, + Set producedStreams + ) { + this(moduleKey, displayName, description, engine, Set.of(), Set.of(), Set.of(), producedStreams, "1"); + } + + public RuntimeProcessingModuleDescriptorDto( + String moduleKey, + String displayName, + String description, + String engine, + Set requiredModuleKeys, + Set consumedStreams, + Set producedStreams + ) { + this(moduleKey, displayName, description, engine, requiredModuleKeys, Set.of(), consumedStreams, producedStreams, "1"); + } + + private static Set normalizeKeys(Set keys) { + if (keys == null || keys.isEmpty()) { + return Set.of(); + } + return keys.stream() + .filter(key -> key != null && !key.isBlank()) + .map(String::trim) + .collect(java.util.stream.Collectors.toUnmodifiableSet()); } } diff --git a/src/test/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutorTest.java b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutorTest.java index 6b723e0..33fbb88 100644 --- a/src/test/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutorTest.java +++ b/src/test/java/at/procon/eventhub/processing/eventprocessing/module/RuntimeProcessingPipelineExecutorTest.java @@ -1,6 +1,7 @@ package at.procon.eventhub.processing.eventprocessing.module; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest; import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; @@ -20,49 +21,7 @@ class RuntimeProcessingPipelineExecutorTest { ); Map results = executor.execute( - new RuntimeProcessingExecutionApiRequest( - "driver-working-time-v1", - new at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest( - java.util.UUID.randomUUID(), - List.of(), - null, - null, - java.util.Set.of(at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION), - null, - null, - "12:123", - java.util.Set.of(), - false, - java.util.Set.of(), - false, - null, - null, - null, - java.time.OffsetDateTime.parse("2026-05-01T00:00:00Z"), - java.time.OffsetDateTime.parse("2026-05-01T01:00:00Z"), - true, - 0, - null, - null, - null, - null, - null - ), - new at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest( - RuntimeEventPartitioningStrategy.DRIVER, - null, - false, - null, - false, - null, - false, - null, - null, - null - ), - List.of(), - Map.of() - ), + request(), List.of("first", "second"), null ); @@ -78,12 +37,94 @@ class RuntimeProcessingPipelineExecutorTest { .isGreaterThanOrEqualTo(0L); } + @Test + void insertsRequiredModulesBeforeRequestedModule() { + RuntimeProcessingPipelineExecutor executor = new RuntimeProcessingPipelineExecutor( + new RuntimeProcessingModuleRegistry(List.of( + new TestModule("assembly"), + new TestModule("mixing", "assembly"), + new TestModule("final", "mixing") + )) + ); + + Map results = executor.execute( + request(), + List.of("final"), + null + ); + + assertThat(results.keySet()).containsExactly("assembly", "mixing", "final"); + } + + @Test + void rejectsDependencyCyclesBeforeExecution() { + RuntimeProcessingPipelineExecutor executor = new RuntimeProcessingPipelineExecutor( + new RuntimeProcessingModuleRegistry(List.of( + new TestModule("a", "b"), + new TestModule("b", "a") + )) + ); + + assertThatThrownBy(() -> executor.execute(request(), List.of("a"), null)) + .isInstanceOf(RuntimeProcessingDependencyCycleException.class) + .hasMessageContaining("a -> b -> a"); + } + + private RuntimeProcessingExecutionApiRequest request() { + return new RuntimeProcessingExecutionApiRequest( + "driver-working-time-v1", + new at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest( + java.util.UUID.randomUUID(), + List.of(), + null, + null, + java.util.Set.of(at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION), + null, + null, + "12:123", + java.util.Set.of(), + false, + java.util.Set.of(), + false, + null, + null, + null, + java.time.OffsetDateTime.parse("2026-05-01T00:00:00Z"), + java.time.OffsetDateTime.parse("2026-05-01T01:00:00Z"), + true, + 0, + null, + null, + null, + null, + null + ), + new at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest( + RuntimeEventPartitioningStrategy.DRIVER, + null, + false, + null, + false, + null, + false, + null, + null, + null + ), + List.of(), + Map.of() + ); + } + private static final class TestModule implements RuntimeProcessingModule { private final String key; + private final java.util.Set requiredKeys; - private TestModule(String key) { + private TestModule(String key, String... requiredKeys) { this.key = key; + this.requiredKeys = java.util.Arrays.stream(requiredKeys) + .collect(java.util.stream.Collectors.toUnmodifiableSet()); } @Override @@ -98,6 +139,8 @@ class RuntimeProcessingPipelineExecutorTest { key, "test", "JAVA", + requiredKeys, + java.util.Set.of(), java.util.Set.of() ); }