From 8a75db58fd6222b64f4844d82d96ffddf3dac707 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 25 May 2026 15:58:04 +0200 Subject: [PATCH] Add tachograph raw payload metadata support --- docker-compose.yml | 5 +- .../eventhub/config/WebMvcJsonConfig.java | 24 +++ .../AbstractTachographActivityRowMapper.java | 30 +++- ...ractTachographBorderCrossingRowMapper.java | 18 ++- .../AbstractTachographCardEventRowMapper.java | 25 ++- ...AbstractTachographLoadUnloadRowMapper.java | 18 ++- .../AbstractTachographPlaceRowMapper.java | 20 ++- .../AbstractTachographPositionRowMapper.java | 16 +- ...tTachographSpecificConditionRowMapper.java | 18 ++- .../service/SpeedingEventRowMapper.java | 19 ++- .../service/TachographRawPayloadSupport.java | 148 +++++++++++++++++ ... V10__introduce_driver_identity_model.sql} | 0 .../V11__ensure_event_source_record.sql | 52 ------ ...l => V11__introduce_driver_card_model.sql} | 0 ...ble.sql => V12__make_event_hypertable.sql} | 0 ... V13__add_yellowfox_d8_detail_indexes.sql} | 0 ...dd_nation_reference_and_numeric_codes.sql} | 0 .../eventhub/MigrationScriptNamingTest.java | 45 ++++++ .../EventHubEventReadRepositoryTest.java | 36 +++++ ...ographDbRowMapperTimelineMetadataTest.java | 153 ++++++++++++++++++ 20 files changed, 531 insertions(+), 96 deletions(-) create mode 100644 src/main/java/at/procon/eventhub/config/WebMvcJsonConfig.java create mode 100644 src/main/java/at/procon/eventhub/tachograph/service/TachographRawPayloadSupport.java rename src/main/resources/db/migration/{V9__introduce_driver_identity_model.sql => V10__introduce_driver_identity_model.sql} (100%) delete mode 100644 src/main/resources/db/migration/V11__ensure_event_source_record.sql rename src/main/resources/db/migration/{V10__introduce_driver_card_model.sql => V11__introduce_driver_card_model.sql} (100%) rename src/main/resources/db/migration/{V10__make_event_hypertable.sql => V12__make_event_hypertable.sql} (100%) rename src/main/resources/db/migration/{V12__add_yellowfox_d8_detail_indexes.sql => V13__add_yellowfox_d8_detail_indexes.sql} (100%) rename src/main/resources/db/migration/{V13__add_nation_reference_and_numeric_codes.sql => V14__add_nation_reference_and_numeric_codes.sql} (100%) create mode 100644 src/test/java/at/procon/eventhub/MigrationScriptNamingTest.java create mode 100644 src/test/java/at/procon/eventhub/persistence/EventHubEventReadRepositoryTest.java create mode 100644 src/test/java/at/procon/eventhub/tachograph/service/TachographDbRowMapperTimelineMetadataTest.java diff --git a/docker-compose.yml b/docker-compose.yml index 27ab309..2c8cd11 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,9 @@ services: postgres: - image: postgres:16 + # The Flyway migrations enable both TimescaleDB and PostGIS. The TimescaleDB HA + # image is used for local development because it includes these PostgreSQL + # extensions, unlike the plain postgres image. + image: timescale/timescaledb-ha:pg16 container_name: eventhub-postgres environment: POSTGRES_DB: eventhub diff --git a/src/main/java/at/procon/eventhub/config/WebMvcJsonConfig.java b/src/main/java/at/procon/eventhub/config/WebMvcJsonConfig.java new file mode 100644 index 0000000..a2d5f12 --- /dev/null +++ b/src/main/java/at/procon/eventhub/config/WebMvcJsonConfig.java @@ -0,0 +1,24 @@ +package at.procon.eventhub.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.converter.HttpMessageConverter; +import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +@Configuration +public class WebMvcJsonConfig implements WebMvcConfigurer { + + private final ObjectMapper objectMapper; + + public WebMvcJsonConfig(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public void extendMessageConverters(List> converters) { + converters.removeIf(MappingJackson2HttpMessageConverter.class::isInstance); + converters.add(0, new MappingJackson2HttpMessageConverter(objectMapper)); + } +} diff --git a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java index 6e2f14c..79c3831 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographActivityRowMapper.java @@ -22,7 +22,6 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; import java.util.UUID; @@ -44,6 +43,8 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe CardSlot cardSlot = cardSlot(rs); CardStatus cardStatus = cardStatus(rs); DrivingStatus drivingStatus = drivingStatus(rs); + EventType eventType = eventType(rs); + EventLifecycle lifecycle = lifecycle(rs); String externalSourceEventId = string(rs, "external_source_event_id"); if (externalSourceEventId == null) { @@ -59,13 +60,13 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe offsetDateTime(rs, "received_partner_at"), OffsetDateTime.now(), EventDomain.DRIVER_ACTIVITY, - eventType(rs), - lifecycle(rs), + eventType, + lifecycle, longValue(rs, "odometer_m"), null, detailsFactory.driverActivity(cardSlot, cardStatus, drivingStatus), sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null, - detailsFactory.payloadFromMap(payload(rs, context)), + detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef, eventType, lifecycle, cardSlot, cardStatus, drivingStatus)), isManualEntry(cardStatus, drivingStatus), context.packageInfo() ); @@ -115,9 +116,24 @@ abstract class AbstractTachographActivityRowMapper implements ExtractionRowMappe ); } - private Map payload(ResultSet rs, ExtractionContext context) throws SQLException { - Map raw = new LinkedHashMap<>(); - put(raw, "sourceRowId", string(rs, "source_row_id")); + private Map payload( + ResultSet rs, + ExtractionContext context, + DriverRefDto driverRef, + VehicleRefDto vehicleRef, + EventType eventType, + EventLifecycle lifecycle, + CardSlot cardSlot, + CardStatus cardStatus, + DrivingStatus drivingStatus + ) throws SQLException { + Map raw = TachographRawPayloadSupport.baseRawPayload( + rs, context, driverRef, vehicleRef, eventType, lifecycle + ); + put(raw, "activityType", eventType == null ? null : eventType.name()); + put(raw, "slot", cardSlot == null ? null : cardSlot.name()); + put(raw, "cardStatus", cardStatus == null ? null : cardStatus.name()); + put(raw, "drivingStatus", drivingStatus == null ? null : drivingStatus.name()); raw.putAll(sourceSpecificPayload(rs)); return Map.of("raw", raw); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographBorderCrossingRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographBorderCrossingRowMapper.java index daed7ed..46f17b7 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographBorderCrossingRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographBorderCrossingRowMapper.java @@ -21,7 +21,6 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -60,7 +59,7 @@ abstract class AbstractTachographBorderCrossingRowMapper implements ExtractionRo position(rs), detailsFactory.borderCrossing(string(rs, "country_from"), string(rs, "country_to")), sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null, - detailsFactory.payloadFromMap(payload(rs)), + detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef)), false, context.packageInfo() ); @@ -112,9 +111,18 @@ abstract class AbstractTachographBorderCrossingRowMapper implements ExtractionRo return latitude == null || longitude == null ? null : new GeoPointDto(latitude, longitude); } - private Map payload(ResultSet rs) throws SQLException { - Map raw = new LinkedHashMap<>(); - put(raw, "sourceRowId", string(rs, "source_row_id")); + private Map payload( + ResultSet rs, + ExtractionContext context, + DriverRefDto driverRef, + VehicleRefDto vehicleRef + ) throws SQLException { + Map raw = TachographRawPayloadSupport.baseRawPayload( + rs, context, driverRef, vehicleRef, EventType.BORDER_OUTBOUND, EventLifecycle.OUTBOUND + ); + put(raw, "supportEventType", EventType.BORDER_OUTBOUND.name()); + put(raw, "countryFrom", string(rs, "country_from")); + put(raw, "countryTo", string(rs, "country_to")); raw.putAll(sourceSpecificPayload(rs)); return Map.of("raw", raw); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographCardEventRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographCardEventRowMapper.java index 53ae000..257ff20 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographCardEventRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographCardEventRowMapper.java @@ -21,7 +21,6 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; import java.util.UUID; @@ -44,6 +43,7 @@ abstract class AbstractTachographCardEventRowMapper implements ExtractionRowMapp EventType eventType = eventType(rs); EventLifecycle lifecycle = lifecycle(rs); CardStatus cardStatus = cardStatus(lifecycle); + CardSlot cardSlot = cardSlot(rs); String externalSourceEventId = string(rs, "external_source_event_id"); if (externalSourceEventId == null) { @@ -63,9 +63,9 @@ abstract class AbstractTachographCardEventRowMapper implements ExtractionRowMapp lifecycle, longValue(rs, "odometer_m"), null, - detailsFactory.driverCard(cardSlot(rs), cardStatus, driverCard), + detailsFactory.driverCard(cardSlot, cardStatus, driverCard), sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null, - detailsFactory.payloadFromMap(payload(rs)), + detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef, eventType, lifecycle, cardStatus, cardSlot)), false, context.packageInfo() ); @@ -111,9 +111,22 @@ abstract class AbstractTachographCardEventRowMapper implements ExtractionRowMapp ); } - private Map payload(ResultSet rs) throws SQLException { - Map raw = new LinkedHashMap<>(); - put(raw, "sourceRowId", string(rs, "source_row_id")); + private Map payload( + ResultSet rs, + ExtractionContext context, + DriverRefDto driverRef, + VehicleRefDto vehicleRef, + EventType eventType, + EventLifecycle lifecycle, + CardStatus cardStatus, + CardSlot cardSlot + ) throws SQLException { + Map raw = TachographRawPayloadSupport.baseRawPayload( + rs, context, driverRef, vehicleRef, eventType, lifecycle + ); + put(raw, "supportEventType", eventType == null ? null : eventType.name()); + put(raw, "slot", cardSlot == null ? null : cardSlot.name()); + put(raw, "cardStatus", cardStatus == null ? null : cardStatus.name()); raw.putAll(sourceSpecificPayload(rs)); return Map.of("raw", raw); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographLoadUnloadRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographLoadUnloadRowMapper.java index 85c9120..ed836f1 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographLoadUnloadRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographLoadUnloadRowMapper.java @@ -21,7 +21,6 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; import java.util.UUID; @@ -62,7 +61,7 @@ abstract class AbstractTachographLoadUnloadRowMapper implements ExtractionRowMap position(rs), detailsFactory.loadUnload(string(rs, "operation")), sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null, - detailsFactory.payloadFromMap(payload(rs)), + detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef, eventType)), false, context.packageInfo() ); @@ -114,9 +113,18 @@ abstract class AbstractTachographLoadUnloadRowMapper implements ExtractionRowMap return latitude == null || longitude == null ? null : new GeoPointDto(latitude, longitude); } - private Map payload(ResultSet rs) throws SQLException { - Map raw = new LinkedHashMap<>(); - put(raw, "sourceRowId", string(rs, "source_row_id")); + private Map payload( + ResultSet rs, + ExtractionContext context, + DriverRefDto driverRef, + VehicleRefDto vehicleRef, + EventType eventType + ) throws SQLException { + Map raw = TachographRawPayloadSupport.baseRawPayload( + rs, context, driverRef, vehicleRef, eventType, EventLifecycle.SNAPSHOT + ); + put(raw, "supportEventType", eventType == null ? null : eventType.name()); + put(raw, "operation", string(rs, "operation")); raw.putAll(sourceSpecificPayload(rs)); return Map.of("raw", raw); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographPlaceRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographPlaceRowMapper.java index 015e0f4..343ba6e 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographPlaceRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographPlaceRowMapper.java @@ -21,7 +21,6 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; import java.util.UUID; @@ -63,7 +62,7 @@ abstract class AbstractTachographPlaceRowMapper implements ExtractionRowMapper payload(ResultSet rs) throws SQLException { - Map raw = new LinkedHashMap<>(); - put(raw, "sourceRowId", string(rs, "source_row_id")); + private Map payload( + ResultSet rs, + ExtractionContext context, + DriverRefDto driverRef, + VehicleRefDto vehicleRef, + EventType eventType, + EventLifecycle lifecycle + ) throws SQLException { + Map raw = TachographRawPayloadSupport.baseRawPayload( + rs, context, driverRef, vehicleRef, eventType, lifecycle + ); + put(raw, "supportEventType", eventType == null ? null : eventType.name()); + put(raw, "country", string(rs, "country")); + put(raw, "region", string(rs, "region")); raw.putAll(sourceSpecificPayload(rs)); return Map.of("raw", raw); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographPositionRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographPositionRowMapper.java index e16d0c2..1d27929 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographPositionRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographPositionRowMapper.java @@ -21,7 +21,6 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -60,7 +59,7 @@ abstract class AbstractTachographPositionRowMapper implements ExtractionRowMappe position(rs), detailsFactory.position("GNSS_ACCUMULATED_DRIVING"), sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null, - detailsFactory.payloadFromMap(payload(rs)), + detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef)), false, context.packageInfo() ); @@ -112,9 +111,16 @@ abstract class AbstractTachographPositionRowMapper implements ExtractionRowMappe return latitude == null || longitude == null ? null : new GeoPointDto(latitude, longitude); } - private Map payload(ResultSet rs) throws SQLException { - Map raw = new LinkedHashMap<>(); - put(raw, "sourceRowId", string(rs, "source_row_id")); + private Map payload( + ResultSet rs, + ExtractionContext context, + DriverRefDto driverRef, + VehicleRefDto vehicleRef + ) throws SQLException { + Map raw = TachographRawPayloadSupport.baseRawPayload( + rs, context, driverRef, vehicleRef, EventType.POSITION_RECORDED, EventLifecycle.SNAPSHOT + ); + put(raw, "supportEventType", EventType.POSITION_RECORDED.name()); raw.putAll(sourceSpecificPayload(rs)); return Map.of("raw", raw); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographSpecificConditionRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographSpecificConditionRowMapper.java index de9a3b3..054ae83 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographSpecificConditionRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/AbstractTachographSpecificConditionRowMapper.java @@ -19,7 +19,6 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; import java.util.UUID; @@ -62,7 +61,7 @@ abstract class AbstractTachographSpecificConditionRowMapper implements Extractio null, detailsFactory.specificCondition(), sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null, - detailsFactory.payloadFromMap(payload(rs)), + detailsFactory.payloadFromMap(payload(rs, context, driverRef, vehicleRef, eventType, lifecycle)), false, context.packageInfo() ); @@ -108,9 +107,18 @@ abstract class AbstractTachographSpecificConditionRowMapper implements Extractio ); } - private Map payload(ResultSet rs) throws SQLException { - Map raw = new LinkedHashMap<>(); - put(raw, "sourceRowId", string(rs, "source_row_id")); + private Map payload( + ResultSet rs, + ExtractionContext context, + DriverRefDto driverRef, + VehicleRefDto vehicleRef, + EventType eventType, + EventLifecycle lifecycle + ) throws SQLException { + Map raw = TachographRawPayloadSupport.baseRawPayload( + rs, context, driverRef, vehicleRef, eventType, lifecycle + ); + put(raw, "supportEventType", eventType == null ? null : eventType.name()); raw.putAll(sourceSpecificPayload(rs)); return Map.of("raw", raw); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/SpeedingEventRowMapper.java b/src/main/java/at/procon/eventhub/tachograph/service/SpeedingEventRowMapper.java index 94f7a6a..0cb8d02 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/SpeedingEventRowMapper.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/SpeedingEventRowMapper.java @@ -20,7 +20,6 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; import java.util.UUID; @@ -63,7 +62,7 @@ public class SpeedingEventRowMapper implements ExtractionRowMapper payload(ResultSet rs) throws SQLException { - Map raw = new LinkedHashMap<>(); - put(raw, "sourceRowId", string(rs, "source_row_id")); + private Map payload( + ResultSet rs, + ExtractionContext context, + DriverRefDto driverRef, + VehicleRefDto vehicleRef, + EventLifecycle lifecycle + ) throws SQLException { + Map raw = TachographRawPayloadSupport.baseRawPayload( + rs, context, driverRef, vehicleRef, EventType.SPEEDING, lifecycle + ); + put(raw, "supportEventType", EventType.SPEEDING.name()); + put(raw, "avgSpeedKmh", decimal(rs, "avg_speed_kmh")); + put(raw, "maxSpeedKmh", decimal(rs, "max_speed_kmh")); return Map.of("raw", raw); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographRawPayloadSupport.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographRawPayloadSupport.java new file mode 100644 index 0000000..5f75550 --- /dev/null +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographRawPayloadSupport.java @@ -0,0 +1,148 @@ +package at.procon.eventhub.tachograph.service; + +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import at.procon.eventhub.importing.extraction.ExtractionContext; +import at.procon.eventhub.tachograph.dto.TachographImportRequest; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +final class TachographRawPayloadSupport { + + private TachographRawPayloadSupport() { + } + + static Map baseRawPayload( + ResultSet rs, + ExtractionContext context, + DriverRefDto driverRef, + VehicleRefDto vehicleRef, + EventType eventType, + EventLifecycle lifecycle + ) throws SQLException { + Map raw = new LinkedHashMap<>(); + String sourceRowId = string(rs, "source_row_id"); + put(raw, "sourceRowId", sourceRowId); + if (sourceRowId != null) { + put(raw, "sourceRowIds", List.of(sourceRowId)); + put(raw, "intervalId", intervalId(context, sourceRowId)); + } + put(raw, "sourceKind", context == null || context.planItem() == null ? null : context.planItem().sourceKind()); + put(raw, "extractionCode", context == null || context.planItem() == null ? null : context.planItem().extractionCode()); + put(raw, "level", "RAW_INTERVAL"); + put(raw, "eventType", eventType == null ? null : eventType.name()); + put(raw, "lifecycle", lifecycle == null ? null : lifecycle.name()); + + enrichDriver(raw, driverRef); + enrichVehicle(raw, vehicleRef); + enrichSourcePackage(raw, rs); + enrichOdometer(raw, rs); + return raw; + } + + static void put(Map target, String key, Object value) { + if (value != null) { + target.put(key, value instanceof Enum enumValue ? enumValue.name() : value); + } + } + + private static String intervalId(ExtractionContext context, String sourceRowId) { + String extractionCode = context == null || context.planItem() == null ? null : context.planItem().extractionCode(); + return "TACHOGRAPH:" + (extractionCode == null || extractionCode.isBlank() ? "UNKNOWN" : extractionCode) + ":" + sourceRowId; + } + + private static void enrichDriver(Map raw, DriverRefDto driverRef) { + if (driverRef == null || !driverRef.hasAnyReference()) { + return; + } + put(raw, "driverKey", driverRef.stableKey()); + put(raw, "driverSourceEntityId", driverRef.sourceEntityId()); + DriverCardRefDto driverCard = driverRef.driverCard(); + if (driverCard != null && driverCard.hasValue()) { + put(raw, "driverCardKey", driverCard.stableKey()); + put(raw, "driverCardNation", driverCard.nation()); + put(raw, "driverCardNationNumericCode", driverCard.nationNumericCode()); + put(raw, "driverCardNumber", driverCard.number()); + } + } + + private static void enrichVehicle(Map raw, VehicleRefDto vehicleRef) { + if (vehicleRef == null || !vehicleRef.hasAnyReference()) { + return; + } + put(raw, "vehicleKey", vehicleRef.stableKey()); + put(raw, "vehicleSourceEntityId", vehicleRef.sourceVehicleEntityId()); + put(raw, "vehicleVin", vehicleRef.vin()); + put(raw, "vehicleRegistrationSourceEntityId", vehicleRef.sourceRegistrationEntityId()); + VehicleRegistrationRefDto registration = vehicleRef.vehicleRegistration(); + if (registration != null && registration.hasValue()) { + put(raw, "registrationKey", registration.stableKey()); + put(raw, "vehicleRegistrationNation", registration.nation()); + put(raw, "vehicleRegistrationNationNumericCode", registration.nationNumericCode()); + put(raw, "vehicleRegistrationNumber", registration.number()); + } + } + + private static void enrichSourcePackage(Map raw, ResultSet rs) throws SQLException { + put(raw, "sourcePackageKind", string(rs, "source_package_kind")); + put(raw, "sourcePackageId", string(rs, "source_package_id")); + put(raw, "sourcePackageEntityId", string(rs, "source_package_entity_id")); + } + + private static void enrichOdometer(Map raw, ResultSet rs) throws SQLException { + Long odometerM = longValue(rs, "odometer_m"); + put(raw, "odometerM", odometerM); + if (odometerM != null) { + put(raw, "odometerKm", odometerM / 1_000L); + } + } + + private static String string(ResultSet rs, String column) throws SQLException { + try { + String value = rs.getString(column); + return value == null || value.isBlank() ? null : value.trim(); + } catch (SQLException ex) { + if (missingColumn(ex)) { + return null; + } + throw ex; + } + } + + private static Long longValue(ResultSet rs, String column) throws SQLException { + Object value; + try { + value = rs.getObject(column); + } catch (SQLException ex) { + if (missingColumn(ex)) { + return null; + } + throw ex; + } + if (value == null) { + return null; + } + if (value instanceof Number number) { + return number.longValue(); + } + return Long.parseLong(value.toString()); + } + + private static boolean missingColumn(SQLException ex) { + String state = ex.getSQLState(); + String message = ex.getMessage(); + return "S0022".equals(state) + || "42703".equals(state) + || (message != null && message.toLowerCase(java.util.Locale.ROOT).contains("column") + && (message.toLowerCase(java.util.Locale.ROOT).contains("not found") + || message.toLowerCase(java.util.Locale.ROOT).contains("not valid") + || message.toLowerCase(java.util.Locale.ROOT).contains("invalid"))); + } +} diff --git a/src/main/resources/db/migration/V9__introduce_driver_identity_model.sql b/src/main/resources/db/migration/V10__introduce_driver_identity_model.sql similarity index 100% rename from src/main/resources/db/migration/V9__introduce_driver_identity_model.sql rename to src/main/resources/db/migration/V10__introduce_driver_identity_model.sql diff --git a/src/main/resources/db/migration/V11__ensure_event_source_record.sql b/src/main/resources/db/migration/V11__ensure_event_source_record.sql deleted file mode 100644 index 9e844b2..0000000 --- a/src/main/resources/db/migration/V11__ensure_event_source_record.sql +++ /dev/null @@ -1,52 +0,0 @@ -create extension if not exists timescaledb; - -create table if not exists eventhub.event_source_record ( - source_record_key_hash text primary key, - event_occurred_at timestamptz not null, - event_id uuid not null, - created_at timestamptz not null default now() -); - -insert into eventhub.event_source_record(source_record_key_hash, event_occurred_at, event_id) -select event.source_record_key_hash, event.occurred_at, event.id -from eventhub.event -on conflict (source_record_key_hash) do nothing; - -alter table eventhub.event_detail - drop constraint if exists fk_event_detail_event; - -alter table eventhub.event_source_record - drop constraint if exists fk_event_source_record_event; - -drop index if exists eventhub.ux_event_source_record; - -do $$ -begin - if not exists ( - select 1 - from timescaledb_information.hypertables - where hypertable_schema = 'eventhub' - and hypertable_name = 'event' - ) then - perform create_hypertable( - 'eventhub.event', - 'occurred_at', - chunk_time_interval => interval '7 days', - migrate_data => true - ); - end if; -end $$; - -alter table eventhub.event_detail - add constraint fk_event_detail_event foreign key (event_occurred_at, event_id) - references eventhub.event(occurred_at, id) - on delete cascade; - -alter table eventhub.event_source_record - add constraint fk_event_source_record_event foreign key (event_occurred_at, event_id) - references eventhub.event(occurred_at, id) - on delete cascade - deferrable initially deferred; - -create index if not exists idx_event_source_record_event - on eventhub.event_source_record(event_occurred_at, event_id); diff --git a/src/main/resources/db/migration/V10__introduce_driver_card_model.sql b/src/main/resources/db/migration/V11__introduce_driver_card_model.sql similarity index 100% rename from src/main/resources/db/migration/V10__introduce_driver_card_model.sql rename to src/main/resources/db/migration/V11__introduce_driver_card_model.sql diff --git a/src/main/resources/db/migration/V10__make_event_hypertable.sql b/src/main/resources/db/migration/V12__make_event_hypertable.sql similarity index 100% rename from src/main/resources/db/migration/V10__make_event_hypertable.sql rename to src/main/resources/db/migration/V12__make_event_hypertable.sql diff --git a/src/main/resources/db/migration/V12__add_yellowfox_d8_detail_indexes.sql b/src/main/resources/db/migration/V13__add_yellowfox_d8_detail_indexes.sql similarity index 100% rename from src/main/resources/db/migration/V12__add_yellowfox_d8_detail_indexes.sql rename to src/main/resources/db/migration/V13__add_yellowfox_d8_detail_indexes.sql diff --git a/src/main/resources/db/migration/V13__add_nation_reference_and_numeric_codes.sql b/src/main/resources/db/migration/V14__add_nation_reference_and_numeric_codes.sql similarity index 100% rename from src/main/resources/db/migration/V13__add_nation_reference_and_numeric_codes.sql rename to src/main/resources/db/migration/V14__add_nation_reference_and_numeric_codes.sql diff --git a/src/test/java/at/procon/eventhub/MigrationScriptNamingTest.java b/src/test/java/at/procon/eventhub/MigrationScriptNamingTest.java new file mode 100644 index 0000000..c72fe5a --- /dev/null +++ b/src/test/java/at/procon/eventhub/MigrationScriptNamingTest.java @@ -0,0 +1,45 @@ +package at.procon.eventhub; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.regex.Pattern; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class MigrationScriptNamingTest { + + private static final Pattern VERSION_PATTERN = Pattern.compile("V([0-9]+)__.*\\.sql"); + + @Test + void versionedFlywayMigrationsUseUniqueVersionNumbers() throws IOException { + Path migrationDirectory = Path.of("src/main/resources/db/migration"); + Map> namesByVersion = new TreeMap<>(); + + try (var stream = Files.list(migrationDirectory)) { + stream.filter(path -> path.getFileName().toString().endsWith(".sql")) + .map(path -> path.getFileName().toString()) + .forEach(fileName -> { + String version = version(fileName); + if (version != null) { + namesByVersion.computeIfAbsent(version, ignored -> new ArrayList<>()).add(fileName); + } + }); + } + + assertThat(namesByVersion) + .allSatisfy((version, names) -> assertThat(names) + .as("Flyway version V%s must be unique", version) + .hasSize(1)); + } + + private String version(String fileName) { + var matcher = VERSION_PATTERN.matcher(fileName); + return matcher.matches() ? matcher.group(1) : null; + } +} diff --git a/src/test/java/at/procon/eventhub/persistence/EventHubEventReadRepositoryTest.java b/src/test/java/at/procon/eventhub/persistence/EventHubEventReadRepositoryTest.java new file mode 100644 index 0000000..753dc43 --- /dev/null +++ b/src/test/java/at/procon/eventhub/persistence/EventHubEventReadRepositoryTest.java @@ -0,0 +1,36 @@ +package at.procon.eventhub.persistence; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +class EventHubEventReadRepositoryTest { + + private final ObjectMapper objectMapper = new ObjectMapper().findAndRegisterModules(); + + @Test + void parsesRegularJsonObjectColumn() { + JsonNode node = EventHubEventReadRepository.parseJsonColumn( + objectMapper, + "{\"raw\":{\"driverKey\":\"12:123\"},\"details\":{\"k\":1}}" + ); + + assertThat(node.isObject()).isTrue(); + assertThat(node.get("raw").get("driverKey").asText()).isEqualTo("12:123"); + assertThat(node.get("details").get("k").asInt()).isEqualTo(1); + } + + @Test + void unwrapsDoubleEncodedJsonObjectColumn() { + JsonNode node = EventHubEventReadRepository.parseJsonColumn( + objectMapper, + "\"{\\\"raw\\\":{\\\"driverKey\\\":\\\"12:123\\\"},\\\"details\\\":{\\\"k\\\":1}}\"" + ); + + assertThat(node.isObject()).isTrue(); + assertThat(node.get("raw").get("driverKey").asText()).isEqualTo("12:123"); + assertThat(node.get("details").get("k").asInt()).isEqualTo(1); + } +} diff --git a/src/test/java/at/procon/eventhub/tachograph/service/TachographDbRowMapperTimelineMetadataTest.java b/src/test/java/at/procon/eventhub/tachograph/service/TachographDbRowMapperTimelineMetadataTest.java new file mode 100644 index 0000000..6494bef --- /dev/null +++ b/src/test/java/at/procon/eventhub/tachograph/service/TachographDbRowMapperTimelineMetadataTest.java @@ -0,0 +1,153 @@ +package at.procon.eventhub.tachograph.service; + +import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.importing.ImportPlanItemDto; +import at.procon.eventhub.importing.ImportTimeChunkDto; +import at.procon.eventhub.importing.extraction.ExtractionContext; +import at.procon.eventhub.processing.service.UnifiedEventTimelineReconstructor; +import at.procon.eventhub.service.EventDetailsFactory; +import at.procon.eventhub.tachograph.dto.TachographImportRequest; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class TachographDbRowMapperTimelineMetadataTest { + + private final EventDetailsFactory detailsFactory = new EventDetailsFactory(new ObjectMapper()); + private final UnifiedEventTimelineReconstructor reconstructor = new UnifiedEventTimelineReconstructor(); + + @Test + void activityRowsExposeTimelineMetadataForReconstruction() throws SQLException { + CardActivityRowMapper mapper = new CardActivityRowMapper(detailsFactory); + ExtractionContext context = context(EventFamily.DRIVER_ACTIVITY, "DRIVER_CARD", "CARD_ACTIVITY"); + EventHubEventDto started = mapper.map(activityRow("START", "2026-05-22T06:00:00Z"), 0, context); + EventHubEventDto ended = mapper.map(activityRow("END", "2026-05-22T07:15:00Z"), 1, context); + + JsonNode raw = started.payload().get("raw"); + assertThat(raw.get("intervalId").asText()).isEqualTo("TACHOGRAPH:CARD_ACTIVITY:CA-1"); + assertThat(raw.get("sourceKind").asText()).isEqualTo("DRIVER_CARD"); + assertThat(raw.get("extractionCode").asText()).isEqualTo("CARD_ACTIVITY"); + assertThat(raw.get("sourceRowIds").get(0).asText()).isEqualTo("CA-1"); + assertThat(raw.get("driverKey").asText()).contains("driver-1"); + assertThat(raw.get("vehicleKey").asText()).contains("VIN123"); + assertThat(raw.get("registrationKey").asText()).contains("W-12345"); + + var timeline = reconstructor.reconstruct(UUID.randomUUID(), "driver-1", List.of(started, ended)); + + assertThat(timeline.activityIntervals()).hasSize(1); + var interval = timeline.activityIntervals().getFirst(); + assertThat(interval.intervalId()).isEqualTo("TACHOGRAPH:CARD_ACTIVITY:CA-1"); + assertThat(interval.activityType()).isEqualTo("DRIVE"); + assertThat(interval.sourceKind()).isEqualTo("DRIVER_CARD"); + assertThat(interval.sourceIntervalIds()).containsExactly("CA-1"); + assertThat(interval.registrationKey()).contains("W-12345"); + assertThat(interval.vehicleKey()).contains("VIN123"); + assertThat(interval.durationSeconds()).isEqualTo(4_500L); + } + + @Test + void cardUsageRowsExposeTimelineMetadataForVehicleUsageReconstruction() throws SQLException { + CardVehiclesUsedCardEventRowMapper mapper = new CardVehiclesUsedCardEventRowMapper(detailsFactory); + ExtractionContext context = context(EventFamily.DRIVER_CARD, "DRIVER_CARD", "CARD_VEHICLES_USED"); + EventHubEventDto inserted = mapper.map(cardUsageRow("CARD_INSERTED", "INSERT", "2026-05-22T06:00:00Z", 120_000L), 0, context); + EventHubEventDto withdrawn = mapper.map(cardUsageRow("CARD_WITHDRAWN", "WITHDRAW", "2026-05-22T18:00:00Z", 120_550L), 1, context); + + JsonNode raw = inserted.payload().get("raw"); + assertThat(raw.get("intervalId").asText()).isEqualTo("TACHOGRAPH:CARD_VEHICLES_USED:CVU-1"); + assertThat(raw.get("sourceRowIds").get(0).asText()).isEqualTo("CVU-1"); + assertThat(raw.get("registrationKey").asText()).contains("W-12345"); + + var timeline = reconstructor.reconstruct(UUID.randomUUID(), "driver-1", List.of(inserted, withdrawn)); + + assertThat(timeline.vehicleUsageIntervals()).hasSize(1); + var usage = timeline.vehicleUsageIntervals().getFirst(); + assertThat(usage.intervalId()).isEqualTo("TACHOGRAPH:CARD_VEHICLES_USED:CVU-1"); + assertThat(usage.sourceKind()).isEqualTo("DRIVER_CARD"); + assertThat(usage.sourceIntervalIds()).containsExactly("CVU-1"); + assertThat(usage.registrationKey()).contains("W-12345"); + assertThat(usage.vehicleKey()).contains("VIN123"); + assertThat(usage.odometerBeginKm()).isEqualTo(120L); + assertThat(usage.odometerEndKm()).isEqualTo(120L); + assertThat(usage.durationSeconds()).isEqualTo(43_200L); + } + + private ResultSet activityRow(String lifecycle, String occurredAt) throws SQLException { + ResultSet rs = commonRow("CA-1", occurredAt, 120_000L); + when(rs.getString("external_source_event_id")).thenReturn("CARD_ACTIVITY:CA-1:" + lifecycle); + when(rs.getString("event_type")).thenReturn("DRIVE"); + when(rs.getString("lifecycle")).thenReturn(lifecycle); + when(rs.getObject("activity_code")).thenReturn(3); + when(rs.getObject("card_slot")).thenReturn(0); + when(rs.getObject("card_status")).thenReturn(0); + when(rs.getObject("driving_status")).thenReturn(0); + return rs; + } + + private ResultSet cardUsageRow(String eventType, String lifecycle, String occurredAt, Long odometerM) throws SQLException { + ResultSet rs = commonRow("CVU-1", occurredAt, odometerM); + when(rs.getString("external_source_event_id")).thenReturn("CARD_VEHICLES_USED:CVU-1:" + lifecycle); + when(rs.getString("event_type")).thenReturn(eventType); + when(rs.getString("lifecycle")).thenReturn(lifecycle); + when(rs.getObject("card_slot")).thenReturn(0); + return rs; + } + + private ResultSet commonRow(String sourceRowId, String occurredAt, Long odometerM) throws SQLException { + ResultSet rs = mock(ResultSet.class); + when(rs.getString("source_row_id")).thenReturn(sourceRowId); + when(rs.getObject("occurred_at")).thenReturn(OffsetDateTime.parse(occurredAt)); + when(rs.getObject("received_partner_at")).thenReturn(null); + when(rs.getObject("odometer_m")).thenReturn(odometerM); + + when(rs.getString("driver_source_entity_id")).thenReturn("driver-1"); + when(rs.getString("driver_card_nation")).thenReturn("A"); + when(rs.getString("driver_card_number")).thenReturn("12345678901234"); + + when(rs.getString("vehicle_source_entity_id")).thenReturn("vehicle-1"); + when(rs.getString("vehicle_vin")).thenReturn("VIN123"); + when(rs.getString("vehicle_registration_source_entity_id")).thenReturn("reg-1"); + when(rs.getString("vehicle_registration_nation")).thenReturn("A"); + when(rs.getString("vehicle_registration_number")).thenReturn("W-12345"); + + when(rs.getString("source_package_kind")).thenReturn("DRIVER_CARD"); + when(rs.getString("source_package_id")).thenReturn("package-1"); + when(rs.getString("source_package_entity_id")).thenReturn("card-1"); + when(rs.getObject("source_package_period_from")).thenReturn(null); + when(rs.getObject("source_package_period_to")).thenReturn(null); + when(rs.getObject("source_package_imported_at")).thenReturn(null); + return rs; + } + + private ExtractionContext context(EventFamily family, String sourceKind, String extractionCode) { + return new ExtractionContext<>( + UUID.randomUUID(), + UUID.randomUUID(), + 1, + null, + new ImportPlanItemDto( + family, + sourceKind, + extractionCode, + List.of("source-table"), + "DRIVER", + "test extraction", + AcquisitionStrategy.OCCURRED_AT_WINDOW_WITH_OVERLAP + ), + new ImportTimeChunkDto(0, null, null), + new EventSourceDto("TACHOGRAPH", "MIXED", "TACHOGRAPH_DB", "test", null, null), + null + ); + } +}