Add runtime EventHub-backed event loading

This commit is contained in:
trifonovt 2026-05-20 13:08:06 +02:00
parent 8f5e51a5c1
commit b35d428e80
14 changed files with 1434 additions and 0 deletions

View File

@ -0,0 +1,70 @@
package at.procon.eventhub.processing.model;
public record UnifiedDiscoveredVehicleRef(
String sourceVehicleEntityId,
String vin,
String registrationNation,
String registrationNumber
) {
public UnifiedDiscoveredVehicleRef {
sourceVehicleEntityId = normalize(sourceVehicleEntityId);
vin = normalizeUpper(vin);
registrationNation = normalizeUpper(registrationNation);
registrationNumber = normalize(registrationNumber);
}
public boolean hasAnyReference() {
return sourceVehicleEntityId != null || vin != null || registrationNumber != null;
}
public boolean matches(UnifiedDiscoveredVehicleRef other) {
if (other == null) {
return false;
}
return same(sourceVehicleEntityId, other.sourceVehicleEntityId)
|| same(vin, other.vin)
|| (same(registrationNumber, other.registrationNumber)
&& same(registrationNation, other.registrationNation));
}
public UnifiedDiscoveredVehicleRef merge(UnifiedDiscoveredVehicleRef other) {
if (other == null) {
return this;
}
return new UnifiedDiscoveredVehicleRef(
firstNonBlank(sourceVehicleEntityId, other.sourceVehicleEntityId),
firstNonBlank(vin, other.vin),
firstNonBlank(registrationNation, other.registrationNation),
firstNonBlank(registrationNumber, other.registrationNumber)
);
}
public String stableKey() {
if (sourceVehicleEntityId != null) {
return "SOURCE_VEHICLE:" + sourceVehicleEntityId;
}
if (vin != null) {
return "VIN:" + vin;
}
return "REG:" + (registrationNation == null ? "" : registrationNation) + ":" + (registrationNumber == null ? "" : registrationNumber);
}
private boolean same(String left, String right) {
return left != null && right != null && left.equals(right);
}
private static String firstNonBlank(String first, String second) {
if (first != null && !first.isBlank()) {
return first;
}
return second != null && !second.isBlank() ? second : null;
}
private static String normalize(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
private static String normalizeUpper(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase();
}
}

View File

@ -123,6 +123,31 @@ public record UnifiedDriverEventsRequest(
); );
} }
public static UnifiedDriverEventsRequest forYellowFoxDbDriver(
String tenantKey,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
return new UnifiedDriverEventsRequest(
UnifiedEventSourceFamily.YELLOWFOX_DB,
null,
null,
tenantKey,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
null,
null,
null,
null,
occurredFrom,
occurredTo
);
}
public boolean hasDriverSelector() { public boolean hasDriverSelector() {
return hasDriverSelector(driverSourceEntityId, driverCardNumber); return hasDriverSelector(driverSourceEntityId, driverCardNumber);
} }

View File

@ -0,0 +1,6 @@
package at.procon.eventhub.processing.model;
public enum UnifiedRuntimeEventBackend {
SOURCE_DB,
EVENTHUB_DB
}

View File

@ -0,0 +1,21 @@
package at.procon.eventhub.processing.model;
import at.procon.eventhub.dto.EventHubEventDto;
import java.util.List;
public record UnifiedRuntimeEventBundle(
UnifiedRuntimeProcessingRequest request,
List<EventHubEventDto> driverSeedEvents,
List<UnifiedDiscoveredVehicleRef> discoveredVehicles,
List<EventHubEventDto> expandedVehicleEvents,
List<EventHubEventDto> mergedEvents,
List<String> notes
) {
public UnifiedRuntimeEventBundle {
driverSeedEvents = driverSeedEvents == null ? List.of() : List.copyOf(driverSeedEvents);
discoveredVehicles = discoveredVehicles == null ? List.of() : List.copyOf(discoveredVehicles);
expandedVehicleEvents = expandedVehicleEvents == null ? List.of() : List.copyOf(expandedVehicleEvents);
mergedEvents = mergedEvents == null ? List.of() : List.copyOf(mergedEvents);
notes = notes == null ? List.of() : List.copyOf(notes);
}
}

View File

