introduced parallelism in TED package db import
This commit is contained in:
parent
06c1485df9
commit
c6efbf40f6
|
|
@ -3,6 +3,8 @@ package at.procon.dip.ingestion.adapter;
|
||||||
import at.procon.dip.domain.access.DocumentAccessContext;
|
import at.procon.dip.domain.access.DocumentAccessContext;
|
||||||
import at.procon.dip.domain.document.CanonicalDocumentMetadata;
|
import at.procon.dip.domain.document.CanonicalDocumentMetadata;
|
||||||
import at.procon.dip.domain.document.SourceType;
|
import at.procon.dip.domain.document.SourceType;
|
||||||
|
import at.procon.dip.domain.document.service.DocumentService;
|
||||||
|
import at.procon.dip.ingestion.config.DipIngestionProperties;
|
||||||
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
|
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
|
||||||
import at.procon.dip.ingestion.service.GenericDocumentImportService;
|
import at.procon.dip.ingestion.service.GenericDocumentImportService;
|
||||||
import at.procon.dip.ingestion.service.TedPackageChildImportProcessor;
|
import at.procon.dip.ingestion.service.TedPackageChildImportProcessor;
|
||||||
|
|
@ -11,20 +13,25 @@ import at.procon.dip.ingestion.spi.DocumentIngestionAdapter;
|
||||||
import at.procon.dip.ingestion.spi.IngestionResult;
|
import at.procon.dip.ingestion.spi.IngestionResult;
|
||||||
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
|
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
|
||||||
import at.procon.dip.ingestion.spi.SourceDescriptor;
|
import at.procon.dip.ingestion.spi.SourceDescriptor;
|
||||||
import at.procon.dip.ingestion.config.DipIngestionProperties;
|
|
||||||
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
||||||
import at.procon.dip.runtime.config.RuntimeMode;
|
import at.procon.dip.runtime.config.RuntimeMode;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.transaction.annotation.Propagation;
|
import org.springframework.transaction.annotation.Propagation;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
@ -40,6 +47,7 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap
|
||||||
private final GenericDocumentImportService importService;
|
private final GenericDocumentImportService importService;
|
||||||
private final TedPackageExpansionService expansionService;
|
private final TedPackageExpansionService expansionService;
|
||||||
private final TedPackageChildImportProcessor childImportProcessor;
|
private final TedPackageChildImportProcessor childImportProcessor;
|
||||||
|
private final DocumentService documentService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean supports(SourceDescriptor sourceDescriptor) {
|
public boolean supports(SourceDescriptor sourceDescriptor) {
|
||||||
|
|
@ -75,20 +83,18 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap
|
||||||
|
|
||||||
List<String> warnings = new ArrayList<>(packageDocument.warnings());
|
List<String> warnings = new ArrayList<>(packageDocument.warnings());
|
||||||
List<CanonicalDocumentMetadata> documents = new ArrayList<>();
|
List<CanonicalDocumentMetadata> documents = new ArrayList<>();
|
||||||
documents.add(packageDocument.document().toCanonicalMetadata());
|
documents.add(documentService.getMetadata(packageDocument.document().getId()));
|
||||||
|
|
||||||
AtomicInteger sortOrder = new AtomicInteger();
|
List<OrderedChildImportResult> childResults = properties.getTedPackageChildParallelism() > 1
|
||||||
streamPackageEntries(sourceDescriptor, entry -> {
|
? processChildrenInParallel(packageDocument, sourceDescriptor)
|
||||||
TedPackageChildImportProcessor.ChildImportResult result = childImportProcessor.processChild(
|
: processChildrenSequentially(packageDocument, sourceDescriptor);
|
||||||
packageDocument.document().getId(),
|
|
||||||
sourceDescriptor.sourceIdentifier(),
|
childResults.stream()
|
||||||
sourceDescriptor.receivedAt(),
|
.sorted(Comparator.comparingInt(OrderedChildImportResult::sortOrder))
|
||||||
sourceDescriptor.accessContext(),
|
.forEach(orderedResult -> {
|
||||||
entry,
|
TedPackageChildImportProcessor.ChildImportResult result = orderedResult.result();
|
||||||
sortOrder.incrementAndGet()
|
if (result.childDocumentMetadata() != null) {
|
||||||
);
|
documents.add(result.childDocumentMetadata());
|
||||||
if (result.childDocument() != null) {
|
|
||||||
documents.add(result.childDocument().toCanonicalMetadata());
|
|
||||||
}
|
}
|
||||||
if (StringUtils.hasText(result.warning())) {
|
if (StringUtils.hasText(result.warning())) {
|
||||||
warnings.add(result.warning());
|
warnings.add(result.warning());
|
||||||
|
|
@ -98,6 +104,86 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap
|
||||||
return new IngestionResult(documents, warnings);
|
return new IngestionResult(documents, warnings);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<OrderedChildImportResult> processChildrenSequentially(ImportedDocumentResult packageDocument,
|
||||||
|
SourceDescriptor sourceDescriptor) {
|
||||||
|
AtomicInteger sortOrder = new AtomicInteger();
|
||||||
|
List<OrderedChildImportResult> results = new ArrayList<>();
|
||||||
|
streamPackageEntries(sourceDescriptor, entry -> {
|
||||||
|
int order = sortOrder.incrementAndGet();
|
||||||
|
TedPackageChildImportProcessor.ChildImportResult result = childImportProcessor.processChild(
|
||||||
|
packageDocument.document().getId(),
|
||||||
|
sourceDescriptor.sourceIdentifier(),
|
||||||
|
sourceDescriptor.receivedAt(),
|
||||||
|
sourceDescriptor.accessContext(),
|
||||||
|
entry,
|
||||||
|
order
|
||||||
|
);
|
||||||
|
results.add(new OrderedChildImportResult(order, result));
|
||||||
|
});
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<OrderedChildImportResult> processChildrenInParallel(ImportedDocumentResult packageDocument,
|
||||||
|
SourceDescriptor sourceDescriptor) {
|
||||||
|
int parallelism = Math.max(1, properties.getTedPackageChildParallelism());
|
||||||
|
int maxInFlight = Math.max(parallelism, properties.getTedPackageChildMaxInFlight());
|
||||||
|
log.info("Processing TED package {} with parallel child import (parallelism={}, maxInFlight={})",
|
||||||
|
sourceDescriptor.sourceIdentifier(), parallelism, maxInFlight);
|
||||||
|
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(parallelism, new TedPackageChildThreadFactory());
|
||||||
|
ExecutorCompletionService<OrderedChildImportResult> completionService = new ExecutorCompletionService<>(executor);
|
||||||
|
List<OrderedChildImportResult> results = new ArrayList<>();
|
||||||
|
AtomicInteger sortOrder = new AtomicInteger();
|
||||||
|
AtomicInteger submitted = new AtomicInteger();
|
||||||
|
AtomicInteger completed = new AtomicInteger();
|
||||||
|
|
||||||
|
try {
|
||||||
|
streamPackageEntries(sourceDescriptor, entry -> {
|
||||||
|
int order = sortOrder.incrementAndGet();
|
||||||
|
completionService.submit(() -> new OrderedChildImportResult(order, childImportProcessor.processChild(
|
||||||
|
packageDocument.document().getId(),
|
||||||
|
sourceDescriptor.sourceIdentifier(),
|
||||||
|
sourceDescriptor.receivedAt(),
|
||||||
|
sourceDescriptor.accessContext(),
|
||||||
|
entry,
|
||||||
|
order
|
||||||
|
)));
|
||||||
|
int currentSubmitted = submitted.incrementAndGet();
|
||||||
|
while (currentSubmitted - completed.get() >= maxInFlight) {
|
||||||
|
results.add(awaitNextChildResult(completionService, executor));
|
||||||
|
completed.incrementAndGet();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
while (completed.get() < submitted.get()) {
|
||||||
|
results.add(awaitNextChildResult(completionService, executor));
|
||||||
|
completed.incrementAndGet();
|
||||||
|
}
|
||||||
|
return results;
|
||||||
|
} finally {
|
||||||
|
executor.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private OrderedChildImportResult awaitNextChildResult(ExecutorCompletionService<OrderedChildImportResult> completionService,
|
||||||
|
ExecutorService executor) {
|
||||||
|
try {
|
||||||
|
Future<OrderedChildImportResult> completedFuture = completionService.take();
|
||||||
|
return completedFuture.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
executor.shutdownNow();
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new IllegalStateException("Interrupted while waiting for TED package child import completion", e);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
executor.shutdownNow();
|
||||||
|
Throwable cause = e.getCause();
|
||||||
|
if (cause instanceof RuntimeException runtimeException) {
|
||||||
|
throw runtimeException;
|
||||||
|
}
|
||||||
|
throw new IllegalStateException("TED package child import failed", cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private SourceDescriptor buildPackageRootSource(SourceDescriptor sourceDescriptor) {
|
private SourceDescriptor buildPackageRootSource(SourceDescriptor sourceDescriptor) {
|
||||||
if (StringUtils.hasText(sourceDescriptor.sourceUri()) && isReadablePath(sourceDescriptor.sourceUri())) {
|
if (StringUtils.hasText(sourceDescriptor.sourceUri()) && isReadablePath(sourceDescriptor.sourceUri())) {
|
||||||
return new SourceDescriptor(
|
return new SourceDescriptor(
|
||||||
|
|
@ -138,4 +224,18 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private record OrderedChildImportResult(int sortOrder, TedPackageChildImportProcessor.ChildImportResult result) {
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class TedPackageChildThreadFactory implements ThreadFactory {
|
||||||
|
private final AtomicInteger counter = new AtomicInteger();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable runnable) {
|
||||||
|
Thread thread = new Thread(runnable, "dip-ted-child-" + counter.incrementAndGet());
|
||||||
|
thread.setDaemon(true);
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -60,6 +60,21 @@ public class DipIngestionProperties {
|
||||||
*/
|
*/
|
||||||
private boolean tedPackageDeferLexicalIndexing = true;
|
private boolean tedPackageDeferLexicalIndexing = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of worker threads used for TED package child imports.
|
||||||
|
* Set to 1 to keep child processing sequential.
|
||||||
|
*/
|
||||||
|
@Positive
|
||||||
|
private int tedPackageChildParallelism = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum number of TED package child tasks allowed to be in flight (running or queued)
|
||||||
|
* while streaming a package. This provides bounded backpressure and avoids buffering the
|
||||||
|
* whole package in memory before processing starts.
|
||||||
|
*/
|
||||||
|
@Positive
|
||||||
|
private int tedPackageChildMaxInFlight = 8;
|
||||||
|
|
||||||
private boolean gatewayOnlyForTedPackages = false;
|
private boolean gatewayOnlyForTedPackages = false;
|
||||||
|
|
||||||
@NotBlank
|
@NotBlank
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,13 @@
|
||||||
package at.procon.dip.ingestion.service;
|
package at.procon.dip.ingestion.service;
|
||||||
|
|
||||||
import at.procon.dip.domain.access.DocumentAccessContext;
|
import at.procon.dip.domain.access.DocumentAccessContext;
|
||||||
|
import at.procon.dip.domain.document.CanonicalDocumentMetadata;
|
||||||
import at.procon.dip.domain.document.RelationType;
|
import at.procon.dip.domain.document.RelationType;
|
||||||
import at.procon.dip.domain.document.SourceType;
|
import at.procon.dip.domain.document.SourceType;
|
||||||
import at.procon.dip.domain.document.entity.Document;
|
import at.procon.dip.domain.document.entity.Document;
|
||||||
import at.procon.dip.domain.document.service.DocumentRelationService;
|
import at.procon.dip.domain.document.service.DocumentRelationService;
|
||||||
import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand;
|
import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand;
|
||||||
|
import at.procon.dip.ingestion.config.DipIngestionProperties;
|
||||||
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
|
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
|
||||||
import at.procon.dip.ingestion.service.TedPackageExpansionService.TedPackageEntry;
|
import at.procon.dip.ingestion.service.TedPackageExpansionService.TedPackageEntry;
|
||||||
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
|
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
|
||||||
|
|
@ -78,21 +80,23 @@ public class TedPackageChildImportProcessor {
|
||||||
entry.archivePath()
|
entry.archivePath()
|
||||||
));
|
));
|
||||||
|
|
||||||
|
CanonicalDocumentMetadata childMetadata = childDocument.toCanonicalMetadata();
|
||||||
|
|
||||||
if (childResult.deduplicated()) {
|
if (childResult.deduplicated()) {
|
||||||
return ChildImportResult.success(childDocument,
|
return ChildImportResult.success(childDocument.getId(), childMetadata,
|
||||||
"TED XML child already existed and was linked to package: " + entry.archivePath());
|
"TED XML child already existed and was linked to package: " + entry.archivePath());
|
||||||
}
|
}
|
||||||
return ChildImportResult.success(childDocument,
|
return ChildImportResult.success(childDocument.getId(), childMetadata,
|
||||||
childResult.warnings() == null || childResult.warnings().isEmpty() ? null : String.join(" | ", childResult.warnings()));
|
childResult.warnings() == null || childResult.warnings().isEmpty() ? null : String.join(" | ", childResult.warnings()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public record ChildImportResult(Document childDocument, String warning) {
|
public record ChildImportResult(UUID childDocumentId, CanonicalDocumentMetadata childDocumentMetadata, String warning) {
|
||||||
public static ChildImportResult success(Document childDocument, String warning) {
|
public static ChildImportResult success(UUID childDocumentId, CanonicalDocumentMetadata childDocumentMetadata, String warning) {
|
||||||
return new ChildImportResult(childDocument, warning);
|
return new ChildImportResult(childDocumentId, childDocumentMetadata, warning);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ChildImportResult warning(String warning) {
|
public static ChildImportResult warning(String warning) {
|
||||||
return new ChildImportResult(null, warning);
|
return new ChildImportResult(null, null, warning);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,11 +16,13 @@ import at.procon.dip.processing.spi.StructuredProcessingRequest;
|
||||||
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
||||||
import at.procon.dip.runtime.config.RuntimeMode;
|
import at.procon.dip.runtime.config.RuntimeMode;
|
||||||
import at.procon.ted.model.entity.ProcurementDocument;
|
import at.procon.ted.model.entity.ProcurementDocument;
|
||||||
import at.procon.ted.service.XmlParserService;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import at.procon.ted.service.XmlParserService;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,6 @@ import java.time.LocalDate;
|
||||||
import java.time.LocalTime;
|
import java.time.LocalTime;
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
import java.time.ZoneOffset;
|
import java.time.ZoneOffset;
|
||||||
import java.time.format.DateTimeFormatter;
|
|
||||||
import java.time.format.DateTimeParseException;
|
import java.time.format.DateTimeParseException;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
|
|
@ -59,11 +58,10 @@ public class XmlParserService {
|
||||||
*/
|
*/
|
||||||
public ProcurementDocument parseDocument(String xmlContent) {
|
public ProcurementDocument parseDocument(String xmlContent) {
|
||||||
try {
|
try {
|
||||||
DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
|
DocumentBuilder builder = newDocumentBuilder();
|
||||||
Document doc = builder.parse(new InputSource(new StringReader(xmlContent)));
|
Document doc = builder.parse(new InputSource(new StringReader(xmlContent)));
|
||||||
|
|
||||||
XPath xpath = xPathFactory.newXPath();
|
XPath xpath = newXPath();
|
||||||
xpath.setNamespaceContext(createNamespaceContext());
|
|
||||||
|
|
||||||
ProcurementDocument document = ProcurementDocument.builder()
|
ProcurementDocument document = ProcurementDocument.builder()
|
||||||
.xmlDocument(xmlContent)
|
.xmlDocument(xmlContent)
|
||||||
|
|
@ -265,29 +263,32 @@ public class XmlParserService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Map<String, XPathExpression> cache = new HashMap<>();
|
private DocumentBuilder newDocumentBuilder() throws Exception {
|
||||||
|
synchronized (documentBuilderFactory) {
|
||||||
private XPathExpression getCompiled(XPath xpath, String expression) throws XPathExpressionException {
|
documentBuilderFactory.setNamespaceAware(true);
|
||||||
XPathExpression compiled = cache.get(expression);
|
return documentBuilderFactory.newDocumentBuilder();
|
||||||
if (compiled == null) {
|
}
|
||||||
compiled = xpath.compile(expression);
|
}
|
||||||
cache.put(expression, compiled);
|
|
||||||
|
private XPath newXPath() {
|
||||||
|
synchronized (xPathFactory) {
|
||||||
|
XPath xpath = xPathFactory.newXPath();
|
||||||
|
xpath.setNamespaceContext(createNamespaceContext());
|
||||||
|
return xpath;
|
||||||
}
|
}
|
||||||
return compiled;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getTextContent(XPath xpath, Object item, String expression) throws XPathExpressionException {
|
private String getTextContent(XPath xpath, Object item, String expression) throws XPathExpressionException {
|
||||||
XPathExpression expr = getCompiled(xpath, expression);
|
Node node = (Node) xpath.evaluate(expression, item, XPathConstants.NODE);
|
||||||
Node node = (Node) expr.evaluate(item, XPathConstants.NODE);
|
|
||||||
return node != null ? node.getTextContent().trim() : null;
|
return node != null ? node.getTextContent().trim() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Node getNode(XPath xpath, Object item, String expression) throws XPathExpressionException {
|
private Node getNode(XPath xpath, Object item, String expression) throws XPathExpressionException {
|
||||||
return (Node) getCompiled(xpath, expression).evaluate(item, XPathConstants.NODE);
|
return (Node) xpath.evaluate(expression, item, XPathConstants.NODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private NodeList getNodes(XPath xpath, Object item, String expression) throws XPathExpressionException {
|
private NodeList getNodes(XPath xpath, Object item, String expression) throws XPathExpressionException {
|
||||||
return (NodeList) getCompiled(xpath, expression).evaluate(item, XPathConstants.NODESET);
|
return (NodeList) xpath.evaluate(expression, item, XPathConstants.NODESET);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Element getDirectChild(Element parent, String namespaceUri, String localName) {
|
private Element getDirectChild(Element parent, String namespaceUri, String localName) {
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,9 @@ dip:
|
||||||
max-chunks-per-document: 12
|
max-chunks-per-document: 12
|
||||||
# Startup backfill limit for missing lexical vectors
|
# Startup backfill limit for missing lexical vectors
|
||||||
startup-lexical-backfill-limit: 500
|
startup-lexical-backfill-limit: 500
|
||||||
|
scheduled-lexical-backfill-enabled: true
|
||||||
|
scheduled-lexical-backfill-delay-ms: 30000
|
||||||
|
scheduled-lexical-backfill-batch-size: 200
|
||||||
# Number of top hits per engine returned by /search/debug
|
# Number of top hits per engine returned by /search/debug
|
||||||
debug-top-hits-per-engine: 10
|
debug-top-hits-per-engine: 10
|
||||||
|
|
||||||
|
|
@ -229,6 +232,9 @@ dip:
|
||||||
# Import batch marker for mail roots and attachments
|
# Import batch marker for mail roots and attachments
|
||||||
mail-import-batch-id: phase41-mail
|
mail-import-batch-id: phase41-mail
|
||||||
|
|
||||||
|
ted-package-child-parallelism: 4
|
||||||
|
ted-package-child-max-in-flight: 8
|
||||||
|
|
||||||
# NEW Camel mail consumer route for provider-driven mail ingestion
|
# NEW Camel mail consumer route for provider-driven mail ingestion
|
||||||
mail-route:
|
mail-route:
|
||||||
# Enable/disable the NEW Camel mail consumer
|
# Enable/disable the NEW Camel mail consumer
|
||||||
|
|
@ -269,7 +275,7 @@ dip:
|
||||||
# ted packages download configuration
|
# ted packages download configuration
|
||||||
ted-download:
|
ted-download:
|
||||||
# Enable/disable automatic package download
|
# Enable/disable automatic package download
|
||||||
enabled: false
|
enabled: true
|
||||||
# Base URL for TED Daily Packages
|
# Base URL for TED Daily Packages
|
||||||
base-url: https://ted.europa.eu/packages/daily/
|
base-url: https://ted.europa.eu/packages/daily/
|
||||||
# Download directory for tar.gz files
|
# Download directory for tar.gz files
|
||||||
|
|
@ -294,7 +300,7 @@ dip:
|
||||||
delete-after-ingestion: true
|
delete-after-ingestion: true
|
||||||
|
|
||||||
time:
|
time:
|
||||||
enabled: true
|
enabled: false
|
||||||
leitstand:
|
leitstand:
|
||||||
enabled: false
|
enabled: false
|
||||||
startup-sync-enabled: false
|
startup-sync-enabled: false
|
||||||
|
|
@ -315,7 +321,6 @@ dip:
|
||||||
driver-class-name: net.sourceforge.jtds.jdbc.Driver
|
driver-class-name: net.sourceforge.jtds.jdbc.Driver
|
||||||
fetch-size: 500
|
fetch-size: 500
|
||||||
query-timeout-seconds: 300
|
query-timeout-seconds: 300
|
||||||
|
|
||||||
toggl-track:
|
toggl-track:
|
||||||
enabled: false
|
enabled: false
|
||||||
import-batch-id: time-toggl
|
import-batch-id: time-toggl
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,142 @@
|
||||||
|
package at.procon.dip.ingestion.adapter;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyInt;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import at.procon.dip.domain.access.DocumentAccessContext;
|
||||||
|
import at.procon.dip.domain.document.DocumentFamily;
|
||||||
|
import at.procon.dip.domain.document.DocumentStatus;
|
||||||
|
import at.procon.dip.domain.document.DocumentType;
|
||||||
|
import at.procon.dip.domain.document.SourceType;
|
||||||
|
import at.procon.dip.domain.document.entity.Document;
|
||||||
|
import at.procon.dip.domain.document.service.DocumentService;
|
||||||
|
import at.procon.dip.ingestion.config.DipIngestionProperties;
|
||||||
|
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
|
||||||
|
import at.procon.dip.ingestion.service.GenericDocumentImportService;
|
||||||
|
import at.procon.dip.ingestion.service.TedPackageChildImportProcessor;
|
||||||
|
import at.procon.dip.ingestion.service.TedPackageExpansionService;
|
||||||
|
import at.procon.dip.ingestion.service.TedPackageExpansionService.TedPackageEntry;
|
||||||
|
import at.procon.dip.ingestion.spi.IngestionResult;
|
||||||
|
import at.procon.dip.ingestion.spi.SourceDescriptor;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
class TedPackageDocumentIngestionAdapterTest {
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private GenericDocumentImportService importService;
|
||||||
|
@Mock
|
||||||
|
private TedPackageExpansionService expansionService;
|
||||||
|
@Mock
|
||||||
|
private TedPackageChildImportProcessor childImportProcessor;
|
||||||
|
@Mock
|
||||||
|
private DocumentService documentService;
|
||||||
|
|
||||||
|
private DipIngestionProperties properties;
|
||||||
|
private TedPackageDocumentIngestionAdapter adapter;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() {
|
||||||
|
properties = new DipIngestionProperties();
|
||||||
|
properties.setEnabled(true);
|
||||||
|
properties.setTedPackageAdapterEnabled(true);
|
||||||
|
properties.setTedPackageChildParallelism(4);
|
||||||
|
properties.setTedPackageChildMaxInFlight(4);
|
||||||
|
adapter = new TedPackageDocumentIngestionAdapter(properties, importService, expansionService, childImportProcessor, documentService);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void ingest_should_process_ted_children_in_parallel_and_keep_result_order() throws Exception {
|
||||||
|
Document rootDocument = Document.builder()
|
||||||
|
.id(UUID.randomUUID())
|
||||||
|
.documentType(DocumentType.GENERIC_BINARY)
|
||||||
|
.documentFamily(DocumentFamily.GENERIC)
|
||||||
|
.status(DocumentStatus.RECEIVED)
|
||||||
|
.title("package.tar.gz")
|
||||||
|
.build();
|
||||||
|
when(importService.importDocument(any())).thenReturn(new ImportedDocumentResult(rootDocument, null, List.of(), false));
|
||||||
|
|
||||||
|
List<TedPackageEntry> entries = List.of(
|
||||||
|
entry("a.xml"),
|
||||||
|
entry("b.xml"),
|
||||||
|
entry("c.xml"),
|
||||||
|
entry("d.xml")
|
||||||
|
);
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
TedPackageExpansionService.TedPackageEntryConsumer consumer = invocation.getArgument(1);
|
||||||
|
for (TedPackageEntry entry : entries) {
|
||||||
|
consumer.accept(entry);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}).when(expansionService).streamEntries(any(byte[].class), any(TedPackageExpansionService.TedPackageEntryConsumer.class));
|
||||||
|
|
||||||
|
AtomicInteger active = new AtomicInteger();
|
||||||
|
AtomicInteger maxActive = new AtomicInteger();
|
||||||
|
when(childImportProcessor.processChild(any(UUID.class), anyString(), any(OffsetDateTime.class), any(), any(TedPackageEntry.class), anyInt()))
|
||||||
|
.thenAnswer(invocation -> {
|
||||||
|
int sortOrder = invocation.getArgument(5, Integer.class);
|
||||||
|
int currentActive = active.incrementAndGet();
|
||||||
|
maxActive.accumulateAndGet(currentActive, Math::max);
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} finally {
|
||||||
|
active.decrementAndGet();
|
||||||
|
}
|
||||||
|
Document childDocument = Document.builder()
|
||||||
|
.id(UUID.nameUUIDFromBytes(("child-" + sortOrder).getBytes(StandardCharsets.UTF_8)))
|
||||||
|
.documentType(DocumentType.TED_NOTICE)
|
||||||
|
.documentFamily(DocumentFamily.PROCUREMENT)
|
||||||
|
.status(DocumentStatus.RECEIVED)
|
||||||
|
.title("child-" + sortOrder)
|
||||||
|
.build();
|
||||||
|
var metadata = documentService.getMetadata(childDocument.getId());
|
||||||
|
return TedPackageChildImportProcessor.ChildImportResult.success(childDocument.getId(), metadata, "warn-" + sortOrder);
|
||||||
|
});
|
||||||
|
|
||||||
|
IngestionResult result = adapter.ingest(new SourceDescriptor(
|
||||||
|
DocumentAccessContext.publicDocument(),
|
||||||
|
SourceType.TED_PACKAGE,
|
||||||
|
"PKG-1",
|
||||||
|
null,
|
||||||
|
"package.tar.gz",
|
||||||
|
"application/gzip",
|
||||||
|
new byte[]{1, 2, 3},
|
||||||
|
null,
|
||||||
|
OffsetDateTime.now(),
|
||||||
|
null,
|
||||||
|
java.util.Map.of()
|
||||||
|
));
|
||||||
|
|
||||||
|
assertThat(maxActive.get()).isGreaterThan(1);
|
||||||
|
assertThat(result.documents()).hasSize(1 + entries.size());
|
||||||
|
assertThat(result.documents().get(0).title()).isEqualTo("package.tar.gz");
|
||||||
|
assertThat(result.documents().subList(1, result.documents().size()))
|
||||||
|
.extracting(metadata -> metadata.title())
|
||||||
|
.containsExactly("child-1", "child-2", "child-3", "child-4");
|
||||||
|
assertThat(result.warnings()).containsExactly("warn-1", "warn-2", "warn-3", "warn-4");
|
||||||
|
|
||||||
|
verify(childImportProcessor, times(entries.size())).processChild(eq(rootDocument.getId()), eq("PKG-1"), any(OffsetDateTime.class), any(), any(TedPackageEntry.class), anyInt());
|
||||||
|
}
|
||||||
|
|
||||||
|
private TedPackageEntry entry(String fileName) {
|
||||||
|
String path = "folder/" + fileName;
|
||||||
|
byte[] xml = ("<notice>" + fileName + "</notice>").getBytes(StandardCharsets.UTF_8);
|
||||||
|
return new TedPackageEntry(path, fileName, xml, new String(xml, StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue