Refactor phases 4.1 - per-child transaction

master
trifonovt 1 month ago
parent 1ba8cccb62
commit f3fcdfab11

@ -1,29 +1,29 @@
package at.procon.dip.ingestion.adapter;
import at.procon.dip.domain.access.DocumentAccessContext;
import at.procon.dip.domain.document.RelationType;
import at.procon.dip.domain.document.SourceType;
import at.procon.dip.domain.document.entity.Document;
import at.procon.dip.domain.document.service.DocumentRelationService;
import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand;
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
import at.procon.dip.ingestion.service.GenericDocumentImportService;
import at.procon.dip.ingestion.service.TedPackageChildImportProcessor;
import at.procon.dip.ingestion.service.TedPackageExpansionService;
import at.procon.dip.ingestion.spi.DocumentIngestionAdapter;
import at.procon.dip.ingestion.spi.IngestionResult;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.ted.config.TedProcessorProperties;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
@Component
@RequiredArgsConstructor
@ -33,39 +33,35 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap
private final TedProcessorProperties properties;
private final GenericDocumentImportService importService;
private final TedPackageExpansionService expansionService;
private final DocumentRelationService relationService;
private final TedPackageChildImportProcessor childImportProcessor;
@Override
public boolean supports(SourceDescriptor sourceDescriptor) {
return sourceDescriptor.sourceType() == SourceType.TED_PACKAGE
return sourceDescriptor.sourceType() == at.procon.dip.domain.document.SourceType.TED_PACKAGE
&& properties.getGenericIngestion().isEnabled()
&& properties.getGenericIngestion().isTedPackageAdapterEnabled();
}
@Override
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public IngestionResult ingest(SourceDescriptor sourceDescriptor) {
byte[] packageBytes = sourceDescriptor.binaryContent();
if (packageBytes == null || packageBytes.length == 0) {
throw new IllegalArgumentException("TED package adapter requires tar.gz bytes");
}
SourceDescriptor packageRootSource = buildPackageRootSource(sourceDescriptor);
TedPackageExpansionService.TedPackageExpansionResult expanded = expansionService.expand(packageBytes);
Map<String, String> rootAttributes = new LinkedHashMap<>(sourceDescriptor.attributes() == null ? Map.of() : sourceDescriptor.attributes());
rootAttributes.putIfAbsent("packageId", sourceDescriptor.sourceIdentifier());
rootAttributes.putIfAbsent("title", sourceDescriptor.fileName() != null ? sourceDescriptor.fileName() : sourceDescriptor.sourceIdentifier());
rootAttributes.put("xmlEntryCount", Integer.toString(expanded.entries().size()));
rootAttributes.put("wrapperDocument", Boolean.TRUE.toString());
rootAttributes.put("importBatchId", properties.getGenericIngestion().getTedPackageImportBatchId());
ImportedDocumentResult packageDocument = importService.importDocument(new SourceDescriptor(
sourceDescriptor.accessContext() == null ? DocumentAccessContext.publicDocument() : sourceDescriptor.accessContext(),
SourceType.TED_PACKAGE,
at.procon.dip.domain.document.SourceType.TED_PACKAGE,
sourceDescriptor.sourceIdentifier(),
sourceDescriptor.sourceUri(),
packageRootSource.sourceUri(),
sourceDescriptor.fileName(),
sourceDescriptor.mediaType() == null ? "application/gzip" : sourceDescriptor.mediaType(),
packageBytes,
expanded.manifestText(),
packageRootSource.binaryContent(),
null,
sourceDescriptor.receivedAt() == null ? OffsetDateTime.now() : sourceDescriptor.receivedAt(),
OriginalContentStoragePolicy.SKIP,
rootAttributes
@ -75,56 +71,65 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap
List<at.procon.dip.domain.document.CanonicalDocumentMetadata> documents = new ArrayList<>();
documents.add(packageDocument.document().toCanonicalMetadata());
int sortOrder = 0;
for (TedPackageExpansionService.TedPackageEntry entry : expanded.entries()) {
sortOrder++;
String childUri = "tedpkg://" + sourceDescriptor.sourceIdentifier() + "/" + entry.archivePath();
String childIdentifier = sourceDescriptor.sourceIdentifier() + ":" + entry.archivePath();
String xmlContent = resolveXmlContent(entry);
Map<String, String> childAttributes = new LinkedHashMap<>();
childAttributes.put("documentTypeHint", "TED_NOTICE");
childAttributes.put("packageId", sourceDescriptor.sourceIdentifier());
childAttributes.put("archivePath", entry.archivePath());
childAttributes.put("title", entry.fileName());
childAttributes.put("importBatchId", properties.getGenericIngestion().getTedPackageImportBatchId());
AtomicInteger sortOrder = new AtomicInteger();
streamPackageEntries(sourceDescriptor, entry -> {
TedPackageChildImportProcessor.ChildImportResult result = childImportProcessor.processChild(
packageDocument.document().getId(),
sourceDescriptor.sourceIdentifier(),
sourceDescriptor.receivedAt(),
sourceDescriptor.accessContext(),
entry,
sortOrder.incrementAndGet()
);
if (result.childDocument() != null) {
documents.add(result.childDocument().toCanonicalMetadata());
}
if (StringUtils.hasText(result.warning())) {
warnings.add(result.warning());
}
});
ImportedDocumentResult childResult = importService.importDocument(new SourceDescriptor(
sourceDescriptor.accessContext() == null ? DocumentAccessContext.publicDocument() : sourceDescriptor.accessContext(),
SourceType.PACKAGE_CHILD,
childIdentifier,
childUri,
entry.fileName(),
entry.mediaType() == null ? "application/xml" : entry.mediaType(),
entry.data(),
xmlContent,
sourceDescriptor.receivedAt() == null ? OffsetDateTime.now() : sourceDescriptor.receivedAt(),
OriginalContentStoragePolicy.STORE,
childAttributes
));
return new IngestionResult(documents, warnings);
}
Document childDocument = childResult.document();
documents.add(childDocument.toCanonicalMetadata());
warnings.addAll(childResult.warnings());
if (childResult.deduplicated()) {
warnings.add("TED XML child already existed and was linked to package: " + entry.archivePath());
}
relationService.ensureRelation(new CreateDocumentRelationCommand(
packageDocument.document().getId(),
childDocument.getId(),
RelationType.EXTRACTED_FROM,
sortOrder,
entry.archivePath()
));
private SourceDescriptor buildPackageRootSource(SourceDescriptor sourceDescriptor) {
if (StringUtils.hasText(sourceDescriptor.sourceUri()) && isReadablePath(sourceDescriptor.sourceUri())) {
return new SourceDescriptor(
sourceDescriptor.accessContext(),
sourceDescriptor.sourceType(),
sourceDescriptor.sourceIdentifier(),
sourceDescriptor.sourceUri(),
sourceDescriptor.fileName(),
sourceDescriptor.mediaType(),
null,
sourceDescriptor.textContent(),
sourceDescriptor.receivedAt(),
sourceDescriptor.originalContentStoragePolicy(),
sourceDescriptor.attributes()
);
}
return sourceDescriptor;
}
return new IngestionResult(documents, warnings);
private void streamPackageEntries(SourceDescriptor sourceDescriptor,
TedPackageExpansionService.TedPackageEntryConsumer consumer) {
if (StringUtils.hasText(sourceDescriptor.sourceUri()) && isReadablePath(sourceDescriptor.sourceUri())) {
expansionService.streamEntries(Path.of(sourceDescriptor.sourceUri()), consumer);
return;
}
byte[] packageBytes = sourceDescriptor.binaryContent();
if (packageBytes == null || packageBytes.length == 0) {
throw new IllegalArgumentException("TED package adapter requires tar.gz bytes or a readable local sourceUri");
}
expansionService.streamEntries(packageBytes, consumer);
}
private String resolveXmlContent(TedPackageExpansionService.TedPackageEntry entry) {
if (entry.textUtf8() != null && !entry.textUtf8().isBlank()) {
return entry.textUtf8();
private boolean isReadablePath(String sourceUri) {
try {
Path path = Path.of(sourceUri);
return Files.exists(path) && Files.isReadable(path) && Files.isRegularFile(path);
} catch (Exception e) {
return false;
}
return new String(entry.data(), StandardCharsets.UTF_8);
}
}

@ -0,0 +1,91 @@
package at.procon.dip.ingestion.service;
import at.procon.dip.domain.access.DocumentAccessContext;
import at.procon.dip.domain.document.RelationType;
import at.procon.dip.domain.document.SourceType;
import at.procon.dip.domain.document.entity.Document;
import at.procon.dip.domain.document.service.DocumentRelationService;
import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand;
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
import at.procon.dip.ingestion.service.TedPackageExpansionService.TedPackageEntry;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.ted.config.TedProcessorProperties;
import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Service
@RequiredArgsConstructor
public class TedPackageChildImportProcessor {
private final GenericDocumentImportService importService;
private final DocumentRelationService relationService;
private final TedProcessorProperties properties;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public ChildImportResult processChild(UUID packageDocumentId,
String packageSourceIdentifier,
OffsetDateTime packageReceivedAt,
DocumentAccessContext accessContext,
TedPackageEntry entry,
int sortOrder) {
String childUri = "tedpkg://" + packageSourceIdentifier + "/" + entry.archivePath();
String childIdentifier = packageSourceIdentifier + ":" + entry.archivePath();
String xmlContent = entry.textUtf8() != null && !entry.textUtf8().isBlank()
? entry.textUtf8()
: new String(entry.data(), java.nio.charset.StandardCharsets.UTF_8);
Map<String, String> childAttributes = new LinkedHashMap<>();
childAttributes.put("documentTypeHint", "TED_NOTICE");
childAttributes.put("packageId", packageSourceIdentifier);
childAttributes.put("archivePath", entry.archivePath());
childAttributes.put("title", entry.fileName());
childAttributes.put("importBatchId", properties.getGenericIngestion().getTedPackageImportBatchId());
ImportedDocumentResult childResult = importService.importDocument(new SourceDescriptor(
accessContext == null ? DocumentAccessContext.publicDocument() : accessContext,
SourceType.PACKAGE_CHILD,
childIdentifier,
childUri,
entry.fileName(),
"application/xml",
entry.data(),
xmlContent,
packageReceivedAt == null ? OffsetDateTime.now() : packageReceivedAt,
OriginalContentStoragePolicy.STORE,
childAttributes
));
Document childDocument = childResult.document();
relationService.ensureRelation(new CreateDocumentRelationCommand(
packageDocumentId,
childDocument.getId(),
RelationType.EXTRACTED_FROM,
sortOrder,
entry.archivePath()
));
if (childResult.deduplicated()) {
return ChildImportResult.success(childDocument,
"TED XML child already existed and was linked to package: " + entry.archivePath());
}
return ChildImportResult.success(childDocument,
childResult.warnings() == null || childResult.warnings().isEmpty() ? null : String.join(" | ", childResult.warnings()));
}
public record ChildImportResult(Document childDocument, String warning) {
public static ChildImportResult success(Document childDocument, String warning) {
return new ChildImportResult(childDocument, warning);
}
public static ChildImportResult warning(String warning) {
return new ChildImportResult(null, warning);
}
}
}

@ -3,7 +3,10 @@ package at.procon.dip.ingestion.service;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@ -22,67 +25,94 @@ public class TedPackageExpansionService {
public TedPackageExpansionResult expand(byte[] tarGzBytes) {
List<TedPackageEntry> entries = new ArrayList<>();
long total = 0;
streamEntries(tarGzBytes, entries::add);
String manifest = buildManifestText(entries);
return new TedPackageExpansionResult(entries, manifest);
}
public void streamEntries(byte[] tarGzBytes, TedPackageEntryConsumer consumer) {
try (TarArchiveInputStream tais = new TarArchiveInputStream(
new GzipCompressorInputStream(new ByteArrayInputStream(tarGzBytes)))) {
TarArchiveEntry entry;
while ((entry = tais.getNextTarEntry()) != null) {
if (entry.isDirectory()) {
continue;
}
if (entries.size() >= MAX_FILES) {
break;
}
String entryName = entry.getName();
if (!entryName.toLowerCase().endsWith(".xml")) {
continue;
}
if (entryName.contains("..") || entryName.startsWith("/") || entryName.startsWith("\\")) {
log.warn("Skipping suspicious TED package entry {}", entryName);
continue;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[8192];
long fileSize = 0;
int read;
while ((read = tais.read(buffer)) > 0) {
fileSize += read;
total += read;
if (fileSize > MAX_SINGLE_FILE_SIZE || total > MAX_TOTAL_EXTRACTED_SIZE) {
throw new IOException("TED package extraction limits exceeded");
}
baos.write(buffer, 0, read);
streamEntriesInternal(tais, consumer);
} catch (IOException e) {
throw new IllegalStateException("Failed to expand TED package", e);
}
}
public void streamEntries(Path tarGzPath, TedPackageEntryConsumer consumer) {
try (InputStream fileInputStream = Files.newInputStream(tarGzPath);
TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream(fileInputStream))) {
streamEntriesInternal(tais, consumer);
} catch (IOException e) {
throw new IllegalStateException("Failed to expand TED package from path " + tarGzPath, e);
}
}
private void streamEntriesInternal(TarArchiveInputStream tais, TedPackageEntryConsumer consumer) throws IOException {
long total = 0;
int fileCount = 0;
TarArchiveEntry entry;
while ((entry = tais.getNextTarEntry()) != null) {
if (entry.isDirectory()) {
continue;
}
if (fileCount >= MAX_FILES) {
break;
}
String entryName = entry.getName();
if (!entryName.toLowerCase().endsWith(".xml")) {
continue;
}
if (entryName.contains("..") || entryName.startsWith("/") || entryName.startsWith("\\")) {
log.warn("Skipping suspicious TED package entry {}", entryName);
continue;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[8192];
long fileSize = 0;
int read;
while ((read = tais.read(buffer)) > 0) {
baos.write(buffer, 0, read);
fileSize += read;
total += read;
if (fileSize > MAX_SINGLE_FILE_SIZE || total > MAX_TOTAL_EXTRACTED_SIZE) {
throw new IllegalStateException("TED package extraction limits exceeded");
}
byte[] data = baos.toByteArray();
entries.add(new TedPackageEntry(extractFilename(entryName), entryName, data, data.length, "application/xml"));
}
} catch (Exception e) {
throw new IllegalArgumentException("Failed to expand TED package", e);
byte[] xmlBytes = baos.toByteArray();
String text = new String(xmlBytes, StandardCharsets.UTF_8);
TedPackageEntry pkgEntry = new TedPackageEntry(entryName, fileName(entryName), xmlBytes, text);
try {
consumer.accept(pkgEntry);
} catch (Exception e) {
throw new IllegalStateException("Failed while handling TED package entry " + entryName, e);
}
fileCount++;
}
String manifest = buildManifest(entries);
return new TedPackageExpansionResult(entries, manifest);
}
private String buildManifest(List<TedPackageEntry> entries) {
StringBuilder sb = new StringBuilder();
sb.append("TED package contains ").append(entries.size()).append(" XML notice files\n");
for (TedPackageEntry entry : entries) {
sb.append("- ").append(entry.archivePath()).append(" (" ).append(entry.sizeBytes()).append(" bytes)\n");
private String buildManifestText(List<TedPackageEntry> entries) {
StringBuilder manifest = new StringBuilder();
manifest.append("TED package entries:\n");
for (TedPackageEntry item : entries) {
manifest.append("- ").append(item.archivePath()).append("\n");
}
return sb.toString().trim();
return manifest.toString();
}
private String extractFilename(String path) {
private String fileName(String path) {
int idx = Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\'));
return idx >= 0 ? path.substring(idx + 1) : path;
}
public record TedPackageExpansionResult(List<TedPackageEntry> entries, String manifestText) {}
@FunctionalInterface
public interface TedPackageEntryConsumer {
void accept(TedPackageEntry entry) throws Exception;
}
public record TedPackageExpansionResult(List<TedPackageEntry> entries, String manifestText) {
}
public record TedPackageEntry(String fileName, String archivePath, byte[] data, long sizeBytes, String mediaType) {
public String textUtf8() {
return new String(data, StandardCharsets.UTF_8);
}
public record TedPackageEntry(String archivePath, String fileName, byte[] data, String textUtf8) {
}
}

@ -231,7 +231,7 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder {
long runningCount = downloadingCount + processingCount;
exchange.getIn().setHeader("runningCount", runningCount);
exchange.getIn().setHeader("tooManyRunning", runningCount >= 1);
exchange.getIn().setHeader("tooManyRunning", runningCount >= 2);
if (runningCount > 0) {
log.info("Currently {} packages in progress ({} downloading, {} processing)",
@ -379,6 +379,7 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder {
properties.getDownload().isDeleteAfterExtraction());
exchange.getIn().setHeader("skipLegacyXmlProcessing", false);
exchange.getIn().setBody(null);
if (properties.getGenericIngestion().isEnabled() && properties.getGenericIngestion().isTedPackageAdapterEnabled()) {
try {
@ -389,7 +390,7 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder {
downloadPath.toString(),
packageId + ".tar.gz",
"application/gzip",
body,
null,
null,
OffsetDateTime.now(),
OriginalContentStoragePolicy.DEFAULT,

@ -152,7 +152,7 @@ ted:
# Download timeout (milliseconds) - 5 minutes
download-timeout: 300000
# Max concurrent downloads
max-concurrent-downloads: 1
max-concurrent-downloads: 2
# Delay between downloads (milliseconds) for rate limiting - 5 seconds
delay-between-downloads: 3000
# Delete tar.gz after extraction

Loading…
Cancel
Save