Initial eventhub ingestion service

This commit is contained in:
trifonovt 2026-04-30 11:01:01 +02:00
commit 0a0e2dc615
56 changed files with 2668 additions and 0 deletions

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
target/
.idea/
*.iml
.classpath
.project
.settings/
.DS_Store

372
README.md Normal file
View File

@ -0,0 +1,372 @@
# EventHub Acquisition Service
Spring Boot + Apache Camel project skeleton for acquiring normalized EventHub point events from multiple providers/sources.
The current version intentionally focuses on **acquisition**. Final canonical storage/deduplication can be discussed later. The included PostgreSQL schema is a small acquisition-stage store so the project can be run and tested end-to-end.
## Architecture
```text
source-specific Camel input route
-> source-specific mapper
-> EventHubEventDto
-> common EventHub acquisition route
-> validation
-> package-key creation from tenant + EventSource + event family + date/window
-> aggregation / batching
-> chronological sorting inside the batch
-> acquisition package handoff
```
## Namespace
```text
at.procon.eventhub
```
## Main model decisions
### 1. One event = one time point
`EventHubEventDto` has exactly one timestamp:
```text
occurredAt
```
There is no generic `duration`, `endTime`, `validFrom`, or `validTo`. If a source row represents an interval, the mapper may emit separate point events, for example `DRIVE START` and `DRIVE END`.
### 2. Tenant is package-level
`tenantKey` identifies the owner/client/account for the package. It is required for acquisition grouping and future master-data resolution.
```json
{
"tenantKey": "kralowetz"
}
```
Organisation is not mandatory in the incoming event. It can later be derived from resolved driver/vehicle + `occurredAt`.
### 3. EventSource replaces sourceTable/sourceSystem
The acquisition context is represented by `EventSourceDto`:
```json
{
"providerKey": "TACHOGRAPH",
"sourceKind": "VEHICLE_UNIT",
"sourceKey": "TACHOGRAPH_VEHICLE_UNIT",
"sourceInstanceKey": "main-tachograph-db",
"tenantProviderSettingKey": "kralowetz-tachograph-prod",
"externalFleetKey": null
}
```
Examples:
```text
TACHOGRAPH / VEHICLE_UNIT
TACHOGRAPH / DRIVER_CARD
YELLOWFOX / TELEMATICS_PLATFORM / YELLOWFOX_D8
FLEETBOARD / TELEMATICS_PLATFORM / FLEETBOARD_POSITION
```
`EventSource` is acquisition context. It should not be part of the canonical real-world event identity. A VU event and a driver-card event may describe the same real event.
### 4. Source-side master references, no incoming internal IDs
The incoming DTO does not require internal `driverId` or `vehicleId`, because in normal ingestion those ids are not known yet.
Driver reference:
```json
"driverRef": {
"sourceEntityId": "driver-100",
"driverCard": {
"nation": "AT",
"number": "D123456789"
}
}
```
Vehicle reference:
```json
"vehicleRef": {
"sourceEntityId": "vehicle-200",
"vin": "WDB9634031L123456",
"vehicleRegistration": {
"nation": "AT",
"number": "W-12345"
}
}
```
VIN is optional. Driver-card-only events can carry only the nation-scoped VRN/registration:
```json
"vehicleRef": {
"sourceEntityId": null,
"vin": null,
"vehicleRegistration": {
"nation": "AT",
"number": "W-12345"
}
}
```
This allows late resolution when VU/master data later connects the VRN to a VIN.
### 5. Generic normalized eventDetails
Reusable event-specific properties are stored in:
```json
"eventDetails": {
"type": "DRIVER_ACTIVITY",
"attributes": {
"cardSlot": "DRIVER",
"cardStatus": "INSERTED",
"drivingStatus": "SINGLE"
}
}
```
Raw provider values stay in `payload`:
```json
"payload": {
"raw": {
"cardSlot": 0,
"cardStatus": 0,
"drivingStatus": 0
}
}
```
This keeps the acquisition DTO generic while preserving meaningful normalized fields.
## Package-level acquisition request
For external/manual ingestion, the preferred request shape is:
```json
{
"package": {
"tenantKey": "kralowetz",
"eventSource": {
"providerKey": "TACHOGRAPH",
"sourceKind": "VEHICLE_UNIT",
"sourceKey": "TACHOGRAPH_VEHICLE_UNIT",
"sourceInstanceKey": "main-tachograph-db",
"tenantProviderSettingKey": "kralowetz-tachograph-prod"
},
"eventFamily": "DRIVER_ACTIVITY",
"businessDate": "2026-04-28",
"requestedFrom": "2026-04-28T00:00:00+02:00",
"requestedTo": "2026-04-29T00:00:00+02:00",
"externalPackageId": "TACHOGRAPH:VEHICLE_UNIT:DRIVER_ACTIVITY:2026-04-28"
},
"events": [
{
"externalSourceEventId": "TACHOGRAPH:VEHICLE_UNIT:activity:456:start",
"driverRef": {
"sourceEntityId": "driver-100",
"driverCard": {
"nation": "AT",
"number": "D123456789"
}
},
"vehicleRef": {
"sourceEntityId": "vehicle-200",
"vin": "WDB9634031L123456",
"vehicleRegistration": {
"nation": "AT",
"number": "W-12345"
}
},
"occurredAt": "2026-04-28T08:00:00+02:00",
"eventDomain": "DRIVER_ACTIVITY",
"eventType": "DRIVE",
"lifecycle": "START",
"eventDetails": {
"type": "DRIVER_ACTIVITY",
"attributes": {
"cardSlot": "DRIVER",
"cardStatus": "INSERTED",
"drivingStatus": "SINGLE"
}
},
"payload": {
"raw": {
"activity": 3,
"cardSlot": 0,
"cardStatus": 0,
"drivingStatus": 0
}
}
}
]
}
```
## Routes
### Source-specific input routes
```text
direct:yellowfox-d8-booking-input
direct:telematics-position-input
direct:tachograph-activity-input
direct:eventhub-package-input
direct:eventhub-manual-input
```
### Common route
```text
direct:eventhub-normalized-input
-> validate EventHubEventDto
-> create package key from tenant + EventSource/package context
-> seda:eventhub-batch-input
-> aggregate by eventhub.packageKey
-> sort by occurredAt inside the batch
-> EventHubIngestionService.ingest(...)
```
## REST endpoints
```text
POST /api/eventhub/acquisition/yellowfox/d8-bookings
POST /api/eventhub/acquisition/telematics/positions
POST /api/eventhub/acquisition/tachograph/activities
POST /api/eventhub/acquisition/packages
POST /api/eventhub/acquisition/events
```
## Example: tachograph driver-card activity with VRN only
```bash
curl -X POST http://localhost:8080/api/eventhub/acquisition/tachograph/activities \
-H "Content-Type: application/json" \
-d '[
{
"tenantKey": "kralowetz",
"sourceKind": "DRIVER_CARD",
"sourceInstanceKey": "main-tachograph-db",
"tenantProviderSettingKey": "kralowetz-tachograph-prod",
"externalSourceEventId": "TACHOGRAPH:DRIVER_CARD:activity:789:start",
"driverRef": {
"sourceEntityId": "driver-100",
"driverCard": {
"nation": "AT",
"number": "D123456789"
}
},
"vehicleRef": {
"sourceEntityId": null,
"vin": null,
"vehicleRegistration": {
"nation": "AT",
"number": "W-12345"
}
},
"occurredAt": "2026-04-28T08:00:00+02:00",
"activityType": "DRIVE",
"lifecycle": "START",
"cardSlot": "DRIVER",
"cardStatus": "INSERTED",
"drivingStatus": "SINGLE",
"payload": {
"raw": {
"activity": 3,
"cardSlot": 0,
"cardStatus": 0,
"drivingStatus": 0
}
}
}
]'
```
The mapper creates:
```text
Tenant = kralowetz
EventSource = TACHOGRAPH / DRIVER_CARD / TACHOGRAPH_DRIVER_CARD
EventDomain = DRIVER_ACTIVITY
EventType = DRIVE
Lifecycle = START
EventDetails.type = DRIVER_ACTIVITY
VehicleRef = VRN-only, VIN can be resolved later
```
## Start PostgreSQL
```bash
docker compose up -d
```
## Run the service
```bash
mvn spring-boot:run
```
## Check acquisition packages
```sql
select p.received_at,
p.tenant_key,
s.provider_key,
s.source_kind,
s.source_key,
p.event_family,
p.business_date,
p.status,
p.event_count
from eventhub.data_package p
join eventhub.event_source s on s.id = p.event_source_id
order by p.received_at desc;
```
## Check acquired events
```sql
select occurred_at,
driver_source_entity_id,
driver_card_nation,
driver_card_number,
vehicle_source_entity_id,
vehicle_vin,
vehicle_registration_nation,
vehicle_registration_number,
event_domain,
event_type,
lifecycle,
event_details,
payload
from eventhub.acquired_event
order by occurred_at desc;
```
## Next implementation steps
1. Add source-specific SQL extraction routes for the tachograph DB event families:
- activities from CardActivity/VUActivity
- card insert/withdraw from CardVehiclesUsed/IWCycle
- positions from places/GNSS/border/load-unload sources
- border crossings
- load/unload
- specific conditions: out-of-scope and ferry/train
- speeding events
2. Keep each extractor package-scoped by `tenant + EventSource + eventFamily + businessDate/import window`.
3. Add master-data resolution later:
- driver by tenant + driver card nation/number + occurredAt
- vehicle by tenant + VIN or tenant + registration nation/number + occurredAt
- late resolution from VRN-only driver-card events to VIN after VU/master data import
4. Discuss final storage model:
- canonical `eventhub.event`
- source-record table linked to EventSource/package
- deduplication policy for VU vs driver-card duplicates

