diff --git a/postman/eventhub-rest-methods.postman_collection.json b/postman/eventhub-rest-methods.postman_collection.json index 43671b0..d8110cd 100644 --- a/postman/eventhub-rest-methods.postman_collection.json +++ b/postman/eventhub-rest-methods.postman_collection.json @@ -12,6 +12,10 @@ { "key": "planKey", "value": "kralowetz-tachograph-org-147" + }, + { + "key": "yellowFoxPlanKey", + "value": "yellowfox-d8-default" } ], "item": [ @@ -376,6 +380,219 @@ ] } } + }, + { + "name": "POST /api/eventhub/acquisition/yellowfox/d8/imports/plan", + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"tenantKey\": \"Procon\",\n \"eventSource\": {\n \"providerKey\": \"YELLOWFOX\",\n \"sourceKind\": \"TELEMATICS_PLATFORM\",\n \"sourceKey\": \"YELLOWFOX_D8\",\n \"sourceInstanceKey\": \"logistics-db-prod\",\n \"tenantProviderSettingKey\": \"yellowfox-main\",\n \"externalFleetKey\": null\n },\n \"sourceGroup\": null,\n \"importScope\": {\n \"type\": \"TENANT_ALL\",\n \"rootSourceOrganisation\": null,\n \"includeChildren\": false,\n \"occurredFrom\": null,\n \"occurredTo\": null\n },\n \"eventFamilies\": [\n \"DRIVER_ACTIVITY\",\n \"DRIVER_CARD\"\n ],\n \"mode\": \"INCREMENTAL_UPDATE\",\n \"refreshMasterDataFirst\": false,\n \"acquisitionStrategy\": \"SOURCE_ROW_WATERMARK\"\n}" + }, + "url": { + "raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/plan", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "eventhub", + "acquisition", + "yellowfox", + "d8", + "imports", + "plan" + ] + } + } + }, + { + "name": "POST /api/eventhub/acquisition/yellowfox/d8/imports/start?execute=true", + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"tenantKey\": \"Procon\",\n \"eventSource\": {\n \"providerKey\": \"YELLOWFOX\",\n \"sourceKind\": \"TELEMATICS_PLATFORM\",\n \"sourceKey\": \"YELLOWFOX_D8\",\n \"sourceInstanceKey\": \"logistics-db-prod\",\n \"tenantProviderSettingKey\": \"yellowfox-main\",\n \"externalFleetKey\": null\n },\n \"sourceGroup\": null,\n \"importScope\": {\n \"type\": \"TENANT_ALL\",\n \"rootSourceOrganisation\": null,\n \"includeChildren\": false,\n \"occurredFrom\": null,\n \"occurredTo\": null\n },\n \"eventFamilies\": [\n \"DRIVER_ACTIVITY\",\n \"DRIVER_CARD\"\n ],\n \"mode\": \"INCREMENTAL_UPDATE\",\n \"refreshMasterDataFirst\": false,\n \"acquisitionStrategy\": \"SOURCE_ROW_WATERMARK\"\n}" + }, + "url": { + "raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/start?execute=true", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "eventhub", + "acquisition", + "yellowfox", + "d8", + "imports", + "start" + ], + "query": [ + { + "key": "execute", + "value": "true" + } + ] + } + } + }, + { + "name": "POST /api/eventhub/acquisition/yellowfox/d8/master-data/refresh", + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"tenantKey\": \"Procon\",\n \"eventSource\": {\n \"providerKey\": \"YELLOWFOX\",\n \"sourceKind\": \"TELEMATICS_PLATFORM\",\n \"sourceKey\": \"YELLOWFOX_D8\",\n \"sourceInstanceKey\": \"logistics-db-prod\",\n \"tenantProviderSettingKey\": \"yellowfox-main\",\n \"externalFleetKey\": null\n },\n \"sourceGroup\": null,\n \"importScope\": {\n \"type\": \"TENANT_ALL\",\n \"rootSourceOrganisation\": null,\n \"includeChildren\": false,\n \"occurredFrom\": null,\n \"occurredTo\": null\n },\n \"eventFamilies\": [\n \"DRIVER_ACTIVITY\",\n \"DRIVER_CARD\"\n ],\n \"mode\": \"INCREMENTAL_UPDATE\",\n \"refreshMasterDataFirst\": true,\n \"acquisitionStrategy\": \"SOURCE_ROW_WATERMARK\"\n}" + }, + "url": { + "raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/master-data/refresh", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "eventhub", + "acquisition", + "yellowfox", + "d8", + "master-data", + "refresh" + ] + } + } + }, + { + "name": "GET /api/eventhub/acquisition/yellowfox/d8/imports/configured-plans", + "request": { + "method": "GET", + "url": { + "raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/configured-plans", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "eventhub", + "acquisition", + "yellowfox", + "d8", + "imports", + "configured-plans" + ] + } + } + }, + { + "name": "GET /api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{planKey}", + "request": { + "method": "GET", + "url": { + "raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{{yellowFoxPlanKey}}", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "eventhub", + "acquisition", + "yellowfox", + "d8", + "imports", + "configured-plans", + "{{yellowFoxPlanKey}}" + ] + } + } + }, + { + "name": "POST /api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{planKey}/start", + "request": { + "method": "POST", + "url": { + "raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{{yellowFoxPlanKey}}/start?triggerMode=EXECUTE&mode=INCREMENTAL_UPDATE&strategy=SOURCE_ROW_WATERMARK", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "eventhub", + "acquisition", + "yellowfox", + "d8", + "imports", + "configured-plans", + "{{yellowFoxPlanKey}}", + "start" + ], + "query": [ + { + "key": "triggerMode", + "value": "EXECUTE" + }, + { + "key": "mode", + "value": "INCREMENTAL_UPDATE" + }, + { + "key": "strategy", + "value": "SOURCE_ROW_WATERMARK" + } + ] + } + } + }, + { + "name": "POST /api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{planKey}/master-data/refresh", + "request": { + "method": "POST", + "url": { + "raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{{yellowFoxPlanKey}}/master-data/refresh?mode=INCREMENTAL_UPDATE&strategy=SOURCE_ROW_WATERMARK", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "eventhub", + "acquisition", + "yellowfox", + "d8", + "imports", + "configured-plans", + "{{yellowFoxPlanKey}}", + "master-data", + "refresh" + ], + "query": [ + { + "key": "mode", + "value": "INCREMENTAL_UPDATE" + }, + { + "key": "strategy", + "value": "SOURCE_ROW_WATERMARK" + } + ] + } + } } ] }, diff --git a/src/main/java/at/procon/eventhub/yellowfox/api/YellowFoxD8ImportController.java b/src/main/java/at/procon/eventhub/yellowfox/api/YellowFoxD8ImportController.java index 17aa698..3a8473e 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/api/YellowFoxD8ImportController.java +++ b/src/main/java/at/procon/eventhub/yellowfox/api/YellowFoxD8ImportController.java @@ -3,6 +3,7 @@ package at.procon.eventhub.yellowfox.api; import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.ImportMode; import at.procon.eventhub.dto.SchedulerTriggerMode; +import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult; import at.procon.eventhub.yellowfox.dto.ConfiguredYellowFoxD8ImportPlanDto; import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRunResultDto; @@ -10,6 +11,7 @@ import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportTriggerResultDto; import at.procon.eventhub.yellowfox.service.YellowFoxD8ConfiguredImportPlanService; import at.procon.eventhub.yellowfox.service.YellowFoxD8ImportExecutionService; import at.procon.eventhub.yellowfox.service.YellowFoxD8ImportPlanService; +import at.procon.eventhub.yellowfox.service.YellowFoxMasterDataRefreshService; import jakarta.validation.Valid; import java.time.OffsetDateTime; import java.util.List; @@ -31,17 +33,20 @@ public class YellowFoxD8ImportController { private final YellowFoxD8ImportPlanService importPlanService; private final YellowFoxD8ConfiguredImportPlanService configuredImportPlanService; private final YellowFoxD8ImportExecutionService importExecutionService; + private final YellowFoxMasterDataRefreshService masterDataRefreshService; public YellowFoxD8ImportController( ProducerTemplate producerTemplate, YellowFoxD8ImportPlanService importPlanService, YellowFoxD8ConfiguredImportPlanService configuredImportPlanService, - YellowFoxD8ImportExecutionService importExecutionService + YellowFoxD8ImportExecutionService importExecutionService, + YellowFoxMasterDataRefreshService masterDataRefreshService ) { this.producerTemplate = producerTemplate; this.importPlanService = importPlanService; this.configuredImportPlanService = configuredImportPlanService; this.importExecutionService = importExecutionService; + this.masterDataRefreshService = masterDataRefreshService; } @PostMapping("/imports/plan") @@ -60,6 +65,13 @@ public class YellowFoxD8ImportController { return ResponseEntity.accepted().body(result); } + @PostMapping("/master-data/refresh") + public ResponseEntity refreshYellowFoxMasterData( + @Valid @RequestBody YellowFoxD8ImportRequest request + ) { + return ResponseEntity.ok(masterDataRefreshService.refresh(request)); + } + @GetMapping("/imports/configured-plans") public ResponseEntity> listConfiguredYellowFoxPlans() { return ResponseEntity.ok(configuredImportPlanService.listPlans()); @@ -90,4 +102,14 @@ public class YellowFoxD8ImportController { result )); } + + @PostMapping("/imports/configured-plans/{planKey}/master-data/refresh") + public ResponseEntity refreshConfiguredYellowFoxMasterData( + @PathVariable String planKey, + @RequestParam(required = false) ImportMode mode, + @RequestParam(required = false) AcquisitionStrategy strategy + ) { + YellowFoxD8ImportRequest request = configuredImportPlanService.createRequest(planKey, mode, strategy); + return ResponseEntity.ok(masterDataRefreshService.refresh(request)); + } } diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java index 6fa62b2..e308b9e 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java @@ -54,14 +54,17 @@ public class YellowFoxD8BookingRowMapper { DriverRefDto driverRef = driverId == null && isBlank(driverCard) ? null - : new DriverRefDto(driverId == null ? null : driverId.toString(), new DriverCardRefDto(null, driverCard)); + : new DriverRefDto( + driverId == null ? null : driverId.toString(), + new DriverCardRefDto(YellowFoxReferenceSemantics.SYNTHETIC_REFERENCE_NATION, driverCard) + ); VehicleRefDto vehicleRef = vehicleId == null && isBlank(vehicleVin) && isBlank(vehicleVrn) ? null : new VehicleRefDto( vehicleId == null ? null : vehicleId.toString(), vehicleVin, vehicleId == null ? null : vehicleId.toString(), - new VehicleRegistrationRefDto(null, vehicleVrn) + new VehicleRegistrationRefDto(YellowFoxReferenceSemantics.SYNTHETIC_REFERENCE_NATION, vehicleVrn) ); return new YellowFoxD8BookingDto( diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanService.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanService.java index 502de8f..bc05399 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanService.java +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanService.java @@ -32,7 +32,7 @@ public class YellowFoxD8ConfiguredImportPlanService scope, plan.getEventFamilies(), mode, - false, + plan.isRefreshMasterDataFirst(), strategy ); } @@ -53,7 +53,7 @@ public class YellowFoxD8ConfiguredImportPlanService dto.scheduledMode(), dto.initialStrategy(), dto.scheduledStrategy(), - false, + dto.refreshMasterDataFirst(), dto.initialOccurredFrom(), dto.initialOccurredTo(), dto.runInitialOnStartup() diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportExecutionService.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportExecutionService.java index 8e925c7..189c425 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportExecutionService.java +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ImportExecutionService.java @@ -23,6 +23,7 @@ public class YellowFoxD8ImportExecutionService extends AbstractImportExecutionService { private final YellowFoxD8ImportPlanService planService; + private final YellowFoxMasterDataRefreshService masterDataRefreshService; private final YellowFoxD8ExtractionBatchExecutor extractionBatchExecutor; public YellowFoxD8ImportExecutionService( @@ -31,10 +32,12 @@ public class YellowFoxD8ImportExecutionService ImportRunRepository importRunRepository, DataPackageRepository dataPackageRepository, ImportCursorRepository importCursorRepository, + YellowFoxMasterDataRefreshService masterDataRefreshService, YellowFoxD8ExtractionBatchExecutor extractionBatchExecutor ) { super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository); this.planService = planService; + this.masterDataRefreshService = masterDataRefreshService; this.extractionBatchExecutor = extractionBatchExecutor; } @@ -64,6 +67,11 @@ public class YellowFoxD8ImportExecutionService return extractionBatchExecutor.execute(importRunId, packageId, eventSourceId, request, planItem, chunk); } + @Override + protected void beforeExecute(YellowFoxD8ImportRequest request) { + masterDataRefreshService.refreshIfRequested(request); + } + @Override protected EventSourceDto eventSourceForItem(EventSourceDto base, ImportPlanItemDto item) { return new EventSourceDto( diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxMasterDataRefreshService.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxMasterDataRefreshService.java new file mode 100644 index 0000000..1ea8069 --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxMasterDataRefreshService.java @@ -0,0 +1,326 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult; +import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert; +import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert; +import at.procon.eventhub.persistence.DriverIdentityRepository; +import at.procon.eventhub.persistence.EventSourceRepository; +import at.procon.eventhub.persistence.SourceMasterDataRepository; +import at.procon.eventhub.persistence.VehicleIdentityRepository; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.core.io.Resource; +import org.springframework.core.io.ResourceLoader; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Service; +import org.springframework.util.StreamUtils; + +@Service +public class YellowFoxMasterDataRefreshService { + + private static final Logger log = LoggerFactory.getLogger(YellowFoxMasterDataRefreshService.class); + + private static final List ENTITY_SQL_RESOURCES = List.of( + "classpath:sql/yellowfox/master-data/fleets.sql", + "classpath:sql/yellowfox/master-data/drivers.sql", + "classpath:sql/yellowfox/master-data/driver-cards.sql", + "classpath:sql/yellowfox/master-data/vehicles.sql", + "classpath:sql/yellowfox/master-data/vehicle-registrations.sql", + "classpath:sql/yellowfox/master-data/telematic-providers.sql" + ); + + private static final String RELATIONS_SQL_RESOURCE = "classpath:sql/yellowfox/master-data/relations.sql"; + private static final String MASTER_DATA_SOURCE_KIND = "MASTER_DATA"; + private static final String MASTER_DATA_SOURCE_KEY = "YELLOWFOX_MASTER_DATA"; + private static final int MASTER_DATA_UPSERT_BATCH_SIZE = 5000; + + private final ObjectProvider yellowFoxJdbcTemplateProvider; + private final SourceMasterDataRepository sourceMasterDataRepository; + private final EventSourceRepository eventSourceRepository; + private final DriverIdentityRepository driverIdentityRepository; + private final VehicleIdentityRepository vehicleIdentityRepository; + private final ResourceLoader resourceLoader; + + public YellowFoxMasterDataRefreshService( + @Qualifier("yellowFoxNamedParameterJdbcTemplate") ObjectProvider yellowFoxJdbcTemplateProvider, + SourceMasterDataRepository sourceMasterDataRepository, + EventSourceRepository eventSourceRepository, + DriverIdentityRepository driverIdentityRepository, + VehicleIdentityRepository vehicleIdentityRepository, + ResourceLoader resourceLoader + ) { + this.yellowFoxJdbcTemplateProvider = yellowFoxJdbcTemplateProvider; + this.sourceMasterDataRepository = sourceMasterDataRepository; + this.eventSourceRepository = eventSourceRepository; + this.driverIdentityRepository = driverIdentityRepository; + this.vehicleIdentityRepository = vehicleIdentityRepository; + this.resourceLoader = resourceLoader; + } + + public MasterDataRefreshResult refreshIfRequested(YellowFoxD8ImportRequest request) { + if (!request.refreshMasterDataFirst()) { + return MasterDataRefreshResult.empty(); + } + return refresh(request); + } + + public MasterDataRefreshResult refresh(YellowFoxD8ImportRequest request) { + NamedParameterJdbcTemplate yellowFoxJdbcTemplate = yellowFoxJdbcTemplateProvider.getIfAvailable(); + if (yellowFoxJdbcTemplate == null) { + log.info("Skipping YellowFox master-data refresh for tenant={} because no YellowFox datasource is configured.", + request.tenantKey()); + return MasterDataRefreshResult.empty(); + } + + String tenantKey = request.tenantKey() == null || request.tenantKey().isBlank() ? "default" : request.tenantKey().trim(); + EventSourceDto masterDataSource = masterDataSourceFor(request.eventSource()); + int eventSourceId = eventSourceRepository.resolveSourceId(tenantKey, masterDataSource); + + log.info("Starting YellowFox source master-data refresh tenant={} source={} batchSize={}", + tenantKey, masterDataSource.stableKey(), MASTER_DATA_UPSERT_BATCH_SIZE); + + int entities = 0; + for (String sqlResource : ENTITY_SQL_RESOURCES) { + entities += streamEntities(yellowFoxJdbcTemplate, tenantKey, eventSourceId, sqlResource, loadSql(sqlResource)); + } + + int relationCount = streamRelations(yellowFoxJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE)); + + log.info("Reconciling YellowFox driver identities from source master data tenant={} source={}", + tenantKey, masterDataSource.stableKey()); + int reconciledDrivers = driverIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId); + + log.info("Reconciling YellowFox vehicle identities from source master data tenant={} source={}", + tenantKey, masterDataSource.stableKey()); + int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId); + + MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount); + log.info("Refreshed YellowFox source master data tenant={} source={} entities={} relations={} reconciledDrivers={} reconciledVehicles={}", + tenantKey, masterDataSource.stableKey(), result.entitiesUpserted(), result.relationsUpserted(), reconciledDrivers, reconciledVehicles); + return result; + } + + private int streamEntities( + NamedParameterJdbcTemplate yellowFoxJdbcTemplate, + String tenantKey, + int eventSourceId, + String sqlResource, + String sql + ) { + String section = masterDataSection(sqlResource); + List buffer = new ArrayList<>(MASTER_DATA_UPSERT_BATCH_SIZE); + Map typeCounts = new LinkedHashMap<>(); + int[] count = {0}; + int[] chunk = {0}; + + log.info("Reading YellowFox master-data entities tenant={} section={}", tenantKey, section); + yellowFoxJdbcTemplate.query(sql, Map.of(), rs -> { + SourceMasterEntityUpsert entity = entity(rs); + buffer.add(entity); + increment(typeCounts, entity.entityType()); + if (buffer.size() >= MASTER_DATA_UPSERT_BATCH_SIZE) { + count[0] += flushEntities(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts); + buffer.clear(); + } + }); + if (!buffer.isEmpty()) { + count[0] += flushEntities(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts); + } + log.info("Finished YellowFox master-data entities tenant={} section={} processed={} byType={}", + tenantKey, section, count[0], typeCounts); + return count[0]; + } + + private int streamRelations( + NamedParameterJdbcTemplate yellowFoxJdbcTemplate, + String tenantKey, + int eventSourceId, + String sqlResource, + String sql + ) { + String section = masterDataSection(sqlResource); + List buffer = new ArrayList<>(MASTER_DATA_UPSERT_BATCH_SIZE); + Map typeCounts = new LinkedHashMap<>(); + int[] count = {0}; + int[] chunk = {0}; + + log.info("Reading YellowFox master-data relations tenant={} section={}", tenantKey, section); + yellowFoxJdbcTemplate.query(sql, Map.of(), rs -> { + SourceMasterRelationUpsert relation = relation(rs); + buffer.add(relation); + increment(typeCounts, relation.relationType()); + if (buffer.size() >= MASTER_DATA_UPSERT_BATCH_SIZE) { + count[0] += flushRelations(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts); + buffer.clear(); + } + }); + if (!buffer.isEmpty()) { + count[0] += flushRelations(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts); + } + log.info("Finished YellowFox master-data relations tenant={} section={} processed={} byType={}", + tenantKey, section, count[0], typeCounts); + return count[0]; + } + + private int flushEntities( + String tenantKey, + int eventSourceId, + String section, + int chunk, + List buffer, + int alreadyProcessed, + Map typeCounts + ) { + int upserted = sourceMasterDataRepository.upsertEntities(tenantKey, eventSourceId, buffer); + log.info("YellowFox master-data entity progress tenant={} section={} chunk={} chunkSize={} processed={} byType={}", + tenantKey, section, chunk, buffer.size(), alreadyProcessed + upserted, typeCounts); + return upserted; + } + + private int flushRelations( + String tenantKey, + int eventSourceId, + String section, + int chunk, + List buffer, + int alreadyProcessed, + Map typeCounts + ) { + int upserted = sourceMasterDataRepository.upsertRelations(tenantKey, eventSourceId, buffer); + log.info("YellowFox master-data relation progress tenant={} section={} chunk={} chunkSize={} processed={} byType={}", + tenantKey, section, chunk, buffer.size(), alreadyProcessed + upserted, typeCounts); + return upserted; + } + + private void increment(Map counts, String type) { + String key = type == null || type.isBlank() ? "UNKNOWN" : type; + counts.merge(key, 1, Integer::sum); + } + + private String masterDataSection(String sqlResource) { + int slash = sqlResource.lastIndexOf('/'); + String fileName = slash < 0 ? sqlResource : sqlResource.substring(slash + 1); + return fileName.endsWith(".sql") ? fileName.substring(0, fileName.length() - 4) : fileName; + } + + private EventSourceDto masterDataSourceFor(EventSourceDto source) { + return new EventSourceDto( + source.providerKey(), + MASTER_DATA_SOURCE_KIND, + MASTER_DATA_SOURCE_KEY, + source.sourceInstanceKey(), + source.tenantProviderSettingKey(), + source.externalFleetKey() + ); + } + + private SourceMasterEntityUpsert entity(ResultSet rs) throws SQLException { + String entityType = string(rs, "entity_type"); + String sourceEntityId = string(rs, "source_entity_id"); + return new SourceMasterEntityUpsert( + entityType, + sourceEntityId, + string(rs, "source_external_key"), + string(rs, "display_name"), + bool(rs, "active"), + offsetDateTime(rs, "valid_from"), + offsetDateTime(rs, "valid_to"), + offsetDateTime(rs, "source_updated_at"), + payload(rs) + ); + } + + private SourceMasterRelationUpsert relation(ResultSet rs) throws SQLException { + return new SourceMasterRelationUpsert( + string(rs, "relation_type"), + string(rs, "from_entity_type"), + string(rs, "from_source_entity_id"), + string(rs, "to_entity_type"), + string(rs, "to_source_entity_id"), + offsetDateTime(rs, "valid_from"), + offsetDateTime(rs, "valid_to"), + offsetDateTime(rs, "source_updated_at"), + payload(rs) + ); + } + + private Map payload(ResultSet rs) throws SQLException { + ResultSetMetaData metaData = rs.getMetaData(); + Map payload = new LinkedHashMap<>(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + String name = metaData.getColumnLabel(i); + Object value = rs.getObject(i); + if (value != null) { + payload.put(name, value); + } + } + return payload; + } + + private String loadSql(String location) { + Resource resource = resourceLoader.getResource(location); + try (var inputStream = resource.getInputStream()) { + return StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new IllegalStateException("Cannot load YellowFox master-data SQL resource " + location, e); + } + } + + private String string(ResultSet rs, String column) throws SQLException { + String value = rs.getString(column); + return value == null || value.isBlank() ? null : value.trim(); + } + + private Boolean bool(ResultSet rs, String column) throws SQLException { + Object value = rs.getObject(column); + if (value == null) { + return null; + } + if (value instanceof Boolean bool) { + return bool; + } + if (value instanceof Number number) { + return number.intValue() != 0; + } + return Boolean.parseBoolean(value.toString()); + } + + private OffsetDateTime offsetDateTime(ResultSet rs, String column) throws SQLException { + Object value = rs.getObject(column); + if (value == null) { + return null; + } + if (value instanceof OffsetDateTime offsetDateTime) { + return offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC); + } + if (value instanceof Timestamp timestamp) { + return timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC); + } + if (value instanceof LocalDateTime localDateTime) { + return localDateTime.atOffset(ZoneOffset.UTC); + } + String text = value.toString(); + try { + return OffsetDateTime.parse(text).withOffsetSameInstant(ZoneOffset.UTC); + } catch (RuntimeException ignored) { + return LocalDateTime.parse(text).atOffset(ZoneOffset.UTC); + } + } +} diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxReferenceSemantics.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxReferenceSemantics.java new file mode 100644 index 0000000..3003bd4 --- /dev/null +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxReferenceSemantics.java @@ -0,0 +1,9 @@ +package at.procon.eventhub.yellowfox.service; + +final class YellowFoxReferenceSemantics { + + static final String SYNTHETIC_REFERENCE_NATION = "YELLOWFOX"; + + private YellowFoxReferenceSemantics() { + } +} diff --git a/src/main/resources/sql/yellowfox/master-data/driver-cards.sql b/src/main/resources/sql/yellowfox/master-data/driver-cards.sql new file mode 100644 index 0000000..76839a5 --- /dev/null +++ b/src/main/resources/sql/yellowfox/master-data/driver-cards.sql @@ -0,0 +1,15 @@ +select + 'DRIVER_CARD' as entity_type, + cast(d.id as varchar(128)) as source_entity_id, + concat('YELLOWFOX:', d.drivers_card) as source_external_key, + d.drivers_card as display_name, + true as active, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + d.id as driver_id, + d.drivers_card as card_number, + 'YELLOWFOX' as card_nation, + d.fleet_id as fleet_id +from data.driver d +where nullif(trim(d.drivers_card), '') is not null; diff --git a/src/main/resources/sql/yellowfox/master-data/drivers.sql b/src/main/resources/sql/yellowfox/master-data/drivers.sql new file mode 100644 index 0000000..d2799db --- /dev/null +++ b/src/main/resources/sql/yellowfox/master-data/drivers.sql @@ -0,0 +1,14 @@ +select + 'DRIVER' as entity_type, + cast(d.id as varchar(128)) as source_entity_id, + coalesce(nullif(trim(d.drivers_card), ''), cast(d.id as varchar(128))) as source_external_key, + nullif(trim(concat_ws(' ', d.firstname, d.name)), '') as display_name, + true as active, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + d.id as driver_id, + d.firstname as first_names, + d.name as last_name, + d.fleet_id as fleet_id +from data.driver d; diff --git a/src/main/resources/sql/yellowfox/master-data/fleets.sql b/src/main/resources/sql/yellowfox/master-data/fleets.sql new file mode 100644 index 0000000..7d6bc42 --- /dev/null +++ b/src/main/resources/sql/yellowfox/master-data/fleets.sql @@ -0,0 +1,12 @@ +select + 'FLEET' as entity_type, + cast(f.id as varchar(128)) as source_entity_id, + cast(f.id as varchar(128)) as source_external_key, + nullif(trim(f.name), '') as display_name, + true as active, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + f.id as fleet_id, + f.name as fleet_name +from data.fleet f; diff --git a/src/main/resources/sql/yellowfox/master-data/relations.sql b/src/main/resources/sql/yellowfox/master-data/relations.sql new file mode 100644 index 0000000..055b485 --- /dev/null +++ b/src/main/resources/sql/yellowfox/master-data/relations.sql @@ -0,0 +1,77 @@ +select + 'DRIVER_FLEET' as relation_type, + 'DRIVER' as from_entity_type, + cast(d.id as varchar(128)) as from_source_entity_id, + 'FLEET' as to_entity_type, + cast(d.fleet_id as varchar(128)) as to_source_entity_id, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + 'data.driver' as source_table, + cast(d.id as varchar(128)) as source_row_id +from data.driver d +where d.fleet_id is not null + +union all + +select + 'DRIVER_CARD_DRIVER' as relation_type, + 'DRIVER_CARD' as from_entity_type, + cast(d.id as varchar(128)) as from_source_entity_id, + 'DRIVER' as to_entity_type, + cast(d.id as varchar(128)) as to_source_entity_id, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + 'data.driver' as source_table, + cast(d.id as varchar(128)) as source_row_id +from data.driver d +where nullif(trim(d.drivers_card), '') is not null + +union all + +select + 'VEHICLE_FLEET' as relation_type, + 'VEHICLE' as from_entity_type, + cast(v.id as varchar(128)) as from_source_entity_id, + 'FLEET' as to_entity_type, + cast(v.fleet_id as varchar(128)) as to_source_entity_id, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + 'data.vehicle' as source_table, + cast(v.id as varchar(128)) as source_row_id +from data.vehicle v +where v.fleet_id is not null + +union all + +select + 'VEHICLE_REGISTRATION_VEHICLE' as relation_type, + 'VEHICLE_REGISTRATION' as from_entity_type, + cast(v.id as varchar(128)) as from_source_entity_id, + 'VEHICLE' as to_entity_type, + cast(v.id as varchar(128)) as to_source_entity_id, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + 'data.vehicle' as source_table, + cast(v.id as varchar(128)) as source_row_id +from data.vehicle v +where nullif(trim(v.vrn), '') is not null + +union all + +select + 'VEHICLE_TELEMATIC_PROVIDER' as relation_type, + 'VEHICLE' as from_entity_type, + cast(v.id as varchar(128)) as from_source_entity_id, + 'TELEMATIC_PROVIDER' as to_entity_type, + cast(v.telematic_provider_id as varchar(128)) as to_source_entity_id, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + 'data.vehicle' as source_table, + cast(v.id as varchar(128)) as source_row_id +from data.vehicle v +where v.telematic_provider_id is not null; diff --git a/src/main/resources/sql/yellowfox/master-data/telematic-providers.sql b/src/main/resources/sql/yellowfox/master-data/telematic-providers.sql new file mode 100644 index 0000000..e7f5ab9 --- /dev/null +++ b/src/main/resources/sql/yellowfox/master-data/telematic-providers.sql @@ -0,0 +1,12 @@ +select + 'TELEMATIC_PROVIDER' as entity_type, + cast(tp.id as varchar(128)) as source_entity_id, + cast(tp.id as varchar(128)) as source_external_key, + nullif(trim(tp.name), '') as display_name, + true as active, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + tp.id as telematic_provider_id, + tp.name as telematic_provider_name +from data.telematic_provider tp; diff --git a/src/main/resources/sql/yellowfox/master-data/vehicle-registrations.sql b/src/main/resources/sql/yellowfox/master-data/vehicle-registrations.sql new file mode 100644 index 0000000..f218931 --- /dev/null +++ b/src/main/resources/sql/yellowfox/master-data/vehicle-registrations.sql @@ -0,0 +1,16 @@ +select + 'VEHICLE_REGISTRATION' as entity_type, + cast(v.id as varchar(128)) as source_entity_id, + concat('YELLOWFOX:', v.vrn) as source_external_key, + v.vrn as display_name, + true as active, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + v.id as vehicle_id, + v.vin as vin, + 'YELLOWFOX' as registration_nation, + v.vrn as registration_number, + v.fleet_id as fleet_id +from data.vehicle v +where nullif(trim(v.vrn), '') is not null; diff --git a/src/main/resources/sql/yellowfox/master-data/vehicles.sql b/src/main/resources/sql/yellowfox/master-data/vehicles.sql new file mode 100644 index 0000000..de08ff9 --- /dev/null +++ b/src/main/resources/sql/yellowfox/master-data/vehicles.sql @@ -0,0 +1,16 @@ +select + 'VEHICLE' as entity_type, + cast(v.id as varchar(128)) as source_entity_id, + coalesce(nullif(trim(v.vin), ''), cast(v.id as varchar(128))) as source_external_key, + coalesce(nullif(trim(v.vrn), ''), nullif(trim(v.vin), ''), cast(v.id as varchar(128))) as display_name, + true as active, + null::timestamptz as valid_from, + null::timestamptz as valid_to, + null::timestamptz as source_updated_at, + v.id as vehicle_id, + v.vin as vin, + v.vrn as registration_number, + 'YELLOWFOX' as registration_nation, + v.fleet_id as fleet_id, + v.telematic_provider_id as telematic_provider_id +from data.vehicle v;