Generalize import scheduling and extraction

This commit is contained in:
trifonovt 2026-04-30 14:26:20 +02:00
parent 3e96308c3f
commit e3ffa56932
21 changed files with 734 additions and 376 deletions

View File

@ -0,0 +1,120 @@
package at.procon.eventhub.importing;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import java.util.List;
import java.util.Locale;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
public abstract class AbstractConfiguredImportPlanService<R extends ImportRunRequest, D> {
private final Supplier<List<EventHubProperties.ConfiguredImportPlan>> planSupplier;
private final String providerName;
protected AbstractConfiguredImportPlanService(
Supplier<List<EventHubProperties.ConfiguredImportPlan>> planSupplier,
String providerName
) {
this.planSupplier = planSupplier;
this.providerName = providerName;
}
public List<D> listPlans() {
return planSupplier.get().stream()
.map(this::toDto)
.toList();
}
public D getPlan(String planKey) {
return planByKey(planKey);
}
public R createRequest(String planKey, ImportMode modeOverride, AcquisitionStrategy strategyOverride) {
EventHubProperties.ConfiguredImportPlan plan = rawPlanByKey(planKey);
return createRequest(plan, modeOverride, strategyOverride, false);
}
public R createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan) {
return createRequest(plan, plan.getScheduledMode(), plan.getScheduledStrategy(), false);
}
public R createInitialRequest(EventHubProperties.ConfiguredImportPlan plan) {
return createRequest(plan, plan.getInitialMode(), plan.getInitialStrategy(), true);
}
protected abstract D toDto(EventHubProperties.ConfiguredImportPlan plan);
protected abstract R buildRequest(
EventHubProperties.ConfiguredImportPlan plan,
ImportMode mode,
AcquisitionStrategy strategy,
ImportScopeDto scope
);
private R createRequest(
EventHubProperties.ConfiguredImportPlan plan,
ImportMode modeOverride,
AcquisitionStrategy strategyOverride,
boolean applyInitialOccurredWindow
) {
ImportMode mode = modeOverride == null ? plan.getScheduledMode() : modeOverride;
AcquisitionStrategy strategy = strategyOverride == null
? (mode == ImportMode.INCREMENTAL_UPDATE ? plan.getScheduledStrategy() : plan.getInitialStrategy())
: strategyOverride;
return buildRequest(plan, mode, strategy, scopedForRequest(plan, applyInitialOccurredWindow));
}
private ImportScopeDto scopedForRequest(EventHubProperties.ConfiguredImportPlan plan, boolean applyInitialOccurredWindow) {
ImportScopeDto scope = plan.getImportScope();
if (applyInitialOccurredWindow && scope != null
&& (plan.getInitialOccurredFrom() != null || plan.getInitialOccurredTo() != null)) {
return new ImportScopeDto(
scope.type(),
scope.rootSourceOrganisation(),
scope.includeChildren(),
plan.getInitialOccurredFrom() == null ? scope.occurredFrom() : plan.getInitialOccurredFrom(),
plan.getInitialOccurredTo() == null ? scope.occurredTo() : plan.getInitialOccurredTo()
);
}
return scope;
}
private D planByKey(String planKey) {
return toDto(rawPlanByKey(planKey));
}
private EventHubProperties.ConfiguredImportPlan rawPlanByKey(String planKey) {
return planSupplier.get().stream()
.filter(plan -> normalize(plan.getPlanKey()).equals(normalize(planKey)))
.findFirst()
.orElseThrow(() -> new NoSuchElementException("No configured " + providerName + " import plan found for key " + planKey));
}
protected ConfiguredImportPlanDto genericDto(EventHubProperties.ConfiguredImportPlan plan) {
return new ConfiguredImportPlanDto(
plan.getPlanKey(),
plan.isEnabled(),
plan.getCron(),
plan.getTenantKey(),
plan.getEventSource(),
plan.getSourceGroup(),
plan.getImportScope(),
plan.getEventFamilies(),
plan.getInitialMode(),
plan.getScheduledMode(),
plan.getInitialStrategy(),
plan.getScheduledStrategy(),
plan.isRefreshMasterDataFirst(),
plan.getInitialOccurredFrom(),
plan.getInitialOccurredTo(),
plan.isRunInitialOnStartup()
);
}
private String normalize(String value) {
return value == null ? "" : value.trim().toLowerCase(Locale.ROOT);
}
}

View File

@ -0,0 +1,95 @@
package at.procon.eventhub.importing;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.SchedulerTriggerMode;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.support.CronExpression;
public abstract class AbstractImportScheduler<R extends ImportRunRequest> {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<String, ZonedDateTime> nextRunByPlan = new ConcurrentHashMap<>();
private final Map<String, AtomicBoolean> runningByPlan = new ConcurrentHashMap<>();
protected void triggerInitialPlansOnStartup() {
if (!schedulerEnabled()) {
return;
}
for (EventHubProperties.ConfiguredImportPlan plan : configuredPlans()) {
if (plan.isEnabled() && hasPlanKey(plan) && plan.isRunInitialOnStartup()) {
triggerPlan(plan, true, schedulerTriggerMode());
}
}
}
protected void pollConfiguredPlans() {
if (!schedulerEnabled()) {
return;
}
ZonedDateTime now = ZonedDateTime.now(ZoneId.systemDefault());
for (EventHubProperties.ConfiguredImportPlan plan : configuredPlans()) {
if (!plan.isEnabled() || !hasPlanKey(plan) || plan.getCron() == null || plan.getCron().isBlank()) {
continue;
}
String key = plan.getPlanKey();
ZonedDateTime next = nextRunByPlan.computeIfAbsent(key, ignored -> CronExpression.parse(plan.getCron()).next(now));
if (next != null && !next.isAfter(now)) {
triggerPlan(plan, false, schedulerTriggerMode());
nextRunByPlan.put(key, CronExpression.parse(plan.getCron()).next(now.plusSeconds(1)));
}
}
}
protected abstract boolean schedulerEnabled();
protected abstract List<EventHubProperties.ConfiguredImportPlan> configuredPlans();
protected abstract SchedulerTriggerMode schedulerTriggerMode();
protected abstract R createInitialRequest(EventHubProperties.ConfiguredImportPlan plan);
protected abstract R createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan);
protected abstract void startImport(R request);
protected abstract void startAndExecuteImport(R request);
protected String providerName() {
return "provider";
}
private void triggerPlan(EventHubProperties.ConfiguredImportPlan plan, boolean initial, SchedulerTriggerMode triggerMode) {
String key = plan.getPlanKey();
AtomicBoolean running = runningByPlan.computeIfAbsent(key, ignored -> new AtomicBoolean(false));
if (!running.compareAndSet(false, true)) {
log.info("Skipping {} import plan={} because a previous run is still active", providerName(), key);
return;
}
try {
R request = initial ? createInitialRequest(plan) : createScheduledRequest(plan);
log.info("Triggering {} import plan={} initial={} mode={} strategy={} triggerMode={} at={}",
providerName(), key, initial, request.mode(), request.acquisitionStrategy(), triggerMode, OffsetDateTime.now());
if (triggerMode == SchedulerTriggerMode.EXECUTE) {
startAndExecuteImport(request);
} else {
startImport(request);
}
} catch (RuntimeException ex) {
log.error("{} import plan={} failed to trigger", providerName(), key, ex);
} finally {
running.set(false);
}
}
private boolean hasPlanKey(EventHubProperties.ConfiguredImportPlan plan) {
return plan.getPlanKey() != null && !plan.getPlanKey().isBlank();
}
}

