Add mixed-backend runtime source inputs

This commit is contained in:
trifonovt 2026-06-05 12:09:35 +02:00
parent 6bef8becf9
commit 6ba2df1a61
15 changed files with 1044 additions and 63 deletions

View File

@ -12,10 +12,36 @@ Use:
```json ```json
{ {
"processingPlanKey": "driver-working-time-v1" "processingPlanKey": "driver-working-time-v1",
"sourceSelection": {
"tenantKey": "default",
"driverKey": "12:12345678901234",
"occurredFrom": "2026-05-01T00:00:00Z",
"occurredTo": "2026-05-31T23:59:59Z",
"sourceInputs": [
{
"sourceFamily": "TACHOGRAPH_FILE_SESSION",
"eventBackend": "SOURCE_DB",
"sessionIds": [
"11111111-1111-1111-1111-111111111111",
"22222222-2222-2222-2222-222222222222"
]
},
{
"sourceFamily": "TACHOGRAPH_DB",
"eventBackend": "EVENTHUB_DB"
},
{
"sourceFamily": "YELLOWFOX_DB",
"eventBackend": "SOURCE_DB"
}
]
}
} }
``` ```
Use `sourceInputs` when one runtime request must mix direct-source and EventHub-backed inputs. Keep `sourceFamilies + eventBackend` only for legacy same-backend scopes.
## Canonical input idea ## Canonical input idea
The plan should work with events from any source once they are normalized into canonical EventHub event semantics: The plan should work with events from any source once they are normalized into canonical EventHub event semantics:

View File

@ -54,13 +54,28 @@ The important design point is that runtime processing is not tachograph-specific
{ {
"processingPlanKey": "driver-working-time-v1", "processingPlanKey": "driver-working-time-v1",
"sourceSelection": { "sourceSelection": {
"tenantKey": "default",
"driverKey": "12:12345678901234",
"occurredFrom": "2026-05-01T00:00:00Z",
"occurredTo": "2026-05-31T23:59:59Z",
"sourceInputs": [
{
"sourceFamily": "TACHOGRAPH_FILE_SESSION",
"eventBackend": "SOURCE_DB",
"sessionIds": [ "sessionIds": [
"11111111-1111-1111-1111-111111111111", "11111111-1111-1111-1111-111111111111",
"22222222-2222-2222-2222-222222222222" "22222222-2222-2222-2222-222222222222"
]
},
{
"sourceFamily": "TACHOGRAPH_DB",
"eventBackend": "EVENTHUB_DB"
},
{
"sourceFamily": "YELLOWFOX_DB",
"eventBackend": "SOURCE_DB"
}
], ],
"sourceFamilies": ["TACHOGRAPH_FILE_SESSION", "YELLOWFOX_DB"],
"occurredFrom": "2026-05-01T00:00:00Z",
"occurredTo": "2026-05-31T23:59:59Z",
"expandVehicleEvents": true "expandVehicleEvents": true
}, },
"partitioning": { "partitioning": {
@ -78,6 +93,8 @@ The important design point is that runtime processing is not tachograph-specific
} }
``` ```
`sourceFamilies + eventBackend` is still accepted as a legacy shorthand when every selected family uses the same backend. Use `sourceInputs` when one runtime request must mix direct-source and EventHub-backed inputs.
## Conceptual flow ## Conceptual flow
```text ```text

View File

@ -40,6 +40,8 @@ with:
also remains available, but new clients should use `processingPlanKey` and `/executions`. also remains available, but new clients should use `processingPlanKey` and `/executions`.
When the runtime scope must mix file sessions, direct source databases, and EventHub-backed canonical events in one request, use `/executions` with per-source `sourceInputs`. The tachograph compatibility endpoints remain useful for file-session-centric scopes, but they are no longer the best shape for mixed-source runtime execution.
The current `driver-working-time-v1` plan uses these modules: The current `driver-working-time-v1` plan uses these modules:
```text ```text

View File

@ -24,6 +24,38 @@
{ {
"key": "driverKey", "key": "driverKey",
"value": "12:12345678901234" "value": "12:12345678901234"
},
{
"key": "sessionId1",
"value": "11111111-1111-1111-1111-111111111111"
},
{
"key": "sessionId2",
"value": "22222222-2222-2222-2222-222222222222"
},
{
"key": "tenantKey",
"value": "default"
},
{
"key": "driverSourceEntityId",
"value": "DRIVER:42"
},
{
"key": "driverCardNation",
"value": "12"
},
{
"key": "driverCardNumber",
"value": "12345678901234"
},
{
"key": "occurredFrom",
"value": "2026-05-01T00:00:00Z"
},
{
"key": "occurredTo",
"value": "2026-05-31T23:59:59Z"
} }
], ],
"item": [ "item": [
@ -132,7 +164,7 @@
} }
}, },
{ {
"name": "Execute processing plan - driver working time source DB mixed sources", "name": "Execute processing plan - driver working time mixed backends and sources",
"request": { "request": {
"method": "POST", "method": "POST",
"header": [ "header": [
@ -155,7 +187,7 @@
}, },
"body": { "body": {
"mode": "raw", "mode": "raw",
"raw": "{\n \"processingPlanKey\": \"driver-working-time-v1\",\n \"sourceSelection\": {\n \"tenantKey\": \"{{tenantKey}}\",\n \"sourceFamilies\": [\n \"TACHOGRAPH_DB\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"SOURCE_DB\",\n \"driverSourceEntityId\": \"{{driverSourceEntityId}}\",\n \"driverCardNation\": \"{{driverCardNation}}\",\n \"driverCardNumber\": \"{{driverCardNumber}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": false,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n}" "raw": "{\n \"processingPlanKey\": \"driver-working-time-v1\",\n \"sourceSelection\": {\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": false,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n}"
} }
} }
}, },
@ -334,7 +366,7 @@
], ],
"body": { "body": {
"mode": "raw", "mode": "raw",
"raw": "{\n \"processingRequest\": {\n \"profileKey\": \"tachograph-driver-esper-v1\",\n \"scope\": {\n \"sessionIds\": [\n \"{{sessionId1}}\",\n \"{{sessionId2}}\"\n ],\n \"sourceFamilies\": [\n \"TACHOGRAPH_FILE_SESSION\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"SOURCE_DB\",\n \"occurredFrom\": \"2026-05-01T00:00:00Z\",\n \"occurredTo\": \"2026-05-31T23:59:59Z\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": true,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n },\n \"minimumAttachedVehicleEvidenceEvents\": 1,\n \"minimumNormalizedSupportEvidenceEvents\": 1,\n \"failWhenPartitionDebugMissing\": true\n}" "raw": "{\n \"processingRequest\": {\n \"profileKey\": \"tachograph-driver-esper-v1\",\n \"scope\": {\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId1}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": true,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n },\n \"minimumAttachedVehicleEvidenceEvents\": 1,\n \"minimumNormalizedSupportEvidenceEvents\": 1,\n \"failWhenPartitionDebugMissing\": true\n}"
}, },
"url": { "url": {
"raw": "{{baseUrl}}/api/eventhub/runtime-processing/event-processing/validation/mixed-source-evidence", "raw": "{{baseUrl}}/api/eventhub/runtime-processing/event-processing/validation/mixed-source-evidence",
@ -493,7 +525,7 @@
} }
}, },
{ {
"name": "Runtime diagnostics - driver events from source DB", "name": "Runtime diagnostics - driver events from mixed backends and sources",
"request": { "request": {
"method": "POST", "method": "POST",
"header": [ "header": [
@ -516,12 +548,12 @@
}, },
"body": { "body": {
"mode": "raw", "mode": "raw",
"raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"sourceFamilies\": [\n \"TACHOGRAPH_DB\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"SOURCE_DB\",\n \"driverSourceEntityId\": \"{{driverSourceEntityId}}\",\n \"driverCardNation\": \"{{driverCardNation}}\",\n \"driverCardNumber\": \"{{driverCardNumber}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}" "raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}"
} }
} }
}, },
{ {
"name": "Runtime diagnostics - driver timeline from EventHub DB", "name": "Runtime diagnostics - driver timeline from mixed backends and sources",
"request": { "request": {
"method": "POST", "method": "POST",
"header": [ "header": [
@ -544,7 +576,7 @@
}, },
"body": { "body": {
"mode": "raw", "mode": "raw",
"raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"sourceFamilies\": [\n \"TACHOGRAPH_DB\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"EVENTHUB_DB\",\n \"driverCardNation\": \"{{driverCardNation}}\",\n \"driverCardNumber\": \"{{driverCardNumber}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}" "raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}"
} }
} }
} }

