Introduce driver card identity model

This commit is contained in:
trifonovt 2026-05-08 13:19:00 +02:00
parent 14a6f8d42e
commit de9c884578
8 changed files with 2242 additions and 670 deletions

View File

@ -1,6 +1,8 @@
create extension if not exists pgcrypto; create extension if not exists pgcrypto;
create extension if not exists postgis; create extension if not exists postgis;
create extension if not exists timescaledb;
drop schema if exists eventhub cascade;
create schema if not exists eventhub; create schema if not exists eventhub;
create table if not exists eventhub.event_source ( create table if not exists eventhub.event_source (
@ -143,11 +145,56 @@ create table if not exists eventhub.source_master_relation (
constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to) constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to)
); );
create table if not exists eventhub.vehicle ( create table if not exists eventhub.driver (
id uuid primary key,
first_names text,
last_name text,
birth_date date,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create table if not exists eventhub.driver_card (
id uuid primary key,
driver_id uuid references eventhub.driver(id),
nation text not null,
card_number text not null,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create table if not exists eventhub.source_driver_identity (
id uuid primary key, id uuid primary key,
tenant_key text not null, tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id), event_source_id integer not null references eventhub.event_source(id),
source_vehicle_entity_id text, source_driver_entity_id text not null,
driver_id uuid not null references eventhub.driver(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_driver_identity unique (tenant_key, event_source_id, source_driver_entity_id)
);
create table if not exists eventhub.source_driver_card_identity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_driver_card_entity_id text not null,
driver_card_id uuid not null references eventhub.driver_card(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_driver_card_identity unique (tenant_key, event_source_id, source_driver_card_entity_id)
);
create table if not exists eventhub.vehicle (
id uuid primary key,
vin text, vin text,
created_at timestamptz not null default now(), created_at timestamptz not null default now(),
updated_at timestamptz not null default now() updated_at timestamptz not null default now()
@ -155,9 +202,6 @@ create table if not exists eventhub.vehicle (
create table if not exists eventhub.vehicle_registration ( create table if not exists eventhub.vehicle_registration (
id uuid primary key, id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_registration_entity_id text,
nation text not null, nation text not null,
registration_number text not null, registration_number text not null,
source_updated_at timestamptz, source_updated_at timestamptz,
@ -166,6 +210,32 @@ create table if not exists eventhub.vehicle_registration (
updated_at timestamptz not null default now() updated_at timestamptz not null default now()
); );
create table if not exists eventhub.source_vehicle_identity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_vehicle_entity_id text not null,
vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_vehicle_identity unique (tenant_key, event_source_id, source_vehicle_entity_id)
);
create table if not exists eventhub.source_vehicle_registration_identity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_registration_entity_id text not null,
vehicle_registration_id uuid not null references eventhub.vehicle_registration(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_vehicle_registration_identity unique (tenant_key, event_source_id, source_registration_entity_id)
);
create table if not exists eventhub.vehicle_registration_assignment ( create table if not exists eventhub.vehicle_registration_assignment (
id uuid primary key, id uuid primary key,
tenant_key text not null, tenant_key text not null,
@ -186,9 +256,11 @@ create table if not exists eventhub.event (
event_source_id integer not null references eventhub.event_source(id), event_source_id integer not null references eventhub.event_source(id),
data_package_id uuid not null references eventhub.data_package(id), data_package_id uuid not null references eventhub.data_package(id),
external_source_event_id text not null, external_source_event_id text not null,
driver_entity_id uuid references eventhub.source_master_entity(id), driver_id uuid references eventhub.driver(id),
driver_card_id uuid references eventhub.driver_card(id),
vehicle_id uuid references eventhub.vehicle(id), vehicle_id uuid references eventhub.vehicle(id),
vehicle_registration_id uuid references eventhub.vehicle_registration(id), vehicle_registration_id uuid references eventhub.vehicle_registration(id),
source_package_id text,
source_package_entity_id uuid references eventhub.source_master_entity(id), source_package_entity_id uuid references eventhub.source_master_entity(id),
occurred_at timestamptz not null, occurred_at timestamptz not null,
received_partner_at timestamptz, received_partner_at timestamptz,
@ -205,26 +277,90 @@ create table if not exists eventhub.event (
created_at timestamptz not null default now(), created_at timestamptz not null default now(),
constraint pk_event primary key (occurred_at, id), constraint pk_event primary key (occurred_at, id),
constraint chk_event_driver_or_vehicle_ref check ( constraint chk_event_driver_or_vehicle_ref check (
driver_entity_id is not null driver_id is not null
or driver_card_id is not null
or vehicle_id is not null or vehicle_id is not null
or vehicle_registration_id is not null or vehicle_registration_id is not null
) )
); );
create table if not exists eventhub.event_source_record (
source_record_key_hash text primary key,
event_occurred_at timestamptz not null,
event_id uuid not null,
created_at timestamptz not null default now()
);
create table if not exists eventhub.event_detail ( create table if not exists eventhub.event_detail (
event_occurred_at timestamptz not null, event_occurred_at timestamptz not null,
event_id uuid not null, event_id uuid not null,
detail_type text not null, detail_type text not null,
attributes jsonb not null default '{}'::jsonb, attributes jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(), created_at timestamptz not null default now(),
constraint pk_event_detail primary key (event_occurred_at, event_id, detail_type), constraint pk_event_detail primary key (event_occurred_at, event_id, detail_type)
constraint fk_event_detail_event foreign key (event_occurred_at, event_id)
references eventhub.event(occurred_at, id)
on delete cascade
); );
create unique index if not exists ux_event_source_record select create_hypertable(
on eventhub.event(source_record_key_hash); 'eventhub.event',
'occurred_at',
chunk_time_interval => interval '7 days',
if_not_exists => true
);
alter table eventhub.event_source_record
add constraint fk_event_source_record_event foreign key (event_occurred_at, event_id)
references eventhub.event(occurred_at, id)
on delete cascade
deferrable initially deferred;
alter table eventhub.event_detail
add constraint fk_event_detail_event foreign key (event_occurred_at, event_id)
references eventhub.event(occurred_at, id)
on delete cascade;
create index if not exists idx_data_package_source_time
on eventhub.data_package(tenant_key, event_source_id, received_at desc);
create index if not exists idx_data_package_scope
on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to);
create index if not exists idx_data_package_extraction
on eventhub.data_package(tenant_key, event_source_id, import_run_id, event_family, extraction_source_kind, extraction_code, batch_no);
create index if not exists idx_import_run_source_status
on eventhub.import_run(tenant_key, event_source_id, status, started_at desc);
create index if not exists idx_source_master_entity_type_key
on eventhub.source_master_entity(tenant_key, event_source_id, entity_type, source_external_key)
where source_external_key is not null;
create index if not exists idx_source_master_entity_payload_gin
on eventhub.source_master_entity using gin(payload);
create index if not exists idx_source_master_relation_from
on eventhub.source_master_relation(tenant_key, event_source_id, from_entity_type, from_source_entity_id, relation_type);
create index if not exists idx_source_master_relation_to
on eventhub.source_master_relation(tenant_key, event_source_id, to_entity_type, to_source_entity_id, relation_type);
create index if not exists idx_source_master_relation_payload_gin
on eventhub.source_master_relation using gin(payload);
create index if not exists idx_vehicle_vin
on eventhub.vehicle(vin)
where vin is not null;
create index if not exists idx_vehicle_registration_plate
on eventhub.vehicle_registration(nation, registration_number);
create index if not exists idx_vehicle_registration_assignment_registration_time
on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to);
create index if not exists idx_vehicle_registration_assignment_vehicle_time
on eventhub.vehicle_registration_assignment(vehicle_id, valid_from desc, valid_to);
create index if not exists idx_event_source_record_event
on eventhub.event_source_record(event_occurred_at, event_id);
create index if not exists idx_event_signature create index if not exists idx_event_signature
on eventhub.event(event_signature_hash) on eventhub.event(event_signature_hash)
@ -236,12 +372,49 @@ create index if not exists idx_event_source_time
create index if not exists idx_event_package_time create index if not exists idx_event_package_time
on eventhub.event(data_package_id, occurred_at desc); on eventhub.event(data_package_id, occurred_at desc);
create index if not exists idx_event_source_package_id
on eventhub.event(source_package_id)
where source_package_id is not null;
create index if not exists idx_event_domain_type_time create index if not exists idx_event_domain_type_time
on eventhub.event(event_domain, event_type, occurred_at desc); on eventhub.event(event_domain, event_type, occurred_at desc);
create index if not exists idx_driver_card_key
on eventhub.driver_card(nation, card_number);
create index if not exists idx_driver_card_driver
on eventhub.driver_card(driver_id)
where driver_id is not null;
create unique index if not exists ux_driver_card_key
on eventhub.driver_card(nation, card_number);
create unique index if not exists ux_vehicle_vin
on eventhub.vehicle(vin)
where vin is not null;
create unique index if not exists ux_vehicle_registration_plate
on eventhub.vehicle_registration(nation, registration_number);
create index if not exists idx_source_driver_identity_driver
on eventhub.source_driver_identity(driver_id);
create index if not exists idx_source_driver_card_identity_card
on eventhub.source_driver_card_identity(driver_card_id);
create index if not exists idx_source_vehicle_identity_vehicle
on eventhub.source_vehicle_identity(vehicle_id);
create index if not exists idx_source_vehicle_registration_identity_registration
on eventhub.source_vehicle_registration_identity(vehicle_registration_id);
create index if not exists idx_event_driver_time create index if not exists idx_event_driver_time
on eventhub.event(driver_entity_id, occurred_at desc) on eventhub.event(driver_id, occurred_at desc)
where driver_entity_id is not null; where driver_id is not null;
create index if not exists idx_event_driver_card_time
on eventhub.event(driver_card_id, occurred_at desc)
where driver_card_id is not null;
create index if not exists idx_event_vehicle_time create index if not exists idx_event_vehicle_time
on eventhub.event(vehicle_id, occurred_at desc) on eventhub.event(vehicle_id, occurred_at desc)
@ -264,54 +437,18 @@ create index if not exists idx_event_detail_type
create index if not exists idx_event_detail_attributes_gin create index if not exists idx_event_detail_attributes_gin
on eventhub.event_detail using gin(attributes); on eventhub.event_detail using gin(attributes);
create index if not exists idx_source_master_entity_type_key create index if not exists idx_event_detail_yellowfox_slot
on eventhub.source_master_entity(tenant_key, event_source_id, entity_type, source_external_key) on eventhub.event_detail(detail_type, (attributes ->> 'slot'), event_occurred_at)
where source_external_key is not null; where detail_type in ('DRIVER_ACTIVITY', 'DRIVER_CARD');
create index if not exists idx_source_master_entity_payload_gin create index if not exists idx_event_detail_yellowfox_eventtype_state
on eventhub.source_master_entity using gin(payload); on eventhub.event_detail(
(attributes ->> 'yellowFoxEventType'),
(attributes ->> 'yellowFoxState'),
event_occurred_at
)
where attributes ? 'yellowFoxEventType';
create index if not exists idx_source_master_relation_from create index if not exists idx_event_detail_yellowfox_ignition
on eventhub.source_master_relation(tenant_key, event_source_id, from_entity_type, from_source_entity_id, relation_type); on eventhub.event_detail(detail_type, (attributes ->> 'ignitionState'), event_occurred_at)
where attributes ? 'ignitionState';
create index if not exists idx_source_master_relation_to
on eventhub.source_master_relation(tenant_key, event_source_id, to_entity_type, to_source_entity_id, relation_type);
create index if not exists idx_source_master_relation_payload_gin
on eventhub.source_master_relation using gin(payload);
create index if not exists idx_vehicle_lookup_ctx
on eventhub.vehicle(tenant_key, event_source_id, updated_at desc);
create index if not exists idx_vehicle_source_entity
on eventhub.vehicle(tenant_key, event_source_id, source_vehicle_entity_id)
where source_vehicle_entity_id is not null;
create index if not exists idx_vehicle_vin
on eventhub.vehicle(tenant_key, event_source_id, vin)
where vin is not null;
create index if not exists idx_vehicle_registration_source_entity
on eventhub.vehicle_registration(tenant_key, event_source_id, source_registration_entity_id)
where source_registration_entity_id is not null;
create index if not exists idx_vehicle_registration_plate
on eventhub.vehicle_registration(tenant_key, event_source_id, nation, registration_number);
create index if not exists idx_vehicle_registration_assignment_registration_time
on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to);
create index if not exists idx_vehicle_registration_assignment_vehicle_time
on eventhub.vehicle_registration_assignment(vehicle_id, valid_from desc, valid_to);
create index if not exists idx_data_package_source_time
on eventhub.data_package(tenant_key, event_source_id, received_at desc);
create index if not exists idx_data_package_scope
on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to);
create index if not exists idx_data_package_extraction
on eventhub.data_package(tenant_key, event_source_id, import_run_id, event_family, extraction_source_kind, extraction_code, batch_no);
create index if not exists idx_import_run_source_status
on eventhub.import_run(tenant_key, event_source_id, status, started_at desc);

View File

@ -4,6 +4,7 @@ import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto; import at.procon.eventhub.dto.DriverRefDto;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.LocalDate;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@ -23,13 +24,13 @@ public class DriverIdentityRepository {
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
} }
public UUID resolveOrCreateDriverId( public ResolvedDriverReference resolveOrCreateDriverReference(
String tenantKey, String tenantKey,
int eventSourceId, int eventSourceId,
DriverRefDto driverRef DriverRefDto driverRef
) { ) {
if (driverRef == null || !driverRef.hasAnyReference()) { if (driverRef == null || !driverRef.hasAnyReference()) {
return null; return ResolvedDriverReference.empty();
} }
String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey");
@ -38,19 +39,21 @@ public class DriverIdentityRepository {
String cardNation = driverCard == null ? null : normalizeNullable(driverCard.nation()); String cardNation = driverCard == null ? null : normalizeNullable(driverCard.nation());
String cardNumber = driverCard == null ? null : normalizeNullable(driverCard.number()); String cardNumber = driverCard == null ? null : normalizeNullable(driverCard.number());
UUID driverId = resolveDriverId(normalizedTenantKey, eventSourceId, sourceDriverEntityId, cardNation, cardNumber); UUID driverId = findBySourceDriverEntityId(normalizedTenantKey, eventSourceId, sourceDriverEntityId);
if (driverId == null) { UUID driverCardId = resolveOrCreateDriverCardId(cardNation, cardNumber, driverId);
if (driverId == null && driverCardId != null) {
driverId = findDriverIdByCardId(driverCardId);
}
if (driverId == null && sourceDriverEntityId != null) {
Map<String, Object> payload = new LinkedHashMap<>(); Map<String, Object> payload = new LinkedHashMap<>();
put(payload, "source", "event"); put(payload, "source", "event");
put(payload, "source_driver_entity_id", sourceDriverEntityId); put(payload, "source_driver_entity_id", sourceDriverEntityId);
put(payload, "card_nation", cardNation);
put(payload, "card_number", cardNumber);
driverId = createDriver( driverId = createDriver(
normalizedTenantKey, normalizedTenantKey,
eventSourceId, eventSourceId,
sourceDriverEntityId, sourceDriverEntityId,
cardNation, null,
cardNumber,
null, null,
null, null,
null, null,
@ -58,20 +61,35 @@ public class DriverIdentityRepository {
); );
} }
touchDriver(driverId, sourceDriverEntityId, cardNation, cardNumber); if (driverId != null && sourceDriverEntityId != null) {
return driverId; upsertSourceDriverIdentity(
normalizedTenantKey,
eventSourceId,
sourceDriverEntityId,
driverId,
null,
Map.of("source", "event")
);
}
if (driverCardId != null && driverId != null) {
linkDriverCard(driverCardId, driverId);
}
return new ResolvedDriverReference(driverId, driverCardId);
} }
@Transactional @Transactional
public int reconcileFromMasterData(String tenantKey, int eventSourceId) { public int reconcileFromMasterData(String tenantKey, int eventSourceId) {
String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey");
int updates = reconcileDriversFromMasterData(normalizedTenantKey, eventSourceId); int updates = reconcileDriversFromMasterData(normalizedTenantKey, eventSourceId);
updates += projectDriverCardsFromMasterData(normalizedTenantKey, eventSourceId); updates += reconcileDriverCardsFromMasterData(normalizedTenantKey, eventSourceId);
updates += projectDriverCardLinksFromMasterData(normalizedTenantKey, eventSourceId);
return updates; return updates;
} }
private int reconcileDriversFromMasterData(String tenantKey, int eventSourceId) { private int reconcileDriversFromMasterData(String tenantKey, int eventSourceId) {
Long count = jdbcTemplate.queryForObject( int insertedDrivers;
if (driverUsesLegacySchema()) {
insertedDrivers = jdbcTemplate.update(
compatibleSourcesCte() + """ compatibleSourcesCte() + """
, master_drivers as ( , master_drivers as (
select distinct on (nullif(trim(source_entity_id), '')) select distinct on (nullif(trim(source_entity_id), ''))
@ -90,125 +108,515 @@ public class DriverIdentityRepository {
and event_source_id in (select id from compatible_sources) and event_source_id in (select id from compatible_sources)
and entity_type = 'DRIVER' and entity_type = 'DRIVER'
and nullif(trim(source_entity_id), '') is not null and nullif(trim(source_entity_id), '') is not null
and source_entity_id not like 'DRIVER_CARD:%'
order by nullif(trim(source_entity_id), ''), updated_at desc order by nullif(trim(source_entity_id), ''), updated_at desc
), ),
updated_by_source as ( resolved_drivers as (
update eventhub.driver driver select master.event_source_id,
set first_names = coalesce(master.first_names, driver.first_names),
last_name = coalesce(master.last_name, driver.last_name),
birth_date = coalesce(master.birth_date, driver.birth_date),
source_updated_at = master.source_updated_at,
payload = driver.payload || master.payload,
updated_at = now()
from master_drivers master
where driver.tenant_key = ?
and driver.event_source_id in (select id from compatible_sources)
and driver.source_driver_entity_id = master.source_driver_entity_id
returning driver.id
),
inserted as (
insert into eventhub.driver(
id, tenant_key, event_source_id, source_driver_entity_id,
first_names, last_name, birth_date, source_updated_at, payload, updated_at
)
select gen_random_uuid(),
?,
master.event_source_id,
master.source_driver_entity_id, master.source_driver_entity_id,
master.first_names, master.first_names,
master.last_name, master.last_name,
master.birth_date, master.birth_date,
master.source_updated_at, master.source_updated_at,
master.payload, master.payload,
now() coalesce(identity.driver_id, gen_random_uuid()) as driver_id
from master_drivers master from master_drivers master
left join eventhub.source_driver_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_driver_entity_id = master.source_driver_entity_id
)
insert into eventhub.driver(
id, tenant_key, event_source_id, source_driver_entity_id,
first_names, last_name, birth_date, source_updated_at, payload, updated_at
)
select distinct on (resolved.driver_id)
resolved.driver_id,
?,
resolved.event_source_id,
resolved.source_driver_entity_id,
resolved.first_names,
resolved.last_name,
resolved.birth_date,
resolved.source_updated_at,
resolved.payload,
now()
from resolved_drivers resolved
where not exists ( where not exists (
select 1 select 1
from eventhub.driver existing from eventhub.driver existing
where existing.tenant_key = ? where existing.id = resolved.driver_id
and existing.event_source_id in (select id from compatible_sources)
and existing.source_driver_entity_id = master.source_driver_entity_id
) )
returning id
)
select (select count(*) from updated_by_source)
+ (select count(*) from inserted)
""", """,
Long.class,
eventSourceId, eventSourceId,
tenantKey, tenantKey,
tenantKey, tenantKey,
tenantKey,
tenantKey tenantKey
); );
return count == null ? 0 : Math.toIntExact(count); } else {
} insertedDrivers = jdbcTemplate.update(
private int projectDriverCardsFromMasterData(String tenantKey, int eventSourceId) {
Long count = jdbcTemplate.queryForObject(
compatibleSourcesCte() + """ compatibleSourcesCte() + """
, driver_card_projection as ( , master_drivers as (
select distinct on (rel.to_source_entity_id) select distinct on (nullif(trim(source_entity_id), ''))
rel.to_source_entity_id as source_driver_entity_id, event_source_id,
nullif(trim(card.payload ->> 'card_nation'), '') as card_nation, nullif(trim(source_entity_id), '') as source_driver_entity_id,
nullif(trim(card.payload ->> 'card_number'), '') as card_number, nullif(trim(payload ->> 'first_names'), '') as first_names,
rel.source_updated_at coalesce(
from eventhub.source_master_relation rel nullif(trim(payload ->> 'last_name'), ''),
join eventhub.source_master_entity card nullif(trim(payload ->> 'surname'), '')
on card.tenant_key = rel.tenant_key ) as last_name,
and card.event_source_id = rel.event_source_id cast(nullif(trim(payload ->> 'birth_date'), '') as date) as birth_date,
and card.entity_type = 'DRIVER_CARD' source_updated_at,
and card.source_entity_id = rel.from_source_entity_id payload
where rel.tenant_key = ? from eventhub.source_master_entity
and rel.event_source_id in (select id from compatible_sources) where tenant_key = ?
and rel.relation_type = 'DRIVER_CARD_DRIVER' and event_source_id in (select id from compatible_sources)
and rel.from_entity_type = 'DRIVER_CARD' and entity_type = 'DRIVER'
and rel.to_entity_type = 'DRIVER' and nullif(trim(source_entity_id), '') is not null
order by rel.to_source_entity_id, order by nullif(trim(source_entity_id), ''), updated_at desc
rel.valid_to desc nulls last,
rel.valid_from desc nulls last,
rel.updated_at desc
), ),
updated_by_source as ( resolved_drivers as (
update eventhub.driver driver select master.event_source_id,
set card_nation = coalesce(driver.card_nation, projection.card_nation), master.source_driver_entity_id,
card_number = coalesce(driver.card_number, projection.card_number), master.first_names,
source_updated_at = coalesce(projection.source_updated_at, driver.source_updated_at), master.last_name,
updated_at = now() master.birth_date,
from driver_card_projection projection master.source_updated_at,
where driver.tenant_key = ? master.payload,
and driver.event_source_id in (select id from compatible_sources) coalesce(identity.driver_id, gen_random_uuid()) as driver_id
and driver.source_driver_entity_id = projection.source_driver_entity_id from master_drivers master
and ( left join eventhub.source_driver_identity identity
(driver.card_nation is null and projection.card_nation is not null) on identity.tenant_key = ?
or (driver.card_number is null and projection.card_number is not null) and identity.event_source_id in (select id from compatible_sources)
and identity.source_driver_entity_id = master.source_driver_entity_id
) )
returning driver.id insert into eventhub.driver(
id, first_names, last_name, birth_date, source_updated_at, payload, updated_at
)
select distinct on (resolved.driver_id)
resolved.driver_id,
resolved.first_names,
resolved.last_name,
resolved.birth_date,
resolved.source_updated_at,
resolved.payload,
now()
from resolved_drivers resolved
where not exists (
select 1
from eventhub.driver existing
where existing.id = resolved.driver_id
) )
select count(*)
from updated_by_source
""", """,
Long.class,
eventSourceId, eventSourceId,
tenantKey, tenantKey,
tenantKey tenantKey
); );
return count == null ? 0 : Math.toIntExact(count);
} }
private UUID resolveDriverId( int updatedDrivers = jdbcTemplate.update(
String tenantKey, compatibleSourcesCte() + """
int eventSourceId, , master_drivers as (
String sourceDriverEntityId, select distinct on (nullif(trim(source_entity_id), ''))
String cardNation, nullif(trim(source_entity_id), '') as source_driver_entity_id,
String cardNumber nullif(trim(payload ->> 'first_names'), '') as first_names,
) { coalesce(
UUID driverId = findBySourceDriverEntityId(tenantKey, eventSourceId, sourceDriverEntityId); nullif(trim(payload ->> 'last_name'), ''),
if (driverId == null) { nullif(trim(payload ->> 'surname'), '')
driverId = findByCard(tenantKey, eventSourceId, cardNation, cardNumber); ) as last_name,
cast(nullif(trim(payload ->> 'birth_date'), '') as date) as birth_date,
source_updated_at,
payload
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'DRIVER'
and nullif(trim(source_entity_id), '') is not null
order by nullif(trim(source_entity_id), ''), updated_at desc
)
update eventhub.driver driver
set first_names = coalesce(master.first_names, driver.first_names),
last_name = coalesce(master.last_name, driver.last_name),
birth_date = coalesce(master.birth_date, driver.birth_date),
source_updated_at = coalesce(master.source_updated_at, driver.source_updated_at),
payload = driver.payload || master.payload,
updated_at = now()
from master_drivers master
join eventhub.source_driver_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_driver_entity_id = master.source_driver_entity_id
where identity.driver_id = driver.id
""",
eventSourceId,
tenantKey,
tenantKey
);
int linkedDrivers = jdbcTemplate.update(
compatibleSourcesCte() + """
, master_drivers as (
select distinct on (nullif(trim(source_entity_id), ''))
event_source_id,
nullif(trim(source_entity_id), '') as source_driver_entity_id,
source_updated_at,
payload
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'DRIVER'
and nullif(trim(source_entity_id), '') is not null
order by nullif(trim(source_entity_id), ''), updated_at desc
),
resolved_driver_ids as (
select master.event_source_id,
master.source_driver_entity_id,
master.source_updated_at,
master.payload,
coalesce(identity.driver_id, (
select created.id
from eventhub.driver created
where created.payload = master.payload
order by created.updated_at desc
limit 1
)) as driver_id
from master_drivers master
left join eventhub.source_driver_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_driver_entity_id = master.source_driver_entity_id
)
insert into eventhub.source_driver_identity(
id, tenant_key, event_source_id, source_driver_entity_id,
driver_id, source_updated_at, payload, updated_at
)
select gen_random_uuid(),
?,
resolved.event_source_id,
resolved.source_driver_entity_id,
resolved.driver_id,
resolved.source_updated_at,
resolved.payload,
now()
from resolved_driver_ids resolved
where resolved.driver_id is not null
on conflict (tenant_key, event_source_id, source_driver_entity_id)
do update set
driver_id = excluded.driver_id,
source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_identity.source_updated_at),
payload = eventhub.source_driver_identity.payload || excluded.payload,
updated_at = now()
""",
eventSourceId,
tenantKey,
tenantKey,
tenantKey
);
return insertedDrivers + updatedDrivers + linkedDrivers;
} }
return driverId;
private int reconcileDriverCardsFromMasterData(String tenantKey, int eventSourceId) {
int insertedCards = jdbcTemplate.update(
compatibleSourcesCte() + """
, master_driver_cards as (
select distinct on (
nullif(trim(source_entity_id), ''),
nullif(trim(payload ->> 'card_nation'), ''),
nullif(trim(payload ->> 'card_number'), '')
)
event_source_id,
nullif(trim(source_entity_id), '') as source_driver_card_entity_id,
nullif(trim(payload ->> 'card_nation'), '') as card_nation,
nullif(trim(payload ->> 'card_number'), '') as card_number,
source_updated_at,
payload
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'DRIVER_CARD'
and nullif(trim(payload ->> 'card_nation'), '') is not null
and nullif(trim(payload ->> 'card_number'), '') is not null
order by nullif(trim(source_entity_id), ''),
nullif(trim(payload ->> 'card_nation'), ''),
nullif(trim(payload ->> 'card_number'), ''),
updated_at desc
),
canonical_driver_cards as (
select distinct on (master.card_nation, master.card_number)
master.card_nation,
master.card_number,
master.source_updated_at,
master.payload
from master_driver_cards master
order by master.card_nation,
master.card_number,
case when master.source_driver_card_entity_id is null then 1 else 0 end,
master.source_updated_at desc,
master.source_driver_card_entity_id
),
existing_driver_cards as (
select distinct on (existing.nation, existing.card_number)
existing.id,
existing.nation,
existing.card_number
from eventhub.driver_card existing
order by existing.nation,
existing.card_number,
case when existing.driver_id is null then 1 else 0 end,
existing.updated_at desc,
existing.created_at desc,
existing.id
),
resolved_cards as (
select canonical.card_nation,
canonical.card_number,
canonical.source_updated_at,
canonical.payload,
coalesce(existing.id, gen_random_uuid()) as driver_card_id
from canonical_driver_cards canonical
left join existing_driver_cards existing
on existing.nation = canonical.card_nation
and existing.card_number = canonical.card_number
)
insert into eventhub.driver_card(
id, driver_id, nation, card_number, source_updated_at, payload, updated_at
)
select distinct on (resolved.driver_card_id)
resolved.driver_card_id,
null,
resolved.card_nation,
resolved.card_number,
resolved.source_updated_at,
resolved.payload,
now()
from resolved_cards resolved
where not exists (
select 1
from eventhub.driver_card existing
where existing.id = resolved.driver_card_id
)
""",
eventSourceId,
tenantKey
);
int updatedCards = jdbcTemplate.update(
compatibleSourcesCte() + """
, master_driver_cards as (
select distinct on (
nullif(trim(source_entity_id), ''),
nullif(trim(payload ->> 'card_nation'), ''),
nullif(trim(payload ->> 'card_number'), '')
)
nullif(trim(source_entity_id), '') as source_driver_card_entity_id,
nullif(trim(payload ->> 'card_nation'), '') as card_nation,
nullif(trim(payload ->> 'card_number'), '') as card_number,
source_updated_at,
payload
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'DRIVER_CARD'
and nullif(trim(payload ->> 'card_nation'), '') is not null
and nullif(trim(payload ->> 'card_number'), '') is not null
order by nullif(trim(source_entity_id), ''),
nullif(trim(payload ->> 'card_nation'), ''),
nullif(trim(payload ->> 'card_number'), ''),
updated_at desc
),
canonical_driver_cards as (
select distinct on (master.card_nation, master.card_number)
master.card_nation,
master.card_number,
master.source_updated_at,
master.payload
from master_driver_cards master
order by master.card_nation,
master.card_number,
case when master.source_driver_card_entity_id is null then 1 else 0 end,
master.source_updated_at desc,
master.source_driver_card_entity_id
),
existing_driver_cards as (
select distinct on (existing.nation, existing.card_number)
existing.id,
existing.nation,
existing.card_number
from eventhub.driver_card existing
order by existing.nation,
existing.card_number,
case when existing.driver_id is null then 1 else 0 end,
existing.updated_at desc,
existing.created_at desc,
existing.id
),
resolved_cards as (
select canonical.source_updated_at,
canonical.payload,
existing.id as driver_card_id
from canonical_driver_cards canonical
join existing_driver_cards existing
on existing.nation = canonical.card_nation
and existing.card_number = canonical.card_number
)
update eventhub.driver_card card
set source_updated_at = coalesce(resolved.source_updated_at, card.source_updated_at),
payload = card.payload || resolved.payload,
updated_at = now()
from resolved_cards resolved
where card.id = resolved.driver_card_id
""",
eventSourceId,
tenantKey
);
int linkedCards = jdbcTemplate.update(
compatibleSourcesCte() + """
, master_driver_cards as (
select distinct on (
nullif(trim(source_entity_id), ''),
nullif(trim(payload ->> 'card_nation'), ''),
nullif(trim(payload ->> 'card_number'), '')
)
event_source_id,
nullif(trim(source_entity_id), '') as source_driver_card_entity_id,
nullif(trim(payload ->> 'card_nation'), '') as card_nation,
nullif(trim(payload ->> 'card_number'), '') as card_number,
source_updated_at,
payload
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'DRIVER_CARD'
and nullif(trim(payload ->> 'card_nation'), '') is not null
and nullif(trim(payload ->> 'card_number'), '') is not null
order by nullif(trim(source_entity_id), ''),
nullif(trim(payload ->> 'card_nation'), ''),
nullif(trim(payload ->> 'card_number'), ''),
updated_at desc
),
existing_driver_cards as (
select distinct on (existing.nation, existing.card_number)
existing.id,
existing.nation,
existing.card_number
from eventhub.driver_card existing
order by existing.nation,
existing.card_number,
case when existing.driver_id is null then 1 else 0 end,
existing.updated_at desc,
existing.created_at desc,
existing.id
),
resolved_cards as (
select master.event_source_id,
master.source_driver_card_entity_id,
master.source_updated_at,
master.payload,
coalesce(identity.driver_card_id, existing.id) as driver_card_id
from master_driver_cards master
left join eventhub.source_driver_card_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_driver_card_entity_id = master.source_driver_card_entity_id
left join existing_driver_cards existing
on existing.nation = master.card_nation
and existing.card_number = master.card_number
)
insert into eventhub.source_driver_card_identity(
id, tenant_key, event_source_id, source_driver_card_entity_id,
driver_card_id, source_updated_at, payload, updated_at
)
select gen_random_uuid(),
?,
resolved.event_source_id,
resolved.source_driver_card_entity_id,
resolved.driver_card_id,
resolved.source_updated_at,
resolved.payload,
now()
from resolved_cards resolved
where resolved.source_driver_card_entity_id is not null
and resolved.driver_card_id is not null
on conflict (tenant_key, event_source_id, source_driver_card_entity_id)
do update set
driver_card_id = excluded.driver_card_id,
source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_card_identity.source_updated_at),
payload = eventhub.source_driver_card_identity.payload || excluded.payload,
updated_at = now()
""",
eventSourceId,
tenantKey,
tenantKey,
tenantKey
);
return insertedCards + updatedCards + linkedCards;
}
private int projectDriverCardLinksFromMasterData(String tenantKey, int eventSourceId) {
return jdbcTemplate.update(
compatibleSourcesCte() + """
update eventhub.driver_card card
set driver_id = driver_identity.driver_id,
source_updated_at = coalesce(relation.source_updated_at, card.source_updated_at),
updated_at = now()
from eventhub.source_master_relation relation
join eventhub.source_driver_card_identity card_identity
on card_identity.tenant_key = relation.tenant_key
and card_identity.event_source_id = relation.event_source_id
and card_identity.source_driver_card_entity_id = relation.from_source_entity_id
join eventhub.source_driver_identity driver_identity
on driver_identity.tenant_key = relation.tenant_key
and driver_identity.event_source_id = relation.event_source_id
and driver_identity.source_driver_entity_id = relation.to_source_entity_id
where relation.tenant_key = ?
and relation.event_source_id in (select id from compatible_sources)
and relation.relation_type = 'DRIVER_CARD_DRIVER'
and relation.from_entity_type = 'DRIVER_CARD'
and relation.to_entity_type = 'DRIVER'
and card.id = card_identity.driver_card_id
and (
card.driver_id is null
or card.driver_id = driver_identity.driver_id
or not exists (
select 1
from eventhub.source_driver_identity existing_identity
where existing_identity.driver_id = card.driver_id
)
)
""",
eventSourceId,
tenantKey
);
}
private UUID resolveOrCreateDriverCardId(
String cardNation,
String cardNumber,
UUID preferredDriverId
) {
if (cardNation == null || cardNumber == null) {
return null;
}
UUID driverCardId = findDriverCardByCard(cardNation, cardNumber);
if (driverCardId == null) {
Map<String, Object> payload = new LinkedHashMap<>();
put(payload, "source", "event");
put(payload, "card_nation", cardNation);
put(payload, "card_number", cardNumber);
return createDriverCard(
preferredDriverId,
cardNation,
cardNumber,
null,
payload
);
}
if (preferredDriverId != null) {
linkDriverCard(driverCardId, preferredDriverId);
}
return driverCardId;
} }
private UUID findBySourceDriverEntityId(String tenantKey, int eventSourceId, String sourceDriverEntityId) { private UUID findBySourceDriverEntityId(String tenantKey, int eventSourceId, String sourceDriverEntityId) {
@ -217,39 +625,50 @@ public class DriverIdentityRepository {
} }
return jdbcTemplate.query( return jdbcTemplate.query(
compatibleSourcesCte() + """ compatibleSourcesCte() + """
select d.id select identity.driver_id
from eventhub.driver d from eventhub.source_driver_identity identity
where d.tenant_key = ? where identity.tenant_key = ?
and d.event_source_id in (select id from compatible_sources) and identity.event_source_id in (select id from compatible_sources)
and d.source_driver_entity_id = ? and identity.source_driver_entity_id = ?
order by d.updated_at desc order by identity.updated_at desc
limit 1 limit 1
""", """,
rs -> rs.next() ? (UUID) rs.getObject("id") : null, rs -> rs.next() ? (UUID) rs.getObject("driver_id") : null,
eventSourceId, eventSourceId,
tenantKey, tenantKey,
sourceDriverEntityId sourceDriverEntityId
); );
} }
private UUID findByCard(String tenantKey, int eventSourceId, String cardNation, String cardNumber) { private UUID findDriverIdByCardId(UUID driverCardId) {
if (driverCardId == null) {
return null;
}
return jdbcTemplate.query(
"""
select driver_id
from eventhub.driver_card
where id = ?
""",
rs -> rs.next() ? (UUID) rs.getObject("driver_id") : null,
driverCardId
);
}
private UUID findDriverCardByCard(String cardNation, String cardNumber) {
if (cardNation == null || cardNumber == null) { if (cardNation == null || cardNumber == null) {
return null; return null;
} }
return jdbcTemplate.query( return jdbcTemplate.query(
compatibleSourcesCte() + """ """
select d.id select card.id
from eventhub.driver d from eventhub.driver_card card
where d.tenant_key = ? where card.nation = ?
and d.event_source_id in (select id from compatible_sources) and card.card_number = ?
and d.card_nation = ? order by card.updated_at desc
and d.card_number = ?
order by d.updated_at desc
limit 1 limit 1
""", """,
rs -> rs.next() ? (UUID) rs.getObject("id") : null, rs -> rs.next() ? (UUID) rs.getObject("id") : null,
eventSourceId,
tenantKey,
cardNation, cardNation,
cardNumber cardNumber
); );
@ -259,66 +678,152 @@ public class DriverIdentityRepository {
String tenantKey, String tenantKey,
int eventSourceId, int eventSourceId,
String sourceDriverEntityId, String sourceDriverEntityId,
String cardNation,
String cardNumber,
String firstNames, String firstNames,
String lastName, String lastName,
OffsetDateTime sourceUpdatedAt, OffsetDateTime sourceUpdatedAt,
LocalDate birthDate,
Map<String, Object> payload Map<String, Object> payload
) { ) {
UUID driverId = UUID.randomUUID(); UUID driverId = UUID.randomUUID();
if (driverUsesLegacySchema()) {
jdbcTemplate.update( jdbcTemplate.update(
""" """
insert into eventhub.driver( insert into eventhub.driver(
id, tenant_key, event_source_id, source_driver_entity_id, id, tenant_key, event_source_id, source_driver_entity_id,
card_nation, card_number, first_names, last_name, first_names, last_name, birth_date, source_updated_at, payload, updated_at
source_updated_at, payload, updated_at ) values (?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now())
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now())
""", """,
driverId, driverId,
tenantKey, tenantKey,
eventSourceId, eventSourceId,
sourceDriverEntityId, sourceDriverEntityId,
cardNation,
cardNumber,
firstNames, firstNames,
lastName, lastName,
birthDate,
sourceUpdatedAt, sourceUpdatedAt,
toJson(payload) toJson(payload)
); );
} else {
jdbcTemplate.update(
"""
insert into eventhub.driver(
id, first_names, last_name, birth_date, source_updated_at, payload, updated_at
) values (?, ?, ?, ?, ?, ?::jsonb, now())
""",
driverId,
firstNames,
lastName,
birthDate,
sourceUpdatedAt,
toJson(payload)
);
}
return driverId; return driverId;
} }
private void touchDriver( private boolean driverUsesLegacySchema() {
Integer legacyColumns = jdbcTemplate.queryForObject(
"""
select count(*)
from information_schema.columns
where table_schema = 'eventhub'
and table_name = 'driver'
and column_name in ('tenant_key', 'event_source_id', 'source_driver_entity_id')
""",
Integer.class
);
return legacyColumns != null && legacyColumns == 3;
}
private UUID createDriverCard(
UUID driverId, UUID driverId,
String sourceDriverEntityId,
String cardNation, String cardNation,
String cardNumber String cardNumber,
OffsetDateTime sourceUpdatedAt,
Map<String, Object> payload
) { ) {
if (sourceDriverEntityId == null && cardNation == null && cardNumber == null) { UUID driverCardId = UUID.randomUUID();
jdbcTemplate.update(
"""
insert into eventhub.driver_card(
id, driver_id, nation, card_number, source_updated_at, payload, updated_at
) values (?, ?, ?, ?, ?, ?::jsonb, now())
""",
driverCardId,
driverId,
cardNation,
cardNumber,
sourceUpdatedAt,
toJson(payload)
);
return driverCardId;
}
private void upsertSourceDriverIdentity(
String tenantKey,
int eventSourceId,
String sourceDriverEntityId,
UUID driverId,
OffsetDateTime sourceUpdatedAt,
Map<String, Object> payload
) {
if (sourceDriverEntityId == null || driverId == null) {
return; return;
} }
jdbcTemplate.update( jdbcTemplate.update(
""" """
update eventhub.driver insert into eventhub.source_driver_identity(
set source_driver_entity_id = coalesce(source_driver_entity_id, cast(? as text)), id, tenant_key, event_source_id, source_driver_entity_id,
card_nation = coalesce(card_nation, cast(? as text)), driver_id, source_updated_at, payload, updated_at
card_number = coalesce(card_number, cast(? as text)), ) values (?, ?, ?, ?, ?, ?, ?::jsonb, now())
on conflict (tenant_key, event_source_id, source_driver_entity_id)
do update set
driver_id = excluded.driver_id,
source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_identity.source_updated_at),
payload = eventhub.source_driver_identity.payload || excluded.payload,
updated_at = now()
""",
UUID.randomUUID(),
tenantKey,
eventSourceId,
sourceDriverEntityId,
driverId,
sourceUpdatedAt,
toJson(payload)
);
}
private void linkDriverCard(UUID driverCardId, UUID driverId) {
if (driverCardId == null || driverId == null) {
return;
}
jdbcTemplate.update(
"""
update eventhub.driver_card
set driver_id = ?,
updated_at = now() updated_at = now()
where id = ? where id = ?
and ( and (
(source_driver_entity_id is null and cast(? as text) is not null) driver_id is null
or (card_nation is null and cast(? as text) is not null) or driver_id = ?
or (card_number is null and cast(? as text) is not null) or (
exists (
select 1
from eventhub.source_driver_identity preferred_identity
where preferred_identity.driver_id = ?
)
and not exists (
select 1
from eventhub.source_driver_identity current_identity
where current_identity.driver_id = eventhub.driver_card.driver_id
)
)
) )
""", """,
sourceDriverEntityId,
cardNation,
cardNumber,
driverId, driverId,
sourceDriverEntityId, driverCardId,
cardNation, driverId,
cardNumber driverId
); );
} }
@ -369,4 +874,10 @@ public class DriverIdentityRepository {
String trimmed = value.trim(); String trimmed = value.trim();
return trimmed.isEmpty() ? null : trimmed; return trimmed.isEmpty() ? null : trimmed;
} }
public record ResolvedDriverReference(UUID driverId, UUID driverCardId) {
public static ResolvedDriverReference empty() {
return new ResolvedDriverReference(null, null);
}
}
} }

