Add tachograph import run planning

This commit is contained in:
trifonovt 2026-04-30 12:40:12 +02:00
parent 6a7395bec6
commit c52712f881
23 changed files with 817 additions and 94 deletions

143
README.md
View File

@ -568,3 +568,146 @@ order by occurred_at desc;
3. Implement initial backfill using organisation/time scope. 3. Implement initial backfill using organisation/time scope.
4. Implement incremental import using source-package watermark, with occurredAt overlap fallback. 4. Implement incremental import using source-package watermark, with occurredAt overlap fallback.
5. Discuss query/read models later: source priority and gap filling across tachograph, YellowFox and other sources. 5. Discuss query/read models later: source priority and gap filling across tachograph, YellowFox and other sources.
## Implemented tachograph ingestion run model
The tachograph import model now follows the agreed design:
```text
Initial import
organisation subtree + occurredFrom/occurredTo
chunked by time and/or entity
idempotent inserts by sourceRecordKeyHash
Regular update
refresh master data first
prefer discovery of new/changed original tachograph source packages
extract affected event families
import idempotently
advance cursor after successful extraction
Package model
import_run = one execution of the tachograph importer
data_package = one EventHub extraction batch
sourcePackageRef = original tachograph driver-card/VU package reference
Deduplication
no cross-source deduplication
sourceRecordKeyHash prevents same-source duplicate imports
eventSignatureHash is non-unique and only helps later query/projection logic
```
### Import run vs tachograph source package
A tachograph database already contains packages from driver cards and vehicle-unit devices. The EventHub model does not force those original packages to become EventHub packages. Instead:
```text
Original tachograph source package
card/VU package imported into tachograph DB
stored as sourcePackageRef when extracting events
EventHub data package
extraction batch produced by an EventHub import run
grouped by tenant, EventSource, event family, extraction code and time chunk
```
This allows a single EventHub import run to process many original tachograph packages, and a single extraction batch may contain rows from many original packages. If the SQL extractor can return source package metadata, it should populate `EventHubEventDto.sourcePackageRef`:
```json
"sourcePackageRef": {
"packageKind": "VEHICLE_UNIT",
"sourcePackageId": "VU-PACKAGE-12345",
"sourceEntityId": "vehicle-90021",
"packagePeriodFrom": "2026-04-01T00:00:00+02:00",
"packagePeriodTo": "2026-04-15T00:00:00+02:00",
"importedIntoSourceAt": "2026-04-16T10:30:00+02:00"
}
```
### Tachograph import endpoints
`POST /api/eventhub/acquisition/tachograph/imports/plan` returns the calculated event-family extraction plan and time chunks.
`POST /api/eventhub/acquisition/tachograph/imports/start` now creates:
```text
1 import_run row
N planned data_package rows, one per extraction definition and time chunk
```
The SQL extraction routes are intentionally separated from run planning. They should pick planned extraction packages, execute the corresponding SQL, map rows to `EventHubEventDto`, set `sourcePackageRef` when known, and send them to `direct:eventhub-normalized-input`.
### Initial import
Example initial import request:
```json
{
"tenantKey": "kralowetz",
"eventSource": {
"providerKey": "TACHOGRAPH",
"sourceKind": "MIXED",
"sourceKey": "TACHOGRAPH_DB",
"sourceInstanceKey": "tachograph-prod-db",
"tenantProviderSettingKey": "kralowetz-tachograph-prod"
},
"sourceGroup": {
"type": "ORGANISATION",
"sourceEntityId": "147",
"code": "147",
"name": "Kralowetz"
},
"importScope": {
"type": "SOURCE_ORGANISATION_SUBTREE",
"rootSourceOrganisation": {
"type": "ORGANISATION",
"sourceEntityId": "147"
},
"includeChildren": true,
"occurredFrom": "2026-01-01T00:00:00+01:00",
"occurredTo": "2026-02-01T00:00:00+01:00"
},
"eventFamilies": [
"DRIVER_ACTIVITY",
"DRIVER_CARD",
"POSITION",
"BORDER_CROSSING",
"LOAD_UNLOAD",
"PLACE",
"SPECIFIC_CONDITION",
"SPEEDING"
],
"mode": "INITIAL_BACKFILL",
"refreshMasterDataFirst": true,
"acquisitionStrategy": "OCCURRED_AT_WINDOW_WITH_OVERLAP"
}
```
The plan service chunks the occurred-time range using `eventhub.tachograph.default-chunk-days`.
### Regular update
For regular updates, the preferred mode is:
```json
{
"mode": "INCREMENTAL_UPDATE",
"refreshMasterDataFirst": true,
"acquisitionStrategy": "SOURCE_PACKAGE_WATERMARK"
}
```
This is preferred because newly imported original driver-card/VU packages can contain older occurredAt events. A simple occurredAt watermark would miss such late-arriving historical data. The `eventhub.import_cursor` table stores source-package, source-row and occurredAt fallback watermarks per tenant/source/scope/event family/source kind.
### Extraction route contract
A future concrete SQL extraction route should do this:
```text
planned data_package
-> execute SQL for extraction_code and chunk/import scope
-> map source rows to EventHubEventDto
-> populate sourcePackageRef if source package metadata is available
-> send to direct:eventhub-normalized-input
-> only advance eventhub.import_cursor after successful import
```

View File

@ -3,6 +3,7 @@ package at.procon.eventhub.api;
import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageIngestRequest; import at.procon.eventhub.dto.EventHubPackageIngestRequest;
import at.procon.eventhub.dto.TachographImportRequest; import at.procon.eventhub.dto.TachographImportRequest;
import at.procon.eventhub.dto.TachographImportRunResultDto;
import at.procon.eventhub.dto.source.TachographActivityDto; import at.procon.eventhub.dto.source.TachographActivityDto;
import at.procon.eventhub.dto.source.TelematicsPositionDto; import at.procon.eventhub.dto.source.TelematicsPositionDto;
import at.procon.eventhub.dto.source.YellowFoxD8BookingDto; import at.procon.eventhub.dto.source.YellowFoxD8BookingDto;
@ -53,13 +54,13 @@ public class EventHubIngestionController {
} }
@PostMapping("/tachograph/imports/start") @PostMapping("/tachograph/imports/start")
public ResponseEntity<Map<String, Object>> startTachographImport(@Valid @RequestBody TachographImportRequest request) { public ResponseEntity<TachographImportRunResultDto> startTachographImport(@Valid @RequestBody TachographImportRequest request) {
producerTemplate.sendBody("direct:tachograph-import-start", request); TachographImportRunResultDto result = producerTemplate.requestBody(
return ResponseEntity.accepted().body(Map.of( "direct:tachograph-import-start",
"accepted", true, request,
"route", "direct:tachograph-import-start", TachographImportRunResultDto.class
"note", "The current implementation prepares the tachograph import plan. SQL extraction routes are intentionally scaffolded as next step." );
)); return ResponseEntity.accepted().body(result);
} }
@PostMapping("/packages") @PostMapping("/packages")

