Compare commits
2 Commits
829dc2e06a
...
c066c5c777
| Author | SHA1 | Date |
|---|---|---|
|
|
c066c5c777 | |
|
|
e78d7d6ea8 |
|
|
@ -143,6 +143,8 @@ public class RuntimeEventDescriptorFactory {
|
|||
case POSITION -> prefix + "_POSITION";
|
||||
case PLACE -> prefix + "_PLACE";
|
||||
case BORDER_CROSSING -> prefix + "_BORDER_CROSSING";
|
||||
case LOAD_UNLOAD -> prefix + "_LOAD_UNLOAD";
|
||||
case SPECIFIC_CONDITION -> prefix + "_SPECIFIC_CONDITION";
|
||||
case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null;
|
||||
default -> null;
|
||||
};
|
||||
|
|
@ -203,7 +205,7 @@ public class RuntimeEventDescriptorFactory {
|
|||
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
|
||||
nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()),
|
||||
nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()),
|
||||
nullToEmpty(event == null || event.lifecycle() == null ? null : event.lifecycle().name()),
|
||||
nullToEmpty(semanticSupportLifecycle(event)),
|
||||
normalizeTime(event == null ? null : event.occurredAt()),
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
|
||||
nullToEmpty(firstNonBlank(text(raw, "latitude"), position == null || position.latitude() == null ? null : position.latitude().toPlainString())),
|
||||
|
|
@ -217,6 +219,17 @@ public class RuntimeEventDescriptorFactory {
|
|||
);
|
||||
}
|
||||
|
||||
private String semanticSupportLifecycle(EventHubEventDto event) {
|
||||
if (event == null || event.lifecycle() == null) {
|
||||
return null;
|
||||
}
|
||||
if (event.eventDomain() == EventDomain.PLACE
|
||||
&& (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.BEGIN)) {
|
||||
return EventLifecycle.BEGIN.name();
|
||||
}
|
||||
return event.lifecycle().name();
|
||||
}
|
||||
|
||||
private JsonNode rawPayload(EventHubEventDto event) {
|
||||
return RuntimeEntityReferenceResolver.rawPayload(event);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,53 @@ import org.springframework.stereotype.Component;
|
|||
@Component
|
||||
public class RuntimeEventMixingRuleRegistry {
|
||||
|
||||
private static final Set<EventDomain> SUPPORT_EVENT_DOMAINS = Set.of(
|
||||
EventDomain.POSITION,
|
||||
EventDomain.PLACE,
|
||||
EventDomain.BORDER_CROSSING,
|
||||
EventDomain.LOAD_UNLOAD,
|
||||
EventDomain.SPECIFIC_CONDITION
|
||||
);
|
||||
|
||||
private static final Set<EventType> SUPPORT_EVENT_TYPES = Set.of(
|
||||
EventType.POSITION_RECORDED,
|
||||
EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventType.BORDER_INBOUND,
|
||||
EventType.BORDER_OUTBOUND,
|
||||
EventType.BORDER_OUT_EU,
|
||||
EventType.LOAD,
|
||||
EventType.UNLOAD,
|
||||
EventType.LOAD_UNLOAD,
|
||||
EventType.OUT,
|
||||
EventType.FERRY_TRAIN
|
||||
);
|
||||
|
||||
private static final Set<EventLifecycle> SUPPORT_EVENT_LIFECYCLES = Set.of(
|
||||
EventLifecycle.SNAPSHOT,
|
||||
EventLifecycle.START,
|
||||
EventLifecycle.BEGIN,
|
||||
EventLifecycle.END,
|
||||
EventLifecycle.INBOUND,
|
||||
EventLifecycle.OUTBOUND,
|
||||
EventLifecycle.OUT_EU
|
||||
);
|
||||
|
||||
private static final Set<String> CARD_SUPPORT_EXTRACTION_CODES = Set.of(
|
||||
"CARD_POSITION",
|
||||
"CARD_PLACE",
|
||||
"CARD_BORDER_CROSSING",
|
||||
"CARD_LOAD_UNLOAD",
|
||||
"CARD_SPECIFIC_CONDITION"
|
||||
);
|
||||
|
||||
private static final Set<String> VU_SUPPORT_EXTRACTION_CODES = Set.of(
|
||||
"VU_POSITION",
|
||||
"VU_PLACE",
|
||||
"VU_BORDER_CROSSING",
|
||||
"VU_LOAD_UNLOAD",
|
||||
"VU_SPECIFIC_CONDITION"
|
||||
);
|
||||
|
||||
public List<RuntimeEventMixingRule> rulesForMode(String mode) {
|
||||
if (RuntimeEventMixingService.MODE_OFF.equals(mode)) {
|
||||
return List.of();
|
||||
|
|
@ -61,12 +108,11 @@ public class RuntimeEventMixingRuleRegistry {
|
|||
RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY,
|
||||
RuntimeEventMixingChannel.SUPPORT_EVIDENCE,
|
||||
RuntimeEventMixingRule.EQUIVALENCE_EXACT_EVENT_KEY,
|
||||
Set.of(EventDomain.POSITION, EventDomain.PLACE, EventDomain.BORDER_CROSSING),
|
||||
Set.of(EventType.POSITION_RECORDED, EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventType.BORDER_INBOUND, EventType.BORDER_OUTBOUND, EventType.BORDER_OUT_EU),
|
||||
Set.of(EventLifecycle.SNAPSHOT, EventLifecycle.INBOUND, EventLifecycle.OUTBOUND, EventLifecycle.OUT_EU),
|
||||
Set.of("CARD_POSITION", "CARD_PLACE", "CARD_BORDER_CROSSING"),
|
||||
Set.of("VU_POSITION", "VU_PLACE", "VU_BORDER_CROSSING"),
|
||||
SUPPORT_EVENT_DOMAINS,
|
||||
SUPPORT_EVENT_TYPES,
|
||||
SUPPORT_EVENT_LIFECYCLES,
|
||||
CARD_SUPPORT_EXTRACTION_CODES,
|
||||
VU_SUPPORT_EXTRACTION_CODES,
|
||||
RuntimeResolvedEventRole.FUSED_PRIMARY,
|
||||
RuntimeResolvedEventRole.SUPPRESSED_DUPLICATE,
|
||||
"FUSED_PRIMARY_SELECTED",
|
||||
|
|
@ -79,12 +125,11 @@ public class RuntimeEventMixingRuleRegistry {
|
|||
RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY,
|
||||
RuntimeEventMixingChannel.SUPPORT_EVIDENCE,
|
||||
RuntimeEventMixingRule.EQUIVALENCE_COMPATIBLE_SUPPORT_KEY,
|
||||
Set.of(EventDomain.POSITION, EventDomain.PLACE, EventDomain.BORDER_CROSSING),
|
||||
Set.of(EventType.POSITION_RECORDED, EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventType.BORDER_INBOUND, EventType.BORDER_OUTBOUND, EventType.BORDER_OUT_EU),
|
||||
Set.of(EventLifecycle.SNAPSHOT, EventLifecycle.INBOUND, EventLifecycle.OUTBOUND, EventLifecycle.OUT_EU),
|
||||
Set.of("CARD_POSITION", "CARD_PLACE", "CARD_BORDER_CROSSING"),
|
||||
Set.of("VU_POSITION", "VU_PLACE", "VU_BORDER_CROSSING"),
|
||||
SUPPORT_EVENT_DOMAINS,
|
||||
SUPPORT_EVENT_TYPES,
|
||||
SUPPORT_EVENT_LIFECYCLES,
|
||||
CARD_SUPPORT_EXTRACTION_CODES,
|
||||
VU_SUPPORT_EXTRACTION_CODES,
|
||||
RuntimeResolvedEventRole.FUSED_PRIMARY,
|
||||
RuntimeResolvedEventRole.SUPPRESSED_DUPLICATE,
|
||||
"FUSED_PRIMARY_SELECTED",
|
||||
|
|
|
|||
|
|
@ -46,6 +46,8 @@ public class DriverActivityIntervalsModule implements RuntimeEplModule {
|
|||
"Event to activity intervals",
|
||||
"EPL module that pairs canonical DRIVER_ACTIVITY START/END point events into source-neutral driver activity intervals.",
|
||||
engine(),
|
||||
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
|
||||
Set.of("RuntimeMixedEventBundle.activityTimelineEvents"),
|
||||
Set.of("DriverActivityIntervalEvent")
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,6 +46,8 @@ public class DriverVehicleUsageIntervalsModule implements RuntimeEplModule {
|
|||
"Event to vehicle usage intervals",
|
||||
"EPL module that pairs canonical driver/card INSERT/WITHDRAW point events into source-neutral driver vehicle-usage intervals.",
|
||||
engine(),
|
||||
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
|
||||
Set.of("RuntimeMixedEventBundle.vehicleUsageEvents"),
|
||||
Set.of("DriverVehicleUsageIntervalEvent")
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,8 @@ public class DriverVehicleUsageMergeModule implements RuntimeProcessingModule {
|
|||
"Vehicle usage merge",
|
||||
"Merges adjacent or continuous effective same-driver/same-vehicle usage intervals after source reconciliation.",
|
||||
"JAVA",
|
||||
Set.of(DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION),
|
||||
Set.of("RuntimeVehicleUsageReconciliationResult.effectiveVehicleUsageIntervals"),
|
||||
Set.of("DriverWorkingTimeVehicleUsageInterval")
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -53,11 +53,26 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
|
|||
|
||||
@Override
|
||||
public RuntimeProcessingModuleDescriptorDto descriptor() {
|
||||
if (legacyScopeProcessingService != null) {
|
||||
return new RuntimeProcessingModuleDescriptorDto(
|
||||
moduleKey(),
|
||||
"Driving-derived projections",
|
||||
"Executes the legacy driver working-time scope adapter. This compatibility mode performs upstream assembly internally.",
|
||||
"ESPER+JAVA",
|
||||
Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto")
|
||||
);
|
||||
}
|
||||
return new RuntimeProcessingModuleDescriptorDto(
|
||||
moduleKey(),
|
||||
"Driving-derived projections",
|
||||
"Executes the shared driver working-time core from typed per-driver module outputs for driving interruptions, rest candidates, card-absence coverage, overnight candidates, and trip candidates.",
|
||||
"ESPER+JAVA",
|
||||
Set.of(
|
||||
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS,
|
||||
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE,
|
||||
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION
|
||||
),
|
||||
Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map<String, DriverWorkingTimePreparedInput>"),
|
||||
Set.of("UnifiedRuntimeDriverWorkingTimeScopeResultDto")
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,6 +41,8 @@ public class EventEvidenceMixingModule implements RuntimeProcessingModule {
|
|||
"Event evidence mixing",
|
||||
"Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged for vehicle-usage processing.",
|
||||
"JAVA",
|
||||
Set.of(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY),
|
||||
Set.of("UnifiedRuntimeEventBundle"),
|
||||
Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto")
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,8 @@ public class RuntimeEventAssemblyModule implements RuntimeProcessingModule {
|
|||
"Runtime event assembly",
|
||||
"Loads and merges canonical runtime events from the selected source scope before plan-specific processing.",
|
||||
"JAVA",
|
||||
Set.of(),
|
||||
Set.of("runtime source selection"),
|
||||
Set.of("UnifiedRuntimeEventBundle")
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,8 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.module;
|
||||
|
||||
public class RuntimeProcessingDependencyCycleException extends RuntimeProcessingDependencyException {
|
||||
|
||||
public RuntimeProcessingDependencyCycleException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.module;
|
||||
|
||||
public class RuntimeProcessingDependencyException extends IllegalArgumentException {
|
||||
|
||||
public RuntimeProcessingDependencyException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public RuntimeProcessingDependencyException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
|
|
@ -16,5 +16,25 @@ public interface RuntimeProcessingModule {
|
|||
|
||||
RuntimeProcessingModuleDescriptorDto descriptor();
|
||||
|
||||
default java.util.Set<String> requiredModuleKeys() {
|
||||
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
|
||||
return descriptor == null ? java.util.Set.of() : descriptor.requiredModuleKeys();
|
||||
}
|
||||
|
||||
default java.util.Set<String> optionalModuleKeys() {
|
||||
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
|
||||
return descriptor == null ? java.util.Set.of() : descriptor.optionalModuleKeys();
|
||||
}
|
||||
|
||||
default java.util.Set<String> consumedStreams() {
|
||||
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
|
||||
return descriptor == null ? java.util.Set.of() : descriptor.consumedStreams();
|
||||
}
|
||||
|
||||
default java.util.Set<String> producedStreams() {
|
||||
RuntimeProcessingModuleDescriptorDto descriptor = descriptor();
|
||||
return descriptor == null ? java.util.Set.of() : descriptor.producedStreams();
|
||||
}
|
||||
|
||||
RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,4 +18,27 @@ public record RuntimeProcessingModuleContext(
|
|||
attributes = attributes == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(attributes));
|
||||
previousResults = previousResults == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(previousResults));
|
||||
}
|
||||
|
||||
public RuntimeProcessingModuleResult requireResult(String moduleKey) {
|
||||
RuntimeProcessingModuleResult result = previousResults.get(moduleKey);
|
||||
if (result == null) {
|
||||
throw new RuntimeProcessingDependencyException("Required runtime processing module result is missing: " + moduleKey);
|
||||
}
|
||||
if (result.status() == RuntimeProcessingModuleStatus.FAILED) {
|
||||
throw new RuntimeProcessingDependencyException("Required runtime processing module failed: " + moduleKey);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public <T> T requireOutput(String moduleKey, Class<T> outputType) {
|
||||
RuntimeProcessingModuleResult result = requireResult(moduleKey);
|
||||
Object output = result.output();
|
||||
if (outputType.isInstance(output)) {
|
||||
return outputType.cast(output);
|
||||
}
|
||||
throw new RuntimeProcessingDependencyException("Required runtime processing module " + moduleKey
|
||||
+ " did not produce " + outputType.getName() + ". Actual: "
|
||||
+ (output == null ? "null" : output.getClass().getName()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,9 +16,11 @@ public class RuntimeProcessingPipelineExecutor {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(RuntimeProcessingPipelineExecutor.class);
|
||||
|
||||
private final RuntimeProcessingModuleRegistry moduleRegistry;
|
||||
private final RuntimeProcessingPlanResolver planResolver;
|
||||
|
||||
public RuntimeProcessingPipelineExecutor(RuntimeProcessingModuleRegistry moduleRegistry) {
|
||||
this.moduleRegistry = moduleRegistry;
|
||||
this.planResolver = new RuntimeProcessingPlanResolver(moduleRegistry);
|
||||
}
|
||||
|
||||
public Map<String, RuntimeProcessingModuleResult> execute(
|
||||
|
|
@ -30,7 +32,9 @@ public class RuntimeProcessingPipelineExecutor {
|
|||
RuntimeProcessingModuleContext context = initialContext == null
|
||||
? new RuntimeProcessingModuleContext(request, List.of(), Map.of(), Map.of())
|
||||
: initialContext;
|
||||
for (String moduleKey : moduleKeys == null ? List.<String>of() : moduleKeys) {
|
||||
List<String> resolvedModuleKeys = planResolver.resolve(moduleKeys == null ? List.<String>of() : moduleKeys);
|
||||
LOG.debug("Resolved runtime processing modules {} to execution order {}", moduleKeys, resolvedModuleKeys);
|
||||
for (String moduleKey : resolvedModuleKeys) {
|
||||
if (moduleKey == null || moduleKey.isBlank()) {
|
||||
continue;
|
||||
}
|
||||
|
|
@ -56,7 +60,7 @@ public class RuntimeProcessingPipelineExecutor {
|
|||
break;
|
||||
}
|
||||
}
|
||||
return Map.copyOf(results);
|
||||
return java.util.Collections.unmodifiableMap(new LinkedHashMap<>(results));
|
||||
}
|
||||
|
||||
private RuntimeProcessingModuleResult withExecutionDuration(
|
||||
|
|
|
|||
|
|
@ -0,0 +1,90 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.module;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Resolves a requested runtime module list into an executable dependency order.
|
||||
*
|
||||
* <p>The resolver intentionally treats module dependencies as execution contracts, not as UI hints:
|
||||
* every required predecessor is inserted before the requested module, unknown modules fail fast, and
|
||||
* dependency cycles are rejected before any module is executed.</p>
|
||||
*/
|
||||
public class RuntimeProcessingPlanResolver {
|
||||
|
||||
private final RuntimeProcessingModuleRegistry moduleRegistry;
|
||||
|
||||
public RuntimeProcessingPlanResolver(RuntimeProcessingModuleRegistry moduleRegistry) {
|
||||
this.moduleRegistry = moduleRegistry;
|
||||
}
|
||||
|
||||
public List<String> resolve(List<String> requestedModuleKeys) {
|
||||
LinkedHashSet<String> requested = normalize(requestedModuleKeys);
|
||||
LinkedHashSet<String> resolved = new LinkedHashSet<>();
|
||||
LinkedHashSet<String> visiting = new LinkedHashSet<>();
|
||||
ArrayDeque<String> path = new ArrayDeque<>();
|
||||
|
||||
for (String moduleKey : requested) {
|
||||
visit(moduleKey, resolved, visiting, path);
|
||||
}
|
||||
return List.copyOf(resolved);
|
||||
}
|
||||
|
||||
private void visit(
|
||||
String moduleKey,
|
||||
LinkedHashSet<String> resolved,
|
||||
LinkedHashSet<String> visiting,
|
||||
ArrayDeque<String> path
|
||||
) {
|
||||
if (resolved.contains(moduleKey)) {
|
||||
return;
|
||||
}
|
||||
RuntimeProcessingModule module;
|
||||
try {
|
||||
module = moduleRegistry.require(moduleKey);
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw new RuntimeProcessingDependencyException("Unknown runtime processing module requested or required: " + moduleKey, ex);
|
||||
}
|
||||
if (!visiting.add(moduleKey)) {
|
||||
throw new RuntimeProcessingDependencyCycleException("Runtime processing module dependency cycle detected: "
|
||||
+ cycleDescription(path, moduleKey));
|
||||
}
|
||||
path.addLast(moduleKey);
|
||||
for (String requiredModuleKey : module.requiredModuleKeys()) {
|
||||
visit(requiredModuleKey, resolved, visiting, path);
|
||||
}
|
||||
path.removeLast();
|
||||
visiting.remove(moduleKey);
|
||||
resolved.add(moduleKey);
|
||||
}
|
||||
|
||||
private LinkedHashSet<String> normalize(List<String> moduleKeys) {
|
||||
LinkedHashSet<String> normalized = new LinkedHashSet<>();
|
||||
if (moduleKeys != null) {
|
||||
for (String moduleKey : moduleKeys) {
|
||||
if (moduleKey != null && !moduleKey.isBlank()) {
|
||||
normalized.add(moduleKey.trim());
|
||||
}
|
||||
}
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
private String cycleDescription(ArrayDeque<String> path, String repeatedModuleKey) {
|
||||
List<String> cycle = new ArrayList<>();
|
||||
boolean inCycle = false;
|
||||
for (String element : path) {
|
||||
if (element.equals(repeatedModuleKey)) {
|
||||
inCycle = true;
|
||||
}
|
||||
if (inCycle) {
|
||||
cycle.add(element);
|
||||
}
|
||||
}
|
||||
cycle.add(repeatedModuleKey);
|
||||
return String.join(" -> ", cycle);
|
||||
}
|
||||
}
|
||||
|
|
@ -59,6 +59,11 @@ public class SupportEvidenceNormalizationModule implements RuntimeProcessingModu
|
|||
"Support evidence normalization",
|
||||
"Normalizes mixed-source support evidence into typed per-driver working-time inputs for the common processing core.",
|
||||
"JAVA",
|
||||
Set.of(
|
||||
DriverWorkingTimeModuleKeys.VEHICLE_EVIDENCE_ATTACHMENT,
|
||||
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS
|
||||
),
|
||||
Set.of("Map<String, DriverWorkingTimeDriverPartition>", "DriverActivityIntervalEvent"),
|
||||
Set.of("Map<String, DriverWorkingTimePreparedInput>")
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -57,6 +57,12 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
|
|||
"Vehicle evidence attachment",
|
||||
"Partitions the broad runtime scope by driver and attaches vehicle-only evidence using merged vehicle-usage intervals from the common pipeline.",
|
||||
"JAVA",
|
||||
Set.of(
|
||||
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
|
||||
DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING,
|
||||
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE
|
||||
),
|
||||
Set.of("UnifiedRuntimeEventBundle", "RuntimeMixedEventBundle.driverPartitionEvents", "DriverWorkingTimeVehicleUsageInterval"),
|
||||
Set.of("Map<String, DriverWorkingTimeDriverPartition>")
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ public class VehicleUsageReconciliationModule implements RuntimeProcessingModule
|
|||
"Vehicle usage reconciliation",
|
||||
"Normalizes CARD_VEHICLES_USED technical midnight splits, then reconciles normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary for effective vehicle usage; CARD_VEHICLES_USED remains fallback or corroborating evidence.",
|
||||
"JAVA",
|
||||
Set.of(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS),
|
||||
Set.of("DriverVehicleUsageIntervalEvent"),
|
||||
Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent")
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -118,6 +118,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"Runtime event assembly",
|
||||
"Loads and merges canonical runtime events from the selected source scope before plan-specific processing.",
|
||||
"JAVA",
|
||||
Set.of(),
|
||||
Set.of("runtime source selection"),
|
||||
Set.of("UnifiedRuntimeEventBundle")
|
||||
));
|
||||
descriptors.add(new RuntimeProcessingModuleDescriptorDto(
|
||||
|
|
@ -125,6 +127,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"Event evidence mixing",
|
||||
"Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged.",
|
||||
"JAVA",
|
||||
Set.of(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY),
|
||||
Set.of("UnifiedRuntimeEventBundle"),
|
||||
Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto")
|
||||
));
|
||||
}
|
||||
|
|
@ -134,6 +138,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"Event to activity intervals",
|
||||
"Pairs canonical DRIVER_ACTIVITY START/END point events into source-neutral activity intervals using a first-class EPL module.",
|
||||
"ESPER",
|
||||
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
|
||||
Set.of("RuntimeMixedEventBundle.activityTimelineEvents"),
|
||||
Set.of("DriverActivityIntervalEvent")
|
||||
),
|
||||
new RuntimeProcessingModuleDescriptorDto(
|
||||
|
|
@ -141,6 +147,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"Event to vehicle usage intervals",
|
||||
"Pairs canonical driver/card INSERT/WITHDRAW point events into source-neutral vehicle-usage intervals using a first-class EPL module.",
|
||||
"ESPER",
|
||||
Set.of(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING),
|
||||
Set.of("RuntimeMixedEventBundle.vehicleUsageEvents"),
|
||||
Set.of("DriverVehicleUsageIntervalEvent")
|
||||
),
|
||||
new RuntimeProcessingModuleDescriptorDto(
|
||||
|
|
@ -148,6 +156,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"Vehicle usage reconciliation",
|
||||
"Coalesces CARD_VEHICLES_USED technical midnight splits before reconciling normalized CARD_VEHICLES_USED intervals with IW_CYCLE intervals. IW_CYCLE is primary; CARD_VEHICLES_USED is fallback or corroborating evidence.",
|
||||
"JAVA",
|
||||
Set.of(DriverWorkingTimeModuleKeys.EVENT_TO_VEHICLE_USAGE_INTERVALS),
|
||||
Set.of("DriverVehicleUsageIntervalEvent"),
|
||||
Set.of("RuntimeVehicleUsageReconciliationResult", "DriverVehicleUsageIntervalEvent")
|
||||
),
|
||||
new RuntimeProcessingModuleDescriptorDto(
|
||||
|
|
@ -155,6 +165,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"Vehicle usage merge",
|
||||
"Merges adjacent effective same-driver/same-vehicle usage intervals after vehicle-usage source reconciliation.",
|
||||
"JAVA",
|
||||
Set.of(DriverWorkingTimeModuleKeys.VEHICLE_USAGE_RECONCILIATION),
|
||||
Set.of("RuntimeVehicleUsageReconciliationResult.effectiveVehicleUsageIntervals"),
|
||||
Set.of("DriverVehicleUsageIntervalEvent")
|
||||
),
|
||||
new RuntimeProcessingModuleDescriptorDto(
|
||||
|
|
@ -162,6 +174,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"Vehicle evidence attachment",
|
||||
"Attaches vehicle-only events to driver partitions when they overlap driver vehicle-usage intervals.",
|
||||
"JAVA",
|
||||
Set.of(
|
||||
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
|
||||
DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING,
|
||||
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE
|
||||
),
|
||||
Set.of("UnifiedRuntimeEventBundle", "RuntimeMixedEventBundle.driverPartitionEvents", "DriverWorkingTimeVehicleUsageInterval"),
|
||||
Set.of("RuntimeDriverPartitionDebugDto")
|
||||
),
|
||||
new RuntimeProcessingModuleDescriptorDto(
|
||||
|
|
@ -169,6 +187,11 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"Support evidence normalization",
|
||||
"Normalizes attached mixed-source support evidence for driver working-time processing.",
|
||||
"JAVA",
|
||||
Set.of(
|
||||
DriverWorkingTimeModuleKeys.VEHICLE_EVIDENCE_ATTACHMENT,
|
||||
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS
|
||||
),
|
||||
Set.of("Map<String, DriverWorkingTimeDriverPartition>", "DriverActivityIntervalEvent"),
|
||||
Set.of("RuntimeSupportEvidenceNormalizationDebugDto")
|
||||
),
|
||||
new RuntimeProcessingModuleDescriptorDto(
|
||||
|
|
@ -176,6 +199,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"Driving-derived projections",
|
||||
"Runs the shared driver working-time derived projection module for driving interruptions, rest candidates, trips, and overnight candidates.",
|
||||
"ESPER+JAVA",
|
||||
Set.of(
|
||||
DriverWorkingTimeModuleKeys.EVENT_TO_ACTIVITY_INTERVALS,
|
||||
DriverWorkingTimeModuleKeys.VEHICLE_USAGE_MERGE,
|
||||
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION
|
||||
),
|
||||
Set.of("DriverActivityIntervalEvent", "DriverWorkingTimeVehicleUsageInterval", "Map<String, DriverWorkingTimePreparedInput>"),
|
||||
Set.of("DriverWorkingTimeProcessingResultDto")
|
||||
)
|
||||
));
|
||||
|
|
@ -255,12 +284,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
Map.of()
|
||||
);
|
||||
|
||||
List<String> executedModules = requestedOrDefaultModules(request.modules());
|
||||
List<String> requestedModules = requestedOrDefaultModules(request.modules());
|
||||
Map<String, RuntimeProcessingModuleResult> moduleResults = pipelineExecutor.execute(
|
||||
request,
|
||||
executedModules,
|
||||
requestedModules,
|
||||
initialContext
|
||||
);
|
||||
List<String> executedModules = List.copyOf(moduleResults.keySet());
|
||||
UnifiedRuntimeDriverWorkingTimeScopeResultDto workingTimeResult = extractWorkingTimeResult(moduleResults);
|
||||
|
||||
Map<String, RuntimeEventProcessingPartitionResultDto> partitionResults = new LinkedHashMap<>();
|
||||
|
|
@ -527,6 +557,9 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
}
|
||||
|
||||
private List<String> requestedOrDefaultModules(List<String> requestedModules) {
|
||||
if (!includeRuntimeEventAssemblyModule) {
|
||||
return List.of(DriverWorkingTimeModuleKeys.DRIVING_DERIVED_PROJECTIONS);
|
||||
}
|
||||
if (requestedModules != null && !requestedModules.isEmpty()) {
|
||||
LinkedHashMap<String, String> requested = new LinkedHashMap<>();
|
||||
for (String module : requestedModules) {
|
||||
|
|
|
|||
|
|
@ -7,7 +7,11 @@ public record RuntimeProcessingModuleDescriptorDto(
|
|||
String displayName,
|
||||
String description,
|
||||
String engine,
|
||||
Set<String> producedStreams
|
||||
Set<String> requiredModuleKeys,
|
||||
Set<String> optionalModuleKeys,
|
||||
Set<String> consumedStreams,
|
||||
Set<String> producedStreams,
|
||||
String version
|
||||
) {
|
||||
public RuntimeProcessingModuleDescriptorDto {
|
||||
if (moduleKey == null || moduleKey.isBlank()) {
|
||||
|
|
@ -17,6 +21,42 @@ public record RuntimeProcessingModuleDescriptorDto(
|
|||
displayName = displayName == null || displayName.isBlank() ? moduleKey : displayName.trim();
|
||||
description = description == null ? "" : description;
|
||||
engine = engine == null || engine.isBlank() ? "UNKNOWN" : engine.trim();
|
||||
requiredModuleKeys = normalizeKeys(requiredModuleKeys);
|
||||
optionalModuleKeys = normalizeKeys(optionalModuleKeys);
|
||||
consumedStreams = consumedStreams == null ? Set.of() : Set.copyOf(consumedStreams);
|
||||
producedStreams = producedStreams == null ? Set.of() : Set.copyOf(producedStreams);
|
||||
version = version == null || version.isBlank() ? "1" : version.trim();
|
||||
}
|
||||
|
||||
public RuntimeProcessingModuleDescriptorDto(
|
||||
String moduleKey,
|
||||
String displayName,
|
||||
String description,
|
||||
String engine,
|
||||
Set<String> producedStreams
|
||||
) {
|
||||
this(moduleKey, displayName, description, engine, Set.of(), Set.of(), Set.of(), producedStreams, "1");
|
||||
}
|
||||
|
||||
public RuntimeProcessingModuleDescriptorDto(
|
||||
String moduleKey,
|
||||
String displayName,
|
||||
String description,
|
||||
String engine,
|
||||
Set<String> requiredModuleKeys,
|
||||
Set<String> consumedStreams,
|
||||
Set<String> producedStreams
|
||||
) {
|
||||
this(moduleKey, displayName, description, engine, requiredModuleKeys, Set.of(), consumedStreams, producedStreams, "1");
|
||||
}
|
||||
|
||||
private static Set<String> normalizeKeys(Set<String> keys) {
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
return Set.of();
|
||||
}
|
||||
return keys.stream()
|
||||
.filter(key -> key != null && !key.isBlank())
|
||||
.map(String::trim)
|
||||
.collect(java.util.stream.Collectors.toUnmodifiableSet());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -148,7 +148,7 @@ class RuntimeEventMixingServiceTest {
|
|||
"TACHOGRAPH:CARD_PLACE:1",
|
||||
EventDomain.PLACE,
|
||||
EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventLifecycle.SNAPSHOT,
|
||||
EventLifecycle.START,
|
||||
false
|
||||
);
|
||||
EventHubEventDto vuPlace = supportEvidence(
|
||||
|
|
@ -157,7 +157,7 @@ class RuntimeEventMixingServiceTest {
|
|||
"TACHOGRAPH:VU_PLACE:2",
|
||||
EventDomain.PLACE,
|
||||
EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventLifecycle.SNAPSHOT,
|
||||
EventLifecycle.START,
|
||||
true
|
||||
);
|
||||
EventHubEventDto cardBorder = supportEvidence(
|
||||
|
|
@ -188,6 +188,72 @@ class RuntimeEventMixingServiceTest {
|
|||
assertThat(mixed.eventMixingDecisions()).hasSize(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void suppressesVuPlaceDuplicateForFileSessionBeginLifecycle() {
|
||||
EventHubEventDto cardPlace = fileSessionSupportEvidence(
|
||||
"DRIVER_CARD",
|
||||
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-place-1:BEGIN:2026-04-01T00:00:00Z",
|
||||
EventDomain.PLACE,
|
||||
EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventLifecycle.BEGIN,
|
||||
false
|
||||
);
|
||||
EventHubEventDto vuPlace = fileSessionSupportEvidence(
|
||||
"VEHICLE_UNIT",
|
||||
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-place-1:BEGIN:2026-04-01T00:00:00Z",
|
||||
EventDomain.PLACE,
|
||||
EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventLifecycle.BEGIN,
|
||||
true
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(
|
||||
List.of(cardPlace, vuPlace),
|
||||
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
|
||||
);
|
||||
|
||||
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-place-1:BEGIN:2026-04-01T00:00:00Z");
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-place-1:BEGIN:2026-04-01T00:00:00Z");
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void treatsStartAndBeginAsEquivalentPlaceLifecycles() {
|
||||
EventHubEventDto cardPlace = supportEvidence(
|
||||
"CARD_PLACE",
|
||||
"DRIVER_CARD",
|
||||
"TACHOGRAPH:CARD_PLACE:START",
|
||||
EventDomain.PLACE,
|
||||
EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventLifecycle.START,
|
||||
false
|
||||
);
|
||||
EventHubEventDto vuPlace = supportEvidence(
|
||||
"VU_PLACE",
|
||||
"VEHICLE_UNIT",
|
||||
"TACHOGRAPH:VU_PLACE:BEGIN",
|
||||
EventDomain.PLACE,
|
||||
EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventLifecycle.BEGIN,
|
||||
true
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(
|
||||
List.of(cardPlace, vuPlace),
|
||||
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
|
||||
);
|
||||
|
||||
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:CARD_PLACE:START");
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:VU_PLACE:BEGIN");
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(1);
|
||||
assertThat(mixed.eventMixingDecisions().getFirst().ruleId())
|
||||
.isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY);
|
||||
}
|
||||
|
||||
@Test
|
||||
void suppressesTachographFileSessionVuPositionDuplicateWhenNoExtractionCodeIsPresent() {
|
||||
EventHubEventDto card = fileSessionSupportEvidence(
|
||||
|
|
@ -220,6 +286,143 @@ class RuntimeEventMixingServiceTest {
|
|||
.containsExactly("VU_POSITION");
|
||||
}
|
||||
|
||||
@Test
|
||||
void suppressesVuDuplicatesForAllAdditionalSupportEventTypes() {
|
||||
EventHubEventDto cardLoad = supportEvidence(
|
||||
"CARD_LOAD_UNLOAD", "DRIVER_CARD", "TACHOGRAPH:CARD_LOAD_UNLOAD:LOAD",
|
||||
EventDomain.LOAD_UNLOAD, EventType.LOAD, EventLifecycle.SNAPSHOT, false
|
||||
);
|
||||
EventHubEventDto vuLoad = supportEvidence(
|
||||
"VU_LOAD_UNLOAD", "VEHICLE_UNIT", "TACHOGRAPH:VU_LOAD_UNLOAD:LOAD",
|
||||
EventDomain.LOAD_UNLOAD, EventType.LOAD, EventLifecycle.SNAPSHOT, true
|
||||
);
|
||||
EventHubEventDto cardUnload = supportEvidence(
|
||||
"CARD_LOAD_UNLOAD", "DRIVER_CARD", "TACHOGRAPH:CARD_LOAD_UNLOAD:UNLOAD",
|
||||
EventDomain.LOAD_UNLOAD, EventType.UNLOAD, EventLifecycle.SNAPSHOT, false
|
||||
);
|
||||
EventHubEventDto vuUnload = supportEvidence(
|
||||
"VU_LOAD_UNLOAD", "VEHICLE_UNIT", "TACHOGRAPH:VU_LOAD_UNLOAD:UNLOAD",
|
||||
EventDomain.LOAD_UNLOAD, EventType.UNLOAD, EventLifecycle.SNAPSHOT, true
|
||||
);
|
||||
EventHubEventDto cardLoadUnload = supportEvidence(
|
||||
"CARD_LOAD_UNLOAD", "DRIVER_CARD", "TACHOGRAPH:CARD_LOAD_UNLOAD:LOAD_UNLOAD",
|
||||
EventDomain.LOAD_UNLOAD, EventType.LOAD_UNLOAD, EventLifecycle.SNAPSHOT, false
|
||||
);
|
||||
EventHubEventDto vuLoadUnload = supportEvidence(
|
||||
"VU_LOAD_UNLOAD", "VEHICLE_UNIT", "TACHOGRAPH:VU_LOAD_UNLOAD:LOAD_UNLOAD",
|
||||
EventDomain.LOAD_UNLOAD, EventType.LOAD_UNLOAD, EventLifecycle.SNAPSHOT, true
|
||||
);
|
||||
EventHubEventDto cardOut = supportEvidence(
|
||||
"CARD_SPECIFIC_CONDITION", "DRIVER_CARD", "TACHOGRAPH:CARD_SPECIFIC_CONDITION:OUT:BEGIN",
|
||||
EventDomain.SPECIFIC_CONDITION, EventType.OUT, EventLifecycle.BEGIN, false
|
||||
);
|
||||
EventHubEventDto vuOut = supportEvidence(
|
||||
"VU_SPECIFIC_CONDITION", "VEHICLE_UNIT", "TACHOGRAPH:VU_SPECIFIC_CONDITION:OUT:BEGIN",
|
||||
EventDomain.SPECIFIC_CONDITION, EventType.OUT, EventLifecycle.BEGIN, true
|
||||
);
|
||||
EventHubEventDto cardFerryTrain = supportEvidence(
|
||||
"CARD_SPECIFIC_CONDITION", "DRIVER_CARD", "TACHOGRAPH:CARD_SPECIFIC_CONDITION:FERRY_TRAIN:END",
|
||||
EventDomain.SPECIFIC_CONDITION, EventType.FERRY_TRAIN, EventLifecycle.END, false
|
||||
);
|
||||
EventHubEventDto vuFerryTrain = supportEvidence(
|
||||
"VU_SPECIFIC_CONDITION", "VEHICLE_UNIT", "TACHOGRAPH:VU_SPECIFIC_CONDITION:FERRY_TRAIN:END",
|
||||
EventDomain.SPECIFIC_CONDITION, EventType.FERRY_TRAIN, EventLifecycle.END, true
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(
|
||||
List.of(
|
||||
cardLoad, vuLoad,
|
||||
cardUnload, vuUnload,
|
||||
cardLoadUnload, vuLoadUnload,
|
||||
cardOut, vuOut,
|
||||
cardFerryTrain, vuFerryTrain
|
||||
),
|
||||
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
|
||||
);
|
||||
|
||||
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactlyInAnyOrder(
|
||||
"TACHOGRAPH:CARD_LOAD_UNLOAD:LOAD",
|
||||
"TACHOGRAPH:CARD_LOAD_UNLOAD:UNLOAD",
|
||||
"TACHOGRAPH:CARD_LOAD_UNLOAD:LOAD_UNLOAD",
|
||||
"TACHOGRAPH:CARD_SPECIFIC_CONDITION:OUT:BEGIN",
|
||||
"TACHOGRAPH:CARD_SPECIFIC_CONDITION:FERRY_TRAIN:END"
|
||||
);
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactlyInAnyOrder(
|
||||
"TACHOGRAPH:VU_LOAD_UNLOAD:LOAD",
|
||||
"TACHOGRAPH:VU_LOAD_UNLOAD:UNLOAD",
|
||||
"TACHOGRAPH:VU_LOAD_UNLOAD:LOAD_UNLOAD",
|
||||
"TACHOGRAPH:VU_SPECIFIC_CONDITION:OUT:BEGIN",
|
||||
"TACHOGRAPH:VU_SPECIFIC_CONDITION:FERRY_TRAIN:END"
|
||||
);
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(5);
|
||||
assertThat(mixed.eventMixingDecisions())
|
||||
.allSatisfy(decision -> assertThat(decision.secondaryExtractionCodes()).hasSize(1));
|
||||
assertThat(mixed.eventMixingDecisions())
|
||||
.extracting(decision -> decision.secondaryExtractionCodes().getFirst())
|
||||
.containsExactlyInAnyOrder(
|
||||
"VU_LOAD_UNLOAD", "VU_LOAD_UNLOAD", "VU_LOAD_UNLOAD",
|
||||
"VU_SPECIFIC_CONDITION", "VU_SPECIFIC_CONDITION"
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void suppressesFileSessionVuLoadUnloadAndSpecificConditionDuplicatesWithoutExtractionCodes() {
|
||||
EventHubEventDto cardLoad = fileSessionSupportEvidence(
|
||||
"DRIVER_CARD",
|
||||
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-load-1:SNAPSHOT:2026-04-01T00:00:00Z",
|
||||
EventDomain.LOAD_UNLOAD,
|
||||
EventType.LOAD,
|
||||
EventLifecycle.SNAPSHOT,
|
||||
false
|
||||
);
|
||||
EventHubEventDto vuLoad = fileSessionSupportEvidence(
|
||||
"VEHICLE_UNIT",
|
||||
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-load-1:SNAPSHOT:2026-04-01T00:00:00Z",
|
||||
EventDomain.LOAD_UNLOAD,
|
||||
EventType.LOAD,
|
||||
EventLifecycle.SNAPSHOT,
|
||||
true
|
||||
);
|
||||
EventHubEventDto cardOut = fileSessionSupportEvidence(
|
||||
"DRIVER_CARD",
|
||||
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-out-1:BEGIN:2026-04-01T00:00:00Z",
|
||||
EventDomain.SPECIFIC_CONDITION,
|
||||
EventType.OUT,
|
||||
EventLifecycle.BEGIN,
|
||||
false
|
||||
);
|
||||
EventHubEventDto vuOut = fileSessionSupportEvidence(
|
||||
"VEHICLE_UNIT",
|
||||
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-out-1:BEGIN:2026-04-01T00:00:00Z",
|
||||
EventDomain.SPECIFIC_CONDITION,
|
||||
EventType.OUT,
|
||||
EventLifecycle.BEGIN,
|
||||
true
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(
|
||||
List.of(cardLoad, vuLoad, cardOut, vuOut),
|
||||
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
|
||||
);
|
||||
|
||||
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactlyInAnyOrder(
|
||||
cardLoad.externalSourceEventId(),
|
||||
cardOut.externalSourceEventId()
|
||||
);
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactlyInAnyOrder(
|
||||
vuLoad.externalSourceEventId(),
|
||||
vuOut.externalSourceEventId()
|
||||
);
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(2);
|
||||
assertThat(mixed.eventMixingDecisions())
|
||||
.extracting(decision -> decision.secondaryExtractionCodes().getFirst())
|
||||
.containsExactlyInAnyOrder("VU_LOAD_UNLOAD", "VU_SPECIFIC_CONDITION");
|
||||
}
|
||||
|
||||
private EventHubEventDto activity(String extractionCode, String sourceKind, String externalId) {
|
||||
ObjectNode raw = baseRaw(extractionCode, sourceKind);
|
||||
raw.put("activityType", "BREAK_REST");
|
||||
|
|
@ -264,6 +467,9 @@ class RuntimeEventMixingServiceTest {
|
|||
raw.put("region", "W");
|
||||
raw.put("countryFrom", "AT");
|
||||
raw.put("countryTo", "DE");
|
||||
if (domain == EventDomain.LOAD_UNLOAD) {
|
||||
raw.put("operation", type.name());
|
||||
}
|
||||
ObjectNode payload = JsonNodeFactory.instance.objectNode();
|
||||
payload.set("raw", raw);
|
||||
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
|
||||
|
|
@ -271,6 +477,9 @@ class RuntimeEventMixingServiceTest {
|
|||
attributes.put("region", "W");
|
||||
attributes.put("countryFrom", "AT");
|
||||
attributes.put("countryTo", "DE");
|
||||
if (domain == EventDomain.LOAD_UNLOAD) {
|
||||
attributes.put("operation", type.name());
|
||||
}
|
||||
VehicleRefDto vehicleRef = withVin
|
||||
? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
|
||||
: new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"));
|
||||
|
|
@ -330,6 +539,9 @@ class RuntimeEventMixingServiceTest {
|
|||
raw.put("region", "W");
|
||||
raw.put("countryFrom", "AT");
|
||||
raw.put("countryTo", "DE");
|
||||
if (domain == EventDomain.LOAD_UNLOAD) {
|
||||
raw.put("operation", type.name());
|
||||
}
|
||||
ObjectNode payload = JsonNodeFactory.instance.objectNode();
|
||||
payload.set("raw", raw);
|
||||
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
|
||||
|
|
@ -337,6 +549,9 @@ class RuntimeEventMixingServiceTest {
|
|||
attributes.put("region", "W");
|
||||
attributes.put("countryFrom", "AT");
|
||||
attributes.put("countryTo", "DE");
|
||||
if (domain == EventDomain.LOAD_UNLOAD) {
|
||||
attributes.put("operation", type.name());
|
||||
}
|
||||
VehicleRefDto vehicleRef = withVin
|
||||
? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
|
||||
: new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"));
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.module;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
|
||||
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest;
|
||||
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
|
||||
|
|
@ -20,7 +21,57 @@ class RuntimeProcessingPipelineExecutorTest {
|
|||
);
|
||||
|
||||
Map<String, RuntimeProcessingModuleResult> results = executor.execute(
|
||||
new RuntimeProcessingExecutionApiRequest(
|
||||
request(),
|
||||
List.of("first", "second"),
|
||||
null
|
||||
);
|
||||
|
||||
assertThat(results).containsKeys("first", "second");
|
||||
assertThat(results.get("first").metadata()).containsEntry("marker", "first");
|
||||
assertThat(results.get("first").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY);
|
||||
assertThat(results.get("second").metadata()).containsEntry("marker", "second");
|
||||
assertThat(results.get("second").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY);
|
||||
assertThat(((Number) results.get("first").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())
|
||||
.isGreaterThanOrEqualTo(0L);
|
||||
assertThat(((Number) results.get("second").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())
|
||||
.isGreaterThanOrEqualTo(0L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void insertsRequiredModulesBeforeRequestedModule() {
|
||||
RuntimeProcessingPipelineExecutor executor = new RuntimeProcessingPipelineExecutor(
|
||||
new RuntimeProcessingModuleRegistry(List.of(
|
||||
new TestModule("assembly"),
|
||||
new TestModule("mixing", "assembly"),
|
||||
new TestModule("final", "mixing")
|
||||
))
|
||||
);
|
||||
|
||||
Map<String, RuntimeProcessingModuleResult> results = executor.execute(
|
||||
request(),
|
||||
List.of("final"),
|
||||
null
|
||||
);
|
||||
|
||||
assertThat(results.keySet()).containsExactly("assembly", "mixing", "final");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rejectsDependencyCyclesBeforeExecution() {
|
||||
RuntimeProcessingPipelineExecutor executor = new RuntimeProcessingPipelineExecutor(
|
||||
new RuntimeProcessingModuleRegistry(List.of(
|
||||
new TestModule("a", "b"),
|
||||
new TestModule("b", "a")
|
||||
))
|
||||
);
|
||||
|
||||
assertThatThrownBy(() -> executor.execute(request(), List.of("a"), null))
|
||||
.isInstanceOf(RuntimeProcessingDependencyCycleException.class)
|
||||
.hasMessageContaining("a -> b -> a");
|
||||
}
|
||||
|
||||
private RuntimeProcessingExecutionApiRequest request() {
|
||||
return new RuntimeProcessingExecutionApiRequest(
|
||||
"driver-working-time-v1",
|
||||
new at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest(
|
||||
java.util.UUID.randomUUID(),
|
||||
|
|
@ -62,28 +113,18 @@ class RuntimeProcessingPipelineExecutorTest {
|
|||
),
|
||||
List.of(),
|
||||
Map.of()
|
||||
),
|
||||
List.of("first", "second"),
|
||||
null
|
||||
);
|
||||
|
||||
assertThat(results).containsKeys("first", "second");
|
||||
assertThat(results.get("first").metadata()).containsEntry("marker", "first");
|
||||
assertThat(results.get("first").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY);
|
||||
assertThat(results.get("second").metadata()).containsEntry("marker", "second");
|
||||
assertThat(results.get("second").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY);
|
||||
assertThat(((Number) results.get("first").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())
|
||||
.isGreaterThanOrEqualTo(0L);
|
||||
assertThat(((Number) results.get("second").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())
|
||||
.isGreaterThanOrEqualTo(0L);
|
||||
}
|
||||
|
||||
private static final class TestModule implements RuntimeProcessingModule {
|
||||
|
||||
private final String key;
|
||||
private final java.util.Set<String> requiredKeys;
|
||||
|
||||
private TestModule(String key) {
|
||||
private TestModule(String key, String... requiredKeys) {
|
||||
this.key = key;
|
||||
this.requiredKeys = java.util.Arrays.stream(requiredKeys)
|
||||
.collect(java.util.stream.Collectors.toUnmodifiableSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -98,6 +139,8 @@ class RuntimeProcessingPipelineExecutorTest {
|
|||
key,
|
||||
"test",
|
||||
"JAVA",
|
||||
requiredKeys,
|
||||
java.util.Set.of(),
|
||||
java.util.Set.of()
|
||||
);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue