Add Esper tachograph activity PoC

This commit is contained in:
trifonovt 2026-05-05 15:29:10 +02:00
parent 17e2bbedf4
commit e4e9137af5
18 changed files with 1322 additions and 0 deletions

View File

@ -0,0 +1,94 @@
# Esper PoC: Tachograph Driver-Card Activity Evaluation
This PoC intentionally uses only existing imported EventHub source events:
- provider: `TACHOGRAPH`
- source kind: `DRIVER_CARD`
- extraction code: `CARD_ACTIVITY`
- event domain: `DRIVER_ACTIVITY`
- event types: `DRIVE`, `WORK`, `AVAILABILITY`, `BREAK_REST`
- lifecycles: `START`, `END`
It does not introduce canonical-event tables yet. The goal is to prove that Esper can replay one driver and one selected period and produce activity-level and operating-period results.
## Endpoint
```http
GET /api/eventhub/esper-poc/tachograph/driver-card-activities
```
Query parameters:
| Parameter | Required | Example | Meaning |
|---|---:|---|---|
| `tenantKey` | yes | `default` | EventHub tenant key. |
| `driverEntityId` | yes | UUID | Existing `eventhub.event.driver_entity_id`. |
| `occurredFrom` | yes | `2026-04-01T00:00:00Z` | Requested period start. |
| `occurredTo` | yes | `2026-05-01T00:00:00Z` | Requested period end. |
| `guardHours` | no | `24` | Extra loading window before/after requested period. Needed for activities crossing midnight/month boundaries and long rests crossing period boundaries. |
| `significantDrivingMinutes` | no | `3` | DRIVE intervals longer than this threshold count as significant driving periods. |
| `mergeGapSeconds` | no | `60` | Consecutive identical activities are merged if the gap is at most this value. |
| `operatingPeriodSplitRestHours` | no | `7` | A `BREAK_REST` activity longer than this threshold splits operating time periods. |
## Produced levels
### Level 1: Raw
`raw` contains the original point events from `eventhub.event`.
### Level 2: Activities
Esper consumes the raw point events and produces intervals by pairing:
```text
START + END with same sourceRowId, activity type, driver and card slot
```
The service merges consecutive identical activities in the full guard window first, then clips the merged activities to the requested period. This is important because a long `BREAK_REST` crossing the requested boundary must keep its full guard-window duration for operating-period splitting.
## Operating time periods
`operatingTimePeriods` are derived from the merged activity timeline.
A `BREAK_REST` interval splits operating periods when:
```text
activityType = BREAK_REST
and duration > operatingPeriodSplitRestHours
```
The default is 7 hours. With the default, a `BREAK_REST` of exactly 7 hours does not split; it must be longer than 7 hours.
Each operating period contains:
- `sequenceNumber`
- `startedAt`
- `endedAt`
- `durationSeconds`
- `activities`
- `workingOperationTimes`
- `drivingTimeInterruptionEvaluation`
- optional references to the long rest before/after the period
Departure and arrival are evaluated per operating period:
- departure = first significant `DRIVE` interval inside that operating period
- arrival = end of the last significant `DRIVE` interval inside that operating period
- middle/interruption = gaps between significant `DRIVE` intervals inside the same operating period
## Result semantics
- `workResultPerDriver` and `workingOperationTimesPerEmployee` currently use the same PoC summary for the whole requested period.
- `workingSeconds = DRIVE + WORK`.
- `operationSeconds = DRIVE + WORK + AVAILABILITY`.
- `breakRestSeconds` is reported separately.
- Top-level `drivingTimeInterruptionEvaluation` evaluates the whole requested period.
- Each item in `operatingTimePeriods` has its own `drivingTimeInterruptionEvaluation`.
## Current limitations
- Uses the existing source-level `driver_entity_id`, not a canonical employee table.
- Reads only tachograph driver-card activity events.
- Does not merge VU/card duplication.
- Does not persist results; the endpoint returns a PoC calculation response.
- Esper is used for interval creation. Summary, clipping, operating-period split, and merged activity report calculation are implemented in Java for auditability and easier future migration to canonical events.

17
pom.xml
View File

