Add unified event processing sources

This commit is contained in:
trifonovt 2026-05-20 10:57:36 +02:00
parent dd0ccae290
commit 04d7bf513e
15 changed files with 1219 additions and 0 deletions

View File

@ -0,0 +1,389 @@
package at.procon.eventhub.persistence;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDetailsDto;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.GeoPointDto;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.ImportScopeType;
import at.procon.eventhub.dto.SourceGroupRefDto;
import at.procon.eventhub.dto.SourceGroupType;
import at.procon.eventhub.dto.SourcePackageRefDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class EventHubEventReadRepository {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
public EventHubEventReadRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
}
public List<EventHubEventDto> findEvents(
UnifiedDriverEventsRequest request,
String providerKey,
List<String> sourceKinds
) {
StringBuilder sql = new StringBuilder(
"""
select
event.id,
event.external_source_event_id,
event.occurred_at,
event.received_partner_at,
event.received_hub_at,
event.event_domain,
event.event_type,
event.lifecycle,
event.odometer_m,
ST_Y(event.position::geometry) as latitude,
ST_X(event.position::geometry) as longitude,
event.payload,
event.manual_entry,
source.provider_key,
source.source_kind,
source.source_key,
source.source_instance_key,
source.tenant_provider_setting_key,
source.external_fleet_key,
package.id as data_package_id,
package.tenant_key,
package.event_family,
package.business_date,
package.external_package_id,
package.source_group_type,
package.source_group_entity_id,
package.source_group_code,
package.source_group_name,
package.import_scope_type,
package.root_source_org_entity_id,
package.root_source_org_code,
package.root_source_org_name,
package.include_children,
package.occurred_from as package_occurred_from,
package.occurred_to as package_occurred_to,
package.source_package_kind as package_source_package_kind,
package.source_package_period_from,
package.source_package_period_to,
package.source_package_imported_at,
event.source_package_id,
event.source_package_entity_id,
detail.detail_type,
detail.attributes,
driver.source_driver_entity_id,
driver_card.nation as driver_card_nation,
driver_card.card_number as driver_card_number,
vehicle.source_vehicle_entity_id,
vehicle.vin,
registration.source_registration_entity_id,
registration.nation as vehicle_registration_nation,
registration.registration_number as vehicle_registration_number,
event.driver_id,
event.vehicle_id,
event.vehicle_registration_id
from eventhub.event event
join eventhub.event_source source on source.id = event.event_source_id
join eventhub.data_package package on package.id = event.data_package_id
left join lateral (
select detail_type, attributes
from eventhub.event_detail detail
where detail.event_occurred_at = event.occurred_at
and detail.event_id = event.id
order by detail_type
limit 1
) detail on true
left join eventhub.driver driver on driver.id = event.driver_id
left join eventhub.driver_card driver_card on driver_card.id = event.driver_card_id
left join eventhub.vehicle vehicle on vehicle.id = event.vehicle_id
left join eventhub.vehicle_registration registration on registration.id = event.vehicle_registration_id
where package.tenant_key = ?
and source.provider_key = ?
"""
);
List<Object> params = new ArrayList<>();
params.add(request.tenantKey());
params.add(providerKey);
if (sourceKinds != null && !sourceKinds.isEmpty()) {
sql.append(" and source.source_kind in (");
for (int i = 0; i < sourceKinds.size(); i++) {
if (i > 0) {
sql.append(", ");
}
sql.append("?");
params.add(sourceKinds.get(i));
}
sql.append(")");
}
if (request.occurredFrom() != null) {
sql.append(" and event.occurred_at >= ?");
params.add(request.occurredFrom());
}
if (request.occurredTo() != null) {
sql.append(" and event.occurred_at <= ?");
params.add(request.occurredTo());
}
if (request.driverSourceEntityId() != null) {
sql.append(" and driver.source_driver_entity_id = ?");
params.add(request.driverSourceEntityId());
}
if (request.driverCardNumber() != null) {
sql.append(" and driver_card.card_number = ?");
params.add(request.driverCardNumber());
if (request.driverCardNation() != null) {
sql.append(" and driver_card.nation = ?");
params.add(request.driverCardNation());
}
}
if (request.vehicleSourceEntityId() != null) {
sql.append(" and vehicle.source_vehicle_entity_id = ?");
params.add(request.vehicleSourceEntityId());
}
if (request.vin() != null) {
sql.append(" and vehicle.vin = ?");
params.add(request.vin());
}
if (request.registrationNumber() != null) {
sql.append(" and registration.registration_number = ?");
params.add(request.registrationNumber());
if (request.registrationNation() != null) {
sql.append(" and registration.nation = ?");
params.add(request.registrationNation());
}
}
sql.append(" order by event.occurred_at, event.event_domain, event.event_type, event.lifecycle, event.id");
return jdbcTemplate.query(
sql.toString(),
(rs, rowNum) -> mapEvent(rs),
params.toArray()
);
}
private EventHubEventDto mapEvent(ResultSet rs) throws SQLException {
DriverRefDto driverRef = driverRef(rs);
VehicleRefDto vehicleRef = vehicleRef(rs);
if ((driverRef == null || !driverRef.hasAnyReference()) && (vehicleRef == null || !vehicleRef.hasAnyReference())) {
throw new IllegalStateException("Loaded event does not have any driver or vehicle reference.");
}
return new EventHubEventDto(
uuid(rs, "id"),
rs.getString("external_source_event_id"),
driverRef,
vehicleRef,
rs.getObject("occurred_at", OffsetDateTime.class),
rs.getObject("received_partner_at", OffsetDateTime.class),
rs.getObject("received_hub_at", OffsetDateTime.class),
enumValue(EventDomain.class, rs.getString("event_domain"), EventDomain.TELEMATICS_DATA),
enumValue(EventType.class, rs.getString("event_type"), EventType.UNKNOWN_EVENT),
enumValue(EventLifecycle.class, rs.getString("lifecycle"), EventLifecycle.SNAPSHOT),
longValue(rs, "odometer_m"),
point(rs),
eventDetails(rs),
sourcePackageRef(rs),
json(rs.getString("payload")),
rs.getBoolean("manual_entry"),
packageInfo(rs)
);
}
private DriverRefDto driverRef(ResultSet rs) throws SQLException {
String sourceEntityId = firstNonBlank(
rs.getString("source_driver_entity_id"),
syntheticId("EVENTHUB_DRIVER", uuid(rs, "driver_id"))
);
String cardNumber = rs.getString("driver_card_number");
DriverCardRefDto driverCard = cardNumber == null
? null
: new DriverCardRefDto(rs.getString("driver_card_nation"), cardNumber);
DriverRefDto driverRef = new DriverRefDto(sourceEntityId, driverCard);
return driverRef.hasAnyReference() ? driverRef : null;
}
private VehicleRefDto vehicleRef(ResultSet rs) throws SQLException {
VehicleRegistrationRefDto registration = null;
String registrationNumber = rs.getString("vehicle_registration_number");
if (registrationNumber != null) {
registration = new VehicleRegistrationRefDto(rs.getString("vehicle_registration_nation"), registrationNumber);
}
VehicleRefDto vehicleRef = new VehicleRefDto(
firstNonBlank(
rs.getString("source_vehicle_entity_id"),
syntheticId("EVENTHUB_VEHICLE", uuid(rs, "vehicle_id"))
),
rs.getString("vin"),
firstNonBlank(
rs.getString("source_registration_entity_id"),
syntheticId("EVENTHUB_VEHICLE_REGISTRATION", uuid(rs, "vehicle_registration_id"))
),
registration
);
return vehicleRef.hasAnyReference() ? vehicleRef : null;
}
private EventDetailsDto eventDetails(ResultSet rs) throws SQLException {
String detailType = rs.getString("detail_type");
if (detailType == null || detailType.isBlank()) {
return null;
}
return new EventDetailsDto(detailType, json(rs.getString("attributes")));
}
private SourcePackageRefDto sourcePackageRef(ResultSet rs) throws SQLException {
String packageKind = rs.getString("package_source_package_kind");
String sourcePackageId = firstNonBlank(rs.getString("source_package_id"), null);
String sourceEntityId = firstNonBlank(rs.getString("source_package_entity_id"), null);
OffsetDateTime periodFrom = rs.getObject("source_package_period_from", OffsetDateTime.class);
OffsetDateTime periodTo = rs.getObject("source_package_period_to", OffsetDateTime.class);
OffsetDateTime importedAt = rs.getObject("source_package_imported_at", OffsetDateTime.class);
SourcePackageRefDto ref = new SourcePackageRefDto(
packageKind,
sourcePackageId,
sourceEntityId,
periodFrom,
periodTo,
importedAt
);
return ref.hasAnyReference() ? ref : null;
}
private EventHubPackageRequest packageInfo(ResultSet rs) throws SQLException {
EventSourceDto eventSource = new EventSourceDto(
rs.getString("provider_key"),
rs.getString("source_kind"),
rs.getString("source_key"),
rs.getString("source_instance_key"),
rs.getString("tenant_provider_setting_key"),
rs.getString("external_fleet_key")
);
SourceGroupRefDto sourceGroup = sourceGroup(rs);
ImportScopeDto importScope = importScope(rs);
String eventFamily = firstNonBlank(rs.getString("event_family"), rs.getString("event_domain"));
LocalDate businessDate = rs.getObject("business_date", LocalDate.class);
String externalPackageId = firstNonBlank(rs.getString("external_package_id"), rs.getString("data_package_id"));
return new EventHubPackageRequest(
rs.getString("tenant_key"),
eventSource,
sourceGroup,
importScope,
eventFamily,
businessDate,
externalPackageId
);
}
private SourceGroupRefDto sourceGroup(ResultSet rs) throws SQLException {
String groupType = rs.getString("source_group_type");
String sourceEntityId = rs.getString("source_group_entity_id");
String code = rs.getString("source_group_code");
String name = rs.getString("source_group_name");
if (groupType == null && sourceEntityId == null && code == null && name == null) {
return null;
}
return new SourceGroupRefDto(
enumValue(SourceGroupType.class, groupType, null),
sourceEntityId,
code,
name
);
}
private ImportScopeDto importScope(ResultSet rs) throws SQLException {
SourceGroupRefDto rootOrganisation = null;
String rootEntityId = rs.getString("root_source_org_entity_id");
String rootCode = rs.getString("root_source_org_code");
String rootName = rs.getString("root_source_org_name");
if (rootEntityId != null || rootCode != null || rootName != null) {
rootOrganisation = new SourceGroupRefDto(SourceGroupType.ORGANISATION, rootEntityId, rootCode, rootName);
}
return new ImportScopeDto(
enumValue(ImportScopeType.class, rs.getString("import_scope_type"), ImportScopeType.TENANT_ALL),
rootOrganisation,
rs.getBoolean("include_children"),
rs.getObject("package_occurred_from", OffsetDateTime.class),
rs.getObject("package_occurred_to", OffsetDateTime.class)
);
}
private GeoPointDto point(ResultSet rs) throws SQLException {
BigDecimal latitude = rs.getBigDecimal("latitude");
BigDecimal longitude = rs.getBigDecimal("longitude");
return latitude == null || longitude == null ? null : new GeoPointDto(latitude, longitude);
}
private UUID uuid(ResultSet rs, String column) throws SQLException {
return (UUID) rs.getObject(column);
}
private Long longValue(ResultSet rs, String column) throws SQLException {
Object value = rs.getObject(column);
if (value == null) {
return null;
}
if (value instanceof Number number) {
return number.longValue();
}
return Long.parseLong(value.toString());
}
private JsonNode json(String value) {
try {
return value == null || value.isBlank()
? objectMapper.createObjectNode()
: objectMapper.readTree(value);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Failed to parse JSON column.", e);
}
}
private String syntheticId(String prefix, UUID id) {
return id == null ? null : prefix + ":" + id;
}
private String firstNonBlank(String first, String second) {
if (first != null && !first.isBlank()) {
return first;
}
return second != null && !second.isBlank() ? second : null;
}
private <T extends Enum<T>> T enumValue(Class<T> type, String value, T fallback) {
if (value == null || value.isBlank()) {
return fallback;
}
String normalized = value.trim().toUpperCase(Locale.ROOT).replace('-', '_').replace(' ', '_');
try {
return Enum.valueOf(type, normalized);
} catch (IllegalArgumentException ignored) {
return fallback;
}
}
}

