Refine runtime event assembly scope

This commit is contained in:
trifonovt 2026-06-15 11:14:17 +02:00
parent c066c5c777
commit c1b4847cf0
14 changed files with 192 additions and 32 deletions

View File

@ -22,6 +22,12 @@ POST /api/eventhub/runtime-processing/event-processing
but it should be treated as a legacy profile adapter. New clients should use `processingPlanKey`, not `profileKey`.
## Assembly terminology
`runtime-event-assembly` collects driver seed events and, when enabled, additional events for vehicles discovered in the driver scope. The resulting collection is an **aggregated runtime scope**. It is only canonically de-duplicated and ordered at this stage; semantic card/VU evidence mixing and vehicle-usage interval reconciliation are performed by later modules.
For API compatibility, `UnifiedRuntimeEventBundle` still exposes this collection as `mergedEvents`. New internal assembly-stage code uses the alias `aggregatedEvents`, and assembly-module metadata exposes `aggregatedEventCount` while retaining `mergedEventCount` as a compatibility alias.
## First predefined plan
The first predefined plan is:

View File

@ -129,7 +129,7 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
UnifiedRuntimeDriverWorkingTimeScopeResultDto result = new UnifiedRuntimeDriverWorkingTimeScopeResultDto(
broadBundle.request(),
broadBundle.mergedEvents().size(),
broadBundle.aggregatedEventCount(),
driverResults.size(),
broadBundle.discoveredVehicles().size(),
broadBundle.discoveredVehicles(),

View File

@ -89,7 +89,7 @@ public class EventEvidenceMixingModule implements RuntimeProcessingModule {
private List<EventHubEventDto> sourceEvents(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult assemblyResult = context.previousResults().get(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY);
if (assemblyResult != null && assemblyResult.output() instanceof UnifiedRuntimeEventBundle bundle) {
return bundle.mergedEvents();
return bundle.aggregatedEvents();
}
return context.events();
}

View File

@ -28,7 +28,7 @@ public class RuntimeEventAssemblyModule implements RuntimeProcessingModule {
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Runtime event assembly",
"Loads and merges canonical runtime events from the selected source scope before plan-specific processing.",
"Loads driver seed events and optionally aggregates additional events for discovered vehicles into a broad runtime scope before plan-specific mixing and reconciliation.",
"JAVA",
Set.of(),
Set.of("runtime source selection"),
@ -45,7 +45,9 @@ public class RuntimeEventAssemblyModule implements RuntimeProcessingModule {
metadata.put("vehicleExpansionPaddingMinutes", scopeRequest.vehicleExpansionPaddingMinutes() == null ? 0 : scopeRequest.vehicleExpansionPaddingMinutes());
metadata.put("driverSeedEventCount", bundle.driverSeedEvents().size());
metadata.put("expandedVehicleEventCount", bundle.expandedVehicleEvents().size());
metadata.put("mergedEventCount", bundle.mergedEvents().size());
metadata.put("aggregatedEventCount", bundle.aggregatedEventCount());
// Compatibility alias retained for existing API clients and stored execution metadata.
metadata.put("mergedEventCount", bundle.aggregatedEventCount());
metadata.put("discoveredVehicleCount", bundle.discoveredVehicles().size());
return new RuntimeProcessingModuleResult(
moduleKey(),

View File

@ -141,7 +141,9 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
.mapToInt(partition -> partition.attachedVehicleEvidenceEvents().size())
.sum());
metadata.put("partitionSourceEventCount", partitionSourceEvents.size());
metadata.put("rawMergedEventCount", broadBundle.mergedEvents().size());
metadata.put("rawAggregatedEventCount", broadBundle.aggregatedEventCount());
// Compatibility alias retained for existing execution metadata consumers.
metadata.put("rawMergedEventCount", broadBundle.aggregatedEventCount());
return new RuntimeProcessingModuleResult(
moduleKey(),
RuntimeProcessingModuleStatus.SUCCESS,

View File

@ -34,7 +34,7 @@ public final class DriverWorkingTimeEplEventMapper {
public static List<EventHubEventDto> sourceEvents(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult assemblyResult = context.previousResults().get(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY);
if (assemblyResult != null && assemblyResult.output() instanceof UnifiedRuntimeEventBundle bundle) {
return safeList(bundle.mergedEvents());
return safeList(bundle.aggregatedEvents());
}
return safeList(context.events());
}

View File

@ -116,7 +116,7 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
descriptors.add(new RuntimeProcessingModuleDescriptorDto(
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
"Runtime event assembly",
"Loads and merges canonical runtime events from the selected source scope before plan-specific processing.",
"Loads driver seed events and optionally aggregates additional events for discovered vehicles into a broad runtime scope before plan-specific mixing and reconciliation.",
"JAVA",
Set.of(),
Set.of("runtime source selection"),

View File

@ -1,8 +1,18 @@
package at.procon.eventhub.processing.model;
import at.procon.eventhub.dto.EventHubEventDto;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
/**
* Runtime event scope assembled for subsequent processing modules.
*
* <p>The legacy {@code mergedEvents} component contains the event aggregation represented by this
* bundle. For the assembly-stage bundle, this is the broad scope of driver seed events and optionally
* expanded vehicle events after canonical de-duplication and ordering; it is not the later semantic
* card/VU mixing or interval-reconciliation result. New assembly-stage code should prefer
* {@link #aggregatedEvents()}.
*/
public record UnifiedRuntimeEventBundle(
UnifiedRuntimeProcessingRequest request,
List<EventHubEventDto> driverSeedEvents,
@ -18,4 +28,19 @@ public record UnifiedRuntimeEventBundle(
mergedEvents = mergedEvents == null ? List.of() : List.copyOf(mergedEvents);
notes = notes == null ? List.of() : List.copyOf(notes);
}
/**
* Preferred terminology for the broad event scope assembled before plan-specific mixing.
*
* @return the same immutable list exposed by the legacy {@link #mergedEvents()} accessor
*/
@JsonIgnore
public List<EventHubEventDto> aggregatedEvents() {
return mergedEvents;
}
@JsonIgnore
public int aggregatedEventCount() {
return mergedEvents.size();
}
}

View File

@ -50,7 +50,7 @@ public class RuntimeDriverWorkingTimeScopeProcessingService {
) {
UnifiedRuntimeProcessingRequest request = apiRequest.toRuntimeRequest();
UnifiedRuntimeEventBundle broadBundle = eventAssemblyService.assembleDriverScopedEvents(request);
LinkedHashSet<String> selectedDriverKeys = selectedDriverKeys(request, broadBundle.mergedEvents());
LinkedHashSet<String> selectedDriverKeys = selectedDriverKeys(request, broadBundle.aggregatedEvents());
if (selectedDriverKeys.isEmpty()) {
throw new IllegalArgumentException("No driver partitions could be resolved from the runtime event scope.");
}
@ -107,7 +107,7 @@ public class RuntimeDriverWorkingTimeScopeProcessingService {
return new UnifiedRuntimeDriverWorkingTimeScopeResultDto(
request,
broadBundle.mergedEvents().size(),
broadBundle.aggregatedEventCount(),
driverResults.size(),
broadBundle.discoveredVehicles().size(),
broadBundle.discoveredVehicles(),
@ -151,13 +151,13 @@ public class RuntimeDriverWorkingTimeScopeProcessingService {
String driverKey,
boolean includePartitionDebug
) {
List<EventHubEventDto> directDriverEvents = broadBundle.mergedEvents().stream()
List<EventHubEventDto> directDriverEvents = broadBundle.aggregatedEvents().stream()
.filter(event -> Objects.equals(driverKey(event), driverKey))
.toList();
RuntimeDriverVehicleEvidenceAttachmentResult attachmentResult = vehicleEvidenceAttachmentService.attachVehicleEvidence(
driverKey,
directDriverEvents,
broadBundle.mergedEvents(),
broadBundle.aggregatedEvents(),
request.expandVehicleEvents(),
request.vehicleExpansionPaddingMinutes(),
includePartitionDebug

View File

@ -83,11 +83,11 @@ public class UnifiedRuntimeDerivedProjectionService {
String explicitDriverKey
) {
String driverKey = explicitDriverKey == null
? resolveDriverKey(request, eventBundle.mergedEvents())
? resolveDriverKey(request, eventBundle.aggregatedEvents())
: explicitDriverKey;
RuntimeSupportEvidenceNormalizationResult normalizationResult = supportEvidenceNormalizer.normalizeForDriverWorkingTime(
driverKey,
eventBundle.mergedEvents()
eventBundle.aggregatedEvents()
);
List<EventHubEventDto> normalizedEvents = normalizationResult.normalizedEvents();
RuntimeDriverTimeline timeline = timelineReconstructor.reconstruct(
@ -169,7 +169,7 @@ public class UnifiedRuntimeDerivedProjectionService {
eventBundle.driverSeedEvents().size(),
eventBundle.discoveredVehicles().size(),
eventBundle.expandedVehicleEvents().size(),
eventBundle.mergedEvents().size(),
eventBundle.aggregatedEventCount(),
eventBundle.discoveredVehicles(),
projection,
notes,

View File

@ -26,8 +26,8 @@ public class UnifiedRuntimeDriverTimelineService {
UnifiedRuntimeEventBundle bundle = runtimeEventAssemblyService.assembleDriverScopedEvents(request);
return timelineReconstructor.reconstruct(
null,
resolveDriverKey(request, bundle.mergedEvents()),
bundle.mergedEvents()
resolveDriverKey(request, bundle.aggregatedEvents()),
bundle.aggregatedEvents()
);
}

View File

@ -42,7 +42,7 @@ public class UnifiedRuntimeEventAssemblyService {
List<EventHubEventDto> expandedVehicleEvents = expandVehicleEvents
? loadExpandedVehicleEvents(request, discoveredVehicles)
: List.of();
List<EventHubEventDto> mergedEvents = expandVehicleEvents
List<EventHubEventDto> aggregatedEvents = expandVehicleEvents
? deduplicateAndSort(driverSeedEvents, expandedVehicleEvents)
: driverSeedEvents;
@ -71,26 +71,27 @@ public class UnifiedRuntimeEventAssemblyService {
}
}
if (expandVehicleEvents) {
notes.add("Vehicle expansion loaded additional events for vehicles discovered in the driver seed set.");
notes.add("Vehicle expansion aggregated additional events for vehicles discovered in the driver seed set.");
notes.add("Vehicle expansion padding minutes: " + request.vehicleExpansionPaddingMinutes() + ".");
} else {
notes.add("Vehicle expansion was disabled for this runtime request.");
}
notes.add("The assembled event set is a broad aggregated runtime scope; semantic card/VU mixing and interval reconciliation are performed by later modules.");
LOG.info(
"Runtime event assembly completed (expandVehicleEvents: {}, sourceInputs: {}, driverSeedEvents: {}, discoveredVehicles: {}, expandedVehicleEvents: {}, mergedEvents: {})",
"Runtime event assembly completed (expandVehicleEvents: {}, sourceInputs: {}, driverSeedEvents: {}, discoveredVehicles: {}, expandedVehicleEvents: {}, aggregatedEvents: {})",
expandVehicleEvents,
sourceInputs.size(),
driverSeedEvents.size(),
discoveredVehicles.size(),
expandedVehicleEvents.size(),
mergedEvents.size()
aggregatedEvents.size()
);
return new UnifiedRuntimeEventBundle(
request,
driverSeedEvents,
discoveredVehicles,
expandedVehicleEvents,
mergedEvents,
aggregatedEvents,
notes
);
}

View File

@ -0,0 +1,118 @@
package at.procon.eventhub.processing.eventprocessing.module;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.service.UnifiedRuntimeEventAssemblyService;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class RuntimeEventAssemblyModuleTest {
@Test
void exposesAggregatedTerminologyAndKeepsMergedCountCompatibilityAlias() {
UnifiedRuntimeEventAssemblyService service =
org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class);
RuntimeEventAssemblyModule module = new RuntimeEventAssemblyModule(service);
UnifiedRuntimeProcessingApiRequest scope = scopeRequest();
EventHubEventDto first = event("EVENT-1", "2026-05-01T08:00:00Z");
EventHubEventDto second = event("EVENT-2", "2026-05-01T09:00:00Z");
UnifiedRuntimeEventBundle bundle = new UnifiedRuntimeEventBundle(
scope.toRuntimeRequest(),
List.of(first),
List.of(),
List.of(second),
List.of(first, second),
List.of("assembled")
);
when(service.assembleDriverScopedEvents(any())).thenReturn(bundle);
RuntimeProcessingModuleResult result = module.execute(new RuntimeProcessingModuleContext(
new RuntimeProcessingExecutionApiRequest(
"driver-working-time-v1",
scope,
null,
List.of(),
Map.of()
),
List.of(),
Map.of("runtimeScopeApiRequest", scope),
Map.of()
));
assertThat(module.descriptor().description())
.contains("aggregates additional events")
.contains("mixing and reconciliation");
assertThat(result.output()).isSameAs(bundle);
assertThat(result.metadata())
.containsEntry("aggregatedEventCount", 2)
.containsEntry("mergedEventCount", 2);
}
private UnifiedRuntimeProcessingApiRequest scopeRequest() {
return new UnifiedRuntimeProcessingApiRequest(
UUID.randomUUID(),
List.of(),
null,
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
null,
"12:123",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
15,
true,
null,
null,
null,
null,
List.of()
);
}
private EventHubEventDto event(String externalId, String occurredAt) {
OffsetDateTime timestamp = OffsetDateTime.parse(occurredAt);
return new EventHubEventDto(
UUID.randomUUID(),
externalId,
new DriverRefDto("12:123", null),
null,
timestamp,
null,
timestamp,
EventDomain.DRIVER_ACTIVITY,
EventType.DRIVE,
EventLifecycle.START,
null,
null,
null,
null,
null,
false,
null
);
}
}

View File

@ -64,8 +64,14 @@ class UnifiedRuntimeEventAssemblyServiceTest {
assertThat(bundle.discoveredVehicles()).extracting(UnifiedDiscoveredVehicleRef::stableKey)
.containsExactly("SOURCE_VEHICLE:VEH-1", "VIN:VIN-2");
assertThat(bundle.expandedVehicleEvents()).hasSize(2);
assertThat(bundle.mergedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId)
assertThat(bundle.aggregatedEvents()).hasSize(3);
assertThat(bundle.aggregatedEventCount()).isEqualTo(3);
assertThat(bundle.mergedEvents()).isSameAs(bundle.aggregatedEvents());
assertThat(bundle.notes()).anySatisfy(note -> assertThat(note)
.contains("broad aggregated runtime scope")
.contains("mixing")
.contains("reconciliation"));
assertThat(bundle.aggregatedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("SEED-1", "VEHICLE-EXPANDED", "SEED-2");
}
@ -93,7 +99,7 @@ class UnifiedRuntimeEventAssemblyServiceTest {
assertThat(bundle.driverSeedEvents()).hasSize(2);
assertThat(bundle.discoveredVehicles()).hasSize(2);
assertThat(bundle.expandedVehicleEvents()).isEmpty();
assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId)
assertThat(bundle.aggregatedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("SEED-1", "SEED-2");
}
@ -119,10 +125,10 @@ class UnifiedRuntimeEventAssemblyServiceTest {
);
assertThat(bundle.driverSeedEvents()).hasSize(1);
assertThat(bundle.mergedEvents()).hasSize(1);
assertThat(bundle.mergedEvents().get(0).occurredAt()).isEqualTo(OffsetDateTime.parse("2026-05-01T08:00:00Z"));
assertThat(bundle.mergedEvents().get(0).eventType()).isEqualTo(EventType.DRIVE);
assertThat(bundle.mergedEvents().get(0).lifecycle()).isEqualTo(EventLifecycle.START);
assertThat(bundle.aggregatedEvents()).hasSize(1);
assertThat(bundle.aggregatedEvents().get(0).occurredAt()).isEqualTo(OffsetDateTime.parse("2026-05-01T08:00:00Z"));
assertThat(bundle.aggregatedEvents().get(0).eventType()).isEqualTo(EventType.DRIVE);
assertThat(bundle.aggregatedEvents().get(0).lifecycle()).isEqualTo(EventLifecycle.START);
}
@Test
@ -166,8 +172,8 @@ class UnifiedRuntimeEventAssemblyServiceTest {
);
assertThat(bundle.driverSeedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId)
assertThat(bundle.aggregatedEvents()).hasSize(3);
assertThat(bundle.aggregatedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("FILE-SESSION-1", "TACHO-DB-1", "YF-DB-1");
}
@ -231,8 +237,8 @@ class UnifiedRuntimeEventAssemblyServiceTest {
);
assertThat(bundle.driverSeedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId)
assertThat(bundle.aggregatedEvents()).hasSize(3);
assertThat(bundle.aggregatedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("FILE-SESSION-1", "TACHO-ACT-1", "YF-DB-1");
assertThat(bundle.notes()).anySatisfy(note -> assertThat(note).contains("mixed runtime scope"));
}