diff --git a/src/main/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapter.java b/src/main/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapter.java index 77dc7b3..b1527df 100644 --- a/src/main/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapter.java +++ b/src/main/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapter.java @@ -3,6 +3,8 @@ package at.procon.dip.ingestion.adapter; import at.procon.dip.domain.access.DocumentAccessContext; import at.procon.dip.domain.document.CanonicalDocumentMetadata; import at.procon.dip.domain.document.SourceType; +import at.procon.dip.domain.document.service.DocumentService; +import at.procon.dip.ingestion.config.DipIngestionProperties; import at.procon.dip.ingestion.dto.ImportedDocumentResult; import at.procon.dip.ingestion.service.GenericDocumentImportService; import at.procon.dip.ingestion.service.TedPackageChildImportProcessor; @@ -11,20 +13,25 @@ import at.procon.dip.ingestion.spi.DocumentIngestionAdapter; import at.procon.dip.ingestion.spi.IngestionResult; import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy; import at.procon.dip.ingestion.spi.SourceDescriptor; -import at.procon.dip.ingestion.config.DipIngestionProperties; import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode; import at.procon.dip.runtime.config.RuntimeMode; import java.nio.file.Files; import java.nio.file.Path; import java.time.OffsetDateTime; import java.util.ArrayList; +import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -40,6 +47,7 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap private final GenericDocumentImportService importService; private final TedPackageExpansionService expansionService; private final TedPackageChildImportProcessor childImportProcessor; + private final DocumentService documentService; @Override public boolean supports(SourceDescriptor sourceDescriptor) { @@ -75,27 +83,105 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap List warnings = new ArrayList<>(packageDocument.warnings()); List documents = new ArrayList<>(); - documents.add(packageDocument.document().toCanonicalMetadata()); + documents.add(documentService.getMetadata(packageDocument.document().getId())); + List childResults = properties.getTedPackageChildParallelism() > 1 + ? processChildrenInParallel(packageDocument, sourceDescriptor) + : processChildrenSequentially(packageDocument, sourceDescriptor); + + childResults.stream() + .sorted(Comparator.comparingInt(OrderedChildImportResult::sortOrder)) + .forEach(orderedResult -> { + TedPackageChildImportProcessor.ChildImportResult result = orderedResult.result(); + if (result.childDocumentMetadata() != null) { + documents.add(result.childDocumentMetadata()); + } + if (StringUtils.hasText(result.warning())) { + warnings.add(result.warning()); + } + }); + + return new IngestionResult(documents, warnings); + } + + private List processChildrenSequentially(ImportedDocumentResult packageDocument, + SourceDescriptor sourceDescriptor) { AtomicInteger sortOrder = new AtomicInteger(); + List results = new ArrayList<>(); streamPackageEntries(sourceDescriptor, entry -> { + int order = sortOrder.incrementAndGet(); TedPackageChildImportProcessor.ChildImportResult result = childImportProcessor.processChild( packageDocument.document().getId(), sourceDescriptor.sourceIdentifier(), sourceDescriptor.receivedAt(), sourceDescriptor.accessContext(), entry, - sortOrder.incrementAndGet() + order ); - if (result.childDocument() != null) { - documents.add(result.childDocument().toCanonicalMetadata()); - } - if (StringUtils.hasText(result.warning())) { - warnings.add(result.warning()); - } + results.add(new OrderedChildImportResult(order, result)); }); + return results; + } - return new IngestionResult(documents, warnings); + private List processChildrenInParallel(ImportedDocumentResult packageDocument, + SourceDescriptor sourceDescriptor) { + int parallelism = Math.max(1, properties.getTedPackageChildParallelism()); + int maxInFlight = Math.max(parallelism, properties.getTedPackageChildMaxInFlight()); + log.info("Processing TED package {} with parallel child import (parallelism={}, maxInFlight={})", + sourceDescriptor.sourceIdentifier(), parallelism, maxInFlight); + + ExecutorService executor = Executors.newFixedThreadPool(parallelism, new TedPackageChildThreadFactory()); + ExecutorCompletionService completionService = new ExecutorCompletionService<>(executor); + List results = new ArrayList<>(); + AtomicInteger sortOrder = new AtomicInteger(); + AtomicInteger submitted = new AtomicInteger(); + AtomicInteger completed = new AtomicInteger(); + + try { + streamPackageEntries(sourceDescriptor, entry -> { + int order = sortOrder.incrementAndGet(); + completionService.submit(() -> new OrderedChildImportResult(order, childImportProcessor.processChild( + packageDocument.document().getId(), + sourceDescriptor.sourceIdentifier(), + sourceDescriptor.receivedAt(), + sourceDescriptor.accessContext(), + entry, + order + ))); + int currentSubmitted = submitted.incrementAndGet(); + while (currentSubmitted - completed.get() >= maxInFlight) { + results.add(awaitNextChildResult(completionService, executor)); + completed.incrementAndGet(); + } + }); + + while (completed.get() < submitted.get()) { + results.add(awaitNextChildResult(completionService, executor)); + completed.incrementAndGet(); + } + return results; + } finally { + executor.shutdownNow(); + } + } + + private OrderedChildImportResult awaitNextChildResult(ExecutorCompletionService completionService, + ExecutorService executor) { + try { + Future completedFuture = completionService.take(); + return completedFuture.get(); + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + throw new IllegalStateException("Interrupted while waiting for TED package child import completion", e); + } catch (ExecutionException e) { + executor.shutdownNow(); + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException runtimeException) { + throw runtimeException; + } + throw new IllegalStateException("TED package child import failed", cause); + } } private SourceDescriptor buildPackageRootSource(SourceDescriptor sourceDescriptor) { @@ -138,4 +224,18 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap return false; } } + + private record OrderedChildImportResult(int sortOrder, TedPackageChildImportProcessor.ChildImportResult result) { + } + + private static final class TedPackageChildThreadFactory implements ThreadFactory { + private final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "dip-ted-child-" + counter.incrementAndGet()); + thread.setDaemon(true); + return thread; + } + } } diff --git a/src/main/java/at/procon/dip/ingestion/config/DipIngestionProperties.java b/src/main/java/at/procon/dip/ingestion/config/DipIngestionProperties.java index e9e309f..c36a653 100644 --- a/src/main/java/at/procon/dip/ingestion/config/DipIngestionProperties.java +++ b/src/main/java/at/procon/dip/ingestion/config/DipIngestionProperties.java @@ -60,6 +60,21 @@ public class DipIngestionProperties { */ private boolean tedPackageDeferLexicalIndexing = true; + /** + * Number of worker threads used for TED package child imports. + * Set to 1 to keep child processing sequential. + */ + @Positive + private int tedPackageChildParallelism = 1; + + /** + * Maximum number of TED package child tasks allowed to be in flight (running or queued) + * while streaming a package. This provides bounded backpressure and avoids buffering the + * whole package in memory before processing starts. + */ + @Positive + private int tedPackageChildMaxInFlight = 8; + private boolean gatewayOnlyForTedPackages = false; @NotBlank diff --git a/src/main/java/at/procon/dip/ingestion/service/TedPackageChildImportProcessor.java b/src/main/java/at/procon/dip/ingestion/service/TedPackageChildImportProcessor.java index 788ba8e..de0d809 100644 --- a/src/main/java/at/procon/dip/ingestion/service/TedPackageChildImportProcessor.java +++ b/src/main/java/at/procon/dip/ingestion/service/TedPackageChildImportProcessor.java @@ -1,11 +1,13 @@ package at.procon.dip.ingestion.service; import at.procon.dip.domain.access.DocumentAccessContext; +import at.procon.dip.domain.document.CanonicalDocumentMetadata; import at.procon.dip.domain.document.RelationType; import at.procon.dip.domain.document.SourceType; import at.procon.dip.domain.document.entity.Document; import at.procon.dip.domain.document.service.DocumentRelationService; import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand; +import at.procon.dip.ingestion.config.DipIngestionProperties; import at.procon.dip.ingestion.dto.ImportedDocumentResult; import at.procon.dip.ingestion.service.TedPackageExpansionService.TedPackageEntry; import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy; @@ -78,21 +80,23 @@ public class TedPackageChildImportProcessor { entry.archivePath() )); + CanonicalDocumentMetadata childMetadata = childDocument.toCanonicalMetadata(); + if (childResult.deduplicated()) { - return ChildImportResult.success(childDocument, + return ChildImportResult.success(childDocument.getId(), childMetadata, "TED XML child already existed and was linked to package: " + entry.archivePath()); } - return ChildImportResult.success(childDocument, + return ChildImportResult.success(childDocument.getId(), childMetadata, childResult.warnings() == null || childResult.warnings().isEmpty() ? null : String.join(" | ", childResult.warnings())); } - public record ChildImportResult(Document childDocument, String warning) { - public static ChildImportResult success(Document childDocument, String warning) { - return new ChildImportResult(childDocument, warning); + public record ChildImportResult(UUID childDocumentId, CanonicalDocumentMetadata childDocumentMetadata, String warning) { + public static ChildImportResult success(UUID childDocumentId, CanonicalDocumentMetadata childDocumentMetadata, String warning) { + return new ChildImportResult(childDocumentId, childDocumentMetadata, warning); } public static ChildImportResult warning(String warning) { - return new ChildImportResult(null, warning); + return new ChildImportResult(null, null, warning); } } } diff --git a/src/main/java/at/procon/dip/processing/impl/TedStructuredDocumentProcessor.java b/src/main/java/at/procon/dip/processing/impl/TedStructuredDocumentProcessor.java index 5f8ae16..37f36ef 100644 --- a/src/main/java/at/procon/dip/processing/impl/TedStructuredDocumentProcessor.java +++ b/src/main/java/at/procon/dip/processing/impl/TedStructuredDocumentProcessor.java @@ -16,11 +16,13 @@ import at.procon.dip.processing.spi.StructuredProcessingRequest; import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode; import at.procon.dip.runtime.config.RuntimeMode; import at.procon.ted.model.entity.ProcurementDocument; -import at.procon.ted.service.XmlParserService; + import java.nio.charset.StandardCharsets; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; + +import at.procon.ted.service.XmlParserService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; diff --git a/src/main/java/at/procon/ted/service/XmlParserService.java b/src/main/java/at/procon/ted/service/XmlParserService.java index 8e1fead..3a557eb 100644 --- a/src/main/java/at/procon/ted/service/XmlParserService.java +++ b/src/main/java/at/procon/ted/service/XmlParserService.java @@ -16,7 +16,6 @@ import java.time.LocalDate; import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import java.util.*; @@ -59,11 +58,10 @@ public class XmlParserService { */ public ProcurementDocument parseDocument(String xmlContent) { try { - DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder(); + DocumentBuilder builder = newDocumentBuilder(); Document doc = builder.parse(new InputSource(new StringReader(xmlContent))); - - XPath xpath = xPathFactory.newXPath(); - xpath.setNamespaceContext(createNamespaceContext()); + + XPath xpath = newXPath(); ProcurementDocument document = ProcurementDocument.builder() .xmlDocument(xmlContent) @@ -265,29 +263,32 @@ public class XmlParserService { } } - private final Map cache = new HashMap<>(); - - private XPathExpression getCompiled(XPath xpath, String expression) throws XPathExpressionException { - XPathExpression compiled = cache.get(expression); - if (compiled == null) { - compiled = xpath.compile(expression); - cache.put(expression, compiled); + private DocumentBuilder newDocumentBuilder() throws Exception { + synchronized (documentBuilderFactory) { + documentBuilderFactory.setNamespaceAware(true); + return documentBuilderFactory.newDocumentBuilder(); + } + } + + private XPath newXPath() { + synchronized (xPathFactory) { + XPath xpath = xPathFactory.newXPath(); + xpath.setNamespaceContext(createNamespaceContext()); + return xpath; } - return compiled; } private String getTextContent(XPath xpath, Object item, String expression) throws XPathExpressionException { - XPathExpression expr = getCompiled(xpath, expression); - Node node = (Node) expr.evaluate(item, XPathConstants.NODE); + Node node = (Node) xpath.evaluate(expression, item, XPathConstants.NODE); return node != null ? node.getTextContent().trim() : null; } private Node getNode(XPath xpath, Object item, String expression) throws XPathExpressionException { - return (Node) getCompiled(xpath, expression).evaluate(item, XPathConstants.NODE); + return (Node) xpath.evaluate(expression, item, XPathConstants.NODE); } private NodeList getNodes(XPath xpath, Object item, String expression) throws XPathExpressionException { - return (NodeList) getCompiled(xpath, expression).evaluate(item, XPathConstants.NODESET); + return (NodeList) xpath.evaluate(expression, item, XPathConstants.NODESET); } private Element getDirectChild(Element parent, String namespaceUri, String localName) { diff --git a/src/main/resources/application-new.yml b/src/main/resources/application-new.yml index 825101e..c3ed7b2 100644 --- a/src/main/resources/application-new.yml +++ b/src/main/resources/application-new.yml @@ -33,6 +33,9 @@ dip: max-chunks-per-document: 12 # Startup backfill limit for missing lexical vectors startup-lexical-backfill-limit: 500 + scheduled-lexical-backfill-enabled: true + scheduled-lexical-backfill-delay-ms: 30000 + scheduled-lexical-backfill-batch-size: 200 # Number of top hits per engine returned by /search/debug debug-top-hits-per-engine: 10 @@ -229,6 +232,9 @@ dip: # Import batch marker for mail roots and attachments mail-import-batch-id: phase41-mail + ted-package-child-parallelism: 4 + ted-package-child-max-in-flight: 8 + # NEW Camel mail consumer route for provider-driven mail ingestion mail-route: # Enable/disable the NEW Camel mail consumer @@ -269,7 +275,7 @@ dip: # ted packages download configuration ted-download: # Enable/disable automatic package download - enabled: false + enabled: true # Base URL for TED Daily Packages base-url: https://ted.europa.eu/packages/daily/ # Download directory for tar.gz files @@ -294,7 +300,7 @@ dip: delete-after-ingestion: true time: - enabled: true + enabled: false leitstand: enabled: false startup-sync-enabled: false @@ -315,7 +321,6 @@ dip: driver-class-name: net.sourceforge.jtds.jdbc.Driver fetch-size: 500 query-timeout-seconds: 300 - toggl-track: enabled: false import-batch-id: time-toggl diff --git a/src/test/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapterTest.java b/src/test/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapterTest.java new file mode 100644 index 0000000..a14eb24 --- /dev/null +++ b/src/test/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapterTest.java @@ -0,0 +1,142 @@ +package at.procon.dip.ingestion.adapter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import at.procon.dip.domain.access.DocumentAccessContext; +import at.procon.dip.domain.document.DocumentFamily; +import at.procon.dip.domain.document.DocumentStatus; +import at.procon.dip.domain.document.DocumentType; +import at.procon.dip.domain.document.SourceType; +import at.procon.dip.domain.document.entity.Document; +import at.procon.dip.domain.document.service.DocumentService; +import at.procon.dip.ingestion.config.DipIngestionProperties; +import at.procon.dip.ingestion.dto.ImportedDocumentResult; +import at.procon.dip.ingestion.service.GenericDocumentImportService; +import at.procon.dip.ingestion.service.TedPackageChildImportProcessor; +import at.procon.dip.ingestion.service.TedPackageExpansionService; +import at.procon.dip.ingestion.service.TedPackageExpansionService.TedPackageEntry; +import at.procon.dip.ingestion.spi.IngestionResult; +import at.procon.dip.ingestion.spi.SourceDescriptor; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class TedPackageDocumentIngestionAdapterTest { + + @Mock + private GenericDocumentImportService importService; + @Mock + private TedPackageExpansionService expansionService; + @Mock + private TedPackageChildImportProcessor childImportProcessor; + @Mock + private DocumentService documentService; + + private DipIngestionProperties properties; + private TedPackageDocumentIngestionAdapter adapter; + + @BeforeEach + void setUp() { + properties = new DipIngestionProperties(); + properties.setEnabled(true); + properties.setTedPackageAdapterEnabled(true); + properties.setTedPackageChildParallelism(4); + properties.setTedPackageChildMaxInFlight(4); + adapter = new TedPackageDocumentIngestionAdapter(properties, importService, expansionService, childImportProcessor, documentService); + } + + @Test + void ingest_should_process_ted_children_in_parallel_and_keep_result_order() throws Exception { + Document rootDocument = Document.builder() + .id(UUID.randomUUID()) + .documentType(DocumentType.GENERIC_BINARY) + .documentFamily(DocumentFamily.GENERIC) + .status(DocumentStatus.RECEIVED) + .title("package.tar.gz") + .build(); + when(importService.importDocument(any())).thenReturn(new ImportedDocumentResult(rootDocument, null, List.of(), false)); + + List entries = List.of( + entry("a.xml"), + entry("b.xml"), + entry("c.xml"), + entry("d.xml") + ); + doAnswer(invocation -> { + TedPackageExpansionService.TedPackageEntryConsumer consumer = invocation.getArgument(1); + for (TedPackageEntry entry : entries) { + consumer.accept(entry); + } + return null; + }).when(expansionService).streamEntries(any(byte[].class), any(TedPackageExpansionService.TedPackageEntryConsumer.class)); + + AtomicInteger active = new AtomicInteger(); + AtomicInteger maxActive = new AtomicInteger(); + when(childImportProcessor.processChild(any(UUID.class), anyString(), any(OffsetDateTime.class), any(), any(TedPackageEntry.class), anyInt())) + .thenAnswer(invocation -> { + int sortOrder = invocation.getArgument(5, Integer.class); + int currentActive = active.incrementAndGet(); + maxActive.accumulateAndGet(currentActive, Math::max); + try { + Thread.sleep(100); + } finally { + active.decrementAndGet(); + } + Document childDocument = Document.builder() + .id(UUID.nameUUIDFromBytes(("child-" + sortOrder).getBytes(StandardCharsets.UTF_8))) + .documentType(DocumentType.TED_NOTICE) + .documentFamily(DocumentFamily.PROCUREMENT) + .status(DocumentStatus.RECEIVED) + .title("child-" + sortOrder) + .build(); + var metadata = documentService.getMetadata(childDocument.getId()); + return TedPackageChildImportProcessor.ChildImportResult.success(childDocument.getId(), metadata, "warn-" + sortOrder); + }); + + IngestionResult result = adapter.ingest(new SourceDescriptor( + DocumentAccessContext.publicDocument(), + SourceType.TED_PACKAGE, + "PKG-1", + null, + "package.tar.gz", + "application/gzip", + new byte[]{1, 2, 3}, + null, + OffsetDateTime.now(), + null, + java.util.Map.of() + )); + + assertThat(maxActive.get()).isGreaterThan(1); + assertThat(result.documents()).hasSize(1 + entries.size()); + assertThat(result.documents().get(0).title()).isEqualTo("package.tar.gz"); + assertThat(result.documents().subList(1, result.documents().size())) + .extracting(metadata -> metadata.title()) + .containsExactly("child-1", "child-2", "child-3", "child-4"); + assertThat(result.warnings()).containsExactly("warn-1", "warn-2", "warn-3", "warn-4"); + + verify(childImportProcessor, times(entries.size())).processChild(eq(rootDocument.getId()), eq("PKG-1"), any(OffsetDateTime.class), any(), any(TedPackageEntry.class), anyInt()); + } + + private TedPackageEntry entry(String fileName) { + String path = "folder/" + fileName; + byte[] xml = ("" + fileName + "").getBytes(StandardCharsets.UTF_8); + return new TedPackageEntry(path, fileName, xml, new String(xml, StandardCharsets.UTF_8)); + } +}