View File

@ -1,7 +1,7 @@
package at.procon.eventhub.camel; package at.procon.eventhub.camel;
import at.procon.eventhub.dto.TachographImportRequest; import at.procon.eventhub.dto.TachographImportRequest;
import at.procon.eventhub.service.TachographImportPlanService; import at.procon.eventhub.service.TachographImportExecutionService;
import org.apache.camel.builder.RouteBuilder; import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -12,10 +12,10 @@ public class TachographImportRequestRoute extends RouteBuilder {
private static final Logger log = LoggerFactory.getLogger(TachographImportRequestRoute.class); private static final Logger log = LoggerFactory.getLogger(TachographImportRequestRoute.class);
private final TachographImportPlanService planService; private final TachographImportExecutionService executionService;
public TachographImportRequestRoute(TachographImportPlanService planService) { public TachographImportRequestRoute(TachographImportExecutionService executionService) {
this.planService = planService; this.executionService = executionService;
} }
@Override @Override
@ -24,10 +24,10 @@ public class TachographImportRequestRoute extends RouteBuilder {
.routeId("tachograph-import-start-route") .routeId("tachograph-import-start-route")
.process(exchange -> { .process(exchange -> {
TachographImportRequest request = exchange.getMessage().getBody(TachographImportRequest.class); TachographImportRequest request = exchange.getMessage().getBody(TachographImportRequest.class);
var plan = planService.createPlan(request); var result = executionService.startImport(request);
log.info("Prepared tachograph import plan tenant={} mode={} strategy={} scope={} itemCount={}", log.info("Prepared tachograph import run importRunId={} plannedPackages={} status={}",
plan.tenantKey(), plan.mode(), plan.acquisitionStrategy(), plan.importScope().stableKey(), plan.items().size()); result.importRunId(), result.plannedPackageCount(), result.status());
exchange.getMessage().setBody(plan); exchange.getMessage().setBody(result);
}); });
} }
} }

View File

@ -7,11 +7,16 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
public class EventHubProperties { public class EventHubProperties {
private final Batch batch = new Batch(); private final Batch batch = new Batch();
private final Tachograph tachograph = new Tachograph();
public Batch getBatch() { public Batch getBatch() {
return batch; return batch;
} }
public Tachograph getTachograph() {
return tachograph;
}
public static class Batch { public static class Batch {
/** Number of events collected before a package is persisted. */ /** Number of events collected before a package is persisted. */
private int completionSize = 1000; private int completionSize = 1000;
@ -35,4 +40,28 @@ public class EventHubProperties {
this.completionTimeout = completionTimeout; this.completionTimeout = completionTimeout;
} }
} }
public static class Tachograph {
/** Default chunk size for initial/backfill occurred-time imports. */
private int defaultChunkDays = 1;
/** Overlap used by occurred-at fallback incremental imports. */
private Duration occurredAtOverlap = Duration.ofDays(7);
public int getDefaultChunkDays() {
return defaultChunkDays;
}
public void setDefaultChunkDays(int defaultChunkDays) {
this.defaultChunkDays = Math.max(1, defaultChunkDays);
}
public Duration getOccurredAtOverlap() {
return occurredAtOverlap;
}
public void setOccurredAtOverlap(Duration occurredAtOverlap) {
this.occurredAtOverlap = occurredAtOverlap;
}
}
} }

View File

@ -1,7 +1,9 @@
package at.procon.eventhub.dto; package at.procon.eventhub.dto;
public enum DataPackageStatus { public enum DataPackageStatus {
PLANNED,
IMPORTING, IMPORTING,
IMPORTED, IMPORTED,
EMPTY,
FAILED FAILED
} }

View File

