diff --git a/README.md b/README.md index 0c3a4e6..4d5e165 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Spring Boot + Apache Camel project skeleton for acquiring normalized EventHub point events from multiple providers/sources. -The current version intentionally focuses on **acquisition**. Final canonical storage/deduplication can be discussed later. The included PostgreSQL schema is a small acquisition-stage store so the project can be run and tested end-to-end. +The current version intentionally focuses on **acquisition**. It stores source records as imported and does not merge or deduplicate equivalent events from different providers/sources. It does, however, keep a non-unique eventSignatureHash as a later merge/gap-filling hint. Later query/read models can merge sources when a preferred/main source contains gaps. The included PostgreSQL schema is a small acquisition-stage store so the project can be run and tested end-to-end. ## Architecture @@ -12,7 +12,7 @@ source-specific Camel input route -> EventHubEventDto -> common EventHub acquisition route -> validation - -> package-key creation from tenant + EventSource + event family + date/window + -> package-key creation from tenant + EventSource + source group + import scope + event family -> aggregation / batching -> chronological sorting inside the batch -> acquisition package handoff @@ -40,17 +40,7 @@ There is no generic `duration`, `endTime`, `validFrom`, or `validTo`. If a sourc `tenantKey` identifies the owner/client/account for the package. It is required for acquisition grouping and future master-data resolution. -```json -{ - "tenantKey": "kralowetz" -} -``` - -Organisation is not mandatory in the incoming event. It can later be derived from resolved driver/vehicle + `occurredAt`. - -### 3. EventSource replaces sourceTable/sourceSystem - -The acquisition context is represented by `EventSourceDto`: +### 3. 
EventSource identifies the technical source ```json { @@ -72,13 +62,97 @@ YELLOWFOX / TELEMATICS_PLATFORM / YELLOWFOX_D8 FLEETBOARD / TELEMATICS_PLATFORM / FLEETBOARD_POSITION ``` -`EventSource` is acquisition context. It should not be part of the canonical real-world event identity. A VU event and a driver-card event may describe the same real event. +`EventSource` is acquisition context. A VU event, a driver-card event and a YellowFox D8 event may describe the same real-world event, but this acquisition service keeps them as separate acquired source records. Cross-source merging/gap filling is intentionally left for a later query/read model. -### 4. Source-side master references, no incoming internal IDs +### 4. No cross-source deduplication during acquisition + +The acquisition layer stores every source record independently. It uses `sourceRecordKeyHash` only for idempotency of the same source event, so the same input package can be retried safely. It does **not** deduplicate VU vs driver-card vs YellowFox records. + +This is intentional because later queries may need to combine sources: for example, use tachograph data as the main source, but fill gaps from YellowFox or another provider. + +The acquisition table also stores a non-unique `eventSignatureHash`. This is a semantic merge hint, not a unique key. It intentionally excludes `EventSource` and `externalSourceEventId`, so VU, driver-card and YellowFox records that look like the same real-world event can share a signature while still being stored separately. Later query/projection logic can use this signature for source comparison, gap filling, and merged timelines. The signature prefers nation-scoped driver card and vehicle registration when available, then VIN or source entity id as fallback, so it remains useful before final master-data resolution. 
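
The difference between the two hashes described above can be sketched as follows. This is a minimal illustration, not the project's `EventAcquisitionRecordKeyService`: the class name `SignatureSketch`, the method parameters, and the pre-flattened `driverKey`/`vehicleKey` strings are assumptions made for the example. Only the idea comes from the text above: the source-record key includes the source and its event id, while the signature leaves both out.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative sketch only; all names here are hypothetical, not the project's API.
final class SignatureSketch {

    // Idempotency key: tied to one source and that source's own event id,
    // so a retried package maps to the same key.
    static String sourceRecordKeyHash(String tenantKey, int eventSourceId, String externalSourceEventId) {
        return sha256Hex(String.join("|", tenantKey, String.valueOf(eventSourceId), externalSourceEventId));
    }

    // Merge hint: deliberately omits eventSourceId and externalSourceEventId,
    // so records from different sources can share the same value.
    static String eventSignatureHash(String tenantKey, String driverKey, String vehicleKey,
                                     String occurredAt, String eventType, String lifecycle) {
        return sha256Hex(String.join("|", tenantKey, driverKey, vehicleKey, occurredAt, eventType, lifecycle));
    }

    // Hex-encoded SHA-256 of the UTF-8 bytes of the input.
    static String sha256Hex(String value) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    public static void main(String[] args) {
        // Same real-world event seen by two sources (ids 1 and 2 are made up).
        String vuKey = sourceRecordKeyHash("kralowetz", 1, "VU-1001");
        String cardKey = sourceRecordKeyHash("kralowetz", 2, "CARD-77");
        String vuSig = eventSignatureHash("kralowetz", "AT:D123456789", "AT:W-12345",
                "2026-04-28T06:00:00Z", "DRIVE", "START");
        String cardSig = eventSignatureHash("kralowetz", "AT:D123456789", "AT:W-12345",
                "2026-04-28T06:00:00Z", "DRIVE", "START");

        System.out.println("source record keys equal: " + vuKey.equals(cardKey));
        System.out.println("event signatures equal:   " + vuSig.equals(cardSig));
    }
}
```

In this sketch both records would be stored, because their source-record keys differ, while a later read model could group them by the shared signature. How the real service normalizes time and picks driver/vehicle fallbacks is not shown here.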
+ +Therefore the current model preserves: + +```text +tenantKey +eventSource +sourceGroup +importScope +externalSourceEventId +source-side driver/vehicle references +eventDetails +payload +``` + +### 5. SourceGroup captures tachograph organisation or YellowFox fleet + +`sourceGroup` is package-level source grouping information. + +For tachograph it can be a source organisation: + +```json +"sourceGroup": { + "type": "ORGANISATION", + "sourceEntityId": "147", + "code": "147", + "name": "Kralowetz" +} +``` + +For YellowFox it can be a fleet: + +```json +"sourceGroup": { + "type": "FLEET", + "sourceEntityId": "7", + "code": "7", + "name": "YellowFox Fleet 7" +} +``` + +The YellowFox fleet belongs to the same tenant/customer, but it is not forced to be an organisation. It can later be mapped to a tenant organisation if needed. + +### 6. ImportScope captures organisation and time filtering + +`importScope` describes what was selected from the source system. + +Full DB import: + +```json +"importScope": { + "type": "TENANT_ALL", + "rootSourceOrganisation": null, + "includeChildren": false, + "occurredFrom": null, + "occurredTo": null +} +``` + +Organisation subtree + time-window import: + +```json +"importScope": { + "type": "SOURCE_ORGANISATION_SUBTREE", + "rootSourceOrganisation": { + "type": "ORGANISATION", + "sourceEntityId": "147", + "code": "147", + "name": "Kralowetz" + }, + "includeChildren": true, + "occurredFrom": "2026-04-28T00:00:00+02:00", + "occurredTo": "2026-04-29T00:00:00+02:00" +} +``` + +`occurredFrom` is inclusive and `occurredTo` is exclusive. Both may be `null` for complete source DB import. + +### 7. Source-side master references, no incoming internal IDs The incoming DTO does not require internal `driverId` or `vehicleId`, because in normal ingestion those ids are not known yet. 
-Driver reference: +Driver reference with nation-scoped driver card: ```json "driverRef": { @@ -86,11 +160,17 @@ Driver reference: "driverCard": { "nation": "AT", "number": "D123456789" + }, + "sourceOrganisation": { + "type": "ORGANISATION", + "sourceEntityId": "57", + "code": "57", + "name": "Sub Org 57" } } ``` -Vehicle reference: +Vehicle reference with optional VIN and nation-scoped VRN: ```json "vehicleRef": { @@ -99,6 +179,12 @@ Vehicle reference: "vehicleRegistration": { "nation": "AT", "number": "W-12345" + }, + "sourceOrganisation": { + "type": "ORGANISATION", + "sourceEntityId": "57", + "code": "57", + "name": "Sub Org 57" } } ``` @@ -112,13 +198,14 @@ VIN is optional. Driver-card-only events can carry only the nation-scoped VRN/re "vehicleRegistration": { "nation": "AT", "number": "W-12345" - } + }, + "sourceOrganisation": null } ``` This allows late resolution when VU/master data later connects the VRN to a VIN. -### 5. Generic normalized eventDetails +### 8. Generic normalized eventDetails Reusable event-specific properties are stored in: @@ -133,19 +220,7 @@ Reusable event-specific properties are stored in: } ``` -Raw provider values stay in `payload`: - -```json -"payload": { - "raw": { - "cardSlot": 0, - "cardStatus": 0, - "drivingStatus": 0 - } -} -``` - -This keeps the acquisition DTO generic while preserving meaningful normalized fields. +Raw provider values stay in `payload`. 
## Package-level acquisition request @@ -162,11 +237,27 @@ For external/manual ingestion, the preferred request shape is: "sourceInstanceKey": "main-tachograph-db", "tenantProviderSettingKey": "kralowetz-tachograph-prod" }, + "sourceGroup": { + "type": "ORGANISATION", + "sourceEntityId": "147", + "code": "147", + "name": "Kralowetz" + }, + "importScope": { + "type": "SOURCE_ORGANISATION_SUBTREE", + "rootSourceOrganisation": { + "type": "ORGANISATION", + "sourceEntityId": "147", + "code": "147", + "name": "Kralowetz" + }, + "includeChildren": true, + "occurredFrom": "2026-04-28T00:00:00+02:00", + "occurredTo": "2026-04-29T00:00:00+02:00" + }, "eventFamily": "DRIVER_ACTIVITY", "businessDate": "2026-04-28", - "requestedFrom": "2026-04-28T00:00:00+02:00", - "requestedTo": "2026-04-29T00:00:00+02:00", - "externalPackageId": "TACHOGRAPH:VEHICLE_UNIT:DRIVER_ACTIVITY:2026-04-28" + "externalPackageId": "TACHOGRAPH:ORG-147-SUBTREE:DRIVER_ACTIVITY:2026-04-28" }, "events": [ { @@ -176,6 +267,10 @@ For external/manual ingestion, the preferred request shape is: "driverCard": { "nation": "AT", "number": "D123456789" + }, + "sourceOrganisation": { + "type": "ORGANISATION", + "sourceEntityId": "57" } }, "vehicleRef": { @@ -184,6 +279,10 @@ For external/manual ingestion, the preferred request shape is: "vehicleRegistration": { "nation": "AT", "number": "W-12345" + }, + "sourceOrganisation": { + "type": "ORGANISATION", + "sourceEntityId": "57" } }, "occurredAt": "2026-04-28T08:00:00+02:00", @@ -228,7 +327,7 @@ direct:eventhub-manual-input ```text direct:eventhub-normalized-input -> validate EventHubEventDto - -> create package key from tenant + EventSource/package context + -> create package key from tenant + EventSource + sourceGroup + importScope + eventFamily -> seda:eventhub-batch-input -> aggregate by eventhub.packageKey -> sort by occurredAt inside the batch @@ -262,6 +361,10 @@ curl -X POST http://localhost:8080/api/eventhub/acquisition/tachograph/activitie "driverCard": { 
"nation": "AT", "number": "D123456789" + }, + "sourceOrganisation": { + "type": "ORGANISATION", + "sourceEntityId": "57" } }, "vehicleRef": { @@ -270,7 +373,8 @@ curl -X POST http://localhost:8080/api/eventhub/acquisition/tachograph/activitie "vehicleRegistration": { "nation": "AT", "number": "W-12345" - } + }, + "sourceOrganisation": null }, "occurredAt": "2026-04-28T08:00:00+02:00", "activityType": "DRIVE", @@ -290,16 +394,35 @@ curl -X POST http://localhost:8080/api/eventhub/acquisition/tachograph/activitie ]' ``` -The mapper creates: +The mapper creates a default `TENANT_ALL` one-day import scope for this convenience endpoint. For real tachograph import jobs with organisation subtree/full DB scope, use the package-level request or add dedicated SQL extraction job routes. -```text -Tenant = kralowetz -EventSource = TACHOGRAPH / DRIVER_CARD / TACHOGRAPH_DRIVER_CARD -EventDomain = DRIVER_ACTIVITY -EventType = DRIVE -Lifecycle = START -EventDetails.type = DRIVER_ACTIVITY -VehicleRef = VRN-only, VIN can be resolved later +## Example: full tachograph DB import package + +```json +{ + "package": { + "tenantKey": "kralowetz", + "eventSource": { + "providerKey": "TACHOGRAPH", + "sourceKind": "VEHICLE_UNIT", + "sourceKey": "TACHOGRAPH_VEHICLE_UNIT", + "sourceInstanceKey": "main-tachograph-db", + "tenantProviderSettingKey": "kralowetz-tachograph-prod" + }, + "sourceGroup": null, + "importScope": { + "type": "TENANT_ALL", + "rootSourceOrganisation": null, + "includeChildren": false, + "occurredFrom": null, + "occurredTo": null + }, + "eventFamily": "DRIVER_ACTIVITY", + "businessDate": null, + "externalPackageId": "TACHOGRAPH:ALL:DRIVER_ACTIVITY:FULL" + }, + "events": [] +} ``` ## Start PostgreSQL @@ -322,6 +445,12 @@ select p.received_at, s.provider_key, s.source_kind, s.source_key, + p.source_group_type, + p.source_group_entity_id, + p.import_scope_type, + p.root_source_org_entity_id, + p.occurred_from, + p.occurred_to, p.event_family, p.business_date, p.status, @@ 
-338,10 +467,12 @@ select occurred_at, driver_source_entity_id, driver_card_nation, driver_card_number, + driver_source_org_entity_id, vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number, + vehicle_source_org_entity_id, event_domain, event_type, lifecycle, @@ -361,12 +492,15 @@ order by occurred_at desc; - load/unload - specific conditions: out-of-scope and ferry/train - speeding events -2. Keep each extractor package-scoped by `tenant + EventSource + eventFamily + businessDate/import window`. +2. Each SQL extraction route should accept `ImportScopeDto`: + - optional source organisation root + include children + - optional occurredFrom/occurredTo + - null time bounds mean complete DB/history import 3. Add master-data resolution later: - driver by tenant + driver card nation/number + occurredAt - vehicle by tenant + VIN or tenant + registration nation/number + occurredAt - late resolution from VRN-only driver-card events to VIN after VU/master data import -4. Discuss final storage model: - - canonical `eventhub.event` - - source-record table linked to EventSource/package - - deduplication policy for VU vs driver-card duplicates +4. 
Discuss query/read models later: + - how to merge acquired events from all sources at query time + - source priority per event family when the main source contains gaps + - how to expose source provenance when multiple sources describe the same real-world event diff --git a/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java b/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java index a32e492..d5a32a7 100644 --- a/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java +++ b/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java @@ -4,6 +4,7 @@ import at.procon.eventhub.dto.DataPackageType; import at.procon.eventhub.dto.EventHubEventBatchDto; import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.service.EventHubEventSorter; import java.time.OffsetDateTime; import java.util.HashMap; @@ -34,11 +35,12 @@ public class EventHubBatchBuildProcessor implements Processor { packageInfo = sortedEvents.getFirst().packageInfo(); } - OffsetDateTime requestedFrom = packageInfo != null && packageInfo.requestedFrom() != null - ? packageInfo.requestedFrom() + ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope(); + OffsetDateTime occurredFrom = importScope != null && importScope.occurredFrom() != null + ? importScope.occurredFrom() : sortedEvents.getFirst().occurredAt(); - OffsetDateTime requestedTo = packageInfo != null && packageInfo.requestedTo() != null - ? packageInfo.requestedTo() + OffsetDateTime occurredTo = importScope != null && importScope.occurredTo() != null + ? 
importScope.occurredTo() : sortedEvents.getLast().occurredAt(); Map metadata = new HashMap<>(); @@ -48,6 +50,8 @@ public class EventHubBatchBuildProcessor implements Processor { if (packageInfo != null) { metadata.put("tenantKey", packageInfo.tenantKey()); metadata.put("eventSource", packageInfo.eventSource().stableKey()); + metadata.put("sourceGroup", packageInfo.sourceGroup() == null ? null : packageInfo.sourceGroup().stableKey()); + metadata.put("importScope", packageInfo.importScope() == null ? null : packageInfo.importScope().stableKey()); metadata.put("eventFamily", packageInfo.eventFamily()); metadata.put("businessDate", packageInfo.businessDate()); metadata.put("externalPackageId", packageInfo.externalPackageId()); @@ -57,8 +61,8 @@ public class EventHubBatchBuildProcessor implements Processor { packageKey, packageInfo, DataPackageType.CAMEL_BATCH, - requestedFrom, - requestedTo, + occurredFrom, + occurredTo, sortedEvents, metadata )); diff --git a/src/main/java/at/procon/eventhub/dto/DriverRefDto.java b/src/main/java/at/procon/eventhub/dto/DriverRefDto.java index 2898914..af9c775 100644 --- a/src/main/java/at/procon/eventhub/dto/DriverRefDto.java +++ b/src/main/java/at/procon/eventhub/dto/DriverRefDto.java @@ -9,7 +9,8 @@ import jakarta.validation.Valid; */ public record DriverRefDto( String sourceEntityId, - @Valid DriverCardRefDto driverCard + @Valid DriverCardRefDto driverCard, + @Valid SourceGroupRefDto sourceOrganisation ) { public DriverRefDto { sourceEntityId = normalizeNullable(sourceEntityId); diff --git a/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java b/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java index b1894af..4439950 100644 --- a/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java +++ b/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java @@ -8,8 +8,8 @@ public record EventHubEventBatchDto( String packageKey, EventHubPackageRequest packageInfo, DataPackageType packageType, - 
OffsetDateTime requestedFrom, - OffsetDateTime requestedTo, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo, List events, Map metadata ) { diff --git a/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java b/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java index 6b24a0c..9e11d51 100644 --- a/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java +++ b/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java @@ -17,7 +17,7 @@ import java.util.UUID; public record EventHubEventDto( UUID eventId, - /** Stable id of this imported/source event record, not the canonical business event id. */ + /** Stable id of this imported/source event record. */ @NotBlank String externalSourceEventId, /** Source-side driver reference. No internal driver id is required during acquisition. */ diff --git a/src/main/java/at/procon/eventhub/dto/EventHubPackageRequest.java b/src/main/java/at/procon/eventhub/dto/EventHubPackageRequest.java index db5e744..ae81406 100644 --- a/src/main/java/at/procon/eventhub/dto/EventHubPackageRequest.java +++ b/src/main/java/at/procon/eventhub/dto/EventHubPackageRequest.java @@ -4,20 +4,24 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import java.time.LocalDate; -import java.time.OffsetDateTime; /** - * Acquisition package context. One package should represent one coherent import - * unit, e.g. tenant + TACHOGRAPH + VEHICLE_UNIT + DRIVER_ACTIVITY + business date. + * Acquisition package context. One package represents one coherent import unit, + * for example tenant + TACHOGRAPH + VEHICLE_UNIT + DRIVER_ACTIVITY + import scope. */ public record EventHubPackageRequest( /** Tenant/client/account owning the acquired data. */ @NotBlank String tenantKey, @Valid @NotNull EventSourceDto eventSource, + + /** Optional source-side group: tachograph organisation, YellowFox fleet, etc. 
*/ + @Valid SourceGroupRefDto sourceGroup, + + /** Organisation/time selection used to acquire the package. */ + @Valid @NotNull ImportScopeDto importScope, + @NotBlank String eventFamily, LocalDate businessDate, - OffsetDateTime requestedFrom, - OffsetDateTime requestedTo, @NotBlank String externalPackageId ) { public EventHubPackageRequest { @@ -25,6 +29,9 @@ public record EventHubPackageRequest( if (eventFamily != null) { eventFamily = eventFamily.trim().toUpperCase().replace('-', '_').replace(' ', '_'); } + if (importScope == null) { + importScope = ImportScopeDto.tenantAll(null, null); + } } private static String normalizeTenant(String value) { diff --git a/src/main/java/at/procon/eventhub/dto/EventSourceDto.java b/src/main/java/at/procon/eventhub/dto/EventSourceDto.java index 201da90..17285d2 100644 --- a/src/main/java/at/procon/eventhub/dto/EventSourceDto.java +++ b/src/main/java/at/procon/eventhub/dto/EventSourceDto.java @@ -5,9 +5,9 @@ import jakarta.validation.constraints.NotBlank; /** * Describes the origin of an acquired event record. * - * This is intentionally acquisition/source context and not part of the canonical - * real-world event identity. The same canonical event can be acquired from - * TACHOGRAPH/VEHICLE_UNIT and later from TACHOGRAPH/DRIVER_CARD. + * This is intentionally acquisition/source context. Equivalent real-world events + * from different sources are kept as separate acquired records. Query/read + * models can later combine sources when a preferred source contains gaps. 
*/ public record EventSourceDto( @NotBlank String providerKey, diff --git a/src/main/java/at/procon/eventhub/dto/ImportScopeDto.java b/src/main/java/at/procon/eventhub/dto/ImportScopeDto.java new file mode 100644 index 0000000..9ab1417 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/ImportScopeDto.java @@ -0,0 +1,46 @@ +package at.procon.eventhub.dto; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import java.time.OffsetDateTime; + +/** + * Selection scope used by acquisition jobs. + * + * Organisation scope and event-time scope are intentionally modeled together, + * because a tachograph import can be unrestricted, organisation-subtree scoped, + * time-window scoped, or both. + */ +public record ImportScopeDto( + @NotNull ImportScopeType type, + @Valid SourceGroupRefDto rootSourceOrganisation, + boolean includeChildren, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo +) { + public ImportScopeDto { + if (type == null) { + type = ImportScopeType.TENANT_ALL; + } + if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) { + throw new IllegalArgumentException("importScope.occurredFrom must be before occurredTo"); + } + if (type == ImportScopeType.SOURCE_ORGANISATION_SUBTREE) { + if (rootSourceOrganisation == null || rootSourceOrganisation.type() != SourceGroupType.ORGANISATION) { + throw new IllegalArgumentException("SOURCE_ORGANISATION_SUBTREE requires rootSourceOrganisation.type=ORGANISATION"); + } + } + } + + public static ImportScopeDto tenantAll(OffsetDateTime occurredFrom, OffsetDateTime occurredTo) { + return new ImportScopeDto(ImportScopeType.TENANT_ALL, null, false, occurredFrom, occurredTo); + } + + public String stableKey() { + return type.name() + ":" + + (rootSourceOrganisation == null ? "ALL" : rootSourceOrganisation.stableKey()) + ":" + + (includeChildren ? "WITH_CHILDREN" : "NO_CHILDREN") + ":" + + (occurredFrom == null ? 
"BEGIN" : occurredFrom.toString()) + ":" + + (occurredTo == null ? "END" : occurredTo.toString()); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/ImportScopeType.java b/src/main/java/at/procon/eventhub/dto/ImportScopeType.java new file mode 100644 index 0000000..aa7622e --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/ImportScopeType.java @@ -0,0 +1,9 @@ +package at.procon.eventhub.dto; + +public enum ImportScopeType { + /** Import all data available through the configured tenant/source. */ + TENANT_ALL, + + /** Import only data related to one source organisation, optionally including child organisations. */ + SOURCE_ORGANISATION_SUBTREE +} diff --git a/src/main/java/at/procon/eventhub/dto/SourceGroupRefDto.java b/src/main/java/at/procon/eventhub/dto/SourceGroupRefDto.java new file mode 100644 index 0000000..eb348ed --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/SourceGroupRefDto.java @@ -0,0 +1,35 @@ +package at.procon.eventhub.dto; + +/** + * Source-side grouping information. + * + * For tachograph imports this is usually an ORGANISATION from the source DB. + * For YellowFox this is usually a provider FLEET. The value is acquisition + * context and can later be mapped to an internal tenant organisation. + */ +public record SourceGroupRefDto( + SourceGroupType type, + String sourceEntityId, + String code, + String name +) { + public SourceGroupRefDto { + sourceEntityId = normalizeNullable(sourceEntityId); + code = normalizeNullable(code); + name = normalizeNullable(name); + } + + public boolean hasValue() { + return type != null || sourceEntityId != null || code != null || name != null; + } + + public String stableKey() { + return (type == null ? "" : type.name()) + "|" + + (sourceEntityId == null ? "" : sourceEntityId) + "|" + + (code == null ? "" : code); + } + + private static String normalizeNullable(String value) { + return value == null || value.isBlank() ? 
null : value.trim(); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/SourceGroupType.java b/src/main/java/at/procon/eventhub/dto/SourceGroupType.java new file mode 100644 index 0000000..a404250 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/SourceGroupType.java @@ -0,0 +1,7 @@ +package at.procon.eventhub.dto; + +public enum SourceGroupType { + ORGANISATION, + FLEET, + ALL +} diff --git a/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java b/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java index 6b715a6..867f1d8 100644 --- a/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java +++ b/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java @@ -9,7 +9,8 @@ import jakarta.validation.Valid; public record VehicleRefDto( String sourceEntityId, String vin, - @Valid VehicleRegistrationRefDto vehicleRegistration + @Valid VehicleRegistrationRefDto vehicleRegistration, + @Valid SourceGroupRefDto sourceOrganisation ) { public VehicleRefDto { sourceEntityId = normalizeNullable(sourceEntityId); diff --git a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java index b905a67..faba112 100644 --- a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java @@ -3,6 +3,8 @@ package at.procon.eventhub.persistence; import at.procon.eventhub.dto.DataPackageStatus; import at.procon.eventhub.dto.DataPackageType; import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupRefDto; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.time.OffsetDateTime; @@ -27,18 +29,25 @@ public class DataPackageRepository { String packageKey, EventHubPackageRequest packageInfo, DataPackageType packageType, - OffsetDateTime requestedFrom, - 
OffsetDateTime requestedTo, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo, Map metadata ) { UUID id = UUID.randomUUID(); + SourceGroupRefDto sourceGroup = packageInfo == null ? null : packageInfo.sourceGroup(); + ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope(); + SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation(); + jdbcTemplate.update( """ insert into eventhub.data_package( id, event_source_id, tenant_key, package_key, package_type, status, + source_group_type, source_group_entity_id, source_group_code, source_group_name, + import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name, + include_children, occurred_from, occurred_to, event_family, business_date, external_package_id, - requested_from, requested_to, received_at, event_count, metadata - ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), 0, ?::jsonb) + received_at, event_count, metadata + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), 0, ?::jsonb) """, ps -> { ps.setObject(1, id); @@ -47,12 +56,21 @@ public class DataPackageRepository { ps.setString(4, packageKey); ps.setString(5, packageType.name()); ps.setString(6, DataPackageStatus.IMPORTING.name()); - ps.setString(7, packageInfo == null ? null : packageInfo.eventFamily()); - ps.setObject(8, packageInfo == null ? null : packageInfo.businessDate()); - ps.setString(9, packageInfo == null ? packageKey : packageInfo.externalPackageId()); - ps.setObject(10, requestedFrom); - ps.setObject(11, requestedTo); - ps.setString(12, toJson(metadata)); + ps.setString(7, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name()); + ps.setString(8, sourceGroup == null ? null : sourceGroup.sourceEntityId()); + ps.setString(9, sourceGroup == null ? null : sourceGroup.code()); + ps.setString(10, sourceGroup == null ? null : sourceGroup.name()); + ps.setString(11, importScope == null ? 
null : importScope.type().name()); + ps.setString(12, rootOrg == null ? null : rootOrg.sourceEntityId()); + ps.setString(13, rootOrg == null ? null : rootOrg.code()); + ps.setString(14, rootOrg == null ? null : rootOrg.name()); + ps.setBoolean(15, importScope != null && importScope.includeChildren()); + ps.setObject(16, occurredFrom); + ps.setObject(17, occurredTo); + ps.setString(18, packageInfo == null ? null : packageInfo.eventFamily()); + ps.setObject(19, packageInfo == null ? null : packageInfo.businessDate()); + ps.setString(20, packageInfo == null ? packageKey : packageInfo.externalPackageId()); + ps.setString(21, toJson(metadata)); } ); return id; diff --git a/src/main/java/at/procon/eventhub/persistence/EventRepository.java b/src/main/java/at/procon/eventhub/persistence/EventRepository.java index 04a4c8b..f2fe1cc 100644 --- a/src/main/java/at/procon/eventhub/persistence/EventRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/EventRepository.java @@ -3,9 +3,10 @@ package at.procon.eventhub.persistence; import at.procon.eventhub.dto.DriverCardRefDto; import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.SourceGroupRefDto; import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto; -import at.procon.eventhub.service.EventNaturalKeyService; +import at.procon.eventhub.service.EventAcquisitionRecordKeyService; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -23,19 +24,21 @@ public class EventRepository { private final JdbcTemplate jdbcTemplate; private final ObjectMapper objectMapper; - private final EventNaturalKeyService naturalKeyService; + private final EventAcquisitionRecordKeyService recordKeyService; - public EventRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, EventNaturalKeyService naturalKeyService) 
{ + public EventRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, EventAcquisitionRecordKeyService recordKeyService) { this.jdbcTemplate = jdbcTemplate; this.objectMapper = objectMapper; - this.naturalKeyService = naturalKeyService; + this.recordKeyService = recordKeyService; } /** - * Temporary acquisition-stage persistence. The canonical storage model will - * be finalized later; for now this table keeps acquired point events with - * EventSource context, source-side driver/vehicle refs, generic eventDetails, - * and raw payload JSON. + * Acquisition-stage persistence. This table stores source records as imported. + * It does not merge or deduplicate equivalent events from different sources; + * later query/read models can combine sources when a preferred source has gaps. + * For now this table keeps acquired point events with EventSource context, + * source-side driver/vehicle refs, source organisation information, generic + * eventDetails, and raw payload JSON. */ public int batchInsert(UUID packageId, int eventSourceId, List events) { int[] counts = jdbcTemplate.batchUpdate( @@ -44,20 +47,24 @@ public class EventRepository { id, event_source_id, data_package_id, external_source_event_id, driver_source_entity_id, driver_card_nation, driver_card_number, + driver_source_org_entity_id, driver_source_org_code, driver_source_org_name, vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number, + vehicle_source_org_entity_id, vehicle_source_org_code, vehicle_source_org_name, occurred_at, received_partner_at, received_hub_at, event_domain, event_type, lifecycle, odometer_m, latitude, longitude, event_details, payload, manual_entry, - canonical_key_hash, source_record_key_hash + source_record_key_hash, event_signature_hash ) values ( ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?::jsonb, ?::jsonb, ?, ?, ? 
) @@ -71,8 +78,10 @@ public class EventRepository { OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt(); DriverRefDto driverRef = event.driverRef(); DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard(); + SourceGroupRefDto driverOrg = driverRef == null ? null : driverRef.sourceOrganisation(); VehicleRefDto vehicleRef = event.vehicleRef(); VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration(); + SourceGroupRefDto vehicleOrg = vehicleRef == null ? null : vehicleRef.sourceOrganisation(); ps.setObject(1, eventId); ps.setInt(2, eventSourceId); @@ -82,31 +91,37 @@ public class EventRepository { ps.setString(5, driverRef == null ? null : driverRef.sourceEntityId()); ps.setString(6, driverCard == null ? null : driverCard.nation()); ps.setString(7, driverCard == null ? null : driverCard.number()); + ps.setString(8, driverOrg == null ? null : driverOrg.sourceEntityId()); + ps.setString(9, driverOrg == null ? null : driverOrg.code()); + ps.setString(10, driverOrg == null ? null : driverOrg.name()); - ps.setString(8, vehicleRef == null ? null : vehicleRef.sourceEntityId()); - ps.setString(9, vehicleRef == null ? null : vehicleRef.vin()); - ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation()); - ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number()); + ps.setString(11, vehicleRef == null ? null : vehicleRef.sourceEntityId()); + ps.setString(12, vehicleRef == null ? null : vehicleRef.vin()); + ps.setString(13, vehicleRegistration == null ? null : vehicleRegistration.nation()); + ps.setString(14, vehicleRegistration == null ? null : vehicleRegistration.number()); + ps.setString(15, vehicleOrg == null ? null : vehicleOrg.sourceEntityId()); + ps.setString(16, vehicleOrg == null ? null : vehicleOrg.code()); + ps.setString(17, vehicleOrg == null ? 
null : vehicleOrg.name()); - ps.setObject(12, event.occurredAt()); - ps.setObject(13, event.receivedPartnerAt()); - ps.setObject(14, receivedHubAt); - ps.setString(15, event.eventDomain().name()); - ps.setString(16, event.eventType().name()); - ps.setString(17, event.lifecycle().name()); - setNullableLong(ps, 18, event.odometerM()); + ps.setObject(18, event.occurredAt()); + ps.setObject(19, event.receivedPartnerAt()); + ps.setObject(20, receivedHubAt); + ps.setString(21, event.eventDomain().name()); + ps.setString(22, event.eventType().name()); + ps.setString(23, event.lifecycle().name()); + setNullableLong(ps, 24, event.odometerM()); if (event.position() == null) { - ps.setNull(19, Types.NUMERIC); - ps.setNull(20, Types.NUMERIC); + ps.setNull(25, Types.NUMERIC); + ps.setNull(26, Types.NUMERIC); } else { - ps.setObject(19, event.position().latitude()); - ps.setObject(20, event.position().longitude()); + ps.setObject(25, event.position().latitude()); + ps.setObject(26, event.position().longitude()); } - ps.setString(21, toJson(objectMapper.valueToTree(event.eventDetails()))); - ps.setString(22, toJson(event.payload())); - ps.setBoolean(23, event.manualEntry()); - ps.setString(24, naturalKeyService.buildCanonicalKeyHash(event)); - ps.setString(25, naturalKeyService.buildSourceRecordKeyHash(event, eventSourceId)); + ps.setString(27, toJson(objectMapper.valueToTree(event.eventDetails()))); + ps.setString(28, toJson(event.payload())); + ps.setBoolean(29, event.manualEntry()); + ps.setString(30, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId)); + ps.setString(31, recordKeyService.buildEventSignatureHash(event)); } @Override diff --git a/src/main/java/at/procon/eventhub/service/EventAcquisitionRecordKeyService.java b/src/main/java/at/procon/eventhub/service/EventAcquisitionRecordKeyService.java new file mode 100644 index 0000000..f2d6863 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/EventAcquisitionRecordKeyService.java @@ -0,0 +1,143 @@ 
+package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.springframework.stereotype.Component; + +@Component +public class EventAcquisitionRecordKeyService { + + /** + * Source-record key is used only for acquisition idempotency of the same source record. + * It does not merge or deduplicate equivalent events from different providers/sources. + */ + public String buildSourceRecordKeyHash(EventHubEventDto event, int eventSourceId) { + String sourceRecordKey = String.join("|", + nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()), + String.valueOf(eventSourceId), + nullToEmpty(event.externalSourceEventId()) + ); + return sha256Hex(sourceRecordKey); + } + + /** + * Event signature is a non-unique acquisition-time semantic key. + * + * It intentionally excludes EventSource and externalSourceEventId. Multiple source records + * may have the same signature and still must be stored separately. The signature is only a + * later query/projection hint for comparing sources, filling gaps, or building merged views. + */ + public String buildEventSignatureHash(EventHubEventDto event) { + String signature = String.join("|", + nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()), + driverSignatureKey(event), + vehicleSignatureKey(event), + normalizeTime(event.occurredAt()), + event.eventDomain() == null ? "" : event.eventDomain().name(), + event.eventType() == null ? "" : event.eventType().name(), + event.lifecycle() == null ? 
"" : event.lifecycle().name(), + event.eventDetails() == null ? "" : nullToEmpty(event.eventDetails().type()), + canonicalJson(event.eventDetails() == null ? null : event.eventDetails().attributes()), + event.position() == null ? "" : nullToEmpty(event.position().latitude()) + ":" + nullToEmpty(event.position().longitude()) + ); + return sha256Hex(signature); + } + + private String driverSignatureKey(EventHubEventDto event) { + if (event.driverRef() == null) { + return ""; + } + if (event.driverRef().driverCard() != null && event.driverRef().driverCard().hasValue()) { + return "CARD:" + event.driverRef().driverCard().stableKey(); + } + return "SOURCE_DRIVER:" + nullToEmpty(event.driverRef().sourceEntityId()); + } + + private String vehicleSignatureKey(EventHubEventDto event) { + if (event.vehicleRef() == null) { + return ""; + } + if (event.vehicleRef().vehicleRegistration() != null && event.vehicleRef().vehicleRegistration().hasValue()) { + return "VRN:" + event.vehicleRef().vehicleRegistration().stableKey(); + } + if (event.vehicleRef().vin() != null && !event.vehicleRef().vin().isBlank()) { + return "VIN:" + event.vehicleRef().vin(); + } + return "SOURCE_VEHICLE:" + nullToEmpty(event.vehicleRef().sourceEntityId()); + } + + private String normalizeTime(OffsetDateTime value) { + return value == null ? "" : value.toInstant().toString(); + } + + private String nullToEmpty(Object value) { + return value == null ? 
"" : String.valueOf(value); + } + + private String canonicalJson(JsonNode node) { + if (node == null || node.isNull()) { + return ""; + } + if (node.isObject()) { + ObjectNode objectNode = (ObjectNode) node; + List> fields = new ArrayList<>(); + Iterator> iterator = objectNode.fields(); + iterator.forEachRemaining(fields::add); + fields.sort(Comparator.comparing(Map.Entry::getKey)); + StringBuilder result = new StringBuilder("{"); + boolean first = true; + for (Map.Entry field : fields) { + if (!first) { + result.append(','); + } + first = false; + result.append(escape(field.getKey())).append(':').append(canonicalJson(field.getValue())); + } + return result.append('}').toString(); + } + if (node.isArray()) { + ArrayNode arrayNode = (ArrayNode) node; + StringBuilder result = new StringBuilder("["); + for (int i = 0; i < arrayNode.size(); i++) { + if (i > 0) { + result.append(','); + } + result.append(canonicalJson(arrayNode.get(i))); + } + return result.append(']').toString(); + } + if (node.isTextual()) { + return escape(node.asText()); + } + return node.toString(); + } + + private String escape(String value) { + return '"' + value.replace("\\", "\\\\").replace("\"", "\\\"") + '"'; + } + + private String sha256Hex(String value) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8)); + StringBuilder result = new StringBuilder(hash.length * 2); + for (byte b : hash) { + result.append(String.format("%02x", b)); + } + return result.toString(); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 is not available", e); + } + } +} diff --git a/src/main/java/at/procon/eventhub/service/EventHubEventValidator.java b/src/main/java/at/procon/eventhub/service/EventHubEventValidator.java index 880522f..c50b501 100644 --- a/src/main/java/at/procon/eventhub/service/EventHubEventValidator.java +++ 
b/src/main/java/at/procon/eventhub/service/EventHubEventValidator.java @@ -2,6 +2,8 @@ package at.procon.eventhub.service; import at.procon.eventhub.dto.EventDomain; import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.ImportScopeType; +import at.procon.eventhub.dto.SourceGroupType; import org.springframework.stereotype.Component; @Component @@ -28,6 +30,7 @@ public class EventHubEventValidator { if (event.packageInfo().eventSource() == null) { throw new IllegalArgumentException("packageInfo.eventSource must be set"); } + validateImportScope(event); if (event.eventDomain() == null) { throw new IllegalArgumentException("eventDomain must be set"); } @@ -43,6 +46,23 @@ public class EventHubEventValidator { validateEventDetails(event); } + private void validateImportScope(EventHubEventDto event) { + var importScope = event.packageInfo().importScope(); + if (importScope == null) { + throw new IllegalArgumentException("packageInfo.importScope must be set"); + } + if (importScope.occurredFrom() != null && importScope.occurredTo() != null + && !importScope.occurredFrom().isBefore(importScope.occurredTo())) { + throw new IllegalArgumentException("importScope.occurredFrom must be before occurredTo"); + } + if (importScope.type() == ImportScopeType.SOURCE_ORGANISATION_SUBTREE) { + if (importScope.rootSourceOrganisation() == null + || importScope.rootSourceOrganisation().type() != SourceGroupType.ORGANISATION) { + throw new IllegalArgumentException("SOURCE_ORGANISATION_SUBTREE requires rootSourceOrganisation.type=ORGANISATION"); + } + } + } + private void validateEventDetails(EventHubEventDto event) { if (event.eventDetails() == null) { return; diff --git a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java index 5c64a1d..e8f7177 100644 --- a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java +++ 
b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java @@ -64,8 +64,8 @@ public class EventHubIngestionService { batch.packageKey(), packageInfo, batch.packageType(), - batch.requestedFrom(), - batch.requestedTo(), + batch.occurredFrom(), + batch.occurredTo(), batch.metadata() ); diff --git a/src/main/java/at/procon/eventhub/service/EventHubPackageKeyBuilder.java b/src/main/java/at/procon/eventhub/service/EventHubPackageKeyBuilder.java index 5beb676..bdae62a 100644 --- a/src/main/java/at/procon/eventhub/service/EventHubPackageKeyBuilder.java +++ b/src/main/java/at/procon/eventhub/service/EventHubPackageKeyBuilder.java @@ -12,6 +12,8 @@ public class EventHubPackageKeyBuilder { if (packageInfo != null) { return packageInfo.tenantKey() + ":" + packageInfo.eventSource().stableKey() + + ":" + (packageInfo.sourceGroup() == null ? "NO_GROUP" : packageInfo.sourceGroup().stableKey()) + + ":" + (packageInfo.importScope() == null ? "NO_SCOPE" : packageInfo.importScope().stableKey()) + ":" + packageInfo.eventFamily() + ":" + (packageInfo.businessDate() == null ? 
"NO_DATE" : packageInfo.businessDate()) + ":" + packageInfo.externalPackageId(); diff --git a/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java b/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java index 9db4835..3715659 100644 --- a/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java +++ b/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java @@ -6,6 +6,7 @@ import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventLifecycle; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.source.TachographActivityDto; import java.time.LocalDate; import java.time.OffsetDateTime; @@ -37,13 +38,15 @@ public class TachographActivityEventMapper { null ); LocalDate businessDate = source.occurredAt().toLocalDate(); + var occurredFrom = businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(); + var occurredTo = businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(); EventHubPackageRequest packageInfo = new EventHubPackageRequest( tenantOrDefault(source.tenantKey()), eventSource, + null, + ImportScopeDto.tenantAll(occurredFrom, occurredTo), EventDomain.DRIVER_ACTIVITY.name(), businessDate, - businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), - businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), eventSource.stableKey() + ":DRIVER_ACTIVITY:" + businessDate ); diff --git a/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java b/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java index b1c0cd6..2797d5d 100644 --- a/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java +++ b/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java @@ -7,6 +7,9 @@ 
import at.procon.eventhub.dto.EventLifecycle; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.EventType; import at.procon.eventhub.dto.GeoPointDto; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupRefDto; +import at.procon.eventhub.dto.SourceGroupType; import at.procon.eventhub.dto.source.TelematicsPositionDto; import java.time.LocalDate; import java.time.OffsetDateTime; @@ -34,13 +37,18 @@ public class TelematicsPositionEventMapper { source.externalFleetKey() ); LocalDate businessDate = source.occurredAt().toLocalDate(); + var occurredFrom = businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(); + var occurredTo = businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(); + SourceGroupRefDto sourceGroup = source.externalFleetKey() == null || source.externalFleetKey().isBlank() + ? null + : new SourceGroupRefDto(SourceGroupType.FLEET, source.externalFleetKey(), source.externalFleetKey(), null); EventHubPackageRequest packageInfo = new EventHubPackageRequest( tenantOrDefault(source.tenantKey()), eventSource, + sourceGroup, + ImportScopeDto.tenantAll(occurredFrom, occurredTo), EventDomain.POSITION.name(), businessDate, - businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), - businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), eventSource.stableKey() + ":POSITION:" + businessDate ); diff --git a/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java b/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java index d7acc8c..fdb924b 100644 --- a/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java +++ b/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java @@ -10,6 +10,9 @@ import at.procon.eventhub.dto.EventLifecycle; import at.procon.eventhub.dto.EventSourceDto; import 
at.procon.eventhub.dto.EventType; import at.procon.eventhub.dto.GeoPointDto; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupRefDto; +import at.procon.eventhub.dto.SourceGroupType; import at.procon.eventhub.dto.source.YellowFoxD8BookingDto; import java.time.LocalDate; import java.time.OffsetDateTime; @@ -48,13 +51,18 @@ public class YellowFoxD8BookingEventMapper { source.externalFleetKey() ); LocalDate businessDate = source.occurredAt().toLocalDate(); + var occurredFrom = businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(); + var occurredTo = businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(); + SourceGroupRefDto sourceGroup = source.externalFleetKey() == null || source.externalFleetKey().isBlank() + ? null + : new SourceGroupRefDto(SourceGroupType.FLEET, source.externalFleetKey(), source.externalFleetKey(), null); EventHubPackageRequest packageInfo = new EventHubPackageRequest( tenantOrDefault(source.tenantKey()), eventSource, + sourceGroup, + ImportScopeDto.tenantAll(occurredFrom, occurredTo), normalized.domain().name(), businessDate, - businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), - businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(), eventSource.stableKey() + ":" + normalized.domain().name() + ":" + businessDate ); diff --git a/src/main/resources/db/migration/V1__create_eventhub_schema.sql b/src/main/resources/db/migration/V1__create_eventhub_schema.sql index 40aaf7b..0a5c10f 100644 --- a/src/main/resources/db/migration/V1__create_eventhub_schema.sql +++ b/src/main/resources/db/migration/V1__create_eventhub_schema.sql @@ -3,7 +3,7 @@ create extension if not exists pgcrypto; create schema if not exists eventhub; -- Acquisition source definition. This represents where the imported source --- record came from, not necessarily the canonical identity of the real-world event. 
+-- record came from. Source records are intentionally kept separately by provider/source. create table if not exists eventhub.event_source ( id integer generated always as identity primary key, provider_key text not null, @@ -16,8 +16,8 @@ create table if not exists eventhub.event_source ( constraint ux_event_source unique (provider_key, source_kind, source_key, source_instance_key) ); --- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/2026-04-28. --- Final canonical storage can be discussed later; this table is still useful for acquisition audit. +-- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/import scope. +-- This table captures the source grouping and the organisation/time import scope used for acquisition. create table if not exists eventhub.data_package ( id uuid primary key, event_source_id integer not null references eventhub.event_source(id), @@ -25,24 +25,36 @@ create table if not exists eventhub.data_package ( package_key text not null, package_type text not null, status text not null, + + source_group_type text, + source_group_entity_id text, + source_group_code text, + source_group_name text, + + import_scope_type text, + root_source_org_entity_id text, + root_source_org_code text, + root_source_org_name text, + include_children boolean not null default false, + occurred_from timestamptz, + occurred_to timestamptz, + event_family text, business_date date, external_package_id text, - requested_from timestamptz, - requested_to timestamptz, received_at timestamptz not null default now(), completed_at timestamptz, event_count integer not null default 0, metadata jsonb not null default '{}'::jsonb, error_message text, - constraint ux_data_package_external unique (tenant_key, event_source_id, external_package_id, received_at) + constraint ux_data_package_external unique (tenant_key, event_source_id, external_package_id, received_at), + constraint 
chk_data_package_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to) ); -- Temporary acquisition-stage point-event store. -- It keeps the discussed DTO shape: EventSource context, externalSourceEventId, --- one occurredAt timestamp, source-side driver/vehicle refs, normalized event details, --- and raw JSON payload. It intentionally has no internal driver_id/vehicle_id in the --- incoming model; master-data resolution can be added later. +-- one occurredAt timestamp, source-side driver/vehicle refs, source-side organisation assignments, +-- normalized event details, and raw JSON payload. create table if not exists eventhub.acquired_event ( id uuid not null, event_source_id integer not null references eventhub.event_source(id), @@ -53,11 +65,17 @@ create table if not exists eventhub.acquired_event ( driver_source_entity_id text, driver_card_nation text, driver_card_number text, + driver_source_org_entity_id text, + driver_source_org_code text, + driver_source_org_name text, vehicle_source_entity_id text, vehicle_vin text, vehicle_registration_nation text, vehicle_registration_number text, + vehicle_source_org_entity_id text, + vehicle_source_org_code text, + vehicle_source_org_name text, occurred_at timestamptz not null, received_partner_at timestamptz, @@ -75,12 +93,15 @@ create table if not exists eventhub.acquired_event ( payload jsonb not null default '{}'::jsonb, manual_entry boolean not null default false, - -- Excludes EventSource: useful later for canonical event deduplication. - canonical_key_hash text not null, - - -- Includes tenant + EventSource + externalSourceEventId: prevents duplicate imports of the same source record. + -- Includes tenant + EventSource + externalSourceEventId. Used only for source-record import idempotency. + -- It does not merge equivalent events from different providers/sources. source_record_key_hash text not null, + -- Non-unique semantic acquisition signature. 
It intentionally excludes EventSource and + -- externalSourceEventId and is only a later query/projection hint for source comparison, + -- gap filling, and merged timelines. It must not be used for import deduplication. + event_signature_hash text, + created_at timestamptz not null default now(), constraint pk_acquired_event primary key (occurred_at, id), @@ -99,8 +120,9 @@ create table if not exists eventhub.acquired_event ( create unique index if not exists ux_acquired_event_source_record on eventhub.acquired_event(source_record_key_hash); -create index if not exists idx_acquired_event_canonical_key - on eventhub.acquired_event(canonical_key_hash); +create index if not exists idx_acquired_event_signature + on eventhub.acquired_event(event_signature_hash) + where event_signature_hash is not null; create index if not exists idx_acquired_event_vehicle_vin_time on eventhub.acquired_event(vehicle_vin, occurred_at desc) @@ -114,6 +136,14 @@ create index if not exists idx_acquired_event_driver_card_time on eventhub.acquired_event(driver_card_nation, driver_card_number, occurred_at desc) where driver_card_number is not null; +create index if not exists idx_acquired_event_driver_org_time + on eventhub.acquired_event(driver_source_org_entity_id, occurred_at desc) + where driver_source_org_entity_id is not null; + +create index if not exists idx_acquired_event_vehicle_org_time + on eventhub.acquired_event(vehicle_source_org_entity_id, occurred_at desc) + where vehicle_source_org_entity_id is not null; + create index if not exists idx_acquired_event_domain_type_time on eventhub.acquired_event(event_domain, event_type, occurred_at desc); @@ -125,3 +155,6 @@ create index if not exists idx_acquired_event_payload_gin create index if not exists idx_data_package_source_time on eventhub.data_package(tenant_key, event_source_id, received_at desc); + +create index if not exists idx_data_package_scope + on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, 
occurred_from, occurred_to); diff --git a/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java b/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java new file mode 100644 index 0000000..bcb8ae3 --- /dev/null +++ b/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java @@ -0,0 +1,73 @@ +package at.procon.eventhub; + +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDetailsDto; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.service.EventAcquisitionRecordKeyService; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.OffsetDateTime; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class EventAcquisitionRecordKeyServiceTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + private final EventAcquisitionRecordKeyService service = new EventAcquisitionRecordKeyService(); + + @Test + void eventSignatureExcludesSourceAndExternalSourceEventId() { + EventHubEventDto vuEvent = event( + "TACHOGRAPH:VU:activity:456:start", + new EventSourceDto("TACHOGRAPH", "VEHICLE_UNIT", "TACHOGRAPH_VU", "tachograph-db", null, null) + ); + EventHubEventDto cardEvent = event( + "TACHOGRAPH:CARD:activity:789:start", + new EventSourceDto("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_DRIVER_CARD", "tachograph-db", null, null) + ); + + assertThat(service.buildSourceRecordKeyHash(vuEvent, 1)) + .isNotEqualTo(service.buildSourceRecordKeyHash(cardEvent, 2)); + 
assertThat(service.buildEventSignatureHash(vuEvent)) + .isEqualTo(service.buildEventSignatureHash(cardEvent)); + } + + private EventHubEventDto event(String externalSourceEventId, EventSourceDto eventSource) { + EventHubPackageRequest packageInfo = new EventHubPackageRequest( + "tenant-1", + eventSource, + null, + ImportScopeDto.tenantAll(null, null), + "DRIVER_ACTIVITY", + null, + eventSource.sourceKey() + ":package" + ); + return new EventHubEventDto( + null, + externalSourceEventId, + new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789"), null), + new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345"), null), + OffsetDateTime.parse("2026-04-28T08:00:00+02:00"), + null, + null, + EventDomain.DRIVER_ACTIVITY, + EventType.DRIVE, + EventLifecycle.START, + null, + null, + new EventDetailsDto("DRIVER_ACTIVITY", objectMapper.createObjectNode().put("cardSlot", "DRIVER")), + null, + false, + packageInfo + ); + } +} diff --git a/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java b/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java index cabfa2b..b31b1d9 100644 --- a/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java +++ b/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java @@ -35,8 +35,8 @@ class YellowFoxD8BookingEventMapperTest { 3, OffsetDateTime.parse("2026-04-29T08:15:00+02:00"), null, - new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")), - new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345")), + new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789"), null), + new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345"), null), null, null, null,