Compare commits

...

2 Commits

Author SHA1 Message Date
trifonovt c066c5c777 Add runtime processing dependency resolution 2026-06-15 10:58:36 +02:00
trifonovt e78d7d6ea8 Adjust runtime mixing and vehicle unit extraction 2026-06-12 16:34:37 +02:00
21 changed files with 648 additions and 64 deletions

View File

@ -143,6 +143,8 @@ public class RuntimeEventDescriptorFactory {
case POSITION -> prefix + "_POSITION"; case POSITION -> prefix + "_POSITION";
case PLACE -> prefix + "_PLACE"; case PLACE -> prefix + "_PLACE";
case BORDER_CROSSING -> prefix + "_BORDER_CROSSING"; case BORDER_CROSSING -> prefix + "_BORDER_CROSSING";
case LOAD_UNLOAD -> prefix + "_LOAD_UNLOAD";
case SPECIFIC_CONDITION -> prefix + "_SPECIFIC_CONDITION";
case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null; case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null;
default -> null; default -> null;
}; };
@ -203,7 +205,7 @@ public class RuntimeEventDescriptorFactory {
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)), nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()), nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()),
nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()), nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()),
nullToEmpty(event == null || event.lifecycle() == null ? null : event.lifecycle().name()), nullToEmpty(semanticSupportLifecycle(event)),
normalizeTime(event == null ? null : event.occurredAt()), normalizeTime(event == null ? null : event.occurredAt()),
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)), nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
nullToEmpty(firstNonBlank(text(raw, "latitude"), position == null || position.latitude() == null ? null : position.latitude().toPlainString())), nullToEmpty(firstNonBlank(text(raw, "latitude"), position == null || position.latitude() == null ? null : position.latitude().toPlainString())),
@ -217,6 +219,17 @@ public class RuntimeEventDescriptorFactory {
); );
} }
private String semanticSupportLifecycle(EventHubEventDto event) {
if (event == null || event.lifecycle() == null) {
return null;
}
if (event.eventDomain() == EventDomain.PLACE
&& (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.BEGIN)) {
return EventLifecycle.BEGIN.name();
}
return event.lifecycle().name();
}
private JsonNode rawPayload(EventHubEventDto event) { private JsonNode rawPayload(EventHubEventDto event) {
return RuntimeEntityReferenceResolver.rawPayload(event); return RuntimeEntityReferenceResolver.rawPayload(event);
} }

View File

