From 94e1227ab3184b3a1c92ab892e431ce9372ad34d Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 11 May 2026 11:26:59 +0200 Subject: [PATCH] Add direct JDBC batch ingestion for YellowFox --- .../eventhub/config/EventHubProperties.java | 13 ++ ...owFoxD8BookingExtractionBatchExecutor.java | 187 ++++++++++++++++-- src/main/resources/application.yml | 5 + ...kingExtractionBatchExecutorCursorTest.java | 1 + ...ExtractionBatchExecutorIngestModeTest.java | 113 +++++++++++ ...xD8BookingExtractionBatchExecutorTest.java | 1 + 6 files changed, 305 insertions(+), 15 deletions(-) create mode 100644 src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorIngestModeTest.java diff --git a/src/main/java/at/procon/eventhub/config/EventHubProperties.java b/src/main/java/at/procon/eventhub/config/EventHubProperties.java index 290754c..85092b0 100644 --- a/src/main/java/at/procon/eventhub/config/EventHubProperties.java +++ b/src/main/java/at/procon/eventhub/config/EventHubProperties.java @@ -354,6 +354,9 @@ public class EventHubProperties { /** Overlap used by incremental utc/eventid cursor imports to catch late rows and ignition transitions. */ private Duration occurredAtOverlap = Duration.ofHours(2); + /** How JDBC extraction batches are handed over to the ingest pipeline. */ + private JdbcExtractionIngestMode jdbcExtractionIngestMode = JdbcExtractionIngestMode.SYNC_DIRECT; + /** Regular scheduler scan interval; each configured plan still uses its own cron. */ private Duration schedulerPollInterval = Duration.ofMinutes(1); @@ -392,6 +395,16 @@ public class EventHubProperties { } } + public JdbcExtractionIngestMode getJdbcExtractionIngestMode() { + return jdbcExtractionIngestMode; + } + + public void setJdbcExtractionIngestMode(JdbcExtractionIngestMode jdbcExtractionIngestMode) { + this.jdbcExtractionIngestMode = jdbcExtractionIngestMode == null + ? JdbcExtractionIngestMode.SYNC_DIRECT + : jdbcExtractionIngestMode; + } + public Duration getSchedulerPollInterval() { return schedulerPollInterval; } diff --git a/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java index ee34f31..72317c1 100644 --- a/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutor.java @@ -2,7 +2,12 @@ package at.procon.eventhub.yellowfox.service; import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.dto.AcquisitionStrategy; +import at.procon.eventhub.dto.DataPackageType; +import at.procon.eventhub.dto.EventHubEventBatchDto; import at.procon.eventhub.dto.EventHubEventDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventHubPackageResult; +import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.EventType; import at.procon.eventhub.dto.ImportCursorStateDto; import at.procon.eventhub.dto.ImportScopeDto; @@ -10,14 +15,17 @@ import at.procon.eventhub.dto.SourceGroupType; import at.procon.eventhub.importing.ImportPlanItemDto; import at.procon.eventhub.importing.ImportTimeChunkDto; import at.procon.eventhub.importing.persistence.ImportCursorRepository; +import at.procon.eventhub.service.EventHubIngestionService; import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto; import at.procon.eventhub.yellowfox.dto.YellowFoxD8ExtractionBatchResultDto; import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.camel.ProducerTemplate; @@ -40,6 +48,7 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD private static final int PROGRESS_LOG_INTERVAL = 5000; private final NamedParameterJdbcTemplate jdbcTemplate; + private final EventHubIngestionService ingestionService; private final ProducerTemplate producerTemplate; private final ResourceLoader resourceLoader; private final ImportCursorRepository importCursorRepository; @@ -50,6 +59,7 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD public JdbcYellowFoxD8BookingExtractionBatchExecutor( @Qualifier("yellowFoxNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate, + EventHubIngestionService ingestionService, ProducerTemplate producerTemplate, ResourceLoader resourceLoader, ImportCursorRepository importCursorRepository, @@ -59,6 +69,7 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector ) { this.jdbcTemplate = jdbcTemplate; + this.ingestionService = ingestionService; this.producerTemplate = producerTemplate; this.resourceLoader = resourceLoader; this.importCursorRepository = importCursorRepository; @@ -84,12 +95,15 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk); ImportCursorStateDto cursor = findCursor(eventSourceId, request, planItem); QuerySpec query = buildQuerySpec(request, chunkScope, cursor); + EventHubPackageRequest packageInfo = packageInfo(importRunId, request, planItem, chunk, chunkScope); Stats stats = new Stats(); + List pendingEvents = new ArrayList<>(jdbcPersistBatchSize()); YellowFoxD8IgnitionTransitionDetector.Session ignitionSession = ignitionTransitionDetector .newSession(properties.getYellowFox().isEmitInitialIgnitionSnapshot()); - log.info("Reading YellowFox D8 bookings tenant={} importRunId={} packageId={} chunk={} occurredFrom={} occurredTo={} fleetId={} strategy={}", - request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), query.fleetId(), request.acquisitionStrategy()); + log.info("Reading YellowFox D8 bookings tenant={} importRunId={} packageId={} chunk={} occurredFrom={} occurredTo={} fleetId={} strategy={} ingestMode={}", + request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), query.fleetId(), + request.acquisitionStrategy(), jdbcExtractionIngestMode()); jdbcTemplate.query(query.sql(), query.params(), rs -> { stats.sourceRowsRead++; @@ -105,27 +119,33 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD return; } EventHubEventDto primaryEvent = eventMapper.map(booking); - send(primaryEvent, stats); + pendingEvents.add(primaryEvent); + stats.acceptEvent(primaryEvent); EventHubEventDto ignitionEvent = ignitionSession.detect(booking); if (ignitionEvent != null) { - send(ignitionEvent, stats); + pendingEvents.add(ignitionEvent); + stats.acceptEvent(ignitionEvent); + } + if (pendingEvents.size() >= jdbcPersistBatchSize()) { + flushPersistBatch(request, importRunId, packageId, planItem, chunk, packageInfo, pendingEvents, stats); } if (stats.sourceRowsRead % PROGRESS_LOG_INTERVAL == 0) { - log.info("YellowFox D8 extraction progress tenant={} importRunId={} packageId={} rows={} events={} byType={}", - request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsSent, stats.eventTypeCounts); + log.info("YellowFox D8 extraction progress tenant={} importRunId={} packageId={} rows={} events={} inserted={} byType={}", + request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsMapped, stats.eventsInserted, stats.eventTypeCounts); } }); + flushPersistBatch(request, importRunId, packageId, planItem, chunk, packageInfo, pendingEvents, stats); - log.info("Finished YellowFox D8 extraction tenant={} importRunId={} packageId={} rows={} events={} skippedRows={} byType={}", - request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsSent, stats.skippedRows, stats.eventTypeCounts); + log.info("Finished YellowFox D8 extraction tenant={} importRunId={} packageId={} rows={} events={} inserted={} skippedRows={} byType={}", + request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsMapped, stats.eventsInserted, stats.skippedRows, stats.eventTypeCounts); return new YellowFoxD8ExtractionBatchResultDto( packageId, planItem.extractionCode(), planItem.sourceKind(), stats.sourceRowsRead, - stats.eventsSent, - stats.eventsSent, + stats.eventsMapped, + stats.eventsInserted, stats.skippedRows, true, null, @@ -136,6 +156,14 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD ); } + EventHubProperties.JdbcExtractionIngestMode jdbcExtractionIngestMode() { + return properties.getYellowFox().getJdbcExtractionIngestMode(); + } + + int jdbcPersistBatchSize() { + return Math.max(1, properties.getBatch().getCompletionSize()); + } + QuerySpec buildQuerySpec(YellowFoxD8ImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) { Map params = new HashMap<>(); StringBuilder filters = new StringBuilder("where 1 = 1"); @@ -211,9 +239,125 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD || (booking.vehicleRef() != null && booking.vehicleRef().hasAnyReference()); } - private void send(EventHubEventDto event, Stats stats) { - producerTemplate.sendBody("direct:eventhub-normalized-input", event); - stats.acceptEvent(event); + private void flushPersistBatch( + YellowFoxD8ImportRequest request, + UUID importRunId, + UUID extractionPackageId, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk, + EventHubPackageRequest packageInfo, + List pendingEvents, + Stats stats + ) { + if (pendingEvents.isEmpty()) { + return; + } + int batchNo = stats.nextPersistBatchNo(); + List eventsToPersist = List.copyOf(pendingEvents); + pendingEvents.clear(); + + EventHubEventBatchDto batch = new EventHubEventBatchDto( + packageInfo.externalPackageId() + ":JDBC-" + batchNo, + packageInfo, + DataPackageType.DB_EXTRACT, + occurredFrom(eventsToPersist, chunk), + occurredTo(eventsToPersist, chunk), + eventsToPersist, + persistBatchMetadata(request, importRunId, extractionPackageId, planItem, chunk, batchNo, eventsToPersist) + ); + EventHubPackageResult result = persistBatch(batch); + stats.acceptPersistResult(result); + log.info("Persisted YellowFox extraction batch tenant={} importRunId={} extractionPackageId={} extractionCode={} sourceKind={} chunk={} batchNo={} received={} inserted={}", + request.tenantKey(), importRunId, extractionPackageId, planItem.extractionCode(), planItem.sourceKind(), + chunk.sequence(), batchNo, result.receivedCount(), result.insertedCount()); + } + + EventHubPackageResult persistBatch(EventHubEventBatchDto batch) { + if (jdbcExtractionIngestMode() == EventHubProperties.JdbcExtractionIngestMode.CAMEL_ROUTE) { + return producerTemplate.requestBody("direct:eventhub-batch-persist-input", batch, EventHubPackageResult.class); + } + return ingestionService.ingest(batch); + } + + private Map persistBatchMetadata( + YellowFoxD8ImportRequest request, + UUID importRunId, + UUID extractionPackageId, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk, + int batchNo, + List events + ) { + Map metadata = new LinkedHashMap<>(); + metadata.put("ingestMode", jdbcExtractionIngestMode().name()); + metadata.put("importRunId", importRunId.toString()); + metadata.put("extractionPackageId", extractionPackageId.toString()); + metadata.put("tenantKey", request.tenantKey()); + metadata.put("mode", request.mode().name()); + metadata.put("acquisitionStrategy", request.acquisitionStrategy().name()); + metadata.put("eventFamily", planItem.eventFamily().name()); + metadata.put("sourceKind", planItem.sourceKind()); + metadata.put("extractionCode", planItem.extractionCode()); + metadata.put("entityAxis", planItem.entityAxis()); + metadata.put("chunkSequence", chunk.sequence()); + metadata.put("chunkOccurredFrom", chunk.occurredFrom() == null ? null : chunk.occurredFrom().toString()); + metadata.put("chunkOccurredTo", chunk.occurredTo() == null ? null : chunk.occurredTo().toString()); + metadata.put("batchNo", batchNo); + metadata.put("receivedEventCount", events.size()); + metadata.put("slotPolicy", "eventtype 0/2 = DRIVER, eventtype 1/3 = CO_DRIVER"); + metadata.put("ignitionPolicy", "Store ignition state on every D8 detail; emit separate ignition event only on state change."); + return metadata; + } + + private OffsetDateTime occurredFrom(List events, ImportTimeChunkDto chunk) { + if (chunk.occurredFrom() != null) { + return chunk.occurredFrom(); + } + return events.stream() + .map(EventHubEventDto::occurredAt) + .filter(value -> value != null) + .min(OffsetDateTime::compareTo) + .orElse(null); + } + + private OffsetDateTime occurredTo(List events, ImportTimeChunkDto chunk) { + if (chunk.occurredTo() != null) { + return chunk.occurredTo(); + } + return events.stream() + .map(EventHubEventDto::occurredAt) + .filter(value -> value != null) + .max(OffsetDateTime::compareTo) + .orElse(null); + } + + private EventHubPackageRequest packageInfo( + UUID importRunId, + YellowFoxD8ImportRequest request, + ImportPlanItemDto planItem, + ImportTimeChunkDto chunk, + ImportScopeDto chunkScope + ) { + return new EventHubPackageRequest( + request.tenantKey(), + eventSourceFor(request, planItem), + request.sourceGroup(), + chunkScope, + planItem.eventFamily().name(), + chunk.occurredFrom() == null ? null : chunk.occurredFrom().toLocalDate(), + "YELLOWFOX_D8:" + planItem.sourceKind() + ":" + planItem.extractionCode() + ":RUN-" + importRunId + ":CHUNK-" + chunk.sequence() + ); + } + + private EventSourceDto eventSourceFor(YellowFoxD8ImportRequest request, ImportPlanItemDto planItem) { + return new EventSourceDto( + "YELLOWFOX", + planItem.sourceKind(), + "YELLOWFOX_D8", + request.eventSource().sourceInstanceKey(), + request.eventSource().tenantProviderSettingKey(), + request.eventSource().externalFleetKey() + ); } private void appendCursorFilter( @@ -282,11 +426,13 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD private static class Stats { private int sourceRowsRead; - private int eventsSent; + private int eventsMapped; + private int eventsInserted; private int skippedRows; private OffsetDateTime lastOccurredAt; private String lastEventId; private final Map eventTypeCounts = new LinkedHashMap<>(); + private int persistBatchNo; private void acceptSourceRow(YellowFoxD8BookingDto booking) { if (booking.occurredAt() != null) { @@ -296,10 +442,21 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD } private void acceptEvent(EventHubEventDto event) { - eventsSent++; + eventsMapped++; EventType type = event.eventType(); String key = event.eventDomain().name() + "/" + (type == null ? "UNKNOWN" : type.name()); eventTypeCounts.merge(key, 1, Integer::sum); } + + private int nextPersistBatchNo() { + persistBatchNo++; + return persistBatchNo; + } + + private void acceptPersistResult(EventHubPackageResult result) { + if (result != null) { + eventsInserted += result.insertedCount(); + } + } } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0a14871..cf514eb 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -138,6 +138,11 @@ eventhub: emit-initial-ignition-snapshot: false sync-vehicle-registrations-on-master-data-update: false + # JDBC extraction handoff mode: + # SYNC_DIRECT = persist controlled JDBC batches directly, cursor-safe default. + # CAMEL_ROUTE = persist the same controlled batches through direct:eventhub-batch-persist-input. + jdbc-extraction-ingest-mode: ${YELLOWFOX_JDBC_EXTRACTION_INGEST_MODE:SYNC_DIRECT} + datasource: jdbc-url: ${YELLOWFOX_DB_JDBC_URL:} username: ${YELLOWFOX_DB_USERNAME:} diff --git a/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest.java b/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest.java index 55c57b6..3c86441 100644 --- a/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest.java +++ b/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest.java @@ -76,6 +76,7 @@ class JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest { private JdbcYellowFoxD8BookingExtractionBatchExecutor executor(ImportCursorRepository repository) { EventHubProperties properties = new EventHubProperties(); return new JdbcYellowFoxD8BookingExtractionBatchExecutor( + null, null, null, new DefaultResourceLoader(), diff --git a/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorIngestModeTest.java b/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorIngestModeTest.java new file mode 100644 index 0000000..28cfc9d --- /dev/null +++ b/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorIngestModeTest.java @@ -0,0 +1,113 @@ +package at.procon.eventhub.yellowfox.service; + +import at.procon.eventhub.config.EventHubProperties; +import at.procon.eventhub.dto.DataPackageType; +import at.procon.eventhub.dto.EventHubEventBatchDto; +import at.procon.eventhub.dto.EventHubPackageRequest; +import at.procon.eventhub.dto.EventHubPackageResult; +import at.procon.eventhub.dto.EventSourceDto; +import at.procon.eventhub.dto.ImportScopeDto; +import at.procon.eventhub.service.EventHubIngestionService; +import java.time.OffsetDateTime; +import java.util.List; +import org.apache.camel.ProducerTemplate; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.DefaultResourceLoader; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class JdbcYellowFoxD8BookingExtractionBatchExecutorIngestModeTest { + + @Test + void usesDirectIngestionServiceInSyncDirectMode() { + EventHubIngestionService ingestionService = mock(EventHubIngestionService.class); + ProducerTemplate producerTemplate = mock(ProducerTemplate.class); + JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor( + EventHubProperties.JdbcExtractionIngestMode.SYNC_DIRECT, + ingestionService, + producerTemplate + ); + EventHubEventBatchDto batch = batch(); + EventHubPackageResult result = new EventHubPackageResult(null, batch.packageKey(), 2, 2); + + when(ingestionService.ingest(batch)).thenReturn(result); + + assertThat(executor.persistBatch(batch)).isEqualTo(result); + + verify(ingestionService).ingest(batch); + } + + @Test + void usesCamelBatchPersistRouteInCamelMode() { + EventHubIngestionService ingestionService = mock(EventHubIngestionService.class); + ProducerTemplate producerTemplate = mock(ProducerTemplate.class); + JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor( + EventHubProperties.JdbcExtractionIngestMode.CAMEL_ROUTE, + ingestionService, + producerTemplate + ); + EventHubEventBatchDto batch = batch(); + EventHubPackageResult result = new EventHubPackageResult(null, batch.packageKey(), 2, 2); + + when(producerTemplate.requestBody( + eq("direct:eventhub-batch-persist-input"), + eq(batch), + eq(EventHubPackageResult.class) + )).thenReturn(result); + + assertThat(executor.persistBatch(batch)).isEqualTo(result); + + verify(producerTemplate).requestBody( + "direct:eventhub-batch-persist-input", + batch, + EventHubPackageResult.class + ); + } + + private JdbcYellowFoxD8BookingExtractionBatchExecutor executor( + EventHubProperties.JdbcExtractionIngestMode ingestMode, + EventHubIngestionService ingestionService, + ProducerTemplate producerTemplate + ) { + EventHubProperties properties = new EventHubProperties(); + properties.getYellowFox().setJdbcExtractionIngestMode(ingestMode); + return new JdbcYellowFoxD8BookingExtractionBatchExecutor( + null, + ingestionService, + producerTemplate, + new DefaultResourceLoader(), + null, + properties, + null, + null, + null + ); + } + + private EventHubEventBatchDto batch() { + return new EventHubEventBatchDto( + "pkg-1", + new EventHubPackageRequest( + "tenant-1", + new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "instance-1", "setting-1", "7"), + null, + ImportScopeDto.tenantAll( + OffsetDateTime.parse("2026-04-01T00:00:00+02:00"), + OffsetDateTime.parse("2026-04-02T00:00:00+02:00") + ), + "DRIVER_ACTIVITY", + null, + "external-1" + ), + DataPackageType.DB_EXTRACT, + null, + null, + List.of(), + null + ); + } +} diff --git a/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorTest.java b/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorTest.java index 583db11..91c1a0a 100644 --- a/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorTest.java +++ b/src/test/java/at/procon/eventhub/yellowfox/service/JdbcYellowFoxD8BookingExtractionBatchExecutorTest.java @@ -84,6 +84,7 @@ class JdbcYellowFoxD8BookingExtractionBatchExecutorTest { EventHubProperties properties = new EventHubProperties(); properties.getYellowFox().setOccurredAtOverlap(overlap); return new JdbcYellowFoxD8BookingExtractionBatchExecutor( + null, null, null, new DefaultResourceLoader(),