Compare commits

...

3 Commits

Author SHA1 Message Date
trifonovt ff12953e05 Propagate interval metadata through runtime pairing 2026-06-05 12:13:05 +02:00
trifonovt 6ba2df1a61 Add mixed-backend runtime source inputs 2026-06-05 12:13:04 +02:00
trifonovt 6bef8becf9 Adjust runtime interval loading and result projections 2026-06-05 12:13:04 +02:00
48 changed files with 2122 additions and 192 deletions

View File

@ -12,10 +12,36 @@ Use:
```json
{
"processingPlanKey": "driver-working-time-v1"
"processingPlanKey": "driver-working-time-v1",
"sourceSelection": {
"tenantKey": "default",
"driverKey": "12:12345678901234",
"occurredFrom": "2026-05-01T00:00:00Z",
"occurredTo": "2026-05-31T23:59:59Z",
"sourceInputs": [
{
"sourceFamily": "TACHOGRAPH_FILE_SESSION",
"eventBackend": "SOURCE_DB",
"sessionIds": [
"11111111-1111-1111-1111-111111111111",
"22222222-2222-2222-2222-222222222222"
]
},
{
"sourceFamily": "TACHOGRAPH_DB",
"eventBackend": "EVENTHUB_DB"
},
{
"sourceFamily": "YELLOWFOX_DB",
"eventBackend": "SOURCE_DB"
}
]
}
}
```
Use `sourceInputs` when one runtime request must mix direct-source and EventHub-backed inputs. Keep `sourceFamilies + eventBackend` only for legacy same-backend scopes.
## Canonical input idea
The plan should work with events from any source once they are normalized into canonical EventHub event semantics:

View File