View File

@ -0,0 +1,149 @@
package at.procon.eventhub.processing.model;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.UUID;
public record UnifiedDriverEventsRequest(
UnifiedEventSourceFamily sourceFamily,
UUID sessionId,
String driverKey,
String tenantKey,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
String vehicleSourceEntityId,
String vin,
String registrationNation,
String registrationNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
public UnifiedDriverEventsRequest {
Objects.requireNonNull(sourceFamily, "sourceFamily must not be null");
driverKey = normalize(driverKey);
tenantKey = normalize(tenantKey);
driverSourceEntityId = normalize(driverSourceEntityId);
driverCardNation = normalizeUpper(driverCardNation);
driverCardNumber = normalize(driverCardNumber);
vehicleSourceEntityId = normalize(vehicleSourceEntityId);
vin = normalizeUpper(vin);
registrationNation = normalizeUpper(registrationNation);
registrationNumber = normalize(registrationNumber);
if (occurredFrom != null && occurredTo != null && occurredTo.isBefore(occurredFrom)) {
throw new IllegalArgumentException("occurredTo must not be before occurredFrom");
}
if (sourceFamily == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION) {
Objects.requireNonNull(sessionId, "sessionId must not be null");
if (driverKey == null) {
throw new IllegalArgumentException("driverKey must not be blank");
}
} else {
if (tenantKey == null) {
throw new IllegalArgumentException("tenantKey must not be blank");
}
if (!hasDriverSelector(driverSourceEntityId, driverCardNumber)
&& !hasVehicleSelector(vehicleSourceEntityId, vin, registrationNumber)) {
throw new IllegalArgumentException("At least one driver or vehicle selector must be provided.");
}
}
}
public static UnifiedDriverEventsRequest forTachographFileSession(
UUID sessionId,
String driverKey,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
return new UnifiedDriverEventsRequest(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
sessionId,
driverKey,
null,
null,
null,
null,
null,
null,
null,
null,
occurredFrom,
occurredTo
);
}
public static UnifiedDriverEventsRequest forTachographDbDriver(
String tenantKey,
String driverSourceEntityId,
String driverCardNation,
String driverCardNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
return new UnifiedDriverEventsRequest(
UnifiedEventSourceFamily.TACHOGRAPH_DB,
null,
null,
tenantKey,
driverSourceEntityId,
driverCardNation,
driverCardNumber,
null,
null,
null,
null,
occurredFrom,
occurredTo
);
}
public static UnifiedDriverEventsRequest forYellowFoxDbVehicle(
String tenantKey,
String vehicleSourceEntityId,
String vin,
String registrationNation,
String registrationNumber,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
return new UnifiedDriverEventsRequest(
UnifiedEventSourceFamily.YELLOWFOX_DB,
null,
null,
tenantKey,
null,
null,
null,
vehicleSourceEntityId,
vin,
registrationNation,
registrationNumber,
occurredFrom,
occurredTo
);
}
public boolean hasDriverSelector() {
return hasDriverSelector(driverSourceEntityId, driverCardNumber);
}
public boolean hasVehicleSelector() {
return hasVehicleSelector(vehicleSourceEntityId, vin, registrationNumber);
}
private static boolean hasDriverSelector(String driverSourceEntityId, String driverCardNumber) {
return driverSourceEntityId != null || driverCardNumber != null;
}
private static boolean hasVehicleSelector(String vehicleSourceEntityId, String vin, String registrationNumber) {
return vehicleSourceEntityId != null || vin != null || registrationNumber != null;
}
private static String normalize(String value) {
return value == null || value.isBlank() ? null : value.trim();
}
private static String normalizeUpper(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase();
}
}