20
docker-compose.yml Normal file
View File

@ -0,0 +1,20 @@
services:
postgres:
image: postgres:16
container_name: eventhub-postgres
environment:
POSTGRES_DB: eventhub
POSTGRES_USER: eventhub
POSTGRES_PASSWORD: eventhub
ports:
- "5432:5432"
volumes:
- eventhub-postgres-data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U eventhub -d eventhub"]
interval: 5s
timeout: 5s
retries: 20
volumes:
eventhub-postgres-data:

View File

@ -0,0 +1,10 @@
-- Optional TimescaleDB migration for the acquisition-stage event table.
-- Enable only when the target PostgreSQL instance has TimescaleDB installed.
create extension if not exists timescaledb;
select create_hypertable(
'eventhub.acquired_event',
'occurred_at',
if_not_exists => true,
migrate_data => true
);

101
pom.xml Normal file
View File

@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>4.0.6</version>
<relativePath/>
</parent>
<groupId>at.procon.eventhub</groupId>
<artifactId>eventhub-ingestion-service</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>eventhub-ingestion-service</name>
<description>Spring Boot + Apache Camel EventHub ingestion service</description>
<properties>
<java.version>21</java.version>
<camel.version>4.18.2</camel.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-bom</artifactId>
<version>${camel.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-direct-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-seda-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-jackson-starter</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-database-postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,15 @@
package at.procon.eventhub;
import at.procon.eventhub.config.EventHubProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@SpringBootApplication
@EnableConfigurationProperties(EventHubProperties.class)
public class EventHubIngestionApplication {
public static void main(String[] args) {
SpringApplication.run(EventHubIngestionApplication.class, args);
}
}

View File

@ -0,0 +1,65 @@
package at.procon.eventhub.api;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageIngestRequest;
import at.procon.eventhub.dto.source.TachographActivityDto;
import at.procon.eventhub.dto.source.TelematicsPositionDto;
import at.procon.eventhub.dto.source.YellowFoxD8BookingDto;
import jakarta.validation.Valid;
import java.util.List;
import java.util.Map;
import org.apache.camel.ProducerTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/eventhub/acquisition")
public class EventHubIngestionController {
private final ProducerTemplate producerTemplate;
public EventHubIngestionController(ProducerTemplate producerTemplate) {
this.producerTemplate = producerTemplate;
}
@PostMapping("/yellowfox/d8-bookings")
public ResponseEntity<Map<String, Object>> ingestYellowFoxD8Bookings(@RequestBody List<YellowFoxD8BookingDto> bookings) {
producerTemplate.sendBody("direct:yellowfox-d8-booking-input", bookings);
return accepted(bookings.size(), "direct:yellowfox-d8-booking-input");
}
@PostMapping("/telematics/positions")
public ResponseEntity<Map<String, Object>> ingestTelematicsPositions(@RequestBody List<TelematicsPositionDto> positions) {
producerTemplate.sendBody("direct:telematics-position-input", positions);
return accepted(positions.size(), "direct:telematics-position-input");
}
@PostMapping("/tachograph/activities")
public ResponseEntity<Map<String, Object>> ingestTachographActivities(@RequestBody List<TachographActivityDto> activities) {
producerTemplate.sendBody("direct:tachograph-activity-input", activities);
return accepted(activities.size(), "direct:tachograph-activity-input");
}
@PostMapping("/packages")
public ResponseEntity<Map<String, Object>> ingestPackage(@Valid @RequestBody EventHubPackageIngestRequest request) {
producerTemplate.sendBody("direct:eventhub-package-input", request);
return accepted(request.events().size(), "direct:eventhub-package-input");
}
@PostMapping("/events")
public ResponseEntity<Map<String, Object>> ingestNormalizedEvents(@RequestBody List<EventHubEventDto> events) {
producerTemplate.sendBody("direct:eventhub-manual-input", events);
return accepted(events.size(), "direct:eventhub-manual-input");
}
private ResponseEntity<Map<String, Object>> accepted(int count, String route) {
return ResponseEntity.accepted().body(Map.of(
"accepted", count,
"route", route,
"note", "Events are accepted into Camel acquisition routes, grouped by EventSource/package context, sorted in each batch, and then handed to the current ingestion adapter."
));
}
}

View File

@ -0,0 +1,66 @@
package at.procon.eventhub.camel;
import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubEventBatchDto;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.service.EventHubEventSorter;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.stereotype.Component;
@Component
public class EventHubBatchBuildProcessor implements Processor {
private final EventHubEventSorter sorter;
public EventHubBatchBuildProcessor(EventHubEventSorter sorter) {
this.sorter = sorter;
}
@Override
public void process(Exchange exchange) {
@SuppressWarnings("unchecked")
List<EventHubEventDto> events = exchange.getMessage().getBody(List.class);
List<EventHubEventDto> sortedEvents = sorter.sort(events);
String packageKey = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_KEY, String.class);
EventHubPackageRequest packageInfo = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_INFO, EventHubPackageRequest.class);
if (packageInfo == null && !sortedEvents.isEmpty()) {
packageInfo = sortedEvents.getFirst().packageInfo();
}
OffsetDateTime requestedFrom = packageInfo != null && packageInfo.requestedFrom() != null
? packageInfo.requestedFrom()
: sortedEvents.getFirst().occurredAt();
OffsetDateTime requestedTo = packageInfo != null && packageInfo.requestedTo() != null
? packageInfo.requestedTo()
: sortedEvents.getLast().occurredAt();
Map<String, Object> metadata = new HashMap<>();
metadata.put("camelRouteId", exchange.getFromRouteId());
metadata.put("packageKey", packageKey);
metadata.put("eventCount", sortedEvents.size());
if (packageInfo != null) {
metadata.put("tenantKey", packageInfo.tenantKey());
metadata.put("eventSource", packageInfo.eventSource().stableKey());
metadata.put("eventFamily", packageInfo.eventFamily());
metadata.put("businessDate", packageInfo.businessDate());
metadata.put("externalPackageId", packageInfo.externalPackageId());
}
exchange.getMessage().setBody(new EventHubEventBatchDto(
packageKey,
packageInfo,
DataPackageType.CAMEL_BATCH,
requestedFrom,
requestedTo,
sortedEvents,
metadata
));
}
}

View File

@ -0,0 +1,51 @@
package at.procon.eventhub.camel;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.service.EventHubIngestionService;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class EventHubCommonIngestionRoute extends RouteBuilder {
private final EventHubProperties properties;
private final EventHubEventValidationProcessor validationProcessor;
private final EventHubPackageKeyProcessor packageKeyProcessor;
private final EventHubEventAggregationStrategy aggregationStrategy;
private final EventHubBatchBuildProcessor batchBuildProcessor;
private final EventHubIngestionService ingestionService;
public EventHubCommonIngestionRoute(
EventHubProperties properties,
EventHubEventValidationProcessor validationProcessor,
EventHubPackageKeyProcessor packageKeyProcessor,
EventHubEventAggregationStrategy aggregationStrategy,
EventHubBatchBuildProcessor batchBuildProcessor,
EventHubIngestionService ingestionService
) {
this.properties = properties;
this.validationProcessor = validationProcessor;
this.packageKeyProcessor = packageKeyProcessor;
this.aggregationStrategy = aggregationStrategy;
this.batchBuildProcessor = batchBuildProcessor;
this.ingestionService = ingestionService;
}
@Override
public void configure() {
from("direct:eventhub-normalized-input")
.routeId("eventhub-normalized-input-route")
.process(validationProcessor)
.process(packageKeyProcessor)
.to("seda:eventhub-batch-input");
from("seda:eventhub-batch-input")
.routeId("eventhub-batch-and-persist-route")
.aggregate(header(EventHubHeaders.PACKAGE_KEY), aggregationStrategy)
.completionSize(properties.getBatch().getCompletionSize())
.completionTimeout(properties.getBatch().getCompletionTimeout().toMillis())
.forceCompletionOnStop()
.process(batchBuildProcessor)
.bean(ingestionService, "ingest");
}
}

View File

@ -0,0 +1,28 @@
package at.procon.eventhub.camel;
import at.procon.eventhub.dto.EventHubEventDto;
import java.util.ArrayList;
import java.util.List;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.springframework.stereotype.Component;
@Component
public class EventHubEventAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
EventHubEventDto event = newExchange.getMessage().getBody(EventHubEventDto.class);
if (oldExchange == null) {
List<EventHubEventDto> events = new ArrayList<>();
events.add(event);
newExchange.getMessage().setBody(events);
return newExchange;
}
@SuppressWarnings("unchecked")
List<EventHubEventDto> events = oldExchange.getMessage().getBody(List.class);
events.add(event);
return oldExchange;
}
}

