Implement master-data-backed event model and tachograph refresh flow
This commit is contained in:
parent
e3ffa56932
commit
ec533bb24f
|
|
@ -3,7 +3,7 @@
|
||||||
create extension if not exists timescaledb;
|
create extension if not exists timescaledb;
|
||||||
|
|
||||||
select create_hypertable(
|
select create_hypertable(
|
||||||
'eventhub.acquired_event',
|
'eventhub.event',
|
||||||
'occurred_at',
|
'occurred_at',
|
||||||
if_not_exists => true,
|
if_not_exists => true,
|
||||||
migrate_data => true
|
migrate_data => true
|
||||||
|
|
|
||||||
21
pom.xml
21
pom.xml
|
|
@ -97,6 +97,27 @@
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-enforcer-plugin</artifactId>
|
||||||
|
<version>3.6.1</version>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>enforce-java</id>
|
||||||
|
<goals>
|
||||||
|
<goal>enforce</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<rules>
|
||||||
|
<requireJavaVersion>
|
||||||
|
<version>[21,)</version>
|
||||||
|
<message>This project requires Java 21+ to build and run.</message>
|
||||||
|
</requireJavaVersion>
|
||||||
|
</rules>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
<plugin>
|
<plugin>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
package at.procon.eventhub.config;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class JacksonConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnMissingBean(ObjectMapper.class)
|
||||||
|
public ObjectMapper objectMapper() {
|
||||||
|
return new ObjectMapper().findAndRegisterModules();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -0,0 +1,10 @@
|
||||||
|
package at.procon.eventhub.importing.masterdata;
|
||||||
|
|
||||||
|
public record MasterDataRefreshResult(
|
||||||
|
int entitiesUpserted,
|
||||||
|
int relationsUpserted
|
||||||
|
) {
|
||||||
|
public static MasterDataRefreshResult empty() {
|
||||||
|
return new MasterDataRefreshResult(0, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
package at.procon.eventhub.importing.masterdata;
|
||||||
|
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public record SourceMasterEntityUpsert(
|
||||||
|
String entityType,
|
||||||
|
String sourceEntityId,
|
||||||
|
String sourceExternalKey,
|
||||||
|
String displayName,
|
||||||
|
Boolean active,
|
||||||
|
OffsetDateTime validFrom,
|
||||||
|
OffsetDateTime validTo,
|
||||||
|
OffsetDateTime sourceUpdatedAt,
|
||||||
|
Map<String, Object> payload
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
package at.procon.eventhub.importing.masterdata;
|
||||||
|
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public record SourceMasterRelationUpsert(
|
||||||
|
String relationType,
|
||||||
|
String fromEntityType,
|
||||||
|
String fromSourceEntityId,
|
||||||
|
String toEntityType,
|
||||||
|
String toSourceEntityId,
|
||||||
|
OffsetDateTime validFrom,
|
||||||
|
OffsetDateTime validTo,
|
||||||
|
OffsetDateTime sourceUpdatedAt,
|
||||||
|
Map<String, Object> payload
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
@ -10,12 +10,12 @@ import at.procon.eventhub.service.EventAcquisitionRecordKeyService;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import java.sql.PreparedStatement;
|
|
||||||
import java.sql.Types;
|
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
|
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
import org.springframework.stereotype.Repository;
|
import org.springframework.stereotype.Repository;
|
||||||
|
|
||||||
|
|
@ -25,125 +25,325 @@ public class EventRepository {
|
||||||
private final JdbcTemplate jdbcTemplate;
|
private final JdbcTemplate jdbcTemplate;
|
||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
private final EventAcquisitionRecordKeyService recordKeyService;
|
private final EventAcquisitionRecordKeyService recordKeyService;
|
||||||
|
private final SourceMasterDataRepository sourceMasterDataRepository;
|
||||||
|
|
||||||
public EventRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, EventAcquisitionRecordKeyService recordKeyService) {
|
public EventRepository(
|
||||||
|
JdbcTemplate jdbcTemplate,
|
||||||
|
ObjectMapper objectMapper,
|
||||||
|
EventAcquisitionRecordKeyService recordKeyService,
|
||||||
|
SourceMasterDataRepository sourceMasterDataRepository
|
||||||
|
) {
|
||||||
this.jdbcTemplate = jdbcTemplate;
|
this.jdbcTemplate = jdbcTemplate;
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
this.recordKeyService = recordKeyService;
|
this.recordKeyService = recordKeyService;
|
||||||
|
this.sourceMasterDataRepository = sourceMasterDataRepository;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Acquisition-stage persistence. This table stores source records as imported.
|
* Persists normalized events and resolves master-data references on the fly.
|
||||||
* It does not merge or deduplicate equivalent events from different sources;
|
* The source-record hash is unique and provides source-level import idempotency.
|
||||||
* later query/read models can combine sources when a preferred source has gaps.
|
|
||||||
* Organisation assignment is not stored per event; it belongs to master-data
|
|
||||||
* relations for driver/vehicle and can be resolved by occurredAt later.
|
|
||||||
*/
|
*/
|
||||||
public int batchInsert(UUID packageId, int eventSourceId, List<EventHubEventDto> events) {
|
public int batchInsert(UUID packageId, String tenantKey, int eventSourceId, List<EventHubEventDto> events) {
|
||||||
int[] counts = jdbcTemplate.batchUpdate(
|
Map<String, UUID> entityIdCache = new HashMap<>();
|
||||||
|
int insertedCount = 0;
|
||||||
|
|
||||||
|
for (EventHubEventDto event : events) {
|
||||||
|
ResolvedEntityRefs refs = resolveEntityRefs(tenantKey, eventSourceId, event, entityIdCache);
|
||||||
|
InsertedEventRow row = upsertEvent(packageId, eventSourceId, event, refs);
|
||||||
|
upsertEventDetails(row, event);
|
||||||
|
if (row.inserted()) {
|
||||||
|
insertedCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return insertedCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
private InsertedEventRow upsertEvent(
|
||||||
|
UUID packageId,
|
||||||
|
int eventSourceId,
|
||||||
|
EventHubEventDto event,
|
||||||
|
ResolvedEntityRefs refs
|
||||||
|
) {
|
||||||
|
UUID requestedEventId = event.eventId() == null ? UUID.randomUUID() : event.eventId();
|
||||||
|
OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt();
|
||||||
|
String sourceRecordKeyHash = recordKeyService.buildSourceRecordKeyHash(event, eventSourceId);
|
||||||
|
|
||||||
|
return jdbcTemplate.query(
|
||||||
"""
|
"""
|
||||||
insert into eventhub.acquired_event(
|
with inserted as (
|
||||||
|
insert into eventhub.event(
|
||||||
id, event_source_id, data_package_id,
|
id, event_source_id, data_package_id,
|
||||||
external_source_event_id,
|
external_source_event_id,
|
||||||
driver_source_entity_id, driver_card_nation, driver_card_number,
|
driver_entity_id, vehicle_entity_id, source_package_entity_id,
|
||||||
vehicle_source_entity_id, vehicle_vin, vehicle_registration_nation, vehicle_registration_number,
|
|
||||||
source_package_kind, source_package_id, source_package_entity_id,
|
|
||||||
source_package_period_from, source_package_period_to, source_package_imported_at,
|
|
||||||
occurred_at, received_partner_at, received_hub_at,
|
occurred_at, received_partner_at, received_hub_at,
|
||||||
event_domain, event_type, lifecycle,
|
event_domain, event_type, lifecycle,
|
||||||
odometer_m, latitude, longitude,
|
odometer_m, position,
|
||||||
event_details, payload, manual_entry,
|
payload, manual_entry,
|
||||||
source_record_key_hash, event_signature_hash
|
source_record_key_hash, event_signature_hash
|
||||||
) values (
|
) values (
|
||||||
?, ?, ?,
|
?, ?, ?,
|
||||||
?,
|
?,
|
||||||
?, ?, ?,
|
?, ?, ?,
|
||||||
?, ?, ?, ?,
|
|
||||||
?, ?, ?,
|
?, ?, ?,
|
||||||
?, ?, ?,
|
?, ?, ?,
|
||||||
?, ?, ?,
|
?, case
|
||||||
?, ?, ?,
|
when ? is null or ? is null then null
|
||||||
?, ?, ?,
|
else ST_SetSRID(ST_MakePoint(?, ?), 4326)::geography
|
||||||
?::jsonb, ?::jsonb, ?,
|
end,
|
||||||
|
?::jsonb, ?,
|
||||||
?, ?
|
?, ?
|
||||||
)
|
)
|
||||||
on conflict do nothing
|
on conflict (source_record_key_hash) do nothing
|
||||||
|
returning id, occurred_at, true as inserted
|
||||||
|
)
|
||||||
|
select id, occurred_at, inserted
|
||||||
|
from inserted
|
||||||
|
union all
|
||||||
|
select e.id, e.occurred_at, false as inserted
|
||||||
|
from eventhub.event e
|
||||||
|
where e.source_record_key_hash = ?
|
||||||
|
and not exists (select 1 from inserted)
|
||||||
""",
|
""",
|
||||||
new BatchPreparedStatementSetter() {
|
rs -> {
|
||||||
@Override
|
if (!rs.next()) {
|
||||||
public void setValues(PreparedStatement ps, int i) throws java.sql.SQLException {
|
throw new IllegalStateException("Could not insert or resolve event row for source record hash " + sourceRecordKeyHash);
|
||||||
EventHubEventDto event = events.get(i);
|
|
||||||
UUID eventId = event.eventId() == null ? UUID.randomUUID() : event.eventId();
|
|
||||||
OffsetDateTime receivedHubAt = event.receivedHubAt() == null ? OffsetDateTime.now() : event.receivedHubAt();
|
|
||||||
DriverRefDto driverRef = event.driverRef();
|
|
||||||
DriverCardRefDto driverCard = driverRef == null ? null : driverRef.driverCard();
|
|
||||||
VehicleRefDto vehicleRef = event.vehicleRef();
|
|
||||||
VehicleRegistrationRefDto vehicleRegistration = vehicleRef == null ? null : vehicleRef.vehicleRegistration();
|
|
||||||
SourcePackageRefDto sourcePackageRef = event.sourcePackageRef();
|
|
||||||
|
|
||||||
ps.setObject(1, eventId);
|
|
||||||
ps.setInt(2, eventSourceId);
|
|
||||||
ps.setObject(3, packageId);
|
|
||||||
ps.setString(4, event.externalSourceEventId());
|
|
||||||
|
|
||||||
ps.setString(5, driverRef == null ? null : driverRef.sourceEntityId());
|
|
||||||
ps.setString(6, driverCard == null ? null : driverCard.nation());
|
|
||||||
ps.setString(7, driverCard == null ? null : driverCard.number());
|
|
||||||
|
|
||||||
ps.setString(8, vehicleRef == null ? null : vehicleRef.sourceEntityId());
|
|
||||||
ps.setString(9, vehicleRef == null ? null : vehicleRef.vin());
|
|
||||||
ps.setString(10, vehicleRegistration == null ? null : vehicleRegistration.nation());
|
|
||||||
ps.setString(11, vehicleRegistration == null ? null : vehicleRegistration.number());
|
|
||||||
|
|
||||||
ps.setString(12, sourcePackageRef == null ? null : sourcePackageRef.packageKind());
|
|
||||||
ps.setString(13, sourcePackageRef == null ? null : sourcePackageRef.sourcePackageId());
|
|
||||||
ps.setString(14, sourcePackageRef == null ? null : sourcePackageRef.sourceEntityId());
|
|
||||||
ps.setObject(15, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodFrom());
|
|
||||||
ps.setObject(16, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodTo());
|
|
||||||
ps.setObject(17, sourcePackageRef == null ? null : sourcePackageRef.importedIntoSourceAt());
|
|
||||||
|
|
||||||
ps.setObject(18, event.occurredAt());
|
|
||||||
ps.setObject(19, event.receivedPartnerAt());
|
|
||||||
ps.setObject(20, receivedHubAt);
|
|
||||||
ps.setString(21, event.eventDomain().name());
|
|
||||||
ps.setString(22, event.eventType().name());
|
|
||||||
ps.setString(23, event.lifecycle().name());
|
|
||||||
setNullableLong(ps, 24, event.odometerM());
|
|
||||||
if (event.position() == null) {
|
|
||||||
ps.setNull(25, Types.NUMERIC);
|
|
||||||
ps.setNull(26, Types.NUMERIC);
|
|
||||||
} else {
|
|
||||||
ps.setObject(25, event.position().latitude());
|
|
||||||
ps.setObject(26, event.position().longitude());
|
|
||||||
}
|
|
||||||
ps.setString(27, toJson(objectMapper.valueToTree(event.eventDetails())));
|
|
||||||
ps.setString(28, toJson(event.payload()));
|
|
||||||
ps.setBoolean(29, event.manualEntry());
|
|
||||||
ps.setString(30, recordKeyService.buildSourceRecordKeyHash(event, eventSourceId));
|
|
||||||
ps.setString(31, recordKeyService.buildEventSignatureHash(event));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getBatchSize() {
|
|
||||||
return events.size();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return new InsertedEventRow(
|
||||||
|
(UUID) rs.getObject("id"),
|
||||||
|
rs.getObject("occurred_at", OffsetDateTime.class),
|
||||||
|
rs.getBoolean("inserted")
|
||||||
|
);
|
||||||
|
},
|
||||||
|
requestedEventId,
|
||||||
|
eventSourceId,
|
||||||
|
packageId,
|
||||||
|
event.externalSourceEventId(),
|
||||||
|
refs.driverEntityId(),
|
||||||
|
refs.vehicleEntityId(),
|
||||||
|
refs.sourcePackageEntityId(),
|
||||||
|
event.occurredAt(),
|
||||||
|
event.receivedPartnerAt(),
|
||||||
|
receivedHubAt,
|
||||||
|
event.eventDomain().name(),
|
||||||
|
event.eventType().name(),
|
||||||
|
event.lifecycle().name(),
|
||||||
|
event.odometerM(),
|
||||||
|
event.position() == null ? null : event.position().longitude(),
|
||||||
|
event.position() == null ? null : event.position().latitude(),
|
||||||
|
event.position() == null ? null : event.position().longitude(),
|
||||||
|
event.position() == null ? null : event.position().latitude(),
|
||||||
|
toJson(event.payload()),
|
||||||
|
event.manualEntry(),
|
||||||
|
sourceRecordKeyHash,
|
||||||
|
recordKeyService.buildEventSignatureHash(event),
|
||||||
|
sourceRecordKeyHash
|
||||||
);
|
);
|
||||||
|
|
||||||
int inserted = 0;
|
|
||||||
for (int count : counts) {
|
|
||||||
if (count > 0 || count == PreparedStatement.SUCCESS_NO_INFO) {
|
|
||||||
inserted++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return inserted;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setNullableLong(PreparedStatement ps, int index, Long value) throws java.sql.SQLException {
|
private void upsertEventDetails(InsertedEventRow insertedEventRow, EventHubEventDto event) {
|
||||||
|
if (event.eventDetails() == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
jdbcTemplate.update(
|
||||||
|
"""
|
||||||
|
insert into eventhub.event_detail(
|
||||||
|
event_occurred_at, event_id, detail_type, attributes
|
||||||
|
) values (?, ?, ?, ?::jsonb)
|
||||||
|
on conflict (event_occurred_at, event_id, detail_type)
|
||||||
|
do update set
|
||||||
|
attributes = excluded.attributes
|
||||||
|
""",
|
||||||
|
insertedEventRow.occurredAt(),
|
||||||
|
insertedEventRow.eventId(),
|
||||||
|
event.eventDetails().type(),
|
||||||
|
toJson(event.eventDetails().attributes())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResolvedEntityRefs resolveEntityRefs(
|
||||||
|
String tenantKey,
|
||||||
|
int eventSourceId,
|
||||||
|
EventHubEventDto event,
|
||||||
|
Map<String, UUID> entityIdCache
|
||||||
|
) {
|
||||||
|
UUID driverEntityId = resolveDriverEntityId(tenantKey, eventSourceId, event, entityIdCache);
|
||||||
|
UUID vehicleEntityId = resolveVehicleEntityId(tenantKey, eventSourceId, event, entityIdCache);
|
||||||
|
UUID sourcePackageEntityId = resolveSourcePackageEntityId(tenantKey, eventSourceId, event, entityIdCache);
|
||||||
|
return new ResolvedEntityRefs(driverEntityId, vehicleEntityId, sourcePackageEntityId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private UUID resolveDriverEntityId(
|
||||||
|
String tenantKey,
|
||||||
|
int eventSourceId,
|
||||||
|
EventHubEventDto event,
|
||||||
|
Map<String, UUID> entityIdCache
|
||||||
|
) {
|
||||||
|
DriverRefDto driverRef = event.driverRef();
|
||||||
|
if (driverRef == null || !driverRef.hasAnyReference()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
DriverCardRefDto card = driverRef.driverCard();
|
||||||
|
String cardKey = card == null || !card.hasValue() ? null : card.stableKey();
|
||||||
|
String sourceEntityId = normalizeNullable(driverRef.sourceEntityId());
|
||||||
|
if (sourceEntityId == null && cardKey != null) {
|
||||||
|
sourceEntityId = "DRIVER_CARD:" + cardKey;
|
||||||
|
}
|
||||||
|
if (sourceEntityId == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> payload = new LinkedHashMap<>();
|
||||||
|
put(payload, "source_entity_id", driverRef.sourceEntityId());
|
||||||
|
put(payload, "driver_card_nation", card == null ? null : card.nation());
|
||||||
|
put(payload, "driver_card_number", card == null ? null : card.number());
|
||||||
|
return resolveEntityId(
|
||||||
|
tenantKey,
|
||||||
|
eventSourceId,
|
||||||
|
"DRIVER",
|
||||||
|
sourceEntityId,
|
||||||
|
cardKey,
|
||||||
|
sourceEntityId,
|
||||||
|
null,
|
||||||
|
payload,
|
||||||
|
entityIdCache
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private UUID resolveVehicleEntityId(
|
||||||
|
String tenantKey,
|
||||||
|
int eventSourceId,
|
||||||
|
EventHubEventDto event,
|
||||||
|
Map<String, UUID> entityIdCache
|
||||||
|
) {
|
||||||
|
VehicleRefDto vehicleRef = event.vehicleRef();
|
||||||
|
if (vehicleRef == null || !vehicleRef.hasAnyReference()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
VehicleRegistrationRefDto registration = vehicleRef.vehicleRegistration();
|
||||||
|
String registrationKey = registration == null || !registration.hasValue() ? null : registration.stableKey();
|
||||||
|
String sourceEntityId = normalizeNullable(vehicleRef.sourceEntityId());
|
||||||
|
if (sourceEntityId == null && normalizeNullable(vehicleRef.vin()) != null) {
|
||||||
|
sourceEntityId = "VIN:" + vehicleRef.vin();
|
||||||
|
}
|
||||||
|
if (sourceEntityId == null && registrationKey != null) {
|
||||||
|
sourceEntityId = "VRN:" + registrationKey;
|
||||||
|
}
|
||||||
|
if (sourceEntityId == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> payload = new LinkedHashMap<>();
|
||||||
|
put(payload, "source_entity_id", vehicleRef.sourceEntityId());
|
||||||
|
put(payload, "vin", vehicleRef.vin());
|
||||||
|
put(payload, "vehicle_registration_nation", registration == null ? null : registration.nation());
|
||||||
|
put(payload, "vehicle_registration_number", registration == null ? null : registration.number());
|
||||||
|
return resolveEntityId(
|
||||||
|
tenantKey,
|
||||||
|
eventSourceId,
|
||||||
|
"VEHICLE",
|
||||||
|
sourceEntityId,
|
||||||
|
normalizeNullable(vehicleRef.vin()) == null ? registrationKey : vehicleRef.vin(),
|
||||||
|
sourceEntityId,
|
||||||
|
null,
|
||||||
|
payload,
|
||||||
|
entityIdCache
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private UUID resolveSourcePackageEntityId(
|
||||||
|
String tenantKey,
|
||||||
|
int eventSourceId,
|
||||||
|
EventHubEventDto event,
|
||||||
|
Map<String, UUID> entityIdCache
|
||||||
|
) {
|
||||||
|
SourcePackageRefDto sourcePackageRef = event.sourcePackageRef();
|
||||||
|
if (sourcePackageRef == null || !sourcePackageRef.hasAnyReference()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
String packageKind = normalizeNullable(sourcePackageRef.packageKind());
|
||||||
|
String sourcePackageId = normalizeNullable(sourcePackageRef.sourcePackageId());
|
||||||
|
|
||||||
|
String sourceEntityId = normalizeNullable(sourcePackageRef.sourceEntityId());
|
||||||
|
if (sourceEntityId == null && sourcePackageId != null) {
|
||||||
|
sourceEntityId = "SOURCE_PACKAGE:" + (packageKind == null ? "UNKNOWN" : packageKind) + ":" + sourcePackageId;
|
||||||
|
}
|
||||||
|
if (sourceEntityId == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> payload = new LinkedHashMap<>();
|
||||||
|
put(payload, "package_kind", sourcePackageRef.packageKind());
|
||||||
|
put(payload, "source_package_id", sourcePackageRef.sourcePackageId());
|
||||||
|
put(payload, "source_entity_id", sourcePackageRef.sourceEntityId());
|
||||||
|
put(payload, "package_period_from", sourcePackageRef.packagePeriodFrom());
|
||||||
|
put(payload, "package_period_to", sourcePackageRef.packagePeriodTo());
|
||||||
|
put(payload, "imported_into_source_at", sourcePackageRef.importedIntoSourceAt());
|
||||||
|
|
||||||
|
String displayName = packageKind == null ? sourceEntityId : packageKind + ":" + (sourcePackageId == null ? sourceEntityId : sourcePackageId);
|
||||||
|
return resolveEntityId(
|
||||||
|
tenantKey,
|
||||||
|
eventSourceId,
|
||||||
|
"SOURCE_PACKAGE",
|
||||||
|
sourceEntityId,
|
||||||
|
sourcePackageId,
|
||||||
|
displayName,
|
||||||
|
null,
|
||||||
|
payload,
|
||||||
|
entityIdCache
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private UUID resolveEntityId(
|
||||||
|
String tenantKey,
|
||||||
|
int eventSourceId,
|
||||||
|
String entityType,
|
||||||
|
String sourceEntityId,
|
||||||
|
String sourceExternalKey,
|
||||||
|
String displayName,
|
||||||
|
Boolean active,
|
||||||
|
Map<String, Object> payload,
|
||||||
|
Map<String, UUID> entityIdCache
|
||||||
|
) {
|
||||||
|
String normalizedSourceEntityId = normalizeNullable(sourceEntityId);
|
||||||
|
if (normalizedSourceEntityId == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
String cacheKey = entityType + "|" + normalizedSourceEntityId + "|" + normalizeNullable(sourceExternalKey);
|
||||||
|
UUID cached = entityIdCache.get(cacheKey);
|
||||||
|
if (cached != null) {
|
||||||
|
return cached;
|
||||||
|
}
|
||||||
|
|
||||||
|
UUID resolved = sourceMasterDataRepository.resolveOrCreateEntityId(
|
||||||
|
tenantKey,
|
||||||
|
eventSourceId,
|
||||||
|
entityType,
|
||||||
|
normalizedSourceEntityId,
|
||||||
|
normalizeNullable(sourceExternalKey),
|
||||||
|
normalizeNullable(displayName),
|
||||||
|
active,
|
||||||
|
payload
|
||||||
|
);
|
||||||
|
entityIdCache.put(cacheKey, resolved);
|
||||||
|
return resolved;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String normalizeNullable(String value) {
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
ps.setNull(index, Types.BIGINT);
|
return null;
|
||||||
} else {
|
}
|
||||||
ps.setLong(index, value);
|
String trimmed = value.trim();
|
||||||
|
return trimmed.isEmpty() ? null : trimmed;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void put(Map<String, Object> target, String key, Object value) {
|
||||||
|
if (value != null) {
|
||||||
|
target.put(key, value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -154,4 +354,18 @@ public class EventRepository {
|
||||||
throw new IllegalArgumentException("Cannot serialize JSONB value", e);
|
throw new IllegalArgumentException("Cannot serialize JSONB value", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private record ResolvedEntityRefs(
|
||||||
|
UUID driverEntityId,
|
||||||
|
UUID vehicleEntityId,
|
||||||
|
UUID sourcePackageEntityId
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
private record InsertedEventRow(
|
||||||
|
UUID eventId,
|
||||||
|
OffsetDateTime occurredAt,
|
||||||
|
boolean inserted
|
||||||
|
) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,205 @@
|
||||||
|
package at.procon.eventhub.persistence;
|
||||||
|
|
||||||
|
import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert;
|
||||||
|
import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert;
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.springframework.stereotype.Repository;
|
||||||
|
|
||||||
|
@Repository
|
||||||
|
public class SourceMasterDataRepository {
|
||||||
|
|
||||||
|
private final JdbcTemplate jdbcTemplate;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
|
public SourceMasterDataRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
|
||||||
|
this.jdbcTemplate = jdbcTemplate;
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int upsertEntities(String tenantKey, int eventSourceId, List<SourceMasterEntityUpsert> entities) {
|
||||||
|
int count = 0;
|
||||||
|
for (SourceMasterEntityUpsert entity : entities) {
|
||||||
|
if (entity.sourceEntityId() == null || entity.sourceEntityId().isBlank()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
jdbcTemplate.update(
|
||||||
|
"""
|
||||||
|
insert into eventhub.source_master_entity(
|
||||||
|
id, tenant_key, event_source_id, entity_type, source_entity_id,
|
||||||
|
source_external_key, display_name, active, valid_from, valid_to,
|
||||||
|
source_updated_at, payload, updated_at
|
||||||
|
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now())
|
||||||
|
on conflict (tenant_key, event_source_id, entity_type, source_entity_id)
|
||||||
|
do update set
|
||||||
|
source_external_key = excluded.source_external_key,
|
||||||
|
display_name = excluded.display_name,
|
||||||
|
active = excluded.active,
|
||||||
|
valid_from = excluded.valid_from,
|
||||||
|
valid_to = excluded.valid_to,
|
||||||
|
source_updated_at = excluded.source_updated_at,
|
||||||
|
payload = excluded.payload,
|
||||||
|
updated_at = now()
|
||||||
|
""",
|
||||||
|
UUID.randomUUID(),
|
||||||
|
tenantKey,
|
||||||
|
eventSourceId,
|
||||||
|
entity.entityType(),
|
||||||
|
entity.sourceEntityId(),
|
||||||
|
entity.sourceExternalKey(),
|
||||||
|
entity.displayName(),
|
||||||
|
entity.active(),
|
||||||
|
entity.validFrom(),
|
||||||
|
entity.validTo(),
|
||||||
|
entity.sourceUpdatedAt(),
|
||||||
|
toJson(entity.payload())
|
||||||
|
);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int upsertRelations(String tenantKey, int eventSourceId, List<SourceMasterRelationUpsert> relations) {
|
||||||
|
int count = 0;
|
||||||
|
for (SourceMasterRelationUpsert relation : relations) {
|
||||||
|
if (relation.fromSourceEntityId() == null || relation.toSourceEntityId() == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
String relationKey = relationKey(relation);
|
||||||
|
jdbcTemplate.update(
|
||||||
|
"""
|
||||||
|
insert into eventhub.source_master_relation(
|
||||||
|
id, tenant_key, event_source_id, relation_key, relation_type,
|
||||||
|
from_entity_type, from_source_entity_id, to_entity_type, to_source_entity_id,
|
||||||
|
valid_from, valid_to, source_updated_at, payload, updated_at
|
||||||
|
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now())
|
||||||
|
on conflict (tenant_key, event_source_id, relation_key)
|
||||||
|
do update set
|
||||||
|
relation_type = excluded.relation_type,
|
||||||
|
from_entity_type = excluded.from_entity_type,
|
||||||
|
from_source_entity_id = excluded.from_source_entity_id,
|
||||||
|
to_entity_type = excluded.to_entity_type,
|
||||||
|
to_source_entity_id = excluded.to_source_entity_id,
|
||||||
|
valid_from = excluded.valid_from,
|
||||||
|
valid_to = excluded.valid_to,
|
||||||
|
source_updated_at = excluded.source_updated_at,
|
||||||
|
payload = excluded.payload,
|
||||||
|
updated_at = now()
|
||||||
|
""",
|
||||||
|
UUID.randomUUID(),
|
||||||
|
tenantKey,
|
||||||
|
eventSourceId,
|
||||||
|
relationKey,
|
||||||
|
relation.relationType(),
|
||||||
|
relation.fromEntityType(),
|
||||||
|
relation.fromSourceEntityId(),
|
||||||
|
relation.toEntityType(),
|
||||||
|
relation.toSourceEntityId(),
|
||||||
|
relation.validFrom(),
|
||||||
|
relation.validTo(),
|
||||||
|
relation.sourceUpdatedAt(),
|
||||||
|
toJson(relation.payload())
|
||||||
|
);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UUID resolveOrCreateEntityId(
|
||||||
|
String tenantKey,
|
||||||
|
int eventSourceId,
|
||||||
|
String entityType,
|
||||||
|
String sourceEntityId,
|
||||||
|
String sourceExternalKey,
|
||||||
|
String displayName,
|
||||||
|
Boolean active,
|
||||||
|
Map<String, Object> payload
|
||||||
|
) {
|
||||||
|
String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey");
|
||||||
|
String normalizedEntityType = normalizeRequired(entityType, "entityType").toUpperCase();
|
||||||
|
String normalizedSourceEntityId = normalizeRequired(sourceEntityId, "sourceEntityId");
|
||||||
|
String normalizedSourceExternalKey = normalizeNullable(sourceExternalKey);
|
||||||
|
String normalizedDisplayName = normalizeNullable(displayName);
|
||||||
|
|
||||||
|
return jdbcTemplate.query(
|
||||||
|
con -> {
|
||||||
|
PreparedStatement ps = con.prepareStatement(
|
||||||
|
"""
|
||||||
|
insert into eventhub.source_master_entity(
|
||||||
|
id, tenant_key, event_source_id, entity_type, source_entity_id,
|
||||||
|
source_external_key, display_name, active, payload, updated_at
|
||||||
|
) values (?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, now())
|
||||||
|
on conflict (tenant_key, event_source_id, entity_type, source_entity_id)
|
||||||
|
do update set
|
||||||
|
source_external_key = coalesce(excluded.source_external_key, eventhub.source_master_entity.source_external_key),
|
||||||
|
display_name = coalesce(excluded.display_name, eventhub.source_master_entity.display_name),
|
||||||
|
active = coalesce(excluded.active, eventhub.source_master_entity.active),
|
||||||
|
payload = coalesce(excluded.payload, eventhub.source_master_entity.payload),
|
||||||
|
updated_at = now()
|
||||||
|
returning id
|
||||||
|
"""
|
||||||
|
);
|
||||||
|
ps.setObject(1, UUID.randomUUID());
|
||||||
|
ps.setString(2, normalizedTenantKey);
|
||||||
|
ps.setInt(3, eventSourceId);
|
||||||
|
ps.setString(4, normalizedEntityType);
|
||||||
|
ps.setString(5, normalizedSourceEntityId);
|
||||||
|
ps.setString(6, normalizedSourceExternalKey);
|
||||||
|
ps.setString(7, normalizedDisplayName);
|
||||||
|
ps.setObject(8, active);
|
||||||
|
ps.setString(9, toJson(payload));
|
||||||
|
return ps;
|
||||||
|
},
|
||||||
|
rs -> {
|
||||||
|
if (!rs.next()) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Could not resolve source master entity id for "
|
||||||
|
+ normalizedTenantKey + ":" + eventSourceId + ":" + normalizedEntityType + ":" + normalizedSourceEntityId
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return (UUID) rs.getObject(1);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String relationKey(SourceMasterRelationUpsert relation) {
|
||||||
|
String validFrom = relation.validFrom() == null ? "MIN" : relation.validFrom().toString();
|
||||||
|
String validTo = relation.validTo() == null ? "MAX" : relation.validTo().toString();
|
||||||
|
return relation.relationType()
|
||||||
|
+ ":" + relation.fromEntityType()
|
||||||
|
+ ":" + relation.fromSourceEntityId()
|
||||||
|
+ ":" + relation.toEntityType()
|
||||||
|
+ ":" + relation.toSourceEntityId()
|
||||||
|
+ ":" + validFrom
|
||||||
|
+ ":" + validTo;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String toJson(Map<String, Object> value) {
|
||||||
|
try {
|
||||||
|
return objectMapper.writeValueAsString(value == null ? Map.of() : value);
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw new IllegalArgumentException("Cannot serialize source master data payload", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String normalizeRequired(String value, String fieldName) {
|
||||||
|
String normalized = normalizeNullable(value);
|
||||||
|
if (normalized == null) {
|
||||||
|
throw new IllegalArgumentException(fieldName + " must not be blank");
|
||||||
|
}
|
||||||
|
return normalized;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String normalizeNullable(String value) {
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String trimmed = value.trim();
|
||||||
|
return trimmed.isEmpty() ? null : trimmed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -70,7 +70,7 @@ public class EventHubIngestionService {
|
||||||
);
|
);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int insertedCount = eventRepository.batchInsert(packageId, eventSourceId, sortedEvents);
|
int insertedCount = eventRepository.batchInsert(packageId, packageInfo.tenantKey(), eventSourceId, sortedEvents);
|
||||||
dataPackageRepository.markImported(packageId, insertedCount);
|
dataPackageRepository.markImported(packageId, insertedCount);
|
||||||
log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={}",
|
log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={}",
|
||||||
packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount);
|
packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount);
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package at.procon.eventhub.tachograph.api;
|
||||||
import at.procon.eventhub.dto.AcquisitionStrategy;
|
import at.procon.eventhub.dto.AcquisitionStrategy;
|
||||||
import at.procon.eventhub.dto.ImportMode;
|
import at.procon.eventhub.dto.ImportMode;
|
||||||
import at.procon.eventhub.dto.SchedulerTriggerMode;
|
import at.procon.eventhub.dto.SchedulerTriggerMode;
|
||||||
|
import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult;
|
||||||
import at.procon.eventhub.tachograph.dto.ConfiguredTachographImportPlanDto;
|
import at.procon.eventhub.tachograph.dto.ConfiguredTachographImportPlanDto;
|
||||||
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
|
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
|
||||||
import at.procon.eventhub.tachograph.dto.TachographImportRunResultDto;
|
import at.procon.eventhub.tachograph.dto.TachographImportRunResultDto;
|
||||||
|
|
@ -11,6 +12,7 @@ import at.procon.eventhub.tachograph.dto.source.TachographActivityDto;
|
||||||
import at.procon.eventhub.tachograph.service.TachographConfiguredImportPlanService;
|
import at.procon.eventhub.tachograph.service.TachographConfiguredImportPlanService;
|
||||||
import at.procon.eventhub.tachograph.service.TachographImportExecutionService;
|
import at.procon.eventhub.tachograph.service.TachographImportExecutionService;
|
||||||
import at.procon.eventhub.tachograph.service.TachographImportPlanService;
|
import at.procon.eventhub.tachograph.service.TachographImportPlanService;
|
||||||
|
import at.procon.eventhub.tachograph.service.TachographMasterDataRefreshService;
|
||||||
import jakarta.validation.Valid;
|
import jakarta.validation.Valid;
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -33,17 +35,20 @@ public class TachographIngestionController {
|
||||||
private final TachographImportPlanService tachographImportPlanService;
|
private final TachographImportPlanService tachographImportPlanService;
|
||||||
private final TachographConfiguredImportPlanService configuredImportPlanService;
|
private final TachographConfiguredImportPlanService configuredImportPlanService;
|
||||||
private final TachographImportExecutionService tachographImportExecutionService;
|
private final TachographImportExecutionService tachographImportExecutionService;
|
||||||
|
private final TachographMasterDataRefreshService masterDataRefreshService;
|
||||||
|
|
||||||
public TachographIngestionController(
|
public TachographIngestionController(
|
||||||
ProducerTemplate producerTemplate,
|
ProducerTemplate producerTemplate,
|
||||||
TachographImportPlanService tachographImportPlanService,
|
TachographImportPlanService tachographImportPlanService,
|
||||||
TachographConfiguredImportPlanService configuredImportPlanService,
|
TachographConfiguredImportPlanService configuredImportPlanService,
|
||||||
TachographImportExecutionService tachographImportExecutionService
|
TachographImportExecutionService tachographImportExecutionService,
|
||||||
|
TachographMasterDataRefreshService masterDataRefreshService
|
||||||
) {
|
) {
|
||||||
this.producerTemplate = producerTemplate;
|
this.producerTemplate = producerTemplate;
|
||||||
this.tachographImportPlanService = tachographImportPlanService;
|
this.tachographImportPlanService = tachographImportPlanService;
|
||||||
this.configuredImportPlanService = configuredImportPlanService;
|
this.configuredImportPlanService = configuredImportPlanService;
|
||||||
this.tachographImportExecutionService = tachographImportExecutionService;
|
this.tachographImportExecutionService = tachographImportExecutionService;
|
||||||
|
this.masterDataRefreshService = masterDataRefreshService;
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostMapping("/activities")
|
@PostMapping("/activities")
|
||||||
|
|
@ -68,6 +73,13 @@ public class TachographIngestionController {
|
||||||
return ResponseEntity.accepted().body(result);
|
return ResponseEntity.accepted().body(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostMapping("/master-data/refresh")
|
||||||
|
public ResponseEntity<MasterDataRefreshResult> refreshTachographMasterData(
|
||||||
|
@Valid @RequestBody TachographImportRequest request
|
||||||
|
) {
|
||||||
|
return ResponseEntity.ok(masterDataRefreshService.refresh(request));
|
||||||
|
}
|
||||||
|
|
||||||
@GetMapping("/imports/configured-plans")
|
@GetMapping("/imports/configured-plans")
|
||||||
public ResponseEntity<List<ConfiguredTachographImportPlanDto>> listConfiguredTachographPlans() {
|
public ResponseEntity<List<ConfiguredTachographImportPlanDto>> listConfiguredTachographPlans() {
|
||||||
return ResponseEntity.ok(configuredImportPlanService.listPlans());
|
return ResponseEntity.ok(configuredImportPlanService.listPlans());
|
||||||
|
|
@ -99,6 +111,16 @@ public class TachographIngestionController {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostMapping("/imports/configured-plans/{planKey}/master-data/refresh")
|
||||||
|
public ResponseEntity<MasterDataRefreshResult> refreshConfiguredTachographMasterData(
|
||||||
|
@PathVariable String planKey,
|
||||||
|
@RequestParam(required = false) ImportMode mode,
|
||||||
|
@RequestParam(required = false) AcquisitionStrategy strategy
|
||||||
|
) {
|
||||||
|
TachographImportRequest request = configuredImportPlanService.createRequest(planKey, mode, strategy);
|
||||||
|
return ResponseEntity.ok(masterDataRefreshService.refresh(request));
|
||||||
|
}
|
||||||
|
|
||||||
private ResponseEntity<Map<String, Object>> accepted(int count, String route) {
|
private ResponseEntity<Map<String, Object>> accepted(int count, String route) {
|
||||||
return ResponseEntity.accepted().body(Map.of(
|
return ResponseEntity.accepted().body(Map.of(
|
||||||
"accepted", count,
|
"accepted", count,
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
package at.procon.eventhub.tachograph.config;
|
package at.procon.eventhub.tachograph.config;
|
||||||
|
|
||||||
import at.procon.eventhub.config.EventHubProperties;
|
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
|
|
@ -13,15 +13,24 @@ import org.springframework.jdbc.datasource.DriverManagerDataSource;
|
||||||
@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''")
|
@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''")
|
||||||
public class TachographDataSourceConfig {
|
public class TachographDataSourceConfig {
|
||||||
|
|
||||||
|
private static final String SQL_SERVER_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
|
||||||
|
private static final String SQL_SERVER_JDBC_PREFIX = "jdbc:sqlserver://";
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public DataSource tachographDataSource(EventHubProperties properties) {
|
@ConfigurationProperties(prefix = "eventhub.tachograph.datasource")
|
||||||
EventHubProperties.TachographDataSource config = properties.getTachograph().getDatasource();
|
public TachographDataSourceProperties tachographDataSourceProperties() {
|
||||||
|
return new TachographDataSourceProperties();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(defaultCandidate = false)
|
||||||
|
public DataSource tachographDataSource(TachographDataSourceProperties config) {
|
||||||
DriverManagerDataSource dataSource = new DriverManagerDataSource();
|
DriverManagerDataSource dataSource = new DriverManagerDataSource();
|
||||||
dataSource.setUrl(config.getJdbcUrl());
|
dataSource.setUrl(validateJdbcUrl(config));
|
||||||
dataSource.setUsername(config.getUsername());
|
dataSource.setUsername(config.getUsername());
|
||||||
dataSource.setPassword(config.getPassword());
|
dataSource.setPassword(config.getPassword());
|
||||||
if (config.getDriverClassName() != null && !config.getDriverClassName().isBlank()) {
|
String driverClassName = trimToNull(config.getDriverClassName());
|
||||||
dataSource.setDriverClassName(config.getDriverClassName());
|
if (driverClassName != null) {
|
||||||
|
dataSource.setDriverClassName(driverClassName);
|
||||||
}
|
}
|
||||||
return dataSource;
|
return dataSource;
|
||||||
}
|
}
|
||||||
|
|
@ -32,4 +41,75 @@ public class TachographDataSourceConfig {
|
||||||
) {
|
) {
|
||||||
return new NamedParameterJdbcTemplate(tachographDataSource);
|
return new NamedParameterJdbcTemplate(tachographDataSource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String validateJdbcUrl(TachographDataSourceProperties config) {
|
||||||
|
String jdbcUrl = trimToNull(config.getJdbcUrl());
|
||||||
|
if (jdbcUrl == null) {
|
||||||
|
throw new IllegalStateException("eventhub.tachograph.datasource.jdbc-url must not be empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
String driverClassName = trimToNull(config.getDriverClassName());
|
||||||
|
if (SQL_SERVER_DRIVER_CLASS.equals(driverClassName) && !jdbcUrl.startsWith(SQL_SERVER_JDBC_PREFIX)) {
|
||||||
|
if (jdbcUrl.startsWith("jdbc:")) {
|
||||||
|
String suggestedUrl = SQL_SERVER_JDBC_PREFIX + jdbcUrl.substring("jdbc:".length());
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Invalid SQL Server JDBC URL '" + jdbcUrl + "'. Expected prefix '"
|
||||||
|
+ SQL_SERVER_JDBC_PREFIX + "'. Example: " + suggestedUrl
|
||||||
|
);
|
||||||
|
}
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Invalid SQL Server JDBC URL '" + jdbcUrl + "'. Expected prefix '"
|
||||||
|
+ SQL_SERVER_JDBC_PREFIX + "'"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return jdbcUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String trimToNull(String value) {
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String trimmedValue = value.trim();
|
||||||
|
return trimmedValue.isEmpty() ? null : trimmedValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TachographDataSourceProperties {
|
||||||
|
private String jdbcUrl;
|
||||||
|
private String username;
|
||||||
|
private String password;
|
||||||
|
private String driverClassName;
|
||||||
|
|
||||||
|
public String getJdbcUrl() {
|
||||||
|
return jdbcUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJdbcUrl(String jdbcUrl) {
|
||||||
|
this.jdbcUrl = jdbcUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getUsername() {
|
||||||
|
return username;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUsername(String username) {
|
||||||
|
this.username = username;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPassword() {
|
||||||
|
return password;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPassword(String password) {
|
||||||
|
this.password = password;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDriverClassName() {
|
||||||
|
return driverClassName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDriverClassName(String driverClassName) {
|
||||||
|
this.driverClassName = driverClassName;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,27 +1,234 @@
|
||||||
package at.procon.eventhub.tachograph.service;
|
package at.procon.eventhub.tachograph.service;
|
||||||
|
|
||||||
|
import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult;
|
||||||
|
import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert;
|
||||||
|
import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert;
|
||||||
|
import at.procon.eventhub.persistence.EventSourceRepository;
|
||||||
|
import at.procon.eventhub.persistence.SourceMasterDataRepository;
|
||||||
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
|
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.ResultSetMetaData;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.ObjectProvider;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.core.io.Resource;
|
||||||
|
import org.springframework.core.io.ResourceLoader;
|
||||||
|
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.util.StreamUtils;
|
||||||
|
|
||||||
/**
|
|
||||||
* Hook for refreshing tachograph master data before event extraction.
|
|
||||||
*
|
|
||||||
* The generated project does not yet know the concrete tachograph master-data
|
|
||||||
* schema. Replace/extend this service with SQL readers for organisations,
|
|
||||||
* vehicles, vehicle registrations, drivers, and driver cards.
|
|
||||||
*/
|
|
||||||
@Service
|
@Service
|
||||||
public class TachographMasterDataRefreshService {
|
public class TachographMasterDataRefreshService {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(TachographMasterDataRefreshService.class);
|
private static final Logger log = LoggerFactory.getLogger(TachographMasterDataRefreshService.class);
|
||||||
|
|
||||||
public void refreshIfRequested(TachographImportRequest request) {
|
private static final List<String> ENTITY_SQL_RESOURCES = List.of(
|
||||||
if (!request.refreshMasterDataFirst()) {
|
"classpath:sql/tachograph/master-data/organisations.sql",
|
||||||
return;
|
"classpath:sql/tachograph/master-data/drivers.sql",
|
||||||
|
"classpath:sql/tachograph/master-data/driver-cards.sql",
|
||||||
|
"classpath:sql/tachograph/master-data/vehicles.sql",
|
||||||
|
"classpath:sql/tachograph/master-data/vehicle-registrations.sql"
|
||||||
|
);
|
||||||
|
|
||||||
|
private static final String RELATIONS_SQL_RESOURCE = "classpath:sql/tachograph/master-data/relations.sql";
|
||||||
|
|
||||||
|
private final ObjectProvider<NamedParameterJdbcTemplate> tachographJdbcTemplateProvider;
|
||||||
|
private final SourceMasterDataRepository sourceMasterDataRepository;
|
||||||
|
private final EventSourceRepository eventSourceRepository;
|
||||||
|
private final ResourceLoader resourceLoader;
|
||||||
|
|
||||||
|
public TachographMasterDataRefreshService(
|
||||||
|
@Qualifier("tachographNamedParameterJdbcTemplate") ObjectProvider<NamedParameterJdbcTemplate> tachographJdbcTemplateProvider,
|
||||||
|
SourceMasterDataRepository sourceMasterDataRepository,
|
||||||
|
EventSourceRepository eventSourceRepository,
|
||||||
|
ResourceLoader resourceLoader
|
||||||
|
) {
|
||||||
|
this.tachographJdbcTemplateProvider = tachographJdbcTemplateProvider;
|
||||||
|
this.sourceMasterDataRepository = sourceMasterDataRepository;
|
||||||
|
this.eventSourceRepository = eventSourceRepository;
|
||||||
|
this.resourceLoader = resourceLoader;
|
||||||
}
|
}
|
||||||
log.info("Tachograph master-data refresh requested for tenant={} source={}. Concrete SQL refresh is a project-specific extension point.",
|
|
||||||
request.tenantKey(), request.eventSource().stableKey());
|
public MasterDataRefreshResult refreshIfRequested(TachographImportRequest request) {
|
||||||
|
if (!request.refreshMasterDataFirst()) {
|
||||||
|
return MasterDataRefreshResult.empty();
|
||||||
|
}
|
||||||
|
return refresh(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MasterDataRefreshResult refresh(TachographImportRequest request) {
|
||||||
|
NamedParameterJdbcTemplate tachographJdbcTemplate = tachographJdbcTemplateProvider.getIfAvailable();
|
||||||
|
if (tachographJdbcTemplate == null) {
|
||||||
|
log.info("Skipping tachograph master-data refresh for tenant={} because no tachograph datasource is configured.",
|
||||||
|
request.tenantKey());
|
||||||
|
return MasterDataRefreshResult.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
String tenantKey = request.tenantKey() == null || request.tenantKey().isBlank() ? "default" : request.tenantKey().trim();
|
||||||
|
int eventSourceId = eventSourceRepository.resolveSourceId(tenantKey, request.eventSource());
|
||||||
|
|
||||||
|
int entities = 0;
|
||||||
|
for (String sqlResource : ENTITY_SQL_RESOURCES) {
|
||||||
|
List<SourceMasterEntityUpsert> batch = tachographJdbcTemplate.query(
|
||||||
|
loadSql(sqlResource),
|
||||||
|
Map.of(),
|
||||||
|
(rs, rowNum) -> entity(rs)
|
||||||
|
);
|
||||||
|
entities += sourceMasterDataRepository.upsertEntities(tenantKey, eventSourceId, batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<SourceMasterRelationUpsert> relations = tachographJdbcTemplate.query(
|
||||||
|
loadSql(RELATIONS_SQL_RESOURCE),
|
||||||
|
Map.of(),
|
||||||
|
(rs, rowNum) -> relation(rs)
|
||||||
|
);
|
||||||
|
int relationCount = sourceMasterDataRepository.upsertRelations(tenantKey, eventSourceId, relations);
|
||||||
|
|
||||||
|
MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount);
|
||||||
|
log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={}",
|
||||||
|
tenantKey, request.eventSource().stableKey(), result.entitiesUpserted(), result.relationsUpserted());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private SourceMasterEntityUpsert entity(ResultSet rs) throws SQLException {
|
||||||
|
String entityType = string(rs, "entity_type");
|
||||||
|
String sourceEntityId = string(rs, "source_entity_id");
|
||||||
|
OffsetDateTime validFrom = offsetDateTime(rs, "valid_from");
|
||||||
|
OffsetDateTime validTo = offsetDateTime(rs, "valid_to");
|
||||||
|
ValidityRange validityRange = normalizeValidityRange(
|
||||||
|
validFrom,
|
||||||
|
validTo,
|
||||||
|
"entity",
|
||||||
|
entityType + ":" + sourceEntityId
|
||||||
|
);
|
||||||
|
return new SourceMasterEntityUpsert(
|
||||||
|
entityType,
|
||||||
|
sourceEntityId,
|
||||||
|
string(rs, "source_external_key"),
|
||||||
|
string(rs, "display_name"),
|
||||||
|
bool(rs, "active"),
|
||||||
|
validityRange.validFrom(),
|
||||||
|
validityRange.validTo(),
|
||||||
|
offsetDateTime(rs, "source_updated_at"),
|
||||||
|
payload(rs)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private SourceMasterRelationUpsert relation(ResultSet rs) throws SQLException {
|
||||||
|
String relationType = string(rs, "relation_type");
|
||||||
|
String fromEntityType = string(rs, "from_entity_type");
|
||||||
|
String fromSourceEntityId = string(rs, "from_source_entity_id");
|
||||||
|
String toEntityType = string(rs, "to_entity_type");
|
||||||
|
String toSourceEntityId = string(rs, "to_source_entity_id");
|
||||||
|
OffsetDateTime validFrom = offsetDateTime(rs, "valid_from");
|
||||||
|
OffsetDateTime validTo = offsetDateTime(rs, "valid_to");
|
||||||
|
ValidityRange validityRange = normalizeValidityRange(
|
||||||
|
validFrom,
|
||||||
|
validTo,
|
||||||
|
"relation",
|
||||||
|
relationType + ":" + fromEntityType + ":" + fromSourceEntityId + "->" + toEntityType + ":" + toSourceEntityId
|
||||||
|
);
|
||||||
|
return new SourceMasterRelationUpsert(
|
||||||
|
relationType,
|
||||||
|
fromEntityType,
|
||||||
|
fromSourceEntityId,
|
||||||
|
toEntityType,
|
||||||
|
toSourceEntityId,
|
||||||
|
validityRange.validFrom(),
|
||||||
|
validityRange.validTo(),
|
||||||
|
offsetDateTime(rs, "source_updated_at"),
|
||||||
|
payload(rs)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> payload(ResultSet rs) throws SQLException {
|
||||||
|
ResultSetMetaData metaData = rs.getMetaData();
|
||||||
|
Map<String, Object> payload = new LinkedHashMap<>();
|
||||||
|
for (int i = 1; i <= metaData.getColumnCount(); i++) {
|
||||||
|
String name = metaData.getColumnLabel(i);
|
||||||
|
Object value = rs.getObject(i);
|
||||||
|
if (value != null) {
|
||||||
|
payload.put(name, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return payload;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String loadSql(String location) {
|
||||||
|
Resource resource = resourceLoader.getResource(location);
|
||||||
|
try (var inputStream = resource.getInputStream()) {
|
||||||
|
return StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalStateException("Cannot load tachograph master-data SQL resource " + location, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String string(ResultSet rs, String column) throws SQLException {
|
||||||
|
String value = rs.getString(column);
|
||||||
|
return value == null || value.isBlank() ? null : value.trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Boolean bool(ResultSet rs, String column) throws SQLException {
|
||||||
|
Object value = rs.getObject(column);
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (value instanceof Boolean bool) {
|
||||||
|
return bool;
|
||||||
|
}
|
||||||
|
if (value instanceof Number number) {
|
||||||
|
return number.intValue() != 0;
|
||||||
|
}
|
||||||
|
return Boolean.parseBoolean(value.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
private OffsetDateTime offsetDateTime(ResultSet rs, String column) throws SQLException {
|
||||||
|
Object value = rs.getObject(column);
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (value instanceof OffsetDateTime offsetDateTime) {
|
||||||
|
return offsetDateTime;
|
||||||
|
}
|
||||||
|
if (value instanceof Timestamp timestamp) {
|
||||||
|
return timestamp.toInstant().atOffset(ZoneOffset.UTC);
|
||||||
|
}
|
||||||
|
if (value instanceof java.time.LocalDateTime localDateTime) {
|
||||||
|
return localDateTime.atOffset(ZoneOffset.UTC);
|
||||||
|
}
|
||||||
|
return OffsetDateTime.parse(value.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
private ValidityRange normalizeValidityRange(
|
||||||
|
OffsetDateTime validFrom,
|
||||||
|
OffsetDateTime validTo,
|
||||||
|
String rowKind,
|
||||||
|
String rowKey
|
||||||
|
) {
|
||||||
|
if (validFrom == null || validTo == null || !validFrom.isAfter(validTo)) {
|
||||||
|
return new ValidityRange(validFrom, validTo);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.warn(
|
||||||
|
"Ignoring invalid validity end for {} {} because valid_from {} is after valid_to {}. Keeping valid_from and setting valid_to=null.",
|
||||||
|
rowKind,
|
||||||
|
rowKey,
|
||||||
|
validFrom,
|
||||||
|
validTo
|
||||||
|
);
|
||||||
|
return new ValidityRange(validFrom, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private record ValidityRange(OffsetDateTime validFrom, OffsetDateTime validTo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,8 +3,8 @@ spring:
|
||||||
name: eventhub-ingestion-service
|
name: eventhub-ingestion-service
|
||||||
datasource:
|
datasource:
|
||||||
url: jdbc:postgresql://localhost:5432/eventhub
|
url: jdbc:postgresql://localhost:5432/eventhub
|
||||||
username: eventhub
|
username: postgres
|
||||||
password: eventhub
|
password: P54!pcd#Wi
|
||||||
flyway:
|
flyway:
|
||||||
enabled: true
|
enabled: true
|
||||||
default-schema: eventhub
|
default-schema: eventhub
|
||||||
|
|
@ -34,11 +34,11 @@ eventhub:
|
||||||
occurred-at-overlap: 7d
|
occurred-at-overlap: 7d
|
||||||
|
|
||||||
# Configure this block to enable JdbcTachographExtractionBatchExecutor.
|
# Configure this block to enable JdbcTachographExtractionBatchExecutor.
|
||||||
# datasource:
|
datasource:
|
||||||
# jdbc-url: jdbc:sqlserver://localhost:1433;databaseName=tachograph;encrypt=true;trustServerCertificate=true
|
jdbc-url: jdbc:sqlserver://db.bytebar.eu:22996;databaseName=ByteBarDriverSettlement;trustServerCertificate=true
|
||||||
# username: tachograph_user
|
username: ReadOnly
|
||||||
# password: change-me
|
password: p2=race!
|
||||||
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
|
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
|
||||||
|
|
||||||
# Enables the scheduler that regularly triggers configured tachograph import plans.
|
# Enables the scheduler that regularly triggers configured tachograph import plans.
|
||||||
scheduler-enabled: false
|
scheduler-enabled: false
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,54 @@
|
||||||
|
create table if not exists eventhub.source_master_entity (
|
||||||
|
id uuid primary key,
|
||||||
|
tenant_key text not null,
|
||||||
|
event_source_id integer not null references eventhub.event_source(id),
|
||||||
|
entity_type text not null,
|
||||||
|
source_entity_id text not null,
|
||||||
|
source_external_key text,
|
||||||
|
display_name text,
|
||||||
|
active boolean,
|
||||||
|
valid_from timestamptz,
|
||||||
|
valid_to timestamptz,
|
||||||
|
source_updated_at timestamptz,
|
||||||
|
payload jsonb not null default '{}'::jsonb,
|
||||||
|
created_at timestamptz not null default now(),
|
||||||
|
updated_at timestamptz not null default now(),
|
||||||
|
constraint ux_source_master_entity unique (tenant_key, event_source_id, entity_type, source_entity_id),
|
||||||
|
constraint chk_source_master_entity_valid_time_order check (valid_from is null or valid_to is null or valid_from < valid_to)
|
||||||
|
);
|
||||||
|
|
||||||
|
create table if not exists eventhub.source_master_relation (
|
||||||
|
id uuid primary key,
|
||||||
|
tenant_key text not null,
|
||||||
|
event_source_id integer not null references eventhub.event_source(id),
|
||||||
|
relation_key text not null,
|
||||||
|
relation_type text not null,
|
||||||
|
from_entity_type text not null,
|
||||||
|
from_source_entity_id text not null,
|
||||||
|
to_entity_type text not null,
|
||||||
|
to_source_entity_id text not null,
|
||||||
|
valid_from timestamptz,
|
||||||
|
valid_to timestamptz,
|
||||||
|
source_updated_at timestamptz,
|
||||||
|
payload jsonb not null default '{}'::jsonb,
|
||||||
|
created_at timestamptz not null default now(),
|
||||||
|
updated_at timestamptz not null default now(),
|
||||||
|
constraint ux_source_master_relation unique (tenant_key, event_source_id, relation_key),
|
||||||
|
constraint chk_source_master_relation_valid_time_order check (valid_from is null or valid_to is null or valid_from < valid_to)
|
||||||
|
);
|
||||||
|
|
||||||
|
create index if not exists idx_source_master_entity_type_key
|
||||||
|
on eventhub.source_master_entity(tenant_key, event_source_id, entity_type, source_external_key)
|
||||||
|
where source_external_key is not null;
|
||||||
|
|
||||||
|
create index if not exists idx_source_master_entity_payload_gin
|
||||||
|
on eventhub.source_master_entity using gin(payload);
|
||||||
|
|
||||||
|
create index if not exists idx_source_master_relation_from
|
||||||
|
on eventhub.source_master_relation(tenant_key, event_source_id, from_entity_type, from_source_entity_id, relation_type);
|
||||||
|
|
||||||
|
create index if not exists idx_source_master_relation_to
|
||||||
|
on eventhub.source_master_relation(tenant_key, event_source_id, to_entity_type, to_source_entity_id, relation_type);
|
||||||
|
|
||||||
|
create index if not exists idx_source_master_relation_payload_gin
|
||||||
|
on eventhub.source_master_relation using gin(payload);
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
alter table eventhub.source_master_entity
|
||||||
|
drop constraint if exists chk_source_master_entity_valid_time_order;
|
||||||
|
|
||||||
|
alter table eventhub.source_master_entity
|
||||||
|
add constraint chk_source_master_entity_valid_time_order
|
||||||
|
check (valid_from is null or valid_to is null or valid_from <= valid_to);
|
||||||
|
|
||||||
|
alter table eventhub.source_master_relation
|
||||||
|
drop constraint if exists chk_source_master_relation_valid_time_order;
|
||||||
|
|
||||||
|
alter table eventhub.source_master_relation
|
||||||
|
add constraint chk_source_master_relation_valid_time_order
|
||||||
|
check (valid_from is null or valid_to is null or valid_from <= valid_to);
|
||||||
|
|
@ -0,0 +1,91 @@
|
||||||
|
create extension if not exists postgis;
|
||||||
|
|
||||||
|
drop table if exists eventhub.event_detail;
|
||||||
|
drop table if exists eventhub.acquired_event;
|
||||||
|
drop table if exists eventhub.event;
|
||||||
|
|
||||||
|
create table eventhub.event (
|
||||||
|
id uuid not null,
|
||||||
|
event_source_id integer not null references eventhub.event_source(id),
|
||||||
|
data_package_id uuid not null references eventhub.data_package(id),
|
||||||
|
|
||||||
|
external_source_event_id text not null,
|
||||||
|
|
||||||
|
driver_entity_id uuid references eventhub.source_master_entity(id),
|
||||||
|
vehicle_entity_id uuid references eventhub.source_master_entity(id),
|
||||||
|
source_package_entity_id uuid references eventhub.source_master_entity(id),
|
||||||
|
|
||||||
|
occurred_at timestamptz not null,
|
||||||
|
received_partner_at timestamptz,
|
||||||
|
received_hub_at timestamptz not null default now(),
|
||||||
|
|
||||||
|
event_domain text not null,
|
||||||
|
event_type text not null,
|
||||||
|
lifecycle text not null,
|
||||||
|
|
||||||
|
odometer_m bigint,
|
||||||
|
position geography(Point, 4326),
|
||||||
|
|
||||||
|
payload jsonb not null default '{}'::jsonb,
|
||||||
|
manual_entry boolean not null default false,
|
||||||
|
|
||||||
|
source_record_key_hash text not null,
|
||||||
|
event_signature_hash text,
|
||||||
|
|
||||||
|
created_at timestamptz not null default now(),
|
||||||
|
|
||||||
|
constraint pk_event primary key (occurred_at, id),
|
||||||
|
constraint chk_event_driver_or_vehicle_ref check (
|
||||||
|
driver_entity_id is not null
|
||||||
|
or vehicle_entity_id is not null
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
create unique index ux_event_source_record
|
||||||
|
on eventhub.event(source_record_key_hash);
|
||||||
|
|
||||||
|
create index idx_event_signature
|
||||||
|
on eventhub.event(event_signature_hash)
|
||||||
|
where event_signature_hash is not null;
|
||||||
|
|
||||||
|
create index idx_event_source_time
|
||||||
|
on eventhub.event(event_source_id, occurred_at desc);
|
||||||
|
|
||||||
|
create index idx_event_package_time
|
||||||
|
on eventhub.event(data_package_id, occurred_at desc);
|
||||||
|
|
||||||
|
create index idx_event_domain_type_time
|
||||||
|
on eventhub.event(event_domain, event_type, occurred_at desc);
|
||||||
|
|
||||||
|
create index idx_event_driver_time
|
||||||
|
on eventhub.event(driver_entity_id, occurred_at desc)
|
||||||
|
where driver_entity_id is not null;
|
||||||
|
|
||||||
|
create index idx_event_vehicle_time
|
||||||
|
on eventhub.event(vehicle_entity_id, occurred_at desc)
|
||||||
|
where vehicle_entity_id is not null;
|
||||||
|
|
||||||
|
create index idx_event_position_gist
|
||||||
|
on eventhub.event using gist(position)
|
||||||
|
where position is not null;
|
||||||
|
|
||||||
|
create index idx_event_payload_gin
|
||||||
|
on eventhub.event using gin(payload);
|
||||||
|
|
||||||
|
create table eventhub.event_detail (
|
||||||
|
event_occurred_at timestamptz not null,
|
||||||
|
event_id uuid not null,
|
||||||
|
detail_type text not null,
|
||||||
|
attributes jsonb not null default '{}'::jsonb,
|
||||||
|
created_at timestamptz not null default now(),
|
||||||
|
constraint pk_event_detail primary key (event_occurred_at, event_id, detail_type),
|
||||||
|
constraint fk_event_detail_event foreign key (event_occurred_at, event_id)
|
||||||
|
references eventhub.event(occurred_at, id)
|
||||||
|
on delete cascade
|
||||||
|
);
|
||||||
|
|
||||||
|
create index idx_event_detail_type
|
||||||
|
on eventhub.event_detail(detail_type);
|
||||||
|
|
||||||
|
create index idx_event_detail_attributes_gin
|
||||||
|
on eventhub.event_detail using gin(attributes);
|
||||||
|
|
@ -0,0 +1,27 @@
|
||||||
|
select
|
||||||
|
'DRIVER_CARD' as entity_type,
|
||||||
|
cast(c.ID as varchar(128)) as source_entity_id,
|
||||||
|
concat(coalesce(n.AlphaCode, ''), ':', c.CardNumber) as source_external_key,
|
||||||
|
concat(coalesce(n.AlphaCode, ''), ':', c.CardNumber) as display_name,
|
||||||
|
cast(case when c.IsLastCard = 1 and (c.ExpiryDate is null or c.ExpiryDate > getutcdate()) then 1 else 0 end as bit) as active,
|
||||||
|
c.IssueDate as valid_from,
|
||||||
|
c.ExpiryDate as valid_to,
|
||||||
|
cast(null as datetime) as source_updated_at,
|
||||||
|
c.ID as card_id,
|
||||||
|
cast(c.ID_Driver as varchar(128)) as driver_id,
|
||||||
|
n.AlphaCode as card_nation,
|
||||||
|
c.CardNumber as card_number,
|
||||||
|
c.Consecutive as consecutive,
|
||||||
|
c.Replacement as replacement,
|
||||||
|
c.Renewal as renewal,
|
||||||
|
c.IssueDate as issue_date,
|
||||||
|
c.ExpiryDate as expiry_date,
|
||||||
|
c.ValidityDate as validity_date,
|
||||||
|
c.Authorityname as authority_name,
|
||||||
|
c.ID_FileLog as file_log_id,
|
||||||
|
c.IsAnalog as is_analog,
|
||||||
|
c.IsLastCard as is_last_card,
|
||||||
|
c.Generation as generation
|
||||||
|
from dbo.Card c
|
||||||
|
left join dbo.Nation n on n.ID = c.ID_Nation
|
||||||
|
where c.CardNumber is not null
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
select
|
||||||
|
'DRIVER' as entity_type,
|
||||||
|
cast(d.ID as varchar(128)) as source_entity_id,
|
||||||
|
coalesce(nullif(d.IdentificationNumber, ''), nullif(d.LicenseNumber, ''), cast(d.ID as varchar(128))) as source_external_key,
|
||||||
|
ltrim(rtrim(concat(d.Surname, ' ', d.Firstnames))) as display_name,
|
||||||
|
cast(case when d.IsActive = 1 then 1 else 0 end as bit) as active,
|
||||||
|
d.EmploymentStartDate as valid_from,
|
||||||
|
d.RetirementDate as valid_to,
|
||||||
|
d.LastUpdate as source_updated_at,
|
||||||
|
d.ID as driver_id,
|
||||||
|
d.Surname as surname,
|
||||||
|
d.Firstnames as first_names,
|
||||||
|
d.Birthdate as birth_date,
|
||||||
|
d.BirthPlace as birth_place,
|
||||||
|
d.LicenseNumber as license_number,
|
||||||
|
ln.AlphaCode as license_nation,
|
||||||
|
d.LicenseAuthority as license_authority,
|
||||||
|
d.LExpiryDate as license_expiry_date,
|
||||||
|
d.Phone as phone,
|
||||||
|
d.Mobile as mobile,
|
||||||
|
d.Email as email,
|
||||||
|
d.CostCenter as cost_center,
|
||||||
|
d.OrganisationUserID as organisation_user_id,
|
||||||
|
d.IdentificationNumber as identification_number,
|
||||||
|
d.Region as region,
|
||||||
|
d.ID_FileLog as file_log_id,
|
||||||
|
d.LastFileLog_ID as last_file_log_id,
|
||||||
|
d.FirstDrivingTime as first_driving_time,
|
||||||
|
d.LastDrivingTime as last_driving_time,
|
||||||
|
d.LastUsedVehicle_ID as last_used_vehicle_id,
|
||||||
|
d.LastUsedVehicleTime as last_used_vehicle_time,
|
||||||
|
d.IsDigital as is_digital,
|
||||||
|
d.IsAnalog as is_analog
|
||||||
|
from dbo.Driver d
|
||||||
|
left join dbo.Nation ln on ln.ID = d.ID_LicenseNation
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
select
|
||||||
|
'ORGANISATION' as entity_type,
|
||||||
|
cast(o.oid as varchar(128)) as source_entity_id,
|
||||||
|
cast(o.GUID as varchar(64)) as source_external_key,
|
||||||
|
coalesce(nullif(o.name, ''), nullif(o.kurzbez, ''), cast(o.oid as varchar(128))) as display_name,
|
||||||
|
cast(case when o.gueltigbis is null or o.gueltigbis > getutcdate() then 1 else 0 end as bit) as active,
|
||||||
|
o.VertragGiltAb as valid_from,
|
||||||
|
o.gueltigbis as valid_to,
|
||||||
|
cast(null as datetime) as source_updated_at,
|
||||||
|
o.oid as organisation_id,
|
||||||
|
cast(o.GUID as varchar(64)) as organisation_guid,
|
||||||
|
o.kurzbez as organisation_code,
|
||||||
|
o.name as organisation_name,
|
||||||
|
cast(o.n_rekey01 as varchar(128)) as parent_organisation_id,
|
||||||
|
o.Unternehmens_ID as company_id,
|
||||||
|
o.kostenstelle as cost_center,
|
||||||
|
o.Country as country,
|
||||||
|
o.PostalCode as postal_code,
|
||||||
|
o.City as city,
|
||||||
|
o.strasse as street,
|
||||||
|
o.Email as email
|
||||||
|
from dbo.I_90021 o
|
||||||
|
|
@ -0,0 +1,75 @@
|
||||||
|
select
|
||||||
|
'ORGANISATION_PARENT' as relation_type,
|
||||||
|
'ORGANISATION' as from_entity_type,
|
||||||
|
cast(o.oid as varchar(128)) as from_source_entity_id,
|
||||||
|
'ORGANISATION' as to_entity_type,
|
||||||
|
cast(o.n_rekey01 as varchar(128)) as to_source_entity_id,
|
||||||
|
o.VertragGiltAb as valid_from,
|
||||||
|
o.gueltigbis as valid_to,
|
||||||
|
cast(null as datetime) as source_updated_at,
|
||||||
|
'I_90021' as source_table,
|
||||||
|
cast(o.oid as varchar(128)) as source_row_id
|
||||||
|
from dbo.I_90021 o
|
||||||
|
where o.n_rekey01 is not null
|
||||||
|
|
||||||
|
union all
|
||||||
|
|
||||||
|
select
|
||||||
|
'DRIVER_ORGANISATION' as relation_type,
|
||||||
|
'DRIVER' as from_entity_type,
|
||||||
|
cast(rel.ID_Driver as varchar(128)) as from_source_entity_id,
|
||||||
|
'ORGANISATION' as to_entity_type,
|
||||||
|
cast(rel.ID_I_90021 as varchar(128)) as to_source_entity_id,
|
||||||
|
rel.GILT_AB as valid_from,
|
||||||
|
rel.GILT_BIS as valid_to,
|
||||||
|
rel.LastUpdate as source_updated_at,
|
||||||
|
'Driver_I_90021' as source_table,
|
||||||
|
cast(rel.ID as varchar(128)) as source_row_id
|
||||||
|
from dbo.Driver_I_90021 rel
|
||||||
|
|
||||||
|
union all
|
||||||
|
|
||||||
|
select
|
||||||
|
'DRIVER_CARD_DRIVER' as relation_type,
|
||||||
|
'DRIVER_CARD' as from_entity_type,
|
||||||
|
cast(c.ID as varchar(128)) as from_source_entity_id,
|
||||||
|
'DRIVER' as to_entity_type,
|
||||||
|
cast(c.ID_Driver as varchar(128)) as to_source_entity_id,
|
||||||
|
c.IssueDate as valid_from,
|
||||||
|
c.ExpiryDate as valid_to,
|
||||||
|
cast(null as datetime) as source_updated_at,
|
||||||
|
'Card' as source_table,
|
||||||
|
cast(c.ID as varchar(128)) as source_row_id
|
||||||
|
from dbo.Card c
|
||||||
|
where c.ID_Driver is not null
|
||||||
|
|
||||||
|
union all
|
||||||
|
|
||||||
|
select
|
||||||
|
'VEHICLE_ORGANISATION' as relation_type,
|
||||||
|
'VEHICLE_REGISTRATION' as from_entity_type,
|
||||||
|
cast(rel.ID_Vehicle as varchar(128)) as from_source_entity_id,
|
||||||
|
'ORGANISATION' as to_entity_type,
|
||||||
|
cast(rel.ID_I_90021 as varchar(128)) as to_source_entity_id,
|
||||||
|
rel.GILT_AB as valid_from,
|
||||||
|
rel.GILT_BIS as valid_to,
|
||||||
|
rel.LastUpdate as source_updated_at,
|
||||||
|
'Vehicle_I_90021' as source_table,
|
||||||
|
cast(rel.ID as varchar(128)) as source_row_id
|
||||||
|
from dbo.Vehicle_I_90021 rel
|
||||||
|
|
||||||
|
union all
|
||||||
|
|
||||||
|
select
|
||||||
|
'VEHICLE_REGISTRATION_VEHICLE' as relation_type,
|
||||||
|
'VEHICLE_REGISTRATION' as from_entity_type,
|
||||||
|
cast(v.ID as varchar(128)) as from_source_entity_id,
|
||||||
|
'VEHICLE' as to_entity_type,
|
||||||
|
cast(v.ID_VehicleIdentification as varchar(128)) as to_source_entity_id,
|
||||||
|
v.ValidFrom as valid_from,
|
||||||
|
v.ValidTo as valid_to,
|
||||||
|
cast(null as datetime) as source_updated_at,
|
||||||
|
'Vehicle' as source_table,
|
||||||
|
cast(v.ID as varchar(128)) as source_row_id
|
||||||
|
from dbo.Vehicle v
|
||||||
|
where v.ID_VehicleIdentification is not null
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
select
|
||||||
|
'VEHICLE_REGISTRATION' as entity_type,
|
||||||
|
cast(v.ID as varchar(128)) as source_entity_id,
|
||||||
|
concat(coalesce(n.AlphaCode, ''), ':', v.VRN) as source_external_key,
|
||||||
|
concat(coalesce(n.AlphaCode, ''), ':', v.VRN) as display_name,
|
||||||
|
cast(case when v.ValidTo is null or v.ValidTo > getutcdate() then 1 else 0 end as bit) as active,
|
||||||
|
v.ValidFrom as valid_from,
|
||||||
|
v.ValidTo as valid_to,
|
||||||
|
cast(null as datetime) as source_updated_at,
|
||||||
|
v.ID as vehicle_registration_id,
|
||||||
|
cast(v.ID_VehicleIdentification as varchar(128)) as vehicle_identification_id,
|
||||||
|
n.AlphaCode as registration_nation,
|
||||||
|
v.VRN as registration_number,
|
||||||
|
v.VrnNormalized as registration_number_normalized,
|
||||||
|
v.ID_FileLog as file_log_id,
|
||||||
|
v.LastDriver_ID as last_driver_id,
|
||||||
|
v.LastDriverTime as last_driver_time
|
||||||
|
from dbo.Vehicle v
|
||||||
|
left join dbo.Nation n on n.ID = v.ID_Nation
|
||||||
|
where v.VRN is not null
|
||||||
|
|
@ -0,0 +1,28 @@
|
||||||
|
select
|
||||||
|
'VEHICLE' as entity_type,
|
||||||
|
cast(vi.ID as varchar(128)) as source_entity_id,
|
||||||
|
vi.VIN as source_external_key,
|
||||||
|
vi.VIN as display_name,
|
||||||
|
cast(case when vi.IsActive = 1 then 1 else 0 end as bit) as active,
|
||||||
|
cast(null as datetime) as valid_from,
|
||||||
|
cast(null as datetime) as valid_to,
|
||||||
|
cast(null as datetime) as source_updated_at,
|
||||||
|
vi.ID as vehicle_identification_id,
|
||||||
|
vi.VIN as vin,
|
||||||
|
vi.CostCenter as cost_center,
|
||||||
|
vi.ID_FileLog as file_log_id,
|
||||||
|
vi.BillingDate as billing_date,
|
||||||
|
vi.Mobile as mobile,
|
||||||
|
vi.Email as email,
|
||||||
|
vi.GrossVehicleMass as gross_vehicle_mass,
|
||||||
|
vi.VehicleType_ID as vehicle_type_id,
|
||||||
|
vi.FirstDrivingTime as first_driving_time,
|
||||||
|
vi.LastDrivingTime as last_driving_time,
|
||||||
|
vi.LastOdoEnd as last_odo_end,
|
||||||
|
vi.LastOdoEndTime as last_odo_end_time,
|
||||||
|
vi.LastDriver_ID as last_driver_id,
|
||||||
|
vi.LastDriverTime as last_driver_time,
|
||||||
|
vi.LastFileLog_ID as last_file_log_id,
|
||||||
|
vi.IsDigital as is_digital,
|
||||||
|
vi.IsAnalog as is_analog
|
||||||
|
from dbo.VehicleIdentification vi
|
||||||
Loading…
Reference in New Issue