View File

@ -0,0 +1,30 @@
package at.procon.eventhub.processing.model;
import java.util.Objects;
import java.util.UUID;
public record UnifiedDriverTimelineRequest(
UnifiedEventSourceFamily sourceFamily,
UUID sessionId,
String driverKey
) {
public UnifiedDriverTimelineRequest {
Objects.requireNonNull(sourceFamily, "sourceFamily must not be null");
Objects.requireNonNull(sessionId, "sessionId must not be null");
if (driverKey == null || driverKey.isBlank()) {
throw new IllegalArgumentException("driverKey must not be blank");
}
driverKey = driverKey.trim();
}
public static UnifiedDriverTimelineRequest forTachographFileSession(
UUID sessionId,
String driverKey
) {
return new UnifiedDriverTimelineRequest(
UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION,
sessionId,
driverKey
);
}
}

View File

@ -0,0 +1,7 @@
package at.procon.eventhub.processing.model;
public enum UnifiedEventSourceFamily {
TACHOGRAPH_FILE_SESSION,
TACHOGRAPH_DB,
YELLOWFOX_DB
}

View File

@ -0,0 +1,30 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.persistence.EventHubEventReadRepository;
import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import java.util.List;
import org.springframework.stereotype.Component;
@Component
public class TachographDbUnifiedDriverEventSource implements UnifiedDriverEventSource {
private static final List<String> SOURCE_KINDS = List.of("DRIVER_CARD", "VEHICLE_UNIT");
private final EventHubEventReadRepository repository;
public TachographDbUnifiedDriverEventSource(EventHubEventReadRepository repository) {
this.repository = repository;
}
@Override
public boolean supports(UnifiedDriverEventsRequest request) {
return request.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedDriverEventsRequest request) {
return repository.findEvents(request, "TACHOGRAPH", SOURCE_KINDS);
}
}