@ -40,6 +40,9 @@ public record EventHubEventDto(
/** Normalized semantic details depending on eventDomain/eventType. */ /** Normalized semantic details depending on eventDomain/eventType. */
@Valid EventDetailsDto eventDetails, @Valid EventDetailsDto eventDetails,
/** Optional reference to the original source package/card/VU download containing this source record. */
@Valid SourcePackageRefDto sourcePackageRef,
/** Raw/provider-specific payload, stored as real JSON and not as encoded JSON string. */ /** Raw/provider-specific payload, stored as real JSON and not as encoded JSON string. */
JsonNode payload, JsonNode payload,
@ -71,6 +74,7 @@ public record EventHubEventDto(
odometerM, odometerM,
position, position,
eventDetails, eventDetails,
sourcePackageRef,
payload, payload,
manualEntry, manualEntry,
packageInfo packageInfo
@ -92,6 +96,7 @@ public record EventHubEventDto(
odometerM, odometerM,
position, position,
eventDetails, eventDetails,
sourcePackageRef,
payload, payload,
manualEntry, manualEntry,
packageInfo packageInfo
@ -113,6 +118,7 @@ public record EventHubEventDto(
odometerM, odometerM,
position, position,
eventDetails, eventDetails,
sourcePackageRef,
payload, payload,
manualEntry, manualEntry,
value value

View File

@ -0,0 +1,8 @@
package at.procon.eventhub.dto;
public enum ImportRunStatus {
PLANNED,
RUNNING,
COMPLETED,
FAILED
}

View File

@ -0,0 +1,40 @@
package at.procon.eventhub.dto;
import java.time.OffsetDateTime;
/**
* Reference to the original source package from which a source record was read.
*
* For tachograph data this is the original driver-card or vehicle-unit package
* already imported into the tachograph DB. EventHub data packages are extraction
* batches and are intentionally not forced to be identical to this source package.
*/
public record SourcePackageRefDto(
String packageKind,
String sourcePackageId,
String sourceEntityId,
OffsetDateTime packagePeriodFrom,
OffsetDateTime packagePeriodTo,
OffsetDateTime importedIntoSourceAt
) {
public SourcePackageRefDto {
packageKind = normalize(packageKind);
sourcePackageId = normalizeNullable(sourcePackageId);
sourceEntityId = normalizeNullable(sourceEntityId);
}
public boolean hasAnyReference() {
return sourcePackageId != null || sourceEntityId != null || importedIntoSourceAt != null;
}
private static String normalize(String value) {
if (value == null || value.isBlank()) {
return value;
}
return value.trim().toUpperCase().replace('-', '_').replace(' ', '_');
}
private static String normalizeNullable(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
}

View File

@ -10,6 +10,7 @@ public record TachographImportPlanDto(
ImportScopeDto importScope, ImportScopeDto importScope,
SourceGroupRefDto sourceGroup, SourceGroupRefDto sourceGroup,
EventSourceDto eventSource, EventSourceDto eventSource,
List<TimeChunkDto> chunks,
List<TachographImportPlanItemDto> items List<TachographImportPlanItemDto> items
) { ) {
} }

View File

@ -8,6 +8,7 @@ public record TachographImportPlanItemDto(
String extractionCode, String extractionCode,
List<String> sourceTables, List<String> sourceTables,
String entityAxis, String entityAxis,
String description String description,
AcquisitionStrategy preferredStrategy
) { ) {
} }

View File

@ -0,0 +1,13 @@
package at.procon.eventhub.dto;
import java.util.List;
import java.util.UUID;
public record TachographImportRunResultDto(
UUID importRunId,
ImportRunStatus status,
int plannedPackageCount,
TachographImportPlanDto plan,
List<UUID> plannedPackageIds
) {
}

View File

@ -0,0 +1,15 @@
package at.procon.eventhub.dto;
import java.time.OffsetDateTime;
public record TimeChunkDto(
int sequence,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
public String stableKey() {
String from = occurredFrom == null ? "MIN" : occurredFrom.toString();
String to = occurredTo == null ? "MAX" : occurredTo.toString();
return sequence + ":" + from + ":" + to;
}
}

View File

@ -5,6 +5,9 @@ import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto; import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.dto.SourcePackageRefDto;
import at.procon.eventhub.dto.TachographImportPlanItemDto;
import at.procon.eventhub.dto.TimeChunkDto;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
@ -32,48 +35,139 @@ public class DataPackageRepository {
OffsetDateTime occurredFrom, OffsetDateTime occurredFrom,
OffsetDateTime occurredTo, OffsetDateTime occurredTo,
Map<String, Object> metadata Map<String, Object> metadata
) {
return insertPackage(
eventSourceId,
null,
packageKey,
packageInfo,
packageType,
DataPackageStatus.IMPORTING,
occurredFrom,
occurredTo,
null,
null,
null,
null,
null,
null,
metadata
);
}
public UUID createPlannedExtractionPackage(
UUID importRunId,
int eventSourceId,
String packageKey,
EventHubPackageRequest packageInfo,
TachographImportPlanItemDto planItem,
TimeChunkDto chunk,
int batchNo,
Map<String, Object> metadata
) {
return insertPackage(
eventSourceId,
importRunId,
packageKey,
packageInfo,
DataPackageType.DB_EXTRACT,
DataPackageStatus.PLANNED,
chunk == null ? null : chunk.occurredFrom(),
chunk == null ? null : chunk.occurredTo(),
planItem,
batchNo,
chunk == null ? null : chunk.occurredFrom(),
chunk == null ? null : chunk.occurredTo(),
null,
null,
metadata
);
}
private UUID insertPackage(
int eventSourceId,
UUID importRunId,
String packageKey,
EventHubPackageRequest packageInfo,
DataPackageType packageType,
DataPackageStatus status,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
TachographImportPlanItemDto planItem,
Integer batchNo,
OffsetDateTime chunkFrom,
OffsetDateTime chunkTo,
SourcePackageRefDto sourcePackageRef,
Integer eventCount,
Map<String, Object> metadata
) { ) {
UUID id = UUID.randomUUID(); UUID id = UUID.randomUUID();
SourceGroupRefDto sourceGroup = packageInfo == null ? null : packageInfo.sourceGroup(); SourceGroupRefDto sourceGroup = packageInfo == null ? null : packageInfo.sourceGroup();
ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope(); ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope();
SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation(); SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation();
jdbcTemplate.update( return jdbcTemplate.query(
""" con -> {
var ps = con.prepareStatement("""
insert into eventhub.data_package( insert into eventhub.data_package(
id, event_source_id, tenant_key, package_key, package_type, status, id, event_source_id, import_run_id, tenant_key, package_key, package_type, status,
source_group_type, source_group_entity_id, source_group_code, source_group_name, source_group_type, source_group_entity_id, source_group_code, source_group_name,
import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name, import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name,
include_children, occurred_from, occurred_to, include_children, occurred_from, occurred_to,
event_family, business_date, external_package_id, event_family, business_date, external_package_id,
extraction_code, extraction_source_kind, entity_axis, batch_no, chunk_from, chunk_to,
source_package_kind, source_package_id, source_package_entity_id,
source_package_period_from, source_package_period_to, source_package_imported_at,
received_at, event_count, metadata received_at, event_count, metadata
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), 0, ?::jsonb) ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), ?, ?::jsonb)
""", on conflict (tenant_key, event_source_id, package_key) do update
ps -> { set metadata = excluded.metadata
returning id
""");
ps.setObject(1, id); ps.setObject(1, id);
ps.setInt(2, eventSourceId); ps.setInt(2, eventSourceId);
ps.setString(3, packageInfo == null ? "default" : packageInfo.tenantKey()); ps.setObject(3, importRunId);
ps.setString(4, packageKey); ps.setString(4, packageInfo == null ? "default" : packageInfo.tenantKey());
ps.setString(5, packageType.name()); ps.setString(5, packageKey);
ps.setString(6, DataPackageStatus.IMPORTING.name()); ps.setString(6, packageType.name());
ps.setString(7, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name()); ps.setString(7, status.name());
ps.setString(8, sourceGroup == null ? null : sourceGroup.sourceEntityId()); ps.setString(8, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name());
ps.setString(9, sourceGroup == null ? null : sourceGroup.code()); ps.setString(9, sourceGroup == null ? null : sourceGroup.sourceEntityId());
ps.setString(10, sourceGroup == null ? null : sourceGroup.name()); ps.setString(10, sourceGroup == null ? null : sourceGroup.code());
ps.setString(11, importScope == null ? null : importScope.type().name()); ps.setString(11, sourceGroup == null ? null : sourceGroup.name());
ps.setString(12, rootOrg == null ? null : rootOrg.sourceEntityId()); ps.setString(12, importScope == null || importScope.type() == null ? null : importScope.type().name());
ps.setString(13, rootOrg == null ? null : rootOrg.code()); ps.setString(13, rootOrg == null ? null : rootOrg.sourceEntityId());
ps.setString(14, rootOrg == null ? null : rootOrg.name()); ps.setString(14, rootOrg == null ? null : rootOrg.code());
ps.setBoolean(15, importScope != null && importScope.includeChildren()); ps.setString(15, rootOrg == null ? null : rootOrg.name());
ps.setObject(16, occurredFrom); ps.setBoolean(16, importScope != null && importScope.includeChildren());
ps.setObject(17, occurredTo); ps.setObject(17, occurredFrom);
ps.setString(18, packageInfo == null ? null : packageInfo.eventFamily()); ps.setObject(18, occurredTo);
ps.setObject(19, packageInfo == null ? null : packageInfo.businessDate()); ps.setString(19, packageInfo == null ? null : packageInfo.eventFamily());
ps.setString(20, packageInfo == null ? packageKey : packageInfo.externalPackageId()); ps.setObject(20, packageInfo == null ? null : packageInfo.businessDate());
ps.setString(21, toJson(metadata)); ps.setString(21, packageInfo == null ? packageKey : packageInfo.externalPackageId());
ps.setString(22, planItem == null ? null : planItem.extractionCode());
ps.setString(23, planItem == null ? null : planItem.sourceKind());
ps.setString(24, planItem == null ? null : planItem.entityAxis());
ps.setObject(25, batchNo);
ps.setObject(26, chunkFrom);
ps.setObject(27, chunkTo);
ps.setString(28, sourcePackageRef == null ? null : sourcePackageRef.packageKind());
ps.setString(29, sourcePackageRef == null ? null : sourcePackageRef.sourcePackageId());
ps.setString(30, sourcePackageRef == null ? null : sourcePackageRef.sourceEntityId());
ps.setObject(31, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodFrom());
ps.setObject(32, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodTo());
ps.setObject(33, sourcePackageRef == null ? null : sourcePackageRef.importedIntoSourceAt());
ps.setInt(34, eventCount == null ? 0 : eventCount);
ps.setString(35, toJson(metadata));
return ps;
},
rs -> {
if (!rs.next()) {
throw new IllegalStateException("Could not create or resolve data package " + packageKey);
}
return (UUID) rs.getObject(1);
} }
); );
return id;
} }
public void markImported(UUID packageId, int insertedCount) { public void markImported(UUID packageId, int insertedCount) {
@ -83,7 +177,7 @@ public class DataPackageRepository {
set status = ?, event_count = ?, completed_at = now() set status = ?, event_count = ?, completed_at = now()
where id = ? where id = ?
""", """,
DataPackageStatus.IMPORTED.name(), insertedCount == 0 ? DataPackageStatus.EMPTY.name() : DataPackageStatus.IMPORTED.name(),
insertedCount, insertedCount,
packageId packageId
); );

View File

@ -3,6 +3,7 @@ package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.DriverCardRefDto; import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.SourcePackageRefDto;
import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.service.EventAcquisitionRecordKeyService; import at.procon.eventhub.service.EventAcquisitionRecordKeyService;
@ -46,6 +47,8 @@ public class EventRepository {
external_source_event_id, external_source_event_id,
driver_source_entity_id, driver_card_nation, driver_card_number, driver_source_entity_id, driver_card_nation, driver_card_number,
vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number, vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number,
source_package_kind, source_package_id, source_package_entity_id,
source_package_period_from, source_package_period_to, source_package_imported_at,
occurred_at, received_partner_at, received_hub_at, occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle, event_domain, event_type, lifecycle,
odometer_m, latitude, longitude, odometer_m, latitude, longitude,
@ -59,6 +62,8 @@ public class EventRepository {
?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?,
?, ?, ?,
?::jsonb, ?::jsonb, ?, ?::jsonb, ?::jsonb, ?,
?, ? ?, ?
) )
@ -74,6 +79,7 @@ public class EventRepository {
DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard(); DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard();
VehicleRefDto vehicleRef = event.vehicleRef(); VehicleRefDto vehicleRef = event.vehicleRef();
VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration(); VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration();
SourcePackageRefDto sourcePackageRef = event.sourcePackageRef();
ps.setObject(1, eventId); ps.setObject(1, eventId);
ps.setInt(2, eventSourceId); ps.setInt(2, eventSourceId);
@ -89,25 +95,32 @@ public class EventRepository {
ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation()); ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation());
ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number()); ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number());
ps.setObject(12, event.occurredAt()); ps.setString(12, sourcePackageRef == null ? null : sourcePackageRef.packageKind());
ps.setObject(13, event.receivedPartnerAt()); ps.setString(13, sourcePackageRef == null ? null : sourcePackageRef.sourcePackageId());
ps.setObject(14, receivedHubAt); ps.setString(14, sourcePackageRef == null ? null : sourcePackageRef.sourceEntityId());
ps.setString(15, event.eventDomain().name()); ps.setObject(15, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodFrom());
ps.setString(16, event.eventType().name()); ps.setObject(16, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodTo());
ps.setString(17, event.lifecycle().name()); ps.setObject(17, sourcePackageRef == null ? null : sourcePackageRef.importedIntoSourceAt());
setNullableLong(ps, 18, event.odometerM());
ps.setObject(18, event.occurredAt());
ps.setObject(19, event.receivedPartnerAt());
ps.setObject(20, receivedHubAt);
ps.setString(21, event.eventDomain().name());
ps.setString(22, event.eventType().name());
ps.setString(23, event.lifecycle().name());
setNullableLong(ps, 24, event.odometerM());
if (event.position() == null) { if (event.position() == null) {
ps.setNull(19, Types.NUMERIC); ps.setNull(25, Types.NUMERIC);
ps.setNull(20, Types.NUMERIC); ps.setNull(26, Types.NUMERIC);
} else { } else {
ps.setObject(19, event.position().latitude()); ps.setObject(25, event.position().latitude());
ps.setObject(20, event.position().longitude()); ps.setObject(26, event.position().longitude());
} }
ps.setString(21, toJson(objectMapper.valueToTree(event.eventDetails()))); ps.setString(27, toJson(objectMapper.valueToTree(event.eventDetails())));
ps.setString(22, toJson(event.payload())); ps.setString(28, toJson(event.payload()));
ps.setBoolean(23, event.manualEntry()); ps.setBoolean(29, event.manualEntry());
ps.setString(24, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId)); ps.setString(30, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId));
ps.setString(25, recordKeyService.buildEventSignatureHash(event)); ps.setString(31, recordKeyService.buildEventSignatureHash(event));
} }
@Override @Override