View File

@ -0,0 +1,30 @@
package at.procon.eventhub.importing;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.SourceGroupRefDto;
import java.time.OffsetDateTime;
import java.util.Set;
public record ConfiguredImportPlanDto(
String planKey,
boolean enabled,
String cron,
String tenantKey,
EventSourceDto eventSource,
SourceGroupRefDto sourceGroup,
ImportScopeDto importScope,
Set<EventFamily> eventFamilies,
ImportMode initialMode,
ImportMode scheduledMode,
AcquisitionStrategy initialStrategy,
AcquisitionStrategy scheduledStrategy,
boolean refreshMasterDataFirst,
OffsetDateTime initialOccurredFrom,
OffsetDateTime initialOccurredTo,
boolean runInitialOnStartup
) {
}

View File

@ -0,0 +1,42 @@
package at.procon.eventhub.importing;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import org.springframework.stereotype.Component;
@Component
public class ImportChunkPlanner {
public List<ImportTimeChunkDto> chunksFor(ImportRunRequest request, int defaultChunkDays) {
ImportScopeDto scope = request.importScope();
OffsetDateTime from = scope == null ? null : scope.occurredFrom();
OffsetDateTime to = scope == null ? null : scope.occurredTo();
if (request.mode() == ImportMode.INCREMENTAL_UPDATE
&& request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) {
return List.of(new ImportTimeChunkDto(1, from, to));
}
if (from == null || to == null) {
return List.of(new ImportTimeChunkDto(1, from, to));
}
List<ImportTimeChunkDto> chunks = new ArrayList<>();
int days = Math.max(1, defaultChunkDays);
OffsetDateTime cursor = from;
int sequence = 1;
while (cursor.isBefore(to)) {
OffsetDateTime next = cursor.plusDays(days);
if (next.isAfter(to)) {
next = to;
}
chunks.add(new ImportTimeChunkDto(sequence++, cursor, next));
cursor = next;
}
return chunks.isEmpty() ? List.of(new ImportTimeChunkDto(1, from, to)) : chunks;
}
}

View File

