Fix runtime processing build regressions

This commit is contained in:
trifonovt 2026-05-26 13:17:46 +02:00
parent 27b411e647
commit 5c887e8cb2
4 changed files with 256 additions and 14 deletions

View File

@ -91,7 +91,7 @@ public class RuntimeDriverVehicleEvidenceAttachmentService {
? usageIntervals.stream().map(RuntimeVehicleUsageIntervalDebugDto::from).toList() ? usageIntervals.stream().map(RuntimeVehicleUsageIntervalDebugDto::from).toList()
: List.of(); : List.of();
List<RuntimeVehicleEvidenceAttachmentDecisionDto> decisions = includeDebug List<RuntimeVehicleEvidenceAttachmentDecisionDto> decisions = includeDebug
? directDriverDecisions(safeDriverEvents) ? new ArrayList<>(directDriverDecisions(safeDriverEvents))
: new ArrayList<>(); : new ArrayList<>();
List<EventHubEventDto> candidateVehicleEvidence = safeScopeEvents.stream() List<EventHubEventDto> candidateVehicleEvidence = safeScopeEvents.stream()
.filter(event -> scopeClassifier.classify(event) == RuntimeEventScopeType.VEHICLE_SCOPED) .filter(event -> scopeClassifier.classify(event) == RuntimeEventScopeType.VEHICLE_SCOPED)

View File

@ -34,7 +34,7 @@ public final class TachographNationRegistry {
continue; continue;
} }
String[] parts = line.split(";", -1); String[] parts = line.split(";", -1);
if (parts.length < 5) { if (parts.length < 4) {
continue; continue;
} }
Integer numericCode = parseInteger(parts[3]); Integer numericCode = parseInteger(parts[3]);
@ -43,7 +43,7 @@ public final class TachographNationRegistry {
} }
String name = normalizeNullable(parts[1]); String name = normalizeNullable(parts[1]);
String alphaCode = normalizeAlpha(parts[2]); String alphaCode = normalizeAlpha(parts[2]);
String defaultLanguageCode = normalizeNullable(parts[4]); String defaultLanguageCode = parts.length > 4 ? normalizeNullable(parts[4]) : null;
NationRecord record = new NationRecord(numericCode, alphaCode, name, defaultLanguageCode, true); NationRecord record = new NationRecord(numericCode, alphaCode, name, defaultLanguageCode, true);
byNumeric.put(numericCode, record); byNumeric.put(numericCode, record);
if (alphaCode != null) { if (alphaCode != null) {

View File

@ -61,23 +61,36 @@ public class DriverTimelineReusableProjectionBuilder {
private final DriverTimelineBuilder driverTimelineBuilder; private final DriverTimelineBuilder driverTimelineBuilder;
private final RawSourceDriverTimelineEventBuilder rawSourceEventBuilder; private final RawSourceDriverTimelineEventBuilder rawSourceEventBuilder;
private final UnifiedEventTimelineReconstructor timelineReconstructor;
private final EventHubProperties properties; private final EventHubProperties properties;
public DriverTimelineReusableProjectionBuilder( public DriverTimelineReusableProjectionBuilder(
DriverTimelineBuilder driverTimelineBuilder, DriverTimelineBuilder driverTimelineBuilder,
EventHubProperties properties EventHubProperties properties
) { ) {
this(driverTimelineBuilder, null, properties); this(driverTimelineBuilder, null, new UnifiedEventTimelineReconstructor(), properties);
}
public DriverTimelineReusableProjectionBuilder(
DriverTimelineBuilder driverTimelineBuilder,
RawSourceDriverTimelineEventBuilder rawSourceEventBuilder,
EventHubProperties properties
) {
this(driverTimelineBuilder, rawSourceEventBuilder, new UnifiedEventTimelineReconstructor(), properties);
} }
@Autowired @Autowired
public DriverTimelineReusableProjectionBuilder( public DriverTimelineReusableProjectionBuilder(
DriverTimelineBuilder driverTimelineBuilder, DriverTimelineBuilder driverTimelineBuilder,
RawSourceDriverTimelineEventBuilder rawSourceEventBuilder, RawSourceDriverTimelineEventBuilder rawSourceEventBuilder,
UnifiedEventTimelineReconstructor timelineReconstructor,
EventHubProperties properties EventHubProperties properties
) { ) {
this.driverTimelineBuilder = driverTimelineBuilder; this.driverTimelineBuilder = driverTimelineBuilder;
this.rawSourceEventBuilder = rawSourceEventBuilder; this.rawSourceEventBuilder = rawSourceEventBuilder;
this.timelineReconstructor = timelineReconstructor == null
? new UnifiedEventTimelineReconstructor()
: timelineReconstructor;
this.properties = properties; this.properties = properties;
} }
@ -119,16 +132,35 @@ public class DriverTimelineReusableProjectionBuilder {
} }
if (drivingDerivedProjectionInputMode() == EventHubProperties.DrivingDerivedProjectionInputMode.EVENTS) { if (drivingDerivedProjectionInputMode() == EventHubProperties.DrivingDerivedProjectionInputMode.EVENTS) {
List<EventHubEventDto> events = new ArrayList<>(); List<Map<String, Object>> activityInputEvents = new ArrayList<>();
List<Map<String, Object>> vehicleUsageInputEvents = new ArrayList<>();
List<Map<String, Object>> supportGeoInputEvents = new ArrayList<>();
for (DriverExtractionSession driverSession : session.driversByKey().values()) { for (DriverExtractionSession driverSession : session.driversByKey().values()) {
if (driverSession != null && driverSession.driverKey() != null) { if (driverSession == null || driverSession.driverKey() == null) {
events.addAll(rawSourceEventBuilder().buildRawEventBundle(session, driverSession).allEvents()); continue;
} }
ResolvedDriverTimeline timeline = reconstructMergedTimelineFromEvents(
session.sessionId(),
driverSession.driverKey(),
rawSourceEventBuilder().buildRawEventBundle(session, driverSession).allEvents()
);
if (timeline == null) {
continue;
}
activityInputEvents.addAll(buildActivityIntervalInputEvents(
session.sessionId(),
driverSession.driverKey(),
timeline.activityIntervals()
));
vehicleUsageInputEvents.addAll(buildVehicleUsageIntervalInputEvents(timeline.vehicleUsageIntervals()));
supportGeoInputEvents.addAll(buildSupportGeoInputEvents(session.sessionId(), timeline.supportEvents()));
} }
return buildEsperDrivingDerivedProjectionBundleFromEvents(
session.sessionId(), return buildEsperDrivingDerivedProjectionBundle(
null, activityInputEvents,
events, vehicleUsageInputEvents,
supportGeoInputEvents,
significantDrivingMinutes, significantDrivingMinutes,
minimumRestPeriodMinutes minimumRestPeriodMinutes
); );
@ -303,6 +335,23 @@ public class DriverTimelineReusableProjectionBuilder {
int significantDrivingMinutes, int significantDrivingMinutes,
int minimumRestPeriodMinutes int minimumRestPeriodMinutes
) { ) {
if (fallbackDriverKey == null) {
Map<String, List<EventHubEventDto>> eventsByDriver = groupEventsByDriverKey(events);
if (eventsByDriver.size() > 1) {
return buildEsperDrivingDerivedProjectionBundleFromGroupedEvents(
fallbackSessionId,
eventsByDriver,
significantDrivingMinutes,
minimumRestPeriodMinutes
);
}
if (eventsByDriver.size() == 1) {
Map.Entry<String, List<EventHubEventDto>> onlyDriver = eventsByDriver.entrySet().iterator().next();
fallbackDriverKey = onlyDriver.getKey();
events = onlyDriver.getValue();
}
}
ResolvedDriverTimeline reconstructedTimeline = reconstructMergedTimelineFromEvents( ResolvedDriverTimeline reconstructedTimeline = reconstructMergedTimelineFromEvents(
fallbackSessionId, fallbackSessionId,
fallbackDriverKey, fallbackDriverKey,
@ -317,12 +366,73 @@ public class DriverTimelineReusableProjectionBuilder {
); );
} }
private TachographEsperDrivingDerivedProjectionBundle buildEsperDrivingDerivedProjectionBundleFromGroupedEvents(
UUID fallbackSessionId,
Map<String, List<EventHubEventDto>> eventsByDriver,
int significantDrivingMinutes,
int minimumRestPeriodMinutes
) {
List<Map<String, Object>> activityInputEvents = new ArrayList<>();
List<Map<String, Object>> vehicleUsageInputEvents = new ArrayList<>();
List<Map<String, Object>> supportGeoInputEvents = new ArrayList<>();
UUID sessionId = fallbackSessionId == null ? new UUID(0L, 0L) : fallbackSessionId;
for (Map.Entry<String, List<EventHubEventDto>> entry : eventsByDriver.entrySet()) {
String driverKey = entry.getKey();
if (driverKey == null || driverKey.isBlank()) {
continue;
}
ResolvedDriverTimeline timeline = reconstructMergedTimelineFromEvents(
sessionId,
driverKey,
entry.getValue()
);
if (timeline == null) {
continue;
}
activityInputEvents.addAll(buildActivityIntervalInputEvents(
sessionId,
driverKey,
timeline.activityIntervals()
));
vehicleUsageInputEvents.addAll(buildVehicleUsageIntervalInputEvents(timeline.vehicleUsageIntervals()));
supportGeoInputEvents.addAll(buildSupportGeoInputEvents(sessionId, timeline.supportEvents()));
}
return buildEsperDrivingDerivedProjectionBundle(
activityInputEvents,
vehicleUsageInputEvents,
supportGeoInputEvents,
significantDrivingMinutes,
minimumRestPeriodMinutes
);
}
private Map<String, List<EventHubEventDto>> groupEventsByDriverKey(List<EventHubEventDto> events) {
Map<String, List<EventHubEventDto>> grouped = new LinkedHashMap<>();
for (EventHubEventDto event : safeList(events)) {
String driverKey = eventDriverKey(event);
if (driverKey == null || driverKey.isBlank()) {
continue;
}
grouped.computeIfAbsent(driverKey, ignored -> new ArrayList<>()).add(event);
}
return grouped;
}
private String eventDriverKey(EventHubEventDto event) {
if (event == null) {
return null;
}
JsonNode raw = rawPayload(event);
return firstNonBlank(text(raw, "driverKey"), driverKey(event));
}
private ResolvedDriverTimeline reconstructMergedTimelineFromEvents( private ResolvedDriverTimeline reconstructMergedTimelineFromEvents(
UUID fallbackSessionId, UUID fallbackSessionId,
String fallbackDriverKey, String fallbackDriverKey,
List<EventHubEventDto> events List<EventHubEventDto> events
) { ) {
UnifiedEventTimelineReconstructor timelineReconstructor = new UnifiedEventTimelineReconstructor();
ResolvedDriverTimeline reconstructed = timelineReconstructor.reconstruct( ResolvedDriverTimeline reconstructed = timelineReconstructor.reconstruct(
fallbackSessionId == null ? new UUID(0L, 0L) : fallbackSessionId, fallbackSessionId == null ? new UUID(0L, 0L) : fallbackSessionId,
fallbackDriverKey, fallbackDriverKey,
@ -365,7 +475,7 @@ public class DriverTimelineReusableProjectionBuilder {
current.from(), current.from(),
mergedTo(current.to(), next.to()), mergedTo(current.to(), next.to()),
current.odometerBeginKm(), current.odometerBeginKm(),
next.odometerEndKm() != null ? next.odometerEndKm() : current.odometerEndKm(), mergedOdometerEnd(current, next),
current.registrationKey(), current.registrationKey(),
current.vehicleKey(), current.vehicleKey(),
sourceKind, sourceKind,
@ -387,6 +497,25 @@ public class DriverTimelineReusableProjectionBuilder {
&& !right.from().isAfter(mergeBoundary(left.to())); && !right.from().isAfter(mergeBoundary(left.to()));
} }
private Long mergedOdometerEnd(
ResolvedVehicleUsageInterval current,
ResolvedVehicleUsageInterval next
) {
if (current == null) {
return next == null ? null : next.odometerEndKm();
}
if (next == null) {
return current.odometerEndKm();
}
if (current.to() == null || next.to() == null) {
return next.odometerEndKm() != null ? next.odometerEndKm() : current.odometerEndKm();
}
if (next.to().isAfter(current.to()) || next.to().isEqual(current.to())) {
return next.odometerEndKm() != null ? next.odometerEndKm() : current.odometerEndKm();
}
return current.odometerEndKm() != null ? current.odometerEndKm() : next.odometerEndKm();
}
private OffsetDateTime mergedTo(OffsetDateTime left, OffsetDateTime right) { private OffsetDateTime mergedTo(OffsetDateTime left, OffsetDateTime right) {
if (left == null || right == null) { if (left == null || right == null) {
return null; return null;

View File

@ -21,6 +21,9 @@ import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingR
import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingProfileDescriptorDto; import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingProfileDescriptorDto;
import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingPartitionResultDto; import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingPartitionResultDto;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionResultDto;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionService;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingPlanDescriptorDto;
import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographDriverParityResultDto; import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographDriverParityResultDto;
import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographParityCategoryComparisonDto; import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographParityCategoryComparisonDto;
import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographParityValidationResultDto; import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographParityValidationResultDto;
@ -267,6 +270,114 @@ class UnifiedRuntimeProcessingControllerTest {
} }
@Test
void listsRuntimeProcessingPlansViaRuntimeApi() throws Exception {
UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class);
UnifiedRuntimeDriverTimelineService timelineService = org.mockito.Mockito.mock(UnifiedRuntimeDriverTimelineService.class);
UnifiedRuntimeDerivedProjectionService derivedProjectionService = org.mockito.Mockito.mock(UnifiedRuntimeDerivedProjectionService.class);
RuntimeProcessingExecutionService executionService = org.mockito.Mockito.mock(RuntimeProcessingExecutionService.class);
MockMvc mockMvc = MockMvcBuilders.standaloneSetup(new UnifiedRuntimeProcessingController(
eventAssemblyService,
timelineService,
derivedProjectionService,
null,
null,
null,
null,
executionService
))
.setMessageConverters(new MappingJackson2HttpMessageConverter(objectMapper))
.setControllerAdvice(new UnifiedRuntimeProcessingExceptionHandler())
.build();
when(executionService.listPlans())
.thenReturn(List.of(new RuntimeProcessingPlanDescriptorDto(
"driver-working-time-v1",
Set.of("tachograph-driver-esper-v1"),
"Driver working-time and tachograph-derived processing",
"Runs common runtime event processing modules.",
RuntimeEventPartitioningStrategy.DRIVER,
List.of(RuntimeEventPartitioningStrategy.DRIVER),
List.of(),
Set.of(),
Set.of("significantDrivingMinutes")
)));
mockMvc.perform(get("/api/eventhub/runtime-processing/executions/plans"))
.andExpect(status().isOk())
.andExpect(jsonPath("$[0].processingPlanKey").value("driver-working-time-v1"))
.andExpect(jsonPath("$[0].aliases[0]").value("tachograph-driver-esper-v1"));
}
@Test
void runsRuntimeProcessingExecutionViaRuntimeApi() throws Exception {
UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class);
UnifiedRuntimeDriverTimelineService timelineService = org.mockito.Mockito.mock(UnifiedRuntimeDriverTimelineService.class);
UnifiedRuntimeDerivedProjectionService derivedProjectionService = org.mockito.Mockito.mock(UnifiedRuntimeDerivedProjectionService.class);
RuntimeProcessingExecutionService executionService = org.mockito.Mockito.mock(RuntimeProcessingExecutionService.class);
MockMvc mockMvc = MockMvcBuilders.standaloneSetup(new UnifiedRuntimeProcessingController(
eventAssemblyService,
timelineService,
derivedProjectionService,
null,
null,
null,
null,
executionService
))
.setMessageConverters(new MappingJackson2HttpMessageConverter(objectMapper))
.setControllerAdvice(new UnifiedRuntimeProcessingExceptionHandler())
.build();
UUID sessionId = UUID.randomUUID();
UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forTachographFileSession(
sessionId,
"12:123",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
true,
0
);
when(executionService.execute(any()))
.thenReturn(new RuntimeProcessingExecutionResultDto(
"driver-working-time-v1",
List.of("event-to-activity-intervals"),
RuntimeEventPartitioningStrategy.DRIVER,
request,
3,
1,
1,
List.of(new UnifiedDiscoveredVehicleRef("VEH-1", "VIN-1", "12", "REG-1")),
Map.of(),
List.of("execution"),
List.of()
));
mockMvc.perform(post("/api/eventhub/runtime-processing/executions")
.contentType("application/json")
.content("""
{
"processingPlanKey": "driver-working-time-v1",
"sourceSelection": {
"sessionId": "%s",
"sourceFamilies": ["TACHOGRAPH_FILE_SESSION"],
"driverKey": "12:123",
"occurredFrom": "2026-05-01T08:00:00Z",
"occurredTo": "2026-05-01T10:00:00Z"
},
"partitioning": {
"strategy": "DRIVER"
}
}
""".formatted(sessionId)))
.andExpect(status().isOk())
.andExpect(jsonPath("$.processingPlanKey").value("driver-working-time-v1"))
.andExpect(jsonPath("$.executedModules[0]").value("event-to-activity-intervals"))
.andExpect(jsonPath("$.partitioningStrategy").value("DRIVER"));
}
@Test @Test
void listsRuntimeEventProcessingProfilesViaRuntimeApi() throws Exception { void listsRuntimeEventProcessingProfilesViaRuntimeApi() throws Exception {
UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class); UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class);
@ -465,7 +576,9 @@ class UnifiedRuntimeProcessingControllerTest {
derivedProjectionService, derivedProjectionService,
null, null,
runtimeEventProcessingService, runtimeEventProcessingService,
parityValidationService parityValidationService,
null,
null
)) ))
.setMessageConverters(new MappingJackson2HttpMessageConverter(objectMapper)) .setMessageConverters(new MappingJackson2HttpMessageConverter(objectMapper))
.setControllerAdvice(new UnifiedRuntimeProcessingExceptionHandler()) .setControllerAdvice(new UnifiedRuntimeProcessingExceptionHandler())