diff --git a/docs/timescale/V2__enable_timescale_hypertable_optional.sql b/docs/timescale/V2__enable_timescale_hypertable_optional.sql index 7fc49ec..d302eca 100644 --- a/docs/timescale/V2__enable_timescale_hypertable_optional.sql +++ b/docs/timescale/V2__enable_timescale_hypertable_optional.sql @@ -3,7 +3,7 @@ create extension if not exists timescaledb; select create_hypertable( - 'eventhub.acquired_event', + 'eventhub.event', 'occurred_at', if_not_exists => true, migrate_data => true diff --git a/pom.xml b/pom.xml index 332ce7d..73f332f 100644 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,27 @@ + + org.apache.maven.plugins + maven-enforcer-plugin + 3.6.1 + + + enforce-java + + enforce + + + + + [21,) + This project requires Java 21+ to build and run. + + + + + + org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/at/procon/eventhub/config/JacksonConfig.java b/src/main/java/at/procon/eventhub/config/JacksonConfig.java new file mode 100644 index 0000000..ce6a3ad --- /dev/null +++ b/src/main/java/at/procon/eventhub/config/JacksonConfig.java @@ -0,0 +1,17 @@ +package at.procon.eventhub.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class JacksonConfig { + + @Bean + @ConditionalOnMissingBean(ObjectMapper.class) + public ObjectMapper objectMapper() { + return new ObjectMapper().findAndRegisterModules(); + } +} + diff --git a/src/main/java/at/procon/eventhub/importing/masterdata/MasterDataRefreshResult.java b/src/main/java/at/procon/eventhub/importing/masterdata/MasterDataRefreshResult.java new file mode 100644 index 0000000..87361bd --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/masterdata/MasterDataRefreshResult.java @@ -0,0 +1,10 @@ +package at.procon.eventhub.importing.masterdata; + +public record MasterDataRefreshResult( + int entitiesUpserted, + int relationsUpserted +) { + public static MasterDataRefreshResult empty() { + return new MasterDataRefreshResult(0, 0); + } +} diff --git a/src/main/java/at/procon/eventhub/importing/masterdata/SourceMasterEntityUpsert.java b/src/main/java/at/procon/eventhub/importing/masterdata/SourceMasterEntityUpsert.java new file mode 100644 index 0000000..11a343a --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/masterdata/SourceMasterEntityUpsert.java @@ -0,0 +1,17 @@ +package at.procon.eventhub.importing.masterdata; + +import java.time.OffsetDateTime; +import java.util.Map; + +public record SourceMasterEntityUpsert( + String entityType, + String sourceEntityId, + String sourceExternalKey, + String displayName, + Boolean active, + OffsetDateTime validFrom, + OffsetDateTime validTo, + OffsetDateTime sourceUpdatedAt, + Map payload +) { +} diff --git a/src/main/java/at/procon/eventhub/importing/masterdata/SourceMasterRelationUpsert.java b/src/main/java/at/procon/eventhub/importing/masterdata/SourceMasterRelationUpsert.java new file mode 100644 index 0000000..144eb56 --- /dev/null +++ b/src/main/java/at/procon/eventhub/importing/masterdata/SourceMasterRelationUpsert.java @@ -0,0 +1,17 @@ +package at.procon.eventhub.importing.masterdata; + +import java.time.OffsetDateTime; +import java.util.Map; + +public record SourceMasterRelationUpsert( + String relationType, + String fromEntityType, + String fromSourceEntityId, + String toEntityType, + String toSourceEntityId, + OffsetDateTime validFrom, + OffsetDateTime validTo, + OffsetDateTime sourceUpdatedAt, + Map payload +) { +} diff --git a/src/main/java/at/procon/eventhub/persistence/EventRepository.java b/src/main/java/at/procon/eventhub/persistence/EventRepository.java index 30f7b90..f144d50 100644 --- a/src/main/java/at/procon/eventhub/persistence/EventRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/EventRepository.java @@ -10,12 +10,12 @@ import at.procon.eventhub.service.EventAcquisitionRecordKeyService; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import java.sql.PreparedStatement; -import java.sql.Types; import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.UUID; -import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; @@ -25,125 +25,325 @@ public class EventRepository { private final JdbcTemplate jdbcTemplate; private final ObjectMapper objectMapper; private final EventAcquisitionRecordKeyService recordKeyService; + private final SourceMasterDataRepository sourceMasterDataRepository; - public EventRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, EventAcquisitionRecordKeyService recordKeyService) { + public EventRepository( + JdbcTemplate jdbcTemplate, + ObjectMapper objectMapper, + EventAcquisitionRecordKeyService recordKeyService, + SourceMasterDataRepository sourceMasterDataRepository + ) { this.jdbcTemplate = jdbcTemplate; this.objectMapper = objectMapper; this.recordKeyService = recordKeyService; + this.sourceMasterDataRepository = sourceMasterDataRepository; } /** - * Acquisition-stage persistence. This table stores source records as imported. - * It does not merge or deduplicate equivalent events from different sources; - * later query/read models can combine sources when a preferred source has gaps. - * Organisation assignment is not stored per event; it belongs to master-data - * relations for driver/vehicle and can be resolved by occurredAt later. + * Persists normalized events and resolves master-data references on the fly. + * The source-record hash is unique and provides source-level import idempotency. */ - public int batchInsert(UUID packageId, int eventSourceId, List events) { - int[] counts = jdbcTemplate.batchUpdate( - """ - insert into eventhub.acquired_event( - id, event_source_id, data_package_id, - external_source_event_id, - driver_source_entity_id, driver_card_nation, driver_card_number, - vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number, - source_package_kind, source_package_id, source_package_entity_id, - source_package_period_from, source_package_period_to, source_package_imported_at, - occurred_at, received_partner_at, received_hub_at, - event_domain, event_type, lifecycle, - odometer_m, latitude, longitude, - event_details, payload, manual_entry, - source_record_key_hash, event_signature_hash - ) values ( - ?, ?, ?, - ?, - ?, ?, ?, - ?, ?, ?, ?, - ?, ?, ?, - ?, ?, ?, - ?, ?, ?, - ?, ?, ?, - ?, ?, ?, - ?::jsonb, ?::jsonb, ?, - ?, ? - ) - on conflict do nothing - """, - new BatchPreparedStatementSetter() { - @Override - public void setValues(PreparedStatement ps, int i) throws java.sql.SQLException { - EventHubEventDto event = events.get(i); - UUID eventId = event.eventId() == null ? UUID.randomUUID() : event.eventId(); - OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt(); - DriverRefDto driverRef = event.driverRef(); - DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard(); - VehicleRefDto vehicleRef = event.vehicleRef(); - VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration(); - SourcePackageRefDto sourcePackageRef = event.sourcePackageRef(); + public int batchInsert(UUID packageId, String tenantKey, int eventSourceId, List events) { + Map entityIdCache = new HashMap<>(); + int insertedCount = 0; - ps.setObject(1, eventId); - ps.setInt(2, eventSourceId); - ps.setObject(3, packageId); - ps.setString(4, event.externalSourceEventId()); - - ps.setString(5, driverRef == null ? null : driverRef.sourceEntityId()); - ps.setString(6, driverCard == null ? null : driverCard.nation()); - ps.setString(7, driverCard == null ? null : driverCard.number()); - - ps.setString(8, vehicleRef == null ? null : vehicleRef.sourceEntityId()); - ps.setString(9, vehicleRef == null ? null : vehicleRef.vin()); - ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation()); - ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number()); - - ps.setString(12, sourcePackageRef == null ? null : sourcePackageRef.packageKind()); - ps.setString(13, sourcePackageRef == null ? null : sourcePackageRef.sourcePackageId()); - ps.setString(14, sourcePackageRef == null ? null : sourcePackageRef.sourceEntityId()); - ps.setObject(15, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodFrom()); - ps.setObject(16, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodTo()); - ps.setObject(17, sourcePackageRef == null ? null : sourcePackageRef.importedIntoSourceAt()); - - ps.setObject(18, event.occurredAt()); - ps.setObject(19, event.receivedPartnerAt()); - ps.setObject(20, receivedHubAt); - ps.setString(21, event.eventDomain().name()); - ps.setString(22, event.eventType().name()); - ps.setString(23, event.lifecycle().name()); - setNullableLong(ps, 24, event.odometerM()); - if (event.position() == null) { - ps.setNull(25, Types.NUMERIC); - ps.setNull(26, Types.NUMERIC); - } else { - ps.setObject(25, event.position().latitude()); - ps.setObject(26, event.position().longitude()); - } - ps.setString(27, toJson(objectMapper.valueToTree(event.eventDetails()))); - ps.setString(28, toJson(event.payload())); - ps.setBoolean(29, event.manualEntry()); - ps.setString(30, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId)); - ps.setString(31, recordKeyService.buildEventSignatureHash(event)); - } - - @Override - public int getBatchSize() { - return events.size(); - } - } - ); - - int inserted = 0; - for (int count : counts) { - if (count > 0 || count == PreparedStatement.SUCCESS_NO_INFO) { - inserted++; + for (EventHubEventDto event : events) { + ResolvedEntityRefs refs = resolveEntityRefs(tenantKey, eventSourceId, event, entityIdCache); + InsertedEventRow row = upsertEvent(packageId, eventSourceId, event, refs); + upsertEventDetails(row, event); + if (row.inserted()) { + insertedCount++; } } - return inserted; + + return insertedCount; } - private void setNullableLong(PreparedStatement ps, int index, Long value) throws java.sql.SQLException { + private InsertedEventRow upsertEvent( + UUID packageId, + int eventSourceId, + EventHubEventDto event, + ResolvedEntityRefs refs + ) { + UUID requestedEventId = event.eventId() == null ? UUID.randomUUID() : event.eventId(); + OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt(); + String sourceRecordKeyHash = recordKeyService.buildSourceRecordKeyHash(event, eventSourceId); + + return jdbcTemplate.query( + """ + with inserted as ( + insert into eventhub.event( + id, event_source_id, data_package_id, + external_source_event_id, + driver_entity_id, vehicle_entity_id, source_package_entity_id, + occurred_at, received_partner_at, received_hub_at, + event_domain, event_type, lifecycle, + odometer_m, position, + payload, manual_entry, + source_record_key_hash, event_signature_hash + ) values ( + ?, ?, ?, + ?, + ?, ?, ?, + ?, ?, ?, + ?, ?, ?, + ?, case + when ? is null or ? is null then null + else ST_SetSRID(ST_MakePoint(?, ?), 4326)::geography + end, + ?::jsonb, ?, + ?, ? + ) + on conflict (source_record_key_hash) do nothing + returning id, occurred_at, true as inserted + ) + select id, occurred_at, inserted + from inserted + union all + select e.id, e.occurred_at, false as inserted + from eventhub.event e + where e.source_record_key_hash = ? + and not exists (select 1 from inserted) + """, + rs -> { + if (!rs.next()) { + throw new IllegalStateException("Could not insert or resolve event row for source record hash " + sourceRecordKeyHash); + } + return new InsertedEventRow( + (UUID) rs.getObject("id"), + rs.getObject("occurred_at", OffsetDateTime.class), + rs.getBoolean("inserted") + ); + }, + requestedEventId, + eventSourceId, + packageId, + event.externalSourceEventId(), + refs.driverEntityId(), + refs.vehicleEntityId(), + refs.sourcePackageEntityId(), + event.occurredAt(), + event.receivedPartnerAt(), + receivedHubAt, + event.eventDomain().name(), + event.eventType().name(), + event.lifecycle().name(), + event.odometerM(), + event.position() == null ? null : event.position().longitude(), + event.position() == null ? null : event.position().latitude(), + event.position() == null ? null : event.position().longitude(), + event.position() == null ? null : event.position().latitude(), + toJson(event.payload()), + event.manualEntry(), + sourceRecordKeyHash, + recordKeyService.buildEventSignatureHash(event), + sourceRecordKeyHash + ); + } + + private void upsertEventDetails(InsertedEventRow insertedEventRow, EventHubEventDto event) { + if (event.eventDetails() == null) { + return; + } + jdbcTemplate.update( + """ + insert into eventhub.event_detail( + event_occurred_at, event_id, detail_type, attributes + ) values (?, ?, ?, ?::jsonb) + on conflict (event_occurred_at, event_id, detail_type) + do update set + attributes = excluded.attributes + """, + insertedEventRow.occurredAt(), + insertedEventRow.eventId(), + event.eventDetails().type(), + toJson(event.eventDetails().attributes()) + ); + } + + private ResolvedEntityRefs resolveEntityRefs( + String tenantKey, + int eventSourceId, + EventHubEventDto event, + Map entityIdCache + ) { + UUID driverEntityId = resolveDriverEntityId(tenantKey, eventSourceId, event, entityIdCache); + UUID vehicleEntityId = resolveVehicleEntityId(tenantKey, eventSourceId, event, entityIdCache); + UUID sourcePackageEntityId = resolveSourcePackageEntityId(tenantKey, eventSourceId, event, entityIdCache); + return new ResolvedEntityRefs(driverEntityId, vehicleEntityId, sourcePackageEntityId); + } + + private UUID resolveDriverEntityId( + String tenantKey, + int eventSourceId, + EventHubEventDto event, + Map entityIdCache + ) { + DriverRefDto driverRef = event.driverRef(); + if (driverRef == null || !driverRef.hasAnyReference()) { + return null; + } + + DriverCardRefDto card = driverRef.driverCard(); + String cardKey = card == null || !card.hasValue() ? null : card.stableKey(); + String sourceEntityId = normalizeNullable(driverRef.sourceEntityId()); + if (sourceEntityId == null && cardKey != null) { + sourceEntityId = "DRIVER_CARD:" + cardKey; + } + if (sourceEntityId == null) { + return null; + } + + Map payload = new LinkedHashMap<>(); + put(payload, "source_entity_id", driverRef.sourceEntityId()); + put(payload, "driver_card_nation", card == null ? null : card.nation()); + put(payload, "driver_card_number", card == null ? null : card.number()); + return resolveEntityId( + tenantKey, + eventSourceId, + "DRIVER", + sourceEntityId, + cardKey, + sourceEntityId, + null, + payload, + entityIdCache + ); + } + + private UUID resolveVehicleEntityId( + String tenantKey, + int eventSourceId, + EventHubEventDto event, + Map entityIdCache + ) { + VehicleRefDto vehicleRef = event.vehicleRef(); + if (vehicleRef == null || !vehicleRef.hasAnyReference()) { + return null; + } + + VehicleRegistrationRefDto registration = vehicleRef.vehicleRegistration(); + String registrationKey = registration == null || !registration.hasValue() ? null : registration.stableKey(); + String sourceEntityId = normalizeNullable(vehicleRef.sourceEntityId()); + if (sourceEntityId == null && normalizeNullable(vehicleRef.vin()) != null) { + sourceEntityId = "VIN:" + vehicleRef.vin(); + } + if (sourceEntityId == null && registrationKey != null) { + sourceEntityId = "VRN:" + registrationKey; + } + if (sourceEntityId == null) { + return null; + } + + Map payload = new LinkedHashMap<>(); + put(payload, "source_entity_id", vehicleRef.sourceEntityId()); + put(payload, "vin", vehicleRef.vin()); + put(payload, "vehicle_registration_nation", registration == null ? null : registration.nation()); + put(payload, "vehicle_registration_number", registration == null ? null : registration.number()); + return resolveEntityId( + tenantKey, + eventSourceId, + "VEHICLE", + sourceEntityId, + normalizeNullable(vehicleRef.vin()) == null ? registrationKey : vehicleRef.vin(), + sourceEntityId, + null, + payload, + entityIdCache + ); + } + + private UUID resolveSourcePackageEntityId( + String tenantKey, + int eventSourceId, + EventHubEventDto event, + Map entityIdCache + ) { + SourcePackageRefDto sourcePackageRef = event.sourcePackageRef(); + if (sourcePackageRef == null || !sourcePackageRef.hasAnyReference()) { + return null; + } + + String packageKind = normalizeNullable(sourcePackageRef.packageKind()); + String sourcePackageId = normalizeNullable(sourcePackageRef.sourcePackageId()); + + String sourceEntityId = normalizeNullable(sourcePackageRef.sourceEntityId()); + if (sourceEntityId == null && sourcePackageId != null) { + sourceEntityId = "SOURCE_PACKAGE:" + (packageKind == null ? "UNKNOWN" : packageKind) + ":" + sourcePackageId; + } + if (sourceEntityId == null) { + return null; + } + + Map payload = new LinkedHashMap<>(); + put(payload, "package_kind", sourcePackageRef.packageKind()); + put(payload, "source_package_id", sourcePackageRef.sourcePackageId()); + put(payload, "source_entity_id", sourcePackageRef.sourceEntityId()); + put(payload, "package_period_from", sourcePackageRef.packagePeriodFrom()); + put(payload, "package_period_to", sourcePackageRef.packagePeriodTo()); + put(payload, "imported_into_source_at", sourcePackageRef.importedIntoSourceAt()); + + String displayName = packageKind == null ? sourceEntityId : packageKind + ":" + (sourcePackageId == null ? sourceEntityId : sourcePackageId); + return resolveEntityId( + tenantKey, + eventSourceId, + "SOURCE_PACKAGE", + sourceEntityId, + sourcePackageId, + displayName, + null, + payload, + entityIdCache + ); + } + + private UUID resolveEntityId( + String tenantKey, + int eventSourceId, + String entityType, + String sourceEntityId, + String sourceExternalKey, + String displayName, + Boolean active, + Map payload, + Map entityIdCache + ) { + String normalizedSourceEntityId = normalizeNullable(sourceEntityId); + if (normalizedSourceEntityId == null) { + return null; + } + + String cacheKey = entityType + "|" + normalizedSourceEntityId + "|" + normalizeNullable(sourceExternalKey); + UUID cached = entityIdCache.get(cacheKey); + if (cached != null) { + return cached; + } + + UUID resolved = sourceMasterDataRepository.resolveOrCreateEntityId( + tenantKey, + eventSourceId, + entityType, + normalizedSourceEntityId, + normalizeNullable(sourceExternalKey), + normalizeNullable(displayName), + active, + payload + ); + entityIdCache.put(cacheKey, resolved); + return resolved; + } + + private String normalizeNullable(String value) { if (value == null) { - ps.setNull(index, Types.BIGINT); - } else { - ps.setLong(index, value); + return null; + } + String trimmed = value.trim(); + return trimmed.isEmpty() ? null : trimmed; + } + + private void put(Map target, String key, Object value) { + if (value != null) { + target.put(key, value); } } @@ -154,4 +354,18 @@ public class EventRepository { throw new IllegalArgumentException("Cannot serialize JSONB value", e); } } + + private record ResolvedEntityRefs( + UUID driverEntityId, + UUID vehicleEntityId, + UUID sourcePackageEntityId + ) { + } + + private record InsertedEventRow( + UUID eventId, + OffsetDateTime occurredAt, + boolean inserted + ) { + } } diff --git a/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java b/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java new file mode 100644 index 0000000..4c75c02 --- /dev/null +++ b/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java @@ -0,0 +1,205 @@ +package at.procon.eventhub.persistence; + +import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert; +import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.PreparedStatement; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +@Repository +public class SourceMasterDataRepository { + + private final JdbcTemplate jdbcTemplate; + private final ObjectMapper objectMapper; + + public SourceMasterDataRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) { + this.jdbcTemplate = jdbcTemplate; + this.objectMapper = objectMapper; + } + + public int upsertEntities(String tenantKey, int eventSourceId, List entities) { + int count = 0; + for (SourceMasterEntityUpsert entity : entities) { + if (entity.sourceEntityId() == null || entity.sourceEntityId().isBlank()) { + continue; + } + jdbcTemplate.update( + """ + insert into eventhub.source_master_entity( + id, tenant_key, event_source_id, entity_type, source_entity_id, + source_external_key, display_name, active, valid_from, valid_to, + source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) + on conflict (tenant_key, event_source_id, entity_type, source_entity_id) + do update set + source_external_key = excluded.source_external_key, + display_name = excluded.display_name, + active = excluded.active, + valid_from = excluded.valid_from, + valid_to = excluded.valid_to, + source_updated_at = excluded.source_updated_at, + payload = excluded.payload, + updated_at = now() + """, + UUID.randomUUID(), + tenantKey, + eventSourceId, + entity.entityType(), + entity.sourceEntityId(), + entity.sourceExternalKey(), + entity.displayName(), + entity.active(), + entity.validFrom(), + entity.validTo(), + entity.sourceUpdatedAt(), + toJson(entity.payload()) + ); + count++; + } + return count; + } + + public int upsertRelations(String tenantKey, int eventSourceId, List relations) { + int count = 0; + for (SourceMasterRelationUpsert relation : relations) { + if (relation.fromSourceEntityId() == null || relation.toSourceEntityId() == null) { + continue; + } + String relationKey = relationKey(relation); + jdbcTemplate.update( + """ + insert into eventhub.source_master_relation( + id, tenant_key, event_source_id, relation_key, relation_type, + from_entity_type, from_source_entity_id, to_entity_type, to_source_entity_id, + valid_from, valid_to, source_updated_at, payload, updated_at + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) + on conflict (tenant_key, event_source_id, relation_key) + do update set + relation_type = excluded.relation_type, + from_entity_type = excluded.from_entity_type, + from_source_entity_id = excluded.from_source_entity_id, + to_entity_type = excluded.to_entity_type, + to_source_entity_id = excluded.to_source_entity_id, + valid_from = excluded.valid_from, + valid_to = excluded.valid_to, + source_updated_at = excluded.source_updated_at, + payload = excluded.payload, + updated_at = now() + """, + UUID.randomUUID(), + tenantKey, + eventSourceId, + relationKey, + relation.relationType(), + relation.fromEntityType(), + relation.fromSourceEntityId(), + relation.toEntityType(), + relation.toSourceEntityId(), + relation.validFrom(), + relation.validTo(), + relation.sourceUpdatedAt(), + toJson(relation.payload()) + ); + count++; + } + return count; + } + + public UUID resolveOrCreateEntityId( + String tenantKey, + int eventSourceId, + String entityType, + String sourceEntityId, + String sourceExternalKey, + String displayName, + Boolean active, + Map payload + ) { + String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); + String normalizedEntityType = normalizeRequired(entityType, "entityType").toUpperCase(); + String normalizedSourceEntityId = normalizeRequired(sourceEntityId, "sourceEntityId"); + String normalizedSourceExternalKey = normalizeNullable(sourceExternalKey); + String normalizedDisplayName = normalizeNullable(displayName); + + return jdbcTemplate.query( + con -> { + PreparedStatement ps = con.prepareStatement( + """ + insert into eventhub.source_master_entity( + id, tenant_key, event_source_id, entity_type, source_entity_id, + source_external_key, display_name, active, payload, updated_at + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now()) + on conflict (tenant_key, event_source_id, entity_type, source_entity_id) + do update set + source_external_key = coalesce(excluded.source_external_key, eventhub.source_master_entity.source_external_key), + display_name = coalesce(excluded.display_name, eventhub.source_master_entity.display_name), + active = coalesce(excluded.active, eventhub.source_master_entity.active), + payload = coalesce(excluded.payload, eventhub.source_master_entity.payload), + updated_at = now() + returning id + """ + ); + ps.setObject(1, UUID.randomUUID()); + ps.setString(2, normalizedTenantKey); + ps.setInt(3, eventSourceId); + ps.setString(4, normalizedEntityType); + ps.setString(5, normalizedSourceEntityId); + ps.setString(6, normalizedSourceExternalKey); + ps.setString(7, normalizedDisplayName); + ps.setObject(8, active); + ps.setString(9, toJson(payload)); + return ps; + }, + rs -> { + if (!rs.next()) { + throw new IllegalStateException( + "Could not resolve source master entity id for " + + normalizedTenantKey + ":" + eventSourceId + ":" + normalizedEntityType + ":" + normalizedSourceEntityId + ); + } + return (UUID) rs.getObject(1); + } + ); + } + + private String relationKey(SourceMasterRelationUpsert relation) { + String validFrom = relation.validFrom() == null ? "MIN" : relation.validFrom().toString(); + String validTo = relation.validTo() == null ? "MAX" : relation.validTo().toString(); + return relation.relationType() + + ":" + relation.fromEntityType() + + ":" + relation.fromSourceEntityId() + + ":" + relation.toEntityType() + + ":" + relation.toSourceEntityId() + + ":" + validFrom + + ":" + validTo; + } + + private String toJson(Map value) { + try { + return objectMapper.writeValueAsString(value == null ? Map.of() : value); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Cannot serialize source master data payload", e); + } + } + + private String normalizeRequired(String value, String fieldName) { + String normalized = normalizeNullable(value); + if (normalized == null) { + throw new IllegalArgumentException(fieldName + " must not be blank"); + } + return normalized; + } + + private String normalizeNullable(String value) { + if (value == null) { + return null; + } + String trimmed = value.trim(); + return trimmed.isEmpty() ? null : trimmed; + } +} diff --git a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java index a78163a..e7b0392 100644 --- a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java +++ b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java @@ -70,7 +70,7 @@ public class EventHubIngestionService { ); try { - int insertedCount = eventRepository.batchInsert(packageId, eventSourceId, sortedEvents); + int insertedCount = eventRepository.batchInsert(packageId, packageInfo.tenantKey(), eventSourceId, sortedEvents); dataPackageRepository.markImported(packageId, insertedCount); log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={}", packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount); diff --git a/src/main/java/at/procon/eventhub/tachograph/api/TachographIngestionController.java b/src/main/java/at/procon/eventhub/tachograph/api/TachographIngestionController.java index 1b13c72..2fffd8e 100644 --- a/src/main/java/at/procon/eventhub/tachograph/api/TachographIngestionController.java +++ b/src/main/java/at/procon/eventhub/tachograph/api/TachographIngestionController.java @@ -3,6 +3,7 @@ package at.procon.eventhub.tachograph.api; import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.ImportMode; import at.procon.eventhub.dto.SchedulerTriggerMode; +import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult; import at.procon.eventhub.tachograph.dto.ConfiguredTachographImportPlanDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; import at.procon.eventhub.tachograph.dto.TachographImportRunResultDto; @@ -11,6 +12,7 @@ import at.procon.eventhub.tachograph.dto.source.TachographActivityDto; import at.procon.eventhub.tachograph.service.TachographConfiguredImportPlanService; import at.procon.eventhub.tachograph.service.TachographImportExecutionService; import at.procon.eventhub.tachograph.service.TachographImportPlanService; +import at.procon.eventhub.tachograph.service.TachographMasterDataRefreshService; import jakarta.validation.Valid; import java.time.OffsetDateTime; import java.util.List; @@ -33,17 +35,20 @@ public class TachographIngestionController { private final TachographImportPlanService tachographImportPlanService; private final TachographConfiguredImportPlanService configuredImportPlanService; private final TachographImportExecutionService tachographImportExecutionService; + private final TachographMasterDataRefreshService masterDataRefreshService; public TachographIngestionController( ProducerTemplate producerTemplate, TachographImportPlanService tachographImportPlanService, TachographConfiguredImportPlanService configuredImportPlanService, - TachographImportExecutionService tachographImportExecutionService + TachographImportExecutionService tachographImportExecutionService, + TachographMasterDataRefreshService masterDataRefreshService ) { this.producerTemplate = producerTemplate; this.tachographImportPlanService = tachographImportPlanService; this.configuredImportPlanService = configuredImportPlanService; this.tachographImportExecutionService = tachographImportExecutionService; + this.masterDataRefreshService = masterDataRefreshService; } @PostMapping("/activities") @@ -68,6 +73,13 @@ public class TachographIngestionController { return ResponseEntity.accepted().body(result); } + @PostMapping("/master-data/refresh") + public ResponseEntity refreshTachographMasterData( + @Valid @RequestBody TachographImportRequest request + ) { + return ResponseEntity.ok(masterDataRefreshService.refresh(request)); + } + @GetMapping("/imports/configured-plans") public ResponseEntity> listConfiguredTachographPlans() { return ResponseEntity.ok(configuredImportPlanService.listPlans()); @@ -99,6 +111,16 @@ public class TachographIngestionController { )); } + @PostMapping("/imports/configured-plans/{planKey}/master-data/refresh") + public ResponseEntity refreshConfiguredTachographMasterData( + @PathVariable String planKey, + @RequestParam(required = false) ImportMode mode, + @RequestParam(required = false) AcquisitionStrategy strategy + ) { + TachographImportRequest request = configuredImportPlanService.createRequest(planKey, mode, strategy); + return ResponseEntity.ok(masterDataRefreshService.refresh(request)); + } + private ResponseEntity> accepted(int count, String route) { return ResponseEntity.accepted().body(Map.of( "accepted", count, diff --git a/src/main/java/at/procon/eventhub/tachograph/config/TachographDataSourceConfig.java b/src/main/java/at/procon/eventhub/tachograph/config/TachographDataSourceConfig.java index a57c56d..1216006 100644 --- a/src/main/java/at/procon/eventhub/tachograph/config/TachographDataSourceConfig.java +++ b/src/main/java/at/procon/eventhub/tachograph/config/TachographDataSourceConfig.java @@ -1,7 +1,7 @@ package at.procon.eventhub.tachograph.config; -import at.procon.eventhub.config.EventHubProperties; import javax.sql.DataSource; +import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; @@ -13,15 +13,24 @@ import org.springframework.jdbc.datasource.DriverManagerDataSource; @ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''") public class TachographDataSourceConfig { + private static final String SQL_SERVER_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; + private static final String SQL_SERVER_JDBC_PREFIX = "jdbc:sqlserver://"; + @Bean - public DataSource tachographDataSource(EventHubProperties properties) { - EventHubProperties.TachographDataSource config = properties.getTachograph().getDatasource(); + @ConfigurationProperties(prefix = "eventhub.tachograph.datasource") + public TachographDataSourceProperties tachographDataSourceProperties() { + return new TachographDataSourceProperties(); + } + + @Bean(defaultCandidate = false) + public DataSource tachographDataSource(TachographDataSourceProperties config) { DriverManagerDataSource dataSource = new DriverManagerDataSource(); - dataSource.setUrl(config.getJdbcUrl()); + dataSource.setUrl(validateJdbcUrl(config)); dataSource.setUsername(config.getUsername()); dataSource.setPassword(config.getPassword()); - if (config.getDriverClassName() != null && !config.getDriverClassName().isBlank()) { - dataSource.setDriverClassName(config.getDriverClassName()); + String driverClassName = trimToNull(config.getDriverClassName()); + if (driverClassName != null) { + dataSource.setDriverClassName(driverClassName); } return dataSource; } @@ -32,4 +41,75 @@ public class TachographDataSourceConfig { ) { return new NamedParameterJdbcTemplate(tachographDataSource); } + + private String validateJdbcUrl(TachographDataSourceProperties config) { + String jdbcUrl = trimToNull(config.getJdbcUrl()); + if (jdbcUrl == null) { + throw new IllegalStateException("eventhub.tachograph.datasource.jdbc-url must not be empty"); + } + + String driverClassName = trimToNull(config.getDriverClassName()); + if (SQL_SERVER_DRIVER_CLASS.equals(driverClassName) && !jdbcUrl.startsWith(SQL_SERVER_JDBC_PREFIX)) { + if (jdbcUrl.startsWith("jdbc:")) { + String suggestedUrl = SQL_SERVER_JDBC_PREFIX + jdbcUrl.substring("jdbc:".length()); + throw new IllegalStateException( + "Invalid SQL Server JDBC URL '" + jdbcUrl + "'. Expected prefix '" + + SQL_SERVER_JDBC_PREFIX + "'. Example: " + suggestedUrl + ); + } + throw new IllegalStateException( + "Invalid SQL Server JDBC URL '" + jdbcUrl + "'. Expected prefix '" + + SQL_SERVER_JDBC_PREFIX + "'" + ); + } + + return jdbcUrl; + } + + private String trimToNull(String value) { + if (value == null) { + return null; + } + String trimmedValue = value.trim(); + return trimmedValue.isEmpty() ? null : trimmedValue; + } + + public static class TachographDataSourceProperties { + private String jdbcUrl; + private String username; + private String password; + private String driverClassName; + + public String getJdbcUrl() { + return jdbcUrl; + } + + public void setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getDriverClassName() { + return driverClassName; + } + + public void setDriverClassName(String driverClassName) { + this.driverClassName = driverClassName; + } + } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java index 18aeb13..1363500 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java @@ -1,27 +1,234 @@ package at.procon.eventhub.tachograph.service; +import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult; +import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert; +import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert; +import at.procon.eventhub.persistence.EventSourceRepository; +import at.procon.eventhub.persistence.SourceMasterDataRepository; import at.procon.eventhub.tachograph.dto.TachographImportRequest; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.core.io.Resource; +import org.springframework.core.io.ResourceLoader; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Service; +import org.springframework.util.StreamUtils; -/** - * Hook for refreshing tachograph master data before event extraction. - * - * The generated project does not yet know the concrete tachograph master-data - * schema. Replace/extend this service with SQL readers for organisations, - * vehicles, vehicle registrations, drivers, and driver cards. - */ @Service public class TachographMasterDataRefreshService { private static final Logger log = LoggerFactory.getLogger(TachographMasterDataRefreshService.class); - public void refreshIfRequested(TachographImportRequest request) { + private static final List ENTITY_SQL_RESOURCES = List.of( + "classpath:sql/tachograph/master-data/organisations.sql", + "classpath:sql/tachograph/master-data/drivers.sql", + "classpath:sql/tachograph/master-data/driver-cards.sql", + "classpath:sql/tachograph/master-data/vehicles.sql", + "classpath:sql/tachograph/master-data/vehicle-registrations.sql" + ); + + private static final String RELATIONS_SQL_RESOURCE = "classpath:sql/tachograph/master-data/relations.sql"; + + private final ObjectProvider tachographJdbcTemplateProvider; + private final SourceMasterDataRepository sourceMasterDataRepository; + private final EventSourceRepository eventSourceRepository; + private final ResourceLoader resourceLoader; + + public TachographMasterDataRefreshService( + @Qualifier("tachographNamedParameterJdbcTemplate") ObjectProvider tachographJdbcTemplateProvider, + SourceMasterDataRepository sourceMasterDataRepository, + EventSourceRepository eventSourceRepository, + ResourceLoader resourceLoader + ) { + this.tachographJdbcTemplateProvider = tachographJdbcTemplateProvider; + this.sourceMasterDataRepository = sourceMasterDataRepository; + this.eventSourceRepository = eventSourceRepository; + this.resourceLoader = resourceLoader; + } + + public MasterDataRefreshResult refreshIfRequested(TachographImportRequest request) { if (!request.refreshMasterDataFirst()) { - return; + return MasterDataRefreshResult.empty(); } - log.info("Tachograph master-data refresh requested for tenant={} source={}. Concrete SQL refresh is a project-specific extension point.", - request.tenantKey(), request.eventSource().stableKey()); + return refresh(request); + } + + public MasterDataRefreshResult refresh(TachographImportRequest request) { + NamedParameterJdbcTemplate tachographJdbcTemplate = tachographJdbcTemplateProvider.getIfAvailable(); + if (tachographJdbcTemplate == null) { + log.info("Skipping tachograph master-data refresh for tenant={} because no tachograph datasource is configured.", + request.tenantKey()); + return MasterDataRefreshResult.empty(); + } + + String tenantKey = request.tenantKey() == null || request.tenantKey().isBlank() ? "default" : request.tenantKey().trim(); + int eventSourceId = eventSourceRepository.resolveSourceId(tenantKey, request.eventSource()); + + int entities = 0; + for (String sqlResource : ENTITY_SQL_RESOURCES) { + List batch = tachographJdbcTemplate.query( + loadSql(sqlResource), + Map.of(), + (rs, rowNum) -> entity(rs) + ); + entities += sourceMasterDataRepository.upsertEntities(tenantKey, eventSourceId, batch); + } + + List relations = tachographJdbcTemplate.query( + loadSql(RELATIONS_SQL_RESOURCE), + Map.of(), + (rs, rowNum) -> relation(rs) + ); + int relationCount = sourceMasterDataRepository.upsertRelations(tenantKey, eventSourceId, relations); + + MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount); + log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={}", + tenantKey, request.eventSource().stableKey(), result.entitiesUpserted(), result.relationsUpserted()); + return result; + } + + private SourceMasterEntityUpsert entity(ResultSet rs) throws SQLException { + String entityType = string(rs, "entity_type"); + String sourceEntityId = string(rs, "source_entity_id"); + OffsetDateTime validFrom = offsetDateTime(rs, "valid_from"); + OffsetDateTime validTo = offsetDateTime(rs, "valid_to"); + ValidityRange validityRange = normalizeValidityRange( + validFrom, + validTo, + "entity", + entityType + ":" + sourceEntityId + ); + return new SourceMasterEntityUpsert( + entityType, + sourceEntityId, + string(rs, "source_external_key"), + string(rs, "display_name"), + bool(rs, "active"), + validityRange.validFrom(), + validityRange.validTo(), + offsetDateTime(rs, "source_updated_at"), + payload(rs) + ); + } + + private SourceMasterRelationUpsert relation(ResultSet rs) throws SQLException { + String relationType = string(rs, "relation_type"); + String fromEntityType = string(rs, "from_entity_type"); + String fromSourceEntityId = string(rs, "from_source_entity_id"); + String toEntityType = string(rs, "to_entity_type"); + String toSourceEntityId = string(rs, "to_source_entity_id"); + OffsetDateTime validFrom = offsetDateTime(rs, "valid_from"); + OffsetDateTime validTo = offsetDateTime(rs, "valid_to"); + ValidityRange validityRange = normalizeValidityRange( + validFrom, + validTo, + "relation", + relationType + ":" + fromEntityType + ":" + fromSourceEntityId + "->" + toEntityType + ":" + toSourceEntityId + ); + return new SourceMasterRelationUpsert( + relationType, + fromEntityType, + fromSourceEntityId, + toEntityType, + toSourceEntityId, + validityRange.validFrom(), + validityRange.validTo(), + offsetDateTime(rs, "source_updated_at"), + payload(rs) + ); + } + + private Map payload(ResultSet rs) throws SQLException { + ResultSetMetaData metaData = rs.getMetaData(); + Map payload = new LinkedHashMap<>(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + String name = metaData.getColumnLabel(i); + Object value = rs.getObject(i); + if (value != null) { + payload.put(name, value); + } + } + return payload; + } + + private String loadSql(String location) { + Resource resource = resourceLoader.getResource(location); + try (var inputStream = resource.getInputStream()) { + return StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new IllegalStateException("Cannot load tachograph master-data SQL resource " + location, e); + } + } + + private String string(ResultSet rs, String column) throws SQLException { + String value = rs.getString(column); + return value == null || value.isBlank() ? null : value.trim(); + } + + private Boolean bool(ResultSet rs, String column) throws SQLException { + Object value = rs.getObject(column); + if (value == null) { + return null; + } + if (value instanceof Boolean bool) { + return bool; + } + if (value instanceof Number number) { + return number.intValue() != 0; + } + return Boolean.parseBoolean(value.toString()); + } + + private OffsetDateTime offsetDateTime(ResultSet rs, String column) throws SQLException { + Object value = rs.getObject(column); + if (value == null) { + return null; + } + if (value instanceof OffsetDateTime offsetDateTime) { + return offsetDateTime; + } + if (value instanceof Timestamp timestamp) { + return timestamp.toInstant().atOffset(ZoneOffset.UTC); + } + if (value instanceof java.time.LocalDateTime localDateTime) { + return localDateTime.atOffset(ZoneOffset.UTC); + } + return OffsetDateTime.parse(value.toString()); + } + + private ValidityRange normalizeValidityRange( + OffsetDateTime validFrom, + OffsetDateTime validTo, + String rowKind, + String rowKey + ) { + if (validFrom == null || validTo == null || !validFrom.isAfter(validTo)) { + return new ValidityRange(validFrom, validTo); + } + + log.warn( + "Ignoring invalid validity end for {} {} because valid_from {} is after valid_to {}. Keeping valid_from and setting valid_to=null.", + rowKind, + rowKey, + validFrom, + validTo + ); + return new ValidityRange(validFrom, null); + } + + private record ValidityRange(OffsetDateTime validFrom, OffsetDateTime validTo) { } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d3e08b6..e3c2e23 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -3,8 +3,8 @@ spring: name: eventhub-ingestion-service datasource: url: jdbc:postgresql://localhost:5432/eventhub - username: eventhub - password: eventhub + username: postgres + password: P54!pcd#Wi flyway: enabled: true default-schema: eventhub @@ -34,11 +34,11 @@ eventhub: occurred-at-overlap: 7d # Configure this block to enable JdbcTachographExtractionBatchExecutor. - # datasource: - # jdbc-url: jdbc:sqlserver://localhost:1433;databaseName=tachograph;encrypt=true;trustServerCertificate=true - # username: tachograph_user - # password: change-me - # driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver + datasource: + jdbc-url: jdbc:sqlserver://db.bytebar.eu:22996;databaseName=ByteBarDriverSettlement;trustServerCertificate=true + username: ReadOnly + password: p2=race! + driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver # Enables the scheduler that regularly triggers configured tachograph import plans. scheduler-enabled: false diff --git a/src/main/resources/db/migration/V2__create_source_master_data_schema.sql b/src/main/resources/db/migration/V2__create_source_master_data_schema.sql new file mode 100644 index 0000000..16369db --- /dev/null +++ b/src/main/resources/db/migration/V2__create_source_master_data_schema.sql @@ -0,0 +1,54 @@ +create table if not exists eventhub.source_master_entity ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + entity_type text not null, + source_entity_id text not null, + source_external_key text, + display_name text, + active boolean, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_master_entity unique (tenant_key, event_source_id, entity_type, source_entity_id), + constraint chk_source_master_entity_valid_time_order check (valid_from is null or valid_to is null or valid_from < valid_to) +); + +create table if not exists eventhub.source_master_relation ( + id uuid primary key, + tenant_key text not null, + event_source_id integer not null references eventhub.event_source(id), + relation_key text not null, + relation_type text not null, + from_entity_type text not null, + from_source_entity_id text not null, + to_entity_type text not null, + to_source_entity_id text not null, + valid_from timestamptz, + valid_to timestamptz, + source_updated_at timestamptz, + payload jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint ux_source_master_relation unique (tenant_key, event_source_id, relation_key), + constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from < valid_to) +); + +create index if not exists idx_source_master_entity_type_key + on eventhub.source_master_entity(tenant_key, event_source_id, entity_type, source_external_key) + where source_external_key is not null; + +create index if not exists idx_source_master_entity_payload_gin + on eventhub.source_master_entity using gin(payload); + +create index if not exists idx_source_master_relation_from + on eventhub.source_master_relation(tenant_key, event_source_id, from_entity_type, from_source_entity_id, relation_type); + +create index if not exists idx_source_master_relation_to + on eventhub.source_master_relation(tenant_key, event_source_id, to_entity_type, to_source_entity_id, relation_type); + +create index if not exists idx_source_master_relation_payload_gin + on eventhub.source_master_relation using gin(payload); diff --git a/src/main/resources/db/migration/V3__relax_source_master_valid_time_constraints.sql b/src/main/resources/db/migration/V3__relax_source_master_valid_time_constraints.sql new file mode 100644 index 0000000..5fae1ef --- /dev/null +++ b/src/main/resources/db/migration/V3__relax_source_master_valid_time_constraints.sql @@ -0,0 +1,13 @@ +alter table eventhub.source_master_entity + drop constraint if exists chk_source_master_entity_valid_time_order; + +alter table eventhub.source_master_entity + add constraint chk_source_master_entity_valid_time_order + check (valid_from is null or valid_to is null or valid_from <= valid_to); + +alter table eventhub.source_master_relation + drop constraint if exists chk_source_master_relation_valid_time_order; + +alter table eventhub.source_master_relation + add constraint chk_source_master_relation_valid_time_order + check (valid_from is null or valid_to is null or valid_from <= valid_to); diff --git a/src/main/resources/db/migration/V4__replace_acquired_event_with_event_and_details.sql b/src/main/resources/db/migration/V4__replace_acquired_event_with_event_and_details.sql new file mode 100644 index 0000000..da47c14 --- /dev/null +++ b/src/main/resources/db/migration/V4__replace_acquired_event_with_event_and_details.sql @@ -0,0 +1,91 @@ +create extension if not exists postgis; + +drop table if exists eventhub.event_detail; +drop table if exists eventhub.acquired_event; +drop table if exists eventhub.event; + +create table eventhub.event ( + id uuid not null, + event_source_id integer not null references eventhub.event_source(id), + data_package_id uuid not null references eventhub.data_package(id), + + external_source_event_id text not null, + + driver_entity_id uuid references eventhub.source_master_entity(id), + vehicle_entity_id uuid references eventhub.source_master_entity(id), + source_package_entity_id uuid references eventhub.source_master_entity(id), + + occurred_at timestamptz not null, + received_partner_at timestamptz, + received_hub_at timestamptz not null default now(), + + event_domain text not null, + event_type text not null, + lifecycle text not null, + + odometer_m bigint, + position geography(Point, 4326), + + payload jsonb not null default '{}'::jsonb, + manual_entry boolean not null default false, + + source_record_key_hash text not null, + event_signature_hash text, + + created_at timestamptz not null default now(), + + constraint pk_event primary key (occurred_at, id), + constraint chk_event_driver_or_vehicle_ref check ( + driver_entity_id is not null + or vehicle_entity_id is not null + ) +); + +create unique index ux_event_source_record + on eventhub.event(source_record_key_hash); + +create index idx_event_signature + on eventhub.event(event_signature_hash) + where event_signature_hash is not null; + +create index idx_event_source_time + on eventhub.event(event_source_id, occurred_at desc); + +create index idx_event_package_time + on eventhub.event(data_package_id, occurred_at desc); + +create index idx_event_domain_type_time + on eventhub.event(event_domain, event_type, occurred_at desc); + +create index idx_event_driver_time + on eventhub.event(driver_entity_id, occurred_at desc) + where driver_entity_id is not null; + +create index idx_event_vehicle_time + on eventhub.event(vehicle_entity_id, occurred_at desc) + where vehicle_entity_id is not null; + +create index idx_event_position_gist + on eventhub.event using gist(position) + where position is not null; + +create index idx_event_payload_gin + on eventhub.event using gin(payload); + +create table eventhub.event_detail ( + event_occurred_at timestamptz not null, + event_id uuid not null, + detail_type text not null, + attributes jsonb not null default '{}'::jsonb, + created_at timestamptz not null default now(), + constraint pk_event_detail primary key (event_occurred_at, event_id, detail_type), + constraint fk_event_detail_event foreign key (event_occurred_at, event_id) + references eventhub.event(occurred_at, id) + on delete cascade +); + +create index idx_event_detail_type + on eventhub.event_detail(detail_type); + +create index idx_event_detail_attributes_gin + on eventhub.event_detail using gin(attributes); diff --git a/src/main/resources/sql/tachograph/master-data/driver-cards.sql b/src/main/resources/sql/tachograph/master-data/driver-cards.sql new file mode 100644 index 0000000..5ec543f --- /dev/null +++ b/src/main/resources/sql/tachograph/master-data/driver-cards.sql @@ -0,0 +1,27 @@ +select + 'DRIVER_CARD' as entity_type, + cast(c.ID as varchar(128)) as source_entity_id, + concat(coalesce(n.AlphaCode, ''), ':', c.CardNumber) as source_external_key, + concat(coalesce(n.AlphaCode, ''), ':', c.CardNumber) as display_name, + cast(case when c.IsLastCard = 1 and (c.ExpiryDate is null or c.ExpiryDate > getutcdate()) then 1 else 0 end as bit) as active, + c.IssueDate as valid_from, + c.ExpiryDate as valid_to, + cast(null as datetime) as source_updated_at, + c.ID as card_id, + cast(c.ID_Driver as varchar(128)) as driver_id, + n.AlphaCode as card_nation, + c.CardNumber as card_number, + c.Consecutive as consecutive, + c.Replacement as replacement, + c.Renewal as renewal, + c.IssueDate as issue_date, + c.ExpiryDate as expiry_date, + c.ValidityDate as validity_date, + c.Authorityname as authority_name, + c.ID_FileLog as file_log_id, + c.IsAnalog as is_analog, + c.IsLastCard as is_last_card, + c.Generation as generation +from dbo.Card c +left join dbo.Nation n on n.ID = c.ID_Nation +where c.CardNumber is not null diff --git a/src/main/resources/sql/tachograph/master-data/drivers.sql b/src/main/resources/sql/tachograph/master-data/drivers.sql new file mode 100644 index 0000000..8495fa2 --- /dev/null +++ b/src/main/resources/sql/tachograph/master-data/drivers.sql @@ -0,0 +1,35 @@ +select + 'DRIVER' as entity_type, + cast(d.ID as varchar(128)) as source_entity_id, + coalesce(nullif(d.IdentificationNumber, ''), nullif(d.LicenseNumber, ''), cast(d.ID as varchar(128))) as source_external_key, + ltrim(rtrim(concat(d.Surname, ' ', d.Firstnames))) as display_name, + cast(case when d.IsActive = 1 then 1 else 0 end as bit) as active, + d.EmploymentStartDate as valid_from, + d.RetirementDate as valid_to, + d.LastUpdate as source_updated_at, + d.ID as driver_id, + d.Surname as surname, + d.Firstnames as first_names, + d.Birthdate as birth_date, + d.BirthPlace as birth_place, + d.LicenseNumber as license_number, + ln.AlphaCode as license_nation, + d.LicenseAuthority as license_authority, + d.LExpiryDate as license_expiry_date, + d.Phone as phone, + d.Mobile as mobile, + d.Email as email, + d.CostCenter as cost_center, + d.OrganisationUserID as organisation_user_id, + d.IdentificationNumber as identification_number, + d.Region as region, + d.ID_FileLog as file_log_id, + d.LastFileLog_ID as last_file_log_id, + d.FirstDrivingTime as first_driving_time, + d.LastDrivingTime as last_driving_time, + d.LastUsedVehicle_ID as last_used_vehicle_id, + d.LastUsedVehicleTime as last_used_vehicle_time, + d.IsDigital as is_digital, + d.IsAnalog as is_analog +from dbo.Driver d +left join dbo.Nation ln on ln.ID = d.ID_LicenseNation diff --git a/src/main/resources/sql/tachograph/master-data/organisations.sql b/src/main/resources/sql/tachograph/master-data/organisations.sql new file mode 100644 index 0000000..5b7e943 --- /dev/null +++ b/src/main/resources/sql/tachograph/master-data/organisations.sql @@ -0,0 +1,22 @@ +select + 'ORGANISATION' as entity_type, + cast(o.oid as varchar(128)) as source_entity_id, + cast(o.GUID as varchar(64)) as source_external_key, + coalesce(nullif(o.name, ''), nullif(o.kurzbez, ''), cast(o.oid as varchar(128))) as display_name, + cast(case when o.gueltigbis is null or o.gueltigbis > getutcdate() then 1 else 0 end as bit) as active, + o.VertragGiltAb as valid_from, + o.gueltigbis as valid_to, + cast(null as datetime) as source_updated_at, + o.oid as organisation_id, + cast(o.GUID as varchar(64)) as organisation_guid, + o.kurzbez as organisation_code, + o.name as organisation_name, + cast(o.n_rekey01 as varchar(128)) as parent_organisation_id, + o.Unternehmens_ID as company_id, + o.kostenstelle as cost_center, + o.Country as country, + o.PostalCode as postal_code, + o.City as city, + o.strasse as street, + o.Email as email +from dbo.I_90021 o diff --git a/src/main/resources/sql/tachograph/master-data/relations.sql b/src/main/resources/sql/tachograph/master-data/relations.sql new file mode 100644 index 0000000..2174df2 --- /dev/null +++ b/src/main/resources/sql/tachograph/master-data/relations.sql @@ -0,0 +1,75 @@ +select + 'ORGANISATION_PARENT' as relation_type, + 'ORGANISATION' as from_entity_type, + cast(o.oid as varchar(128)) as from_source_entity_id, + 'ORGANISATION' as to_entity_type, + cast(o.n_rekey01 as varchar(128)) as to_source_entity_id, + o.VertragGiltAb as valid_from, + o.gueltigbis as valid_to, + cast(null as datetime) as source_updated_at, + 'I_90021' as source_table, + cast(o.oid as varchar(128)) as source_row_id +from dbo.I_90021 o +where o.n_rekey01 is not null + +union all + +select + 'DRIVER_ORGANISATION' as relation_type, + 'DRIVER' as from_entity_type, + cast(rel.ID_Driver as varchar(128)) as from_source_entity_id, + 'ORGANISATION' as to_entity_type, + cast(rel.ID_I_90021 as varchar(128)) as to_source_entity_id, + rel.GILT_AB as valid_from, + rel.GILT_BIS as valid_to, + rel.LastUpdate as source_updated_at, + 'Driver_I_90021' as source_table, + cast(rel.ID as varchar(128)) as source_row_id +from dbo.Driver_I_90021 rel + +union all + +select + 'DRIVER_CARD_DRIVER' as relation_type, + 'DRIVER_CARD' as from_entity_type, + cast(c.ID as varchar(128)) as from_source_entity_id, + 'DRIVER' as to_entity_type, + cast(c.ID_Driver as varchar(128)) as to_source_entity_id, + c.IssueDate as valid_from, + c.ExpiryDate as valid_to, + cast(null as datetime) as source_updated_at, + 'Card' as source_table, + cast(c.ID as varchar(128)) as source_row_id +from dbo.Card c +where c.ID_Driver is not null + +union all + +select + 'VEHICLE_ORGANISATION' as relation_type, + 'VEHICLE_REGISTRATION' as from_entity_type, + cast(rel.ID_Vehicle as varchar(128)) as from_source_entity_id, + 'ORGANISATION' as to_entity_type, + cast(rel.ID_I_90021 as varchar(128)) as to_source_entity_id, + rel.GILT_AB as valid_from, + rel.GILT_BIS as valid_to, + rel.LastUpdate as source_updated_at, + 'Vehicle_I_90021' as source_table, + cast(rel.ID as varchar(128)) as source_row_id +from dbo.Vehicle_I_90021 rel + +union all + +select + 'VEHICLE_REGISTRATION_VEHICLE' as relation_type, + 'VEHICLE_REGISTRATION' as from_entity_type, + cast(v.ID as varchar(128)) as from_source_entity_id, + 'VEHICLE' as to_entity_type, + cast(v.ID_VehicleIdentification as varchar(128)) as to_source_entity_id, + v.ValidFrom as valid_from, + v.ValidTo as valid_to, + cast(null as datetime) as source_updated_at, + 'Vehicle' as source_table, + cast(v.ID as varchar(128)) as source_row_id +from dbo.Vehicle v +where v.ID_VehicleIdentification is not null diff --git a/src/main/resources/sql/tachograph/master-data/vehicle-registrations.sql b/src/main/resources/sql/tachograph/master-data/vehicle-registrations.sql new file mode 100644 index 0000000..780eba7 --- /dev/null +++ b/src/main/resources/sql/tachograph/master-data/vehicle-registrations.sql @@ -0,0 +1,20 @@ +select + 'VEHICLE_REGISTRATION' as entity_type, + cast(v.ID as varchar(128)) as source_entity_id, + concat(coalesce(n.AlphaCode, ''), ':', v.VRN) as source_external_key, + concat(coalesce(n.AlphaCode, ''), ':', v.VRN) as display_name, + cast(case when v.ValidTo is null or v.ValidTo > getutcdate() then 1 else 0 end as bit) as active, + v.ValidFrom as valid_from, + v.ValidTo as valid_to, + cast(null as datetime) as source_updated_at, + v.ID as vehicle_registration_id, + cast(v.ID_VehicleIdentification as varchar(128)) as vehicle_identification_id, + n.AlphaCode as registration_nation, + v.VRN as registration_number, + v.VrnNormalized as registration_number_normalized, + v.ID_FileLog as file_log_id, + v.LastDriver_ID as last_driver_id, + v.LastDriverTime as last_driver_time +from dbo.Vehicle v +left join dbo.Nation n on n.ID = v.ID_Nation +where v.VRN is not null diff --git a/src/main/resources/sql/tachograph/master-data/vehicles.sql b/src/main/resources/sql/tachograph/master-data/vehicles.sql new file mode 100644 index 0000000..7bee3d9 --- /dev/null +++ b/src/main/resources/sql/tachograph/master-data/vehicles.sql @@ -0,0 +1,28 @@ +select + 'VEHICLE' as entity_type, + cast(vi.ID as varchar(128)) as source_entity_id, + vi.VIN as source_external_key, + vi.VIN as display_name, + cast(case when vi.IsActive = 1 then 1 else 0 end as bit) as active, + cast(null as datetime) as valid_from, + cast(null as datetime) as valid_to, + cast(null as datetime) as source_updated_at, + vi.ID as vehicle_identification_id, + vi.VIN as vin, + vi.CostCenter as cost_center, + vi.ID_FileLog as file_log_id, + vi.BillingDate as billing_date, + vi.Mobile as mobile, + vi.Email as email, + vi.GrossVehicleMass as gross_vehicle_mass, + vi.VehicleType_ID as vehicle_type_id, + vi.FirstDrivingTime as first_driving_time, + vi.LastDrivingTime as last_driving_time, + vi.LastOdoEnd as last_odo_end, + vi.LastOdoEndTime as last_odo_end_time, + vi.LastDriver_ID as last_driver_id, + vi.LastDriverTime as last_driver_time, + vi.LastFileLog_ID as last_file_log_id, + vi.IsDigital as is_digital, + vi.IsAnalog as is_analog +from dbo.VehicleIdentification vi