Compare commits

...

3 Commits

Author SHA1 Message Date
trifonovt d5a6254a33 Commit remaining local workspace changes 2026-05-12 13:04:54 +02:00
trifonovt 7be6a08013 Add DTI enrichment evaluation endpoint 2026-05-12 12:05:13 +02:00
trifonovt 94e1227ab3 Add direct JDBC batch ingestion for YellowFox 2026-05-11 11:26:59 +02:00
32 changed files with 2097 additions and 32 deletions

Binary file not shown.

BIN
eventhub-spring-boot.zip Normal file

Binary file not shown.

View File

@ -0,0 +1,207 @@
2026-05-12T11:15:13.519+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-8' version 9.0.0
2026-05-12T11:15:14.119+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-8'
2026-05-12T11:15:14.122+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-9' version 9.0.0
2026-05-12T11:15:14.293+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-9'
2026-05-12T11:15:14.299+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-operating-period-2' version 9.0.0
2026-05-12T11:15:14.348+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-operating-period-2'
2026-05-12T11:15:14.353+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-10' version 9.0.0
2026-05-12T11:15:14.391+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-10'
2026-05-12T11:15:14.395+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-11' version 9.0.0
2026-05-12T11:15:14.438+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-11'
2026-05-12T11:15:14.440+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-12' version 9.0.0
2026-05-12T11:15:14.481+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-12'
2026-05-12T11:15:14.484+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-13' version 9.0.0
2026-05-12T11:15:14.526+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-13'
2026-05-12T11:15:14.528+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-14' version 9.0.0
2026-05-12T11:15:14.568+02:00 INFO 12264 --- [http-nio-8085-exec-5] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-14'
2026-05-12T11:15:14.569+02:00 INFO 12264 --- [http-nio-8085-exec-5] a.p.e.e.s.EsperOperatingPeriodEvaluationService : Esper operating-period evaluation tenant=Procon driverId=026598bf-002d-460d-8b06-ccbd05fb46b7 requestedFrom=2026-04-01T00:00Z requestedTo=2026-05-01T00:00Z loadedFrom=2026-03-31T00:00Z loadedTo=2026-05-02T00:00Z unknownMode=AS_BREAK_REST engineMode=STREAM_COLLECTOR rawEvents=708 cardRawEvents=354 vuRawEvents=354 cardIntervals=176 vuIntervals=168 resolvedKnownIntervals=176 evaluationIntervals=113 periodizedIntervals=113 mergedIntervals=113 nonDrivingIntervals=31 operatingPeriods=5 timingsMs={{dbRetrieve=347, cardIntervalEsper=939, vuIntervalEsper=174, vuGapFill=2, synthUnknown=1, periodizeEsper=51, merge=1, nonDriving=0, total=1738}}
2026-05-12T11:16:09.058+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.s.boot.tomcat.GracefulShutdown : Commencing graceful shutdown. Waiting for active requests to complete
2026-05-12T11:16:09.272+02:00 INFO 12264 --- [tomcat-shutdown] o.s.boot.tomcat.GracefulShutdown : Graceful shutdown complete
2026-05-12T11:16:09.310+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.18.2 (camel-1) is shutting down (timeout:45s)
2026-05-12T11:16:11.015+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Routes stopped (total:10)
2026-05-12T11:16:11.016+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped yellowfox-d8-import-start-route (direct://yellowfox-d8-import-start)
2026-05-12T11:16:11.016+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped yellowfox-d8-booking-input-route (direct://yellowfox-d8-booking-input)
2026-05-12T11:16:11.144+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped tachograph-import-start-route (direct://tachograph-import-start)
2026-05-12T11:16:11.145+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped tachograph-activity-input-route (direct://tachograph-activity-input)
2026-05-12T11:16:11.146+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped telematics-position-input-route (direct://telematics-position-input)
2026-05-12T11:16:11.147+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-manual-input-route (direct://eventhub-manual-input)
2026-05-12T11:16:11.149+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-package-input-route (direct://eventhub-package-input)
2026-05-12T11:16:11.149+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-direct-batch-persist-route (direct://eventhub-batch-persist-input)
2026-05-12T11:16:11.149+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-batch-and-persist-route (seda://eventhub-batch-input)
2026-05-12T11:16:11.149+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-normalized-input-route (direct://eventhub-normalized-input)
2026-05-12T11:16:11.352+02:00 INFO 12264 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.18.2 (camel-1) shutdown in 2s8ms (uptime:18h55m)
2026-05-12T11:16:11.582+02:00 INFO 12264 --- [SpringApplicationShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2026-05-12T11:16:11.614+02:00 INFO 12264 --- [SpringApplicationShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
2026-05-12T11:23:05.801+02:00 INFO 9580 --- [background-preinit] o.h.validator.internal.util.Version : HV000001: Hibernate Validator 9.0.1.Final
2026-05-12T11:23:06.558+02:00 INFO 9580 --- [main] a.p.e.EventHubIngestionApplication : Starting EventHubIngestionApplication using Java 21.0.2 with PID 9580 (C:\Work\java\gite\eventhub-spring-boot\eventhub-spring-boot\target\classes started by Parzival in C:\Work\java\gite\eventhub-spring-boot\eventhub-spring-boot)
2026-05-12T11:23:06.566+02:00 INFO 9580 --- [main] a.p.e.EventHubIngestionApplication : No active profile set, falling back to 1 default profile: "default"
2026-05-12T11:23:11.900+02:00 INFO 9580 --- [main] o.s.boot.tomcat.TomcatWebServer : Tomcat initialized with port 8085 (http)
2026-05-12T11:23:11.930+02:00 INFO 9580 --- [main] o.a.coyote.http11.Http11NioProtocol : Initializing ProtocolHandler ["http-nio-8085"]
2026-05-12T11:23:11.933+02:00 INFO 9580 --- [main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2026-05-12T11:23:11.933+02:00 INFO 9580 --- [main] o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/11.0.21]
2026-05-12T11:23:12.061+02:00 INFO 9580 --- [main] o.s.b.w.c.s.WebApplicationContextInitializer : Root WebApplicationContext: initialization completed in 5194 ms
2026-05-12T11:23:15.477+02:00 INFO 9580 --- [main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 4 endpoints beneath base path '/actuator'
2026-05-12T11:23:15.639+02:00 INFO 9580 --- [main] o.a.coyote.http11.Http11NioProtocol : Starting ProtocolHandler ["http-nio-8085"]
2026-05-12T11:23:15.673+02:00 INFO 9580 --- [main] o.s.boot.tomcat.TomcatWebServer : Tomcat started on port 8085 (http) with context path '/'
2026-05-12T11:23:20.172+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.18.2 (camel-1) is starting
2026-05-12T11:23:20.203+02:00 INFO 9580 --- [main] o.a.c.p.aggregate.AggregateProcessor : Defaulting to MemoryAggregationRepository
2026-05-12T11:23:20.204+02:00 INFO 9580 --- [main] o.a.c.p.aggregate.AggregateProcessor : Using CompletionTimeout to trigger after 5000 millis of inactivity.
2026-05-12T11:23:20.276+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Routes startup (total:10)
2026-05-12T11:23:20.277+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started eventhub-normalized-input-route (direct://eventhub-normalized-input)
2026-05-12T11:23:20.279+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started eventhub-batch-and-persist-route (seda://eventhub-batch-input)
2026-05-12T11:23:20.281+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started eventhub-direct-batch-persist-route (direct://eventhub-batch-persist-input)
2026-05-12T11:23:20.281+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started eventhub-package-input-route (direct://eventhub-package-input)
2026-05-12T11:23:20.282+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started eventhub-manual-input-route (direct://eventhub-manual-input)
2026-05-12T11:23:20.282+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started telematics-position-input-route (direct://telematics-position-input)
2026-05-12T11:23:20.283+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started tachograph-activity-input-route (direct://tachograph-activity-input)
2026-05-12T11:23:20.284+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started tachograph-import-start-route (direct://tachograph-import-start)
2026-05-12T11:23:20.286+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started yellowfox-d8-booking-input-route (direct://yellowfox-d8-booking-input)
2026-05-12T11:23:20.286+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started yellowfox-d8-import-start-route (direct://yellowfox-d8-import-start)
2026-05-12T11:23:20.286+02:00 INFO 9580 --- [main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.18.2 (camel-1) started in 110ms (build:0ms init:0ms start:110ms boot:7s769ms)
2026-05-12T11:23:20.336+02:00 INFO 9580 --- [main] a.p.e.EventHubIngestionApplication : Started EventHubIngestionApplication in 18.093 seconds (process running for 22.74)
2026-05-12T11:23:30.575+02:00 INFO 9580 --- [RMI TCP Connection(5)-169.254.53.84] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2026-05-12T11:23:30.576+02:00 INFO 9580 --- [RMI TCP Connection(5)-169.254.53.84] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2026-05-12T11:23:30.590+02:00 INFO 9580 --- [RMI TCP Connection(5)-169.254.53.84] o.s.web.servlet.DispatcherServlet : Completed initialization in 13 ms
2026-05-12T11:23:35.926+02:00 INFO 9580 --- [RMI TCP Connection(3)-169.254.53.84] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2026-05-12T11:23:36.395+02:00 INFO 9580 --- [RMI TCP Connection(3)-169.254.53.84] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Added connection org.postgresql.jdbc.PgConnection@1a7ba505
2026-05-12T11:23:36.417+02:00 INFO 9580 --- [RMI TCP Connection(3)-169.254.53.84] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2026-05-12T11:23:56.009+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-1' version 9.0.0
2026-05-12T11:23:58.662+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-1'
2026-05-12T11:23:58.675+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-2' version 9.0.0
2026-05-12T11:23:59.023+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-2'
2026-05-12T11:23:59.045+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-operating-period-1' version 9.0.0
2026-05-12T11:23:59.127+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-operating-period-1'
2026-05-12T11:23:59.150+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-3' version 9.0.0
2026-05-12T11:23:59.212+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-3'
2026-05-12T11:23:59.216+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-4' version 9.0.0
2026-05-12T11:23:59.297+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-4'
2026-05-12T11:23:59.301+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-5' version 9.0.0
2026-05-12T11:23:59.357+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-5'
2026-05-12T11:23:59.360+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-6' version 9.0.0
2026-05-12T11:23:59.409+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-6'
2026-05-12T11:23:59.412+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-7' version 9.0.0
2026-05-12T11:23:59.464+02:00 INFO 9580 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-7'
2026-05-12T11:23:59.464+02:00 INFO 9580 --- [http-nio-8085-exec-2] a.p.e.e.s.EsperOperatingPeriodEvaluationService : Esper operating-period evaluation tenant=Procon driverId=026598bf-002d-460d-8b06-ccbd05fb46b7 requestedFrom=2026-04-01T00:00Z requestedTo=2026-05-01T00:00Z loadedFrom=2026-03-31T00:00Z loadedTo=2026-05-02T00:00Z unknownMode=AS_BREAK_REST engineMode=STREAM_COLLECTOR rawEvents=708 cardRawEvents=354 vuRawEvents=354 cardIntervals=176 vuIntervals=168 resolvedKnownIntervals=176 evaluationIntervals=113 periodizedIntervals=113 mergedIntervals=113 nonDrivingIntervals=31 operatingPeriods=5 timingsMs={{dbRetrieve=411, cardIntervalEsper=2793, vuIntervalEsper=355, vuGapFill=9, synthUnknown=4, periodizeEsper=93, merge=3, nonDriving=2, total=4001}}
2026-05-12T11:24:58.174+02:00 WARN 9580 --- [HikariPool-1:housekeeper] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=49s488ms454µs500ns).
2026-05-12T11:26:51.871+02:00 WARN 9580 --- [HikariPool-1:housekeeper] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m9s537ms744µs800ns).
2026-05-12T11:29:00.780+02:00 WARN 9580 --- [HikariPool-1:housekeeper] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=2m8s908ms981µs900ns).
2026-05-12T11:29:00.971+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.s.boot.tomcat.GracefulShutdown : Commencing graceful shutdown. Waiting for active requests to complete
2026-05-12T11:29:01.419+02:00 INFO 9580 --- [tomcat-shutdown] o.s.boot.tomcat.GracefulShutdown : Graceful shutdown complete
2026-05-12T11:29:01.425+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.18.2 (camel-1) is shutting down (timeout:45s)
2026-05-12T11:29:01.857+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Routes stopped (total:10)
2026-05-12T11:29:01.858+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped yellowfox-d8-import-start-route (direct://yellowfox-d8-import-start)
2026-05-12T11:29:01.858+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped yellowfox-d8-booking-input-route (direct://yellowfox-d8-booking-input)
2026-05-12T11:29:01.858+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped tachograph-import-start-route (direct://tachograph-import-start)
2026-05-12T11:29:01.858+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped tachograph-activity-input-route (direct://tachograph-activity-input)
2026-05-12T11:29:01.858+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped telematics-position-input-route (direct://telematics-position-input)
2026-05-12T11:29:01.858+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-manual-input-route (direct://eventhub-manual-input)
2026-05-12T11:29:01.860+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-package-input-route (direct://eventhub-package-input)
2026-05-12T11:29:01.863+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-direct-batch-persist-route (direct://eventhub-batch-persist-input)
2026-05-12T11:29:01.863+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-batch-and-persist-route (seda://eventhub-batch-input)
2026-05-12T11:29:01.863+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-normalized-input-route (direct://eventhub-normalized-input)
2026-05-12T11:29:01.885+02:00 INFO 9580 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.18.2 (camel-1) shutdown in 459ms (uptime:5m41s)
2026-05-12T11:29:01.998+02:00 INFO 9580 --- [SpringApplicationShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2026-05-12T11:29:02.548+02:00 INFO 9580 --- [SpringApplicationShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
2026-05-12T11:47:04.173+02:00 INFO 29032 --- [background-preinit] o.h.validator.internal.util.Version : HV000001: Hibernate Validator 9.0.1.Final
2026-05-12T11:47:04.534+02:00 INFO 29032 --- [main] a.p.e.EventHubIngestionApplication : Starting EventHubIngestionApplication using Java 21.0.2 with PID 29032 (C:\Work\java\gite\eventhub-spring-boot\eventhub-spring-boot\target\classes started by Parzival in C:\Work\java\gite\eventhub-spring-boot\eventhub-spring-boot)
2026-05-12T11:47:04.536+02:00 INFO 29032 --- [main] a.p.e.EventHubIngestionApplication : No active profile set, falling back to 1 default profile: "default"
2026-05-12T11:47:08.931+02:00 INFO 29032 --- [main] o.s.boot.tomcat.TomcatWebServer : Tomcat initialized with port 8085 (http)
2026-05-12T11:47:08.966+02:00 INFO 29032 --- [main] o.a.coyote.http11.Http11NioProtocol : Initializing ProtocolHandler ["http-nio-8085"]
2026-05-12T11:47:08.970+02:00 INFO 29032 --- [main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2026-05-12T11:47:08.970+02:00 INFO 29032 --- [main] o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/11.0.21]
2026-05-12T11:47:09.120+02:00 INFO 29032 --- [main] o.s.b.w.c.s.WebApplicationContextInitializer : Root WebApplicationContext: initialization completed in 4466 ms
2026-05-12T11:47:12.877+02:00 INFO 29032 --- [main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 4 endpoints beneath base path '/actuator'
2026-05-12T11:47:13.064+02:00 INFO 29032 --- [main] o.a.coyote.http11.Http11NioProtocol : Starting ProtocolHandler ["http-nio-8085"]
2026-05-12T11:47:13.108+02:00 INFO 29032 --- [main] o.s.boot.tomcat.TomcatWebServer : Tomcat started on port 8085 (http) with context path '/'
2026-05-12T11:47:17.510+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.18.2 (camel-1) is starting
2026-05-12T11:47:17.593+02:00 INFO 29032 --- [main] o.a.c.p.aggregate.AggregateProcessor : Defaulting to MemoryAggregationRepository
2026-05-12T11:47:17.600+02:00 INFO 29032 --- [main] o.a.c.p.aggregate.AggregateProcessor : Using CompletionTimeout to trigger after 5000 millis of inactivity.
2026-05-12T11:47:17.646+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Routes startup (total:10)
2026-05-12T11:47:17.646+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started eventhub-normalized-input-route (direct://eventhub-normalized-input)
2026-05-12T11:47:17.646+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started eventhub-batch-and-persist-route (seda://eventhub-batch-input)
2026-05-12T11:47:17.646+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started eventhub-direct-batch-persist-route (direct://eventhub-batch-persist-input)
2026-05-12T11:47:17.646+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started eventhub-package-input-route (direct://eventhub-package-input)
2026-05-12T11:47:17.647+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started eventhub-manual-input-route (direct://eventhub-manual-input)
2026-05-12T11:47:17.647+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started telematics-position-input-route (direct://telematics-position-input)
2026-05-12T11:47:17.648+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started tachograph-activity-input-route (direct://tachograph-activity-input)
2026-05-12T11:47:17.648+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started tachograph-import-start-route (direct://tachograph-import-start)
2026-05-12T11:47:17.648+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started yellowfox-d8-booking-input-route (direct://yellowfox-d8-booking-input)
2026-05-12T11:47:17.648+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Started yellowfox-d8-import-start-route (direct://yellowfox-d8-import-start)
2026-05-12T11:47:17.649+02:00 INFO 29032 --- [main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.18.2 (camel-1) started in 137ms (build:0ms init:0ms start:137ms boot:7s937ms)
2026-05-12T11:47:17.672+02:00 INFO 29032 --- [main] a.p.e.EventHubIngestionApplication : Started EventHubIngestionApplication in 15.724 seconds (process running for 19.226)
2026-05-12T11:47:22.629+02:00 INFO 29032 --- [RMI TCP Connection(8)-169.254.53.84] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2026-05-12T11:47:22.629+02:00 INFO 29032 --- [RMI TCP Connection(8)-169.254.53.84] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2026-05-12T11:47:22.637+02:00 INFO 29032 --- [RMI TCP Connection(8)-169.254.53.84] o.s.web.servlet.DispatcherServlet : Completed initialization in 7 ms
2026-05-12T11:47:28.596+02:00 INFO 29032 --- [RMI TCP Connection(6)-169.254.53.84] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2026-05-12T11:47:29.328+02:00 INFO 29032 --- [RMI TCP Connection(6)-169.254.53.84] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Added connection org.postgresql.jdbc.PgConnection@40294cec
2026-05-12T11:47:29.337+02:00 INFO 29032 --- [RMI TCP Connection(6)-169.254.53.84] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2026-05-12T11:47:47.390+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-1' version 9.0.0
2026-05-12T11:47:50.506+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-1'
2026-05-12T11:47:50.516+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-2' version 9.0.0
2026-05-12T11:47:50.761+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-2'
2026-05-12T11:47:50.785+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-operating-period-1' version 9.0.0
2026-05-12T11:47:50.874+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-operating-period-1'
2026-05-12T11:47:50.896+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-3' version 9.0.0
2026-05-12T11:47:50.961+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-3'
2026-05-12T11:47:50.968+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-4' version 9.0.0
2026-05-12T11:47:51.069+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-4'
2026-05-12T11:47:51.072+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-5' version 9.0.0
2026-05-12T11:47:51.143+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-5'
2026-05-12T11:47:51.148+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-6' version 9.0.0
2026-05-12T11:47:51.233+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-6'
2026-05-12T11:47:51.238+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-7' version 9.0.0
2026-05-12T11:47:51.328+02:00 INFO 29032 --- [http-nio-8085-exec-2] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-7'
2026-05-12T11:47:51.331+02:00 INFO 29032 --- [http-nio-8085-exec-2] a.p.e.e.s.EsperOperatingPeriodEvaluationService : Esper operating-period evaluation tenant=Procon driverId=026598bf-002d-460d-8b06-ccbd05fb46b7 requestedFrom=2026-04-01T00:00Z requestedTo=2026-05-01T00:00Z loadedFrom=2026-03-31T00:00Z loadedTo=2026-05-02T00:00Z sourceSelectionMode=MIXED unknownMode=AS_BREAK_REST engineMode=STREAM_COLLECTOR rawEvents=708 cardRawEvents=354 vuRawEvents=354 cardIntervals=176 vuIntervals=168 resolvedKnownIntervals=176 evaluationIntervals=113 periodizedIntervals=113 mergedIntervals=113 nonDrivingIntervals=31 operatingPeriods=5 timingsMs={{dbRetrieve=636, cardIntervalEsper=3333, vuIntervalEsper=251, vuGapFill=13, synthUnknown=5, periodizeEsper=96, merge=3, nonDriving=2, total=4792}}
2026-05-12T11:50:30.990+02:00 WARN 29032 --- [HikariPool-1:housekeeper] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=2m28s615ms199µs300ns).
2026-05-12T11:50:42.829+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-8' version 9.0.0
2026-05-12T11:50:43.060+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-8'
2026-05-12T11:50:43.065+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-9' version 9.0.0
2026-05-12T11:50:43.248+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-9'
2026-05-12T11:50:43.257+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-operating-period-2' version 9.0.0
2026-05-12T11:50:43.310+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-operating-period-2'
2026-05-12T11:50:43.314+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-10' version 9.0.0
2026-05-12T11:50:43.364+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-10'
2026-05-12T11:50:43.368+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-11' version 9.0.0
2026-05-12T11:50:43.418+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-11'
2026-05-12T11:50:43.421+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-12' version 9.0.0
2026-05-12T11:50:43.471+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-12'
2026-05-12T11:50:43.474+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-13' version 9.0.0
2026-05-12T11:50:43.537+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-13'
2026-05-12T11:50:43.541+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-14' version 9.0.0
2026-05-12T11:50:43.590+02:00 INFO 29032 --- [http-nio-8085-exec-4] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-14'
2026-05-12T11:50:43.590+02:00 INFO 29032 --- [http-nio-8085-exec-4] a.p.e.e.s.EsperOperatingPeriodEvaluationService : Esper operating-period evaluation tenant=Procon driverId=026598bf-002d-460d-8b06-ccbd05fb46b7 requestedFrom=2026-04-01T00:00Z requestedTo=2026-05-01T00:00Z loadedFrom=2026-03-31T00:00Z loadedTo=2026-05-02T00:00Z sourceSelectionMode=MIXED unknownMode=AS_BREAK_REST engineMode=STREAM_COLLECTOR rawEvents=708 cardRawEvents=354 vuRawEvents=354 cardIntervals=176 vuIntervals=168 resolvedKnownIntervals=176 evaluationIntervals=113 periodizedIntervals=113 mergedIntervals=113 nonDrivingIntervals=31 operatingPeriods=5 timingsMs={{dbRetrieve=25, cardIntervalEsper=236, vuIntervalEsper=189, vuGapFill=3, synthUnknown=2, periodizeEsper=56, merge=1, nonDriving=0, total=790}}
2026-05-12T11:51:45.110+02:00 WARN 29032 --- [HikariPool-1:housekeeper] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m14s121ms587µs300ns).
2026-05-12T11:52:29.604+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-15' version 9.0.0
2026-05-12T11:52:29.671+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.s.boot.tomcat.GracefulShutdown : Commencing graceful shutdown. Waiting for active requests to complete
2026-05-12T11:52:29.787+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-15'
2026-05-12T11:52:29.789+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-16' version 9.0.0
2026-05-12T11:52:29.991+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-16'
2026-05-12T11:52:29.995+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-operating-period-3' version 9.0.0
2026-05-12T11:52:30.034+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-operating-period-3'
2026-05-12T11:52:30.038+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-17' version 9.0.0
2026-05-12T11:52:30.072+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-17'
2026-05-12T11:52:30.073+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-18' version 9.0.0
2026-05-12T11:52:30.102+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-18'
2026-05-12T11:52:30.104+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-19' version 9.0.0
2026-05-12T11:52:30.130+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-19'
2026-05-12T11:52:30.132+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-20' version 9.0.0
2026-05-12T11:52:30.171+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-20'
2026-05-12T11:52:30.176+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Initializing runtime URI 'eventhub-esper-poc-21' version 9.0.0
2026-05-12T11:52:30.406+02:00 INFO 29032 --- [http-nio-8085-exec-7] c.e.e.r.i.kernel.service.EPRuntimeImpl : Destroying runtime URI 'eventhub-esper-poc-21'
2026-05-12T11:52:30.408+02:00 INFO 29032 --- [http-nio-8085-exec-7] a.p.e.e.s.EsperOperatingPeriodEvaluationService : Esper operating-period evaluation tenant=Procon driverId=026598bf-002d-460d-8b06-ccbd05fb46b7 requestedFrom=2026-04-01T00:00Z requestedTo=2026-05-01T00:00Z loadedFrom=2026-03-31T00:00Z loadedTo=2026-05-02T00:00Z sourceSelectionMode=MIXED unknownMode=AS_BREAK_REST engineMode=STREAM_COLLECTOR rawEvents=708 cardRawEvents=354 vuRawEvents=354 cardIntervals=176 vuIntervals=168 resolvedKnownIntervals=176 evaluationIntervals=113 periodizedIntervals=113 mergedIntervals=113 nonDrivingIntervals=31 operatingPeriods=5 timingsMs={{dbRetrieve=39, cardIntervalEsper=186, vuIntervalEsper=203, vuGapFill=2, synthUnknown=1, periodizeEsper=41, merge=0, nonDriving=0, total=845}}
2026-05-12T11:52:30.449+02:00 INFO 29032 --- [tomcat-shutdown] o.s.boot.tomcat.GracefulShutdown : Graceful shutdown complete
2026-05-12T11:52:30.458+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.18.2 (camel-1) is shutting down (timeout:45s)
2026-05-12T11:52:30.590+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Routes stopped (total:10)
2026-05-12T11:52:30.591+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped yellowfox-d8-import-start-route (direct://yellowfox-d8-import-start)
2026-05-12T11:52:30.591+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped yellowfox-d8-booking-input-route (direct://yellowfox-d8-booking-input)
2026-05-12T11:52:30.591+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped tachograph-import-start-route (direct://tachograph-import-start)
2026-05-12T11:52:30.591+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped tachograph-activity-input-route (direct://tachograph-activity-input)
2026-05-12T11:52:30.591+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped telematics-position-input-route (direct://telematics-position-input)
2026-05-12T11:52:30.591+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-manual-input-route (direct://eventhub-manual-input)
2026-05-12T11:52:30.591+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-package-input-route (direct://eventhub-package-input)
2026-05-12T11:52:30.591+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-direct-batch-persist-route (direct://eventhub-batch-persist-input)
2026-05-12T11:52:30.591+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-batch-and-persist-route (seda://eventhub-batch-input)
2026-05-12T11:52:30.591+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Stopped eventhub-normalized-input-route (direct://eventhub-normalized-input)
2026-05-12T11:52:30.599+02:00 INFO 29032 --- [SpringApplicationShutdownHook] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 4.18.2 (camel-1) shutdown in 140ms (uptime:5m13s)
2026-05-12T11:52:30.611+02:00 INFO 29032 --- [SpringApplicationShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2026-05-12T11:52:30.615+02:00 INFO 29032 --- [SpringApplicationShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -64,7 +64,7 @@
"method": "GET",
"header": [],
"url": {
"raw": "{{baseUrl}}/api/eventhub/esper-poc/tachograph/operating-period-evaluation?tenantKey={{tenantKey}}&driverId={{driverId}}&occurredFrom={{occurredFrom}}&occurredTo={{occurredTo}}&guardHours=24&operatingSplitIdleHours=7&significantDrivingMinutes=3&mergeGapSeconds=0&gapDetectionToleranceSeconds=0&unknownTreatmentMode=AS_BREAK_REST",
"raw": "{{baseUrl}}/api/eventhub/esper-poc/tachograph/operating-period-evaluation?tenantKey={{tenantKey}}&driverId={{driverId}}&occurredFrom={{occurredFrom}}&occurredTo={{occurredTo}}&guardHours=24&operatingSplitIdleHours=7&significantDrivingMinutes=3&mergeGapSeconds=0&gapDetectionToleranceSeconds=0&sourceSelectionMode=MIXED&unknownTreatmentMode=AS_BREAK_REST",
"host": [
"{{baseUrl}}"
],
@ -112,6 +112,10 @@
"key": "gapDetectionToleranceSeconds",
"value": "0"
},
{
"key": "sourceSelectionMode",
"value": "MIXED"
},
{
"key": "unknownTreatmentMode",
"value": "AS_BREAK_REST"
@ -119,6 +123,84 @@
]
}
}
},
{
"name": "Evaluate tachograph DTI enrichment",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{baseUrl}}/api/eventhub/esper-poc/tachograph/dti-enrichment?tenantKey={{tenantKey}}&driverId={{driverId}}&occurredFrom={{occurredFrom}}&occurredTo={{occurredTo}}&guardHours=24&operatingSplitIdleHours=7&significantDrivingMinutes=3&mergeGapSeconds=0&gapDetectionToleranceSeconds=0&sourceSelectionMode=MIXED&unknownTreatmentMode=AS_BREAK_REST&vehicleEvidenceLookbackHours=720&geoSearchWindowMinutes=180&vicinityWindowMinutes=180",
"host": [
"{{baseUrl}}"
],
"path": [
"api",
"eventhub",
"esper-poc",
"tachograph",
"dti-enrichment"
],
"query": [
{
"key": "tenantKey",
"value": "{{tenantKey}}"
},
{
"key": "driverId",
"value": "{{driverId}}"
},
{
"key": "occurredFrom",
"value": "{{occurredFrom}}"
},
{
"key": "occurredTo",
"value": "{{occurredTo}}"
},
{
"key": "guardHours",
"value": "24"
},
{
"key": "operatingSplitIdleHours",
"value": "7"
},
{
"key": "significantDrivingMinutes",
"value": "3"
},
{
"key": "mergeGapSeconds",
"value": "0"
},
{
"key": "gapDetectionToleranceSeconds",
"value": "0"
},
{
"key": "sourceSelectionMode",
"value": "MIXED"
},
{
"key": "unknownTreatmentMode",
"value": "AS_BREAK_REST"
},
{
"key": "vehicleEvidenceLookbackHours",
"value": "720"
},
{
"key": "geoSearchWindowMinutes",
"value": "180"
},
{
"key": "vicinityWindowMinutes",
"value": "180"
}
]
}
}
}
],
"variable": [

View File

@ -354,6 +354,9 @@ public class EventHubProperties {
/** Overlap used by incremental utc/eventid cursor imports to catch late rows and ignition transitions. */
private Duration occurredAtOverlap = Duration.ofHours(2);
/** How JDBC extraction batches are handed over to the ingest pipeline. */
private JdbcExtractionIngestMode jdbcExtractionIngestMode = JdbcExtractionIngestMode.SYNC_DIRECT;
/** Regular scheduler scan interval; each configured plan still uses its own cron. */
private Duration schedulerPollInterval = Duration.ofMinutes(1);
@ -392,6 +395,16 @@ public class EventHubProperties {
}
}
public JdbcExtractionIngestMode getJdbcExtractionIngestMode() {
return jdbcExtractionIngestMode;
}
public void setJdbcExtractionIngestMode(JdbcExtractionIngestMode jdbcExtractionIngestMode) {
this.jdbcExtractionIngestMode = jdbcExtractionIngestMode == null
? JdbcExtractionIngestMode.SYNC_DIRECT
: jdbcExtractionIngestMode;
}
public Duration getSchedulerPollInterval() {
return schedulerPollInterval;
}

View File

@ -1,13 +1,17 @@
package at.procon.eventhub.esperpoc.api;
import at.procon.eventhub.esperpoc.dto.EsperActivityMergeMode;
import at.procon.eventhub.esperpoc.dto.EsperDtiEnrichmentRequest;
import at.procon.eventhub.esperpoc.dto.EsperDtiEnrichmentResultDto;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto;
import at.procon.eventhub.esperpoc.dto.EsperPocRequest;
import at.procon.eventhub.esperpoc.dto.EsperPocResultDto;
import at.procon.eventhub.esperpoc.dto.EsperShiftResolutionMode;
import at.procon.eventhub.esperpoc.dto.EsperSourceSelectionMode;
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
import at.procon.eventhub.esperpoc.service.EsperDtiEnrichmentService;
import at.procon.eventhub.esperpoc.service.EsperOperatingPeriodEvaluationService;
import at.procon.eventhub.esperpoc.service.EsperPocDriverCardActivityService;
import java.time.OffsetDateTime;
@ -25,13 +29,16 @@ public class EsperPocController {
private final EsperPocDriverCardActivityService service;
private final EsperOperatingPeriodEvaluationService operatingPeriodEvaluationService;
private final EsperDtiEnrichmentService dtiEnrichmentService;
public EsperPocController(
EsperPocDriverCardActivityService service,
EsperOperatingPeriodEvaluationService operatingPeriodEvaluationService
EsperOperatingPeriodEvaluationService operatingPeriodEvaluationService,
EsperDtiEnrichmentService dtiEnrichmentService
) {
this.service = service;
this.operatingPeriodEvaluationService = operatingPeriodEvaluationService;
this.dtiEnrichmentService = dtiEnrichmentService;
}
@GetMapping("/tachograph/driver-card-activities")
@ -77,6 +84,7 @@ public class EsperPocController {
@RequestParam(required = false) Integer significantDrivingMinutes,
@RequestParam(required = false) Integer mergeGapSeconds,
@RequestParam(required = false) Integer gapDetectionToleranceSeconds,
@RequestParam(required = false) EsperSourceSelectionMode sourceSelectionMode,
@RequestParam(required = false) EsperUnknownTreatmentMode unknownTreatmentMode,
@RequestParam(required = false) EsperOperatingPeriodEngineMode engineMode
) {
@ -90,9 +98,48 @@ public class EsperPocController {
significantDrivingMinutes,
mergeGapSeconds,
gapDetectionToleranceSeconds,
sourceSelectionMode,
unknownTreatmentMode,
engineMode
);
return ResponseEntity.ok(operatingPeriodEvaluationService.evaluate(request));
}
@GetMapping("/tachograph/dti-enrichment")
public ResponseEntity<EsperDtiEnrichmentResultDto> evaluateDtiEnrichment(
@RequestParam String tenantKey,
@RequestParam UUID driverId,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime occurredFrom,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime occurredTo,
@RequestParam(defaultValue = "24") Integer guardHours,
@RequestParam(required = false) Integer operatingSplitIdleHours,
@RequestParam(required = false) Integer significantDrivingMinutes,
@RequestParam(required = false) Integer mergeGapSeconds,
@RequestParam(required = false) Integer gapDetectionToleranceSeconds,
@RequestParam(required = false) EsperSourceSelectionMode sourceSelectionMode,
@RequestParam(required = false) EsperUnknownTreatmentMode unknownTreatmentMode,
@RequestParam(required = false) EsperOperatingPeriodEngineMode engineMode,
@RequestParam(required = false) Integer vehicleEvidenceLookbackHours,
@RequestParam(required = false) Integer geoSearchWindowMinutes,
@RequestParam(required = false) Integer vicinityWindowMinutes
) {
EsperDtiEnrichmentRequest request = new EsperDtiEnrichmentRequest(
tenantKey,
driverId,
occurredFrom,
occurredTo,
guardHours,
operatingSplitIdleHours,
significantDrivingMinutes,
mergeGapSeconds,
gapDetectionToleranceSeconds,
sourceSelectionMode,
unknownTreatmentMode,
engineMode,
vehicleEvidenceLookbackHours,
geoSearchWindowMinutes,
vicinityWindowMinutes
);
return ResponseEntity.ok(dtiEnrichmentService.evaluate(request));
}
}

View File

@ -0,0 +1,25 @@
package at.procon.eventhub.esperpoc.dto;
import at.procon.eventhub.dto.GeoPointDto;
import java.time.OffsetDateTime;
import java.util.UUID;
public record DtiBoundaryPositionDto(
OffsetDateTime evidenceAt,
GeoPointDto position,
String eventDomain,
String eventType,
String sourceKind,
String extractionCode,
UUID vehicleId,
UUID vehicleRegistrationId,
String country,
String region,
String countryFrom,
String countryTo,
String operation,
long deltaSeconds,
int confidence,
String evidenceSourceRowId
) {
}

View File

@ -0,0 +1,17 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
public record DtiBoundaryVehicleDto(
UUID vehicleId,
UUID vehicleRegistrationId,
String vehicleVin,
String resolutionSource,
int confidence,
OffsetDateTime evidenceStartedAt,
OffsetDateTime evidenceEndedAt,
List<String> evidenceSourceRowIds
) {
}

View File

@ -0,0 +1,25 @@
package at.procon.eventhub.esperpoc.dto;
import at.procon.eventhub.dto.GeoPointDto;
import java.time.OffsetDateTime;
import java.util.UUID;
public record DtiBoundaryVicinityEventDto(
OffsetDateTime occurredAt,
String eventDomain,
String eventType,
String lifecycle,
String sourceKind,
String extractionCode,
UUID vehicleId,
UUID vehicleRegistrationId,
GeoPointDto position,
String country,
String region,
String countryFrom,
String countryTo,
String operation,
long deltaSeconds,
String sourceRowId
) {
}

View File

@ -0,0 +1,25 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
public record EnrichedDtiIntervalDto(
String dtiId,
UUID driverId,
String intervalKind,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
long operatingPeriodNo,
OffsetDateTime operatingPeriodStartedAt,
String previousDrivingSourceRowId,
String nextDrivingSourceRowId,
DtiBoundaryVehicleDto beginVehicle,
DtiBoundaryVehicleDto endVehicle,
DtiBoundaryPositionDto beginPosition,
DtiBoundaryPositionDto endPosition,
List<DtiBoundaryVicinityEventDto> beginVicinityEvents,
List<DtiBoundaryVicinityEventDto> endVicinityEvents
) {
}

View File

@ -0,0 +1,38 @@
package at.procon.eventhub.esperpoc.dto;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.time.OffsetDateTime;
import java.util.UUID;
public record EsperDtiEnrichmentRequest(
@NotBlank String tenantKey,
@NotNull UUID driverId,
@NotNull OffsetDateTime occurredFrom,
@NotNull OffsetDateTime occurredTo,
Integer guardHours,
Integer operatingSplitIdleHours,
Integer significantDrivingMinutes,
Integer mergeGapSeconds,
Integer gapDetectionToleranceSeconds,
EsperSourceSelectionMode sourceSelectionMode,
EsperUnknownTreatmentMode unknownTreatmentMode,
EsperOperatingPeriodEngineMode engineMode,
Integer vehicleEvidenceLookbackHours,
Integer geoSearchWindowMinutes,
Integer vicinityWindowMinutes
) {
public EsperDtiEnrichmentRequest {
if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) {
throw new IllegalArgumentException("occurredFrom must be before occurredTo");
}
guardHours = guardHours == null ? 24 : Math.max(0, guardHours);
operatingSplitIdleHours = operatingSplitIdleHours == null ? null : Math.max(1, operatingSplitIdleHours);
significantDrivingMinutes = significantDrivingMinutes == null ? null : Math.max(1, significantDrivingMinutes);
mergeGapSeconds = mergeGapSeconds == null ? null : Math.max(0, mergeGapSeconds);
gapDetectionToleranceSeconds = gapDetectionToleranceSeconds == null ? null : Math.max(0, gapDetectionToleranceSeconds);
vehicleEvidenceLookbackHours = vehicleEvidenceLookbackHours == null ? 24 * 30 : Math.max(1, vehicleEvidenceLookbackHours);
geoSearchWindowMinutes = geoSearchWindowMinutes == null ? 180 : Math.max(1, geoSearchWindowMinutes);
vicinityWindowMinutes = vicinityWindowMinutes == null ? 180 : Math.max(1, vicinityWindowMinutes);
}
}

View File

@ -0,0 +1,25 @@
package at.procon.eventhub.esperpoc.dto;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
public record EsperDtiEnrichmentResultDto(
String tenantKey,
UUID driverId,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo,
OffsetDateTime loadedFrom,
OffsetDateTime loadedTo,
OffsetDateTime supportFrom,
OffsetDateTime supportTo,
int pureDtiCount,
int supportEventCount,
int vehicleUsageIntervalCount,
int geoSearchWindowMinutes,
int vicinityWindowMinutes,
int vehicleEvidenceLookbackHours,
List<EnrichedDtiIntervalDto> dtiIntervals,
List<String> notes
) {
}

View File

@ -15,6 +15,7 @@ public record EsperOperatingPeriodRequest(
Integer significantDrivingMinutes,
Integer mergeGapSeconds,
Integer gapDetectionToleranceSeconds,
EsperSourceSelectionMode sourceSelectionMode,
EsperUnknownTreatmentMode unknownTreatmentMode,
EsperOperatingPeriodEngineMode engineMode
) {

View File

@ -26,6 +26,7 @@ public record EsperOperatingPeriodResultDto(
int significantDrivingMinutes,
int mergeGapSeconds,
int gapDetectionToleranceSeconds,
EsperSourceSelectionMode sourceSelectionMode,
EsperUnknownTreatmentMode unknownTreatmentMode,
EsperOperatingPeriodEngineMode engineMode,
List<RawActivityEventDto> rawEvents,

View File

@ -0,0 +1,6 @@
package at.procon.eventhub.esperpoc.dto;
public enum EsperSourceSelectionMode {
MIXED,
DRIVER_CARD_ONLY
}

View File

@ -0,0 +1,30 @@
package at.procon.eventhub.esperpoc.dto;
import java.math.BigDecimal;
import java.time.OffsetDateTime;
import java.util.UUID;
public record EsperSupportEventDto(
UUID eventId,
OffsetDateTime occurredAt,
String sourceRowId,
String externalSourceEventId,
String sourceKind,
String extractionCode,
UUID driverId,
UUID driverCardId,
UUID vehicleId,
UUID vehicleRegistrationId,
String eventDomain,
String eventType,
String lifecycle,
String cardSlot,
BigDecimal latitude,
BigDecimal longitude,
String country,
String region,
String countryFrom,
String countryTo,
String operation
) {
}

View File

@ -0,0 +1,125 @@
package at.procon.eventhub.esperpoc.persistence;
import at.procon.eventhub.esperpoc.dto.EsperSupportEventDto;
import at.procon.eventhub.esperpoc.dto.EsperSourceSelectionMode;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class EsperPocDtiEnrichmentRepository {
private final JdbcTemplate jdbcTemplate;
public EsperPocDtiEnrichmentRepository(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public List<EsperSupportEventDto> findDriverSupportEvents(
String tenantKey,
UUID driverId,
OffsetDateTime supportFrom,
OffsetDateTime supportTo,
int precedingDriverCardEventCount,
EsperSourceSelectionMode sourceSelectionMode
) {
return jdbcTemplate.query(
"""
with candidate as (
select
event.id,
event.occurred_at,
coalesce(
event.payload #>> '{raw,sourceRowId}',
regexp_replace(event.external_source_event_id, ':(START|END|INSERT|WITHDRAW)$', '')
) as source_row_id,
event.external_source_event_id,
source.source_kind,
coalesce(pkg.extraction_code, '') as extraction_code,
event.driver_id,
event.driver_card_id,
event.vehicle_id,
event.vehicle_registration_id,
event.event_domain,
event.event_type,
event.lifecycle,
detail.attributes ->> 'cardSlot' as card_slot,
st_y(event.position::geometry) as latitude,
st_x(event.position::geometry) as longitude,
detail.attributes ->> 'country' as country,
detail.attributes ->> 'region' as region,
detail.attributes ->> 'countryFrom' as country_from,
detail.attributes ->> 'countryTo' as country_to,
detail.attributes ->> 'operation' as operation
from eventhub.event event
join eventhub.event_source source on source.id = event.event_source_id
join eventhub.data_package pkg on pkg.id = event.data_package_id
left join lateral (
select detail.attributes
from eventhub.event_detail detail
where detail.event_occurred_at = event.occurred_at
and detail.event_id = event.id
order by detail.detail_type
limit 1
) detail on true
where pkg.tenant_key = ?
and source.provider_key = 'TACHOGRAPH'
and event.driver_id = ?
and (? <> 'DRIVER_CARD_ONLY' or source.source_kind = 'DRIVER_CARD')
and event.event_domain in ('DRIVER_CARD', 'POSITION', 'PLACE', 'BORDER_CROSSING', 'LOAD_UNLOAD', 'SPECIFIC_CONDITION', 'SPEEDING')
and event.occurred_at < ?
),
in_range as (
select * from candidate where occurred_at >= ?
),
preceding_driver_card as (
select *
from candidate
where occurred_at < ?
and event_domain = 'DRIVER_CARD'
order by occurred_at desc, id desc
limit ?
)
select *
from (
select * from in_range
union all
select * from preceding_driver_card
) result
order by occurred_at, lifecycle, event_domain, event_type, id
""",
(rs, rowNum) -> new EsperSupportEventDto(
(UUID) rs.getObject("id"),
rs.getObject("occurred_at", OffsetDateTime.class),
rs.getString("source_row_id"),
rs.getString("external_source_event_id"),
rs.getString("source_kind"),
rs.getString("extraction_code"),
(UUID) rs.getObject("driver_id"),
(UUID) rs.getObject("driver_card_id"),
(UUID) rs.getObject("vehicle_id"),
(UUID) rs.getObject("vehicle_registration_id"),
rs.getString("event_domain"),
rs.getString("event_type"),
rs.getString("lifecycle"),
rs.getString("card_slot"),
rs.getBigDecimal("latitude"),
rs.getBigDecimal("longitude"),
rs.getString("country"),
rs.getString("region"),
rs.getString("country_from"),
rs.getString("country_to"),
rs.getString("operation")
),
tenantKey,
driverId,
sourceSelectionMode == null ? EsperSourceSelectionMode.MIXED.name() : sourceSelectionMode.name(),
supportTo,
supportFrom,
supportFrom,
precedingDriverCardEventCount
);
}
}

View File

@ -0,0 +1,649 @@
package at.procon.eventhub.esperpoc.service;
import at.procon.eventhub.dto.GeoPointDto;
import at.procon.eventhub.esperpoc.dto.DtiBoundaryPositionDto;
import at.procon.eventhub.esperpoc.dto.DtiBoundaryVehicleDto;
import at.procon.eventhub.esperpoc.dto.DtiBoundaryVicinityEventDto;
import at.procon.eventhub.esperpoc.dto.EnrichedDtiIntervalDto;
import at.procon.eventhub.esperpoc.dto.EsperDtiEnrichmentRequest;
import at.procon.eventhub.esperpoc.dto.EsperDtiEnrichmentResultDto;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto;
import at.procon.eventhub.esperpoc.dto.OperatingPeriodDto;
import at.procon.eventhub.esperpoc.dto.EsperSupportEventDto;
import at.procon.eventhub.esperpoc.persistence.EsperPocDtiEnrichmentRepository;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.springframework.stereotype.Service;
@Service
public class EsperDtiEnrichmentService {
private static final Set<String> VEHICLE_INTERVAL_EXTRACTION_CODES = Set.of("IW_CYCLE", "CARD_VEHICLES_USED");
private static final int PRECEDING_DRIVER_CARD_EVENT_COUNT = 50;
private final EsperOperatingPeriodEvaluationService operatingPeriodEvaluationService;
private final EsperPocDtiEnrichmentRepository enrichmentRepository;
public EsperDtiEnrichmentService(
EsperOperatingPeriodEvaluationService operatingPeriodEvaluationService,
EsperPocDtiEnrichmentRepository enrichmentRepository
) {
this.operatingPeriodEvaluationService = operatingPeriodEvaluationService;
this.enrichmentRepository = enrichmentRepository;
}
public EsperDtiEnrichmentResultDto evaluate(EsperDtiEnrichmentRequest request) {
EsperOperatingPeriodRequest operatingRequest = new EsperOperatingPeriodRequest(
request.tenantKey(),
request.driverId(),
request.occurredFrom(),
request.occurredTo(),
request.guardHours(),
request.operatingSplitIdleHours(),
request.significantDrivingMinutes(),
request.mergeGapSeconds(),
request.gapDetectionToleranceSeconds(),
request.sourceSelectionMode(),
request.unknownTreatmentMode(),
request.engineMode()
);
EsperOperatingPeriodResultDto pureDtiResult = operatingPeriodEvaluationService.evaluate(operatingRequest);
OffsetDateTime supportFrom = request.occurredFrom().minusHours(request.vehicleEvidenceLookbackHours());
OffsetDateTime supportTo = request.occurredTo().plusHours(Math.max(request.guardHours(), 24));
List<EsperSupportEventDto> rawSupportEvents = enrichmentRepository.findDriverSupportEvents(
request.tenantKey(),
request.driverId(),
supportFrom,
supportTo,
PRECEDING_DRIVER_CARD_EVENT_COUNT,
request.sourceSelectionMode()
);
List<ResolvedVehicleUsageInterval> vehicleUsageIntervals = mergeVehicleUsageIntervals(buildVehicleUsageIntervals(rawSupportEvents));
List<EsperSupportEventDto> supportEvents = condenseSupportEvents(rawSupportEvents);
List<PureDtiInterval> pureDtiIntervals = extractPureDtiIntervals(pureDtiResult);
List<EnrichedDtiIntervalDto> enrichedIntervals = pureDtiIntervals.stream()
.map(interval -> enrichInterval(interval, supportEvents, vehicleUsageIntervals, request))
.toList();
return new EsperDtiEnrichmentResultDto(
request.tenantKey(),
request.driverId(),
pureDtiResult.requestedFrom(),
pureDtiResult.requestedTo(),
pureDtiResult.loadedFrom(),
pureDtiResult.loadedTo(),
supportFrom,
supportTo,
pureDtiIntervals.size(),
supportEvents.size(),
vehicleUsageIntervals.size(),
request.geoSearchWindowMinutes(),
request.vicinityWindowMinutes(),
request.vehicleEvidenceLookbackHours(),
enrichedIntervals,
notes(request)
);
}
EnrichedDtiIntervalDto enrichInterval(
PureDtiInterval interval,
List<EsperSupportEventDto> supportEvents,
List<ResolvedVehicleUsageInterval> vehicleUsageIntervals,
EsperDtiEnrichmentRequest request
) {
DtiBoundaryVehicleDto beginVehicle = resolveBoundaryVehicle(interval.startedAt(), vehicleUsageIntervals);
DtiBoundaryVehicleDto endVehicle = resolveBoundaryVehicle(interval.endedAt(), vehicleUsageIntervals);
DtiBoundaryPositionDto beginPosition = resolveBoundaryPosition(
interval.startedAt(),
beginVehicle,
supportEvents,
request.geoSearchWindowMinutes()
);
DtiBoundaryPositionDto endPosition = resolveBoundaryPosition(
interval.endedAt(),
endVehicle,
supportEvents,
request.geoSearchWindowMinutes()
);
List<DtiBoundaryVicinityEventDto> beginVicinity = resolveBoundaryVicinityEvents(
interval.startedAt(),
supportEvents,
request.vicinityWindowMinutes()
);
List<DtiBoundaryVicinityEventDto> endVicinity = resolveBoundaryVicinityEvents(
interval.endedAt(),
supportEvents,
request.vicinityWindowMinutes()
);
return new EnrichedDtiIntervalDto(
interval.dtiId(),
interval.driverId(),
interval.intervalKind(),
interval.startedAt(),
interval.endedAt(),
interval.durationSeconds(),
interval.operatingPeriodNo(),
interval.operatingPeriodStartedAt(),
interval.previousDrivingSourceRowId(),
interval.nextDrivingSourceRowId(),
beginVehicle,
endVehicle,
beginPosition,
endPosition,
beginVicinity,
endVicinity
);
}
List<PureDtiInterval> extractPureDtiIntervals(EsperOperatingPeriodResultDto result) {
if (result == null || result.operatingPeriods() == null || result.operatingPeriods().isEmpty()) {
return List.of();
}
List<PureDtiInterval> intervals = new ArrayList<>();
for (OperatingPeriodDto operatingPeriod : result.operatingPeriods()) {
if (operatingPeriod.drivingTimeInterruptionEvaluation() == null
|| operatingPeriod.drivingTimeInterruptionEvaluation().interruptionsBetweenSignificantDrivingPeriods() == null) {
continue;
}
if (operatingPeriod.drivingTimeInterruptionEvaluation().departureAt() != null
&& operatingPeriod.startedAt().isBefore(operatingPeriod.drivingTimeInterruptionEvaluation().departureAt())) {
intervals.add(new PureDtiInterval(
"DTI-PRE-" + operatingPeriod.operatingPeriodNo() + "-" + operatingPeriod.startedAt().toInstant().toEpochMilli(),
result.driverId(),
"BEFORE_FIRST_SIGNIFICANT_DRIVING",
operatingPeriod.startedAt(),
operatingPeriod.drivingTimeInterruptionEvaluation().departureAt(),
Duration.between(operatingPeriod.startedAt(), operatingPeriod.drivingTimeInterruptionEvaluation().departureAt()).getSeconds(),
operatingPeriod.operatingPeriodNo(),
operatingPeriod.startedAt(),
null,
operatingPeriod.drivingTimeInterruptionEvaluation().firstSignificantDrivingPeriod() == null
? null
: operatingPeriod.drivingTimeInterruptionEvaluation().firstSignificantDrivingPeriod().sourceRowId()
));
}
operatingPeriod.drivingTimeInterruptionEvaluation().interruptionsBetweenSignificantDrivingPeriods()
.forEach(interruption -> intervals.add(new PureDtiInterval(
"DTI-" + operatingPeriod.operatingPeriodNo() + "-" + interruption.from().toInstant().toEpochMilli(),
result.driverId(),
"BETWEEN_SIGNIFICANT_DRIVING",
interruption.from(),
interruption.to(),
interruption.durationSeconds(),
operatingPeriod.operatingPeriodNo(),
operatingPeriod.startedAt(),
interruption.previousDrivingSourceRowId(),
interruption.nextDrivingSourceRowId()
)));
if (operatingPeriod.drivingTimeInterruptionEvaluation().arrivalAt() != null
&& operatingPeriod.drivingTimeInterruptionEvaluation().arrivalAt().isBefore(operatingPeriod.endedAt())) {
intervals.add(new PureDtiInterval(
"DTI-POST-" + operatingPeriod.operatingPeriodNo() + "-" + operatingPeriod.drivingTimeInterruptionEvaluation().arrivalAt().toInstant().toEpochMilli(),
result.driverId(),
"AFTER_LAST_SIGNIFICANT_DRIVING",
operatingPeriod.drivingTimeInterruptionEvaluation().arrivalAt(),
operatingPeriod.endedAt(),
Duration.between(operatingPeriod.drivingTimeInterruptionEvaluation().arrivalAt(), operatingPeriod.endedAt()).getSeconds(),
operatingPeriod.operatingPeriodNo(),
operatingPeriod.startedAt(),
operatingPeriod.drivingTimeInterruptionEvaluation().lastSignificantDrivingPeriod() == null
? null
: operatingPeriod.drivingTimeInterruptionEvaluation().lastSignificantDrivingPeriod().sourceRowId(),
null
));
}
}
return intervals;
}
List<ResolvedVehicleUsageInterval> buildVehicleUsageIntervals(List<EsperSupportEventDto> supportEvents) {
record VehicleIntervalSeed(
EsperSupportEventDto insertEvent,
EsperSupportEventDto withdrawEvent
) {
}
java.util.Map<String, VehicleIntervalSeed> bySourceRow = new java.util.LinkedHashMap<>();
supportEvents.stream()
.filter(event -> "DRIVER_CARD".equals(event.eventDomain()))
.filter(event -> VEHICLE_INTERVAL_EXTRACTION_CODES.contains(event.extractionCode()))
.filter(event -> "CARD_INSERTED".equals(event.eventType()) || "CARD_WITHDRAWN".equals(event.eventType()))
.filter(event -> event.sourceRowId() != null)
.forEach(event -> {
String key = event.extractionCode() + ":" + event.sourceRowId();
VehicleIntervalSeed current = bySourceRow.get(key);
EsperSupportEventDto insert = current == null ? null : current.insertEvent();
EsperSupportEventDto withdraw = current == null ? null : current.withdrawEvent();
if ("CARD_INSERTED".equals(event.eventType())) {
if (insert == null || event.occurredAt().isBefore(insert.occurredAt())) {
insert = event;
}
} else if (withdraw == null || event.occurredAt().isAfter(withdraw.occurredAt())) {
withdraw = event;
}
bySourceRow.put(key, new VehicleIntervalSeed(insert, withdraw));
});
List<ResolvedVehicleUsageInterval> result = new ArrayList<>();
for (VehicleIntervalSeed seed : bySourceRow.values()) {
EsperSupportEventDto anchor = seed.insertEvent() != null ? seed.insertEvent() : seed.withdrawEvent();
if (anchor == null || anchor.vehicleId() == null) {
continue;
}
OffsetDateTime startedAt = seed.insertEvent() == null ? null : seed.insertEvent().occurredAt();
OffsetDateTime endedAt = seed.withdrawEvent() == null ? null : seed.withdrawEvent().occurredAt();
if (startedAt == null && endedAt == null) {
continue;
}
result.add(new ResolvedVehicleUsageInterval(
anchor.driverId(),
anchor.driverCardId(),
anchor.vehicleId(),
anchor.vehicleRegistrationId(),
anchor.extractionCode(),
startedAt,
endedAt,
sourcePriority(anchor.extractionCode()),
sourceConfidence(anchor.extractionCode()),
collectSourceRowIds(seed.insertEvent(), seed.withdrawEvent())
));
}
return result.stream()
.sorted(Comparator.comparing(ResolvedVehicleUsageInterval::startedAt, Comparator.nullsFirst(Comparator.naturalOrder()))
.thenComparing(ResolvedVehicleUsageInterval::endedAt, Comparator.nullsLast(Comparator.naturalOrder())))
.toList();
}
List<ResolvedVehicleUsageInterval> mergeVehicleUsageIntervals(List<ResolvedVehicleUsageInterval> intervals) {
if (intervals.isEmpty()) {
return List.of();
}
List<ResolvedVehicleUsageInterval> sorted = intervals.stream()
.sorted(Comparator.comparing(ResolvedVehicleUsageInterval::startedAt, Comparator.nullsFirst(Comparator.naturalOrder()))
.thenComparing(ResolvedVehicleUsageInterval::endedAt, Comparator.nullsLast(Comparator.naturalOrder())))
.toList();
List<ResolvedVehicleUsageInterval> merged = new ArrayList<>();
ResolvedVehicleUsageInterval current = null;
for (ResolvedVehicleUsageInterval next : sorted) {
if (current == null) {
current = next;
continue;
}
if (canMerge(current, next)) {
current = current.merge(next);
} else {
merged.add(current);
current = next;
}
}
if (current != null) {
merged.add(current);
}
return merged;
}
List<EsperSupportEventDto> condenseSupportEvents(List<EsperSupportEventDto> supportEvents) {
if (supportEvents == null || supportEvents.isEmpty()) {
return List.of();
}
record VehicleIntervalSeed(
EsperSupportEventDto insertEvent,
EsperSupportEventDto withdrawEvent
) {
}
java.util.Map<String, VehicleIntervalSeed> seedsByKey = new java.util.LinkedHashMap<>();
supportEvents.stream()
.filter(event -> "DRIVER_CARD".equals(event.eventDomain()))
.filter(event -> "CARD_VEHICLES_USED".equals(event.extractionCode()))
.filter(event -> "CARD_INSERTED".equals(event.eventType()) || "CARD_WITHDRAWN".equals(event.eventType()))
.filter(event -> event.sourceRowId() != null)
.forEach(event -> {
String key = event.extractionCode() + ":" + event.sourceRowId();
VehicleIntervalSeed current = seedsByKey.get(key);
EsperSupportEventDto insert = current == null ? null : current.insertEvent();
EsperSupportEventDto withdraw = current == null ? null : current.withdrawEvent();
if ("CARD_INSERTED".equals(event.eventType())) {
if (insert == null || event.occurredAt().isBefore(insert.occurredAt())) {
insert = event;
}
} else if (withdraw == null || event.occurredAt().isAfter(withdraw.occurredAt())) {
withdraw = event;
}
seedsByKey.put(key, new VehicleIntervalSeed(insert, withdraw));
});
List<ResolvedVehicleUsageInterval> mergedCardVehicleIntervals = mergeVehicleUsageIntervals(
buildVehicleUsageIntervals(supportEvents).stream()
.filter(interval -> "CARD_VEHICLES_USED".equals(interval.authoritativeSource()))
.toList()
);
Set<UUID> keptDriverCardEventIds = new LinkedHashSet<>();
for (ResolvedVehicleUsageInterval interval : mergedCardVehicleIntervals) {
EsperSupportEventDto earliestInsert = null;
EsperSupportEventDto latestWithdraw = null;
for (String sourceRowId : interval.sourceRowIds()) {
VehicleIntervalSeed seed = seedsByKey.get("CARD_VEHICLES_USED:" + sourceRowId);
if (seed == null) {
continue;
}
if (seed.insertEvent() != null
&& (earliestInsert == null || seed.insertEvent().occurredAt().isBefore(earliestInsert.occurredAt()))) {
earliestInsert = seed.insertEvent();
}
if (seed.withdrawEvent() != null
&& (latestWithdraw == null || seed.withdrawEvent().occurredAt().isAfter(latestWithdraw.occurredAt()))) {
latestWithdraw = seed.withdrawEvent();
}
}
if (earliestInsert != null) {
keptDriverCardEventIds.add(earliestInsert.eventId());
}
if (latestWithdraw != null) {
keptDriverCardEventIds.add(latestWithdraw.eventId());
}
}
return supportEvents.stream()
.filter(event -> !("DRIVER_CARD".equals(event.eventDomain())
&& "CARD_VEHICLES_USED".equals(event.extractionCode())
&& !keptDriverCardEventIds.contains(event.eventId())))
.sorted(Comparator.comparing(EsperSupportEventDto::occurredAt)
.thenComparing(EsperSupportEventDto::lifecycle, Comparator.nullsLast(String::compareTo))
.thenComparing(EsperSupportEventDto::eventDomain, Comparator.nullsLast(String::compareTo))
.thenComparing(EsperSupportEventDto::eventType, Comparator.nullsLast(String::compareTo)))
.toList();
}
DtiBoundaryVehicleDto resolveBoundaryVehicle(
OffsetDateTime boundary,
List<ResolvedVehicleUsageInterval> intervals
) {
return intervals.stream()
.map(interval -> intervalCandidate(boundary, interval))
.filter(Objects::nonNull)
.sorted(Comparator.comparing(VehicleBoundaryCandidate::score).reversed()
.thenComparing(VehicleBoundaryCandidate::deltaSeconds)
.thenComparing(candidate -> candidate.interval.startedAt(), Comparator.nullsLast(Comparator.reverseOrder())))
.map(candidate -> new DtiBoundaryVehicleDto(
candidate.interval.vehicleId(),
candidate.interval.vehicleRegistrationId(),
null,
candidate.interval.authoritativeSource(),
candidate.interval.confidence(),
candidate.interval.startedAt(),
candidate.interval.endedAt(),
candidate.interval.sourceRowIds()
))
.findFirst()
.orElse(null);
}
DtiBoundaryPositionDto resolveBoundaryPosition(
OffsetDateTime boundary,
DtiBoundaryVehicleDto boundaryVehicle,
List<EsperSupportEventDto> supportEvents,
int searchWindowMinutes
) {
long maxDeltaSeconds = Duration.ofMinutes(searchWindowMinutes).getSeconds();
return supportEvents.stream()
.filter(this::hasPosition)
.filter(event -> Math.abs(Duration.between(boundary, event.occurredAt()).getSeconds()) <= maxDeltaSeconds)
.sorted(Comparator
.comparing((EsperSupportEventDto event) -> vehicleMatch(event, boundaryVehicle)).reversed()
.thenComparing(event -> Math.abs(Duration.between(boundary, event.occurredAt()).getSeconds()))
.thenComparing((EsperSupportEventDto event) -> geoDomainPriority(event.eventDomain()), Comparator.reverseOrder())
.thenComparing(EsperSupportEventDto::occurredAt, Comparator.reverseOrder()))
.map(event -> toBoundaryPosition(boundary, boundaryVehicle, event))
.findFirst()
.orElse(null);
}
List<DtiBoundaryVicinityEventDto> resolveBoundaryVicinityEvents(
OffsetDateTime boundary,
List<EsperSupportEventDto> supportEvents,
int vicinityWindowMinutes
) {
long maxDeltaSeconds = Duration.ofMinutes(vicinityWindowMinutes).getSeconds();
return supportEvents.stream()
.filter(event -> Math.abs(Duration.between(boundary, event.occurredAt()).getSeconds()) <= maxDeltaSeconds)
.sorted(Comparator
.comparing((EsperSupportEventDto event) -> Math.abs(Duration.between(boundary, event.occurredAt()).getSeconds()))
.thenComparing((EsperSupportEventDto event) -> supportEventPriority(event.eventDomain()), Comparator.reverseOrder())
.thenComparing(EsperSupportEventDto::occurredAt, Comparator.reverseOrder()))
.limit(20)
.map(event -> new DtiBoundaryVicinityEventDto(
event.occurredAt(),
event.eventDomain(),
event.eventType(),
event.lifecycle(),
event.sourceKind(),
event.extractionCode(),
event.vehicleId(),
event.vehicleRegistrationId(),
toGeoPoint(event),
event.country(),
event.region(),
event.countryFrom(),
event.countryTo(),
event.operation(),
Math.abs(Duration.between(boundary, event.occurredAt()).getSeconds()),
event.sourceRowId()
))
.toList();
}
private VehicleBoundaryCandidate intervalCandidate(OffsetDateTime boundary, ResolvedVehicleUsageInterval interval) {
if (interval.startedAt() == null) {
return null;
}
boolean covers = !interval.startedAt().isAfter(boundary) && (interval.endedAt() == null || interval.endedAt().isAfter(boundary) || interval.endedAt().isEqual(boundary));
long deltaSeconds;
int baseScore;
if (covers) {
deltaSeconds = 0;
baseScore = interval.priority() + 200;
} else if (interval.endedAt() != null && interval.endedAt().isBefore(boundary)) {
deltaSeconds = Duration.between(interval.endedAt(), boundary).getSeconds();
baseScore = interval.priority();
} else {
deltaSeconds = Math.abs(Duration.between(boundary, interval.startedAt()).getSeconds());
baseScore = interval.priority() - 25;
}
if (deltaSeconds > Duration.ofHours(24).getSeconds()) {
return null;
}
return new VehicleBoundaryCandidate(interval, baseScore, deltaSeconds);
}
private boolean canMerge(ResolvedVehicleUsageInterval left, ResolvedVehicleUsageInterval right) {
if (!Objects.equals(left.driverId(), right.driverId())) {
return false;
}
if (!Objects.equals(left.vehicleId(), right.vehicleId())) {
return false;
}
boolean midnightCardVehiclesUsedContinuation =
"CARD_VEHICLES_USED".equals(left.authoritativeSource())
&& "CARD_VEHICLES_USED".equals(right.authoritativeSource())
&& left.endedAt() != null
&& right.startedAt() != null
&& Duration.between(left.endedAt(), right.startedAt()).getSeconds() == 1;
if (!Objects.equals(left.vehicleRegistrationId(), right.vehicleRegistrationId())
&& !midnightCardVehiclesUsedContinuation) {
return false;
}
if (left.endedAt() == null || right.startedAt() == null) {
return true;
}
return !right.startedAt().isAfter(left.endedAt().plusSeconds(60));
}
private boolean hasPosition(EsperSupportEventDto event) {
return event.latitude() != null && event.longitude() != null;
}
private boolean vehicleMatch(EsperSupportEventDto event, DtiBoundaryVehicleDto boundaryVehicle) {
if (boundaryVehicle == null) {
return false;
}
return Objects.equals(event.vehicleId(), boundaryVehicle.vehicleId())
|| (boundaryVehicle.vehicleRegistrationId() != null
&& Objects.equals(event.vehicleRegistrationId(), boundaryVehicle.vehicleRegistrationId()));
}
private DtiBoundaryPositionDto toBoundaryPosition(
OffsetDateTime boundary,
DtiBoundaryVehicleDto boundaryVehicle,
EsperSupportEventDto event
) {
int confidence = geoDomainPriority(event.eventDomain()) + (vehicleMatch(event, boundaryVehicle) ? 100 : 0);
return new DtiBoundaryPositionDto(
event.occurredAt(),
toGeoPoint(event),
event.eventDomain(),
event.eventType(),
event.sourceKind(),
event.extractionCode(),
event.vehicleId(),
event.vehicleRegistrationId(),
event.country(),
event.region(),
event.countryFrom(),
event.countryTo(),
event.operation(),
Math.abs(Duration.between(boundary, event.occurredAt()).getSeconds()),
confidence,
event.sourceRowId()
);
}
private GeoPointDto toGeoPoint(EsperSupportEventDto event) {
if (!hasPosition(event)) {
return null;
}
return new GeoPointDto(event.latitude(), event.longitude());
}
private int geoDomainPriority(String eventDomain) {
return switch (eventDomain) {
case "POSITION" -> 400;
case "PLACE" -> 350;
case "BORDER_CROSSING" -> 300;
case "LOAD_UNLOAD" -> 250;
case "SPECIFIC_CONDITION" -> 200;
default -> 100;
};
}
private int supportEventPriority(String eventDomain) {
return switch (eventDomain) {
case "DRIVER_CARD" -> 500;
case "POSITION" -> 450;
case "PLACE" -> 400;
case "BORDER_CROSSING" -> 350;
case "LOAD_UNLOAD" -> 300;
case "SPECIFIC_CONDITION" -> 250;
case "SPEEDING" -> 200;
default -> 100;
};
}
private int sourcePriority(String extractionCode) {
return "IW_CYCLE".equals(extractionCode) ? 1000 : "CARD_VEHICLES_USED".equals(extractionCode) ? 900 : 100;
}
private int sourceConfidence(String extractionCode) {
return "IW_CYCLE".equals(extractionCode) ? 100 : "CARD_VEHICLES_USED".equals(extractionCode) ? 90 : 50;
}
private List<String> collectSourceRowIds(EsperSupportEventDto first, EsperSupportEventDto second) {
Set<String> ids = new LinkedHashSet<>();
if (first != null && first.sourceRowId() != null) {
ids.add(first.sourceRowId());
}
if (second != null && second.sourceRowId() != null) {
ids.add(second.sourceRowId());
}
return List.copyOf(ids);
}
private List<String> notes(EsperDtiEnrichmentRequest request) {
return List.of(
"Pure DTI intervals come from operating-period driving interruption evaluation, including before-first and after-last significant driving intervals.",
"Vehicle evidence prefers IW_CYCLE over CARD_VEHICLES_USED when both describe the same time span.",
"Boundary geo selection prefers POSITION, then PLACE, then BORDER_CROSSING and LOAD_UNLOAD.",
"Vicinity events include non-activity tachograph events around each DTI boundary.",
"Vehicle lookback window is " + request.vehicleEvidenceLookbackHours() + " hours."
);
}
record ResolvedVehicleUsageInterval(
UUID driverId,
UUID driverCardId,
UUID vehicleId,
UUID vehicleRegistrationId,
String authoritativeSource,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
int priority,
int confidence,
List<String> sourceRowIds
) {
ResolvedVehicleUsageInterval merge(ResolvedVehicleUsageInterval other) {
OffsetDateTime mergedStart = startedAt == null ? other.startedAt : other.startedAt == null ? startedAt
: startedAt.isBefore(other.startedAt) ? startedAt : other.startedAt;
OffsetDateTime mergedEnd;
if (endedAt == null || other.endedAt == null) {
mergedEnd = null;
} else {
mergedEnd = endedAt.isAfter(other.endedAt) ? endedAt : other.endedAt;
}
String source = priority >= other.priority ? authoritativeSource : other.authoritativeSource;
int mergedPriority = Math.max(priority, other.priority);
int mergedConfidence = Math.max(confidence, other.confidence);
LinkedHashSet<String> mergedIds = new LinkedHashSet<>(sourceRowIds);
mergedIds.addAll(other.sourceRowIds);
return new ResolvedVehicleUsageInterval(
driverId,
driverCardId != null ? driverCardId : other.driverCardId,
vehicleId,
vehicleRegistrationId != null ? vehicleRegistrationId : other.vehicleRegistrationId,
source,
mergedStart,
mergedEnd,
mergedPriority,
mergedConfidence,
List.copyOf(mergedIds)
);
}
}
record VehicleBoundaryCandidate(
ResolvedVehicleUsageInterval interval,
int score,
long deltaSeconds
) {
}
record PureDtiInterval(
String dtiId,
UUID driverId,
String intervalKind,
OffsetDateTime startedAt,
OffsetDateTime endedAt,
long durationSeconds,
long operatingPeriodNo,
OffsetDateTime operatingPeriodStartedAt,
String previousDrivingSourceRowId,
String nextDrivingSourceRowId
) {
}
}

View File

@ -6,6 +6,7 @@ import at.procon.eventhub.esperpoc.dto.DrivingInterruptionDto;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto;
import at.procon.eventhub.esperpoc.dto.EsperSourceSelectionMode;
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
import at.procon.eventhub.esperpoc.dto.NonDrivingIntervalDto;
import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto;
@ -68,6 +69,7 @@ public class EsperOperatingPeriodEvaluationService {
Duration significantDrivingThreshold = Duration.ofMinutes(resolveSignificantDrivingMinutes(request));
Duration mergeGapTolerance = Duration.ofSeconds(resolveMergeGapSeconds(request));
Duration gapDetectionTolerance = Duration.ofSeconds(resolveGapDetectionToleranceSeconds(request));
EsperSourceSelectionMode sourceSelectionMode = resolveSourceSelectionMode(request);
EsperUnknownTreatmentMode unknownTreatmentMode = resolveUnknownTreatmentMode(request);
EsperOperatingPeriodEngineMode engineMode = resolveEngineMode(request);
@ -82,9 +84,14 @@ public class EsperOperatingPeriodEvaluationService {
List<RawActivityEventDto> driverCardRawEvents = rawEvents.stream()
.filter(event -> "DRIVER_CARD".equals(event.sourceKind()))
.toList();
List<RawActivityEventDto> vehicleUnitRawEvents = rawEvents.stream()
List<RawActivityEventDto> vehicleUnitRawEvents = sourceSelectionMode == EsperSourceSelectionMode.DRIVER_CARD_ONLY
? List.of()
: rawEvents.stream()
.filter(event -> "VEHICLE_UNIT".equals(event.sourceKind()))
.toList();
List<RawActivityEventDto> selectedRawEvents = sourceSelectionMode == EsperSourceSelectionMode.DRIVER_CARD_ONLY
? driverCardRawEvents
: rawEvents;
long cardIntervalsStartedNanos = System.nanoTime();
List<ActivityIntervalDto> driverCardRawIntervals = activityEngine.buildIntervals(driverCardRawEvents);
@ -94,7 +101,9 @@ public class EsperOperatingPeriodEvaluationService {
long vuIntervalsElapsedMs = elapsedMillis(vuIntervalsStartedNanos);
long vuGapFillStartedNanos = System.nanoTime();
List<ActivityIntervalDto> resolvedKnownLoadedIntervals = resolveVuFillGaps(driverCardRawIntervals, vehicleUnitRawIntervals);
List<ActivityIntervalDto> resolvedKnownLoadedIntervals = sourceSelectionMode == EsperSourceSelectionMode.DRIVER_CARD_ONLY
? driverCardRawIntervals
: resolveVuFillGaps(driverCardRawIntervals, vehicleUnitRawIntervals);
long vuGapFillElapsedMs = elapsedMillis(vuGapFillStartedNanos);
long unknownGapStartedNanos = System.nanoTime();
@ -153,16 +162,17 @@ public class EsperOperatingPeriodEvaluationService {
);
long totalElapsedMs = elapsedMillis(startedNanos);
log.info("Esper operating-period evaluation tenant={} driverId={} requestedFrom={} requestedTo={} loadedFrom={} loadedTo={} unknownMode={} engineMode={} rawEvents={} cardRawEvents={} vuRawEvents={} cardIntervals={} vuIntervals={} resolvedKnownIntervals={} evaluationIntervals={} periodizedIntervals={} mergedIntervals={} nonDrivingIntervals={} operatingPeriods={} timingsMs={{dbRetrieve={}, cardIntervalEsper={}, vuIntervalEsper={}, vuGapFill={}, synthUnknown={}, periodizeEsper={}, merge={}, nonDriving={}, total={}}}",
log.info("Esper operating-period evaluation tenant={} driverId={} requestedFrom={} requestedTo={} loadedFrom={} loadedTo={} sourceSelectionMode={} unknownMode={} engineMode={} rawEvents={} cardRawEvents={} vuRawEvents={} cardIntervals={} vuIntervals={} resolvedKnownIntervals={} evaluationIntervals={} periodizedIntervals={} mergedIntervals={} nonDrivingIntervals={} operatingPeriods={} timingsMs={{dbRetrieve={}, cardIntervalEsper={}, vuIntervalEsper={}, vuGapFill={}, synthUnknown={}, periodizeEsper={}, merge={}, nonDriving={}, total={}}}",
request.tenantKey(),
request.driverId(),
requestedFrom,
requestedTo,
loadedFrom,
loadedTo,
sourceSelectionMode,
unknownTreatmentMode,
engineMode,
rawEvents.size(),
selectedRawEvents.size(),
driverCardRawEvents.size(),
vehicleUnitRawEvents.size(),
driverCardRawIntervals.size(),
@ -190,7 +200,7 @@ public class EsperOperatingPeriodEvaluationService {
requestedTo,
loadedFrom,
loadedTo,
rawEvents.size(),
selectedRawEvents.size(),
driverCardRawEvents.size(),
vehicleUnitRawEvents.size(),
driverCardRawIntervals.size(),
@ -205,9 +215,10 @@ public class EsperOperatingPeriodEvaluationService {
resolveSignificantDrivingMinutes(request),
resolveMergeGapSeconds(request),
resolveGapDetectionToleranceSeconds(request),
sourceSelectionMode,
unknownTreatmentMode,
engineMode,
rawEvents,
selectedRawEvents,
resolvedKnownLoadedIntervals,
evaluationLoadedIntervals,
periodizedIntervals,
@ -219,7 +230,8 @@ public class EsperOperatingPeriodEvaluationService {
unknownTreatmentMode,
resolveOperatingSplitIdleHours(request),
resolveSignificantDrivingMinutes(request),
resolveGapDetectionToleranceSeconds(request)
resolveGapDetectionToleranceSeconds(request),
sourceSelectionMode
)
);
}
@ -723,15 +735,21 @@ public class EsperOperatingPeriodEvaluationService {
: properties.getEsperPoc().getOperatingPeriodEvaluation().getEngineMode();
}
private EsperSourceSelectionMode resolveSourceSelectionMode(EsperOperatingPeriodRequest request) {
return request.sourceSelectionMode() == null ? EsperSourceSelectionMode.MIXED : request.sourceSelectionMode();
}
private List<String> notes(
EsperOperatingPeriodEngineMode engineMode,
EsperUnknownTreatmentMode unknownTreatmentMode,
int operatingSplitIdleHours,
int significantDrivingMinutes,
int gapDetectionToleranceSeconds
int gapDetectionToleranceSeconds,
EsperSourceSelectionMode sourceSelectionMode
) {
return List.of(
"This endpoint runs in parallel to the existing working-shift PoC and does not change its semantics.",
"Source selection mode is " + sourceSelectionMode + ".",
"BREAK_REST events are ignored for activity evaluation but still prevent synthetic UNKNOWN intervals from being created over covered rest spans.",
"Synthetic UNKNOWN intervals are created only for uncovered gaps between non-rest activities.",
"UNKNOWN treatment mode is " + unknownTreatmentMode + ".",

View File

@ -2,7 +2,12 @@ package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubEventBatchDto;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventHubPackageResult;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportScopeDto;
@ -10,14 +15,17 @@ import at.procon.eventhub.dto.SourceGroupType;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.service.EventHubIngestionService;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ExtractionBatchResultDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.camel.ProducerTemplate;
@ -40,6 +48,7 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
private static final int PROGRESS_LOG_INTERVAL = 5000;
private final NamedParameterJdbcTemplate jdbcTemplate;
private final EventHubIngestionService ingestionService;
private final ProducerTemplate producerTemplate;
private final ResourceLoader resourceLoader;
private final ImportCursorRepository importCursorRepository;
@ -50,6 +59,7 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
public JdbcYellowFoxD8BookingExtractionBatchExecutor(
@Qualifier("yellowFoxNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate,
EventHubIngestionService ingestionService,
ProducerTemplate producerTemplate,
ResourceLoader resourceLoader,
ImportCursorRepository importCursorRepository,
@ -59,6 +69,7 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector
) {
this.jdbcTemplate = jdbcTemplate;
this.ingestionService = ingestionService;
this.producerTemplate = producerTemplate;
this.resourceLoader = resourceLoader;
this.importCursorRepository = importCursorRepository;
@ -84,12 +95,15 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk);
ImportCursorStateDto cursor = findCursor(eventSourceId, request, planItem);
QuerySpec query = buildQuerySpec(request, chunkScope, cursor);
EventHubPackageRequest packageInfo = packageInfo(importRunId, request, planItem, chunk, chunkScope);
Stats stats = new Stats();
List<EventHubEventDto> pendingEvents = new ArrayList<>(jdbcPersistBatchSize());
YellowFoxD8IgnitionTransitionDetector.Session ignitionSession = ignitionTransitionDetector
.newSession(properties.getYellowFox().isEmitInitialIgnitionSnapshot());
log.info("Reading YellowFox D8 bookings tenant={} importRunId={} packageId={} chunk={} occurredFrom={} occurredTo={} fleetId={} strategy={}",
request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), query.fleetId(), request.acquisitionStrategy());
log.info("Reading YellowFox D8 bookings tenant={} importRunId={} packageId={} chunk={} occurredFrom={} occurredTo={} fleetId={} strategy={} ingestMode={}",
request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), query.fleetId(),
request.acquisitionStrategy(), jdbcExtractionIngestMode());
jdbcTemplate.query(query.sql(), query.params(), rs -> {
stats.sourceRowsRead++;
@ -105,27 +119,33 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
return;
}
EventHubEventDto primaryEvent = eventMapper.map(booking);
send(primaryEvent, stats);
pendingEvents.add(primaryEvent);
stats.acceptEvent(primaryEvent);
EventHubEventDto ignitionEvent = ignitionSession.detect(booking);
if (ignitionEvent != null) {
send(ignitionEvent, stats);
pendingEvents.add(ignitionEvent);
stats.acceptEvent(ignitionEvent);
}
if (pendingEvents.size() >= jdbcPersistBatchSize()) {
flushPersistBatch(request, importRunId, packageId, planItem, chunk, packageInfo, pendingEvents, stats);
}
if (stats.sourceRowsRead % PROGRESS_LOG_INTERVAL == 0) {
log.info("YellowFox D8 extraction progress tenant={} importRunId={} packageId={} rows={} events={} byType={}",
request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsSent, stats.eventTypeCounts);
log.info("YellowFox D8 extraction progress tenant={} importRunId={} packageId={} rows={} events={} inserted={} byType={}",
request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsMapped, stats.eventsInserted, stats.eventTypeCounts);
}
});
flushPersistBatch(request, importRunId, packageId, planItem, chunk, packageInfo, pendingEvents, stats);
log.info("Finished YellowFox D8 extraction tenant={} importRunId={} packageId={} rows={} events={} skippedRows={} byType={}",
request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsSent, stats.skippedRows, stats.eventTypeCounts);
log.info("Finished YellowFox D8 extraction tenant={} importRunId={} packageId={} rows={} events={} inserted={} skippedRows={} byType={}",
request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsMapped, stats.eventsInserted, stats.skippedRows, stats.eventTypeCounts);
return new YellowFoxD8ExtractionBatchResultDto(
packageId,
planItem.extractionCode(),
planItem.sourceKind(),
stats.sourceRowsRead,
stats.eventsSent,
stats.eventsSent,
stats.eventsMapped,
stats.eventsInserted,
stats.skippedRows,
true,
null,
@ -136,6 +156,14 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
);
}
EventHubProperties.JdbcExtractionIngestMode jdbcExtractionIngestMode() {
return properties.getYellowFox().getJdbcExtractionIngestMode();
}
int jdbcPersistBatchSize() {
return Math.max(1, properties.getBatch().getCompletionSize());
}
QuerySpec buildQuerySpec(YellowFoxD8ImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) {
Map<String, Object> params = new HashMap<>();
StringBuilder filters = new StringBuilder("where 1 = 1");
@ -211,9 +239,125 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
|| (booking.vehicleRef() != null && booking.vehicleRef().hasAnyReference());
}
private void send(EventHubEventDto event, Stats stats) {
producerTemplate.sendBody("direct:eventhub-normalized-input", event);
stats.acceptEvent(event);
private void flushPersistBatch(
YellowFoxD8ImportRequest request,
UUID importRunId,
UUID extractionPackageId,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
EventHubPackageRequest packageInfo,
List<EventHubEventDto> pendingEvents,
Stats stats
) {
if (pendingEvents.isEmpty()) {
return;
}
int batchNo = stats.nextPersistBatchNo();
List<EventHubEventDto> eventsToPersist = List.copyOf(pendingEvents);
pendingEvents.clear();
EventHubEventBatchDto batch = new EventHubEventBatchDto(
packageInfo.externalPackageId() + ":JDBC-" + batchNo,
packageInfo,
DataPackageType.DB_EXTRACT,
occurredFrom(eventsToPersist, chunk),
occurredTo(eventsToPersist, chunk),
eventsToPersist,
persistBatchMetadata(request, importRunId, extractionPackageId, planItem, chunk, batchNo, eventsToPersist)
);
EventHubPackageResult result = persistBatch(batch);
stats.acceptPersistResult(result);
log.info("Persisted YellowFox extraction batch tenant={} importRunId={} extractionPackageId={} extractionCode={} sourceKind={} chunk={} batchNo={} received={} inserted={}",
request.tenantKey(), importRunId, extractionPackageId, planItem.extractionCode(), planItem.sourceKind(),
chunk.sequence(), batchNo, result.receivedCount(), result.insertedCount());
}
EventHubPackageResult persistBatch(EventHubEventBatchDto batch) {
if (jdbcExtractionIngestMode() == EventHubProperties.JdbcExtractionIngestMode.CAMEL_ROUTE) {
return producerTemplate.requestBody("direct:eventhub-batch-persist-input", batch, EventHubPackageResult.class);
}
return ingestionService.ingest(batch);
}
private Map<String, Object> persistBatchMetadata(
YellowFoxD8ImportRequest request,
UUID importRunId,
UUID extractionPackageId,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
int batchNo,
List<EventHubEventDto> events
) {
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("ingestMode", jdbcExtractionIngestMode().name());
metadata.put("importRunId", importRunId.toString());
metadata.put("extractionPackageId", extractionPackageId.toString());
metadata.put("tenantKey", request.tenantKey());
metadata.put("mode", request.mode().name());
metadata.put("acquisitionStrategy", request.acquisitionStrategy().name());
metadata.put("eventFamily", planItem.eventFamily().name());
metadata.put("sourceKind", planItem.sourceKind());
metadata.put("extractionCode", planItem.extractionCode());
metadata.put("entityAxis", planItem.entityAxis());
metadata.put("chunkSequence", chunk.sequence());
metadata.put("chunkOccurredFrom", chunk.occurredFrom() == null ? null : chunk.occurredFrom().toString());
metadata.put("chunkOccurredTo", chunk.occurredTo() == null ? null : chunk.occurredTo().toString());
metadata.put("batchNo", batchNo);
metadata.put("receivedEventCount", events.size());
metadata.put("slotPolicy", "eventtype 0/2 = DRIVER, eventtype 1/3 = CO_DRIVER");
metadata.put("ignitionPolicy", "Store ignition state on every D8 detail; emit separate ignition event only on state change.");
return metadata;
}
private OffsetDateTime occurredFrom(List<EventHubEventDto> events, ImportTimeChunkDto chunk) {
if (chunk.occurredFrom() != null) {
return chunk.occurredFrom();
}
return events.stream()
.map(EventHubEventDto::occurredAt)
.filter(value -> value != null)
.min(OffsetDateTime::compareTo)
.orElse(null);
}
private OffsetDateTime occurredTo(List<EventHubEventDto> events, ImportTimeChunkDto chunk) {
if (chunk.occurredTo() != null) {
return chunk.occurredTo();
}
return events.stream()
.map(EventHubEventDto::occurredAt)
.filter(value -> value != null)
.max(OffsetDateTime::compareTo)
.orElse(null);
}
private EventHubPackageRequest packageInfo(
UUID importRunId,
YellowFoxD8ImportRequest request,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
ImportScopeDto chunkScope
) {
return new EventHubPackageRequest(
request.tenantKey(),
eventSourceFor(request, planItem),
request.sourceGroup(),
chunkScope,
planItem.eventFamily().name(),
chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(),
"YELLOWFOX_D8:" + planItem.sourceKind() + ":" + planItem.extractionCode() + ":RUN-" + importRunId + ":CHUNK-" + chunk.sequence()
);
}
private EventSourceDto eventSourceFor(YellowFoxD8ImportRequest request, ImportPlanItemDto planItem) {
return new EventSourceDto(
"YELLOWFOX",
planItem.sourceKind(),
"YELLOWFOX_D8",
request.eventSource().sourceInstanceKey(),
request.eventSource().tenantProviderSettingKey(),
request.eventSource().externalFleetKey()
);
}
private void appendCursorFilter(
@ -282,11 +426,13 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
private static class Stats {
private int sourceRowsRead;
private int eventsSent;
private int eventsMapped;
private int eventsInserted;
private int skippedRows;
private OffsetDateTime lastOccurredAt;
private String lastEventId;
private final Map<String, Integer> eventTypeCounts = new LinkedHashMap<>();
private int persistBatchNo;
private void acceptSourceRow(YellowFoxD8BookingDto booking) {
if (booking.occurredAt() != null) {
@ -296,10 +442,21 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
}
private void acceptEvent(EventHubEventDto event) {
eventsSent++;
eventsMapped++;
EventType type = event.eventType();
String key = event.eventDomain().name() + "/" + (type == null ? "UNKNOWN" : type.name());
eventTypeCounts.merge(key, 1, Integer::sum);
}
private int nextPersistBatchNo() {
persistBatchNo++;
return persistBatchNo;
}
private void acceptPersistResult(EventHubPackageResult result) {
if (result != null) {
eventsInserted += result.insertedCount();
}
}
}
}

View File

@ -80,7 +80,7 @@ eventhub:
import-plans:
- plan-key: tachograph-org-14708
enabled: false
cron: "0 15 * * * *" # hourly at minute 15
cron: "" #"0 15 * * * *" # hourly at minute 15
tenant-key: Procon
event-source:
provider-key: TACHOGRAPH
@ -113,13 +113,13 @@ eventhub:
- SPECIFIC_CONDITION
- SPEEDING
initial-mode: INITIAL_BACKFILL
scheduled-mode: INCREMENTAL_UPDATE
scheduled-mode: INITIAL_BACKFILL # INITIAL_BACKFILL INCREMENTAL_UPDATE
initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP
scheduled-strategy: SOURCE_PACKAGE_WATERMARK
refresh-master-data-first: false
refresh-master-data-first: true
initial-occurred-from: "2026-04-01T00:00:00+01:00"
initial-occurred-to: "2026-04-10T00:00:00+01:00"
run-initial-on-startup: false
run-initial-on-startup: true
esper-poc:
activity-merge-mode: JAVA
@ -138,20 +138,25 @@ eventhub:
emit-initial-ignition-snapshot: false
sync-vehicle-registrations-on-master-data-update: false
# JDBC extraction handoff mode:
# SYNC_DIRECT = persist controlled JDBC batches directly, cursor-safe default.
# CAMEL_ROUTE = persist the same controlled batches through direct:eventhub-batch-persist-input.
jdbc-extraction-ingest-mode: ${YELLOWFOX_JDBC_EXTRACTION_INGEST_MODE:SYNC_DIRECT}
datasource:
jdbc-url: ${YELLOWFOX_DB_JDBC_URL:}
username: ${YELLOWFOX_DB_USERNAME:}
password: ${YELLOWFOX_DB_PASSWORD:}
driver-class-name: org.postgresql.Driver
scheduler-enabled: true
scheduler-enabled: false
scheduler-poll-interval-ms: 60000
scheduler-trigger-mode: EXECUTE
import-plans:
- plan-key: yellowfox-d8-default
enabled: true
cron: "0 */5 * * * *"
enabled: false
cron: "" #"0 */5 * * * *"
tenant-key: Procon
event-source:
provider-key: YELLOWFOX

View File

@ -0,0 +1,59 @@
package at.procon.eventhub.esperpoc.api;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import at.procon.eventhub.esperpoc.dto.EsperDtiEnrichmentResultDto;
import at.procon.eventhub.esperpoc.service.EsperDtiEnrichmentService;
import at.procon.eventhub.esperpoc.service.EsperOperatingPeriodEvaluationService;
import at.procon.eventhub.esperpoc.service.EsperPocDriverCardActivityService;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
class EsperPocControllerTest {
@Test
void exposesDtiEnrichmentEndpoint() throws Exception {
EsperPocDriverCardActivityService activityService = org.mockito.Mockito.mock(EsperPocDriverCardActivityService.class);
EsperOperatingPeriodEvaluationService operatingService = org.mockito.Mockito.mock(EsperOperatingPeriodEvaluationService.class);
EsperDtiEnrichmentService enrichmentService = org.mockito.Mockito.mock(EsperDtiEnrichmentService.class);
EsperPocController controller = new EsperPocController(activityService, operatingService, enrichmentService);
MockMvc mockMvc = MockMvcBuilders.standaloneSetup(controller).build();
when(enrichmentService.evaluate(any())).thenReturn(new EsperDtiEnrichmentResultDto(
"default",
UUID.fromString("00000000-0000-0000-0000-000000000123"),
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
OffsetDateTime.parse("2026-04-02T00:00:00Z"),
OffsetDateTime.parse("2026-03-31T00:00:00Z"),
OffsetDateTime.parse("2026-04-03T00:00:00Z"),
OffsetDateTime.parse("2026-03-01T00:00:00Z"),
OffsetDateTime.parse("2026-04-03T00:00:00Z"),
1,
2,
1,
180,
180,
720,
List.of(),
List.of("note")
));
mockMvc.perform(get("/api/eventhub/esper-poc/tachograph/dti-enrichment")
.param("tenantKey", "default")
.param("driverId", "00000000-0000-0000-0000-000000000123")
.param("occurredFrom", "2026-04-01T00:00:00Z")
.param("occurredTo", "2026-04-02T00:00:00Z"))
.andExpect(status().isOk())
.andExpect(jsonPath("$.tenantKey").value("default"))
.andExpect(jsonPath("$.pureDtiCount").value(1))
.andExpect(jsonPath("$.vehicleUsageIntervalCount").value(1));
}
}

View File

@ -0,0 +1,320 @@
package at.procon.eventhub.esperpoc.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.esperpoc.dto.DtiBoundaryPositionDto;
import at.procon.eventhub.esperpoc.dto.DtiBoundaryVehicleDto;
import at.procon.eventhub.esperpoc.dto.DtiBoundaryVicinityEventDto;
import at.procon.eventhub.esperpoc.dto.DrivingInterruptionDto;
import at.procon.eventhub.esperpoc.dto.EnrichedDtiIntervalDto;
import at.procon.eventhub.esperpoc.dto.EsperDtiEnrichmentRequest;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodResultDto;
import at.procon.eventhub.esperpoc.dto.EsperSourceSelectionMode;
import at.procon.eventhub.esperpoc.dto.EsperSupportEventDto;
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
import at.procon.eventhub.esperpoc.dto.OperatingPeriodDto;
import at.procon.eventhub.esperpoc.dto.ShiftDrivingEvaluationDto;
import java.math.BigDecimal;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class EsperDtiEnrichmentServiceTest {
private final EsperDtiEnrichmentService service = new EsperDtiEnrichmentService(null, null);
@Test
void mergesVehicleIntervalsAndPrefersIwCycleAsAuthoritativeSource() {
UUID driverId = UUID.randomUUID();
UUID vehicleId = UUID.randomUUID();
UUID registrationId = UUID.randomUUID();
List<EsperDtiEnrichmentService.ResolvedVehicleUsageInterval> merged = service.mergeVehicleUsageIntervals(
service.buildVehicleUsageIntervals(List.of(
supportEvent(driverId, vehicleId, registrationId, "DRIVER_CARD", "CARD_INSERTED", "INSERT", "CARD_VEHICLES_USED", "cv1", "2026-04-01T08:00:00Z", null, null),
supportEvent(driverId, vehicleId, registrationId, "DRIVER_CARD", "CARD_WITHDRAWN", "WITHDRAW", "CARD_VEHICLES_USED", "cv1", "2026-04-01T12:00:00Z", null, null),
supportEvent(driverId, vehicleId, registrationId, "DRIVER_CARD", "CARD_INSERTED", "INSERT", "IW_CYCLE", "iw1", "2026-04-01T08:05:00Z", null, null),
supportEvent(driverId, vehicleId, registrationId, "DRIVER_CARD", "CARD_WITHDRAWN", "WITHDRAW", "IW_CYCLE", "iw1", "2026-04-01T11:55:00Z", null, null)
))
);
assertThat(merged).hasSize(1);
assertThat(merged.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T08:00:00Z"));
assertThat(merged.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T12:00:00Z"));
assertThat(merged.get(0).authoritativeSource()).isEqualTo("IW_CYCLE");
assertThat(merged.get(0).sourceRowIds()).containsExactly("cv1", "iw1");
}
@Test
void mergesCardVehiclesUsedAcrossMidnightBoundaryWhenCardRemainsInVehicle() {
UUID driverId = UUID.randomUUID();
UUID vehicleId = UUID.randomUUID();
UUID firstRegistrationId = UUID.randomUUID();
UUID secondRegistrationId = UUID.randomUUID();
List<EsperDtiEnrichmentService.ResolvedVehicleUsageInterval> merged = service.mergeVehicleUsageIntervals(
service.buildVehicleUsageIntervals(List.of(
supportEvent(driverId, vehicleId, firstRegistrationId, "DRIVER_CARD", "CARD_INSERTED", "INSERT", "CARD_VEHICLES_USED", "cv1", "2026-04-01T08:00:00Z", null, null),
supportEvent(driverId, vehicleId, firstRegistrationId, "DRIVER_CARD", "CARD_WITHDRAWN", "WITHDRAW", "CARD_VEHICLES_USED", "cv1", "2026-04-01T23:59:59Z", null, null),
supportEvent(driverId, vehicleId, secondRegistrationId, "DRIVER_CARD", "CARD_INSERTED", "INSERT", "CARD_VEHICLES_USED", "cv2", "2026-04-02T00:00:00Z", null, null),
supportEvent(driverId, vehicleId, secondRegistrationId, "DRIVER_CARD", "CARD_WITHDRAWN", "WITHDRAW", "CARD_VEHICLES_USED", "cv2", "2026-04-02T12:00:00Z", null, null)
))
);
assertThat(merged).hasSize(1);
assertThat(merged.get(0).authoritativeSource()).isEqualTo("CARD_VEHICLES_USED");
assertThat(merged.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T08:00:00Z"));
assertThat(merged.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-02T12:00:00Z"));
assertThat(merged.get(0).sourceRowIds()).containsExactly("cv1", "cv2");
}
@Test
void condensesConsecutiveCardVehiclesUsedSupportEventsToMergedBoundaryEvents() {
UUID driverId = UUID.randomUUID();
UUID vehicleId = UUID.randomUUID();
UUID firstRegistrationId = UUID.randomUUID();
UUID secondRegistrationId = UUID.randomUUID();
List<EsperSupportEventDto> condensed = service.condenseSupportEvents(List.of(
supportEvent(driverId, vehicleId, firstRegistrationId, "DRIVER_CARD", "CARD_INSERTED", "INSERT", "CARD_VEHICLES_USED", "cv1", "2026-04-01T08:00:00Z", null, null),
supportEvent(driverId, vehicleId, firstRegistrationId, "DRIVER_CARD", "CARD_WITHDRAWN", "WITHDRAW", "CARD_VEHICLES_USED", "cv1", "2026-04-01T23:59:59Z", null, null),
supportEvent(driverId, vehicleId, secondRegistrationId, "DRIVER_CARD", "CARD_INSERTED", "INSERT", "CARD_VEHICLES_USED", "cv2", "2026-04-02T00:00:00Z", null, null),
supportEvent(driverId, vehicleId, secondRegistrationId, "DRIVER_CARD", "CARD_WITHDRAWN", "WITHDRAW", "CARD_VEHICLES_USED", "cv2", "2026-04-02T12:00:00Z", null, null),
supportEvent(driverId, vehicleId, secondRegistrationId, "POSITION", "POSITION_RECORDED", "SNAPSHOT", "CARD_POSITION", "pos1", "2026-04-02T00:05:00Z", "48.2082", "16.3738")
));
assertThat(condensed).hasSize(3);
assertThat(condensed)
.extracting(EsperSupportEventDto::extractionCode, EsperSupportEventDto::eventType, EsperSupportEventDto::sourceRowId)
.containsExactly(
org.assertj.core.groups.Tuple.tuple("CARD_VEHICLES_USED", "CARD_INSERTED", "cv1"),
org.assertj.core.groups.Tuple.tuple("CARD_POSITION", "POSITION_RECORDED", "pos1"),
org.assertj.core.groups.Tuple.tuple("CARD_VEHICLES_USED", "CARD_WITHDRAWN", "cv2")
);
}
@Test
void resolvesBoundaryVehicleGeoAndVicinityFromSupportEvents() {
UUID driverId = UUID.randomUUID();
UUID vehicleId = UUID.randomUUID();
UUID registrationId = UUID.randomUUID();
OffsetDateTime start = OffsetDateTime.parse("2026-04-01T09:00:00Z");
OffsetDateTime end = OffsetDateTime.parse("2026-04-01T10:00:00Z");
List<EsperSupportEventDto> supportEvents = List.of(
supportEvent(driverId, vehicleId, registrationId, "DRIVER_CARD", "CARD_INSERTED", "INSERT", "IW_CYCLE", "iw1", "2026-04-01T08:00:00Z", null, null),
supportEvent(driverId, vehicleId, registrationId, "DRIVER_CARD", "CARD_WITHDRAWN", "WITHDRAW", "IW_CYCLE", "iw1", "2026-04-01T12:00:00Z", null, null),
supportEvent(driverId, vehicleId, registrationId, "POSITION", "POSITION_RECORDED", "SNAPSHOT", "VU_POSITION", "p1", "2026-04-01T09:01:00Z", "48.2082", "16.3738"),
supportEvent(driverId, vehicleId, registrationId, "LOAD_UNLOAD", "LOAD", "SNAPSHOT", "VU_LOAD_UNLOAD", "lu1", "2026-04-01T09:03:00Z", "48.2085", "16.3740"),
supportEvent(driverId, vehicleId, registrationId, "PLACE", "WORKING_DAY_PLACE_RECORDED", "END", "VU_PLACE", "pl1", "2026-04-01T09:59:00Z", "48.2090", "16.3750")
);
List<EsperDtiEnrichmentService.ResolvedVehicleUsageInterval> usageIntervals = service.mergeVehicleUsageIntervals(
service.buildVehicleUsageIntervals(supportEvents)
);
EnrichedDtiIntervalDto enriched = service.enrichInterval(
new EsperDtiEnrichmentService.PureDtiInterval(
"DTI-1",
driverId,
"BETWEEN_SIGNIFICANT_DRIVING",
start,
end,
3600,
1,
OffsetDateTime.parse("2026-04-01T08:00:00Z"),
"d-prev",
"d-next"
),
supportEvents,
usageIntervals,
new EsperDtiEnrichmentRequest(
"default",
driverId,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
OffsetDateTime.parse("2026-04-02T00:00:00Z"),
24,
7,
3,
0,
0,
EsperSourceSelectionMode.MIXED,
null,
null,
24 * 30,
180,
180
)
);
DtiBoundaryVehicleDto beginVehicle = enriched.beginVehicle();
assertThat(beginVehicle).isNotNull();
assertThat(beginVehicle.vehicleId()).isEqualTo(vehicleId);
assertThat(beginVehicle.resolutionSource()).isEqualTo("IW_CYCLE");
assertThat(enriched.intervalKind()).isEqualTo("BETWEEN_SIGNIFICANT_DRIVING");
DtiBoundaryPositionDto beginPosition = enriched.beginPosition();
assertThat(beginPosition).isNotNull();
assertThat(beginPosition.eventDomain()).isEqualTo("POSITION");
assertThat(beginPosition.position().latitude()).isEqualByComparingTo("48.2082");
DtiBoundaryPositionDto endPosition = enriched.endPosition();
assertThat(endPosition).isNotNull();
assertThat(endPosition.eventDomain()).isEqualTo("PLACE");
List<DtiBoundaryVicinityEventDto> beginVicinity = enriched.beginVicinityEvents();
assertThat(beginVicinity).extracting(DtiBoundaryVicinityEventDto::eventDomain)
.contains("DRIVER_CARD", "POSITION", "LOAD_UNLOAD");
}
@Test
void extractsPureDtiFromOperatingPeriodDrivingInterruptions() {
UUID driverId = UUID.randomUUID();
OffsetDateTime requestedFrom = OffsetDateTime.parse("2026-04-01T00:00:00Z");
OffsetDateTime requestedTo = OffsetDateTime.parse("2026-04-02T00:00:00Z");
EsperOperatingPeriodResultDto result = new EsperOperatingPeriodResultDto(
"default",
driverId,
requestedFrom,
requestedTo,
requestedFrom.minusHours(24),
requestedTo.plusHours(24),
0,
0,
0,
0,
0,
0,
0,
0,
0,
1,
1,
7,
3,
0,
0,
EsperSourceSelectionMode.MIXED,
EsperUnknownTreatmentMode.AS_BREAK_REST,
EsperOperatingPeriodEngineMode.STREAM_COLLECTOR,
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(),
List.of(new OperatingPeriodDto(
4,
OffsetDateTime.parse("2026-04-01T06:00:00Z"),
OffsetDateTime.parse("2026-04-01T18:00:00Z"),
12 * 3600L,
"FLUSH",
List.of(),
0,
0,
0,
0,
0,
0,
new ShiftDrivingEvaluationDto(
3,
OffsetDateTime.parse("2026-04-01T08:00:00Z"),
OffsetDateTime.parse("2026-04-01T16:00:00Z"),
activity(driverId, "2026-04-01T08:00:00Z", "2026-04-01T10:00:00Z", "d1"),
activity(driverId, "2026-04-01T14:00:00Z", "2026-04-01T16:00:00Z", "d2"),
List.of(new DrivingInterruptionDto(
OffsetDateTime.parse("2026-04-01T10:00:00Z"),
OffsetDateTime.parse("2026-04-01T14:00:00Z"),
14400,
"d1",
"d2"
))
),
false
)),
List.of("note")
);
List<EsperDtiEnrichmentService.PureDtiInterval> pureDtiIntervals = service.extractPureDtiIntervals(result);
assertThat(pureDtiIntervals).hasSize(3);
assertThat(pureDtiIntervals).extracting(EsperDtiEnrichmentService.PureDtiInterval::intervalKind)
.containsExactly("BEFORE_FIRST_SIGNIFICANT_DRIVING", "BETWEEN_SIGNIFICANT_DRIVING", "AFTER_LAST_SIGNIFICANT_DRIVING");
assertThat(pureDtiIntervals.get(0).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T06:00:00Z"));
assertThat(pureDtiIntervals.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T08:00:00Z"));
assertThat(pureDtiIntervals.get(0).previousDrivingSourceRowId()).isNull();
assertThat(pureDtiIntervals.get(0).nextDrivingSourceRowId()).isEqualTo("d1");
assertThat(pureDtiIntervals.get(1).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T10:00:00Z"));
assertThat(pureDtiIntervals.get(1).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T14:00:00Z"));
assertThat(pureDtiIntervals.get(1).operatingPeriodNo()).isEqualTo(4);
assertThat(pureDtiIntervals.get(1).previousDrivingSourceRowId()).isEqualTo("d1");
assertThat(pureDtiIntervals.get(1).nextDrivingSourceRowId()).isEqualTo("d2");
assertThat(pureDtiIntervals.get(2).startedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T16:00:00Z"));
assertThat(pureDtiIntervals.get(2).endedAt()).isEqualTo(OffsetDateTime.parse("2026-04-01T18:00:00Z"));
assertThat(pureDtiIntervals.get(2).previousDrivingSourceRowId()).isEqualTo("d2");
assertThat(pureDtiIntervals.get(2).nextDrivingSourceRowId()).isNull();
}
private at.procon.eventhub.esperpoc.dto.ActivityIntervalDto activity(
UUID driverId,
String from,
String to,
String sourceRowId
) {
return at.procon.eventhub.esperpoc.dto.ActivityIntervalDto.raw(
driverId,
null,
null,
"DRIVE",
"DRIVER",
"INSERTED",
"KNOWN",
"DRIVER_CARD",
OffsetDateTime.parse(from),
OffsetDateTime.parse(to),
sourceRowId
);
}
private EsperSupportEventDto supportEvent(
UUID driverId,
UUID vehicleId,
UUID registrationId,
String eventDomain,
String eventType,
String lifecycle,
String extractionCode,
String sourceRowId,
String occurredAt,
String latitude,
String longitude
) {
return new EsperSupportEventDto(
UUID.randomUUID(),
OffsetDateTime.parse(occurredAt),
sourceRowId,
extractionCode + ":" + sourceRowId + ":" + eventType,
extractionCode.startsWith("VU_") || "IW_CYCLE".equals(extractionCode) ? "VEHICLE_UNIT" : "DRIVER_CARD",
extractionCode,
driverId,
UUID.randomUUID(),
vehicleId,
registrationId,
eventDomain,
eventType,
lifecycle,
"DRIVER",
latitude == null ? null : new BigDecimal(latitude),
longitude == null ? null : new BigDecimal(longitude),
null,
null,
null,
null,
"LOAD".equals(eventType) ? "LOAD" : null
);
}
}

View File

@ -4,7 +4,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.esperpoc.dto.ActivityIntervalDto;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodEngineMode;
import at.procon.eventhub.esperpoc.dto.EsperOperatingPeriodRequest;
import at.procon.eventhub.esperpoc.dto.EsperSourceSelectionMode;
import at.procon.eventhub.esperpoc.dto.EsperUnknownTreatmentMode;
import at.procon.eventhub.esperpoc.dto.RawActivityEventDto;
import at.procon.eventhub.esperpoc.persistence.EsperPocActivityRepository;
import at.procon.eventhub.esperpoc.dto.NonDrivingIntervalDto;
import at.procon.eventhub.esperpoc.dto.OperatingPeriodActivityIntervalDto;
import java.time.Duration;
@ -12,6 +16,10 @@ import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class EsperOperatingPeriodEvaluationServiceTest {
@ -132,6 +140,46 @@ class EsperOperatingPeriodEvaluationServiceTest {
.isEqualTo(collectorEvaluation.closedPeriods());
}
@Test
void driverCardOnlyModeIgnoresVehicleUnitGapFill() {
UUID driverId = UUID.randomUUID();
EsperPocActivityRepository repository = mock(EsperPocActivityRepository.class);
EsperOperatingPeriodEvaluationService evaluationService = new EsperOperatingPeriodEvaluationService(
repository,
new EsperDriverActivityEngine(),
operatingPeriodEngine
);
when(repository.findDriverActivityEvents(eq("default"), eq(driverId), any(), any())).thenReturn(List.of(
raw(driverId, "DRIVER_CARD", "DRIVE", "START", "2026-04-01T08:00:00Z", "card-1"),
raw(driverId, "DRIVER_CARD", "DRIVE", "END", "2026-04-01T09:00:00Z", "card-1"),
raw(driverId, "VEHICLE_UNIT", "DRIVE", "START", "2026-04-01T09:00:00Z", "vu-1"),
raw(driverId, "VEHICLE_UNIT", "DRIVE", "END", "2026-04-01T10:00:00Z", "vu-1")
));
var result = evaluationService.evaluate(new EsperOperatingPeriodRequest(
"default",
driverId,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
OffsetDateTime.parse("2026-04-02T00:00:00Z"),
24,
7,
3,
0,
0,
EsperSourceSelectionMode.DRIVER_CARD_ONLY,
EsperUnknownTreatmentMode.AS_BREAK_REST,
EsperOperatingPeriodEngineMode.STREAM_COLLECTOR
));
assertThat(result.rawEventCount()).isEqualTo(2);
assertThat(result.driverCardRawEventCount()).isEqualTo(2);
assertThat(result.vehicleUnitRawEventCount()).isZero();
assertThat(result.driverCardIntervalCount()).isEqualTo(1);
assertThat(result.vehicleUnitIntervalCount()).isZero();
assertThat(result.resolvedKnownIntervalCount()).isEqualTo(1);
assertThat(result.rawEvents()).extracting(RawActivityEventDto::sourceKind).containsOnly("DRIVER_CARD");
}
private ActivityIntervalDto activity(
UUID driverId,
String activity,
@ -184,4 +232,31 @@ class EsperOperatingPeriodEvaluationServiceTest {
0L
);
}
private RawActivityEventDto raw(
UUID driverId,
String sourceKind,
String eventType,
String lifecycle,
String occurredAt,
String sourceRowId
) {
return new RawActivityEventDto(
UUID.randomUUID(),
OffsetDateTime.parse(occurredAt),
sourceRowId,
sourceKind + ":" + sourceRowId + ":" + lifecycle,
sourceKind,
"DRIVER_CARD".equals(sourceKind) ? "CARD_ACTIVITY" : "VU_ACTIVITY",
driverId,
null,
null,
eventType,
lifecycle,
"DRIVER",
"INSERTED",
"KNOWN",
null
);
}
}

View File

@ -76,6 +76,7 @@ class JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest {
private JdbcYellowFoxD8BookingExtractionBatchExecutor executor(ImportCursorRepository repository) {
EventHubProperties properties = new EventHubProperties();
return new JdbcYellowFoxD8BookingExtractionBatchExecutor(
null,
null,
null,
new DefaultResourceLoader(),

View File

@ -0,0 +1,113 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubEventBatchDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventHubPackageResult;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.service.EventHubIngestionService;
import java.time.OffsetDateTime;
import java.util.List;
import org.apache.camel.ProducerTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.DefaultResourceLoader;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class JdbcYellowFoxD8BookingExtractionBatchExecutorIngestModeTest {
@Test
void usesDirectIngestionServiceInSyncDirectMode() {
EventHubIngestionService ingestionService = mock(EventHubIngestionService.class);
ProducerTemplate producerTemplate = mock(ProducerTemplate.class);
JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(
EventHubProperties.JdbcExtractionIngestMode.SYNC_DIRECT,
ingestionService,
producerTemplate
);
EventHubEventBatchDto batch = batch();
EventHubPackageResult result = new EventHubPackageResult(null, batch.packageKey(), 2, 2);
when(ingestionService.ingest(batch)).thenReturn(result);
assertThat(executor.persistBatch(batch)).isEqualTo(result);
verify(ingestionService).ingest(batch);
}
@Test
void usesCamelBatchPersistRouteInCamelMode() {
EventHubIngestionService ingestionService = mock(EventHubIngestionService.class);
ProducerTemplate producerTemplate = mock(ProducerTemplate.class);
JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(
EventHubProperties.JdbcExtractionIngestMode.CAMEL_ROUTE,
ingestionService,
producerTemplate
);
EventHubEventBatchDto batch = batch();
EventHubPackageResult result = new EventHubPackageResult(null, batch.packageKey(), 2, 2);
when(producerTemplate.requestBody(
eq("direct:eventhub-batch-persist-input"),
eq(batch),
eq(EventHubPackageResult.class)
)).thenReturn(result);
assertThat(executor.persistBatch(batch)).isEqualTo(result);
verify(producerTemplate).requestBody(
"direct:eventhub-batch-persist-input",
batch,
EventHubPackageResult.class
);
}
private JdbcYellowFoxD8BookingExtractionBatchExecutor executor(
EventHubProperties.JdbcExtractionIngestMode ingestMode,
EventHubIngestionService ingestionService,
ProducerTemplate producerTemplate
) {
EventHubProperties properties = new EventHubProperties();
properties.getYellowFox().setJdbcExtractionIngestMode(ingestMode);
return new JdbcYellowFoxD8BookingExtractionBatchExecutor(
null,
ingestionService,
producerTemplate,
new DefaultResourceLoader(),
null,
properties,
null,
null,
null
);
}
private EventHubEventBatchDto batch() {
return new EventHubEventBatchDto(
"pkg-1",
new EventHubPackageRequest(
"tenant-1",
new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "instance-1", "setting-1", "7"),
null,
ImportScopeDto.tenantAll(
OffsetDateTime.parse("2026-04-01T00:00:00+02:00"),
OffsetDateTime.parse("2026-04-02T00:00:00+02:00")
),
"DRIVER_ACTIVITY",
null,
"external-1"
),
DataPackageType.DB_EXTRACT,
null,
null,
List.of(),
null
);
}
}

View File

@ -84,6 +84,7 @@ class JdbcYellowFoxD8BookingExtractionBatchExecutorTest {
EventHubProperties properties = new EventHubProperties();
properties.getYellowFox().setOccurredAtOverlap(overlap);
return new JdbcYellowFoxD8BookingExtractionBatchExecutor(
null,
null,
null,
new DefaultResourceLoader(),