Add direct JDBC batch ingestion for YellowFox

This commit is contained in:
trifonovt 2026-05-11 11:26:59 +02:00
parent e84dfef614
commit 94e1227ab3
6 changed files with 305 additions and 15 deletions

View File

@ -354,6 +354,9 @@ public class EventHubProperties {
/** Overlap used by incremental utc/eventid cursor imports to catch late rows and ignition transitions. */
private Duration occurredAtOverlap = Duration.ofHours(2);
/**
 * How JDBC extraction batches are handed over to the ingest pipeline.
 * Defaults to {@code SYNC_DIRECT}; the setter restores that default when it is given {@code null}.
 */
private JdbcExtractionIngestMode jdbcExtractionIngestMode = JdbcExtractionIngestMode.SYNC_DIRECT;
/** Regular scheduler scan interval; each configured plan still uses its own cron. */
private Duration schedulerPollInterval = Duration.ofMinutes(1);
@ -392,6 +395,16 @@ public class EventHubProperties {
}
}
/** Returns the mode used to hand JDBC extraction batches over to the ingest pipeline. */
public JdbcExtractionIngestMode getJdbcExtractionIngestMode() {
    return this.jdbcExtractionIngestMode;
}
/**
 * Sets the mode used to hand JDBC extraction batches over to the ingest pipeline.
 *
 * @param jdbcExtractionIngestMode the requested mode; {@code null} selects {@code SYNC_DIRECT}
 */
public void setJdbcExtractionIngestMode(JdbcExtractionIngestMode jdbcExtractionIngestMode) {
    if (jdbcExtractionIngestMode == null) {
        this.jdbcExtractionIngestMode = JdbcExtractionIngestMode.SYNC_DIRECT;
        return;
    }
    this.jdbcExtractionIngestMode = jdbcExtractionIngestMode;
}
/** Returns the regular scheduler scan interval. */
public Duration getSchedulerPollInterval() {
    return this.schedulerPollInterval;
}

View File