@ -0,0 +1,203 @@
package at.procon.eventhub.importing.extraction;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.importing.ExtractionBatchResult;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportRunRequest;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.camel.ProducerTemplate;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.util.StreamUtils;
public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunRequest, B extends ExtractionBatchResult>
implements ExtractionBatchExecutor<R, B> {
private final NamedParameterJdbcTemplate jdbcTemplate;
private final ProducerTemplate producerTemplate;
private final ResourceLoader resourceLoader;
private final ImportCursorRepository importCursorRepository;
protected AbstractJdbcExtractionBatchExecutor(
NamedParameterJdbcTemplate jdbcTemplate,
ProducerTemplate producerTemplate,
ResourceLoader resourceLoader,
ImportCursorRepository importCursorRepository
) {
this.jdbcTemplate = jdbcTemplate;
this.producerTemplate = producerTemplate;
this.resourceLoader = resourceLoader;
this.importCursorRepository = importCursorRepository;
}
@Override
public B execute(
UUID importRunId,
UUID packageId,
int eventSourceId,
R request,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
) {
ExtractionDefinition<R> definition = findDefinition(planItem.extractionCode())
.orElseThrow(() -> new IllegalArgumentException("No extraction definition for " + planItem.extractionCode()));
ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk);
EventHubPackageRequest packageInfo = packageInfo(importRunId, request, planItem, chunk, chunkScope);
ExtractionContext<R> context = new ExtractionContext<>(
importRunId,
packageId,
eventSourceId,
request,
planItem,
chunk,
packageInfo.eventSource(),
packageInfo
);
String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey();
ImportCursorStateDto cursor = importCursorRepository.findCursor(
request.tenantKey(),
eventSourceId,
scopeHash,
planItem.eventFamily(),
planItem.sourceKind(),
request.acquisitionStrategy()
);
Map<String, Object> params = parameters(request, chunkScope, cursor);
String sql = loadSql(definition.sqlResource());
List<EventHubEventDto> events = jdbcTemplate.query(sql, params, (rs, rowNum) -> definition.rowMapper().map(rs, rowNum, context));
events.forEach(event -> producerTemplate.sendBody(normalizedInputUri(), event));
return resultFor(packageId, planItem, chunk, cursor, events);
}
protected abstract Optional<ExtractionDefinition<R>> findDefinition(String code);
protected abstract EventSourceDto eventSourceFor(R request, ImportPlanItemDto planItem);
protected abstract B resultFor(
UUID packageId,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
ImportCursorStateDto cursor,
List<EventHubEventDto> events
);
protected String providerPackagePrefix() {
return "SOURCE";
}
protected String normalizedInputUri() {
return "direct:eventhub-normalized-input";
}
protected Map<String, Object> parameters(R request, ImportScopeDto scope, ImportCursorStateDto cursor) {
Map<String, Object> params = new HashMap<>();
params.put("tenantKey", request.tenantKey());
params.put("occurredFrom", scope == null ? null : scope.occurredFrom());
params.put("occurredTo", scope == null ? null : scope.occurredTo());
params.put("rootOrganisationId", scope == null || scope.rootSourceOrganisation() == null ? null : scope.rootSourceOrganisation().sourceEntityId());
params.put("includeChildren", scope != null && scope.includeChildren());
params.put("lastSourcePackageImportedAt", cursor == null ? null : cursor.lastSourcePackageImportedAt());
params.put("lastSourcePackageId", cursor == null ? null : cursor.lastSourcePackageId());
params.put("lastSourceRowUpdatedAt", cursor == null ? null : cursor.lastSourceRowUpdatedAt());
params.put("lastOccurredTo", cursor == null ? null : cursor.lastOccurredTo());
return params;
}
protected OffsetDateTime lastSourcePackageImportedAt(List<EventHubEventDto> events, ImportCursorStateDto cursor) {
return events.stream()
.map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().importedIntoSourceAt())
.filter(value -> value != null)
.max(OffsetDateTime::compareTo)
.orElse(cursor == null ? null : cursor.lastSourcePackageImportedAt());
}
protected String lastSourcePackageId(List<EventHubEventDto> events, ImportCursorStateDto cursor) {
return events.stream()
.filter(event -> event.sourcePackageRef() != null && event.sourcePackageRef().importedIntoSourceAt() != null)
.max((left, right) -> left.sourcePackageRef().importedIntoSourceAt().compareTo(right.sourcePackageRef().importedIntoSourceAt()))
.map(event -> event.sourcePackageRef().sourcePackageId())
.orElseGet(() -> events.stream()
.map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().sourcePackageId())
.filter(value -> value != null && !value.isBlank())
.max(this::compareSourcePackageId)
.orElse(cursor == null ? null : cursor.lastSourcePackageId()));
}
private EventHubPackageRequest packageInfo(
UUID importRunId,
R request,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
ImportScopeDto chunkScope
) {
return new EventHubPackageRequest(
request.tenantKey(),
eventSourceFor(request, planItem),
request.sourceGroup(),
chunkScope,
planItem.eventFamily().name(),
chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(),
providerPackagePrefix() + ":" + planItem.sourceKind() + ":" + planItem.extractionCode() + ":RUN-" + importRunId + ":CHUNK-" + chunk.sequence()
);
}
private ImportScopeDto chunkScope(ImportScopeDto scope, ImportTimeChunkDto chunk) {
if (scope == null) {
return ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo());
}
return new ImportScopeDto(
scope.type(),
scope.rootSourceOrganisation(),
scope.includeChildren(),
chunk.occurredFrom(),
chunk.occurredTo()
);
}
private String loadSql(String location) {
Resource resource = resourceLoader.getResource(location);
try (var inputStream = resource.getInputStream()) {
return StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8);
} catch (IOException e) {
throw new IllegalStateException("Cannot load extraction SQL resource " + location, e);
}
}
private int compareSourcePackageId(String left, String right) {
Integer leftInt = parseInteger(left);
Integer rightInt = parseInteger(right);
if (leftInt != null && rightInt != null) {
return leftInt.compareTo(rightInt);
}
return left.compareTo(right);
}
private Integer parseInteger(String value) {
if (value == null || value.isBlank()) {
return null;
}
try {
return Integer.parseInt(value.trim());
} catch (NumberFormatException ignored) {
return null;
}
}
}

View File

@ -0,0 +1,35 @@
package at.procon.eventhub.importing.extraction;
import at.procon.eventhub.importing.ExtractionBatchResult;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportRunRequest;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractNoopExtractionBatchExecutor<R extends ImportRunRequest, B extends ExtractionBatchResult>
implements ExtractionBatchExecutor<R, B> {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public B execute(
UUID importRunId,
UUID packageId,
int eventSourceId,
R request,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
) {
log.warn("No concrete {} SQL extractor configured. importRunId={} packageId={} extractionCode={} sourceKind={} chunk={}",
providerName(), importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(), chunk.sequence());
return emptyResult(packageId, planItem, chunk);
}
protected abstract B emptyResult(UUID packageId, ImportPlanItemDto planItem, ImportTimeChunkDto chunk);
protected String providerName() {
return "provider";
}
}

View File

@ -0,0 +1,19 @@
package at.procon.eventhub.importing.extraction;
import at.procon.eventhub.importing.ExtractionBatchResult;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportRunRequest;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import java.util.UUID;
public interface ExtractionBatchExecutor<R extends ImportRunRequest, B extends ExtractionBatchResult> {
B execute(
UUID importRunId,
UUID packageId,
int eventSourceId,
R request,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk
);
}

View File