@ -0,0 +1,156 @@
package at.procon.eventhub.processing.model;
import java.time.OffsetDateTime;
import java.util.Set;
public record UnifiedRuntimeProcessingRequest(
String tenantKey,
Set<UnifiedEventSourceFamily> sourceFamilies,
UnifiedRuntimeEventBackend eventBackend,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean expandVehicleEvents,
int vehicleExpansionPaddingMinutes
) {
public UnifiedRuntimeProcessingRequest {
tenantKey = normalize(tenantKey);
driverSourceEntityId = normalize(driverSourceEntityId);
driverCardNation = normalizeUpper(driverCardNation);
driverCardNumber = normalize(driverCardNumber);
if (tenantKey == null) {
throw new IllegalArgumentException("tenantKey must not be blank");
}
if (sourceFamilies == null || sourceFamilies.isEmpty()) {
throw new IllegalArgumentException("sourceFamilies must not be empty");
}
sourceFamilies = Set.copyOf(sourceFamilies);
eventBackend = eventBackend == null ? UnifiedRuntimeEventBackend.SOURCE_DB : eventBackend;
if (driverSourceEntityId == null && driverCardNumber == null) {
throw new IllegalArgumentException("At least one driver selector must be provided.");
}
if (occurredFrom != null && occurredTo != null && occurredTo.isBefore(occurredFrom)) {
throw new IllegalArgumentException("occurredTo must not be before occurredFrom");
}
if (vehicleExpansionPaddingMinutes < 0) {
throw new IllegalArgumentException("vehicleExpansionPaddingMinutes must not be negative");
}
}
public static UnifiedRuntimeProcessingRequest forDriver(
String tenantKey,
Set<UnifiedEventSourceFamily> sourceFamilies,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
return forDriver(
tenantKey,
sourceFamilies,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
UnifiedRuntimeEventBackend.SOURCE_DB,
true,
0
);
}
public static UnifiedRuntimeProcessingRequest forDriver(
String tenantKey,
Set<UnifiedEventSourceFamily> sourceFamilies,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean expandVehicleEvents,
int vehicleExpansionPaddingMinutes
) {
return forDriver(
tenantKey,
sourceFamilies,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
UnifiedRuntimeEventBackend.SOURCE_DB,
expandVehicleEvents,
vehicleExpansionPaddingMinutes
);
}
public static UnifiedRuntimeProcessingRequest forDriver(
String tenantKey,
Set<UnifiedEventSourceFamily> sourceFamilies,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
UnifiedRuntimeEventBackend eventBackend,
boolean expandVehicleEvents,
int vehicleExpansionPaddingMinutes
) {
return new UnifiedRuntimeProcessingRequest(
tenantKey,
sourceFamilies,
eventBackend,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes
);
}
public static UnifiedRuntimeProcessingRequest forDriverFromEventHub(
String tenantKey,
Set<UnifiedEventSourceFamily> sourceFamilies,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo,
boolean expandVehicleEvents,
int vehicleExpansionPaddingMinutes
) {
return new UnifiedRuntimeProcessingRequest(
tenantKey,
sourceFamilies,
UnifiedRuntimeEventBackend.EVENTHUB_DB,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
occurredFrom,
occurredTo,
expandVehicleEvents,
vehicleExpansionPaddingMinutes
);
}
public OffsetDateTime vehicleOccurredFrom() {
return occurredFrom == null ? null : occurredFrom.minusMinutes(vehicleExpansionPaddingMinutes);
}
public OffsetDateTime vehicleOccurredTo() {
return occurredTo == null ? null : occurredTo.plusMinutes(vehicleExpansionPaddingMinutes);
}
private static String normalize(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
private static String normalizeUpper(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase();
}
}

View File

@ -0,0 +1,114 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.model.UnifiedVehicleEventsRequest;
import java.util.List;
import org.springframework.stereotype.Component;
@Component
public class EventHubRuntimeEventLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader {
private final UnifiedDriverEventSourceService driverEventSourceService;
private final UnifiedVehicleEventSourceService vehicleEventSourceService;
public EventHubRuntimeEventLoader(
UnifiedDriverEventSourceService driverEventSourceService,
UnifiedVehicleEventSourceService vehicleEventSourceService
) {
this.driverEventSourceService = driverEventSourceService;
this.vehicleEventSourceService = vehicleEventSourceService;
}
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB
&& (sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB
|| sourceFamily == UnifiedEventSourceFamily.YELLOWFOX_DB);
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
throw new UnsupportedOperationException("Source family must be specified when loading EventHub-backed runtime driver events.");
}
@Override
public List<EventHubEventDto> loadVehicleEvents(
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef
) {
throw new UnsupportedOperationException("Source family must be specified when loading EventHub-backed runtime vehicle events.");
}
public List<EventHubEventDto> loadDriverEvents(
UnifiedRuntimeProcessingRequest request,
UnifiedEventSourceFamily sourceFamily
) {
return driverEventSourceService.loadDriverEvents(driverRequest(request, sourceFamily));
}
public List<EventHubEventDto> loadVehicleEvents(
UnifiedRuntimeProcessingRequest request,
UnifiedEventSourceFamily sourceFamily,
UnifiedDiscoveredVehicleRef vehicleRef
) {
return vehicleEventSourceService.loadVehicleEvents(vehicleRequest(request, sourceFamily, vehicleRef));
}
private UnifiedDriverEventsRequest driverRequest(
UnifiedRuntimeProcessingRequest request,
UnifiedEventSourceFamily sourceFamily
) {
return switch (sourceFamily) {
case TACHOGRAPH_DB -> UnifiedDriverEventsRequest.forTachographDbDriver(
request.tenantKey(),
request.driverSourceEntityId(),
request.driverCardNation(),
request.driverCardNumber(),
request.occurredFrom(),
request.occurredTo()
);
case YELLOWFOX_DB -> UnifiedDriverEventsRequest.forYellowFoxDbDriver(
request.tenantKey(),
request.driverSourceEntityId(),
request.driverCardNation(),
request.driverCardNumber(),
request.occurredFrom(),
request.occurredTo()
);
default -> throw new IllegalArgumentException("EventHub runtime backend does not support source family " + sourceFamily + ".");
};
}
private UnifiedVehicleEventsRequest vehicleRequest(
UnifiedRuntimeProcessingRequest request,
UnifiedEventSourceFamily sourceFamily,
UnifiedDiscoveredVehicleRef vehicleRef
) {
return switch (sourceFamily) {
case TACHOGRAPH_DB -> UnifiedVehicleEventsRequest.forTachographDb(
request.tenantKey(),
vehicleRef.sourceVehicleEntityId(),
vehicleRef.vin(),
vehicleRef.registrationNation(),
vehicleRef.registrationNumber(),
request.vehicleOccurredFrom(),
request.vehicleOccurredTo()
);
case YELLOWFOX_DB -> UnifiedVehicleEventsRequest.forYellowFoxDb(
request.tenantKey(),
vehicleRef.sourceVehicleEntityId(),
vehicleRef.vin(),
vehicleRef.registrationNation(),
vehicleRef.registrationNumber(),
request.vehicleOccurredFrom(),
request.vehicleOccurredTo()
);
default -> throw new IllegalArgumentException("EventHub runtime backend does not support source family " + sourceFamily + ".");
};
}
}