View File

@ -33,8 +33,64 @@ public record UnifiedRuntimeProcessingApiRequest(
Integer significantDrivingMinutes, Integer significantDrivingMinutes,
Integer minimumRestPeriodMinutes, Integer minimumRestPeriodMinutes,
Boolean includeActivityIntervals, Boolean includeActivityIntervals,
Boolean includeDrivingIntervals Boolean includeDrivingIntervals,
List<UnifiedRuntimeSourceInputApiRequest> sourceInputs
) { ) {
public UnifiedRuntimeProcessingApiRequest(
UUID sessionId,
List<UUID> sessionIds,
UUID compositeSessionId,
String tenantKey,
Set<UnifiedEventSourceFamily> sourceFamilies,
UnifiedRuntimeEventBackend eventBackend,
Set<UnifiedTachographSourceKind> tachographSourceKinds,
String driverKey,
Set<String> driverKeys,
Boolean includeAllDrivers,
Set<String> vehicleKeys,
Boolean includeAllVehicles,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
Boolean expandVehicleEvents,
Integer vehicleExpansionPaddingMinutes,
Boolean includeIntersectingIntervals,
Integer significantDrivingMinutes,
Integer minimumRestPeriodMinutes,
Boolean includeActivityIntervals,
Boolean includeDrivingIntervals
) {
this(
sessionId,
sessionIds,
compositeSessionId,
tenantKey,
sourceFamilies,
eventBackend,
tachographSourceKinds,
driverKey,
driverKeys,
includeAllDrivers,
vehicleKeys,
includeAllVehicles,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes,
includeIntersectingIntervals,
significantDrivingMinutes,
minimumRestPeriodMinutes,
includeActivityIntervals,
includeDrivingIntervals,
List.of()
);
}
public UnifiedRuntimeProcessingRequest toRuntimeRequest() { public UnifiedRuntimeProcessingRequest toRuntimeRequest() {
return new UnifiedRuntimeProcessingRequest( return new UnifiedRuntimeProcessingRequest(
sessionId, sessionId,
@ -56,7 +112,11 @@ public record UnifiedRuntimeProcessingApiRequest(
occurredTo, occurredTo,
expandVehicleEvents == null || expandVehicleEvents, expandVehicleEvents == null || expandVehicleEvents,
vehicleExpansionPaddingMinutes == null ? 0 : Math.max(0, vehicleExpansionPaddingMinutes), vehicleExpansionPaddingMinutes == null ? 0 : Math.max(0, vehicleExpansionPaddingMinutes),
includeIntersectingIntervals == null || includeIntersectingIntervals includeIntersectingIntervals == null || includeIntersectingIntervals,
sourceInputs == null ? List.of() : sourceInputs.stream()
.filter(value -> value != null)
.map(UnifiedRuntimeSourceInputApiRequest::toRuntimeSourceInput)
.toList()
); );
} }

View File

@ -0,0 +1,25 @@
package at.procon.eventhub.processing.dto;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput;
import java.util.List;
import java.util.UUID;
public record UnifiedRuntimeSourceInputApiRequest(
UnifiedEventSourceFamily sourceFamily,
UnifiedRuntimeEventBackend eventBackend,
UUID sessionId,
List<UUID> sessionIds,
UUID compositeSessionId
) {
public UnifiedRuntimeSourceInput toRuntimeSourceInput() {
return new UnifiedRuntimeSourceInput(
sourceFamily,
eventBackend,
sessionId,
sessionIds,
compositeSessionId
);
}
}

View File

@ -193,7 +193,9 @@ public class SupportEvidenceNormalizationModule implements RuntimeProcessingModu
} }
private UUID runtimeSessionId(UnifiedRuntimeProcessingRequest request) { private UUID runtimeSessionId(UnifiedRuntimeProcessingRequest request) {
if (request.compositeSessionId() != null || request.sessionIds().size() > 1) { if (request.normalizedSourceInputs().size() > 1
|| request.compositeSessionId() != null
|| request.sessionIds().size() > 1) {
return null; return null;
} }
return request.sessionIds().size() == 1 ? request.sessionIds().get(0) : request.sessionId(); return request.sessionIds().size() == 1 ? request.sessionIds().get(0) : request.sessionId();

View File

@ -405,7 +405,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
significantDrivingMinutes, significantDrivingMinutes,
minimumRestPeriodMinutes, minimumRestPeriodMinutes,
includeActivityIntervals, includeActivityIntervals,
includeDrivingIntervals includeDrivingIntervals,
sourceSelection.sourceInputs()
); );
} }