@ -20,6 +20,7 @@
<properties>
<java.version>21</java.version>
<camel.version>4.18.2</camel.version>
<esper.version>9.0.0</esper.version>
</properties>
<dependencyManagement>
@ -88,6 +89,22 @@
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.espertech</groupId>
<artifactId>esper-common</artifactId>
<version>${esper.version}</version>
</dependency>
<dependency>
<groupId>com.espertech</groupId>
<artifactId>esper-compiler</artifactId>
<version>${esper.version}</version>
</dependency>
<dependency>
<groupId>com.espertech</groupId>
<artifactId>esper-runtime</artifactId>
<version>${esper.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@ -0,0 +1,84 @@
{
"info": {
"name": "EventHub Esper PoC",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"item": [
{
"name": "Evaluate tachograph driver-card activities",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{baseUrl}}/api/eventhub/esper-poc/tachograph/driver-card-activities?tenantKey={{tenantKey}}&driverEntityId={{driverEntityId}}&occurredFrom={{occurredFrom}}&occurredTo={{occurredTo}}&guardHours=24&significantDrivingMinutes=3&mergeGapSeconds=60&operatingPeriodSplitRestHours=7",
"host": [
"{{baseUrl}}"
],
"path": [
"api",
"eventhub",
"esper-poc",
"tachograph",
"driver-card-activities"
],
"query": [
{
"key": "tenantKey",
"value": "{{tenantKey}}"
},
{
"key": "driverEntityId",
"value": "{{driverEntityId}}"
},
{
"key": "occurredFrom",
"value": "{{occurredFrom}}"
},
{
"key": "occurredTo",
"value": "{{occurredTo}}"
},
{
"key": "guardHours",
"value": "24"
},
{
"key": "significantDrivingMinutes",
"value": "3"
},
{
"key": "mergeGapSeconds",
"value": "60"
},
{
"key": "operatingPeriodSplitRestHours",
"value": "7"
}
]
}
}
}
],
"variable": [
{
"key": "baseUrl",
"value": "http://localhost:8080"
},
{
"key": "tenantKey",
"value": "default"
},
{
"key": "driverEntityId",
"value": "00000000-0000-0000-0000-000000000000"
},
{
"key": "occurredFrom",
"value": "2026-04-01T00:00:00Z"
},
{
"key": "occurredTo",
"value": "2026-05-01T00:00:00Z"
}
]
}

View File

@ -0,0 +1,48 @@
package at.procon.eventhub.esperpoc.api;
import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
import at.procon.eventhub.esperpoc.dto.EsperPocResultDto;
import at.procon.eventhub.esperpoc.service.EsperPocDriverCardActivityService;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/eventhub/esper-poc")
public class EsperPocController {
private final EsperPocDriverCardActivityService service;
public EsperPocController(EsperPocDriverCardActivityService service) {
this.service = service;
}
@GetMapping("/tachograph/driver-card-activities")
public ResponseEntity<EsperPocResultDto> evaluateDriverCardActivities(
@RequestParam String tenantKey,
@RequestParam UUID driverEntityId,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime occurredFrom,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime occurredTo,
@RequestParam(defaultValue = "24") Integer guardHours,
@RequestParam(defaultValue = "3") Integer significantDrivingMinutes,
@RequestParam(defaultValue = "60") Integer mergeGapSeconds,
@RequestParam(defaultValue = "7") Integer operatingPeriodSplitRestHours
) {
EsperPocRequest request = new EsperPocRequest(
tenantKey,
driverEntityId,
occurredFrom,
occurredTo,
guardHours,
significantDrivingMinutes,
mergeGapSeconds,
operatingPeriodSplitRestHours
);
return ResponseEntity.ok(service.evaluate(request));
}
}

View File

@ -0,0 +1,81 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
public record ActivityIntervalDto(
UUID driverEntityId,
UUID vehicleId,
UUID vehicleRegistrationId,
String activityType,
String cardSlot,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
String sourceRowId,
List<String> sourceRowIds,
boolean clippedToRequestedPeriod,
String level
) {
public static ActivityIntervalDto raw(
UUID driverEntityId,
UUID vehicleId,
UUID vehicleRegistrationId,
String activityType,
String cardSlot,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
String sourceRowId
) {
return new ActivityIntervalDto(
driverEntityId,
vehicleId,
vehicleRegistrationId,
activityType,
cardSlot,
startedAt,
endedAt,
Duration.between(startedAt, endedAt).getSeconds(),
sourceRowId,
sourceRowId == null ? List.of() : List.of(sourceRowId),
false,
"RAW_INTERVAL"
);
}
public ActivityIntervalDto withTime(OffsetDateTime newStartedAt, OffsetDateTime newEndedAt, boolean clipped) {
return new ActivityIntervalDto(
driverEntityId,
vehicleId,
vehicleRegistrationId,
activityType,
cardSlot,
newStartedAt,
newEndedAt,
Duration.between(newStartedAt, newEndedAt).getSeconds(),
sourceRowId,
sourceRowIds,
clipped,
level
);
}
public ActivityIntervalDto asMerged(List<String> mergedSourceRowIds) {
return new ActivityIntervalDto(
driverEntityId,
vehicleId,
vehicleRegistrationId,
activityType,
cardSlot,
startedAt,
endedAt,
durationSeconds,
sourceRowId,
mergedSourceRowIds,
clippedToRequestedPeriod,
"MERGED_ACTIVITY"
);
}
}

View File

@ -0,0 +1,19 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.UUID;
public record DriverWorkSummaryDto(
UUID driverEntityId,
OffsetDateTime periodFrom,
OffsetDateTime periodTo,
long drivingSeconds,
long workSeconds,
long availabilitySeconds,
long breakRestSeconds,
long workingSeconds,
long operationSeconds,
Map<String, Long> secondsByActivity
) {
}

View File

@ -0,0 +1,12 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
public record DrivingInterruptionDto(
OffsetDateTime from,
OffsetDateTime to,
long durationSeconds,
String previousDrivingSourceRowId,
String nextDrivingSourceRowId
) {
}

View File