View File

@ -0,0 +1,61 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
import at.procon.eventhub.tachographfilesession.service.DriverNotFoundInSessionException;
import at.procon.eventhub.tachographfilesession.service.DriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionNotFoundException;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import java.time.OffsetDateTime;
import java.util.List;
import org.springframework.stereotype.Component;
@Component
public class TachographFileSessionUnifiedDriverEventSource implements UnifiedDriverEventSource {
private final TachographFileSessionRepository repository;
private final DriverTimelineEventBuilder eventBuilder;
public TachographFileSessionUnifiedDriverEventSource(
TachographFileSessionRepository repository,
DriverTimelineEventBuilder eventBuilder
) {
this.repository = repository;
this.eventBuilder = eventBuilder;
}
@Override
public boolean supports(UnifiedDriverEventsRequest request) {
return request.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedDriverEventsRequest request) {
TachographFileSession session = repository.find(request.sessionId())
.orElseThrow(() -> new TachographFileSessionNotFoundException(request.sessionId()));
DriverExtractionSession driver = session.driversByKey().get(request.driverKey());
if (driver == null) {
throw new DriverNotFoundInSessionException(request.sessionId(), request.driverKey());
}
return eventBuilder.buildEvents(session, driver).stream()
.filter(event -> withinWindow(event.occurredAt(), request.occurredFrom(), request.occurredTo()))
.toList();
}
private boolean withinWindow(
OffsetDateTime occurredAt,
OffsetDateTime occurredFrom,
OffsetDateTime occurredTo
) {
if (occurredAt == null) {
return false;
}
if (occurredFrom != null && occurredAt.isBefore(occurredFrom)) {
return false;
}
return occurredTo == null || !occurredAt.isAfter(occurredTo);
}
}

