Add tachograph raw payload metadata support

This commit is contained in:
trifonovt 2026-05-25 15:58:04 +02:00
parent dd9c33c5fd
commit 8a75db58fd
20 changed files with 531 additions and 96 deletions

View File

@ -1,6 +1,9 @@
services:
postgres:
image: postgres:16
# The Flyway migrations enable both TimescaleDB and PostGIS. The TimescaleDB HA
# image is used for local development because it includes these PostgreSQL
# extensions, unlike the plain postgres image.
image: timescale/timescaledb-ha:pg16
container_name: eventhub-postgres
environment:
POSTGRES_DB: eventhub

View File

@ -0,0 +1,24 @@
package at.procon.eventhub.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class WebMvcJsonConfig implements WebMvcConfigurer {
private final ObjectMapper objectMapper;
public WebMvcJsonConfig(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public void extendMessageConverters(List<HttpMessageConverter<?>> converters) {
converters.removeIf(MappingJackson2HttpMessageConverter.class::isInstance);
converters.add(0, new MappingJackson2HttpMessageConverter(objectMapper));
}
}

View File

@ -22,7 +22,6 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
@ -44,6 +43,8 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe
CardSlot cardSlot = cardSlot(rs);
CardStatus cardStatus = cardStatus(rs);
DrivingStatus drivingStatus = drivingStatus(rs);
EventType eventType = eventType(rs);
EventLifecycle lifecycle = lifecycle(rs);
String externalSourceEventId = string(rs, "external_source_event_id");
if (externalSourceEventId == null) {
@ -59,13 +60,13 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe
offsetDateTime(rs, "received_partner_at"),
OffsetDateTime.now(),
EventDomain.DRIVER_ACTIVITY,
eventType(rs),
lifecycle(rs),
eventType,
lifecycle,
longValue(rs, "odometer_m"),
null,
detailsFactory.driverActivity(cardSlot, cardStatus, drivingStatus),
sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null,
detailsFactory.payloadFromMap(payload(rs, context)),
detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef, eventType, lifecycle, cardSlot, cardStatus, drivingStatus)),
isManualEntry(cardStatus, drivingStatus),
context.packageInfo()
);
@ -115,9 +116,24 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe
);
}
private Map<String, Object> payload(ResultSet rs, ExtractionContext<TachographImportRequest> context) throws SQLException {
Map<String, Object> raw = new LinkedHashMap<>();
put(raw, "sourceRowId", string(rs, "source_row_id"));
private Map<String, Object> payload(
ResultSet rs,
ExtractionContext<TachographImportRequest> context,
DriverRefDto driverRef,
VehicleRefDto vehicleRef,
EventType eventType,
EventLifecycle lifecycle,
CardSlot cardSlot,
CardStatus cardStatus,
DrivingStatus drivingStatus
) throws SQLException {
Map<String, Object> raw = TachographRawPayloadSupport.baseRawPayload(
rs, context, driverRef, vehicleRef, eventType, lifecycle
);
put(raw, "activityType", eventType == null ? null : eventType.name());
put(raw, "slot", cardSlot == null ? null : cardSlot.name());
put(raw, "cardStatus", cardStatus == null ? null : cardStatus.name());
put(raw, "drivingStatus", drivingStatus == null ? null : drivingStatus.name());
raw.putAll(sourceSpecificPayload(rs));
return Map.of("raw", raw);
}

View File

@ -21,7 +21,6 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
@ -60,7 +59,7 @@ abstract class AbstractTachographBorderCrossingRowMapper implements ExtractionRo
position(rs),
detailsFactory.borderCrossing(string(rs, "country_from"), string(rs, "country_to")),
sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null,
detailsFactory.payloadFromMap(payload(rs)),
detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef)),
false,
context.packageInfo()
);
@ -112,9 +111,18 @@ abstract class AbstractTachographBorderCrossingRowMapper implements ExtractionRo
return latitude == null || longitude == null ? null : new GeoPointDto(latitude, longitude);
}
private Map<String, Object> payload(ResultSet rs) throws SQLException {
Map<String, Object> raw = new LinkedHashMap<>();
put(raw, "sourceRowId", string(rs, "source_row_id"));
private Map<String, Object> payload(
ResultSet rs,
ExtractionContext<TachographImportRequest> context,
DriverRefDto driverRef,
VehicleRefDto vehicleRef
) throws SQLException {
Map<String, Object> raw = TachographRawPayloadSupport.baseRawPayload(
rs, context, driverRef, vehicleRef, EventType.BORDER_OUTBOUND, EventLifecycle.OUTBOUND
);
put(raw, "supportEventType", EventType.BORDER_OUTBOUND.name());
put(raw, "countryFrom", string(rs, "country_from"));
put(raw, "countryTo", string(rs, "country_to"));
raw.putAll(sourceSpecificPayload(rs));
return Map.of("raw", raw);
}

View File

@ -21,7 +21,6 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
@ -44,6 +43,7 @@ abstract class AbstractTachographCardEventRowMapper implements ExtractionRowMapp
EventType eventType = eventType(rs);
EventLifecycle lifecycle = lifecycle(rs);
CardStatus cardStatus = cardStatus(lifecycle);
CardSlot cardSlot = cardSlot(rs);
String externalSourceEventId = string(rs, "external_source_event_id");
if (externalSourceEventId == null) {
@ -63,9 +63,9 @@ abstract class AbstractTachographCardEventRowMapper implements ExtractionRowMapp
lifecycle,
longValue(rs, "odometer_m"),
null,
detailsFactory.driverCard(cardSlot(rs), cardStatus, driverCard),
detailsFactory.driverCard(cardSlot, cardStatus, driverCard),
sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null,
detailsFactory.payloadFromMap(payload(rs)),
detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef, eventType, lifecycle, cardStatus, cardSlot)),
false,
context.packageInfo()
);
@ -111,9 +111,22 @@ abstract class AbstractTachographCardEventRowMapper implements ExtractionRowMapp
);
}
private Map<String, Object> payload(ResultSet rs) throws SQLException {
Map<String, Object> raw = new LinkedHashMap<>();
put(raw, "sourceRowId", string(rs, "source_row_id"));
private Map<String, Object> payload(
ResultSet rs,
ExtractionContext<TachographImportRequest> context,
DriverRefDto driverRef,
VehicleRefDto vehicleRef,
EventType eventType,
EventLifecycle lifecycle,
CardStatus cardStatus,
CardSlot cardSlot
) throws SQLException {
Map<String, Object> raw = TachographRawPayloadSupport.baseRawPayload(
rs, context, driverRef, vehicleRef, eventType, lifecycle
);
put(raw, "supportEventType", eventType == null ? null : eventType.name());
put(raw, "slot", cardSlot == null ? null : cardSlot.name());
put(raw, "cardStatus", cardStatus == null ? null : cardStatus.name());
raw.putAll(sourceSpecificPayload(rs));
return Map.of("raw", raw);
}

View File

@ -21,7 +21,6 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
@ -62,7 +61,7 @@ abstract class AbstractTachographLoadUnloadRowMapper implements ExtractionRowMap
position(rs),
detailsFactory.loadUnload(string(rs, "operation")),
sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null,
detailsFactory.payloadFromMap(payload(rs)),
detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef, eventType)),
false,
context.packageInfo()
);
@ -114,9 +113,18 @@ abstract class AbstractTachographLoadUnloadRowMapper implements ExtractionRowMap
return latitude == null || longitude == null ? null : new GeoPointDto(latitude, longitude);
}
private Map<String, Object> payload(ResultSet rs) throws SQLException {
Map<String, Object> raw = new LinkedHashMap<>();
put(raw, "sourceRowId", string(rs, "source_row_id"));
private Map<String, Object> payload(
ResultSet rs,
ExtractionContext<TachographImportRequest> context,
DriverRefDto driverRef,
VehicleRefDto vehicleRef,
EventType eventType
) throws SQLException {
Map<String, Object> raw = TachographRawPayloadSupport.baseRawPayload(
rs, context, driverRef, vehicleRef, eventType, EventLifecycle.SNAPSHOT
);
put(raw, "supportEventType", eventType == null ? null : eventType.name());
put(raw, "operation", string(rs, "operation"));
raw.putAll(sourceSpecificPayload(rs));
return Map.of("raw", raw);
}

View File

@ -21,7 +21,6 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
@ -63,7 +62,7 @@ abstract class AbstractTachographPlaceRowMapper implements ExtractionRowMapper<T
position(rs),
detailsFactory.place(string(rs, "country"), string(rs, "region")),
sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null,
detailsFactory.payloadFromMap(payload(rs)),
detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef, eventType, lifecycle)),
booleanValue(rs, "manual_entry"),
context.packageInfo()
);
@ -115,9 +114,20 @@ abstract class AbstractTachographPlaceRowMapper implements ExtractionRowMapper<T
return latitude == null || longitude == null ? null : new GeoPointDto(latitude, longitude);
}
private Map<String, Object> payload(ResultSet rs) throws SQLException {
Map<String, Object> raw = new LinkedHashMap<>();
put(raw, "sourceRowId", string(rs, "source_row_id"));
private Map<String, Object> payload(
ResultSet rs,
ExtractionContext<TachographImportRequest> context,
DriverRefDto driverRef,
VehicleRefDto vehicleRef,
EventType eventType,
EventLifecycle lifecycle
) throws SQLException {
Map<String, Object> raw = TachographRawPayloadSupport.baseRawPayload(
rs, context, driverRef, vehicleRef, eventType, lifecycle
);
put(raw, "supportEventType", eventType == null ? null : eventType.name());
put(raw, "country", string(rs, "country"));
put(raw, "region", string(rs, "region"));
raw.putAll(sourceSpecificPayload(rs));
return Map.of("raw", raw);
}

View File

@ -21,7 +21,6 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
@ -60,7 +59,7 @@ abstract class AbstractTachographPositionRowMapper implements ExtractionRowMappe
position(rs),
detailsFactory.position("GNSS_ACCUMULATED_DRIVING"),
sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null,
detailsFactory.payloadFromMap(payload(rs)),
detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef)),
false,
context.packageInfo()
);
@ -112,9 +111,16 @@ abstract class AbstractTachographPositionRowMapper implements ExtractionRowMappe
return latitude == null || longitude == null ? null : new GeoPointDto(latitude, longitude);
}
private Map<String, Object> payload(ResultSet rs) throws SQLException {
Map<String, Object> raw = new LinkedHashMap<>();
put(raw, "sourceRowId", string(rs, "source_row_id"));
private Map<String, Object> payload(
ResultSet rs,
ExtractionContext<TachographImportRequest> context,
DriverRefDto driverRef,
VehicleRefDto vehicleRef
) throws SQLException {
Map<String, Object> raw = TachographRawPayloadSupport.baseRawPayload(
rs, context, driverRef, vehicleRef, EventType.POSITION_RECORDED, EventLifecycle.SNAPSHOT
);
put(raw, "supportEventType", EventType.POSITION_RECORDED.name());
raw.putAll(sourceSpecificPayload(rs));
return Map.of("raw", raw);
}

View File

@ -19,7 +19,6 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
@ -62,7 +61,7 @@ abstract class AbstractTachographSpecificConditionRowMapper implements Extractio
null,
detailsFactory.specificCondition(),
sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null,
detailsFactory.payloadFromMap(payload(rs)),
detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef, eventType, lifecycle)),
false,
context.packageInfo()
);
@ -108,9 +107,18 @@ abstract class AbstractTachographSpecificConditionRowMapper implements Extractio
);
}
private Map<String, Object> payload(ResultSet rs) throws SQLException {
Map<String, Object> raw = new LinkedHashMap<>();
put(raw, "sourceRowId", string(rs, "source_row_id"));
private Map<String, Object> payload(
ResultSet rs,
ExtractionContext<TachographImportRequest> context,
DriverRefDto driverRef,
VehicleRefDto vehicleRef,
EventType eventType,
EventLifecycle lifecycle
) throws SQLException {
Map<String, Object> raw = TachographRawPayloadSupport.baseRawPayload(
rs, context, driverRef, vehicleRef, eventType, lifecycle
);
put(raw, "supportEventType", eventType == null ? null : eventType.name());
raw.putAll(sourceSpecificPayload(rs));
return Map.of("raw", raw);
}

View File

@ -20,7 +20,6 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
@ -63,7 +62,7 @@ public class SpeedingEventRowMapper implements ExtractionRowMapper<TachographImp
null,
detailsFactory.speeding(decimal(rs, "avg_speed_kmh"), decimal(rs, "max_speed_kmh"), null),
sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null,
detailsFactory.payloadFromMap(payload(rs)),
detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef, lifecycle)),
false,
context.packageInfo()
);
@ -105,9 +104,19 @@ public class SpeedingEventRowMapper implements ExtractionRowMapper<TachographImp
);
}
private Map<String, Object> payload(ResultSet rs) throws SQLException {
Map<String, Object> raw = new LinkedHashMap<>();
put(raw, "sourceRowId", string(rs, "source_row_id"));
private Map<String, Object> payload(
ResultSet rs,
ExtractionContext<TachographImportRequest> context,
DriverRefDto driverRef,
VehicleRefDto vehicleRef,
EventLifecycle lifecycle
) throws SQLException {
Map<String, Object> raw = TachographRawPayloadSupport.baseRawPayload(
rs, context, driverRef, vehicleRef, EventType.SPEEDING, lifecycle
);
put(raw, "supportEventType", EventType.SPEEDING.name());
put(raw, "avgSpeedKmh", decimal(rs, "avg_speed_kmh"));
put(raw, "maxSpeedKmh", decimal(rs, "max_speed_kmh"));
return Map.of("raw", raw);
}

View File

@ -0,0 +1,148 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.importing.extraction.ExtractionContext;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
final class TachographRawPayloadSupport {
private TachographRawPayloadSupport() {
}
static Map<String, Object> baseRawPayload(
ResultSet rs,
ExtractionContext<TachographImportRequest> context,
DriverRefDto driverRef,
VehicleRefDto vehicleRef,
EventType eventType,
EventLifecycle lifecycle
) throws SQLException {
Map<String, Object> raw = new LinkedHashMap<>();
String sourceRowId = string(rs, "source_row_id");
put(raw, "sourceRowId", sourceRowId);
if (sourceRowId != null) {
put(raw, "sourceRowIds", List.of(sourceRowId));
put(raw, "intervalId", intervalId(context, sourceRowId));
}
put(raw, "sourceKind", context == null || context.planItem() == null ? null : context.planItem().sourceKind());
put(raw, "extractionCode", context == null || context.planItem() == null ? null : context.planItem().extractionCode());
put(raw, "level", "RAW_INTERVAL");
put(raw, "eventType", eventType == null ? null : eventType.name());
put(raw, "lifecycle", lifecycle == null ? null : lifecycle.name());
enrichDriver(raw, driverRef);
enrichVehicle(raw, vehicleRef);
enrichSourcePackage(raw, rs);
enrichOdometer(raw, rs);
return raw;
}
static void put(Map<String, Object> target, String key, Object value) {
if (value != null) {
target.put(key, value instanceof Enum<?> enumValue ? enumValue.name() : value);
}
}
private static String intervalId(ExtractionContext<TachographImportRequest> context, String sourceRowId) {
String extractionCode = context == null || context.planItem() == null ? null : context.planItem().extractionCode();
return "TACHOGRAPH:" + (extractionCode == null || extractionCode.isBlank() ? "UNKNOWN" : extractionCode) + ":" + sourceRowId;
}
private static void enrichDriver(Map<String, Object> raw, DriverRefDto driverRef) {
if (driverRef == null || !driverRef.hasAnyReference()) {
return;
}
put(raw, "driverKey", driverRef.stableKey());
put(raw, "driverSourceEntityId", driverRef.sourceEntityId());
DriverCardRefDto driverCard = driverRef.driverCard();
if (driverCard != null && driverCard.hasValue()) {
put(raw, "driverCardKey", driverCard.stableKey());
put(raw, "driverCardNation", driverCard.nation());
put(raw, "driverCardNationNumericCode", driverCard.nationNumericCode());
put(raw, "driverCardNumber", driverCard.number());
}
}
private static void enrichVehicle(Map<String, Object> raw, VehicleRefDto vehicleRef) {
if (vehicleRef == null || !vehicleRef.hasAnyReference()) {
return;
}
put(raw, "vehicleKey", vehicleRef.stableKey());
put(raw, "vehicleSourceEntityId", vehicleRef.sourceVehicleEntityId());
put(raw, "vehicleVin", vehicleRef.vin());
put(raw, "vehicleRegistrationSourceEntityId", vehicleRef.sourceRegistrationEntityId());
VehicleRegistrationRefDto registration = vehicleRef.vehicleRegistration();
if (registration != null && registration.hasValue()) {
put(raw, "registrationKey", registration.stableKey());
put(raw, "vehicleRegistrationNation", registration.nation());
put(raw, "vehicleRegistrationNationNumericCode", registration.nationNumericCode());
put(raw, "vehicleRegistrationNumber", registration.number());
}
}
private static void enrichSourcePackage(Map<String, Object> raw, ResultSet rs) throws SQLException {
put(raw, "sourcePackageKind", string(rs, "source_package_kind"));
put(raw, "sourcePackageId", string(rs, "source_package_id"));
put(raw, "sourcePackageEntityId", string(rs, "source_package_entity_id"));
}
private static void enrichOdometer(Map<String, Object> raw, ResultSet rs) throws SQLException {
Long odometerM = longValue(rs, "odometer_m");
put(raw, "odometerM", odometerM);
if (odometerM != null) {
put(raw, "odometerKm", odometerM / 1_000L);
}
}
private static String string(ResultSet rs, String column) throws SQLException {
try {
String value = rs.getString(column);
return value == null || value.isBlank() ? null : value.trim();
} catch (SQLException ex) {
if (missingColumn(ex)) {
return null;
}
throw ex;
}
}
private static Long longValue(ResultSet rs, String column) throws SQLException {
Object value;
try {
value = rs.getObject(column);
} catch (SQLException ex) {
if (missingColumn(ex)) {
return null;
}
throw ex;
}
if (value == null) {
return null;
}
if (value instanceof Number number) {
return number.longValue();
}
return Long.parseLong(value.toString());
}
private static boolean missingColumn(SQLException ex) {
String state = ex.getSQLState();
String message = ex.getMessage();
return "S0022".equals(state)
|| "42703".equals(state)
|| (message != null && message.toLowerCase(java.util.Locale.ROOT).contains("column")
&& (message.toLowerCase(java.util.Locale.ROOT).contains("not found")
|| message.toLowerCase(java.util.Locale.ROOT).contains("not valid")
|| message.toLowerCase(java.util.Locale.ROOT).contains("invalid")));
}
}

View File

@ -1,52 +0,0 @@
create extension if not exists timescaledb;
create table if not exists eventhub.event_source_record (
source_record_key_hash text primary key,
event_occurred_at timestamptz not null,
event_id uuid not null,
created_at timestamptz not null default now()
);
insert into eventhub.event_source_record(source_record_key_hash, event_occurred_at, event_id)
select event.source_record_key_hash, event.occurred_at, event.id
from eventhub.event
on conflict (source_record_key_hash) do nothing;
alter table eventhub.event_detail
drop constraint if exists fk_event_detail_event;
alter table eventhub.event_source_record
drop constraint if exists fk_event_source_record_event;
drop index if exists eventhub.ux_event_source_record;
do $$
begin
if not exists (
select 1
from timescaledb_information.hypertables
where hypertable_schema = 'eventhub'
and hypertable_name = 'event'
) then
perform create_hypertable(
'eventhub.event',
'occurred_at',
chunk_time_interval => interval '7 days',
migrate_data => true
);
end if;
end $$;
alter table eventhub.event_detail
add constraint fk_event_detail_event foreign key (event_occurred_at, event_id)
references eventhub.event(occurred_at, id)
on delete cascade;
alter table eventhub.event_source_record
add constraint fk_event_source_record_event foreign key (event_occurred_at, event_id)
references eventhub.event(occurred_at, id)
on delete cascade
deferrable initially deferred;
create index if not exists idx_event_source_record_event
on eventhub.event_source_record(event_occurred_at, event_id);

View File

@ -0,0 +1,45 @@
package at.procon.eventhub;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class MigrationScriptNamingTest {
private static final Pattern VERSION_PATTERN = Pattern.compile("V([0-9]+)__.*\\.sql");
@Test
void versionedFlywayMigrationsUseUniqueVersionNumbers() throws IOException {
Path migrationDirectory = Path.of("src/main/resources/db/migration");
Map<String, List<String>> namesByVersion = new TreeMap<>();
try (var stream = Files.list(migrationDirectory)) {
stream.filter(path -> path.getFileName().toString().endsWith(".sql"))
.map(path -> path.getFileName().toString())
.forEach(fileName -> {
String version = version(fileName);
if (version != null) {
namesByVersion.computeIfAbsent(version, ignored -> new ArrayList<>()).add(fileName);
}
});
}
assertThat(namesByVersion)
.allSatisfy((version, names) -> assertThat(names)
.as("Flyway version V%s must be unique", version)
.hasSize(1));
}
private String version(String fileName) {
var matcher = VERSION_PATTERN.matcher(fileName);
return matcher.matches() ? matcher.group(1) : null;
}
}

View File

@ -0,0 +1,36 @@
package at.procon.eventhub.persistence;
import static org.assertj.core.api.Assertions.assertThat;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
class EventHubEventReadRepositoryTest {
private final ObjectMapper objectMapper = new ObjectMapper().findAndRegisterModules();
@Test
void parsesRegularJsonObjectColumn() {
JsonNode node = EventHubEventReadRepository.parseJsonColumn(
objectMapper,
"{\"raw\":{\"driverKey\":\"12:123\"},\"details\":{\"k\":1}}"
);
assertThat(node.isObject()).isTrue();
assertThat(node.get("raw").get("driverKey").asText()).isEqualTo("12:123");
assertThat(node.get("details").get("k").asInt()).isEqualTo(1);
}
@Test
void unwrapsDoubleEncodedJsonObjectColumn() {
JsonNode node = EventHubEventReadRepository.parseJsonColumn(
objectMapper,
"\"{\\\"raw\\\":{\\\"driverKey\\\":\\\"12:123\\\"},\\\"details\\\":{\\\"k\\\":1}}\""
);
assertThat(node.isObject()).isTrue();
assertThat(node.get("raw").get("driverKey").asText()).isEqualTo("12:123");
assertThat(node.get("details").get("k").asInt()).isEqualTo(1);
}
}

View File

@ -0,0 +1,153 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.extraction.ExtractionContext;
import at.procon.eventhub.processing.service.UnifiedEventTimelineReconstructor;
import at.procon.eventhub.service.EventDetailsFactory;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class TachographDbRowMapperTimelineMetadataTest {
private final EventDetailsFactory detailsFactory = new EventDetailsFactory(new ObjectMapper());
private final UnifiedEventTimelineReconstructor reconstructor = new UnifiedEventTimelineReconstructor();
@Test
void activityRowsExposeTimelineMetadataForReconstruction() throws SQLException {
CardActivityRowMapper mapper = new CardActivityRowMapper(detailsFactory);
ExtractionContext<TachographImportRequest> context = context(EventFamily.DRIVER_ACTIVITY, "DRIVER_CARD", "CARD_ACTIVITY");
EventHubEventDto started = mapper.map(activityRow("START", "2026-05-22T06:00:00Z"), 0, context);
EventHubEventDto ended = mapper.map(activityRow("END", "2026-05-22T07:15:00Z"), 1, context);
JsonNode raw = started.payload().get("raw");
assertThat(raw.get("intervalId").asText()).isEqualTo("TACHOGRAPH:CARD_ACTIVITY:CA-1");
assertThat(raw.get("sourceKind").asText()).isEqualTo("DRIVER_CARD");
assertThat(raw.get("extractionCode").asText()).isEqualTo("CARD_ACTIVITY");
assertThat(raw.get("sourceRowIds").get(0).asText()).isEqualTo("CA-1");
assertThat(raw.get("driverKey").asText()).contains("driver-1");
assertThat(raw.get("vehicleKey").asText()).contains("VIN123");
assertThat(raw.get("registrationKey").asText()).contains("W-12345");
var timeline = reconstructor.reconstruct(UUID.randomUUID(), "driver-1", List.of(started, ended));
assertThat(timeline.activityIntervals()).hasSize(1);
var interval = timeline.activityIntervals().getFirst();
assertThat(interval.intervalId()).isEqualTo("TACHOGRAPH:CARD_ACTIVITY:CA-1");
assertThat(interval.activityType()).isEqualTo("DRIVE");
assertThat(interval.sourceKind()).isEqualTo("DRIVER_CARD");
assertThat(interval.sourceIntervalIds()).containsExactly("CA-1");
assertThat(interval.registrationKey()).contains("W-12345");
assertThat(interval.vehicleKey()).contains("VIN123");
assertThat(interval.durationSeconds()).isEqualTo(4_500L);
}
@Test
void cardUsageRowsExposeTimelineMetadataForVehicleUsageReconstruction() throws SQLException {
CardVehiclesUsedCardEventRowMapper mapper = new CardVehiclesUsedCardEventRowMapper(detailsFactory);
ExtractionContext<TachographImportRequest> context = context(EventFamily.DRIVER_CARD, "DRIVER_CARD", "CARD_VEHICLES_USED");
EventHubEventDto inserted = mapper.map(cardUsageRow("CARD_INSERTED", "INSERT", "2026-05-22T06:00:00Z", 120_000L), 0, context);
EventHubEventDto withdrawn = mapper.map(cardUsageRow("CARD_WITHDRAWN", "WITHDRAW", "2026-05-22T18:00:00Z", 120_550L), 1, context);
JsonNode raw = inserted.payload().get("raw");
assertThat(raw.get("intervalId").asText()).isEqualTo("TACHOGRAPH:CARD_VEHICLES_USED:CVU-1");
assertThat(raw.get("sourceRowIds").get(0).asText()).isEqualTo("CVU-1");
assertThat(raw.get("registrationKey").asText()).contains("W-12345");
var timeline = reconstructor.reconstruct(UUID.randomUUID(), "driver-1", List.of(inserted, withdrawn));
assertThat(timeline.vehicleUsageIntervals()).hasSize(1);
var usage = timeline.vehicleUsageIntervals().getFirst();
assertThat(usage.intervalId()).isEqualTo("TACHOGRAPH:CARD_VEHICLES_USED:CVU-1");
assertThat(usage.sourceKind()).isEqualTo("DRIVER_CARD");
assertThat(usage.sourceIntervalIds()).containsExactly("CVU-1");
assertThat(usage.registrationKey()).contains("W-12345");
assertThat(usage.vehicleKey()).contains("VIN123");
assertThat(usage.odometerBeginKm()).isEqualTo(120L);
assertThat(usage.odometerEndKm()).isEqualTo(120L);
assertThat(usage.durationSeconds()).isEqualTo(43_200L);
}
private ResultSet activityRow(String lifecycle, String occurredAt) throws SQLException {
ResultSet rs = commonRow("CA-1", occurredAt, 120_000L);
when(rs.getString("external_source_event_id")).thenReturn("CARD_ACTIVITY:CA-1:" + lifecycle);
when(rs.getString("event_type")).thenReturn("DRIVE");
when(rs.getString("lifecycle")).thenReturn(lifecycle);
when(rs.getObject("activity_code")).thenReturn(3);
when(rs.getObject("card_slot")).thenReturn(0);
when(rs.getObject("card_status")).thenReturn(0);
when(rs.getObject("driving_status")).thenReturn(0);
return rs;
}
private ResultSet cardUsageRow(String eventType, String lifecycle, String occurredAt, Long odometerM) throws SQLException {
ResultSet rs = commonRow("CVU-1", occurredAt, odometerM);
when(rs.getString("external_source_event_id")).thenReturn("CARD_VEHICLES_USED:CVU-1:" + lifecycle);
when(rs.getString("event_type")).thenReturn(eventType);
when(rs.getString("lifecycle")).thenReturn(lifecycle);
when(rs.getObject("card_slot")).thenReturn(0);
return rs;
}
private ResultSet commonRow(String sourceRowId, String occurredAt, Long odometerM) throws SQLException {
ResultSet rs = mock(ResultSet.class);
when(rs.getString("source_row_id")).thenReturn(sourceRowId);
when(rs.getObject("occurred_at")).thenReturn(OffsetDateTime.parse(occurredAt));
when(rs.getObject("received_partner_at")).thenReturn(null);
when(rs.getObject("odometer_m")).thenReturn(odometerM);
when(rs.getString("driver_source_entity_id")).thenReturn("driver-1");
when(rs.getString("driver_card_nation")).thenReturn("A");
when(rs.getString("driver_card_number")).thenReturn("12345678901234");
when(rs.getString("vehicle_source_entity_id")).thenReturn("vehicle-1");
when(rs.getString("vehicle_vin")).thenReturn("VIN123");
when(rs.getString("vehicle_registration_source_entity_id")).thenReturn("reg-1");
when(rs.getString("vehicle_registration_nation")).thenReturn("A");
when(rs.getString("vehicle_registration_number")).thenReturn("W-12345");
when(rs.getString("source_package_kind")).thenReturn("DRIVER_CARD");
when(rs.getString("source_package_id")).thenReturn("package-1");
when(rs.getString("source_package_entity_id")).thenReturn("card-1");
when(rs.getObject("source_package_period_from")).thenReturn(null);
when(rs.getObject("source_package_period_to")).thenReturn(null);
when(rs.getObject("source_package_imported_at")).thenReturn(null);
return rs;
}
private ExtractionContext<TachographImportRequest> context(EventFamily family, String sourceKind, String extractionCode) {
return new ExtractionContext<>(
UUID.randomUUID(),
UUID.randomUUID(),
1,
null,
new ImportPlanItemDto(
family,
sourceKind,
extractionCode,
List.of("source-table"),
"DRIVER",
"test extraction",
AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP
),
new ImportTimeChunkDto(0, null, null),
new EventSourceDto("TACHOGRAPH", "MIXED", "TACHOGRAPH_DB", "test", null, null),
null
);
}
}