View File

@ -0,0 +1,23 @@
package at.procon.eventhub.camel;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.service.EventHubEventValidator;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.stereotype.Component;
@Component
public class EventHubEventValidationProcessor implements Processor {
private final EventHubEventValidator validator;
public EventHubEventValidationProcessor(EventHubEventValidator validator) {
this.validator = validator;
}
@Override
public void process(Exchange exchange) {
EventHubEventDto event = exchange.getMessage().getBody(EventHubEventDto.class);
validator.validate(event);
}
}

View File

@ -0,0 +1,10 @@
package at.procon.eventhub.camel;
public final class EventHubHeaders {
public static final String PACKAGE_KEY = "eventhub.packageKey";
public static final String PACKAGE_INFO = "eventhub.packageInfo";
private EventHubHeaders() {
}
}

View File

@ -0,0 +1,28 @@
package at.procon.eventhub.camel;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.service.EventHubPackageKeyBuilder;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.stereotype.Component;
@Component
public class EventHubPackageKeyProcessor implements Processor {
private final EventHubPackageKeyBuilder packageKeyBuilder;
public EventHubPackageKeyProcessor(EventHubPackageKeyBuilder packageKeyBuilder) {
this.packageKeyBuilder = packageKeyBuilder;
}
@Override
public void process(Exchange exchange) {
EventHubEventDto event = exchange.getMessage().getBody(EventHubEventDto.class);
if (exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_KEY) == null) {
exchange.getMessage().setHeader(EventHubHeaders.PACKAGE_KEY, packageKeyBuilder.build(event));
}
if (exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_INFO) == null && event.packageInfo() != null) {
exchange.getMessage().setHeader(EventHubHeaders.PACKAGE_INFO, event.packageInfo());
}
}
}

View File

@ -0,0 +1,36 @@
package at.procon.eventhub.camel;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageIngestRequest;
import at.procon.eventhub.dto.EventHubPackageRequest;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class ManualEventHubInputRoute extends RouteBuilder {
@Override
public void configure() {
from("direct:eventhub-package-input")
.routeId("eventhub-package-input-route")
.process(exchange -> {
EventHubPackageIngestRequest request = exchange.getMessage().getBody(EventHubPackageIngestRequest.class);
exchange.getMessage().setHeader(EventHubHeaders.PACKAGE_INFO, request.packageInfo());
exchange.getMessage().setBody(request.events());
})
.split(body())
.process(exchange -> {
EventHubEventDto event = exchange.getMessage().getBody(EventHubEventDto.class);
EventHubPackageRequest packageInfo = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_INFO, EventHubPackageRequest.class);
exchange.getMessage().setBody(event.withPackageInfo(packageInfo));
})
.to("direct:eventhub-normalized-input")
.end();
from("direct:eventhub-manual-input")
.routeId("eventhub-manual-input-route")
.split(body())
.to("direct:eventhub-normalized-input")
.end();
}
}

View File

@ -0,0 +1,25 @@
package at.procon.eventhub.camel;
import at.procon.eventhub.service.TachographActivityEventMapper;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class TachographActivityInputRoute extends RouteBuilder {
private final TachographActivityEventMapper mapper;
public TachographActivityInputRoute(TachographActivityEventMapper mapper) {
this.mapper = mapper;
}
@Override
public void configure() {
from("direct:tachograph-activity-input")
.routeId("tachograph-activity-input-route")
.split(body())
.bean(mapper, "map")
.to("direct:eventhub-normalized-input")
.end();
}
}

View File

@ -0,0 +1,25 @@
package at.procon.eventhub.camel;
import at.procon.eventhub.service.TelematicsPositionEventMapper;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class TelematicsPositionInputRoute extends RouteBuilder {
private final TelematicsPositionEventMapper mapper;
public TelematicsPositionInputRoute(TelematicsPositionEventMapper mapper) {
this.mapper = mapper;
}
@Override
public void configure() {
from("direct:telematics-position-input")
.routeId("telematics-position-input-route")
.split(body())
.bean(mapper, "map")
.to("direct:eventhub-normalized-input")
.end();
}
}

View File

@ -0,0 +1,25 @@
package at.procon.eventhub.camel;
import at.procon.eventhub.service.YellowFoxD8BookingEventMapper;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class YellowFoxD8BookingInputRoute extends RouteBuilder {
private final YellowFoxD8BookingEventMapper mapper;
public YellowFoxD8BookingInputRoute(YellowFoxD8BookingEventMapper mapper) {
this.mapper = mapper;
}
@Override
public void configure() {
from("direct:yellowfox-d8-booking-input")
.routeId("yellowfox-d8-booking-input-route")
.split(body())
.bean(mapper, "map")
.to("direct:eventhub-normalized-input")
.end();
}
}

View File

@ -0,0 +1,38 @@
package at.procon.eventhub.config;
import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "eventhub")
public class EventHubProperties {
private final Batch batch = new Batch();
public Batch getBatch() {
return batch;
}
public static class Batch {
/** Number of events collected before a package is persisted. */
private int completionSize = 1000;
/** Maximum time to wait for more events belonging to the same package key. */
private Duration completionTimeout = Duration.ofSeconds(5);
public int getCompletionSize() {
return completionSize;
}
public void setCompletionSize(int completionSize) {
this.completionSize = completionSize;
}
public Duration getCompletionTimeout() {
return completionTimeout;
}
public void setCompletionTimeout(Duration completionTimeout) {
this.completionTimeout = completionTimeout;
}
}
}

View File

@ -0,0 +1,6 @@
package at.procon.eventhub.dto;
public enum CardSlot {
DRIVER,
CO_DRIVER
}

View File

@ -0,0 +1,6 @@
package at.procon.eventhub.dto;
public enum CardStatus {
INSERTED,
NOT_INSERTED
}

View File

@ -0,0 +1,7 @@
package at.procon.eventhub.dto;
public enum DataPackageStatus {
IMPORTING,
IMPORTED,
FAILED
}

View File

@ -0,0 +1,9 @@
package at.procon.eventhub.dto;
public enum DataPackageType {
CAMEL_BATCH,
API_RESPONSE,
FILE_IMPORT,
DB_EXTRACT,
MANUAL_IMPORT
}

View File