@ -10,6 +10,53 @@ import org.springframework.stereotype.Component;
@Component @Component
public class RuntimeEventMixingRuleRegistry { public class RuntimeEventMixingRuleRegistry {
private static final Set<EventDomain> SUPPORT_EVENT_DOMAINS = Set.of(
EventDomain.POSITION,
EventDomain.PLACE,
EventDomain.BORDER_CROSSING,
EventDomain.LOAD_UNLOAD,
EventDomain.SPECIFIC_CONDITION
);
private static final Set<EventType> SUPPORT_EVENT_TYPES = Set.of(
EventType.POSITION_RECORDED,
EventType.WORKING_DAY_PLACE_RECORDED,
EventType.BORDER_INBOUND,
EventType.BORDER_OUTBOUND,
EventType.BORDER_OUT_EU,
EventType.LOAD,
EventType.UNLOAD,
EventType.LOAD_UNLOAD,
EventType.OUT,
EventType.FERRY_TRAIN
);
private static final Set<EventLifecycle> SUPPORT_EVENT_LIFECYCLES = Set.of(
EventLifecycle.SNAPSHOT,
EventLifecycle.START,
EventLifecycle.BEGIN,
EventLifecycle.END,
EventLifecycle.INBOUND,
EventLifecycle.OUTBOUND,
EventLifecycle.OUT_EU
);
private static final Set<String> CARD_SUPPORT_EXTRACTION_CODES = Set.of(
"CARD_POSITION",
"CARD_PLACE",
"CARD_BORDER_CROSSING",
"CARD_LOAD_UNLOAD",
"CARD_SPECIFIC_CONDITION"
);
private static final Set<String> VU_SUPPORT_EXTRACTION_CODES = Set.of(
"VU_POSITION",
"VU_PLACE",
"VU_BORDER_CROSSING",
"VU_LOAD_UNLOAD",
"VU_SPECIFIC_CONDITION"
);
public List<RuntimeEventMixingRule> rulesForMode(String mode) { public List<RuntimeEventMixingRule> rulesForMode(String mode) {
if (RuntimeEventMixingService.MODE_OFF.equals(mode)) { if (RuntimeEventMixingService.MODE_OFF.equals(mode)) {
return List.of(); return List.of();
@ -61,12 +108,11 @@ public class RuntimeEventMixingRuleRegistry {
RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY, RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY,
RuntimeEventMixingChannel.SUPPORT_EVIDENCE, RuntimeEventMixingChannel.SUPPORT_EVIDENCE,
RuntimeEventMixingRule.EQUIVALENCE_EXACT_EVENT_KEY, RuntimeEventMixingRule.EQUIVALENCE_EXACT_EVENT_KEY,
Set.of(EventDomain.POSITION, EventDomain.PLACE, EventDomain.BORDER_CROSSING), SUPPORT_EVENT_DOMAINS,
Set.of(EventType.POSITION_RECORDED, EventType.WORKING_DAY_PLACE_RECORDED, SUPPORT_EVENT_TYPES,
EventType.BORDER_INBOUND, EventType.BORDER_OUTBOUND, EventType.BORDER_OUT_EU), SUPPORT_EVENT_LIFECYCLES,
Set.of(EventLifecycle.SNAPSHOT, EventLifecycle.INBOUND, EventLifecycle.OUTBOUND, EventLifecycle.OUT_EU), CARD_SUPPORT_EXTRACTION_CODES,
Set.of("CARD_POSITION", "CARD_PLACE", "CARD_BORDER_CROSSING"), VU_SUPPORT_EXTRACTION_CODES,
Set.of("VU_POSITION", "VU_PLACE", "VU_BORDER_CROSSING"),
RuntimeResolvedEventRole.FUSED_PRIMARY, RuntimeResolvedEventRole.FUSED_PRIMARY,
RuntimeResolvedEventRole.SUPPRESSED_DUPLICATE, RuntimeResolvedEventRole.SUPPRESSED_DUPLICATE,
"FUSED_PRIMARY_SELECTED", "FUSED_PRIMARY_SELECTED",
@ -79,12 +125,11 @@ public class RuntimeEventMixingRuleRegistry {
RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY, RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY,
RuntimeEventMixingChannel.SUPPORT_EVIDENCE, RuntimeEventMixingChannel.SUPPORT_EVIDENCE,
RuntimeEventMixingRule.EQUIVALENCE_COMPATIBLE_SUPPORT_KEY, RuntimeEventMixingRule.EQUIVALENCE_COMPATIBLE_SUPPORT_KEY,
Set.of(EventDomain.POSITION, EventDomain.PLACE, EventDomain.BORDER_CROSSING), SUPPORT_EVENT_DOMAINS,
Set.of(EventType.POSITION_RECORDED, EventType.WORKING_DAY_PLACE_RECORDED, SUPPORT_EVENT_TYPES,
EventType.BORDER_INBOUND, EventType.BORDER_OUTBOUND, EventType.BORDER_OUT_EU), SUPPORT_EVENT_LIFECYCLES,
Set.of(EventLifecycle.SNAPSHOT, EventLifecycle.INBOUND, EventLifecycle.OUTBOUND, EventLifecycle.OUT_EU), CARD_SUPPORT_EXTRACTION_CODES,
Set.of("CARD_POSITION", "CARD_PLACE", "CARD_BORDER_CROSSING"), VU_SUPPORT_EXTRACTION_CODES,
Set.of("VU_POSITION", "VU_PLACE", "VU_BORDER_CROSSING"),
RuntimeResolvedEventRole.FUSED_PRIMARY, RuntimeResolvedEventRole.FUSED_PRIMARY,
RuntimeResolvedEventRole.SUPPRESSED_DUPLICATE, RuntimeResolvedEventRole.SUPPRESSED_DUPLICATE,
"FUSED_PRIMARY_SELECTED", "FUSED_PRIMARY_SELECTED",

View File

@ -46,6 +46,8 @@ public class DriverActivityIntervalsModule implements RuntimeEplModule {
"Event to activity intervals", "Event to activity intervals",
"EPL module that pairs canonical DRIVER_ACTIVITY START/END point events into source-neutral driver activity intervals.", "EPL module that pairs canonical DRIVER_ACTIVITY START/END point events into source-neutral driver activity intervals.",
engine(), engine(),
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
Set.of("RuntimeMixedEventBundle.activityTimelineEvents"),
Set.of("DriverActivityIntervalEvent") Set.of("DriverActivityIntervalEvent")
); );
} }

View File

@ -46,6 +46,8 @@ public class DriverVehicleUsageIntervalsModule implements RuntimeEplModule {
"Event to vehicle usage intervals", "Event to vehicle usage intervals",
"EPL module that pairs canonical driver/card INSERT/WITHDRAW point events into source-neutral driver vehicle-usage intervals.", "EPL module that pairs canonical driver/card INSERT/WITHDRAW point events into source-neutral driver vehicle-usage intervals.",
engine(), engine(),
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
Set.of("RuntimeMixedEventBundle.vehicleUsageEvents"),
Set.of("DriverVehicleUsageIntervalEvent") Set.of("DriverVehicleUsageIntervalEvent")
); );
} }

View File

@ -29,6 +29,8 @@ public class DriverVehicleUsageMergeModule implements RuntimeProcessingModule {
"Vehicle usage merge", "Vehicle usage merge",
"Merges adjacent or continuous effective same-driver/same-vehicle usage intervals after source reconciliation.", "Merges adjacent or continuous effective same-driver/same-vehicle usage intervals after source reconciliation.",
"JAVA", "JAVA",
Set.of(DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION),
Set.of("RuntimeVehicleUsageReconciliationResult.effectiveVehicleUsageIntervals"),
Set.of("DriverWorkingTimeVehicleUsageInterval") Set.of("DriverWorkingTimeVehicleUsageInterval")
); );
} }

View File

@ -53,11 +53,26 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
@Override @Override
public RuntimeProcessingModuleDescriptorDto descriptor() { public RuntimeProcessingModuleDescriptorDto descriptor() {
if (legacyScopeProcessingService != null) {
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Driving-derived projections",
"Executes the legacy driver working-time scope adapter. This compatibility mode performs upstream assembly internally.",
"ESPER+JAVA",
Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto")
);
}
return new RuntimeProcessingModuleDescriptorDto( return new RuntimeProcessingModuleDescriptorDto(
moduleKey(), moduleKey(),
"Driving-derived projections", "Driving-derived projections",
"Executes the shared driver working-time core from typed per-driver module outputs for driving interruptions, rest candidates, card-absence coverage, overnight candidates, and trip candidates.", "Executes the shared driver working-time core from typed per-driver module outputs for driving interruptions, rest candidates, card-absence coverage, overnight candidates, and trip candidates.",
"ESPER+JAVA", "ESPER+JAVA",
Set.of(
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS,
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE,
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION
),
Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map<String, DriverWorkingTimePreparedInput>"),
Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto") Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto")
); );
} }

View File

@ -41,6 +41,8 @@ public class EventEvidenceMixingModule implements RuntimeProcessingModule {
"Event evidence mixing", "Event evidence mixing",
"Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged for vehicle-usage processing.", "Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged for vehicle-usage processing.",
"JAVA", "JAVA",
Set.of(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY),
Set.of("UnifiedRuntimeEventBundle"),
Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto") Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto")
); );
} }

View File

@ -30,6 +30,8 @@ public class RuntimeEventAssemblyModule implements RuntimeProcessingModule {
"Runtime event assembly", "Runtime event assembly",
"Loads and merges canonical runtime events from the selected source scope before plan-specific processing.", "Loads and merges canonical runtime events from the selected source scope before plan-specific processing.",
"JAVA", "JAVA",
Set.of(),
Set.of("runtime source selection"),
Set.of("UnifiedRuntimeEventBundle") Set.of("UnifiedRuntimeEventBundle")
); );
} }