@ -0,0 +1,27 @@
package at.procon.eventhub.esperpoc.dto;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.time.OffsetDateTime;
import java.util.UUID;
public record EsperPocRequest(
@NotBlank String tenantKey,
@NotNull UUID driverEntityId,
@NotNull OffsetDateTime occurredFrom,
@NotNull OffsetDateTime occurredTo,
Integer guardHours,
Integer significantDrivingMinutes,
Integer mergeGapSeconds,
Integer operatingPeriodSplitRestHours
) {
public EsperPocRequest {
if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) {
throw new IllegalArgumentException("occurredFrom must be before occurredTo");
}
guardHours = guardHours == null ? 24 : Math.max(0, guardHours);
significantDrivingMinutes = significantDrivingMinutes == null ? 3 : Math.max(1, significantDrivingMinutes);
mergeGapSeconds = mergeGapSeconds == null ? 60 : Math.max(0, mergeGapSeconds);
operatingPeriodSplitRestHours = operatingPeriodSplitRestHours == null ? 7 : Math.max(1, operatingPeriodSplitRestHours);
}
}

View File

@ -0,0 +1,28 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
public record EsperPocResultDto(
String tenantKey,
UUID driverEntityId,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo,
OffsetDateTime loadedFrom,
OffsetDateTime loadedTo,
int rawEventCount,
int rawIntervalCount,
int mergedActivityCount,
int operatingTimePeriodCount,
int operatingPeriodSplitRestHours,
List<RawActivityEventDto> raw,
List<ActivityIntervalDto> rawIntervals,
List<ActivityIntervalDto> activities,
List<OperatingTimePeriodDto> operatingTimePeriods,
DriverWorkSummaryDto workResultPerDriver,
DriverWorkSummaryDto workingOperationTimesPerEmployee,
ShiftDrivingEvaluationDto drivingTimeInterruptionEvaluation,
List<String> notes
) {
}

View File

@ -0,0 +1,17 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.List;
public record OperatingTimePeriodDto(
int sequenceNumber,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
ActivityIntervalDto splitStartedAfterLongRest,
ActivityIntervalDto splitEndedByLongRest,
List<ActivityIntervalDto> activities,
DriverWorkSummaryDto workingOperationTimes,
ShiftDrivingEvaluationDto drivingTimeInterruptionEvaluation
) {
}

View File

@ -0,0 +1,21 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.UUID;
public record RawActivityEventDto(
UUID eventId,
OffsetDateTime occurredAt,
String sourceRowId,
String externalSourceEventId,
UUID driverEntityId,
UUID vehicleId,
UUID vehicleRegistrationId,
String activityType,
String lifecycle,
String cardSlot,
String cardStatus,
String drivingStatus,
String sourcePackageId
) {
}

View File

@ -0,0 +1,14 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.List;
public record ShiftDrivingEvaluationDto(
int significantDrivingMinutes,
OffsetDateTime departureAt,
OffsetDateTime arrivalAt,
ActivityIntervalDto firstSignificantDrivingPeriod,
ActivityIntervalDto lastSignificantDrivingPeriod,
List<DrivingInterruptionDto> interruptionsBetweenSignificantDrivingPeriods
) {
}

View File