@ -0,0 +1,30 @@
package at.procon.eventhub.dto;
/**
* Tachograph driver-card identifier. The card number is scoped by issuing nation.
*/
public record DriverCardRefDto(
String nation,
String number
) {
public DriverCardRefDto {
nation = normalize(nation);
number = normalizeNullable(number);
}
public boolean hasValue() {
return number != null && !number.isBlank();
}
public String stableKey() {
return (nation == null ? "" : nation) + ":" + (number == null ? "" : number);
}
private static String normalize(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase();
}
private static String normalizeNullable(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
}

View File

@ -0,0 +1,31 @@
package at.procon.eventhub.dto;
import jakarta.validation.Valid;
/**
* Source-side driver reference. No internal EventHub driver id is required in
* incoming acquisition requests; it can be resolved later from sourceEntityId or
* nation-scoped driver card number.
*/
public record DriverRefDto(
String sourceEntityId,
@Valid DriverCardRefDto driverCard
) {
public DriverRefDto {
sourceEntityId = normalizeNullable(sourceEntityId);
}
public boolean hasAnyReference() {
return (sourceEntityId != null && !sourceEntityId.isBlank())
|| (driverCard != null && driverCard.hasValue());
}
public String stableKey() {
String cardKey = driverCard == null ? "" : driverCard.stableKey();
return (sourceEntityId == null ? "" : sourceEntityId) + "|" + cardKey;
}
private static String normalizeNullable(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
}

View File

@ -0,0 +1,8 @@
package at.procon.eventhub.dto;
public enum DrivingStatus {
SINGLE,
CREW,
KNOWN,
UNKNOWN
}

View File

@ -0,0 +1,22 @@
package at.procon.eventhub.dto;
import com.fasterxml.jackson.databind.JsonNode;
import jakarta.validation.constraints.NotBlank;
/**
* Generic, normalized event-specific attributes.
*
* Common fields stay on EventHubEventDto. Provider-specific raw values stay in
* payload. Reusable semantic attributes such as cardSlot, countryFrom, or
* operation are placed here under a detail type.
*/
public record EventDetailsDto(
@NotBlank String type,
JsonNode attributes
) {
public EventDetailsDto {
if (type != null) {
type = type.trim().toUpperCase().replace('-', '_').replace(' ', '_');
}
}
}

View File

@ -0,0 +1,17 @@
package at.procon.eventhub.dto;
public enum EventDomain {
DRIVER_CARD,
DRIVER_ACTIVITY,
IGNITION,
POSITION,
BORDER_CROSSING,
LOAD_UNLOAD,
OUT_OF_SCOPE,
FERRY_TRAIN,
SPEEDING,
PLACE,
VEHICLE_DATA,
TELEMATICS_DATA,
MANUAL_ENTRY
}

View File

@ -0,0 +1,20 @@
package at.procon.eventhub.dto;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
public record EventHubEventBatchDto(
String packageKey,
EventHubPackageRequest packageInfo,
DataPackageType packageType,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo,
List<EventHubEventDto> events,
Map<String, Object> metadata
) {
public EventHubEventBatchDto {
events = events == null ? List.of() : List.copyOf(events);
metadata = metadata == null ? Map.of() : Map.copyOf(metadata);
}
}

View File

@ -0,0 +1,121 @@
package at.procon.eventhub.dto;
import com.fasterxml.jackson.databind.JsonNode;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.time.OffsetDateTime;
import java.util.UUID;
/**
* Normalized acquisition event.
*
* It is a point event: it has exactly one occurredAt timestamp. Duration/end
* time are not part of the generic EventHub event contract. If a source row
* describes an interval, the acquisition mapper may emit multiple point events.
*/
public record EventHubEventDto(
UUID eventId,
/** Stable id of this imported/source event record, not the canonical business event id. */
@NotBlank String externalSourceEventId,
/** Source-side driver reference. No internal driver id is required during acquisition. */
@Valid DriverRefDto driverRef,
/** Source-side vehicle reference. VIN can be absent when only driver-card data is available. */
@Valid VehicleRefDto vehicleRef,
@NotNull OffsetDateTime occurredAt,
OffsetDateTime receivedPartnerAt,
OffsetDateTime receivedHubAt,
@NotNull EventDomain eventDomain,
@NotNull EventType eventType,
@NotNull EventLifecycle lifecycle,
Long odometerM,
@Valid GeoPointDto position,
/** Normalized semantic details depending on eventDomain/eventType. */
@Valid EventDetailsDto eventDetails,
/** Raw/provider-specific payload, stored as real JSON and not as encoded JSON string. */
JsonNode payload,
boolean manualEntry,
/** Package/source context used by Camel acquisition routes for grouping and batching. */
@Valid EventHubPackageRequest packageInfo
) {
public EventHubEventDto {
boolean hasDriver = driverRef != null && driverRef.hasAnyReference();
boolean hasVehicle = vehicleRef != null && vehicleRef.hasAnyReference();
if (!hasDriver && !hasVehicle) {
throw new IllegalArgumentException("driverRef or vehicleRef with at least one identifier must be set");
}
}
public EventHubEventDto withEventId(UUID newEventId) {
return new EventHubEventDto(
newEventId,
externalSourceEventId,
driverRef,
vehicleRef,
occurredAt,
receivedPartnerAt,
receivedHubAt,
eventDomain,
eventType,
lifecycle,
odometerM,
position,
eventDetails,
payload,
manualEntry,
packageInfo
);
}
public EventHubEventDto withReceivedHubAt(OffsetDateTime value) {
return new EventHubEventDto(
eventId,
externalSourceEventId,
driverRef,
vehicleRef,
occurredAt,
receivedPartnerAt,
value,
eventDomain,
eventType,
lifecycle,
odometerM,
position,
eventDetails,
payload,
manualEntry,
packageInfo
);
}
public EventHubEventDto withPackageInfo(EventHubPackageRequest value) {
return new EventHubEventDto(
eventId,
externalSourceEventId,
driverRef,
vehicleRef,
occurredAt,
receivedPartnerAt,
receivedHubAt,
eventDomain,
eventType,
lifecycle,
odometerM,
position,
eventDetails,
payload,
manualEntry,
value
);
}
}

View File

@ -0,0 +1,18 @@
package at.procon.eventhub.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import java.util.List;
public record EventHubPackageIngestRequest(
@JsonProperty("package")
@Valid @NotNull EventHubPackageRequest packageInfo,
@Valid @NotEmpty List<EventHubEventDto> events
) {
public EventHubPackageIngestRequest {
events = events == null ? List.of() : List.copyOf(events);
}
}

View File

@ -0,0 +1,33 @@
package at.procon.eventhub.dto;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.time.LocalDate;
import java.time.OffsetDateTime;
/**
* Acquisition package context. One package should represent one coherent import
* unit, e.g. tenant + TACHOGRAPH + VEHICLE_UNIT + DRIVER_ACTIVITY + business date.
*/
public record EventHubPackageRequest(
/** Tenant/client/account owning the acquired data. */
@NotBlank String tenantKey,
@Valid @NotNull EventSourceDto eventSource,
@NotBlank String eventFamily,
LocalDate businessDate,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo,
@NotBlank String externalPackageId
) {
public EventHubPackageRequest {
tenantKey = normalizeTenant(tenantKey);
if (eventFamily != null) {
eventFamily = eventFamily.trim().toUpperCase().replace('-', '_').replace(' ', '_');
}
}
private static String normalizeTenant(String value) {
return value == null || value.isBlank() ? value : value.trim();
}
}

View File

@ -0,0 +1,11 @@
package at.procon.eventhub.dto;
import java.util.UUID;
public record EventHubPackageResult(
UUID packageId,
String packageKey,
int receivedCount,
int insertedCount
) {
}

View File

@ -0,0 +1,16 @@
package at.procon.eventhub.dto;
public enum EventLifecycle {
INSERT,
WITHDRAW,
ON,
OFF,
START,
END,
BEGIN,
INBOUND,
OUTBOUND,
OUT_EU,
SNAPSHOT,
MANUAL
}

View File

@ -0,0 +1,48 @@
package at.procon.eventhub.dto;
import jakarta.validation.constraints.NotBlank;
/**
* Describes the origin of an acquired event record.
*
* This is intentionally acquisition/source context and not part of the canonical
* real-world event identity. The same canonical event can be acquired from
* TACHOGRAPH/VEHICLE_UNIT and later from TACHOGRAPH/DRIVER_CARD.
*/
public record EventSourceDto(
@NotBlank String providerKey,
@NotBlank String sourceKind,
@NotBlank String sourceKey,
String sourceInstanceKey,
String tenantProviderSettingKey,
String externalFleetKey
) {
public EventSourceDto {
providerKey = normalize(providerKey);
sourceKind = normalize(sourceKind);
sourceKey = normalize(sourceKey);
sourceInstanceKey = normalizeNullable(sourceInstanceKey);
tenantProviderSettingKey = normalizeNullable(tenantProviderSettingKey);
externalFleetKey = normalizeNullable(externalFleetKey);
}
public String stableKey() {
return String.join(":",
providerKey,
sourceKind,
sourceKey,
sourceInstanceKey == null ? "default" : sourceInstanceKey
);
}
private static String normalize(String value) {
if (value == null || value.isBlank()) {
return value;
}
return value.trim().toUpperCase().replace('-', '_').replace(' ', '_');
}
private static String normalizeNullable(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
}

View File

@ -0,0 +1,29 @@
package at.procon.eventhub.dto;
public enum EventType {
CARD_INSERTED,
CARD_WITHDRAWN,
DRIVE,
BREAK_REST,
AVAILABILITY,
WORK,
UNKNOWN_ACTIVITY,
IGNITION_ON,
IGNITION_OFF,
POSITION_RECORDED,
BORDER_INBOUND,
BORDER_OUTBOUND,
BORDER_OUT_EU,
LOAD,
UNLOAD,
LOAD_UNLOAD,
OUT,
FERRY_TRAIN,
SPEEDING,
START_PLACE,
END_PLACE,
VEHICLE_DATA,
TELEMATICS_DATA,
MANUAL_ENTRY,
UNKNOWN_EVENT
}

View File

@ -0,0 +1,16 @@
package at.procon.eventhub.dto;
import jakarta.validation.constraints.DecimalMax;
import jakarta.validation.constraints.DecimalMin;
import java.math.BigDecimal;
public record GeoPointDto(
@DecimalMin("-90.0") @DecimalMax("90.0") BigDecimal latitude,
@DecimalMin("-180.0") @DecimalMax("180.0") BigDecimal longitude
) {
public GeoPointDto {
if ((latitude == null) != (longitude == null)) {
throw new IllegalArgumentException("latitude and longitude must either both be set or both be null");
}
}
}

View File

@ -0,0 +1,39 @@
package at.procon.eventhub.dto;
import jakarta.validation.Valid;
/**
* Source-side vehicle reference. VIN can be missing for driver-card-only data;
* VRN/registration is nation-scoped and can be resolved to VIN later.
*/
public record VehicleRefDto(
String sourceEntityId,
String vin,
@Valid VehicleRegistrationRefDto vehicleRegistration
) {
public VehicleRefDto {
sourceEntityId = normalizeNullable(sourceEntityId);
vin = normalizeVin(vin);
}
public boolean hasAnyReference() {
return (sourceEntityId != null && !sourceEntityId.isBlank())
|| (vin != null && !vin.isBlank())
|| (vehicleRegistration != null && vehicleRegistration.hasValue());
}
public String stableKey() {
String registrationKey = vehicleRegistration == null ? "" : vehicleRegistration.stableKey();
return (sourceEntityId == null ? "" : sourceEntityId) + "|"
+ (vin == null ? "" : vin) + "|"
+ registrationKey;
}
private static String normalizeNullable(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
private static String normalizeVin(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase();
}
}

View File

@ -0,0 +1,31 @@
package at.procon.eventhub.dto;
/**
* Vehicle registration number reference. VRN/plate is scoped by nation and can
* be resolved historically by occurredAt later.
*/
public record VehicleRegistrationRefDto(
String nation,
String number
) {
public VehicleRegistrationRefDto {
nation = normalize(nation);
number = normalizeNullable(number);
}
public boolean hasValue() {
return number != null && !number.isBlank();
}
public String stableKey() {
return (nation == null ? "" : nation) + ":" + (number == null ? "" : number);
}
private static String normalize(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase();
}
private static String normalizeNullable(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
}

View File

@ -0,0 +1,30 @@
package at.procon.eventhub.dto.source;
import at.procon.eventhub.dto.CardSlot;
import at.procon.eventhub.dto.CardStatus;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.DrivingStatus;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.VehicleRefDto;
import java.time.OffsetDateTime;
import java.util.Map;
public record TachographActivityDto(
String tenantKey,
String sourceKind, // VEHICLE_UNIT or DRIVER_CARD
String sourceInstanceKey, // e.g. main-tachograph-db
String tenantProviderSettingKey,
String externalSourceEventId,
OffsetDateTime occurredAt,
OffsetDateTime receivedPartnerAt,
DriverRefDto driverRef,
VehicleRefDto vehicleRef,
EventType activityType,
EventLifecycle lifecycle,
CardSlot cardSlot,
CardStatus cardStatus,
DrivingStatus drivingStatus,
Map<String, Object> payload
) {
}

View File

@ -0,0 +1,27 @@
package at.procon.eventhub.dto.source;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.VehicleRefDto;
import java.math.BigDecimal;
import java.time.OffsetDateTime;
import java.util.Map;
public record TelematicsPositionDto(
String tenantKey,
String providerKey, // YELLOWFOX, FLEETBOARD, TOMTOM, ...
String sourceKey, // e.g. FLEETBOARD_POSITION
String sourceInstanceKey,
String tenantProviderSettingKey,
String externalFleetKey,
String externalSourceEventId,
OffsetDateTime occurredAt,
OffsetDateTime receivedPartnerAt,
DriverRefDto driverRef,
VehicleRefDto vehicleRef,
Long odometerM,
BigDecimal latitude,
BigDecimal longitude,
String positionReason,
Map<String, Object> payload
) {
}

View File

@ -0,0 +1,27 @@
package at.procon.eventhub.dto.source;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.VehicleRefDto;
import java.math.BigDecimal;
import java.time.OffsetDateTime;
import java.util.Map;
public record YellowFoxD8BookingDto(
String tenantKey,
String sourceInstanceKey,
String tenantProviderSettingKey,
String externalFleetKey,
String eventId,
String key,
Integer eventType,
Integer state,
OffsetDateTime occurredAt,
OffsetDateTime receivedPartnerAt,
DriverRefDto driverRef,
VehicleRefDto vehicleRef,
Long odometerM,
BigDecimal latitude,
BigDecimal longitude,
Map<String, Object> payload
) {
}

View File

@ -0,0 +1,94 @@
package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.DataPackageStatus;
import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubPackageRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class DataPackageRepository {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
public DataPackageRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
}
public UUID createPackage(
int eventSourceId,
String packageKey,
EventHubPackageRequest packageInfo,
DataPackageType packageType,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo,
Map<String, Object> metadata
) {
UUID id = UUID.randomUUID();
jdbcTemplate.update(
"""
insert into eventhub.data_package(
id, event_source_id, tenant_key, package_key, package_type, status,
event_family, business_date, external_package_id,
requested_from, requested_to, received_at, event_count, metadata
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), 0, ?::jsonb)
""",
ps -> {
ps.setObject(1, id);
ps.setInt(2, eventSourceId);
ps.setString(3, packageInfo == null ? "default" : packageInfo.tenantKey());
ps.setString(4, packageKey);
ps.setString(5, packageType.name());
ps.setString(6, DataPackageStatus.IMPORTING.name());
ps.setString(7, packageInfo == null ? null : packageInfo.eventFamily());
ps.setObject(8, packageInfo == null ? null : packageInfo.businessDate());
ps.setString(9, packageInfo == null ? packageKey : packageInfo.externalPackageId());
ps.setObject(10, requestedFrom);
ps.setObject(11, requestedTo);
ps.setString(12, toJson(metadata));
}
);
return id;
}
public void markImported(UUID packageId, int insertedCount) {
jdbcTemplate.update(
"""
update eventhub.data_package
set status = ?, event_count = ?, completed_at = now()
where id = ?
""",
DataPackageStatus.IMPORTED.name(),
insertedCount,
packageId
);
}
public void markFailed(UUID packageId, String errorMessage) {
jdbcTemplate.update(
"""
update eventhub.data_package
set status = ?, error_message = ?, completed_at = now()
where id = ?
""",
DataPackageStatus.FAILED.name(),
errorMessage,
packageId
);
}
private String toJson(Map<String, Object> value) {
try {
return objectMapper.writeValueAsString(value == null ? Map.of() : value);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Cannot serialize package metadata", e);
}
}
}

View File

@ -0,0 +1,143 @@
package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.service.EventNaturalKeyService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.PreparedStatement;
import java.sql.Types;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class EventRepository {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
private final EventNaturalKeyService naturalKeyService;
public EventRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, EventNaturalKeyService naturalKeyService) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
this.naturalKeyService = naturalKeyService;
}
/**
* Temporary acquisition-stage persistence. The canonical storage model will
* be finalized later; for now this table keeps acquired point events with
* EventSource context, source-side driver/vehicle refs, generic eventDetails,
* and raw payload JSON.
*/
public int batchInsert(UUID packageId, int eventSourceId, List<EventHubEventDto> events) {
int[] counts = jdbcTemplate.batchUpdate(
"""
insert into eventhub.acquired_event(
id, event_source_id, data_package_id,
external_source_event_id,
driver_source_entity_id, driver_card_nation, driver_card_number,
vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number,
occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle,
odometer_m, latitude, longitude,
event_details, payload, manual_entry,
canonical_key_hash, source_record_key_hash
) values (
?, ?, ?,
?,
?, ?, ?,
?, ?, ?, ?,
?, ?, ?,
?, ?, ?,
?, ?, ?,
?::jsonb, ?::jsonb, ?,
?, ?
)
on conflict do nothing
""",
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws java.sql.SQLException {
EventHubEventDto event = events.get(i);
UUID eventId = event.eventId() == null ? UUID.randomUUID() : event.eventId();
OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt();
DriverRefDto driverRef = event.driverRef();
DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard();
VehicleRefDto vehicleRef = event.vehicleRef();
VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration();
ps.setObject(1, eventId);
ps.setInt(2, eventSourceId);
ps.setObject(3, packageId);
ps.setString(4, event.externalSourceEventId());
ps.setString(5, driverRef == null ? null : driverRef.sourceEntityId());
ps.setString(6, driverCard == null ? null : driverCard.nation());
ps.setString(7, driverCard == null ? null : driverCard.number());
ps.setString(8, vehicleRef == null ? null : vehicleRef.sourceEntityId());
ps.setString(9, vehicleRef == null ? null : vehicleRef.vin());
ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation());
ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number());
ps.setObject(12, event.occurredAt());
ps.setObject(13, event.receivedPartnerAt());
ps.setObject(14, receivedHubAt);
ps.setString(15, event.eventDomain().name());
ps.setString(16, event.eventType().name());
ps.setString(17, event.lifecycle().name());
setNullableLong(ps, 18, event.odometerM());
if (event.position() == null) {
ps.setNull(19, Types.NUMERIC);
ps.setNull(20, Types.NUMERIC);
} else {
ps.setObject(19, event.position().latitude());
ps.setObject(20, event.position().longitude());
}
ps.setString(21, toJson(objectMapper.valueToTree(event.eventDetails())));
ps.setString(22, toJson(event.payload()));
ps.setBoolean(23, event.manualEntry());
ps.setString(24, naturalKeyService.buildCanonicalKeyHash(event));
ps.setString(25, naturalKeyService.buildSourceRecordKeyHash(event, eventSourceId));
}
@Override
public int getBatchSize() {
return events.size();
}
}
);
int inserted = 0;
for (int count : counts) {
if (count > 0 || count == PreparedStatement.SUCCESS_NO_INFO) {
inserted++;
}
}
return inserted;
}
private void setNullableLong(PreparedStatement ps, int index, Long value) throws java.sql.SQLException {
if (value == null) {
ps.setNull(index, Types.BIGINT);
} else {
ps.setLong(index, value);
}
}
private String toJson(JsonNode value) {
try {
return objectMapper.writeValueAsString(value == null ? objectMapper.createObjectNode() : value);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Cannot serialize JSONB value", e);
}
}
}

