Add raw event tachograph projection input path

This commit is contained in:
trifonovt 2026-05-25 16:33:59 +02:00
parent 8a75db58fd
commit 3bccda20e8
9 changed files with 1407 additions and 7 deletions

View File

@ -357,6 +357,7 @@ public class EventHubProperties {
public static class Processing { public static class Processing {
private TimelineInputMode timelineInputMode = TimelineInputMode.INTERVALS; private TimelineInputMode timelineInputMode = TimelineInputMode.INTERVALS;
private DrivingDerivedProjectionInputMode drivingDerivedProjectionInputMode = DrivingDerivedProjectionInputMode.INTERVALS;
private int operatingSplitIdleHours = 7; private int operatingSplitIdleHours = 7;
private int significantDrivingMinutes = 3; private int significantDrivingMinutes = 3;
private int minimumRestPeriodMinutes = 720; private int minimumRestPeriodMinutes = 720;
@ -377,6 +378,16 @@ public class EventHubProperties {
} }
} }
public DrivingDerivedProjectionInputMode getDrivingDerivedProjectionInputMode() {
return drivingDerivedProjectionInputMode;
}
public void setDrivingDerivedProjectionInputMode(DrivingDerivedProjectionInputMode drivingDerivedProjectionInputMode) {
if (drivingDerivedProjectionInputMode != null) {
this.drivingDerivedProjectionInputMode = drivingDerivedProjectionInputMode;
}
}
public int getOperatingSplitIdleHours() { public int getOperatingSplitIdleHours() {
return operatingSplitIdleHours; return operatingSplitIdleHours;
} }
@ -456,6 +467,14 @@ public class EventHubProperties {
EVENTS EVENTS
} }
public enum DrivingDerivedProjectionInputMode {
/** Existing stable path: Java resolves intervals and EPL receives interval input streams. */
INTERVALS,
/** New path: EPL receives EventHub point events and reconstructs interval streams internally. */
EVENTS
}
public static class LegalRequirements { public static class LegalRequirements {
private static final String DEFAULT_BASE_URL = "https://legalrequirements.services.bytebar.eu/ODataV4/LR"; private static final String DEFAULT_BASE_URL = "https://legalrequirements.services.bytebar.eu/ODataV4/LR";

View File

@ -20,6 +20,19 @@ public interface DriverTimelineEventBuilder {
ResolvedDriverTimeline timeline ResolvedDriverTimeline timeline
); );
/**
* Builds the rawest point-event representation supported by the implementation.
*
* <p>The default preserves the existing interval-backed behavior. Implementations that can
* emit point events directly from source records should override this method.</p>
*/
default TachographTimelineEventBundle buildRawEventBundle(
TachographFileSession session,
DriverExtractionSession driverSession
) {
return buildEventBundle(session, driverSession);
}
default List<EventHubEventDto> buildEvents( default List<EventHubEventDto> buildEvents(
TachographFileSession session, TachographFileSession session,
DriverExtractionSession driverSession DriverExtractionSession driverSession

View File

@ -10,7 +10,13 @@ import com.espertech.esper.runtime.client.EPDeployException;
import com.espertech.esper.runtime.client.EPDeployment; import com.espertech.esper.runtime.client.EPDeployment;
import com.espertech.esper.runtime.client.EPRuntime; import com.espertech.esper.runtime.client.EPRuntime;
import com.espertech.esper.runtime.client.EPRuntimeProvider; import com.espertech.esper.runtime.client.EPRuntimeProvider;
import com.fasterxml.jackson.databind.JsonNode;
import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.processing.service.UnifiedEventTimelineReconstructor;
import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession; import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent; import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent;
import at.procon.eventhub.tachographfilesession.model.ResolvedActivityInterval; import at.procon.eventhub.tachographfilesession.model.ResolvedActivityInterval;
@ -39,6 +45,7 @@ import java.util.Objects;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils; import org.springframework.util.StreamUtils;
@ -49,15 +56,31 @@ public class DriverTimelineReusableProjectionBuilder {
private static final AtomicLong RUNTIME_COUNTER = new AtomicLong(); private static final AtomicLong RUNTIME_COUNTER = new AtomicLong();
private static final String DRIVING_DERIVED_PROJECTION_BUNDLE_EPL_TEMPLATE = private static final String DRIVING_DERIVED_PROJECTION_BUNDLE_EPL_TEMPLATE =
loadResource("esper/tachograph-driving-derived-projection-bundle.epl"); loadResource("esper/tachograph-driving-derived-projection-bundle.epl");
private static final String DRIVING_DERIVED_PROJECTION_EVENTS_PREPROCESSOR_EPL =
loadResource("esper/tachograph-driving-derived-projection-events-preprocessor.epl");
private final DriverTimelineBuilder driverTimelineBuilder; private final DriverTimelineBuilder driverTimelineBuilder;
private final RawSourceDriverTimelineEventBuilder rawSourceEventBuilder;
private final UnifiedEventTimelineReconstructor timelineReconstructor;
private final EventHubProperties properties; private final EventHubProperties properties;
public DriverTimelineReusableProjectionBuilder( public DriverTimelineReusableProjectionBuilder(
DriverTimelineBuilder driverTimelineBuilder, DriverTimelineBuilder driverTimelineBuilder,
EventHubProperties properties EventHubProperties properties
) {
this(driverTimelineBuilder, null, new UnifiedEventTimelineReconstructor(), properties);
}
@Autowired
public DriverTimelineReusableProjectionBuilder(
DriverTimelineBuilder driverTimelineBuilder,
RawSourceDriverTimelineEventBuilder rawSourceEventBuilder,
UnifiedEventTimelineReconstructor timelineReconstructor,
EventHubProperties properties
) { ) {
this.driverTimelineBuilder = driverTimelineBuilder; this.driverTimelineBuilder = driverTimelineBuilder;
this.rawSourceEventBuilder = rawSourceEventBuilder;
this.timelineReconstructor = timelineReconstructor;
this.properties = properties; this.properties = properties;
} }
@ -67,6 +90,18 @@ public class DriverTimelineReusableProjectionBuilder {
int significantDrivingMinutes, int significantDrivingMinutes,
int minimumRestPeriodMinutes int minimumRestPeriodMinutes
) { ) {
if (session == null || driverSession == null) {
return emptyBundle();
}
if (drivingDerivedProjectionInputMode() == EventHubProperties.DrivingDerivedProjectionInputMode.EVENTS) {
return buildEsperDrivingDerivedProjectionBundleFromEvents(
session.sessionId(),
driverSession.driverKey(),
rawSourceEventBuilder().buildRawEventBundle(session, driverSession).allEvents(),
significantDrivingMinutes,
minimumRestPeriodMinutes
);
}
ResolvedDriverTimeline timeline = driverTimelineBuilder.build(session, driverSession); ResolvedDriverTimeline timeline = driverTimelineBuilder.build(session, driverSession);
return buildEsperDrivingDerivedProjectionBundle( return buildEsperDrivingDerivedProjectionBundle(
session.sessionId(), session.sessionId(),
@ -86,6 +121,22 @@ public class DriverTimelineReusableProjectionBuilder {
return emptyBundle(); return emptyBundle();
} }
if (drivingDerivedProjectionInputMode() == EventHubProperties.DrivingDerivedProjectionInputMode.EVENTS) {
List<EventHubEventDto> events = new ArrayList<>();
for (DriverExtractionSession driverSession : session.driversByKey().values()) {
if (driverSession != null && driverSession.driverKey() != null) {
events.addAll(rawSourceEventBuilder().buildRawEventBundle(session, driverSession).allEvents());
}
}
return buildEsperDrivingDerivedProjectionBundleFromEvents(
session.sessionId(),
null,
events,
significantDrivingMinutes,
minimumRestPeriodMinutes
);
}
List<Map<String, Object>> activityInputEvents = new ArrayList<>(); List<Map<String, Object>> activityInputEvents = new ArrayList<>();
List<Map<String, Object>> vehicleUsageInputEvents = new ArrayList<>(); List<Map<String, Object>> vehicleUsageInputEvents = new ArrayList<>();
List<Map<String, Object>> supportGeoInputEvents = new ArrayList<>(); List<Map<String, Object>> supportGeoInputEvents = new ArrayList<>();
@ -234,6 +285,161 @@ public class DriverTimelineReusableProjectionBuilder {
); );
} }
public TachographEsperDrivingDerivedProjectionBundle buildEsperDrivingDerivedProjectionBundleFromEvents(
List<EventHubEventDto> events,
int significantDrivingMinutes,
int minimumRestPeriodMinutes
) {
return buildEsperDrivingDerivedProjectionBundleFromEvents(
null,
null,
events,
significantDrivingMinutes,
minimumRestPeriodMinutes
);
}
public TachographEsperDrivingDerivedProjectionBundle buildEsperDrivingDerivedProjectionBundleFromEvents(
UUID fallbackSessionId,
String fallbackDriverKey,
List<EventHubEventDto> events,
int significantDrivingMinutes,
int minimumRestPeriodMinutes
) {
ResolvedDriverTimeline reconstructedTimeline = reconstructMergedTimelineFromEvents(
fallbackSessionId,
fallbackDriverKey,
events
);
return buildEsperDrivingDerivedProjectionBundle(
fallbackSessionId == null ? new UUID(0L, 0L) : fallbackSessionId,
fallbackDriverKey,
reconstructedTimeline,
significantDrivingMinutes,
minimumRestPeriodMinutes
);
}
private ResolvedDriverTimeline reconstructMergedTimelineFromEvents(
UUID fallbackSessionId,
String fallbackDriverKey,
List<EventHubEventDto> events
) {
ResolvedDriverTimeline reconstructed = timelineReconstructor.reconstruct(
fallbackSessionId == null ? new UUID(0L, 0L) : fallbackSessionId,
fallbackDriverKey,
safeList(events)
);
List<ResolvedVehicleUsageInterval> mergedVehicleUsageIntervals = mergeVehicleUsageIntervals(
reconstructed.vehicleUsageIntervals(),
reconstructed.sourceKind()
);
return new ResolvedDriverTimeline(
reconstructed.sourceKind(),
reconstructed.loadedFrom(),
reconstructed.loadedTo(),
mergedVehicleUsageIntervals,
reconstructed.activityIntervals(),
reconstructed.supportEvents(),
reconstructed.warnings()
);
}
private List<ResolvedVehicleUsageInterval> mergeVehicleUsageIntervals(
List<ResolvedVehicleUsageInterval> intervals,
String sourceKind
) {
List<ResolvedVehicleUsageInterval> sorted = safeList(intervals).stream()
.sorted(Comparator.comparing(ResolvedVehicleUsageInterval::from)
.thenComparing(ResolvedVehicleUsageInterval::to, Comparator.nullsLast(Comparator.naturalOrder())))
.toList();
if (sorted.isEmpty()) {
return List.of();
}
List<ResolvedVehicleUsageInterval> result = new ArrayList<>();
ResolvedVehicleUsageInterval current = sorted.getFirst();
List<String> currentSources = new ArrayList<>(current.sourceIntervalIds());
for (int i = 1; i < sorted.size(); i++) {
ResolvedVehicleUsageInterval next = sorted.get(i);
if (canMergeVehicleUsage(current, next)) {
currentSources.addAll(next.sourceIntervalIds());
current = ResolvedVehicleUsageInterval.resolved(
current.sessionId(),
current.driverKey(),
current.intervalId() + "+" + next.intervalId(),
current.from(),
mergedTo(current.to(), next.to()),
current.odometerBeginKm(),
next.odometerEndKm() != null ? next.odometerEndKm() : current.odometerEndKm(),
current.registrationKey(),
current.vehicleKey(),
sourceKind,
currentSources
);
} else {
result.add(current);
current = next;
currentSources = new ArrayList<>(current.sourceIntervalIds());
}
}
result.add(current);
return result;
}
private boolean canMergeVehicleUsage(ResolvedVehicleUsageInterval left, ResolvedVehicleUsageInterval right) {
return Objects.equals(left.registrationKey(), right.registrationKey())
&& Objects.equals(left.vehicleKey(), right.vehicleKey())
&& !right.from().isAfter(mergeBoundary(left.to()));
}
private OffsetDateTime mergedTo(OffsetDateTime left, OffsetDateTime right) {
if (left == null || right == null) {
return null;
}
return left.isAfter(right) ? left : right;
}
private OffsetDateTime mergeBoundary(OffsetDateTime endInclusive) {
return endInclusive == null ? OffsetDateTime.MAX : endInclusive.plusSeconds(1);
}
private TachographEsperDrivingDerivedProjectionBundle buildEsperDrivingDerivedProjectionBundleFromPointInput(
List<Map<String, Object>> activityPointInputEvents,
List<Map<String, Object>> vehicleUsagePointInputEvents,
List<Map<String, Object>> supportGeoInputEvents,
int significantDrivingMinutes,
int minimumRestPeriodMinutes
) {
throw new UnsupportedOperationException("Direct EPL point-input preprocessing is currently disabled.");
}
private List<Map<String, Object>> buildProjectionFinalizeEvents(
List<Map<String, Object>> vehicleUsagePointInputEvents
) {
Map<String, Map<String, Object>> byDriver = new LinkedHashMap<>();
for (Map<String, Object> point : safeList(vehicleUsagePointInputEvents)) {
String driverKey = Objects.toString(point.get("driverKey"), null);
if (driverKey == null || driverKey.isBlank()) {
continue;
}
Map<String, Object> finalizeEvent = byDriver.computeIfAbsent(driverKey, ignored -> {
Map<String, Object> event = new LinkedHashMap<>();
event.put("sessionId", point.get("sessionId"));
event.put("driverKey", driverKey);
event.put("finalizedAtEpochSecond", 0L);
return event;
});
Long occurredAtEpochSecond = (Long) point.get("occurredAtEpochSecond");
Long finalizedAtEpochSecond = (Long) finalizeEvent.get("finalizedAtEpochSecond");
if (occurredAtEpochSecond != null
&& (finalizedAtEpochSecond == null || occurredAtEpochSecond > finalizedAtEpochSecond)) {
finalizeEvent.put("finalizedAtEpochSecond", occurredAtEpochSecond);
}
}
return new ArrayList<>(byDriver.values());
}
private List<Map<String, Object>> buildActivityIntervalInputEvents( private List<Map<String, Object>> buildActivityIntervalInputEvents(
UUID sessionId, UUID sessionId,
String driverKey, String driverKey,
@ -324,6 +530,56 @@ public class DriverTimelineReusableProjectionBuilder {
} }
} }
private Map<String, Object> activityPointInputDefinition() {
Map<String, Object> definition = new LinkedHashMap<>();
definition.put("sessionId", UUID.class);
definition.put("driverKey", String.class);
definition.put("eventId", String.class);
definition.put("intervalId", String.class);
definition.put("sourceRowId", String.class);
definition.put("sourceRowIds", java.util.List.class);
definition.put("activityType", String.class);
definition.put("lifecycle", String.class);
definition.put("occurredAt", OffsetDateTime.class);
definition.put("occurredAtEpochSecond", long.class);
definition.put("cardSlot", String.class);
definition.put("cardStatus", String.class);
definition.put("drivingStatus", String.class);
definition.put("registrationKey", String.class);
definition.put("vehicleKey", String.class);
definition.put("sourceKind", String.class);
definition.put("synthetic", boolean.class);
definition.put("clippedToRequestedPeriod", boolean.class);
definition.put("level", String.class);
return definition;
}
private Map<String, Object> vehicleUsagePointInputDefinition() {
Map<String, Object> definition = new LinkedHashMap<>();
definition.put("sessionId", UUID.class);
definition.put("driverKey", String.class);
definition.put("eventId", String.class);
definition.put("intervalId", String.class);
definition.put("sourceRowId", String.class);
definition.put("sourceRowIds", java.util.List.class);
definition.put("lifecycle", String.class);
definition.put("occurredAt", OffsetDateTime.class);
definition.put("occurredAtEpochSecond", long.class);
definition.put("registrationKey", String.class);
definition.put("vehicleKey", String.class);
definition.put("sourceKind", String.class);
definition.put("odometerKm", Long.class);
return definition;
}
private Map<String, Object> projectionFinalizeInputDefinition() {
Map<String, Object> definition = new LinkedHashMap<>();
definition.put("sessionId", UUID.class);
definition.put("driverKey", String.class);
definition.put("finalizedAtEpochSecond", long.class);
return definition;
}
private Map<String, Object> activityIntervalInputDefinition() { private Map<String, Object> activityIntervalInputDefinition() {
Map<String, Object> definition = new LinkedHashMap<>(); Map<String, Object> definition = new LinkedHashMap<>();
definition.put("sessionId", UUID.class); definition.put("sessionId", UUID.class);
@ -470,6 +726,120 @@ public class DriverTimelineReusableProjectionBuilder {
return event; return event;
} }
private Map<String, Object> toActivityPointInputMap(
UUID fallbackSessionId,
String fallbackDriverKey,
EventHubEventDto sourceEvent
) {
if (sourceEvent == null
|| sourceEvent.eventDomain() != EventDomain.DRIVER_ACTIVITY
|| (sourceEvent.lifecycle() != EventLifecycle.START && sourceEvent.lifecycle() != EventLifecycle.END)
|| sourceEvent.occurredAt() == null) {
return null;
}
JsonNode raw = rawPayload(sourceEvent);
String intervalId = firstNonBlank(text(raw, "intervalId"), text(raw, "sourceRowId"), sourceEvent.externalSourceEventId());
String driverKey = firstNonBlank(text(raw, "driverKey"), fallbackDriverKey, driverKey(sourceEvent));
if (driverKey == null || intervalId == null) {
return null;
}
JsonNode attributes = attributes(sourceEvent);
Map<String, Object> event = new LinkedHashMap<>();
event.put("sessionId", sessionId(fallbackSessionId, raw, sourceEvent));
event.put("driverKey", driverKey);
event.put("eventId", sourceEvent.externalSourceEventId());
event.put("intervalId", intervalId);
event.put("sourceRowId", firstNonBlank(text(raw, "sourceRowId"), intervalId));
event.put("sourceRowIds", stringList(raw, "sourceRowIds", intervalId));
event.put("activityType", firstNonBlank(text(raw, "activityType"), eventTypeAsActivity(sourceEvent.eventType())));
event.put("lifecycle", sourceEvent.lifecycle().name());
event.put("occurredAt", sourceEvent.occurredAt());
event.put("occurredAtEpochSecond", sourceEvent.occurredAt().toEpochSecond());
event.put("cardSlot", firstNonBlank(text(raw, "cardSlot"), text(attributes, "cardSlot")));
event.put("cardStatus", firstNonBlank(text(raw, "cardStatus"), text(attributes, "cardStatus")));
event.put("drivingStatus", firstNonBlank(text(raw, "drivingStatus"), text(attributes, "drivingStatus")));
event.put("registrationKey", firstNonBlank(text(raw, "registrationKey"), registrationKey(sourceEvent)));
event.put("vehicleKey", firstNonBlank(text(raw, "vehicleKey"), vehicleKey(sourceEvent)));
event.put("sourceKind", firstNonBlank(text(raw, "sourceKind"), sourceKind(sourceEvent)));
event.put("synthetic", booleanValue(raw, "synthetic", false));
event.put("clippedToRequestedPeriod", booleanValue(raw, "clippedToRequestedPeriod", false));
event.put("level", firstNonBlank(text(raw, "level"), "RAW_EVENT"));
return event;
}
private Map<String, Object> toVehicleUsagePointInputMap(
UUID fallbackSessionId,
String fallbackDriverKey,
EventHubEventDto sourceEvent
) {
if (sourceEvent == null
|| sourceEvent.eventDomain() != EventDomain.DRIVER_CARD
|| (sourceEvent.lifecycle() != EventLifecycle.INSERT && sourceEvent.lifecycle() != EventLifecycle.WITHDRAW)
|| sourceEvent.occurredAt() == null) {
return null;
}
boolean supportedType = sourceEvent.eventType() == EventType.CARD_INSERTED
|| sourceEvent.eventType() == EventType.CARD_WITHDRAWN;
if (!supportedType) {
return null;
}
JsonNode raw = rawPayload(sourceEvent);
String intervalId = firstNonBlank(text(raw, "intervalId"), text(raw, "sourceRowId"), sourceEvent.externalSourceEventId());
String driverKey = firstNonBlank(text(raw, "driverKey"), fallbackDriverKey, driverKey(sourceEvent));
if (driverKey == null || intervalId == null) {
return null;
}
Map<String, Object> event = new LinkedHashMap<>();
event.put("sessionId", sessionId(fallbackSessionId, raw, sourceEvent));
event.put("driverKey", driverKey);
event.put("eventId", sourceEvent.externalSourceEventId());
event.put("intervalId", intervalId);
event.put("sourceRowId", firstNonBlank(text(raw, "sourceRowId"), intervalId));
event.put("sourceRowIds", stringList(raw, "sourceRowIds", intervalId));
event.put("lifecycle", sourceEvent.lifecycle().name());
event.put("occurredAt", sourceEvent.occurredAt());
event.put("occurredAtEpochSecond", sourceEvent.occurredAt().toEpochSecond());
event.put("registrationKey", firstNonBlank(text(raw, "registrationKey"), registrationKey(sourceEvent)));
event.put("vehicleKey", firstNonBlank(text(raw, "vehicleKey"), vehicleKey(sourceEvent)));
event.put("sourceKind", firstNonBlank(text(raw, "sourceKind"), sourceKind(sourceEvent)));
event.put("odometerKm", odometerKm(sourceEvent, raw));
return event;
}
private Map<String, Object> toSupportGeoEvidenceInputMap(
UUID fallbackSessionId,
String fallbackDriverKey,
EventHubEventDto sourceEvent
) {
if (sourceEvent == null || sourceEvent.occurredAt() == null || sourceEvent.position() == null) {
return null;
}
String eventDomain = sourceEvent.eventDomain() == null ? null : sourceEvent.eventDomain().name();
int priority = supportGeoPriority(eventDomain);
if (priority <= 0) {
return null;
}
JsonNode raw = rawPayload(sourceEvent);
String driverKey = firstNonBlank(text(raw, "driverKey"), fallbackDriverKey, driverKey(sourceEvent));
if (driverKey == null) {
return null;
}
Map<String, Object> event = new LinkedHashMap<>();
event.put("sessionId", sessionId(fallbackSessionId, raw, sourceEvent));
event.put("driverKey", driverKey);
event.put("eventId", firstNonBlank(text(raw, "supportEventId"), text(raw, "sourceRowId"), sourceEvent.externalSourceEventId()));
event.put("eventDomain", eventDomain);
event.put("occurredAt", sourceEvent.occurredAt());
event.put("occurredAtEpochSecond", sourceEvent.occurredAt().toEpochSecond());
event.put("registrationKey", firstNonBlank(text(raw, "registrationKey"), registrationKey(sourceEvent)));
event.put("vehicleKey", firstNonBlank(text(raw, "vehicleKey"), vehicleKey(sourceEvent)));
event.put("latitude", sourceEvent.position().latitude().doubleValue());
event.put("longitude", sourceEvent.position().longitude().doubleValue());
event.put("odometerKm", odometerKm(sourceEvent, raw));
event.put("priority", priority);
return event;
}
private int supportGeoPriority(String eventDomain) { private int supportGeoPriority(String eventDomain) {
if (eventDomain == null || eventDomain.isBlank()) { if (eventDomain == null || eventDomain.isBlank()) {
return 0; return 0;
@ -483,6 +853,186 @@ public class DriverTimelineReusableProjectionBuilder {
}; };
} }
private Comparator<Map<String, Object>> pointEventComparator() {
return Comparator
.comparing((Map<String, Object> event) -> (Long) event.get("occurredAtEpochSecond"))
.thenComparing(event -> lifecycleOrder(Objects.toString(event.get("lifecycle"), "")))
.thenComparing(event -> Objects.toString(event.get("driverKey"), ""))
.thenComparing(event -> Objects.toString(event.get("intervalId"), ""))
.thenComparing(event -> Objects.toString(event.get("eventId"), ""));
}
private int lifecycleOrder(String lifecycle) {
return switch (lifecycle) {
case "INSERT", "START" -> 0;
case "WITHDRAW", "END" -> 1;
default -> 2;
};
}
private EventHubProperties.DrivingDerivedProjectionInputMode drivingDerivedProjectionInputMode() {
return properties.getTachographFileSession().getProcessing().getDrivingDerivedProjectionInputMode();
}
private RawSourceDriverTimelineEventBuilder rawSourceEventBuilder() {
if (rawSourceEventBuilder == null) {
throw new IllegalStateException(
"Driving-derived projection input mode EVENTS requires RawSourceDriverTimelineEventBuilder"
);
}
return rawSourceEventBuilder;
}
private JsonNode rawPayload(EventHubEventDto event) {
JsonNode payload = event.payload();
if (payload == null || payload.isNull()) {
return null;
}
JsonNode raw = payload.get("raw");
return raw == null || raw.isNull() ? payload : raw;
}
private JsonNode attributes(EventHubEventDto event) {
return event.eventDetails() == null ? null : event.eventDetails().attributes();
}
private String text(JsonNode node, String field) {
if (node == null || field == null) {
return null;
}
JsonNode value = node.get(field);
if (value == null || value.isNull()) {
return null;
}
String text = value.asText(null);
return text == null || text.isBlank() ? null : text;
}
private boolean booleanValue(JsonNode node, String field, boolean fallback) {
if (node == null || field == null || node.get(field) == null || node.get(field).isNull()) {
return fallback;
}
return node.get(field).asBoolean(fallback);
}
private Long longValue(JsonNode node, String field) {
if (node == null || field == null || node.get(field) == null || node.get(field).isNull()) {
return null;
}
JsonNode value = node.get(field);
if (value.isNumber()) {
return value.asLong();
}
try {
return Long.parseLong(value.asText());
} catch (NumberFormatException ignored) {
return null;
}
}
private List<String> stringList(JsonNode node, String field, String fallback) {
JsonNode value = node == null || field == null ? null : node.get(field);
if (value == null || value.isNull()) {
return fallback == null ? List.of() : List.of(fallback);
}
if (value.isArray()) {
List<String> result = new ArrayList<>();
value.forEach(item -> {
if (item != null && !item.isNull()) {
String text = item.asText(null);
if (text != null && !text.isBlank()) {
result.add(text);
}
}
});
return result.isEmpty() && fallback != null ? List.of(fallback) : List.copyOf(result);
}
String text = value.asText(null);
return text == null || text.isBlank() ? (fallback == null ? List.of() : List.of(fallback)) : List.of(text);
}
private String firstNonBlank(String... values) {
if (values == null) {
return null;
}
for (String value : values) {
if (value != null && !value.isBlank()) {
return value.trim();
}
}
return null;
}
private String eventTypeAsActivity(EventType eventType) {
if (eventType == null) {
return "UNKNOWN";
}
return switch (eventType) {
case DRIVE -> "DRIVE";
case WORK -> "WORK";
case AVAILABILITY -> "AVAILABILITY";
case BREAK_REST -> "BREAK_REST";
default -> eventType.name();
};
}
private UUID sessionId(UUID fallbackSessionId, JsonNode raw, EventHubEventDto event) {
String rawSessionId = firstNonBlank(
text(raw, "sessionId"),
event.sourcePackageRef() == null ? null : event.sourcePackageRef().sourcePackageId()
);
if (rawSessionId != null) {
try {
return UUID.fromString(rawSessionId);
} catch (IllegalArgumentException ignored) {
// DB-acquired source packages need not be UUID-based file sessions.
}
}
return fallbackSessionId == null ? new UUID(0L, 0L) : fallbackSessionId;
}
private String driverKey(EventHubEventDto event) {
if (event.driverRef() == null) {
return null;
}
if (event.driverRef().driverCard() != null && event.driverRef().driverCard().hasValue()) {
return event.driverRef().driverCard().stableKey();
}
return event.driverRef().sourceEntityId();
}
private String registrationKey(EventHubEventDto event) {
if (event.vehicleRef() == null || event.vehicleRef().vehicleRegistration() == null) {
return null;
}
return event.vehicleRef().vehicleRegistration().stableKey();
}
private String vehicleKey(EventHubEventDto event) {
if (event.vehicleRef() == null) {
return null;
}
return firstNonBlank(event.vehicleRef().vin(), event.vehicleRef().sourceVehicleEntityId());
}
private String sourceKind(EventHubEventDto event) {
return event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().sourceKind();
}
private Long odometerKm(EventHubEventDto event, JsonNode raw) {
Long explicit = longValue(raw, event.lifecycle() == EventLifecycle.WITHDRAW ? "odometerEndKm" : "odometerBeginKm");
if (explicit != null) {
return explicit;
}
explicit = longValue(raw, "odometerKm");
if (explicit != null) {
return explicit;
}
return event.odometerM() == null ? null : event.odometerM() / 1_000L;
}
private String firstSourceIntervalId(ResolvedActivityInterval interval) { private String firstSourceIntervalId(ResolvedActivityInterval interval) {
return interval.sourceIntervalIds().isEmpty() ? interval.intervalId() : interval.sourceIntervalIds().get(0); return interval.sourceIntervalIds().isEmpty() ? interval.intervalId() : interval.sourceIntervalIds().get(0);
} }
@ -901,6 +1451,12 @@ public class DriverTimelineReusableProjectionBuilder {
.toList(); .toList();
} }
private String renderDrivingDerivedProjectionEventsEpl(int significantDrivingMinutes, int minimumRestPeriodMinutes) {
return DRIVING_DERIVED_PROJECTION_EVENTS_PREPROCESSOR_EPL
+ "\n\n"
+ renderDrivingDerivedProjectionBundleEpl(significantDrivingMinutes, minimumRestPeriodMinutes);
}
private String renderDrivingDerivedProjectionBundleEpl(int significantDrivingMinutes, int minimumRestPeriodMinutes) { private String renderDrivingDerivedProjectionBundleEpl(int significantDrivingMinutes, int minimumRestPeriodMinutes) {
return DRIVING_DERIVED_PROJECTION_BUNDLE_EPL_TEMPLATE return DRIVING_DERIVED_PROJECTION_BUNDLE_EPL_TEMPLATE
.replace( .replace(

View File

@ -95,6 +95,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE
List<EventHubEventDto> activityEvents = buildActivityEvents( List<EventHubEventDto> activityEvents = buildActivityEvents(
session, session,
driverSession.driverKey(),
timeline.activityIntervals(), timeline.activityIntervals(),
driverRef, driverRef,
registrationsByKey, registrationsByKey,
@ -104,6 +105,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE
); );
List<EventHubEventDto> vehicleUsageEvents = buildVehicleUsageEvents( List<EventHubEventDto> vehicleUsageEvents = buildVehicleUsageEvents(
session, session,
driverSession.driverKey(),
timeline.vehicleUsageIntervals(), timeline.vehicleUsageIntervals(),
driverRef, driverRef,
registrationsByKey, registrationsByKey,
@ -125,6 +127,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE
private List<EventHubEventDto> buildActivityEvents( private List<EventHubEventDto> buildActivityEvents(
TachographFileSession session, TachographFileSession session,
String driverKey,
List<ResolvedActivityInterval> intervals, List<ResolvedActivityInterval> intervals,
DriverRefDto driverRef, DriverRefDto driverRef,
Map<String, ExtractedVehicleRegistration> registrationsByKey, Map<String, ExtractedVehicleRegistration> registrationsByKey,
@ -147,6 +150,8 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE
Map<String, Object> raw = new LinkedHashMap<>(); Map<String, Object> raw = new LinkedHashMap<>();
raw.put("intervalId", interval.intervalId()); raw.put("intervalId", interval.intervalId());
raw.put("sourceRowId", interval.intervalId()); raw.put("sourceRowId", interval.intervalId());
raw.put("driverKey", driverKey);
raw.put("activityType", interval.activityType());
raw.put("sourceRowIds", interval.sourceIntervalIds()); raw.put("sourceRowIds", interval.sourceIntervalIds());
raw.put("startedAt", timeText(interval.from())); raw.put("startedAt", timeText(interval.from()));
raw.put("endedAt", timeText(interval.to())); raw.put("endedAt", timeText(interval.to()));
@ -200,6 +205,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE
private List<EventHubEventDto> buildVehicleUsageEvents( private List<EventHubEventDto> buildVehicleUsageEvents(
TachographFileSession session, TachographFileSession session,
String driverKey,
List<ResolvedVehicleUsageInterval> intervals, List<ResolvedVehicleUsageInterval> intervals,
DriverRefDto driverRef, DriverRefDto driverRef,
Map<String, ExtractedVehicleRegistration> registrationsByKey, Map<String, ExtractedVehicleRegistration> registrationsByKey,
@ -226,6 +232,7 @@ public class IntervalBackedDriverTimelineEventBuilder implements DriverTimelineE
Map<String, Object> raw = new LinkedHashMap<>(); Map<String, Object> raw = new LinkedHashMap<>();
raw.put("intervalId", interval.intervalId()); raw.put("intervalId", interval.intervalId());
raw.put("sourceRowId", interval.intervalId()); raw.put("sourceRowId", interval.intervalId());
raw.put("driverKey", driverKey);
raw.put("sourceRowIds", interval.sourceIntervalIds()); raw.put("sourceRowIds", interval.sourceIntervalIds());
raw.put("startedAt", timeText(interval.from())); raw.put("startedAt", timeText(interval.from()));
raw.put("endedAt", timeText(interval.to())); raw.put("endedAt", timeText(interval.to()));

View File

@ -0,0 +1,211 @@
package at.procon.eventhub.tachographfilesession.service;
import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent;
import at.procon.eventhub.tachographfilesession.model.ExtractionWarning;
import at.procon.eventhub.tachographfilesession.model.ResolvedActivityInterval;
import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline;
import at.procon.eventhub.tachographfilesession.model.ResolvedVehicleUsageInterval;
import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
import java.time.OffsetDateTime;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import org.springframework.stereotype.Component;
/**
* Builds EventHub point events directly from the extracted tachograph source records.
*
* <p>This builder intentionally does not run the normal {@link DriverTimelineBuilder} interval
* resolution/merging path. It only wraps each raw source interval as a START/END or
* INSERT/WITHDRAW point-event pair. Any subsequent pairing, coalescing, or derived interval logic
* belongs to the event-input EPL projection pipeline.</p>
*/
@Component
public class RawSourceDriverTimelineEventBuilder {
private final IntervalBackedDriverTimelineEventBuilder intervalEventBuilder;
public RawSourceDriverTimelineEventBuilder(IntervalBackedDriverTimelineEventBuilder intervalEventBuilder) {
this.intervalEventBuilder = intervalEventBuilder;
}
public TachographTimelineEventBundle buildRawEventBundle(
TachographFileSession session,
DriverExtractionSession driverSession
) {
if (session == null || driverSession == null) {
return new TachographTimelineEventBundle(List.of(), List.of(), List.of());
}
return intervalEventBuilder.buildEventBundle(session, driverSession, buildRawTimeline(session, driverSession));
}
public ResolvedDriverTimeline buildRawTimeline(
TachographFileSession session,
DriverExtractionSession driverSession
) {
String sourceKind = session.metadata().driverCardFile() ? "DRIVER_CARD" : "VEHICLE_UNIT";
List<ResolvedVehicleUsageInterval> vehicleUsageIntervals = rawVehicleUsageIntervals(
session,
driverSession,
sourceKind
);
List<ResolvedActivityInterval> activityIntervals = rawActivityIntervals(driverSession, sourceKind);
List<ExtractedSupportEvent> supportEvents = sortedSupportEvents(driverSession.supportEvents());
return new ResolvedDriverTimeline(
sourceKind,
minTimestamp(vehicleUsageIntervals, activityIntervals, supportEvents),
maxTimestamp(vehicleUsageIntervals, activityIntervals, supportEvents),
vehicleUsageIntervals,
activityIntervals,
supportEvents,
mergeWarnings(session.warnings(), driverSession.warnings())
);
}
private List<ResolvedVehicleUsageInterval> rawVehicleUsageIntervals(
TachographFileSession session,
DriverExtractionSession driverSession,
String sourceKind
) {
if (driverSession.cardVehicleUsageIntervals() == null || driverSession.cardVehicleUsageIntervals().isEmpty()) {
return List.of();
}
return driverSession.cardVehicleUsageIntervals().stream()
.filter(interval -> interval.from() != null && (interval.to() == null || interval.to().isAfter(interval.from())))
.map(interval -> ResolvedVehicleUsageInterval.resolved(
session.sessionId(),
driverSession.driverKey(),
interval.intervalId(),
interval.from(),
interval.to(),
interval.odometerBeginKm(),
interval.odometerEndKm(),
interval.registrationKey(),
interval.vehicleKey(),
sourceKind,
List.of(interval.intervalId())
))
.sorted(Comparator.comparing(ResolvedVehicleUsageInterval::from)
.thenComparing(ResolvedVehicleUsageInterval::to, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(ResolvedVehicleUsageInterval::intervalId, Comparator.nullsLast(String::compareTo)))
.toList();
}
private List<ResolvedActivityInterval> rawActivityIntervals(
DriverExtractionSession driverSession,
String sourceKind
) {
if (driverSession.cardActivityIntervals() == null || driverSession.cardActivityIntervals().isEmpty()) {
return List.of();
}
return driverSession.cardActivityIntervals().stream()
.filter(interval -> interval.from() != null && interval.to() != null && interval.to().isAfter(interval.from()))
.map(interval -> ResolvedActivityInterval.raw(
interval.intervalId(),
interval.from(),
interval.to(),
normalizeActivity(interval.activityType()),
interval.slot(),
interval.cardStatus(),
interval.drivingStatus(),
interval.registrationKey(),
interval.vehicleKey(),
sourceKind,
List.of(interval.intervalId())
))
.sorted(Comparator.comparing(ResolvedActivityInterval::from)
.thenComparing(ResolvedActivityInterval::to)
.thenComparing(ResolvedActivityInterval::intervalId, Comparator.nullsLast(String::compareTo)))
.toList();
}
private List<ExtractedSupportEvent> sortedSupportEvents(List<ExtractedSupportEvent> supportEvents) {
if (supportEvents == null || supportEvents.isEmpty()) {
return List.of();
}
return supportEvents.stream()
.sorted(Comparator.comparing(ExtractedSupportEvent::occurredAt)
.thenComparing(ExtractedSupportEvent::eventDomain, Comparator.nullsLast(String::compareTo))
.thenComparing(ExtractedSupportEvent::eventId, Comparator.nullsLast(String::compareTo)))
.toList();
}
private String normalizeActivity(String activityType) {
if (activityType == null || activityType.isBlank()) {
return "UNKNOWN";
}
if ("UNKNOWN_ACTIVITY".equals(activityType)) {
return "UNKNOWN";
}
return activityType;
}
private OffsetDateTime minTimestamp(
List<ResolvedVehicleUsageInterval> vehicleUsageIntervals,
List<ResolvedActivityInterval> activityIntervals,
List<ExtractedSupportEvent> supportEvents
) {
OffsetDateTime min = null;
for (ResolvedVehicleUsageInterval interval : vehicleUsageIntervals) {
min = min(min, interval.from());
}
for (ResolvedActivityInterval interval : activityIntervals) {
min = min(min, interval.from());
}
for (ExtractedSupportEvent supportEvent : supportEvents) {
min = min(min, supportEvent.occurredAt());
}
return min;
}
private OffsetDateTime maxTimestamp(
List<ResolvedVehicleUsageInterval> vehicleUsageIntervals,
List<ResolvedActivityInterval> activityIntervals,
List<ExtractedSupportEvent> supportEvents
) {
OffsetDateTime max = null;
for (ResolvedVehicleUsageInterval interval : vehicleUsageIntervals) {
max = max(max, interval.to());
}
for (ResolvedActivityInterval interval : activityIntervals) {
max = max(max, interval.to());
}
for (ExtractedSupportEvent supportEvent : supportEvents) {
max = max(max, supportEvent.occurredAt());
}
return max;
}
private List<ExtractionWarning> mergeWarnings(List<ExtractionWarning> sessionWarnings, List<ExtractionWarning> driverWarnings) {
LinkedHashSet<ExtractionWarning> merged = new LinkedHashSet<>();
if (sessionWarnings != null) {
merged.addAll(sessionWarnings);
}
if (driverWarnings != null) {
merged.addAll(driverWarnings);
}
return List.copyOf(merged);
}
private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) {
if (left == null) {
return right;
}
if (right == null) {
return left;
}
return left.isBefore(right) ? left : right;
}
private OffsetDateTime max(OffsetDateTime left, OffsetDateTime right) {
if (left == null) {
return right;
}
if (right == null) {
return left;
}
return left.isAfter(right) ? left : right;
}
}

View File

@ -175,7 +175,15 @@ public class TachographFileSessionProcessingService {
requestedTo requestedTo
); );
TachographEsperDrivingDerivedProjectionBundle derivedProjectionBundle = TachographEsperDrivingDerivedProjectionBundle derivedProjectionBundle =
reusableProjectionBuilder.buildEsperDrivingDerivedProjectionBundle( properties.getTachographFileSession().getProcessing().getDrivingDerivedProjectionInputMode()
== EventHubProperties.DrivingDerivedProjectionInputMode.EVENTS
? reusableProjectionBuilder.buildEsperDrivingDerivedProjectionBundle(
session,
driver,
significantDrivingMinutes,
minimumRestPeriodMinutes
)
: reusableProjectionBuilder.buildEsperDrivingDerivedProjectionBundle(
sessionId, sessionId,
driverKey, driverKey,
timeline, timeline,

View File

@ -131,6 +131,9 @@ eventhub:
merge-gap-seconds: 0 merge-gap-seconds: 0
gap-detection-tolerance-seconds: 0 gap-detection-tolerance-seconds: 0
timeline-input-mode: events timeline-input-mode: events
# INTERVALS = existing stable path: Java prepares activity/card-usage intervals for EPL.
# EVENTS = new path: EPL receives EventHub point events and reconstructs intervals internally.
driving-derived-projection-input-mode: events #intervals
legal-requirements: legal-requirements:
base-url: ${LEGAL_REQUIREMENTS_BASE_URL:https://legalrequirements.services.bytebar.eu/ODataV4/LR} base-url: ${LEGAL_REQUIREMENTS_BASE_URL:https://legalrequirements.services.bytebar.eu/ODataV4/LR}
username: ${LEGAL_REQUIREMENTS_USERNAME:} username: ${LEGAL_REQUIREMENTS_USERNAME:}

View File

@ -0,0 +1,348 @@
/*
* Event-input adapter for tachograph-driving-derived-projection-bundle.epl.
*
* The old bundle consumes resolved interval input streams. This preprocessor lets the same
* derived rules consume EventHub point events by pairing START/END activity events and
* INSERT/WITHDRAW card-vehicle usage events inside Esper.
*
* Vehicle-usage intervals are additionally coalesced in EPL before they are forwarded to
* the old interval bundle. This keeps event mode equivalent to DriverTimelineBuilder's
* Java merge behavior for consecutive same-vehicle CardVehiclesUsed rows, including the
* common midnight continuation 23:59:59 -> 00:00:00 on the next day.
*/
create window OpenActivityPoint#unique(driverKey, intervalId) as TachographActivityPointInputEvent;
insert into OpenActivityPoint
select *
from TachographActivityPointInputEvent(lifecycle = 'START');
@Priority(20)
on TachographActivityPointInputEvent(lifecycle = 'END') as endEvent
insert into TachographActivityIntervalInputEvent
select
startEvent.sessionId as sessionId,
startEvent.driverKey as driverKey,
startEvent.intervalId as intervalId,
startEvent.activityType as activityType,
startEvent.cardSlot as cardSlot,
startEvent.cardStatus as cardStatus,
startEvent.drivingStatus as drivingStatus,
startEvent.registrationKey as registrationKey,
startEvent.vehicleKey as vehicleKey,
startEvent.sourceKind as sourceKind,
startEvent.sourceRowId as firstSourceIntervalId,
endEvent.sourceRowId as lastSourceIntervalId,
startEvent.occurredAt as startedAt,
endEvent.occurredAt as endedAt,
startEvent.occurredAtEpochSecond as startedAtEpochSecond,
endEvent.occurredAtEpochSecond as endedAtEpochSecond,
endEvent.occurredAtEpochSecond - startEvent.occurredAtEpochSecond as durationSeconds,
startEvent.sourceRowIds as sourceIntervalIds,
startEvent.synthetic as synthetic,
startEvent.clippedToRequestedPeriod as clippedToRequestedPeriod,
startEvent.level as level
from OpenActivityPoint as startEvent
where startEvent.driverKey = endEvent.driverKey
and startEvent.intervalId = endEvent.intervalId
and endEvent.occurredAtEpochSecond > startEvent.occurredAtEpochSecond;
@Priority(10)
on TachographActivityPointInputEvent(lifecycle = 'END') as endEvent
delete from OpenActivityPoint as openEvent
where openEvent.driverKey = endEvent.driverKey
and openEvent.intervalId = endEvent.intervalId;
create schema RawVehicleUsageInterval(
sessionId java.util.UUID,
driverKey string,
intervalId string,
firstSourceIntervalId string,
lastSourceIntervalId string,
startedAt java.time.OffsetDateTime,
endedAt java.time.OffsetDateTime,
startedAtEpochSecond long,
endedAtEpochSecond java.lang.Long,
durationSeconds long,
odometerBeginKm java.lang.Long,
odometerEndKm java.lang.Long,
registrationKey string,
vehicleKey string,
sourceKind string,
sourceIntervalIds java.util.List
);
create window OpenVehicleUsagePoint#unique(driverKey, intervalId) as TachographVehicleUsagePointInputEvent;
insert into OpenVehicleUsagePoint
select *
from TachographVehicleUsagePointInputEvent(lifecycle = 'INSERT');
@Priority(20)
on TachographVehicleUsagePointInputEvent(lifecycle = 'WITHDRAW') as withdrawEvent
insert into RawVehicleUsageInterval
select
insertEvent.sessionId as sessionId,
insertEvent.driverKey as driverKey,
insertEvent.intervalId as intervalId,
insertEvent.sourceRowId as firstSourceIntervalId,
withdrawEvent.sourceRowId as lastSourceIntervalId,
insertEvent.occurredAt as startedAt,
withdrawEvent.occurredAt as endedAt,
insertEvent.occurredAtEpochSecond as startedAtEpochSecond,
withdrawEvent.occurredAtEpochSecond as endedAtEpochSecond,
withdrawEvent.occurredAtEpochSecond - insertEvent.occurredAtEpochSecond as durationSeconds,
insertEvent.odometerKm as odometerBeginKm,
withdrawEvent.odometerKm as odometerEndKm,
insertEvent.registrationKey as registrationKey,
insertEvent.vehicleKey as vehicleKey,
insertEvent.sourceKind as sourceKind,
insertEvent.sourceRowIds as sourceIntervalIds
from OpenVehicleUsagePoint as insertEvent
where insertEvent.driverKey = withdrawEvent.driverKey
and insertEvent.intervalId = withdrawEvent.intervalId
and withdrawEvent.occurredAtEpochSecond > insertEvent.occurredAtEpochSecond;
@Priority(10)
on TachographVehicleUsagePointInputEvent(lifecycle = 'WITHDRAW') as withdrawEvent
delete from OpenVehicleUsagePoint as openEvent
where openEvent.driverKey = withdrawEvent.driverKey
and openEvent.intervalId = withdrawEvent.intervalId;
create window MergedVehicleUsageAccumulator#unique(driverKey) as RawVehicleUsageInterval;
create window MergedVehicleUsageNext#unique(driverKey) as RawVehicleUsageInterval;
/*
* Case 1: first vehicle-usage interval for this driver. Start a new accumulator.
*/
@Priority(70)
on RawVehicleUsageInterval as next
insert into MergedVehicleUsageNext
select
next.sessionId as sessionId,
next.driverKey as driverKey,
next.intervalId as intervalId,
next.firstSourceIntervalId as firstSourceIntervalId,
next.lastSourceIntervalId as lastSourceIntervalId,
next.startedAt as startedAt,
next.endedAt as endedAt,
next.startedAtEpochSecond as startedAtEpochSecond,
next.endedAtEpochSecond as endedAtEpochSecond,
next.durationSeconds as durationSeconds,
next.odometerBeginKm as odometerBeginKm,
next.odometerEndKm as odometerEndKm,
next.registrationKey as registrationKey,
next.vehicleKey as vehicleKey,
next.sourceKind as sourceKind,
next.sourceIntervalIds as sourceIntervalIds
where not exists (
select * from MergedVehicleUsageAccumulator as current
where current.driverKey = next.driverKey
);
/*
* Case 2: same vehicle and registration, and the next interval starts at or before
* current.to + 1 second. This includes 23:59:59 -> 00:00:00. Keep one accumulated
* interval and extend its end/last-source/ending odometer.
*/
@Priority(70)
on RawVehicleUsageInterval as next
insert into MergedVehicleUsageNext
select
current.sessionId as sessionId,
current.driverKey as driverKey,
current.intervalId as intervalId,
current.firstSourceIntervalId as firstSourceIntervalId,
next.lastSourceIntervalId as lastSourceIntervalId,
current.startedAt as startedAt,
next.endedAt as endedAt,
current.startedAtEpochSecond as startedAtEpochSecond,
next.endedAtEpochSecond as endedAtEpochSecond,
next.endedAtEpochSecond - current.startedAtEpochSecond as durationSeconds,
current.odometerBeginKm as odometerBeginKm,
next.odometerEndKm as odometerEndKm,
current.registrationKey as registrationKey,
current.vehicleKey as vehicleKey,
current.sourceKind as sourceKind,
current.sourceIntervalIds as sourceIntervalIds
from MergedVehicleUsageAccumulator as current
where current.driverKey = next.driverKey
and current.endedAtEpochSecond is not null
and next.startedAtEpochSecond <= current.endedAtEpochSecond + 1L
and (
(current.registrationKey is null and next.registrationKey is null)
or (
current.registrationKey is not null
and next.registrationKey is not null
and current.registrationKey = next.registrationKey
)
)
and (
(current.vehicleKey is null and next.vehicleKey is null)
or (
current.vehicleKey is not null
and next.vehicleKey is not null
and current.vehicleKey = next.vehicleKey
)
);
/*
* Case 3a: next interval is not mergeable. Emit the accumulated interval to the old
* interval-based bundle.
*/
@Priority(70)
on RawVehicleUsageInterval as next
insert into TachographVehicleUsageIntervalInputEvent
select
current.sessionId as sessionId,
current.driverKey as driverKey,
current.intervalId as intervalId,
current.firstSourceIntervalId as firstSourceIntervalId,
current.lastSourceIntervalId as lastSourceIntervalId,
current.startedAt as startedAt,
current.endedAt as endedAt,
current.startedAtEpochSecond as startedAtEpochSecond,
current.endedAtEpochSecond as endedAtEpochSecond,
current.durationSeconds as durationSeconds,
current.odometerBeginKm as odometerBeginKm,
current.odometerEndKm as odometerEndKm,
current.registrationKey as registrationKey,
current.vehicleKey as vehicleKey,
current.sourceKind as sourceKind,
current.sourceIntervalIds as sourceIntervalIds
from MergedVehicleUsageAccumulator as current
where current.driverKey = next.driverKey
and not (
current.endedAtEpochSecond is not null
and next.startedAtEpochSecond <= current.endedAtEpochSecond + 1L
and (
(current.registrationKey is null and next.registrationKey is null)
or (
current.registrationKey is not null
and next.registrationKey is not null
and current.registrationKey = next.registrationKey
)
)
and (
(current.vehicleKey is null and next.vehicleKey is null)
or (
current.vehicleKey is not null
and next.vehicleKey is not null
and current.vehicleKey = next.vehicleKey
)
)
);
/*
* Case 3b: next interval is not mergeable. Start a new accumulator with it.
*/
@Priority(70)
on RawVehicleUsageInterval as next
insert into MergedVehicleUsageNext
select
next.sessionId as sessionId,
next.driverKey as driverKey,
next.intervalId as intervalId,
next.firstSourceIntervalId as firstSourceIntervalId,
next.lastSourceIntervalId as lastSourceIntervalId,
next.startedAt as startedAt,
next.endedAt as endedAt,
next.startedAtEpochSecond as startedAtEpochSecond,
next.endedAtEpochSecond as endedAtEpochSecond,
next.durationSeconds as durationSeconds,
next.odometerBeginKm as odometerBeginKm,
next.odometerEndKm as odometerEndKm,
next.registrationKey as registrationKey,
next.vehicleKey as vehicleKey,
next.sourceKind as sourceKind,
next.sourceIntervalIds as sourceIntervalIds
where exists (
select * from MergedVehicleUsageAccumulator as current
where current.driverKey = next.driverKey
and not (
current.endedAtEpochSecond is not null
and next.startedAtEpochSecond <= current.endedAtEpochSecond + 1L
and (
(current.registrationKey is null and next.registrationKey is null)
or (
current.registrationKey is not null
and next.registrationKey is not null
and current.registrationKey = next.registrationKey
)
)
and (
(current.vehicleKey is null and next.vehicleKey is null)
or (
current.vehicleKey is not null
and next.vehicleKey is not null
and current.vehicleKey = next.vehicleKey
)
)
)
);
@Priority(60)
on RawVehicleUsageInterval as next
delete from MergedVehicleUsageAccumulator as current
where current.driverKey = next.driverKey;
@Priority(50)
on RawVehicleUsageInterval as next
insert into MergedVehicleUsageAccumulator
select
candidate.sessionId as sessionId,
candidate.driverKey as driverKey,
candidate.intervalId as intervalId,
candidate.firstSourceIntervalId as firstSourceIntervalId,
candidate.lastSourceIntervalId as lastSourceIntervalId,
candidate.startedAt as startedAt,
candidate.endedAt as endedAt,
candidate.startedAtEpochSecond as startedAtEpochSecond,
candidate.endedAtEpochSecond as endedAtEpochSecond,
candidate.durationSeconds as durationSeconds,
candidate.odometerBeginKm as odometerBeginKm,
candidate.odometerEndKm as odometerEndKm,
candidate.registrationKey as registrationKey,
candidate.vehicleKey as vehicleKey,
candidate.sourceKind as sourceKind,
candidate.sourceIntervalIds as sourceIntervalIds
from MergedVehicleUsageNext as candidate
where candidate.driverKey = next.driverKey;
@Priority(40)
on RawVehicleUsageInterval as next
delete from MergedVehicleUsageNext as candidate
where candidate.driverKey = next.driverKey;
/*
* The last accumulated interval cannot be emitted by looking at the next interval, so Java
* sends one TachographProjectionFinalizeEvent per driver after all vehicle-usage point
* events and before activity point events.
*/
@Priority(20)
on TachographProjectionFinalizeEvent as finalizeEvent
insert into TachographVehicleUsageIntervalInputEvent
select
current.sessionId as sessionId,
current.driverKey as driverKey,
current.intervalId as intervalId,
current.firstSourceIntervalId as firstSourceIntervalId,
current.lastSourceIntervalId as lastSourceIntervalId,
current.startedAt as startedAt,
current.endedAt as endedAt,
current.startedAtEpochSecond as startedAtEpochSecond,
current.endedAtEpochSecond as endedAtEpochSecond,
current.durationSeconds as durationSeconds,
current.odometerBeginKm as odometerBeginKm,
current.odometerEndKm as odometerEndKm,
current.registrationKey as registrationKey,
current.vehicleKey as vehicleKey,
current.sourceKind as sourceKind,
current.sourceIntervalIds as sourceIntervalIds
from MergedVehicleUsageAccumulator as current
where current.driverKey = finalizeEvent.driverKey;
@Priority(10)
on TachographProjectionFinalizeEvent as finalizeEvent
delete from MergedVehicleUsageAccumulator as current
where current.driverKey = finalizeEvent.driverKey;

View File

@ -1,6 +1,8 @@
package at.procon.eventhub.tachographfilesession.service; package at.procon.eventhub.tachographfilesession.service;
import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.service.EventDetailsFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession; import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
@ -16,6 +18,7 @@ import at.procon.eventhub.tachographfilesession.model.TachographEsperPotentialHo
import at.procon.eventhub.tachographfilesession.model.TachographEsperVuCardAbsentIntervalEvent; import at.procon.eventhub.tachographfilesession.model.TachographEsperVuCardAbsentIntervalEvent;
import at.procon.eventhub.tachographfilesession.model.TachographFileSession; import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
import at.procon.eventhub.tachographfilesession.model.TachographFileSessionMetadata; import at.procon.eventhub.tachographfilesession.model.TachographFileSessionMetadata;
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
import java.time.Instant; import java.time.Instant;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
@ -299,4 +302,236 @@ class DriverTimelineReusableProjectionBuilderTest {
.extracting("eventId", "eventDomain") .extracting("eventId", "eventDomain")
.containsExactly("SUP-1", "POSITION"); .containsExactly("SUP-1", "POSITION");
} }
@Test
void derivesDrivingProjectionFromRawPointEvents() {
DriverExtractionSession driver = new DriverExtractionSession(
"12:123",
null,
null,
List.of(),
List.of(),
List.of(
new ExtractedCardVehicleUsageInterval(
"CVU-1",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
100L,
200L,
"12:REG-1",
"VIN-1",
"vu-1"
),
new ExtractedCardVehicleUsageInterval(
"CVU-2",
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
OffsetDateTime.parse("2026-05-01T12:00:00Z"),
200L,
250L,
"12:REG-1",
"VIN-1",
"vu-2"
),
new ExtractedCardVehicleUsageInterval(
"CVU-3",
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T02:00:00Z"),
251L,
300L,
"12:REG-2",
"VIN-2",
"vu-3"
)
),
List.of(
new ExtractedCardActivityInterval(
"ACT-1",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"a"
),
new ExtractedCardActivityInterval(
"ACT-2",
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:30:00Z"),
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-2",
"VIN-2",
"b"
)
),
List.of(),
List.of()
);
TachographFileSession session = new TachographFileSession(
UUID.randomUUID(),
new TachographFileSessionMetadata("default", "legalrequirements-drivercard", "sample", "sample.ddd", "a", 2, "42", "b", true, null),
Map.of(driver.driverKey(), driver),
new ExtractionStats(1, 2, 3, 1, 1, 0),
List.of(),
Instant.now(),
Instant.now().plus(4, ChronoUnit.HOURS)
);
IntervalBackedDriverTimelineEventBuilder intervalEventBuilder = new IntervalBackedDriverTimelineEventBuilder(
legacyBuilder,
new DriverKeyFactory(),
new VehicleKeyFactory(),
new EventDetailsFactory(new ObjectMapper())
);
RawSourceDriverTimelineEventBuilder rawSourceEventBuilder = new RawSourceDriverTimelineEventBuilder(intervalEventBuilder);
TachographTimelineEventBundle rawPointEventBundle = rawSourceEventBuilder.buildRawEventBundle(session, driver);
assertThat(rawPointEventBundle.activityEvents()).hasSize(4);
assertThat(rawPointEventBundle.vehicleUsageEvents()).hasSize(6);
ResolvedDriverTimeline javaResolvedTimeline = legacyBuilder.build(session, driver);
TachographEsperDrivingDerivedProjectionBundle intervalBundle =
reusableBuilder.buildEsperDrivingDerivedProjectionBundle(
session.sessionId(),
driver.driverKey(),
javaResolvedTimeline,
3,
720
);
TachographEsperDrivingDerivedProjectionBundle eventBundle =
reusableBuilder.buildEsperDrivingDerivedProjectionBundleFromEvents(
session.sessionId(),
driver.driverKey(),
rawPointEventBundle.allEvents(),
3,
720
);
assertThat(eventBundle.drivingInterruptionIntervals())
.containsExactlyElementsOf(intervalBundle.drivingInterruptionIntervals());
assertThat(eventBundle.dailyWeeklyRestCandidateIntervals())
.containsExactlyElementsOf(intervalBundle.dailyWeeklyRestCandidateIntervals());
assertThat(eventBundle.drivingInterruptionVehicleChangeIntervals())
.containsExactlyElementsOf(intervalBundle.drivingInterruptionVehicleChangeIntervals());
assertThat(eventBundle.vuCardAbsentIntervals()).hasSize(1);
assertThat(eventBundle.vuCardAbsentIntervals().get(0).startedAt())
.isEqualTo(OffsetDateTime.parse("2026-05-01T12:00:01Z"));
assertThat(eventBundle.vuCardAbsentIntervals().get(0).endedAt())
.isEqualTo(OffsetDateTime.parse("2026-05-02T00:00:00Z"));
}
@Test
void eventModeMergesVehicleUsageIntervalsAcrossMidnightContinuation() {
DriverExtractionSession driver = new DriverExtractionSession(
"12:123",
null,
null,
List.of(),
List.of(),
List.of(
new ExtractedCardVehicleUsageInterval(
"CVU-1",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T23:59:59Z"),
100L,
240L,
"12:REG-1",
"VIN-1",
"vu-1"
),
new ExtractedCardVehicleUsageInterval(
"CVU-2",
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T02:00:00Z"),
240L,
280L,
"12:REG-1",
"VIN-1",
"vu-2"
)
),
List.of(
new ExtractedCardActivityInterval(
"ACT-1",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"a"
),
new ExtractedCardActivityInterval(
"ACT-2",
OffsetDateTime.parse("2026-05-02T00:30:00Z"),
OffsetDateTime.parse("2026-05-02T01:00:00Z"),
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"b"
)
),
List.of(),
List.of()
);
TachographFileSession session = new TachographFileSession(
UUID.randomUUID(),
new TachographFileSessionMetadata("default", "legalrequirements-drivercard", "sample", "sample.ddd", "a", 2, "42", "b", true, null),
Map.of(driver.driverKey(), driver),
new ExtractionStats(1, 2, 2, 1, 1, 0),
List.of(),
Instant.now(),
Instant.now().plus(4, ChronoUnit.HOURS)
);
IntervalBackedDriverTimelineEventBuilder intervalEventBuilder = new IntervalBackedDriverTimelineEventBuilder(
legacyBuilder,
new DriverKeyFactory(),
new VehicleKeyFactory(),
new EventDetailsFactory(new ObjectMapper())
);
RawSourceDriverTimelineEventBuilder rawSourceEventBuilder = new RawSourceDriverTimelineEventBuilder(intervalEventBuilder);
TachographTimelineEventBundle rawPointEventBundle = rawSourceEventBuilder.buildRawEventBundle(session, driver);
ResolvedDriverTimeline javaResolvedTimeline = legacyBuilder.build(session, driver);
TachographEsperDrivingDerivedProjectionBundle intervalBundle =
reusableBuilder.buildEsperDrivingDerivedProjectionBundle(
session.sessionId(),
driver.driverKey(),
javaResolvedTimeline,
3,
720
);
TachographEsperDrivingDerivedProjectionBundle eventBundle =
reusableBuilder.buildEsperDrivingDerivedProjectionBundleFromEvents(
session.sessionId(),
driver.driverKey(),
rawPointEventBundle.allEvents(),
3,
720
);
assertThat(javaResolvedTimeline.vehicleUsageIntervals()).hasSize(1);
assertThat(javaResolvedTimeline.vehicleUsageIntervals().get(0).from())
.isEqualTo(OffsetDateTime.parse("2026-05-01T08:00:00Z"));
assertThat(javaResolvedTimeline.vehicleUsageIntervals().get(0).to())
.isEqualTo(OffsetDateTime.parse("2026-05-02T02:00:00Z"));
assertThat(eventBundle.vuCardAbsentIntervals()).isEmpty();
assertThat(eventBundle.dailyWeeklyRestCandidateCoverageIntervals())
.containsExactlyElementsOf(intervalBundle.dailyWeeklyRestCandidateCoverageIntervals());
assertThat(eventBundle.potentialInVehicleOvernightStayIntervals())
.containsExactlyElementsOf(intervalBundle.potentialInVehicleOvernightStayIntervals());
}
} }