@ -1,17 +1,17 @@
package at.procon.eventhub.tachograph.service; package at.procon.eventhub.importing.extraction;
import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportRunRequest;
import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.util.UUID; import java.util.UUID;
public record TachographExtractionContext( public record ExtractionContext<R extends ImportRunRequest>(
UUID importRunId, UUID importRunId,
UUID packageId, UUID packageId,
int eventSourceId, int eventSourceId,
TachographImportRequest request, R request,
ImportPlanItemDto planItem, ImportPlanItemDto planItem,
ImportTimeChunkDto chunk, ImportTimeChunkDto chunk,
EventSourceDto eventSource, EventSourceDto eventSource,

View File

@ -0,0 +1,14 @@
package at.procon.eventhub.importing.extraction;
import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.importing.ImportRunRequest;
public record ExtractionDefinition<R extends ImportRunRequest>(
String code,
EventFamily eventFamily,
String sourceKind,
String entityAxis,
String sqlResource,
ExtractionRowMapper<R> rowMapper
) {
}

View File

@ -0,0 +1,27 @@
package at.procon.eventhub.importing.extraction;
import at.procon.eventhub.importing.ImportRunRequest;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ExtractionDefinitionRegistry<R extends ImportRunRequest> {
private final Map<String, ExtractionDefinition<R>> definitionsByCode;
protected ExtractionDefinitionRegistry(List<ExtractionDefinition<R>> definitions) {
this.definitionsByCode = definitions.stream()
.collect(Collectors.toUnmodifiableMap(definition -> normalize(definition.code()), Function.identity()));
}
public Optional<ExtractionDefinition<R>> findByCode(String code) {
return Optional.ofNullable(definitionsByCode.get(normalize(code)));
}
private String normalize(String value) {
return value == null ? "" : value.trim().toUpperCase(Locale.ROOT);
}
}

View File

@ -0,0 +1,11 @@
package at.procon.eventhub.importing.extraction;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.importing.ImportRunRequest;
import java.sql.ResultSet;
import java.sql.SQLException;
public interface ExtractionRowMapper<R extends ImportRunRequest> {
EventHubEventDto map(ResultSet rs, int rowNum, ExtractionContext<R> context) throws SQLException;
}

View File

@ -12,7 +12,10 @@ import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.SourcePackageRefDto; import at.procon.eventhub.dto.SourcePackageRefDto;
import at.procon.eventhub.dto.VehicleRefDto; import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto; import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.importing.extraction.ExtractionContext;
import at.procon.eventhub.importing.extraction.ExtractionRowMapper;
import at.procon.eventhub.service.EventDetailsFactory; import at.procon.eventhub.service.EventDetailsFactory;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Timestamp; import java.sql.Timestamp;
@ -23,7 +26,7 @@ import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
abstract class AbstractTachographActivityRowMapper implements TachographExtractionRowMapper { abstract class AbstractTachographActivityRowMapper implements ExtractionRowMapper<TachographImportRequest> {
private final EventDetailsFactory detailsFactory; private final EventDetailsFactory detailsFactory;
@ -32,7 +35,7 @@ abstract class AbstractTachographActivityRowMapper implements TachographExtracti
} }
@Override @Override
public EventHubEventDto map(ResultSet rs, int rowNum, TachographExtractionContext context) throws SQLException { public EventHubEventDto map(ResultSet rs, int rowNum, ExtractionContext<TachographImportRequest> context) throws SQLException {
OffsetDateTime occurredAt = offsetDateTime(rs, "occurred_at"); OffsetDateTime occurredAt = offsetDateTime(rs, "occurred_at");
SourcePackageRefDto sourcePackageRef = sourcePackageRef(rs); SourcePackageRefDto sourcePackageRef = sourcePackageRef(rs);
DriverRefDto driverRef = driverRef(rs); DriverRefDto driverRef = driverRef(rs);
@ -97,7 +100,7 @@ abstract class AbstractTachographActivityRowMapper implements TachographExtracti
); );
} }
private Map<String, Object> payload(ResultSet rs, TachographExtractionContext context) throws SQLException { private Map<String, Object> payload(ResultSet rs, ExtractionContext<TachographImportRequest> context) throws SQLException {
Map<String, Object> raw = new LinkedHashMap<>(); Map<String, Object> raw = new LinkedHashMap<>();
raw.put("extractionCode", context.planItem().extractionCode()); raw.put("extractionCode", context.planItem().extractionCode());
raw.put("sourceKind", context.planItem().sourceKind()); raw.put("sourceKind", context.planItem().sourceKind());
@ -122,7 +125,7 @@ abstract class AbstractTachographActivityRowMapper implements TachographExtracti
} }
private String defaultExternalSourceEventId( private String defaultExternalSourceEventId(
TachographExtractionContext context, ExtractionContext<TachographImportRequest> context,
int rowNum, int rowNum,
OffsetDateTime occurredAt, OffsetDateTime occurredAt,
SourcePackageRefDto sourcePackageRef, SourcePackageRefDto sourcePackageRef,

View File

@ -1,38 +1,34 @@
package at.procon.eventhub.tachograph.service; package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportCursorStateDto; import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.extraction.AbstractJdbcExtractionBatchExecutor;
import at.procon.eventhub.importing.extraction.ExtractionDefinition;
import at.procon.eventhub.importing.persistence.ImportCursorRepository; import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest; import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.io.IOException; import java.util.List;
import java.nio.charset.StandardCharsets; import java.util.Optional;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.apache.camel.ProducerTemplate; import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader; import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StreamUtils;
@Service @Service
@ConditionalOnBean(name = "tachographNamedParameterJdbcTemplate") @ConditionalOnBean(name = "tachographNamedParameterJdbcTemplate")
@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''") @ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''")
public class JdbcTachographExtractionBatchExecutor implements TachographExtractionBatchExecutor { public class JdbcTachographExtractionBatchExecutor
extends AbstractJdbcExtractionBatchExecutor<TachographImportRequest, TachographExtractionBatchResultDto>
implements TachographExtractionBatchExecutor {
private final NamedParameterJdbcTemplate tachographJdbcTemplate;
private final ProducerTemplate producerTemplate;
private final ResourceLoader resourceLoader;
private final TachographExtractionDefinitionRegistry definitionRegistry; private final TachographExtractionDefinitionRegistry definitionRegistry;
private final ImportCursorRepository importCursorRepository;
public JdbcTachographExtractionBatchExecutor( public JdbcTachographExtractionBatchExecutor(
@Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate tachographJdbcTemplate, @Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate tachographJdbcTemplate,
@ -41,76 +37,23 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti
TachographExtractionDefinitionRegistry definitionRegistry, TachographExtractionDefinitionRegistry definitionRegistry,
ImportCursorRepository importCursorRepository ImportCursorRepository importCursorRepository
) { ) {
this.tachographJdbcTemplate = tachographJdbcTemplate; super(tachographJdbcTemplate, producerTemplate, resourceLoader, importCursorRepository);
this.producerTemplate = producerTemplate;
this.resourceLoader = resourceLoader;
this.definitionRegistry = definitionRegistry; this.definitionRegistry = definitionRegistry;
this.importCursorRepository = importCursorRepository;
} }
@Override @Override
public TachographExtractionBatchResultDto execute( protected Optional<ExtractionDefinition<TachographImportRequest>> findDefinition(String code) {
UUID importRunId, return definitionRegistry.findByCode(code);
}
@Override
protected TachographExtractionBatchResultDto resultFor(
UUID packageId, UUID packageId,
int eventSourceId,
TachographImportRequest request,
ImportPlanItemDto planItem, ImportPlanItemDto planItem,
ImportTimeChunkDto chunk ImportTimeChunkDto chunk,
ImportCursorStateDto cursor,
List<EventHubEventDto> events
) { ) {
TachographExtractionDefinition definition = definitionRegistry.findByCode(planItem.extractionCode())
.orElseThrow(() -> new IllegalArgumentException("No tachograph extraction definition for " + planItem.extractionCode()));
ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk);
var packageInfo = new at.procon.eventhub.dto.EventHubPackageRequest(
request.tenantKey(),
eventSourceFor(request, planItem),
request.sourceGroup(),
chunkScope,
planItem.eventFamily().name(),
chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(),
"TACHOGRAPH:" + planItem.sourceKind() + ":" + planItem.extractionCode() + ":RUN-" + importRunId + ":CHUNK-" + chunk.sequence()
);
TachographExtractionContext context = new TachographExtractionContext(
importRunId,
packageId,
eventSourceId,
request,
planItem,
chunk,
packageInfo.eventSource(),
packageInfo
);
String scopeHash = request.importScope() == null ? "NO_SCOPE" : request.importScope().stableKey();
ImportCursorStateDto cursor = importCursorRepository.findCursor(
request.tenantKey(),
eventSourceId,
scopeHash,
planItem.eventFamily(),
planItem.sourceKind(),
request.acquisitionStrategy()
);
Map<String, Object> params = parameters(request, chunkScope, cursor);
String sql = loadSql(definition.sqlResource());
var events = tachographJdbcTemplate.query(sql, params, (rs, rowNum) -> definition.rowMapper().map(rs, rowNum, context));
events.forEach(event -> producerTemplate.sendBody("direct:eventhub-normalized-input", event));
OffsetDateTime lastSourcePackageImportedAt = events.stream()
.map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().importedIntoSourceAt())
.filter(value -> value != null)
.max(OffsetDateTime::compareTo)
.orElse(cursor == null ? null : cursor.lastSourcePackageImportedAt());
String lastSourcePackageId = events.stream()
.filter(event -> event.sourcePackageRef() != null && event.sourcePackageRef().importedIntoSourceAt() != null)
.max((left, right) -> left.sourcePackageRef().importedIntoSourceAt().compareTo(right.sourcePackageRef().importedIntoSourceAt()))
.map(event -> event.sourcePackageRef().sourcePackageId())
.orElseGet(() -> events.stream()
.map(event -> event.sourcePackageRef() == null ? null : event.sourcePackageRef().sourcePackageId())
.filter(value -> value != null && !value.isBlank())
.max(this::compareSourcePackageId)
.orElse(cursor == null ? null : cursor.lastSourcePackageId()));
return new TachographExtractionBatchResultDto( return new TachographExtractionBatchResultDto(
packageId, packageId,
planItem.extractionCode(), planItem.extractionCode(),
@ -120,47 +63,21 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti
events.size(), events.size(),
0, 0,
true, true,
lastSourcePackageImportedAt, lastSourcePackageImportedAt(events, cursor),
lastSourcePackageId, lastSourcePackageId(events, cursor),
null, null,
chunk.occurredTo() chunk.occurredTo()
); );
} }
private Map<String, Object> parameters(TachographImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) { @Override
Map<String, Object> params = new HashMap<>(); protected EventSourceDto eventSourceFor(TachographImportRequest request, ImportPlanItemDto planItem) {
params.put("tenantKey", request.tenantKey());
params.put("occurredFrom", scope == null ? null : scope.occurredFrom());
params.put("occurredTo", scope == null ? null : scope.occurredTo());
params.put("rootOrganisationId", scope == null || scope.rootSourceOrganisation() == null ? null : scope.rootSourceOrganisation().sourceEntityId());
params.put("includeChildren", scope != null && scope.includeChildren());
params.put("lastSourcePackageImportedAt", cursor == null ? null : cursor.lastSourcePackageImportedAt());
params.put("lastSourcePackageId", cursor == null ? null : cursor.lastSourcePackageId());
params.put("lastSourceRowUpdatedAt", cursor == null ? null : cursor.lastSourceRowUpdatedAt());
params.put("lastOccurredTo", cursor == null ? null : cursor.lastOccurredTo());
return params;
}
private ImportScopeDto chunkScope(ImportScopeDto scope, ImportTimeChunkDto chunk) {
if (scope == null) {
return ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo());
}
return new ImportScopeDto(
scope.type(),
scope.rootSourceOrganisation(),
scope.includeChildren(),
chunk.occurredFrom(),
chunk.occurredTo()
);
}
private at.procon.eventhub.dto.EventSourceDto eventSourceFor(TachographImportRequest request, ImportPlanItemDto planItem) {
String sourceKey = switch (planItem.sourceKind()) { String sourceKey = switch (planItem.sourceKind()) {
case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT"; case "VEHICLE_UNIT" -> "TACHOGRAPH_VEHICLE_UNIT";
case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD"; case "DRIVER_CARD" -> "TACHOGRAPH_DRIVER_CARD";
default -> request.eventSource().sourceKey(); default -> request.eventSource().sourceKey();
}; };
return new at.procon.eventhub.dto.EventSourceDto( return new EventSourceDto(
request.eventSource().providerKey(), request.eventSource().providerKey(),
planItem.sourceKind(), planItem.sourceKind(),
sourceKey, sourceKey,
@ -170,32 +87,8 @@ public class JdbcTachographExtractionBatchExecutor implements TachographExtracti
); );
} }
private String loadSql(String location) { @Override
Resource resource = resourceLoader.getResource(location); protected String providerPackagePrefix() {
try (var inputStream = resource.getInputStream()) { return "TACHOGRAPH";
return StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8);
} catch (IOException e) {
throw new IllegalStateException("Cannot load tachograph extraction SQL resource " + location, e);
}
}
private int compareSourcePackageId(String left, String right) {
Integer leftInt = parseInteger(left);
Integer rightInt = parseInteger(right);
if (leftInt != null && rightInt != null) {
return leftInt.compareTo(rightInt);
}
return left.compareTo(right);
}
private Integer parseInteger(String value) {
if (value == null || value.isBlank()) {
return null;
}
try {
return Integer.parseInt(value.trim());
} catch (NumberFormatException ignored) {
return null;
}
} }
} }