View File

@ -0,0 +1,68 @@
package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.EventSourceDto;
import java.sql.PreparedStatement;
import java.sql.Statement;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class EventSourceRepository {
private final JdbcTemplate jdbcTemplate;
public EventSourceRepository(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public int resolveSourceId(EventSourceDto eventSource) {
Integer existing = findSourceId(eventSource);
if (existing != null) {
return existing;
}
jdbcTemplate.update(connection -> {
PreparedStatement ps = connection.prepareStatement(
"""
insert into eventhub.event_source(
provider_key, source_kind, source_key, source_instance_key,
tenant_provider_setting_key, external_fleet_key
) values (?, ?, ?, ?, ?, ?)
on conflict (provider_key, source_kind, source_key, source_instance_key) do nothing
""",
Statement.NO_GENERATED_KEYS
);
ps.setString(1, eventSource.providerKey());
ps.setString(2, eventSource.sourceKind());
ps.setString(3, eventSource.sourceKey());
ps.setString(4, eventSource.sourceInstanceKey() == null ? "default" : eventSource.sourceInstanceKey());
ps.setString(5, eventSource.tenantProviderSettingKey());
ps.setString(6, eventSource.externalFleetKey());
return ps;
});
Integer created = findSourceId(eventSource);
if (created == null) {
throw new IllegalStateException("Could not resolve event source id for " + eventSource.stableKey());
}
return created;
}
private Integer findSourceId(EventSourceDto eventSource) {
return jdbcTemplate.query(
"""
select id
from eventhub.event_source
where provider_key = ?
and source_kind = ?
and source_key = ?
and source_instance_key = ?
""",
rs -> rs.next() ? rs.getInt("id") : null,
eventSource.providerKey(),
eventSource.sourceKind(),
eventSource.sourceKey(),
eventSource.sourceInstanceKey() == null ? "default" : eventSource.sourceInstanceKey()
);
}
}

View File

@ -0,0 +1,76 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.CardSlot;
import at.procon.eventhub.dto.CardStatus;
import at.procon.eventhub.dto.DrivingStatus;
import at.procon.eventhub.dto.EventDetailsDto;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.stereotype.Component;
@Component
public class EventDetailsFactory {
private final ObjectMapper objectMapper;
public EventDetailsFactory(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
public EventDetailsDto driverActivity(CardSlot cardSlot, CardStatus cardStatus, DrivingStatus drivingStatus) {
Map<String, Object> attributes = new LinkedHashMap<>();
put(attributes, "cardSlot", cardSlot);
put(attributes, "cardStatus", cardStatus);
put(attributes, "drivingStatus", drivingStatus);
return new EventDetailsDto("DRIVER_ACTIVITY", objectMapper.valueToTree(attributes));
}
public EventDetailsDto driverCard(CardSlot cardSlot, CardStatus cardStatus) {
Map<String, Object> attributes = new LinkedHashMap<>();
put(attributes, "cardSlot", cardSlot);
put(attributes, "cardStatus", cardStatus);
return new EventDetailsDto("DRIVER_CARD", objectMapper.valueToTree(attributes));
}
public EventDetailsDto position(String reason) {
Map<String, Object> attributes = new LinkedHashMap<>();
put(attributes, "positionReason", reason);
return new EventDetailsDto("POSITION", objectMapper.valueToTree(attributes));
}
public EventDetailsDto borderCrossing(String countryFrom, String countryTo) {
Map<String, Object> attributes = new LinkedHashMap<>();
put(attributes, "countryFrom", countryFrom);
put(attributes, "countryTo", countryTo);
return new EventDetailsDto("BORDER_CROSSING", objectMapper.valueToTree(attributes));
}
public EventDetailsDto loadUnload(String operation) {
Map<String, Object> attributes = new LinkedHashMap<>();
put(attributes, "operation", operation);
return new EventDetailsDto("LOAD_UNLOAD", objectMapper.valueToTree(attributes));
}
public EventDetailsDto speeding(BigDecimal speedKmh, BigDecimal permittedSpeedKmh) {
Map<String, Object> attributes = new LinkedHashMap<>();
put(attributes, "speedKmh", speedKmh);
put(attributes, "permittedSpeedKmh", permittedSpeedKmh);
if (speedKmh != null && permittedSpeedKmh != null) {
put(attributes, "overspeedKmh", speedKmh.subtract(permittedSpeedKmh));
}
return new EventDetailsDto("SPEEDING", objectMapper.valueToTree(attributes));
}
public JsonNode payloadFromMap(Map<String, Object> payload) {
return objectMapper.valueToTree(payload == null ? Map.of() : payload);
}
private void put(Map<String, Object> target, String key, Object value) {
if (value != null) {
target.put(key, value instanceof Enum<?> enumValue ? enumValue.name() : value);
}
}
}

View File

@ -0,0 +1,19 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.EventHubEventDto;
import java.util.Comparator;
import java.util.List;
import org.springframework.stereotype.Component;
@Component
public class EventHubEventSorter {
public List<EventHubEventDto> sort(List<EventHubEventDto> events) {
return events.stream()
.sorted(Comparator
.comparing(EventHubEventDto::occurredAt)
.thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo))
.thenComparing(event -> event.eventType().name()))
.toList();
}
}

