commit 0a0e2dc61539d4bbeb5d56abe516403b41979824 Author: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Thu Apr 30 11:01:01 2026 +0200 Initial eventhub ingestion service diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8640df6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +target/ +.idea/ +*.iml +.classpath +.project +.settings/ +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..0c3a4e6 --- /dev/null +++ b/README.md @@ -0,0 +1,372 @@ +# EventHub Acquisition Service + +Spring Boot + Apache Camel project skeleton for acquiring normalized EventHub point events from multiple providers/sources. + +The current version intentionally focuses on **acquisition**. Final canonical storage/deduplication can be discussed later. The included PostgreSQL schema is a small acquisition-stage store so the project can be run and tested end-to-end. + +## Architecture + +```text +source-specific Camel input route + -> source-specific mapper + -> EventHubEventDto + -> common EventHub acquisition route + -> validation + -> package-key creation from tenant + EventSource + event family + date/window + -> aggregation / batching + -> chronological sorting inside the batch + -> acquisition package handoff +``` + +## Namespace + +```text +at.procon.eventhub +``` + +## Main model decisions + +### 1. One event = one time point + +`EventHubEventDto` has exactly one timestamp: + +```text +occurredAt +``` + +There is no generic `duration`, `endTime`, `validFrom`, or `validTo`. If a source row represents an interval, the mapper may emit separate point events, for example `DRIVE START` and `DRIVE END`. + +### 2. Tenant is package-level + +`tenantKey` identifies the owner/client/account for the package. It is required for acquisition grouping and future master-data resolution. + +```json +{ + "tenantKey": "kralowetz" +} +``` + +Organisation is not mandatory in the incoming event. It can later be derived from resolved driver/vehicle + `occurredAt`. + +### 3. EventSource replaces sourceTable/sourceSystem + +The acquisition context is represented by `EventSourceDto`: + +```json +{ + "providerKey": "TACHOGRAPH", + "sourceKind": "VEHICLE_UNIT", + "sourceKey": "TACHOGRAPH_VEHICLE_UNIT", + "sourceInstanceKey": "main-tachograph-db", + "tenantProviderSettingKey": "kralowetz-tachograph-prod", + "externalFleetKey": null +} +``` + +Examples: + +```text +TACHOGRAPH / VEHICLE_UNIT +TACHOGRAPH / DRIVER_CARD +YELLOWFOX / TELEMATICS_PLATFORM / YELLOWFOX_D8 +FLEETBOARD / TELEMATICS_PLATFORM / FLEETBOARD_POSITION +``` + +`EventSource` is acquisition context. It should not be part of the canonical real-world event identity. A VU event and a driver-card event may describe the same real event. + +### 4. Source-side master references, no incoming internal IDs + +The incoming DTO does not require internal `driverId` or `vehicleId`, because in normal ingestion those ids are not known yet. + +Driver reference: + +```json +"driverRef": { + "sourceEntityId": "driver-100", + "driverCard": { + "nation": "AT", + "number": "D123456789" + } +} +``` + +Vehicle reference: + +```json +"vehicleRef": { + "sourceEntityId": "vehicle-200", + "vin": "WDB9634031L123456", + "vehicleRegistration": { + "nation": "AT", + "number": "W-12345" + } +} +``` + +VIN is optional. Driver-card-only events can carry only the nation-scoped VRN/registration: + +```json +"vehicleRef": { + "sourceEntityId": null, + "vin": null, + "vehicleRegistration": { + "nation": "AT", + "number": "W-12345" + } +} +``` + +This allows late resolution when VU/master data later connects the VRN to a VIN. + +### 5. Generic normalized eventDetails + +Reusable event-specific properties are stored in: + +```json +"eventDetails": { + "type": "DRIVER_ACTIVITY", + "attributes": { + "cardSlot": "DRIVER", + "cardStatus": "INSERTED", + "drivingStatus": "SINGLE" + } +} +``` + +Raw provider values stay in `payload`: + +```json +"payload": { + "raw": { + "cardSlot": 0, + "cardStatus": 0, + "drivingStatus": 0 + } +} +``` + +This keeps the acquisition DTO generic while preserving meaningful normalized fields. + +## Package-level acquisition request + +For external/manual ingestion, the preferred request shape is: + +```json +{ + "package": { + "tenantKey": "kralowetz", + "eventSource": { + "providerKey": "TACHOGRAPH", + "sourceKind": "VEHICLE_UNIT", + "sourceKey": "TACHOGRAPH_VEHICLE_UNIT", + "sourceInstanceKey": "main-tachograph-db", + "tenantProviderSettingKey": "kralowetz-tachograph-prod" + }, + "eventFamily": "DRIVER_ACTIVITY", + "businessDate": "2026-04-28", + "requestedFrom": "2026-04-28T00:00:00+02:00", + "requestedTo": "2026-04-29T00:00:00+02:00", + "externalPackageId": "TACHOGRAPH:VEHICLE_UNIT:DRIVER_ACTIVITY:2026-04-28" + }, + "events": [ + { + "externalSourceEventId": "TACHOGRAPH:VEHICLE_UNIT:activity:456:start", + "driverRef": { + "sourceEntityId": "driver-100", + "driverCard": { + "nation": "AT", + "number": "D123456789" + } + }, + "vehicleRef": { + "sourceEntityId": "vehicle-200", + "vin": "WDB9634031L123456", + "vehicleRegistration": { + "nation": "AT", + "number": "W-12345" + } + }, + "occurredAt": "2026-04-28T08:00:00+02:00", + "eventDomain": "DRIVER_ACTIVITY", + "eventType": "DRIVE", + "lifecycle": "START", + "eventDetails": { + "type": "DRIVER_ACTIVITY", + "attributes": { + "cardSlot": "DRIVER", + "cardStatus": "INSERTED", + "drivingStatus": "SINGLE" + } + }, + "payload": { + "raw": { + "activity": 3, + "cardSlot": 0, + "cardStatus": 0, + "drivingStatus": 0 + } + } + } + ] +} +``` + +## Routes + +### Source-specific input routes + +```text +direct:yellowfox-d8-booking-input +direct:telematics-position-input +direct:tachograph-activity-input +direct:eventhub-package-input +direct:eventhub-manual-input +``` + +### Common route + +```text +direct:eventhub-normalized-input + -> validate EventHubEventDto + -> create package key from tenant + EventSource/package context + -> seda:eventhub-batch-input + -> aggregate by eventhub.packageKey + -> sort by occurredAt inside the batch + -> EventHubIngestionService.ingest(...) +``` + +## REST endpoints + +```text +POST /api/eventhub/acquisition/yellowfox/d8-bookings +POST /api/eventhub/acquisition/telematics/positions +POST /api/eventhub/acquisition/tachograph/activities +POST /api/eventhub/acquisition/packages +POST /api/eventhub/acquisition/events +``` + +## Example: tachograph driver-card activity with VRN only + +```bash +curl -X POST http://localhost:8080/api/eventhub/acquisition/tachograph/activities \ + -H "Content-Type: application/json" \ + -d '[ + { + "tenantKey": "kralowetz", + "sourceKind": "DRIVER_CARD", + "sourceInstanceKey": "main-tachograph-db", + "tenantProviderSettingKey": "kralowetz-tachograph-prod", + "externalSourceEventId": "TACHOGRAPH:DRIVER_CARD:activity:789:start", + "driverRef": { + "sourceEntityId": "driver-100", + "driverCard": { + "nation": "AT", + "number": "D123456789" + } + }, + "vehicleRef": { + "sourceEntityId": null, + "vin": null, + "vehicleRegistration": { + "nation": "AT", + "number": "W-12345" + } + }, + "occurredAt": "2026-04-28T08:00:00+02:00", + "activityType": "DRIVE", + "lifecycle": "START", + "cardSlot": "DRIVER", + "cardStatus": "INSERTED", + "drivingStatus": "SINGLE", + "payload": { + "raw": { + "activity": 3, + "cardSlot": 0, + "cardStatus": 0, + "drivingStatus": 0 + } + } + } + ]' +``` + +The mapper creates: + +```text +Tenant = kralowetz +EventSource = TACHOGRAPH / DRIVER_CARD / TACHOGRAPH_DRIVER_CARD +EventDomain = DRIVER_ACTIVITY +EventType = DRIVE +Lifecycle = START +EventDetails.type = DRIVER_ACTIVITY +VehicleRef = VRN-only, VIN can be resolved later +``` + +## Start PostgreSQL + +```bash +docker compose up -d +``` + +## Run the service + +```bash +mvn spring-boot:run +``` + +## Check acquisition packages + +```sql +select p.received_at, + p.tenant_key, + s.provider_key, + s.source_kind, + s.source_key, + p.event_family, + p.business_date, + p.status, + p.event_count +from eventhub.data_package p +join eventhub.event_source s on s.id = p.event_source_id +order by p.received_at desc; +``` + +## Check acquired events + +```sql +select occurred_at, + driver_source_entity_id, + driver_card_nation, + driver_card_number, + vehicle_source_entity_id, + vehicle_vin, + vehicle_registration_nation, + vehicle_registration_number, + event_domain, + event_type, + lifecycle, + event_details, + payload +from eventhub.acquired_event +order by occurred_at desc; +``` + +## Next implementation steps + +1. Add source-specific SQL extraction routes for the tachograph DB event families: + - activities from CardActivity/VUActivity + - card insert/withdraw from CardVehiclesUsed/IWCycle + - positions from places/GNSS/border/load-unload sources + - border crossings + - load/unload + - specific conditions: out-of-scope and ferry/train + - speeding events +2. Keep each extractor package-scoped by `tenant + EventSource + eventFamily + businessDate/import window`. +3. Add master-data resolution later: + - driver by tenant + driver card nation/number + occurredAt + - vehicle by tenant + VIN or tenant + registration nation/number + occurredAt + - late resolution from VRN-only driver-card events to VIN after VU/master data import +4. Discuss final storage model: + - canonical `eventhub.event` + - source-record table linked to EventSource/package + - deduplication policy for VU vs driver-card duplicates diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..27ab309 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,20 @@ +services: + postgres: + image: postgres:16 + container_name: eventhub-postgres + environment: + POSTGRES_DB: eventhub + POSTGRES_USER: eventhub + POSTGRES_PASSWORD: eventhub + ports: + - "5432:5432" + volumes: + - eventhub-postgres-data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U eventhub -d eventhub"] + interval: 5s + timeout: 5s + retries: 20 + +volumes: + eventhub-postgres-data: diff --git a/docs/timescale/V2__enable_timescale_hypertable_optional.sql b/docs/timescale/V2__enable_timescale_hypertable_optional.sql new file mode 100644 index 0000000..7fc49ec --- /dev/null +++ b/docs/timescale/V2__enable_timescale_hypertable_optional.sql @@ -0,0 +1,10 @@ +-- Optional TimescaleDB migration for the acquisition-stage event table. +-- Enable only when the target PostgreSQL instance has TimescaleDB installed. +create extension if not exists timescaledb; + +select create_hypertable( + 'eventhub.acquired_event', + 'occurred_at', + if_not_exists => true, + migrate_data => true +); diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..4cb53e2 --- /dev/null +++ b/pom.xml @@ -0,0 +1,101 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 4.0.6 + + + + at.procon.eventhub + eventhub-ingestion-service + 0.1.0-SNAPSHOT + eventhub-ingestion-service + Spring Boot + Apache Camel EventHub ingestion service + + + 21 + 4.18.2 + + + + + + org.apache.camel.springboot + camel-spring-boot-bom + ${camel.version} + pom + import + + + + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-jdbc + + + org.springframework.boot + spring-boot-starter-validation + + + org.springframework.boot + spring-boot-starter-webmvc + + + + org.apache.camel.springboot + camel-spring-boot-starter + + + org.apache.camel.springboot + camel-direct-starter + + + org.apache.camel.springboot + camel-seda-starter + + + org.apache.camel.springboot + camel-jackson-starter + + + + org.flywaydb + flyway-core + + + org.flywaydb + flyway-database-postgresql + + + org.postgresql + postgresql + runtime + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/src/main/java/at/procon/eventhub/EventHubIngestionApplication.java b/src/main/java/at/procon/eventhub/EventHubIngestionApplication.java new file mode 100644 index 0000000..62bbeda --- /dev/null +++ b/src/main/java/at/procon/eventhub/EventHubIngestionApplication.java @@ -0,0 +1,15 @@ +package at.procon.eventhub; + +import at.procon.eventhub.config.EventHubProperties; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +@SpringBootApplication +@EnableConfigurationProperties(EventHubProperties.class) +public class EventHubIngestionApplication { + + public static void main(String[] args) { + SpringApplication.run(EventHubIngestionApplication.class, args); + } +} diff --git a/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java b/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java new file mode 100644 index 0000000..ed33565 --- /dev/null +++ b/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java @@ -0,0 +1,65 @@ +package at.procon.eventhub.api; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageIngestRequest; +import at.procon.eventhub.dto.source.TachographActivityDto; +import at.procon.eventhub.dto.source.TelematicsPositionDto; +import at.procon.eventhub.dto.source.YellowFoxD8BookingDto; +import jakarta.validation.Valid; +import java.util.List; +import java.util.Map; +import org.apache.camel.ProducerTemplate; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/eventhub/acquisition") +public class EventHubIngestionController { + + private final ProducerTemplate producerTemplate; + + public EventHubIngestionController(ProducerTemplate producerTemplate) { + this.producerTemplate = producerTemplate; + } + + @PostMapping("/yellowfox/d8-bookings") + public ResponseEntity> ingestYellowFoxD8Bookings(@RequestBody List bookings) { + producerTemplate.sendBody("direct:yellowfox-d8-booking-input", bookings); + return accepted(bookings.size(), "direct:yellowfox-d8-booking-input"); + } + + @PostMapping("/telematics/positions") + public ResponseEntity> ingestTelematicsPositions(@RequestBody List positions) { + producerTemplate.sendBody("direct:telematics-position-input", positions); + return accepted(positions.size(), "direct:telematics-position-input"); + } + + @PostMapping("/tachograph/activities") + public ResponseEntity> ingestTachographActivities(@RequestBody List activities) { + producerTemplate.sendBody("direct:tachograph-activity-input", activities); + return accepted(activities.size(), "direct:tachograph-activity-input"); + } + + @PostMapping("/packages") + public ResponseEntity> ingestPackage(@Valid @RequestBody EventHubPackageIngestRequest request) { + producerTemplate.sendBody("direct:eventhub-package-input", request); + return accepted(request.events().size(), "direct:eventhub-package-input"); + } + + @PostMapping("/events") + public ResponseEntity> ingestNormalizedEvents(@RequestBody List events) { + producerTemplate.sendBody("direct:eventhub-manual-input", events); + return accepted(events.size(), "direct:eventhub-manual-input"); + } + + private ResponseEntity> accepted(int count, String route) { + return ResponseEntity.accepted().body(Map.of( + "accepted", count, + "route", route, + "note", "Events are accepted into Camel acquisition routes, grouped by EventSource/package context, sorted in each batch, and then handed to the current ingestion adapter." + )); + } +} diff --git a/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java b/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java new file mode 100644 index 0000000..a32e492 --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java @@ -0,0 +1,66 @@ +package at.procon.eventhub.camel; + +import at.procon.eventhub.dto.DataPackageType; +import at.procon.eventhub.dto.EventHubEventBatchDto; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.service.EventHubEventSorter; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.springframework.stereotype.Component; + +@Component +public class EventHubBatchBuildProcessor implements Processor { + + private final EventHubEventSorter sorter; + + public EventHubBatchBuildProcessor(EventHubEventSorter sorter) { + this.sorter = sorter; + } + + @Override + public void process(Exchange exchange) { + @SuppressWarnings("unchecked") + List events = exchange.getMessage().getBody(List.class); + List sortedEvents = sorter.sort(events); + + String packageKey = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_KEY, String.class); + EventHubPackageRequest packageInfo = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_INFO, EventHubPackageRequest.class); + if (packageInfo == null && !sortedEvents.isEmpty()) { + packageInfo = sortedEvents.getFirst().packageInfo(); + } + + OffsetDateTime requestedFrom = packageInfo != null && packageInfo.requestedFrom() != null + ? packageInfo.requestedFrom() + : sortedEvents.getFirst().occurredAt(); + OffsetDateTime requestedTo = packageInfo != null && packageInfo.requestedTo() != null + ? packageInfo.requestedTo() + : sortedEvents.getLast().occurredAt(); + + Map metadata = new HashMap<>(); + metadata.put("camelRouteId", exchange.getFromRouteId()); + metadata.put("packageKey", packageKey); + metadata.put("eventCount", sortedEvents.size()); + if (packageInfo != null) { + metadata.put("tenantKey", packageInfo.tenantKey()); + metadata.put("eventSource", packageInfo.eventSource().stableKey()); + metadata.put("eventFamily", packageInfo.eventFamily()); + metadata.put("businessDate", packageInfo.businessDate()); + metadata.put("externalPackageId", packageInfo.externalPackageId()); + } + + exchange.getMessage().setBody(new EventHubEventBatchDto( + packageKey, + packageInfo, + DataPackageType.CAMEL_BATCH, + requestedFrom, + requestedTo, + sortedEvents, + metadata + )); + } +} diff --git a/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java b/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java new file mode 100644 index 0000000..4bdaeab --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java @@ -0,0 +1,51 @@ +package at.procon.eventhub.camel; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.service.EventHubIngestionService; +import org.apache.camel.builder.RouteBuilder; +import org.springframework.stereotype.Component; + +@Component +public class EventHubCommonIngestionRoute extends RouteBuilder { + + private final EventHubProperties properties; + private final EventHubEventValidationProcessor validationProcessor; + private final EventHubPackageKeyProcessor packageKeyProcessor; + private final EventHubEventAggregationStrategy aggregationStrategy; + private final EventHubBatchBuildProcessor batchBuildProcessor; + private final EventHubIngestionService ingestionService; + + public EventHubCommonIngestionRoute( + EventHubProperties properties, + EventHubEventValidationProcessor validationProcessor, + EventHubPackageKeyProcessor packageKeyProcessor, + EventHubEventAggregationStrategy aggregationStrategy, + EventHubBatchBuildProcessor batchBuildProcessor, + EventHubIngestionService ingestionService + ) { + this.properties = properties; + this.validationProcessor = validationProcessor; + this.packageKeyProcessor = packageKeyProcessor; + this.aggregationStrategy = aggregationStrategy; + this.batchBuildProcessor = batchBuildProcessor; + this.ingestionService = ingestionService; + } + + @Override + public void configure() { + from("direct:eventhub-normalized-input") + .routeId("eventhub-normalized-input-route") + .process(validationProcessor) + .process(packageKeyProcessor) + .to("seda:eventhub-batch-input"); + + from("seda:eventhub-batch-input") + .routeId("eventhub-batch-and-persist-route") + .aggregate(header(EventHubHeaders.PACKAGE_KEY), aggregationStrategy) + .completionSize(properties.getBatch().getCompletionSize()) + .completionTimeout(properties.getBatch().getCompletionTimeout().toMillis()) + .forceCompletionOnStop() + .process(batchBuildProcessor) + .bean(ingestionService, "ingest"); + } +} diff --git a/src/main/java/at/procon/eventhub/camel/EventHubEventAggregationStrategy.java b/src/main/java/at/procon/eventhub/camel/EventHubEventAggregationStrategy.java new file mode 100644 index 0000000..528982b --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/EventHubEventAggregationStrategy.java @@ -0,0 +1,28 @@ +package at.procon.eventhub.camel; + +import at.procon.eventhub.dto.EventHubEventDto; +import java.util.ArrayList; +import java.util.List; +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; +import org.springframework.stereotype.Component; + +@Component +public class EventHubEventAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + EventHubEventDto event = newExchange.getMessage().getBody(EventHubEventDto.class); + if (oldExchange == null) { + List events = new ArrayList<>(); + events.add(event); + newExchange.getMessage().setBody(events); + return newExchange; + } + + @SuppressWarnings("unchecked") + List events = oldExchange.getMessage().getBody(List.class); + events.add(event); + return oldExchange; + } +} diff --git a/src/main/java/at/procon/eventhub/camel/EventHubEventValidationProcessor.java b/src/main/java/at/procon/eventhub/camel/EventHubEventValidationProcessor.java new file mode 100644 index 0000000..8072051 --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/EventHubEventValidationProcessor.java @@ -0,0 +1,23 @@ +package at.procon.eventhub.camel; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.service.EventHubEventValidator; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.springframework.stereotype.Component; + +@Component +public class EventHubEventValidationProcessor implements Processor { + + private final EventHubEventValidator validator; + + public EventHubEventValidationProcessor(EventHubEventValidator validator) { + this.validator = validator; + } + + @Override + public void process(Exchange exchange) { + EventHubEventDto event = exchange.getMessage().getBody(EventHubEventDto.class); + validator.validate(event); + } +} diff --git a/src/main/java/at/procon/eventhub/camel/EventHubHeaders.java b/src/main/java/at/procon/eventhub/camel/EventHubHeaders.java new file mode 100644 index 0000000..88056aa --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/EventHubHeaders.java @@ -0,0 +1,10 @@ +package at.procon.eventhub.camel; + +public final class EventHubHeaders { + + public static final String PACKAGE_KEY = "eventhub.packageKey"; + public static final String PACKAGE_INFO = "eventhub.packageInfo"; + + private EventHubHeaders() { + } +} diff --git a/src/main/java/at/procon/eventhub/camel/EventHubPackageKeyProcessor.java b/src/main/java/at/procon/eventhub/camel/EventHubPackageKeyProcessor.java new file mode 100644 index 0000000..d317596 --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/EventHubPackageKeyProcessor.java @@ -0,0 +1,28 @@ +package at.procon.eventhub.camel; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.service.EventHubPackageKeyBuilder; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.springframework.stereotype.Component; + +@Component +public class EventHubPackageKeyProcessor implements Processor { + + private final EventHubPackageKeyBuilder packageKeyBuilder; + + public EventHubPackageKeyProcessor(EventHubPackageKeyBuilder packageKeyBuilder) { + this.packageKeyBuilder = packageKeyBuilder; + } + + @Override + public void process(Exchange exchange) { + EventHubEventDto event = exchange.getMessage().getBody(EventHubEventDto.class); + if (exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_KEY) == null) { + exchange.getMessage().setHeader(EventHubHeaders.PACKAGE_KEY, packageKeyBuilder.build(event)); + } + if (exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_INFO) == null && event.packageInfo() != null) { + exchange.getMessage().setHeader(EventHubHeaders.PACKAGE_INFO, event.packageInfo()); + } + } +} diff --git a/src/main/java/at/procon/eventhub/camel/ManualEventHubInputRoute.java b/src/main/java/at/procon/eventhub/camel/ManualEventHubInputRoute.java new file mode 100644 index 0000000..7e851c0 --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/ManualEventHubInputRoute.java @@ -0,0 +1,36 @@ +package at.procon.eventhub.camel; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageIngestRequest; +import at.procon.eventhub.dto.EventHubPackageRequest; +import org.apache.camel.builder.RouteBuilder; +import org.springframework.stereotype.Component; + +@Component +public class ManualEventHubInputRoute extends RouteBuilder { + + @Override + public void configure() { + from("direct:eventhub-package-input") + .routeId("eventhub-package-input-route") + .process(exchange -> { + EventHubPackageIngestRequest request = exchange.getMessage().getBody(EventHubPackageIngestRequest.class); + exchange.getMessage().setHeader(EventHubHeaders.PACKAGE_INFO, request.packageInfo()); + exchange.getMessage().setBody(request.events()); + }) + .split(body()) + .process(exchange -> { + EventHubEventDto event = exchange.getMessage().getBody(EventHubEventDto.class); + EventHubPackageRequest packageInfo = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_INFO, EventHubPackageRequest.class); + exchange.getMessage().setBody(event.withPackageInfo(packageInfo)); + }) + .to("direct:eventhub-normalized-input") + .end(); + + from("direct:eventhub-manual-input") + .routeId("eventhub-manual-input-route") + .split(body()) + .to("direct:eventhub-normalized-input") + .end(); + } +} diff --git a/src/main/java/at/procon/eventhub/camel/TachographActivityInputRoute.java b/src/main/java/at/procon/eventhub/camel/TachographActivityInputRoute.java new file mode 100644 index 0000000..732e775 --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/TachographActivityInputRoute.java @@ -0,0 +1,25 @@ +package at.procon.eventhub.camel; + +import at.procon.eventhub.service.TachographActivityEventMapper; +import org.apache.camel.builder.RouteBuilder; +import org.springframework.stereotype.Component; + +@Component +public class TachographActivityInputRoute extends RouteBuilder { + + private final TachographActivityEventMapper mapper; + + public TachographActivityInputRoute(TachographActivityEventMapper mapper) { + this.mapper = mapper; + } + + @Override + public void configure() { + from("direct:tachograph-activity-input") + .routeId("tachograph-activity-input-route") + .split(body()) + .bean(mapper, "map") + .to("direct:eventhub-normalized-input") + .end(); + } +} diff --git a/src/main/java/at/procon/eventhub/camel/TelematicsPositionInputRoute.java b/src/main/java/at/procon/eventhub/camel/TelematicsPositionInputRoute.java new file mode 100644 index 0000000..4dc2bb7 --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/TelematicsPositionInputRoute.java @@ -0,0 +1,25 @@ +package at.procon.eventhub.camel; + +import at.procon.eventhub.service.TelematicsPositionEventMapper; +import org.apache.camel.builder.RouteBuilder; +import org.springframework.stereotype.Component; + +@Component +public class TelematicsPositionInputRoute extends RouteBuilder { + + private final TelematicsPositionEventMapper mapper; + + public TelematicsPositionInputRoute(TelematicsPositionEventMapper mapper) { + this.mapper = mapper; + } + + @Override + public void configure() { + from("direct:telematics-position-input") + .routeId("telematics-position-input-route") + .split(body()) + .bean(mapper, "map") + .to("direct:eventhub-normalized-input") + .end(); + } +} diff --git a/src/main/java/at/procon/eventhub/camel/YellowFoxD8BookingInputRoute.java b/src/main/java/at/procon/eventhub/camel/YellowFoxD8BookingInputRoute.java new file mode 100644 index 0000000..11c495a --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/YellowFoxD8BookingInputRoute.java @@ -0,0 +1,25 @@ +package at.procon.eventhub.camel; + +import at.procon.eventhub.service.YellowFoxD8BookingEventMapper; +import org.apache.camel.builder.RouteBuilder; +import org.springframework.stereotype.Component; + +@Component +public class YellowFoxD8BookingInputRoute extends RouteBuilder { + + private final YellowFoxD8BookingEventMapper mapper; + + public YellowFoxD8BookingInputRoute(YellowFoxD8BookingEventMapper mapper) { + this.mapper = mapper; + } + + @Override + public void configure() { + from("direct:yellowfox-d8-booking-input") + .routeId("yellowfox-d8-booking-input-route") + .split(body()) + .bean(mapper, "map") + .to("direct:eventhub-normalized-input") + .end(); + } +} diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java new file mode 100644 index 0000000..e115483 --- /dev/null +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -0,0 +1,38 @@ +package at.procon.eventhub.config; + +import java.time.Duration; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "eventhub") +public class EventHubProperties { + + private final Batch batch = new Batch(); + + public Batch getBatch() { + return batch; + } + + public static class Batch { + /** Number of events collected before a package is persisted. */ + private int completionSize = 1000; + + /** Maximum time to wait for more events belonging to the same package key. */ + private Duration completionTimeout = Duration.ofSeconds(5); + + public int getCompletionSize() { + return completionSize; + } + + public void setCompletionSize(int completionSize) { + this.completionSize = completionSize; + } + + public Duration getCompletionTimeout() { + return completionTimeout; + } + + public void setCompletionTimeout(Duration completionTimeout) { + this.completionTimeout = completionTimeout; + } + } +} diff --git a/src/main/java/at/procon/eventhub/dto/CardSlot.java b/src/main/java/at/procon/eventhub/dto/CardSlot.java new file mode 100644 index 0000000..eb3da37 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/CardSlot.java @@ -0,0 +1,6 @@ +package at.procon.eventhub.dto; + +public enum CardSlot { + DRIVER, + CO_DRIVER +} diff --git a/src/main/java/at/procon/eventhub/dto/CardStatus.java b/src/main/java/at/procon/eventhub/dto/CardStatus.java new file mode 100644 index 0000000..1a9a6d2 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/CardStatus.java @@ -0,0 +1,6 @@ +package at.procon.eventhub.dto; + +public enum CardStatus { + INSERTED, + NOT_INSERTED +} diff --git a/src/main/java/at/procon/eventhub/dto/DataPackageStatus.java b/src/main/java/at/procon/eventhub/dto/DataPackageStatus.java new file mode 100644 index 0000000..f6e3495 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/DataPackageStatus.java @@ -0,0 +1,7 @@ +package at.procon.eventhub.dto; + +public enum DataPackageStatus { + IMPORTING, + IMPORTED, + FAILED +} diff --git a/src/main/java/at/procon/eventhub/dto/DataPackageType.java b/src/main/java/at/procon/eventhub/dto/DataPackageType.java new file mode 100644 index 0000000..3ec7fdf --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/DataPackageType.java @@ -0,0 +1,9 @@ +package at.procon.eventhub.dto; + +public enum DataPackageType { + CAMEL_BATCH, + API_RESPONSE, + FILE_IMPORT, + DB_EXTRACT, + MANUAL_IMPORT +} diff --git a/src/main/java/at/procon/eventhub/dto/DriverCardRefDto.java b/src/main/java/at/procon/eventhub/dto/DriverCardRefDto.java new file mode 100644 index 0000000..7c7b54c --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/DriverCardRefDto.java @@ -0,0 +1,30 @@ +package at.procon.eventhub.dto; + +/** + * Tachograph driver-card identifier. The card number is scoped by issuing nation. + */ +public record DriverCardRefDto( + String nation, + String number +) { + public DriverCardRefDto { + nation = normalize(nation); + number = normalizeNullable(number); + } + + public boolean hasValue() { + return number != null && !number.isBlank(); + } + + public String stableKey() { + return (nation == null ? "" : nation) + ":" + (number == null ? "" : number); + } + + private static String normalize(String value) { + return value == null || value.isBlank() ? null : value.trim().toUpperCase(); + } + + private static String normalizeNullable(String value) { + return value == null || value.isBlank() ? null : value.trim(); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/DriverRefDto.java b/src/main/java/at/procon/eventhub/dto/DriverRefDto.java new file mode 100644 index 0000000..2898914 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/DriverRefDto.java @@ -0,0 +1,31 @@ +package at.procon.eventhub.dto; + +import jakarta.validation.Valid; + +/** + * Source-side driver reference. No internal EventHub driver id is required in + * incoming acquisition requests; it can be resolved later from sourceEntityId or + * nation-scoped driver card number. + */ +public record DriverRefDto( + String sourceEntityId, + @Valid DriverCardRefDto driverCard +) { + public DriverRefDto { + sourceEntityId = normalizeNullable(sourceEntityId); + } + + public boolean hasAnyReference() { + return (sourceEntityId != null && !sourceEntityId.isBlank()) + || (driverCard != null && driverCard.hasValue()); + } + + public String stableKey() { + String cardKey = driverCard == null ? "" : driverCard.stableKey(); + return (sourceEntityId == null ? "" : sourceEntityId) + "|" + cardKey; + } + + private static String normalizeNullable(String value) { + return value == null || value.isBlank() ? null : value.trim(); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/DrivingStatus.java b/src/main/java/at/procon/eventhub/dto/DrivingStatus.java new file mode 100644 index 0000000..53d9fde --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/DrivingStatus.java @@ -0,0 +1,8 @@ +package at.procon.eventhub.dto; + +public enum DrivingStatus { + SINGLE, + CREW, + KNOWN, + UNKNOWN +} diff --git a/src/main/java/at/procon/eventhub/dto/EventDetailsDto.java b/src/main/java/at/procon/eventhub/dto/EventDetailsDto.java new file mode 100644 index 0000000..bcc59f3 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventDetailsDto.java @@ -0,0 +1,22 @@ +package at.procon.eventhub.dto; + +import com.fasterxml.jackson.databind.JsonNode; +import jakarta.validation.constraints.NotBlank; + +/** + * Generic, normalized event-specific attributes. + * + * Common fields stay on EventHubEventDto. Provider-specific raw values stay in + * payload. Reusable semantic attributes such as cardSlot, countryFrom, or + * operation are placed here under a detail type. + */ +public record EventDetailsDto( + @NotBlank String type, + JsonNode attributes +) { + public EventDetailsDto { + if (type != null) { + type = type.trim().toUpperCase().replace('-', '_').replace(' ', '_'); + } + } +} diff --git a/src/main/java/at/procon/eventhub/dto/EventDomain.java b/src/main/java/at/procon/eventhub/dto/EventDomain.java new file mode 100644 index 0000000..bfc900d --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventDomain.java @@ -0,0 +1,17 @@ +package at.procon.eventhub.dto; + +public enum EventDomain { + DRIVER_CARD, + DRIVER_ACTIVITY, + IGNITION, + POSITION, + BORDER_CROSSING, + LOAD_UNLOAD, + OUT_OF_SCOPE, + FERRY_TRAIN, + SPEEDING, + PLACE, + VEHICLE_DATA, + TELEMATICS_DATA, + MANUAL_ENTRY +} diff --git a/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java b/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java new file mode 100644 index 0000000..b1894af --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java @@ -0,0 +1,20 @@ +package at.procon.eventhub.dto; + +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; + +public record EventHubEventBatchDto( + String packageKey, + EventHubPackageRequest packageInfo, + DataPackageType packageType, + OffsetDateTime requestedFrom, + OffsetDateTime requestedTo, + List events, + Map metadata +) { + public EventHubEventBatchDto { + events = events == null ? List.of() : List.copyOf(events); + metadata = metadata == null ? Map.of() : Map.copyOf(metadata); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java b/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java new file mode 100644 index 0000000..6b24a0c --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java @@ -0,0 +1,121 @@ +package at.procon.eventhub.dto; + +import com.fasterxml.jackson.databind.JsonNode; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import java.time.OffsetDateTime; +import java.util.UUID; + +/** + * Normalized acquisition event. + * + * It is a point event: it has exactly one occurredAt timestamp. Duration/end + * time are not part of the generic EventHub event contract. If a source row + * describes an interval, the acquisition mapper may emit multiple point events. + */ +public record EventHubEventDto( + UUID eventId, + + /** Stable id of this imported/source event record, not the canonical business event id. */ + @NotBlank String externalSourceEventId, + + /** Source-side driver reference. No internal driver id is required during acquisition. */ + @Valid DriverRefDto driverRef, + + /** Source-side vehicle reference. VIN can be absent when only driver-card data is available. */ + @Valid VehicleRefDto vehicleRef, + + @NotNull OffsetDateTime occurredAt, + OffsetDateTime receivedPartnerAt, + OffsetDateTime receivedHubAt, + + @NotNull EventDomain eventDomain, + @NotNull EventType eventType, + @NotNull EventLifecycle lifecycle, + + Long odometerM, + @Valid GeoPointDto position, + + /** Normalized semantic details depending on eventDomain/eventType. */ + @Valid EventDetailsDto eventDetails, + + /** Raw/provider-specific payload, stored as real JSON and not as encoded JSON string. */ + JsonNode payload, + + boolean manualEntry, + + /** Package/source context used by Camel acquisition routes for grouping and batching. */ + @Valid EventHubPackageRequest packageInfo +) { + public EventHubEventDto { + boolean hasDriver = driverRef != null && driverRef.hasAnyReference(); + boolean hasVehicle = vehicleRef != null && vehicleRef.hasAnyReference(); + if (!hasDriver && !hasVehicle) { + throw new IllegalArgumentException("driverRef or vehicleRef with at least one identifier must be set"); + } + } + + public EventHubEventDto withEventId(UUID newEventId) { + return new EventHubEventDto( + newEventId, + externalSourceEventId, + driverRef, + vehicleRef, + occurredAt, + receivedPartnerAt, + receivedHubAt, + eventDomain, + eventType, + lifecycle, + odometerM, + position, + eventDetails, + payload, + manualEntry, + packageInfo + ); + } + + public EventHubEventDto withReceivedHubAt(OffsetDateTime value) { + return new EventHubEventDto( + eventId, + externalSourceEventId, + driverRef, + vehicleRef, + occurredAt, + receivedPartnerAt, + value, + eventDomain, + eventType, + lifecycle, + odometerM, + position, + eventDetails, + payload, + manualEntry, + packageInfo + ); + } + + public EventHubEventDto withPackageInfo(EventHubPackageRequest value) { + return new EventHubEventDto( + eventId, + externalSourceEventId, + driverRef, + vehicleRef, + occurredAt, + receivedPartnerAt, + receivedHubAt, + eventDomain, + eventType, + lifecycle, + odometerM, + position, + eventDetails, + payload, + manualEntry, + value + ); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/EventHubPackageIngestRequest.java b/src/main/java/at/procon/eventhub/dto/EventHubPackageIngestRequest.java new file mode 100644 index 0000000..0414cab --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventHubPackageIngestRequest.java @@ -0,0 +1,18 @@ +package at.procon.eventhub.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import java.util.List; + +public record EventHubPackageIngestRequest( + @JsonProperty("package") + @Valid @NotNull EventHubPackageRequest packageInfo, + + @Valid @NotEmpty List events +) { + public EventHubPackageIngestRequest { + events = events == null ? List.of() : List.copyOf(events); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/EventHubPackageRequest.java b/src/main/java/at/procon/eventhub/dto/EventHubPackageRequest.java new file mode 100644 index 0000000..db5e744 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventHubPackageRequest.java @@ -0,0 +1,33 @@ +package at.procon.eventhub.dto; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import java.time.LocalDate; +import java.time.OffsetDateTime; + +/** + * Acquisition package context. One package should represent one coherent import + * unit, e.g. tenant + TACHOGRAPH + VEHICLE_UNIT + DRIVER_ACTIVITY + business date. + */ +public record EventHubPackageRequest( + /** Tenant/client/account owning the acquired data. */ + @NotBlank String tenantKey, + @Valid @NotNull EventSourceDto eventSource, + @NotBlank String eventFamily, + LocalDate businessDate, + OffsetDateTime requestedFrom, + OffsetDateTime requestedTo, + @NotBlank String externalPackageId +) { + public EventHubPackageRequest { + tenantKey = normalizeTenant(tenantKey); + if (eventFamily != null) { + eventFamily = eventFamily.trim().toUpperCase().replace('-', '_').replace(' ', '_'); + } + } + + private static String normalizeTenant(String value) { + return value == null || value.isBlank() ? value : value.trim(); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/EventHubPackageResult.java b/src/main/java/at/procon/eventhub/dto/EventHubPackageResult.java new file mode 100644 index 0000000..ea751cc --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventHubPackageResult.java @@ -0,0 +1,11 @@ +package at.procon.eventhub.dto; + +import java.util.UUID; + +public record EventHubPackageResult( + UUID packageId, + String packageKey, + int receivedCount, + int insertedCount +) { +} diff --git a/src/main/java/at/procon/eventhub/dto/EventLifecycle.java b/src/main/java/at/procon/eventhub/dto/EventLifecycle.java new file mode 100644 index 0000000..a1fab77 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventLifecycle.java @@ -0,0 +1,16 @@ +package at.procon.eventhub.dto; + +public enum EventLifecycle { + INSERT, + WITHDRAW, + ON, + OFF, + START, + END, + BEGIN, + INBOUND, + OUTBOUND, + OUT_EU, + SNAPSHOT, + MANUAL +} diff --git a/src/main/java/at/procon/eventhub/dto/EventSourceDto.java b/src/main/java/at/procon/eventhub/dto/EventSourceDto.java new file mode 100644 index 0000000..201da90 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventSourceDto.java @@ -0,0 +1,48 @@ +package at.procon.eventhub.dto; + +import jakarta.validation.constraints.NotBlank; + +/** + * Describes the origin of an acquired event record. + * + * This is intentionally acquisition/source context and not part of the canonical + * real-world event identity. The same canonical event can be acquired from + * TACHOGRAPH/VEHICLE_UNIT and later from TACHOGRAPH/DRIVER_CARD. + */ +public record EventSourceDto( + @NotBlank String providerKey, + @NotBlank String sourceKind, + @NotBlank String sourceKey, + String sourceInstanceKey, + String tenantProviderSettingKey, + String externalFleetKey +) { + public EventSourceDto { + providerKey = normalize(providerKey); + sourceKind = normalize(sourceKind); + sourceKey = normalize(sourceKey); + sourceInstanceKey = normalizeNullable(sourceInstanceKey); + tenantProviderSettingKey = normalizeNullable(tenantProviderSettingKey); + externalFleetKey = normalizeNullable(externalFleetKey); + } + + public String stableKey() { + return String.join(":", + providerKey, + sourceKind, + sourceKey, + sourceInstanceKey == null ? "default" : sourceInstanceKey + ); + } + + private static String normalize(String value) { + if (value == null || value.isBlank()) { + return value; + } + return value.trim().toUpperCase().replace('-', '_').replace(' ', '_'); + } + + private static String normalizeNullable(String value) { + return value == null || value.isBlank() ? null : value.trim(); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/EventType.java b/src/main/java/at/procon/eventhub/dto/EventType.java new file mode 100644 index 0000000..384614c --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventType.java @@ -0,0 +1,29 @@ +package at.procon.eventhub.dto; + +public enum EventType { + CARD_INSERTED, + CARD_WITHDRAWN, + DRIVE, + BREAK_REST, + AVAILABILITY, + WORK, + UNKNOWN_ACTIVITY, + IGNITION_ON, + IGNITION_OFF, + POSITION_RECORDED, + BORDER_INBOUND, + BORDER_OUTBOUND, + BORDER_OUT_EU, + LOAD, + UNLOAD, + LOAD_UNLOAD, + OUT, + FERRY_TRAIN, + SPEEDING, + START_PLACE, + END_PLACE, + VEHICLE_DATA, + TELEMATICS_DATA, + MANUAL_ENTRY, + UNKNOWN_EVENT +} diff --git a/src/main/java/at/procon/eventhub/dto/GeoPointDto.java b/src/main/java/at/procon/eventhub/dto/GeoPointDto.java new file mode 100644 index 0000000..a7cb54d --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/GeoPointDto.java @@ -0,0 +1,16 @@ +package at.procon.eventhub.dto; + +import jakarta.validation.constraints.DecimalMax; +import jakarta.validation.constraints.DecimalMin; +import java.math.BigDecimal; + +public record GeoPointDto( + @DecimalMin("-90.0") @DecimalMax("90.0") BigDecimal latitude, + @DecimalMin("-180.0") @DecimalMax("180.0") BigDecimal longitude +) { + public GeoPointDto { + if ((latitude == null) != (longitude == null)) { + throw new IllegalArgumentException("latitude and longitude must either both be set or both be null"); + } + } +} diff --git a/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java b/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java new file mode 100644 index 0000000..6b715a6 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java @@ -0,0 +1,39 @@ +package at.procon.eventhub.dto; + +import jakarta.validation.Valid; + +/** + * Source-side vehicle reference. VIN can be missing for driver-card-only data; + * VRN/registration is nation-scoped and can be resolved to VIN later. + */ +public record VehicleRefDto( + String sourceEntityId, + String vin, + @Valid VehicleRegistrationRefDto vehicleRegistration +) { + public VehicleRefDto { + sourceEntityId = normalizeNullable(sourceEntityId); + vin = normalizeVin(vin); + } + + public boolean hasAnyReference() { + return (sourceEntityId != null && !sourceEntityId.isBlank()) + || (vin != null && !vin.isBlank()) + || (vehicleRegistration != null && vehicleRegistration.hasValue()); + } + + public String stableKey() { + String registrationKey = vehicleRegistration == null ? "" : vehicleRegistration.stableKey(); + return (sourceEntityId == null ? "" : sourceEntityId) + "|" + + (vin == null ? "" : vin) + "|" + + registrationKey; + } + + private static String normalizeNullable(String value) { + return value == null || value.isBlank() ? null : value.trim(); + } + + private static String normalizeVin(String value) { + return value == null || value.isBlank() ? null : value.trim().toUpperCase(); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/VehicleRegistrationRefDto.java b/src/main/java/at/procon/eventhub/dto/VehicleRegistrationRefDto.java new file mode 100644 index 0000000..e34d129 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/VehicleRegistrationRefDto.java @@ -0,0 +1,31 @@ +package at.procon.eventhub.dto; + +/** + * Vehicle registration number reference. VRN/plate is scoped by nation and can + * be resolved historically by occurredAt later. + */ +public record VehicleRegistrationRefDto( + String nation, + String number +) { + public VehicleRegistrationRefDto { + nation = normalize(nation); + number = normalizeNullable(number); + } + + public boolean hasValue() { + return number != null && !number.isBlank(); + } + + public String stableKey() { + return (nation == null ? "" : nation) + ":" + (number == null ? "" : number); + } + + private static String normalize(String value) { + return value == null || value.isBlank() ? null : value.trim().toUpperCase(); + } + + private static String normalizeNullable(String value) { + return value == null || value.isBlank() ? null : value.trim(); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/source/TachographActivityDto.java b/src/main/java/at/procon/eventhub/dto/source/TachographActivityDto.java new file mode 100644 index 0000000..bc5653d --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/source/TachographActivityDto.java @@ -0,0 +1,30 @@ +package at.procon.eventhub.dto.source; + +import at.procon.eventhub.dto.CardSlot; +import at.procon.eventhub.dto.CardStatus; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.DrivingStatus; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.VehicleRefDto; +import java.time.OffsetDateTime; +import java.util.Map; + +public record TachographActivityDto( + String tenantKey, + String sourceKind, // VEHICLE_UNIT or DRIVER_CARD + String sourceInstanceKey, // e.g. main-tachograph-db + String tenantProviderSettingKey, + String externalSourceEventId, + OffsetDateTime occurredAt, + OffsetDateTime receivedPartnerAt, + DriverRefDto driverRef, + VehicleRefDto vehicleRef, + EventType activityType, + EventLifecycle lifecycle, + CardSlot cardSlot, + CardStatus cardStatus, + DrivingStatus drivingStatus, + Map payload +) { +} diff --git a/src/main/java/at/procon/eventhub/dto/source/TelematicsPositionDto.java b/src/main/java/at/procon/eventhub/dto/source/TelematicsPositionDto.java new file mode 100644 index 0000000..9ae4f36 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/source/TelematicsPositionDto.java @@ -0,0 +1,27 @@ +package at.procon.eventhub.dto.source; + +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.VehicleRefDto; +import java.math.BigDecimal; +import java.time.OffsetDateTime; +import java.util.Map; + +public record TelematicsPositionDto( + String tenantKey, + String providerKey, // YELLOWFOX, FLEETBOARD, TOMTOM, ... + String sourceKey, // e.g. FLEETBOARD_POSITION + String sourceInstanceKey, + String tenantProviderSettingKey, + String externalFleetKey, + String externalSourceEventId, + OffsetDateTime occurredAt, + OffsetDateTime receivedPartnerAt, + DriverRefDto driverRef, + VehicleRefDto vehicleRef, + Long odometerM, + BigDecimal latitude, + BigDecimal longitude, + String positionReason, + Map payload +) { +} diff --git a/src/main/java/at/procon/eventhub/dto/source/YellowFoxD8BookingDto.java b/src/main/java/at/procon/eventhub/dto/source/YellowFoxD8BookingDto.java new file mode 100644 index 0000000..77a203c --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/source/YellowFoxD8BookingDto.java @@ -0,0 +1,27 @@ +package at.procon.eventhub.dto.source; + +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.VehicleRefDto; +import java.math.BigDecimal; +import java.time.OffsetDateTime; +import java.util.Map; + +public record YellowFoxD8BookingDto( + String tenantKey, + String sourceInstanceKey, + String tenantProviderSettingKey, + String externalFleetKey, + String eventId, + String key, + Integer eventType, + Integer state, + OffsetDateTime occurredAt, + OffsetDateTime receivedPartnerAt, + DriverRefDto driverRef, + VehicleRefDto vehicleRef, + Long odometerM, + BigDecimal latitude, + BigDecimal longitude, + Map payload +) { +} diff --git a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java new file mode 100644 index 0000000..b905a67 --- /dev/null +++ b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java @@ -0,0 +1,94 @@ +package at.procon.eventhub.persistence; + +import at.procon.eventhub.dto.DataPackageStatus; +import at.procon.eventhub.dto.DataPackageType; +import at.procon.eventhub.dto.EventHubPackageRequest; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.UUID; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +@Repository +public class DataPackageRepository { + + private final JdbcTemplate jdbcTemplate; + private final ObjectMapper objectMapper; + + public DataPackageRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) { + this.jdbcTemplate = jdbcTemplate; + this.objectMapper = objectMapper; + } + + public UUID createPackage( + int eventSourceId, + String packageKey, + EventHubPackageRequest packageInfo, + DataPackageType packageType, + OffsetDateTime requestedFrom, + OffsetDateTime requestedTo, + Map metadata + ) { + UUID id = UUID.randomUUID(); + jdbcTemplate.update( + """ + insert into eventhub.data_package( + id, event_source_id, tenant_key, package_key, package_type, status, + event_family, business_date, external_package_id, + requested_from, requested_to, received_at, event_count, metadata + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), 0, ?::jsonb) + """, + ps -> { + ps.setObject(1, id); + ps.setInt(2, eventSourceId); + ps.setString(3, packageInfo == null ? "default" : packageInfo.tenantKey()); + ps.setString(4, packageKey); + ps.setString(5, packageType.name()); + ps.setString(6, DataPackageStatus.IMPORTING.name()); + ps.setString(7, packageInfo == null ? null : packageInfo.eventFamily()); + ps.setObject(8, packageInfo == null ? null : packageInfo.businessDate()); + ps.setString(9, packageInfo == null ? packageKey : packageInfo.externalPackageId()); + ps.setObject(10, requestedFrom); + ps.setObject(11, requestedTo); + ps.setString(12, toJson(metadata)); + } + ); + return id; + } + + public void markImported(UUID packageId, int insertedCount) { + jdbcTemplate.update( + """ + update eventhub.data_package + set status = ?, event_count = ?, completed_at = now() + where id = ? + """, + DataPackageStatus.IMPORTED.name(), + insertedCount, + packageId + ); + } + + public void markFailed(UUID packageId, String errorMessage) { + jdbcTemplate.update( + """ + update eventhub.data_package + set status = ?, error_message = ?, completed_at = now() + where id = ? + """, + DataPackageStatus.FAILED.name(), + errorMessage, + packageId + ); + } + + private String toJson(Map value) { + try { + return objectMapper.writeValueAsString(value == null ? Map.of() : value); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Cannot serialize package metadata", e); + } + } +} diff --git a/src/main/java/at/procon/eventhub/persistence/EventRepository.java b/src/main/java/at/procon/eventhub/persistence/EventRepository.java new file mode 100644 index 0000000..04a4c8b --- /dev/null +++ b/src/main/java/at/procon/eventhub/persistence/EventRepository.java @@ -0,0 +1,143 @@ +package at.procon.eventhub.persistence; + +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.service.EventNaturalKeyService; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.PreparedStatement; +import java.sql.Types; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +@Repository +public class EventRepository { + + private final JdbcTemplate jdbcTemplate; + private final ObjectMapper objectMapper; + private final EventNaturalKeyService naturalKeyService; + + public EventRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, EventNaturalKeyService naturalKeyService) { + this.jdbcTemplate = jdbcTemplate; + this.objectMapper = objectMapper; + this.naturalKeyService = naturalKeyService; + } + + /** + * Temporary acquisition-stage persistence. The canonical storage model will + * be finalized later; for now this table keeps acquired point events with + * EventSource context, source-side driver/vehicle refs, generic eventDetails, + * and raw payload JSON. + */ + public int batchInsert(UUID packageId, int eventSourceId, List events) { + int[] counts = jdbcTemplate.batchUpdate( + """ + insert into eventhub.acquired_event( + id, event_source_id, data_package_id, + external_source_event_id, + driver_source_entity_id, driver_card_nation, driver_card_number, + vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number, + occurred_at, received_partner_at, received_hub_at, + event_domain, event_type, lifecycle, + odometer_m, latitude, longitude, + event_details, payload, manual_entry, + canonical_key_hash, source_record_key_hash + ) values ( + ?, ?, ?, + ?, + ?, ?, ?, + ?, ?, ?, ?, + ?, ?, ?, + ?, ?, ?, + ?, ?, ?, + ?::jsonb, ?::jsonb, ?, + ?, ? + ) + on conflict do nothing + """, + new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws java.sql.SQLException { + EventHubEventDto event = events.get(i); + UUID eventId = event.eventId() == null ? UUID.randomUUID() : event.eventId(); + OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt(); + DriverRefDto driverRef = event.driverRef(); + DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard(); + VehicleRefDto vehicleRef = event.vehicleRef(); + VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration(); + + ps.setObject(1, eventId); + ps.setInt(2, eventSourceId); + ps.setObject(3, packageId); + ps.setString(4, event.externalSourceEventId()); + + ps.setString(5, driverRef == null ? null : driverRef.sourceEntityId()); + ps.setString(6, driverCard == null ? null : driverCard.nation()); + ps.setString(7, driverCard == null ? null : driverCard.number()); + + ps.setString(8, vehicleRef == null ? null : vehicleRef.sourceEntityId()); + ps.setString(9, vehicleRef == null ? null : vehicleRef.vin()); + ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation()); + ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number()); + + ps.setObject(12, event.occurredAt()); + ps.setObject(13, event.receivedPartnerAt()); + ps.setObject(14, receivedHubAt); + ps.setString(15, event.eventDomain().name()); + ps.setString(16, event.eventType().name()); + ps.setString(17, event.lifecycle().name()); + setNullableLong(ps, 18, event.odometerM()); + if (event.position() == null) { + ps.setNull(19, Types.NUMERIC); + ps.setNull(20, Types.NUMERIC); + } else { + ps.setObject(19, event.position().latitude()); + ps.setObject(20, event.position().longitude()); + } + ps.setString(21, toJson(objectMapper.valueToTree(event.eventDetails()))); + ps.setString(22, toJson(event.payload())); + ps.setBoolean(23, event.manualEntry()); + ps.setString(24, naturalKeyService.buildCanonicalKeyHash(event)); + ps.setString(25, naturalKeyService.buildSourceRecordKeyHash(event, eventSourceId)); + } + + @Override + public int getBatchSize() { + return events.size(); + } + } + ); + + int inserted = 0; + for (int count : counts) { + if (count > 0 || count == PreparedStatement.SUCCESS_NO_INFO) { + inserted++; + } + } + return inserted; + } + + private void setNullableLong(PreparedStatement ps, int index, Long value) throws java.sql.SQLException { + if (value == null) { + ps.setNull(index, Types.BIGINT); + } else { + ps.setLong(index, value); + } + } + + private String toJson(JsonNode value) { + try { + return objectMapper.writeValueAsString(value == null ? objectMapper.createObjectNode() : value); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Cannot serialize JSONB value", e); + } + } +} diff --git a/src/main/java/at/procon/eventhub/persistence/EventSourceRepository.java b/src/main/java/at/procon/eventhub/persistence/EventSourceRepository.java new file mode 100644 index 0000000..f6fc71a --- /dev/null +++ b/src/main/java/at/procon/eventhub/persistence/EventSourceRepository.java @@ -0,0 +1,68 @@ +package at.procon.eventhub.persistence; + +import at.procon.eventhub.dto.EventSourceDto; +import java.sql.PreparedStatement; +import java.sql.Statement; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +@Repository +public class EventSourceRepository { + + private final JdbcTemplate jdbcTemplate; + + public EventSourceRepository(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + } + + public int resolveSourceId(EventSourceDto eventSource) { + Integer existing = findSourceId(eventSource); + if (existing != null) { + return existing; + } + + jdbcTemplate.update(connection -> { + PreparedStatement ps = connection.prepareStatement( + """ + insert into eventhub.event_source( + provider_key, source_kind, source_key, source_instance_key, + tenant_provider_setting_key, external_fleet_key + ) values (?, ?, ?, ?, ?, ?) + on conflict (provider_key, source_kind, source_key, source_instance_key) do nothing + """, + Statement.NO_GENERATED_KEYS + ); + ps.setString(1, eventSource.providerKey()); + ps.setString(2, eventSource.sourceKind()); + ps.setString(3, eventSource.sourceKey()); + ps.setString(4, eventSource.sourceInstanceKey() == null ? "default" : eventSource.sourceInstanceKey()); + ps.setString(5, eventSource.tenantProviderSettingKey()); + ps.setString(6, eventSource.externalFleetKey()); + return ps; + }); + + Integer created = findSourceId(eventSource); + if (created == null) { + throw new IllegalStateException("Could not resolve event source id for " + eventSource.stableKey()); + } + return created; + } + + private Integer findSourceId(EventSourceDto eventSource) { + return jdbcTemplate.query( + """ + select id + from eventhub.event_source + where provider_key = ? + and source_kind = ? + and source_key = ? + and source_instance_key = ? + """, + rs -> rs.next() ? rs.getInt("id") : null, + eventSource.providerKey(), + eventSource.sourceKind(), + eventSource.sourceKey(), + eventSource.sourceInstanceKey() == null ? "default" : eventSource.sourceInstanceKey() + ); + } +} diff --git a/src/main/java/at/procon/eventhub/service/EventDetailsFactory.java b/src/main/java/at/procon/eventhub/service/EventDetailsFactory.java new file mode 100644 index 0000000..0f96b3c --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/EventDetailsFactory.java @@ -0,0 +1,76 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.CardSlot; +import at.procon.eventhub.dto.CardStatus; +import at.procon.eventhub.dto.DrivingStatus; +import at.procon.eventhub.dto.EventDetailsDto; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.math.BigDecimal; +import java.util.LinkedHashMap; +import java.util.Map; +import org.springframework.stereotype.Component; + +@Component +public class EventDetailsFactory { + + private final ObjectMapper objectMapper; + + public EventDetailsFactory(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + public EventDetailsDto driverActivity(CardSlot cardSlot, CardStatus cardStatus, DrivingStatus drivingStatus) { + Map attributes = new LinkedHashMap<>(); + put(attributes, "cardSlot", cardSlot); + put(attributes, "cardStatus", cardStatus); + put(attributes, "drivingStatus", drivingStatus); + return new EventDetailsDto("DRIVER_ACTIVITY", objectMapper.valueToTree(attributes)); + } + + public EventDetailsDto driverCard(CardSlot cardSlot, CardStatus cardStatus) { + Map attributes = new LinkedHashMap<>(); + put(attributes, "cardSlot", cardSlot); + put(attributes, "cardStatus", cardStatus); + return new EventDetailsDto("DRIVER_CARD", objectMapper.valueToTree(attributes)); + } + + public EventDetailsDto position(String reason) { + Map attributes = new LinkedHashMap<>(); + put(attributes, "positionReason", reason); + return new EventDetailsDto("POSITION", objectMapper.valueToTree(attributes)); + } + + public EventDetailsDto borderCrossing(String countryFrom, String countryTo) { + Map attributes = new LinkedHashMap<>(); + put(attributes, "countryFrom", countryFrom); + put(attributes, "countryTo", countryTo); + return new EventDetailsDto("BORDER_CROSSING", objectMapper.valueToTree(attributes)); + } + + public EventDetailsDto loadUnload(String operation) { + Map attributes = new LinkedHashMap<>(); + put(attributes, "operation", operation); + return new EventDetailsDto("LOAD_UNLOAD", objectMapper.valueToTree(attributes)); + } + + public EventDetailsDto speeding(BigDecimal speedKmh, BigDecimal permittedSpeedKmh) { + Map attributes = new LinkedHashMap<>(); + put(attributes, "speedKmh", speedKmh); + put(attributes, "permittedSpeedKmh", permittedSpeedKmh); + if (speedKmh != null && permittedSpeedKmh != null) { + put(attributes, "overspeedKmh", speedKmh.subtract(permittedSpeedKmh)); + } + return new EventDetailsDto("SPEEDING", objectMapper.valueToTree(attributes)); + } + + public JsonNode payloadFromMap(Map payload) { + return objectMapper.valueToTree(payload == null ? Map.of() : payload); + } + + private void put(Map target, String key, Object value) { + if (value != null) { + target.put(key, value instanceof Enum enumValue ? enumValue.name() : value); + } + } +} diff --git a/src/main/java/at/procon/eventhub/service/EventHubEventSorter.java b/src/main/java/at/procon/eventhub/service/EventHubEventSorter.java new file mode 100644 index 0000000..bbde029 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/EventHubEventSorter.java @@ -0,0 +1,19 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import java.util.Comparator; +import java.util.List; +import org.springframework.stereotype.Component; + +@Component +public class EventHubEventSorter { + + public List sort(List events) { + return events.stream() + .sorted(Comparator + .comparing(EventHubEventDto::occurredAt) + .thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo)) + .thenComparing(event -> event.eventType().name())) + .toList(); + } +} diff --git a/src/main/java/at/procon/eventhub/service/EventHubEventValidator.java b/src/main/java/at/procon/eventhub/service/EventHubEventValidator.java new file mode 100644 index 0000000..880522f --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/EventHubEventValidator.java @@ -0,0 +1,64 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import org.springframework.stereotype.Component; + +@Component +public class EventHubEventValidator { + + public void validate(EventHubEventDto event) { + if (event == null) { + throw new IllegalArgumentException("EventHubEventDto must not be null"); + } + if (event.externalSourceEventId() == null || event.externalSourceEventId().isBlank()) { + throw new IllegalArgumentException("externalSourceEventId must be set"); + } + if (event.occurredAt() == null) { + throw new IllegalArgumentException("occurredAt must be set"); + } + boolean hasDriverRef = event.driverRef() != null && event.driverRef().hasAnyReference(); + boolean hasVehicleRef = event.vehicleRef() != null && event.vehicleRef().hasAnyReference(); + if (!hasDriverRef && !hasVehicleRef) { + throw new IllegalArgumentException("driverRef or vehicleRef with at least one identifier must be set"); + } + if (event.packageInfo() == null || event.packageInfo().tenantKey() == null || event.packageInfo().tenantKey().isBlank()) { + throw new IllegalArgumentException("packageInfo.tenantKey must be set"); + } + if (event.packageInfo().eventSource() == null) { + throw new IllegalArgumentException("packageInfo.eventSource must be set"); + } + if (event.eventDomain() == null) { + throw new IllegalArgumentException("eventDomain must be set"); + } + if (event.eventType() == null) { + throw new IllegalArgumentException("eventType must be set"); + } + if (event.lifecycle() == null) { + throw new IllegalArgumentException("lifecycle must be set"); + } + if (event.position() != null && (event.position().latitude() == null || event.position().longitude() == null)) { + throw new IllegalArgumentException("position latitude and longitude must both be set"); + } + validateEventDetails(event); + } + + private void validateEventDetails(EventHubEventDto event) { + if (event.eventDetails() == null) { + return; + } + String detailType = event.eventDetails().type(); + if (event.eventDomain() == EventDomain.DRIVER_ACTIVITY && !"DRIVER_ACTIVITY".equals(detailType)) { + throw new IllegalArgumentException("DRIVER_ACTIVITY events must use eventDetails.type=DRIVER_ACTIVITY"); + } + if (event.eventDomain() == EventDomain.BORDER_CROSSING && !"BORDER_CROSSING".equals(detailType)) { + throw new IllegalArgumentException("BORDER_CROSSING events must use eventDetails.type=BORDER_CROSSING"); + } + if (event.eventDomain() == EventDomain.LOAD_UNLOAD && !"LOAD_UNLOAD".equals(detailType)) { + throw new IllegalArgumentException("LOAD_UNLOAD events must use eventDetails.type=LOAD_UNLOAD"); + } + if (event.eventDomain() == EventDomain.SPEEDING && !"SPEEDING".equals(detailType)) { + throw new IllegalArgumentException("SPEEDING events must use eventDetails.type=SPEEDING"); + } + } +} diff --git a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java new file mode 100644 index 0000000..5c64a1d --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java @@ -0,0 +1,83 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventHubEventBatchDto; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventHubPackageResult; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.persistence.DataPackageRepository; +import at.procon.eventhub.persistence.EventRepository; +import at.procon.eventhub.persistence.EventSourceRepository; +import java.util.List; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Service +public class EventHubIngestionService { + + private static final Logger log = LoggerFactory.getLogger(EventHubIngestionService.class); + + private final EventSourceRepository eventSourceRepository; + private final DataPackageRepository dataPackageRepository; + private final EventRepository eventRepository; + private final EventHubEventValidator validator; + private final EventHubEventSorter sorter; + + public EventHubIngestionService( + EventSourceRepository eventSourceRepository, + DataPackageRepository dataPackageRepository, + EventRepository eventRepository, + EventHubEventValidator validator, + EventHubEventSorter sorter + ) { + this.eventSourceRepository = eventSourceRepository; + this.dataPackageRepository = dataPackageRepository; + this.eventRepository = eventRepository; + this.validator = validator; + this.sorter = sorter; + } + + @Transactional + public EventHubPackageResult ingest(EventHubEventBatchDto batch) { + if (batch == null || batch.events().isEmpty()) { + return new EventHubPackageResult(null, batch == null ? null : batch.packageKey(), 0, 0); + } + + EventHubPackageRequest packageInfo = batch.packageInfo(); + if (packageInfo == null) { + packageInfo = batch.events().getFirst().packageInfo(); + } + if (packageInfo == null || packageInfo.eventSource() == null) { + throw new IllegalArgumentException("packageInfo.eventSource must be set before ingestion"); + } + + EventSourceDto eventSource = packageInfo.eventSource(); + int eventSourceId = eventSourceRepository.resolveSourceId(eventSource); + List sortedEvents = sorter.sort(batch.events()); + sortedEvents.forEach(validator::validate); + + UUID packageId = dataPackageRepository.createPackage( + eventSourceId, + batch.packageKey(), + packageInfo, + batch.packageType(), + batch.requestedFrom(), + batch.requestedTo(), + batch.metadata() + ); + + try { + int insertedCount = eventRepository.batchInsert(packageId, eventSourceId, sortedEvents); + dataPackageRepository.markImported(packageId, insertedCount); + log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={}", + packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount); + return new EventHubPackageResult(packageId, batch.packageKey(), sortedEvents.size(), insertedCount); + } catch (RuntimeException ex) { + dataPackageRepository.markFailed(packageId, ex.getMessage()); + throw ex; + } + } +} diff --git a/src/main/java/at/procon/eventhub/service/EventHubPackageKeyBuilder.java b/src/main/java/at/procon/eventhub/service/EventHubPackageKeyBuilder.java new file mode 100644 index 0000000..5beb676 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/EventHubPackageKeyBuilder.java @@ -0,0 +1,23 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import org.springframework.stereotype.Component; + +@Component +public class EventHubPackageKeyBuilder { + + public String build(EventHubEventDto event) { + EventHubPackageRequest packageInfo = event.packageInfo(); + if (packageInfo != null) { + return packageInfo.tenantKey() + + ":" + packageInfo.eventSource().stableKey() + + ":" + packageInfo.eventFamily() + + ":" + (packageInfo.businessDate() == null ? "NO_DATE" : packageInfo.businessDate()) + + ":" + packageInfo.externalPackageId(); + } + + String day = event.occurredAt().toLocalDate().toString(); + return "UNSPECIFIED_TENANT:UNSPECIFIED_SOURCE:" + event.eventDomain() + ":" + day; + } +} diff --git a/src/main/java/at/procon/eventhub/service/EventNaturalKeyService.java b/src/main/java/at/procon/eventhub/service/EventNaturalKeyService.java new file mode 100644 index 0000000..0310d1a --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/EventNaturalKeyService.java @@ -0,0 +1,61 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import org.springframework.stereotype.Component; + +@Component +public class EventNaturalKeyService { + + /** + * Canonical event key intentionally excludes EventSource. VU and driver-card + * acquisitions of the same real-world event can therefore converge later when + * storage/deduplication is implemented. + * + * Because incoming acquisition requests usually do not know internal master + * ids, this acquisition-stage key uses tenant + source-side master references. + * Late master-data resolution can later rebuild a stronger canonical key. + */ + public String buildCanonicalKeyHash(EventHubEventDto event) { + String naturalKey = String.join("|", + nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()), + nullToEmpty(event.driverRef() == null ? null : event.driverRef().stableKey()), + nullToEmpty(event.vehicleRef() == null ? null : event.vehicleRef().stableKey()), + nullToEmpty(event.occurredAt()), + nullToEmpty(event.eventDomain()), + nullToEmpty(event.eventType()), + nullToEmpty(event.lifecycle()) + ); + return sha256Hex(naturalKey); + } + + /** Source-record key includes the source id and external source event id. */ + public String buildSourceRecordKeyHash(EventHubEventDto event, int eventSourceId) { + String sourceRecordKey = String.join("|", + nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()), + String.valueOf(eventSourceId), + nullToEmpty(event.externalSourceEventId()) + ); + return sha256Hex(sourceRecordKey); + } + + private String nullToEmpty(Object value) { + return value == null ? "" : String.valueOf(value); + } + + private String sha256Hex(String value) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8)); + StringBuilder result = new StringBuilder(hash.length * 2); + for (byte b : hash) { + result.append(String.format("%02x", b)); + } + return result.toString(); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 is not available", e); + } + } +} diff --git a/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java b/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java new file mode 100644 index 0000000..9db4835 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java @@ -0,0 +1,73 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.source.TachographActivityDto; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.UUID; +import org.springframework.stereotype.Component; + +@Component +public class TachographActivityEventMapper { + + private final EventDetailsFactory detailsFactory; + + public TachographActivityEventMapper(EventDetailsFactory detailsFactory) { + this.detailsFactory = detailsFactory; + } + + public EventHubEventDto map(TachographActivityDto source) { + EventType activityType = source.activityType() == null ? EventType.UNKNOWN_ACTIVITY : source.activityType(); + EventLifecycle lifecycle = source.lifecycle() == null ? EventLifecycle.SNAPSHOT : source.lifecycle(); + String sourceKind = source.sourceKind() == null || source.sourceKind().isBlank() + ? "VEHICLE_UNIT" + : source.sourceKind(); + + EventSourceDto eventSource = new EventSourceDto( + "TACHOGRAPH", + sourceKind, + "TACHOGRAPH_" + sourceKind, + source.sourceInstanceKey(), + source.tenantProviderSettingKey(), + null + ); + LocalDate businessDate = source.occurredAt().toLocalDate(); + EventHubPackageRequest packageInfo = new EventHubPackageRequest( + tenantOrDefault(source.tenantKey()), + eventSource, + EventDomain.DRIVER_ACTIVITY.name(), + businessDate, + businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), + businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), + eventSource.stableKey() + ":DRIVER_ACTIVITY:" + businessDate + ); + + return new EventHubEventDto( + UUID.randomUUID(), + source.externalSourceEventId(), + source.driverRef(), + source.vehicleRef(), + source.occurredAt(), + source.receivedPartnerAt(), + OffsetDateTime.now(), + EventDomain.DRIVER_ACTIVITY, + activityType, + lifecycle, + null, + null, + detailsFactory.driverActivity(source.cardSlot(), source.cardStatus(), source.drivingStatus()), + detailsFactory.payloadFromMap(source.payload()), + false, + packageInfo + ); + } + + private String tenantOrDefault(String value) { + return value == null || value.isBlank() ? "default" : value.trim(); + } +} diff --git a/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java b/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java new file mode 100644 index 0000000..b1c0cd6 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java @@ -0,0 +1,70 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.GeoPointDto; +import at.procon.eventhub.dto.source.TelematicsPositionDto; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.UUID; +import org.springframework.stereotype.Component; + +@Component +public class TelematicsPositionEventMapper { + + private final EventDetailsFactory detailsFactory; + + public TelematicsPositionEventMapper(EventDetailsFactory detailsFactory) { + this.detailsFactory = detailsFactory; + } + + public EventHubEventDto map(TelematicsPositionDto source) { + String provider = source.providerKey() == null || source.providerKey().isBlank() ? "TELEMATICS" : source.providerKey(); + String sourceKey = source.sourceKey() == null || source.sourceKey().isBlank() ? provider + "_POSITION" : source.sourceKey(); + EventSourceDto eventSource = new EventSourceDto( + provider, + "TELEMATICS_PLATFORM", + sourceKey, + source.sourceInstanceKey(), + source.tenantProviderSettingKey(), + source.externalFleetKey() + ); + LocalDate businessDate = source.occurredAt().toLocalDate(); + EventHubPackageRequest packageInfo = new EventHubPackageRequest( + tenantOrDefault(source.tenantKey()), + eventSource, + EventDomain.POSITION.name(), + businessDate, + businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), + businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), + eventSource.stableKey() + ":POSITION:" + businessDate + ); + + return new EventHubEventDto( + UUID.randomUUID(), + source.externalSourceEventId(), + source.driverRef(), + source.vehicleRef(), + source.occurredAt(), + source.receivedPartnerAt(), + OffsetDateTime.now(), + EventDomain.POSITION, + EventType.POSITION_RECORDED, + EventLifecycle.SNAPSHOT, + source.odometerM(), + new GeoPointDto(source.latitude(), source.longitude()), + detailsFactory.position(source.positionReason()), + detailsFactory.payloadFromMap(source.payload()), + false, + packageInfo + ); + } + + private String tenantOrDefault(String value) { + return value == null || value.isBlank() ? "default" : value.trim(); + } +} diff --git a/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java b/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java new file mode 100644 index 0000000..d7acc8c --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java @@ -0,0 +1,128 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.CardSlot; +import at.procon.eventhub.dto.CardStatus; +import at.procon.eventhub.dto.DrivingStatus; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.GeoPointDto; +import at.procon.eventhub.dto.source.YellowFoxD8BookingDto; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import org.springframework.stereotype.Component; + +@Component +public class YellowFoxD8BookingEventMapper { + + private final EventDetailsFactory detailsFactory; + + public YellowFoxD8BookingEventMapper(EventDetailsFactory detailsFactory) { + this.detailsFactory = detailsFactory; + } + + public EventHubEventDto map(YellowFoxD8BookingDto source) { + NormalizedEvent normalized = normalize(source.eventType(), source.state()); + Map payload = new LinkedHashMap<>(); + if (source.payload() != null) { + payload.putAll(source.payload()); + } + payload.put("provider", "YELLOWFOX"); + payload.put("sourceKind", "TELEMATICS_PLATFORM"); + payload.put("yellowFoxEventType", source.eventType()); + payload.put("yellowFoxState", source.state()); + payload.put("yellowFoxKey", source.key()); + + EventSourceDto eventSource = new EventSourceDto( + "YELLOWFOX", + "TELEMATICS_PLATFORM", + "YELLOWFOX_D8", + source.sourceInstanceKey(), + source.tenantProviderSettingKey(), + source.externalFleetKey() + ); + LocalDate businessDate = source.occurredAt().toLocalDate(); + EventHubPackageRequest packageInfo = new EventHubPackageRequest( + tenantOrDefault(source.tenantKey()), + eventSource, + normalized.domain().name(), + businessDate, + businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), + businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), + eventSource.stableKey() + ":" + normalized.domain().name() + ":" + businessDate + ); + + return new EventHubEventDto( + UUID.randomUUID(), + source.eventId(), + source.driverRef(), + source.vehicleRef(), + source.occurredAt(), + source.receivedPartnerAt(), + OffsetDateTime.now(), + normalized.domain(), + normalized.type(), + normalized.lifecycle(), + source.odometerM(), + new GeoPointDto(source.latitude(), source.longitude()), + detailsFor(normalized), + detailsFactory.payloadFromMap(payload), + false, + packageInfo + ); + } + + private String tenantOrDefault(String value) { + return value == null || value.isBlank() ? "default" : value.trim(); + } + + private at.procon.eventhub.dto.EventDetailsDto detailsFor(NormalizedEvent normalized) { + if (normalized.domain() == EventDomain.DRIVER_ACTIVITY) { + return detailsFactory.driverActivity(CardSlot.DRIVER, CardStatus.INSERTED, DrivingStatus.UNKNOWN); + } + if (normalized.domain() == EventDomain.DRIVER_CARD) { + CardStatus status = normalized.lifecycle() == EventLifecycle.INSERT ? CardStatus.INSERTED : CardStatus.NOT_INSERTED; + return detailsFactory.driverCard(CardSlot.DRIVER, status); + } + return null; + } + + private NormalizedEvent normalize(Integer eventType, Integer state) { + if (eventType == null || state == null) { + return new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT); + } + + return switch (eventType) { + case 0, 1 -> normalizeCardActivity(state); + case 2, 3 -> normalizeDriverActivity(state); + default -> new NormalizedEvent(EventDomain.TELEMATICS_DATA, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT); + }; + } + + private NormalizedEvent normalizeCardActivity(Integer state) { + return switch (state) { + case 0 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_WITHDRAWN, EventLifecycle.WITHDRAW); + case 1 -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.CARD_INSERTED, EventLifecycle.INSERT); + default -> new NormalizedEvent(EventDomain.DRIVER_CARD, EventType.UNKNOWN_EVENT, EventLifecycle.SNAPSHOT); + }; + } + + private NormalizedEvent normalizeDriverActivity(Integer state) { + return switch (state) { + case 0 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.BREAK_REST, EventLifecycle.SNAPSHOT); + case 1 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.AVAILABILITY, EventLifecycle.SNAPSHOT); + case 2 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.WORK, EventLifecycle.SNAPSHOT); + case 3 -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.DRIVE, EventLifecycle.SNAPSHOT); + default -> new NormalizedEvent(EventDomain.DRIVER_ACTIVITY, EventType.UNKNOWN_ACTIVITY, EventLifecycle.SNAPSHOT); + }; + } + + private record NormalizedEvent(EventDomain domain, EventType type, EventLifecycle lifecycle) { + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..68a841b --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,31 @@ +spring: + application: + name: eventhub-ingestion-service + datasource: + url: jdbc:postgresql://localhost:5432/eventhub + username: eventhub + password: eventhub + flyway: + enabled: true + default-schema: eventhub + schemas: eventhub + create-schemas: true + +server: + port: 8080 + +camel: + springboot: + name: eventhub-ingestion-camel + main-run-controller: true + +management: + endpoints: + web: + exposure: + include: health,info,metrics,camelroutes + +eventhub: + batch: + completion-size: 1000 + completion-timeout: 5s diff --git a/src/main/resources/db/migration/V1__create_eventhub_schema.sql b/src/main/resources/db/migration/V1__create_eventhub_schema.sql new file mode 100644 index 0000000..40aaf7b --- /dev/null +++ b/src/main/resources/db/migration/V1__create_eventhub_schema.sql @@ -0,0 +1,127 @@ +create extension if not exists pgcrypto; + +create schema if not exists eventhub; + +-- Acquisition source definition. This represents where the imported source +-- record came from, not necessarily the canonical identity of the real-world event. +create table if not exists eventhub.event_source ( + id integer generated always as identity primary key, + provider_key text not null, + source_kind text not null, + source_key text not null, + source_instance_key text not null default 'default', + tenant_provider_setting_key text, + external_fleet_key text, + created_at timestamptz not null default now(), + constraint ux_event_source unique (provider_key, source_kind, source_key, source_instance_key) +); + +-- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/2026-04-28. +-- Final canonical storage can be discussed later; this table is still useful for acquisition audit. +create table if not exists eventhub.data_package ( + id uuid primary key, + event_source_id integer not null references eventhub.event_source(id), + tenant_key text not null, + package_key text not null, + package_type text not null, + status text not null, + event_family text, + business_date date, + external_package_id text, + requested_from timestamptz, + requested_to timestamptz, + received_at timestamptz not null default now(), + completed_at timestamptz, + event_count integer not null default 0, + metadata jsonb not null default '{}'::jsonb, + error_message text, + constraint ux_data_package_external unique (tenant_key, event_source_id, external_package_id, received_at) +); + +-- Temporary acquisition-stage point-event store. +-- It keeps the discussed DTO shape: EventSource context, externalSourceEventId, +-- one occurredAt timestamp, source-side driver/vehicle refs, normalized event details, +-- and raw JSON payload. It intentionally has no internal driver_id/vehicle_id in the +-- incoming model; master-data resolution can be added later. +create table if not exists eventhub.acquired_event ( + id uuid not null, + event_source_id integer not null references eventhub.event_source(id), + data_package_id uuid not null references eventhub.data_package(id), + + external_source_event_id text not null, + + driver_source_entity_id text, + driver_card_nation text, + driver_card_number text, + + vehicle_source_entity_id text, + vehicle_vin text, + vehicle_registration_nation text, + vehicle_registration_number text, + + occurred_at timestamptz not null, + received_partner_at timestamptz, + received_hub_at timestamptz not null default now(), + + event_domain text not null, + event_type text not null, + lifecycle text not null, + + odometer_m bigint, + latitude numeric(10, 7), + longitude numeric(10, 7), + + event_details jsonb not null default '{}'::jsonb, + payload jsonb not null default '{}'::jsonb, + manual_entry boolean not null default false, + + -- Excludes EventSource: useful later for canonical event deduplication. + canonical_key_hash text not null, + + -- Includes tenant + EventSource + externalSourceEventId: prevents duplicate imports of the same source record. + source_record_key_hash text not null, + + created_at timestamptz not null default now(), + + constraint pk_acquired_event primary key (occurred_at, id), + constraint chk_acquired_event_driver_or_vehicle_ref check ( + driver_source_entity_id is not null + or driver_card_number is not null + or vehicle_source_entity_id is not null + or vehicle_vin is not null + or vehicle_registration_number is not null + ), + constraint chk_acquired_event_position_pair check ((latitude is null and longitude is null) or (latitude is not null and longitude is not null)), + constraint chk_driver_card_nation_when_number check (driver_card_number is null or driver_card_nation is not null), + constraint chk_vehicle_registration_nation_when_number check (vehicle_registration_number is null or vehicle_registration_nation is not null) +); + +create unique index if not exists ux_acquired_event_source_record + on eventhub.acquired_event(source_record_key_hash); + +create index if not exists idx_acquired_event_canonical_key + on eventhub.acquired_event(canonical_key_hash); + +create index if not exists idx_acquired_event_vehicle_vin_time + on eventhub.acquired_event(vehicle_vin, occurred_at desc) + where vehicle_vin is not null; + +create index if not exists idx_acquired_event_vehicle_registration_time + on eventhub.acquired_event(vehicle_registration_nation, vehicle_registration_number, occurred_at desc) + where vehicle_registration_number is not null; + +create index if not exists idx_acquired_event_driver_card_time + on eventhub.acquired_event(driver_card_nation, driver_card_number, occurred_at desc) + where driver_card_number is not null; + +create index if not exists idx_acquired_event_domain_type_time + on eventhub.acquired_event(event_domain, event_type, occurred_at desc); + +create index if not exists idx_acquired_event_details_gin + on eventhub.acquired_event using gin(event_details); + +create index if not exists idx_acquired_event_payload_gin + on eventhub.acquired_event using gin(payload); + +create index if not exists idx_data_package_source_time + on eventhub.data_package(tenant_key, event_source_id, received_at desc); diff --git a/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java b/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java new file mode 100644 index 0000000..cabfa2b --- /dev/null +++ b/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java @@ -0,0 +1,61 @@ +package at.procon.eventhub; + +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.dto.source.YellowFoxD8BookingDto; +import at.procon.eventhub.service.EventDetailsFactory; +import at.procon.eventhub.service.YellowFoxD8BookingEventMapper; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.OffsetDateTime; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class YellowFoxD8BookingEventMapperTest { + + private final YellowFoxD8BookingEventMapper mapper = new YellowFoxD8BookingEventMapper( + new EventDetailsFactory(new ObjectMapper()) + ); + + @Test + void mapsYellowFoxDrivingStateAsSnapshotWithEventSourceAndDetails() { + YellowFoxD8BookingDto source = new YellowFoxD8BookingDto( + "tenant-1", + "yellowfox-tenant-7", + "tenant-yellowfox-7", + "7", + "event-1", + "key-1", + 2, + 3, + OffsetDateTime.parse("2026-04-29T08:15:00+02:00"), + null, + new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")), + new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345")), + null, + null, + null, + null + ); + + var result = mapper.map(source); + + assertThat(result.eventDomain()).isEqualTo(EventDomain.DRIVER_ACTIVITY); + assertThat(result.eventType()).isEqualTo(EventType.DRIVE); + assertThat(result.lifecycle()).isEqualTo(EventLifecycle.SNAPSHOT); + assertThat(result.externalSourceEventId()).isEqualTo("event-1"); + assertThat(result.packageInfo().tenantKey()).isEqualTo("tenant-1"); + assertThat(result.packageInfo().eventSource().providerKey()).isEqualTo("YELLOWFOX"); + assertThat(result.packageInfo().eventSource().sourceKey()).isEqualTo("YELLOWFOX_D8"); + assertThat(result.driverRef().driverCard().nation()).isEqualTo("AT"); + assertThat(result.vehicleRef().vehicleRegistration().nation()).isEqualTo("AT"); + assertThat(result.eventDetails().type()).isEqualTo("DRIVER_ACTIVITY"); + assertThat(result.payload().get("yellowFoxEventType").asInt()).isEqualTo(2); + assertThat(result.payload().get("yellowFoxState").asInt()).isEqualTo(3); + } +}