View File

@ -0,0 +1,8 @@
package at.procon.eventhub.processing.eventprocessing.module;
public class RuntimeProcessingDependencyCycleException extends RuntimeProcessingDependencyException {
public RuntimeProcessingDependencyCycleException(String message) {
super(message);
}
}

View File

@ -0,0 +1,12 @@
package at.procon.eventhub.processing.eventprocessing.module;
public class RuntimeProcessingDependencyException extends IllegalArgumentException {
public RuntimeProcessingDependencyException(String message) {
super(message);
}
public RuntimeProcessingDependencyException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -16,5 +16,25 @@ public interface RuntimeProcessingModule {
RuntimeProcessingModuleDescriptorDto descriptor(); RuntimeProcessingModuleDescriptorDto descriptor();
default java.util.Set<String> requiredModuleKeys() {
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
return descriptor == null ? java.util.Set.of() : descriptor.requiredModuleKeys();
}
default java.util.Set<String> optionalModuleKeys() {
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
return descriptor == null ? java.util.Set.of() : descriptor.optionalModuleKeys();
}
default java.util.Set<String> consumedStreams() {
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
return descriptor == null ? java.util.Set.of() : descriptor.consumedStreams();
}
default java.util.Set<String> producedStreams() {
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
return descriptor == null ? java.util.Set.of() : descriptor.producedStreams();
}
RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context); RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context);
} }

View File

@ -18,4 +18,27 @@ public record RuntimeProcessingModuleContext(
attributes = attributes == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(attributes)); attributes = attributes == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(attributes));
previousResults = previousResults == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(previousResults)); previousResults = previousResults == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(previousResults));
} }
public RuntimeProcessingModuleResult requireResult(String moduleKey) {
RuntimeProcessingModuleResult result = previousResults.get(moduleKey);
if (result == null) {
throw new RuntimeProcessingDependencyException("Required runtime processing module result is missing: " + moduleKey);
}
if (result.status() == RuntimeProcessingModuleStatus.FAILED) {
throw new RuntimeProcessingDependencyException("Required runtime processing module failed: " + moduleKey);
}
return result;
}
public <T> T requireOutput(String moduleKey, Class<T> outputType) {
RuntimeProcessingModuleResult result = requireResult(moduleKey);
Object output = result.output();
if (outputType.isInstance(output)) {
return outputType.cast(output);
}
throw new RuntimeProcessingDependencyException("Required runtime processing module " + moduleKey
+ " did not produce " + outputType.getName() + ". Actual: "
+ (output == null ? "null" : output.getClass().getName()));
}
} }

View File

@ -16,9 +16,11 @@ public class RuntimeProcessingPipelineExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RuntimeProcessingPipelineExecutor.class); private static final Logger LOG = LoggerFactory.getLogger(RuntimeProcessingPipelineExecutor.class);
private final RuntimeProcessingModuleRegistry moduleRegistry; private final RuntimeProcessingModuleRegistry moduleRegistry;
private final RuntimeProcessingPlanResolver planResolver;
public RuntimeProcessingPipelineExecutor(RuntimeProcessingModuleRegistry moduleRegistry) { public RuntimeProcessingPipelineExecutor(RuntimeProcessingModuleRegistry moduleRegistry) {
this.moduleRegistry = moduleRegistry; this.moduleRegistry = moduleRegistry;
this.planResolver = new RuntimeProcessingPlanResolver(moduleRegistry);
} }
public Map<String, RuntimeProcessingModuleResult> execute( public Map<String, RuntimeProcessingModuleResult> execute(
@ -30,7 +32,9 @@ public class RuntimeProcessingPipelineExecutor {
RuntimeProcessingModuleContext context = initialContext == null RuntimeProcessingModuleContext context = initialContext == null
? new RuntimeProcessingModuleContext(request, List.of(), Map.of(), Map.of()) ? new RuntimeProcessingModuleContext(request, List.of(), Map.of(), Map.of())
: initialContext; : initialContext;
for (String moduleKey : moduleKeys == null ? List.<String>of() : moduleKeys) { List<String> resolvedModuleKeys = planResolver.resolve(moduleKeys == null ? List.<String>of() : moduleKeys);
LOG.debug("Resolved runtime processing modules {} to execution order {}", moduleKeys, resolvedModuleKeys);
for (String moduleKey : resolvedModuleKeys) {
if (moduleKey == null || moduleKey.isBlank()) { if (moduleKey == null || moduleKey.isBlank()) {
continue; continue;
} }
@ -56,7 +60,7 @@ public class RuntimeProcessingPipelineExecutor {
break; break;
} }
} }
return Map.copyOf(results); return java.util.Collections.unmodifiableMap(new LinkedHashMap<>(results));
} }
private RuntimeProcessingModuleResult withExecutionDuration( private RuntimeProcessingModuleResult withExecutionDuration(

View File

@ -0,0 +1,90 @@
package at.procon.eventhub.processing.eventprocessing.module;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* Resolves a requested runtime module list into an executable dependency order.
*
* <p>The resolver intentionally treats module dependencies as execution contracts, not as UI hints:
* every required predecessor is inserted before the requested module, unknown modules fail fast, and
* dependency cycles are rejected before any module is executed.</p>
*/
public class RuntimeProcessingPlanResolver {
private final RuntimeProcessingModuleRegistry moduleRegistry;
public RuntimeProcessingPlanResolver(RuntimeProcessingModuleRegistry moduleRegistry) {
this.moduleRegistry = moduleRegistry;
}
public List<String> resolve(List<String> requestedModuleKeys) {
LinkedHashSet<String> requested = normalize(requestedModuleKeys);
LinkedHashSet<String> resolved = new LinkedHashSet<>();
LinkedHashSet<String> visiting = new LinkedHashSet<>();
ArrayDeque<String> path = new ArrayDeque<>();
for (String moduleKey : requested) {
visit(moduleKey, resolved, visiting, path);
}
return List.copyOf(resolved);
}
private void visit(
String moduleKey,
LinkedHashSet<String> resolved,
LinkedHashSet<String> visiting,
ArrayDeque<String> path
) {
if (resolved.contains(moduleKey)) {
return;
}
RuntimeProcessingModule module;
try {
module = moduleRegistry.require(moduleKey);
} catch (IllegalArgumentException ex) {
throw new RuntimeProcessingDependencyException("Unknown runtime processing module requested or required: " + moduleKey, ex);
}
if (!visiting.add(moduleKey)) {
throw new RuntimeProcessingDependencyCycleException("Runtime processing module dependency cycle detected: "
+ cycleDescription(path, moduleKey));
}
path.addLast(moduleKey);
for (String requiredModuleKey : module.requiredModuleKeys()) {
visit(requiredModuleKey, resolved, visiting, path);
}
path.removeLast();
visiting.remove(moduleKey);
resolved.add(moduleKey);
}
private LinkedHashSet<String> normalize(List<String> moduleKeys) {
LinkedHashSet<String> normalized = new LinkedHashSet<>();
if (moduleKeys != null) {
for (String moduleKey : moduleKeys) {
if (moduleKey != null && !moduleKey.isBlank()) {
normalized.add(moduleKey.trim());
}
}
}
return normalized;
}
private String cycleDescription(ArrayDeque<String> path, String repeatedModuleKey) {
List<String> cycle = new ArrayList<>();
boolean inCycle = false;
for (String element : path) {
if (element.equals(repeatedModuleKey)) {
inCycle = true;
}
if (inCycle) {
cycle.add(element);
}
}
cycle.add(repeatedModuleKey);
return String.join(" -> ", cycle);
}
}

View File

@ -59,6 +59,11 @@ public class SupportEvidenceNormalizationModule implements RuntimeProcessingModu
"Support evidence normalization", "Support evidence normalization",
"Normalizes mixed-source support evidence into typed per-driver working-time inputs for the common processing core.", "Normalizes mixed-source support evidence into typed per-driver working-time inputs for the common processing core.",
"JAVA", "JAVA",
Set.of(
DriverWorkingTimeModuleKeys.VEHICLE_EVIDENCE_ATTACHMENT,
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS
),
Set.of("Map<String, DriverWorkingTimeDriverPartition>", "DriverActivityIntervalEvent"),
Set.of("Map<String, DriverWorkingTimePreparedInput>") Set.of("Map<String, DriverWorkingTimePreparedInput>")
); );
} }