View File

@ -0,0 +1,64 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import org.springframework.stereotype.Component;
@Component
public class EventHubEventValidator {
public void validate(EventHubEventDto event) {
if (event == null) {
throw new IllegalArgumentException("EventHubEventDto must not be null");
}
if (event.externalSourceEventId() == null || event.externalSourceEventId().isBlank()) {
throw new IllegalArgumentException("externalSourceEventId must be set");
}
if (event.occurredAt() == null) {
throw new IllegalArgumentException("occurredAt must be set");
}
boolean hasDriverRef = event.driverRef() != null && event.driverRef().hasAnyReference();
boolean hasVehicleRef = event.vehicleRef() != null && event.vehicleRef().hasAnyReference();
if (!hasDriverRef && !hasVehicleRef) {
throw new IllegalArgumentException("driverRef or vehicleRef with at least one identifier must be set");
}
if (event.packageInfo() == null || event.packageInfo().tenantKey() == null || event.packageInfo().tenantKey().isBlank()) {
throw new IllegalArgumentException("packageInfo.tenantKey must be set");
}
if (event.packageInfo().eventSource() == null) {
throw new IllegalArgumentException("packageInfo.eventSource must be set");
}
if (event.eventDomain() == null) {
throw new IllegalArgumentException("eventDomain must be set");
}
if (event.eventType() == null) {
throw new IllegalArgumentException("eventType must be set");
}
if (event.lifecycle() == null) {
throw new IllegalArgumentException("lifecycle must be set");
}
if (event.position() != null && (event.position().latitude() == null || event.position().longitude() == null)) {
throw new IllegalArgumentException("position latitude and longitude must both be set");
}
validateEventDetails(event);
}
private void validateEventDetails(EventHubEventDto event) {
if (event.eventDetails() == null) {
return;
}
String detailType = event.eventDetails().type();
if (event.eventDomain() == EventDomain.DRIVER_ACTIVITY && !"DRIVER_ACTIVITY".equals(detailType)) {
throw new IllegalArgumentException("DRIVER_ACTIVITY events must use eventDetails.type=DRIVER_ACTIVITY");
}
if (event.eventDomain() == EventDomain.BORDER_CROSSING && !"BORDER_CROSSING".equals(detailType)) {
throw new IllegalArgumentException("BORDER_CROSSING events must use eventDetails.type=BORDER_CROSSING");
}
if (event.eventDomain() == EventDomain.LOAD_UNLOAD && !"LOAD_UNLOAD".equals(detailType)) {
throw new IllegalArgumentException("LOAD_UNLOAD events must use eventDetails.type=LOAD_UNLOAD");
}
if (event.eventDomain() == EventDomain.SPEEDING && !"SPEEDING".equals(detailType)) {
throw new IllegalArgumentException("SPEEDING events must use eventDetails.type=SPEEDING");
}
}
}

View File

@ -0,0 +1,83 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.EventHubEventBatchDto;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventHubPackageResult;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.persistence.DataPackageRepository;
import at.procon.eventhub.persistence.EventRepository;
import at.procon.eventhub.persistence.EventSourceRepository;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class EventHubIngestionService {
private static final Logger log = LoggerFactory.getLogger(EventHubIngestionService.class);
private final EventSourceRepository eventSourceRepository;
private final DataPackageRepository dataPackageRepository;
private final EventRepository eventRepository;
private final EventHubEventValidator validator;
private final EventHubEventSorter sorter;
public EventHubIngestionService(
EventSourceRepository eventSourceRepository,
DataPackageRepository dataPackageRepository,
EventRepository eventRepository,
EventHubEventValidator validator,
EventHubEventSorter sorter
) {
this.eventSourceRepository = eventSourceRepository;
this.dataPackageRepository = dataPackageRepository;
this.eventRepository = eventRepository;
this.validator = validator;
this.sorter = sorter;
}
@Transactional
public EventHubPackageResult ingest(EventHubEventBatchDto batch) {
if (batch == null || batch.events().isEmpty()) {
return new EventHubPackageResult(null, batch == null ? null : batch.packageKey(), 0, 0);
}
EventHubPackageRequest packageInfo = batch.packageInfo();
if (packageInfo == null) {
packageInfo = batch.events().getFirst().packageInfo();
}
if (packageInfo == null || packageInfo.eventSource() == null) {
throw new IllegalArgumentException("packageInfo.eventSource must be set before ingestion");
}
EventSourceDto eventSource = packageInfo.eventSource();
int eventSourceId = eventSourceRepository.resolveSourceId(eventSource);
List<EventHubEventDto> sortedEvents = sorter.sort(batch.events());
sortedEvents.forEach(validator::validate);
UUID packageId = dataPackageRepository.createPackage(
eventSourceId,
batch.packageKey(),
packageInfo,
batch.packageType(),
batch.requestedFrom(),
batch.requestedTo(),
batch.metadata()
);
try {
int insertedCount = eventRepository.batchInsert(packageId, eventSourceId, sortedEvents);
dataPackageRepository.markImported(packageId, insertedCount);
log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={}",
packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount);
return new EventHubPackageResult(packageId, batch.packageKey(), sortedEvents.size(), insertedCount);
} catch (RuntimeException ex) {
dataPackageRepository.markFailed(packageId, ex.getMessage());
throw ex;
}
}
}