@ -0,0 +1,83 @@
package at.procon.eventhub.esperpoc.persistence;
import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class EsperPocActivityRepository {
private final JdbcTemplate jdbcTemplate;
public EsperPocActivityRepository(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public List<RawActivityEventDto> findDriverCardActivityEvents(
String tenantKey,
UUID driverEntityId,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
return jdbcTemplate.query(
"""
select
event.id,
event.occurred_at,
coalesce(
event.payload #>> '{raw,sourceRowId}',
regexp_replace(event.external_source_event_id, ':(START|END)$', '')
) as source_row_id,
event.external_source_event_id,
event.driver_entity_id,
event.vehicle_id,
event.vehicle_registration_id,
event.event_type,
event.lifecycle,
detail.attributes ->> 'cardSlot' as card_slot,
detail.attributes ->> 'cardStatus' as card_status,
detail.attributes ->> 'drivingStatus' as driving_status,
event.source_package_id
from eventhub.event event
join eventhub.event_source source on source.id = event.event_source_id
join eventhub.data_package pkg on pkg.id = event.data_package_id
left join eventhub.event_detail detail on detail.event_occurred_at = event.occurred_at
and detail.event_id = event.id
and detail.detail_type = 'DRIVER_ACTIVITY'
where pkg.tenant_key = ?
and source.provider_key = 'TACHOGRAPH'
and source.source_kind = 'DRIVER_CARD'
and coalesce(pkg.extraction_code, 'CARD_ACTIVITY') = 'CARD_ACTIVITY'
and event.driver_entity_id = ?
and event.occurred_at >= ?
and event.occurred_at < ?
and event.event_domain = 'DRIVER_ACTIVITY'
and event.event_type in ('DRIVE', 'WORK', 'AVAILABILITY', 'BREAK_REST')
and event.lifecycle in ('START', 'END')
order by event.occurred_at, event.lifecycle, event.event_type, event.id
""",
(rs, rowNum) -> new RawActivityEventDto(
(UUID) rs.getObject("id"),
rs.getObject("occurred_at", OffsetDateTime.class),
rs.getString("source_row_id"),
rs.getString("external_source_event_id"),
(UUID) rs.getObject("driver_entity_id"),
(UUID) rs.getObject("vehicle_id"),
(UUID) rs.getObject("vehicle_registration_id"),
rs.getString("event_type"),
rs.getString("lifecycle"),
rs.getString("card_slot"),
rs.getString("card_status"),
rs.getString("driving_status"),
rs.getString("source_package_id")
),
tenantKey,
driverEntityId,
occurredFrom,
occurredTo
);
}
}

View File

@ -0,0 +1,126 @@
package at.procon.eventhub.esperpoc.service;
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
import com.espertech.esper.common.client.EPCompiled;
import com.espertech.esper.common.client.EventBean;
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompileException;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.EPDeployException;
import com.espertech.esper.runtime.client.EPDeployment;
import com.espertech.esper.runtime.client.EPRuntime;
import com.espertech.esper.runtime.client.EPRuntimeProvider;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.stereotype.Component;
@Component
public class EsperDriverActivityEngine {
private static final AtomicLong RUNTIME_COUNTER = new AtomicLong();
private static final String EPL = """
@name('driverCardActivityIntervals')
select
s.driverEntityId as driverEntityId,
s.vehicleId as vehicleId,
s.vehicleRegistrationId as vehicleRegistrationId,
s.eventType as activityType,
s.cardSlot as cardSlot,
s.occurredAt as startedAt,
e.occurredAt as endedAt,
s.sourceRowId as sourceRowId
from pattern [
every s = RawDriverActivityPoint(lifecycle = 'START') ->
e = RawDriverActivityPoint(
lifecycle = 'END',
sourceRowId = s.sourceRowId,
eventType = s.eventType,
driverEntityId = s.driverEntityId,
cardSlot = s.cardSlot
)
]
""";
public List<ActivityIntervalDto> buildIntervals(List<RawActivityEventDto> rawEvents) {
if (rawEvents == null || rawEvents.isEmpty()) {
return List.of();
}
List<ActivityIntervalDto> intervals = new ArrayList<>();
EPRuntime runtime = null;
try {
Configuration configuration = new Configuration();
configuration.getCommon().addEventType("RawDriverActivityPoint", EsperRawDriverActivityPoint.class);
String runtimeUri = "eventhub-esper-poc-" + RUNTIME_COUNTER.incrementAndGet();
runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration);
CompilerArguments arguments = new CompilerArguments(configuration);
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(EPL, arguments);
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
runtime.getDeploymentService()
.getStatement(deployment.getDeploymentId(), "driverCardActivityIntervals")
.addListener((newData, oldData, statement, rt) -> collectIntervals(newData, intervals));
List<EsperRawDriverActivityPoint> points = rawEvents.stream()
.sorted(Comparator.comparing(RawActivityEventDto::occurredAt).thenComparing(RawActivityEventDto::lifecycle))
.map(this::toEsperPoint)
.toList();
for (EsperRawDriverActivityPoint point : points) {
runtime.getEventService().sendEventBean(point, "RawDriverActivityPoint");
}
return intervals.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
.toList();
} catch (EPCompileException | EPDeployException e) {
throw new IllegalStateException("Cannot compile/deploy Esper PoC EPL", e);
} finally {
if (runtime != null) {
runtime.destroy();
}
}
}
private void collectIntervals(EventBean[] newData, List<ActivityIntervalDto> intervals) {
if (newData == null) {
return;
}
for (EventBean event : newData) {
OffsetDateTime startedAt = (OffsetDateTime) event.get("startedAt");
OffsetDateTime endedAt = (OffsetDateTime) event.get("endedAt");
intervals.add(ActivityIntervalDto.raw(
(UUID) event.get("driverEntityId"),
(UUID) event.get("vehicleId"),
(UUID) event.get("vehicleRegistrationId"),
(String) event.get("activityType"),
(String) event.get("cardSlot"),
startedAt,
endedAt,
(String) event.get("sourceRowId")
));
}
}
private EsperRawDriverActivityPoint toEsperPoint(RawActivityEventDto event) {
return new EsperRawDriverActivityPoint(
event.eventId(),
event.occurredAt(),
event.sourceRowId(),
event.externalSourceEventId(),
event.driverEntityId(),
event.vehicleId(),
event.vehicleRegistrationId(),
event.activityType(),
event.lifecycle(),
event.cardSlot()
);
}
}

View File

