diff --git a/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java index b64aab4..f5b1f47 100644 --- a/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java +++ b/src/main/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequest.java @@ -2,11 +2,14 @@ package at.procon.eventhub.processing.model; import java.time.OffsetDateTime; import java.util.Set; +import java.util.UUID; public record UnifiedRuntimeProcessingRequest( + UUID sessionId, String tenantKey, Set sourceFamilies, UnifiedRuntimeEventBackend eventBackend, + String driverKey, String driverSourceEntityId, String driverCardNation, String driverCardNumber, @@ -16,19 +19,31 @@ public record UnifiedRuntimeProcessingRequest( int vehicleExpansionPaddingMinutes ) { public UnifiedRuntimeProcessingRequest { + driverKey = normalize(driverKey); tenantKey = normalize(tenantKey); driverSourceEntityId = normalize(driverSourceEntityId); driverCardNation = normalizeUpper(driverCardNation); driverCardNumber = normalize(driverCardNumber); + boolean includesFileSession = sourceFamilies != null && sourceFamilies.contains(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION); + boolean includesExternalDb = sourceFamilies != null && sourceFamilies.stream() + .anyMatch(family -> family == UnifiedEventSourceFamily.TACHOGRAPH_DB || family == UnifiedEventSourceFamily.YELLOWFOX_DB); if (tenantKey == null) { - throw new IllegalArgumentException("tenantKey must not be blank"); + if (!includesFileSession || includesExternalDb) { + throw new IllegalArgumentException("tenantKey must not be blank"); + } } if (sourceFamilies == null || sourceFamilies.isEmpty()) { throw new IllegalArgumentException("sourceFamilies must not be empty"); } sourceFamilies = Set.copyOf(sourceFamilies); eventBackend = eventBackend == null ? UnifiedRuntimeEventBackend.SOURCE_DB : eventBackend; - if (driverSourceEntityId == null && driverCardNumber == null) { + if (includesFileSession && sessionId == null) { + throw new IllegalArgumentException("sessionId must not be null when TACHOGRAPH_FILE_SESSION is selected."); + } + if (includesFileSession && driverKey == null) { + throw new IllegalArgumentException("driverKey must not be blank when TACHOGRAPH_FILE_SESSION is selected."); + } + if (includesExternalDb && driverSourceEntityId == null && driverCardNumber == null) { throw new IllegalArgumentException("At least one driver selector must be provided."); } if (occurredFrom != null && occurredTo != null && occurredTo.isBefore(occurredFrom)) { @@ -100,9 +115,11 @@ public record UnifiedRuntimeProcessingRequest( int vehicleExpansionPaddingMinutes ) { return new UnifiedRuntimeProcessingRequest( + null, tenantKey, sourceFamilies, eventBackend, + null, driverSourceEntityId, driverCardNation, driverCardNumber, @@ -125,9 +142,11 @@ public record UnifiedRuntimeProcessingRequest( int vehicleExpansionPaddingMinutes ) { return new UnifiedRuntimeProcessingRequest( + null, tenantKey, sourceFamilies, UnifiedRuntimeEventBackend.EVENTHUB_DB, + null, driverSourceEntityId, driverCardNation, driverCardNumber, @@ -138,6 +157,30 @@ public record UnifiedRuntimeProcessingRequest( ); } + public static UnifiedRuntimeProcessingRequest forTachographFileSession( + UUID sessionId, + String driverKey, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo, + boolean expandVehicleEvents, + int vehicleExpansionPaddingMinutes + ) { + return new UnifiedRuntimeProcessingRequest( + sessionId, + null, + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION), + UnifiedRuntimeEventBackend.SOURCE_DB, + driverKey, + null, + null, + null, + occurredFrom, + occurredTo, + expandVehicleEvents, + vehicleExpansionPaddingMinutes + ); + } + public OffsetDateTime vehicleOccurredFrom() { return occurredFrom == null ? null : occurredFrom.minusMinutes(vehicleExpansionPaddingMinutes); } diff --git a/src/main/java/at/procon/eventhub/processing/service/TachographFileSessionRuntimeEventLoader.java b/src/main/java/at/procon/eventhub/processing/service/TachographFileSessionRuntimeEventLoader.java new file mode 100644 index 0000000..a8bd91a --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/service/TachographFileSessionRuntimeEventLoader.java @@ -0,0 +1,62 @@ +package at.procon.eventhub.processing.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; +import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import at.procon.eventhub.processing.model.UnifiedVehicleEventsRequest; +import java.util.List; +import org.springframework.stereotype.Component; + +@Component +public class TachographFileSessionRuntimeEventLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader { + + private final UnifiedDriverEventSourceService driverEventSourceService; + private final UnifiedVehicleEventSourceService vehicleEventSourceService; + + public TachographFileSessionRuntimeEventLoader( + UnifiedDriverEventSourceService driverEventSourceService, + UnifiedVehicleEventSourceService vehicleEventSourceService + ) { + this.driverEventSourceService = driverEventSourceService; + this.vehicleEventSourceService = vehicleEventSourceService; + } + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB + && sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION; + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + return driverEventSourceService.loadDriverEvents( + UnifiedDriverEventsRequest.forTachographFileSession( + request.sessionId(), + request.driverKey(), + request.occurredFrom(), + request.occurredTo() + ) + ); + } + + @Override + public List loadVehicleEvents( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef + ) { + return vehicleEventSourceService.loadVehicleEvents( + UnifiedVehicleEventsRequest.forTachographFileSession( + request.sessionId(), + vehicleRef.sourceVehicleEntityId(), + vehicleRef.vin(), + vehicleRef.registrationNation(), + vehicleRef.registrationNumber(), + request.vehicleOccurredFrom(), + request.vehicleOccurredTo() + ) + ); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/UnifiedEventTimelineReconstructor.java b/src/main/java/at/procon/eventhub/processing/service/UnifiedEventTimelineReconstructor.java new file mode 100644 index 0000000..bc0c752 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/service/UnifiedEventTimelineReconstructor.java @@ -0,0 +1,442 @@ +package at.procon.eventhub.processing.service; + +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent; +import at.procon.eventhub.tachographfilesession.model.ExtractionWarning; +import at.procon.eventhub.tachographfilesession.model.ResolvedActivityInterval; +import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline; +import at.procon.eventhub.tachographfilesession.model.ResolvedVehicleUsageInterval; +import com.fasterxml.jackson.databind.JsonNode; +import java.math.BigDecimal; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.springframework.stereotype.Service; + +@Service +public class UnifiedEventTimelineReconstructor { + + public ResolvedDriverTimeline reconstruct( + UUID sessionId, + String driverKey, + List events + ) { + return reconstruct(sessionId, driverKey, events, List.of(), null); + } + + public ResolvedDriverTimeline reconstruct( + UUID sessionId, + String driverKey, + List events, + List warnings, + String sourceKind + ) { + List safeEvents = events == null ? List.of() : List.copyOf(events); + List activityIntervals = reconstructActivityIntervals(safeEvents); + List vehicleUsageIntervals = + reconstructVehicleUsageIntervals(sessionId, driverKey, safeEvents); + List supportEvents = reconstructSupportEvents(safeEvents); + List mergedWarnings = warnings == null ? List.of() : List.copyOf(new LinkedHashSet<>(warnings)); + + OffsetDateTime loadedFrom = minTimestamp(activityIntervals, vehicleUsageIntervals, supportEvents); + OffsetDateTime loadedTo = maxTimestamp(activityIntervals, vehicleUsageIntervals, supportEvents); + return new ResolvedDriverTimeline( + resolveSourceKind(safeEvents, sourceKind), + loadedFrom, + loadedTo, + vehicleUsageIntervals, + activityIntervals, + supportEvents, + mergedWarnings + ); + } + + private List reconstructActivityIntervals(List events) { + Map byIntervalId = new LinkedHashMap<>(); + for (EventHubEventDto event : events) { + if (event.eventDomain() != EventDomain.DRIVER_ACTIVITY) { + continue; + } + JsonNode raw = raw(event); + String intervalId = firstNonBlank( + text(raw, "intervalId"), + text(raw, "sourceRowId"), + event.externalSourceEventId() + ); + ActivityAccumulator accumulator = byIntervalId.computeIfAbsent( + intervalId, + ignored -> new ActivityAccumulator(intervalId) + ); + accumulator.accept(event, raw); + } + return byIntervalId.values().stream() + .map(ActivityAccumulator::finish) + .filter(interval -> interval != null) + .sorted(Comparator.comparing(ResolvedActivityInterval::from) + .thenComparing(ResolvedActivityInterval::to) + .thenComparing(ResolvedActivityInterval::activityType, Comparator.nullsLast(String::compareTo))) + .toList(); + } + + private List reconstructVehicleUsageIntervals( + UUID sessionId, + String driverKey, + List events + ) { + Map byIntervalId = new LinkedHashMap<>(); + for (EventHubEventDto event : events) { + if (event.eventDomain() != EventDomain.DRIVER_CARD) { + continue; + } + if (event.eventType() != EventType.CARD_INSERTED && event.eventType() != EventType.CARD_WITHDRAWN) { + continue; + } + JsonNode raw = raw(event); + String intervalId = firstNonBlank( + text(raw, "intervalId"), + text(raw, "sourceRowId"), + event.externalSourceEventId() + ); + VehicleUsageAccumulator accumulator = byIntervalId.computeIfAbsent( + intervalId, + ignored -> new VehicleUsageAccumulator(sessionId, driverKey, intervalId) + ); + accumulator.accept(event, raw); + } + return byIntervalId.values().stream() + .map(VehicleUsageAccumulator::finish) + .filter(interval -> interval != null) + .sorted(Comparator.comparing(ResolvedVehicleUsageInterval::from) + .thenComparing(ResolvedVehicleUsageInterval::to, Comparator.nullsLast(Comparator.naturalOrder()))) + .toList(); + } + + private List reconstructSupportEvents(List events) { + List result = new ArrayList<>(); + for (EventHubEventDto event : events) { + if (event.eventDomain() == EventDomain.DRIVER_ACTIVITY) { + continue; + } + if (event.eventDomain() == EventDomain.DRIVER_CARD + && (event.eventType() == EventType.CARD_INSERTED || event.eventType() == EventType.CARD_WITHDRAWN)) { + continue; + } + JsonNode raw = raw(event); + String eventId = firstNonBlank(text(raw, "supportEventId"), event.externalSourceEventId()); + BigDecimal latitude = event.position() == null ? null : event.position().latitude(); + BigDecimal longitude = event.position() == null ? null : event.position().longitude(); + result.add(new ExtractedSupportEvent( + eventId, + event.occurredAt(), + event.eventDomain().name(), + text(raw, "supportEventType") == null ? event.eventType().name() : text(raw, "supportEventType"), + event.lifecycle().name(), + text(raw, "slot"), + text(raw, "registrationKey"), + text(raw, "vehicleKey"), + firstNonBlank(text(raw, "country"), detailText(event, "country")), + text(raw, "region"), + firstNonBlank(text(raw, "countryFrom"), detailText(event, "countryFrom")), + firstNonBlank(text(raw, "countryTo"), detailText(event, "countryTo")), + text(raw, "operation"), + latitude, + longitude, + text(raw, "authenticationStatus"), + longValue(raw, "odometerKm"), + text(raw, "code"), + decimal(raw, "avgSpeedKmh"), + decimal(raw, "maxSpeedKmh"), + text(raw, "rawRecordPath") + )); + } + result.sort(Comparator.comparing(ExtractedSupportEvent::occurredAt) + .thenComparing(ExtractedSupportEvent::eventDomain, Comparator.nullsLast(String::compareTo)) + .thenComparing(ExtractedSupportEvent::eventId, Comparator.nullsLast(String::compareTo))); + return List.copyOf(result); + } + + private String resolveSourceKind(List events, String preferredSourceKind) { + if (preferredSourceKind != null && !preferredSourceKind.isBlank()) { + return preferredSourceKind; + } + LinkedHashSet sourceKinds = new LinkedHashSet<>(); + for (EventHubEventDto event : events) { + JsonNode raw = raw(event); + String sourceKind = text(raw, "sourceKind"); + if (sourceKind == null && event.packageInfo() != null && event.packageInfo().eventSource() != null) { + sourceKind = event.packageInfo().eventSource().sourceKind(); + } + if (sourceKind != null && !sourceKind.isBlank()) { + sourceKinds.add(sourceKind); + } + } + if (sourceKinds.isEmpty()) { + return "UNIFIED_EVENT_STREAM"; + } + return sourceKinds.size() == 1 ? sourceKinds.iterator().next() : "UNIFIED_EVENT_STREAM"; + } + + private OffsetDateTime minTimestamp( + List activityIntervals, + List vehicleUsageIntervals, + List supportEvents + ) { + OffsetDateTime min = null; + for (ResolvedActivityInterval interval : activityIntervals) { + min = min(min, interval.from()); + } + for (ResolvedVehicleUsageInterval interval : vehicleUsageIntervals) { + min = min(min, interval.from()); + } + for (ExtractedSupportEvent supportEvent : supportEvents) { + min = min(min, supportEvent.occurredAt()); + } + return min; + } + + private OffsetDateTime maxTimestamp( + List activityIntervals, + List vehicleUsageIntervals, + List supportEvents + ) { + OffsetDateTime max = null; + for (ResolvedActivityInterval interval : activityIntervals) { + max = max(max, interval.to()); + } + for (ResolvedVehicleUsageInterval interval : vehicleUsageIntervals) { + max = max(max, interval.to()); + } + for (ExtractedSupportEvent supportEvent : supportEvents) { + max = max(max, supportEvent.occurredAt()); + } + return max; + } + + private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) { + if (left == null) { + return right; + } + if (right == null) { + return left; + } + return left.isBefore(right) ? left : right; + } + + private OffsetDateTime max(OffsetDateTime left, OffsetDateTime right) { + if (left == null) { + return right; + } + if (right == null) { + return left; + } + return left.isAfter(right) ? left : right; + } + + private JsonNode raw(EventHubEventDto event) { + JsonNode payload = event.payload(); + if (payload == null || payload.isMissingNode()) { + return null; + } + JsonNode raw = payload.get("raw"); + return raw == null || raw.isNull() ? payload : raw; + } + + private String text(JsonNode node, String field) { + if (node == null || field == null) { + return null; + } + JsonNode value = node.get(field); + return value == null || value.isNull() ? null : value.asText(null); + } + + private boolean booleanValue(JsonNode node, String field) { + if (node == null || field == null) { + return false; + } + JsonNode value = node.get(field); + return value != null && !value.isNull() && value.asBoolean(false); + } + + private Long longValue(JsonNode node, String field) { + if (node == null || field == null) { + return null; + } + JsonNode value = node.get(field); + if (value == null || value.isNull()) { + return null; + } + return value.isNumber() ? value.asLong() : Long.parseLong(value.asText()); + } + + private BigDecimal decimal(JsonNode node, String field) { + if (node == null || field == null) { + return null; + } + JsonNode value = node.get(field); + if (value == null || value.isNull()) { + return null; + } + if (value.isNumber()) { + return value.decimalValue(); + } + String text = value.asText(null); + return text == null || text.isBlank() ? null : new BigDecimal(text); + } + + private List stringList(JsonNode node, String field) { + if (node == null || field == null) { + return List.of(); + } + JsonNode value = node.get(field); + if (value == null || value.isNull() || !value.isArray()) { + return List.of(); + } + List result = new ArrayList<>(); + value.forEach(item -> { + String text = item == null || item.isNull() ? null : item.asText(null); + if (text != null && !text.isBlank()) { + result.add(text); + } + }); + return List.copyOf(result); + } + + private String detailText(EventHubEventDto event, String field) { + if (event.eventDetails() == null || event.eventDetails().attributes() == null) { + return null; + } + JsonNode value = event.eventDetails().attributes().get(field); + return value == null || value.isNull() ? null : value.asText(null); + } + + private String activityType(EventHubEventDto event) { + return switch (event.eventType()) { + case DRIVE -> "DRIVE"; + case WORK -> "WORK"; + case AVAILABILITY -> "AVAILABILITY"; + case BREAK_REST -> "BREAK_REST"; + default -> "UNKNOWN"; + }; + } + + private Long toKilometers(Long meters) { + return meters == null ? null : meters / 1_000L; + } + + private String firstNonBlank(String... values) { + if (values == null) { + return null; + } + for (String value : values) { + if (value != null && !value.isBlank()) { + return value; + } + } + return null; + } + + private final class ActivityAccumulator { + private final String intervalId; + private OffsetDateTime startedAt; + private OffsetDateTime endedAt; + private EventHubEventDto sample; + private JsonNode raw; + + private ActivityAccumulator(String intervalId) { + this.intervalId = intervalId; + } + + private void accept(EventHubEventDto event, JsonNode raw) { + if (sample == null) { + sample = event; + this.raw = raw; + } + if (event.lifecycle() == EventLifecycle.START) { + startedAt = event.occurredAt(); + } else if (event.lifecycle() == EventLifecycle.END) { + endedAt = event.occurredAt(); + } + } + + private ResolvedActivityInterval finish() { + if (sample == null || startedAt == null || endedAt == null || !endedAt.isAfter(startedAt)) { + return null; + } + return new ResolvedActivityInterval( + intervalId, + startedAt, + endedAt, + Duration.between(startedAt, endedAt).getSeconds(), + activityType(sample), + detailText(sample, "cardSlot"), + detailText(sample, "cardStatus"), + detailText(sample, "drivingStatus"), + text(raw, "registrationKey"), + text(raw, "vehicleKey"), + text(raw, "sourceKind"), + stringList(raw, "sourceRowIds"), + booleanValue(raw, "synthetic"), + booleanValue(raw, "clippedToRequestedPeriod"), + text(raw, "level") == null ? "RAW_INTERVAL" : text(raw, "level") + ); + } + } + + private final class VehicleUsageAccumulator { + private final UUID sessionId; + private final String driverKey; + private final String intervalId; + private OffsetDateTime startedAt; + private OffsetDateTime endedAt; + private Long odometerBeginKm; + private Long odometerEndKm; + private JsonNode raw; + + private VehicleUsageAccumulator(UUID sessionId, String driverKey, String intervalId) { + this.sessionId = sessionId; + this.driverKey = driverKey; + this.intervalId = intervalId; + } + + private void accept(EventHubEventDto event, JsonNode raw) { + if (this.raw == null) { + this.raw = raw; + } + if (event.eventType() == EventType.CARD_INSERTED) { + startedAt = event.occurredAt(); + odometerBeginKm = toKilometers(event.odometerM()); + } else if (event.eventType() == EventType.CARD_WITHDRAWN) { + endedAt = event.occurredAt(); + odometerEndKm = toKilometers(event.odometerM()); + } + } + + private ResolvedVehicleUsageInterval finish() { + if (startedAt == null) { + return null; + } + return ResolvedVehicleUsageInterval.resolved( + sessionId, + driverKey, + intervalId, + startedAt, + endedAt, + odometerBeginKm, + odometerEndKm, + text(raw, "registrationKey"), + text(raw, "vehicleKey"), + text(raw, "sourceKind"), + stringList(raw, "sourceRowIds") + ); + } + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDriverTimelineService.java b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDriverTimelineService.java new file mode 100644 index 0000000..a8b96f7 --- /dev/null +++ b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeDriverTimelineService.java @@ -0,0 +1,55 @@ +package at.procon.eventhub.processing.service; + +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline; +import java.util.List; +import org.springframework.stereotype.Service; + +@Service +public class UnifiedRuntimeDriverTimelineService { + + private final UnifiedRuntimeEventAssemblyService runtimeEventAssemblyService; + private final UnifiedEventTimelineReconstructor timelineReconstructor; + + public UnifiedRuntimeDriverTimelineService( + UnifiedRuntimeEventAssemblyService runtimeEventAssemblyService, + UnifiedEventTimelineReconstructor timelineReconstructor + ) { + this.runtimeEventAssemblyService = runtimeEventAssemblyService; + this.timelineReconstructor = timelineReconstructor; + } + + public ResolvedDriverTimeline loadDriverTimeline(UnifiedRuntimeProcessingRequest request) { + UnifiedRuntimeEventBundle bundle = runtimeEventAssemblyService.assembleDriverScopedEvents(request); + return timelineReconstructor.reconstruct( + null, + resolveDriverKey(request, bundle.mergedEvents()), + bundle.mergedEvents() + ); + } + + private String resolveDriverKey( + UnifiedRuntimeProcessingRequest request, + List events + ) { + if (request.driverKey() != null) { + return request.driverKey(); + } + if (request.driverSourceEntityId() != null) { + return request.driverSourceEntityId(); + } + for (EventHubEventDto event : events) { + DriverRefDto driverRef = event.driverRef(); + if (driverRef != null && driverRef.hasAnyReference()) { + return driverRef.stableKey(); + } + } + if (request.driverCardNation() != null && request.driverCardNumber() != null) { + return request.driverCardNation() + ":" + request.driverCardNumber(); + } + return request.driverCardNumber(); + } +} diff --git a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java index 909b92d..eeaef36 100644 --- a/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java +++ b/src/main/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyService.java @@ -39,7 +39,7 @@ public class UnifiedRuntimeEventAssemblyService { List notes = new ArrayList<>(); notes.add(request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB ? "Driver seed events were loaded from the local EventHub event store." - : "Driver seed events were loaded directly from the selected source databases."); + : "Driver seed events were loaded directly from the selected runtime sources."); if (request.expandVehicleEvents()) { notes.add("Vehicle expansion loaded additional events for vehicles discovered in the driver seed set."); notes.add("Vehicle expansion padding minutes: " + request.vehicleExpansionPaddingMinutes() + "."); diff --git a/src/main/java/at/procon/eventhub/tachographfilesession/service/EventBackedDriverTimelineBuilder.java b/src/main/java/at/procon/eventhub/tachographfilesession/service/EventBackedDriverTimelineBuilder.java index 111e64f..0df193b 100644 --- a/src/main/java/at/procon/eventhub/tachographfilesession/service/EventBackedDriverTimelineBuilder.java +++ b/src/main/java/at/procon/eventhub/tachographfilesession/service/EventBackedDriverTimelineBuilder.java @@ -1,34 +1,33 @@ package at.procon.eventhub.tachographfilesession.service; -import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.processing.service.UnifiedEventTimelineReconstructor; import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession; -import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent; import at.procon.eventhub.tachographfilesession.model.ExtractionWarning; -import at.procon.eventhub.tachographfilesession.model.ResolvedActivityInterval; import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline; -import at.procon.eventhub.tachographfilesession.model.ResolvedVehicleUsageInterval; import at.procon.eventhub.tachographfilesession.model.TachographFileSession; import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle; -import com.fasterxml.jackson.databind.JsonNode; -import java.math.BigDecimal; -import java.time.OffsetDateTime; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class EventBackedDriverTimelineBuilder { private final DriverTimelineEventBuilder eventBuilder; + private final UnifiedEventTimelineReconstructor timelineReconstructor; public EventBackedDriverTimelineBuilder(DriverTimelineEventBuilder eventBuilder) { + this(eventBuilder, new UnifiedEventTimelineReconstructor()); + } + + @Autowired + public EventBackedDriverTimelineBuilder( + DriverTimelineEventBuilder eventBuilder, + UnifiedEventTimelineReconstructor timelineReconstructor + ) { this.eventBuilder = eventBuilder; + this.timelineReconstructor = timelineReconstructor; } public ResolvedDriverTimeline build( @@ -36,147 +35,15 @@ public class EventBackedDriverTimelineBuilder { DriverExtractionSession driverSession ) { TachographTimelineEventBundle bundle = eventBuilder.buildEventBundle(session, driverSession); - List activityIntervals = - reconstructActivityIntervals(bundle.activityEvents()); - List vehicleUsageIntervals = - reconstructVehicleUsageIntervals(session.sessionId(), driverSession.driverKey(), bundle.vehicleUsageEvents()); - List supportEvents = - reconstructSupportEvents(bundle.supportEvents()); - List warnings = mergeWarnings(session.warnings(), driverSession.warnings()); - - OffsetDateTime loadedFrom = minTimestamp(activityIntervals, vehicleUsageIntervals, supportEvents); - OffsetDateTime loadedTo = maxTimestamp(activityIntervals, vehicleUsageIntervals, supportEvents); - return new ResolvedDriverTimeline( - resolveSourceKind(session, bundle), - loadedFrom, - loadedTo, - vehicleUsageIntervals, - activityIntervals, - supportEvents, - warnings + return timelineReconstructor.reconstruct( + session.sessionId(), + driverSession.driverKey(), + bundle.allEvents(), + mergeWarnings(session.warnings(), driverSession.warnings()), + fallbackSourceKind(session) ); } - private List reconstructActivityIntervals(List activityEvents) { - if (activityEvents == null || activityEvents.isEmpty()) { - return List.of(); - } - Map byIntervalId = new LinkedHashMap<>(); - for (EventHubEventDto event : activityEvents) { - if (!"DRIVER_ACTIVITY".equals(event.eventDomain().name())) { - continue; - } - JsonNode raw = raw(event); - String intervalId = text(raw, "intervalId"); - if (intervalId == null) { - intervalId = text(raw, "sourceRowId"); - } - if (intervalId == null) { - intervalId = event.externalSourceEventId(); - } - String resolvedIntervalId = intervalId; - ActivityAccumulator accumulator = byIntervalId.computeIfAbsent( - resolvedIntervalId, - ignored -> new ActivityAccumulator(resolvedIntervalId) - ); - accumulator.accept(event, raw); - } - List result = new ArrayList<>(byIntervalId.size()); - for (ActivityAccumulator accumulator : byIntervalId.values()) { - ResolvedActivityInterval interval = accumulator.finish(); - if (interval != null) { - result.add(interval); - } - } - result.sort(Comparator.comparing(ResolvedActivityInterval::from) - .thenComparing(ResolvedActivityInterval::to) - .thenComparing(ResolvedActivityInterval::activityType, Comparator.nullsLast(String::compareTo))); - return List.copyOf(result); - } - - private List reconstructVehicleUsageIntervals( - UUID sessionId, - String driverKey, - List vehicleUsageEvents - ) { - if (vehicleUsageEvents == null || vehicleUsageEvents.isEmpty()) { - return List.of(); - } - Map byIntervalId = new LinkedHashMap<>(); - for (EventHubEventDto event : vehicleUsageEvents) { - if (!"DRIVER_CARD".equals(event.eventDomain().name())) { - continue; - } - JsonNode raw = raw(event); - String intervalId = text(raw, "intervalId"); - if (intervalId == null) { - intervalId = text(raw, "sourceRowId"); - } - if (intervalId == null) { - intervalId = event.externalSourceEventId(); - } - String resolvedIntervalId = intervalId; - VehicleUsageAccumulator accumulator = byIntervalId.computeIfAbsent( - resolvedIntervalId, - ignored -> new VehicleUsageAccumulator(sessionId, driverKey, resolvedIntervalId) - ); - accumulator.accept(event, raw); - } - List result = new ArrayList<>(byIntervalId.size()); - for (VehicleUsageAccumulator accumulator : byIntervalId.values()) { - ResolvedVehicleUsageInterval interval = accumulator.finish(); - if (interval != null) { - result.add(interval); - } - } - result.sort(Comparator.comparing(ResolvedVehicleUsageInterval::from) - .thenComparing(ResolvedVehicleUsageInterval::to, Comparator.nullsLast(Comparator.naturalOrder()))); - return List.copyOf(result); - } - - private List reconstructSupportEvents(List supportEvents) { - if (supportEvents == null || supportEvents.isEmpty()) { - return List.of(); - } - List result = new ArrayList<>(supportEvents.size()); - for (EventHubEventDto event : supportEvents) { - JsonNode raw = raw(event); - String eventId = text(raw, "supportEventId"); - if (eventId == null) { - eventId = event.externalSourceEventId(); - } - BigDecimal latitude = event.position() == null ? null : event.position().latitude(); - BigDecimal longitude = event.position() == null ? null : event.position().longitude(); - result.add(new ExtractedSupportEvent( - eventId, - event.occurredAt(), - event.eventDomain().name(), - text(raw, "supportEventType") == null ? event.eventType().name() : text(raw, "supportEventType"), - event.lifecycle().name(), - text(raw, "slot"), - text(raw, "registrationKey"), - text(raw, "vehicleKey"), - text(raw, "country"), - text(raw, "region"), - text(raw, "countryFrom"), - text(raw, "countryTo"), - text(raw, "operation"), - latitude, - longitude, - text(raw, "authenticationStatus"), - longValue(raw, "odometerKm"), - text(raw, "code"), - decimal(raw, "avgSpeedKmh"), - decimal(raw, "maxSpeedKmh"), - text(raw, "rawRecordPath") - )); - } - result.sort(Comparator.comparing(ExtractedSupportEvent::occurredAt) - .thenComparing(ExtractedSupportEvent::eventDomain, Comparator.nullsLast(String::compareTo)) - .thenComparing(ExtractedSupportEvent::eventId, Comparator.nullsLast(String::compareTo))); - return List.copyOf(result); - } - private List mergeWarnings(List sessionWarnings, List driverWarnings) { LinkedHashSet merged = new LinkedHashSet<>(); if (sessionWarnings != null) { @@ -188,261 +55,7 @@ public class EventBackedDriverTimelineBuilder { return List.copyOf(merged); } - private OffsetDateTime minTimestamp( - List activityIntervals, - List vehicleUsageIntervals, - List supportEvents - ) { - OffsetDateTime min = null; - for (ResolvedActivityInterval interval : activityIntervals) { - min = min(min, interval.from()); - } - for (ResolvedVehicleUsageInterval interval : vehicleUsageIntervals) { - min = min(min, interval.from()); - } - for (ExtractedSupportEvent supportEvent : supportEvents) { - min = min(min, supportEvent.occurredAt()); - } - return min; - } - - private OffsetDateTime maxTimestamp( - List activityIntervals, - List vehicleUsageIntervals, - List supportEvents - ) { - OffsetDateTime max = null; - for (ResolvedActivityInterval interval : activityIntervals) { - max = max(max, interval.to()); - } - for (ResolvedVehicleUsageInterval interval : vehicleUsageIntervals) { - max = max(max, interval.to()); - } - for (ExtractedSupportEvent supportEvent : supportEvents) { - max = max(max, supportEvent.occurredAt()); - } - return max; - } - - private String resolveSourceKind(TachographFileSession session, TachographTimelineEventBundle bundle) { - for (EventHubEventDto event : bundle.allEvents()) { - JsonNode raw = raw(event); - String sourceKind = text(raw, "sourceKind"); - if (sourceKind != null) { - return sourceKind; - } - if (event.packageInfo() != null && event.packageInfo().eventSource() != null - && event.packageInfo().eventSource().sourceKind() != null) { - return event.packageInfo().eventSource().sourceKind(); - } - } + private String fallbackSourceKind(TachographFileSession session) { return session.metadata().driverCardFile() ? "DRIVER_CARD" : "VEHICLE_UNIT"; } - - private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) { - if (left == null) { - return right; - } - if (right == null) { - return left; - } - return left.isBefore(right) ? left : right; - } - - private OffsetDateTime max(OffsetDateTime left, OffsetDateTime right) { - if (left == null) { - return right; - } - if (right == null) { - return left; - } - return left.isAfter(right) ? left : right; - } - - private JsonNode raw(EventHubEventDto event) { - JsonNode payload = event.payload(); - if (payload == null || payload.isMissingNode()) { - return null; - } - JsonNode raw = payload.get("raw"); - return raw == null || raw.isNull() ? payload : raw; - } - - private String text(JsonNode node, String field) { - if (node == null || field == null) { - return null; - } - JsonNode value = node.get(field); - return value == null || value.isNull() ? null : value.asText(null); - } - - private boolean booleanValue(JsonNode node, String field) { - if (node == null || field == null) { - return false; - } - JsonNode value = node.get(field); - return value != null && !value.isNull() && value.asBoolean(false); - } - - private Long longValue(JsonNode node, String field) { - if (node == null || field == null) { - return null; - } - JsonNode value = node.get(field); - if (value == null || value.isNull()) { - return null; - } - return value.isNumber() ? value.asLong() : Long.parseLong(value.asText()); - } - - private BigDecimal decimal(JsonNode node, String field) { - if (node == null || field == null) { - return null; - } - JsonNode value = node.get(field); - if (value == null || value.isNull()) { - return null; - } - if (value.isNumber()) { - return value.decimalValue(); - } - String text = value.asText(null); - return text == null || text.isBlank() ? null : new BigDecimal(text); - } - - private List stringList(JsonNode node, String field) { - if (node == null || field == null) { - return List.of(); - } - JsonNode value = node.get(field); - if (value == null || value.isNull() || !value.isArray()) { - return List.of(); - } - List result = new ArrayList<>(); - value.forEach(item -> { - String text = item == null || item.isNull() ? null : item.asText(null); - if (text != null && !text.isBlank()) { - result.add(text); - } - }); - return List.copyOf(result); - } - - private String detailText(EventHubEventDto event, String field) { - if (event.eventDetails() == null || event.eventDetails().attributes() == null) { - return null; - } - JsonNode value = event.eventDetails().attributes().get(field); - return value == null || value.isNull() ? null : value.asText(null); - } - - private String activityType(EventHubEventDto event) { - return switch (event.eventType()) { - case DRIVE -> "DRIVE"; - case WORK -> "WORK"; - case AVAILABILITY -> "AVAILABILITY"; - case BREAK_REST -> "BREAK_REST"; - default -> "UNKNOWN"; - }; - } - - private Long toKilometers(Long meters) { - return meters == null ? null : meters / 1_000L; - } - - private final class ActivityAccumulator { - private final String intervalId; - private OffsetDateTime startedAt; - private OffsetDateTime endedAt; - private EventHubEventDto sample; - private JsonNode raw; - - private ActivityAccumulator(String intervalId) { - this.intervalId = intervalId; - } - - private void accept(EventHubEventDto event, JsonNode raw) { - if (sample == null) { - sample = event; - this.raw = raw; - } - if (event.lifecycle() == at.procon.eventhub.dto.EventLifecycle.START) { - startedAt = event.occurredAt(); - } else if (event.lifecycle() == at.procon.eventhub.dto.EventLifecycle.END) { - endedAt = event.occurredAt(); - } - } - - private ResolvedActivityInterval finish() { - if (sample == null || startedAt == null || endedAt == null || !endedAt.isAfter(startedAt)) { - return null; - } - return new ResolvedActivityInterval( - intervalId, - startedAt, - endedAt, - java.time.Duration.between(startedAt, endedAt).getSeconds(), - activityType(sample), - detailText(sample, "cardSlot"), - detailText(sample, "cardStatus"), - detailText(sample, "drivingStatus"), - text(raw, "registrationKey"), - text(raw, "vehicleKey"), - text(raw, "sourceKind"), - stringList(raw, "sourceRowIds"), - booleanValue(raw, "synthetic"), - booleanValue(raw, "clippedToRequestedPeriod"), - text(raw, "level") == null ? "RAW_INTERVAL" : text(raw, "level") - ); - } - } - - private final class VehicleUsageAccumulator { - private final UUID sessionId; - private final String driverKey; - private final String intervalId; - private OffsetDateTime startedAt; - private OffsetDateTime endedAt; - private Long odometerBeginKm; - private Long odometerEndKm; - private JsonNode raw; - - private VehicleUsageAccumulator(UUID sessionId, String driverKey, String intervalId) { - this.sessionId = sessionId; - this.driverKey = driverKey; - this.intervalId = intervalId; - } - - private void accept(EventHubEventDto event, JsonNode raw) { - if (this.raw == null) { - this.raw = raw; - } - if (event.eventType() == at.procon.eventhub.dto.EventType.CARD_INSERTED) { - startedAt = event.occurredAt(); - odometerBeginKm = toKilometers(event.odometerM()); - } else if (event.eventType() == at.procon.eventhub.dto.EventType.CARD_WITHDRAWN) { - endedAt = event.occurredAt(); - odometerEndKm = toKilometers(event.odometerM()); - } - } - - private ResolvedVehicleUsageInterval finish() { - if (startedAt == null) { - return null; - } - return ResolvedVehicleUsageInterval.resolved( - sessionId, - driverKey, - intervalId, - startedAt, - endedAt, - odometerBeginKm, - odometerEndKm, - text(raw, "registrationKey"), - text(raw, "vehicleKey"), - text(raw, "sourceKind"), - stringList(raw, "sourceRowIds") - ); - } - } } diff --git a/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java b/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java index 22e5d56..5d9c71f 100644 --- a/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java +++ b/src/test/java/at/procon/eventhub/processing/model/UnifiedRuntimeProcessingRequestTest.java @@ -5,6 +5,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.time.OffsetDateTime; import java.util.Set; +import java.util.UUID; import org.junit.jupiter.api.Test; class UnifiedRuntimeProcessingRequestTest { @@ -70,9 +71,28 @@ class UnifiedRuntimeProcessingRequestTest { assertThat(request.vehicleExpansionPaddingMinutes()).isEqualTo(15); } + @Test + void canBuildFileSessionRuntimeRequest() { + UUID sessionId = UUID.randomUUID(); + UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forTachographFileSession( + sessionId, + "12:123", + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + true, + 10 + ); + + assertThat(request.sessionId()).isEqualTo(sessionId); + assertThat(request.driverKey()).isEqualTo("12:123"); + assertThat(request.sourceFamilies()).containsExactly(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION); + assertThat(request.eventBackend()).isEqualTo(UnifiedRuntimeEventBackend.SOURCE_DB); + } + @Test void rejectsRequestWithoutDriverSelector() { assertThatThrownBy(() -> new UnifiedRuntimeProcessingRequest( + null, "default", Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB), UnifiedRuntimeEventBackend.SOURCE_DB, @@ -81,6 +101,7 @@ class UnifiedRuntimeProcessingRequestTest { null, null, null, + null, true, 0 )).isInstanceOf(IllegalArgumentException.class) diff --git a/src/test/java/at/procon/eventhub/processing/service/TachographFileSessionRuntimeEventLoaderTest.java b/src/test/java/at/procon/eventhub/processing/service/TachographFileSessionRuntimeEventLoaderTest.java new file mode 100644 index 0000000..88d19cd --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/service/TachographFileSessionRuntimeEventLoaderTest.java @@ -0,0 +1,149 @@ +package at.procon.eventhub.processing.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import at.procon.eventhub.service.EventDetailsFactory; +import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession; +import at.procon.eventhub.tachographfilesession.model.ExtractedCardActivityInterval; +import at.procon.eventhub.tachographfilesession.model.ExtractedCardVehicleUsageInterval; +import at.procon.eventhub.tachographfilesession.model.ExtractedDriver; +import at.procon.eventhub.tachographfilesession.model.ExtractedDriverCard; +import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent; +import at.procon.eventhub.tachographfilesession.model.ExtractedVehicle; +import at.procon.eventhub.tachographfilesession.model.ExtractedVehicleRegistration; +import at.procon.eventhub.tachographfilesession.model.ExtractionStats; +import at.procon.eventhub.tachographfilesession.model.TachographFileSession; +import at.procon.eventhub.tachographfilesession.model.TachographFileSessionMetadata; +import at.procon.eventhub.tachographfilesession.service.DriverKeyFactory; +import at.procon.eventhub.tachographfilesession.service.DriverTimelineBuilder; +import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository; +import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder; +import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository; +import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class TachographFileSessionRuntimeEventLoaderTest { + + @Test + void loadsDriverAndVehicleEventsFromFileSessionRuntimePath() { + EventHubProperties properties = new EventHubProperties(); + TachographFileSessionRepository repository = new InMemoryTachographFileSessionRepository(properties); + IntervalBackedDriverTimelineEventBuilder eventBuilder = new IntervalBackedDriverTimelineEventBuilder( + new DriverTimelineBuilder(), + new DriverKeyFactory(), + new VehicleKeyFactory(), + new EventDetailsFactory(new ObjectMapper()) + ); + TachographFileSessionRuntimeEventLoader loader = new TachographFileSessionRuntimeEventLoader( + new UnifiedDriverEventSourceService(List.of(new TachographFileSessionUnifiedDriverEventSource(repository, eventBuilder))), + new UnifiedVehicleEventSourceService(List.of(new TachographFileSessionUnifiedVehicleEventSource(repository, eventBuilder))) + ); + + DriverExtractionSession driver = driver(); + TachographFileSession session = session(driver); + repository.save(session); + + UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forTachographFileSession( + session.sessionId(), + driver.driverKey(), + OffsetDateTime.parse("2026-05-01T08:00:00Z"), + OffsetDateTime.parse("2026-05-01T10:00:00Z"), + true, + 0 + ); + + assertThat(loader.supports(request, at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION)).isTrue(); + assertThat(loader.loadDriverEvents(request)).hasSize(5); + assertThat(loader.loadVehicleEvents(request, new UnifiedDiscoveredVehicleRef("VIN-1", "VIN-1", "12", "REG-1"))).hasSize(5); + } + + private DriverExtractionSession driver() { + return new DriverExtractionSession( + "12:123", + new ExtractedDriver("12:123", "DRV:12:123", "Doe", "Jane", null, null, null, null, null), + new ExtractedDriverCard("CARD:12:123", "12", "123", null, null, null, null), + List.of(new ExtractedVehicleRegistration("12:REG-1", "VR:12:REG-1", "12", "REG-1")), + List.of(new ExtractedVehicle("VIN-1", "VIN:VIN-1", "VIN-1")), + List.of(new ExtractedCardVehicleUsageInterval( + "CVU-1", + OffsetDateTime.parse("2026-05-01T08:00:00Z"), + OffsetDateTime.parse("2026-05-01T10:00:00Z"), + 100L, + 200L, + "12:REG-1", + "VIN-1", + "vu-1" + )), + List.of(new ExtractedCardActivityInterval( + "ACT-1", + OffsetDateTime.parse("2026-05-01T08:30:00Z"), + OffsetDateTime.parse("2026-05-01T09:00:00Z"), + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-1", + "VIN-1", + "a" + )), + List.of(new ExtractedSupportEvent( + "SUP-1", + OffsetDateTime.parse("2026-05-01T08:45:00Z"), + "POSITION", + "POSITION_RECORDED", + "SNAPSHOT", + "DRIVER", + "12:REG-1", + "VIN-1", + null, + null, + null, + null, + null, + BigDecimal.valueOf(48.2082), + BigDecimal.valueOf(16.3738), + "AUTHENTIC", + 150L, + null, + null, + null, + "raw-path" + )), + List.of() + ); + } + + private TachographFileSession session(DriverExtractionSession driver) { + return new TachographFileSession( + UUID.randomUUID(), + new TachographFileSessionMetadata( + "default", + "legalrequirements-drivercard", + "sample", + "sample.ddd", + "a", + 2, + "42", + "b", + true, + null + ), + Map.of(driver.driverKey(), driver), + new ExtractionStats(1, 1, 1, 1, 1, 0), + List.of(), + Instant.now(), + Instant.now().plus(4, ChronoUnit.HOURS) + ); + } +} diff --git a/src/test/java/at/procon/eventhub/processing/service/UnifiedEventTimelineReconstructorTest.java b/src/test/java/at/procon/eventhub/processing/service/UnifiedEventTimelineReconstructorTest.java new file mode 100644 index 0000000..38a00ed --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/service/UnifiedEventTimelineReconstructorTest.java @@ -0,0 +1,176 @@ +package at.procon.eventhub.processing.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.GeoPointDto; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class UnifiedEventTimelineReconstructorTest { + + private final UnifiedEventTimelineReconstructor reconstructor = new UnifiedEventTimelineReconstructor(); + + @Test + void reconstructsTimelineFromUnifiedEvents() { + List events = List.of( + activityEvent("ACT-1", EventLifecycle.START, "2026-05-01T08:00:00Z"), + activityEvent("ACT-1", EventLifecycle.END, "2026-05-01T09:00:00Z"), + vehicleUsageEvent("CVU-1", EventType.CARD_INSERTED, EventLifecycle.INSERT, "2026-05-01T07:50:00Z", 100_000L), + vehicleUsageEvent("CVU-1", EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW, "2026-05-01T10:10:00Z", 140_000L), + supportEvent("SUP-1", "2026-05-01T08:30:00Z") + ); + + ResolvedDriverTimeline timeline = reconstructor.reconstruct( + UUID.randomUUID(), + "DRIVER:42", + events + ); + + assertThat(timeline.sourceKind()).isEqualTo("DRIVER_CARD"); + assertThat(timeline.loadedFrom()).isEqualTo(OffsetDateTime.parse("2026-05-01T07:50:00Z")); + assertThat(timeline.loadedTo()).isEqualTo(OffsetDateTime.parse("2026-05-01T10:10:00Z")); + assertThat(timeline.activityIntervals()).hasSize(1); + assertThat(timeline.activityIntervals().get(0).intervalId()).isEqualTo("ACT-1"); + assertThat(timeline.activityIntervals().get(0).activityType()).isEqualTo("DRIVE"); + assertThat(timeline.activityIntervals().get(0).registrationKey()).isEqualTo("12:REG-1"); + assertThat(timeline.vehicleUsageIntervals()).hasSize(1); + assertThat(timeline.vehicleUsageIntervals().get(0).intervalId()).isEqualTo("CVU-1"); + assertThat(timeline.vehicleUsageIntervals().get(0).odometerBeginKm()).isEqualTo(100L); + assertThat(timeline.vehicleUsageIntervals().get(0).odometerEndKm()).isEqualTo(140L); + assertThat(timeline.supportEvents()).hasSize(1); + assertThat(timeline.supportEvents().get(0).eventId()).isEqualTo("SUP-1"); + assertThat(timeline.supportEvents().get(0).eventDomain()).isEqualTo("POSITION"); + assertThat(timeline.supportEvents().get(0).latitude()).isEqualByComparingTo(BigDecimal.valueOf(48.2082)); + } + + private EventHubEventDto activityEvent(String intervalId, EventLifecycle lifecycle, String occurredAt) { + ObjectNode raw = JsonNodeFactory.instance.objectNode(); + raw.put("intervalId", intervalId); + raw.put("registrationKey", "12:REG-1"); + raw.put("vehicleKey", "VIN-1"); + raw.put("sourceKind", "DRIVER_CARD"); + raw.putArray("sourceRowIds").add("row-" + intervalId); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + return new EventHubEventDto( + UUID.randomUUID(), + intervalId + ":" + lifecycle.name(), + new DriverRefDto("DRIVER:42", null), + new VehicleRefDto("VEH-1", "VIN-1", "VR-1", new VehicleRegistrationRefDto("12", 12, "REG-1")), + OffsetDateTime.parse(occurredAt), + null, + OffsetDateTime.parse(occurredAt), + EventDomain.DRIVER_ACTIVITY, + EventType.DRIVE, + lifecycle, + null, + null, + null, + null, + payload, + false, + packageInfo("DRIVER_CARD", EventDomain.DRIVER_ACTIVITY) + ); + } + + private EventHubEventDto vehicleUsageEvent( + String intervalId, + EventType eventType, + EventLifecycle lifecycle, + String occurredAt, + long odometerM + ) { + ObjectNode raw = JsonNodeFactory.instance.objectNode(); + raw.put("intervalId", intervalId); + raw.put("registrationKey", "12:REG-1"); + raw.put("vehicleKey", "VIN-1"); + raw.put("sourceKind", "DRIVER_CARD"); + raw.putArray("sourceRowIds").add("row-" + intervalId); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + return new EventHubEventDto( + UUID.randomUUID(), + intervalId + ":" + eventType.name(), + new DriverRefDto("DRIVER:42", null), + new VehicleRefDto("VEH-1", "VIN-1", "VR-1", new VehicleRegistrationRefDto("12", 12, "REG-1")), + OffsetDateTime.parse(occurredAt), + null, + OffsetDateTime.parse(occurredAt), + EventDomain.DRIVER_CARD, + eventType, + lifecycle, + odometerM, + null, + null, + null, + payload, + false, + packageInfo("DRIVER_CARD", EventDomain.DRIVER_CARD) + ); + } + + private EventHubEventDto supportEvent(String eventId, String occurredAt) { + ObjectNode raw = JsonNodeFactory.instance.objectNode(); + raw.put("supportEventId", eventId); + raw.put("registrationKey", "12:REG-1"); + raw.put("vehicleKey", "VIN-1"); + raw.put("sourceKind", "DRIVER_CARD"); + raw.put("authenticationStatus", "AUTHENTIC"); + raw.put("odometerKm", 120); + raw.put("rawRecordPath", "card/positions/1"); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + return new EventHubEventDto( + UUID.randomUUID(), + eventId, + new DriverRefDto("DRIVER:42", null), + new VehicleRefDto("VEH-1", "VIN-1", "VR-1", new VehicleRegistrationRefDto("12", 12, "REG-1")), + OffsetDateTime.parse(occurredAt), + null, + OffsetDateTime.parse(occurredAt), + EventDomain.POSITION, + EventType.POSITION_RECORDED, + EventLifecycle.SNAPSHOT, + 120_000L, + new GeoPointDto(BigDecimal.valueOf(48.2082), BigDecimal.valueOf(16.3738)), + null, + null, + payload, + false, + packageInfo("DRIVER_CARD", EventDomain.POSITION) + ); + } + + private EventHubPackageRequest packageInfo(String sourceKind, EventDomain eventDomain) { + EventSourceDto source = new EventSourceDto("TACHOGRAPH", sourceKind, "SOURCE", null, null, null); + return new EventHubPackageRequest( + "default", + source, + null, + ImportScopeDto.tenantAll( + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z") + ), + eventDomain.name(), + LocalDate.parse("2026-05-01"), + source.stableKey() + ":" + eventDomain.name() + ":2026-05-01" + ); + } +} diff --git a/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeDriverTimelineServiceTest.java b/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeDriverTimelineServiceTest.java new file mode 100644 index 0000000..cb85529 --- /dev/null +++ b/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeDriverTimelineServiceTest.java @@ -0,0 +1,179 @@ +package at.procon.eventhub.processing.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; +import at.procon.eventhub.processing.model.UnifiedEventSourceFamily; +import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend; +import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest; +import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class UnifiedRuntimeDriverTimelineServiceTest { + + @Test + void reconstructsTimelineFromRuntimeAssembledEvents() { + UnifiedRuntimeEventAssemblyService assemblyService = new UnifiedRuntimeEventAssemblyService( + List.of(new FakeRuntimeLoader()), + List.of(new FakeRuntimeLoader()) + ); + UnifiedRuntimeDriverTimelineService service = new UnifiedRuntimeDriverTimelineService( + assemblyService, + new UnifiedEventTimelineReconstructor() + ); + + ResolvedDriverTimeline timeline = service.loadDriverTimeline( + new UnifiedRuntimeProcessingRequest( + null, + "default", + Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB), + UnifiedRuntimeEventBackend.SOURCE_DB, + null, + "DRIVER:42", + null, + null, + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + false, + 0 + ) + ); + + assertThat(timeline.sourceKind()).isEqualTo("DRIVER_CARD"); + assertThat(timeline.loadedFrom()).isEqualTo(OffsetDateTime.parse("2026-05-01T08:00:00Z")); + assertThat(timeline.loadedTo()).isEqualTo(OffsetDateTime.parse("2026-05-01T11:00:00Z")); + assertThat(timeline.activityIntervals()).hasSize(1); + assertThat(timeline.activityIntervals().get(0).intervalId()).isEqualTo("ACT-1"); + assertThat(timeline.vehicleUsageIntervals()).hasSize(1); + assertThat(timeline.vehicleUsageIntervals().get(0).intervalId()).isEqualTo("CVU-1"); + assertThat(timeline.supportEvents()).isEmpty(); + } + + private static final class FakeRuntimeLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader { + + @Override + public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) { + return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB + && sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB; + } + + @Override + public List loadDriverEvents(UnifiedRuntimeProcessingRequest request) { + return List.of( + activityEvent("ACT-1", EventLifecycle.START, "2026-05-01T08:00:00Z"), + vehicleUsageEvent("CVU-1", EventType.CARD_INSERTED, EventLifecycle.INSERT, "2026-05-01T08:00:00Z", 100_000L), + activityEvent("ACT-1", EventLifecycle.END, "2026-05-01T09:30:00Z"), + vehicleUsageEvent("CVU-1", EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW, "2026-05-01T11:00:00Z", 160_000L) + ); + } + + @Override + public List loadVehicleEvents( + UnifiedRuntimeProcessingRequest request, + UnifiedDiscoveredVehicleRef vehicleRef + ) { + return List.of(); + } + + private EventHubEventDto activityEvent(String intervalId, EventLifecycle lifecycle, String occurredAt) { + ObjectNode raw = JsonNodeFactory.instance.objectNode(); + raw.put("intervalId", intervalId); + raw.put("registrationKey", "12:REG-1"); + raw.put("vehicleKey", "VIN-1"); + raw.put("sourceKind", "DRIVER_CARD"); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + return new EventHubEventDto( + UUID.randomUUID(), + intervalId + ":" + lifecycle.name(), + new DriverRefDto("DRIVER:42", null), + vehicleRef(), + OffsetDateTime.parse(occurredAt), + null, + OffsetDateTime.parse(occurredAt), + EventDomain.DRIVER_ACTIVITY, + EventType.DRIVE, + lifecycle, + null, + null, + null, + null, + payload, + false, + packageInfo(EventDomain.DRIVER_ACTIVITY) + ); + } + + private EventHubEventDto vehicleUsageEvent( + String intervalId, + EventType eventType, + EventLifecycle lifecycle, + String occurredAt, + long odometerM + ) { + ObjectNode raw = JsonNodeFactory.instance.objectNode(); + raw.put("intervalId", intervalId); + raw.put("registrationKey", "12:REG-1"); + raw.put("vehicleKey", "VIN-1"); + raw.put("sourceKind", "DRIVER_CARD"); + ObjectNode payload = JsonNodeFactory.instance.objectNode(); + payload.set("raw", raw); + return new EventHubEventDto( + UUID.randomUUID(), + intervalId + ":" + eventType.name(), + new DriverRefDto("DRIVER:42", null), + vehicleRef(), + OffsetDateTime.parse(occurredAt), + null, + OffsetDateTime.parse(occurredAt), + EventDomain.DRIVER_CARD, + eventType, + lifecycle, + odometerM, + null, + null, + null, + payload, + false, + packageInfo(EventDomain.DRIVER_CARD) + ); + } + + private VehicleRefDto vehicleRef() { + return new VehicleRefDto("VEH-1", "VIN-1", "VR-1", new VehicleRegistrationRefDto("12", 12, "REG-1")); + } + + private EventHubPackageRequest packageInfo(EventDomain eventDomain) { + EventSourceDto source = new EventSourceDto("TACHOGRAPH", "DRIVER_CARD", "SOURCE", null, null, null); + return new EventHubPackageRequest( + "default", + source, + null, + ImportScopeDto.tenantAll( + OffsetDateTime.parse("2026-05-01T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:00:00Z") + ), + eventDomain.name(), + LocalDate.parse("2026-05-01"), + source.stableKey() + ":" + eventDomain.name() + ":2026-05-01" + ); + } + } +} diff --git a/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java b/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java index e48710f..ffc7e3c 100644 --- a/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java +++ b/src/test/java/at/procon/eventhub/processing/service/UnifiedRuntimeEventAssemblyServiceTest.java @@ -34,9 +34,11 @@ class UnifiedRuntimeEventAssemblyServiceTest { UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents( new UnifiedRuntimeProcessingRequest( + null, "default", Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB), UnifiedRuntimeEventBackend.SOURCE_DB, + null, "DRIVER:42", null, null,