View File

@ -0,0 +1,43 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.processing.model.UnifiedDriverTimelineRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline;
import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
import at.procon.eventhub.tachographfilesession.service.DriverNotFoundInSessionException;
import at.procon.eventhub.tachographfilesession.service.EventBackedDriverTimelineBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionNotFoundException;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import org.springframework.stereotype.Component;
@Component
public class TachographFileSessionUnifiedDriverTimelineSource implements UnifiedDriverTimelineSource {
private final TachographFileSessionRepository repository;
private final EventBackedDriverTimelineBuilder eventBackedDriverTimelineBuilder;
public TachographFileSessionUnifiedDriverTimelineSource(
TachographFileSessionRepository repository,
EventBackedDriverTimelineBuilder eventBackedDriverTimelineBuilder
) {
this.repository = repository;
this.eventBackedDriverTimelineBuilder = eventBackedDriverTimelineBuilder;
}
@Override
public boolean supports(UnifiedDriverTimelineRequest request) {
return request.sourceFamily() == UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION;
}
@Override
public ResolvedDriverTimeline loadDriverTimeline(UnifiedDriverTimelineRequest request) {
TachographFileSession session = repository.find(request.sessionId())
.orElseThrow(() -> new TachographFileSessionNotFoundException(request.sessionId()));
DriverExtractionSession driver = session.driversByKey().get(request.driverKey());
if (driver == null) {
throw new DriverNotFoundInSessionException(request.sessionId(), request.driverKey());
}
return eventBackedDriverTimelineBuilder.build(session, driver);
}
}

View File

@ -0,0 +1,12 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest;
import java.util.List;
public interface UnifiedDriverEventSource {
boolean supports(UnifiedDriverEventsRequest request);
List<EventHubEventDto> loadDriverEvents(UnifiedDriverEventsRequest request);
}

View File

@ -0,0 +1,28 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest;
import java.util.List;
import org.springframework.stereotype.Service;
@Service
public class UnifiedDriverEventSourceService {
private final List<UnifiedDriverEventSource> eventSources;
public UnifiedDriverEventSourceService(List<UnifiedDriverEventSource> eventSources) {
this.eventSources = List.copyOf(eventSources);
}
public List<EventHubEventDto> loadDriverEvents(UnifiedDriverEventsRequest request) {
return eventSources.stream()
.filter(source -> source.supports(request))
.findFirst()
.orElseThrow(() -> unsupportedSource(request.sourceFamily().name()))
.loadDriverEvents(request);
}
private IllegalArgumentException unsupportedSource(String sourceFamily) {
return new IllegalArgumentException("No unified driver event source is registered for source family " + sourceFamily + ".");
}
}

View File

@ -0,0 +1,28 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.processing.model.UnifiedDriverTimelineRequest;
import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline;
import java.util.List;
import org.springframework.stereotype.Service;
@Service
public class UnifiedDriverTimelineService {
private final List<UnifiedDriverTimelineSource> timelineSources;
public UnifiedDriverTimelineService(List<UnifiedDriverTimelineSource> timelineSources) {
this.timelineSources = List.copyOf(timelineSources);
}
public ResolvedDriverTimeline loadDriverTimeline(UnifiedDriverTimelineRequest request) {
return timelineSources.stream()
.filter(source -> source.supports(request))
.findFirst()
.orElseThrow(() -> unsupportedSource(request.sourceFamily().name()))
.loadDriverTimeline(request);
}
private IllegalArgumentException unsupportedSource(String sourceFamily) {
return new IllegalArgumentException("No unified driver timeline source is registered for source family " + sourceFamily + ".");
}
}

View File

@ -0,0 +1,11 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.processing.model.UnifiedDriverTimelineRequest;
import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline;
public interface UnifiedDriverTimelineSource {
boolean supports(UnifiedDriverTimelineRequest request);
ResolvedDriverTimeline loadDriverTimeline(UnifiedDriverTimelineRequest request);
}

View File

