diff --git a/README.md b/README.md index 8cab9f9..db8e700 100644 --- a/README.md +++ b/README.md @@ -806,3 +806,23 @@ Replace `NoopTachographExtractionBatchExecutor` with an implementation that: ``` The import cursor is advanced only when the executor reports `executed=true`. The default no-op executor returns `executed=false`, so it does not move cursors accidentally. + +## JDBC tachograph extraction + +The first concrete extractor is `JdbcTachographExtractionBatchExecutor`. It is enabled only when `eventhub.tachograph.datasource.jdbc-url` is configured. Without that datasource, the application keeps using `NoopTachographExtractionBatchExecutor`. + +Currently implemented extraction definitions: + +```text +CARD_ACTIVITY -> DRIVER_ACTIVITY / DRIVER_CARD / CardActivity +VU_ACTIVITY -> DRIVER_ACTIVITY / VEHICLE_UNIT / VUActivity +``` + +SQL resources: + +```text +src/main/resources/sql/tachograph/card-activity.sql +src/main/resources/sql/tachograph/vu-activity.sql +``` + +These files are the schema-specific layer. They must keep the documented column aliases because the row mappers consume those aliases to build `EventHubEventDto`, including `sourcePackageRef`, driver/vehicle references, event details and raw payload. diff --git a/pom.xml b/pom.xml index 4cb53e2..332ce7d 100644 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,11 @@ postgresql runtime + + com.microsoft.sqlserver + mssql-jdbc + runtime + org.springframework.boot diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index 640c678..ae3f158 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -72,6 +72,9 @@ public class EventHubProperties { /** Configured tenant/source import plans. */ private List importPlans = new ArrayList<>(); + /** Optional external tachograph DB datasource. If absent, the no-op extractor is used. */ + private TachographDataSource datasource = new TachographDataSource(); + public int getDefaultChunkDays() { return defaultChunkDays; } @@ -119,6 +122,53 @@ public class EventHubProperties { public void setImportPlans(List importPlans) { this.importPlans = importPlans == null ? new ArrayList<>() : importPlans; } + + public TachographDataSource getDatasource() { + return datasource; + } + + public void setDatasource(TachographDataSource datasource) { + this.datasource = datasource == null ? new TachographDataSource() : datasource; + } + } + + public static class TachographDataSource { + private String jdbcUrl; + private String username; + private String password; + private String driverClassName; + + public String getJdbcUrl() { + return jdbcUrl; + } + + public void setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getDriverClassName() { + return driverClassName; + } + + public void setDriverClassName(String driverClassName) { + this.driverClassName = driverClassName; + } } public static class ConfiguredImportPlan { diff --git a/src/main/java/at/procon/eventhub/config/TachographDataSourceConfig.java b/src/main/java/at/procon/eventhub/config/TachographDataSourceConfig.java new file mode 100644 index 0000000..11fceaf --- /dev/null +++ b/src/main/java/at/procon/eventhub/config/TachographDataSourceConfig.java @@ -0,0 +1,34 @@ +package at.procon.eventhub.config; + +import javax.sql.DataSource; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.datasource.DriverManagerDataSource; + +@Configuration +@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''") +public class TachographDataSourceConfig { + + @Bean + public DataSource tachographDataSource(EventHubProperties properties) { + EventHubProperties.TachographDataSource config = properties.getTachograph().getDatasource(); + DriverManagerDataSource dataSource = new DriverManagerDataSource(); + dataSource.setUrl(config.getJdbcUrl()); + dataSource.setUsername(config.getUsername()); + dataSource.setPassword(config.getPassword()); + if (config.getDriverClassName() != null && !config.getDriverClassName().isBlank()) { + dataSource.setDriverClassName(config.getDriverClassName()); + } + return dataSource; + } + + @Bean + public NamedParameterJdbcTemplate tachographNamedParameterJdbcTemplate( + @Qualifier("tachographDataSource") DataSource tachographDataSource + ) { + return new NamedParameterJdbcTemplate(tachographDataSource); + } +} diff --git a/src/main/java/at/procon/eventhub/dto/ImportCursorStateDto.java b/src/main/java/at/procon/eventhub/dto/ImportCursorStateDto.java new file mode 100644 index 0000000..f5956ca --- /dev/null +++ b/src/main/java/at/procon/eventhub/dto/ImportCursorStateDto.java @@ -0,0 +1,11 @@ +package at.procon.eventhub.dto; + +import java.time.OffsetDateTime; + +public record ImportCursorStateDto( + OffsetDateTime lastSourcePackageImportedAt, + String lastSourcePackageId, + OffsetDateTime lastSourceRowUpdatedAt, + OffsetDateTime lastOccurredTo +) { +} diff --git a/src/main/java/at/procon/eventhub/persistence/ImportCursorRepository.java b/src/main/java/at/procon/eventhub/persistence/ImportCursorRepository.java index 68615bc..c9d3eaa 100644 --- a/src/main/java/at/procon/eventhub/persistence/ImportCursorRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/ImportCursorRepository.java @@ -2,6 +2,7 @@ package at.procon.eventhub.persistence; import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.EventFamily; +import at.procon.eventhub.dto.ImportCursorStateDto; import at.procon.eventhub.dto.TachographExtractionBatchResultDto; import java.util.UUID; import org.springframework.jdbc.core.JdbcTemplate; @@ -16,6 +17,45 @@ public class ImportCursorRepository { this.jdbcTemplate = jdbcTemplate; } + public ImportCursorStateDto findCursor( + String tenantKey, + int eventSourceId, + String scopeHash, + EventFamily eventFamily, + String sourceKind, + AcquisitionStrategy strategy + ) { + return jdbcTemplate.query( + """ + select last_source_package_imported_at, + last_source_package_id, + last_source_row_updated_at, + last_occurred_to + from eventhub.import_cursor + where tenant_key = ? + and event_source_id = ? + and scope_hash = ? + and event_family = ? + and source_kind = ? + and cursor_type = ? + """, + rs -> rs.next() + ? new ImportCursorStateDto( + rs.getObject("last_source_package_imported_at", java.time.OffsetDateTime.class), + rs.getString("last_source_package_id"), + rs.getObject("last_source_row_updated_at", java.time.OffsetDateTime.class), + rs.getObject("last_occurred_to", java.time.OffsetDateTime.class) + ) + : null, + tenantKey, + eventSourceId, + scopeHash, + eventFamily.name(), + sourceKind, + strategy.name() + ); + } + public void advanceCursor( String tenantKey, int eventSourceId, diff --git a/src/main/java/at/procon/eventhub/service/AbstractTachographActivityRowMapper.java b/src/main/java/at/procon/eventhub/service/AbstractTachographActivityRowMapper.java new file mode 100644 index 0000000..4df29e2 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/AbstractTachographActivityRowMapper.java @@ -0,0 +1,293 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.CardSlot; +import at.procon.eventhub.dto.CardStatus; +import at.procon.eventhub.dto.DriverCardRefDto; +import at.procon.eventhub.dto.DriverRefDto; +import at.procon.eventhub.dto.DrivingStatus; +import at.procon.eventhub.dto.EventDomain; +import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventLifecycle; +import at.procon.eventhub.dto.EventType; +import at.procon.eventhub.dto.SourcePackageRefDto; +import at.procon.eventhub.dto.VehicleRefDto; +import at.procon.eventhub.dto.VehicleRegistrationRefDto; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.LinkedHashMap; +import java.util.Locale; +import java.util.Map; +import java.util.UUID; + +abstract class AbstractTachographActivityRowMapper implements TachographExtractionRowMapper { + + private final EventDetailsFactory detailsFactory; + + protected AbstractTachographActivityRowMapper(EventDetailsFactory detailsFactory) { + this.detailsFactory = detailsFactory; + } + + @Override + public EventHubEventDto map(ResultSet rs, int rowNum, TachographExtractionContext context) throws SQLException { + OffsetDateTime occurredAt = offsetDateTime(rs, "occurred_at"); + SourcePackageRefDto sourcePackageRef = sourcePackageRef(rs); + DriverRefDto driverRef = driverRef(rs); + VehicleRefDto vehicleRef = vehicleRef(rs); + + String externalSourceEventId = string(rs, "external_source_event_id"); + if (externalSourceEventId == null) { + externalSourceEventId = defaultExternalSourceEventId(context, rowNum, occurredAt, sourcePackageRef, driverRef, vehicleRef); + } + + return new EventHubEventDto( + UUID.randomUUID(), + externalSourceEventId, + driverRef, + vehicleRef, + occurredAt, + offsetDateTime(rs, "received_partner_at"), + OffsetDateTime.now(), + EventDomain.DRIVER_ACTIVITY, + eventType(rs), + lifecycle(rs), + longValue(rs, "odometer_m"), + null, + detailsFactory.driverActivity(cardSlot(rs), cardStatus(rs), drivingStatus(rs)), + sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null, + detailsFactory.payloadFromMap(payload(rs, context)), + false, + context.packageInfo() + ); + } + + protected abstract Map sourceSpecificPayload(ResultSet rs) throws SQLException; + + private DriverRefDto driverRef(ResultSet rs) throws SQLException { + DriverCardRefDto driverCard = null; + String cardNumber = string(rs, "driver_card_number"); + if (cardNumber != null) { + driverCard = new DriverCardRefDto(string(rs, "driver_card_nation"), cardNumber); + } + DriverRefDto driverRef = new DriverRefDto(string(rs, "driver_source_entity_id"), driverCard); + return driverRef.hasAnyReference() ? driverRef : null; + } + + private VehicleRefDto vehicleRef(ResultSet rs) throws SQLException { + VehicleRegistrationRefDto registration = null; + String registrationNumber = string(rs, "vehicle_registration_number"); + if (registrationNumber != null) { + registration = new VehicleRegistrationRefDto(string(rs, "vehicle_registration_nation"), registrationNumber); + } + VehicleRefDto vehicleRef = new VehicleRefDto(string(rs, "vehicle_source_entity_id"), string(rs, "vehicle_vin"), registration); + return vehicleRef.hasAnyReference() ? vehicleRef : null; + } + + private SourcePackageRefDto sourcePackageRef(ResultSet rs) throws SQLException { + return new SourcePackageRefDto( + string(rs, "source_package_kind"), + string(rs, "source_package_id"), + string(rs, "source_package_entity_id"), + offsetDateTime(rs, "source_package_period_from"), + offsetDateTime(rs, "source_package_period_to"), + offsetDateTime(rs, "source_package_imported_at") + ); + } + + private Map payload(ResultSet rs, TachographExtractionContext context) throws SQLException { + Map raw = new LinkedHashMap<>(); + raw.put("extractionCode", context.planItem().extractionCode()); + raw.put("sourceKind", context.planItem().sourceKind()); + raw.put("sourceTables", context.planItem().sourceTables()); + put(raw, "sourceRowId", string(rs, "source_row_id")); + put(raw, "activityCode", object(rs, "activity_code")); + put(raw, "activityText", string(rs, "activity_text")); + put(raw, "eventType", string(rs, "event_type")); + put(raw, "lifecycle", string(rs, "lifecycle")); + put(raw, "cardSlot", object(rs, "card_slot")); + put(raw, "cardStatus", object(rs, "card_status")); + put(raw, "drivingStatus", object(rs, "driving_status")); + put(raw, "driverSourceEntityId", string(rs, "driver_source_entity_id")); + put(raw, "driverCardNation", string(rs, "driver_card_nation")); + put(raw, "driverCardNumber", string(rs, "driver_card_number")); + put(raw, "vehicleSourceEntityId", string(rs, "vehicle_source_entity_id")); + put(raw, "vehicleVin", string(rs, "vehicle_vin")); + put(raw, "vehicleRegistrationNation", string(rs, "vehicle_registration_nation")); + put(raw, "vehicleRegistrationNumber", string(rs, "vehicle_registration_number")); + raw.putAll(sourceSpecificPayload(rs)); + return Map.of("raw", raw); + } + + private String defaultExternalSourceEventId( + TachographExtractionContext context, + int rowNum, + OffsetDateTime occurredAt, + SourcePackageRefDto sourcePackageRef, + DriverRefDto driverRef, + VehicleRefDto vehicleRef + ) { + String sourcePackageId = sourcePackageRef == null || sourcePackageRef.sourcePackageId() == null + ? "NO_SOURCE_PACKAGE" + : sourcePackageRef.sourcePackageId(); + String subject = driverRef != null && driverRef.hasAnyReference() + ? driverRef.stableKey() + : vehicleRef == null ? "NO_SUBJECT" : vehicleRef.stableKey(); + return "TACHOGRAPH:" + context.planItem().extractionCode() + + ":" + sourcePackageId + + ":" + occurredAt + + ":" + subject + + ":ROW-" + rowNum; + } + + protected String string(ResultSet rs, String column) throws SQLException { + String value = rs.getString(column); + return value == null || value.isBlank() ? null : value.trim(); + } + + protected Object object(ResultSet rs, String column) throws SQLException { + return rs.getObject(column); + } + + protected OffsetDateTime offsetDateTime(ResultSet rs, String column) throws SQLException { + Object value = rs.getObject(column); + if (value == null) { + return null; + } + if (value instanceof OffsetDateTime offsetDateTime) { + return offsetDateTime; + } + if (value instanceof Timestamp timestamp) { + return timestamp.toInstant().atOffset(ZoneOffset.UTC); + } + if (value instanceof java.time.LocalDateTime localDateTime) { + return localDateTime.atOffset(ZoneOffset.UTC); + } + return OffsetDateTime.parse(value.toString()); + } + + private Long longValue(ResultSet rs, String column) throws SQLException { + Object value = rs.getObject(column); + if (value == null) { + return null; + } + if (value instanceof Number number) { + return number.longValue(); + } + return Long.parseLong(value.toString()); + } + + private EventType eventType(ResultSet rs) throws SQLException { + String eventType = string(rs, "event_type"); + if (eventType != null) { + return parseEnum(EventType.class, eventType, EventType.UNKNOWN_ACTIVITY); + } + return switch (integer(rs, "activity_code", -1)) { + case 0 -> EventType.BREAK_REST; + case 1 -> EventType.AVAILABILITY; + case 2 -> EventType.WORK; + case 3 -> EventType.DRIVE; + default -> EventType.UNKNOWN_ACTIVITY; + }; + } + + private EventLifecycle lifecycle(ResultSet rs) throws SQLException { + return parseEnum(EventLifecycle.class, string(rs, "lifecycle"), EventLifecycle.SNAPSHOT); + } + + private CardSlot cardSlot(ResultSet rs) throws SQLException { + Object value = object(rs, "card_slot"); + if (value instanceof Number number) { + return switch (number.intValue()) { + case 0 -> CardSlot.DRIVER; + case 1 -> CardSlot.CO_DRIVER; + default -> null; + }; + } + String text = value == null ? null : value.toString(); + Integer parsed = parseInteger(text); + if (parsed != null) { + return parsed == 0 ? CardSlot.DRIVER : parsed == 1 ? CardSlot.CO_DRIVER : null; + } + return parseEnum(CardSlot.class, text, null); + } + + private CardStatus cardStatus(ResultSet rs) throws SQLException { + Object value = object(rs, "card_status"); + if (value instanceof Number number) { + return switch (number.intValue()) { + case 0 -> CardStatus.INSERTED; + case 1 -> CardStatus.NOT_INSERTED; + default -> null; + }; + } + String text = value == null ? null : value.toString(); + Integer parsed = parseInteger(text); + if (parsed != null) { + return parsed == 0 ? CardStatus.INSERTED : parsed == 1 ? CardStatus.NOT_INSERTED : null; + } + return parseEnum(CardStatus.class, text, null); + } + + private DrivingStatus drivingStatus(ResultSet rs) throws SQLException { + Object value = object(rs, "driving_status"); + if (value instanceof Number number) { + return switch (number.intValue()) { + case 0 -> DrivingStatus.SINGLE; + case 1 -> DrivingStatus.CREW; + default -> DrivingStatus.UNKNOWN; + }; + } + String text = value == null ? null : value.toString(); + Integer parsed = parseInteger(text); + if (parsed != null) { + return parsed == 0 ? DrivingStatus.SINGLE : parsed == 1 ? DrivingStatus.CREW : DrivingStatus.UNKNOWN; + } + return parseEnum(DrivingStatus.class, text, DrivingStatus.UNKNOWN); + } + + private int integer(ResultSet rs, String column, int fallback) throws SQLException { + Object value = rs.getObject(column); + if (value == null) { + return fallback; + } + if (value instanceof Number number) { + return number.intValue(); + } + try { + return Integer.parseInt(value.toString()); + } catch (NumberFormatException ignored) { + return fallback; + } + } + + private > T parseEnum(Class type, String value, T fallback) { + if (value == null) { + return fallback; + } + String normalized = value.trim().toUpperCase(Locale.ROOT).replace('-', '_').replace(' ', '_'); + try { + return Enum.valueOf(type, normalized); + } catch (IllegalArgumentException ignored) { + return fallback; + } + } + + private Integer parseInteger(String value) { + if (value == null || value.isBlank()) { + return null; + } + try { + return Integer.parseInt(value.trim()); + } catch (NumberFormatException ignored) { + return null; + } + } + + protected void put(Map target, String key, Object value) { + if (value != null) { + target.put(key, value); + } + } +} diff --git a/src/main/java/at/procon/eventhub/service/CardActivityRowMapper.java b/src/main/java/at/procon/eventhub/service/CardActivityRowMapper.java new file mode 100644 index 0000000..0c0379f --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/CardActivityRowMapper.java @@ -0,0 +1,22 @@ +package at.procon.eventhub.service; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.LinkedHashMap; +import java.util.Map; +import org.springframework.stereotype.Component; + +@Component +public class CardActivityRowMapper extends AbstractTachographActivityRowMapper { + + public CardActivityRowMapper(EventDetailsFactory detailsFactory) { + super(detailsFactory); + } + + @Override + protected Map sourceSpecificPayload(ResultSet rs) throws SQLException { + Map raw = new LinkedHashMap<>(); + put(raw, "cardActivityId", string(rs, "card_activity_id")); + return raw; + } +} diff --git a/src/main/java/at/procon/eventhub/service/JdbcTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/service/JdbcTachographExtractionBatchExecutor.java new file mode 100644 index 0000000..75b5947 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/JdbcTachographExtractionBatchExecutor.java @@ -0,0 +1,177 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.ImportCursorStateDto; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.dto.TachographExtractionBatchResultDto; +import at.procon.eventhub.dto.TachographImportPlanItemDto; +import at.procon.eventhub.dto.TachographImportRequest; +import at.procon.eventhub.dto.TimeChunkDto; +import at.procon.eventhub.persistence.ImportCursorRepository; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.camel.ProducerTemplate; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.core.io.Resource; +import org.springframework.core.io.ResourceLoader; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Service; +import org.springframework.util.StreamUtils; + +@Service +@ConditionalOnBean(name = "tachographNamedParameterJdbcTemplate") +@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''") +public class JdbcTachographExtractionBatchExecutor implements TachographExtractionBatchExecutor { + + private final NamedParameterJdbcTemplate tachographJdbcTemplate; + private final ProducerTemplate producerTemplate; + private final ResourceLoader resourceLoader; + private final TachographExtractionDefinitionRegistry definitionRegistry; + private final ImportCursorRepository importCursorRepository; + + public JdbcTachographExtractionBatchExecutor( + @Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate tachographJdbcTemplate, + ProducerTemplate producerTemplate, + ResourceLoader resourceLoader, + TachographExtractionDefinitionRegistry definitionRegistry, + ImportCursorRepository importCursorRepository + ) { + this.tachographJdbcTemplate = tachographJdbcTemplate; + this.producerTemplate = producerTemplate; + this.resourceLoader = resourceLoader; + this.definitionRegistry = definitionRegistry; + this.importCursorRepository = importCursorRepository; + } + + @Override + public TachographExtractionBatchResultDto execute( + UUID importRunId, + UUID packageId, + int eventSourceId, + TachographImportRequest request, + TachographImportPlanItemDto planItem, + TimeChunkDto chunk + ) { + TachographExtractionDefinition definition = definitionRegistry.findByCode(planItem.extractionCode()) + .orElseThrow(() -> new IllegalArgumentException("No tachograph extraction definition for " + planItem.extractionCode())); + + ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk); + var packageInfo = new at.procon.eventhub.dto.EventHubPackageRequest( + request.tenantKey(), + eventSourceFor(request, planItem), + request.sourceGroup(), + chunkScope, + planItem.eventFamily().name(), + chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(), + "TACHOGRAPH:" + planItem.sourceKind() + ":" + planItem.extractionCode() + ":RUN-" + importRunId + ":CHUNK-" + chunk.sequence() + ); + TachographExtractionContext context = new TachographExtractionContext( + importRunId, + packageId, + eventSourceId, + request, + planItem, + chunk, + packageInfo.eventSource(), + packageInfo + ); + + String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey(); + ImportCursorStateDto cursor = importCursorRepository.findCursor( + request.tenantKey(), + eventSourceId, + scopeHash, + planItem.eventFamily(), + planItem.sourceKind(), + request.acquisitionStrategy() + ); + + Map params = parameters(request, chunkScope, cursor); + String sql = loadSql(definition.sqlResource()); + var events = tachographJdbcTemplate.query(sql, params, (rs, rowNum) -> definition.rowMapper().map(rs, rowNum, context)); + events.forEach(event -> producerTemplate.sendBody("direct:eventhub-normalized-input", event)); + + OffsetDateTime lastSourcePackageImportedAt = events.stream() + .map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().importedIntoSourceAt()) + .filter(value -> value != null) + .max(OffsetDateTime::compareTo) + .orElse(cursor == null ? null : cursor.lastSourcePackageImportedAt()); + String lastSourcePackageId = events.stream() + .filter(event -> event.sourcePackageRef() != null && event.sourcePackageRef().importedIntoSourceAt() != null) + .max((left, right) -> left.sourcePackageRef().importedIntoSourceAt().compareTo(right.sourcePackageRef().importedIntoSourceAt())) + .map(event -> event.sourcePackageRef().sourcePackageId()) + .orElse(cursor == null ? null : cursor.lastSourcePackageId()); + + return new TachographExtractionBatchResultDto( + packageId, + planItem.extractionCode(), + planItem.sourceKind(), + events.size(), + events.size(), + events.size(), + 0, + true, + lastSourcePackageImportedAt, + lastSourcePackageId, + null, + chunk.occurredTo() + ); + } + + private Map parameters(TachographImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) { + Map params = new HashMap<>(); + params.put("tenantKey", request.tenantKey()); + params.put("occurredFrom", scope == null ? null : scope.occurredFrom()); + params.put("occurredTo", scope == null ? null : scope.occurredTo()); + params.put("rootOrganisationId", scope == null || scope.rootSourceOrganisation() == null ? null : scope.rootSourceOrganisation().sourceEntityId()); + params.put("includeChildren", scope != null && scope.includeChildren()); + params.put("lastSourcePackageImportedAt", cursor == null ? null : cursor.lastSourcePackageImportedAt()); + params.put("lastSourcePackageId", cursor == null ? null : cursor.lastSourcePackageId()); + params.put("lastSourceRowUpdatedAt", cursor == null ? null : cursor.lastSourceRowUpdatedAt()); + params.put("lastOccurredTo", cursor == null ? null : cursor.lastOccurredTo()); + return params; + } + + private ImportScopeDto chunkScope(ImportScopeDto scope, TimeChunkDto chunk) { + if (scope == null) { + return ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo()); + } + return new ImportScopeDto( + scope.type(), + scope.rootSourceOrganisation(), + scope.includeChildren(), + chunk.occurredFrom(), + chunk.occurredTo() + ); + } + + private at.procon.eventhub.dto.EventSourceDto eventSourceFor(TachographImportRequest request, TachographImportPlanItemDto planItem) { + String sourceKey = switch (planItem.sourceKind()) { + case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT"; + case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD"; + default -> request.eventSource().sourceKey(); + }; + return new at.procon.eventhub.dto.EventSourceDto( + request.eventSource().providerKey(), + planItem.sourceKind(), + sourceKey, + request.eventSource().sourceInstanceKey(), + request.eventSource().tenantProviderSettingKey(), + request.eventSource().externalFleetKey() + ); + } + + private String loadSql(String location) { + Resource resource = resourceLoader.getResource(location); + try (var inputStream = resource.getInputStream()) { + return StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new IllegalStateException("Cannot load tachograph extraction SQL resource " + location, e); + } + } +} diff --git a/src/main/java/at/procon/eventhub/service/NoopTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/service/NoopTachographExtractionBatchExecutor.java index 33343bf..34368fd 100644 --- a/src/main/java/at/procon/eventhub/service/NoopTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/service/NoopTachographExtractionBatchExecutor.java @@ -5,6 +5,8 @@ import at.procon.eventhub.dto.TachographImportPlanItemDto; import at.procon.eventhub.dto.TachographImportRequest; import at.procon.eventhub.dto.TimeChunkDto; import java.util.UUID; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -17,6 +19,8 @@ import org.springframework.stereotype.Service; * direct:eventhub-normalized-input. */ @Service +@ConditionalOnMissingBean(TachographExtractionBatchExecutor.class) +@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' == ''") public class NoopTachographExtractionBatchExecutor implements TachographExtractionBatchExecutor { private static final Logger log = LoggerFactory.getLogger(NoopTachographExtractionBatchExecutor.class); @@ -25,6 +29,7 @@ public class NoopTachographExtractionBatchExecutor implements TachographExtracti public TachographExtractionBatchResultDto execute( UUID importRunId, UUID packageId, + int eventSourceId, TachographImportRequest request, TachographImportPlanItemDto planItem, TimeChunkDto chunk diff --git a/src/main/java/at/procon/eventhub/service/TachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/service/TachographExtractionBatchExecutor.java index 4ad9ebf..40a872c 100644 --- a/src/main/java/at/procon/eventhub/service/TachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/service/TachographExtractionBatchExecutor.java @@ -11,6 +11,7 @@ public interface TachographExtractionBatchExecutor { TachographExtractionBatchResultDto execute( UUID importRunId, UUID packageId, + int eventSourceId, TachographImportRequest request, TachographImportPlanItemDto planItem, TimeChunkDto chunk diff --git a/src/main/java/at/procon/eventhub/service/TachographExtractionContext.java b/src/main/java/at/procon/eventhub/service/TachographExtractionContext.java new file mode 100644 index 0000000..658f9ed --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographExtractionContext.java @@ -0,0 +1,20 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.TachographImportPlanItemDto; +import at.procon.eventhub.dto.TachographImportRequest; +import at.procon.eventhub.dto.TimeChunkDto; +import java.util.UUID; + +public record TachographExtractionContext( + UUID importRunId, + UUID packageId, + int eventSourceId, + TachographImportRequest request, + TachographImportPlanItemDto planItem, + TimeChunkDto chunk, + EventSourceDto eventSource, + EventHubPackageRequest packageInfo +) { +} diff --git a/src/main/java/at/procon/eventhub/service/TachographExtractionDefinition.java b/src/main/java/at/procon/eventhub/service/TachographExtractionDefinition.java new file mode 100644 index 0000000..44db71c --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographExtractionDefinition.java @@ -0,0 +1,13 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventFamily; + +public record TachographExtractionDefinition( + String code, + EventFamily eventFamily, + String sourceKind, + String entityAxis, + String sqlResource, + TachographExtractionRowMapper rowMapper +) { +} diff --git a/src/main/java/at/procon/eventhub/service/TachographExtractionDefinitionRegistry.java b/src/main/java/at/procon/eventhub/service/TachographExtractionDefinitionRegistry.java new file mode 100644 index 0000000..1bb40be --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographExtractionDefinitionRegistry.java @@ -0,0 +1,50 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventFamily; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.springframework.stereotype.Component; + +@Component +public class TachographExtractionDefinitionRegistry { + + private final Map definitionsByCode; + + public TachographExtractionDefinitionRegistry( + CardActivityRowMapper cardActivityRowMapper, + VuActivityRowMapper vuActivityRowMapper + ) { + List definitions = List.of( + new TachographExtractionDefinition( + "CARD_ACTIVITY", + EventFamily.DRIVER_ACTIVITY, + "DRIVER_CARD", + "DRIVER", + "classpath:sql/tachograph/card-activity.sql", + cardActivityRowMapper + ), + new TachographExtractionDefinition( + "VU_ACTIVITY", + EventFamily.DRIVER_ACTIVITY, + "VEHICLE_UNIT", + "VEHICLE", + "classpath:sql/tachograph/vu-activity.sql", + vuActivityRowMapper + ) + ); + this.definitionsByCode = definitions.stream() + .collect(Collectors.toUnmodifiableMap(definition -> normalize(definition.code()), Function.identity())); + } + + public Optional findByCode(String code) { + return Optional.ofNullable(definitionsByCode.get(normalize(code))); + } + + private String normalize(String value) { + return value == null ? "" : value.trim().toUpperCase(Locale.ROOT); + } +} diff --git a/src/main/java/at/procon/eventhub/service/TachographExtractionRowMapper.java b/src/main/java/at/procon/eventhub/service/TachographExtractionRowMapper.java new file mode 100644 index 0000000..2a41ec5 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/TachographExtractionRowMapper.java @@ -0,0 +1,10 @@ +package at.procon.eventhub.service; + +import at.procon.eventhub.dto.EventHubEventDto; +import java.sql.ResultSet; +import java.sql.SQLException; + +public interface TachographExtractionRowMapper { + + EventHubEventDto map(ResultSet rs, int rowNum, TachographExtractionContext context) throws SQLException; +} diff --git a/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java b/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java index dd775f3..3e9f07c 100644 --- a/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java +++ b/src/main/java/at/procon/eventhub/service/TachographImportExecutionService.java @@ -132,6 +132,7 @@ public class TachographImportExecutionService { TachographExtractionBatchResultDto result = extractionBatchExecutor.execute( importRunId, plannedPackage.packageId(), + plannedPackage.eventSourceId(), request, plannedPackage.planItem(), plannedPackage.chunk() diff --git a/src/main/java/at/procon/eventhub/service/VuActivityRowMapper.java b/src/main/java/at/procon/eventhub/service/VuActivityRowMapper.java new file mode 100644 index 0000000..2805fb1 --- /dev/null +++ b/src/main/java/at/procon/eventhub/service/VuActivityRowMapper.java @@ -0,0 +1,22 @@ +package at.procon.eventhub.service; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.LinkedHashMap; +import java.util.Map; +import org.springframework.stereotype.Component; + +@Component +public class VuActivityRowMapper extends AbstractTachographActivityRowMapper { + + public VuActivityRowMapper(EventDetailsFactory detailsFactory) { + super(detailsFactory); + } + + @Override + protected Map sourceSpecificPayload(ResultSet rs) throws SQLException { + Map raw = new LinkedHashMap<>(); + put(raw, "vuActivityId", string(rs, "vu_activity_id")); + return raw; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 6e5e5e0..d3e08b6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -33,6 +33,13 @@ eventhub: default-chunk-days: 1 occurred-at-overlap: 7d + # Configure this block to enable JdbcTachographExtractionBatchExecutor. + # datasource: + # jdbc-url: jdbc:sqlserver://localhost:1433;databaseName=tachograph;encrypt=true;trustServerCertificate=true + # username: tachograph_user + # password: change-me + # driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver + # Enables the scheduler that regularly triggers configured tachograph import plans. scheduler-enabled: false scheduler-poll-interval-ms: 60000 diff --git a/src/main/resources/sql/tachograph/card-activity.sql b/src/main/resources/sql/tachograph/card-activity.sql new file mode 100644 index 0000000..0eee2a9 --- /dev/null +++ b/src/main/resources/sql/tachograph/card-activity.sql @@ -0,0 +1,50 @@ +/* + * CardActivity DRIVER_ACTIVITY extraction. + * + * Adapt table and column names to the concrete tachograph SQL Server schema. + * Keep the selected aliases stable; CardActivityRowMapper consumes this alias contract. + */ +select + cast(ca.Id as varchar(128)) as source_row_id, + cast(ca.Id as varchar(128)) as card_activity_id, + concat('TACHOGRAPH:CARD_ACTIVITY:', ca.Id) as external_source_event_id, + + ca.ActivityTime as occurred_at, + ca.ReceivedAt as received_partner_at, + ca.Activity as activity_code, + ca.ActivityText as activity_text, + ca.EventType as event_type, + ca.Lifecycle as lifecycle, + ca.CardSlot as card_slot, + ca.CardStatus as card_status, + ca.DrivingStatus as driving_status, + ca.OdometerM as odometer_m, + + cast(ca.DriverId as varchar(128)) as driver_source_entity_id, + ca.DriverCardNation as driver_card_nation, + ca.DriverCardNumber as driver_card_number, + + cast(ca.VehicleId as varchar(128)) as vehicle_source_entity_id, + ca.VehicleVin as vehicle_vin, + ca.VehicleRegistrationNation as vehicle_registration_nation, + ca.VehicleRegistrationNumber as vehicle_registration_number, + + 'DRIVER_CARD' as source_package_kind, + cast(ca.SourcePackageId as varchar(128)) as source_package_id, + cast(ca.DriverId as varchar(128)) as source_package_entity_id, + ca.SourcePackagePeriodFrom as source_package_period_from, + ca.SourcePackagePeriodTo as source_package_period_to, + ca.SourcePackageImportedAt as source_package_imported_at +from CardActivity ca +where (:occurredFrom is null or ca.ActivityTime >= :occurredFrom) + and (:occurredTo is null or ca.ActivityTime < :occurredTo) + and ( + :lastSourcePackageImportedAt is null + or ca.SourcePackageImportedAt > :lastSourcePackageImportedAt + or (ca.SourcePackageImportedAt = :lastSourcePackageImportedAt and cast(ca.SourcePackageId as varchar(128)) > :lastSourcePackageId) + ) +/* + * Organisation filtering is schema-specific. Once stable joins are known, add: + * and (:rootOrganisationId is null or ...) + * using :includeChildren to decide whether to match only root or descendants. + */ diff --git a/src/main/resources/sql/tachograph/vu-activity.sql b/src/main/resources/sql/tachograph/vu-activity.sql new file mode 100644 index 0000000..d32d391 --- /dev/null +++ b/src/main/resources/sql/tachograph/vu-activity.sql @@ -0,0 +1,50 @@ +/* + * VUActivity DRIVER_ACTIVITY extraction. + * + * Adapt table and column names to the concrete tachograph SQL Server schema. + * Keep the selected aliases stable; VuActivityRowMapper consumes this alias contract. + */ +select + cast(va.Id as varchar(128)) as source_row_id, + cast(va.Id as varchar(128)) as vu_activity_id, + concat('TACHOGRAPH:VU_ACTIVITY:', va.Id) as external_source_event_id, + + va.ActivityTime as occurred_at, + va.ReceivedAt as received_partner_at, + va.Activity as activity_code, + va.ActivityText as activity_text, + va.EventType as event_type, + va.Lifecycle as lifecycle, + va.CardSlot as card_slot, + va.CardStatus as card_status, + va.DrivingStatus as driving_status, + va.OdometerM as odometer_m, + + cast(va.DriverId as varchar(128)) as driver_source_entity_id, + va.DriverCardNation as driver_card_nation, + va.DriverCardNumber as driver_card_number, + + cast(va.VehicleId as varchar(128)) as vehicle_source_entity_id, + va.VehicleVin as vehicle_vin, + va.VehicleRegistrationNation as vehicle_registration_nation, + va.VehicleRegistrationNumber as vehicle_registration_number, + + 'VEHICLE_UNIT' as source_package_kind, + cast(va.SourcePackageId as varchar(128)) as source_package_id, + cast(va.VehicleId as varchar(128)) as source_package_entity_id, + va.SourcePackagePeriodFrom as source_package_period_from, + va.SourcePackagePeriodTo as source_package_period_to, + va.SourcePackageImportedAt as source_package_imported_at +from VUActivity va +where (:occurredFrom is null or va.ActivityTime >= :occurredFrom) + and (:occurredTo is null or va.ActivityTime < :occurredTo) + and ( + :lastSourcePackageImportedAt is null + or va.SourcePackageImportedAt > :lastSourcePackageImportedAt + or (va.SourcePackageImportedAt = :lastSourcePackageImportedAt and cast(va.SourcePackageId as varchar(128)) > :lastSourcePackageId) + ) +/* + * Organisation filtering is schema-specific. Once stable joins are known, add: + * and (:rootOrganisationId is null or ...) + * using :includeChildren to decide whether to match only root or descendants. + */