From f3fcdfab11175ea8e0ab8c61fd4fcfc3a406359d Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Wed, 18 Mar 2026 16:44:22 +0100 Subject: [PATCH] Refactor phases 4.1 - per-child transaction --- .../TedPackageDocumentIngestionAdapter.java | 129 +++++++++--------- .../TedPackageChildImportProcessor.java | 91 ++++++++++++ .../service/TedPackageExpansionService.java | 124 ++++++++++------- .../camel/TedPackageDownloadCamelRoute.java | 5 +- src/main/resources/application.yml | 2 +- 5 files changed, 239 insertions(+), 112 deletions(-) create mode 100644 src/main/java/at/procon/dip/ingestion/service/TedPackageChildImportProcessor.java diff --git a/src/main/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapter.java b/src/main/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapter.java index fbbd535..bcc380a 100644 --- a/src/main/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapter.java +++ b/src/main/java/at/procon/dip/ingestion/adapter/TedPackageDocumentIngestionAdapter.java @@ -1,29 +1,29 @@ package at.procon.dip.ingestion.adapter; import at.procon.dip.domain.access.DocumentAccessContext; -import at.procon.dip.domain.document.RelationType; -import at.procon.dip.domain.document.SourceType; -import at.procon.dip.domain.document.entity.Document; -import at.procon.dip.domain.document.service.DocumentRelationService; -import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand; import at.procon.dip.ingestion.dto.ImportedDocumentResult; import at.procon.dip.ingestion.service.GenericDocumentImportService; +import at.procon.dip.ingestion.service.TedPackageChildImportProcessor; import at.procon.dip.ingestion.service.TedPackageExpansionService; import at.procon.dip.ingestion.spi.DocumentIngestionAdapter; import at.procon.dip.ingestion.spi.IngestionResult; import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy; import at.procon.dip.ingestion.spi.SourceDescriptor; import at.procon.ted.config.TedProcessorProperties; -import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.StringUtils; @Component @RequiredArgsConstructor @@ -33,39 +33,35 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap private final TedProcessorProperties properties; private final GenericDocumentImportService importService; private final TedPackageExpansionService expansionService; - private final DocumentRelationService relationService; + private final TedPackageChildImportProcessor childImportProcessor; @Override public boolean supports(SourceDescriptor sourceDescriptor) { - return sourceDescriptor.sourceType() == SourceType.TED_PACKAGE + return sourceDescriptor.sourceType() == at.procon.dip.domain.document.SourceType.TED_PACKAGE && properties.getGenericIngestion().isEnabled() && properties.getGenericIngestion().isTedPackageAdapterEnabled(); } @Override + @Transactional(propagation = Propagation.NOT_SUPPORTED) public IngestionResult ingest(SourceDescriptor sourceDescriptor) { - byte[] packageBytes = sourceDescriptor.binaryContent(); - if (packageBytes == null || packageBytes.length == 0) { - throw new IllegalArgumentException("TED package adapter requires tar.gz bytes"); - } + SourceDescriptor packageRootSource = buildPackageRootSource(sourceDescriptor); - TedPackageExpansionService.TedPackageExpansionResult expanded = expansionService.expand(packageBytes); Map rootAttributes = new LinkedHashMap<>(sourceDescriptor.attributes() == null ? Map.of() : sourceDescriptor.attributes()); rootAttributes.putIfAbsent("packageId", sourceDescriptor.sourceIdentifier()); rootAttributes.putIfAbsent("title", sourceDescriptor.fileName() != null ? sourceDescriptor.fileName() : sourceDescriptor.sourceIdentifier()); - rootAttributes.put("xmlEntryCount", Integer.toString(expanded.entries().size())); rootAttributes.put("wrapperDocument", Boolean.TRUE.toString()); rootAttributes.put("importBatchId", properties.getGenericIngestion().getTedPackageImportBatchId()); ImportedDocumentResult packageDocument = importService.importDocument(new SourceDescriptor( sourceDescriptor.accessContext() == null ? DocumentAccessContext.publicDocument() : sourceDescriptor.accessContext(), - SourceType.TED_PACKAGE, + at.procon.dip.domain.document.SourceType.TED_PACKAGE, sourceDescriptor.sourceIdentifier(), - sourceDescriptor.sourceUri(), + packageRootSource.sourceUri(), sourceDescriptor.fileName(), sourceDescriptor.mediaType() == null ? "application/gzip" : sourceDescriptor.mediaType(), - packageBytes, - expanded.manifestText(), + packageRootSource.binaryContent(), + null, sourceDescriptor.receivedAt() == null ? OffsetDateTime.now() : sourceDescriptor.receivedAt(), OriginalContentStoragePolicy.SKIP, rootAttributes @@ -75,56 +71,65 @@ public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdap List documents = new ArrayList<>(); documents.add(packageDocument.document().toCanonicalMetadata()); - int sortOrder = 0; - for (TedPackageExpansionService.TedPackageEntry entry : expanded.entries()) { - sortOrder++; - String childUri = "tedpkg://" + sourceDescriptor.sourceIdentifier() + "/" + entry.archivePath(); - String childIdentifier = sourceDescriptor.sourceIdentifier() + ":" + entry.archivePath(); - String xmlContent = resolveXmlContent(entry); - - Map childAttributes = new LinkedHashMap<>(); - childAttributes.put("documentTypeHint", "TED_NOTICE"); - childAttributes.put("packageId", sourceDescriptor.sourceIdentifier()); - childAttributes.put("archivePath", entry.archivePath()); - childAttributes.put("title", entry.fileName()); - childAttributes.put("importBatchId", properties.getGenericIngestion().getTedPackageImportBatchId()); + AtomicInteger sortOrder = new AtomicInteger(); + streamPackageEntries(sourceDescriptor, entry -> { + TedPackageChildImportProcessor.ChildImportResult result = childImportProcessor.processChild( + packageDocument.document().getId(), + sourceDescriptor.sourceIdentifier(), + sourceDescriptor.receivedAt(), + sourceDescriptor.accessContext(), + entry, + sortOrder.incrementAndGet() + ); + if (result.childDocument() != null) { + documents.add(result.childDocument().toCanonicalMetadata()); + } + if (StringUtils.hasText(result.warning())) { + warnings.add(result.warning()); + } + }); - ImportedDocumentResult childResult = importService.importDocument(new SourceDescriptor( - sourceDescriptor.accessContext() == null ? DocumentAccessContext.publicDocument() : sourceDescriptor.accessContext(), - SourceType.PACKAGE_CHILD, - childIdentifier, - childUri, - entry.fileName(), - entry.mediaType() == null ? "application/xml" : entry.mediaType(), - entry.data(), - xmlContent, - sourceDescriptor.receivedAt() == null ? OffsetDateTime.now() : sourceDescriptor.receivedAt(), - OriginalContentStoragePolicy.STORE, - childAttributes - )); + return new IngestionResult(documents, warnings); + } - Document childDocument = childResult.document(); - documents.add(childDocument.toCanonicalMetadata()); - warnings.addAll(childResult.warnings()); - if (childResult.deduplicated()) { - warnings.add("TED XML child already existed and was linked to package: " + entry.archivePath()); - } - relationService.ensureRelation(new CreateDocumentRelationCommand( - packageDocument.document().getId(), - childDocument.getId(), - RelationType.EXTRACTED_FROM, - sortOrder, - entry.archivePath() - )); + private SourceDescriptor buildPackageRootSource(SourceDescriptor sourceDescriptor) { + if (StringUtils.hasText(sourceDescriptor.sourceUri()) && isReadablePath(sourceDescriptor.sourceUri())) { + return new SourceDescriptor( + sourceDescriptor.accessContext(), + sourceDescriptor.sourceType(), + sourceDescriptor.sourceIdentifier(), + sourceDescriptor.sourceUri(), + sourceDescriptor.fileName(), + sourceDescriptor.mediaType(), + null, + sourceDescriptor.textContent(), + sourceDescriptor.receivedAt(), + sourceDescriptor.originalContentStoragePolicy(), + sourceDescriptor.attributes() + ); } + return sourceDescriptor; + } - return new IngestionResult(documents, warnings); + private void streamPackageEntries(SourceDescriptor sourceDescriptor, + TedPackageExpansionService.TedPackageEntryConsumer consumer) { + if (StringUtils.hasText(sourceDescriptor.sourceUri()) && isReadablePath(sourceDescriptor.sourceUri())) { + expansionService.streamEntries(Path.of(sourceDescriptor.sourceUri()), consumer); + return; + } + byte[] packageBytes = sourceDescriptor.binaryContent(); + if (packageBytes == null || packageBytes.length == 0) { + throw new IllegalArgumentException("TED package adapter requires tar.gz bytes or a readable local sourceUri"); + } + expansionService.streamEntries(packageBytes, consumer); } - private String resolveXmlContent(TedPackageExpansionService.TedPackageEntry entry) { - if (entry.textUtf8() != null && !entry.textUtf8().isBlank()) { - return entry.textUtf8(); + private boolean isReadablePath(String sourceUri) { + try { + Path path = Path.of(sourceUri); + return Files.exists(path) && Files.isReadable(path) && Files.isRegularFile(path); + } catch (Exception e) { + return false; } - return new String(entry.data(), StandardCharsets.UTF_8); } } diff --git a/src/main/java/at/procon/dip/ingestion/service/TedPackageChildImportProcessor.java b/src/main/java/at/procon/dip/ingestion/service/TedPackageChildImportProcessor.java new file mode 100644 index 0000000..1b0fa0d --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/service/TedPackageChildImportProcessor.java @@ -0,0 +1,91 @@ +package at.procon.dip.ingestion.service; + +import at.procon.dip.domain.access.DocumentAccessContext; +import at.procon.dip.domain.document.RelationType; +import at.procon.dip.domain.document.SourceType; +import at.procon.dip.domain.document.entity.Document; +import at.procon.dip.domain.document.service.DocumentRelationService; +import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand; +import at.procon.dip.ingestion.dto.ImportedDocumentResult; +import at.procon.dip.ingestion.service.TedPackageExpansionService.TedPackageEntry; +import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy; +import at.procon.dip.ingestion.spi.SourceDescriptor; +import at.procon.ted.config.TedProcessorProperties; +import java.time.OffsetDateTime; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +@Service +@RequiredArgsConstructor +public class TedPackageChildImportProcessor { + + private final GenericDocumentImportService importService; + private final DocumentRelationService relationService; + private final TedProcessorProperties properties; + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public ChildImportResult processChild(UUID packageDocumentId, + String packageSourceIdentifier, + OffsetDateTime packageReceivedAt, + DocumentAccessContext accessContext, + TedPackageEntry entry, + int sortOrder) { + String childUri = "tedpkg://" + packageSourceIdentifier + "/" + entry.archivePath(); + String childIdentifier = packageSourceIdentifier + ":" + entry.archivePath(); + String xmlContent = entry.textUtf8() != null && !entry.textUtf8().isBlank() + ? entry.textUtf8() + : new String(entry.data(), java.nio.charset.StandardCharsets.UTF_8); + + Map childAttributes = new LinkedHashMap<>(); + childAttributes.put("documentTypeHint", "TED_NOTICE"); + childAttributes.put("packageId", packageSourceIdentifier); + childAttributes.put("archivePath", entry.archivePath()); + childAttributes.put("title", entry.fileName()); + childAttributes.put("importBatchId", properties.getGenericIngestion().getTedPackageImportBatchId()); + + ImportedDocumentResult childResult = importService.importDocument(new SourceDescriptor( + accessContext == null ? DocumentAccessContext.publicDocument() : accessContext, + SourceType.PACKAGE_CHILD, + childIdentifier, + childUri, + entry.fileName(), + "application/xml", + entry.data(), + xmlContent, + packageReceivedAt == null ? OffsetDateTime.now() : packageReceivedAt, + OriginalContentStoragePolicy.STORE, + childAttributes + )); + + Document childDocument = childResult.document(); + relationService.ensureRelation(new CreateDocumentRelationCommand( + packageDocumentId, + childDocument.getId(), + RelationType.EXTRACTED_FROM, + sortOrder, + entry.archivePath() + )); + + if (childResult.deduplicated()) { + return ChildImportResult.success(childDocument, + "TED XML child already existed and was linked to package: " + entry.archivePath()); + } + return ChildImportResult.success(childDocument, + childResult.warnings() == null || childResult.warnings().isEmpty() ? null : String.join(" | ", childResult.warnings())); + } + + public record ChildImportResult(Document childDocument, String warning) { + public static ChildImportResult success(Document childDocument, String warning) { + return new ChildImportResult(childDocument, warning); + } + + public static ChildImportResult warning(String warning) { + return new ChildImportResult(null, warning); + } + } +} diff --git a/src/main/java/at/procon/dip/ingestion/service/TedPackageExpansionService.java b/src/main/java/at/procon/dip/ingestion/service/TedPackageExpansionService.java index 67bfd83..d80fe73 100644 --- a/src/main/java/at/procon/dip/ingestion/service/TedPackageExpansionService.java +++ b/src/main/java/at/procon/dip/ingestion/service/TedPackageExpansionService.java @@ -3,7 +3,10 @@ package at.procon.dip.ingestion.service; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -22,67 +25,94 @@ public class TedPackageExpansionService { public TedPackageExpansionResult expand(byte[] tarGzBytes) { List entries = new ArrayList<>(); - long total = 0; + streamEntries(tarGzBytes, entries::add); + String manifest = buildManifestText(entries); + return new TedPackageExpansionResult(entries, manifest); + } + + public void streamEntries(byte[] tarGzBytes, TedPackageEntryConsumer consumer) { try (TarArchiveInputStream tais = new TarArchiveInputStream( new GzipCompressorInputStream(new ByteArrayInputStream(tarGzBytes)))) { - TarArchiveEntry entry; - while ((entry = tais.getNextTarEntry()) != null) { - if (entry.isDirectory()) { - continue; - } - if (entries.size() >= MAX_FILES) { - break; - } - String entryName = entry.getName(); - if (!entryName.toLowerCase().endsWith(".xml")) { - continue; - } - if (entryName.contains("..") || entryName.startsWith("/") || entryName.startsWith("\\")) { - log.warn("Skipping suspicious TED package entry {}", entryName); - continue; - } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - byte[] buffer = new byte[8192]; - long fileSize = 0; - int read; - while ((read = tais.read(buffer)) > 0) { - fileSize += read; - total += read; - if (fileSize > MAX_SINGLE_FILE_SIZE || total > MAX_TOTAL_EXTRACTED_SIZE) { - throw new IOException("TED package extraction limits exceeded"); - } - baos.write(buffer, 0, read); + streamEntriesInternal(tais, consumer); + } catch (IOException e) { + throw new IllegalStateException("Failed to expand TED package", e); + } + } + + public void streamEntries(Path tarGzPath, TedPackageEntryConsumer consumer) { + try (InputStream fileInputStream = Files.newInputStream(tarGzPath); + TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream(fileInputStream))) { + streamEntriesInternal(tais, consumer); + } catch (IOException e) { + throw new IllegalStateException("Failed to expand TED package from path " + tarGzPath, e); + } + } + + private void streamEntriesInternal(TarArchiveInputStream tais, TedPackageEntryConsumer consumer) throws IOException { + long total = 0; + int fileCount = 0; + TarArchiveEntry entry; + while ((entry = tais.getNextTarEntry()) != null) { + if (entry.isDirectory()) { + continue; + } + if (fileCount >= MAX_FILES) { + break; + } + String entryName = entry.getName(); + if (!entryName.toLowerCase().endsWith(".xml")) { + continue; + } + if (entryName.contains("..") || entryName.startsWith("/") || entryName.startsWith("\\")) { + log.warn("Skipping suspicious TED package entry {}", entryName); + continue; + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[8192]; + long fileSize = 0; + int read; + while ((read = tais.read(buffer)) > 0) { + baos.write(buffer, 0, read); + fileSize += read; + total += read; + if (fileSize > MAX_SINGLE_FILE_SIZE || total > MAX_TOTAL_EXTRACTED_SIZE) { + throw new IllegalStateException("TED package extraction limits exceeded"); } - byte[] data = baos.toByteArray(); - entries.add(new TedPackageEntry(extractFilename(entryName), entryName, data, data.length, "application/xml")); } - } catch (Exception e) { - throw new IllegalArgumentException("Failed to expand TED package", e); + byte[] xmlBytes = baos.toByteArray(); + String text = new String(xmlBytes, StandardCharsets.UTF_8); + TedPackageEntry pkgEntry = new TedPackageEntry(entryName, fileName(entryName), xmlBytes, text); + try { + consumer.accept(pkgEntry); + } catch (Exception e) { + throw new IllegalStateException("Failed while handling TED package entry " + entryName, e); + } + fileCount++; } - - String manifest = buildManifest(entries); - return new TedPackageExpansionResult(entries, manifest); } - private String buildManifest(List entries) { - StringBuilder sb = new StringBuilder(); - sb.append("TED package contains ").append(entries.size()).append(" XML notice files\n"); - for (TedPackageEntry entry : entries) { - sb.append("- ").append(entry.archivePath()).append(" (" ).append(entry.sizeBytes()).append(" bytes)\n"); + private String buildManifestText(List entries) { + StringBuilder manifest = new StringBuilder(); + manifest.append("TED package entries:\n"); + for (TedPackageEntry item : entries) { + manifest.append("- ").append(item.archivePath()).append("\n"); } - return sb.toString().trim(); + return manifest.toString(); } - private String extractFilename(String path) { + private String fileName(String path) { int idx = Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\')); return idx >= 0 ? path.substring(idx + 1) : path; } - public record TedPackageExpansionResult(List entries, String manifestText) {} + @FunctionalInterface + public interface TedPackageEntryConsumer { + void accept(TedPackageEntry entry) throws Exception; + } + + public record TedPackageExpansionResult(List entries, String manifestText) { + } - public record TedPackageEntry(String fileName, String archivePath, byte[] data, long sizeBytes, String mediaType) { - public String textUtf8() { - return new String(data, StandardCharsets.UTF_8); - } + public record TedPackageEntry(String archivePath, String fileName, byte[] data, String textUtf8) { } } diff --git a/src/main/java/at/procon/ted/camel/TedPackageDownloadCamelRoute.java b/src/main/java/at/procon/ted/camel/TedPackageDownloadCamelRoute.java index 2f290ff..5e891d1 100644 --- a/src/main/java/at/procon/ted/camel/TedPackageDownloadCamelRoute.java +++ b/src/main/java/at/procon/ted/camel/TedPackageDownloadCamelRoute.java @@ -231,7 +231,7 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder { long runningCount = downloadingCount + processingCount; exchange.getIn().setHeader("runningCount", runningCount); - exchange.getIn().setHeader("tooManyRunning", runningCount >= 1); + exchange.getIn().setHeader("tooManyRunning", runningCount >= 2); if (runningCount > 0) { log.info("Currently {} packages in progress ({} downloading, {} processing)", @@ -379,6 +379,7 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder { properties.getDownload().isDeleteAfterExtraction()); exchange.getIn().setHeader("skipLegacyXmlProcessing", false); + exchange.getIn().setBody(null); if (properties.getGenericIngestion().isEnabled() && properties.getGenericIngestion().isTedPackageAdapterEnabled()) { try { @@ -389,7 +390,7 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder { downloadPath.toString(), packageId + ".tar.gz", "application/gzip", - body, + null, null, OffsetDateTime.now(), OriginalContentStoragePolicy.DEFAULT, diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 743539b..1fa8a4f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -152,7 +152,7 @@ ted: # Download timeout (milliseconds) - 5 minutes download-timeout: 300000 # Max concurrent downloads - max-concurrent-downloads: 1 + max-concurrent-downloads: 2 # Delay between downloads (milliseconds) for rate limiting - 5 seconds delay-between-downloads: 3000 # Delete tar.gz after extraction