From 6ba2df1a619a26f74ae4c08d56cbf6c95411ef28 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Fri, 5 Jun 2026 12:09:35 +0200 Subject: [PATCH] Add mixed-backend runtime source inputs --- .../runtime-driver-working-time-processing.md | 28 +- docs/runtime-event-processing.md | 27 +- ...ntime-tachograph-esper-scope-processing.md | 2 + ...e-event-processing.postman_collection.json | 46 ++- .../UnifiedRuntimeProcessingApiRequest.java | 64 +++- .../UnifiedRuntimeSourceInputApiRequest.java | 25 ++ .../SupportEvidenceNormalizationModule.java | 4 +- ...riverWorkingTimeRuntimeProcessingPlan.java | 3 +- .../UnifiedRuntimeProcessingRequest.java | 249 +++++++++++++- .../model/UnifiedRuntimeSourceInput.java | 53 +++ ...nifiedRuntimeDerivedProjectionService.java | 4 +- .../UnifiedRuntimeEventAssemblyService.java | 77 +++-- ...nifiedRuntimeProcessingControllerTest.java | 86 +++++ .../UnifiedRuntimeProcessingRequestTest.java | 134 ++++++++ ...nifiedRuntimeEventAssemblyServiceTest.java | 305 ++++++++++++++++++ 15 files changed, 1044 insertions(+), 63 deletions(-) create mode 100644 src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeSourceInputApiRequest.java create mode 100644 src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeSourceInput.java diff --git a/docs/runtime-driver-working-time-processing.md b/docs/runtime-driver-working-time-processing.md index 532a324..96b5619 100644 --- a/docs/runtime-driver-working-time-processing.md +++ b/docs/runtime-driver-working-time-processing.md @@ -12,10 +12,36 @@ Use: ```json { - "processingPlanKey": "driver-working-time-v1" + "processingPlanKey": "driver-working-time-v1", + "sourceSelection": { + "tenantKey": "default", + "driverKey": "12:12345678901234", + "occurredFrom": "2026-05-01T00:00:00Z", + "occurredTo": "2026-05-31T23:59:59Z", + "sourceInputs": [ + { + "sourceFamily": "TACHOGRAPH_FILE_SESSION", + "eventBackend": "SOURCE_DB", + "sessionIds": [ + "11111111-1111-1111-1111-111111111111", + "22222222-2222-2222-2222-222222222222" + ] + }, + { + "sourceFamily": "TACHOGRAPH_DB", + "eventBackend": "EVENTHUB_DB" + }, + { + "sourceFamily": "YELLOWFOX_DB", + "eventBackend": "SOURCE_DB" + } + ] + } } ``` +Use `sourceInputs` when one runtime request must mix direct-source and EventHub-backed inputs. Keep `sourceFamilies + eventBackend` only for legacy same-backend scopes. + ## Canonical input idea The plan should work with events from any source once they are normalized into canonical EventHub event semantics: diff --git a/docs/runtime-event-processing.md b/docs/runtime-event-processing.md index 5715510..0f5f720 100644 --- a/docs/runtime-event-processing.md +++ b/docs/runtime-event-processing.md @@ -54,13 +54,28 @@ The important design point is that runtime processing is not tachograph-specific { "processingPlanKey": "driver-working-time-v1", "sourceSelection": { - "sessionIds": [ - "11111111-1111-1111-1111-111111111111", - "22222222-2222-2222-2222-222222222222" - ], - "sourceFamilies": ["TACHOGRAPH_FILE_SESSION", "YELLOWFOX_DB"], + "tenantKey": "default", + "driverKey": "12:12345678901234", "occurredFrom": "2026-05-01T00:00:00Z", "occurredTo": "2026-05-31T23:59:59Z", + "sourceInputs": [ + { + "sourceFamily": "TACHOGRAPH_FILE_SESSION", + "eventBackend": "SOURCE_DB", + "sessionIds": [ + "11111111-1111-1111-1111-111111111111", + "22222222-2222-2222-2222-222222222222" + ] + }, + { + "sourceFamily": "TACHOGRAPH_DB", + "eventBackend": "EVENTHUB_DB" + }, + { + "sourceFamily": "YELLOWFOX_DB", + "eventBackend": "SOURCE_DB" + } + ], "expandVehicleEvents": true }, "partitioning": { @@ -78,6 +93,8 @@ The important design point is that runtime processing is not tachograph-specific } ``` +`sourceFamilies + eventBackend` is still accepted as a legacy shorthand when every selected family uses the same backend. Use `sourceInputs` when one runtime request must mix direct-source and EventHub-backed inputs. + ## Conceptual flow ```text diff --git a/docs/runtime-tachograph-esper-scope-processing.md b/docs/runtime-tachograph-esper-scope-processing.md index 75788d8..9f3db0a 100644 --- a/docs/runtime-tachograph-esper-scope-processing.md +++ b/docs/runtime-tachograph-esper-scope-processing.md @@ -40,6 +40,8 @@ with: also remains available, but new clients should use `processingPlanKey` and `/executions`. +When the runtime scope must mix file sessions, direct source databases, and EventHub-backed canonical events in one request, use `/executions` with per-source `sourceInputs`. The tachograph compatibility endpoints remain useful for file-session-centric scopes, but they are no longer the best shape for mixed-source runtime execution. + The current `driver-working-time-v1` plan uses these modules: ```text diff --git a/postman/eventhub-runtime-event-processing.postman_collection.json b/postman/eventhub-runtime-event-processing.postman_collection.json index c08db6b..0685e04 100644 --- a/postman/eventhub-runtime-event-processing.postman_collection.json +++ b/postman/eventhub-runtime-event-processing.postman_collection.json @@ -24,6 +24,38 @@ { "key": "driverKey", "value": "12:12345678901234" + }, + { + "key": "sessionId1", + "value": "11111111-1111-1111-1111-111111111111" + }, + { + "key": "sessionId2", + "value": "22222222-2222-2222-2222-222222222222" + }, + { + "key": "tenantKey", + "value": "default" + }, + { + "key": "driverSourceEntityId", + "value": "DRIVER:42" + }, + { + "key": "driverCardNation", + "value": "12" + }, + { + "key": "driverCardNumber", + "value": "12345678901234" + }, + { + "key": "occurredFrom", + "value": "2026-05-01T00:00:00Z" + }, + { + "key": "occurredTo", + "value": "2026-05-31T23:59:59Z" } ], "item": [ @@ -132,7 +164,7 @@ } }, { - "name": "Execute processing plan - driver working time source DB mixed sources", + "name": "Execute processing plan - driver working time mixed backends and sources", "request": { "method": "POST", "header": [ @@ -155,7 +187,7 @@ }, "body": { "mode": "raw", - "raw": "{\n \"processingPlanKey\": \"driver-working-time-v1\",\n \"sourceSelection\": {\n \"tenantKey\": \"{{tenantKey}}\",\n \"sourceFamilies\": [\n \"TACHOGRAPH_DB\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"SOURCE_DB\",\n \"driverSourceEntityId\": \"{{driverSourceEntityId}}\",\n \"driverCardNation\": \"{{driverCardNation}}\",\n \"driverCardNumber\": \"{{driverCardNumber}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": false,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n}" + "raw": "{\n \"processingPlanKey\": \"driver-working-time-v1\",\n \"sourceSelection\": {\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": false,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n}" } } }, @@ -334,7 +366,7 @@ ], "body": { "mode": "raw", - "raw": "{\n \"processingRequest\": {\n \"profileKey\": \"tachograph-driver-esper-v1\",\n \"scope\": {\n \"sessionIds\": [\n \"{{sessionId1}}\",\n \"{{sessionId2}}\"\n ],\n \"sourceFamilies\": [\n \"TACHOGRAPH_FILE_SESSION\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"SOURCE_DB\",\n \"occurredFrom\": \"2026-05-01T00:00:00Z\",\n \"occurredTo\": \"2026-05-31T23:59:59Z\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": true,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n },\n \"minimumAttachedVehicleEvidenceEvents\": 1,\n \"minimumNormalizedSupportEvidenceEvents\": 1,\n \"failWhenPartitionDebugMissing\": true\n}" + "raw": "{\n \"processingRequest\": {\n \"profileKey\": \"tachograph-driver-esper-v1\",\n \"scope\": {\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId1}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": true,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n },\n \"minimumAttachedVehicleEvidenceEvents\": 1,\n \"minimumNormalizedSupportEvidenceEvents\": 1,\n \"failWhenPartitionDebugMissing\": true\n}" }, "url": { "raw": "{{baseUrl}}/api/eventhub/runtime-processing/event-processing/validation/mixed-source-evidence", @@ -493,7 +525,7 @@ } }, { - "name": "Runtime diagnostics - driver events from source DB", + "name": "Runtime diagnostics - driver events from mixed backends and sources", "request": { "method": "POST", "header": [ @@ -516,12 +548,12 @@ }, "body": { "mode": "raw", - "raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"sourceFamilies\": [\n \"TACHOGRAPH_DB\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"SOURCE_DB\",\n \"driverSourceEntityId\": \"{{driverSourceEntityId}}\",\n \"driverCardNation\": \"{{driverCardNation}}\",\n \"driverCardNumber\": \"{{driverCardNumber}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}" + "raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}" } } }, { - "name": "Runtime diagnostics - driver timeline from EventHub DB", + "name": "Runtime diagnostics - driver timeline from mixed backends and sources", "request": { "method": "POST", "header": [ @@ -544,7 +576,7 @@ }, "body": { "mode": "raw", - "raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"sourceFamilies\": [\n \"TACHOGRAPH_DB\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"EVENTHUB_DB\",\n \"driverCardNation\": \"{{driverCardNation}}\",\n \"driverCardNumber\": \"{{driverCardNumber}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}" + "raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}" } } } diff --git a/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeProcessingApiRequest.java b/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeProcessingApiRequest.java index a9c97e1..460f80c 100644 --- a/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeProcessingApiRequest.java +++ b/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeProcessingApiRequest.java @@ -33,8 +33,64 @@ public record UnifiedRuntimeProcessingApiRequest( Integer significantDrivingMinutes, Integer minimumRestPeriodMinutes, Boolean includeActivityIntervals, - Boolean includeDrivingIntervals + Boolean includeDrivingIntervals, + List sourceInputs ) { + public UnifiedRuntimeProcessingApiRequest( + UUID sessionId, + List sessionIds, + UUID compositeSessionId, + String tenantKey, + Set sourceFamilies, + UnifiedRuntimeEventBackend eventBackend, + Set tachographSourceKinds, + String driverKey, + Set driverKeys, + Boolean includeAllDrivers, + Set vehicleKeys, + Boolean includeAllVehicles, + String driverSourceEntityId, + String driverCardNation, + String driverCardNumber, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo, + Boolean expandVehicleEvents, + Integer vehicleExpansionPaddingMinutes, + Boolean includeIntersectingIntervals, + Integer significantDrivingMinutes, + Integer minimumRestPeriodMinutes, + Boolean includeActivityIntervals, + Boolean includeDrivingIntervals + ) { + this( + sessionId, + sessionIds, + compositeSessionId, + tenantKey, + sourceFamilies, + eventBackend, + tachographSourceKinds, + driverKey, + driverKeys, + includeAllDrivers, + vehicleKeys, + includeAllVehicles, + driverSourceEntityId, + driverCardNation, + driverCardNumber, + occurredFrom, + occurredTo, + expandVehicleEvents, + vehicleExpansionPaddingMinutes, + includeIntersectingIntervals, + significantDrivingMinutes, + minimumRestPeriodMinutes, + includeActivityIntervals, + includeDrivingIntervals, + List.of() + ); + } + public UnifiedRuntimeProcessingRequest toRuntimeRequest() { return new UnifiedRuntimeProcessingRequest( sessionId, @@ -56,7 +112,11 @@ public record UnifiedRuntimeProcessingApiRequest( occurredTo, expandVehicleEvents == null || expandVehicleEvents, vehicleExpansionPaddingMinutes == null ? 0 : Math.max(0, vehicleExpansionPaddingMinutes), - includeIntersectingIntervals == null || includeIntersectingIntervals + includeIntersectingIntervals == null || includeIntersectingIntervals, + sourceInputs == null ? List.of() : sourceInputs.stream() + .filter(value -> value != null) + .map(UnifiedRuntimeSourceInputApiRequest::toRuntimeSourceInput) + .toList() ); } diff --git a/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeSourceInputApiRequest.java b/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeSourceInputApiRequest.java new file mode 100644 index 0000000..b9efc93 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/dto/UnifiedRuntimeSourceInputApiRequest.java @@ -0,0 +1,25 @@ +package at.procon.eventhub.processing.dto; + +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; +import at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput; +import java.util.List; +import java.util.UUID; + +public record UnifiedRuntimeSourceInputApiRequest( + UnifiedEventSourceFamily sourceFamily, + UnifiedRuntimeEventBackend eventBackend, + UUID sessionId, + List sessionIds, + UUID compositeSessionId +) { + public UnifiedRuntimeSourceInput toRuntimeSourceInput() { + return new UnifiedRuntimeSourceInput( + sourceFamily, + eventBackend, + sessionId, + sessionIds, + compositeSessionId + ); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/SupportEvidenceNormalizationModule.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/SupportEvidenceNormalizationModule.java index 1cc437b..249b595 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/module/SupportEvidenceNormalizationModule.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/module/SupportEvidenceNormalizationModule.java @@ -193,7 +193,9 @@ public class SupportEvidenceNormalizationModule implements RuntimeProcessingModu } private UUID runtimeSessionId(UnifiedRuntimeProcessingRequest request) { - if (request.compositeSessionId() != null || request.sessionIds().size() > 1) { + if (request.normalizedSourceInputs().size() > 1 + || request.compositeSessionId() != null + || request.sessionIds().size() > 1) { return null; } return request.sessionIds().size() == 1 ? request.sessionIds().get(0) : request.sessionId(); diff --git a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java index d7b467a..333a506 100644 --- a/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java +++ b/src/main/java/at/procon/eventhub/processing/eventprocessing/plan/DriverWorkingTimeRuntimeProcessingPlan.java @@ -405,7 +405,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing significantDrivingMinutes, minimumRestPeriodMinutes, includeActivityIntervals, - includeDrivingIntervals + includeDrivingIntervals, + sourceSelection.sourceInputs() ); } diff --git a/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java index 259262b..303f8d6 100644 --- a/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java +++ b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -29,13 +30,62 @@ public record UnifiedRuntimeProcessingRequest( OffsetDateTime occurredTo, boolean expandVehicleEvents, int vehicleExpansionPaddingMinutes, - boolean includeIntersectingIntervals + boolean includeIntersectingIntervals, + List sourceInputs ) { + public UnifiedRuntimeProcessingRequest( + UUID sessionId, + List sessionIds, + UUID compositeSessionId, + String tenantKey, + Set sourceFamilies, + UnifiedRuntimeEventBackend eventBackend, + Set tachographSourceKinds, + String driverKey, + Set driverKeys, + boolean includeAllDrivers, + Set vehicleKeys, + boolean includeAllVehicles, + String driverSourceEntityId, + String driverCardNation, + String driverCardNumber, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo, + boolean expandVehicleEvents, + int vehicleExpansionPaddingMinutes, + boolean includeIntersectingIntervals + ) { + this( + sessionId, + sessionIds, + compositeSessionId, + tenantKey, + sourceFamilies, + eventBackend, + tachographSourceKinds, + driverKey, + driverKeys, + includeAllDrivers, + vehicleKeys, + includeAllVehicles, + driverSourceEntityId, + driverCardNation, + driverCardNumber, + occurredFrom, + occurredTo, + expandVehicleEvents, + vehicleExpansionPaddingMinutes, + includeIntersectingIntervals, + List.of() + ); + } + public UnifiedRuntimeProcessingRequest { - if (sourceFamilies == null || sourceFamilies.isEmpty()) { + sourceInputs = normalizeSourceInputs(sourceInputs); + sourceFamilies = normalizeSourceFamilies(sourceFamilies, sourceInputs); + if (sourceFamilies.isEmpty()) { throw new IllegalArgumentException("sourceFamilies must not be empty"); } - sourceFamilies = Set.copyOf(sourceFamilies); eventBackend = eventBackend == null ? UnifiedRuntimeEventBackend.SOURCE_DB : eventBackend; tachographSourceKinds = normalizeTachographSourceKinds(tachographSourceKinds); sessionIds = normalizeSessionIds(sessionId, sessionIds); @@ -60,22 +110,44 @@ public record UnifiedRuntimeProcessingRequest( driverSourceEntityId = normalize(driverSourceEntityId); driverCardNation = normalizeUpper(driverCardNation); driverCardNumber = normalizeDriverCardNumber(driverCardNumber); - boolean includesFileSession = sourceFamilies.contains(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION); - boolean includesExternalDb = sourceFamilies.stream() - .anyMatch(family -> family == UnifiedEventSourceFamily.TACHOGRAPH_DB || family == UnifiedEventSourceFamily.YELLOWFOX_DB); - if (includesFileSession && eventBackend == UnifiedRuntimeEventBackend.EVENTHUB_DB) { - throw new IllegalArgumentException("TACHOGRAPH_FILE_SESSION runtime processing currently supports SOURCE_DB backend only."); - } + String[] inferredDriverCardSelector = inferDriverCardSelector(driverKey, driverCardNation, driverCardNumber); + driverCardNation = inferredDriverCardSelector[0]; + driverCardNumber = inferredDriverCardSelector[1]; + + List normalizedSourceInputs = normalizedSourceInputs( + sourceFamilies, + eventBackend, + sessionId, + sessionIds, + compositeSessionId, + sourceInputs + ); + boolean includesFileSession = normalizedSourceInputs.stream() + .anyMatch(input -> input.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION); + boolean includesExternalDb = normalizedSourceInputs.stream() + .map(UnifiedRuntimeSourceInput::sourceFamily) + .anyMatch(UnifiedRuntimeProcessingRequest::isExternalDbFamily); + boolean includesEventHubExternalDb = normalizedSourceInputs.stream() + .anyMatch(input -> input.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB + && isExternalDbFamily(input.sourceFamily())); + if (tenantKey == null) { if (!includesFileSession || includesExternalDb) { throw new IllegalArgumentException("tenantKey must not be blank"); } } - if (includesFileSession && compositeSessionId == null && sessionIds.isEmpty()) { - throw new IllegalArgumentException("sessionId, sessionIds or compositeSessionId must be provided when TACHOGRAPH_FILE_SESSION is selected."); + if (includesFileSession && compositeSessionId == null && sessionIds.isEmpty() + && normalizedSourceInputs.stream() + .filter(input -> input.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) + .allMatch(input -> input.compositeSessionId() == null && input.sessionIds().isEmpty())) { + throw new IllegalArgumentException( + "sessionId, sessionIds or compositeSessionId must be provided when TACHOGRAPH_FILE_SESSION is selected." + ); } if (includesFileSession && driverKey == null && driverKeys.isEmpty() && !includeAllDrivers) { - throw new IllegalArgumentException("driverKey, driverKeys or includeAllDrivers must be provided when TACHOGRAPH_FILE_SESSION is selected."); + throw new IllegalArgumentException( + "driverKey, driverKeys or includeAllDrivers must be provided when TACHOGRAPH_FILE_SESSION is selected." + ); } if (includesExternalDb && driverSourceEntityId == null && driverCardNumber == null && driverKeys.isEmpty() && !includeAllDrivers) { @@ -83,14 +155,17 @@ public record UnifiedRuntimeProcessingRequest( } if (includesExternalDb && (includeAllDrivers || !driverKeys.isEmpty()) && (occurredFrom == null || occurredTo == null)) { - throw new IllegalArgumentException("occurredFrom and occurredTo are required when loading broad external DB runtime scopes."); + throw new IllegalArgumentException( + "occurredFrom and occurredTo are required when loading broad external DB runtime scopes." + ); } - if (eventBackend == UnifiedRuntimeEventBackend.EVENTHUB_DB - && includesExternalDb + if (includesEventHubExternalDb && driverSourceEntityId == null && driverCardNumber == null && (includeAllDrivers || !driverKeys.isEmpty())) { - throw new IllegalArgumentException("Broad multi-driver EVENTHUB_DB runtime scopes are not supported yet; provide a concrete EventHub driver selector or use SOURCE_DB."); + throw new IllegalArgumentException( + "Broad multi-driver EVENTHUB_DB runtime scopes are not supported yet; provide a concrete EventHub driver selector or use SOURCE_DB." + ); } if (occurredFrom != null && occurredTo != null && occurredTo.isBefore(occurredFrom)) { throw new IllegalArgumentException("occurredTo must not be before occurredFrom"); @@ -353,6 +428,44 @@ public record UnifiedRuntimeProcessingRequest( return occurredTo == null ? null : occurredTo.plusMinutes(vehicleExpansionPaddingMinutes); } + public List normalizedSourceInputs() { + return normalizedSourceInputs( + sourceFamilies, + eventBackend, + sessionId, + sessionIds, + compositeSessionId, + sourceInputs + ); + } + + public UnifiedRuntimeProcessingRequest forSourceInput(UnifiedRuntimeSourceInput sourceInput) { + Objects.requireNonNull(sourceInput, "sourceInput must not be null"); + return new UnifiedRuntimeProcessingRequest( + sourceInput.sessionId(), + sourceInput.sessionIds(), + sourceInput.compositeSessionId(), + tenantKey, + Set.of(sourceInput.sourceFamily()), + sourceInput.eventBackend(), + tachographSourceKinds, + driverKey, + driverKeys, + includeAllDrivers, + vehicleKeys, + includeAllVehicles, + driverSourceEntityId, + driverCardNation, + driverCardNumber, + occurredFrom, + occurredTo, + expandVehicleEvents, + vehicleExpansionPaddingMinutes, + includeIntersectingIntervals, + List.of(sourceInput) + ); + } + private static List normalizeSessionIds(UUID sessionId, List sessionIds) { LinkedHashSet result = new LinkedHashSet<>(); if (sessionId != null) { @@ -385,7 +498,8 @@ public record UnifiedRuntimeProcessingRequest( occurredTo, expandVehicleEvents, vehicleExpansionPaddingMinutes, - includeIntersectingIntervals + includeIntersectingIntervals, + sourceInputs ); } @@ -410,6 +524,68 @@ public record UnifiedRuntimeProcessingRequest( .toList(); } + private static Set normalizeSourceFamilies( + Set sourceFamilies, + List sourceInputs + ) { + LinkedHashSet normalized = new LinkedHashSet<>(); + if (sourceFamilies != null) { + normalized.addAll(sourceFamilies.stream().filter(Objects::nonNull).toList()); + } + if (sourceInputs != null) { + sourceInputs.stream() + .filter(Objects::nonNull) + .map(UnifiedRuntimeSourceInput::sourceFamily) + .forEach(normalized::add); + } + return Set.copyOf(normalized); + } + + private static List normalizeSourceInputs(List sourceInputs) { + if (sourceInputs == null || sourceInputs.isEmpty()) { + return List.of(); + } + LinkedHashSet normalized = new LinkedHashSet<>(); + sourceInputs.stream() + .filter(Objects::nonNull) + .forEach(normalized::add); + return List.copyOf(normalized); + } + + private static List normalizedSourceInputs( + Set sourceFamilies, + UnifiedRuntimeEventBackend eventBackend, + UUID sessionId, + List sessionIds, + UUID compositeSessionId, + List sourceInputs + ) { + if (sourceInputs != null && !sourceInputs.isEmpty()) { + return List.copyOf(sourceInputs); + } + List normalized = new ArrayList<>(); + for (UnifiedEventSourceFamily sourceFamily : sourceFamilies == null ? Set.of() : sourceFamilies) { + if (sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) { + normalized.add(new UnifiedRuntimeSourceInput( + sourceFamily, + eventBackend, + sessionId, + sessionIds, + compositeSessionId + )); + } else { + normalized.add(new UnifiedRuntimeSourceInput( + sourceFamily, + eventBackend, + null, + List.of(), + null + )); + } + } + return List.copyOf(normalized); + } + private static Set normalizeStrings(Set values) { if (values == null || values.isEmpty()) { return Set.of(); @@ -433,6 +609,10 @@ public record UnifiedRuntimeProcessingRequest( return Set.copyOf(values); } + private static boolean isExternalDbFamily(UnifiedEventSourceFamily family) { + return family == UnifiedEventSourceFamily.TACHOGRAPH_DB || family == UnifiedEventSourceFamily.YELLOWFOX_DB; + } + private static String normalize(String value) { return value == null || value.isBlank() ? null : value.trim(); } @@ -444,4 +624,39 @@ public record UnifiedRuntimeProcessingRequest( private static String normalizeDriverCardNumber(String value) { return DriverCardNumberNormalizer.canonical(value); } + + private static String[] inferDriverCardSelector( + String driverKey, + String driverCardNation, + String driverCardNumber + ) { + if (driverCardNumber != null) { + return new String[]{driverCardNation, driverCardNumber}; + } + if (driverKey == null) { + return new String[]{driverCardNation, driverCardNumber}; + } + int separator = driverKey.indexOf(':'); + if (separator <= 0 || separator >= driverKey.length() - 1) { + return new String[]{driverCardNation, driverCardNumber}; + } + String prefix = driverKey.substring(0, separator).trim(); + if (!looksLikeDriverCardNation(prefix)) { + return new String[]{driverCardNation, driverCardNumber}; + } + return new String[]{ + driverCardNation == null ? normalizeUpper(prefix) : driverCardNation, + normalizeDriverCardNumber(driverKey.substring(separator + 1)) + }; + } + + private static boolean looksLikeDriverCardNation(String value) { + if (value == null || value.isBlank()) { + return false; + } + String trimmed = value.trim(); + return trimmed.matches("\\d+") + || trimmed.matches("(?i)[A-Z]{2}") + || trimmed.matches("(?i)UNKNOWN\\s+\\d+"); + } } diff --git a/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeSourceInput.java b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeSourceInput.java new file mode 100644 index 0000000..7efcdc5 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeSourceInput.java @@ -0,0 +1,53 @@ +package at.procon.eventhub.processing.model; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.UUID; + +public record UnifiedRuntimeSourceInput( + UnifiedEventSourceFamily sourceFamily, + UnifiedRuntimeEventBackend eventBackend, + UUID sessionId, + List sessionIds, + UUID compositeSessionId +) { + public UnifiedRuntimeSourceInput { + if (sourceFamily == null) { + throw new IllegalArgumentException("sourceFamily must not be null"); + } + eventBackend = eventBackend == null ? UnifiedRuntimeEventBackend.SOURCE_DB : eventBackend; + sessionIds = normalizeSessionIds(sessionId, sessionIds); + if (sessionId == null && !sessionIds.isEmpty()) { + sessionId = sessionIds.get(0); + } + if (compositeSessionId != null && !sessionIds.isEmpty()) { + throw new IllegalArgumentException("Use either compositeSessionId or sessionId/sessionIds, not both."); + } + if (sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) { + if (eventBackend == UnifiedRuntimeEventBackend.EVENTHUB_DB) { + throw new IllegalArgumentException("TACHOGRAPH_FILE_SESSION runtime processing currently supports SOURCE_DB backend only."); + } + if (compositeSessionId == null && sessionIds.isEmpty()) { + throw new IllegalArgumentException( + "sessionId, sessionIds or compositeSessionId must be provided for TACHOGRAPH_FILE_SESSION source inputs." + ); + } + } else if (compositeSessionId != null || !sessionIds.isEmpty()) { + throw new IllegalArgumentException( + "sessionId, sessionIds or compositeSessionId are supported for TACHOGRAPH_FILE_SESSION source inputs only." + ); + } + } + + private static List normalizeSessionIds(UUID sessionId, List sessionIds) { + LinkedHashSet result = new LinkedHashSet<>(); + if (sessionId != null) { + result.add(sessionId); + } + if (sessionIds != null) { + result.addAll(sessionIds.stream().filter(value -> value != null).toList()); + } + return List.copyOf(new ArrayList<>(result)); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDerivedProjectionService.java b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDerivedProjectionService.java index 06b67d8..63d20e0 100644 --- a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDerivedProjectionService.java +++ b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDerivedProjectionService.java @@ -199,7 +199,9 @@ public class UnifiedRuntimeDerivedProjectionService { } private UUID runtimeSessionId(UnifiedRuntimeProcessingRequest request) { - if (request.compositeSessionId() != null || request.sessionIds().size() > 1) { + if (request.normalizedSourceInputs().size() > 1 + || request.compositeSessionId() != null + || request.sessionIds().size() > 1) { return null; } return request.sessionIds().size() == 1 ? request.sessionIds().get(0) : request.sessionId(); diff --git a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java index 6f740e9..bf8e752 100644 --- a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java +++ b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java @@ -7,6 +7,8 @@ import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput; +import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver; import java.util.ArrayList; import java.util.Comparator; import java.util.LinkedHashMap; @@ -29,6 +31,7 @@ public class UnifiedRuntimeEventAssemblyService { } public UnifiedRuntimeEventBundle assembleDriverScopedEvents(UnifiedRuntimeProcessingRequest request) { + List sourceInputs = request.normalizedSourceInputs(); List driverSeedEvents = loadDriverSeedEvents(request); List discoveredVehicles = discoverVehicles(driverSeedEvents); List expandedVehicleEvents = request.expandVehicleEvents() @@ -37,16 +40,27 @@ public class UnifiedRuntimeEventAssemblyService { List mergedEvents = deduplicateAndSort(driverSeedEvents, expandedVehicleEvents); List notes = new ArrayList<>(); - notes.add(request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB - ? "Driver seed events were loaded from the local EventHub event store." - : "Driver seed events were loaded directly from the selected runtime sources."); - if (request.sourceFamilies().contains(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION)) { - if (request.compositeSessionId() != null) { - notes.add("Tachograph file-session events were loaded from composite session " + request.compositeSessionId() + "."); - } else if (request.sessionIds().size() > 1) { - notes.add("Tachograph file-session events were loaded from " + request.sessionIds().size() + " selected sessions."); - } else if (request.sessionId() != null) { - notes.add("Tachograph file-session events were loaded from session " + request.sessionId() + "."); + boolean includesEventHub = sourceInputs.stream() + .anyMatch(input -> input.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB); + boolean includesSourceDb = sourceInputs.stream() + .anyMatch(input -> input.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB); + if (includesEventHub && includesSourceDb) { + notes.add("Driver seed events were loaded from a mixed runtime scope spanning the local EventHub event store and direct source systems."); + } else if (includesEventHub) { + notes.add("Driver seed events were loaded from the local EventHub event store."); + } else { + notes.add("Driver seed events were loaded directly from the selected runtime sources."); + } + for (UnifiedRuntimeSourceInput sourceInput : sourceInputs) { + if (sourceInput.sourceFamily() != UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) { + continue; + } + if (sourceInput.compositeSessionId() != null) { + notes.add("Tachograph file-session events were loaded from composite session " + sourceInput.compositeSessionId() + "."); + } else if (sourceInput.sessionIds().size() > 1) { + notes.add("Tachograph file-session events were loaded from " + sourceInput.sessionIds().size() + " selected sessions."); + } else if (sourceInput.sessionId() != null) { + notes.add("Tachograph file-session events were loaded from session " + sourceInput.sessionId() + "."); } } if (request.expandVehicleEvents()) { @@ -67,12 +81,13 @@ public class UnifiedRuntimeEventAssemblyService { private List loadDriverSeedEvents(UnifiedRuntimeProcessingRequest request) { List result = new ArrayList<>(); - for (UnifiedEventSourceFamily sourceFamily : request.sourceFamilies()) { - RuntimeDriverEventLoader loader = driverLoader(request, sourceFamily); + for (UnifiedRuntimeSourceInput sourceInput : request.normalizedSourceInputs()) { + UnifiedRuntimeProcessingRequest sourceRequest = request.forSourceInput(sourceInput); + RuntimeDriverEventLoader loader = driverLoader(sourceRequest, sourceInput.sourceFamily()); if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) { - result.addAll(eventHubLoader.loadDriverEvents(request, sourceFamily)); + result.addAll(eventHubLoader.loadDriverEvents(sourceRequest, sourceInput.sourceFamily())); } else { - result.addAll(loader.loadDriverEvents(request)); + result.addAll(loader.loadDriverEvents(sourceRequest)); } } return deduplicateAndSort(result, List.of()); @@ -84,12 +99,13 @@ public class UnifiedRuntimeEventAssemblyService { ) { List result = new ArrayList<>(); for (UnifiedDiscoveredVehicleRef vehicleRef : discoveredVehicles) { - for (UnifiedEventSourceFamily sourceFamily : request.sourceFamilies()) { - RuntimeVehicleEventLoader loader = vehicleLoader(request, sourceFamily); + for (UnifiedRuntimeSourceInput sourceInput : request.normalizedSourceInputs()) { + UnifiedRuntimeProcessingRequest sourceRequest = request.forSourceInput(sourceInput); + RuntimeVehicleEventLoader loader = vehicleLoader(sourceRequest, sourceInput.sourceFamily()); if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) { - result.addAll(eventHubLoader.loadVehicleEvents(request, sourceFamily, vehicleRef)); + result.addAll(eventHubLoader.loadVehicleEvents(sourceRequest, sourceInput.sourceFamily(), vehicleRef)); } else { - result.addAll(loader.loadVehicleEvents(request, vehicleRef)); + result.addAll(loader.loadVehicleEvents(sourceRequest, vehicleRef)); } } } @@ -151,28 +167,33 @@ public class UnifiedRuntimeEventAssemblyService { private void appendDeduplicated(LinkedHashMap byKey, List events) { for (EventHubEventDto event : events) { - byKey.putIfAbsent(dedupKey(event), event); + byKey.putIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), event); } } - private String dedupKey(EventHubEventDto event) { - String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null - ? event.packageInfo().eventSource().stableKey() - : "NO_SOURCE"; - return sourceKey + "|" + event.externalSourceEventId(); - } - private RuntimeDriverEventLoader driverLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { return driverEventLoaders.stream() .filter(loader -> loader.supports(request, sourceFamily)) .findFirst() - .orElseThrow(() -> new IllegalArgumentException("No runtime driver event loader is registered for source family " + sourceFamily + ".")); + .orElseThrow(() -> new IllegalArgumentException( + "No runtime driver event loader is registered for source family " + + sourceFamily + + " and backend " + + request.eventBackend() + + "." + )); } private RuntimeVehicleEventLoader vehicleLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { return vehicleEventLoaders.stream() .filter(loader -> loader.supports(request, sourceFamily)) .findFirst() - .orElseThrow(() -> new IllegalArgumentException("No runtime vehicle event loader is registered for source family " + sourceFamily + ".")); + .orElseThrow(() -> new IllegalArgumentException( + "No runtime vehicle event loader is registered for source family " + + sourceFamily + + " and backend " + + request.eventBackend() + + "." + )); } } diff --git a/src/test/java/at/procon/eventhub/processing/api/UnifiedRuntimeProcessingControllerTest.java b/src/test/java/at/procon/eventhub/processing/api/UnifiedRuntimeProcessingControllerTest.java index 4559b91..a801999 100644 --- a/src/test/java/at/procon/eventhub/processing/api/UnifiedRuntimeProcessingControllerTest.java +++ b/src/test/java/at/procon/eventhub/processing/api/UnifiedRuntimeProcessingControllerTest.java @@ -23,6 +23,7 @@ import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingP import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionResultDto; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionService; +import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingPlanDescriptorDto; import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographDriverParityResultDto; import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographParityCategoryComparisonDto; @@ -378,6 +379,91 @@ class UnifiedRuntimeProcessingControllerTest { .andExpect(jsonPath("$.partitioningStrategy").value("DRIVER")); } + @Test + void runsRuntimeProcessingExecutionWithExplicitSourceInputsViaRuntimeApi() throws Exception { + UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class); + UnifiedRuntimeDriverTimelineService timelineService = org.mockito.Mockito.mock(UnifiedRuntimeDriverTimelineService.class); + UnifiedRuntimeDerivedProjectionService derivedProjectionService = org.mockito.Mockito.mock(UnifiedRuntimeDerivedProjectionService.class); + RuntimeProcessingExecutionService executionService = org.mockito.Mockito.mock(RuntimeProcessingExecutionService.class); + MockMvc mockMvc = MockMvcBuilders.standaloneSetup(new UnifiedRuntimeProcessingController( + eventAssemblyService, + timelineService, + derivedProjectionService, + null, + null, + null, + null, + executionService + )) + .setMessageConverters(new MappingJackson2HttpMessageConverter(objectMapper)) + .setControllerAdvice(new UnifiedRuntimeProcessingExceptionHandler()) + .build(); + + when(executionService.execute(any())) + .thenAnswer(invocation -> { + RuntimeProcessingExecutionApiRequest apiRequest = invocation.getArgument(0); + UnifiedRuntimeProcessingRequest request = apiRequest.sourceSelection().toRuntimeRequest(); + return new RuntimeProcessingExecutionResultDto( + "driver-working-time-v1", + List.of("event-to-activity-intervals"), + RuntimeEventPartitioningStrategy.DRIVER, + request, + 3, + 1, + 1, + List.of(new UnifiedDiscoveredVehicleRef("VEH-1", "VIN-1", "12", "REG-1")), + Map.of(), + List.of("execution"), + List.of() + ); + }); + + mockMvc.perform(post("/api/eventhub/runtime-processing/executions") + .contentType("application/json") + .content(""" + { + "processingPlanKey": "driver-working-time-v1", + "sourceSelection": { + "tenantKey": "default", + "driverKey": "12:12345678901234", + "occurredFrom": "2026-05-01T08:00:00Z", + "occurredTo": "2026-05-01T10:00:00Z", + "sourceInputs": [ + { + "sourceFamily": "TACHOGRAPH_FILE_SESSION", + "eventBackend": "SOURCE_DB", + "sessionIds": [ + "11111111-1111-1111-1111-111111111111", + "22222222-2222-2222-2222-222222222222" + ] + }, + { + "sourceFamily": "TACHOGRAPH_DB", + "eventBackend": "EVENTHUB_DB" + }, + { + "sourceFamily": "YELLOWFOX_DB", + "eventBackend": "SOURCE_DB" + } + ] + }, + "partitioning": { + "strategy": "DRIVER" + } + } + """)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.processingPlanKey").value("driver-working-time-v1")) + .andExpect(jsonPath("$.request.sourceInputs[0].sourceFamily").value("TACHOGRAPH_FILE_SESSION")) + .andExpect(jsonPath("$.request.sourceInputs[0].eventBackend").value("SOURCE_DB")) + .andExpect(jsonPath("$.request.sourceInputs[0].sessionIds[0]").value("11111111-1111-1111-1111-111111111111")) + .andExpect(jsonPath("$.request.sourceInputs[1].sourceFamily").value("TACHOGRAPH_DB")) + .andExpect(jsonPath("$.request.sourceInputs[1].eventBackend").value("EVENTHUB_DB")) + .andExpect(jsonPath("$.request.sourceInputs[2].sourceFamily").value("YELLOWFOX_DB")) + .andExpect(jsonPath("$.request.driverCardNation").value("12")) + .andExpect(jsonPath("$.request.driverCardNumber").value("12345678901234")); + } + @Test void listsRuntimeEventProcessingProfilesViaRuntimeApi() throws Exception { UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class); diff --git a/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java b/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java index a78c3c8..3d7dde8 100644 --- a/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java +++ b/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java @@ -138,6 +138,140 @@ class UnifiedRuntimeProcessingRequestTest { assertThat(request.eventBackend()).isEqualTo(UnifiedRuntimeEventBackend.SOURCE_DB); } + @Test + void infersDriverCardSelectorFromDriverKeyForMixedSourceScope() { + UUID sessionId = UUID.randomUUID(); + UnifiedRuntimeProcessingRequest request = new UnifiedRuntimeProcessingRequest( + sessionId, + List.of(), + null, + "default", + Set.of( + UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION, + UnifiedEventSourceFamily.TACHOGRAPH_DB, + UnifiedEventSourceFamily.YELLOWFOX_DB + ), + UnifiedRuntimeEventBackend.SOURCE_DB, + null, + "12:1234567890123456", + Set.of(), + false, + Set.of(), + false, + null, + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 10, + true + ); + + assertThat(request.driverKey()).isEqualTo("12:1234567890123456"); + assertThat(request.driverCardNation()).isEqualTo("12"); + assertThat(request.driverCardNumber()).isEqualTo("12345678901234"); + assertThat(request.sourceFamilies()).containsExactlyInAnyOrder( + UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION, + UnifiedEventSourceFamily.TACHOGRAPH_DB, + UnifiedEventSourceFamily.YELLOWFOX_DB + ); + } + + @Test + void supportsExplicitMixedBackendSourceInputs() { + UUID sessionId = UUID.randomUUID(); + UnifiedRuntimeProcessingRequest request = new UnifiedRuntimeProcessingRequest( + null, + List.of(), + null, + "default", + Set.of(), + null, + null, + "12:1234567890123456", + Set.of(), + false, + Set.of(), + false, + null, + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 10, + true, + List.of( + new UnifiedRuntimeSourceInput( + UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION, + UnifiedRuntimeEventBackend.SOURCE_DB, + sessionId, + List.of(), + null + ), + new UnifiedRuntimeSourceInput( + UnifiedEventSourceFamily.TACHOGRAPH_DB, + UnifiedRuntimeEventBackend.EVENTHUB_DB, + null, + List.of(), + null + ), + new UnifiedRuntimeSourceInput( + UnifiedEventSourceFamily.YELLOWFOX_DB, + UnifiedRuntimeEventBackend.SOURCE_DB, + null, + List.of(), + null + ) + ) + ); + + assertThat(request.sourceFamilies()).containsExactlyInAnyOrder( + UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION, + UnifiedEventSourceFamily.TACHOGRAPH_DB, + UnifiedEventSourceFamily.YELLOWFOX_DB + ); + assertThat(request.normalizedSourceInputs()).hasSize(3); + assertThat(request.normalizedSourceInputs()).extracting(UnifiedRuntimeSourceInput::eventBackend) + .containsExactly( + UnifiedRuntimeEventBackend.SOURCE_DB, + UnifiedRuntimeEventBackend.EVENTHUB_DB, + UnifiedRuntimeEventBackend.SOURCE_DB + ); + assertThat(request.driverCardNation()).isEqualTo("12"); + assertThat(request.driverCardNumber()).isEqualTo("12345678901234"); + } + + @Test + void doesNotInferDriverCardSelectorFromNonCardDriverKey() { + UnifiedRuntimeProcessingRequest request = new UnifiedRuntimeProcessingRequest( + null, + List.of(), + null, + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB), + UnifiedRuntimeEventBackend.SOURCE_DB, + null, + "DRIVER:42", + Set.of(), + false, + Set.of(), + false, + null, + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 0, + true + ); + + assertThat(request.driverCardNation()).isNull(); + assertThat(request.driverCardNumber()).isNull(); + } + @Test diff --git a/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java b/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java index bc4175c..6d6f6a7 100644 --- a/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java +++ b/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java @@ -16,6 +16,8 @@ import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.time.LocalDate; import java.time.OffsetDateTime; import java.util.List; @@ -95,6 +97,146 @@ class UnifiedRuntimeEventAssemblyServiceTest { .containsExactly("SEED-1", "SEED-2"); } + @Test + void deduplicatesEquivalentCrossSourceActivityEventsByCanonicalIdentity() { + UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService( + List.of(new FakeTachographDbLoader(), new FakeYellowFoxDbLoader()), + List.of() + ); + + UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents( + UnifiedRuntimeProcessingRequest.forDriverFromEventHub( + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB), + "DRIVER:42", + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + false, + 0 + ) + ); + + assertThat(bundle.driverSeedEvents()).hasSize(1); + assertThat(bundle.mergedEvents()).hasSize(1); + assertThat(bundle.mergedEvents().get(0).occurredAt()).isEqualTo(OffsetDateTime.parse("2026-05-01T08:00:00Z")); + assertThat(bundle.mergedEvents().get(0).eventType()).isEqualTo(EventType.DRIVE); + assertThat(bundle.mergedEvents().get(0).lifecycle()).isEqualTo(EventLifecycle.START); + } + + @Test + void assemblesMixedFileSessionAndDirectDbSourcesUnderSourceDbBackend() { + UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService( + List.of( + new FakeFileSessionLoader(), + new FakeDirectTachographDbLoader(), + new FakeDirectYellowFoxDbLoader() + ), + List.of() + ); + + UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents( + new UnifiedRuntimeProcessingRequest( + UUID.fromString("11111111-1111-1111-1111-111111111111"), + List.of(), + null, + "default", + Set.of( + UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION, + UnifiedEventSourceFamily.TACHOGRAPH_DB, + UnifiedEventSourceFamily.YELLOWFOX_DB + ), + UnifiedRuntimeEventBackend.SOURCE_DB, + null, + "12:1234567890123456", + Set.of(), + false, + Set.of(), + false, + null, + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + false, + 0, + true + ) + ); + + assertThat(bundle.driverSeedEvents()).hasSize(3); + assertThat(bundle.mergedEvents()).hasSize(3); + assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("FILE-SESSION-1", "TACHO-DB-1", "YF-DB-1"); + } + + @Test + void assemblesMixedBackendSourcesFromExplicitSourceInputs() { + UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService( + List.of( + new FakeFileSessionLoader(), + new FakeTachographDbLoader(), + new FakeDirectYellowFoxDbLoader() + ), + List.of() + ); + + UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents( + new UnifiedRuntimeProcessingRequest( + null, + List.of(), + null, + "default", + Set.of(), + null, + null, + "12:1234567890123456", + Set.of(), + false, + Set.of(), + false, + null, + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + false, + 0, + true, + List.of( + new at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput( + UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION, + UnifiedRuntimeEventBackend.SOURCE_DB, + UUID.fromString("11111111-1111-1111-1111-111111111111"), + List.of(), + null + ), + new at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput( + UnifiedEventSourceFamily.TACHOGRAPH_DB, + UnifiedRuntimeEventBackend.EVENTHUB_DB, + null, + List.of(), + null + ), + new at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput( + UnifiedEventSourceFamily.YELLOWFOX_DB, + UnifiedRuntimeEventBackend.SOURCE_DB, + null, + List.of(), + null + ) + ) + ) + ); + + assertThat(bundle.driverSeedEvents()).hasSize(3); + assertThat(bundle.mergedEvents()).hasSize(3); + assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId) + .containsExactly("FILE-SESSION-1", "TACHO-ACT-1", "YF-DB-1"); + assertThat(bundle.notes()).anySatisfy(note -> assertThat(note).contains("mixed runtime scope")); + } + private static final class FakeLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader { @Override @@ -171,4 +313,167 @@ class UnifiedRuntimeEventAssemblyServiceTest { ); } } + + private static final class FakeTachographDbLoader implements RuntimeDriverEventLoader { + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB + && sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB; + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + return List.of(canonicalActivityEvent( + "TACHO-ACT-1", + source("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_DRIVER_CARD"), + "2026-05-01T08:00:00Z", + "2026-05-01T08:00:00Z", + "2026-05-01T09:00:00Z" + )); + } + } + + private static final class FakeYellowFoxDbLoader implements RuntimeDriverEventLoader { + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB + && sourceFamily == UnifiedEventSourceFamily.YELLOWFOX_DB; + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + return List.of(canonicalActivityEvent( + "YF-ACT-99", + source("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8"), + "2026-05-01T08:00:00Z", + "2026-05-01T08:00:00Z", + "2026-05-01T09:00:00Z" + )); + } + } + + private static final class FakeFileSessionLoader implements RuntimeDriverEventLoader { + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB + && sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION; + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + return List.of(canonicalActivityEvent( + "FILE-SESSION-1", + source("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_FILE_SESSION"), + "2026-05-01T07:55:00Z", + "2026-05-01T07:55:00Z", + "2026-05-01T08:25:00Z" + )); + } + } + + private static final class FakeDirectTachographDbLoader implements RuntimeDriverEventLoader { + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB + && sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB; + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + return List.of(canonicalActivityEvent( + "TACHO-DB-1", + source("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_DRIVER_CARD"), + "2026-05-01T08:30:00Z", + "2026-05-01T08:30:00Z", + "2026-05-01T09:00:00Z" + )); + } + } + + private static final class FakeDirectYellowFoxDbLoader implements RuntimeDriverEventLoader { + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB + && sourceFamily == UnifiedEventSourceFamily.YELLOWFOX_DB; + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + return List.of(canonicalActivityEvent( + "YF-DB-1", + source("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8"), + "2026-05-01T09:05:00Z", + "2026-05-01T09:05:00Z", + "2026-05-01T09:35:00Z" + )); + } + } + + private static EventHubEventDto canonicalActivityEvent( + String externalId, + EventSourceDto source, + String occurredAt, + String startedAt, + String endedAt + ) { + ObjectNode raw = JsonNodeFactory.instance.objectNode(); + raw.put("intervalId", externalId + "-INTERVAL"); + raw.put("startedAt", startedAt); + raw.put("endedAt", endedAt); + raw.put("driverKey", "DRIVER:42"); + raw.put("activityType", "DRIVE"); + raw.put("cardSlot", "DRIVER"); + raw.put("cardStatus", "INSERTED"); + raw.put("drivingStatus", "SINGLE"); + raw.put("registrationKey", "AT:W-1"); + raw.put("vehicleKey", "VIN-1"); + raw.put("sourceKind", source.sourceKind()); + raw.putArray("sourceRowIds").add(externalId + "-ROW"); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + return new EventHubEventDto( + UUID.randomUUID(), + externalId, + new DriverRefDto("DRIVER:42", null), + new VehicleRefDto( + "VEH-1", + "VIN-1", + "VEH-1", + new VehicleRegistrationRefDto("AT", "W-1") + ), + OffsetDateTime.parse(occurredAt), + null, + OffsetDateTime.parse(occurredAt), + EventDomain.DRIVER_ACTIVITY, + EventType.DRIVE, + EventLifecycle.START, + null, + null, + null, + null, + payload, + false, + new at.procon.eventhub.dto.EventHubPackageRequest( + "default", + source, + null, + ImportScopeDto.tenantAll( + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z") + ), + EventDomain.DRIVER_ACTIVITY.name(), + LocalDate.parse("2026-05-01"), + source.stableKey() + ":DRIVER_ACTIVITY:2026-05-01" + ) + ); + } + + private static EventSourceDto source(String provider, String sourceKind, String sourceKey) { + return new EventSourceDto(provider, sourceKind, sourceKey, null, null, null); + } }