View File

@ -57,6 +57,12 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
"Vehicle evidence attachment", "Vehicle evidence attachment",
"Partitions the broad runtime scope by driver and attaches vehicle-only evidence using merged vehicle-usage intervals from the common pipeline.", "Partitions the broad runtime scope by driver and attaches vehicle-only evidence using merged vehicle-usage intervals from the common pipeline.",
"JAVA", "JAVA",
Set.of(
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING,
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE
),
Set.of("UnifiedRuntimeEventBundle", "RuntimeMixedEventBundle.driverPartitionEvents", "DriverWorkingTimeVehicleUsageInterval"),
Set.of("Map<String, DriverWorkingTimeDriverPartition>") Set.of("Map<String, DriverWorkingTimeDriverPartition>")
); );
} }

View File

@ -38,6 +38,8 @@ public class VehicleUsageReconciliationModule implements RuntimeProcessingModule
"Vehicle usage reconciliation", "Vehicle usage reconciliation",
"Normalizes CARD_VEHICLES_USED technical midnight splits, then reconciles normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary for effective vehicle usage; CARD_VEHICLES_USED remains fallback or corroborating evidence.", "Normalizes CARD_VEHICLES_USED technical midnight splits, then reconciles normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary for effective vehicle usage; CARD_VEHICLES_USED remains fallback or corroborating evidence.",
"JAVA", "JAVA",
Set.of(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS),
Set.of("DriverVehicleUsageIntervalEvent"),
Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent") Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent")
); );
} }

View File

