/*
 * Repairs and normalizes tachograph driver aggregates after introducing eventhub.driver.
 *
 * What it does:
 * 1. Ensures tachograph DRIVER master-data payload carries first_names/last_name while keeping
 *    source_master_entity.display_name unchanged.
 * 2. Upserts eventhub.driver rows from MASTER_DATA DRIVER entities.
 * 3. Projects card nation/number onto eventhub.driver from DRIVER_CARD_DRIVER relations.
 * 4. Remaps event.driver_id from provisional card-only drivers to proper source-driver aggregates when possible.
 * 5. Deletes now-unreferenced provisional tachograph driver rows with no source_driver_entity_id.
 *
 * Assumptions:
 * - Tachograph master-data source is provider_key=TACHOGRAPH, source_kind=MASTER_DATA,
 *   source_key=TACHOGRAPH_MASTER_DATA.
 * - eventhub.driver and event.driver_id already exist.
 */

-- 1) Keep display_name, but ensure DRIVER payload has last_name.
--    The payload gains canonical 'first_names' / 'last_name' keys, falling back to the
--    legacy 'firstnames' / 'surname' keys. jsonb_strip_nulls also drops every other
--    null-valued object field already present in the payload.
--    Only rows whose payload would actually change are written, so re-running the script
--    does not bump updated_at again and the reported count reflects real changes.
with master_sources as (
    select
        es.id,
        es.tenant_key
    from eventhub.event_source es
    where es.provider_key = 'TACHOGRAPH'
      and es.source_kind = 'MASTER_DATA'
      and es.source_key = 'TACHOGRAPH_MASTER_DATA'
),
updated_master_payload as (
    update eventhub.source_master_entity sme
    set
        payload = jsonb_strip_nulls(
            sme.payload || jsonb_build_object(
                'first_names', coalesce(sme.payload ->> 'first_names', sme.payload ->> 'firstnames'),
                'last_name', coalesce(sme.payload ->> 'last_name', sme.payload ->> 'surname')
            )
        ),
        updated_at = now()
    from master_sources ms
    where sme.tenant_key = ms.tenant_key
      and sme.event_source_id = ms.id
      and sme.entity_type = 'DRIVER'
      -- Idempotency guard: must stay textually identical to the SET expression above.
      -- (The UPDATE target cannot be referenced from a FROM-clause subquery, hence the repetition.)
      and sme.payload is distinct from jsonb_strip_nulls(
            sme.payload || jsonb_build_object(
                'first_names', coalesce(sme.payload ->> 'first_names', sme.payload ->> 'firstnames'),
                'last_name', coalesce(sme.payload ->> 'last_name', sme.payload ->> 'surname')
            )
          )
    returning sme.id
)
select count(*) as updated_master_payload
from updated_master_payload;

