Add runtime processing dependency resolution

This commit is contained in:
trifonovt 2026-06-15 10:58:36 +02:00
parent e78d7d6ea8
commit c066c5c777
18 changed files with 364 additions and 53 deletions

View File

@ -46,6 +46,8 @@ public class DriverActivityIntervalsModule implements RuntimeEplModule {
"Event to activity intervals",
"EPL module that pairs canonical DRIVER_ACTIVITY START/END point events into source-neutral driver activity intervals.",
engine(),
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
Set.of("RuntimeMixedEventBundle.activityTimelineEvents"),
Set.of("DriverActivityIntervalEvent")
);
}

View File

@ -46,6 +46,8 @@ public class DriverVehicleUsageIntervalsModule implements RuntimeEplModule {
"Event to vehicle usage intervals",
"EPL module that pairs canonical driver/card INSERT/WITHDRAW point events into source-neutral driver vehicle-usage intervals.",
engine(),
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
Set.of("RuntimeMixedEventBundle.vehicleUsageEvents"),
Set.of("DriverVehicleUsageIntervalEvent")
);
}

View File

@ -29,6 +29,8 @@ public class DriverVehicleUsageMergeModule implements RuntimeProcessingModule {
"Vehicle usage merge",
"Merges adjacent or continuous effective same-driver/same-vehicle usage intervals after source reconciliation.",
"JAVA",
Set.of(DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION),
Set.of("RuntimeVehicleUsageReconciliationResult.effectiveVehicleUsageIntervals"),
Set.of("DriverWorkingTimeVehicleUsageInterval")
);
}

View File

@ -53,11 +53,26 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
@Override
public RuntimeProcessingModuleDescriptorDto descriptor() {
if (legacyScopeProcessingService != null) {
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Driving-derived projections",
"Executes the legacy driver working-time scope adapter. This compatibility mode performs upstream assembly internally.",
"ESPER+JAVA",
Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto")
);
}
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Driving-derived projections",
"Executes the shared driver working-time core from typed per-driver module outputs for driving interruptions, rest candidates, card-absence coverage, overnight candidates, and trip candidates.",
"ESPER+JAVA",
Set.of(
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS,
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE,
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION
),
Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map<String, DriverWorkingTimePreparedInput>"),
Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto")
);
}

View File

@ -41,6 +41,8 @@ public class EventEvidenceMixingModule implements RuntimeProcessingModule {
"Event evidence mixing",
"Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged for vehicle-usage processing.",
"JAVA",
Set.of(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY),
Set.of("UnifiedRuntimeEventBundle"),
Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto")
);
}

View File

@ -30,6 +30,8 @@ public class RuntimeEventAssemblyModule implements RuntimeProcessingModule {
"Runtime event assembly",
"Loads and merges canonical runtime events from the selected source scope before plan-specific processing.",
"JAVA",
Set.of(),
Set.of("runtime source selection"),
Set.of("UnifiedRuntimeEventBundle")
);
}

View File

@ -0,0 +1,8 @@
package at.procon.eventhub.processing.eventprocessing.module;
public class RuntimeProcessingDependencyCycleException extends RuntimeProcessingDependencyException {
public RuntimeProcessingDependencyCycleException(String message) {
super(message);
}
}

View File

@ -0,0 +1,12 @@
package at.procon.eventhub.processing.eventprocessing.module;
public class RuntimeProcessingDependencyException extends IllegalArgumentException {
public RuntimeProcessingDependencyException(String message) {
super(message);
}
public RuntimeProcessingDependencyException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -16,5 +16,25 @@ public interface RuntimeProcessingModule {
RuntimeProcessingModuleDescriptorDto descriptor();
default java.util.Set<String> requiredModuleKeys() {
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
return descriptor == null ? java.util.Set.of() : descriptor.requiredModuleKeys();
}
default java.util.Set<String> optionalModuleKeys() {
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
return descriptor == null ? java.util.Set.of() : descriptor.optionalModuleKeys();
}
default java.util.Set<String> consumedStreams() {
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
return descriptor == null ? java.util.Set.of() : descriptor.consumedStreams();
}
default java.util.Set<String> producedStreams() {
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
return descriptor == null ? java.util.Set.of() : descriptor.producedStreams();
}
RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context);
}

View File

