Add tachograph import planning scaffold

This commit is contained in:
trifonovt 2026-04-30 12:33:34 +02:00
parent 230ae1987d
commit 6a7395bec6
18 changed files with 642 additions and 300 deletions

504
README.md
View File

@ -1,22 +1,8 @@
# EventHub Acquisition Service # EventHub Acquisition Service
Spring Boot + Apache Camel project skeleton for acquiring normalized EventHub point events from multiple providers/sources. Spring Boot + Apache Camel skeleton for acquiring normalized EventHub point events from multiple providers/sources.
The current version intentionally focuses on **acquisition**. It stores source records as imported and does not merge or deduplicate equivalent events from different providers/sources. It does, however, keep a non-unique eventSignatureHash as a later merge/gap-filling hint. Later query/read models can merge sources when a preferred/main source contains gaps. The included PostgreSQL schema is a small acquisition-stage store so the project can be run and tested end-to-end. The current version focuses on **acquisition from source systems**, especially tachograph DB data. It stores source records as imported. It does **not** merge or deduplicate equivalent events from different providers/sources. It does keep a non-unique `eventSignatureHash` as a future query/projection hint.
## Architecture
```text
source-specific Camel input route
-> source-specific mapper
-> EventHubEventDto
-> common EventHub acquisition route
-> validation
-> package-key creation from tenant + EventSource + source group + import scope + event family
-> aggregation / batching
-> chronological sorting inside the batch
-> acquisition package handoff
```
## Namespace ## Namespace
@ -26,7 +12,7 @@ at.procon.eventhub
## Main model decisions ## Main model decisions
### 1. One event = one time point ### One event = one point in time
`EventHubEventDto` has exactly one timestamp: `EventHubEventDto` has exactly one timestamp:
@ -34,13 +20,15 @@ at.procon.eventhub
occurredAt occurredAt
``` ```
There is no generic `duration`, `endTime`, `validFrom`, or `validTo`. If a source row represents an interval, the mapper may emit separate point events, for example `DRIVE START` and `DRIVE END`. There is no generic `duration`, `endTime`, `validFrom`, or `validTo`. If a source row represents an interval, a mapper may emit separate point events such as `DRIVE START` and `DRIVE END`.
### 2. Tenant is package-level ### Tenant is package/job-level
`tenantKey` identifies the owner/client/account for the package. It is required for acquisition grouping and future master-data resolution. `tenantKey` identifies the customer/data owner. It is mandatory for import packages and tachograph import requests.
### 3. EventSource identifies the technical source ### EventSource identifies the technical source
Example:
```json ```json
{ {
@ -62,34 +50,9 @@ YELLOWFOX / TELEMATICS_PLATFORM / YELLOWFOX_D8
FLEETBOARD / TELEMATICS_PLATFORM / FLEETBOARD_POSITION FLEETBOARD / TELEMATICS_PLATFORM / FLEETBOARD_POSITION
``` ```
`EventSource` is acquisition context. A VU event, a driver-card event and a YellowFox D8 event may describe the same real-world event, but this acquisition service keeps them as separate acquired source records. Cross-source merging/gap filling is intentionally left for a later query/read model. ### SourceGroup is package/source grouping only
### 4. No cross-source deduplication during acquisition For tachograph, `sourceGroup` can identify the selected source organisation/root organisation.
The acquisition layer stores every source record independently. It uses `sourceRecordKeyHash` only for idempotency of the same source event, so the same input package can be retried safely. It does **not** deduplicate VU vs driver-card vs YellowFox records.
This is intentional because later queries may need to combine sources: for example, use tachograph data as the main source, but fill gaps from YellowFox or another provider.
The acquisition table also stores a non-unique `eventSignatureHash`. This is a semantic merge hint, not a unique key. It intentionally excludes `EventSource` and `externalSourceEventId`, so VU, driver-card and YellowFox records that look like the same real-world event can share a signature while still being stored separately. Later query/projection logic can use this signature for source comparison, gap filling, and merged timelines. The signature prefers nation-scoped driver card and vehicle registration when available, then VIN or source entity id as fallback, so it remains useful before final master-data resolution.
Therefore the current model preserves:
```text
tenantKey
eventSource
sourceGroup
importScope
externalSourceEventId
source-side driver/vehicle references
eventDetails
payload
```
### 5. SourceGroup captures tachograph organisation or YellowFox fleet
`sourceGroup` is package-level source grouping information.
For tachograph it can be a source organisation:
```json ```json
"sourceGroup": { "sourceGroup": {
@ -100,7 +63,7 @@ For tachograph it can be a source organisation:
} }
``` ```
For YellowFox it can be a fleet: For YellowFox, it can identify the provider fleet.
```json ```json
"sourceGroup": { "sourceGroup": {
@ -111,9 +74,9 @@ For YellowFox it can be a fleet:
} }
``` ```
The YellowFox fleet belongs to the same tenant/customer, but it is not forced to be an organisation. It can later be mapped to a tenant organisation if needed. YellowFox fleet is not forced to be an organisation. It belongs to the same tenant/customer and can later be mapped or resolved through vehicle/driver master data if needed.
### 6. ImportScope captures organisation and time filtering ### ImportScope describes data selection
`importScope` describes what was selected from the source system. `importScope` describes what was selected from the source system.
@ -146,13 +109,15 @@ Organisation subtree + time-window import:
} }
``` ```
`occurredFrom` is inclusive and `occurredTo` is exclusive. Both may be `null` for complete source DB import. `occurredFrom` is inclusive. `occurredTo` is exclusive. Both can be `null` for complete DB/history imports.
### 7. Source-side master references, no incoming internal IDs ### Driver/vehicle refs do not contain organisation
The incoming DTO does not require internal `driverId` or `vehicleId`, because in normal ingestion those ids are not known yet. Organisation assignment is a **master-data relation**, not an event property.
Driver reference with nation-scoped driver card: Events depend on driver and/or vehicle. The relation of organisation to driver/vehicle is imported and resolved separately from master data using `occurredAt`.
Driver ref:
```json ```json
"driverRef": { "driverRef": {
@ -160,17 +125,11 @@ Driver reference with nation-scoped driver card:
"driverCard": { "driverCard": {
"nation": "AT", "nation": "AT",
"number": "D123456789" "number": "D123456789"
},
"sourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "57",
"code": "57",
"name": "Sub Org 57"
} }
} }
``` ```
Vehicle reference with optional VIN and nation-scoped VRN: Vehicle ref:
```json ```json
"vehicleRef": { "vehicleRef": {
@ -179,17 +138,11 @@ Vehicle reference with optional VIN and nation-scoped VRN:
"vehicleRegistration": { "vehicleRegistration": {
"nation": "AT", "nation": "AT",
"number": "W-12345" "number": "W-12345"
},
"sourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "57",
"code": "57",
"name": "Sub Org 57"
} }
} }
``` ```
VIN is optional. Driver-card-only events can carry only the nation-scoped VRN/registration: Driver-card-only imports can carry only a nation-scoped VRN and no VIN:
```json ```json
"vehicleRef": { "vehicleRef": {
@ -198,33 +151,264 @@ VIN is optional. Driver-card-only events can carry only the nation-scoped VRN/re
"vehicleRegistration": { "vehicleRegistration": {
"nation": "AT", "nation": "AT",
"number": "W-12345" "number": "W-12345"
},
"sourceOrganisation": null
}
```
This allows late resolution when VU/master data later connects the VRN to a VIN.
### 8. Generic normalized eventDetails
Reusable event-specific properties are stored in:
```json
"eventDetails": {
"type": "DRIVER_ACTIVITY",
"attributes": {
"cardSlot": "DRIVER",
"cardStatus": "INSERTED",
"drivingStatus": "SINGLE"
} }
} }
``` ```
Raw provider values stay in `payload`. Later master-data resolution can connect `VRN + nation + occurredAt` to a VIN/vehicle.
## Package-level acquisition request ### No cross-source deduplication during acquisition
For external/manual ingestion, the preferred request shape is: The acquisition layer stores every source record independently. It uses `sourceRecordKeyHash` only for idempotency of the same source event:
```text
tenantKey + EventSource + externalSourceEventId
```
It also stores a non-unique `eventSignatureHash`. This is only a semantic hint for future query-time merging/gap filling. It is not unique and must not suppress imports.
## Tachograph import job model
For real tachograph DB extraction, use a tachograph import request. This describes the job and produces an import plan. SQL extraction routes are intentionally scaffolded as the next implementation step.
```http
POST /api/eventhub/acquisition/tachograph/imports/plan
POST /api/eventhub/acquisition/tachograph/imports/start
```
Example: initial import from one root organisation and its children:
```json
{
"tenantKey": "kralowetz",
"eventSource": {
"providerKey": "TACHOGRAPH",
"sourceKind": "MIXED",
"sourceKey": "TACHOGRAPH_DB",
"sourceInstanceKey": "main-tachograph-db",
"tenantProviderSettingKey": "kralowetz-tachograph-prod"
},
"sourceGroup": {
"type": "ORGANISATION",
"sourceEntityId": "147",
"code": "147",
"name": "Kralowetz"
},
"importScope": {
"type": "SOURCE_ORGANISATION_SUBTREE",
"rootSourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "147",
"code": "147",
"name": "Kralowetz"
},
"includeChildren": true,
"occurredFrom": "2025-01-01T00:00:00+01:00",
"occurredTo": null
},
"eventFamilies": [
"DRIVER_ACTIVITY",
"DRIVER_CARD",
"POSITION",
"BORDER_CROSSING",
"LOAD_UNLOAD",
"PLACE",
"SPECIFIC_CONDITION",
"SPEEDING"
],
"mode": "INITIAL_BACKFILL",
"refreshMasterDataFirst": true,
"acquisitionStrategy": "OCCURRED_AT_WINDOW_WITH_OVERLAP"
}
```
Example: regular incremental update:
```json
{
"tenantKey": "kralowetz",
"eventSource": {
"providerKey": "TACHOGRAPH",
"sourceKind": "MIXED",
"sourceKey": "TACHOGRAPH_DB",
"sourceInstanceKey": "main-tachograph-db",
"tenantProviderSettingKey": "kralowetz-tachograph-prod"
},
"sourceGroup": {
"type": "ORGANISATION",
"sourceEntityId": "147"
},
"importScope": {
"type": "SOURCE_ORGANISATION_SUBTREE",
"rootSourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "147"
},
"includeChildren": true,
"occurredFrom": null,
"occurredTo": null
},
"eventFamilies": ["DRIVER_ACTIVITY", "DRIVER_CARD", "POSITION", "BORDER_CROSSING", "LOAD_UNLOAD", "PLACE", "SPECIFIC_CONDITION", "SPEEDING"],
"mode": "INCREMENTAL_UPDATE",
"refreshMasterDataFirst": true,
"acquisitionStrategy": "SOURCE_PACKAGE_WATERMARK"
}
```
## Tachograph extraction plan
The import-plan service currently creates extraction definitions like:
```text
DRIVER_ACTIVITY / VEHICLE_UNIT -> VUActivity
DRIVER_ACTIVITY / DRIVER_CARD -> CardActivity
DRIVER_CARD / VEHICLE_UNIT -> IWCycle
DRIVER_CARD / DRIVER_CARD -> CardVehiclesUsed
POSITION / VEHICLE_UNIT -> VUPlaces, VULoadUnload, VUGnssAccumulatedDriving, VUBorderCrossing
POSITION / DRIVER_CARD -> CardPlaces, CardLoadUnload, CardGnssAccumulatedDriving, CardBorderCrossing
BORDER_CROSSING / VEHICLE_UNIT -> VUBorderCrossing
BORDER_CROSSING / DRIVER_CARD -> CardBorderCrossing
LOAD_UNLOAD / VEHICLE_UNIT -> VULoadUnload
LOAD_UNLOAD / DRIVER_CARD -> CardLoadUnload
SPECIFIC_CONDITION / VEHICLE_UNIT -> VUSpecificCondition
SPECIFIC_CONDITION / DRIVER_CARD -> CardSpecificCondition
PLACE / VEHICLE_UNIT -> VUPlaces
PLACE / DRIVER_CARD -> CardPlaces
SPEEDING / VEHICLE_UNIT -> SpeedingEvents
```
The next implementation step is to replace the scaffolded plan items with actual Camel/JDBC SQL extraction routes.
## Acquisition alternatives considered
### Alternative A: occurred-time window import
Read events by `occurredAt` for a root organisation/time window.
Pros:
```text
simple
works for initial backfill
matches explicit from/to import requests
```
Cons:
```text
unsafe as the only incremental method because a newly imported card/VU package can contain old occurredAt data
requires overlap windows for regular updates
```
Best use:
```text
initial backfill and reprocessing
fallback incremental strategy with overlap
```
### Alternative B: source-package watermark import
Read original tachograph card/VU packages that were imported/changed in the tachograph DB since the last successful EventHub run, then extract all events belonging to those packages.
Pros:
```text
best for regular updates
handles late-arriving historical tachograph packages
fits the tachograph package concept
```
Cons:
```text
requires reliable source package metadata and links from event rows to package/source download
more complex SQL and cursor state
```
Best use:
```text
primary incremental strategy if tachograph DB exposes package import timestamps/ids
```
### Alternative C: source-row watermark import
Read source event rows changed since last run using row-level `updatedAt` or monotonic IDs.
Pros:
```text
precise if row update timestamps are reliable
does not require package-level model
```
Cons:
```text
not possible if source tables do not have reliable changed/updated metadata
harder across many event tables
```
Best use:
```text
fallback when rows have reliable updatedAt/row version fields
```
### Alternative D: per vehicle/per driver polling
After master-data refresh, loop through vehicles and drivers in the selected organisation subtree and read their event data.
Pros:
```text
matches your existing data acquisition pattern
naturally separates vehicle-unit and driver-card data
supports organisation-scoped imports well
```
Cons:
```text
can be slower for large fleets
requires careful batching/chunking and parallelism
can miss late old data unless combined with package/row watermark or overlap
```
Best use:
```text
scope resolution and controlled extraction, combined with Alternative A or B
```
## Recommended ingestion strategy
Use a hybrid:
```text
Initial import:
master data first
organisation subtree + occurredFrom/occurredTo
chunk by time and/or vehicle/driver
import idempotently by sourceRecordKeyHash
Regular update:
master data first
prefer source-package watermark
fallback to occurredAt overlap window if package metadata is insufficient
import idempotently by sourceRecordKeyHash
```
This means the EventHub acquisition package is an **extraction package**, while the original tachograph card/VU package should be preserved as source metadata in payload or later in a dedicated source-package table.
## Existing package-level normalized event ingestion
```http
POST /api/eventhub/acquisition/packages
```
Example:
```json ```json
{ {
@ -239,17 +423,13 @@ For external/manual ingestion, the preferred request shape is:
}, },
"sourceGroup": { "sourceGroup": {
"type": "ORGANISATION", "type": "ORGANISATION",
"sourceEntityId": "147", "sourceEntityId": "147"
"code": "147",
"name": "Kralowetz"
}, },
"importScope": { "importScope": {
"type": "SOURCE_ORGANISATION_SUBTREE", "type": "SOURCE_ORGANISATION_SUBTREE",
"rootSourceOrganisation": { "rootSourceOrganisation": {
"type": "ORGANISATION", "type": "ORGANISATION",
"sourceEntityId": "147", "sourceEntityId": "147"
"code": "147",
"name": "Kralowetz"
}, },
"includeChildren": true, "includeChildren": true,
"occurredFrom": "2026-04-28T00:00:00+02:00", "occurredFrom": "2026-04-28T00:00:00+02:00",
@ -267,10 +447,6 @@ For external/manual ingestion, the preferred request shape is:
"driverCard": { "driverCard": {
"nation": "AT", "nation": "AT",
"number": "D123456789" "number": "D123456789"
},
"sourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "57"
} }
}, },
"vehicleRef": { "vehicleRef": {
@ -279,10 +455,6 @@ For external/manual ingestion, the preferred request shape is:
"vehicleRegistration": { "vehicleRegistration": {
"nation": "AT", "nation": "AT",
"number": "W-12345" "number": "W-12345"
},
"sourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "57"
} }
}, },
"occurredAt": "2026-04-28T08:00:00+02:00", "occurredAt": "2026-04-28T08:00:00+02:00",
@ -312,17 +484,16 @@ For external/manual ingestion, the preferred request shape is:
## Routes ## Routes
### Source-specific input routes
```text ```text
direct:yellowfox-d8-booking-input direct:yellowfox-d8-booking-input
direct:telematics-position-input direct:telematics-position-input
direct:tachograph-activity-input direct:tachograph-activity-input
direct:tachograph-import-start
direct:eventhub-package-input direct:eventhub-package-input
direct:eventhub-manual-input direct:eventhub-manual-input
``` ```
### Common route Common route:
```text ```text
direct:eventhub-normalized-input direct:eventhub-normalized-input
@ -334,97 +505,6 @@ direct:eventhub-normalized-input
-> EventHubIngestionService.ingest(...) -> EventHubIngestionService.ingest(...)
``` ```
## REST endpoints
```text
POST /api/eventhub/acquisition/yellowfox/d8-bookings
POST /api/eventhub/acquisition/telematics/positions
POST /api/eventhub/acquisition/tachograph/activities
POST /api/eventhub/acquisition/packages
POST /api/eventhub/acquisition/events
```
## Example: tachograph driver-card activity with VRN only
```bash
curl -X POST http://localhost:8080/api/eventhub/acquisition/tachograph/activities \
-H "Content-Type: application/json" \
-d '[
{
"tenantKey": "kralowetz",
"sourceKind": "DRIVER_CARD",
"sourceInstanceKey": "main-tachograph-db",
"tenantProviderSettingKey": "kralowetz-tachograph-prod",
"externalSourceEventId": "TACHOGRAPH:DRIVER_CARD:activity:789:start",
"driverRef": {
"sourceEntityId": "driver-100",
"driverCard": {
"nation": "AT",
"number": "D123456789"
},
"sourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "57"
}
},
"vehicleRef": {
"sourceEntityId": null,
"vin": null,
"vehicleRegistration": {
"nation": "AT",
"number": "W-12345"
},
"sourceOrganisation": null
},
"occurredAt": "2026-04-28T08:00:00+02:00",
"activityType": "DRIVE",
"lifecycle": "START",
"cardSlot": "DRIVER",
"cardStatus": "INSERTED",
"drivingStatus": "SINGLE",
"payload": {
"raw": {
"activity": 3,
"cardSlot": 0,
"cardStatus": 0,
"drivingStatus": 0
}
}
}
]'
```
The mapper creates a default `TENANT_ALL` one-day import scope for this convenience endpoint. For real tachograph import jobs with organisation subtree/full DB scope, use the package-level request or add dedicated SQL extraction job routes.
## Example: full tachograph DB import package
```json
{
"package": {
"tenantKey": "kralowetz",
"eventSource": {
"providerKey": "TACHOGRAPH",
"sourceKind": "VEHICLE_UNIT",
"sourceKey": "TACHOGRAPH_VEHICLE_UNIT",
"sourceInstanceKey": "main-tachograph-db",
"tenantProviderSettingKey": "kralowetz-tachograph-prod"
},
"sourceGroup": null,
"importScope": {
"type": "TENANT_ALL",
"rootSourceOrganisation": null,
"includeChildren": false,
"occurredFrom": null,
"occurredTo": null
},
"eventFamily": "DRIVER_ACTIVITY",
"businessDate": null,
"externalPackageId": "TACHOGRAPH:ALL:DRIVER_ACTIVITY:FULL"
},
"events": []
}
```
## Start PostgreSQL ## Start PostgreSQL
```bash ```bash
@ -467,15 +547,14 @@ select occurred_at,
driver_source_entity_id, driver_source_entity_id,
driver_card_nation, driver_card_nation,
driver_card_number, driver_card_number,
driver_source_org_entity_id,
vehicle_source_entity_id, vehicle_source_entity_id,
vehicle_vin, vehicle_vin,
vehicle_registration_nation, vehicle_registration_nation,
vehicle_registration_number, vehicle_registration_number,
vehicle_source_org_entity_id,
event_domain, event_domain,
event_type, event_type,
lifecycle, lifecycle,
event_signature_hash,
event_details, event_details,
payload payload
from eventhub.acquired_event from eventhub.acquired_event
@ -484,23 +563,8 @@ order by occurred_at desc;
## Next implementation steps ## Next implementation steps
1. Add source-specific SQL extraction routes for the tachograph DB event families: 1. Add actual Camel/JDBC extraction routes behind the tachograph import plan.
- activities from CardActivity/VUActivity 2. Implement master-data acquisition first: organisation tree, driver/card assignments, vehicle VIN/VRN assignments, driver/vehicle organisation assignment histories.
- card insert/withdraw from CardVehiclesUsed/IWCycle 3. Implement initial backfill using organisation/time scope.
- positions from places/GNSS/border/load-unload sources 4. Implement incremental import using source-package watermark, with occurredAt overlap fallback.
- border crossings 5. Discuss query/read models later: source priority and gap filling across tachograph, YellowFox and other sources.
- load/unload
- specific conditions: out-of-scope and ferry/train
- speeding events
2. Each SQL extraction route should accept `ImportScopeDto`:
- optional source organisation root + include children
- optional occurredFrom/occurredTo
- null time bounds mean complete DB/history import
3. Add master-data resolution later:
- driver by tenant + driver card nation/number + occurredAt
- vehicle by tenant + VIN or tenant + registration nation/number + occurredAt
- late resolution from VRN-only driver-card events to VIN after VU/master data import
4. Discuss query/read models later:
- how to merge acquired events from all sources at query time
- source priority per event family when the main source contains gaps
- how to expose source provenance when multiple sources describe the same real-world event

