eventhub/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefresh...

362 lines
16 KiB
Java

package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult;
import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert;
import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.persistence.EventSourceRepository;
import at.procon.eventhub.persistence.SourceMasterDataRepository;
import at.procon.eventhub.persistence.VehicleIdentityRepository;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StreamUtils;
/**
 * Refreshes source master data from the tachograph datasource.
 *
 * <p>A refresh reads entity and relation rows through SQL scripts loaded from the
 * classpath, upserts them in batches through {@link SourceMasterDataRepository}, and
 * finally asks {@link VehicleIdentityRepository} to reconcile vehicle identities from
 * the refreshed data. When no tachograph JDBC template is available the refresh is
 * skipped and an empty result is returned.
 */
@Service
public class TachographMasterDataRefreshService {
private static final Logger log = LoggerFactory.getLogger(TachographMasterDataRefreshService.class);
// Entity queries; refresh() runs them one after another in the order listed here.
private static final List<String> ENTITY_SQL_RESOURCES = List.of(
"classpath:sql/tachograph/master-data/organisations.sql",
"classpath:sql/tachograph/master-data/drivers.sql",
"classpath:sql/tachograph/master-data/driver-cards.sql",
"classpath:sql/tachograph/master-data/vehicles.sql",
"classpath:sql/tachograph/master-data/vehicle-registrations.sql"
);
// Relation query; refresh() runs it after all entity queries.
private static final String RELATIONS_SQL_RESOURCE = "classpath:sql/tachograph/master-data/relations.sql";
// Fixed second and third constructor arguments of the EventSourceDto built by masterDataSourceFor().
// NOTE(review): presumably the source kind and source key of the synthetic master-data source — confirm against EventSourceDto.
private static final String MASTER_DATA_SOURCE_KIND = "MASTER_DATA";
private static final String MASTER_DATA_SOURCE_KEY = "TACHOGRAPH_MASTER_DATA";
// Number of buffered rows that triggers a repository upsert while streaming a query.
private static final int MASTER_DATA_UPSERT_BATCH_SIZE = 5000;
// Looked up on every refresh via getIfAvailable(); a null result means the refresh is skipped.
private final ObjectProvider<NamedParameterJdbcTemplate> tachographJdbcTemplateProvider;
// Receives the batched entity and relation upserts.
private final SourceMasterDataRepository sourceMasterDataRepository;
// Resolves the numeric id of the master-data event source for a tenant.
private final EventSourceRepository eventSourceRepository;
// Reconciles vehicle identities once entities and relations have been upserted.
private final VehicleIdentityRepository vehicleIdentityRepository;
// Loads the SQL scripts named by the resource constants above.
private final ResourceLoader resourceLoader;
/**
 * Creates the service with its collaborators.
 *
 * @param tachographJdbcTemplateProvider provider for the JDBC template qualified as
 *        {@code tachographNamedParameterJdbcTemplate}; it is queried on each refresh
 * @param sourceMasterDataRepository repository receiving entity and relation upserts
 * @param eventSourceRepository repository used to resolve the event source id
 * @param vehicleIdentityRepository repository used to reconcile vehicle identities
 * @param resourceLoader loader for the SQL script resources
 */
public TachographMasterDataRefreshService(
        @Qualifier("tachographNamedParameterJdbcTemplate")
        ObjectProvider<NamedParameterJdbcTemplate> tachographJdbcTemplateProvider,
        SourceMasterDataRepository sourceMasterDataRepository,
        EventSourceRepository eventSourceRepository,
        VehicleIdentityRepository vehicleIdentityRepository,
        ResourceLoader resourceLoader) {
    // Infrastructure used to read from the tachograph side.
    this.resourceLoader = resourceLoader;
    this.tachographJdbcTemplateProvider = tachographJdbcTemplateProvider;

    // Repositories written to or consulted during a refresh.
    this.eventSourceRepository = eventSourceRepository;
    this.sourceMasterDataRepository = sourceMasterDataRepository;
    this.vehicleIdentityRepository = vehicleIdentityRepository;
}
/**
 * Runs {@link #refresh(TachographImportRequest)} only when the request asks for it.
 *
 * @param request import request; its {@code refreshMasterDataFirst()} flag decides
 *        whether a refresh happens
 * @return the refresh result, or {@code MasterDataRefreshResult.empty()} when the flag
 *         is not set
 */
public MasterDataRefreshResult refreshIfRequested(TachographImportRequest request) {
    return request.refreshMasterDataFirst()
            ? refresh(request)
            : MasterDataRefreshResult.empty();
}
/**
 * Runs a full tachograph master-data refresh for the tenant named in the request.
 *
 * <p>Steps, in order: resolve the master-data event source id for the tenant, stream
 * every entity query, stream the relation query, then reconcile vehicle identities
 * from the refreshed data. If no tachograph JDBC template is available nothing is
 * read or written.
 *
 * @param request import request supplying the tenant key and the event source the
 *        master-data source is derived from
 * @return the entity and relation counts reported by the repository upserts, or
 *         {@code MasterDataRefreshResult.empty()} when no tachograph datasource is
 *         configured
 */
public MasterDataRefreshResult refresh(TachographImportRequest request) {
    NamedParameterJdbcTemplate tachographJdbcTemplate = tachographJdbcTemplateProvider.getIfAvailable();
    // Normalise once, up front, so the skip message reports the same tenant key that
    // every other log line and every repository call uses.
    String tenantKey = effectiveTenantKey(request.tenantKey());
    if (tachographJdbcTemplate == null) {
        log.info("Skipping tachograph master-data refresh for tenant={} because no tachograph datasource is configured.",
                tenantKey);
        return MasterDataRefreshResult.empty();
    }

    EventSourceDto masterDataSource = masterDataSourceFor(request.eventSource());
    int eventSourceId = eventSourceRepository.resolveSourceId(tenantKey, masterDataSource);
    log.info("Starting tachograph source master-data refresh tenant={} source={} batchSize={}",
            tenantKey, masterDataSource.stableKey(), MASTER_DATA_UPSERT_BATCH_SIZE);

    // Entities are streamed before relations.
    int entities = 0;
    for (String sqlResource : ENTITY_SQL_RESOURCES) {
        entities += streamEntities(tachographJdbcTemplate, tenantKey, eventSourceId, sqlResource, loadSql(sqlResource));
    }
    int relationCount = streamRelations(
            tachographJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE));

