Add YellowFox master-data refresh flow
This commit is contained in:
parent
de9c884578
commit
519711b214
|
|
@ -12,6 +12,10 @@
|
|||
{
|
||||
"key": "planKey",
|
||||
"value": "kralowetz-tachograph-org-147"
|
||||
},
|
||||
{
|
||||
"key": "yellowFoxPlanKey",
|
||||
"value": "yellowfox-d8-default"
|
||||
}
|
||||
],
|
||||
"item": [
|
||||
|
|
@ -376,6 +380,219 @@
|
|||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "POST /api/eventhub/acquisition/yellowfox/d8/imports/plan",
|
||||
"request": {
|
||||
"method": "POST",
|
||||
"header": [
|
||||
{
|
||||
"key": "Content-Type",
|
||||
"value": "application/json"
|
||||
}
|
||||
],
|
||||
"body": {
|
||||
"mode": "raw",
|
||||
"raw": "{\n \"tenantKey\": \"Procon\",\n \"eventSource\": {\n \"providerKey\": \"YELLOWFOX\",\n \"sourceKind\": \"TELEMATICS_PLATFORM\",\n \"sourceKey\": \"YELLOWFOX_D8\",\n \"sourceInstanceKey\": \"logistics-db-prod\",\n \"tenantProviderSettingKey\": \"yellowfox-main\",\n \"externalFleetKey\": null\n },\n \"sourceGroup\": null,\n \"importScope\": {\n \"type\": \"TENANT_ALL\",\n \"rootSourceOrganisation\": null,\n \"includeChildren\": false,\n \"occurredFrom\": null,\n \"occurredTo\": null\n },\n \"eventFamilies\": [\n \"DRIVER_ACTIVITY\",\n \"DRIVER_CARD\"\n ],\n \"mode\": \"INCREMENTAL_UPDATE\",\n \"refreshMasterDataFirst\": false,\n \"acquisitionStrategy\": \"SOURCE_ROW_WATERMARK\"\n}"
|
||||
},
|
||||
"url": {
|
||||
"raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/plan",
|
||||
"host": [
|
||||
"{{baseUrl}}"
|
||||
],
|
||||
"path": [
|
||||
"api",
|
||||
"eventhub",
|
||||
"acquisition",
|
||||
"yellowfox",
|
||||
"d8",
|
||||
"imports",
|
||||
"plan"
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "POST /api/eventhub/acquisition/yellowfox/d8/imports/start?execute=true",
|
||||
"request": {
|
||||
"method": "POST",
|
||||
"header": [
|
||||
{
|
||||
"key": "Content-Type",
|
||||
"value": "application/json"
|
||||
}
|
||||
],
|
||||
"body": {
|
||||
"mode": "raw",
|
||||
"raw": "{\n \"tenantKey\": \"Procon\",\n \"eventSource\": {\n \"providerKey\": \"YELLOWFOX\",\n \"sourceKind\": \"TELEMATICS_PLATFORM\",\n \"sourceKey\": \"YELLOWFOX_D8\",\n \"sourceInstanceKey\": \"logistics-db-prod\",\n \"tenantProviderSettingKey\": \"yellowfox-main\",\n \"externalFleetKey\": null\n },\n \"sourceGroup\": null,\n \"importScope\": {\n \"type\": \"TENANT_ALL\",\n \"rootSourceOrganisation\": null,\n \"includeChildren\": false,\n \"occurredFrom\": null,\n \"occurredTo\": null\n },\n \"eventFamilies\": [\n \"DRIVER_ACTIVITY\",\n \"DRIVER_CARD\"\n ],\n \"mode\": \"INCREMENTAL_UPDATE\",\n \"refreshMasterDataFirst\": false,\n \"acquisitionStrategy\": \"SOURCE_ROW_WATERMARK\"\n}"
|
||||
},
|
||||
"url": {
|
||||
"raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/start?execute=true",
|
||||
"host": [
|
||||
"{{baseUrl}}"
|
||||
],
|
||||
"path": [
|
||||
"api",
|
||||
"eventhub",
|
||||
"acquisition",
|
||||
"yellowfox",
|
||||
"d8",
|
||||
"imports",
|
||||
"start"
|
||||
],
|
||||
"query": [
|
||||
{
|
||||
"key": "execute",
|
||||
"value": "true"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "POST /api/eventhub/acquisition/yellowfox/d8/master-data/refresh",
|
||||
"request": {
|
||||
"method": "POST",
|
||||
"header": [
|
||||
{
|
||||
"key": "Content-Type",
|
||||
"value": "application/json"
|
||||
}
|
||||
],
|
||||
"body": {
|
||||
"mode": "raw",
|
||||
"raw": "{\n \"tenantKey\": \"Procon\",\n \"eventSource\": {\n \"providerKey\": \"YELLOWFOX\",\n \"sourceKind\": \"TELEMATICS_PLATFORM\",\n \"sourceKey\": \"YELLOWFOX_D8\",\n \"sourceInstanceKey\": \"logistics-db-prod\",\n \"tenantProviderSettingKey\": \"yellowfox-main\",\n \"externalFleetKey\": null\n },\n \"sourceGroup\": null,\n \"importScope\": {\n \"type\": \"TENANT_ALL\",\n \"rootSourceOrganisation\": null,\n \"includeChildren\": false,\n \"occurredFrom\": null,\n \"occurredTo\": null\n },\n \"eventFamilies\": [\n \"DRIVER_ACTIVITY\",\n \"DRIVER_CARD\"\n ],\n \"mode\": \"INCREMENTAL_UPDATE\",\n \"refreshMasterDataFirst\": true,\n \"acquisitionStrategy\": \"SOURCE_ROW_WATERMARK\"\n}"
|
||||
},
|
||||
"url": {
|
||||
"raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/master-data/refresh",
|
||||
"host": [
|
||||
"{{baseUrl}}"
|
||||
],
|
||||
"path": [
|
||||
"api",
|
||||
"eventhub",
|
||||
"acquisition",
|
||||
"yellowfox",
|
||||
"d8",
|
||||
"master-data",
|
||||
"refresh"
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "GET /api/eventhub/acquisition/yellowfox/d8/imports/configured-plans",
|
||||
"request": {
|
||||
"method": "GET",
|
||||
"url": {
|
||||
"raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/configured-plans",
|
||||
"host": [
|
||||
"{{baseUrl}}"
|
||||
],
|
||||
"path": [
|
||||
"api",
|
||||
"eventhub",
|
||||
"acquisition",
|
||||
"yellowfox",
|
||||
"d8",
|
||||
"imports",
|
||||
"configured-plans"
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "GET /api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{planKey}",
|
||||
"request": {
|
||||
"method": "GET",
|
||||
"url": {
|
||||
"raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{{yellowFoxPlanKey}}",
|
||||
"host": [
|
||||
"{{baseUrl}}"
|
||||
],
|
||||
"path": [
|
||||
"api",
|
||||
"eventhub",
|
||||
"acquisition",
|
||||
"yellowfox",
|
||||
"d8",
|
||||
"imports",
|
||||
"configured-plans",
|
||||
"{{yellowFoxPlanKey}}"
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "POST /api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{planKey}/start",
|
||||
"request": {
|
||||
"method": "POST",
|
||||
"url": {
|
||||
"raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{{yellowFoxPlanKey}}/start?triggerMode=EXECUTE&mode=INCREMENTAL_UPDATE&strategy=SOURCE_ROW_WATERMARK",
|
||||
"host": [
|
||||
"{{baseUrl}}"
|
||||
],
|
||||
"path": [
|
||||
"api",
|
||||
"eventhub",
|
||||
"acquisition",
|
||||
"yellowfox",
|
||||
"d8",
|
||||
"imports",
|
||||
"configured-plans",
|
||||
"{{yellowFoxPlanKey}}",
|
||||
"start"
|
||||
],
|
||||
"query": [
|
||||
{
|
||||
"key": "triggerMode",
|
||||
"value": "EXECUTE"
|
||||
},
|
||||
{
|
||||
"key": "mode",
|
||||
"value": "INCREMENTAL_UPDATE"
|
||||
},
|
||||
{
|
||||
"key": "strategy",
|
||||
"value": "SOURCE_ROW_WATERMARK"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "POST /api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{planKey}/master-data/refresh",
|
||||
"request": {
|
||||
"method": "POST",
|
||||
"url": {
|
||||
"raw": "{{baseUrl}}/api/eventhub/acquisition/yellowfox/d8/imports/configured-plans/{{yellowFoxPlanKey}}/master-data/refresh?mode=INCREMENTAL_UPDATE&strategy=SOURCE_ROW_WATERMARK",
|
||||
"host": [
|
||||
"{{baseUrl}}"
|
||||
],
|
||||
"path": [
|
||||
"api",
|
||||
"eventhub",
|
||||
"acquisition",
|
||||
"yellowfox",
|
||||
"d8",
|
||||
"imports",
|
||||
"configured-plans",
|
||||
"{{yellowFoxPlanKey}}",
|
||||
"master-data",
|
||||
"refresh"
|
||||
],
|
||||
"query": [
|
||||
{
|
||||
"key": "mode",
|
||||
"value": "INCREMENTAL_UPDATE"
|
||||
},
|
||||
{
|
||||
"key": "strategy",
|
||||
"value": "SOURCE_ROW_WATERMARK"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package at.procon.eventhub.yellowfox.api;
|
|||
import at.procon.eventhub.dto.AcquisitionStrategy;
|
||||
import at.procon.eventhub.dto.ImportMode;
|
||||
import at.procon.eventhub.dto.SchedulerTriggerMode;
|
||||
import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult;
|
||||
import at.procon.eventhub.yellowfox.dto.ConfiguredYellowFoxD8ImportPlanDto;
|
||||
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
|
||||
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRunResultDto;
|
||||
|
|
@ -10,6 +11,7 @@ import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportTriggerResultDto;
|
|||
import at.procon.eventhub.yellowfox.service.YellowFoxD8ConfiguredImportPlanService;
|
||||
import at.procon.eventhub.yellowfox.service.YellowFoxD8ImportExecutionService;
|
||||
import at.procon.eventhub.yellowfox.service.YellowFoxD8ImportPlanService;
|
||||
import at.procon.eventhub.yellowfox.service.YellowFoxMasterDataRefreshService;
|
||||
import jakarta.validation.Valid;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.List;
|
||||
|
|
@ -31,17 +33,20 @@ public class YellowFoxD8ImportController {
|
|||
private final YellowFoxD8ImportPlanService importPlanService;
|
||||
private final YellowFoxD8ConfiguredImportPlanService configuredImportPlanService;
|
||||
private final YellowFoxD8ImportExecutionService importExecutionService;
|
||||
private final YellowFoxMasterDataRefreshService masterDataRefreshService;
|
||||
|
||||
public YellowFoxD8ImportController(
|
||||
ProducerTemplate producerTemplate,
|
||||
YellowFoxD8ImportPlanService importPlanService,
|
||||
YellowFoxD8ConfiguredImportPlanService configuredImportPlanService,
|
||||
YellowFoxD8ImportExecutionService importExecutionService
|
||||
YellowFoxD8ImportExecutionService importExecutionService,
|
||||
YellowFoxMasterDataRefreshService masterDataRefreshService
|
||||
) {
|
||||
this.producerTemplate = producerTemplate;
|
||||
this.importPlanService = importPlanService;
|
||||
this.configuredImportPlanService = configuredImportPlanService;
|
||||
this.importExecutionService = importExecutionService;
|
||||
this.masterDataRefreshService = masterDataRefreshService;
|
||||
}
|
||||
|
||||
@PostMapping("/imports/plan")
|
||||
|
|
@ -60,6 +65,13 @@ public class YellowFoxD8ImportController {
|
|||
return ResponseEntity.accepted().body(result);
|
||||
}
|
||||
|
||||
@PostMapping("/master-data/refresh")
|
||||
public ResponseEntity<MasterDataRefreshResult> refreshYellowFoxMasterData(
|
||||
@Valid @RequestBody YellowFoxD8ImportRequest request
|
||||
) {
|
||||
return ResponseEntity.ok(masterDataRefreshService.refresh(request));
|
||||
}
|
||||
|
||||
@GetMapping("/imports/configured-plans")
|
||||
public ResponseEntity<List<ConfiguredYellowFoxD8ImportPlanDto>> listConfiguredYellowFoxPlans() {
|
||||
return ResponseEntity.ok(configuredImportPlanService.listPlans());
|
||||
|
|
@ -90,4 +102,14 @@ public class YellowFoxD8ImportController {
|
|||
result
|
||||
));
|
||||
}
|
||||
|
||||
@PostMapping("/imports/configured-plans/{planKey}/master-data/refresh")
|
||||
public ResponseEntity<MasterDataRefreshResult> refreshConfiguredYellowFoxMasterData(
|
||||
@PathVariable String planKey,
|
||||
@RequestParam(required = false) ImportMode mode,
|
||||
@RequestParam(required = false) AcquisitionStrategy strategy
|
||||
) {
|
||||
YellowFoxD8ImportRequest request = configuredImportPlanService.createRequest(planKey, mode, strategy);
|
||||
return ResponseEntity.ok(masterDataRefreshService.refresh(request));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -54,14 +54,17 @@ public class YellowFoxD8BookingRowMapper {
|
|||
|
||||
DriverRefDto driverRef = driverId == null && isBlank(driverCard)
|
||||
? null
|
||||
: new DriverRefDto(driverId == null ? null : driverId.toString(), new DriverCardRefDto(null, driverCard));
|
||||
: new DriverRefDto(
|
||||
driverId == null ? null : driverId.toString(),
|
||||
new DriverCardRefDto(YellowFoxReferenceSemantics.SYNTHETIC_REFERENCE_NATION, driverCard)
|
||||
);
|
||||
VehicleRefDto vehicleRef = vehicleId == null && isBlank(vehicleVin) && isBlank(vehicleVrn)
|
||||
? null
|
||||
: new VehicleRefDto(
|
||||
vehicleId == null ? null : vehicleId.toString(),
|
||||
vehicleVin,
|
||||
vehicleId == null ? null : vehicleId.toString(),
|
||||
new VehicleRegistrationRefDto(null, vehicleVrn)
|
||||
new VehicleRegistrationRefDto(YellowFoxReferenceSemantics.SYNTHETIC_REFERENCE_NATION, vehicleVrn)
|
||||
);
|
||||
|
||||
return new YellowFoxD8BookingDto(
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ public class YellowFoxD8ConfiguredImportPlanService
|
|||
scope,
|
||||
plan.getEventFamilies(),
|
||||
mode,
|
||||
false,
|
||||
plan.isRefreshMasterDataFirst(),
|
||||
strategy
|
||||
);
|
||||
}
|
||||
|
|
@ -53,7 +53,7 @@ public class YellowFoxD8ConfiguredImportPlanService
|
|||
dto.scheduledMode(),
|
||||
dto.initialStrategy(),
|
||||
dto.scheduledStrategy(),
|
||||
false,
|
||||
dto.refreshMasterDataFirst(),
|
||||
dto.initialOccurredFrom(),
|
||||
dto.initialOccurredTo(),
|
||||
dto.runInitialOnStartup()
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ public class YellowFoxD8ImportExecutionService
|
|||
extends AbstractImportExecutionService<YellowFoxD8ImportRequest, YellowFoxD8ExtractionBatchResultDto> {
|
||||
|
||||
private final YellowFoxD8ImportPlanService planService;
|
||||
private final YellowFoxMasterDataRefreshService masterDataRefreshService;
|
||||
private final YellowFoxD8ExtractionBatchExecutor extractionBatchExecutor;
|
||||
|
||||
public YellowFoxD8ImportExecutionService(
|
||||
|
|
@ -31,10 +32,12 @@ public class YellowFoxD8ImportExecutionService
|
|||
ImportRunRepository importRunRepository,
|
||||
DataPackageRepository dataPackageRepository,
|
||||
ImportCursorRepository importCursorRepository,
|
||||
YellowFoxMasterDataRefreshService masterDataRefreshService,
|
||||
YellowFoxD8ExtractionBatchExecutor extractionBatchExecutor
|
||||
) {
|
||||
super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository);
|
||||
this.planService = planService;
|
||||
this.masterDataRefreshService = masterDataRefreshService;
|
||||
this.extractionBatchExecutor = extractionBatchExecutor;
|
||||
}
|
||||
|
||||
|
|
@ -64,6 +67,11 @@ public class YellowFoxD8ImportExecutionService
|
|||
return extractionBatchExecutor.execute(importRunId, packageId, eventSourceId, request, planItem, chunk);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeExecute(YellowFoxD8ImportRequest request) {
|
||||
masterDataRefreshService.refreshIfRequested(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventSourceDto eventSourceForItem(EventSourceDto base, ImportPlanItemDto item) {
|
||||
return new EventSourceDto(
|
||||
|
|
|
|||
|
|
@ -0,0 +1,326 @@
|
|||
package at.procon.eventhub.yellowfox.service;
|
||||
|
||||
import at.procon.eventhub.dto.EventSourceDto;
|
||||
import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult;
|
||||
import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert;
|
||||
import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert;
|
||||
import at.procon.eventhub.persistence.DriverIdentityRepository;
|
||||
import at.procon.eventhub.persistence.EventSourceRepository;
|
||||
import at.procon.eventhub.persistence.SourceMasterDataRepository;
|
||||
import at.procon.eventhub.persistence.VehicleIdentityRepository;
|
||||
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.core.io.ResourceLoader;
|
||||
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StreamUtils;
|
||||
|
||||
@Service
|
||||
public class YellowFoxMasterDataRefreshService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(YellowFoxMasterDataRefreshService.class);
|
||||
|
||||
private static final List<String> ENTITY_SQL_RESOURCES = List.of(
|
||||
"classpath:sql/yellowfox/master-data/fleets.sql",
|
||||
"classpath:sql/yellowfox/master-data/drivers.sql",
|
||||
"classpath:sql/yellowfox/master-data/driver-cards.sql",
|
||||
"classpath:sql/yellowfox/master-data/vehicles.sql",
|
||||
"classpath:sql/yellowfox/master-data/vehicle-registrations.sql",
|
||||
"classpath:sql/yellowfox/master-data/telematic-providers.sql"
|
||||
);
|
||||
|
||||
private static final String RELATIONS_SQL_RESOURCE = "classpath:sql/yellowfox/master-data/relations.sql";
|
||||
private static final String MASTER_DATA_SOURCE_KIND = "MASTER_DATA";
|
||||
private static final String MASTER_DATA_SOURCE_KEY = "YELLOWFOX_MASTER_DATA";
|
||||
private static final int MASTER_DATA_UPSERT_BATCH_SIZE = 5000;
|
||||
|
||||
private final ObjectProvider<NamedParameterJdbcTemplate> yellowFoxJdbcTemplateProvider;
|
||||
private final SourceMasterDataRepository sourceMasterDataRepository;
|
||||
private final EventSourceRepository eventSourceRepository;
|
||||
private final DriverIdentityRepository driverIdentityRepository;
|
||||
private final VehicleIdentityRepository vehicleIdentityRepository;
|
||||
private final ResourceLoader resourceLoader;
|
||||
|
||||
public YellowFoxMasterDataRefreshService(
|
||||
@Qualifier("yellowFoxNamedParameterJdbcTemplate") ObjectProvider<NamedParameterJdbcTemplate> yellowFoxJdbcTemplateProvider,
|
||||
SourceMasterDataRepository sourceMasterDataRepository,
|
||||
EventSourceRepository eventSourceRepository,
|
||||
DriverIdentityRepository driverIdentityRepository,
|
||||
VehicleIdentityRepository vehicleIdentityRepository,
|
||||
ResourceLoader resourceLoader
|
||||
) {
|
||||
this.yellowFoxJdbcTemplateProvider = yellowFoxJdbcTemplateProvider;
|
||||
this.sourceMasterDataRepository = sourceMasterDataRepository;
|
||||
this.eventSourceRepository = eventSourceRepository;
|
||||
this.driverIdentityRepository = driverIdentityRepository;
|
||||
this.vehicleIdentityRepository = vehicleIdentityRepository;
|
||||
this.resourceLoader = resourceLoader;
|
||||
}
|
||||
|
||||
public MasterDataRefreshResult refreshIfRequested(YellowFoxD8ImportRequest request) {
|
||||
if (!request.refreshMasterDataFirst()) {
|
||||
return MasterDataRefreshResult.empty();
|
||||
}
|
||||
return refresh(request);
|
||||
}
|
||||
|
||||
public MasterDataRefreshResult refresh(YellowFoxD8ImportRequest request) {
|
||||
NamedParameterJdbcTemplate yellowFoxJdbcTemplate = yellowFoxJdbcTemplateProvider.getIfAvailable();
|
||||
if (yellowFoxJdbcTemplate == null) {
|
||||
log.info("Skipping YellowFox master-data refresh for tenant={} because no YellowFox datasource is configured.",
|
||||
request.tenantKey());
|
||||
return MasterDataRefreshResult.empty();
|
||||
}
|
||||
|
||||
String tenantKey = request.tenantKey() == null || request.tenantKey().isBlank() ? "default" : request.tenantKey().trim();
|
||||
EventSourceDto masterDataSource = masterDataSourceFor(request.eventSource());
|
||||
int eventSourceId = eventSourceRepository.resolveSourceId(tenantKey, masterDataSource);
|
||||
|
||||
log.info("Starting YellowFox source master-data refresh tenant={} source={} batchSize={}",
|
||||
tenantKey, masterDataSource.stableKey(), MASTER_DATA_UPSERT_BATCH_SIZE);
|
||||
|
||||
int entities = 0;
|
||||
for (String sqlResource : ENTITY_SQL_RESOURCES) {
|
||||
entities += streamEntities(yellowFoxJdbcTemplate, tenantKey, eventSourceId, sqlResource, loadSql(sqlResource));
|
||||
}
|
||||
|
||||
int relationCount = streamRelations(yellowFoxJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE));
|
||||
|
||||
log.info("Reconciling YellowFox driver identities from source master data tenant={} source={}",
|
||||
tenantKey, masterDataSource.stableKey());
|
||||
int reconciledDrivers = driverIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId);
|
||||
|
||||
log.info("Reconciling YellowFox vehicle identities from source master data tenant={} source={}",
|
||||
tenantKey, masterDataSource.stableKey());
|
||||
int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId);
|
||||
|
||||
MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount);
|
||||
log.info("Refreshed YellowFox source master data tenant={} source={} entities={} relations={} reconciledDrivers={} reconciledVehicles={}",
|
||||
tenantKey, masterDataSource.stableKey(), result.entitiesUpserted(), result.relationsUpserted(), reconciledDrivers, reconciledVehicles);
|
||||
return result;
|
||||
}
|
||||
|
||||
private int streamEntities(
|
||||
NamedParameterJdbcTemplate yellowFoxJdbcTemplate,
|
||||
String tenantKey,
|
||||
int eventSourceId,
|
||||
String sqlResource,
|
||||
String sql
|
||||
) {
|
||||
String section = masterDataSection(sqlResource);
|
||||
List<SourceMasterEntityUpsert> buffer = new ArrayList<>(MASTER_DATA_UPSERT_BATCH_SIZE);
|
||||
Map<String, Integer> typeCounts = new LinkedHashMap<>();
|
||||
int[] count = {0};
|
||||
int[] chunk = {0};
|
||||
|
||||
log.info("Reading YellowFox master-data entities tenant={} section={}", tenantKey, section);
|
||||
yellowFoxJdbcTemplate.query(sql, Map.of(), rs -> {
|
||||
SourceMasterEntityUpsert entity = entity(rs);
|
||||
buffer.add(entity);
|
||||
increment(typeCounts, entity.entityType());
|
||||
if (buffer.size() >= MASTER_DATA_UPSERT_BATCH_SIZE) {
|
||||
count[0] += flushEntities(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts);
|
||||
buffer.clear();
|
||||
}
|
||||
});
|
||||
if (!buffer.isEmpty()) {
|
||||
count[0] += flushEntities(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts);
|
||||
}
|
||||
log.info("Finished YellowFox master-data entities tenant={} section={} processed={} byType={}",
|
||||
tenantKey, section, count[0], typeCounts);
|
||||
return count[0];
|
||||
}
|
||||
|
||||
private int streamRelations(
|
||||
NamedParameterJdbcTemplate yellowFoxJdbcTemplate,
|
||||
String tenantKey,
|
||||
int eventSourceId,
|
||||
String sqlResource,
|
||||
String sql
|
||||
) {
|
||||
String section = masterDataSection(sqlResource);
|
||||
List<SourceMasterRelationUpsert> buffer = new ArrayList<>(MASTER_DATA_UPSERT_BATCH_SIZE);
|
||||
Map<String, Integer> typeCounts = new LinkedHashMap<>();
|
||||
int[] count = {0};
|
||||
int[] chunk = {0};
|
||||
|
||||
log.info("Reading YellowFox master-data relations tenant={} section={}", tenantKey, section);
|
||||
yellowFoxJdbcTemplate.query(sql, Map.of(), rs -> {
|
||||
SourceMasterRelationUpsert relation = relation(rs);
|
||||
buffer.add(relation);
|
||||
increment(typeCounts, relation.relationType());
|
||||
if (buffer.size() >= MASTER_DATA_UPSERT_BATCH_SIZE) {
|
||||
count[0] += flushRelations(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts);
|
||||
buffer.clear();
|
||||
}
|
||||
});
|
||||
if (!buffer.isEmpty()) {
|
||||
count[0] += flushRelations(tenantKey, eventSourceId, section, ++chunk[0], buffer, count[0], typeCounts);
|
||||
}
|
||||
log.info("Finished YellowFox master-data relations tenant={} section={} processed={} byType={}",
|
||||
tenantKey, section, count[0], typeCounts);
|
||||
return count[0];
|
||||
}
|
||||
|
||||
private int flushEntities(
|
||||
String tenantKey,
|
||||
int eventSourceId,
|
||||
String section,
|
||||
int chunk,
|
||||
List<SourceMasterEntityUpsert> buffer,
|
||||
int alreadyProcessed,
|
||||
Map<String, Integer> typeCounts
|
||||
) {
|
||||
int upserted = sourceMasterDataRepository.upsertEntities(tenantKey, eventSourceId, buffer);
|
||||
log.info("YellowFox master-data entity progress tenant={} section={} chunk={} chunkSize={} processed={} byType={}",
|
||||
tenantKey, section, chunk, buffer.size(), alreadyProcessed + upserted, typeCounts);
|
||||
return upserted;
|
||||
}
|
||||
|
||||
private int flushRelations(
|
||||
String tenantKey,
|
||||
int eventSourceId,
|
||||
String section,
|
||||
int chunk,
|
||||
List<SourceMasterRelationUpsert> buffer,
|
||||
int alreadyProcessed,
|
||||
Map<String, Integer> typeCounts
|
||||
) {
|
||||
int upserted = sourceMasterDataRepository.upsertRelations(tenantKey, eventSourceId, buffer);
|
||||
log.info("YellowFox master-data relation progress tenant={} section={} chunk={} chunkSize={} processed={} byType={}",
|
||||
tenantKey, section, chunk, buffer.size(), alreadyProcessed + upserted, typeCounts);
|
||||
return upserted;
|
||||
}
|
||||
|
||||
private void increment(Map<String, Integer> counts, String type) {
|
||||
String key = type == null || type.isBlank() ? "UNKNOWN" : type;
|
||||
counts.merge(key, 1, Integer::sum);
|
||||
}
|
||||
|
||||
private String masterDataSection(String sqlResource) {
|
||||
int slash = sqlResource.lastIndexOf('/');
|
||||
String fileName = slash < 0 ? sqlResource : sqlResource.substring(slash + 1);
|
||||
return fileName.endsWith(".sql") ? fileName.substring(0, fileName.length() - 4) : fileName;
|
||||
}
|
||||
|
||||
private EventSourceDto masterDataSourceFor(EventSourceDto source) {
|
||||
return new EventSourceDto(
|
||||
source.providerKey(),
|
||||
MASTER_DATA_SOURCE_KIND,
|
||||
MASTER_DATA_SOURCE_KEY,
|
||||
source.sourceInstanceKey(),
|
||||
source.tenantProviderSettingKey(),
|
||||
source.externalFleetKey()
|
||||
);
|
||||
}
|
||||
|
||||
private SourceMasterEntityUpsert entity(ResultSet rs) throws SQLException {
|
||||
String entityType = string(rs, "entity_type");
|
||||
String sourceEntityId = string(rs, "source_entity_id");
|
||||
return new SourceMasterEntityUpsert(
|
||||
entityType,
|
||||
sourceEntityId,
|
||||
string(rs, "source_external_key"),
|
||||
string(rs, "display_name"),
|
||||
bool(rs, "active"),
|
||||
offsetDateTime(rs, "valid_from"),
|
||||
offsetDateTime(rs, "valid_to"),
|
||||
offsetDateTime(rs, "source_updated_at"),
|
||||
payload(rs)
|
||||
);
|
||||
}
|
||||
|
||||
private SourceMasterRelationUpsert relation(ResultSet rs) throws SQLException {
|
||||
return new SourceMasterRelationUpsert(
|
||||
string(rs, "relation_type"),
|
||||
string(rs, "from_entity_type"),
|
||||
string(rs, "from_source_entity_id"),
|
||||
string(rs, "to_entity_type"),
|
||||
string(rs, "to_source_entity_id"),
|
||||
offsetDateTime(rs, "valid_from"),
|
||||
offsetDateTime(rs, "valid_to"),
|
||||
offsetDateTime(rs, "source_updated_at"),
|
||||
payload(rs)
|
||||
);
|
||||
}
|
||||
|
||||
private Map<String, Object> payload(ResultSet rs) throws SQLException {
|
||||
ResultSetMetaData metaData = rs.getMetaData();
|
||||
Map<String, Object> payload = new LinkedHashMap<>();
|
||||
for (int i = 1; i <= metaData.getColumnCount(); i++) {
|
||||
String name = metaData.getColumnLabel(i);
|
||||
Object value = rs.getObject(i);
|
||||
if (value != null) {
|
||||
payload.put(name, value);
|
||||
}
|
||||
}
|
||||
return payload;
|
||||
}
|
||||
|
||||
private String loadSql(String location) {
|
||||
Resource resource = resourceLoader.getResource(location);
|
||||
try (var inputStream = resource.getInputStream()) {
|
||||
return StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8);
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Cannot load YellowFox master-data SQL resource " + location, e);
|
||||
}
|
||||
}
|
||||
|
||||
private String string(ResultSet rs, String column) throws SQLException {
|
||||
String value = rs.getString(column);
|
||||
return value == null || value.isBlank() ? null : value.trim();
|
||||
}
|
||||
|
||||
private Boolean bool(ResultSet rs, String column) throws SQLException {
|
||||
Object value = rs.getObject(column);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof Boolean bool) {
|
||||
return bool;
|
||||
}
|
||||
if (value instanceof Number number) {
|
||||
return number.intValue() != 0;
|
||||
}
|
||||
return Boolean.parseBoolean(value.toString());
|
||||
}
|
||||
|
||||
private OffsetDateTime offsetDateTime(ResultSet rs, String column) throws SQLException {
|
||||
Object value = rs.getObject(column);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof OffsetDateTime offsetDateTime) {
|
||||
return offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC);
|
||||
}
|
||||
if (value instanceof Timestamp timestamp) {
|
||||
return timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC);
|
||||
}
|
||||
if (value instanceof LocalDateTime localDateTime) {
|
||||
return localDateTime.atOffset(ZoneOffset.UTC);
|
||||
}
|
||||
String text = value.toString();
|
||||
try {
|
||||
return OffsetDateTime.parse(text).withOffsetSameInstant(ZoneOffset.UTC);
|
||||
} catch (RuntimeException ignored) {
|
||||
return LocalDateTime.parse(text).atOffset(ZoneOffset.UTC);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
package at.procon.eventhub.yellowfox.service;
|
||||
|
||||
final class YellowFoxReferenceSemantics {
|
||||
|
||||
static final String SYNTHETIC_REFERENCE_NATION = "YELLOWFOX";
|
||||
|
||||
private YellowFoxReferenceSemantics() {
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
select
|
||||
'DRIVER_CARD' as entity_type,
|
||||
cast(d.id as varchar(128)) as source_entity_id,
|
||||
concat('YELLOWFOX:', d.drivers_card) as source_external_key,
|
||||
d.drivers_card as display_name,
|
||||
true as active,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
d.id as driver_id,
|
||||
d.drivers_card as card_number,
|
||||
'YELLOWFOX' as card_nation,
|
||||
d.fleet_id as fleet_id
|
||||
from data.driver d
|
||||
where nullif(trim(d.drivers_card), '') is not null;
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
select
|
||||
'DRIVER' as entity_type,
|
||||
cast(d.id as varchar(128)) as source_entity_id,
|
||||
coalesce(nullif(trim(d.drivers_card), ''), cast(d.id as varchar(128))) as source_external_key,
|
||||
nullif(trim(concat_ws(' ', d.firstname, d.name)), '') as display_name,
|
||||
true as active,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
d.id as driver_id,
|
||||
d.firstname as first_names,
|
||||
d.name as last_name,
|
||||
d.fleet_id as fleet_id
|
||||
from data.driver d;
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
select
|
||||
'FLEET' as entity_type,
|
||||
cast(f.id as varchar(128)) as source_entity_id,
|
||||
cast(f.id as varchar(128)) as source_external_key,
|
||||
nullif(trim(f.name), '') as display_name,
|
||||
true as active,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
f.id as fleet_id,
|
||||
f.name as fleet_name
|
||||
from data.fleet f;
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
select
|
||||
'DRIVER_FLEET' as relation_type,
|
||||
'DRIVER' as from_entity_type,
|
||||
cast(d.id as varchar(128)) as from_source_entity_id,
|
||||
'FLEET' as to_entity_type,
|
||||
cast(d.fleet_id as varchar(128)) as to_source_entity_id,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
'data.driver' as source_table,
|
||||
cast(d.id as varchar(128)) as source_row_id
|
||||
from data.driver d
|
||||
where d.fleet_id is not null
|
||||
|
||||
union all
|
||||
|
||||
select
|
||||
'DRIVER_CARD_DRIVER' as relation_type,
|
||||
'DRIVER_CARD' as from_entity_type,
|
||||
cast(d.id as varchar(128)) as from_source_entity_id,
|
||||
'DRIVER' as to_entity_type,
|
||||
cast(d.id as varchar(128)) as to_source_entity_id,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
'data.driver' as source_table,
|
||||
cast(d.id as varchar(128)) as source_row_id
|
||||
from data.driver d
|
||||
where nullif(trim(d.drivers_card), '') is not null
|
||||
|
||||
union all
|
||||
|
||||
select
|
||||
'VEHICLE_FLEET' as relation_type,
|
||||
'VEHICLE' as from_entity_type,
|
||||
cast(v.id as varchar(128)) as from_source_entity_id,
|
||||
'FLEET' as to_entity_type,
|
||||
cast(v.fleet_id as varchar(128)) as to_source_entity_id,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
'data.vehicle' as source_table,
|
||||
cast(v.id as varchar(128)) as source_row_id
|
||||
from data.vehicle v
|
||||
where v.fleet_id is not null
|
||||
|
||||
union all
|
||||
|
||||
select
|
||||
'VEHICLE_REGISTRATION_VEHICLE' as relation_type,
|
||||
'VEHICLE_REGISTRATION' as from_entity_type,
|
||||
cast(v.id as varchar(128)) as from_source_entity_id,
|
||||
'VEHICLE' as to_entity_type,
|
||||
cast(v.id as varchar(128)) as to_source_entity_id,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
'data.vehicle' as source_table,
|
||||
cast(v.id as varchar(128)) as source_row_id
|
||||
from data.vehicle v
|
||||
where nullif(trim(v.vrn), '') is not null
|
||||
|
||||
union all
|
||||
|
||||
select
|
||||
'VEHICLE_TELEMATIC_PROVIDER' as relation_type,
|
||||
'VEHICLE' as from_entity_type,
|
||||
cast(v.id as varchar(128)) as from_source_entity_id,
|
||||
'TELEMATIC_PROVIDER' as to_entity_type,
|
||||
cast(v.telematic_provider_id as varchar(128)) as to_source_entity_id,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
'data.vehicle' as source_table,
|
||||
cast(v.id as varchar(128)) as source_row_id
|
||||
from data.vehicle v
|
||||
where v.telematic_provider_id is not null;
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
select
|
||||
'TELEMATIC_PROVIDER' as entity_type,
|
||||
cast(tp.id as varchar(128)) as source_entity_id,
|
||||
cast(tp.id as varchar(128)) as source_external_key,
|
||||
nullif(trim(tp.name), '') as display_name,
|
||||
true as active,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
tp.id as telematic_provider_id,
|
||||
tp.name as telematic_provider_name
|
||||
from data.telematic_provider tp;
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
select
|
||||
'VEHICLE_REGISTRATION' as entity_type,
|
||||
cast(v.id as varchar(128)) as source_entity_id,
|
||||
concat('YELLOWFOX:', v.vrn) as source_external_key,
|
||||
v.vrn as display_name,
|
||||
true as active,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
v.id as vehicle_id,
|
||||
v.vin as vin,
|
||||
'YELLOWFOX' as registration_nation,
|
||||
v.vrn as registration_number,
|
||||
v.fleet_id as fleet_id
|
||||
from data.vehicle v
|
||||
where nullif(trim(v.vrn), '') is not null;
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
select
|
||||
'VEHICLE' as entity_type,
|
||||
cast(v.id as varchar(128)) as source_entity_id,
|
||||
coalesce(nullif(trim(v.vin), ''), cast(v.id as varchar(128))) as source_external_key,
|
||||
coalesce(nullif(trim(v.vrn), ''), nullif(trim(v.vin), ''), cast(v.id as varchar(128))) as display_name,
|
||||
true as active,
|
||||
null::timestamptz as valid_from,
|
||||
null::timestamptz as valid_to,
|
||||
null::timestamptz as source_updated_at,
|
||||
v.id as vehicle_id,
|
||||
v.vin as vin,
|
||||
v.vrn as registration_number,
|
||||
'YELLOWFOX' as registration_nation,
|
||||
v.fleet_id as fleet_id,
|
||||
v.telematic_provider_id as telematic_provider_id
|
||||
from data.vehicle v;
|
||||
Loading…
Reference in New Issue