@ -0,0 +1,28 @@
package at.procon.eventhub.processing.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.persistence.EventHubEventReadRepository;
import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest;
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
import java.util.List;
import org.springframework.stereotype.Component;
@Component
public class YellowFoxDbUnifiedDriverEventSource implements UnifiedDriverEventSource {
private final EventHubEventReadRepository repository;
public YellowFoxDbUnifiedDriverEventSource(EventHubEventReadRepository repository) {
this.repository = repository;
}
@Override
public boolean supports(UnifiedDriverEventsRequest request) {
return request.sourceFamily() == UnifiedEventSourceFamily.YELLOWFOX_DB;
}
@Override
public List<EventHubEventDto> loadDriverEvents(UnifiedDriverEventsRequest request) {
return repository.findEvents(request, "YELLOWFOX", List.of("TELEMATICS_PLATFORM"));
}
}

View File

@ -0,0 +1,88 @@
package at.procon.eventhub.processing.model;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class UnifiedDriverEventsRequestTest {
@Test
void buildsFileSessionRequest() {
UUID sessionId = UUID.randomUUID();
UnifiedDriverEventsRequest request = UnifiedDriverEventsRequest.forTachographFileSession(
sessionId,
" 12:123 ",
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z")
);
assertThat(request.sourceFamily()).isEqualTo(UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION);
assertThat(request.sessionId()).isEqualTo(sessionId);
assertThat(request.driverKey()).isEqualTo("12:123");
assertThat(request.tenantKey()).isNull();
}
@Test
void buildsTachographDbDriverRequest() {
UnifiedDriverEventsRequest request = UnifiedDriverEventsRequest.forTachographDbDriver(
" default ",
" DRIVER:42 ",
"at",
" 123 ",
OffsetDateTime.parse("2026-05-01T00:00:00Z"),
OffsetDateTime.parse("2026-05-02T00:00:00Z")
);
assertThat(request.sourceFamily()).isEqualTo(UnifiedEventSourceFamily.TACHOGRAPH_DB);
assertThat(request.tenantKey()).isEqualTo("default");
assertThat(request.driverSourceEntityId()).isEqualTo("DRIVER:42");
assertThat(request.driverCardNation()).isEqualTo("AT");
assertThat(request.driverCardNumber()).isEqualTo("123");
assertThat(request.hasDriverSelector()).isTrue();
assertThat(request.hasVehicleSelector()).isFalse();
}
@Test
void buildsYellowFoxVehicleRequest() {
UnifiedDriverEventsRequest request = UnifiedDriverEventsRequest.forYellowFoxDbVehicle(
"default",
"VEHICLE:99",
"wdb123",
"de",
"W-123AB",
null,
null
);
assertThat(request.sourceFamily()).isEqualTo(UnifiedEventSourceFamily.YELLOWFOX_DB);
assertThat(request.vehicleSourceEntityId()).isEqualTo("VEHICLE:99");
assertThat(request.vin()).isEqualTo("WDB123");
assertThat(request.registrationNation()).isEqualTo("DE");
assertThat(request.registrationNumber()).isEqualTo("W-123AB");
assertThat(request.hasVehicleSelector()).isTrue();
}
@Test
void rejectsDbRequestWithoutSubjectSelector() {
assertThatThrownBy(() -> new UnifiedDriverEventsRequest(
UnifiedEventSourceFamily.TACHOGRAPH_DB,
null,
null,
"default",
null,
null,
null,
null,
null,
null,
null,
null,
null
)).isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("At least one driver or vehicle selector");
}
}

View File