    log.info("Reconciling tachograph vehicle identities from source master data tenant={} source={}",
            tenantKey, masterDataSource.stableKey());
    int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId);

    MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount);
    log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={} reconciledVehicles={}",
            tenantKey, masterDataSource.stableKey(), result.entitiesUpserted(), result.relationsUpserted(), reconciledVehicles);
    return result;
}

/** Returns the trimmed tenant key, or {@code "default"} when the key is null or blank. */
private static String effectiveTenantKey(String rawTenantKey) {
    return rawTenantKey == null || rawTenantKey.isBlank() ? "default" : rawTenantKey.trim();
}
/**
 * Streams the rows of one entity query and upserts them in batches.
 *
 * <p>Each row is mapped with {@code entity(rs)} and buffered; whenever the buffer holds
 * {@code MASTER_DATA_UPSERT_BATCH_SIZE} rows it is flushed and emptied. Rows left over
 * after the query finishes are flushed as a final, smaller chunk.
 *
 * @param tachographJdbcTemplate template used to run the query
 * @param tenantKey tenant the rows are upserted for
 * @param eventSourceId id of the master-data event source
 * @param sqlResource resource location, used only to derive the section name for logs
 * @param sql query text to execute (run with an empty parameter map)
 * @return sum of the counts returned by the repository for every flushed chunk
 */