View File

@ -0,0 +1,13 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import java.util.List;
public interface RuntimeDriverEventLoader {
boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily);
List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request);
}

View File

@ -0,0 +1,17 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import java.util.List;
public interface RuntimeVehicleEventLoader {
boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily);
List<EventHubEventDto> loadVehicleEvents(
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef
);
}

View File

@ -0,0 +1,254 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.extraction.ExtractionContext;
import at.procon.eventhub.importing.extraction.ExtractionDefinition;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import at.procon.eventhub.tachograph.service.TachographExtractionDefinitionRegistry;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;
@Component
@ConditionalOnBean(name = "tachographNamedParameterJdbcTemplate")
@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.tachograph.datasource.jdbc-url:}')")
public class TachographDbRuntimeEventLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader {
private final NamedParameterJdbcTemplate jdbcTemplate;
private final TachographExtractionDefinitionRegistry definitionRegistry;
private final ResourceLoader resourceLoader;
public TachographDbRuntimeEventLoader(
@Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate,
TachographExtractionDefinitionRegistry definitionRegistry,
ResourceLoader resourceLoader
) {
this.jdbcTemplate = jdbcTemplate;
this.definitionRegistry = definitionRegistry;
this.resourceLoader = resourceLoader;
}
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB
&& sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
List<EventHubEventDto> result = new ArrayList<>();
for (ExtractionDefinition<TachographImportRequest> definition : driverDefinitions()) {
result.addAll(queryDefinition(
definition,
request,
null,
request.occurredFrom(),
request.occurredTo()
));
}
return List.copyOf(result);
}
@Override
public List<EventHubEventDto> loadVehicleEvents(
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef
) {
List<EventHubEventDto> result = new ArrayList<>();
for (ExtractionDefinition<TachographImportRequest> definition : vehicleDefinitions()) {
result.addAll(queryDefinition(
definition,
request,
vehicleRef,
request.vehicleOccurredFrom(),
request.vehicleOccurredTo()
));
}
return List.copyOf(result);
}
private List<EventHubEventDto> queryDefinition(
ExtractionDefinition<TachographImportRequest> definition,
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
List<EventHubEventDto> events = new ArrayList<>();
String sql = wrapSql(loadSql(definition.sqlResource()), request, vehicleRef);
Map<String, Object> params = parameters(request, vehicleRef, occurredFrom, occurredTo);
ExtractionContext<TachographImportRequest> context = context(definition, request, occurredFrom, occurredTo);
jdbcTemplate.query(sql, params, rs -> {
int rowNum = 0;
while (rs.next()) {
events.add(definition.rowMapper().map(rs, rowNum, context));
rowNum++;
}
return null;
});
return events;
}
private List<ExtractionDefinition<TachographImportRequest>> driverDefinitions() {
return definitionRegistry.definitions().stream()
.filter(definition -> !"VEHICLE".equals(definition.entityAxis()))
.toList();
}
private List<ExtractionDefinition<TachographImportRequest>> vehicleDefinitions() {
return definitionRegistry.definitions().stream()
.filter(definition -> !"DRIVER".equals(definition.entityAxis()))
.toList();
}
private ExtractionContext<TachographImportRequest> context(
ExtractionDefinition<TachographImportRequest> definition,
UnifiedRuntimeProcessingRequest request,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
EventSourceDto eventSource = eventSourceFor(definition.sourceKind());
ImportScopeDto scope = ImportScopeDto.tenantAll(occurredFrom, occurredTo);
TachographImportRequest importRequest = new TachographImportRequest(
request.tenantKey(),
eventSource,
null,
scope,
EnumSet.of(definition.eventFamily()),
ImportMode.REPROCESS,
false,
AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP
);
ImportPlanItemDto planItem = new ImportPlanItemDto(
definition.eventFamily(),
definition.sourceKind(),
definition.code(),
List.of(),
definition.entityAxis(),
"Runtime direct tachograph load",
AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP
);
ImportTimeChunkDto chunk = new ImportTimeChunkDto(1, occurredFrom, occurredTo);
EventHubPackageRequest packageInfo = new EventHubPackageRequest(
request.tenantKey(),
eventSource,
null,
scope,
definition.eventFamily().name(),
occurredFrom == null ? null : occurredFrom.toLocalDate(),
"RUNTIME:TACHOGRAPH:" + definition.code() + ":" + UUID.randomUUID()
);
return new ExtractionContext<>(
UUID.randomUUID(),
UUID.randomUUID(),
0,
importRequest,
planItem,
chunk,
eventSource,
packageInfo
);
}
private EventSourceDto eventSourceFor(String sourceKind) {
String sourceKey = switch (sourceKind) {
case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT";
case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD";
default -> "TACHOGRAPH_" + sourceKind;
};
return new EventSourceDto("TACHOGRAPH", sourceKind, sourceKey, null, null, null);
}
private Map<String, Object> parameters(
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
Map<String, Object> params = new HashMap<>();
params.put("occurredFrom", occurredFrom);
params.put("occurredTo", occurredTo);
params.put("organisationId", null);
params.put("sourcePackageWatermarkEnabled", 0);
params.put("lastSourcePackageImportedAt", null);
params.put("lastSourcePackageIdNumeric", null);
params.put("driverSourceEntityId", request.driverSourceEntityId());
params.put("driverCardNation", request.driverCardNation());
params.put("driverCardNumber", request.driverCardNumber());
params.put("vehicleSourceEntityId", vehicleRef == null ? null : vehicleRef.sourceVehicleEntityId());
params.put("vehicleVin", vehicleRef == null ? null : vehicleRef.vin());
params.put("vehicleRegistrationNation", vehicleRef == null ? null : vehicleRef.registrationNation());
params.put("vehicleRegistrationNumber", vehicleRef == null ? null : vehicleRef.registrationNumber());
return params;
}
private String wrapSql(
String baseSql,
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef
) {
StringBuilder sql = new StringBuilder("select * from (");
sql.append(baseSql);
sql.append("\n) runtime where 1 = 1");
if (request.driverSourceEntityId() != null) {
sql.append("\n and runtime.driver_source_entity_id = :driverSourceEntityId");
}
if (request.driverCardNumber() != null) {
sql.append("\n and runtime.driver_card_number = :driverCardNumber");
if (request.driverCardNation() != null) {
sql.append("\n and runtime.driver_card_nation = :driverCardNation");
}
}
if (vehicleRef != null) {
if (vehicleRef.sourceVehicleEntityId() != null) {
sql.append("\n and runtime.vehicle_source_entity_id = :vehicleSourceEntityId");
}
if (vehicleRef.vin() != null) {
sql.append("\n and runtime.vehicle_vin = :vehicleVin");
}
if (vehicleRef.registrationNumber() != null) {
sql.append("\n and runtime.vehicle_registration_number = :vehicleRegistrationNumber");
if (vehicleRef.registrationNation() != null) {
sql.append("\n and runtime.vehicle_registration_nation = :vehicleRegistrationNation");
}
}
}
sql.append("\norder by runtime.occurred_at, runtime.external_source_event_id");
return sql.toString();
}
private String loadSql(String sqlResource) {
Resource resource = resourceLoader.getResource(sqlResource);
try (var in = resource.getInputStream()) {
return StreamUtils.copyToString(in, StandardCharsets.UTF_8);
} catch (IOException ex) {
throw new IllegalStateException("Failed to load tachograph runtime SQL " + sqlResource, ex);
}
}
}

