From 6a7395bec634fd3dcd8a14f8ad273746194a5661 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Thu, 30 Apr 2026 12:33:34 +0200 Subject: [PATCH] Add tachograph import planning scaffold --- README.md | 504 ++++++++++-------- .../api/EventHubIngestionController.java | 21 +- .../camel/TachographImportRequestRoute.java | 33 ++ .../eventhub/dto/AcquisitionStrategy.java | 21 + .../at/procon/eventhub/dto/DriverRefDto.java | 7 +- .../at/procon/eventhub/dto/EventFamily.java | 12 + .../at/procon/eventhub/dto/ImportMode.java | 7 + .../eventhub/dto/TachographImportPlanDto.java | 15 + .../dto/TachographImportPlanItemDto.java | 13 + .../eventhub/dto/TachographImportRequest.java | 43 ++ .../at/procon/eventhub/dto/VehicleRefDto.java | 7 +- .../eventhub/persistence/EventRepository.java | 58 +- .../persistence/EventSourceRepository.java | 34 +- .../service/EventHubIngestionService.java | 2 +- .../service/TachographImportPlanService.java | 77 +++ .../migration/V1__create_eventhub_schema.sql | 80 ++- .../EventAcquisitionRecordKeyServiceTest.java | 4 +- .../YellowFoxD8BookingEventMapperTest.java | 4 +- 18 files changed, 642 insertions(+), 300 deletions(-) create mode 100644 src/main/java/at/procon/eventhub/camel/TachographImportRequestRoute.java create mode 100644 src/main/java/at/procon/eventhub/dto/AcquisitionStrategy.java create mode 100644 src/main/java/at/procon/eventhub/dto/EventFamily.java create mode 100644 src/main/java/at/procon/eventhub/dto/ImportMode.java create mode 100644 src/main/java/at/procon/eventhub/dto/TachographImportPlanDto.java create mode 100644 src/main/java/at/procon/eventhub/dto/TachographImportPlanItemDto.java create mode 100644 src/main/java/at/procon/eventhub/dto/TachographImportRequest.java create mode 100644 src/main/java/at/procon/eventhub/service/TachographImportPlanService.java diff --git a/README.md b/README.md index 4d5e165..df58610 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,8 @@ # EventHub Acquisition Service -Spring Boot + Apache Camel project skeleton for acquiring normalized EventHub point events from multiple providers/sources. +Spring Boot + Apache Camel skeleton for acquiring normalized EventHub point events from multiple providers/sources. -The current version intentionally focuses on **acquisition**. It stores source records as imported and does not merge or deduplicate equivalent events from different providers/sources. It does, however, keep a non-unique eventSignatureHash as a later merge/gap-filling hint. Later query/read models can merge sources when a preferred/main source contains gaps. The included PostgreSQL schema is a small acquisition-stage store so the project can be run and tested end-to-end. - -## Architecture - -```text -source-specific Camel input route - -> source-specific mapper - -> EventHubEventDto - -> common EventHub acquisition route - -> validation - -> package-key creation from tenant + EventSource + source group + import scope + event family - -> aggregation / batching - -> chronological sorting inside the batch - -> acquisition package handoff -``` +The current version focuses on **acquisition from source systems**, especially tachograph DB data. It stores source records as imported. It does **not** merge or deduplicate equivalent events from different providers/sources. It does keep a non-unique `eventSignatureHash` as a future query/projection hint. ## Namespace @@ -26,7 +12,7 @@ at.procon.eventhub ## Main model decisions -### 1. One event = one time point +### One event = one point in time `EventHubEventDto` has exactly one timestamp: @@ -34,13 +20,15 @@ at.procon.eventhub occurredAt ``` -There is no generic `duration`, `endTime`, `validFrom`, or `validTo`. If a source row represents an interval, the mapper may emit separate point events, for example `DRIVE START` and `DRIVE END`. +There is no generic `duration`, `endTime`, `validFrom`, or `validTo`. If a source row represents an interval, a mapper may emit separate point events such as `DRIVE START` and `DRIVE END`. -### 2. Tenant is package-level +### Tenant is package/job-level -`tenantKey` identifies the owner/client/account for the package. It is required for acquisition grouping and future master-data resolution. +`tenantKey` identifies the customer/data owner. It is mandatory for import packages and tachograph import requests. -### 3. EventSource identifies the technical source +### EventSource identifies the technical source + +Example: ```json { @@ -62,34 +50,9 @@ YELLOWFOX / TELEMATICS_PLATFORM / YELLOWFOX_D8 FLEETBOARD / TELEMATICS_PLATFORM / FLEETBOARD_POSITION ``` -`EventSource` is acquisition context. A VU event, a driver-card event and a YellowFox D8 event may describe the same real-world event, but this acquisition service keeps them as separate acquired source records. Cross-source merging/gap filling is intentionally left for a later query/read model. +### SourceGroup is package/source grouping only -### 4. No cross-source deduplication during acquisition - -The acquisition layer stores every source record independently. It uses `sourceRecordKeyHash` only for idempotency of the same source event, so the same input package can be retried safely. It does **not** deduplicate VU vs driver-card vs YellowFox records. - -This is intentional because later queries may need to combine sources: for example, use tachograph data as the main source, but fill gaps from YellowFox or another provider. - -The acquisition table also stores a non-unique `eventSignatureHash`. This is a semantic merge hint, not a unique key. It intentionally excludes `EventSource` and `externalSourceEventId`, so VU, driver-card and YellowFox records that look like the same real-world event can share a signature while still being stored separately. Later query/projection logic can use this signature for source comparison, gap filling, and merged timelines. The signature prefers nation-scoped driver card and vehicle registration when available, then VIN or source entity id as fallback, so it remains useful before final master-data resolution. - -Therefore the current model preserves: - -```text -tenantKey -eventSource -sourceGroup -importScope -externalSourceEventId -source-side driver/vehicle references -eventDetails -payload -``` - -### 5. SourceGroup captures tachograph organisation or YellowFox fleet - -`sourceGroup` is package-level source grouping information. - -For tachograph it can be a source organisation: +For tachograph, `sourceGroup` can identify the selected source organisation/root organisation. ```json "sourceGroup": { @@ -100,7 +63,7 @@ For tachograph it can be a source organisation: } ``` -For YellowFox it can be a fleet: +For YellowFox, it can identify the provider fleet. ```json "sourceGroup": { @@ -111,9 +74,9 @@ For YellowFox it can be a fleet: } ``` -The YellowFox fleet belongs to the same tenant/customer, but it is not forced to be an organisation. It can later be mapped to a tenant organisation if needed. +YellowFox fleet is not forced to be an organisation. It belongs to the same tenant/customer and can later be mapped or resolved through vehicle/driver master data if needed. -### 6. ImportScope captures organisation and time filtering +### ImportScope describes data selection `importScope` describes what was selected from the source system. @@ -146,13 +109,15 @@ Organisation subtree + time-window import: } ``` -`occurredFrom` is inclusive and `occurredTo` is exclusive. Both may be `null` for complete source DB import. +`occurredFrom` is inclusive. `occurredTo` is exclusive. Both can be `null` for complete DB/history imports. -### 7. Source-side master references, no incoming internal IDs +### Driver/vehicle refs do not contain organisation -The incoming DTO does not require internal `driverId` or `vehicleId`, because in normal ingestion those ids are not known yet. +Organisation assignment is a **master-data relation**, not an event property. -Driver reference with nation-scoped driver card: +Events depend on driver and/or vehicle. The relation of organisation to driver/vehicle is imported and resolved separately from master data using `occurredAt`. + +Driver ref: ```json "driverRef": { @@ -160,17 +125,11 @@ Driver reference with nation-scoped driver card: "driverCard": { "nation": "AT", "number": "D123456789" - }, - "sourceOrganisation": { - "type": "ORGANISATION", - "sourceEntityId": "57", - "code": "57", - "name": "Sub Org 57" } } ``` -Vehicle reference with optional VIN and nation-scoped VRN: +Vehicle ref: ```json "vehicleRef": { @@ -179,17 +138,11 @@ Vehicle reference with optional VIN and nation-scoped VRN: "vehicleRegistration": { "nation": "AT", "number": "W-12345" - }, - "sourceOrganisation": { - "type": "ORGANISATION", - "sourceEntityId": "57", - "code": "57", - "name": "Sub Org 57" } } ``` -VIN is optional. Driver-card-only events can carry only the nation-scoped VRN/registration: +Driver-card-only imports can carry only a nation-scoped VRN and no VIN: ```json "vehicleRef": { @@ -198,33 +151,264 @@ VIN is optional. Driver-card-only events can carry only the nation-scoped VRN/re "vehicleRegistration": { "nation": "AT", "number": "W-12345" - }, - "sourceOrganisation": null -} -``` - -This allows late resolution when VU/master data later connects the VRN to a VIN. - -### 8. Generic normalized eventDetails - -Reusable event-specific properties are stored in: - -```json -"eventDetails": { - "type": "DRIVER_ACTIVITY", - "attributes": { - "cardSlot": "DRIVER", - "cardStatus": "INSERTED", - "drivingStatus": "SINGLE" } } ``` -Raw provider values stay in `payload`. +Later master-data resolution can connect `VRN + nation + occurredAt` to a VIN/vehicle. -## Package-level acquisition request +### No cross-source deduplication during acquisition -For external/manual ingestion, the preferred request shape is: +The acquisition layer stores every source record independently. It uses `sourceRecordKeyHash` only for idempotency of the same source event: + +```text +tenantKey + EventSource + externalSourceEventId +``` + +It also stores a non-unique `eventSignatureHash`. This is only a semantic hint for future query-time merging/gap filling. It is not unique and must not suppress imports. + +## Tachograph import job model + +For real tachograph DB extraction, use a tachograph import request. This describes the job and produces an import plan. SQL extraction routes are intentionally scaffolded as the next implementation step. + +```http +POST /api/eventhub/acquisition/tachograph/imports/plan +POST /api/eventhub/acquisition/tachograph/imports/start +``` + +Example: initial import from one root organisation and its children: + +```json +{ + "tenantKey": "kralowetz", + "eventSource": { + "providerKey": "TACHOGRAPH", + "sourceKind": "MIXED", + "sourceKey": "TACHOGRAPH_DB", + "sourceInstanceKey": "main-tachograph-db", + "tenantProviderSettingKey": "kralowetz-tachograph-prod" + }, + "sourceGroup": { + "type": "ORGANISATION", + "sourceEntityId": "147", + "code": "147", + "name": "Kralowetz" + }, + "importScope": { + "type": "SOURCE_ORGANISATION_SUBTREE", + "rootSourceOrganisation": { + "type": "ORGANISATION", + "sourceEntityId": "147", + "code": "147", + "name": "Kralowetz" + }, + "includeChildren": true, + "occurredFrom": "2025-01-01T00:00:00+01:00", + "occurredTo": null + }, + "eventFamilies": [ + "DRIVER_ACTIVITY", + "DRIVER_CARD", + "POSITION", + "BORDER_CROSSING", + "LOAD_UNLOAD", + "PLACE", + "SPECIFIC_CONDITION", + "SPEEDING" + ], + "mode": "INITIAL_BACKFILL", + "refreshMasterDataFirst": true, + "acquisitionStrategy": "OCCURRED_AT_WINDOW_WITH_OVERLAP" +} +``` + +Example: regular incremental update: + +```json +{ + "tenantKey": "kralowetz", + "eventSource": { + "providerKey": "TACHOGRAPH", + "sourceKind": "MIXED", + "sourceKey": "TACHOGRAPH_DB", + "sourceInstanceKey": "main-tachograph-db", + "tenantProviderSettingKey": "kralowetz-tachograph-prod" + }, + "sourceGroup": { + "type": "ORGANISATION", + "sourceEntityId": "147" + }, + "importScope": { + "type": "SOURCE_ORGANISATION_SUBTREE", + "rootSourceOrganisation": { + "type": "ORGANISATION", + "sourceEntityId": "147" + }, + "includeChildren": true, + "occurredFrom": null, + "occurredTo": null + }, + "eventFamilies": ["DRIVER_ACTIVITY", "DRIVER_CARD", "POSITION", "BORDER_CROSSING", "LOAD_UNLOAD", "PLACE", "SPECIFIC_CONDITION", "SPEEDING"], + "mode": "INCREMENTAL_UPDATE", + "refreshMasterDataFirst": true, + "acquisitionStrategy": "SOURCE_PACKAGE_WATERMARK" +} +``` + +## Tachograph extraction plan + +The import-plan service currently creates extraction definitions like: + +```text +DRIVER_ACTIVITY / VEHICLE_UNIT -> VUActivity +DRIVER_ACTIVITY / DRIVER_CARD -> CardActivity +DRIVER_CARD / VEHICLE_UNIT -> IWCycle +DRIVER_CARD / DRIVER_CARD -> CardVehiclesUsed +POSITION / VEHICLE_UNIT -> VUPlaces, VULoadUnload, VUGnssAccumulatedDriving, VUBorderCrossing +POSITION / DRIVER_CARD -> CardPlaces, CardLoadUnload, CardGnssAccumulatedDriving, CardBorderCrossing +BORDER_CROSSING / VEHICLE_UNIT -> VUBorderCrossing +BORDER_CROSSING / DRIVER_CARD -> CardBorderCrossing +LOAD_UNLOAD / VEHICLE_UNIT -> VULoadUnload +LOAD_UNLOAD / DRIVER_CARD -> CardLoadUnload +SPECIFIC_CONDITION / VEHICLE_UNIT -> VUSpecificCondition +SPECIFIC_CONDITION / DRIVER_CARD -> CardSpecificCondition +PLACE / VEHICLE_UNIT -> VUPlaces +PLACE / DRIVER_CARD -> CardPlaces +SPEEDING / VEHICLE_UNIT -> SpeedingEvents +``` + +The next implementation step is to replace the scaffolded plan items with actual Camel/JDBC SQL extraction routes. + +## Acquisition alternatives considered + +### Alternative A: occurred-time window import + +Read events by `occurredAt` for a root organisation/time window. + +Pros: + +```text +simple +works for initial backfill +matches explicit from/to import requests +``` + +Cons: + +```text +unsafe as the only incremental method because a newly imported card/VU package can contain old occurredAt data +requires overlap windows for regular updates +``` + +Best use: + +```text +initial backfill and reprocessing +fallback incremental strategy with overlap +``` + +### Alternative B: source-package watermark import + +Read original tachograph card/VU packages that were imported/changed in the tachograph DB since the last successful EventHub run, then extract all events belonging to those packages. + +Pros: + +```text +best for regular updates +handles late-arriving historical tachograph packages +fits the tachograph package concept +``` + +Cons: + +```text +requires reliable source package metadata and links from event rows to package/source download +more complex SQL and cursor state +``` + +Best use: + +```text +primary incremental strategy if tachograph DB exposes package import timestamps/ids +``` + +### Alternative C: source-row watermark import + +Read source event rows changed since last run using row-level `updatedAt` or monotonic IDs. + +Pros: + +```text +precise if row update timestamps are reliable +does not require package-level model +``` + +Cons: + +```text +not possible if source tables do not have reliable changed/updated metadata +harder across many event tables +``` + +Best use: + +```text +fallback when rows have reliable updatedAt/row version fields +``` + +### Alternative D: per vehicle/per driver polling + +After master-data refresh, loop through vehicles and drivers in the selected organisation subtree and read their event data. + +Pros: + +```text +matches your existing data acquisition pattern +naturally separates vehicle-unit and driver-card data +supports organisation-scoped imports well +``` + +Cons: + +```text +can be slower for large fleets +requires careful batching/chunking and parallelism +can miss late old data unless combined with package/row watermark or overlap +``` + +Best use: + +```text +scope resolution and controlled extraction, combined with Alternative A or B +``` + +## Recommended ingestion strategy + +Use a hybrid: + +```text +Initial import: + master data first + organisation subtree + occurredFrom/occurredTo + chunk by time and/or vehicle/driver + import idempotently by sourceRecordKeyHash + +Regular update: + master data first + prefer source-package watermark + fallback to occurredAt overlap window if package metadata is insufficient + import idempotently by sourceRecordKeyHash +``` + +This means the EventHub acquisition package is an **extraction package**, while the original tachograph card/VU package should be preserved as source metadata in payload or later in a dedicated source-package table. + +## Existing package-level normalized event ingestion + +```http +POST /api/eventhub/acquisition/packages +``` + +Example: ```json { @@ -239,17 +423,13 @@ For external/manual ingestion, the preferred request shape is: }, "sourceGroup": { "type": "ORGANISATION", - "sourceEntityId": "147", - "code": "147", - "name": "Kralowetz" + "sourceEntityId": "147" }, "importScope": { "type": "SOURCE_ORGANISATION_SUBTREE", "rootSourceOrganisation": { "type": "ORGANISATION", - "sourceEntityId": "147", - "code": "147", - "name": "Kralowetz" + "sourceEntityId": "147" }, "includeChildren": true, "occurredFrom": "2026-04-28T00:00:00+02:00", @@ -267,10 +447,6 @@ For external/manual ingestion, the preferred request shape is: "driverCard": { "nation": "AT", "number": "D123456789" - }, - "sourceOrganisation": { - "type": "ORGANISATION", - "sourceEntityId": "57" } }, "vehicleRef": { @@ -279,10 +455,6 @@ For external/manual ingestion, the preferred request shape is: "vehicleRegistration": { "nation": "AT", "number": "W-12345" - }, - "sourceOrganisation": { - "type": "ORGANISATION", - "sourceEntityId": "57" } }, "occurredAt": "2026-04-28T08:00:00+02:00", @@ -312,17 +484,16 @@ For external/manual ingestion, the preferred request shape is: ## Routes -### Source-specific input routes - ```text direct:yellowfox-d8-booking-input direct:telematics-position-input direct:tachograph-activity-input +direct:tachograph-import-start direct:eventhub-package-input direct:eventhub-manual-input ``` -### Common route +Common route: ```text direct:eventhub-normalized-input @@ -334,97 +505,6 @@ direct:eventhub-normalized-input -> EventHubIngestionService.ingest(...) ``` -## REST endpoints - -```text -POST /api/eventhub/acquisition/yellowfox/d8-bookings -POST /api/eventhub/acquisition/telematics/positions -POST /api/eventhub/acquisition/tachograph/activities -POST /api/eventhub/acquisition/packages -POST /api/eventhub/acquisition/events -``` - -## Example: tachograph driver-card activity with VRN only - -```bash -curl -X POST http://localhost:8080/api/eventhub/acquisition/tachograph/activities \ - -H "Content-Type: application/json" \ - -d '[ - { - "tenantKey": "kralowetz", - "sourceKind": "DRIVER_CARD", - "sourceInstanceKey": "main-tachograph-db", - "tenantProviderSettingKey": "kralowetz-tachograph-prod", - "externalSourceEventId": "TACHOGRAPH:DRIVER_CARD:activity:789:start", - "driverRef": { - "sourceEntityId": "driver-100", - "driverCard": { - "nation": "AT", - "number": "D123456789" - }, - "sourceOrganisation": { - "type": "ORGANISATION", - "sourceEntityId": "57" - } - }, - "vehicleRef": { - "sourceEntityId": null, - "vin": null, - "vehicleRegistration": { - "nation": "AT", - "number": "W-12345" - }, - "sourceOrganisation": null - }, - "occurredAt": "2026-04-28T08:00:00+02:00", - "activityType": "DRIVE", - "lifecycle": "START", - "cardSlot": "DRIVER", - "cardStatus": "INSERTED", - "drivingStatus": "SINGLE", - "payload": { - "raw": { - "activity": 3, - "cardSlot": 0, - "cardStatus": 0, - "drivingStatus": 0 - } - } - } - ]' -``` - -The mapper creates a default `TENANT_ALL` one-day import scope for this convenience endpoint. For real tachograph import jobs with organisation subtree/full DB scope, use the package-level request or add dedicated SQL extraction job routes. - -## Example: full tachograph DB import package - -```json -{ - "package": { - "tenantKey": "kralowetz", - "eventSource": { - "providerKey": "TACHOGRAPH", - "sourceKind": "VEHICLE_UNIT", - "sourceKey": "TACHOGRAPH_VEHICLE_UNIT", - "sourceInstanceKey": "main-tachograph-db", - "tenantProviderSettingKey": "kralowetz-tachograph-prod" - }, - "sourceGroup": null, - "importScope": { - "type": "TENANT_ALL", - "rootSourceOrganisation": null, - "includeChildren": false, - "occurredFrom": null, - "occurredTo": null - }, - "eventFamily": "DRIVER_ACTIVITY", - "businessDate": null, - "externalPackageId": "TACHOGRAPH:ALL:DRIVER_ACTIVITY:FULL" - }, - "events": [] -} -``` - ## Start PostgreSQL ```bash @@ -467,15 +547,14 @@ select occurred_at, driver_source_entity_id, driver_card_nation, driver_card_number, - driver_source_org_entity_id, vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number, - vehicle_source_org_entity_id, event_domain, event_type, lifecycle, + event_signature_hash, event_details, payload from eventhub.acquired_event @@ -484,23 +563,8 @@ order by occurred_at desc; ## Next implementation steps -1. Add source-specific SQL extraction routes for the tachograph DB event families: - - activities from CardActivity/VUActivity - - card insert/withdraw from CardVehiclesUsed/IWCycle - - positions from places/GNSS/border/load-unload sources - - border crossings - - load/unload - - specific conditions: out-of-scope and ferry/train - - speeding events -2. Each SQL extraction route should accept `ImportScopeDto`: - - optional source organisation root + include children - - optional occurredFrom/occurredTo - - null time bounds mean complete DB/history import -3. Add master-data resolution later: - - driver by tenant + driver card nation/number + occurredAt - - vehicle by tenant + VIN or tenant + registration nation/number + occurredAt - - late resolution from VRN-only driver-card events to VIN after VU/master data import -4. Discuss query/read models later: - - how to merge acquired events from all sources at query time - - source priority per event family when the main source contains gaps - - how to expose source provenance when multiple sources describe the same real-world event +1. Add actual Camel/JDBC extraction routes behind the tachograph import plan. +2. Implement master-data acquisition first: organisation tree, driver/card assignments, vehicle VIN/VRN assignments, driver/vehicle organisation assignment histories. +3. Implement initial backfill using organisation/time scope. +4. Implement incremental import using source-package watermark, with occurredAt overlap fallback. +5. Discuss query/read models later: source priority and gap filling across tachograph, YellowFox and other sources. diff --git a/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java b/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java index ed33565..d457c7e 100644 --- a/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java +++ b/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java @@ -2,9 +2,11 @@ package at.procon.eventhub.api; import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubPackageIngestRequest; +import at.procon.eventhub.dto.TachographImportRequest; import at.procon.eventhub.dto.source.TachographActivityDto; import at.procon.eventhub.dto.source.TelematicsPositionDto; import at.procon.eventhub.dto.source.YellowFoxD8BookingDto; +import at.procon.eventhub.service.TachographImportPlanService; import jakarta.validation.Valid; import java.util.List; import java.util.Map; @@ -20,9 +22,11 @@ import org.springframework.web.bind.annotation.RestController; public class EventHubIngestionController { private final ProducerTemplate producerTemplate; + private final TachographImportPlanService tachographImportPlanService; - public EventHubIngestionController(ProducerTemplate producerTemplate) { + public EventHubIngestionController(ProducerTemplate producerTemplate, TachographImportPlanService tachographImportPlanService) { this.producerTemplate = producerTemplate; + this.tachographImportPlanService = tachographImportPlanService; } @PostMapping("/yellowfox/d8-bookings") @@ -43,6 +47,21 @@ public class EventHubIngestionController { return accepted(activities.size(), "direct:tachograph-activity-input"); } + @PostMapping("/tachograph/imports/plan") + public ResponseEntity planTachographImport(@Valid @RequestBody TachographImportRequest request) { + return ResponseEntity.ok(tachographImportPlanService.createPlan(request)); + } + + @PostMapping("/tachograph/imports/start") + public ResponseEntity> startTachographImport(@Valid @RequestBody TachographImportRequest request) { + producerTemplate.sendBody("direct:tachograph-import-start", request); + return ResponseEntity.accepted().body(Map.of( + "accepted", true, + "route", "direct:tachograph-import-start", + "note", "The current implementation prepares the tachograph import plan. SQL extraction routes are intentionally scaffolded as next step." + )); + } + @PostMapping("/packages") public ResponseEntity> ingestPackage(@Valid @RequestBody EventHubPackageIngestRequest request) { producerTemplate.sendBody("direct:eventhub-package-input", request); diff --git a/src/main/java/at/procon/eventhub/camel/TachographImportRequestRoute.java b/src/main/java/at/procon/eventhub/camel/TachographImportRequestRoute.java new file mode 100644 index 0000000..9c55cc1 --- /dev/null +++ b/src/main/java/at/procon/eventhub/camel/TachographImportRequestRoute.java @@ -0,0 +1,33 @@ +package at.procon.eventhub.camel; + +import at.procon.eventhub.dto.TachographImportRequest; +import at.procon.eventhub.service.TachographImportPlanService; +import org.apache.camel.builder.RouteBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class TachographImportRequestRoute extends RouteBuilder { + + private static final Logger log = LoggerFactory.getLogger(TachographImportRequestRoute.class); + + private final TachographImportPlanService planService; + + public TachographImportRequestRoute(TachographImportPlanService planService) { + this.planService = planService; + } + + @Override + public void configure() { + from("direct:tachograph-import-start") + .routeId("tachograph-import-start-route") + .process(exchange -> { + TachographImportRequest request = exchange.getMessage().getBody(TachographImportRequest.class); + var plan = planService.createPlan(request); + log.info("Prepared tachograph import plan tenant={} mode={} strategy={} scope={} itemCount={}", + plan.tenantKey(), plan.mode(), plan.acquisitionStrategy(), plan.importScope().stableKey(), plan.items().size()); + exchange.getMessage().setBody(plan); + }); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/AcquisitionStrategy.java b/src/main/java/at/procon/eventhub/dto/AcquisitionStrategy.java new file mode 100644 index 0000000..ddfe76b --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/AcquisitionStrategy.java @@ -0,0 +1,21 @@ +package at.procon.eventhub.dto; + +/** + * Strategy hint for tachograph DB acquisition. + */ +public enum AcquisitionStrategy { + /** + * Preferred when the tachograph DB has original card/VU package metadata and import timestamps. + */ + SOURCE_PACKAGE_WATERMARK, + + /** + * Use if event/source rows have reliable updated-at timestamps. + */ + SOURCE_ROW_WATERMARK, + + /** + * Simple and robust fallback: re-read an occurred-time window with overlap and rely on source-record idempotency. + */ + OCCURRED_AT_WINDOW_WITH_OVERLAP +} diff --git a/src/main/java/at/procon/eventhub/dto/DriverRefDto.java b/src/main/java/at/procon/eventhub/dto/DriverRefDto.java index af9c775..11f7fdb 100644 --- a/src/main/java/at/procon/eventhub/dto/DriverRefDto.java +++ b/src/main/java/at/procon/eventhub/dto/DriverRefDto.java @@ -6,11 +6,14 @@ import jakarta.validation.Valid; * Source-side driver reference. No internal EventHub driver id is required in * incoming acquisition requests; it can be resolved later from sourceEntityId or * nation-scoped driver card number. + * + * Organisation assignment is intentionally not stored on the event. Driver ↔ + * organisation relation belongs to master data and can be resolved by + * sourceEntityId/driverCard + occurredAt when needed. */ public record DriverRefDto( String sourceEntityId, - @Valid DriverCardRefDto driverCard, - @Valid SourceGroupRefDto sourceOrganisation + @Valid DriverCardRefDto driverCard ) { public DriverRefDto { sourceEntityId = normalizeNullable(sourceEntityId); diff --git a/src/main/java/at/procon/eventhub/dto/EventFamily.java b/src/main/java/at/procon/eventhub/dto/EventFamily.java new file mode 100644 index 0000000..d77c936 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/EventFamily.java @@ -0,0 +1,12 @@ +package at.procon.eventhub.dto; + +public enum EventFamily { + DRIVER_ACTIVITY, + DRIVER_CARD, + POSITION, + BORDER_CROSSING, + LOAD_UNLOAD, + SPECIFIC_CONDITION, + PLACE, + SPEEDING +} diff --git a/src/main/java/at/procon/eventhub/dto/ImportMode.java b/src/main/java/at/procon/eventhub/dto/ImportMode.java new file mode 100644 index 0000000..4cfb70a --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/ImportMode.java @@ -0,0 +1,7 @@ +package at.procon.eventhub.dto; + +public enum ImportMode { + INITIAL_BACKFILL, + INCREMENTAL_UPDATE, + REPROCESS +} diff --git a/src/main/java/at/procon/eventhub/dto/TachographImportPlanDto.java b/src/main/java/at/procon/eventhub/dto/TachographImportPlanDto.java new file mode 100644 index 0000000..7354ebd --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/TachographImportPlanDto.java @@ -0,0 +1,15 @@ +package at.procon.eventhub.dto; + +import java.util.List; + +public record TachographImportPlanDto( + String tenantKey, + ImportMode mode, + AcquisitionStrategy acquisitionStrategy, + boolean refreshMasterDataFirst, + ImportScopeDto importScope, + SourceGroupRefDto sourceGroup, + EventSourceDto eventSource, + List items +) { +} diff --git a/src/main/java/at/procon/eventhub/dto/TachographImportPlanItemDto.java b/src/main/java/at/procon/eventhub/dto/TachographImportPlanItemDto.java new file mode 100644 index 0000000..9dac5e4 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/TachographImportPlanItemDto.java @@ -0,0 +1,13 @@ +package at.procon.eventhub.dto; + +import java.util.List; + +public record TachographImportPlanItemDto( + EventFamily eventFamily, + String sourceKind, + String extractionCode, + List sourceTables, + String entityAxis, + String description +) { +} diff --git a/src/main/java/at/procon/eventhub/dto/TachographImportRequest.java b/src/main/java/at/procon/eventhub/dto/TachographImportRequest.java new file mode 100644 index 0000000..d18f78e --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/TachographImportRequest.java @@ -0,0 +1,43 @@ +package at.procon.eventhub.dto; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import java.util.EnumSet; +import java.util.Set; + +/** + * Request for acquiring tachograph DB data into EventHub. This request describes + * the import job/scope; individual SQL extraction routes then create acquisition + * packages and EventHubEventDto records. + */ +public record TachographImportRequest( + @NotBlank String tenantKey, + @Valid @NotNull EventSourceDto eventSource, + @Valid SourceGroupRefDto sourceGroup, + @Valid @NotNull ImportScopeDto importScope, + Set eventFamilies, + ImportMode mode, + boolean refreshMasterDataFirst, + AcquisitionStrategy acquisitionStrategy +) { + public TachographImportRequest { + tenantKey = tenantKey == null ? null : tenantKey.trim(); + if (importScope == null) { + importScope = ImportScopeDto.tenantAll(null, null); + } + if (eventFamilies == null || eventFamilies.isEmpty()) { + eventFamilies = EnumSet.allOf(EventFamily.class); + } else { + eventFamilies = EnumSet.copyOf(eventFamilies); + } + if (mode == null) { + mode = ImportMode.INITIAL_BACKFILL; + } + if (acquisitionStrategy == null) { + acquisitionStrategy = mode == ImportMode.INCREMENTAL_UPDATE + ? AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK + : AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP; + } + } +} diff --git a/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java b/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java index 867f1d8..e5a7a02 100644 --- a/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java +++ b/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java @@ -5,12 +5,15 @@ import jakarta.validation.Valid; /** * Source-side vehicle reference. VIN can be missing for driver-card-only data; * VRN/registration is nation-scoped and can be resolved to VIN later. + * + * Organisation assignment is intentionally not stored on the event. Vehicle ↔ + * organisation relation belongs to master data and can be resolved by + * sourceEntityId/VIN/VRN + occurredAt when needed. */ public record VehicleRefDto( String sourceEntityId, String vin, - @Valid VehicleRegistrationRefDto vehicleRegistration, - @Valid SourceGroupRefDto sourceOrganisation + @Valid VehicleRegistrationRefDto vehicleRegistration ) { public VehicleRefDto { sourceEntityId = normalizeNullable(sourceEntityId); diff --git a/src/main/java/at/procon/eventhub/persistence/EventRepository.java b/src/main/java/at/procon/eventhub/persistence/EventRepository.java index f2fe1cc..3320deb 100644 --- a/src/main/java/at/procon/eventhub/persistence/EventRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/EventRepository.java @@ -3,7 +3,6 @@ package at.procon.eventhub.persistence; import at.procon.eventhub.dto.DriverCardRefDto; import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.EventHubEventDto; -import at.procon.eventhub.dto.SourceGroupRefDto; import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto; import at.procon.eventhub.service.EventAcquisitionRecordKeyService; @@ -36,9 +35,8 @@ public class EventRepository { * Acquisition-stage persistence. This table stores source records as imported. * It does not merge or deduplicate equivalent events from different sources; * later query/read models can combine sources when a preferred source has gaps. - * For now this table keeps acquired point events with EventSource context, - * source-side driver/vehicle refs, source organisation information, generic - * eventDetails, and raw payload JSON. + * Organisation assignment is not stored per event; it belongs to master-data + * relations for driver/vehicle and can be resolved by occurredAt later. */ public int batchInsert(UUID packageId, int eventSourceId, List events) { int[] counts = jdbcTemplate.batchUpdate( @@ -47,9 +45,7 @@ public class EventRepository { id, event_source_id, data_package_id, external_source_event_id, driver_source_entity_id, driver_card_nation, driver_card_number, - driver_source_org_entity_id, driver_source_org_code, driver_source_org_name, vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number, - vehicle_source_org_entity_id, vehicle_source_org_code, vehicle_source_org_name, occurred_at, received_partner_at, received_hub_at, event_domain, event_type, lifecycle, odometer_m, latitude, longitude, @@ -59,12 +55,10 @@ public class EventRepository { ?, ?, ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, - ?, ?, ?, ?::jsonb, ?::jsonb, ?, ?, ? ) @@ -78,10 +72,8 @@ public class EventRepository { OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt(); DriverRefDto driverRef = event.driverRef(); DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard(); - SourceGroupRefDto driverOrg = driverRef == null ? null : driverRef.sourceOrganisation(); VehicleRefDto vehicleRef = event.vehicleRef(); VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration(); - SourceGroupRefDto vehicleOrg = vehicleRef == null ? null : vehicleRef.sourceOrganisation(); ps.setObject(1, eventId); ps.setInt(2, eventSourceId); @@ -91,37 +83,31 @@ public class EventRepository { ps.setString(5, driverRef == null ? null : driverRef.sourceEntityId()); ps.setString(6, driverCard == null ? null : driverCard.nation()); ps.setString(7, driverCard == null ? null : driverCard.number()); - ps.setString(8, driverOrg == null ? null : driverOrg.sourceEntityId()); - ps.setString(9, driverOrg == null ? null : driverOrg.code()); - ps.setString(10, driverOrg == null ? null : driverOrg.name()); - ps.setString(11, vehicleRef == null ? null : vehicleRef.sourceEntityId()); - ps.setString(12, vehicleRef == null ? null : vehicleRef.vin()); - ps.setString(13, vehicleRegistration == null ? null : vehicleRegistration.nation()); - ps.setString(14, vehicleRegistration == null ? null : vehicleRegistration.number()); - ps.setString(15, vehicleOrg == null ? null : vehicleOrg.sourceEntityId()); - ps.setString(16, vehicleOrg == null ? null : vehicleOrg.code()); - ps.setString(17, vehicleOrg == null ? null : vehicleOrg.name()); + ps.setString(8, vehicleRef == null ? null : vehicleRef.sourceEntityId()); + ps.setString(9, vehicleRef == null ? null : vehicleRef.vin()); + ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation()); + ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number()); - ps.setObject(18, event.occurredAt()); - ps.setObject(19, event.receivedPartnerAt()); - ps.setObject(20, receivedHubAt); - ps.setString(21, event.eventDomain().name()); - ps.setString(22, event.eventType().name()); - ps.setString(23, event.lifecycle().name()); - setNullableLong(ps, 24, event.odometerM()); + ps.setObject(12, event.occurredAt()); + ps.setObject(13, event.receivedPartnerAt()); + ps.setObject(14, receivedHubAt); + ps.setString(15, event.eventDomain().name()); + ps.setString(16, event.eventType().name()); + ps.setString(17, event.lifecycle().name()); + setNullableLong(ps, 18, event.odometerM()); if (event.position() == null) { - ps.setNull(25, Types.NUMERIC); - ps.setNull(26, Types.NUMERIC); + ps.setNull(19, Types.NUMERIC); + ps.setNull(20, Types.NUMERIC); } else { - ps.setObject(25, event.position().latitude()); - ps.setObject(26, event.position().longitude()); + ps.setObject(19, event.position().latitude()); + ps.setObject(20, event.position().longitude()); } - ps.setString(27, toJson(objectMapper.valueToTree(event.eventDetails()))); - ps.setString(28, toJson(event.payload())); - ps.setBoolean(29, event.manualEntry()); - ps.setString(30, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId)); - ps.setString(31, recordKeyService.buildEventSignatureHash(event)); + ps.setString(21, toJson(objectMapper.valueToTree(event.eventDetails()))); + ps.setString(22, toJson(event.payload())); + ps.setBoolean(23, event.manualEntry()); + ps.setString(24, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId)); + ps.setString(25, recordKeyService.buildEventSignatureHash(event)); } @Override diff --git a/src/main/java/at/procon/eventhub/persistence/EventSourceRepository.java b/src/main/java/at/procon/eventhub/persistence/EventSourceRepository.java index f6fc71a..c6d3b49 100644 --- a/src/main/java/at/procon/eventhub/persistence/EventSourceRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/EventSourceRepository.java @@ -15,8 +15,9 @@ public class EventSourceRepository { this.jdbcTemplate = jdbcTemplate; } - public int resolveSourceId(EventSourceDto eventSource) { - Integer existing = findSourceId(eventSource); + public int resolveSourceId(String tenantKey, EventSourceDto eventSource) { + String normalizedTenantKey = tenantKey == null || tenantKey.isBlank() ? "default" : tenantKey.trim(); + Integer existing = findSourceId(normalizedTenantKey, eventSource); if (existing != null) { return existing; } @@ -25,40 +26,43 @@ public class EventSourceRepository { PreparedStatement ps = connection.prepareStatement( """ insert into eventhub.event_source( - provider_key, source_kind, source_key, source_instance_key, + tenant_key, provider_key, source_kind, source_key, source_instance_key, tenant_provider_setting_key, external_fleet_key - ) values (?, ?, ?, ?, ?, ?) - on conflict (provider_key, source_kind, source_key, source_instance_key) do nothing + ) values (?, ?, ?, ?, ?, ?, ?) + on conflict (tenant_key, provider_key, source_kind, source_key, source_instance_key) do nothing """, Statement.NO_GENERATED_KEYS ); - ps.setString(1, eventSource.providerKey()); - ps.setString(2, eventSource.sourceKind()); - ps.setString(3, eventSource.sourceKey()); - ps.setString(4, eventSource.sourceInstanceKey() == null ? "default" : eventSource.sourceInstanceKey()); - ps.setString(5, eventSource.tenantProviderSettingKey()); - ps.setString(6, eventSource.externalFleetKey()); + ps.setString(1, normalizedTenantKey); + ps.setString(2, eventSource.providerKey()); + ps.setString(3, eventSource.sourceKind()); + ps.setString(4, eventSource.sourceKey()); + ps.setString(5, eventSource.sourceInstanceKey() == null ? "default" : eventSource.sourceInstanceKey()); + ps.setString(6, eventSource.tenantProviderSettingKey()); + ps.setString(7, eventSource.externalFleetKey()); return ps; }); - Integer created = findSourceId(eventSource); + Integer created = findSourceId(normalizedTenantKey, eventSource); if (created == null) { - throw new IllegalStateException("Could not resolve event source id for " + eventSource.stableKey()); + throw new IllegalStateException("Could not resolve event source id for " + normalizedTenantKey + ":" + eventSource.stableKey()); } return created; } - private Integer findSourceId(EventSourceDto eventSource) { + private Integer findSourceId(String tenantKey, EventSourceDto eventSource) { return jdbcTemplate.query( """ select id from eventhub.event_source - where provider_key = ? + where tenant_key = ? + and provider_key = ? and source_kind = ? and source_key = ? and source_instance_key = ? """, rs -> rs.next() ? rs.getInt("id") : null, + tenantKey, eventSource.providerKey(), eventSource.sourceKind(), eventSource.sourceKey(), diff --git a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java index e8f7177..a78163a 100644 --- a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java +++ b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java @@ -55,7 +55,7 @@ public class EventHubIngestionService { } EventSourceDto eventSource = packageInfo.eventSource(); - int eventSourceId = eventSourceRepository.resolveSourceId(eventSource); + int eventSourceId = eventSourceRepository.resolveSourceId(packageInfo.tenantKey(), eventSource); List sortedEvents = sorter.sort(batch.events()); sortedEvents.forEach(validator::validate); diff --git a/src/main/java/at/procon/eventhub/service/TachographImportPlanService.java b/src/main/java/at/procon/eventhub/service/TachographImportPlanService.java new file mode 100644 index 0000000..6c7c2ae --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographImportPlanService.java @@ -0,0 +1,77 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.TachographImportPlanDto; +import at.procon.eventhub.dto.TachographImportPlanItemDto; +import at.procon.eventhub.dto.TachographImportRequest; +import java.util.ArrayList; +import java.util.List; +import org.springframework.stereotype.Service; + +@Service +public class TachographImportPlanService { + + public TachographImportPlanDto createPlan(TachographImportRequest request) { + List items = new ArrayList<>(); + for (EventFamily family : request.eventFamilies()) { + items.addAll(itemsFor(family)); + } + return new TachographImportPlanDto( + request.tenantKey(), + request.mode(), + request.acquisitionStrategy(), + request.refreshMasterDataFirst(), + request.importScope(), + request.sourceGroup(), + request.eventSource(), + items + ); + } + + private List itemsFor(EventFamily family) { + return switch (family) { + case DRIVER_ACTIVITY -> List.of( + item(family, "VEHICLE_UNIT", "VU_ACTIVITY", List.of("VUActivity"), "VEHICLE", "Vehicle-unit driver activity point events"), + item(family, "DRIVER_CARD", "CARD_ACTIVITY", List.of("CardActivity"), "DRIVER", "Driver-card activity point events") + ); + case DRIVER_CARD -> List.of( + item(family, "VEHICLE_UNIT", "IW_CYCLE", List.of("IWCycle"), "BOTH", "Card insert/withdraw events from VU cycles"), + item(family, "DRIVER_CARD", "CARD_VEHICLES_USED", List.of("CardVehiclesUsed"), "DRIVER", "Card insert/withdraw/use events from card vehicle usage") + ); + case POSITION -> List.of( + item(family, "VEHICLE_UNIT", "VU_POSITION", List.of("VUPlaces", "VULoadUnload", "VUGnssAccumulatedDriving", "VUBorderCrossing"), "VEHICLE", "Position points from VU tachograph sources"), + item(family, "DRIVER_CARD", "CARD_POSITION", List.of("CardPlaces", "CardLoadUnload", "CardGnssAccumulatedDriving", "CardBorderCrossing"), "DRIVER", "Position points from driver-card tachograph sources") + ); + case BORDER_CROSSING -> List.of( + item(family, "VEHICLE_UNIT", "VU_BORDER_CROSSING", List.of("VUBorderCrossing"), "VEHICLE", "Border crossing events from VU"), + item(family, "DRIVER_CARD", "CARD_BORDER_CROSSING", List.of("CardBorderCrossing"), "DRIVER", "Border crossing events from driver card") + ); + case LOAD_UNLOAD -> List.of( + item(family, "VEHICLE_UNIT", "VU_LOAD_UNLOAD", List.of("VULoadUnload"), "VEHICLE", "Load/unload operation events from VU"), + item(family, "DRIVER_CARD", "CARD_LOAD_UNLOAD", List.of("CardLoadUnload"), "DRIVER", "Load/unload operation events from driver card") + ); + case SPECIFIC_CONDITION -> List.of( + item(family, "VEHICLE_UNIT", "VU_SPECIFIC_CONDITION", List.of("VUSpecificCondition"), "VEHICLE", "Out-of-scope and ferry/train events from VU"), + item(family, "DRIVER_CARD", "CARD_SPECIFIC_CONDITION", List.of("CardSpecificCondition"), "DRIVER", "Out-of-scope and ferry/train events from driver card") + ); + case PLACE -> List.of( + item(family, "VEHICLE_UNIT", "VU_PLACE", List.of("VUPlaces"), "VEHICLE", "Start/end place events from VU"), + item(family, "DRIVER_CARD", "CARD_PLACE", List.of("CardPlaces"), "DRIVER", "Start/end place events from driver card") + ); + case SPEEDING -> List.of( + item(family, "VEHICLE_UNIT", "SPEEDING_EVENTS", List.of("SpeedingEvents"), "VEHICLE", "Speeding begin/end events") + ); + }; + } + + private TachographImportPlanItemDto item( + EventFamily family, + String sourceKind, + String extractionCode, + List sourceTables, + String entityAxis, + String description + ) { + return new TachographImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description); + } +} diff --git a/src/main/resources/db/migration/V1__create_eventhub_schema.sql b/src/main/resources/db/migration/V1__create_eventhub_schema.sql index 0a5c10f..b24efa2 100644 --- a/src/main/resources/db/migration/V1__create_eventhub_schema.sql +++ b/src/main/resources/db/migration/V1__create_eventhub_schema.sql @@ -3,9 +3,11 @@ create extension if not exists pgcrypto; create schema if not exists eventhub; -- Acquisition source definition. This represents where the imported source --- record came from. Source records are intentionally kept separately by provider/source. +-- record came from. It is tenant-scoped because the same provider/source keys +-- may be reused by different customers. create table if not exists eventhub.event_source ( id integer generated always as identity primary key, + tenant_key text not null, provider_key text not null, source_kind text not null, source_key text not null, @@ -13,7 +15,56 @@ create table if not exists eventhub.event_source ( tenant_provider_setting_key text, external_fleet_key text, created_at timestamptz not null default now(), - constraint ux_event_source unique (provider_key, source_kind, source_key, source_instance_key) + constraint ux_event_source unique (tenant_key, provider_key, source_kind, source_key, source_instance_key) +); + +-- One execution of a tachograph acquisition job. A run may create many data packages. +create table if not exists eventhub.import_run ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + mode text not null, + status text not null, + refresh_master_data_first boolean not null default true, + + source_group_type text, + source_group_entity_id text, + source_group_code text, + source_group_name text, + + import_scope_type text not null, + root_source_org_entity_id text, + root_source_org_code text, + root_source_org_name text, + include_children boolean not null default false, + occurred_from timestamptz, + occurred_to timestamptz, + + requested_event_families text[] not null default '{}', + acquisition_strategy text, + metadata jsonb not null default '{}'::jsonb, + started_at timestamptz not null default now(), + finished_at timestamptz, + error_message text, + constraint chk_import_run_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to) +); + +-- Optional cursor table for scheduled/difference imports. The first implementation can +-- use occurredAt windows; later it can switch to source-package or source-row watermarks. +create table if not exists eventhub.import_cursor ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + scope_hash text not null, + event_family text not null, + source_kind text not null, + cursor_type text not null, + last_source_package_imported_at timestamptz, + last_source_package_id text, + last_source_row_updated_at timestamptz, + last_occurred_to timestamptz, + updated_at timestamptz not null default now(), + constraint ux_import_cursor unique (tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type) ); -- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/import scope. @@ -21,6 +72,7 @@ create table if not exists eventhub.event_source ( create table if not exists eventhub.data_package ( id uuid primary key, event_source_id integer not null references eventhub.event_source(id), + import_run_id uuid references eventhub.import_run(id), tenant_key text not null, package_key text not null, package_type text not null, @@ -52,9 +104,10 @@ create table if not exists eventhub.data_package ( ); -- Temporary acquisition-stage point-event store. --- It keeps the discussed DTO shape: EventSource context, externalSourceEventId, --- one occurredAt timestamp, source-side driver/vehicle refs, source-side organisation assignments, --- normalized event details, and raw JSON payload. +-- It keeps acquired point events with EventSource context, externalSourceEventId, +-- one occurredAt timestamp, source-side driver/vehicle refs, normalized event details, +-- and raw JSON payload. Organisation is intentionally not stored per event; it belongs +-- to master-data relations for driver/vehicle and is represented in importScope/sourceGroup. create table if not exists eventhub.acquired_event ( id uuid not null, event_source_id integer not null references eventhub.event_source(id), @@ -65,17 +118,11 @@ create table if not exists eventhub.acquired_event ( driver_source_entity_id text, driver_card_nation text, driver_card_number text, - driver_source_org_entity_id text, - driver_source_org_code text, - driver_source_org_name text, vehicle_source_entity_id text, vehicle_vin text, vehicle_registration_nation text, vehicle_registration_number text, - vehicle_source_org_entity_id text, - vehicle_source_org_code text, - vehicle_source_org_name text, occurred_at timestamptz not null, received_partner_at timestamptz, @@ -136,14 +183,6 @@ create index if not exists idx_acquired_event_driver_card_time on eventhub.acquired_event(driver_card_nation, driver_card_number, occurred_at desc) where driver_card_number is not null; -create index if not exists idx_acquired_event_driver_org_time - on eventhub.acquired_event(driver_source_org_entity_id, occurred_at desc) - where driver_source_org_entity_id is not null; - -create index if not exists idx_acquired_event_vehicle_org_time - on eventhub.acquired_event(vehicle_source_org_entity_id, occurred_at desc) - where vehicle_source_org_entity_id is not null; - create index if not exists idx_acquired_event_domain_type_time on eventhub.acquired_event(event_domain, event_type, occurred_at desc); @@ -158,3 +197,6 @@ create index if not exists idx_data_package_source_time create index if not exists idx_data_package_scope on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to); + +create index if not exists idx_import_run_source_status + on eventhub.import_run(tenant_key, event_source_id, status, started_at desc); diff --git a/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java b/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java index bcb8ae3..4d6cebf 100644 --- a/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java +++ b/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java @@ -54,8 +54,8 @@ class EventAcquisitionRecordKeyServiceTest { return new EventHubEventDto( null, externalSourceEventId, - new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789"), null), - new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345"), null), + new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")), + new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345")), OffsetDateTime.parse("2026-04-28T08:00:00+02:00"), null, null, diff --git a/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java b/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java index b31b1d9..cabfa2b 100644 --- a/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java +++ b/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java @@ -35,8 +35,8 @@ class YellowFoxD8BookingEventMapperTest { 3, OffsetDateTime.parse("2026-04-29T08:15:00+02:00"), null, - new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789"), null), - new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345"), null), + new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")), + new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345")), null, null, null,