@ -0,0 +1,165 @@
package at.procon.eventhub.processing.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.processing.model.UnifiedDriverEventsRequest;
import at.procon.eventhub.service.EventDetailsFactory;
import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
import at.procon.eventhub.tachographfilesession.model.ExtractedCardActivityInterval;
import at.procon.eventhub.tachographfilesession.model.ExtractedCardVehicleUsageInterval;
import at.procon.eventhub.tachographfilesession.model.ExtractedDriver;
import at.procon.eventhub.tachographfilesession.model.ExtractedDriverCard;
import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent;
import at.procon.eventhub.tachographfilesession.model.ExtractedVehicle;
import at.procon.eventhub.tachographfilesession.model.ExtractedVehicleRegistration;
import at.procon.eventhub.tachographfilesession.model.ExtractionStats;
import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
import at.procon.eventhub.tachographfilesession.model.TachographFileSessionMetadata;
import at.procon.eventhub.tachographfilesession.service.DriverKeyFactory;
import at.procon.eventhub.tachographfilesession.service.DriverTimelineBuilder;
import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository;
import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class UnifiedDriverEventSourceServiceTest {
@Test
void loadsNormalizedFileSessionEventsThroughUnifiedService() {
EventHubProperties properties = new EventHubProperties();
TachographFileSessionRepository repository = new InMemoryTachographFileSessionRepository(properties);
DriverTimelineBuilder timelineBuilder = new DriverTimelineBuilder();
UnifiedDriverEventSourceService service = new UnifiedDriverEventSourceService(List.of(
new TachographFileSessionUnifiedDriverEventSource(
repository,
new IntervalBackedDriverTimelineEventBuilder(
timelineBuilder,
new DriverKeyFactory(),
new VehicleKeyFactory(),
new EventDetailsFactory(new ObjectMapper())
)
)
));
DriverExtractionSession driver = driver();
TachographFileSession session = session(driver);
repository.save(session);
List<at.procon.eventhub.dto.EventHubEventDto> events = service.loadDriverEvents(
UnifiedDriverEventsRequest.forTachographFileSession(
session.sessionId(),
driver.driverKey(),
OffsetDateTime.parse("2026-05-01T08:30:00Z"),
OffsetDateTime.parse("2026-05-01T09:00:00Z")
)
);
assertThat(events).hasSize(3);
assertThat(events).extracting(event -> event.occurredAt())
.containsExactly(
OffsetDateTime.parse("2026-05-01T08:30:00Z"),
OffsetDateTime.parse("2026-05-01T08:45:00Z"),
OffsetDateTime.parse("2026-05-01T09:00:00Z")
);
assertThat(events).extracting(event -> event.eventDomain())
.containsExactly(
EventDomain.DRIVER_ACTIVITY,
EventDomain.POSITION,
EventDomain.DRIVER_ACTIVITY
);
assertThat(events.get(0).lifecycle()).isEqualTo(EventLifecycle.START);
assertThat(events.get(2).lifecycle()).isEqualTo(EventLifecycle.END);
assertThat(events.get(1).eventDetails().type()).isEqualTo("POSITION");
}
private DriverExtractionSession driver() {
return new DriverExtractionSession(
"12:123",
new ExtractedDriver("12:123", "DRV:12:123", "Doe", "Jane", null, null, null, null, null),
new ExtractedDriverCard("CARD:12:123", "12", "123", null, null, null, null),
List.of(new ExtractedVehicleRegistration("12:REG-1", "VR:12:REG-1", "12", "REG-1")),
List.of(new ExtractedVehicle("VIN-1", "VIN:VIN-1", "VIN-1")),
List.of(new ExtractedCardVehicleUsageInterval(
"CVU-1",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
100L,
200L,
"12:REG-1",
"VIN-1",
"vu-1"
)),
List.of(new ExtractedCardActivityInterval(
"ACT-1",
OffsetDateTime.parse("2026-05-01T08:30:00Z"),
OffsetDateTime.parse("2026-05-01T09:00:00Z"),
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"a"
)),
List.of(new ExtractedSupportEvent(
"SUP-1",
OffsetDateTime.parse("2026-05-01T08:45:00Z"),
"POSITION",
"POSITION_RECORDED",
"SNAPSHOT",
"DRIVER",
"12:REG-1",
"VIN-1",
null,
null,
null,
null,
null,
BigDecimal.valueOf(48.2082),
BigDecimal.valueOf(16.3738),
"AUTHENTIC",
150L,
null,
null,
null,
"raw-path"
)),
List.of()
);
}
private TachographFileSession session(DriverExtractionSession driver) {
return new TachographFileSession(
UUID.randomUUID(),
new TachographFileSessionMetadata(
"default",
"legalrequirements-drivercard",
"sample",
"sample.ddd",
"a",
2,
"42",
"b",
true,
null
),
Map.of(driver.driverKey(), driver),
new ExtractionStats(1, 1, 1, 1, 1, 0),
List.of(),
Instant.now(),
Instant.now().plus(4, ChronoUnit.HOURS)
);
}
}

View File

