Add acquisition scope and source reference persistence

This commit is contained in:
trifonovt 2026-04-30 11:06:56 +02:00
parent 0a0e2dc615
commit 230ae1987d
24 changed files with 701 additions and 134 deletions

236
README.md
View File

@ -2,7 +2,7 @@
Spring Boot + Apache Camel project skeleton for acquiring normalized EventHub point events from multiple providers/sources. Spring Boot + Apache Camel project skeleton for acquiring normalized EventHub point events from multiple providers/sources.
The current version intentionally focuses on **acquisition**. Final canonical storage/deduplication can be discussed later. The included PostgreSQL schema is a small acquisition-stage store so the project can be run and tested end-to-end. The current version intentionally focuses on **acquisition**. It stores source records as imported and does not merge or deduplicate equivalent events from different providers/sources. It does, however, keep a non-unique eventSignatureHash as a later merge/gap-filling hint. Later query/read models can merge sources when a preferred/main source contains gaps. The included PostgreSQL schema is a small acquisition-stage store so the project can be run and tested end-to-end.
## Architecture ## Architecture
@ -12,7 +12,7 @@ source-specific Camel input route
-> EventHubEventDto -> EventHubEventDto
-> common EventHub acquisition route -> common EventHub acquisition route
-> validation -> validation
-> package-key creation from tenant + EventSource + event family + date/window -> package-key creation from tenant + EventSource + source group + import scope + event family
-> aggregation / batching -> aggregation / batching
-> chronological sorting inside the batch -> chronological sorting inside the batch
-> acquisition package handoff -> acquisition package handoff
@ -40,17 +40,7 @@ There is no generic `duration`, `endTime`, `validFrom`, or `validTo`. If a sourc
`tenantKey` identifies the owner/client/account for the package. It is required for acquisition grouping and future master-data resolution. `tenantKey` identifies the owner/client/account for the package. It is required for acquisition grouping and future master-data resolution.
```json ### 3. EventSource identifies the technical source
{
"tenantKey": "kralowetz"
}
```
Organisation is not mandatory in the incoming event. It can later be derived from resolved driver/vehicle + `occurredAt`.
### 3. EventSource replaces sourceTable/sourceSystem
The acquisition context is represented by `EventSourceDto`:
```json ```json
{ {
@ -72,13 +62,97 @@ YELLOWFOX / TELEMATICS_PLATFORM / YELLOWFOX_D8
FLEETBOARD / TELEMATICS_PLATFORM / FLEETBOARD_POSITION FLEETBOARD / TELEMATICS_PLATFORM / FLEETBOARD_POSITION
``` ```
`EventSource` is acquisition context. It should not be part of the canonical real-world event identity. A VU event and a driver-card event may describe the same real event. `EventSource` is acquisition context. A VU event, a driver-card event and a YellowFox D8 event may describe the same real-world event, but this acquisition service keeps them as separate acquired source records. Cross-source merging/gap filling is intentionally left for a later query/read model.
### 4. Source-side master references, no incoming internal IDs ### 4. No cross-source deduplication during acquisition
The acquisition layer stores every source record independently. It uses `sourceRecordKeyHash` only for idempotency of the same source event, so the same input package can be retried safely. It does **not** deduplicate VU vs driver-card vs YellowFox records.
This is intentional because later queries may need to combine sources: for example, use tachograph data as the main source, but fill gaps from YellowFox or another provider.
The acquisition table also stores a non-unique `eventSignatureHash`. This is a semantic merge hint, not a unique key. It intentionally excludes `EventSource` and `externalSourceEventId`, so VU, driver-card and YellowFox records that look like the same real-world event can share a signature while still being stored separately. Later query/projection logic can use this signature for source comparison, gap filling, and merged timelines. The signature prefers nation-scoped driver card and vehicle registration when available, then VIN or source entity id as fallback, so it remains useful before final master-data resolution.
Therefore the current model preserves:
```text
tenantKey
eventSource
sourceGroup
importScope
externalSourceEventId
source-side driver/vehicle references
eventDetails
payload
```
### 5. SourceGroup captures tachograph organisation or YellowFox fleet
`sourceGroup` is package-level source grouping information.
For tachograph it can be a source organisation:
```json
"sourceGroup": {
"type": "ORGANISATION",
"sourceEntityId": "147",
"code": "147",
"name": "Kralowetz"
}
```
For YellowFox it can be a fleet:
```json
"sourceGroup": {
"type": "FLEET",
"sourceEntityId": "7",
"code": "7",
"name": "YellowFox Fleet 7"
}
```
The YellowFox fleet belongs to the same tenant/customer, but it is not forced to be an organisation. It can later be mapped to a tenant organisation if needed.
### 6. ImportScope captures organisation and time filtering
`importScope` describes what was selected from the source system.
Full DB import:
```json
"importScope": {
"type": "TENANT_ALL",
"rootSourceOrganisation": null,
"includeChildren": false,
"occurredFrom": null,
"occurredTo": null
}
```
Organisation subtree + time-window import:
```json
"importScope": {
"type": "SOURCE_ORGANISATION_SUBTREE",
"rootSourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "147",
"code": "147",
"name": "Kralowetz"
},
"includeChildren": true,
"occurredFrom": "2026-04-28T00:00:00+02:00",
"occurredTo": "2026-04-29T00:00:00+02:00"
}
```
`occurredFrom` is inclusive and `occurredTo` is exclusive. Both may be `null` for complete source DB import.
### 7. Source-side master references, no incoming internal IDs
The incoming DTO does not require internal `driverId` or `vehicleId`, because in normal ingestion those ids are not known yet. The incoming DTO does not require internal `driverId` or `vehicleId`, because in normal ingestion those ids are not known yet.
Driver reference: Driver reference with nation-scoped driver card:
```json ```json
"driverRef": { "driverRef": {
@ -86,11 +160,17 @@ Driver reference:
"driverCard": { "driverCard": {
"nation": "AT", "nation": "AT",
"number": "D123456789" "number": "D123456789"
},
"sourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "57",
"code": "57",
"name": "Sub Org 57"
} }
} }
``` ```
Vehicle reference: Vehicle reference with optional VIN and nation-scoped VRN:
```json ```json
"vehicleRef": { "vehicleRef": {
@ -99,6 +179,12 @@ Vehicle reference:
"vehicleRegistration": { "vehicleRegistration": {
"nation": "AT", "nation": "AT",
"number": "W-12345" "number": "W-12345"
},
"sourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "57",
"code": "57",
"name": "Sub Org 57"
} }
} }
``` ```
@ -112,13 +198,14 @@ VIN is optional. Driver-card-only events can carry only the nation-scoped VRN/re
"vehicleRegistration": { "vehicleRegistration": {
"nation": "AT", "nation": "AT",
"number": "W-12345" "number": "W-12345"
} },
"sourceOrganisation": null
} }
``` ```
This allows late resolution when VU/master data later connects the VRN to a VIN. This allows late resolution when VU/master data later connects the VRN to a VIN.
### 5. Generic normalized eventDetails ### 8. Generic normalized eventDetails
Reusable event-specific properties are stored in: Reusable event-specific properties are stored in:
@ -133,19 +220,7 @@ Reusable event-specific properties are stored in:
} }
``` ```
Raw provider values stay in `payload`: Raw provider values stay in `payload`.
```json
"payload": {
"raw": {
"cardSlot": 0,
"cardStatus": 0,
"drivingStatus": 0
}
}
```
This keeps the acquisition DTO generic while preserving meaningful normalized fields.
## Package-level acquisition request ## Package-level acquisition request
@ -162,11 +237,27 @@ For external/manual ingestion, the preferred request shape is:
"sourceInstanceKey": "main-tachograph-db", "sourceInstanceKey": "main-tachograph-db",
"tenantProviderSettingKey": "kralowetz-tachograph-prod" "tenantProviderSettingKey": "kralowetz-tachograph-prod"
}, },
"sourceGroup": {
"type": "ORGANISATION",
"sourceEntityId": "147",
"code": "147",
"name": "Kralowetz"
},
"importScope": {
"type": "SOURCE_ORGANISATION_SUBTREE",
"rootSourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "147",
"code": "147",
"name": "Kralowetz"
},
"includeChildren": true,
"occurredFrom": "2026-04-28T00:00:00+02:00",
"occurredTo": "2026-04-29T00:00:00+02:00"
},
"eventFamily": "DRIVER_ACTIVITY", "eventFamily": "DRIVER_ACTIVITY",
"businessDate": "2026-04-28", "businessDate": "2026-04-28",
"requestedFrom": "2026-04-28T00:00:00+02:00", "externalPackageId": "TACHOGRAPH:ORG-147-SUBTREE:DRIVER_ACTIVITY:2026-04-28"
"requestedTo": "2026-04-29T00:00:00+02:00",
"externalPackageId": "TACHOGRAPH:VEHICLE_UNIT:DRIVER_ACTIVITY:2026-04-28"
}, },
"events": [ "events": [
{ {
@ -176,6 +267,10 @@ For external/manual ingestion, the preferred request shape is:
"driverCard": { "driverCard": {
"nation": "AT", "nation": "AT",
"number": "D123456789" "number": "D123456789"
},
"sourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "57"
} }
}, },
"vehicleRef": { "vehicleRef": {
@ -184,6 +279,10 @@ For external/manual ingestion, the preferred request shape is:
"vehicleRegistration": { "vehicleRegistration": {
"nation": "AT", "nation": "AT",
"number": "W-12345" "number": "W-12345"
},
"sourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "57"
} }
}, },
"occurredAt": "2026-04-28T08:00:00+02:00", "occurredAt": "2026-04-28T08:00:00+02:00",
@ -228,7 +327,7 @@ direct:eventhub-manual-input
```text ```text
direct:eventhub-normalized-input direct:eventhub-normalized-input
-> validate EventHubEventDto -> validate EventHubEventDto
-> create package key from tenant + EventSource/package context -> create package key from tenant + EventSource + sourceGroup + importScope + eventFamily
-> seda:eventhub-batch-input -> seda:eventhub-batch-input
-> aggregate by eventhub.packageKey -> aggregate by eventhub.packageKey
-> sort by occurredAt inside the batch -> sort by occurredAt inside the batch
@ -262,6 +361,10 @@ curl -X POST http://localhost:8080/api/eventhub/acquisition/tachograph/activitie
"driverCard": { "driverCard": {
"nation": "AT", "nation": "AT",
"number": "D123456789" "number": "D123456789"
},
"sourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "57"
} }
}, },
"vehicleRef": { "vehicleRef": {
@ -270,7 +373,8 @@ curl -X POST http://localhost:8080/api/eventhub/acquisition/tachograph/activitie
"vehicleRegistration": { "vehicleRegistration": {
"nation": "AT", "nation": "AT",
"number": "W-12345" "number": "W-12345"
} },
"sourceOrganisation": null
}, },
"occurredAt": "2026-04-28T08:00:00+02:00", "occurredAt": "2026-04-28T08:00:00+02:00",
"activityType": "DRIVE", "activityType": "DRIVE",
@ -290,16 +394,35 @@ curl -X POST http://localhost:8080/api/eventhub/acquisition/tachograph/activitie
]' ]'
``` ```
The mapper creates: The mapper creates a default `TENANT_ALL` one-day import scope for this convenience endpoint. For real tachograph import jobs with organisation subtree/full DB scope, use the package-level request or add dedicated SQL extraction job routes.
```text ## Example: full tachograph DB import package
Tenant = kralowetz
EventSource = TACHOGRAPH / DRIVER_CARD / TACHOGRAPH_DRIVER_CARD ```json
EventDomain = DRIVER_ACTIVITY {
EventType = DRIVE "package": {
Lifecycle = START "tenantKey": "kralowetz",
EventDetails.type = DRIVER_ACTIVITY "eventSource": {
VehicleRef = VRN-only, VIN can be resolved later "providerKey": "TACHOGRAPH",
"sourceKind": "VEHICLE_UNIT",
"sourceKey": "TACHOGRAPH_VEHICLE_UNIT",
"sourceInstanceKey": "main-tachograph-db",
"tenantProviderSettingKey": "kralowetz-tachograph-prod"
},
"sourceGroup": null,
"importScope": {
"type": "TENANT_ALL",
"rootSourceOrganisation": null,
"includeChildren": false,
"occurredFrom": null,
"occurredTo": null
},
"eventFamily": "DRIVER_ACTIVITY",
"businessDate": null,
"externalPackageId": "TACHOGRAPH:ALL:DRIVER_ACTIVITY:FULL"
},
"events": []
}
``` ```
## Start PostgreSQL ## Start PostgreSQL
@ -322,6 +445,12 @@ select p.received_at,
s.provider_key, s.provider_key,
s.source_kind, s.source_kind,
s.source_key, s.source_key,
p.source_group_type,
p.source_group_entity_id,
p.import_scope_type,
p.root_source_org_entity_id,
p.occurred_from,
p.occurred_to,
p.event_family, p.event_family,
p.business_date, p.business_date,
p.status, p.status,
@ -338,10 +467,12 @@ select occurred_at,
driver_source_entity_id, driver_source_entity_id,
driver_card_nation, driver_card_nation,
driver_card_number, driver_card_number,
driver_source_org_entity_id,
vehicle_source_entity_id, vehicle_source_entity_id,
vehicle_vin, vehicle_vin,
vehicle_registration_nation, vehicle_registration_nation,
vehicle_registration_number, vehicle_registration_number,
vehicle_source_org_entity_id,
event_domain, event_domain,
event_type, event_type,
lifecycle, lifecycle,
@ -361,12 +492,15 @@ order by occurred_at desc;
- load/unload - load/unload
- specific conditions: out-of-scope and ferry/train - specific conditions: out-of-scope and ferry/train
- speeding events - speeding events
2. Keep each extractor package-scoped by `tenant + EventSource + eventFamily + businessDate/import window`. 2. Each SQL extraction route should accept `ImportScopeDto`:
- optional source organisation root + include children
- optional occurredFrom/occurredTo
- null time bounds mean complete DB/history import
3. Add master-data resolution later: 3. Add master-data resolution later:
- driver by tenant + driver card nation/number + occurredAt - driver by tenant + driver card nation/number + occurredAt
- vehicle by tenant + VIN or tenant + registration nation/number + occurredAt - vehicle by tenant + VIN or tenant + registration nation/number + occurredAt
- late resolution from VRN-only driver-card events to VIN after VU/master data import - late resolution from VRN-only driver-card events to VIN after VU/master data import
4. Discuss final storage model: 4. Discuss query/read models later:
- canonical `eventhub.event` - how to merge acquired events from all sources at query time
- source-record table linked to EventSource/package - source priority per event family when the main source contains gaps
- deduplication policy for VU vs driver-card duplicates - how to expose source provenance when multiple sources describe the same real-world event