View File

@ -0,0 +1,99 @@
package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportRunStatus;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.dto.TachographImportRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Array;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class ImportRunRepository {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
public ImportRunRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
}
public UUID createPlannedRun(int eventSourceId, TachographImportRequest request, Map<String, Object> metadata) {
UUID id = UUID.randomUUID();
SourceGroupRefDto sourceGroup = request.sourceGroup();
ImportScopeDto importScope = request.importScope();
SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation();
EventSourceDto eventSource = request.eventSource();
jdbcTemplate.update(con -> {
var ps = con.prepareStatement("""
insert into eventhub.import_run(
id, tenant_key, event_source_id, mode, status, refresh_master_data_first,
source_group_type, source_group_entity_id, source_group_code, source_group_name,
import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name,
include_children, occurred_from, occurred_to,
requested_event_families, acquisition_strategy, metadata, planned_package_count
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, 0)
""");
ps.setObject(1, id);
ps.setString(2, request.tenantKey());
ps.setInt(3, eventSourceId);
ps.setString(4, request.mode().name());
ps.setString(5, ImportRunStatus.PLANNED.name());
ps.setBoolean(6, request.refreshMasterDataFirst());
ps.setString(7, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name());
ps.setString(8, sourceGroup == null ? null : sourceGroup.sourceEntityId());
ps.setString(9, sourceGroup == null ? null : sourceGroup.code());
ps.setString(10, sourceGroup == null ? null : sourceGroup.name());
ps.setString(11, importScope == null || importScope.type() == null ? null : importScope.type().name());
ps.setString(12, rootOrg == null ? null : rootOrg.sourceEntityId());
ps.setString(13, rootOrg == null ? null : rootOrg.code());
ps.setString(14, rootOrg == null ? null : rootOrg.name());
ps.setBoolean(15, importScope != null && importScope.includeChildren());
ps.setObject(16, importScope == null ? null : importScope.occurredFrom());
ps.setObject(17, importScope == null ? null : importScope.occurredTo());
ps.setArray(18, eventFamilyArray(con, request));
ps.setString(19, request.acquisitionStrategy().name());
ps.setString(20, toJson(metadata == null ? Map.of() : metadata));
return ps;
});
return id;
}
public void markRunning(UUID id) {
jdbcTemplate.update("update eventhub.import_run set status = ? where id = ?", ImportRunStatus.RUNNING.name(), id);
}
public void markPlannedPackages(UUID id, int plannedPackageCount) {
jdbcTemplate.update("update eventhub.import_run set planned_package_count = ? where id = ?", plannedPackageCount, id);
}
public void markCompleted(UUID id) {
jdbcTemplate.update("update eventhub.import_run set status = ?, finished_at = now() where id = ?", ImportRunStatus.COMPLETED.name(), id);
}
public void markFailed(UUID id, String errorMessage) {
jdbcTemplate.update("update eventhub.import_run set status = ?, error_message = ?, finished_at = now() where id = ?", ImportRunStatus.FAILED.name(), errorMessage, id);
}
private Array eventFamilyArray(Connection con, TachographImportRequest request) throws SQLException {
String[] values = request.eventFamilies().stream().map(Enum::name).toArray(String[]::new);
return con.createArrayOf("text", values);
}
private String toJson(Map<String, Object> value) {
try {
return objectMapper.writeValueAsString(value == null ? Map.of() : value);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Cannot serialize import run metadata", e);
}
}
}

