# EventHub Acquisition Service

Spring Boot + Apache Camel skeleton for acquiring normalized EventHub point events from multiple providers/sources.

The current version focuses on **acquisition from source systems**, especially tachograph DB data. It stores source records as imported. It does **not** merge or deduplicate equivalent events from different providers/sources. It does keep a non-unique `eventSignatureHash` as a future query/projection hint.

## Namespace

```text
at.procon.eventhub
```

## Main model decisions

### One event = one point in time

`EventHubEventDto` has exactly one timestamp:

```text
occurredAt
```

There is no generic `duration`, `endTime`, `validFrom`, or `validTo`. If a source row represents an interval, a mapper may emit separate point events such as `DRIVE START` and `DRIVE END`.

### Tenant is package/job-level

`tenantKey` identifies the customer/data owner. It is mandatory for import packages and tachograph import requests.

### EventSource identifies the technical source

Example:

```json
{
  "providerKey": "TACHOGRAPH",
  "sourceKind": "VEHICLE_UNIT",
  "sourceKey": "TACHOGRAPH_VEHICLE_UNIT",
  "sourceInstanceKey": "main-tachograph-db",
  "tenantProviderSettingKey": "kralowetz-tachograph-prod",
  "externalFleetKey": null
}
```

Examples:

```text
TACHOGRAPH / VEHICLE_UNIT
TACHOGRAPH / DRIVER_CARD
YELLOWFOX / TELEMATICS_PLATFORM / YELLOWFOX_D8
FLEETBOARD / TELEMATICS_PLATFORM / FLEETBOARD_POSITION
```

### SourceGroup is package/source grouping only

For tachograph, `sourceGroup` can identify the selected source organisation/root organisation.

```json
"sourceGroup": {
  "type": "ORGANISATION",
  "sourceEntityId": "147",
  "code": "147",
  "name": "Kralowetz"
}
```

For YellowFox, it can identify the provider fleet.

```json
"sourceGroup": {
  "type": "FLEET",
  "sourceEntityId": "7",
  "code": "7",
  "name": "YellowFox Fleet 7"
}
```

YellowFox fleet is not forced to be an organisation.
It belongs to the same tenant/customer and can later be mapped or resolved through vehicle/driver master data if needed.

### ImportScope describes data selection

`importScope` describes what was selected from the source system.

Full DB import:

```json
"importScope": {
  "type": "TENANT_ALL",
  "rootSourceOrganisation": null,
  "includeChildren": false,
  "occurredFrom": null,
  "occurredTo": null
}
```

Organisation subtree + time-window import:

```json
"importScope": {
  "type": "SOURCE_ORGANISATION_SUBTREE",
  "rootSourceOrganisation": {
    "type": "ORGANISATION",
    "sourceEntityId": "147",
    "code": "147",
    "name": "Kralowetz"
  },
  "includeChildren": true,
  "occurredFrom": "2026-04-28T00:00:00+02:00",
  "occurredTo": "2026-04-29T00:00:00+02:00"
}
```

`occurredFrom` is inclusive. `occurredTo` is exclusive. Both can be `null` for complete DB/history imports.

### Driver/vehicle refs do not contain organisation

Organisation assignment is a **master-data relation**, not an event property.

Events depend on driver and/or vehicle. The relation of organisation to driver/vehicle is imported and resolved separately from master data using `occurredAt`.

Driver ref:

```json
"driverRef": {
  "sourceEntityId": "driver-100",
  "driverCard": { "nation": "AT", "number": "D123456789" }
}
```

Vehicle ref:

```json
"vehicleRef": {
  "sourceEntityId": "vehicle-200",
  "vin": "WDB9634031L123456",
  "vehicleRegistration": { "nation": "AT", "number": "W-12345" }
}
```

Driver-card-only imports can carry only a nation-scoped VRN and no VIN:

```json
"vehicleRef": {
  "sourceEntityId": null,
  "vin": null,
  "vehicleRegistration": { "nation": "AT", "number": "W-12345" }
}
```

Later master-data resolution can connect `VRN + nation + occurredAt` to a VIN/vehicle.

### No cross-source deduplication during acquisition

The acquisition layer stores every source record independently.
It uses `sourceRecordKeyHash` only for idempotency of the same source event:

```text
tenantKey + EventSource + externalSourceEventId
```

It also stores a non-unique `eventSignatureHash`. This is only a semantic hint for future query-time merging/gap filling. It is not unique and must not suppress imports.

## Tachograph import job model

For real tachograph DB extraction, use a tachograph import request. This describes the job and produces an import plan. SQL extraction routes are intentionally scaffolded as the next implementation step.

```http
POST /api/eventhub/acquisition/tachograph/imports/plan
POST /api/eventhub/acquisition/tachograph/imports/start
```

Example: initial import from one root organisation and its children:

```json
{
  "tenantKey": "kralowetz",
  "eventSource": {
    "providerKey": "TACHOGRAPH",
    "sourceKind": "MIXED",
    "sourceKey": "TACHOGRAPH_DB",
    "sourceInstanceKey": "main-tachograph-db",
    "tenantProviderSettingKey": "kralowetz-tachograph-prod"
  },
  "sourceGroup": {
    "type": "ORGANISATION",
    "sourceEntityId": "147",
    "code": "147",
    "name": "Kralowetz"
  },
  "importScope": {
    "type": "SOURCE_ORGANISATION_SUBTREE",
    "rootSourceOrganisation": {
      "type": "ORGANISATION",
      "sourceEntityId": "147",
      "code": "147",
      "name": "Kralowetz"
    },
    "includeChildren": true,
    "occurredFrom": "2025-01-01T00:00:00+01:00",
    "occurredTo": null
  },
  "eventFamilies": [
    "DRIVER_ACTIVITY",
    "DRIVER_CARD",
    "POSITION",
    "BORDER_CROSSING",
    "LOAD_UNLOAD",
    "PLACE",
    "SPECIFIC_CONDITION",
    "SPEEDING"
  ],
  "mode": "INITIAL_BACKFILL",
  "refreshMasterDataFirst": true,
  "acquisitionStrategy": "OCCURRED_AT_WINDOW_WITH_OVERLAP"
}
```

Example: regular incremental update:

```json
{
  "tenantKey": "kralowetz",
  "eventSource": {
    "providerKey": "TACHOGRAPH",
    "sourceKind": "MIXED",
    "sourceKey": "TACHOGRAPH_DB",
    "sourceInstanceKey": "main-tachograph-db",
    "tenantProviderSettingKey": "kralowetz-tachograph-prod"
  },
  "sourceGroup": {
    "type": "ORGANISATION",
    "sourceEntityId": "147"
  },
  "importScope": {
    "type": "SOURCE_ORGANISATION_SUBTREE",
    "rootSourceOrganisation": {
      "type": "ORGANISATION",
      "sourceEntityId": "147"
    },
    "includeChildren": true,
    "occurredFrom": null,
    "occurredTo": null
  },
  "eventFamilies": ["DRIVER_ACTIVITY", "DRIVER_CARD", "POSITION", "BORDER_CROSSING", "LOAD_UNLOAD", "PLACE", "SPECIFIC_CONDITION", "SPEEDING"],
  "mode": "INCREMENTAL_UPDATE",
  "refreshMasterDataFirst": true,
  "acquisitionStrategy": "SOURCE_PACKAGE_WATERMARK"
}
```

## Tachograph extraction plan

The import-plan service currently creates extraction definitions like:

```text
DRIVER_ACTIVITY / VEHICLE_UNIT -> VUActivity
DRIVER_ACTIVITY / DRIVER_CARD -> CardActivity
DRIVER_CARD / VEHICLE_UNIT -> IWCycle
DRIVER_CARD / DRIVER_CARD -> CardVehiclesUsed
POSITION / VEHICLE_UNIT -> VUPlaces, VULoadUnload, VUGnssAccumulatedDriving, VUBorderCrossing
POSITION / DRIVER_CARD -> CardPlaces, CardLoadUnload, CardGnssAccumulatedDriving, CardBorderCrossing
BORDER_CROSSING / VEHICLE_UNIT -> VUBorderCrossing
BORDER_CROSSING / DRIVER_CARD -> CardBorderCrossing
LOAD_UNLOAD / VEHICLE_UNIT -> VULoadUnload
LOAD_UNLOAD / DRIVER_CARD -> CardLoadUnload
SPECIFIC_CONDITION / VEHICLE_UNIT -> VUSpecificCondition
SPECIFIC_CONDITION / DRIVER_CARD -> CardSpecificCondition
PLACE / VEHICLE_UNIT -> VUPlaces
PLACE / DRIVER_CARD -> CardPlaces
SPEEDING / VEHICLE_UNIT -> SpeedingEvents
```

The next implementation step is to replace the scaffolded plan items with actual Camel/JDBC SQL extraction routes.

## Acquisition alternatives considered

### Alternative A: occurred-time window import

Read events by `occurredAt` for a root organisation/time window.

Pros:

```text
simple
works for initial backfill
matches explicit from/to import requests
```

Cons:

```text
unsafe as the only incremental method because a newly imported card/VU package can contain old occurredAt data
requires overlap windows for regular updates
```

Best use:

```text
initial backfill and reprocessing
fallback incremental strategy with overlap
```

### Alternative B: source-package watermark import

Read original tachograph card/VU packages that were imported/changed in the tachograph DB since the last successful EventHub run, then extract all events belonging to those packages.

Pros:

```text
best for regular updates
handles late-arriving historical tachograph packages
fits the tachograph package concept
```

Cons:

```text
requires reliable source package metadata and links from event rows to package/source download
more complex SQL and cursor state
```

Best use:

```text
primary incremental strategy if tachograph DB exposes package import timestamps/ids
```

### Alternative C: source-row watermark import

Read source event rows changed since last run using row-level `updatedAt` or monotonic IDs.

Pros:

```text
precise if row update timestamps are reliable
does not require package-level model
```

Cons:

```text
not possible if source tables do not have reliable changed/updated metadata
harder across many event tables
```

Best use:

```text
fallback when rows have reliable updatedAt/row version fields
```

### Alternative D: per vehicle/per driver polling

After master-data refresh, loop through vehicles and drivers in the selected organisation subtree and read their event data.

Pros:

```text
matches your existing data acquisition pattern
naturally separates vehicle-unit and driver-card data
supports organisation-scoped imports well
```

Cons:

```text
can be slower for large fleets
requires careful batching/chunking and parallelism
can miss late old data unless combined with package/row watermark or overlap
```

Best use:

```text
scope resolution and controlled extraction, combined with Alternative A or B
```

## Recommended ingestion strategy

Use a hybrid:

```text
Initial import:
  master data first
  organisation subtree + occurredFrom/occurredTo
  chunk by time and/or vehicle/driver
  import idempotently by sourceRecordKeyHash

Regular update:
  master data first
  prefer source-package watermark
  fallback to occurredAt overlap window if package metadata is insufficient
  import idempotently by sourceRecordKeyHash
```

This means the EventHub acquisition package is an **extraction package**, while the original tachograph card/VU package should be preserved as source metadata in payload or later in a dedicated source-package table.
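The "import idempotently by sourceRecordKeyHash" step can be illustrated with a minimal sketch. It assumes the hash is a SHA-256 digest over the `|`-joined key parts and that an in-memory set stands in for a unique constraint in the database; the class name, the method names, the choice of EventSource fields and the hash algorithm are all hypothetical and may differ from the actual service.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

// Sketch only: names, field selection and hash algorithm are assumptions.
final class SourceRecordKeySketch {

    // Builds a deterministic key from tenantKey + EventSource + externalSourceEventId.
    public static String sourceRecordKeyHash(String tenantKey, String providerKey,
                                             String sourceKind, String sourceKey,
                                             String sourceInstanceKey,
                                             String externalSourceEventId) {
        String joined = String.join("|", tenantKey, providerKey, sourceKind,
                sourceKey, sourceInstanceKey, externalSourceEventId);
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(joined.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required to be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    // In-memory stand-in for a unique index on the hash column.
    private final Set<String> importedHashes = new HashSet<>();

    // Returns true if the record is new, false if the same source event was already imported.
    public boolean importOnce(String hash) {
        return importedHashes.add(hash);
    }

    public static void main(String[] args) {
        SourceRecordKeySketch store = new SourceRecordKeySketch();
        String hash = sourceRecordKeyHash("kralowetz", "TACHOGRAPH", "VEHICLE_UNIT",
                "TACHOGRAPH_VEHICLE_UNIT", "main-tachograph-db",
                "TACHOGRAPH:VEHICLE_UNIT:activity:456:start");
        System.out.println("first import accepted:  " + store.importOnce(hash));
        System.out.println("second import accepted: " + store.importOnce(hash));
    }
}
```

Re-running the same chunk or an overlapping window then only re-offers hashes that are already known, which is what makes the overlap fallback safe to repeat.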
## Existing package-level normalized event ingestion

```http
POST /api/eventhub/acquisition/packages
```

Example:

```json
{
  "package": {
    "tenantKey": "kralowetz",
    "eventSource": {
      "providerKey": "TACHOGRAPH",
      "sourceKind": "VEHICLE_UNIT",
      "sourceKey": "TACHOGRAPH_VEHICLE_UNIT",
      "sourceInstanceKey": "main-tachograph-db",
      "tenantProviderSettingKey": "kralowetz-tachograph-prod"
    },
    "sourceGroup": {
      "type": "ORGANISATION",
      "sourceEntityId": "147"
    },
    "importScope": {
      "type": "SOURCE_ORGANISATION_SUBTREE",
      "rootSourceOrganisation": {
        "type": "ORGANISATION",
        "sourceEntityId": "147"
      },
      "includeChildren": true,
      "occurredFrom": "2026-04-28T00:00:00+02:00",
      "occurredTo": "2026-04-29T00:00:00+02:00"
    },
    "eventFamily": "DRIVER_ACTIVITY",
    "businessDate": "2026-04-28",
    "externalPackageId": "TACHOGRAPH:ORG-147-SUBTREE:DRIVER_ACTIVITY:2026-04-28"
  },
  "events": [
    {
      "externalSourceEventId": "TACHOGRAPH:VEHICLE_UNIT:activity:456:start",
      "driverRef": {
        "sourceEntityId": "driver-100",
        "driverCard": { "nation": "AT", "number": "D123456789" }
      },
      "vehicleRef": {
        "sourceEntityId": "vehicle-200",
        "vin": "WDB9634031L123456",
        "vehicleRegistration": { "nation": "AT", "number": "W-12345" }
      },
      "occurredAt": "2026-04-28T08:00:00+02:00",
      "eventDomain": "DRIVER_ACTIVITY",
      "eventType": "DRIVE",
      "lifecycle": "START",
      "eventDetails": {
        "type": "DRIVER_ACTIVITY",
        "attributes": {
          "cardSlot": "DRIVER",
          "cardStatus": "INSERTED",
          "drivingStatus": "SINGLE"
        }
      },
      "payload": {
        "raw": {
          "activity": 3,
          "cardSlot": 0,
          "cardStatus": 0,
          "drivingStatus": 0
        }
      }
    }
  ]
}
```

## Routes

```text
direct:yellowfox-d8-booking-input
direct:telematics-position-input
direct:tachograph-activity-input
direct:tachograph-import-start
direct:eventhub-package-input
direct:eventhub-manual-input
```

Common route:

```text
direct:eventhub-normalized-input
  -> validate EventHubEventDto
  -> create package key from tenant + EventSource + sourceGroup + importScope + eventFamily
  -> seda:eventhub-batch-input
  -> aggregate by eventhub.packageKey
  -> sort by occurredAt inside the batch
  -> EventHubIngestionService.ingest(...)
```

## Start PostgreSQL

```bash
docker compose up -d
```

## Run the service

```bash
mvn spring-boot:run
```

## Check acquisition packages

```sql
select
    p.received_at,
    p.tenant_key,
    s.provider_key,
    s.source_kind,
    s.source_key,
    p.source_group_type,
    p.source_group_entity_id,
    p.import_scope_type,
    p.root_source_org_entity_id,
    p.occurred_from,
    p.occurred_to,
    p.event_family,
    p.business_date,
    p.status,
    p.event_count
from eventhub.data_package p
join eventhub.event_source s on s.id = p.event_source_id
order by p.received_at desc;
```

## Check acquired events

```sql
select
    occurred_at,
    driver_source_entity_id,
    driver_card_nation,
    driver_card_number,
    vehicle_source_entity_id,
    vehicle_vin,
    vehicle_registration_nation,
    vehicle_registration_number,
    event_domain,
    event_type,
    lifecycle,
    event_signature_hash,
    event_details,
    payload
from eventhub.acquired_event
order by occurred_at desc;
```

## Next implementation steps

1. Add actual Camel/JDBC extraction routes behind the tachograph import plan.
2. Implement master-data acquisition first: organisation tree, driver/card assignments, vehicle VIN/VRN assignments, driver/vehicle organisation assignment histories.
3. Implement initial backfill using organisation/time scope.
4. Implement incremental import using source-package watermark, with occurredAt overlap fallback.
5. Discuss query/read models later: source priority and gap filling across tachograph, YellowFox and other sources.
## Implemented tachograph ingestion run model

The tachograph import model now follows the agreed design:

```text
Initial import
  organisation subtree + occurredFrom/occurredTo
  chunked by time and/or entity
  idempotent inserts by sourceRecordKeyHash

Regular update
  refresh master data first
  prefer discovery of new/changed original tachograph source packages
  extract affected event families
  import idempotently
  advance cursor after successful extraction

Package model
  import_run = one execution of the tachograph importer
  data_package = one EventHub extraction batch
  sourcePackageRef = original tachograph driver-card/VU package reference

Deduplication
  no cross-source deduplication
  sourceRecordKeyHash prevents same-source duplicate imports
  eventSignatureHash is non-unique and only helps later query/projection logic
```

### Import run vs tachograph source package

A tachograph database already contains packages from driver cards and vehicle-unit devices. The EventHub model does not force those original packages to become EventHub packages.

Instead:

```text
Original tachograph source package
  card/VU package imported into tachograph DB
  stored as sourcePackageRef when extracting events

EventHub data package
  extraction batch produced by an EventHub import run
  grouped by tenant, EventSource, event family, extraction code and time chunk
```

This allows a single EventHub import run to process many original tachograph packages, and a single extraction batch may contain rows from many original packages.

If the SQL extractor can return source package metadata, it should populate `EventHubEventDto.sourcePackageRef`:

```json
"sourcePackageRef": {
  "packageKind": "VEHICLE_UNIT",
  "sourcePackageId": "VU-PACKAGE-12345",
  "sourceEntityId": "vehicle-90021",
  "packagePeriodFrom": "2026-04-01T00:00:00+02:00",
  "packagePeriodTo": "2026-04-15T00:00:00+02:00",
  "importedIntoSourceAt": "2026-04-16T10:30:00+02:00"
}
```

### Tachograph import endpoints

`POST /api/eventhub/acquisition/tachograph/imports/plan` returns the calculated event-family extraction plan and time chunks.

`POST /api/eventhub/acquisition/tachograph/imports/start` now creates:

```text
1 import_run row
N planned data_package rows, one per extraction definition and time chunk
```

The SQL extraction routes are intentionally separated from run planning. They should pick planned extraction packages, execute the corresponding SQL, map rows to `EventHubEventDto`, set `sourcePackageRef` when known, and send them to `direct:eventhub-normalized-input`.
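How "N planned data_package rows, one per extraction definition and time chunk" comes about can be sketched as a cross product of extraction codes and half-open time chunks. This is a minimal illustration, assuming a fixed chunk length in days; the class, record and method names are hypothetical and are not taken from the project.

```java
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;

// Sketch only: type and method names are assumptions, not the project's API.
final class ImportPlanSketch {

    // Half-open interval: from is inclusive, to is exclusive.
    public record TimeChunk(OffsetDateTime from, OffsetDateTime to) {}

    public record PlannedPackage(String extractionCode, TimeChunk chunk) {}

    // Splits [from, to) into consecutive chunks of at most chunkDays days.
    public static List<TimeChunk> chunk(OffsetDateTime from, OffsetDateTime to, int chunkDays) {
        if (chunkDays <= 0) {
            throw new IllegalArgumentException("chunkDays must be positive");
        }
        List<TimeChunk> chunks = new ArrayList<>();
        OffsetDateTime cursor = from;
        while (cursor.isBefore(to)) {
            OffsetDateTime next = cursor.plusDays(chunkDays);
            if (next.isAfter(to)) {
                next = to; // the last chunk is shortened to the scope end
            }
            chunks.add(new TimeChunk(cursor, next));
            cursor = next;
        }
        return chunks;
    }

    // One planned package per extraction code and time chunk.
    public static List<PlannedPackage> plan(List<String> extractionCodes, List<TimeChunk> chunks) {
        List<PlannedPackage> planned = new ArrayList<>();
        for (String code : extractionCodes) {
            for (TimeChunk c : chunks) {
                planned.add(new PlannedPackage(code, c));
            }
        }
        return planned;
    }

    public static void main(String[] args) {
        List<TimeChunk> chunks = chunk(
                OffsetDateTime.parse("2026-01-01T00:00:00+01:00"),
                OffsetDateTime.parse("2026-02-01T00:00:00+01:00"),
                7);
        List<PlannedPackage> planned = plan(List.of("VU_ACTIVITY", "CARD_ACTIVITY"), chunks);
        planned.forEach(System.out::println);
    }
}
```

With a 31-day scope and 7-day chunks this yields five chunks (the last one shortened), so two extraction codes produce ten planned packages. Keeping the chunks half-open matches the inclusive `occurredFrom` / exclusive `occurredTo` convention, so an event on a chunk boundary belongs to exactly one chunk.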
### Initial import

Example initial import request:

```json
{
  "tenantKey": "kralowetz",
  "eventSource": {
    "providerKey": "TACHOGRAPH",
    "sourceKind": "MIXED",
    "sourceKey": "TACHOGRAPH_DB",
    "sourceInstanceKey": "tachograph-prod-db",
    "tenantProviderSettingKey": "kralowetz-tachograph-prod"
  },
  "sourceGroup": {
    "type": "ORGANISATION",
    "sourceEntityId": "147",
    "code": "147",
    "name": "Kralowetz"
  },
  "importScope": {
    "type": "SOURCE_ORGANISATION_SUBTREE",
    "rootSourceOrganisation": {
      "type": "ORGANISATION",
      "sourceEntityId": "147"
    },
    "includeChildren": true,
    "occurredFrom": "2026-01-01T00:00:00+01:00",
    "occurredTo": "2026-02-01T00:00:00+01:00"
  },
  "eventFamilies": [
    "DRIVER_ACTIVITY",
    "DRIVER_CARD",
    "POSITION",
    "BORDER_CROSSING",
    "LOAD_UNLOAD",
    "PLACE",
    "SPECIFIC_CONDITION",
    "SPEEDING"
  ],
  "mode": "INITIAL_BACKFILL",
  "refreshMasterDataFirst": true,
  "acquisitionStrategy": "OCCURRED_AT_WINDOW_WITH_OVERLAP"
}
```

The plan service chunks the occurred-time range using `eventhub.tachograph.default-chunk-days`.

### Regular update

For regular updates, the preferred mode is:

```json
{
  "mode": "INCREMENTAL_UPDATE",
  "refreshMasterDataFirst": true,
  "acquisitionStrategy": "SOURCE_PACKAGE_WATERMARK"
}
```

This is preferred because newly imported original driver-card/VU packages can contain older occurredAt events. A simple occurredAt watermark would miss such late-arriving historical data.

The `eventhub.import_cursor` table stores source-package, source-row and occurredAt fallback watermarks per tenant/source/scope/event family/source kind.
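The difference between the two watermark styles can be shown with a small in-memory comparison. This is only a sketch: the `SourceEvent` record and both selection methods are hypothetical, and a real extractor would express the same filters in SQL against the tachograph DB.

```java
import java.time.Instant;
import java.util.List;

// Sketch only: illustrates why an occurredAt watermark misses late-arriving packages.
final class WatermarkSketch {

    // An event row together with the import time of the original package it came from.
    public record SourceEvent(String id, Instant occurredAt, Instant importedIntoSourceAt) {}

    // Naive incremental selection: only events that occurred after the last run.
    public static List<String> byOccurredAt(List<SourceEvent> events, Instant watermark) {
        return events.stream()
                .filter(e -> e.occurredAt().isAfter(watermark))
                .map(SourceEvent::id)
                .toList();
    }

    // Package-based selection: all events of packages imported after the last run.
    public static List<String> bySourcePackage(List<SourceEvent> events, Instant watermark) {
        return events.stream()
                .filter(e -> e.importedIntoSourceAt().isAfter(watermark))
                .map(SourceEvent::id)
                .toList();
    }

    public static void main(String[] args) {
        Instant lastRun = Instant.parse("2026-04-10T00:00:00Z");
        List<SourceEvent> events = List.of(
                // Old event, but its card/VU package was downloaded after the last run.
                new SourceEvent("late-old-event",
                        Instant.parse("2026-04-02T08:00:00Z"), Instant.parse("2026-04-16T10:30:00Z")),
                new SourceEvent("recent-event",
                        Instant.parse("2026-04-12T08:00:00Z"), Instant.parse("2026-04-16T10:30:00Z")),
                // Already covered by the previous run.
                new SourceEvent("already-imported",
                        Instant.parse("2026-04-05T08:00:00Z"), Instant.parse("2026-04-06T09:00:00Z")));

        System.out.println("occurredAt watermark:     " + byOccurredAt(events, lastRun));
        System.out.println("source-package watermark: " + bySourcePackage(events, lastRun));
    }
}
```

In this example the occurredAt filter returns only `recent-event`, while the package filter also returns `late-old-event`, which is the case the source-package watermark is meant to cover.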
### Extraction route contract

A future concrete SQL extraction route should do this:

```text
planned data_package
  -> execute SQL for extraction_code and chunk/import scope
  -> map source rows to EventHubEventDto
  -> populate sourcePackageRef if source package metadata is available
  -> send to direct:eventhub-normalized-input
  -> only advance eventhub.import_cursor after successful import
```

## Configurable scheduled tachograph imports

The project now supports configuration-driven tachograph import plans. A configured plan describes:

```text
tenant
EventSource
optional sourceGroup, e.g. tachograph root organisation
ImportScope, including organisation subtree and occurred-time filter
event families
initial backfill strategy
scheduled incremental strategy
cron schedule
```

Example configuration is included in `application.yml` under:

```yaml
eventhub:
  tachograph:
    scheduler-enabled: false
    scheduler-poll-interval-ms: 60000
    scheduler-trigger-mode: PLAN_ONLY
    import-plans:
      - plan-key: kralowetz-tachograph-org-147
        enabled: false
        cron: "0 15 * * * *"
        tenant-key: kralowetz
        event-source:
          provider-key: TACHOGRAPH
          source-kind: MIXED
          source-key: TACHOGRAPH_DB
          source-instance-key: tachograph-prod-db
          tenant-provider-setting-key: kralowetz-tachograph-prod
        import-scope:
          type: SOURCE_ORGANISATION_SUBTREE
          root-source-organisation:
            type: ORGANISATION
            source-entity-id: "147"
          include-children: true
          occurred-from: null
          occurred-to: null
        event-families:
          - DRIVER_ACTIVITY
          - DRIVER_CARD
          - POSITION
        initial-mode: INITIAL_BACKFILL
        scheduled-mode: INCREMENTAL_UPDATE
        initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP
        scheduled-strategy: SOURCE_PACKAGE_WATERMARK
        refresh-master-data-first: true
        initial-occurred-from: "2025-01-01T00:00:00+01:00"
        run-initial-on-startup: false
```

`PLAN_ONLY` creates an `import_run` plus planned extraction `data_package` rows. `EXECUTE` also invokes the configured `TachographExtractionBatchExecutor`.
The generated project provides a no-op executor as an extension point; replace it with a SQL/JDBC extractor that reads the real tachograph DB.

Configured plan endpoints:

```http
GET  /api/eventhub/acquisition/tachograph/imports/configured-plans
GET  /api/eventhub/acquisition/tachograph/imports/configured-plans/{planKey}
POST /api/eventhub/acquisition/tachograph/imports/configured-plans/{planKey}/start?triggerMode=PLAN_ONLY
POST /api/eventhub/acquisition/tachograph/imports/configured-plans/{planKey}/start?triggerMode=EXECUTE
```

Manual start from a configured plan can override mode and strategy:

```http
POST /api/eventhub/acquisition/tachograph/imports/configured-plans/kralowetz-tachograph-org-147/start?mode=INCREMENTAL_UPDATE&strategy=SOURCE_PACKAGE_WATERMARK&triggerMode=PLAN_ONLY
```

## Concrete extraction extension point

The scheduler and import-run service are now implemented, but the generated skeleton still does not know the real tachograph DB SQL. The extension point is:

```java
TachographExtractionBatchExecutor
```

Replace `NoopTachographExtractionBatchExecutor` with an implementation that:

```text
1. receives importRunId, packageId, TachographImportRequest, planItem and time chunk
2. uses planItem.extractionCode to select the SQL statement
3. applies importScope organisation and occurred-time filters
4. applies source-package watermark or source-row watermark for incremental updates
5. maps rows to EventHubEventDto
6. sets sourcePackageRef when the row can be traced to an original card/VU package
7. sends events to direct:eventhub-normalized-input or EventHubIngestionService
8. returns TachographExtractionBatchResultDto with cursor watermarks
```

The import cursor is advanced only when the executor reports `executed=true`. The default no-op executor returns `executed=false`, so it does not move cursors accidentally.

## JDBC tachograph extraction

The first concrete extractor is `JdbcTachographExtractionBatchExecutor`.
It is enabled only when `eventhub.tachograph.datasource.jdbc-url` is configured. Without that datasource, the application keeps using `NoopTachographExtractionBatchExecutor`.

Currently implemented extraction definitions:

```text
CARD_ACTIVITY           -> DRIVER_ACTIVITY / DRIVER_CARD / CardActivity
VU_ACTIVITY             -> DRIVER_ACTIVITY / VEHICLE_UNIT / VUActivity
CARD_VEHICLES_USED      -> DRIVER_CARD / DRIVER_CARD / CardVehiclesUsed
IW_CYCLE                -> DRIVER_CARD / VEHICLE_UNIT / IWCycle
CARD_BORDER_CROSSING    -> BORDER_CROSSING / DRIVER_CARD / CardBorderCrossing
VU_BORDER_CROSSING      -> BORDER_CROSSING / VEHICLE_UNIT / VUBorderCrossing
CARD_LOAD_UNLOAD        -> LOAD_UNLOAD / DRIVER_CARD / CardLoadUnload
VU_LOAD_UNLOAD          -> LOAD_UNLOAD / VEHICLE_UNIT / VULoadUnload
CARD_SPECIFIC_CONDITION -> SPECIFIC_CONDITION / DRIVER_CARD / CardSpecificCondition
VU_SPECIFIC_CONDITION   -> SPECIFIC_CONDITION / VEHICLE_UNIT / VUSpecificCondition
```

SQL resources:

```text
src/main/resources/sql/tachograph/card-border-crossing.sql
src/main/resources/sql/tachograph/card-load-unload.sql
src/main/resources/sql/tachograph/card-specific-condition.sql
src/main/resources/sql/tachograph/card-vehicles-used.sql
src/main/resources/sql/tachograph/card-activity.sql
src/main/resources/sql/tachograph/iw-cycle.sql
src/main/resources/sql/tachograph/vu-border-crossing.sql
src/main/resources/sql/tachograph/vu-load-unload.sql
src/main/resources/sql/tachograph/vu-specific-condition.sql
src/main/resources/sql/tachograph/vu-activity.sql
```

These files are the schema-specific layer. They must keep the documented column aliases because the row mappers consume those aliases to build `EventHubEventDto`, including `sourcePackageRef`, driver/vehicle references, event details and raw payload.
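Why the aliases matter can be seen in a minimal mapper sketch. The alias names used here (`external_source_event_id`, `occurred_at`, `source_package_id`), the `MappedEvent` record and the class itself are hypothetical placeholders, not the project's documented aliases or DTOs; row values are simplified to strings, whereas a real JDBC mapper would read typed columns.

```java
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;

// Sketch only: alias names and types are assumptions for illustration.
final class AliasRowMapperSketch {

    // Hypothetical aliases that every extraction SQL file would have to provide.
    static final List<String> REQUIRED_ALIASES =
            List.of("external_source_event_id", "occurred_at");

    // Simplified stand-in for the fields a real mapper would put into EventHubEventDto.
    public record MappedEvent(String externalSourceEventId,
                              OffsetDateTime occurredAt,
                              String sourcePackageId) {}

    // Maps one result row (alias -> value) and fails fast when a required alias is absent.
    public static MappedEvent map(Map<String, Object> row) {
        for (String alias : REQUIRED_ALIASES) {
            if (row.get(alias) == null) {
                throw new IllegalArgumentException(
                        "SQL result is missing required column alias: " + alias);
            }
        }
        Object packageId = row.get("source_package_id"); // optional in this sketch
        return new MappedEvent(
                row.get("external_source_event_id").toString(),
                OffsetDateTime.parse(row.get("occurred_at").toString()),
                packageId == null ? null : packageId.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of(
                "external_source_event_id", "TACHOGRAPH:VEHICLE_UNIT:activity:456:start",
                "occurred_at", "2026-04-28T08:00:00+02:00",
                "source_package_id", "VU-PACKAGE-12345");
        System.out.println(map(row));
    }
}
```

Renaming a column in one of the SQL files without keeping its alias would surface here as an immediate mapping error rather than as silently empty fields.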