@ -0,0 +1,150 @@
package at.procon.eventhub.processing.service;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.processing.model.UnifiedDriverTimelineRequest;
import at.procon.eventhub.service.EventDetailsFactory;
import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
import at.procon.eventhub.tachographfilesession.model.ExtractedCardActivityInterval;
import at.procon.eventhub.tachographfilesession.model.ExtractedCardVehicleUsageInterval;
import at.procon.eventhub.tachographfilesession.model.ExtractedDriver;
import at.procon.eventhub.tachographfilesession.model.ExtractedDriverCard;
import at.procon.eventhub.tachographfilesession.model.ExtractedSupportEvent;
import at.procon.eventhub.tachographfilesession.model.ExtractedVehicle;
import at.procon.eventhub.tachographfilesession.model.ExtractedVehicleRegistration;
import at.procon.eventhub.tachographfilesession.model.ExtractionStats;
import at.procon.eventhub.tachographfilesession.model.ResolvedDriverTimeline;
import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
import at.procon.eventhub.tachographfilesession.model.TachographFileSessionMetadata;
import at.procon.eventhub.tachographfilesession.service.DriverKeyFactory;
import at.procon.eventhub.tachographfilesession.service.EventBackedDriverTimelineBuilder;
import at.procon.eventhub.tachographfilesession.service.InMemoryTachographFileSessionRepository;
import at.procon.eventhub.tachographfilesession.service.IntervalBackedDriverTimelineEventBuilder;
import at.procon.eventhub.tachographfilesession.service.TachographFileSessionRepository;
import at.procon.eventhub.tachographfilesession.service.VehicleKeyFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class UnifiedDriverTimelineServiceTest {
@Test
void reconstructsTimelineThroughUnifiedService() {
EventHubProperties properties = new EventHubProperties();
TachographFileSessionRepository repository = new InMemoryTachographFileSessionRepository(properties);
EventBackedDriverTimelineBuilder eventBackedBuilder = new EventBackedDriverTimelineBuilder(
new IntervalBackedDriverTimelineEventBuilder(
new at.procon.eventhub.tachographfilesession.service.DriverTimelineBuilder(),
new DriverKeyFactory(),
new VehicleKeyFactory(),
new EventDetailsFactory(new ObjectMapper())
)
);
UnifiedDriverTimelineService service = new UnifiedDriverTimelineService(List.of(
new TachographFileSessionUnifiedDriverTimelineSource(repository, eventBackedBuilder)
));
DriverExtractionSession driver = driver();
TachographFileSession session = session(driver);
repository.save(session);
ResolvedDriverTimeline timeline = service.loadDriverTimeline(
UnifiedDriverTimelineRequest.forTachographFileSession(session.sessionId(), driver.driverKey())
);
assertThat(timeline.sourceKind()).isEqualTo("DRIVER_CARD");
assertThat(timeline.loadedFrom()).isEqualTo(OffsetDateTime.parse("2026-05-01T08:00:00Z"));
assertThat(timeline.loadedTo()).isEqualTo(OffsetDateTime.parse("2026-05-01T10:00:00Z"));
assertThat(timeline.activityIntervals()).hasSize(1);
assertThat(timeline.vehicleUsageIntervals()).hasSize(1);
assertThat(timeline.supportEvents()).hasSize(1);
assertThat(timeline.activityIntervals().get(0).activityType()).isEqualTo("DRIVE");
assertThat(timeline.vehicleUsageIntervals().get(0).registrationKey()).isEqualTo("12:REG-1");
}
private DriverExtractionSession driver() {
return new DriverExtractionSession(
"12:123",
new ExtractedDriver("12:123", "DRV:12:123", "Doe", "Jane", null, null, null, null, null),
new ExtractedDriverCard("CARD:12:123", "12", "123", null, null, null, null),
List.of(new ExtractedVehicleRegistration("12:REG-1", "VR:12:REG-1", "12", "REG-1")),
List.of(new ExtractedVehicle("VIN-1", "VIN:VIN-1", "VIN-1")),
List.of(new ExtractedCardVehicleUsageInterval(
"CVU-1",
OffsetDateTime.parse("2026-05-01T08:00:00Z"),
OffsetDateTime.parse("2026-05-01T10:00:00Z"),
100L,
200L,
"12:REG-1",
"VIN-1",
"vu-1"
)),
List.of(new ExtractedCardActivityInterval(
"ACT-1",
OffsetDateTime.parse("2026-05-01T08:30:00Z"),
OffsetDateTime.parse("2026-05-01T09:00:00Z"),
"DRIVE",
"DRIVER",
"INSERTED",
"SINGLE",
"12:REG-1",
"VIN-1",
"a"
)),
List.of(new ExtractedSupportEvent(
"SUP-1",
OffsetDateTime.parse("2026-05-01T08:45:00Z"),
"POSITION",
"POSITION_RECORDED",
"SNAPSHOT",
"DRIVER",
"12:REG-1",
"VIN-1",
null,
null,
null,
null,
null,
BigDecimal.valueOf(48.2082),
BigDecimal.valueOf(16.3738),
"AUTHENTIC",
150L,
null,
null,
null,
"raw-path"
)),
List.of()
);
}
private TachographFileSession session(DriverExtractionSession driver) {
return new TachographFileSession(
UUID.randomUUID(),
new TachographFileSessionMetadata(
"default",
"legalrequirements-drivercard",
"sample",
"sample.ddd",
"a",
2,
"42",
"b",
true,
null
),
Map.of(driver.driverKey(), driver),
new ExtractionStats(1, 1, 1, 1, 1, 0),
List.of(),
Instant.now(),
Instant.now().plus(4, ChronoUnit.HOURS)
);
}
}