Add JDBC tachograph activity extraction
This commit is contained in:
parent
21a4fe12fb
commit
29ba656ed2
20
README.md
20
README.md
|
|
@ -806,3 +806,23 @@ Replace `NoopTachographExtractionBatchExecutor` with an implementation that:
|
|||
```
|
||||
|
||||
The import cursor is advanced only when the executor reports `executed=true`. The default no-op executor returns `executed=false`, so it does not move cursors accidentally.
|
||||
|
||||
## JDBC tachograph extraction
|
||||
|
||||
The first concrete extractor is `JdbcTachographExtractionBatchExecutor`. It is enabled only when `eventhub.tachograph.datasource.jdbc-url` is configured. Without that datasource, the application keeps using `NoopTachographExtractionBatchExecutor`.
|
||||
|
||||
Currently implemented extraction definitions:
|
||||
|
||||
```text
|
||||
CARD_ACTIVITY -> DRIVER_ACTIVITY / DRIVER_CARD / CardActivity
|
||||
VU_ACTIVITY -> DRIVER_ACTIVITY / VEHICLE_UNIT / VUActivity
|
||||
```
|
||||
|
||||
SQL resources:
|
||||
|
||||
```text
|
||||
src/main/resources/sql/tachograph/card-activity.sql
|
||||
src/main/resources/sql/tachograph/vu-activity.sql
|
||||
```
|
||||
|
||||
These files are the schema-specific layer. They must keep the documented column aliases because the row mappers consume those aliases to build `EventHubEventDto`, including `sourcePackageRef`, driver/vehicle references, event details and raw payload.
|
||||
|
|
|
|||
5
pom.xml
5
pom.xml
|
|
@ -82,6 +82,11 @@
|
|||
<artifactId>postgresql</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.microsoft.sqlserver</groupId>
|
||||
<artifactId>mssql-jdbc</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
|
|
|
|||
|
|
@ -72,6 +72,9 @@ public class EventHubProperties {
|
|||
/** Configured tenant/source import plans. */
|
||||
private List<ConfiguredImportPlan> importPlans = new ArrayList<>();
|
||||
|
||||
/** Optional external tachograph DB datasource. If absent, the no-op extractor is used. */
|
||||
private TachographDataSource datasource = new TachographDataSource();
|
||||
|
||||
public int getDefaultChunkDays() {
|
||||
return defaultChunkDays;
|
||||
}
|
||||
|
|
@ -119,6 +122,53 @@ public class EventHubProperties {
|
|||
public void setImportPlans(List<ConfiguredImportPlan> importPlans) {
|
||||
this.importPlans = importPlans == null ? new ArrayList<>() : importPlans;
|
||||
}
|
||||
|
||||
public TachographDataSource getDatasource() {
|
||||
return datasource;
|
||||
}
|
||||
|
||||
public void setDatasource(TachographDataSource datasource) {
|
||||
this.datasource = datasource == null ? new TachographDataSource() : datasource;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TachographDataSource {
|
||||
private String jdbcUrl;
|
||||
private String username;
|
||||
private String password;
|
||||
private String driverClassName;
|
||||
|
||||
public String getJdbcUrl() {
|
||||
return jdbcUrl;
|
||||
}
|
||||
|
||||
public void setJdbcUrl(String jdbcUrl) {
|
||||
this.jdbcUrl = jdbcUrl;
|
||||
}
|
||||
|
||||
public String getUsername() {
|
||||
return username;
|
||||
}
|
||||
|
||||
public void setUsername(String username) {
|
||||
this.username = username;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public String getDriverClassName() {
|
||||
return driverClassName;
|
||||
}
|
||||
|
||||
public void setDriverClassName(String driverClassName) {
|
||||
this.driverClassName = driverClassName;
|
||||
}
|
||||
}
|
||||
|
||||
public static class ConfiguredImportPlan {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,34 @@
|
|||
package at.procon.eventhub.config;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||
import org.springframework.jdbc.datasource.DriverManagerDataSource;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''")
|
||||
public class TachographDataSourceConfig {
|
||||
|
||||
@Bean
|
||||
public DataSource tachographDataSource(EventHubProperties properties) {
|
||||
EventHubProperties.TachographDataSource config = properties.getTachograph().getDatasource();
|
||||
DriverManagerDataSource dataSource = new DriverManagerDataSource();
|
||||
dataSource.setUrl(config.getJdbcUrl());
|
||||
dataSource.setUsername(config.getUsername());
|
||||
dataSource.setPassword(config.getPassword());
|
||||
if (config.getDriverClassName() != null && !config.getDriverClassName().isBlank()) {
|
||||
dataSource.setDriverClassName(config.getDriverClassName());
|
||||
}
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public NamedParameterJdbcTemplate tachographNamedParameterJdbcTemplate(
|
||||
@Qualifier("tachographDataSource") DataSource tachographDataSource
|
||||
) {
|
||||
return new NamedParameterJdbcTemplate(tachographDataSource);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
package at.procon.eventhub.dto;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
|
||||
public record ImportCursorStateDto(
|
||||
OffsetDateTime lastSourcePackageImportedAt,
|
||||
String lastSourcePackageId,
|
||||
OffsetDateTime lastSourceRowUpdatedAt,
|
||||
OffsetDateTime lastOccurredTo
|
||||
) {
|
||||
}
|
||||
|
|
@ -2,6 +2,7 @@ package at.procon.eventhub.persistence;
|
|||
|
||||
import at.procon.eventhub.dto.AcquisitionStrategy;
|
||||
import at.procon.eventhub.dto.EventFamily;
|
||||
import at.procon.eventhub.dto.ImportCursorStateDto;
|
||||
import at.procon.eventhub.dto.TachographExtractionBatchResultDto;
|
||||
import java.util.UUID;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
|
@ -16,6 +17,45 @@ public class ImportCursorRepository {
|
|||
this.jdbcTemplate = jdbcTemplate;
|
||||
}
|
||||
|
||||
public ImportCursorStateDto findCursor(
|
||||
String tenantKey,
|
||||
int eventSourceId,
|
||||
String scopeHash,
|
||||
EventFamily eventFamily,
|
||||
String sourceKind,
|
||||
AcquisitionStrategy strategy
|
||||
) {
|
||||
return jdbcTemplate.query(
|
||||
"""
|
||||
select last_source_package_imported_at,
|
||||
last_source_package_id,
|
||||
last_source_row_updated_at,
|
||||
last_occurred_to
|
||||
from eventhub.import_cursor
|
||||
where tenant_key = ?
|
||||
and event_source_id = ?
|
||||
and scope_hash = ?
|
||||
and event_family = ?
|
||||
and source_kind = ?
|
||||
and cursor_type = ?
|
||||
""",
|
||||
rs -> rs.next()
|
||||
? new ImportCursorStateDto(
|
||||
rs.getObject("last_source_package_imported_at", java.time.OffsetDateTime.class),
|
||||
rs.getString("last_source_package_id"),
|
||||
rs.getObject("last_source_row_updated_at", java.time.OffsetDateTime.class),
|
||||
rs.getObject("last_occurred_to", java.time.OffsetDateTime.class)
|
||||
)
|
||||
: null,
|
||||
tenantKey,
|
||||
eventSourceId,
|
||||
scopeHash,
|
||||
eventFamily.name(),
|
||||
sourceKind,
|
||||
strategy.name()
|
||||
);
|
||||
}
|
||||
|
||||
public void advanceCursor(
|
||||
String tenantKey,
|
||||
int eventSourceId,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,293 @@
|
|||
package at.procon.eventhub.service;
|
||||
|
||||
import at.procon.eventhub.dto.CardSlot;
|
||||
import at.procon.eventhub.dto.CardStatus;
|
||||
import at.procon.eventhub.dto.DriverCardRefDto;
|
||||
import at.procon.eventhub.dto.DriverRefDto;
|
||||
import at.procon.eventhub.dto.DrivingStatus;
|
||||
import at.procon.eventhub.dto.EventDomain;
|
||||
import at.procon.eventhub.dto.EventHubEventDto;
|
||||
import at.procon.eventhub.dto.EventLifecycle;
|
||||
import at.procon.eventhub.dto.EventType;
|
||||
import at.procon.eventhub.dto.SourcePackageRefDto;
|
||||
import at.procon.eventhub.dto.VehicleRefDto;
|
||||
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
abstract class AbstractTachographActivityRowMapper implements TachographExtractionRowMapper {
|
||||
|
||||
private final EventDetailsFactory detailsFactory;
|
||||
|
||||
protected AbstractTachographActivityRowMapper(EventDetailsFactory detailsFactory) {
|
||||
this.detailsFactory = detailsFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventHubEventDto map(ResultSet rs, int rowNum, TachographExtractionContext context) throws SQLException {
|
||||
OffsetDateTime occurredAt = offsetDateTime(rs, "occurred_at");
|
||||
SourcePackageRefDto sourcePackageRef = sourcePackageRef(rs);
|
||||
DriverRefDto driverRef = driverRef(rs);
|
||||
VehicleRefDto vehicleRef = vehicleRef(rs);
|
||||
|
||||
String externalSourceEventId = string(rs, "external_source_event_id");
|
||||
if (externalSourceEventId == null) {
|
||||
externalSourceEventId = defaultExternalSourceEventId(context, rowNum, occurredAt, sourcePackageRef, driverRef, vehicleRef);
|
||||
}
|
||||
|
||||
return new EventHubEventDto(
|
||||
UUID.randomUUID(),
|
||||
externalSourceEventId,
|
||||
driverRef,
|
||||
vehicleRef,
|
||||
occurredAt,
|
||||
offsetDateTime(rs, "received_partner_at"),
|
||||
OffsetDateTime.now(),
|
||||
EventDomain.DRIVER_ACTIVITY,
|
||||
eventType(rs),
|
||||
lifecycle(rs),
|
||||
longValue(rs, "odometer_m"),
|
||||
null,
|
||||
detailsFactory.driverActivity(cardSlot(rs), cardStatus(rs), drivingStatus(rs)),
|
||||
sourcePackageRef != null && sourcePackageRef.hasAnyReference() ? sourcePackageRef : null,
|
||||
detailsFactory.payloadFromMap(payload(rs, context)),
|
||||
false,
|
||||
context.packageInfo()
|
||||
);
|
||||
}
|
||||
|
||||
protected abstract Map<String, Object> sourceSpecificPayload(ResultSet rs) throws SQLException;
|
||||
|
||||
private DriverRefDto driverRef(ResultSet rs) throws SQLException {
|
||||
DriverCardRefDto driverCard = null;
|
||||
String cardNumber = string(rs, "driver_card_number");
|
||||
if (cardNumber != null) {
|
||||
driverCard = new DriverCardRefDto(string(rs, "driver_card_nation"), cardNumber);
|
||||
}
|
||||
DriverRefDto driverRef = new DriverRefDto(string(rs, "driver_source_entity_id"), driverCard);
|
||||
return driverRef.hasAnyReference() ? driverRef : null;
|
||||
}
|
||||
|
||||
private VehicleRefDto vehicleRef(ResultSet rs) throws SQLException {
|
||||
VehicleRegistrationRefDto registration = null;
|
||||
String registrationNumber = string(rs, "vehicle_registration_number");
|
||||
if (registrationNumber != null) {
|
||||
registration = new VehicleRegistrationRefDto(string(rs, "vehicle_registration_nation"), registrationNumber);
|
||||
}
|
||||
VehicleRefDto vehicleRef = new VehicleRefDto(string(rs, "vehicle_source_entity_id"), string(rs, "vehicle_vin"), registration);
|
||||
return vehicleRef.hasAnyReference() ? vehicleRef : null;
|
||||
}
|
||||
|
||||
private SourcePackageRefDto sourcePackageRef(ResultSet rs) throws SQLException {
|
||||
return new SourcePackageRefDto(
|
||||
string(rs, "source_package_kind"),
|
||||
string(rs, "source_package_id"),
|
||||
string(rs, "source_package_entity_id"),
|
||||
offsetDateTime(rs, "source_package_period_from"),
|
||||
offsetDateTime(rs, "source_package_period_to"),
|
||||
offsetDateTime(rs, "source_package_imported_at")
|
||||
);
|
||||
}
|
||||
|
||||
private Map<String, Object> payload(ResultSet rs, TachographExtractionContext context) throws SQLException {
|
||||
Map<String, Object> raw = new LinkedHashMap<>();
|
||||
raw.put("extractionCode", context.planItem().extractionCode());
|
||||
raw.put("sourceKind", context.planItem().sourceKind());
|
||||
raw.put("sourceTables", context.planItem().sourceTables());
|
||||
put(raw, "sourceRowId", string(rs, "source_row_id"));
|
||||
put(raw, "activityCode", object(rs, "activity_code"));
|
||||
put(raw, "activityText", string(rs, "activity_text"));
|
||||
put(raw, "eventType", string(rs, "event_type"));
|
||||
put(raw, "lifecycle", string(rs, "lifecycle"));
|
||||
put(raw, "cardSlot", object(rs, "card_slot"));
|
||||
put(raw, "cardStatus", object(rs, "card_status"));
|
||||
put(raw, "drivingStatus", object(rs, "driving_status"));
|
||||
put(raw, "driverSourceEntityId", string(rs, "driver_source_entity_id"));
|
||||
put(raw, "driverCardNation", string(rs, "driver_card_nation"));
|
||||
put(raw, "driverCardNumber", string(rs, "driver_card_number"));
|
||||
put(raw, "vehicleSourceEntityId", string(rs, "vehicle_source_entity_id"));
|
||||
put(raw, "vehicleVin", string(rs, "vehicle_vin"));
|
||||
put(raw, "vehicleRegistrationNation", string(rs, "vehicle_registration_nation"));
|
||||
put(raw, "vehicleRegistrationNumber", string(rs, "vehicle_registration_number"));
|
||||
raw.putAll(sourceSpecificPayload(rs));
|
||||
return Map.of("raw", raw);
|
||||
}
|
||||
|
||||
private String defaultExternalSourceEventId(
|
||||
TachographExtractionContext context,
|
||||
int rowNum,
|
||||
OffsetDateTime occurredAt,
|
||||
SourcePackageRefDto sourcePackageRef,
|
||||
DriverRefDto driverRef,
|
||||
VehicleRefDto vehicleRef
|
||||
) {
|
||||
String sourcePackageId = sourcePackageRef == null || sourcePackageRef.sourcePackageId() == null
|
||||
? "NO_SOURCE_PACKAGE"
|
||||
: sourcePackageRef.sourcePackageId();
|
||||
String subject = driverRef != null && driverRef.hasAnyReference()
|
||||
? driverRef.stableKey()
|
||||
: vehicleRef == null ? "NO_SUBJECT" : vehicleRef.stableKey();
|
||||
return "TACHOGRAPH:" + context.planItem().extractionCode()
|
||||
+ ":" + sourcePackageId
|
||||
+ ":" + occurredAt
|
||||
+ ":" + subject
|
||||
+ ":ROW-" + rowNum;
|
||||
}
|
||||
|
||||
protected String string(ResultSet rs, String column) throws SQLException {
|
||||
String value = rs.getString(column);
|
||||
return value == null || value.isBlank() ? null : value.trim();
|
||||
}
|
||||
|
||||
protected Object object(ResultSet rs, String column) throws SQLException {
|
||||
return rs.getObject(column);
|
||||
}
|
||||
|
||||
protected OffsetDateTime offsetDateTime(ResultSet rs, String column) throws SQLException {
|
||||
Object value = rs.getObject(column);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof OffsetDateTime offsetDateTime) {
|
||||
return offsetDateTime;
|
||||
}
|
||||
if (value instanceof Timestamp timestamp) {
|
||||
return timestamp.toInstant().atOffset(ZoneOffset.UTC);
|
||||
}
|
||||
if (value instanceof java.time.LocalDateTime localDateTime) {
|
||||
return localDateTime.atOffset(ZoneOffset.UTC);
|
||||
}
|
||||
return OffsetDateTime.parse(value.toString());
|
||||
}
|
||||
|
||||
private Long longValue(ResultSet rs, String column) throws SQLException {
|
||||
Object value = rs.getObject(column);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof Number number) {
|
||||
return number.longValue();
|
||||
}
|
||||
return Long.parseLong(value.toString());
|
||||
}
|
||||
|
||||
private EventType eventType(ResultSet rs) throws SQLException {
|
||||
String eventType = string(rs, "event_type");
|
||||
if (eventType != null) {
|
||||
return parseEnum(EventType.class, eventType, EventType.UNKNOWN_ACTIVITY);
|
||||
}
|
||||
return switch (integer(rs, "activity_code", -1)) {
|
||||
case 0 -> EventType.BREAK_REST;
|
||||
case 1 -> EventType.AVAILABILITY;
|
||||
case 2 -> EventType.WORK;
|
||||
case 3 -> EventType.DRIVE;
|
||||
default -> EventType.UNKNOWN_ACTIVITY;
|
||||
};
|
||||
}
|
||||
|
||||
private EventLifecycle lifecycle(ResultSet rs) throws SQLException {
|
||||
return parseEnum(EventLifecycle.class, string(rs, "lifecycle"), EventLifecycle.SNAPSHOT);
|
||||
}
|
||||
|
||||
private CardSlot cardSlot(ResultSet rs) throws SQLException {
|
||||
Object value = object(rs, "card_slot");
|
||||
if (value instanceof Number number) {
|
||||
return switch (number.intValue()) {
|
||||
case 0 -> CardSlot.DRIVER;
|
||||
case 1 -> CardSlot.CO_DRIVER;
|
||||
default -> null;
|
||||
};
|
||||
}
|
||||
String text = value == null ? null : value.toString();
|
||||
Integer parsed = parseInteger(text);
|
||||
if (parsed != null) {
|
||||
return parsed == 0 ? CardSlot.DRIVER : parsed == 1 ? CardSlot.CO_DRIVER : null;
|
||||
}
|
||||
return parseEnum(CardSlot.class, text, null);
|
||||
}
|
||||
|
||||
private CardStatus cardStatus(ResultSet rs) throws SQLException {
|
||||
Object value = object(rs, "card_status");
|
||||
if (value instanceof Number number) {
|
||||
return switch (number.intValue()) {
|
||||
case 0 -> CardStatus.INSERTED;
|
||||
case 1 -> CardStatus.NOT_INSERTED;
|
||||
default -> null;
|
||||
};
|
||||
}
|
||||
String text = value == null ? null : value.toString();
|
||||
Integer parsed = parseInteger(text);
|
||||
if (parsed != null) {
|
||||
return parsed == 0 ? CardStatus.INSERTED : parsed == 1 ? CardStatus.NOT_INSERTED : null;
|
||||
}
|
||||
return parseEnum(CardStatus.class, text, null);
|
||||
}
|
||||
|
||||
private DrivingStatus drivingStatus(ResultSet rs) throws SQLException {
|
||||
Object value = object(rs, "driving_status");
|
||||
if (value instanceof Number number) {
|
||||
return switch (number.intValue()) {
|
||||
case 0 -> DrivingStatus.SINGLE;
|
||||
case 1 -> DrivingStatus.CREW;
|
||||
default -> DrivingStatus.UNKNOWN;
|
||||
};
|
||||
}
|
||||
String text = value == null ? null : value.toString();
|
||||
Integer parsed = parseInteger(text);
|
||||
if (parsed != null) {
|
||||
return parsed == 0 ? DrivingStatus.SINGLE : parsed == 1 ? DrivingStatus.CREW : DrivingStatus.UNKNOWN;
|
||||
}
|
||||
return parseEnum(DrivingStatus.class, text, DrivingStatus.UNKNOWN);
|
||||
}
|
||||
|
||||
private int integer(ResultSet rs, String column, int fallback) throws SQLException {
|
||||
Object value = rs.getObject(column);
|
||||
if (value == null) {
|
||||
return fallback;
|
||||
}
|
||||
if (value instanceof Number number) {
|
||||
return number.intValue();
|
||||
}
|
||||
try {
|
||||
return Integer.parseInt(value.toString());
|
||||
} catch (NumberFormatException ignored) {
|
||||
return fallback;
|
||||
}
|
||||
}
|
||||
|
||||
private <T extends Enum<T>> T parseEnum(Class<T> type, String value, T fallback) {
|
||||
if (value == null) {
|
||||
return fallback;
|
||||
}
|
||||
String normalized = value.trim().toUpperCase(Locale.ROOT).replace('-', '_').replace(' ', '_');
|
||||
try {
|
||||
return Enum.valueOf(type, normalized);
|
||||
} catch (IllegalArgumentException ignored) {
|
||||
return fallback;
|
||||
}
|
||||
}
|
||||
|
||||
private Integer parseInteger(String value) {
|
||||
if (value == null || value.isBlank()) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return Integer.parseInt(value.trim());
|
||||
} catch (NumberFormatException ignored) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
protected void put(Map<String, Object> target, String key, Object value) {
|
||||
if (value != null) {
|
||||
target.put(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
package at.procon.eventhub.service;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class CardActivityRowMapper extends AbstractTachographActivityRowMapper {
|
||||
|
||||
public CardActivityRowMapper(EventDetailsFactory detailsFactory) {
|
||||
super(detailsFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> sourceSpecificPayload(ResultSet rs) throws SQLException {
|
||||
Map<String, Object> raw = new LinkedHashMap<>();
|
||||
put(raw, "cardActivityId", string(rs, "card_activity_id"));
|
||||
return raw;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,177 @@
|
|||
package at.procon.eventhub.service;
|
||||
|
||||
import at.procon.eventhub.dto.ImportCursorStateDto;
|
||||
import at.procon.eventhub.dto.ImportScopeDto;
|
||||
import at.procon.eventhub.dto.TachographExtractionBatchResultDto;
|
||||
import at.procon.eventhub.dto.TachographImportPlanItemDto;
|
||||
import at.procon.eventhub.dto.TachographImportRequest;
|
||||
import at.procon.eventhub.dto.TimeChunkDto;
|
||||
import at.procon.eventhub.persistence.ImportCursorRepository;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import org.apache.camel.ProducerTemplate;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.core.io.ResourceLoader;
|
||||
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StreamUtils;
|
||||
|
||||
@Service
|
||||
@ConditionalOnBean(name = "tachographNamedParameterJdbcTemplate")
|
||||
@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''")
|
||||
public class JdbcTachographExtractionBatchExecutor implements TachographExtractionBatchExecutor {
|
||||
|
||||
private final NamedParameterJdbcTemplate tachographJdbcTemplate;
|
||||
private final ProducerTemplate producerTemplate;
|
||||
private final ResourceLoader resourceLoader;
|
||||
private final TachographExtractionDefinitionRegistry definitionRegistry;
|
||||
private final ImportCursorRepository importCursorRepository;
|
||||
|
||||
public JdbcTachographExtractionBatchExecutor(
|
||||
@Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate tachographJdbcTemplate,
|
||||
ProducerTemplate producerTemplate,
|
||||
ResourceLoader resourceLoader,
|
||||
TachographExtractionDefinitionRegistry definitionRegistry,
|
||||
ImportCursorRepository importCursorRepository
|
||||
) {
|
||||
this.tachographJdbcTemplate = tachographJdbcTemplate;
|
||||
this.producerTemplate = producerTemplate;
|
||||
this.resourceLoader = resourceLoader;
|
||||
this.definitionRegistry = definitionRegistry;
|
||||
this.importCursorRepository = importCursorRepository;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TachographExtractionBatchResultDto execute(
|
||||
UUID importRunId,
|
||||
UUID packageId,
|
||||
int eventSourceId,
|
||||
TachographImportRequest request,
|
||||
TachographImportPlanItemDto planItem,
|
||||
TimeChunkDto chunk
|
||||
) {
|
||||
TachographExtractionDefinition definition = definitionRegistry.findByCode(planItem.extractionCode())
|
||||
.orElseThrow(() -> new IllegalArgumentException("No tachograph extraction definition for " + planItem.extractionCode()));
|
||||
|
||||
ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk);
|
||||
var packageInfo = new at.procon.eventhub.dto.EventHubPackageRequest(
|
||||
request.tenantKey(),
|
||||
eventSourceFor(request, planItem),
|
||||
request.sourceGroup(),
|
||||
chunkScope,
|
||||
planItem.eventFamily().name(),
|
||||
chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(),
|
||||
"TACHOGRAPH:" + planItem.sourceKind() + ":" + planItem.extractionCode() + ":RUN-" + importRunId + ":CHUNK-" + chunk.sequence()
|
||||
);
|
||||
TachographExtractionContext context = new TachographExtractionContext(
|
||||
importRunId,
|
||||
packageId,
|
||||
eventSourceId,
|
||||
request,
|
||||
planItem,
|
||||
chunk,
|
||||
packageInfo.eventSource(),
|
||||
packageInfo
|
||||
);
|
||||
|
||||
String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey();
|
||||
ImportCursorStateDto cursor = importCursorRepository.findCursor(
|
||||
request.tenantKey(),
|
||||
eventSourceId,
|
||||
scopeHash,
|
||||
planItem.eventFamily(),
|
||||
planItem.sourceKind(),
|
||||
request.acquisitionStrategy()
|
||||
);
|
||||
|
||||
Map<String, Object> params = parameters(request, chunkScope, cursor);
|
||||
String sql = loadSql(definition.sqlResource());
|
||||
var events = tachographJdbcTemplate.query(sql, params, (rs, rowNum) -> definition.rowMapper().map(rs, rowNum, context));
|
||||
events.forEach(event -> producerTemplate.sendBody("direct:eventhub-normalized-input", event));
|
||||
|
||||
OffsetDateTime lastSourcePackageImportedAt = events.stream()
|
||||
.map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().importedIntoSourceAt())
|
||||
.filter(value -> value != null)
|
||||
.max(OffsetDateTime::compareTo)
|
||||
.orElse(cursor == null ? null : cursor.lastSourcePackageImportedAt());
|
||||
String lastSourcePackageId = events.stream()
|
||||
.filter(event -> event.sourcePackageRef() != null && event.sourcePackageRef().importedIntoSourceAt() != null)
|
||||
.max((left, right) -> left.sourcePackageRef().importedIntoSourceAt().compareTo(right.sourcePackageRef().importedIntoSourceAt()))
|
||||
.map(event -> event.sourcePackageRef().sourcePackageId())
|
||||
.orElse(cursor == null ? null : cursor.lastSourcePackageId());
|
||||
|
||||
return new TachographExtractionBatchResultDto(
|
||||
packageId,
|
||||
planItem.extractionCode(),
|
||||
planItem.sourceKind(),
|
||||
events.size(),
|
||||
events.size(),
|
||||
events.size(),
|
||||
0,
|
||||
true,
|
||||
lastSourcePackageImportedAt,
|
||||
lastSourcePackageId,
|
||||
null,
|
||||
chunk.occurredTo()
|
||||
);
|
||||
}
|
||||
|
||||
private Map<String, Object> parameters(TachographImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put("tenantKey", request.tenantKey());
|
||||
params.put("occurredFrom", scope == null ? null : scope.occurredFrom());
|
||||
params.put("occurredTo", scope == null ? null : scope.occurredTo());
|
||||
params.put("rootOrganisationId", scope == null || scope.rootSourceOrganisation() == null ? null : scope.rootSourceOrganisation().sourceEntityId());
|
||||
params.put("includeChildren", scope != null && scope.includeChildren());
|
||||
params.put("lastSourcePackageImportedAt", cursor == null ? null : cursor.lastSourcePackageImportedAt());
|
||||
params.put("lastSourcePackageId", cursor == null ? null : cursor.lastSourcePackageId());
|
||||
params.put("lastSourceRowUpdatedAt", cursor == null ? null : cursor.lastSourceRowUpdatedAt());
|
||||
params.put("lastOccurredTo", cursor == null ? null : cursor.lastOccurredTo());
|
||||
return params;
|
||||
}
|
||||
|
||||
private ImportScopeDto chunkScope(ImportScopeDto scope, TimeChunkDto chunk) {
|
||||
if (scope == null) {
|
||||
return ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo());
|
||||
}
|
||||
return new ImportScopeDto(
|
||||
scope.type(),
|
||||
scope.rootSourceOrganisation(),
|
||||
scope.includeChildren(),
|
||||
chunk.occurredFrom(),
|
||||
chunk.occurredTo()
|
||||
);
|
||||
}
|
||||
|
||||
private at.procon.eventhub.dto.EventSourceDto eventSourceFor(TachographImportRequest request, TachographImportPlanItemDto planItem) {
|
||||
String sourceKey = switch (planItem.sourceKind()) {
|
||||
case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT";
|
||||
case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD";
|
||||
default -> request.eventSource().sourceKey();
|
||||
};
|
||||
return new at.procon.eventhub.dto.EventSourceDto(
|
||||
request.eventSource().providerKey(),
|
||||
planItem.sourceKind(),
|
||||
sourceKey,
|
||||
request.eventSource().sourceInstanceKey(),
|
||||
request.eventSource().tenantProviderSettingKey(),
|
||||
request.eventSource().externalFleetKey()
|
||||
);
|
||||
}
|
||||
|
||||
private String loadSql(String location) {
|
||||
Resource resource = resourceLoader.getResource(location);
|
||||
try (var inputStream = resource.getInputStream()) {
|
||||
return StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8);
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Cannot load tachograph extraction SQL resource " + location, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -5,6 +5,8 @@ import at.procon.eventhub.dto.TachographImportPlanItemDto;
|
|||
import at.procon.eventhub.dto.TachographImportRequest;
|
||||
import at.procon.eventhub.dto.TimeChunkDto;
|
||||
import java.util.UUID;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
|
@ -17,6 +19,8 @@ import org.springframework.stereotype.Service;
|
|||
* direct:eventhub-normalized-input.
|
||||
*/
|
||||
@Service
|
||||
@ConditionalOnMissingBean(TachographExtractionBatchExecutor.class)
|
||||
@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' == ''")
|
||||
public class NoopTachographExtractionBatchExecutor implements TachographExtractionBatchExecutor {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(NoopTachographExtractionBatchExecutor.class);
|
||||
|
|
@ -25,6 +29,7 @@ public class NoopTachographExtractionBatchExecutor implements TachographExtracti
|
|||
public TachographExtractionBatchResultDto execute(
|
||||
UUID importRunId,
|
||||
UUID packageId,
|
||||
int eventSourceId,
|
||||
TachographImportRequest request,
|
||||
TachographImportPlanItemDto planItem,
|
||||
TimeChunkDto chunk
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ public interface TachographExtractionBatchExecutor {
|
|||
TachographExtractionBatchResultDto execute(
|
||||
UUID importRunId,
|
||||
UUID packageId,
|
||||
int eventSourceId,
|
||||
TachographImportRequest request,
|
||||
TachographImportPlanItemDto planItem,
|
||||
TimeChunkDto chunk
|
||||
|
|
|
|||
|
|
@ -0,0 +1,20 @@
|
|||
package at.procon.eventhub.service;
|
||||
|
||||
import at.procon.eventhub.dto.EventHubPackageRequest;
|
||||
import at.procon.eventhub.dto.EventSourceDto;
|
||||
import at.procon.eventhub.dto.TachographImportPlanItemDto;
|
||||
import at.procon.eventhub.dto.TachographImportRequest;
|
||||
import at.procon.eventhub.dto.TimeChunkDto;
|
||||
import java.util.UUID;
|
||||
|
||||
public record TachographExtractionContext(
|
||||
UUID importRunId,
|
||||
UUID packageId,
|
||||
int eventSourceId,
|
||||
TachographImportRequest request,
|
||||
TachographImportPlanItemDto planItem,
|
||||
TimeChunkDto chunk,
|
||||
EventSourceDto eventSource,
|
||||
EventHubPackageRequest packageInfo
|
||||
) {
|
||||
}
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
package at.procon.eventhub.service;
|
||||
|
||||
import at.procon.eventhub.dto.EventFamily;
|
||||
|
||||
public record TachographExtractionDefinition(
|
||||
String code,
|
||||
EventFamily eventFamily,
|
||||
String sourceKind,
|
||||
String entityAxis,
|
||||
String sqlResource,
|
||||
TachographExtractionRowMapper rowMapper
|
||||
) {
|
||||
}
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
package at.procon.eventhub.service;
|
||||
|
||||
import at.procon.eventhub.dto.EventFamily;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class TachographExtractionDefinitionRegistry {
|
||||
|
||||
private final Map<String, TachographExtractionDefinition> definitionsByCode;
|
||||
|
||||
public TachographExtractionDefinitionRegistry(
|
||||
CardActivityRowMapper cardActivityRowMapper,
|
||||
VuActivityRowMapper vuActivityRowMapper
|
||||
) {
|
||||
List<TachographExtractionDefinition> definitions = List.of(
|
||||
new TachographExtractionDefinition(
|
||||
"CARD_ACTIVITY",
|
||||
EventFamily.DRIVER_ACTIVITY,
|
||||
"DRIVER_CARD",
|
||||
"DRIVER",
|
||||
"classpath:sql/tachograph/card-activity.sql",
|
||||
cardActivityRowMapper
|
||||
),
|
||||
new TachographExtractionDefinition(
|
||||
"VU_ACTIVITY",
|
||||
EventFamily.DRIVER_ACTIVITY,
|
||||
"VEHICLE_UNIT",
|
||||
"VEHICLE",
|
||||
"classpath:sql/tachograph/vu-activity.sql",
|
||||
vuActivityRowMapper
|
||||
)
|
||||
);
|
||||
this.definitionsByCode = definitions.stream()
|
||||
.collect(Collectors.toUnmodifiableMap(definition -> normalize(definition.code()), Function.identity()));
|
||||
}
|
||||
|
||||
public Optional<TachographExtractionDefinition> findByCode(String code) {
|
||||
return Optional.ofNullable(definitionsByCode.get(normalize(code)));
|
||||
}
|
||||
|
||||
private String normalize(String value) {
|
||||
return value == null ? "" : value.trim().toUpperCase(Locale.ROOT);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
package at.procon.eventhub.service;
|
||||
|
||||
import at.procon.eventhub.dto.EventHubEventDto;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
|
||||
public interface TachographExtractionRowMapper {
|
||||
|
||||
EventHubEventDto map(ResultSet rs, int rowNum, TachographExtractionContext context) throws SQLException;
|
||||
}
|
||||
|
|
@ -132,6 +132,7 @@ public class TachographImportExecutionService {
|
|||
TachographExtractionBatchResultDto result = extractionBatchExecutor.execute(
|
||||
importRunId,
|
||||
plannedPackage.packageId(),
|
||||
plannedPackage.eventSourceId(),
|
||||
request,
|
||||
plannedPackage.planItem(),
|
||||
plannedPackage.chunk()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,22 @@
|
|||
package at.procon.eventhub.service;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class VuActivityRowMapper extends AbstractTachographActivityRowMapper {
|
||||
|
||||
public VuActivityRowMapper(EventDetailsFactory detailsFactory) {
|
||||
super(detailsFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> sourceSpecificPayload(ResultSet rs) throws SQLException {
|
||||
Map<String, Object> raw = new LinkedHashMap<>();
|
||||
put(raw, "vuActivityId", string(rs, "vu_activity_id"));
|
||||
return raw;
|
||||
}
|
||||
}
|
||||
|
|
@ -33,6 +33,13 @@ eventhub:
|
|||
default-chunk-days: 1
|
||||
occurred-at-overlap: 7d
|
||||
|
||||
# Configure this block to enable JdbcTachographExtractionBatchExecutor.
|
||||
# datasource:
|
||||
# jdbc-url: jdbc:sqlserver://localhost:1433;databaseName=tachograph;encrypt=true;trustServerCertificate=true
|
||||
# username: tachograph_user
|
||||
# password: change-me
|
||||
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
|
||||
|
||||
# Enables the scheduler that regularly triggers configured tachograph import plans.
|
||||
scheduler-enabled: false
|
||||
scheduler-poll-interval-ms: 60000
|
||||
|
|
|
|||
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* CardActivity DRIVER_ACTIVITY extraction.
|
||||
*
|
||||
* Adapt table and column names to the concrete tachograph SQL Server schema.
|
||||
* Keep the selected aliases stable; CardActivityRowMapper consumes this alias contract.
|
||||
*/
|
||||
select
|
||||
cast(ca.Id as varchar(128)) as source_row_id,
|
||||
cast(ca.Id as varchar(128)) as card_activity_id,
|
||||
concat('TACHOGRAPH:CARD_ACTIVITY:', ca.Id) as external_source_event_id,
|
||||
|
||||
ca.ActivityTime as occurred_at,
|
||||
ca.ReceivedAt as received_partner_at,
|
||||
ca.Activity as activity_code,
|
||||
ca.ActivityText as activity_text,
|
||||
ca.EventType as event_type,
|
||||
ca.Lifecycle as lifecycle,
|
||||
ca.CardSlot as card_slot,
|
||||
ca.CardStatus as card_status,
|
||||
ca.DrivingStatus as driving_status,
|
||||
ca.OdometerM as odometer_m,
|
||||
|
||||
cast(ca.DriverId as varchar(128)) as driver_source_entity_id,
|
||||
ca.DriverCardNation as driver_card_nation,
|
||||
ca.DriverCardNumber as driver_card_number,
|
||||
|
||||
cast(ca.VehicleId as varchar(128)) as vehicle_source_entity_id,
|
||||
ca.VehicleVin as vehicle_vin,
|
||||
ca.VehicleRegistrationNation as vehicle_registration_nation,
|
||||
ca.VehicleRegistrationNumber as vehicle_registration_number,
|
||||
|
||||
'DRIVER_CARD' as source_package_kind,
|
||||
cast(ca.SourcePackageId as varchar(128)) as source_package_id,
|
||||
cast(ca.DriverId as varchar(128)) as source_package_entity_id,
|
||||
ca.SourcePackagePeriodFrom as source_package_period_from,
|
||||
ca.SourcePackagePeriodTo as source_package_period_to,
|
||||
ca.SourcePackageImportedAt as source_package_imported_at
|
||||
from CardActivity ca
|
||||
where (:occurredFrom is null or ca.ActivityTime >= :occurredFrom)
|
||||
and (:occurredTo is null or ca.ActivityTime < :occurredTo)
|
||||
and (
|
||||
:lastSourcePackageImportedAt is null
|
||||
or ca.SourcePackageImportedAt > :lastSourcePackageImportedAt
|
||||
or (ca.SourcePackageImportedAt = :lastSourcePackageImportedAt and cast(ca.SourcePackageId as varchar(128)) > :lastSourcePackageId)
|
||||
)
|
||||
/*
|
||||
* Organisation filtering is schema-specific. Once stable joins are known, add:
|
||||
* and (:rootOrganisationId is null or ...)
|
||||
* using :includeChildren to decide whether to match only root or descendants.
|
||||
*/
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* VUActivity DRIVER_ACTIVITY extraction.
|
||||
*
|
||||
* Adapt table and column names to the concrete tachograph SQL Server schema.
|
||||
* Keep the selected aliases stable; VuActivityRowMapper consumes this alias contract.
|
||||
*/
|
||||
select
|
||||
cast(va.Id as varchar(128)) as source_row_id,
|
||||
cast(va.Id as varchar(128)) as vu_activity_id,
|
||||
concat('TACHOGRAPH:VU_ACTIVITY:', va.Id) as external_source_event_id,
|
||||
|
||||
va.ActivityTime as occurred_at,
|
||||
va.ReceivedAt as received_partner_at,
|
||||
va.Activity as activity_code,
|
||||
va.ActivityText as activity_text,
|
||||
va.EventType as event_type,
|
||||
va.Lifecycle as lifecycle,
|
||||
va.CardSlot as card_slot,
|
||||
va.CardStatus as card_status,
|
||||
va.DrivingStatus as driving_status,
|
||||
va.OdometerM as odometer_m,
|
||||
|
||||
cast(va.DriverId as varchar(128)) as driver_source_entity_id,
|
||||
va.DriverCardNation as driver_card_nation,
|
||||
va.DriverCardNumber as driver_card_number,
|
||||
|
||||
cast(va.VehicleId as varchar(128)) as vehicle_source_entity_id,
|
||||
va.VehicleVin as vehicle_vin,
|
||||
va.VehicleRegistrationNation as vehicle_registration_nation,
|
||||
va.VehicleRegistrationNumber as vehicle_registration_number,
|
||||
|
||||
'VEHICLE_UNIT' as source_package_kind,
|
||||
cast(va.SourcePackageId as varchar(128)) as source_package_id,
|
||||
cast(va.VehicleId as varchar(128)) as source_package_entity_id,
|
||||
va.SourcePackagePeriodFrom as source_package_period_from,
|
||||
va.SourcePackagePeriodTo as source_package_period_to,
|
||||
va.SourcePackageImportedAt as source_package_imported_at
|
||||
from VUActivity va
|
||||
where (:occurredFrom is null or va.ActivityTime >= :occurredFrom)
|
||||
and (:occurredTo is null or va.ActivityTime < :occurredTo)
|
||||
and (
|
||||
:lastSourcePackageImportedAt is null
|
||||
or va.SourcePackageImportedAt > :lastSourcePackageImportedAt
|
||||
or (va.SourcePackageImportedAt = :lastSourcePackageImportedAt and cast(va.SourcePackageId as varchar(128)) > :lastSourcePackageId)
|
||||
)
|
||||
/*
|
||||
* Organisation filtering is schema-specific. Once stable joins are known, add:
|
||||
* and (:rootOrganisationId is null or ...)
|
||||
* using :includeChildren to decide whether to match only root or descendants.
|
||||
*/
|
||||
Loading…
Reference in New Issue