diff --git a/docs/db/create_eventhub_schema.sql b/docs/db/create_eventhub_schema.sql new file mode 100644 index 0000000..490cf3b --- /dev/null +++ b/docs/db/create_eventhub_schema.sql @@ -0,0 +1,317 @@ +create extension if not exists pgcrypto; +create extension if not exists postgis; + +create schema if not exists eventhub; + +create table if not exists eventhub.event_source ( + id integer generated always as identity primary key, + tenant_key text not null, + provider_key text not null, + source_kind text not null, + source_key text not null, + source_instance_key text not null default 'default', + tenant_provider_setting_key text, + external_fleet_key text, + created_at timestamptz not null default now(), + constraint ux_event_source unique (tenant_key, provider_key, source_kind, source_key, source_instance_key) +); + +create table if not exists eventhub.import_run ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + mode text not null, + status text not null, + refresh_master_data_first boolean not null default true, + source_group_type text, + source_group_entity_id text, + source_group_code text, + source_group_name text, + import_scope_type text not null, + root_source_org_entity_id text, + root_source_org_code text, + root_source_org_name text, + include_children boolean not null default false, + occurred_from timestamptz, + occurred_to timestamptz, + requested_event_families text[] not null default '{}', + acquisition_strategy text, + metadata jsonb not null default '{}'::jsonb, + planned_package_count integer not null default 0, + started_at timestamptz not null default now(), + finished_at timestamptz, + error_message text, + constraint chk_import_run_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to) +); + +create table if not exists eventhub.import_cursor ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + scope_hash text not null, + event_family text not null, + source_kind text not null, + cursor_type text not null, + last_source_package_imported_at timestamptz, + last_source_package_id text, + last_source_row_updated_at timestamptz, + last_occurred_to timestamptz, + updated_at timestamptz not null default now(), + constraint ux_import_cursor unique (tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type) +); + +create table if not exists eventhub.data_package ( + id uuid primary key, + event_source_id integer not null references eventhub.event_source(id), + import_run_id uuid references eventhub.import_run(id), + tenant_key text not null, + package_key text not null, + package_type text not null, + status text not null, + source_group_type text, + source_group_entity_id text, + source_group_code text, + source_group_name text, + import_scope_type text, + root_source_org_entity_id text, + root_source_org_code text, + root_source_org_name text, + include_children boolean not null default false, + occurred_from timestamptz, + occurred_to timestamptz, + event_family text, + business_date date, + external_package_id text, + extraction_code text, + extraction_source_kind text, + entity_axis text, + batch_no integer, + chunk_from timestamptz, + chunk_to timestamptz, + source_package_kind text, + source_package_id text, + source_package_entity_id text, + source_package_period_from timestamptz, + source_package_period_to timestamptz, + source_package_imported_at timestamptz, + received_at timestamptz not null default now(), + completed_at timestamptz, + event_count integer not null default 0, + metadata jsonb not null default '{}'::jsonb, + error_message text, + constraint ux_data_package_package_key unique (tenant_key, event_source_id, package_key), + constraint chk_data_package_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to), + constraint chk_data_package_chunk_time_order check (chunk_from is null or chunk_to is null or chunk_from < chunk_to) +); + +create table if not exists eventhub.source_master_entity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + entity_type text not null, + source_entity_id text not null, + source_external_key text, + display_name text, + active boolean, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_master_entity unique (tenant_key, event_source_id, entity_type, source_entity_id), + constraint chk_source_master_entity_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to) +); + +create table if not exists eventhub.source_master_relation ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + relation_key text not null, + relation_type text not null, + from_entity_type text not null, + from_source_entity_id text not null, + to_entity_type text not null, + to_source_entity_id text not null, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_master_relation unique (tenant_key, event_source_id, relation_key), + constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to) +); + +create table if not exists eventhub.vehicle ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_vehicle_entity_id text, + vin text, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create table if not exists eventhub.vehicle_registration ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_registration_entity_id text, + nation text not null, + registration_number text not null, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create table if not exists eventhub.vehicle_registration_assignment ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + vehicle_registration_id uuid not null references eventhub.vehicle_registration(id) on delete cascade, + vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint chk_vehicle_registration_assignment_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to) +); + +create table if not exists eventhub.event ( + id uuid not null, + event_source_id integer not null references eventhub.event_source(id), + data_package_id uuid not null references eventhub.data_package(id), + external_source_event_id text not null, + driver_entity_id uuid references eventhub.source_master_entity(id), + vehicle_id uuid references eventhub.vehicle(id), + vehicle_registration_id uuid references eventhub.vehicle_registration(id), + source_package_entity_id uuid references eventhub.source_master_entity(id), + occurred_at timestamptz not null, + received_partner_at timestamptz, + received_hub_at timestamptz not null default now(), + event_domain text not null, + event_type text not null, + lifecycle text not null, + odometer_m bigint, + position geography(Point, 4326), + payload jsonb not null default '{}'::jsonb, + manual_entry boolean not null default false, + source_record_key_hash text not null, + event_signature_hash text, + created_at timestamptz not null default now(), + constraint pk_event primary key (occurred_at, id), + constraint chk_event_driver_or_vehicle_ref check ( + driver_entity_id is not null + or vehicle_id is not null + or vehicle_registration_id is not null + ) +); + +create table if not exists eventhub.event_detail ( + event_occurred_at timestamptz not null, + event_id uuid not null, + detail_type text not null, + attributes jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + constraint pk_event_detail primary key (event_occurred_at, event_id, detail_type), + constraint fk_event_detail_event foreign key (event_occurred_at, event_id) + references eventhub.event(occurred_at, id) + on delete cascade +); + +create unique index if not exists ux_event_source_record + on eventhub.event(source_record_key_hash); + +create index if not exists idx_event_signature + on eventhub.event(event_signature_hash) + where event_signature_hash is not null; + +create index if not exists idx_event_source_time + on eventhub.event(event_source_id, occurred_at desc); + +create index if not exists idx_event_package_time + on eventhub.event(data_package_id, occurred_at desc); + +create index if not exists idx_event_domain_type_time + on eventhub.event(event_domain, event_type, occurred_at desc); + +create index if not exists idx_event_driver_time + on eventhub.event(driver_entity_id, occurred_at desc) + where driver_entity_id is not null; + +create index if not exists idx_event_vehicle_time + on eventhub.event(vehicle_id, occurred_at desc) + where vehicle_id is not null; + +create index if not exists idx_event_vehicle_registration_time + on eventhub.event(vehicle_registration_id, occurred_at desc) + where vehicle_registration_id is not null; + +create index if not exists idx_event_position_gist + on eventhub.event using gist(position) + where position is not null; + +create index if not exists idx_event_payload_gin + on eventhub.event using gin(payload); + +create index if not exists idx_event_detail_type + on eventhub.event_detail(detail_type); + +create index if not exists idx_event_detail_attributes_gin + on eventhub.event_detail using gin(attributes); + +create index if not exists idx_source_master_entity_type_key + on eventhub.source_master_entity(tenant_key, event_source_id, entity_type, source_external_key) + where source_external_key is not null; + +create index if not exists idx_source_master_entity_payload_gin + on eventhub.source_master_entity using gin(payload); + +create index if not exists idx_source_master_relation_from + on eventhub.source_master_relation(tenant_key, event_source_id, from_entity_type, from_source_entity_id, relation_type); + +create index if not exists idx_source_master_relation_to + on eventhub.source_master_relation(tenant_key, event_source_id, to_entity_type, to_source_entity_id, relation_type); + +create index if not exists idx_source_master_relation_payload_gin + on eventhub.source_master_relation using gin(payload); + +create index if not exists idx_vehicle_lookup_ctx + on eventhub.vehicle(tenant_key, event_source_id, updated_at desc); + +create index if not exists idx_vehicle_source_entity + on eventhub.vehicle(tenant_key, event_source_id, source_vehicle_entity_id) + where source_vehicle_entity_id is not null; + +create index if not exists idx_vehicle_vin + on eventhub.vehicle(tenant_key, event_source_id, vin) + where vin is not null; + +create index if not exists idx_vehicle_registration_source_entity + on eventhub.vehicle_registration(tenant_key, event_source_id, source_registration_entity_id) + where source_registration_entity_id is not null; + +create index if not exists idx_vehicle_registration_plate + on eventhub.vehicle_registration(tenant_key, event_source_id, nation, registration_number); + +create index if not exists idx_vehicle_registration_assignment_registration_time + on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to); + +create index if not exists idx_vehicle_registration_assignment_vehicle_time + on eventhub.vehicle_registration_assignment(vehicle_id, valid_from desc, valid_to); + +create index if not exists idx_data_package_source_time + on eventhub.data_package(tenant_key, event_source_id, received_at desc); + +create index if not exists idx_data_package_scope + on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to); + +create index if not exists idx_data_package_extraction + on eventhub.data_package(tenant_key, event_source_id, import_run_id, event_family, extraction_source_kind, extraction_code, batch_no); + +create index if not exists idx_import_run_source_status + on eventhub.import_run(tenant_key, event_source_id, status, started_at desc); diff --git a/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java b/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java index 4bdaeab..66bfe2b 100644 --- a/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java +++ b/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java @@ -8,6 +8,8 @@ import org.springframework.stereotype.Component; @Component public class EventHubCommonIngestionRoute extends RouteBuilder { + private static final String BATCH_INPUT_QUEUE = "seda:eventhub-batch-input"; + private final EventHubProperties properties; private final EventHubEventValidationProcessor validationProcessor; private final EventHubPackageKeyProcessor packageKeyProcessor; @@ -33,13 +35,15 @@ public class EventHubCommonIngestionRoute extends RouteBuilder { @Override public void configure() { + String batchInputUri = batchInputUri(); + from("direct:eventhub-normalized-input") .routeId("eventhub-normalized-input-route") .process(validationProcessor) .process(packageKeyProcessor) - .to("seda:eventhub-batch-input"); + .to(batchInputUri); - from("seda:eventhub-batch-input") + from(batchInputUri) .routeId("eventhub-batch-and-persist-route") .aggregate(header(EventHubHeaders.PACKAGE_KEY), aggregationStrategy) .completionSize(properties.getBatch().getCompletionSize()) @@ -48,4 +52,12 @@ public class EventHubCommonIngestionRoute extends RouteBuilder { .process(batchBuildProcessor) .bean(ingestionService, "ingest"); } + + private String batchInputUri() { + EventHubProperties.Batch batch = properties.getBatch(); + return BATCH_INPUT_QUEUE + + "?size=" + batch.getQueueSize() + + "&blockWhenFull=" + batch.isBlockWhenFull() + + "&offerTimeout=" + batch.getQueueOfferTimeout().toMillis(); + } } diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index ae3f158..a208d07 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -37,6 +37,15 @@ public class EventHubProperties { /** Maximum time to wait for more events belonging to the same package key. */ private Duration completionTimeout = Duration.ofSeconds(5); + /** Maximum number of normalized events buffered before producers apply backpressure. */ + private int queueSize = 10000; + + /** Whether producers should wait for queue capacity instead of failing immediately. */ + private boolean blockWhenFull = true; + + /** Maximum time a producer waits for queue capacity before failing the import. */ + private Duration queueOfferTimeout = Duration.ofMinutes(5); + public int getCompletionSize() { return completionSize; } @@ -52,6 +61,32 @@ public class EventHubProperties { public void setCompletionTimeout(Duration completionTimeout) { this.completionTimeout = completionTimeout; } + + public int getQueueSize() { + return queueSize; + } + + public void setQueueSize(int queueSize) { + this.queueSize = Math.max(1, queueSize); + } + + public boolean isBlockWhenFull() { + return blockWhenFull; + } + + public void setBlockWhenFull(boolean blockWhenFull) { + this.blockWhenFull = blockWhenFull; + } + + public Duration getQueueOfferTimeout() { + return queueOfferTimeout; + } + + public void setQueueOfferTimeout(Duration queueOfferTimeout) { + if (queueOfferTimeout != null && !queueOfferTimeout.isNegative()) { + this.queueOfferTimeout = queueOfferTimeout; + } + } } public static class Tachograph { diff --git a/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java b/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java index 4439950..ae17576 100644 --- a/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java +++ b/src/main/java/at/procon/eventhub/dto/EventHubEventBatchDto.java @@ -1,6 +1,8 @@ package at.procon.eventhub.dto; import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -15,6 +17,6 @@ public record EventHubEventBatchDto( ) { public EventHubEventBatchDto { events = events == null ? List.of() : List.copyOf(events); - metadata = metadata == null ? Map.of() : Map.copyOf(metadata); + metadata = metadata == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(metadata)); } } diff --git a/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java b/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java index e5a7a02..bf48fe5 100644 --- a/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java +++ b/src/main/java/at/procon/eventhub/dto/VehicleRefDto.java @@ -3,36 +3,56 @@ package at.procon.eventhub.dto; import jakarta.validation.Valid; /** - * Source-side vehicle reference. VIN can be missing for driver-card-only data; - * VRN/registration is nation-scoped and can be resolved to VIN later. + * Source-side vehicle reference. A physical tachograph vehicle is identified by + * VehicleIdentification.ID/VIN. A registration/plate is identified separately by + * Vehicle.ID/VRN because plates can move between vehicles over time. * - * Organisation assignment is intentionally not stored on the event. Vehicle ↔ - * organisation relation belongs to master data and can be resolved by - * sourceEntityId/VIN/VRN + occurredAt when needed. + * Organisation assignment is intentionally not stored on the event. Vehicle and + * registration relations belong to master data and can be resolved historically. */ public record VehicleRefDto( - String sourceEntityId, + String sourceVehicleEntityId, String vin, - @Valid VehicleRegistrationRefDto vehicleRegistration + String sourceRegistrationEntityId, + @Valid at.procon.eventhub.dto.VehicleRegistrationRefDto vehicleRegistration ) { + public VehicleRefDto( + String sourceEntityId, + String vin, + at.procon.eventhub.dto.VehicleRegistrationRefDto vehicleRegistration + ) { + this(sourceEntityId, vin, null, vehicleRegistration); + } + public VehicleRefDto { - sourceEntityId = normalizeNullable(sourceEntityId); + sourceVehicleEntityId = normalizeNullable(sourceVehicleEntityId); vin = normalizeVin(vin); + sourceRegistrationEntityId = normalizeNullable(sourceRegistrationEntityId); } public boolean hasAnyReference() { - return (sourceEntityId != null && !sourceEntityId.isBlank()) + return (sourceVehicleEntityId != null && !sourceVehicleEntityId.isBlank()) || (vin != null && !vin.isBlank()) + || (sourceRegistrationEntityId != null && !sourceRegistrationEntityId.isBlank()) || (vehicleRegistration != null && vehicleRegistration.hasValue()); } public String stableKey() { String registrationKey = vehicleRegistration == null ? "" : vehicleRegistration.stableKey(); - return (sourceEntityId == null ? "" : sourceEntityId) + "|" + return (sourceVehicleEntityId == null ? "" : sourceVehicleEntityId) + "|" + (vin == null ? "" : vin) + "|" + + (sourceRegistrationEntityId == null ? "" : sourceRegistrationEntityId) + "|" + registrationKey; } + /** + * Backward-compatible accessor for older code paths that treated vehicle and + * registration source IDs as one field. + */ + public String sourceEntityId() { + return sourceVehicleEntityId != null ? sourceVehicleEntityId : sourceRegistrationEntityId; + } + private static String normalizeNullable(String value) { return value == null || value.isBlank() ? null : value.trim(); } diff --git a/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java index 338ff26..7ca889f 100644 --- a/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java @@ -109,13 +109,17 @@ public abstract class AbstractJdbcExtractionBatchExecutor parameters(R request, ImportScopeDto scope, ImportCursorStateDto cursor) { Map params = new HashMap<>(); + String organisationId = scope == null || scope.rootSourceOrganisation() == null + ? null + : scope.rootSourceOrganisation().sourceEntityId(); params.put("tenantKey", request.tenantKey()); params.put("occurredFrom", scope == null ? null : scope.occurredFrom()); params.put("occurredTo", scope == null ? null : scope.occurredTo()); - params.put("rootOrganisationId", scope == null || scope.rootSourceOrganisation() == null ? null : scope.rootSourceOrganisation().sourceEntityId()); + params.put("organisationId", organisationId); params.put("includeChildren", scope != null && scope.includeChildren()); params.put("lastSourcePackageImportedAt", cursor == null ? null : cursor.lastSourcePackageImportedAt()); params.put("lastSourcePackageId", cursor == null ? null : cursor.lastSourcePackageId()); + params.put("lastSourcePackageIdNumeric", parseLong(cursor == null ? null : cursor.lastSourcePackageId())); params.put("lastSourceRowUpdatedAt", cursor == null ? null : cursor.lastSourceRowUpdatedAt()); params.put("lastOccurredTo", cursor == null ? null : cursor.lastOccurredTo()); return params; @@ -200,4 +204,15 @@ public abstract class AbstractJdbcExtractionBatchExecutor { return Optional.ofNullable(definitionsByCode.get(normalize(code))); } + public Set supportedCodes() { + return definitionsByCode.keySet(); + } + + public List> definitions() { + return definitionsByCode.values().stream() + .sorted(Comparator.comparing(ExtractionDefinition::code)) + .toList(); + } + private String normalize(String value) { return value == null ? "" : value.trim().toUpperCase(Locale.ROOT); } diff --git a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java index 1b9b10e..3d29287 100644 --- a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java @@ -7,12 +7,21 @@ import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.SourceGroupRefDto; import at.procon.eventhub.dto.SourcePackageRefDto; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import java.lang.reflect.Array; import java.time.OffsetDateTime; +import java.time.temporal.TemporalAccessor; +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; @Repository public class DataPackageRepository { @@ -202,6 +211,7 @@ public class DataPackageRepository { ); } + @Transactional(propagation = Propagation.REQUIRES_NEW) public void markFailed(UUID packageId, String errorMessage) { jdbcTemplate.update( """ @@ -217,9 +227,68 @@ public class DataPackageRepository { private String toJson(Map value) { try { - return objectMapper.writeValueAsString(value == null ? Map.of() : value); + return objectMapper.writeValueAsString(normalizeMetadataMap(value)); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Cannot serialize package metadata", e); } } + + private Map normalizeMetadataMap(Map value) { + if (value == null || value.isEmpty()) { + return Map.of(); + } + Map normalized = new LinkedHashMap<>(); + for (Map.Entry entry : value.entrySet()) { + normalized.put(entry.getKey(), normalizeMetadataValue(entry.getValue())); + } + return normalized; + } + + private Object normalizeMetadataValue(Object value) { + if (value == null + || value instanceof String + || value instanceof Number + || value instanceof Boolean + || value instanceof JsonNode) { + return value; + } + if (value instanceof CharSequence charSequence) { + return charSequence.toString(); + } + if (value instanceof Enum enumValue) { + return enumValue.name(); + } + if (value instanceof UUID uuid) { + return uuid.toString(); + } + if (value instanceof TemporalAccessor temporalValue) { + return temporalValue.toString(); + } + if (value instanceof Date dateValue) { + return dateValue.toInstant().toString(); + } + if (value instanceof Map mapValue) { + Map nested = new LinkedHashMap<>(); + for (Map.Entry entry : mapValue.entrySet()) { + nested.put(String.valueOf(entry.getKey()), normalizeMetadataValue(entry.getValue())); + } + return nested; + } + if (value instanceof Iterable iterable) { + List nested = new ArrayList<>(); + for (Object item : iterable) { + nested.add(normalizeMetadataValue(item)); + } + return nested; + } + if (value.getClass().isArray()) { + int length = Array.getLength(value); + List nested = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + nested.add(normalizeMetadataValue(Array.get(value, i))); + } + return nested; + } + return value.toString(); + } } diff --git a/src/main/java/at/procon/eventhub/persistence/EventRepository.java b/src/main/java/at/procon/eventhub/persistence/EventRepository.java index f144d50..f2e107a 100644 --- a/src/main/java/at/procon/eventhub/persistence/EventRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/EventRepository.java @@ -4,8 +4,7 @@ import at.procon.eventhub.dto.DriverCardRefDto; import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.SourcePackageRefDto; -import at.procon.eventhub.dto.VehicleRefDto; -import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.persistence.VehicleIdentityRepository.ResolvedVehicleReference; import at.procon.eventhub.service.EventAcquisitionRecordKeyService; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -26,17 +25,20 @@ public class EventRepository { private final ObjectMapper objectMapper; private final EventAcquisitionRecordKeyService recordKeyService; private final SourceMasterDataRepository sourceMasterDataRepository; + private final VehicleIdentityRepository vehicleIdentityRepository; public EventRepository( JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, EventAcquisitionRecordKeyService recordKeyService, - SourceMasterDataRepository sourceMasterDataRepository + SourceMasterDataRepository sourceMasterDataRepository, + VehicleIdentityRepository vehicleIdentityRepository ) { this.jdbcTemplate = jdbcTemplate; this.objectMapper = objectMapper; this.recordKeyService = recordKeyService; this.sourceMasterDataRepository = sourceMasterDataRepository; + this.vehicleIdentityRepository = vehicleIdentityRepository; } /** @@ -68,6 +70,8 @@ public class EventRepository { UUID requestedEventId = event.eventId() == null ? UUID.randomUUID() : event.eventId(); OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt(); String sourceRecordKeyHash = recordKeyService.buildSourceRecordKeyHash(event, eventSourceId); + var longitude = event.position() == null ? null : event.position().longitude(); + var latitude = event.position() == null ? null : event.position().latitude(); return jdbcTemplate.query( """ @@ -75,7 +79,7 @@ public class EventRepository { insert into eventhub.event( id, event_source_id, data_package_id, external_source_event_id, - driver_entity_id, vehicle_entity_id, source_package_entity_id, + driver_entity_id, vehicle_id, vehicle_registration_id, source_package_entity_id, occurred_at, received_partner_at, received_hub_at, event_domain, event_type, lifecycle, odometer_m, position, @@ -84,12 +88,12 @@ public class EventRepository { ) values ( ?, ?, ?, ?, - ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, case - when ? is null or ? is null then null - else ST_SetSRID(ST_MakePoint(?, ?), 4326)::geography + when ?::double precision is null or ?::double precision is null then null + else ST_SetSRID(ST_MakePoint(?::double precision, ?::double precision), 4326)::geography end, ?::jsonb, ?, ?, ? @@ -120,7 +124,8 @@ public class EventRepository { packageId, event.externalSourceEventId(), refs.driverEntityId(), - refs.vehicleEntityId(), + refs.vehicleId(), + refs.vehicleRegistrationId(), refs.sourcePackageEntityId(), event.occurredAt(), event.receivedPartnerAt(), @@ -129,10 +134,10 @@ public class EventRepository { event.eventType().name(), event.lifecycle().name(), event.odometerM(), - event.position() == null ? null : event.position().longitude(), - event.position() == null ? null : event.position().latitude(), - event.position() == null ? null : event.position().longitude(), - event.position() == null ? null : event.position().latitude(), + longitude, + latitude, + longitude, + latitude, toJson(event.payload()), event.manualEntry(), sourceRecordKeyHash, @@ -168,9 +173,9 @@ public class EventRepository { Map entityIdCache ) { UUID driverEntityId = resolveDriverEntityId(tenantKey, eventSourceId, event, entityIdCache); - UUID vehicleEntityId = resolveVehicleEntityId(tenantKey, eventSourceId, event, entityIdCache); + ResolvedVehicleReference vehicleRef = resolveVehicleReference(tenantKey, eventSourceId, event); UUID sourcePackageEntityId = resolveSourcePackageEntityId(tenantKey, eventSourceId, event, entityIdCache); - return new ResolvedEntityRefs(driverEntityId, vehicleEntityId, sourcePackageEntityId); + return new ResolvedEntityRefs(driverEntityId, vehicleRef.vehicleId(), vehicleRef.vehicleRegistrationId(), sourcePackageEntityId); } private UUID resolveDriverEntityId( @@ -211,45 +216,16 @@ public class EventRepository { ); } - private UUID resolveVehicleEntityId( + private ResolvedVehicleReference resolveVehicleReference( String tenantKey, int eventSourceId, - EventHubEventDto event, - Map entityIdCache + EventHubEventDto event ) { - VehicleRefDto vehicleRef = event.vehicleRef(); - if (vehicleRef == null || !vehicleRef.hasAnyReference()) { - return null; - } - - VehicleRegistrationRefDto registration = vehicleRef.vehicleRegistration(); - String registrationKey = registration == null || !registration.hasValue() ? null : registration.stableKey(); - String sourceEntityId = normalizeNullable(vehicleRef.sourceEntityId()); - if (sourceEntityId == null && normalizeNullable(vehicleRef.vin()) != null) { - sourceEntityId = "VIN:" + vehicleRef.vin(); - } - if (sourceEntityId == null && registrationKey != null) { - sourceEntityId = "VRN:" + registrationKey; - } - if (sourceEntityId == null) { - return null; - } - - Map payload = new LinkedHashMap<>(); - put(payload, "source_entity_id", vehicleRef.sourceEntityId()); - put(payload, "vin", vehicleRef.vin()); - put(payload, "vehicle_registration_nation", registration == null ? null : registration.nation()); - put(payload, "vehicle_registration_number", registration == null ? null : registration.number()); - return resolveEntityId( + return vehicleIdentityRepository.resolveOrCreateVehicleReference( tenantKey, eventSourceId, - "VEHICLE", - sourceEntityId, - normalizeNullable(vehicleRef.vin()) == null ? registrationKey : vehicleRef.vin(), - sourceEntityId, - null, - payload, - entityIdCache + event.vehicleRef(), + event.occurredAt() ); } @@ -357,7 +333,8 @@ public class EventRepository { private record ResolvedEntityRefs( UUID driverEntityId, - UUID vehicleEntityId, + UUID vehicleId, + UUID vehicleRegistrationId, UUID sourcePackageEntityId ) { } diff --git a/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java b/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java index 4c75c02..57de7b0 100644 --- a/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java @@ -3,11 +3,21 @@ package at.procon.eventhub.persistence; import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert; import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.POJONode; +import java.lang.reflect.Array; import java.sql.PreparedStatement; +import java.sql.Timestamp; +import java.time.temporal.TemporalAccessor; +import java.time.temporal.TemporalAmount; +import java.util.ArrayList; +import java.util.Base64; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; @@ -180,13 +190,125 @@ public class SourceMasterDataRepository { } private String toJson(Map value) { + Map source = value == null ? Map.of() : value; try { - return objectMapper.writeValueAsString(value == null ? Map.of() : value); + return objectMapper.writeValueAsString(normalizeJsonMap(source)); } catch (JsonProcessingException e) { - throw new IllegalArgumentException("Cannot serialize source master data payload", e); + throw new IllegalArgumentException( + "Cannot serialize source master data payload. payloadTypes=" + describePayloadTypes(source), + e + ); } } + private Map normalizeJsonMap(Map source) { + Map normalized = new LinkedHashMap<>(source.size()); + for (Map.Entry entry : source.entrySet()) { + if (entry.getKey() == null) { + continue; + } + normalized.put(String.valueOf(entry.getKey()), normalizeJsonValue(entry.getValue())); + } + return normalized; + } + + private Object normalizeJsonValue(Object value) { + if (value == null + || value instanceof String + || value instanceof Number + || value instanceof Boolean) { + return value; + } + if (value instanceof JsonNode node) { + return normalizeJsonNode(node); + } + if (value instanceof Enum enumValue) { + return enumValue.name(); + } + if (value instanceof UUID uuid) { + return uuid.toString(); + } + if (value instanceof byte[] bytes) { + return Base64.getEncoder().encodeToString(bytes); + } + if (value instanceof Timestamp timestamp) { + return timestamp.toInstant().toString(); + } + if (value instanceof java.util.Date date) { + return date.toInstant().toString(); + } + if (value instanceof TemporalAccessor || value instanceof TemporalAmount) { + return value.toString(); + } + if (value instanceof Map map) { + return normalizeJsonMap(map); + } + if (value instanceof Iterable iterable) { + List values = new ArrayList<>(); + for (Object entry : iterable) { + values.add(normalizeJsonValue(entry)); + } + return values; + } + if (value.getClass().isArray()) { + int length = Array.getLength(value); + List values = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + values.add(normalizeJsonValue(Array.get(value, i))); + } + return values; + } + return String.valueOf(value); + } + + private Object normalizeJsonNode(JsonNode node) { + if (node == null || node.isNull() || node.isMissingNode()) { + return null; + } + if (node instanceof POJONode pojoNode) { + return normalizeJsonValue(pojoNode.getPojo()); + } + if (node.isObject()) { + return normalizeJsonMap(objectNodeToMap(node)); + } + if (node.isArray()) { + List values = new ArrayList<>(); + for (JsonNode child : node) { + values.add(normalizeJsonNode(child)); + } + return values; + } + if (node.isTextual()) { + return node.textValue(); + } + if (node.isNumber()) { + return node.numberValue(); + } + if (node.isBoolean()) { + return node.booleanValue(); + } + if (node.isBinary()) { + return node.asText(); + } + return node.asText(); + } + + private Map objectNodeToMap(JsonNode node) { + Map values = new LinkedHashMap<>(); + node.fields().forEachRemaining(entry -> values.put(entry.getKey(), entry.getValue())); + return values; + } + + private String describePayloadTypes(Map payload) { + if (payload.isEmpty()) { + return "{}"; + } + return payload.entrySet() + .stream() + .map(entry -> entry.getKey() + "=" + (entry.getValue() == null ? "null" : entry.getValue().getClass().getName())) + .collect(Collectors.joining(", ", "{", "}")); + } + private String normalizeRequired(String value, String fieldName) { String normalized = normalizeNullable(value); if (normalized == null) { diff --git a/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java b/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java new file mode 100644 index 0000000..a8465c5 --- /dev/null +++ b/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java @@ -0,0 +1,596 @@ +package at.procon.eventhub.persistence; + +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.Timestamp; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +@Repository +public class VehicleIdentityRepository { + + private final JdbcTemplate jdbcTemplate; + private final ObjectMapper objectMapper; + + public VehicleIdentityRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) { + this.jdbcTemplate = jdbcTemplate; + this.objectMapper = objectMapper; + } + + public ResolvedVehicleReference resolveOrCreateVehicleReference( + String tenantKey, + int eventSourceId, + VehicleRefDto vehicleRef, + OffsetDateTime occurredAt + ) { + if (vehicleRef == null || !vehicleRef.hasAnyReference()) { + return ResolvedVehicleReference.empty(); + } + + String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); + String sourceVehicleEntityId = normalizeNullable(vehicleRef.sourceVehicleEntityId()); + String vin = normalizeNullable(vehicleRef.vin()); + String sourceRegistrationEntityId = normalizeNullable(vehicleRef.sourceRegistrationEntityId()); + VehicleRegistrationRefDto registration = vehicleRef.vehicleRegistration(); + String registrationNation = registration == null ? null : normalizeNullable(registration.nation()); + String registrationNumber = registration == null ? null : normalizeNullable(registration.number()); + + UUID registrationId = resolveRegistrationId( + normalizedTenantKey, + eventSourceId, + sourceRegistrationEntityId, + registrationNation, + registrationNumber, + occurredAt + ); + if (registrationId == null && (sourceRegistrationEntityId != null || registrationNumber != null)) { + registrationId = createRegistration( + normalizedTenantKey, + eventSourceId, + sourceRegistrationEntityId, + registrationNation, + registrationNumber, + null, + Map.of("source", "event") + ); + } + + UUID vehicleId = resolveVehicleId( + normalizedTenantKey, + eventSourceId, + sourceVehicleEntityId, + vin + ); + if (vehicleId == null && registrationId != null) { + vehicleId = resolveAssignedVehicleId(registrationId, occurredAt); + } + if (vehicleId == null && (sourceVehicleEntityId != null || vin != null)) { + vehicleId = createVehicle( + normalizedTenantKey, + eventSourceId, + sourceVehicleEntityId, + vin + ); + } + + if (vehicleId != null) { + touchVehicle(vehicleId, sourceVehicleEntityId, vin); + } + if (registrationId != null) { + touchRegistration(registrationId, sourceRegistrationEntityId, registrationNation, registrationNumber); + } + return new ResolvedVehicleReference(vehicleId, registrationId); + } + + public int reconcileFromMasterData(String tenantKey, int eventSourceId) { + String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); + int updates = 0; + + List vehicles = jdbcTemplate.query( + compatibleSourcesCte() + """ + select event_source_id, source_entity_id, source_external_key, source_updated_at + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'VEHICLE' + """, + (rs, rowNum) -> new VehicleMasterRow( + rs.getInt("event_source_id"), + normalizeNullable(rs.getString("source_entity_id")), + normalizeNullable(rs.getString("source_external_key")), + offsetDateTime(rs.getObject("source_updated_at")) + ), + eventSourceId, + normalizedTenantKey + ); + for (VehicleMasterRow vehicle : vehicles) { + if (vehicle.sourceVehicleEntityId() == null && vehicle.vin() == null) { + continue; + } + UUID vehicleId = resolveVehicleId(normalizedTenantKey, vehicle.eventSourceId(), vehicle.sourceVehicleEntityId(), vehicle.vin()); + if (vehicleId == null) { + createVehicle(normalizedTenantKey, vehicle.eventSourceId(), vehicle.sourceVehicleEntityId(), vehicle.vin()); + } else { + touchVehicle(vehicleId, vehicle.sourceVehicleEntityId(), vehicle.vin()); + } + updates++; + } + + List registrations = jdbcTemplate.query( + compatibleSourcesCte() + """ + select event_source_id, + source_entity_id, + source_external_key, + source_updated_at, + payload ->> 'registration_nation' as registration_nation, + payload ->> 'registration_number' as registration_number + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'VEHICLE_REGISTRATION' + """, + (rs, rowNum) -> registrationMasterRow( + rs.getInt("event_source_id"), + normalizeNullable(rs.getString("source_entity_id")), + normalizeNullable(rs.getString("source_external_key")), + normalizeNullable(rs.getString("registration_nation")), + normalizeNullable(rs.getString("registration_number")), + offsetDateTime(rs.getObject("source_updated_at")) + ), + eventSourceId, + normalizedTenantKey + ); + for (RegistrationMasterRow registration : registrations) { + if (registration.nation() == null || registration.registrationNumber() == null) { + continue; + } + UUID registrationId = resolveRegistrationId( + normalizedTenantKey, + registration.eventSourceId(), + registration.sourceRegistrationEntityId(), + registration.nation(), + registration.registrationNumber(), + null + ); + if (registrationId == null) { + createRegistration( + normalizedTenantKey, + registration.eventSourceId(), + registration.sourceRegistrationEntityId(), + registration.nation(), + registration.registrationNumber(), + registration.sourceUpdatedAt(), + Map.of("source", "master-data") + ); + } else { + updateRegistrationFromMasterData( + registrationId, + registration.sourceRegistrationEntityId(), + registration.nation(), + registration.registrationNumber(), + registration.sourceUpdatedAt() + ); + } + updates++; + } + + updates += projectVehicleRegistrationAssignments(normalizedTenantKey, eventSourceId); + + return updates; + } + + private UUID resolveVehicleId( + String tenantKey, + int eventSourceId, + String sourceVehicleEntityId, + String vin + ) { + UUID vehicleId = findVehicleBySourceVehicleEntityId(tenantKey, eventSourceId, sourceVehicleEntityId); + if (vehicleId == null) { + vehicleId = findVehicleByVin(tenantKey, eventSourceId, vin); + } + return vehicleId; + } + + private UUID resolveRegistrationId( + String tenantKey, + int eventSourceId, + String sourceRegistrationEntityId, + String nation, + String registrationNumber, + OffsetDateTime occurredAt + ) { + UUID registrationId = findRegistrationBySourceRegistrationEntityId(tenantKey, eventSourceId, sourceRegistrationEntityId, occurredAt); + if (registrationId == null) { + registrationId = findRegistrationByPlate(tenantKey, eventSourceId, nation, registrationNumber, occurredAt); + } + return registrationId; + } + + private UUID findVehicleBySourceVehicleEntityId(String tenantKey, int eventSourceId, String sourceVehicleEntityId) { + if (sourceVehicleEntityId == null) { + return null; + } + return jdbcTemplate.query( + compatibleSourcesCte() + """ + select v.id + from eventhub.vehicle v + where v.tenant_key = ? + and v.event_source_id in (select id from compatible_sources) + and v.source_vehicle_entity_id = ? + order by v.updated_at desc + limit 1 + """, + rs -> rs.next() ? (UUID) rs.getObject("id") : null, + eventSourceId, + tenantKey, + sourceVehicleEntityId + ); + } + + private UUID findVehicleByVin(String tenantKey, int eventSourceId, String vin) { + if (vin == null) { + return null; + } + return jdbcTemplate.query( + compatibleSourcesCte() + """ + select v.id + from eventhub.vehicle v + where v.tenant_key = ? + and v.event_source_id in (select id from compatible_sources) + and v.vin = ? + order by v.updated_at desc + limit 1 + """, + rs -> rs.next() ? (UUID) rs.getObject("id") : null, + eventSourceId, + tenantKey, + vin + ); + } + + private UUID findRegistrationBySourceRegistrationEntityId( + String tenantKey, + int eventSourceId, + String sourceRegistrationEntityId, + OffsetDateTime occurredAt + ) { + if (sourceRegistrationEntityId == null) { + return null; + } + return jdbcTemplate.query( + compatibleSourcesCte() + """ + select r.id + from eventhub.vehicle_registration r + where r.tenant_key = ? + and r.event_source_id in (select id from compatible_sources) + and r.source_registration_entity_id = ? + order by r.updated_at desc + limit 1 + """, + rs -> rs.next() ? (UUID) rs.getObject("id") : null, + eventSourceId, + tenantKey, + sourceRegistrationEntityId + ); + } + + private UUID findRegistrationByPlate( + String tenantKey, + int eventSourceId, + String nation, + String registrationNumber, + OffsetDateTime occurredAt + ) { + if (nation == null || registrationNumber == null) { + return null; + } + return jdbcTemplate.query( + compatibleSourcesCte() + """ + select r.id + from eventhub.vehicle_registration r + where r.tenant_key = ? + and r.event_source_id in (select id from compatible_sources) + and r.nation = ? + and r.registration_number = ? + order by r.updated_at desc + limit 1 + """, + rs -> rs.next() ? (UUID) rs.getObject("id") : null, + eventSourceId, + tenantKey, + nation, + registrationNumber + ); + } + + private UUID resolveAssignedVehicleId(UUID registrationId, OffsetDateTime occurredAt) { + return jdbcTemplate.query( + """ + select vehicle_id + from eventhub.vehicle_registration_assignment + where vehicle_registration_id = ? + and (? is null or valid_from is null or valid_from <= ?) + and (? is null or valid_to is null or ? < valid_to) + order by valid_from desc nulls last, updated_at desc + limit 1 + """, + rs -> rs.next() ? (UUID) rs.getObject("vehicle_id") : null, + registrationId, + occurredAt, + occurredAt, + occurredAt, + occurredAt + ); + } + + private UUID createVehicle( + String tenantKey, + int eventSourceId, + String sourceVehicleEntityId, + String vin + ) { + UUID vehicleId = UUID.randomUUID(); + jdbcTemplate.update( + """ + insert into eventhub.vehicle(id, tenant_key, event_source_id, source_vehicle_entity_id, vin, updated_at) + values (?, ?, ?, ?, ?, now()) + """, + vehicleId, + tenantKey, + eventSourceId, + sourceVehicleEntityId, + vin + ); + return vehicleId; + } + + private UUID createRegistration( + String tenantKey, + int eventSourceId, + String sourceRegistrationEntityId, + String nation, + String registrationNumber, + OffsetDateTime sourceUpdatedAt, + Map payload + ) { + if (nation == null || registrationNumber == null) { + return null; + } + UUID registrationId = UUID.randomUUID(); + jdbcTemplate.update( + """ + insert into eventhub.vehicle_registration( + id, tenant_key, event_source_id, source_registration_entity_id, nation, registration_number, + source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) + """, + registrationId, + tenantKey, + eventSourceId, + sourceRegistrationEntityId, + nation, + registrationNumber, + sourceUpdatedAt, + toJson(payload) + ); + return registrationId; + } + + private void touchVehicle(UUID vehicleId, String sourceVehicleEntityId, String vin) { + jdbcTemplate.update( + """ + update eventhub.vehicle + set source_vehicle_entity_id = coalesce(source_vehicle_entity_id, ?), + vin = coalesce(vin, ?), + updated_at = now() + where id = ? + """, + sourceVehicleEntityId, + vin, + vehicleId + ); + } + + private int projectVehicleRegistrationAssignments(String tenantKey, int eventSourceId) { + return jdbcTemplate.update( + compatibleSourcesCte() + """ + insert into eventhub.vehicle_registration_assignment( + id, tenant_key, event_source_id, vehicle_registration_id, vehicle_id, + valid_from, valid_to, source_updated_at, payload, updated_at + ) + select gen_random_uuid(), + relation.tenant_key, + relation.event_source_id, + registration.id, + vehicle.id, + relation.valid_from, + relation.valid_to, + relation.source_updated_at, + jsonb_build_object( + 'source', 'master-data', + 'sourceRelationId', relation.id, + 'relationKey', relation.relation_key + ), + now() + from eventhub.source_master_relation relation + join eventhub.vehicle_registration registration on registration.tenant_key = relation.tenant_key + and registration.event_source_id = relation.event_source_id + and registration.source_registration_entity_id = relation.from_source_entity_id + join eventhub.vehicle vehicle on vehicle.tenant_key = relation.tenant_key + and vehicle.event_source_id = relation.event_source_id + and vehicle.source_vehicle_entity_id = relation.to_source_entity_id + where relation.tenant_key = ? + and relation.event_source_id in (select id from compatible_sources) + and relation.relation_type = 'VEHICLE_REGISTRATION_VEHICLE' + and relation.from_entity_type = 'VEHICLE_REGISTRATION' + and relation.to_entity_type = 'VEHICLE' + and not exists ( + select 1 + from eventhub.vehicle_registration_assignment existing + where existing.vehicle_registration_id = registration.id + and existing.vehicle_id = vehicle.id + and existing.valid_from is not distinct from relation.valid_from + and existing.valid_to is not distinct from relation.valid_to + ) + """, + eventSourceId, + tenantKey + ); + } + + private void touchRegistration(UUID registrationId, String sourceRegistrationEntityId, String nation, String registrationNumber) { + jdbcTemplate.update( + """ + update eventhub.vehicle_registration + set source_registration_entity_id = coalesce(source_registration_entity_id, ?), + nation = coalesce(?, nation), + registration_number = coalesce(?, registration_number), + updated_at = now() + where id = ? + """, + sourceRegistrationEntityId, + nation, + registrationNumber, + registrationId + ); + } + + private void updateRegistrationFromMasterData( + UUID registrationId, + String sourceRegistrationEntityId, + String nation, + String registrationNumber, + OffsetDateTime sourceUpdatedAt + ) { + jdbcTemplate.update( + """ + update eventhub.vehicle_registration + set source_registration_entity_id = coalesce(source_registration_entity_id, ?), + nation = coalesce(?, nation), + registration_number = coalesce(?, registration_number), + source_updated_at = ?, + updated_at = now() + where id = ? + """, + sourceRegistrationEntityId, + nation, + registrationNumber, + sourceUpdatedAt, + registrationId + ); + } + + private String compatibleSourcesCte() { + return """ + with source_context as ( + select tenant_key, provider_key, source_instance_key, coalesce(tenant_provider_setting_key, '') as tenant_provider_setting_key + from eventhub.event_source + where id = ? + ), + compatible_sources as ( + select es.id + from eventhub.event_source es + join source_context ctx on ctx.tenant_key = es.tenant_key + and ctx.provider_key = es.provider_key + and ctx.source_instance_key = es.source_instance_key + and ctx.tenant_provider_setting_key = coalesce(es.tenant_provider_setting_key, '') + ) + """; + } + + private RegistrationMasterRow registrationMasterRow( + int eventSourceId, + String sourceRegistrationEntityId, + String sourceExternalKey, + String nation, + String registrationNumber, + OffsetDateTime sourceUpdatedAt + ) { + String resolvedNation = nation; + String resolvedRegistrationNumber = registrationNumber; + if ((resolvedNation == null || resolvedRegistrationNumber == null) && sourceExternalKey != null) { + int separator = sourceExternalKey.indexOf(':'); + if (separator > -1) { + resolvedNation = resolvedNation == null ? normalizeNullable(sourceExternalKey.substring(0, separator)) : resolvedNation; + resolvedRegistrationNumber = resolvedRegistrationNumber == null + ? normalizeNullable(sourceExternalKey.substring(separator + 1)) + : resolvedRegistrationNumber; + } + } + return new RegistrationMasterRow( + eventSourceId, + sourceRegistrationEntityId, + resolvedNation, + resolvedRegistrationNumber, + sourceUpdatedAt + ); + } + + private OffsetDateTime offsetDateTime(Object value) { + if (value == null) { + return null; + } + if (value instanceof OffsetDateTime offsetDateTime) { + return offsetDateTime; + } + if (value instanceof Timestamp timestamp) { + return timestamp.toInstant().atOffset(ZoneOffset.UTC); + } + if (value instanceof java.time.LocalDateTime localDateTime) { + return localDateTime.atOffset(ZoneOffset.UTC); + } + return OffsetDateTime.parse(value.toString()); + } + + private String toJson(Map value) { + try { + return objectMapper.writeValueAsString(value == null ? Map.of() : value); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Cannot serialize vehicle identity payload", e); + } + } + + private String normalizeRequired(String value, String fieldName) { + String normalized = normalizeNullable(value); + if (normalized == null) { + throw new IllegalArgumentException(fieldName + " must not be blank"); + } + return normalized; + } + + private String normalizeNullable(String value) { + if (value == null) { + return null; + } + String trimmed = value.trim(); + return trimmed.isEmpty() ? null : trimmed; + } + + public record ResolvedVehicleReference(UUID vehicleId, UUID vehicleRegistrationId) { + public static ResolvedVehicleReference empty() { + return new ResolvedVehicleReference(null, null); + } + } + + private record VehicleMasterRow(int eventSourceId, String sourceVehicleEntityId, String vin, OffsetDateTime sourceUpdatedAt) { + } + + private record RegistrationMasterRow( + int eventSourceId, + String sourceRegistrationEntityId, + String nation, + String registrationNumber, + OffsetDateTime sourceUpdatedAt + ) { + } + +} diff --git a/src/main/java/at/procon/eventhub/service/EventAcquisitionRecordKeyService.java b/src/main/java/at/procon/eventhub/service/EventAcquisitionRecordKeyService.java index f2d6863..d0fa240 100644 --- a/src/main/java/at/procon/eventhub/service/EventAcquisitionRecordKeyService.java +++ b/src/main/java/at/procon/eventhub/service/EventAcquisitionRecordKeyService.java @@ -68,13 +68,20 @@ public class EventAcquisitionRecordKeyService { if (event.vehicleRef() == null) { return ""; } - if (event.vehicleRef().vehicleRegistration() != null && event.vehicleRef().vehicleRegistration().hasValue()) { - return "VRN:" + event.vehicleRef().vehicleRegistration().stableKey(); - } - if (event.vehicleRef().vin() != null && !event.vehicleRef().vin().isBlank()) { + boolean hasVin = event.vehicleRef().vin() != null && !event.vehicleRef().vin().isBlank(); + boolean hasRegistration = event.vehicleRef().vehicleRegistration() != null + && event.vehicleRef().vehicleRegistration().hasValue(); + + if (hasVin) { return "VIN:" + event.vehicleRef().vin(); } - return "SOURCE_VEHICLE:" + nullToEmpty(event.vehicleRef().sourceEntityId()); + if (hasRegistration) { + return "VRN:" + event.vehicleRef().vehicleRegistration().stableKey(); + } + if (event.vehicleRef().sourceVehicleEntityId() != null) { + return "SOURCE_VEHICLE:" + event.vehicleRef().sourceVehicleEntityId(); + } + return "SOURCE_REGISTRATION:" + nullToEmpty(event.vehicleRef().sourceRegistrationEntityId()); } private String normalizeTime(OffsetDateTime value) { diff --git a/src/main/java/at/procon/eventhub/tachograph/api/TachographApiExceptionHandler.java b/src/main/java/at/procon/eventhub/tachograph/api/TachographApiExceptionHandler.java new file mode 100644 index 0000000..7251f16 --- /dev/null +++ b/src/main/java/at/procon/eventhub/tachograph/api/TachographApiExceptionHandler.java @@ -0,0 +1,22 @@ +package at.procon.eventhub.tachograph.api; + +import at.procon.eventhub.tachograph.service.UnsupportedTachographExtractionException; +import java.time.OffsetDateTime; +import java.util.Map; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.RestControllerAdvice; + +@RestControllerAdvice(basePackageClasses = TachographIngestionController.class) +public class TachographApiExceptionHandler { + + @ExceptionHandler(UnsupportedTachographExtractionException.class) + public ResponseEntity> handleUnsupportedExtraction(UnsupportedTachographExtractionException ex) { + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(Map.of( + "error", "UNSUPPORTED_TACHOGRAPH_EXTRACTION", + "message", ex.getMessage(), + "timestamp", OffsetDateTime.now().toString() + )); + } +} diff --git a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java index 163549f..6e2f14c 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java @@ -19,6 +19,7 @@ import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; +import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.LinkedHashMap; @@ -40,6 +41,9 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe SourcePackageRefDto sourcePackageRef = sourcePackageRef(rs); DriverRefDto driverRef = driverRef(rs); VehicleRefDto vehicleRef = vehicleRef(rs); + CardSlot cardSlot = cardSlot(rs); + CardStatus cardStatus = cardStatus(rs); + DrivingStatus drivingStatus = drivingStatus(rs); String externalSourceEventId = string(rs, "external_source_event_id"); if (externalSourceEventId == null) { @@ -59,15 +63,21 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe lifecycle(rs), longValue(rs, "odometer_m"), null, - detailsFactory.driverActivity(cardSlot(rs), cardStatus(rs), drivingStatus(rs)), + detailsFactory.driverActivity(cardSlot, cardStatus, drivingStatus), sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null, detailsFactory.payloadFromMap(payload(rs, context)), - false, + isManualEntry(cardStatus, drivingStatus), context.packageInfo() ); } - protected abstract Map sourceSpecificPayload(ResultSet rs) throws SQLException; + protected Map sourceSpecificPayload(ResultSet rs) throws SQLException { + return Map.of(); + } + + private boolean isManualEntry(CardStatus cardStatus, DrivingStatus drivingStatus) { + return cardStatus == CardStatus.NOT_INSERTED && drivingStatus == DrivingStatus.KNOWN; + } private DriverRefDto driverRef(ResultSet rs) throws SQLException { DriverCardRefDto driverCard = null; @@ -85,7 +95,12 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe if (registrationNumber != null) { registration = new VehicleRegistrationRefDto(string(rs, "vehicle_registration_nation"), registrationNumber); } - VehicleRefDto vehicleRef = new VehicleRefDto(string(rs, "vehicle_source_entity_id"), string(rs, "vehicle_vin"), registration); + VehicleRefDto vehicleRef = new VehicleRefDto( + string(rs, "vehicle_source_entity_id"), + string(rs, "vehicle_vin"), + string(rs, "vehicle_registration_source_entity_id"), + registration + ); return vehicleRef.hasAnyReference() ? vehicleRef : null; } @@ -102,24 +117,7 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe private Map payload(ResultSet rs, ExtractionContext context) throws SQLException { Map raw = new LinkedHashMap<>(); - raw.put("extractionCode", context.planItem().extractionCode()); - raw.put("sourceKind", context.planItem().sourceKind()); - raw.put("sourceTables", context.planItem().sourceTables()); put(raw, "sourceRowId", string(rs, "source_row_id")); - put(raw, "activityCode", object(rs, "activity_code")); - put(raw, "activityText", string(rs, "activity_text")); - put(raw, "eventType", string(rs, "event_type")); - put(raw, "lifecycle", string(rs, "lifecycle")); - put(raw, "cardSlot", object(rs, "card_slot")); - put(raw, "cardStatus", object(rs, "card_status")); - put(raw, "drivingStatus", object(rs, "driving_status")); - put(raw, "driverSourceEntityId", string(rs, "driver_source_entity_id")); - put(raw, "driverCardNation", string(rs, "driver_card_nation")); - put(raw, "driverCardNumber", string(rs, "driver_card_number")); - put(raw, "vehicleSourceEntityId", string(rs, "vehicle_source_entity_id")); - put(raw, "vehicleVin", string(rs, "vehicle_vin")); - put(raw, "vehicleRegistrationNation", string(rs, "vehicle_registration_nation")); - put(raw, "vehicleRegistrationNumber", string(rs, "vehicle_registration_number")); raw.putAll(sourceSpecificPayload(rs)); return Map.of("raw", raw); } @@ -160,15 +158,20 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe return null; } if (value instanceof OffsetDateTime offsetDateTime) { - return offsetDateTime; + return offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC); } if (value instanceof Timestamp timestamp) { - return timestamp.toInstant().atOffset(ZoneOffset.UTC); + return timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC); } if (value instanceof java.time.LocalDateTime localDateTime) { return localDateTime.atOffset(ZoneOffset.UTC); } - return OffsetDateTime.parse(value.toString()); + String text = value.toString(); + try { + return OffsetDateTime.parse(text).withOffsetSameInstant(ZoneOffset.UTC); + } catch (RuntimeException ignored) { + return LocalDateTime.parse(text).atOffset(ZoneOffset.UTC); + } } private Long longValue(ResultSet rs, String column) throws SQLException { diff --git a/src/main/java/at/procon/eventhub/tachograph/service/CardActivityRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/CardActivityRowMapper.java index d0d135d..0a9bc68 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/CardActivityRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/CardActivityRowMapper.java @@ -1,10 +1,6 @@ package at.procon.eventhub.tachograph.service; import at.procon.eventhub.service.EventDetailsFactory; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.LinkedHashMap; -import java.util.Map; import org.springframework.stereotype.Component; @Component @@ -13,11 +9,4 @@ public class CardActivityRowMapper extends AbstractTachographActivityRowMapper { public CardActivityRowMapper(EventDetailsFactory detailsFactory) { super(detailsFactory); } - - @Override - protected Map sourceSpecificPayload(ResultSet rs) throws SQLException { - Map raw = new LinkedHashMap<>(); - put(raw, "cardActivityId", string(rs, "card_activity_id")); - return raw; - } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java index 54a5c65..21d7899 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java @@ -3,6 +3,7 @@ package at.procon.eventhub.tachograph.service; import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.ImportCursorStateDto; +import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.importing.extraction.AbstractJdbcExtractionBatchExecutor; @@ -10,7 +11,11 @@ import at.procon.eventhub.importing.extraction.ExtractionDefinition; import at.procon.eventhub.importing.persistence.ImportCursorRepository; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import org.apache.camel.ProducerTemplate; @@ -46,6 +51,21 @@ public class JdbcTachographExtractionBatchExecutor return definitionRegistry.findByCode(code); } + @Override + protected Map parameters( + TachographImportRequest request, + ImportScopeDto scope, + ImportCursorStateDto cursor + ) { + Map params = super.parameters(request, scope, cursor); + params.put("occurredFrom", utcLocalDateTime(scope == null ? null : scope.occurredFrom())); + params.put("occurredTo", utcLocalDateTime(scope == null ? null : scope.occurredTo())); + params.put("lastSourcePackageImportedAt", utcLocalDateTime(cursor == null ? null : cursor.lastSourcePackageImportedAt())); + params.put("lastSourceRowUpdatedAt", utcLocalDateTime(cursor == null ? null : cursor.lastSourceRowUpdatedAt())); + params.put("lastOccurredTo", utcLocalDateTime(cursor == null ? null : cursor.lastOccurredTo())); + return params; + } + @Override protected TachographExtractionBatchResultDto resultFor( UUID packageId, @@ -91,4 +111,8 @@ public class JdbcTachographExtractionBatchExecutor protected String providerPackagePrefix() { return "TACHOGRAPH"; } + + private LocalDateTime utcLocalDateTime(OffsetDateTime value) { + return value == null ? null : value.withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime(); + } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java index 97a07bf..416898b 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportPlanService.java @@ -9,6 +9,8 @@ import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.springframework.stereotype.Service; @Service @@ -16,10 +18,16 @@ public class TachographImportPlanService { private final EventHubProperties properties; private final ImportChunkPlanner chunkPlanner; + private final TachographExtractionDefinitionRegistry definitionRegistry; - public TachographImportPlanService(EventHubProperties properties, ImportChunkPlanner chunkPlanner) { + public TachographImportPlanService( + EventHubProperties properties, + ImportChunkPlanner chunkPlanner, + TachographExtractionDefinitionRegistry definitionRegistry + ) { this.properties = properties; this.chunkPlanner = chunkPlanner; + this.definitionRegistry = definitionRegistry; } public ImportPlanDto createPlan(TachographImportRequest request) { @@ -27,6 +35,7 @@ public class TachographImportPlanService { for (EventFamily family : request.eventFamilies()) { items.addAll(itemsFor(family, request.acquisitionStrategy())); } + validateJdbcExtractions(items, request.eventFamilies()); return new ImportPlanDto( request.tenantKey(), request.mode(), @@ -87,4 +96,47 @@ public class TachographImportPlanService { ) { return new ImportPlanItemDto(family, sourceKind, extractionCode, sourceTables, entityAxis, description, strategy); } + + private void validateJdbcExtractions(List items, Set requestedFamilies) { + String jdbcUrl = properties.getTachograph().getDatasource().getJdbcUrl(); + if (jdbcUrl == null || jdbcUrl.isBlank()) { + return; + } + + List unsupportedCodes = items.stream() + .map(ImportPlanItemDto::extractionCode) + .filter(code -> definitionRegistry.findByCode(code).isEmpty()) + .distinct() + .sorted() + .toList(); + if (unsupportedCodes.isEmpty()) { + return; + } + + List supportedCodes = definitionRegistry.supportedCodes().stream() + .sorted() + .toList(); + List supportedFamilies = definitionRegistry.definitions().stream() + .map(definition -> definition.eventFamily().name()) + .distinct() + .sorted() + .toList(); + String requestedFamilyNames = requestedFamilies.stream() + .map(EventFamily::name) + .sorted() + .collect(Collectors.joining(", ")); + + throw new UnsupportedTachographExtractionException( + "Tachograph JDBC extraction is enabled, but the plan contains extraction codes without JDBC definitions: " + + unsupportedCodes + + ". Supported JDBC extraction codes are: " + + supportedCodes + + ". Requested event families: [" + + requestedFamilyNames + + "]. " + + "Use only supported event families " + + supportedFamilies + + " for JDBC execution, or add the missing codes to TachographExtractionDefinitionRegistry." + ); + } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java index 1363500..83dc5ff 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java @@ -5,6 +5,7 @@ import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert; import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert; import at.procon.eventhub.persistence.EventSourceRepository; import at.procon.eventhub.persistence.SourceMasterDataRepository; +import at.procon.eventhub.persistence.VehicleIdentityRepository; import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -12,6 +13,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Timestamp; +import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.LinkedHashMap; @@ -45,17 +47,20 @@ public class TachographMasterDataRefreshService { private final ObjectProvider tachographJdbcTemplateProvider; private final SourceMasterDataRepository sourceMasterDataRepository; private final EventSourceRepository eventSourceRepository; + private final VehicleIdentityRepository vehicleIdentityRepository; private final ResourceLoader resourceLoader; public TachographMasterDataRefreshService( @Qualifier("tachographNamedParameterJdbcTemplate") ObjectProvider tachographJdbcTemplateProvider, SourceMasterDataRepository sourceMasterDataRepository, EventSourceRepository eventSourceRepository, + VehicleIdentityRepository vehicleIdentityRepository, ResourceLoader resourceLoader ) { this.tachographJdbcTemplateProvider = tachographJdbcTemplateProvider; this.sourceMasterDataRepository = sourceMasterDataRepository; this.eventSourceRepository = eventSourceRepository; + this.vehicleIdentityRepository = vehicleIdentityRepository; this.resourceLoader = resourceLoader; } @@ -93,10 +98,11 @@ public class TachographMasterDataRefreshService { (rs, rowNum) -> relation(rs) ); int relationCount = sourceMasterDataRepository.upsertRelations(tenantKey, eventSourceId, relations); + int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId); MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount); - log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={}", - tenantKey, request.eventSource().stableKey(), result.entitiesUpserted(), result.relationsUpserted()); + log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={} reconciledVehicles={}", + tenantKey, request.eventSource().stableKey(), result.entitiesUpserted(), result.relationsUpserted(), reconciledVehicles); return result; } @@ -198,15 +204,20 @@ public class TachographMasterDataRefreshService { return null; } if (value instanceof OffsetDateTime offsetDateTime) { - return offsetDateTime; + return offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC); } if (value instanceof Timestamp timestamp) { - return timestamp.toInstant().atOffset(ZoneOffset.UTC); + return timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC); } if (value instanceof java.time.LocalDateTime localDateTime) { return localDateTime.atOffset(ZoneOffset.UTC); } - return OffsetDateTime.parse(value.toString()); + String text = value.toString(); + try { + return OffsetDateTime.parse(text).withOffsetSameInstant(ZoneOffset.UTC); + } catch (RuntimeException ignored) { + return LocalDateTime.parse(text).atOffset(ZoneOffset.UTC); + } } private ValidityRange normalizeValidityRange( diff --git a/src/main/java/at/procon/eventhub/tachograph/service/UnsupportedTachographExtractionException.java b/src/main/java/at/procon/eventhub/tachograph/service/UnsupportedTachographExtractionException.java new file mode 100644 index 0000000..d7f11cf --- /dev/null +++ b/src/main/java/at/procon/eventhub/tachograph/service/UnsupportedTachographExtractionException.java @@ -0,0 +1,8 @@ +package at.procon.eventhub.tachograph.service; + +public class UnsupportedTachographExtractionException extends IllegalArgumentException { + + public UnsupportedTachographExtractionException(String message) { + super(message); + } +} diff --git a/src/main/java/at/procon/eventhub/tachograph/service/VuActivityRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/VuActivityRowMapper.java index 08a86cd..c07e563 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/VuActivityRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/VuActivityRowMapper.java @@ -1,10 +1,6 @@ package at.procon.eventhub.tachograph.service; import at.procon.eventhub.service.EventDetailsFactory; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.LinkedHashMap; -import java.util.Map; import org.springframework.stereotype.Component; @Component @@ -13,11 +9,4 @@ public class VuActivityRowMapper extends AbstractTachographActivityRowMapper { public VuActivityRowMapper(EventDetailsFactory detailsFactory) { super(detailsFactory); } - - @Override - protected Map sourceSpecificPayload(ResultSet rs) throws SQLException { - Map raw = new LinkedHashMap<>(); - put(raw, "vuActivityId", string(rs, "vu_activity_id")); - return raw; - } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index e3c2e23..26d6e30 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -12,7 +12,7 @@ spring: create-schemas: true server: - port: 8080 + port: 8085 camel: springboot: @@ -29,6 +29,9 @@ eventhub: batch: completion-size: 1000 completion-timeout: 5s + queue-size: 10000 + block-when-full: true + queue-offer-timeout: 5m tachograph: default-chunk-days: 1 occurred-at-overlap: 7d diff --git a/src/main/resources/db/migration/V5__introduce_vehicle_identity_aggregate.sql b/src/main/resources/db/migration/V5__introduce_vehicle_identity_aggregate.sql new file mode 100644 index 0000000..db66718 --- /dev/null +++ b/src/main/resources/db/migration/V5__introduce_vehicle_identity_aggregate.sql @@ -0,0 +1,75 @@ +create table eventhub.vehicle ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create table eventhub.vehicle_identifier ( + id uuid primary key, + vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + identifier_type text not null, + nation text, + identifier_value text not null, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint chk_vehicle_identifier_valid_time_order + check (valid_from is null or valid_to is null or valid_from <= valid_to), + constraint chk_vehicle_identifier_nation_for_vrn + check ( + identifier_type <> 'VRN' + or nation is not null + ) +); + +create index idx_vehicle_lookup_ctx + on eventhub.vehicle(tenant_key, event_source_id, updated_at desc); + +create index idx_vehicle_identifier_lookup + on eventhub.vehicle_identifier( + tenant_key, event_source_id, identifier_type, nation, identifier_value, valid_from desc + ); + +create unique index ux_vehicle_identifier_exact + on eventhub.vehicle_identifier( + vehicle_id, + identifier_type, + coalesce(nation, ''), + identifier_value, + coalesce(valid_from, '-infinity'::timestamptz), + coalesce(valid_to, 'infinity'::timestamptz) + ); + +alter table eventhub.event + rename column vehicle_entity_id to vehicle_id; + +alter table eventhub.event + drop constraint if exists event_vehicle_entity_id_fkey; + +alter table eventhub.event + add constraint fk_event_vehicle + foreign key (vehicle_id) + references eventhub.vehicle(id); + +drop index if exists eventhub.idx_event_vehicle_time; + +create index idx_event_vehicle_time + on eventhub.event(vehicle_id, occurred_at desc) + where vehicle_id is not null; + +alter table eventhub.event + drop constraint if exists chk_event_driver_or_vehicle_ref; + +alter table eventhub.event + add constraint chk_event_driver_or_vehicle_ref + check ( + driver_entity_id is not null + or vehicle_id is not null + ); diff --git a/src/main/resources/db/migration/V6__introduce_vehicle_registration_model.sql b/src/main/resources/db/migration/V6__introduce_vehicle_registration_model.sql new file mode 100644 index 0000000..61a8e7d --- /dev/null +++ b/src/main/resources/db/migration/V6__introduce_vehicle_registration_model.sql @@ -0,0 +1,241 @@ +alter table eventhub.vehicle + add column if not exists source_vehicle_entity_id text, + add column if not exists vin text; + +do $migration$ +begin + if to_regclass('eventhub.vehicle_identifier') is not null + and exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'vehicle_identifier' + and column_name = 'vehicle_id' + ) + then + execute $sql$ + update eventhub.vehicle v + set source_vehicle_entity_id = src.identifier_value + from eventhub.vehicle_identifier src + where src.vehicle_id = v.id + and src.identifier_type = 'SOURCE_VEHICLE' + and v.source_vehicle_entity_id is null + $sql$; + + execute $sql$ + update eventhub.vehicle v + set vin = src.identifier_value + from eventhub.vehicle_identifier src + where src.vehicle_id = v.id + and src.identifier_type = 'VIN' + and v.vin is null + $sql$; + end if; +end +$migration$; + +create table if not exists eventhub.vehicle_registration ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_registration_entity_id text, + nation text not null, + registration_number text not null, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint chk_vehicle_registration_valid_time_order + check (valid_from is null or valid_to is null or valid_from <= valid_to) +); + +create table if not exists eventhub.vehicle_registration_assignment ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + vehicle_registration_id uuid not null references eventhub.vehicle_registration(id) on delete cascade, + vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint chk_vehicle_registration_assignment_valid_time_order + check (valid_from is null or valid_to is null or valid_from <= valid_to) +); + +do $migration$ +begin + if to_regclass('eventhub.vehicle_identifier') is not null + and exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'vehicle_identifier' + and column_name = 'vehicle_id' + ) + then + execute $sql$ + insert into eventhub.vehicle_registration( + id, tenant_key, event_source_id, source_registration_entity_id, nation, registration_number, + valid_from, valid_to, payload + ) + select gen_random_uuid(), + src.tenant_key, + src.event_source_id, + source_reg.identifier_value, + src.nation, + src.identifier_value, + src.valid_from, + src.valid_to, + src.payload + from eventhub.vehicle_identifier src + left join eventhub.vehicle_identifier source_reg on source_reg.vehicle_id = src.vehicle_id + and source_reg.identifier_type = 'SOURCE_REGISTRATION' + where src.identifier_type = 'VRN' + and src.nation is not null + and not exists ( + select 1 + from eventhub.vehicle_registration existing + where existing.tenant_key = src.tenant_key + and existing.event_source_id = src.event_source_id + and existing.nation = src.nation + and existing.registration_number = src.identifier_value + and existing.valid_from is not distinct from src.valid_from + and existing.valid_to is not distinct from src.valid_to + ) + $sql$; + + execute $sql$ + insert into eventhub.vehicle_registration_assignment( + id, tenant_key, event_source_id, vehicle_registration_id, vehicle_id, valid_from, valid_to, payload + ) + select gen_random_uuid(), + reg.tenant_key, + reg.event_source_id, + reg.id, + v.id, + reg.valid_from, + reg.valid_to, + jsonb_build_object('migrated_from', 'vehicle_identifier') + from eventhub.vehicle_registration reg + join eventhub.vehicle_identifier vrn on vrn.tenant_key = reg.tenant_key + and vrn.event_source_id = reg.event_source_id + and vrn.identifier_type = 'VRN' + and vrn.nation = reg.nation + and vrn.identifier_value = reg.registration_number + and vrn.valid_from is not distinct from reg.valid_from + and vrn.valid_to is not distinct from reg.valid_to + join eventhub.vehicle v on v.id = vrn.vehicle_id + where not exists ( + select 1 + from eventhub.vehicle_registration_assignment existing + where existing.vehicle_registration_id = reg.id + and existing.vehicle_id = v.id + and existing.valid_from is not distinct from reg.valid_from + and existing.valid_to is not distinct from reg.valid_to + ) + $sql$; + end if; +end +$migration$; + +do $migration$ +begin + if exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'event' + and column_name = 'vehicle_entity_id' + ) + and not exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'event' + and column_name = 'vehicle_id' + ) + then + alter table eventhub.event rename column vehicle_entity_id to vehicle_id; + end if; + + if not exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'event' + and column_name = 'vehicle_id' + ) + then + alter table eventhub.event add column vehicle_id uuid references eventhub.vehicle(id); + end if; +end +$migration$; + +alter table eventhub.event + drop constraint if exists event_vehicle_entity_id_fkey; + +alter table eventhub.event + drop constraint if exists fk_event_vehicle; + +do $migration$ +begin + if exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'event' + and column_name = 'vehicle_id' + ) + then + alter table eventhub.event + add constraint fk_event_vehicle + foreign key (vehicle_id) + references eventhub.vehicle(id) + not valid; + end if; +end +$migration$; + +alter table eventhub.event + add column if not exists vehicle_registration_id uuid references eventhub.vehicle_registration(id); + +create index if not exists idx_vehicle_source_entity + on eventhub.vehicle(tenant_key, event_source_id, source_vehicle_entity_id) + where source_vehicle_entity_id is not null; + +create index if not exists idx_vehicle_vin + on eventhub.vehicle(tenant_key, event_source_id, vin) + where vin is not null; + +create index if not exists idx_vehicle_registration_source_entity + on eventhub.vehicle_registration(tenant_key, event_source_id, source_registration_entity_id) + where source_registration_entity_id is not null; + +create index if not exists idx_vehicle_registration_plate + on eventhub.vehicle_registration(tenant_key, event_source_id, nation, registration_number, valid_from desc); + +create index if not exists idx_vehicle_registration_assignment_registration_time + on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to); + +create index if not exists idx_vehicle_registration_assignment_vehicle_time + on eventhub.vehicle_registration_assignment(vehicle_id, valid_from desc, valid_to); + +create index if not exists idx_event_vehicle_registration_time + on eventhub.event(vehicle_registration_id, occurred_at desc) + where vehicle_registration_id is not null; + +alter table eventhub.event + drop constraint if exists chk_event_driver_or_vehicle_ref; + +alter table eventhub.event + add constraint chk_event_driver_or_vehicle_ref + check ( + driver_entity_id is not null + or vehicle_id is not null + or vehicle_registration_id is not null + ); diff --git a/src/main/resources/db/migration/V7__repair_event_vehicle_foreign_key.sql b/src/main/resources/db/migration/V7__repair_event_vehicle_foreign_key.sql new file mode 100644 index 0000000..44ceff9 --- /dev/null +++ b/src/main/resources/db/migration/V7__repair_event_vehicle_foreign_key.sql @@ -0,0 +1,24 @@ +alter table eventhub.event + drop constraint if exists event_vehicle_entity_id_fkey; + +alter table eventhub.event + drop constraint if exists fk_event_vehicle; + +do $migration$ +begin + if exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'event' + and column_name = 'vehicle_id' + ) + then + alter table eventhub.event + add constraint fk_event_vehicle + foreign key (vehicle_id) + references eventhub.vehicle(id) + not valid; + end if; +end +$migration$; diff --git a/src/main/resources/db/migration/V8__project_vehicle_master_data_assignments.sql b/src/main/resources/db/migration/V8__project_vehicle_master_data_assignments.sql new file mode 100644 index 0000000..fc9e7ea --- /dev/null +++ b/src/main/resources/db/migration/V8__project_vehicle_master_data_assignments.sql @@ -0,0 +1,193 @@ +drop index if exists eventhub.idx_vehicle_registration_plate; + +alter table eventhub.vehicle_registration + drop constraint if exists chk_vehicle_registration_valid_time_order; + +alter table eventhub.vehicle_registration + drop column if exists valid_from, + drop column if exists valid_to; + +create index if not exists idx_vehicle_registration_plate + on eventhub.vehicle_registration(tenant_key, event_source_id, nation, registration_number); + +update eventhub.source_master_entity +set valid_from = null, + valid_to = null, + updated_at = now() +where entity_type = 'VEHICLE_REGISTRATION' + and (valid_from is not null or valid_to is not null); + +with vehicle_entities as ( + select tenant_key, + event_source_id, + nullif(source_entity_id, '') as source_vehicle_entity_id, + nullif(source_external_key, '') as vin + from eventhub.source_master_entity + where entity_type = 'VEHICLE' + and (nullif(source_entity_id, '') is not null or nullif(source_external_key, '') is not null) +) +update eventhub.vehicle vehicle +set source_vehicle_entity_id = coalesce(vehicle.source_vehicle_entity_id, vehicle_entities.source_vehicle_entity_id), + vin = coalesce(vehicle.vin, vehicle_entities.vin), + updated_at = now() +from vehicle_entities +where vehicle.tenant_key = vehicle_entities.tenant_key + and vehicle.event_source_id = vehicle_entities.event_source_id + and ( + vehicle.source_vehicle_entity_id = vehicle_entities.source_vehicle_entity_id + or vehicle.vin = vehicle_entities.vin + ); + +with vehicle_entities as ( + select tenant_key, + event_source_id, + nullif(source_entity_id, '') as source_vehicle_entity_id, + nullif(source_external_key, '') as vin + from eventhub.source_master_entity + where entity_type = 'VEHICLE' + and (nullif(source_entity_id, '') is not null or nullif(source_external_key, '') is not null) +) +insert into eventhub.vehicle(id, tenant_key, event_source_id, source_vehicle_entity_id, vin) +select gen_random_uuid(), + vehicle_entities.tenant_key, + vehicle_entities.event_source_id, + vehicle_entities.source_vehicle_entity_id, + vehicle_entities.vin +from vehicle_entities +where not exists ( + select 1 + from eventhub.vehicle vehicle + where vehicle.tenant_key = vehicle_entities.tenant_key + and vehicle.event_source_id = vehicle_entities.event_source_id + and ( + vehicle.source_vehicle_entity_id = vehicle_entities.source_vehicle_entity_id + or vehicle.vin = vehicle_entities.vin + ) +); + +with registration_entities as ( + select tenant_key, + event_source_id, + nullif(source_entity_id, '') as source_registration_entity_id, + coalesce( + nullif(payload ->> 'registration_nation', ''), + nullif(split_part(source_external_key, ':', 1), '') + ) as nation, + coalesce( + nullif(payload ->> 'registration_number', ''), + case + when position(':' in source_external_key) > 0 + then nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + else nullif(source_external_key, '') + end + ) as registration_number, + source_updated_at + from eventhub.source_master_entity + where entity_type = 'VEHICLE_REGISTRATION' +) +update eventhub.vehicle_registration registration +set source_registration_entity_id = coalesce( + registration.source_registration_entity_id, + registration_entities.source_registration_entity_id + ), + nation = coalesce(registration_entities.nation, registration.nation), + registration_number = coalesce(registration_entities.registration_number, registration.registration_number), + source_updated_at = registration_entities.source_updated_at, + updated_at = now() +from registration_entities +where registration_entities.nation is not null + and registration_entities.registration_number is not null + and registration.tenant_key = registration_entities.tenant_key + and registration.event_source_id = registration_entities.event_source_id + and ( + registration.source_registration_entity_id = registration_entities.source_registration_entity_id + or ( + registration.nation = registration_entities.nation + and registration.registration_number = registration_entities.registration_number + ) + ); + +with registration_entities as ( + select tenant_key, + event_source_id, + nullif(source_entity_id, '') as source_registration_entity_id, + coalesce( + nullif(payload ->> 'registration_nation', ''), + nullif(split_part(source_external_key, ':', 1), '') + ) as nation, + coalesce( + nullif(payload ->> 'registration_number', ''), + case + when position(':' in source_external_key) > 0 + then nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + else nullif(source_external_key, '') + end + ) as registration_number, + source_updated_at + from eventhub.source_master_entity + where entity_type = 'VEHICLE_REGISTRATION' +) +insert into eventhub.vehicle_registration( + id, tenant_key, event_source_id, source_registration_entity_id, + nation, registration_number, source_updated_at, payload +) +select gen_random_uuid(), + registration_entities.tenant_key, + registration_entities.event_source_id, + registration_entities.source_registration_entity_id, + registration_entities.nation, + registration_entities.registration_number, + registration_entities.source_updated_at, + jsonb_build_object('source', 'master-data') +from registration_entities +where registration_entities.nation is not null + and registration_entities.registration_number is not null + and not exists ( + select 1 + from eventhub.vehicle_registration registration + where registration.tenant_key = registration_entities.tenant_key + and registration.event_source_id = registration_entities.event_source_id + and ( + registration.source_registration_entity_id = registration_entities.source_registration_entity_id + or ( + registration.nation = registration_entities.nation + and registration.registration_number = registration_entities.registration_number + ) + ) + ); + +insert into eventhub.vehicle_registration_assignment( + id, tenant_key, event_source_id, vehicle_registration_id, vehicle_id, + valid_from, valid_to, source_updated_at, payload +) +select gen_random_uuid(), + relation.tenant_key, + relation.event_source_id, + registration.id, + vehicle.id, + relation.valid_from, + relation.valid_to, + relation.source_updated_at, + jsonb_build_object( + 'source', 'master-data', + 'sourceRelationId', relation.id, + 'relationKey', relation.relation_key + ) +from eventhub.source_master_relation relation +join eventhub.vehicle_registration registration on registration.tenant_key = relation.tenant_key + and registration.event_source_id = relation.event_source_id + and registration.source_registration_entity_id = relation.from_source_entity_id +join eventhub.vehicle vehicle on vehicle.tenant_key = relation.tenant_key + and vehicle.event_source_id = relation.event_source_id + and vehicle.source_vehicle_entity_id = relation.to_source_entity_id +where relation.relation_type = 'VEHICLE_REGISTRATION_VEHICLE' + and relation.from_entity_type = 'VEHICLE_REGISTRATION' + and relation.to_entity_type = 'VEHICLE' + and not exists ( + select 1 + from eventhub.vehicle_registration_assignment existing + where existing.vehicle_registration_id = registration.id + and existing.vehicle_id = vehicle.id + and existing.valid_from is not distinct from relation.valid_from + and existing.valid_to is not distinct from relation.valid_to + ); diff --git a/src/main/resources/sql/tachograph/card-activity.sql b/src/main/resources/sql/tachograph/card-activity.sql index 74d7976..7a01a53 100644 --- a/src/main/resources/sql/tachograph/card-activity.sql +++ b/src/main/resources/sql/tachograph/card-activity.sql @@ -8,16 +8,104 @@ * the best matching CardVehiclesUsed row for the activity timestamp when one is * available. */ -select - cast(ca.ID as varchar(128)) as source_row_id, - cast(ca.ID as varchar(128)) as card_activity_id, - concat('TACHOGRAPH:CARD_ACTIVITY:', ca.ID) as external_source_event_id, +with OrgTree as ( + select org.I_90021_OID + from dbo.GetOrganisationTree(null, :organisationId, 0, null) org + where :organisationId is not null +) +, +CandidateActivity as ( + select + ca.ID, + ca.BeginTime, + activity_time.EndTime, + ca.Activity, + ca.Slot, + ca.CardStatus, + ca.DrivingStatus, + ca.ID_FileLog, + cda.RecordDate, + cda.RecordDateTo, + cda.ID_FileLog as cda_filelog_id, + c.ID as card_id, + c.ID_Driver as driver_id, + c.ID_Nation as driver_card_nation_id, + c.CardNumber as driver_card_number, + c.ID_FileLog as card_filelog_id, + coalesce(ca.ID_FileLog, cda.ID_FileLog, c.ID_FileLog) as file_log_id + from dbo.CardActivity ca + join dbo.CardDailyActivity cda on cda.ID = ca.ID_DailyActivity + join dbo.Card c on c.ID = cda.ID_Card + cross apply (values (dateadd(minute, coalesce(ca.Duration, 0), ca.BeginTime))) activity_time(EndTime) + where (:occurredTo is null or ca.BeginTime < :occurredTo) + and ( + :occurredFrom is null + or ca.BeginTime >= :occurredFrom + or activity_time.EndTime >= :occurredFrom + ) + and ( + :organisationId is null + or exists ( + select 1 + from dbo.Driver_I_90021 rel + join OrgTree on OrgTree.I_90021_OID = rel.ID_I_90021 + where rel.ID_Driver = c.ID_Driver + and rel.GILT_BIS is null + ) + ) +) +, +Base as ( + select + ca.ID, + ca.BeginTime, + ca.EndTime, + ca.Activity, + ca.Slot, + ca.CardStatus, + ca.DrivingStatus, + ca.ID_FileLog, + ca.cda_filelog_id, + ca.card_filelog_id, - ca.BeginTime as occurred_at, - coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - ca.Activity as activity_code, - ca.Activity as activity_text, - case upper(coalesce(ca.Activity, '')) + coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, + coalesce(fl.ID, ca.ID_FileLog, ca.cda_filelog_id, ca.card_filelog_id, ca.ID) as source_package_id_raw, + coalesce(fl.ID_Card, ca.card_id) as source_package_entity_id_raw, + coalesce(fl.DownloadFrom, ca.RecordDate) as source_package_period_from, + coalesce(fl.DownloadTo, ca.RecordDateTo, dateadd(day, 1, ca.RecordDate)) as source_package_period_to, + coalesce(fl.CreationDate, fl.TStamp) as source_package_imported_at, + + ca.driver_id, + cn.AlphaCode as driver_card_nation, + ca.driver_card_number, + coalesce(cvu.ID_Vehicle, v.ID) as vehicle_registration_id, + null as vehicle_vin, + vn.AlphaCode as vehicle_registration_nation, + v.VRN as vehicle_registration_number + from CandidateActivity ca + left join dbo.FileLog fl on fl.ID = ca.file_log_id + left join dbo.Nation cn on cn.ID = ca.driver_card_nation_id + outer apply ( + select top 1 used.ID_Vehicle + from dbo.CardVehiclesUsed used + where used.ID_Card = ca.card_id + and (used.FirstUse is null or used.FirstUse <= ca.BeginTime) + and (used.LastUse is null or used.LastUse >= ca.BeginTime) + order by + used.FirstUse desc, + used.ID desc + ) cvu + left join dbo.Vehicle v on v.ID = cvu.ID_Vehicle + left join dbo.Nation vn on vn.ID = v.ID_Nation +) +select + cast(base.ID as varchar(128)) as source_row_id, + concat('TACHOGRAPH:CARD_ACTIVITY:', base.ID, ':', evt.lifecycle) as external_source_event_id, + + evt.occurred_at as occurred_at, + base.received_partner_at, + base.Activity as activity_code, + case upper(coalesce(base.Activity, '')) when 'DRIVING' then 'DRIVE' when 'DRIVE' then 'DRIVE' when 'WORK' then 'WORK' @@ -28,67 +116,35 @@ select when 'REST' then 'BREAK_REST' else 'UNKNOWN_ACTIVITY' end as event_type, - 'SNAPSHOT' as lifecycle, - ca.Slot as card_slot, - ca.CardStatus as card_status, - ca.DrivingStatus as driving_status, + evt.lifecycle as lifecycle, + base.Slot as card_slot, + base.CardStatus as card_status, + base.DrivingStatus as driving_status, cast(null as bigint) as odometer_m, - cast(d.ID as varchar(128)) as driver_source_entity_id, - cn.AlphaCode as driver_card_nation, - c.CardNumber as driver_card_number, + cast(base.driver_id as varchar(128)) as driver_source_entity_id, + base.driver_card_nation, + base.driver_card_number, - cast(coalesce(cvu.ID_Vehicle, v.ID) as varchar(128)) as vehicle_source_entity_id, - coalesce(cvu.VIN, vi.VIN) as vehicle_vin, - vn.AlphaCode as vehicle_registration_nation, - v.VRN as vehicle_registration_number, + cast(null as varchar(128)) as vehicle_source_entity_id, + base.vehicle_vin, + cast(base.vehicle_registration_id as varchar(128)) as vehicle_registration_source_entity_id, + base.vehicle_registration_nation, + base.vehicle_registration_number, 'DRIVER_CARD' as source_package_kind, - cast(coalesce(fl.ID, ca.ID_FileLog, cda.ID_FileLog, c.ID_FileLog, ca.ID) as varchar(128)) as source_package_id, - cast(coalesce(fl.ID_Card, c.ID) as varchar(128)) as source_package_entity_id, - coalesce(fl.DownloadFrom, cda.RecordDate) as source_package_period_from, - coalesce(fl.DownloadTo, cda.RecordDateTo, dateadd(day, 1, cda.RecordDate)) as source_package_period_to, - coalesce(fl.CreationDate, fl.TStamp) as source_package_imported_at -from dbo.CardActivity ca -join dbo.CardDailyActivity cda on cda.ID = ca.ID_DailyActivity -join dbo.Card c on c.ID = cda.ID_Card -left join dbo.FileLog fl on fl.ID = coalesce(ca.ID_FileLog, cda.ID_FileLog, c.ID_FileLog) -left join dbo.Driver d on d.ID = c.ID_Driver -left join dbo.Nation cn on cn.ID = c.ID_Nation -outer apply ( - select top 1 used.ID_Vehicle, - used.VIN, - used.OdoBegin - from dbo.CardVehiclesUsed used - where used.ID_Card = c.ID - and (used.FirstUse is null or used.FirstUse <= ca.BeginTime) - and (used.LastUse is null or used.LastUse >= ca.BeginTime) - order by - case when used.FirstUse is null then 1 else 0 end, - used.FirstUse desc, - used.ID desc -) cvu -left join dbo.Vehicle v on v.ID = cvu.ID_Vehicle -left join dbo.VehicleIdentification vi on vi.ID = v.ID_VehicleIdentification -left join dbo.Nation vn on vn.ID = v.ID_Nation -where (:occurredFrom is null or ca.BeginTime >= :occurredFrom) - and (:occurredTo is null or ca.BeginTime < :occurredTo) - and ( - :lastSourcePackageImportedAt is null - or coalesce(fl.CreationDate, fl.TStamp) > :lastSourcePackageImportedAt - or ( - coalesce(fl.CreationDate, fl.TStamp) = :lastSourcePackageImportedAt - and coalesce(fl.ID, ca.ID_FileLog, cda.ID_FileLog, c.ID_FileLog, ca.ID) > try_convert(int, :lastSourcePackageId) - ) - or ( - coalesce(fl.CreationDate, fl.TStamp) is null - and ( - :lastSourcePackageId is null - or coalesce(fl.ID, ca.ID_FileLog, cda.ID_FileLog, c.ID_FileLog, ca.ID) > try_convert(int, :lastSourcePackageId) - ) - ) - ) + cast(base.source_package_id_raw as varchar(128)) as source_package_id, + cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, + base.source_package_period_from, + base.source_package_period_to, + base.source_package_imported_at +from Base base +cross apply (values + ('START', base.BeginTime), + ('END', base.EndTime) +) evt(lifecycle, occurred_at) +where (:occurredFrom is null or evt.occurred_at >= :occurredFrom) + and (:occurredTo is null or evt.occurred_at < :occurredTo) /* - * Organisation filtering can use FileLog.I_90021_ID / FileLog.OrgID or - * Driver_I_90021 / Vehicle_I_90021 once subtree semantics are confirmed. + * Organisation filter: driver membership in GetOrganisationTree(null, :organisationId, 0, null). */ diff --git a/src/main/resources/sql/tachograph/master-data/vehicle-registrations.sql b/src/main/resources/sql/tachograph/master-data/vehicle-registrations.sql index 780eba7..62188e8 100644 --- a/src/main/resources/sql/tachograph/master-data/vehicle-registrations.sql +++ b/src/main/resources/sql/tachograph/master-data/vehicle-registrations.sql @@ -4,8 +4,8 @@ select concat(coalesce(n.AlphaCode, ''), ':', v.VRN) as source_external_key, concat(coalesce(n.AlphaCode, ''), ':', v.VRN) as display_name, cast(case when v.ValidTo is null or v.ValidTo > getutcdate() then 1 else 0 end as bit) as active, - v.ValidFrom as valid_from, - v.ValidTo as valid_to, + cast(null as datetime) as valid_from, + cast(null as datetime) as valid_to, cast(null as datetime) as source_updated_at, v.ID as vehicle_registration_id, cast(v.ID_VehicleIdentification as varchar(128)) as vehicle_identification_id, diff --git a/src/main/resources/sql/tachograph/vu-activity.sql b/src/main/resources/sql/tachograph/vu-activity.sql index 040ecd4..35a5933 100644 --- a/src/main/resources/sql/tachograph/vu-activity.sql +++ b/src/main/resources/sql/tachograph/vu-activity.sql @@ -5,16 +5,133 @@ * VUActivity -> VUDailyActivity -> VUInstallation -> VehicleIdentification * Optional driver/card context comes from VUActivity.ID_IWCycle -> IWCycle -> Card. */ -select - cast(va.ID as varchar(128)) as source_row_id, - cast(va.ID as varchar(128)) as vu_activity_id, - concat('TACHOGRAPH:VU_ACTIVITY:', va.ID) as external_source_event_id, +with OrgTree as ( + select org.I_90021_OID + from dbo.GetOrganisationTree(null, :organisationId, 0, null) org + where :organisationId is not null +) +, +CandidateActivity as ( + select + va.ID, + va.BeginTime, + activity_time.EndTime, + va.Activity, + va.Slot, + va.CardStatus, + va.DrivingStatus, + va.ID_FileLog, + va.ID_IWCycle, + vda.RecordDate, + vda.ID_FileLog as vda_filelog_id, + vui.ID_VehicleIdentification, + vui.ID_FileLog as vui_filelog_id, + vi.ID as vehicle_identification_id, + vi.VIN as vehicle_vin, + coalesce(va.ID_FileLog, vda.ID_FileLog, vui.ID_FileLog) as file_log_id + from dbo.VUActivity va + join dbo.VUDailyActivity vda on vda.ID = va.ID_VUDailyActivity + join dbo.VUInstallation vui on vui.ID = vda.ID_VUInstallation + join dbo.VehicleIdentification vi on vi.ID = vui.ID_VehicleIdentification + cross apply (values (dateadd(minute, coalesce(va.Duration, 0), va.BeginTime))) activity_time(EndTime) + where (:occurredTo is null or va.BeginTime < :occurredTo) + and ( + :occurredFrom is null + or va.BeginTime >= :occurredFrom + or activity_time.EndTime >= :occurredFrom + ) +) +, +CandidateVehicle as ( + select + va.ID, + va.BeginTime, + va.EndTime, + va.Activity, + va.Slot, + va.CardStatus, + va.DrivingStatus, + va.ID_FileLog, + va.ID_IWCycle, + va.RecordDate, + va.vda_filelog_id, + va.ID_VehicleIdentification, + va.vui_filelog_id, + va.vehicle_identification_id, + va.vehicle_vin, + va.file_log_id, + v.ID as vehicle_registration_id, + v.ID_Nation as vehicle_registration_nation_id, + v.VRN as vehicle_registration_number + from CandidateActivity va + outer apply ( + select top 1 vehicle.ID, + vehicle.VRN, + vehicle.ID_Nation + from dbo.Vehicle vehicle + where vehicle.ID_VehicleIdentification = va.vehicle_identification_id + and (vehicle.ValidFrom is null or vehicle.ValidFrom <= va.BeginTime) + and (vehicle.ValidTo is null or vehicle.ValidTo > va.BeginTime) + order by + vehicle.ValidFrom desc, + vehicle.ID desc + ) v + where ( + :organisationId is null + or exists ( + select 1 + from dbo.Vehicle_I_90021 rel + join OrgTree on OrgTree.I_90021_OID = rel.ID_I_90021 + where rel.ID_Vehicle = v.ID + and rel.GILT_BIS is null + ) + ) +) +, +Base as ( + select + va.ID, + va.BeginTime, + va.EndTime, + va.Activity, + va.Slot, + va.CardStatus, + va.DrivingStatus, + va.ID_FileLog, + va.vda_filelog_id, + va.vui_filelog_id, - va.BeginTime as occurred_at, - coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - va.Activity as activity_code, - va.Activity as activity_text, - case upper(coalesce(va.Activity, '')) + coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, + coalesce(fl.ID, va.ID_FileLog, va.vda_filelog_id, va.vui_filelog_id, va.ID) as source_package_id_raw, + coalesce(fl.ID_VehicleIdentification, va.ID_VehicleIdentification) as source_package_entity_id_raw, + coalesce(fl.DownloadFrom, va.RecordDate) as source_package_period_from, + coalesce(fl.DownloadTo, dateadd(day, 1, va.RecordDate)) as source_package_period_to, + coalesce(fl.CreationDate, fl.TStamp) as source_package_imported_at, + + null as odometer_m, + c.ID_Driver as driver_id, + cn.AlphaCode as driver_card_nation, + c.CardNumber as driver_card_number, + va.vehicle_identification_id, + va.vehicle_registration_id, + va.vehicle_vin, + vn.AlphaCode as vehicle_registration_nation, + va.vehicle_registration_number + from CandidateVehicle va + left join dbo.FileLog fl on fl.ID = va.file_log_id + left join dbo.Nation vn on vn.ID = va.vehicle_registration_nation_id + left join dbo.IWCycle iw on iw.ID = va.ID_IWCycle + left join dbo.Card c on c.ID = iw.ID_Card + left join dbo.Nation cn on cn.ID = c.ID_Nation +) +select + cast(base.ID as varchar(128)) as source_row_id, + concat('TACHOGRAPH:VU_ACTIVITY:', base.ID, ':', evt.lifecycle) as external_source_event_id, + + evt.occurred_at as occurred_at, + base.received_partner_at, + base.Activity as activity_code, + case upper(coalesce(base.Activity, '')) when 'DRIVING' then 'DRIVE' when 'DRIVE' then 'DRIVE' when 'WORK' then 'WORK' @@ -25,68 +142,35 @@ select when 'REST' then 'BREAK_REST' else 'UNKNOWN_ACTIVITY' end as event_type, - 'SNAPSHOT' as lifecycle, - va.Slot as card_slot, - va.CardStatus as card_status, - va.DrivingStatus as driving_status, - cast(iw.OdoBegin as bigint) as odometer_m, + evt.lifecycle as lifecycle, + base.Slot as card_slot, + base.CardStatus as card_status, + base.DrivingStatus as driving_status, + base.odometer_m, - cast(d.ID as varchar(128)) as driver_source_entity_id, - cn.AlphaCode as driver_card_nation, - c.CardNumber as driver_card_number, + cast(base.driver_id as varchar(128)) as driver_source_entity_id, + base.driver_card_nation, + base.driver_card_number, - cast(v.ID as varchar(128)) as vehicle_source_entity_id, - vi.VIN as vehicle_vin, - vn.AlphaCode as vehicle_registration_nation, - v.VRN as vehicle_registration_number, + cast(base.vehicle_identification_id as varchar(128)) as vehicle_source_entity_id, + base.vehicle_vin, + cast(base.vehicle_registration_id as varchar(128)) as vehicle_registration_source_entity_id, + base.vehicle_registration_nation, + base.vehicle_registration_number, 'VEHICLE_UNIT' as source_package_kind, - cast(coalesce(fl.ID, va.ID_FileLog, vda.ID_FileLog, vui.ID_FileLog, va.ID) as varchar(128)) as source_package_id, - cast(coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as varchar(128)) as source_package_entity_id, - coalesce(fl.DownloadFrom, vda.RecordDate) as source_package_period_from, - coalesce(fl.DownloadTo, dateadd(day, 1, vda.RecordDate)) as source_package_period_to, - coalesce(fl.CreationDate, fl.TStamp) as source_package_imported_at -from dbo.VUActivity va -join dbo.VUDailyActivity vda on vda.ID = va.ID_VUDailyActivity -join dbo.VUInstallation vui on vui.ID = vda.ID_VUInstallation -left join dbo.FileLog fl on fl.ID = coalesce(va.ID_FileLog, vda.ID_FileLog, vui.ID_FileLog) -join dbo.VehicleIdentification vi on vi.ID = vui.ID_VehicleIdentification -outer apply ( - select top 1 vehicle.ID, - vehicle.VRN, - vehicle.ID_Nation - from dbo.Vehicle vehicle - where vehicle.ID_VehicleIdentification = vi.ID - and (vehicle.ValidFrom is null or vehicle.ValidFrom <= va.BeginTime) - and (vehicle.ValidTo is null or vehicle.ValidTo > va.BeginTime) - order by - case when vehicle.ValidFrom is null then 1 else 0 end, - vehicle.ValidFrom desc, - vehicle.ID desc -) v -left join dbo.Nation vn on vn.ID = v.ID_Nation -left join dbo.IWCycle iw on iw.ID = va.ID_IWCycle -left join dbo.Card c on c.ID = iw.ID_Card -left join dbo.Driver d on d.ID = c.ID_Driver -left join dbo.Nation cn on cn.ID = c.ID_Nation -where (:occurredFrom is null or va.BeginTime >= :occurredFrom) - and (:occurredTo is null or va.BeginTime < :occurredTo) - and ( - :lastSourcePackageImportedAt is null - or coalesce(fl.CreationDate, fl.TStamp) > :lastSourcePackageImportedAt - or ( - coalesce(fl.CreationDate, fl.TStamp) = :lastSourcePackageImportedAt - and coalesce(fl.ID, va.ID_FileLog, vda.ID_FileLog, vui.ID_FileLog, va.ID) > try_convert(int, :lastSourcePackageId) - ) - or ( - coalesce(fl.CreationDate, fl.TStamp) is null - and ( - :lastSourcePackageId is null - or coalesce(fl.ID, va.ID_FileLog, vda.ID_FileLog, vui.ID_FileLog, va.ID) > try_convert(int, :lastSourcePackageId) - ) - ) - ) + cast(base.source_package_id_raw as varchar(128)) as source_package_id, + cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, + base.source_package_period_from, + base.source_package_period_to, + base.source_package_imported_at +from Base base +cross apply (values + ('START', base.BeginTime), + ('END', base.EndTime) +) evt(lifecycle, occurred_at) +where (:occurredFrom is null or evt.occurred_at >= :occurredFrom) + and (:occurredTo is null or evt.occurred_at < :occurredTo) /* - * Organisation filtering can use FileLog.I_90021_ID / FileLog.OrgID or - * Vehicle_I_90021 / Driver_I_90021 once subtree semantics are confirmed. + * Organisation filter: vehicle membership in GetOrganisationTree(null, :organisationId, 0, null). */ diff --git a/src/test/java/at/procon/eventhub/tachograph/service/TachographImportPlanServiceTest.java b/src/test/java/at/procon/eventhub/tachograph/service/TachographImportPlanServiceTest.java new file mode 100644 index 0000000..5a49121 --- /dev/null +++ b/src/test/java/at/procon/eventhub/tachograph/service/TachographImportPlanServiceTest.java @@ -0,0 +1,71 @@ +package at.procon.eventhub.tachograph.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.importing.ImportChunkPlanner; +import at.procon.eventhub.service.EventDetailsFactory; +import at.procon.eventhub.tachograph.dto.TachographImportRequest; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.EnumSet; +import java.util.List; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class TachographImportPlanServiceTest { + + private final EventDetailsFactory detailsFactory = new EventDetailsFactory(new ObjectMapper()); + private final TachographExtractionDefinitionRegistry definitionRegistry = new TachographExtractionDefinitionRegistry( + new CardActivityRowMapper(detailsFactory), + new VuActivityRowMapper(detailsFactory) + ); + + @Test + void rejectsUnsupportedEventFamiliesWhenJdbcExtractionIsEnabled() { + TachographImportPlanService service = serviceWithJdbcExtractor(); + TachographImportRequest request = requestForFamilies(EventFamily.DRIVER_CARD); + + assertThatThrownBy(() -> service.createPlan(request)) + .isInstanceOf(UnsupportedTachographExtractionException.class) + .hasMessageContaining("IW_CYCLE") + .hasMessageContaining("Supported JDBC extraction codes") + .hasMessageContaining("DRIVER_ACTIVITY"); + } + + @Test + void allowsSupportedDriverActivityFamilyWhenJdbcExtractionIsEnabled() { + TachographImportPlanService service = serviceWithJdbcExtractor(); + TachographImportRequest request = requestForFamilies(EventFamily.DRIVER_ACTIVITY); + + var plan = service.createPlan(request); + + assertThat(plan.items()) + .extracting(item -> item.extractionCode()) + .containsExactlyInAnyOrder("VU_ACTIVITY", "CARD_ACTIVITY"); + } + + private TachographImportPlanService serviceWithJdbcExtractor() { + EventHubProperties properties = new EventHubProperties(); + properties.getTachograph().getDatasource().setJdbcUrl("jdbc:sqlserver://tachograph-db"); + return new TachographImportPlanService(properties, new ImportChunkPlanner(), definitionRegistry); + } + + private TachographImportRequest requestForFamilies(EventFamily... families) { + EnumSet selectedFamilies = families.length == 0 + ? EnumSet.noneOf(EventFamily.class) + : EnumSet.copyOf(List.of(families)); + return new TachographImportRequest( + "tenant-1", + new EventSourceDto("TACHOGRAPH", "MIXED", "TACHOGRAPH_DB", "tachograph-db", null, null), + null, + ImportScopeDto.tenantAll(null, null), + selectedFamilies, + null, + false, + null + ); + } +}