View File

@ -0,0 +1,165 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import org.springframework.stereotype.Service;
@Service
public class UnifiedRuntimeEventAssemblyService {
private final List<RuntimeDriverEventLoader> driverEventLoaders;
private final List<RuntimeVehicleEventLoader> vehicleEventLoaders;
public UnifiedRuntimeEventAssemblyService(
List<RuntimeDriverEventLoader> driverEventLoaders,
List<RuntimeVehicleEventLoader> vehicleEventLoaders
) {
this.driverEventLoaders = List.copyOf(driverEventLoaders);
this.vehicleEventLoaders = List.copyOf(vehicleEventLoaders);
}
public UnifiedRuntimeEventBundle assembleDriverScopedEvents(UnifiedRuntimeProcessingRequest request) {
List<EventHubEventDto> driverSeedEvents = loadDriverSeedEvents(request);
List<UnifiedDiscoveredVehicleRef> discoveredVehicles = discoverVehicles(driverSeedEvents);
List<EventHubEventDto> expandedVehicleEvents = request.expandVehicleEvents()
? loadExpandedVehicleEvents(request, discoveredVehicles)
: List.of();
List<EventHubEventDto> mergedEvents = deduplicateAndSort(driverSeedEvents, expandedVehicleEvents);
List<String> notes = new ArrayList<>();
notes.add(request.eventBackend() == UnifiedRuntimeEventBackend.EVENTHUB_DB
? "Driver seed events were loaded from the local EventHub event store."
: "Driver seed events were loaded directly from the selected source databases.");
if (request.expandVehicleEvents()) {
notes.add("Vehicle expansion loaded additional events for vehicles discovered in the driver seed set.");
notes.add("Vehicle expansion padding minutes: " + request.vehicleExpansionPaddingMinutes() + ".");
} else {
notes.add("Vehicle expansion was disabled for this runtime request.");
}
return new UnifiedRuntimeEventBundle(
request,
driverSeedEvents,
discoveredVehicles,
expandedVehicleEvents,
mergedEvents,
notes
);
}
private List<EventHubEventDto> loadDriverSeedEvents(UnifiedRuntimeProcessingRequest request) {
List<EventHubEventDto> result = new ArrayList<>();
for (UnifiedEventSourceFamily sourceFamily : request.sourceFamilies()) {
RuntimeDriverEventLoader loader = driverLoader(request, sourceFamily);
if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) {
result.addAll(eventHubLoader.loadDriverEvents(request, sourceFamily));
} else {
result.addAll(loader.loadDriverEvents(request));
}
}
return deduplicateAndSort(result, List.of());
}
private List<EventHubEventDto> loadExpandedVehicleEvents(
UnifiedRuntimeProcessingRequest request,
List<UnifiedDiscoveredVehicleRef> discoveredVehicles
) {
List<EventHubEventDto> result = new ArrayList<>();
for (UnifiedDiscoveredVehicleRef vehicleRef : discoveredVehicles) {
for (UnifiedEventSourceFamily sourceFamily : request.sourceFamilies()) {
RuntimeVehicleEventLoader loader = vehicleLoader(request, sourceFamily);
if (loader instanceof EventHubRuntimeEventLoader eventHubLoader) {
result.addAll(eventHubLoader.loadVehicleEvents(request, sourceFamily, vehicleRef));
} else {
result.addAll(loader.loadVehicleEvents(request, vehicleRef));
}
}
}
return deduplicateAndSort(result, List.of());
}
private List<UnifiedDiscoveredVehicleRef> discoverVehicles(List<EventHubEventDto> events) {
List<UnifiedDiscoveredVehicleRef> result = new ArrayList<>();
for (EventHubEventDto event : events) {
VehicleRefDto vehicleRef = event.vehicleRef();
if (vehicleRef == null || !vehicleRef.hasAnyReference()) {
continue;
}
UnifiedDiscoveredVehicleRef candidate = new UnifiedDiscoveredVehicleRef(
vehicleRef.sourceVehicleEntityId(),
vehicleRef.vin(),
vehicleRef.vehicleRegistration() == null ? null : vehicleRef.vehicleRegistration().nation(),
vehicleRef.vehicleRegistration() == null ? null : vehicleRef.vehicleRegistration().number()
);
if (!candidate.hasAnyReference()) {
continue;
}
boolean merged = false;
for (int i = 0; i < result.size(); i++) {
UnifiedDiscoveredVehicleRef existing = result.get(i);
if (existing.matches(candidate)) {
result.set(i, existing.merge(candidate));
merged = true;
break;
}
}
if (!merged) {
result.add(candidate);
}
}
result.sort(Comparator.comparing(UnifiedDiscoveredVehicleRef::stableKey));
return List.copyOf(result);
}
private List<EventHubEventDto> deduplicateAndSort(
List<EventHubEventDto> left,
List<EventHubEventDto> right
) {
LinkedHashMap<String, EventHubEventDto> byKey = new LinkedHashMap<>();
appendDeduplicated(byKey, left);
appendDeduplicated(byKey, right);
return byKey.values().stream()
.sorted(Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(event -> event.eventDomain().name())
.thenComparing(event -> event.eventType().name())
.thenComparing(event -> event.lifecycle().name())
.thenComparing(EventHubEventDto::externalSourceEventId))
.toList();
}
private void appendDeduplicated(LinkedHashMap<String, EventHubEventDto> byKey, List<EventHubEventDto> events) {
for (EventHubEventDto event : events) {
byKey.putIfAbsent(dedupKey(event), event);
}
}
private String dedupKey(EventHubEventDto event) {
String sourceKey = event.packageInfo() != null && event.packageInfo().eventSource() != null
? event.packageInfo().eventSource().stableKey()
: "NO_SOURCE";
return sourceKey + "|" + event.externalSourceEventId();
}
private RuntimeDriverEventLoader driverLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return driverEventLoaders.stream()
.filter(loader -> loader.supports(request, sourceFamily))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No runtime driver event loader is registered for source family " + sourceFamily + "."));
}
private RuntimeVehicleEventLoader vehicleLoader(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return vehicleEventLoaders.stream()
.filter(loader -> loader.supports(request, sourceFamily))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No runtime vehicle event loader is registered for source family " + sourceFamily + "."));
}
}

