Propagate interval metadata through runtime pairing

This commit is contained in:
trifonovt 2026-06-05 12:10:55 +02:00
parent 6ba2df1a61
commit ff12953e05
12 changed files with 81 additions and 50 deletions

View File

@ -13,6 +13,7 @@ import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.service.RuntimeDriverVehicleEvidenceAttachmentService;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.TachographRuntimeIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
@ -90,7 +91,7 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
);
for (EventHubEventDto attachedEvent : attachmentResult.attachedVehicleEvidenceEvents()) {
attachedVehicleEvidenceByEvent
.computeIfAbsent(dedupKey(attachedEvent), ignored -> new ArrayList<>())
.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(attachedEvent), ignored -> new ArrayList<>())
.add(driverKey);
}
RuntimeDriverPartitionDebugDto partitionDebug = includePartitionDebug ? attachmentResult.toPartitionDebug() : null;
@ -283,13 +284,6 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
return text == null || text.isBlank() ? null : text.trim();
}
private String dedupKey(EventHubEventDto event) {
String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null
? event.packageInfo().eventSource().stableKey()
: "NO_SOURCE";
return sourceKey + "|" + event.externalSourceEventId();
}
private boolean booleanAttribute(RuntimeProcessingModuleContext context, String key, boolean fallback) {
Object value = context.attributes().get(key);
if (value instanceof Boolean booleanValue) {

View File

@ -8,6 +8,7 @@ import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeMod
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleContext;
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleResult;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.TachographRuntimeIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.OffsetDateTime;
@ -59,6 +60,7 @@ public final class DriverWorkingTimeEplEventMapper {
definition.put("driverKey", String.class);
definition.put("eventId", String.class);
definition.put("intervalId", String.class);
definition.put("runtimeIntervalKey", String.class);
definition.put("sourceRowId", String.class);
definition.put("sourceRowIds", java.util.List.class);
definition.put("activityType", String.class);
@ -109,6 +111,7 @@ public final class DriverWorkingTimeEplEventMapper {
definition.put("driverKey", String.class);
definition.put("eventId", String.class);
definition.put("intervalId", String.class);
definition.put("runtimeIntervalKey", String.class);
definition.put("sourceRowId", String.class);
definition.put("sourceRowIds", java.util.List.class);
definition.put("lifecycle", String.class);
@ -151,9 +154,10 @@ public final class DriverWorkingTimeEplEventMapper {
}
JsonNode raw = rawPayload(sourceEvent);
JsonNode attributes = attributes(sourceEvent);
String intervalId = firstNonBlank(text(raw, "intervalId"), text(raw, "sourceRowId"), sourceEvent.externalSourceEventId());
String intervalId = RuntimeEventIdentityResolver.presentationIntervalId(sourceEvent);
String runtimeIntervalKey = RuntimeEventIdentityResolver.runtimeIntervalKey(sourceEvent);
String driverKey = TachographRuntimeIdentityResolver.driverKey(sourceEvent);
if (driverKey == null || intervalId == null) {
if (driverKey == null || intervalId == null || runtimeIntervalKey == null) {
return null;
}
Map<String, Object> event = new LinkedHashMap<>();
@ -161,6 +165,7 @@ public final class DriverWorkingTimeEplEventMapper {
event.put("driverKey", driverKey);
event.put("eventId", sourceEvent.externalSourceEventId());
event.put("intervalId", intervalId);
event.put("runtimeIntervalKey", runtimeIntervalKey);
event.put("sourceRowId", firstNonBlank(text(raw, "sourceRowId"), intervalId));
event.put("sourceRowIds", stringList(raw, "sourceRowIds", intervalId));
event.put("activityType", firstNonBlank(text(raw, "activityType"), eventTypeAsActivity(sourceEvent.eventType())));
@ -192,9 +197,10 @@ public final class DriverWorkingTimeEplEventMapper {
return null;
}
JsonNode raw = rawPayload(sourceEvent);
String intervalId = firstNonBlank(text(raw, "intervalId"), text(raw, "sourceRowId"), sourceEvent.externalSourceEventId());
String intervalId = RuntimeEventIdentityResolver.presentationIntervalId(sourceEvent);
String runtimeIntervalKey = RuntimeEventIdentityResolver.runtimeIntervalKey(sourceEvent);
String driverKey = TachographRuntimeIdentityResolver.driverKey(sourceEvent);
if (driverKey == null || intervalId == null) {
if (driverKey == null || intervalId == null || runtimeIntervalKey == null) {
return null;
}
Map<String, Object> event = new LinkedHashMap<>();
@ -202,6 +208,7 @@ public final class DriverWorkingTimeEplEventMapper {
event.put("driverKey", driverKey);
event.put("eventId", sourceEvent.externalSourceEventId());
event.put("intervalId", intervalId);
event.put("runtimeIntervalKey", runtimeIntervalKey);
event.put("sourceRowId", firstNonBlank(text(raw, "sourceRowId"), intervalId));
event.put("sourceRowIds", stringList(raw, "sourceRowIds", intervalId));
event.put("lifecycle", sourceEvent.lifecycle().name());
@ -219,7 +226,7 @@ public final class DriverWorkingTimeEplEventMapper {
.comparing((Map<String, Object> event) -> (Long) event.get("occurredAtEpochSecond"))
.thenComparing(event -> lifecycleOrder(Objects.toString(event.get("lifecycle"), "")))
.thenComparing(event -> Objects.toString(event.get("driverKey"), ""))
.thenComparing(event -> Objects.toString(event.get("intervalId"), ""))
.thenComparing(event -> Objects.toString(event.get("runtimeIntervalKey"), ""))
.thenComparing(event -> Objects.toString(event.get("eventId"), ""));
}

View File

@ -8,6 +8,7 @@ import at.procon.eventhub.processing.dto.RuntimeVehicleUsageIntervalDebugDto;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventScopeClassifier;
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventScopeType;
import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.TachographRuntimeIdentityResolver;
import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline;
import at.procon.eventhub.tachographfilesession.model.ResolvedVehicleUsageInterval;
@ -220,7 +221,7 @@ public class RuntimeDriverVehicleEvidenceAttachmentService {
return new RuntimeVehicleEvidenceAttachmentDecisionDto(
decision,
reason,
dedupKey(event),
RuntimeEventIdentityResolver.canonicalEventKey(event),
event == null ? null : event.externalSourceEventId(),
event == null ? null : event.occurredAt(),
event == null || event.eventDomain() == null ? null : event.eventDomain().name(),
@ -460,17 +461,10 @@ public class RuntimeDriverVehicleEvidenceAttachmentService {
private void appendDeduplicated(LinkedHashMap<String, EventHubEventDto> byKey, List<EventHubEventDto> events) {
for (EventHubEventDto event : events == null ? List.<EventHubEventDto>of() : events) {
byKey.putIfAbsent(dedupKey(event), event);
byKey.putIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), event);
}
}
private String dedupKey(EventHubEventDto event) {
String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null
? event.packageInfo().eventSource().stableKey()
: "NO_SOURCE";
return sourceKey + "|" + event.externalSourceEventId();
}
private List<EventHubEventDto> sort(List<EventHubEventDto> events) {
return (events == null ? List.<EventHubEventDto>of() : events).stream()
.sorted(Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))

View File

@ -10,6 +10,7 @@ import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.TachographRuntimeIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
@ -66,7 +67,7 @@ public class RuntimeDriverWorkingTimeScopeProcessingService {
}
for (EventHubEventDto attachedEvent : driverBundle.expandedVehicleEvents()) {
attachedVehicleEvidenceByEvent
.computeIfAbsent(dedupKey(attachedEvent), ignored -> new ArrayList<>())
.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(attachedEvent), ignored -> new ArrayList<>())
.add(driverKey);
}
driverBundle.notes().stream()
@ -260,17 +261,10 @@ public class RuntimeDriverWorkingTimeScopeProcessingService {
private void appendDeduplicated(LinkedHashMap<String, EventHubEventDto> byKey, List<EventHubEventDto> events) {
for (EventHubEventDto event : events) {
byKey.putIfAbsent(dedupKey(event), event);
byKey.putIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), event);
}
}
private String dedupKey(EventHubEventDto event) {
String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null
? event.packageInfo().eventSource().stableKey()
: "NO_SOURCE";
return sourceKey + "|" + event.externalSourceEventId();
}
private List<EventHubEventDto> sort(List<EventHubEventDto> events) {
return (events == null ? List.<EventHubEventDto>of() : events).stream()
.sorted(Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))

View File

@ -3,6 +3,7 @@ package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
import java.time.OffsetDateTime;
@ -83,13 +84,7 @@ final class RuntimeIntervalEventWindowSelector {
}
private static String intervalKey(EventHubEventDto event) {
JsonNode raw = raw(event);
String intervalId = text(raw, "intervalId");
if (intervalId != null) {
return intervalId;
}
String sourceRowId = text(raw, "sourceRowId");
return sourceRowId != null ? sourceRowId : event.externalSourceEventId();
return RuntimeEventIdentityResolver.runtimeIntervalKey(event);
}
private static JsonNode raw(EventHubEventDto event) {

View File

@ -4,6 +4,7 @@ import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.TachographRuntimeIdentityResolver;
import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent;
import at.procon.eventhub.tachographfilesession.model.ExtractionWarning;
@ -68,12 +69,13 @@ public class UnifiedEventTimelineReconstructor {
continue;
}
JsonNode raw = raw(event);
String intervalId = firstNonBlank(text(raw, "intervalId"), text(raw, "sourceRowId"), event.externalSourceEventId());
if (intervalId == null) {
String runtimeIntervalKey = RuntimeEventIdentityResolver.runtimeIntervalKey(event);
String intervalId = RuntimeEventIdentityResolver.presentationIntervalId(event);
if (runtimeIntervalKey == null || intervalId == null) {
continue;
}
ActivityAccumulator accumulator = byIntervalId.computeIfAbsent(
intervalId,
runtimeIntervalKey,
ignored -> new ActivityAccumulator(intervalId)
);
accumulator.accept(event, raw);
@ -101,12 +103,13 @@ public class UnifiedEventTimelineReconstructor {
continue;
}
JsonNode raw = raw(event);
String intervalId = firstNonBlank(text(raw, "intervalId"), text(raw, "sourceRowId"), event.externalSourceEventId());
if (intervalId == null) {
String runtimeIntervalKey = RuntimeEventIdentityResolver.runtimeIntervalKey(event);
String intervalId = RuntimeEventIdentityResolver.presentationIntervalId(event);
if (runtimeIntervalKey == null || intervalId == null) {
continue;
}
VehicleUsageAccumulator accumulator = byIntervalId.computeIfAbsent(
intervalId,
runtimeIntervalKey,
ignored -> new VehicleUsageAccumulator(sessionId, driverKey, intervalId)
);
accumulator.accept(event, raw);

View File

@ -10,6 +10,10 @@ import at.procon.eventhub.importing.extraction.ExtractionContext;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -34,6 +38,8 @@ final class TachographRawPayloadSupport {
put(raw, "sourceRowIds", List.of(sourceRowId));
put(raw, "intervalId", intervalId(context, sourceRowId));
}
put(raw, "startedAt", offsetDateTimeText(rs, "interval_started_at"));
put(raw, "endedAt", offsetDateTimeText(rs, "interval_ended_at"));
put(raw, "sourceKind", context == null || context.planItem() == null ? null : context.planItem().sourceKind());
put(raw, "extractionCode", context == null || context.planItem() == null ? null : context.planItem().extractionCode());
put(raw, "level", "RAW_INTERVAL");
@ -135,6 +141,36 @@ final class TachographRawPayloadSupport {
return Long.parseLong(value.toString());
}
private static String offsetDateTimeText(ResultSet rs, String column) throws SQLException {
Object value;
try {
value = rs.getObject(column);
} catch (SQLException ex) {
if (missingColumn(ex)) {
return null;
}
throw ex;
}
if (value == null) {
return null;
}
if (value instanceof OffsetDateTime offsetDateTime) {
return offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC).toString();
}
if (value instanceof Timestamp timestamp) {
return timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC).toString();
}
if (value instanceof LocalDateTime localDateTime) {
return localDateTime.atOffset(ZoneOffset.UTC).toString();
}
String text = value.toString();
try {
return OffsetDateTime.parse(text).withOffsetSameInstant(ZoneOffset.UTC).toString();
} catch (RuntimeException ignored) {
return LocalDateTime.parse(text).atOffset(ZoneOffset.UTC).toString();
}
}
private static boolean missingColumn(SQLException ex) {
String state = ex.getSQLState();
String message = ex.getMessage();

View File

@ -27,7 +27,7 @@ create schema DriverActivityIntervalEvent(
level string
);
create window OpenDriverActivityPoint#unique(driverKey, intervalId) as DriverActivityPointEvent;
create window OpenDriverActivityPoint#unique(driverKey, runtimeIntervalKey) as DriverActivityPointEvent;
insert into OpenDriverActivityPoint
select *
from DriverActivityPointEvent(lifecycle = 'START');
@ -59,14 +59,14 @@ select
startEvent.level as level
from OpenDriverActivityPoint as startEvent
where startEvent.driverKey = endEvent.driverKey
and startEvent.intervalId = endEvent.intervalId
and startEvent.runtimeIntervalKey = endEvent.runtimeIntervalKey
and endEvent.occurredAtEpochSecond > startEvent.occurredAtEpochSecond;
@Priority(10)
on DriverActivityPointEvent(lifecycle = 'END') as endEvent
delete from OpenDriverActivityPoint as openEvent
where openEvent.driverKey = endEvent.driverKey
and openEvent.intervalId = endEvent.intervalId;
and openEvent.runtimeIntervalKey = endEvent.runtimeIntervalKey;
@name('driverActivityIntervals')
select *

View File

@ -22,7 +22,7 @@ create schema DriverVehicleUsageIntervalEvent(
sourceIntervalIds java.util.List
);
create window OpenDriverVehicleUsagePoint#unique(driverKey, intervalId) as DriverVehicleUsagePointEvent;
create window OpenDriverVehicleUsagePoint#unique(driverKey, runtimeIntervalKey) as DriverVehicleUsagePointEvent;
insert into OpenDriverVehicleUsagePoint
select *
@ -50,14 +50,14 @@ select
insertEvent.sourceRowIds as sourceIntervalIds
from OpenDriverVehicleUsagePoint as insertEvent
where insertEvent.driverKey = withdrawEvent.driverKey
and insertEvent.intervalId = withdrawEvent.intervalId
and insertEvent.runtimeIntervalKey = withdrawEvent.runtimeIntervalKey
and withdrawEvent.occurredAtEpochSecond >= insertEvent.occurredAtEpochSecond;
@Priority(10)
on DriverVehicleUsagePointEvent(lifecycle = 'WITHDRAW') as withdrawEvent
delete from OpenDriverVehicleUsagePoint as openEvent
where openEvent.driverKey = withdrawEvent.driverKey
and openEvent.intervalId = withdrawEvent.intervalId;
and openEvent.runtimeIntervalKey = withdrawEvent.runtimeIntervalKey;
@name('driverVehicleUsageIntervals')
select *

View File

@ -105,6 +105,8 @@ select
concat('TACHOGRAPH:CARD_ACTIVITY:', base.ID, ':', evt.lifecycle) as external_source_event_id,
evt.occurred_at as occurred_at,
base.BeginTime as interval_started_at,
base.EndTime as interval_ended_at,
base.received_partner_at,
base.Activity as activity_code,
case upper(coalesce(base.Activity, ''))

View File

@ -61,6 +61,8 @@ Base as (
used.vehicle_vin,
used.vehicle_registration_nation,
used.vehicle_registration_number,
used.FirstUse,
used.LastUse,
evt.lifecycle,
evt.occurred_at,
evt.odometer_m,
@ -87,6 +89,8 @@ select
concat('TACHOGRAPH:CARD_VEHICLES_USED:', base.ID, ':', base.lifecycle) as external_source_event_id,
base.occurred_at,
case when base.lifecycle = 'INSERT' then base.occurred_at else base.FirstUse end as interval_started_at,
case when base.lifecycle = 'WITHDRAW' then base.occurred_at else base.LastUse end as interval_ended_at,
base.received_partner_at,
case base.lifecycle
when 'INSERT' then 'CARD_INSERTED'

View File

@ -131,6 +131,8 @@ select
concat('TACHOGRAPH:VU_ACTIVITY:', base.ID, ':', evt.lifecycle) as external_source_event_id,
evt.occurred_at as occurred_at,
base.BeginTime as interval_started_at,
base.EndTime as interval_ended_at,
base.received_partner_at,
base.Activity as activity_code,
case upper(coalesce(base.Activity, ''))