-- 2) Upsert driver aggregates from tachograph master data.
--    Existing eventhub.driver rows are refreshed and missing ones inserted in a single
--    statement. Both data-modifying CTEs read the same snapshot, so a driver is either
--    updated or inserted, never both.
with master_sources as (
    select
        es.id,
        es.tenant_key,
        es.source_instance_key,
        coalesce(es.tenant_provider_setting_key, '') as tenant_provider_setting_key
    from eventhub.event_source es
    where es.provider_key = 'TACHOGRAPH'
      and es.source_kind = 'MASTER_DATA'
      and es.source_key = 'TACHOGRAPH_MASTER_DATA'
),
-- DRIVER master entities, excluding entities whose id uses the 'DRIVER_CARD:' prefix.
master_drivers as (
    select
        ms.id as master_event_source_id,
        ms.tenant_key,
        ms.source_instance_key,
        ms.tenant_provider_setting_key,
        d.source_entity_id as source_driver_entity_id,
        coalesce(
            nullif(trim(d.payload ->> 'first_names'), ''),
            nullif(trim(d.payload ->> 'firstnames'), '')
        ) as first_names,
        coalesce(
            nullif(trim(d.payload ->> 'last_name'), ''),
            nullif(trim(d.payload ->> 'surname'), '')
        ) as last_name,
        -- NOTE(review): a non-empty birth_date that is not a valid date aborts the statement.
        cast(nullif(trim(d.payload ->> 'birth_date'), '') as date) as birth_date,
        d.source_updated_at,
        d.payload
    from master_sources ms
    join eventhub.source_master_entity d
        on d.tenant_key = ms.tenant_key
       and d.event_source_id = ms.id
       and d.entity_type = 'DRIVER'
       and d.source_entity_id not like 'DRIVER_CARD:%'
),
-- Every TACHOGRAPH event source sharing tenant, source instance and provider setting with
-- the master-data source receives its own copy of each driver.
compatible_targets as (
    select
        md.tenant_key,
        md.source_driver_entity_id,
        md.first_names,
        md.last_name,
        md.birth_date,
        md.source_updated_at,
        md.payload,
        es.id as target_event_source_id
    from master_drivers md
    join eventhub.event_source es
        on es.tenant_key = md.tenant_key
       and es.provider_key = 'TACHOGRAPH'
       and es.source_instance_key = md.source_instance_key
       and coalesce(es.tenant_provider_setting_key, '') = md.tenant_provider_setting_key
),
updated_drivers as (
    update eventhub.driver driver
    set
        first_names = coalesce(ct.first_names, driver.first_names),
        last_name = coalesce(ct.last_name, driver.last_name),
        birth_date = coalesce(ct.birth_date, driver.birth_date),
        source_updated_at = ct.source_updated_at,
        -- NOTE(review): if either payload is null the merged payload becomes null;
        -- confirm both payload columns are NOT NULL.
        payload = driver.payload || ct.payload,
        updated_at = now()
    from compatible_targets ct
    where driver.tenant_key = ct.tenant_key
      and driver.event_source_id = ct.target_event_source_id
      and driver.source_driver_entity_id = ct.source_driver_entity_id
    returning driver.id
),
inserted_drivers as (
    insert into eventhub.driver (
        id,
        tenant_key,
        event_source_id,
        source_driver_entity_id,
        first_names,
        last_name,
        birth_date,
        source_updated_at,
        payload,
        updated_at
    )
    select
        gen_random_uuid(),
        ct.tenant_key,
        ct.target_event_source_id,
        ct.source_driver_entity_id,
        ct.first_names,
        ct.last_name,
        ct.birth_date,
        ct.source_updated_at,
        ct.payload,
        now()
    from compatible_targets ct
    where not exists (
        select 1
        from eventhub.driver existing
        where existing.tenant_key = ct.tenant_key
          and existing.event_source_id = ct.target_event_source_id
          and existing.source_driver_entity_id = ct.source_driver_entity_id
    )
    returning id
)
select
    (select count(*) from updated_drivers) as updated_drivers,
    (select count(*) from inserted_drivers) as inserted_drivers;

-- 3) Project driver-card identifiers from master-data relations.
with master_sources as (
    select
        es.id,
        es.tenant_key,
        es.source_instance_key,
        coalesce(es.tenant_provider_setting_key, '') as tenant_provider_setting_key
    from eventhub.event_source es
    where es.provider_key = 'TACHOGRAPH'
      and es.source_kind = 'MASTER_DATA'
      and es.source_key = 'TACHOGRAPH_MASTER_DATA'
),
-- One card per (tenant, source instance, provider setting, driver entity).
card_projection as (
    select distinct on (
        ms.tenant_key,
        ms.source_instance_key,
        ms.tenant_provider_setting_key,
        rel.to_source_entity_id
    )
        ms.tenant_key,
        ms.source_instance_key,
        ms.tenant_provider_setting_key,
        rel.to_source_entity_id as source_driver_entity_id,
        nullif(trim(card.payload ->> 'card_nation'), '') as card_nation,
        nullif(trim(card.payload ->> 'card_number'), '') as card_number,
        rel.source_updated_at
    from master_sources ms
    join eventhub.source_master_relation rel
        on rel.tenant_key = ms.tenant_key
       and rel.event_source_id = ms.id
       and rel.relation_type = 'DRIVER_CARD_DRIVER'
       and rel.from_entity_type = 'DRIVER_CARD'
       and rel.to_entity_type = 'DRIVER'
    join eventhub.source_master_entity card
        on card.tenant_key = ms.tenant_key
       and card.event_source_id = ms.id
       and card.entity_type = 'DRIVER_CARD'
       and card.source_entity_id = rel.from_source_entity_id
    order by
        ms.tenant_key,
        ms.source_instance_key,
        ms.tenant_provider_setting_key,
        rel.to_source_entity_id,
        -- NOTE(review): "nulls last" ranks open-ended relations (valid_to is null) after
        -- any relation with an end date; confirm that is the intended preference.
        rel.valid_to desc nulls last,
        rel.valid_from desc nulls last,
        rel.updated_at desc,
        -- Stable tie-breaker so the chosen card does not depend on scan order.
        rel.from_source_entity_id
),
updated_driver_cards as (
    update eventhub.driver driver
    set
        -- Only fills gaps: card identifiers already present on the driver are kept.
        card_nation = coalesce(driver.card_nation, projection.card_nation),
        card_number = coalesce(driver.card_number, projection.card_number),
        source_updated_at = coalesce(projection.source_updated_at, driver.source_updated_at),
        updated_at = now()
    from card_projection projection
    -- The UPDATE target must not be referenced inside a FROM-clause JOIN ... ON
    -- (PostgreSQL raises "invalid reference to FROM-clause entry"). The event source is
    -- therefore joined to the projection here and correlated with the driver in WHERE.
    join eventhub.event_source es
        on es.provider_key = 'TACHOGRAPH'
       and es.source_instance_key = projection.source_instance_key
       and coalesce(es.tenant_provider_setting_key, '') = projection.tenant_provider_setting_key
    where driver.event_source_id = es.id
      and driver.tenant_key = projection.tenant_key
      and driver.source_driver_entity_id = projection.source_driver_entity_id
      and (
            (driver.card_nation is null and projection.card_nation is not null)
         or (driver.card_number is null and projection.card_number is not null)
      )
    returning driver.id
)
select count(*) as updated_driver_cards
from updated_driver_cards;