View File

@ -57,11 +57,12 @@ public class EventRepository {
*/ */
public int batchInsert(UUID packageId, String tenantKey, int eventSourceId, List<EventHubEventDto> events) { public int batchInsert(UUID packageId, String tenantKey, int eventSourceId, List<EventHubEventDto> events) {
Map<String, UUID> entityIdCache = new HashMap<>(); Map<String, UUID> entityIdCache = new HashMap<>();
Map<String, DriverIdentityRepository.ResolvedDriverReference> driverRefCache = new HashMap<>();
Map<String, List<VehicleRefCacheEntry>> vehicleRefCache = new HashMap<>(); Map<String, List<VehicleRefCacheEntry>> vehicleRefCache = new HashMap<>();
List<ResolvedEventImportRow> rows = new ArrayList<>(events.size()); List<ResolvedEventImportRow> rows = new ArrayList<>(events.size());
for (EventHubEventDto event : events) { for (EventHubEventDto event : events) {
ResolvedEntityRefs refs = resolveEntityRefs(tenantKey, eventSourceId, event, entityIdCache, vehicleRefCache); ResolvedEntityRefs refs = resolveEntityRefs(tenantKey, eventSourceId, event, entityIdCache, driverRefCache, vehicleRefCache);
rows.add(resolveEventImportRow(packageId, eventSourceId, event, refs)); rows.add(resolveEventImportRow(packageId, eventSourceId, event, refs));
} }
@ -93,6 +94,7 @@ public class EventRepository {
eventSourceId, eventSourceId,
event.externalSourceEventId(), event.externalSourceEventId(),
refs.driverId(), refs.driverId(),
refs.driverCardId(),
refs.vehicleId(), refs.vehicleId(),
refs.vehicleRegistrationId(), refs.vehicleRegistrationId(),
sourcePackageId, sourcePackageId,
@ -125,6 +127,7 @@ public class EventRepository {
event_source_id integer not null, event_source_id integer not null,
external_source_event_id text not null, external_source_event_id text not null,
driver_id uuid, driver_id uuid,
driver_card_id uuid,
vehicle_id uuid, vehicle_id uuid,
vehicle_registration_id uuid, vehicle_registration_id uuid,
source_package_id text, source_package_id text,
@ -152,11 +155,11 @@ public class EventRepository {
""" """
insert into eventhub_event_import_stage( insert into eventhub_event_import_stage(
row_no, source_record_key_hash, requested_event_id, data_package_id, event_source_id, row_no, source_record_key_hash, requested_event_id, data_package_id, event_source_id,
external_source_event_id, driver_id, vehicle_id, vehicle_registration_id, external_source_event_id, driver_id, driver_card_id, vehicle_id, vehicle_registration_id,
source_package_id, source_package_entity_id, occurred_at, received_partner_at, received_hub_at, source_package_id, source_package_entity_id, occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle, odometer_m, longitude, latitude, event_domain, event_type, lifecycle, odometer_m, longitude, latitude,
payload, manual_entry, event_signature_hash payload, manual_entry, event_signature_hash
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?) ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?)
""", """,
new BatchPreparedStatementSetter() { new BatchPreparedStatementSetter() {
@Override @Override
@ -169,22 +172,23 @@ public class EventRepository {
ps.setInt(5, row.eventSourceId()); ps.setInt(5, row.eventSourceId());
ps.setString(6, row.externalSourceEventId()); ps.setString(6, row.externalSourceEventId());
ps.setObject(7, row.driverId()); ps.setObject(7, row.driverId());
ps.setObject(8, row.vehicleId()); ps.setObject(8, row.driverCardId());
ps.setObject(9, row.vehicleRegistrationId()); ps.setObject(9, row.vehicleId());
ps.setString(10, row.sourcePackageId()); ps.setObject(10, row.vehicleRegistrationId());
ps.setObject(11, row.sourcePackageEntityId()); ps.setString(11, row.sourcePackageId());
ps.setObject(12, row.occurredAt()); ps.setObject(12, row.sourcePackageEntityId());
ps.setObject(13, row.receivedPartnerAt()); ps.setObject(13, row.occurredAt());
ps.setObject(14, row.receivedHubAt()); ps.setObject(14, row.receivedPartnerAt());
ps.setString(15, row.eventDomain()); ps.setObject(15, row.receivedHubAt());
ps.setString(16, row.eventType()); ps.setString(16, row.eventDomain());
ps.setString(17, row.lifecycle()); ps.setString(17, row.eventType());
ps.setObject(18, row.odometerM()); ps.setString(18, row.lifecycle());
ps.setObject(19, row.longitude()); ps.setObject(19, row.odometerM());
ps.setObject(20, row.latitude()); ps.setObject(20, row.longitude());
ps.setString(21, row.payloadJson()); ps.setObject(21, row.latitude());
ps.setBoolean(22, row.manualEntry()); ps.setString(22, row.payloadJson());
ps.setString(23, row.eventSignatureHash()); ps.setBoolean(23, row.manualEntry());
ps.setString(24, row.eventSignatureHash());
} }
@Override @Override
@ -236,7 +240,7 @@ public class EventRepository {
insert into eventhub.event( insert into eventhub.event(
id, event_source_id, data_package_id, id, event_source_id, data_package_id,
external_source_event_id, external_source_event_id,
driver_id, vehicle_id, vehicle_registration_id, driver_id, driver_card_id, vehicle_id, vehicle_registration_id,
source_package_id, source_package_entity_id, source_package_id, source_package_entity_id,
occurred_at, received_partner_at, received_hub_at, occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle, event_domain, event_type, lifecycle,
@ -247,7 +251,7 @@ public class EventRepository {
select select
source_record.event_id, stage.event_source_id, stage.data_package_id, source_record.event_id, stage.event_source_id, stage.data_package_id,
stage.external_source_event_id, stage.external_source_event_id,
stage.driver_id, stage.vehicle_id, stage.vehicle_registration_id, stage.driver_id, stage.driver_card_id, stage.vehicle_id, stage.vehicle_registration_id,
stage.source_package_id, stage.source_package_entity_id, stage.source_package_id, stage.source_package_entity_id,
source_record.event_occurred_at, stage.received_partner_at, stage.received_hub_at, source_record.event_occurred_at, stage.received_partner_at, stage.received_hub_at,
stage.event_domain, stage.event_type, stage.lifecycle, stage.event_domain, stage.event_type, stage.lifecycle,
@ -358,33 +362,39 @@ public class EventRepository {
int eventSourceId, int eventSourceId,
EventHubEventDto event, EventHubEventDto event,
Map<String, UUID> entityIdCache, Map<String, UUID> entityIdCache,
Map<String, DriverIdentityRepository.ResolvedDriverReference> driverRefCache,
Map<String, List<VehicleRefCacheEntry>> vehicleRefCache Map<String, List<VehicleRefCacheEntry>> vehicleRefCache
) { ) {
UUID driverId = resolveDriverId(tenantKey, eventSourceId, event, entityIdCache); DriverIdentityRepository.ResolvedDriverReference driverRef = resolveDriverReference(tenantKey, eventSourceId, event, driverRefCache);
ResolvedVehicleReference vehicleRef = resolveVehicleReference(tenantKey, eventSourceId, event, vehicleRefCache); ResolvedVehicleReference vehicleRef = resolveVehicleReference(tenantKey, eventSourceId, event, vehicleRefCache);
UUID sourcePackageEntityId = resolveSourcePackageEntityId(tenantKey, eventSourceId, event, entityIdCache); UUID sourcePackageEntityId = resolveSourcePackageEntityId(tenantKey, eventSourceId, event, entityIdCache);
return new ResolvedEntityRefs(driverId, vehicleRef.vehicleId(), vehicleRef.vehicleRegistrationId(), sourcePackageEntityId); return new ResolvedEntityRefs(
driverRef.driverId(),
driverRef.driverCardId(),
vehicleRef.vehicleId(),
vehicleRef.vehicleRegistrationId(),
sourcePackageEntityId
);
} }
private UUID resolveDriverId( private DriverIdentityRepository.ResolvedDriverReference resolveDriverReference(
String tenantKey, String tenantKey,
int eventSourceId, int eventSourceId,
EventHubEventDto event, EventHubEventDto event,
Map<String, UUID> entityIdCache Map<String, DriverIdentityRepository.ResolvedDriverReference> driverRefCache
) { ) {
DriverRefDto driverRef = event.driverRef(); DriverRefDto driverRef = event.driverRef();
if (driverRef == null || !driverRef.hasAnyReference()) { if (driverRef == null || !driverRef.hasAnyReference()) {
return null; return DriverIdentityRepository.ResolvedDriverReference.empty();
} }
String cacheKey = "DRIVER|" + driverRef.stableKey(); String cacheKey = "DRIVER|" + driverRef.stableKey();
UUID cached = entityIdCache.get(cacheKey); DriverIdentityRepository.ResolvedDriverReference cached = driverRefCache.get(cacheKey);
if (cached != null) { if (cached != null) {
return cached; return cached;
} }
UUID resolved = driverIdentityRepository.resolveOrCreateDriverId(tenantKey, eventSourceId, driverRef); DriverIdentityRepository.ResolvedDriverReference resolved =
if (resolved != null) { driverIdentityRepository.resolveOrCreateDriverReference(tenantKey, eventSourceId, driverRef);
entityIdCache.put(cacheKey, resolved); driverRefCache.put(cacheKey, resolved);
}
return resolved; return resolved;
} }
@ -587,6 +597,7 @@ public class EventRepository {
private record ResolvedEntityRefs( private record ResolvedEntityRefs(
UUID driverId, UUID driverId,
UUID driverCardId,
UUID vehicleId, UUID vehicleId,
UUID vehicleRegistrationId, UUID vehicleRegistrationId,
UUID sourcePackageEntityId UUID sourcePackageEntityId
@ -606,6 +617,7 @@ public class EventRepository {
int eventSourceId, int eventSourceId,
String externalSourceEventId, String externalSourceEventId,
UUID driverId, UUID driverId,
UUID driverCardId,
UUID vehicleId, UUID vehicleId,
UUID vehicleRegistrationId, UUID vehicleRegistrationId,
String sourcePackageId, String sourcePackageId,

View File

@ -5,8 +5,6 @@ import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
@ -47,16 +45,22 @@ public class VehicleIdentityRepository {
eventSourceId, eventSourceId,
sourceRegistrationEntityId, sourceRegistrationEntityId,
registrationNation, registrationNation,
registrationNumber, registrationNumber
occurredAt
); );
if (registrationId == null && (sourceRegistrationEntityId != null || registrationNumber != null)) { if (registrationId == null && (sourceRegistrationEntityId != null || registrationNumber != null)) {
registrationId = createRegistration( registrationId = createRegistration(
registrationNation,
registrationNumber,
null,
Map.of("source", "event")
);
}
if (registrationId != null && sourceRegistrationEntityId != null) {
upsertSourceVehicleRegistrationIdentity(
normalizedTenantKey, normalizedTenantKey,
eventSourceId, eventSourceId,
sourceRegistrationEntityId, sourceRegistrationEntityId,
registrationNation, registrationId,
registrationNumber,
null, null,
Map.of("source", "event") Map.of("source", "event")
); );
@ -70,24 +74,25 @@ public class VehicleIdentityRepository {
); );
AssignedVehicleReference assignedVehicle = null; AssignedVehicleReference assignedVehicle = null;
if (vehicleId == null && registrationId != null) { if (vehicleId == null && registrationId != null) {
assignedVehicle = resolveAssignedVehicleReference(registrationId, occurredAt); assignedVehicle = resolveAssignedVehicleReference(normalizedTenantKey, eventSourceId, registrationId, occurredAt);
vehicleId = assignedVehicle == null ? null : assignedVehicle.vehicleId(); vehicleId = assignedVehicle == null ? null : assignedVehicle.vehicleId();
} }
if (vehicleId == null && (sourceVehicleEntityId != null || vin != null)) { if (vehicleId == null && (sourceVehicleEntityId != null || vin != null)) {
vehicleId = createVehicle( vehicleId = createVehicle(vin);
}
if (vehicleId != null && sourceVehicleEntityId != null) {
upsertSourceVehicleIdentity(
normalizedTenantKey, normalizedTenantKey,
eventSourceId, eventSourceId,
sourceVehicleEntityId, sourceVehicleEntityId,
vin vehicleId,
null,
Map.of("source", "event")
); );
} }
touchVehicle(vehicleId, vin);
touchRegistration(registrationId, registrationNation, registrationNumber);
if (vehicleId != null) {
touchVehicle(vehicleId, sourceVehicleEntityId, vin);
}
if (registrationId != null) {
touchRegistration(registrationId, sourceRegistrationEntityId, registrationNation, registrationNumber);
}
return new ResolvedVehicleReferenceResolution( return new ResolvedVehicleReferenceResolution(
new ResolvedVehicleReference(vehicleId, registrationId), new ResolvedVehicleReference(vehicleId, registrationId),
assignedVehicle != null, assignedVehicle != null,
@ -102,16 +107,14 @@ public class VehicleIdentityRepository {
int updates = reconcileVehiclesFromMasterData(normalizedTenantKey, eventSourceId); int updates = reconcileVehiclesFromMasterData(normalizedTenantKey, eventSourceId);
updates += reconcileRegistrationsFromMasterData(normalizedTenantKey, eventSourceId); updates += reconcileRegistrationsFromMasterData(normalizedTenantKey, eventSourceId);
updates += projectVehicleRegistrationAssignments(normalizedTenantKey, eventSourceId); updates += projectVehicleRegistrationAssignments(normalizedTenantKey, eventSourceId);
return updates; return updates;
} }
private int reconcileVehiclesFromMasterData(String tenantKey, int eventSourceId) { private int reconcileVehiclesFromMasterData(String tenantKey, int eventSourceId) {
Long count = jdbcTemplate.queryForObject( int insertedVehicles = jdbcTemplate.update(
compatibleSourcesCte() + """ compatibleSourcesCte() + """
, master_vehicles as ( , master_vehicles as (
select distinct on ( select distinct on (
event_source_id,
nullif(trim(source_entity_id), ''), nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), '') nullif(trim(source_external_key), '')
) )
@ -126,72 +129,149 @@ public class VehicleIdentityRepository {
nullif(trim(source_entity_id), '') is not null nullif(trim(source_entity_id), '') is not null
or nullif(trim(source_external_key), '') is not null or nullif(trim(source_external_key), '') is not null
) )
order by event_source_id, order by nullif(trim(source_entity_id), ''),
nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), ''), nullif(trim(source_external_key), ''),
updated_at desc updated_at desc
), ),
updated_by_source as ( resolved_vehicles as (
update eventhub.vehicle vehicle select master.event_source_id,
set source_vehicle_entity_id = coalesce(vehicle.source_vehicle_entity_id, master.source_vehicle_entity_id), master.source_vehicle_entity_id,
vin = coalesce(vehicle.vin, master.vin), master.vin,
updated_at = now() coalesce(identity.vehicle_id, existing.id, gen_random_uuid()) as vehicle_id
from master_vehicles master
where vehicle.tenant_key = ?
and vehicle.event_source_id in (select id from compatible_sources)
and master.source_vehicle_entity_id is not null
and vehicle.source_vehicle_entity_id = master.source_vehicle_entity_id
returning vehicle.id
),
updated_by_vin as (
update eventhub.vehicle vehicle
set source_vehicle_entity_id = coalesce(vehicle.source_vehicle_entity_id, master.source_vehicle_entity_id),
vin = coalesce(vehicle.vin, master.vin),
updated_at = now()
from master_vehicles master
where vehicle.tenant_key = ?
and vehicle.event_source_id in (select id from compatible_sources)
and master.vin is not null
and vehicle.vin = master.vin
returning vehicle.id
),
inserted as (
insert into eventhub.vehicle(id, tenant_key, event_source_id, source_vehicle_entity_id, vin, updated_at)
select gen_random_uuid(), ?, master.event_source_id, master.source_vehicle_entity_id, master.vin, now()
from master_vehicles master from master_vehicles master
left join eventhub.source_vehicle_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_vehicle_entity_id = master.source_vehicle_entity_id
left join eventhub.vehicle existing
on master.vin is not null
and existing.vin = master.vin
)
insert into eventhub.vehicle(id, vin, updated_at)
select distinct on (resolved.vehicle_id)
resolved.vehicle_id,
resolved.vin,
now()
from resolved_vehicles resolved
where not exists ( where not exists (
select 1 select 1
from eventhub.vehicle existing from eventhub.vehicle existing
where existing.tenant_key = ? where existing.id = resolved.vehicle_id
and existing.event_source_id in (select id from compatible_sources)
and (
(master.source_vehicle_entity_id is not null and existing.source_vehicle_entity_id = master.source_vehicle_entity_id)
or (master.vin is not null and existing.vin = master.vin)
) )
)
returning id
)
select (select count(*) from updated_by_source)
+ (select count(*) from updated_by_vin)
+ (select count(*) from inserted)
""", """,
Long.class,
eventSourceId, eventSourceId,
tenantKey, tenantKey,
tenantKey
);
int updatedVehicles = jdbcTemplate.update(
compatibleSourcesCte() + """
, master_vehicles as (
select distinct on (
nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), '')
)
nullif(trim(source_entity_id), '') as source_vehicle_entity_id,
nullif(trim(source_external_key), '') as vin
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE'
and (
nullif(trim(source_entity_id), '') is not null
or nullif(trim(source_external_key), '') is not null
)
order by nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), ''),
updated_at desc
)
update eventhub.vehicle vehicle
set vin = coalesce(vehicle.vin, master.vin),
updated_at = now()
from master_vehicles master
left join eventhub.source_vehicle_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_vehicle_entity_id = master.source_vehicle_entity_id
where vehicle.id = coalesce(identity.vehicle_id, (
select existing.id
from eventhub.vehicle existing
where master.vin is not null
and existing.vin = master.vin
limit 1
))
""",
eventSourceId,
tenantKey, tenantKey,
tenantKey
);
int linkedVehicles = jdbcTemplate.update(
compatibleSourcesCte() + """
, master_vehicles as (
select distinct on (
nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), '')
)
event_source_id,
nullif(trim(source_entity_id), '') as source_vehicle_entity_id,
nullif(trim(source_external_key), '') as vin
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE'
and nullif(trim(source_entity_id), '') is not null
order by nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), ''),
updated_at desc
),
resolved_vehicles as (
select master.event_source_id,
master.source_vehicle_entity_id,
coalesce(identity.vehicle_id, existing.id) as vehicle_id
from master_vehicles master
left join eventhub.source_vehicle_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_vehicle_entity_id = master.source_vehicle_entity_id
left join eventhub.vehicle existing
on master.vin is not null
and existing.vin = master.vin
)
insert into eventhub.source_vehicle_identity(
id, tenant_key, event_source_id, source_vehicle_entity_id,
vehicle_id, source_updated_at, payload, updated_at
)
select gen_random_uuid(),
?,
resolved.event_source_id,
resolved.source_vehicle_entity_id,
resolved.vehicle_id,
null,
'{}'::jsonb,
now()
from resolved_vehicles resolved
where resolved.source_vehicle_entity_id is not null
and resolved.vehicle_id is not null
on conflict (tenant_key, event_source_id, source_vehicle_entity_id)
do update set
vehicle_id = excluded.vehicle_id,
updated_at = now()
""",
eventSourceId,
tenantKey, tenantKey,
tenantKey, tenantKey,
tenantKey tenantKey
); );
return count == null ? 0 : Math.toIntExact(count);
return insertedVehicles + updatedVehicles + linkedVehicles;
} }
private int reconcileRegistrationsFromMasterData(String tenantKey, int eventSourceId) { private int reconcileRegistrationsFromMasterData(String tenantKey, int eventSourceId) {
Long count = jdbcTemplate.queryForObject( int insertedRegistrations = jdbcTemplate.update(
compatibleSourcesCte() + """ compatibleSourcesCte() + """
, master_registrations as ( , master_registrations as (
select distinct on ( select distinct on (
event_source_id,
nullif(trim(source_entity_id), ''), nullif(trim(source_entity_id), ''),
coalesce( coalesce(
nullif(trim(payload ->> 'registration_nation'), ''), nullif(trim(payload ->> 'registration_nation'), ''),
@ -225,8 +305,7 @@ public class VehicleIdentityRepository {
nullif(trim(payload ->> 'registration_number'), '') is not null nullif(trim(payload ->> 'registration_number'), '') is not null
or source_external_key like '%:%' or source_external_key like '%:%'
) )
order by event_source_id, order by nullif(trim(source_entity_id), ''),
nullif(trim(source_entity_id), ''),
coalesce( coalesce(
nullif(trim(payload ->> 'registration_nation'), ''), nullif(trim(payload ->> 'registration_nation'), ''),
nullif(split_part(source_external_key, ':', 1), '') nullif(split_part(source_external_key, ':', 1), '')
@ -237,80 +316,198 @@ public class VehicleIdentityRepository {
), ),
updated_at desc updated_at desc
), ),
updated_by_source as ( resolved_registrations as (
update eventhub.vehicle_registration registration select master.event_source_id,
set source_registration_entity_id = coalesce(registration.source_registration_entity_id, master.source_registration_entity_id),
nation = coalesce(master.nation, registration.nation),
registration_number = coalesce(master.registration_number, registration.registration_number),
source_updated_at = master.source_updated_at,
updated_at = now()
from master_registrations master
where registration.tenant_key = ?
and registration.event_source_id in (select id from compatible_sources)
and master.source_registration_entity_id is not null
and registration.source_registration_entity_id = master.source_registration_entity_id
returning registration.id
),
updated_by_plate as (
update eventhub.vehicle_registration registration
set source_registration_entity_id = coalesce(registration.source_registration_entity_id, master.source_registration_entity_id),
nation = coalesce(master.nation, registration.nation),
registration_number = coalesce(master.registration_number, registration.registration_number),
source_updated_at = master.source_updated_at,
updated_at = now()
from master_registrations master
where registration.tenant_key = ?
and registration.event_source_id in (select id from compatible_sources)
and registration.nation = master.nation
and registration.registration_number = master.registration_number
returning registration.id
),
inserted as (
insert into eventhub.vehicle_registration(
id, tenant_key, event_source_id, source_registration_entity_id, nation, registration_number,
source_updated_at, payload, updated_at
)
select gen_random_uuid(),
?,
master.event_source_id,
master.source_registration_entity_id, master.source_registration_entity_id,
master.nation, master.nation,
master.registration_number, master.registration_number,
master.source_updated_at, master.source_updated_at,
coalesce(identity.vehicle_registration_id, existing.id, gen_random_uuid()) as registration_id
from master_registrations master
left join eventhub.source_vehicle_registration_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_registration_entity_id = master.source_registration_entity_id
left join eventhub.vehicle_registration existing
on existing.nation = master.nation
and existing.registration_number = master.registration_number
)
insert into eventhub.vehicle_registration(
id, nation, registration_number, source_updated_at, payload, updated_at
)
select distinct on (resolved.registration_id)
resolved.registration_id,
resolved.nation,
resolved.registration_number,
resolved.source_updated_at,
jsonb_build_object('source', 'master-data'), jsonb_build_object('source', 'master-data'),
now() now()
from master_registrations master from resolved_registrations resolved
where master.nation is not null where resolved.nation is not null
and master.registration_number is not null and resolved.registration_number is not null
and not exists ( and not exists (
select 1 select 1
from eventhub.vehicle_registration existing from eventhub.vehicle_registration existing
where existing.tenant_key = ? where existing.id = resolved.registration_id
and existing.event_source_id in (select id from compatible_sources)
and (
(master.source_registration_entity_id is not null
and existing.source_registration_entity_id = master.source_registration_entity_id)
or (
existing.nation = master.nation
and existing.registration_number = master.registration_number
) )
)
)
returning id
)
select (select count(*) from updated_by_source)
+ (select count(*) from updated_by_plate)
+ (select count(*) from inserted)
""", """,
Long.class,
eventSourceId, eventSourceId,
tenantKey, tenantKey,
tenantKey
);
int updatedRegistrations = jdbcTemplate.update(
compatibleSourcesCte() + """
, master_registrations as (
select distinct on (
nullif(trim(source_entity_id), ''),
coalesce(
nullif(trim(payload ->> 'registration_nation'), ''),
nullif(split_part(source_external_key, ':', 1), '')
),
coalesce(
nullif(trim(payload ->> 'registration_number'), ''),
nullif(substring(source_external_key from position(':' in source_external_key) + 1), '')
)
)
nullif(trim(source_entity_id), '') as source_registration_entity_id,
coalesce(
nullif(trim(payload ->> 'registration_nation'), ''),
nullif(split_part(source_external_key, ':', 1), '')
) as nation,
coalesce(
nullif(trim(payload ->> 'registration_number'), ''),
nullif(substring(source_external_key from position(':' in source_external_key) + 1), '')
) as registration_number,
source_updated_at
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE_REGISTRATION'
and (
nullif(trim(payload ->> 'registration_nation'), '') is not null
or source_external_key like '%:%'
)
and (
nullif(trim(payload ->> 'registration_number'), '') is not null
or source_external_key like '%:%'
)
order by nullif(trim(source_entity_id), ''),
coalesce(
nullif(trim(payload ->> 'registration_nation'), ''),
nullif(split_part(source_external_key, ':', 1), '')
),
coalesce(
nullif(trim(payload ->> 'registration_number'), ''),
nullif(substring(source_external_key from position(':' in source_external_key) + 1), '')
),
updated_at desc
)
update eventhub.vehicle_registration registration
set source_updated_at = coalesce(master.source_updated_at, registration.source_updated_at),
payload = registration.payload || jsonb_build_object('source', 'master-data'),
updated_at = now()
from master_registrations master
left join eventhub.source_vehicle_registration_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_registration_entity_id = master.source_registration_entity_id
where registration.id = coalesce(identity.vehicle_registration_id, (
select existing.id
from eventhub.vehicle_registration existing
where existing.nation = master.nation
and existing.registration_number = master.registration_number
limit 1
))
""",
eventSourceId,
tenantKey, tenantKey,
tenantKey
);
int linkedRegistrations = jdbcTemplate.update(
compatibleSourcesCte() + """
, master_registrations as (
select distinct on (
nullif(trim(source_entity_id), ''),
coalesce(
nullif(trim(payload ->> 'registration_nation'), ''),
nullif(split_part(source_external_key, ':', 1), '')
),
coalesce(
nullif(trim(payload ->> 'registration_number'), ''),
nullif(substring(source_external_key from position(':' in source_external_key) + 1), '')
)
)
event_source_id,
nullif(trim(source_entity_id), '') as source_registration_entity_id,
coalesce(
nullif(trim(payload ->> 'registration_nation'), ''),
nullif(split_part(source_external_key, ':', 1), '')
) as nation,
coalesce(
nullif(trim(payload ->> 'registration_number'), ''),
nullif(substring(source_external_key from position(':' in source_external_key) + 1), '')
) as registration_number,
source_updated_at
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE_REGISTRATION'
and nullif(trim(source_entity_id), '') is not null
order by nullif(trim(source_entity_id), ''),
coalesce(
nullif(trim(payload ->> 'registration_nation'), ''),
nullif(split_part(source_external_key, ':', 1), '')
),
coalesce(
nullif(trim(payload ->> 'registration_number'), ''),
nullif(substring(source_external_key from position(':' in source_external_key) + 1), '')
),
updated_at desc
),
resolved_registrations as (
select master.event_source_id,
master.source_registration_entity_id,
master.source_updated_at,
coalesce(identity.vehicle_registration_id, existing.id) as registration_id
from master_registrations master
left join eventhub.source_vehicle_registration_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_registration_entity_id = master.source_registration_entity_id
left join eventhub.vehicle_registration existing
on existing.nation = master.nation
and existing.registration_number = master.registration_number
)
insert into eventhub.source_vehicle_registration_identity(
id, tenant_key, event_source_id, source_registration_entity_id,
vehicle_registration_id, source_updated_at, payload, updated_at
)
select gen_random_uuid(),
?,
resolved.event_source_id,
resolved.source_registration_entity_id,
resolved.registration_id,
resolved.source_updated_at,
'{}'::jsonb,
now()
from resolved_registrations resolved
where resolved.source_registration_entity_id is not null
and resolved.registration_id is not null
on conflict (tenant_key, event_source_id, source_registration_entity_id)
do update set
vehicle_registration_id = excluded.vehicle_registration_id,
source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_vehicle_registration_identity.source_updated_at),
updated_at = now()
""",
eventSourceId,
tenantKey, tenantKey,
tenantKey, tenantKey,
tenantKey tenantKey
); );
return count == null ? 0 : Math.toIntExact(count);
return insertedRegistrations + updatedRegistrations + linkedRegistrations;
} }
private UUID resolveVehicleId( private UUID resolveVehicleId(
@ -321,7 +518,7 @@ public class VehicleIdentityRepository {
) { ) {
UUID vehicleId = findVehicleBySourceVehicleEntityId(tenantKey, eventSourceId, sourceVehicleEntityId); UUID vehicleId = findVehicleBySourceVehicleEntityId(tenantKey, eventSourceId, sourceVehicleEntityId);
if (vehicleId == null) { if (vehicleId == null) {
vehicleId = findVehicleByVin(tenantKey, eventSourceId, vin); vehicleId = findVehicleByVin(vin);
} }
return vehicleId; return vehicleId;
} }
@ -331,12 +528,11 @@ public class VehicleIdentityRepository {
int eventSourceId, int eventSourceId,
String sourceRegistrationEntityId, String sourceRegistrationEntityId,
String nation, String nation,
String registrationNumber, String registrationNumber
OffsetDateTime occurredAt
) { ) {
UUID registrationId = findRegistrationBySourceRegistrationEntityId(tenantKey, eventSourceId, sourceRegistrationEntityId, occurredAt); UUID registrationId = findRegistrationBySourceRegistrationEntityId(tenantKey, eventSourceId, sourceRegistrationEntityId);
if (registrationId == null) { if (registrationId == null) {
registrationId = findRegistrationByPlate(tenantKey, eventSourceId, nation, registrationNumber, occurredAt); registrationId = findRegistrationByPlate(nation, registrationNumber);
} }
return registrationId; return registrationId;
} }
@ -347,38 +543,34 @@ public class VehicleIdentityRepository {
} }
return jdbcTemplate.query( return jdbcTemplate.query(
compatibleSourcesCte() + """ compatibleSourcesCte() + """
select v.id select identity.vehicle_id
from eventhub.vehicle v from eventhub.source_vehicle_identity identity
where v.tenant_key = ? where identity.tenant_key = ?
and v.event_source_id in (select id from compatible_sources) and identity.event_source_id in (select id from compatible_sources)
and v.source_vehicle_entity_id = ? and identity.source_vehicle_entity_id = ?
order by v.updated_at desc order by identity.updated_at desc
limit 1 limit 1
""", """,
rs -> rs.next() ? (UUID) rs.getObject("id") : null, rs -> rs.next() ? (UUID) rs.getObject("vehicle_id") : null,
eventSourceId, eventSourceId,
tenantKey, tenantKey,
sourceVehicleEntityId sourceVehicleEntityId
); );
} }
private UUID findVehicleByVin(String tenantKey, int eventSourceId, String vin) { private UUID findVehicleByVin(String vin) {
if (vin == null) { if (vin == null) {
return null; return null;
} }
return jdbcTemplate.query( return jdbcTemplate.query(
compatibleSourcesCte() + """ """
select v.id select v.id
from eventhub.vehicle v from eventhub.vehicle v
where v.tenant_key = ? where v.vin = ?
and v.event_source_id in (select id from compatible_sources)
and v.vin = ?
order by v.updated_at desc order by v.updated_at desc
limit 1 limit 1
""", """,
rs -> rs.next() ? (UUID) rs.getObject("id") : null, rs -> rs.next() ? (UUID) rs.getObject("id") : null,
eventSourceId,
tenantKey,
vin vin
); );
} }
@ -386,64 +578,60 @@ public class VehicleIdentityRepository {
private UUID findRegistrationBySourceRegistrationEntityId( private UUID findRegistrationBySourceRegistrationEntityId(
String tenantKey, String tenantKey,
int eventSourceId, int eventSourceId,
String sourceRegistrationEntityId, String sourceRegistrationEntityId
OffsetDateTime occurredAt
) { ) {
if (sourceRegistrationEntityId == null) { if (sourceRegistrationEntityId == null) {
return null; return null;
} }
return jdbcTemplate.query( return jdbcTemplate.query(
compatibleSourcesCte() + """ compatibleSourcesCte() + """
select r.id select identity.vehicle_registration_id
from eventhub.vehicle_registration r from eventhub.source_vehicle_registration_identity identity
where r.tenant_key = ? where identity.tenant_key = ?
and r.event_source_id in (select id from compatible_sources) and identity.event_source_id in (select id from compatible_sources)
and r.source_registration_entity_id = ? and identity.source_registration_entity_id = ?
order by r.updated_at desc order by identity.updated_at desc
limit 1 limit 1
""", """,
rs -> rs.next() ? (UUID) rs.getObject("id") : null, rs -> rs.next() ? (UUID) rs.getObject("vehicle_registration_id") : null,
eventSourceId, eventSourceId,
tenantKey, tenantKey,
sourceRegistrationEntityId sourceRegistrationEntityId
); );
} }
private UUID findRegistrationByPlate( private UUID findRegistrationByPlate(String nation, String registrationNumber) {
String tenantKey,
int eventSourceId,
String nation,
String registrationNumber,
OffsetDateTime occurredAt
) {
if (nation == null || registrationNumber == null) { if (nation == null || registrationNumber == null) {
return null; return null;
} }
return jdbcTemplate.query( return jdbcTemplate.query(
compatibleSourcesCte() + """ """
select r.id select r.id
from eventhub.vehicle_registration r from eventhub.vehicle_registration r
where r.tenant_key = ? where r.nation = ?
and r.event_source_id in (select id from compatible_sources)
and r.nation = ?
and r.registration_number = ? and r.registration_number = ?
order by r.updated_at desc order by r.updated_at desc
limit 1 limit 1
""", """,
rs -> rs.next() ? (UUID) rs.getObject("id") : null, rs -> rs.next() ? (UUID) rs.getObject("id") : null,
eventSourceId,
tenantKey,
nation, nation,
registrationNumber registrationNumber
); );
} }
private AssignedVehicleReference resolveAssignedVehicleReference(UUID registrationId, OffsetDateTime occurredAt) { private AssignedVehicleReference resolveAssignedVehicleReference(
String tenantKey,
int eventSourceId,
UUID registrationId,
OffsetDateTime occurredAt
) {
return jdbcTemplate.query( return jdbcTemplate.query(
""" compatibleSourcesCte() + """
select vehicle_id, valid_from, valid_to select vehicle_id, valid_from, valid_to
from eventhub.vehicle_registration_assignment from eventhub.vehicle_registration_assignment
where vehicle_registration_id = ? where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and vehicle_registration_id = ?
and (? is null or valid_from is null or valid_from <= ?) and (? is null or valid_from is null or valid_from <= ?)
and (? is null or valid_to is null or ? < valid_to) and (? is null or valid_to is null or ? < valid_to)
order by valid_from desc nulls last, updated_at desc order by valid_from desc nulls last, updated_at desc
@ -456,6 +644,8 @@ public class VehicleIdentityRepository {
rs.getObject("valid_to", OffsetDateTime.class) rs.getObject("valid_to", OffsetDateTime.class)
) )
: null, : null,
eventSourceId,
tenantKey,
registrationId, registrationId,
occurredAt, occurredAt,
occurredAt, occurredAt,
@ -464,31 +654,20 @@ public class VehicleIdentityRepository {
); );
} }
private UUID createVehicle( private UUID createVehicle(String vin) {
String tenantKey,
int eventSourceId,
String sourceVehicleEntityId,
String vin
) {
UUID vehicleId = UUID.randomUUID(); UUID vehicleId = UUID.randomUUID();
jdbcTemplate.update( jdbcTemplate.update(
""" """
insert into eventhub.vehicle(id, tenant_key, event_source_id, source_vehicle_entity_id, vin, updated_at) insert into eventhub.vehicle(id, vin, updated_at)
values (?, ?, ?, ?, ?, now()) values (?, ?, now())
""", """,
vehicleId, vehicleId,
tenantKey,
eventSourceId,
sourceVehicleEntityId,
vin vin
); );
return vehicleId; return vehicleId;
} }
private UUID createRegistration( private UUID createRegistration(
String tenantKey,
int eventSourceId,
String sourceRegistrationEntityId,
String nation, String nation,
String registrationNumber, String registrationNumber,
OffsetDateTime sourceUpdatedAt, OffsetDateTime sourceUpdatedAt,
@ -501,14 +680,10 @@ public class VehicleIdentityRepository {
jdbcTemplate.update( jdbcTemplate.update(
""" """
insert into eventhub.vehicle_registration( insert into eventhub.vehicle_registration(
id, tenant_key, event_source_id, source_registration_entity_id, nation, registration_number, id, nation, registration_number, source_updated_at, payload, updated_at
source_updated_at, payload, updated_at ) values (?, ?, ?, ?, ?::jsonb, now())
) values (?, ?, ?, ?, ?, ?, ?, ?::jsonb, now())
""", """,
registrationId, registrationId,
tenantKey,
eventSourceId,
sourceRegistrationEntityId,
nation, nation,
registrationNumber, registrationNumber,
sourceUpdatedAt, sourceUpdatedAt,
@ -517,27 +692,88 @@ public class VehicleIdentityRepository {
return registrationId; return registrationId;
} }
private void touchVehicle(UUID vehicleId, String sourceVehicleEntityId, String vin) { private void upsertSourceVehicleIdentity(
if (sourceVehicleEntityId == null && vin == null) { String tenantKey,
int eventSourceId,
String sourceVehicleEntityId,
UUID vehicleId,
OffsetDateTime sourceUpdatedAt,
Map<String, Object> payload
) {
if (sourceVehicleEntityId == null || vehicleId == null) {
return;
}
jdbcTemplate.update(
"""
insert into eventhub.source_vehicle_identity(
id, tenant_key, event_source_id, source_vehicle_entity_id,
vehicle_id, source_updated_at, payload, updated_at
) values (?, ?, ?, ?, ?, ?, ?::jsonb, now())
on conflict (tenant_key, event_source_id, source_vehicle_entity_id)
do update set
vehicle_id = excluded.vehicle_id,
source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_vehicle_identity.source_updated_at),
payload = eventhub.source_vehicle_identity.payload || excluded.payload,
updated_at = now()
""",
UUID.randomUUID(),
tenantKey,
eventSourceId,
sourceVehicleEntityId,
vehicleId,
sourceUpdatedAt,
toJson(payload)
);
}
private void upsertSourceVehicleRegistrationIdentity(
String tenantKey,
int eventSourceId,
String sourceRegistrationEntityId,
UUID registrationId,
OffsetDateTime sourceUpdatedAt,
Map<String, Object> payload
) {
if (sourceRegistrationEntityId == null || registrationId == null) {
return;
}
jdbcTemplate.update(
"""
insert into eventhub.source_vehicle_registration_identity(
id, tenant_key, event_source_id, source_registration_entity_id,
vehicle_registration_id, source_updated_at, payload, updated_at
) values (?, ?, ?, ?, ?, ?, ?::jsonb, now())
on conflict (tenant_key, event_source_id, source_registration_entity_id)
do update set
vehicle_registration_id = excluded.vehicle_registration_id,
source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_vehicle_registration_identity.source_updated_at),
payload = eventhub.source_vehicle_registration_identity.payload || excluded.payload,
updated_at = now()
""",
UUID.randomUUID(),
tenantKey,
eventSourceId,
sourceRegistrationEntityId,
registrationId,
sourceUpdatedAt,
toJson(payload)
);
}
private void touchVehicle(UUID vehicleId, String vin) {
if (vehicleId == null || vin == null) {
return; return;
} }
jdbcTemplate.update( jdbcTemplate.update(
""" """
update eventhub.vehicle update eventhub.vehicle
set source_vehicle_entity_id = coalesce(source_vehicle_entity_id, cast(? as text)), set vin = coalesce(vin, cast(? as text)),
vin = coalesce(vin, cast(? as text)),
updated_at = now() updated_at = now()
where id = ? where id = ?
and ( and vin is null
(source_vehicle_entity_id is null and cast(? as text) is not null)
or (vin is null and cast(? as text) is not null)
)
""", """,
sourceVehicleEntityId,
vin, vin,
vehicleId, vehicleId
sourceVehicleEntityId,
vin
); );
} }
@ -551,8 +787,8 @@ public class VehicleIdentityRepository {
select gen_random_uuid(), select gen_random_uuid(),
relation.tenant_key, relation.tenant_key,
relation.event_source_id, relation.event_source_id,
registration.id, registration_identity.vehicle_registration_id,
vehicle.id, vehicle_identity.vehicle_id,
relation.valid_from, relation.valid_from,
relation.valid_to, relation.valid_to,
relation.source_updated_at, relation.source_updated_at,
@ -563,12 +799,14 @@ public class VehicleIdentityRepository {
), ),
now() now()
from eventhub.source_master_relation relation from eventhub.source_master_relation relation
join eventhub.vehicle_registration registration on registration.tenant_key = relation.tenant_key join eventhub.source_vehicle_registration_identity registration_identity
and registration.event_source_id = relation.event_source_id on registration_identity.tenant_key = relation.tenant_key
and registration.source_registration_entity_id = relation.from_source_entity_id and registration_identity.event_source_id = relation.event_source_id
join eventhub.vehicle vehicle on vehicle.tenant_key = relation.tenant_key and registration_identity.source_registration_entity_id = relation.from_source_entity_id
and vehicle.event_source_id = relation.event_source_id join eventhub.source_vehicle_identity vehicle_identity
and vehicle.source_vehicle_entity_id = relation.to_source_entity_id on vehicle_identity.tenant_key = relation.tenant_key
and vehicle_identity.event_source_id = relation.event_source_id
and vehicle_identity.source_vehicle_entity_id = relation.to_source_entity_id
where relation.tenant_key = ? where relation.tenant_key = ?
and relation.event_source_id in (select id from compatible_sources) and relation.event_source_id in (select id from compatible_sources)
and relation.relation_type = 'VEHICLE_REGISTRATION_VEHICLE' and relation.relation_type = 'VEHICLE_REGISTRATION_VEHICLE'
@ -577,8 +815,10 @@ public class VehicleIdentityRepository {
and not exists ( and not exists (
select 1 select 1
from eventhub.vehicle_registration_assignment existing from eventhub.vehicle_registration_assignment existing
where existing.vehicle_registration_id = registration.id where existing.tenant_key = relation.tenant_key
and existing.vehicle_id = vehicle.id and existing.event_source_id = relation.event_source_id
and existing.vehicle_registration_id = registration_identity.vehicle_registration_id
and existing.vehicle_id = vehicle_identity.vehicle_id
and existing.valid_from is not distinct from relation.valid_from and existing.valid_from is not distinct from relation.valid_from
and existing.valid_to is not distinct from relation.valid_to and existing.valid_to is not distinct from relation.valid_to
) )
@ -588,59 +828,30 @@ public class VehicleIdentityRepository {
); );
} }
private void touchRegistration(UUID registrationId, String sourceRegistrationEntityId, String nation, String registrationNumber) { private void touchRegistration(UUID registrationId, String nation, String registrationNumber) {
if (sourceRegistrationEntityId == null && nation == null && registrationNumber == null) { if (registrationId == null || (nation == null && registrationNumber == null)) {
return; return;
} }
jdbcTemplate.update( jdbcTemplate.update(
""" """
update eventhub.vehicle_registration update eventhub.vehicle_registration
set source_registration_entity_id = coalesce(source_registration_entity_id, cast(? as text)), set nation = coalesce(cast(? as text), nation),
nation = coalesce(cast(? as text), nation),
registration_number = coalesce(cast(? as text), registration_number), registration_number = coalesce(cast(? as text), registration_number),
updated_at = now() updated_at = now()
where id = ? where id = ?
and ( and (
(source_registration_entity_id is null and cast(? as text) is not null) (nation is null and cast(? as text) is not null)
or (nation is null and cast(? as text) is not null)
or (registration_number is null and cast(? as text) is not null) or (registration_number is null and cast(? as text) is not null)
) )
""", """,
sourceRegistrationEntityId,
nation, nation,
registrationNumber, registrationNumber,
registrationId, registrationId,
sourceRegistrationEntityId,
nation, nation,
registrationNumber registrationNumber
); );
} }
private void updateRegistrationFromMasterData(
UUID registrationId,
String sourceRegistrationEntityId,
String nation,
String registrationNumber,
OffsetDateTime sourceUpdatedAt
) {
jdbcTemplate.update(
"""
update eventhub.vehicle_registration
set source_registration_entity_id = coalesce(source_registration_entity_id, ?),
nation = coalesce(?, nation),
registration_number = coalesce(?, registration_number),
source_updated_at = ?,
updated_at = now()
where id = ?
""",
sourceRegistrationEntityId,
nation,
registrationNumber,
sourceUpdatedAt,
registrationId
);
}
private String compatibleSourcesCte() { private String compatibleSourcesCte() {
return """ return """
with source_context as ( with source_context as (
@ -706,5 +917,4 @@ public class VehicleIdentityRepository {
OffsetDateTime validTo OffsetDateTime validTo
) { ) {
} }
} }

View File

@ -145,11 +145,56 @@ create table if not exists eventhub.source_master_relation (
constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to) constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to)
); );
create table if not exists eventhub.vehicle ( create table if not exists eventhub.driver (
id uuid primary key,
first_names text,
last_name text,
birth_date date,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create table if not exists eventhub.driver_card (
id uuid primary key,
driver_id uuid references eventhub.driver(id),
nation text not null,
card_number text not null,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create table if not exists eventhub.source_driver_identity (
id uuid primary key, id uuid primary key,
tenant_key text not null, tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id), event_source_id integer not null references eventhub.event_source(id),
source_vehicle_entity_id text, source_driver_entity_id text not null,
driver_id uuid not null references eventhub.driver(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_driver_identity unique (tenant_key, event_source_id, source_driver_entity_id)
);
create table if not exists eventhub.source_driver_card_identity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_driver_card_entity_id text not null,
driver_card_id uuid not null references eventhub.driver_card(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_driver_card_identity unique (tenant_key, event_source_id, source_driver_card_entity_id)
);
create table if not exists eventhub.vehicle (
id uuid primary key,
vin text, vin text,
created_at timestamptz not null default now(), created_at timestamptz not null default now(),
updated_at timestamptz not null default now() updated_at timestamptz not null default now()
@ -157,9 +202,6 @@ create table if not exists eventhub.vehicle (
create table if not exists eventhub.vehicle_registration ( create table if not exists eventhub.vehicle_registration (
id uuid primary key, id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_registration_entity_id text,
nation text not null, nation text not null,
registration_number text not null, registration_number text not null,
source_updated_at timestamptz, source_updated_at timestamptz,
@ -168,6 +210,32 @@ create table if not exists eventhub.vehicle_registration (
updated_at timestamptz not null default now() updated_at timestamptz not null default now()
); );
create table if not exists eventhub.source_vehicle_identity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_vehicle_entity_id text not null,
vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_vehicle_identity unique (tenant_key, event_source_id, source_vehicle_entity_id)
);
create table if not exists eventhub.source_vehicle_registration_identity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_registration_entity_id text not null,
vehicle_registration_id uuid not null references eventhub.vehicle_registration(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_vehicle_registration_identity unique (tenant_key, event_source_id, source_registration_entity_id)
);
create table if not exists eventhub.vehicle_registration_assignment ( create table if not exists eventhub.vehicle_registration_assignment (
id uuid primary key, id uuid primary key,
tenant_key text not null, tenant_key text not null,
@ -188,7 +256,8 @@ create table if not exists eventhub.event (
event_source_id integer not null references eventhub.event_source(id), event_source_id integer not null references eventhub.event_source(id),
data_package_id uuid not null references eventhub.data_package(id), data_package_id uuid not null references eventhub.data_package(id),
external_source_event_id text not null, external_source_event_id text not null,
driver_entity_id uuid references eventhub.source_master_entity(id), driver_id uuid references eventhub.driver(id),
driver_card_id uuid references eventhub.driver_card(id),
vehicle_id uuid references eventhub.vehicle(id), vehicle_id uuid references eventhub.vehicle(id),
vehicle_registration_id uuid references eventhub.vehicle_registration(id), vehicle_registration_id uuid references eventhub.vehicle_registration(id),
source_package_id text, source_package_id text,
@ -208,7 +277,8 @@ create table if not exists eventhub.event (
created_at timestamptz not null default now(), created_at timestamptz not null default now(),
constraint pk_event primary key (occurred_at, id), constraint pk_event primary key (occurred_at, id),
constraint chk_event_driver_or_vehicle_ref check ( constraint chk_event_driver_or_vehicle_ref check (
driver_entity_id is not null driver_id is not null
or driver_card_id is not null
or vehicle_id is not null or vehicle_id is not null
or vehicle_registration_id is not null or vehicle_registration_id is not null
) )
@ -276,23 +346,12 @@ create index if not exists idx_source_master_relation_to
create index if not exists idx_source_master_relation_payload_gin create index if not exists idx_source_master_relation_payload_gin
on eventhub.source_master_relation using gin(payload); on eventhub.source_master_relation using gin(payload);
create index if not exists idx_vehicle_lookup_ctx
on eventhub.vehicle(tenant_key, event_source_id, updated_at desc);
create index if not exists idx_vehicle_source_entity
on eventhub.vehicle(tenant_key, event_source_id, source_vehicle_entity_id)
where source_vehicle_entity_id is not null;
create index if not exists idx_vehicle_vin create index if not exists idx_vehicle_vin
on eventhub.vehicle(tenant_key, event_source_id, vin) on eventhub.vehicle(vin)
where vin is not null; where vin is not null;
create index if not exists idx_vehicle_registration_source_entity
on eventhub.vehicle_registration(tenant_key, event_source_id, source_registration_entity_id)
where source_registration_entity_id is not null;
create index if not exists idx_vehicle_registration_plate create index if not exists idx_vehicle_registration_plate
on eventhub.vehicle_registration(tenant_key, event_source_id, nation, registration_number); on eventhub.vehicle_registration(nation, registration_number);
create index if not exists idx_vehicle_registration_assignment_registration_time create index if not exists idx_vehicle_registration_assignment_registration_time
on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to); on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to);
@ -320,9 +379,42 @@ create index if not exists idx_event_source_package_id
create index if not exists idx_event_domain_type_time create index if not exists idx_event_domain_type_time
on eventhub.event(event_domain, event_type, occurred_at desc); on eventhub.event(event_domain, event_type, occurred_at desc);
create index if not exists idx_driver_card_key
on eventhub.driver_card(nation, card_number);
create index if not exists idx_driver_card_driver
on eventhub.driver_card(driver_id)
where driver_id is not null;
create unique index if not exists ux_driver_card_key
on eventhub.driver_card(nation, card_number);
create unique index if not exists ux_vehicle_vin
on eventhub.vehicle(vin)
where vin is not null;
create unique index if not exists ux_vehicle_registration_plate
on eventhub.vehicle_registration(nation, registration_number);
create index if not exists idx_source_driver_identity_driver
on eventhub.source_driver_identity(driver_id);
create index if not exists idx_source_driver_card_identity_card
on eventhub.source_driver_card_identity(driver_card_id);
create index if not exists idx_source_vehicle_identity_vehicle
on eventhub.source_vehicle_identity(vehicle_id);
create index if not exists idx_source_vehicle_registration_identity_registration
on eventhub.source_vehicle_registration_identity(vehicle_registration_id);
create index if not exists idx_event_driver_time create index if not exists idx_event_driver_time
on eventhub.event(driver_entity_id, occurred_at desc) on eventhub.event(driver_id, occurred_at desc)
where driver_entity_id is not null; where driver_id is not null;
create index if not exists idx_event_driver_card_time
on eventhub.event(driver_card_id, occurred_at desc)
where driver_card_id is not null;
create index if not exists idx_event_vehicle_time create index if not exists idx_event_vehicle_time
on eventhub.event(vehicle_id, occurred_at desc) on eventhub.event(vehicle_id, occurred_at desc)
@ -344,3 +436,19 @@ create index if not exists idx_event_detail_type
create index if not exists idx_event_detail_attributes_gin create index if not exists idx_event_detail_attributes_gin
on eventhub.event_detail using gin(attributes); on eventhub.event_detail using gin(attributes);
create index if not exists idx_event_detail_yellowfox_slot
on eventhub.event_detail(detail_type, (attributes ->> 'slot'), event_occurred_at)
where detail_type in ('DRIVER_ACTIVITY', 'DRIVER_CARD');
create index if not exists idx_event_detail_yellowfox_eventtype_state
on eventhub.event_detail(
(attributes ->> 'yellowFoxEventType'),
(attributes ->> 'yellowFoxState'),
event_occurred_at
)
where attributes ? 'yellowFoxEventType';
create index if not exists idx_event_detail_yellowfox_ignition
on eventhub.event_detail(detail_type, (attributes ->> 'ignitionState'), event_occurred_at)
where attributes ? 'ignitionState';

View File

@ -0,0 +1,363 @@
create table if not exists eventhub.driver_card (
id uuid primary key,
driver_id uuid references eventhub.driver(id),
nation text not null,
card_number text not null,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create table if not exists eventhub.source_driver_identity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_driver_entity_id text not null,
driver_id uuid not null references eventhub.driver(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_driver_identity unique (tenant_key, event_source_id, source_driver_entity_id)
);
create table if not exists eventhub.source_driver_card_identity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_driver_card_entity_id text not null,
driver_card_id uuid not null references eventhub.driver_card(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_driver_card_identity unique (tenant_key, event_source_id, source_driver_card_entity_id)
);
create table if not exists eventhub.source_vehicle_identity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_vehicle_entity_id text not null,
vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_vehicle_identity unique (tenant_key, event_source_id, source_vehicle_entity_id)
);
create table if not exists eventhub.source_vehicle_registration_identity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_registration_entity_id text not null,
vehicle_registration_id uuid not null references eventhub.vehicle_registration(id) on delete cascade,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_vehicle_registration_identity unique (tenant_key, event_source_id, source_registration_entity_id)
);
alter table eventhub.event
add column if not exists driver_card_id uuid references eventhub.driver_card(id);
insert into eventhub.source_driver_identity(
id, tenant_key, event_source_id, source_driver_entity_id,
driver_id, source_updated_at, payload, created_at, updated_at
)
select gen_random_uuid(),
driver.tenant_key,
driver.event_source_id,
driver.source_driver_entity_id,
driver.id,
driver.source_updated_at,
driver.payload,
coalesce(driver.created_at, now()),
coalesce(driver.updated_at, now())
from eventhub.driver driver
where exists (
select 1
from information_schema.columns
where table_schema = 'eventhub'
and table_name = 'driver'
and column_name = 'source_driver_entity_id'
)
and driver.source_driver_entity_id is not null
on conflict (tenant_key, event_source_id, source_driver_entity_id)
do update set
driver_id = excluded.driver_id,
source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_identity.source_updated_at),
payload = eventhub.source_driver_identity.payload || excluded.payload,
updated_at = now();
with legacy_driver_cards as (
select distinct on (driver.card_nation, driver.card_number)
driver.id as driver_id,
driver.card_nation as nation,
driver.card_number,
driver.source_updated_at,
driver.payload,
driver.created_at,
driver.updated_at
from eventhub.driver driver
where exists (
select 1
from information_schema.columns
where table_schema = 'eventhub'
and table_name = 'driver'
and column_name = 'card_nation'
)
and driver.card_nation is not null
and driver.card_number is not null
order by driver.card_nation,
driver.card_number,
case when driver.source_driver_entity_id is null then 1 else 0 end,
driver.updated_at desc,
driver.created_at desc,
driver.id
),
existing_driver_cards as (
select distinct on (card.nation, card.card_number)
card.id,
card.nation,
card.card_number
from eventhub.driver_card card
order by card.nation,
card.card_number,
case when card.driver_id is null then 1 else 0 end,
card.updated_at desc,
card.created_at desc,
card.id
),
resolved_cards as (
select legacy.*,
coalesce(existing.id, gen_random_uuid()) as driver_card_id
from legacy_driver_cards legacy
left join existing_driver_cards existing
on existing.nation = legacy.nation
and existing.card_number = legacy.card_number
),
inserted_cards as (
insert into eventhub.driver_card(
id, driver_id, nation, card_number, source_updated_at, payload, created_at, updated_at
)
select distinct on (resolved.driver_card_id)
resolved.driver_card_id,
resolved.driver_id,
resolved.nation,
resolved.card_number,
resolved.source_updated_at,
jsonb_build_object('migrated_from', 'eventhub.driver') || coalesce(resolved.payload, '{}'::jsonb),
resolved.created_at,
resolved.updated_at
from resolved_cards resolved
where not exists (
select 1
from eventhub.driver_card existing
where existing.id = resolved.driver_card_id
)
returning id
),
updated_cards as (
update eventhub.driver_card card
set driver_id = coalesce(card.driver_id, resolved.driver_id),
source_updated_at = coalesce(resolved.source_updated_at, card.source_updated_at),
payload = card.payload || jsonb_build_object('migrated_from', 'eventhub.driver') || coalesce(resolved.payload, '{}'::jsonb),
updated_at = now()
from resolved_cards resolved
where card.id = resolved.driver_card_id
returning card.id
)
select count(*) from inserted_cards
union all
select count(*) from updated_cards;
with single_card_per_driver as (
select driver_id,
min(id::text)::uuid as driver_card_id
from eventhub.driver_card
where driver_id is not null
group by driver_id
having count(*) = 1
)
update eventhub.event event
set driver_card_id = card.driver_card_id
from single_card_per_driver card
where event.driver_id = card.driver_id
and event.driver_card_id is null;
with ranked_driver_cards as (
select card.id,
card.driver_id,
card.source_updated_at,
first_value(card.id) over (
partition by card.nation, card.card_number
order by case when card.driver_id is null then 1 else 0 end,
card.updated_at desc,
card.created_at desc,
card.id
) as canonical_id,
row_number() over (
partition by card.nation, card.card_number
order by case when card.driver_id is null then 1 else 0 end,
card.updated_at desc,
card.created_at desc,
card.id
) as duplicate_rank
from eventhub.driver_card card
),
duplicate_driver_cards as (
select id, canonical_id, driver_id, source_updated_at
from ranked_driver_cards
where duplicate_rank > 1
),
duplicate_card_rollup as (
select duplicate.canonical_id,
(array_agg(duplicate.driver_id order by case when duplicate.driver_id is null then 1 else 0 end, duplicate.id))[1] as preferred_driver_id,
max(duplicate.source_updated_at) as latest_source_updated_at
from duplicate_driver_cards duplicate
group by duplicate.canonical_id
),
updated_canonical_cards as (
update eventhub.driver_card card
set driver_id = coalesce(card.driver_id, rollup.preferred_driver_id),
source_updated_at = case
when card.source_updated_at is null then rollup.latest_source_updated_at
when rollup.latest_source_updated_at is null then card.source_updated_at
else greatest(card.source_updated_at, rollup.latest_source_updated_at)
end,
updated_at = now()
from duplicate_card_rollup rollup
where card.id = rollup.canonical_id
returning card.id
),
relinked_source_driver_card_identity as (
update eventhub.source_driver_card_identity identity
set driver_card_id = duplicate.canonical_id,
updated_at = now()
from duplicate_driver_cards duplicate
where identity.driver_card_id = duplicate.id
returning identity.id
),
relinked_events as (
update eventhub.event event
set driver_card_id = duplicate.canonical_id
from duplicate_driver_cards duplicate
where event.driver_card_id = duplicate.id
returning event.id
),
deleted_duplicate_driver_cards as (
delete from eventhub.driver_card card
using duplicate_driver_cards duplicate
where card.id = duplicate.id
returning card.id
)
select (select count(*) from updated_canonical_cards) as updated_canonical_cards,
(select count(*) from relinked_source_driver_card_identity) as relinked_source_driver_card_identity,
(select count(*) from relinked_events) as relinked_events,
(select count(*) from deleted_duplicate_driver_cards) as deleted_duplicate_driver_cards;
insert into eventhub.source_vehicle_identity(
id, tenant_key, event_source_id, source_vehicle_entity_id,
vehicle_id, source_updated_at, payload, created_at, updated_at
)
select gen_random_uuid(),
vehicle.tenant_key,
vehicle.event_source_id,
vehicle.source_vehicle_entity_id,
vehicle.id,
null,
'{}'::jsonb,
coalesce(vehicle.created_at, now()),
coalesce(vehicle.updated_at, now())
from eventhub.vehicle vehicle
where exists (
select 1
from information_schema.columns
where table_schema = 'eventhub'
and table_name = 'vehicle'
and column_name = 'source_vehicle_entity_id'
)
and vehicle.source_vehicle_entity_id is not null
on conflict (tenant_key, event_source_id, source_vehicle_entity_id)
do update set
vehicle_id = excluded.vehicle_id,
updated_at = now();
insert into eventhub.source_vehicle_registration_identity(
id, tenant_key, event_source_id, source_registration_entity_id,
vehicle_registration_id, source_updated_at, payload, created_at, updated_at
)
select gen_random_uuid(),
registration.tenant_key,
registration.event_source_id,
registration.source_registration_entity_id,
registration.id,
registration.source_updated_at,
registration.payload,
coalesce(registration.created_at, now()),
coalesce(registration.updated_at, now())
from eventhub.vehicle_registration registration
where exists (
select 1
from information_schema.columns
where table_schema = 'eventhub'
and table_name = 'vehicle_registration'
and column_name = 'source_registration_entity_id'
)
and registration.source_registration_entity_id is not null
on conflict (tenant_key, event_source_id, source_registration_entity_id)
do update set
vehicle_registration_id = excluded.vehicle_registration_id,
source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_vehicle_registration_identity.source_updated_at),
payload = eventhub.source_vehicle_registration_identity.payload || excluded.payload,
updated_at = now();
create index if not exists idx_driver_card_key
on eventhub.driver_card(nation, card_number);
create index if not exists idx_driver_card_driver
on eventhub.driver_card(driver_id)
where driver_id is not null;
create unique index if not exists ux_driver_card_key
on eventhub.driver_card(nation, card_number);
create unique index if not exists ux_vehicle_vin
on eventhub.vehicle(vin)
where vin is not null;
create unique index if not exists ux_vehicle_registration_plate
on eventhub.vehicle_registration(nation, registration_number);
create index if not exists idx_source_driver_identity_driver
on eventhub.source_driver_identity(driver_id);
create index if not exists idx_source_driver_card_identity_card
on eventhub.source_driver_card_identity(driver_card_id);
create index if not exists idx_source_vehicle_identity_vehicle
on eventhub.source_vehicle_identity(vehicle_id);
create index if not exists idx_source_vehicle_registration_identity_registration
on eventhub.source_vehicle_registration_identity(vehicle_registration_id);
create index if not exists idx_event_driver_card_time
on eventhub.event(driver_card_id, occurred_at desc)
where driver_card_id is not null;
alter table eventhub.event
drop constraint if exists chk_event_driver_or_vehicle_ref;
alter table eventhub.event
add constraint chk_event_driver_or_vehicle_ref
check (
driver_id is not null
or driver_card_id is not null
or driver_entity_id is not null
or vehicle_id is not null
or vehicle_registration_id is not null
);

View File

@ -1,19 +1,22 @@
/* /*
* Repairs and normalizes tachograph driver aggregates after introducing eventhub.driver. * Repairs and normalizes tachograph driver identities after introducing:
* - global eventhub.driver
* - global eventhub.driver_card
* - source_driver_identity
* - source_driver_card_identity
* *
* What it does: * What it does:
* 1. Ensures tachograph DRIVER master-data payload carries last_name while keeping source_master_entity.display_name unchanged. * 1. Ensures tachograph DRIVER master-data payload carries last_name while
* 2. Upserts eventhub.driver rows from MASTER_DATA DRIVER entities. * keeping source_master_entity.display_name unchanged.
* 3. Projects card nation/number onto eventhub.driver from DRIVER_CARD_DRIVER relations. * 2. Upserts global eventhub.driver rows from tachograph DRIVER entities.
* 4. Remaps event.driver_id from provisional card-only drivers to proper source-driver aggregates when possible. * 3. Upserts global eventhub.driver_card rows from tachograph DRIVER_CARD entities.
* 5. Deletes now-unreferenced provisional tachograph driver rows with no source_driver_entity_id. * 4. Upserts source identity links for drivers and cards.
* * 5. Links cards to drivers using DRIVER_CARD_DRIVER master-data relations.
* Assumptions: * 6. Backfills event.driver_card_id where a driver has exactly one card.
* - Tachograph master-data source is provider_key=TACHOGRAPH, source_kind=MASTER_DATA, source_key=TACHOGRAPH_MASTER_DATA. * 7. Remaps event.driver_id from provisional rows to the linked canonical driver.
* - eventhub.driver and event.driver_id already exist. * 8. Deletes now-unreferenced provisional driver rows.
*/ */
-- 1) Keep display_name, but ensure DRIVER payload has last_name.
with master_sources as ( with master_sources as (
select es.id, es.tenant_key select es.id, es.tenant_key
from eventhub.event_source es from eventhub.event_source es
@ -40,22 +43,16 @@ updated_master_payload as (
select count(*) as updated_master_payload select count(*) as updated_master_payload
from updated_master_payload; from updated_master_payload;
-- 2) Upsert driver aggregates from tachograph master data.
with master_sources as ( with master_sources as (
select es.id, select es.id as master_event_source_id, es.tenant_key
es.tenant_key,
es.source_instance_key,
coalesce(es.tenant_provider_setting_key, '') as tenant_provider_setting_key
from eventhub.event_source es from eventhub.event_source es
where es.provider_key = 'TACHOGRAPH' where es.provider_key = 'TACHOGRAPH'
and es.source_kind = 'MASTER_DATA' and es.source_kind = 'MASTER_DATA'
and es.source_key = 'TACHOGRAPH_MASTER_DATA' and es.source_key = 'TACHOGRAPH_MASTER_DATA'
), ),
master_drivers as ( master_drivers as (
select ms.id as master_event_source_id, select ms.master_event_source_id,
ms.tenant_key, ms.tenant_key,
ms.source_instance_key,
ms.tenant_provider_setting_key,
d.source_entity_id as source_driver_entity_id, d.source_entity_id as source_driver_entity_id,
coalesce(nullif(trim(d.payload ->> 'first_names'), ''), nullif(trim(d.payload ->> 'firstnames'), '')) as first_names, coalesce(nullif(trim(d.payload ->> 'first_names'), ''), nullif(trim(d.payload ->> 'firstnames'), '')) as first_names,
coalesce(nullif(trim(d.payload ->> 'last_name'), ''), nullif(trim(d.payload ->> 'surname'), '')) as last_name, coalesce(nullif(trim(d.payload ->> 'last_name'), ''), nullif(trim(d.payload ->> 'surname'), '')) as last_name,
@ -65,173 +62,297 @@ master_drivers as (
from master_sources ms from master_sources ms
join eventhub.source_master_entity d join eventhub.source_master_entity d
on d.tenant_key = ms.tenant_key on d.tenant_key = ms.tenant_key
and d.event_source_id = ms.id and d.event_source_id = ms.master_event_source_id
and d.entity_type = 'DRIVER' and d.entity_type = 'DRIVER'
and d.source_entity_id not like 'DRIVER_CARD:%'
), ),
compatible_targets as ( resolved_drivers as (
select md.*, select md.*,
es.id as target_event_source_id coalesce(identity.driver_id, gen_random_uuid()) as driver_id
from master_drivers md from master_drivers md
join eventhub.event_source es left join eventhub.source_driver_identity identity
on es.tenant_key = md.tenant_key on identity.tenant_key = md.tenant_key
and es.provider_key = 'TACHOGRAPH' and identity.event_source_id = md.master_event_source_id
and es.source_instance_key = md.source_instance_key and identity.source_driver_entity_id = md.source_driver_entity_id
and coalesce(es.tenant_provider_setting_key, '') = md.tenant_provider_setting_key
),
updated_drivers as (
update eventhub.driver driver
set first_names = coalesce(ct.first_names, driver.first_names),
last_name = coalesce(ct.last_name, driver.last_name),
birth_date = coalesce(ct.birth_date, driver.birth_date),
source_updated_at = ct.source_updated_at,
payload = driver.payload || ct.payload,
updated_at = now()
from compatible_targets ct
where driver.tenant_key = ct.tenant_key
and driver.event_source_id = ct.target_event_source_id
and driver.source_driver_entity_id = ct.source_driver_entity_id
returning driver.id
), ),
inserted_drivers as ( inserted_drivers as (
insert into eventhub.driver( insert into eventhub.driver(
id, tenant_key, event_source_id, source_driver_entity_id, id, first_names, last_name, birth_date, source_updated_at, payload, updated_at
first_names, last_name, birth_date, source_updated_at, payload, updated_at
) )
select gen_random_uuid(), select distinct on (rd.driver_id)
ct.tenant_key, rd.driver_id,
ct.target_event_source_id, rd.first_names,
ct.source_driver_entity_id, rd.last_name,
ct.first_names, rd.birth_date,
ct.last_name, rd.source_updated_at,
ct.birth_date, rd.payload,
ct.source_updated_at,
ct.payload,
now() now()
from compatible_targets ct from resolved_drivers rd
where not exists ( where not exists (
select 1 select 1
from eventhub.driver existing from eventhub.driver existing
where existing.tenant_key = ct.tenant_key where existing.id = rd.driver_id
and existing.event_source_id = ct.target_event_source_id
and existing.source_driver_entity_id = ct.source_driver_entity_id
) )
returning id returning id
),
updated_drivers as (
update eventhub.driver driver
set first_names = coalesce(rd.first_names, driver.first_names),
last_name = coalesce(rd.last_name, driver.last_name),
birth_date = coalesce(rd.birth_date, driver.birth_date),
source_updated_at = coalesce(rd.source_updated_at, driver.source_updated_at),
payload = driver.payload || rd.payload,
updated_at = now()
from resolved_drivers rd
where driver.id = rd.driver_id
returning driver.id
),
upserted_source_driver_identity as (
insert into eventhub.source_driver_identity(
id, tenant_key, event_source_id, source_driver_entity_id,
driver_id, source_updated_at, payload, updated_at
)
select gen_random_uuid(),
rd.tenant_key,
rd.master_event_source_id,
rd.source_driver_entity_id,
rd.driver_id,
rd.source_updated_at,
rd.payload,
now()
from resolved_drivers rd
on conflict (tenant_key, event_source_id, source_driver_entity_id)
do update set
driver_id = excluded.driver_id,
source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_identity.source_updated_at),
payload = eventhub.source_driver_identity.payload || excluded.payload,
updated_at = now()
returning id
) )
select (select count(*) from updated_drivers) as updated_drivers, select (select count(*) from inserted_drivers) as inserted_drivers,
(select count(*) from inserted_drivers) as inserted_drivers; (select count(*) from updated_drivers) as updated_drivers,
(select count(*) from upserted_source_driver_identity) as upserted_source_driver_identity;
-- 3) Project driver-card identifiers from master-data relations.
with master_sources as ( with master_sources as (
select es.id, select es.id as master_event_source_id, es.tenant_key
es.tenant_key,
es.source_instance_key,
coalesce(es.tenant_provider_setting_key, '') as tenant_provider_setting_key
from eventhub.event_source es from eventhub.event_source es
where es.provider_key = 'TACHOGRAPH' where es.provider_key = 'TACHOGRAPH'
and es.source_kind = 'MASTER_DATA' and es.source_kind = 'MASTER_DATA'
and es.source_key = 'TACHOGRAPH_MASTER_DATA' and es.source_key = 'TACHOGRAPH_MASTER_DATA'
), ),
card_projection as ( master_cards as (
select distinct on (ms.tenant_key, ms.source_instance_key, ms.tenant_provider_setting_key, rel.to_source_entity_id) select ms.master_event_source_id,
ms.tenant_key, ms.tenant_key,
ms.source_instance_key, card.source_entity_id as source_driver_card_entity_id,
ms.tenant_provider_setting_key,
rel.to_source_entity_id as source_driver_entity_id,
nullif(trim(card.payload ->> 'card_nation'), '') as card_nation, nullif(trim(card.payload ->> 'card_nation'), '') as card_nation,
nullif(trim(card.payload ->> 'card_number'), '') as card_number, nullif(trim(card.payload ->> 'card_number'), '') as card_number,
rel.source_updated_at card.source_updated_at,
card.payload
from master_sources ms
join eventhub.source_master_entity card
on card.tenant_key = ms.tenant_key
and card.event_source_id = ms.master_event_source_id
and card.entity_type = 'DRIVER_CARD'
and nullif(trim(card.payload ->> 'card_nation'), '') is not null
and nullif(trim(card.payload ->> 'card_number'), '') is not null
),
canonical_driver_cards as (
select distinct on (mc.card_nation, mc.card_number)
mc.card_nation,
mc.card_number,
mc.source_updated_at,
mc.payload
from master_cards mc
order by mc.card_nation,
mc.card_number,
case when mc.source_driver_card_entity_id is null then 1 else 0 end,
mc.source_updated_at desc,
mc.source_driver_card_entity_id
),
existing_driver_cards as (
select distinct on (card.nation, card.card_number)
card.id,
card.nation,
card.card_number
from eventhub.driver_card card
order by card.nation,
card.card_number,
case when card.driver_id is null then 1 else 0 end,
card.updated_at desc,
card.created_at desc,
card.id
),
resolved_cards as (
select canonical.card_nation,
canonical.card_number,
canonical.source_updated_at,
canonical.payload,
coalesce(existing.id, gen_random_uuid()) as driver_card_id
from canonical_driver_cards canonical
left join existing_driver_cards existing
on existing.nation = canonical.card_nation
and existing.card_number = canonical.card_number
),
inserted_cards as (
insert into eventhub.driver_card(
id, driver_id, nation, card_number, source_updated_at, payload, updated_at
)
select distinct on (rc.driver_card_id)
rc.driver_card_id,
null,
rc.card_nation,
rc.card_number,
rc.source_updated_at,
rc.payload,
now()
from resolved_cards rc
where not exists (
select 1
from eventhub.driver_card existing
where existing.id = rc.driver_card_id
)
returning id
),
updated_cards as (
update eventhub.driver_card card
set source_updated_at = coalesce(rc.source_updated_at, card.source_updated_at),
payload = card.payload || rc.payload,
updated_at = now()
from resolved_cards rc
where card.id = rc.driver_card_id
returning card.id
),
upserted_source_driver_card_identity as (
insert into eventhub.source_driver_card_identity(
id, tenant_key, event_source_id, source_driver_card_entity_id,
driver_card_id, source_updated_at, payload, updated_at
)
select gen_random_uuid(),
mc.tenant_key,
mc.master_event_source_id,
mc.source_driver_card_entity_id,
coalesce(identity.driver_card_id, existing.id),
mc.source_updated_at,
mc.payload,
now()
from master_cards mc
left join eventhub.source_driver_card_identity identity
on identity.tenant_key = mc.tenant_key
and identity.event_source_id = mc.master_event_source_id
and identity.source_driver_card_entity_id = mc.source_driver_card_entity_id
left join existing_driver_cards existing
on existing.nation = mc.card_nation
and existing.card_number = mc.card_number
where mc.source_driver_card_entity_id is not null
and coalesce(identity.driver_card_id, existing.id) is not null
on conflict (tenant_key, event_source_id, source_driver_card_entity_id)
do update set
driver_card_id = excluded.driver_card_id,
source_updated_at = coalesce(excluded.source_updated_at, eventhub.source_driver_card_identity.source_updated_at),
payload = eventhub.source_driver_card_identity.payload || excluded.payload,
updated_at = now()
returning id
)
select (select count(*) from inserted_cards) as inserted_cards,
(select count(*) from updated_cards) as updated_cards,
(select count(*) from upserted_source_driver_card_identity) as upserted_source_driver_card_identity;
with master_sources as (
select es.id as master_event_source_id, es.tenant_key
from eventhub.event_source es
where es.provider_key = 'TACHOGRAPH'
and es.source_kind = 'MASTER_DATA'
and es.source_key = 'TACHOGRAPH_MASTER_DATA'
),
updated_card_links as (
update eventhub.driver_card card
set driver_id = driver_identity.driver_id,
source_updated_at = coalesce(rel.source_updated_at, card.source_updated_at),
updated_at = now()
from master_sources ms from master_sources ms
join eventhub.source_master_relation rel join eventhub.source_master_relation rel
on rel.tenant_key = ms.tenant_key on rel.tenant_key = ms.tenant_key
and rel.event_source_id = ms.id and rel.event_source_id = ms.master_event_source_id
and rel.relation_type = 'DRIVER_CARD_DRIVER' and rel.relation_type = 'DRIVER_CARD_DRIVER'
and rel.from_entity_type = 'DRIVER_CARD' and rel.from_entity_type = 'DRIVER_CARD'
and rel.to_entity_type = 'DRIVER' and rel.to_entity_type = 'DRIVER'
join eventhub.source_master_entity card join eventhub.source_driver_card_identity card_identity
on card.tenant_key = ms.tenant_key on card_identity.tenant_key = rel.tenant_key
and card.event_source_id = ms.id and card_identity.event_source_id = rel.event_source_id
and card.entity_type = 'DRIVER_CARD' and card_identity.source_driver_card_entity_id = rel.from_source_entity_id
and card.source_entity_id = rel.from_source_entity_id join eventhub.source_driver_identity driver_identity
order by ms.tenant_key, on driver_identity.tenant_key = rel.tenant_key
ms.source_instance_key, and driver_identity.event_source_id = rel.event_source_id
ms.tenant_provider_setting_key, and driver_identity.source_driver_entity_id = rel.to_source_entity_id
rel.to_source_entity_id, where card.id = card_identity.driver_card_id
rel.valid_to desc nulls last,
rel.valid_from desc nulls last,
rel.updated_at desc
),
updated_driver_cards as (
update eventhub.driver driver
set card_nation = coalesce(driver.card_nation, projection.card_nation),
card_number = coalesce(driver.card_number, projection.card_number),
source_updated_at = coalesce(projection.source_updated_at, driver.source_updated_at),
updated_at = now()
from card_projection projection
join eventhub.event_source es
on es.id = driver.event_source_id
where driver.tenant_key = projection.tenant_key
and es.provider_key = 'TACHOGRAPH'
and es.source_instance_key = projection.source_instance_key
and coalesce(es.tenant_provider_setting_key, '') = projection.tenant_provider_setting_key
and driver.source_driver_entity_id = projection.source_driver_entity_id
and ( and (
(driver.card_nation is null and projection.card_nation is not null) card.driver_id is null
or (driver.card_number is null and projection.card_number is not null) or card.driver_id = driver_identity.driver_id
or not exists (
select 1
from eventhub.source_driver_identity existing_identity
where existing_identity.driver_id = card.driver_id
) )
returning driver.id )
returning card.id
) )
select count(*) as updated_driver_cards select count(*) as updated_card_links
from updated_driver_cards; from updated_card_links;
-- 4) Remap events from provisional card-only drivers to proper source-driver aggregates. with single_card_per_driver as (
with provisional_to_real as ( select driver_id,
select provisional.id as provisional_driver_id, min(id::text)::uuid as driver_card_id
real.id as real_driver_id from eventhub.driver_card
from eventhub.driver provisional where driver_id is not null
join eventhub.event_source provisional_source group by driver_id
on provisional_source.id = provisional.event_source_id having count(*) = 1
and provisional_source.provider_key = 'TACHOGRAPH'
join eventhub.driver real
on real.tenant_key = provisional.tenant_key
and real.source_driver_entity_id is not null
and real.card_nation = provisional.card_nation
and real.card_number = provisional.card_number
join eventhub.event_source real_source
on real_source.id = real.event_source_id
and real_source.provider_key = provisional_source.provider_key
and real_source.tenant_key = provisional_source.tenant_key
and real_source.source_instance_key = provisional_source.source_instance_key
and coalesce(real_source.tenant_provider_setting_key, '') = coalesce(provisional_source.tenant_provider_setting_key, '')
where provisional.source_driver_entity_id is null
and provisional.card_nation is not null
and provisional.card_number is not null
and provisional.id <> real.id
), ),
updated_events as ( updated_events as (
update eventhub.event e update eventhub.event event
set driver_id = map.real_driver_id set driver_card_id = card.driver_card_id
from provisional_to_real map from single_card_per_driver card
where e.driver_id = map.provisional_driver_id where event.driver_id = card.driver_id
and e.driver_id <> map.real_driver_id and event.driver_card_id is null
returning e.id returning event.id
) )
select count(*) as remapped_events select count(*) as backfilled_event_driver_cards
from updated_events; from updated_events;
-- 5) Delete now-unreferenced provisional tachograph driver rows. with remapped_events as (
update eventhub.event event
set driver_id = card.driver_id
from eventhub.driver_card card
where event.driver_card_id = card.id
and card.driver_id is not null
and (
event.driver_id is null
or not exists (
select 1
from eventhub.source_driver_identity source_identity
where source_identity.driver_id = event.driver_id
)
)
and event.driver_id is distinct from card.driver_id
returning event.id
)
select count(*) as remapped_events
from remapped_events;
with deleted_drivers as ( with deleted_drivers as (
delete from eventhub.driver driver delete from eventhub.driver driver
using eventhub.event_source es where not exists (
where es.id = driver.event_source_id select 1
and es.provider_key = 'TACHOGRAPH' from eventhub.event event
and driver.source_driver_entity_id is null where event.driver_id = driver.id
and driver.card_nation is not null )
and driver.card_number is not null
and not exists ( and not exists (
select 1 select 1
from eventhub.event e from eventhub.driver_card card
where e.driver_id = driver.id where card.driver_id = driver.id
)
and not exists (
select 1
from eventhub.source_driver_identity source_identity
where source_identity.driver_id = driver.id
) )
returning driver.id returning driver.id
) )

View File

@ -0,0 +1,110 @@
/*
* Restores legacy columns on eventhub.driver:
* - card_nation
* - card_number
*
* The script is conservative:
* - it recreates the columns and the old check constraint
* - it backfills values only when a driver has exactly one distinct card
* recoverable from:
* 1. eventhub.driver_card.driver_id
* 2. eventhub.event.driver_id + event.driver_card_id
*
* If a driver is associated with multiple distinct cards, the columns remain null.
*/
alter table eventhub.driver
add column if not exists card_nation text;
alter table eventhub.driver
add column if not exists card_number text;
do $restore$
begin
if not exists (
select 1
from information_schema.table_constraints
where constraint_schema = 'eventhub'
and table_name = 'driver'
and constraint_name = 'chk_driver_card_nation_when_number'
) then
alter table eventhub.driver
add constraint chk_driver_card_nation_when_number
check (card_number is null or card_nation is not null);
end if;
end
$restore$;
with direct_driver_cards as (
select driver.id as driver_id,
card.nation,
card.card_number
from eventhub.driver driver
join eventhub.driver_card card
on card.driver_id = driver.id
),
event_driver_cards as (
select distinct event.driver_id,
card.nation,
card.card_number
from eventhub.event event
join eventhub.driver_card card
on card.id = event.driver_card_id
where event.driver_id is not null
and event.driver_card_id is not null
),
all_driver_cards as (
select driver_id, nation, card_number
from direct_driver_cards
union
select driver_id, nation, card_number
from event_driver_cards
),
single_card_per_driver as (
select driver_id,
max(nation) as card_nation,
max(card_number) as card_number
from all_driver_cards
group by driver_id
having count(*) = 1
),
updated_drivers as (
update eventhub.driver driver
set card_nation = single.card_nation,
card_number = single.card_number,
updated_at = now()
from single_card_per_driver single
where driver.id = single.driver_id
and (
driver.card_nation is distinct from single.card_nation
or driver.card_number is distinct from single.card_number
)
returning driver.id
)
select count(*) as restored_driver_card_columns
from updated_drivers;
with ambiguous_drivers as (
select driver_id,
count(*) as distinct_card_count
from (
select distinct driver_id, nation, card_number
from (
select driver.id as driver_id, card.nation, card.card_number
from eventhub.driver driver
join eventhub.driver_card card
on card.driver_id = driver.id
union all
select distinct event.driver_id, card.nation, card.card_number
from eventhub.event event
join eventhub.driver_card card
on card.id = event.driver_card_id
where event.driver_id is not null
and event.driver_card_id is not null
) cards
) distinct_cards
group by driver_id
having count(*) > 1
)
select count(*) as ambiguous_driver_count
from ambiguous_drivers;