private int streamEntities(
        NamedParameterJdbcTemplate tachographJdbcTemplate,
        String tenantKey,
        int eventSourceId,
        String sqlResource,
        String sql) {
    final String section = masterDataSection(sqlResource);
    final List<SourceMasterEntityUpsert> pending = new ArrayList<>(MASTER_DATA_UPSERT_BATCH_SIZE);
    final Map<String, Integer> perType = new LinkedHashMap<>();
    // Single-element arrays act as mutable counters the row callback can update.
    final int[] processed = {0};
    final int[] chunkNumber = {0};

    log.info("Reading tachograph master-data entities tenant={} section={}", tenantKey, section);
    tachographJdbcTemplate.query(sql, Map.of(), rs -> {
        SourceMasterEntityUpsert row = entity(rs);
        pending.add(row);
        increment(perType, row.entityType());
        if (pending.size() < MASTER_DATA_UPSERT_BATCH_SIZE) {
            return;
        }
        // Buffer is full: write it out and start the next chunk.
        chunkNumber[0]++;
        int written = flushEntities(
                tenantKey, eventSourceId, section, chunkNumber[0], pending, processed[0], perType);
        processed[0] += written;
        pending.clear();
    });

    // Trailing partial chunk.
    if (!pending.isEmpty()) {
        chunkNumber[0]++;
        int written = flushEntities(
                tenantKey, eventSourceId, section, chunkNumber[0], pending, processed[0], perType);
        processed[0] += written;
    }

    log.info("Finished tachograph master-data entities tenant={} section={} processed={} byType={}",
            tenantKey, section, processed[0], perType);
    return processed[0];
}
/**
 * Streams the rows of the relation query and upserts them in batches.
 *
 * <p>Each row is mapped with {@code relation(rs)} and buffered; a full buffer of
 * {@code MASTER_DATA_UPSERT_BATCH_SIZE} rows is flushed and emptied, and whatever
 * remains once the query is exhausted is flushed as the last chunk.
 *
 * @param tachographJdbcTemplate template used to run the query
 * @param tenantKey tenant the rows are upserted for
 * @param eventSourceId id of the master-data event source
 * @param sqlResource resource location, used only to derive the section name for logs
 * @param sql query text to execute (run with an empty parameter map)
 * @return sum of the counts returned by the repository for every flushed chunk
 */
private int streamRelations(
        NamedParameterJdbcTemplate tachographJdbcTemplate,
        String tenantKey,
        int eventSourceId,
        String sqlResource,
        String sql) {
    final String section = masterDataSection(sqlResource);
    final List<SourceMasterRelationUpsert> pending = new ArrayList<>(MASTER_DATA_UPSERT_BATCH_SIZE);
    final Map<String, Integer> perType = new LinkedHashMap<>();
    // Single-element arrays act as mutable counters the row callback can update.
    final int[] processed = {0};
    final int[] chunkNumber = {0};

    log.info("Reading tachograph master-data relations tenant={} section={}", tenantKey, section);
    tachographJdbcTemplate.query(sql, Map.of(), rs -> {
        SourceMasterRelationUpsert row = relation(rs);
        pending.add(row);
        increment(perType, row.relationType());
        boolean batchFull = pending.size() >= MASTER_DATA_UPSERT_BATCH_SIZE;
        if (batchFull) {
            chunkNumber[0]++;
            int written = flushRelations(
                    tenantKey, eventSourceId, section, chunkNumber[0], pending, processed[0], perType);
            processed[0] += written;
            pending.clear();
        }
    });

    // Trailing partial chunk.
    if (!pending.isEmpty()) {
        chunkNumber[0]++;
        int written = flushRelations(
                tenantKey, eventSourceId, section, chunkNumber[0], pending, processed[0], perType);
        processed[0] += written;
    }

    log.info("Finished tachograph master-data relations tenant={} section={} processed={} byType={}",
            tenantKey, section, processed[0], perType);
    return processed[0];
}
/**
 * Upserts one buffered chunk of entities and logs the running progress.
 *
 * @param tenantKey tenant the chunk belongs to
 * @param eventSourceId id of the master-data event source
 * @param section section name shown in the log line
 * @param chunk 1-based number of this chunk within the section
 * @param buffer rows to upsert; not modified here
 * @param alreadyProcessed count accumulated from earlier chunks
 * @param typeCounts per-type row counts seen so far, logged as-is
 * @return the count returned by the repository for this chunk
 */
private int flushEntities(
        String tenantKey,
        int eventSourceId,
        String section,
        int chunk,
        List<SourceMasterEntityUpsert> buffer,
        int alreadyProcessed,
        Map<String, Integer> typeCounts) {
    final int written = sourceMasterDataRepository.upsertEntities(tenantKey, eventSourceId, buffer);
    final int processedSoFar = alreadyProcessed + written;
    log.info("Tachograph master-data entity progress tenant={} section={} chunk={} chunkSize={} processed={} byType={}",
            tenantKey, section, chunk, buffer.size(), processedSoFar, typeCounts);
    return written;
}
/**
 * Upserts one buffered chunk of relations and logs the running progress.
 *
 * @param tenantKey tenant the chunk belongs to
 * @param eventSourceId id of the master-data event source
 * @param section section name shown in the log line
 * @param chunk 1-based number of this chunk within the section
 * @param buffer rows to upsert; not modified here
 * @param alreadyProcessed count accumulated from earlier chunks
 * @param typeCounts per-type row counts seen so far, logged as-is
 * @return the count returned by the repository for this chunk
 */
