Add configurable Esper activity pipeline and driver identity model

This commit is contained in:
trifonovt 2026-05-06 09:02:38 +02:00
parent 094007d817
commit 818009555a
25 changed files with 2197 additions and 179 deletions

View File

@ -1,5 +1,7 @@
package at.procon.eventhub.config;
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
@ -26,6 +28,7 @@ public class EventHubProperties {
private final Batch batch = new Batch();
private final Tachograph tachograph = new Tachograph();
private final EsperPoc esperPoc = new EsperPoc();
private final YellowFox yellowFox = new YellowFox();
public Batch getBatch() {
@ -36,10 +39,35 @@ public class EventHubProperties {
return tachograph;
}
public EsperPoc getEsperPoc() {
return esperPoc;
}
public YellowFox getYellowFox() {
return yellowFox;
}
public static class EsperPoc {
private EsperActivityMergeMode activityMergeMode = EsperActivityMergeMode.JAVA;
private EsperShiftResolutionMode shiftResolutionMode = EsperShiftResolutionMode.JAVA;
public EsperActivityMergeMode getActivityMergeMode() {
return activityMergeMode;
}
public void setActivityMergeMode(EsperActivityMergeMode activityMergeMode) {
this.activityMergeMode = activityMergeMode == null ? EsperActivityMergeMode.JAVA : activityMergeMode;
}
public EsperShiftResolutionMode getShiftResolutionMode() {
return shiftResolutionMode;
}
public void setShiftResolutionMode(EsperShiftResolutionMode shiftResolutionMode) {
this.shiftResolutionMode = shiftResolutionMode == null ? EsperShiftResolutionMode.JAVA : shiftResolutionMode;
}
}
public static class Batch {
/** Number of events collected before a package is persisted. */
private int completionSize = 5000;

View File

@ -1,7 +1,9 @@
package at.procon.eventhub.esperpoc.api;
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
import at.procon.eventhub.esperpoc.dto.EsperPocResultDto;
import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode;
import at.procon.eventhub.esperpoc.service.EsperPocDriverCardActivityService;
import java.time.OffsetDateTime;
import java.util.UUID;
@ -31,7 +33,11 @@ public class EsperPocController {
@RequestParam(defaultValue = "24") Integer guardHours,
@RequestParam(defaultValue = "3") Integer significantDrivingMinutes,
@RequestParam(defaultValue = "60") Integer mergeGapSeconds,
@RequestParam(defaultValue = "7") Integer operatingPeriodSplitRestHours
@RequestParam(defaultValue = "7") Integer operatingPeriodSplitRestHours,
@RequestParam(defaultValue = "420") Integer shiftEndMarkPeriodMinutes,
@RequestParam(defaultValue = "5") Integer absenceBeginEndMinActivityMinutes,
@RequestParam(required = false) EsperActivityMergeMode activityMergeMode,
@RequestParam(required = false) EsperShiftResolutionMode shiftResolutionMode
) {
EsperPocRequest request = new EsperPocRequest(
tenantKey,
@ -41,7 +47,11 @@ public class EsperPocController {
guardHours,
significantDrivingMinutes,
mergeGapSeconds,
operatingPeriodSplitRestHours
operatingPeriodSplitRestHours,
shiftEndMarkPeriodMinutes,
absenceBeginEndMinActivityMinutes,
activityMergeMode,
shiftResolutionMode
);
return ResponseEntity.ok(service.evaluate(request));
}

View File

@ -11,6 +11,9 @@ public record ActivityIntervalDto(
UUID vehicleRegistrationId,
String activityType,
String cardSlot,
String cardStatus,
String drivingStatus,
String sourceKind,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
@ -25,6 +28,9 @@ public record ActivityIntervalDto(
UUID vehicleRegistrationId,
String activityType,
String cardSlot,
String cardStatus,
String drivingStatus,
String sourceKind,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
String sourceRowId
@ -35,6 +41,9 @@ public record ActivityIntervalDto(
vehicleRegistrationId,
activityType,
cardSlot,
cardStatus,
drivingStatus,
sourceKind,
startedAt,
endedAt,
Duration.between(startedAt, endedAt).getSeconds(),
@ -52,6 +61,9 @@ public record ActivityIntervalDto(
vehicleRegistrationId,
activityType,
cardSlot,
cardStatus,
drivingStatus,
sourceKind,
newStartedAt,
newEndedAt,
Duration.between(newStartedAt, newEndedAt).getSeconds(),
@ -69,6 +81,9 @@ public record ActivityIntervalDto(
vehicleRegistrationId,
activityType,
cardSlot,
cardStatus,
drivingStatus,
sourceKind,
startedAt,
endedAt,
durationSeconds,

View File

@ -0,0 +1,6 @@
package at.procon.eventhub.esperpoc.dto;
public enum EsperActivityMergeMode {
JAVA,
ESPER
}

View File

@ -13,7 +13,11 @@ public record EsperPocRequest(
Integer guardHours,
Integer significantDrivingMinutes,
Integer mergeGapSeconds,
Integer operatingPeriodSplitRestHours
Integer operatingPeriodSplitRestHours,
Integer shiftEndMarkPeriodMinutes,
Integer absenceBeginEndMinActivityMinutes,
EsperActivityMergeMode activityMergeMode,
EsperShiftResolutionMode shiftResolutionMode
) {
public EsperPocRequest {
if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) {
@ -23,5 +27,7 @@ public record EsperPocRequest(
significantDrivingMinutes = significantDrivingMinutes == null ? 3 : Math.max(1, significantDrivingMinutes);
mergeGapSeconds = mergeGapSeconds == null ? 60 : Math.max(0, mergeGapSeconds);
operatingPeriodSplitRestHours = operatingPeriodSplitRestHours == null ? 7 : Math.max(1, operatingPeriodSplitRestHours);
shiftEndMarkPeriodMinutes = shiftEndMarkPeriodMinutes == null ? 420 : Math.max(1, shiftEndMarkPeriodMinutes);
absenceBeginEndMinActivityMinutes = absenceBeginEndMinActivityMinutes == null ? 5 : Math.max(1, absenceBeginEndMinActivityMinutes);
}
}

View File

@ -12,14 +12,24 @@ public record EsperPocResultDto(
OffsetDateTime loadedFrom,
OffsetDateTime loadedTo,
int rawEventCount,
int driverCardRawEventCount,
int vehicleUnitRawEventCount,
int driverCardRawIntervalCount,
int vehicleUnitRawIntervalCount,
int rawIntervalCount,
int mergedActivityCount,
int operatingTimePeriodCount,
int resolvedWorkShiftCount,
int operatingPeriodSplitRestHours,
int shiftEndMarkPeriodMinutes,
int absenceBeginEndMinActivityMinutes,
EsperActivityMergeMode activityMergeMode,
EsperShiftResolutionMode shiftResolutionMode,
List<RawActivityEventDto> raw,
List<ActivityIntervalDto> rawIntervals,
List<ActivityIntervalDto> activities,
List<OperatingTimePeriodDto> operatingTimePeriods,
List<ResolvedWorkShiftDto> workingShifts,
DriverWorkSummaryDto workResultPerDriver,
DriverWorkSummaryDto workingOperationTimesPerEmployee,
ShiftDrivingEvaluationDto drivingTimeInterruptionEvaluation,

View File

@ -0,0 +1,6 @@
package at.procon.eventhub.esperpoc.dto;
public enum EsperShiftResolutionMode {
JAVA,
ESPER
}

View File

@ -8,6 +8,8 @@ public record RawActivityEventDto(
OffsetDateTime occurredAt,
String sourceRowId,
String externalSourceEventId,
String sourceKind,
String extractionCode,
UUID driverEntityId,
UUID vehicleId,
UUID vehicleRegistrationId,

View File

@ -0,0 +1,40 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
public record ResolvedWorkShiftDto(
int sequenceNumber,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
OffsetDateTime nextShiftStartedAt,
long durationSeconds,
long dailyRestingTimeSeconds,
OffsetDateTime drivingFrom,
OffsetDateTime drivingTo,
OffsetDateTime absenceFrom,
OffsetDateTime absenceTo,
List<UUID> vehicleIds,
List<UUID> vehicleRegistrationIds,
String usedDataKind,
String shiftKind,
boolean beginTimeDeviationInRange,
boolean endTimeDeviationInRange,
boolean noDrivingActivity,
boolean dailyRestingTimeBeforeShiftOver24Hours,
boolean durationOver24Hours,
boolean dailyRestingTimeBetween9And11Hours,
boolean dailyRestingTimeBetween7And9Hours,
int activitiesCount,
int drivingCount,
int availabilityCount,
int workCount,
int breakRestCount,
int unknownCount,
int workingTimeCount,
DriverWorkSummaryDto workingOperationTimes,
ShiftDrivingEvaluationDto drivingTimeInterruptionEvaluation,
List<ActivityIntervalDto> activities
) {
}

View File

@ -16,7 +16,7 @@ public class EsperPocActivityRepository {
this.jdbcTemplate = jdbcTemplate;
}
public List<RawActivityEventDto> findDriverCardActivityEvents(
public List<RawActivityEventDto> findDriverActivityEvents(
String tenantKey,
UUID driverEntityId,
OffsetDateTime occurredFrom,
@ -32,7 +32,14 @@ public class EsperPocActivityRepository {
regexp_replace(event.external_source_event_id, ':(START|END)$', '')
) as source_row_id,
event.external_source_event_id,
event.driver_entity_id,
source.source_kind,
coalesce(pkg.extraction_code,
case
when source.source_kind = 'VEHICLE_UNIT' then 'VU_ACTIVITY'
else 'CARD_ACTIVITY'
end
) as extraction_code,
event.driver_id,
event.vehicle_id,
event.vehicle_registration_id,
event.event_type,
@ -49,9 +56,12 @@ public class EsperPocActivityRepository {
and detail.detail_type = 'DRIVER_ACTIVITY'
where pkg.tenant_key = ?
and source.provider_key = 'TACHOGRAPH'
and source.source_kind = 'DRIVER_CARD'
and coalesce(pkg.extraction_code, 'CARD_ACTIVITY') = 'CARD_ACTIVITY'
and event.driver_entity_id = ?
and (
(source.source_kind = 'DRIVER_CARD' and coalesce(pkg.extraction_code, 'CARD_ACTIVITY') = 'CARD_ACTIVITY')
or
(source.source_kind = 'VEHICLE_UNIT' and coalesce(pkg.extraction_code, 'VU_ACTIVITY') = 'VU_ACTIVITY')
)
and event.driver_id = ?
and event.occurred_at >= ?
and event.occurred_at < ?
and event.event_domain = 'DRIVER_ACTIVITY'
@ -64,7 +74,9 @@ public class EsperPocActivityRepository {
rs.getObject("occurred_at", OffsetDateTime.class),
rs.getString("source_row_id"),
rs.getString("external_source_event_id"),
(UUID) rs.getObject("driver_entity_id"),
rs.getString("source_kind"),
rs.getString("extraction_code"),
(UUID) rs.getObject("driver_id"),
(UUID) rs.getObject("vehicle_id"),
(UUID) rs.getObject("vehicle_registration_id"),
rs.getString("event_type"),

View File

@ -0,0 +1,24 @@
package at.procon.eventhub.esperpoc.service;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
public record EsperActivityIntervalEvent(
UUID driverEntityId,
UUID vehicleId,
UUID vehicleRegistrationId,
String activityType,
String cardSlot,
String cardStatus,
String drivingStatus,
String sourceKind,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
String sourceRowId,
List<String> sourceRowIds,
boolean clippedToRequestedPeriod,
String level
) {
}

View File

@ -0,0 +1,54 @@
package at.procon.eventhub.esperpoc.service;
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
import java.time.Duration;
import java.time.LocalDate;
import java.util.Objects;
import java.util.Set;
final class EsperActivitySemantics {
private static final Set<String> ACTIVE_ACTIVITY_TYPES = Set.of("DRIVE", "WORK", "AVAILABILITY");
private EsperActivitySemantics() {
}
static boolean canMerge(ActivityIntervalDto left, ActivityIntervalDto right, Duration tolerance) {
boolean sameDriver = Objects.equals(left.driverEntityId(), right.driverEntityId());
boolean sameActivity = Objects.equals(left.activityType(), right.activityType());
boolean sameSlot = Objects.equals(left.cardSlot(), right.cardSlot());
boolean sameCardStatus = Objects.equals(left.cardStatus(), right.cardStatus());
boolean sameDrivingStatus = Objects.equals(left.drivingStatus(), right.drivingStatus());
boolean sameSourceKind = Objects.equals(left.sourceKind(), right.sourceKind());
long gapSeconds = Duration.between(left.endedAt(), right.startedAt()).getSeconds();
boolean adjacentOrOverlapping = gapSeconds <= tolerance.getSeconds();
return sameDriver && sameActivity && sameSlot && sameCardStatus && sameDrivingStatus && sameSourceKind && adjacentOrOverlapping;
}
static boolean isShiftActivity(ActivityIntervalDto interval) {
return isKnownActivity(interval) && ACTIVE_ACTIVITY_TYPES.contains(interval.activityType());
}
static boolean isRestOrUnknown(ActivityIntervalDto interval) {
return ("BREAK_REST".equals(interval.activityType()) && isKnownActivity(interval))
|| isUnknownActivity(interval);
}
static boolean isKnownActivity(ActivityIntervalDto interval) {
return interval.cardStatus() == null
|| !"NOT_INSERTED".equals(interval.cardStatus())
|| "KNOWN".equals(interval.drivingStatus());
}
static boolean isUnknownActivity(ActivityIntervalDto interval) {
if ("UNKNOWN_ACTIVITY".equals(interval.activityType())) {
return true;
}
return "UNKNOWN".equals(interval.drivingStatus())
|| ("NOT_INSERTED".equals(interval.cardStatus()) && !"KNOWN".equals(interval.drivingStatus()));
}
static LocalDate recordDate(ActivityIntervalDto interval) {
return interval.startedAt().toLocalDate();
}
}

View File

@ -12,12 +12,14 @@ import com.espertech.esper.runtime.client.EPDeployException;
import com.espertech.esper.runtime.client.EPDeployment;
import com.espertech.esper.runtime.client.EPRuntime;
import com.espertech.esper.runtime.client.EPRuntimeProvider;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.springframework.stereotype.Component;
@Component
@ -25,7 +27,7 @@ public class EsperDriverActivityEngine {
private static final AtomicLong RUNTIME_COUNTER = new AtomicLong();
private static final String EPL = """
private static final String INTERVAL_EPL = """
@name('driverCardActivityIntervals')
select
s.driverEntityId as driverEntityId,
@ -33,6 +35,9 @@ public class EsperDriverActivityEngine {
s.vehicleRegistrationId as vehicleRegistrationId,
s.eventType as activityType,
s.cardSlot as cardSlot,
s.cardStatus as cardStatus,
s.drivingStatus as drivingStatus,
s.sourceKind as sourceKind,
s.occurredAt as startedAt,
e.occurredAt as endedAt,
s.sourceRowId as sourceRowId
@ -48,26 +53,23 @@ public class EsperDriverActivityEngine {
]
""";
private static final String INTERVAL_STREAM_EPL = """
@name('driverActivityIntervalStream')
select * from DriverActivityInterval
""";
public List<ActivityIntervalDto> buildIntervals(List<RawActivityEventDto> rawEvents) {
if (rawEvents == null || rawEvents.isEmpty()) {
return List.of();
}
List<ActivityIntervalDto> intervals = new ArrayList<>();
EPRuntime runtime = null;
try {
Configuration configuration = new Configuration();
configuration.getCommon().addEventType("RawDriverActivityPoint", EsperRawDriverActivityPoint.class);
String runtimeUri = "eventhub-esper-poc-" + RUNTIME_COUNTER.incrementAndGet();
runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration);
CompilerArguments arguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(EPL, arguments);
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
runtime.getDeploymentService()
.getStatement(deployment.getDeploymentId(), "driverCardActivityIntervals")
.addListener((newData, oldData, statement, rt) -> collectIntervals(newData, intervals));
executeWithRuntime(
configuration -> configuration.getCommon().addEventType("RawDriverActivityPoint", EsperRawDriverActivityPoint.class),
INTERVAL_EPL,
"driverCardActivityIntervals",
newData -> collectIntervals(newData, intervals),
runtime -> {
List<EsperRawDriverActivityPoint> points = rawEvents.stream()
.sorted(Comparator.comparing(RawActivityEventDto::occurredAt).thenComparing(RawActivityEventDto::lifecycle))
.map(this::toEsperPoint)
@ -75,11 +77,80 @@ public class EsperDriverActivityEngine {
for (EsperRawDriverActivityPoint point : points) {
runtime.getEventService().sendEventBean(point, "RawDriverActivityPoint");
}
}
);
return intervals.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
.toList();
}
public List<ActivityIntervalDto> mergeConsecutiveIdenticalActivities(
List<ActivityIntervalDto> intervals,
Duration mergeGapTolerance
) {
List<ActivityIntervalDto> sorted = sortedPositiveIntervals(intervals);
if (sorted.isEmpty()) {
return List.of();
}
MergedActivityCollector collector = new MergedActivityCollector(mergeGapTolerance);
executeIntervalStream(sorted, collector::accept);
return collector.finish();
}
public List<EsperResolvedShiftSpan> resolveShiftSpans(
List<ActivityIntervalDto> intervals,
Duration shiftEndThreshold,
Duration contiguousGapTolerance
) {
List<ActivityIntervalDto> sorted = sortedPositiveIntervals(intervals);
if (sorted.isEmpty()) {
return List.of();
}
ShiftSpanCollector collector = new ShiftSpanCollector(shiftEndThreshold, contiguousGapTolerance);
executeIntervalStream(sorted, collector::accept);
return collector.finish();
}
private void executeIntervalStream(List<ActivityIntervalDto> intervals, Consumer<ActivityIntervalDto> consumer) {
executeWithRuntime(
configuration -> configuration.getCommon().addEventType("DriverActivityInterval", EsperActivityIntervalEvent.class),
INTERVAL_STREAM_EPL,
"driverActivityIntervalStream",
newData -> collectInputIntervals(newData, consumer),
runtime -> {
for (ActivityIntervalDto interval : intervals) {
runtime.getEventService().sendEventBean(toEsperInterval(interval), "DriverActivityInterval");
}
}
);
}
private void executeWithRuntime(
Consumer<Configuration> configurationSetup,
String epl,
String statementName,
Consumer<EventBean[]> listener,
Consumer<EPRuntime> sender
) {
EPRuntime runtime = null;
try {
Configuration configuration = new Configuration();
configurationSetup.accept(configuration);
String runtimeUri = "eventhub-esper-poc-" + RUNTIME_COUNTER.incrementAndGet();
runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration);
CompilerArguments arguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments);
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
runtime.getDeploymentService()
.getStatement(deployment.getDeploymentId(), statementName)
.addListener((newData, oldData, statement, rt) -> listener.accept(newData));
sender.accept(runtime);
} catch (EPCompileException | EPDeployException e) {
throw new IllegalStateException("Cannot compile/deploy Esper PoC EPL", e);
} finally {
@ -102,6 +173,9 @@ public class EsperDriverActivityEngine {
(UUID) event.get("vehicleRegistrationId"),
(String) event.get("activityType"),
(String) event.get("cardSlot"),
(String) event.get("cardStatus"),
(String) event.get("drivingStatus"),
(String) event.get("sourceKind"),
startedAt,
endedAt,
(String) event.get("sourceRowId")
@ -109,6 +183,32 @@ public class EsperDriverActivityEngine {
}
}
private void collectInputIntervals(EventBean[] newData, Consumer<ActivityIntervalDto> consumer) {
if (newData == null) {
return;
}
for (EventBean event : newData) {
EsperActivityIntervalEvent interval = (EsperActivityIntervalEvent) event.getUnderlying();
consumer.accept(new ActivityIntervalDto(
interval.driverEntityId(),
interval.vehicleId(),
interval.vehicleRegistrationId(),
interval.activityType(),
interval.cardSlot(),
interval.cardStatus(),
interval.drivingStatus(),
interval.sourceKind(),
interval.startedAt(),
interval.endedAt(),
interval.durationSeconds(),
interval.sourceRowId(),
interval.sourceRowIds(),
interval.clippedToRequestedPeriod(),
interval.level()
));
}
}
private EsperRawDriverActivityPoint toEsperPoint(RawActivityEventDto event) {
return new EsperRawDriverActivityPoint(
event.eventId(),
@ -120,7 +220,186 @@ public class EsperDriverActivityEngine {
event.vehicleRegistrationId(),
event.activityType(),
event.lifecycle(),
event.cardSlot()
event.cardSlot(),
event.cardStatus(),
event.drivingStatus(),
event.sourceKind()
);
}
private EsperActivityIntervalEvent toEsperInterval(ActivityIntervalDto interval) {
return new EsperActivityIntervalEvent(
interval.driverEntityId(),
interval.vehicleId(),
interval.vehicleRegistrationId(),
interval.activityType(),
interval.cardSlot(),
interval.cardStatus(),
interval.drivingStatus(),
interval.sourceKind(),
interval.startedAt(),
interval.endedAt(),
interval.durationSeconds(),
interval.sourceRowId(),
interval.sourceRowIds(),
interval.clippedToRequestedPeriod(),
interval.level()
);
}
private List<ActivityIntervalDto> sortedPositiveIntervals(List<ActivityIntervalDto> intervals) {
if (intervals == null || intervals.isEmpty()) {
return List.of();
}
return intervals.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt)
.thenComparing(ActivityIntervalDto::endedAt)
.thenComparing(ActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo)))
.toList();
}
private static final class MergedActivityCollector {
private final Duration mergeGapTolerance;
private final List<ActivityIntervalDto> merged = new ArrayList<>();
private ActivityIntervalDto current;
private List<String> currentSources = List.of();
private MergedActivityCollector(Duration mergeGapTolerance) {
this.mergeGapTolerance = mergeGapTolerance;
}
private void accept(ActivityIntervalDto next) {
if (current == null) {
current = next;
currentSources = new ArrayList<>(next.sourceRowIds());
return;
}
if (EsperActivitySemantics.canMerge(current, next, mergeGapTolerance)) {
currentSources.addAll(next.sourceRowIds());
OffsetDateTime newEndedAt = current.endedAt().isAfter(next.endedAt()) ? current.endedAt() : next.endedAt();
current = new ActivityIntervalDto(
current.driverEntityId(),
current.vehicleId() == null ? next.vehicleId() : current.vehicleId(),
current.vehicleRegistrationId() == null ? next.vehicleRegistrationId() : current.vehicleRegistrationId(),
current.activityType(),
current.cardSlot(),
current.cardStatus(),
current.drivingStatus(),
current.sourceKind(),
current.startedAt(),
newEndedAt,
Duration.between(current.startedAt(), newEndedAt).getSeconds(),
current.sourceRowId(),
List.copyOf(currentSources),
current.clippedToRequestedPeriod() || next.clippedToRequestedPeriod(),
"MERGED_ACTIVITY"
);
return;
}
merged.add(current.asMerged(List.copyOf(currentSources)));
current = next;
currentSources = new ArrayList<>(next.sourceRowIds());
}
private List<ActivityIntervalDto> finish() {
if (current != null) {
merged.add(current.asMerged(List.copyOf(currentSources)));
}
return List.copyOf(merged);
}
}
private static final class ShiftSpanCollector {
private final Duration shiftEndThreshold;
private final Duration contiguousGapTolerance;
private final List<ActivityIntervalDto> seen = new ArrayList<>();
private final List<EsperResolvedShiftSpan> spans = new ArrayList<>();
private OffsetDateTime shiftBegin;
private long accumulatedRestSeconds;
private OffsetDateTime prevActivityEnd;
private OffsetDateTime pendingShiftEndTriggerAt;
private ShiftSpanCollector(Duration shiftEndThreshold, Duration contiguousGapTolerance) {
this.shiftEndThreshold = shiftEndThreshold;
this.contiguousGapTolerance = contiguousGapTolerance;
}
private void accept(ActivityIntervalDto interval) {
seen.add(interval);
if (shiftBegin == null) {
if (EsperActivitySemantics.isShiftActivity(interval)) {
shiftBegin = interval.startedAt();
}
prevActivityEnd = interval.endedAt();
return;
}
if (!shiftBegin.isBefore(interval.startedAt())) {
prevActivityEnd = interval.endedAt();
return;
}
if (pendingShiftEndTriggerAt != null && EsperActivitySemantics.isShiftActivity(interval)) {
appendShiftSpan(shiftBegin, interval.startedAt());
shiftBegin = interval.startedAt();
pendingShiftEndTriggerAt = null;
accumulatedRestSeconds = 0;
}
if (EsperActivitySemantics.isRestOrUnknown(interval)) {
long gapSeconds = prevActivityEnd == null ? 0 : Math.max(0, Duration.between(prevActivityEnd, interval.startedAt()).getSeconds());
boolean contiguous = prevActivityEnd != null
&& Math.abs(Duration.between(prevActivityEnd, interval.startedAt()).getSeconds()) <= contiguousGapTolerance.getSeconds();
boolean crossedRecordDate = prevActivityEnd != null
&& !interval.startedAt().toLocalDate().equals(prevActivityEnd.toLocalDate());
if (contiguous) {
accumulatedRestSeconds += interval.durationSeconds();
} else if (crossedRecordDate) {
accumulatedRestSeconds = interval.durationSeconds() + gapSeconds;
} else {
accumulatedRestSeconds = interval.durationSeconds();
}
if (accumulatedRestSeconds >= shiftEndThreshold.getSeconds() && pendingShiftEndTriggerAt == null) {
pendingShiftEndTriggerAt = interval.startedAt();
}
} else if (EsperActivitySemantics.isShiftActivity(interval)) {
long gapSeconds = prevActivityEnd == null ? 0 : Math.max(0, Duration.between(prevActivityEnd, interval.startedAt()).getSeconds());
boolean crossedRecordDate = prevActivityEnd != null
&& !interval.startedAt().toLocalDate().equals(prevActivityEnd.toLocalDate());
if (crossedRecordDate && accumulatedRestSeconds + gapSeconds >= shiftEndThreshold.getSeconds()) {
appendShiftSpan(shiftBegin, interval.startedAt());
shiftBegin = interval.startedAt();
}
accumulatedRestSeconds = 0;
pendingShiftEndTriggerAt = null;
}
prevActivityEnd = interval.endedAt();
}
private List<EsperResolvedShiftSpan> finish() {
if (shiftBegin != null) {
appendShiftSpan(shiftBegin, null);
}
return List.copyOf(spans);
}
private void appendShiftSpan(OffsetDateTime currentShiftBegin, OffsetDateTime nextShiftBegin) {
if (currentShiftBegin == null) {
return;
}
OffsetDateTime shiftEnd = seen.stream()
.filter(interval -> !interval.startedAt().isBefore(currentShiftBegin))
.filter(interval -> nextShiftBegin == null || interval.startedAt().isBefore(nextShiftBegin))
.filter(EsperActivitySemantics::isShiftActivity)
.map(ActivityIntervalDto::endedAt)
.max(OffsetDateTime::compareTo)
.orElse(null);
if (shiftEnd == null || !shiftEnd.isAfter(currentShiftBegin)) {
return;
}
long restSeconds = nextShiftBegin == null ? 0 : Math.max(0, Duration.between(shiftEnd, nextShiftBegin).getSeconds());
spans.add(new EsperResolvedShiftSpan(currentShiftBegin, shiftEnd, nextShiftBegin, restSeconds));
}
}
}

View File

@ -1,12 +1,16 @@
package at.procon.eventhub.esperpoc.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
import at.procon.eventhub.esperpoc.dto.DriverWorkSummaryDto;
import at.procon.eventhub.esperpoc.dto.DrivingInterruptionDto;
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
import at.procon.eventhub.esperpoc.dto.EsperPocResultDto;
import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode;
import at.procon.eventhub.esperpoc.dto.OperatingTimePeriodDto;
import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
import at.procon.eventhub.esperpoc.dto.ResolvedWorkShiftDto;
import at.procon.eventhub.esperpoc.dto.ShiftDrivingEvaluationDto;
import at.procon.eventhub.esperpoc.persistence.EsperPocActivityRepository;
import java.time.Duration;
@ -15,58 +19,140 @@ import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
public class EsperPocDriverCardActivityService {
private static final Duration SHIFT_SCAN_CONTIGUOUS_GAP = Duration.ofMinutes(1);
private static final Logger log = LoggerFactory.getLogger(EsperPocDriverCardActivityService.class);
private final EsperPocActivityRepository activityRepository;
private final EsperDriverActivityEngine esperEngine;
private final EventHubProperties properties;
public EsperPocDriverCardActivityService(
EsperPocActivityRepository activityRepository,
EsperDriverActivityEngine esperEngine
) {
this(activityRepository, esperEngine, null);
}
@Autowired
public EsperPocDriverCardActivityService(
EsperPocActivityRepository activityRepository,
EsperDriverActivityEngine esperEngine,
EventHubProperties properties
) {
this.activityRepository = activityRepository;
this.esperEngine = esperEngine;
this.properties = properties;
}
public EsperPocResultDto evaluate(EsperPocRequest request) {
long startedNanos = System.nanoTime();
EsperActivityMergeMode activityMergeMode = resolveActivityMergeMode(request);
EsperShiftResolutionMode shiftResolutionMode = resolveShiftResolutionMode(request);
OffsetDateTime requestedFrom = utc(request.occurredFrom());
OffsetDateTime requestedTo = utc(request.occurredTo());
OffsetDateTime loadedFrom = requestedFrom.minusHours(request.guardHours());
OffsetDateTime loadedTo = requestedTo.plusHours(request.guardHours());
List<RawActivityEventDto> rawEvents = activityRepository.findDriverCardActivityEvents(
long dbStartedNanos = System.nanoTime();
List<RawActivityEventDto> rawEvents = activityRepository.findDriverActivityEvents(
request.tenantKey(),
request.driverEntityId(),
loadedFrom,
loadedTo
);
List<ActivityIntervalDto> rawIntervals = esperEngine.buildIntervals(rawEvents);
long dbElapsedMs = elapsedMillis(dbStartedNanos);
List<RawActivityEventDto> driverCardRawEvents = rawEvents.stream()
.filter(event -> "DRIVER_CARD".equals(event.sourceKind()))
.toList();
List<RawActivityEventDto> vehicleUnitRawEvents = rawEvents.stream()
.filter(event -> "VEHICLE_UNIT".equals(event.sourceKind()))
.toList();
// Merge in the full guard window first. This is important for long BREAK_REST detection:
// a rest crossing the requested period boundary must keep its full guard-window duration.
List<ActivityIntervalDto> mergedLoadedActivities = mergeConsecutiveIdenticalActivities(
rawIntervals,
Duration.ofSeconds(request.mergeGapSeconds())
long cardIntervalsStartedNanos = System.nanoTime();
List<ActivityIntervalDto> driverCardRawIntervals = esperEngine.buildIntervals(driverCardRawEvents);
long cardIntervalsElapsedMs = elapsedMillis(cardIntervalsStartedNanos);
long vuIntervalsStartedNanos = System.nanoTime();
List<ActivityIntervalDto> vehicleUnitRawIntervals = esperEngine.buildIntervals(vehicleUnitRawEvents);
long vuIntervalsElapsedMs = elapsedMillis(vuIntervalsStartedNanos);
long vuGapFillStartedNanos = System.nanoTime();
List<ActivityIntervalDto> resolvedLoadedIntervals = resolveVuFillGaps(
driverCardRawIntervals,
vehicleUnitRawIntervals
);
long vuGapFillElapsedMs = elapsedMillis(vuGapFillStartedNanos);
long mergeStartedNanos = System.nanoTime();
List<ActivityIntervalDto> mergedLoadedActivities = mergeActivities(
resolvedLoadedIntervals,
Duration.ofSeconds(request.mergeGapSeconds()),
activityMergeMode
);
long mergeElapsedMs = elapsedMillis(mergeStartedNanos);
long summaryStartedNanos = System.nanoTime();
List<ActivityIntervalDto> mergedActivities = clipToPeriod(mergedLoadedActivities, requestedFrom, requestedTo);
DriverWorkSummaryDto summary = summarize(request, requestedFrom, requestedTo, mergedActivities);
ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving(
mergedActivities,
request.significantDrivingMinutes()
);
long summaryElapsedMs = elapsedMillis(summaryStartedNanos);
long operatingPeriodsStartedNanos = System.nanoTime();
List<OperatingTimePeriodDto> operatingTimePeriods = buildOperatingTimePeriods(
request,
requestedFrom,
requestedTo,
mergedLoadedActivities
);
long operatingPeriodsElapsedMs = elapsedMillis(operatingPeriodsStartedNanos);
long workingShiftsStartedNanos = System.nanoTime();
List<ResolvedWorkShiftDto> workingShifts = buildResolvedWorkShifts(
request,
requestedFrom,
requestedTo,
resolvedLoadedIntervals,
activityMergeMode,
shiftResolutionMode
);
long workingShiftsElapsedMs = elapsedMillis(workingShiftsStartedNanos);
long totalElapsedMs = elapsedMillis(startedNanos);
log.info("Esper PoC working-shift evaluation tenant={} driverId={} requestedFrom={} requestedTo={} loadedFrom={} loadedTo={} mergeMode={} shiftMode={} rawEvents={} cardRawEvents={} vuRawEvents={} cardIntervals={} vuIntervals={} resolvedIntervals={} mergedActivities={} operatingPeriods={} workingShifts={} timingsMs={{dbRetrieve={}, cardIntervalEsper={}, vuIntervalEsper={}, vuGapFill={}, merge={}, summaryAndDriving={}, operatingPeriods={}, workingShifts={}, total={}}}",
request.tenantKey(),
request.driverEntityId(),
requestedFrom,
requestedTo,
loadedFrom,
loadedTo,
activityMergeMode,
shiftResolutionMode,
rawEvents.size(),
driverCardRawEvents.size(),
vehicleUnitRawEvents.size(),
driverCardRawIntervals.size(),
vehicleUnitRawIntervals.size(),
resolvedLoadedIntervals.size(),
mergedActivities.size(),
operatingTimePeriods.size(),
workingShifts.size(),
dbElapsedMs,
cardIntervalsElapsedMs,
vuIntervalsElapsedMs,
vuGapFillElapsedMs,
mergeElapsedMs,
summaryElapsedMs,
operatingPeriodsElapsedMs,
workingShiftsElapsedMs,
totalElapsedMs);
return new EsperPocResultDto(
request.tenantKey(),
@ -76,18 +162,28 @@ public class EsperPocDriverCardActivityService {
loadedFrom,
loadedTo,
rawEvents.size(),
rawIntervals.size(),
driverCardRawEvents.size(),
vehicleUnitRawEvents.size(),
driverCardRawIntervals.size(),
vehicleUnitRawIntervals.size(),
resolvedLoadedIntervals.size(),
mergedActivities.size(),
operatingTimePeriods.size(),
workingShifts.size(),
request.operatingPeriodSplitRestHours(),
request.shiftEndMarkPeriodMinutes(),
request.absenceBeginEndMinActivityMinutes(),
activityMergeMode,
shiftResolutionMode,
rawEvents,
rawIntervals,
resolvedLoadedIntervals,
mergedActivities,
operatingTimePeriods,
workingShifts,
summary,
summary,
drivingEvaluation,
notes(request)
null,
notes(request, activityMergeMode, shiftResolutionMode)
);
}
@ -98,10 +194,7 @@ public class EsperPocDriverCardActivityService {
if (intervals == null || intervals.isEmpty()) {
return List.of();
}
List<ActivityIntervalDto> sorted = intervals.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
.toList();
List<ActivityIntervalDto> sorted = sortedPositiveIntervals(intervals);
List<ActivityIntervalDto> result = new ArrayList<>();
ActivityIntervalDto current = null;
List<String> currentSources = new ArrayList<>();
@ -111,7 +204,7 @@ public class EsperPocDriverCardActivityService {
currentSources = new ArrayList<>(next.sourceRowIds());
continue;
}
if (canMerge(current, next, mergeGapTolerance)) {
if (EsperActivitySemantics.canMerge(current, next, mergeGapTolerance)) {
currentSources.addAll(next.sourceRowIds());
OffsetDateTime newEndedAt = max(current.endedAt(), next.endedAt());
current = new ActivityIntervalDto(
@ -120,6 +213,9 @@ public class EsperPocDriverCardActivityService {
current.vehicleRegistrationId() == null ? next.vehicleRegistrationId() : current.vehicleRegistrationId(),
current.activityType(),
current.cardSlot(),
current.cardStatus(),
current.drivingStatus(),
current.sourceKind(),
current.startedAt(),
newEndedAt,
Duration.between(current.startedAt(), newEndedAt).getSeconds(),
@ -147,13 +243,9 @@ public class EsperPocDriverCardActivityService {
List<ActivityIntervalDto> mergedLoadedActivities
) {
Duration splitRestThreshold = Duration.ofHours(request.operatingPeriodSplitRestHours());
List<ActivityIntervalDto> sorted = mergedLoadedActivities.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
.toList();
List<ActivityIntervalDto> sorted = sortedPositiveIntervals(mergedLoadedActivities);
List<ActivityIntervalDto> longRests = sorted.stream()
.filter(interval -> isOperatingPeriodSplitRest(interval, splitRestThreshold))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt))
.toList();
List<OperatingTimePeriodDto> result = new ArrayList<>();
@ -213,69 +305,374 @@ public class EsperPocDriverCardActivityService {
return result;
}
private OperatingTimePeriodDto buildOperatingPeriod(
int sequenceNumber,
public List<ResolvedWorkShiftDto> buildResolvedWorkShifts(
EsperPocRequest request,
OffsetDateTime spanFrom,
OffsetDateTime spanTo,
ActivityIntervalDto splitStartedAfterLongRest,
ActivityIntervalDto splitEndedByLongRest,
List<ActivityIntervalDto> allActivities
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo,
List<ActivityIntervalDto> resolvedLoadedIntervals
) {
List<ActivityIntervalDto> activities = allActivities.stream()
.filter(activity -> activity.startedAt().isBefore(spanTo) && activity.endedAt().isAfter(spanFrom))
.map(activity -> {
OffsetDateTime start = max(activity.startedAt(), spanFrom);
OffsetDateTime end = min(activity.endedAt(), spanTo);
if (!end.isAfter(start)) {
return null;
}
boolean clipped = !start.equals(activity.startedAt()) || !end.equals(activity.endedAt());
return activity.withTime(start, end, clipped);
})
.filter(Objects::nonNull)
.filter(activity -> activity.endedAt().isAfter(activity.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
.toList();
if (activities.isEmpty()) {
return null;
return buildResolvedWorkShifts(
request,
requestedFrom,
requestedTo,
resolvedLoadedIntervals,
resolveActivityMergeMode(request),
resolveShiftResolutionMode(request)
);
}
OffsetDateTime startedAt = activities.get(0).startedAt();
OffsetDateTime endedAt = activities.get(activities.size() - 1).endedAt();
DriverWorkSummaryDto summary = summarize(request, startedAt, endedAt, activities);
public List<ResolvedWorkShiftDto> buildResolvedWorkShifts(
EsperPocRequest request,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo,
List<ActivityIntervalDto> resolvedLoadedIntervals,
EsperActivityMergeMode activityMergeMode,
EsperShiftResolutionMode shiftResolutionMode
) {
long startedNanos = System.nanoTime();
List<ActivityIntervalDto> sorted = sortedPositiveIntervals(resolvedLoadedIntervals);
if (sorted.isEmpty()) {
log.info("Esper PoC work-shift resolution requestedFrom={} requestedTo={} shiftMode={} sourceIntervals=0 resolvedShifts=0 timingsMs={{resolveSpans=0, enrichShifts=0, total={}}}",
requestedFrom,
requestedTo,
shiftResolutionMode,
elapsedMillis(startedNanos));
return List.of();
}
long resolveSpansStartedNanos = System.nanoTime();
List<EsperResolvedShiftSpan> shiftSpans = resolveShiftSpans(sorted, request, shiftResolutionMode);
long resolveSpansElapsedMs = elapsedMillis(resolveSpansStartedNanos);
if (shiftSpans.isEmpty()) {
log.info("Esper PoC work-shift resolution requestedFrom={} requestedTo={} shiftMode={} sourceIntervals={} resolvedShiftSpans=0 resolvedShifts=0 timingsMs={{resolveSpans={}, enrichShifts=0, total={}}}",
requestedFrom,
requestedTo,
shiftResolutionMode,
sorted.size(),
resolveSpansElapsedMs,
elapsedMillis(startedNanos));
return List.of();
}
ZoneOffset evaluationOffset = request.occurredFrom().getOffset();
ShiftDeviationStats deviationStats = calculateShiftDeviationStats(shiftSpans, evaluationOffset);
List<ResolvedWorkShiftDto> result = new ArrayList<>();
int sequenceNumber = 1;
long enrichShiftsStartedNanos = System.nanoTime();
for (int index = 0; index < shiftSpans.size(); index++) {
EsperResolvedShiftSpan span = shiftSpans.get(index);
if (!span.endedAt().isAfter(requestedFrom) || !span.startedAt().isBefore(requestedTo)) {
continue;
}
List<ActivityIntervalDto> rawShiftActivities = clipToPeriod(sorted, span.startedAt(), span.endedAt());
if (rawShiftActivities.isEmpty()) {
continue;
}
List<ActivityIntervalDto> mergedShiftActivities = mergeActivities(
rawShiftActivities,
Duration.ofSeconds(request.mergeGapSeconds()),
activityMergeMode
);
DriverWorkSummaryDto summary = summarize(request, span.startedAt(), span.endedAt(), mergedShiftActivities);
ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving(
activities,
mergedShiftActivities,
request.significantDrivingMinutes()
);
return new OperatingTimePeriodDto(
sequenceNumber,
startedAt,
endedAt,
Duration.between(startedAt, endedAt).getSeconds(),
splitStartedAfterLongRest,
splitEndedByLongRest,
activities,
ShiftDeviationSnapshot deviation = deviationStats.snapshot(span, evaluationOffset);
result.add(new ResolvedWorkShiftDto(
sequenceNumber++,
span.startedAt(),
span.endedAt(),
span.nextShiftStartedAt(),
span.durationSeconds(),
span.dailyRestingTimeSeconds(),
firstDrivingStart(rawShiftActivities),
lastDrivingEnd(rawShiftActivities),
firstDrivingStart(rawShiftActivities, request.absenceBeginEndMinActivityMinutes()),
lastDrivingEnd(rawShiftActivities, request.absenceBeginEndMinActivityMinutes()),
distinctVehicleIds(rawShiftActivities),
distinctVehicleRegistrationIds(rawShiftActivities),
determineUsedDataKind(rawShiftActivities),
determineShiftKind(span, evaluationOffset),
deviation.beginInRange(),
deviation.endInRange(),
summary.drivingSeconds() == 0,
index > 0 && shiftSpans.get(index - 1).dailyRestingTimeSeconds() > Duration.ofHours(24).getSeconds(),
span.durationSeconds() > Duration.ofHours(24).getSeconds(),
span.dailyRestingTimeSeconds() >= Duration.ofHours(9).getSeconds()
&& span.dailyRestingTimeSeconds() <= Duration.ofHours(11).getSeconds(),
span.dailyRestingTimeSeconds() >= Duration.ofHours(7).getSeconds()
&& span.dailyRestingTimeSeconds() <= Duration.ofHours(9).getSeconds(),
rawShiftActivities.size(),
countActivities(rawShiftActivities, "DRIVE"),
countActivities(rawShiftActivities, "AVAILABILITY"),
countActivities(rawShiftActivities, "WORK"),
countActivities(rawShiftActivities, "BREAK_REST"),
countUnknownActivities(rawShiftActivities),
countWorkingActivities(rawShiftActivities),
summary,
drivingEvaluation
drivingEvaluation,
rawShiftActivities
));
}
long enrichShiftsElapsedMs = elapsedMillis(enrichShiftsStartedNanos);
log.info("Esper PoC work-shift resolution requestedFrom={} requestedTo={} shiftMode={} sourceIntervals={} resolvedShiftSpans={} resolvedShifts={} timingsMs={{resolveSpans={}, enrichShifts={}, total={}}}",
requestedFrom,
requestedTo,
shiftResolutionMode,
sorted.size(),
shiftSpans.size(),
result.size(),
resolveSpansElapsedMs,
enrichShiftsElapsedMs,
elapsedMillis(startedNanos));
return result;
}
List<ActivityIntervalDto> resolveVuFillGaps(
List<ActivityIntervalDto> driverCardRawIntervals,
List<ActivityIntervalDto> vehicleUnitRawIntervals
) {
List<ActivityIntervalDto> driverCard = sortedPositiveIntervals(driverCardRawIntervals);
List<ActivityIntervalDto> vehicleUnit = sortedPositiveIntervals(vehicleUnitRawIntervals);
if (driverCard.isEmpty()) {
return vehicleUnit;
}
if (vehicleUnit.isEmpty()) {
return driverCard;
}
List<ActivityIntervalDto> resolved = new ArrayList<>(driverCard);
for (ActivityIntervalDto vuInterval : vehicleUnit) {
resolved.addAll(subtractCoverage(vuInterval, driverCard));
}
return resolved.stream()
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt)
.thenComparing(ActivityIntervalDto::endedAt)
.thenComparing(ActivityIntervalDto::sourceKind, Comparator.nullsLast(String::compareTo)))
.toList();
}
private List<ActivityIntervalDto> mergeActivities(
List<ActivityIntervalDto> intervals,
Duration mergeGapTolerance,
EsperActivityMergeMode mode
) {
if (mode == EsperActivityMergeMode.ESPER) {
return requireEsperEngine().mergeConsecutiveIdenticalActivities(intervals, mergeGapTolerance);
}
return mergeConsecutiveIdenticalActivities(intervals, mergeGapTolerance);
}
private List<ActivityIntervalDto> subtractCoverage(
ActivityIntervalDto candidate,
List<ActivityIntervalDto> coverage
) {
List<ActivityIntervalDto> result = new ArrayList<>();
OffsetDateTime cursor = candidate.startedAt();
for (ActivityIntervalDto covered : coverage) {
if (!covered.endedAt().isAfter(cursor)) {
continue;
}
if (!covered.startedAt().isBefore(candidate.endedAt())) {
break;
}
OffsetDateTime overlapStart = max(cursor, covered.startedAt());
if (overlapStart.isAfter(cursor)) {
result.add(candidate.withTime(cursor, overlapStart, candidate.clippedToRequestedPeriod()));
}
if (covered.endedAt().isAfter(cursor)) {
cursor = max(cursor, covered.endedAt());
}
if (!candidate.endedAt().isAfter(cursor)) {
break;
}
}
if (candidate.endedAt().isAfter(cursor)) {
result.add(candidate.withTime(cursor, candidate.endedAt(), candidate.clippedToRequestedPeriod()));
}
return result.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.toList();
}
private List<EsperResolvedShiftSpan> resolveShiftSpans(
List<ActivityIntervalDto> sorted,
EsperPocRequest request,
EsperShiftResolutionMode mode
) {
if (mode == EsperShiftResolutionMode.ESPER) {
return requireEsperEngine().resolveShiftSpans(
sorted,
Duration.ofMinutes(request.shiftEndMarkPeriodMinutes()),
SHIFT_SCAN_CONTIGUOUS_GAP
);
}
ActivityIntervalDto firstShiftActivity = sorted.stream()
.filter(EsperActivitySemantics::isShiftActivity)
.findFirst()
.orElse(null);
if (firstShiftActivity == null) {
return List.of();
}
List<EsperResolvedShiftSpan> result = new ArrayList<>();
OffsetDateTime shiftBegin = firstShiftActivity.startedAt();
long accumulatedRestSeconds = 0;
OffsetDateTime prevActivityEnd = null;
long shiftEndThresholdSeconds = Duration.ofMinutes(request.shiftEndMarkPeriodMinutes()).getSeconds();
for (ActivityIntervalDto interval : sorted) {
if (!shiftBegin.isBefore(interval.startedAt())) {
prevActivityEnd = interval.endedAt();
continue;
}
if (EsperActivitySemantics.isRestOrUnknown(interval)) {
if (prevActivityEnd != null
&& Math.abs(Duration.between(prevActivityEnd, interval.startedAt()).getSeconds()) <= SHIFT_SCAN_CONTIGUOUS_GAP.getSeconds()) {
accumulatedRestSeconds += interval.durationSeconds();
} else if (prevActivityEnd != null
&& !Objects.equals(EsperActivitySemantics.recordDate(interval), prevActivityEnd.toLocalDate())) {
accumulatedRestSeconds = interval.durationSeconds() + Math.max(0, Duration.between(prevActivityEnd, interval.startedAt()).getSeconds());
} else {
accumulatedRestSeconds = interval.durationSeconds();
}
if (accumulatedRestSeconds >= shiftEndThresholdSeconds) {
OffsetDateTime nextShiftBegin = findNextActiveStartAfter(sorted, interval.startedAt());
appendShiftSpan(result, sorted, shiftBegin, nextShiftBegin);
shiftBegin = nextShiftBegin;
accumulatedRestSeconds = 0;
}
} else if (EsperActivitySemantics.isShiftActivity(interval)) {
if (prevActivityEnd != null
&& !Objects.equals(EsperActivitySemantics.recordDate(interval), prevActivityEnd.toLocalDate())
&& accumulatedRestSeconds + Math.max(0, Duration.between(prevActivityEnd, interval.startedAt()).getSeconds()) >= shiftEndThresholdSeconds) {
OffsetDateTime nextShiftBegin = interval.startedAt();
appendShiftSpan(result, sorted, shiftBegin, nextShiftBegin);
shiftBegin = nextShiftBegin;
}
accumulatedRestSeconds = 0;
}
prevActivityEnd = interval.endedAt();
}
if (shiftBegin != null) {
appendShiftSpan(result, sorted, shiftBegin, null);
}
return result;
}
private void appendShiftSpan(
List<EsperResolvedShiftSpan> target,
List<ActivityIntervalDto> sorted,
OffsetDateTime shiftBegin,
OffsetDateTime nextShiftBegin
) {
if (shiftBegin == null) {
return;
}
OffsetDateTime shiftEnd = findLastActiveEnd(sorted, shiftBegin, nextShiftBegin);
if (shiftEnd == null || !shiftEnd.isAfter(shiftBegin)) {
return;
}
long restSeconds = nextShiftBegin == null ? 0 : Math.max(0, Duration.between(shiftEnd, nextShiftBegin).getSeconds());
target.add(new EsperResolvedShiftSpan(shiftBegin, shiftEnd, nextShiftBegin, restSeconds));
}
private OffsetDateTime findNextActiveStartAfter(List<ActivityIntervalDto> sorted, OffsetDateTime activityTime) {
return sorted.stream()
.filter(interval -> interval.startedAt().isAfter(activityTime))
.filter(EsperActivitySemantics::isShiftActivity)
.map(ActivityIntervalDto::startedAt)
.findFirst()
.orElse(null);
}
private OffsetDateTime findLastActiveEnd(
List<ActivityIntervalDto> sorted,
OffsetDateTime shiftBegin,
OffsetDateTime nextShiftBegin
) {
return sorted.stream()
.filter(interval -> !interval.startedAt().isBefore(shiftBegin))
.filter(interval -> nextShiftBegin == null || interval.startedAt().isBefore(nextShiftBegin))
.filter(EsperActivitySemantics::isShiftActivity)
.map(ActivityIntervalDto::endedAt)
.max(OffsetDateTime::compareTo)
.orElse(null);
}
private ShiftDeviationStats calculateShiftDeviationStats(List<EsperResolvedShiftSpan> shiftSpans, ZoneOffset evaluationOffset) {
Map<String, List<EsperResolvedShiftSpan>> grouped = new LinkedHashMap<>();
for (EsperResolvedShiftSpan span : shiftSpans) {
grouped.computeIfAbsent(determineShiftKind(span, evaluationOffset), ignored -> new ArrayList<>()).add(span);
}
Map<String, DeviationSnapshot> stats = new LinkedHashMap<>();
for (Map.Entry<String, List<EsperResolvedShiftSpan>> entry : grouped.entrySet()) {
List<EsperResolvedShiftSpan> spans = entry.getValue();
if (spans.size() < 4) {
stats.put(entry.getKey(), new DeviationSnapshot(Double.NaN, Double.NaN, Double.NaN, Double.NaN, false));
continue;
}
double beginMean = spans.stream().mapToInt(span -> minutesOfDay(span.startedAt(), evaluationOffset)).average().orElse(Double.NaN);
double endMean = spans.stream().mapToInt(span -> minutesOfDay(span.endedAt(), evaluationOffset)).average().orElse(Double.NaN);
double beginDeviation = standardDeviation(spans.stream().mapToInt(span -> minutesOfDay(span.startedAt(), evaluationOffset)).toArray(), beginMean);
double endDeviation = standardDeviation(spans.stream().mapToInt(span -> minutesOfDay(span.endedAt(), evaluationOffset)).toArray(), endMean);
stats.put(entry.getKey(), new DeviationSnapshot(beginMean, endMean, beginDeviation, endDeviation, true));
}
return new ShiftDeviationStats(stats);
}
private double standardDeviation(int[] values, double mean) {
double squaredSum = 0;
for (int value : values) {
double delta = value - mean;
squaredSum += delta * delta;
}
return Math.sqrt(squaredSum / values.length);
}
private int minutesOfDay(OffsetDateTime value, ZoneOffset evaluationOffset) {
OffsetDateTime local = value.withOffsetSameInstant(evaluationOffset);
return local.getHour() * 60 + local.getMinute();
}
private String determineShiftKind(EsperResolvedShiftSpan span, ZoneOffset evaluationOffset) {
OffsetDateTime localStart = span.startedAt().withOffsetSameInstant(evaluationOffset);
OffsetDateTime localEnd = span.endedAt().withOffsetSameInstant(evaluationOffset);
if (localStart.toLocalDate().equals(localEnd.toLocalDate())) {
int startSecond = localStart.getHour() * 3600 + localStart.getMinute() * 60 + localStart.getSecond();
int endSecond = localEnd.getHour() * 3600 + localEnd.getMinute() * 60 + localEnd.getSecond();
return startSecond < 5 * 3600 || endSecond > 20 * 3600 ? "SPECIAL_HOURS" : "DAY";
}
return "OVERNIGHT";
}
private String determineUsedDataKind(List<ActivityIntervalDto> activities) {
boolean hasCard = activities.stream().anyMatch(activity -> "DRIVER_CARD".equals(activity.sourceKind()));
boolean hasVu = activities.stream().anyMatch(activity -> "VEHICLE_UNIT".equals(activity.sourceKind()));
if (hasCard && hasVu) {
return "BOTH";
}
if (hasVu) {
return "VEHICLE_UNIT";
}
return "DRIVER_CARD";
}
private boolean isOperatingPeriodSplitRest(ActivityIntervalDto interval, Duration splitRestThreshold) {
return "BREAK_REST".equals(interval.activityType())
&& interval.durationSeconds() > splitRestThreshold.getSeconds();
}
private boolean canMerge(ActivityIntervalDto left, ActivityIntervalDto right, Duration tolerance) {
boolean sameDriver = Objects.equals(left.driverEntityId(), right.driverEntityId());
boolean sameActivity = Objects.equals(left.activityType(), right.activityType());
boolean sameSlot = Objects.equals(left.cardSlot(), right.cardSlot());
long gapSeconds = Duration.between(left.endedAt(), right.startedAt()).getSeconds();
boolean adjacentOrOverlapping = gapSeconds <= tolerance.getSeconds();
return sameDriver && sameActivity && sameSlot && adjacentOrOverlapping;
}
private List<ActivityIntervalDto> clipToPeriod(
List<ActivityIntervalDto> intervals,
OffsetDateTime periodFrom,
@ -296,6 +693,18 @@ public class EsperPocDriverCardActivityService {
.toList();
}
private List<ActivityIntervalDto> sortedPositiveIntervals(List<ActivityIntervalDto> intervals) {
if (intervals == null || intervals.isEmpty()) {
return List.of();
}
return intervals.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt)
.thenComparing(ActivityIntervalDto::endedAt)
.thenComparing(ActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo)))
.toList();
}
private DriverWorkSummaryDto summarize(
EsperPocRequest request,
OffsetDateTime periodFrom,
@ -372,17 +781,177 @@ public class EsperPocDriverCardActivityService {
);
}
private List<String> notes(EsperPocRequest request) {
private OperatingTimePeriodDto buildOperatingPeriod(
int sequenceNumber,
EsperPocRequest request,
OffsetDateTime spanFrom,
OffsetDateTime spanTo,
ActivityIntervalDto splitStartedAfterLongRest,
ActivityIntervalDto splitEndedByLongRest,
List<ActivityIntervalDto> allActivities
) {
List<ActivityIntervalDto> activities = allActivities.stream()
.filter(activity -> activity.startedAt().isBefore(spanTo) && activity.endedAt().isAfter(spanFrom))
.map(activity -> {
OffsetDateTime start = max(activity.startedAt(), spanFrom);
OffsetDateTime end = min(activity.endedAt(), spanTo);
if (!end.isAfter(start)) {
return null;
}
boolean clipped = !start.equals(activity.startedAt()) || !end.equals(activity.endedAt());
return activity.withTime(start, end, clipped);
})
.filter(Objects::nonNull)
.filter(activity -> activity.endedAt().isAfter(activity.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
.toList();
if (activities.isEmpty()) {
return null;
}
OffsetDateTime startedAt = activities.get(0).startedAt();
OffsetDateTime endedAt = activities.get(activities.size() - 1).endedAt();
DriverWorkSummaryDto summary = summarize(request, startedAt, endedAt, activities);
ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving(
mergeActivities(
activities,
Duration.ofSeconds(request.mergeGapSeconds()),
resolveActivityMergeMode(request)
),
request.significantDrivingMinutes()
);
return new OperatingTimePeriodDto(
sequenceNumber,
startedAt,
endedAt,
Duration.between(startedAt, endedAt).getSeconds(),
splitStartedAfterLongRest,
splitEndedByLongRest,
activities,
summary,
drivingEvaluation
);
}
private OffsetDateTime firstDrivingStart(List<ActivityIntervalDto> activities) {
return activities.stream()
.filter(this::isDrivingActivity)
.map(ActivityIntervalDto::startedAt)
.min(OffsetDateTime::compareTo)
.orElse(null);
}
private OffsetDateTime lastDrivingEnd(List<ActivityIntervalDto> activities) {
return activities.stream()
.filter(this::isDrivingActivity)
.map(ActivityIntervalDto::endedAt)
.max(OffsetDateTime::compareTo)
.orElse(null);
}
private OffsetDateTime firstDrivingStart(List<ActivityIntervalDto> activities, int minMinutes) {
long threshold = Duration.ofMinutes(minMinutes).getSeconds();
return activities.stream()
.filter(this::isDrivingActivity)
.filter(activity -> activity.durationSeconds() >= threshold)
.map(ActivityIntervalDto::startedAt)
.min(OffsetDateTime::compareTo)
.orElse(null);
}
private OffsetDateTime lastDrivingEnd(List<ActivityIntervalDto> activities, int minMinutes) {
long threshold = Duration.ofMinutes(minMinutes).getSeconds();
return activities.stream()
.filter(this::isDrivingActivity)
.filter(activity -> activity.durationSeconds() >= threshold)
.map(ActivityIntervalDto::endedAt)
.max(OffsetDateTime::compareTo)
.orElse(null);
}
private boolean isDrivingActivity(ActivityIntervalDto activity) {
return EsperActivitySemantics.isKnownActivity(activity) && "DRIVE".equals(activity.activityType());
}
private int countActivities(List<ActivityIntervalDto> activities, String activityType) {
return (int) activities.stream()
.filter(EsperActivitySemantics::isKnownActivity)
.filter(activity -> activityType.equals(activity.activityType()))
.count();
}
private int countUnknownActivities(List<ActivityIntervalDto> activities) {
return (int) activities.stream()
.filter(EsperActivitySemantics::isUnknownActivity)
.count();
}
private int countWorkingActivities(List<ActivityIntervalDto> activities) {
return (int) activities.stream()
.filter(EsperActivitySemantics::isShiftActivity)
.count();
}
private List<UUID> distinctVehicleIds(List<ActivityIntervalDto> activities) {
Set<UUID> values = new LinkedHashSet<>();
for (ActivityIntervalDto activity : activities) {
if (activity.vehicleId() != null) {
values.add(activity.vehicleId());
}
}
return List.copyOf(values);
}
private List<UUID> distinctVehicleRegistrationIds(List<ActivityIntervalDto> activities) {
Set<UUID> values = new LinkedHashSet<>();
for (ActivityIntervalDto activity : activities) {
if (activity.vehicleRegistrationId() != null) {
values.add(activity.vehicleRegistrationId());
}
}
return List.copyOf(values);
}
private List<String> notes(
EsperPocRequest request,
EsperActivityMergeMode activityMergeMode,
EsperShiftResolutionMode shiftResolutionMode
) {
return List.of(
"PoC reads only tachograph DRIVER_CARD/CARD_ACTIVITY source events from eventhub.event.",
"Level RAW contains original imported point events; level Activities contains Esper-created intervals merged by consecutive identical activity.",
"PoC reads tachograph DRIVER_CARD/CARD_ACTIVITY events and VEHICLE_UNIT/VU_ACTIVITY events from eventhub.event.",
"Driver-card intervals remain authoritative. VEHICLE_UNIT activity fills only uncovered time gaps in the driver-card timeline.",
"Configured activity merge mode: " + activityMergeMode + ". Configured shift resolution mode: " + shiftResolutionMode + ".",
"Resolved work shifts follow the stored-procedure split rule: BREAK_REST or unknown coverage accumulating to "
+ request.shiftEndMarkPeriodMinutes() + " minutes ends a shift.",
"Driving-time interruption evaluation is reported per operating time period and per working shift, not as a single evaluation for the complete requested period.",
"Activities are merged in the guard window first and then clipped to the requested period, so BREAK_REST across the period boundary can still split operating periods correctly.",
"Operating time periods are split by BREAK_REST activities longer than " + request.operatingPeriodSplitRestHours() + " hours.",
"Working seconds = DRIVE + WORK. Operation seconds = DRIVE + WORK + AVAILABILITY. BREAK_REST is reported separately.",
"Departure/arrival are evaluated globally and again inside each operating time period using DRIVE intervals longer than " + request.significantDrivingMinutes() + " minutes."
"Working seconds = DRIVE + WORK. Operation seconds = DRIVE + WORK + AVAILABILITY. BREAK_REST is reported separately."
);
}
private EsperActivityMergeMode resolveActivityMergeMode(EsperPocRequest request) {
if (request.activityMergeMode() != null) {
return request.activityMergeMode();
}
return properties == null ? EsperActivityMergeMode.JAVA : properties.getEsperPoc().getActivityMergeMode();
}
private EsperShiftResolutionMode resolveShiftResolutionMode(EsperPocRequest request) {
if (request.shiftResolutionMode() != null) {
return request.shiftResolutionMode();
}
return properties == null ? EsperShiftResolutionMode.JAVA : properties.getEsperPoc().getShiftResolutionMode();
}
private EsperDriverActivityEngine requireEsperEngine() {
if (esperEngine == null) {
throw new IllegalStateException("Esper engine is required for ESPER execution modes");
}
return esperEngine;
}
private OffsetDateTime utc(OffsetDateTime value) {
return value.withOffsetSameInstant(ZoneOffset.UTC);
}
@ -394,4 +963,52 @@ public class EsperPocDriverCardActivityService {
private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) {
return left.isBefore(right) ? left : right;
}
private long elapsedMillis(long startedNanos) {
return Duration.ofNanos(System.nanoTime() - startedNanos).toMillis();
}
private record ShiftDeviationStats(Map<String, DeviationSnapshot> stats) {
ShiftDeviationSnapshot snapshot(EsperResolvedShiftSpan span, ZoneOffset evaluationOffset) {
DeviationSnapshot snapshot = stats.get(determineShiftKindStatic(span, evaluationOffset));
if (snapshot == null || !snapshot.computed()) {
return new ShiftDeviationSnapshot(true, true);
}
int beginMinutes = minutesOfDayStatic(span.startedAt(), evaluationOffset);
int endMinutes = minutesOfDayStatic(span.endedAt(), evaluationOffset);
boolean beginInRange = beginMinutes >= snapshot.beginMean() - snapshot.beginDeviation()
&& beginMinutes <= snapshot.beginMean() + snapshot.beginDeviation();
boolean endInRange = endMinutes >= snapshot.endMean() - snapshot.endDeviation()
&& endMinutes <= snapshot.endMean() + snapshot.endDeviation();
return new ShiftDeviationSnapshot(beginInRange, endInRange);
}
private static String determineShiftKindStatic(EsperResolvedShiftSpan span, ZoneOffset evaluationOffset) {
OffsetDateTime localStart = span.startedAt().withOffsetSameInstant(evaluationOffset);
OffsetDateTime localEnd = span.endedAt().withOffsetSameInstant(evaluationOffset);
if (localStart.toLocalDate().equals(localEnd.toLocalDate())) {
int startSecond = localStart.getHour() * 3600 + localStart.getMinute() * 60 + localStart.getSecond();
int endSecond = localEnd.getHour() * 3600 + localEnd.getMinute() * 60 + localEnd.getSecond();
return startSecond < 5 * 3600 || endSecond > 20 * 3600 ? "SPECIAL_HOURS" : "DAY";
}
return "OVERNIGHT";
}
private static int minutesOfDayStatic(OffsetDateTime value, ZoneOffset evaluationOffset) {
OffsetDateTime local = value.withOffsetSameInstant(evaluationOffset);
return local.getHour() * 60 + local.getMinute();
}
}
private record DeviationSnapshot(
double beginMean,
double endMean,
double beginDeviation,
double endDeviation,
boolean computed
) {
}
private record ShiftDeviationSnapshot(boolean beginInRange, boolean endInRange) {
}
}

View File

@ -14,6 +14,9 @@ public final class EsperRawDriverActivityPoint {
private final String eventType;
private final String lifecycle;
private final String cardSlot;
private final String cardStatus;
private final String drivingStatus;
private final String sourceKind;
public EsperRawDriverActivityPoint(
UUID eventId,
@ -25,7 +28,10 @@ public final class EsperRawDriverActivityPoint {
UUID vehicleRegistrationId,
String eventType,
String lifecycle,
String cardSlot
String cardSlot,
String cardStatus,
String drivingStatus,
String sourceKind
) {
this.eventId = eventId;
this.occurredAt = occurredAt;
@ -37,6 +43,9 @@ public final class EsperRawDriverActivityPoint {
this.eventType = eventType;
this.lifecycle = lifecycle;
this.cardSlot = cardSlot;
this.cardStatus = cardStatus;
this.drivingStatus = drivingStatus;
this.sourceKind = sourceKind;
}
public UUID getEventId() {
@ -78,4 +87,16 @@ public final class EsperRawDriverActivityPoint {
public String getCardSlot() {
return cardSlot;
}
public String getCardStatus() {
return cardStatus;
}
public String getDrivingStatus() {
return drivingStatus;
}
public String getSourceKind() {
return sourceKind;
}
}

View File

@ -0,0 +1,15 @@
package at.procon.eventhub.esperpoc.service;
import java.time.Duration;
import java.time.OffsetDateTime;
record EsperResolvedShiftSpan(
OffsetDateTime startedAt,
OffsetDateTime endedAt,
OffsetDateTime nextShiftStartedAt,
long dailyRestingTimeSeconds
) {
long durationSeconds() {
return Duration.between(startedAt, endedAt).getSeconds();
}
}

View File

@ -0,0 +1,372 @@
package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
@Repository
public class DriverIdentityRepository {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
public DriverIdentityRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
}
public UUID resolveOrCreateDriverId(
String tenantKey,
int eventSourceId,
DriverRefDto driverRef
) {
if (driverRef == null || !driverRef.hasAnyReference()) {
return null;
}
String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey");
String sourceDriverEntityId = normalizeNullable(driverRef.sourceEntityId());
DriverCardRefDto driverCard = driverRef.driverCard();
String cardNation = driverCard == null ? null : normalizeNullable(driverCard.nation());
String cardNumber = driverCard == null ? null : normalizeNullable(driverCard.number());
UUID driverId = resolveDriverId(normalizedTenantKey, eventSourceId, sourceDriverEntityId, cardNation, cardNumber);
if (driverId == null) {
Map<String, Object> payload = new LinkedHashMap<>();
put(payload, "source", "event");
put(payload, "source_driver_entity_id", sourceDriverEntityId);
put(payload, "card_nation", cardNation);
put(payload, "card_number", cardNumber);
driverId = createDriver(
normalizedTenantKey,
eventSourceId,
sourceDriverEntityId,
cardNation,
cardNumber,
null,
null,
null,
payload
);
}
touchDriver(driverId, sourceDriverEntityId, cardNation, cardNumber);
return driverId;
}
@Transactional
public int reconcileFromMasterData(String tenantKey, int eventSourceId) {
String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey");
int updates = reconcileDriversFromMasterData(normalizedTenantKey, eventSourceId);
updates += projectDriverCardsFromMasterData(normalizedTenantKey, eventSourceId);
return updates;
}
private int reconcileDriversFromMasterData(String tenantKey, int eventSourceId) {
Long count = jdbcTemplate.queryForObject(
compatibleSourcesCte() + """
, master_drivers as (
select distinct on (nullif(trim(source_entity_id), ''))
event_source_id,
nullif(trim(source_entity_id), '') as source_driver_entity_id,
nullif(trim(payload ->> 'first_names'), '') as first_names,
coalesce(
nullif(trim(payload ->> 'last_name'), ''),
nullif(trim(payload ->> 'surname'), '')
) as last_name,
cast(nullif(trim(payload ->> 'birth_date'), '') as date) as birth_date,
source_updated_at,
payload
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'DRIVER'
and nullif(trim(source_entity_id), '') is not null
and source_entity_id not like 'DRIVER_CARD:%'
order by nullif(trim(source_entity_id), ''), updated_at desc
),
updated_by_source as (
update eventhub.driver driver
set first_names = coalesce(master.first_names, driver.first_names),
last_name = coalesce(master.last_name, driver.last_name),
birth_date = coalesce(master.birth_date, driver.birth_date),
source_updated_at = master.source_updated_at,
payload = driver.payload || master.payload,
updated_at = now()
from master_drivers master
where driver.tenant_key = ?
and driver.event_source_id in (select id from compatible_sources)
and driver.source_driver_entity_id = master.source_driver_entity_id
returning driver.id
),
inserted as (
insert into eventhub.driver(
id, tenant_key, event_source_id, source_driver_entity_id,
first_names, last_name, birth_date, source_updated_at, payload, updated_at
)
select gen_random_uuid(),
?,
master.event_source_id,
master.source_driver_entity_id,
master.first_names,
master.last_name,
master.birth_date,
master.source_updated_at,
master.payload,
now()
from master_drivers master
where not exists (
select 1
from eventhub.driver existing
where existing.tenant_key = ?
and existing.event_source_id in (select id from compatible_sources)
and existing.source_driver_entity_id = master.source_driver_entity_id
)
returning id
)
select (select count(*) from updated_by_source)
+ (select count(*) from inserted)
""",
Long.class,
eventSourceId,
tenantKey,
tenantKey,
tenantKey,
tenantKey
);
return count == null ? 0 : Math.toIntExact(count);
}
private int projectDriverCardsFromMasterData(String tenantKey, int eventSourceId) {
Long count = jdbcTemplate.queryForObject(
compatibleSourcesCte() + """
, driver_card_projection as (
select distinct on (rel.to_source_entity_id)
rel.to_source_entity_id as source_driver_entity_id,
nullif(trim(card.payload ->> 'card_nation'), '') as card_nation,
nullif(trim(card.payload ->> 'card_number'), '') as card_number,
rel.source_updated_at
from eventhub.source_master_relation rel
join eventhub.source_master_entity card
on card.tenant_key = rel.tenant_key
and card.event_source_id = rel.event_source_id
and card.entity_type = 'DRIVER_CARD'
and card.source_entity_id = rel.from_source_entity_id
where rel.tenant_key = ?
and rel.event_source_id in (select id from compatible_sources)
and rel.relation_type = 'DRIVER_CARD_DRIVER'
and rel.from_entity_type = 'DRIVER_CARD'
and rel.to_entity_type = 'DRIVER'
order by rel.to_source_entity_id,
rel.valid_to desc nulls last,
rel.valid_from desc nulls last,
rel.updated_at desc
),
updated_by_source as (
update eventhub.driver driver
set card_nation = coalesce(driver.card_nation, projection.card_nation),
card_number = coalesce(driver.card_number, projection.card_number),
source_updated_at = coalesce(projection.source_updated_at, driver.source_updated_at),
updated_at = now()
from driver_card_projection projection
where driver.tenant_key = ?
and driver.event_source_id in (select id from compatible_sources)
and driver.source_driver_entity_id = projection.source_driver_entity_id
and (
(driver.card_nation is null and projection.card_nation is not null)
or (driver.card_number is null and projection.card_number is not null)
)
returning driver.id
)
select count(*)
from updated_by_source
""",
Long.class,
eventSourceId,
tenantKey,
tenantKey
);
return count == null ? 0 : Math.toIntExact(count);
}
private UUID resolveDriverId(
String tenantKey,
int eventSourceId,
String sourceDriverEntityId,
String cardNation,
String cardNumber
) {
UUID driverId = findBySourceDriverEntityId(tenantKey, eventSourceId, sourceDriverEntityId);
if (driverId == null) {
driverId = findByCard(tenantKey, eventSourceId, cardNation, cardNumber);
}
return driverId;
}
private UUID findBySourceDriverEntityId(String tenantKey, int eventSourceId, String sourceDriverEntityId) {
if (sourceDriverEntityId == null) {
return null;
}
return jdbcTemplate.query(
compatibleSourcesCte() + """
select d.id
from eventhub.driver d
where d.tenant_key = ?
and d.event_source_id in (select id from compatible_sources)
and d.source_driver_entity_id = ?
order by d.updated_at desc
limit 1
""",
rs -> rs.next() ? (UUID) rs.getObject("id") : null,
eventSourceId,
tenantKey,
sourceDriverEntityId
);
}
private UUID findByCard(String tenantKey, int eventSourceId, String cardNation, String cardNumber) {
if (cardNation == null || cardNumber == null) {
return null;
}
return jdbcTemplate.query(
compatibleSourcesCte() + """
select d.id
from eventhub.driver d
where d.tenant_key = ?
and d.event_source_id in (select id from compatible_sources)
and d.card_nation = ?
and d.card_number = ?
order by d.updated_at desc
limit 1
""",
rs -> rs.next() ? (UUID) rs.getObject("id") : null,
eventSourceId,
tenantKey,
cardNation,
cardNumber
);
}
private UUID createDriver(
String tenantKey,
int eventSourceId,
String sourceDriverEntityId,
String cardNation,
String cardNumber,
String firstNames,
String lastName,
OffsetDateTime sourceUpdatedAt,
Map<String, Object> payload
) {
UUID driverId = UUID.randomUUID();
jdbcTemplate.update(
"""
insert into eventhub.driver(
id, tenant_key, event_source_id, source_driver_entity_id,
card_nation, card_number, first_names, last_name,
source_updated_at, payload, updated_at
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now())
""",
driverId,
tenantKey,
eventSourceId,
sourceDriverEntityId,
cardNation,
cardNumber,
firstNames,
lastName,
sourceUpdatedAt,
toJson(payload)
);
return driverId;
}
private void touchDriver(
UUID driverId,
String sourceDriverEntityId,
String cardNation,
String cardNumber
) {
if (sourceDriverEntityId == null && cardNation == null && cardNumber == null) {
return;
}
jdbcTemplate.update(
"""
update eventhub.driver
set source_driver_entity_id = coalesce(source_driver_entity_id, cast(? as text)),
card_nation = coalesce(card_nation, cast(? as text)),
card_number = coalesce(card_number, cast(? as text)),
updated_at = now()
where id = ?
and (
(source_driver_entity_id is null and cast(? as text) is not null)
or (card_nation is null and cast(? as text) is not null)
or (card_number is null and cast(? as text) is not null)
)
""",
sourceDriverEntityId,
cardNation,
cardNumber,
driverId,
sourceDriverEntityId,
cardNation,
cardNumber
);
}
private String compatibleSourcesCte() {
return """
with source_context as (
select tenant_key, provider_key, source_instance_key, coalesce(tenant_provider_setting_key, '') as tenant_provider_setting_key
from eventhub.event_source
where id = ?
),
compatible_sources as (
select es.id
from eventhub.event_source es
join source_context ctx on ctx.tenant_key = es.tenant_key
and ctx.provider_key = es.provider_key
and ctx.source_instance_key = es.source_instance_key
and ctx.tenant_provider_setting_key = coalesce(es.tenant_provider_setting_key, '')
)
""";
}
private String toJson(Map<String, Object> value) {
try {
return objectMapper.writeValueAsString(value == null ? Map.of() : value);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Cannot serialize driver identity payload", e);
}
}
private void put(Map<String, Object> payload, String key, Object value) {
if (value != null) {
payload.put(key, value);
}
}
private String normalizeRequired(String value, String fieldName) {
String normalized = normalizeNullable(value);
if (normalized == null) {
throw new IllegalArgumentException(fieldName + " must not be blank");
}
return normalized;
}
private String normalizeNullable(String value) {
if (value == null) {
return null;
}
String trimmed = value.trim();
return trimmed.isEmpty() ? null : trimmed;
}
}

View File

@ -32,6 +32,7 @@ public class EventRepository {
private final ObjectMapper objectMapper;
private final EventAcquisitionRecordKeyService recordKeyService;
private final SourceMasterDataRepository sourceMasterDataRepository;
private final DriverIdentityRepository driverIdentityRepository;
private final VehicleIdentityRepository vehicleIdentityRepository;
public EventRepository(
@ -39,12 +40,14 @@ public class EventRepository {
ObjectMapper objectMapper,
EventAcquisitionRecordKeyService recordKeyService,
SourceMasterDataRepository sourceMasterDataRepository,
DriverIdentityRepository driverIdentityRepository,
VehicleIdentityRepository vehicleIdentityRepository
) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
this.recordKeyService = recordKeyService;
this.sourceMasterDataRepository = sourceMasterDataRepository;
this.driverIdentityRepository = driverIdentityRepository;
this.vehicleIdentityRepository = vehicleIdentityRepository;
}
@ -89,7 +92,7 @@ public class EventRepository {
packageId,
eventSourceId,
event.externalSourceEventId(),
refs.driverEntityId(),
refs.driverId(),
refs.vehicleId(),
refs.vehicleRegistrationId(),
sourcePackageId,
@ -121,7 +124,7 @@ public class EventRepository {
data_package_id uuid not null,
event_source_id integer not null,
external_source_event_id text not null,
driver_entity_id uuid,
driver_id uuid,
vehicle_id uuid,
vehicle_registration_id uuid,
source_package_id text,
@ -149,7 +152,7 @@ public class EventRepository {
"""
insert into eventhub_event_import_stage(
row_no, source_record_key_hash, requested_event_id, data_package_id, event_source_id,
external_source_event_id, driver_entity_id, vehicle_id, vehicle_registration_id,
external_source_event_id, driver_id, vehicle_id, vehicle_registration_id,
source_package_id, source_package_entity_id, occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle, odometer_m, longitude, latitude,
payload, manual_entry, event_signature_hash
@ -165,7 +168,7 @@ public class EventRepository {
ps.setObject(4, row.packageId());
ps.setInt(5, row.eventSourceId());
ps.setString(6, row.externalSourceEventId());
ps.setObject(7, row.driverEntityId());
ps.setObject(7, row.driverId());
ps.setObject(8, row.vehicleId());
ps.setObject(9, row.vehicleRegistrationId());
ps.setString(10, row.sourcePackageId());
@ -233,7 +236,7 @@ public class EventRepository {
insert into eventhub.event(
id, event_source_id, data_package_id,
external_source_event_id,
driver_entity_id, vehicle_id, vehicle_registration_id,
driver_id, vehicle_id, vehicle_registration_id,
source_package_id, source_package_entity_id,
occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle,
@ -244,7 +247,7 @@ public class EventRepository {
select
source_record.event_id, stage.event_source_id, stage.data_package_id,
stage.external_source_event_id,
stage.driver_entity_id, stage.vehicle_id, stage.vehicle_registration_id,
stage.driver_id, stage.vehicle_id, stage.vehicle_registration_id,
stage.source_package_id, stage.source_package_entity_id,
source_record.event_occurred_at, stage.received_partner_at, stage.received_hub_at,
stage.event_domain, stage.event_type, stage.lifecycle,
@ -357,13 +360,13 @@ public class EventRepository {
Map<String, UUID> entityIdCache,
Map<String, List<VehicleRefCacheEntry>> vehicleRefCache
) {
UUID driverEntityId = resolveDriverEntityId(tenantKey, eventSourceId, event, entityIdCache);
UUID driverId = resolveDriverId(tenantKey, eventSourceId, event, entityIdCache);
ResolvedVehicleReference vehicleRef = resolveVehicleReference(tenantKey, eventSourceId, event, vehicleRefCache);
UUID sourcePackageEntityId = resolveSourcePackageEntityId(tenantKey, eventSourceId, event, entityIdCache);
return new ResolvedEntityRefs(driverEntityId, vehicleRef.vehicleId(), vehicleRef.vehicleRegistrationId(), sourcePackageEntityId);
return new ResolvedEntityRefs(driverId, vehicleRef.vehicleId(), vehicleRef.vehicleRegistrationId(), sourcePackageEntityId);
}
private UUID resolveDriverEntityId(
private UUID resolveDriverId(
String tenantKey,
int eventSourceId,
EventHubEventDto event,
@ -373,32 +376,16 @@ public class EventRepository {
if (driverRef == null || !driverRef.hasAnyReference()) {
return null;
}
DriverCardRefDto card = driverRef.driverCard();
String cardKey = card == null || !card.hasValue() ? null : card.stableKey();
String sourceEntityId = normalizeNullable(driverRef.sourceEntityId());
if (sourceEntityId == null && cardKey != null) {
sourceEntityId = "DRIVER_CARD:" + cardKey;
String cacheKey = "DRIVER|" + driverRef.stableKey();
UUID cached = entityIdCache.get(cacheKey);
if (cached != null) {
return cached;
}
if (sourceEntityId == null) {
return null;
UUID resolved = driverIdentityRepository.resolveOrCreateDriverId(tenantKey, eventSourceId, driverRef);
if (resolved != null) {
entityIdCache.put(cacheKey, resolved);
}
Map<String, Object> payload = new LinkedHashMap<>();
put(payload, "source_entity_id", driverRef.sourceEntityId());
put(payload, "driver_card_nation", card == null ? null : card.nation());
put(payload, "driver_card_number", card == null ? null : card.number());
return resolveEntityId(
tenantKey,
eventSourceId,
"DRIVER",
sourceEntityId,
cardKey,
sourceEntityId,
null,
payload,
entityIdCache
);
return resolved;
}
private ResolvedVehicleReference resolveVehicleReference(
@ -599,7 +586,7 @@ public class EventRepository {
}
private record ResolvedEntityRefs(
UUID driverEntityId,
UUID driverId,
UUID vehicleId,
UUID vehicleRegistrationId,
UUID sourcePackageEntityId
@ -618,7 +605,7 @@ public class EventRepository {
UUID packageId,
int eventSourceId,
String externalSourceEventId,
UUID driverEntityId,
UUID driverId,
UUID vehicleId,
UUID vehicleRegistrationId,
String sourcePackageId,

View File

@ -6,6 +6,7 @@ import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.persistence.EventSourceRepository;
import at.procon.eventhub.persistence.SourceMasterDataRepository;
import at.procon.eventhub.persistence.DriverIdentityRepository;
import at.procon.eventhub.persistence.VehicleIdentityRepository;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.io.IOException;
@ -52,6 +53,7 @@ public class TachographMasterDataRefreshService {
private final ObjectProvider<NamedParameterJdbcTemplate> tachographJdbcTemplateProvider;
private final SourceMasterDataRepository sourceMasterDataRepository;
private final EventSourceRepository eventSourceRepository;
private final DriverIdentityRepository driverIdentityRepository;
private final VehicleIdentityRepository vehicleIdentityRepository;
private final ResourceLoader resourceLoader;
@ -59,12 +61,14 @@ public class TachographMasterDataRefreshService {
@Qualifier("tachographNamedParameterJdbcTemplate") ObjectProvider<NamedParameterJdbcTemplate> tachographJdbcTemplateProvider,
SourceMasterDataRepository sourceMasterDataRepository,
EventSourceRepository eventSourceRepository,
DriverIdentityRepository driverIdentityRepository,
VehicleIdentityRepository vehicleIdentityRepository,
ResourceLoader resourceLoader
) {
this.tachographJdbcTemplateProvider = tachographJdbcTemplateProvider;
this.sourceMasterDataRepository = sourceMasterDataRepository;
this.eventSourceRepository = eventSourceRepository;
this.driverIdentityRepository = driverIdentityRepository;
this.vehicleIdentityRepository = vehicleIdentityRepository;
this.resourceLoader = resourceLoader;
}
@ -98,13 +102,17 @@ public class TachographMasterDataRefreshService {
int relationCount = streamRelations(tachographJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE));
log.info("Reconciling tachograph driver identities from source master data tenant={} source={}",
tenantKey, masterDataSource.stableKey());
int reconciledDrivers = driverIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId);
log.info("Reconciling tachograph vehicle identities from source master data tenant={} source={}",
tenantKey, masterDataSource.stableKey());
int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId);
MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount);
log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={} reconciledVehicles={}",
tenantKey, masterDataSource.stableKey(), result.entitiesUpserted(), result.relationsUpserted(), reconciledVehicles);
log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={} reconciledDrivers={} reconciledVehicles={}",
tenantKey, masterDataSource.stableKey(), result.entitiesUpserted(), result.relationsUpserted(), reconciledDrivers, reconciledVehicles);
return result;
}

View File

@ -63,7 +63,7 @@ eventhub:
# Enables the scheduler that regularly triggers configured tachograph import plans.
# Default is safe: no scheduled import starts unless explicitly enabled.
scheduler-enabled: false
scheduler-enabled: true
scheduler-poll-interval-ms: 3600000
# PLAN_ONLY creates import_run + planned extraction packages.
@ -77,10 +77,10 @@ eventhub:
# Example plan. Keep disabled until the tachograph datasource/extractor is wired.
import-plans:
- plan-key: kralowetz-tachograph-org-147
enabled: false
- plan-key: tachograph-org-14708
enabled: true
cron: "0 15 * * * *" # hourly at minute 15
tenant-key:
tenant-key: Procon
event-source:
provider-key: TACHOGRAPH
source-kind: MIXED
@ -89,16 +89,16 @@ eventhub:
tenant-provider-setting-key: ByteBar-DriverSettlement
source-group:
type: ORGANISATION
source-entity-id: "147"
code: "147"
name: Kralowetz root organisation
source-entity-id: "14708"
code: "14708"
name: Zeller root organisation
import-scope:
type: SOURCE_ORGANISATION_SUBTREE
root-source-organisation:
type: ORGANISATION
source-entity-id: "147"
code: "147"
name: Kralowetz root organisation
source-entity-id: "14708"
code: "14708"
name: Zeller root organisation
include-children: true
occurred-from: null
occurred-to: null
@ -115,11 +115,15 @@ eventhub:
scheduled-mode: INCREMENTAL_UPDATE
initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP
scheduled-strategy: SOURCE_PACKAGE_WATERMARK
refresh-master-data-first: false
refresh-master-data-first: true
initial-occurred-from: "2026-01-21T00:00:00+01:00"
initial-occurred-to: "2026-01-31T00:00:00+01:00"
initial-occurred-to:
run-initial-on-startup: true
esper-poc:
activity-merge-mode: JAVA
shift-resolution-mode: JAVA
yellow-fox:
default-chunk-days: 1
occurred-at-overlap: 2h

View File

@ -0,0 +1,137 @@
create table if not exists eventhub.driver (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_driver_entity_id text,
card_nation text,
card_number text,
first_names text,
last_name text,
birth_date date,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint chk_driver_card_nation_when_number
check (card_number is null or card_nation is not null)
);
insert into eventhub.driver(
id, tenant_key, event_source_id, source_driver_entity_id,
card_nation, card_number, first_names, last_name, birth_date,
source_updated_at, payload, created_at, updated_at
)
select gen_random_uuid(),
sme.tenant_key,
sme.event_source_id,
case
when sme.source_entity_id like 'DRIVER_CARD:%' then null
else sme.source_entity_id
end as source_driver_entity_id,
coalesce(
nullif(trim(sme.payload ->> 'card_nation'), ''),
nullif(trim(sme.payload ->> 'driver_card_nation'), ''),
nullif(split_part(case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else null end, ':', 1), '')
) as card_nation,
coalesce(
nullif(trim(sme.payload ->> 'card_number'), ''),
nullif(trim(sme.payload ->> 'driver_card_number'), ''),
nullif(substring(case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else null end from position(':' in case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else '' end) + 1), '')
) as card_number,
coalesce(
nullif(trim(sme.payload ->> 'first_names'), ''),
nullif(trim(sme.payload ->> 'firstnames'), '')
) as first_names,
coalesce(
nullif(trim(sme.payload ->> 'last_name'), ''),
nullif(trim(sme.payload ->> 'surname'), '')
) as last_name,
cast(nullif(trim(sme.payload ->> 'birth_date'), '') as date) as birth_date,
sme.source_updated_at,
sme.payload,
sme.created_at,
sme.updated_at
from eventhub.source_master_entity sme
where sme.entity_type = 'DRIVER'
and not exists (
select 1
from eventhub.driver existing
where existing.tenant_key = sme.tenant_key
and existing.event_source_id = sme.event_source_id
and existing.source_driver_entity_id is not distinct from case
when sme.source_entity_id like 'DRIVER_CARD:%' then null
else sme.source_entity_id
end
and existing.card_nation is not distinct from coalesce(
nullif(trim(sme.payload ->> 'card_nation'), ''),
nullif(trim(sme.payload ->> 'driver_card_nation'), ''),
nullif(split_part(case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else null end, ':', 1), '')
)
and existing.card_number is not distinct from coalesce(
nullif(trim(sme.payload ->> 'card_number'), ''),
nullif(trim(sme.payload ->> 'driver_card_number'), ''),
nullif(substring(case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else null end from position(':' in case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else '' end) + 1), '')
)
);
alter table eventhub.event
add column if not exists driver_id uuid references eventhub.driver(id);
with source_driver_map as (
select distinct on (sme.id)
sme.id as source_master_entity_id,
driver.id as driver_id
from eventhub.source_master_entity sme
join eventhub.driver driver
on driver.tenant_key = sme.tenant_key
and driver.event_source_id = sme.event_source_id
and (
(
sme.source_entity_id not like 'DRIVER_CARD:%'
and driver.source_driver_entity_id = sme.source_entity_id
) or (
sme.source_entity_id like 'DRIVER_CARD:%'
and driver.card_nation is not distinct from coalesce(
nullif(trim(sme.payload ->> 'card_nation'), ''),
nullif(trim(sme.payload ->> 'driver_card_nation'), ''),
nullif(split_part(substring(sme.source_entity_id from length('DRIVER_CARD:') + 1), ':', 1), '')
)
and driver.card_number is not distinct from coalesce(
nullif(trim(sme.payload ->> 'card_number'), ''),
nullif(trim(sme.payload ->> 'driver_card_number'), ''),
nullif(substring(substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) from position(':' in substring(sme.source_entity_id from length('DRIVER_CARD:') + 1)) + 1), '')
)
)
)
where sme.entity_type = 'DRIVER'
order by sme.id, driver.updated_at desc
)
update eventhub.event event
set driver_id = map.driver_id
from source_driver_map map
where event.driver_entity_id = map.source_master_entity_id
and event.driver_id is null;
create index if not exists idx_driver_source_entity
on eventhub.driver(tenant_key, event_source_id, source_driver_entity_id)
where source_driver_entity_id is not null;
create index if not exists idx_driver_card
on eventhub.driver(tenant_key, event_source_id, card_nation, card_number)
where card_number is not null;
create index if not exists idx_event_driver_time
on eventhub.event(driver_id, occurred_at desc)
where driver_id is not null;
alter table eventhub.event
drop constraint if exists chk_event_driver_or_vehicle_ref;
alter table eventhub.event
add constraint chk_event_driver_or_vehicle_ref
check (
driver_id is not null
or driver_entity_id is not null
or vehicle_id is not null
or vehicle_registration_id is not null
);

View File

@ -0,0 +1,239 @@
/*
* Repairs and normalizes tachograph driver aggregates after introducing eventhub.driver.
*
* What it does:
* 1. Ensures tachograph DRIVER master-data payload carries last_name while keeping source_master_entity.display_name unchanged.
* 2. Upserts eventhub.driver rows from MASTER_DATA DRIVER entities.
* 3. Projects card nation/number onto eventhub.driver from DRIVER_CARD_DRIVER relations.
* 4. Remaps event.driver_id from provisional card-only drivers to proper source-driver aggregates when possible.
* 5. Deletes now-unreferenced provisional tachograph driver rows with no source_driver_entity_id.
*
* Assumptions:
* - Tachograph master-data source is provider_key=TACHOGRAPH, source_kind=MASTER_DATA, source_key=TACHOGRAPH_MASTER_DATA.
* - eventhub.driver and event.driver_id already exist.
*/
-- 1) Keep display_name, but ensure DRIVER payload has last_name.
with master_sources as (
select es.id, es.tenant_key
from eventhub.event_source es
where es.provider_key = 'TACHOGRAPH'
and es.source_kind = 'MASTER_DATA'
and es.source_key = 'TACHOGRAPH_MASTER_DATA'
),
updated_master_payload as (
update eventhub.source_master_entity sme
set payload = jsonb_strip_nulls(
sme.payload
|| jsonb_build_object(
'first_names', coalesce(sme.payload ->> 'first_names', sme.payload ->> 'firstnames'),
'last_name', coalesce(sme.payload ->> 'last_name', sme.payload ->> 'surname')
)
),
updated_at = now()
from master_sources ms
where sme.tenant_key = ms.tenant_key
and sme.event_source_id = ms.id
and sme.entity_type = 'DRIVER'
returning sme.id
)
select count(*) as updated_master_payload
from updated_master_payload;
-- 2) Upsert driver aggregates from tachograph master data.
with master_sources as (
select es.id,
es.tenant_key,
es.source_instance_key,
coalesce(es.tenant_provider_setting_key, '') as tenant_provider_setting_key
from eventhub.event_source es
where es.provider_key = 'TACHOGRAPH'
and es.source_kind = 'MASTER_DATA'
and es.source_key = 'TACHOGRAPH_MASTER_DATA'
),
master_drivers as (
select ms.id as master_event_source_id,
ms.tenant_key,
ms.source_instance_key,
ms.tenant_provider_setting_key,
d.source_entity_id as source_driver_entity_id,
coalesce(nullif(trim(d.payload ->> 'first_names'), ''), nullif(trim(d.payload ->> 'firstnames'), '')) as first_names,
coalesce(nullif(trim(d.payload ->> 'last_name'), ''), nullif(trim(d.payload ->> 'surname'), '')) as last_name,
cast(nullif(trim(d.payload ->> 'birth_date'), '') as date) as birth_date,
d.source_updated_at,
d.payload
from master_sources ms
join eventhub.source_master_entity d
on d.tenant_key = ms.tenant_key
and d.event_source_id = ms.id
and d.entity_type = 'DRIVER'
and d.source_entity_id not like 'DRIVER_CARD:%'
),
compatible_targets as (
select md.*,
es.id as target_event_source_id
from master_drivers md
join eventhub.event_source es
on es.tenant_key = md.tenant_key
and es.provider_key = 'TACHOGRAPH'
and es.source_instance_key = md.source_instance_key
and coalesce(es.tenant_provider_setting_key, '') = md.tenant_provider_setting_key
),
updated_drivers as (
update eventhub.driver driver
set first_names = coalesce(ct.first_names, driver.first_names),
last_name = coalesce(ct.last_name, driver.last_name),
birth_date = coalesce(ct.birth_date, driver.birth_date),
source_updated_at = ct.source_updated_at,
payload = driver.payload || ct.payload,
updated_at = now()
from compatible_targets ct
where driver.tenant_key = ct.tenant_key
and driver.event_source_id = ct.target_event_source_id
and driver.source_driver_entity_id = ct.source_driver_entity_id
returning driver.id
),
inserted_drivers as (
insert into eventhub.driver(
id, tenant_key, event_source_id, source_driver_entity_id,
first_names, last_name, birth_date, source_updated_at, payload, updated_at
)
select gen_random_uuid(),
ct.tenant_key,
ct.target_event_source_id,
ct.source_driver_entity_id,
ct.first_names,
ct.last_name,
ct.birth_date,
ct.source_updated_at,
ct.payload,
now()
from compatible_targets ct
where not exists (
select 1
from eventhub.driver existing
where existing.tenant_key = ct.tenant_key
and existing.event_source_id = ct.target_event_source_id
and existing.source_driver_entity_id = ct.source_driver_entity_id
)
returning id
)
select (select count(*) from updated_drivers) as updated_drivers,
(select count(*) from inserted_drivers) as inserted_drivers;
-- 3) Project driver-card identifiers from master-data relations.
with master_sources as (
select es.id,
es.tenant_key,
es.source_instance_key,
coalesce(es.tenant_provider_setting_key, '') as tenant_provider_setting_key
from eventhub.event_source es
where es.provider_key = 'TACHOGRAPH'
and es.source_kind = 'MASTER_DATA'
and es.source_key = 'TACHOGRAPH_MASTER_DATA'
),
card_projection as (
select distinct on (ms.tenant_key, ms.source_instance_key, ms.tenant_provider_setting_key, rel.to_source_entity_id)
ms.tenant_key,
ms.source_instance_key,
ms.tenant_provider_setting_key,
rel.to_source_entity_id as source_driver_entity_id,
nullif(trim(card.payload ->> 'card_nation'), '') as card_nation,
nullif(trim(card.payload ->> 'card_number'), '') as card_number,
rel.source_updated_at
from master_sources ms
join eventhub.source_master_relation rel
on rel.tenant_key = ms.tenant_key
and rel.event_source_id = ms.id
and rel.relation_type = 'DRIVER_CARD_DRIVER'
and rel.from_entity_type = 'DRIVER_CARD'
and rel.to_entity_type = 'DRIVER'
join eventhub.source_master_entity card
on card.tenant_key = ms.tenant_key
and card.event_source_id = ms.id
and card.entity_type = 'DRIVER_CARD'
and card.source_entity_id = rel.from_source_entity_id
order by ms.tenant_key,
ms.source_instance_key,
ms.tenant_provider_setting_key,
rel.to_source_entity_id,
rel.valid_to desc nulls last,
rel.valid_from desc nulls last,
rel.updated_at desc
),
updated_driver_cards as (
update eventhub.driver driver
set card_nation = coalesce(driver.card_nation, projection.card_nation),
card_number = coalesce(driver.card_number, projection.card_number),
source_updated_at = coalesce(projection.source_updated_at, driver.source_updated_at),
updated_at = now()
from card_projection projection
join eventhub.event_source es
on es.id = driver.event_source_id
where driver.tenant_key = projection.tenant_key
and es.provider_key = 'TACHOGRAPH'
and es.source_instance_key = projection.source_instance_key
and coalesce(es.tenant_provider_setting_key, '') = projection.tenant_provider_setting_key
and driver.source_driver_entity_id = projection.source_driver_entity_id
and (
(driver.card_nation is null and projection.card_nation is not null)
or (driver.card_number is null and projection.card_number is not null)
)
returning driver.id
)
select count(*) as updated_driver_cards
from updated_driver_cards;
-- 4) Remap events from provisional card-only drivers to proper source-driver aggregates.
with provisional_to_real as (
select provisional.id as provisional_driver_id,
real.id as real_driver_id
from eventhub.driver provisional
join eventhub.event_source provisional_source
on provisional_source.id = provisional.event_source_id
and provisional_source.provider_key = 'TACHOGRAPH'
join eventhub.driver real
on real.tenant_key = provisional.tenant_key
and real.source_driver_entity_id is not null
and real.card_nation = provisional.card_nation
and real.card_number = provisional.card_number
join eventhub.event_source real_source
on real_source.id = real.event_source_id
and real_source.provider_key = provisional_source.provider_key
and real_source.tenant_key = provisional_source.tenant_key
and real_source.source_instance_key = provisional_source.source_instance_key
and coalesce(real_source.tenant_provider_setting_key, '') = coalesce(provisional_source.tenant_provider_setting_key, '')
where provisional.source_driver_entity_id is null
and provisional.card_nation is not null
and provisional.card_number is not null
and provisional.id <> real.id
),
updated_events as (
update eventhub.event e
set driver_id = map.real_driver_id
from provisional_to_real map
where e.driver_id = map.provisional_driver_id
and e.driver_id <> map.real_driver_id
returning e.id
)
select count(*) as remapped_events
from updated_events;
-- 5) Delete now-unreferenced provisional tachograph driver rows.
with deleted_drivers as (
delete from eventhub.driver driver
using eventhub.event_source es
where es.id = driver.event_source_id
and es.provider_key = 'TACHOGRAPH'
and driver.source_driver_entity_id is null
and driver.card_nation is not null
and driver.card_number is not null
and not exists (
select 1
from eventhub.event e
where e.driver_id = driver.id
)
returning driver.id
)
select count(*) as deleted_provisional_drivers
from deleted_drivers;

View File

@ -9,6 +9,7 @@ select
d.LastUpdate as source_updated_at,
d.ID as driver_id,
d.Surname as surname,
d.Surname as last_name,
d.Firstnames as first_names,
d.Birthdate as birth_date,
d.BirthPlace as birth_place,

View File

@ -45,6 +45,8 @@ class EsperDriverActivityEngineTest {
OffsetDateTime.parse(occurredAt),
rowId,
"TACHOGRAPH:CARD_ACTIVITY:" + rowId + ":" + lifecycle,
"DRIVER_CARD",
"CARD_ACTIVITY",
driverId,
vehicleId,
null,

View File

@ -3,7 +3,10 @@ package at.procon.eventhub.esperpoc.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
@ -11,7 +14,8 @@ import org.junit.jupiter.api.Test;
class EsperPocDriverCardActivityServiceTest {
private final EsperPocDriverCardActivityService service = new EsperPocDriverCardActivityService(null, null);
private final EsperDriverActivityEngine engine = new EsperDriverActivityEngine();
private final EsperPocDriverCardActivityService service = new EsperPocDriverCardActivityService(null, engine);
@Test
void mergesIdenticalActivitiesAcrossUtcMidnight() {
@ -22,6 +26,9 @@ class EsperPocDriverCardActivityServiceTest {
null,
"BREAK_REST",
"DRIVER",
"INSERTED",
"SINGLE",
"DRIVER_CARD",
OffsetDateTime.parse("2026-04-30T23:50:00Z"),
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
"1"
@ -32,6 +39,9 @@ class EsperPocDriverCardActivityServiceTest {
null,
"BREAK_REST",
"DRIVER",
"INSERTED",
"SINGLE",
"DRIVER_CARD",
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-01T00:20:00Z"),
"2"
@ -39,7 +49,7 @@ class EsperPocDriverCardActivityServiceTest {
var merged = service.mergeConsecutiveIdenticalActivities(
List.of(beforeMidnight, afterMidnight),
java.time.Duration.ZERO
Duration.ZERO
);
assertThat(merged).hasSize(1);
@ -52,25 +62,16 @@ class EsperPocDriverCardActivityServiceTest {
@Test
void splitsOperatingPeriodsByBreakRestLongerThanConfiguredHoursAndEvaluatesDepartureArrivalPerPeriod() {
UUID driverId = UUID.randomUUID();
EsperPocRequest request = new EsperPocRequest(
"default",
driverId,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
OffsetDateTime.parse("2026-04-03T00:00:00Z"),
24,
3,
60,
7
);
EsperPocRequest request = request(driverId, null, null);
List<ActivityIntervalDto> activities = List.of(
activity(driverId, "DRIVE", "2026-04-01T06:00:00Z", "2026-04-01T08:00:00Z", "d1"),
activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1"),
activity(driverId, "DRIVE", "2026-04-01T09:30:00Z", "2026-04-01T11:00:00Z", "d2"),
activity(driverId, "BREAK_REST", "2026-04-01T20:00:00Z", "2026-04-02T04:30:00Z", "r1"),
activity(driverId, "DRIVE", "2026-04-02T05:00:00Z", "2026-04-02T07:00:00Z", "d3"),
activity(driverId, "BREAK_REST", "2026-04-02T10:00:00Z", "2026-04-02T10:30:00Z", "r2"),
activity(driverId, "DRIVE", "2026-04-02T10:30:00Z", "2026-04-02T12:00:00Z", "d4")
activity(driverId, "DRIVE", "2026-04-01T06:00:00Z", "2026-04-01T08:00:00Z", "d1", "DRIVER_CARD"),
activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1", "DRIVER_CARD"),
activity(driverId, "DRIVE", "2026-04-01T09:30:00Z", "2026-04-01T11:00:00Z", "d2", "DRIVER_CARD"),
activity(driverId, "BREAK_REST", "2026-04-01T20:00:00Z", "2026-04-02T04:30:00Z", "r1", "DRIVER_CARD"),
activity(driverId, "DRIVE", "2026-04-02T05:00:00Z", "2026-04-02T07:00:00Z", "d3", "DRIVER_CARD"),
activity(driverId, "BREAK_REST", "2026-04-02T10:00:00Z", "2026-04-02T10:30:00Z", "r2", "DRIVER_CARD"),
activity(driverId, "DRIVE", "2026-04-02T10:30:00Z", "2026-04-02T12:00:00Z", "d4", "DRIVER_CARD")
);
var periods = service.buildOperatingTimePeriods(
@ -99,13 +100,135 @@ class EsperPocDriverCardActivityServiceTest {
assertThat(periods.get(1).workingOperationTimes().breakRestSeconds()).isEqualTo(30 * 60L);
}
private ActivityIntervalDto activity(UUID driverId, String activity, String from, String to, String sourceRowId) {
@Test
void usesVuIntervalsOnlyForUncoveredDriverCardGaps() {
UUID driverId = UUID.randomUUID();
List<ActivityIntervalDto> resolved = service.resolveVuFillGaps(
List.of(
activity(driverId, "DRIVE", "2026-04-01T08:00:00Z", "2026-04-01T10:00:00Z", "c1", "DRIVER_CARD")
),
List.of(
activity(driverId, "DRIVE", "2026-04-01T09:00:00Z", "2026-04-01T11:00:00Z", "v1", "VEHICLE_UNIT")
)
);
assertThat(resolved).hasSize(2);
assertThat(resolved.get(0).sourceKind()).isEqualTo("DRIVER_CARD");
assertThat(resolved.get(1).sourceKind()).isEqualTo("VEHICLE_UNIT");
assertThat(resolved.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T10:00:00Z"));
assertThat(resolved.get(1).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T11:00:00Z"));
}
@Test
void resolvesTwoWorkingShiftsAcrossLongRestGap() {
UUID driverId = UUID.randomUUID();
EsperPocRequest request = request(driverId, null, null);
var shifts = service.buildResolvedWorkShifts(
request,
request.occurredFrom(),
request.occurredTo(),
List.of(
activity(driverId, "DRIVE", "2026-04-01T06:00:00Z", "2026-04-01T10:00:00Z", "d1", "DRIVER_CARD"),
activity(driverId, "BREAK_REST", "2026-04-01T10:00:00Z", "2026-04-01T17:30:00Z", "r1", "DRIVER_CARD"),
activity(driverId, "WORK", "2026-04-01T17:30:00Z", "2026-04-01T19:00:00Z", "w1", "VEHICLE_UNIT")
)
);
assertThat(shifts).hasSize(2);
assertThat(shifts.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T06:00:00Z"));
assertThat(shifts.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T10:00:00Z"));
assertThat(shifts.get(0).dailyRestingTimeSeconds()).isEqualTo(7L * 60L * 60L + 30L * 60L);
assertThat(shifts.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T17:30:00Z"));
assertThat(shifts.get(1).usedDataKind()).isEqualTo("VEHICLE_UNIT");
}
@Test
void esperMergeModeMatchesJavaMergeMode() {
UUID driverId = UUID.randomUUID();
List<ActivityIntervalDto> activities = List.of(
activity(driverId, "BREAK_REST", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "r1", "DRIVER_CARD"),
activity(driverId, "BREAK_REST", "2026-04-01T09:00:00Z", "2026-04-01T10:00:00Z", "r2", "DRIVER_CARD"),
activity(driverId, "WORK", "2026-04-01T10:05:00Z", "2026-04-01T11:00:00Z", "w1", "DRIVER_CARD")
);
List<ActivityIntervalDto> javaMerged = service.mergeConsecutiveIdenticalActivities(
activities,
Duration.ofSeconds(60)
);
List<ActivityIntervalDto> esperMerged = engine.mergeConsecutiveIdenticalActivities(
activities,
Duration.ofSeconds(60)
);
assertThat(esperMerged).usingRecursiveComparison().isEqualTo(javaMerged);
}
@Test
void esperShiftResolutionModeMatchesJavaShiftResolutionMode() {
UUID driverId = UUID.randomUUID();
EsperPocRequest javaRequest = request(driverId, EsperActivityMergeMode.JAVA, EsperShiftResolutionMode.JAVA);
EsperPocRequest esperRequest = request(driverId, EsperActivityMergeMode.JAVA, EsperShiftResolutionMode.ESPER);
List<ActivityIntervalDto> intervals = List.of(
activity(driverId, "DRIVE", "2026-04-01T06:00:00Z", "2026-04-01T10:00:00Z", "d1", "DRIVER_CARD"),
activity(driverId, "BREAK_REST", "2026-04-01T10:00:00Z", "2026-04-01T17:30:00Z", "r1", "DRIVER_CARD"),
activity(driverId, "WORK", "2026-04-01T17:30:00Z", "2026-04-01T19:00:00Z", "w1", "VEHICLE_UNIT")
);
var javaShifts = service.buildResolvedWorkShifts(
javaRequest,
javaRequest.occurredFrom(),
javaRequest.occurredTo(),
intervals
);
var esperShifts = service.buildResolvedWorkShifts(
esperRequest,
esperRequest.occurredFrom(),
esperRequest.occurredTo(),
intervals
);
assertThat(esperShifts).usingRecursiveComparison().isEqualTo(javaShifts);
}
private EsperPocRequest request(
UUID driverId,
EsperActivityMergeMode activityMergeMode,
EsperShiftResolutionMode shiftResolutionMode
) {
return new EsperPocRequest(
"default",
driverId,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
OffsetDateTime.parse("2026-04-03T00:00:00Z"),
24,
3,
60,
7,
420,
5,
activityMergeMode,
shiftResolutionMode
);
}
private ActivityIntervalDto activity(
UUID driverId,
String activity,
String from,
String to,
String sourceRowId,
String sourceKind
) {
return ActivityIntervalDto.raw(
driverId,
null,
null,
activity,
"DRIVER",
"INSERTED",
"KNOWN",
sourceKind,
OffsetDateTime.parse(from),
OffsetDateTime.parse(to),
sourceRowId