@ -0,0 +1,397 @@
package at.procon.eventhub.esperpoc.service;
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
import at.procon.eventhub.esperpoc.dto.DriverWorkSummaryDto;
import at.procon.eventhub.esperpoc.dto.DrivingInterruptionDto;
import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
import at.procon.eventhub.esperpoc.dto.EsperPocResultDto;
import at.procon.eventhub.esperpoc.dto.OperatingTimePeriodDto;
import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
import at.procon.eventhub.esperpoc.dto.ShiftDrivingEvaluationDto;
import at.procon.eventhub.esperpoc.persistence.EsperPocActivityRepository;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.springframework.stereotype.Service;
@Service
public class EsperPocDriverCardActivityService {
private final EsperPocActivityRepository activityRepository;
private final EsperDriverActivityEngine esperEngine;
public EsperPocDriverCardActivityService(
EsperPocActivityRepository activityRepository,
EsperDriverActivityEngine esperEngine
) {
this.activityRepository = activityRepository;
this.esperEngine = esperEngine;
}
public EsperPocResultDto evaluate(EsperPocRequest request) {
OffsetDateTime requestedFrom = utc(request.occurredFrom());
OffsetDateTime requestedTo = utc(request.occurredTo());
OffsetDateTime loadedFrom = requestedFrom.minusHours(request.guardHours());
OffsetDateTime loadedTo = requestedTo.plusHours(request.guardHours());
List<RawActivityEventDto> rawEvents = activityRepository.findDriverCardActivityEvents(
request.tenantKey(),
request.driverEntityId(),
loadedFrom,
loadedTo
);
List<ActivityIntervalDto> rawIntervals = esperEngine.buildIntervals(rawEvents);
// Merge in the full guard window first. This is important for long BREAK_REST detection:
// a rest crossing the requested period boundary must keep its full guard-window duration.
List<ActivityIntervalDto> mergedLoadedActivities = mergeConsecutiveIdenticalActivities(
rawIntervals,
Duration.ofSeconds(request.mergeGapSeconds())
);
List<ActivityIntervalDto> mergedActivities = clipToPeriod(mergedLoadedActivities, requestedFrom, requestedTo);
DriverWorkSummaryDto summary = summarize(request, requestedFrom, requestedTo, mergedActivities);
ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving(
mergedActivities,
request.significantDrivingMinutes()
);
List<OperatingTimePeriodDto> operatingTimePeriods = buildOperatingTimePeriods(
request,
requestedFrom,
requestedTo,
mergedLoadedActivities
);
return new EsperPocResultDto(
request.tenantKey(),
request.driverEntityId(),
requestedFrom,
requestedTo,
loadedFrom,
loadedTo,
rawEvents.size(),
rawIntervals.size(),
mergedActivities.size(),
operatingTimePeriods.size(),
request.operatingPeriodSplitRestHours(),
rawEvents,
rawIntervals,
mergedActivities,
operatingTimePeriods,
summary,
summary,
drivingEvaluation,
notes(request)
);
}
public List<ActivityIntervalDto> mergeConsecutiveIdenticalActivities(
List<ActivityIntervalDto> intervals,
Duration mergeGapTolerance
) {
if (intervals == null || intervals.isEmpty()) {
return List.of();
}
List<ActivityIntervalDto> sorted = intervals.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
.toList();
List<ActivityIntervalDto> result = new ArrayList<>();
ActivityIntervalDto current = null;
List<String> currentSources = new ArrayList<>();
for (ActivityIntervalDto next : sorted) {
if (current == null) {
current = next;
currentSources = new ArrayList<>(next.sourceRowIds());
continue;
}
if (canMerge(current, next, mergeGapTolerance)) {
currentSources.addAll(next.sourceRowIds());
OffsetDateTime newEndedAt = max(current.endedAt(), next.endedAt());
current = new ActivityIntervalDto(
current.driverEntityId(),
current.vehicleId() == null ? next.vehicleId() : current.vehicleId(),
current.vehicleRegistrationId() == null ? next.vehicleRegistrationId() : current.vehicleRegistrationId(),
current.activityType(),
current.cardSlot(),
current.startedAt(),
newEndedAt,
Duration.between(current.startedAt(), newEndedAt).getSeconds(),
current.sourceRowId(),
List.copyOf(currentSources),
current.clippedToRequestedPeriod() || next.clippedToRequestedPeriod(),
"MERGED_ACTIVITY"
);
} else {
result.add(current.asMerged(List.copyOf(currentSources)));
current = next;
currentSources = new ArrayList<>(next.sourceRowIds());
}
}
if (current != null) {
result.add(current.asMerged(List.copyOf(currentSources)));
}
return result;
}
public List<OperatingTimePeriodDto> buildOperatingTimePeriods(
EsperPocRequest request,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo,
List<ActivityIntervalDto> mergedLoadedActivities
) {
Duration splitRestThreshold = Duration.ofHours(request.operatingPeriodSplitRestHours());
List<ActivityIntervalDto> sorted = mergedLoadedActivities.stream()
.filter(interval -> interval.endedAt().isAfter(interval.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
.toList();
List<ActivityIntervalDto> longRests = sorted.stream()
.filter(interval -> isOperatingPeriodSplitRest(interval, splitRestThreshold))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt))
.toList();
List<OperatingTimePeriodDto> result = new ArrayList<>();
OffsetDateTime currentSpanStart = requestedFrom;
ActivityIntervalDto previousLongRest = null;
int sequenceNumber = 1;
for (ActivityIntervalDto longRest : longRests) {
if (!longRest.endedAt().isAfter(requestedFrom)) {
previousLongRest = longRest;
continue;
}
if (!longRest.startedAt().isBefore(requestedTo)) {
break;
}
OffsetDateTime restStart = max(longRest.startedAt(), requestedFrom);
OffsetDateTime restEnd = min(longRest.endedAt(), requestedTo);
if (!restEnd.isAfter(restStart)) {
continue;
}
if (restStart.isAfter(currentSpanStart)) {
OperatingTimePeriodDto period = buildOperatingPeriod(
sequenceNumber,
request,
currentSpanStart,
restStart,
previousLongRest,
longRest,
sorted
);
if (period != null) {
result.add(period);
sequenceNumber++;
}
}
if (restEnd.isAfter(currentSpanStart)) {
currentSpanStart = restEnd;
}
previousLongRest = longRest;
}
if (requestedTo.isAfter(currentSpanStart)) {
OperatingTimePeriodDto period = buildOperatingPeriod(
sequenceNumber,
request,
currentSpanStart,
requestedTo,
previousLongRest,
null,
sorted
);
if (period != null) {
result.add(period);
}
}
return result;
}
private OperatingTimePeriodDto buildOperatingPeriod(
int sequenceNumber,
EsperPocRequest request,
OffsetDateTime spanFrom,
OffsetDateTime spanTo,
ActivityIntervalDto splitStartedAfterLongRest,
ActivityIntervalDto splitEndedByLongRest,
List<ActivityIntervalDto> allActivities
) {
List<ActivityIntervalDto> activities = allActivities.stream()
.filter(activity -> activity.startedAt().isBefore(spanTo) && activity.endedAt().isAfter(spanFrom))
.map(activity -> {
OffsetDateTime start = max(activity.startedAt(), spanFrom);
OffsetDateTime end = min(activity.endedAt(), spanTo);
if (!end.isAfter(start)) {
return null;
}
boolean clipped = !start.equals(activity.startedAt()) || !end.equals(activity.endedAt());
return activity.withTime(start, end, clipped);
})
.filter(Objects::nonNull)
.filter(activity -> activity.endedAt().isAfter(activity.startedAt()))
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
.toList();
if (activities.isEmpty()) {
return null;
}
OffsetDateTime startedAt = activities.get(0).startedAt();
OffsetDateTime endedAt = activities.get(activities.size() - 1).endedAt();
DriverWorkSummaryDto summary = summarize(request, startedAt, endedAt, activities);
ShiftDrivingEvaluationDto drivingEvaluation = evaluateSignificantDriving(
activities,
request.significantDrivingMinutes()
);
return new OperatingTimePeriodDto(
sequenceNumber,
startedAt,
endedAt,
Duration.between(startedAt, endedAt).getSeconds(),
splitStartedAfterLongRest,
splitEndedByLongRest,
activities,
summary,
drivingEvaluation
);
}
private boolean isOperatingPeriodSplitRest(ActivityIntervalDto interval, Duration splitRestThreshold) {
return "BREAK_REST".equals(interval.activityType())
&& interval.durationSeconds() > splitRestThreshold.getSeconds();
}
private boolean canMerge(ActivityIntervalDto left, ActivityIntervalDto right, Duration tolerance) {
boolean sameDriver = Objects.equals(left.driverEntityId(), right.driverEntityId());
boolean sameActivity = Objects.equals(left.activityType(), right.activityType());
boolean sameSlot = Objects.equals(left.cardSlot(), right.cardSlot());
long gapSeconds = Duration.between(left.endedAt(), right.startedAt()).getSeconds();
boolean adjacentOrOverlapping = gapSeconds <= tolerance.getSeconds();
return sameDriver && sameActivity && sameSlot && adjacentOrOverlapping;
}
private List<ActivityIntervalDto> clipToPeriod(
List<ActivityIntervalDto> intervals,
OffsetDateTime periodFrom,
OffsetDateTime periodTo
) {
return intervals.stream()
.map(interval -> {
OffsetDateTime start = max(interval.startedAt(), periodFrom);
OffsetDateTime end = min(interval.endedAt(), periodTo);
if (!end.isAfter(start)) {
return null;
}
boolean clipped = !start.equals(interval.startedAt()) || !end.equals(interval.endedAt());
return interval.withTime(start, end, clipped);
})
.filter(Objects::nonNull)
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt).thenComparing(ActivityIntervalDto::endedAt))
.toList();
}
private DriverWorkSummaryDto summarize(
EsperPocRequest request,
OffsetDateTime periodFrom,
OffsetDateTime periodTo,
List<ActivityIntervalDto> activities
) {
Map<String, Long> secondsByActivity = new LinkedHashMap<>();
for (ActivityIntervalDto activity : activities) {
secondsByActivity.merge(activity.activityType(), activity.durationSeconds(), Long::sum);
}
long drivingSeconds = secondsByActivity.getOrDefault("DRIVE", 0L);
long workSeconds = secondsByActivity.getOrDefault("WORK", 0L);
long availabilitySeconds = secondsByActivity.getOrDefault("AVAILABILITY", 0L);
long breakRestSeconds = secondsByActivity.getOrDefault("BREAK_REST", 0L);
long workingSeconds = drivingSeconds + workSeconds;
long operationSeconds = drivingSeconds + workSeconds + availabilitySeconds;
return new DriverWorkSummaryDto(
request.driverEntityId(),
periodFrom,
periodTo,
drivingSeconds,
workSeconds,
availabilitySeconds,
breakRestSeconds,
workingSeconds,
operationSeconds,
secondsByActivity
);
}
private ShiftDrivingEvaluationDto evaluateSignificantDriving(
List<ActivityIntervalDto> activities,
int significantDrivingMinutes
) {
long significantSeconds = Duration.ofMinutes(significantDrivingMinutes).getSeconds();
List<ActivityIntervalDto> significantDrivingPeriods = activities.stream()
.filter(activity -> "DRIVE".equals(activity.activityType()))
.filter(activity -> activity.durationSeconds() > significantSeconds)
.sorted(Comparator.comparing(ActivityIntervalDto::startedAt))
.toList();
if (significantDrivingPeriods.isEmpty()) {
return new ShiftDrivingEvaluationDto(
significantDrivingMinutes,
null,
null,
null,
null,
List.of()
);
}
List<DrivingInterruptionDto> interruptions = new ArrayList<>();
for (int i = 1; i < significantDrivingPeriods.size(); i++) {
ActivityIntervalDto previous = significantDrivingPeriods.get(i - 1);
ActivityIntervalDto next = significantDrivingPeriods.get(i);
if (next.startedAt().isAfter(previous.endedAt())) {
interruptions.add(new DrivingInterruptionDto(
previous.endedAt(),
next.startedAt(),
Duration.between(previous.endedAt(), next.startedAt()).getSeconds(),
previous.sourceRowId(),
next.sourceRowId()
));
}
}
ActivityIntervalDto first = significantDrivingPeriods.get(0);
ActivityIntervalDto last = significantDrivingPeriods.get(significantDrivingPeriods.size() - 1);
return new ShiftDrivingEvaluationDto(
significantDrivingMinutes,
first.startedAt(),
last.endedAt(),
first,
last,
interruptions
);
}
private List<String> notes(EsperPocRequest request) {
return List.of(
"PoC reads only tachograph DRIVER_CARD/CARD_ACTIVITY source events from eventhub.event.",
"Level RAW contains original imported point events; level Activities contains Esper-created intervals merged by consecutive identical activity.",
"Activities are merged in the guard window first and then clipped to the requested period, so BREAK_REST across the period boundary can still split operating periods correctly.",
"Operating time periods are split by BREAK_REST activities longer than " + request.operatingPeriodSplitRestHours() + " hours.",
"Working seconds = DRIVE + WORK. Operation seconds = DRIVE + WORK + AVAILABILITY. BREAK_REST is reported separately.",
"Departure/arrival are evaluated globally and again inside each operating time period using DRIVE intervals longer than " + request.significantDrivingMinutes() + " minutes."
);
}
private OffsetDateTime utc(OffsetDateTime value) {
return value.withOffsetSameInstant(ZoneOffset.UTC);
}
private OffsetDateTime max(OffsetDateTime left, OffsetDateTime right) {
return left.isAfter(right) ? left : right;
}
private OffsetDateTime min(OffsetDateTime left, OffsetDateTime right) {
return left.isBefore(right) ? left : right;
}
}