View File

@ -4,6 +4,7 @@ import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubEventBatchDto; import at.procon.eventhub.dto.EventHubEventBatchDto;
import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.service.EventHubEventSorter; import at.procon.eventhub.service.EventHubEventSorter;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.HashMap; import java.util.HashMap;
@ -34,11 +35,12 @@ public class EventHubBatchBuildProcessor implements Processor {
packageInfo = sortedEvents.getFirst().packageInfo(); packageInfo = sortedEvents.getFirst().packageInfo();
} }
OffsetDateTime requestedFrom = packageInfo != null && packageInfo.requestedFrom() != null ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope();
? packageInfo.requestedFrom() OffsetDateTime occurredFrom = importScope != null && importScope.occurredFrom() != null
? importScope.occurredFrom()
: sortedEvents.getFirst().occurredAt(); : sortedEvents.getFirst().occurredAt();
OffsetDateTime requestedTo = packageInfo != null && packageInfo.requestedTo() != null OffsetDateTime occurredTo = importScope != null && importScope.occurredTo() != null
? packageInfo.requestedTo() ? importScope.occurredTo()
: sortedEvents.getLast().occurredAt(); : sortedEvents.getLast().occurredAt();
Map<String, Object> metadata = new HashMap<>(); Map<String, Object> metadata = new HashMap<>();
@ -48,6 +50,8 @@ public class EventHubBatchBuildProcessor implements Processor {
if (packageInfo != null) { if (packageInfo != null) {
metadata.put("tenantKey", packageInfo.tenantKey()); metadata.put("tenantKey", packageInfo.tenantKey());
metadata.put("eventSource", packageInfo.eventSource().stableKey()); metadata.put("eventSource", packageInfo.eventSource().stableKey());
metadata.put("sourceGroup", packageInfo.sourceGroup() == null ? null : packageInfo.sourceGroup().stableKey());
metadata.put("importScope", packageInfo.importScope() == null ? null : packageInfo.importScope().stableKey());
metadata.put("eventFamily", packageInfo.eventFamily()); metadata.put("eventFamily", packageInfo.eventFamily());
metadata.put("businessDate", packageInfo.businessDate()); metadata.put("businessDate", packageInfo.businessDate());
metadata.put("externalPackageId", packageInfo.externalPackageId()); metadata.put("externalPackageId", packageInfo.externalPackageId());
@ -57,8 +61,8 @@ public class EventHubBatchBuildProcessor implements Processor {
packageKey, packageKey,
packageInfo, packageInfo,
DataPackageType.CAMEL_BATCH, DataPackageType.CAMEL_BATCH,
requestedFrom, occurredFrom,
requestedTo, occurredTo,
sortedEvents, sortedEvents,
metadata metadata
)); ));