@ -54,13 +54,28 @@ The important design point is that runtime processing is not tachograph-specific
{
"processingPlanKey": "driver-working-time-v1",
"sourceSelection": {
"sessionIds": [
"11111111-1111-1111-1111-111111111111",
"22222222-2222-2222-2222-222222222222"
],
"sourceFamilies": ["TACHOGRAPH_FILE_SESSION", "YELLOWFOX_DB"],
"tenantKey": "default",
"driverKey": "12:12345678901234",
"occurredFrom": "2026-05-01T00:00:00Z",
"occurredTo": "2026-05-31T23:59:59Z",
"sourceInputs": [
{
"sourceFamily": "TACHOGRAPH_FILE_SESSION",
"eventBackend": "SOURCE_DB",
"sessionIds": [
"11111111-1111-1111-1111-111111111111",
"22222222-2222-2222-2222-222222222222"
]
},
{
"sourceFamily": "TACHOGRAPH_DB",
"eventBackend": "EVENTHUB_DB"
},
{
"sourceFamily": "YELLOWFOX_DB",
"eventBackend": "SOURCE_DB"
}
],
"expandVehicleEvents": true
},
"partitioning": {
@ -78,6 +93,8 @@ The important design point is that runtime processing is not tachograph-specific
}
```
`sourceFamilies + eventBackend` is still accepted as a legacy shorthand when every selected family uses the same backend. Use `sourceInputs` when one runtime request must mix direct-source and EventHub-backed inputs.
## Conceptual flow
```text

View File

@ -40,6 +40,8 @@ with:
also remains available, but new clients should use `processingPlanKey` and `/executions`.
When the runtime scope must mix file sessions, direct source databases, and EventHub-backed canonical events in one request, use `/executions` with per-source `sourceInputs`. The tachograph compatibility endpoints remain useful for file-session-centric scopes, but they are no longer the best shape for mixed-source runtime execution.
The current `driver-working-time-v1` plan uses these modules:
```text

View File

@ -24,6 +24,38 @@
{
"key": "driverKey",
"value": "12:12345678901234"
},
{
"key": "sessionId1",
"value": "11111111-1111-1111-1111-111111111111"
},
{
"key": "sessionId2",
"value": "22222222-2222-2222-2222-222222222222"
},
{
"key": "tenantKey",
"value": "default"
},
{
"key": "driverSourceEntityId",
"value": "DRIVER:42"
},
{
"key": "driverCardNation",
"value": "12"
},
{
"key": "driverCardNumber",
"value": "12345678901234"
},
{
"key": "occurredFrom",
"value": "2026-05-01T00:00:00Z"
},
{
"key": "occurredTo",
"value": "2026-05-31T23:59:59Z"
}
],
"item": [
@ -132,7 +164,7 @@
}
},
{
"name": "Execute processing plan - driver working time source DB mixed sources",
"name": "Execute processing plan - driver working time mixed backends and sources",
"request": {
"method": "POST",
"header": [
@ -155,7 +187,7 @@
},
"body": {
"mode": "raw",
"raw": "{\n \"processingPlanKey\": \"driver-working-time-v1\",\n \"sourceSelection\": {\n \"tenantKey\": \"{{tenantKey}}\",\n \"sourceFamilies\": [\n \"TACHOGRAPH_DB\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"SOURCE_DB\",\n \"driverSourceEntityId\": \"{{driverSourceEntityId}}\",\n \"driverCardNation\": \"{{driverCardNation}}\",\n \"driverCardNumber\": \"{{driverCardNumber}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": false,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n}"
"raw": "{\n \"processingPlanKey\": \"driver-working-time-v1\",\n \"sourceSelection\": {\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": false,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n}"
}
}
},
@ -334,7 +366,7 @@
],
"body": {
"mode": "raw",
"raw": "{\n \"processingRequest\": {\n \"profileKey\": \"tachograph-driver-esper-v1\",\n \"scope\": {\n \"sessionIds\": [\n \"{{sessionId1}}\",\n \"{{sessionId2}}\"\n ],\n \"sourceFamilies\": [\n \"TACHOGRAPH_FILE_SESSION\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"SOURCE_DB\",\n \"occurredFrom\": \"2026-05-01T00:00:00Z\",\n \"occurredTo\": \"2026-05-31T23:59:59Z\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": true,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n },\n \"minimumAttachedVehicleEvidenceEvents\": 1,\n \"minimumNormalizedSupportEvidenceEvents\": 1,\n \"failWhenPartitionDebugMissing\": true\n}"
"raw": "{\n \"processingRequest\": {\n \"profileKey\": \"tachograph-driver-esper-v1\",\n \"scope\": {\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId1}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n },\n \"partitioning\": {\n \"strategy\": \"DRIVER\",\n \"includeAllPartitions\": true,\n \"attachVehicleEvidence\": true,\n \"vehicleEvidencePaddingMinutes\": 15,\n \"includeDebug\": true\n },\n \"parameters\": {\n \"significantDrivingMinutes\": 3,\n \"minimumRestPeriodMinutes\": 720,\n \"includePartitionDebug\": true\n }\n },\n \"minimumAttachedVehicleEvidenceEvents\": 1,\n \"minimumNormalizedSupportEvidenceEvents\": 1,\n \"failWhenPartitionDebugMissing\": true\n}"
},
"url": {
"raw": "{{baseUrl}}/api/eventhub/runtime-processing/event-processing/validation/mixed-source-evidence",
@ -493,7 +525,7 @@
}
},
{
"name": "Runtime diagnostics - driver events from source DB",
"name": "Runtime diagnostics - driver events from mixed backends and sources",
"request": {
"method": "POST",
"header": [
@ -516,12 +548,12 @@
},
"body": {
"mode": "raw",
"raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"sourceFamilies\": [\n \"TACHOGRAPH_DB\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"SOURCE_DB\",\n \"driverSourceEntityId\": \"{{driverSourceEntityId}}\",\n \"driverCardNation\": \"{{driverCardNation}}\",\n \"driverCardNumber\": \"{{driverCardNumber}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}"
"raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}"
}
}
},
{
"name": "Runtime diagnostics - driver timeline from EventHub DB",
"name": "Runtime diagnostics - driver timeline from mixed backends and sources",
"request": {
"method": "POST",
"header": [
@ -544,7 +576,7 @@
},
"body": {
"mode": "raw",
"raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"sourceFamilies\": [\n \"TACHOGRAPH_DB\",\n \"YELLOWFOX_DB\"\n ],\n \"eventBackend\": \"EVENTHUB_DB\",\n \"driverCardNation\": \"{{driverCardNation}}\",\n \"driverCardNumber\": \"{{driverCardNumber}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}"
"raw": "{\n \"tenantKey\": \"{{tenantKey}}\",\n \"driverKey\": \"{{driverKey}}\",\n \"occurredFrom\": \"{{occurredFrom}}\",\n \"occurredTo\": \"{{occurredTo}}\",\n \"sourceInputs\": [\n {\n \"sourceFamily\": \"TACHOGRAPH_FILE_SESSION\",\n \"eventBackend\": \"SOURCE_DB\",\n \"sessionIds\": [\n \"{{sessionId}}\",\n \"{{sessionId2}}\"\n ]\n },\n {\n \"sourceFamily\": \"TACHOGRAPH_DB\",\n \"eventBackend\": \"EVENTHUB_DB\"\n },\n {\n \"sourceFamily\": \"YELLOWFOX_DB\",\n \"eventBackend\": \"SOURCE_DB\"\n }\n ],\n \"expandVehicleEvents\": true,\n \"vehicleExpansionPaddingMinutes\": 15\n}"
}
}
}

View File

@ -55,6 +55,7 @@ public class EventHubEventReadRepository {
request.tenantKey(),
request.occurredFrom(),
request.occurredTo(),
request.includeIntersectingIntervals() && "TACHOGRAPH".equalsIgnoreCase(providerKey),
request.driverSourceEntityId(),
request.driverCardNation(),
request.driverCardNumber(),
@ -76,6 +77,7 @@ public class EventHubEventReadRepository {
request.tenantKey(),
request.occurredFrom(),
request.occurredTo(),
request.includeIntersectingIntervals() && "TACHOGRAPH".equalsIgnoreCase(providerKey),
null,
null,
null,
@ -92,6 +94,7 @@ public class EventHubEventReadRepository {
String tenantKey,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean includeIntersectingIntervals,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
@ -104,6 +107,7 @@ public class EventHubEventReadRepository {
) {
StringBuilder sql = new StringBuilder(
"""
with candidate_events as (
select
event.id,
event.external_source_event_id,
@ -223,14 +227,6 @@ public class EventHubEventReadRepository {
}
sql.append(")");
}
if (occurredFrom != null) {
sql.append(" and event.occurred_at >= ?");
params.add(occurredFrom);
}
if (occurredTo != null) {
sql.append(" and event.occurred_at <= ?");
params.add(occurredTo);
}
if (driverSourceEntityId != null) {
sql.append(
"""
@ -307,7 +303,9 @@ public class EventHubEventReadRepository {
}
}
sql.append(" order by event.occurred_at, event.event_domain, event.event_type, event.lifecycle, event.id");
sql.append("\n)");
appendTemporalFilter(sql, params, occurredFrom, occurredTo, includeIntersectingIntervals);
sql.append(" order by occurred_at, event_domain, event_type, lifecycle, id");
return jdbcTemplate.query(
sql.toString(),
@ -316,6 +314,109 @@ public class EventHubEventReadRepository {
);
}
private void appendTemporalFilter(
StringBuilder sql,
List<Object> params,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean includeIntersectingIntervals
) {
if (!includeIntersectingIntervals) {
sql.append("\nselect * from candidate_events");
appendPointWindowFilter(sql, params, occurredFrom, occurredTo);
return;
}
if (occurredFrom == null && occurredTo == null) {
sql.append("\nselect * from candidate_events");
return;
}
sql.append(
"""
, interval_events as (
select
'DRIVER_ACTIVITY' as interval_scope,
coalesce(payload #>> '{raw,intervalId}', payload #>> '{raw,sourceRowId}', external_source_event_id) as interval_key,
min(case when lifecycle = 'START' then occurred_at end) as started_at,
max(case when lifecycle = 'END' then occurred_at end) as ended_at
from candidate_events
where event_domain = 'DRIVER_ACTIVITY'
group by 1, 2
union all
select
'DRIVER_CARD' as interval_scope,
coalesce(payload #>> '{raw,intervalId}', payload #>> '{raw,sourceRowId}', external_source_event_id) as interval_key,
min(case when event_type = 'CARD_INSERTED' then occurred_at end) as started_at,
max(case when event_type = 'CARD_WITHDRAWN' then occurred_at end) as ended_at
from candidate_events
where event_domain = 'DRIVER_CARD'
and event_type in ('CARD_INSERTED', 'CARD_WITHDRAWN')
group by 1, 2
)
select ce.*
from candidate_events ce
left join interval_events ie
on ie.interval_key = coalesce(ce.payload #>> '{raw,intervalId}', ce.payload #>> '{raw,sourceRowId}', ce.external_source_event_id)
and (
(ie.interval_scope = 'DRIVER_ACTIVITY' and ce.event_domain = 'DRIVER_ACTIVITY')
or (ie.interval_scope = 'DRIVER_CARD'
and ce.event_domain = 'DRIVER_CARD'
and ce.event_type in ('CARD_INSERTED', 'CARD_WITHDRAWN'))
)
"""
);
StringBuilder where = new StringBuilder();
appendPointWindowPredicate(where, params, "ce.occurred_at", occurredFrom, occurredTo);
if (where.length() == 0) {
where.append("\nwhere 1 = 0");
}
where.append("\n or (ie.interval_key is not null");
if (occurredTo != null) {
where.append("\n and ie.started_at <= ?");
params.add(occurredTo);
}
if (occurredFrom != null) {
where.append("\n and coalesce(ie.ended_at, 'infinity'::timestamptz) >= ?");
params.add(occurredFrom);
}
where.append("\n )");
sql.append(where);
}
private void appendPointWindowFilter(
StringBuilder sql,
List<Object> params,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
StringBuilder where = new StringBuilder();
appendPointWindowPredicate(where, params, "occurred_at", occurredFrom, occurredTo);
sql.append(where);
}
private void appendPointWindowPredicate(
StringBuilder sql,
List<Object> params,
String occurredAtColumn,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
boolean hasCondition = false;
if (occurredFrom != null) {
sql.append(hasCondition ? " and " : "\nwhere ");
sql.append(occurredAtColumn).append(" >= ?");
params.add(occurredFrom);
hasCondition = true;
}
if (occurredTo != null) {
sql.append(hasCondition ? " and " : "\nwhere ");
sql.append(occurredAtColumn).append(" <= ?");
params.add(occurredTo);
}
}
private EventHubEventDto mapEvent(ResultSet rs) throws SQLException {
DriverRefDto driverRef = driverRef(rs);
VehicleRefDto vehicleRef = vehicleRef(rs);

View File

@ -47,4 +47,48 @@ public record DriverWorkingTimeProcessingResultDto(
List<DriverWorkingTimeSupportGeoEvent> supportGeoEvents,
List<String> notes
) {
public DriverWorkingTimeProcessingResultDto withIncludedIntervals(
boolean includeActivityIntervals,
boolean includeDrivingIntervals
) {
if (includeActivityIntervals && includeDrivingIntervals) {
return this;
}
return new DriverWorkingTimeProcessingResultDto(
sessionId,
driverKey,
sourceKind,
loadedFrom,
loadedTo,
requestedFrom,
requestedTo,
activityIntervalCount,
drivingIntervalCount,
drivingInterruptionIntervalCount,
drivingInterruptionVehicleChangeIntervalCount,
dailyWeeklyRestCandidateIntervalCount,
dailyWeeklyRestCandidateCoverageIntervalCount,
unclassifiedDailyWeeklyRestCandidateCoverageIntervalCount,
potentialHomeOvernightStayIntervalCount,
potentialInVehicleOvernightStayIntervalCount,
potentialInVehicleTripIntervalCount,
vehicleUsageIntervalCount,
vuCardAbsentIntervalCount,
supportGeoEventCount,
includeActivityIntervals ? activityIntervals : List.of(),
includeDrivingIntervals ? drivingIntervals : List.of(),
drivingInterruptionIntervals,
drivingInterruptionVehicleChangeIntervals,
dailyWeeklyRestCandidateIntervals,
dailyWeeklyRestCandidateCoverageIntervals,
unclassifiedDailyWeeklyRestCandidateCoverageIntervals,
potentialHomeOvernightStayIntervals,
potentialInVehicleOvernightStayIntervals,
potentialInVehicleTripIntervals,
vehicleUsageIntervals,
vuCardAbsentIntervals,
supportGeoEvents,
notes
);
}
}

View File

@ -29,9 +29,68 @@ public record UnifiedRuntimeProcessingApiRequest(
OffsetDateTime occurredTo,
Boolean expandVehicleEvents,
Integer vehicleExpansionPaddingMinutes,
Boolean includeIntersectingIntervals,
Integer significantDrivingMinutes,
Integer minimumRestPeriodMinutes
Integer minimumRestPeriodMinutes,
Boolean includeActivityIntervals,
Boolean includeDrivingIntervals,
List<UnifiedRuntimeSourceInputApiRequest> sourceInputs
) {
public UnifiedRuntimeProcessingApiRequest(
UUID sessionId,
List<UUID> sessionIds,
UUID compositeSessionId,
String tenantKey,
Set<UnifiedEventSourceFamily> sourceFamilies,
UnifiedRuntimeEventBackend eventBackend,
Set<UnifiedTachographSourceKind> tachographSourceKinds,
String driverKey,
Set<String> driverKeys,
Boolean includeAllDrivers,
Set<String> vehicleKeys,
Boolean includeAllVehicles,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
Boolean expandVehicleEvents,
Integer vehicleExpansionPaddingMinutes,
Boolean includeIntersectingIntervals,
Integer significantDrivingMinutes,
Integer minimumRestPeriodMinutes,
Boolean includeActivityIntervals,
Boolean includeDrivingIntervals
) {
this(
sessionId,
sessionIds,
compositeSessionId,
tenantKey,
sourceFamilies,
eventBackend,
tachographSourceKinds,
driverKey,
driverKeys,
includeAllDrivers,
vehicleKeys,
includeAllVehicles,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes,
includeIntersectingIntervals,
significantDrivingMinutes,
minimumRestPeriodMinutes,
includeActivityIntervals,
includeDrivingIntervals,
List.of()
);
}
public UnifiedRuntimeProcessingRequest toRuntimeRequest() {
return new UnifiedRuntimeProcessingRequest(
sessionId,
@ -52,7 +111,20 @@ public record UnifiedRuntimeProcessingApiRequest(
occurredFrom,
occurredTo,
expandVehicleEvents == null || expandVehicleEvents,
vehicleExpansionPaddingMinutes == null ? 0 : Math.max(0, vehicleExpansionPaddingMinutes)
vehicleExpansionPaddingMinutes == null ? 0 : Math.max(0, vehicleExpansionPaddingMinutes),
includeIntersectingIntervals == null || includeIntersectingIntervals,
sourceInputs == null ? List.of() : sourceInputs.stream()
.filter(value -> value != null)
.map(UnifiedRuntimeSourceInputApiRequest::toRuntimeSourceInput)
.toList()
);
}
public boolean includeActivityIntervalsOrDefault() {
return includeActivityIntervals != null && includeActivityIntervals;
}
public boolean includeDrivingIntervalsOrDefault() {
return includeDrivingIntervals != null && includeDrivingIntervals;
}
}

View File

@ -0,0 +1,25 @@
package at.procon.eventhub.processing.dto;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput;
import java.util.List;
import java.util.UUID;
public record UnifiedRuntimeSourceInputApiRequest(
UnifiedEventSourceFamily sourceFamily,
UnifiedRuntimeEventBackend eventBackend,
UUID sessionId,
List<UUID> sessionIds,
UUID compositeSessionId
) {
public UnifiedRuntimeSourceInput toRuntimeSourceInput() {
return new UnifiedRuntimeSourceInput(
sourceFamily,
eventBackend,
sessionId,
sessionIds,
compositeSessionId
);
}
}

View File

@ -69,7 +69,11 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
for (Map.Entry<String, DriverWorkingTimePreparedInput> entry : preparedInputs.entrySet()) {
DriverWorkingTimePreparedInput preparedInput = entry.getValue();
DriverWorkingTimeProcessingResultDto projection =
workingTimeProcessingCore.process(preparedInput.processingInput());
workingTimeProcessingCore.process(preparedInput.processingInput())
.withIncludedIntervals(
scopeRequest.includeActivityIntervalsOrDefault(),
scopeRequest.includeDrivingIntervalsOrDefault()
);
warnings.addAll(preparedInput.partition().warnings());
UnifiedRuntimeProcessingRequest driverRequest = broadBundle.request().withDriverKey(preparedInput.driverKey());
driverResults.put(preparedInput.driverKey(), new UnifiedRuntimeDerivedProjectionResultDto(

View File

@ -193,7 +193,9 @@ public class SupportEvidenceNormalizationModule implements RuntimeProcessingModu
}
private UUID runtimeSessionId(UnifiedRuntimeProcessingRequest request) {
if (request.compositeSessionId() != null || request.sessionIds().size() > 1) {
if (request.normalizedSourceInputs().size() > 1
|| request.compositeSessionId() != null
|| request.sessionIds().size() > 1) {
return null;
}
return request.sessionIds().size() == 1 ? request.sessionIds().get(0) : request.sessionId();

View File

@ -13,6 +13,7 @@ import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.service.RuntimeDriverVehicleEvidenceAttachmentService;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.TachographRuntimeIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
@ -90,7 +91,7 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
);
for (EventHubEventDto attachedEvent : attachmentResult.attachedVehicleEvidenceEvents()) {
attachedVehicleEvidenceByEvent
.computeIfAbsent(dedupKey(attachedEvent), ignored -> new ArrayList<>())
.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(attachedEvent), ignored -> new ArrayList<>())
.add(driverKey);
}
RuntimeDriverPartitionDebugDto partitionDebug = includePartitionDebug ? attachmentResult.toPartitionDebug() : null;
@ -283,13 +284,6 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
return text == null || text.isBlank() ? null : text.trim();
}
private String dedupKey(EventHubEventDto event) {
String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null
? event.packageInfo().eventSource().stableKey()
: "NO_SOURCE";
return sourceKey + "|" + event.externalSourceEventId();
}
private boolean booleanAttribute(RuntimeProcessingModuleContext context, String key, boolean fallback) {
Object value = context.attributes().get(key);
if (value instanceof Boolean booleanValue) {

View File

@ -8,6 +8,7 @@ import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeMod
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleContext;
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleResult;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.TachographRuntimeIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.OffsetDateTime;
@ -59,6 +60,7 @@ public final class DriverWorkingTimeEplEventMapper {
definition.put("driverKey", String.class);
definition.put("eventId", String.class);
definition.put("intervalId", String.class);
definition.put("runtimeIntervalKey", String.class);
definition.put("sourceRowId", String.class);
definition.put("sourceRowIds", java.util.List.class);
definition.put("activityType", String.class);
@ -109,6 +111,7 @@ public final class DriverWorkingTimeEplEventMapper {
definition.put("driverKey", String.class);
definition.put("eventId", String.class);
definition.put("intervalId", String.class);
definition.put("runtimeIntervalKey", String.class);
definition.put("sourceRowId", String.class);
definition.put("sourceRowIds", java.util.List.class);
definition.put("lifecycle", String.class);
@ -151,9 +154,10 @@ public final class DriverWorkingTimeEplEventMapper {
}
JsonNode raw = rawPayload(sourceEvent);
JsonNode attributes = attributes(sourceEvent);
String intervalId = firstNonBlank(text(raw, "intervalId"), text(raw, "sourceRowId"), sourceEvent.externalSourceEventId());
String intervalId = RuntimeEventIdentityResolver.presentationIntervalId(sourceEvent);
String runtimeIntervalKey = RuntimeEventIdentityResolver.runtimeIntervalKey(sourceEvent);
String driverKey = TachographRuntimeIdentityResolver.driverKey(sourceEvent);
if (driverKey == null || intervalId == null) {
if (driverKey == null || intervalId == null || runtimeIntervalKey == null) {
return null;
}
Map<String, Object> event = new LinkedHashMap<>();
@ -161,6 +165,7 @@ public final class DriverWorkingTimeEplEventMapper {
event.put("driverKey", driverKey);
event.put("eventId", sourceEvent.externalSourceEventId());
event.put("intervalId", intervalId);
event.put("runtimeIntervalKey", runtimeIntervalKey);
event.put("sourceRowId", firstNonBlank(text(raw, "sourceRowId"), intervalId));
event.put("sourceRowIds", stringList(raw, "sourceRowIds", intervalId));
event.put("activityType", firstNonBlank(text(raw, "activityType"), eventTypeAsActivity(sourceEvent.eventType())));
@ -192,9 +197,10 @@ public final class DriverWorkingTimeEplEventMapper {
return null;
}
JsonNode raw = rawPayload(sourceEvent);
String intervalId = firstNonBlank(text(raw, "intervalId"), text(raw, "sourceRowId"), sourceEvent.externalSourceEventId());
String intervalId = RuntimeEventIdentityResolver.presentationIntervalId(sourceEvent);
String runtimeIntervalKey = RuntimeEventIdentityResolver.runtimeIntervalKey(sourceEvent);
String driverKey = TachographRuntimeIdentityResolver.driverKey(sourceEvent);
if (driverKey == null || intervalId == null) {
if (driverKey == null || intervalId == null || runtimeIntervalKey == null) {
return null;
}
Map<String, Object> event = new LinkedHashMap<>();
@ -202,6 +208,7 @@ public final class DriverWorkingTimeEplEventMapper {
event.put("driverKey", driverKey);
event.put("eventId", sourceEvent.externalSourceEventId());
event.put("intervalId", intervalId);
event.put("runtimeIntervalKey", runtimeIntervalKey);
event.put("sourceRowId", firstNonBlank(text(raw, "sourceRowId"), intervalId));
event.put("sourceRowIds", stringList(raw, "sourceRowIds", intervalId));
event.put("lifecycle", sourceEvent.lifecycle().name());
@ -219,7 +226,7 @@ public final class DriverWorkingTimeEplEventMapper {
.comparing((Map<String, Object> event) -> (Long) event.get("occurredAtEpochSecond"))
.thenComparing(event -> lifecycleOrder(Objects.toString(event.get("lifecycle"), "")))
.thenComparing(event -> Objects.toString(event.get("driverKey"), ""))
.thenComparing(event -> Objects.toString(event.get("intervalId"), ""))
.thenComparing(event -> Objects.toString(event.get("runtimeIntervalKey"), ""))
.thenComparing(event -> Objects.toString(event.get("eventId"), ""));
}

View File

@ -168,6 +168,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"minimumRestPeriodMinutes",
"attachVehicleOnlyEvents",
"vehicleEvidencePaddingMinutes",
"includeActivityIntervals",
"includeDrivingIntervals",
"includePartitionDebug"
);
}
@ -368,6 +370,10 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
Integer significantDrivingMinutes = integerParameter(parameters, "significantDrivingMinutes", sourceSelection.significantDrivingMinutes());
Integer minimumRestPeriodMinutes = integerParameter(parameters, "minimumRestPeriodMinutes", sourceSelection.minimumRestPeriodMinutes());
boolean includeActivityIntervals = booleanParameter(parameters, "includeActivityIntervals",
sourceSelection.includeActivityIntervalsOrDefault());
boolean includeDrivingIntervals = booleanParameter(parameters, "includeDrivingIntervals",
sourceSelection.includeDrivingIntervalsOrDefault());
boolean attachVehicleOnlyEvents = booleanParameter(parameters, "attachVehicleOnlyEvents",
partitioning == null ? sourceSelection.expandVehicleEvents() == null || sourceSelection.expandVehicleEvents() : partitioning.attachVehicleEvidenceOrDefault());
Integer vehicleEvidencePaddingMinutes = nonNegativeIntegerParameter(parameters, "vehicleEvidencePaddingMinutes",
@ -395,8 +401,12 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
sourceSelection.occurredTo(),
attachVehicleOnlyEvents,
vehicleEvidencePaddingMinutes,
sourceSelection.includeIntersectingIntervals(),
significantDrivingMinutes,
minimumRestPeriodMinutes
minimumRestPeriodMinutes,
includeActivityIntervals,
includeDrivingIntervals,
sourceSelection.sourceInputs()
);
}

View File

@ -178,8 +178,11 @@ public class RuntimeTachographParityValidationService {
request.occurredTo(),
request.expandVehicleEventsOrDefault(),
request.vehicleExpansionPaddingMinutesOrDefault(),
null,
request.significantDrivingMinutes(),
request.minimumRestPeriodMinutes()
request.minimumRestPeriodMinutes(),
false,
false
);
RuntimeEventPartitioningApiRequest partitioning = new RuntimeEventPartitioningApiRequest(
RuntimeEventPartitioningStrategy.DRIVER,

View File

@ -20,7 +20,8 @@ public record UnifiedDriverEventsRequest(
String registrationNation,
String registrationNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
OffsetDateTime occurredTo,
boolean includeIntersectingIntervals
) {
public UnifiedDriverEventsRequest {
Objects.requireNonNull(sourceFamily, "sourceFamily must not be null");
@ -59,6 +60,16 @@ public record UnifiedDriverEventsRequest(
String driverKey,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
return forTachographFileSession(sessionId, driverKey, occurredFrom, occurredTo, false);
}
public static UnifiedDriverEventsRequest forTachographFileSession(
UUID sessionId,
String driverKey,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean includeIntersectingIntervals
) {
return new UnifiedDriverEventsRequest(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
@ -74,7 +85,8 @@ public record UnifiedDriverEventsRequest(
null,
null,
occurredFrom,
occurredTo
occurredTo,
includeIntersectingIntervals
);
}
@ -93,7 +105,29 @@ public record UnifiedDriverEventsRequest(
driverCardNumber,
occurredFrom,
occurredTo,
List.of()
List.of(),
false
);
}
public static UnifiedDriverEventsRequest forTachographDbDriver(
String tenantKey,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean includeIntersectingIntervals
) {
return forTachographDbDriver(
tenantKey,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
List.of(),
includeIntersectingIntervals
);
}
@ -105,6 +139,28 @@ public record UnifiedDriverEventsRequest(
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
List<String> sourceKinds
) {
return forTachographDbDriver(
tenantKey,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
sourceKinds,
false
);
}
public static UnifiedDriverEventsRequest forTachographDbDriver(
String tenantKey,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
List<String> sourceKinds,
boolean includeIntersectingIntervals
) {
return new UnifiedDriverEventsRequest(
UnifiedEventSourceFamily.TACHOGRAPH_DB,
@ -120,7 +176,8 @@ public record UnifiedDriverEventsRequest(
null,
null,
occurredFrom,
occurredTo
occurredTo,
includeIntersectingIntervals
);
}
@ -147,7 +204,8 @@ public record UnifiedDriverEventsRequest(
registrationNation,
registrationNumber,
occurredFrom,
occurredTo
occurredTo,
false
);
}
@ -173,7 +231,8 @@ public record UnifiedDriverEventsRequest(
null,
null,
occurredFrom,
occurredTo
occurredTo,
false
);
}

View File

@ -6,6 +6,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@ -28,13 +29,63 @@ public record UnifiedRuntimeProcessingRequest(
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean expandVehicleEvents,
int vehicleExpansionPaddingMinutes
int vehicleExpansionPaddingMinutes,
boolean includeIntersectingIntervals,
List<UnifiedRuntimeSourceInput> sourceInputs
) {
public UnifiedRuntimeProcessingRequest(
UUID sessionId,
List<UUID> sessionIds,
UUID compositeSessionId,
String tenantKey,
Set<UnifiedEventSourceFamily> sourceFamilies,
UnifiedRuntimeEventBackend eventBackend,
Set<UnifiedTachographSourceKind> tachographSourceKinds,
String driverKey,
Set<String> driverKeys,
boolean includeAllDrivers,
Set<String> vehicleKeys,
boolean includeAllVehicles,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean expandVehicleEvents,
int vehicleExpansionPaddingMinutes,
boolean includeIntersectingIntervals
) {
this(
sessionId,
sessionIds,
compositeSessionId,
tenantKey,
sourceFamilies,
eventBackend,
tachographSourceKinds,
driverKey,
driverKeys,
includeAllDrivers,
vehicleKeys,
includeAllVehicles,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes,
includeIntersectingIntervals,
List.of()
);
}
public UnifiedRuntimeProcessingRequest {
if (sourceFamilies == null || sourceFamilies.isEmpty()) {
sourceInputs = normalizeSourceInputs(sourceInputs);
sourceFamilies = normalizeSourceFamilies(sourceFamilies, sourceInputs);
if (sourceFamilies.isEmpty()) {
throw new IllegalArgumentException("sourceFamilies must not be empty");
}
sourceFamilies = Set.copyOf(sourceFamilies);
eventBackend = eventBackend == null ? UnifiedRuntimeEventBackend.SOURCE_DB : eventBackend;
tachographSourceKinds = normalizeTachographSourceKinds(tachographSourceKinds);
sessionIds = normalizeSessionIds(sessionId, sessionIds);
@ -59,22 +110,44 @@ public record UnifiedRuntimeProcessingRequest(
driverSourceEntityId = normalize(driverSourceEntityId);
driverCardNation = normalizeUpper(driverCardNation);
driverCardNumber = normalizeDriverCardNumber(driverCardNumber);
boolean includesFileSession = sourceFamilies.contains(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION);
boolean includesExternalDb = sourceFamilies.stream()
.anyMatch(family -> family == UnifiedEventSourceFamily.TACHOGRAPH_DB || family == UnifiedEventSourceFamily.YELLOWFOX_DB);
if (includesFileSession && eventBackend == UnifiedRuntimeEventBackend.EVENTHUB_DB) {
throw new IllegalArgumentException("TACHOGRAPH_FILE_SESSION runtime processing currently supports SOURCE_DB backend only.");
}
String[] inferredDriverCardSelector = inferDriverCardSelector(driverKey, driverCardNation, driverCardNumber);
driverCardNation = inferredDriverCardSelector[0];
driverCardNumber = inferredDriverCardSelector[1];
List<UnifiedRuntimeSourceInput> normalizedSourceInputs = normalizedSourceInputs(
sourceFamilies,
eventBackend,
sessionId,
sessionIds,
compositeSessionId,
sourceInputs
);
boolean includesFileSession = normalizedSourceInputs.stream()
.anyMatch(input -> input.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION);
boolean includesExternalDb = normalizedSourceInputs.stream()
.map(UnifiedRuntimeSourceInput::sourceFamily)
.anyMatch(UnifiedRuntimeProcessingRequest::isExternalDbFamily);
boolean includesEventHubExternalDb = normalizedSourceInputs.stream()
.anyMatch(input -> input.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB
&& isExternalDbFamily(input.sourceFamily()));
if (tenantKey == null) {
if (!includesFileSession || includesExternalDb) {
throw new IllegalArgumentException("tenantKey must not be blank");
}
}
if (includesFileSession && compositeSessionId == null && sessionIds.isEmpty()) {
throw new IllegalArgumentException("sessionId, sessionIds or compositeSessionId must be provided when TACHOGRAPH_FILE_SESSION is selected.");
if (includesFileSession && compositeSessionId == null && sessionIds.isEmpty()
&& normalizedSourceInputs.stream()
.filter(input -> input.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION)
.allMatch(input -> input.compositeSessionId() == null && input.sessionIds().isEmpty())) {
throw new IllegalArgumentException(
"sessionId, sessionIds or compositeSessionId must be provided when TACHOGRAPH_FILE_SESSION is selected."
);
}
if (includesFileSession && driverKey == null && driverKeys.isEmpty() && !includeAllDrivers) {
throw new IllegalArgumentException("driverKey, driverKeys or includeAllDrivers must be provided when TACHOGRAPH_FILE_SESSION is selected.");
throw new IllegalArgumentException(
"driverKey, driverKeys or includeAllDrivers must be provided when TACHOGRAPH_FILE_SESSION is selected."
);
}
if (includesExternalDb && driverSourceEntityId == null && driverCardNumber == null
&& driverKeys.isEmpty() && !includeAllDrivers) {
@ -82,14 +155,17 @@ public record UnifiedRuntimeProcessingRequest(
}
if (includesExternalDb && (includeAllDrivers || !driverKeys.isEmpty())
&& (occurredFrom == null || occurredTo == null)) {
throw new IllegalArgumentException("occurredFrom and occurredTo are required when loading broad external DB runtime scopes.");
throw new IllegalArgumentException(
"occurredFrom and occurredTo are required when loading broad external DB runtime scopes."
);
}
if (eventBackend == UnifiedRuntimeEventBackend.EVENTHUB_DB
&& includesExternalDb
if (includesEventHubExternalDb
&& driverSourceEntityId == null
&& driverCardNumber == null
&& (includeAllDrivers || !driverKeys.isEmpty())) {
throw new IllegalArgumentException("Broad multi-driver EVENTHUB_DB runtime scopes are not supported yet; provide a concrete EventHub driver selector or use SOURCE_DB.");
throw new IllegalArgumentException(
"Broad multi-driver EVENTHUB_DB runtime scopes are not supported yet; provide a concrete EventHub driver selector or use SOURCE_DB."
);
}
if (occurredFrom != null && occurredTo != null && occurredTo.isBefore(occurredFrom)) {
throw new IllegalArgumentException("occurredTo must not be before occurredFrom");
@ -118,7 +194,8 @@ public record UnifiedRuntimeProcessingRequest(
occurredTo,
UnifiedRuntimeEventBackend.SOURCE_DB,
true,
0
0,
true
);
}
@ -143,7 +220,8 @@ public record UnifiedRuntimeProcessingRequest(
occurredTo,
UnifiedRuntimeEventBackend.SOURCE_DB,
expandVehicleEvents,
vehicleExpansionPaddingMinutes
vehicleExpansionPaddingMinutes,
true
);
}
@ -158,6 +236,34 @@ public record UnifiedRuntimeProcessingRequest(
UnifiedRuntimeEventBackend eventBackend,
boolean expandVehicleEvents,
int vehicleExpansionPaddingMinutes
) {
return forDriver(
tenantKey,
sourceFamilies,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
eventBackend,
expandVehicleEvents,
vehicleExpansionPaddingMinutes,
true
);
}
public static UnifiedRuntimeProcessingRequest forDriver(
String tenantKey,
Set<UnifiedEventSourceFamily> sourceFamilies,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
UnifiedRuntimeEventBackend eventBackend,
boolean expandVehicleEvents,
int vehicleExpansionPaddingMinutes,
boolean includeIntersectingIntervals
) {
return new UnifiedRuntimeProcessingRequest(
null,
@ -178,7 +284,8 @@ public record UnifiedRuntimeProcessingRequest(
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes
vehicleExpansionPaddingMinutes,
includeIntersectingIntervals
);
}
@ -212,7 +319,8 @@ public record UnifiedRuntimeProcessingRequest(
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes
vehicleExpansionPaddingMinutes,
true
);
}
@ -243,7 +351,8 @@ public record UnifiedRuntimeProcessingRequest(
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes
vehicleExpansionPaddingMinutes,
true
);
}
@ -274,7 +383,8 @@ public record UnifiedRuntimeProcessingRequest(
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes
vehicleExpansionPaddingMinutes,
true
);
}
@ -305,7 +415,8 @@ public record UnifiedRuntimeProcessingRequest(
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes
vehicleExpansionPaddingMinutes,
true
);
}
@ -317,6 +428,44 @@ public record UnifiedRuntimeProcessingRequest(
return occurredTo == null ? null : occurredTo.plusMinutes(vehicleExpansionPaddingMinutes);
}
public List<UnifiedRuntimeSourceInput> normalizedSourceInputs() {
return normalizedSourceInputs(
sourceFamilies,
eventBackend,
sessionId,
sessionIds,
compositeSessionId,
sourceInputs
);
}
public UnifiedRuntimeProcessingRequest forSourceInput(UnifiedRuntimeSourceInput sourceInput) {
Objects.requireNonNull(sourceInput, "sourceInput must not be null");
return new UnifiedRuntimeProcessingRequest(
sourceInput.sessionId(),
sourceInput.sessionIds(),
sourceInput.compositeSessionId(),
tenantKey,
Set.of(sourceInput.sourceFamily()),
sourceInput.eventBackend(),
tachographSourceKinds,
driverKey,
driverKeys,
includeAllDrivers,
vehicleKeys,
includeAllVehicles,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes,
includeIntersectingIntervals,
List.of(sourceInput)
);
}
private static List<UUID> normalizeSessionIds(UUID sessionId, List<UUID> sessionIds) {
LinkedHashSet<UUID> result = new LinkedHashSet<>();
if (sessionId != null) {
@ -348,7 +497,9 @@ public record UnifiedRuntimeProcessingRequest(
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes
vehicleExpansionPaddingMinutes,
includeIntersectingIntervals,
sourceInputs
);
}
@ -373,6 +524,68 @@ public record UnifiedRuntimeProcessingRequest(
.toList();
}
private static Set<UnifiedEventSourceFamily> normalizeSourceFamilies(
Set<UnifiedEventSourceFamily> sourceFamilies,
List<UnifiedRuntimeSourceInput> sourceInputs
) {
LinkedHashSet<UnifiedEventSourceFamily> normalized = new LinkedHashSet<>();
if (sourceFamilies != null) {
normalized.addAll(sourceFamilies.stream().filter(Objects::nonNull).toList());
}
if (sourceInputs != null) {
sourceInputs.stream()
.filter(Objects::nonNull)
.map(UnifiedRuntimeSourceInput::sourceFamily)
.forEach(normalized::add);
}
return Set.copyOf(normalized);
}
private static List<UnifiedRuntimeSourceInput> normalizeSourceInputs(List<UnifiedRuntimeSourceInput> sourceInputs) {
if (sourceInputs == null || sourceInputs.isEmpty()) {
return List.of();
}
LinkedHashSet<UnifiedRuntimeSourceInput> normalized = new LinkedHashSet<>();
sourceInputs.stream()
.filter(Objects::nonNull)
.forEach(normalized::add);
return List.copyOf(normalized);
}
private static List<UnifiedRuntimeSourceInput> normalizedSourceInputs(
Set<UnifiedEventSourceFamily> sourceFamilies,
UnifiedRuntimeEventBackend eventBackend,
UUID sessionId,
List<UUID> sessionIds,
UUID compositeSessionId,
List<UnifiedRuntimeSourceInput> sourceInputs
) {
if (sourceInputs != null && !sourceInputs.isEmpty()) {
return List.copyOf(sourceInputs);
}
List<UnifiedRuntimeSourceInput> normalized = new ArrayList<>();
for (UnifiedEventSourceFamily sourceFamily : sourceFamilies == null ? Set.<UnifiedEventSourceFamily>of() : sourceFamilies) {
if (sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) {
normalized.add(new UnifiedRuntimeSourceInput(
sourceFamily,
eventBackend,
sessionId,
sessionIds,
compositeSessionId
));
} else {
normalized.add(new UnifiedRuntimeSourceInput(
sourceFamily,
eventBackend,
null,
List.of(),
null
));
}
}
return List.copyOf(normalized);
}
private static Set<String> normalizeStrings(Set<String> values) {
if (values == null || values.isEmpty()) {
return Set.of();
@ -396,6 +609,10 @@ public record UnifiedRuntimeProcessingRequest(
return Set.copyOf(values);
}
private static boolean isExternalDbFamily(UnifiedEventSourceFamily family) {
return family == UnifiedEventSourceFamily.TACHOGRAPH_DB || family == UnifiedEventSourceFamily.YELLOWFOX_DB;
}
private static String normalize(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
@ -407,4 +624,39 @@ public record UnifiedRuntimeProcessingRequest(
private static String normalizeDriverCardNumber(String value) {
return DriverCardNumberNormalizer.canonical(value);
}
private static String[] inferDriverCardSelector(
String driverKey,
String driverCardNation,
String driverCardNumber
) {
if (driverCardNumber != null) {
return new String[]{driverCardNation, driverCardNumber};
}
if (driverKey == null) {
return new String[]{driverCardNation, driverCardNumber};
}
int separator = driverKey.indexOf(':');
if (separator <= 0 || separator >= driverKey.length() - 1) {
return new String[]{driverCardNation, driverCardNumber};
}
String prefix = driverKey.substring(0, separator).trim();
if (!looksLikeDriverCardNation(prefix)) {
return new String[]{driverCardNation, driverCardNumber};
}
return new String[]{
driverCardNation == null ? normalizeUpper(prefix) : driverCardNation,
normalizeDriverCardNumber(driverKey.substring(separator + 1))
};
}
private static boolean looksLikeDriverCardNation(String value) {
if (value == null || value.isBlank()) {
return false;
}
String trimmed = value.trim();
return trimmed.matches("\\d+")
|| trimmed.matches("(?i)[A-Z]{2}")
|| trimmed.matches("(?i)UNKNOWN\\s+\\d+");
}
}

View File

@ -0,0 +1,53 @@
package at.procon.eventhub.processing.model;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.UUID;
public record UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily sourceFamily,
UnifiedRuntimeEventBackend eventBackend,
UUID sessionId,
List<UUID> sessionIds,
UUID compositeSessionId
) {
public UnifiedRuntimeSourceInput {
if (sourceFamily == null) {
throw new IllegalArgumentException("sourceFamily must not be null");
}
eventBackend = eventBackend == null ? UnifiedRuntimeEventBackend.SOURCE_DB : eventBackend;
sessionIds = normalizeSessionIds(sessionId, sessionIds);
if (sessionId == null && !sessionIds.isEmpty()) {
sessionId = sessionIds.get(0);
}
if (compositeSessionId != null && !sessionIds.isEmpty()) {
throw new IllegalArgumentException("Use either compositeSessionId or sessionId/sessionIds, not both.");
}
if (sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) {
if (eventBackend == UnifiedRuntimeEventBackend.EVENTHUB_DB) {
throw new IllegalArgumentException("TACHOGRAPH_FILE_SESSION runtime processing currently supports SOURCE_DB backend only.");
}
if (compositeSessionId == null && sessionIds.isEmpty()) {
throw new IllegalArgumentException(
"sessionId, sessionIds or compositeSessionId must be provided for TACHOGRAPH_FILE_SESSION source inputs."
);
}
} else if (compositeSessionId != null || !sessionIds.isEmpty()) {
throw new IllegalArgumentException(
"sessionId, sessionIds or compositeSessionId are supported for TACHOGRAPH_FILE_SESSION source inputs only."
);
}
}
private static List<UUID> normalizeSessionIds(UUID sessionId, List<UUID> sessionIds) {
LinkedHashSet<UUID> result = new LinkedHashSet<>();
if (sessionId != null) {
result.add(sessionId);
}
if (sessionIds != null) {
result.addAll(sessionIds.stream().filter(value -> value != null).toList());
}
return List.copyOf(new ArrayList<>(result));
}
}

View File

@ -15,7 +15,8 @@ public record UnifiedVehicleEventsRequest(
String registrationNation,
String registrationNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
OffsetDateTime occurredTo,
boolean includeIntersectingIntervals
) {
public UnifiedVehicleEventsRequest {
Objects.requireNonNull(sourceFamily, "sourceFamily must not be null");
@ -46,6 +47,28 @@ public record UnifiedVehicleEventsRequest(
String registrationNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
return forTachographFileSession(
sessionId,
vehicleSourceEntityId,
vin,
registrationNation,
registrationNumber,
occurredFrom,
occurredTo,
false
);
}
public static UnifiedVehicleEventsRequest forTachographFileSession(
UUID sessionId,
String vehicleSourceEntityId,
String vin,
String registrationNation,
String registrationNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean includeIntersectingIntervals
) {
return new UnifiedVehicleEventsRequest(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
@ -57,7 +80,8 @@ public record UnifiedVehicleEventsRequest(
registrationNation,
registrationNumber,
occurredFrom,
occurredTo
occurredTo,
includeIntersectingIntervals
);
}
@ -78,7 +102,31 @@ public record UnifiedVehicleEventsRequest(
registrationNumber,
occurredFrom,
occurredTo,
List.of()
List.of(),
false
);
}
public static UnifiedVehicleEventsRequest forTachographDb(
String tenantKey,
String vehicleSourceEntityId,
String vin,
String registrationNation,
String registrationNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean includeIntersectingIntervals
) {
return forTachographDb(
tenantKey,
vehicleSourceEntityId,
vin,
registrationNation,
registrationNumber,
occurredFrom,
occurredTo,
List.of(),
includeIntersectingIntervals
);
}
@ -91,6 +139,30 @@ public record UnifiedVehicleEventsRequest(
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
List<String> sourceKinds
) {
return forTachographDb(
tenantKey,
vehicleSourceEntityId,
vin,
registrationNation,
registrationNumber,
occurredFrom,
occurredTo,
sourceKinds,
false
);
}
public static UnifiedVehicleEventsRequest forTachographDb(
String tenantKey,
String vehicleSourceEntityId,
String vin,
String registrationNation,
String registrationNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
List<String> sourceKinds,
boolean includeIntersectingIntervals
) {
return new UnifiedVehicleEventsRequest(
UnifiedEventSourceFamily.TACHOGRAPH_DB,
@ -102,7 +174,8 @@ public record UnifiedVehicleEventsRequest(
registrationNation,
registrationNumber,
occurredFrom,
occurredTo
occurredTo,
includeIntersectingIntervals
);
}
@ -125,7 +198,8 @@ public record UnifiedVehicleEventsRequest(
registrationNation,
registrationNumber,
occurredFrom,
occurredTo
occurredTo,
false
);
}

View File

@ -71,7 +71,8 @@ public class EventHubRuntimeEventLoader implements RuntimeDriverEventLoader, Run
request.driverCardNumber(),
request.occurredFrom(),
request.occurredTo(),
request.tachographSourceKindNames()
request.tachographSourceKindNames(),
request.includeIntersectingIntervals()
);
case YELLOWFOX_DB -> UnifiedDriverEventsRequest.forYellowFoxDbDriver(
request.tenantKey(),
@ -99,7 +100,8 @@ public class EventHubRuntimeEventLoader implements RuntimeDriverEventLoader, Run
vehicleRef.registrationNumber(),
request.vehicleOccurredFrom(),
request.vehicleOccurredTo(),
request.tachographSourceKindNames()
request.tachographSourceKindNames(),
request.includeIntersectingIntervals()
);
case YELLOWFOX_DB -> UnifiedVehicleEventsRequest.forYellowFoxDb(
request.tenantKey(),

View File

@ -8,6 +8,7 @@ import at.procon.eventhub.processing.dto.RuntimeVehicleUsageIntervalDebugDto;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventScopeClassifier;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventScopeType;
import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.TachographRuntimeIdentityResolver;
import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline;
import at.procon.eventhub.tachographfilesession.model.ResolvedVehicleUsageInterval;
@ -220,7 +221,7 @@ public class RuntimeDriverVehicleEvidenceAttachmentService {
return new RuntimeVehicleEvidenceAttachmentDecisionDto(
decision,
reason,
dedupKey(event),
RuntimeEventIdentityResolver.canonicalEventKey(event),
event == null ? null : event.externalSourceEventId(),
event == null ? null : event.occurredAt(),
event == null || event.eventDomain() == null ? null : event.eventDomain().name(),
@ -460,17 +461,10 @@ public class RuntimeDriverVehicleEvidenceAttachmentService {
private void appendDeduplicated(LinkedHashMap<String, EventHubEventDto> byKey, List<EventHubEventDto> events) {
for (EventHubEventDto event : events == null ? List.<EventHubEventDto>of() : events) {
byKey.putIfAbsent(dedupKey(event), event);
byKey.putIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), event);
}
}
private String dedupKey(EventHubEventDto event) {
String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null
? event.packageInfo().eventSource().stableKey()
: "NO_SOURCE";
return sourceKey + "|" + event.externalSourceEventId();
}
private List<EventHubEventDto> sort(List<EventHubEventDto> events) {
return (events == null ? List.<EventHubEventDto>of() : events).stream()
.sorted(Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))

View File

@ -10,6 +10,7 @@ import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.TachographRuntimeIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
@ -66,7 +67,7 @@ public class RuntimeDriverWorkingTimeScopeProcessingService {
}
for (EventHubEventDto attachedEvent : driverBundle.expandedVehicleEvents()) {
attachedVehicleEvidenceByEvent
.computeIfAbsent(dedupKey(attachedEvent), ignored -> new ArrayList<>())
.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(attachedEvent), ignored -> new ArrayList<>())
.add(driverKey);
}
driverBundle.notes().stream()
@ -260,17 +261,10 @@ public class RuntimeDriverWorkingTimeScopeProcessingService {
private void appendDeduplicated(LinkedHashMap<String, EventHubEventDto> byKey, List<EventHubEventDto> events) {
for (EventHubEventDto event : events) {
byKey.putIfAbsent(dedupKey(event), event);
byKey.putIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), event);
}
}
private String dedupKey(EventHubEventDto event) {
String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null
? event.packageInfo().eventSource().stableKey()
: "NO_SOURCE";
return sourceKey + "|" + event.externalSourceEventId();
}
private List<EventHubEventDto> sort(List<EventHubEventDto> events) {
return (events == null ? List.<EventHubEventDto>of() : events).stream()
.sorted(Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))

View File

@ -0,0 +1,175 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
final class RuntimeIntervalEventWindowSelector {
private RuntimeIntervalEventWindowSelector() {
}
static TachographTimelineEventBundle filterBundle(
TachographTimelineEventBundle bundle,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean includeIntersectingIntervals
) {
if (bundle == null) {
return new TachographTimelineEventBundle(List.of(), List.of(), List.of());
}
return new TachographTimelineEventBundle(
filterIntervalEvents(bundle.activityEvents(), occurredFrom, occurredTo, includeIntersectingIntervals),
filterIntervalEvents(bundle.vehicleUsageEvents(), occurredFrom, occurredTo, includeIntersectingIntervals),
filterPointEvents(bundle.supportEvents(), occurredFrom, occurredTo)
);
}
private static List<EventHubEventDto> filterIntervalEvents(
List<EventHubEventDto> events,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean includeIntersectingIntervals
) {
if (events == null || events.isEmpty()) {
return List.of();
}
if (!includeIntersectingIntervals) {
return filterPointEvents(events, occurredFrom, occurredTo);
}
LinkedHashMap<String, IntervalGroup> byInterval = new LinkedHashMap<>();
for (EventHubEventDto event : events) {
byInterval.computeIfAbsent(intervalKey(event), ignored -> new IntervalGroup()).add(event);
}
List<EventHubEventDto> result = new ArrayList<>();
for (IntervalGroup group : byInterval.values()) {
if (group.overlaps(occurredFrom, occurredTo)) {
result.addAll(group.events);
}
}
return List.copyOf(result);
}
private static List<EventHubEventDto> filterPointEvents(
List<EventHubEventDto> events,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
return events.stream()
.filter(event -> withinWindow(event.occurredAt(), occurredFrom, occurredTo))
.toList();
}
private static boolean withinWindow(
OffsetDateTime occurredAt,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
if (occurredAt == null) {
return false;
}
if (occurredFrom != null && occurredAt.isBefore(occurredFrom)) {
return false;
}
return occurredTo == null || !occurredAt.isAfter(occurredTo);
}
private static String intervalKey(EventHubEventDto event) {
return RuntimeEventIdentityResolver.runtimeIntervalKey(event);
}
private static JsonNode raw(EventHubEventDto event) {
JsonNode payload = event == null ? null : event.payload();
if (payload == null || payload.isNull() || payload.isMissingNode()) {
return null;
}
JsonNode raw = payload.get("raw");
return raw == null || raw.isNull() ? payload : raw;
}
private static String text(JsonNode node, String field) {
if (node == null || field == null) {
return null;
}
JsonNode value = node.get(field);
if (value == null || value.isNull()) {
return null;
}
String text = value.asText(null);
return text == null || text.isBlank() ? null : text.trim();
}
private static final class IntervalGroup {
private final List<EventHubEventDto> events = new ArrayList<>();
private OffsetDateTime startedAt;
private OffsetDateTime endedAt;
private void add(EventHubEventDto event) {
events.add(event);
if (event == null || event.occurredAt() == null) {
return;
}
if (isStartEvent(event)) {
startedAt = min(startedAt, event.occurredAt());
return;
}
if (isEndEvent(event)) {
endedAt = max(endedAt, event.occurredAt());
return;
}
startedAt = min(startedAt, event.occurredAt());
endedAt = max(endedAt, event.occurredAt());
}
private boolean overlaps(OffsetDateTime occurredFrom, OffsetDateTime occurredTo) {
if (startedAt == null && endedAt == null) {
return false;
}
OffsetDateTime effectiveStart = startedAt == null ? endedAt : startedAt;
OffsetDateTime effectiveEnd = endedAt;
if (occurredTo != null && effectiveStart != null && effectiveStart.isAfter(occurredTo)) {
return false;
}
return occurredFrom == null || effectiveEnd == null || !effectiveEnd.isBefore(occurredFrom);
}
private boolean isStartEvent(EventHubEventDto event) {
return event.lifecycle() == EventLifecycle.START
|| event.eventType() == EventType.CARD_INSERTED;
}
private boolean isEndEvent(EventHubEventDto event) {
return event.lifecycle() == EventLifecycle.END
|| event.eventType() == EventType.CARD_WITHDRAWN;
}
private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) {
if (left == null) {
return right;
}
if (right == null) {
return left;
}
return left.isBefore(right) ? left : right;
}
private OffsetDateTime max(OffsetDateTime left, OffsetDateTime right) {
if (left == null) {
return right;
}
if (right == null) {
return left;
}
return left.isAfter(right) ? left : right;
}
}
}

View File

@ -55,7 +55,8 @@ public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEve
sessionId,
request.driverKey(),
request.occurredFrom(),
request.occurredTo()
request.occurredTo(),
request.includeIntersectingIntervals()
)
));
}
@ -77,7 +78,8 @@ public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEve
vehicleRef.registrationNation(),
vehicleRef.registrationNumber(),
request.vehicleOccurredFrom(),
request.vehicleOccurredTo()
request.vehicleOccurredTo(),
request.includeIntersectingIntervals()
)
));
}

View File

@ -5,11 +5,11 @@ import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
import at.procon.eventhub.tachographfilesession.service.DriverNotFoundInSessionException;
import at.procon.eventhub.tachographfilesession.service.DriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionNotFoundException;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import java.time.OffsetDateTime;
import java.util.List;
import org.springframework.stereotype.Component;
@ -38,30 +38,27 @@ public class TachographFileSessionUnifiedDriverEventSource implements UnifiedDri
.orElseThrow(() -> new TachographFileSessionNotFoundException(request.sessionId()));
if (request.driverKey() == null) {
return session.driversByKey().values().stream()
.flatMap(driver -> eventBuilder.buildEvents(session, driver).stream())
.filter(event -> withinWindow(event.occurredAt(), request.occurredFrom(), request.occurredTo()))
.map(driver -> filterBundle(session, driver, request).allEvents())
.flatMap(List::stream)
.toList();
}
DriverExtractionSession driver = session.driversByKey().get(request.driverKey());
if (driver == null) {
throw new DriverNotFoundInSessionException(request.sessionId(), request.driverKey());
}
return eventBuilder.buildEvents(session, driver).stream()
.filter(event -> withinWindow(event.occurredAt(), request.occurredFrom(), request.occurredTo()))
.toList();
return filterBundle(session, driver, request).allEvents();
}
private boolean withinWindow(
OffsetDateTime occurredAt,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
private TachographTimelineEventBundle filterBundle(
TachographFileSession session,
DriverExtractionSession driver,
UnifiedDriverEventsRequest request
) {
if (occurredAt == null) {
return false;
}
if (occurredFrom != null && occurredAt.isBefore(occurredFrom)) {
return false;
}
return occurredTo == null || !occurredAt.isAfter(occurredTo);
return RuntimeIntervalEventWindowSelector.filterBundle(
eventBuilder.buildEventBundle(session, driver),
request.occurredFrom(),
request.occurredTo(),
request.includeIntersectingIntervals()
);
}
}

View File

@ -6,10 +6,10 @@ import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedVehicleEventsRequest;
import at.procon.eventhub.reference.TachographNationRegistry;
import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
import at.procon.eventhub.tachographfilesession.service.DriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionNotFoundException;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import java.time.OffsetDateTime;
import java.util.List;
import org.springframework.stereotype.Component;
@ -37,9 +37,14 @@ public class TachographFileSessionUnifiedVehicleEventSource implements UnifiedVe
TachographFileSession session = repository.find(request.sessionId())
.orElseThrow(() -> new TachographFileSessionNotFoundException(request.sessionId()));
return session.driversByKey().values().stream()
.flatMap(driver -> eventBuilder.buildEvents(session, driver).stream())
.map(driver -> RuntimeIntervalEventWindowSelector.filterBundle(
eventBuilder.buildEventBundle(session, driver),
request.occurredFrom(),
request.occurredTo(),
request.includeIntersectingIntervals()
).allEvents())
.flatMap(List::stream)
.filter(event -> matchesVehicle(event.vehicleRef(), request))
.filter(event -> withinWindow(event.occurredAt(), request.occurredFrom(), request.occurredTo()))
.distinct()
.toList();
}
@ -61,20 +66,6 @@ public class TachographFileSessionUnifiedVehicleEventSource implements UnifiedVe
&& matchesNation(request.registrationNation(), vehicleRef.vehicleRegistration().nation(), vehicleRef.vehicleRegistration().nationNumericCode());
}
private boolean withinWindow(
OffsetDateTime occurredAt,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
if (occurredAt == null) {
return false;
}
if (occurredFrom != null && occurredAt.isBefore(occurredFrom)) {
return false;
}
return occurredTo == null || !occurredAt.isAfter(occurredTo);
}
private boolean matchesNation(String requestedNation, String actualNation, Integer actualNationNumericCode) {
if (requestedNation == null) {
return true;

View File

@ -4,6 +4,7 @@ import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.TachographRuntimeIdentityResolver;
import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent;
import at.procon.eventhub.tachographfilesession.model.ExtractionWarning;
@ -68,13 +69,13 @@ public class UnifiedEventTimelineReconstructor {
continue;
}
JsonNode raw = raw(event);
String intervalId = firstNonBlank(
text(raw, "intervalId"),
text(raw, "sourceRowId"),
event.externalSourceEventId()
);
String runtimeIntervalKey = RuntimeEventIdentityResolver.runtimeIntervalKey(event);
String intervalId = RuntimeEventIdentityResolver.presentationIntervalId(event);
if (runtimeIntervalKey == null || intervalId == null) {
continue;
}
ActivityAccumulator accumulator = byIntervalId.computeIfAbsent(
intervalId,
runtimeIntervalKey,
ignored -> new ActivityAccumulator(intervalId)
);
accumulator.accept(event, raw);
@ -102,13 +103,13 @@ public class UnifiedEventTimelineReconstructor {
continue;
}
JsonNode raw = raw(event);
String intervalId = firstNonBlank(
text(raw, "intervalId"),
text(raw, "sourceRowId"),
event.externalSourceEventId()
);
String runtimeIntervalKey = RuntimeEventIdentityResolver.runtimeIntervalKey(event);
String intervalId = RuntimeEventIdentityResolver.presentationIntervalId(event);
if (runtimeIntervalKey == null || intervalId == null) {
continue;
}
VehicleUsageAccumulator accumulator = byIntervalId.computeIfAbsent(
intervalId,
runtimeIntervalKey,
ignored -> new VehicleUsageAccumulator(sessionId, driverKey, intervalId)
);
accumulator.accept(event, raw);

View File

@ -167,7 +167,11 @@ public class UnifiedRuntimeDerivedProjectionService {
.toList(),
notes
);
DriverWorkingTimeProcessingResultDto projection = workingTimeProcessingCore.process(processingInput);
DriverWorkingTimeProcessingResultDto projection = workingTimeProcessingCore.process(processingInput)
.withIncludedIntervals(
apiRequest.includeActivityIntervalsOrDefault(),
apiRequest.includeDrivingIntervalsOrDefault()
);
notes = projection.notes();
RuntimeSupportEvidenceNormalizationDebugDto normalizationDebug = new RuntimeSupportEvidenceNormalizationDebugDto(
@ -195,7 +199,9 @@ public class UnifiedRuntimeDerivedProjectionService {
}
private UUID runtimeSessionId(UnifiedRuntimeProcessingRequest request) {
if (request.compositeSessionId() != null || request.sessionIds().size() > 1) {
if (request.normalizedSourceInputs().size() > 1
|| request.compositeSessionId() != null
|| request.sessionIds().size() > 1) {
return null;
}
return request.sessionIds().size() == 1 ? request.sessionIds().get(0) : request.sessionId();

View File

@ -7,6 +7,8 @@ import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
@ -29,6 +31,7 @@ public class UnifiedRuntimeEventAssemblyService {
}
public UnifiedRuntimeEventBundle assembleDriverScopedEvents(UnifiedRuntimeProcessingRequest request) {
List<UnifiedRuntimeSourceInput> sourceInputs = request.normalizedSourceInputs();
List<EventHubEventDto> driverSeedEvents = loadDriverSeedEvents(request);
List<UnifiedDiscoveredVehicleRef> discoveredVehicles = discoverVehicles(driverSeedEvents);
List<EventHubEventDto> expandedVehicleEvents = request.expandVehicleEvents()
@ -37,16 +40,27 @@ public class UnifiedRuntimeEventAssemblyService {
List<EventHubEventDto> mergedEvents = deduplicateAndSort(driverSeedEvents, expandedVehicleEvents);
List<String> notes = new ArrayList<>();
notes.add(request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB
? "Driver seed events were loaded from the local EventHub event store."
: "Driver seed events were loaded directly from the selected runtime sources.");
if (request.sourceFamilies().contains(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION)) {
if (request.compositeSessionId() != null) {
notes.add("Tachograph file-session events were loaded from composite session " + request.compositeSessionId() + ".");
} else if (request.sessionIds().size() > 1) {
notes.add("Tachograph file-session events were loaded from " + request.sessionIds().size() + " selected sessions.");
} else if (request.sessionId() != null) {
notes.add("Tachograph file-session events were loaded from session " + request.sessionId() + ".");
boolean includesEventHub = sourceInputs.stream()
.anyMatch(input -> input.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB);
boolean includesSourceDb = sourceInputs.stream()
.anyMatch(input -> input.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB);
if (includesEventHub && includesSourceDb) {
notes.add("Driver seed events were loaded from a mixed runtime scope spanning the local EventHub event store and direct source systems.");
} else if (includesEventHub) {
notes.add("Driver seed events were loaded from the local EventHub event store.");
} else {
notes.add("Driver seed events were loaded directly from the selected runtime sources.");
}
for (UnifiedRuntimeSourceInput sourceInput : sourceInputs) {
if (sourceInput.sourceFamily() != UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) {
continue;
}
if (sourceInput.compositeSessionId() != null) {
notes.add("Tachograph file-session events were loaded from composite session " + sourceInput.compositeSessionId() + ".");
} else if (sourceInput.sessionIds().size() > 1) {
notes.add("Tachograph file-session events were loaded from " + sourceInput.sessionIds().size() + " selected sessions.");
} else if (sourceInput.sessionId() != null) {
notes.add("Tachograph file-session events were loaded from session " + sourceInput.sessionId() + ".");
}
}
if (request.expandVehicleEvents()) {
@ -67,12 +81,13 @@ public class UnifiedRuntimeEventAssemblyService {
private List<EventHubEventDto> loadDriverSeedEvents(UnifiedRuntimeProcessingRequest request) {
List<EventHubEventDto> result = new ArrayList<>();
for (UnifiedEventSourceFamily sourceFamily : request.sourceFamilies()) {
RuntimeDriverEventLoader loader = driverLoader(request, sourceFamily);
for (UnifiedRuntimeSourceInput sourceInput : request.normalizedSourceInputs()) {
UnifiedRuntimeProcessingRequest sourceRequest = request.forSourceInput(sourceInput);
RuntimeDriverEventLoader loader = driverLoader(sourceRequest, sourceInput.sourceFamily());
if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) {
result.addAll(eventHubLoader.loadDriverEvents(request, sourceFamily));
result.addAll(eventHubLoader.loadDriverEvents(sourceRequest, sourceInput.sourceFamily()));
} else {
result.addAll(loader.loadDriverEvents(request));
result.addAll(loader.loadDriverEvents(sourceRequest));
}
}
return deduplicateAndSort(result, List.of());
@ -84,12 +99,13 @@ public class UnifiedRuntimeEventAssemblyService {
) {
List<EventHubEventDto> result = new ArrayList<>();
for (UnifiedDiscoveredVehicleRef vehicleRef : discoveredVehicles) {
for (UnifiedEventSourceFamily sourceFamily : request.sourceFamilies()) {
RuntimeVehicleEventLoader loader = vehicleLoader(request, sourceFamily);
for (UnifiedRuntimeSourceInput sourceInput : request.normalizedSourceInputs()) {
UnifiedRuntimeProcessingRequest sourceRequest = request.forSourceInput(sourceInput);
RuntimeVehicleEventLoader loader = vehicleLoader(sourceRequest, sourceInput.sourceFamily());
if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) {
result.addAll(eventHubLoader.loadVehicleEvents(request, sourceFamily, vehicleRef));
result.addAll(eventHubLoader.loadVehicleEvents(sourceRequest, sourceInput.sourceFamily(), vehicleRef));
} else {
result.addAll(loader.loadVehicleEvents(request, vehicleRef));
result.addAll(loader.loadVehicleEvents(sourceRequest, vehicleRef));
}
}
}
@ -151,28 +167,33 @@ public class UnifiedRuntimeEventAssemblyService {
private void appendDeduplicated(LinkedHashMap<String, EventHubEventDto> byKey, List<EventHubEventDto> events) {
for (EventHubEventDto event : events) {
byKey.putIfAbsent(dedupKey(event), event);
byKey.putIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), event);
}
}
private String dedupKey(EventHubEventDto event) {
String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null
? event.packageInfo().eventSource().stableKey()
: "NO_SOURCE";
return sourceKey + "|" + event.externalSourceEventId();
}
private RuntimeDriverEventLoader driverLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return driverEventLoaders.stream()
.filter(loader -> loader.supports(request, sourceFamily))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No runtime driver event loader is registered for source family " + sourceFamily + "."));
.orElseThrow(() -> new IllegalArgumentException(
"No runtime driver event loader is registered for source family "
+ sourceFamily
+ " and backend "
+ request.eventBackend()
+ "."
));
}
private RuntimeVehicleEventLoader vehicleLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return vehicleEventLoaders.stream()
.filter(loader -> loader.supports(request, sourceFamily))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No runtime vehicle event loader is registered for source family " + sourceFamily + "."));
.orElseThrow(() -> new IllegalArgumentException(
"No runtime vehicle event loader is registered for source family "
+ sourceFamily
+ " and backend "
+ request.eventBackend()
+ "."
));
}
}

View File

@ -10,6 +10,10 @@ import at.procon.eventhub.importing.extraction.ExtractionContext;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -34,6 +38,8 @@ final class TachographRawPayloadSupport {
put(raw, "sourceRowIds", List.of(sourceRowId));
put(raw, "intervalId", intervalId(context, sourceRowId));
}
put(raw, "startedAt", offsetDateTimeText(rs, "interval_started_at"));
put(raw, "endedAt", offsetDateTimeText(rs, "interval_ended_at"));
put(raw, "sourceKind", context == null || context.planItem() == null ? null : context.planItem().sourceKind());
put(raw, "extractionCode", context == null || context.planItem() == null ? null : context.planItem().extractionCode());
put(raw, "level", "RAW_INTERVAL");
@ -135,6 +141,36 @@ final class TachographRawPayloadSupport {
return Long.parseLong(value.toString());
}
private static String offsetDateTimeText(ResultSet rs, String column) throws SQLException {
Object value;
try {
value = rs.getObject(column);
} catch (SQLException ex) {
if (missingColumn(ex)) {
return null;
}
throw ex;
}
if (value == null) {
return null;
}
if (value instanceof OffsetDateTime offsetDateTime) {
return offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC).toString();
}
if (value instanceof Timestamp timestamp) {
return timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC).toString();
}
if (value instanceof LocalDateTime localDateTime) {
return localDateTime.atOffset(ZoneOffset.UTC).toString();
}
String text = value.toString();
try {
return OffsetDateTime.parse(text).withOffsetSameInstant(ZoneOffset.UTC).toString();
} catch (RuntimeException ignored) {
return LocalDateTime.parse(text).atOffset(ZoneOffset.UTC).toString();
}
}
private static boolean missingColumn(SQLException ex) {
String state = ex.getSQLState();
String message = ex.getMessage();
@ -145,4 +181,5 @@ final class TachographRawPayloadSupport {
|| message.toLowerCase(java.util.Locale.ROOT).contains("not valid")
|| message.toLowerCase(java.util.Locale.ROOT).contains("invalid")));
}
}

View File

@ -150,6 +150,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE
Map<String, Object> raw = new LinkedHashMap<>();
raw.put("intervalId", interval.intervalId());
raw.put("sourceRowId", interval.intervalId());
raw.put("sessionId", session.sessionId().toString());
raw.put("driverKey", driverKey);
raw.put("activityType", interval.activityType());
raw.put("sourceRowIds", interval.sourceIntervalIds());
@ -232,6 +233,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE
Map<String, Object> raw = new LinkedHashMap<>();
raw.put("intervalId", interval.intervalId());
raw.put("sourceRowId", interval.intervalId());
raw.put("sessionId", session.sessionId().toString());
raw.put("driverKey", driverKey);
raw.put("sourceRowIds", interval.sourceIntervalIds());
raw.put("startedAt", timeText(interval.from()));
@ -312,6 +314,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE
EventDetailsDto details = supportDetails(eventDomain, supportEvent);
Map<String, Object> raw = new LinkedHashMap<>();
raw.put("sourceRowId", supportEvent.eventId());
raw.put("sessionId", session.sessionId().toString());
raw.put("supportEventId", supportEvent.eventId());
raw.put("driverKey", supportEvent.driverKey());
raw.put("supportEventType", supportEvent.eventType());

View File

@ -27,8 +27,7 @@ create schema DriverActivityIntervalEvent(
level string
);
create window OpenDriverActivityPoint#unique(driverKey, intervalId) as DriverActivityPointEvent;
create window OpenDriverActivityPoint#unique(driverKey, runtimeIntervalKey) as DriverActivityPointEvent;
insert into OpenDriverActivityPoint
select *
from DriverActivityPointEvent(lifecycle = 'START');
@ -60,14 +59,14 @@ select
startEvent.level as level
from OpenDriverActivityPoint as startEvent
where startEvent.driverKey = endEvent.driverKey
and startEvent.intervalId = endEvent.intervalId
and startEvent.runtimeIntervalKey = endEvent.runtimeIntervalKey
and endEvent.occurredAtEpochSecond > startEvent.occurredAtEpochSecond;
@Priority(10)
on DriverActivityPointEvent(lifecycle = 'END') as endEvent
delete from OpenDriverActivityPoint as openEvent
where openEvent.driverKey = endEvent.driverKey
and openEvent.intervalId = endEvent.intervalId;
and openEvent.runtimeIntervalKey = endEvent.runtimeIntervalKey;
@name('driverActivityIntervals')
select *

View File

@ -22,7 +22,7 @@ create schema DriverVehicleUsageIntervalEvent(
sourceIntervalIds java.util.List
);
create window OpenDriverVehicleUsagePoint#unique(driverKey, intervalId) as DriverVehicleUsagePointEvent;
create window OpenDriverVehicleUsagePoint#unique(driverKey, runtimeIntervalKey) as DriverVehicleUsagePointEvent;
insert into OpenDriverVehicleUsagePoint
select *
@ -50,14 +50,14 @@ select
insertEvent.sourceRowIds as sourceIntervalIds
from OpenDriverVehicleUsagePoint as insertEvent
where insertEvent.driverKey = withdrawEvent.driverKey
and insertEvent.intervalId = withdrawEvent.intervalId
and insertEvent.runtimeIntervalKey = withdrawEvent.runtimeIntervalKey
and withdrawEvent.occurredAtEpochSecond >= insertEvent.occurredAtEpochSecond;
@Priority(10)
on DriverVehicleUsagePointEvent(lifecycle = 'WITHDRAW') as withdrawEvent
delete from OpenDriverVehicleUsagePoint as openEvent
where openEvent.driverKey = withdrawEvent.driverKey
and openEvent.intervalId = withdrawEvent.intervalId;
and openEvent.runtimeIntervalKey = withdrawEvent.runtimeIntervalKey;
@name('driverVehicleUsageIntervals')
select *

View File

@ -105,6 +105,8 @@ select
concat('TACHOGRAPH:CARD_ACTIVITY:', base.ID, ':', evt.lifecycle) as external_source_event_id,
evt.occurred_at as occurred_at,
base.BeginTime as interval_started_at,
base.EndTime as interval_ended_at,
base.received_partner_at,
base.Activity as activity_code,
case upper(coalesce(base.Activity, ''))

View File

@ -61,6 +61,8 @@ Base as (
used.vehicle_vin,
used.vehicle_registration_nation,
used.vehicle_registration_number,
used.FirstUse,
used.LastUse,
evt.lifecycle,
evt.occurred_at,
evt.odometer_m,
@ -87,6 +89,8 @@ select
concat('TACHOGRAPH:CARD_VEHICLES_USED:', base.ID, ':', base.lifecycle) as external_source_event_id,
base.occurred_at,
case when base.lifecycle = 'INSERT' then base.occurred_at else base.FirstUse end as interval_started_at,
case when base.lifecycle = 'WITHDRAW' then base.occurred_at else base.LastUse end as interval_ended_at,
base.received_partner_at,
case base.lifecycle
when 'INSERT' then 'CARD_INSERTED'

View File

@ -131,6 +131,8 @@ select
concat('TACHOGRAPH:VU_ACTIVITY:', base.ID, ':', evt.lifecycle) as external_source_event_id,
evt.occurred_at as occurred_at,
base.BeginTime as interval_started_at,
base.EndTime as interval_ended_at,
base.received_partner_at,
base.Activity as activity_code,
case upper(coalesce(base.Activity, ''))

View File

@ -23,6 +23,7 @@ import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingP
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionResultDto;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionService;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingPlanDescriptorDto;
import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographDriverParityResultDto;
import at.procon.eventhub.processing.eventprocessing.validation.RuntimeTachographParityCategoryComparisonDto;
@ -378,6 +379,91 @@ class UnifiedRuntimeProcessingControllerTest {
.andExpect(jsonPath("$.partitioningStrategy").value("DRIVER"));
}
@Test
void runsRuntimeProcessingExecutionWithExplicitSourceInputsViaRuntimeApi() throws Exception {
UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class);
UnifiedRuntimeDriverTimelineService timelineService = org.mockito.Mockito.mock(UnifiedRuntimeDriverTimelineService.class);
UnifiedRuntimeDerivedProjectionService derivedProjectionService = org.mockito.Mockito.mock(UnifiedRuntimeDerivedProjectionService.class);
RuntimeProcessingExecutionService executionService = org.mockito.Mockito.mock(RuntimeProcessingExecutionService.class);
MockMvc mockMvc = MockMvcBuilders.standaloneSetup(new UnifiedRuntimeProcessingController(
eventAssemblyService,
timelineService,
derivedProjectionService,
null,
null,
null,
null,
executionService
))
.setMessageConverters(new MappingJackson2HttpMessageConverter(objectMapper))
.setControllerAdvice(new UnifiedRuntimeProcessingExceptionHandler())
.build();
when(executionService.execute(any()))
.thenAnswer(invocation -> {
RuntimeProcessingExecutionApiRequest apiRequest = invocation.getArgument(0);
UnifiedRuntimeProcessingRequest request = apiRequest.sourceSelection().toRuntimeRequest();
return new RuntimeProcessingExecutionResultDto(
"driver-working-time-v1",
List.of("event-to-activity-intervals"),
RuntimeEventPartitioningStrategy.DRIVER,
request,
3,
1,
1,
List.of(new UnifiedDiscoveredVehicleRef("VEH-1", "VIN-1", "12", "REG-1")),
Map.of(),
List.of("execution"),
List.of()
);
});
mockMvc.perform(post("/api/eventhub/runtime-processing/executions")
.contentType("application/json")
.content("""
{
"processingPlanKey": "driver-working-time-v1",
"sourceSelection": {
"tenantKey": "default",
"driverKey": "12:12345678901234",
"occurredFrom": "2026-05-01T08:00:00Z",
"occurredTo": "2026-05-01T10:00:00Z",
"sourceInputs": [
{
"sourceFamily": "TACHOGRAPH_FILE_SESSION",
"eventBackend": "SOURCE_DB",
"sessionIds": [
"11111111-1111-1111-1111-111111111111",
"22222222-2222-2222-2222-222222222222"
]
},
{
"sourceFamily": "TACHOGRAPH_DB",
"eventBackend": "EVENTHUB_DB"
},
{
"sourceFamily": "YELLOWFOX_DB",
"eventBackend": "SOURCE_DB"
}
]
},
"partitioning": {
"strategy": "DRIVER"
}
}
"""))
.andExpect(status().isOk())
.andExpect(jsonPath("$.processingPlanKey").value("driver-working-time-v1"))
.andExpect(jsonPath("$.request.sourceInputs[0].sourceFamily").value("TACHOGRAPH_FILE_SESSION"))
.andExpect(jsonPath("$.request.sourceInputs[0].eventBackend").value("SOURCE_DB"))
.andExpect(jsonPath("$.request.sourceInputs[0].sessionIds[0]").value("11111111-1111-1111-1111-111111111111"))
.andExpect(jsonPath("$.request.sourceInputs[1].sourceFamily").value("TACHOGRAPH_DB"))
.andExpect(jsonPath("$.request.sourceInputs[1].eventBackend").value("EVENTHUB_DB"))
.andExpect(jsonPath("$.request.sourceInputs[2].sourceFamily").value("YELLOWFOX_DB"))
.andExpect(jsonPath("$.request.driverCardNation").value("12"))
.andExpect(jsonPath("$.request.driverCardNumber").value("12345678901234"));
}
@Test
void listsRuntimeEventProcessingProfilesViaRuntimeApi() throws Exception {
UnifiedRuntimeEventAssemblyService eventAssemblyService = org.mockito.Mockito.mock(UnifiedRuntimeEventAssemblyService.class);

View File

@ -0,0 +1,106 @@
package at.procon.eventhub.processing.driverworkingtime.dto;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class DriverWorkingTimeProcessingResultDtoTest {
@Test
void canExcludeActivityAndDrivingIntervalsWhileKeepingCounts() {
DriverWorkingTimeActivityInterval activityInterval = new DriverWorkingTimeActivityInterval(
null,
"12:123",
"ACT-1",
"WORK",
null,
null,
null,
null,
null,
"DRIVER_CARD",
null,
null,
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T09:00:00Z"),
OffsetDateTime.parse("2026-05-01T08:00:00Z").toEpochSecond(),
OffsetDateTime.parse("2026-05-01T09:00:00Z").toEpochSecond(),
3600L,
List.of("ACT-1"),
false,
false,
"RAW_INTERVAL"
);
DriverWorkingTimeActivityInterval drivingInterval = new DriverWorkingTimeActivityInterval(
null,
"12:123",
"DRV-1",
"DRIVE",
null,
null,
null,
null,
null,
"DRIVER_CARD",
null,
null,
OffsetDateTime.parse("2026-05-01T09:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
OffsetDateTime.parse("2026-05-01T09:00:00Z").toEpochSecond(),
OffsetDateTime.parse("2026-05-01T10:00:00Z").toEpochSecond(),
3600L,
List.of("DRV-1"),
false,
false,
"RAW_INTERVAL"
);
DriverWorkingTimeProcessingResultDto result = new DriverWorkingTimeProcessingResultDto(
UUID.randomUUID(),
"12:123",
"UNIFIED_EVENT_STREAM",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
1,
1,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
List.of(activityInterval),
List.of(drivingInterval),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of("note")
);
DriverWorkingTimeProcessingResultDto compact = result.withIncludedIntervals(false, false);
assertThat(compact.activityIntervalCount()).isEqualTo(1);
assertThat(compact.drivingIntervalCount()).isEqualTo(1);
assertThat(compact.activityIntervals()).isEmpty();
assertThat(compact.drivingIntervals()).isEmpty();
assertThat(compact.notes()).containsExactly("note");
}
}

View File

@ -49,6 +49,9 @@ class RuntimeEventProcessingServiceTest {
true,
0,
null,
null,
null,
null,
null
),
new RuntimeEventPartitioningApiRequest(RuntimeEventPartitioningStrategy.DRIVER, null, false, null, false, null, false, null, null, null),

View File

@ -0,0 +1,219 @@
package at.procon.eventhub.processing.eventprocessing.module;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import at.procon.eventhub.processing.driverworkingtime.dto.DriverWorkingTimeProcessingResultDto;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeActivityInterval;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDriverPartition;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimePreparedInput;
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeProcessingInput;
import at.procon.eventhub.processing.driverworkingtime.service.DriverWorkingTimeProcessingCore;
import at.procon.eventhub.processing.dto.UnifiedRuntimeDerivedProjectionResultDto;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.dto.UnifiedRuntimeDriverWorkingTimeScopeResultDto;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class DriverWorkingTimeDerivedProjectionsModuleTest {
@Test
void appliesIncludeFlagsOnModulePath() {
DriverWorkingTimeProcessingCore core = org.mockito.Mockito.mock(DriverWorkingTimeProcessingCore.class);
DriverWorkingTimeDerivedProjectionsModule module = new DriverWorkingTimeDerivedProjectionsModule(core);
DriverWorkingTimeActivityInterval activityInterval = new DriverWorkingTimeActivityInterval(
null,
"12:123",
"ACT-1",
"WORK",
null,
null,
null,
null,
null,
"DRIVER_CARD",
null,
null,
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T09:00:00Z"),
OffsetDateTime.parse("2026-05-01T08:00:00Z").toEpochSecond(),
OffsetDateTime.parse("2026-05-01T09:00:00Z").toEpochSecond(),
3600L,
List.of("ACT-1"),
false,
false,
"RAW_INTERVAL"
);
DriverWorkingTimeActivityInterval drivingInterval = new DriverWorkingTimeActivityInterval(
null,
"12:123",
"DRV-1",
"DRIVE",
null,
null,
null,
null,
null,
"DRIVER_CARD",
null,
null,
OffsetDateTime.parse("2026-05-01T09:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
OffsetDateTime.parse("2026-05-01T09:00:00Z").toEpochSecond(),
OffsetDateTime.parse("2026-05-01T10:00:00Z").toEpochSecond(),
3600L,
List.of("DRV-1"),
false,
false,
"RAW_INTERVAL"
);
DriverWorkingTimeProcessingResultDto rawProjection = new DriverWorkingTimeProcessingResultDto(
UUID.randomUUID(),
"12:123",
"UNIFIED_EVENT_STREAM",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
1,
1,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
List.of(activityInterval),
List.of(drivingInterval),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of("note")
);
when(core.process(any())).thenReturn(rawProjection);
UnifiedRuntimeProcessingApiRequest scope = new UnifiedRuntimeProcessingApiRequest(
UUID.randomUUID(),
List.of(),
null,
null,
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
null,
null,
"12:123",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
true,
0,
null,
null,
null,
false,
false
);
UnifiedRuntimeProcessingRequest runtimeRequest = scope.toRuntimeRequest();
DriverWorkingTimeProcessingInput processingInput = new DriverWorkingTimeProcessingInput(
null,
"12:123",
"UNIFIED_EVENT_STREAM",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
3,
720,
List.of(),
List.of(),
List.of(),
List.of()
);
DriverWorkingTimePreparedInput preparedInput = new DriverWorkingTimePreparedInput(
"12:123",
new DriverWorkingTimeDriverPartition(
"12:123",
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
null,
List.of(),
null,
List.of(),
List.of()
),
processingInput
);
UnifiedRuntimeEventBundle bundle = new UnifiedRuntimeEventBundle(
runtimeRequest,
List.of(),
List.of(),
List.of(),
List.of(),
List.of()
);
RuntimeProcessingModuleContext context = new RuntimeProcessingModuleContext(
new RuntimeProcessingExecutionApiRequest("driver-working-time-v1", scope, null, List.of(), Map.of()),
List.of(),
Map.of("runtimeScopeApiRequest", scope),
Map.of(
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
new RuntimeProcessingModuleResult(
DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY,
RuntimeProcessingModuleStatus.SUCCESS,
bundle,
Map.of(),
List.of()
),
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION,
new RuntimeProcessingModuleResult(
DriverWorkingTimeModuleKeys.SUPPORT_EVIDENCE_NORMALIZATION,
RuntimeProcessingModuleStatus.SUCCESS,
Map.of("12:123", preparedInput),
Map.of(),
List.of()
)
)
);
RuntimeProcessingModuleResult result = module.execute(context);
UnifiedRuntimeDriverWorkingTimeScopeResultDto scopeResult =
(UnifiedRuntimeDriverWorkingTimeScopeResultDto) result.output();
UnifiedRuntimeDerivedProjectionResultDto driverResult = scopeResult.driverResults().get("12:123");
assertThat(driverResult.projection().activityIntervalCount()).isEqualTo(1);
assertThat(driverResult.projection().drivingIntervalCount()).isEqualTo(1);
assertThat(driverResult.projection().activityIntervals()).isEmpty();
assertThat(driverResult.projection().drivingIntervals()).isEmpty();
}
}

View File

@ -40,6 +40,8 @@ class TachographDriverEsperRuntimeEventProcessingProfileTest {
"minimumRestPeriodMinutes",
"attachVehicleOnlyEvents",
"vehicleEvidencePaddingMinutes",
"includeActivityIntervals",
"includeDrivingIntervals",
"includePartitionDebug"
);
}
@ -71,6 +73,9 @@ class TachographDriverEsperRuntimeEventProcessingProfileTest {
true,
15,
null,
null,
null,
null,
null
);
RuntimeEventProcessingApiRequest request = new RuntimeEventProcessingApiRequest(
@ -93,6 +98,8 @@ class TachographDriverEsperRuntimeEventProcessingProfileTest {
"minimumRestPeriodMinutes", "600",
"vehicleEvidencePaddingMinutes", 20,
"attachVehicleOnlyEvents", true,
"includeActivityIntervals", true,
"includeDrivingIntervals", true,
"includePartitionDebug", true
)
);
@ -116,7 +123,8 @@ class TachographDriverEsperRuntimeEventProcessingProfileTest {
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-31T23:59:59Z"),
true,
15
15,
true
);
UnifiedRuntimeDerivedProjectionResultDto driverResult = new UnifiedRuntimeDerivedProjectionResultDto(
processedRequest,
@ -158,6 +166,8 @@ class TachographDriverEsperRuntimeEventProcessingProfileTest {
assertThat(delegated.minimumRestPeriodMinutes()).isEqualTo(600);
assertThat(delegated.vehicleExpansionPaddingMinutes()).isEqualTo(20);
assertThat(delegated.expandVehicleEvents()).isTrue();
assertThat(delegated.includeActivityIntervals()).isTrue();
assertThat(delegated.includeDrivingIntervals()).isTrue();
assertThat(debugCaptor.getValue()).isTrue();
}
}

View File

@ -99,8 +99,11 @@ class RuntimeMixedSourceEvidenceValidationServiceTest {
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
15,
null,
3,
720
720,
null,
null
),
new RuntimeEventPartitioningApiRequest(
RuntimeEventPartitioningStrategy.DRIVER,

View File

@ -82,7 +82,8 @@ class UnifiedDriverEventsRequestTest {
null,
null,
null,
null
null,
false
)).isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("At least one driver or vehicle selector");
}

View File

@ -34,6 +34,7 @@ class UnifiedRuntimeProcessingRequestTest {
assertThat(request.eventBackend()).isEqualTo(UnifiedRuntimeEventBackend.SOURCE_DB);
assertThat(request.expandVehicleEvents()).isTrue();
assertThat(request.vehicleOccurredFrom()).isEqualTo(OffsetDateTime.parse("2026-05-01T00:00:00Z"));
assertThat(request.includeIntersectingIntervals()).isTrue();
}
@Test
@ -92,7 +93,8 @@ class UnifiedRuntimeProcessingRequestTest {
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
0
0,
true
);
assertThat(driverCardOnlyRequest.tachographSourceKinds()).containsExactly(UnifiedTachographSourceKind.DRIVER_CARD);
@ -136,6 +138,140 @@ class UnifiedRuntimeProcessingRequestTest {
assertThat(request.eventBackend()).isEqualTo(UnifiedRuntimeEventBackend.SOURCE_DB);
}
@Test
void infersDriverCardSelectorFromDriverKeyForMixedSourceScope() {
UUID sessionId = UUID.randomUUID();
UnifiedRuntimeProcessingRequest request = new UnifiedRuntimeProcessingRequest(
sessionId,
List.of(),
null,
"default",
Set.of(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedEventSourceFamily.YELLOWFOX_DB
),
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
"12:1234567890123456",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
10,
true
);
assertThat(request.driverKey()).isEqualTo("12:1234567890123456");
assertThat(request.driverCardNation()).isEqualTo("12");
assertThat(request.driverCardNumber()).isEqualTo("12345678901234");
assertThat(request.sourceFamilies()).containsExactlyInAnyOrder(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedEventSourceFamily.YELLOWFOX_DB
);
}
@Test
void supportsExplicitMixedBackendSourceInputs() {
UUID sessionId = UUID.randomUUID();
UnifiedRuntimeProcessingRequest request = new UnifiedRuntimeProcessingRequest(
null,
List.of(),
null,
"default",
Set.of(),
null,
null,
"12:1234567890123456",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
10,
true,
List.of(
new UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedRuntimeEventBackend.SOURCE_DB,
sessionId,
List.of(),
null
),
new UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedRuntimeEventBackend.EVENTHUB_DB,
null,
List.of(),
null
),
new UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.YELLOWFOX_DB,
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
List.of(),
null
)
)
);
assertThat(request.sourceFamilies()).containsExactlyInAnyOrder(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedEventSourceFamily.YELLOWFOX_DB
);
assertThat(request.normalizedSourceInputs()).hasSize(3);
assertThat(request.normalizedSourceInputs()).extracting(UnifiedRuntimeSourceInput::eventBackend)
.containsExactly(
UnifiedRuntimeEventBackend.SOURCE_DB,
UnifiedRuntimeEventBackend.EVENTHUB_DB,
UnifiedRuntimeEventBackend.SOURCE_DB
);
assertThat(request.driverCardNation()).isEqualTo("12");
assertThat(request.driverCardNumber()).isEqualTo("12345678901234");
}
@Test
void doesNotInferDriverCardSelectorFromNonCardDriverKey() {
UnifiedRuntimeProcessingRequest request = new UnifiedRuntimeProcessingRequest(
null,
List.of(),
null,
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB),
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
"DRIVER:42",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
0,
true
);
assertThat(request.driverCardNation()).isNull();
assertThat(request.driverCardNumber()).isNull();
}
@Test
@ -195,7 +331,8 @@ class UnifiedRuntimeProcessingRequestTest {
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
10
10,
true
);
assertThat(request.driverKey()).isNull();
@ -224,7 +361,8 @@ class UnifiedRuntimeProcessingRequestTest {
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
10
10,
true
);
assertThat(request.includeAllDrivers()).isTrue();
@ -254,7 +392,8 @@ class UnifiedRuntimeProcessingRequestTest {
null,
null,
true,
0
0,
true
)).isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Use either compositeSessionId");
}
@ -281,7 +420,8 @@ class UnifiedRuntimeProcessingRequestTest {
null,
null,
true,
0
0,
true
)).isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("At least one driver selector");
}

View File

@ -63,7 +63,8 @@ class UnifiedVehicleEventsRequestTest {
null,
null,
null,
null
null,
false
)).isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("At least one vehicle selector");
}

View File

@ -84,6 +84,8 @@ class EventHubRuntimeEventLoaderTest {
assertThat(driverRequest.occurredFrom()).isEqualTo(OffsetDateTime.parse("2026-05-01T00:00:00Z"));
assertThat(driverRequest.occurredTo()).isEqualTo(OffsetDateTime.parse("2026-05-02T00:00:00Z"));
});
assertThat(driverSource.requests).extracting(UnifiedDriverEventsRequest::includeIntersectingIntervals)
.containsExactly(true, false);
}
@Test
@ -121,6 +123,8 @@ class EventHubRuntimeEventLoaderTest {
assertThat(vehicleRequest.occurredFrom()).isEqualTo(OffsetDateTime.parse("2026-04-30T23:45:00Z"));
assertThat(vehicleRequest.occurredTo()).isEqualTo(OffsetDateTime.parse("2026-05-02T00:15:00Z"));
});
assertThat(vehicleSource.requests).extracting(UnifiedVehicleEventsRequest::includeIntersectingIntervals)
.containsExactly(true, false);
}
private static final class CapturingDriverSource implements UnifiedDriverEventSource {

View File

@ -76,6 +76,49 @@ class TachographFileSessionRuntimeEventLoaderTest {
assertThat(loader.loadVehicleEvents(request, new UnifiedDiscoveredVehicleRef("VIN-1", "VIN-1", "12", "REG-1"))).hasSize(5);
}
@Test
void keepsCompleteIntersectingIntervalsWhenRequestStartsInsideInterval() {
EventHubProperties properties = new EventHubProperties();
TachographFileSessionRepository repository = new InMemoryTachographFileSessionRepository(properties);
InMemoryTachographCompositeSessionRepository compositeRepository = new InMemoryTachographCompositeSessionRepository();
IntervalBackedDriverTimelineEventBuilder eventBuilder = new IntervalBackedDriverTimelineEventBuilder(
new DriverTimelineBuilder(),
new DriverKeyFactory(),
new VehicleKeyFactory(),
new EventDetailsFactory(new ObjectMapper())
);
TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader(
new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, eventBuilder))),
new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, eventBuilder))),
compositeRepository,
new EventAcquisitionRecordKeyService(),
new EventHubEventSorter()
);
DriverExtractionSession driver = driver();
TachographFileSession session = session(driver);
repository.save(session);
UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forTachographFileSession(
session.sessionId(),
driver.driverKey(),
OffsetDateTime.parse("2026-05-01T08:45:00Z"),
OffsetDateTime.parse("2026-05-01T09:15:00Z"),
true,
0
);
assertThat(loader.loadDriverEvents(request))
.extracting(event -> event.occurredAt())
.containsExactly(
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T08:30:00Z"),
OffsetDateTime.parse("2026-05-01T08:45:00Z"),
OffsetDateTime.parse("2026-05-01T09:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z")
);
}
@Test

View File

@ -60,7 +60,8 @@ class UnifiedRuntimeDriverTimelineServiceTest {
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
0
0,
true
)
);

View File

@ -16,6 +16,8 @@ import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.List;
@ -52,7 +54,8 @@ class UnifiedRuntimeEventAssemblyServiceTest {
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
15
15,
true
)
);
@ -94,6 +97,146 @@ class UnifiedRuntimeEventAssemblyServiceTest {
.containsExactly("SEED-1", "SEED-2");
}
@Test
void deduplicatesEquivalentCrossSourceActivityEventsByCanonicalIdentity() {
UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService(
List.of(new FakeTachographDbLoader(), new FakeYellowFoxDbLoader()),
List.of()
);
UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents(
UnifiedRuntimeProcessingRequest.forDriverFromEventHub(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB),
"DRIVER:42",
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
0
)
);
assertThat(bundle.driverSeedEvents()).hasSize(1);
assertThat(bundle.mergedEvents()).hasSize(1);
assertThat(bundle.mergedEvents().get(0).occurredAt()).isEqualTo(OffsetDateTime.parse("2026-05-01T08:00:00Z"));
assertThat(bundle.mergedEvents().get(0).eventType()).isEqualTo(EventType.DRIVE);
assertThat(bundle.mergedEvents().get(0).lifecycle()).isEqualTo(EventLifecycle.START);
}
@Test
void assemblesMixedFileSessionAndDirectDbSourcesUnderSourceDbBackend() {
UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService(
List.of(
new FakeFileSessionLoader(),
new FakeDirectTachographDbLoader(),
new FakeDirectYellowFoxDbLoader()
),
List.of()
);
UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents(
new UnifiedRuntimeProcessingRequest(
UUID.fromString("11111111-1111-1111-1111-111111111111"),
List.of(),
null,
"default",
Set.of(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedEventSourceFamily.YELLOWFOX_DB
),
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
"12:1234567890123456",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
0,
true
)
);
assertThat(bundle.driverSeedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("FILE-SESSION-1", "TACHO-DB-1", "YF-DB-1");
}
@Test
void assemblesMixedBackendSourcesFromExplicitSourceInputs() {
UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService(
List.of(
new FakeFileSessionLoader(),
new FakeTachographDbLoader(),
new FakeDirectYellowFoxDbLoader()
),
List.of()
);
UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents(
new UnifiedRuntimeProcessingRequest(
null,
List.of(),
null,
"default",
Set.of(),
null,
null,
"12:1234567890123456",
Set.of(),
false,
Set.of(),
false,
null,
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
0,
true,
List.of(
new at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
UnifiedRuntimeEventBackend.SOURCE_DB,
UUID.fromString("11111111-1111-1111-1111-111111111111"),
List.of(),
null
),
new at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedRuntimeEventBackend.EVENTHUB_DB,
null,
List.of(),
null
),
new at.procon.eventhub.processing.model.UnifiedRuntimeSourceInput(
UnifiedEventSourceFamily.YELLOWFOX_DB,
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
List.of(),
null
)
)
)
);
assertThat(bundle.driverSeedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("FILE-SESSION-1", "TACHO-ACT-1", "YF-DB-1");
assertThat(bundle.notes()).anySatisfy(note -> assertThat(note).contains("mixed runtime scope"));
}
private static final class FakeLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader {
@Override
@ -170,4 +313,167 @@ class UnifiedRuntimeEventAssemblyServiceTest {
);
}
}
private static final class FakeTachographDbLoader implements RuntimeDriverEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB
&& sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(canonicalActivityEvent(
"TACHO-ACT-1",
source("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_DRIVER_CARD"),
"2026-05-01T08:00:00Z",
"2026-05-01T08:00:00Z",
"2026-05-01T09:00:00Z"
));
}
}
private static final class FakeYellowFoxDbLoader implements RuntimeDriverEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB
&& sourceFamily == UnifiedEventSourceFamily.YELLOWFOX_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(canonicalActivityEvent(
"YF-ACT-99",
source("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8"),
"2026-05-01T08:00:00Z",
"2026-05-01T08:00:00Z",
"2026-05-01T09:00:00Z"
));
}
}
private static final class FakeFileSessionLoader implements RuntimeDriverEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB
&& sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(canonicalActivityEvent(
"FILE-SESSION-1",
source("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_FILE_SESSION"),
"2026-05-01T07:55:00Z",
"2026-05-01T07:55:00Z",
"2026-05-01T08:25:00Z"
));
}
}
private static final class FakeDirectTachographDbLoader implements RuntimeDriverEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB
&& sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(canonicalActivityEvent(
"TACHO-DB-1",
source("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_DRIVER_CARD"),
"2026-05-01T08:30:00Z",
"2026-05-01T08:30:00Z",
"2026-05-01T09:00:00Z"
));
}
}
private static final class FakeDirectYellowFoxDbLoader implements RuntimeDriverEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB
&& sourceFamily == UnifiedEventSourceFamily.YELLOWFOX_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(canonicalActivityEvent(
"YF-DB-1",
source("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8"),
"2026-05-01T09:05:00Z",
"2026-05-01T09:05:00Z",
"2026-05-01T09:35:00Z"
));
}
}
private static EventHubEventDto canonicalActivityEvent(
String externalId,
EventSourceDto source,
String occurredAt,
String startedAt,
String endedAt
) {
ObjectNode raw = JsonNodeFactory.instance.objectNode();
raw.put("intervalId", externalId + "-INTERVAL");
raw.put("startedAt", startedAt);
raw.put("endedAt", endedAt);
raw.put("driverKey", "DRIVER:42");
raw.put("activityType", "DRIVE");
raw.put("cardSlot", "DRIVER");
raw.put("cardStatus", "INSERTED");
raw.put("drivingStatus", "SINGLE");
raw.put("registrationKey", "AT:W-1");
raw.put("vehicleKey", "VIN-1");
raw.put("sourceKind", source.sourceKind());
raw.putArray("sourceRowIds").add(externalId + "-ROW");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
return new EventHubEventDto(
UUID.randomUUID(),
externalId,
new DriverRefDto("DRIVER:42", null),
new VehicleRefDto(
"VEH-1",
"VIN-1",
"VEH-1",
new VehicleRegistrationRefDto("AT", "W-1")
),
OffsetDateTime.parse(occurredAt),
null,
OffsetDateTime.parse(occurredAt),
EventDomain.DRIVER_ACTIVITY,
EventType.DRIVE,
EventLifecycle.START,
null,
null,
null,
null,
payload,
false,
new at.procon.eventhub.dto.EventHubPackageRequest(
"default",
source,
null,
ImportScopeDto.tenantAll(
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z")
),
EventDomain.DRIVER_ACTIVITY.name(),
LocalDate.parse("2026-05-01"),
source.stableKey() + ":DRIVER_ACTIVITY:2026-05-01"
)
);
}
private static EventSourceDto source(String provider, String sourceKind, String sourceKey) {
return new EventSourceDto(provider, sourceKind, sourceKey, null, null, null);
}
}