View File

@ -0,0 +1,81 @@
package at.procon.eventhub.esperpoc.service;
import java.time.OffsetDateTime;
import java.util.UUID;
public final class EsperRawDriverActivityPoint {
private final UUID eventId;
private final OffsetDateTime occurredAt;
private final String sourceRowId;
private final String externalSourceEventId;
private final UUID driverEntityId;
private final UUID vehicleId;
private final UUID vehicleRegistrationId;
private final String eventType;
private final String lifecycle;
private final String cardSlot;
public EsperRawDriverActivityPoint(
UUID eventId,
OffsetDateTime occurredAt,
String sourceRowId,
String externalSourceEventId,
UUID driverEntityId,
UUID vehicleId,
UUID vehicleRegistrationId,
String eventType,
String lifecycle,
String cardSlot
) {
this.eventId = eventId;
this.occurredAt = occurredAt;
this.sourceRowId = sourceRowId;
this.externalSourceEventId = externalSourceEventId;
this.driverEntityId = driverEntityId;
this.vehicleId = vehicleId;
this.vehicleRegistrationId = vehicleRegistrationId;
this.eventType = eventType;
this.lifecycle = lifecycle;
this.cardSlot = cardSlot;
}
public UUID getEventId() {
return eventId;
}
public OffsetDateTime getOccurredAt() {
return occurredAt;
}
public String getSourceRowId() {
return sourceRowId;
}
public String getExternalSourceEventId() {
return externalSourceEventId;
}
public UUID getDriverEntityId() {
return driverEntityId;
}
public UUID getVehicleId() {
return vehicleId;
}
public UUID getVehicleRegistrationId() {
return vehicleRegistrationId;
}
public String getEventType() {
return eventType;
}
public String getLifecycle() {
return lifecycle;
}
public String getCardSlot() {
return cardSlot;
}
}

