From 3bccda20e8be5ca88e7dabe10ad0e84644bdf7ca Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 25 May 2026 16:33:59 +0200 Subject: [PATCH] Add raw event tachograph projection input path --- .../eventhub/config/EventHubProperties.java | 19 + .../service/DriverTimelineEventBuilder.java | 13 + ...iverTimelineReusableProjectionBuilder.java | 556 ++++++++++++++++++ ...ervalBackedDriverTimelineEventBuilder.java | 7 + .../RawSourceDriverTimelineEventBuilder.java | 211 +++++++ ...achographFileSessionProcessingService.java | 22 +- src/main/resources/application.yml | 3 + ...derived-projection-events-preprocessor.epl | 348 +++++++++++ ...TimelineReusableProjectionBuilderTest.java | 235 ++++++++ 9 files changed, 1407 insertions(+), 7 deletions(-) create mode 100644 src/main/java/at/procon/eventhub/tachographfilesession/service/RawSourceDriverTimelineEventBuilder.java create mode 100644 src/main/resources/esper/tachograph-driving-derived-projection-events-preprocessor.epl diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index 5dffa74..821cdf6 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -357,6 +357,7 @@ public class EventHubProperties { public static class Processing { private TimelineInputMode timelineInputMode = TimelineInputMode.INTERVALS; + private DrivingDerivedProjectionInputMode drivingDerivedProjectionInputMode = DrivingDerivedProjectionInputMode.INTERVALS; private int operatingSplitIdleHours = 7; private int significantDrivingMinutes = 3; private int minimumRestPeriodMinutes = 720; @@ -377,6 +378,16 @@ public class EventHubProperties { } } + public DrivingDerivedProjectionInputMode getDrivingDerivedProjectionInputMode() { + return drivingDerivedProjectionInputMode; + } + + public void setDrivingDerivedProjectionInputMode(DrivingDerivedProjectionInputMode drivingDerivedProjectionInputMode) { + if (drivingDerivedProjectionInputMode != null) { + this.drivingDerivedProjectionInputMode = drivingDerivedProjectionInputMode; + } + } + public int getOperatingSplitIdleHours() { return operatingSplitIdleHours; } @@ -456,6 +467,14 @@ public class EventHubProperties { EVENTS } + public enum DrivingDerivedProjectionInputMode { + /** Existing stable path: Java resolves intervals and EPL receives interval input streams. */ + INTERVALS, + + /** New path: EPL receives EventHub point events and reconstructs interval streams internally. */ + EVENTS + } + public static class LegalRequirements { private static final String DEFAULT_BASE_URL = "https://legalrequirements.services.bytebar.eu/ODataV4/LR"; diff --git a/src/main/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineEventBuilder.java b/src/main/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineEventBuilder.java index 08b56f4..8d16886 100644 --- a/src/main/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineEventBuilder.java +++ b/src/main/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineEventBuilder.java @@ -20,6 +20,19 @@ public interface DriverTimelineEventBuilder { ResolvedDriverTimeline timeline ); + /** + * Builds the rawest point-event representation supported by the implementation. + * + *

The default preserves the existing interval-backed behavior. Implementations that can + * emit point events directly from source records should override this method.

+ */ + default TachographTimelineEventBundle buildRawEventBundle( + TachographFileSession session, + DriverExtractionSession driverSession + ) { + return buildEventBundle(session, driverSession); + } + default List buildEvents( TachographFileSession session, DriverExtractionSession driverSession diff --git a/src/main/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineReusableProjectionBuilder.java b/src/main/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineReusableProjectionBuilder.java index 6af166e..b4e270c 100644 --- a/src/main/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineReusableProjectionBuilder.java +++ b/src/main/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineReusableProjectionBuilder.java @@ -10,7 +10,13 @@ import com.espertech.esper.runtime.client.EPDeployException; import com.espertech.esper.runtime.client.EPDeployment; import com.espertech.esper.runtime.client.EPRuntime; import com.espertech.esper.runtime.client.EPRuntimeProvider; +import com.fasterxml.jackson.databind.JsonNode; import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.processing.service.UnifiedEventTimelineReconstructor; import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession; import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent; import at.procon.eventhub.tachographfilesession.model.ResolvedActivityInterval; @@ -39,6 +45,7 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.ClassPathResource; import org.springframework.stereotype.Component; import org.springframework.util.StreamUtils; @@ -49,15 +56,31 @@ public class DriverTimelineReusableProjectionBuilder { private static final AtomicLong RUNTIME_COUNTER = new AtomicLong(); private static final String DRIVING_DERIVED_PROJECTION_BUNDLE_EPL_TEMPLATE = loadResource("esper/tachograph-driving-derived-projection-bundle.epl"); + private static final String DRIVING_DERIVED_PROJECTION_EVENTS_PREPROCESSOR_EPL = + loadResource("esper/tachograph-driving-derived-projection-events-preprocessor.epl"); private final DriverTimelineBuilder driverTimelineBuilder; + private final RawSourceDriverTimelineEventBuilder rawSourceEventBuilder; + private final UnifiedEventTimelineReconstructor timelineReconstructor; private final EventHubProperties properties; public DriverTimelineReusableProjectionBuilder( DriverTimelineBuilder driverTimelineBuilder, EventHubProperties properties + ) { + this(driverTimelineBuilder, null, new UnifiedEventTimelineReconstructor(), properties); + } + + @Autowired + public DriverTimelineReusableProjectionBuilder( + DriverTimelineBuilder driverTimelineBuilder, + RawSourceDriverTimelineEventBuilder rawSourceEventBuilder, + UnifiedEventTimelineReconstructor timelineReconstructor, + EventHubProperties properties ) { this.driverTimelineBuilder = driverTimelineBuilder; + this.rawSourceEventBuilder = rawSourceEventBuilder; + this.timelineReconstructor = timelineReconstructor; this.properties = properties; } @@ -67,6 +90,18 @@ public class DriverTimelineReusableProjectionBuilder { int significantDrivingMinutes, int minimumRestPeriodMinutes ) { + if (session == null || driverSession == null) { + return emptyBundle(); + } + if (drivingDerivedProjectionInputMode() == EventHubProperties.DrivingDerivedProjectionInputMode.EVENTS) { + return buildEsperDrivingDerivedProjectionBundleFromEvents( + session.sessionId(), + driverSession.driverKey(), + rawSourceEventBuilder().buildRawEventBundle(session, driverSession).allEvents(), + significantDrivingMinutes, + minimumRestPeriodMinutes + ); + } ResolvedDriverTimeline timeline = driverTimelineBuilder.build(session, driverSession); return buildEsperDrivingDerivedProjectionBundle( session.sessionId(), @@ -86,6 +121,22 @@ public class DriverTimelineReusableProjectionBuilder { return emptyBundle(); } + if (drivingDerivedProjectionInputMode() == EventHubProperties.DrivingDerivedProjectionInputMode.EVENTS) { + List events = new ArrayList<>(); + for (DriverExtractionSession driverSession : session.driversByKey().values()) { + if (driverSession != null && driverSession.driverKey() != null) { + events.addAll(rawSourceEventBuilder().buildRawEventBundle(session, driverSession).allEvents()); + } + } + return buildEsperDrivingDerivedProjectionBundleFromEvents( + session.sessionId(), + null, + events, + significantDrivingMinutes, + minimumRestPeriodMinutes + ); + } + List> activityInputEvents = new ArrayList<>(); List> vehicleUsageInputEvents = new ArrayList<>(); List> supportGeoInputEvents = new ArrayList<>(); @@ -234,6 +285,161 @@ public class DriverTimelineReusableProjectionBuilder { ); } + public TachographEsperDrivingDerivedProjectionBundle buildEsperDrivingDerivedProjectionBundleFromEvents( + List events, + int significantDrivingMinutes, + int minimumRestPeriodMinutes + ) { + return buildEsperDrivingDerivedProjectionBundleFromEvents( + null, + null, + events, + significantDrivingMinutes, + minimumRestPeriodMinutes + ); + } + + public TachographEsperDrivingDerivedProjectionBundle buildEsperDrivingDerivedProjectionBundleFromEvents( + UUID fallbackSessionId, + String fallbackDriverKey, + List events, + int significantDrivingMinutes, + int minimumRestPeriodMinutes + ) { + ResolvedDriverTimeline reconstructedTimeline = reconstructMergedTimelineFromEvents( + fallbackSessionId, + fallbackDriverKey, + events + ); + return buildEsperDrivingDerivedProjectionBundle( + fallbackSessionId == null ? new UUID(0L, 0L) : fallbackSessionId, + fallbackDriverKey, + reconstructedTimeline, + significantDrivingMinutes, + minimumRestPeriodMinutes + ); + } + + private ResolvedDriverTimeline reconstructMergedTimelineFromEvents( + UUID fallbackSessionId, + String fallbackDriverKey, + List events + ) { + ResolvedDriverTimeline reconstructed = timelineReconstructor.reconstruct( + fallbackSessionId == null ? new UUID(0L, 0L) : fallbackSessionId, + fallbackDriverKey, + safeList(events) + ); + List mergedVehicleUsageIntervals = mergeVehicleUsageIntervals( + reconstructed.vehicleUsageIntervals(), + reconstructed.sourceKind() + ); + return new ResolvedDriverTimeline( + reconstructed.sourceKind(), + reconstructed.loadedFrom(), + reconstructed.loadedTo(), + mergedVehicleUsageIntervals, + reconstructed.activityIntervals(), + reconstructed.supportEvents(), + reconstructed.warnings() + ); + } + + private List mergeVehicleUsageIntervals( + List intervals, + String sourceKind + ) { + List sorted = safeList(intervals).stream() + .sorted(Comparator.comparing(ResolvedVehicleUsageInterval::from) + .thenComparing(ResolvedVehicleUsageInterval::to, Comparator.nullsLast(Comparator.naturalOrder()))) + .toList(); + if (sorted.isEmpty()) { + return List.of(); + } + + List result = new ArrayList<>(); + ResolvedVehicleUsageInterval current = sorted.getFirst(); + List currentSources = new ArrayList<>(current.sourceIntervalIds()); + for (int i = 1; i < sorted.size(); i++) { + ResolvedVehicleUsageInterval next = sorted.get(i); + if (canMergeVehicleUsage(current, next)) { + currentSources.addAll(next.sourceIntervalIds()); + current = ResolvedVehicleUsageInterval.resolved( + current.sessionId(), + current.driverKey(), + current.intervalId() + "+" + next.intervalId(), + current.from(), + mergedTo(current.to(), next.to()), + current.odometerBeginKm(), + next.odometerEndKm() != null ? next.odometerEndKm() : current.odometerEndKm(), + current.registrationKey(), + current.vehicleKey(), + sourceKind, + currentSources + ); + } else { + result.add(current); + current = next; + currentSources = new ArrayList<>(current.sourceIntervalIds()); + } + } + result.add(current); + return result; + } + + private boolean canMergeVehicleUsage(ResolvedVehicleUsageInterval left, ResolvedVehicleUsageInterval right) { + return Objects.equals(left.registrationKey(), right.registrationKey()) + && Objects.equals(left.vehicleKey(), right.vehicleKey()) + && !right.from().isAfter(mergeBoundary(left.to())); + } + + private OffsetDateTime mergedTo(OffsetDateTime left, OffsetDateTime right) { + if (left == null || right == null) { + return null; + } + return left.isAfter(right) ? left : right; + } + + private OffsetDateTime mergeBoundary(OffsetDateTime endInclusive) { + return endInclusive == null ? OffsetDateTime.MAX : endInclusive.plusSeconds(1); + } + + private TachographEsperDrivingDerivedProjectionBundle buildEsperDrivingDerivedProjectionBundleFromPointInput( + List> activityPointInputEvents, + List> vehicleUsagePointInputEvents, + List> supportGeoInputEvents, + int significantDrivingMinutes, + int minimumRestPeriodMinutes + ) { + throw new UnsupportedOperationException("Direct EPL point-input preprocessing is currently disabled."); + } + + private List> buildProjectionFinalizeEvents( + List> vehicleUsagePointInputEvents + ) { + Map> byDriver = new LinkedHashMap<>(); + for (Map point : safeList(vehicleUsagePointInputEvents)) { + String driverKey = Objects.toString(point.get("driverKey"), null); + if (driverKey == null || driverKey.isBlank()) { + continue; + } + Map finalizeEvent = byDriver.computeIfAbsent(driverKey, ignored -> { + Map event = new LinkedHashMap<>(); + event.put("sessionId", point.get("sessionId")); + event.put("driverKey", driverKey); + event.put("finalizedAtEpochSecond", 0L); + return event; + }); + Long occurredAtEpochSecond = (Long) point.get("occurredAtEpochSecond"); + Long finalizedAtEpochSecond = (Long) finalizeEvent.get("finalizedAtEpochSecond"); + if (occurredAtEpochSecond != null + && (finalizedAtEpochSecond == null || occurredAtEpochSecond > finalizedAtEpochSecond)) { + finalizeEvent.put("finalizedAtEpochSecond", occurredAtEpochSecond); + } + } + return new ArrayList<>(byDriver.values()); + } + private List> buildActivityIntervalInputEvents( UUID sessionId, String driverKey, @@ -324,6 +530,56 @@ public class DriverTimelineReusableProjectionBuilder { } } + private Map activityPointInputDefinition() { + Map definition = new LinkedHashMap<>(); + definition.put("sessionId", UUID.class); + definition.put("driverKey", String.class); + definition.put("eventId", String.class); + definition.put("intervalId", String.class); + definition.put("sourceRowId", String.class); + definition.put("sourceRowIds", java.util.List.class); + definition.put("activityType", String.class); + definition.put("lifecycle", String.class); + definition.put("occurredAt", OffsetDateTime.class); + definition.put("occurredAtEpochSecond", long.class); + definition.put("cardSlot", String.class); + definition.put("cardStatus", String.class); + definition.put("drivingStatus", String.class); + definition.put("registrationKey", String.class); + definition.put("vehicleKey", String.class); + definition.put("sourceKind", String.class); + definition.put("synthetic", boolean.class); + definition.put("clippedToRequestedPeriod", boolean.class); + definition.put("level", String.class); + return definition; + } + + private Map vehicleUsagePointInputDefinition() { + Map definition = new LinkedHashMap<>(); + definition.put("sessionId", UUID.class); + definition.put("driverKey", String.class); + definition.put("eventId", String.class); + definition.put("intervalId", String.class); + definition.put("sourceRowId", String.class); + definition.put("sourceRowIds", java.util.List.class); + definition.put("lifecycle", String.class); + definition.put("occurredAt", OffsetDateTime.class); + definition.put("occurredAtEpochSecond", long.class); + definition.put("registrationKey", String.class); + definition.put("vehicleKey", String.class); + definition.put("sourceKind", String.class); + definition.put("odometerKm", Long.class); + return definition; + } + + private Map projectionFinalizeInputDefinition() { + Map definition = new LinkedHashMap<>(); + definition.put("sessionId", UUID.class); + definition.put("driverKey", String.class); + definition.put("finalizedAtEpochSecond", long.class); + return definition; + } + private Map activityIntervalInputDefinition() { Map definition = new LinkedHashMap<>(); definition.put("sessionId", UUID.class); @@ -470,6 +726,120 @@ public class DriverTimelineReusableProjectionBuilder { return event; } + private Map toActivityPointInputMap( + UUID fallbackSessionId, + String fallbackDriverKey, + EventHubEventDto sourceEvent + ) { + if (sourceEvent == null + || sourceEvent.eventDomain() != EventDomain.DRIVER_ACTIVITY + || (sourceEvent.lifecycle() != EventLifecycle.START && sourceEvent.lifecycle() != EventLifecycle.END) + || sourceEvent.occurredAt() == null) { + return null; + } + JsonNode raw = rawPayload(sourceEvent); + String intervalId = firstNonBlank(text(raw, "intervalId"), text(raw, "sourceRowId"), sourceEvent.externalSourceEventId()); + String driverKey = firstNonBlank(text(raw, "driverKey"), fallbackDriverKey, driverKey(sourceEvent)); + if (driverKey == null || intervalId == null) { + return null; + } + JsonNode attributes = attributes(sourceEvent); + Map event = new LinkedHashMap<>(); + event.put("sessionId", sessionId(fallbackSessionId, raw, sourceEvent)); + event.put("driverKey", driverKey); + event.put("eventId", sourceEvent.externalSourceEventId()); + event.put("intervalId", intervalId); + event.put("sourceRowId", firstNonBlank(text(raw, "sourceRowId"), intervalId)); + event.put("sourceRowIds", stringList(raw, "sourceRowIds", intervalId)); + event.put("activityType", firstNonBlank(text(raw, "activityType"), eventTypeAsActivity(sourceEvent.eventType()))); + event.put("lifecycle", sourceEvent.lifecycle().name()); + event.put("occurredAt", sourceEvent.occurredAt()); + event.put("occurredAtEpochSecond", sourceEvent.occurredAt().toEpochSecond()); + event.put("cardSlot", firstNonBlank(text(raw, "cardSlot"), text(attributes, "cardSlot"))); + event.put("cardStatus", firstNonBlank(text(raw, "cardStatus"), text(attributes, "cardStatus"))); + event.put("drivingStatus", firstNonBlank(text(raw, "drivingStatus"), text(attributes, "drivingStatus"))); + event.put("registrationKey", firstNonBlank(text(raw, "registrationKey"), registrationKey(sourceEvent))); + event.put("vehicleKey", firstNonBlank(text(raw, "vehicleKey"), vehicleKey(sourceEvent))); + event.put("sourceKind", firstNonBlank(text(raw, "sourceKind"), sourceKind(sourceEvent))); + event.put("synthetic", booleanValue(raw, "synthetic", false)); + event.put("clippedToRequestedPeriod", booleanValue(raw, "clippedToRequestedPeriod", false)); + event.put("level", firstNonBlank(text(raw, "level"), "RAW_EVENT")); + return event; + } + + private Map toVehicleUsagePointInputMap( + UUID fallbackSessionId, + String fallbackDriverKey, + EventHubEventDto sourceEvent + ) { + if (sourceEvent == null + || sourceEvent.eventDomain() != EventDomain.DRIVER_CARD + || (sourceEvent.lifecycle() != EventLifecycle.INSERT && sourceEvent.lifecycle() != EventLifecycle.WITHDRAW) + || sourceEvent.occurredAt() == null) { + return null; + } + boolean supportedType = sourceEvent.eventType() == EventType.CARD_INSERTED + || sourceEvent.eventType() == EventType.CARD_WITHDRAWN; + if (!supportedType) { + return null; + } + JsonNode raw = rawPayload(sourceEvent); + String intervalId = firstNonBlank(text(raw, "intervalId"), text(raw, "sourceRowId"), sourceEvent.externalSourceEventId()); + String driverKey = firstNonBlank(text(raw, "driverKey"), fallbackDriverKey, driverKey(sourceEvent)); + if (driverKey == null || intervalId == null) { + return null; + } + Map event = new LinkedHashMap<>(); + event.put("sessionId", sessionId(fallbackSessionId, raw, sourceEvent)); + event.put("driverKey", driverKey); + event.put("eventId", sourceEvent.externalSourceEventId()); + event.put("intervalId", intervalId); + event.put("sourceRowId", firstNonBlank(text(raw, "sourceRowId"), intervalId)); + event.put("sourceRowIds", stringList(raw, "sourceRowIds", intervalId)); + event.put("lifecycle", sourceEvent.lifecycle().name()); + event.put("occurredAt", sourceEvent.occurredAt()); + event.put("occurredAtEpochSecond", sourceEvent.occurredAt().toEpochSecond()); + event.put("registrationKey", firstNonBlank(text(raw, "registrationKey"), registrationKey(sourceEvent))); + event.put("vehicleKey", firstNonBlank(text(raw, "vehicleKey"), vehicleKey(sourceEvent))); + event.put("sourceKind", firstNonBlank(text(raw, "sourceKind"), sourceKind(sourceEvent))); + event.put("odometerKm", odometerKm(sourceEvent, raw)); + return event; + } + + private Map toSupportGeoEvidenceInputMap( + UUID fallbackSessionId, + String fallbackDriverKey, + EventHubEventDto sourceEvent + ) { + if (sourceEvent == null || sourceEvent.occurredAt() == null || sourceEvent.position() == null) { + return null; + } + String eventDomain = sourceEvent.eventDomain() == null ? null : sourceEvent.eventDomain().name(); + int priority = supportGeoPriority(eventDomain); + if (priority <= 0) { + return null; + } + JsonNode raw = rawPayload(sourceEvent); + String driverKey = firstNonBlank(text(raw, "driverKey"), fallbackDriverKey, driverKey(sourceEvent)); + if (driverKey == null) { + return null; + } + Map event = new LinkedHashMap<>(); + event.put("sessionId", sessionId(fallbackSessionId, raw, sourceEvent)); + event.put("driverKey", driverKey); + event.put("eventId", firstNonBlank(text(raw, "supportEventId"), text(raw, "sourceRowId"), sourceEvent.externalSourceEventId())); + event.put("eventDomain", eventDomain); + event.put("occurredAt", sourceEvent.occurredAt()); + event.put("occurredAtEpochSecond", sourceEvent.occurredAt().toEpochSecond()); + event.put("registrationKey", firstNonBlank(text(raw, "registrationKey"), registrationKey(sourceEvent))); + event.put("vehicleKey", firstNonBlank(text(raw, "vehicleKey"), vehicleKey(sourceEvent))); + event.put("latitude", sourceEvent.position().latitude().doubleValue()); + event.put("longitude", sourceEvent.position().longitude().doubleValue()); + event.put("odometerKm", odometerKm(sourceEvent, raw)); + event.put("priority", priority); + return event; + } + private int supportGeoPriority(String eventDomain) { if (eventDomain == null || eventDomain.isBlank()) { return 0; @@ -483,6 +853,186 @@ public class DriverTimelineReusableProjectionBuilder { }; } + private Comparator> pointEventComparator() { + return Comparator + .comparing((Map event) -> (Long) event.get("occurredAtEpochSecond")) + .thenComparing(event -> lifecycleOrder(Objects.toString(event.get("lifecycle"), ""))) + .thenComparing(event -> Objects.toString(event.get("driverKey"), "")) + .thenComparing(event -> Objects.toString(event.get("intervalId"), "")) + .thenComparing(event -> Objects.toString(event.get("eventId"), "")); + } + + private int lifecycleOrder(String lifecycle) { + return switch (lifecycle) { + case "INSERT", "START" -> 0; + case "WITHDRAW", "END" -> 1; + default -> 2; + }; + } + + private EventHubProperties.DrivingDerivedProjectionInputMode drivingDerivedProjectionInputMode() { + return properties.getTachographFileSession().getProcessing().getDrivingDerivedProjectionInputMode(); + } + + private RawSourceDriverTimelineEventBuilder rawSourceEventBuilder() { + if (rawSourceEventBuilder == null) { + throw new IllegalStateException( + "Driving-derived projection input mode EVENTS requires RawSourceDriverTimelineEventBuilder" + ); + } + return rawSourceEventBuilder; + } + + private JsonNode rawPayload(EventHubEventDto event) { + JsonNode payload = event.payload(); + if (payload == null || payload.isNull()) { + return null; + } + JsonNode raw = payload.get("raw"); + return raw == null || raw.isNull() ? payload : raw; + } + + private JsonNode attributes(EventHubEventDto event) { + return event.eventDetails() == null ? null : event.eventDetails().attributes(); + } + + private String text(JsonNode node, String field) { + if (node == null || field == null) { + return null; + } + JsonNode value = node.get(field); + if (value == null || value.isNull()) { + return null; + } + String text = value.asText(null); + return text == null || text.isBlank() ? null : text; + } + + private boolean booleanValue(JsonNode node, String field, boolean fallback) { + if (node == null || field == null || node.get(field) == null || node.get(field).isNull()) { + return fallback; + } + return node.get(field).asBoolean(fallback); + } + + private Long longValue(JsonNode node, String field) { + if (node == null || field == null || node.get(field) == null || node.get(field).isNull()) { + return null; + } + JsonNode value = node.get(field); + if (value.isNumber()) { + return value.asLong(); + } + try { + return Long.parseLong(value.asText()); + } catch (NumberFormatException ignored) { + return null; + } + } + + private List stringList(JsonNode node, String field, String fallback) { + JsonNode value = node == null || field == null ? null : node.get(field); + if (value == null || value.isNull()) { + return fallback == null ? List.of() : List.of(fallback); + } + if (value.isArray()) { + List result = new ArrayList<>(); + value.forEach(item -> { + if (item != null && !item.isNull()) { + String text = item.asText(null); + if (text != null && !text.isBlank()) { + result.add(text); + } + } + }); + return result.isEmpty() && fallback != null ? List.of(fallback) : List.copyOf(result); + } + String text = value.asText(null); + return text == null || text.isBlank() ? (fallback == null ? List.of() : List.of(fallback)) : List.of(text); + } + + private String firstNonBlank(String... values) { + if (values == null) { + return null; + } + for (String value : values) { + if (value != null && !value.isBlank()) { + return value.trim(); + } + } + return null; + } + + private String eventTypeAsActivity(EventType eventType) { + if (eventType == null) { + return "UNKNOWN"; + } + return switch (eventType) { + case DRIVE -> "DRIVE"; + case WORK -> "WORK"; + case AVAILABILITY -> "AVAILABILITY"; + case BREAK_REST -> "BREAK_REST"; + default -> eventType.name(); + }; + } + + private UUID sessionId(UUID fallbackSessionId, JsonNode raw, EventHubEventDto event) { + String rawSessionId = firstNonBlank( + text(raw, "sessionId"), + event.sourcePackageRef() == null ? null : event.sourcePackageRef().sourcePackageId() + ); + if (rawSessionId != null) { + try { + return UUID.fromString(rawSessionId); + } catch (IllegalArgumentException ignored) { + // DB-acquired source packages need not be UUID-based file sessions. + } + } + return fallbackSessionId == null ? new UUID(0L, 0L) : fallbackSessionId; + } + + private String driverKey(EventHubEventDto event) { + if (event.driverRef() == null) { + return null; + } + if (event.driverRef().driverCard() != null && event.driverRef().driverCard().hasValue()) { + return event.driverRef().driverCard().stableKey(); + } + return event.driverRef().sourceEntityId(); + } + + private String registrationKey(EventHubEventDto event) { + if (event.vehicleRef() == null || event.vehicleRef().vehicleRegistration() == null) { + return null; + } + return event.vehicleRef().vehicleRegistration().stableKey(); + } + + private String vehicleKey(EventHubEventDto event) { + if (event.vehicleRef() == null) { + return null; + } + return firstNonBlank(event.vehicleRef().vin(), event.vehicleRef().sourceVehicleEntityId()); + } + + private String sourceKind(EventHubEventDto event) { + return event.packageInfo() == null || event.packageInfo().eventSource() == null + ? null + : event.packageInfo().eventSource().sourceKind(); + } + + private Long odometerKm(EventHubEventDto event, JsonNode raw) { + Long explicit = longValue(raw, event.lifecycle() == EventLifecycle.WITHDRAW ? "odometerEndKm" : "odometerBeginKm"); + if (explicit != null) { + return explicit; + } + explicit = longValue(raw, "odometerKm"); + if (explicit != null) { + return explicit; + } + return event.odometerM() == null ? null : event.odometerM() / 1_000L; + } + private String firstSourceIntervalId(ResolvedActivityInterval interval) { return interval.sourceIntervalIds().isEmpty() ? interval.intervalId() : interval.sourceIntervalIds().get(0); } @@ -901,6 +1451,12 @@ public class DriverTimelineReusableProjectionBuilder { .toList(); } + private String renderDrivingDerivedProjectionEventsEpl(int significantDrivingMinutes, int minimumRestPeriodMinutes) { + return DRIVING_DERIVED_PROJECTION_EVENTS_PREPROCESSOR_EPL + + "\n\n" + + renderDrivingDerivedProjectionBundleEpl(significantDrivingMinutes, minimumRestPeriodMinutes); + } + private String renderDrivingDerivedProjectionBundleEpl(int significantDrivingMinutes, int minimumRestPeriodMinutes) { return DRIVING_DERIVED_PROJECTION_BUNDLE_EPL_TEMPLATE .replace( diff --git a/src/main/java/at/procon/eventhub/tachographfilesession/service/IntervalBackedDriverTimelineEventBuilder.java b/src/main/java/at/procon/eventhub/tachographfilesession/service/IntervalBackedDriverTimelineEventBuilder.java index 992caa5..b98f5d4 100644 --- a/src/main/java/at/procon/eventhub/tachographfilesession/service/IntervalBackedDriverTimelineEventBuilder.java +++ b/src/main/java/at/procon/eventhub/tachographfilesession/service/IntervalBackedDriverTimelineEventBuilder.java @@ -95,6 +95,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE List activityEvents = buildActivityEvents( session, + driverSession.driverKey(), timeline.activityIntervals(), driverRef, registrationsByKey, @@ -104,6 +105,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE ); List vehicleUsageEvents = buildVehicleUsageEvents( session, + driverSession.driverKey(), timeline.vehicleUsageIntervals(), driverRef, registrationsByKey, @@ -125,6 +127,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE private List buildActivityEvents( TachographFileSession session, + String driverKey, List intervals, DriverRefDto driverRef, Map registrationsByKey, @@ -147,6 +150,8 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE Map raw = new LinkedHashMap<>(); raw.put("intervalId", interval.intervalId()); raw.put("sourceRowId", interval.intervalId()); + raw.put("driverKey", driverKey); + raw.put("activityType", interval.activityType()); raw.put("sourceRowIds", interval.sourceIntervalIds()); raw.put("startedAt", timeText(interval.from())); raw.put("endedAt", timeText(interval.to())); @@ -200,6 +205,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE private List buildVehicleUsageEvents( TachographFileSession session, + String driverKey, List intervals, DriverRefDto driverRef, Map registrationsByKey, @@ -226,6 +232,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE Map raw = new LinkedHashMap<>(); raw.put("intervalId", interval.intervalId()); raw.put("sourceRowId", interval.intervalId()); + raw.put("driverKey", driverKey); raw.put("sourceRowIds", interval.sourceIntervalIds()); raw.put("startedAt", timeText(interval.from())); raw.put("endedAt", timeText(interval.to())); diff --git a/src/main/java/at/procon/eventhub/tachographfilesession/service/RawSourceDriverTimelineEventBuilder.java b/src/main/java/at/procon/eventhub/tachographfilesession/service/RawSourceDriverTimelineEventBuilder.java new file mode 100644 index 0000000..ca7a63f --- /dev/null +++ b/src/main/java/at/procon/eventhub/tachographfilesession/service/RawSourceDriverTimelineEventBuilder.java @@ -0,0 +1,211 @@ +package at.procon.eventhub.tachographfilesession.service; + +import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession; +import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent; +import at.procon.eventhub.tachographfilesession.model.ExtractionWarning; +import at.procon.eventhub.tachographfilesession.model.ResolvedActivityInterval; +import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline; +import at.procon.eventhub.tachographfilesession.model.ResolvedVehicleUsageInterval; +import at.procon.eventhub.tachographfilesession.model.TachographFileSession; +import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle; +import java.time.OffsetDateTime; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import org.springframework.stereotype.Component; + +/** + * Builds EventHub point events directly from the extracted tachograph source records. + * + *

This builder intentionally does not run the normal {@link DriverTimelineBuilder} interval + * resolution/merging path. It only wraps each raw source interval as a START/END or + * INSERT/WITHDRAW point-event pair. Any subsequent pairing, coalescing, or derived interval logic + * belongs to the event-input EPL projection pipeline.

+ */ +@Component +public class RawSourceDriverTimelineEventBuilder { + + private final IntervalBackedDriverTimelineEventBuilder intervalEventBuilder; + + public RawSourceDriverTimelineEventBuilder(IntervalBackedDriverTimelineEventBuilder intervalEventBuilder) { + this.intervalEventBuilder = intervalEventBuilder; + } + + public TachographTimelineEventBundle buildRawEventBundle( + TachographFileSession session, + DriverExtractionSession driverSession + ) { + if (session == null || driverSession == null) { + return new TachographTimelineEventBundle(List.of(), List.of(), List.of()); + } + return intervalEventBuilder.buildEventBundle(session, driverSession, buildRawTimeline(session, driverSession)); + } + + public ResolvedDriverTimeline buildRawTimeline( + TachographFileSession session, + DriverExtractionSession driverSession + ) { + String sourceKind = session.metadata().driverCardFile() ? "DRIVER_CARD" : "VEHICLE_UNIT"; + List vehicleUsageIntervals = rawVehicleUsageIntervals( + session, + driverSession, + sourceKind + ); + List activityIntervals = rawActivityIntervals(driverSession, sourceKind); + List supportEvents = sortedSupportEvents(driverSession.supportEvents()); + return new ResolvedDriverTimeline( + sourceKind, + minTimestamp(vehicleUsageIntervals, activityIntervals, supportEvents), + maxTimestamp(vehicleUsageIntervals, activityIntervals, supportEvents), + vehicleUsageIntervals, + activityIntervals, + supportEvents, + mergeWarnings(session.warnings(), driverSession.warnings()) + ); + } + + private List rawVehicleUsageIntervals( + TachographFileSession session, + DriverExtractionSession driverSession, + String sourceKind + ) { + if (driverSession.cardVehicleUsageIntervals() == null || driverSession.cardVehicleUsageIntervals().isEmpty()) { + return List.of(); + } + return driverSession.cardVehicleUsageIntervals().stream() + .filter(interval -> interval.from() != null && (interval.to() == null || interval.to().isAfter(interval.from()))) + .map(interval -> ResolvedVehicleUsageInterval.resolved( + session.sessionId(), + driverSession.driverKey(), + interval.intervalId(), + interval.from(), + interval.to(), + interval.odometerBeginKm(), + interval.odometerEndKm(), + interval.registrationKey(), + interval.vehicleKey(), + sourceKind, + List.of(interval.intervalId()) + )) + .sorted(Comparator.comparing(ResolvedVehicleUsageInterval::from) + .thenComparing(ResolvedVehicleUsageInterval::to, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(ResolvedVehicleUsageInterval::intervalId, Comparator.nullsLast(String::compareTo))) + .toList(); + } + + private List rawActivityIntervals( + DriverExtractionSession driverSession, + String sourceKind + ) { + if (driverSession.cardActivityIntervals() == null || driverSession.cardActivityIntervals().isEmpty()) { + return List.of(); + } + return driverSession.cardActivityIntervals().stream() + .filter(interval -> interval.from() != null && interval.to() != null && interval.to().isAfter(interval.from())) + .map(interval -> ResolvedActivityInterval.raw( + interval.intervalId(), + interval.from(), + interval.to(), + normalizeActivity(interval.activityType()), + interval.slot(), + interval.cardStatus(), + interval.drivingStatus(), + interval.registrationKey(), + interval.vehicleKey(), + sourceKind, + List.of(interval.intervalId()) + )) + .sorted(Comparator.comparing(ResolvedActivityInterval::from) + .thenComparing(ResolvedActivityInterval::to) + .thenComparing(ResolvedActivityInterval::intervalId, Comparator.nullsLast(String::compareTo))) + .toList(); + } + + private List sortedSupportEvents(List supportEvents) { + if (supportEvents == null || supportEvents.isEmpty()) { + return List.of(); + } + return supportEvents.stream() + .sorted(Comparator.comparing(ExtractedSupportEvent::occurredAt) + .thenComparing(ExtractedSupportEvent::eventDomain, Comparator.nullsLast(String::compareTo)) + .thenComparing(ExtractedSupportEvent::eventId, Comparator.nullsLast(String::compareTo))) + .toList(); + } + + private String normalizeActivity(String activityType) { + if (activityType == null || activityType.isBlank()) { + return "UNKNOWN"; + } + if ("UNKNOWN_ACTIVITY".equals(activityType)) { + return "UNKNOWN"; + } + return activityType; + } + + private OffsetDateTime minTimestamp( + List vehicleUsageIntervals, + List activityIntervals, + List supportEvents + ) { + OffsetDateTime min = null; + for (ResolvedVehicleUsageInterval interval : vehicleUsageIntervals) { + min = min(min, interval.from()); + } + for (ResolvedActivityInterval interval : activityIntervals) { + min = min(min, interval.from()); + } + for (ExtractedSupportEvent supportEvent : supportEvents) { + min = min(min, supportEvent.occurredAt()); + } + return min; + } + + private OffsetDateTime maxTimestamp( + List vehicleUsageIntervals, + List activityIntervals, + List supportEvents + ) { + OffsetDateTime max = null; + for (ResolvedVehicleUsageInterval interval : vehicleUsageIntervals) { + max = max(max, interval.to()); + } + for (ResolvedActivityInterval interval : activityIntervals) { + max = max(max, interval.to()); + } + for (ExtractedSupportEvent supportEvent : supportEvents) { + max = max(max, supportEvent.occurredAt()); + } + return max; + } + + private List mergeWarnings(List sessionWarnings, List driverWarnings) { + LinkedHashSet merged = new LinkedHashSet<>(); + if (sessionWarnings != null) { + merged.addAll(sessionWarnings); + } + if (driverWarnings != null) { + merged.addAll(driverWarnings); + } + return List.copyOf(merged); + } + + private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) { + if (left == null) { + return right; + } + if (right == null) { + return left; + } + return left.isBefore(right) ? left : right; + } + + private OffsetDateTime max(OffsetDateTime left, OffsetDateTime right) { + if (left == null) { + return right; + } + if (right == null) { + return left; + } + return left.isAfter(right) ? left : right; + } +} diff --git a/src/main/java/at/procon/eventhub/tachographfilesession/service/TachographFileSessionProcessingService.java b/src/main/java/at/procon/eventhub/tachographfilesession/service/TachographFileSessionProcessingService.java index fd5b0cc..08fd605 100644 --- a/src/main/java/at/procon/eventhub/tachographfilesession/service/TachographFileSessionProcessingService.java +++ b/src/main/java/at/procon/eventhub/tachographfilesession/service/TachographFileSessionProcessingService.java @@ -175,13 +175,21 @@ public class TachographFileSessionProcessingService { requestedTo ); TachographEsperDrivingDerivedProjectionBundle derivedProjectionBundle = - reusableProjectionBuilder.buildEsperDrivingDerivedProjectionBundle( - sessionId, - driverKey, - timeline, - significantDrivingMinutes, - minimumRestPeriodMinutes - ); + properties.getTachographFileSession().getProcessing().getDrivingDerivedProjectionInputMode() + == EventHubProperties.DrivingDerivedProjectionInputMode.EVENTS + ? reusableProjectionBuilder.buildEsperDrivingDerivedProjectionBundle( + session, + driver, + significantDrivingMinutes, + minimumRestPeriodMinutes + ) + : reusableProjectionBuilder.buildEsperDrivingDerivedProjectionBundle( + sessionId, + driverKey, + timeline, + significantDrivingMinutes, + minimumRestPeriodMinutes + ); List rawDrivingInterruptionIntervals = derivedProjectionBundle.drivingInterruptionIntervals(); List drivingInterruptionIntervals = diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index da6cf33..79e3eb1 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -131,6 +131,9 @@ eventhub: merge-gap-seconds: 0 gap-detection-tolerance-seconds: 0 timeline-input-mode: events + # INTERVALS = existing stable path: Java prepares activity/card-usage intervals for EPL. + # EVENTS = new path: EPL receives EventHub point events and reconstructs intervals internally. + driving-derived-projection-input-mode: events #intervals legal-requirements: base-url: ${LEGAL_REQUIREMENTS_BASE_URL:https://legalrequirements.services.bytebar.eu/ODataV4/LR} username: ${LEGAL_REQUIREMENTS_USERNAME:} diff --git a/src/main/resources/esper/tachograph-driving-derived-projection-events-preprocessor.epl b/src/main/resources/esper/tachograph-driving-derived-projection-events-preprocessor.epl new file mode 100644 index 0000000..808d619 --- /dev/null +++ b/src/main/resources/esper/tachograph-driving-derived-projection-events-preprocessor.epl @@ -0,0 +1,348 @@ +/* + * Event-input adapter for tachograph-driving-derived-projection-bundle.epl. + * + * The old bundle consumes resolved interval input streams. This preprocessor lets the same + * derived rules consume EventHub point events by pairing START/END activity events and + * INSERT/WITHDRAW card-vehicle usage events inside Esper. + * + * Vehicle-usage intervals are additionally coalesced in EPL before they are forwarded to + * the old interval bundle. This keeps event mode equivalent to DriverTimelineBuilder's + * Java merge behavior for consecutive same-vehicle CardVehiclesUsed rows, including the + * common midnight continuation 23:59:59 -> 00:00:00 on the next day. + */ + +create window OpenActivityPoint#unique(driverKey, intervalId) as TachographActivityPointInputEvent; + +insert into OpenActivityPoint +select * +from TachographActivityPointInputEvent(lifecycle = 'START'); + +@Priority(20) +on TachographActivityPointInputEvent(lifecycle = 'END') as endEvent +insert into TachographActivityIntervalInputEvent +select + startEvent.sessionId as sessionId, + startEvent.driverKey as driverKey, + startEvent.intervalId as intervalId, + startEvent.activityType as activityType, + startEvent.cardSlot as cardSlot, + startEvent.cardStatus as cardStatus, + startEvent.drivingStatus as drivingStatus, + startEvent.registrationKey as registrationKey, + startEvent.vehicleKey as vehicleKey, + startEvent.sourceKind as sourceKind, + startEvent.sourceRowId as firstSourceIntervalId, + endEvent.sourceRowId as lastSourceIntervalId, + startEvent.occurredAt as startedAt, + endEvent.occurredAt as endedAt, + startEvent.occurredAtEpochSecond as startedAtEpochSecond, + endEvent.occurredAtEpochSecond as endedAtEpochSecond, + endEvent.occurredAtEpochSecond - startEvent.occurredAtEpochSecond as durationSeconds, + startEvent.sourceRowIds as sourceIntervalIds, + startEvent.synthetic as synthetic, + startEvent.clippedToRequestedPeriod as clippedToRequestedPeriod, + startEvent.level as level +from OpenActivityPoint as startEvent +where startEvent.driverKey = endEvent.driverKey + and startEvent.intervalId = endEvent.intervalId + and endEvent.occurredAtEpochSecond > startEvent.occurredAtEpochSecond; + +@Priority(10) +on TachographActivityPointInputEvent(lifecycle = 'END') as endEvent +delete from OpenActivityPoint as openEvent +where openEvent.driverKey = endEvent.driverKey + and openEvent.intervalId = endEvent.intervalId; + +create schema RawVehicleUsageInterval( + sessionId java.util.UUID, + driverKey string, + intervalId string, + firstSourceIntervalId string, + lastSourceIntervalId string, + startedAt java.time.OffsetDateTime, + endedAt java.time.OffsetDateTime, + startedAtEpochSecond long, + endedAtEpochSecond java.lang.Long, + durationSeconds long, + odometerBeginKm java.lang.Long, + odometerEndKm java.lang.Long, + registrationKey string, + vehicleKey string, + sourceKind string, + sourceIntervalIds java.util.List +); + +create window OpenVehicleUsagePoint#unique(driverKey, intervalId) as TachographVehicleUsagePointInputEvent; + +insert into OpenVehicleUsagePoint +select * +from TachographVehicleUsagePointInputEvent(lifecycle = 'INSERT'); + +@Priority(20) +on TachographVehicleUsagePointInputEvent(lifecycle = 'WITHDRAW') as withdrawEvent +insert into RawVehicleUsageInterval +select + insertEvent.sessionId as sessionId, + insertEvent.driverKey as driverKey, + insertEvent.intervalId as intervalId, + insertEvent.sourceRowId as firstSourceIntervalId, + withdrawEvent.sourceRowId as lastSourceIntervalId, + insertEvent.occurredAt as startedAt, + withdrawEvent.occurredAt as endedAt, + insertEvent.occurredAtEpochSecond as startedAtEpochSecond, + withdrawEvent.occurredAtEpochSecond as endedAtEpochSecond, + withdrawEvent.occurredAtEpochSecond - insertEvent.occurredAtEpochSecond as durationSeconds, + insertEvent.odometerKm as odometerBeginKm, + withdrawEvent.odometerKm as odometerEndKm, + insertEvent.registrationKey as registrationKey, + insertEvent.vehicleKey as vehicleKey, + insertEvent.sourceKind as sourceKind, + insertEvent.sourceRowIds as sourceIntervalIds +from OpenVehicleUsagePoint as insertEvent +where insertEvent.driverKey = withdrawEvent.driverKey + and insertEvent.intervalId = withdrawEvent.intervalId + and withdrawEvent.occurredAtEpochSecond > insertEvent.occurredAtEpochSecond; + +@Priority(10) +on TachographVehicleUsagePointInputEvent(lifecycle = 'WITHDRAW') as withdrawEvent +delete from OpenVehicleUsagePoint as openEvent +where openEvent.driverKey = withdrawEvent.driverKey + and openEvent.intervalId = withdrawEvent.intervalId; + +create window MergedVehicleUsageAccumulator#unique(driverKey) as RawVehicleUsageInterval; +create window MergedVehicleUsageNext#unique(driverKey) as RawVehicleUsageInterval; + +/* + * Case 1: first vehicle-usage interval for this driver. Start a new accumulator. + */ +@Priority(70) +on RawVehicleUsageInterval as next +insert into MergedVehicleUsageNext +select + next.sessionId as sessionId, + next.driverKey as driverKey, + next.intervalId as intervalId, + next.firstSourceIntervalId as firstSourceIntervalId, + next.lastSourceIntervalId as lastSourceIntervalId, + next.startedAt as startedAt, + next.endedAt as endedAt, + next.startedAtEpochSecond as startedAtEpochSecond, + next.endedAtEpochSecond as endedAtEpochSecond, + next.durationSeconds as durationSeconds, + next.odometerBeginKm as odometerBeginKm, + next.odometerEndKm as odometerEndKm, + next.registrationKey as registrationKey, + next.vehicleKey as vehicleKey, + next.sourceKind as sourceKind, + next.sourceIntervalIds as sourceIntervalIds +where not exists ( + select * from MergedVehicleUsageAccumulator as current + where current.driverKey = next.driverKey +); + +/* + * Case 2: same vehicle and registration, and the next interval starts at or before + * current.to + 1 second. This includes 23:59:59 -> 00:00:00. Keep one accumulated + * interval and extend its end/last-source/ending odometer. + */ +@Priority(70) +on RawVehicleUsageInterval as next +insert into MergedVehicleUsageNext +select + current.sessionId as sessionId, + current.driverKey as driverKey, + current.intervalId as intervalId, + current.firstSourceIntervalId as firstSourceIntervalId, + next.lastSourceIntervalId as lastSourceIntervalId, + current.startedAt as startedAt, + next.endedAt as endedAt, + current.startedAtEpochSecond as startedAtEpochSecond, + next.endedAtEpochSecond as endedAtEpochSecond, + next.endedAtEpochSecond - current.startedAtEpochSecond as durationSeconds, + current.odometerBeginKm as odometerBeginKm, + next.odometerEndKm as odometerEndKm, + current.registrationKey as registrationKey, + current.vehicleKey as vehicleKey, + current.sourceKind as sourceKind, + current.sourceIntervalIds as sourceIntervalIds +from MergedVehicleUsageAccumulator as current +where current.driverKey = next.driverKey + and current.endedAtEpochSecond is not null + and next.startedAtEpochSecond <= current.endedAtEpochSecond + 1L + and ( + (current.registrationKey is null and next.registrationKey is null) + or ( + current.registrationKey is not null + and next.registrationKey is not null + and current.registrationKey = next.registrationKey + ) + ) + and ( + (current.vehicleKey is null and next.vehicleKey is null) + or ( + current.vehicleKey is not null + and next.vehicleKey is not null + and current.vehicleKey = next.vehicleKey + ) + ); + +/* + * Case 3a: next interval is not mergeable. Emit the accumulated interval to the old + * interval-based bundle. + */ +@Priority(70) +on RawVehicleUsageInterval as next +insert into TachographVehicleUsageIntervalInputEvent +select + current.sessionId as sessionId, + current.driverKey as driverKey, + current.intervalId as intervalId, + current.firstSourceIntervalId as firstSourceIntervalId, + current.lastSourceIntervalId as lastSourceIntervalId, + current.startedAt as startedAt, + current.endedAt as endedAt, + current.startedAtEpochSecond as startedAtEpochSecond, + current.endedAtEpochSecond as endedAtEpochSecond, + current.durationSeconds as durationSeconds, + current.odometerBeginKm as odometerBeginKm, + current.odometerEndKm as odometerEndKm, + current.registrationKey as registrationKey, + current.vehicleKey as vehicleKey, + current.sourceKind as sourceKind, + current.sourceIntervalIds as sourceIntervalIds +from MergedVehicleUsageAccumulator as current +where current.driverKey = next.driverKey + and not ( + current.endedAtEpochSecond is not null + and next.startedAtEpochSecond <= current.endedAtEpochSecond + 1L + and ( + (current.registrationKey is null and next.registrationKey is null) + or ( + current.registrationKey is not null + and next.registrationKey is not null + and current.registrationKey = next.registrationKey + ) + ) + and ( + (current.vehicleKey is null and next.vehicleKey is null) + or ( + current.vehicleKey is not null + and next.vehicleKey is not null + and current.vehicleKey = next.vehicleKey + ) + ) + ); + +/* + * Case 3b: next interval is not mergeable. Start a new accumulator with it. + */ +@Priority(70) +on RawVehicleUsageInterval as next +insert into MergedVehicleUsageNext +select + next.sessionId as sessionId, + next.driverKey as driverKey, + next.intervalId as intervalId, + next.firstSourceIntervalId as firstSourceIntervalId, + next.lastSourceIntervalId as lastSourceIntervalId, + next.startedAt as startedAt, + next.endedAt as endedAt, + next.startedAtEpochSecond as startedAtEpochSecond, + next.endedAtEpochSecond as endedAtEpochSecond, + next.durationSeconds as durationSeconds, + next.odometerBeginKm as odometerBeginKm, + next.odometerEndKm as odometerEndKm, + next.registrationKey as registrationKey, + next.vehicleKey as vehicleKey, + next.sourceKind as sourceKind, + next.sourceIntervalIds as sourceIntervalIds +where exists ( + select * from MergedVehicleUsageAccumulator as current + where current.driverKey = next.driverKey + and not ( + current.endedAtEpochSecond is not null + and next.startedAtEpochSecond <= current.endedAtEpochSecond + 1L + and ( + (current.registrationKey is null and next.registrationKey is null) + or ( + current.registrationKey is not null + and next.registrationKey is not null + and current.registrationKey = next.registrationKey + ) + ) + and ( + (current.vehicleKey is null and next.vehicleKey is null) + or ( + current.vehicleKey is not null + and next.vehicleKey is not null + and current.vehicleKey = next.vehicleKey + ) + ) + ) +); + +@Priority(60) +on RawVehicleUsageInterval as next +delete from MergedVehicleUsageAccumulator as current +where current.driverKey = next.driverKey; + +@Priority(50) +on RawVehicleUsageInterval as next +insert into MergedVehicleUsageAccumulator +select + candidate.sessionId as sessionId, + candidate.driverKey as driverKey, + candidate.intervalId as intervalId, + candidate.firstSourceIntervalId as firstSourceIntervalId, + candidate.lastSourceIntervalId as lastSourceIntervalId, + candidate.startedAt as startedAt, + candidate.endedAt as endedAt, + candidate.startedAtEpochSecond as startedAtEpochSecond, + candidate.endedAtEpochSecond as endedAtEpochSecond, + candidate.durationSeconds as durationSeconds, + candidate.odometerBeginKm as odometerBeginKm, + candidate.odometerEndKm as odometerEndKm, + candidate.registrationKey as registrationKey, + candidate.vehicleKey as vehicleKey, + candidate.sourceKind as sourceKind, + candidate.sourceIntervalIds as sourceIntervalIds +from MergedVehicleUsageNext as candidate +where candidate.driverKey = next.driverKey; + +@Priority(40) +on RawVehicleUsageInterval as next +delete from MergedVehicleUsageNext as candidate +where candidate.driverKey = next.driverKey; + +/* + * The last accumulated interval cannot be emitted by looking at the next interval, so Java + * sends one TachographProjectionFinalizeEvent per driver after all vehicle-usage point + * events and before activity point events. + */ +@Priority(20) +on TachographProjectionFinalizeEvent as finalizeEvent +insert into TachographVehicleUsageIntervalInputEvent +select + current.sessionId as sessionId, + current.driverKey as driverKey, + current.intervalId as intervalId, + current.firstSourceIntervalId as firstSourceIntervalId, + current.lastSourceIntervalId as lastSourceIntervalId, + current.startedAt as startedAt, + current.endedAt as endedAt, + current.startedAtEpochSecond as startedAtEpochSecond, + current.endedAtEpochSecond as endedAtEpochSecond, + current.durationSeconds as durationSeconds, + current.odometerBeginKm as odometerBeginKm, + current.odometerEndKm as odometerEndKm, + current.registrationKey as registrationKey, + current.vehicleKey as vehicleKey, + current.sourceKind as sourceKind, + current.sourceIntervalIds as sourceIntervalIds +from MergedVehicleUsageAccumulator as current +where current.driverKey = finalizeEvent.driverKey; + +@Priority(10) +on TachographProjectionFinalizeEvent as finalizeEvent +delete from MergedVehicleUsageAccumulator as current +where current.driverKey = finalizeEvent.driverKey; diff --git a/src/test/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineReusableProjectionBuilderTest.java b/src/test/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineReusableProjectionBuilderTest.java index d783606..ba649ad 100644 --- a/src/test/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineReusableProjectionBuilderTest.java +++ b/src/test/java/at/procon/eventhub/tachographfilesession/service/DriverTimelineReusableProjectionBuilderTest.java @@ -1,6 +1,8 @@ package at.procon.eventhub.tachographfilesession.service; import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.service.EventDetailsFactory; +import com.fasterxml.jackson.databind.ObjectMapper; import static org.assertj.core.api.Assertions.assertThat; import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession; @@ -16,6 +18,7 @@ import at.procon.eventhub.tachographfilesession.model.TachographEsperPotentialHo import at.procon.eventhub.tachographfilesession.model.TachographEsperVuCardAbsentIntervalEvent; import at.procon.eventhub.tachographfilesession.model.TachographFileSession; import at.procon.eventhub.tachographfilesession.model.TachographFileSessionMetadata; +import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle; import java.time.Instant; import java.time.OffsetDateTime; import java.time.temporal.ChronoUnit; @@ -299,4 +302,236 @@ class DriverTimelineReusableProjectionBuilderTest { .extracting("eventId", "eventDomain") .containsExactly("SUP-1", "POSITION"); } + + @Test + void derivesDrivingProjectionFromRawPointEvents() { + DriverExtractionSession driver = new DriverExtractionSession( + "12:123", + null, + null, + List.of(), + List.of(), + List.of( + new ExtractedCardVehicleUsageInterval( + "CVU-1", + OffsetDateTime.parse("2026-05-01T08:00:00Z"), + OffsetDateTime.parse("2026-05-01T10:00:00Z"), + 100L, + 200L, + "12:REG-1", + "VIN-1", + "vu-1" + ), + new ExtractedCardVehicleUsageInterval( + "CVU-2", + OffsetDateTime.parse("2026-05-01T10:00:00Z"), + OffsetDateTime.parse("2026-05-01T12:00:00Z"), + 200L, + 250L, + "12:REG-1", + "VIN-1", + "vu-2" + ), + new ExtractedCardVehicleUsageInterval( + "CVU-3", + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T02:00:00Z"), + 251L, + 300L, + "12:REG-2", + "VIN-2", + "vu-3" + ) + ), + List.of( + new ExtractedCardActivityInterval( + "ACT-1", + OffsetDateTime.parse("2026-05-01T08:00:00Z"), + OffsetDateTime.parse("2026-05-01T10:00:00Z"), + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-1", + "VIN-1", + "a" + ), + new ExtractedCardActivityInterval( + "ACT-2", + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T00:30:00Z"), + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-2", + "VIN-2", + "b" + ) + ), + List.of(), + List.of() + ); + TachographFileSession session = new TachographFileSession( + UUID.randomUUID(), + new TachographFileSessionMetadata("default", "legalrequirements-drivercard", "sample", "sample.ddd", "a", 2, "42", "b", true, null), + Map.of(driver.driverKey(), driver), + new ExtractionStats(1, 2, 3, 1, 1, 0), + List.of(), + Instant.now(), + Instant.now().plus(4, ChronoUnit.HOURS) + ); + + IntervalBackedDriverTimelineEventBuilder intervalEventBuilder = new IntervalBackedDriverTimelineEventBuilder( + legacyBuilder, + new DriverKeyFactory(), + new VehicleKeyFactory(), + new EventDetailsFactory(new ObjectMapper()) + ); + RawSourceDriverTimelineEventBuilder rawSourceEventBuilder = new RawSourceDriverTimelineEventBuilder(intervalEventBuilder); + TachographTimelineEventBundle rawPointEventBundle = rawSourceEventBuilder.buildRawEventBundle(session, driver); + + assertThat(rawPointEventBundle.activityEvents()).hasSize(4); + assertThat(rawPointEventBundle.vehicleUsageEvents()).hasSize(6); + + ResolvedDriverTimeline javaResolvedTimeline = legacyBuilder.build(session, driver); + TachographEsperDrivingDerivedProjectionBundle intervalBundle = + reusableBuilder.buildEsperDrivingDerivedProjectionBundle( + session.sessionId(), + driver.driverKey(), + javaResolvedTimeline, + 3, + 720 + ); + TachographEsperDrivingDerivedProjectionBundle eventBundle = + reusableBuilder.buildEsperDrivingDerivedProjectionBundleFromEvents( + session.sessionId(), + driver.driverKey(), + rawPointEventBundle.allEvents(), + 3, + 720 + ); + + assertThat(eventBundle.drivingInterruptionIntervals()) + .containsExactlyElementsOf(intervalBundle.drivingInterruptionIntervals()); + assertThat(eventBundle.dailyWeeklyRestCandidateIntervals()) + .containsExactlyElementsOf(intervalBundle.dailyWeeklyRestCandidateIntervals()); + assertThat(eventBundle.drivingInterruptionVehicleChangeIntervals()) + .containsExactlyElementsOf(intervalBundle.drivingInterruptionVehicleChangeIntervals()); + assertThat(eventBundle.vuCardAbsentIntervals()).hasSize(1); + assertThat(eventBundle.vuCardAbsentIntervals().get(0).startedAt()) + .isEqualTo(OffsetDateTime.parse("2026-05-01T12:00:01Z")); + assertThat(eventBundle.vuCardAbsentIntervals().get(0).endedAt()) + .isEqualTo(OffsetDateTime.parse("2026-05-02T00:00:00Z")); + } + + + @Test + void eventModeMergesVehicleUsageIntervalsAcrossMidnightContinuation() { + DriverExtractionSession driver = new DriverExtractionSession( + "12:123", + null, + null, + List.of(), + List.of(), + List.of( + new ExtractedCardVehicleUsageInterval( + "CVU-1", + OffsetDateTime.parse("2026-05-01T08:00:00Z"), + OffsetDateTime.parse("2026-05-01T23:59:59Z"), + 100L, + 240L, + "12:REG-1", + "VIN-1", + "vu-1" + ), + new ExtractedCardVehicleUsageInterval( + "CVU-2", + OffsetDateTime.parse("2026-05-02T00:00:00Z"), + OffsetDateTime.parse("2026-05-02T02:00:00Z"), + 240L, + 280L, + "12:REG-1", + "VIN-1", + "vu-2" + ) + ), + List.of( + new ExtractedCardActivityInterval( + "ACT-1", + OffsetDateTime.parse("2026-05-01T08:00:00Z"), + OffsetDateTime.parse("2026-05-01T10:00:00Z"), + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-1", + "VIN-1", + "a" + ), + new ExtractedCardActivityInterval( + "ACT-2", + OffsetDateTime.parse("2026-05-02T00:30:00Z"), + OffsetDateTime.parse("2026-05-02T01:00:00Z"), + "DRIVE", + "DRIVER", + "INSERTED", + "SINGLE", + "12:REG-1", + "VIN-1", + "b" + ) + ), + List.of(), + List.of() + ); + TachographFileSession session = new TachographFileSession( + UUID.randomUUID(), + new TachographFileSessionMetadata("default", "legalrequirements-drivercard", "sample", "sample.ddd", "a", 2, "42", "b", true, null), + Map.of(driver.driverKey(), driver), + new ExtractionStats(1, 2, 2, 1, 1, 0), + List.of(), + Instant.now(), + Instant.now().plus(4, ChronoUnit.HOURS) + ); + + IntervalBackedDriverTimelineEventBuilder intervalEventBuilder = new IntervalBackedDriverTimelineEventBuilder( + legacyBuilder, + new DriverKeyFactory(), + new VehicleKeyFactory(), + new EventDetailsFactory(new ObjectMapper()) + ); + RawSourceDriverTimelineEventBuilder rawSourceEventBuilder = new RawSourceDriverTimelineEventBuilder(intervalEventBuilder); + TachographTimelineEventBundle rawPointEventBundle = rawSourceEventBuilder.buildRawEventBundle(session, driver); + ResolvedDriverTimeline javaResolvedTimeline = legacyBuilder.build(session, driver); + + TachographEsperDrivingDerivedProjectionBundle intervalBundle = + reusableBuilder.buildEsperDrivingDerivedProjectionBundle( + session.sessionId(), + driver.driverKey(), + javaResolvedTimeline, + 3, + 720 + ); + TachographEsperDrivingDerivedProjectionBundle eventBundle = + reusableBuilder.buildEsperDrivingDerivedProjectionBundleFromEvents( + session.sessionId(), + driver.driverKey(), + rawPointEventBundle.allEvents(), + 3, + 720 + ); + + assertThat(javaResolvedTimeline.vehicleUsageIntervals()).hasSize(1); + assertThat(javaResolvedTimeline.vehicleUsageIntervals().get(0).from()) + .isEqualTo(OffsetDateTime.parse("2026-05-01T08:00:00Z")); + assertThat(javaResolvedTimeline.vehicleUsageIntervals().get(0).to()) + .isEqualTo(OffsetDateTime.parse("2026-05-02T02:00:00Z")); + assertThat(eventBundle.vuCardAbsentIntervals()).isEmpty(); + assertThat(eventBundle.dailyWeeklyRestCandidateCoverageIntervals()) + .containsExactlyElementsOf(intervalBundle.dailyWeeklyRestCandidateCoverageIntervals()); + assertThat(eventBundle.potentialInVehicleOvernightStayIntervals()) + .containsExactlyElementsOf(intervalBundle.potentialInVehicleOvernightStayIntervals()); + } + }