View File

@ -6,6 +6,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
@ -29,13 +30,62 @@ public record UnifiedRuntimeProcessingRequest(
OffsetDateTime occurredTo, OffsetDateTime occurredTo,
boolean expandVehicleEvents, boolean expandVehicleEvents,
int vehicleExpansionPaddingMinutes, int vehicleExpansionPaddingMinutes,
boolean includeIntersectingIntervals boolean includeIntersectingIntervals,
List<UnifiedRuntimeSourceInput> sourceInputs
) { ) {
public UnifiedRuntimeProcessingRequest(
UUID sessionId,
List<UUID> sessionIds,
UUID compositeSessionId,
String tenantKey,
Set<UnifiedEventSourceFamily> sourceFamilies,
UnifiedRuntimeEventBackend eventBackend,
Set<UnifiedTachographSourceKind> tachographSourceKinds,
String driverKey,
Set<String> driverKeys,
boolean includeAllDrivers,
Set<String> vehicleKeys,
boolean includeAllVehicles,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean expandVehicleEvents,
int vehicleExpansionPaddingMinutes,
boolean includeIntersectingIntervals
) {
this(
sessionId,
sessionIds,
compositeSessionId,
tenantKey,
sourceFamilies,
eventBackend,
tachographSourceKinds,
driverKey,
driverKeys,
includeAllDrivers,
vehicleKeys,
includeAllVehicles,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes,
includeIntersectingIntervals,
List.of()
);
}
public UnifiedRuntimeProcessingRequest { public UnifiedRuntimeProcessingRequest {
if (sourceFamilies == null || sourceFamilies.isEmpty()) { sourceInputs = normalizeSourceInputs(sourceInputs);
sourceFamilies = normalizeSourceFamilies(sourceFamilies, sourceInputs);
if (sourceFamilies.isEmpty()) {
throw new IllegalArgumentException("sourceFamilies must not be empty"); throw new IllegalArgumentException("sourceFamilies must not be empty");
} }
sourceFamilies = Set.copyOf(sourceFamilies);
eventBackend = eventBackend == null ? UnifiedRuntimeEventBackend.SOURCE_DB : eventBackend; eventBackend = eventBackend == null ? UnifiedRuntimeEventBackend.SOURCE_DB : eventBackend;
tachographSourceKinds = normalizeTachographSourceKinds(tachographSourceKinds); tachographSourceKinds = normalizeTachographSourceKinds(tachographSourceKinds);
sessionIds = normalizeSessionIds(sessionId, sessionIds); sessionIds = normalizeSessionIds(sessionId, sessionIds);
@ -60,22 +110,44 @@ public record UnifiedRuntimeProcessingRequest(
driverSourceEntityId = normalize(driverSourceEntityId); driverSourceEntityId = normalize(driverSourceEntityId);
driverCardNation = normalizeUpper(driverCardNation); driverCardNation = normalizeUpper(driverCardNation);
driverCardNumber = normalizeDriverCardNumber(driverCardNumber); driverCardNumber = normalizeDriverCardNumber(driverCardNumber);
boolean includesFileSession = sourceFamilies.contains(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION); String[] inferredDriverCardSelector = inferDriverCardSelector(driverKey, driverCardNation, driverCardNumber);
boolean includesExternalDb = sourceFamilies.stream() driverCardNation = inferredDriverCardSelector[0];
.anyMatch(family -> family == UnifiedEventSourceFamily.TACHOGRAPH_DB || family == UnifiedEventSourceFamily.YELLOWFOX_DB); driverCardNumber = inferredDriverCardSelector[1];
if (includesFileSession && eventBackend == UnifiedRuntimeEventBackend.EVENTHUB_DB) {
throw new IllegalArgumentException("TACHOGRAPH_FILE_SESSION runtime processing currently supports SOURCE_DB backend only."); List<UnifiedRuntimeSourceInput> normalizedSourceInputs = normalizedSourceInputs(
} sourceFamilies,
eventBackend,
sessionId,
sessionIds,
compositeSessionId,
sourceInputs
);
boolean includesFileSession = normalizedSourceInputs.stream()
.anyMatch(input -> input.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION);
boolean includesExternalDb = normalizedSourceInputs.stream()
.map(UnifiedRuntimeSourceInput::sourceFamily)
.anyMatch(UnifiedRuntimeProcessingRequest::isExternalDbFamily);
boolean includesEventHubExternalDb = normalizedSourceInputs.stream()
.anyMatch(input -> input.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB
&& isExternalDbFamily(input.sourceFamily()));
if (tenantKey == null) { if (tenantKey == null) {
if (!includesFileSession || includesExternalDb) { if (!includesFileSession || includesExternalDb) {
throw new IllegalArgumentException("tenantKey must not be blank"); throw new IllegalArgumentException("tenantKey must not be blank");
} }
} }
if (includesFileSession && compositeSessionId == null && sessionIds.isEmpty()) { if (includesFileSession && compositeSessionId == null && sessionIds.isEmpty()
throw new IllegalArgumentException("sessionId, sessionIds or compositeSessionId must be provided when TACHOGRAPH_FILE_SESSION is selected."); && normalizedSourceInputs.stream()
.filter(input -> input.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION)
.allMatch(input -> input.compositeSessionId() == null && input.sessionIds().isEmpty())) {
throw new IllegalArgumentException(
"sessionId, sessionIds or compositeSessionId must be provided when TACHOGRAPH_FILE_SESSION is selected."
);
} }
if (includesFileSession && driverKey == null && driverKeys.isEmpty() && !includeAllDrivers) { if (includesFileSession && driverKey == null && driverKeys.isEmpty() && !includeAllDrivers) {
throw new IllegalArgumentException("driverKey, driverKeys or includeAllDrivers must be provided when TACHOGRAPH_FILE_SESSION is selected."); throw new IllegalArgumentException(
"driverKey, driverKeys or includeAllDrivers must be provided when TACHOGRAPH_FILE_SESSION is selected."
);
} }
if (includesExternalDb && driverSourceEntityId == null && driverCardNumber == null if (includesExternalDb && driverSourceEntityId == null && driverCardNumber == null
&& driverKeys.isEmpty() && !includeAllDrivers) { && driverKeys.isEmpty() && !includeAllDrivers) {
@ -83,14 +155,17 @@ public record UnifiedRuntimeProcessingRequest(
} }
if (includesExternalDb && (includeAllDrivers || !driverKeys.isEmpty()) if (includesExternalDb && (includeAllDrivers || !driverKeys.isEmpty())
&& (occurredFrom == null || occurredTo == null)) { && (occurredFrom == null || occurredTo == null)) {
throw new IllegalArgumentException("occurredFrom and occurredTo are required when loading broad external DB runtime scopes."); throw new IllegalArgumentException(
"occurredFrom and occurredTo are required when loading broad external DB runtime scopes."
);
} }
if (eventBackend == UnifiedRuntimeEventBackend.EVENTHUB_DB if (includesEventHubExternalDb
&& includesExternalDb
&& driverSourceEntityId == null && driverSourceEntityId == null
&& driverCardNumber == null && driverCardNumber == null
&& (includeAllDrivers || !driverKeys.isEmpty())) { && (includeAllDrivers || !driverKeys.isEmpty())) {
throw new IllegalArgumentException("Broad multi-driver EVENTHUB_DB runtime scopes are not supported yet; provide a concrete EventHub driver selector or use SOURCE_DB."); throw new IllegalArgumentException(
"Broad multi-driver EVENTHUB_DB runtime scopes are not supported yet; provide a concrete EventHub driver selector or use SOURCE_DB."
);
} }
if (occurredFrom != null && occurredTo != null && occurredTo.isBefore(occurredFrom)) { if (occurredFrom != null && occurredTo != null && occurredTo.isBefore(occurredFrom)) {
throw new IllegalArgumentException("occurredTo must not be before occurredFrom"); throw new IllegalArgumentException("occurredTo must not be before occurredFrom");
@ -353,6 +428,44 @@ public record UnifiedRuntimeProcessingRequest(
return occurredTo == null ? null : occurredTo.plusMinutes(vehicleExpansionPaddingMinutes); return occurredTo == null ? null : occurredTo.plusMinutes(vehicleExpansionPaddingMinutes);
} }
public List<UnifiedRuntimeSourceInput> normalizedSourceInputs() {
return normalizedSourceInputs(
sourceFamilies,
eventBackend,
sessionId,
sessionIds,
compositeSessionId,
sourceInputs
);
}
public UnifiedRuntimeProcessingRequest forSourceInput(UnifiedRuntimeSourceInput sourceInput) {
Objects.requireNonNull(sourceInput, "sourceInput must not be null");
return new UnifiedRuntimeProcessingRequest(
sourceInput.sessionId(),
sourceInput.sessionIds(),
sourceInput.compositeSessionId(),
tenantKey,
Set.of(sourceInput.sourceFamily()),
sourceInput.eventBackend(),
tachographSourceKinds,
driverKey,
driverKeys,
includeAllDrivers,
vehicleKeys,
includeAllVehicles,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes,
includeIntersectingIntervals,
List.of(sourceInput)
);
}
private static List<UUID> normalizeSessionIds(UUID sessionId, List<UUID> sessionIds) { private static List<UUID> normalizeSessionIds(UUID sessionId, List<UUID> sessionIds) {
LinkedHashSet<UUID> result = new LinkedHashSet<>(); LinkedHashSet<UUID> result = new LinkedHashSet<>();
if (sessionId != null) { if (sessionId != null) {
@ -385,7 +498,8 @@ public record UnifiedRuntimeProcessingRequest(
occurredTo, occurredTo,
expandVehicleEvents, expandVehicleEvents,
vehicleExpansionPaddingMinutes, vehicleExpansionPaddingMinutes,
includeIntersectingIntervals includeIntersectingIntervals,
sourceInputs
); );
} }
@ -410,6 +524,68 @@ public record UnifiedRuntimeProcessingRequest(
.toList(); .toList();
} }
private static Set<UnifiedEventSourceFamily> normalizeSourceFamilies(
Set<UnifiedEventSourceFamily> sourceFamilies,
List<UnifiedRuntimeSourceInput> sourceInputs
) {
LinkedHashSet<UnifiedEventSourceFamily> normalized = new LinkedHashSet<>();
if (sourceFamilies != null) {
normalized.addAll(sourceFamilies.stream().filter(Objects::nonNull).toList());
}
if (sourceInputs != null) {
sourceInputs.stream()
.filter(Objects::nonNull)
.map(UnifiedRuntimeSourceInput::sourceFamily)
.forEach(normalized::add);
}
return Set.copyOf(normalized);
}
private static List<UnifiedRuntimeSourceInput> normalizeSourceInputs(List<UnifiedRuntimeSourceInput> sourceInputs) {
if (sourceInputs == null || sourceInputs.isEmpty()) {
return List.of();
}
LinkedHashSet<UnifiedRuntimeSourceInput> normalized = new LinkedHashSet<>();
sourceInputs.stream()
.filter(Objects::nonNull)
.forEach(normalized::add);
return List.copyOf(normalized);
}
private static List<UnifiedRuntimeSourceInput> normalizedSourceInputs(
Set<UnifiedEventSourceFamily> sourceFamilies,
UnifiedRuntimeEventBackend eventBackend,
UUID sessionId,
List<UUID> sessionIds,
UUID compositeSessionId,
List<UnifiedRuntimeSourceInput> sourceInputs
) {
if (sourceInputs != null && !sourceInputs.isEmpty()) {
return List.copyOf(sourceInputs);
}
List<UnifiedRuntimeSourceInput> normalized = new ArrayList<>();
for (UnifiedEventSourceFamily sourceFamily : sourceFamilies == null ? Set.<UnifiedEventSourceFamily>of() : sourceFamilies) {
if (sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) {
normalized.add(new UnifiedRuntimeSourceInput(
sourceFamily,
eventBackend,
sessionId,
sessionIds,
compositeSessionId
));
} else {
normalized.add(new UnifiedRuntimeSourceInput(
sourceFamily,
eventBackend,
null,
List.of(),
null
));
}
}
return List.copyOf(normalized);
}
private static Set<String> normalizeStrings(Set<String> values) { private static Set<String> normalizeStrings(Set<String> values) {
if (values == null || values.isEmpty()) { if (values == null || values.isEmpty()) {
return Set.of(); return Set.of();
@ -433,6 +609,10 @@ public record UnifiedRuntimeProcessingRequest(
return Set.copyOf(values); return Set.copyOf(values);
} }
private static boolean isExternalDbFamily(UnifiedEventSourceFamily family) {
return family == UnifiedEventSourceFamily.TACHOGRAPH_DB || family == UnifiedEventSourceFamily.YELLOWFOX_DB;
}
private static String normalize(String value) { private static String normalize(String value) {
return value == null || value.isBlank() ? null : value.trim(); return value == null || value.isBlank() ? null : value.trim();
} }
@ -444,4 +624,39 @@ public record UnifiedRuntimeProcessingRequest(
private static String normalizeDriverCardNumber(String value) { private static String normalizeDriverCardNumber(String value) {
return DriverCardNumberNormalizer.canonical(value); return DriverCardNumberNormalizer.canonical(value);
} }
private static String[] inferDriverCardSelector(
String driverKey,
String driverCardNation,
String driverCardNumber
) {
if (driverCardNumber != null) {
return new String[]{driverCardNation, driverCardNumber};
}
if (driverKey == null) {
return new String[]{driverCardNation, driverCardNumber};
}
int separator = driverKey.indexOf(':');
if (separator <= 0 || separator >= driverKey.length() - 1) {
return new String[]{driverCardNation, driverCardNumber};
}
String prefix = driverKey.substring(0, separator).trim();
if (!looksLikeDriverCardNation(prefix)) {
return new String[]{driverCardNation, driverCardNumber};
}
return new String[]{
driverCardNation == null ? normalizeUpper(prefix) : driverCardNation,
normalizeDriverCardNumber(driverKey.substring(separator + 1))
};
}
private static boolean looksLikeDriverCardNation(String value) {
if (value == null || value.isBlank()) {
return false;
}
String trimmed = value.trim();
return trimmed.matches("\\d+")
|| trimmed.matches("(?i)[A-Z]{2}")
|| trimmed.matches("(?i)UNKNOWN\\s+\\d+");
}
} }

View File

@ -0,0 +1,53 @@
package at.procon.eventhub.processing.model;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.UUID;
public record UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily sourceFamily,
UnifiedRuntimeEventBackend eventBackend,
UUID sessionId,
List<UUID> sessionIds,
UUID compositeSessionId
) {
public UnifiedRuntimeSourceInput {
if (sourceFamily == null) {
throw new IllegalArgumentException("sourceFamily must not be null");
}
eventBackend = eventBackend == null ? UnifiedRuntimeEventBackend.SOURCE_DB : eventBackend;
sessionIds = normalizeSessionIds(sessionId, sessionIds);
if (sessionId == null && !sessionIds.isEmpty()) {
sessionId = sessionIds.get(0);
}
if (compositeSessionId != null && !sessionIds.isEmpty()) {
throw new IllegalArgumentException("Use either compositeSessionId or sessionId/sessionIds, not both.");
}
if (sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) {
if (eventBackend == UnifiedRuntimeEventBackend.EVENTHUB_DB) {
throw new IllegalArgumentException("TACHOGRAPH_FILE_SESSION runtime processing currently supports SOURCE_DB backend only.");
}
if (compositeSessionId == null && sessionIds.isEmpty()) {
throw new IllegalArgumentException(
"sessionId, sessionIds or compositeSessionId must be provided for TACHOGRAPH_FILE_SESSION source inputs."
);
}
} else if (compositeSessionId != null || !sessionIds.isEmpty()) {
throw new IllegalArgumentException(
"sessionId, sessionIds or compositeSessionId are supported for TACHOGRAPH_FILE_SESSION source inputs only."
);
}
}
private static List<UUID> normalizeSessionIds(UUID sessionId, List<UUID> sessionIds) {
LinkedHashSet<UUID> result = new LinkedHashSet<>();
if (sessionId != null) {
result.add(sessionId);
}
if (sessionIds != null) {
result.addAll(sessionIds.stream().filter(value -> value != null).toList());
}
return List.copyOf(new ArrayList<>(result));
}
}

View File

@ -199,7 +199,9 @@ public class UnifiedRuntimeDerivedProjectionService {
} }
private UUID runtimeSessionId(UnifiedRuntimeProcessingRequest request) { private UUID runtimeSessionId(UnifiedRuntimeProcessingRequest request) {
if (request.compositeSessionId() != null || request.sessionIds().size() > 1) { if (request.normalizedSourceInputs().size() > 1
|| request.compositeSessionId() != null
|| request.sessionIds().size() > 1) {
return null; return null;
} }
return request.sessionIds().size() == 1 ? request.sessionIds().get(0) : request.sessionId(); return request.sessionIds().size() == 1 ? request.sessionIds().get(0) : request.sessionId();

View File

@ -7,6 +7,8 @@ import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
@ -29,6 +31,7 @@ public class UnifiedRuntimeEventAssemblyService {
} }
public UnifiedRuntimeEventBundle assembleDriverScopedEvents(UnifiedRuntimeProcessingRequest request) { public UnifiedRuntimeEventBundle assembleDriverScopedEvents(UnifiedRuntimeProcessingRequest request) {
List<UnifiedRuntimeSourceInput> sourceInputs = request.normalizedSourceInputs();
List<EventHubEventDto> driverSeedEvents = loadDriverSeedEvents(request); List<EventHubEventDto> driverSeedEvents = loadDriverSeedEvents(request);
List<UnifiedDiscoveredVehicleRef> discoveredVehicles = discoverVehicles(driverSeedEvents); List<UnifiedDiscoveredVehicleRef> discoveredVehicles = discoverVehicles(driverSeedEvents);
List<EventHubEventDto> expandedVehicleEvents = request.expandVehicleEvents() List<EventHubEventDto> expandedVehicleEvents = request.expandVehicleEvents()
@ -37,16 +40,27 @@ public class UnifiedRuntimeEventAssemblyService {
List<EventHubEventDto> mergedEvents = deduplicateAndSort(driverSeedEvents, expandedVehicleEvents); List<EventHubEventDto> mergedEvents = deduplicateAndSort(driverSeedEvents, expandedVehicleEvents);
List<String> notes = new ArrayList<>(); List<String> notes = new ArrayList<>();
notes.add(request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB boolean includesEventHub = sourceInputs.stream()
? "Driver seed events were loaded from the local EventHub event store." .anyMatch(input -> input.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB);
: "Driver seed events were loaded directly from the selected runtime sources."); boolean includesSourceDb = sourceInputs.stream()
if (request.sourceFamilies().contains(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION)) { .anyMatch(input -> input.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB);
if (request.compositeSessionId() != null) { if (includesEventHub && includesSourceDb) {
notes.add("Tachograph file-session events were loaded from composite session " + request.compositeSessionId() + "."); notes.add("Driver seed events were loaded from a mixed runtime scope spanning the local EventHub event store and direct source systems.");
} else if (request.sessionIds().size() > 1) { } else if (includesEventHub) {
notes.add("Tachograph file-session events were loaded from " + request.sessionIds().size() + " selected sessions."); notes.add("Driver seed events were loaded from the local EventHub event store.");
} else if (request.sessionId() != null) { } else {
notes.add("Tachograph file-session events were loaded from session " + request.sessionId() + "."); notes.add("Driver seed events were loaded directly from the selected runtime sources.");
}
for (UnifiedRuntimeSourceInput sourceInput : sourceInputs) {
if (sourceInput.sourceFamily() != UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) {
continue;
}
if (sourceInput.compositeSessionId() != null) {
notes.add("Tachograph file-session events were loaded from composite session " + sourceInput.compositeSessionId() + ".");
} else if (sourceInput.sessionIds().size() > 1) {
notes.add("Tachograph file-session events were loaded from " + sourceInput.sessionIds().size() + " selected sessions.");
} else if (sourceInput.sessionId() != null) {
notes.add("Tachograph file-session events were loaded from session " + sourceInput.sessionId() + ".");
} }
} }
if (request.expandVehicleEvents()) { if (request.expandVehicleEvents()) {
@ -67,12 +81,13 @@ public class UnifiedRuntimeEventAssemblyService {
private List<EventHubEventDto> loadDriverSeedEvents(UnifiedRuntimeProcessingRequest request) { private List<EventHubEventDto> loadDriverSeedEvents(UnifiedRuntimeProcessingRequest request) {
List<EventHubEventDto> result = new ArrayList<>(); List<EventHubEventDto> result = new ArrayList<>();
for (UnifiedEventSourceFamily sourceFamily : request.sourceFamilies()) { for (UnifiedRuntimeSourceInput sourceInput : request.normalizedSourceInputs()) {
RuntimeDriverEventLoader loader = driverLoader(request, sourceFamily); UnifiedRuntimeProcessingRequest sourceRequest = request.forSourceInput(sourceInput);
RuntimeDriverEventLoader loader = driverLoader(sourceRequest, sourceInput.sourceFamily());
if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) { if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) {
result.addAll(eventHubLoader.loadDriverEvents(request, sourceFamily)); result.addAll(eventHubLoader.loadDriverEvents(sourceRequest, sourceInput.sourceFamily()));
} else { } else {
result.addAll(loader.loadDriverEvents(request)); result.addAll(loader.loadDriverEvents(sourceRequest));
} }
} }
return deduplicateAndSort(result, List.of()); return deduplicateAndSort(result, List.of());
@ -84,12 +99,13 @@ public class UnifiedRuntimeEventAssemblyService {
) { ) {
List<EventHubEventDto> result = new ArrayList<>(); List<EventHubEventDto> result = new ArrayList<>();
for (UnifiedDiscoveredVehicleRef vehicleRef : discoveredVehicles) { for (UnifiedDiscoveredVehicleRef vehicleRef : discoveredVehicles) {
for (UnifiedEventSourceFamily sourceFamily : request.sourceFamilies()) { for (UnifiedRuntimeSourceInput sourceInput : request.normalizedSourceInputs()) {
RuntimeVehicleEventLoader loader = vehicleLoader(request, sourceFamily); UnifiedRuntimeProcessingRequest sourceRequest = request.forSourceInput(sourceInput);
RuntimeVehicleEventLoader loader = vehicleLoader(sourceRequest, sourceInput.sourceFamily());
if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) { if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) {
result.addAll(eventHubLoader.loadVehicleEvents(request, sourceFamily, vehicleRef)); result.addAll(eventHubLoader.loadVehicleEvents(sourceRequest, sourceInput.sourceFamily(), vehicleRef));
} else { } else {
result.addAll(loader.loadVehicleEvents(request, vehicleRef)); result.addAll(loader.loadVehicleEvents(sourceRequest, vehicleRef));
} }
} }
} }
@ -151,28 +167,33 @@ public class UnifiedRuntimeEventAssemblyService {
private void appendDeduplicated(LinkedHashMap<String, EventHubEventDto> byKey, List<EventHubEventDto> events) { private void appendDeduplicated(LinkedHashMap<String, EventHubEventDto> byKey, List<EventHubEventDto> events) {
for (EventHubEventDto event : events) { for (EventHubEventDto event : events) {
byKey.putIfAbsent(dedupKey(event), event); byKey.putIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), event);
} }
} }
private String dedupKey(EventHubEventDto event) {
String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null
? event.packageInfo().eventSource().stableKey()
: "NO_SOURCE";
return sourceKey + "|" + event.externalSourceEventId();
}
private RuntimeDriverEventLoader driverLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { private RuntimeDriverEventLoader driverLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return driverEventLoaders.stream() return driverEventLoaders.stream()
.filter(loader -> loader.supports(request, sourceFamily)) .filter(loader -> loader.supports(request, sourceFamily))
.findFirst() .findFirst()
.orElseThrow(() -> new IllegalArgumentException("No runtime driver event loader is registered for source family " + sourceFamily + ".")); .orElseThrow(() -> new IllegalArgumentException(
"No runtime driver event loader is registered for source family "
+ sourceFamily
+ " and backend "
+ request.eventBackend()
+ "."
));
} }
private RuntimeVehicleEventLoader vehicleLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { private RuntimeVehicleEventLoader vehicleLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return vehicleEventLoaders.stream() return vehicleEventLoaders.stream()
.filter(loader -> loader.supports(request, sourceFamily)) .filter(loader -> loader.supports(request, sourceFamily))
.findFirst() .findFirst()
.orElseThrow(() -> new IllegalArgumentException("No runtime vehicle event loader is registered for source family " + sourceFamily + ".")); .orElseThrow(() -> new IllegalArgumentException(
"No runtime vehicle event loader is registered for source family "
+ sourceFamily
+ " and backend "
+ request.eventBackend()
+ "."
));
} }
} }

View File

@ -23,6 +23,7 @@ import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingP
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy; import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionResultDto; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionResultDto;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionService; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionService;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingPlanDescriptorDto; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingPlanDescriptorDto;
import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographDriverParityResultDto; import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographDriverParityResultDto;
import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographParityCategoryComparisonDto; import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographParityCategoryComparisonDto;
@ -378,6 +379,91 @@ class UnifiedRuntimeProcessingControllerTest {
.andExpect(jsonPath("$.partitioningStrategy").value("DRIVER")); .andExpect(jsonPath("$.partitioningStrategy").value("DRIVER"));
} }
@Test
void runsRuntimeProcessingExecutionWithExplicitSourceInputsViaRuntimeApi() throws Exception {
UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class);
UnifiedRuntimeDriverTimelineService timelineService = org.mockito.Mockito.mock(UnifiedRuntimeDriverTimelineService.class);
UnifiedRuntimeDerivedProjectionService derivedProjectionService = org.mockito.Mockito.mock(UnifiedRuntimeDerivedProjectionService.class);
RuntimeProcessingExecutionService executionService = org.mockito.Mockito.mock(RuntimeProcessingExecutionService.class);
MockMvc mockMvc = MockMvcBuilders.standaloneSetup(new UnifiedRuntimeProcessingController(
eventAssemblyService,
timelineService,
derivedProjectionService,
null,
null,
null,
null,
executionService
))
.setMessageConverters(new MappingJackson2HttpMessageConverter(objectMapper))
.setControllerAdvice(new UnifiedRuntimeProcessingExceptionHandler())
.build();
when(executionService.execute(any()))
.thenAnswer(invocation -> {
RuntimeProcessingExecutionApiRequest apiRequest = invocation.getArgument(0);
UnifiedRuntimeProcessingRequest request = apiRequest.sourceSelection().toRuntimeRequest();
return new RuntimeProcessingExecutionResultDto(
"driver-working-time-v1",
List.of("event-to-activity-intervals"),
RuntimeEventPartitioningStrategy.DRIVER,
request,
3,
1,
1,
List.of(new UnifiedDiscoveredVehicleRef("VEH-1", "VIN-1", "12", "REG-1")),
Map.of(),
List.of("execution"),
List.of()
);
});
mockMvc.perform(post("/api/eventhub/runtime-processing/executions")
.contentType("application/json")
.content("""
{
"processingPlanKey": "driver-working-time-v1",
"sourceSelection": {
"tenantKey": "default",
"driverKey": "12:12345678901234",
"occurredFrom": "2026-05-01T08:00:00Z",
"occurredTo": "2026-05-01T10:00:00Z",
"sourceInputs": [
{
"sourceFamily": "TACHOGRAPH_FILE_SESSION",
"eventBackend": "SOURCE_DB",
"sessionIds": [
"11111111-1111-1111-1111-111111111111",
"22222222-2222-2222-2222-222222222222"
]
},
{
"sourceFamily": "TACHOGRAPH_DB",
"eventBackend": "EVENTHUB_DB"
},
{
"sourceFamily": "YELLOWFOX_DB",
"eventBackend": "SOURCE_DB"
}
]
},
"partitioning": {
"strategy": "DRIVER"
}
}
"""))
.andExpect(status().isOk())
.andExpect(jsonPath("$.processingPlanKey").value("driver-working-time-v1"))
.andExpect(jsonPath("$.request.sourceInputs[0].sourceFamily").value("TACHOGRAPH_FILE_SESSION"))
.andExpect(jsonPath("$.request.sourceInputs[0].eventBackend").value("SOURCE_DB"))
.andExpect(jsonPath("$.request.sourceInputs[0].sessionIds[0]").value("11111111-1111-1111-1111-111111111111"))
.andExpect(jsonPath("$.request.sourceInputs[1].sourceFamily").value("TACHOGRAPH_DB"))
.andExpect(jsonPath("$.request.sourceInputs[1].eventBackend").value("EVENTHUB_DB"))
.andExpect(jsonPath("$.request.sourceInputs[2].sourceFamily").value("YELLOWFOX_DB"))
.andExpect(jsonPath("$.request.driverCardNation").value("12"))
.andExpect(jsonPath("$.request.driverCardNumber").value("12345678901234"));
}
@Test @Test
void listsRuntimeEventProcessingProfilesViaRuntimeApi() throws Exception { void listsRuntimeEventProcessingProfilesViaRuntimeApi() throws Exception {
UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class); UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class);

View File

@ -138,6 +138,140 @@ class UnifiedRuntimeProcessingRequestTest {
assertThat(request.eventBackend()).isEqualTo(UnifiedRuntimeEventBackend.SOURCE_DB); assertThat(request.eventBackend()).isEqualTo(UnifiedRuntimeEventBackend.SOURCE_DB);
} }
@Test
void infersDriverCardSelectorFromDriverKeyForMixedSourceScope() {
UUID sessionId = UUID.randomUUID();
UnifiedRuntimeProcessingRequest request = new UnifiedRuntimeProcessingRequest(
sessionId,
List.of(),
null,
"default",
Set.of(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedEventSourceFamily.YELLOWFOX_DB
),
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
"12:1234567890123456",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
10,
true
);
assertThat(request.driverKey()).isEqualTo("12:1234567890123456");
assertThat(request.driverCardNation()).isEqualTo("12");
assertThat(request.driverCardNumber()).isEqualTo("12345678901234");
assertThat(request.sourceFamilies()).containsExactlyInAnyOrder(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedEventSourceFamily.YELLOWFOX_DB
);
}
@Test
void supportsExplicitMixedBackendSourceInputs() {
UUID sessionId = UUID.randomUUID();
UnifiedRuntimeProcessingRequest request = new UnifiedRuntimeProcessingRequest(
null,
List.of(),
null,
"default",
Set.of(),
null,
null,
"12:1234567890123456",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
10,
true,
List.of(
new UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedRuntimeEventBackend.SOURCE_DB,
sessionId,
List.of(),
null
),
new UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedRuntimeEventBackend.EVENTHUB_DB,
null,
List.of(),
null
),
new UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.YELLOWFOX_DB,
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
List.of(),
null
)
)
);
assertThat(request.sourceFamilies()).containsExactlyInAnyOrder(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedEventSourceFamily.YELLOWFOX_DB
);
assertThat(request.normalizedSourceInputs()).hasSize(3);
assertThat(request.normalizedSourceInputs()).extracting(UnifiedRuntimeSourceInput::eventBackend)
.containsExactly(
UnifiedRuntimeEventBackend.SOURCE_DB,
UnifiedRuntimeEventBackend.EVENTHUB_DB,
UnifiedRuntimeEventBackend.SOURCE_DB
);
assertThat(request.driverCardNation()).isEqualTo("12");
assertThat(request.driverCardNumber()).isEqualTo("12345678901234");
}
@Test
void doesNotInferDriverCardSelectorFromNonCardDriverKey() {
UnifiedRuntimeProcessingRequest request = new UnifiedRuntimeProcessingRequest(
null,
List.of(),
null,
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB),
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
"DRIVER:42",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
0,
true
);
assertThat(request.driverCardNation()).isNull();
assertThat(request.driverCardNumber()).isNull();
}
@Test @Test