View File

@ -0,0 +1,59 @@
package at.procon.eventhub.esperpoc.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class EsperDriverActivityEngineTest {
private final EsperDriverActivityEngine engine = new EsperDriverActivityEngine();
@Test
void buildsActivityIntervalsFromStartAndEndEvents() {
UUID driverId = UUID.randomUUID();
UUID vehicleId = UUID.randomUUID();
List<RawActivityEventDto> raw = List.of(
raw("100", "START", "DRIVE", "2026-04-30T23:55:00Z", driverId, vehicleId),
raw("100", "END", "DRIVE", "2026-05-01T00:10:00Z", driverId, vehicleId),
raw("101", "START", "WORK", "2026-05-01T00:10:00Z", driverId, vehicleId),
raw("101", "END", "WORK", "2026-05-01T00:40:00Z", driverId, vehicleId)
);
var intervals = engine.buildIntervals(raw);
assertThat(intervals).hasSize(2);
assertThat(intervals.get(0).activityType()).isEqualTo("DRIVE");
assertThat(intervals.get(0).durationSeconds()).isEqualTo(15 * 60L);
assertThat(intervals.get(1).activityType()).isEqualTo("WORK");
assertThat(intervals.get(1).durationSeconds()).isEqualTo(30 * 60L);
}
private RawActivityEventDto raw(
String rowId,
String lifecycle,
String activity,
String occurredAt,
UUID driverId,
UUID vehicleId
) {
return new RawActivityEventDto(
UUID.randomUUID(),
OffsetDateTime.parse(occurredAt),
rowId,
"TACHOGRAPH:CARD_ACTIVITY:" + rowId + ":" + lifecycle,
driverId,
vehicleId,
null,
activity,
lifecycle,
"DRIVER",
"INSERTED",
"SINGLE",
"package-1"
);
}
}