@ -118,6 +118,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Runtime event assembly", "Runtime event assembly",
"Loads and merges canonical runtime events from the selected source scope before plan-specific processing.", "Loads and merges canonical runtime events from the selected source scope before plan-specific processing.",
"JAVA", "JAVA",
Set.of(),
Set.of("runtime source selection"),
Set.of("UnifiedRuntimeEventBundle") Set.of("UnifiedRuntimeEventBundle")
)); ));
descriptors.add(new RuntimeProcessingModuleDescriptorDto( descriptors.add(new RuntimeProcessingModuleDescriptorDto(
@ -125,6 +127,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Event evidence mixing", "Event evidence mixing",
"Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged.", "Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged.",
"JAVA", "JAVA",
Set.of(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY),
Set.of("UnifiedRuntimeEventBundle"),
Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto") Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto")
)); ));
} }
@ -134,6 +138,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Event to activity intervals", "Event to activity intervals",
"Pairs canonical DRIVER_ACTIVITY START/END point events into source-neutral activity intervals using a first-class EPL module.", "Pairs canonical DRIVER_ACTIVITY START/END point events into source-neutral activity intervals using a first-class EPL module.",
"ESPER", "ESPER",
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
Set.of("RuntimeMixedEventBundle.activityTimelineEvents"),
Set.of("DriverActivityIntervalEvent") Set.of("DriverActivityIntervalEvent")
), ),
new RuntimeProcessingModuleDescriptorDto( new RuntimeProcessingModuleDescriptorDto(
@ -141,6 +147,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Event to vehicle usage intervals", "Event to vehicle usage intervals",
"Pairs canonical driver/card INSERT/WITHDRAW point events into source-neutral vehicle-usage intervals using a first-class EPL module.", "Pairs canonical driver/card INSERT/WITHDRAW point events into source-neutral vehicle-usage intervals using a first-class EPL module.",
"ESPER", "ESPER",
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
Set.of("RuntimeMixedEventBundle.vehicleUsageEvents"),
Set.of("DriverVehicleUsageIntervalEvent") Set.of("DriverVehicleUsageIntervalEvent")
), ),
new RuntimeProcessingModuleDescriptorDto( new RuntimeProcessingModuleDescriptorDto(
@ -148,6 +156,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Vehicle usage reconciliation", "Vehicle usage reconciliation",
"Coalesces CARD_VEHICLES_USED technical midnight splits before reconciling normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary; CARD_VEHICLES_USED is fallback or corroborating evidence.", "Coalesces CARD_VEHICLES_USED technical midnight splits before reconciling normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary; CARD_VEHICLES_USED is fallback or corroborating evidence.",
"JAVA", "JAVA",
Set.of(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS),
Set.of("DriverVehicleUsageIntervalEvent"),
Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent") Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent")
), ),
new RuntimeProcessingModuleDescriptorDto( new RuntimeProcessingModuleDescriptorDto(
@ -155,6 +165,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Vehicle usage merge", "Vehicle usage merge",
"Merges adjacent effective same-driver/same-vehicle usage intervals after vehicle-usage source reconciliation.", "Merges adjacent effective same-driver/same-vehicle usage intervals after vehicle-usage source reconciliation.",
"JAVA", "JAVA",
Set.of(DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION),
Set.of("RuntimeVehicleUsageReconciliationResult.effectiveVehicleUsageIntervals"),
Set.of("DriverVehicleUsageIntervalEvent") Set.of("DriverVehicleUsageIntervalEvent")
), ),
new RuntimeProcessingModuleDescriptorDto( new RuntimeProcessingModuleDescriptorDto(
@ -162,6 +174,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Vehicle evidence attachment", "Vehicle evidence attachment",
"Attaches vehicle-only events to driver partitions when they overlap driver vehicle-usage intervals.", "Attaches vehicle-only events to driver partitions when they overlap driver vehicle-usage intervals.",
"JAVA", "JAVA",
Set.of(
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING,
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE
),
Set.of("UnifiedRuntimeEventBundle", "RuntimeMixedEventBundle.driverPartitionEvents", "DriverWorkingTimeVehicleUsageInterval"),
Set.of("RuntimeDriverPartitionDebugDto") Set.of("RuntimeDriverPartitionDebugDto")
), ),
new RuntimeProcessingModuleDescriptorDto( new RuntimeProcessingModuleDescriptorDto(
@ -169,6 +187,11 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Support evidence normalization", "Support evidence normalization",
"Normalizes attached mixed-source support evidence for driver working-time processing.", "Normalizes attached mixed-source support evidence for driver working-time processing.",
"JAVA", "JAVA",
Set.of(
DriverWorkingTimeModuleKeys.VEHICLE_EVIDENCE_ATTACHMENT,
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS
),
Set.of("Map<String, DriverWorkingTimeDriverPartition>", "DriverActivityIntervalEvent"),
Set.of("RuntimeSupportEvidenceNormalizationDebugDto") Set.of("RuntimeSupportEvidenceNormalizationDebugDto")
), ),
new RuntimeProcessingModuleDescriptorDto( new RuntimeProcessingModuleDescriptorDto(
@ -176,6 +199,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"Driving-derived projections", "Driving-derived projections",
"Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates.", "Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates.",
"ESPER+JAVA", "ESPER+JAVA",
Set.of(
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS,
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE,
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION
),
Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map<String, DriverWorkingTimePreparedInput>"),
Set.of("DriverWorkingTimeProcessingResultDto") Set.of("DriverWorkingTimeProcessingResultDto")
) )
)); ));
@ -255,12 +284,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
Map.of() Map.of()
); );
List<String> executedModules = requestedOrDefaultModules(request.modules()); List<String> requestedModules = requestedOrDefaultModules(request.modules());
Map<String, RuntimeProcessingModuleResult> moduleResults = pipelineExecutor.execute( Map<String, RuntimeProcessingModuleResult> moduleResults = pipelineExecutor.execute(
request, request,
executedModules, requestedModules,
initialContext initialContext
); );
List<String> executedModules = List.copyOf(moduleResults.keySet());
UnifiedRuntimeDriverWorkingTimeScopeResultDto workingTimeResult = extractWorkingTimeResult(moduleResults); UnifiedRuntimeDriverWorkingTimeScopeResultDto workingTimeResult = extractWorkingTimeResult(moduleResults);
Map<String, RuntimeEventProcessingPartitionResultDto> partitionResults = new LinkedHashMap<>(); Map<String, RuntimeEventProcessingPartitionResultDto> partitionResults = new LinkedHashMap<>();
@ -527,6 +557,9 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
} }
private List<String> requestedOrDefaultModules(List<String> requestedModules) { private List<String> requestedOrDefaultModules(List<String> requestedModules) {
if (!includeRuntimeEventAssemblyModule) {
return List.of(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS);
}
if (requestedModules != null && !requestedModules.isEmpty()) { if (requestedModules != null && !requestedModules.isEmpty()) {
LinkedHashMap<String, String> requested = new LinkedHashMap<>(); LinkedHashMap<String, String> requested = new LinkedHashMap<>();
for (String module : requestedModules) { for (String module : requestedModules) {

View File

@ -7,7 +7,11 @@ public record RuntimeProcessingModuleDescriptorDto(
String displayName, String displayName,
String description, String description,
String engine, String engine,
Set<String> producedStreams Set<String> requiredModuleKeys,
Set<String> optionalModuleKeys,
Set<String> consumedStreams,
Set<String> producedStreams,
String version
) { ) {
public RuntimeProcessingModuleDescriptorDto { public RuntimeProcessingModuleDescriptorDto {
if (moduleKey == null || moduleKey.isBlank()) { if (moduleKey == null || moduleKey.isBlank()) {
@ -17,6 +21,42 @@ public record RuntimeProcessingModuleDescriptorDto(
displayName = displayName == null || displayName.isBlank() ? moduleKey : displayName.trim(); displayName = displayName == null || displayName.isBlank() ? moduleKey : displayName.trim();
description = description == null ? "" : description; description = description == null ? "" : description;
engine = engine == null || engine.isBlank() ? "UNKNOWN" : engine.trim(); engine = engine == null || engine.isBlank() ? "UNKNOWN" : engine.trim();
requiredModuleKeys = normalizeKeys(requiredModuleKeys);
optionalModuleKeys = normalizeKeys(optionalModuleKeys);
consumedStreams = consumedStreams == null ? Set.of() : Set.copyOf(consumedStreams);
producedStreams = producedStreams == null ? Set.of() : Set.copyOf(producedStreams); producedStreams = producedStreams == null ? Set.of() : Set.copyOf(producedStreams);
version = version == null || version.isBlank() ? "1" : version.trim();
}
public RuntimeProcessingModuleDescriptorDto(
String moduleKey,
String displayName,
String description,
String engine,
Set<String> producedStreams
) {
this(moduleKey, displayName, description, engine, Set.of(), Set.of(), Set.of(), producedStreams, "1");
}
public RuntimeProcessingModuleDescriptorDto(
String moduleKey,
String displayName,
String description,
String engine,
Set<String> requiredModuleKeys,
Set<String> consumedStreams,
Set<String> producedStreams
) {
this(moduleKey, displayName, description, engine, requiredModuleKeys, Set.of(), consumedStreams, producedStreams, "1");
}
private static Set<String> normalizeKeys(Set<String> keys) {
if (keys == null || keys.isEmpty()) {
return Set.of();
}
return keys.stream()
.filter(key -> key != null && !key.isBlank())
.map(String::trim)
.collect(java.util.stream.Collectors.toUnmodifiableSet());
} }
} }

View File

@ -148,7 +148,7 @@ class RuntimeEventMixingServiceTest {
"TACHOGRAPH:CARD_PLACE:1", "TACHOGRAPH:CARD_PLACE:1",
EventDomain.PLACE, EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED, EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.SNAPSHOT, EventLifecycle.START,
false false
); );
EventHubEventDto vuPlace = supportEvidence( EventHubEventDto vuPlace = supportEvidence(
@ -157,7 +157,7 @@ class RuntimeEventMixingServiceTest {
"TACHOGRAPH:VU_PLACE:2", "TACHOGRAPH:VU_PLACE:2",
EventDomain.PLACE, EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED, EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.SNAPSHOT, EventLifecycle.START,
true true
); );
EventHubEventDto cardBorder = supportEvidence( EventHubEventDto cardBorder = supportEvidence(
@ -188,6 +188,72 @@ class RuntimeEventMixingServiceTest {
assertThat(mixed.eventMixingDecisions()).hasSize(2); assertThat(mixed.eventMixingDecisions()).hasSize(2);
} }
@Test
void suppressesVuPlaceDuplicateForFileSessionBeginLifecycle() {
EventHubEventDto cardPlace = fileSessionSupportEvidence(
"DRIVER_CARD",
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-place-1:BEGIN:2026-04-01T00:00:00Z",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN,
false
);
EventHubEventDto vuPlace = fileSessionSupportEvidence(
"VEHICLE_UNIT",
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-place-1:BEGIN:2026-04-01T00:00:00Z",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN,
true
);
RuntimeMixedEventBundle mixed = service.mix(
List.of(cardPlace, vuPlace),
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
);
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-place-1:BEGIN:2026-04-01T00:00:00Z");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-place-1:BEGIN:2026-04-01T00:00:00Z");
assertThat(mixed.eventMixingDecisions()).hasSize(1);
}
@Test
void treatsStartAndBeginAsEquivalentPlaceLifecycles() {
EventHubEventDto cardPlace = supportEvidence(
"CARD_PLACE",
"DRIVER_CARD",
"TACHOGRAPH:CARD_PLACE:START",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.START,
false
);
EventHubEventDto vuPlace = supportEvidence(
"VU_PLACE",
"VEHICLE_UNIT",
"TACHOGRAPH:VU_PLACE:BEGIN",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.BEGIN,
true
);
RuntimeMixedEventBundle mixed = service.mix(
List.of(cardPlace, vuPlace),
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
);
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_PLACE:START");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:VU_PLACE:BEGIN");
assertThat(mixed.eventMixingDecisions()).hasSize(1);
assertThat(mixed.eventMixingDecisions().getFirst().ruleId())
.isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY);
}
@Test @Test
void suppressesTachographFileSessionVuPositionDuplicateWhenNoExtractionCodeIsPresent() { void suppressesTachographFileSessionVuPositionDuplicateWhenNoExtractionCodeIsPresent() {
EventHubEventDto card = fileSessionSupportEvidence( EventHubEventDto card = fileSessionSupportEvidence(
@ -220,6 +286,143 @@ class RuntimeEventMixingServiceTest {
.containsExactly("VU_POSITION"); .containsExactly("VU_POSITION");
} }
@Test
void suppressesVuDuplicatesForAllAdditionalSupportEventTypes() {
EventHubEventDto cardLoad = supportEvidence(
"CARD_LOAD_UNLOAD", "DRIVER_CARD", "TACHOGRAPH:CARD_LOAD_UNLOAD:LOAD",
EventDomain.LOAD_UNLOAD, EventType.LOAD, EventLifecycle.SNAPSHOT, false
);
EventHubEventDto vuLoad = supportEvidence(
"VU_LOAD_UNLOAD", "VEHICLE_UNIT", "TACHOGRAPH:VU_LOAD_UNLOAD:LOAD",
EventDomain.LOAD_UNLOAD, EventType.LOAD, EventLifecycle.SNAPSHOT, true
);
EventHubEventDto cardUnload = supportEvidence(
"CARD_LOAD_UNLOAD", "DRIVER_CARD", "TACHOGRAPH:CARD_LOAD_UNLOAD:UNLOAD",
EventDomain.LOAD_UNLOAD, EventType.UNLOAD, EventLifecycle.SNAPSHOT, false
);
EventHubEventDto vuUnload = supportEvidence(
"VU_LOAD_UNLOAD", "VEHICLE_UNIT", "TACHOGRAPH:VU_LOAD_UNLOAD:UNLOAD",
EventDomain.LOAD_UNLOAD, EventType.UNLOAD, EventLifecycle.SNAPSHOT, true
);
EventHubEventDto cardLoadUnload = supportEvidence(
"CARD_LOAD_UNLOAD", "DRIVER_CARD", "TACHOGRAPH:CARD_LOAD_UNLOAD:LOAD_UNLOAD",
EventDomain.LOAD_UNLOAD, EventType.LOAD_UNLOAD, EventLifecycle.SNAPSHOT, false
);
EventHubEventDto vuLoadUnload = supportEvidence(
"VU_LOAD_UNLOAD", "VEHICLE_UNIT", "TACHOGRAPH:VU_LOAD_UNLOAD:LOAD_UNLOAD",
EventDomain.LOAD_UNLOAD, EventType.LOAD_UNLOAD, EventLifecycle.SNAPSHOT, true
);
EventHubEventDto cardOut = supportEvidence(
"CARD_SPECIFIC_CONDITION", "DRIVER_CARD", "TACHOGRAPH:CARD_SPECIFIC_CONDITION:OUT:BEGIN",
EventDomain.SPECIFIC_CONDITION, EventType.OUT, EventLifecycle.BEGIN, false
);
EventHubEventDto vuOut = supportEvidence(
"VU_SPECIFIC_CONDITION", "VEHICLE_UNIT", "TACHOGRAPH:VU_SPECIFIC_CONDITION:OUT:BEGIN",
EventDomain.SPECIFIC_CONDITION, EventType.OUT, EventLifecycle.BEGIN, true
);
EventHubEventDto cardFerryTrain = supportEvidence(
"CARD_SPECIFIC_CONDITION", "DRIVER_CARD", "TACHOGRAPH:CARD_SPECIFIC_CONDITION:FERRY_TRAIN:END",
EventDomain.SPECIFIC_CONDITION, EventType.FERRY_TRAIN, EventLifecycle.END, false
);
EventHubEventDto vuFerryTrain = supportEvidence(
"VU_SPECIFIC_CONDITION", "VEHICLE_UNIT", "TACHOGRAPH:VU_SPECIFIC_CONDITION:FERRY_TRAIN:END",
EventDomain.SPECIFIC_CONDITION, EventType.FERRY_TRAIN, EventLifecycle.END, true
);
RuntimeMixedEventBundle mixed = service.mix(
List.of(
cardLoad, vuLoad,
cardUnload, vuUnload,
cardLoadUnload, vuLoadUnload,
cardOut, vuOut,
cardFerryTrain, vuFerryTrain
),
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
);
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactlyInAnyOrder(
"TACHOGRAPH:CARD_LOAD_UNLOAD:LOAD",
"TACHOGRAPH:CARD_LOAD_UNLOAD:UNLOAD",
"TACHOGRAPH:CARD_LOAD_UNLOAD:LOAD_UNLOAD",
"TACHOGRAPH:CARD_SPECIFIC_CONDITION:OUT:BEGIN",
"TACHOGRAPH:CARD_SPECIFIC_CONDITION:FERRY_TRAIN:END"
);
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactlyInAnyOrder(
"TACHOGRAPH:VU_LOAD_UNLOAD:LOAD",
"TACHOGRAPH:VU_LOAD_UNLOAD:UNLOAD",
"TACHOGRAPH:VU_LOAD_UNLOAD:LOAD_UNLOAD",
"TACHOGRAPH:VU_SPECIFIC_CONDITION:OUT:BEGIN",
"TACHOGRAPH:VU_SPECIFIC_CONDITION:FERRY_TRAIN:END"
);
assertThat(mixed.eventMixingDecisions()).hasSize(5);
assertThat(mixed.eventMixingDecisions())
.allSatisfy(decision -> assertThat(decision.secondaryExtractionCodes()).hasSize(1));
assertThat(mixed.eventMixingDecisions())
.extracting(decision -> decision.secondaryExtractionCodes().getFirst())
.containsExactlyInAnyOrder(
"VU_LOAD_UNLOAD", "VU_LOAD_UNLOAD", "VU_LOAD_UNLOAD",
"VU_SPECIFIC_CONDITION", "VU_SPECIFIC_CONDITION"
);
}
@Test
void suppressesFileSessionVuLoadUnloadAndSpecificConditionDuplicatesWithoutExtractionCodes() {
EventHubEventDto cardLoad = fileSessionSupportEvidence(
"DRIVER_CARD",
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-load-1:SNAPSHOT:2026-04-01T00:00:00Z",
EventDomain.LOAD_UNLOAD,
EventType.LOAD,
EventLifecycle.SNAPSHOT,
false
);
EventHubEventDto vuLoad = fileSessionSupportEvidence(
"VEHICLE_UNIT",
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-load-1:SNAPSHOT:2026-04-01T00:00:00Z",
EventDomain.LOAD_UNLOAD,
EventType.LOAD,
EventLifecycle.SNAPSHOT,
true
);
EventHubEventDto cardOut = fileSessionSupportEvidence(
"DRIVER_CARD",
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-out-1:BEGIN:2026-04-01T00:00:00Z",
EventDomain.SPECIFIC_CONDITION,
EventType.OUT,
EventLifecycle.BEGIN,
false
);
EventHubEventDto vuOut = fileSessionSupportEvidence(
"VEHICLE_UNIT",
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-out-1:BEGIN:2026-04-01T00:00:00Z",
EventDomain.SPECIFIC_CONDITION,
EventType.OUT,
EventLifecycle.BEGIN,
true
);
RuntimeMixedEventBundle mixed = service.mix(
List.of(cardLoad, vuLoad, cardOut, vuOut),
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
);
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactlyInAnyOrder(
cardLoad.externalSourceEventId(),
cardOut.externalSourceEventId()
);
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactlyInAnyOrder(
vuLoad.externalSourceEventId(),
vuOut.externalSourceEventId()
);
assertThat(mixed.eventMixingDecisions()).hasSize(2);
assertThat(mixed.eventMixingDecisions())
.extracting(decision -> decision.secondaryExtractionCodes().getFirst())
.containsExactlyInAnyOrder("VU_LOAD_UNLOAD", "VU_SPECIFIC_CONDITION");
}
private EventHubEventDto activity(String extractionCode, String sourceKind, String externalId) { private EventHubEventDto activity(String extractionCode, String sourceKind, String externalId) {
ObjectNode raw = baseRaw(extractionCode, sourceKind); ObjectNode raw = baseRaw(extractionCode, sourceKind);
raw.put("activityType", "BREAK_REST"); raw.put("activityType", "BREAK_REST");
@ -264,6 +467,9 @@ class RuntimeEventMixingServiceTest {
raw.put("region", "W"); raw.put("region", "W");
raw.put("countryFrom", "AT"); raw.put("countryFrom", "AT");
raw.put("countryTo", "DE"); raw.put("countryTo", "DE");
if (domain == EventDomain.LOAD_UNLOAD) {
raw.put("operation", type.name());
}
ObjectNode payload = JsonNodeFactory.instance.objectNode(); ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw); payload.set("raw", raw);
ObjectNode attributes = JsonNodeFactory.instance.objectNode(); ObjectNode attributes = JsonNodeFactory.instance.objectNode();
@ -271,6 +477,9 @@ class RuntimeEventMixingServiceTest {
attributes.put("region", "W"); attributes.put("region", "W");
attributes.put("countryFrom", "AT"); attributes.put("countryFrom", "AT");
attributes.put("countryTo", "DE"); attributes.put("countryTo", "DE");
if (domain == EventDomain.LOAD_UNLOAD) {
attributes.put("operation", type.name());
}
VehicleRefDto vehicleRef = withVin VehicleRefDto vehicleRef = withVin
? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE")) ? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
: new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE")); : new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"));
@ -330,6 +539,9 @@ class RuntimeEventMixingServiceTest {
raw.put("region", "W"); raw.put("region", "W");
raw.put("countryFrom", "AT"); raw.put("countryFrom", "AT");
raw.put("countryTo", "DE"); raw.put("countryTo", "DE");
if (domain == EventDomain.LOAD_UNLOAD) {
raw.put("operation", type.name());
}
ObjectNode payload = JsonNodeFactory.instance.objectNode(); ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw); payload.set("raw", raw);
ObjectNode attributes = JsonNodeFactory.instance.objectNode(); ObjectNode attributes = JsonNodeFactory.instance.objectNode();
@ -337,6 +549,9 @@ class RuntimeEventMixingServiceTest {
attributes.put("region", "W"); attributes.put("region", "W");
attributes.put("countryFrom", "AT"); attributes.put("countryFrom", "AT");
attributes.put("countryTo", "DE"); attributes.put("countryTo", "DE");
if (domain == EventDomain.LOAD_UNLOAD) {
attributes.put("operation", type.name());
}
VehicleRefDto vehicleRef = withVin VehicleRefDto vehicleRef = withVin
? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE")) ? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
: new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE")); : new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"));

View File

@ -1,6 +1,7 @@
package at.procon.eventhub.processing.eventprocessing.module; package at.procon.eventhub.processing.eventprocessing.module;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
@ -20,49 +21,7 @@ class RuntimeProcessingPipelineExecutorTest {
); );
Map<String, RuntimeProcessingModuleResult> results = executor.execute( Map<String, RuntimeProcessingModuleResult> results = executor.execute(
new RuntimeProcessingExecutionApiRequest( request(),
"driver-working-time-v1",
new at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest(
java.util.UUID.randomUUID(),
List.of(),
null,
null,
java.util.Set.of(at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
null,
"12:123",
java.util.Set.of(),
false,
java.util.Set.of(),
false,
null,
null,
null,
java.time.OffsetDateTime.parse("2026-05-01T00:00:00Z"),
java.time.OffsetDateTime.parse("2026-05-01T01:00:00Z"),
true,
0,
null,
null,
null,
null,
null
),
new at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest(
RuntimeEventPartitioningStrategy.DRIVER,
null,
false,
null,
false,
null,
false,
null,
null,
null
),
List.of(),
Map.of()
),
List.of("first", "second"), List.of("first", "second"),
null null
); );
@ -78,12 +37,94 @@ class RuntimeProcessingPipelineExecutorTest {
.isGreaterThanOrEqualTo(0L); .isGreaterThanOrEqualTo(0L);
} }
@Test
void insertsRequiredModulesBeforeRequestedModule() {
RuntimeProcessingPipelineExecutor executor = new RuntimeProcessingPipelineExecutor(
new RuntimeProcessingModuleRegistry(List.of(
new TestModule("assembly"),
new TestModule("mixing", "assembly"),
new TestModule("final", "mixing")
))
);
Map<String, RuntimeProcessingModuleResult> results = executor.execute(
request(),
List.of("final"),
null
);
assertThat(results.keySet()).containsExactly("assembly", "mixing", "final");
}
@Test
void rejectsDependencyCyclesBeforeExecution() {
RuntimeProcessingPipelineExecutor executor = new RuntimeProcessingPipelineExecutor(
new RuntimeProcessingModuleRegistry(List.of(
new TestModule("a", "b"),
new TestModule("b", "a")
))
);
assertThatThrownBy(() -> executor.execute(request(), List.of("a"), null))
.isInstanceOf(RuntimeProcessingDependencyCycleException.class)
.hasMessageContaining("a -> b -> a");
}
private RuntimeProcessingExecutionApiRequest request() {
return new RuntimeProcessingExecutionApiRequest(
"driver-working-time-v1",
new at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest(
java.util.UUID.randomUUID(),
List.of(),
null,
null,
java.util.Set.of(at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
null,
"12:123",
java.util.Set.of(),
false,
java.util.Set.of(),
false,
null,
null,
null,
java.time.OffsetDateTime.parse("2026-05-01T00:00:00Z"),
java.time.OffsetDateTime.parse("2026-05-01T01:00:00Z"),
true,
0,
null,
null,
null,
null,
null
),
new at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest(
RuntimeEventPartitioningStrategy.DRIVER,
null,
false,
null,
false,
null,
false,
null,
null,
null
),
List.of(),
Map.of()
);
}
private static final class TestModule implements RuntimeProcessingModule { private static final class TestModule implements RuntimeProcessingModule {
private final String key; private final String key;
private final java.util.Set<String> requiredKeys;
private TestModule(String key) { private TestModule(String key, String... requiredKeys) {
this.key = key; this.key = key;
this.requiredKeys = java.util.Arrays.stream(requiredKeys)
.collect(java.util.stream.Collectors.toUnmodifiableSet());
} }
@Override @Override
@ -98,6 +139,8 @@ class RuntimeProcessingPipelineExecutorTest {
key, key,
"test", "test",
"JAVA", "JAVA",
requiredKeys,
java.util.Set.of(),
java.util.Set.of() java.util.Set.of()
); );
} }