View File

@ -2,13 +2,12 @@ package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.extraction.AbstractNoopExtractionBatchExecutor;
import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest; import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.util.UUID; import java.util.UUID;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
@ -21,21 +20,16 @@ import org.springframework.stereotype.Service;
@Service @Service
@ConditionalOnMissingBean(TachographExtractionBatchExecutor.class) @ConditionalOnMissingBean(TachographExtractionBatchExecutor.class)
@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' == ''") @ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' == ''")
public class NoopTachographExtractionBatchExecutor implements TachographExtractionBatchExecutor { public class NoopTachographExtractionBatchExecutor
extends AbstractNoopExtractionBatchExecutor<TachographImportRequest, TachographExtractionBatchResultDto>
private static final Logger log = LoggerFactory.getLogger(NoopTachographExtractionBatchExecutor.class); implements TachographExtractionBatchExecutor {
@Override @Override
public TachographExtractionBatchResultDto execute( protected TachographExtractionBatchResultDto emptyResult(
UUID importRunId,
UUID packageId, UUID packageId,
int eventSourceId,
TachographImportRequest request,
ImportPlanItemDto planItem, ImportPlanItemDto planItem,
ImportTimeChunkDto chunk ImportTimeChunkDto chunk
) { ) {
log.warn("No concrete tachograph SQL extractor configured. importRunId={} packageId={} extractionCode={} sourceKind={} chunk={}",
importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(), chunk.sequence());
return new TachographExtractionBatchResultDto( return new TachographExtractionBatchResultDto(
packageId, packageId,
planItem.extractionCode(), planItem.extractionCode(),
@ -51,4 +45,9 @@ public class NoopTachographExtractionBatchExecutor implements TachographExtracti
null null
); );
} }
@Override
protected String providerName() {
return "tachograph";
}
} }

