Fix YellowFox bounded import cursor and ignition handling

This commit is contained in:
trifonovt 2026-05-08 16:50:34 +02:00
parent a0242eedee
commit e84dfef614
29 changed files with 1267 additions and 187 deletions

View File

@ -0,0 +1,21 @@
-- Truncate all application data in the eventhub schema.
-- Keeps the schema and Flyway migration history intact.
-- Intended for PostgreSQL / TimescaleDB environments.
DO $$
DECLARE
table_list text;
BEGIN
SELECT string_agg(format('%I.%I', schemaname, tablename), ', ' ORDER BY tablename)
INTO table_list
FROM pg_tables
WHERE schemaname = 'eventhub';
IF table_list IS NULL THEN
RAISE NOTICE 'No tables found in schema eventhub.';
RETURN;
END IF;
EXECUTE 'TRUNCATE TABLE ' || table_list || ' RESTART IDENTITY CASCADE';
END
$$;

View File

@ -224,6 +224,9 @@ public class EventHubProperties {
/** How JDBC extraction batches are handed over to the ingest pipeline. */
private JdbcExtractionIngestMode jdbcExtractionIngestMode = JdbcExtractionIngestMode.SYNC_DIRECT;
/** Whether master-data refresh reconciles vehicle registrations and assignments. */
private boolean syncVehicleRegistrationsOnMasterDataUpdate = true;
/** Configured tenant/source import plans. */
private List<ConfiguredImportPlan> importPlans = new ArrayList<>();
@ -280,6 +283,14 @@ public class EventHubProperties {
: jdbcExtractionIngestMode;
}
public boolean isSyncVehicleRegistrationsOnMasterDataUpdate() {
return syncVehicleRegistrationsOnMasterDataUpdate;
}
public void setSyncVehicleRegistrationsOnMasterDataUpdate(boolean syncVehicleRegistrationsOnMasterDataUpdate) {
this.syncVehicleRegistrationsOnMasterDataUpdate = syncVehicleRegistrationsOnMasterDataUpdate;
}
public List<ConfiguredImportPlan> getImportPlans() {
return importPlans;
}
@ -354,6 +365,9 @@ public class EventHubProperties {
/** Emit a first ignition snapshot per vehicle if no previous D8 ignition state exists in the imported window. */
private boolean emitInitialIgnitionSnapshot = false;
/** Whether master-data refresh reconciles vehicle registrations and assignments. */
private boolean syncVehicleRegistrationsOnMasterDataUpdate = true;
/** Configured tenant/source import plans. */
private List<ConfiguredImportPlan> importPlans = new ArrayList<>();
@ -412,6 +426,14 @@ public class EventHubProperties {
this.emitInitialIgnitionSnapshot = emitInitialIgnitionSnapshot;
}
public boolean isSyncVehicleRegistrationsOnMasterDataUpdate() {
return syncVehicleRegistrationsOnMasterDataUpdate;
}
public void setSyncVehicleRegistrationsOnMasterDataUpdate(boolean syncVehicleRegistrationsOnMasterDataUpdate) {
this.syncVehicleRegistrationsOnMasterDataUpdate = syncVehicleRegistrationsOnMasterDataUpdate;
}
public List<ConfiguredImportPlan> getImportPlans() {
return importPlans;
}

View File

@ -64,10 +64,15 @@ public abstract class AbstractConfiguredImportPlanService<R extends ImportRunReq
AcquisitionStrategy strategy = strategyOverride == null
? (mode == ImportMode.INCREMENTAL_UPDATE ? plan.getScheduledStrategy() : plan.getInitialStrategy())
: strategyOverride;
return buildRequest(plan, mode, strategy, scopedForRequest(plan, applyInitialOccurredWindow));
return buildRequest(plan, mode, strategy, scopedForRequest(plan, mode, strategy, applyInitialOccurredWindow));
}
private ImportScopeDto scopedForRequest(EventHubProperties.ConfiguredImportPlan plan, boolean applyInitialOccurredWindow) {
private ImportScopeDto scopedForRequest(
EventHubProperties.ConfiguredImportPlan plan,
ImportMode mode,
AcquisitionStrategy strategy,
boolean applyInitialOccurredWindow
) {
ImportScopeDto scope = plan.getImportScope();
if (applyInitialOccurredWindow && scope != null
&& (plan.getInitialOccurredFrom() != null || plan.getInitialOccurredTo() != null)) {
@ -79,9 +84,27 @@ public abstract class AbstractConfiguredImportPlanService<R extends ImportRunReq
plan.getInitialOccurredTo() == null ? scope.occurredTo() : plan.getInitialOccurredTo()
);
}
if (mode == ImportMode.INCREMENTAL_UPDATE
&& isWatermarkStrategy(strategy)
&& scope != null
&& scope.occurredFrom() == null
&& plan.getInitialOccurredFrom() != null) {
return new ImportScopeDto(
scope.type(),
scope.rootSourceOrganisation(),
scope.includeChildren(),
plan.getInitialOccurredFrom(),
scope.occurredTo() == null ? plan.getInitialOccurredTo() : scope.occurredTo()
);
}
return scope;
}
private boolean isWatermarkStrategy(AcquisitionStrategy strategy) {
return strategy == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK
|| strategy == AcquisitionStrategy.SOURCE_ROW_WATERMARK;
}
private D planByKey(String planKey) {
return toDto(rawPlanByKey(planKey));
}

View File

@ -17,7 +17,7 @@ public class ImportChunkPlanner {
OffsetDateTime to = scope == null ? null : scope.occurredTo();
if (request.mode() == ImportMode.INCREMENTAL_UPDATE
&& request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) {
&& isWatermarkStrategy(request.acquisitionStrategy())) {
return List.of(new ImportTimeChunkDto(1, from, to));
}
@ -39,4 +39,9 @@ public class ImportChunkPlanner {
}
return chunks.isEmpty() ? List.of(new ImportTimeChunkDto(1, from, to)) : chunks;
}
private boolean isWatermarkStrategy(AcquisitionStrategy strategy) {
return strategy == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK
|| strategy == AcquisitionStrategy.SOURCE_ROW_WATERMARK;
}
}

View File

