From c52712f8811e5cd310787ea70ae3e5a3e72cda1b Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Thu, 30 Apr 2026 12:40:12 +0200 Subject: [PATCH] Add tachograph import run planning --- README.md | 143 +++++++++++++++ .../api/EventHubIngestionController.java | 15 +- .../camel/TachographImportRequestRoute.java | 16 +- .../eventhub/config/EventHubProperties.java | 29 ++++ .../eventhub/dto/DataPackageStatus.java | 2 + .../procon/eventhub/dto/EventHubEventDto.java | 6 + .../procon/eventhub/dto/ImportRunStatus.java | 8 + .../eventhub/dto/SourcePackageRefDto.java | 40 +++++ .../eventhub/dto/TachographImportPlanDto.java | 1 + .../dto/TachographImportPlanItemDto.java | 3 +- .../dto/TachographImportRunResultDto.java | 13 ++ .../at/procon/eventhub/dto/TimeChunkDto.java | 15 ++ .../persistence/DataPackageRepository.java | 160 +++++++++++++---- .../eventhub/persistence/EventRepository.java | 45 +++-- .../persistence/ImportRunRepository.java | 99 +++++++++++ .../TachographActivityEventMapper.java | 1 + .../TachographImportExecutionService.java | 164 ++++++++++++++++++ .../service/TachographImportPlanService.java | 84 +++++++-- .../TelematicsPositionEventMapper.java | 1 + .../YellowFoxD8BookingEventMapper.java | 1 + src/main/resources/application.yml | 3 + .../migration/V1__create_eventhub_schema.sql | 61 +++++-- .../EventAcquisitionRecordKeyServiceTest.java | 1 + 23 files changed, 817 insertions(+), 94 deletions(-) create mode 100644 src/main/java/at/procon/eventhub/dto/ImportRunStatus.java create mode 100644 src/main/java/at/procon/eventhub/dto/SourcePackageRefDto.java create mode 100644 src/main/java/at/procon/eventhub/dto/TachographImportRunResultDto.java create mode 100644 src/main/java/at/procon/eventhub/dto/TimeChunkDto.java create mode 100644 src/main/java/at/procon/eventhub/persistence/ImportRunRepository.java create mode 100644 src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java diff --git a/README.md b/README.md index df58610..44270d8 100644 --- a/README.md +++ b/README.md @@ -568,3 +568,146 @@ order by occurred_at desc; 3. Implement initial backfill using organisation/time scope. 4. Implement incremental import using source-package watermark, with occurredAt overlap fallback. 5. Discuss query/read models later: source priority and gap filling across tachograph, YellowFox and other sources. + +## Implemented tachograph ingestion run model + +The tachograph import model now follows the agreed design: + +```text +Initial import + organisation subtree + occurredFrom/occurredTo + chunked by time and/or entity + idempotent inserts by sourceRecordKeyHash + +Regular update + refresh master data first + prefer discovery of new/changed original tachograph source packages + extract affected event families + import idempotently + advance cursor after successful extraction + +Package model + import_run = one execution of the tachograph importer + data_package = one EventHub extraction batch + sourcePackageRef = original tachograph driver-card/VU package reference + +Deduplication + no cross-source deduplication + sourceRecordKeyHash prevents same-source duplicate imports + eventSignatureHash is non-unique and only helps later query/projection logic +``` + +### Import run vs tachograph source package + +A tachograph database already contains packages from driver cards and vehicle-unit devices. The EventHub model does not force those original packages to become EventHub packages. Instead: + +```text +Original tachograph source package + card/VU package imported into tachograph DB + stored as sourcePackageRef when extracting events + +EventHub data package + extraction batch produced by an EventHub import run + grouped by tenant, EventSource, event family, extraction code and time chunk +``` + +This allows a single EventHub import run to process many original tachograph packages, and a single extraction batch may contain rows from many original packages. If the SQL extractor can return source package metadata, it should populate `EventHubEventDto.sourcePackageRef`: + +```json +"sourcePackageRef": { + "packageKind": "VEHICLE_UNIT", + "sourcePackageId": "VU-PACKAGE-12345", + "sourceEntityId": "vehicle-90021", + "packagePeriodFrom": "2026-04-01T00:00:00+02:00", + "packagePeriodTo": "2026-04-15T00:00:00+02:00", + "importedIntoSourceAt": "2026-04-16T10:30:00+02:00" +} +``` + +### Tachograph import endpoints + +`POST /api/eventhub/acquisition/tachograph/imports/plan` returns the calculated event-family extraction plan and time chunks. + +`POST /api/eventhub/acquisition/tachograph/imports/start` now creates: + +```text +1 import_run row +N planned data_package rows, one per extraction definition and time chunk +``` + +The SQL extraction routes are intentionally separated from run planning. They should pick planned extraction packages, execute the corresponding SQL, map rows to `EventHubEventDto`, set `sourcePackageRef` when known, and send them to `direct:eventhub-normalized-input`. + +### Initial import + +Example initial import request: + +```json +{ + "tenantKey": "kralowetz", + "eventSource": { + "providerKey": "TACHOGRAPH", + "sourceKind": "MIXED", + "sourceKey": "TACHOGRAPH_DB", + "sourceInstanceKey": "tachograph-prod-db", + "tenantProviderSettingKey": "kralowetz-tachograph-prod" + }, + "sourceGroup": { + "type": "ORGANISATION", + "sourceEntityId": "147", + "code": "147", + "name": "Kralowetz" + }, + "importScope": { + "type": "SOURCE_ORGANISATION_SUBTREE", + "rootSourceOrganisation": { + "type": "ORGANISATION", + "sourceEntityId": "147" + }, + "includeChildren": true, + "occurredFrom": "2026-01-01T00:00:00+01:00", + "occurredTo": "2026-02-01T00:00:00+01:00" + }, + "eventFamilies": [ + "DRIVER_ACTIVITY", + "DRIVER_CARD", + "POSITION", + "BORDER_CROSSING", + "LOAD_UNLOAD", + "PLACE", + "SPECIFIC_CONDITION", + "SPEEDING" + ], + "mode": "INITIAL_BACKFILL", + "refreshMasterDataFirst": true, + "acquisitionStrategy": "OCCURRED_AT_WINDOW_WITH_OVERLAP" +} +``` + +The plan service chunks the occurred-time range using `eventhub.tachograph.default-chunk-days`. + +### Regular update + +For regular updates, the preferred mode is: + +```json +{ + "mode": "INCREMENTAL_UPDATE", + "refreshMasterDataFirst": true, + "acquisitionStrategy": "SOURCE_PACKAGE_WATERMARK" +} +``` + +This is preferred because newly imported original driver-card/VU packages can contain older occurredAt events. A simple occurredAt watermark would miss such late-arriving historical data. The `eventhub.import_cursor` table stores source-package, source-row and occurredAt fallback watermarks per tenant/source/scope/event family/source kind. + +### Extraction route contract + +A future concrete SQL extraction route should do this: + +```text +planned data_package + -> execute SQL for extraction_code and chunk/import scope + -> map source rows to EventHubEventDto + -> populate sourcePackageRef if source package metadata is available + -> send to direct:eventhub-normalized-input + -> only advance eventhub.import_cursor after successful import +``` diff --git a/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java b/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java index d457c7e..25ccacc 100644 --- a/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java +++ b/src/main/java/at/procon/eventhub/api/EventHubIngestionController.java @@ -3,6 +3,7 @@ package at.procon.eventhub.api; import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubPackageIngestRequest; import at.procon.eventhub.dto.TachographImportRequest; +import at.procon.eventhub.dto.TachographImportRunResultDto; import at.procon.eventhub.dto.source.TachographActivityDto; import at.procon.eventhub.dto.source.TelematicsPositionDto; import at.procon.eventhub.dto.source.YellowFoxD8BookingDto; @@ -53,13 +54,13 @@ public class EventHubIngestionController { } @PostMapping("/tachograph/imports/start") - public ResponseEntity> startTachographImport(@Valid @RequestBody TachographImportRequest request) { - producerTemplate.sendBody("direct:tachograph-import-start", request); - return ResponseEntity.accepted().body(Map.of( - "accepted", true, - "route", "direct:tachograph-import-start", - "note", "The current implementation prepares the tachograph import plan. SQL extraction routes are intentionally scaffolded as next step." - )); + public ResponseEntity startTachographImport(@Valid @RequestBody TachographImportRequest request) { + TachographImportRunResultDto result = producerTemplate.requestBody( + "direct:tachograph-import-start", + request, + TachographImportRunResultDto.class + ); + return ResponseEntity.accepted().body(result); } @PostMapping("/packages") diff --git a/src/main/java/at/procon/eventhub/camel/TachographImportRequestRoute.java b/src/main/java/at/procon/eventhub/camel/TachographImportRequestRoute.java index 9c55cc1..b4a9e5e 100644 --- a/src/main/java/at/procon/eventhub/camel/TachographImportRequestRoute.java +++ b/src/main/java/at/procon/eventhub/camel/TachographImportRequestRoute.java @@ -1,7 +1,7 @@ package at.procon.eventhub.camel; import at.procon.eventhub.dto.TachographImportRequest; -import at.procon.eventhub.service.TachographImportPlanService; +import at.procon.eventhub.service.TachographImportExecutionService; import org.apache.camel.builder.RouteBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,10 +12,10 @@ public class TachographImportRequestRoute extends RouteBuilder { private static final Logger log = LoggerFactory.getLogger(TachographImportRequestRoute.class); - private final TachographImportPlanService planService; + private final TachographImportExecutionService executionService; - public TachographImportRequestRoute(TachographImportPlanService planService) { - this.planService = planService; + public TachographImportRequestRoute(TachographImportExecutionService executionService) { + this.executionService = executionService; } @Override @@ -24,10 +24,10 @@ public class TachographImportRequestRoute extends RouteBuilder { .routeId("tachograph-import-start-route") .process(exchange -> { TachographImportRequest request = exchange.getMessage().getBody(TachographImportRequest.class); - var plan = planService.createPlan(request); - log.info("Prepared tachograph import plan tenant={} mode={} strategy={} scope={} itemCount={}", - plan.tenantKey(), plan.mode(), plan.acquisitionStrategy(), plan.importScope().stableKey(), plan.items().size()); - exchange.getMessage().setBody(plan); + var result = executionService.startImport(request); + log.info("Prepared tachograph import run importRunId={} plannedPackages={} status={}", + result.importRunId(), result.plannedPackageCount(), result.status()); + exchange.getMessage().setBody(result); }); } } diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index e115483..088f29d 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -7,11 +7,16 @@ import org.springframework.boot.context.properties.ConfigurationProperties; public class EventHubProperties { private final Batch batch = new Batch(); + private final Tachograph tachograph = new Tachograph(); public Batch getBatch() { return batch; } + public Tachograph getTachograph() { + return tachograph; + } + public static class Batch { /** Number of events collected before a package is persisted. */ private int completionSize = 1000; @@ -35,4 +40,28 @@ public class EventHubProperties { this.completionTimeout = completionTimeout; } } + + public static class Tachograph { + /** Default chunk size for initial/backfill occurred-time imports. */ + private int defaultChunkDays = 1; + + /** Overlap used by occurred-at fallback incremental imports. */ + private Duration occurredAtOverlap = Duration.ofDays(7); + + public int getDefaultChunkDays() { + return defaultChunkDays; + } + + public void setDefaultChunkDays(int defaultChunkDays) { + this.defaultChunkDays = Math.max(1, defaultChunkDays); + } + + public Duration getOccurredAtOverlap() { + return occurredAtOverlap; + } + + public void setOccurredAtOverlap(Duration occurredAtOverlap) { + this.occurredAtOverlap = occurredAtOverlap; + } + } } diff --git a/src/main/java/at/procon/eventhub/dto/DataPackageStatus.java b/src/main/java/at/procon/eventhub/dto/DataPackageStatus.java index f6e3495..c49ac85 100644 --- a/src/main/java/at/procon/eventhub/dto/DataPackageStatus.java +++ b/src/main/java/at/procon/eventhub/dto/DataPackageStatus.java @@ -1,7 +1,9 @@ package at.procon.eventhub.dto; public enum DataPackageStatus { + PLANNED, IMPORTING, IMPORTED, + EMPTY, FAILED } diff --git a/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java b/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java index 9e11d51..c1eff55 100644 --- a/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java +++ b/src/main/java/at/procon/eventhub/dto/EventHubEventDto.java @@ -40,6 +40,9 @@ public record EventHubEventDto( /** Normalized semantic details depending on eventDomain/eventType. */ @Valid EventDetailsDto eventDetails, + /** Optional reference to the original source package/card/VU download containing this source record. */ + @Valid SourcePackageRefDto sourcePackageRef, + /** Raw/provider-specific payload, stored as real JSON and not as encoded JSON string. */ JsonNode payload, @@ -71,6 +74,7 @@ public record EventHubEventDto( odometerM, position, eventDetails, + sourcePackageRef, payload, manualEntry, packageInfo @@ -92,6 +96,7 @@ public record EventHubEventDto( odometerM, position, eventDetails, + sourcePackageRef, payload, manualEntry, packageInfo @@ -113,6 +118,7 @@ public record EventHubEventDto( odometerM, position, eventDetails, + sourcePackageRef, payload, manualEntry, value diff --git a/src/main/java/at/procon/eventhub/dto/ImportRunStatus.java b/src/main/java/at/procon/eventhub/dto/ImportRunStatus.java new file mode 100644 index 0000000..760de05 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/ImportRunStatus.java @@ -0,0 +1,8 @@ +package at.procon.eventhub.dto; + +public enum ImportRunStatus { + PLANNED, + RUNNING, + COMPLETED, + FAILED +} diff --git a/src/main/java/at/procon/eventhub/dto/SourcePackageRefDto.java b/src/main/java/at/procon/eventhub/dto/SourcePackageRefDto.java new file mode 100644 index 0000000..4614328 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/SourcePackageRefDto.java @@ -0,0 +1,40 @@ +package at.procon.eventhub.dto; + +import java.time.OffsetDateTime; + +/** + * Reference to the original source package from which a source record was read. + * + * For tachograph data this is the original driver-card or vehicle-unit package + * already imported into the tachograph DB. EventHub data packages are extraction + * batches and are intentionally not forced to be identical to this source package. + */ +public record SourcePackageRefDto( + String packageKind, + String sourcePackageId, + String sourceEntityId, + OffsetDateTime packagePeriodFrom, + OffsetDateTime packagePeriodTo, + OffsetDateTime importedIntoSourceAt +) { + public SourcePackageRefDto { + packageKind = normalize(packageKind); + sourcePackageId = normalizeNullable(sourcePackageId); + sourceEntityId = normalizeNullable(sourceEntityId); + } + + public boolean hasAnyReference() { + return sourcePackageId != null || sourceEntityId != null || importedIntoSourceAt != null; + } + + private static String normalize(String value) { + if (value == null || value.isBlank()) { + return value; + } + return value.trim().toUpperCase().replace('-', '_').replace(' ', '_'); + } + + private static String normalizeNullable(String value) { + return value == null || value.isBlank() ? null : value.trim(); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/TachographImportPlanDto.java b/src/main/java/at/procon/eventhub/dto/TachographImportPlanDto.java index 7354ebd..f1fc876 100644 --- a/src/main/java/at/procon/eventhub/dto/TachographImportPlanDto.java +++ b/src/main/java/at/procon/eventhub/dto/TachographImportPlanDto.java @@ -10,6 +10,7 @@ public record TachographImportPlanDto( ImportScopeDto importScope, SourceGroupRefDto sourceGroup, EventSourceDto eventSource, + List chunks, List items ) { } diff --git a/src/main/java/at/procon/eventhub/dto/TachographImportPlanItemDto.java b/src/main/java/at/procon/eventhub/dto/TachographImportPlanItemDto.java index 9dac5e4..4ab35aa 100644 --- a/src/main/java/at/procon/eventhub/dto/TachographImportPlanItemDto.java +++ b/src/main/java/at/procon/eventhub/dto/TachographImportPlanItemDto.java @@ -8,6 +8,7 @@ public record TachographImportPlanItemDto( String extractionCode, List sourceTables, String entityAxis, - String description + String description, + AcquisitionStrategy preferredStrategy ) { } diff --git a/src/main/java/at/procon/eventhub/dto/TachographImportRunResultDto.java b/src/main/java/at/procon/eventhub/dto/TachographImportRunResultDto.java new file mode 100644 index 0000000..b5573e3 --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/TachographImportRunResultDto.java @@ -0,0 +1,13 @@ +package at.procon.eventhub.dto; + +import java.util.List; +import java.util.UUID; + +public record TachographImportRunResultDto( + UUID importRunId, + ImportRunStatus status, + int plannedPackageCount, + TachographImportPlanDto plan, + List plannedPackageIds +) { +} diff --git a/src/main/java/at/procon/eventhub/dto/TimeChunkDto.java b/src/main/java/at/procon/eventhub/dto/TimeChunkDto.java new file mode 100644 index 0000000..d483a7a --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/TimeChunkDto.java @@ -0,0 +1,15 @@ +package at.procon.eventhub.dto; + +import java.time.OffsetDateTime; + +public record TimeChunkDto( + int sequence, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo +) { + public String stableKey() { + String from = occurredFrom == null ? "MIN" : occurredFrom.toString(); + String to = occurredTo == null ? "MAX" : occurredTo.toString(); + return sequence + ":" + from + ":" + to; + } +} diff --git a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java index faba112..7fef2f9 100644 --- a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java @@ -5,6 +5,9 @@ import at.procon.eventhub.dto.DataPackageType; import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.SourceGroupRefDto; +import at.procon.eventhub.dto.SourcePackageRefDto; +import at.procon.eventhub.dto.TachographImportPlanItemDto; +import at.procon.eventhub.dto.TimeChunkDto; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.time.OffsetDateTime; @@ -32,48 +35,139 @@ public class DataPackageRepository { OffsetDateTime occurredFrom, OffsetDateTime occurredTo, Map metadata + ) { + return insertPackage( + eventSourceId, + null, + packageKey, + packageInfo, + packageType, + DataPackageStatus.IMPORTING, + occurredFrom, + occurredTo, + null, + null, + null, + null, + null, + null, + metadata + ); + } + + public UUID createPlannedExtractionPackage( + UUID importRunId, + int eventSourceId, + String packageKey, + EventHubPackageRequest packageInfo, + TachographImportPlanItemDto planItem, + TimeChunkDto chunk, + int batchNo, + Map metadata + ) { + return insertPackage( + eventSourceId, + importRunId, + packageKey, + packageInfo, + DataPackageType.DB_EXTRACT, + DataPackageStatus.PLANNED, + chunk == null ? null : chunk.occurredFrom(), + chunk == null ? null : chunk.occurredTo(), + planItem, + batchNo, + chunk == null ? null : chunk.occurredFrom(), + chunk == null ? null : chunk.occurredTo(), + null, + null, + metadata + ); + } + + private UUID insertPackage( + int eventSourceId, + UUID importRunId, + String packageKey, + EventHubPackageRequest packageInfo, + DataPackageType packageType, + DataPackageStatus status, + OffsetDateTime occurredFrom, + OffsetDateTime occurredTo, + TachographImportPlanItemDto planItem, + Integer batchNo, + OffsetDateTime chunkFrom, + OffsetDateTime chunkTo, + SourcePackageRefDto sourcePackageRef, + Integer eventCount, + Map metadata ) { UUID id = UUID.randomUUID(); SourceGroupRefDto sourceGroup = packageInfo == null ? null : packageInfo.sourceGroup(); ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope(); SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation(); - jdbcTemplate.update( - """ - insert into eventhub.data_package( - id, event_source_id, tenant_key, package_key, package_type, status, - source_group_type, source_group_entity_id, source_group_code, source_group_name, - import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name, - include_children, occurred_from, occurred_to, - event_family, business_date, external_package_id, - received_at, event_count, metadata - ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), 0, ?::jsonb) - """, - ps -> { + return jdbcTemplate.query( + con -> { + var ps = con.prepareStatement(""" + insert into eventhub.data_package( + id, event_source_id, import_run_id, tenant_key, package_key, package_type, status, + source_group_type, source_group_entity_id, source_group_code, source_group_name, + import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name, + include_children, occurred_from, occurred_to, + event_family, business_date, external_package_id, + extraction_code, extraction_source_kind, entity_axis, batch_no, chunk_from, chunk_to, + source_package_kind, source_package_id, source_package_entity_id, + source_package_period_from, source_package_period_to, source_package_imported_at, + received_at, event_count, metadata + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), ?, ?::jsonb) + on conflict (tenant_key, event_source_id, package_key) do update + set metadata = excluded.metadata + returning id + """); ps.setObject(1, id); ps.setInt(2, eventSourceId); - ps.setString(3, packageInfo == null ? "default" : packageInfo.tenantKey()); - ps.setString(4, packageKey); - ps.setString(5, packageType.name()); - ps.setString(6, DataPackageStatus.IMPORTING.name()); - ps.setString(7, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name()); - ps.setString(8, sourceGroup == null ? null : sourceGroup.sourceEntityId()); - ps.setString(9, sourceGroup == null ? null : sourceGroup.code()); - ps.setString(10, sourceGroup == null ? null : sourceGroup.name()); - ps.setString(11, importScope == null ? null : importScope.type().name()); - ps.setString(12, rootOrg == null ? null : rootOrg.sourceEntityId()); - ps.setString(13, rootOrg == null ? null : rootOrg.code()); - ps.setString(14, rootOrg == null ? null : rootOrg.name()); - ps.setBoolean(15, importScope != null && importScope.includeChildren()); - ps.setObject(16, occurredFrom); - ps.setObject(17, occurredTo); - ps.setString(18, packageInfo == null ? null : packageInfo.eventFamily()); - ps.setObject(19, packageInfo == null ? null : packageInfo.businessDate()); - ps.setString(20, packageInfo == null ? packageKey : packageInfo.externalPackageId()); - ps.setString(21, toJson(metadata)); + ps.setObject(3, importRunId); + ps.setString(4, packageInfo == null ? "default" : packageInfo.tenantKey()); + ps.setString(5, packageKey); + ps.setString(6, packageType.name()); + ps.setString(7, status.name()); + ps.setString(8, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name()); + ps.setString(9, sourceGroup == null ? null : sourceGroup.sourceEntityId()); + ps.setString(10, sourceGroup == null ? null : sourceGroup.code()); + ps.setString(11, sourceGroup == null ? null : sourceGroup.name()); + ps.setString(12, importScope == null || importScope.type() == null ? null : importScope.type().name()); + ps.setString(13, rootOrg == null ? null : rootOrg.sourceEntityId()); + ps.setString(14, rootOrg == null ? null : rootOrg.code()); + ps.setString(15, rootOrg == null ? null : rootOrg.name()); + ps.setBoolean(16, importScope != null && importScope.includeChildren()); + ps.setObject(17, occurredFrom); + ps.setObject(18, occurredTo); + ps.setString(19, packageInfo == null ? null : packageInfo.eventFamily()); + ps.setObject(20, packageInfo == null ? null : packageInfo.businessDate()); + ps.setString(21, packageInfo == null ? packageKey : packageInfo.externalPackageId()); + ps.setString(22, planItem == null ? null : planItem.extractionCode()); + ps.setString(23, planItem == null ? null : planItem.sourceKind()); + ps.setString(24, planItem == null ? null : planItem.entityAxis()); + ps.setObject(25, batchNo); + ps.setObject(26, chunkFrom); + ps.setObject(27, chunkTo); + ps.setString(28, sourcePackageRef == null ? null : sourcePackageRef.packageKind()); + ps.setString(29, sourcePackageRef == null ? null : sourcePackageRef.sourcePackageId()); + ps.setString(30, sourcePackageRef == null ? null : sourcePackageRef.sourceEntityId()); + ps.setObject(31, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodFrom()); + ps.setObject(32, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodTo()); + ps.setObject(33, sourcePackageRef == null ? null : sourcePackageRef.importedIntoSourceAt()); + ps.setInt(34, eventCount == null ? 0 : eventCount); + ps.setString(35, toJson(metadata)); + return ps; + }, + rs -> { + if (!rs.next()) { + throw new IllegalStateException("Could not create or resolve data package " + packageKey); + } + return (UUID) rs.getObject(1); } ); - return id; } public void markImported(UUID packageId, int insertedCount) { @@ -83,7 +177,7 @@ public class DataPackageRepository { set status = ?, event_count = ?, completed_at = now() where id = ? """, - DataPackageStatus.IMPORTED.name(), + insertedCount == 0 ? DataPackageStatus.EMPTY.name() : DataPackageStatus.IMPORTED.name(), insertedCount, packageId ); diff --git a/src/main/java/at/procon/eventhub/persistence/EventRepository.java b/src/main/java/at/procon/eventhub/persistence/EventRepository.java index 3320deb..30f7b90 100644 --- a/src/main/java/at/procon/eventhub/persistence/EventRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/EventRepository.java @@ -3,6 +3,7 @@ package at.procon.eventhub.persistence; import at.procon.eventhub.dto.DriverCardRefDto; import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.SourcePackageRefDto; import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto; import at.procon.eventhub.service.EventAcquisitionRecordKeyService; @@ -46,6 +47,8 @@ public class EventRepository { external_source_event_id, driver_source_entity_id, driver_card_nation, driver_card_number, vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number, + source_package_kind, source_package_id, source_package_entity_id, + source_package_period_from, source_package_period_to, source_package_imported_at, occurred_at, received_partner_at, received_hub_at, event_domain, event_type, lifecycle, odometer_m, latitude, longitude, @@ -59,6 +62,8 @@ public class EventRepository { ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, + ?, ?, ?, ?::jsonb, ?::jsonb, ?, ?, ? ) @@ -74,6 +79,7 @@ public class EventRepository { DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard(); VehicleRefDto vehicleRef = event.vehicleRef(); VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration(); + SourcePackageRefDto sourcePackageRef = event.sourcePackageRef(); ps.setObject(1, eventId); ps.setInt(2, eventSourceId); @@ -89,25 +95,32 @@ public class EventRepository { ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation()); ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number()); - ps.setObject(12, event.occurredAt()); - ps.setObject(13, event.receivedPartnerAt()); - ps.setObject(14, receivedHubAt); - ps.setString(15, event.eventDomain().name()); - ps.setString(16, event.eventType().name()); - ps.setString(17, event.lifecycle().name()); - setNullableLong(ps, 18, event.odometerM()); + ps.setString(12, sourcePackageRef == null ? null : sourcePackageRef.packageKind()); + ps.setString(13, sourcePackageRef == null ? null : sourcePackageRef.sourcePackageId()); + ps.setString(14, sourcePackageRef == null ? null : sourcePackageRef.sourceEntityId()); + ps.setObject(15, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodFrom()); + ps.setObject(16, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodTo()); + ps.setObject(17, sourcePackageRef == null ? null : sourcePackageRef.importedIntoSourceAt()); + + ps.setObject(18, event.occurredAt()); + ps.setObject(19, event.receivedPartnerAt()); + ps.setObject(20, receivedHubAt); + ps.setString(21, event.eventDomain().name()); + ps.setString(22, event.eventType().name()); + ps.setString(23, event.lifecycle().name()); + setNullableLong(ps, 24, event.odometerM()); if (event.position() == null) { - ps.setNull(19, Types.NUMERIC); - ps.setNull(20, Types.NUMERIC); + ps.setNull(25, Types.NUMERIC); + ps.setNull(26, Types.NUMERIC); } else { - ps.setObject(19, event.position().latitude()); - ps.setObject(20, event.position().longitude()); + ps.setObject(25, event.position().latitude()); + ps.setObject(26, event.position().longitude()); } - ps.setString(21, toJson(objectMapper.valueToTree(event.eventDetails()))); - ps.setString(22, toJson(event.payload())); - ps.setBoolean(23, event.manualEntry()); - ps.setString(24, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId)); - ps.setString(25, recordKeyService.buildEventSignatureHash(event)); + ps.setString(27, toJson(objectMapper.valueToTree(event.eventDetails()))); + ps.setString(28, toJson(event.payload())); + ps.setBoolean(29, event.manualEntry()); + ps.setString(30, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId)); + ps.setString(31, recordKeyService.buildEventSignatureHash(event)); } @Override diff --git a/src/main/java/at/procon/eventhub/persistence/ImportRunRepository.java b/src/main/java/at/procon/eventhub/persistence/ImportRunRepository.java new file mode 100644 index 0000000..d32b599 --- /dev/null +++ b/src/main/java/at/procon/eventhub/persistence/ImportRunRepository.java @@ -0,0 +1,99 @@ +package at.procon.eventhub.persistence; + +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportRunStatus; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupRefDto; +import at.procon.eventhub.dto.TachographImportRequest; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.Array; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Map; +import java.util.UUID; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +@Repository +public class ImportRunRepository { + + private final JdbcTemplate jdbcTemplate; + private final ObjectMapper objectMapper; + + public ImportRunRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) { + this.jdbcTemplate = jdbcTemplate; + this.objectMapper = objectMapper; + } + + public UUID createPlannedRun(int eventSourceId, TachographImportRequest request, Map metadata) { + UUID id = UUID.randomUUID(); + SourceGroupRefDto sourceGroup = request.sourceGroup(); + ImportScopeDto importScope = request.importScope(); + SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation(); + EventSourceDto eventSource = request.eventSource(); + + jdbcTemplate.update(con -> { + var ps = con.prepareStatement(""" + insert into eventhub.import_run( + id, tenant_key, event_source_id, mode, status, refresh_master_data_first, + source_group_type, source_group_entity_id, source_group_code, source_group_name, + import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name, + include_children, occurred_from, occurred_to, + requested_event_families, acquisition_strategy, metadata, planned_package_count + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, 0) + """); + ps.setObject(1, id); + ps.setString(2, request.tenantKey()); + ps.setInt(3, eventSourceId); + ps.setString(4, request.mode().name()); + ps.setString(5, ImportRunStatus.PLANNED.name()); + ps.setBoolean(6, request.refreshMasterDataFirst()); + ps.setString(7, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name()); + ps.setString(8, sourceGroup == null ? null : sourceGroup.sourceEntityId()); + ps.setString(9, sourceGroup == null ? null : sourceGroup.code()); + ps.setString(10, sourceGroup == null ? null : sourceGroup.name()); + ps.setString(11, importScope == null || importScope.type() == null ? null : importScope.type().name()); + ps.setString(12, rootOrg == null ? null : rootOrg.sourceEntityId()); + ps.setString(13, rootOrg == null ? null : rootOrg.code()); + ps.setString(14, rootOrg == null ? null : rootOrg.name()); + ps.setBoolean(15, importScope != null && importScope.includeChildren()); + ps.setObject(16, importScope == null ? null : importScope.occurredFrom()); + ps.setObject(17, importScope == null ? null : importScope.occurredTo()); + ps.setArray(18, eventFamilyArray(con, request)); + ps.setString(19, request.acquisitionStrategy().name()); + ps.setString(20, toJson(metadata == null ? Map.of() : metadata)); + return ps; + }); + return id; + } + + public void markRunning(UUID id) { + jdbcTemplate.update("update eventhub.import_run set status = ? where id = ?", ImportRunStatus.RUNNING.name(), id); + } + + public void markPlannedPackages(UUID id, int plannedPackageCount) { + jdbcTemplate.update("update eventhub.import_run set planned_package_count = ? where id = ?", plannedPackageCount, id); + } + + public void markCompleted(UUID id) { + jdbcTemplate.update("update eventhub.import_run set status = ?, finished_at = now() where id = ?", ImportRunStatus.COMPLETED.name(), id); + } + + public void markFailed(UUID id, String errorMessage) { + jdbcTemplate.update("update eventhub.import_run set status = ?, error_message = ?, finished_at = now() where id = ?", ImportRunStatus.FAILED.name(), errorMessage, id); + } + + private Array eventFamilyArray(Connection con, TachographImportRequest request) throws SQLException { + String[] values = request.eventFamilies().stream().map(Enum::name).toArray(String[]::new); + return con.createArrayOf("text", values); + } + + private String toJson(Map value) { + try { + return objectMapper.writeValueAsString(value == null ? Map.of() : value); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Cannot serialize import run metadata", e); + } + } +} diff --git a/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java b/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java index 3715659..f1aef55 100644 --- a/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java +++ b/src/main/java/at/procon/eventhub/service/TachographActivityEventMapper.java @@ -64,6 +64,7 @@ public class TachographActivityEventMapper { null, null, detailsFactory.driverActivity(source.cardSlot(), source.cardStatus(), source.drivingStatus()), + null, detailsFactory.payloadFromMap(source.payload()), false, packageInfo diff --git a/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java b/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java new file mode 100644 index 0000000..1aa85b6 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java @@ -0,0 +1,164 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.DataPackageType; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportRunStatus; +import at.procon.eventhub.dto.TachographImportPlanDto; +import at.procon.eventhub.dto.TachographImportPlanItemDto; +import at.procon.eventhub.dto.TachographImportRequest; +import at.procon.eventhub.dto.TachographImportRunResultDto; +import at.procon.eventhub.dto.TimeChunkDto; +import at.procon.eventhub.persistence.DataPackageRepository; +import at.procon.eventhub.persistence.EventSourceRepository; +import at.procon.eventhub.persistence.ImportRunRepository; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +/** + * Creates import runs and extraction data packages for tachograph acquisition. + * + * This service deliberately creates EventHub packages for extraction batches. The + * original tachograph card/VU package is not treated as the EventHub package; it + * is preserved later as SourcePackageRefDto on acquired events or in batch + * metadata when an extractor processes one concrete source package. + */ +@Service +public class TachographImportExecutionService { + + private static final Logger log = LoggerFactory.getLogger(TachographImportExecutionService.class); + + private final TachographImportPlanService planService; + private final EventSourceRepository eventSourceRepository; + private final ImportRunRepository importRunRepository; + private final DataPackageRepository dataPackageRepository; + + public TachographImportExecutionService( + TachographImportPlanService planService, + EventSourceRepository eventSourceRepository, + ImportRunRepository importRunRepository, + DataPackageRepository dataPackageRepository + ) { + this.planService = planService; + this.eventSourceRepository = eventSourceRepository; + this.importRunRepository = importRunRepository; + this.dataPackageRepository = dataPackageRepository; + } + + @Transactional + public TachographImportRunResultDto startImport(TachographImportRequest request) { + TachographImportPlanDto plan = planService.createPlan(request); + int baseEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), request.eventSource()); + UUID importRunId = importRunRepository.createPlannedRun(baseEventSourceId, request, Map.of( + "note", "Created tachograph import run and planned extraction packages. SQL extraction is handled by event-family routes.", + "packageModel", "EventHub data packages are extraction batches; original tachograph packages are SourcePackageRefDto." + )); + + List packageIds = new ArrayList<>(); + int batchNo = 1; + try { + for (TachographImportPlanItemDto item : plan.items()) { + EventSourceDto itemEventSource = eventSourceForItem(request.eventSource(), item); + int itemEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), itemEventSource); + for (TimeChunkDto chunk : plan.chunks()) { + EventHubPackageRequest packageInfo = packageRequestFor(request, itemEventSource, item, chunk); + String packageKey = packageKey(importRunId, packageInfo, item, chunk, batchNo); + UUID packageId = dataPackageRepository.createPlannedExtractionPackage( + importRunId, + itemEventSourceId, + packageKey, + packageInfo, + item, + chunk, + batchNo, + metadata(request, item, chunk, importRunId) + ); + packageIds.add(packageId); + batchNo++; + } + } + importRunRepository.markPlannedPackages(importRunId, packageIds.size()); + log.info("Created tachograph import run importRunId={} plannedPackages={} tenant={} mode={} strategy={}", + importRunId, packageIds.size(), request.tenantKey(), request.mode(), request.acquisitionStrategy()); + return new TachographImportRunResultDto(importRunId, ImportRunStatus.PLANNED, packageIds.size(), plan, List.copyOf(packageIds)); + } catch (RuntimeException ex) { + importRunRepository.markFailed(importRunId, ex.getMessage()); + throw ex; + } + } + + private EventSourceDto eventSourceForItem(EventSourceDto base, TachographImportPlanItemDto item) { + String sourceKind = item.sourceKind(); + String sourceKey = switch (sourceKind) { + case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT"; + case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD"; + default -> base.sourceKey(); + }; + return new EventSourceDto( + base.providerKey(), + sourceKind, + sourceKey, + base.sourceInstanceKey(), + base.tenantProviderSettingKey(), + base.externalFleetKey() + ); + } + + private EventHubPackageRequest packageRequestFor( + TachographImportRequest request, + EventSourceDto itemEventSource, + TachographImportPlanItemDto item, + TimeChunkDto chunk + ) { + return new EventHubPackageRequest( + request.tenantKey(), + itemEventSource, + request.sourceGroup(), + request.importScope(), + item.eventFamily().name(), + null, + externalPackageId(request, item, chunk) + ); + } + + private String externalPackageId(TachographImportRequest request, TachographImportPlanItemDto item, TimeChunkDto chunk) { + String scope = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(); + return "TACHOGRAPH:" + item.sourceKind() + ":" + item.extractionCode() + ":" + item.eventFamily() + + ":" + scope + ":CHUNK-" + chunk.sequence(); + } + + private String packageKey(UUID importRunId, EventHubPackageRequest packageInfo, TachographImportPlanItemDto item, TimeChunkDto chunk, int batchNo) { + return packageInfo.tenantKey() + + ":" + packageInfo.eventSource().stableKey() + + ":" + item.eventFamily() + + ":" + item.extractionCode() + + ":RUN-" + importRunId + + ":CHUNK-" + chunk.sequence() + + ":BATCH-" + batchNo; + } + + private Map metadata(TachographImportRequest request, TachographImportPlanItemDto item, TimeChunkDto chunk, UUID importRunId) { + Map metadata = new LinkedHashMap<>(); + metadata.put("importRunId", importRunId.toString()); + metadata.put("mode", request.mode().name()); + metadata.put("acquisitionStrategy", request.acquisitionStrategy().name()); + metadata.put("refreshMasterDataFirst", request.refreshMasterDataFirst()); + metadata.put("eventFamily", item.eventFamily().name()); + metadata.put("sourceKind", item.sourceKind()); + metadata.put("extractionCode", item.extractionCode()); + metadata.put("sourceTables", item.sourceTables()); + metadata.put("entityAxis", item.entityAxis()); + metadata.put("chunkSequence", chunk.sequence()); + metadata.put("chunkOccurredFrom", chunk.occurredFrom() == null ? null : chunk.occurredFrom().toString()); + metadata.put("chunkOccurredTo", chunk.occurredTo() == null ? null : chunk.occurredTo().toString()); + metadata.put("sourcePackageRefPolicy", "Original tachograph card/VU package is preserved per acquired event when SQL extraction returns it."); + return metadata; + } +} diff --git a/src/main/java/at/procon/eventhub/service/TachographImportPlanService.java b/src/main/java/at/procon/eventhub/service/TachographImportPlanService.java index 6c7c2ae..213a113 100644 --- a/src/main/java/at/procon/eventhub/service/TachographImportPlanService.java +++ b/src/main/java/at/procon/eventhub/service/TachographImportPlanService.java @@ -1,9 +1,15 @@ package at.procon.eventhub.service; +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.TachographImportPlanDto; import at.procon.eventhub.dto.TachographImportPlanItemDto; import at.procon.eventhub.dto.TachographImportRequest; +import at.procon.eventhub.dto.TimeChunkDto; +import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; import org.springframework.stereotype.Service; @@ -11,10 +17,16 @@ import org.springframework.stereotype.Service; @Service public class TachographImportPlanService { + private final EventHubProperties properties; + + public TachographImportPlanService(EventHubProperties properties) { + this.properties = properties; + } + public TachographImportPlanDto createPlan(TachographImportRequest request) { List items = new ArrayList<>(); for (EventFamily family : request.eventFamilies()) { - items.addAll(itemsFor(family)); + items.addAll(itemsFor(family, request.acquisitionStrategy())); } return new TachographImportPlanDto( request.tenantKey(), @@ -24,42 +36,75 @@ public class TachographImportPlanService { request.importScope(), request.sourceGroup(), request.eventSource(), + chunksFor(request), items ); } - private List itemsFor(EventFamily family) { + private List chunksFor(TachographImportRequest request) { + ImportScopeDto scope = request.importScope(); + OffsetDateTime from = scope == null ? null : scope.occurredFrom(); + OffsetDateTime to = scope == null ? null : scope.occurredTo(); + + // Source-package driven increments discover original card/VU packages by source-package + // watermark. The occurred window may be null because package period and imported-at + // timestamps are used later by the extractor. + if (request.mode() == ImportMode.INCREMENTAL_UPDATE + && request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) { + return List.of(new TimeChunkDto(1, from, to)); + } + + if (from == null || to == null) { + return List.of(new TimeChunkDto(1, from, to)); + } + + List chunks = new ArrayList<>(); + int days = Math.max(1, properties.getTachograph().getDefaultChunkDays()); + OffsetDateTime cursor = from; + int sequence = 1; + while (cursor.isBefore(to)) { + OffsetDateTime next = cursor.plusDays(days); + if (next.isAfter(to)) { + next = to; + } + chunks.add(new TimeChunkDto(sequence++, cursor, next)); + cursor = next; + } + return chunks.isEmpty() ? List.of(new TimeChunkDto(1, from, to)) : chunks; + } + + private List itemsFor(EventFamily family, AcquisitionStrategy strategy) { return switch (family) { case DRIVER_ACTIVITY -> List.of( - item(family, "VEHICLE_UNIT", "VU_ACTIVITY", List.of("VUActivity"), "VEHICLE", "Vehicle-unit driver activity point events"), - item(family, "DRIVER_CARD", "CARD_ACTIVITY", List.of("CardActivity"), "DRIVER", "Driver-card activity point events") + item(family, "VEHICLE_UNIT", "VU_ACTIVITY", List.of("VUActivity"), "VEHICLE", "Vehicle-unit driver activity point events", strategy), + item(family, "DRIVER_CARD", "CARD_ACTIVITY", List.of("CardActivity"), "DRIVER", "Driver-card activity point events", strategy) ); case DRIVER_CARD -> List.of( - item(family, "VEHICLE_UNIT", "IW_CYCLE", List.of("IWCycle"), "BOTH", "Card insert/withdraw events from VU cycles"), - item(family, "DRIVER_CARD", "CARD_VEHICLES_USED", List.of("CardVehiclesUsed"), "DRIVER", "Card insert/withdraw/use events from card vehicle usage") + item(family, "VEHICLE_UNIT", "IW_CYCLE", List.of("IWCycle"), "BOTH", "Card insert/withdraw events from VU cycles", strategy), + item(family, "DRIVER_CARD", "CARD_VEHICLES_USED", List.of("CardVehiclesUsed"), "DRIVER", "Card insert/withdraw/use events from card vehicle usage", strategy) ); case POSITION -> List.of( - item(family, "VEHICLE_UNIT", "VU_POSITION", List.of("VUPlaces", "VULoadUnload", "VUGnssAccumulatedDriving", "VUBorderCrossing"), "VEHICLE", "Position points from VU tachograph sources"), - item(family, "DRIVER_CARD", "CARD_POSITION", List.of("CardPlaces", "CardLoadUnload", "CardGnssAccumulatedDriving", "CardBorderCrossing"), "DRIVER", "Position points from driver-card tachograph sources") + item(family, "VEHICLE_UNIT", "VU_POSITION", List.of("VUPlaces", "VULoadUnload", "VUGnssAccumulatedDriving", "VUBorderCrossing"), "VEHICLE", "Position points from VU tachograph sources", strategy), + item(family, "DRIVER_CARD", "CARD_POSITION", List.of("CardPlaces", "CardLoadUnload", "CardGnssAccumulatedDriving", "CardBorderCrossing"), "DRIVER", "Position points from driver-card tachograph sources", strategy) ); case BORDER_CROSSING -> List.of( - item(family, "VEHICLE_UNIT", "VU_BORDER_CROSSING", List.of("VUBorderCrossing"), "VEHICLE", "Border crossing events from VU"), - item(family, "DRIVER_CARD", "CARD_BORDER_CROSSING", List.of("CardBorderCrossing"), "DRIVER", "Border crossing events from driver card") + item(family, "VEHICLE_UNIT", "VU_BORDER_CROSSING", List.of("VUBorderCrossing"), "VEHICLE", "Border crossing events from VU", strategy), + item(family, "DRIVER_CARD", "CARD_BORDER_CROSSING", List.of("CardBorderCrossing"), "DRIVER", "Border crossing events from driver card", strategy) ); case LOAD_UNLOAD -> List.of( - item(family, "VEHICLE_UNIT", "VU_LOAD_UNLOAD", List.of("VULoadUnload"), "VEHICLE", "Load/unload operation events from VU"), - item(family, "DRIVER_CARD", "CARD_LOAD_UNLOAD", List.of("CardLoadUnload"), "DRIVER", "Load/unload operation events from driver card") + item(family, "VEHICLE_UNIT", "VU_LOAD_UNLOAD", List.of("VULoadUnload"), "VEHICLE", "Load/unload operation events from VU", strategy), + item(family, "DRIVER_CARD", "CARD_LOAD_UNLOAD", List.of("CardLoadUnload"), "DRIVER", "Load/unload operation events from driver card", strategy) ); case SPECIFIC_CONDITION -> List.of( - item(family, "VEHICLE_UNIT", "VU_SPECIFIC_CONDITION", List.of("VUSpecificCondition"), "VEHICLE", "Out-of-scope and ferry/train events from VU"), - item(family, "DRIVER_CARD", "CARD_SPECIFIC_CONDITION", List.of("CardSpecificCondition"), "DRIVER", "Out-of-scope and ferry/train events from driver card") + item(family, "VEHICLE_UNIT", "VU_SPECIFIC_CONDITION", List.of("VUSpecificCondition"), "VEHICLE", "Out-of-scope and ferry/train events from VU", strategy), + item(family, "DRIVER_CARD", "CARD_SPECIFIC_CONDITION", List.of("CardSpecificCondition"), "DRIVER", "Out-of-scope and ferry/train events from driver card", strategy) ); case PLACE -> List.of( - item(family, "VEHICLE_UNIT", "VU_PLACE", List.of("VUPlaces"), "VEHICLE", "Start/end place events from VU"), - item(family, "DRIVER_CARD", "CARD_PLACE", List.of("CardPlaces"), "DRIVER", "Start/end place events from driver card") + item(family, "VEHICLE_UNIT", "VU_PLACE", List.of("VUPlaces"), "VEHICLE", "Start/end place events from VU", strategy), + item(family, "DRIVER_CARD", "CARD_PLACE", List.of("CardPlaces"), "DRIVER", "Start/end place events from driver card", strategy) ); case SPEEDING -> List.of( - item(family, "VEHICLE_UNIT", "SPEEDING_EVENTS", List.of("SpeedingEvents"), "VEHICLE", "Speeding begin/end events") + item(family, "VEHICLE_UNIT", "SPEEDING_EVENTS", List.of("SpeedingEvents"), "VEHICLE", "Speeding begin/end events", strategy) ); }; } @@ -70,8 +115,9 @@ public class TachographImportPlanService { String extractionCode, List sourceTables, String entityAxis, - String description + String description, + AcquisitionStrategy strategy ) { - return new TachographImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description); + return new TachographImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description, strategy); } } diff --git a/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java b/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java index 2797d5d..3725b78 100644 --- a/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java +++ b/src/main/java/at/procon/eventhub/service/TelematicsPositionEventMapper.java @@ -66,6 +66,7 @@ public class TelematicsPositionEventMapper { source.odometerM(), new GeoPointDto(source.latitude(), source.longitude()), detailsFactory.position(source.positionReason()), + null, detailsFactory.payloadFromMap(source.payload()), false, packageInfo diff --git a/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java b/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java index fdb924b..e280586 100644 --- a/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java +++ b/src/main/java/at/procon/eventhub/service/YellowFoxD8BookingEventMapper.java @@ -80,6 +80,7 @@ public class YellowFoxD8BookingEventMapper { source.odometerM(), new GeoPointDto(source.latitude(), source.longitude()), detailsFor(normalized), + null, detailsFactory.payloadFromMap(payload), false, packageInfo diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 68a841b..227d06e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -29,3 +29,6 @@ eventhub: batch: completion-size: 1000 completion-timeout: 5s + tachograph: + default-chunk-days: 1 + occurred-at-overlap: 7d diff --git a/src/main/resources/db/migration/V1__create_eventhub_schema.sql b/src/main/resources/db/migration/V1__create_eventhub_schema.sql index b24efa2..b4e5718 100644 --- a/src/main/resources/db/migration/V1__create_eventhub_schema.sql +++ b/src/main/resources/db/migration/V1__create_eventhub_schema.sql @@ -18,7 +18,9 @@ create table if not exists eventhub.event_source ( constraint ux_event_source unique (tenant_key, provider_key, source_kind, source_key, source_instance_key) ); --- One execution of a tachograph acquisition job. A run may create many data packages. +-- One execution of a tachograph acquisition job. A run may create many EventHub +-- extraction packages. The original tachograph card/VU packages are preserved +-- separately as source_package_* references, not used as EventHub package identity. create table if not exists eventhub.import_run ( id uuid primary key, tenant_key text not null, @@ -43,14 +45,17 @@ create table if not exists eventhub.import_run ( requested_event_families text[] not null default '{}', acquisition_strategy text, metadata jsonb not null default '{}'::jsonb, + planned_package_count integer not null default 0, started_at timestamptz not null default now(), finished_at timestamptz, error_message text, constraint chk_import_run_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to) ); --- Optional cursor table for scheduled/difference imports. The first implementation can --- use occurredAt windows; later it can switch to source-package or source-row watermarks. +-- Optional cursor table for scheduled/difference imports. For regular updates, +-- prefer source-package watermarks because new source packages may contain older +-- occurredAt events. Fallback strategies can use source-row updatedAt or occurredAt +-- overlap windows. Cursor scope is separated by scope_hash and event family/source kind. create table if not exists eventhub.import_cursor ( id uuid primary key, tenant_key text not null, @@ -67,8 +72,10 @@ create table if not exists eventhub.import_cursor ( constraint ux_import_cursor unique (tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type) ); --- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/import scope. --- This table captures the source grouping and the organisation/time import scope used for acquisition. +-- One EventHub acquisition package. It represents an extraction batch, not +-- necessarily one original tachograph package. A single extraction batch can +-- contain rows from multiple original card/VU packages; those are preserved on +-- acquired_event.source_package_* where available. create table if not exists eventhub.data_package ( id uuid primary key, event_source_id integer not null references eventhub.event_source(id), @@ -94,20 +101,39 @@ create table if not exists eventhub.data_package ( event_family text, business_date date, external_package_id text, + + extraction_code text, + extraction_source_kind text, + entity_axis text, + batch_no integer, + chunk_from timestamptz, + chunk_to timestamptz, + + -- Optional package-level source package reference. Usually null for extraction + -- batches, because the original tachograph package is stored per acquired event. + source_package_kind text, + source_package_id text, + source_package_entity_id text, + source_package_period_from timestamptz, + source_package_period_to timestamptz, + source_package_imported_at timestamptz, + received_at timestamptz not null default now(), completed_at timestamptz, event_count integer not null default 0, metadata jsonb not null default '{}'::jsonb, error_message text, - constraint ux_data_package_external unique (tenant_key, event_source_id, external_package_id, received_at), - constraint chk_data_package_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to) + constraint ux_data_package_package_key unique (tenant_key, event_source_id, package_key), + constraint chk_data_package_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to), + constraint chk_data_package_chunk_time_order check (chunk_from is null or chunk_to is null or chunk_from < chunk_to) ); -- Temporary acquisition-stage point-event store. -- It keeps acquired point events with EventSource context, externalSourceEventId, -- one occurredAt timestamp, source-side driver/vehicle refs, normalized event details, --- and raw JSON payload. Organisation is intentionally not stored per event; it belongs --- to master-data relations for driver/vehicle and is represented in importScope/sourceGroup. +-- optional original source-package reference, and raw JSON payload. Organisation is +-- intentionally not stored per event; it belongs to master-data relations for +-- driver/vehicle and is represented in importScope/sourceGroup. create table if not exists eventhub.acquired_event ( id uuid not null, event_source_id integer not null references eventhub.event_source(id), @@ -124,6 +150,13 @@ create table if not exists eventhub.acquired_event ( vehicle_registration_nation text, vehicle_registration_number text, + source_package_kind text, + source_package_id text, + source_package_entity_id text, + source_package_period_from timestamptz, + source_package_period_to timestamptz, + source_package_imported_at timestamptz, + occurred_at timestamptz not null, received_partner_at timestamptz, received_hub_at timestamptz not null default now(), @@ -161,7 +194,8 @@ create table if not exists eventhub.acquired_event ( ), constraint chk_acquired_event_position_pair check ((latitude is null and longitude is null) or (latitude is not null and longitude is not null)), constraint chk_driver_card_nation_when_number check (driver_card_number is null or driver_card_nation is not null), - constraint chk_vehicle_registration_nation_when_number check (vehicle_registration_number is null or vehicle_registration_nation is not null) + constraint chk_vehicle_registration_nation_when_number check (vehicle_registration_number is null or vehicle_registration_nation is not null), + constraint chk_acquired_event_source_package_time_order check (source_package_period_from is null or source_package_period_to is null or source_package_period_from < source_package_period_to) ); create unique index if not exists ux_acquired_event_source_record @@ -171,6 +205,10 @@ create index if not exists idx_acquired_event_signature on eventhub.acquired_event(event_signature_hash) where event_signature_hash is not null; +create index if not exists idx_acquired_event_source_package + on eventhub.acquired_event(source_package_kind, source_package_id, source_package_imported_at) + where source_package_id is not null; + create index if not exists idx_acquired_event_vehicle_vin_time on eventhub.acquired_event(vehicle_vin, occurred_at desc) where vehicle_vin is not null; @@ -198,5 +236,8 @@ create index if not exists idx_data_package_source_time create index if not exists idx_data_package_scope on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to); +create index if not exists idx_data_package_extraction + on eventhub.data_package(tenant_key, event_source_id, import_run_id, event_family, extraction_source_kind, extraction_code, batch_no); + create index if not exists idx_import_run_source_status on eventhub.import_run(tenant_key, event_source_id, status, started_at desc); diff --git a/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java b/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java index 4d6cebf..603a36e 100644 --- a/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java +++ b/src/test/java/at/procon/eventhub/EventAcquisitionRecordKeyServiceTest.java @@ -66,6 +66,7 @@ class EventAcquisitionRecordKeyServiceTest { null, new EventDetailsDto("DRIVER_ACTIVITY", objectMapper.createObjectNode().put("cardSlot", "DRIVER")), null, + objectMapper.createObjectNode(), false, packageInfo );