diff --git a/docs/db/create_eventhub_schema.sql b/docs/db/create_eventhub_schema.sql index 490cf3b..adeb274 100644 --- a/docs/db/create_eventhub_schema.sql +++ b/docs/db/create_eventhub_schema.sql @@ -1,6 +1,8 @@ create extension if not exists pgcrypto; create extension if not exists postgis; +create extension if not exists timescaledb; +drop schema if exists eventhub cascade; create schema if not exists eventhub; create table if not exists eventhub.event_source ( @@ -143,11 +145,56 @@ create table if not exists eventhub.source_master_relation ( constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to) ); -create table if not exists eventhub.vehicle ( +create table if not exists eventhub.driver ( + id uuid primary key, + first_names text, + last_name text, + birth_date date, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create table if not exists eventhub.driver_card ( + id uuid primary key, + driver_id uuid references eventhub.driver(id), + nation text not null, + card_number text not null, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create table if not exists eventhub.source_driver_identity ( id uuid primary key, tenant_key text not null, event_source_id integer not null references eventhub.event_source(id), - source_vehicle_entity_id text, + source_driver_entity_id text not null, + driver_id uuid not null references eventhub.driver(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_driver_identity unique (tenant_key, event_source_id, source_driver_entity_id) +); + +create table if not exists eventhub.source_driver_card_identity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_driver_card_entity_id text not null, + driver_card_id uuid not null references eventhub.driver_card(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_driver_card_identity unique (tenant_key, event_source_id, source_driver_card_entity_id) +); + +create table if not exists eventhub.vehicle ( + id uuid primary key, vin text, created_at timestamptz not null default now(), updated_at timestamptz not null default now() @@ -155,9 +202,6 @@ create table if not exists eventhub.vehicle ( create table if not exists eventhub.vehicle_registration ( id uuid primary key, - tenant_key text not null, - event_source_id integer not null references eventhub.event_source(id), - source_registration_entity_id text, nation text not null, registration_number text not null, source_updated_at timestamptz, @@ -166,6 +210,32 @@ create table if not exists eventhub.vehicle_registration ( updated_at timestamptz not null default now() ); +create table if not exists eventhub.source_vehicle_identity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_vehicle_entity_id text not null, + vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_vehicle_identity unique (tenant_key, event_source_id, source_vehicle_entity_id) +); + +create table if not exists eventhub.source_vehicle_registration_identity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_registration_entity_id text not null, + vehicle_registration_id uuid not null references eventhub.vehicle_registration(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_vehicle_registration_identity unique (tenant_key, event_source_id, source_registration_entity_id) +); + create table if not exists eventhub.vehicle_registration_assignment ( id uuid primary key, tenant_key text not null, @@ -186,9 +256,11 @@ create table if not exists eventhub.event ( event_source_id integer not null references eventhub.event_source(id), data_package_id uuid not null references eventhub.data_package(id), external_source_event_id text not null, - driver_entity_id uuid references eventhub.source_master_entity(id), + driver_id uuid references eventhub.driver(id), + driver_card_id uuid references eventhub.driver_card(id), vehicle_id uuid references eventhub.vehicle(id), vehicle_registration_id uuid references eventhub.vehicle_registration(id), + source_package_id text, source_package_entity_id uuid references eventhub.source_master_entity(id), occurred_at timestamptz not null, received_partner_at timestamptz, @@ -205,26 +277,90 @@ create table if not exists eventhub.event ( created_at timestamptz not null default now(), constraint pk_event primary key (occurred_at, id), constraint chk_event_driver_or_vehicle_ref check ( - driver_entity_id is not null + driver_id is not null + or driver_card_id is not null or vehicle_id is not null or vehicle_registration_id is not null ) ); +create table if not exists eventhub.event_source_record ( + source_record_key_hash text primary key, + event_occurred_at timestamptz not null, + event_id uuid not null, + created_at timestamptz not null default now() +); + create table if not exists eventhub.event_detail ( event_occurred_at timestamptz not null, event_id uuid not null, detail_type text not null, attributes jsonb not null default '{}'::jsonb, created_at timestamptz not null default now(), - constraint pk_event_detail primary key (event_occurred_at, event_id, detail_type), - constraint fk_event_detail_event foreign key (event_occurred_at, event_id) - references eventhub.event(occurred_at, id) - on delete cascade + constraint pk_event_detail primary key (event_occurred_at, event_id, detail_type) ); -create unique index if not exists ux_event_source_record - on eventhub.event(source_record_key_hash); +select create_hypertable( + 'eventhub.event', + 'occurred_at', + chunk_time_interval => interval '7 days', + if_not_exists => true +); + +alter table eventhub.event_source_record + add constraint fk_event_source_record_event foreign key (event_occurred_at, event_id) + references eventhub.event(occurred_at, id) + on delete cascade + deferrable initially deferred; + +alter table eventhub.event_detail + add constraint fk_event_detail_event foreign key (event_occurred_at, event_id) + references eventhub.event(occurred_at, id) + on delete cascade; + +create index if not exists idx_data_package_source_time + on eventhub.data_package(tenant_key, event_source_id, received_at desc); + +create index if not exists idx_data_package_scope + on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to); + +create index if not exists idx_data_package_extraction + on eventhub.data_package(tenant_key, event_source_id, import_run_id, event_family, extraction_source_kind, extraction_code, batch_no); + +create index if not exists idx_import_run_source_status + on eventhub.import_run(tenant_key, event_source_id, status, started_at desc); + +create index if not exists idx_source_master_entity_type_key + on eventhub.source_master_entity(tenant_key, event_source_id, entity_type, source_external_key) + where source_external_key is not null; + +create index if not exists idx_source_master_entity_payload_gin + on eventhub.source_master_entity using gin(payload); + +create index if not exists idx_source_master_relation_from + on eventhub.source_master_relation(tenant_key, event_source_id, from_entity_type, from_source_entity_id, relation_type); + +create index if not exists idx_source_master_relation_to + on eventhub.source_master_relation(tenant_key, event_source_id, to_entity_type, to_source_entity_id, relation_type); + +create index if not exists idx_source_master_relation_payload_gin + on eventhub.source_master_relation using gin(payload); + +create index if not exists idx_vehicle_vin + on eventhub.vehicle(vin) + where vin is not null; + +create index if not exists idx_vehicle_registration_plate + on eventhub.vehicle_registration(nation, registration_number); + +create index if not exists idx_vehicle_registration_assignment_registration_time + on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to); + +create index if not exists idx_vehicle_registration_assignment_vehicle_time + on eventhub.vehicle_registration_assignment(vehicle_id, valid_from desc, valid_to); + +create index if not exists idx_event_source_record_event + on eventhub.event_source_record(event_occurred_at, event_id); create index if not exists idx_event_signature on eventhub.event(event_signature_hash) @@ -236,12 +372,49 @@ create index if not exists idx_event_source_time create index if not exists idx_event_package_time on eventhub.event(data_package_id, occurred_at desc); +create index if not exists idx_event_source_package_id + on eventhub.event(source_package_id) + where source_package_id is not null; + create index if not exists idx_event_domain_type_time on eventhub.event(event_domain, event_type, occurred_at desc); +create index if not exists idx_driver_card_key + on eventhub.driver_card(nation, card_number); + +create index if not exists idx_driver_card_driver + on eventhub.driver_card(driver_id) + where driver_id is not null; + +create unique index if not exists ux_driver_card_key + on eventhub.driver_card(nation, card_number); + +create unique index if not exists ux_vehicle_vin + on eventhub.vehicle(vin) + where vin is not null; + +create unique index if not exists ux_vehicle_registration_plate + on eventhub.vehicle_registration(nation, registration_number); + +create index if not exists idx_source_driver_identity_driver + on eventhub.source_driver_identity(driver_id); + +create index if not exists idx_source_driver_card_identity_card + on eventhub.source_driver_card_identity(driver_card_id); + +create index if not exists idx_source_vehicle_identity_vehicle + on eventhub.source_vehicle_identity(vehicle_id); + +create index if not exists idx_source_vehicle_registration_identity_registration + on eventhub.source_vehicle_registration_identity(vehicle_registration_id); + create index if not exists idx_event_driver_time - on eventhub.event(driver_entity_id, occurred_at desc) - where driver_entity_id is not null; + on eventhub.event(driver_id, occurred_at desc) + where driver_id is not null; + +create index if not exists idx_event_driver_card_time + on eventhub.event(driver_card_id, occurred_at desc) + where driver_card_id is not null; create index if not exists idx_event_vehicle_time on eventhub.event(vehicle_id, occurred_at desc) @@ -264,54 +437,18 @@ create index if not exists idx_event_detail_type create index if not exists idx_event_detail_attributes_gin on eventhub.event_detail using gin(attributes); -create index if not exists idx_source_master_entity_type_key - on eventhub.source_master_entity(tenant_key, event_source_id, entity_type, source_external_key) - where source_external_key is not null; +create index if not exists idx_event_detail_yellowfox_slot + on eventhub.event_detail(detail_type, (attributes ->> 'slot'), event_occurred_at) + where detail_type in ('DRIVER_ACTIVITY', 'DRIVER_CARD'); -create index if not exists idx_source_master_entity_payload_gin - on eventhub.source_master_entity using gin(payload); +create index if not exists idx_event_detail_yellowfox_eventtype_state + on eventhub.event_detail( + (attributes ->> 'yellowFoxEventType'), + (attributes ->> 'yellowFoxState'), + event_occurred_at + ) + where attributes ? 'yellowFoxEventType'; -create index if not exists idx_source_master_relation_from - on eventhub.source_master_relation(tenant_key, event_source_id, from_entity_type, from_source_entity_id, relation_type); - -create index if not exists idx_source_master_relation_to - on eventhub.source_master_relation(tenant_key, event_source_id, to_entity_type, to_source_entity_id, relation_type); - -create index if not exists idx_source_master_relation_payload_gin - on eventhub.source_master_relation using gin(payload); - -create index if not exists idx_vehicle_lookup_ctx - on eventhub.vehicle(tenant_key, event_source_id, updated_at desc); - -create index if not exists idx_vehicle_source_entity - on eventhub.vehicle(tenant_key, event_source_id, source_vehicle_entity_id) - where source_vehicle_entity_id is not null; - -create index if not exists idx_vehicle_vin - on eventhub.vehicle(tenant_key, event_source_id, vin) - where vin is not null; - -create index if not exists idx_vehicle_registration_source_entity - on eventhub.vehicle_registration(tenant_key, event_source_id, source_registration_entity_id) - where source_registration_entity_id is not null; - -create index if not exists idx_vehicle_registration_plate - on eventhub.vehicle_registration(tenant_key, event_source_id, nation, registration_number); - -create index if not exists idx_vehicle_registration_assignment_registration_time - on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to); - -create index if not exists idx_vehicle_registration_assignment_vehicle_time - on eventhub.vehicle_registration_assignment(vehicle_id, valid_from desc, valid_to); - -create index if not exists idx_data_package_source_time - on eventhub.data_package(tenant_key, event_source_id, received_at desc); - -create index if not exists idx_data_package_scope - on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to); - -create index if not exists idx_data_package_extraction - on eventhub.data_package(tenant_key, event_source_id, import_run_id, event_family, extraction_source_kind, extraction_code, batch_no); - -create index if not exists idx_import_run_source_status - on eventhub.import_run(tenant_key, event_source_id, status, started_at desc); +create index if not exists idx_event_detail_yellowfox_ignition + on eventhub.event_detail(detail_type, (attributes ->> 'ignitionState'), event_occurred_at) + where attributes ? 'ignitionState'; diff --git a/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java b/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java index 8b710c0..83e2860 100644 --- a/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java @@ -4,6 +4,7 @@ import at.procon.eventhub.dto.DriverCardRefDto; import at.procon.eventhub.dto.DriverRefDto; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.LocalDate; import java.time.OffsetDateTime; import java.util.LinkedHashMap; import java.util.Map; @@ -23,13 +24,13 @@ public class DriverIdentityRepository { this.objectMapper = objectMapper; } - public UUID resolveOrCreateDriverId( + public ResolvedDriverReference resolveOrCreateDriverReference( String tenantKey, int eventSourceId, DriverRefDto driverRef ) { if (driverRef == null || !driverRef.hasAnyReference()) { - return null; + return ResolvedDriverReference.empty(); } String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); @@ -38,19 +39,21 @@ public class DriverIdentityRepository { String cardNation = driverCard == null ? null : normalizeNullable(driverCard.nation()); String cardNumber = driverCard == null ? null : normalizeNullable(driverCard.number()); - UUID driverId = resolveDriverId(normalizedTenantKey, eventSourceId, sourceDriverEntityId, cardNation, cardNumber); - if (driverId == null) { + UUID driverId = findBySourceDriverEntityId(normalizedTenantKey, eventSourceId, sourceDriverEntityId); + UUID driverCardId = resolveOrCreateDriverCardId(cardNation, cardNumber, driverId); + + if (driverId == null && driverCardId != null) { + driverId = findDriverIdByCardId(driverCardId); + } + if (driverId == null && sourceDriverEntityId != null) { Map payload = new LinkedHashMap<>(); put(payload, "source", "event"); put(payload, "source_driver_entity_id", sourceDriverEntityId); - put(payload, "card_nation", cardNation); - put(payload, "card_number", cardNumber); driverId = createDriver( normalizedTenantKey, eventSourceId, sourceDriverEntityId, - cardNation, - cardNumber, + null, null, null, null, @@ -58,24 +61,162 @@ public class DriverIdentityRepository { ); } - touchDriver(driverId, sourceDriverEntityId, cardNation, cardNumber); - return driverId; + if (driverId != null && sourceDriverEntityId != null) { + upsertSourceDriverIdentity( + normalizedTenantKey, + eventSourceId, + sourceDriverEntityId, + driverId, + null, + Map.of("source", "event") + ); + } + if (driverCardId != null && driverId != null) { + linkDriverCard(driverCardId, driverId); + } + return new ResolvedDriverReference(driverId, driverCardId); } @Transactional public int reconcileFromMasterData(String tenantKey, int eventSourceId) { String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); int updates = reconcileDriversFromMasterData(normalizedTenantKey, eventSourceId); - updates += projectDriverCardsFromMasterData(normalizedTenantKey, eventSourceId); + updates += reconcileDriverCardsFromMasterData(normalizedTenantKey, eventSourceId); + updates += projectDriverCardLinksFromMasterData(normalizedTenantKey, eventSourceId); return updates; } private int reconcileDriversFromMasterData(String tenantKey, int eventSourceId) { - Long count = jdbcTemplate.queryForObject( + int insertedDrivers; + if (driverUsesLegacySchema()) { + insertedDrivers = jdbcTemplate.update( + compatibleSourcesCte() + """ + , master_drivers as ( + select distinct on (nullif(trim(source_entity_id), '')) + event_source_id, + nullif(trim(source_entity_id), '') as source_driver_entity_id, + nullif(trim(payload ->> 'first_names'), '') as first_names, + coalesce( + nullif(trim(payload ->> 'last_name'), ''), + nullif(trim(payload ->> 'surname'), '') + ) as last_name, + cast(nullif(trim(payload ->> 'birth_date'), '') as date) as birth_date, + source_updated_at, + payload + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'DRIVER' + and nullif(trim(source_entity_id), '') is not null + order by nullif(trim(source_entity_id), ''), updated_at desc + ), + resolved_drivers as ( + select master.event_source_id, + master.source_driver_entity_id, + master.first_names, + master.last_name, + master.birth_date, + master.source_updated_at, + master.payload, + coalesce(identity.driver_id, gen_random_uuid()) as driver_id + from master_drivers master + left join eventhub.source_driver_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_entity_id = master.source_driver_entity_id + ) + insert into eventhub.driver( + id, tenant_key, event_source_id, source_driver_entity_id, + first_names, last_name, birth_date, source_updated_at, payload, updated_at + ) + select distinct on (resolved.driver_id) + resolved.driver_id, + ?, + resolved.event_source_id, + resolved.source_driver_entity_id, + resolved.first_names, + resolved.last_name, + resolved.birth_date, + resolved.source_updated_at, + resolved.payload, + now() + from resolved_drivers resolved + where not exists ( + select 1 + from eventhub.driver existing + where existing.id = resolved.driver_id + ) + """, + eventSourceId, + tenantKey, + tenantKey, + tenantKey + ); + } else { + insertedDrivers = jdbcTemplate.update( + compatibleSourcesCte() + """ + , master_drivers as ( + select distinct on (nullif(trim(source_entity_id), '')) + event_source_id, + nullif(trim(source_entity_id), '') as source_driver_entity_id, + nullif(trim(payload ->> 'first_names'), '') as first_names, + coalesce( + nullif(trim(payload ->> 'last_name'), ''), + nullif(trim(payload ->> 'surname'), '') + ) as last_name, + cast(nullif(trim(payload ->> 'birth_date'), '') as date) as birth_date, + source_updated_at, + payload + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'DRIVER' + and nullif(trim(source_entity_id), '') is not null + order by nullif(trim(source_entity_id), ''), updated_at desc + ), + resolved_drivers as ( + select master.event_source_id, + master.source_driver_entity_id, + master.first_names, + master.last_name, + master.birth_date, + master.source_updated_at, + master.payload, + coalesce(identity.driver_id, gen_random_uuid()) as driver_id + from master_drivers master + left join eventhub.source_driver_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_entity_id = master.source_driver_entity_id + ) + insert into eventhub.driver( + id, first_names, last_name, birth_date, source_updated_at, payload, updated_at + ) + select distinct on (resolved.driver_id) + resolved.driver_id, + resolved.first_names, + resolved.last_name, + resolved.birth_date, + resolved.source_updated_at, + resolved.payload, + now() + from resolved_drivers resolved + where not exists ( + select 1 + from eventhub.driver existing + where existing.id = resolved.driver_id + ) + """, + eventSourceId, + tenantKey, + tenantKey + ); + } + + int updatedDrivers = jdbcTemplate.update( compatibleSourcesCte() + """ , master_drivers as ( select distinct on (nullif(trim(source_entity_id), '')) - event_source_id, nullif(trim(source_entity_id), '') as source_driver_entity_id, nullif(trim(payload ->> 'first_names'), '') as first_names, coalesce( @@ -90,125 +231,392 @@ public class DriverIdentityRepository { and event_source_id in (select id from compatible_sources) and entity_type = 'DRIVER' and nullif(trim(source_entity_id), '') is not null - and source_entity_id not like 'DRIVER_CARD:%' + order by nullif(trim(source_entity_id), ''), updated_at desc + ) + update eventhub.driver driver + set first_names = coalesce(master.first_names, driver.first_names), + last_name = coalesce(master.last_name, driver.last_name), + birth_date = coalesce(master.birth_date, driver.birth_date), + source_updated_at = coalesce(master.source_updated_at, driver.source_updated_at), + payload = driver.payload || master.payload, + updated_at = now() + from master_drivers master + join eventhub.source_driver_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_entity_id = master.source_driver_entity_id + where identity.driver_id = driver.id + """, + eventSourceId, + tenantKey, + tenantKey + ); + + int linkedDrivers = jdbcTemplate.update( + compatibleSourcesCte() + """ + , master_drivers as ( + select distinct on (nullif(trim(source_entity_id), '')) + event_source_id, + nullif(trim(source_entity_id), '') as source_driver_entity_id, + source_updated_at, + payload + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'DRIVER' + and nullif(trim(source_entity_id), '') is not null order by nullif(trim(source_entity_id), ''), updated_at desc ), - updated_by_source as ( - update eventhub.driver driver - set first_names = coalesce(master.first_names, driver.first_names), - last_name = coalesce(master.last_name, driver.last_name), - birth_date = coalesce(master.birth_date, driver.birth_date), - source_updated_at = master.source_updated_at, - payload = driver.payload || master.payload, - updated_at = now() - from master_drivers master - where driver.tenant_key = ? - and driver.event_source_id in (select id from compatible_sources) - and driver.source_driver_entity_id = master.source_driver_entity_id - returning driver.id - ), - inserted as ( - insert into eventhub.driver( - id, tenant_key, event_source_id, source_driver_entity_id, - first_names, last_name, birth_date, source_updated_at, payload, updated_at - ) - select gen_random_uuid(), - ?, - master.event_source_id, + resolved_driver_ids as ( + select master.event_source_id, master.source_driver_entity_id, - master.first_names, - master.last_name, - master.birth_date, master.source_updated_at, master.payload, - now() + coalesce(identity.driver_id, ( + select created.id + from eventhub.driver created + where created.payload = master.payload + order by created.updated_at desc + limit 1 + )) as driver_id from master_drivers master - where not exists ( - select 1 - from eventhub.driver existing - where existing.tenant_key = ? - and existing.event_source_id in (select id from compatible_sources) - and existing.source_driver_entity_id = master.source_driver_entity_id - ) - returning id + left join eventhub.source_driver_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_entity_id = master.source_driver_entity_id ) - select (select count(*) from updated_by_source) - + (select count(*) from inserted) + insert into eventhub.source_driver_identity( + id, tenant_key, event_source_id, source_driver_entity_id, + driver_id, source_updated_at, payload, updated_at + ) + select gen_random_uuid(), + ?, + resolved.event_source_id, + resolved.source_driver_entity_id, + resolved.driver_id, + resolved.source_updated_at, + resolved.payload, + now() + from resolved_driver_ids resolved + where resolved.driver_id is not null + on conflict (tenant_key, event_source_id, source_driver_entity_id) + do update set + driver_id = excluded.driver_id, + source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_identity.source_updated_at), + payload = eventhub.source_driver_identity.payload || excluded.payload, + updated_at = now() """, - Long.class, eventSourceId, tenantKey, tenantKey, - tenantKey, tenantKey ); - return count == null ? 0 : Math.toIntExact(count); + + return insertedDrivers + updatedDrivers + linkedDrivers; } - private int projectDriverCardsFromMasterData(String tenantKey, int eventSourceId) { - Long count = jdbcTemplate.queryForObject( + private int reconcileDriverCardsFromMasterData(String tenantKey, int eventSourceId) { + int insertedCards = jdbcTemplate.update( compatibleSourcesCte() + """ - , driver_card_projection as ( - select distinct on (rel.to_source_entity_id) - rel.to_source_entity_id as source_driver_entity_id, - nullif(trim(card.payload ->> 'card_nation'), '') as card_nation, - nullif(trim(card.payload ->> 'card_number'), '') as card_number, - rel.source_updated_at - from eventhub.source_master_relation rel - join eventhub.source_master_entity card - on card.tenant_key = rel.tenant_key - and card.event_source_id = rel.event_source_id - and card.entity_type = 'DRIVER_CARD' - and card.source_entity_id = rel.from_source_entity_id - where rel.tenant_key = ? - and rel.event_source_id in (select id from compatible_sources) - and rel.relation_type = 'DRIVER_CARD_DRIVER' - and rel.from_entity_type = 'DRIVER_CARD' - and rel.to_entity_type = 'DRIVER' - order by rel.to_source_entity_id, - rel.valid_to desc nulls last, - rel.valid_from desc nulls last, - rel.updated_at desc + , master_driver_cards as ( + select distinct on ( + nullif(trim(source_entity_id), ''), + nullif(trim(payload ->> 'card_nation'), ''), + nullif(trim(payload ->> 'card_number'), '') + ) + event_source_id, + nullif(trim(source_entity_id), '') as source_driver_card_entity_id, + nullif(trim(payload ->> 'card_nation'), '') as card_nation, + nullif(trim(payload ->> 'card_number'), '') as card_number, + source_updated_at, + payload + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'DRIVER_CARD' + and nullif(trim(payload ->> 'card_nation'), '') is not null + and nullif(trim(payload ->> 'card_number'), '') is not null + order by nullif(trim(source_entity_id), ''), + nullif(trim(payload ->> 'card_nation'), ''), + nullif(trim(payload ->> 'card_number'), ''), + updated_at desc ), - updated_by_source as ( - update eventhub.driver driver - set card_nation = coalesce(driver.card_nation, projection.card_nation), - card_number = coalesce(driver.card_number, projection.card_number), - source_updated_at = coalesce(projection.source_updated_at, driver.source_updated_at), - updated_at = now() - from driver_card_projection projection - where driver.tenant_key = ? - and driver.event_source_id in (select id from compatible_sources) - and driver.source_driver_entity_id = projection.source_driver_entity_id - and ( - (driver.card_nation is null and projection.card_nation is not null) - or (driver.card_number is null and projection.card_number is not null) - ) - returning driver.id + canonical_driver_cards as ( + select distinct on (master.card_nation, master.card_number) + master.card_nation, + master.card_number, + master.source_updated_at, + master.payload + from master_driver_cards master + order by master.card_nation, + master.card_number, + case when master.source_driver_card_entity_id is null then 1 else 0 end, + master.source_updated_at desc, + master.source_driver_card_entity_id + ), + existing_driver_cards as ( + select distinct on (existing.nation, existing.card_number) + existing.id, + existing.nation, + existing.card_number + from eventhub.driver_card existing + order by existing.nation, + existing.card_number, + case when existing.driver_id is null then 1 else 0 end, + existing.updated_at desc, + existing.created_at desc, + existing.id + ), + resolved_cards as ( + select canonical.card_nation, + canonical.card_number, + canonical.source_updated_at, + canonical.payload, + coalesce(existing.id, gen_random_uuid()) as driver_card_id + from canonical_driver_cards canonical + left join existing_driver_cards existing + on existing.nation = canonical.card_nation + and existing.card_number = canonical.card_number + ) + insert into eventhub.driver_card( + id, driver_id, nation, card_number, source_updated_at, payload, updated_at + ) + select distinct on (resolved.driver_card_id) + resolved.driver_card_id, + null, + resolved.card_nation, + resolved.card_number, + resolved.source_updated_at, + resolved.payload, + now() + from resolved_cards resolved + where not exists ( + select 1 + from eventhub.driver_card existing + where existing.id = resolved.driver_card_id ) - select count(*) - from updated_by_source """, - Long.class, eventSourceId, + tenantKey + ); + + int updatedCards = jdbcTemplate.update( + compatibleSourcesCte() + """ + , master_driver_cards as ( + select distinct on ( + nullif(trim(source_entity_id), ''), + nullif(trim(payload ->> 'card_nation'), ''), + nullif(trim(payload ->> 'card_number'), '') + ) + nullif(trim(source_entity_id), '') as source_driver_card_entity_id, + nullif(trim(payload ->> 'card_nation'), '') as card_nation, + nullif(trim(payload ->> 'card_number'), '') as card_number, + source_updated_at, + payload + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'DRIVER_CARD' + and nullif(trim(payload ->> 'card_nation'), '') is not null + and nullif(trim(payload ->> 'card_number'), '') is not null + order by nullif(trim(source_entity_id), ''), + nullif(trim(payload ->> 'card_nation'), ''), + nullif(trim(payload ->> 'card_number'), ''), + updated_at desc + ), + canonical_driver_cards as ( + select distinct on (master.card_nation, master.card_number) + master.card_nation, + master.card_number, + master.source_updated_at, + master.payload + from master_driver_cards master + order by master.card_nation, + master.card_number, + case when master.source_driver_card_entity_id is null then 1 else 0 end, + master.source_updated_at desc, + master.source_driver_card_entity_id + ), + existing_driver_cards as ( + select distinct on (existing.nation, existing.card_number) + existing.id, + existing.nation, + existing.card_number + from eventhub.driver_card existing + order by existing.nation, + existing.card_number, + case when existing.driver_id is null then 1 else 0 end, + existing.updated_at desc, + existing.created_at desc, + existing.id + ), + resolved_cards as ( + select canonical.source_updated_at, + canonical.payload, + existing.id as driver_card_id + from canonical_driver_cards canonical + join existing_driver_cards existing + on existing.nation = canonical.card_nation + and existing.card_number = canonical.card_number + ) + update eventhub.driver_card card + set source_updated_at = coalesce(resolved.source_updated_at, card.source_updated_at), + payload = card.payload || resolved.payload, + updated_at = now() + from resolved_cards resolved + where card.id = resolved.driver_card_id + """, + eventSourceId, + tenantKey + ); + + int linkedCards = jdbcTemplate.update( + compatibleSourcesCte() + """ + , master_driver_cards as ( + select distinct on ( + nullif(trim(source_entity_id), ''), + nullif(trim(payload ->> 'card_nation'), ''), + nullif(trim(payload ->> 'card_number'), '') + ) + event_source_id, + nullif(trim(source_entity_id), '') as source_driver_card_entity_id, + nullif(trim(payload ->> 'card_nation'), '') as card_nation, + nullif(trim(payload ->> 'card_number'), '') as card_number, + source_updated_at, + payload + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'DRIVER_CARD' + and nullif(trim(payload ->> 'card_nation'), '') is not null + and nullif(trim(payload ->> 'card_number'), '') is not null + order by nullif(trim(source_entity_id), ''), + nullif(trim(payload ->> 'card_nation'), ''), + nullif(trim(payload ->> 'card_number'), ''), + updated_at desc + ), + existing_driver_cards as ( + select distinct on (existing.nation, existing.card_number) + existing.id, + existing.nation, + existing.card_number + from eventhub.driver_card existing + order by existing.nation, + existing.card_number, + case when existing.driver_id is null then 1 else 0 end, + existing.updated_at desc, + existing.created_at desc, + existing.id + ), + resolved_cards as ( + select master.event_source_id, + master.source_driver_card_entity_id, + master.source_updated_at, + master.payload, + coalesce(identity.driver_card_id, existing.id) as driver_card_id + from master_driver_cards master + left join eventhub.source_driver_card_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_card_entity_id = master.source_driver_card_entity_id + left join existing_driver_cards existing + on existing.nation = master.card_nation + and existing.card_number = master.card_number + ) + insert into eventhub.source_driver_card_identity( + id, tenant_key, event_source_id, source_driver_card_entity_id, + driver_card_id, source_updated_at, payload, updated_at + ) + select gen_random_uuid(), + ?, + resolved.event_source_id, + resolved.source_driver_card_entity_id, + resolved.driver_card_id, + resolved.source_updated_at, + resolved.payload, + now() + from resolved_cards resolved + where resolved.source_driver_card_entity_id is not null + and resolved.driver_card_id is not null + on conflict (tenant_key, event_source_id, source_driver_card_entity_id) + do update set + driver_card_id = excluded.driver_card_id, + source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_card_identity.source_updated_at), + payload = eventhub.source_driver_card_identity.payload || excluded.payload, + updated_at = now() + """, + eventSourceId, + tenantKey, tenantKey, tenantKey ); - return count == null ? 0 : Math.toIntExact(count); + + return insertedCards + updatedCards + linkedCards; } - private UUID resolveDriverId( - String tenantKey, - int eventSourceId, - String sourceDriverEntityId, + private int projectDriverCardLinksFromMasterData(String tenantKey, int eventSourceId) { + return jdbcTemplate.update( + compatibleSourcesCte() + """ + update eventhub.driver_card card + set driver_id = driver_identity.driver_id, + source_updated_at = coalesce(relation.source_updated_at, card.source_updated_at), + updated_at = now() + from eventhub.source_master_relation relation + join eventhub.source_driver_card_identity card_identity + on card_identity.tenant_key = relation.tenant_key + and card_identity.event_source_id = relation.event_source_id + and card_identity.source_driver_card_entity_id = relation.from_source_entity_id + join eventhub.source_driver_identity driver_identity + on driver_identity.tenant_key = relation.tenant_key + and driver_identity.event_source_id = relation.event_source_id + and driver_identity.source_driver_entity_id = relation.to_source_entity_id + where relation.tenant_key = ? + and relation.event_source_id in (select id from compatible_sources) + and relation.relation_type = 'DRIVER_CARD_DRIVER' + and relation.from_entity_type = 'DRIVER_CARD' + and relation.to_entity_type = 'DRIVER' + and card.id = card_identity.driver_card_id + and ( + card.driver_id is null + or card.driver_id = driver_identity.driver_id + or not exists ( + select 1 + from eventhub.source_driver_identity existing_identity + where existing_identity.driver_id = card.driver_id + ) + ) + """, + eventSourceId, + tenantKey + ); + } + + private UUID resolveOrCreateDriverCardId( String cardNation, - String cardNumber + String cardNumber, + UUID preferredDriverId ) { - UUID driverId = findBySourceDriverEntityId(tenantKey, eventSourceId, sourceDriverEntityId); - if (driverId == null) { - driverId = findByCard(tenantKey, eventSourceId, cardNation, cardNumber); + if (cardNation == null || cardNumber == null) { + return null; } - return driverId; + UUID driverCardId = findDriverCardByCard(cardNation, cardNumber); + if (driverCardId == null) { + Map payload = new LinkedHashMap<>(); + put(payload, "source", "event"); + put(payload, "card_nation", cardNation); + put(payload, "card_number", cardNumber); + return createDriverCard( + preferredDriverId, + cardNation, + cardNumber, + null, + payload + ); + } + if (preferredDriverId != null) { + linkDriverCard(driverCardId, preferredDriverId); + } + return driverCardId; } private UUID findBySourceDriverEntityId(String tenantKey, int eventSourceId, String sourceDriverEntityId) { @@ -217,39 +625,50 @@ public class DriverIdentityRepository { } return jdbcTemplate.query( compatibleSourcesCte() + """ - select d.id - from eventhub.driver d - where d.tenant_key = ? - and d.event_source_id in (select id from compatible_sources) - and d.source_driver_entity_id = ? - order by d.updated_at desc + select identity.driver_id + from eventhub.source_driver_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_entity_id = ? + order by identity.updated_at desc limit 1 """, - rs -> rs.next() ? (UUID) rs.getObject("id") : null, + rs -> rs.next() ? (UUID) rs.getObject("driver_id") : null, eventSourceId, tenantKey, sourceDriverEntityId ); } - private UUID findByCard(String tenantKey, int eventSourceId, String cardNation, String cardNumber) { + private UUID findDriverIdByCardId(UUID driverCardId) { + if (driverCardId == null) { + return null; + } + return jdbcTemplate.query( + """ + select driver_id + from eventhub.driver_card + where id = ? + """, + rs -> rs.next() ? (UUID) rs.getObject("driver_id") : null, + driverCardId + ); + } + + private UUID findDriverCardByCard(String cardNation, String cardNumber) { if (cardNation == null || cardNumber == null) { return null; } return jdbcTemplate.query( - compatibleSourcesCte() + """ - select d.id - from eventhub.driver d - where d.tenant_key = ? - and d.event_source_id in (select id from compatible_sources) - and d.card_nation = ? - and d.card_number = ? - order by d.updated_at desc + """ + select card.id + from eventhub.driver_card card + where card.nation = ? + and card.card_number = ? + order by card.updated_at desc limit 1 """, rs -> rs.next() ? (UUID) rs.getObject("id") : null, - eventSourceId, - tenantKey, cardNation, cardNumber ); @@ -259,66 +678,152 @@ public class DriverIdentityRepository { String tenantKey, int eventSourceId, String sourceDriverEntityId, - String cardNation, - String cardNumber, String firstNames, String lastName, OffsetDateTime sourceUpdatedAt, + LocalDate birthDate, Map payload ) { UUID driverId = UUID.randomUUID(); - jdbcTemplate.update( - """ - insert into eventhub.driver( - id, tenant_key, event_source_id, source_driver_entity_id, - card_nation, card_number, first_names, last_name, - source_updated_at, payload, updated_at - ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) - """, - driverId, - tenantKey, - eventSourceId, - sourceDriverEntityId, - cardNation, - cardNumber, - firstNames, - lastName, - sourceUpdatedAt, - toJson(payload) - ); + if (driverUsesLegacySchema()) { + jdbcTemplate.update( + """ + insert into eventhub.driver( + id, tenant_key, event_source_id, source_driver_entity_id, + first_names, last_name, birth_date, source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) + """, + driverId, + tenantKey, + eventSourceId, + sourceDriverEntityId, + firstNames, + lastName, + birthDate, + sourceUpdatedAt, + toJson(payload) + ); + } else { + jdbcTemplate.update( + """ + insert into eventhub.driver( + id, first_names, last_name, birth_date, source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?, ?::jsonb, now()) + """, + driverId, + firstNames, + lastName, + birthDate, + sourceUpdatedAt, + toJson(payload) + ); + } return driverId; } - private void touchDriver( + private boolean driverUsesLegacySchema() { + Integer legacyColumns = jdbcTemplate.queryForObject( + """ + select count(*) + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'driver' + and column_name in ('tenant_key', 'event_source_id', 'source_driver_entity_id') + """, + Integer.class + ); + return legacyColumns != null && legacyColumns == 3; + } + + private UUID createDriverCard( UUID driverId, - String sourceDriverEntityId, String cardNation, - String cardNumber + String cardNumber, + OffsetDateTime sourceUpdatedAt, + Map payload ) { - if (sourceDriverEntityId == null && cardNation == null && cardNumber == null) { + UUID driverCardId = UUID.randomUUID(); + jdbcTemplate.update( + """ + insert into eventhub.driver_card( + id, driver_id, nation, card_number, source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?, ?::jsonb, now()) + """, + driverCardId, + driverId, + cardNation, + cardNumber, + sourceUpdatedAt, + toJson(payload) + ); + return driverCardId; + } + + private void upsertSourceDriverIdentity( + String tenantKey, + int eventSourceId, + String sourceDriverEntityId, + UUID driverId, + OffsetDateTime sourceUpdatedAt, + Map payload + ) { + if (sourceDriverEntityId == null || driverId == null) { return; } jdbcTemplate.update( """ - update eventhub.driver - set source_driver_entity_id = coalesce(source_driver_entity_id, cast(? as text)), - card_nation = coalesce(card_nation, cast(? as text)), - card_number = coalesce(card_number, cast(? as text)), + insert into eventhub.source_driver_identity( + id, tenant_key, event_source_id, source_driver_entity_id, + driver_id, source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?, ?, ?::jsonb, now()) + on conflict (tenant_key, event_source_id, source_driver_entity_id) + do update set + driver_id = excluded.driver_id, + source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_identity.source_updated_at), + payload = eventhub.source_driver_identity.payload || excluded.payload, + updated_at = now() + """, + UUID.randomUUID(), + tenantKey, + eventSourceId, + sourceDriverEntityId, + driverId, + sourceUpdatedAt, + toJson(payload) + ); + } + + private void linkDriverCard(UUID driverCardId, UUID driverId) { + if (driverCardId == null || driverId == null) { + return; + } + jdbcTemplate.update( + """ + update eventhub.driver_card + set driver_id = ?, updated_at = now() where id = ? and ( - (source_driver_entity_id is null and cast(? as text) is not null) - or (card_nation is null and cast(? as text) is not null) - or (card_number is null and cast(? as text) is not null) + driver_id is null + or driver_id = ? + or ( + exists ( + select 1 + from eventhub.source_driver_identity preferred_identity + where preferred_identity.driver_id = ? + ) + and not exists ( + select 1 + from eventhub.source_driver_identity current_identity + where current_identity.driver_id = eventhub.driver_card.driver_id + ) + ) ) """, - sourceDriverEntityId, - cardNation, - cardNumber, driverId, - sourceDriverEntityId, - cardNation, - cardNumber + driverCardId, + driverId, + driverId ); } @@ -369,4 +874,10 @@ public class DriverIdentityRepository { String trimmed = value.trim(); return trimmed.isEmpty() ? null : trimmed; } + + public record ResolvedDriverReference(UUID driverId, UUID driverCardId) { + public static ResolvedDriverReference empty() { + return new ResolvedDriverReference(null, null); + } + } } diff --git a/src/main/java/at/procon/eventhub/persistence/EventRepository.java b/src/main/java/at/procon/eventhub/persistence/EventRepository.java index 04e5f0b..87dbfb9 100644 --- a/src/main/java/at/procon/eventhub/persistence/EventRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/EventRepository.java @@ -57,11 +57,12 @@ public class EventRepository { */ public int batchInsert(UUID packageId, String tenantKey, int eventSourceId, List events) { Map entityIdCache = new HashMap<>(); + Map driverRefCache = new HashMap<>(); Map> vehicleRefCache = new HashMap<>(); List rows = new ArrayList<>(events.size()); for (EventHubEventDto event : events) { - ResolvedEntityRefs refs = resolveEntityRefs(tenantKey, eventSourceId, event, entityIdCache, vehicleRefCache); + ResolvedEntityRefs refs = resolveEntityRefs(tenantKey, eventSourceId, event, entityIdCache, driverRefCache, vehicleRefCache); rows.add(resolveEventImportRow(packageId, eventSourceId, event, refs)); } @@ -93,6 +94,7 @@ public class EventRepository { eventSourceId, event.externalSourceEventId(), refs.driverId(), + refs.driverCardId(), refs.vehicleId(), refs.vehicleRegistrationId(), sourcePackageId, @@ -125,6 +127,7 @@ public class EventRepository { event_source_id integer not null, external_source_event_id text not null, driver_id uuid, + driver_card_id uuid, vehicle_id uuid, vehicle_registration_id uuid, source_package_id text, @@ -152,11 +155,11 @@ public class EventRepository { """ insert into eventhub_event_import_stage( row_no, source_record_key_hash, requested_event_id, data_package_id, event_source_id, - external_source_event_id, driver_id, vehicle_id, vehicle_registration_id, + external_source_event_id, driver_id, driver_card_id, vehicle_id, vehicle_registration_id, source_package_id, source_package_entity_id, occurred_at, received_partner_at, received_hub_at, event_domain, event_type, lifecycle, odometer_m, longitude, latitude, payload, manual_entry, event_signature_hash - ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?) + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?) """, new BatchPreparedStatementSetter() { @Override @@ -169,22 +172,23 @@ public class EventRepository { ps.setInt(5, row.eventSourceId()); ps.setString(6, row.externalSourceEventId()); ps.setObject(7, row.driverId()); - ps.setObject(8, row.vehicleId()); - ps.setObject(9, row.vehicleRegistrationId()); - ps.setString(10, row.sourcePackageId()); - ps.setObject(11, row.sourcePackageEntityId()); - ps.setObject(12, row.occurredAt()); - ps.setObject(13, row.receivedPartnerAt()); - ps.setObject(14, row.receivedHubAt()); - ps.setString(15, row.eventDomain()); - ps.setString(16, row.eventType()); - ps.setString(17, row.lifecycle()); - ps.setObject(18, row.odometerM()); - ps.setObject(19, row.longitude()); - ps.setObject(20, row.latitude()); - ps.setString(21, row.payloadJson()); - ps.setBoolean(22, row.manualEntry()); - ps.setString(23, row.eventSignatureHash()); + ps.setObject(8, row.driverCardId()); + ps.setObject(9, row.vehicleId()); + ps.setObject(10, row.vehicleRegistrationId()); + ps.setString(11, row.sourcePackageId()); + ps.setObject(12, row.sourcePackageEntityId()); + ps.setObject(13, row.occurredAt()); + ps.setObject(14, row.receivedPartnerAt()); + ps.setObject(15, row.receivedHubAt()); + ps.setString(16, row.eventDomain()); + ps.setString(17, row.eventType()); + ps.setString(18, row.lifecycle()); + ps.setObject(19, row.odometerM()); + ps.setObject(20, row.longitude()); + ps.setObject(21, row.latitude()); + ps.setString(22, row.payloadJson()); + ps.setBoolean(23, row.manualEntry()); + ps.setString(24, row.eventSignatureHash()); } @Override @@ -236,7 +240,7 @@ public class EventRepository { insert into eventhub.event( id, event_source_id, data_package_id, external_source_event_id, - driver_id, vehicle_id, vehicle_registration_id, + driver_id, driver_card_id, vehicle_id, vehicle_registration_id, source_package_id, source_package_entity_id, occurred_at, received_partner_at, received_hub_at, event_domain, event_type, lifecycle, @@ -247,7 +251,7 @@ public class EventRepository { select source_record.event_id, stage.event_source_id, stage.data_package_id, stage.external_source_event_id, - stage.driver_id, stage.vehicle_id, stage.vehicle_registration_id, + stage.driver_id, stage.driver_card_id, stage.vehicle_id, stage.vehicle_registration_id, stage.source_package_id, stage.source_package_entity_id, source_record.event_occurred_at, stage.received_partner_at, stage.received_hub_at, stage.event_domain, stage.event_type, stage.lifecycle, @@ -358,33 +362,39 @@ public class EventRepository { int eventSourceId, EventHubEventDto event, Map entityIdCache, + Map driverRefCache, Map> vehicleRefCache ) { - UUID driverId = resolveDriverId(tenantKey, eventSourceId, event, entityIdCache); + DriverIdentityRepository.ResolvedDriverReference driverRef = resolveDriverReference(tenantKey, eventSourceId, event, driverRefCache); ResolvedVehicleReference vehicleRef = resolveVehicleReference(tenantKey, eventSourceId, event, vehicleRefCache); UUID sourcePackageEntityId = resolveSourcePackageEntityId(tenantKey, eventSourceId, event, entityIdCache); - return new ResolvedEntityRefs(driverId, vehicleRef.vehicleId(), vehicleRef.vehicleRegistrationId(), sourcePackageEntityId); + return new ResolvedEntityRefs( + driverRef.driverId(), + driverRef.driverCardId(), + vehicleRef.vehicleId(), + vehicleRef.vehicleRegistrationId(), + sourcePackageEntityId + ); } - private UUID resolveDriverId( + private DriverIdentityRepository.ResolvedDriverReference resolveDriverReference( String tenantKey, int eventSourceId, EventHubEventDto event, - Map entityIdCache + Map driverRefCache ) { DriverRefDto driverRef = event.driverRef(); if (driverRef == null || !driverRef.hasAnyReference()) { - return null; + return DriverIdentityRepository.ResolvedDriverReference.empty(); } String cacheKey = "DRIVER|" + driverRef.stableKey(); - UUID cached = entityIdCache.get(cacheKey); + DriverIdentityRepository.ResolvedDriverReference cached = driverRefCache.get(cacheKey); if (cached != null) { return cached; } - UUID resolved = driverIdentityRepository.resolveOrCreateDriverId(tenantKey, eventSourceId, driverRef); - if (resolved != null) { - entityIdCache.put(cacheKey, resolved); - } + DriverIdentityRepository.ResolvedDriverReference resolved = + driverIdentityRepository.resolveOrCreateDriverReference(tenantKey, eventSourceId, driverRef); + driverRefCache.put(cacheKey, resolved); return resolved; } @@ -587,6 +597,7 @@ public class EventRepository { private record ResolvedEntityRefs( UUID driverId, + UUID driverCardId, UUID vehicleId, UUID vehicleRegistrationId, UUID sourcePackageEntityId @@ -606,6 +617,7 @@ public class EventRepository { int eventSourceId, String externalSourceEventId, UUID driverId, + UUID driverCardId, UUID vehicleId, UUID vehicleRegistrationId, String sourcePackageId, diff --git a/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java b/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java index ab38373..fc2e414 100644 --- a/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java @@ -5,8 +5,6 @@ import at.procon.eventhub.dto.VehicleRegistrationRefDto; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.time.OffsetDateTime; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import org.springframework.jdbc.core.JdbcTemplate; @@ -47,16 +45,22 @@ public class VehicleIdentityRepository { eventSourceId, sourceRegistrationEntityId, registrationNation, - registrationNumber, - occurredAt + registrationNumber ); if (registrationId == null && (sourceRegistrationEntityId != null || registrationNumber != null)) { registrationId = createRegistration( + registrationNation, + registrationNumber, + null, + Map.of("source", "event") + ); + } + if (registrationId != null && sourceRegistrationEntityId != null) { + upsertSourceVehicleRegistrationIdentity( normalizedTenantKey, eventSourceId, sourceRegistrationEntityId, - registrationNation, - registrationNumber, + registrationId, null, Map.of("source", "event") ); @@ -70,24 +74,25 @@ public class VehicleIdentityRepository { ); AssignedVehicleReference assignedVehicle = null; if (vehicleId == null && registrationId != null) { - assignedVehicle = resolveAssignedVehicleReference(registrationId, occurredAt); + assignedVehicle = resolveAssignedVehicleReference(normalizedTenantKey, eventSourceId, registrationId, occurredAt); vehicleId = assignedVehicle == null ? null : assignedVehicle.vehicleId(); } if (vehicleId == null && (sourceVehicleEntityId != null || vin != null)) { - vehicleId = createVehicle( + vehicleId = createVehicle(vin); + } + if (vehicleId != null && sourceVehicleEntityId != null) { + upsertSourceVehicleIdentity( normalizedTenantKey, eventSourceId, sourceVehicleEntityId, - vin + vehicleId, + null, + Map.of("source", "event") ); } + touchVehicle(vehicleId, vin); + touchRegistration(registrationId, registrationNation, registrationNumber); - if (vehicleId != null) { - touchVehicle(vehicleId, sourceVehicleEntityId, vin); - } - if (registrationId != null) { - touchRegistration(registrationId, sourceRegistrationEntityId, registrationNation, registrationNumber); - } return new ResolvedVehicleReferenceResolution( new ResolvedVehicleReference(vehicleId, registrationId), assignedVehicle != null, @@ -102,16 +107,14 @@ public class VehicleIdentityRepository { int updates = reconcileVehiclesFromMasterData(normalizedTenantKey, eventSourceId); updates += reconcileRegistrationsFromMasterData(normalizedTenantKey, eventSourceId); updates += projectVehicleRegistrationAssignments(normalizedTenantKey, eventSourceId); - return updates; } private int reconcileVehiclesFromMasterData(String tenantKey, int eventSourceId) { - Long count = jdbcTemplate.queryForObject( + int insertedVehicles = jdbcTemplate.update( compatibleSourcesCte() + """ , master_vehicles as ( select distinct on ( - event_source_id, nullif(trim(source_entity_id), ''), nullif(trim(source_external_key), '') ) @@ -126,72 +129,149 @@ public class VehicleIdentityRepository { nullif(trim(source_entity_id), '') is not null or nullif(trim(source_external_key), '') is not null ) - order by event_source_id, - nullif(trim(source_entity_id), ''), + order by nullif(trim(source_entity_id), ''), nullif(trim(source_external_key), ''), updated_at desc ), - updated_by_source as ( - update eventhub.vehicle vehicle - set source_vehicle_entity_id = coalesce(vehicle.source_vehicle_entity_id, master.source_vehicle_entity_id), - vin = coalesce(vehicle.vin, master.vin), - updated_at = now() + resolved_vehicles as ( + select master.event_source_id, + master.source_vehicle_entity_id, + master.vin, + coalesce(identity.vehicle_id, existing.id, gen_random_uuid()) as vehicle_id from master_vehicles master - where vehicle.tenant_key = ? - and vehicle.event_source_id in (select id from compatible_sources) - and master.source_vehicle_entity_id is not null - and vehicle.source_vehicle_entity_id = master.source_vehicle_entity_id - returning vehicle.id - ), - updated_by_vin as ( - update eventhub.vehicle vehicle - set source_vehicle_entity_id = coalesce(vehicle.source_vehicle_entity_id, master.source_vehicle_entity_id), - vin = coalesce(vehicle.vin, master.vin), - updated_at = now() - from master_vehicles master - where vehicle.tenant_key = ? - and vehicle.event_source_id in (select id from compatible_sources) - and master.vin is not null - and vehicle.vin = master.vin - returning vehicle.id - ), - inserted as ( - insert into eventhub.vehicle(id, tenant_key, event_source_id, source_vehicle_entity_id, vin, updated_at) - select gen_random_uuid(), ?, master.event_source_id, master.source_vehicle_entity_id, master.vin, now() - from master_vehicles master - where not exists ( - select 1 - from eventhub.vehicle existing - where existing.tenant_key = ? - and existing.event_source_id in (select id from compatible_sources) - and ( - (master.source_vehicle_entity_id is not null and existing.source_vehicle_entity_id = master.source_vehicle_entity_id) - or (master.vin is not null and existing.vin = master.vin) - ) - ) - returning id + left join eventhub.source_vehicle_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_vehicle_entity_id = master.source_vehicle_entity_id + left join eventhub.vehicle existing + on master.vin is not null + and existing.vin = master.vin + ) + insert into eventhub.vehicle(id, vin, updated_at) + select distinct on (resolved.vehicle_id) + resolved.vehicle_id, + resolved.vin, + now() + from resolved_vehicles resolved + where not exists ( + select 1 + from eventhub.vehicle existing + where existing.id = resolved.vehicle_id ) - select (select count(*) from updated_by_source) - + (select count(*) from updated_by_vin) - + (select count(*) from inserted) """, - Long.class, eventSourceId, tenantKey, + tenantKey + ); + + int updatedVehicles = jdbcTemplate.update( + compatibleSourcesCte() + """ + , master_vehicles as ( + select distinct on ( + nullif(trim(source_entity_id), ''), + nullif(trim(source_external_key), '') + ) + nullif(trim(source_entity_id), '') as source_vehicle_entity_id, + nullif(trim(source_external_key), '') as vin + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'VEHICLE' + and ( + nullif(trim(source_entity_id), '') is not null + or nullif(trim(source_external_key), '') is not null + ) + order by nullif(trim(source_entity_id), ''), + nullif(trim(source_external_key), ''), + updated_at desc + ) + update eventhub.vehicle vehicle + set vin = coalesce(vehicle.vin, master.vin), + updated_at = now() + from master_vehicles master + left join eventhub.source_vehicle_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_vehicle_entity_id = master.source_vehicle_entity_id + where vehicle.id = coalesce(identity.vehicle_id, ( + select existing.id + from eventhub.vehicle existing + where master.vin is not null + and existing.vin = master.vin + limit 1 + )) + """, + eventSourceId, tenantKey, + tenantKey + ); + + int linkedVehicles = jdbcTemplate.update( + compatibleSourcesCte() + """ + , master_vehicles as ( + select distinct on ( + nullif(trim(source_entity_id), ''), + nullif(trim(source_external_key), '') + ) + event_source_id, + nullif(trim(source_entity_id), '') as source_vehicle_entity_id, + nullif(trim(source_external_key), '') as vin + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'VEHICLE' + and nullif(trim(source_entity_id), '') is not null + order by nullif(trim(source_entity_id), ''), + nullif(trim(source_external_key), ''), + updated_at desc + ), + resolved_vehicles as ( + select master.event_source_id, + master.source_vehicle_entity_id, + coalesce(identity.vehicle_id, existing.id) as vehicle_id + from master_vehicles master + left join eventhub.source_vehicle_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_vehicle_entity_id = master.source_vehicle_entity_id + left join eventhub.vehicle existing + on master.vin is not null + and existing.vin = master.vin + ) + insert into eventhub.source_vehicle_identity( + id, tenant_key, event_source_id, source_vehicle_entity_id, + vehicle_id, source_updated_at, payload, updated_at + ) + select gen_random_uuid(), + ?, + resolved.event_source_id, + resolved.source_vehicle_entity_id, + resolved.vehicle_id, + null, + '{}'::jsonb, + now() + from resolved_vehicles resolved + where resolved.source_vehicle_entity_id is not null + and resolved.vehicle_id is not null + on conflict (tenant_key, event_source_id, source_vehicle_entity_id) + do update set + vehicle_id = excluded.vehicle_id, + updated_at = now() + """, + eventSourceId, tenantKey, tenantKey, tenantKey ); - return count == null ? 0 : Math.toIntExact(count); + + return insertedVehicles + updatedVehicles + linkedVehicles; } private int reconcileRegistrationsFromMasterData(String tenantKey, int eventSourceId) { - Long count = jdbcTemplate.queryForObject( + int insertedRegistrations = jdbcTemplate.update( compatibleSourcesCte() + """ , master_registrations as ( select distinct on ( - event_source_id, nullif(trim(source_entity_id), ''), coalesce( nullif(trim(payload ->> 'registration_nation'), ''), @@ -225,8 +305,7 @@ public class VehicleIdentityRepository { nullif(trim(payload ->> 'registration_number'), '') is not null or source_external_key like '%:%' ) - order by event_source_id, - nullif(trim(source_entity_id), ''), + order by nullif(trim(source_entity_id), ''), coalesce( nullif(trim(payload ->> 'registration_nation'), ''), nullif(split_part(source_external_key, ':', 1), '') @@ -237,80 +316,198 @@ public class VehicleIdentityRepository { ), updated_at desc ), - updated_by_source as ( - update eventhub.vehicle_registration registration - set source_registration_entity_id = coalesce(registration.source_registration_entity_id, master.source_registration_entity_id), - nation = coalesce(master.nation, registration.nation), - registration_number = coalesce(master.registration_number, registration.registration_number), - source_updated_at = master.source_updated_at, - updated_at = now() - from master_registrations master - where registration.tenant_key = ? - and registration.event_source_id in (select id from compatible_sources) - and master.source_registration_entity_id is not null - and registration.source_registration_entity_id = master.source_registration_entity_id - returning registration.id - ), - updated_by_plate as ( - update eventhub.vehicle_registration registration - set source_registration_entity_id = coalesce(registration.source_registration_entity_id, master.source_registration_entity_id), - nation = coalesce(master.nation, registration.nation), - registration_number = coalesce(master.registration_number, registration.registration_number), - source_updated_at = master.source_updated_at, - updated_at = now() - from master_registrations master - where registration.tenant_key = ? - and registration.event_source_id in (select id from compatible_sources) - and registration.nation = master.nation - and registration.registration_number = master.registration_number - returning registration.id - ), - inserted as ( - insert into eventhub.vehicle_registration( - id, tenant_key, event_source_id, source_registration_entity_id, nation, registration_number, - source_updated_at, payload, updated_at - ) - select gen_random_uuid(), - ?, - master.event_source_id, + resolved_registrations as ( + select master.event_source_id, master.source_registration_entity_id, master.nation, master.registration_number, master.source_updated_at, - jsonb_build_object('source', 'master-data'), - now() + coalesce(identity.vehicle_registration_id, existing.id, gen_random_uuid()) as registration_id from master_registrations master - where master.nation is not null - and master.registration_number is not null - and not exists ( - select 1 - from eventhub.vehicle_registration existing - where existing.tenant_key = ? - and existing.event_source_id in (select id from compatible_sources) - and ( - (master.source_registration_entity_id is not null - and existing.source_registration_entity_id = master.source_registration_entity_id) - or ( - existing.nation = master.nation - and existing.registration_number = master.registration_number - ) - ) - ) - returning id + left join eventhub.source_vehicle_registration_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_registration_entity_id = master.source_registration_entity_id + left join eventhub.vehicle_registration existing + on existing.nation = master.nation + and existing.registration_number = master.registration_number ) - select (select count(*) from updated_by_source) - + (select count(*) from updated_by_plate) - + (select count(*) from inserted) + insert into eventhub.vehicle_registration( + id, nation, registration_number, source_updated_at, payload, updated_at + ) + select distinct on (resolved.registration_id) + resolved.registration_id, + resolved.nation, + resolved.registration_number, + resolved.source_updated_at, + jsonb_build_object('source', 'master-data'), + now() + from resolved_registrations resolved + where resolved.nation is not null + and resolved.registration_number is not null + and not exists ( + select 1 + from eventhub.vehicle_registration existing + where existing.id = resolved.registration_id + ) """, - Long.class, eventSourceId, tenantKey, + tenantKey + ); + + int updatedRegistrations = jdbcTemplate.update( + compatibleSourcesCte() + """ + , master_registrations as ( + select distinct on ( + nullif(trim(source_entity_id), ''), + coalesce( + nullif(trim(payload ->> 'registration_nation'), ''), + nullif(split_part(source_external_key, ':', 1), '') + ), + coalesce( + nullif(trim(payload ->> 'registration_number'), ''), + nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + ) + ) + nullif(trim(source_entity_id), '') as source_registration_entity_id, + coalesce( + nullif(trim(payload ->> 'registration_nation'), ''), + nullif(split_part(source_external_key, ':', 1), '') + ) as nation, + coalesce( + nullif(trim(payload ->> 'registration_number'), ''), + nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + ) as registration_number, + source_updated_at + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'VEHICLE_REGISTRATION' + and ( + nullif(trim(payload ->> 'registration_nation'), '') is not null + or source_external_key like '%:%' + ) + and ( + nullif(trim(payload ->> 'registration_number'), '') is not null + or source_external_key like '%:%' + ) + order by nullif(trim(source_entity_id), ''), + coalesce( + nullif(trim(payload ->> 'registration_nation'), ''), + nullif(split_part(source_external_key, ':', 1), '') + ), + coalesce( + nullif(trim(payload ->> 'registration_number'), ''), + nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + ), + updated_at desc + ) + update eventhub.vehicle_registration registration + set source_updated_at = coalesce(master.source_updated_at, registration.source_updated_at), + payload = registration.payload || jsonb_build_object('source', 'master-data'), + updated_at = now() + from master_registrations master + left join eventhub.source_vehicle_registration_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_registration_entity_id = master.source_registration_entity_id + where registration.id = coalesce(identity.vehicle_registration_id, ( + select existing.id + from eventhub.vehicle_registration existing + where existing.nation = master.nation + and existing.registration_number = master.registration_number + limit 1 + )) + """, + eventSourceId, tenantKey, + tenantKey + ); + + int linkedRegistrations = jdbcTemplate.update( + compatibleSourcesCte() + """ + , master_registrations as ( + select distinct on ( + nullif(trim(source_entity_id), ''), + coalesce( + nullif(trim(payload ->> 'registration_nation'), ''), + nullif(split_part(source_external_key, ':', 1), '') + ), + coalesce( + nullif(trim(payload ->> 'registration_number'), ''), + nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + ) + ) + event_source_id, + nullif(trim(source_entity_id), '') as source_registration_entity_id, + coalesce( + nullif(trim(payload ->> 'registration_nation'), ''), + nullif(split_part(source_external_key, ':', 1), '') + ) as nation, + coalesce( + nullif(trim(payload ->> 'registration_number'), ''), + nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + ) as registration_number, + source_updated_at + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'VEHICLE_REGISTRATION' + and nullif(trim(source_entity_id), '') is not null + order by nullif(trim(source_entity_id), ''), + coalesce( + nullif(trim(payload ->> 'registration_nation'), ''), + nullif(split_part(source_external_key, ':', 1), '') + ), + coalesce( + nullif(trim(payload ->> 'registration_number'), ''), + nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + ), + updated_at desc + ), + resolved_registrations as ( + select master.event_source_id, + master.source_registration_entity_id, + master.source_updated_at, + coalesce(identity.vehicle_registration_id, existing.id) as registration_id + from master_registrations master + left join eventhub.source_vehicle_registration_identity identity + on identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_registration_entity_id = master.source_registration_entity_id + left join eventhub.vehicle_registration existing + on existing.nation = master.nation + and existing.registration_number = master.registration_number + ) + insert into eventhub.source_vehicle_registration_identity( + id, tenant_key, event_source_id, source_registration_entity_id, + vehicle_registration_id, source_updated_at, payload, updated_at + ) + select gen_random_uuid(), + ?, + resolved.event_source_id, + resolved.source_registration_entity_id, + resolved.registration_id, + resolved.source_updated_at, + '{}'::jsonb, + now() + from resolved_registrations resolved + where resolved.source_registration_entity_id is not null + and resolved.registration_id is not null + on conflict (tenant_key, event_source_id, source_registration_entity_id) + do update set + vehicle_registration_id = excluded.vehicle_registration_id, + source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_vehicle_registration_identity.source_updated_at), + updated_at = now() + """, + eventSourceId, tenantKey, tenantKey, tenantKey ); - return count == null ? 0 : Math.toIntExact(count); + + return insertedRegistrations + updatedRegistrations + linkedRegistrations; } private UUID resolveVehicleId( @@ -321,7 +518,7 @@ public class VehicleIdentityRepository { ) { UUID vehicleId = findVehicleBySourceVehicleEntityId(tenantKey, eventSourceId, sourceVehicleEntityId); if (vehicleId == null) { - vehicleId = findVehicleByVin(tenantKey, eventSourceId, vin); + vehicleId = findVehicleByVin(vin); } return vehicleId; } @@ -331,12 +528,11 @@ public class VehicleIdentityRepository { int eventSourceId, String sourceRegistrationEntityId, String nation, - String registrationNumber, - OffsetDateTime occurredAt + String registrationNumber ) { - UUID registrationId = findRegistrationBySourceRegistrationEntityId(tenantKey, eventSourceId, sourceRegistrationEntityId, occurredAt); + UUID registrationId = findRegistrationBySourceRegistrationEntityId(tenantKey, eventSourceId, sourceRegistrationEntityId); if (registrationId == null) { - registrationId = findRegistrationByPlate(tenantKey, eventSourceId, nation, registrationNumber, occurredAt); + registrationId = findRegistrationByPlate(nation, registrationNumber); } return registrationId; } @@ -347,38 +543,34 @@ public class VehicleIdentityRepository { } return jdbcTemplate.query( compatibleSourcesCte() + """ - select v.id - from eventhub.vehicle v - where v.tenant_key = ? - and v.event_source_id in (select id from compatible_sources) - and v.source_vehicle_entity_id = ? - order by v.updated_at desc + select identity.vehicle_id + from eventhub.source_vehicle_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_vehicle_entity_id = ? + order by identity.updated_at desc limit 1 """, - rs -> rs.next() ? (UUID) rs.getObject("id") : null, + rs -> rs.next() ? (UUID) rs.getObject("vehicle_id") : null, eventSourceId, tenantKey, sourceVehicleEntityId ); } - private UUID findVehicleByVin(String tenantKey, int eventSourceId, String vin) { + private UUID findVehicleByVin(String vin) { if (vin == null) { return null; } return jdbcTemplate.query( - compatibleSourcesCte() + """ + """ select v.id from eventhub.vehicle v - where v.tenant_key = ? - and v.event_source_id in (select id from compatible_sources) - and v.vin = ? + where v.vin = ? order by v.updated_at desc limit 1 """, rs -> rs.next() ? (UUID) rs.getObject("id") : null, - eventSourceId, - tenantKey, vin ); } @@ -386,64 +578,60 @@ public class VehicleIdentityRepository { private UUID findRegistrationBySourceRegistrationEntityId( String tenantKey, int eventSourceId, - String sourceRegistrationEntityId, - OffsetDateTime occurredAt + String sourceRegistrationEntityId ) { if (sourceRegistrationEntityId == null) { return null; } return jdbcTemplate.query( compatibleSourcesCte() + """ - select r.id - from eventhub.vehicle_registration r - where r.tenant_key = ? - and r.event_source_id in (select id from compatible_sources) - and r.source_registration_entity_id = ? - order by r.updated_at desc + select identity.vehicle_registration_id + from eventhub.source_vehicle_registration_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_registration_entity_id = ? + order by identity.updated_at desc limit 1 """, - rs -> rs.next() ? (UUID) rs.getObject("id") : null, + rs -> rs.next() ? (UUID) rs.getObject("vehicle_registration_id") : null, eventSourceId, tenantKey, sourceRegistrationEntityId ); } - private UUID findRegistrationByPlate( - String tenantKey, - int eventSourceId, - String nation, - String registrationNumber, - OffsetDateTime occurredAt - ) { + private UUID findRegistrationByPlate(String nation, String registrationNumber) { if (nation == null || registrationNumber == null) { return null; } return jdbcTemplate.query( - compatibleSourcesCte() + """ + """ select r.id from eventhub.vehicle_registration r - where r.tenant_key = ? - and r.event_source_id in (select id from compatible_sources) - and r.nation = ? + where r.nation = ? and r.registration_number = ? order by r.updated_at desc limit 1 """, rs -> rs.next() ? (UUID) rs.getObject("id") : null, - eventSourceId, - tenantKey, nation, registrationNumber ); } - private AssignedVehicleReference resolveAssignedVehicleReference(UUID registrationId, OffsetDateTime occurredAt) { + private AssignedVehicleReference resolveAssignedVehicleReference( + String tenantKey, + int eventSourceId, + UUID registrationId, + OffsetDateTime occurredAt + ) { return jdbcTemplate.query( - """ + compatibleSourcesCte() + """ select vehicle_id, valid_from, valid_to from eventhub.vehicle_registration_assignment - where vehicle_registration_id = ? + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and vehicle_registration_id = ? and (? is null or valid_from is null or valid_from <= ?) and (? is null or valid_to is null or ? < valid_to) order by valid_from desc nulls last, updated_at desc @@ -456,6 +644,8 @@ public class VehicleIdentityRepository { rs.getObject("valid_to", OffsetDateTime.class) ) : null, + eventSourceId, + tenantKey, registrationId, occurredAt, occurredAt, @@ -464,31 +654,20 @@ public class VehicleIdentityRepository { ); } - private UUID createVehicle( - String tenantKey, - int eventSourceId, - String sourceVehicleEntityId, - String vin - ) { + private UUID createVehicle(String vin) { UUID vehicleId = UUID.randomUUID(); jdbcTemplate.update( """ - insert into eventhub.vehicle(id, tenant_key, event_source_id, source_vehicle_entity_id, vin, updated_at) - values (?, ?, ?, ?, ?, now()) + insert into eventhub.vehicle(id, vin, updated_at) + values (?, ?, now()) """, vehicleId, - tenantKey, - eventSourceId, - sourceVehicleEntityId, vin ); return vehicleId; } private UUID createRegistration( - String tenantKey, - int eventSourceId, - String sourceRegistrationEntityId, String nation, String registrationNumber, OffsetDateTime sourceUpdatedAt, @@ -501,14 +680,10 @@ public class VehicleIdentityRepository { jdbcTemplate.update( """ insert into eventhub.vehicle_registration( - id, tenant_key, event_source_id, source_registration_entity_id, nation, registration_number, - source_updated_at, payload, updated_at - ) values (?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) + id, nation, registration_number, source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?::jsonb, now()) """, registrationId, - tenantKey, - eventSourceId, - sourceRegistrationEntityId, nation, registrationNumber, sourceUpdatedAt, @@ -517,27 +692,88 @@ public class VehicleIdentityRepository { return registrationId; } - private void touchVehicle(UUID vehicleId, String sourceVehicleEntityId, String vin) { - if (sourceVehicleEntityId == null && vin == null) { + private void upsertSourceVehicleIdentity( + String tenantKey, + int eventSourceId, + String sourceVehicleEntityId, + UUID vehicleId, + OffsetDateTime sourceUpdatedAt, + Map payload + ) { + if (sourceVehicleEntityId == null || vehicleId == null) { + return; + } + jdbcTemplate.update( + """ + insert into eventhub.source_vehicle_identity( + id, tenant_key, event_source_id, source_vehicle_entity_id, + vehicle_id, source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?, ?, ?::jsonb, now()) + on conflict (tenant_key, event_source_id, source_vehicle_entity_id) + do update set + vehicle_id = excluded.vehicle_id, + source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_vehicle_identity.source_updated_at), + payload = eventhub.source_vehicle_identity.payload || excluded.payload, + updated_at = now() + """, + UUID.randomUUID(), + tenantKey, + eventSourceId, + sourceVehicleEntityId, + vehicleId, + sourceUpdatedAt, + toJson(payload) + ); + } + + private void upsertSourceVehicleRegistrationIdentity( + String tenantKey, + int eventSourceId, + String sourceRegistrationEntityId, + UUID registrationId, + OffsetDateTime sourceUpdatedAt, + Map payload + ) { + if (sourceRegistrationEntityId == null || registrationId == null) { + return; + } + jdbcTemplate.update( + """ + insert into eventhub.source_vehicle_registration_identity( + id, tenant_key, event_source_id, source_registration_entity_id, + vehicle_registration_id, source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?, ?, ?::jsonb, now()) + on conflict (tenant_key, event_source_id, source_registration_entity_id) + do update set + vehicle_registration_id = excluded.vehicle_registration_id, + source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_vehicle_registration_identity.source_updated_at), + payload = eventhub.source_vehicle_registration_identity.payload || excluded.payload, + updated_at = now() + """, + UUID.randomUUID(), + tenantKey, + eventSourceId, + sourceRegistrationEntityId, + registrationId, + sourceUpdatedAt, + toJson(payload) + ); + } + + private void touchVehicle(UUID vehicleId, String vin) { + if (vehicleId == null || vin == null) { return; } jdbcTemplate.update( """ update eventhub.vehicle - set source_vehicle_entity_id = coalesce(source_vehicle_entity_id, cast(? as text)), - vin = coalesce(vin, cast(? as text)), + set vin = coalesce(vin, cast(? as text)), updated_at = now() where id = ? - and ( - (source_vehicle_entity_id is null and cast(? as text) is not null) - or (vin is null and cast(? as text) is not null) - ) + and vin is null """, - sourceVehicleEntityId, vin, - vehicleId, - sourceVehicleEntityId, - vin + vehicleId ); } @@ -551,8 +787,8 @@ public class VehicleIdentityRepository { select gen_random_uuid(), relation.tenant_key, relation.event_source_id, - registration.id, - vehicle.id, + registration_identity.vehicle_registration_id, + vehicle_identity.vehicle_id, relation.valid_from, relation.valid_to, relation.source_updated_at, @@ -563,12 +799,14 @@ public class VehicleIdentityRepository { ), now() from eventhub.source_master_relation relation - join eventhub.vehicle_registration registration on registration.tenant_key = relation.tenant_key - and registration.event_source_id = relation.event_source_id - and registration.source_registration_entity_id = relation.from_source_entity_id - join eventhub.vehicle vehicle on vehicle.tenant_key = relation.tenant_key - and vehicle.event_source_id = relation.event_source_id - and vehicle.source_vehicle_entity_id = relation.to_source_entity_id + join eventhub.source_vehicle_registration_identity registration_identity + on registration_identity.tenant_key = relation.tenant_key + and registration_identity.event_source_id = relation.event_source_id + and registration_identity.source_registration_entity_id = relation.from_source_entity_id + join eventhub.source_vehicle_identity vehicle_identity + on vehicle_identity.tenant_key = relation.tenant_key + and vehicle_identity.event_source_id = relation.event_source_id + and vehicle_identity.source_vehicle_entity_id = relation.to_source_entity_id where relation.tenant_key = ? and relation.event_source_id in (select id from compatible_sources) and relation.relation_type = 'VEHICLE_REGISTRATION_VEHICLE' @@ -577,8 +815,10 @@ public class VehicleIdentityRepository { and not exists ( select 1 from eventhub.vehicle_registration_assignment existing - where existing.vehicle_registration_id = registration.id - and existing.vehicle_id = vehicle.id + where existing.tenant_key = relation.tenant_key + and existing.event_source_id = relation.event_source_id + and existing.vehicle_registration_id = registration_identity.vehicle_registration_id + and existing.vehicle_id = vehicle_identity.vehicle_id and existing.valid_from is not distinct from relation.valid_from and existing.valid_to is not distinct from relation.valid_to ) @@ -588,59 +828,30 @@ public class VehicleIdentityRepository { ); } - private void touchRegistration(UUID registrationId, String sourceRegistrationEntityId, String nation, String registrationNumber) { - if (sourceRegistrationEntityId == null && nation == null && registrationNumber == null) { + private void touchRegistration(UUID registrationId, String nation, String registrationNumber) { + if (registrationId == null || (nation == null && registrationNumber == null)) { return; } jdbcTemplate.update( """ update eventhub.vehicle_registration - set source_registration_entity_id = coalesce(source_registration_entity_id, cast(? as text)), - nation = coalesce(cast(? as text), nation), + set nation = coalesce(cast(? as text), nation), registration_number = coalesce(cast(? as text), registration_number), updated_at = now() where id = ? and ( - (source_registration_entity_id is null and cast(? as text) is not null) - or (nation is null and cast(? as text) is not null) + (nation is null and cast(? as text) is not null) or (registration_number is null and cast(? as text) is not null) ) """, - sourceRegistrationEntityId, nation, registrationNumber, registrationId, - sourceRegistrationEntityId, nation, registrationNumber ); } - private void updateRegistrationFromMasterData( - UUID registrationId, - String sourceRegistrationEntityId, - String nation, - String registrationNumber, - OffsetDateTime sourceUpdatedAt - ) { - jdbcTemplate.update( - """ - update eventhub.vehicle_registration - set source_registration_entity_id = coalesce(source_registration_entity_id, ?), - nation = coalesce(?, nation), - registration_number = coalesce(?, registration_number), - source_updated_at = ?, - updated_at = now() - where id = ? - """, - sourceRegistrationEntityId, - nation, - registrationNumber, - sourceUpdatedAt, - registrationId - ); - } - private String compatibleSourcesCte() { return """ with source_context as ( @@ -706,5 +917,4 @@ public class VehicleIdentityRepository { OffsetDateTime validTo ) { } - } diff --git a/src/main/resources/db/eventhub_schema_create.sql b/src/main/resources/db/eventhub_schema_create.sql index 0ab1530..adeb274 100644 --- a/src/main/resources/db/eventhub_schema_create.sql +++ b/src/main/resources/db/eventhub_schema_create.sql @@ -145,11 +145,56 @@ create table if not exists eventhub.source_master_relation ( constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to) ); -create table if not exists eventhub.vehicle ( +create table if not exists eventhub.driver ( + id uuid primary key, + first_names text, + last_name text, + birth_date date, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create table if not exists eventhub.driver_card ( + id uuid primary key, + driver_id uuid references eventhub.driver(id), + nation text not null, + card_number text not null, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create table if not exists eventhub.source_driver_identity ( id uuid primary key, tenant_key text not null, event_source_id integer not null references eventhub.event_source(id), - source_vehicle_entity_id text, + source_driver_entity_id text not null, + driver_id uuid not null references eventhub.driver(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_driver_identity unique (tenant_key, event_source_id, source_driver_entity_id) +); + +create table if not exists eventhub.source_driver_card_identity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_driver_card_entity_id text not null, + driver_card_id uuid not null references eventhub.driver_card(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_driver_card_identity unique (tenant_key, event_source_id, source_driver_card_entity_id) +); + +create table if not exists eventhub.vehicle ( + id uuid primary key, vin text, created_at timestamptz not null default now(), updated_at timestamptz not null default now() @@ -157,9 +202,6 @@ create table if not exists eventhub.vehicle ( create table if not exists eventhub.vehicle_registration ( id uuid primary key, - tenant_key text not null, - event_source_id integer not null references eventhub.event_source(id), - source_registration_entity_id text, nation text not null, registration_number text not null, source_updated_at timestamptz, @@ -168,6 +210,32 @@ create table if not exists eventhub.vehicle_registration ( updated_at timestamptz not null default now() ); +create table if not exists eventhub.source_vehicle_identity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_vehicle_entity_id text not null, + vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_vehicle_identity unique (tenant_key, event_source_id, source_vehicle_entity_id) +); + +create table if not exists eventhub.source_vehicle_registration_identity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_registration_entity_id text not null, + vehicle_registration_id uuid not null references eventhub.vehicle_registration(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_vehicle_registration_identity unique (tenant_key, event_source_id, source_registration_entity_id) +); + create table if not exists eventhub.vehicle_registration_assignment ( id uuid primary key, tenant_key text not null, @@ -188,7 +256,8 @@ create table if not exists eventhub.event ( event_source_id integer not null references eventhub.event_source(id), data_package_id uuid not null references eventhub.data_package(id), external_source_event_id text not null, - driver_entity_id uuid references eventhub.source_master_entity(id), + driver_id uuid references eventhub.driver(id), + driver_card_id uuid references eventhub.driver_card(id), vehicle_id uuid references eventhub.vehicle(id), vehicle_registration_id uuid references eventhub.vehicle_registration(id), source_package_id text, @@ -208,7 +277,8 @@ create table if not exists eventhub.event ( created_at timestamptz not null default now(), constraint pk_event primary key (occurred_at, id), constraint chk_event_driver_or_vehicle_ref check ( - driver_entity_id is not null + driver_id is not null + or driver_card_id is not null or vehicle_id is not null or vehicle_registration_id is not null ) @@ -276,23 +346,12 @@ create index if not exists idx_source_master_relation_to create index if not exists idx_source_master_relation_payload_gin on eventhub.source_master_relation using gin(payload); -create index if not exists idx_vehicle_lookup_ctx - on eventhub.vehicle(tenant_key, event_source_id, updated_at desc); - -create index if not exists idx_vehicle_source_entity - on eventhub.vehicle(tenant_key, event_source_id, source_vehicle_entity_id) - where source_vehicle_entity_id is not null; - create index if not exists idx_vehicle_vin - on eventhub.vehicle(tenant_key, event_source_id, vin) + on eventhub.vehicle(vin) where vin is not null; -create index if not exists idx_vehicle_registration_source_entity - on eventhub.vehicle_registration(tenant_key, event_source_id, source_registration_entity_id) - where source_registration_entity_id is not null; - create index if not exists idx_vehicle_registration_plate - on eventhub.vehicle_registration(tenant_key, event_source_id, nation, registration_number); + on eventhub.vehicle_registration(nation, registration_number); create index if not exists idx_vehicle_registration_assignment_registration_time on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to); @@ -320,9 +379,42 @@ create index if not exists idx_event_source_package_id create index if not exists idx_event_domain_type_time on eventhub.event(event_domain, event_type, occurred_at desc); +create index if not exists idx_driver_card_key + on eventhub.driver_card(nation, card_number); + +create index if not exists idx_driver_card_driver + on eventhub.driver_card(driver_id) + where driver_id is not null; + +create unique index if not exists ux_driver_card_key + on eventhub.driver_card(nation, card_number); + +create unique index if not exists ux_vehicle_vin + on eventhub.vehicle(vin) + where vin is not null; + +create unique index if not exists ux_vehicle_registration_plate + on eventhub.vehicle_registration(nation, registration_number); + +create index if not exists idx_source_driver_identity_driver + on eventhub.source_driver_identity(driver_id); + +create index if not exists idx_source_driver_card_identity_card + on eventhub.source_driver_card_identity(driver_card_id); + +create index if not exists idx_source_vehicle_identity_vehicle + on eventhub.source_vehicle_identity(vehicle_id); + +create index if not exists idx_source_vehicle_registration_identity_registration + on eventhub.source_vehicle_registration_identity(vehicle_registration_id); + create index if not exists idx_event_driver_time - on eventhub.event(driver_entity_id, occurred_at desc) - where driver_entity_id is not null; + on eventhub.event(driver_id, occurred_at desc) + where driver_id is not null; + +create index if not exists idx_event_driver_card_time + on eventhub.event(driver_card_id, occurred_at desc) + where driver_card_id is not null; create index if not exists idx_event_vehicle_time on eventhub.event(vehicle_id, occurred_at desc) @@ -344,3 +436,19 @@ create index if not exists idx_event_detail_type create index if not exists idx_event_detail_attributes_gin on eventhub.event_detail using gin(attributes); + +create index if not exists idx_event_detail_yellowfox_slot + on eventhub.event_detail(detail_type, (attributes ->> 'slot'), event_occurred_at) + where detail_type in ('DRIVER_ACTIVITY', 'DRIVER_CARD'); + +create index if not exists idx_event_detail_yellowfox_eventtype_state + on eventhub.event_detail( + (attributes ->> 'yellowFoxEventType'), + (attributes ->> 'yellowFoxState'), + event_occurred_at + ) + where attributes ? 'yellowFoxEventType'; + +create index if not exists idx_event_detail_yellowfox_ignition + on eventhub.event_detail(detail_type, (attributes ->> 'ignitionState'), event_occurred_at) + where attributes ? 'ignitionState'; diff --git a/src/main/resources/db/migration/V10__introduce_driver_card_model.sql b/src/main/resources/db/migration/V10__introduce_driver_card_model.sql new file mode 100644 index 0000000..e9fb6aa --- /dev/null +++ b/src/main/resources/db/migration/V10__introduce_driver_card_model.sql @@ -0,0 +1,363 @@ +create table if not exists eventhub.driver_card ( + id uuid primary key, + driver_id uuid references eventhub.driver(id), + nation text not null, + card_number text not null, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create table if not exists eventhub.source_driver_identity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_driver_entity_id text not null, + driver_id uuid not null references eventhub.driver(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_driver_identity unique (tenant_key, event_source_id, source_driver_entity_id) +); + +create table if not exists eventhub.source_driver_card_identity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_driver_card_entity_id text not null, + driver_card_id uuid not null references eventhub.driver_card(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_driver_card_identity unique (tenant_key, event_source_id, source_driver_card_entity_id) +); + +create table if not exists eventhub.source_vehicle_identity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_vehicle_entity_id text not null, + vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_vehicle_identity unique (tenant_key, event_source_id, source_vehicle_entity_id) +); + +create table if not exists eventhub.source_vehicle_registration_identity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_registration_entity_id text not null, + vehicle_registration_id uuid not null references eventhub.vehicle_registration(id) on delete cascade, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_vehicle_registration_identity unique (tenant_key, event_source_id, source_registration_entity_id) +); + +alter table eventhub.event + add column if not exists driver_card_id uuid references eventhub.driver_card(id); + +insert into eventhub.source_driver_identity( + id, tenant_key, event_source_id, source_driver_entity_id, + driver_id, source_updated_at, payload, created_at, updated_at +) +select gen_random_uuid(), + driver.tenant_key, + driver.event_source_id, + driver.source_driver_entity_id, + driver.id, + driver.source_updated_at, + driver.payload, + coalesce(driver.created_at, now()), + coalesce(driver.updated_at, now()) +from eventhub.driver driver +where exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'driver' + and column_name = 'source_driver_entity_id' +) + and driver.source_driver_entity_id is not null +on conflict (tenant_key, event_source_id, source_driver_entity_id) +do update set + driver_id = excluded.driver_id, + source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_identity.source_updated_at), + payload = eventhub.source_driver_identity.payload || excluded.payload, + updated_at = now(); + +with legacy_driver_cards as ( + select distinct on (driver.card_nation, driver.card_number) + driver.id as driver_id, + driver.card_nation as nation, + driver.card_number, + driver.source_updated_at, + driver.payload, + driver.created_at, + driver.updated_at + from eventhub.driver driver + where exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'driver' + and column_name = 'card_nation' + ) + and driver.card_nation is not null + and driver.card_number is not null + order by driver.card_nation, + driver.card_number, + case when driver.source_driver_entity_id is null then 1 else 0 end, + driver.updated_at desc, + driver.created_at desc, + driver.id +), +existing_driver_cards as ( + select distinct on (card.nation, card.card_number) + card.id, + card.nation, + card.card_number + from eventhub.driver_card card + order by card.nation, + card.card_number, + case when card.driver_id is null then 1 else 0 end, + card.updated_at desc, + card.created_at desc, + card.id +), +resolved_cards as ( + select legacy.*, + coalesce(existing.id, gen_random_uuid()) as driver_card_id + from legacy_driver_cards legacy + left join existing_driver_cards existing + on existing.nation = legacy.nation + and existing.card_number = legacy.card_number +), +inserted_cards as ( + insert into eventhub.driver_card( + id, driver_id, nation, card_number, source_updated_at, payload, created_at, updated_at + ) + select distinct on (resolved.driver_card_id) + resolved.driver_card_id, + resolved.driver_id, + resolved.nation, + resolved.card_number, + resolved.source_updated_at, + jsonb_build_object('migrated_from', 'eventhub.driver') || coalesce(resolved.payload, '{}'::jsonb), + resolved.created_at, + resolved.updated_at + from resolved_cards resolved + where not exists ( + select 1 + from eventhub.driver_card existing + where existing.id = resolved.driver_card_id + ) + returning id +), +updated_cards as ( + update eventhub.driver_card card + set driver_id = coalesce(card.driver_id, resolved.driver_id), + source_updated_at = coalesce(resolved.source_updated_at, card.source_updated_at), + payload = card.payload || jsonb_build_object('migrated_from', 'eventhub.driver') || coalesce(resolved.payload, '{}'::jsonb), + updated_at = now() + from resolved_cards resolved + where card.id = resolved.driver_card_id + returning card.id +) +select count(*) from inserted_cards +union all +select count(*) from updated_cards; + +with single_card_per_driver as ( + select driver_id, + min(id::text)::uuid as driver_card_id + from eventhub.driver_card + where driver_id is not null + group by driver_id + having count(*) = 1 +) +update eventhub.event event +set driver_card_id = card.driver_card_id +from single_card_per_driver card +where event.driver_id = card.driver_id + and event.driver_card_id is null; + +with ranked_driver_cards as ( + select card.id, + card.driver_id, + card.source_updated_at, + first_value(card.id) over ( + partition by card.nation, card.card_number + order by case when card.driver_id is null then 1 else 0 end, + card.updated_at desc, + card.created_at desc, + card.id + ) as canonical_id, + row_number() over ( + partition by card.nation, card.card_number + order by case when card.driver_id is null then 1 else 0 end, + card.updated_at desc, + card.created_at desc, + card.id + ) as duplicate_rank + from eventhub.driver_card card +), +duplicate_driver_cards as ( + select id, canonical_id, driver_id, source_updated_at + from ranked_driver_cards + where duplicate_rank > 1 +), +duplicate_card_rollup as ( + select duplicate.canonical_id, + (array_agg(duplicate.driver_id order by case when duplicate.driver_id is null then 1 else 0 end, duplicate.id))[1] as preferred_driver_id, + max(duplicate.source_updated_at) as latest_source_updated_at + from duplicate_driver_cards duplicate + group by duplicate.canonical_id +), +updated_canonical_cards as ( + update eventhub.driver_card card + set driver_id = coalesce(card.driver_id, rollup.preferred_driver_id), + source_updated_at = case + when card.source_updated_at is null then rollup.latest_source_updated_at + when rollup.latest_source_updated_at is null then card.source_updated_at + else greatest(card.source_updated_at, rollup.latest_source_updated_at) + end, + updated_at = now() + from duplicate_card_rollup rollup + where card.id = rollup.canonical_id + returning card.id +), +relinked_source_driver_card_identity as ( + update eventhub.source_driver_card_identity identity + set driver_card_id = duplicate.canonical_id, + updated_at = now() + from duplicate_driver_cards duplicate + where identity.driver_card_id = duplicate.id + returning identity.id +), +relinked_events as ( + update eventhub.event event + set driver_card_id = duplicate.canonical_id + from duplicate_driver_cards duplicate + where event.driver_card_id = duplicate.id + returning event.id +), +deleted_duplicate_driver_cards as ( + delete from eventhub.driver_card card + using duplicate_driver_cards duplicate + where card.id = duplicate.id + returning card.id +) +select (select count(*) from updated_canonical_cards) as updated_canonical_cards, + (select count(*) from relinked_source_driver_card_identity) as relinked_source_driver_card_identity, + (select count(*) from relinked_events) as relinked_events, + (select count(*) from deleted_duplicate_driver_cards) as deleted_duplicate_driver_cards; + +insert into eventhub.source_vehicle_identity( + id, tenant_key, event_source_id, source_vehicle_entity_id, + vehicle_id, source_updated_at, payload, created_at, updated_at +) +select gen_random_uuid(), + vehicle.tenant_key, + vehicle.event_source_id, + vehicle.source_vehicle_entity_id, + vehicle.id, + null, + '{}'::jsonb, + coalesce(vehicle.created_at, now()), + coalesce(vehicle.updated_at, now()) +from eventhub.vehicle vehicle +where exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'vehicle' + and column_name = 'source_vehicle_entity_id' +) + and vehicle.source_vehicle_entity_id is not null +on conflict (tenant_key, event_source_id, source_vehicle_entity_id) +do update set + vehicle_id = excluded.vehicle_id, + updated_at = now(); + +insert into eventhub.source_vehicle_registration_identity( + id, tenant_key, event_source_id, source_registration_entity_id, + vehicle_registration_id, source_updated_at, payload, created_at, updated_at +) +select gen_random_uuid(), + registration.tenant_key, + registration.event_source_id, + registration.source_registration_entity_id, + registration.id, + registration.source_updated_at, + registration.payload, + coalesce(registration.created_at, now()), + coalesce(registration.updated_at, now()) +from eventhub.vehicle_registration registration +where exists ( + select 1 + from information_schema.columns + where table_schema = 'eventhub' + and table_name = 'vehicle_registration' + and column_name = 'source_registration_entity_id' +) + and registration.source_registration_entity_id is not null +on conflict (tenant_key, event_source_id, source_registration_entity_id) +do update set + vehicle_registration_id = excluded.vehicle_registration_id, + source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_vehicle_registration_identity.source_updated_at), + payload = eventhub.source_vehicle_registration_identity.payload || excluded.payload, + updated_at = now(); + +create index if not exists idx_driver_card_key + on eventhub.driver_card(nation, card_number); + +create index if not exists idx_driver_card_driver + on eventhub.driver_card(driver_id) + where driver_id is not null; + +create unique index if not exists ux_driver_card_key + on eventhub.driver_card(nation, card_number); + +create unique index if not exists ux_vehicle_vin + on eventhub.vehicle(vin) + where vin is not null; + +create unique index if not exists ux_vehicle_registration_plate + on eventhub.vehicle_registration(nation, registration_number); + +create index if not exists idx_source_driver_identity_driver + on eventhub.source_driver_identity(driver_id); + +create index if not exists idx_source_driver_card_identity_card + on eventhub.source_driver_card_identity(driver_card_id); + +create index if not exists idx_source_vehicle_identity_vehicle + on eventhub.source_vehicle_identity(vehicle_id); + +create index if not exists idx_source_vehicle_registration_identity_registration + on eventhub.source_vehicle_registration_identity(vehicle_registration_id); + +create index if not exists idx_event_driver_card_time + on eventhub.event(driver_card_id, occurred_at desc) + where driver_card_id is not null; + +alter table eventhub.event + drop constraint if exists chk_event_driver_or_vehicle_ref; + +alter table eventhub.event + add constraint chk_event_driver_or_vehicle_ref + check ( + driver_id is not null + or driver_card_id is not null + or driver_entity_id is not null + or vehicle_id is not null + or vehicle_registration_id is not null + ); diff --git a/src/main/resources/sql/maintenance/repair-tachograph-driver-entities.sql b/src/main/resources/sql/maintenance/repair-tachograph-driver-entities.sql index 6eadb75..3cc1214 100644 --- a/src/main/resources/sql/maintenance/repair-tachograph-driver-entities.sql +++ b/src/main/resources/sql/maintenance/repair-tachograph-driver-entities.sql @@ -1,19 +1,22 @@ /* - * Repairs and normalizes tachograph driver aggregates after introducing eventhub.driver. + * Repairs and normalizes tachograph driver identities after introducing: + * - global eventhub.driver + * - global eventhub.driver_card + * - source_driver_identity + * - source_driver_card_identity * * What it does: - * 1. Ensures tachograph DRIVER master-data payload carries last_name while keeping source_master_entity.display_name unchanged. - * 2. Upserts eventhub.driver rows from MASTER_DATA DRIVER entities. - * 3. Projects card nation/number onto eventhub.driver from DRIVER_CARD_DRIVER relations. - * 4. Remaps event.driver_id from provisional card-only drivers to proper source-driver aggregates when possible. - * 5. Deletes now-unreferenced provisional tachograph driver rows with no source_driver_entity_id. - * - * Assumptions: - * - Tachograph master-data source is provider_key=TACHOGRAPH, source_kind=MASTER_DATA, source_key=TACHOGRAPH_MASTER_DATA. - * - eventhub.driver and event.driver_id already exist. + * 1. Ensures tachograph DRIVER master-data payload carries last_name while + * keeping source_master_entity.display_name unchanged. + * 2. Upserts global eventhub.driver rows from tachograph DRIVER entities. + * 3. Upserts global eventhub.driver_card rows from tachograph DRIVER_CARD entities. + * 4. Upserts source identity links for drivers and cards. + * 5. Links cards to drivers using DRIVER_CARD_DRIVER master-data relations. + * 6. Backfills event.driver_card_id where a driver has exactly one card. + * 7. Remaps event.driver_id from provisional rows to the linked canonical driver. + * 8. Deletes now-unreferenced provisional driver rows. */ --- 1) Keep display_name, but ensure DRIVER payload has last_name. with master_sources as ( select es.id, es.tenant_key from eventhub.event_source es @@ -40,22 +43,16 @@ updated_master_payload as ( select count(*) as updated_master_payload from updated_master_payload; --- 2) Upsert driver aggregates from tachograph master data. with master_sources as ( - select es.id, - es.tenant_key, - es.source_instance_key, - coalesce(es.tenant_provider_setting_key, '') as tenant_provider_setting_key + select es.id as master_event_source_id, es.tenant_key from eventhub.event_source es where es.provider_key = 'TACHOGRAPH' and es.source_kind = 'MASTER_DATA' and es.source_key = 'TACHOGRAPH_MASTER_DATA' ), master_drivers as ( - select ms.id as master_event_source_id, + select ms.master_event_source_id, ms.tenant_key, - ms.source_instance_key, - ms.tenant_provider_setting_key, d.source_entity_id as source_driver_entity_id, coalesce(nullif(trim(d.payload ->> 'first_names'), ''), nullif(trim(d.payload ->> 'firstnames'), '')) as first_names, coalesce(nullif(trim(d.payload ->> 'last_name'), ''), nullif(trim(d.payload ->> 'surname'), '')) as last_name, @@ -65,174 +62,298 @@ master_drivers as ( from master_sources ms join eventhub.source_master_entity d on d.tenant_key = ms.tenant_key - and d.event_source_id = ms.id + and d.event_source_id = ms.master_event_source_id and d.entity_type = 'DRIVER' - and d.source_entity_id not like 'DRIVER_CARD:%' ), -compatible_targets as ( +resolved_drivers as ( select md.*, - es.id as target_event_source_id + coalesce(identity.driver_id, gen_random_uuid()) as driver_id from master_drivers md - join eventhub.event_source es - on es.tenant_key = md.tenant_key - and es.provider_key = 'TACHOGRAPH' - and es.source_instance_key = md.source_instance_key - and coalesce(es.tenant_provider_setting_key, '') = md.tenant_provider_setting_key -), -updated_drivers as ( - update eventhub.driver driver - set first_names = coalesce(ct.first_names, driver.first_names), - last_name = coalesce(ct.last_name, driver.last_name), - birth_date = coalesce(ct.birth_date, driver.birth_date), - source_updated_at = ct.source_updated_at, - payload = driver.payload || ct.payload, - updated_at = now() - from compatible_targets ct - where driver.tenant_key = ct.tenant_key - and driver.event_source_id = ct.target_event_source_id - and driver.source_driver_entity_id = ct.source_driver_entity_id - returning driver.id + left join eventhub.source_driver_identity identity + on identity.tenant_key = md.tenant_key + and identity.event_source_id = md.master_event_source_id + and identity.source_driver_entity_id = md.source_driver_entity_id ), inserted_drivers as ( insert into eventhub.driver( - id, tenant_key, event_source_id, source_driver_entity_id, - first_names, last_name, birth_date, source_updated_at, payload, updated_at + id, first_names, last_name, birth_date, source_updated_at, payload, updated_at ) - select gen_random_uuid(), - ct.tenant_key, - ct.target_event_source_id, - ct.source_driver_entity_id, - ct.first_names, - ct.last_name, - ct.birth_date, - ct.source_updated_at, - ct.payload, + select distinct on (rd.driver_id) + rd.driver_id, + rd.first_names, + rd.last_name, + rd.birth_date, + rd.source_updated_at, + rd.payload, now() - from compatible_targets ct + from resolved_drivers rd where not exists ( select 1 from eventhub.driver existing - where existing.tenant_key = ct.tenant_key - and existing.event_source_id = ct.target_event_source_id - and existing.source_driver_entity_id = ct.source_driver_entity_id + where existing.id = rd.driver_id ) returning id +), +updated_drivers as ( + update eventhub.driver driver + set first_names = coalesce(rd.first_names, driver.first_names), + last_name = coalesce(rd.last_name, driver.last_name), + birth_date = coalesce(rd.birth_date, driver.birth_date), + source_updated_at = coalesce(rd.source_updated_at, driver.source_updated_at), + payload = driver.payload || rd.payload, + updated_at = now() + from resolved_drivers rd + where driver.id = rd.driver_id + returning driver.id +), +upserted_source_driver_identity as ( + insert into eventhub.source_driver_identity( + id, tenant_key, event_source_id, source_driver_entity_id, + driver_id, source_updated_at, payload, updated_at + ) + select gen_random_uuid(), + rd.tenant_key, + rd.master_event_source_id, + rd.source_driver_entity_id, + rd.driver_id, + rd.source_updated_at, + rd.payload, + now() + from resolved_drivers rd + on conflict (tenant_key, event_source_id, source_driver_entity_id) + do update set + driver_id = excluded.driver_id, + source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_identity.source_updated_at), + payload = eventhub.source_driver_identity.payload || excluded.payload, + updated_at = now() + returning id ) -select (select count(*) from updated_drivers) as updated_drivers, - (select count(*) from inserted_drivers) as inserted_drivers; +select (select count(*) from inserted_drivers) as inserted_drivers, + (select count(*) from updated_drivers) as updated_drivers, + (select count(*) from upserted_source_driver_identity) as upserted_source_driver_identity; --- 3) Project driver-card identifiers from master-data relations. with master_sources as ( - select es.id, - es.tenant_key, - es.source_instance_key, - coalesce(es.tenant_provider_setting_key, '') as tenant_provider_setting_key + select es.id as master_event_source_id, es.tenant_key from eventhub.event_source es where es.provider_key = 'TACHOGRAPH' and es.source_kind = 'MASTER_DATA' and es.source_key = 'TACHOGRAPH_MASTER_DATA' ), -card_projection as ( - select distinct on (ms.tenant_key, ms.source_instance_key, ms.tenant_provider_setting_key, rel.to_source_entity_id) +master_cards as ( + select ms.master_event_source_id, ms.tenant_key, - ms.source_instance_key, - ms.tenant_provider_setting_key, - rel.to_source_entity_id as source_driver_entity_id, + card.source_entity_id as source_driver_card_entity_id, nullif(trim(card.payload ->> 'card_nation'), '') as card_nation, nullif(trim(card.payload ->> 'card_number'), '') as card_number, - rel.source_updated_at + card.source_updated_at, + card.payload from master_sources ms - join eventhub.source_master_relation rel - on rel.tenant_key = ms.tenant_key - and rel.event_source_id = ms.id - and rel.relation_type = 'DRIVER_CARD_DRIVER' - and rel.from_entity_type = 'DRIVER_CARD' - and rel.to_entity_type = 'DRIVER' join eventhub.source_master_entity card on card.tenant_key = ms.tenant_key - and card.event_source_id = ms.id + and card.event_source_id = ms.master_event_source_id and card.entity_type = 'DRIVER_CARD' - and card.source_entity_id = rel.from_source_entity_id - order by ms.tenant_key, - ms.source_instance_key, - ms.tenant_provider_setting_key, - rel.to_source_entity_id, - rel.valid_to desc nulls last, - rel.valid_from desc nulls last, - rel.updated_at desc + and nullif(trim(card.payload ->> 'card_nation'), '') is not null + and nullif(trim(card.payload ->> 'card_number'), '') is not null ), -updated_driver_cards as ( - update eventhub.driver driver - set card_nation = coalesce(driver.card_nation, projection.card_nation), - card_number = coalesce(driver.card_number, projection.card_number), - source_updated_at = coalesce(projection.source_updated_at, driver.source_updated_at), +canonical_driver_cards as ( + select distinct on (mc.card_nation, mc.card_number) + mc.card_nation, + mc.card_number, + mc.source_updated_at, + mc.payload + from master_cards mc + order by mc.card_nation, + mc.card_number, + case when mc.source_driver_card_entity_id is null then 1 else 0 end, + mc.source_updated_at desc, + mc.source_driver_card_entity_id +), +existing_driver_cards as ( + select distinct on (card.nation, card.card_number) + card.id, + card.nation, + card.card_number + from eventhub.driver_card card + order by card.nation, + card.card_number, + case when card.driver_id is null then 1 else 0 end, + card.updated_at desc, + card.created_at desc, + card.id +), +resolved_cards as ( + select canonical.card_nation, + canonical.card_number, + canonical.source_updated_at, + canonical.payload, + coalesce(existing.id, gen_random_uuid()) as driver_card_id + from canonical_driver_cards canonical + left join existing_driver_cards existing + on existing.nation = canonical.card_nation + and existing.card_number = canonical.card_number +), +inserted_cards as ( + insert into eventhub.driver_card( + id, driver_id, nation, card_number, source_updated_at, payload, updated_at + ) + select distinct on (rc.driver_card_id) + rc.driver_card_id, + null, + rc.card_nation, + rc.card_number, + rc.source_updated_at, + rc.payload, + now() + from resolved_cards rc + where not exists ( + select 1 + from eventhub.driver_card existing + where existing.id = rc.driver_card_id + ) + returning id +), +updated_cards as ( + update eventhub.driver_card card + set source_updated_at = coalesce(rc.source_updated_at, card.source_updated_at), + payload = card.payload || rc.payload, updated_at = now() - from card_projection projection - join eventhub.event_source es - on es.id = driver.event_source_id - where driver.tenant_key = projection.tenant_key - and es.provider_key = 'TACHOGRAPH' - and es.source_instance_key = projection.source_instance_key - and coalesce(es.tenant_provider_setting_key, '') = projection.tenant_provider_setting_key - and driver.source_driver_entity_id = projection.source_driver_entity_id - and ( - (driver.card_nation is null and projection.card_nation is not null) - or (driver.card_number is null and projection.card_number is not null) - ) - returning driver.id + from resolved_cards rc + where card.id = rc.driver_card_id + returning card.id +), +upserted_source_driver_card_identity as ( + insert into eventhub.source_driver_card_identity( + id, tenant_key, event_source_id, source_driver_card_entity_id, + driver_card_id, source_updated_at, payload, updated_at + ) + select gen_random_uuid(), + mc.tenant_key, + mc.master_event_source_id, + mc.source_driver_card_entity_id, + coalesce(identity.driver_card_id, existing.id), + mc.source_updated_at, + mc.payload, + now() + from master_cards mc + left join eventhub.source_driver_card_identity identity + on identity.tenant_key = mc.tenant_key + and identity.event_source_id = mc.master_event_source_id + and identity.source_driver_card_entity_id = mc.source_driver_card_entity_id + left join existing_driver_cards existing + on existing.nation = mc.card_nation + and existing.card_number = mc.card_number + where mc.source_driver_card_entity_id is not null + and coalesce(identity.driver_card_id, existing.id) is not null + on conflict (tenant_key, event_source_id, source_driver_card_entity_id) + do update set + driver_card_id = excluded.driver_card_id, + source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_card_identity.source_updated_at), + payload = eventhub.source_driver_card_identity.payload || excluded.payload, + updated_at = now() + returning id ) -select count(*) as updated_driver_cards -from updated_driver_cards; +select (select count(*) from inserted_cards) as inserted_cards, + (select count(*) from updated_cards) as updated_cards, + (select count(*) from upserted_source_driver_card_identity) as upserted_source_driver_card_identity; --- 4) Remap events from provisional card-only drivers to proper source-driver aggregates. -with provisional_to_real as ( - select provisional.id as provisional_driver_id, - real.id as real_driver_id - from eventhub.driver provisional - join eventhub.event_source provisional_source - on provisional_source.id = provisional.event_source_id - and provisional_source.provider_key = 'TACHOGRAPH' - join eventhub.driver real - on real.tenant_key = provisional.tenant_key - and real.source_driver_entity_id is not null - and real.card_nation = provisional.card_nation - and real.card_number = provisional.card_number - join eventhub.event_source real_source - on real_source.id = real.event_source_id - and real_source.provider_key = provisional_source.provider_key - and real_source.tenant_key = provisional_source.tenant_key - and real_source.source_instance_key = provisional_source.source_instance_key - and coalesce(real_source.tenant_provider_setting_key, '') = coalesce(provisional_source.tenant_provider_setting_key, '') - where provisional.source_driver_entity_id is null - and provisional.card_nation is not null - and provisional.card_number is not null - and provisional.id <> real.id +with master_sources as ( + select es.id as master_event_source_id, es.tenant_key + from eventhub.event_source es + where es.provider_key = 'TACHOGRAPH' + and es.source_kind = 'MASTER_DATA' + and es.source_key = 'TACHOGRAPH_MASTER_DATA' +), +updated_card_links as ( + update eventhub.driver_card card + set driver_id = driver_identity.driver_id, + source_updated_at = coalesce(rel.source_updated_at, card.source_updated_at), + updated_at = now() + from master_sources ms + join eventhub.source_master_relation rel + on rel.tenant_key = ms.tenant_key + and rel.event_source_id = ms.master_event_source_id + and rel.relation_type = 'DRIVER_CARD_DRIVER' + and rel.from_entity_type = 'DRIVER_CARD' + and rel.to_entity_type = 'DRIVER' + join eventhub.source_driver_card_identity card_identity + on card_identity.tenant_key = rel.tenant_key + and card_identity.event_source_id = rel.event_source_id + and card_identity.source_driver_card_entity_id = rel.from_source_entity_id + join eventhub.source_driver_identity driver_identity + on driver_identity.tenant_key = rel.tenant_key + and driver_identity.event_source_id = rel.event_source_id + and driver_identity.source_driver_entity_id = rel.to_source_entity_id + where card.id = card_identity.driver_card_id + and ( + card.driver_id is null + or card.driver_id = driver_identity.driver_id + or not exists ( + select 1 + from eventhub.source_driver_identity existing_identity + where existing_identity.driver_id = card.driver_id + ) + ) + returning card.id +) +select count(*) as updated_card_links +from updated_card_links; + +with single_card_per_driver as ( + select driver_id, + min(id::text)::uuid as driver_card_id + from eventhub.driver_card + where driver_id is not null + group by driver_id + having count(*) = 1 ), updated_events as ( - update eventhub.event e - set driver_id = map.real_driver_id - from provisional_to_real map - where e.driver_id = map.provisional_driver_id - and e.driver_id <> map.real_driver_id - returning e.id + update eventhub.event event + set driver_card_id = card.driver_card_id + from single_card_per_driver card + where event.driver_id = card.driver_id + and event.driver_card_id is null + returning event.id ) -select count(*) as remapped_events +select count(*) as backfilled_event_driver_cards from updated_events; --- 5) Delete now-unreferenced provisional tachograph driver rows. +with remapped_events as ( + update eventhub.event event + set driver_id = card.driver_id + from eventhub.driver_card card + where event.driver_card_id = card.id + and card.driver_id is not null + and ( + event.driver_id is null + or not exists ( + select 1 + from eventhub.source_driver_identity source_identity + where source_identity.driver_id = event.driver_id + ) + ) + and event.driver_id is distinct from card.driver_id + returning event.id +) +select count(*) as remapped_events +from remapped_events; + with deleted_drivers as ( delete from eventhub.driver driver - using eventhub.event_source es - where es.id = driver.event_source_id - and es.provider_key = 'TACHOGRAPH' - and driver.source_driver_entity_id is null - and driver.card_nation is not null - and driver.card_number is not null + where not exists ( + select 1 + from eventhub.event event + where event.driver_id = driver.id + ) and not exists ( - select 1 - from eventhub.event e - where e.driver_id = driver.id - ) + select 1 + from eventhub.driver_card card + where card.driver_id = driver.id + ) + and not exists ( + select 1 + from eventhub.source_driver_identity source_identity + where source_identity.driver_id = driver.id + ) returning driver.id ) select count(*) as deleted_provisional_drivers diff --git a/src/main/resources/sql/maintenance/restore-driver-card-columns.sql b/src/main/resources/sql/maintenance/restore-driver-card-columns.sql new file mode 100644 index 0000000..bf68ffa --- /dev/null +++ b/src/main/resources/sql/maintenance/restore-driver-card-columns.sql @@ -0,0 +1,110 @@ +/* + * Restores legacy columns on eventhub.driver: + * - card_nation + * - card_number + * + * The script is conservative: + * - it recreates the columns and the old check constraint + * - it backfills values only when a driver has exactly one distinct card + * recoverable from: + * 1. eventhub.driver_card.driver_id + * 2. eventhub.event.driver_id + event.driver_card_id + * + * If a driver is associated with multiple distinct cards, the columns remain null. + */ + +alter table eventhub.driver + add column if not exists card_nation text; + +alter table eventhub.driver + add column if not exists card_number text; + +do $restore$ +begin + if not exists ( + select 1 + from information_schema.table_constraints + where constraint_schema = 'eventhub' + and table_name = 'driver' + and constraint_name = 'chk_driver_card_nation_when_number' + ) then + alter table eventhub.driver + add constraint chk_driver_card_nation_when_number + check (card_number is null or card_nation is not null); + end if; +end +$restore$; + +with direct_driver_cards as ( + select driver.id as driver_id, + card.nation, + card.card_number + from eventhub.driver driver + join eventhub.driver_card card + on card.driver_id = driver.id +), +event_driver_cards as ( + select distinct event.driver_id, + card.nation, + card.card_number + from eventhub.event event + join eventhub.driver_card card + on card.id = event.driver_card_id + where event.driver_id is not null + and event.driver_card_id is not null +), +all_driver_cards as ( + select driver_id, nation, card_number + from direct_driver_cards + union + select driver_id, nation, card_number + from event_driver_cards +), +single_card_per_driver as ( + select driver_id, + max(nation) as card_nation, + max(card_number) as card_number + from all_driver_cards + group by driver_id + having count(*) = 1 +), +updated_drivers as ( + update eventhub.driver driver + set card_nation = single.card_nation, + card_number = single.card_number, + updated_at = now() + from single_card_per_driver single + where driver.id = single.driver_id + and ( + driver.card_nation is distinct from single.card_nation + or driver.card_number is distinct from single.card_number + ) + returning driver.id +) +select count(*) as restored_driver_card_columns +from updated_drivers; + +with ambiguous_drivers as ( + select driver_id, + count(*) as distinct_card_count + from ( + select distinct driver_id, nation, card_number + from ( + select driver.id as driver_id, card.nation, card.card_number + from eventhub.driver driver + join eventhub.driver_card card + on card.driver_id = driver.id + union all + select distinct event.driver_id, card.nation, card.card_number + from eventhub.event event + join eventhub.driver_card card + on card.id = event.driver_card_id + where event.driver_id is not null + and event.driver_card_id is not null + ) cards + ) distinct_cards + group by driver_id + having count(*) > 1 +) +select count(*) as ambiguous_driver_count +from ambiguous_drivers;