@ -88,15 +88,7 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
packageInfo
);
String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey();
ImportCursorStateDto cursor = importCursorRepository.findCursor(
request.tenantKey(),
eventSourceId,
scopeHash,
planItem.eventFamily(),
planItem.sourceKind(),
request.acquisitionStrategy()
);
ImportCursorStateDto cursor = findCursor(eventSourceId, request, planItem);
Map<String, Object> params = parameters(request, chunkScope, cursor);
String sql = loadSql(definition.sqlResource());
@ -290,6 +282,34 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
return params;
}
protected ImportCursorStateDto findCursor(int eventSourceId, R request, ImportPlanItemDto planItem) {
String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey();
ImportCursorStateDto cursor = importCursorRepository.findCursor(
request.tenantKey(),
eventSourceId,
scopeHash,
planItem.eventFamily(),
planItem.sourceKind(),
request.acquisitionStrategy()
);
if (cursor != null || !shouldBootstrapWatermarkCursor(request)) {
return cursor;
}
return importCursorRepository.findLatestCursor(
request.tenantKey(),
eventSourceId,
scopeHash,
planItem.eventFamily(),
planItem.sourceKind()
);
}
private boolean shouldBootstrapWatermarkCursor(R request) {
return request.mode() == at.procon.eventhub.dto.ImportMode.INCREMENTAL_UPDATE
&& (request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_ROW_WATERMARK
|| request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK);
}
protected OffsetDateTime lastSourcePackageImportedAt(ExtractedEventStats stats, ImportCursorStateDto cursor) {
return stats.lastSourcePackageImportedAt() == null
? cursor == null ? null : cursor.lastSourcePackageImportedAt()

View File

@ -56,6 +56,45 @@ public class ImportCursorRepository {
);
}
public ImportCursorStateDto findLatestCursor(
String tenantKey,
int eventSourceId,
String scopeHash,
EventFamily eventFamily,
String sourceKind
) {
return jdbcTemplate.query(
"""
select last_source_package_imported_at,
last_source_package_id,
last_source_row_updated_at,
last_occurred_to
from eventhub.import_cursor
where tenant_key = ?
and event_source_id = ?
and scope_hash = ?
and event_family = ?
and source_kind = ?
order by coalesce(last_occurred_to, last_source_row_updated_at, last_source_package_imported_at) desc nulls last,
updated_at desc
limit 1
""",
rs -> rs.next()
? new ImportCursorStateDto(
rs.getObject("last_source_package_imported_at", java.time.OffsetDateTime.class),
rs.getString("last_source_package_id"),
rs.getObject("last_source_row_updated_at", java.time.OffsetDateTime.class),
rs.getObject("last_occurred_to", java.time.OffsetDateTime.class)
)
: null,
tenantKey,
eventSourceId,
scopeHash,
eventFamily.name(),
sourceKind
);
}
public void advanceCursor(
String tenantKey,
int eventSourceId,

View File

@ -16,6 +16,9 @@ import org.springframework.transaction.annotation.Transactional;
@Repository
public class DriverIdentityRepository {
private static final String YELLOWFOX_SYNTHETIC_REFERENCE_NATION = "YELLOWFOX";
private static final String UNKNOWN_CARD_NATION = "UNKNOWN";
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
@ -37,7 +40,7 @@ public class DriverIdentityRepository {
String sourceDriverEntityId = normalizeNullable(driverRef.sourceEntityId());
DriverCardRefDto driverCard = driverRef.driverCard();
String cardNation = driverCard == null ? null : normalizeNullable(driverCard.nation());
String cardNumber = driverCard == null ? null : normalizeNullable(driverCard.number());
String cardNumber = driverCard == null ? null : normalizeDriverCardNumber(cardNation, driverCard.number());
UUID driverId = findBySourceDriverEntityId(normalizedTenantKey, eventSourceId, sourceDriverEntityId);
UUID driverCardId = resolveOrCreateDriverCardId(cardNation, cardNumber, driverId);
@ -120,10 +123,15 @@ public class DriverIdentityRepository {
master.payload,
coalesce(identity.driver_id, gen_random_uuid()) as driver_id
from master_drivers master
left join eventhub.source_driver_identity identity
on identity.tenant_key = ?
left join lateral (
select identity.driver_id
from eventhub.source_driver_identity identity
where identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_driver_entity_id = master.source_driver_entity_id
order by identity.updated_at desc, identity.id desc
limit 1
) identity on true
)
insert into eventhub.driver(
id, tenant_key, event_source_id, source_driver_entity_id,
@ -184,10 +192,15 @@ public class DriverIdentityRepository {
master.payload,
coalesce(identity.driver_id, gen_random_uuid()) as driver_id
from master_drivers master
left join eventhub.source_driver_identity identity
on identity.tenant_key = ?
left join lateral (
select identity.driver_id
from eventhub.source_driver_identity identity
where identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_driver_entity_id = master.source_driver_entity_id
order by identity.updated_at desc, identity.id desc
limit 1
) identity on true
)
insert into eventhub.driver(
id, first_names, last_name, birth_date, source_updated_at, payload, updated_at
@ -241,10 +254,15 @@ public class DriverIdentityRepository {
payload = driver.payload || master.payload,
updated_at = now()
from master_drivers master
join eventhub.source_driver_identity identity
on identity.tenant_key = ?
join lateral (
select identity.driver_id
from eventhub.source_driver_identity identity
where identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_driver_entity_id = master.source_driver_entity_id
order by identity.updated_at desc, identity.id desc
limit 1
) identity on true
where identity.driver_id = driver.id
""",
eventSourceId,
@ -280,10 +298,15 @@ public class DriverIdentityRepository {
limit 1
)) as driver_id
from master_drivers master
left join eventhub.source_driver_identity identity
on identity.tenant_key = ?
left join lateral (
select identity.driver_id
from eventhub.source_driver_identity identity
where identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_driver_entity_id = master.source_driver_entity_id
order by identity.updated_at desc, identity.id desc
limit 1
) identity on true
)
insert into eventhub.source_driver_identity(
id, tenant_key, event_source_id, source_driver_entity_id,
@ -515,10 +538,15 @@ public class DriverIdentityRepository {
master.payload,
coalesce(identity.driver_card_id, existing.id) as driver_card_id
from master_driver_cards master
left join eventhub.source_driver_card_identity identity
on identity.tenant_key = ?
left join lateral (
select identity.driver_card_id
from eventhub.source_driver_card_identity identity
where identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_driver_card_entity_id = master.source_driver_card_entity_id
order by identity.updated_at desc, identity.id desc
limit 1
) identity on true
left join existing_driver_cards existing
on existing.nation = master.card_nation
and existing.card_number = master.card_number
@ -735,6 +763,22 @@ public class DriverIdentityRepository {
return legacyColumns != null && legacyColumns == 3;
}
private String normalizeDriverCardNumber(String cardNation, String cardNumber) {
String normalized = normalizeNullable(cardNumber);
if (normalized == null) {
return null;
}
if (isSyntheticYellowFoxCardNation(cardNation)) {
return normalized.length() <= 14 ? normalized : normalized.substring(0, 14);
}
return normalized;
}
private boolean isSyntheticYellowFoxCardNation(String cardNation) {
return YELLOWFOX_SYNTHETIC_REFERENCE_NATION.equalsIgnoreCase(cardNation)
|| UNKNOWN_CARD_NATION.equalsIgnoreCase(cardNation);
}
private UUID createDriverCard(
UUID driverId,
String cardNation,

View File

@ -14,6 +14,9 @@ import org.springframework.transaction.annotation.Transactional;
@Repository
public class VehicleIdentityRepository {
private static final String YELLOWFOX_SYNTHETIC_REFERENCE_NATION = "YELLOWFOX";
private static final String UNKNOWN_REGISTRATION_NATION = "UNKNOWN";
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
@ -39,6 +42,7 @@ public class VehicleIdentityRepository {
VehicleRegistrationRefDto registration = vehicleRef.vehicleRegistration();
String registrationNation = registration == null ? null : normalizeNullable(registration.nation());
String registrationNumber = registration == null ? null : normalizeNullable(registration.number());
String registrationNationForCreate = creationNation(registrationNation, registrationNumber);
UUID registrationId = resolveRegistrationId(
normalizedTenantKey,
@ -49,7 +53,7 @@ public class VehicleIdentityRepository {
);
if (registrationId == null && (sourceRegistrationEntityId != null || registrationNumber != null)) {
registrationId = createRegistration(
registrationNation,
registrationNationForCreate,
registrationNumber,
null,
Map.of("source", "event")
@ -77,7 +81,7 @@ public class VehicleIdentityRepository {
assignedVehicle = resolveAssignedVehicleReference(normalizedTenantKey, eventSourceId, registrationId, occurredAt);
vehicleId = assignedVehicle == null ? null : assignedVehicle.vehicleId();
}
if (vehicleId == null && (sourceVehicleEntityId != null || vin != null)) {
if (vehicleId == null && vin != null) {
vehicleId = createVehicle(vin);
}
if (vehicleId != null && sourceVehicleEntityId != null) {
@ -91,7 +95,7 @@ public class VehicleIdentityRepository {
);
}
touchVehicle(vehicleId, vin);
touchRegistration(registrationId, registrationNation, registrationNumber);
touchRegistration(registrationId, registrationNationForCreate, registrationNumber);
return new ResolvedVehicleReferenceResolution(
new ResolvedVehicleReference(vehicleId, registrationId),
@ -103,10 +107,17 @@ public class VehicleIdentityRepository {
@Transactional
public int reconcileFromMasterData(String tenantKey, int eventSourceId) {
return reconcileFromMasterData(tenantKey, eventSourceId, true);
}
@Transactional
public int reconcileFromMasterData(String tenantKey, int eventSourceId, boolean syncVehicleRegistrations) {
String normalizedTenantKey = normalizeRequired(tenantKey, "tenantKey");
int updates = reconcileVehiclesFromMasterData(normalizedTenantKey, eventSourceId);
if (syncVehicleRegistrations) {
updates += reconcileRegistrationsFromMasterData(normalizedTenantKey, eventSourceId);
updates += projectVehicleRegistrationAssignments(normalizedTenantKey, eventSourceId);
}
return updates;
}
@ -125,10 +136,7 @@ public class VehicleIdentityRepository {
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE'
and (
nullif(trim(source_entity_id), '') is not null
or nullif(trim(source_external_key), '') is not null
)
and nullif(trim(source_external_key), '') is not null
order by nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), ''),
updated_at desc
@ -139,13 +147,23 @@ public class VehicleIdentityRepository {
master.vin,
coalesce(identity.vehicle_id, existing.id, gen_random_uuid()) as vehicle_id
from master_vehicles master
left join eventhub.source_vehicle_identity identity
on identity.tenant_key = ?
left join lateral (
select identity.vehicle_id
from eventhub.source_vehicle_identity identity
where identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_vehicle_entity_id = master.source_vehicle_entity_id
left join eventhub.vehicle existing
on master.vin is not null
order by identity.updated_at desc, identity.id desc
limit 1
) identity on true
left join lateral (
select existing.id
from eventhub.vehicle existing
where master.vin is not null
and existing.vin = master.vin
order by existing.updated_at desc, existing.created_at desc, existing.id
limit 1
) existing on true
)
insert into eventhub.vehicle(id, vin, updated_at)
select distinct on (resolved.vehicle_id)
@ -177,10 +195,7 @@ public class VehicleIdentityRepository {
where tenant_key = ?
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE'
and (
nullif(trim(source_entity_id), '') is not null
or nullif(trim(source_external_key), '') is not null
)
and nullif(trim(source_external_key), '') is not null
order by nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), ''),
updated_at desc
@ -189,15 +204,21 @@ public class VehicleIdentityRepository {
set vin = coalesce(vehicle.vin, master.vin),
updated_at = now()
from master_vehicles master
left join eventhub.source_vehicle_identity identity
on identity.tenant_key = ?
left join lateral (
select identity.vehicle_id
from eventhub.source_vehicle_identity identity
where identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_vehicle_entity_id = master.source_vehicle_entity_id
order by identity.updated_at desc, identity.id desc
limit 1
) identity on true
where vehicle.id = coalesce(identity.vehicle_id, (
select existing.id
from eventhub.vehicle existing
where master.vin is not null
and existing.vin = master.vin
order by existing.updated_at desc, existing.created_at desc, existing.id
limit 1
))
""",
@ -221,6 +242,7 @@ public class VehicleIdentityRepository {
and event_source_id in (select id from compatible_sources)
and entity_type = 'VEHICLE'
and nullif(trim(source_entity_id), '') is not null
and nullif(trim(source_external_key), '') is not null
order by nullif(trim(source_entity_id), ''),
nullif(trim(source_external_key), ''),
updated_at desc
@ -230,13 +252,23 @@ public class VehicleIdentityRepository {
master.source_vehicle_entity_id,
coalesce(identity.vehicle_id, existing.id) as vehicle_id
from master_vehicles master
left join eventhub.source_vehicle_identity identity
on identity.tenant_key = ?
left join lateral (
select identity.vehicle_id
from eventhub.source_vehicle_identity identity
where identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_vehicle_entity_id = master.source_vehicle_entity_id
left join eventhub.vehicle existing
on master.vin is not null
order by identity.updated_at desc, identity.id desc
limit 1
) identity on true
left join lateral (
select existing.id
from eventhub.vehicle existing
where master.vin is not null
and existing.vin = master.vin
order by existing.updated_at desc, existing.created_at desc, existing.id
limit 1
) existing on true
)
insert into eventhub.source_vehicle_identity(
id, tenant_key, event_source_id, source_vehicle_entity_id,
@ -316,21 +348,41 @@ public class VehicleIdentityRepository {
),
updated_at desc
),
resolved_registrations as (
select master.event_source_id,
master.source_registration_entity_id,
canonical_registrations as (
select distinct on (master.nation, master.registration_number)
master.nation,
master.registration_number,
master.source_updated_at,
coalesce(identity.vehicle_registration_id, existing.id, gen_random_uuid()) as registration_id
master.source_updated_at
from master_registrations master
left join eventhub.source_vehicle_registration_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_registration_entity_id = master.source_registration_entity_id
left join eventhub.vehicle_registration existing
on existing.nation = master.nation
and existing.registration_number = master.registration_number
where master.nation is not null
and master.registration_number is not null
order by master.nation,
master.registration_number,
case when master.source_registration_entity_id is null then 1 else 0 end,
master.source_updated_at desc,
master.source_registration_entity_id
),
existing_registrations as (
select distinct on (existing.nation, existing.registration_number)
existing.id,
existing.nation,
existing.registration_number
from eventhub.vehicle_registration existing
order by existing.nation,
existing.registration_number,
existing.updated_at desc,
existing.created_at desc,
existing.id
),
resolved_registrations as (
select canonical.nation,
canonical.registration_number,
canonical.source_updated_at,
coalesce(existing.id, gen_random_uuid()) as registration_id
from canonical_registrations canonical
left join existing_registrations existing
on existing.nation = canonical.nation
and existing.registration_number = canonical.registration_number
)
insert into eventhub.vehicle_registration(
id, nation, registration_number, source_updated_at, payload, updated_at
@ -343,16 +395,13 @@ public class VehicleIdentityRepository {
jsonb_build_object('source', 'master-data'),
now()
from resolved_registrations resolved
where resolved.nation is not null
and resolved.registration_number is not null
and not exists (
where not exists (
select 1
from eventhub.vehicle_registration existing
where existing.id = resolved.registration_id
)
""",
eventSourceId,
tenantKey,
tenantKey
);
@ -402,26 +451,49 @@ public class VehicleIdentityRepository {
nullif(substring(source_external_key from position(':' in source_external_key) + 1), '')
),
updated_at desc
),
canonical_registrations as (
select distinct on (master.nation, master.registration_number)
master.nation,
master.registration_number,
master.source_updated_at
from master_registrations master
where master.nation is not null
and master.registration_number is not null
order by master.nation,
master.registration_number,
case when master.source_registration_entity_id is null then 1 else 0 end,
master.source_updated_at desc,
master.source_registration_entity_id
),
existing_registrations as (
select distinct on (existing.nation, existing.registration_number)
existing.id,
existing.nation,
existing.registration_number
from eventhub.vehicle_registration existing
order by existing.nation,
existing.registration_number,
existing.updated_at desc,
existing.created_at desc,
existing.id
),
resolved_registrations as (
select canonical.source_updated_at,
existing.id as registration_id
from canonical_registrations canonical
join existing_registrations existing
on existing.nation = canonical.nation
and existing.registration_number = canonical.registration_number
)
update eventhub.vehicle_registration registration
set source_updated_at = coalesce(master.source_updated_at, registration.source_updated_at),
set source_updated_at = coalesce(resolved.source_updated_at, registration.source_updated_at),
payload = registration.payload || jsonb_build_object('source', 'master-data'),
updated_at = now()
from master_registrations master
left join eventhub.source_vehicle_registration_identity identity
on identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_registration_entity_id = master.source_registration_entity_id
where registration.id = coalesce(identity.vehicle_registration_id, (
select existing.id
from eventhub.vehicle_registration existing
where existing.nation = master.nation
and existing.registration_number = master.registration_number
limit 1
))
from resolved_registrations resolved
where registration.id = resolved.registration_id
""",
eventSourceId,
tenantKey,
tenantKey
);
@ -466,17 +538,34 @@ public class VehicleIdentityRepository {
),
updated_at desc
),
existing_registrations as (
select distinct on (existing.nation, existing.registration_number)
existing.id,
existing.nation,
existing.registration_number
from eventhub.vehicle_registration existing
order by existing.nation,
existing.registration_number,
existing.updated_at desc,
existing.created_at desc,
existing.id
),
resolved_registrations as (
select master.event_source_id,
master.source_registration_entity_id,
master.source_updated_at,
coalesce(identity.vehicle_registration_id, existing.id) as registration_id
from master_registrations master
left join eventhub.source_vehicle_registration_identity identity
on identity.tenant_key = ?
left join lateral (
select identity.vehicle_registration_id
from eventhub.source_vehicle_registration_identity identity
where identity.tenant_key = ?
and identity.event_source_id in (select id from compatible_sources)
and identity.source_registration_entity_id = master.source_registration_entity_id
left join eventhub.vehicle_registration existing
order by identity.updated_at desc, identity.id desc
limit 1
) identity on true
left join existing_registrations existing
on existing.nation = master.nation
and existing.registration_number = master.registration_number
)
@ -530,6 +619,13 @@ public class VehicleIdentityRepository {
String nation,
String registrationNumber
) {
if (isYellowFoxSyntheticRegistrationNation(nation) && registrationNumber != null) {
UUID registrationId = findRegistrationByNumber(registrationNumber);
if (registrationId != null) {
return registrationId;
}
return findRegistrationBySourceRegistrationEntityId(tenantKey, eventSourceId, sourceRegistrationEntityId);
}
UUID registrationId = findRegistrationBySourceRegistrationEntityId(tenantKey, eventSourceId, sourceRegistrationEntityId);
if (registrationId == null) {
registrationId = findRegistrationByPlate(nation, registrationNumber);
@ -619,6 +715,28 @@ public class VehicleIdentityRepository {
);
}
private UUID findRegistrationByNumber(String registrationNumber) {
if (registrationNumber == null) {
return null;
}
return jdbcTemplate.query(
"""
select r.id
from eventhub.vehicle_registration r
where r.registration_number = ?
order by
case when r.nation = ? then 0 else 1 end,
r.updated_at desc,
r.created_at desc,
r.id
limit 1
""",
rs -> rs.next() ? (UUID) rs.getObject("id") : null,
registrationNumber,
UNKNOWN_REGISTRATION_NATION
);
}
private AssignedVehicleReference resolveAssignedVehicleReference(
String tenantKey,
int eventSourceId,
@ -894,6 +1012,19 @@ public class VehicleIdentityRepository {
return trimmed.isEmpty() ? null : trimmed;
}
private boolean isYellowFoxSyntheticRegistrationNation(String nation) {
return YELLOWFOX_SYNTHETIC_REFERENCE_NATION.equalsIgnoreCase(nation);
}
private String creationNation(String registrationNation, String registrationNumber) {
if (registrationNumber == null) {
return registrationNation;
}
return isYellowFoxSyntheticRegistrationNation(registrationNation)
? UNKNOWN_REGISTRATION_NATION
: registrationNation;
}
public record ResolvedVehicleReference(UUID vehicleId, UUID vehicleRegistrationId) {
public static ResolvedVehicleReference empty() {
return new ResolvedVehicleReference(null, null);

View File

@ -1,5 +1,6 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult;
import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert;
import at.procon.eventhub.importing.masterdata.SourceMasterRelationUpsert;
@ -56,6 +57,7 @@ public class TachographMasterDataRefreshService {
private final DriverIdentityRepository driverIdentityRepository;
private final VehicleIdentityRepository vehicleIdentityRepository;
private final ResourceLoader resourceLoader;
private final EventHubProperties properties;
public TachographMasterDataRefreshService(
@Qualifier("tachographNamedParameterJdbcTemplate") ObjectProvider<NamedParameterJdbcTemplate> tachographJdbcTemplateProvider,
@ -63,7 +65,8 @@ public class TachographMasterDataRefreshService {
EventSourceRepository eventSourceRepository,
DriverIdentityRepository driverIdentityRepository,
VehicleIdentityRepository vehicleIdentityRepository,
ResourceLoader resourceLoader
ResourceLoader resourceLoader,
EventHubProperties properties
) {
this.tachographJdbcTemplateProvider = tachographJdbcTemplateProvider;
this.sourceMasterDataRepository = sourceMasterDataRepository;
@ -71,6 +74,7 @@ public class TachographMasterDataRefreshService {
this.driverIdentityRepository = driverIdentityRepository;
this.vehicleIdentityRepository = vehicleIdentityRepository;
this.resourceLoader = resourceLoader;
this.properties = properties;
}
public MasterDataRefreshResult refreshIfRequested(TachographImportRequest request) {
@ -101,14 +105,19 @@ public class TachographMasterDataRefreshService {
}
int relationCount = streamRelations(tachographJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE));
boolean syncVehicleRegistrations = properties.getTachograph().isSyncVehicleRegistrationsOnMasterDataUpdate();
log.info("Reconciling tachograph driver identities from source master data tenant={} source={}",
tenantKey, masterDataSource.stableKey());
int reconciledDrivers = driverIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId);
log.info("Reconciling tachograph vehicle identities from source master data tenant={} source={}",
tenantKey, masterDataSource.stableKey());
int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId);
log.info("Reconciling tachograph vehicle identities from source master data tenant={} source={} syncVehicleRegistrations={}",
tenantKey, masterDataSource.stableKey(), syncVehicleRegistrations);
int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(
tenantKey,
eventSourceId,
syncVehicleRegistrations
);
MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount);
log.info("Refreshed tachograph source master data tenant={} source={} entities={} relations={} reconciledDrivers={} reconciledVehicles={}",

View File

@ -15,6 +15,7 @@ public record YellowFoxD8BookingDto(
String eventId,
String key,
Integer ignition,
Integer previousIgnition,
Integer eventType,
Integer state,
OffsetDateTime occurredAt,

View File

@ -83,15 +83,15 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk);
ImportCursorStateDto cursor = findCursor(eventSourceId, request, planItem);
Map<String, Object> params = parameters(request, chunkScope, cursor);
QuerySpec query = buildQuerySpec(request, chunkScope, cursor);
Stats stats = new Stats();
YellowFoxD8IgnitionTransitionDetector.Session ignitionSession = ignitionTransitionDetector
.newSession(properties.getYellowFox().isEmitInitialIgnitionSnapshot());
log.info("Reading YellowFox D8 bookings tenant={} importRunId={} packageId={} chunk={} occurredFrom={} occurredTo={} fleetId={} strategy={}",
request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), params.get("fleetId"), request.acquisitionStrategy());
request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), query.fleetId(), request.acquisitionStrategy());
jdbcTemplate.query(loadSql(), params, rs -> {
jdbcTemplate.query(query.sql(), query.params(), rs -> {
stats.sourceRowsRead++;
YellowFoxD8BookingDto booking = rowMapper.map(
rs,
@ -136,9 +136,33 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
);
}
private ImportCursorStateDto findCursor(int eventSourceId, YellowFoxD8ImportRequest request, ImportPlanItemDto planItem) {
QuerySpec buildQuerySpec(YellowFoxD8ImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) {
Map<String, Object> params = new HashMap<>();
StringBuilder filters = new StringBuilder("where 1 = 1");
if (scope != null && scope.occurredFrom() != null) {
params.put("occurredFrom", scope.occurredFrom());
filters.append("\n and b.utc >= :occurredFrom");
}
if (scope != null && scope.occurredTo() != null) {
params.put("occurredTo", scope.occurredTo());
filters.append("\n and b.utc < :occurredTo");
}
Integer fleetId = fleetId(request);
if (fleetId != null) {
params.put("fleetId", fleetId);
filters.append("\n and f.id = :fleetId");
}
appendCursorFilter(filters, params, request, cursor);
return new QuerySpec(applyFilters(loadSqlTemplate(), filters.toString()), params, fleetId);
}
ImportCursorStateDto findCursor(int eventSourceId, YellowFoxD8ImportRequest request, ImportPlanItemDto planItem) {
String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey();
return importCursorRepository.findCursor(
ImportCursorStateDto cursor = importCursorRepository.findCursor(
request.tenantKey(),
eventSourceId,
scopeHash,
@ -146,28 +170,16 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
planItem.sourceKind(),
request.acquisitionStrategy()
);
if (cursor != null || !shouldBootstrapWatermarkCursor(request)) {
return cursor;
}
private Map<String, Object> parameters(YellowFoxD8ImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) {
Map<String, Object> params = new HashMap<>();
params.put("occurredFrom", scope == null ? null : scope.occurredFrom());
params.put("occurredTo", scope == null ? null : scope.occurredTo());
params.put("fleetId", fleetId(request));
if (request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_ROW_WATERMARK && cursor != null) {
OffsetDateTime lastOccurredTo = cursor.lastOccurredTo();
String lastSourceRowId = cursor.lastSourcePackageId();
if (lastOccurredTo != null && properties.getYellowFox().getOccurredAtOverlap() != null
&& !properties.getYellowFox().getOccurredAtOverlap().isZero()) {
lastOccurredTo = lastOccurredTo.minus(properties.getYellowFox().getOccurredAtOverlap());
lastSourceRowId = null;
}
params.put("lastOccurredTo", lastOccurredTo);
params.put("lastSourceRowId", lastSourceRowId);
} else {
params.put("lastOccurredTo", null);
params.put("lastSourceRowId", null);
}
return params;
return importCursorRepository.findLatestCursor(
request.tenantKey(),
eventSourceId,
scopeHash,
planItem.eventFamily(),
planItem.sourceKind()
);
}
private Integer fleetId(YellowFoxD8ImportRequest request) {
@ -204,7 +216,59 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
stats.acceptEvent(event);
}
private String loadSql() {
private void appendCursorFilter(
StringBuilder filters,
Map<String, Object> params,
YellowFoxD8ImportRequest request,
ImportCursorStateDto cursor
) {
if (request.acquisitionStrategy() != AcquisitionStrategy.SOURCE_ROW_WATERMARK || cursor == null) {
return;
}
OffsetDateTime lastOccurredTo = cursor.lastOccurredTo();
String lastSourceRowId = cursor.lastSourcePackageId();
if (lastOccurredTo == null) {
return;
}
if (properties.getYellowFox().getOccurredAtOverlap() != null
&& !properties.getYellowFox().getOccurredAtOverlap().isZero()) {
lastOccurredTo = lastOccurredTo.minus(properties.getYellowFox().getOccurredAtOverlap());
lastSourceRowId = null;
}
params.put("lastOccurredTo", lastOccurredTo);
if (lastSourceRowId == null || lastSourceRowId.isBlank()) {
filters.append("\n and b.utc >= :lastOccurredTo");
return;
}
params.put("lastSourceRowId", lastSourceRowId);
filters.append("""
and (
b.utc > :lastOccurredTo
or (
b.utc = :lastOccurredTo
and b.eventid > :lastSourceRowId
)
)""");
}
private boolean shouldBootstrapWatermarkCursor(YellowFoxD8ImportRequest request) {
return request.mode() == at.procon.eventhub.dto.ImportMode.INCREMENTAL_UPDATE
&& (request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_ROW_WATERMARK
|| request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK);
}
private String applyFilters(String sqlTemplate, String filters) {
if (!sqlTemplate.contains("/*__FILTERS__*/")) {
throw new IllegalStateException("YellowFox D8 extraction SQL template is missing filter marker");
}
return sqlTemplate.replace("/*__FILTERS__*/", filters);
}
private String loadSqlTemplate() {
Resource resource = resourceLoader.getResource("classpath:sql/yellowfox/d8-bookings.sql");
try (var in = resource.getInputStream()) {
return StreamUtils.copyToString(in, StandardCharsets.UTF_8);
@ -213,6 +277,9 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
}
}
static record QuerySpec(String sql, Map<String, Object> params, Integer fleetId) {
}
private static class Stats {
private int sourceRowsRead;
private int eventsSent;

View File

@ -28,29 +28,29 @@ public class YellowFoxD8BookingRowMapper {
}
public YellowFoxD8BookingDto map(ResultSet rs, String tenantKey, String sourceInstanceKey, String tenantProviderSettingKey) throws SQLException {
String eventId = rs.getString("eventid");
String eventId = string(rs, "eventid");
OffsetDateTime occurredAt = rs.getObject("utc", OffsetDateTime.class);
Integer vehicleId = getInteger(rs, "vehicle_id");
Integer driverId = getInteger(rs, "driver_id");
String vehicleVrn = rs.getString("vehicle_vrn");
String vehicleVin = rs.getString("vehicle_vin");
String driverCard = rs.getString("driver_card_number");
String vehicleVrn = string(rs, "vehicle_vrn");
String vehicleVin = string(rs, "vehicle_vin");
String driverCard = normalizeBookingDriverCardNumber(string(rs, "driver_card_number"));
Integer fleetId = getInteger(rs, "fleet_id");
String fleetName = rs.getString("fleet_name");
String fleetName = string(rs, "fleet_name");
Integer odometer = getInteger(rs, "odometer");
Map<String, Object> payload = payload(rs.getString("payload"));
Map<String, Object> payload = trimPayloadStrings(payload(rs.getString("payload")));
put(payload, "yellowFoxEventId", eventId);
put(payload, "yellowFoxOdometerRaw", odometer);
put(payload, "vehicleVrn", vehicleVrn);
put(payload, "vehicleVin", vehicleVin);
put(payload, "driverCardNumber", driverCard);
put(payload, "driverFirstName", rs.getString("driver_firstname"));
put(payload, "driverLastName", rs.getString("driver_lastname"));
put(payload, "driverFirstName", string(rs, "driver_firstname"));
put(payload, "driverLastName", string(rs, "driver_lastname"));
put(payload, "fleetId", fleetId);
put(payload, "fleetName", fleetName);
put(payload, "telematicProviderId", getInteger(rs, "telematic_provider_id"));
put(payload, "telematicProviderName", rs.getString("telematic_provider_name"));
put(payload, "telematicProviderName", string(rs, "telematic_provider_name"));
DriverRefDto driverRef = driverId == null && isBlank(driverCard)
? null
@ -74,8 +74,9 @@ public class YellowFoxD8BookingRowMapper {
fleetId == null ? null : fleetId.toString(),
fleetName,
eventId,
rs.getString("key"),
string(rs, "key"),
getInteger(rs, "ignition"),
getInteger(rs, "previous_ignition"),
getInteger(rs, "eventtype"),
getInteger(rs, "state"),
occurredAt,
@ -108,6 +109,35 @@ public class YellowFoxD8BookingRowMapper {
}
}
private String string(ResultSet rs, String column) throws SQLException {
String value = rs.getString(column);
return value == null || value.isBlank() ? null : value.trim();
}
@SuppressWarnings("unchecked")
private Map<String, Object> trimPayloadStrings(Map<String, Object> payload) {
payload.replaceAll((key, value) -> trimPayloadValue(value));
return payload;
}
@SuppressWarnings("unchecked")
private Object trimPayloadValue(Object value) {
if (value instanceof String text) {
return text.trim();
}
if (value instanceof Map<?, ?> nestedMap) {
((Map<Object, Object>) nestedMap).replaceAll((key, nestedValue) -> trimPayloadValue(nestedValue));
return nestedMap;
}
if (value instanceof java.util.List<?> list) {
for (int i = 0; i < list.size(); i++) {
((java.util.List<Object>) list).set(i, trimPayloadValue(list.get(i)));
}
return list;
}
return value;
}
private void put(Map<String, Object> target, String key, Object value) {
if (value != null) {
target.put(key, value instanceof BigDecimal bd ? bd.stripTrailingZeros().toPlainString() : value);
@ -117,4 +147,12 @@ public class YellowFoxD8BookingRowMapper {
private boolean isBlank(String value) {
return value == null || value.isBlank();
}
private String normalizeBookingDriverCardNumber(String value) {
if (value == null || value.isBlank()) {
return null;
}
String trimmed = value.trim();
return trimmed.length() <= 14 ? trimmed : trimmed.substring(0, 14);
}
}

View File

@ -35,9 +35,12 @@ public class YellowFoxD8IgnitionTransitionDetector {
}
String vehicleKey = booking.vehicleRef().stableKey();
Integer previous = lastIgnitionByVehicle.put(vehicleKey, booking.ignition());
if (previous == null) {
previous = booking.previousIgnition();
if (previous == null) {
return emitInitialSnapshot ? mapper.mapIgnitionTransition(booking, null) : null;
}
}
if (!previous.equals(booking.ignition())) {
return mapper.mapIgnitionTransition(booking, previous);
}

View File

@ -1,5 +1,6 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.importing.masterdata.MasterDataRefreshResult;
import at.procon.eventhub.importing.masterdata.SourceMasterEntityUpsert;
@ -57,6 +58,7 @@ public class YellowFoxMasterDataRefreshService {
private final DriverIdentityRepository driverIdentityRepository;
private final VehicleIdentityRepository vehicleIdentityRepository;
private final ResourceLoader resourceLoader;
private final EventHubProperties properties;
public YellowFoxMasterDataRefreshService(
@Qualifier("yellowFoxNamedParameterJdbcTemplate") ObjectProvider<NamedParameterJdbcTemplate> yellowFoxJdbcTemplateProvider,
@ -64,7 +66,8 @@ public class YellowFoxMasterDataRefreshService {
EventSourceRepository eventSourceRepository,
DriverIdentityRepository driverIdentityRepository,
VehicleIdentityRepository vehicleIdentityRepository,
ResourceLoader resourceLoader
ResourceLoader resourceLoader,
EventHubProperties properties
) {
this.yellowFoxJdbcTemplateProvider = yellowFoxJdbcTemplateProvider;
this.sourceMasterDataRepository = sourceMasterDataRepository;
@ -72,6 +75,7 @@ public class YellowFoxMasterDataRefreshService {
this.driverIdentityRepository = driverIdentityRepository;
this.vehicleIdentityRepository = vehicleIdentityRepository;
this.resourceLoader = resourceLoader;
this.properties = properties;
}
public MasterDataRefreshResult refreshIfRequested(YellowFoxD8ImportRequest request) {
@ -102,14 +106,19 @@ public class YellowFoxMasterDataRefreshService {
}
int relationCount = streamRelations(yellowFoxJdbcTemplate, tenantKey, eventSourceId, RELATIONS_SQL_RESOURCE, loadSql(RELATIONS_SQL_RESOURCE));
boolean syncVehicleRegistrations = properties.getYellowFox().isSyncVehicleRegistrationsOnMasterDataUpdate();
log.info("Reconciling YellowFox driver identities from source master data tenant={} source={}",
tenantKey, masterDataSource.stableKey());
int reconciledDrivers = driverIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId);
log.info("Reconciling YellowFox vehicle identities from source master data tenant={} source={}",
tenantKey, masterDataSource.stableKey());
int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(tenantKey, eventSourceId);
log.info("Reconciling YellowFox vehicle identities from source master data tenant={} source={} syncVehicleRegistrations={}",
tenantKey, masterDataSource.stableKey(), syncVehicleRegistrations);
int reconciledVehicles = vehicleIdentityRepository.reconcileFromMasterData(
tenantKey,
eventSourceId,
syncVehicleRegistrations
);
MasterDataRefreshResult result = new MasterDataRefreshResult(entities, relationCount);
log.info("Refreshed YellowFox source master data tenant={} source={} entities={} relations={} reconciledDrivers={} reconciledVehicles={}",

View File

@ -53,6 +53,7 @@ eventhub:
tachograph:
default-chunk-days: 1
occurred-at-overlap: 7d
sync-vehicle-registrations-on-master-data-update: true
# Set TACHOGRAPH_DB_JDBC_URL to enable JdbcTachographExtractionBatchExecutor.
datasource:
@ -63,7 +64,7 @@ eventhub:
# Enables the scheduler that regularly triggers configured tachograph import plans.
# Default is safe: no scheduled import starts unless explicitly enabled.
scheduler-enabled: true
scheduler-enabled: false
scheduler-poll-interval-ms: 3600000
# PLAN_ONLY creates import_run + planned extraction packages.
@ -78,7 +79,7 @@ eventhub:
# Example plan. Keep disabled until the tachograph datasource/extractor is wired.
import-plans:
- plan-key: tachograph-org-14708
enabled: true
enabled: false
cron: "0 15 * * * *" # hourly at minute 15
tenant-key: Procon
event-source:
@ -115,10 +116,10 @@ eventhub:
scheduled-mode: INCREMENTAL_UPDATE
initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP
scheduled-strategy: SOURCE_PACKAGE_WATERMARK
refresh-master-data-first: true
refresh-master-data-first: false
initial-occurred-from: "2026-04-01T00:00:00+01:00"
initial-occurred-to: "2026-04-10T00:00:00+01:00"
run-initial-on-startup: true
run-initial-on-startup: false
esper-poc:
activity-merge-mode: JAVA
@ -135,6 +136,7 @@ eventhub:
default-chunk-days: 1
occurred-at-overlap: 2h
emit-initial-ignition-snapshot: false
sync-vehicle-registrations-on-master-data-update: false
datasource:
jdbc-url: ${YELLOWFOX_DB_JDBC_URL:}
@ -142,13 +144,13 @@ eventhub:
password: ${YELLOWFOX_DB_PASSWORD:}
driver-class-name: org.postgresql.Driver
scheduler-enabled: false
scheduler-enabled: true
scheduler-poll-interval-ms: 60000
scheduler-trigger-mode: PLAN_ONLY
scheduler-trigger-mode: EXECUTE
import-plans:
- plan-key: yellowfox-d8-default
enabled: false
enabled: true
cron: "0 */5 * * * *"
tenant-key: Procon
event-source:
@ -157,17 +159,20 @@ eventhub:
source-key: YELLOWFOX_D8
source-instance-key: logistics-db-prod
tenant-provider-setting-key: yellowfox-main
source-group:
type: FLEET
source-entity-id: "7"
import-scope:
type: TENANT_ALL
include-children: false
event-families:
- DRIVER_ACTIVITY
#- DRIVER_CARD
- DRIVER_CARD
initial-mode: INITIAL_BACKFILL
scheduled-mode: INCREMENTAL_UPDATE
scheduled-mode: INITIAL_BACKFILL # INITIAL_BACKFILL, INCREMENTAL_UPDATE
initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP
scheduled-strategy: SOURCE_ROW_WATERMARK
initial-occurred-from: "2026-04-01T00:00:00+01:00"
initial-occurred-to: "2026-04-02T00:00:00+01:00"
refresh-master-data-first: false
run-initial-on-startup: false
initial-occurred-to: "2026-04-10T00:00:00+01:00"
refresh-master-data-first: true
run-initial-on-startup: true

View File

@ -1,3 +1,12 @@
with bookings as (
select
b.*,
lag(b.ignition) over (
partition by b.vehicle_id
order by b.utc asc, b.eventid asc
) as previous_ignition
from data.d8_booking b
)
select
b.eventid,
b.utc,
@ -5,6 +14,7 @@ select
b.driver_id,
b.key,
b.ignition,
b.previous_ignition,
b.eventtype,
b.state,
b.odometer,
@ -18,7 +28,7 @@ select
d.firstname as driver_firstname,
d.name as driver_lastname,
d.drivers_card as driver_card_number,
left(trim(d.drivers_card), 14) as driver_card_number,
d.fleet_id as driver_fleet_id,
f.id as fleet_id,
@ -26,7 +36,7 @@ select
tp.id as telematic_provider_id,
tp.name as telematic_provider_name
from data.d8_booking b
from bookings b
left join data.vehicle v
on v.id = b.vehicle_id
left join data.driver d
@ -35,15 +45,5 @@ left join data.fleet f
on f.id = coalesce(v.fleet_id, d.fleet_id)
left join data.telematic_provider tp
on tp.id = v.telematic_provider_id
where (:occurredFrom is null or b.utc >= :occurredFrom)
and (:occurredTo is null or b.utc < :occurredTo)
and (:fleetId is null or f.id = :fleetId)
and (
:lastOccurredTo is null
or b.utc > :lastOccurredTo
or (
b.utc = :lastOccurredTo
and (:lastSourceRowId is null or b.eventid > :lastSourceRowId)
)
)
/*__FILTERS__*/
order by b.utc asc, b.eventid asc;

View File

@ -1,15 +1,15 @@
select
'DRIVER_CARD' as entity_type,
cast(d.id as varchar(128)) as source_entity_id,
concat('YELLOWFOX:', d.drivers_card) as source_external_key,
d.drivers_card as display_name,
concat('YELLOWFOX:', left(trim(d.drivers_card), 14)) as source_external_key,
left(trim(d.drivers_card), 14) as display_name,
true as active,
null::timestamptz as valid_from,
null::timestamptz as valid_to,
null::timestamptz as source_updated_at,
d.id as driver_id,
d.drivers_card as card_number,
'YELLOWFOX' as card_nation,
left(trim(d.drivers_card), 14) as card_number,
'UNKNOWN' as card_nation,
d.fleet_id as fleet_id
from data.driver d
where nullif(trim(d.drivers_card), '') is not null;

View File

@ -1,7 +1,7 @@
select
'DRIVER' as entity_type,
cast(d.id as varchar(128)) as source_entity_id,
coalesce(nullif(trim(d.drivers_card), ''), cast(d.id as varchar(128))) as source_external_key,
coalesce(nullif(left(trim(d.drivers_card), 14), ''), cast(d.id as varchar(128))) as source_external_key,
nullif(trim(concat_ws(' ', d.firstname, d.name)), '') as display_name,
true as active,
null::timestamptz as valid_from,

View File

@ -43,6 +43,7 @@ select
cast(v.id as varchar(128)) as source_row_id
from data.vehicle v
where v.fleet_id is not null
and nullif(trim(v.vin), '') is not null
union all
@ -59,6 +60,7 @@ select
cast(v.id as varchar(128)) as source_row_id
from data.vehicle v
where nullif(trim(v.vrn), '') is not null
and nullif(trim(v.vin), '') is not null
union all
@ -74,4 +76,5 @@ select
'data.vehicle' as source_table,
cast(v.id as varchar(128)) as source_row_id
from data.vehicle v
where v.telematic_provider_id is not null;
where v.telematic_provider_id is not null
and nullif(trim(v.vin), '') is not null;

View File

@ -1,16 +1,16 @@
select
'VEHICLE_REGISTRATION' as entity_type,
cast(v.id as varchar(128)) as source_entity_id,
concat('YELLOWFOX:', v.vrn) as source_external_key,
v.vrn as display_name,
concat('YELLOWFOX:', trim(v.vrn)) as source_external_key,
trim(v.vrn) as display_name,
true as active,
null::timestamptz as valid_from,
null::timestamptz as valid_to,
null::timestamptz as source_updated_at,
v.id as vehicle_id,
v.vin as vin,
'YELLOWFOX' as registration_nation,
v.vrn as registration_number,
trim(v.vin) as vin,
'UNKNOWN' as registration_nation,
trim(v.vrn) as registration_number,
v.fleet_id as fleet_id
from data.vehicle v
where nullif(trim(v.vrn), '') is not null;

View File

@ -1,16 +1,17 @@
select
'VEHICLE' as entity_type,
cast(v.id as varchar(128)) as source_entity_id,
coalesce(nullif(trim(v.vin), ''), cast(v.id as varchar(128))) as source_external_key,
coalesce(nullif(trim(v.vrn), ''), nullif(trim(v.vin), ''), cast(v.id as varchar(128))) as display_name,
trim(v.vin) as source_external_key,
coalesce(nullif(trim(v.vrn), ''), trim(v.vin)) as display_name,
true as active,
null::timestamptz as valid_from,
null::timestamptz as valid_to,
null::timestamptz as source_updated_at,
v.id as vehicle_id,
v.vin as vin,
v.vrn as registration_number,
'YELLOWFOX' as registration_nation,
trim(v.vin) as vin,
trim(v.vrn) as registration_number,
'UNKNOWN' as registration_nation,
v.fleet_id as fleet_id,
v.telematic_provider_id as telematic_provider_id
from data.vehicle v;
from data.vehicle v
where nullif(trim(v.vin), '') is not null;

View File

@ -83,6 +83,7 @@ class YellowFoxD8BookingEventMapperTest {
"event-1",
"key-1",
ignition,
null,
eventType,
state,
OffsetDateTime.parse("2026-04-29T08:15:00+02:00"),

View File

@ -0,0 +1,42 @@
package at.procon.eventhub.importing;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.time.OffsetDateTime;
import java.util.EnumSet;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class ImportChunkPlannerTest {
@Test
void keepsScheduledRowWatermarkImportsAsSingleChunkEvenWhenScopeHasWindow() {
ImportChunkPlanner planner = new ImportChunkPlanner();
YellowFoxD8ImportRequest request = new YellowFoxD8ImportRequest(
"tenant-1",
new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "instance-1", "setting-1", null),
(SourceGroupRefDto) null,
ImportScopeDto.tenantAll(
OffsetDateTime.parse("2026-04-01T00:00:00+02:00"),
OffsetDateTime.parse("2026-04-10T00:00:00+02:00")
),
EnumSet.noneOf(EventFamily.class),
ImportMode.INCREMENTAL_UPDATE,
false,
AcquisitionStrategy.SOURCE_ROW_WATERMARK
);
assertThat(planner.chunksFor(request, 1))
.containsExactly(new ImportTimeChunkDto(
1,
OffsetDateTime.parse("2026-04-01T00:00:00+02:00"),
OffsetDateTime.parse("2026-04-10T00:00:00+02:00")
));
}
}

View File

@ -0,0 +1,187 @@
package at.procon.eventhub.importing.extraction;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.importing.ExtractionBatchResult;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportRunRequest;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class AbstractJdbcExtractionBatchExecutorCursorTest {
@Test
void bootstrapsWatermarkCursorFromLatestExistingCursorWhenExactCursorIsMissing() {
ImportCursorRepository repository = mock(ImportCursorRepository.class);
TestExecutor executor = new TestExecutor(repository);
TestRequest request = new TestRequest();
ImportPlanItemDto planItem = new ImportPlanItemDto(
EventFamily.DRIVER_ACTIVITY,
"VEHICLE_UNIT",
"VU_ACTIVITY",
List.of("VUActivity"),
"VEHICLE",
"Vehicle activity",
AcquisitionStrategy.SOURCE_ROW_WATERMARK
);
ImportCursorStateDto fallbackCursor = new ImportCursorStateDto(
null,
"9001",
null,
OffsetDateTime.parse("2026-04-09T23:59:59+02:00")
);
when(repository.findCursor(
"tenant-1",
21,
request.importScope().stableKey(),
EventFamily.DRIVER_ACTIVITY,
"VEHICLE_UNIT",
AcquisitionStrategy.SOURCE_ROW_WATERMARK
)).thenReturn(null);
when(repository.findLatestCursor(
"tenant-1",
21,
request.importScope().stableKey(),
EventFamily.DRIVER_ACTIVITY,
"VEHICLE_UNIT"
)).thenReturn(fallbackCursor);
assertThat(executor.resolveCursor(21, request, planItem)).isEqualTo(fallbackCursor);
verify(repository).findLatestCursor(
"tenant-1",
21,
request.importScope().stableKey(),
EventFamily.DRIVER_ACTIVITY,
"VEHICLE_UNIT"
);
}
private static final class TestExecutor extends AbstractJdbcExtractionBatchExecutor<TestRequest, TestResult> {
private TestExecutor(ImportCursorRepository repository) {
super(null, null, null, new EventHubProperties(), null, repository);
}
private ImportCursorStateDto resolveCursor(int eventSourceId, TestRequest request, ImportPlanItemDto planItem) {
return findCursor(eventSourceId, request, planItem);
}
@Override
protected Optional<ExtractionDefinition<TestRequest>> findDefinition(String code) {
return Optional.empty();
}
@Override
protected EventSourceDto eventSourceFor(TestRequest request, ImportPlanItemDto planItem) {
return request.eventSource();
}
@Override
protected TestResult resultFor(UUID packageId, ImportPlanItemDto planItem, ImportTimeChunkDto chunk, ImportCursorStateDto cursor, ExtractedEventStats stats) {
return new TestResult();
}
}
private record TestRequest() implements ImportRunRequest {
@Override
public String tenantKey() {
return "tenant-1";
}
@Override
public EventSourceDto eventSource() {
return new EventSourceDto("TACHOGRAPH", "VEHICLE_UNIT", "TACHOGRAPH_VEHICLE_UNIT", "instance-1", "setting-1", null);
}
@Override
public at.procon.eventhub.dto.SourceGroupRefDto sourceGroup() {
return null;
}
@Override
public ImportScopeDto importScope() {
return ImportScopeDto.tenantAll(
OffsetDateTime.parse("2026-04-01T00:00:00+02:00"),
OffsetDateTime.parse("2026-04-10T00:00:00+02:00")
);
}
@Override
public Set<EventFamily> eventFamilies() {
return Set.of(EventFamily.DRIVER_ACTIVITY);
}
@Override
public ImportMode mode() {
return ImportMode.INCREMENTAL_UPDATE;
}
@Override
public boolean refreshMasterDataFirst() {
return false;
}
@Override
public AcquisitionStrategy acquisitionStrategy() {
return AcquisitionStrategy.SOURCE_ROW_WATERMARK;
}
}
private static final class TestResult implements ExtractionBatchResult {
@Override
public int eventsInserted() {
return 0;
}
@Override
public boolean executed() {
return false;
}
@Override
public OffsetDateTime lastSourcePackageImportedAt() {
return null;
}
@Override
public String lastSourcePackageId() {
return null;
}
@Override
public OffsetDateTime lastSourceRowUpdatedAt() {
return null;
}
@Override
public OffsetDateTime lastOccurredTo() {
return null;
}
@Override
public Map<String, Integer> eventTypeCounts() {
return Map.of();
}
}
}

View File

@ -0,0 +1,105 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.dto.SourceGroupType;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.time.OffsetDateTime;
import java.util.EnumSet;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.DefaultResourceLoader;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest {
@Test
void bootstrapsScheduledWatermarkCursorFromLatestExistingCursorWhenExactCursorIsMissing() {
ImportCursorRepository repository = mock(ImportCursorRepository.class);
JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(repository);
YellowFoxD8ImportRequest request = request();
ImportPlanItemDto planItem = new ImportPlanItemDto(
EventFamily.DRIVER_ACTIVITY,
"TELEMATICS_PLATFORM",
"YELLOWFOX_D8_BOOKING",
List.of("data.d8_booking"),
"BOTH",
"YellowFox bookings",
AcquisitionStrategy.SOURCE_ROW_WATERMARK
);
ImportCursorStateDto fallbackCursor = new ImportCursorStateDto(
null,
"4711",
null,
OffsetDateTime.parse("2026-04-09T23:59:59+02:00")
);
when(repository.findCursor(
"tenant-1",
17,
request.importScope().stableKey(),
EventFamily.DRIVER_ACTIVITY,
"TELEMATICS_PLATFORM",
AcquisitionStrategy.SOURCE_ROW_WATERMARK
)).thenReturn(null);
when(repository.findLatestCursor(
"tenant-1",
17,
request.importScope().stableKey(),
EventFamily.DRIVER_ACTIVITY,
"TELEMATICS_PLATFORM"
)).thenReturn(fallbackCursor);
assertThat(executor.findCursor(17, request, planItem)).isEqualTo(fallbackCursor);
verify(repository).findLatestCursor(
"tenant-1",
17,
request.importScope().stableKey(),
EventFamily.DRIVER_ACTIVITY,
"TELEMATICS_PLATFORM"
);
}
private JdbcYellowFoxD8BookingExtractionBatchExecutor executor(ImportCursorRepository repository) {
EventHubProperties properties = new EventHubProperties();
return new JdbcYellowFoxD8BookingExtractionBatchExecutor(
null,
null,
new DefaultResourceLoader(),
repository,
properties,
null,
null,
null
);
}
private YellowFoxD8ImportRequest request() {
return new YellowFoxD8ImportRequest(
"tenant-1",
new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "instance-1", "setting-1", null),
new SourceGroupRefDto(SourceGroupType.FLEET, "7", null, "Fleet 7"),
ImportScopeDto.tenantAll(
OffsetDateTime.parse("2026-04-01T00:00:00+02:00"),
OffsetDateTime.parse("2026-04-10T00:00:00+02:00")
),
EnumSet.of(EventFamily.DRIVER_ACTIVITY),
ImportMode.INCREMENTAL_UPDATE,
false,
AcquisitionStrategy.SOURCE_ROW_WATERMARK
);
}
}

View File

@ -0,0 +1,110 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.dto.SourceGroupType;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.EnumSet;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.DefaultResourceLoader;
import static org.assertj.core.api.Assertions.assertThat;
class JdbcYellowFoxD8BookingExtractionBatchExecutorTest {
@Test
void omitsCursorParametersWhenNoCursorExists() {
JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(Duration.ofHours(2));
JdbcYellowFoxD8BookingExtractionBatchExecutor.QuerySpec query = executor.buildQuerySpec(
request(AcquisitionStrategy.SOURCE_ROW_WATERMARK),
ImportScopeDto.tenantAll(
OffsetDateTime.parse("2026-04-01T00:00:00+02:00"),
OffsetDateTime.parse("2026-04-02T00:00:00+02:00")
),
null
);
assertThat(query.sql()).contains("b.utc >= :occurredFrom");
assertThat(query.sql()).contains("b.utc < :occurredTo");
assertThat(query.sql()).contains("f.id = :fleetId");
assertThat(query.sql()).doesNotContain(":lastOccurredTo");
assertThat(query.sql()).doesNotContain(" is null");
assertThat(query.params()).containsOnlyKeys("occurredFrom", "occurredTo", "fleetId");
assertThat(query.fleetId()).isEqualTo(7);
}
@Test
void keepsStrictUtcEventIdCursorWhenOverlapIsDisabled() {
JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(Duration.ZERO);
OffsetDateTime cursorTime = OffsetDateTime.parse("2026-04-02T09:15:00+02:00");
JdbcYellowFoxD8BookingExtractionBatchExecutor.QuerySpec query = executor.buildQuerySpec(
request(AcquisitionStrategy.SOURCE_ROW_WATERMARK),
ImportScopeDto.tenantAll(null, null),
new ImportCursorStateDto(null, "4711", null, cursorTime)
);
assertThat(query.sql()).contains("b.utc > :lastOccurredTo");
assertThat(query.sql()).contains("b.utc = :lastOccurredTo");
assertThat(query.sql()).contains("b.eventid > :lastSourceRowId");
assertThat(query.sql()).doesNotContain(" is null");
assertThat(query.params()).containsEntry("lastOccurredTo", cursorTime);
assertThat(query.params()).containsEntry("lastSourceRowId", "4711");
}
@Test
void keepsOccurredToCapOnceCursorExistsForWatermarkIncrementalRuns() {
JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(Duration.ZERO);
OffsetDateTime cursorTime = OffsetDateTime.parse("2026-04-02T09:15:00+02:00");
JdbcYellowFoxD8BookingExtractionBatchExecutor.QuerySpec query = executor.buildQuerySpec(
request(AcquisitionStrategy.SOURCE_ROW_WATERMARK),
ImportScopeDto.tenantAll(
OffsetDateTime.parse("2026-04-01T00:00:00+02:00"),
OffsetDateTime.parse("2026-04-02T00:00:00+02:00")
),
new ImportCursorStateDto(null, "4711", null, cursorTime)
);
assertThat(query.sql()).contains("b.utc >= :occurredFrom");
assertThat(query.sql()).contains("b.utc < :occurredTo");
assertThat(query.params()).containsEntry("occurredFrom", OffsetDateTime.parse("2026-04-01T00:00:00+02:00"));
assertThat(query.params()).containsEntry("occurredTo", OffsetDateTime.parse("2026-04-02T00:00:00+02:00"));
}
private JdbcYellowFoxD8BookingExtractionBatchExecutor executor(Duration overlap) {
EventHubProperties properties = new EventHubProperties();
properties.getYellowFox().setOccurredAtOverlap(overlap);
return new JdbcYellowFoxD8BookingExtractionBatchExecutor(
null,
null,
new DefaultResourceLoader(),
null,
properties,
null,
null,
null
);
}
private YellowFoxD8ImportRequest request(AcquisitionStrategy strategy) {
return new YellowFoxD8ImportRequest(
"tenant-1",
new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "instance-1", "setting-1", null),
new SourceGroupRefDto(SourceGroupType.FLEET, "7", null, "Fleet 7"),
ImportScopeDto.tenantAll(null, null),
EnumSet.noneOf(at.procon.eventhub.dto.EventFamily.class),
ImportMode.INCREMENTAL_UPDATE,
false,
strategy
);
}
}

View File

@ -0,0 +1,90 @@
package at.procon.eventhub.yellowfox.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class YellowFoxD8BookingRowMapperTest {
@Test
void trimsVehicleRegistrationValuesInReferencesAndPayload() throws SQLException {
YellowFoxD8BookingRowMapper mapper = new YellowFoxD8BookingRowMapper(new ObjectMapper());
ResultSet rs = mock(ResultSet.class);
OffsetDateTime occurredAt = OffsetDateTime.parse("2026-05-08T10:15:30+02:00");
when(rs.getString("eventid")).thenReturn(" evt-1 ");
when(rs.getObject("utc", OffsetDateTime.class)).thenReturn(occurredAt);
when(rs.getInt("vehicle_id")).thenReturn(42);
when(rs.getInt("driver_id")).thenReturn(7);
when(rs.getInt("fleet_id")).thenReturn(9);
when(rs.getInt("odometer")).thenReturn(1234);
when(rs.getInt("telematic_provider_id")).thenReturn(3);
when(rs.getInt("ignition")).thenReturn(1);
when(rs.getInt("previous_ignition")).thenReturn(0);
when(rs.getInt("eventtype")).thenReturn(2);
when(rs.getInt("state")).thenReturn(3);
when(rs.wasNull()).thenReturn(false);
when(rs.getString("vehicle_vrn")).thenReturn(" W-12345 ");
when(rs.getString("vehicle_vin")).thenReturn(" vin-123 ");
when(rs.getString("driver_card_number")).thenReturn(" card-9 ");
when(rs.getString("fleet_name")).thenReturn(" Fleet 9 ");
when(rs.getString("driver_firstname")).thenReturn(" Ada ");
when(rs.getString("driver_lastname")).thenReturn(" Lovelace ");
when(rs.getString("telematic_provider_name")).thenReturn(" YellowFox ");
when(rs.getString("key")).thenReturn(" booking-key ");
when(rs.getString("payload")).thenReturn("""
{"vrn":" W-12345 ","nested":{"label":" value "},"list":[" x ",1]}
""");
var booking = mapper.map(rs, "tenant-1", "instance-1", "setting-1");
assertThat(booking.eventId()).isEqualTo("evt-1");
assertThat(booking.key()).isEqualTo("booking-key");
assertThat(booking.previousIgnition()).isEqualTo(0);
assertThat(booking.vehicleRef().vin()).isEqualTo("VIN-123");
assertThat(booking.vehicleRef().vehicleRegistration().number()).isEqualTo("W-12345");
assertThat(booking.payload()).containsEntry("vrn", "W-12345");
assertThat(booking.payload()).containsEntry("vehicleVrn", "W-12345");
assertThat(booking.payload()).containsEntry("driverFirstName", "Ada");
assertThat(booking.payload()).containsEntry("telematicProviderName", "YellowFox");
assertThat(((Map<?, ?>) booking.payload().get("nested")).get("label")).isEqualTo("value");
assertThat((List<Object>) booking.payload().get("list")).containsExactly("x", 1);
}
@Test
void truncatesBookingDriverCardNumberToFirst14Characters() throws SQLException {
YellowFoxD8BookingRowMapper mapper = new YellowFoxD8BookingRowMapper(new ObjectMapper());
ResultSet rs = mock(ResultSet.class);
OffsetDateTime occurredAt = OffsetDateTime.parse("2026-05-08T10:15:30+02:00");
when(rs.getString("eventid")).thenReturn("evt-2");
when(rs.getObject("utc", OffsetDateTime.class)).thenReturn(occurredAt);
when(rs.getInt("vehicle_id")).thenReturn(0);
when(rs.getInt("driver_id")).thenReturn(0);
when(rs.getInt("fleet_id")).thenReturn(0);
when(rs.getInt("odometer")).thenReturn(0);
when(rs.getInt("telematic_provider_id")).thenReturn(0);
when(rs.getInt("ignition")).thenReturn(1);
when(rs.getInt("previous_ignition")).thenReturn(0);
when(rs.getInt("eventtype")).thenReturn(2);
when(rs.getInt("state")).thenReturn(3);
when(rs.wasNull()).thenReturn(true);
when(rs.getString("driver_card_number")).thenReturn(" 12345678901234AB ");
when(rs.getString("payload")).thenReturn("{}");
var booking = mapper.map(rs, "tenant-1", "instance-1", "setting-1");
assertThat(booking.driverRef()).isNotNull();
assertThat(booking.driverRef().driverCard()).isNotNull();
assertThat(booking.driverRef().driverCard().number()).isEqualTo("12345678901234");
assertThat(booking.payload()).containsEntry("driverCardNumber", "12345678901234");
}
}

View File

@ -0,0 +1,39 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.ImportScopeType;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.time.OffsetDateTime;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class YellowFoxD8ConfiguredImportPlanServiceTest {
@Test
void scheduledWatermarkRequestKeepsInitialOccurredWindowAsBootstrapScope() {
EventHubProperties properties = new EventHubProperties();
EventHubProperties.ConfiguredImportPlan plan = new EventHubProperties.ConfiguredImportPlan();
plan.setPlanKey("yellowfox-d8-default");
plan.setTenantKey("Procon");
plan.setEventSource(new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "logistics-db-prod", "yellowfox-main", null));
plan.setImportScope(new ImportScopeDto(ImportScopeType.TENANT_ALL, null, false, null, null));
plan.setScheduledMode(ImportMode.INCREMENTAL_UPDATE);
plan.setScheduledStrategy(AcquisitionStrategy.SOURCE_ROW_WATERMARK);
plan.setInitialOccurredFrom(OffsetDateTime.parse("2026-04-01T00:00:00+02:00"));
plan.setInitialOccurredTo(OffsetDateTime.parse("2026-04-02T00:00:00+02:00"));
properties.getYellowFox().getImportPlans().add(plan);
YellowFoxD8ConfiguredImportPlanService service = new YellowFoxD8ConfiguredImportPlanService(properties);
YellowFoxD8ImportRequest request = service.createScheduledRequest(plan);
assertThat(request.mode()).isEqualTo(ImportMode.INCREMENTAL_UPDATE);
assertThat(request.acquisitionStrategy()).isEqualTo(AcquisitionStrategy.SOURCE_ROW_WATERMARK);
assertThat(request.importScope().occurredFrom()).isEqualTo(OffsetDateTime.parse("2026-04-01T00:00:00+02:00"));
assertThat(request.importScope().occurredTo()).isEqualTo(OffsetDateTime.parse("2026-04-02T00:00:00+02:00"));
}
}

View File

@ -0,0 +1,65 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.service.EventDetailsFactory;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime;
import java.util.Map;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class YellowFoxD8IgnitionTransitionDetectorTest {
@Test
void emitsBoundaryTransitionFromPreviousIgnitionFromSourceRow() {
YellowFoxD8BookingEventMapper mapper = new YellowFoxD8BookingEventMapper(new EventDetailsFactory(new ObjectMapper()));
YellowFoxD8IgnitionTransitionDetector detector = new YellowFoxD8IgnitionTransitionDetector(mapper);
var event = detector.newSession(false).detect(booking(1, 0));
assertThat(event).isNotNull();
assertThat(event.eventDomain()).isEqualTo(EventDomain.IGNITION);
assertThat(event.eventType()).isEqualTo(EventType.IGNITION_ON);
}
@Test
void doesNotEmitWhenCurrentIgnitionMatchesPreviousIgnitionFromSourceRow() {
YellowFoxD8BookingEventMapper mapper = new YellowFoxD8BookingEventMapper(new EventDetailsFactory(new ObjectMapper()));
YellowFoxD8IgnitionTransitionDetector detector = new YellowFoxD8IgnitionTransitionDetector(mapper);
var event = detector.newSession(false).detect(booking(1, 1));
assertThat(event).isNull();
}
private YellowFoxD8BookingDto booking(Integer ignition, Integer previousIgnition) {
return new YellowFoxD8BookingDto(
"tenant-1",
"instance-1",
"setting-1",
"7",
"Fleet 7",
"evt-1",
"key-1",
ignition,
previousIgnition,
2,
3,
OffsetDateTime.parse("2026-05-08T10:15:30+02:00"),
null,
new DriverRefDto("1", new DriverCardRefDto(YellowFoxReferenceSemantics.SYNTHETIC_REFERENCE_NATION, "12345678901234")),
new VehicleRefDto("42", "VIN-42", "42", new VehicleRegistrationRefDto(YellowFoxReferenceSemantics.SYNTHETIC_REFERENCE_NATION, "W-4242")),
123_000L,
null,
null,
Map.of()
);
}
}