@ -18,4 +18,27 @@ public record RuntimeProcessingModuleContext(
attributes = attributes == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(attributes));
previousResults = previousResults == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(previousResults));
}
public RuntimeProcessingModuleResult requireResult(String moduleKey) {
RuntimeProcessingModuleResult result = previousResults.get(moduleKey);
if (result == null) {
throw new RuntimeProcessingDependencyException("Required runtime processing module result is missing: " + moduleKey);
}
if (result.status() == RuntimeProcessingModuleStatus.FAILED) {
throw new RuntimeProcessingDependencyException("Required runtime processing module failed: " + moduleKey);
}
return result;
}
public <T> T requireOutput(String moduleKey, Class<T> outputType) {
RuntimeProcessingModuleResult result = requireResult(moduleKey);
Object output = result.output();
if (outputType.isInstance(output)) {
return outputType.cast(output);
}
throw new RuntimeProcessingDependencyException("Required runtime processing module " + moduleKey
+ " did not produce " + outputType.getName() + ". Actual: "
+ (output == null ? "null" : output.getClass().getName()));
}
}

View File

@ -16,9 +16,11 @@ public class RuntimeProcessingPipelineExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RuntimeProcessingPipelineExecutor.class);
private final RuntimeProcessingModuleRegistry moduleRegistry;
private final RuntimeProcessingPlanResolver planResolver;
public RuntimeProcessingPipelineExecutor(RuntimeProcessingModuleRegistry moduleRegistry) {
this.moduleRegistry = moduleRegistry;
this.planResolver = new RuntimeProcessingPlanResolver(moduleRegistry);
}
public Map<String, RuntimeProcessingModuleResult> execute(
@ -30,7 +32,9 @@ public class RuntimeProcessingPipelineExecutor {
RuntimeProcessingModuleContext context = initialContext == null
? new RuntimeProcessingModuleContext(request, List.of(), Map.of(), Map.of())
: initialContext;
for (String moduleKey : moduleKeys == null ? List.<String>of() : moduleKeys) {
List<String> resolvedModuleKeys = planResolver.resolve(moduleKeys == null ? List.<String>of() : moduleKeys);
LOG.debug("Resolved runtime processing modules {} to execution order {}", moduleKeys, resolvedModuleKeys);
for (String moduleKey : resolvedModuleKeys) {
if (moduleKey == null || moduleKey.isBlank()) {
continue;
}
@ -56,7 +60,7 @@ public class RuntimeProcessingPipelineExecutor {
break;
}
}
return Map.copyOf(results);
return java.util.Collections.unmodifiableMap(new LinkedHashMap<>(results));
}
private RuntimeProcessingModuleResult withExecutionDuration(

View File

@ -0,0 +1,90 @@
package at.procon.eventhub.processing.eventprocessing.module;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* Resolves a requested runtime module list into an executable dependency order.
*
* <p>The resolver intentionally treats module dependencies as execution contracts, not as UI hints:
* every required predecessor is inserted before the requested module, unknown modules fail fast, and
* dependency cycles are rejected before any module is executed.</p>
*/
public class RuntimeProcessingPlanResolver {
private final RuntimeProcessingModuleRegistry moduleRegistry;
public RuntimeProcessingPlanResolver(RuntimeProcessingModuleRegistry moduleRegistry) {
this.moduleRegistry = moduleRegistry;
}
public List<String> resolve(List<String> requestedModuleKeys) {
LinkedHashSet<String> requested = normalize(requestedModuleKeys);
LinkedHashSet<String> resolved = new LinkedHashSet<>();
LinkedHashSet<String> visiting = new LinkedHashSet<>();
ArrayDeque<String> path = new ArrayDeque<>();
for (String moduleKey : requested) {
visit(moduleKey, resolved, visiting, path);
}
return List.copyOf(resolved);
}
private void visit(
String moduleKey,
LinkedHashSet<String> resolved,
LinkedHashSet<String> visiting,
ArrayDeque<String> path
) {
if (resolved.contains(moduleKey)) {
return;
}
RuntimeProcessingModule module;
try {
module = moduleRegistry.require(moduleKey);
} catch (IllegalArgumentException ex) {
throw new RuntimeProcessingDependencyException("Unknown runtime processing module requested or required: " + moduleKey, ex);
}
if (!visiting.add(moduleKey)) {
throw new RuntimeProcessingDependencyCycleException("Runtime processing module dependency cycle detected: "
+ cycleDescription(path, moduleKey));
}
path.addLast(moduleKey);
for (String requiredModuleKey : module.requiredModuleKeys()) {
visit(requiredModuleKey, resolved, visiting, path);
}
path.removeLast();
visiting.remove(moduleKey);
resolved.add(moduleKey);
}
private LinkedHashSet<String> normalize(List<String> moduleKeys) {
LinkedHashSet<String> normalized = new LinkedHashSet<>();
if (moduleKeys != null) {
for (String moduleKey : moduleKeys) {
if (moduleKey != null && !moduleKey.isBlank()) {
normalized.add(moduleKey.trim());
}
}
}
return normalized;
}
private String cycleDescription(ArrayDeque<String> path, String repeatedModuleKey) {
List<String> cycle = new ArrayList<>();
boolean inCycle = false;
for (String element : path) {
if (element.equals(repeatedModuleKey)) {
inCycle = true;
}
if (inCycle) {
cycle.add(element);
}
}
cycle.add(repeatedModuleKey);
return String.join(" -> ", cycle);
}
}

View File

@ -59,6 +59,11 @@ public class SupportEvidenceNormalizationModule implements RuntimeProcessingModu
"Support evidence normalization",
"Normalizes mixed-source support evidence into typed per-driver working-time inputs for the common processing core.",
"JAVA",
Set.of(
DriverWorkingTimeModuleKeys.VEHICLE_EVIDENCE_ATTACHMENT,
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS
),
Set.of("Map<String, DriverWorkingTimeDriverPartition>", "DriverActivityIntervalEvent"),
Set.of("Map<String, DriverWorkingTimePreparedInput>")
);
}

View File

@ -57,6 +57,12 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
"Vehicle evidence attachment",
"Partitions the broad runtime scope by driver and attaches vehicle-only evidence using merged vehicle-usage intervals from the common pipeline.",
"JAVA",
Set.of(
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING,
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE
),
Set.of("UnifiedRuntimeEventBundle", "RuntimeMixedEventBundle.driverPartitionEvents", "DriverWorkingTimeVehicleUsageInterval"),
Set.of("Map<String, DriverWorkingTimeDriverPartition>")
);
}

