Use import time points in tachograph extraction

This commit is contained in:
trifonovt 2026-05-05 12:56:30 +02:00
parent 900bfa5918
commit 17e2bbedf4
25 changed files with 664 additions and 225 deletions

View File

@ -542,23 +542,29 @@ order by p.received_at desc;
## Check acquired events
`eventhub.acquired_event` was replaced by the normalized `eventhub.event` and `eventhub.event_detail` tables.
```sql
select occurred_at,
driver_source_entity_id,
driver_card_nation,
driver_card_number,
vehicle_source_entity_id,
vehicle_vin,
vehicle_registration_nation,
vehicle_registration_number,
event_domain,
event_type,
lifecycle,
event_signature_hash,
event_details,
payload
from eventhub.acquired_event
order by occurred_at desc;
select e.occurred_at,
e.driver_source_entity_id,
e.driver_card_nation,
e.driver_card_number,
e.vehicle_source_entity_id,
e.vehicle_vin,
e.vehicle_registration_nation,
e.vehicle_registration_number,
e.event_domain,
e.event_type,
e.lifecycle,
e.event_signature_hash,
d.detail_type,
d.attributes as event_details,
e.payload
from eventhub.event e
left join eventhub.event_detail d
on d.event_occurred_at = e.occurred_at
and d.event_id = e.id
order by e.occurred_at desc;
```
## Next implementation steps
@ -639,7 +645,7 @@ For tachograph JDBC extraction, `sourcePackageId` is the original `FileLog.ID`.
N planned data_package rows, one per extraction definition and time chunk
```
The SQL extraction routes are intentionally separated from run planning. They should pick planned extraction packages, execute the corresponding SQL, map rows to `EventHubEventDto`, set `sourcePackageRef` when known, and send them to `direct:eventhub-normalized-input`.
When `executeImmediately=true` or a configured plan is started with `triggerMode=EXECUTE`, the concrete extractor executes the planned packages, maps rows to `EventHubEventDto`, sets `sourcePackageRef` when known, persists synchronous JDBC batches through `EventHubIngestionService`, and advances the import cursor only after successful persistence.
### Initial import
@ -705,15 +711,18 @@ This is preferred because newly imported original driver-card/VU packages can co
### Extraction route contract
A future concrete SQL extraction route should do this:
The concrete JDBC extraction route now does this:
```text
planned data_package
-> execute SQL for extraction_code and chunk/import scope
-> apply source-package watermark parameters for incremental imports
-> map source rows to EventHubEventDto
-> populate sourcePackageRef if source package metadata is available
-> send to direct:eventhub-normalized-input
-> only advance eventhub.import_cursor after successful import
-> populate sourcePackageRef when source package metadata is available
-> hand off mapped events according to eventhub.tachograph.jdbc-extraction-ingest-mode
- SYNC_DIRECT: persist controlled DB_EXTRACT packages through EventHubIngestionService
- CAMEL_ROUTE: persist the same controlled batches through direct:eventhub-batch-persist-input
-> advance eventhub.import_cursor after the configured extraction handoff completed
```
## Configurable scheduled tachograph imports
@ -739,6 +748,7 @@ eventhub:
scheduler-enabled: false
scheduler-poll-interval-ms: 60000
scheduler-trigger-mode: PLAN_ONLY
jdbc-extraction-ingest-mode: SYNC_DIRECT
import-plans:
- plan-key: kralowetz-tachograph-org-147
enabled: false
@ -790,13 +800,13 @@ POST /api/eventhub/acquisition/tachograph/imports/configured-plans/kralowetz-tac
## Concrete extraction extension point
The scheduler and import-run service are now implemented, but the generated skeleton still does not know the real tachograph DB SQL. The extension point is:
The scheduler, import-run service and first JDBC SQL extractor are implemented. The extension point remains:
```java
TachographExtractionBatchExecutor
```
Replace `NoopTachographExtractionBatchExecutor` with an implementation that:
`NoopTachographExtractionBatchExecutor` is used only when `eventhub.tachograph.datasource.jdbc-url` is empty. A custom executor can still replace it and should:
```text
1. receives importRunId, packageId, TachographImportRequest, planItem and time chunk
@ -805,15 +815,27 @@ Replace `NoopTachographExtractionBatchExecutor` with an implementation that:
4. applies source-package watermark or source-row watermark for incremental updates
5. maps rows to EventHubEventDto
6. sets sourcePackageRef when the row can be traced to an original card/VU package
7. sends events to direct:eventhub-normalized-input or EventHubIngestionService
8. returns TachographExtractionBatchResultDto with cursor watermarks
7. persist events through `EventHubIngestionService` or a route with explicit completion tracking
8. return `TachographExtractionBatchResultDto` with cursor watermarks
```
The import cursor is advanced only when the executor reports `executed=true`. The default no-op executor returns `executed=false`, so it does not move cursors accidentally.
## JDBC tachograph extraction
The first concrete extractor is `JdbcTachographExtractionBatchExecutor`. It is enabled only when `eventhub.tachograph.datasource.jdbc-url` is configured. Without that datasource, the application keeps using `NoopTachographExtractionBatchExecutor`.
The first concrete extractor is `JdbcTachographExtractionBatchExecutor`. It is enabled only when `eventhub.tachograph.datasource.jdbc-url` is configured. The default `application.yml` maps this to `${TACHOGRAPH_DB_JDBC_URL:}`, so an empty environment does not create the tachograph datasource and the application keeps using `NoopTachographExtractionBatchExecutor`.
The JDBC extractor now has a configurable handoff mode via `eventhub.tachograph.jdbc-extraction-ingest-mode`:
```yaml
eventhub:
tachograph:
jdbc-extraction-ingest-mode: SYNC_DIRECT # or CAMEL_ROUTE
```
`SYNC_DIRECT` is the default and persists controlled `DB_EXTRACT` packages directly through `EventHubIngestionService`. This is the recommended mode for cursor-sensitive scheduled imports, because each extraction batch is written before the package is marked imported and before the cursor is advanced.
`CAMEL_ROUTE` keeps a Camel-based alternative without returning to the unsafe timeout-driven aggregation handoff: the extractor still builds controlled DB extraction batches, but sends each `EventHubEventBatchDto` through `direct:eventhub-batch-persist-input`. This route invokes the same `EventHubIngestionService` synchronously, so cursor advancement remains deterministic while the persistence handoff still goes through Camel.
Currently implemented extraction definitions:

View File

@ -69,6 +69,10 @@ public class EventHubCommonIngestionRoute extends RouteBuilder {
.forceCompletionOnStop()
.process(batchBuildProcessor)
.bean(ingestionService, "ingest");
from("direct:eventhub-batch-persist-input")
.routeId("eventhub-direct-batch-persist-route")
.bean(ingestionService, "ingest");
}
private String batchInputUri() {

View File

@ -1,16 +1,12 @@
package at.procon.eventhub.importing;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportRunStatus;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.importing.persistence.ImportRunRepository;
import at.procon.eventhub.persistence.DataPackageRepository.CamelBatchGroupStatus;
import at.procon.eventhub.persistence.DataPackageRepository;
import at.procon.eventhub.persistence.EventSourceRepository;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@ -26,32 +22,23 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractImportExecutionService<R extends ImportRunRequest, B extends ExtractionBatchResult> {
private static final Duration ASYNC_INGEST_AWAIT_TIMEOUT = Duration.ofHours(6);
private static final Duration ASYNC_INGEST_POLL_INTERVAL = Duration.ofSeconds(2);
private static final Duration ASYNC_INGEST_FAILURE_GRACE_PERIOD = Duration.ofSeconds(90);
private static final Duration ASYNC_INGEST_STALL_GRACE_PERIOD = Duration.ofSeconds(90);
private static final Duration ASYNC_INGEST_PROGRESS_LOG_INTERVAL = Duration.ofSeconds(30);
private final Logger log = LoggerFactory.getLogger(getClass());
private final EventSourceRepository eventSourceRepository;
private final ImportRunRepository importRunRepository;
private final DataPackageRepository dataPackageRepository;
private final ImportCursorRepository importCursorRepository;
private final EventHubProperties eventHubProperties;
protected AbstractImportExecutionService(
EventSourceRepository eventSourceRepository,
ImportRunRepository importRunRepository,
DataPackageRepository dataPackageRepository,
ImportCursorRepository importCursorRepository,
EventHubProperties eventHubProperties
ImportCursorRepository importCursorRepository
) {
this.eventSourceRepository = eventSourceRepository;
this.importRunRepository = importRunRepository;
this.dataPackageRepository = dataPackageRepository;
this.importCursorRepository = importCursorRepository;
this.eventHubProperties = eventHubProperties;
}
protected ImportRunResultDto createImportRun(R request, boolean executeImmediately) {
@ -193,7 +180,6 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
"extractedEventTypeCounts", result.eventTypeCounts()
));
}
awaitAsyncIngestCompletion(importRunId, request, plannedPackage, result);
results.add(result);
dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted());
if (result.executed()) {
@ -221,107 +207,6 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
results.stream().filter(ExtractionBatchResult::executed).count());
}
private void awaitAsyncIngestCompletion(UUID importRunId, R request, PlannedPackage plannedPackage, B result) {
int expectedCamelBatches = expectedCamelBatchCount(result.eventsInserted());
if (expectedCamelBatches <= 0) {
return;
}
EventHubPackageRequest packageInfo = extractionAggregatePackageInfo(
importRunId,
request,
eventSourceForItem(request.eventSource(), plannedPackage.planItem()),
plannedPackage.planItem(),
plannedPackage.chunk()
);
String aggregatePackageKey = aggregatePackageKey(packageInfo);
Instant deadline = Instant.now().plus(ASYNC_INGEST_AWAIT_TIMEOUT);
Instant nextProgressLogAt = Instant.now().plus(ASYNC_INGEST_PROGRESS_LOG_INTERVAL);
Instant failedStateObservedAt = null;
Instant lastStateChangeAt = Instant.now();
CamelBatchGroupStatus previousState = null;
while (Instant.now().isBefore(deadline)) {
CamelBatchGroupStatus state = dataPackageRepository.findCamelBatchGroupStatus(
plannedPackage.eventSourceId(),
request.tenantKey(),
aggregatePackageKey
);
if (state.totalCount() >= expectedCamelBatches && state.successCount() >= expectedCamelBatches) {
return;
}
boolean stateChanged = previousState == null || !previousState.equals(state);
if (stateChanged) {
lastStateChangeAt = Instant.now();
}
if (state.failedCount() > 0 && stateChanged) {
failedStateObservedAt = Instant.now();
} else if (state.failedCount() == 0) {
failedStateObservedAt = null;
}
if (state.failedCount() > 0
&& failedStateObservedAt != null
&& Instant.now().isAfter(failedStateObservedAt.plus(ASYNC_INGEST_FAILURE_GRACE_PERIOD))) {
throw new IllegalStateException(
"Async EventHub ingest failed for importRunId=" + importRunId
+ " aggregatePackageKey=" + aggregatePackageKey
+ " expectedCamelBatches=" + expectedCamelBatches
+ " observedCamelBatches=" + state.totalCount()
+ " failedCamelBatches=" + state.failedCount()
+ " failedMessage=" + state.failedMessage()
);
}
if (state.totalCount() < expectedCamelBatches
&& state.importingCount() == 0
&& state.failedCount() == 0
&& Instant.now().isAfter(lastStateChangeAt.plus(ASYNC_INGEST_STALL_GRACE_PERIOD))) {
throw new IllegalStateException(
"Async EventHub ingest stalled for importRunId=" + importRunId
+ " aggregatePackageKey=" + aggregatePackageKey
+ " expectedCamelBatches=" + expectedCamelBatches
+ " observedCamelBatches=" + state.totalCount()
+ " successfulCamelBatches=" + state.successCount()
+ " importingCamelBatches=" + state.importingCount()
+ " failedCamelBatches=" + state.failedCount()
);
}
if (Instant.now().isAfter(nextProgressLogAt)) {
log.info("Waiting for async EventHub ingest provider={} importRunId={} extractionPackageId={} aggregatePackageKey={} expectedCamelBatches={} observedCamelBatches={} successfulCamelBatches={} failedCamelBatches={} importingCamelBatches={}",
providerPackagePrefix(),
importRunId,
plannedPackage.packageId(),
aggregatePackageKey,
expectedCamelBatches,
state.totalCount(),
state.successCount(),
state.failedCount(),
state.importingCount());
nextProgressLogAt = Instant.now().plus(ASYNC_INGEST_PROGRESS_LOG_INTERVAL);
}
previousState = state;
sleepQuietly(ASYNC_INGEST_POLL_INTERVAL);
}
CamelBatchGroupStatus finalState = dataPackageRepository.findCamelBatchGroupStatus(
plannedPackage.eventSourceId(),
request.tenantKey(),
aggregatePackageKey
);
throw new IllegalStateException(
"Timed out waiting for async EventHub ingest for importRunId=" + importRunId
+ " aggregatePackageKey=" + aggregatePackageKey
+ " expectedCamelBatches=" + expectedCamelBatches
+ " observedCamelBatches=" + finalState.totalCount()
+ " successfulCamelBatches=" + finalState.successCount()
+ " failedCamelBatches=" + finalState.failedCount()
);
}
private EventHubPackageRequest packageRequestFor(
R request,
EventSourceDto itemEventSource,
@ -339,43 +224,6 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
);
}
private EventHubPackageRequest extractionAggregatePackageInfo(
UUID importRunId,
R request,
EventSourceDto itemEventSource,
ImportPlanItemDto item,
ImportTimeChunkDto chunk
) {
return new EventHubPackageRequest(
request.tenantKey(),
itemEventSource,
request.sourceGroup(),
chunkScope(request.importScope(), chunk),
item.eventFamily().name(),
chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(),
providerPackagePrefix() + ":" + item.sourceKind() + ":" + item.extractionCode()
+ ":RUN-" + importRunId + ":CHUNK-" + chunk.sequence()
);
}
private int expectedCamelBatchCount(int extractedEventCount) {
if (extractedEventCount <= 0) {
return 0;
}
int completionSize = Math.max(1, eventHubProperties.getBatch().getCompletionSize());
return ((extractedEventCount - 1) / completionSize) + 1;
}
private String aggregatePackageKey(EventHubPackageRequest packageInfo) {
return packageInfo.tenantKey()
+ ":" + packageInfo.eventSource().stableKey()
+ ":" + (packageInfo.sourceGroup() == null ? "NO_GROUP" : packageInfo.sourceGroup().stableKey())
+ ":" + (packageInfo.importScope() == null ? "NO_SCOPE" : packageInfo.importScope().stableKey())
+ ":" + packageInfo.eventFamily()
+ ":" + (packageInfo.businessDate() == null ? "NO_DATE" : packageInfo.businessDate())
+ ":" + packageInfo.externalPackageId();
}
private at.procon.eventhub.dto.ImportScopeDto chunkScope(at.procon.eventhub.dto.ImportScopeDto scope, ImportTimeChunkDto chunk) {
if (scope == null) {
return at.procon.eventhub.dto.ImportScopeDto.tenantAll(chunk.occurredFrom(), chunk.occurredTo());
@ -389,15 +237,6 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
);
}
private void sleepQuietly(Duration duration) {
try {
Thread.sleep(duration.toMillis());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted while waiting for async EventHub ingest", ex);
}
}
private record PlannedPackage(UUID packageId, int eventSourceId, ImportPlanItemDto planItem, ImportTimeChunkDto chunk) {
}
}

View File

@ -1,7 +1,12 @@
package at.procon.eventhub.importing.extraction;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubEventBatchDto;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventHubPackageResult;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportScopeDto;
@ -10,11 +15,14 @@ import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportRunRequest;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.service.EventHubIngestionService;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@ -33,18 +41,24 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
private static final int EVENT_EXTRACTION_PROGRESS_LOG_INTERVAL = 5000;
private final NamedParameterJdbcTemplate jdbcTemplate;
private final EventHubIngestionService ingestionService;
private final ProducerTemplate producerTemplate;
private final EventHubProperties eventHubProperties;
private final ResourceLoader resourceLoader;
private final ImportCursorRepository importCursorRepository;
protected AbstractJdbcExtractionBatchExecutor(
NamedParameterJdbcTemplate jdbcTemplate,
EventHubIngestionService ingestionService,
ProducerTemplate producerTemplate,
EventHubProperties eventHubProperties,
ResourceLoader resourceLoader,
ImportCursorRepository importCursorRepository
) {
this.jdbcTemplate = jdbcTemplate;
this.ingestionService = ingestionService;
this.producerTemplate = producerTemplate;
this.eventHubProperties = eventHubProperties;
this.resourceLoader = resourceLoader;
this.importCursorRepository = importCursorRepository;
}
@ -87,17 +101,22 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
Map<String, Object> params = parameters(request, chunkScope, cursor);
String sql = loadSql(definition.sqlResource());
ExtractedEventStats stats = new ExtractedEventStats();
log.info("Reading EventHub events provider={} tenant={} importRunId={} packageId={} extractionCode={} sourceKind={} chunk={} occurredFrom={} occurredTo={}",
List<EventHubEventDto> pendingEvents = new ArrayList<>(jdbcPersistBatchSize());
log.info("Reading EventHub events provider={} tenant={} importRunId={} packageId={} extractionCode={} sourceKind={} chunk={} occurredFrom={} occurredTo={} ingestMode={}",
providerPackagePrefix(), request.tenantKey(), importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(),
chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo());
chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), jdbcExtractionIngestMode());
jdbcTemplate.query(sql, params, rs -> {
EventHubEventDto event = definition.rowMapper().map(rs, stats.eventsMapped(), context);
producerTemplate.sendBody(normalizedInputUri(), event);
pendingEvents.add(event);
stats.accept(event);
if (pendingEvents.size() >= jdbcPersistBatchSize()) {
flushPersistBatch(request, importRunId, packageId, planItem, chunk, packageInfo, pendingEvents, stats);
}
if (stats.eventsMapped() % EVENT_EXTRACTION_PROGRESS_LOG_INTERVAL == 0) {
logEventExtractionProgress(request, importRunId, packageId, planItem, chunk, stats);
}
});
flushPersistBatch(request, importRunId, packageId, planItem, chunk, packageInfo, pendingEvents, stats);
logEventExtractionFinished(request, importRunId, packageId, planItem, chunk, stats);
return resultFor(packageId, planItem, chunk, cursor, stats);
@ -119,8 +138,111 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
return "SOURCE";
}
protected String normalizedInputUri() {
return "direct:eventhub-normalized-input";
protected int jdbcPersistBatchSize() {
return Math.max(1, eventHubProperties.getBatch().getCompletionSize());
}
protected EventHubProperties.JdbcExtractionIngestMode jdbcExtractionIngestMode() {
return eventHubProperties.getTachograph().getJdbcExtractionIngestMode();
}
protected int eventsInsertedOrSubmitted(ExtractedEventStats stats) {
return stats.eventsInserted();
}
protected String camelBatchPersistUri() {
return "direct:eventhub-batch-persist-input";
}
private void flushPersistBatch(
R request,
UUID importRunId,
UUID extractionPackageId,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
EventHubPackageRequest packageInfo,
List<EventHubEventDto> pendingEvents,
ExtractedEventStats stats
) {
if (pendingEvents.isEmpty()) {
return;
}
int batchNo = stats.nextPersistBatchNo();
List<EventHubEventDto> eventsToPersist = List.copyOf(pendingEvents);
pendingEvents.clear();
EventHubEventBatchDto batch = new EventHubEventBatchDto(
packageInfo.externalPackageId() + ":JDBC-" + batchNo,
packageInfo,
DataPackageType.DB_EXTRACT,
occurredFrom(eventsToPersist, chunk),
occurredTo(eventsToPersist, chunk),
eventsToPersist,
persistBatchMetadata(request, importRunId, extractionPackageId, planItem, chunk, batchNo, eventsToPersist)
);
EventHubPackageResult result = persistBatch(batch);
stats.acceptPersistResult(result);
log.info("Persisted EventHub extraction batch provider={} tenant={} importRunId={} extractionPackageId={} extractionCode={} sourceKind={} chunk={} batchNo={} received={} inserted={}",
providerPackagePrefix(), request.tenantKey(), importRunId, extractionPackageId, planItem.extractionCode(), planItem.sourceKind(),
chunk.sequence(), batchNo, result.receivedCount(), result.insertedCount());
}
private EventHubPackageResult persistBatch(EventHubEventBatchDto batch) {
if (jdbcExtractionIngestMode() == EventHubProperties.JdbcExtractionIngestMode.CAMEL_ROUTE) {
return producerTemplate.requestBody(camelBatchPersistUri(), batch, EventHubPackageResult.class);
}
return ingestionService.ingest(batch);
}
private Map<String, Object> persistBatchMetadata(
R request,
UUID importRunId,
UUID extractionPackageId,
ImportPlanItemDto planItem,
ImportTimeChunkDto chunk,
int batchNo,
List<EventHubEventDto> events
) {
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("ingestMode", jdbcExtractionIngestMode().name());
metadata.put("importRunId", importRunId.toString());
metadata.put("extractionPackageId", extractionPackageId.toString());
metadata.put("tenantKey", request.tenantKey());
metadata.put("mode", request.mode().name());
metadata.put("acquisitionStrategy", request.acquisitionStrategy().name());
metadata.put("eventFamily", planItem.eventFamily().name());
metadata.put("sourceKind", planItem.sourceKind());
metadata.put("extractionCode", planItem.extractionCode());
metadata.put("entityAxis", planItem.entityAxis());
metadata.put("chunkSequence", chunk.sequence());
metadata.put("chunkOccurredFrom", chunk.occurredFrom() == null ? null : chunk.occurredFrom().toString());
metadata.put("chunkOccurredTo", chunk.occurredTo() == null ? null : chunk.occurredTo().toString());
metadata.put("batchNo", batchNo);
metadata.put("receivedEventCount", events.size());
metadata.put("sourcePackageRefPolicy", "Original source package is preserved per acquired event.");
return metadata;
}
private OffsetDateTime occurredFrom(List<EventHubEventDto> events, ImportTimeChunkDto chunk) {
if (chunk.occurredFrom() != null) {
return chunk.occurredFrom();
}
return events.stream()
.map(EventHubEventDto::occurredAt)
.filter(value -> value != null)
.min(OffsetDateTime::compareTo)
.orElse(null);
}
private OffsetDateTime occurredTo(List<EventHubEventDto> events, ImportTimeChunkDto chunk) {
if (chunk.occurredTo() != null) {
return chunk.occurredTo();
}
return events.stream()
.map(EventHubEventDto::occurredAt)
.filter(value -> value != null)
.max(OffsetDateTime::compareTo)
.orElse(null);
}
private void logEventExtractionProgress(
@ -159,6 +281,7 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
params.put("occurredTo", scope == null ? null : scope.occurredTo());
params.put("organisationId", organisationId);
params.put("includeChildren", scope != null && scope.includeChildren());
params.put("sourcePackageWatermarkEnabled", request.acquisitionStrategy() == AcquisitionStrategy.SOURCE_PACKAGE_WATERMARK ? 1 : 0);
params.put("lastSourcePackageImportedAt", cursor == null ? null : cursor.lastSourcePackageImportedAt());
params.put("lastSourcePackageId", cursor == null ? null : cursor.lastSourcePackageId());
params.put("lastSourcePackageIdNumeric", parseLong(cursor == null ? null : cursor.lastSourcePackageId()));
@ -277,10 +400,28 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
return maxSourcePackageId;
}
private int eventsInserted;
private int persistBatchNo;
public int eventsInserted() {
return eventsInserted;
}
public Map<String, Integer> eventTypeCounts() {
return eventTypeCounts;
}
private int nextPersistBatchNo() {
persistBatchNo++;
return persistBatchNo;
}
private void acceptPersistResult(EventHubPackageResult result) {
if (result != null) {
eventsInserted += result.insertedCount();
}
}
private void accept(EventHubEventDto event) {
eventsMapped++;
eventTypeCounts.merge(eventTypeKey(event), 1, Integer::sum);
@ -290,10 +431,17 @@ public abstract class AbstractJdbcExtractionBatchExecutor<R extends ImportRunReq
OffsetDateTime importedAt = event.sourcePackageRef().importedIntoSourceAt();
String sourcePackageId = event.sourcePackageRef().sourcePackageId();
if (importedAt != null
&& (lastSourcePackageImportedAt == null || importedAt.compareTo(lastSourcePackageImportedAt) > 0)) {
lastSourcePackageImportedAt = importedAt;
lastSourcePackageIdByImportedAt = sourcePackageId;
if (importedAt != null) {
if (lastSourcePackageImportedAt == null || importedAt.compareTo(lastSourcePackageImportedAt) > 0) {
lastSourcePackageImportedAt = importedAt;
lastSourcePackageIdByImportedAt = sourcePackageId;
} else if (importedAt.compareTo(lastSourcePackageImportedAt) == 0
&& sourcePackageId != null
&& !sourcePackageId.isBlank()
&& (lastSourcePackageIdByImportedAt == null
|| compareSourcePackageId(sourcePackageId, lastSourcePackageIdByImportedAt) > 0)) {
lastSourcePackageIdByImportedAt = sourcePackageId;
}
}
if (sourcePackageId != null
&& !sourcePackageId.isBlank()

View File

@ -48,6 +48,13 @@ public class DataPackageRepository {
SourceGroupRefDto sourceGroup = packageInfo == null ? null : packageInfo.sourceGroup();
ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope();
SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation();
UUID importRunId = metadataUuid(metadata, "importRunId");
String extractionCode = metadataString(metadata, "extractionCode");
String extractionSourceKind = metadataString(metadata, "sourceKind");
String entityAxis = metadataString(metadata, "entityAxis");
Integer batchNo = metadataInteger(metadata, "batchNo");
OffsetDateTime chunkFrom = metadataOffsetDateTime(metadata, "chunkOccurredFrom");
OffsetDateTime chunkTo = metadataOffsetDateTime(metadata, "chunkOccurredTo");
return jdbcTemplate.query(
con -> {
@ -64,9 +71,16 @@ public class DataPackageRepository {
received_at, event_count, metadata
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), ?, ?::jsonb)
on conflict (tenant_key, event_source_id, package_key) do update
set status = excluded.status,
set import_run_id = excluded.import_run_id,
status = excluded.status,
occurred_from = excluded.occurred_from,
occurred_to = excluded.occurred_to,
extraction_code = excluded.extraction_code,
extraction_source_kind = excluded.extraction_source_kind,
entity_axis = excluded.entity_axis,
batch_no = excluded.batch_no,
chunk_from = excluded.chunk_from,
chunk_to = excluded.chunk_to,
event_count = excluded.event_count,
metadata = excluded.metadata,
error_message = null,
@ -75,7 +89,7 @@ public class DataPackageRepository {
""");
ps.setObject(1, id);
ps.setInt(2, eventSourceId);
ps.setObject(3, null);
ps.setObject(3, importRunId);
ps.setString(4, packageInfo == null ? "default" : packageInfo.tenantKey());
ps.setString(5, packageKey);
ps.setString(6, packageType.name());
@ -94,12 +108,12 @@ public class DataPackageRepository {
ps.setString(19, packageInfo == null ? null : packageInfo.eventFamily());
ps.setObject(20, packageInfo == null ? null : packageInfo.businessDate());
ps.setString(21, packageInfo == null ? packageKey : packageInfo.externalPackageId());
ps.setString(22, null);
ps.setString(23, null);
ps.setString(24, null);
ps.setObject(25, null);
ps.setObject(26, null);
ps.setObject(27, null);
ps.setString(22, extractionCode);
ps.setString(23, extractionSourceKind);
ps.setString(24, entityAxis);
ps.setObject(25, batchNo);
ps.setObject(26, chunkFrom);
ps.setObject(27, chunkTo);
ps.setString(28, null);
ps.setString(29, null);
ps.setString(30, null);
@ -343,6 +357,61 @@ public class DataPackageRepository {
);
}
private UUID metadataUuid(Map<String, Object> metadata, String key) {
String value = metadataString(metadata, key);
if (value == null) {
return null;
}
try {
return UUID.fromString(value);
} catch (IllegalArgumentException ignored) {
return null;
}
}
private String metadataString(Map<String, Object> metadata, String key) {
if (metadata == null) {
return null;
}
Object value = metadata.get(key);
if (value == null) {
return null;
}
String text = value.toString().trim();
return text.isEmpty() ? null : text;
}
private Integer metadataInteger(Map<String, Object> metadata, String key) {
String value = metadataString(metadata, key);
if (value == null) {
return null;
}
try {
return Integer.parseInt(value);
} catch (NumberFormatException ignored) {
return null;
}
}
private OffsetDateTime metadataOffsetDateTime(Map<String, Object> metadata, String key) {
Object value = metadata == null ? null : metadata.get(key);
if (value == null) {
return null;
}
if (value instanceof OffsetDateTime offsetDateTime) {
return offsetDateTime;
}
String text = value.toString().trim();
if (text.isEmpty()) {
return null;
}
try {
return OffsetDateTime.parse(text);
} catch (RuntimeException ignored) {
return null;
}
}
private String toJson(Map<String, Object> value) {
try {
return objectMapper.writeValueAsString(normalizeMetadataMap(value));

View File

@ -10,7 +10,7 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
@Configuration
@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''")
@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.tachograph.datasource.jdbc-url:}')")
public class TachographDataSourceConfig {
private static final String SQL_SERVER_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver";

View File

@ -1,5 +1,6 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportScopeDto;
@ -8,6 +9,7 @@ import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.extraction.AbstractJdbcExtractionBatchExecutor;
import at.procon.eventhub.importing.extraction.ExtractionDefinition;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.service.EventHubIngestionService;
import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto;
import at.procon.eventhub.tachograph.dto.TachographImportRequest;
import java.time.LocalDateTime;
@ -26,7 +28,7 @@ import org.springframework.stereotype.Service;
@Service
@ConditionalOnBean(name = "tachographNamedParameterJdbcTemplate")
@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''")
@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.tachograph.datasource.jdbc-url:}')")
public class JdbcTachographExtractionBatchExecutor
extends AbstractJdbcExtractionBatchExecutor<TachographImportRequest, TachographExtractionBatchResultDto>
implements TachographExtractionBatchExecutor {
@ -35,12 +37,14 @@ public class JdbcTachographExtractionBatchExecutor
public JdbcTachographExtractionBatchExecutor(
@Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate tachographJdbcTemplate,
EventHubIngestionService ingestionService,
ProducerTemplate producerTemplate,
EventHubProperties eventHubProperties,
ResourceLoader resourceLoader,
TachographExtractionDefinitionRegistry definitionRegistry,
ImportCursorRepository importCursorRepository
) {
super(tachographJdbcTemplate, producerTemplate, resourceLoader, importCursorRepository);
super(tachographJdbcTemplate, ingestionService, producerTemplate, eventHubProperties, resourceLoader, importCursorRepository);
this.definitionRegistry = definitionRegistry;
}
@ -78,7 +82,7 @@ public class JdbcTachographExtractionBatchExecutor
planItem.sourceKind(),
stats.eventsMapped(),
stats.eventsMapped(),
stats.eventsMapped(),
eventsInsertedOrSubmitted(stats),
0,
true,
lastSourcePackageImportedAt(stats, cursor),

View File

@ -19,7 +19,7 @@ import org.springframework.stereotype.Service;
*/
@Service
@ConditionalOnMissingBean(TachographExtractionBatchExecutor.class)
@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' == ''")
@ConditionalOnExpression("!T(org.springframework.util.StringUtils).hasText('${eventhub.tachograph.datasource.jdbc-url:}')")
public class NoopTachographExtractionBatchExecutor
extends AbstractNoopExtractionBatchExecutor<TachographImportRequest, TachographExtractionBatchResultDto>
implements TachographExtractionBatchExecutor {

View File

@ -1,6 +1,5 @@
package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.importing.AbstractImportExecutionService;
import at.procon.eventhub.importing.ImportPlanDto;
@ -33,11 +32,10 @@ public class TachographImportExecutionService
ImportRunRepository importRunRepository,
DataPackageRepository dataPackageRepository,
ImportCursorRepository importCursorRepository,
EventHubProperties eventHubProperties,
TachographMasterDataRefreshService masterDataRefreshService,
TachographExtractionBatchExecutor extractionBatchExecutor
) {
super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository, eventHubProperties);
super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository);
this.planService = planService;
this.masterDataRefreshService = masterDataRefreshService;
this.extractionBatchExecutor = extractionBatchExecutor;

View File

@ -2,17 +2,17 @@ spring:
application:
name: eventhub-ingestion-service
datasource:
url: jdbc:postgresql://localhost:5432/eventhub
username: postgres
password: P54!pcd#Wi
url: jdbc:postgresql://${DB_HOST:localhost}:${DB_PORT:5432}/${DB_NAME:eventhub}
username: ${DB_USER:}
password: ${DB_PASSWORD:}
hikari:
maximum-pool-size: 16
minimum-idle: 4
connection-timeout: 30000
validation-timeout: 5000
idle-timeout: 60000
keepalive-time: 30000
max-lifetime: 90000
idle-timeout: 300000
keepalive-time: 120000
max-lifetime: 540000
flyway:
enabled: true
default-schema: eventhub
@ -54,21 +54,27 @@ eventhub:
default-chunk-days: 1
occurred-at-overlap: 7d
# Configure this block to enable JdbcTachographExtractionBatchExecutor.
# Set TACHOGRAPH_DB_JDBC_URL to enable JdbcTachographExtractionBatchExecutor.
datasource:
jdbc-url: jdbc:sqlserver://db.bytebar.eu:22996;databaseName=ByteBarDriverSettlement;trustServerCertificate=true
username: ReadOnly
password: p2=race!
jdbc-url: ${TACHOGRAPH_DB_JDBC_URL:}
username: ${TACHOGRAPH_DB_USER:}
password: ${TACHOGRAPH_DB_PASSWORD:}
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
# Enables the scheduler that regularly triggers configured tachograph import plans.
# Default is safe: no scheduled import starts unless explicitly enabled.
scheduler-enabled: true
scheduler-poll-interval-ms: 60000
scheduler-poll-interval-ms: 3600000
# PLAN_ONLY creates import_run + planned extraction packages.
# EXECUTE also invokes the configured TachographExtractionBatchExecutor.
scheduler-trigger-mode: EXECUTE
# JDBC extraction handoff mode:
# SYNC_DIRECT = persist controlled JDBC batches directly, cursor-safe default.
# CAMEL_ROUTE = persist the same controlled batches through direct:eventhub-batch-persist-input.
jdbc-extraction-ingest-mode: ${TACHOGRAPH_JDBC_EXTRACTION_INGEST_MODE:SYNC_DIRECT}
# Example plan. Keep disabled until the tachograph datasource/extractor is wired.
import-plans:
- plan-key: kralowetz-tachograph-org-147
@ -111,5 +117,54 @@ eventhub:
scheduled-strategy: SOURCE_PACKAGE_WATERMARK
refresh-master-data-first: false
initial-occurred-from: "2026-01-21T00:00:00+01:00"
initial-occurred-to: "2026-01-25T00:00:00+01:00"
initial-occurred-to: "2026-01-31T00:00:00+01:00"
run-initial-on-startup: true
yellow-fox:
default-chunk-days: 1
occurred-at-overlap: 2h
emit-initial-ignition-snapshot: false
datasource:
jdbc-url: ${YELLOWFOX_DB_JDBC_URL:}
username: ${YELLOWFOX_DB_USERNAME:}
password: ${YELLOWFOX_DB_PASSWORD:}
driver-class-name: org.postgresql.Driver
scheduler-enabled: false
scheduler-poll-interval-ms: 60000
scheduler-trigger-mode: PLAN_ONLY
import-plans:
- plan-key: yellowfox-d8-default
enabled: false
cron: "0 */5 * * * *"
tenant-key: default
event-source:
provider-key: YELLOWFOX
source-kind: TELEMATICS_PLATFORM
source-key: YELLOWFOX_D8
source-instance-key: logistics-db-prod
tenant-provider-setting-key: yellowfox-main
source-group:
type: FLEET
source-entity-id: null
code: null
name: null
import-scope:
type: TENANT_ALL
root-source-organisation: null
include-children: false
occurred-from: null
occurred-to: null
event-families:
- DRIVER_ACTIVITY
- DRIVER_CARD
initial-mode: INITIAL_BACKFILL
scheduled-mode: INCREMENTAL_UPDATE
initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP
scheduled-strategy: SOURCE_ROW_WATERMARK
refresh-master-data-first: false
initial-occurred-from: null
initial-occurred-to: null
run-initial-on-startup: false

View File

@ -98,6 +98,8 @@ Base as (
left join dbo.Vehicle v on v.ID = cvu.ID_Vehicle
left join dbo.Nation vn on vn.ID = v.ID_Nation
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:CARD_ACTIVITY:', base.ID, ':', evt.lifecycle) as external_source_event_id,
@ -134,6 +136,7 @@ select
'DRIVER_CARD' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
@ -148,3 +151,20 @@ where (:occurredFrom is null or evt.occurred_at >= :occurredFrom)
/*
* Organisation filter: driver membership in GetOrganisationTree(null, :organisationId, 0, null).
*/
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -65,6 +65,8 @@ Base as (
)
)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:CARD_BORDER_CROSSING:', base.ID) as external_source_event_id,
@ -89,8 +91,26 @@ select
'DRIVER_CARD' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
base.source_package_imported_at
from Base base
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -62,6 +62,8 @@ Base as (
)
)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:CARD_LOAD_UNLOAD:', base.ID) as external_source_event_id,
@ -94,8 +96,26 @@ select
'DRIVER_CARD' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
base.source_package_imported_at
from Base base
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -66,6 +66,8 @@ Base as (
)
)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:CARD_PLACE:', base.ID) as external_source_event_id,
@ -93,8 +95,26 @@ select
'DRIVER_CARD' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
base.source_package_imported_at
from Base base
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -61,6 +61,8 @@ Base as (
)
)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:CARD_POSITION:', base.ID) as external_source_event_id,
@ -83,8 +85,26 @@ select
'DRIVER_CARD' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
base.source_package_imported_at
from Base base
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -59,6 +59,8 @@ Base as (
)
)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:CARD_SPECIFIC_CONDITION:', base.ID) as external_source_event_id,
@ -81,8 +83,26 @@ select
'DRIVER_CARD' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
base.source_package_imported_at
from Base base
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -80,6 +80,8 @@ Base as (
and (:occurredFrom is null or evt.occurred_at >= :occurredFrom)
and (:occurredTo is null or evt.occurred_at < :occurredTo)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:CARD_VEHICLES_USED:', base.ID, ':', base.lifecycle) as external_source_event_id,
@ -107,8 +109,26 @@ select
'DRIVER_CARD' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
base.source_package_imported_at
from Base base
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -71,6 +71,8 @@ Base as (
and (:occurredFrom is null or evt.occurred_at >= :occurredFrom)
and (:occurredTo is null or evt.occurred_at < :occurredTo)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:IW_CYCLE:', base.ID, ':', base.lifecycle) as external_source_event_id,
@ -98,6 +100,7 @@ select
'VEHICLE_UNIT' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
@ -126,3 +129,20 @@ where (
and rel.GILT_BIS is null
)
)
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -64,6 +64,8 @@ Events as (
'END' as lifecycle
from Base base
)
,
Extracted as (
select
concat(cast(events.ID as varchar(128)), ':', events.lifecycle) as source_row_id,
concat('TACHOGRAPH:SPEEDING_EVENTS:', events.ID, ':', events.lifecycle) as external_source_event_id,
@ -86,6 +88,7 @@ select
'VEHICLE_UNIT' as source_package_kind,
cast(events.source_package_id_raw as varchar(128)) as source_package_id,
events.source_package_id_raw as source_package_sort_id,
cast(events.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
events.source_package_period_from,
events.source_package_period_to,
@ -116,3 +119,20 @@ where (:occurredFrom is null or events.occurred_at >= :occurredFrom)
and rel.GILT_BIS is null
)
)
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -124,6 +124,8 @@ Base as (
left join dbo.Card c on c.ID = iw.ID_Card
left join dbo.Nation cn on cn.ID = c.ID_Nation
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:VU_ACTIVITY:', base.ID, ':', evt.lifecycle) as external_source_event_id,
@ -160,6 +162,7 @@ select
'VEHICLE_UNIT' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
@ -174,3 +177,20 @@ where (:occurredFrom is null or evt.occurred_at >= :occurredFrom)
/*
* Organisation filter: vehicle membership in GetOrganisationTree(null, :organisationId, 0, null).
*/
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -46,6 +46,8 @@ Base as (
where (:occurredFrom is null or border.Timestamp >= :occurredFrom)
and (:occurredTo is null or border.Timestamp < :occurredTo)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:VU_BORDER_CROSSING:', base.ID) as external_source_event_id,
@ -70,6 +72,7 @@ select
'VEHICLE_UNIT' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
@ -98,3 +101,20 @@ where (
and rel.GILT_BIS is null
)
)
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -43,6 +43,8 @@ Base as (
where (:occurredFrom is null or lu.Timestamp >= :occurredFrom)
and (:occurredTo is null or lu.Timestamp < :occurredTo)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:VU_LOAD_UNLOAD:', base.ID) as external_source_event_id,
@ -75,6 +77,7 @@ select
'VEHICLE_UNIT' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
@ -103,3 +106,20 @@ where (
and rel.GILT_BIS is null
)
)
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -44,6 +44,8 @@ Base as (
where (:occurredFrom is null or place.EntryTime >= :occurredFrom)
and (:occurredTo is null or place.EntryTime < :occurredTo)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:VU_PLACE:', base.ID) as external_source_event_id,
@ -71,6 +73,7 @@ select
'VEHICLE_UNIT' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
@ -99,3 +102,20 @@ where (
and rel.GILT_BIS is null
)
)
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -42,6 +42,8 @@ Base as (
where (:occurredFrom is null or pos.Timestamp >= :occurredFrom)
and (:occurredTo is null or pos.Timestamp < :occurredTo)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:VU_POSITION:', base.ID) as external_source_event_id,
@ -64,6 +66,7 @@ select
'VEHICLE_UNIT' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
@ -92,3 +95,20 @@ where (
and rel.GILT_BIS is null
)
)
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)

View File

@ -32,6 +32,8 @@ Base as (
where (:occurredFrom is null or cond.EntryTime >= :occurredFrom)
and (:occurredTo is null or cond.EntryTime < :occurredTo)
)
,
Extracted as (
select
cast(base.ID as varchar(128)) as source_row_id,
concat('TACHOGRAPH:VU_SPECIFIC_CONDITION:', base.ID) as external_source_event_id,
@ -54,6 +56,7 @@ select
'VEHICLE_UNIT' as source_package_kind,
cast(base.source_package_id_raw as varchar(128)) as source_package_id,
base.source_package_id_raw as source_package_sort_id,
cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id,
base.source_package_period_from,
base.source_package_period_to,
@ -82,3 +85,20 @@ where (
and rel.GILT_BIS is null
)
)
)
select *
from Extracted extracted
where (
:sourcePackageWatermarkEnabled = 0
or :lastSourcePackageImportedAt is null
or extracted.source_package_imported_at is null
or extracted.source_package_imported_at > :lastSourcePackageImportedAt
or (
extracted.source_package_imported_at = :lastSourcePackageImportedAt
and (
:lastSourcePackageIdNumeric is null
or extracted.source_package_sort_id is null
or extracted.source_package_sort_id > :lastSourcePackageIdNumeric
)
)
)