View File

@ -0,0 +1,181 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import at.procon.eventhub.yellowfox.service.YellowFoxD8BookingEventMapper;
import at.procon.eventhub.yellowfox.service.YellowFoxD8BookingRowMapper;
import at.procon.eventhub.yellowfox.service.YellowFoxD8IgnitionTransitionDetector;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;
@Component
@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.yellow-fox.datasource.jdbc-url:}')")
public class YellowFoxDbRuntimeEventLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader {
private static final String SYNTHETIC_NATION = "YELLOWFOX";
private final NamedParameterJdbcTemplate jdbcTemplate;
private final ResourceLoader resourceLoader;
private final YellowFoxD8BookingRowMapper rowMapper;
private final YellowFoxD8BookingEventMapper eventMapper;
private final YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector;
private final EventHubProperties properties;
public YellowFoxDbRuntimeEventLoader(
@Qualifier("yellowFoxNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate,
ResourceLoader resourceLoader,
YellowFoxD8BookingRowMapper rowMapper,
YellowFoxD8BookingEventMapper eventMapper,
YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector,
EventHubProperties properties
) {
this.jdbcTemplate = jdbcTemplate;
this.resourceLoader = resourceLoader;
this.rowMapper = rowMapper;
this.eventMapper = eventMapper;
this.ignitionTransitionDetector = ignitionTransitionDetector;
this.properties = properties;
}
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB
&& sourceFamily == UnifiedEventSourceFamily.YELLOWFOX_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
if (request.driverCardNation() != null && !SYNTHETIC_NATION.equalsIgnoreCase(request.driverCardNation())) {
return List.of();
}
return query(request, null, request.occurredFrom(), request.occurredTo());
}
@Override
public List<EventHubEventDto> loadVehicleEvents(
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef
) {
if (vehicleRef.registrationNation() != null && !SYNTHETIC_NATION.equalsIgnoreCase(vehicleRef.registrationNation())) {
return List.of();
}
return query(request, vehicleRef, request.vehicleOccurredFrom(), request.vehicleOccurredTo());
}
private List<EventHubEventDto> query(
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
String sql = applyFilters(loadSqlTemplate(), filters(request, vehicleRef, occurredFrom, occurredTo));
Map<String, Object> params = parameters(request, vehicleRef, occurredFrom, occurredTo);
List<EventHubEventDto> events = new ArrayList<>();
YellowFoxD8IgnitionTransitionDetector.Session ignitionSession =
ignitionTransitionDetector.newSession(properties.getYellowFox().isEmitInitialIgnitionSnapshot());
jdbcTemplate.query(sql, params, rs -> {
while (rs.next()) {
YellowFoxD8BookingDto booking = rowMapper.map(rs, request.tenantKey(), null, null);
if (!hasEventReference(booking)) {
continue;
}
events.add(eventMapper.map(booking));
EventHubEventDto ignitionEvent = ignitionSession.detect(booking);
if (ignitionEvent != null) {
events.add(ignitionEvent);
}
}
return null;
});
return List.copyOf(events);
}
private String filters(
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
StringBuilder filters = new StringBuilder("where 1 = 1");
if (occurredFrom != null) {
filters.append("\n and b.utc >= :occurredFrom");
}
if (occurredTo != null) {
filters.append("\n and b.utc < :occurredTo");
}
if (request.driverSourceEntityId() != null) {
filters.append("\n and cast(b.driver_id as varchar(128)) = :driverSourceEntityId");
}
if (request.driverCardNumber() != null) {
filters.append("\n and left(trim(d.drivers_card), 14) = :driverCardNumber");
}
if (vehicleRef != null) {
if (vehicleRef.sourceVehicleEntityId() != null) {
filters.append("\n and cast(b.vehicle_id as varchar(128)) = :vehicleSourceEntityId");
}
if (vehicleRef.vin() != null) {
filters.append("\n and v.vin = :vehicleVin");
}
if (vehicleRef.registrationNumber() != null) {
filters.append("\n and v.vrn = :vehicleRegistrationNumber");
}
}
return filters.toString();
}
private Map<String, Object> parameters(
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
Map<String, Object> params = new HashMap<>();
params.put("occurredFrom", occurredFrom);
params.put("occurredTo", occurredTo);
params.put("driverSourceEntityId", request.driverSourceEntityId());
params.put("driverCardNumber", request.driverCardNumber());
params.put("vehicleSourceEntityId", vehicleRef == null ? null : vehicleRef.sourceVehicleEntityId());
params.put("vehicleVin", vehicleRef == null ? null : vehicleRef.vin());
params.put("vehicleRegistrationNumber", vehicleRef == null ? null : vehicleRef.registrationNumber());
return params;
}
private boolean hasEventReference(YellowFoxD8BookingDto booking) {
return (booking.driverRef() != null && booking.driverRef().hasAnyReference())
|| (booking.vehicleRef() != null && booking.vehicleRef().hasAnyReference());
}
private String applyFilters(String sqlTemplate, String filters) {
if (!sqlTemplate.contains("/*__FILTERS__*/")) {
throw new IllegalStateException("YellowFox D8 runtime SQL template is missing filter marker");
}
return sqlTemplate.replace("/*__FILTERS__*/", filters);
}
private String loadSqlTemplate() {
Resource resource = resourceLoader.getResource("classpath:sql/yellowfox/d8-bookings.sql");
try (var in = resource.getInputStream()) {
return StreamUtils.copyToString(in, StandardCharsets.UTF_8);
} catch (IOException ex) {
throw new IllegalStateException("Failed to load YellowFox D8 runtime SQL", ex);
}
}
}

