diff --git a/README.md b/README.md index faaacd5..8b446e4 100644 --- a/README.md +++ b/README.md @@ -542,23 +542,29 @@ order by p.received_at desc; ## Check acquired events +`eventhub.acquired_event` was replaced by the normalized `eventhub.event` and `eventhub.event_detail` tables. + ```sql -select occurred_at, - driver_source_entity_id, - driver_card_nation, - driver_card_number, - vehicle_source_entity_id, - vehicle_vin, - vehicle_registration_nation, - vehicle_registration_number, - event_domain, - event_type, - lifecycle, - event_signature_hash, - event_details, - payload -from eventhub.acquired_event -order by occurred_at desc; +select e.occurred_at, + e.driver_source_entity_id, + e.driver_card_nation, + e.driver_card_number, + e.vehicle_source_entity_id, + e.vehicle_vin, + e.vehicle_registration_nation, + e.vehicle_registration_number, + e.event_domain, + e.event_type, + e.lifecycle, + e.event_signature_hash, + d.detail_type, + d.attributes as event_details, + e.payload +from eventhub.event e +left join eventhub.event_detail d + on d.event_occurred_at = e.occurred_at + and d.event_id = e.id +order by e.occurred_at desc; ``` ## Next implementation steps @@ -639,7 +645,7 @@ For tachograph JDBC extraction, `sourcePackageId` is the original `FileLog.ID`. N planned data_package rows, one per extraction definition and time chunk ``` -The SQL extraction routes are intentionally separated from run planning. They should pick planned extraction packages, execute the corresponding SQL, map rows to `EventHubEventDto`, set `sourcePackageRef` when known, and send them to `direct:eventhub-normalized-input`. +When `executeImmediately=true` or a configured plan is started with `triggerMode=EXECUTE`, the concrete extractor executes the planned packages, maps rows to `EventHubEventDto`, sets `sourcePackageRef` when known, persists synchronous JDBC batches through `EventHubIngestionService`, and advances the import cursor only after successful persistence. ### Initial import @@ -705,15 +711,18 @@ This is preferred because newly imported original driver-card/VU packages can co ### Extraction route contract -A future concrete SQL extraction route should do this: +The concrete JDBC extraction route now does this: ```text planned data_package -> execute SQL for extraction_code and chunk/import scope + -> apply source-package watermark parameters for incremental imports -> map source rows to EventHubEventDto - -> populate sourcePackageRef if source package metadata is available - -> send to direct:eventhub-normalized-input - -> only advance eventhub.import_cursor after successful import + -> populate sourcePackageRef when source package metadata is available + -> hand off mapped events according to eventhub.tachograph.jdbc-extraction-ingest-mode + - SYNC_DIRECT: persist controlled DB_EXTRACT packages through EventHubIngestionService + - CAMEL_ROUTE: persist the same controlled batches through direct:eventhub-batch-persist-input + -> advance eventhub.import_cursor after the configured extraction handoff completed ``` ## Configurable scheduled tachograph imports @@ -739,6 +748,7 @@ eventhub: scheduler-enabled: false scheduler-poll-interval-ms: 60000 scheduler-trigger-mode: PLAN_ONLY + jdbc-extraction-ingest-mode: SYNC_DIRECT import-plans: - plan-key: kralowetz-tachograph-org-147 enabled: false @@ -790,13 +800,13 @@ POST /api/eventhub/acquisition/tachograph/imports/configured-plans/kralowetz-tac ## Concrete extraction extension point -The scheduler and import-run service are now implemented, but the generated skeleton still does not know the real tachograph DB SQL. The extension point is: +The scheduler, import-run service and first JDBC SQL extractor are implemented. The extension point remains: ```java TachographExtractionBatchExecutor ``` -Replace `NoopTachographExtractionBatchExecutor` with an implementation that: +`NoopTachographExtractionBatchExecutor` is used only when `eventhub.tachograph.datasource.jdbc-url` is empty. A custom executor can still replace it and should: ```text 1. receives importRunId, packageId, TachographImportRequest, planItem and time chunk @@ -805,15 +815,27 @@ Replace `NoopTachographExtractionBatchExecutor` with an implementation that: 4. applies source-package watermark or source-row watermark for incremental updates 5. maps rows to EventHubEventDto 6. sets sourcePackageRef when the row can be traced to an original card/VU package -7. sends events to direct:eventhub-normalized-input or EventHubIngestionService -8. returns TachographExtractionBatchResultDto with cursor watermarks +7. persist events through `EventHubIngestionService` or a route with explicit completion tracking +8. return `TachographExtractionBatchResultDto` with cursor watermarks ``` The import cursor is advanced only when the executor reports `executed=true`. The default no-op executor returns `executed=false`, so it does not move cursors accidentally. ## JDBC tachograph extraction -The first concrete extractor is `JdbcTachographExtractionBatchExecutor`. It is enabled only when `eventhub.tachograph.datasource.jdbc-url` is configured. Without that datasource, the application keeps using `NoopTachographExtractionBatchExecutor`. +The first concrete extractor is `JdbcTachographExtractionBatchExecutor`. It is enabled only when `eventhub.tachograph.datasource.jdbc-url` is configured. The default `application.yml` maps this to `${TACHOGRAPH_DB_JDBC_URL:}`, so an empty environment does not create the tachograph datasource and the application keeps using `NoopTachographExtractionBatchExecutor`. + +The JDBC extractor now has a configurable handoff mode via `eventhub.tachograph.jdbc-extraction-ingest-mode`: + +```yaml +eventhub: + tachograph: + jdbc-extraction-ingest-mode: SYNC_DIRECT # or CAMEL_ROUTE +``` + +`SYNC_DIRECT` is the default and persists controlled `DB_EXTRACT` packages directly through `EventHubIngestionService`. This is the recommended mode for cursor-sensitive scheduled imports, because each extraction batch is written before the package is marked imported and before the cursor is advanced. + +`CAMEL_ROUTE` keeps a Camel-based alternative without returning to the unsafe timeout-driven aggregation handoff: the extractor still builds controlled DB extraction batches, but sends each `EventHubEventBatchDto` through `direct:eventhub-batch-persist-input`. This route invokes the same `EventHubIngestionService` synchronously, so cursor advancement remains deterministic while the persistence handoff still goes through Camel. Currently implemented extraction definitions: diff --git a/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java b/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java index 326acd9..8de1f27 100644 --- a/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java +++ b/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java @@ -69,6 +69,10 @@ public class EventHubCommonIngestionRoute extends RouteBuilder { .forceCompletionOnStop() .process(batchBuildProcessor) .bean(ingestionService, "ingest"); + + from("direct:eventhub-batch-persist-input") + .routeId("eventhub-direct-batch-persist-route") + .bean(ingestionService, "ingest"); } private String batchInputUri() { diff --git a/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java b/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java index 077424f..dae2d62 100644 --- a/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java +++ b/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java @@ -1,16 +1,12 @@ package at.procon.eventhub.importing; -import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.dto.EventHubPackageRequest; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.ImportRunStatus; import at.procon.eventhub.importing.persistence.ImportCursorRepository; import at.procon.eventhub.importing.persistence.ImportRunRepository; -import at.procon.eventhub.persistence.DataPackageRepository.CamelBatchGroupStatus; import at.procon.eventhub.persistence.DataPackageRepository; import at.procon.eventhub.persistence.EventSourceRepository; -import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -26,32 +22,23 @@ import org.slf4j.LoggerFactory; */ public abstract class AbstractImportExecutionService { - private static final Duration ASYNC_INGEST_AWAIT_TIMEOUT = Duration.ofHours(6); - private static final Duration ASYNC_INGEST_POLL_INTERVAL = Duration.ofSeconds(2); - private static final Duration ASYNC_INGEST_FAILURE_GRACE_PERIOD = Duration.ofSeconds(90); - private static final Duration ASYNC_INGEST_STALL_GRACE_PERIOD = Duration.ofSeconds(90); - private static final Duration ASYNC_INGEST_PROGRESS_LOG_INTERVAL = Duration.ofSeconds(30); - private final Logger log = LoggerFactory.getLogger(getClass()); private final EventSourceRepository eventSourceRepository; private final ImportRunRepository importRunRepository; private final DataPackageRepository dataPackageRepository; private final ImportCursorRepository importCursorRepository; - private final EventHubProperties eventHubProperties; protected AbstractImportExecutionService( EventSourceRepository eventSourceRepository, ImportRunRepository importRunRepository, DataPackageRepository dataPackageRepository, - ImportCursorRepository importCursorRepository, - EventHubProperties eventHubProperties + ImportCursorRepository importCursorRepository ) { this.eventSourceRepository = eventSourceRepository; this.importRunRepository = importRunRepository; this.dataPackageRepository = dataPackageRepository; this.importCursorRepository = importCursorRepository; - this.eventHubProperties = eventHubProperties; } protected ImportRunResultDto createImportRun(R request, boolean executeImmediately) { @@ -193,7 +180,6 @@ public abstract class AbstractImportExecutionService= expectedCamelBatches && state.successCount() >= expectedCamelBatches) { - return; - } - - boolean stateChanged = previousState == null || !previousState.equals(state); - if (stateChanged) { - lastStateChangeAt = Instant.now(); - } - if (state.failedCount() > 0 && stateChanged) { - failedStateObservedAt = Instant.now(); - } else if (state.failedCount() == 0) { - failedStateObservedAt = null; - } - - if (state.failedCount() > 0 - && failedStateObservedAt != null - && Instant.now().isAfter(failedStateObservedAt.plus(ASYNC_INGEST_FAILURE_GRACE_PERIOD))) { - throw new IllegalStateException( - "Async EventHub ingest failed for importRunId=" + importRunId - + " aggregatePackageKey=" + aggregatePackageKey - + " expectedCamelBatches=" + expectedCamelBatches - + " observedCamelBatches=" + state.totalCount() - + " failedCamelBatches=" + state.failedCount() - + " failedMessage=" + state.failedMessage() - ); - } - - if (state.totalCount() < expectedCamelBatches - && state.importingCount() == 0 - && state.failedCount() == 0 - && Instant.now().isAfter(lastStateChangeAt.plus(ASYNC_INGEST_STALL_GRACE_PERIOD))) { - throw new IllegalStateException( - "Async EventHub ingest stalled for importRunId=" + importRunId - + " aggregatePackageKey=" + aggregatePackageKey - + " expectedCamelBatches=" + expectedCamelBatches - + " observedCamelBatches=" + state.totalCount() - + " successfulCamelBatches=" + state.successCount() - + " importingCamelBatches=" + state.importingCount() - + " failedCamelBatches=" + state.failedCount() - ); - } - - if (Instant.now().isAfter(nextProgressLogAt)) { - log.info("Waiting for async EventHub ingest provider={} importRunId={} extractionPackageId={} aggregatePackageKey={} expectedCamelBatches={} observedCamelBatches={} successfulCamelBatches={} failedCamelBatches={} importingCamelBatches={}", - providerPackagePrefix(), - importRunId, - plannedPackage.packageId(), - aggregatePackageKey, - expectedCamelBatches, - state.totalCount(), - state.successCount(), - state.failedCount(), - state.importingCount()); - nextProgressLogAt = Instant.now().plus(ASYNC_INGEST_PROGRESS_LOG_INTERVAL); - } - - previousState = state; - sleepQuietly(ASYNC_INGEST_POLL_INTERVAL); - } - - CamelBatchGroupStatus finalState = dataPackageRepository.findCamelBatchGroupStatus( - plannedPackage.eventSourceId(), - request.tenantKey(), - aggregatePackageKey - ); - throw new IllegalStateException( - "Timed out waiting for async EventHub ingest for importRunId=" + importRunId - + " aggregatePackageKey=" + aggregatePackageKey - + " expectedCamelBatches=" + expectedCamelBatches - + " observedCamelBatches=" + finalState.totalCount() - + " successfulCamelBatches=" + finalState.successCount() - + " failedCamelBatches=" + finalState.failedCount() - ); - } - private EventHubPackageRequest packageRequestFor( R request, EventSourceDto itemEventSource, @@ -339,43 +224,6 @@ public abstract class AbstractImportExecutionService params = parameters(request, chunkScope, cursor); String sql = loadSql(definition.sqlResource()); ExtractedEventStats stats = new ExtractedEventStats(); - log.info("Reading EventHub events provider={} tenant={} importRunId={} packageId={} extractionCode={} sourceKind={} chunk={} occurredFrom={} occurredTo={}", + List pendingEvents = new ArrayList<>(jdbcPersistBatchSize()); + log.info("Reading EventHub events provider={} tenant={} importRunId={} packageId={} extractionCode={} sourceKind={} chunk={} occurredFrom={} occurredTo={} ingestMode={}", providerPackagePrefix(), request.tenantKey(), importRunId, packageId, planItem.extractionCode(), planItem.sourceKind(), - chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo()); + chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), jdbcExtractionIngestMode()); jdbcTemplate.query(sql, params, rs -> { EventHubEventDto event = definition.rowMapper().map(rs, stats.eventsMapped(), context); - producerTemplate.sendBody(normalizedInputUri(), event); + pendingEvents.add(event); stats.accept(event); + if (pendingEvents.size() >= jdbcPersistBatchSize()) { + flushPersistBatch(request, importRunId, packageId, planItem, chunk, packageInfo, pendingEvents, stats); + } if (stats.eventsMapped() % EVENT_EXTRACTION_PROGRESS_LOG_INTERVAL == 0) { logEventExtractionProgress(request, importRunId, packageId, planItem, chunk, stats); } }); + flushPersistBatch(request, importRunId, packageId, planItem, chunk, packageInfo, pendingEvents, stats); logEventExtractionFinished(request, importRunId, packageId, planItem, chunk, stats); return resultFor(packageId, planItem, chunk, cursor, stats); @@ -119,8 +138,111 @@ public abstract class AbstractJdbcExtractionBatchExecutor pendingEvents, + ExtractedEventStats stats + ) { + if (pendingEvents.isEmpty()) { + return; + } + int batchNo = stats.nextPersistBatchNo(); + List eventsToPersist = List.copyOf(pendingEvents); + pendingEvents.clear(); + + EventHubEventBatchDto batch = new EventHubEventBatchDto( + packageInfo.externalPackageId() + ":JDBC-" + batchNo, + packageInfo, + DataPackageType.DB_EXTRACT, + occurredFrom(eventsToPersist, chunk), + occurredTo(eventsToPersist, chunk), + eventsToPersist, + persistBatchMetadata(request, importRunId, extractionPackageId, planItem, chunk, batchNo, eventsToPersist) + ); + EventHubPackageResult result = persistBatch(batch); + stats.acceptPersistResult(result); + log.info("Persisted EventHub extraction batch provider={} tenant={} importRunId={} extractionPackageId={} extractionCode={} sourceKind={} chunk={} batchNo={} received={} inserted={}", + providerPackagePrefix(), request.tenantKey(), importRunId, extractionPackageId, planItem.extractionCode(), planItem.sourceKind(), + chunk.sequence(), batchNo, result.receivedCount(), result.insertedCount()); + } + + private EventHubPackageResult persistBatch(EventHubEventBatchDto batch) { + if (jdbcExtractionIngestMode() == EventHubProperties.JdbcExtractionIngestMode.CAMEL_ROUTE) { + return producerTemplate.requestBody(camelBatchPersistUri(), batch, EventHubPackageResult.class); + } + return ingestionService.ingest(batch); + } + + private Map persistBatchMetadata( + R request, + UUID importRunId, + UUID extractionPackageId, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk, + int batchNo, + List events + ) { + Map metadata = new LinkedHashMap<>(); + metadata.put("ingestMode", jdbcExtractionIngestMode().name()); + metadata.put("importRunId", importRunId.toString()); + metadata.put("extractionPackageId", extractionPackageId.toString()); + metadata.put("tenantKey", request.tenantKey()); + metadata.put("mode", request.mode().name()); + metadata.put("acquisitionStrategy", request.acquisitionStrategy().name()); + metadata.put("eventFamily", planItem.eventFamily().name()); + metadata.put("sourceKind", planItem.sourceKind()); + metadata.put("extractionCode", planItem.extractionCode()); + metadata.put("entityAxis", planItem.entityAxis()); + metadata.put("chunkSequence", chunk.sequence()); + metadata.put("chunkOccurredFrom", chunk.occurredFrom() == null ? null : chunk.occurredFrom().toString()); + metadata.put("chunkOccurredTo", chunk.occurredTo() == null ? null : chunk.occurredTo().toString()); + metadata.put("batchNo", batchNo); + metadata.put("receivedEventCount", events.size()); + metadata.put("sourcePackageRefPolicy", "Original source package is preserved per acquired event."); + return metadata; + } + + private OffsetDateTime occurredFrom(List events, ImportTimeChunkDto chunk) { + if (chunk.occurredFrom() != null) { + return chunk.occurredFrom(); + } + return events.stream() + .map(EventHubEventDto::occurredAt) + .filter(value -> value != null) + .min(OffsetDateTime::compareTo) + .orElse(null); + } + + private OffsetDateTime occurredTo(List events, ImportTimeChunkDto chunk) { + if (chunk.occurredTo() != null) { + return chunk.occurredTo(); + } + return events.stream() + .map(EventHubEventDto::occurredAt) + .filter(value -> value != null) + .max(OffsetDateTime::compareTo) + .orElse(null); } private void logEventExtractionProgress( @@ -159,6 +281,7 @@ public abstract class AbstractJdbcExtractionBatchExecutor eventTypeCounts() { return eventTypeCounts; } + private int nextPersistBatchNo() { + persistBatchNo++; + return persistBatchNo; + } + + private void acceptPersistResult(EventHubPackageResult result) { + if (result != null) { + eventsInserted += result.insertedCount(); + } + } + private void accept(EventHubEventDto event) { eventsMapped++; eventTypeCounts.merge(eventTypeKey(event), 1, Integer::sum); @@ -290,10 +431,17 @@ public abstract class AbstractJdbcExtractionBatchExecutor 0)) { - lastSourcePackageImportedAt = importedAt; - lastSourcePackageIdByImportedAt = sourcePackageId; + if (importedAt != null) { + if (lastSourcePackageImportedAt == null || importedAt.compareTo(lastSourcePackageImportedAt) > 0) { + lastSourcePackageImportedAt = importedAt; + lastSourcePackageIdByImportedAt = sourcePackageId; + } else if (importedAt.compareTo(lastSourcePackageImportedAt) == 0 + && sourcePackageId != null + && !sourcePackageId.isBlank() + && (lastSourcePackageIdByImportedAt == null + || compareSourcePackageId(sourcePackageId, lastSourcePackageIdByImportedAt) > 0)) { + lastSourcePackageIdByImportedAt = sourcePackageId; + } } if (sourcePackageId != null && !sourcePackageId.isBlank() diff --git a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java index 49c774a..10e969e 100644 --- a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java @@ -48,6 +48,13 @@ public class DataPackageRepository { SourceGroupRefDto sourceGroup = packageInfo == null ? null : packageInfo.sourceGroup(); ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope(); SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation(); + UUID importRunId = metadataUuid(metadata, "importRunId"); + String extractionCode = metadataString(metadata, "extractionCode"); + String extractionSourceKind = metadataString(metadata, "sourceKind"); + String entityAxis = metadataString(metadata, "entityAxis"); + Integer batchNo = metadataInteger(metadata, "batchNo"); + OffsetDateTime chunkFrom = metadataOffsetDateTime(metadata, "chunkOccurredFrom"); + OffsetDateTime chunkTo = metadataOffsetDateTime(metadata, "chunkOccurredTo"); return jdbcTemplate.query( con -> { @@ -64,9 +71,16 @@ public class DataPackageRepository { received_at, event_count, metadata ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), ?, ?::jsonb) on conflict (tenant_key, event_source_id, package_key) do update - set status = excluded.status, + set import_run_id = excluded.import_run_id, + status = excluded.status, occurred_from = excluded.occurred_from, occurred_to = excluded.occurred_to, + extraction_code = excluded.extraction_code, + extraction_source_kind = excluded.extraction_source_kind, + entity_axis = excluded.entity_axis, + batch_no = excluded.batch_no, + chunk_from = excluded.chunk_from, + chunk_to = excluded.chunk_to, event_count = excluded.event_count, metadata = excluded.metadata, error_message = null, @@ -75,7 +89,7 @@ public class DataPackageRepository { """); ps.setObject(1, id); ps.setInt(2, eventSourceId); - ps.setObject(3, null); + ps.setObject(3, importRunId); ps.setString(4, packageInfo == null ? "default" : packageInfo.tenantKey()); ps.setString(5, packageKey); ps.setString(6, packageType.name()); @@ -94,12 +108,12 @@ public class DataPackageRepository { ps.setString(19, packageInfo == null ? null : packageInfo.eventFamily()); ps.setObject(20, packageInfo == null ? null : packageInfo.businessDate()); ps.setString(21, packageInfo == null ? packageKey : packageInfo.externalPackageId()); - ps.setString(22, null); - ps.setString(23, null); - ps.setString(24, null); - ps.setObject(25, null); - ps.setObject(26, null); - ps.setObject(27, null); + ps.setString(22, extractionCode); + ps.setString(23, extractionSourceKind); + ps.setString(24, entityAxis); + ps.setObject(25, batchNo); + ps.setObject(26, chunkFrom); + ps.setObject(27, chunkTo); ps.setString(28, null); ps.setString(29, null); ps.setString(30, null); @@ -343,6 +357,61 @@ public class DataPackageRepository { ); } + private UUID metadataUuid(Map metadata, String key) { + String value = metadataString(metadata, key); + if (value == null) { + return null; + } + try { + return UUID.fromString(value); + } catch (IllegalArgumentException ignored) { + return null; + } + } + + private String metadataString(Map metadata, String key) { + if (metadata == null) { + return null; + } + Object value = metadata.get(key); + if (value == null) { + return null; + } + String text = value.toString().trim(); + return text.isEmpty() ? null : text; + } + + private Integer metadataInteger(Map metadata, String key) { + String value = metadataString(metadata, key); + if (value == null) { + return null; + } + try { + return Integer.parseInt(value); + } catch (NumberFormatException ignored) { + return null; + } + } + + private OffsetDateTime metadataOffsetDateTime(Map metadata, String key) { + Object value = metadata == null ? null : metadata.get(key); + if (value == null) { + return null; + } + if (value instanceof OffsetDateTime offsetDateTime) { + return offsetDateTime; + } + String text = value.toString().trim(); + if (text.isEmpty()) { + return null; + } + try { + return OffsetDateTime.parse(text); + } catch (RuntimeException ignored) { + return null; + } + } + private String toJson(Map value) { try { return objectMapper.writeValueAsString(normalizeMetadataMap(value)); diff --git a/src/main/java/at/procon/eventhub/tachograph/config/TachographDataSourceConfig.java b/src/main/java/at/procon/eventhub/tachograph/config/TachographDataSourceConfig.java index 1216006..4b55090 100644 --- a/src/main/java/at/procon/eventhub/tachograph/config/TachographDataSourceConfig.java +++ b/src/main/java/at/procon/eventhub/tachograph/config/TachographDataSourceConfig.java @@ -10,7 +10,7 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jdbc.datasource.DriverManagerDataSource; @Configuration -@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''") +@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.tachograph.datasource.jdbc-url:}')") public class TachographDataSourceConfig { private static final String SQL_SERVER_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; diff --git a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java index 1cc0ac9..3f19af8 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java @@ -1,5 +1,6 @@ package at.procon.eventhub.tachograph.service; +import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.ImportCursorStateDto; import at.procon.eventhub.dto.ImportScopeDto; @@ -8,6 +9,7 @@ import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.importing.extraction.AbstractJdbcExtractionBatchExecutor; import at.procon.eventhub.importing.extraction.ExtractionDefinition; import at.procon.eventhub.importing.persistence.ImportCursorRepository; +import at.procon.eventhub.service.EventHubIngestionService; import at.procon.eventhub.tachograph.dto.TachographExtractionBatchResultDto; import at.procon.eventhub.tachograph.dto.TachographImportRequest; import java.time.LocalDateTime; @@ -26,7 +28,7 @@ import org.springframework.stereotype.Service; @Service @ConditionalOnBean(name = "tachographNamedParameterJdbcTemplate") -@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' != ''") +@ConditionalOnExpression("T(org.springframework.util.StringUtils).hasText('${eventhub.tachograph.datasource.jdbc-url:}')") public class JdbcTachographExtractionBatchExecutor extends AbstractJdbcExtractionBatchExecutor implements TachographExtractionBatchExecutor { @@ -35,12 +37,14 @@ public class JdbcTachographExtractionBatchExecutor public JdbcTachographExtractionBatchExecutor( @Qualifier("tachographNamedParameterJdbcTemplate") NamedParameterJdbcTemplate tachographJdbcTemplate, + EventHubIngestionService ingestionService, ProducerTemplate producerTemplate, + EventHubProperties eventHubProperties, ResourceLoader resourceLoader, TachographExtractionDefinitionRegistry definitionRegistry, ImportCursorRepository importCursorRepository ) { - super(tachographJdbcTemplate, producerTemplate, resourceLoader, importCursorRepository); + super(tachographJdbcTemplate, ingestionService, producerTemplate, eventHubProperties, resourceLoader, importCursorRepository); this.definitionRegistry = definitionRegistry; } @@ -78,7 +82,7 @@ public class JdbcTachographExtractionBatchExecutor planItem.sourceKind(), stats.eventsMapped(), stats.eventsMapped(), - stats.eventsMapped(), + eventsInsertedOrSubmitted(stats), 0, true, lastSourcePackageImportedAt(stats, cursor), diff --git a/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java index 8364158..48b3273 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java @@ -19,7 +19,7 @@ import org.springframework.stereotype.Service; */ @Service @ConditionalOnMissingBean(TachographExtractionBatchExecutor.class) -@ConditionalOnExpression("'${eventhub.tachograph.datasource.jdbc-url:}' == ''") +@ConditionalOnExpression("!T(org.springframework.util.StringUtils).hasText('${eventhub.tachograph.datasource.jdbc-url:}')") public class NoopTachographExtractionBatchExecutor extends AbstractNoopExtractionBatchExecutor implements TachographExtractionBatchExecutor { diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java index 42d417a..216ebad 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java @@ -1,6 +1,5 @@ package at.procon.eventhub.tachograph.service; -import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.importing.AbstractImportExecutionService; import at.procon.eventhub.importing.ImportPlanDto; @@ -33,11 +32,10 @@ public class TachographImportExecutionService ImportRunRepository importRunRepository, DataPackageRepository dataPackageRepository, ImportCursorRepository importCursorRepository, - EventHubProperties eventHubProperties, TachographMasterDataRefreshService masterDataRefreshService, TachographExtractionBatchExecutor extractionBatchExecutor ) { - super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository, eventHubProperties); + super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository); this.planService = planService; this.masterDataRefreshService = masterDataRefreshService; this.extractionBatchExecutor = extractionBatchExecutor; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 2f52350..b6ab787 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,17 +2,17 @@ spring: application: name: eventhub-ingestion-service datasource: - url: jdbc:postgresql://localhost:5432/eventhub - username: postgres - password: P54!pcd#Wi + url: jdbc:postgresql://${DB_HOST:localhost}:${DB_PORT:5432}/${DB_NAME:eventhub} + username: ${DB_USER:} + password: ${DB_PASSWORD:} hikari: maximum-pool-size: 16 minimum-idle: 4 connection-timeout: 30000 validation-timeout: 5000 - idle-timeout: 60000 - keepalive-time: 30000 - max-lifetime: 90000 + idle-timeout: 300000 + keepalive-time: 120000 + max-lifetime: 540000 flyway: enabled: true default-schema: eventhub @@ -54,21 +54,27 @@ eventhub: default-chunk-days: 1 occurred-at-overlap: 7d - # Configure this block to enable JdbcTachographExtractionBatchExecutor. + # Set TACHOGRAPH_DB_JDBC_URL to enable JdbcTachographExtractionBatchExecutor. datasource: - jdbc-url: jdbc:sqlserver://db.bytebar.eu:22996;databaseName=ByteBarDriverSettlement;trustServerCertificate=true - username: ReadOnly - password: p2=race! + jdbc-url: ${TACHOGRAPH_DB_JDBC_URL:} + username: ${TACHOGRAPH_DB_USER:} + password: ${TACHOGRAPH_DB_PASSWORD:} driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver # Enables the scheduler that regularly triggers configured tachograph import plans. + # Default is safe: no scheduled import starts unless explicitly enabled. scheduler-enabled: true - scheduler-poll-interval-ms: 60000 + scheduler-poll-interval-ms: 3600000 # PLAN_ONLY creates import_run + planned extraction packages. # EXECUTE also invokes the configured TachographExtractionBatchExecutor. scheduler-trigger-mode: EXECUTE + # JDBC extraction handoff mode: + # SYNC_DIRECT = persist controlled JDBC batches directly, cursor-safe default. + # CAMEL_ROUTE = persist the same controlled batches through direct:eventhub-batch-persist-input. + jdbc-extraction-ingest-mode: ${TACHOGRAPH_JDBC_EXTRACTION_INGEST_MODE:SYNC_DIRECT} + # Example plan. Keep disabled until the tachograph datasource/extractor is wired. import-plans: - plan-key: kralowetz-tachograph-org-147 @@ -111,5 +117,54 @@ eventhub: scheduled-strategy: SOURCE_PACKAGE_WATERMARK refresh-master-data-first: false initial-occurred-from: "2026-01-21T00:00:00+01:00" - initial-occurred-to: "2026-01-25T00:00:00+01:00" + initial-occurred-to: "2026-01-31T00:00:00+01:00" run-initial-on-startup: true + + yellow-fox: + default-chunk-days: 1 + occurred-at-overlap: 2h + emit-initial-ignition-snapshot: false + + datasource: + jdbc-url: ${YELLOWFOX_DB_JDBC_URL:} + username: ${YELLOWFOX_DB_USERNAME:} + password: ${YELLOWFOX_DB_PASSWORD:} + driver-class-name: org.postgresql.Driver + + scheduler-enabled: false + scheduler-poll-interval-ms: 60000 + scheduler-trigger-mode: PLAN_ONLY + + import-plans: + - plan-key: yellowfox-d8-default + enabled: false + cron: "0 */5 * * * *" + tenant-key: default + event-source: + provider-key: YELLOWFOX + source-kind: TELEMATICS_PLATFORM + source-key: YELLOWFOX_D8 + source-instance-key: logistics-db-prod + tenant-provider-setting-key: yellowfox-main + source-group: + type: FLEET + source-entity-id: null + code: null + name: null + import-scope: + type: TENANT_ALL + root-source-organisation: null + include-children: false + occurred-from: null + occurred-to: null + event-families: + - DRIVER_ACTIVITY + - DRIVER_CARD + initial-mode: INITIAL_BACKFILL + scheduled-mode: INCREMENTAL_UPDATE + initial-strategy: OCCURRED_AT_WINDOW_WITH_OVERLAP + scheduled-strategy: SOURCE_ROW_WATERMARK + refresh-master-data-first: false + initial-occurred-from: null + initial-occurred-to: null + run-initial-on-startup: false \ No newline at end of file diff --git a/src/main/resources/sql/tachograph/card-activity.sql b/src/main/resources/sql/tachograph/card-activity.sql index 313c1b8..e8860e7 100644 --- a/src/main/resources/sql/tachograph/card-activity.sql +++ b/src/main/resources/sql/tachograph/card-activity.sql @@ -98,6 +98,8 @@ Base as ( left join dbo.Vehicle v on v.ID = cvu.ID_Vehicle left join dbo.Nation vn on vn.ID = v.ID_Nation ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:CARD_ACTIVITY:', base.ID, ':', evt.lifecycle) as external_source_event_id, @@ -134,6 +136,7 @@ select 'DRIVER_CARD' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, @@ -148,3 +151,20 @@ where (:occurredFrom is null or evt.occurred_at >= :occurredFrom) /* * Organisation filter: driver membership in GetOrganisationTree(null, :organisationId, 0, null). */ +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/card-border-crossing.sql b/src/main/resources/sql/tachograph/card-border-crossing.sql index 781b940..9096a20 100644 --- a/src/main/resources/sql/tachograph/card-border-crossing.sql +++ b/src/main/resources/sql/tachograph/card-border-crossing.sql @@ -65,6 +65,8 @@ Base as ( ) ) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:CARD_BORDER_CROSSING:', base.ID) as external_source_event_id, @@ -89,8 +91,26 @@ select 'DRIVER_CARD' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, base.source_package_imported_at from Base base +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/card-load-unload.sql b/src/main/resources/sql/tachograph/card-load-unload.sql index 4d25fc5..f63e981 100644 --- a/src/main/resources/sql/tachograph/card-load-unload.sql +++ b/src/main/resources/sql/tachograph/card-load-unload.sql @@ -62,6 +62,8 @@ Base as ( ) ) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:CARD_LOAD_UNLOAD:', base.ID) as external_source_event_id, @@ -94,8 +96,26 @@ select 'DRIVER_CARD' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, base.source_package_imported_at from Base base +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/card-place.sql b/src/main/resources/sql/tachograph/card-place.sql index 3bded03..62979ce 100644 --- a/src/main/resources/sql/tachograph/card-place.sql +++ b/src/main/resources/sql/tachograph/card-place.sql @@ -66,6 +66,8 @@ Base as ( ) ) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:CARD_PLACE:', base.ID) as external_source_event_id, @@ -93,8 +95,26 @@ select 'DRIVER_CARD' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, base.source_package_imported_at from Base base +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/card-position.sql b/src/main/resources/sql/tachograph/card-position.sql index f5797cf..bf80061 100644 --- a/src/main/resources/sql/tachograph/card-position.sql +++ b/src/main/resources/sql/tachograph/card-position.sql @@ -61,6 +61,8 @@ Base as ( ) ) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:CARD_POSITION:', base.ID) as external_source_event_id, @@ -83,8 +85,26 @@ select 'DRIVER_CARD' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, base.source_package_imported_at from Base base +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/card-specific-condition.sql b/src/main/resources/sql/tachograph/card-specific-condition.sql index 7dd677f..283555c 100644 --- a/src/main/resources/sql/tachograph/card-specific-condition.sql +++ b/src/main/resources/sql/tachograph/card-specific-condition.sql @@ -59,6 +59,8 @@ Base as ( ) ) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:CARD_SPECIFIC_CONDITION:', base.ID) as external_source_event_id, @@ -81,8 +83,26 @@ select 'DRIVER_CARD' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, base.source_package_imported_at from Base base +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/card-vehicles-used.sql b/src/main/resources/sql/tachograph/card-vehicles-used.sql index 819e884..5af0dec 100644 --- a/src/main/resources/sql/tachograph/card-vehicles-used.sql +++ b/src/main/resources/sql/tachograph/card-vehicles-used.sql @@ -80,6 +80,8 @@ Base as ( and (:occurredFrom is null or evt.occurred_at >= :occurredFrom) and (:occurredTo is null or evt.occurred_at < :occurredTo) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:CARD_VEHICLES_USED:', base.ID, ':', base.lifecycle) as external_source_event_id, @@ -107,8 +109,26 @@ select 'DRIVER_CARD' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, base.source_package_imported_at from Base base +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/iw-cycle.sql b/src/main/resources/sql/tachograph/iw-cycle.sql index 5c3862d..3118987 100644 --- a/src/main/resources/sql/tachograph/iw-cycle.sql +++ b/src/main/resources/sql/tachograph/iw-cycle.sql @@ -71,6 +71,8 @@ Base as ( and (:occurredFrom is null or evt.occurred_at >= :occurredFrom) and (:occurredTo is null or evt.occurred_at < :occurredTo) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:IW_CYCLE:', base.ID, ':', base.lifecycle) as external_source_event_id, @@ -98,6 +100,7 @@ select 'VEHICLE_UNIT' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, @@ -126,3 +129,20 @@ where ( and rel.GILT_BIS is null ) ) +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/speeding-events.sql b/src/main/resources/sql/tachograph/speeding-events.sql index 1cce29f..5820483 100644 --- a/src/main/resources/sql/tachograph/speeding-events.sql +++ b/src/main/resources/sql/tachograph/speeding-events.sql @@ -64,6 +64,8 @@ Events as ( 'END' as lifecycle from Base base ) +, +Extracted as ( select concat(cast(events.ID as varchar(128)), ':', events.lifecycle) as source_row_id, concat('TACHOGRAPH:SPEEDING_EVENTS:', events.ID, ':', events.lifecycle) as external_source_event_id, @@ -86,6 +88,7 @@ select 'VEHICLE_UNIT' as source_package_kind, cast(events.source_package_id_raw as varchar(128)) as source_package_id, + events.source_package_id_raw as source_package_sort_id, cast(events.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, events.source_package_period_from, events.source_package_period_to, @@ -116,3 +119,20 @@ where (:occurredFrom is null or events.occurred_at >= :occurredFrom) and rel.GILT_BIS is null ) ) +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/vu-activity.sql b/src/main/resources/sql/tachograph/vu-activity.sql index c0b10ae..d52975a 100644 --- a/src/main/resources/sql/tachograph/vu-activity.sql +++ b/src/main/resources/sql/tachograph/vu-activity.sql @@ -124,6 +124,8 @@ Base as ( left join dbo.Card c on c.ID = iw.ID_Card left join dbo.Nation cn on cn.ID = c.ID_Nation ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:VU_ACTIVITY:', base.ID, ':', evt.lifecycle) as external_source_event_id, @@ -160,6 +162,7 @@ select 'VEHICLE_UNIT' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, @@ -174,3 +177,20 @@ where (:occurredFrom is null or evt.occurred_at >= :occurredFrom) /* * Organisation filter: vehicle membership in GetOrganisationTree(null, :organisationId, 0, null). */ +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/vu-border-crossing.sql b/src/main/resources/sql/tachograph/vu-border-crossing.sql index 8c579df..7c921c6 100644 --- a/src/main/resources/sql/tachograph/vu-border-crossing.sql +++ b/src/main/resources/sql/tachograph/vu-border-crossing.sql @@ -46,6 +46,8 @@ Base as ( where (:occurredFrom is null or border.Timestamp >= :occurredFrom) and (:occurredTo is null or border.Timestamp < :occurredTo) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:VU_BORDER_CROSSING:', base.ID) as external_source_event_id, @@ -70,6 +72,7 @@ select 'VEHICLE_UNIT' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, @@ -98,3 +101,20 @@ where ( and rel.GILT_BIS is null ) ) +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/vu-load-unload.sql b/src/main/resources/sql/tachograph/vu-load-unload.sql index 90fc3a3..b54dc18 100644 --- a/src/main/resources/sql/tachograph/vu-load-unload.sql +++ b/src/main/resources/sql/tachograph/vu-load-unload.sql @@ -43,6 +43,8 @@ Base as ( where (:occurredFrom is null or lu.Timestamp >= :occurredFrom) and (:occurredTo is null or lu.Timestamp < :occurredTo) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:VU_LOAD_UNLOAD:', base.ID) as external_source_event_id, @@ -75,6 +77,7 @@ select 'VEHICLE_UNIT' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, @@ -103,3 +106,20 @@ where ( and rel.GILT_BIS is null ) ) +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/vu-place.sql b/src/main/resources/sql/tachograph/vu-place.sql index b33f7e8..9d0ce32 100644 --- a/src/main/resources/sql/tachograph/vu-place.sql +++ b/src/main/resources/sql/tachograph/vu-place.sql @@ -44,6 +44,8 @@ Base as ( where (:occurredFrom is null or place.EntryTime >= :occurredFrom) and (:occurredTo is null or place.EntryTime < :occurredTo) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:VU_PLACE:', base.ID) as external_source_event_id, @@ -71,6 +73,7 @@ select 'VEHICLE_UNIT' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, @@ -99,3 +102,20 @@ where ( and rel.GILT_BIS is null ) ) +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/vu-position.sql b/src/main/resources/sql/tachograph/vu-position.sql index 95153c8..a7ed53a 100644 --- a/src/main/resources/sql/tachograph/vu-position.sql +++ b/src/main/resources/sql/tachograph/vu-position.sql @@ -42,6 +42,8 @@ Base as ( where (:occurredFrom is null or pos.Timestamp >= :occurredFrom) and (:occurredTo is null or pos.Timestamp < :occurredTo) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:VU_POSITION:', base.ID) as external_source_event_id, @@ -64,6 +66,7 @@ select 'VEHICLE_UNIT' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, @@ -92,3 +95,20 @@ where ( and rel.GILT_BIS is null ) ) +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +) diff --git a/src/main/resources/sql/tachograph/vu-specific-condition.sql b/src/main/resources/sql/tachograph/vu-specific-condition.sql index bfe22ae..5935f26 100644 --- a/src/main/resources/sql/tachograph/vu-specific-condition.sql +++ b/src/main/resources/sql/tachograph/vu-specific-condition.sql @@ -32,6 +32,8 @@ Base as ( where (:occurredFrom is null or cond.EntryTime >= :occurredFrom) and (:occurredTo is null or cond.EntryTime < :occurredTo) ) +, +Extracted as ( select cast(base.ID as varchar(128)) as source_row_id, concat('TACHOGRAPH:VU_SPECIFIC_CONDITION:', base.ID) as external_source_event_id, @@ -54,6 +56,7 @@ select 'VEHICLE_UNIT' as source_package_kind, cast(base.source_package_id_raw as varchar(128)) as source_package_id, + base.source_package_id_raw as source_package_sort_id, cast(base.source_package_entity_id_raw as varchar(128)) as source_package_entity_id, base.source_package_period_from, base.source_package_period_to, @@ -82,3 +85,20 @@ where ( and rel.GILT_BIS is null ) ) +) +select * +from Extracted extracted +where ( + :sourcePackageWatermarkEnabled = 0 + or :lastSourcePackageImportedAt is null + or extracted.source_package_imported_at is null + or extracted.source_package_imported_at > :lastSourcePackageImportedAt + or ( + extracted.source_package_imported_at = :lastSourcePackageImportedAt + and ( + :lastSourcePackageIdNumeric is null + or extracted.source_package_sort_id is null + or extracted.source_package_sort_id > :lastSourcePackageIdNumeric + ) + ) +)