diff --git a/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java b/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java index d5a32a7..662a6ec 100644 --- a/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java +++ b/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java @@ -29,7 +29,8 @@ public class EventHubBatchBuildProcessor implements Processor { List events = exchange.getMessage().getBody(List.class); List sortedEvents = sorter.sort(events); - String packageKey = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_KEY, String.class); + String aggregatePackageKey = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_KEY, String.class); + String packageKey = aggregatePackageKey + ":CAMEL-" + exchange.getExchangeId(); EventHubPackageRequest packageInfo = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_INFO, EventHubPackageRequest.class); if (packageInfo == null && !sortedEvents.isEmpty()) { packageInfo = sortedEvents.getFirst().packageInfo(); @@ -46,6 +47,8 @@ public class EventHubBatchBuildProcessor implements Processor { Map metadata = new HashMap<>(); metadata.put("camelRouteId", exchange.getFromRouteId()); metadata.put("packageKey", packageKey); + metadata.put("aggregatePackageKey", aggregatePackageKey); + metadata.put("camelExchangeId", exchange.getExchangeId()); metadata.put("eventCount", sortedEvents.size()); if (packageInfo != null) { metadata.put("tenantKey", packageInfo.tenantKey()); diff --git a/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java b/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java index 66bfe2b..20b7404 100644 --- a/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java +++ b/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java @@ -57,6 +57,7 @@ public class EventHubCommonIngestionRoute extends RouteBuilder { EventHubProperties.Batch batch = properties.getBatch(); return BATCH_INPUT_QUEUE + "?size=" + batch.getQueueSize() + + "&concurrentConsumers=" + batch.getConcurrentConsumers() + "&blockWhenFull=" + batch.isBlockWhenFull() + "&offerTimeout=" + batch.getQueueOfferTimeout().toMillis(); } diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index a208d07..038d6d0 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -32,7 +32,7 @@ public class EventHubProperties { public static class Batch { /** Number of events collected before a package is persisted. */ - private int completionSize = 1000; + private int completionSize = 5000; /** Maximum time to wait for more events belonging to the same package key. */ private Duration completionTimeout = Duration.ofSeconds(5); @@ -40,6 +40,9 @@ public class EventHubProperties { /** Maximum number of normalized events buffered before producers apply backpressure. */ private int queueSize = 10000; + /** Number of parallel consumers draining the normalized-event queue. */ + private int concurrentConsumers = 4; + /** Whether producers should wait for queue capacity instead of failing immediately. */ private boolean blockWhenFull = true; @@ -70,6 +73,14 @@ public class EventHubProperties { this.queueSize = Math.max(1, queueSize); } + public int getConcurrentConsumers() { + return concurrentConsumers; + } + + public void setConcurrentConsumers(int concurrentConsumers) { + this.concurrentConsumers = Math.max(1, concurrentConsumers); + } + public boolean isBlockWhenFull() { return blockWhenFull; } diff --git a/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java index 7ca889f..1c4d407 100644 --- a/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java @@ -14,11 +14,13 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; import java.util.HashMap; -import java.util.List; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.UUID; import org.apache.camel.ProducerTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; @@ -27,6 +29,9 @@ import org.springframework.util.StreamUtils; public abstract class AbstractJdbcExtractionBatchExecutor implements ExtractionBatchExecutor { + private static final Logger log = LoggerFactory.getLogger(AbstractJdbcExtractionBatchExecutor.class); + private static final int EVENT_EXTRACTION_PROGRESS_LOG_INTERVAL = 5000; + private final NamedParameterJdbcTemplate jdbcTemplate; private final ProducerTemplate producerTemplate; private final ResourceLoader resourceLoader; @@ -81,10 +86,21 @@ public abstract class AbstractJdbcExtractionBatchExecutor params = parameters(request, chunkScope, cursor); String sql = loadSql(definition.sqlResource()); - List events = jdbcTemplate.query(sql, params, (rs, rowNum) -> definition.rowMapper().map(rs, rowNum, context)); - events.forEach(event -> producerTemplate.sendBody(normalizedInputUri(), event)); + ExtractedEventStats stats = new ExtractedEventStats(); + log.info("Reading EventHub events provider={} tenant={} importRunId={} packageId={} extractionCode={} sourceKind={} chunk={} occurredFrom={} occurredTo={}", + providerPackagePrefix(), request.tenantKey(), importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(), + chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo()); + jdbcTemplate.query(sql, params, rs -> { + EventHubEventDto event = definition.rowMapper().map(rs, stats.eventsMapped(), context); + producerTemplate.sendBody(normalizedInputUri(), event); + stats.accept(event); + if (stats.eventsMapped() % EVENT_EXTRACTION_PROGRESS_LOG_INTERVAL == 0) { + logEventExtractionProgress(request, importRunId, packageId, planItem, chunk, stats); + } + }); + logEventExtractionFinished(request, importRunId, packageId, planItem, chunk, stats); - return resultFor(packageId, planItem, chunk, cursor, events); + return resultFor(packageId, planItem, chunk, cursor, stats); } protected abstract Optional> findDefinition(String code); @@ -96,7 +112,7 @@ public abstract class AbstractJdbcExtractionBatchExecutor events + ExtractedEventStats stats ); protected String providerPackagePrefix() { @@ -107,6 +123,32 @@ public abstract class AbstractJdbcExtractionBatchExecutor parameters(R request, ImportScopeDto scope, ImportCursorStateDto cursor) { Map params = new HashMap<>(); String organisationId = scope == null || scope.rootSourceOrganisation() == null @@ -125,24 +167,20 @@ public abstract class AbstractJdbcExtractionBatchExecutor events, ImportCursorStateDto cursor) { - return events.stream() - .map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().importedIntoSourceAt()) - .filter(value -> value != null) - .max(OffsetDateTime::compareTo) - .orElse(cursor == null ? null : cursor.lastSourcePackageImportedAt()); + protected OffsetDateTime lastSourcePackageImportedAt(ExtractedEventStats stats, ImportCursorStateDto cursor) { + return stats.lastSourcePackageImportedAt() == null + ? cursor == null ? null : cursor.lastSourcePackageImportedAt() + : stats.lastSourcePackageImportedAt(); } - protected String lastSourcePackageId(List events, ImportCursorStateDto cursor) { - return events.stream() - .filter(event -> event.sourcePackageRef() != null && event.sourcePackageRef().importedIntoSourceAt() != null) - .max((left, right) -> left.sourcePackageRef().importedIntoSourceAt().compareTo(right.sourcePackageRef().importedIntoSourceAt())) - .map(event -> event.sourcePackageRef().sourcePackageId()) - .orElseGet(() -> events.stream() - .map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().sourcePackageId()) - .filter(value -> value != null && !value.isBlank()) - .max(this::compareSourcePackageId) - .orElse(cursor == null ? null : cursor.lastSourcePackageId())); + protected String lastSourcePackageId(ExtractedEventStats stats, ImportCursorStateDto cursor) { + if (stats.lastSourcePackageIdByImportedAt() != null) { + return stats.lastSourcePackageIdByImportedAt(); + } + if (stats.maxSourcePackageId() != null) { + return stats.maxSourcePackageId(); + } + return cursor == null ? null : cursor.lastSourcePackageId(); } private EventHubPackageRequest packageInfo( @@ -215,4 +253,59 @@ public abstract class AbstractJdbcExtractionBatchExecutor eventTypeCounts = new LinkedHashMap<>(); + + public int eventsMapped() { + return eventsMapped; + } + + public OffsetDateTime lastSourcePackageImportedAt() { + return lastSourcePackageImportedAt; + } + + public String lastSourcePackageIdByImportedAt() { + return lastSourcePackageIdByImportedAt; + } + + public String maxSourcePackageId() { + return maxSourcePackageId; + } + + public Map eventTypeCounts() { + return eventTypeCounts; + } + + private void accept(EventHubEventDto event) { + eventsMapped++; + eventTypeCounts.merge(eventTypeKey(event), 1, Integer::sum); + if (event.sourcePackageRef() == null) { + return; + } + + OffsetDateTime importedAt = event.sourcePackageRef().importedIntoSourceAt(); + String sourcePackageId = event.sourcePackageRef().sourcePackageId(); + if (importedAt != null + && (lastSourcePackageImportedAt == null || importedAt.compareTo(lastSourcePackageImportedAt) > 0)) { + lastSourcePackageImportedAt = importedAt; + lastSourcePackageIdByImportedAt = sourcePackageId; + } + if (sourcePackageId != null + && !sourcePackageId.isBlank() + && (maxSourcePackageId == null || compareSourcePackageId(sourcePackageId, maxSourcePackageId) > 0)) { + maxSourcePackageId = sourcePackageId; + } + } + + private String eventTypeKey(EventHubEventDto event) { + String domain = event.eventDomain() == null ? "UNKNOWN_DOMAIN" : event.eventDomain().name(); + String type = event.eventType() == null ? "UNKNOWN_EVENT" : event.eventType().name(); + return domain + "/" + type; + } + } } diff --git a/src/main/java/at/procon/eventhub/persistence/EventRepository.java b/src/main/java/at/procon/eventhub/persistence/EventRepository.java index f2e107a..cdfc3d4 100644 --- a/src/main/java/at/procon/eventhub/persistence/EventRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/EventRepository.java @@ -9,12 +9,16 @@ import at.procon.eventhub.service.EventAcquisitionRecordKeyService; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.time.OffsetDateTime; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; @@ -47,21 +51,22 @@ public class EventRepository { */ public int batchInsert(UUID packageId, String tenantKey, int eventSourceId, List events) { Map entityIdCache = new HashMap<>(); - int insertedCount = 0; + List rows = new ArrayList<>(events.size()); for (EventHubEventDto event : events) { ResolvedEntityRefs refs = resolveEntityRefs(tenantKey, eventSourceId, event, entityIdCache); - InsertedEventRow row = upsertEvent(packageId, eventSourceId, event, refs); - upsertEventDetails(row, event); - if (row.inserted()) { - insertedCount++; - } + rows.add(resolveEventImportRow(packageId, eventSourceId, event, refs)); } + createEventImportStage(); + stageEvents(rows); + int insertedCount = insertStagedEvents(); + Map eventRowsBySourceRecord = resolveStagedEventRows(); + batchUpsertEventDetails(rows, eventRowsBySourceRecord); return insertedCount; } - private InsertedEventRow upsertEvent( + private ResolvedEventImportRow resolveEventImportRow( UUID packageId, int eventSourceId, EventHubEventDto event, @@ -70,62 +75,20 @@ public class EventRepository { UUID requestedEventId = event.eventId() == null ? UUID.randomUUID() : event.eventId(); OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt(); String sourceRecordKeyHash = recordKeyService.buildSourceRecordKeyHash(event, eventSourceId); - var longitude = event.position() == null ? null : event.position().longitude(); - var latitude = event.position() == null ? null : event.position().latitude(); + Double longitude = event.position() == null ? null : event.position().longitude().doubleValue(); + Double latitude = event.position() == null ? null : event.position().latitude().doubleValue(); + String sourcePackageId = event.sourcePackageRef() == null ? null : event.sourcePackageRef().sourcePackageId(); - return jdbcTemplate.query( - """ - with inserted as ( - insert into eventhub.event( - id, event_source_id, data_package_id, - external_source_event_id, - driver_entity_id, vehicle_id, vehicle_registration_id, source_package_entity_id, - occurred_at, received_partner_at, received_hub_at, - event_domain, event_type, lifecycle, - odometer_m, position, - payload, manual_entry, - source_record_key_hash, event_signature_hash - ) values ( - ?, ?, ?, - ?, - ?, ?, ?, ?, - ?, ?, ?, - ?, ?, ?, - ?, case - when ?::double precision is null or ?::double precision is null then null - else ST_SetSRID(ST_MakePoint(?::double precision, ?::double precision), 4326)::geography - end, - ?::jsonb, ?, - ?, ? - ) - on conflict (source_record_key_hash) do nothing - returning id, occurred_at, true as inserted - ) - select id, occurred_at, inserted - from inserted - union all - select e.id, e.occurred_at, false as inserted - from eventhub.event e - where e.source_record_key_hash = ? - and not exists (select 1 from inserted) - """, - rs -> { - if (!rs.next()) { - throw new IllegalStateException("Could not insert or resolve event row for source record hash " + sourceRecordKeyHash); - } - return new InsertedEventRow( - (UUID) rs.getObject("id"), - rs.getObject("occurred_at", OffsetDateTime.class), - rs.getBoolean("inserted") - ); - }, + return new ResolvedEventImportRow( + sourceRecordKeyHash, requestedEventId, - eventSourceId, packageId, + eventSourceId, event.externalSourceEventId(), refs.driverEntityId(), refs.vehicleId(), refs.vehicleRegistrationId(), + sourcePackageId, refs.sourcePackageEntityId(), event.occurredAt(), event.receivedPartnerAt(), @@ -136,21 +99,227 @@ public class EventRepository { event.odometerM(), longitude, latitude, - longitude, - latitude, toJson(event.payload()), event.manualEntry(), - sourceRecordKeyHash, recordKeyService.buildEventSignatureHash(event), - sourceRecordKeyHash + event.eventDetails() == null ? null : event.eventDetails().type(), + event.eventDetails() == null ? null : toJson(event.eventDetails().attributes()) ); } - private void upsertEventDetails(InsertedEventRow insertedEventRow, EventHubEventDto event) { - if (event.eventDetails() == null) { + private void createEventImportStage() { + jdbcTemplate.execute( + """ + create temporary table if not exists eventhub_event_import_stage ( + row_no integer not null, + source_record_key_hash text not null, + requested_event_id uuid not null, + data_package_id uuid not null, + event_source_id integer not null, + external_source_event_id text not null, + driver_entity_id uuid, + vehicle_id uuid, + vehicle_registration_id uuid, + source_package_id text, + source_package_entity_id uuid, + occurred_at timestamptz not null, + received_partner_at timestamptz, + received_hub_at timestamptz not null, + event_domain text not null, + event_type text not null, + lifecycle text not null, + odometer_m bigint, + longitude double precision, + latitude double precision, + payload jsonb not null, + manual_entry boolean not null, + event_signature_hash text + ) on commit drop + """ + ); + jdbcTemplate.execute("truncate table eventhub_event_import_stage"); + } + + private void stageEvents(List rows) { + jdbcTemplate.batchUpdate( + """ + insert into eventhub_event_import_stage( + row_no, source_record_key_hash, requested_event_id, data_package_id, event_source_id, + external_source_event_id, driver_entity_id, vehicle_id, vehicle_registration_id, + source_package_id, source_package_entity_id, occurred_at, received_partner_at, received_hub_at, + event_domain, event_type, lifecycle, odometer_m, longitude, latitude, + payload, manual_entry, event_signature_hash + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?) + """, + new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + ResolvedEventImportRow row = rows.get(i); + ps.setInt(1, i); + ps.setString(2, row.sourceRecordKeyHash()); + ps.setObject(3, row.requestedEventId()); + ps.setObject(4, row.packageId()); + ps.setInt(5, row.eventSourceId()); + ps.setString(6, row.externalSourceEventId()); + ps.setObject(7, row.driverEntityId()); + ps.setObject(8, row.vehicleId()); + ps.setObject(9, row.vehicleRegistrationId()); + ps.setString(10, row.sourcePackageId()); + ps.setObject(11, row.sourcePackageEntityId()); + ps.setObject(12, row.occurredAt()); + ps.setObject(13, row.receivedPartnerAt()); + ps.setObject(14, row.receivedHubAt()); + ps.setString(15, row.eventDomain()); + ps.setString(16, row.eventType()); + ps.setString(17, row.lifecycle()); + ps.setObject(18, row.odometerM()); + ps.setObject(19, row.longitude()); + ps.setObject(20, row.latitude()); + ps.setString(21, row.payloadJson()); + ps.setBoolean(22, row.manualEntry()); + ps.setString(23, row.eventSignatureHash()); + } + + @Override + public int getBatchSize() { + return rows.size(); + } + } + ); + } + + private int insertStagedEvents() { + Long insertedCount = jdbcTemplate.queryForObject( + """ + with stage_one as ( + select distinct on (source_record_key_hash) * + from eventhub_event_import_stage + order by source_record_key_hash, row_no + ), + reserved_source_record as ( + insert into eventhub.event_source_record( + source_record_key_hash, event_occurred_at, event_id + ) + select source_record_key_hash, occurred_at, requested_event_id + from stage_one + on conflict (source_record_key_hash) do nothing + returning source_record_key_hash, event_occurred_at, event_id + ), + existing_source_record as ( + select source_record.source_record_key_hash, + source_record.event_occurred_at, + source_record.event_id + from stage_one stage + join eventhub.event_source_record source_record + on source_record.source_record_key_hash = stage.source_record_key_hash + where not exists ( + select 1 + from reserved_source_record reserved + where reserved.source_record_key_hash = stage.source_record_key_hash + ) + ), + source_record_for_stage as ( + select * + from reserved_source_record + union all + select * + from existing_source_record + ), + inserted_event as ( + insert into eventhub.event( + id, event_source_id, data_package_id, + external_source_event_id, + driver_entity_id, vehicle_id, vehicle_registration_id, + source_package_id, source_package_entity_id, + occurred_at, received_partner_at, received_hub_at, + event_domain, event_type, lifecycle, + odometer_m, position, + payload, manual_entry, + source_record_key_hash, event_signature_hash + ) + select + source_record.event_id, stage.event_source_id, stage.data_package_id, + stage.external_source_event_id, + stage.driver_entity_id, stage.vehicle_id, stage.vehicle_registration_id, + stage.source_package_id, stage.source_package_entity_id, + source_record.event_occurred_at, stage.received_partner_at, stage.received_hub_at, + stage.event_domain, stage.event_type, stage.lifecycle, + stage.odometer_m, case + when stage.longitude is null or stage.latitude is null then null + else ST_SetSRID(ST_MakePoint(stage.longitude, stage.latitude), 4326)::geography + end, + stage.payload, stage.manual_entry, + stage.source_record_key_hash, stage.event_signature_hash + from stage_one stage + join source_record_for_stage source_record + on source_record.source_record_key_hash = stage.source_record_key_hash + where not exists ( + select 1 + from eventhub.event existing_event + where existing_event.occurred_at = source_record.event_occurred_at + and existing_event.id = source_record.event_id + ) + returning id + ) + select count(*) + from inserted_event + """, + Long.class + ); + return insertedCount == null ? 0 : Math.toIntExact(insertedCount); + } + + private Map resolveStagedEventRows() { + return jdbcTemplate.query( + """ + select distinct + stage.source_record_key_hash, + source_record.event_id, + source_record.event_occurred_at + from eventhub_event_import_stage stage + join eventhub.event_source_record source_record + on source_record.source_record_key_hash = stage.source_record_key_hash + """, + rs -> { + Map rows = new HashMap<>(); + while (rs.next()) { + rows.put( + rs.getString("source_record_key_hash"), + new InsertedEventRow( + (UUID) rs.getObject("event_id"), + rs.getObject("event_occurred_at", OffsetDateTime.class) + ) + ); + } + return rows; + } + ); + } + + private void batchUpsertEventDetails( + List rows, + Map eventRowsBySourceRecord + ) { + List details = new ArrayList<>(); + for (ResolvedEventImportRow row : rows) { + if (row.detailType() == null) { + continue; + } + InsertedEventRow insertedEventRow = eventRowsBySourceRecord.get(row.sourceRecordKeyHash()); + if (insertedEventRow == null) { + throw new IllegalStateException("Could not insert or resolve event row for source record hash " + row.sourceRecordKeyHash()); + } + details.add(new EventDetailImportRow( + insertedEventRow.eventId(), + insertedEventRow.occurredAt(), + row.detailType(), + row.detailAttributesJson() + )); + } + if (details.isEmpty()) { return; } - jdbcTemplate.update( + jdbcTemplate.batchUpdate( """ insert into eventhub.event_detail( event_occurred_at, event_id, detail_type, attributes @@ -159,10 +328,21 @@ public class EventRepository { do update set attributes = excluded.attributes """, - insertedEventRow.occurredAt(), - insertedEventRow.eventId(), - event.eventDetails().type(), - toJson(event.eventDetails().attributes()) + new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + EventDetailImportRow detail = details.get(i); + ps.setObject(1, detail.occurredAt()); + ps.setObject(2, detail.eventId()); + ps.setString(3, detail.detailType()); + ps.setString(4, detail.attributesJson()); + } + + @Override + public int getBatchSize() { + return details.size(); + } + } ); } @@ -341,8 +521,43 @@ public class EventRepository { private record InsertedEventRow( UUID eventId, + OffsetDateTime occurredAt + ) { + } + + private record ResolvedEventImportRow( + String sourceRecordKeyHash, + UUID requestedEventId, + UUID packageId, + int eventSourceId, + String externalSourceEventId, + UUID driverEntityId, + UUID vehicleId, + UUID vehicleRegistrationId, + String sourcePackageId, + UUID sourcePackageEntityId, OffsetDateTime occurredAt, - boolean inserted + OffsetDateTime receivedPartnerAt, + OffsetDateTime receivedHubAt, + String eventDomain, + String eventType, + String lifecycle, + Long odometerM, + Double longitude, + Double latitude, + String payloadJson, + boolean manualEntry, + String eventSignatureHash, + String detailType, + String detailAttributesJson + ) { + } + + private record EventDetailImportRow( + UUID eventId, + OffsetDateTime occurredAt, + String detailType, + String attributesJson ) { } } diff --git a/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java b/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java index 57de7b0..045198a 100644 --- a/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java @@ -8,7 +8,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.POJONode; import java.lang.reflect.Array; import java.sql.PreparedStatement; +import java.sql.SQLException; import java.sql.Timestamp; +import java.time.OffsetDateTime; import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalAmount; import java.util.ArrayList; @@ -18,8 +20,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; @Repository public class SourceMasterDataRepository { @@ -32,33 +36,15 @@ public class SourceMasterDataRepository { this.objectMapper = objectMapper; } + @Transactional public int upsertEntities(String tenantKey, int eventSourceId, List entities) { - int count = 0; + List rows = new ArrayList<>(entities.size()); for (SourceMasterEntityUpsert entity : entities) { if (entity.sourceEntityId() == null || entity.sourceEntityId().isBlank()) { continue; } - jdbcTemplate.update( - """ - insert into eventhub.source_master_entity( - id, tenant_key, event_source_id, entity_type, source_entity_id, - source_external_key, display_name, active, valid_from, valid_to, - source_updated_at, payload, updated_at - ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) - on conflict (tenant_key, event_source_id, entity_type, source_entity_id) - do update set - source_external_key = excluded.source_external_key, - display_name = excluded.display_name, - active = excluded.active, - valid_from = excluded.valid_from, - valid_to = excluded.valid_to, - source_updated_at = excluded.source_updated_at, - payload = excluded.payload, - updated_at = now() - """, + rows.add(new SourceMasterEntityStageRow( UUID.randomUUID(), - tenantKey, - eventSourceId, entity.entityType(), entity.sourceEntityId(), entity.sourceExternalKey(), @@ -68,43 +54,28 @@ public class SourceMasterDataRepository { entity.validTo(), entity.sourceUpdatedAt(), toJson(entity.payload()) - ); - count++; + )); } - return count; + if (rows.isEmpty()) { + return 0; + } + + createEntityStage(); + stageEntities(rows); + upsertStagedEntities(tenantKey, eventSourceId); + return rows.size(); } + @Transactional public int upsertRelations(String tenantKey, int eventSourceId, List relations) { - int count = 0; + List rows = new ArrayList<>(relations.size()); for (SourceMasterRelationUpsert relation : relations) { if (relation.fromSourceEntityId() == null || relation.toSourceEntityId() == null) { continue; } - String relationKey = relationKey(relation); - jdbcTemplate.update( - """ - insert into eventhub.source_master_relation( - id, tenant_key, event_source_id, relation_key, relation_type, - from_entity_type, from_source_entity_id, to_entity_type, to_source_entity_id, - valid_from, valid_to, source_updated_at, payload, updated_at - ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) - on conflict (tenant_key, event_source_id, relation_key) - do update set - relation_type = excluded.relation_type, - from_entity_type = excluded.from_entity_type, - from_source_entity_id = excluded.from_source_entity_id, - to_entity_type = excluded.to_entity_type, - to_source_entity_id = excluded.to_source_entity_id, - valid_from = excluded.valid_from, - valid_to = excluded.valid_to, - source_updated_at = excluded.source_updated_at, - payload = excluded.payload, - updated_at = now() - """, + rows.add(new SourceMasterRelationStageRow( UUID.randomUUID(), - tenantKey, - eventSourceId, - relationKey, + relationKey(relation), relation.relationType(), relation.fromEntityType(), relation.fromSourceEntityId(), @@ -114,10 +85,194 @@ public class SourceMasterDataRepository { relation.validTo(), relation.sourceUpdatedAt(), toJson(relation.payload()) - ); - count++; + )); } - return count; + if (rows.isEmpty()) { + return 0; + } + + createRelationStage(); + stageRelations(rows); + upsertStagedRelations(tenantKey, eventSourceId); + return rows.size(); + } + + private void createEntityStage() { + jdbcTemplate.execute( + """ + create temporary table if not exists eventhub_source_master_entity_stage ( + row_no integer not null, + id uuid not null, + entity_type text not null, + source_entity_id text not null, + source_external_key text, + display_name text, + active boolean, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null + ) on commit drop + """ + ); + jdbcTemplate.execute("truncate table eventhub_source_master_entity_stage"); + } + + private void stageEntities(List rows) { + jdbcTemplate.batchUpdate( + """ + insert into eventhub_source_master_entity_stage( + row_no, id, entity_type, source_entity_id, source_external_key, display_name, + active, valid_from, valid_to, source_updated_at, payload + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb) + """, + new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + SourceMasterEntityStageRow row = rows.get(i); + ps.setInt(1, i); + ps.setObject(2, row.id()); + ps.setString(3, row.entityType()); + ps.setString(4, row.sourceEntityId()); + ps.setString(5, row.sourceExternalKey()); + ps.setString(6, row.displayName()); + ps.setObject(7, row.active()); + ps.setObject(8, row.validFrom()); + ps.setObject(9, row.validTo()); + ps.setObject(10, row.sourceUpdatedAt()); + ps.setString(11, row.payloadJson()); + } + + @Override + public int getBatchSize() { + return rows.size(); + } + } + ); + } + + private void upsertStagedEntities(String tenantKey, int eventSourceId) { + jdbcTemplate.update( + """ + insert into eventhub.source_master_entity( + id, tenant_key, event_source_id, entity_type, source_entity_id, + source_external_key, display_name, active, valid_from, valid_to, + source_updated_at, payload, updated_at + ) + select + id, ?, ?, entity_type, source_entity_id, + source_external_key, display_name, active, valid_from, valid_to, + source_updated_at, payload, now() + from ( + select distinct on (entity_type, source_entity_id) * + from eventhub_source_master_entity_stage + order by entity_type, source_entity_id, row_no desc + ) stage + on conflict (tenant_key, event_source_id, entity_type, source_entity_id) + do update set + source_external_key = excluded.source_external_key, + display_name = excluded.display_name, + active = excluded.active, + valid_from = excluded.valid_from, + valid_to = excluded.valid_to, + source_updated_at = excluded.source_updated_at, + payload = excluded.payload, + updated_at = now() + """, + tenantKey, + eventSourceId + ); + } + + private void createRelationStage() { + jdbcTemplate.execute( + """ + create temporary table if not exists eventhub_source_master_relation_stage ( + row_no integer not null, + id uuid not null, + relation_key text not null, + relation_type text not null, + from_entity_type text not null, + from_source_entity_id text not null, + to_entity_type text not null, + to_source_entity_id text not null, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null + ) on commit drop + """ + ); + jdbcTemplate.execute("truncate table eventhub_source_master_relation_stage"); + } + + private void stageRelations(List rows) { + jdbcTemplate.batchUpdate( + """ + insert into eventhub_source_master_relation_stage( + row_no, id, relation_key, relation_type, from_entity_type, from_source_entity_id, + to_entity_type, to_source_entity_id, valid_from, valid_to, source_updated_at, payload + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb) + """, + new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + SourceMasterRelationStageRow row = rows.get(i); + ps.setInt(1, i); + ps.setObject(2, row.id()); + ps.setString(3, row.relationKey()); + ps.setString(4, row.relationType()); + ps.setString(5, row.fromEntityType()); + ps.setString(6, row.fromSourceEntityId()); + ps.setString(7, row.toEntityType()); + ps.setString(8, row.toSourceEntityId()); + ps.setObject(9, row.validFrom()); + ps.setObject(10, row.validTo()); + ps.setObject(11, row.sourceUpdatedAt()); + ps.setString(12, row.payloadJson()); + } + + @Override + public int getBatchSize() { + return rows.size(); + } + } + ); + } + + private void upsertStagedRelations(String tenantKey, int eventSourceId) { + jdbcTemplate.update( + """ + insert into eventhub.source_master_relation( + id, tenant_key, event_source_id, relation_key, relation_type, + from_entity_type, from_source_entity_id, to_entity_type, to_source_entity_id, + valid_from, valid_to, source_updated_at, payload, updated_at + ) + select + id, ?, ?, relation_key, relation_type, + from_entity_type, from_source_entity_id, to_entity_type, to_source_entity_id, + valid_from, valid_to, source_updated_at, payload, now() + from ( + select distinct on (relation_key) * + from eventhub_source_master_relation_stage + order by relation_key, row_no desc + ) stage + on conflict (tenant_key, event_source_id, relation_key) + do update set + relation_type = excluded.relation_type, + from_entity_type = excluded.from_entity_type, + from_source_entity_id = excluded.from_source_entity_id, + to_entity_type = excluded.to_entity_type, + to_source_entity_id = excluded.to_source_entity_id, + valid_from = excluded.valid_from, + valid_to = excluded.valid_to, + source_updated_at = excluded.source_updated_at, + payload = excluded.payload, + updated_at = now() + """, + tenantKey, + eventSourceId + ); } public UUID resolveOrCreateEntityId( @@ -135,8 +290,12 @@ public class SourceMasterDataRepository { String normalizedSourceEntityId = normalizeRequired(sourceEntityId, "sourceEntityId"); String normalizedSourceExternalKey = normalizeNullable(sourceExternalKey); String normalizedDisplayName = normalizeNullable(displayName); + UUID existing = findEntityId(normalizedTenantKey, eventSourceId, normalizedEntityType, normalizedSourceEntityId); + if (existing != null) { + return existing; + } - return jdbcTemplate.query( + UUID created = jdbcTemplate.query( con -> { PreparedStatement ps = con.prepareStatement( """ @@ -144,13 +303,7 @@ public class SourceMasterDataRepository { id, tenant_key, event_source_id, entity_type, source_entity_id, source_external_key, display_name, active, payload, updated_at ) values (?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) - on conflict (tenant_key, event_source_id, entity_type, source_entity_id) - do update set - source_external_key = coalesce(excluded.source_external_key, eventhub.source_master_entity.source_external_key), - display_name = coalesce(excluded.display_name, eventhub.source_master_entity.display_name), - active = coalesce(excluded.active, eventhub.source_master_entity.active), - payload = coalesce(excluded.payload, eventhub.source_master_entity.payload), - updated_at = now() + on conflict (tenant_key, event_source_id, entity_type, source_entity_id) do nothing returning id """ ); @@ -175,6 +328,36 @@ public class SourceMasterDataRepository { return (UUID) rs.getObject(1); } ); + if (created != null) { + return created; + } + + UUID resolved = findEntityId(normalizedTenantKey, eventSourceId, normalizedEntityType, normalizedSourceEntityId); + if (resolved != null) { + return resolved; + } + throw new IllegalStateException( + "Could not resolve source master entity id for " + + normalizedTenantKey + ":" + eventSourceId + ":" + normalizedEntityType + ":" + normalizedSourceEntityId + ); + } + + private UUID findEntityId(String tenantKey, int eventSourceId, String entityType, String sourceEntityId) { + return jdbcTemplate.query( + """ + select id + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id = ? + and entity_type = ? + and source_entity_id = ? + """, + rs -> rs.next() ? (UUID) rs.getObject("id") : null, + tenantKey, + eventSourceId, + entityType, + sourceEntityId + ); } private String relationKey(SourceMasterRelationUpsert relation) { @@ -324,4 +507,33 @@ public class SourceMasterDataRepository { String trimmed = value.trim(); return trimmed.isEmpty() ? null : trimmed; } + + private record SourceMasterEntityStageRow( + UUID id, + String entityType, + String sourceEntityId, + String sourceExternalKey, + String displayName, + Boolean active, + OffsetDateTime validFrom, + OffsetDateTime validTo, + OffsetDateTime sourceUpdatedAt, + String payloadJson + ) { + } + + private record SourceMasterRelationStageRow( + UUID id, + String relationKey, + String relationType, + String fromEntityType, + String fromSourceEntityId, + String toEntityType, + String toSourceEntityId, + OffsetDateTime validFrom, + OffsetDateTime validTo, + OffsetDateTime sourceUpdatedAt, + String payloadJson + ) { + } } diff --git a/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java b/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java index a8465c5..0b3c524 100644 --- a/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java @@ -4,15 +4,14 @@ import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import java.sql.Timestamp; import java.time.OffsetDateTime; -import java.time.ZoneOffset; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; @Repository public class VehicleIdentityRepository { @@ -90,103 +89,223 @@ public class VehicleIdentityRepository { return new ResolvedVehicleReference(vehicleId, registrationId); } + @Transactional public int reconcileFromMasterData(String tenantKey, int eventSourceId) { String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); - int updates = 0; - - List vehicles = jdbcTemplate.query( - compatibleSourcesCte() + """ - select event_source_id, source_entity_id, source_external_key, source_updated_at - from eventhub.source_master_entity - where tenant_key = ? - and event_source_id in (select id from compatible_sources) - and entity_type = 'VEHICLE' - """, - (rs, rowNum) -> new VehicleMasterRow( - rs.getInt("event_source_id"), - normalizeNullable(rs.getString("source_entity_id")), - normalizeNullable(rs.getString("source_external_key")), - offsetDateTime(rs.getObject("source_updated_at")) - ), - eventSourceId, - normalizedTenantKey - ); - for (VehicleMasterRow vehicle : vehicles) { - if (vehicle.sourceVehicleEntityId() == null && vehicle.vin() == null) { - continue; - } - UUID vehicleId = resolveVehicleId(normalizedTenantKey, vehicle.eventSourceId(), vehicle.sourceVehicleEntityId(), vehicle.vin()); - if (vehicleId == null) { - createVehicle(normalizedTenantKey, vehicle.eventSourceId(), vehicle.sourceVehicleEntityId(), vehicle.vin()); - } else { - touchVehicle(vehicleId, vehicle.sourceVehicleEntityId(), vehicle.vin()); - } - updates++; - } - - List registrations = jdbcTemplate.query( - compatibleSourcesCte() + """ - select event_source_id, - source_entity_id, - source_external_key, - source_updated_at, - payload ->> 'registration_nation' as registration_nation, - payload ->> 'registration_number' as registration_number - from eventhub.source_master_entity - where tenant_key = ? - and event_source_id in (select id from compatible_sources) - and entity_type = 'VEHICLE_REGISTRATION' - """, - (rs, rowNum) -> registrationMasterRow( - rs.getInt("event_source_id"), - normalizeNullable(rs.getString("source_entity_id")), - normalizeNullable(rs.getString("source_external_key")), - normalizeNullable(rs.getString("registration_nation")), - normalizeNullable(rs.getString("registration_number")), - offsetDateTime(rs.getObject("source_updated_at")) - ), - eventSourceId, - normalizedTenantKey - ); - for (RegistrationMasterRow registration : registrations) { - if (registration.nation() == null || registration.registrationNumber() == null) { - continue; - } - UUID registrationId = resolveRegistrationId( - normalizedTenantKey, - registration.eventSourceId(), - registration.sourceRegistrationEntityId(), - registration.nation(), - registration.registrationNumber(), - null - ); - if (registrationId == null) { - createRegistration( - normalizedTenantKey, - registration.eventSourceId(), - registration.sourceRegistrationEntityId(), - registration.nation(), - registration.registrationNumber(), - registration.sourceUpdatedAt(), - Map.of("source", "master-data") - ); - } else { - updateRegistrationFromMasterData( - registrationId, - registration.sourceRegistrationEntityId(), - registration.nation(), - registration.registrationNumber(), - registration.sourceUpdatedAt() - ); - } - updates++; - } - + int updates = reconcileVehiclesFromMasterData(normalizedTenantKey, eventSourceId); + updates += reconcileRegistrationsFromMasterData(normalizedTenantKey, eventSourceId); updates += projectVehicleRegistrationAssignments(normalizedTenantKey, eventSourceId); return updates; } + private int reconcileVehiclesFromMasterData(String tenantKey, int eventSourceId) { + Long count = jdbcTemplate.queryForObject( + compatibleSourcesCte() + """ + , master_vehicles as ( + select distinct on ( + event_source_id, + nullif(trim(source_entity_id), ''), + nullif(trim(source_external_key), '') + ) + event_source_id, + nullif(trim(source_entity_id), '') as source_vehicle_entity_id, + nullif(trim(source_external_key), '') as vin + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'VEHICLE' + and ( + nullif(trim(source_entity_id), '') is not null + or nullif(trim(source_external_key), '') is not null + ) + order by event_source_id, + nullif(trim(source_entity_id), ''), + nullif(trim(source_external_key), ''), + updated_at desc + ), + updated_by_source as ( + update eventhub.vehicle vehicle + set source_vehicle_entity_id = coalesce(vehicle.source_vehicle_entity_id, master.source_vehicle_entity_id), + vin = coalesce(vehicle.vin, master.vin), + updated_at = now() + from master_vehicles master + where vehicle.tenant_key = ? + and vehicle.event_source_id in (select id from compatible_sources) + and master.source_vehicle_entity_id is not null + and vehicle.source_vehicle_entity_id = master.source_vehicle_entity_id + returning vehicle.id + ), + updated_by_vin as ( + update eventhub.vehicle vehicle + set source_vehicle_entity_id = coalesce(vehicle.source_vehicle_entity_id, master.source_vehicle_entity_id), + vin = coalesce(vehicle.vin, master.vin), + updated_at = now() + from master_vehicles master + where vehicle.tenant_key = ? + and vehicle.event_source_id in (select id from compatible_sources) + and master.vin is not null + and vehicle.vin = master.vin + returning vehicle.id + ), + inserted as ( + insert into eventhub.vehicle(id, tenant_key, event_source_id, source_vehicle_entity_id, vin, updated_at) + select gen_random_uuid(), ?, master.event_source_id, master.source_vehicle_entity_id, master.vin, now() + from master_vehicles master + where not exists ( + select 1 + from eventhub.vehicle existing + where existing.tenant_key = ? + and existing.event_source_id in (select id from compatible_sources) + and ( + (master.source_vehicle_entity_id is not null and existing.source_vehicle_entity_id = master.source_vehicle_entity_id) + or (master.vin is not null and existing.vin = master.vin) + ) + ) + returning id + ) + select (select count(*) from updated_by_source) + + (select count(*) from updated_by_vin) + + (select count(*) from inserted) + """, + Long.class, + eventSourceId, + tenantKey, + tenantKey, + tenantKey, + tenantKey, + tenantKey + ); + return count == null ? 0 : Math.toIntExact(count); + } + + private int reconcileRegistrationsFromMasterData(String tenantKey, int eventSourceId) { + Long count = jdbcTemplate.queryForObject( + compatibleSourcesCte() + """ + , master_registrations as ( + select distinct on ( + event_source_id, + nullif(trim(source_entity_id), ''), + coalesce( + nullif(trim(payload ->> 'registration_nation'), ''), + nullif(split_part(source_external_key, ':', 1), '') + ), + coalesce( + nullif(trim(payload ->> 'registration_number'), ''), + nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + ) + ) + event_source_id, + nullif(trim(source_entity_id), '') as source_registration_entity_id, + coalesce( + nullif(trim(payload ->> 'registration_nation'), ''), + nullif(split_part(source_external_key, ':', 1), '') + ) as nation, + coalesce( + nullif(trim(payload ->> 'registration_number'), ''), + nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + ) as registration_number, + source_updated_at + from eventhub.source_master_entity + where tenant_key = ? + and event_source_id in (select id from compatible_sources) + and entity_type = 'VEHICLE_REGISTRATION' + and ( + nullif(trim(payload ->> 'registration_nation'), '') is not null + or source_external_key like '%:%' + ) + and ( + nullif(trim(payload ->> 'registration_number'), '') is not null + or source_external_key like '%:%' + ) + order by event_source_id, + nullif(trim(source_entity_id), ''), + coalesce( + nullif(trim(payload ->> 'registration_nation'), ''), + nullif(split_part(source_external_key, ':', 1), '') + ), + coalesce( + nullif(trim(payload ->> 'registration_number'), ''), + nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') + ), + updated_at desc + ), + updated_by_source as ( + update eventhub.vehicle_registration registration + set source_registration_entity_id = coalesce(registration.source_registration_entity_id, master.source_registration_entity_id), + nation = coalesce(master.nation, registration.nation), + registration_number = coalesce(master.registration_number, registration.registration_number), + source_updated_at = master.source_updated_at, + updated_at = now() + from master_registrations master + where registration.tenant_key = ? + and registration.event_source_id in (select id from compatible_sources) + and master.source_registration_entity_id is not null + and registration.source_registration_entity_id = master.source_registration_entity_id + returning registration.id + ), + updated_by_plate as ( + update eventhub.vehicle_registration registration + set source_registration_entity_id = coalesce(registration.source_registration_entity_id, master.source_registration_entity_id), + nation = coalesce(master.nation, registration.nation), + registration_number = coalesce(master.registration_number, registration.registration_number), + source_updated_at = master.source_updated_at, + updated_at = now() + from master_registrations master + where registration.tenant_key = ? + and registration.event_source_id in (select id from compatible_sources) + and registration.nation = master.nation + and registration.registration_number = master.registration_number + returning registration.id + ), + inserted as ( + insert into eventhub.vehicle_registration( + id, tenant_key, event_source_id, source_registration_entity_id, nation, registration_number, + source_updated_at, payload, updated_at + ) + select gen_random_uuid(), + ?, + master.event_source_id, + master.source_registration_entity_id, + master.nation, + master.registration_number, + master.source_updated_at, + jsonb_build_object('source', 'master-data'), + now() + from master_registrations master + where master.nation is not null + and master.registration_number is not null + and not exists ( + select 1 + from eventhub.vehicle_registration existing + where existing.tenant_key = ? + and existing.event_source_id in (select id from compatible_sources) + and ( + (master.source_registration_entity_id is not null + and existing.source_registration_entity_id = master.source_registration_entity_id) + or ( + existing.nation = master.nation + and existing.registration_number = master.registration_number + ) + ) + ) + returning id + ) + select (select count(*) from updated_by_source) + + (select count(*) from updated_by_plate) + + (select count(*) from inserted) + """, + Long.class, + eventSourceId, + tenantKey, + tenantKey, + tenantKey, + tenantKey, + tenantKey + ); + return count == null ? 0 : Math.toIntExact(count); + } + private UUID resolveVehicleId( String tenantKey, int eventSourceId, @@ -386,6 +505,9 @@ public class VehicleIdentityRepository { } private void touchVehicle(UUID vehicleId, String sourceVehicleEntityId, String vin) { + if (sourceVehicleEntityId == null && vin == null) { + return; + } jdbcTemplate.update( """ update eventhub.vehicle @@ -393,10 +515,16 @@ public class VehicleIdentityRepository { vin = coalesce(vin, ?), updated_at = now() where id = ? + and ( + (source_vehicle_entity_id is null and ? is not null) + or (vin is null and ? is not null) + ) """, sourceVehicleEntityId, vin, - vehicleId + vehicleId, + sourceVehicleEntityId, + vin ); } @@ -448,6 +576,9 @@ public class VehicleIdentityRepository { } private void touchRegistration(UUID registrationId, String sourceRegistrationEntityId, String nation, String registrationNumber) { + if (sourceRegistrationEntityId == null && nation == null && registrationNumber == null) { + return; + } jdbcTemplate.update( """ update eventhub.vehicle_registration @@ -456,11 +587,19 @@ public class VehicleIdentityRepository { registration_number = coalesce(?, registration_number), updated_at = now() where id = ? + and ( + (source_registration_entity_id is null and ? is not null) + or (nation is null and ? is not null) + or (registration_number is null and ? is not null) + ) """, sourceRegistrationEntityId, nation, registrationNumber, - registrationId + registrationId, + sourceRegistrationEntityId, + nation, + registrationNumber ); } @@ -507,50 +646,6 @@ public class VehicleIdentityRepository { """; } - private RegistrationMasterRow registrationMasterRow( - int eventSourceId, - String sourceRegistrationEntityId, - String sourceExternalKey, - String nation, - String registrationNumber, - OffsetDateTime sourceUpdatedAt - ) { - String resolvedNation = nation; - String resolvedRegistrationNumber = registrationNumber; - if ((resolvedNation == null || resolvedRegistrationNumber == null) && sourceExternalKey != null) { - int separator = sourceExternalKey.indexOf(':'); - if (separator > -1) { - resolvedNation = resolvedNation == null ? normalizeNullable(sourceExternalKey.substring(0, separator)) : resolvedNation; - resolvedRegistrationNumber = resolvedRegistrationNumber == null - ? normalizeNullable(sourceExternalKey.substring(separator + 1)) - : resolvedRegistrationNumber; - } - } - return new RegistrationMasterRow( - eventSourceId, - sourceRegistrationEntityId, - resolvedNation, - resolvedRegistrationNumber, - sourceUpdatedAt - ); - } - - private OffsetDateTime offsetDateTime(Object value) { - if (value == null) { - return null; - } - if (value instanceof OffsetDateTime offsetDateTime) { - return offsetDateTime; - } - if (value instanceof Timestamp timestamp) { - return timestamp.toInstant().atOffset(ZoneOffset.UTC); - } - if (value instanceof java.time.LocalDateTime localDateTime) { - return localDateTime.atOffset(ZoneOffset.UTC); - } - return OffsetDateTime.parse(value.toString()); - } - private String toJson(Map value) { try { return objectMapper.writeValueAsString(value == null ? Map.of() : value); @@ -581,16 +676,4 @@ public class VehicleIdentityRepository { } } - private record VehicleMasterRow(int eventSourceId, String sourceVehicleEntityId, String vin, OffsetDateTime sourceUpdatedAt) { - } - - private record RegistrationMasterRow( - int eventSourceId, - String sourceRegistrationEntityId, - String nation, - String registrationNumber, - OffsetDateTime sourceUpdatedAt - ) { - } - } diff --git a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java index e7b0392..56eabec 100644 --- a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java +++ b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java @@ -8,7 +8,9 @@ import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.persistence.DataPackageRepository; import at.procon.eventhub.persistence.EventRepository; import at.procon.eventhub.persistence.EventSourceRepository; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,12 +74,22 @@ public class EventHubIngestionService { try { int insertedCount = eventRepository.batchInsert(packageId, packageInfo.tenantKey(), eventSourceId, sortedEvents); dataPackageRepository.markImported(packageId, insertedCount); - log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={}", - packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount); + log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={} byType={}", + packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount, eventTypeCounts(sortedEvents)); return new EventHubPackageResult(packageId, batch.packageKey(), sortedEvents.size(), insertedCount); } catch (RuntimeException ex) { dataPackageRepository.markFailed(packageId, ex.getMessage()); throw ex; } } + + private Map eventTypeCounts(List events) { + Map counts = new LinkedHashMap<>(); + for (EventHubEventDto event : events) { + String domain = event.eventDomain() == null ? "UNKNOWN_DOMAIN" : event.eventDomain().name(); + String type = event.eventType() == null ? "UNKNOWN_EVENT" : event.eventType().name(); + counts.merge(domain + "/" + type, 1, Integer::sum); + } + return counts; + } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java index 21d7899..175bc62 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java @@ -1,6 +1,5 @@ package at.procon.eventhub.tachograph.service; -import at.procon.eventhub.dto.EventHubEventDto; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.ImportCursorStateDto; import at.procon.eventhub.dto.ImportScopeDto; @@ -14,7 +13,6 @@ import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -72,19 +70,19 @@ public class JdbcTachographExtractionBatchExecutor ImportPlanItemDto planItem, ImportTimeChunkDto chunk, ImportCursorStateDto cursor, - List events + ExtractedEventStats stats ) { return new TachographExtractionBatchResultDto( packageId, planItem.extractionCode(), planItem.sourceKind(), - events.size(), - events.size(), - events.size(), + stats.eventsMapped(), + stats.eventsMapped(), + stats.eventsMapped(), 0, true, - lastSourcePackageImportedAt(events, cursor), - lastSourcePackageId(events, cursor), + lastSourcePackageImportedAt(stats, cursor), + lastSourcePackageId(stats, cursor), null, chunk.occurredTo() ); diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java index 2d357a2..216ebad 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java @@ -46,7 +46,6 @@ public class TachographImportExecutionService return toTachographResult(createImportRun(request, false)); } - @Transactional public TachographImportRunResultDto startAndExecuteImport(TachographImportRequest request) { return toTachographResult(createImportRun(request, true)); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java index 5e6c342..be1fcdd 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java @@ -17,6 +17,7 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -46,6 +47,7 @@ public class TachographMasterDataRefreshService { private static final String RELATIONS_SQL_RESOURCE = "classpath:sql/tachograph/master-data/relations.sql"; private static final String MASTER_DATA_SOURCE_KIND = "MASTER_DATA"; private static final String MASTER_DATA_SOURCE_KEY = "TACHOGRAPH_MASTER_DATA"; + private static final int MASTER_DATA_UPSERT_BATCH_SIZE = 5000; private final ObjectProvider tachographJdbcTemplateProvider; private final SourceMasterDataRepository sourceMasterDataRepository; @@ -86,22 +88,18 @@ public class TachographMasterDataRefreshService { EventSourceDto masterDataSource = masterDataSourceFor(request.eventSource()); int eventSourceId = eventSourceRepository.resolveSourceId(tenantKey, masterDataSource); + log.info("Starting tachograph source master-data refresh tenant={} source={} batchSize={}", + tenantKey, masterDataSource.stableKey(), MASTER_DATA_UPSERT_BATCH_SIZE); + int entities = 0; for (String sqlResource : ENTITY_SQL_RESOURCES) { - List batch = tachographJdbcTemplate.query( - loadSql(sqlResource), - Map.of(), - (rs, rowNum) -> entity(rs) - ); - entities += sourceMasterDataRepository.upsertEntities(tenantKey, eventSourceId, batch); + entities += streamEntities(tachographJdbcTemplate, tenantKey, eventSourceId, sqlResource, loadSql(sqlResource)); } - List relations = tachographJdbcTemplate.query( - loadSql(RELATIONS_SQL_RESOURCE), - Map.of(), - (rs, rowNum) -> relation(rs) - ); - int relationCount = sourceMasterDataRepository.upsertRelations(tenantKey, eventSourceId, relations); + int relationCount = streamRelations(tachographJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE)); + + log.info("Reconciling tachograph vehicle identities from source master data tenant={} source={}", + tenantKey, masterDataSource.stableKey()); int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId); MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount); @@ -110,6 +108,109 @@ public class TachographMasterDataRefreshService { return result; } + private int streamEntities( + NamedParameterJdbcTemplate tachographJdbcTemplate, + String tenantKey, + int eventSourceId, + String sqlResource, + String sql + ) { + String section = masterDataSection(sqlResource); + List buffer = new ArrayList<>(MASTER_DATA_UPSERT_BATCH_SIZE); + Map typeCounts = new LinkedHashMap<>(); + int[] count = {0}; + int[] chunk = {0}; + + log.info("Reading tachograph master-data entities tenant={} section={}", tenantKey, section); + tachographJdbcTemplate.query(sql, Map.of(), rs -> { + SourceMasterEntityUpsert entity = entity(rs); + buffer.add(entity); + increment(typeCounts, entity.entityType()); + if (buffer.size() >= MASTER_DATA_UPSERT_BATCH_SIZE) { + count[0] += flushEntities(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts); + buffer.clear(); + } + }); + if (!buffer.isEmpty()) { + count[0] += flushEntities(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts); + } + log.info("Finished tachograph master-data entities tenant={} section={} processed={} byType={}", + tenantKey, section, count[0], typeCounts); + return count[0]; + } + + private int streamRelations( + NamedParameterJdbcTemplate tachographJdbcTemplate, + String tenantKey, + int eventSourceId, + String sqlResource, + String sql + ) { + String section = masterDataSection(sqlResource); + List buffer = new ArrayList<>(MASTER_DATA_UPSERT_BATCH_SIZE); + Map typeCounts = new LinkedHashMap<>(); + int[] count = {0}; + int[] chunk = {0}; + + log.info("Reading tachograph master-data relations tenant={} section={}", tenantKey, section); + tachographJdbcTemplate.query(sql, Map.of(), rs -> { + SourceMasterRelationUpsert relation = relation(rs); + buffer.add(relation); + increment(typeCounts, relation.relationType()); + if (buffer.size() >= MASTER_DATA_UPSERT_BATCH_SIZE) { + count[0] += flushRelations(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts); + buffer.clear(); + } + }); + if (!buffer.isEmpty()) { + count[0] += flushRelations(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts); + } + log.info("Finished tachograph master-data relations tenant={} section={} processed={} byType={}", + tenantKey, section, count[0], typeCounts); + return count[0]; + } + + private int flushEntities( + String tenantKey, + int eventSourceId, + String section, + int chunk, + List buffer, + int alreadyProcessed, + Map typeCounts + ) { + int upserted = sourceMasterDataRepository.upsertEntities(tenantKey, eventSourceId, buffer); + log.info("Tachograph master-data entity progress tenant={} section={} chunk={} chunkSize={} processed={} byType={}", + tenantKey, section, chunk, buffer.size(), alreadyProcessed + upserted, typeCounts); + return upserted; + } + + private int flushRelations( + String tenantKey, + int eventSourceId, + String section, + int chunk, + List buffer, + int alreadyProcessed, + Map typeCounts + ) { + int upserted = sourceMasterDataRepository.upsertRelations(tenantKey, eventSourceId, buffer); + log.info("Tachograph master-data relation progress tenant={} section={} chunk={} chunkSize={} processed={} byType={}", + tenantKey, section, chunk, buffer.size(), alreadyProcessed + upserted, typeCounts); + return upserted; + } + + private void increment(Map counts, String type) { + String key = type == null || type.isBlank() ? "UNKNOWN" : type; + counts.merge(key, 1, Integer::sum); + } + + private String masterDataSection(String sqlResource) { + int slash = sqlResource.lastIndexOf('/'); + String fileName = slash < 0 ? sqlResource : sqlResource.substring(slash + 1); + return fileName.endsWith(".sql") ? fileName.substring(0, fileName.length() - 4) : fileName; + } + private EventSourceDto masterDataSourceFor(EventSourceDto source) { return new EventSourceDto( source.providerKey(), diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 26d6e30..07f0b5f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -27,9 +27,10 @@ management: eventhub: batch: - completion-size: 1000 + completion-size: 5000 completion-timeout: 5s queue-size: 10000 + concurrent-consumers: 4 block-when-full: true queue-offer-timeout: 5m tachograph: @@ -44,25 +45,25 @@ eventhub: driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver # Enables the scheduler that regularly triggers configured tachograph import plans. - scheduler-enabled: false + scheduler-enabled: true scheduler-poll-interval-ms: 60000 # PLAN_ONLY creates import_run + planned extraction packages. # EXECUTE also invokes the configured TachographExtractionBatchExecutor. - scheduler-trigger-mode: PLAN_ONLY + scheduler-trigger-mode: EXECUTE # Example plan. Keep disabled until the tachograph datasource/extractor is wired. import-plans: - plan-key: kralowetz-tachograph-org-147 - enabled: false + enabled: true cron: "0 15 * * * *" # hourly at minute 15 tenant-key: kralowetz event-source: provider-key: TACHOGRAPH source-kind: MIXED source-key: TACHOGRAPH_DB - source-instance-key: tachograph-prod-db - tenant-provider-setting-key: kralowetz-tachograph-prod + source-instance-key: ByteBar-DriverSettlement + tenant-provider-setting-key: ByteBar-DriverSettlement source-group: type: ORGANISATION source-entity-id: "147" @@ -91,7 +92,7 @@ eventhub: scheduled-mode: INCREMENTAL_UPDATE initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP scheduled-strategy: SOURCE_PACKAGE_WATERMARK - refresh-master-data-first: true - initial-occurred-from: "2025-01-01T00:00:00+01:00" - initial-occurred-to: null - run-initial-on-startup: false + refresh-master-data-first: false + initial-occurred-from: "2026-01-21T00:00:00+01:00" + initial-occurred-to: "2026-01-25T00:00:00+01:00" + run-initial-on-startup: true diff --git a/src/main/resources/db/eventhub_schema_create.sql b/src/main/resources/db/eventhub_schema_create.sql new file mode 100644 index 0000000..0ab1530 --- /dev/null +++ b/src/main/resources/db/eventhub_schema_create.sql @@ -0,0 +1,346 @@ +create extension if not exists pgcrypto; +create extension if not exists postgis; +create extension if not exists timescaledb; + +drop schema if exists eventhub cascade; +create schema if not exists eventhub; + +create table if not exists eventhub.event_source ( + id integer generated always as identity primary key, + tenant_key text not null, + provider_key text not null, + source_kind text not null, + source_key text not null, + source_instance_key text not null default 'default', + tenant_provider_setting_key text, + external_fleet_key text, + created_at timestamptz not null default now(), + constraint ux_event_source unique (tenant_key, provider_key, source_kind, source_key, source_instance_key) +); + +create table if not exists eventhub.import_run ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + mode text not null, + status text not null, + refresh_master_data_first boolean not null default true, + source_group_type text, + source_group_entity_id text, + source_group_code text, + source_group_name text, + import_scope_type text not null, + root_source_org_entity_id text, + root_source_org_code text, + root_source_org_name text, + include_children boolean not null default false, + occurred_from timestamptz, + occurred_to timestamptz, + requested_event_families text[] not null default '{}', + acquisition_strategy text, + metadata jsonb not null default '{}'::jsonb, + planned_package_count integer not null default 0, + started_at timestamptz not null default now(), + finished_at timestamptz, + error_message text, + constraint chk_import_run_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to) +); + +create table if not exists eventhub.import_cursor ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + scope_hash text not null, + event_family text not null, + source_kind text not null, + cursor_type text not null, + last_source_package_imported_at timestamptz, + last_source_package_id text, + last_source_row_updated_at timestamptz, + last_occurred_to timestamptz, + updated_at timestamptz not null default now(), + constraint ux_import_cursor unique (tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type) +); + +create table if not exists eventhub.data_package ( + id uuid primary key, + event_source_id integer not null references eventhub.event_source(id), + import_run_id uuid references eventhub.import_run(id), + tenant_key text not null, + package_key text not null, + package_type text not null, + status text not null, + source_group_type text, + source_group_entity_id text, + source_group_code text, + source_group_name text, + import_scope_type text, + root_source_org_entity_id text, + root_source_org_code text, + root_source_org_name text, + include_children boolean not null default false, + occurred_from timestamptz, + occurred_to timestamptz, + event_family text, + business_date date, + external_package_id text, + extraction_code text, + extraction_source_kind text, + entity_axis text, + batch_no integer, + chunk_from timestamptz, + chunk_to timestamptz, + source_package_kind text, + source_package_id text, + source_package_entity_id text, + source_package_period_from timestamptz, + source_package_period_to timestamptz, + source_package_imported_at timestamptz, + received_at timestamptz not null default now(), + completed_at timestamptz, + event_count integer not null default 0, + metadata jsonb not null default '{}'::jsonb, + error_message text, + constraint ux_data_package_package_key unique (tenant_key, event_source_id, package_key), + constraint chk_data_package_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to), + constraint chk_data_package_chunk_time_order check (chunk_from is null or chunk_to is null or chunk_from < chunk_to) +); + +create table if not exists eventhub.source_master_entity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + entity_type text not null, + source_entity_id text not null, + source_external_key text, + display_name text, + active boolean, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_master_entity unique (tenant_key, event_source_id, entity_type, source_entity_id), + constraint chk_source_master_entity_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to) +); + +create table if not exists eventhub.source_master_relation ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + relation_key text not null, + relation_type text not null, + from_entity_type text not null, + from_source_entity_id text not null, + to_entity_type text not null, + to_source_entity_id text not null, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_master_relation unique (tenant_key, event_source_id, relation_key), + constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to) +); + +create table if not exists eventhub.vehicle ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_vehicle_entity_id text, + vin text, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create table if not exists eventhub.vehicle_registration ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + source_registration_entity_id text, + nation text not null, + registration_number text not null, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +create table if not exists eventhub.vehicle_registration_assignment ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + vehicle_registration_id uuid not null references eventhub.vehicle_registration(id) on delete cascade, + vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint chk_vehicle_registration_assignment_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to) +); + +create table if not exists eventhub.event ( + id uuid not null, + event_source_id integer not null references eventhub.event_source(id), + data_package_id uuid not null references eventhub.data_package(id), + external_source_event_id text not null, + driver_entity_id uuid references eventhub.source_master_entity(id), + vehicle_id uuid references eventhub.vehicle(id), + vehicle_registration_id uuid references eventhub.vehicle_registration(id), + source_package_id text, + source_package_entity_id uuid references eventhub.source_master_entity(id), + occurred_at timestamptz not null, + received_partner_at timestamptz, + received_hub_at timestamptz not null default now(), + event_domain text not null, + event_type text not null, + lifecycle text not null, + odometer_m bigint, + position geography(Point, 4326), + payload jsonb not null default '{}'::jsonb, + manual_entry boolean not null default false, + source_record_key_hash text not null, + event_signature_hash text, + created_at timestamptz not null default now(), + constraint pk_event primary key (occurred_at, id), + constraint chk_event_driver_or_vehicle_ref check ( + driver_entity_id is not null + or vehicle_id is not null + or vehicle_registration_id is not null + ) +); + +create table if not exists eventhub.event_source_record ( + source_record_key_hash text primary key, + event_occurred_at timestamptz not null, + event_id uuid not null, + created_at timestamptz not null default now() +); + +create table if not exists eventhub.event_detail ( + event_occurred_at timestamptz not null, + event_id uuid not null, + detail_type text not null, + attributes jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + constraint pk_event_detail primary key (event_occurred_at, event_id, detail_type) +); + +select create_hypertable( + 'eventhub.event', + 'occurred_at', + chunk_time_interval => interval '7 days', + if_not_exists => true +); + +alter table eventhub.event_source_record + add constraint fk_event_source_record_event foreign key (event_occurred_at, event_id) + references eventhub.event(occurred_at, id) + on delete cascade + deferrable initially deferred; + +alter table eventhub.event_detail + add constraint fk_event_detail_event foreign key (event_occurred_at, event_id) + references eventhub.event(occurred_at, id) + on delete cascade; + +create index if not exists idx_data_package_source_time + on eventhub.data_package(tenant_key, event_source_id, received_at desc); + +create index if not exists idx_data_package_scope + on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to); + +create index if not exists idx_data_package_extraction + on eventhub.data_package(tenant_key, event_source_id, import_run_id, event_family, extraction_source_kind, extraction_code, batch_no); + +create index if not exists idx_import_run_source_status + on eventhub.import_run(tenant_key, event_source_id, status, started_at desc); + +create index if not exists idx_source_master_entity_type_key + on eventhub.source_master_entity(tenant_key, event_source_id, entity_type, source_external_key) + where source_external_key is not null; + +create index if not exists idx_source_master_entity_payload_gin + on eventhub.source_master_entity using gin(payload); + +create index if not exists idx_source_master_relation_from + on eventhub.source_master_relation(tenant_key, event_source_id, from_entity_type, from_source_entity_id, relation_type); + +create index if not exists idx_source_master_relation_to + on eventhub.source_master_relation(tenant_key, event_source_id, to_entity_type, to_source_entity_id, relation_type); + +create index if not exists idx_source_master_relation_payload_gin + on eventhub.source_master_relation using gin(payload); + +create index if not exists idx_vehicle_lookup_ctx + on eventhub.vehicle(tenant_key, event_source_id, updated_at desc); + +create index if not exists idx_vehicle_source_entity + on eventhub.vehicle(tenant_key, event_source_id, source_vehicle_entity_id) + where source_vehicle_entity_id is not null; + +create index if not exists idx_vehicle_vin + on eventhub.vehicle(tenant_key, event_source_id, vin) + where vin is not null; + +create index if not exists idx_vehicle_registration_source_entity + on eventhub.vehicle_registration(tenant_key, event_source_id, source_registration_entity_id) + where source_registration_entity_id is not null; + +create index if not exists idx_vehicle_registration_plate + on eventhub.vehicle_registration(tenant_key, event_source_id, nation, registration_number); + +create index if not exists idx_vehicle_registration_assignment_registration_time + on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to); + +create index if not exists idx_vehicle_registration_assignment_vehicle_time + on eventhub.vehicle_registration_assignment(vehicle_id, valid_from desc, valid_to); + +create index if not exists idx_event_source_record_event + on eventhub.event_source_record(event_occurred_at, event_id); + +create index if not exists idx_event_signature + on eventhub.event(event_signature_hash) + where event_signature_hash is not null; + +create index if not exists idx_event_source_time + on eventhub.event(event_source_id, occurred_at desc); + +create index if not exists idx_event_package_time + on eventhub.event(data_package_id, occurred_at desc); + +create index if not exists idx_event_source_package_id + on eventhub.event(source_package_id) + where source_package_id is not null; + +create index if not exists idx_event_domain_type_time + on eventhub.event(event_domain, event_type, occurred_at desc); + +create index if not exists idx_event_driver_time + on eventhub.event(driver_entity_id, occurred_at desc) + where driver_entity_id is not null; + +create index if not exists idx_event_vehicle_time + on eventhub.event(vehicle_id, occurred_at desc) + where vehicle_id is not null; + +create index if not exists idx_event_vehicle_registration_time + on eventhub.event(vehicle_registration_id, occurred_at desc) + where vehicle_registration_id is not null; + +create index if not exists idx_event_position_gist + on eventhub.event using gist(position) + where position is not null; + +create index if not exists idx_event_payload_gin + on eventhub.event using gin(payload); + +create index if not exists idx_event_detail_type + on eventhub.event_detail(detail_type); + +create index if not exists idx_event_detail_attributes_gin + on eventhub.event_detail using gin(attributes);