From 818009555a47e44414877e7d856bba225ac0417a Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Wed, 6 May 2026 09:02:38 +0200 Subject: [PATCH] Add configurable Esper activity pipeline and driver identity model --- .../eventhub/config/EventHubProperties.java | 28 + .../esperpoc/api/EsperPocController.java | 14 +- .../esperpoc/dto/ActivityIntervalDto.java | 15 + .../esperpoc/dto/EsperActivityMergeMode.java | 6 + .../esperpoc/dto/EsperPocRequest.java | 8 +- .../esperpoc/dto/EsperPocResultDto.java | 10 + .../dto/EsperShiftResolutionMode.java | 6 + .../esperpoc/dto/RawActivityEventDto.java | 2 + .../esperpoc/dto/ResolvedWorkShiftDto.java | 40 + .../EsperPocActivityRepository.java | 24 +- .../service/EsperActivityIntervalEvent.java | 24 + .../service/EsperActivitySemantics.java | 54 ++ .../service/EsperDriverActivityEngine.java | 315 ++++++- .../EsperPocDriverCardActivityService.java | 781 ++++++++++++++++-- .../service/EsperRawDriverActivityPoint.java | 23 +- .../service/EsperResolvedShiftSpan.java | 15 + .../persistence/DriverIdentityRepository.java | 372 +++++++++ .../eventhub/persistence/EventRepository.java | 57 +- .../TachographMasterDataRefreshService.java | 12 +- src/main/resources/application.yml | 28 +- .../V9__introduce_driver_identity_model.sql | 137 +++ .../repair-tachograph-driver-entities.sql | 239 ++++++ .../sql/tachograph/master-data/drivers.sql | 1 + .../EsperDriverActivityEngineTest.java | 2 + ...EsperPocDriverCardActivityServiceTest.java | 163 +++- 25 files changed, 2197 insertions(+), 179 deletions(-) create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/EsperActivityMergeMode.java create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/EsperShiftResolutionMode.java create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/ResolvedWorkShiftDto.java create mode 100644 src/main/java/at/procon/eventhub/esperpoc/service/EsperActivityIntervalEvent.java create mode 100644 src/main/java/at/procon/eventhub/esperpoc/service/EsperActivitySemantics.java create mode 100644 src/main/java/at/procon/eventhub/esperpoc/service/EsperResolvedShiftSpan.java create mode 100644 src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java create mode 100644 src/main/resources/db/migration/V9__introduce_driver_identity_model.sql create mode 100644 src/main/resources/sql/maintenance/repair-tachograph-driver-entities.sql diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index 4a0e690..c2c9700 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -1,5 +1,7 @@ package at.procon.eventhub.config; +import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode; +import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode; import java.time.Duration; import java.time.OffsetDateTime; import java.util.ArrayList; @@ -26,6 +28,7 @@ public class EventHubProperties { private final Batch batch = new Batch(); private final Tachograph tachograph = new Tachograph(); + private final EsperPoc esperPoc = new EsperPoc(); private final YellowFox yellowFox = new YellowFox(); public Batch getBatch() { @@ -36,10 +39,35 @@ public class EventHubProperties { return tachograph; } + public EsperPoc getEsperPoc() { + return esperPoc; + } + public YellowFox getYellowFox() { return yellowFox; } + public static class EsperPoc { + private EsperActivityMergeMode activityMergeMode = EsperActivityMergeMode.JAVA; + private EsperShiftResolutionMode shiftResolutionMode = EsperShiftResolutionMode.JAVA; + + public EsperActivityMergeMode getActivityMergeMode() { + return activityMergeMode; + } + + public void setActivityMergeMode(EsperActivityMergeMode activityMergeMode) { + this.activityMergeMode = activityMergeMode == null ? EsperActivityMergeMode.JAVA : activityMergeMode; + } + + public EsperShiftResolutionMode getShiftResolutionMode() { + return shiftResolutionMode; + } + + public void setShiftResolutionMode(EsperShiftResolutionMode shiftResolutionMode) { + this.shiftResolutionMode = shiftResolutionMode == null ? EsperShiftResolutionMode.JAVA : shiftResolutionMode; + } + } + public static class Batch { /** Number of events collected before a package is persisted. */ private int completionSize = 5000; diff --git a/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java b/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java index 7724045..4e0c97a 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java +++ b/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java @@ -1,7 +1,9 @@ package at.procon.eventhub.esperpoc.api; +import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode; import at.procon.eventhub.esperpoc.dto.EsperPocRequest; import at.procon.eventhub.esperpoc.dto.EsperPocResultDto; +import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode; import at.procon.eventhub.esperpoc.service.EsperPocDriverCardActivityService; import java.time.OffsetDateTime; import java.util.UUID; @@ -31,7 +33,11 @@ public class EsperPocController { @RequestParam(defaultValue = "24") Integer guardHours, @RequestParam(defaultValue = "3") Integer significantDrivingMinutes, @RequestParam(defaultValue = "60") Integer mergeGapSeconds, - @RequestParam(defaultValue = "7") Integer operatingPeriodSplitRestHours + @RequestParam(defaultValue = "7") Integer operatingPeriodSplitRestHours, + @RequestParam(defaultValue = "420") Integer shiftEndMarkPeriodMinutes, + @RequestParam(defaultValue = "5") Integer absenceBeginEndMinActivityMinutes, + @RequestParam(required = false) EsperActivityMergeMode activityMergeMode, + @RequestParam(required = false) EsperShiftResolutionMode shiftResolutionMode ) { EsperPocRequest request = new EsperPocRequest( tenantKey, @@ -41,7 +47,11 @@ public class EsperPocController { guardHours, significantDrivingMinutes, mergeGapSeconds, - operatingPeriodSplitRestHours + operatingPeriodSplitRestHours, + shiftEndMarkPeriodMinutes, + absenceBeginEndMinActivityMinutes, + activityMergeMode, + shiftResolutionMode ); return ResponseEntity.ok(service.evaluate(request)); } diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/ActivityIntervalDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/ActivityIntervalDto.java index 4437969..82f56d2 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/dto/ActivityIntervalDto.java +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/ActivityIntervalDto.java @@ -11,6 +11,9 @@ public record ActivityIntervalDto( UUID vehicleRegistrationId, String activityType, String cardSlot, + String cardStatus, + String drivingStatus, + String sourceKind, OffsetDateTime startedAt, OffsetDateTime endedAt, long durationSeconds, @@ -25,6 +28,9 @@ public record ActivityIntervalDto( UUID vehicleRegistrationId, String activityType, String cardSlot, + String cardStatus, + String drivingStatus, + String sourceKind, OffsetDateTime startedAt, OffsetDateTime endedAt, String sourceRowId @@ -35,6 +41,9 @@ public record ActivityIntervalDto( vehicleRegistrationId, activityType, cardSlot, + cardStatus, + drivingStatus, + sourceKind, startedAt, endedAt, Duration.between(startedAt, endedAt).getSeconds(), @@ -52,6 +61,9 @@ public record ActivityIntervalDto( vehicleRegistrationId, activityType, cardSlot, + cardStatus, + drivingStatus, + sourceKind, newStartedAt, newEndedAt, Duration.between(newStartedAt, newEndedAt).getSeconds(), @@ -69,6 +81,9 @@ public record ActivityIntervalDto( vehicleRegistrationId, activityType, cardSlot, + cardStatus, + drivingStatus, + sourceKind, startedAt, endedAt, durationSeconds, diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperActivityMergeMode.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperActivityMergeMode.java new file mode 100644 index 0000000..f79d498 --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperActivityMergeMode.java @@ -0,0 +1,6 @@ +package at.procon.eventhub.esperpoc.dto; + +public enum EsperActivityMergeMode { + JAVA, + ESPER +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocRequest.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocRequest.java index e860729..22cbed2 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocRequest.java +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocRequest.java @@ -13,7 +13,11 @@ public record EsperPocRequest( Integer guardHours, Integer significantDrivingMinutes, Integer mergeGapSeconds, - Integer operatingPeriodSplitRestHours + Integer operatingPeriodSplitRestHours, + Integer shiftEndMarkPeriodMinutes, + Integer absenceBeginEndMinActivityMinutes, + EsperActivityMergeMode activityMergeMode, + EsperShiftResolutionMode shiftResolutionMode ) { public EsperPocRequest { if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) { @@ -23,5 +27,7 @@ public record EsperPocRequest( significantDrivingMinutes = significantDrivingMinutes == null ? 3 : Math.max(1, significantDrivingMinutes); mergeGapSeconds = mergeGapSeconds == null ? 60 : Math.max(0, mergeGapSeconds); operatingPeriodSplitRestHours = operatingPeriodSplitRestHours == null ? 7 : Math.max(1, operatingPeriodSplitRestHours); + shiftEndMarkPeriodMinutes = shiftEndMarkPeriodMinutes == null ? 420 : Math.max(1, shiftEndMarkPeriodMinutes); + absenceBeginEndMinActivityMinutes = absenceBeginEndMinActivityMinutes == null ? 5 : Math.max(1, absenceBeginEndMinActivityMinutes); } } diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocResultDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocResultDto.java index 086c52e..cbed2c8 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocResultDto.java +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocResultDto.java @@ -12,14 +12,24 @@ public record EsperPocResultDto( OffsetDateTime loadedFrom, OffsetDateTime loadedTo, int rawEventCount, + int driverCardRawEventCount, + int vehicleUnitRawEventCount, + int driverCardRawIntervalCount, + int vehicleUnitRawIntervalCount, int rawIntervalCount, int mergedActivityCount, int operatingTimePeriodCount, + int resolvedWorkShiftCount, int operatingPeriodSplitRestHours, + int shiftEndMarkPeriodMinutes, + int absenceBeginEndMinActivityMinutes, + EsperActivityMergeMode activityMergeMode, + EsperShiftResolutionMode shiftResolutionMode, List raw, List rawIntervals, List activities, List operatingTimePeriods, + List workingShifts, DriverWorkSummaryDto workResultPerDriver, DriverWorkSummaryDto workingOperationTimesPerEmployee, ShiftDrivingEvaluationDto drivingTimeInterruptionEvaluation, diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperShiftResolutionMode.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperShiftResolutionMode.java new file mode 100644 index 0000000..c73395e --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperShiftResolutionMode.java @@ -0,0 +1,6 @@ +package at.procon.eventhub.esperpoc.dto; + +public enum EsperShiftResolutionMode { + JAVA, + ESPER +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/RawActivityEventDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/RawActivityEventDto.java index be81959..808825a 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/dto/RawActivityEventDto.java +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/RawActivityEventDto.java @@ -8,6 +8,8 @@ public record RawActivityEventDto( OffsetDateTime occurredAt, String sourceRowId, String externalSourceEventId, + String sourceKind, + String extractionCode, UUID driverEntityId, UUID vehicleId, UUID vehicleRegistrationId, diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/ResolvedWorkShiftDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/ResolvedWorkShiftDto.java new file mode 100644 index 0000000..6438422 --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/dto/ResolvedWorkShiftDto.java @@ -0,0 +1,40 @@ +package at.procon.eventhub.esperpoc.dto; + +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; + +public record ResolvedWorkShiftDto( + int sequenceNumber, + OffsetDateTime startedAt, + OffsetDateTime endedAt, + OffsetDateTime nextShiftStartedAt, + long durationSeconds, + long dailyRestingTimeSeconds, + OffsetDateTime drivingFrom, + OffsetDateTime drivingTo, + OffsetDateTime absenceFrom, + OffsetDateTime absenceTo, + List vehicleIds, + List vehicleRegistrationIds, + String usedDataKind, + String shiftKind, + boolean beginTimeDeviationInRange, + boolean endTimeDeviationInRange, + boolean noDrivingActivity, + boolean dailyRestingTimeBeforeShiftOver24Hours, + boolean durationOver24Hours, + boolean dailyRestingTimeBetween9And11Hours, + boolean dailyRestingTimeBetween7And9Hours, + int activitiesCount, + int drivingCount, + int availabilityCount, + int workCount, + int breakRestCount, + int unknownCount, + int workingTimeCount, + DriverWorkSummaryDto workingOperationTimes, + ShiftDrivingEvaluationDto drivingTimeInterruptionEvaluation, + List activities +) { +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/persistence/EsperPocActivityRepository.java b/src/main/java/at/procon/eventhub/esperpoc/persistence/EsperPocActivityRepository.java index 3095900..2a7ee8a 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/persistence/EsperPocActivityRepository.java +++ b/src/main/java/at/procon/eventhub/esperpoc/persistence/EsperPocActivityRepository.java @@ -16,7 +16,7 @@ public class EsperPocActivityRepository { this.jdbcTemplate = jdbcTemplate; } - public List findDriverCardActivityEvents( + public List findDriverActivityEvents( String tenantKey, UUID driverEntityId, OffsetDateTime occurredFrom, @@ -32,7 +32,14 @@ public class EsperPocActivityRepository { regexp_replace(event.external_source_event_id, ':(START|END)$', '') ) as source_row_id, event.external_source_event_id, - event.driver_entity_id, + source.source_kind, + coalesce(pkg.extraction_code, + case + when source.source_kind = 'VEHICLE_UNIT' then 'VU_ACTIVITY' + else 'CARD_ACTIVITY' + end + ) as extraction_code, + event.driver_id, event.vehicle_id, event.vehicle_registration_id, event.event_type, @@ -49,9 +56,12 @@ public class EsperPocActivityRepository { and detail.detail_type = 'DRIVER_ACTIVITY' where pkg.tenant_key = ? and source.provider_key = 'TACHOGRAPH' - and source.source_kind = 'DRIVER_CARD' - and coalesce(pkg.extraction_code, 'CARD_ACTIVITY') = 'CARD_ACTIVITY' - and event.driver_entity_id = ? + and ( + (source.source_kind = 'DRIVER_CARD' and coalesce(pkg.extraction_code, 'CARD_ACTIVITY') = 'CARD_ACTIVITY') + or + (source.source_kind = 'VEHICLE_UNIT' and coalesce(pkg.extraction_code, 'VU_ACTIVITY') = 'VU_ACTIVITY') + ) + and event.driver_id = ? and event.occurred_at >= ? and event.occurred_at < ? and event.event_domain = 'DRIVER_ACTIVITY' @@ -64,7 +74,9 @@ public class EsperPocActivityRepository { rs.getObject("occurred_at", OffsetDateTime.class), rs.getString("source_row_id"), rs.getString("external_source_event_id"), - (UUID) rs.getObject("driver_entity_id"), + rs.getString("source_kind"), + rs.getString("extraction_code"), + (UUID) rs.getObject("driver_id"), (UUID) rs.getObject("vehicle_id"), (UUID) rs.getObject("vehicle_registration_id"), rs.getString("event_type"), diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperActivityIntervalEvent.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperActivityIntervalEvent.java new file mode 100644 index 0000000..2c2c532 --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperActivityIntervalEvent.java @@ -0,0 +1,24 @@ +package at.procon.eventhub.esperpoc.service; + +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; + +public record EsperActivityIntervalEvent( + UUID driverEntityId, + UUID vehicleId, + UUID vehicleRegistrationId, + String activityType, + String cardSlot, + String cardStatus, + String drivingStatus, + String sourceKind, + OffsetDateTime startedAt, + OffsetDateTime endedAt, + long durationSeconds, + String sourceRowId, + List sourceRowIds, + boolean clippedToRequestedPeriod, + String level +) { +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperActivitySemantics.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperActivitySemantics.java new file mode 100644 index 0000000..9678214 --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperActivitySemantics.java @@ -0,0 +1,54 @@ +package at.procon.eventhub.esperpoc.service; + +import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto; +import java.time.Duration; +import java.time.LocalDate; +import java.util.Objects; +import java.util.Set; + +final class EsperActivitySemantics { + + private static final Set ACTIVE_ACTIVITY_TYPES = Set.of("DRIVE", "WORK", "AVAILABILITY"); + + private EsperActivitySemantics() { + } + + static boolean canMerge(ActivityIntervalDto left, ActivityIntervalDto right, Duration tolerance) { + boolean sameDriver = Objects.equals(left.driverEntityId(), right.driverEntityId()); + boolean sameActivity = Objects.equals(left.activityType(), right.activityType()); + boolean sameSlot = Objects.equals(left.cardSlot(), right.cardSlot()); + boolean sameCardStatus = Objects.equals(left.cardStatus(), right.cardStatus()); + boolean sameDrivingStatus = Objects.equals(left.drivingStatus(), right.drivingStatus()); + boolean sameSourceKind = Objects.equals(left.sourceKind(), right.sourceKind()); + long gapSeconds = Duration.between(left.endedAt(), right.startedAt()).getSeconds(); + boolean adjacentOrOverlapping = gapSeconds <= tolerance.getSeconds(); + return sameDriver && sameActivity && sameSlot && sameCardStatus && sameDrivingStatus && sameSourceKind && adjacentOrOverlapping; + } + + static boolean isShiftActivity(ActivityIntervalDto interval) { + return isKnownActivity(interval) && ACTIVE_ACTIVITY_TYPES.contains(interval.activityType()); + } + + static boolean isRestOrUnknown(ActivityIntervalDto interval) { + return ("BREAK_REST".equals(interval.activityType()) && isKnownActivity(interval)) + || isUnknownActivity(interval); + } + + static boolean isKnownActivity(ActivityIntervalDto interval) { + return interval.cardStatus() == null + || !"NOT_INSERTED".equals(interval.cardStatus()) + || "KNOWN".equals(interval.drivingStatus()); + } + + static boolean isUnknownActivity(ActivityIntervalDto interval) { + if ("UNKNOWN_ACTIVITY".equals(interval.activityType())) { + return true; + } + return "UNKNOWN".equals(interval.drivingStatus()) + || ("NOT_INSERTED".equals(interval.cardStatus()) && !"KNOWN".equals(interval.drivingStatus())); + } + + static LocalDate recordDate(ActivityIntervalDto interval) { + return interval.startedAt().toLocalDate(); + } +} diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngine.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngine.java index 9404c86..b9c57fa 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngine.java +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngine.java @@ -12,12 +12,14 @@ import com.espertech.esper.runtime.client.EPDeployException; import com.espertech.esper.runtime.client.EPDeployment; import com.espertech.esper.runtime.client.EPRuntime; import com.espertech.esper.runtime.client.EPRuntimeProvider; +import java.time.Duration; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.springframework.stereotype.Component; @Component @@ -25,7 +27,7 @@ public class EsperDriverActivityEngine { private static final AtomicLong RUNTIME_COUNTER = new AtomicLong(); - private static final String EPL = """ + private static final String INTERVAL_EPL = """ @name('driverCardActivityIntervals') select s.driverEntityId as driverEntityId, @@ -33,6 +35,9 @@ public class EsperDriverActivityEngine { s.vehicleRegistrationId as vehicleRegistrationId, s.eventType as activityType, s.cardSlot as cardSlot, + s.cardStatus as cardStatus, + s.drivingStatus as drivingStatus, + s.sourceKind as sourceKind, s.occurredAt as startedAt, e.occurredAt as endedAt, s.sourceRowId as sourceRowId @@ -48,38 +53,104 @@ public class EsperDriverActivityEngine { ] """; + private static final String INTERVAL_STREAM_EPL = """ + @name('driverActivityIntervalStream') + select * from DriverActivityInterval + """; + public List buildIntervals(List rawEvents) { if (rawEvents == null || rawEvents.isEmpty()) { return List.of(); } List intervals = new ArrayList<>(); + executeWithRuntime( + configuration -> configuration.getCommon().addEventType("RawDriverActivityPoint", EsperRawDriverActivityPoint.class), + INTERVAL_EPL, + "driverCardActivityIntervals", + newData -> collectIntervals(newData, intervals), + runtime -> { + List points = rawEvents.stream() + .sorted(Comparator.comparing(RawActivityEventDto::occurredAt).thenComparing(RawActivityEventDto::lifecycle)) + .map(this::toEsperPoint) + .toList(); + for (EsperRawDriverActivityPoint point : points) { + runtime.getEventService().sendEventBean(point, "RawDriverActivityPoint"); + } + } + ); + + return intervals.stream() + .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) + .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt)) + .toList(); + } + + public List mergeConsecutiveIdenticalActivities( + List intervals, + Duration mergeGapTolerance + ) { + List sorted = sortedPositiveIntervals(intervals); + if (sorted.isEmpty()) { + return List.of(); + } + + MergedActivityCollector collector = new MergedActivityCollector(mergeGapTolerance); + executeIntervalStream(sorted, collector::accept); + return collector.finish(); + } + + public List resolveShiftSpans( + List intervals, + Duration shiftEndThreshold, + Duration contiguousGapTolerance + ) { + List sorted = sortedPositiveIntervals(intervals); + if (sorted.isEmpty()) { + return List.of(); + } + + ShiftSpanCollector collector = new ShiftSpanCollector(shiftEndThreshold, contiguousGapTolerance); + executeIntervalStream(sorted, collector::accept); + return collector.finish(); + } + + private void executeIntervalStream(List intervals, Consumer consumer) { + executeWithRuntime( + configuration -> configuration.getCommon().addEventType("DriverActivityInterval", EsperActivityIntervalEvent.class), + INTERVAL_STREAM_EPL, + "driverActivityIntervalStream", + newData -> collectInputIntervals(newData, consumer), + runtime -> { + for (ActivityIntervalDto interval : intervals) { + runtime.getEventService().sendEventBean(toEsperInterval(interval), "DriverActivityInterval"); + } + } + ); + } + + private void executeWithRuntime( + Consumer configurationSetup, + String epl, + String statementName, + Consumer listener, + Consumer sender + ) { EPRuntime runtime = null; try { Configuration configuration = new Configuration(); - configuration.getCommon().addEventType("RawDriverActivityPoint", EsperRawDriverActivityPoint.class); + configurationSetup.accept(configuration); String runtimeUri = "eventhub-esper-poc-" + RUNTIME_COUNTER.incrementAndGet(); runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration); CompilerArguments arguments = new CompilerArguments(configuration); - EPCompiled compiled = EPCompilerProvider.getCompiler().compile(EPL, arguments); + EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments); EPDeployment deployment = runtime.getDeploymentService().deploy(compiled); runtime.getDeploymentService() - .getStatement(deployment.getDeploymentId(), "driverCardActivityIntervals") - .addListener((newData, oldData, statement, rt) -> collectIntervals(newData, intervals)); + .getStatement(deployment.getDeploymentId(), statementName) + .addListener((newData, oldData, statement, rt) -> listener.accept(newData)); - List points = rawEvents.stream() - .sorted(Comparator.comparing(RawActivityEventDto::occurredAt).thenComparing(RawActivityEventDto::lifecycle)) - .map(this::toEsperPoint) - .toList(); - for (EsperRawDriverActivityPoint point : points) { - runtime.getEventService().sendEventBean(point, "RawDriverActivityPoint"); - } - - return intervals.stream() - .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) - .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt)) - .toList(); + sender.accept(runtime); } catch (EPCompileException | EPDeployException e) { throw new IllegalStateException("Cannot compile/deploy Esper PoC EPL", e); } finally { @@ -102,6 +173,9 @@ public class EsperDriverActivityEngine { (UUID) event.get("vehicleRegistrationId"), (String) event.get("activityType"), (String) event.get("cardSlot"), + (String) event.get("cardStatus"), + (String) event.get("drivingStatus"), + (String) event.get("sourceKind"), startedAt, endedAt, (String) event.get("sourceRowId") @@ -109,6 +183,32 @@ public class EsperDriverActivityEngine { } } + private void collectInputIntervals(EventBean[] newData, Consumer consumer) { + if (newData == null) { + return; + } + for (EventBean event : newData) { + EsperActivityIntervalEvent interval = (EsperActivityIntervalEvent) event.getUnderlying(); + consumer.accept(new ActivityIntervalDto( + interval.driverEntityId(), + interval.vehicleId(), + interval.vehicleRegistrationId(), + interval.activityType(), + interval.cardSlot(), + interval.cardStatus(), + interval.drivingStatus(), + interval.sourceKind(), + interval.startedAt(), + interval.endedAt(), + interval.durationSeconds(), + interval.sourceRowId(), + interval.sourceRowIds(), + interval.clippedToRequestedPeriod(), + interval.level() + )); + } + } + private EsperRawDriverActivityPoint toEsperPoint(RawActivityEventDto event) { return new EsperRawDriverActivityPoint( event.eventId(), @@ -120,7 +220,186 @@ public class EsperDriverActivityEngine { event.vehicleRegistrationId(), event.activityType(), event.lifecycle(), - event.cardSlot() + event.cardSlot(), + event.cardStatus(), + event.drivingStatus(), + event.sourceKind() ); } + + private EsperActivityIntervalEvent toEsperInterval(ActivityIntervalDto interval) { + return new EsperActivityIntervalEvent( + interval.driverEntityId(), + interval.vehicleId(), + interval.vehicleRegistrationId(), + interval.activityType(), + interval.cardSlot(), + interval.cardStatus(), + interval.drivingStatus(), + interval.sourceKind(), + interval.startedAt(), + interval.endedAt(), + interval.durationSeconds(), + interval.sourceRowId(), + interval.sourceRowIds(), + interval.clippedToRequestedPeriod(), + interval.level() + ); + } + + private List sortedPositiveIntervals(List intervals) { + if (intervals == null || intervals.isEmpty()) { + return List.of(); + } + return intervals.stream() + .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) + .sorted(Comparator.comparing(ActivityIntervalDto::startedAt) + .thenComparing(ActivityIntervalDto::endedAt) + .thenComparing(ActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo))) + .toList(); + } + + private static final class MergedActivityCollector { + private final Duration mergeGapTolerance; + private final List merged = new ArrayList<>(); + private ActivityIntervalDto current; + private List currentSources = List.of(); + + private MergedActivityCollector(Duration mergeGapTolerance) { + this.mergeGapTolerance = mergeGapTolerance; + } + + private void accept(ActivityIntervalDto next) { + if (current == null) { + current = next; + currentSources = new ArrayList<>(next.sourceRowIds()); + return; + } + if (EsperActivitySemantics.canMerge(current, next, mergeGapTolerance)) { + currentSources.addAll(next.sourceRowIds()); + OffsetDateTime newEndedAt = current.endedAt().isAfter(next.endedAt()) ? current.endedAt() : next.endedAt(); + current = new ActivityIntervalDto( + current.driverEntityId(), + current.vehicleId() == null ? next.vehicleId() : current.vehicleId(), + current.vehicleRegistrationId() == null ? next.vehicleRegistrationId() : current.vehicleRegistrationId(), + current.activityType(), + current.cardSlot(), + current.cardStatus(), + current.drivingStatus(), + current.sourceKind(), + current.startedAt(), + newEndedAt, + Duration.between(current.startedAt(), newEndedAt).getSeconds(), + current.sourceRowId(), + List.copyOf(currentSources), + current.clippedToRequestedPeriod() || next.clippedToRequestedPeriod(), + "MERGED_ACTIVITY" + ); + return; + } + merged.add(current.asMerged(List.copyOf(currentSources))); + current = next; + currentSources = new ArrayList<>(next.sourceRowIds()); + } + + private List finish() { + if (current != null) { + merged.add(current.asMerged(List.copyOf(currentSources))); + } + return List.copyOf(merged); + } + } + + private static final class ShiftSpanCollector { + private final Duration shiftEndThreshold; + private final Duration contiguousGapTolerance; + private final List seen = new ArrayList<>(); + private final List spans = new ArrayList<>(); + private OffsetDateTime shiftBegin; + private long accumulatedRestSeconds; + private OffsetDateTime prevActivityEnd; + private OffsetDateTime pendingShiftEndTriggerAt; + + private ShiftSpanCollector(Duration shiftEndThreshold, Duration contiguousGapTolerance) { + this.shiftEndThreshold = shiftEndThreshold; + this.contiguousGapTolerance = contiguousGapTolerance; + } + + private void accept(ActivityIntervalDto interval) { + seen.add(interval); + if (shiftBegin == null) { + if (EsperActivitySemantics.isShiftActivity(interval)) { + shiftBegin = interval.startedAt(); + } + prevActivityEnd = interval.endedAt(); + return; + } + if (!shiftBegin.isBefore(interval.startedAt())) { + prevActivityEnd = interval.endedAt(); + return; + } + + if (pendingShiftEndTriggerAt != null && EsperActivitySemantics.isShiftActivity(interval)) { + appendShiftSpan(shiftBegin, interval.startedAt()); + shiftBegin = interval.startedAt(); + pendingShiftEndTriggerAt = null; + accumulatedRestSeconds = 0; + } + + if (EsperActivitySemantics.isRestOrUnknown(interval)) { + long gapSeconds = prevActivityEnd == null ? 0 : Math.max(0, Duration.between(prevActivityEnd, interval.startedAt()).getSeconds()); + boolean contiguous = prevActivityEnd != null + && Math.abs(Duration.between(prevActivityEnd, interval.startedAt()).getSeconds()) <= contiguousGapTolerance.getSeconds(); + boolean crossedRecordDate = prevActivityEnd != null + && !interval.startedAt().toLocalDate().equals(prevActivityEnd.toLocalDate()); + if (contiguous) { + accumulatedRestSeconds += interval.durationSeconds(); + } else if (crossedRecordDate) { + accumulatedRestSeconds = interval.durationSeconds() + gapSeconds; + } else { + accumulatedRestSeconds = interval.durationSeconds(); + } + if (accumulatedRestSeconds >= shiftEndThreshold.getSeconds() && pendingShiftEndTriggerAt == null) { + pendingShiftEndTriggerAt = interval.startedAt(); + } + } else if (EsperActivitySemantics.isShiftActivity(interval)) { + long gapSeconds = prevActivityEnd == null ? 0 : Math.max(0, Duration.between(prevActivityEnd, interval.startedAt()).getSeconds()); + boolean crossedRecordDate = prevActivityEnd != null + && !interval.startedAt().toLocalDate().equals(prevActivityEnd.toLocalDate()); + if (crossedRecordDate && accumulatedRestSeconds + gapSeconds >= shiftEndThreshold.getSeconds()) { + appendShiftSpan(shiftBegin, interval.startedAt()); + shiftBegin = interval.startedAt(); + } + accumulatedRestSeconds = 0; + pendingShiftEndTriggerAt = null; + } + + prevActivityEnd = interval.endedAt(); + } + + private List finish() { + if (shiftBegin != null) { + appendShiftSpan(shiftBegin, null); + } + return List.copyOf(spans); + } + + private void appendShiftSpan(OffsetDateTime currentShiftBegin, OffsetDateTime nextShiftBegin) { + if (currentShiftBegin == null) { + return; + } + OffsetDateTime shiftEnd = seen.stream() + .filter(interval -> !interval.startedAt().isBefore(currentShiftBegin)) + .filter(interval -> nextShiftBegin == null || interval.startedAt().isBefore(nextShiftBegin)) + .filter(EsperActivitySemantics::isShiftActivity) + .map(ActivityIntervalDto::endedAt) + .max(OffsetDateTime::compareTo) + .orElse(null); + if (shiftEnd == null || !shiftEnd.isAfter(currentShiftBegin)) { + return; + } + long restSeconds = nextShiftBegin == null ? 0 : Math.max(0, Duration.between(shiftEnd, nextShiftBegin).getSeconds()); + spans.add(new EsperResolvedShiftSpan(currentShiftBegin, shiftEnd, nextShiftBegin, restSeconds)); + } + } } diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityService.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityService.java index 0ff9241..ba49733 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityService.java +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityService.java @@ -1,12 +1,16 @@ package at.procon.eventhub.esperpoc.service; +import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto; import at.procon.eventhub.esperpoc.dto.DriverWorkSummaryDto; import at.procon.eventhub.esperpoc.dto.DrivingInterruptionDto; +import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode; import at.procon.eventhub.esperpoc.dto.EsperPocRequest; import at.procon.eventhub.esperpoc.dto.EsperPocResultDto; +import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode; import at.procon.eventhub.esperpoc.dto.OperatingTimePeriodDto; import at.procon.eventhub.esperpoc.dto.RawActivityEventDto; +import at.procon.eventhub.esperpoc.dto.ResolvedWorkShiftDto; import at.procon.eventhub.esperpoc.dto.ShiftDrivingEvaluationDto; import at.procon.eventhub.esperpoc.persistence.EsperPocActivityRepository; import java.time.Duration; @@ -15,58 +19,140 @@ import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Comparator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import org.springframework.beans.factory.annotation.Autowired; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @Service public class EsperPocDriverCardActivityService { + private static final Duration SHIFT_SCAN_CONTIGUOUS_GAP = Duration.ofMinutes(1); + private static final Logger log = LoggerFactory.getLogger(EsperPocDriverCardActivityService.class); + private final EsperPocActivityRepository activityRepository; private final EsperDriverActivityEngine esperEngine; + private final EventHubProperties properties; public EsperPocDriverCardActivityService( EsperPocActivityRepository activityRepository, EsperDriverActivityEngine esperEngine + ) { + this(activityRepository, esperEngine, null); + } + + @Autowired + public EsperPocDriverCardActivityService( + EsperPocActivityRepository activityRepository, + EsperDriverActivityEngine esperEngine, + EventHubProperties properties ) { this.activityRepository = activityRepository; this.esperEngine = esperEngine; + this.properties = properties; } public EsperPocResultDto evaluate(EsperPocRequest request) { + long startedNanos = System.nanoTime(); + EsperActivityMergeMode activityMergeMode = resolveActivityMergeMode(request); + EsperShiftResolutionMode shiftResolutionMode = resolveShiftResolutionMode(request); OffsetDateTime requestedFrom = utc(request.occurredFrom()); OffsetDateTime requestedTo = utc(request.occurredTo()); OffsetDateTime loadedFrom = requestedFrom.minusHours(request.guardHours()); OffsetDateTime loadedTo = requestedTo.plusHours(request.guardHours()); - List rawEvents = activityRepository.findDriverCardActivityEvents( + long dbStartedNanos = System.nanoTime(); + List rawEvents = activityRepository.findDriverActivityEvents( request.tenantKey(), request.driverEntityId(), loadedFrom, loadedTo ); - List rawIntervals = esperEngine.buildIntervals(rawEvents); + long dbElapsedMs = elapsedMillis(dbStartedNanos); + List driverCardRawEvents = rawEvents.stream() + .filter(event -> "DRIVER_CARD".equals(event.sourceKind())) + .toList(); + List vehicleUnitRawEvents = rawEvents.stream() + .filter(event -> "VEHICLE_UNIT".equals(event.sourceKind())) + .toList(); - // Merge in the full guard window first. This is important for long BREAK_REST detection: - // a rest crossing the requested period boundary must keep its full guard-window duration. - List mergedLoadedActivities = mergeConsecutiveIdenticalActivities( - rawIntervals, - Duration.ofSeconds(request.mergeGapSeconds()) + long cardIntervalsStartedNanos = System.nanoTime(); + List driverCardRawIntervals = esperEngine.buildIntervals(driverCardRawEvents); + long cardIntervalsElapsedMs = elapsedMillis(cardIntervalsStartedNanos); + long vuIntervalsStartedNanos = System.nanoTime(); + List vehicleUnitRawIntervals = esperEngine.buildIntervals(vehicleUnitRawEvents); + long vuIntervalsElapsedMs = elapsedMillis(vuIntervalsStartedNanos); + long vuGapFillStartedNanos = System.nanoTime(); + List resolvedLoadedIntervals = resolveVuFillGaps( + driverCardRawIntervals, + vehicleUnitRawIntervals ); + long vuGapFillElapsedMs = elapsedMillis(vuGapFillStartedNanos); + + long mergeStartedNanos = System.nanoTime(); + List mergedLoadedActivities = mergeActivities( + resolvedLoadedIntervals, + Duration.ofSeconds(request.mergeGapSeconds()), + activityMergeMode + ); + long mergeElapsedMs = elapsedMillis(mergeStartedNanos); + long summaryStartedNanos = System.nanoTime(); List mergedActivities = clipToPeriod(mergedLoadedActivities, requestedFrom, requestedTo); - DriverWorkSummaryDto summary = summarize(request, requestedFrom, requestedTo, mergedActivities); - ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving( - mergedActivities, - request.significantDrivingMinutes() - ); + long summaryElapsedMs = elapsedMillis(summaryStartedNanos); + long operatingPeriodsStartedNanos = System.nanoTime(); List operatingTimePeriods = buildOperatingTimePeriods( request, requestedFrom, requestedTo, mergedLoadedActivities ); + long operatingPeriodsElapsedMs = elapsedMillis(operatingPeriodsStartedNanos); + long workingShiftsStartedNanos = System.nanoTime(); + List workingShifts = buildResolvedWorkShifts( + request, + requestedFrom, + requestedTo, + resolvedLoadedIntervals, + activityMergeMode, + shiftResolutionMode + ); + long workingShiftsElapsedMs = elapsedMillis(workingShiftsStartedNanos); + long totalElapsedMs = elapsedMillis(startedNanos); + + log.info("Esper PoC working-shift evaluation tenant={} driverId={} requestedFrom={} requestedTo={} loadedFrom={} loadedTo={} mergeMode={} shiftMode={} rawEvents={} cardRawEvents={} vuRawEvents={} cardIntervals={} vuIntervals={} resolvedIntervals={} mergedActivities={} operatingPeriods={} workingShifts={} timingsMs={{dbRetrieve={}, cardIntervalEsper={}, vuIntervalEsper={}, vuGapFill={}, merge={}, summaryAndDriving={}, operatingPeriods={}, workingShifts={}, total={}}}", + request.tenantKey(), + request.driverEntityId(), + requestedFrom, + requestedTo, + loadedFrom, + loadedTo, + activityMergeMode, + shiftResolutionMode, + rawEvents.size(), + driverCardRawEvents.size(), + vehicleUnitRawEvents.size(), + driverCardRawIntervals.size(), + vehicleUnitRawIntervals.size(), + resolvedLoadedIntervals.size(), + mergedActivities.size(), + operatingTimePeriods.size(), + workingShifts.size(), + dbElapsedMs, + cardIntervalsElapsedMs, + vuIntervalsElapsedMs, + vuGapFillElapsedMs, + mergeElapsedMs, + summaryElapsedMs, + operatingPeriodsElapsedMs, + workingShiftsElapsedMs, + totalElapsedMs); return new EsperPocResultDto( request.tenantKey(), @@ -76,18 +162,28 @@ public class EsperPocDriverCardActivityService { loadedFrom, loadedTo, rawEvents.size(), - rawIntervals.size(), + driverCardRawEvents.size(), + vehicleUnitRawEvents.size(), + driverCardRawIntervals.size(), + vehicleUnitRawIntervals.size(), + resolvedLoadedIntervals.size(), mergedActivities.size(), operatingTimePeriods.size(), + workingShifts.size(), request.operatingPeriodSplitRestHours(), + request.shiftEndMarkPeriodMinutes(), + request.absenceBeginEndMinActivityMinutes(), + activityMergeMode, + shiftResolutionMode, rawEvents, - rawIntervals, + resolvedLoadedIntervals, mergedActivities, operatingTimePeriods, + workingShifts, summary, summary, - drivingEvaluation, - notes(request) + null, + notes(request, activityMergeMode, shiftResolutionMode) ); } @@ -98,10 +194,7 @@ public class EsperPocDriverCardActivityService { if (intervals == null || intervals.isEmpty()) { return List.of(); } - List sorted = intervals.stream() - .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) - .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt)) - .toList(); + List sorted = sortedPositiveIntervals(intervals); List result = new ArrayList<>(); ActivityIntervalDto current = null; List currentSources = new ArrayList<>(); @@ -111,7 +204,7 @@ public class EsperPocDriverCardActivityService { currentSources = new ArrayList<>(next.sourceRowIds()); continue; } - if (canMerge(current, next, mergeGapTolerance)) { + if (EsperActivitySemantics.canMerge(current, next, mergeGapTolerance)) { currentSources.addAll(next.sourceRowIds()); OffsetDateTime newEndedAt = max(current.endedAt(), next.endedAt()); current = new ActivityIntervalDto( @@ -120,6 +213,9 @@ public class EsperPocDriverCardActivityService { current.vehicleRegistrationId() == null ? next.vehicleRegistrationId() : current.vehicleRegistrationId(), current.activityType(), current.cardSlot(), + current.cardStatus(), + current.drivingStatus(), + current.sourceKind(), current.startedAt(), newEndedAt, Duration.between(current.startedAt(), newEndedAt).getSeconds(), @@ -147,13 +243,9 @@ public class EsperPocDriverCardActivityService { List mergedLoadedActivities ) { Duration splitRestThreshold = Duration.ofHours(request.operatingPeriodSplitRestHours()); - List sorted = mergedLoadedActivities.stream() - .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) - .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt)) - .toList(); + List sorted = sortedPositiveIntervals(mergedLoadedActivities); List longRests = sorted.stream() .filter(interval -> isOperatingPeriodSplitRest(interval, splitRestThreshold)) - .sorted(Comparator.comparing(ActivityIntervalDto::startedAt)) .toList(); List result = new ArrayList<>(); @@ -213,53 +305,367 @@ public class EsperPocDriverCardActivityService { return result; } - private OperatingTimePeriodDto buildOperatingPeriod( - int sequenceNumber, + public List buildResolvedWorkShifts( EsperPocRequest request, - OffsetDateTime spanFrom, - OffsetDateTime spanTo, - ActivityIntervalDto splitStartedAfterLongRest, - ActivityIntervalDto splitEndedByLongRest, - List allActivities + OffsetDateTime requestedFrom, + OffsetDateTime requestedTo, + List resolvedLoadedIntervals ) { - List activities = allActivities.stream() - .filter(activity -> activity.startedAt().isBefore(spanTo) && activity.endedAt().isAfter(spanFrom)) - .map(activity -> { - OffsetDateTime start = max(activity.startedAt(), spanFrom); - OffsetDateTime end = min(activity.endedAt(), spanTo); - if (!end.isAfter(start)) { - return null; - } - boolean clipped = !start.equals(activity.startedAt()) || !end.equals(activity.endedAt()); - return activity.withTime(start, end, clipped); - }) - .filter(Objects::nonNull) - .filter(activity -> activity.endedAt().isAfter(activity.startedAt())) - .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt)) - .toList(); + return buildResolvedWorkShifts( + request, + requestedFrom, + requestedTo, + resolvedLoadedIntervals, + resolveActivityMergeMode(request), + resolveShiftResolutionMode(request) + ); + } - if (activities.isEmpty()) { - return null; + public List buildResolvedWorkShifts( + EsperPocRequest request, + OffsetDateTime requestedFrom, + OffsetDateTime requestedTo, + List resolvedLoadedIntervals, + EsperActivityMergeMode activityMergeMode, + EsperShiftResolutionMode shiftResolutionMode + ) { + long startedNanos = System.nanoTime(); + List sorted = sortedPositiveIntervals(resolvedLoadedIntervals); + if (sorted.isEmpty()) { + log.info("Esper PoC work-shift resolution requestedFrom={} requestedTo={} shiftMode={} sourceIntervals=0 resolvedShifts=0 timingsMs={{resolveSpans=0, enrichShifts=0, total={}}}", + requestedFrom, + requestedTo, + shiftResolutionMode, + elapsedMillis(startedNanos)); + return List.of(); } - OffsetDateTime startedAt = activities.get(0).startedAt(); - OffsetDateTime endedAt = activities.get(activities.size() - 1).endedAt(); - DriverWorkSummaryDto summary = summarize(request, startedAt, endedAt, activities); - ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving( - activities, - request.significantDrivingMinutes() - ); - return new OperatingTimePeriodDto( - sequenceNumber, - startedAt, - endedAt, - Duration.between(startedAt, endedAt).getSeconds(), - splitStartedAfterLongRest, - splitEndedByLongRest, - activities, - summary, - drivingEvaluation - ); + long resolveSpansStartedNanos = System.nanoTime(); + List shiftSpans = resolveShiftSpans(sorted, request, shiftResolutionMode); + long resolveSpansElapsedMs = elapsedMillis(resolveSpansStartedNanos); + if (shiftSpans.isEmpty()) { + log.info("Esper PoC work-shift resolution requestedFrom={} requestedTo={} shiftMode={} sourceIntervals={} resolvedShiftSpans=0 resolvedShifts=0 timingsMs={{resolveSpans={}, enrichShifts=0, total={}}}", + requestedFrom, + requestedTo, + shiftResolutionMode, + sorted.size(), + resolveSpansElapsedMs, + elapsedMillis(startedNanos)); + return List.of(); + } + + ZoneOffset evaluationOffset = request.occurredFrom().getOffset(); + ShiftDeviationStats deviationStats = calculateShiftDeviationStats(shiftSpans, evaluationOffset); + List result = new ArrayList<>(); + int sequenceNumber = 1; + long enrichShiftsStartedNanos = System.nanoTime(); + + for (int index = 0; index < shiftSpans.size(); index++) { + EsperResolvedShiftSpan span = shiftSpans.get(index); + if (!span.endedAt().isAfter(requestedFrom) || !span.startedAt().isBefore(requestedTo)) { + continue; + } + + List rawShiftActivities = clipToPeriod(sorted, span.startedAt(), span.endedAt()); + if (rawShiftActivities.isEmpty()) { + continue; + } + List mergedShiftActivities = mergeActivities( + rawShiftActivities, + Duration.ofSeconds(request.mergeGapSeconds()), + activityMergeMode + ); + DriverWorkSummaryDto summary = summarize(request, span.startedAt(), span.endedAt(), mergedShiftActivities); + ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving( + mergedShiftActivities, + request.significantDrivingMinutes() + ); + ShiftDeviationSnapshot deviation = deviationStats.snapshot(span, evaluationOffset); + + result.add(new ResolvedWorkShiftDto( + sequenceNumber++, + span.startedAt(), + span.endedAt(), + span.nextShiftStartedAt(), + span.durationSeconds(), + span.dailyRestingTimeSeconds(), + firstDrivingStart(rawShiftActivities), + lastDrivingEnd(rawShiftActivities), + firstDrivingStart(rawShiftActivities, request.absenceBeginEndMinActivityMinutes()), + lastDrivingEnd(rawShiftActivities, request.absenceBeginEndMinActivityMinutes()), + distinctVehicleIds(rawShiftActivities), + distinctVehicleRegistrationIds(rawShiftActivities), + determineUsedDataKind(rawShiftActivities), + determineShiftKind(span, evaluationOffset), + deviation.beginInRange(), + deviation.endInRange(), + summary.drivingSeconds() == 0, + index > 0 && shiftSpans.get(index - 1).dailyRestingTimeSeconds() > Duration.ofHours(24).getSeconds(), + span.durationSeconds() > Duration.ofHours(24).getSeconds(), + span.dailyRestingTimeSeconds() >= Duration.ofHours(9).getSeconds() + && span.dailyRestingTimeSeconds() <= Duration.ofHours(11).getSeconds(), + span.dailyRestingTimeSeconds() >= Duration.ofHours(7).getSeconds() + && span.dailyRestingTimeSeconds() <= Duration.ofHours(9).getSeconds(), + rawShiftActivities.size(), + countActivities(rawShiftActivities, "DRIVE"), + countActivities(rawShiftActivities, "AVAILABILITY"), + countActivities(rawShiftActivities, "WORK"), + countActivities(rawShiftActivities, "BREAK_REST"), + countUnknownActivities(rawShiftActivities), + countWorkingActivities(rawShiftActivities), + summary, + drivingEvaluation, + rawShiftActivities + )); + } + long enrichShiftsElapsedMs = elapsedMillis(enrichShiftsStartedNanos); + log.info("Esper PoC work-shift resolution requestedFrom={} requestedTo={} shiftMode={} sourceIntervals={} resolvedShiftSpans={} resolvedShifts={} timingsMs={{resolveSpans={}, enrichShifts={}, total={}}}", + requestedFrom, + requestedTo, + shiftResolutionMode, + sorted.size(), + shiftSpans.size(), + result.size(), + resolveSpansElapsedMs, + enrichShiftsElapsedMs, + elapsedMillis(startedNanos)); + return result; + } + + List resolveVuFillGaps( + List driverCardRawIntervals, + List vehicleUnitRawIntervals + ) { + List driverCard = sortedPositiveIntervals(driverCardRawIntervals); + List vehicleUnit = sortedPositiveIntervals(vehicleUnitRawIntervals); + if (driverCard.isEmpty()) { + return vehicleUnit; + } + if (vehicleUnit.isEmpty()) { + return driverCard; + } + + List resolved = new ArrayList<>(driverCard); + for (ActivityIntervalDto vuInterval : vehicleUnit) { + resolved.addAll(subtractCoverage(vuInterval, driverCard)); + } + return resolved.stream() + .sorted(Comparator.comparing(ActivityIntervalDto::startedAt) + .thenComparing(ActivityIntervalDto::endedAt) + .thenComparing(ActivityIntervalDto::sourceKind, Comparator.nullsLast(String::compareTo))) + .toList(); + } + + private List mergeActivities( + List intervals, + Duration mergeGapTolerance, + EsperActivityMergeMode mode + ) { + if (mode == EsperActivityMergeMode.ESPER) { + return requireEsperEngine().mergeConsecutiveIdenticalActivities(intervals, mergeGapTolerance); + } + return mergeConsecutiveIdenticalActivities(intervals, mergeGapTolerance); + } + + private List subtractCoverage( + ActivityIntervalDto candidate, + List coverage + ) { + List result = new ArrayList<>(); + OffsetDateTime cursor = candidate.startedAt(); + for (ActivityIntervalDto covered : coverage) { + if (!covered.endedAt().isAfter(cursor)) { + continue; + } + if (!covered.startedAt().isBefore(candidate.endedAt())) { + break; + } + OffsetDateTime overlapStart = max(cursor, covered.startedAt()); + if (overlapStart.isAfter(cursor)) { + result.add(candidate.withTime(cursor, overlapStart, candidate.clippedToRequestedPeriod())); + } + if (covered.endedAt().isAfter(cursor)) { + cursor = max(cursor, covered.endedAt()); + } + if (!candidate.endedAt().isAfter(cursor)) { + break; + } + } + if (candidate.endedAt().isAfter(cursor)) { + result.add(candidate.withTime(cursor, candidate.endedAt(), candidate.clippedToRequestedPeriod())); + } + return result.stream() + .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) + .toList(); + } + + private List resolveShiftSpans( + List sorted, + EsperPocRequest request, + EsperShiftResolutionMode mode + ) { + if (mode == EsperShiftResolutionMode.ESPER) { + return requireEsperEngine().resolveShiftSpans( + sorted, + Duration.ofMinutes(request.shiftEndMarkPeriodMinutes()), + SHIFT_SCAN_CONTIGUOUS_GAP + ); + } + + ActivityIntervalDto firstShiftActivity = sorted.stream() + .filter(EsperActivitySemantics::isShiftActivity) + .findFirst() + .orElse(null); + if (firstShiftActivity == null) { + return List.of(); + } + + List result = new ArrayList<>(); + OffsetDateTime shiftBegin = firstShiftActivity.startedAt(); + long accumulatedRestSeconds = 0; + OffsetDateTime prevActivityEnd = null; + long shiftEndThresholdSeconds = Duration.ofMinutes(request.shiftEndMarkPeriodMinutes()).getSeconds(); + + for (ActivityIntervalDto interval : sorted) { + if (!shiftBegin.isBefore(interval.startedAt())) { + prevActivityEnd = interval.endedAt(); + continue; + } + + if (EsperActivitySemantics.isRestOrUnknown(interval)) { + if (prevActivityEnd != null + && Math.abs(Duration.between(prevActivityEnd, interval.startedAt()).getSeconds()) <= SHIFT_SCAN_CONTIGUOUS_GAP.getSeconds()) { + accumulatedRestSeconds += interval.durationSeconds(); + } else if (prevActivityEnd != null + && !Objects.equals(EsperActivitySemantics.recordDate(interval), prevActivityEnd.toLocalDate())) { + accumulatedRestSeconds = interval.durationSeconds() + Math.max(0, Duration.between(prevActivityEnd, interval.startedAt()).getSeconds()); + } else { + accumulatedRestSeconds = interval.durationSeconds(); + } + + if (accumulatedRestSeconds >= shiftEndThresholdSeconds) { + OffsetDateTime nextShiftBegin = findNextActiveStartAfter(sorted, interval.startedAt()); + appendShiftSpan(result, sorted, shiftBegin, nextShiftBegin); + shiftBegin = nextShiftBegin; + accumulatedRestSeconds = 0; + } + } else if (EsperActivitySemantics.isShiftActivity(interval)) { + if (prevActivityEnd != null + && !Objects.equals(EsperActivitySemantics.recordDate(interval), prevActivityEnd.toLocalDate()) + && accumulatedRestSeconds + Math.max(0, Duration.between(prevActivityEnd, interval.startedAt()).getSeconds()) >= shiftEndThresholdSeconds) { + OffsetDateTime nextShiftBegin = interval.startedAt(); + appendShiftSpan(result, sorted, shiftBegin, nextShiftBegin); + shiftBegin = nextShiftBegin; + } + accumulatedRestSeconds = 0; + } + + prevActivityEnd = interval.endedAt(); + } + + if (shiftBegin != null) { + appendShiftSpan(result, sorted, shiftBegin, null); + } + return result; + } + + private void appendShiftSpan( + List target, + List sorted, + OffsetDateTime shiftBegin, + OffsetDateTime nextShiftBegin + ) { + if (shiftBegin == null) { + return; + } + OffsetDateTime shiftEnd = findLastActiveEnd(sorted, shiftBegin, nextShiftBegin); + if (shiftEnd == null || !shiftEnd.isAfter(shiftBegin)) { + return; + } + long restSeconds = nextShiftBegin == null ? 0 : Math.max(0, Duration.between(shiftEnd, nextShiftBegin).getSeconds()); + target.add(new EsperResolvedShiftSpan(shiftBegin, shiftEnd, nextShiftBegin, restSeconds)); + } + + private OffsetDateTime findNextActiveStartAfter(List sorted, OffsetDateTime activityTime) { + return sorted.stream() + .filter(interval -> interval.startedAt().isAfter(activityTime)) + .filter(EsperActivitySemantics::isShiftActivity) + .map(ActivityIntervalDto::startedAt) + .findFirst() + .orElse(null); + } + + private OffsetDateTime findLastActiveEnd( + List sorted, + OffsetDateTime shiftBegin, + OffsetDateTime nextShiftBegin + ) { + return sorted.stream() + .filter(interval -> !interval.startedAt().isBefore(shiftBegin)) + .filter(interval -> nextShiftBegin == null || interval.startedAt().isBefore(nextShiftBegin)) + .filter(EsperActivitySemantics::isShiftActivity) + .map(ActivityIntervalDto::endedAt) + .max(OffsetDateTime::compareTo) + .orElse(null); + } + + private ShiftDeviationStats calculateShiftDeviationStats(List shiftSpans, ZoneOffset evaluationOffset) { + Map> grouped = new LinkedHashMap<>(); + for (EsperResolvedShiftSpan span : shiftSpans) { + grouped.computeIfAbsent(determineShiftKind(span, evaluationOffset), ignored -> new ArrayList<>()).add(span); + } + + Map stats = new LinkedHashMap<>(); + for (Map.Entry> entry : grouped.entrySet()) { + List spans = entry.getValue(); + if (spans.size() < 4) { + stats.put(entry.getKey(), new DeviationSnapshot(Double.NaN, Double.NaN, Double.NaN, Double.NaN, false)); + continue; + } + double beginMean = spans.stream().mapToInt(span -> minutesOfDay(span.startedAt(), evaluationOffset)).average().orElse(Double.NaN); + double endMean = spans.stream().mapToInt(span -> minutesOfDay(span.endedAt(), evaluationOffset)).average().orElse(Double.NaN); + double beginDeviation = standardDeviation(spans.stream().mapToInt(span -> minutesOfDay(span.startedAt(), evaluationOffset)).toArray(), beginMean); + double endDeviation = standardDeviation(spans.stream().mapToInt(span -> minutesOfDay(span.endedAt(), evaluationOffset)).toArray(), endMean); + stats.put(entry.getKey(), new DeviationSnapshot(beginMean, endMean, beginDeviation, endDeviation, true)); + } + return new ShiftDeviationStats(stats); + } + + private double standardDeviation(int[] values, double mean) { + double squaredSum = 0; + for (int value : values) { + double delta = value - mean; + squaredSum += delta * delta; + } + return Math.sqrt(squaredSum / values.length); + } + + private int minutesOfDay(OffsetDateTime value, ZoneOffset evaluationOffset) { + OffsetDateTime local = value.withOffsetSameInstant(evaluationOffset); + return local.getHour() * 60 + local.getMinute(); + } + + private String determineShiftKind(EsperResolvedShiftSpan span, ZoneOffset evaluationOffset) { + OffsetDateTime localStart = span.startedAt().withOffsetSameInstant(evaluationOffset); + OffsetDateTime localEnd = span.endedAt().withOffsetSameInstant(evaluationOffset); + if (localStart.toLocalDate().equals(localEnd.toLocalDate())) { + int startSecond = localStart.getHour() * 3600 + localStart.getMinute() * 60 + localStart.getSecond(); + int endSecond = localEnd.getHour() * 3600 + localEnd.getMinute() * 60 + localEnd.getSecond(); + return startSecond < 5 * 3600 || endSecond > 20 * 3600 ? "SPECIAL_HOURS" : "DAY"; + } + return "OVERNIGHT"; + } + + private String determineUsedDataKind(List activities) { + boolean hasCard = activities.stream().anyMatch(activity -> "DRIVER_CARD".equals(activity.sourceKind())); + boolean hasVu = activities.stream().anyMatch(activity -> "VEHICLE_UNIT".equals(activity.sourceKind())); + if (hasCard && hasVu) { + return "BOTH"; + } + if (hasVu) { + return "VEHICLE_UNIT"; + } + return "DRIVER_CARD"; } private boolean isOperatingPeriodSplitRest(ActivityIntervalDto interval, Duration splitRestThreshold) { @@ -267,15 +673,6 @@ public class EsperPocDriverCardActivityService { && interval.durationSeconds() > splitRestThreshold.getSeconds(); } - private boolean canMerge(ActivityIntervalDto left, ActivityIntervalDto right, Duration tolerance) { - boolean sameDriver = Objects.equals(left.driverEntityId(), right.driverEntityId()); - boolean sameActivity = Objects.equals(left.activityType(), right.activityType()); - boolean sameSlot = Objects.equals(left.cardSlot(), right.cardSlot()); - long gapSeconds = Duration.between(left.endedAt(), right.startedAt()).getSeconds(); - boolean adjacentOrOverlapping = gapSeconds <= tolerance.getSeconds(); - return sameDriver && sameActivity && sameSlot && adjacentOrOverlapping; - } - private List clipToPeriod( List intervals, OffsetDateTime periodFrom, @@ -296,6 +693,18 @@ public class EsperPocDriverCardActivityService { .toList(); } + private List sortedPositiveIntervals(List intervals) { + if (intervals == null || intervals.isEmpty()) { + return List.of(); + } + return intervals.stream() + .filter(interval -> interval.endedAt().isAfter(interval.startedAt())) + .sorted(Comparator.comparing(ActivityIntervalDto::startedAt) + .thenComparing(ActivityIntervalDto::endedAt) + .thenComparing(ActivityIntervalDto::activityType, Comparator.nullsLast(String::compareTo))) + .toList(); + } + private DriverWorkSummaryDto summarize( EsperPocRequest request, OffsetDateTime periodFrom, @@ -372,17 +781,177 @@ public class EsperPocDriverCardActivityService { ); } - private List notes(EsperPocRequest request) { + private OperatingTimePeriodDto buildOperatingPeriod( + int sequenceNumber, + EsperPocRequest request, + OffsetDateTime spanFrom, + OffsetDateTime spanTo, + ActivityIntervalDto splitStartedAfterLongRest, + ActivityIntervalDto splitEndedByLongRest, + List allActivities + ) { + List activities = allActivities.stream() + .filter(activity -> activity.startedAt().isBefore(spanTo) && activity.endedAt().isAfter(spanFrom)) + .map(activity -> { + OffsetDateTime start = max(activity.startedAt(), spanFrom); + OffsetDateTime end = min(activity.endedAt(), spanTo); + if (!end.isAfter(start)) { + return null; + } + boolean clipped = !start.equals(activity.startedAt()) || !end.equals(activity.endedAt()); + return activity.withTime(start, end, clipped); + }) + .filter(Objects::nonNull) + .filter(activity -> activity.endedAt().isAfter(activity.startedAt())) + .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt)) + .toList(); + + if (activities.isEmpty()) { + return null; + } + + OffsetDateTime startedAt = activities.get(0).startedAt(); + OffsetDateTime endedAt = activities.get(activities.size() - 1).endedAt(); + DriverWorkSummaryDto summary = summarize(request, startedAt, endedAt, activities); + ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving( + mergeActivities( + activities, + Duration.ofSeconds(request.mergeGapSeconds()), + resolveActivityMergeMode(request) + ), + request.significantDrivingMinutes() + ); + return new OperatingTimePeriodDto( + sequenceNumber, + startedAt, + endedAt, + Duration.between(startedAt, endedAt).getSeconds(), + splitStartedAfterLongRest, + splitEndedByLongRest, + activities, + summary, + drivingEvaluation + ); + } + + private OffsetDateTime firstDrivingStart(List activities) { + return activities.stream() + .filter(this::isDrivingActivity) + .map(ActivityIntervalDto::startedAt) + .min(OffsetDateTime::compareTo) + .orElse(null); + } + + private OffsetDateTime lastDrivingEnd(List activities) { + return activities.stream() + .filter(this::isDrivingActivity) + .map(ActivityIntervalDto::endedAt) + .max(OffsetDateTime::compareTo) + .orElse(null); + } + + private OffsetDateTime firstDrivingStart(List activities, int minMinutes) { + long threshold = Duration.ofMinutes(minMinutes).getSeconds(); + return activities.stream() + .filter(this::isDrivingActivity) + .filter(activity -> activity.durationSeconds() >= threshold) + .map(ActivityIntervalDto::startedAt) + .min(OffsetDateTime::compareTo) + .orElse(null); + } + + private OffsetDateTime lastDrivingEnd(List activities, int minMinutes) { + long threshold = Duration.ofMinutes(minMinutes).getSeconds(); + return activities.stream() + .filter(this::isDrivingActivity) + .filter(activity -> activity.durationSeconds() >= threshold) + .map(ActivityIntervalDto::endedAt) + .max(OffsetDateTime::compareTo) + .orElse(null); + } + + private boolean isDrivingActivity(ActivityIntervalDto activity) { + return EsperActivitySemantics.isKnownActivity(activity) && "DRIVE".equals(activity.activityType()); + } + + private int countActivities(List activities, String activityType) { + return (int) activities.stream() + .filter(EsperActivitySemantics::isKnownActivity) + .filter(activity -> activityType.equals(activity.activityType())) + .count(); + } + + private int countUnknownActivities(List activities) { + return (int) activities.stream() + .filter(EsperActivitySemantics::isUnknownActivity) + .count(); + } + + private int countWorkingActivities(List activities) { + return (int) activities.stream() + .filter(EsperActivitySemantics::isShiftActivity) + .count(); + } + + private List distinctVehicleIds(List activities) { + Set values = new LinkedHashSet<>(); + for (ActivityIntervalDto activity : activities) { + if (activity.vehicleId() != null) { + values.add(activity.vehicleId()); + } + } + return List.copyOf(values); + } + + private List distinctVehicleRegistrationIds(List activities) { + Set values = new LinkedHashSet<>(); + for (ActivityIntervalDto activity : activities) { + if (activity.vehicleRegistrationId() != null) { + values.add(activity.vehicleRegistrationId()); + } + } + return List.copyOf(values); + } + + private List notes( + EsperPocRequest request, + EsperActivityMergeMode activityMergeMode, + EsperShiftResolutionMode shiftResolutionMode + ) { return List.of( - "PoC reads only tachograph DRIVER_CARD/CARD_ACTIVITY source events from eventhub.event.", - "Level RAW contains original imported point events; level Activities contains Esper-created intervals merged by consecutive identical activity.", + "PoC reads tachograph DRIVER_CARD/CARD_ACTIVITY events and VEHICLE_UNIT/VU_ACTIVITY events from eventhub.event.", + "Driver-card intervals remain authoritative. VEHICLE_UNIT activity fills only uncovered time gaps in the driver-card timeline.", + "Configured activity merge mode: " + activityMergeMode + ". Configured shift resolution mode: " + shiftResolutionMode + ".", + "Resolved work shifts follow the stored-procedure split rule: BREAK_REST or unknown coverage accumulating to " + + request.shiftEndMarkPeriodMinutes() + " minutes ends a shift.", + "Driving-time interruption evaluation is reported per operating time period and per working shift, not as a single evaluation for the complete requested period.", "Activities are merged in the guard window first and then clipped to the requested period, so BREAK_REST across the period boundary can still split operating periods correctly.", "Operating time periods are split by BREAK_REST activities longer than " + request.operatingPeriodSplitRestHours() + " hours.", - "Working seconds = DRIVE + WORK. Operation seconds = DRIVE + WORK + AVAILABILITY. BREAK_REST is reported separately.", - "Departure/arrival are evaluated globally and again inside each operating time period using DRIVE intervals longer than " + request.significantDrivingMinutes() + " minutes." + "Working seconds = DRIVE + WORK. Operation seconds = DRIVE + WORK + AVAILABILITY. BREAK_REST is reported separately." ); } + private EsperActivityMergeMode resolveActivityMergeMode(EsperPocRequest request) { + if (request.activityMergeMode() != null) { + return request.activityMergeMode(); + } + return properties == null ? EsperActivityMergeMode.JAVA : properties.getEsperPoc().getActivityMergeMode(); + } + + private EsperShiftResolutionMode resolveShiftResolutionMode(EsperPocRequest request) { + if (request.shiftResolutionMode() != null) { + return request.shiftResolutionMode(); + } + return properties == null ? EsperShiftResolutionMode.JAVA : properties.getEsperPoc().getShiftResolutionMode(); + } + + private EsperDriverActivityEngine requireEsperEngine() { + if (esperEngine == null) { + throw new IllegalStateException("Esper engine is required for ESPER execution modes"); + } + return esperEngine; + } + private OffsetDateTime utc(OffsetDateTime value) { return value.withOffsetSameInstant(ZoneOffset.UTC); } @@ -394,4 +963,52 @@ public class EsperPocDriverCardActivityService { private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) { return left.isBefore(right) ? left : right; } + + private long elapsedMillis(long startedNanos) { + return Duration.ofNanos(System.nanoTime() - startedNanos).toMillis(); + } + + private record ShiftDeviationStats(Map stats) { + ShiftDeviationSnapshot snapshot(EsperResolvedShiftSpan span, ZoneOffset evaluationOffset) { + DeviationSnapshot snapshot = stats.get(determineShiftKindStatic(span, evaluationOffset)); + if (snapshot == null || !snapshot.computed()) { + return new ShiftDeviationSnapshot(true, true); + } + int beginMinutes = minutesOfDayStatic(span.startedAt(), evaluationOffset); + int endMinutes = minutesOfDayStatic(span.endedAt(), evaluationOffset); + boolean beginInRange = beginMinutes >= snapshot.beginMean() - snapshot.beginDeviation() + && beginMinutes <= snapshot.beginMean() + snapshot.beginDeviation(); + boolean endInRange = endMinutes >= snapshot.endMean() - snapshot.endDeviation() + && endMinutes <= snapshot.endMean() + snapshot.endDeviation(); + return new ShiftDeviationSnapshot(beginInRange, endInRange); + } + + private static String determineShiftKindStatic(EsperResolvedShiftSpan span, ZoneOffset evaluationOffset) { + OffsetDateTime localStart = span.startedAt().withOffsetSameInstant(evaluationOffset); + OffsetDateTime localEnd = span.endedAt().withOffsetSameInstant(evaluationOffset); + if (localStart.toLocalDate().equals(localEnd.toLocalDate())) { + int startSecond = localStart.getHour() * 3600 + localStart.getMinute() * 60 + localStart.getSecond(); + int endSecond = localEnd.getHour() * 3600 + localEnd.getMinute() * 60 + localEnd.getSecond(); + return startSecond < 5 * 3600 || endSecond > 20 * 3600 ? "SPECIAL_HOURS" : "DAY"; + } + return "OVERNIGHT"; + } + + private static int minutesOfDayStatic(OffsetDateTime value, ZoneOffset evaluationOffset) { + OffsetDateTime local = value.withOffsetSameInstant(evaluationOffset); + return local.getHour() * 60 + local.getMinute(); + } + } + + private record DeviationSnapshot( + double beginMean, + double endMean, + double beginDeviation, + double endDeviation, + boolean computed + ) { + } + + private record ShiftDeviationSnapshot(boolean beginInRange, boolean endInRange) { + } } diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperRawDriverActivityPoint.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperRawDriverActivityPoint.java index 6e71f83..42f6aad 100644 --- a/src/main/java/at/procon/eventhub/esperpoc/service/EsperRawDriverActivityPoint.java +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperRawDriverActivityPoint.java @@ -14,6 +14,9 @@ public final class EsperRawDriverActivityPoint { private final String eventType; private final String lifecycle; private final String cardSlot; + private final String cardStatus; + private final String drivingStatus; + private final String sourceKind; public EsperRawDriverActivityPoint( UUID eventId, @@ -25,7 +28,10 @@ public final class EsperRawDriverActivityPoint { UUID vehicleRegistrationId, String eventType, String lifecycle, - String cardSlot + String cardSlot, + String cardStatus, + String drivingStatus, + String sourceKind ) { this.eventId = eventId; this.occurredAt = occurredAt; @@ -37,6 +43,9 @@ public final class EsperRawDriverActivityPoint { this.eventType = eventType; this.lifecycle = lifecycle; this.cardSlot = cardSlot; + this.cardStatus = cardStatus; + this.drivingStatus = drivingStatus; + this.sourceKind = sourceKind; } public UUID getEventId() { @@ -78,4 +87,16 @@ public final class EsperRawDriverActivityPoint { public String getCardSlot() { return cardSlot; } + + public String getCardStatus() { + return cardStatus; + } + + public String getDrivingStatus() { + return drivingStatus; + } + + public String getSourceKind() { + return sourceKind; + } } diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperResolvedShiftSpan.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperResolvedShiftSpan.java new file mode 100644 index 0000000..9458633 --- /dev/null +++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperResolvedShiftSpan.java @@ -0,0 +1,15 @@ +package at.procon.eventhub.esperpoc.service; + +import java.time.Duration; +import java.time.OffsetDateTime; + +record EsperResolvedShiftSpan( + OffsetDateTime startedAt, + OffsetDateTime endedAt, + OffsetDateTime nextShiftStartedAt, + long dailyRestingTimeSeconds +) { + long durationSeconds() { + return Duration.between(startedAt, endedAt).getSeconds(); + } +} diff --git a/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java b/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java new file mode 100644 index 0000000..8b710c0 --- /dev/null +++ b/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java @@ -0,0 +1,372 @@ +package at.procon.eventhub.persistence; + +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.OffsetDateTime; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; + +@Repository +public class DriverIdentityRepository { + + private final JdbcTemplate jdbcTemplate; + private final ObjectMapper objectMapper; + + public DriverIdentityRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) { + this.jdbcTemplate = jdbcTemplate; + this.objectMapper = objectMapper; + } + + public UUID resolveOrCreateDriverId( + String tenantKey, + int eventSourceId, + DriverRefDto driverRef + ) { + if (driverRef == null || !driverRef.hasAnyReference()) { + return null; + } + + String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); + String sourceDriverEntityId = normalizeNullable(driverRef.sourceEntityId()); + DriverCardRefDto driverCard = driverRef.driverCard(); + String cardNation = driverCard == null ? null : normalizeNullable(driverCard.nation()); + String cardNumber = driverCard == null ? null : normalizeNullable(driverCard.number()); + + UUID driverId = resolveDriverId(normalizedTenantKey, eventSourceId, sourceDriverEntityId, cardNation, cardNumber); + if (driverId == null) { + Map payload = new LinkedHashMap<>(); + put(payload, "source", "event"); + put(payload, "source_driver_entity_id", sourceDriverEntityId); + put(payload, "card_nation", cardNation); + put(payload, "card_number", cardNumber); + driverId = createDriver( + normalizedTenantKey, + eventSourceId, + sourceDriverEntityId, + cardNation, + cardNumber, + null, + null, + null, + payload + ); + } + + touchDriver(driverId, sourceDriverEntityId, cardNation, cardNumber); + return driverId; + } + + @Transactional + public int reconcileFromMasterData(String tenantKey, int eventSourceId) { + String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); + int updates = reconcileDriversFromMasterData(normalizedTenantKey, eventSourceId); + updates += projectDriverCardsFromMasterData(normalizedTenantKey, eventSourceId); + return updates; + } + + private int reconcileDriversFromMasterData(String tenantKey, int eventSourceId) { + Long count = jdbcTemplate.queryForObject( + compatibleSourcesCte() + """ + , master_drivers as ( + select distinct on (nullif(trim(source_entity_id), '')) + event_source_id, + nullif(trim(source_entity_id), '') as source_driver_entity_id, + nullif(trim(payload ->> 'first_names'), '') as first_names, + coalesce( + nullif(trim(payload ->> 'last_name'), ''), + nullif(trim(payload ->> 'surname'), '') + ) as last_name, + cast(nullif(trim(payload ->> 'birth_date'), '') as date) as birth_date, + source_updated_at, + payload + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'DRIVER' + and nullif(trim(source_entity_id), '') is not null + and source_entity_id not like 'DRIVER_CARD:%' + order by nullif(trim(source_entity_id), ''), updated_at desc + ), + updated_by_source as ( + update eventhub.driver driver + set first_names = coalesce(master.first_names, driver.first_names), + last_name = coalesce(master.last_name, driver.last_name), + birth_date = coalesce(master.birth_date, driver.birth_date), + source_updated_at = master.source_updated_at, + payload = driver.payload || master.payload, + updated_at = now() + from master_drivers master + where driver.tenant_key = ? + and driver.event_source_id in (select id from compatible_sources) + and driver.source_driver_entity_id = master.source_driver_entity_id + returning driver.id + ), + inserted as ( + insert into eventhub.driver( + id, tenant_key, event_source_id, source_driver_entity_id, + first_names, last_name, birth_date, source_updated_at, payload, updated_at + ) + select gen_random_uuid(), + ?, + master.event_source_id, + master.source_driver_entity_id, + master.first_names, + master.last_name, + master.birth_date, + master.source_updated_at, + master.payload, + now() + from master_drivers master + where not exists ( + select 1 + from eventhub.driver existing + where existing.tenant_key = ? + and existing.event_source_id in (select id from compatible_sources) + and existing.source_driver_entity_id = master.source_driver_entity_id + ) + returning id + ) + select (select count(*) from updated_by_source) + + (select count(*) from inserted) + """, + Long.class, + eventSourceId, + tenantKey, + tenantKey, + tenantKey, + tenantKey + ); + return count == null ? 0 : Math.toIntExact(count); + } + + private int projectDriverCardsFromMasterData(String tenantKey, int eventSourceId) { + Long count = jdbcTemplate.queryForObject( + compatibleSourcesCte() + """ + , driver_card_projection as ( + select distinct on (rel.to_source_entity_id) + rel.to_source_entity_id as source_driver_entity_id, + nullif(trim(card.payload ->> 'card_nation'), '') as card_nation, + nullif(trim(card.payload ->> 'card_number'), '') as card_number, + rel.source_updated_at + from eventhub.source_master_relation rel + join eventhub.source_master_entity card + on card.tenant_key = rel.tenant_key + and card.event_source_id = rel.event_source_id + and card.entity_type = 'DRIVER_CARD' + and card.source_entity_id = rel.from_source_entity_id + where rel.tenant_key = ? + and rel.event_source_id in (select id from compatible_sources) + and rel.relation_type = 'DRIVER_CARD_DRIVER' + and rel.from_entity_type = 'DRIVER_CARD' + and rel.to_entity_type = 'DRIVER' + order by rel.to_source_entity_id, + rel.valid_to desc nulls last, + rel.valid_from desc nulls last, + rel.updated_at desc + ), + updated_by_source as ( + update eventhub.driver driver + set card_nation = coalesce(driver.card_nation, projection.card_nation), + card_number = coalesce(driver.card_number, projection.card_number), + source_updated_at = coalesce(projection.source_updated_at, driver.source_updated_at), + updated_at = now() + from driver_card_projection projection + where driver.tenant_key = ? + and driver.event_source_id in (select id from compatible_sources) + and driver.source_driver_entity_id = projection.source_driver_entity_id + and ( + (driver.card_nation is null and projection.card_nation is not null) + or (driver.card_number is null and projection.card_number is not null) + ) + returning driver.id + ) + select count(*) + from updated_by_source + """, + Long.class, + eventSourceId, + tenantKey, + tenantKey + ); + return count == null ? 0 : Math.toIntExact(count); + } + + private UUID resolveDriverId( + String tenantKey, + int eventSourceId, + String sourceDriverEntityId, + String cardNation, + String cardNumber + ) { + UUID driverId = findBySourceDriverEntityId(tenantKey, eventSourceId, sourceDriverEntityId); + if (driverId == null) { + driverId = findByCard(tenantKey, eventSourceId, cardNation, cardNumber); + } + return driverId; + } + + private UUID findBySourceDriverEntityId(String tenantKey, int eventSourceId, String sourceDriverEntityId) { + if (sourceDriverEntityId == null) { + return null; + } + return jdbcTemplate.query( + compatibleSourcesCte() + """ + select d.id + from eventhub.driver d + where d.tenant_key = ? + and d.event_source_id in (select id from compatible_sources) + and d.source_driver_entity_id = ? + order by d.updated_at desc + limit 1 + """, + rs -> rs.next() ? (UUID) rs.getObject("id") : null, + eventSourceId, + tenantKey, + sourceDriverEntityId + ); + } + + private UUID findByCard(String tenantKey, int eventSourceId, String cardNation, String cardNumber) { + if (cardNation == null || cardNumber == null) { + return null; + } + return jdbcTemplate.query( + compatibleSourcesCte() + """ + select d.id + from eventhub.driver d + where d.tenant_key = ? + and d.event_source_id in (select id from compatible_sources) + and d.card_nation = ? + and d.card_number = ? + order by d.updated_at desc + limit 1 + """, + rs -> rs.next() ? (UUID) rs.getObject("id") : null, + eventSourceId, + tenantKey, + cardNation, + cardNumber + ); + } + + private UUID createDriver( + String tenantKey, + int eventSourceId, + String sourceDriverEntityId, + String cardNation, + String cardNumber, + String firstNames, + String lastName, + OffsetDateTime sourceUpdatedAt, + Map payload + ) { + UUID driverId = UUID.randomUUID(); + jdbcTemplate.update( + """ + insert into eventhub.driver( + id, tenant_key, event_source_id, source_driver_entity_id, + card_nation, card_number, first_names, last_name, + source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) + """, + driverId, + tenantKey, + eventSourceId, + sourceDriverEntityId, + cardNation, + cardNumber, + firstNames, + lastName, + sourceUpdatedAt, + toJson(payload) + ); + return driverId; + } + + private void touchDriver( + UUID driverId, + String sourceDriverEntityId, + String cardNation, + String cardNumber + ) { + if (sourceDriverEntityId == null && cardNation == null && cardNumber == null) { + return; + } + jdbcTemplate.update( + """ + update eventhub.driver + set source_driver_entity_id = coalesce(source_driver_entity_id, cast(? as text)), + card_nation = coalesce(card_nation, cast(? as text)), + card_number = coalesce(card_number, cast(? as text)), + updated_at = now() + where id = ? + and ( + (source_driver_entity_id is null and cast(? as text) is not null) + or (card_nation is null and cast(? as text) is not null) + or (card_number is null and cast(? as text) is not null) + ) + """, + sourceDriverEntityId, + cardNation, + cardNumber, + driverId, + sourceDriverEntityId, + cardNation, + cardNumber + ); + } + + private String compatibleSourcesCte() { + return """ + with source_context as ( + select tenant_key, provider_key, source_instance_key, coalesce(tenant_provider_setting_key, '') as tenant_provider_setting_key + from eventhub.event_source + where id = ? + ), + compatible_sources as ( + select es.id + from eventhub.event_source es + join source_context ctx on ctx.tenant_key = es.tenant_key + and ctx.provider_key = es.provider_key + and ctx.source_instance_key = es.source_instance_key + and ctx.tenant_provider_setting_key = coalesce(es.tenant_provider_setting_key, '') + ) + """; + } + + private String toJson(Map value) { + try { + return objectMapper.writeValueAsString(value == null ? Map.of() : value); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Cannot serialize driver identity payload", e); + } + } + + private void put(Map payload, String key, Object value) { + if (value != null) { + payload.put(key, value); + } + } + + private String normalizeRequired(String value, String fieldName) { + String normalized = normalizeNullable(value); + if (normalized == null) { + throw new IllegalArgumentException(fieldName + " must not be blank"); + } + return normalized; + } + + private String normalizeNullable(String value) { + if (value == null) { + return null; + } + String trimmed = value.trim(); + return trimmed.isEmpty() ? null : trimmed; + } +} diff --git a/src/main/java/at/procon/eventhub/persistence/EventRepository.java b/src/main/java/at/procon/eventhub/persistence/EventRepository.java index a5bae89..04e5f0b 100644 --- a/src/main/java/at/procon/eventhub/persistence/EventRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/EventRepository.java @@ -32,6 +32,7 @@ public class EventRepository { private final ObjectMapper objectMapper; private final EventAcquisitionRecordKeyService recordKeyService; private final SourceMasterDataRepository sourceMasterDataRepository; + private final DriverIdentityRepository driverIdentityRepository; private final VehicleIdentityRepository vehicleIdentityRepository; public EventRepository( @@ -39,12 +40,14 @@ public class EventRepository { ObjectMapper objectMapper, EventAcquisitionRecordKeyService recordKeyService, SourceMasterDataRepository sourceMasterDataRepository, + DriverIdentityRepository driverIdentityRepository, VehicleIdentityRepository vehicleIdentityRepository ) { this.jdbcTemplate = jdbcTemplate; this.objectMapper = objectMapper; this.recordKeyService = recordKeyService; this.sourceMasterDataRepository = sourceMasterDataRepository; + this.driverIdentityRepository = driverIdentityRepository; this.vehicleIdentityRepository = vehicleIdentityRepository; } @@ -89,7 +92,7 @@ public class EventRepository { packageId, eventSourceId, event.externalSourceEventId(), - refs.driverEntityId(), + refs.driverId(), refs.vehicleId(), refs.vehicleRegistrationId(), sourcePackageId, @@ -121,7 +124,7 @@ public class EventRepository { data_package_id uuid not null, event_source_id integer not null, external_source_event_id text not null, - driver_entity_id uuid, + driver_id uuid, vehicle_id uuid, vehicle_registration_id uuid, source_package_id text, @@ -149,7 +152,7 @@ public class EventRepository { """ insert into eventhub_event_import_stage( row_no, source_record_key_hash, requested_event_id, data_package_id, event_source_id, - external_source_event_id, driver_entity_id, vehicle_id, vehicle_registration_id, + external_source_event_id, driver_id, vehicle_id, vehicle_registration_id, source_package_id, source_package_entity_id, occurred_at, received_partner_at, received_hub_at, event_domain, event_type, lifecycle, odometer_m, longitude, latitude, payload, manual_entry, event_signature_hash @@ -165,7 +168,7 @@ public class EventRepository { ps.setObject(4, row.packageId()); ps.setInt(5, row.eventSourceId()); ps.setString(6, row.externalSourceEventId()); - ps.setObject(7, row.driverEntityId()); + ps.setObject(7, row.driverId()); ps.setObject(8, row.vehicleId()); ps.setObject(9, row.vehicleRegistrationId()); ps.setString(10, row.sourcePackageId()); @@ -233,7 +236,7 @@ public class EventRepository { insert into eventhub.event( id, event_source_id, data_package_id, external_source_event_id, - driver_entity_id, vehicle_id, vehicle_registration_id, + driver_id, vehicle_id, vehicle_registration_id, source_package_id, source_package_entity_id, occurred_at, received_partner_at, received_hub_at, event_domain, event_type, lifecycle, @@ -244,7 +247,7 @@ public class EventRepository { select source_record.event_id, stage.event_source_id, stage.data_package_id, stage.external_source_event_id, - stage.driver_entity_id, stage.vehicle_id, stage.vehicle_registration_id, + stage.driver_id, stage.vehicle_id, stage.vehicle_registration_id, stage.source_package_id, stage.source_package_entity_id, source_record.event_occurred_at, stage.received_partner_at, stage.received_hub_at, stage.event_domain, stage.event_type, stage.lifecycle, @@ -357,13 +360,13 @@ public class EventRepository { Map entityIdCache, Map> vehicleRefCache ) { - UUID driverEntityId = resolveDriverEntityId(tenantKey, eventSourceId, event, entityIdCache); + UUID driverId = resolveDriverId(tenantKey, eventSourceId, event, entityIdCache); ResolvedVehicleReference vehicleRef = resolveVehicleReference(tenantKey, eventSourceId, event, vehicleRefCache); UUID sourcePackageEntityId = resolveSourcePackageEntityId(tenantKey, eventSourceId, event, entityIdCache); - return new ResolvedEntityRefs(driverEntityId, vehicleRef.vehicleId(), vehicleRef.vehicleRegistrationId(), sourcePackageEntityId); + return new ResolvedEntityRefs(driverId, vehicleRef.vehicleId(), vehicleRef.vehicleRegistrationId(), sourcePackageEntityId); } - private UUID resolveDriverEntityId( + private UUID resolveDriverId( String tenantKey, int eventSourceId, EventHubEventDto event, @@ -373,32 +376,16 @@ public class EventRepository { if (driverRef == null || !driverRef.hasAnyReference()) { return null; } - - DriverCardRefDto card = driverRef.driverCard(); - String cardKey = card == null || !card.hasValue() ? null : card.stableKey(); - String sourceEntityId = normalizeNullable(driverRef.sourceEntityId()); - if (sourceEntityId == null && cardKey != null) { - sourceEntityId = "DRIVER_CARD:" + cardKey; + String cacheKey = "DRIVER|" + driverRef.stableKey(); + UUID cached = entityIdCache.get(cacheKey); + if (cached != null) { + return cached; } - if (sourceEntityId == null) { - return null; + UUID resolved = driverIdentityRepository.resolveOrCreateDriverId(tenantKey, eventSourceId, driverRef); + if (resolved != null) { + entityIdCache.put(cacheKey, resolved); } - - Map payload = new LinkedHashMap<>(); - put(payload, "source_entity_id", driverRef.sourceEntityId()); - put(payload, "driver_card_nation", card == null ? null : card.nation()); - put(payload, "driver_card_number", card == null ? null : card.number()); - return resolveEntityId( - tenantKey, - eventSourceId, - "DRIVER", - sourceEntityId, - cardKey, - sourceEntityId, - null, - payload, - entityIdCache - ); + return resolved; } private ResolvedVehicleReference resolveVehicleReference( @@ -599,7 +586,7 @@ public class EventRepository { } private record ResolvedEntityRefs( - UUID driverEntityId, + UUID driverId, UUID vehicleId, UUID vehicleRegistrationId, UUID sourcePackageEntityId @@ -618,7 +605,7 @@ public class EventRepository { UUID packageId, int eventSourceId, String externalSourceEventId, - UUID driverEntityId, + UUID driverId, UUID vehicleId, UUID vehicleRegistrationId, String sourcePackageId, diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java index be1fcdd..ee6ca4b 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java @@ -6,6 +6,7 @@ import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.persistence.EventSourceRepository; import at.procon.eventhub.persistence.SourceMasterDataRepository; +import at.procon.eventhub.persistence.DriverIdentityRepository; import at.procon.eventhub.persistence.VehicleIdentityRepository; import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.io.IOException; @@ -52,6 +53,7 @@ public class TachographMasterDataRefreshService { private final ObjectProvider tachographJdbcTemplateProvider; private final SourceMasterDataRepository sourceMasterDataRepository; private final EventSourceRepository eventSourceRepository; + private final DriverIdentityRepository driverIdentityRepository; private final VehicleIdentityRepository vehicleIdentityRepository; private final ResourceLoader resourceLoader; @@ -59,12 +61,14 @@ public class TachographMasterDataRefreshService { @Qualifier("tachographNamedParameterJdbcTemplate") ObjectProvider tachographJdbcTemplateProvider, SourceMasterDataRepository sourceMasterDataRepository, EventSourceRepository eventSourceRepository, + DriverIdentityRepository driverIdentityRepository, VehicleIdentityRepository vehicleIdentityRepository, ResourceLoader resourceLoader ) { this.tachographJdbcTemplateProvider = tachographJdbcTemplateProvider; this.sourceMasterDataRepository = sourceMasterDataRepository; this.eventSourceRepository = eventSourceRepository; + this.driverIdentityRepository = driverIdentityRepository; this.vehicleIdentityRepository = vehicleIdentityRepository; this.resourceLoader = resourceLoader; } @@ -98,13 +102,17 @@ public class TachographMasterDataRefreshService { int relationCount = streamRelations(tachographJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE)); + log.info("Reconciling tachograph driver identities from source master data tenant={} source={}", + tenantKey, masterDataSource.stableKey()); + int reconciledDrivers = driverIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId); + log.info("Reconciling tachograph vehicle identities from source master data tenant={} source={}", tenantKey, masterDataSource.stableKey()); int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId); MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount); - log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={} reconciledVehicles={}", - tenantKey, masterDataSource.stableKey(), result.entitiesUpserted(), result.relationsUpserted(), reconciledVehicles); + log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={} reconciledDrivers={} reconciledVehicles={}", + tenantKey, masterDataSource.stableKey(), result.entitiesUpserted(), result.relationsUpserted(), reconciledDrivers, reconciledVehicles); return result; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index bdbceb1..3c53752 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -63,7 +63,7 @@ eventhub: # Enables the scheduler that regularly triggers configured tachograph import plans. # Default is safe: no scheduled import starts unless explicitly enabled. - scheduler-enabled: false + scheduler-enabled: true scheduler-poll-interval-ms: 3600000 # PLAN_ONLY creates import_run + planned extraction packages. @@ -77,10 +77,10 @@ eventhub: # Example plan. Keep disabled until the tachograph datasource/extractor is wired. import-plans: - - plan-key: kralowetz-tachograph-org-147 - enabled: false + - plan-key: tachograph-org-14708 + enabled: true cron: "0 15 * * * *" # hourly at minute 15 - tenant-key: + tenant-key: Procon event-source: provider-key: TACHOGRAPH source-kind: MIXED @@ -89,16 +89,16 @@ eventhub: tenant-provider-setting-key: ByteBar-DriverSettlement source-group: type: ORGANISATION - source-entity-id: "147" - code: "147" - name: Kralowetz root organisation + source-entity-id: "14708" + code: "14708" + name: Zeller root organisation import-scope: type: SOURCE_ORGANISATION_SUBTREE root-source-organisation: type: ORGANISATION - source-entity-id: "147" - code: "147" - name: Kralowetz root organisation + source-entity-id: "14708" + code: "14708" + name: Zeller root organisation include-children: true occurred-from: null occurred-to: null @@ -115,11 +115,15 @@ eventhub: scheduled-mode: INCREMENTAL_UPDATE initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP scheduled-strategy: SOURCE_PACKAGE_WATERMARK - refresh-master-data-first: false + refresh-master-data-first: true initial-occurred-from: "2026-01-21T00:00:00+01:00" - initial-occurred-to: "2026-01-31T00:00:00+01:00" + initial-occurred-to: run-initial-on-startup: true + esper-poc: + activity-merge-mode: JAVA + shift-resolution-mode: JAVA + yellow-fox: default-chunk-days: 1 occurred-at-overlap: 2h diff --git a/src/main/resources/db/migration/V9__introduce_driver_identity_model.sql b/src/main/resources/db/migration/V9__introduce_driver_identity_model.sql new file mode 100644 index 0000000..0f622a5 --- /dev/null +++ b/src/main/resources/db/migration/V9__introduce_driver_identity_model.sql @@ -0,0 +1,137 @@ +create table if not exists eventhub.driver ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_driver_entity_id text, + card_nation text, + card_number text, + first_names text, + last_name text, + birth_date date, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint chk_driver_card_nation_when_number + check (card_number is null or card_nation is not null) +); + +insert into eventhub.driver( + id, tenant_key, event_source_id, source_driver_entity_id, + card_nation, card_number, first_names, last_name, birth_date, + source_updated_at, payload, created_at, updated_at +) +select gen_random_uuid(), + sme.tenant_key, + sme.event_source_id, + case + when sme.source_entity_id like 'DRIVER_CARD:%' then null + else sme.source_entity_id + end as source_driver_entity_id, + coalesce( + nullif(trim(sme.payload ->> 'card_nation'), ''), + nullif(trim(sme.payload ->> 'driver_card_nation'), ''), + nullif(split_part(case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else null end, ':', 1), '') + ) as card_nation, + coalesce( + nullif(trim(sme.payload ->> 'card_number'), ''), + nullif(trim(sme.payload ->> 'driver_card_number'), ''), + nullif(substring(case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else null end from position(':' in case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else '' end) + 1), '') + ) as card_number, + coalesce( + nullif(trim(sme.payload ->> 'first_names'), ''), + nullif(trim(sme.payload ->> 'firstnames'), '') + ) as first_names, + coalesce( + nullif(trim(sme.payload ->> 'last_name'), ''), + nullif(trim(sme.payload ->> 'surname'), '') + ) as last_name, + cast(nullif(trim(sme.payload ->> 'birth_date'), '') as date) as birth_date, + sme.source_updated_at, + sme.payload, + sme.created_at, + sme.updated_at +from eventhub.source_master_entity sme +where sme.entity_type = 'DRIVER' + and not exists ( + select 1 + from eventhub.driver existing + where existing.tenant_key = sme.tenant_key + and existing.event_source_id = sme.event_source_id + and existing.source_driver_entity_id is not distinct from case + when sme.source_entity_id like 'DRIVER_CARD:%' then null + else sme.source_entity_id + end + and existing.card_nation is not distinct from coalesce( + nullif(trim(sme.payload ->> 'card_nation'), ''), + nullif(trim(sme.payload ->> 'driver_card_nation'), ''), + nullif(split_part(case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else null end, ':', 1), '') + ) + and existing.card_number is not distinct from coalesce( + nullif(trim(sme.payload ->> 'card_number'), ''), + nullif(trim(sme.payload ->> 'driver_card_number'), ''), + nullif(substring(case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else null end from position(':' in case when sme.source_entity_id like 'DRIVER_CARD:%' then substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) else '' end) + 1), '') + ) + ); + +alter table eventhub.event + add column if not exists driver_id uuid references eventhub.driver(id); + +with source_driver_map as ( + select distinct on (sme.id) + sme.id as source_master_entity_id, + driver.id as driver_id + from eventhub.source_master_entity sme + join eventhub.driver driver + on driver.tenant_key = sme.tenant_key + and driver.event_source_id = sme.event_source_id + and ( + ( + sme.source_entity_id not like 'DRIVER_CARD:%' + and driver.source_driver_entity_id = sme.source_entity_id + ) or ( + sme.source_entity_id like 'DRIVER_CARD:%' + and driver.card_nation is not distinct from coalesce( + nullif(trim(sme.payload ->> 'card_nation'), ''), + nullif(trim(sme.payload ->> 'driver_card_nation'), ''), + nullif(split_part(substring(sme.source_entity_id from length('DRIVER_CARD:') + 1), ':', 1), '') + ) + and driver.card_number is not distinct from coalesce( + nullif(trim(sme.payload ->> 'card_number'), ''), + nullif(trim(sme.payload ->> 'driver_card_number'), ''), + nullif(substring(substring(sme.source_entity_id from length('DRIVER_CARD:') + 1) from position(':' in substring(sme.source_entity_id from length('DRIVER_CARD:') + 1)) + 1), '') + ) + ) + ) + where sme.entity_type = 'DRIVER' + order by sme.id, driver.updated_at desc +) +update eventhub.event event +set driver_id = map.driver_id +from source_driver_map map +where event.driver_entity_id = map.source_master_entity_id + and event.driver_id is null; + +create index if not exists idx_driver_source_entity + on eventhub.driver(tenant_key, event_source_id, source_driver_entity_id) + where source_driver_entity_id is not null; + +create index if not exists idx_driver_card + on eventhub.driver(tenant_key, event_source_id, card_nation, card_number) + where card_number is not null; + +create index if not exists idx_event_driver_time + on eventhub.event(driver_id, occurred_at desc) + where driver_id is not null; + +alter table eventhub.event + drop constraint if exists chk_event_driver_or_vehicle_ref; + +alter table eventhub.event + add constraint chk_event_driver_or_vehicle_ref + check ( + driver_id is not null + or driver_entity_id is not null + or vehicle_id is not null + or vehicle_registration_id is not null + ); diff --git a/src/main/resources/sql/maintenance/repair-tachograph-driver-entities.sql b/src/main/resources/sql/maintenance/repair-tachograph-driver-entities.sql new file mode 100644 index 0000000..6eadb75 --- /dev/null +++ b/src/main/resources/sql/maintenance/repair-tachograph-driver-entities.sql @@ -0,0 +1,239 @@ +/* + * Repairs and normalizes tachograph driver aggregates after introducing eventhub.driver. + * + * What it does: + * 1. Ensures tachograph DRIVER master-data payload carries last_name while keeping source_master_entity.display_name unchanged. + * 2. Upserts eventhub.driver rows from MASTER_DATA DRIVER entities. + * 3. Projects card nation/number onto eventhub.driver from DRIVER_CARD_DRIVER relations. + * 4. Remaps event.driver_id from provisional card-only drivers to proper source-driver aggregates when possible. + * 5. Deletes now-unreferenced provisional tachograph driver rows with no source_driver_entity_id. + * + * Assumptions: + * - Tachograph master-data source is provider_key=TACHOGRAPH, source_kind=MASTER_DATA, source_key=TACHOGRAPH_MASTER_DATA. + * - eventhub.driver and event.driver_id already exist. + */ + +-- 1) Keep display_name, but ensure DRIVER payload has last_name. +with master_sources as ( + select es.id, es.tenant_key + from eventhub.event_source es + where es.provider_key = 'TACHOGRAPH' + and es.source_kind = 'MASTER_DATA' + and es.source_key = 'TACHOGRAPH_MASTER_DATA' +), +updated_master_payload as ( + update eventhub.source_master_entity sme + set payload = jsonb_strip_nulls( + sme.payload + || jsonb_build_object( + 'first_names', coalesce(sme.payload ->> 'first_names', sme.payload ->> 'firstnames'), + 'last_name', coalesce(sme.payload ->> 'last_name', sme.payload ->> 'surname') + ) + ), + updated_at = now() + from master_sources ms + where sme.tenant_key = ms.tenant_key + and sme.event_source_id = ms.id + and sme.entity_type = 'DRIVER' + returning sme.id +) +select count(*) as updated_master_payload +from updated_master_payload; + +-- 2) Upsert driver aggregates from tachograph master data. +with master_sources as ( + select es.id, + es.tenant_key, + es.source_instance_key, + coalesce(es.tenant_provider_setting_key, '') as tenant_provider_setting_key + from eventhub.event_source es + where es.provider_key = 'TACHOGRAPH' + and es.source_kind = 'MASTER_DATA' + and es.source_key = 'TACHOGRAPH_MASTER_DATA' +), +master_drivers as ( + select ms.id as master_event_source_id, + ms.tenant_key, + ms.source_instance_key, + ms.tenant_provider_setting_key, + d.source_entity_id as source_driver_entity_id, + coalesce(nullif(trim(d.payload ->> 'first_names'), ''), nullif(trim(d.payload ->> 'firstnames'), '')) as first_names, + coalesce(nullif(trim(d.payload ->> 'last_name'), ''), nullif(trim(d.payload ->> 'surname'), '')) as last_name, + cast(nullif(trim(d.payload ->> 'birth_date'), '') as date) as birth_date, + d.source_updated_at, + d.payload + from master_sources ms + join eventhub.source_master_entity d + on d.tenant_key = ms.tenant_key + and d.event_source_id = ms.id + and d.entity_type = 'DRIVER' + and d.source_entity_id not like 'DRIVER_CARD:%' +), +compatible_targets as ( + select md.*, + es.id as target_event_source_id + from master_drivers md + join eventhub.event_source es + on es.tenant_key = md.tenant_key + and es.provider_key = 'TACHOGRAPH' + and es.source_instance_key = md.source_instance_key + and coalesce(es.tenant_provider_setting_key, '') = md.tenant_provider_setting_key +), +updated_drivers as ( + update eventhub.driver driver + set first_names = coalesce(ct.first_names, driver.first_names), + last_name = coalesce(ct.last_name, driver.last_name), + birth_date = coalesce(ct.birth_date, driver.birth_date), + source_updated_at = ct.source_updated_at, + payload = driver.payload || ct.payload, + updated_at = now() + from compatible_targets ct + where driver.tenant_key = ct.tenant_key + and driver.event_source_id = ct.target_event_source_id + and driver.source_driver_entity_id = ct.source_driver_entity_id + returning driver.id +), +inserted_drivers as ( + insert into eventhub.driver( + id, tenant_key, event_source_id, source_driver_entity_id, + first_names, last_name, birth_date, source_updated_at, payload, updated_at + ) + select gen_random_uuid(), + ct.tenant_key, + ct.target_event_source_id, + ct.source_driver_entity_id, + ct.first_names, + ct.last_name, + ct.birth_date, + ct.source_updated_at, + ct.payload, + now() + from compatible_targets ct + where not exists ( + select 1 + from eventhub.driver existing + where existing.tenant_key = ct.tenant_key + and existing.event_source_id = ct.target_event_source_id + and existing.source_driver_entity_id = ct.source_driver_entity_id + ) + returning id +) +select (select count(*) from updated_drivers) as updated_drivers, + (select count(*) from inserted_drivers) as inserted_drivers; + +-- 3) Project driver-card identifiers from master-data relations. +with master_sources as ( + select es.id, + es.tenant_key, + es.source_instance_key, + coalesce(es.tenant_provider_setting_key, '') as tenant_provider_setting_key + from eventhub.event_source es + where es.provider_key = 'TACHOGRAPH' + and es.source_kind = 'MASTER_DATA' + and es.source_key = 'TACHOGRAPH_MASTER_DATA' +), +card_projection as ( + select distinct on (ms.tenant_key, ms.source_instance_key, ms.tenant_provider_setting_key, rel.to_source_entity_id) + ms.tenant_key, + ms.source_instance_key, + ms.tenant_provider_setting_key, + rel.to_source_entity_id as source_driver_entity_id, + nullif(trim(card.payload ->> 'card_nation'), '') as card_nation, + nullif(trim(card.payload ->> 'card_number'), '') as card_number, + rel.source_updated_at + from master_sources ms + join eventhub.source_master_relation rel + on rel.tenant_key = ms.tenant_key + and rel.event_source_id = ms.id + and rel.relation_type = 'DRIVER_CARD_DRIVER' + and rel.from_entity_type = 'DRIVER_CARD' + and rel.to_entity_type = 'DRIVER' + join eventhub.source_master_entity card + on card.tenant_key = ms.tenant_key + and card.event_source_id = ms.id + and card.entity_type = 'DRIVER_CARD' + and card.source_entity_id = rel.from_source_entity_id + order by ms.tenant_key, + ms.source_instance_key, + ms.tenant_provider_setting_key, + rel.to_source_entity_id, + rel.valid_to desc nulls last, + rel.valid_from desc nulls last, + rel.updated_at desc +), +updated_driver_cards as ( + update eventhub.driver driver + set card_nation = coalesce(driver.card_nation, projection.card_nation), + card_number = coalesce(driver.card_number, projection.card_number), + source_updated_at = coalesce(projection.source_updated_at, driver.source_updated_at), + updated_at = now() + from card_projection projection + join eventhub.event_source es + on es.id = driver.event_source_id + where driver.tenant_key = projection.tenant_key + and es.provider_key = 'TACHOGRAPH' + and es.source_instance_key = projection.source_instance_key + and coalesce(es.tenant_provider_setting_key, '') = projection.tenant_provider_setting_key + and driver.source_driver_entity_id = projection.source_driver_entity_id + and ( + (driver.card_nation is null and projection.card_nation is not null) + or (driver.card_number is null and projection.card_number is not null) + ) + returning driver.id +) +select count(*) as updated_driver_cards +from updated_driver_cards; + +-- 4) Remap events from provisional card-only drivers to proper source-driver aggregates. +with provisional_to_real as ( + select provisional.id as provisional_driver_id, + real.id as real_driver_id + from eventhub.driver provisional + join eventhub.event_source provisional_source + on provisional_source.id = provisional.event_source_id + and provisional_source.provider_key = 'TACHOGRAPH' + join eventhub.driver real + on real.tenant_key = provisional.tenant_key + and real.source_driver_entity_id is not null + and real.card_nation = provisional.card_nation + and real.card_number = provisional.card_number + join eventhub.event_source real_source + on real_source.id = real.event_source_id + and real_source.provider_key = provisional_source.provider_key + and real_source.tenant_key = provisional_source.tenant_key + and real_source.source_instance_key = provisional_source.source_instance_key + and coalesce(real_source.tenant_provider_setting_key, '') = coalesce(provisional_source.tenant_provider_setting_key, '') + where provisional.source_driver_entity_id is null + and provisional.card_nation is not null + and provisional.card_number is not null + and provisional.id <> real.id +), +updated_events as ( + update eventhub.event e + set driver_id = map.real_driver_id + from provisional_to_real map + where e.driver_id = map.provisional_driver_id + and e.driver_id <> map.real_driver_id + returning e.id +) +select count(*) as remapped_events +from updated_events; + +-- 5) Delete now-unreferenced provisional tachograph driver rows. +with deleted_drivers as ( + delete from eventhub.driver driver + using eventhub.event_source es + where es.id = driver.event_source_id + and es.provider_key = 'TACHOGRAPH' + and driver.source_driver_entity_id is null + and driver.card_nation is not null + and driver.card_number is not null + and not exists ( + select 1 + from eventhub.event e + where e.driver_id = driver.id + ) + returning driver.id +) +select count(*) as deleted_provisional_drivers +from deleted_drivers; diff --git a/src/main/resources/sql/tachograph/master-data/drivers.sql b/src/main/resources/sql/tachograph/master-data/drivers.sql index 8495fa2..65c070e 100644 --- a/src/main/resources/sql/tachograph/master-data/drivers.sql +++ b/src/main/resources/sql/tachograph/master-data/drivers.sql @@ -9,6 +9,7 @@ select d.LastUpdate as source_updated_at, d.ID as driver_id, d.Surname as surname, + d.Surname as last_name, d.Firstnames as first_names, d.Birthdate as birth_date, d.BirthPlace as birth_place, diff --git a/src/test/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngineTest.java b/src/test/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngineTest.java index faa0f81..9c7264f 100644 --- a/src/test/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngineTest.java +++ b/src/test/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngineTest.java @@ -45,6 +45,8 @@ class EsperDriverActivityEngineTest { OffsetDateTime.parse(occurredAt), rowId, "TACHOGRAPH:CARD_ACTIVITY:" + rowId + ":" + lifecycle, + "DRIVER_CARD", + "CARD_ACTIVITY", driverId, vehicleId, null, diff --git a/src/test/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityServiceTest.java b/src/test/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityServiceTest.java index 4c10b43..8fceca8 100644 --- a/src/test/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityServiceTest.java +++ b/src/test/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityServiceTest.java @@ -3,7 +3,10 @@ package at.procon.eventhub.esperpoc.service; import static org.assertj.core.api.Assertions.assertThat; import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto; +import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode; import at.procon.eventhub.esperpoc.dto.EsperPocRequest; +import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode; +import java.time.Duration; import java.time.OffsetDateTime; import java.util.List; import java.util.UUID; @@ -11,7 +14,8 @@ import org.junit.jupiter.api.Test; class EsperPocDriverCardActivityServiceTest { - private final EsperPocDriverCardActivityService service = new EsperPocDriverCardActivityService(null, null); + private final EsperDriverActivityEngine engine = new EsperDriverActivityEngine(); + private final EsperPocDriverCardActivityService service = new EsperPocDriverCardActivityService(null, engine); @Test void mergesIdenticalActivitiesAcrossUtcMidnight() { @@ -22,6 +26,9 @@ class EsperPocDriverCardActivityServiceTest { null, "BREAK_REST", "DRIVER", + "INSERTED", + "SINGLE", + "DRIVER_CARD", OffsetDateTime.parse("2026-04-30T23:50:00Z"), OffsetDateTime.parse("2026-05-01T00:00:00Z"), "1" @@ -32,6 +39,9 @@ class EsperPocDriverCardActivityServiceTest { null, "BREAK_REST", "DRIVER", + "INSERTED", + "SINGLE", + "DRIVER_CARD", OffsetDateTime.parse("2026-05-01T00:00:00Z"), OffsetDateTime.parse("2026-05-01T00:20:00Z"), "2" @@ -39,7 +49,7 @@ class EsperPocDriverCardActivityServiceTest { var merged = service.mergeConsecutiveIdenticalActivities( List.of(beforeMidnight, afterMidnight), - java.time.Duration.ZERO + Duration.ZERO ); assertThat(merged).hasSize(1); @@ -52,25 +62,16 @@ class EsperPocDriverCardActivityServiceTest { @Test void splitsOperatingPeriodsByBreakRestLongerThanConfiguredHoursAndEvaluatesDepartureArrivalPerPeriod() { UUID driverId = UUID.randomUUID(); - EsperPocRequest request = new EsperPocRequest( - "default", - driverId, - OffsetDateTime.parse("2026-04-01T00:00:00Z"), - OffsetDateTime.parse("2026-04-03T00:00:00Z"), - 24, - 3, - 60, - 7 - ); + EsperPocRequest request = request(driverId, null, null); List activities = List.of( - activity(driverId, "DRIVE", "2026-04-01T06:00:00Z", "2026-04-01T08:00:00Z", "d1"), - activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1"), - activity(driverId, "DRIVE", "2026-04-01T09:30:00Z", "2026-04-01T11:00:00Z", "d2"), - activity(driverId, "BREAK_REST", "2026-04-01T20:00:00Z", "2026-04-02T04:30:00Z", "r1"), - activity(driverId, "DRIVE", "2026-04-02T05:00:00Z", "2026-04-02T07:00:00Z", "d3"), - activity(driverId, "BREAK_REST", "2026-04-02T10:00:00Z", "2026-04-02T10:30:00Z", "r2"), - activity(driverId, "DRIVE", "2026-04-02T10:30:00Z", "2026-04-02T12:00:00Z", "d4") + activity(driverId, "DRIVE", "2026-04-01T06:00:00Z", "2026-04-01T08:00:00Z", "d1", "DRIVER_CARD"), + activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1", "DRIVER_CARD"), + activity(driverId, "DRIVE", "2026-04-01T09:30:00Z", "2026-04-01T11:00:00Z", "d2", "DRIVER_CARD"), + activity(driverId, "BREAK_REST", "2026-04-01T20:00:00Z", "2026-04-02T04:30:00Z", "r1", "DRIVER_CARD"), + activity(driverId, "DRIVE", "2026-04-02T05:00:00Z", "2026-04-02T07:00:00Z", "d3", "DRIVER_CARD"), + activity(driverId, "BREAK_REST", "2026-04-02T10:00:00Z", "2026-04-02T10:30:00Z", "r2", "DRIVER_CARD"), + activity(driverId, "DRIVE", "2026-04-02T10:30:00Z", "2026-04-02T12:00:00Z", "d4", "DRIVER_CARD") ); var periods = service.buildOperatingTimePeriods( @@ -99,13 +100,135 @@ class EsperPocDriverCardActivityServiceTest { assertThat(periods.get(1).workingOperationTimes().breakRestSeconds()).isEqualTo(30 * 60L); } - private ActivityIntervalDto activity(UUID driverId, String activity, String from, String to, String sourceRowId) { + @Test + void usesVuIntervalsOnlyForUncoveredDriverCardGaps() { + UUID driverId = UUID.randomUUID(); + List resolved = service.resolveVuFillGaps( + List.of( + activity(driverId, "DRIVE", "2026-04-01T08:00:00Z", "2026-04-01T10:00:00Z", "c1", "DRIVER_CARD") + ), + List.of( + activity(driverId, "DRIVE", "2026-04-01T09:00:00Z", "2026-04-01T11:00:00Z", "v1", "VEHICLE_UNIT") + ) + ); + + assertThat(resolved).hasSize(2); + assertThat(resolved.get(0).sourceKind()).isEqualTo("DRIVER_CARD"); + assertThat(resolved.get(1).sourceKind()).isEqualTo("VEHICLE_UNIT"); + assertThat(resolved.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T10:00:00Z")); + assertThat(resolved.get(1).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T11:00:00Z")); + } + + @Test + void resolvesTwoWorkingShiftsAcrossLongRestGap() { + UUID driverId = UUID.randomUUID(); + EsperPocRequest request = request(driverId, null, null); + + var shifts = service.buildResolvedWorkShifts( + request, + request.occurredFrom(), + request.occurredTo(), + List.of( + activity(driverId, "DRIVE", "2026-04-01T06:00:00Z", "2026-04-01T10:00:00Z", "d1", "DRIVER_CARD"), + activity(driverId, "BREAK_REST", "2026-04-01T10:00:00Z", "2026-04-01T17:30:00Z", "r1", "DRIVER_CARD"), + activity(driverId, "WORK", "2026-04-01T17:30:00Z", "2026-04-01T19:00:00Z", "w1", "VEHICLE_UNIT") + ) + ); + + assertThat(shifts).hasSize(2); + assertThat(shifts.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T06:00:00Z")); + assertThat(shifts.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T10:00:00Z")); + assertThat(shifts.get(0).dailyRestingTimeSeconds()).isEqualTo(7L * 60L * 60L + 30L * 60L); + assertThat(shifts.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T17:30:00Z")); + assertThat(shifts.get(1).usedDataKind()).isEqualTo("VEHICLE_UNIT"); + } + + @Test + void esperMergeModeMatchesJavaMergeMode() { + UUID driverId = UUID.randomUUID(); + List activities = List.of( + activity(driverId, "BREAK_REST", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "r1", "DRIVER_CARD"), + activity(driverId, "BREAK_REST", "2026-04-01T09:00:00Z", "2026-04-01T10:00:00Z", "r2", "DRIVER_CARD"), + activity(driverId, "WORK", "2026-04-01T10:05:00Z", "2026-04-01T11:00:00Z", "w1", "DRIVER_CARD") + ); + + List javaMerged = service.mergeConsecutiveIdenticalActivities( + activities, + Duration.ofSeconds(60) + ); + List esperMerged = engine.mergeConsecutiveIdenticalActivities( + activities, + Duration.ofSeconds(60) + ); + + assertThat(esperMerged).usingRecursiveComparison().isEqualTo(javaMerged); + } + + @Test + void esperShiftResolutionModeMatchesJavaShiftResolutionMode() { + UUID driverId = UUID.randomUUID(); + EsperPocRequest javaRequest = request(driverId, EsperActivityMergeMode.JAVA, EsperShiftResolutionMode.JAVA); + EsperPocRequest esperRequest = request(driverId, EsperActivityMergeMode.JAVA, EsperShiftResolutionMode.ESPER); + List intervals = List.of( + activity(driverId, "DRIVE", "2026-04-01T06:00:00Z", "2026-04-01T10:00:00Z", "d1", "DRIVER_CARD"), + activity(driverId, "BREAK_REST", "2026-04-01T10:00:00Z", "2026-04-01T17:30:00Z", "r1", "DRIVER_CARD"), + activity(driverId, "WORK", "2026-04-01T17:30:00Z", "2026-04-01T19:00:00Z", "w1", "VEHICLE_UNIT") + ); + + var javaShifts = service.buildResolvedWorkShifts( + javaRequest, + javaRequest.occurredFrom(), + javaRequest.occurredTo(), + intervals + ); + var esperShifts = service.buildResolvedWorkShifts( + esperRequest, + esperRequest.occurredFrom(), + esperRequest.occurredTo(), + intervals + ); + + assertThat(esperShifts).usingRecursiveComparison().isEqualTo(javaShifts); + } + + private EsperPocRequest request( + UUID driverId, + EsperActivityMergeMode activityMergeMode, + EsperShiftResolutionMode shiftResolutionMode + ) { + return new EsperPocRequest( + "default", + driverId, + OffsetDateTime.parse("2026-04-01T00:00:00Z"), + OffsetDateTime.parse("2026-04-03T00:00:00Z"), + 24, + 3, + 60, + 7, + 420, + 5, + activityMergeMode, + shiftResolutionMode + ); + } + + private ActivityIntervalDto activity( + UUID driverId, + String activity, + String from, + String to, + String sourceRowId, + String sourceKind + ) { return ActivityIntervalDto.raw( driverId, null, null, activity, "DRIVER", + "INSERTED", + "KNOWN", + sourceKind, OffsetDateTime.parse(from), OffsetDateTime.parse(to), sourceRowId