From e84dfef614b18c60f1540ffc17e83e946ca68a1a Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Fri, 8 May 2026 16:50:34 +0200 Subject: [PATCH] Fix YellowFox bounded import cursor and ignition handling --- docs/sql/truncate-eventhub-data.sql | 21 ++ .../eventhub/config/EventHubProperties.java | 22 ++ .../AbstractConfiguredImportPlanService.java | 27 +- .../importing/ImportChunkPlanner.java | 7 +- .../AbstractJdbcExtractionBatchExecutor.java | 38 ++- .../persistence/ImportCursorRepository.java | 39 +++ .../persistence/DriverIdentityRepository.java | 86 ++++-- .../VehicleIdentityRepository.java | 263 +++++++++++++----- .../TachographMasterDataRefreshService.java | 17 +- .../yellowfox/dto/YellowFoxD8BookingDto.java | 1 + ...owFoxD8BookingExtractionBatchExecutor.java | 121 ++++++-- .../service/YellowFoxD8BookingRowMapper.java | 58 +++- ...YellowFoxD8IgnitionTransitionDetector.java | 5 +- .../YellowFoxMasterDataRefreshService.java | 17 +- src/main/resources/application.yml | 29 +- .../resources/sql/yellowfox/d8-bookings.sql | 26 +- .../yellowfox/master-data/driver-cards.sql | 8 +- .../sql/yellowfox/master-data/drivers.sql | 2 +- .../sql/yellowfox/master-data/relations.sql | 5 +- .../master-data/vehicle-registrations.sql | 10 +- .../sql/yellowfox/master-data/vehicles.sql | 13 +- .../YellowFoxD8BookingEventMapperTest.java | 1 + .../importing/ImportChunkPlannerTest.java | 42 +++ ...JdbcExtractionBatchExecutorCursorTest.java | 187 +++++++++++++ ...kingExtractionBatchExecutorCursorTest.java | 105 +++++++ ...xD8BookingExtractionBatchExecutorTest.java | 110 ++++++++ .../YellowFoxD8BookingRowMapperTest.java | 90 ++++++ ...wFoxD8ConfiguredImportPlanServiceTest.java | 39 +++ ...owFoxD8IgnitionTransitionDetectorTest.java | 65 +++++ 29 files changed, 1267 insertions(+), 187 deletions(-) create mode 100644 docs/sql/truncate-eventhub-data.sql create mode 100644 src/test/java/at/procon/eventhub/importing/ImportChunkPlannerTest.java create mode 100644 src/test/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutorCursorTest.java create mode 100644 src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest.java create mode 100644 src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorTest.java create mode 100644 src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapperTest.java create mode 100644 src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanServiceTest.java create mode 100644 src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetectorTest.java diff --git a/docs/sql/truncate-eventhub-data.sql b/docs/sql/truncate-eventhub-data.sql new file mode 100644 index 0000000..47265ed --- /dev/null +++ b/docs/sql/truncate-eventhub-data.sql @@ -0,0 +1,21 @@ +-- Truncate all application data in the eventhub schema. +-- Keeps the schema and Flyway migration history intact. +-- Intended for PostgreSQL / TimescaleDB environments. + +DO $$ +DECLARE + table_list text; +BEGIN + SELECT string_agg(format('%I.%I', schemaname, tablename), ', ' ORDER BY tablename) + INTO table_list + FROM pg_tables + WHERE schemaname = 'eventhub'; + + IF table_list IS NULL THEN + RAISE NOTICE 'No tables found in schema eventhub.'; + RETURN; + END IF; + + EXECUTE 'TRUNCATE TABLE ' || table_list || ' RESTART IDENTITY CASCADE'; +END +$$; diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index c5fa55a..290754c 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -224,6 +224,9 @@ public class EventHubProperties { /** How JDBC extraction batches are handed over to the ingest pipeline. */ private JdbcExtractionIngestMode jdbcExtractionIngestMode = JdbcExtractionIngestMode.SYNC_DIRECT; + /** Whether master-data refresh reconciles vehicle registrations and assignments. */ + private boolean syncVehicleRegistrationsOnMasterDataUpdate = true; + /** Configured tenant/source import plans. */ private List importPlans = new ArrayList<>(); @@ -280,6 +283,14 @@ public class EventHubProperties { : jdbcExtractionIngestMode; } + public boolean isSyncVehicleRegistrationsOnMasterDataUpdate() { + return syncVehicleRegistrationsOnMasterDataUpdate; + } + + public void setSyncVehicleRegistrationsOnMasterDataUpdate(boolean syncVehicleRegistrationsOnMasterDataUpdate) { + this.syncVehicleRegistrationsOnMasterDataUpdate = syncVehicleRegistrationsOnMasterDataUpdate; + } + public List getImportPlans() { return importPlans; } @@ -354,6 +365,9 @@ public class EventHubProperties { /** Emit a first ignition snapshot per vehicle if no previous D8 ignition state exists in the imported window. */ private boolean emitInitialIgnitionSnapshot = false; + /** Whether master-data refresh reconciles vehicle registrations and assignments. */ + private boolean syncVehicleRegistrationsOnMasterDataUpdate = true; + /** Configured tenant/source import plans. */ private List importPlans = new ArrayList<>(); @@ -412,6 +426,14 @@ public class EventHubProperties { this.emitInitialIgnitionSnapshot = emitInitialIgnitionSnapshot; } + public boolean isSyncVehicleRegistrationsOnMasterDataUpdate() { + return syncVehicleRegistrationsOnMasterDataUpdate; + } + + public void setSyncVehicleRegistrationsOnMasterDataUpdate(boolean syncVehicleRegistrationsOnMasterDataUpdate) { + this.syncVehicleRegistrationsOnMasterDataUpdate = syncVehicleRegistrationsOnMasterDataUpdate; + } + public List getImportPlans() { return importPlans; } diff --git a/src/main/java/at/procon/eventhub/importing/AbstractConfiguredImportPlanService.java b/src/main/java/at/procon/eventhub/importing/AbstractConfiguredImportPlanService.java index 8daae46..8abf31c 100644 --- a/src/main/java/at/procon/eventhub/importing/AbstractConfiguredImportPlanService.java +++ b/src/main/java/at/procon/eventhub/importing/AbstractConfiguredImportPlanService.java @@ -64,10 +64,15 @@ public abstract class AbstractConfiguredImportPlanService params = parameters(request, chunkScope, cursor); String sql = loadSql(definition.sqlResource()); @@ -290,6 +282,34 @@ public abstract class AbstractJdbcExtractionBatchExecutor rs.next() + ? new ImportCursorStateDto( + rs.getObject("last_source_package_imported_at", java.time.OffsetDateTime.class), + rs.getString("last_source_package_id"), + rs.getObject("last_source_row_updated_at", java.time.OffsetDateTime.class), + rs.getObject("last_occurred_to", java.time.OffsetDateTime.class) + ) + : null, + tenantKey, + eventSourceId, + scopeHash, + eventFamily.name(), + sourceKind + ); + } + public void advanceCursor( String tenantKey, int eventSourceId, diff --git a/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java b/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java index 83e2860..bc0191e 100644 --- a/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/DriverIdentityRepository.java @@ -16,6 +16,9 @@ import org.springframework.transaction.annotation.Transactional; @Repository public class DriverIdentityRepository { + private static final String YELLOWFOX_SYNTHETIC_REFERENCE_NATION = "YELLOWFOX"; + private static final String UNKNOWN_CARD_NATION = "UNKNOWN"; + private final JdbcTemplate jdbcTemplate; private final ObjectMapper objectMapper; @@ -37,7 +40,7 @@ public class DriverIdentityRepository { String sourceDriverEntityId = normalizeNullable(driverRef.sourceEntityId()); DriverCardRefDto driverCard = driverRef.driverCard(); String cardNation = driverCard == null ? null : normalizeNullable(driverCard.nation()); - String cardNumber = driverCard == null ? null : normalizeNullable(driverCard.number()); + String cardNumber = driverCard == null ? null : normalizeDriverCardNumber(cardNation, driverCard.number()); UUID driverId = findBySourceDriverEntityId(normalizedTenantKey, eventSourceId, sourceDriverEntityId); UUID driverCardId = resolveOrCreateDriverCardId(cardNation, cardNumber, driverId); @@ -120,10 +123,15 @@ public class DriverIdentityRepository { master.payload, coalesce(identity.driver_id, gen_random_uuid()) as driver_id from master_drivers master - left join eventhub.source_driver_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_driver_entity_id = master.source_driver_entity_id + left join lateral ( + select identity.driver_id + from eventhub.source_driver_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_entity_id = master.source_driver_entity_id + order by identity.updated_at desc, identity.id desc + limit 1 + ) identity on true ) insert into eventhub.driver( id, tenant_key, event_source_id, source_driver_entity_id, @@ -184,10 +192,15 @@ public class DriverIdentityRepository { master.payload, coalesce(identity.driver_id, gen_random_uuid()) as driver_id from master_drivers master - left join eventhub.source_driver_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_driver_entity_id = master.source_driver_entity_id + left join lateral ( + select identity.driver_id + from eventhub.source_driver_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_entity_id = master.source_driver_entity_id + order by identity.updated_at desc, identity.id desc + limit 1 + ) identity on true ) insert into eventhub.driver( id, first_names, last_name, birth_date, source_updated_at, payload, updated_at @@ -241,10 +254,15 @@ public class DriverIdentityRepository { payload = driver.payload || master.payload, updated_at = now() from master_drivers master - join eventhub.source_driver_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_driver_entity_id = master.source_driver_entity_id + join lateral ( + select identity.driver_id + from eventhub.source_driver_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_entity_id = master.source_driver_entity_id + order by identity.updated_at desc, identity.id desc + limit 1 + ) identity on true where identity.driver_id = driver.id """, eventSourceId, @@ -280,10 +298,15 @@ public class DriverIdentityRepository { limit 1 )) as driver_id from master_drivers master - left join eventhub.source_driver_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_driver_entity_id = master.source_driver_entity_id + left join lateral ( + select identity.driver_id + from eventhub.source_driver_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_entity_id = master.source_driver_entity_id + order by identity.updated_at desc, identity.id desc + limit 1 + ) identity on true ) insert into eventhub.source_driver_identity( id, tenant_key, event_source_id, source_driver_entity_id, @@ -515,10 +538,15 @@ public class DriverIdentityRepository { master.payload, coalesce(identity.driver_card_id, existing.id) as driver_card_id from master_driver_cards master - left join eventhub.source_driver_card_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_driver_card_entity_id = master.source_driver_card_entity_id + left join lateral ( + select identity.driver_card_id + from eventhub.source_driver_card_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_driver_card_entity_id = master.source_driver_card_entity_id + order by identity.updated_at desc, identity.id desc + limit 1 + ) identity on true left join existing_driver_cards existing on existing.nation = master.card_nation and existing.card_number = master.card_number @@ -735,6 +763,22 @@ public class DriverIdentityRepository { return legacyColumns != null && legacyColumns == 3; } + private String normalizeDriverCardNumber(String cardNation, String cardNumber) { + String normalized = normalizeNullable(cardNumber); + if (normalized == null) { + return null; + } + if (isSyntheticYellowFoxCardNation(cardNation)) { + return normalized.length() <= 14 ? normalized : normalized.substring(0, 14); + } + return normalized; + } + + private boolean isSyntheticYellowFoxCardNation(String cardNation) { + return YELLOWFOX_SYNTHETIC_REFERENCE_NATION.equalsIgnoreCase(cardNation) + || UNKNOWN_CARD_NATION.equalsIgnoreCase(cardNation); + } + private UUID createDriverCard( UUID driverId, String cardNation, diff --git a/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java b/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java index fc2e414..a7be805 100644 --- a/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java @@ -14,6 +14,9 @@ import org.springframework.transaction.annotation.Transactional; @Repository public class VehicleIdentityRepository { + private static final String YELLOWFOX_SYNTHETIC_REFERENCE_NATION = "YELLOWFOX"; + private static final String UNKNOWN_REGISTRATION_NATION = "UNKNOWN"; + private final JdbcTemplate jdbcTemplate; private final ObjectMapper objectMapper; @@ -39,6 +42,7 @@ public class VehicleIdentityRepository { VehicleRegistrationRefDto registration = vehicleRef.vehicleRegistration(); String registrationNation = registration == null ? null : normalizeNullable(registration.nation()); String registrationNumber = registration == null ? null : normalizeNullable(registration.number()); + String registrationNationForCreate = creationNation(registrationNation, registrationNumber); UUID registrationId = resolveRegistrationId( normalizedTenantKey, @@ -49,7 +53,7 @@ public class VehicleIdentityRepository { ); if (registrationId == null && (sourceRegistrationEntityId != null || registrationNumber != null)) { registrationId = createRegistration( - registrationNation, + registrationNationForCreate, registrationNumber, null, Map.of("source", "event") @@ -77,7 +81,7 @@ public class VehicleIdentityRepository { assignedVehicle = resolveAssignedVehicleReference(normalizedTenantKey, eventSourceId, registrationId, occurredAt); vehicleId = assignedVehicle == null ? null : assignedVehicle.vehicleId(); } - if (vehicleId == null && (sourceVehicleEntityId != null || vin != null)) { + if (vehicleId == null && vin != null) { vehicleId = createVehicle(vin); } if (vehicleId != null && sourceVehicleEntityId != null) { @@ -91,7 +95,7 @@ public class VehicleIdentityRepository { ); } touchVehicle(vehicleId, vin); - touchRegistration(registrationId, registrationNation, registrationNumber); + touchRegistration(registrationId, registrationNationForCreate, registrationNumber); return new ResolvedVehicleReferenceResolution( new ResolvedVehicleReference(vehicleId, registrationId), @@ -103,10 +107,17 @@ public class VehicleIdentityRepository { @Transactional public int reconcileFromMasterData(String tenantKey, int eventSourceId) { + return reconcileFromMasterData(tenantKey, eventSourceId, true); + } + + @Transactional + public int reconcileFromMasterData(String tenantKey, int eventSourceId, boolean syncVehicleRegistrations) { String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey"); int updates = reconcileVehiclesFromMasterData(normalizedTenantKey, eventSourceId); - updates += reconcileRegistrationsFromMasterData(normalizedTenantKey, eventSourceId); - updates += projectVehicleRegistrationAssignments(normalizedTenantKey, eventSourceId); + if (syncVehicleRegistrations) { + updates += reconcileRegistrationsFromMasterData(normalizedTenantKey, eventSourceId); + updates += projectVehicleRegistrationAssignments(normalizedTenantKey, eventSourceId); + } return updates; } @@ -125,10 +136,7 @@ public class VehicleIdentityRepository { where tenant_key = ? and event_source_id in (select id from compatible_sources) and entity_type = 'VEHICLE' - and ( - nullif(trim(source_entity_id), '') is not null - or nullif(trim(source_external_key), '') is not null - ) + and nullif(trim(source_external_key), '') is not null order by nullif(trim(source_entity_id), ''), nullif(trim(source_external_key), ''), updated_at desc @@ -139,13 +147,23 @@ public class VehicleIdentityRepository { master.vin, coalesce(identity.vehicle_id, existing.id, gen_random_uuid()) as vehicle_id from master_vehicles master - left join eventhub.source_vehicle_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_vehicle_entity_id = master.source_vehicle_entity_id - left join eventhub.vehicle existing - on master.vin is not null - and existing.vin = master.vin + left join lateral ( + select identity.vehicle_id + from eventhub.source_vehicle_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_vehicle_entity_id = master.source_vehicle_entity_id + order by identity.updated_at desc, identity.id desc + limit 1 + ) identity on true + left join lateral ( + select existing.id + from eventhub.vehicle existing + where master.vin is not null + and existing.vin = master.vin + order by existing.updated_at desc, existing.created_at desc, existing.id + limit 1 + ) existing on true ) insert into eventhub.vehicle(id, vin, updated_at) select distinct on (resolved.vehicle_id) @@ -177,10 +195,7 @@ public class VehicleIdentityRepository { where tenant_key = ? and event_source_id in (select id from compatible_sources) and entity_type = 'VEHICLE' - and ( - nullif(trim(source_entity_id), '') is not null - or nullif(trim(source_external_key), '') is not null - ) + and nullif(trim(source_external_key), '') is not null order by nullif(trim(source_entity_id), ''), nullif(trim(source_external_key), ''), updated_at desc @@ -189,15 +204,21 @@ public class VehicleIdentityRepository { set vin = coalesce(vehicle.vin, master.vin), updated_at = now() from master_vehicles master - left join eventhub.source_vehicle_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_vehicle_entity_id = master.source_vehicle_entity_id + left join lateral ( + select identity.vehicle_id + from eventhub.source_vehicle_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_vehicle_entity_id = master.source_vehicle_entity_id + order by identity.updated_at desc, identity.id desc + limit 1 + ) identity on true where vehicle.id = coalesce(identity.vehicle_id, ( select existing.id from eventhub.vehicle existing where master.vin is not null and existing.vin = master.vin + order by existing.updated_at desc, existing.created_at desc, existing.id limit 1 )) """, @@ -221,6 +242,7 @@ public class VehicleIdentityRepository { and event_source_id in (select id from compatible_sources) and entity_type = 'VEHICLE' and nullif(trim(source_entity_id), '') is not null + and nullif(trim(source_external_key), '') is not null order by nullif(trim(source_entity_id), ''), nullif(trim(source_external_key), ''), updated_at desc @@ -230,13 +252,23 @@ public class VehicleIdentityRepository { master.source_vehicle_entity_id, coalesce(identity.vehicle_id, existing.id) as vehicle_id from master_vehicles master - left join eventhub.source_vehicle_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_vehicle_entity_id = master.source_vehicle_entity_id - left join eventhub.vehicle existing - on master.vin is not null - and existing.vin = master.vin + left join lateral ( + select identity.vehicle_id + from eventhub.source_vehicle_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_vehicle_entity_id = master.source_vehicle_entity_id + order by identity.updated_at desc, identity.id desc + limit 1 + ) identity on true + left join lateral ( + select existing.id + from eventhub.vehicle existing + where master.vin is not null + and existing.vin = master.vin + order by existing.updated_at desc, existing.created_at desc, existing.id + limit 1 + ) existing on true ) insert into eventhub.source_vehicle_identity( id, tenant_key, event_source_id, source_vehicle_entity_id, @@ -316,21 +348,41 @@ public class VehicleIdentityRepository { ), updated_at desc ), - resolved_registrations as ( - select master.event_source_id, - master.source_registration_entity_id, + canonical_registrations as ( + select distinct on (master.nation, master.registration_number) master.nation, master.registration_number, - master.source_updated_at, - coalesce(identity.vehicle_registration_id, existing.id, gen_random_uuid()) as registration_id + master.source_updated_at from master_registrations master - left join eventhub.source_vehicle_registration_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_registration_entity_id = master.source_registration_entity_id - left join eventhub.vehicle_registration existing - on existing.nation = master.nation - and existing.registration_number = master.registration_number + where master.nation is not null + and master.registration_number is not null + order by master.nation, + master.registration_number, + case when master.source_registration_entity_id is null then 1 else 0 end, + master.source_updated_at desc, + master.source_registration_entity_id + ), + existing_registrations as ( + select distinct on (existing.nation, existing.registration_number) + existing.id, + existing.nation, + existing.registration_number + from eventhub.vehicle_registration existing + order by existing.nation, + existing.registration_number, + existing.updated_at desc, + existing.created_at desc, + existing.id + ), + resolved_registrations as ( + select canonical.nation, + canonical.registration_number, + canonical.source_updated_at, + coalesce(existing.id, gen_random_uuid()) as registration_id + from canonical_registrations canonical + left join existing_registrations existing + on existing.nation = canonical.nation + and existing.registration_number = canonical.registration_number ) insert into eventhub.vehicle_registration( id, nation, registration_number, source_updated_at, payload, updated_at @@ -343,16 +395,13 @@ public class VehicleIdentityRepository { jsonb_build_object('source', 'master-data'), now() from resolved_registrations resolved - where resolved.nation is not null - and resolved.registration_number is not null - and not exists ( + where not exists ( select 1 from eventhub.vehicle_registration existing where existing.id = resolved.registration_id ) """, eventSourceId, - tenantKey, tenantKey ); @@ -402,26 +451,49 @@ public class VehicleIdentityRepository { nullif(substring(source_external_key from position(':' in source_external_key) + 1), '') ), updated_at desc + ), + canonical_registrations as ( + select distinct on (master.nation, master.registration_number) + master.nation, + master.registration_number, + master.source_updated_at + from master_registrations master + where master.nation is not null + and master.registration_number is not null + order by master.nation, + master.registration_number, + case when master.source_registration_entity_id is null then 1 else 0 end, + master.source_updated_at desc, + master.source_registration_entity_id + ), + existing_registrations as ( + select distinct on (existing.nation, existing.registration_number) + existing.id, + existing.nation, + existing.registration_number + from eventhub.vehicle_registration existing + order by existing.nation, + existing.registration_number, + existing.updated_at desc, + existing.created_at desc, + existing.id + ), + resolved_registrations as ( + select canonical.source_updated_at, + existing.id as registration_id + from canonical_registrations canonical + join existing_registrations existing + on existing.nation = canonical.nation + and existing.registration_number = canonical.registration_number ) update eventhub.vehicle_registration registration - set source_updated_at = coalesce(master.source_updated_at, registration.source_updated_at), + set source_updated_at = coalesce(resolved.source_updated_at, registration.source_updated_at), payload = registration.payload || jsonb_build_object('source', 'master-data'), updated_at = now() - from master_registrations master - left join eventhub.source_vehicle_registration_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_registration_entity_id = master.source_registration_entity_id - where registration.id = coalesce(identity.vehicle_registration_id, ( - select existing.id - from eventhub.vehicle_registration existing - where existing.nation = master.nation - and existing.registration_number = master.registration_number - limit 1 - )) + from resolved_registrations resolved + where registration.id = resolved.registration_id """, eventSourceId, - tenantKey, tenantKey ); @@ -466,17 +538,34 @@ public class VehicleIdentityRepository { ), updated_at desc ), + existing_registrations as ( + select distinct on (existing.nation, existing.registration_number) + existing.id, + existing.nation, + existing.registration_number + from eventhub.vehicle_registration existing + order by existing.nation, + existing.registration_number, + existing.updated_at desc, + existing.created_at desc, + existing.id + ), resolved_registrations as ( select master.event_source_id, master.source_registration_entity_id, master.source_updated_at, coalesce(identity.vehicle_registration_id, existing.id) as registration_id from master_registrations master - left join eventhub.source_vehicle_registration_identity identity - on identity.tenant_key = ? - and identity.event_source_id in (select id from compatible_sources) - and identity.source_registration_entity_id = master.source_registration_entity_id - left join eventhub.vehicle_registration existing + left join lateral ( + select identity.vehicle_registration_id + from eventhub.source_vehicle_registration_identity identity + where identity.tenant_key = ? + and identity.event_source_id in (select id from compatible_sources) + and identity.source_registration_entity_id = master.source_registration_entity_id + order by identity.updated_at desc, identity.id desc + limit 1 + ) identity on true + left join existing_registrations existing on existing.nation = master.nation and existing.registration_number = master.registration_number ) @@ -530,6 +619,13 @@ public class VehicleIdentityRepository { String nation, String registrationNumber ) { + if (isYellowFoxSyntheticRegistrationNation(nation) && registrationNumber != null) { + UUID registrationId = findRegistrationByNumber(registrationNumber); + if (registrationId != null) { + return registrationId; + } + return findRegistrationBySourceRegistrationEntityId(tenantKey, eventSourceId, sourceRegistrationEntityId); + } UUID registrationId = findRegistrationBySourceRegistrationEntityId(tenantKey, eventSourceId, sourceRegistrationEntityId); if (registrationId == null) { registrationId = findRegistrationByPlate(nation, registrationNumber); @@ -619,6 +715,28 @@ public class VehicleIdentityRepository { ); } + private UUID findRegistrationByNumber(String registrationNumber) { + if (registrationNumber == null) { + return null; + } + return jdbcTemplate.query( + """ + select r.id + from eventhub.vehicle_registration r + where r.registration_number = ? + order by + case when r.nation = ? then 0 else 1 end, + r.updated_at desc, + r.created_at desc, + r.id + limit 1 + """, + rs -> rs.next() ? (UUID) rs.getObject("id") : null, + registrationNumber, + UNKNOWN_REGISTRATION_NATION + ); + } + private AssignedVehicleReference resolveAssignedVehicleReference( String tenantKey, int eventSourceId, @@ -894,6 +1012,19 @@ public class VehicleIdentityRepository { return trimmed.isEmpty() ? null : trimmed; } + private boolean isYellowFoxSyntheticRegistrationNation(String nation) { + return YELLOWFOX_SYNTHETIC_REFERENCE_NATION.equalsIgnoreCase(nation); + } + + private String creationNation(String registrationNation, String registrationNumber) { + if (registrationNumber == null) { + return registrationNation; + } + return isYellowFoxSyntheticRegistrationNation(registrationNation) + ? UNKNOWN_REGISTRATION_NATION + : registrationNation; + } + public record ResolvedVehicleReference(UUID vehicleId, UUID vehicleRegistrationId) { public static ResolvedVehicleReference empty() { return new ResolvedVehicleReference(null, null); diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java index ee6ca4b..547fb91 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java @@ -1,5 +1,6 @@ package at.procon.eventhub.tachograph.service; +import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult; import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert; import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert; @@ -56,6 +57,7 @@ public class TachographMasterDataRefreshService { private final DriverIdentityRepository driverIdentityRepository; private final VehicleIdentityRepository vehicleIdentityRepository; private final ResourceLoader resourceLoader; + private final EventHubProperties properties; public TachographMasterDataRefreshService( @Qualifier("tachographNamedParameterJdbcTemplate") ObjectProvider tachographJdbcTemplateProvider, @@ -63,7 +65,8 @@ public class TachographMasterDataRefreshService { EventSourceRepository eventSourceRepository, DriverIdentityRepository driverIdentityRepository, VehicleIdentityRepository vehicleIdentityRepository, - ResourceLoader resourceLoader + ResourceLoader resourceLoader, + EventHubProperties properties ) { this.tachographJdbcTemplateProvider = tachographJdbcTemplateProvider; this.sourceMasterDataRepository = sourceMasterDataRepository; @@ -71,6 +74,7 @@ public class TachographMasterDataRefreshService { this.driverIdentityRepository = driverIdentityRepository; this.vehicleIdentityRepository = vehicleIdentityRepository; this.resourceLoader = resourceLoader; + this.properties = properties; } public MasterDataRefreshResult refreshIfRequested(TachographImportRequest request) { @@ -101,14 +105,19 @@ public class TachographMasterDataRefreshService { } int relationCount = streamRelations(tachographJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE)); + boolean syncVehicleRegistrations = properties.getTachograph().isSyncVehicleRegistrationsOnMasterDataUpdate(); log.info("Reconciling tachograph driver identities from source master data tenant={} source={}", tenantKey, masterDataSource.stableKey()); int reconciledDrivers = driverIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId); - log.info("Reconciling tachograph vehicle identities from source master data tenant={} source={}", - tenantKey, masterDataSource.stableKey()); - int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId); + log.info("Reconciling tachograph vehicle identities from source master data tenant={} source={} syncVehicleRegistrations={}", + tenantKey, masterDataSource.stableKey(), syncVehicleRegistrations); + int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData( + tenantKey, + eventSourceId, + syncVehicleRegistrations + ); MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount); log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={} reconciledDrivers={} reconciledVehicles={}", diff --git a/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8BookingDto.java b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8BookingDto.java index 9c84354..4e02482 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8BookingDto.java +++ b/src/main/java/at/procon/eventhub/yellowfox/dto/YellowFoxD8BookingDto.java @@ -15,6 +15,7 @@ public record YellowFoxD8BookingDto( String eventId, String key, Integer ignition, + Integer previousIgnition, Integer eventType, Integer state, OffsetDateTime occurredAt, diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java index 2a7d5da..ee34f31 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java @@ -83,15 +83,15 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk); ImportCursorStateDto cursor = findCursor(eventSourceId, request, planItem); - Map params = parameters(request, chunkScope, cursor); + QuerySpec query = buildQuerySpec(request, chunkScope, cursor); Stats stats = new Stats(); YellowFoxD8IgnitionTransitionDetector.Session ignitionSession = ignitionTransitionDetector .newSession(properties.getYellowFox().isEmitInitialIgnitionSnapshot()); log.info("Reading YellowFox D8 bookings tenant={} importRunId={} packageId={} chunk={} occurredFrom={} occurredTo={} fleetId={} strategy={}", - request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), params.get("fleetId"), request.acquisitionStrategy()); + request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), query.fleetId(), request.acquisitionStrategy()); - jdbcTemplate.query(loadSql(), params, rs -> { + jdbcTemplate.query(query.sql(), query.params(), rs -> { stats.sourceRowsRead++; YellowFoxD8BookingDto booking = rowMapper.map( rs, @@ -136,9 +136,33 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD ); } - private ImportCursorStateDto findCursor(int eventSourceId, YellowFoxD8ImportRequest request, ImportPlanItemDto planItem) { + QuerySpec buildQuerySpec(YellowFoxD8ImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) { + Map params = new HashMap<>(); + StringBuilder filters = new StringBuilder("where 1 = 1"); + + if (scope != null && scope.occurredFrom() != null) { + params.put("occurredFrom", scope.occurredFrom()); + filters.append("\n and b.utc >= :occurredFrom"); + } + if (scope != null && scope.occurredTo() != null) { + params.put("occurredTo", scope.occurredTo()); + filters.append("\n and b.utc < :occurredTo"); + } + + Integer fleetId = fleetId(request); + if (fleetId != null) { + params.put("fleetId", fleetId); + filters.append("\n and f.id = :fleetId"); + } + + appendCursorFilter(filters, params, request, cursor); + + return new QuerySpec(applyFilters(loadSqlTemplate(), filters.toString()), params, fleetId); + } + + ImportCursorStateDto findCursor(int eventSourceId, YellowFoxD8ImportRequest request, ImportPlanItemDto planItem) { String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(); - return importCursorRepository.findCursor( + ImportCursorStateDto cursor = importCursorRepository.findCursor( request.tenantKey(), eventSourceId, scopeHash, @@ -146,28 +170,16 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD planItem.sourceKind(), request.acquisitionStrategy() ); - } - - private Map parameters(YellowFoxD8ImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) { - Map params = new HashMap<>(); - params.put("occurredFrom", scope == null ? null : scope.occurredFrom()); - params.put("occurredTo", scope == null ? null : scope.occurredTo()); - params.put("fleetId", fleetId(request)); - if (request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_ROW_WATERMARK && cursor != null) { - OffsetDateTime lastOccurredTo = cursor.lastOccurredTo(); - String lastSourceRowId = cursor.lastSourcePackageId(); - if (lastOccurredTo != null && properties.getYellowFox().getOccurredAtOverlap() != null - && !properties.getYellowFox().getOccurredAtOverlap().isZero()) { - lastOccurredTo = lastOccurredTo.minus(properties.getYellowFox().getOccurredAtOverlap()); - lastSourceRowId = null; - } - params.put("lastOccurredTo", lastOccurredTo); - params.put("lastSourceRowId", lastSourceRowId); - } else { - params.put("lastOccurredTo", null); - params.put("lastSourceRowId", null); + if (cursor != null || !shouldBootstrapWatermarkCursor(request)) { + return cursor; } - return params; + return importCursorRepository.findLatestCursor( + request.tenantKey(), + eventSourceId, + scopeHash, + planItem.eventFamily(), + planItem.sourceKind() + ); } private Integer fleetId(YellowFoxD8ImportRequest request) { @@ -204,7 +216,59 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD stats.acceptEvent(event); } - private String loadSql() { + private void appendCursorFilter( + StringBuilder filters, + Map params, + YellowFoxD8ImportRequest request, + ImportCursorStateDto cursor + ) { + if (request.acquisitionStrategy() != AcquisitionStrategy.SOURCE_ROW_WATERMARK || cursor == null) { + return; + } + + OffsetDateTime lastOccurredTo = cursor.lastOccurredTo(); + String lastSourceRowId = cursor.lastSourcePackageId(); + if (lastOccurredTo == null) { + return; + } + if (properties.getYellowFox().getOccurredAtOverlap() != null + && !properties.getYellowFox().getOccurredAtOverlap().isZero()) { + lastOccurredTo = lastOccurredTo.minus(properties.getYellowFox().getOccurredAtOverlap()); + lastSourceRowId = null; + } + + params.put("lastOccurredTo", lastOccurredTo); + if (lastSourceRowId == null || lastSourceRowId.isBlank()) { + filters.append("\n and b.utc >= :lastOccurredTo"); + return; + } + + params.put("lastSourceRowId", lastSourceRowId); + filters.append(""" + + and ( + b.utc > :lastOccurredTo + or ( + b.utc = :lastOccurredTo + and b.eventid > :lastSourceRowId + ) + )"""); + } + + private boolean shouldBootstrapWatermarkCursor(YellowFoxD8ImportRequest request) { + return request.mode() == at.procon.eventhub.dto.ImportMode.INCREMENTAL_UPDATE + && (request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_ROW_WATERMARK + || request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK); + } + + private String applyFilters(String sqlTemplate, String filters) { + if (!sqlTemplate.contains("/*__FILTERS__*/")) { + throw new IllegalStateException("YellowFox D8 extraction SQL template is missing filter marker"); + } + return sqlTemplate.replace("/*__FILTERS__*/", filters); + } + + private String loadSqlTemplate() { Resource resource = resourceLoader.getResource("classpath:sql/yellowfox/d8-bookings.sql"); try (var in = resource.getInputStream()) { return StreamUtils.copyToString(in, StandardCharsets.UTF_8); @@ -213,6 +277,9 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD } } + static record QuerySpec(String sql, Map params, Integer fleetId) { + } + private static class Stats { private int sourceRowsRead; private int eventsSent; diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java index e308b9e..6a34622 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapper.java @@ -28,29 +28,29 @@ public class YellowFoxD8BookingRowMapper { } public YellowFoxD8BookingDto map(ResultSet rs, String tenantKey, String sourceInstanceKey, String tenantProviderSettingKey) throws SQLException { - String eventId = rs.getString("eventid"); + String eventId = string(rs, "eventid"); OffsetDateTime occurredAt = rs.getObject("utc", OffsetDateTime.class); Integer vehicleId = getInteger(rs, "vehicle_id"); Integer driverId = getInteger(rs, "driver_id"); - String vehicleVrn = rs.getString("vehicle_vrn"); - String vehicleVin = rs.getString("vehicle_vin"); - String driverCard = rs.getString("driver_card_number"); + String vehicleVrn = string(rs, "vehicle_vrn"); + String vehicleVin = string(rs, "vehicle_vin"); + String driverCard = normalizeBookingDriverCardNumber(string(rs, "driver_card_number")); Integer fleetId = getInteger(rs, "fleet_id"); - String fleetName = rs.getString("fleet_name"); + String fleetName = string(rs, "fleet_name"); Integer odometer = getInteger(rs, "odometer"); - Map payload = payload(rs.getString("payload")); + Map payload = trimPayloadStrings(payload(rs.getString("payload"))); put(payload, "yellowFoxEventId", eventId); put(payload, "yellowFoxOdometerRaw", odometer); put(payload, "vehicleVrn", vehicleVrn); put(payload, "vehicleVin", vehicleVin); put(payload, "driverCardNumber", driverCard); - put(payload, "driverFirstName", rs.getString("driver_firstname")); - put(payload, "driverLastName", rs.getString("driver_lastname")); + put(payload, "driverFirstName", string(rs, "driver_firstname")); + put(payload, "driverLastName", string(rs, "driver_lastname")); put(payload, "fleetId", fleetId); put(payload, "fleetName", fleetName); put(payload, "telematicProviderId", getInteger(rs, "telematic_provider_id")); - put(payload, "telematicProviderName", rs.getString("telematic_provider_name")); + put(payload, "telematicProviderName", string(rs, "telematic_provider_name")); DriverRefDto driverRef = driverId == null && isBlank(driverCard) ? null @@ -74,8 +74,9 @@ public class YellowFoxD8BookingRowMapper { fleetId == null ? null : fleetId.toString(), fleetName, eventId, - rs.getString("key"), + string(rs, "key"), getInteger(rs, "ignition"), + getInteger(rs, "previous_ignition"), getInteger(rs, "eventtype"), getInteger(rs, "state"), occurredAt, @@ -108,6 +109,35 @@ public class YellowFoxD8BookingRowMapper { } } + private String string(ResultSet rs, String column) throws SQLException { + String value = rs.getString(column); + return value == null || value.isBlank() ? null : value.trim(); + } + + @SuppressWarnings("unchecked") + private Map trimPayloadStrings(Map payload) { + payload.replaceAll((key, value) -> trimPayloadValue(value)); + return payload; + } + + @SuppressWarnings("unchecked") + private Object trimPayloadValue(Object value) { + if (value instanceof String text) { + return text.trim(); + } + if (value instanceof Map nestedMap) { + ((Map) nestedMap).replaceAll((key, nestedValue) -> trimPayloadValue(nestedValue)); + return nestedMap; + } + if (value instanceof java.util.List list) { + for (int i = 0; i < list.size(); i++) { + ((java.util.List) list).set(i, trimPayloadValue(list.get(i))); + } + return list; + } + return value; + } + private void put(Map target, String key, Object value) { if (value != null) { target.put(key, value instanceof BigDecimal bd ? bd.stripTrailingZeros().toPlainString() : value); @@ -117,4 +147,12 @@ public class YellowFoxD8BookingRowMapper { private boolean isBlank(String value) { return value == null || value.isBlank(); } + + private String normalizeBookingDriverCardNumber(String value) { + if (value == null || value.isBlank()) { + return null; + } + String trimmed = value.trim(); + return trimmed.length() <= 14 ? trimmed : trimmed.substring(0, 14); + } } diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetector.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetector.java index 5eb536d..f907d26 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetector.java +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetector.java @@ -36,7 +36,10 @@ public class YellowFoxD8IgnitionTransitionDetector { String vehicleKey = booking.vehicleRef().stableKey(); Integer previous = lastIgnitionByVehicle.put(vehicleKey, booking.ignition()); if (previous == null) { - return emitInitialSnapshot ? mapper.mapIgnitionTransition(booking, null) : null; + previous = booking.previousIgnition(); + if (previous == null) { + return emitInitialSnapshot ? mapper.mapIgnitionTransition(booking, null) : null; + } } if (!previous.equals(booking.ignition())) { return mapper.mapIgnitionTransition(booking, previous); diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxMasterDataRefreshService.java b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxMasterDataRefreshService.java index 1ea8069..62fc317 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxMasterDataRefreshService.java +++ b/src/main/java/at/procon/eventhub/yellowfox/service/YellowFoxMasterDataRefreshService.java @@ -1,5 +1,6 @@ package at.procon.eventhub.yellowfox.service; +import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult; import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert; @@ -57,6 +58,7 @@ public class YellowFoxMasterDataRefreshService { private final DriverIdentityRepository driverIdentityRepository; private final VehicleIdentityRepository vehicleIdentityRepository; private final ResourceLoader resourceLoader; + private final EventHubProperties properties; public YellowFoxMasterDataRefreshService( @Qualifier("yellowFoxNamedParameterJdbcTemplate") ObjectProvider yellowFoxJdbcTemplateProvider, @@ -64,7 +66,8 @@ public class YellowFoxMasterDataRefreshService { EventSourceRepository eventSourceRepository, DriverIdentityRepository driverIdentityRepository, VehicleIdentityRepository vehicleIdentityRepository, - ResourceLoader resourceLoader + ResourceLoader resourceLoader, + EventHubProperties properties ) { this.yellowFoxJdbcTemplateProvider = yellowFoxJdbcTemplateProvider; this.sourceMasterDataRepository = sourceMasterDataRepository; @@ -72,6 +75,7 @@ public class YellowFoxMasterDataRefreshService { this.driverIdentityRepository = driverIdentityRepository; this.vehicleIdentityRepository = vehicleIdentityRepository; this.resourceLoader = resourceLoader; + this.properties = properties; } public MasterDataRefreshResult refreshIfRequested(YellowFoxD8ImportRequest request) { @@ -102,14 +106,19 @@ public class YellowFoxMasterDataRefreshService { } int relationCount = streamRelations(yellowFoxJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE)); + boolean syncVehicleRegistrations = properties.getYellowFox().isSyncVehicleRegistrationsOnMasterDataUpdate(); log.info("Reconciling YellowFox driver identities from source master data tenant={} source={}", tenantKey, masterDataSource.stableKey()); int reconciledDrivers = driverIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId); - log.info("Reconciling YellowFox vehicle identities from source master data tenant={} source={}", - tenantKey, masterDataSource.stableKey()); - int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId); + log.info("Reconciling YellowFox vehicle identities from source master data tenant={} source={} syncVehicleRegistrations={}", + tenantKey, masterDataSource.stableKey(), syncVehicleRegistrations); + int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData( + tenantKey, + eventSourceId, + syncVehicleRegistrations + ); MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount); log.info("Refreshed YellowFox source master data tenant={} source={} entities={} relations={} reconciledDrivers={} reconciledVehicles={}", diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7d63d7a..0a14871 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -53,6 +53,7 @@ eventhub: tachograph: default-chunk-days: 1 occurred-at-overlap: 7d + sync-vehicle-registrations-on-master-data-update: true # Set TACHOGRAPH_DB_JDBC_URL to enable JdbcTachographExtractionBatchExecutor. datasource: @@ -63,7 +64,7 @@ eventhub: # Enables the scheduler that regularly triggers configured tachograph import plans. # Default is safe: no scheduled import starts unless explicitly enabled. - scheduler-enabled: true + scheduler-enabled: false scheduler-poll-interval-ms: 3600000 # PLAN_ONLY creates import_run + planned extraction packages. @@ -78,7 +79,7 @@ eventhub: # Example plan. Keep disabled until the tachograph datasource/extractor is wired. import-plans: - plan-key: tachograph-org-14708 - enabled: true + enabled: false cron: "0 15 * * * *" # hourly at minute 15 tenant-key: Procon event-source: @@ -115,10 +116,10 @@ eventhub: scheduled-mode: INCREMENTAL_UPDATE initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP scheduled-strategy: SOURCE_PACKAGE_WATERMARK - refresh-master-data-first: true + refresh-master-data-first: false initial-occurred-from: "2026-04-01T00:00:00+01:00" initial-occurred-to: "2026-04-10T00:00:00+01:00" - run-initial-on-startup: true + run-initial-on-startup: false esper-poc: activity-merge-mode: JAVA @@ -135,6 +136,7 @@ eventhub: default-chunk-days: 1 occurred-at-overlap: 2h emit-initial-ignition-snapshot: false + sync-vehicle-registrations-on-master-data-update: false datasource: jdbc-url: ${YELLOWFOX_DB_JDBC_URL:} @@ -142,13 +144,13 @@ eventhub: password: ${YELLOWFOX_DB_PASSWORD:} driver-class-name: org.postgresql.Driver - scheduler-enabled: false + scheduler-enabled: true scheduler-poll-interval-ms: 60000 - scheduler-trigger-mode: PLAN_ONLY + scheduler-trigger-mode: EXECUTE import-plans: - plan-key: yellowfox-d8-default - enabled: false + enabled: true cron: "0 */5 * * * *" tenant-key: Procon event-source: @@ -157,17 +159,20 @@ eventhub: source-key: YELLOWFOX_D8 source-instance-key: logistics-db-prod tenant-provider-setting-key: yellowfox-main + source-group: + type: FLEET + source-entity-id: "7" import-scope: type: TENANT_ALL include-children: false event-families: - DRIVER_ACTIVITY - #- DRIVER_CARD + - DRIVER_CARD initial-mode: INITIAL_BACKFILL - scheduled-mode: INCREMENTAL_UPDATE + scheduled-mode: INITIAL_BACKFILL # INITIAL_BACKFILL, INCREMENTAL_UPDATE initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP scheduled-strategy: SOURCE_ROW_WATERMARK initial-occurred-from: "2026-04-01T00:00:00+01:00" - initial-occurred-to: "2026-04-02T00:00:00+01:00" - refresh-master-data-first: false - run-initial-on-startup: false + initial-occurred-to: "2026-04-10T00:00:00+01:00" + refresh-master-data-first: true + run-initial-on-startup: true diff --git a/src/main/resources/sql/yellowfox/d8-bookings.sql b/src/main/resources/sql/yellowfox/d8-bookings.sql index 7700cbd..574c41c 100644 --- a/src/main/resources/sql/yellowfox/d8-bookings.sql +++ b/src/main/resources/sql/yellowfox/d8-bookings.sql @@ -1,3 +1,12 @@ +with bookings as ( + select + b.*, + lag(b.ignition) over ( + partition by b.vehicle_id + order by b.utc asc, b.eventid asc + ) as previous_ignition + from data.d8_booking b +) select b.eventid, b.utc, @@ -5,6 +14,7 @@ select b.driver_id, b.key, b.ignition, + b.previous_ignition, b.eventtype, b.state, b.odometer, @@ -18,7 +28,7 @@ select d.firstname as driver_firstname, d.name as driver_lastname, - d.drivers_card as driver_card_number, + left(trim(d.drivers_card), 14) as driver_card_number, d.fleet_id as driver_fleet_id, f.id as fleet_id, @@ -26,7 +36,7 @@ select tp.id as telematic_provider_id, tp.name as telematic_provider_name -from data.d8_booking b +from bookings b left join data.vehicle v on v.id = b.vehicle_id left join data.driver d @@ -35,15 +45,5 @@ left join data.fleet f on f.id = coalesce(v.fleet_id, d.fleet_id) left join data.telematic_provider tp on tp.id = v.telematic_provider_id -where (:occurredFrom is null or b.utc >= :occurredFrom) - and (:occurredTo is null or b.utc < :occurredTo) - and (:fleetId is null or f.id = :fleetId) - and ( - :lastOccurredTo is null - or b.utc > :lastOccurredTo - or ( - b.utc = :lastOccurredTo - and (:lastSourceRowId is null or b.eventid > :lastSourceRowId) - ) - ) +/*__FILTERS__*/ order by b.utc asc, b.eventid asc; diff --git a/src/main/resources/sql/yellowfox/master-data/driver-cards.sql b/src/main/resources/sql/yellowfox/master-data/driver-cards.sql index 76839a5..9385696 100644 --- a/src/main/resources/sql/yellowfox/master-data/driver-cards.sql +++ b/src/main/resources/sql/yellowfox/master-data/driver-cards.sql @@ -1,15 +1,15 @@ select 'DRIVER_CARD' as entity_type, cast(d.id as varchar(128)) as source_entity_id, - concat('YELLOWFOX:', d.drivers_card) as source_external_key, - d.drivers_card as display_name, + concat('YELLOWFOX:', left(trim(d.drivers_card), 14)) as source_external_key, + left(trim(d.drivers_card), 14) as display_name, true as active, null::timestamptz as valid_from, null::timestamptz as valid_to, null::timestamptz as source_updated_at, d.id as driver_id, - d.drivers_card as card_number, - 'YELLOWFOX' as card_nation, + left(trim(d.drivers_card), 14) as card_number, + 'UNKNOWN' as card_nation, d.fleet_id as fleet_id from data.driver d where nullif(trim(d.drivers_card), '') is not null; diff --git a/src/main/resources/sql/yellowfox/master-data/drivers.sql b/src/main/resources/sql/yellowfox/master-data/drivers.sql index d2799db..c6e0cd9 100644 --- a/src/main/resources/sql/yellowfox/master-data/drivers.sql +++ b/src/main/resources/sql/yellowfox/master-data/drivers.sql @@ -1,7 +1,7 @@ select 'DRIVER' as entity_type, cast(d.id as varchar(128)) as source_entity_id, - coalesce(nullif(trim(d.drivers_card), ''), cast(d.id as varchar(128))) as source_external_key, + coalesce(nullif(left(trim(d.drivers_card), 14), ''), cast(d.id as varchar(128))) as source_external_key, nullif(trim(concat_ws(' ', d.firstname, d.name)), '') as display_name, true as active, null::timestamptz as valid_from, diff --git a/src/main/resources/sql/yellowfox/master-data/relations.sql b/src/main/resources/sql/yellowfox/master-data/relations.sql index 055b485..b24aae8 100644 --- a/src/main/resources/sql/yellowfox/master-data/relations.sql +++ b/src/main/resources/sql/yellowfox/master-data/relations.sql @@ -43,6 +43,7 @@ select cast(v.id as varchar(128)) as source_row_id from data.vehicle v where v.fleet_id is not null + and nullif(trim(v.vin), '') is not null union all @@ -59,6 +60,7 @@ select cast(v.id as varchar(128)) as source_row_id from data.vehicle v where nullif(trim(v.vrn), '') is not null + and nullif(trim(v.vin), '') is not null union all @@ -74,4 +76,5 @@ select 'data.vehicle' as source_table, cast(v.id as varchar(128)) as source_row_id from data.vehicle v -where v.telematic_provider_id is not null; +where v.telematic_provider_id is not null + and nullif(trim(v.vin), '') is not null; diff --git a/src/main/resources/sql/yellowfox/master-data/vehicle-registrations.sql b/src/main/resources/sql/yellowfox/master-data/vehicle-registrations.sql index f218931..bb6490c 100644 --- a/src/main/resources/sql/yellowfox/master-data/vehicle-registrations.sql +++ b/src/main/resources/sql/yellowfox/master-data/vehicle-registrations.sql @@ -1,16 +1,16 @@ select 'VEHICLE_REGISTRATION' as entity_type, cast(v.id as varchar(128)) as source_entity_id, - concat('YELLOWFOX:', v.vrn) as source_external_key, - v.vrn as display_name, + concat('YELLOWFOX:', trim(v.vrn)) as source_external_key, + trim(v.vrn) as display_name, true as active, null::timestamptz as valid_from, null::timestamptz as valid_to, null::timestamptz as source_updated_at, v.id as vehicle_id, - v.vin as vin, - 'YELLOWFOX' as registration_nation, - v.vrn as registration_number, + trim(v.vin) as vin, + 'UNKNOWN' as registration_nation, + trim(v.vrn) as registration_number, v.fleet_id as fleet_id from data.vehicle v where nullif(trim(v.vrn), '') is not null; diff --git a/src/main/resources/sql/yellowfox/master-data/vehicles.sql b/src/main/resources/sql/yellowfox/master-data/vehicles.sql index de08ff9..9fa9d33 100644 --- a/src/main/resources/sql/yellowfox/master-data/vehicles.sql +++ b/src/main/resources/sql/yellowfox/master-data/vehicles.sql @@ -1,16 +1,17 @@ select 'VEHICLE' as entity_type, cast(v.id as varchar(128)) as source_entity_id, - coalesce(nullif(trim(v.vin), ''), cast(v.id as varchar(128))) as source_external_key, - coalesce(nullif(trim(v.vrn), ''), nullif(trim(v.vin), ''), cast(v.id as varchar(128))) as display_name, + trim(v.vin) as source_external_key, + coalesce(nullif(trim(v.vrn), ''), trim(v.vin)) as display_name, true as active, null::timestamptz as valid_from, null::timestamptz as valid_to, null::timestamptz as source_updated_at, v.id as vehicle_id, - v.vin as vin, - v.vrn as registration_number, - 'YELLOWFOX' as registration_nation, + trim(v.vin) as vin, + trim(v.vrn) as registration_number, + 'UNKNOWN' as registration_nation, v.fleet_id as fleet_id, v.telematic_provider_id as telematic_provider_id -from data.vehicle v; +from data.vehicle v +where nullif(trim(v.vin), '') is not null; diff --git a/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java b/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java index 5af598f..02aafd5 100644 --- a/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java +++ b/src/test/java/at/procon/eventhub/YellowFoxD8BookingEventMapperTest.java @@ -83,6 +83,7 @@ class YellowFoxD8BookingEventMapperTest { "event-1", "key-1", ignition, + null, eventType, state, OffsetDateTime.parse("2026-04-29T08:15:00+02:00"), diff --git a/src/test/java/at/procon/eventhub/importing/ImportChunkPlannerTest.java b/src/test/java/at/procon/eventhub/importing/ImportChunkPlannerTest.java new file mode 100644 index 0000000..8da0ad4 --- /dev/null +++ b/src/test/java/at/procon/eventhub/importing/ImportChunkPlannerTest.java @@ -0,0 +1,42 @@ +package at.procon.eventhub.importing; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupRefDto; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import java.time.OffsetDateTime; +import java.util.EnumSet; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class ImportChunkPlannerTest { + + @Test + void keepsScheduledRowWatermarkImportsAsSingleChunkEvenWhenScopeHasWindow() { + ImportChunkPlanner planner = new ImportChunkPlanner(); + YellowFoxD8ImportRequest request = new YellowFoxD8ImportRequest( + "tenant-1", + new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "instance-1", "setting-1", null), + (SourceGroupRefDto) null, + ImportScopeDto.tenantAll( + OffsetDateTime.parse("2026-04-01T00:00:00+02:00"), + OffsetDateTime.parse("2026-04-10T00:00:00+02:00") + ), + EnumSet.noneOf(EventFamily.class), + ImportMode.INCREMENTAL_UPDATE, + false, + AcquisitionStrategy.SOURCE_ROW_WATERMARK + ); + + assertThat(planner.chunksFor(request, 1)) + .containsExactly(new ImportTimeChunkDto( + 1, + OffsetDateTime.parse("2026-04-01T00:00:00+02:00"), + OffsetDateTime.parse("2026-04-10T00:00:00+02:00") + )); + } +} diff --git a/src/test/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutorCursorTest.java b/src/test/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutorCursorTest.java new file mode 100644 index 0000000..046995d --- /dev/null +++ b/src/test/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutorCursorTest.java @@ -0,0 +1,187 @@ +package at.procon.eventhub.importing.extraction; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportCursorStateDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.importing.ExtractionBatchResult; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportRunRequest; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.persistence.ImportCursorRepository; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class AbstractJdbcExtractionBatchExecutorCursorTest { + + @Test + void bootstrapsWatermarkCursorFromLatestExistingCursorWhenExactCursorIsMissing() { + ImportCursorRepository repository = mock(ImportCursorRepository.class); + TestExecutor executor = new TestExecutor(repository); + TestRequest request = new TestRequest(); + ImportPlanItemDto planItem = new ImportPlanItemDto( + EventFamily.DRIVER_ACTIVITY, + "VEHICLE_UNIT", + "VU_ACTIVITY", + List.of("VUActivity"), + "VEHICLE", + "Vehicle activity", + AcquisitionStrategy.SOURCE_ROW_WATERMARK + ); + ImportCursorStateDto fallbackCursor = new ImportCursorStateDto( + null, + "9001", + null, + OffsetDateTime.parse("2026-04-09T23:59:59+02:00") + ); + + when(repository.findCursor( + "tenant-1", + 21, + request.importScope().stableKey(), + EventFamily.DRIVER_ACTIVITY, + "VEHICLE_UNIT", + AcquisitionStrategy.SOURCE_ROW_WATERMARK + )).thenReturn(null); + when(repository.findLatestCursor( + "tenant-1", + 21, + request.importScope().stableKey(), + EventFamily.DRIVER_ACTIVITY, + "VEHICLE_UNIT" + )).thenReturn(fallbackCursor); + + assertThat(executor.resolveCursor(21, request, planItem)).isEqualTo(fallbackCursor); + + verify(repository).findLatestCursor( + "tenant-1", + 21, + request.importScope().stableKey(), + EventFamily.DRIVER_ACTIVITY, + "VEHICLE_UNIT" + ); + } + + private static final class TestExecutor extends AbstractJdbcExtractionBatchExecutor { + + private TestExecutor(ImportCursorRepository repository) { + super(null, null, null, new EventHubProperties(), null, repository); + } + + private ImportCursorStateDto resolveCursor(int eventSourceId, TestRequest request, ImportPlanItemDto planItem) { + return findCursor(eventSourceId, request, planItem); + } + + @Override + protected Optional> findDefinition(String code) { + return Optional.empty(); + } + + @Override + protected EventSourceDto eventSourceFor(TestRequest request, ImportPlanItemDto planItem) { + return request.eventSource(); + } + + @Override + protected TestResult resultFor(UUID packageId, ImportPlanItemDto planItem, ImportTimeChunkDto chunk, ImportCursorStateDto cursor, ExtractedEventStats stats) { + return new TestResult(); + } + } + + private record TestRequest() implements ImportRunRequest { + + @Override + public String tenantKey() { + return "tenant-1"; + } + + @Override + public EventSourceDto eventSource() { + return new EventSourceDto("TACHOGRAPH", "VEHICLE_UNIT", "TACHOGRAPH_VEHICLE_UNIT", "instance-1", "setting-1", null); + } + + @Override + public at.procon.eventhub.dto.SourceGroupRefDto sourceGroup() { + return null; + } + + @Override + public ImportScopeDto importScope() { + return ImportScopeDto.tenantAll( + OffsetDateTime.parse("2026-04-01T00:00:00+02:00"), + OffsetDateTime.parse("2026-04-10T00:00:00+02:00") + ); + } + + @Override + public Set eventFamilies() { + return Set.of(EventFamily.DRIVER_ACTIVITY); + } + + @Override + public ImportMode mode() { + return ImportMode.INCREMENTAL_UPDATE; + } + + @Override + public boolean refreshMasterDataFirst() { + return false; + } + + @Override + public AcquisitionStrategy acquisitionStrategy() { + return AcquisitionStrategy.SOURCE_ROW_WATERMARK; + } + } + + private static final class TestResult implements ExtractionBatchResult { + + @Override + public int eventsInserted() { + return 0; + } + + @Override + public boolean executed() { + return false; + } + + @Override + public OffsetDateTime lastSourcePackageImportedAt() { + return null; + } + + @Override + public String lastSourcePackageId() { + return null; + } + + @Override + public OffsetDateTime lastSourceRowUpdatedAt() { + return null; + } + + @Override + public OffsetDateTime lastOccurredTo() { + return null; + } + + @Override + public Map eventTypeCounts() { + return Map.of(); + } + } +} diff --git a/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest.java b/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest.java new file mode 100644 index 0000000..55c57b6 --- /dev/null +++ b/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest.java @@ -0,0 +1,105 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportCursorStateDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.SourceGroupRefDto; +import at.procon.eventhub.dto.SourceGroupType; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.persistence.ImportCursorRepository; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import java.time.OffsetDateTime; +import java.util.EnumSet; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.DefaultResourceLoader; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest { + + @Test + void bootstrapsScheduledWatermarkCursorFromLatestExistingCursorWhenExactCursorIsMissing() { + ImportCursorRepository repository = mock(ImportCursorRepository.class); + JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(repository); + YellowFoxD8ImportRequest request = request(); + ImportPlanItemDto planItem = new ImportPlanItemDto( + EventFamily.DRIVER_ACTIVITY, + "TELEMATICS_PLATFORM", + "YELLOWFOX_D8_BOOKING", + List.of("data.d8_booking"), + "BOTH", + "YellowFox bookings", + AcquisitionStrategy.SOURCE_ROW_WATERMARK + ); + ImportCursorStateDto fallbackCursor = new ImportCursorStateDto( + null, + "4711", + null, + OffsetDateTime.parse("2026-04-09T23:59:59+02:00") + ); + + when(repository.findCursor( + "tenant-1", + 17, + request.importScope().stableKey(), + EventFamily.DRIVER_ACTIVITY, + "TELEMATICS_PLATFORM", + AcquisitionStrategy.SOURCE_ROW_WATERMARK + )).thenReturn(null); + when(repository.findLatestCursor( + "tenant-1", + 17, + request.importScope().stableKey(), + EventFamily.DRIVER_ACTIVITY, + "TELEMATICS_PLATFORM" + )).thenReturn(fallbackCursor); + + assertThat(executor.findCursor(17, request, planItem)).isEqualTo(fallbackCursor); + + verify(repository).findLatestCursor( + "tenant-1", + 17, + request.importScope().stableKey(), + EventFamily.DRIVER_ACTIVITY, + "TELEMATICS_PLATFORM" + ); + } + + private JdbcYellowFoxD8BookingExtractionBatchExecutor executor(ImportCursorRepository repository) { + EventHubProperties properties = new EventHubProperties(); + return new JdbcYellowFoxD8BookingExtractionBatchExecutor( + null, + null, + new DefaultResourceLoader(), + repository, + properties, + null, + null, + null + ); + } + + private YellowFoxD8ImportRequest request() { + return new YellowFoxD8ImportRequest( + "tenant-1", + new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "instance-1", "setting-1", null), + new SourceGroupRefDto(SourceGroupType.FLEET, "7", null, "Fleet 7"), + ImportScopeDto.tenantAll( + OffsetDateTime.parse("2026-04-01T00:00:00+02:00"), + OffsetDateTime.parse("2026-04-10T00:00:00+02:00") + ), + EnumSet.of(EventFamily.DRIVER_ACTIVITY), + ImportMode.INCREMENTAL_UPDATE, + false, + AcquisitionStrategy.SOURCE_ROW_WATERMARK + ); + } +} diff --git a/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorTest.java b/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorTest.java new file mode 100644 index 0000000..583db11 --- /dev/null +++ b/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorTest.java @@ -0,0 +1,110 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.ImportCursorStateDto; +import at.procon.eventhub.dto.SourceGroupRefDto; +import at.procon.eventhub.dto.SourceGroupType; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.EnumSet; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.DefaultResourceLoader; + +import static org.assertj.core.api.Assertions.assertThat; + +class JdbcYellowFoxD8BookingExtractionBatchExecutorTest { + + @Test + void omitsCursorParametersWhenNoCursorExists() { + JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(Duration.ofHours(2)); + + JdbcYellowFoxD8BookingExtractionBatchExecutor.QuerySpec query = executor.buildQuerySpec( + request(AcquisitionStrategy.SOURCE_ROW_WATERMARK), + ImportScopeDto.tenantAll( + OffsetDateTime.parse("2026-04-01T00:00:00+02:00"), + OffsetDateTime.parse("2026-04-02T00:00:00+02:00") + ), + null + ); + + assertThat(query.sql()).contains("b.utc >= :occurredFrom"); + assertThat(query.sql()).contains("b.utc < :occurredTo"); + assertThat(query.sql()).contains("f.id = :fleetId"); + assertThat(query.sql()).doesNotContain(":lastOccurredTo"); + assertThat(query.sql()).doesNotContain(" is null"); + assertThat(query.params()).containsOnlyKeys("occurredFrom", "occurredTo", "fleetId"); + assertThat(query.fleetId()).isEqualTo(7); + } + + @Test + void keepsStrictUtcEventIdCursorWhenOverlapIsDisabled() { + JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(Duration.ZERO); + OffsetDateTime cursorTime = OffsetDateTime.parse("2026-04-02T09:15:00+02:00"); + + JdbcYellowFoxD8BookingExtractionBatchExecutor.QuerySpec query = executor.buildQuerySpec( + request(AcquisitionStrategy.SOURCE_ROW_WATERMARK), + ImportScopeDto.tenantAll(null, null), + new ImportCursorStateDto(null, "4711", null, cursorTime) + ); + + assertThat(query.sql()).contains("b.utc > :lastOccurredTo"); + assertThat(query.sql()).contains("b.utc = :lastOccurredTo"); + assertThat(query.sql()).contains("b.eventid > :lastSourceRowId"); + assertThat(query.sql()).doesNotContain(" is null"); + assertThat(query.params()).containsEntry("lastOccurredTo", cursorTime); + assertThat(query.params()).containsEntry("lastSourceRowId", "4711"); + } + + @Test + void keepsOccurredToCapOnceCursorExistsForWatermarkIncrementalRuns() { + JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(Duration.ZERO); + OffsetDateTime cursorTime = OffsetDateTime.parse("2026-04-02T09:15:00+02:00"); + + JdbcYellowFoxD8BookingExtractionBatchExecutor.QuerySpec query = executor.buildQuerySpec( + request(AcquisitionStrategy.SOURCE_ROW_WATERMARK), + ImportScopeDto.tenantAll( + OffsetDateTime.parse("2026-04-01T00:00:00+02:00"), + OffsetDateTime.parse("2026-04-02T00:00:00+02:00") + ), + new ImportCursorStateDto(null, "4711", null, cursorTime) + ); + + assertThat(query.sql()).contains("b.utc >= :occurredFrom"); + assertThat(query.sql()).contains("b.utc < :occurredTo"); + assertThat(query.params()).containsEntry("occurredFrom", OffsetDateTime.parse("2026-04-01T00:00:00+02:00")); + assertThat(query.params()).containsEntry("occurredTo", OffsetDateTime.parse("2026-04-02T00:00:00+02:00")); + } + + private JdbcYellowFoxD8BookingExtractionBatchExecutor executor(Duration overlap) { + EventHubProperties properties = new EventHubProperties(); + properties.getYellowFox().setOccurredAtOverlap(overlap); + return new JdbcYellowFoxD8BookingExtractionBatchExecutor( + null, + null, + new DefaultResourceLoader(), + null, + properties, + null, + null, + null + ); + } + + private YellowFoxD8ImportRequest request(AcquisitionStrategy strategy) { + return new YellowFoxD8ImportRequest( + "tenant-1", + new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "instance-1", "setting-1", null), + new SourceGroupRefDto(SourceGroupType.FLEET, "7", null, "Fleet 7"), + ImportScopeDto.tenantAll(null, null), + EnumSet.noneOf(at.procon.eventhub.dto.EventFamily.class), + ImportMode.INCREMENTAL_UPDATE, + false, + strategy + ); + } +} diff --git a/src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapperTest.java b/src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapperTest.java new file mode 100644 index 0000000..8aedfc4 --- /dev/null +++ b/src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8BookingRowMapperTest.java @@ -0,0 +1,90 @@ +package at.procon.eventhub.yellowfox.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class YellowFoxD8BookingRowMapperTest { + + @Test + void trimsVehicleRegistrationValuesInReferencesAndPayload() throws SQLException { + YellowFoxD8BookingRowMapper mapper = new YellowFoxD8BookingRowMapper(new ObjectMapper()); + ResultSet rs = mock(ResultSet.class); + OffsetDateTime occurredAt = OffsetDateTime.parse("2026-05-08T10:15:30+02:00"); + + when(rs.getString("eventid")).thenReturn(" evt-1 "); + when(rs.getObject("utc", OffsetDateTime.class)).thenReturn(occurredAt); + when(rs.getInt("vehicle_id")).thenReturn(42); + when(rs.getInt("driver_id")).thenReturn(7); + when(rs.getInt("fleet_id")).thenReturn(9); + when(rs.getInt("odometer")).thenReturn(1234); + when(rs.getInt("telematic_provider_id")).thenReturn(3); + when(rs.getInt("ignition")).thenReturn(1); + when(rs.getInt("previous_ignition")).thenReturn(0); + when(rs.getInt("eventtype")).thenReturn(2); + when(rs.getInt("state")).thenReturn(3); + when(rs.wasNull()).thenReturn(false); + when(rs.getString("vehicle_vrn")).thenReturn(" W-12345 "); + when(rs.getString("vehicle_vin")).thenReturn(" vin-123 "); + when(rs.getString("driver_card_number")).thenReturn(" card-9 "); + when(rs.getString("fleet_name")).thenReturn(" Fleet 9 "); + when(rs.getString("driver_firstname")).thenReturn(" Ada "); + when(rs.getString("driver_lastname")).thenReturn(" Lovelace "); + when(rs.getString("telematic_provider_name")).thenReturn(" YellowFox "); + when(rs.getString("key")).thenReturn(" booking-key "); + when(rs.getString("payload")).thenReturn(""" + {"vrn":" W-12345 ","nested":{"label":" value "},"list":[" x ",1]} + """); + + var booking = mapper.map(rs, "tenant-1", "instance-1", "setting-1"); + + assertThat(booking.eventId()).isEqualTo("evt-1"); + assertThat(booking.key()).isEqualTo("booking-key"); + assertThat(booking.previousIgnition()).isEqualTo(0); + assertThat(booking.vehicleRef().vin()).isEqualTo("VIN-123"); + assertThat(booking.vehicleRef().vehicleRegistration().number()).isEqualTo("W-12345"); + assertThat(booking.payload()).containsEntry("vrn", "W-12345"); + assertThat(booking.payload()).containsEntry("vehicleVrn", "W-12345"); + assertThat(booking.payload()).containsEntry("driverFirstName", "Ada"); + assertThat(booking.payload()).containsEntry("telematicProviderName", "YellowFox"); + assertThat(((Map) booking.payload().get("nested")).get("label")).isEqualTo("value"); + assertThat((List) booking.payload().get("list")).containsExactly("x", 1); + } + + @Test + void truncatesBookingDriverCardNumberToFirst14Characters() throws SQLException { + YellowFoxD8BookingRowMapper mapper = new YellowFoxD8BookingRowMapper(new ObjectMapper()); + ResultSet rs = mock(ResultSet.class); + OffsetDateTime occurredAt = OffsetDateTime.parse("2026-05-08T10:15:30+02:00"); + + when(rs.getString("eventid")).thenReturn("evt-2"); + when(rs.getObject("utc", OffsetDateTime.class)).thenReturn(occurredAt); + when(rs.getInt("vehicle_id")).thenReturn(0); + when(rs.getInt("driver_id")).thenReturn(0); + when(rs.getInt("fleet_id")).thenReturn(0); + when(rs.getInt("odometer")).thenReturn(0); + when(rs.getInt("telematic_provider_id")).thenReturn(0); + when(rs.getInt("ignition")).thenReturn(1); + when(rs.getInt("previous_ignition")).thenReturn(0); + when(rs.getInt("eventtype")).thenReturn(2); + when(rs.getInt("state")).thenReturn(3); + when(rs.wasNull()).thenReturn(true); + when(rs.getString("driver_card_number")).thenReturn(" 12345678901234AB "); + when(rs.getString("payload")).thenReturn("{}"); + + var booking = mapper.map(rs, "tenant-1", "instance-1", "setting-1"); + + assertThat(booking.driverRef()).isNotNull(); + assertThat(booking.driverRef().driverCard()).isNotNull(); + assertThat(booking.driverRef().driverCard().number()).isEqualTo("12345678901234"); + assertThat(booking.payload()).containsEntry("driverCardNumber", "12345678901234"); + } +} diff --git a/src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanServiceTest.java b/src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanServiceTest.java new file mode 100644 index 0000000..4a0a0d6 --- /dev/null +++ b/src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8ConfiguredImportPlanServiceTest.java @@ -0,0 +1,39 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportMode; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.ImportScopeType; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; +import java.time.OffsetDateTime; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class YellowFoxD8ConfiguredImportPlanServiceTest { + + @Test + void scheduledWatermarkRequestKeepsInitialOccurredWindowAsBootstrapScope() { + EventHubProperties properties = new EventHubProperties(); + EventHubProperties.ConfiguredImportPlan plan = new EventHubProperties.ConfiguredImportPlan(); + plan.setPlanKey("yellowfox-d8-default"); + plan.setTenantKey("Procon"); + plan.setEventSource(new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "logistics-db-prod", "yellowfox-main", null)); + plan.setImportScope(new ImportScopeDto(ImportScopeType.TENANT_ALL, null, false, null, null)); + plan.setScheduledMode(ImportMode.INCREMENTAL_UPDATE); + plan.setScheduledStrategy(AcquisitionStrategy.SOURCE_ROW_WATERMARK); + plan.setInitialOccurredFrom(OffsetDateTime.parse("2026-04-01T00:00:00+02:00")); + plan.setInitialOccurredTo(OffsetDateTime.parse("2026-04-02T00:00:00+02:00")); + properties.getYellowFox().getImportPlans().add(plan); + + YellowFoxD8ConfiguredImportPlanService service = new YellowFoxD8ConfiguredImportPlanService(properties); + YellowFoxD8ImportRequest request = service.createScheduledRequest(plan); + + assertThat(request.mode()).isEqualTo(ImportMode.INCREMENTAL_UPDATE); + assertThat(request.acquisitionStrategy()).isEqualTo(AcquisitionStrategy.SOURCE_ROW_WATERMARK); + assertThat(request.importScope().occurredFrom()).isEqualTo(OffsetDateTime.parse("2026-04-01T00:00:00+02:00")); + assertThat(request.importScope().occurredTo()).isEqualTo(OffsetDateTime.parse("2026-04-02T00:00:00+02:00")); + } +} diff --git a/src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetectorTest.java b/src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetectorTest.java new file mode 100644 index 0000000..e91ef86 --- /dev/null +++ b/src/test/java/at/procon/eventhub/yellowfox/service/YellowFoxD8IgnitionTransitionDetectorTest.java @@ -0,0 +1,65 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.service.EventDetailsFactory; +import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.OffsetDateTime; +import java.util.Map; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class YellowFoxD8IgnitionTransitionDetectorTest { + + @Test + void emitsBoundaryTransitionFromPreviousIgnitionFromSourceRow() { + YellowFoxD8BookingEventMapper mapper = new YellowFoxD8BookingEventMapper(new EventDetailsFactory(new ObjectMapper())); + YellowFoxD8IgnitionTransitionDetector detector = new YellowFoxD8IgnitionTransitionDetector(mapper); + + var event = detector.newSession(false).detect(booking(1, 0)); + + assertThat(event).isNotNull(); + assertThat(event.eventDomain()).isEqualTo(EventDomain.IGNITION); + assertThat(event.eventType()).isEqualTo(EventType.IGNITION_ON); + } + + @Test + void doesNotEmitWhenCurrentIgnitionMatchesPreviousIgnitionFromSourceRow() { + YellowFoxD8BookingEventMapper mapper = new YellowFoxD8BookingEventMapper(new EventDetailsFactory(new ObjectMapper())); + YellowFoxD8IgnitionTransitionDetector detector = new YellowFoxD8IgnitionTransitionDetector(mapper); + + var event = detector.newSession(false).detect(booking(1, 1)); + + assertThat(event).isNull(); + } + + private YellowFoxD8BookingDto booking(Integer ignition, Integer previousIgnition) { + return new YellowFoxD8BookingDto( + "tenant-1", + "instance-1", + "setting-1", + "7", + "Fleet 7", + "evt-1", + "key-1", + ignition, + previousIgnition, + 2, + 3, + OffsetDateTime.parse("2026-05-08T10:15:30+02:00"), + null, + new DriverRefDto("1", new DriverCardRefDto(YellowFoxReferenceSemantics.SYNTHETIC_REFERENCE_NATION, "12345678901234")), + new VehicleRefDto("42", "VIN-42", "42", new VehicleRegistrationRefDto(YellowFoxReferenceSemantics.SYNTHETIC_REFERENCE_NATION, "W-4242")), + 123_000L, + null, + null, + Map.of() + ); + } +}