private int flushRelations(
        String tenantKey,
        int eventSourceId,
        String section,
        int chunk,
        List<SourceMasterRelationUpsert> buffer,
        int alreadyProcessed,
        Map<String, Integer> typeCounts) {
    final int written = sourceMasterDataRepository.upsertRelations(tenantKey, eventSourceId, buffer);
    final int processedSoFar = alreadyProcessed + written;
    log.info("Tachograph master-data relation progress tenant={} section={} chunk={} chunkSize={} processed={} byType={}",
            tenantKey, section, chunk, buffer.size(), processedSoFar, typeCounts);
    return written;
}
/**
 * Adds one to the counter kept for {@code type}.
 *
 * @param counts counter map updated in place
 * @param type counter key; a null or blank value is counted under {@code "UNKNOWN"}
 */
private void increment(Map<String, Integer> counts, String type) {
    String bucket = "UNKNOWN";
    if (type != null && !type.isBlank()) {
        bucket = type;
    }
    counts.merge(bucket, 1, Integer::sum);
}
/**
 * Derives a short section name from a SQL resource location.
 *
 * @param sqlResource resource location such as {@code classpath:dir/drivers.sql}
 * @return the text after the last {@code '/'} (or the whole input if there is none),
 *         with a trailing {@code .sql} removed when present
 */
private String masterDataSection(String sqlResource) {
    String fileName = sqlResource;
    int lastSlash = sqlResource.lastIndexOf('/');
    if (lastSlash >= 0) {
        fileName = sqlResource.substring(lastSlash + 1);
    }
    if (!fileName.endsWith(".sql")) {
        return fileName;
    }
    return fileName.substring(0, fileName.length() - ".sql".length());
}
/**
 * Builds the event source descriptor used for master data.
 *
 * <p>The provider key, source instance key, tenant provider setting key and external
 * fleet key are copied from {@code source}; the second and third constructor arguments
 * are replaced by {@code MASTER_DATA_SOURCE_KIND} and {@code MASTER_DATA_SOURCE_KEY}.
 *
 * @param source event source of the import request
 * @return a new descriptor for the master-data source
 */
private EventSourceDto masterDataSourceFor(EventSourceDto source) {
    var providerKey = source.providerKey();
    var sourceInstanceKey = source.sourceInstanceKey();
    var tenantProviderSettingKey = source.tenantProviderSettingKey();
    var externalFleetKey = source.externalFleetKey();
    return new EventSourceDto(
            providerKey,
            MASTER_DATA_SOURCE_KIND,
            MASTER_DATA_SOURCE_KEY,
            sourceInstanceKey,
            tenantProviderSettingKey,
            externalFleetKey);
}
/**
 * Maps the current row of an entity query to an upsert command.
 *
 * <p>Columns read: {@code entity_type}, {@code source_entity_id}, {@code valid_from},
 * {@code valid_to}, {@code source_external_key}, {@code display_name}, {@code active}
 * and {@code source_updated_at}. The validity bounds pass through
 * {@code normalizeValidityRange}, and every non-null column of the row is also kept
 * in the payload map.
 *
 * @param rs result set positioned on the row to map
 * @return the upsert command for that row
 * @throws SQLException if a column cannot be read
 */