View File

@ -0,0 +1,89 @@
package at.procon.eventhub.processing.model;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.time.OffsetDateTime;
import java.util.Set;
import org.junit.jupiter.api.Test;
class UnifiedRuntimeProcessingRequestTest {
@Test
void buildsDriverScopedRuntimeRequest() {
UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forDriver(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB),
"DRIVER:42",
"AT",
"123",
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z")
);
assertThat(request.tenantKey()).isEqualTo("default");
assertThat(request.sourceFamilies()).containsExactlyInAnyOrder(
UnifiedEventSourceFamily.TACHOGRAPH_DB,
UnifiedEventSourceFamily.YELLOWFOX_DB
);
assertThat(request.driverSourceEntityId()).isEqualTo("DRIVER:42");
assertThat(request.driverCardNation()).isEqualTo("AT");
assertThat(request.driverCardNumber()).isEqualTo("123");
assertThat(request.eventBackend()).isEqualTo(UnifiedRuntimeEventBackend.SOURCE_DB);
assertThat(request.expandVehicleEvents()).isTrue();
assertThat(request.vehicleOccurredFrom()).isEqualTo(OffsetDateTime.parse("2026-05-01T00:00:00Z"));
}
@Test
void canDisableVehicleExpansionExplicitly() {
UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forDriver(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB),
"DRIVER:42",
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
30
);
assertThat(request.expandVehicleEvents()).isFalse();
assertThat(request.vehicleExpansionPaddingMinutes()).isEqualTo(30);
}
@Test
void canBuildEventHubBackedRuntimeRequest() {
UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forDriverFromEventHub(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB),
"DRIVER:42",
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
15
);
assertThat(request.eventBackend()).isEqualTo(UnifiedRuntimeEventBackend.EVENTHUB_DB);
assertThat(request.vehicleExpansionPaddingMinutes()).isEqualTo(15);
}
@Test
void rejectsRequestWithoutDriverSelector() {
assertThatThrownBy(() -> new UnifiedRuntimeProcessingRequest(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB),
UnifiedRuntimeEventBackend.SOURCE_DB,
null,
null,
null,
null,
null,
true,
0
)).isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("At least one driver selector");
}
}