View File

@ -64,6 +64,7 @@ public class TachographActivityEventMapper {
null, null,
null, null,
detailsFactory.driverActivity(source.cardSlot(), source.cardStatus(), source.drivingStatus()), detailsFactory.driverActivity(source.cardSlot(), source.cardStatus(), source.drivingStatus()),
null,
detailsFactory.payloadFromMap(source.payload()), detailsFactory.payloadFromMap(source.payload()),
false, false,
packageInfo packageInfo

View File

@ -0,0 +1,164 @@
package at.procon.eventhub.service;
import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportRunStatus;
import at.procon.eventhub.dto.TachographImportPlanDto;
import at.procon.eventhub.dto.TachographImportPlanItemDto;
import at.procon.eventhub.dto.TachographImportRequest;
import at.procon.eventhub.dto.TachographImportRunResultDto;
import at.procon.eventhub.dto.TimeChunkDto;
import at.procon.eventhub.persistence.DataPackageRepository;
import at.procon.eventhub.persistence.EventSourceRepository;
import at.procon.eventhub.persistence.ImportRunRepository;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* Creates import runs and extraction data packages for tachograph acquisition.
*
* This service deliberately creates EventHub packages for extraction batches. The
* original tachograph card/VU package is not treated as the EventHub package; it
* is preserved later as SourcePackageRefDto on acquired events or in batch
* metadata when an extractor processes one concrete source package.
*/
@Service
public class TachographImportExecutionService {
private static final Logger log = LoggerFactory.getLogger(TachographImportExecutionService.class);
private final TachographImportPlanService planService;
private final EventSourceRepository eventSourceRepository;
private final ImportRunRepository importRunRepository;
private final DataPackageRepository dataPackageRepository;
public TachographImportExecutionService(
TachographImportPlanService planService,
EventSourceRepository eventSourceRepository,
ImportRunRepository importRunRepository,
DataPackageRepository dataPackageRepository
) {
this.planService = planService;
this.eventSourceRepository = eventSourceRepository;
this.importRunRepository = importRunRepository;
this.dataPackageRepository = dataPackageRepository;
}
@Transactional
public TachographImportRunResultDto startImport(TachographImportRequest request) {
TachographImportPlanDto plan = planService.createPlan(request);
int baseEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), request.eventSource());
UUID importRunId = importRunRepository.createPlannedRun(baseEventSourceId, request, Map.of(
"note", "Created tachograph import run and planned extraction packages. SQL extraction is handled by event-family routes.",
"packageModel", "EventHub data packages are extraction batches; original tachograph packages are SourcePackageRefDto."
));
List<UUID> packageIds = new ArrayList<>();
int batchNo = 1;
try {
for (TachographImportPlanItemDto item : plan.items()) {
EventSourceDto itemEventSource = eventSourceForItem(request.eventSource(), item);
int itemEventSourceId = eventSourceRepository.resolveSourceId(request.tenantKey(), itemEventSource);
for (TimeChunkDto chunk : plan.chunks()) {
EventHubPackageRequest packageInfo = packageRequestFor(request, itemEventSource, item, chunk);
String packageKey = packageKey(importRunId, packageInfo, item, chunk, batchNo);
UUID packageId = dataPackageRepository.createPlannedExtractionPackage(
importRunId,
itemEventSourceId,
packageKey,
packageInfo,
item,
chunk,
batchNo,
metadata(request, item, chunk, importRunId)
);
packageIds.add(packageId);
batchNo++;
}
}
importRunRepository.markPlannedPackages(importRunId, packageIds.size());
log.info("Created tachograph import run importRunId={} plannedPackages={} tenant={} mode={} strategy={}",
importRunId, packageIds.size(), request.tenantKey(), request.mode(), request.acquisitionStrategy());
return new TachographImportRunResultDto(importRunId, ImportRunStatus.PLANNED, packageIds.size(), plan, List.copyOf(packageIds));
} catch (RuntimeException ex) {
importRunRepository.markFailed(importRunId, ex.getMessage());
throw ex;
}
}
private EventSourceDto eventSourceForItem(EventSourceDto base, TachographImportPlanItemDto item) {
String sourceKind = item.sourceKind();
String sourceKey = switch (sourceKind) {
case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT";
case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD";
default -> base.sourceKey();
};
return new EventSourceDto(
base.providerKey(),
sourceKind,
sourceKey,
base.sourceInstanceKey(),
base.tenantProviderSettingKey(),
base.externalFleetKey()
);
}
private EventHubPackageRequest packageRequestFor(
TachographImportRequest request,
EventSourceDto itemEventSource,
TachographImportPlanItemDto item,
TimeChunkDto chunk
) {
return new EventHubPackageRequest(
request.tenantKey(),
itemEventSource,
request.sourceGroup(),
request.importScope(),
item.eventFamily().name(),
null,
externalPackageId(request, item, chunk)
);
}
private String externalPackageId(TachographImportRequest request, TachographImportPlanItemDto item, TimeChunkDto chunk) {
String scope = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey();
return "TACHOGRAPH:" + item.sourceKind() + ":" + item.extractionCode() + ":" + item.eventFamily()
+ ":" + scope + ":CHUNK-" + chunk.sequence();
}
private String packageKey(UUID importRunId, EventHubPackageRequest packageInfo, TachographImportPlanItemDto item, TimeChunkDto chunk, int batchNo) {
return packageInfo.tenantKey()
+ ":" + packageInfo.eventSource().stableKey()
+ ":" + item.eventFamily()
+ ":" + item.extractionCode()
+ ":RUN-" + importRunId
+ ":CHUNK-" + chunk.sequence()
+ ":BATCH-" + batchNo;
}
private Map<String, Object> metadata(TachographImportRequest request, TachographImportPlanItemDto item, TimeChunkDto chunk, UUID importRunId) {
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("importRunId", importRunId.toString());
metadata.put("mode", request.mode().name());
metadata.put("acquisitionStrategy", request.acquisitionStrategy().name());
metadata.put("refreshMasterDataFirst", request.refreshMasterDataFirst());
metadata.put("eventFamily", item.eventFamily().name());
metadata.put("sourceKind", item.sourceKind());
metadata.put("extractionCode", item.extractionCode());
metadata.put("sourceTables", item.sourceTables());
metadata.put("entityAxis", item.entityAxis());
metadata.put("chunkSequence", chunk.sequence());
metadata.put("chunkOccurredFrom", chunk.occurredFrom() == null ? null : chunk.occurredFrom().toString());
metadata.put("chunkOccurredTo", chunk.occurredTo() == null ? null : chunk.occurredTo().toString());
metadata.put("sourcePackageRefPolicy", "Original tachograph card/VU package is preserved per acquired event when SQL extraction returns it.");
return metadata;
}
}

