Optimize ingestion pipeline and reduce import contention

This commit is contained in:
trifonovt 2026-05-02 21:43:55 +02:00
parent c21530826c
commit 2e6e1aa5c6
13 changed files with 1409 additions and 334 deletions

View File

@ -29,7 +29,8 @@ public class EventHubBatchBuildProcessor implements Processor {
List<EventHubEventDto> events = exchange.getMessage().getBody(List.class);
List<EventHubEventDto> sortedEvents = sorter.sort(events);
String packageKey = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_KEY, String.class);
String aggregatePackageKey = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_KEY, String.class);
String packageKey = aggregatePackageKey + ":CAMEL-" + exchange.getExchangeId();
EventHubPackageRequest packageInfo = exchange.getMessage().getHeader(EventHubHeaders.PACKAGE_INFO, EventHubPackageRequest.class);
if (packageInfo == null && !sortedEvents.isEmpty()) {
packageInfo = sortedEvents.getFirst().packageInfo();
@ -46,6 +47,8 @@ public class EventHubBatchBuildProcessor implements Processor {
Map<String, Object> metadata = new HashMap<>();
metadata.put("camelRouteId", exchange.getFromRouteId());
metadata.put("packageKey", packageKey);
metadata.put("aggregatePackageKey", aggregatePackageKey);
metadata.put("camelExchangeId", exchange.getExchangeId());
metadata.put("eventCount", sortedEvents.size());
if (packageInfo != null) {
metadata.put("tenantKey", packageInfo.tenantKey());

View File

@ -57,6 +57,7 @@ public class EventHubCommonIngestionRoute extends RouteBuilder {
EventHubProperties.Batch batch = properties.getBatch();
return BATCH_INPUT_QUEUE
+ "?size=" + batch.getQueueSize()
+ "&concurrentConsumers=" + batch.getConcurrentConsumers()
+ "&blockWhenFull=" + batch.isBlockWhenFull()
+ "&offerTimeout=" + batch.getQueueOfferTimeout().toMillis();
}

View File

@ -32,7 +32,7 @@ public class EventHubProperties {
public static class Batch {
/** Number of events collected before a package is persisted. */
private int completionSize = 1000;
private int completionSize = 5000;
/** Maximum time to wait for more events belonging to the same package key. */
private Duration completionTimeout = Duration.ofSeconds(5);
@ -40,6 +40,9 @@ public class EventHubProperties {
/** Maximum number of normalized events buffered before producers apply backpressure. */
private int queueSize = 10000;
/** Number of parallel consumers draining the normalized-event queue. */
private int concurrentConsumers = 4;
/** Whether producers should wait for queue capacity instead of failing immediately. */
private boolean blockWhenFull = true;
@ -70,6 +73,14 @@ public class EventHubProperties {
this.queueSize = Math.max(1, queueSize);
}
public int getConcurrentConsumers() {
return concurrentConsumers;
}
public void setConcurrentConsumers(int concurrentConsumers) {
this.concurrentConsumers = Math.max(1, concurrentConsumers);
}
public boolean isBlockWhenFull() {
return blockWhenFull;
}

View File

@ -14,11 +14,13 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.camel.ProducerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
@ -27,6 +29,9 @@ import org.springframework.util.StreamUtils;
public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunRequest, B extends ExtractionBatchResult>
implements ExtractionBatchExecutor<R, B> {
private static final Logger log = LoggerFactory.getLogger(AbstractJdbcExtractionBatchExecutor.class);
private static final int EVENT_EXTRACTION_PROGRESS_LOG_INTERVAL = 5000;
private final NamedParameterJdbcTemplate jdbcTemplate;
private final ProducerTemplate producerTemplate;
private final ResourceLoader resourceLoader;
@ -81,10 +86,21 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
Map<String, Object> params = parameters(request, chunkScope, cursor);
String sql = loadSql(definition.sqlResource());
List<EventHubEventDto> events = jdbcTemplate.query(sql, params, (rs, rowNum) -> definition.rowMapper().map(rs, rowNum, context));
events.forEach(event -> producerTemplate.sendBody(normalizedInputUri(), event));
ExtractedEventStats stats = new ExtractedEventStats();
log.info("Reading EventHub events provider={} tenant={} importRunId={} packageId={} extractionCode={} sourceKind={} chunk={} occurredFrom={} occurredTo={}",
providerPackagePrefix(), request.tenantKey(), importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(),
chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo());
jdbcTemplate.query(sql, params, rs -> {
EventHubEventDto event = definition.rowMapper().map(rs, stats.eventsMapped(), context);
producerTemplate.sendBody(normalizedInputUri(), event);
stats.accept(event);
if (stats.eventsMapped() % EVENT_EXTRACTION_PROGRESS_LOG_INTERVAL == 0) {
logEventExtractionProgress(request, importRunId, packageId, planItem, chunk, stats);
}
});
logEventExtractionFinished(request, importRunId, packageId, planItem, chunk, stats);
return resultFor(packageId, planItem, chunk, cursor, events);
return resultFor(packageId, planItem, chunk, cursor, stats);
}
protected abstract Optional<ExtractionDefinition<R>> findDefinition(String code);
@ -96,7 +112,7 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
ImportCursorStateDto cursor,
List<EventHubEventDto> events
ExtractedEventStats stats
);
protected String providerPackagePrefix() {
@ -107,6 +123,32 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
return "direct:eventhub-normalized-input";
}
private void logEventExtractionProgress(
R request,
UUID importRunId,
UUID packageId,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
ExtractedEventStats stats
) {
log.info("EventHub event extraction progress provider={} tenant={} importRunId={} packageId={} extractionCode={} sourceKind={} chunk={} mapped={} byType={}",
providerPackagePrefix(), request.tenantKey(), importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(),
chunk.sequence(), stats.eventsMapped(), stats.eventTypeCounts());
}
private void logEventExtractionFinished(
R request,
UUID importRunId,
UUID packageId,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
ExtractedEventStats stats
) {
log.info("Finished EventHub event extraction provider={} tenant={} importRunId={} packageId={} extractionCode={} sourceKind={} chunk={} mapped={} byType={}",
providerPackagePrefix(), request.tenantKey(), importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(),
chunk.sequence(), stats.eventsMapped(), stats.eventTypeCounts());
}
protected Map<String, Object> parameters(R request, ImportScopeDto scope, ImportCursorStateDto cursor) {
Map<String, Object> params = new HashMap<>();
String organisationId = scope == null || scope.rootSourceOrganisation() == null
@ -125,24 +167,20 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
return params;
}
protected OffsetDateTime lastSourcePackageImportedAt(List<EventHubEventDto> events, ImportCursorStateDto cursor) {
return events.stream()
.map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().importedIntoSourceAt())
.filter(value -> value != null)
.max(OffsetDateTime::compareTo)
.orElse(cursor == null ? null : cursor.lastSourcePackageImportedAt());
protected OffsetDateTime lastSourcePackageImportedAt(ExtractedEventStats stats, ImportCursorStateDto cursor) {
return stats.lastSourcePackageImportedAt() == null
? cursor == null ? null : cursor.lastSourcePackageImportedAt()
: stats.lastSourcePackageImportedAt();
}
protected String lastSourcePackageId(List<EventHubEventDto> events, ImportCursorStateDto cursor) {
return events.stream()
.filter(event -> event.sourcePackageRef() != null && event.sourcePackageRef().importedIntoSourceAt() != null)
.max((left, right) -> left.sourcePackageRef().importedIntoSourceAt().compareTo(right.sourcePackageRef().importedIntoSourceAt()))
.map(event -> event.sourcePackageRef().sourcePackageId())
.orElseGet(() -> events.stream()
.map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().sourcePackageId())
.filter(value -> value != null && !value.isBlank())
.max(this::compareSourcePackageId)
.orElse(cursor == null ? null : cursor.lastSourcePackageId()));
protected String lastSourcePackageId(ExtractedEventStats stats, ImportCursorStateDto cursor) {
if (stats.lastSourcePackageIdByImportedAt() != null) {
return stats.lastSourcePackageIdByImportedAt();
}
if (stats.maxSourcePackageId() != null) {
return stats.maxSourcePackageId();
}
return cursor == null ? null : cursor.lastSourcePackageId();
}
private EventHubPackageRequest packageInfo(
@ -215,4 +253,59 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
return null;
}
}
protected final class ExtractedEventStats {
private int eventsMapped;
private OffsetDateTime lastSourcePackageImportedAt;
private String lastSourcePackageIdByImportedAt;
private String maxSourcePackageId;
private final Map<String, Integer> eventTypeCounts = new LinkedHashMap<>();
public int eventsMapped() {
return eventsMapped;
}
public OffsetDateTime lastSourcePackageImportedAt() {
return lastSourcePackageImportedAt;
}
public String lastSourcePackageIdByImportedAt() {
return lastSourcePackageIdByImportedAt;
}
public String maxSourcePackageId() {
return maxSourcePackageId;
}
public Map<String, Integer> eventTypeCounts() {
return eventTypeCounts;
}
private void accept(EventHubEventDto event) {
eventsMapped++;
eventTypeCounts.merge(eventTypeKey(event), 1, Integer::sum);
if (event.sourcePackageRef() == null) {
return;
}
OffsetDateTime importedAt = event.sourcePackageRef().importedIntoSourceAt();
String sourcePackageId = event.sourcePackageRef().sourcePackageId();
if (importedAt != null
&& (lastSourcePackageImportedAt == null || importedAt.compareTo(lastSourcePackageImportedAt) > 0)) {
lastSourcePackageImportedAt = importedAt;
lastSourcePackageIdByImportedAt = sourcePackageId;
}
if (sourcePackageId != null
&& !sourcePackageId.isBlank()
&& (maxSourcePackageId == null || compareSourcePackageId(sourcePackageId, maxSourcePackageId) > 0)) {
maxSourcePackageId = sourcePackageId;
}
}
private String eventTypeKey(EventHubEventDto event) {
String domain = event.eventDomain() == null ? "UNKNOWN_DOMAIN" : event.eventDomain().name();
String type = event.eventType() == null ? "UNKNOWN_EVENT" : event.eventType().name();
return domain + "/" + type;
}
}
}

View File

@ -9,12 +9,16 @@ import at.procon.eventhub.service.EventAcquisitionRecordKeyService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@ -47,21 +51,22 @@ public class EventRepository {
*/
public int batchInsert(UUID packageId, String tenantKey, int eventSourceId, List<EventHubEventDto> events) {
Map<String, UUID> entityIdCache = new HashMap<>();
int insertedCount = 0;
List<ResolvedEventImportRow> rows = new ArrayList<>(events.size());
for (EventHubEventDto event : events) {
ResolvedEntityRefs refs = resolveEntityRefs(tenantKey, eventSourceId, event, entityIdCache);
InsertedEventRow row = upsertEvent(packageId, eventSourceId, event, refs);
upsertEventDetails(row, event);
if (row.inserted()) {
insertedCount++;
}
rows.add(resolveEventImportRow(packageId, eventSourceId, event, refs));
}
createEventImportStage();
stageEvents(rows);
int insertedCount = insertStagedEvents();
Map<String, InsertedEventRow> eventRowsBySourceRecord = resolveStagedEventRows();
batchUpsertEventDetails(rows, eventRowsBySourceRecord);
return insertedCount;
}
private InsertedEventRow upsertEvent(
private ResolvedEventImportRow resolveEventImportRow(
UUID packageId,
int eventSourceId,
EventHubEventDto event,
@ -70,62 +75,20 @@ public class EventRepository {
UUID requestedEventId = event.eventId() == null ? UUID.randomUUID() : event.eventId();
OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt();
String sourceRecordKeyHash = recordKeyService.buildSourceRecordKeyHash(event, eventSourceId);
var longitude = event.position() == null ? null : event.position().longitude();
var latitude = event.position() == null ? null : event.position().latitude();
Double longitude = event.position() == null ? null : event.position().longitude().doubleValue();
Double latitude = event.position() == null ? null : event.position().latitude().doubleValue();
String sourcePackageId = event.sourcePackageRef() == null ? null : event.sourcePackageRef().sourcePackageId();
return jdbcTemplate.query(
"""
with inserted as (
insert into eventhub.event(
id, event_source_id, data_package_id,
external_source_event_id,
driver_entity_id, vehicle_id, vehicle_registration_id, source_package_entity_id,
occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle,
odometer_m, position,
payload, manual_entry,
source_record_key_hash, event_signature_hash
) values (
?, ?, ?,
?,
?, ?, ?, ?,
?, ?, ?,
?, ?, ?,
?, case
when ?::double precision is null or ?::double precision is null then null
else ST_SetSRID(ST_MakePoint(?::double precision, ?::double precision), 4326)::geography
end,
?::jsonb, ?,
?, ?
)
on conflict (source_record_key_hash) do nothing
returning id, occurred_at, true as inserted
)
select id, occurred_at, inserted
from inserted
union all
select e.id, e.occurred_at, false as inserted
from eventhub.event e
where e.source_record_key_hash = ?
and not exists (select 1 from inserted)
""",
rs -> {
if (!rs.next()) {
throw new IllegalStateException("Could not insert or resolve event row for source record hash " + sourceRecordKeyHash);
}
return new InsertedEventRow(
(UUID) rs.getObject("id"),
rs.getObject("occurred_at", OffsetDateTime.class),
rs.getBoolean("inserted")
);
},
return new ResolvedEventImportRow(
sourceRecordKeyHash,
requestedEventId,
eventSourceId,
packageId,
eventSourceId,
event.externalSourceEventId(),
refs.driverEntityId(),
refs.vehicleId(),
refs.vehicleRegistrationId(),
sourcePackageId,
refs.sourcePackageEntityId(),
event.occurredAt(),
event.receivedPartnerAt(),
@ -136,21 +99,227 @@ public class EventRepository {
event.odometerM(),
longitude,
latitude,
longitude,
latitude,
toJson(event.payload()),
event.manualEntry(),
sourceRecordKeyHash,
recordKeyService.buildEventSignatureHash(event),
sourceRecordKeyHash
event.eventDetails() == null ? null : event.eventDetails().type(),
event.eventDetails() == null ? null : toJson(event.eventDetails().attributes())
);
}
private void upsertEventDetails(InsertedEventRow insertedEventRow, EventHubEventDto event) {
if (event.eventDetails() == null) {
private void createEventImportStage() {
jdbcTemplate.execute(
"""
create temporary table if not exists eventhub_event_import_stage (
row_no integer not null,
source_record_key_hash text not null,
requested_event_id uuid not null,
data_package_id uuid not null,
event_source_id integer not null,
external_source_event_id text not null,
driver_entity_id uuid,
vehicle_id uuid,
vehicle_registration_id uuid,
source_package_id text,
source_package_entity_id uuid,
occurred_at timestamptz not null,
received_partner_at timestamptz,
received_hub_at timestamptz not null,
event_domain text not null,
event_type text not null,
lifecycle text not null,
odometer_m bigint,
longitude double precision,
latitude double precision,
payload jsonb not null,
manual_entry boolean not null,
event_signature_hash text
) on commit drop
"""
);
jdbcTemplate.execute("truncate table eventhub_event_import_stage");
}
private void stageEvents(List<ResolvedEventImportRow> rows) {
jdbcTemplate.batchUpdate(
"""
insert into eventhub_event_import_stage(
row_no, source_record_key_hash, requested_event_id, data_package_id, event_source_id,
external_source_event_id, driver_entity_id, vehicle_id, vehicle_registration_id,
source_package_id, source_package_entity_id, occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle, odometer_m, longitude, latitude,
payload, manual_entry, event_signature_hash
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?)
""",
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
ResolvedEventImportRow row = rows.get(i);
ps.setInt(1, i);
ps.setString(2, row.sourceRecordKeyHash());
ps.setObject(3, row.requestedEventId());
ps.setObject(4, row.packageId());
ps.setInt(5, row.eventSourceId());
ps.setString(6, row.externalSourceEventId());
ps.setObject(7, row.driverEntityId());
ps.setObject(8, row.vehicleId());
ps.setObject(9, row.vehicleRegistrationId());
ps.setString(10, row.sourcePackageId());
ps.setObject(11, row.sourcePackageEntityId());
ps.setObject(12, row.occurredAt());
ps.setObject(13, row.receivedPartnerAt());
ps.setObject(14, row.receivedHubAt());
ps.setString(15, row.eventDomain());
ps.setString(16, row.eventType());
ps.setString(17, row.lifecycle());
ps.setObject(18, row.odometerM());
ps.setObject(19, row.longitude());
ps.setObject(20, row.latitude());
ps.setString(21, row.payloadJson());
ps.setBoolean(22, row.manualEntry());
ps.setString(23, row.eventSignatureHash());
}
@Override
public int getBatchSize() {
return rows.size();
}
}
);
}
private int insertStagedEvents() {
Long insertedCount = jdbcTemplate.queryForObject(
"""
with stage_one as (
select distinct on (source_record_key_hash) *
from eventhub_event_import_stage
order by source_record_key_hash, row_no
),
reserved_source_record as (
insert into eventhub.event_source_record(
source_record_key_hash, event_occurred_at, event_id
)
select source_record_key_hash, occurred_at, requested_event_id
from stage_one
on conflict (source_record_key_hash) do nothing
returning source_record_key_hash, event_occurred_at, event_id
),
existing_source_record as (
select source_record.source_record_key_hash,
source_record.event_occurred_at,
source_record.event_id
from stage_one stage
join eventhub.event_source_record source_record
on source_record.source_record_key_hash = stage.source_record_key_hash
where not exists (
select 1
from reserved_source_record reserved
where reserved.source_record_key_hash = stage.source_record_key_hash
)
),
source_record_for_stage as (
select *
from reserved_source_record
union all
select *
from existing_source_record
),
inserted_event as (
insert into eventhub.event(
id, event_source_id, data_package_id,
external_source_event_id,
driver_entity_id, vehicle_id, vehicle_registration_id,
source_package_id, source_package_entity_id,
occurred_at, received_partner_at, received_hub_at,
event_domain, event_type, lifecycle,
odometer_m, position,
payload, manual_entry,
source_record_key_hash, event_signature_hash
)
select
source_record.event_id, stage.event_source_id, stage.data_package_id,
stage.external_source_event_id,
stage.driver_entity_id, stage.vehicle_id, stage.vehicle_registration_id,
stage.source_package_id, stage.source_package_entity_id,
source_record.event_occurred_at, stage.received_partner_at, stage.received_hub_at,
stage.event_domain, stage.event_type, stage.lifecycle,
stage.odometer_m, case
when stage.longitude is null or stage.latitude is null then null
else ST_SetSRID(ST_MakePoint(stage.longitude, stage.latitude), 4326)::geography
end,
stage.payload, stage.manual_entry,
stage.source_record_key_hash, stage.event_signature_hash
from stage_one stage
join source_record_for_stage source_record
on source_record.source_record_key_hash = stage.source_record_key_hash
where not exists (
select 1
from eventhub.event existing_event
where existing_event.occurred_at = source_record.event_occurred_at
and existing_event.id = source_record.event_id
)
returning id
)
select count(*)
from inserted_event
""",
Long.class
);
return insertedCount == null ? 0 : Math.toIntExact(insertedCount);
}
private Map<String, InsertedEventRow> resolveStagedEventRows() {
return jdbcTemplate.query(
"""
select distinct
stage.source_record_key_hash,
source_record.event_id,
source_record.event_occurred_at
from eventhub_event_import_stage stage
join eventhub.event_source_record source_record
on source_record.source_record_key_hash = stage.source_record_key_hash
""",
rs -> {
Map<String, InsertedEventRow> rows = new HashMap<>();
while (rs.next()) {
rows.put(
rs.getString("source_record_key_hash"),
new InsertedEventRow(
(UUID) rs.getObject("event_id"),
rs.getObject("event_occurred_at", OffsetDateTime.class)
)
);
}
return rows;
}
);
}
private void batchUpsertEventDetails(
List<ResolvedEventImportRow> rows,
Map<String, InsertedEventRow> eventRowsBySourceRecord
) {
List<EventDetailImportRow> details = new ArrayList<>();
for (ResolvedEventImportRow row : rows) {
if (row.detailType() == null) {
continue;
}
InsertedEventRow insertedEventRow = eventRowsBySourceRecord.get(row.sourceRecordKeyHash());
if (insertedEventRow == null) {
throw new IllegalStateException("Could not insert or resolve event row for source record hash " + row.sourceRecordKeyHash());
}
details.add(new EventDetailImportRow(
insertedEventRow.eventId(),
insertedEventRow.occurredAt(),
row.detailType(),
row.detailAttributesJson()
));
}
if (details.isEmpty()) {
return;
}
jdbcTemplate.update(
jdbcTemplate.batchUpdate(
"""
insert into eventhub.event_detail(
event_occurred_at, event_id, detail_type, attributes
@ -159,10 +328,21 @@ public class EventRepository {
do update set
attributes = excluded.attributes
""",
insertedEventRow.occurredAt(),
insertedEventRow.eventId(),
event.eventDetails().type(),
toJson(event.eventDetails().attributes())
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
EventDetailImportRow detail = details.get(i);
ps.setObject(1, detail.occurredAt());
ps.setObject(2, detail.eventId());
ps.setString(3, detail.detailType());
ps.setString(4, detail.attributesJson());
}
@Override
public int getBatchSize() {
return details.size();
}
}
);
}
@ -341,8 +521,43 @@ public class EventRepository {
private record InsertedEventRow(
UUID eventId,
OffsetDateTime occurredAt
) {
}
private record ResolvedEventImportRow(
String sourceRecordKeyHash,
UUID requestedEventId,
UUID packageId,
int eventSourceId,
String externalSourceEventId,
UUID driverEntityId,
UUID vehicleId,
UUID vehicleRegistrationId,
String sourcePackageId,
UUID sourcePackageEntityId,
OffsetDateTime occurredAt,
boolean inserted
OffsetDateTime receivedPartnerAt,
OffsetDateTime receivedHubAt,
String eventDomain,
String eventType,
String lifecycle,
Long odometerM,
Double longitude,
Double latitude,
String payloadJson,
boolean manualEntry,
String eventSignatureHash,
String detailType,
String detailAttributesJson
) {
}
private record EventDetailImportRow(
UUID eventId,
OffsetDateTime occurredAt,
String detailType,
String attributesJson
) {
}
}

View File

@ -8,7 +8,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.POJONode;
import java.lang.reflect.Array;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
@ -18,8 +20,10 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
@Repository
public class SourceMasterDataRepository {
@ -32,33 +36,15 @@ public class SourceMasterDataRepository {
this.objectMapper = objectMapper;
}
@Transactional
public int upsertEntities(String tenantKey, int eventSourceId, List<SourceMasterEntityUpsert> entities) {
int count = 0;
List<SourceMasterEntityStageRow> rows = new ArrayList<>(entities.size());
for (SourceMasterEntityUpsert entity : entities) {
if (entity.sourceEntityId() == null || entity.sourceEntityId().isBlank()) {
continue;
}
jdbcTemplate.update(
"""
insert into eventhub.source_master_entity(
id, tenant_key, event_source_id, entity_type, source_entity_id,
source_external_key, display_name, active, valid_from, valid_to,
source_updated_at, payload, updated_at
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now())
on conflict (tenant_key, event_source_id, entity_type, source_entity_id)
do update set
source_external_key = excluded.source_external_key,
display_name = excluded.display_name,
active = excluded.active,
valid_from = excluded.valid_from,
valid_to = excluded.valid_to,
source_updated_at = excluded.source_updated_at,
payload = excluded.payload,
updated_at = now()
""",
rows.add(new SourceMasterEntityStageRow(
UUID.randomUUID(),
tenantKey,
eventSourceId,
entity.entityType(),
entity.sourceEntityId(),
entity.sourceExternalKey(),
@ -68,43 +54,28 @@ public class SourceMasterDataRepository {
entity.validTo(),
entity.sourceUpdatedAt(),
toJson(entity.payload())
);
count++;
));
}
return count;
if (rows.isEmpty()) {
return 0;
}
createEntityStage();
stageEntities(rows);
upsertStagedEntities(tenantKey, eventSourceId);
return rows.size();
}
@Transactional
public int upsertRelations(String tenantKey, int eventSourceId, List<SourceMasterRelationUpsert> relations) {
int count = 0;
List<SourceMasterRelationStageRow> rows = new ArrayList<>(relations.size());
for (SourceMasterRelationUpsert relation : relations) {
if (relation.fromSourceEntityId() == null || relation.toSourceEntityId() == null) {
continue;
}
String relationKey = relationKey(relation);
jdbcTemplate.update(
"""
insert into eventhub.source_master_relation(
id, tenant_key, event_source_id, relation_key, relation_type,
from_entity_type, from_source_entity_id, to_entity_type, to_source_entity_id,
valid_from, valid_to, source_updated_at, payload, updated_at
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now())
on conflict (tenant_key, event_source_id, relation_key)
do update set
relation_type = excluded.relation_type,
from_entity_type = excluded.from_entity_type,
from_source_entity_id = excluded.from_source_entity_id,
to_entity_type = excluded.to_entity_type,
to_source_entity_id = excluded.to_source_entity_id,
valid_from = excluded.valid_from,
valid_to = excluded.valid_to,
source_updated_at = excluded.source_updated_at,
payload = excluded.payload,
updated_at = now()
""",
rows.add(new SourceMasterRelationStageRow(
UUID.randomUUID(),
tenantKey,
eventSourceId,
relationKey,
relationKey(relation),
relation.relationType(),
relation.fromEntityType(),
relation.fromSourceEntityId(),
@ -114,10 +85,194 @@ public class SourceMasterDataRepository {
relation.validTo(),
relation.sourceUpdatedAt(),
toJson(relation.payload())
);
count++;
));
}
return count;
if (rows.isEmpty()) {
return 0;
}
createRelationStage();
stageRelations(rows);
upsertStagedRelations(tenantKey, eventSourceId);
return rows.size();
}
private void createEntityStage() {
jdbcTemplate.execute(
"""
create temporary table if not exists eventhub_source_master_entity_stage (
row_no integer not null,
id uuid not null,
entity_type text not null,
source_entity_id text not null,
source_external_key text,
display_name text,
active boolean,
valid_from timestamptz,
valid_to timestamptz,
source_updated_at timestamptz,
payload jsonb not null
) on commit drop
"""
);
jdbcTemplate.execute("truncate table eventhub_source_master_entity_stage");
}
private void stageEntities(List<SourceMasterEntityStageRow> rows) {
jdbcTemplate.batchUpdate(
"""
insert into eventhub_source_master_entity_stage(
row_no, id, entity_type, source_entity_id, source_external_key, display_name,
active, valid_from, valid_to, source_updated_at, payload
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb)
""",
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
SourceMasterEntityStageRow row = rows.get(i);
ps.setInt(1, i);
ps.setObject(2, row.id());
ps.setString(3, row.entityType());
ps.setString(4, row.sourceEntityId());
ps.setString(5, row.sourceExternalKey());
ps.setString(6, row.displayName());
ps.setObject(7, row.active());
ps.setObject(8, row.validFrom());
ps.setObject(9, row.validTo());
ps.setObject(10, row.sourceUpdatedAt());
ps.setString(11, row.payloadJson());
}
@Override
public int getBatchSize() {
return rows.size();
}
}
);
}
private void upsertStagedEntities(String tenantKey, int eventSourceId) {
jdbcTemplate.update(
"""
insert into eventhub.source_master_entity(
id, tenant_key, event_source_id, entity_type, source_entity_id,
source_external_key, display_name, active, valid_from, valid_to,
source_updated_at, payload, updated_at
)
select
id, ?, ?, entity_type, source_entity_id,
source_external_key, display_name, active, valid_from, valid_to,
source_updated_at, payload, now()
from (
select distinct on (entity_type, source_entity_id) *
from eventhub_source_master_entity_stage
order by entity_type, source_entity_id, row_no desc
) stage
on conflict (tenant_key, event_source_id, entity_type, source_entity_id)
do update set
source_external_key = excluded.source_external_key,
display_name = excluded.display_name,
active = excluded.active,
valid_from = excluded.valid_from,
valid_to = excluded.valid_to,
source_updated_at = excluded.source_updated_at,
payload = excluded.payload,
updated_at = now()
""",
tenantKey,
eventSourceId
);
}
private void createRelationStage() {
jdbcTemplate.execute(
"""
create temporary table if not exists eventhub_source_master_relation_stage (
row_no integer not null,
id uuid not null,
relation_key text not null,
relation_type text not null,
from_entity_type text not null,
from_source_entity_id text not null,
to_entity_type text not null,
to_source_entity_id text not null,
valid_from timestamptz,
valid_to timestamptz,
source_updated_at timestamptz,
payload jsonb not null
) on commit drop
"""
);
jdbcTemplate.execute("truncate table eventhub_source_master_relation_stage");
}
private void stageRelations(List<SourceMasterRelationStageRow> rows) {
jdbcTemplate.batchUpdate(
"""
insert into eventhub_source_master_relation_stage(
row_no, id, relation_key, relation_type, from_entity_type, from_source_entity_id,
to_entity_type, to_source_entity_id, valid_from, valid_to, source_updated_at, payload
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb)
""",
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
SourceMasterRelationStageRow row = rows.get(i);
ps.setInt(1, i);
ps.setObject(2, row.id());
ps.setString(3, row.relationKey());
ps.setString(4, row.relationType());
ps.setString(5, row.fromEntityType());
ps.setString(6, row.fromSourceEntityId());
ps.setString(7, row.toEntityType());
ps.setString(8, row.toSourceEntityId());
ps.setObject(9, row.validFrom());
ps.setObject(10, row.validTo());
ps.setObject(11, row.sourceUpdatedAt());
ps.setString(12, row.payloadJson());
}
@Override
public int getBatchSize() {
return rows.size();
}
}
);
}
private void upsertStagedRelations(String tenantKey, int eventSourceId) {
jdbcTemplate.update(
"""
insert into eventhub.source_master_relation(
id, tenant_key, event_source_id, relation_key, relation_type,
from_entity_type, from_source_entity_id, to_entity_type, to_source_entity_id,
valid_from, valid_to, source_updated_at, payload, updated_at
)
select
id, ?, ?, relation_key, relation_type,
from_entity_type, from_source_entity_id, to_entity_type, to_source_entity_id,
valid_from, valid_to, source_updated_at, payload, now()
from (
select distinct on (relation_key) *
from eventhub_source_master_relation_stage
order by relation_key, row_no desc
) stage
on conflict (tenant_key, event_source_id, relation_key)
do update set
relation_type = excluded.relation_type,
from_entity_type = excluded.from_entity_type,
from_source_entity_id = excluded.from_source_entity_id,
to_entity_type = excluded.to_entity_type,
to_source_entity_id = excluded.to_source_entity_id,
valid_from = excluded.valid_from,
valid_to = excluded.valid_to,
source_updated_at = excluded.source_updated_at,
payload = excluded.payload,
updated_at = now()
""",
tenantKey,
eventSourceId
);
}
public UUID resolveOrCreateEntityId(
@ -135,8 +290,12 @@ public class SourceMasterDataRepository {
String normalizedSourceEntityId = normalizeRequired(sourceEntityId, "sourceEntityId");
String normalizedSourceExternalKey = normalizeNullable(sourceExternalKey);
String normalizedDisplayName = normalizeNullable(displayName);
UUID existing = findEntityId(normalizedTenantKey, eventSourceId, normalizedEntityType, normalizedSourceEntityId);
if (existing != null) {
return existing;
}
return jdbcTemplate.query(
UUID created = jdbcTemplate.query(
con -> {
PreparedStatement ps = con.prepareStatement(
"""
@ -144,13 +303,7 @@ public class SourceMasterDataRepository {
id, tenant_key, event_source_id, entity_type, source_entity_id,
source_external_key, display_name, active, payload, updated_at
) values (?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now())
on conflict (tenant_key, event_source_id, entity_type, source_entity_id)
do update set
source_external_key = coalesce(excluded.source_external_key, eventhub.source_master_entity.source_external_key),
display_name = coalesce(excluded.display_name, eventhub.source_master_entity.display_name),
active = coalesce(excluded.active, eventhub.source_master_entity.active),
payload = coalesce(excluded.payload, eventhub.source_master_entity.payload),
updated_at = now()
on conflict (tenant_key, event_source_id, entity_type, source_entity_id) do nothing
returning id
"""
);
@ -175,6 +328,36 @@ public class SourceMasterDataRepository {
return (UUID) rs.getObject(1);
}
);
if (created != null) {
return created;
}
UUID resolved = findEntityId(normalizedTenantKey, eventSourceId, normalizedEntityType, normalizedSourceEntityId);
if (resolved != null) {
return resolved;
}
throw new IllegalStateException(
"Could not resolve source master entity id for "
+ normalizedTenantKey + ":" + eventSourceId + ":" + normalizedEntityType + ":" + normalizedSourceEntityId
);
}
private UUID findEntityId(String tenantKey, int eventSourceId, String entityType, String sourceEntityId) {
return jdbcTemplate.query(
"""
select id
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id = ?
and entity_type = ?
and source_entity_id = ?
""",
rs -> rs.next() ? (UUID) rs.getObject("id") : null,
tenantKey,
eventSourceId,
entityType,
sourceEntityId
);
}
private String relationKey(SourceMasterRelationUpsert relation) {
@ -324,4 +507,33 @@ public class SourceMasterDataRepository {
String trimmed = value.trim();
return trimmed.isEmpty() ? null : trimmed;
}
private record SourceMasterEntityStageRow(
UUID id,
String entityType,
String sourceEntityId,
String sourceExternalKey,
String displayName,
Boolean active,
OffsetDateTime validFrom,
OffsetDateTime validTo,
OffsetDateTime sourceUpdatedAt,
String payloadJson
) {
}
private record SourceMasterRelationStageRow(
UUID id,
String relationKey,
String relationType,
String fromEntityType,
String fromSourceEntityId,
String toEntityType,
String toSourceEntityId,
OffsetDateTime validFrom,
OffsetDateTime validTo,
OffsetDateTime sourceUpdatedAt,
String payloadJson
) {
}
}

View File

@ -4,15 +4,14 @@ import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
@Repository
public class VehicleIdentityRepository {
@ -90,103 +89,223 @@ public class VehicleIdentityRepository {
return new ResolvedVehicleReference(vehicleId, registrationId);
}
@Transactional
public int reconcileFromMasterData(String tenantKey, int eventSourceId) {
String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey");
int updates = 0;
List<VehicleMasterRow> vehicles = jdbcTemplate.query(
compatibleSourcesCte() + """
select event_source_id, source_entity_id, source_external_key, source_updated_at
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE'
""",
(rs, rowNum) -> new VehicleMasterRow(
rs.getInt("event_source_id"),
normalizeNullable(rs.getString("source_entity_id")),
normalizeNullable(rs.getString("source_external_key")),
offsetDateTime(rs.getObject("source_updated_at"))
),
eventSourceId,
normalizedTenantKey
);
for (VehicleMasterRow vehicle : vehicles) {
if (vehicle.sourceVehicleEntityId() == null && vehicle.vin() == null) {
continue;
}
UUID vehicleId = resolveVehicleId(normalizedTenantKey, vehicle.eventSourceId(), vehicle.sourceVehicleEntityId(), vehicle.vin());
if (vehicleId == null) {
createVehicle(normalizedTenantKey, vehicle.eventSourceId(), vehicle.sourceVehicleEntityId(), vehicle.vin());
} else {
touchVehicle(vehicleId, vehicle.sourceVehicleEntityId(), vehicle.vin());
}
updates++;
}
List<RegistrationMasterRow> registrations = jdbcTemplate.query(
compatibleSourcesCte() + """
select event_source_id,
source_entity_id,
source_external_key,
source_updated_at,
payload ->> 'registration_nation' as registration_nation,
payload ->> 'registration_number' as registration_number
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE_REGISTRATION'
""",
(rs, rowNum) -> registrationMasterRow(
rs.getInt("event_source_id"),
normalizeNullable(rs.getString("source_entity_id")),
normalizeNullable(rs.getString("source_external_key")),
normalizeNullable(rs.getString("registration_nation")),
normalizeNullable(rs.getString("registration_number")),
offsetDateTime(rs.getObject("source_updated_at"))
),
eventSourceId,
normalizedTenantKey
);
for (RegistrationMasterRow registration : registrations) {
if (registration.nation() == null || registration.registrationNumber() == null) {
continue;
}
UUID registrationId = resolveRegistrationId(
normalizedTenantKey,
registration.eventSourceId(),
registration.sourceRegistrationEntityId(),
registration.nation(),
registration.registrationNumber(),
null
);
if (registrationId == null) {
createRegistration(
normalizedTenantKey,
registration.eventSourceId(),
registration.sourceRegistrationEntityId(),
registration.nation(),
registration.registrationNumber(),
registration.sourceUpdatedAt(),
Map.of("source", "master-data")
);
} else {
updateRegistrationFromMasterData(
registrationId,
registration.sourceRegistrationEntityId(),
registration.nation(),
registration.registrationNumber(),
registration.sourceUpdatedAt()
);
}
updates++;
}
int updates = reconcileVehiclesFromMasterData(normalizedTenantKey, eventSourceId);
updates += reconcileRegistrationsFromMasterData(normalizedTenantKey, eventSourceId);
updates += projectVehicleRegistrationAssignments(normalizedTenantKey, eventSourceId);
return updates;
}
private int reconcileVehiclesFromMasterData(String tenantKey, int eventSourceId) {
Long count = jdbcTemplate.queryForObject(
compatibleSourcesCte() + """
, master_vehicles as (
select distinct on (
event_source_id,
nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), '')
)
event_source_id,
nullif(trim(source_entity_id), '') as source_vehicle_entity_id,
nullif(trim(source_external_key), '') as vin
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE'
and (
nullif(trim(source_entity_id), '') is not null
or nullif(trim(source_external_key), '') is not null
)
order by event_source_id,
nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), ''),
updated_at desc
),
updated_by_source as (
update eventhub.vehicle vehicle
set source_vehicle_entity_id = coalesce(vehicle.source_vehicle_entity_id, master.source_vehicle_entity_id),
vin = coalesce(vehicle.vin, master.vin),
updated_at = now()
from master_vehicles master
where vehicle.tenant_key = ?
and vehicle.event_source_id in (select id from compatible_sources)
and master.source_vehicle_entity_id is not null
and vehicle.source_vehicle_entity_id = master.source_vehicle_entity_id
returning vehicle.id
),
updated_by_vin as (
update eventhub.vehicle vehicle
set source_vehicle_entity_id = coalesce(vehicle.source_vehicle_entity_id, master.source_vehicle_entity_id),
vin = coalesce(vehicle.vin, master.vin),
updated_at = now()
from master_vehicles master
where vehicle.tenant_key = ?
and vehicle.event_source_id in (select id from compatible_sources)
and master.vin is not null
and vehicle.vin = master.vin
returning vehicle.id
),
inserted as (
insert into eventhub.vehicle(id, tenant_key, event_source_id, source_vehicle_entity_id, vin, updated_at)
select gen_random_uuid(), ?, master.event_source_id, master.source_vehicle_entity_id, master.vin, now()
from master_vehicles master
where not exists (
select 1
from eventhub.vehicle existing
where existing.tenant_key = ?
and existing.event_source_id in (select id from compatible_sources)
and (
(master.source_vehicle_entity_id is not null and existing.source_vehicle_entity_id = master.source_vehicle_entity_id)
or (master.vin is not null and existing.vin = master.vin)
)
)
returning id
)
select (select count(*) from updated_by_source)
+ (select count(*) from updated_by_vin)
+ (select count(*) from inserted)
""",
Long.class,
eventSourceId,
tenantKey,
tenantKey,
tenantKey,
tenantKey,
tenantKey
);
return count == null ? 0 : Math.toIntExact(count);
}
private int reconcileRegistrationsFromMasterData(String tenantKey, int eventSourceId) {
Long count = jdbcTemplate.queryForObject(
compatibleSourcesCte() + """
, master_registrations as (
select distinct on (
event_source_id,
nullif(trim(source_entity_id), ''),
coalesce(
nullif(trim(payload ->> 'registration_nation'), ''),
nullif(split_part(source_external_key, ':', 1), '')
),
coalesce(
nullif(trim(payload ->> 'registration_number'), ''),
nullif(substring(source_external_key from position(':' in source_external_key) + 1), '')
)
)
event_source_id,
nullif(trim(source_entity_id), '') as source_registration_entity_id,
coalesce(
nullif(trim(payload ->> 'registration_nation'), ''),
nullif(split_part(source_external_key, ':', 1), '')
) as nation,
coalesce(
nullif(trim(payload ->> 'registration_number'), ''),
nullif(substring(source_external_key from position(':' in source_external_key) + 1), '')
) as registration_number,
source_updated_at
from eventhub.source_master_entity
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE_REGISTRATION'
and (
nullif(trim(payload ->> 'registration_nation'), '') is not null
or source_external_key like '%:%'
)
and (
nullif(trim(payload ->> 'registration_number'), '') is not null
or source_external_key like '%:%'
)
order by event_source_id,
nullif(trim(source_entity_id), ''),
coalesce(
nullif(trim(payload ->> 'registration_nation'), ''),
nullif(split_part(source_external_key, ':', 1), '')
),
coalesce(
nullif(trim(payload ->> 'registration_number'), ''),
nullif(substring(source_external_key from position(':' in source_external_key) + 1), '')
),
updated_at desc
),
updated_by_source as (
update eventhub.vehicle_registration registration
set source_registration_entity_id = coalesce(registration.source_registration_entity_id, master.source_registration_entity_id),
nation = coalesce(master.nation, registration.nation),
registration_number = coalesce(master.registration_number, registration.registration_number),
source_updated_at = master.source_updated_at,
updated_at = now()
from master_registrations master
where registration.tenant_key = ?
and registration.event_source_id in (select id from compatible_sources)
and master.source_registration_entity_id is not null
and registration.source_registration_entity_id = master.source_registration_entity_id
returning registration.id
),
updated_by_plate as (
update eventhub.vehicle_registration registration
set source_registration_entity_id = coalesce(registration.source_registration_entity_id, master.source_registration_entity_id),
nation = coalesce(master.nation, registration.nation),
registration_number = coalesce(master.registration_number, registration.registration_number),
source_updated_at = master.source_updated_at,
updated_at = now()
from master_registrations master
where registration.tenant_key = ?
and registration.event_source_id in (select id from compatible_sources)
and registration.nation = master.nation
and registration.registration_number = master.registration_number
returning registration.id
),
inserted as (
insert into eventhub.vehicle_registration(
id, tenant_key, event_source_id, source_registration_entity_id, nation, registration_number,
source_updated_at, payload, updated_at
)
select gen_random_uuid(),
?,
master.event_source_id,
master.source_registration_entity_id,
master.nation,
master.registration_number,
master.source_updated_at,
jsonb_build_object('source', 'master-data'),
now()
from master_registrations master
where master.nation is not null
and master.registration_number is not null
and not exists (
select 1
from eventhub.vehicle_registration existing
where existing.tenant_key = ?
and existing.event_source_id in (select id from compatible_sources)
and (
(master.source_registration_entity_id is not null
and existing.source_registration_entity_id = master.source_registration_entity_id)
or (
existing.nation = master.nation
and existing.registration_number = master.registration_number
)
)
)
returning id
)
select (select count(*) from updated_by_source)
+ (select count(*) from updated_by_plate)
+ (select count(*) from inserted)
""",
Long.class,
eventSourceId,
tenantKey,
tenantKey,
tenantKey,
tenantKey,
tenantKey
);
return count == null ? 0 : Math.toIntExact(count);
}
private UUID resolveVehicleId(
String tenantKey,
int eventSourceId,
@ -386,6 +505,9 @@ public class VehicleIdentityRepository {
}
private void touchVehicle(UUID vehicleId, String sourceVehicleEntityId, String vin) {
if (sourceVehicleEntityId == null && vin == null) {
return;
}
jdbcTemplate.update(
"""
update eventhub.vehicle
@ -393,10 +515,16 @@ public class VehicleIdentityRepository {
vin = coalesce(vin, ?),
updated_at = now()
where id = ?
and (
(source_vehicle_entity_id is null and ? is not null)
or (vin is null and ? is not null)
)
""",
sourceVehicleEntityId,
vin,
vehicleId
vehicleId,
sourceVehicleEntityId,
vin
);
}
@ -448,6 +576,9 @@ public class VehicleIdentityRepository {
}
private void touchRegistration(UUID registrationId, String sourceRegistrationEntityId, String nation, String registrationNumber) {
if (sourceRegistrationEntityId == null && nation == null && registrationNumber == null) {
return;
}
jdbcTemplate.update(
"""
update eventhub.vehicle_registration
@ -456,11 +587,19 @@ public class VehicleIdentityRepository {
registration_number = coalesce(?, registration_number),
updated_at = now()
where id = ?
and (
(source_registration_entity_id is null and ? is not null)
or (nation is null and ? is not null)
or (registration_number is null and ? is not null)
)
""",
sourceRegistrationEntityId,
nation,
registrationNumber,
registrationId
registrationId,
sourceRegistrationEntityId,
nation,
registrationNumber
);
}
@ -507,50 +646,6 @@ public class VehicleIdentityRepository {
""";
}
private RegistrationMasterRow registrationMasterRow(
int eventSourceId,
String sourceRegistrationEntityId,
String sourceExternalKey,
String nation,
String registrationNumber,
OffsetDateTime sourceUpdatedAt
) {
String resolvedNation = nation;
String resolvedRegistrationNumber = registrationNumber;
if ((resolvedNation == null || resolvedRegistrationNumber == null) && sourceExternalKey != null) {
int separator = sourceExternalKey.indexOf(':');
if (separator > -1) {
resolvedNation = resolvedNation == null ? normalizeNullable(sourceExternalKey.substring(0, separator)) : resolvedNation;
resolvedRegistrationNumber = resolvedRegistrationNumber == null
? normalizeNullable(sourceExternalKey.substring(separator + 1))
: resolvedRegistrationNumber;
}
}
return new RegistrationMasterRow(
eventSourceId,
sourceRegistrationEntityId,
resolvedNation,
resolvedRegistrationNumber,
sourceUpdatedAt
);
}
private OffsetDateTime offsetDateTime(Object value) {
if (value == null) {
return null;
}
if (value instanceof OffsetDateTime offsetDateTime) {
return offsetDateTime;
}
if (value instanceof Timestamp timestamp) {
return timestamp.toInstant().atOffset(ZoneOffset.UTC);
}
if (value instanceof java.time.LocalDateTime localDateTime) {
return localDateTime.atOffset(ZoneOffset.UTC);
}
return OffsetDateTime.parse(value.toString());
}
private String toJson(Map<String, Object> value) {
try {
return objectMapper.writeValueAsString(value == null ? Map.of() : value);
@ -581,16 +676,4 @@ public class VehicleIdentityRepository {
}
}
private record VehicleMasterRow(int eventSourceId, String sourceVehicleEntityId, String vin, OffsetDateTime sourceUpdatedAt) {
}
private record RegistrationMasterRow(
int eventSourceId,
String sourceRegistrationEntityId,
String nation,
String registrationNumber,
OffsetDateTime sourceUpdatedAt
) {
}
}

View File

@ -8,7 +8,9 @@ import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.persistence.DataPackageRepository;
import at.procon.eventhub.persistence.EventRepository;
import at.procon.eventhub.persistence.EventSourceRepository;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -72,12 +74,22 @@ public class EventHubIngestionService {
try {
int insertedCount = eventRepository.batchInsert(packageId, packageInfo.tenantKey(), eventSourceId, sortedEvents);
dataPackageRepository.markImported(packageId, insertedCount);
log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={}",
packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount);
log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={} byType={}",
packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount, eventTypeCounts(sortedEvents));
return new EventHubPackageResult(packageId, batch.packageKey(), sortedEvents.size(), insertedCount);
} catch (RuntimeException ex) {
dataPackageRepository.markFailed(packageId, ex.getMessage());
throw ex;
}
}
private Map<String, Integer> eventTypeCounts(List<EventHubEventDto> events) {
Map<String, Integer> counts = new LinkedHashMap<>();
for (EventHubEventDto event : events) {
String domain = event.eventDomain() == null ? "UNKNOWN_DOMAIN" : event.eventDomain().name();
String type = event.eventType() == null ? "UNKNOWN_EVENT" : event.eventType().name();
counts.merge(domain + "/" + type, 1, Integer::sum);
}
return counts;
}
}

View File

@ -1,6 +1,5 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportScopeDto;
@ -14,7 +13,6 @@ import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@ -72,19 +70,19 @@ public class JdbcTachographExtractionBatchExecutor
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
ImportCursorStateDto cursor,
List<EventHubEventDto> events
ExtractedEventStats stats
) {
return new TachographExtractionBatchResultDto(
packageId,
planItem.extractionCode(),
planItem.sourceKind(),
events.size(),
events.size(),
events.size(),
stats.eventsMapped(),
stats.eventsMapped(),
stats.eventsMapped(),
0,
true,
lastSourcePackageImportedAt(events, cursor),
lastSourcePackageId(events, cursor),
lastSourcePackageImportedAt(stats, cursor),
lastSourcePackageId(stats, cursor),
null,
chunk.occurredTo()
);

View File

@ -46,7 +46,6 @@ public class TachographImportExecutionService
return toTachographResult(createImportRun(request, false));
}
@Transactional
public TachographImportRunResultDto startAndExecuteImport(TachographImportRequest request) {
return toTachographResult(createImportRun(request, true));
}

View File

@ -17,6 +17,7 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -46,6 +47,7 @@ public class TachographMasterDataRefreshService {
private static final String RELATIONS_SQL_RESOURCE = "classpath:sql/tachograph/master-data/relations.sql";
private static final String MASTER_DATA_SOURCE_KIND = "MASTER_DATA";
private static final String MASTER_DATA_SOURCE_KEY = "TACHOGRAPH_MASTER_DATA";
private static final int MASTER_DATA_UPSERT_BATCH_SIZE = 5000;
private final ObjectProvider<NamedParameterJdbcTemplate> tachographJdbcTemplateProvider;
private final SourceMasterDataRepository sourceMasterDataRepository;
@ -86,22 +88,18 @@ public class TachographMasterDataRefreshService {
EventSourceDto masterDataSource = masterDataSourceFor(request.eventSource());
int eventSourceId = eventSourceRepository.resolveSourceId(tenantKey, masterDataSource);
log.info("Starting tachograph source master-data refresh tenant={} source={} batchSize={}",
tenantKey, masterDataSource.stableKey(), MASTER_DATA_UPSERT_BATCH_SIZE);
int entities = 0;
for (String sqlResource : ENTITY_SQL_RESOURCES) {
List<SourceMasterEntityUpsert> batch = tachographJdbcTemplate.query(
loadSql(sqlResource),
Map.of(),
(rs, rowNum) -> entity(rs)
);
entities += sourceMasterDataRepository.upsertEntities(tenantKey, eventSourceId, batch);
entities += streamEntities(tachographJdbcTemplate, tenantKey, eventSourceId, sqlResource, loadSql(sqlResource));
}
List<SourceMasterRelationUpsert> relations = tachographJdbcTemplate.query(
loadSql(RELATIONS_SQL_RESOURCE),
Map.of(),
(rs, rowNum) -> relation(rs)
);
int relationCount = sourceMasterDataRepository.upsertRelations(tenantKey, eventSourceId, relations);
int relationCount = streamRelations(tachographJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE));
log.info("Reconciling tachograph vehicle identities from source master data tenant={} source={}",
tenantKey, masterDataSource.stableKey());
int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId);
MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount);
@ -110,6 +108,109 @@ public class TachographMasterDataRefreshService {
return result;
}
private int streamEntities(
NamedParameterJdbcTemplate tachographJdbcTemplate,
String tenantKey,
int eventSourceId,
String sqlResource,
String sql
) {
String section = masterDataSection(sqlResource);
List<SourceMasterEntityUpsert> buffer = new ArrayList<>(MASTER_DATA_UPSERT_BATCH_SIZE);
Map<String, Integer> typeCounts = new LinkedHashMap<>();
int[] count = {0};
int[] chunk = {0};
log.info("Reading tachograph master-data entities tenant={} section={}", tenantKey, section);
tachographJdbcTemplate.query(sql, Map.of(), rs -> {
SourceMasterEntityUpsert entity = entity(rs);
buffer.add(entity);
increment(typeCounts, entity.entityType());
if (buffer.size() >= MASTER_DATA_UPSERT_BATCH_SIZE) {
count[0] += flushEntities(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts);
buffer.clear();
}
});
if (!buffer.isEmpty()) {
count[0] += flushEntities(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts);
}
log.info("Finished tachograph master-data entities tenant={} section={} processed={} byType={}",
tenantKey, section, count[0], typeCounts);
return count[0];
}
private int streamRelations(
NamedParameterJdbcTemplate tachographJdbcTemplate,
String tenantKey,
int eventSourceId,
String sqlResource,
String sql
) {
String section = masterDataSection(sqlResource);
List<SourceMasterRelationUpsert> buffer = new ArrayList<>(MASTER_DATA_UPSERT_BATCH_SIZE);
Map<String, Integer> typeCounts = new LinkedHashMap<>();
int[] count = {0};
int[] chunk = {0};
log.info("Reading tachograph master-data relations tenant={} section={}", tenantKey, section);
tachographJdbcTemplate.query(sql, Map.of(), rs -> {
SourceMasterRelationUpsert relation = relation(rs);
buffer.add(relation);
increment(typeCounts, relation.relationType());
if (buffer.size() >= MASTER_DATA_UPSERT_BATCH_SIZE) {
count[0] += flushRelations(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts);
buffer.clear();
}
});
if (!buffer.isEmpty()) {
count[0] += flushRelations(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts);
}
log.info("Finished tachograph master-data relations tenant={} section={} processed={} byType={}",
tenantKey, section, count[0], typeCounts);
return count[0];
}
private int flushEntities(
String tenantKey,
int eventSourceId,
String section,
int chunk,
List<SourceMasterEntityUpsert> buffer,
int alreadyProcessed,
Map<String, Integer> typeCounts
) {
int upserted = sourceMasterDataRepository.upsertEntities(tenantKey, eventSourceId, buffer);
log.info("Tachograph master-data entity progress tenant={} section={} chunk={} chunkSize={} processed={} byType={}",
tenantKey, section, chunk, buffer.size(), alreadyProcessed + upserted, typeCounts);
return upserted;
}
private int flushRelations(
String tenantKey,
int eventSourceId,
String section,
int chunk,
List<SourceMasterRelationUpsert> buffer,
int alreadyProcessed,
Map<String, Integer> typeCounts
) {
int upserted = sourceMasterDataRepository.upsertRelations(tenantKey, eventSourceId, buffer);
log.info("Tachograph master-data relation progress tenant={} section={} chunk={} chunkSize={} processed={} byType={}",
tenantKey, section, chunk, buffer.size(), alreadyProcessed + upserted, typeCounts);
return upserted;
}
private void increment(Map<String, Integer> counts, String type) {
String key = type == null || type.isBlank() ? "UNKNOWN" : type;
counts.merge(key, 1, Integer::sum);
}
private String masterDataSection(String sqlResource) {
int slash = sqlResource.lastIndexOf('/');
String fileName = slash < 0 ? sqlResource : sqlResource.substring(slash + 1);
return fileName.endsWith(".sql") ? fileName.substring(0, fileName.length() - 4) : fileName;
}
private EventSourceDto masterDataSourceFor(EventSourceDto source) {
return new EventSourceDto(
source.providerKey(),

View File

@ -27,9 +27,10 @@ management:
eventhub:
batch:
completion-size: 1000
completion-size: 5000
completion-timeout: 5s
queue-size: 10000
concurrent-consumers: 4
block-when-full: true
queue-offer-timeout: 5m
tachograph:
@ -44,25 +45,25 @@ eventhub:
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
# Enables the scheduler that regularly triggers configured tachograph import plans.
scheduler-enabled: false
scheduler-enabled: true
scheduler-poll-interval-ms: 60000
# PLAN_ONLY creates import_run + planned extraction packages.
# EXECUTE also invokes the configured TachographExtractionBatchExecutor.
scheduler-trigger-mode: PLAN_ONLY
scheduler-trigger-mode: EXECUTE
# Example plan. Keep disabled until the tachograph datasource/extractor is wired.
import-plans:
- plan-key: kralowetz-tachograph-org-147
enabled: false
enabled: true
cron: "0 15 * * * *" # hourly at minute 15
tenant-key: kralowetz
event-source:
provider-key: TACHOGRAPH
source-kind: MIXED
source-key: TACHOGRAPH_DB
source-instance-key: tachograph-prod-db
tenant-provider-setting-key: kralowetz-tachograph-prod
source-instance-key: ByteBar-DriverSettlement
tenant-provider-setting-key: ByteBar-DriverSettlement
source-group:
type: ORGANISATION
source-entity-id: "147"
@ -91,7 +92,7 @@ eventhub:
scheduled-mode: INCREMENTAL_UPDATE
initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP
scheduled-strategy: SOURCE_PACKAGE_WATERMARK
refresh-master-data-first: true
initial-occurred-from: "2025-01-01T00:00:00+01:00"
initial-occurred-to: null
run-initial-on-startup: false
refresh-master-data-first: false
initial-occurred-from: "2026-01-21T00:00:00+01:00"
initial-occurred-to: "2026-01-25T00:00:00+01:00"
run-initial-on-startup: true

View File

@ -0,0 +1,346 @@
create extension if not exists pgcrypto;
create extension if not exists postgis;
create extension if not exists timescaledb;
drop schema if exists eventhub cascade;
create schema if not exists eventhub;
create table if not exists eventhub.event_source (
id integer generated always as identity primary key,
tenant_key text not null,
provider_key text not null,
source_kind text not null,
source_key text not null,
source_instance_key text not null default 'default',
tenant_provider_setting_key text,
external_fleet_key text,
created_at timestamptz not null default now(),
constraint ux_event_source unique (tenant_key, provider_key, source_kind, source_key, source_instance_key)
);
create table if not exists eventhub.import_run (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
mode text not null,
status text not null,
refresh_master_data_first boolean not null default true,
source_group_type text,
source_group_entity_id text,
source_group_code text,
source_group_name text,
import_scope_type text not null,
root_source_org_entity_id text,
root_source_org_code text,
root_source_org_name text,
include_children boolean not null default false,
occurred_from timestamptz,
occurred_to timestamptz,
requested_event_families text[] not null default '{}',
acquisition_strategy text,
metadata jsonb not null default '{}'::jsonb,
planned_package_count integer not null default 0,
started_at timestamptz not null default now(),
finished_at timestamptz,
error_message text,
constraint chk_import_run_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to)
);
create table if not exists eventhub.import_cursor (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
scope_hash text not null,
event_family text not null,
source_kind text not null,
cursor_type text not null,
last_source_package_imported_at timestamptz,
last_source_package_id text,
last_source_row_updated_at timestamptz,
last_occurred_to timestamptz,
updated_at timestamptz not null default now(),
constraint ux_import_cursor unique (tenant_key, event_source_id, scope_hash, event_family, source_kind, cursor_type)
);
create table if not exists eventhub.data_package (
id uuid primary key,
event_source_id integer not null references eventhub.event_source(id),
import_run_id uuid references eventhub.import_run(id),
tenant_key text not null,
package_key text not null,
package_type text not null,
status text not null,
source_group_type text,
source_group_entity_id text,
source_group_code text,
source_group_name text,
import_scope_type text,
root_source_org_entity_id text,
root_source_org_code text,
root_source_org_name text,
include_children boolean not null default false,
occurred_from timestamptz,
occurred_to timestamptz,
event_family text,
business_date date,
external_package_id text,
extraction_code text,
extraction_source_kind text,
entity_axis text,
batch_no integer,
chunk_from timestamptz,
chunk_to timestamptz,
source_package_kind text,
source_package_id text,
source_package_entity_id text,
source_package_period_from timestamptz,
source_package_period_to timestamptz,
source_package_imported_at timestamptz,
received_at timestamptz not null default now(),
completed_at timestamptz,
event_count integer not null default 0,
metadata jsonb not null default '{}'::jsonb,
error_message text,
constraint ux_data_package_package_key unique (tenant_key, event_source_id, package_key),
constraint chk_data_package_occ_time_order check (occurred_from is null or occurred_to is null or occurred_from < occurred_to),
constraint chk_data_package_chunk_time_order check (chunk_from is null or chunk_to is null or chunk_from < chunk_to)
);
create table if not exists eventhub.source_master_entity (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
entity_type text not null,
source_entity_id text not null,
source_external_key text,
display_name text,
active boolean,
valid_from timestamptz,
valid_to timestamptz,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_master_entity unique (tenant_key, event_source_id, entity_type, source_entity_id),
constraint chk_source_master_entity_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to)
);
create table if not exists eventhub.source_master_relation (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
relation_key text not null,
relation_type text not null,
from_entity_type text not null,
from_source_entity_id text not null,
to_entity_type text not null,
to_source_entity_id text not null,
valid_from timestamptz,
valid_to timestamptz,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint ux_source_master_relation unique (tenant_key, event_source_id, relation_key),
constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to)
);
create table if not exists eventhub.vehicle (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_vehicle_entity_id text,
vin text,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create table if not exists eventhub.vehicle_registration (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
source_registration_entity_id text,
nation text not null,
registration_number text not null,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create table if not exists eventhub.vehicle_registration_assignment (
id uuid primary key,
tenant_key text not null,
event_source_id integer not null references eventhub.event_source(id),
vehicle_registration_id uuid not null references eventhub.vehicle_registration(id) on delete cascade,
vehicle_id uuid not null references eventhub.vehicle(id) on delete cascade,
valid_from timestamptz,
valid_to timestamptz,
source_updated_at timestamptz,
payload jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
constraint chk_vehicle_registration_assignment_valid_time_order check (valid_from is null or valid_to is null or valid_from <= valid_to)
);
create table if not exists eventhub.event (
id uuid not null,
event_source_id integer not null references eventhub.event_source(id),
data_package_id uuid not null references eventhub.data_package(id),
external_source_event_id text not null,
driver_entity_id uuid references eventhub.source_master_entity(id),
vehicle_id uuid references eventhub.vehicle(id),
vehicle_registration_id uuid references eventhub.vehicle_registration(id),
source_package_id text,
source_package_entity_id uuid references eventhub.source_master_entity(id),
occurred_at timestamptz not null,
received_partner_at timestamptz,
received_hub_at timestamptz not null default now(),
event_domain text not null,
event_type text not null,
lifecycle text not null,
odometer_m bigint,
position geography(Point, 4326),
payload jsonb not null default '{}'::jsonb,
manual_entry boolean not null default false,
source_record_key_hash text not null,
event_signature_hash text,
created_at timestamptz not null default now(),
constraint pk_event primary key (occurred_at, id),
constraint chk_event_driver_or_vehicle_ref check (
driver_entity_id is not null
or vehicle_id is not null
or vehicle_registration_id is not null
)
);
create table if not exists eventhub.event_source_record (
source_record_key_hash text primary key,
event_occurred_at timestamptz not null,
event_id uuid not null,
created_at timestamptz not null default now()
);
create table if not exists eventhub.event_detail (
event_occurred_at timestamptz not null,
event_id uuid not null,
detail_type text not null,
attributes jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
constraint pk_event_detail primary key (event_occurred_at, event_id, detail_type)
);
select create_hypertable(
'eventhub.event',
'occurred_at',
chunk_time_interval => interval '7 days',
if_not_exists => true
);
alter table eventhub.event_source_record
add constraint fk_event_source_record_event foreign key (event_occurred_at, event_id)
references eventhub.event(occurred_at, id)
on delete cascade
deferrable initially deferred;
alter table eventhub.event_detail
add constraint fk_event_detail_event foreign key (event_occurred_at, event_id)
references eventhub.event(occurred_at, id)
on delete cascade;
create index if not exists idx_data_package_source_time
on eventhub.data_package(tenant_key, event_source_id, received_at desc);
create index if not exists idx_data_package_scope
on eventhub.data_package(tenant_key, import_scope_type, root_source_org_entity_id, occurred_from, occurred_to);
create index if not exists idx_data_package_extraction
on eventhub.data_package(tenant_key, event_source_id, import_run_id, event_family, extraction_source_kind, extraction_code, batch_no);
create index if not exists idx_import_run_source_status
on eventhub.import_run(tenant_key, event_source_id, status, started_at desc);
create index if not exists idx_source_master_entity_type_key
on eventhub.source_master_entity(tenant_key, event_source_id, entity_type, source_external_key)
where source_external_key is not null;
create index if not exists idx_source_master_entity_payload_gin
on eventhub.source_master_entity using gin(payload);
create index if not exists idx_source_master_relation_from
on eventhub.source_master_relation(tenant_key, event_source_id, from_entity_type, from_source_entity_id, relation_type);
create index if not exists idx_source_master_relation_to
on eventhub.source_master_relation(tenant_key, event_source_id, to_entity_type, to_source_entity_id, relation_type);
create index if not exists idx_source_master_relation_payload_gin
on eventhub.source_master_relation using gin(payload);
create index if not exists idx_vehicle_lookup_ctx
on eventhub.vehicle(tenant_key, event_source_id, updated_at desc);
create index if not exists idx_vehicle_source_entity
on eventhub.vehicle(tenant_key, event_source_id, source_vehicle_entity_id)
where source_vehicle_entity_id is not null;
create index if not exists idx_vehicle_vin
on eventhub.vehicle(tenant_key, event_source_id, vin)
where vin is not null;
create index if not exists idx_vehicle_registration_source_entity
on eventhub.vehicle_registration(tenant_key, event_source_id, source_registration_entity_id)
where source_registration_entity_id is not null;
create index if not exists idx_vehicle_registration_plate
on eventhub.vehicle_registration(tenant_key, event_source_id, nation, registration_number);
create index if not exists idx_vehicle_registration_assignment_registration_time
on eventhub.vehicle_registration_assignment(vehicle_registration_id, valid_from desc, valid_to);
create index if not exists idx_vehicle_registration_assignment_vehicle_time
on eventhub.vehicle_registration_assignment(vehicle_id, valid_from desc, valid_to);
create index if not exists idx_event_source_record_event
on eventhub.event_source_record(event_occurred_at, event_id);
create index if not exists idx_event_signature
on eventhub.event(event_signature_hash)
where event_signature_hash is not null;
create index if not exists idx_event_source_time
on eventhub.event(event_source_id, occurred_at desc);
create index if not exists idx_event_package_time
on eventhub.event(data_package_id, occurred_at desc);
create index if not exists idx_event_source_package_id
on eventhub.event(source_package_id)
where source_package_id is not null;
create index if not exists idx_event_domain_type_time
on eventhub.event(event_domain, event_type, occurred_at desc);
create index if not exists idx_event_driver_time
on eventhub.event(driver_entity_id, occurred_at desc)
where driver_entity_id is not null;
create index if not exists idx_event_vehicle_time
on eventhub.event(vehicle_id, occurred_at desc)
where vehicle_id is not null;
create index if not exists idx_event_vehicle_registration_time
on eventhub.event(vehicle_registration_id, occurred_at desc)
where vehicle_registration_id is not null;
create index if not exists idx_event_position_gist
on eventhub.event using gist(position)
where position is not null;
create index if not exists idx_event_payload_gin
on eventhub.event using gin(payload);
create index if not exists idx_event_detail_type
on eventhub.event_detail(detail_type);
create index if not exists idx_event_detail_attributes_gin
on eventhub.event_detail using gin(attributes);