View File

@ -38,6 +38,8 @@ public class VehicleUsageReconciliationModule implements RuntimeProcessingModule
"Vehicle usage reconciliation",
"Normalizes CARD_VEHICLES_USED technical midnight splits, then reconciles normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary for effective vehicle usage; CARD_VEHICLES_USED remains fallback or corroborating evidence.",
"JAVA",
Set.of(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS),
Set.of("DriverVehicleUsageIntervalEvent"),
Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent")
);
}

View File

@ -118,6 +118,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Runtime event assembly",
"Loads and merges canonical runtime events from the selected source scope before plan-specific processing.",
"JAVA",
Set.of(),
Set.of("runtime source selection"),
Set.of("UnifiedRuntimeEventBundle")
));
descriptors.add(new RuntimeProcessingModuleDescriptorDto(
@ -125,6 +127,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Event evidence mixing",
"Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged.",
"JAVA",
Set.of(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY),
Set.of("UnifiedRuntimeEventBundle"),
Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto")
));
}
@ -134,6 +138,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Event to activity intervals",
"Pairs canonical DRIVER_ACTIVITY START/END point events into source-neutral activity intervals using a first-class EPL module.",
"ESPER",
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
Set.of("RuntimeMixedEventBundle.activityTimelineEvents"),
Set.of("DriverActivityIntervalEvent")
),
new RuntimeProcessingModuleDescriptorDto(
@ -141,6 +147,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Event to vehicle usage intervals",
"Pairs canonical driver/card INSERT/WITHDRAW point events into source-neutral vehicle-usage intervals using a first-class EPL module.",
"ESPER",
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
Set.of("RuntimeMixedEventBundle.vehicleUsageEvents"),
Set.of("DriverVehicleUsageIntervalEvent")
),
new RuntimeProcessingModuleDescriptorDto(
@ -148,6 +156,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Vehicle usage reconciliation",
"Coalesces CARD_VEHICLES_USED technical midnight splits before reconciling normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary; CARD_VEHICLES_USED is fallback or corroborating evidence.",
"JAVA",
Set.of(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS),
Set.of("DriverVehicleUsageIntervalEvent"),
Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent")
),
new RuntimeProcessingModuleDescriptorDto(
@ -155,6 +165,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Vehicle usage merge",
"Merges adjacent effective same-driver/same-vehicle usage intervals after vehicle-usage source reconciliation.",
"JAVA",
Set.of(DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION),
Set.of("RuntimeVehicleUsageReconciliationResult.effectiveVehicleUsageIntervals"),
Set.of("DriverVehicleUsageIntervalEvent")
),
new RuntimeProcessingModuleDescriptorDto(
@ -162,6 +174,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Vehicle evidence attachment",
"Attaches vehicle-only events to driver partitions when they overlap driver vehicle-usage intervals.",
"JAVA",
Set.of(
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING,
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE
),
Set.of("UnifiedRuntimeEventBundle", "RuntimeMixedEventBundle.driverPartitionEvents", "DriverWorkingTimeVehicleUsageInterval"),
Set.of("RuntimeDriverPartitionDebugDto")
),
new RuntimeProcessingModuleDescriptorDto(
@ -169,6 +187,11 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Support evidence normalization",
"Normalizes attached mixed-source support evidence for driver working-time processing.",
"JAVA",
Set.of(
DriverWorkingTimeModuleKeys.VEHICLE_EVIDENCE_ATTACHMENT,
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS
),
Set.of("Map<String, DriverWorkingTimeDriverPartition>", "DriverActivityIntervalEvent"),
Set.of("RuntimeSupportEvidenceNormalizationDebugDto")
),
new RuntimeProcessingModuleDescriptorDto(
@ -176,6 +199,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Driving-derived projections",
"Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates.",
"ESPER+JAVA",
Set.of(
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS,
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE,
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION
),
Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map<String, DriverWorkingTimePreparedInput>"),
Set.of("DriverWorkingTimeProcessingResultDto")
)
));
@ -215,22 +244,22 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
boolean includeExecutionModuleResults = booleanParameter(
request.parameters(),
INCLUDE_EXECUTION_MODULE_RESULTS_PARAMETER,
false
true
);
boolean includePartitionMetadata = booleanParameter(
request.parameters(),
INCLUDE_PARTITION_METADATA_PARAMETER,
false
true
);
boolean includePartitionModuleResults = booleanParameter(
request.parameters(),
INCLUDE_PARTITION_MODULE_RESULTS_PARAMETER,
false
true
);
boolean includeSupportEvidenceNormalization = booleanParameter(
request.parameters(),
INCLUDE_SUPPORT_EVIDENCE_NORMALIZATION_PARAMETER,
false
true
);
int vehicleEvidencePaddingMinutes = resolveVehicleEvidencePaddingMinutes(
request.sourceSelection(),
@ -255,12 +284,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
Map.of()
);
List<String> executedModules = requestedOrDefaultModules(request.modules());
List<String> requestedModules = requestedOrDefaultModules(request.modules());
Map<String, RuntimeProcessingModuleResult> moduleResults = pipelineExecutor.execute(
request,
executedModules,
requestedModules,
initialContext
);
List<String> executedModules = List.copyOf(moduleResults.keySet());
UnifiedRuntimeDriverWorkingTimeScopeResultDto workingTimeResult = extractWorkingTimeResult(moduleResults);
Map<String, RuntimeEventProcessingPartitionResultDto> partitionResults = new LinkedHashMap<>();
@ -527,6 +557,9 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
}
private List<String> requestedOrDefaultModules(List<String> requestedModules) {
if (!includeRuntimeEventAssemblyModule) {
return List.of(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS);
}
if (requestedModules != null && !requestedModules.isEmpty()) {
LinkedHashMap<String, String> requested = new LinkedHashMap<>();
for (String module : requestedModules) {

View File

@ -7,7 +7,11 @@ public record RuntimeProcessingModuleDescriptorDto(
String displayName,
String description,
String engine,
Set<String> producedStreams
Set<String> requiredModuleKeys,
Set<String> optionalModuleKeys,
Set<String> consumedStreams,
Set<String> producedStreams,
String version
) {
public RuntimeProcessingModuleDescriptorDto {
if (moduleKey == null || moduleKey.isBlank()) {
@ -17,6 +21,42 @@ public record RuntimeProcessingModuleDescriptorDto(
displayName = displayName == null || displayName.isBlank() ? moduleKey : displayName.trim();
description = description == null ? "" : description;
engine = engine == null || engine.isBlank() ? "UNKNOWN" : engine.trim();
requiredModuleKeys = normalizeKeys(requiredModuleKeys);
optionalModuleKeys = normalizeKeys(optionalModuleKeys);
consumedStreams = consumedStreams == null ? Set.of() : Set.copyOf(consumedStreams);
producedStreams = producedStreams == null ? Set.of() : Set.copyOf(producedStreams);
version = version == null || version.isBlank() ? "1" : version.trim();
}
public RuntimeProcessingModuleDescriptorDto(
String moduleKey,
String displayName,
String description,
String engine,
Set<String> producedStreams
) {
this(moduleKey, displayName, description, engine, Set.of(), Set.of(), Set.of(), producedStreams, "1");
}
public RuntimeProcessingModuleDescriptorDto(
String moduleKey,
String displayName,
String description,
String engine,
Set<String> requiredModuleKeys,
Set<String> consumedStreams,
Set<String> producedStreams
) {
this(moduleKey, displayName, description, engine, requiredModuleKeys, Set.of(), consumedStreams, producedStreams, "1");
}
private static Set<String> normalizeKeys(Set<String> keys) {
if (keys == null || keys.isEmpty()) {
return Set.of();
}
return keys.stream()
.filter(key -> key != null && !key.isBlank())
.map(String::trim)
.collect(java.util.stream.Collectors.toUnmodifiableSet());
}
}

View File

@ -1,6 +1,7 @@
package at.procon.eventhub.processing.eventprocessing.module;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
@ -20,7 +21,57 @@ class RuntimeProcessingPipelineExecutorTest {
);
Map<String, RuntimeProcessingModuleResult> results = executor.execute(
new RuntimeProcessingExecutionApiRequest(
request(),
List.of("first", "second"),
null
);
assertThat(results).containsKeys("first", "second");
assertThat(results.get("first").metadata()).containsEntry("marker", "first");
assertThat(results.get("first").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY);
assertThat(results.get("second").metadata()).containsEntry("marker", "second");
assertThat(results.get("second").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY);
assertThat(((Number) results.get("first").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())
.isGreaterThanOrEqualTo(0L);
assertThat(((Number) results.get("second").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())
.isGreaterThanOrEqualTo(0L);
}
@Test
void insertsRequiredModulesBeforeRequestedModule() {
RuntimeProcessingPipelineExecutor executor = new RuntimeProcessingPipelineExecutor(
new RuntimeProcessingModuleRegistry(List.of(
new TestModule("assembly"),
new TestModule("mixing", "assembly"),
new TestModule("final", "mixing")
))
);
Map<String, RuntimeProcessingModuleResult> results = executor.execute(
request(),
List.of("final"),
null
);
assertThat(results.keySet()).containsExactly("assembly", "mixing", "final");
}
@Test
void rejectsDependencyCyclesBeforeExecution() {
RuntimeProcessingPipelineExecutor executor = new RuntimeProcessingPipelineExecutor(
new RuntimeProcessingModuleRegistry(List.of(
new TestModule("a", "b"),
new TestModule("b", "a")
))
);
assertThatThrownBy(() -> executor.execute(request(), List.of("a"), null))
.isInstanceOf(RuntimeProcessingDependencyCycleException.class)
.hasMessageContaining("a -> b -> a");
}
private RuntimeProcessingExecutionApiRequest request() {
return new RuntimeProcessingExecutionApiRequest(
"driver-working-time-v1",
new at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest(
java.util.UUID.randomUUID(),
@ -62,28 +113,18 @@ class RuntimeProcessingPipelineExecutorTest {
),
List.of(),
Map.of()
),
List.of("first", "second"),
null
);
assertThat(results).containsKeys("first", "second");
assertThat(results.get("first").metadata()).containsEntry("marker", "first");
assertThat(results.get("first").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY);
assertThat(results.get("second").metadata()).containsEntry("marker", "second");
assertThat(results.get("second").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY);
assertThat(((Number) results.get("first").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())
.isGreaterThanOrEqualTo(0L);
assertThat(((Number) results.get("second").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())
.isGreaterThanOrEqualTo(0L);
}
private static final class TestModule implements RuntimeProcessingModule {
private final String key;
private final java.util.Set<String> requiredKeys;
private TestModule(String key) {
private TestModule(String key, String... requiredKeys) {
this.key = key;
this.requiredKeys = java.util.Arrays.stream(requiredKeys)
.collect(java.util.stream.Collectors.toUnmodifiableSet());
}
@Override
@ -98,6 +139,8 @@ class RuntimeProcessingPipelineExecutorTest {
key,
"test",
"JAVA",
requiredKeys,
java.util.Set.of(),
java.util.Set.of()
);
}