private SourceMasterEntityUpsert entity(ResultSet rs) throws SQLException {
    final String type = string(rs, "entity_type");
    final String sourceId = string(rs, "source_entity_id");
    // Row key "<type>:<id>" identifies the row in the warning logged for an inverted range.
    final ValidityRange validity = normalizeValidityRange(
            offsetDateTime(rs, "valid_from"),
            offsetDateTime(rs, "valid_to"),
            "entity",
            type + ":" + sourceId);

    final String externalKey = string(rs, "source_external_key");
    final String displayName = string(rs, "display_name");
    final Boolean active = bool(rs, "active");
    final OffsetDateTime sourceUpdatedAt = offsetDateTime(rs, "source_updated_at");
    final Map<String, Object> rawColumns = payload(rs);

    return new SourceMasterEntityUpsert(
            type,
            sourceId,
            externalKey,
            displayName,
            active,
            validity.validFrom(),
            validity.validTo(),
            sourceUpdatedAt,
            rawColumns);
}
/**
 * Maps the current row of the relation query to an upsert command.
 *
 * <p>Columns read: {@code relation_type}, {@code from_entity_type},
 * {@code from_source_entity_id}, {@code to_entity_type}, {@code to_source_entity_id},
 * {@code valid_from}, {@code valid_to} and {@code source_updated_at}. The validity
 * bounds pass through {@code normalizeValidityRange}, and every non-null column of the
 * row is also kept in the payload map.
 *
 * @param rs result set positioned on the row to map
 * @return the upsert command for that row
 * @throws SQLException if a column cannot be read
 */
private SourceMasterRelationUpsert relation(ResultSet rs) throws SQLException {
    final String type = string(rs, "relation_type");
    final String fromType = string(rs, "from_entity_type");
    final String fromId = string(rs, "from_source_entity_id");
    final String toType = string(rs, "to_entity_type");
    final String toId = string(rs, "to_source_entity_id");
    // Row key "<relation>:<from>-><to>" identifies the row in the warning logged for an inverted range.
    final ValidityRange validity = normalizeValidityRange(
            offsetDateTime(rs, "valid_from"),
            offsetDateTime(rs, "valid_to"),
            "relation",
            type + ":" + fromType + ":" + fromId + "->" + toType + ":" + toId);

    final OffsetDateTime sourceUpdatedAt = offsetDateTime(rs, "source_updated_at");
    final Map<String, Object> rawColumns = payload(rs);

    return new SourceMasterRelationUpsert(
            type,
            fromType,
            fromId,
            toType,
            toId,
            validity.validFrom(),
            validity.validTo(),
            sourceUpdatedAt,
            rawColumns);
}
/**
 * Copies every non-null column of the current row into a map.
 *
 * @param rs result set positioned on the row to copy
 * @return map from column label to column value in column order; columns whose value
 *         is null are left out, and a repeated label keeps the last value
 * @throws SQLException if the metadata or a column value cannot be read
 */
private Map<String, Object> payload(ResultSet rs) throws SQLException {
    final ResultSetMetaData columns = rs.getMetaData();
    final Map<String, Object> row = new LinkedHashMap<>();
    int index = 1; // JDBC column indexes start at 1
    while (index <= columns.getColumnCount()) {
        String label = columns.getColumnLabel(index);
        Object cell = rs.getObject(index);
        if (cell != null) {
            row.put(label, cell);
        }
        index++;
    }
    return row;
}
/**
 * Reads a SQL script through the resource loader as UTF-8 text.
 *
 * @param location resource location of the script
 * @return the full script text
 * @throws IllegalStateException if the resource cannot be read; the I/O failure is
 *         kept as the cause
 */
private String loadSql(String location) {
    final Resource script = resourceLoader.getResource(location);
    try (var in = script.getInputStream()) {
        final String sqlText = StreamUtils.copyToString(in, StandardCharsets.UTF_8);
        return sqlText;
    } catch (IOException ioFailure) {
        throw new IllegalStateException("Cannot load tachograph master-data SQL resource " + location, ioFailure);
    }
}
/**
 * Reads a text column, treating blank values as absent.
 *
 * @param rs result set positioned on the row to read
 * @param column column label
 * @return the trimmed column value, or {@code null} when it is null or blank
 * @throws SQLException if the column cannot be read
 */
private String string(ResultSet rs, String column) throws SQLException {
    String raw = rs.getString(column);
    if (raw == null || raw.isBlank()) {
        return null;
    }
    return raw.trim();
}
/**
 * Reads a flag column that may be stored as a boolean, a number or text.
 *
 * <p>Numbers are true when their integer value is non-zero. Text is trimmed and
 * compared case-insensitively: {@code true}, {@code 1}, {@code t}, {@code y} and
 * {@code yes} are true, any other text is false. This keeps textual {@code "1"}
 * consistent with numeric {@code 1} and tolerates padded fixed-width values.
 *
 * @param rs result set positioned on the row to read
 * @param column column label
 * @return the flag value, or {@code null} when the column is null
 * @throws SQLException if the column cannot be read
 */