View File

@ -1,9 +1,15 @@
package at.procon.eventhub.service; package at.procon.eventhub.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily; import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.TachographImportPlanDto; import at.procon.eventhub.dto.TachographImportPlanDto;
import at.procon.eventhub.dto.TachographImportPlanItemDto; import at.procon.eventhub.dto.TachographImportPlanItemDto;
import at.procon.eventhub.dto.TachographImportRequest; import at.procon.eventhub.dto.TachographImportRequest;
import at.procon.eventhub.dto.TimeChunkDto;
import java.time.OffsetDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -11,10 +17,16 @@ import org.springframework.stereotype.Service;
@Service @Service
public class TachographImportPlanService { public class TachographImportPlanService {
private final EventHubProperties properties;
public TachographImportPlanService(EventHubProperties properties) {
this.properties = properties;
}
public TachographImportPlanDto createPlan(TachographImportRequest request) { public TachographImportPlanDto createPlan(TachographImportRequest request) {
List<TachographImportPlanItemDto> items = new ArrayList<>(); List<TachographImportPlanItemDto> items = new ArrayList<>();
for (EventFamily family : request.eventFamilies()) { for (EventFamily family : request.eventFamilies()) {
items.addAll(itemsFor(family)); items.addAll(itemsFor(family, request.acquisitionStrategy()));
} }
return new TachographImportPlanDto( return new TachographImportPlanDto(
request.tenantKey(), request.tenantKey(),
@ -24,42 +36,75 @@ public class TachographImportPlanService {
request.importScope(), request.importScope(),
request.sourceGroup(), request.sourceGroup(),
request.eventSource(), request.eventSource(),
chunksFor(request),
items items
); );
} }
private List<TachographImportPlanItemDto> itemsFor(EventFamily family) { private List<TimeChunkDto> chunksFor(TachographImportRequest request) {
ImportScopeDto scope = request.importScope();
OffsetDateTime from = scope == null ? null : scope.occurredFrom();
OffsetDateTime to = scope == null ? null : scope.occurredTo();
// Source-package driven increments discover original card/VU packages by source-package
// watermark. The occurred window may be null because package period and imported-at
// timestamps are used later by the extractor.
if (request.mode() == ImportMode.INCREMENTAL_UPDATE
&& request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) {
return List.of(new TimeChunkDto(1, from, to));
}
if (from == null || to == null) {
return List.of(new TimeChunkDto(1, from, to));
}
List<TimeChunkDto> chunks = new ArrayList<>();
int days = Math.max(1, properties.getTachograph().getDefaultChunkDays());
OffsetDateTime cursor = from;
int sequence = 1;
while (cursor.isBefore(to)) {
OffsetDateTime next = cursor.plusDays(days);
if (next.isAfter(to)) {
next = to;
}
chunks.add(new TimeChunkDto(sequence++, cursor, next));
cursor = next;
}
return chunks.isEmpty() ? List.of(new TimeChunkDto(1, from, to)) : chunks;
}
private List<TachographImportPlanItemDto> itemsFor(EventFamily family, AcquisitionStrategy strategy) {
return switch (family) { return switch (family) {
case DRIVER_ACTIVITY -> List.of( case DRIVER_ACTIVITY -> List.of(
item(family, "VEHICLE_UNIT", "VU_ACTIVITY", List.of("VUActivity"), "VEHICLE", "Vehicle-unit driver activity point events"), item(family, "VEHICLE_UNIT", "VU_ACTIVITY", List.of("VUActivity"), "VEHICLE", "Vehicle-unit driver activity point events", strategy),
item(family, "DRIVER_CARD", "CARD_ACTIVITY", List.of("CardActivity"), "DRIVER", "Driver-card activity point events") item(family, "DRIVER_CARD", "CARD_ACTIVITY", List.of("CardActivity"), "DRIVER", "Driver-card activity point events", strategy)
); );
case DRIVER_CARD -> List.of( case DRIVER_CARD -> List.of(
item(family, "VEHICLE_UNIT", "IW_CYCLE", List.of("IWCycle"), "BOTH", "Card insert/withdraw events from VU cycles"), item(family, "VEHICLE_UNIT", "IW_CYCLE", List.of("IWCycle"), "BOTH", "Card insert/withdraw events from VU cycles", strategy),
item(family, "DRIVER_CARD", "CARD_VEHICLES_USED", List.of("CardVehiclesUsed"), "DRIVER", "Card insert/withdraw/use events from card vehicle usage") item(family, "DRIVER_CARD", "CARD_VEHICLES_USED", List.of("CardVehiclesUsed"), "DRIVER", "Card insert/withdraw/use events from card vehicle usage", strategy)
); );
case POSITION -> List.of( case POSITION -> List.of(
item(family, "VEHICLE_UNIT", "VU_POSITION", List.of("VUPlaces", "VULoadUnload", "VUGnssAccumulatedDriving", "VUBorderCrossing"), "VEHICLE", "Position points from VU tachograph sources"), item(family, "VEHICLE_UNIT", "VU_POSITION", List.of("VUPlaces", "VULoadUnload", "VUGnssAccumulatedDriving", "VUBorderCrossing"), "VEHICLE", "Position points from VU tachograph sources", strategy),
item(family, "DRIVER_CARD", "CARD_POSITION", List.of("CardPlaces", "CardLoadUnload", "CardGnssAccumulatedDriving", "CardBorderCrossing"), "DRIVER", "Position points from driver-card tachograph sources") item(family, "DRIVER_CARD", "CARD_POSITION", List.of("CardPlaces", "CardLoadUnload", "CardGnssAccumulatedDriving", "CardBorderCrossing"), "DRIVER", "Position points from driver-card tachograph sources", strategy)
); );
case BORDER_CROSSING -> List.of( case BORDER_CROSSING -> List.of(
item(family, "VEHICLE_UNIT", "VU_BORDER_CROSSING", List.of("VUBorderCrossing"), "VEHICLE", "Border crossing events from VU"), item(family, "VEHICLE_UNIT", "VU_BORDER_CROSSING", List.of("VUBorderCrossing"), "VEHICLE", "Border crossing events from VU", strategy),
item(family, "DRIVER_CARD", "CARD_BORDER_CROSSING", List.of("CardBorderCrossing"), "DRIVER", "Border crossing events from driver card") item(family, "DRIVER_CARD", "CARD_BORDER_CROSSING", List.of("CardBorderCrossing"), "DRIVER", "Border crossing events from driver card", strategy)
); );
case LOAD_UNLOAD -> List.of( case LOAD_UNLOAD -> List.of(
item(family, "VEHICLE_UNIT", "VU_LOAD_UNLOAD", List.of("VULoadUnload"), "VEHICLE", "Load/unload operation events from VU"), item(family, "VEHICLE_UNIT", "VU_LOAD_UNLOAD", List.of("VULoadUnload"), "VEHICLE", "Load/unload operation events from VU", strategy),
item(family, "DRIVER_CARD", "CARD_LOAD_UNLOAD", List.of("CardLoadUnload"), "DRIVER", "Load/unload operation events from driver card") item(family, "DRIVER_CARD", "CARD_LOAD_UNLOAD", List.of("CardLoadUnload"), "DRIVER", "Load/unload operation events from driver card", strategy)
); );
case SPECIFIC_CONDITION -> List.of( case SPECIFIC_CONDITION -> List.of(
item(family, "VEHICLE_UNIT", "VU_SPECIFIC_CONDITION", List.of("VUSpecificCondition"), "VEHICLE", "Out-of-scope and ferry/train events from VU"), item(family, "VEHICLE_UNIT", "VU_SPECIFIC_CONDITION", List.of("VUSpecificCondition"), "VEHICLE", "Out-of-scope and ferry/train events from VU", strategy),
item(family, "DRIVER_CARD", "CARD_SPECIFIC_CONDITION", List.of("CardSpecificCondition"), "DRIVER", "Out-of-scope and ferry/train events from driver card") item(family, "DRIVER_CARD", "CARD_SPECIFIC_CONDITION", List.of("CardSpecificCondition"), "DRIVER", "Out-of-scope and ferry/train events from driver card", strategy)
); );
case PLACE -> List.of( case PLACE -> List.of(
item(family, "VEHICLE_UNIT", "VU_PLACE", List.of("VUPlaces"), "VEHICLE", "Start/end place events from VU"), item(family, "VEHICLE_UNIT", "VU_PLACE", List.of("VUPlaces"), "VEHICLE", "Start/end place events from VU", strategy),
item(family, "DRIVER_CARD", "CARD_PLACE", List.of("CardPlaces"), "DRIVER", "Start/end place events from driver card") item(family, "DRIVER_CARD", "CARD_PLACE", List.of("CardPlaces"), "DRIVER", "Start/end place events from driver card", strategy)
); );
case SPEEDING -> List.of( case SPEEDING -> List.of(
item(family, "VEHICLE_UNIT", "SPEEDING_EVENTS", List.of("SpeedingEvents"), "VEHICLE", "Speeding begin/end events") item(family, "VEHICLE_UNIT", "SPEEDING_EVENTS", List.of("SpeedingEvents"), "VEHICLE", "Speeding begin/end events", strategy)
); );
}; };
} }
@ -70,8 +115,9 @@ public class TachographImportPlanService {
String extractionCode, String extractionCode,
List<String> sourceTables, List<String> sourceTables,
String entityAxis, String entityAxis,
String description String description,
AcquisitionStrategy strategy
) { ) {
return new TachographImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description); return new TachographImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description, strategy);
} }
} }