View File

@ -0,0 +1,23 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import org.springframework.stereotype.Component;
@Component
public class EventHubPackageKeyBuilder {
public String build(EventHubEventDto event) {
EventHubPackageRequest packageInfo = event.packageInfo();
if (packageInfo != null) {
return packageInfo.tenantKey()
+ ":" + packageInfo.eventSource().stableKey()
+ ":" + packageInfo.eventFamily()
+ ":" + (packageInfo.businessDate() == null ? "NO_DATE" : packageInfo.businessDate())
+ ":" + packageInfo.externalPackageId();
}
String day = event.occurredAt().toLocalDate().toString();
return "UNSPECIFIED_TENANT:UNSPECIFIED_SOURCE:" + event.eventDomain() + ":" + day;
}
}

View File

@ -0,0 +1,61 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.EventHubEventDto;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.springframework.stereotype.Component;
@Component
public class EventNaturalKeyService {
/**
* Canonical event key intentionally excludes EventSource. VU and driver-card
* acquisitions of the same real-world event can therefore converge later when
* storage/deduplication is implemented.
*
* Because incoming acquisition requests usually do not know internal master
* ids, this acquisition-stage key uses tenant + source-side master references.
* Late master-data resolution can later rebuild a stronger canonical key.
*/
public String buildCanonicalKeyHash(EventHubEventDto event) {
String naturalKey = String.join("|",
nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
nullToEmpty(event.driverRef() == null ? null : event.driverRef().stableKey()),
nullToEmpty(event.vehicleRef() == null ? null : event.vehicleRef().stableKey()),
nullToEmpty(event.occurredAt()),
nullToEmpty(event.eventDomain()),
nullToEmpty(event.eventType()),
nullToEmpty(event.lifecycle())
);
return sha256Hex(naturalKey);
}
/** Source-record key includes the source id and external source event id. */
public String buildSourceRecordKeyHash(EventHubEventDto event, int eventSourceId) {
String sourceRecordKey = String.join("|",
nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
String.valueOf(eventSourceId),
nullToEmpty(event.externalSourceEventId())
);
return sha256Hex(sourceRecordKey);
}
private String nullToEmpty(Object value) {
return value == null ? "" : String.valueOf(value);
}
private String sha256Hex(String value) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8));
StringBuilder result = new StringBuilder(hash.length * 2);
for (byte b : hash) {
result.append(String.format("%02x", b));
}
return result.toString();
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("SHA-256 is not available", e);
}
}
}

View File

@ -0,0 +1,73 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.source.TachographActivityDto;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.springframework.stereotype.Component;
@Component
public class TachographActivityEventMapper {
private final EventDetailsFactory detailsFactory;
public TachographActivityEventMapper(EventDetailsFactory detailsFactory) {
this.detailsFactory = detailsFactory;
}
public EventHubEventDto map(TachographActivityDto source) {
EventType activityType = source.activityType() == null ? EventType.UNKNOWN_ACTIVITY : source.activityType();
EventLifecycle lifecycle = source.lifecycle() == null ? EventLifecycle.SNAPSHOT : source.lifecycle();
String sourceKind = source.sourceKind() == null || source.sourceKind().isBlank()
? "VEHICLE_UNIT"
: source.sourceKind();
EventSourceDto eventSource = new EventSourceDto(
"TACHOGRAPH",
sourceKind,
"TACHOGRAPH_" + sourceKind,
source.sourceInstanceKey(),
source.tenantProviderSettingKey(),
null
);
LocalDate businessDate = source.occurredAt().toLocalDate();
EventHubPackageRequest packageInfo = new EventHubPackageRequest(
tenantOrDefault(source.tenantKey()),
eventSource,
EventDomain.DRIVER_ACTIVITY.name(),
businessDate,
businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
eventSource.stableKey() + ":DRIVER_ACTIVITY:" + businessDate
);
return new EventHubEventDto(
UUID.randomUUID(),
source.externalSourceEventId(),
source.driverRef(),
source.vehicleRef(),
source.occurredAt(),
source.receivedPartnerAt(),
OffsetDateTime.now(),
EventDomain.DRIVER_ACTIVITY,
activityType,
lifecycle,
null,
null,
detailsFactory.driverActivity(source.cardSlot(), source.cardStatus(), source.drivingStatus()),
detailsFactory.payloadFromMap(source.payload()),
false,
packageInfo
);
}
private String tenantOrDefault(String value) {
return value == null || value.isBlank() ? "default" : value.trim();
}
}

View File

@ -0,0 +1,70 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.GeoPointDto;
import at.procon.eventhub.dto.source.TelematicsPositionDto;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.springframework.stereotype.Component;
@Component
public class TelematicsPositionEventMapper {
private final EventDetailsFactory detailsFactory;
public TelematicsPositionEventMapper(EventDetailsFactory detailsFactory) {
this.detailsFactory = detailsFactory;
}
public EventHubEventDto map(TelematicsPositionDto source) {
String provider = source.providerKey() == null || source.providerKey().isBlank() ? "TELEMATICS" : source.providerKey();
String sourceKey = source.sourceKey() == null || source.sourceKey().isBlank() ? provider + "_POSITION" : source.sourceKey();
EventSourceDto eventSource = new EventSourceDto(
provider,
"TELEMATICS_PLATFORM",
sourceKey,
source.sourceInstanceKey(),
source.tenantProviderSettingKey(),
source.externalFleetKey()
);
LocalDate businessDate = source.occurredAt().toLocalDate();
EventHubPackageRequest packageInfo = new EventHubPackageRequest(
tenantOrDefault(source.tenantKey()),
eventSource,
EventDomain.POSITION.name(),
businessDate,
businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
eventSource.stableKey() + ":POSITION:" + businessDate
);
return new EventHubEventDto(
UUID.randomUUID(),
source.externalSourceEventId(),
source.driverRef(),
source.vehicleRef(),
source.occurredAt(),
source.receivedPartnerAt(),
OffsetDateTime.now(),
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT,
source.odometerM(),
new GeoPointDto(source.latitude(), source.longitude()),
detailsFactory.position(source.positionReason()),
detailsFactory.payloadFromMap(source.payload()),
false,
packageInfo
);
}
private String tenantOrDefault(String value) {
return value == null || value.isBlank() ? "default" : value.trim();
}
}

View File