View File

@ -9,7 +9,8 @@ import jakarta.validation.Valid;
*/ */
public record DriverRefDto( public record DriverRefDto(
String sourceEntityId, String sourceEntityId,
@Valid DriverCardRefDto driverCard @Valid DriverCardRefDto driverCard,
@Valid SourceGroupRefDto sourceOrganisation
) { ) {
public DriverRefDto { public DriverRefDto {
sourceEntityId = normalizeNullable(sourceEntityId); sourceEntityId = normalizeNullable(sourceEntityId);

View File

@ -8,8 +8,8 @@ public record EventHubEventBatchDto(
String packageKey, String packageKey,
EventHubPackageRequest packageInfo, EventHubPackageRequest packageInfo,
DataPackageType packageType, DataPackageType packageType,
OffsetDateTime requestedFrom, OffsetDateTime occurredFrom,
OffsetDateTime requestedTo, OffsetDateTime occurredTo,
List<EventHubEventDto> events, List<EventHubEventDto> events,
Map<String, Object> metadata Map<String, Object> metadata
) { ) {

View File

@ -17,7 +17,7 @@ import java.util.UUID;
public record EventHubEventDto( public record EventHubEventDto(
UUID eventId, UUID eventId,
/** Stable id of this imported/source event record, not the canonical business event id. */ /** Stable id of this imported/source event record. */
@NotBlank String externalSourceEventId, @NotBlank String externalSourceEventId,
/** Source-side driver reference. No internal driver id is required during acquisition. */ /** Source-side driver reference. No internal driver id is required during acquisition. */

View File

@ -4,20 +4,24 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.OffsetDateTime;
/** /**
* Acquisition package context. One package should represent one coherent import * Acquisition package context. One package represents one coherent import unit,
* unit, e.g. tenant + TACHOGRAPH + VEHICLE_UNIT + DRIVER_ACTIVITY + business date. * for example tenant + TACHOGRAPH + VEHICLE_UNIT + DRIVER_ACTIVITY + import scope.
*/ */
public record EventHubPackageRequest( public record EventHubPackageRequest(
/** Tenant/client/account owning the acquired data. */ /** Tenant/client/account owning the acquired data. */
@NotBlank String tenantKey, @NotBlank String tenantKey,
@Valid @NotNull EventSourceDto eventSource, @Valid @NotNull EventSourceDto eventSource,
/** Optional source-side group: tachograph organisation, YellowFox fleet, etc. */
@Valid SourceGroupRefDto sourceGroup,
/** Organisation/time selection used to acquire the package. */
@Valid @NotNull ImportScopeDto importScope,
@NotBlank String eventFamily, @NotBlank String eventFamily,
LocalDate businessDate, LocalDate businessDate,
OffsetDateTime requestedFrom,
OffsetDateTime requestedTo,
@NotBlank String externalPackageId @NotBlank String externalPackageId
) { ) {
public EventHubPackageRequest { public EventHubPackageRequest {
@ -25,6 +29,9 @@ public record EventHubPackageRequest(
if (eventFamily != null) { if (eventFamily != null) {
eventFamily = eventFamily.trim().toUpperCase().replace('-', '_').replace(' ', '_'); eventFamily = eventFamily.trim().toUpperCase().replace('-', '_').replace(' ', '_');
} }
if (importScope == null) {
importScope = ImportScopeDto.tenantAll(null, null);
}
} }
private static String normalizeTenant(String value) { private static String normalizeTenant(String value) {

View File

@ -5,9 +5,9 @@ import jakarta.validation.constraints.NotBlank;
/** /**
* Describes the origin of an acquired event record. * Describes the origin of an acquired event record.
* *
* This is intentionally acquisition/source context and not part of the canonical * This is intentionally acquisition/source context. Equivalent real-world events
* real-world event identity. The same canonical event can be acquired from * from different sources are kept as separate acquired records. Query/read
* TACHOGRAPH/VEHICLE_UNIT and later from TACHOGRAPH/DRIVER_CARD. * models can later combine sources when a preferred source contains gaps.
*/ */
public record EventSourceDto( public record EventSourceDto(
@NotBlank String providerKey, @NotBlank String providerKey,

View File

@ -0,0 +1,46 @@
package at.procon.eventhub.dto;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import java.time.OffsetDateTime;
/**
* Selection scope used by acquisition jobs.
*
* Organisation scope and event-time scope are intentionally modeled together,
* because a tachograph import can be unrestricted, organisation-subtree scoped,
* time-window scoped, or both.
*/
public record ImportScopeDto(
@NotNull ImportScopeType type,
@Valid SourceGroupRefDto rootSourceOrganisation,
boolean includeChildren,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
public ImportScopeDto {
if (type == null) {
type = ImportScopeType.TENANT_ALL;
}
if (occurredFrom != null && occurredTo != null && !occurredFrom.isBefore(occurredTo)) {
throw new IllegalArgumentException("importScope.occurredFrom must be before occurredTo");
}
if (type == ImportScopeType.SOURCE_ORGANISATION_SUBTREE) {
if (rootSourceOrganisation == null || rootSourceOrganisation.type() != SourceGroupType.ORGANISATION) {
throw new IllegalArgumentException("SOURCE_ORGANISATION_SUBTREE requires rootSourceOrganisation.type=ORGANISATION");
}
}
}
public static ImportScopeDto tenantAll(OffsetDateTime occurredFrom, OffsetDateTime occurredTo) {
return new ImportScopeDto(ImportScopeType.TENANT_ALL, null, false, occurredFrom, occurredTo);
}
public String stableKey() {
return type.name() + ":"
+ (rootSourceOrganisation == null ? "ALL" : rootSourceOrganisation.stableKey()) + ":"
+ (includeChildren ? "WITH_CHILDREN" : "NO_CHILDREN") + ":"
+ (occurredFrom == null ? "BEGIN" : occurredFrom.toString()) + ":"
+ (occurredTo == null ? "END" : occurredTo.toString());
}
}

View File

@ -0,0 +1,9 @@
package at.procon.eventhub.dto;
public enum ImportScopeType {
/** Import all data available through the configured tenant/source. */
TENANT_ALL,
/** Import only data related to one source organisation, optionally including child organisations. */
SOURCE_ORGANISATION_SUBTREE
}

View File

@ -0,0 +1,35 @@
package at.procon.eventhub.dto;
/**
* Source-side grouping information.
*
* For tachograph imports this is usually an ORGANISATION from the source DB.
* For YellowFox this is usually a provider FLEET. The value is acquisition
* context and can later be mapped to an internal tenant organisation.
*/
public record SourceGroupRefDto(
SourceGroupType type,
String sourceEntityId,
String code,
String name
) {
public SourceGroupRefDto {
sourceEntityId = normalizeNullable(sourceEntityId);
code = normalizeNullable(code);
name = normalizeNullable(name);
}
public boolean hasValue() {
return type != null || sourceEntityId != null || code != null || name != null;
}
public String stableKey() {
return (type == null ? "" : type.name()) + "|"
+ (sourceEntityId == null ? "" : sourceEntityId) + "|"
+ (code == null ? "" : code);
}
private static String normalizeNullable(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
}

View File

@ -0,0 +1,7 @@
package at.procon.eventhub.dto;
public enum SourceGroupType {
ORGANISATION,
FLEET,
ALL
}

View File

@ -9,7 +9,8 @@ import jakarta.validation.Valid;
public record VehicleRefDto( public record VehicleRefDto(
String sourceEntityId, String sourceEntityId,
String vin, String vin,
@Valid VehicleRegistrationRefDto vehicleRegistration @Valid VehicleRegistrationRefDto vehicleRegistration,
@Valid SourceGroupRefDto sourceOrganisation
) { ) {
public VehicleRefDto { public VehicleRefDto {
sourceEntityId = normalizeNullable(sourceEntityId); sourceEntityId = normalizeNullable(sourceEntityId);

View File

@ -3,6 +3,8 @@ package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.DataPackageStatus; import at.procon.eventhub.dto.DataPackageStatus;
import at.procon.eventhub.dto.DataPackageType; import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
@ -27,18 +29,25 @@ public class DataPackageRepository {
String packageKey, String packageKey,
EventHubPackageRequest packageInfo, EventHubPackageRequest packageInfo,
DataPackageType packageType, DataPackageType packageType,
OffsetDateTime requestedFrom, OffsetDateTime occurredFrom,
OffsetDateTime requestedTo, OffsetDateTime occurredTo,
Map<String, Object> metadata Map<String, Object> metadata
) { ) {
UUID id = UUID.randomUUID(); UUID id = UUID.randomUUID();
SourceGroupRefDto sourceGroup = packageInfo == null ? null : packageInfo.sourceGroup();
ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope();
SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation();
jdbcTemplate.update( jdbcTemplate.update(
""" """
insert into eventhub.data_package( insert into eventhub.data_package(
id, event_source_id, tenant_key, package_key, package_type, status, id, event_source_id, tenant_key, package_key, package_type, status,
source_group_type, source_group_entity_id, source_group_code, source_group_name,
import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name,
include_children, occurred_from, occurred_to,
event_family, business_date, external_package_id, event_family, business_date, external_package_id,
requested_from, requested_to, received_at, event_count, metadata received_at, event_count, metadata
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), 0, ?::jsonb) ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), 0, ?::jsonb)
""", """,
ps -> { ps -> {
ps.setObject(1, id); ps.setObject(1, id);
@ -47,12 +56,21 @@ public class DataPackageRepository {
ps.setString(4, packageKey); ps.setString(4, packageKey);
ps.setString(5, packageType.name()); ps.setString(5, packageType.name());
ps.setString(6, DataPackageStatus.IMPORTING.name()); ps.setString(6, DataPackageStatus.IMPORTING.name());
ps.setString(7, packageInfo == null ? null : packageInfo.eventFamily()); ps.setString(7, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name());
ps.setObject(8, packageInfo == null ? null : packageInfo.businessDate()); ps.setString(8, sourceGroup == null ? null : sourceGroup.sourceEntityId());
ps.setString(9, packageInfo == null ? packageKey : packageInfo.externalPackageId()); ps.setString(9, sourceGroup == null ? null : sourceGroup.code());
ps.setObject(10, requestedFrom); ps.setString(10, sourceGroup == null ? null : sourceGroup.name());
ps.setObject(11, requestedTo); ps.setString(11, importScope == null ? null : importScope.type().name());
ps.setString(12, toJson(metadata)); ps.setString(12, rootOrg == null ? null : rootOrg.sourceEntityId());
ps.setString(13, rootOrg == null ? null : rootOrg.code());
ps.setString(14, rootOrg == null ? null : rootOrg.name());
ps.setBoolean(15, importScope != null && importScope.includeChildren());
ps.setObject(16, occurredFrom);
ps.setObject(17, occurredTo);
ps.setString(18, packageInfo == null ? null : packageInfo.eventFamily());
ps.setObject(19, packageInfo == null ? null : packageInfo.businessDate());
ps.setString(20, packageInfo == null ? packageKey : packageInfo.externalPackageId());
ps.setString(21, toJson(metadata));
} }
); );
return id; return id;

View File

@ -3,9 +3,10 @@ package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.DriverCardRefDto; import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.service.EventNaturalKeyService; import at.procon.eventhub.service.EventAcquisitionRecordKeyService;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -23,19 +24,21 @@ public class EventRepository {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final EventNaturalKeyService naturalKeyService; private final EventAcquisitionRecordKeyService recordKeyService;
public EventRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, EventNaturalKeyService naturalKeyService) { public EventRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, EventAcquisitionRecordKeyService recordKeyService) {
this.jdbcTemplate = jdbcTemplate; this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.naturalKeyService = naturalKeyService; this.recordKeyService = recordKeyService;
} }
/** /**
* Temporary acquisition-stage persistence. The canonical storage model will * Acquisition-stage persistence. This table stores source records as imported.
* be finalized later; for now this table keeps acquired point events with * It does not merge or deduplicate equivalent events from different sources;
* EventSource context, source-side driver/vehicle refs, generic eventDetails, * later query/read models can combine sources when a preferred source has gaps.
* and raw payload JSON. * For now this table keeps acquired point events with EventSource context,
* source-side driver/vehicle refs, source organisation information, generic
* eventDetails, and raw payload JSON.
*/ */
public int batchInsert(UUID packageId, int eventSourceId, List<EventHubEventDto> events) { public int batchInsert(UUID packageId, int eventSourceId, List<EventHubEventDto> events) {
int[] counts = jdbcTemplate.batchUpdate( int[] counts = jdbcTemplate.batchUpdate(
@ -44,20 +47,24 @@ public class EventRepository {
id, event_source_id, data_package_id, id, event_source_id, data_package_id,
external_source_event_id, external_source_event_id,
driver_source_entity_id, driver_card_nation, driver_card_number, driver_source_entity_id, driver_card_nation, driver_card_number,
driver_source_org_entity_id, driver_source_org_code, driver_source_org_name,
vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number, vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number,
vehicle_source_org_entity_id, vehicle_source_org_code, vehicle_source_org_name,
occurred_at, received_partner_at, received_hub_at, occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle, event_domain, event_type, lifecycle,
odometer_m, latitude, longitude, odometer_m, latitude, longitude,
event_details, payload, manual_entry, event_details, payload, manual_entry,
canonical_key_hash, source_record_key_hash source_record_key_hash, event_signature_hash
) values ( ) values (
?, ?, ?, ?, ?, ?,
?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?,
?::jsonb, ?::jsonb, ?, ?::jsonb, ?::jsonb, ?,
?, ? ?, ?
) )
@ -71,8 +78,10 @@ public class EventRepository {
OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt(); OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt();
DriverRefDto driverRef = event.driverRef(); DriverRefDto driverRef = event.driverRef();
DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard(); DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard();
SourceGroupRefDto driverOrg = driverRef == null ? null : driverRef.sourceOrganisation();
VehicleRefDto vehicleRef = event.vehicleRef(); VehicleRefDto vehicleRef = event.vehicleRef();
VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration(); VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration();
SourceGroupRefDto vehicleOrg = vehicleRef == null ? null : vehicleRef.sourceOrganisation();
ps.setObject(1, eventId); ps.setObject(1, eventId);
ps.setInt(2, eventSourceId); ps.setInt(2, eventSourceId);
@ -82,31 +91,37 @@ public class EventRepository {
ps.setString(5, driverRef == null ? null : driverRef.sourceEntityId()); ps.setString(5, driverRef == null ? null : driverRef.sourceEntityId());
ps.setString(6, driverCard == null ? null : driverCard.nation()); ps.setString(6, driverCard == null ? null : driverCard.nation());
ps.setString(7, driverCard == null ? null : driverCard.number()); ps.setString(7, driverCard == null ? null : driverCard.number());
ps.setString(8, driverOrg == null ? null : driverOrg.sourceEntityId());
ps.setString(9, driverOrg == null ? null : driverOrg.code());
ps.setString(10, driverOrg == null ? null : driverOrg.name());
ps.setString(8, vehicleRef == null ? null : vehicleRef.sourceEntityId()); ps.setString(11, vehicleRef == null ? null : vehicleRef.sourceEntityId());
ps.setString(9, vehicleRef == null ? null : vehicleRef.vin()); ps.setString(12, vehicleRef == null ? null : vehicleRef.vin());
ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation()); ps.setString(13, vehicleRegistration == null ? null : vehicleRegistration.nation());
ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number()); ps.setString(14, vehicleRegistration == null ? null : vehicleRegistration.number());
ps.setString(15, vehicleOrg == null ? null : vehicleOrg.sourceEntityId());
ps.setString(16, vehicleOrg == null ? null : vehicleOrg.code());
ps.setString(17, vehicleOrg == null ? null : vehicleOrg.name());
ps.setObject(12, event.occurredAt()); ps.setObject(18, event.occurredAt());
ps.setObject(13, event.receivedPartnerAt()); ps.setObject(19, event.receivedPartnerAt());
ps.setObject(14, receivedHubAt); ps.setObject(20, receivedHubAt);
ps.setString(15, event.eventDomain().name()); ps.setString(21, event.eventDomain().name());
ps.setString(16, event.eventType().name()); ps.setString(22, event.eventType().name());
ps.setString(17, event.lifecycle().name()); ps.setString(23, event.lifecycle().name());
setNullableLong(ps, 18, event.odometerM()); setNullableLong(ps, 24, event.odometerM());
if (event.position() == null) { if (event.position() == null) {
ps.setNull(19, Types.NUMERIC); ps.setNull(25, Types.NUMERIC);
ps.setNull(20, Types.NUMERIC); ps.setNull(26, Types.NUMERIC);
} else { } else {
ps.setObject(19, event.position().latitude()); ps.setObject(25, event.position().latitude());
ps.setObject(20, event.position().longitude()); ps.setObject(26, event.position().longitude());
} }
ps.setString(21, toJson(objectMapper.valueToTree(event.eventDetails()))); ps.setString(27, toJson(objectMapper.valueToTree(event.eventDetails())));
ps.setString(22, toJson(event.payload())); ps.setString(28, toJson(event.payload()));
ps.setBoolean(23, event.manualEntry()); ps.setBoolean(29, event.manualEntry());
ps.setString(24, naturalKeyService.buildCanonicalKeyHash(event)); ps.setString(30, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId));
ps.setString(25, naturalKeyService.buildSourceRecordKeyHash(event, eventSourceId)); ps.setString(31, recordKeyService.buildEventSignatureHash(event));
} }
@Override @Override

View File

@ -0,0 +1,143 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.EventHubEventDto;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.springframework.stereotype.Component;
@Component
public class EventAcquisitionRecordKeyService {
/**
* Source-record key is used only for acquisition idempotency of the same source record.
* It does not merge or deduplicate equivalent events from different providers/sources.
*/
public String buildSourceRecordKeyHash(EventHubEventDto event, int eventSourceId) {
String sourceRecordKey = String.join("|",
nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
String.valueOf(eventSourceId),
nullToEmpty(event.externalSourceEventId())
);
return sha256Hex(sourceRecordKey);
}
/**
* Event signature is a non-unique acquisition-time semantic key.
*
* It intentionally excludes EventSource and externalSourceEventId. Multiple source records
* may have the same signature and still must be stored separately. The signature is only a
* later query/projection hint for comparing sources, filling gaps, or building merged views.
*/
public String buildEventSignatureHash(EventHubEventDto event) {
String signature = String.join("|",
nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
driverSignatureKey(event),
vehicleSignatureKey(event),
normalizeTime(event.occurredAt()),
event.eventDomain() == null ? "" : event.eventDomain().name(),
event.eventType() == null ? "" : event.eventType().name(),
event.lifecycle() == null ? "" : event.lifecycle().name(),
event.eventDetails() == null ? "" : nullToEmpty(event.eventDetails().type()),
canonicalJson(event.eventDetails() == null ? null : event.eventDetails().attributes()),
event.position() == null ? "" : nullToEmpty(event.position().latitude()) + ":" + nullToEmpty(event.position().longitude())
);
return sha256Hex(signature);
}
private String driverSignatureKey(EventHubEventDto event) {
if (event.driverRef() == null) {
return "";
}
if (event.driverRef().driverCard() != null && event.driverRef().driverCard().hasValue()) {
return "CARD:" + event.driverRef().driverCard().stableKey();
}
return "SOURCE_DRIVER:" + nullToEmpty(event.driverRef().sourceEntityId());
}
private String vehicleSignatureKey(EventHubEventDto event) {
if (event.vehicleRef() == null) {
return "";
}
if (event.vehicleRef().vehicleRegistration() != null && event.vehicleRef().vehicleRegistration().hasValue()) {
return "VRN:" + event.vehicleRef().vehicleRegistration().stableKey();
}
if (event.vehicleRef().vin() != null && !event.vehicleRef().vin().isBlank()) {
return "VIN:" + event.vehicleRef().vin();
}
return "SOURCE_VEHICLE:" + nullToEmpty(event.vehicleRef().sourceEntityId());
}
private String normalizeTime(OffsetDateTime value) {
return value == null ? "" : value.toInstant().toString();
}
private String nullToEmpty(Object value) {
return value == null ? "" : String.valueOf(value);
}
private String canonicalJson(JsonNode node) {
if (node == null || node.isNull()) {
return "";
}
if (node.isObject()) {
ObjectNode objectNode = (ObjectNode) node;
List<Map.Entry<String, JsonNode>> fields = new ArrayList<>();
Iterator<Map.Entry<String, JsonNode>> iterator = objectNode.fields();
iterator.forEachRemaining(fields::add);
fields.sort(Comparator.comparing(Map.Entry::getKey));
StringBuilder result = new StringBuilder("{");
boolean first = true;
for (Map.Entry<String, JsonNode> field : fields) {
if (!first) {
result.append(',');
}
first = false;
result.append(escape(field.getKey())).append(':').append(canonicalJson(field.getValue()));
}
return result.append('}').toString();
}
if (node.isArray()) {
ArrayNode arrayNode = (ArrayNode) node;
StringBuilder result = new StringBuilder("[");
for (int i = 0; i < arrayNode.size(); i++) {
if (i > 0) {
result.append(',');
}
result.append(canonicalJson(arrayNode.get(i)));
}
return result.append(']').toString();
}
if (node.isTextual()) {
return escape(node.asText());
}
return node.toString();
}
private String escape(String value) {
return '"' + value.replace("\\", "\\\\").replace("\"", "\\\"") + '"';
}
private String sha256Hex(String value) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8));
StringBuilder result = new StringBuilder(hash.length * 2);
for (byte b : hash) {
result.append(String.format("%02x", b));
}
return result.toString();
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("SHA-256 is not available", e);
}
}
}

View File

@ -2,6 +2,8 @@ package at.procon.eventhub.service;
import at.procon.eventhub.dto.EventDomain; import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.ImportScopeType;
import at.procon.eventhub.dto.SourceGroupType;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
@ -28,6 +30,7 @@ public class EventHubEventValidator {
if (event.packageInfo().eventSource() == null) { if (event.packageInfo().eventSource() == null) {
throw new IllegalArgumentException("packageInfo.eventSource must be set"); throw new IllegalArgumentException("packageInfo.eventSource must be set");
} }
validateImportScope(event);
if (event.eventDomain() == null) { if (event.eventDomain() == null) {
throw new IllegalArgumentException("eventDomain must be set"); throw new IllegalArgumentException("eventDomain must be set");
} }
@ -43,6 +46,23 @@ public class EventHubEventValidator {
validateEventDetails(event); validateEventDetails(event);
} }
private void validateImportScope(EventHubEventDto event) {
var importScope = event.packageInfo().importScope();
if (importScope == null) {
throw new IllegalArgumentException("packageInfo.importScope must be set");
}
if (importScope.occurredFrom() != null && importScope.occurredTo() != null
&& !importScope.occurredFrom().isBefore(importScope.occurredTo())) {
throw new IllegalArgumentException("importScope.occurredFrom must be before occurredTo");
}
if (importScope.type() == ImportScopeType.SOURCE_ORGANISATION_SUBTREE) {
if (importScope.rootSourceOrganisation() == null
|| importScope.rootSourceOrganisation().type() != SourceGroupType.ORGANISATION) {
throw new IllegalArgumentException("SOURCE_ORGANISATION_SUBTREE requires rootSourceOrganisation.type=ORGANISATION");
}
}
}
private void validateEventDetails(EventHubEventDto event) { private void validateEventDetails(EventHubEventDto event) {
if (event.eventDetails() == null) { if (event.eventDetails() == null) {
return; return;

View File

@ -64,8 +64,8 @@ public class EventHubIngestionService {
batch.packageKey(), batch.packageKey(),
packageInfo, packageInfo,
batch.packageType(), batch.packageType(),
batch.requestedFrom(), batch.occurredFrom(),
batch.requestedTo(), batch.occurredTo(),
batch.metadata() batch.metadata()
); );

View File

@ -12,6 +12,8 @@ public class EventHubPackageKeyBuilder {
if (packageInfo != null) { if (packageInfo != null) {
return packageInfo.tenantKey() return packageInfo.tenantKey()
+ ":" + packageInfo.eventSource().stableKey() + ":" + packageInfo.eventSource().stableKey()
+ ":" + (packageInfo.sourceGroup() == null ? "NO_GROUP" : packageInfo.sourceGroup().stableKey())
+ ":" + (packageInfo.importScope() == null ? "NO_SCOPE" : packageInfo.importScope().stableKey())
+ ":" + packageInfo.eventFamily() + ":" + packageInfo.eventFamily()
+ ":" + (packageInfo.businessDate() == null ? "NO_DATE" : packageInfo.businessDate()) + ":" + (packageInfo.businessDate() == null ? "NO_DATE" : packageInfo.businessDate())
+ ":" + packageInfo.externalPackageId(); + ":" + packageInfo.externalPackageId();

View File

@ -6,6 +6,7 @@ import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventLifecycle; import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType; import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.source.TachographActivityDto; import at.procon.eventhub.dto.source.TachographActivityDto;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
@ -37,13 +38,15 @@ public class TachographActivityEventMapper {
null null
); );
LocalDate businessDate = source.occurredAt().toLocalDate(); LocalDate businessDate = source.occurredAt().toLocalDate();
var occurredFrom = businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime();
var occurredTo = businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime();
EventHubPackageRequest packageInfo = new EventHubPackageRequest( EventHubPackageRequest packageInfo = new EventHubPackageRequest(
tenantOrDefault(source.tenantKey()), tenantOrDefault(source.tenantKey()),
eventSource, eventSource,
null,
ImportScopeDto.tenantAll(occurredFrom, occurredTo),
EventDomain.DRIVER_ACTIVITY.name(), EventDomain.DRIVER_ACTIVITY.name(),
businessDate, businessDate,
businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
eventSource.stableKey() + ":DRIVER_ACTIVITY:" + businessDate eventSource.stableKey() + ":DRIVER_ACTIVITY:" + businessDate
); );

View File

@ -7,6 +7,9 @@ import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType; import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.GeoPointDto; import at.procon.eventhub.dto.GeoPointDto;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.dto.SourceGroupType;
import at.procon.eventhub.dto.source.TelematicsPositionDto; import at.procon.eventhub.dto.source.TelematicsPositionDto;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
@ -34,13 +37,18 @@ public class TelematicsPositionEventMapper {
source.externalFleetKey() source.externalFleetKey()
); );
LocalDate businessDate = source.occurredAt().toLocalDate(); LocalDate businessDate = source.occurredAt().toLocalDate();
var occurredFrom = businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime();
var occurredTo = businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime();
SourceGroupRefDto sourceGroup = source.externalFleetKey() == null || source.externalFleetKey().isBlank()
? null
: new SourceGroupRefDto(SourceGroupType.FLEET, source.externalFleetKey(), source.externalFleetKey(), null);
EventHubPackageRequest packageInfo = new EventHubPackageRequest( EventHubPackageRequest packageInfo = new EventHubPackageRequest(
tenantOrDefault(source.tenantKey()), tenantOrDefault(source.tenantKey()),
eventSource, eventSource,
sourceGroup,
ImportScopeDto.tenantAll(occurredFrom, occurredTo),
EventDomain.POSITION.name(), EventDomain.POSITION.name(),
businessDate, businessDate,
businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
eventSource.stableKey() + ":POSITION:" + businessDate eventSource.stableKey() + ":POSITION:" + businessDate
); );

View File

@ -10,6 +10,9 @@ import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType; import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.GeoPointDto; import at.procon.eventhub.dto.GeoPointDto;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.dto.SourceGroupType;
import at.procon.eventhub.dto.source.YellowFoxD8BookingDto; import at.procon.eventhub.dto.source.YellowFoxD8BookingDto;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
@ -48,13 +51,18 @@ public class YellowFoxD8BookingEventMapper {
source.externalFleetKey() source.externalFleetKey()
); );
LocalDate businessDate = source.occurredAt().toLocalDate(); LocalDate businessDate = source.occurredAt().toLocalDate();
var occurredFrom = businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime();
var occurredTo = businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime();
SourceGroupRefDto sourceGroup = source.externalFleetKey() == null || source.externalFleetKey().isBlank()
? null
: new SourceGroupRefDto(SourceGroupType.FLEET, source.externalFleetKey(), source.externalFleetKey(), null);
EventHubPackageRequest packageInfo = new EventHubPackageRequest( EventHubPackageRequest packageInfo = new EventHubPackageRequest(
tenantOrDefault(source.tenantKey()), tenantOrDefault(source.tenantKey()),
eventSource, eventSource,
sourceGroup,
ImportScopeDto.tenantAll(occurredFrom, occurredTo),
normalized.domain().name(), normalized.domain().name(),
businessDate, businessDate,
businessDate.atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
businessDate.plusDays(1).atStartOfDay(source.occurredAt().getOffset()).toOffsetDateTime(),
eventSource.stableKey() + ":" + normalized.domain().name() + ":" + businessDate eventSource.stableKey() + ":" + normalized.domain().name() + ":" + businessDate
); );

View File

@ -3,7 +3,7 @@ create extension if not exists pgcrypto;
create schema if not exists eventhub; create schema if not exists eventhub;
-- Acquisition source definition. This represents where the imported source -- Acquisition source definition. This represents where the imported source
-- record came from, not necessarily the canonical identity of the real-world event. -- record came from. Source records are intentionally kept separately by provider/source.
create table if not exists eventhub.event_source ( create table if not exists eventhub.event_source (
id integer generated always as identity primary key, id integer generated always as identity primary key,
provider_key text not null, provider_key text not null,
@ -16,8 +16,8 @@ create table if not exists eventhub.event_source (
constraint ux_event_source unique (provider_key, source_kind, source_key, source_instance_key) constraint ux_event_source unique (provider_key, source_kind, source_key, source_instance_key)
); );
-- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/2026-04-28. -- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/import scope.
-- Final canonical storage can be discussed later; this table is still useful for acquisition audit. -- This table captures the source grouping and the organisation/time import scope used for acquisition.
create table if not exists eventhub.data_package ( create table if not exists eventhub.data_package (
id uuid primary key, id uuid primary key,
event_source_id integer not null references eventhub.event_source(id), event_source_id integer not null references eventhub.event_source(id),
@ -25,24 +25,36 @@ create table if not exists eventhub.data_package (
package_key text not null, package_key text not null,
package_type text not null, package_type text not null,
status text not null, status text not null,
source_group_type text,
source_group_entity_id text,
source_group_code text,
source_group_name text,
import_scope_type text,
root_source_org_entity_id text,
root_source_org_code text,
root_source_org_name text,
include_children boolean not null default false,
occurred_from timestamptz,
occurred_to timestamptz,
event_family text, event_family text,
business_date date, business_date date,
external_package_id text, external_package_id text,
requested_from timestamptz,
requested_to timestamptz,
received_at timestamptz not null default now(), received_at timestamptz not null default now(),
completed_at timestamptz, completed_at timestamptz,
event_count integer not null default 0, event_count integer not null default 0,
metadata jsonb not null default '{}'::jsonb, metadata jsonb not null default '{}'::jsonb,
error_message text, error_message text,
constraint ux_data_package_external unique (tenant_key, event_source_id, external_package_id, received_at) constraint ux_data_package_external unique (tenant_key, event_source_id, external_package_id, received_at),
constraint chk_data_package_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to)
); );
-- Temporary acquisition-stage point-event store. -- Temporary acquisition-stage point-event store.
-- It keeps the discussed DTO shape: EventSource context, externalSourceEventId, -- It keeps the discussed DTO shape: EventSource context, externalSourceEventId,
-- one occurredAt timestamp, source-side driver/vehicle refs, normalized event details, -- one occurredAt timestamp, source-side driver/vehicle refs, source-side organisation assignments,
-- and raw JSON payload. It intentionally has no internal driver_id/vehicle_id in the -- normalized event details, and raw JSON payload.
-- incoming model; master-data resolution can be added later.
create table if not exists eventhub.acquired_event ( create table if not exists eventhub.acquired_event (
id uuid not null, id uuid not null,
event_source_id integer not null references eventhub.event_source(id), event_source_id integer not null references eventhub.event_source(id),
@ -53,11 +65,17 @@ create table if not exists eventhub.acquired_event (
driver_source_entity_id text, driver_source_entity_id text,
driver_card_nation text, driver_card_nation text,
driver_card_number text, driver_card_number text,
driver_source_org_entity_id text,
driver_source_org_code text,
driver_source_org_name text,
vehicle_source_entity_id text, vehicle_source_entity_id text,
vehicle_vin text, vehicle_vin text,
vehicle_registration_nation text, vehicle_registration_nation text,
vehicle_registration_number text, vehicle_registration_number text,
vehicle_source_org_entity_id text,
vehicle_source_org_code text,
vehicle_source_org_name text,
occurred_at timestamptz not null, occurred_at timestamptz not null,
received_partner_at timestamptz, received_partner_at timestamptz,
@ -75,12 +93,15 @@ create table if not exists eventhub.acquired_event (
payload jsonb not null default '{}'::jsonb, payload jsonb not null default '{}'::jsonb,
manual_entry boolean not null default false, manual_entry boolean not null default false,
-- Excludes EventSource: useful later for canonical event deduplication. -- Includes tenant + EventSource + externalSourceEventId. Used only for source-record import idempotency.
canonical_key_hash text not null, -- It does not merge equivalent events from different providers/sources.
-- Includes tenant + EventSource + externalSourceEventId: prevents duplicate imports of the same source record.
source_record_key_hash text not null, source_record_key_hash text not null,
-- Non-unique semantic acquisition signature. It intentionally excludes EventSource and
-- externalSourceEventId and is only a later query/projection hint for source comparison,
-- gap filling, and merged timelines. It must not be used for import deduplication.
event_signature_hash text,
created_at timestamptz not null default now(), created_at timestamptz not null default now(),
constraint pk_acquired_event primary key (occurred_at, id), constraint pk_acquired_event primary key (occurred_at, id),
@ -99,8 +120,9 @@ create table if not exists eventhub.acquired_event (
create unique index if not exists ux_acquired_event_source_record create unique index if not exists ux_acquired_event_source_record
on eventhub.acquired_event(source_record_key_hash); on eventhub.acquired_event(source_record_key_hash);
create index if not exists idx_acquired_event_canonical_key create index if not exists idx_acquired_event_signature
on eventhub.acquired_event(canonical_key_hash); on eventhub.acquired_event(event_signature_hash)
where event_signature_hash is not null;
create index if not exists idx_acquired_event_vehicle_vin_time create index if not exists idx_acquired_event_vehicle_vin_time
on eventhub.acquired_event(vehicle_vin, occurred_at desc) on eventhub.acquired_event(vehicle_vin, occurred_at desc)
@ -114,6 +136,14 @@ create index if not exists idx_acquired_event_driver_card_time
on eventhub.acquired_event(driver_card_nation, driver_card_number, occurred_at desc) on eventhub.acquired_event(driver_card_nation, driver_card_number, occurred_at desc)
where driver_card_number is not null; where driver_card_number is not null;
create index if not exists idx_acquired_event_driver_org_time
on eventhub.acquired_event(driver_source_org_entity_id, occurred_at desc)
where driver_source_org_entity_id is not null;
create index if not exists idx_acquired_event_vehicle_org_time
on eventhub.acquired_event(vehicle_source_org_entity_id, occurred_at desc)
where vehicle_source_org_entity_id is not null;
create index if not exists idx_acquired_event_domain_type_time create index if not exists idx_acquired_event_domain_type_time
on eventhub.acquired_event(event_domain, event_type, occurred_at desc); on eventhub.acquired_event(event_domain, event_type, occurred_at desc);
@ -125,3 +155,6 @@ create index if not exists idx_acquired_event_payload_gin
create index if not exists idx_data_package_source_time create index if not exists idx_data_package_source_time
on eventhub.data_package(tenant_key, event_source_id, received_at desc); on eventhub.data_package(tenant_key, event_source_id, received_at desc);
create index if not exists idx_data_package_scope
on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to);

View File

@ -0,0 +1,73 @@
package at.procon.eventhub;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDetailsDto;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.service.EventAcquisitionRecordKeyService;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class EventAcquisitionRecordKeyServiceTest {
private final ObjectMapper objectMapper = new ObjectMapper();
private final EventAcquisitionRecordKeyService service = new EventAcquisitionRecordKeyService();
@Test
void eventSignatureExcludesSourceAndExternalSourceEventId() {
EventHubEventDto vuEvent = event(
"TACHOGRAPH:VU:activity:456:start",
new EventSourceDto("TACHOGRAPH", "VEHICLE_UNIT", "TACHOGRAPH_VU", "tachograph-db", null, null)
);
EventHubEventDto cardEvent = event(
"TACHOGRAPH:CARD:activity:789:start",
new EventSourceDto("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_DRIVER_CARD", "tachograph-db", null, null)
);
assertThat(service.buildSourceRecordKeyHash(vuEvent, 1))
.isNotEqualTo(service.buildSourceRecordKeyHash(cardEvent, 2));
assertThat(service.buildEventSignatureHash(vuEvent))
.isEqualTo(service.buildEventSignatureHash(cardEvent));
}
private EventHubEventDto event(String externalSourceEventId, EventSourceDto eventSource) {
EventHubPackageRequest packageInfo = new EventHubPackageRequest(
"tenant-1",
eventSource,
null,
ImportScopeDto.tenantAll(null, null),
"DRIVER_ACTIVITY",
null,
eventSource.sourceKey() + ":package"
);
return new EventHubEventDto(
null,
externalSourceEventId,
new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789"), null),
new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345"), null),
OffsetDateTime.parse("2026-04-28T08:00:00+02:00"),
null,
null,
EventDomain.DRIVER_ACTIVITY,
EventType.DRIVE,
EventLifecycle.START,
null,
null,
new EventDetailsDto("DRIVER_ACTIVITY", objectMapper.createObjectNode().put("cardSlot", "DRIVER")),
null,
false,
packageInfo
);
}
}

View File

@ -35,8 +35,8 @@ class YellowFoxD8BookingEventMapperTest {
3, 3,
OffsetDateTime.parse("2026-04-29T08:15:00+02:00"), OffsetDateTime.parse("2026-04-29T08:15:00+02:00"),
null, null,
new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789")), new DriverRefDto("driver-source-100", new DriverCardRefDto("AT", "D123456789"), null),
new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345")), new VehicleRefDto("vehicle-source-200", "WDB9634031L123456", new VehicleRegistrationRefDto("AT", "W-12345"), null),
null, null,
null, null,
null, null,