View File

@ -0,0 +1,159 @@
package at.procon.eventhub.processing.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import at.procon.eventhub.processing.model.UnifiedVehicleEventsRequest;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.junit.jupiter.api.Test;
class EventHubRuntimeEventLoaderTest {
@Test
void supportsOnlyEventHubBackendForTachographAndYellowFox() {
EventHubRuntimeEventLoader loader = new EventHubRuntimeEventLoader(
new UnifiedDriverEventSourceService(List.of(new CapturingDriverSource())),
new UnifiedVehicleEventSourceService(List.of(new CapturingVehicleSource()))
);
UnifiedRuntimeProcessingRequest eventHubRequest = UnifiedRuntimeProcessingRequest.forDriverFromEventHub(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB),
"DRIVER:42",
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
15
);
UnifiedRuntimeProcessingRequest sourceDbRequest = UnifiedRuntimeProcessingRequest.forDriver(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB),
"DRIVER:42",
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z")
);
assertThat(loader.supports(eventHubRequest, UnifiedEventSourceFamily.TACHOGRAPH_DB)).isTrue();
assertThat(loader.supports(eventHubRequest, UnifiedEventSourceFamily.YELLOWFOX_DB)).isTrue();
assertThat(loader.supports(sourceDbRequest, UnifiedEventSourceFamily.TACHOGRAPH_DB)).isFalse();
assertThat(loader.supports(eventHubRequest, UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION)).isFalse();
}
@Test
void buildsDriverRequestsForLocalEventHubSources() {
CapturingDriverSource driverSource = new CapturingDriverSource();
EventHubRuntimeEventLoader loader = new EventHubRuntimeEventLoader(
new UnifiedDriverEventSourceService(List.of(driverSource)),
new UnifiedVehicleEventSourceService(List.of(new CapturingVehicleSource()))
);
UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forDriverFromEventHub(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB),
"DRIVER:42",
"AT",
"123",
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
15
);
loader.loadDriverEvents(request, UnifiedEventSourceFamily.TACHOGRAPH_DB);
loader.loadDriverEvents(request, UnifiedEventSourceFamily.YELLOWFOX_DB);
assertThat(driverSource.requests).hasSize(2);
assertThat(driverSource.requests).extracting(UnifiedDriverEventsRequest::sourceFamily)
.containsExactly(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB);
assertThat(driverSource.requests).allSatisfy(driverRequest -> {
assertThat(driverRequest.tenantKey()).isEqualTo("default");
assertThat(driverRequest.driverSourceEntityId()).isEqualTo("DRIVER:42");
assertThat(driverRequest.driverCardNation()).isEqualTo("AT");
assertThat(driverRequest.driverCardNumber()).isEqualTo("123");
assertThat(driverRequest.occurredFrom()).isEqualTo(OffsetDateTime.parse("2026-05-01T00:00:00Z"));
assertThat(driverRequest.occurredTo()).isEqualTo(OffsetDateTime.parse("2026-05-02T00:00:00Z"));
});
}
@Test
void buildsVehicleRequestsForLocalEventHubSourcesWithPadding() {
CapturingVehicleSource vehicleSource = new CapturingVehicleSource();
EventHubRuntimeEventLoader loader = new EventHubRuntimeEventLoader(
new UnifiedDriverEventSourceService(List.of(new CapturingDriverSource())),
new UnifiedVehicleEventSourceService(List.of(vehicleSource))
);
UnifiedRuntimeProcessingRequest request = UnifiedRuntimeProcessingRequest.forDriverFromEventHub(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB),
"DRIVER:42",
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
15
);
UnifiedDiscoveredVehicleRef vehicleRef = new UnifiedDiscoveredVehicleRef("VEH-1", "VIN-1", "AT", "W-1");
loader.loadVehicleEvents(request, UnifiedEventSourceFamily.TACHOGRAPH_DB, vehicleRef);
loader.loadVehicleEvents(request, UnifiedEventSourceFamily.YELLOWFOX_DB, vehicleRef);
assertThat(vehicleSource.requests).hasSize(2);
assertThat(vehicleSource.requests).extracting(UnifiedVehicleEventsRequest::sourceFamily)
.containsExactly(UnifiedEventSourceFamily.TACHOGRAPH_DB, UnifiedEventSourceFamily.YELLOWFOX_DB);
assertThat(vehicleSource.requests).allSatisfy(vehicleRequest -> {
assertThat(vehicleRequest.tenantKey()).isEqualTo("default");
assertThat(vehicleRequest.vehicleSourceEntityId()).isEqualTo("VEH-1");
assertThat(vehicleRequest.vin()).isEqualTo("VIN-1");
assertThat(vehicleRequest.registrationNation()).isEqualTo("AT");
assertThat(vehicleRequest.registrationNumber()).isEqualTo("W-1");
assertThat(vehicleRequest.occurredFrom()).isEqualTo(OffsetDateTime.parse("2026-04-30T23:45:00Z"));
assertThat(vehicleRequest.occurredTo()).isEqualTo(OffsetDateTime.parse("2026-05-02T00:15:00Z"));
});
}
private static final class CapturingDriverSource implements UnifiedDriverEventSource {
private final List<UnifiedDriverEventsRequest> requests = new ArrayList<>();
@Override
public boolean supports(UnifiedDriverEventsRequest request) {
return request.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_DB
|| request.sourceFamily() == UnifiedEventSourceFamily.YELLOWFOX_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedDriverEventsRequest request) {
requests.add(request);
return List.of();
}
}
private static final class CapturingVehicleSource implements UnifiedVehicleEventSource {
private final List<UnifiedVehicleEventsRequest> requests = new ArrayList<>();
@Override
public boolean supports(UnifiedVehicleEventsRequest request) {
return request.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_DB
|| request.sourceFamily() == UnifiedEventSourceFamily.YELLOWFOX_DB;
}
@Override
public List<EventHubEventDto> loadVehicleEvents(UnifiedVehicleEventsRequest request) {
requests.add(request);
return List.of();
}
}
}