View File

@ -2,75 +2,29 @@ package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.tachograph.dto.ConfiguredTachographImportPlanDto;
import at.procon.eventhub.dto.ImportMode; import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.importing.AbstractConfiguredImportPlanService;
import at.procon.eventhub.importing.ConfiguredImportPlanDto;
import at.procon.eventhub.tachograph.dto.ConfiguredTachographImportPlanDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest; import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.util.List;
import java.util.Locale;
import java.util.NoSuchElementException;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
public class TachographConfiguredImportPlanService { public class TachographConfiguredImportPlanService
extends AbstractConfiguredImportPlanService<TachographImportRequest, ConfiguredTachographImportPlanDto> {
private final EventHubProperties properties;
public TachographConfiguredImportPlanService(EventHubProperties properties) { public TachographConfiguredImportPlanService(EventHubProperties properties) {
this.properties = properties; super(() -> properties.getTachograph().getImportPlans(), "tachograph");
} }
public List<ConfiguredTachographImportPlanDto> listPlans() { @Override
return properties.getTachograph().getImportPlans().stream() protected TachographImportRequest buildRequest(
.map(this::toDto)
.toList();
}
public ConfiguredTachographImportPlanDto getPlan(String planKey) {
return properties.getTachograph().getImportPlans().stream()
.filter(plan -> normalize(plan.getPlanKey()).equals(normalize(planKey)))
.findFirst()
.map(this::toDto)
.orElseThrow(() -> new NoSuchElementException("No configured tachograph import plan found for key " + planKey));
}
public TachographImportRequest createRequest(String planKey, ImportMode modeOverride, AcquisitionStrategy strategyOverride) {
EventHubProperties.ConfiguredImportPlan plan = properties.getTachograph().getImportPlans().stream()
.filter(candidate -> normalize(candidate.getPlanKey()).equals(normalize(planKey)))
.findFirst()
.orElseThrow(() -> new NoSuchElementException("No configured tachograph import plan found for key " + planKey));
return createRequest(plan, modeOverride, strategyOverride, false);
}
public TachographImportRequest createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan) {
return createRequest(plan, plan.getScheduledMode(), plan.getScheduledStrategy(), false);
}
public TachographImportRequest createInitialRequest(EventHubProperties.ConfiguredImportPlan plan) {
return createRequest(plan, plan.getInitialMode(), plan.getInitialStrategy(), true);
}
private TachographImportRequest createRequest(
EventHubProperties.ConfiguredImportPlan plan, EventHubProperties.ConfiguredImportPlan plan,
ImportMode modeOverride, ImportMode mode,
AcquisitionStrategy strategyOverride, AcquisitionStrategy strategy,
boolean applyInitialOccurredWindow ImportScopeDto scope
) { ) {
ImportMode mode = modeOverride == null ? plan.getScheduledMode() : modeOverride;
AcquisitionStrategy strategy = strategyOverride == null
? (mode == ImportMode.INCREMENTAL_UPDATE ? plan.getScheduledStrategy() : plan.getInitialStrategy())
: strategyOverride;
ImportScopeDto scope = plan.getImportScope();
if (applyInitialOccurredWindow && scope != null
&& (plan.getInitialOccurredFrom() != null || plan.getInitialOccurredTo() != null)) {
scope = new ImportScopeDto(
scope.type(),
scope.rootSourceOrganisation(),
scope.includeChildren(),
plan.getInitialOccurredFrom() == null ? scope.occurredFrom() : plan.getInitialOccurredFrom(),
plan.getInitialOccurredTo() == null ? scope.occurredTo() : plan.getInitialOccurredTo()
);
}
return new TachographImportRequest( return new TachographImportRequest(
plan.getTenantKey(), plan.getTenantKey(),
plan.getEventSource(), plan.getEventSource(),
@ -83,28 +37,26 @@ public class TachographConfiguredImportPlanService {
); );
} }
private ConfiguredTachographImportPlanDto toDto(EventHubProperties.ConfiguredImportPlan plan) { @Override
protected ConfiguredTachographImportPlanDto toDto(EventHubProperties.ConfiguredImportPlan plan) {
ConfiguredImportPlanDto dto = genericDto(plan);
return new ConfiguredTachographImportPlanDto( return new ConfiguredTachographImportPlanDto(
plan.getPlanKey(), dto.planKey(),
plan.isEnabled(), dto.enabled(),
plan.getCron(), dto.cron(),
plan.getTenantKey(), dto.tenantKey(),
plan.getEventSource(), dto.eventSource(),
plan.getSourceGroup(), dto.sourceGroup(),
plan.getImportScope(), dto.importScope(),
plan.getEventFamilies(), dto.eventFamilies(),
plan.getInitialMode(), dto.initialMode(),
plan.getScheduledMode(), dto.scheduledMode(),
plan.getInitialStrategy(), dto.initialStrategy(),
plan.getScheduledStrategy(), dto.scheduledStrategy(),
plan.isRefreshMasterDataFirst(), dto.refreshMasterDataFirst(),
plan.getInitialOccurredFrom(), dto.initialOccurredFrom(),
plan.getInitialOccurredTo(), dto.initialOccurredTo(),
plan.isRunInitialOnStartup() dto.runInitialOnStartup()
); );
} }
private String normalize(String value) {
return value == null ? "" : value.trim().toLowerCase(Locale.ROOT);
}
} }