View File

@ -16,6 +16,8 @@ import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.List; import java.util.List;
@ -95,6 +97,146 @@ class UnifiedRuntimeEventAssemblyServiceTest {
.containsExactly("SEED-1", "SEED-2"); .containsExactly("SEED-1", "SEED-2");
} }
@Test
void deduplicatesEquivalentCrossSourceActivityEventsByCanonicalIdentity() {
UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService(
List.of(new FakeTachographDbLoader(), new FakeYellowFoxDbLoader()),
List.of()
);
UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents(
UnifiedRuntimeProcessingRequest.forDriverFromEventHub(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB),
"DRIVER:42",
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
0
)
);
assertThat(bundle.driverSeedEvents()).hasSize(1);
assertThat(bundle.mergedEvents()).hasSize(1);
assertThat(bundle.mergedEvents().get(0).occurredAt()).isEqualTo(OffsetDateTime.parse("2026-05-01T08:00:00Z"));
assertThat(bundle.mergedEvents().get(0).eventType()).isEqualTo(EventType.DRIVE);
assertThat(bundle.mergedEvents().get(0).lifecycle()).isEqualTo(EventLifecycle.START);
}
@Test
void assemblesMixedFileSessionAndDirectDbSourcesUnderSourceDbBackend() {
UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService(
List.of(
new FakeFileSessionLoader(),
new FakeDirectTachographDbLoader(),
new FakeDirectYellowFoxDbLoader()
),
List.of()
);
UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents(
new UnifiedRuntimeProcessingRequest(
UUID.fromString("11111111-1111-1111-1111-111111111111"),
List.of(),
null,
"default",
Set.of(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedEventSourceFamily.YELLOWFOX_DB
),
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
"12:1234567890123456",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
0,
true
)
);
assertThat(bundle.driverSeedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("FILE-SESSION-1", "TACHO-DB-1", "YF-DB-1");
}
@Test
void assemblesMixedBackendSourcesFromExplicitSourceInputs() {
UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService(
List.of(
new FakeFileSessionLoader(),
new FakeTachographDbLoader(),
new FakeDirectYellowFoxDbLoader()
),
List.of()
);
UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents(
new UnifiedRuntimeProcessingRequest(
null,
List.of(),
null,
"default",
Set.of(),
null,
null,
"12:1234567890123456",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
0,
true,
List.of(
new at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedRuntimeEventBackend.SOURCE_DB,
UUID.fromString("11111111-1111-1111-1111-111111111111"),
List.of(),
null
),
new at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedRuntimeEventBackend.EVENTHUB_DB,
null,
List.of(),
null
),
new at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.YELLOWFOX_DB,
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
List.of(),
null
)
)
)
);
assertThat(bundle.driverSeedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("FILE-SESSION-1", "TACHO-ACT-1", "YF-DB-1");
assertThat(bundle.notes()).anySatisfy(note -> assertThat(note).contains("mixed runtime scope"));
}
private static final class FakeLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader { private static final class FakeLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader {
@Override @Override
@ -171,4 +313,167 @@ class UnifiedRuntimeEventAssemblyServiceTest {
); );
} }
} }
private static final class FakeTachographDbLoader implements RuntimeDriverEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB
&& sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(canonicalActivityEvent(
"TACHO-ACT-1",
source("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_DRIVER_CARD"),
"2026-05-01T08:00:00Z",
"2026-05-01T08:00:00Z",
"2026-05-01T09:00:00Z"
));
}
}
private static final class FakeYellowFoxDbLoader implements RuntimeDriverEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB
&& sourceFamily == UnifiedEventSourceFamily.YELLOWFOX_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(canonicalActivityEvent(
"YF-ACT-99",
source("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8"),
"2026-05-01T08:00:00Z",
"2026-05-01T08:00:00Z",
"2026-05-01T09:00:00Z"
));
}
}
private static final class FakeFileSessionLoader implements RuntimeDriverEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB
&& sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(canonicalActivityEvent(
"FILE-SESSION-1",
source("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_FILE_SESSION"),
"2026-05-01T07:55:00Z",
"2026-05-01T07:55:00Z",
"2026-05-01T08:25:00Z"
));
}
}
private static final class FakeDirectTachographDbLoader implements RuntimeDriverEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB
&& sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(canonicalActivityEvent(
"TACHO-DB-1",
source("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_DRIVER_CARD"),
"2026-05-01T08:30:00Z",
"2026-05-01T08:30:00Z",
"2026-05-01T09:00:00Z"
));
}
}
private static final class FakeDirectYellowFoxDbLoader implements RuntimeDriverEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB
&& sourceFamily == UnifiedEventSourceFamily.YELLOWFOX_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(canonicalActivityEvent(
"YF-DB-1",
source("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8"),
"2026-05-01T09:05:00Z",
"2026-05-01T09:05:00Z",
"2026-05-01T09:35:00Z"
));
}
}
private static EventHubEventDto canonicalActivityEvent(
String externalId,
EventSourceDto source,
String occurredAt,
String startedAt,
String endedAt
) {
ObjectNode raw = JsonNodeFactory.instance.objectNode();
raw.put("intervalId", externalId + "-INTERVAL");
raw.put("startedAt", startedAt);
raw.put("endedAt", endedAt);
raw.put("driverKey", "DRIVER:42");
raw.put("activityType", "DRIVE");
raw.put("cardSlot", "DRIVER");
raw.put("cardStatus", "INSERTED");
raw.put("drivingStatus", "SINGLE");
raw.put("registrationKey", "AT:W-1");
raw.put("vehicleKey", "VIN-1");
raw.put("sourceKind", source.sourceKind());
raw.putArray("sourceRowIds").add(externalId + "-ROW");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
return new EventHubEventDto(
UUID.randomUUID(),
externalId,
new DriverRefDto("DRIVER:42", null),
new VehicleRefDto(
"VEH-1",
"VIN-1",
"VEH-1",
new VehicleRegistrationRefDto("AT", "W-1")
),
OffsetDateTime.parse(occurredAt),
null,
OffsetDateTime.parse(occurredAt),
EventDomain.DRIVER_ACTIVITY,
EventType.DRIVE,
EventLifecycle.START,
null,
null,
null,
null,
payload,
false,
new at.procon.eventhub.dto.EventHubPackageRequest(
"default",
source,
null,
ImportScopeDto.tenantAll(
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z")
),
EventDomain.DRIVER_ACTIVITY.name(),
LocalDate.parse("2026-05-01"),
source.stableKey() + ":DRIVER_ACTIVITY:2026-05-01"
)
);
}
private static EventSourceDto source(String provider, String sourceKind, String sourceKey) {
return new EventSourceDto(provider, sourceKind, sourceKey, null, null, null);
}
} }