@ -2,7 +2,12 @@ package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.AcquisitionStrategy;
import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubEventBatchDto;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventHubPackageResult;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.ImportCursorStateDto;
import at.procon.eventhub.dto.ImportScopeDto;
@ -10,14 +15,17 @@ import at.procon.eventhub.dto.SourceGroupType;
import at.procon.eventhub.importing.ImportPlanItemDto;
import at.procon.eventhub.importing.ImportTimeChunkDto;
import at.procon.eventhub.importing.persistence.ImportCursorRepository;
import at.procon.eventhub.service.EventHubIngestionService;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8BookingDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ExtractionBatchResultDto;
import at.procon.eventhub.yellowfox.dto.YellowFoxD8ImportRequest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.camel.ProducerTemplate;
@ -40,6 +48,7 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
private static final int PROGRESS_LOG_INTERVAL = 5000;
private final NamedParameterJdbcTemplate jdbcTemplate;
private final EventHubIngestionService ingestionService;
private final ProducerTemplate producerTemplate;
private final ResourceLoader resourceLoader;
private final ImportCursorRepository importCursorRepository;
@ -50,6 +59,7 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
public JdbcYellowFoxD8BookingExtractionBatchExecutor(
@Qualifier("yellowFoxNamedParameterJdbcTemplate") NamedParameterJdbcTemplate jdbcTemplate,
EventHubIngestionService ingestionService,
ProducerTemplate producerTemplate,
ResourceLoader resourceLoader,
ImportCursorRepository importCursorRepository,
@ -59,6 +69,7 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
YellowFoxD8IgnitionTransitionDetector ignitionTransitionDetector
) {
this.jdbcTemplate = jdbcTemplate;
this.ingestionService = ingestionService;
this.producerTemplate = producerTemplate;
this.resourceLoader = resourceLoader;
this.importCursorRepository = importCursorRepository;
@ -84,12 +95,15 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
ImportScopeDto chunkScope = chunkScope(request.importScope(), chunk);
ImportCursorStateDto cursor = findCursor(eventSourceId, request, planItem);
QuerySpec query = buildQuerySpec(request, chunkScope, cursor);
EventHubPackageRequest packageInfo = packageInfo(importRunId, request, planItem, chunk, chunkScope);
Stats stats = new Stats();
List<EventHubEventDto> pendingEvents = new ArrayList<>(jdbcPersistBatchSize());
YellowFoxD8IgnitionTransitionDetector.Session ignitionSession = ignitionTransitionDetector
.newSession(properties.getYellowFox().isEmitInitialIgnitionSnapshot());
log.info("Reading YellowFox D8 bookings tenant={} importRunId={} packageId={} chunk={} occurredFrom={} occurredTo={} fleetId={} strategy={}",
request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), query.fleetId(), request.acquisitionStrategy());
log.info("Reading YellowFox D8 bookings tenant={} importRunId={} packageId={} chunk={} occurredFrom={} occurredTo={} fleetId={} strategy={} ingestMode={}",
request.tenantKey(), importRunId, packageId, chunk.sequence(), chunk.occurredFrom(), chunk.occurredTo(), query.fleetId(),
request.acquisitionStrategy(), jdbcExtractionIngestMode());
jdbcTemplate.query(query.sql(), query.params(), rs -> {
stats.sourceRowsRead++;
@ -105,27 +119,33 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
return;
}
EventHubEventDto primaryEvent = eventMapper.map(booking);
send(primaryEvent, stats);
pendingEvents.add(primaryEvent);
stats.acceptEvent(primaryEvent);
EventHubEventDto ignitionEvent = ignitionSession.detect(booking);
if (ignitionEvent != null) {
send(ignitionEvent, stats);
pendingEvents.add(ignitionEvent);
stats.acceptEvent(ignitionEvent);
}
if (pendingEvents.size() >= jdbcPersistBatchSize()) {
flushPersistBatch(request, importRunId, packageId, planItem, chunk, packageInfo, pendingEvents, stats);
}
if (stats.sourceRowsRead % PROGRESS_LOG_INTERVAL == 0) {
log.info("YellowFox D8 extraction progress tenant={} importRunId={} packageId={} rows={} events={} byType={}",
request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsSent, stats.eventTypeCounts);
log.info("YellowFox D8 extraction progress tenant={} importRunId={} packageId={} rows={} events={} inserted={} byType={}",
request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsMapped, stats.eventsInserted, stats.eventTypeCounts);
}
});
flushPersistBatch(request, importRunId, packageId, planItem, chunk, packageInfo, pendingEvents, stats);
log.info("Finished YellowFox D8 extraction tenant={} importRunId={} packageId={} rows={} events={} skippedRows={} byType={}",
request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsSent, stats.skippedRows, stats.eventTypeCounts);
log.info("Finished YellowFox D8 extraction tenant={} importRunId={} packageId={} rows={} events={} inserted={} skippedRows={} byType={}",
request.tenantKey(), importRunId, packageId, stats.sourceRowsRead, stats.eventsMapped, stats.eventsInserted, stats.skippedRows, stats.eventTypeCounts);
return new YellowFoxD8ExtractionBatchResultDto(
packageId,
planItem.extractionCode(),
planItem.sourceKind(),
stats.sourceRowsRead,
stats.eventsSent,
stats.eventsSent,
stats.eventsMapped,
stats.eventsInserted,
stats.skippedRows,
true,
null,
@ -136,6 +156,14 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
);
}
/** Reads the configured JDBC extraction hand-over mode from the YellowFox properties. */
EventHubProperties.JdbcExtractionIngestMode jdbcExtractionIngestMode() {
    var yellowFoxProperties = properties.getYellowFox();
    return yellowFoxProperties.getJdbcExtractionIngestMode();
}
/** Returns the configured batch completion size, raised to 1 when the configured value is smaller. */
int jdbcPersistBatchSize() {
    int configuredSize = properties.getBatch().getCompletionSize();
    return configuredSize < 1 ? 1 : configuredSize;
}
QuerySpec buildQuerySpec(YellowFoxD8ImportRequest request, ImportScopeDto scope, ImportCursorStateDto cursor) {
Map<String, Object> params = new HashMap<>();
StringBuilder filters = new StringBuilder("where 1 = 1");
@ -211,9 +239,125 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
|| (booking.vehicleRef() != null && booking.vehicleRef().hasAnyReference());
}
private void send(EventHubEventDto event, Stats stats) {
producerTemplate.sendBody("direct:eventhub-normalized-input", event);
stats.acceptEvent(event);
/**
 * Persists the currently buffered events as one numbered batch and empties the buffer.
 * Does nothing when the buffer is empty.
 *
 * @param request the import request the batch belongs to
 * @param importRunId identifier of the running import
 * @param extractionPackageId identifier of the extraction package being read
 * @param planItem plan item that describes the extraction
 * @param chunk time chunk currently being extracted
 * @param packageInfo package descriptor shared by all batches of this chunk
 * @param pendingEvents mutable buffer of events; cleared before the batch is handed over
 * @param stats running statistics; supplies the batch number and receives the persist result
 */
private void flushPersistBatch(
    YellowFoxD8ImportRequest request,
    UUID importRunId,
    UUID extractionPackageId,
    ImportPlanItemDto planItem,
    ImportTimeChunkDto chunk,
    EventHubPackageRequest packageInfo,
    List<EventHubEventDto> pendingEvents,
    Stats stats
) {
    if (pendingEvents.isEmpty()) {
        return;
    }
    int batchNo = stats.nextPersistBatchNo();
    // Snapshot the buffer so the caller can keep filling it while this batch is immutable.
    List<EventHubEventDto> eventsToPersist = List.copyOf(pendingEvents);
    pendingEvents.clear();
    EventHubEventBatchDto batch = new EventHubEventBatchDto(
        packageInfo.externalPackageId() + ":JDBC-" + batchNo,
        packageInfo,
        DataPackageType.DB_EXTRACT,
        occurredFrom(eventsToPersist, chunk),
        occurredTo(eventsToPersist, chunk),
        eventsToPersist,
        persistBatchMetadata(request, importRunId, extractionPackageId, planItem, chunk, batchNo, eventsToPersist)
    );
    EventHubPackageResult result = persistBatch(batch);
    stats.acceptPersistResult(result);
    if (result == null) {
        // The statistics already tolerate a missing result; do the same here instead of
        // failing with a NullPointerException after the batch has been handed over.
        log.warn("YellowFox extraction batch returned no persist result tenant={} importRunId={} extractionPackageId={} extractionCode={} sourceKind={} chunk={} batchNo={} sent={}",
            request.tenantKey(), importRunId, extractionPackageId, planItem.extractionCode(), planItem.sourceKind(),
            chunk.sequence(), batchNo, eventsToPersist.size());
        return;
    }
    log.info("Persisted YellowFox extraction batch tenant={} importRunId={} extractionPackageId={} extractionCode={} sourceKind={} chunk={} batchNo={} received={} inserted={}",
        request.tenantKey(), importRunId, extractionPackageId, planItem.extractionCode(), planItem.sourceKind(),
        chunk.sequence(), batchNo, result.receivedCount(), result.insertedCount());
}
/**
 * Hands one batch over for persistence using the configured ingest mode.
 * In {@code CAMEL_ROUTE} mode the batch is sent as a request to
 * {@code direct:eventhub-batch-persist-input}; in every other mode the ingestion service is called directly.
 *
 * @param batch the batch to persist
 * @return the result reported by the chosen hand-over path
 */
EventHubPackageResult persistBatch(EventHubEventBatchDto batch) {
    boolean viaCamelRoute =
        EventHubProperties.JdbcExtractionIngestMode.CAMEL_ROUTE == jdbcExtractionIngestMode();
    return viaCamelRoute
        ? producerTemplate.requestBody("direct:eventhub-batch-persist-input", batch, EventHubPackageResult.class)
        : ingestionService.ingest(batch);
}
/**
 * Builds the metadata map attached to one persisted batch.
 * Entries keep insertion order; the chunk bounds are stored as strings, or as {@code null} when absent.
 *
 * @return a new mutable map describing the run, plan item, chunk and batch
 */
private Map<String, Object> persistBatchMetadata(
    YellowFoxD8ImportRequest request,
    UUID importRunId,
    UUID extractionPackageId,
    ImportPlanItemDto planItem,
    ImportTimeChunkDto chunk,
    int batchNo,
    List<EventHubEventDto> events
) {
    Map<String, Object> details = new LinkedHashMap<>();

    // Run and request identity.
    details.put("ingestMode", jdbcExtractionIngestMode().name());
    details.put("importRunId", importRunId.toString());
    details.put("extractionPackageId", extractionPackageId.toString());
    details.put("tenantKey", request.tenantKey());
    details.put("mode", request.mode().name());
    details.put("acquisitionStrategy", request.acquisitionStrategy().name());

    // Plan item.
    details.put("eventFamily", planItem.eventFamily().name());
    details.put("sourceKind", planItem.sourceKind());
    details.put("extractionCode", planItem.extractionCode());
    details.put("entityAxis", planItem.entityAxis());

    // Chunk window.
    details.put("chunkSequence", chunk.sequence());
    OffsetDateTime chunkStart = chunk.occurredFrom();
    details.put("chunkOccurredFrom", chunkStart != null ? chunkStart.toString() : null);
    OffsetDateTime chunkEnd = chunk.occurredTo();
    details.put("chunkOccurredTo", chunkEnd != null ? chunkEnd.toString() : null);

    // Batch facts and fixed policy notes.
    details.put("batchNo", batchNo);
    details.put("receivedEventCount", events.size());
    details.put("slotPolicy", "eventtype 0/2 = DRIVER, eventtype 1/3 = CO_DRIVER");
    details.put("ignitionPolicy", "Store ignition state on every D8 detail; emit separate ignition event only on state change.");
    return details;
}
/**
 * Determines the lower time bound of a batch.
 * The chunk's own start wins when present; otherwise the earliest non-null event timestamp is used.
 *
 * @return the lower bound, or {@code null} when neither the chunk nor any event provides one
 */
private OffsetDateTime occurredFrom(List<EventHubEventDto> events, ImportTimeChunkDto chunk) {
    OffsetDateTime chunkStart = chunk.occurredFrom();
    if (chunkStart != null) {
        return chunkStart;
    }
    OffsetDateTime earliest = null;
    for (EventHubEventDto event : events) {
        OffsetDateTime occurredAt = event.occurredAt();
        if (occurredAt == null) {
            continue;
        }
        if (earliest == null || occurredAt.compareTo(earliest) < 0) {
            earliest = occurredAt;
        }
    }
    return earliest;
}
/**
 * Determines the upper time bound of a batch.
 * The chunk's own end wins when present; otherwise the latest non-null event timestamp is used.
 *
 * @return the upper bound, or {@code null} when neither the chunk nor any event provides one
 */
private OffsetDateTime occurredTo(List<EventHubEventDto> events, ImportTimeChunkDto chunk) {
    OffsetDateTime chunkEnd = chunk.occurredTo();
    if (chunkEnd != null) {
        return chunkEnd;
    }
    OffsetDateTime latest = null;
    for (EventHubEventDto event : events) {
        OffsetDateTime occurredAt = event.occurredAt();
        if (occurredAt == null) {
            continue;
        }
        if (latest == null || occurredAt.compareTo(latest) > 0) {
            latest = occurredAt;
        }
    }
    return latest;
}
/**
 * Builds the package descriptor shared by every batch of one extraction chunk.
 * The external package id combines source kind, extraction code, import run id and chunk sequence.
 *
 * @return a new package request for the given chunk
 */
private EventHubPackageRequest packageInfo(
    UUID importRunId,
    YellowFoxD8ImportRequest request,
    ImportPlanItemDto planItem,
    ImportTimeChunkDto chunk,
    ImportScopeDto chunkScope
) {
    var tenantKey = request.tenantKey();
    var eventSource = eventSourceFor(request, planItem);
    var sourceGroup = request.sourceGroup();
    String eventFamilyName = planItem.eventFamily().name();

    // The package date is taken from the chunk start when the chunk has one.
    OffsetDateTime chunkStart = chunk.occurredFrom();
    var chunkStartDate = chunkStart == null ? null : chunkStart.toLocalDate();

    String externalPackageId = "YELLOWFOX_D8:" + planItem.sourceKind()
        + ":" + planItem.extractionCode()
        + ":RUN-" + importRunId
        + ":CHUNK-" + chunk.sequence();

    return new EventHubPackageRequest(
        tenantKey,
        eventSource,
        sourceGroup,
        chunkScope,
        eventFamilyName,
        chunkStartDate,
        externalPackageId
    );
}
/**
 * Creates the event source descriptor for a plan item.
 * Provider and source system are fixed to YellowFox values; the source kind comes from the plan item and
 * the remaining keys are copied from the request's event source.
 */
private EventSourceDto eventSourceFor(YellowFoxD8ImportRequest request, ImportPlanItemDto planItem) {
    var sourceKind = planItem.sourceKind();
    var requestedSource = request.eventSource();
    return new EventSourceDto(
        "YELLOWFOX",
        sourceKind,
        "YELLOWFOX_D8",
        requestedSource.sourceInstanceKey(),
        requestedSource.tenantProviderSettingKey(),
        requestedSource.externalFleetKey()
    );
}
private void appendCursorFilter(
@ -282,11 +426,13 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
private static class Stats {
private int sourceRowsRead;
private int eventsSent;
private int eventsMapped;
private int eventsInserted;
private int skippedRows;
private OffsetDateTime lastOccurredAt;
private String lastEventId;
private final Map<String, Integer> eventTypeCounts = new LinkedHashMap<>();
private int persistBatchNo;
private void acceptSourceRow(YellowFoxD8BookingDto booking) {
if (booking.occurredAt() != null) {
@ -296,10 +442,21 @@ public class JdbcYellowFoxD8BookingExtractionBatchExecutor implements YellowFoxD
}
/**
 * Records one mapped event in the extraction statistics.
 * Bumps the event counters and the per-type tally, keyed as {@code <eventDomain>/<eventType>}.
 *
 * @param event the event that was mapped from a source row
 */
private void acceptEvent(EventHubEventDto event) {
// NOTE(review): both counters are incremented in this view; the text comes from a rendered diff,
// so eventsSent++ is presumably the removed line and eventsMapped++ its replacement - confirm.
eventsSent++;
eventsMapped++;
EventType type = event.eventType();
// A missing event type is tallied under "UNKNOWN"; eventDomain() is dereferenced without a null check.
String key = event.eventDomain().name() + "/" + (type == null ? "UNKNOWN" : type.name());
eventTypeCounts.merge(key, 1, Integer::sum);
}
/** Advances the persist batch counter and returns the new value. */
private int nextPersistBatchNo() {
    return ++persistBatchNo;
}
/**
 * Adds the inserted count of a persist result to the running total.
 * A {@code null} result is ignored.
 */
private void acceptPersistResult(EventHubPackageResult result) {
    if (result == null) {
        return;
    }
    eventsInserted += result.insertedCount();
}
}
}

View File

@ -138,6 +138,11 @@ eventhub:
emit-initial-ignition-snapshot: false
sync-vehicle-registrations-on-master-data-update: false
# JDBC extraction handoff mode:
# SYNC_DIRECT = persist controlled JDBC batches directly, cursor-safe default.
# CAMEL_ROUTE = persist the same controlled batches through direct:eventhub-batch-persist-input.
jdbc-extraction-ingest-mode: ${YELLOWFOX_JDBC_EXTRACTION_INGEST_MODE:SYNC_DIRECT}
datasource:
jdbc-url: ${YELLOWFOX_DB_JDBC_URL:}
username: ${YELLOWFOX_DB_USERNAME:}

View File

@ -76,6 +76,7 @@ class JdbcYellowFoxD8BookingExtractionBatchExecutorCursorTest {
private JdbcYellowFoxD8BookingExtractionBatchExecutor executor(ImportCursorRepository repository) {
EventHubProperties properties = new EventHubProperties();
return new JdbcYellowFoxD8BookingExtractionBatchExecutor(
null,
null,
null,
new DefaultResourceLoader(),

View File

@ -0,0 +1,113 @@
package at.procon.eventhub.yellowfox.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.DataPackageType;
import at.procon.eventhub.dto.EventHubEventBatchDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventHubPackageResult;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.service.EventHubIngestionService;
import java.time.OffsetDateTime;
import java.util.List;
import org.apache.camel.ProducerTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.DefaultResourceLoader;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
class JdbcYellowFoxD8BookingExtractionBatchExecutorIngestModeTest {
@Test
void usesDirectIngestionServiceInSyncDirectMode() {
EventHubIngestionService ingestionService = mock(EventHubIngestionService.class);
ProducerTemplate producerTemplate = mock(ProducerTemplate.class);
JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(
EventHubProperties.JdbcExtractionIngestMode.SYNC_DIRECT,
ingestionService,
producerTemplate
);
EventHubEventBatchDto batch = batch();
EventHubPackageResult result = new EventHubPackageResult(null, batch.packageKey(), 2, 2);
when(ingestionService.ingest(batch)).thenReturn(result);
assertThat(executor.persistBatch(batch)).isEqualTo(result);
verify(ingestionService).ingest(batch);
}
@Test
void usesCamelBatchPersistRouteInCamelMode() {
EventHubIngestionService ingestionService = mock(EventHubIngestionService.class);
ProducerTemplate producerTemplate = mock(ProducerTemplate.class);
JdbcYellowFoxD8BookingExtractionBatchExecutor executor = executor(
EventHubProperties.JdbcExtractionIngestMode.CAMEL_ROUTE,
ingestionService,
producerTemplate
);
EventHubEventBatchDto batch = batch();
EventHubPackageResult result = new EventHubPackageResult(null, batch.packageKey(), 2, 2);
when(producerTemplate.requestBody(
eq("direct:eventhub-batch-persist-input"),
eq(batch),
eq(EventHubPackageResult.class)
)).thenReturn(result);
assertThat(executor.persistBatch(batch)).isEqualTo(result);
verify(producerTemplate).requestBody(
"direct:eventhub-batch-persist-input",
batch,
EventHubPackageResult.class
);
}
private JdbcYellowFoxD8BookingExtractionBatchExecutor executor(
EventHubProperties.JdbcExtractionIngestMode ingestMode,
EventHubIngestionService ingestionService,
ProducerTemplate producerTemplate
) {
EventHubProperties properties = new EventHubProperties();
properties.getYellowFox().setJdbcExtractionIngestMode(ingestMode);
return new JdbcYellowFoxD8BookingExtractionBatchExecutor(
null,
ingestionService,
producerTemplate,
new DefaultResourceLoader(),
null,
properties,
null,
null,
null
);
}
private EventHubEventBatchDto batch() {
return new EventHubEventBatchDto(
"pkg-1",
new EventHubPackageRequest(
"tenant-1",
new EventSourceDto("YELLOWFOX", "TELEMATICS_PLATFORM", "YELLOWFOX_D8", "instance-1", "setting-1", "7"),
null,
ImportScopeDto.tenantAll(
OffsetDateTime.parse("2026-04-01T00:00:00+02:00"),
OffsetDateTime.parse("2026-04-02T00:00:00+02:00")
),
"DRIVER_ACTIVITY",
null,
"external-1"
),
DataPackageType.DB_EXTRACT,
null,
null,
List.of(),
null
);
}
}

View File

@ -84,6 +84,7 @@ class JdbcYellowFoxD8BookingExtractionBatchExecutorTest {
EventHubProperties properties = new EventHubProperties();
properties.getYellowFox().setOccurredAtOverlap(overlap);
return new JdbcYellowFoxD8BookingExtractionBatchExecutor(
null,
null,
null,
new DefaultResourceLoader(),