View File

@ -66,6 +66,7 @@ public class TelematicsPositionEventMapper {
source.odometerM(), source.odometerM(),
new GeoPointDto(source.latitude(), source.longitude()), new GeoPointDto(source.latitude(), source.longitude()),
detailsFactory.position(source.positionReason()), detailsFactory.position(source.positionReason()),
null,
detailsFactory.payloadFromMap(source.payload()), detailsFactory.payloadFromMap(source.payload()),
false, false,
packageInfo packageInfo

View File

@ -80,6 +80,7 @@ public class YellowFoxD8BookingEventMapper {
source.odometerM(), source.odometerM(),
new GeoPointDto(source.latitude(), source.longitude()), new GeoPointDto(source.latitude(), source.longitude()),
detailsFor(normalized), detailsFor(normalized),
null,
detailsFactory.payloadFromMap(payload), detailsFactory.payloadFromMap(payload),
false, false,
packageInfo packageInfo

View File

@ -29,3 +29,6 @@ eventhub:
batch: batch:
completion-size: 1000 completion-size: 1000
completion-timeout: 5s completion-timeout: 5s
tachograph:
default-chunk-days: 1
occurred-at-overlap: 7d

View File

@ -18,7 +18,9 @@ create table if not exists eventhub.event_source (
constraint ux_event_source unique (tenant_key, provider_key, source_kind, source_key, source_instance_key) constraint ux_event_source unique (tenant_key, provider_key, source_kind, source_key, source_instance_key)
); );
-- One execution of a tachograph acquisition job. A run may create many data packages. -- One execution of a tachograph acquisition job. A run may create many EventHub
-- extraction packages. The original tachograph card/VU packages are preserved
-- separately as source_package_* references, not used as EventHub package identity.
create table if not exists eventhub.import_run ( create table if not exists eventhub.import_run (
id uuid primary key, id uuid primary key,
tenant_key text not null, tenant_key text not null,
@ -43,14 +45,17 @@ create table if not exists eventhub.import_run (
requested_event_families text[] not null default '{}', requested_event_families text[] not null default '{}',
acquisition_strategy text, acquisition_strategy text,
metadata jsonb not null default '{}'::jsonb, metadata jsonb not null default '{}'::jsonb,
planned_package_count integer not null default 0,
started_at timestamptz not null default now(), started_at timestamptz not null default now(),
finished_at timestamptz, finished_at timestamptz,
error_message text, error_message text,
constraint chk_import_run_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to) constraint chk_import_run_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to)
); );
-- Optional cursor table for scheduled/difference imports. The first implementation can -- Optional cursor table for scheduled/difference imports. For regular updates,
-- use occurredAt windows; later it can switch to source-package or source-row watermarks. -- prefer source-package watermarks because new source packages may contain older
-- occurredAt events. Fallback strategies can use source-row updatedAt or occurredAt
-- overlap windows. Cursor scope is separated by scope_hash and event family/source kind.
create table if not exists eventhub.import_cursor ( create table if not exists eventhub.import_cursor (
id uuid primary key, id uuid primary key,
tenant_key text not null, tenant_key text not null,
@ -67,8 +72,10 @@ create table if not exists eventhub.import_cursor (
constraint ux_import_cursor unique (tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type) constraint ux_import_cursor unique (tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type)
); );
-- One coherent acquisition package, e.g. tenant + TACHOGRAPH/VEHICLE_UNIT/DRIVER_ACTIVITY/import scope. -- One EventHub acquisition package. It represents an extraction batch, not
-- This table captures the source grouping and the organisation/time import scope used for acquisition. -- necessarily one original tachograph package. A single extraction batch can
-- contain rows from multiple original card/VU packages; those are preserved on
-- acquired_event.source_package_* where available.
create table if not exists eventhub.data_package ( create table if not exists eventhub.data_package (
id uuid primary key, id uuid primary key,
event_source_id integer not null references eventhub.event_source(id), event_source_id integer not null references eventhub.event_source(id),
@ -94,20 +101,39 @@ create table if not exists eventhub.data_package (
event_family text, event_family text,
business_date date, business_date date,
external_package_id text, external_package_id text,
extraction_code text,
extraction_source_kind text,
entity_axis text,
batch_no integer,
chunk_from timestamptz,
chunk_to timestamptz,
-- Optional package-level source package reference. Usually null for extraction
-- batches, because the original tachograph package is stored per acquired event.
source_package_kind text,
source_package_id text,
source_package_entity_id text,
source_package_period_from timestamptz,
source_package_period_to timestamptz,
source_package_imported_at timestamptz,
received_at timestamptz not null default now(), received_at timestamptz not null default now(),
completed_at timestamptz, completed_at timestamptz,
event_count integer not null default 0, event_count integer not null default 0,
metadata jsonb not null default '{}'::jsonb, metadata jsonb not null default '{}'::jsonb,
error_message text, error_message text,
constraint ux_data_package_external unique (tenant_key, event_source_id, external_package_id, received_at), constraint ux_data_package_package_key unique (tenant_key, event_source_id, package_key),
constraint chk_data_package_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to) constraint chk_data_package_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to),
constraint chk_data_package_chunk_time_order check (chunk_from is null or chunk_to is null or chunk_from < chunk_to)
); );
-- Temporary acquisition-stage point-event store. -- Temporary acquisition-stage point-event store.
-- It keeps acquired point events with EventSource context, externalSourceEventId, -- It keeps acquired point events with EventSource context, externalSourceEventId,
-- one occurredAt timestamp, source-side driver/vehicle refs, normalized event details, -- one occurredAt timestamp, source-side driver/vehicle refs, normalized event details,
-- and raw JSON payload. Organisation is intentionally not stored per event; it belongs -- optional original source-package reference, and raw JSON payload. Organisation is
-- to master-data relations for driver/vehicle and is represented in importScope/sourceGroup. -- intentionally not stored per event; it belongs to master-data relations for
-- driver/vehicle and is represented in importScope/sourceGroup.
create table if not exists eventhub.acquired_event ( create table if not exists eventhub.acquired_event (
id uuid not null, id uuid not null,
event_source_id integer not null references eventhub.event_source(id), event_source_id integer not null references eventhub.event_source(id),
@ -124,6 +150,13 @@ create table if not exists eventhub.acquired_event (
vehicle_registration_nation text, vehicle_registration_nation text,
vehicle_registration_number text, vehicle_registration_number text,
source_package_kind text,
source_package_id text,
source_package_entity_id text,
source_package_period_from timestamptz,
source_package_period_to timestamptz,
source_package_imported_at timestamptz,
occurred_at timestamptz not null, occurred_at timestamptz not null,
received_partner_at timestamptz, received_partner_at timestamptz,
received_hub_at timestamptz not null default now(), received_hub_at timestamptz not null default now(),
@ -161,7 +194,8 @@ create table if not exists eventhub.acquired_event (
), ),
constraint chk_acquired_event_position_pair check ((latitude is null and longitude is null) or (latitude is not null and longitude is not null)), constraint chk_acquired_event_position_pair check ((latitude is null and longitude is null) or (latitude is not null and longitude is not null)),
constraint chk_driver_card_nation_when_number check (driver_card_number is null or driver_card_nation is not null), constraint chk_driver_card_nation_when_number check (driver_card_number is null or driver_card_nation is not null),
constraint chk_vehicle_registration_nation_when_number check (vehicle_registration_number is null or vehicle_registration_nation is not null) constraint chk_vehicle_registration_nation_when_number check (vehicle_registration_number is null or vehicle_registration_nation is not null),
constraint chk_acquired_event_source_package_time_order check (source_package_period_from is null or source_package_period_to is null or source_package_period_from < source_package_period_to)
); );
create unique index if not exists ux_acquired_event_source_record create unique index if not exists ux_acquired_event_source_record
@ -171,6 +205,10 @@ create index if not exists idx_acquired_event_signature
on eventhub.acquired_event(event_signature_hash) on eventhub.acquired_event(event_signature_hash)
where event_signature_hash is not null; where event_signature_hash is not null;
create index if not exists idx_acquired_event_source_package
on eventhub.acquired_event(source_package_kind, source_package_id, source_package_imported_at)
where source_package_id is not null;
create index if not exists idx_acquired_event_vehicle_vin_time create index if not exists idx_acquired_event_vehicle_vin_time
on eventhub.acquired_event(vehicle_vin, occurred_at desc) on eventhub.acquired_event(vehicle_vin, occurred_at desc)
where vehicle_vin is not null; where vehicle_vin is not null;
@ -198,5 +236,8 @@ create index if not exists idx_data_package_source_time
create index if not exists idx_data_package_scope create index if not exists idx_data_package_scope
on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to); on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to);
create index if not exists idx_data_package_extraction
on eventhub.data_package(tenant_key, event_source_id, import_run_id, event_family, extraction_source_kind, extraction_code, batch_no);
create index if not exists idx_import_run_source_status create index if not exists idx_import_run_source_status
on eventhub.import_run(tenant_key, event_source_id, status, started_at desc); on eventhub.import_run(tenant_key, event_source_id, status, started_at desc);

View File

@ -66,6 +66,7 @@ class EventAcquisitionRecordKeyServiceTest {
null, null,
new EventDetailsDto("DRIVER_ACTIVITY", objectMapper.createObjectNode().put("cardSlot", "DRIVER")), new EventDetailsDto("DRIVER_ACTIVITY", objectMapper.createObjectNode().put("cardSlot", "DRIVER")),
null, null,
objectMapper.createObjectNode(),
false, false,
packageInfo packageInfo
); );