private Boolean bool(ResultSet rs, String column) throws SQLException {
    Object value = rs.getObject(column);
    if (value == null) {
        return null;
    }
    if (value instanceof Boolean flag) {
        return flag;
    }
    if (value instanceof Number number) {
        return number.intValue() != 0;
    }
    // Textual flag: accept the usual spellings instead of only the literal "true".
    String text = value.toString().trim();
    return text.equalsIgnoreCase("true")
            || text.equals("1")
            || text.equalsIgnoreCase("t")
            || text.equalsIgnoreCase("y")
            || text.equalsIgnoreCase("yes");
}
/**
 * Reads a timestamp column and returns it at UTC offset.
 *
 * <p>{@link OffsetDateTime} values are converted to the same instant at UTC.
 * {@link Timestamp} and {@link LocalDateTime} values carry no offset, so their
 * date-time fields are kept and labelled as UTC.
 * NOTE(review): this treats offset-less source values as already being UTC — confirm
 * against the tachograph database.
 *
 * <p>Any other value is parsed from its trimmed text form: first as an ISO offset
 * date-time, then as a local date-time. A blank between date and time
 * ({@code yyyy-MM-dd HH:mm:ss}) is accepted in place of {@code 'T'}. Blank text is
 * treated as absent.
 *
 * @param rs result set positioned on the row to read
 * @param column column label
 * @return the value at UTC, or {@code null} when the column is null or blank text
 * @throws SQLException if the column cannot be read
 * @throws java.time.format.DateTimeParseException if the text form is not a supported
 *         date-time; the message names the column
 */
private OffsetDateTime offsetDateTime(ResultSet rs, String column) throws SQLException {
    Object value = rs.getObject(column);
    if (value == null) {
        return null;
    }
    if (value instanceof OffsetDateTime offsetDateTime) {
        return offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC);
    }
    if (value instanceof Timestamp timestamp) {
        return timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC);
    }
    if (value instanceof LocalDateTime localDateTime) {
        return localDateTime.atOffset(ZoneOffset.UTC);
    }

    String text = value.toString().trim();
    if (text.isEmpty()) {
        return null;
    }
    // SQL-style text separates date and time with a blank; the ISO parsers require 'T'.
    if (text.length() > 10 && text.charAt(10) == ' ') {
        text = text.substring(0, 10) + 'T' + text.substring(11);
    }
    try {
        return OffsetDateTime.parse(text).withOffsetSameInstant(ZoneOffset.UTC);
    } catch (java.time.format.DateTimeParseException ignored) {
        // Not an offset date-time; fall through and try the offset-less form.
    }
    try {
        return LocalDateTime.parse(text).atOffset(ZoneOffset.UTC);
    } catch (java.time.format.DateTimeParseException e) {
        throw new java.time.format.DateTimeParseException(
                "Cannot parse value '" + text + "' of column '" + column + "' as a date-time",
                text,
                e.getErrorIndex(),
                e);
    }
}
/**
 * Repairs a validity range whose start lies after its end.
 *
 * <p>When both bounds are present and {@code validFrom} is after {@code validTo}, a
 * warning is logged and the end bound is dropped. In every other case, including a
 * missing bound or equal bounds, the range is returned unchanged.
 *
 * @param validFrom start of validity, may be null
 * @param validTo end of validity, may be null
 * @param rowKind kind of row, used only in the warning
 * @param rowKey key of the row, used only in the warning
 * @return the original bounds, or {@code validFrom} with a null end when inverted
 */
private ValidityRange normalizeValidityRange(
        OffsetDateTime validFrom,
        OffsetDateTime validTo,
        String rowKind,
        String rowKey) {
    final boolean startAfterEnd = validFrom != null && validTo != null && validFrom.isAfter(validTo);
    if (startAfterEnd) {
        log.warn(
                "Ignoring invalid validity end for {} {} because valid_from {} is after valid_to {}. Keeping valid_from and setting valid_to=null.",
                rowKind,
                rowKey,
                validFrom,
                validTo);
        return new ValidityRange(validFrom, null);
    }
    return new ValidityRange(validFrom, validTo);
}
/**
 * Pair of validity bounds as returned by {@code normalizeValidityRange}.
 * Either component may be null.
 */
private record ValidityRange(OffsetDateTime validFrom, OffsetDateTime validTo) {
}
}