From e4e9137af50700928cfd70a6bba835879c15531a Mon Sep 17 00:00:00 2001
From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com>
Date: Tue, 5 May 2026 15:29:10 +0200
Subject: [PATCH] Add Esper tachograph activity PoC
---
docs/esper-poc/driver-card-activity-poc.md | 94 +++++
pom.xml | 17 +
...eventhub-esper-poc.postman_collection.json | 84 ++++
.../esperpoc/api/EsperPocController.java | 48 +++
.../esperpoc/dto/ActivityIntervalDto.java | 81 ++++
.../esperpoc/dto/DriverWorkSummaryDto.java | 19 +
.../esperpoc/dto/DrivingInterruptionDto.java | 12 +
.../esperpoc/dto/EsperPocRequest.java | 27 ++
.../esperpoc/dto/EsperPocResultDto.java | 28 ++
.../esperpoc/dto/OperatingTimePeriodDto.java | 17 +
.../esperpoc/dto/RawActivityEventDto.java | 21 +
.../dto/ShiftDrivingEvaluationDto.java | 14 +
.../EsperPocActivityRepository.java | 83 ++++
.../service/EsperDriverActivityEngine.java | 126 ++++++
.../EsperPocDriverCardActivityService.java | 397 ++++++++++++++++++
.../service/EsperRawDriverActivityPoint.java | 81 ++++
.../EsperDriverActivityEngineTest.java | 59 +++
...EsperPocDriverCardActivityServiceTest.java | 114 +++++
18 files changed, 1322 insertions(+)
create mode 100644 docs/esper-poc/driver-card-activity-poc.md
create mode 100644 postman/eventhub-esper-poc.postman_collection.json
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/ActivityIntervalDto.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/DriverWorkSummaryDto.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/DrivingInterruptionDto.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocRequest.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocResultDto.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/OperatingTimePeriodDto.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/RawActivityEventDto.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/dto/ShiftDrivingEvaluationDto.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/persistence/EsperPocActivityRepository.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngine.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityService.java
create mode 100644 src/main/java/at/procon/eventhub/esperpoc/service/EsperRawDriverActivityPoint.java
create mode 100644 src/test/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngineTest.java
create mode 100644 src/test/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityServiceTest.java
diff --git a/docs/esper-poc/driver-card-activity-poc.md b/docs/esper-poc/driver-card-activity-poc.md
new file mode 100644
index 0000000..a53f2f9
--- /dev/null
+++ b/docs/esper-poc/driver-card-activity-poc.md
@@ -0,0 +1,94 @@
+# Esper PoC: Tachograph Driver-Card Activity Evaluation
+
+This PoC intentionally uses only existing imported EventHub source events:
+
+- provider: `TACHOGRAPH`
+- source kind: `DRIVER_CARD`
+- extraction code: `CARD_ACTIVITY`
+- event domain: `DRIVER_ACTIVITY`
+- event types: `DRIVE`, `WORK`, `AVAILABILITY`, `BREAK_REST`
+- lifecycles: `START`, `END`
+
+It does not introduce canonical-event tables yet. The goal is to prove that Esper can replay one driver and one selected period and produce activity-level and operating-period results.
+
+## Endpoint
+
+```http
+GET /api/eventhub/esper-poc/tachograph/driver-card-activities
+```
+
+Query parameters:
+
+| Parameter | Required | Example | Meaning |
+|---|---:|---|---|
+| `tenantKey` | yes | `default` | EventHub tenant key. |
+| `driverEntityId` | yes | UUID | Existing `eventhub.event.driver_entity_id`. |
+| `occurredFrom` | yes | `2026-04-01T00:00:00Z` | Requested period start. |
+| `occurredTo` | yes | `2026-05-01T00:00:00Z` | Requested period end. |
+| `guardHours` | no | `24` | Extra loading window before/after requested period. Needed for activities crossing midnight/month boundaries and long rests crossing period boundaries. |
+| `significantDrivingMinutes` | no | `3` | DRIVE intervals longer than this threshold count as significant driving periods. |
+| `mergeGapSeconds` | no | `60` | Consecutive identical activities are merged if the gap is at most this value. |
+| `operatingPeriodSplitRestHours` | no | `7` | A `BREAK_REST` activity longer than this threshold splits operating time periods. |
+
+## Produced levels
+
+### Level 1: Raw
+
+`raw` contains the original point events from `eventhub.event`.
+
+### Level 2: Activities
+
+Esper consumes the raw point events and produces intervals by pairing:
+
+```text
+START + END with same sourceRowId, activity type, driver and card slot
+```
+
+The service merges consecutive identical activities in the full guard window first, then clips the merged activities to the requested period. This is important because a long `BREAK_REST` crossing the requested boundary must keep its full guard-window duration for operating-period splitting.
+
+## Operating time periods
+
+`operatingTimePeriods` are derived from the merged activity timeline.
+
+A `BREAK_REST` interval splits operating periods when:
+
+```text
+activityType = BREAK_REST
+and duration > operatingPeriodSplitRestHours
+```
+
+The default is 7 hours. With the default, a `BREAK_REST` of exactly 7 hours does not split; it must be longer than 7 hours.
+
+Each operating period contains:
+
+- `sequenceNumber`
+- `startedAt`
+- `endedAt`
+- `durationSeconds`
+- `activities`
+- `workingOperationTimes`
+- `drivingTimeInterruptionEvaluation`
+- optional references to the long rest before/after the period
+
+Departure and arrival are evaluated per operating period:
+
+- departure = first significant `DRIVE` interval inside that operating period
+- arrival = end of the last significant `DRIVE` interval inside that operating period
+- middle/interruption = gaps between significant `DRIVE` intervals inside the same operating period
+
+## Result semantics
+
+- `workResultPerDriver` and `workingOperationTimesPerEmployee` currently use the same PoC summary for the whole requested period.
+- `workingSeconds = DRIVE + WORK`.
+- `operationSeconds = DRIVE + WORK + AVAILABILITY`.
+- `breakRestSeconds` is reported separately.
+- Top-level `drivingTimeInterruptionEvaluation` evaluates the whole requested period.
+- Each item in `operatingTimePeriods` has its own `drivingTimeInterruptionEvaluation`.
+
+## Current limitations
+
+- Uses the existing source-level `driver_entity_id`, not a canonical employee table.
+- Reads only tachograph driver-card activity events.
+- Does not merge VU/card duplication.
+- Does not persist results; the endpoint returns a PoC calculation response.
+- Esper is used for interval creation. Summary, clipping, operating-period split, and merged activity report calculation are implemented in Java for auditability and easier future migration to canonical events.
diff --git a/pom.xml b/pom.xml
index 73f332f..19ce112 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,6 +20,7 @@
21
4.18.2
+ 9.0.0
@@ -88,6 +89,22 @@
runtime
+
+ com.espertech
+ esper-common
+ ${esper.version}
+
+
+ com.espertech
+ esper-compiler
+ ${esper.version}
+
+
+ com.espertech
+ esper-runtime
+ ${esper.version}
+
+
org.springframework.boot
spring-boot-starter-test
diff --git a/postman/eventhub-esper-poc.postman_collection.json b/postman/eventhub-esper-poc.postman_collection.json
new file mode 100644
index 0000000..a665d02
--- /dev/null
+++ b/postman/eventhub-esper-poc.postman_collection.json
@@ -0,0 +1,84 @@
+{
+ "info": {
+ "name": "EventHub Esper PoC",
+ "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
+ },
+ "item": [
+ {
+ "name": "Evaluate tachograph driver-card activities",
+ "request": {
+ "method": "GET",
+ "header": [],
+ "url": {
+ "raw": "{{baseUrl}}/api/eventhub/esper-poc/tachograph/driver-card-activities?tenantKey={{tenantKey}}&driverEntityId={{driverEntityId}}&occurredFrom={{occurredFrom}}&occurredTo={{occurredTo}}&guardHours=24&significantDrivingMinutes=3&mergeGapSeconds=60&operatingPeriodSplitRestHours=7",
+ "host": [
+ "{{baseUrl}}"
+ ],
+ "path": [
+ "api",
+ "eventhub",
+ "esper-poc",
+ "tachograph",
+ "driver-card-activities"
+ ],
+ "query": [
+ {
+ "key": "tenantKey",
+ "value": "{{tenantKey}}"
+ },
+ {
+ "key": "driverEntityId",
+ "value": "{{driverEntityId}}"
+ },
+ {
+ "key": "occurredFrom",
+ "value": "{{occurredFrom}}"
+ },
+ {
+ "key": "occurredTo",
+ "value": "{{occurredTo}}"
+ },
+ {
+ "key": "guardHours",
+ "value": "24"
+ },
+ {
+ "key": "significantDrivingMinutes",
+ "value": "3"
+ },
+ {
+ "key": "mergeGapSeconds",
+ "value": "60"
+ },
+ {
+ "key": "operatingPeriodSplitRestHours",
+ "value": "7"
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "variable": [
+ {
+ "key": "baseUrl",
+ "value": "http://localhost:8080"
+ },
+ {
+ "key": "tenantKey",
+ "value": "default"
+ },
+ {
+ "key": "driverEntityId",
+ "value": "00000000-0000-0000-0000-000000000000"
+ },
+ {
+ "key": "occurredFrom",
+ "value": "2026-04-01T00:00:00Z"
+ },
+ {
+ "key": "occurredTo",
+ "value": "2026-05-01T00:00:00Z"
+ }
+ ]
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java b/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java
new file mode 100644
index 0000000..7724045
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/api/EsperPocController.java
@@ -0,0 +1,48 @@
+package at.procon.eventhub.esperpoc.api;
+
+import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
+import at.procon.eventhub.esperpoc.dto.EsperPocResultDto;
+import at.procon.eventhub.esperpoc.service.EsperPocDriverCardActivityService;
+import java.time.OffsetDateTime;
+import java.util.UUID;
+import org.springframework.format.annotation.DateTimeFormat;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/api/eventhub/esper-poc")
+public class EsperPocController {
+
+ private final EsperPocDriverCardActivityService service;
+
+ public EsperPocController(EsperPocDriverCardActivityService service) {
+ this.service = service;
+ }
+
+ @GetMapping("/tachograph/driver-card-activities")
+ public ResponseEntity evaluateDriverCardActivities(
+ @RequestParam String tenantKey,
+ @RequestParam UUID driverEntityId,
+ @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime occurredFrom,
+ @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime occurredTo,
+ @RequestParam(defaultValue = "24") Integer guardHours,
+ @RequestParam(defaultValue = "3") Integer significantDrivingMinutes,
+ @RequestParam(defaultValue = "60") Integer mergeGapSeconds,
+ @RequestParam(defaultValue = "7") Integer operatingPeriodSplitRestHours
+ ) {
+ EsperPocRequest request = new EsperPocRequest(
+ tenantKey,
+ driverEntityId,
+ occurredFrom,
+ occurredTo,
+ guardHours,
+ significantDrivingMinutes,
+ mergeGapSeconds,
+ operatingPeriodSplitRestHours
+ );
+ return ResponseEntity.ok(service.evaluate(request));
+ }
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/ActivityIntervalDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/ActivityIntervalDto.java
new file mode 100644
index 0000000..4437969
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/dto/ActivityIntervalDto.java
@@ -0,0 +1,81 @@
+package at.procon.eventhub.esperpoc.dto;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.UUID;
+
+public record ActivityIntervalDto(
+ UUID driverEntityId,
+ UUID vehicleId,
+ UUID vehicleRegistrationId,
+ String activityType,
+ String cardSlot,
+ OffsetDateTime startedAt,
+ OffsetDateTime endedAt,
+ long durationSeconds,
+ String sourceRowId,
+ List sourceRowIds,
+ boolean clippedToRequestedPeriod,
+ String level
+) {
+ public static ActivityIntervalDto raw(
+ UUID driverEntityId,
+ UUID vehicleId,
+ UUID vehicleRegistrationId,
+ String activityType,
+ String cardSlot,
+ OffsetDateTime startedAt,
+ OffsetDateTime endedAt,
+ String sourceRowId
+ ) {
+ return new ActivityIntervalDto(
+ driverEntityId,
+ vehicleId,
+ vehicleRegistrationId,
+ activityType,
+ cardSlot,
+ startedAt,
+ endedAt,
+ Duration.between(startedAt, endedAt).getSeconds(),
+ sourceRowId,
+ sourceRowId == null ? List.of() : List.of(sourceRowId),
+ false,
+ "RAW_INTERVAL"
+ );
+ }
+
+ public ActivityIntervalDto withTime(OffsetDateTime newStartedAt, OffsetDateTime newEndedAt, boolean clipped) {
+ return new ActivityIntervalDto(
+ driverEntityId,
+ vehicleId,
+ vehicleRegistrationId,
+ activityType,
+ cardSlot,
+ newStartedAt,
+ newEndedAt,
+ Duration.between(newStartedAt, newEndedAt).getSeconds(),
+ sourceRowId,
+ sourceRowIds,
+ clipped,
+ level
+ );
+ }
+
+ public ActivityIntervalDto asMerged(List mergedSourceRowIds) {
+ return new ActivityIntervalDto(
+ driverEntityId,
+ vehicleId,
+ vehicleRegistrationId,
+ activityType,
+ cardSlot,
+ startedAt,
+ endedAt,
+ durationSeconds,
+ sourceRowId,
+ mergedSourceRowIds,
+ clippedToRequestedPeriod,
+ "MERGED_ACTIVITY"
+ );
+ }
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/DriverWorkSummaryDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/DriverWorkSummaryDto.java
new file mode 100644
index 0000000..2ab7e4d
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/dto/DriverWorkSummaryDto.java
@@ -0,0 +1,19 @@
+package at.procon.eventhub.esperpoc.dto;
+
+import java.time.OffsetDateTime;
+import java.util.Map;
+import java.util.UUID;
+
+public record DriverWorkSummaryDto(
+ UUID driverEntityId,
+ OffsetDateTime periodFrom,
+ OffsetDateTime periodTo,
+ long drivingSeconds,
+ long workSeconds,
+ long availabilitySeconds,
+ long breakRestSeconds,
+ long workingSeconds,
+ long operationSeconds,
+ Map secondsByActivity
+) {
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/DrivingInterruptionDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/DrivingInterruptionDto.java
new file mode 100644
index 0000000..c2cb722
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/dto/DrivingInterruptionDto.java
@@ -0,0 +1,12 @@
+package at.procon.eventhub.esperpoc.dto;
+
+import java.time.OffsetDateTime;
+
+public record DrivingInterruptionDto(
+ OffsetDateTime from,
+ OffsetDateTime to,
+ long durationSeconds,
+ String previousDrivingSourceRowId,
+ String nextDrivingSourceRowId
+) {
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocRequest.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocRequest.java
new file mode 100644
index 0000000..e860729
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocRequest.java
@@ -0,0 +1,27 @@
+package at.procon.eventhub.esperpoc.dto;
+
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.NotNull;
+import java.time.OffsetDateTime;
+import java.util.UUID;
+
+public record EsperPocRequest(
+ @NotBlank String tenantKey,
+ @NotNull UUID driverEntityId,
+ @NotNull OffsetDateTime occurredFrom,
+ @NotNull OffsetDateTime occurredTo,
+ Integer guardHours,
+ Integer significantDrivingMinutes,
+ Integer mergeGapSeconds,
+ Integer operatingPeriodSplitRestHours
+) {
+ public EsperPocRequest {
+ if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) {
+ throw new IllegalArgumentException("occurredFrom must be before occurredTo");
+ }
+ guardHours = guardHours == null ? 24 : Math.max(0, guardHours);
+ significantDrivingMinutes = significantDrivingMinutes == null ? 3 : Math.max(1, significantDrivingMinutes);
+ mergeGapSeconds = mergeGapSeconds == null ? 60 : Math.max(0, mergeGapSeconds);
+ operatingPeriodSplitRestHours = operatingPeriodSplitRestHours == null ? 7 : Math.max(1, operatingPeriodSplitRestHours);
+ }
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocResultDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocResultDto.java
new file mode 100644
index 0000000..086c52e
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/dto/EsperPocResultDto.java
@@ -0,0 +1,28 @@
+package at.procon.eventhub.esperpoc.dto;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.UUID;
+
+public record EsperPocResultDto(
+ String tenantKey,
+ UUID driverEntityId,
+ OffsetDateTime requestedFrom,
+ OffsetDateTime requestedTo,
+ OffsetDateTime loadedFrom,
+ OffsetDateTime loadedTo,
+ int rawEventCount,
+ int rawIntervalCount,
+ int mergedActivityCount,
+ int operatingTimePeriodCount,
+ int operatingPeriodSplitRestHours,
+ List raw,
+ List rawIntervals,
+ List activities,
+ List operatingTimePeriods,
+ DriverWorkSummaryDto workResultPerDriver,
+ DriverWorkSummaryDto workingOperationTimesPerEmployee,
+ ShiftDrivingEvaluationDto drivingTimeInterruptionEvaluation,
+ List notes
+) {
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/OperatingTimePeriodDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/OperatingTimePeriodDto.java
new file mode 100644
index 0000000..8cd7308
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/dto/OperatingTimePeriodDto.java
@@ -0,0 +1,17 @@
+package at.procon.eventhub.esperpoc.dto;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+
+public record OperatingTimePeriodDto(
+ int sequenceNumber,
+ OffsetDateTime startedAt,
+ OffsetDateTime endedAt,
+ long durationSeconds,
+ ActivityIntervalDto splitStartedAfterLongRest,
+ ActivityIntervalDto splitEndedByLongRest,
+ List activities,
+ DriverWorkSummaryDto workingOperationTimes,
+ ShiftDrivingEvaluationDto drivingTimeInterruptionEvaluation
+) {
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/RawActivityEventDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/RawActivityEventDto.java
new file mode 100644
index 0000000..be81959
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/dto/RawActivityEventDto.java
@@ -0,0 +1,21 @@
+package at.procon.eventhub.esperpoc.dto;
+
+import java.time.OffsetDateTime;
+import java.util.UUID;
+
+public record RawActivityEventDto(
+ UUID eventId,
+ OffsetDateTime occurredAt,
+ String sourceRowId,
+ String externalSourceEventId,
+ UUID driverEntityId,
+ UUID vehicleId,
+ UUID vehicleRegistrationId,
+ String activityType,
+ String lifecycle,
+ String cardSlot,
+ String cardStatus,
+ String drivingStatus,
+ String sourcePackageId
+) {
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/dto/ShiftDrivingEvaluationDto.java b/src/main/java/at/procon/eventhub/esperpoc/dto/ShiftDrivingEvaluationDto.java
new file mode 100644
index 0000000..bd29645
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/dto/ShiftDrivingEvaluationDto.java
@@ -0,0 +1,14 @@
+package at.procon.eventhub.esperpoc.dto;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+
+public record ShiftDrivingEvaluationDto(
+ int significantDrivingMinutes,
+ OffsetDateTime departureAt,
+ OffsetDateTime arrivalAt,
+ ActivityIntervalDto firstSignificantDrivingPeriod,
+ ActivityIntervalDto lastSignificantDrivingPeriod,
+ List interruptionsBetweenSignificantDrivingPeriods
+) {
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/persistence/EsperPocActivityRepository.java b/src/main/java/at/procon/eventhub/esperpoc/persistence/EsperPocActivityRepository.java
new file mode 100644
index 0000000..3095900
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/persistence/EsperPocActivityRepository.java
@@ -0,0 +1,83 @@
+package at.procon.eventhub.esperpoc.persistence;
+
+import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.UUID;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public class EsperPocActivityRepository {
+
+ private final JdbcTemplate jdbcTemplate;
+
+ public EsperPocActivityRepository(JdbcTemplate jdbcTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ }
+
+ public List findDriverCardActivityEvents(
+ String tenantKey,
+ UUID driverEntityId,
+ OffsetDateTime occurredFrom,
+ OffsetDateTime occurredTo
+ ) {
+ return jdbcTemplate.query(
+ """
+ select
+ event.id,
+ event.occurred_at,
+ coalesce(
+ event.payload #>> '{raw,sourceRowId}',
+ regexp_replace(event.external_source_event_id, ':(START|END)$', '')
+ ) as source_row_id,
+ event.external_source_event_id,
+ event.driver_entity_id,
+ event.vehicle_id,
+ event.vehicle_registration_id,
+ event.event_type,
+ event.lifecycle,
+ detail.attributes ->> 'cardSlot' as card_slot,
+ detail.attributes ->> 'cardStatus' as card_status,
+ detail.attributes ->> 'drivingStatus' as driving_status,
+ event.source_package_id
+ from eventhub.event event
+ join eventhub.event_source source on source.id = event.event_source_id
+ join eventhub.data_package pkg on pkg.id = event.data_package_id
+ left join eventhub.event_detail detail on detail.event_occurred_at = event.occurred_at
+ and detail.event_id = event.id
+ and detail.detail_type = 'DRIVER_ACTIVITY'
+ where pkg.tenant_key = ?
+ and source.provider_key = 'TACHOGRAPH'
+ and source.source_kind = 'DRIVER_CARD'
+ and coalesce(pkg.extraction_code, 'CARD_ACTIVITY') = 'CARD_ACTIVITY'
+ and event.driver_entity_id = ?
+ and event.occurred_at >= ?
+ and event.occurred_at < ?
+ and event.event_domain = 'DRIVER_ACTIVITY'
+ and event.event_type in ('DRIVE', 'WORK', 'AVAILABILITY', 'BREAK_REST')
+ and event.lifecycle in ('START', 'END')
+ order by event.occurred_at, event.lifecycle, event.event_type, event.id
+ """,
+ (rs, rowNum) -> new RawActivityEventDto(
+ (UUID) rs.getObject("id"),
+ rs.getObject("occurred_at", OffsetDateTime.class),
+ rs.getString("source_row_id"),
+ rs.getString("external_source_event_id"),
+ (UUID) rs.getObject("driver_entity_id"),
+ (UUID) rs.getObject("vehicle_id"),
+ (UUID) rs.getObject("vehicle_registration_id"),
+ rs.getString("event_type"),
+ rs.getString("lifecycle"),
+ rs.getString("card_slot"),
+ rs.getString("card_status"),
+ rs.getString("driving_status"),
+ rs.getString("source_package_id")
+ ),
+ tenantKey,
+ driverEntityId,
+ occurredFrom,
+ occurredTo
+ );
+ }
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngine.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngine.java
new file mode 100644
index 0000000..9404c86
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngine.java
@@ -0,0 +1,126 @@
+package at.procon.eventhub.esperpoc.service;
+
+import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
+import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
+import com.espertech.esper.common.client.EPCompiled;
+import com.espertech.esper.common.client.EventBean;
+import com.espertech.esper.common.client.configuration.Configuration;
+import com.espertech.esper.compiler.client.CompilerArguments;
+import com.espertech.esper.compiler.client.EPCompileException;
+import com.espertech.esper.compiler.client.EPCompilerProvider;
+import com.espertech.esper.runtime.client.EPDeployException;
+import com.espertech.esper.runtime.client.EPDeployment;
+import com.espertech.esper.runtime.client.EPRuntime;
+import com.espertech.esper.runtime.client.EPRuntimeProvider;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import org.springframework.stereotype.Component;
+
+@Component
+public class EsperDriverActivityEngine {
+
+ private static final AtomicLong RUNTIME_COUNTER = new AtomicLong();
+
+ private static final String EPL = """
+ @name('driverCardActivityIntervals')
+ select
+ s.driverEntityId as driverEntityId,
+ s.vehicleId as vehicleId,
+ s.vehicleRegistrationId as vehicleRegistrationId,
+ s.eventType as activityType,
+ s.cardSlot as cardSlot,
+ s.occurredAt as startedAt,
+ e.occurredAt as endedAt,
+ s.sourceRowId as sourceRowId
+ from pattern [
+ every s = RawDriverActivityPoint(lifecycle = 'START') ->
+ e = RawDriverActivityPoint(
+ lifecycle = 'END',
+ sourceRowId = s.sourceRowId,
+ eventType = s.eventType,
+ driverEntityId = s.driverEntityId,
+ cardSlot = s.cardSlot
+ )
+ ]
+ """;
+
+ public List buildIntervals(List rawEvents) {
+ if (rawEvents == null || rawEvents.isEmpty()) {
+ return List.of();
+ }
+
+ List intervals = new ArrayList<>();
+ EPRuntime runtime = null;
+ try {
+ Configuration configuration = new Configuration();
+ configuration.getCommon().addEventType("RawDriverActivityPoint", EsperRawDriverActivityPoint.class);
+ String runtimeUri = "eventhub-esper-poc-" + RUNTIME_COUNTER.incrementAndGet();
+ runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration);
+
+ CompilerArguments arguments = new CompilerArguments(configuration);
+ EPCompiled compiled = EPCompilerProvider.getCompiler().compile(EPL, arguments);
+ EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
+ runtime.getDeploymentService()
+ .getStatement(deployment.getDeploymentId(), "driverCardActivityIntervals")
+ .addListener((newData, oldData, statement, rt) -> collectIntervals(newData, intervals));
+
+ List points = rawEvents.stream()
+ .sorted(Comparator.comparing(RawActivityEventDto::occurredAt).thenComparing(RawActivityEventDto::lifecycle))
+ .map(this::toEsperPoint)
+ .toList();
+ for (EsperRawDriverActivityPoint point : points) {
+ runtime.getEventService().sendEventBean(point, "RawDriverActivityPoint");
+ }
+
+ return intervals.stream()
+ .filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
+ .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
+ .toList();
+ } catch (EPCompileException | EPDeployException e) {
+ throw new IllegalStateException("Cannot compile/deploy Esper PoC EPL", e);
+ } finally {
+ if (runtime != null) {
+ runtime.destroy();
+ }
+ }
+ }
+
+ private void collectIntervals(EventBean[] newData, List intervals) {
+ if (newData == null) {
+ return;
+ }
+ for (EventBean event : newData) {
+ OffsetDateTime startedAt = (OffsetDateTime) event.get("startedAt");
+ OffsetDateTime endedAt = (OffsetDateTime) event.get("endedAt");
+ intervals.add(ActivityIntervalDto.raw(
+ (UUID) event.get("driverEntityId"),
+ (UUID) event.get("vehicleId"),
+ (UUID) event.get("vehicleRegistrationId"),
+ (String) event.get("activityType"),
+ (String) event.get("cardSlot"),
+ startedAt,
+ endedAt,
+ (String) event.get("sourceRowId")
+ ));
+ }
+ }
+
+ private EsperRawDriverActivityPoint toEsperPoint(RawActivityEventDto event) {
+ return new EsperRawDriverActivityPoint(
+ event.eventId(),
+ event.occurredAt(),
+ event.sourceRowId(),
+ event.externalSourceEventId(),
+ event.driverEntityId(),
+ event.vehicleId(),
+ event.vehicleRegistrationId(),
+ event.activityType(),
+ event.lifecycle(),
+ event.cardSlot()
+ );
+ }
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityService.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityService.java
new file mode 100644
index 0000000..0ff9241
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityService.java
@@ -0,0 +1,397 @@
+package at.procon.eventhub.esperpoc.service;
+
+import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
+import at.procon.eventhub.esperpoc.dto.DriverWorkSummaryDto;
+import at.procon.eventhub.esperpoc.dto.DrivingInterruptionDto;
+import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
+import at.procon.eventhub.esperpoc.dto.EsperPocResultDto;
+import at.procon.eventhub.esperpoc.dto.OperatingTimePeriodDto;
+import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
+import at.procon.eventhub.esperpoc.dto.ShiftDrivingEvaluationDto;
+import at.procon.eventhub.esperpoc.persistence.EsperPocActivityRepository;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.springframework.stereotype.Service;
+
+@Service
+public class EsperPocDriverCardActivityService {
+
+ private final EsperPocActivityRepository activityRepository;
+ private final EsperDriverActivityEngine esperEngine;
+
+ public EsperPocDriverCardActivityService(
+ EsperPocActivityRepository activityRepository,
+ EsperDriverActivityEngine esperEngine
+ ) {
+ this.activityRepository = activityRepository;
+ this.esperEngine = esperEngine;
+ }
+
+ public EsperPocResultDto evaluate(EsperPocRequest request) {
+ OffsetDateTime requestedFrom = utc(request.occurredFrom());
+ OffsetDateTime requestedTo = utc(request.occurredTo());
+ OffsetDateTime loadedFrom = requestedFrom.minusHours(request.guardHours());
+ OffsetDateTime loadedTo = requestedTo.plusHours(request.guardHours());
+
+ List rawEvents = activityRepository.findDriverCardActivityEvents(
+ request.tenantKey(),
+ request.driverEntityId(),
+ loadedFrom,
+ loadedTo
+ );
+ List rawIntervals = esperEngine.buildIntervals(rawEvents);
+
+ // Merge in the full guard window first. This is important for long BREAK_REST detection:
+ // a rest crossing the requested period boundary must keep its full guard-window duration.
+ List mergedLoadedActivities = mergeConsecutiveIdenticalActivities(
+ rawIntervals,
+ Duration.ofSeconds(request.mergeGapSeconds())
+ );
+ List mergedActivities = clipToPeriod(mergedLoadedActivities, requestedFrom, requestedTo);
+
+ DriverWorkSummaryDto summary = summarize(request, requestedFrom, requestedTo, mergedActivities);
+ ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving(
+ mergedActivities,
+ request.significantDrivingMinutes()
+ );
+ List operatingTimePeriods = buildOperatingTimePeriods(
+ request,
+ requestedFrom,
+ requestedTo,
+ mergedLoadedActivities
+ );
+
+ return new EsperPocResultDto(
+ request.tenantKey(),
+ request.driverEntityId(),
+ requestedFrom,
+ requestedTo,
+ loadedFrom,
+ loadedTo,
+ rawEvents.size(),
+ rawIntervals.size(),
+ mergedActivities.size(),
+ operatingTimePeriods.size(),
+ request.operatingPeriodSplitRestHours(),
+ rawEvents,
+ rawIntervals,
+ mergedActivities,
+ operatingTimePeriods,
+ summary,
+ summary,
+ drivingEvaluation,
+ notes(request)
+ );
+ }
+
+ public List mergeConsecutiveIdenticalActivities(
+ List intervals,
+ Duration mergeGapTolerance
+ ) {
+ if (intervals == null || intervals.isEmpty()) {
+ return List.of();
+ }
+ List sorted = intervals.stream()
+ .filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
+ .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
+ .toList();
+ List result = new ArrayList<>();
+ ActivityIntervalDto current = null;
+ List currentSources = new ArrayList<>();
+ for (ActivityIntervalDto next : sorted) {
+ if (current == null) {
+ current = next;
+ currentSources = new ArrayList<>(next.sourceRowIds());
+ continue;
+ }
+ if (canMerge(current, next, mergeGapTolerance)) {
+ currentSources.addAll(next.sourceRowIds());
+ OffsetDateTime newEndedAt = max(current.endedAt(), next.endedAt());
+ current = new ActivityIntervalDto(
+ current.driverEntityId(),
+ current.vehicleId() == null ? next.vehicleId() : current.vehicleId(),
+ current.vehicleRegistrationId() == null ? next.vehicleRegistrationId() : current.vehicleRegistrationId(),
+ current.activityType(),
+ current.cardSlot(),
+ current.startedAt(),
+ newEndedAt,
+ Duration.between(current.startedAt(), newEndedAt).getSeconds(),
+ current.sourceRowId(),
+ List.copyOf(currentSources),
+ current.clippedToRequestedPeriod() || next.clippedToRequestedPeriod(),
+ "MERGED_ACTIVITY"
+ );
+ } else {
+ result.add(current.asMerged(List.copyOf(currentSources)));
+ current = next;
+ currentSources = new ArrayList<>(next.sourceRowIds());
+ }
+ }
+ if (current != null) {
+ result.add(current.asMerged(List.copyOf(currentSources)));
+ }
+ return result;
+ }
+
+ public List buildOperatingTimePeriods(
+ EsperPocRequest request,
+ OffsetDateTime requestedFrom,
+ OffsetDateTime requestedTo,
+ List mergedLoadedActivities
+ ) {
+ Duration splitRestThreshold = Duration.ofHours(request.operatingPeriodSplitRestHours());
+ List sorted = mergedLoadedActivities.stream()
+ .filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
+ .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
+ .toList();
+ List longRests = sorted.stream()
+ .filter(interval -> isOperatingPeriodSplitRest(interval, splitRestThreshold))
+ .sorted(Comparator.comparing(ActivityIntervalDto::startedAt))
+ .toList();
+
+ List result = new ArrayList<>();
+ OffsetDateTime currentSpanStart = requestedFrom;
+ ActivityIntervalDto previousLongRest = null;
+ int sequenceNumber = 1;
+
+ for (ActivityIntervalDto longRest : longRests) {
+ if (!longRest.endedAt().isAfter(requestedFrom)) {
+ previousLongRest = longRest;
+ continue;
+ }
+ if (!longRest.startedAt().isBefore(requestedTo)) {
+ break;
+ }
+
+ OffsetDateTime restStart = max(longRest.startedAt(), requestedFrom);
+ OffsetDateTime restEnd = min(longRest.endedAt(), requestedTo);
+ if (!restEnd.isAfter(restStart)) {
+ continue;
+ }
+ if (restStart.isAfter(currentSpanStart)) {
+ OperatingTimePeriodDto period = buildOperatingPeriod(
+ sequenceNumber,
+ request,
+ currentSpanStart,
+ restStart,
+ previousLongRest,
+ longRest,
+ sorted
+ );
+ if (period != null) {
+ result.add(period);
+ sequenceNumber++;
+ }
+ }
+ if (restEnd.isAfter(currentSpanStart)) {
+ currentSpanStart = restEnd;
+ }
+ previousLongRest = longRest;
+ }
+
+ if (requestedTo.isAfter(currentSpanStart)) {
+ OperatingTimePeriodDto period = buildOperatingPeriod(
+ sequenceNumber,
+ request,
+ currentSpanStart,
+ requestedTo,
+ previousLongRest,
+ null,
+ sorted
+ );
+ if (period != null) {
+ result.add(period);
+ }
+ }
+ return result;
+ }
+
+ private OperatingTimePeriodDto buildOperatingPeriod(
+ int sequenceNumber,
+ EsperPocRequest request,
+ OffsetDateTime spanFrom,
+ OffsetDateTime spanTo,
+ ActivityIntervalDto splitStartedAfterLongRest,
+ ActivityIntervalDto splitEndedByLongRest,
+ List allActivities
+ ) {
+ List activities = allActivities.stream()
+ .filter(activity -> activity.startedAt().isBefore(spanTo) && activity.endedAt().isAfter(spanFrom))
+ .map(activity -> {
+ OffsetDateTime start = max(activity.startedAt(), spanFrom);
+ OffsetDateTime end = min(activity.endedAt(), spanTo);
+ if (!end.isAfter(start)) {
+ return null;
+ }
+ boolean clipped = !start.equals(activity.startedAt()) || !end.equals(activity.endedAt());
+ return activity.withTime(start, end, clipped);
+ })
+ .filter(Objects::nonNull)
+ .filter(activity -> activity.endedAt().isAfter(activity.startedAt()))
+ .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
+ .toList();
+
+ if (activities.isEmpty()) {
+ return null;
+ }
+
+ OffsetDateTime startedAt = activities.get(0).startedAt();
+ OffsetDateTime endedAt = activities.get(activities.size() - 1).endedAt();
+ DriverWorkSummaryDto summary = summarize(request, startedAt, endedAt, activities);
+ ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving(
+ activities,
+ request.significantDrivingMinutes()
+ );
+ return new OperatingTimePeriodDto(
+ sequenceNumber,
+ startedAt,
+ endedAt,
+ Duration.between(startedAt, endedAt).getSeconds(),
+ splitStartedAfterLongRest,
+ splitEndedByLongRest,
+ activities,
+ summary,
+ drivingEvaluation
+ );
+ }
+
+ private boolean isOperatingPeriodSplitRest(ActivityIntervalDto interval, Duration splitRestThreshold) {
+ return "BREAK_REST".equals(interval.activityType())
+ && interval.durationSeconds() > splitRestThreshold.getSeconds();
+ }
+
+ private boolean canMerge(ActivityIntervalDto left, ActivityIntervalDto right, Duration tolerance) {
+ boolean sameDriver = Objects.equals(left.driverEntityId(), right.driverEntityId());
+ boolean sameActivity = Objects.equals(left.activityType(), right.activityType());
+ boolean sameSlot = Objects.equals(left.cardSlot(), right.cardSlot());
+ long gapSeconds = Duration.between(left.endedAt(), right.startedAt()).getSeconds();
+ boolean adjacentOrOverlapping = gapSeconds <= tolerance.getSeconds();
+ return sameDriver && sameActivity && sameSlot && adjacentOrOverlapping;
+ }
+
+ private List clipToPeriod(
+ List intervals,
+ OffsetDateTime periodFrom,
+ OffsetDateTime periodTo
+ ) {
+ return intervals.stream()
+ .map(interval -> {
+ OffsetDateTime start = max(interval.startedAt(), periodFrom);
+ OffsetDateTime end = min(interval.endedAt(), periodTo);
+ if (!end.isAfter(start)) {
+ return null;
+ }
+ boolean clipped = !start.equals(interval.startedAt()) || !end.equals(interval.endedAt());
+ return interval.withTime(start, end, clipped);
+ })
+ .filter(Objects::nonNull)
+ .sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
+ .toList();
+ }
+
+ private DriverWorkSummaryDto summarize(
+ EsperPocRequest request,
+ OffsetDateTime periodFrom,
+ OffsetDateTime periodTo,
+ List activities
+ ) {
+ Map secondsByActivity = new LinkedHashMap<>();
+ for (ActivityIntervalDto activity : activities) {
+ secondsByActivity.merge(activity.activityType(), activity.durationSeconds(), Long::sum);
+ }
+ long drivingSeconds = secondsByActivity.getOrDefault("DRIVE", 0L);
+ long workSeconds = secondsByActivity.getOrDefault("WORK", 0L);
+ long availabilitySeconds = secondsByActivity.getOrDefault("AVAILABILITY", 0L);
+ long breakRestSeconds = secondsByActivity.getOrDefault("BREAK_REST", 0L);
+ long workingSeconds = drivingSeconds + workSeconds;
+ long operationSeconds = drivingSeconds + workSeconds + availabilitySeconds;
+ return new DriverWorkSummaryDto(
+ request.driverEntityId(),
+ periodFrom,
+ periodTo,
+ drivingSeconds,
+ workSeconds,
+ availabilitySeconds,
+ breakRestSeconds,
+ workingSeconds,
+ operationSeconds,
+ secondsByActivity
+ );
+ }
+
+ private ShiftDrivingEvaluationDto evaluateSignificantDriving(
+ List activities,
+ int significantDrivingMinutes
+ ) {
+ long significantSeconds = Duration.ofMinutes(significantDrivingMinutes).getSeconds();
+ List significantDrivingPeriods = activities.stream()
+ .filter(activity -> "DRIVE".equals(activity.activityType()))
+ .filter(activity -> activity.durationSeconds() > significantSeconds)
+ .sorted(Comparator.comparing(ActivityIntervalDto::startedAt))
+ .toList();
+ if (significantDrivingPeriods.isEmpty()) {
+ return new ShiftDrivingEvaluationDto(
+ significantDrivingMinutes,
+ null,
+ null,
+ null,
+ null,
+ List.of()
+ );
+ }
+ List interruptions = new ArrayList<>();
+ for (int i = 1; i < significantDrivingPeriods.size(); i++) {
+ ActivityIntervalDto previous = significantDrivingPeriods.get(i - 1);
+ ActivityIntervalDto next = significantDrivingPeriods.get(i);
+ if (next.startedAt().isAfter(previous.endedAt())) {
+ interruptions.add(new DrivingInterruptionDto(
+ previous.endedAt(),
+ next.startedAt(),
+ Duration.between(previous.endedAt(), next.startedAt()).getSeconds(),
+ previous.sourceRowId(),
+ next.sourceRowId()
+ ));
+ }
+ }
+ ActivityIntervalDto first = significantDrivingPeriods.get(0);
+ ActivityIntervalDto last = significantDrivingPeriods.get(significantDrivingPeriods.size() - 1);
+ return new ShiftDrivingEvaluationDto(
+ significantDrivingMinutes,
+ first.startedAt(),
+ last.endedAt(),
+ first,
+ last,
+ interruptions
+ );
+ }
+
+ private List notes(EsperPocRequest request) {
+ return List.of(
+ "PoC reads only tachograph DRIVER_CARD/CARD_ACTIVITY source events from eventhub.event.",
+ "Level RAW contains original imported point events; level Activities contains Esper-created intervals merged by consecutive identical activity.",
+ "Activities are merged in the guard window first and then clipped to the requested period, so BREAK_REST across the period boundary can still split operating periods correctly.",
+ "Operating time periods are split by BREAK_REST activities longer than " + request.operatingPeriodSplitRestHours() + " hours.",
+ "Working seconds = DRIVE + WORK. Operation seconds = DRIVE + WORK + AVAILABILITY. BREAK_REST is reported separately.",
+ "Departure/arrival are evaluated globally and again inside each operating time period using DRIVE intervals longer than " + request.significantDrivingMinutes() + " minutes."
+ );
+ }
+
+ private OffsetDateTime utc(OffsetDateTime value) {
+ return value.withOffsetSameInstant(ZoneOffset.UTC);
+ }
+
+ private OffsetDateTime max(OffsetDateTime left, OffsetDateTime right) {
+ return left.isAfter(right) ? left : right;
+ }
+
+ private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) {
+ return left.isBefore(right) ? left : right;
+ }
+}
diff --git a/src/main/java/at/procon/eventhub/esperpoc/service/EsperRawDriverActivityPoint.java b/src/main/java/at/procon/eventhub/esperpoc/service/EsperRawDriverActivityPoint.java
new file mode 100644
index 0000000..6e71f83
--- /dev/null
+++ b/src/main/java/at/procon/eventhub/esperpoc/service/EsperRawDriverActivityPoint.java
@@ -0,0 +1,81 @@
+package at.procon.eventhub.esperpoc.service;
+
+import java.time.OffsetDateTime;
+import java.util.UUID;
+
+public final class EsperRawDriverActivityPoint {
+ private final UUID eventId;
+ private final OffsetDateTime occurredAt;
+ private final String sourceRowId;
+ private final String externalSourceEventId;
+ private final UUID driverEntityId;
+ private final UUID vehicleId;
+ private final UUID vehicleRegistrationId;
+ private final String eventType;
+ private final String lifecycle;
+ private final String cardSlot;
+
+ public EsperRawDriverActivityPoint(
+ UUID eventId,
+ OffsetDateTime occurredAt,
+ String sourceRowId,
+ String externalSourceEventId,
+ UUID driverEntityId,
+ UUID vehicleId,
+ UUID vehicleRegistrationId,
+ String eventType,
+ String lifecycle,
+ String cardSlot
+ ) {
+ this.eventId = eventId;
+ this.occurredAt = occurredAt;
+ this.sourceRowId = sourceRowId;
+ this.externalSourceEventId = externalSourceEventId;
+ this.driverEntityId = driverEntityId;
+ this.vehicleId = vehicleId;
+ this.vehicleRegistrationId = vehicleRegistrationId;
+ this.eventType = eventType;
+ this.lifecycle = lifecycle;
+ this.cardSlot = cardSlot;
+ }
+
+ public UUID getEventId() {
+ return eventId;
+ }
+
+ public OffsetDateTime getOccurredAt() {
+ return occurredAt;
+ }
+
+ public String getSourceRowId() {
+ return sourceRowId;
+ }
+
+ public String getExternalSourceEventId() {
+ return externalSourceEventId;
+ }
+
+ public UUID getDriverEntityId() {
+ return driverEntityId;
+ }
+
+ public UUID getVehicleId() {
+ return vehicleId;
+ }
+
+ public UUID getVehicleRegistrationId() {
+ return vehicleRegistrationId;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public String getLifecycle() {
+ return lifecycle;
+ }
+
+ public String getCardSlot() {
+ return cardSlot;
+ }
+}
diff --git a/src/test/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngineTest.java b/src/test/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngineTest.java
new file mode 100644
index 0000000..faa0f81
--- /dev/null
+++ b/src/test/java/at/procon/eventhub/esperpoc/service/EsperDriverActivityEngineTest.java
@@ -0,0 +1,59 @@
+package at.procon.eventhub.esperpoc.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+
+class EsperDriverActivityEngineTest {
+
+ private final EsperDriverActivityEngine engine = new EsperDriverActivityEngine();
+
+ @Test
+ void buildsActivityIntervalsFromStartAndEndEvents() {
+ UUID driverId = UUID.randomUUID();
+ UUID vehicleId = UUID.randomUUID();
+ List raw = List.of(
+ raw("100", "START", "DRIVE", "2026-04-30T23:55:00Z", driverId, vehicleId),
+ raw("100", "END", "DRIVE", "2026-05-01T00:10:00Z", driverId, vehicleId),
+ raw("101", "START", "WORK", "2026-05-01T00:10:00Z", driverId, vehicleId),
+ raw("101", "END", "WORK", "2026-05-01T00:40:00Z", driverId, vehicleId)
+ );
+
+ var intervals = engine.buildIntervals(raw);
+
+ assertThat(intervals).hasSize(2);
+ assertThat(intervals.get(0).activityType()).isEqualTo("DRIVE");
+ assertThat(intervals.get(0).durationSeconds()).isEqualTo(15 * 60L);
+ assertThat(intervals.get(1).activityType()).isEqualTo("WORK");
+ assertThat(intervals.get(1).durationSeconds()).isEqualTo(30 * 60L);
+ }
+
+ private RawActivityEventDto raw(
+ String rowId,
+ String lifecycle,
+ String activity,
+ String occurredAt,
+ UUID driverId,
+ UUID vehicleId
+ ) {
+ return new RawActivityEventDto(
+ UUID.randomUUID(),
+ OffsetDateTime.parse(occurredAt),
+ rowId,
+ "TACHOGRAPH:CARD_ACTIVITY:" + rowId + ":" + lifecycle,
+ driverId,
+ vehicleId,
+ null,
+ activity,
+ lifecycle,
+ "DRIVER",
+ "INSERTED",
+ "SINGLE",
+ "package-1"
+ );
+ }
+}
diff --git a/src/test/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityServiceTest.java b/src/test/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityServiceTest.java
new file mode 100644
index 0000000..4c10b43
--- /dev/null
+++ b/src/test/java/at/procon/eventhub/esperpoc/service/EsperPocDriverCardActivityServiceTest.java
@@ -0,0 +1,114 @@
+package at.procon.eventhub.esperpoc.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
+import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+
+class EsperPocDriverCardActivityServiceTest {
+
+ private final EsperPocDriverCardActivityService service = new EsperPocDriverCardActivityService(null, null);
+
+ @Test
+ void mergesIdenticalActivitiesAcrossUtcMidnight() {
+ UUID driverId = UUID.randomUUID();
+ ActivityIntervalDto beforeMidnight = ActivityIntervalDto.raw(
+ driverId,
+ null,
+ null,
+ "BREAK_REST",
+ "DRIVER",
+ OffsetDateTime.parse("2026-04-30T23:50:00Z"),
+ OffsetDateTime.parse("2026-05-01T00:00:00Z"),
+ "1"
+ );
+ ActivityIntervalDto afterMidnight = ActivityIntervalDto.raw(
+ driverId,
+ null,
+ null,
+ "BREAK_REST",
+ "DRIVER",
+ OffsetDateTime.parse("2026-05-01T00:00:00Z"),
+ OffsetDateTime.parse("2026-05-01T00:20:00Z"),
+ "2"
+ );
+
+ var merged = service.mergeConsecutiveIdenticalActivities(
+ List.of(beforeMidnight, afterMidnight),
+ java.time.Duration.ZERO
+ );
+
+ assertThat(merged).hasSize(1);
+ assertThat(merged.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-30T23:50:00Z"));
+ assertThat(merged.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-05-01T00:20:00Z"));
+ assertThat(merged.get(0).durationSeconds()).isEqualTo(30 * 60L);
+ assertThat(merged.get(0).sourceRowIds()).containsExactly("1", "2");
+ }
+
+ @Test
+ void splitsOperatingPeriodsByBreakRestLongerThanConfiguredHoursAndEvaluatesDepartureArrivalPerPeriod() {
+ UUID driverId = UUID.randomUUID();
+ EsperPocRequest request = new EsperPocRequest(
+ "default",
+ driverId,
+ OffsetDateTime.parse("2026-04-01T00:00:00Z"),
+ OffsetDateTime.parse("2026-04-03T00:00:00Z"),
+ 24,
+ 3,
+ 60,
+ 7
+ );
+
+ List activities = List.of(
+ activity(driverId, "DRIVE", "2026-04-01T06:00:00Z", "2026-04-01T08:00:00Z", "d1"),
+ activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1"),
+ activity(driverId, "DRIVE", "2026-04-01T09:30:00Z", "2026-04-01T11:00:00Z", "d2"),
+ activity(driverId, "BREAK_REST", "2026-04-01T20:00:00Z", "2026-04-02T04:30:00Z", "r1"),
+ activity(driverId, "DRIVE", "2026-04-02T05:00:00Z", "2026-04-02T07:00:00Z", "d3"),
+ activity(driverId, "BREAK_REST", "2026-04-02T10:00:00Z", "2026-04-02T10:30:00Z", "r2"),
+ activity(driverId, "DRIVE", "2026-04-02T10:30:00Z", "2026-04-02T12:00:00Z", "d4")
+ );
+
+ var periods = service.buildOperatingTimePeriods(
+ request,
+ request.occurredFrom(),
+ request.occurredTo(),
+ activities
+ );
+
+ assertThat(periods).hasSize(2);
+ assertThat(periods.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T06:00:00Z"));
+ assertThat(periods.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T11:00:00Z"));
+ assertThat(periods.get(0).drivingTimeInterruptionEvaluation().departureAt())
+ .isEqualTo(OffsetDateTime.parse("2026-04-01T06:00:00Z"));
+ assertThat(periods.get(0).drivingTimeInterruptionEvaluation().arrivalAt())
+ .isEqualTo(OffsetDateTime.parse("2026-04-01T11:00:00Z"));
+ assertThat(periods.get(0).drivingTimeInterruptionEvaluation().interruptionsBetweenSignificantDrivingPeriods())
+ .hasSize(1);
+
+ assertThat(periods.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-02T05:00:00Z"));
+ assertThat(periods.get(1).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-02T12:00:00Z"));
+ assertThat(periods.get(1).drivingTimeInterruptionEvaluation().departureAt())
+ .isEqualTo(OffsetDateTime.parse("2026-04-02T05:00:00Z"));
+ assertThat(periods.get(1).drivingTimeInterruptionEvaluation().arrivalAt())
+ .isEqualTo(OffsetDateTime.parse("2026-04-02T12:00:00Z"));
+ assertThat(periods.get(1).workingOperationTimes().breakRestSeconds()).isEqualTo(30 * 60L);
+ }
+
+ private ActivityIntervalDto activity(UUID driverId, String activity, String from, String to, String sourceRowId) {
+ return ActivityIntervalDto.raw(
+ driverId,
+ null,
+ null,
+ activity,
+ "DRIVER",
+ OffsetDateTime.parse(from),
+ OffsetDateTime.parse(to),
+ sourceRowId
+ ).asMerged(List.of(sourceRowId));
+ }
+}