View File

@ -2,9 +2,11 @@ package at.procon.eventhub.api;
import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageIngestRequest; import at.procon.eventhub.dto.EventHubPackageIngestRequest;
import at.procon.eventhub.dto.TachographImportRequest;
import at.procon.eventhub.dto.source.TachographActivityDto; import at.procon.eventhub.dto.source.TachographActivityDto;
import at.procon.eventhub.dto.source.TelematicsPositionDto; import at.procon.eventhub.dto.source.TelematicsPositionDto;
import at.procon.eventhub.dto.source.YellowFoxD8BookingDto; import at.procon.eventhub.dto.source.YellowFoxD8BookingDto;
import at.procon.eventhub.service.TachographImportPlanService;
import jakarta.validation.Valid; import jakarta.validation.Valid;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -20,9 +22,11 @@ import org.springframework.web.bind.annotation.RestController;
public class EventHubIngestionController { public class EventHubIngestionController {
private final ProducerTemplate producerTemplate; private final ProducerTemplate producerTemplate;
private final TachographImportPlanService tachographImportPlanService;
public EventHubIngestionController(ProducerTemplate producerTemplate) { public EventHubIngestionController(ProducerTemplate producerTemplate, TachographImportPlanService tachographImportPlanService) {
this.producerTemplate = producerTemplate; this.producerTemplate = producerTemplate;
this.tachographImportPlanService = tachographImportPlanService;
} }
@PostMapping("/yellowfox/d8-bookings") @PostMapping("/yellowfox/d8-bookings")
@ -43,6 +47,21 @@ public class EventHubIngestionController {
return accepted(activities.size(), "direct:tachograph-activity-input"); return accepted(activities.size(), "direct:tachograph-activity-input");
} }
@PostMapping("/tachograph/imports/plan")
public ResponseEntity<?> planTachographImport(@Valid @RequestBody TachographImportRequest request) {
return ResponseEntity.ok(tachographImportPlanService.createPlan(request));
}
@PostMapping("/tachograph/imports/start")
public ResponseEntity<Map<String, Object>> startTachographImport(@Valid @RequestBody TachographImportRequest request) {
producerTemplate.sendBody("direct:tachograph-import-start", request);
return ResponseEntity.accepted().body(Map.of(
"accepted", true,
"route", "direct:tachograph-import-start",
"note", "The current implementation prepares the tachograph import plan. SQL extraction routes are intentionally scaffolded as next step."
));
}
@PostMapping("/packages") @PostMapping("/packages")
public ResponseEntity<Map<String, Object>> ingestPackage(@Valid @RequestBody EventHubPackageIngestRequest request) { public ResponseEntity<Map<String, Object>> ingestPackage(@Valid @RequestBody EventHubPackageIngestRequest request) {
producerTemplate.sendBody("direct:eventhub-package-input", request); producerTemplate.sendBody("direct:eventhub-package-input", request);

View File

@ -0,0 +1,33 @@
package at.procon.eventhub.camel;
import at.procon.eventhub.dto.TachographImportRequest;
import at.procon.eventhub.service.TachographImportPlanService;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class TachographImportRequestRoute extends RouteBuilder {
private static final Logger log = LoggerFactory.getLogger(TachographImportRequestRoute.class);
private final TachographImportPlanService planService;
public TachographImportRequestRoute(TachographImportPlanService planService) {
this.planService = planService;
}
@Override
public void configure() {
from("direct:tachograph-import-start")
.routeId("tachograph-import-start-route")
.process(exchange -> {
TachographImportRequest request = exchange.getMessage().getBody(TachographImportRequest.class);
var plan = planService.createPlan(request);
log.info("Prepared tachograph import plan tenant={} mode={} strategy={} scope={} itemCount={}",
plan.tenantKey(), plan.mode(), plan.acquisitionStrategy(), plan.importScope().stableKey(), plan.items().size());
exchange.getMessage().setBody(plan);
});
}
}

View File

@ -0,0 +1,21 @@
package at.procon.eventhub.dto;
/**
* Strategy hint for tachograph DB acquisition.
*/
public enum AcquisitionStrategy {
/**
* Preferred when the tachograph DB has original card/VU package metadata and import timestamps.
*/
SOURCE_PACKAGE_WATERMARK,
/**
* Use if event/source rows have reliable updated-at timestamps.
*/
SOURCE_ROW_WATERMARK,
/**
* Simple and robust fallback: re-read an occurred-time window with overlap and rely on source-record idempotency.
*/
OCCURRED_AT_WINDOW_WITH_OVERLAP
}

View File

@ -6,11 +6,14 @@ import jakarta.validation.Valid;
* Source-side driver reference. No internal EventHub driver id is required in * Source-side driver reference. No internal EventHub driver id is required in
* incoming acquisition requests; it can be resolved later from sourceEntityId or * incoming acquisition requests; it can be resolved later from sourceEntityId or
* nation-scoped driver card number. * nation-scoped driver card number.
*
* Organisation assignment is intentionally not stored on the event. Driver
* organisation relation belongs to master data and can be resolved by
* sourceEntityId/driverCard + occurredAt when needed.
*/ */
public record DriverRefDto( public record DriverRefDto(
String sourceEntityId, String sourceEntityId,
@Valid DriverCardRefDto driverCard, @Valid DriverCardRefDto driverCard
@Valid SourceGroupRefDto sourceOrganisation
) { ) {
public DriverRefDto { public DriverRefDto {
sourceEntityId = normalizeNullable(sourceEntityId); sourceEntityId = normalizeNullable(sourceEntityId);

View File

@ -0,0 +1,12 @@
package at.procon.eventhub.dto;
public enum EventFamily {
DRIVER_ACTIVITY,
DRIVER_CARD,
POSITION,
BORDER_CROSSING,
LOAD_UNLOAD,
SPECIFIC_CONDITION,
PLACE,
SPEEDING
}

View File

@ -0,0 +1,7 @@
package at.procon.eventhub.dto;
public enum ImportMode {
INITIAL_BACKFILL,
INCREMENTAL_UPDATE,
REPROCESS
}

View File

@ -0,0 +1,15 @@
package at.procon.eventhub.dto;
import java.util.List;
public record TachographImportPlanDto(
String tenantKey,
ImportMode mode,
AcquisitionStrategy acquisitionStrategy,
boolean refreshMasterDataFirst,
ImportScopeDto importScope,
SourceGroupRefDto sourceGroup,
EventSourceDto eventSource,
List<TachographImportPlanItemDto> items
) {
}

View File

@ -0,0 +1,13 @@
package at.procon.eventhub.dto;
import java.util.List;
public record TachographImportPlanItemDto(
EventFamily eventFamily,
String sourceKind,
String extractionCode,
List<String> sourceTables,
String entityAxis,
String description
) {
}

View File

@ -0,0 +1,43 @@
package at.procon.eventhub.dto;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.util.EnumSet;
import java.util.Set;
/**
* Request for acquiring tachograph DB data into EventHub. This request describes
* the import job/scope; individual SQL extraction routes then create acquisition
* packages and EventHubEventDto records.
*/
public record TachographImportRequest(
@NotBlank String tenantKey,
@Valid @NotNull EventSourceDto eventSource,
@Valid SourceGroupRefDto sourceGroup,
@Valid @NotNull ImportScopeDto importScope,
Set<EventFamily> eventFamilies,
ImportMode mode,
boolean refreshMasterDataFirst,
AcquisitionStrategy acquisitionStrategy
) {
public TachographImportRequest {
tenantKey = tenantKey == null ? null : tenantKey.trim();
if (importScope == null) {
importScope = ImportScopeDto.tenantAll(null, null);
}
if (eventFamilies == null || eventFamilies.isEmpty()) {
eventFamilies = EnumSet.allOf(EventFamily.class);
} else {
eventFamilies = EnumSet.copyOf(eventFamilies);
}
if (mode == null) {
mode = ImportMode.INITIAL_BACKFILL;
}
if (acquisitionStrategy == null) {
acquisitionStrategy = mode == ImportMode.INCREMENTAL_UPDATE
? AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK
: AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP;
}
}
}

View File

@ -5,12 +5,15 @@ import jakarta.validation.Valid;
/** /**
* Source-side vehicle reference. VIN can be missing for driver-card-only data; * Source-side vehicle reference. VIN can be missing for driver-card-only data;
* VRN/registration is nation-scoped and can be resolved to VIN later. * VRN/registration is nation-scoped and can be resolved to VIN later.
*
* Organisation assignment is intentionally not stored on the event. Vehicle
* organisation relation belongs to master data and can be resolved by
* sourceEntityId/VIN/VRN + occurredAt when needed.
*/ */
public record VehicleRefDto( public record VehicleRefDto(
String sourceEntityId, String sourceEntityId,
String vin, String vin,
@Valid VehicleRegistrationRefDto vehicleRegistration, @Valid VehicleRegistrationRefDto vehicleRegistration
@Valid SourceGroupRefDto sourceOrganisation
) { ) {
public VehicleRefDto { public VehicleRefDto {
sourceEntityId = normalizeNullable(sourceEntityId); sourceEntityId = normalizeNullable(sourceEntityId);

View File

@ -3,7 +3,6 @@ package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.DriverCardRefDto; import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.service.EventAcquisitionRecordKeyService; import at.procon.eventhub.service.EventAcquisitionRecordKeyService;
@ -36,9 +35,8 @@ public class EventRepository {
* Acquisition-stage persistence. This table stores source records as imported. * Acquisition-stage persistence. This table stores source records as imported.
* It does not merge or deduplicate equivalent events from different sources; * It does not merge or deduplicate equivalent events from different sources;
* later query/read models can combine sources when a preferred source has gaps. * later query/read models can combine sources when a preferred source has gaps.
* For now this table keeps acquired point events with EventSource context, * Organisation assignment is not stored per event; it belongs to master-data
* source-side driver/vehicle refs, source organisation information, generic * relations for driver/vehicle and can be resolved by occurredAt later.
* eventDetails, and raw payload JSON.
*/ */
public int batchInsert(UUID packageId, int eventSourceId, List<EventHubEventDto> events) { public int batchInsert(UUID packageId, int eventSourceId, List<EventHubEventDto> events) {
int[] counts = jdbcTemplate.batchUpdate( int[] counts = jdbcTemplate.batchUpdate(
@ -47,9 +45,7 @@ public class EventRepository {
id, event_source_id, data_package_id, id, event_source_id, data_package_id,
external_source_event_id, external_source_event_id,
driver_source_entity_id, driver_card_nation, driver_card_number, driver_source_entity_id, driver_card_nation, driver_card_number,
driver_source_org_entity_id, driver_source_org_code, driver_source_org_name,
vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number, vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number,
vehicle_source_org_entity_id, vehicle_source_org_code, vehicle_source_org_name,
occurred_at, received_partner_at, received_hub_at, occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle, event_domain, event_type, lifecycle,
odometer_m, latitude, longitude, odometer_m, latitude, longitude,
@ -59,12 +55,10 @@ public class EventRepository {
?, ?, ?, ?, ?, ?,
?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?,
?::jsonb, ?::jsonb, ?, ?::jsonb, ?::jsonb, ?,
?, ? ?, ?
) )
@ -78,10 +72,8 @@ public class EventRepository {
OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt(); OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt();
DriverRefDto driverRef = event.driverRef(); DriverRefDto driverRef = event.driverRef();
DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard(); DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard();
SourceGroupRefDto driverOrg = driverRef == null ? null : driverRef.sourceOrganisation();
VehicleRefDto vehicleRef = event.vehicleRef(); VehicleRefDto vehicleRef = event.vehicleRef();
VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration(); VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration();
SourceGroupRefDto vehicleOrg = vehicleRef == null ? null : vehicleRef.sourceOrganisation();
ps.setObject(1, eventId); ps.setObject(1, eventId);
ps.setInt(2, eventSourceId); ps.setInt(2, eventSourceId);
@ -91,37 +83,31 @@ public class EventRepository {
ps.setString(5, driverRef == null ? null : driverRef.sourceEntityId()); ps.setString(5, driverRef == null ? null : driverRef.sourceEntityId());
ps.setString(6, driverCard == null ? null : driverCard.nation()); ps.setString(6, driverCard == null ? null : driverCard.nation());
ps.setString(7, driverCard == null ? null : driverCard.number()); ps.setString(7, driverCard == null ? null : driverCard.number());
ps.setString(8, driverOrg == null ? null : driverOrg.sourceEntityId());
ps.setString(9, driverOrg == null ? null : driverOrg.code());
ps.setString(10, driverOrg == null ? null : driverOrg.name());
ps.setString(11, vehicleRef == null ? null : vehicleRef.sourceEntityId()); ps.setString(8, vehicleRef == null ? null : vehicleRef.sourceEntityId());
ps.setString(12, vehicleRef == null ? null : vehicleRef.vin()); ps.setString(9, vehicleRef == null ? null : vehicleRef.vin());
ps.setString(13, vehicleRegistration == null ? null : vehicleRegistration.nation()); ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation());
ps.setString(14, vehicleRegistration == null ? null : vehicleRegistration.number()); ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number());
ps.setString(15, vehicleOrg == null ? null : vehicleOrg.sourceEntityId());
ps.setString(16, vehicleOrg == null ? null : vehicleOrg.code());
ps.setString(17, vehicleOrg == null ? null : vehicleOrg.name());
ps.setObject(18, event.occurredAt()); ps.setObject(12, event.occurredAt());
ps.setObject(19, event.receivedPartnerAt()); ps.setObject(13, event.receivedPartnerAt());
ps.setObject(20, receivedHubAt); ps.setObject(14, receivedHubAt);
ps.setString(21, event.eventDomain().name()); ps.setString(15, event.eventDomain().name());
ps.setString(22, event.eventType().name()); ps.setString(16, event.eventType().name());
ps.setString(23, event.lifecycle().name()); ps.setString(17, event.lifecycle().name());
setNullableLong(ps, 24, event.odometerM()); setNullableLong(ps, 18, event.odometerM());
if (event.position() == null) { if (event.position() == null) {
ps.setNull(25, Types.NUMERIC); ps.setNull(19, Types.NUMERIC);
ps.setNull(26, Types.NUMERIC); ps.setNull(20, Types.NUMERIC);
} else { } else {
ps.setObject(25, event.position().latitude()); ps.setObject(19, event.position().latitude());
ps.setObject(26, event.position().longitude()); ps.setObject(20, event.position().longitude());
} }
ps.setString(27, toJson(objectMapper.valueToTree(event.eventDetails()))); ps.setString(21, toJson(objectMapper.valueToTree(event.eventDetails())));
ps.setString(28, toJson(event.payload())); ps.setString(22, toJson(event.payload()));
ps.setBoolean(29, event.manualEntry()); ps.setBoolean(23, event.manualEntry());
ps.setString(30, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId)); ps.setString(24, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId));
ps.setString(31, recordKeyService.buildEventSignatureHash(event)); ps.setString(25, recordKeyService.buildEventSignatureHash(event));
} }
@Override @Override

View File

@ -15,8 +15,9 @@ public class EventSourceRepository {
this.jdbcTemplate = jdbcTemplate; this.jdbcTemplate = jdbcTemplate;
} }
public int resolveSourceId(EventSourceDto eventSource) { public int resolveSourceId(String tenantKey, EventSourceDto eventSource) {
Integer existing = findSourceId(eventSource); String normalizedTenantKey = tenantKey == null || tenantKey.isBlank() ? "default" : tenantKey.trim();
Integer existing = findSourceId(normalizedTenantKey, eventSource);
if (existing != null) { if (existing != null) {
return existing; return existing;
} }
@ -25,40 +26,43 @@ public class EventSourceRepository {
PreparedStatement ps = connection.prepareStatement( PreparedStatement ps = connection.prepareStatement(
""" """
insert into eventhub.event_source( insert into eventhub.event_source(
provider_key, source_kind, source_key, source_instance_key, tenant_key, provider_key, source_kind, source_key, source_instance_key,
tenant_provider_setting_key, external_fleet_key tenant_provider_setting_key, external_fleet_key
) values (?, ?, ?, ?, ?, ?) ) values (?, ?, ?, ?, ?, ?, ?)
on conflict (provider_key, source_kind, source_key, source_instance_key) do nothing on conflict (tenant_key, provider_key, source_kind, source_key, source_instance_key) do nothing
""", """,
Statement.NO_GENERATED_KEYS Statement.NO_GENERATED_KEYS
); );
ps.setString(1, eventSource.providerKey()); ps.setString(1, normalizedTenantKey);
ps.setString(2, eventSource.sourceKind()); ps.setString(2, eventSource.providerKey());
ps.setString(3, eventSource.sourceKey()); ps.setString(3, eventSource.sourceKind());
ps.setString(4, eventSource.sourceInstanceKey() == null ? "default" : eventSource.sourceInstanceKey()); ps.setString(4, eventSource.sourceKey());
ps.setString(5, eventSource.tenantProviderSettingKey()); ps.setString(5, eventSource.sourceInstanceKey() == null ? "default" : eventSource.sourceInstanceKey());
ps.setString(6, eventSource.externalFleetKey()); ps.setString(6, eventSource.tenantProviderSettingKey());
ps.setString(7, eventSource.externalFleetKey());
return ps; return ps;
}); });
Integer created = findSourceId(eventSource); Integer created = findSourceId(normalizedTenantKey, eventSource);
if (created == null) { if (created == null) {
throw new IllegalStateException("Could not resolve event source id for " + eventSource.stableKey()); throw new IllegalStateException("Could not resolve event source id for " + normalizedTenantKey + ":" + eventSource.stableKey());
} }
return created; return created;
} }
private Integer findSourceId(EventSourceDto eventSource) { private Integer findSourceId(String tenantKey, EventSourceDto eventSource) {
return jdbcTemplate.query( return jdbcTemplate.query(
""" """
select id select id
from eventhub.event_source from eventhub.event_source
where provider_key = ? where tenant_key = ?
and provider_key = ?
and source_kind = ? and source_kind = ?
and source_key = ? and source_key = ?
and source_instance_key = ? and source_instance_key = ?
""", """,
rs -> rs.next() ? rs.getInt("id") : null, rs -> rs.next() ? rs.getInt("id") : null,
tenantKey,
eventSource.providerKey(), eventSource.providerKey(),
eventSource.sourceKind(), eventSource.sourceKind(),
eventSource.sourceKey(), eventSource.sourceKey(),

View File

@ -55,7 +55,7 @@ public class EventHubIngestionService {
} }
EventSourceDto eventSource = packageInfo.eventSource(); EventSourceDto eventSource = packageInfo.eventSource();
int eventSourceId = eventSourceRepository.resolveSourceId(eventSource); int eventSourceId = eventSourceRepository.resolveSourceId(packageInfo.tenantKey(), eventSource);
List<EventHubEventDto> sortedEvents = sorter.sort(batch.events()); List<EventHubEventDto> sortedEvents = sorter.sort(batch.events());
sortedEvents.forEach(validator::validate); sortedEvents.forEach(validator::validate);

View File

@ -0,0 +1,77 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.TachographImportPlanDto;
import at.procon.eventhub.dto.TachographImportPlanItemDto;
import at.procon.eventhub.dto.TachographImportRequest;
import java.util.ArrayList;
import java.util.List;
import org.springframework.stereotype.Service;
@Service
public class TachographImportPlanService {
public TachographImportPlanDto createPlan(TachographImportRequest request) {
List<TachographImportPlanItemDto> items = new ArrayList<>();
for (EventFamily family : request.eventFamilies()) {
items.addAll(itemsFor(family));
}
return new TachographImportPlanDto(
request.tenantKey(),
request.mode(),
request.acquisitionStrategy(),
request.refreshMasterDataFirst(),
request.importScope(),
request.sourceGroup(),
request.eventSource(),
items
);
}
private List<TachographImportPlanItemDto> itemsFor(EventFamily family) {
return switch (family) {
case DRIVER_ACTIVITY -> List.of(
item(family, "VEHICLE_UNIT", "VU_ACTIVITY", List.of("VUActivity"), "VEHICLE", "Vehicle-unit driver activity point events"),
item(family, "DRIVER_CARD", "CARD_ACTIVITY", List.of("CardActivity"), "DRIVER", "Driver-card activity point events")
);
case DRIVER_CARD -> List.of(
item(family, "VEHICLE_UNIT", "IW_CYCLE", List.of("IWCycle"), "BOTH", "Card insert/withdraw events from VU cycles"),
item(family, "DRIVER_CARD", "CARD_VEHICLES_USED", List.of("CardVehiclesUsed"), "DRIVER", "Card insert/withdraw/use events from card vehicle usage")
);
case POSITION -> List.of(
item(family, "VEHICLE_UNIT", "VU_POSITION", List.of("VUPlaces", "VULoadUnload", "VUGnssAccumulatedDriving", "VUBorderCrossing"), "VEHICLE", "Position points from VU tachograph sources"),
item(family, "DRIVER_CARD", "CARD_POSITION", List.of("CardPlaces", "CardLoadUnload", "CardGnssAccumulatedDriving", "CardBorderCrossing"), "DRIVER", "Position points from driver-card tachograph sources")
);
case BORDER_CROSSING -> List.of(
item(family, "VEHICLE_UNIT", "VU_BORDER_CROSSING", List.of("VUBorderCrossing"), "VEHICLE", "Border crossing events from VU"),
item(family, "DRIVER_CARD", "CARD_BORDER_CROSSING", List.of("CardBorderCrossing"), "DRIVER", "Border crossing events from driver card")
);
case LOAD_UNLOAD -> List.of(
item(family, "VEHICLE_UNIT", "VU_LOAD_UNLOAD", List.of("VULoadUnload"), "VEHICLE", "Load/unload operation events from VU"),
item(family, "DRIVER_CARD", "CARD_LOAD_UNLOAD", List.of("CardLoadUnload"), "DRIVER", "Load/unload operation events from driver card")
);
case SPECIFIC_CONDITION -> List.of(
item(family, "VEHICLE_UNIT", "VU_SPECIFIC_CONDITION", List.of("VUSpecificCondition"), "VEHICLE", "Out-of-scope and ferry/train events from VU"),
item(family, "DRIVER_CARD", "CARD_SPECIFIC_CONDITION", List.of("CardSpecificCondition"), "DRIVER", "Out-of-scope and ferry/train events from driver card")
);
case PLACE -> List.of(
item(family, "VEHICLE_UNIT", "VU_PLACE", List.of("VUPlaces"), "VEHICLE", "Start/end place events from VU"),
item(family, "DRIVER_CARD", "CARD_PLACE", List.of("CardPlaces"), "DRIVER", "Start/end place events from driver card")
);
case SPEEDING -> List.of(
item(family, "VEHICLE_UNIT", "SPEEDING_EVENTS", List.of("SpeedingEvents"), "VEHICLE", "Speeding begin/end events")
);
};
}
private TachographImportPlanItemDto item(
EventFamily family,
String sourceKind,
String extractionCode,
List<String> sourceTables,
String entityAxis,
String description
) {
return new TachographImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description);
}
}

View File

@ -3,9 +3,11 @@ create extension if not exists pgcrypto;
create schema if not exists eventhub; create schema if not exists eventhub;
-- Acquisition source definition. This represents where the imported source -- Acquisition source definition. This represents where the imported source
-- record came from. Source records are intentionally kept separately by provider/source. -- record came from. It is tenant-scoped because the same provider/source keys
-- may be reused by different customers.
create table if not exists eventhub.event_source ( create table if not exists eventhub.event_source (
id integer generated always as identity primary key, id integer generated always as identity primary key,
tenant_key text not null,
provider_key text not null, provider_key text not null,
source_kind text not null, source_kind text not null,
source_key text not null, source_key text not null,
@ -13,7 +15,56 @@ create table if not exists eventhub.event_source (
tenant_provider_setting_key text, tenant_provider_setting_key text,
external_fleet_key text, external_fleet_key text,
created_at timestamptz not null default now(), created_at timestamptz not null default now(),
constraint ux_event_source unique (provider_key, source_kind, source_key, source_instance_key) constraint ux_event_source unique (tenant_key, provider_key, source_kind, source_key, source_instance_key)
);
-- One execution of a tachograph acquisition job. A run may create many data packages.
create table if not exists eventhub.import_run (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
mode text not null,
status text not null,
refresh_master_data_first boolean not null default true,
source_group_type text,
source_group_entity_id text,
source_group_code text,
source_group_name text,
import_scope_type text not null,
root_source_org_entity_id text,
root_source_org_code text,
root_source_org_name text,
include_children boolean not null default false,
occurred_from timestamptz,
occurred_to timestamptz,
requested_event_families text[] not null default '{}',
acquisition_strategy text,
metadata jsonb not null default '{}'::jsonb,
started_at timestamptz not null default now(),
finished_at timestamptz,
error_message text,
constraint chk_import_run_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to)
);
-- Optional cursor table for scheduled/difference imports. The first implementation can
-- use occurredAt windows; later it can switch to source-package or source-row watermarks.
create table if not exists eventhub.import_cursor (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
scope_hash text not null,
event_family text not null,
source_kind text not null,
cursor_type text not null,
last_source_package_imported_at timestamptz,
last_source_package_id text,
last_source_row_updated_at timestamptz,
last_occurred_to timestamptz,
updated_at timestamptz not null default now(),
constraint ux_import_cursor unique (tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type)
); );
-- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/import scope. -- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/import scope.
@ -21,6 +72,7 @@ create table if not exists eventhub.event_source (
create table if not exists eventhub.data_package ( create table if not exists eventhub.data_package (
id uuid primary key, id uuid primary key,
event_source_id integer not null references eventhub.event_source(id), event_source_id integer not null references eventhub.event_source(id),
import_run_id uuid references eventhub.import_run(id),
tenant_key text not null, tenant_key text not null,
package_key text not null, package_key text not null,
package_type text not null, package_type text not null,
@ -52,9 +104,10 @@ create table if not exists eventhub.data_package (
); );
-- Temporary acquisition-stage point-event store. -- Temporary acquisition-stage point-event store.
-- It keeps the discussed DTO shape: EventSource context, externalSourceEventId, -- It keeps acquired point events with EventSource context, externalSourceEventId,
-- one occurredAt timestamp, source-side driver/vehicle refs, source-side organisation assignments, -- one occurredAt timestamp, source-side driver/vehicle refs, normalized event details,
-- normalized event details, and raw JSON payload. -- and raw JSON payload. Organisation is intentionally not stored per event; it belongs
-- to master-data relations for driver/vehicle and is represented in importScope/sourceGroup.
create table if not exists eventhub.acquired_event ( create table if not exists eventhub.acquired_event (
id uuid not null, id uuid not null,
event_source_id integer not null references eventhub.event_source(id), event_source_id integer not null references eventhub.event_source(id),
@ -65,17 +118,11 @@ create table if not exists eventhub.acquired_event (
driver_source_entity_id text, driver_source_entity_id text,
driver_card_nation text, driver_card_nation text,
driver_card_number text, driver_card_number text,
driver_source_org_entity_id text,
driver_source_org_code text,
driver_source_org_name text,
vehicle_source_entity_id text, vehicle_source_entity_id text,
vehicle_vin text, vehicle_vin text,
vehicle_registration_nation text, vehicle_registration_nation text,
vehicle_registration_number text, vehicle_registration_number text,
vehicle_source_org_entity_id text,
vehicle_source_org_code text,
vehicle_source_org_name text,
occurred_at timestamptz not null, occurred_at timestamptz not null,
received_partner_at timestamptz, received_partner_at timestamptz,
@ -136,14 +183,6 @@ create index if not exists idx_acquired_event_driver_card_time
on eventhub.acquired_event(driver_card_nation, driver_card_number, occurred_at desc) on eventhub.acquired_event(driver_card_nation, driver_card_number, occurred_at desc)
where driver_card_number is not null; where driver_card_number is not null;
create index if not exists idx_acquired_event_driver_org_time
on eventhub.acquired_event(driver_source_org_entity_id, occurred_at desc)
where driver_source_org_entity_id is not null;
create index if not exists idx_acquired_event_vehicle_org_time
on eventhub.acquired_event(vehicle_source_org_entity_id, occurred_at desc)
where vehicle_source_org_entity_id is not null;
create index if not exists idx_acquired_event_domain_type_time create index if not exists idx_acquired_event_domain_type_time
on eventhub.acquired_event(event_domain, event_type, occurred_at desc); on eventhub.acquired_event(event_domain, event_type, occurred_at desc);
@ -158,3 +197,6 @@ create index if not exists idx_data_package_source_time
create index if not exists idx_data_package_scope create index if not exists idx_data_package_scope
on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to); on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to);
create index if not exists idx_import_run_source_status
on eventhub.import_run(tenant_key, event_source_id, status, started_at desc);

View File

@ -54,8 +54,8 @@ class EventAcquisitionRecordKeyServiceTest {
return new EventHubEventDto( return new EventHubEventDto(
null, null,
externalSourceEventId, externalSourceEventId,
new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789"), null), new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")),
new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345"), null), new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345")),
OffsetDateTime.parse("2026-04-28T08:00:00+02:00"), OffsetDateTime.parse("2026-04-28T08:00:00+02:00"),
null, null,
null, null,

View File

@ -35,8 +35,8 @@ class YellowFoxD8BookingEventMapperTest {
3, 3,
OffsetDateTime.parse("2026-04-29T08:15:00+02:00"), OffsetDateTime.parse("2026-04-29T08:15:00+02:00"),
null, null,
new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789"), null), new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")),
new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345"), null), new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345")),
null, null,
null, null,
null, null,