View File

@ -0,0 +1,164 @@
package at.procon.eventhub.processing.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBackend;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeProcessingRequest;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class UnifiedRuntimeEventAssemblyServiceTest {
@Test
void assemblesSeedAndExpandedVehicleEventsInMemory() {
UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService(
List.of(new FakeLoader()),
List.of(new FakeLoader())
);
UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents(
new UnifiedRuntimeProcessingRequest(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB),
UnifiedRuntimeEventBackend.SOURCE_DB,
"DRIVER:42",
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
true,
15
)
);
assertThat(bundle.driverSeedEvents()).hasSize(2);
assertThat(bundle.discoveredVehicles()).hasSize(2);
assertThat(bundle.discoveredVehicles()).extracting(UnifiedDiscoveredVehicleRef::stableKey)
.containsExactly("SOURCE_VEHICLE:VEH-1", "VIN:VIN-2");
assertThat(bundle.expandedVehicleEvents()).hasSize(2);
assertThat(bundle.mergedEvents()).hasSize(3);
assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("SEED-1", "VEHICLE-EXPANDED", "SEED-2");
}
@Test
void canSkipVehicleExpansion() {
UnifiedRuntimeEventAssemblyService service = new UnifiedRuntimeEventAssemblyService(
List.of(new FakeLoader()),
List.of(new FakeLoader())
);
UnifiedRuntimeEventBundle bundle = service.assembleDriverScopedEvents(
UnifiedRuntimeProcessingRequest.forDriver(
"default",
Set.of(UnifiedEventSourceFamily.TACHOGRAPH_DB),
"DRIVER:42",
null,
null,
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z"),
false,
0
)
);
assertThat(bundle.driverSeedEvents()).hasSize(2);
assertThat(bundle.discoveredVehicles()).hasSize(2);
assertThat(bundle.expandedVehicleEvents()).isEmpty();
assertThat(bundle.mergedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("SEED-1", "SEED-2");
}
private static final class FakeLoader implements RuntimeDriverEventLoader, RuntimeVehicleEventLoader {
@Override
public boolean supports(UnifiedRuntimeProcessingRequest request, UnifiedEventSourceFamily sourceFamily) {
return request.eventBackend() == UnifiedRuntimeEventBackend.SOURCE_DB
&& sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedRuntimeProcessingRequest request) {
return List.of(
event("SEED-1", OffsetDateTime.parse("2026-05-01T08:00:00Z"), "VEH-1", "VIN-1", "AT", "W-1"),
event("SEED-2", OffsetDateTime.parse("2026-05-01T09:00:00Z"), null, "VIN-2", "AT", "W-2")
);
}
@Override
public List<EventHubEventDto> loadVehicleEvents(
UnifiedRuntimeProcessingRequest request,
UnifiedDiscoveredVehicleRef vehicleRef
) {
if ("SOURCE_VEHICLE:VEH-1".equals(vehicleRef.stableKey())) {
return List.of(
event("SEED-1", OffsetDateTime.parse("2026-05-01T08:00:00Z"), "VEH-1", "VIN-1", "AT", "W-1"),
event("VEHICLE-EXPANDED", OffsetDateTime.parse("2026-05-01T08:30:00Z"), "VEH-1", "VIN-1", "AT", "W-1")
);
}
return List.of();
}
private EventHubEventDto event(
String externalId,
OffsetDateTime occurredAt,
String sourceVehicleId,
String vin,
String registrationNation,
String registrationNumber
) {
EventSourceDto source = new EventSourceDto("TACHOGRAPH", "DRIVER_CARD", "TACHOGRAPH_DRIVER_CARD", null, null, null);
return new EventHubEventDto(
UUID.randomUUID(),
externalId,
new DriverRefDto("DRIVER:42", null),
new VehicleRefDto(
sourceVehicleId,
vin,
sourceVehicleId,
new VehicleRegistrationRefDto(registrationNation, registrationNumber)
),
occurredAt,
null,
occurredAt,
EventDomain.DRIVER_ACTIVITY,
EventType.DRIVE,
EventLifecycle.START,
null,
null,
null,
null,
null,
false,
new at.procon.eventhub.dto.EventHubPackageRequest(
"default",
source,
null,
ImportScopeDto.tenantAll(
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z")
),
EventDomain.DRIVER_ACTIVITY.name(),
LocalDate.parse("2026-05-01"),
source.stableKey() + ":2026-05-01"
)
);
}
}
}