View File

@ -0,0 +1,114 @@
package at.procon.eventhub.esperpoc.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class EsperPocDriverCardActivityServiceTest {
private final EsperPocDriverCardActivityService service = new EsperPocDriverCardActivityService(null, null);
@Test
void mergesIdenticalActivitiesAcrossUtcMidnight() {
UUID driverId = UUID.randomUUID();
ActivityIntervalDto beforeMidnight = ActivityIntervalDto.raw(
driverId,
null,
null,
"BREAK_REST",
"DRIVER",
OffsetDateTime.parse("2026-04-30T23:50:00Z"),
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
"1"
);
ActivityIntervalDto afterMidnight = ActivityIntervalDto.raw(
driverId,
null,
null,
"BREAK_REST",
"DRIVER",
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-01T00:20:00Z"),
"2"
);
var merged = service.mergeConsecutiveIdenticalActivities(
List.of(beforeMidnight, afterMidnight),
java.time.Duration.ZERO
);
assertThat(merged).hasSize(1);
assertThat(merged.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-30T23:50:00Z"));
assertThat(merged.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-05-01T00:20:00Z"));
assertThat(merged.get(0).durationSeconds()).isEqualTo(30 * 60L);
assertThat(merged.get(0).sourceRowIds()).containsExactly("1", "2");
}
@Test
void splitsOperatingPeriodsByBreakRestLongerThanConfiguredHoursAndEvaluatesDepartureArrivalPerPeriod() {
UUID driverId = UUID.randomUUID();
EsperPocRequest request = new EsperPocRequest(
"default",
driverId,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
OffsetDateTime.parse("2026-04-03T00:00:00Z"),
24,
3,
60,
7
);
List<ActivityIntervalDto> activities = List.of(
activity(driverId, "DRIVE", "2026-04-01T06:00:00Z", "2026-04-01T08:00:00Z", "d1"),
activity(driverId, "WORK", "2026-04-01T08:00:00Z", "2026-04-01T09:00:00Z", "w1"),
activity(driverId, "DRIVE", "2026-04-01T09:30:00Z", "2026-04-01T11:00:00Z", "d2"),
activity(driverId, "BREAK_REST", "2026-04-01T20:00:00Z", "2026-04-02T04:30:00Z", "r1"),
activity(driverId, "DRIVE", "2026-04-02T05:00:00Z", "2026-04-02T07:00:00Z", "d3"),
activity(driverId, "BREAK_REST", "2026-04-02T10:00:00Z", "2026-04-02T10:30:00Z", "r2"),
activity(driverId, "DRIVE", "2026-04-02T10:30:00Z", "2026-04-02T12:00:00Z", "d4")
);
var periods = service.buildOperatingTimePeriods(
request,
request.occurredFrom(),
request.occurredTo(),
activities
);
assertThat(periods).hasSize(2);
assertThat(periods.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T06:00:00Z"));
assertThat(periods.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T11:00:00Z"));
assertThat(periods.get(0).drivingTimeInterruptionEvaluation().departureAt())
.isEqualTo(OffsetDateTime.parse("2026-04-01T06:00:00Z"));
assertThat(periods.get(0).drivingTimeInterruptionEvaluation().arrivalAt())
.isEqualTo(OffsetDateTime.parse("2026-04-01T11:00:00Z"));
assertThat(periods.get(0).drivingTimeInterruptionEvaluation().interruptionsBetweenSignificantDrivingPeriods())
.hasSize(1);
assertThat(periods.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-02T05:00:00Z"));
assertThat(periods.get(1).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-02T12:00:00Z"));
assertThat(periods.get(1).drivingTimeInterruptionEvaluation().departureAt())
.isEqualTo(OffsetDateTime.parse("2026-04-02T05:00:00Z"));
assertThat(periods.get(1).drivingTimeInterruptionEvaluation().arrivalAt())
.isEqualTo(OffsetDateTime.parse("2026-04-02T12:00:00Z"));
assertThat(periods.get(1).workingOperationTimes().breakRestSeconds()).isEqualTo(30 * 60L);
}
private ActivityIntervalDto activity(UUID driverId, String activity, String from, String to, String sourceRowId) {
return ActivityIntervalDto.raw(
driverId,
null,
null,
activity,
"DRIVER",
OffsetDateTime.parse(from),
OffsetDateTime.parse(to),
sourceRowId
).asMerged(List.of(sourceRowId));
}
}