-- 4) Remap events from provisional card-only drivers to proper source-driver aggregates.
--    A provisional driver has no source_driver_entity_id but carries card nation/number.
--    It is matched to a real driver with the same card in the same tenant, whose event
--    source has the same provider, source instance and provider setting.
--    Several real drivers can match one provisional driver (for example one per event
--    source), and UPDATE ... FROM would then pick an unpredictable one. Exactly one target
--    is chosen per provisional driver: the real driver in the same event source if there
--    is one, otherwise the lowest id.
with provisional_to_real as (
    select distinct on (provisional.id)
        provisional.id as provisional_driver_id,
        real_driver.id as real_driver_id
    from eventhub.driver provisional
    join eventhub.event_source provisional_source
        on provisional_source.id = provisional.event_source_id
       and provisional_source.provider_key = 'TACHOGRAPH'
    join eventhub.driver real_driver
        on real_driver.tenant_key = provisional.tenant_key
       and real_driver.source_driver_entity_id is not null
       and real_driver.card_nation = provisional.card_nation
       and real_driver.card_number = provisional.card_number
    join eventhub.event_source real_source
        on real_source.id = real_driver.event_source_id
       and real_source.provider_key = provisional_source.provider_key
       and real_source.tenant_key = provisional_source.tenant_key
       and real_source.source_instance_key = provisional_source.source_instance_key
       and coalesce(real_source.tenant_provider_setting_key, '')
           = coalesce(provisional_source.tenant_provider_setting_key, '')
    where provisional.source_driver_entity_id is null
      and provisional.card_nation is not null
      and provisional.card_number is not null
      and provisional.id <> real_driver.id
    order by
        provisional.id,
        (real_driver.event_source_id = provisional.event_source_id) desc,
        real_driver.id
),
updated_events as (
    update eventhub.event e
    set driver_id = map.real_driver_id
    from provisional_to_real map
    where e.driver_id = map.provisional_driver_id
      and e.driver_id <> map.real_driver_id
    returning e.id
)
select count(*) as remapped_events
from updated_events;

-- 5) Delete now-unreferenced provisional tachograph driver rows.
--    Only card-only drivers (no source_driver_entity_id, card nation and number present)
--    that no event references any more are removed.
with deleted_drivers as (
    delete from eventhub.driver driver
    using eventhub.event_source es
    where es.id = driver.event_source_id
      and es.provider_key = 'TACHOGRAPH'
      and driver.source_driver_entity_id is null
      and driver.card_nation is not null
      and driver.card_number is not null
      and not exists (
          select 1
          from eventhub.event e
          where e.driver_id = driver.id
      )
    returning driver.id
)
select count(*) as deleted_provisional_drivers
from deleted_drivers;