View File

@ -1,13 +1,16 @@
package at.procon.eventhub.tachograph.service; package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.importing.extraction.ExtractionBatchExecutor;
import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest; import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.util.UUID; import java.util.UUID;
public interface TachographExtractionBatchExecutor { public interface TachographExtractionBatchExecutor
extends ExtractionBatchExecutor<TachographImportRequest, TachographExtractionBatchResultDto> {
@Override
TachographExtractionBatchResultDto execute( TachographExtractionBatchResultDto execute(
UUID importRunId, UUID importRunId,
UUID packageId, UUID packageId,

View File

@ -1,13 +0,0 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.dto.EventFamily;
public record TachographExtractionDefinition(
String code,
EventFamily eventFamily,
String sourceKind,
String entityAxis,
String sqlResource,
TachographExtractionRowMapper rowMapper
) {
}

View File

@ -1,25 +1,21 @@
package at.procon.eventhub.tachograph.service; package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.dto.EventFamily; import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.importing.extraction.ExtractionDefinition;
import at.procon.eventhub.importing.extraction.ExtractionDefinitionRegistry;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class TachographExtractionDefinitionRegistry { public class TachographExtractionDefinitionRegistry extends ExtractionDefinitionRegistry<TachographImportRequest> {
private final Map<String, TachographExtractionDefinition> definitionsByCode;
public TachographExtractionDefinitionRegistry( public TachographExtractionDefinitionRegistry(
CardActivityRowMapper cardActivityRowMapper, CardActivityRowMapper cardActivityRowMapper,
VuActivityRowMapper vuActivityRowMapper VuActivityRowMapper vuActivityRowMapper
) { ) {
List<TachographExtractionDefinition> definitions = List.of( super(List.of(
new TachographExtractionDefinition( new ExtractionDefinition<>(
"CARD_ACTIVITY", "CARD_ACTIVITY",
EventFamily.DRIVER_ACTIVITY, EventFamily.DRIVER_ACTIVITY,
"DRIVER_CARD", "DRIVER_CARD",
@ -27,7 +23,7 @@ public class TachographExtractionDefinitionRegistry {
"classpath:sql/tachograph/card-activity.sql", "classpath:sql/tachograph/card-activity.sql",
cardActivityRowMapper cardActivityRowMapper
), ),
new TachographExtractionDefinition( new ExtractionDefinition<>(
"VU_ACTIVITY", "VU_ACTIVITY",
EventFamily.DRIVER_ACTIVITY, EventFamily.DRIVER_ACTIVITY,
"VEHICLE_UNIT", "VEHICLE_UNIT",
@ -35,16 +31,6 @@ public class TachographExtractionDefinitionRegistry {
"classpath:sql/tachograph/vu-activity.sql", "classpath:sql/tachograph/vu-activity.sql",
vuActivityRowMapper vuActivityRowMapper
) )
); ));
this.definitionsByCode = definitions.stream()
.collect(Collectors.toUnmodifiableMap(definition -> normalize(definition.code()), Function.identity()));
}
public Optional<TachographExtractionDefinition> findByCode(String code) {
return Optional.ofNullable(definitionsByCode.get(normalize(code)));
}
private String normalize(String value) {
return value == null ? "" : value.trim().toUpperCase(Locale.ROOT);
} }
} }

View File

@ -1,10 +0,0 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.dto.EventHubEventDto;
import java.sql.ResultSet;
import java.sql.SQLException;
public interface TachographExtractionRowMapper {
EventHubEventDto map(ResultSet rs, int rowNum, TachographExtractionContext context) throws SQLException;
}

View File

@ -3,13 +3,10 @@ package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy; import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.EventFamily; import at.procon.eventhub.dto.EventFamily;
import at.procon.eventhub.dto.ImportMode; import at.procon.eventhub.importing.ImportChunkPlanner;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.importing.ImportPlanDto; import at.procon.eventhub.importing.ImportPlanDto;
import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest; import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.time.OffsetDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -18,9 +15,11 @@ import org.springframework.stereotype.Service;
public class TachographImportPlanService { public class TachographImportPlanService {
private final EventHubProperties properties; private final EventHubProperties properties;
private final ImportChunkPlanner chunkPlanner;
public TachographImportPlanService(EventHubProperties properties) { public TachographImportPlanService(EventHubProperties properties, ImportChunkPlanner chunkPlanner) {
this.properties = properties; this.properties = properties;
this.chunkPlanner = chunkPlanner;
} }
public ImportPlanDto createPlan(TachographImportRequest request) { public ImportPlanDto createPlan(TachographImportRequest request) {
@ -36,43 +35,11 @@ public class TachographImportPlanService {
request.importScope(), request.importScope(),
request.sourceGroup(), request.sourceGroup(),
request.eventSource(), request.eventSource(),
chunksFor(request), chunkPlanner.chunksFor(request, properties.getTachograph().getDefaultChunkDays()),
items items
); );
} }
private List<ImportTimeChunkDto> chunksFor(TachographImportRequest request) {
ImportScopeDto scope = request.importScope();
OffsetDateTime from = scope == null ? null : scope.occurredFrom();
OffsetDateTime to = scope == null ? null : scope.occurredTo();
// Source-package driven increments discover original card/VU packages by source-package
// watermark. The occurred window may be null because package period and imported-at
// timestamps are used later by the extractor.
if (request.mode() == ImportMode.INCREMENTAL_UPDATE
&& request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK) {
return List.of(new ImportTimeChunkDto(1, from, to));
}
if (from == null || to == null) {
return List.of(new ImportTimeChunkDto(1, from, to));
}
List<ImportTimeChunkDto> chunks = new ArrayList<>();
int days = Math.max(1, properties.getTachograph().getDefaultChunkDays());
OffsetDateTime cursor = from;
int sequence = 1;
while (cursor.isBefore(to)) {
OffsetDateTime next = cursor.plusDays(days);
if (next.isAfter(to)) {
next = to;
}
chunks.add(new ImportTimeChunkDto(sequence++, cursor, next));
cursor = next;
}
return chunks.isEmpty() ? List.of(new ImportTimeChunkDto(1, from, to)) : chunks;
}
private List<ImportPlanItemDto> itemsFor(EventFamily family, AcquisitionStrategy strategy) { private List<ImportPlanItemDto> itemsFor(EventFamily family, AcquisitionStrategy strategy) {
return switch (family) { return switch (family) {
case DRIVER_ACTIVITY -> List.of( case DRIVER_ACTIVITY -> List.of(

View File

@ -1,33 +1,21 @@
package at.procon.eventhub.tachograph.service; package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.ImportMode;
import at.procon.eventhub.dto.SchedulerTriggerMode; import at.procon.eventhub.dto.SchedulerTriggerMode;
import at.procon.eventhub.importing.AbstractImportScheduler;
import at.procon.eventhub.tachograph.dto.TachographImportRequest; import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.time.OffsetDateTime; import java.util.List;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
public class TachographImportScheduler { public class TachographImportScheduler extends AbstractImportScheduler<TachographImportRequest> {
private static final Logger log = LoggerFactory.getLogger(TachographImportScheduler.class);
private final EventHubProperties properties; private final EventHubProperties properties;
private final TachographConfiguredImportPlanService configuredPlanService; private final TachographConfiguredImportPlanService configuredPlanService;
private final TachographImportExecutionService executionService; private final TachographImportExecutionService executionService;
private final Map<String, ZonedDateTime> nextRunByPlan = new ConcurrentHashMap<>();
private final Map<String, AtomicBoolean> runningByPlan = new ConcurrentHashMap<>();
public TachographImportScheduler( public TachographImportScheduler(
EventHubProperties properties, EventHubProperties properties,
@ -41,57 +29,51 @@ public class TachographImportScheduler {
@EventListener(ApplicationReadyEvent.class) @EventListener(ApplicationReadyEvent.class)
public void triggerInitialPlansOnStartup() { public void triggerInitialPlansOnStartup() {
if (!properties.getTachograph().isSchedulerEnabled()) { super.triggerInitialPlansOnStartup();
return;
}
for (EventHubProperties.ConfiguredImportPlan plan : properties.getTachograph().getImportPlans()) {
if (plan.isEnabled() && plan.getPlanKey() != null && !plan.getPlanKey().isBlank() && plan.isRunInitialOnStartup()) {
triggerPlan(plan, true, properties.getTachograph().getSchedulerTriggerMode());
}
}
} }
@Scheduled(fixedDelayString = "${eventhub.tachograph.scheduler-poll-interval-ms:60000}") @Scheduled(fixedDelayString = "${eventhub.tachograph.scheduler-poll-interval-ms:60000}")
public void pollConfiguredPlans() { public void pollConfiguredPlans() {
if (!properties.getTachograph().isSchedulerEnabled()) { super.pollConfiguredPlans();
return;
}
ZonedDateTime now = ZonedDateTime.now(ZoneId.systemDefault());
for (EventHubProperties.ConfiguredImportPlan plan : properties.getTachograph().getImportPlans()) {
if (!plan.isEnabled() || plan.getPlanKey() == null || plan.getPlanKey().isBlank() || plan.getCron() == null || plan.getCron().isBlank()) {
continue;
}
String key = plan.getPlanKey();
ZonedDateTime next = nextRunByPlan.computeIfAbsent(key, ignored -> CronExpression.parse(plan.getCron()).next(now));
if (next != null && !next.isAfter(now)) {
triggerPlan(plan, false, properties.getTachograph().getSchedulerTriggerMode());
nextRunByPlan.put(key, CronExpression.parse(plan.getCron()).next(now.plusSeconds(1)));
}
}
} }
private void triggerPlan(EventHubProperties.ConfiguredImportPlan plan, boolean initial, SchedulerTriggerMode triggerMode) { @Override
String key = plan.getPlanKey(); protected boolean schedulerEnabled() {
AtomicBoolean running = runningByPlan.computeIfAbsent(key, ignored -> new AtomicBoolean(false)); return properties.getTachograph().isSchedulerEnabled();
if (!running.compareAndSet(false, true)) { }
log.info("Skipping tachograph import plan={} because a previous run is still active", key);
return; @Override
} protected List<EventHubProperties.ConfiguredImportPlan> configuredPlans() {
try { return properties.getTachograph().getImportPlans();
TachographImportRequest request = initial }
? configuredPlanService.createInitialRequest(plan)
: configuredPlanService.createScheduledRequest(plan); @Override
log.info("Triggering tachograph import plan={} initial={} mode={} strategy={} triggerMode={} at={}", protected SchedulerTriggerMode schedulerTriggerMode() {
key, initial, request.mode(), request.acquisitionStrategy(), triggerMode, OffsetDateTime.now()); return properties.getTachograph().getSchedulerTriggerMode();
if (triggerMode == SchedulerTriggerMode.EXECUTE) { }
executionService.startAndExecuteImport(request);
} else { @Override
executionService.startImport(request); protected TachographImportRequest createInitialRequest(EventHubProperties.ConfiguredImportPlan plan) {
} return configuredPlanService.createInitialRequest(plan);
} catch (RuntimeException ex) { }
log.error("Tachograph import plan={} failed to trigger", key, ex);
} finally { @Override
running.set(false); protected TachographImportRequest createScheduledRequest(EventHubProperties.ConfiguredImportPlan plan) {
} return configuredPlanService.createScheduledRequest(plan);
}
@Override
protected void startImport(TachographImportRequest request) {
executionService.startImport(request);
}
@Override
protected void startAndExecuteImport(TachographImportRequest request) {
executionService.startAndExecuteImport(request);
}
@Override
protected String providerName() {
return "tachograph";
} }
} }