@ -0,0 +1,128 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.CardSlot;
import at.procon.eventhub.dto.CardStatus;
import at.procon.eventhub.dto.DrivingStatus;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.GeoPointDto;
import at.procon.eventhub.dto.source.YellowFoxD8BookingDto;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.springframework.stereotype.Component;
@Component
public class YellowFoxD8BookingEventMapper {
private final EventDetailsFactory detailsFactory;
public YellowFoxD8BookingEventMapper(EventDetailsFactory detailsFactory) {
this.detailsFactory = detailsFactory;
}
public EventHubEventDto map(YellowFoxD8BookingDto source) {
NormalizedEvent normalized = normalize(source.eventType(), source.state());
Map<String, Object> payload = new LinkedHashMap<>();
if (source.payload() != null) {
payload.putAll(source.payload());
}
payload.put("provider", "YELLOWFOX");
payload.put("sourceKind", "TELEMATICS_PLATFORM");
payload.put("yellowFoxEventType", source.eventType());
payload.put("yellowFoxState", source.state());
payload.put("yellowFoxKey", source.key());
EventSourceDto eventSource = new EventSourceDto(
"YELLOWFOX",
"TELEMATICS_PLATFORM",
"YELLOWFOX_D8",
source.sourceInstanceKey(),
source.tenantProviderSettingKey(),
source.externalFleetKey()
);
LocalDate businessDate = source.occurredAt().toLocalDate();
EventHubPackageRequest packageInfo = new EventHubPackageRequest(
tenantOrDefault(source.tenantKey()),
eventSource,
normalized.domain().name(),
businessDate,
businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
eventSource.stableKey() + ":" + normalized.domain().name() + ":" + businessDate
);
return new EventHubEventDto(
UUID.randomUUID(),
source.eventId(),
source.driverRef(),
source.vehicleRef(),
source.occurredAt(),
source.receivedPartnerAt(),
OffsetDateTime.now(),
normalized.domain(),
normalized.type(),
normalized.lifecycle(),
source.odometerM(),
new GeoPointDto(source.latitude(), source.longitude()),
detailsFor(normalized),
detailsFactory.payloadFromMap(payload),
false,
packageInfo
);
}
private String tenantOrDefault(String value) {
return value == null || value.isBlank() ? "default" : value.trim();
}
private at.procon.eventhub.dto.EventDetailsDto detailsFor(NormalizedEvent normalized) {
if (normalized.domain() == EventDomain.DRIVER_ACTIVITY) {
return detailsFactory.driverActivity(CardSlot.DRIVER, CardStatus.INSERTED, DrivingStatus.UNKNOWN);
}
if (normalized.domain() == EventDomain.DRIVER_CARD) {
CardStatus status = normalized.lifecycle() == EventLifecycle.INSERT ? CardStatus.INSERTED : CardStatus.NOT_INSERTED;
return detailsFactory.driverCard(CardSlot.DRIVER, status);
}
return null;
}
private NormalizedEvent normalize(Integer eventType, Integer state) {
if (eventType == null || state == null) {
return new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT);
}
return switch (eventType) {
case 0, 1 -> normalizeCardActivity(state);
case 2, 3 -> normalizeDriverActivity(state);
default -> new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT);
};
}
private NormalizedEvent normalizeCardActivity(Integer state) {
return switch (state) {
case 0 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW);
case 1 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT);
default -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT);
};
}
private NormalizedEvent normalizeDriverActivity(Integer state) {
return switch (state) {
case 0 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.BREAK_REST, EventLifecycle.SNAPSHOT);
case 1 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.AVAILABILITY, EventLifecycle.SNAPSHOT);
case 2 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.WORK, EventLifecycle.SNAPSHOT);
case 3 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.SNAPSHOT);
default -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.UNKNOWN_ACTIVITY, EventLifecycle.SNAPSHOT);
};
}
private record NormalizedEvent(EventDomain domain, EventType type, EventLifecycle lifecycle) {
}
}

View File

@ -0,0 +1,31 @@
spring:
application:
name: eventhub-ingestion-service
datasource:
url: jdbc:postgresql://localhost:5432/eventhub
username: eventhub
password: eventhub
flyway:
enabled: true
default-schema: eventhub
schemas: eventhub
create-schemas: true
server:
port: 8080
camel:
springboot:
name: eventhub-ingestion-camel
main-run-controller: true
management:
endpoints:
web:
exposure:
include: health,info,metrics,camelroutes
eventhub:
batch:
completion-size: 1000
completion-timeout: 5s

View File

@ -0,0 +1,127 @@
create extension if not exists pgcrypto;
create schema if not exists eventhub;
-- Acquisition source definition. This represents where the imported source
-- record came from, not necessarily the canonical identity of the real-world event.
create table if not exists eventhub.event_source (
id integer generated always as identity primary key,
provider_key text not null,
source_kind text not null,
source_key text not null,
source_instance_key text not null default 'default',
tenant_provider_setting_key text,
external_fleet_key text,
created_at timestamptz not null default now(),
constraint ux_event_source unique (provider_key, source_kind, source_key, source_instance_key)
);
-- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/2026-04-28.
-- Final canonical storage can be discussed later; this table is still useful for acquisition audit.
create table if not exists eventhub.data_package (
id uuid primary key,
event_source_id integer not null references eventhub.event_source(id),
tenant_key text not null,
package_key text not null,
package_type text not null,
status text not null,
event_family text,
business_date date,
external_package_id text,
requested_from timestamptz,
requested_to timestamptz,
received_at timestamptz not null default now(),
completed_at timestamptz,
event_count integer not null default 0,
metadata jsonb not null default '{}'::jsonb,
error_message text,
constraint ux_data_package_external unique (tenant_key, event_source_id, external_package_id, received_at)
);
-- Temporary acquisition-stage point-event store.
-- It keeps the discussed DTO shape: EventSource context, externalSourceEventId,
-- one occurredAt timestamp, source-side driver/vehicle refs, normalized event details,
-- and raw JSON payload. It intentionally has no internal driver_id/vehicle_id in the
-- incoming model; master-data resolution can be added later.
create table if not exists eventhub.acquired_event (
id uuid not null,
event_source_id integer not null references eventhub.event_source(id),
data_package_id uuid not null references eventhub.data_package(id),
external_source_event_id text not null,
driver_source_entity_id text,
driver_card_nation text,
driver_card_number text,
vehicle_source_entity_id text,
vehicle_vin text,
vehicle_registration_nation text,
vehicle_registration_number text,
occurred_at timestamptz not null,
received_partner_at timestamptz,
received_hub_at timestamptz not null default now(),
event_domain text not null,
event_type text not null,
lifecycle text not null,
odometer_m bigint,
latitude numeric(10, 7),
longitude numeric(10, 7),
event_details jsonb not null default '{}'::jsonb,
payload jsonb not null default '{}'::jsonb,
manual_entry boolean not null default false,
-- Excludes EventSource: useful later for canonical event deduplication.
canonical_key_hash text not null,
-- Includes tenant + EventSource + externalSourceEventId: prevents duplicate imports of the same source record.
source_record_key_hash text not null,
created_at timestamptz not null default now(),
constraint pk_acquired_event primary key (occurred_at, id),
constraint chk_acquired_event_driver_or_vehicle_ref check (
driver_source_entity_id is not null
or driver_card_number is not null
or vehicle_source_entity_id is not null
or vehicle_vin is not null
or vehicle_registration_number is not null
),
constraint chk_acquired_event_position_pair check ((latitude is null and longitude is null) or (latitude is not null and longitude is not null)),
constraint chk_driver_card_nation_when_number check (driver_card_number is null or driver_card_nation is not null),
constraint chk_vehicle_registration_nation_when_number check (vehicle_registration_number is null or vehicle_registration_nation is not null)
);
create unique index if not exists ux_acquired_event_source_record
on eventhub.acquired_event(source_record_key_hash);
create index if not exists idx_acquired_event_canonical_key
on eventhub.acquired_event(canonical_key_hash);
create index if not exists idx_acquired_event_vehicle_vin_time
on eventhub.acquired_event(vehicle_vin, occurred_at desc)
where vehicle_vin is not null;
create index if not exists idx_acquired_event_vehicle_registration_time
on eventhub.acquired_event(vehicle_registration_nation, vehicle_registration_number, occurred_at desc)
where vehicle_registration_number is not null;
create index if not exists idx_acquired_event_driver_card_time
on eventhub.acquired_event(driver_card_nation, driver_card_number, occurred_at desc)
where driver_card_number is not null;
create index if not exists idx_acquired_event_domain_type_time
on eventhub.acquired_event(event_domain, event_type, occurred_at desc);
create index if not exists idx_acquired_event_details_gin
on eventhub.acquired_event using gin(event_details);
create index if not exists idx_acquired_event_payload_gin
on eventhub.acquired_event using gin(payload);
create index if not exists idx_data_package_source_time
on eventhub.data_package(tenant_key, event_source_id, received_at desc);

View File

@ -0,0 +1,61 @@
package at.procon.eventhub;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.dto.source.YellowFoxD8BookingDto;
import at.procon.eventhub.service.EventDetailsFactory;
import at.procon.eventhub.service.YellowFoxD8BookingEventMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class YellowFoxD8BookingEventMapperTest {
private final YellowFoxD8BookingEventMapper mapper = new YellowFoxD8BookingEventMapper(
new EventDetailsFactory(new ObjectMapper())
);
@Test
void mapsYellowFoxDrivingStateAsSnapshotWithEventSourceAndDetails() {
YellowFoxD8BookingDto source = new YellowFoxD8BookingDto(
"tenant-1",
"yellowfox-tenant-7",
"tenant-yellowfox-7",
"7",
"event-1",
"key-1",
2,
3,
OffsetDateTime.parse("2026-04-29T08:15:00+02:00"),
null,
new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")),
new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345")),
null,
null,
null,
null
);
var result = mapper.map(source);
assertThat(result.eventDomain()).isEqualTo(EventDomain.DRIVER_ACTIVITY);
assertThat(result.eventType()).isEqualTo(EventType.DRIVE);
assertThat(result.lifecycle()).isEqualTo(EventLifecycle.SNAPSHOT);
assertThat(result.externalSourceEventId()).isEqualTo("event-1");
assertThat(result.packageInfo().tenantKey()).isEqualTo("tenant-1");
assertThat(result.packageInfo().eventSource().providerKey()).isEqualTo("YELLOWFOX");
assertThat(result.packageInfo().eventSource().sourceKey()).isEqualTo("YELLOWFOX_D8");
assertThat(result.driverRef().driverCard().nation()).isEqualTo("AT");
assertThat(result.vehicleRef().vehicleRegistration().nation()).isEqualTo("AT");
assertThat(result.eventDetails().type()).isEqualTo("DRIVER_ACTIVITY");
assertThat(result.payload().get("yellowFoxEventType").asInt()).isEqualTo(2);
assertThat(result.payload().get("yellowFoxState").asInt()).isEqualTo(3);
}
}