Refactor phase 4.1

master
trifonovt 1 month ago
parent ac59730f3e
commit f337af56b5

@ -0,0 +1,28 @@
# Phase 4.1 adapter extensions
## Added adapters
### TED package adapter
- Source type: `TED_PACKAGE`
- Root access: `PUBLIC`, no owner tenant
- Root document type: `TED_PACKAGE`
- Child source type: `PACKAGE_CHILD`
- Child relation: `EXTRACTED_FROM`
The adapter imports the package artifact plus its XML members into the generic `DOC` model.
It does not replace the existing legacy TED package processing path; instead it complements it, so the later legacy TED parsing step can still enrich the same canonical child documents into proper `TED_NOTICE` projections, matching them by dedup hash.
### Mail/document adapter
- Source type: `MAIL`
- Root document type: `MIME_MESSAGE`
- Child relation: `ATTACHMENT_OF`
- Access: configurable via `mail-default-owner-tenant-key` and `mail-default-visibility`
The adapter stores the message body as the semantic root text and imports attachments as child documents. ZIP attachments can optionally be expanded recursively.
## Deduplication
Phase 4 deduplication by content hash is refined so the same payload is only deduplicated within the same access scope (`visibility` + `owner tenant`).
This prevents private documents from different tenants from being merged into one canonical document accidentally.

@ -224,6 +224,14 @@
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-database-postgresql</artifactId>
</dependency>
</dependencies>
<build>

@ -0,0 +1,34 @@
# Phase 4.1 TED package and mail/document adapters
This phase extends the generic DOC ingestion SPI with two richer adapters:
- `TedPackageDocumentIngestionAdapter`
- `MailDocumentIngestionAdapter`
## TED package adapter
- imports the package artifact itself as a public DOC document
- expands the `.tar.gz` package into XML child payloads
- imports each child XML as a generic DOC child document
- links children to the package root via `EXTRACTED_FROM`
- keeps the existing legacy TED package processing path intact
## Mail/document adapter
- imports the MIME message as a DOC document
- extracts subject/from/to/body into the mail root semantic text
- imports attachments as child DOC documents
- links attachments via `ATTACHMENT_OF`
- optionally expands ZIP attachments recursively
## Access semantics
- TED packages and TED XML children are imported as `PUBLIC` with no owner tenant
- mail documents use a dedicated default mail access context (`mail-default-owner-tenant-key`, `mail-default-visibility`)
- deduplication is access-scope aware so private content is not merged across different tenants
Additional note:
- wrapper/container documents (for example TED package roots or ZIP wrapper documents expanded into child documents) can skip persistence of ORIGINAL content via `ted.generic-ingestion.store-original-content-for-wrapper-documents=false`, and adapters can now override that default per imported document through `SourceDescriptor.originalContentStoragePolicy` (`STORE` / `SKIP` / `DEFAULT`), while still keeping metadata, derived representations and child relations.
- when original content storage is skipped for a document, GenericDocumentImportService now also skips extraction, derived-content persistence, representation building, and embedding queueing for that document
Schema note:
- `V8__doc_phase4_1_expand_document_and_source_types.sql` expands the generic `DOC` document/source type domain for `TED_PACKAGE` and `PACKAGE_CHILD`, and also repairs older local/dev schemas that used CHECK constraints instead of PostgreSQL ENUM types.

@ -4,6 +4,7 @@ package at.procon.dip.domain.document;
* Canonical technical document type.
*/
public enum DocumentType {
TED_PACKAGE,
TED_NOTICE,
EMAIL,
MIME_MESSAGE,

@ -5,6 +5,7 @@ package at.procon.dip.domain.document;
*/
public enum SourceType {
TED_PACKAGE,
PACKAGE_CHILD,
MAIL,
FILE_SYSTEM,
REST_UPLOAD,

@ -13,4 +13,6 @@ public interface DocumentRelationRepository extends JpaRepository<DocumentRelati
List<DocumentRelation> findByChildDocument_Id(UUID childDocumentId);
List<DocumentRelation> findByParentDocument_IdAndRelationType(UUID parentDocumentId, RelationType relationType);
boolean existsByParentDocument_IdAndChildDocument_IdAndRelationType(UUID parentDocumentId, UUID childDocumentId, RelationType relationType);
}

@ -15,6 +15,8 @@ public interface DocumentRepository extends JpaRepository<Document, UUID> {
Optional<Document> findByDedupHash(String dedupHash);
List<Document> findAllByDedupHash(String dedupHash);
boolean existsByDedupHash(String dedupHash);
List<Document> findByDocumentType(DocumentType documentType);

@ -28,6 +28,19 @@ public class DocumentRelationService {
return relationRepository.save(relation);
}
/**
 * Idempotently ensures that a parent/child relation of the requested type exists and returns it.
 * When an equivalent relation is already persisted it is looked up and returned instead of
 * inserting a duplicate row; otherwise a new relation is created.
 */
public DocumentRelation ensureRelation(CreateDocumentRelationCommand command) {
boolean alreadyLinked = relationRepository.existsByParentDocument_IdAndChildDocument_IdAndRelationType(
command.parentDocumentId(), command.childDocumentId(), command.relationType());
if (!alreadyLinked) {
return createRelation(command);
}
// Re-read the persisted relation; if it vanished concurrently, create it after all.
return relationRepository.findByParentDocument_IdAndRelationType(command.parentDocumentId(), command.relationType())
.stream()
.filter(existing -> existing.getChildDocument() != null
&& command.childDocumentId().equals(existing.getChildDocument().getId()))
.findFirst()
.orElseGet(() -> createRelation(command));
}
@Transactional(readOnly = true)
public List<DocumentRelation> findChildren(UUID parentDocumentId) {
return relationRepository.findByParentDocument_Id(parentDocumentId);

@ -15,7 +15,6 @@ import at.procon.ted.model.entity.ProcurementLot;
import at.procon.ted.service.TedPhase2GenericDocumentService;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -63,8 +62,7 @@ public class TedNoticeProjectionService {
Document genericDocument = documentRepository.findById(resolvedDocumentId)
.orElseThrow(() -> new IllegalArgumentException("Unknown DOC document id: " + finalResolvedDocumentId));
TedNoticeProjection projection = projectionRepository.findByLegacyProcurementDocumentId(legacyDocument.getId())
.or(() -> projectionRepository.findByDocument_Id(genericDocument.getId()))
TedNoticeProjection projection = projectionRepository.findByDocument_Id(genericDocument.getId())
.orElseGet(TedNoticeProjection::new);
mapProjection(projection, genericDocument, legacyDocument);
@ -72,19 +70,13 @@ public class TedNoticeProjectionService {
replaceLots(projection, legacyDocument.getLots());
replaceOrganizations(projection, legacyDocument.getOrganizations());
log.debug("Phase 3 TED projection ensured for legacy {} -> projection {} / doc {}",
legacyDocument.getId(), projection.getId(), genericDocument.getId());
log.debug("Phase 3 TED projection ensured for generic doc {} -> projection {} (noticeId={}, publicationId={})",
genericDocument.getId(), projection.getId(), legacyDocument.getNoticeId(), legacyDocument.getPublicationId());
return projection.getId();
}
@Transactional(readOnly = true)
public Optional<TedNoticeProjection> findByLegacyProcurementDocumentId(UUID legacyDocumentId) {
return projectionRepository.findByLegacyProcurementDocumentId(legacyDocumentId);
}
private void mapProjection(TedNoticeProjection projection, Document genericDocument, ProcurementDocument legacyDocument) {
projection.setDocument(genericDocument);
projection.setLegacyProcurementDocumentId(legacyDocument.getId());
projection.setNoticeId(legacyDocument.getNoticeId());
projection.setPublicationId(legacyDocument.getPublicationId());
projection.setNoticeUrl(legacyDocument.getNoticeUrl());

@ -0,0 +1,56 @@
package at.procon.dip.extraction.impl;
import at.procon.dip.domain.document.ContentRole;
import at.procon.dip.domain.document.DocumentType;
import at.procon.dip.extraction.spi.DocumentExtractor;
import at.procon.dip.extraction.spi.ExtractedStructuredPayload;
import at.procon.dip.extraction.spi.ExtractionRequest;
import at.procon.dip.extraction.spi.ExtractionResult;
import at.procon.dip.ingestion.util.DocumentImportSupport;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
 * Extractor for mail documents ({@code MIME_MESSAGE} / {@code EMAIL}): publishes the message
 * body as the normalized text representation and the source descriptor attributes as a
 * "mail-message" structured payload.
 */
@Component
public class MimeMessageDocumentExtractor implements DocumentExtractor {

    @Override
    public boolean supports(DocumentType documentType, String mimeType) {
        // Only the document type matters here; the mime type is intentionally ignored.
        return documentType == DocumentType.MIME_MESSAGE || documentType == DocumentType.EMAIL;
    }

    @Override
    public ExtractionResult extract(ExtractionRequest extractionRequest) {
        String body = resolveBody(extractionRequest);
        Map<String, Object> attributes = buildAttributes(extractionRequest);
        if (!StringUtils.hasText(body)) {
            // No usable text: still emit the structured payload, but flag the missing body.
            return new ExtractionResult(Map.of(), List.of(new ExtractedStructuredPayload("mail-message", attributes)),
                    List.of("Mail message did not contain extractable text body"));
        }
        return new ExtractionResult(
                Map.of(ContentRole.NORMALIZED_TEXT, body),
                List.of(new ExtractedStructuredPayload("mail-message", attributes)),
                List.of()
        );
    }

    /**
     * Prefers the pre-extracted text content; falls back to decoding the raw bytes as UTF-8.
     * Line endings are normalized to {@code \n} and surrounding whitespace is trimmed.
     */
    private String resolveBody(ExtractionRequest extractionRequest) {
        String body = extractionRequest.textContent();
        if (!StringUtils.hasText(body) && extractionRequest.binaryContent() != null) {
            body = new String(extractionRequest.binaryContent(), StandardCharsets.UTF_8);
        }
        if (body == null) {
            return null;
        }
        return body.replace("\r\n", "\n").replace('\r', '\n').trim();
    }

    /**
     * Copies the descriptor attributes and derives a "title" attribute from the
     * title/subject attributes, falling back to the file name.
     */
    private Map<String, Object> buildAttributes(ExtractionRequest extractionRequest) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        Map<String, String> sourceAttributes = extractionRequest.sourceDescriptor().attributes();
        if (sourceAttributes != null) {
            attributes.putAll(sourceAttributes);
        }
        String title = DocumentImportSupport.firstNonBlank(sourceAttributes, "title", "subject");
        if (!StringUtils.hasText(title)) {
            title = extractionRequest.sourceDescriptor().fileName();
        }
        if (StringUtils.hasText(title)) {
            attributes.put("title", title);
        }
        return attributes;
    }
}

@ -22,7 +22,7 @@ public class PdfDocumentExtractor implements DocumentExtractor {
@Override
public boolean supports(DocumentType documentType, String mimeType) {
return documentType == DocumentType.PDF || pdfExtractionService.canHandle("dummy.pdf", mimeType);
return documentType == DocumentType.PDF;
}
@Override

@ -0,0 +1,49 @@
package at.procon.dip.extraction.impl;
import at.procon.dip.domain.document.ContentRole;
import at.procon.dip.domain.document.DocumentType;
import at.procon.dip.extraction.spi.DocumentExtractor;
import at.procon.dip.extraction.spi.ExtractedStructuredPayload;
import at.procon.dip.extraction.spi.ExtractionRequest;
import at.procon.dip.extraction.spi.ExtractionResult;
import at.procon.dip.ingestion.util.DocumentImportSupport;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
 * Extractor for {@code TED_PACKAGE} documents: emits the package manifest text as the
 * normalized representation plus a "ted-package-manifest" structured payload.
 *
 * <p>NOTE(review): the {@code @Component} annotation is commented out, so this extractor is
 * currently not registered as a Spring bean — confirm this is intentional.
 */
//@Component
public class TedPackageManifestExtractor implements DocumentExtractor {

    @Override
    public boolean supports(DocumentType documentType, String mimeType) {
        return documentType == DocumentType.TED_PACKAGE;
    }

    @Override
    public ExtractionResult extract(ExtractionRequest extractionRequest) {
        String manifest = extractionRequest.textContent();
        if (!StringUtils.hasText(manifest)) {
            // Synthesize a minimal manifest so downstream steps always see normalized text.
            manifest = "TED package: " + extractionRequest.sourceDescriptor().sourceIdentifier();
        }
        Map<String, Object> attributes = new LinkedHashMap<>();
        Map<String, String> sourceAttributes = extractionRequest.sourceDescriptor().attributes();
        if (sourceAttributes != null) {
            attributes.putAll(sourceAttributes);
        }
        // Derive a display title: explicit title, then package id, then the file name.
        String title = DocumentImportSupport.firstNonBlank(sourceAttributes, "title", "packageId");
        if (!StringUtils.hasText(title)) {
            title = extractionRequest.sourceDescriptor().fileName();
        }
        if (StringUtils.hasText(title)) {
            attributes.put("title", title);
        }
        return new ExtractionResult(
                Map.of(ContentRole.NORMALIZED_TEXT, manifest),
                List.of(new ExtractedStructuredPayload("ted-package-manifest", attributes)),
                List.of()
        );
    }
}

@ -0,0 +1,170 @@
package at.procon.dip.ingestion.adapter;
import at.procon.dip.domain.access.DocumentAccessContext;
import at.procon.dip.domain.access.DocumentVisibility;
import at.procon.dip.domain.document.RelationType;
import at.procon.dip.domain.document.SourceType;
import at.procon.dip.domain.tenant.TenantRef;
import at.procon.dip.domain.document.service.DocumentRelationService;
import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand;
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
import at.procon.dip.ingestion.service.GenericDocumentImportService;
import at.procon.dip.ingestion.service.MailMessageExtractionService;
import at.procon.dip.ingestion.service.MailMessageExtractionService.MailAttachment;
import at.procon.dip.ingestion.service.MailMessageExtractionService.ParsedMailMessage;
import at.procon.dip.ingestion.spi.DocumentIngestionAdapter;
import at.procon.dip.ingestion.spi.IngestionResult;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.dip.ingestion.util.DocumentImportSupport;
import at.procon.ted.config.TedProcessorProperties;
import at.procon.ted.service.attachment.AttachmentExtractor;
import at.procon.ted.service.attachment.ZipExtractionService;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Ingestion adapter for {@code SourceType.MAIL}: imports a raw MIME message as the root DOC
 * document and each attachment as a child DOC document. Direct attachments are linked via
 * ATTACHMENT_OF; members of expanded ZIP attachments via EXTRACTED_FROM. ZIP expansion is
 * recursive and feature-flagged.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter {
private final TedProcessorProperties properties;
private final GenericDocumentImportService importService;
private final MailMessageExtractionService mailExtractionService;
private final DocumentRelationService relationService;
private final ZipExtractionService zipExtractionService;
/**
 * Handles MAIL sources only, and only while both the generic ingestion pipeline and the
 * mail adapter feature flag are enabled.
 */
@Override
public boolean supports(SourceDescriptor sourceDescriptor) {
return sourceDescriptor.sourceType() == SourceType.MAIL
&& properties.getGenericIngestion().isEnabled()
&& properties.getGenericIngestion().isMailAdapterEnabled();
}
/**
 * Imports the MIME message as the root document, then each attachment as a child.
 *
 * @param sourceDescriptor must carry the raw MIME bytes in {@code binaryContent()}
 * @return the imported root plus all child documents with accumulated warnings
 * @throws IllegalArgumentException when no MIME bytes are supplied
 */
@Override
public IngestionResult ingest(SourceDescriptor sourceDescriptor) {
byte[] rawMime = sourceDescriptor.binaryContent();
if (rawMime == null || rawMime.length == 0) {
throw new IllegalArgumentException("Mail adapter requires raw MIME bytes");
}
ParsedMailMessage parsed = mailExtractionService.parse(rawMime);
// No access context on the descriptor -> fall back to the configured mail defaults.
DocumentAccessContext accessContext = sourceDescriptor.accessContext() == null ? defaultMailAccessContext() : sourceDescriptor.accessContext();
Map<String, String> rootAttributes = new LinkedHashMap<>(sourceDescriptor.attributes() == null ? Map.of() : sourceDescriptor.attributes());
if (parsed.subject() != null) rootAttributes.put("subject", parsed.subject());
if (parsed.from() != null) rootAttributes.put("from", parsed.from());
if (!parsed.recipients().isEmpty()) rootAttributes.put("to", String.join(", ", parsed.recipients()));
// Caller-supplied title wins (putIfAbsent); otherwise subject, then file name.
rootAttributes.putIfAbsent("title", parsed.subject() != null ? parsed.subject() : sourceDescriptor.fileName());
rootAttributes.put("attachmentCount", Integer.toString(parsed.attachments().size()));
rootAttributes.put("importBatchId", properties.getGenericIngestion().getMailImportBatchId());
// Root MIME message: original bytes are always stored (STORE policy).
ImportedDocumentResult rootResult = importService.importDocument(new SourceDescriptor(
accessContext,
SourceType.MAIL,
sourceDescriptor.sourceIdentifier(),
sourceDescriptor.sourceUri(),
sourceDescriptor.fileName() != null ? sourceDescriptor.fileName() : fallbackMailFileName(parsed),
"message/rfc822",
rawMime,
mailExtractionService.serializeMessage(parsed),
parsed.receivedAt() == null ? OffsetDateTime.now() : parsed.receivedAt(),
OriginalContentStoragePolicy.STORE,
rootAttributes
));
List<at.procon.dip.domain.document.CanonicalDocumentMetadata> documents = new ArrayList<>();
List<String> warnings = new ArrayList<>(rootResult.warnings());
documents.add(rootResult.document().toCanonicalMetadata());
int sortOrder = 0;
// Import attachments in message order; sortOrder is 1-based, depth 0 = direct attachment.
for (MailAttachment attachment : parsed.attachments()) {
importAttachment(rootResult.document().getId(), accessContext, sourceDescriptor, attachment, documents, warnings, ++sortOrder, 0);
}
return new IngestionResult(documents, warnings);
}
/**
 * Imports one attachment as a child document, links it to its parent, and (when the ZIP
 * feature flag is on and the attachment is an expandable archive) recurses into its members.
 * Expandable wrappers are imported with SKIP so the archive bytes are not stored twice.
 *
 * NOTE(review): the child source identifier embeds {@code depth}, not the sibling index, so
 * two same-named siblings at the same depth produce identical identifiers — confirm the
 * content-hash dedup in the import service makes this safe.
 */
private void importAttachment(java.util.UUID parentDocumentId, DocumentAccessContext accessContext, SourceDescriptor parentSource,
MailAttachment attachment, List<at.procon.dip.domain.document.CanonicalDocumentMetadata> documents,
List<String> warnings, int sortOrder, int depth) {
boolean expandableWrapper = properties.getGenericIngestion().isExpandMailZipAttachments()
&& zipExtractionService.canHandle(attachment.fileName(), attachment.contentType());
Map<String, String> attachmentAttributes = new LinkedHashMap<>();
attachmentAttributes.put("title", attachment.fileName());
attachmentAttributes.put("mailSourceIdentifier", parentSource.sourceIdentifier());
attachmentAttributes.put("importBatchId", properties.getGenericIngestion().getMailImportBatchId());
if (expandableWrapper) {
attachmentAttributes.put("wrapperDocument", Boolean.TRUE.toString());
}
ImportedDocumentResult attachmentResult = importService.importDocument(new SourceDescriptor(
accessContext,
SourceType.MAIL,
parentSource.sourceIdentifier() + ":attachment:" + depth + ":" + attachment.fileName(),
parentSource.sourceUri(),
attachment.fileName(),
DocumentImportSupport.normalizeMediaType(attachment.contentType()),
attachment.data(),
previewTextIfLikelyText(attachment),
parentSource.receivedAt() == null ? OffsetDateTime.now() : parentSource.receivedAt(),
expandableWrapper ? OriginalContentStoragePolicy.SKIP : OriginalContentStoragePolicy.STORE,
attachmentAttributes
));
documents.add(attachmentResult.document().toCanonicalMetadata());
warnings.addAll(attachmentResult.warnings());
// Archive members (depth > 0 or an in-archive path) are EXTRACTED_FROM; direct attachments ATTACHMENT_OF.
RelationType relationType = depth > 0 || attachment.path() != null ? RelationType.EXTRACTED_FROM : RelationType.ATTACHMENT_OF;
relationService.ensureRelation(new CreateDocumentRelationCommand(
parentDocumentId, attachmentResult.document().getId(), relationType, sortOrder, attachment.fileName()));
if (expandableWrapper) {
AttachmentExtractor.ExtractionResult zipResult = zipExtractionService.extract(attachment.data(), attachment.fileName(), attachment.contentType());
if (!zipResult.success()) {
// Best-effort: a broken archive becomes a warning, not a failed ingestion.
warnings.add("ZIP attachment extraction failed for " + attachment.fileName() + ": " + zipResult.errorMessage());
return;
}
int childSort = 0;
// Recurse with the wrapper document as the new parent.
for (AttachmentExtractor.ChildAttachment child : zipResult.childAttachments()) {
importAttachment(attachmentResult.document().getId(), accessContext, parentSource,
new MailAttachment(child.filename(), child.contentType(), child.data(), child.data().length, child.pathInArchive()),
documents, warnings, ++childSort, depth + 1);
}
}
}
/** Builds a safe ".eml" file name from the subject when the descriptor has no file name. */
private String fallbackMailFileName(ParsedMailMessage parsed) {
String subject = parsed.subject() == null || parsed.subject().isBlank() ? "mail-message" : parsed.subject().replaceAll("[^A-Za-z0-9._-]", "_");
return subject + ".eml";
}
/**
 * Resolves the configured default mail access context: mail-specific tenant key first,
 * then the generic default. A TENANT visibility without a resolvable tenant is downgraded
 * to RESTRICTED so no tenant-less document ends up tenant-visible.
 */
private DocumentAccessContext defaultMailAccessContext() {
String tenantKey = properties.getGenericIngestion().getMailDefaultOwnerTenantKey();
if (tenantKey == null || tenantKey.isBlank()) {
tenantKey = properties.getGenericIngestion().getDefaultOwnerTenantKey();
}
DocumentVisibility visibility = properties.getGenericIngestion().getMailDefaultVisibility();
TenantRef tenant = (tenantKey == null || tenantKey.isBlank()) ? null : new TenantRef(null, tenantKey, tenantKey);
if (tenant == null && visibility == DocumentVisibility.TENANT) {
visibility = DocumentVisibility.RESTRICTED;
}
return new DocumentAccessContext(tenant, visibility);
}
/**
 * Returns a text preview for attachments that look textual (by mime type or by a small
 * allow-list of file extensions); null for binary attachments.
 */
private String previewTextIfLikelyText(MailAttachment attachment) {
String mime = DocumentImportSupport.normalizeMediaType(attachment.contentType());
if (DocumentImportSupport.isLikelyTextMime(mime)) {
return attachment.safeTextPreview();
}
String ext = DocumentImportSupport.extensionOf(attachment.fileName());
if ("txt".equals(ext) || "xml".equals(ext) || "html".equals(ext) || "htm".equals(ext) || "md".equals(ext)) {
return attachment.safeTextPreview();
}
return null;
}
}

@ -0,0 +1,130 @@
package at.procon.dip.ingestion.adapter;
import at.procon.dip.domain.access.DocumentAccessContext;
import at.procon.dip.domain.document.RelationType;
import at.procon.dip.domain.document.SourceType;
import at.procon.dip.domain.document.entity.Document;
import at.procon.dip.domain.document.service.DocumentRelationService;
import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand;
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
import at.procon.dip.ingestion.service.GenericDocumentImportService;
import at.procon.dip.ingestion.service.TedPackageExpansionService;
import at.procon.dip.ingestion.spi.DocumentIngestionAdapter;
import at.procon.dip.ingestion.spi.IngestionResult;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.ted.config.TedProcessorProperties;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
 * Ingestion adapter for {@code SourceType.TED_PACKAGE}: imports the tar.gz package artifact
 * as a wrapper root document (original bytes skipped) and every contained XML entry as a
 * PACKAGE_CHILD document linked to the root via EXTRACTED_FROM.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class TedPackageDocumentIngestionAdapter implements DocumentIngestionAdapter {
private final TedProcessorProperties properties;
private final GenericDocumentImportService importService;
private final TedPackageExpansionService expansionService;
private final DocumentRelationService relationService;
/**
 * Handles TED_PACKAGE sources only, and only while both the generic ingestion pipeline and
 * the TED package adapter feature flag are enabled.
 */
@Override
public boolean supports(SourceDescriptor sourceDescriptor) {
return sourceDescriptor.sourceType() == SourceType.TED_PACKAGE
&& properties.getGenericIngestion().isEnabled()
&& properties.getGenericIngestion().isTedPackageAdapterEnabled();
}
/**
 * Expands the package and imports the root plus one child document per XML entry.
 *
 * @param sourceDescriptor must carry the tar.gz bytes in {@code binaryContent()}
 * @return root and child documents with accumulated warnings (including dedup notices)
 * @throws IllegalArgumentException when no package bytes are supplied
 */
@Override
public IngestionResult ingest(SourceDescriptor sourceDescriptor) {
byte[] packageBytes = sourceDescriptor.binaryContent();
if (packageBytes == null || packageBytes.length == 0) {
throw new IllegalArgumentException("TED package adapter requires tar.gz bytes");
}
TedPackageExpansionService.TedPackageExpansionResult expanded = expansionService.expand(packageBytes);
Map<String, String> rootAttributes = new LinkedHashMap<>(sourceDescriptor.attributes() == null ? Map.of() : sourceDescriptor.attributes());
// Caller-supplied values win (putIfAbsent) for packageId and title.
rootAttributes.putIfAbsent("packageId", sourceDescriptor.sourceIdentifier());
rootAttributes.putIfAbsent("title", sourceDescriptor.fileName() != null ? sourceDescriptor.fileName() : sourceDescriptor.sourceIdentifier());
rootAttributes.put("xmlEntryCount", Integer.toString(expanded.entries().size()));
rootAttributes.put("wrapperDocument", Boolean.TRUE.toString());
rootAttributes.put("importBatchId", properties.getGenericIngestion().getTedPackageImportBatchId());
// Root package document: TED data is public by default; original archive bytes are skipped
// (SKIP) because each member is persisted separately below.
ImportedDocumentResult packageDocument = importService.importDocument(new SourceDescriptor(
sourceDescriptor.accessContext() == null ? DocumentAccessContext.publicDocument() : sourceDescriptor.accessContext(),
SourceType.TED_PACKAGE,
sourceDescriptor.sourceIdentifier(),
sourceDescriptor.sourceUri(),
sourceDescriptor.fileName(),
sourceDescriptor.mediaType() == null ? "application/gzip" : sourceDescriptor.mediaType(),
packageBytes,
expanded.manifestText(),
sourceDescriptor.receivedAt() == null ? OffsetDateTime.now() : sourceDescriptor.receivedAt(),
OriginalContentStoragePolicy.SKIP,
rootAttributes
));
List<String> warnings = new ArrayList<>(packageDocument.warnings());
List<at.procon.dip.domain.document.CanonicalDocumentMetadata> documents = new ArrayList<>();
documents.add(packageDocument.document().toCanonicalMetadata());
int sortOrder = 0;
// Import each XML member as a PACKAGE_CHILD document; sortOrder preserves archive order.
for (TedPackageExpansionService.TedPackageEntry entry : expanded.entries()) {
sortOrder++;
String childUri = "tedpkg://" + sourceDescriptor.sourceIdentifier() + "/" + entry.archivePath();
String childIdentifier = sourceDescriptor.sourceIdentifier() + ":" + entry.archivePath();
String xmlContent = resolveXmlContent(entry);
Map<String, String> childAttributes = new LinkedHashMap<>();
// Hint lets downstream classification map the XML child to TED_NOTICE.
childAttributes.put("documentTypeHint", "TED_NOTICE");
childAttributes.put("packageId", sourceDescriptor.sourceIdentifier());
childAttributes.put("archivePath", entry.archivePath());
childAttributes.put("title", entry.fileName());
childAttributes.put("importBatchId", properties.getGenericIngestion().getTedPackageImportBatchId());
ImportedDocumentResult childResult = importService.importDocument(new SourceDescriptor(
sourceDescriptor.accessContext() == null ? DocumentAccessContext.publicDocument() : sourceDescriptor.accessContext(),
SourceType.PACKAGE_CHILD,
childIdentifier,
childUri,
entry.fileName(),
entry.mediaType() == null ? "application/xml" : entry.mediaType(),
entry.data(),
xmlContent,
sourceDescriptor.receivedAt() == null ? OffsetDateTime.now() : sourceDescriptor.receivedAt(),
OriginalContentStoragePolicy.STORE,
childAttributes
));
Document childDocument = childResult.document();
documents.add(childDocument.toCanonicalMetadata());
warnings.addAll(childResult.warnings());
if (childResult.deduplicated()) {
warnings.add("TED XML child already existed and was linked to package: " + entry.archivePath());
}
// Relation creation is idempotent, so re-ingesting the same package is safe.
relationService.ensureRelation(new CreateDocumentRelationCommand(
packageDocument.document().getId(),
childDocument.getId(),
RelationType.EXTRACTED_FROM,
sortOrder,
entry.archivePath()
));
}
return new IngestionResult(documents, warnings);
}
/** Prefers the pre-decoded UTF-8 text of the entry; falls back to decoding the raw bytes. */
private String resolveXmlContent(TedPackageExpansionService.TedPackageEntry entry) {
if (entry.textUtf8() != null && !entry.textUtf8().isBlank()) {
return entry.textUtf8();
}
return new String(entry.data(), StandardCharsets.UTF_8);
}
}

@ -5,6 +5,7 @@ import at.procon.dip.domain.access.DocumentVisibility;
import at.procon.dip.domain.document.SourceType;
import at.procon.dip.domain.tenant.TenantRef;
import at.procon.dip.ingestion.service.DocumentIngestionGateway;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.ted.config.TedProcessorProperties;
import java.nio.file.Files;
@ -72,6 +73,7 @@ public class GenericFileSystemIngestionRoute extends RouteBuilder {
payload,
null,
OffsetDateTime.now(),
OriginalContentStoragePolicy.DEFAULT,
attributes
);
ingestionGateway.ingest(descriptor);

@ -9,6 +9,7 @@ import at.procon.dip.ingestion.dto.GenericImportResponse;
import at.procon.dip.ingestion.dto.GenericTextImportRequest;
import at.procon.dip.ingestion.service.DocumentIngestionGateway;
import at.procon.dip.ingestion.spi.IngestionResult;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.ted.config.TedProcessorProperties;
import java.time.OffsetDateTime;
@ -62,6 +63,7 @@ public class GenericDocumentImportController {
file.getBytes(),
null,
OffsetDateTime.now(),
OriginalContentStoragePolicy.DEFAULT,
attributes
);
IngestionResult result = ingestionGateway.ingest(descriptor);
@ -89,6 +91,7 @@ public class GenericDocumentImportController {
request.text() == null ? null : request.text().getBytes(java.nio.charset.StandardCharsets.UTF_8),
request.text(),
OffsetDateTime.now(),
OriginalContentStoragePolicy.DEFAULT,
attributes
);
IngestionResult result = ingestionGateway.ingest(descriptor);

@ -4,15 +4,12 @@ import at.procon.dip.classification.service.DocumentClassificationService;
import at.procon.dip.classification.spi.DetectionResult;
import at.procon.dip.domain.access.DocumentAccessContext;
import at.procon.dip.domain.access.DocumentVisibility;
import at.procon.dip.domain.document.CanonicalDocumentMetadata;
import at.procon.dip.domain.document.ContentRole;
import at.procon.dip.domain.document.DocumentStatus;
import at.procon.dip.domain.document.StorageType;
import at.procon.dip.domain.document.entity.Document;
import at.procon.dip.domain.document.entity.DocumentContent;
import at.procon.dip.domain.document.entity.DocumentEmbeddingModel;
import at.procon.dip.domain.document.entity.DocumentSource;
import at.procon.dip.domain.document.repository.DocumentEmbeddingRepository;
import at.procon.dip.domain.document.repository.DocumentRepository;
import at.procon.dip.domain.document.repository.DocumentSourceRepository;
import at.procon.dip.domain.document.service.DocumentContentService;
@ -29,11 +26,15 @@ import at.procon.dip.extraction.service.DocumentExtractionService;
import at.procon.dip.extraction.spi.ExtractionRequest;
import at.procon.dip.extraction.spi.ExtractionResult;
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.dip.ingestion.util.DocumentImportSupport;
import at.procon.dip.normalization.service.TextRepresentationBuildService;
import at.procon.dip.normalization.spi.RepresentationBuildRequest;
import at.procon.dip.normalization.spi.TextRepresentationDraft;
import at.procon.dip.processing.service.StructuredDocumentProcessingService;
import at.procon.dip.processing.spi.DocumentProcessingPolicy;
import at.procon.dip.processing.spi.StructuredProcessingRequest;
import at.procon.ted.config.TedProcessorProperties;
import at.procon.ted.util.HashUtils;
import java.nio.charset.StandardCharsets;
@ -43,7 +44,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -51,7 +51,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
/**
* Phase 4 generic import pipeline that persists arbitrary document types into the DOC model.
* Generic import pipeline that persists arbitrary document types into the DOC model.
*/
@Service
@RequiredArgsConstructor
@ -61,7 +61,6 @@ public class GenericDocumentImportService {
private final TedProcessorProperties properties;
private final DocumentRepository documentRepository;
private final DocumentSourceRepository documentSourceRepository;
private final DocumentEmbeddingRepository documentEmbeddingRepository;
private final DocumentService documentService;
private final DocumentSourceService documentSourceService;
private final DocumentContentService documentContentService;
@ -70,6 +69,7 @@ public class GenericDocumentImportService {
private final DocumentClassificationService classificationService;
private final DocumentExtractionService extractionService;
private final TextRepresentationBuildService representationBuildService;
private final StructuredDocumentProcessingService structuredProcessingService;
@Transactional
public ImportedDocumentResult importDocument(SourceDescriptor sourceDescriptor) {
@ -77,20 +77,20 @@ public class GenericDocumentImportService {
DetectionResult detection = classificationService.detect(withResolvedMediaType(sourceDescriptor, payload));
String dedupHash = HashUtils.computeSha256(payload.binaryContent());
DocumentAccessContext accessContext = sourceDescriptor.accessContext() == null
? defaultAccessContext()
: sourceDescriptor.accessContext();
if (properties.getGenericIngestion().isDeduplicateByContentHash()) {
Optional<Document> existing = documentRepository.findByDedupHash(dedupHash);
Optional<Document> existing = resolveDeduplicatedDocument(dedupHash, accessContext);
if (existing.isPresent()) {
Document document = existing.get();
ensureSource(document, sourceDescriptor);
List<String> warnings = List.of("Document content hash already imported; linked new source to existing document");
List<String> warnings = List.of("Document content hash already imported within the same access scope; linked new source to existing document");
return new ImportedDocumentResult(document, detection, warnings, true);
}
}
DocumentAccessContext accessContext = sourceDescriptor.accessContext() == null
? defaultAccessContext()
: sourceDescriptor.accessContext();
Document document = documentService.create(new CreateDocumentCommand(
accessContext.ownerTenant() == null ? null : accessContext.ownerTenant().tenantKey(),
accessContext.visibility(),
@ -108,21 +108,63 @@ public class GenericDocumentImportService {
ensureSource(document, sourceDescriptor);
documentService.updateStatus(document.getId(), DocumentStatus.CLASSIFIED);
DocumentContent originalContent = persistOriginalContent(document, sourceDescriptor, detection, payload, dedupHash);
boolean persistOriginalContent = shouldPersistOriginalContent(sourceDescriptor);
DocumentContent originalContent = persistOriginalContent
? persistOriginalContent(document, sourceDescriptor, detection, payload, dedupHash)
: null;
List<String> warnings = new ArrayList<>();
DocumentProcessingPolicy processingPolicy = structuredProcessingService.resolvePolicy(sourceDescriptor, detection);
ExtractionResult extractionResult = emptyExtractionResult();
Map<ContentRole, DocumentContent> persistedDerivedContent = new LinkedHashMap<>();
ExtractionResult extractionResult = extractionService.extract(new ExtractionRequest(
if (persistOriginalContent) {
if (processingPolicy.runGenericExtraction()) {
extractionResult = extractionService.extract(new ExtractionRequest(
sourceDescriptor,
detection,
payload.textContent(),
payload.binaryContent()
));
List<String> warnings = new ArrayList<>(extractionResult.warnings());
warnings.addAll(extractionResult.warnings());
if (processingPolicy.persistExtractedContent()) {
persistedDerivedContent.putAll(persistDerivedContent(document, detection, extractionResult, dedupHash, "generic"));
}
if (!extractionResult.derivedTextByRole().isEmpty()) {
documentService.updateStatus(document.getId(), DocumentStatus.EXTRACTED);
}
}
Map<ContentRole, DocumentContent> persistedDerivedContent = persistDerivedContent(document, detection, extractionResult, dedupHash);
if (processingPolicy.invokeStructuredProcessor()) {
Optional<ExtractionResult> structuredExtractionResult = structuredProcessingService.process(new StructuredProcessingRequest(
document,
originalContent,
sourceDescriptor,
detection,
payload.binaryContent(),
payload.textContent(),
dedupHash
));
if (structuredExtractionResult.isPresent()) {
ExtractionResult result = structuredExtractionResult.get();
warnings.addAll(result.warnings());
extractionResult = mergeExtractionResults(extractionResult, result);
if (processingPolicy.persistExtractedContent()) {
persistedDerivedContent.putAll(persistDerivedContent(document, detection, result, dedupHash, "structured"));
}
if (!result.derivedTextByRole().isEmpty()) {
documentService.updateStatus(document.getId(), DocumentStatus.EXTRACTED);
}
}
}
if (processingPolicy.runRepresentationBuilders()) {
var drafts = representationBuildService.build(new RepresentationBuildRequest(sourceDescriptor, detection, extractionResult));
persistRepresentationsAndEmbeddings(document, originalContent, persistedDerivedContent, drafts);
}
} else {
warnings.add("Original content storage disabled for this document; skipped extraction and text-representation processing");
}
Document reloaded = documentService.getRequired(document.getId());
if (reloaded.getStatus() == DocumentStatus.EXTRACTED) {
@ -130,7 +172,7 @@ public class GenericDocumentImportService {
reloaded = documentService.getRequired(reloaded.getId());
}
if (!extractionResult.structuredPayloads().isEmpty()) {
if (processingPolicy.applyStructuredTitleIfMissing() && !extractionResult.structuredPayloads().isEmpty()) {
applyStructuredTitleIfMissing(reloaded, extractionResult);
reloaded = documentService.getRequired(reloaded.getId());
}
@ -138,6 +180,50 @@ public class GenericDocumentImportService {
return new ImportedDocumentResult(reloaded, detection, warnings, false);
}
/**
 * Combines the generic and structured extraction outputs into one result.
 * For duplicate {@link ContentRole} keys the right-hand (structured) text wins;
 * structured payloads and warnings are concatenated in left-then-right order.
 * Either argument may be {@code null} and is then treated as empty.
 */
private ExtractionResult mergeExtractionResults(ExtractionResult left, ExtractionResult right) {
    Map<ContentRole, String> mergedText = new LinkedHashMap<>();
    List<at.procon.dip.extraction.spi.ExtractedStructuredPayload> mergedPayloads = new ArrayList<>();
    List<String> mergedWarnings = new ArrayList<>();
    for (ExtractionResult part : new ExtractionResult[] {left, right}) {
        if (part == null) {
            continue;
        }
        if (part.derivedTextByRole() != null) {
            mergedText.putAll(part.derivedTextByRole());
        }
        if (part.structuredPayloads() != null) {
            mergedPayloads.addAll(part.structuredPayloads());
        }
        if (part.warnings() != null) {
            mergedWarnings.addAll(part.warnings());
        }
    }
    return new ExtractionResult(mergedText, mergedPayloads, mergedWarnings);
}
// Canonical "nothing extracted yet" value used before any extraction stage has run.
private ExtractionResult emptyExtractionResult() {
    return new ExtractionResult(Map.of(), List.of(), List.of());
}
/**
 * Finds an already-imported document with the same content hash, restricted to
 * the same access scope (visibility + owner tenant) so payloads from different
 * tenants are never merged into one canonical document.
 */
private Optional<Document> resolveDeduplicatedDocument(String dedupHash, DocumentAccessContext accessContext) {
    for (Document candidate : documentRepository.findAllByDedupHash(dedupHash)) {
        if (sameAccessScope(candidate, accessContext)) {
            return Optional.of(candidate);
        }
    }
    return Optional.empty();
}
// Two documents share an access scope when both visibility and owner-tenant key
// match; a null tenant key (public/unowned) only matches another null key.
private boolean sameAccessScope(Document existing, DocumentAccessContext accessContext) {
    boolean sameVisibility = existing.getVisibility() == accessContext.visibility();
    String existingTenant = existing.getOwnerTenant() != null ? existing.getOwnerTenant().getTenantKey() : null;
    String requestedTenant = accessContext.ownerTenant() != null ? accessContext.ownerTenant().tenantKey() : null;
    return sameVisibility && java.util.Objects.equals(existingTenant, requestedTenant);
}
private SourceDescriptor withResolvedMediaType(SourceDescriptor sourceDescriptor, ResolvedPayload payload) {
if (StringUtils.hasText(sourceDescriptor.mediaType())) {
return sourceDescriptor;
@ -152,6 +238,7 @@ public class GenericDocumentImportService {
sourceDescriptor.binaryContent(),
sourceDescriptor.textContent(),
sourceDescriptor.receivedAt(),
sourceDescriptor.originalContentStoragePolicy(),
sourceDescriptor.attributes()
);
}
@ -216,7 +303,7 @@ public class GenericDocumentImportService {
return sourceDescriptor.fileName();
}
if (StringUtils.hasText(payload.textContent())) {
for (String line : payload.textContent().split("\\n")) {
for (String line : payload.textContent().split("\n")) {
if (StringUtils.hasText(line)) {
return DocumentImportSupport.ellipsize(line.trim(), 240);
}
@ -244,6 +331,10 @@ public class GenericDocumentImportService {
return;
}
String importBatchId = sourceDescriptor.attributes() != null && StringUtils.hasText(sourceDescriptor.attributes().get("importBatchId"))
? sourceDescriptor.attributes().get("importBatchId")
: properties.getGenericIngestion().getImportBatchId();
documentSourceService.addSource(new AddDocumentSourceCommand(
document.getId(),
sourceDescriptor.sourceType(),
@ -251,11 +342,35 @@ public class GenericDocumentImportService {
sourceDescriptor.sourceUri(),
sourceDescriptor.fileName(),
null,
properties.getGenericIngestion().getImportBatchId(),
importBatchId,
sourceDescriptor.receivedAt() == null ? OffsetDateTime.now() : sourceDescriptor.receivedAt()
));
}
/**
 * Decides whether the ORIGINAL raw payload is stored for this document.
 * An explicit STORE/SKIP policy on the descriptor wins; DEFAULT (or null)
 * defers to configuration and the wrapper-document heuristic.
 */
private boolean shouldPersistOriginalContent(SourceDescriptor sourceDescriptor) {
    OriginalContentStoragePolicy policy = sourceDescriptor.originalContentStoragePolicy();
    if (policy == OriginalContentStoragePolicy.STORE) {
        return true;
    }
    if (policy == OriginalContentStoragePolicy.SKIP) {
        return false;
    }
    boolean alwaysStoreWrappers = properties.getGenericIngestion().isStoreOriginalContentForWrapperDocuments();
    return alwaysStoreWrappers || !isWrapperDocument(sourceDescriptor);
}
// A wrapper/container document (e.g. a mail envelope or an archive) is marked
// by a truthy "wrapperDocument" attribute, with "containerDocument" as a
// legacy fallback key. Boolean.parseBoolean(null) is false, so absent flags
// mean "not a wrapper".
private boolean isWrapperDocument(SourceDescriptor sourceDescriptor) {
    Map<String, String> attributes = sourceDescriptor.attributes();
    if (attributes == null || attributes.isEmpty()) {
        return false;
    }
    String flag = attributes.get("wrapperDocument");
    String effective = flag != null ? flag : attributes.get("containerDocument");
    return Boolean.parseBoolean(effective);
}
private DocumentContent persistOriginalContent(Document document,
SourceDescriptor sourceDescriptor,
DetectionResult detection,
@ -287,13 +402,14 @@ public class GenericDocumentImportService {
private Map<ContentRole, DocumentContent> persistDerivedContent(Document document,
DetectionResult detection,
ExtractionResult extractionResult,
String baseHash) {
String baseHash,
String hashNamespace) {
Map<ContentRole, DocumentContent> result = new LinkedHashMap<>();
extractionResult.derivedTextByRole().forEach((role, text) -> {
if (!StringUtils.hasText(text)) {
return;
}
String contentHash = HashUtils.computeSha256(baseHash + ":" + role.name() + ":" + text);
String contentHash = HashUtils.computeSha256(baseHash + ":" + hashNamespace + ":" + role.name() + ":" + text);
DocumentContent content = documentContentService.addContent(new AddDocumentContentCommand(
document.getId(),
role,
@ -336,16 +452,13 @@ public class GenericDocumentImportService {
if (!StringUtils.hasText(draft.textBody())) {
continue;
}
DocumentContent linkedContent = switch (draft.representationType()) {
case FULLTEXT, SEMANTIC_TEXT, SUMMARY, TITLE_ABSTRACT, METADATA_ENRICHED, CHUNK ->
derivedContent.getOrDefault(ContentRole.NORMALIZED_TEXT, originalContent);
};
DocumentContent linkedContent = resolveLinkedContent(originalContent, derivedContent, draft);
var representation = documentRepresentationService.addRepresentation(new AddDocumentTextRepresentationCommand(
document.getId(),
linkedContent == null ? null : linkedContent.getId(),
draft.representationType(),
"phase4-generic-builder",
draft.builderKey() == null ? "phase4-generic-builder" : draft.builderKey(),
draft.languageCode(),
null,
draft.chunkIndex(),
@ -362,7 +475,23 @@ public class GenericDocumentImportService {
documentService.updateStatus(document.getId(), DocumentStatus.REPRESENTED);
}
/**
 * Resolves which stored content row a representation draft should link to.
 * A draft without an explicit source role defaults to NORMALIZED_TEXT;
 * ORIGINAL always maps to the original content; any other role falls back to
 * the original content when no derived row exists for it.
 */
private DocumentContent resolveLinkedContent(DocumentContent originalContent,
                                             Map<ContentRole, DocumentContent> derivedContent,
                                             TextRepresentationDraft draft) {
    ContentRole role = draft.sourceContentRole() == null ? ContentRole.NORMALIZED_TEXT : draft.sourceContentRole();
    return role == ContentRole.ORIGINAL
            ? originalContent
            : derivedContent.getOrDefault(role, originalContent);
}
// A draft is queued for embedding unless it explicitly opts out
// (queueForEmbedding == Boolean.FALSE, where null means "default: yes").
// When "vectorize primary only" is configured, only primary drafts qualify.
private boolean shouldQueueEmbedding(TextRepresentationDraft draft) {
    if (Boolean.FALSE.equals(draft.queueForEmbedding())) {
        return false;
    }
    if (!properties.getGenericIngestion().isVectorizePrimaryRepresentationOnly()) {
        return true;
    }
    return draft.primary();
}

@ -0,0 +1,122 @@
package at.procon.dip.ingestion.service;
import jakarta.mail.BodyPart;
import jakarta.mail.Multipart;
import jakarta.mail.Part;
import jakarta.mail.Session;
import jakarta.mail.internet.MimeMessage;
import jakarta.mail.internet.MimeUtility;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.jsoup.Jsoup;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class MailMessageExtractionService {

    /**
     * Parses a raw RFC 822 / MIME message into subject, sender, recipients,
     * received timestamp, plain-text and HTML bodies, and attachments.
     *
     * @param rawMime the raw MIME bytes of the message
     * @return the parsed message; when no text/plain part exists, the text body
     *         is derived from the HTML part
     * @throws IllegalArgumentException if the bytes cannot be parsed as a MIME message
     */
    public ParsedMailMessage parse(byte[] rawMime) {
        try {
            // getInstance (not getDefaultInstance) so we never install these empty
            // Properties as the JVM-wide cached default mail session.
            Session session = Session.getInstance(new Properties());
            MimeMessage message = new MimeMessage(session, new ByteArrayInputStream(rawMime));
            String subject = message.getSubject();
            String from = message.getFrom() != null && message.getFrom().length > 0 ? message.getFrom()[0].toString() : null;
            List<String> recipients = new ArrayList<>();
            if (message.getAllRecipients() != null) {
                for (var recipient : message.getAllRecipients()) {
                    recipients.add(recipient.toString());
                }
            }
            StringBuilder text = new StringBuilder();
            StringBuilder html = new StringBuilder();
            List<MailAttachment> attachments = new ArrayList<>();
            processPart(message, text, html, attachments);
            // Prefer the plain-text part; fall back to a text rendering of the HTML part.
            String normalizedText = text.length() > 0 ? text.toString().trim() : htmlToText(html.toString());
            OffsetDateTime receivedAt = message.getReceivedDate() == null ? OffsetDateTime.now()
                    : message.getReceivedDate().toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime();
            return new ParsedMailMessage(subject, from, recipients, receivedAt, normalizedText, html.toString(), attachments);
        } catch (Exception e) {
            throw new IllegalArgumentException("Failed to parse MIME message", e);
        }
    }

    // Recursively walks the MIME part tree, appending body text/HTML and collecting attachments.
    private void processPart(Part part, StringBuilder text, StringBuilder html, List<MailAttachment> attachments) throws Exception {
        String disposition = part.getDisposition();
        String contentType = part.getContentType() == null ? "application/octet-stream" : part.getContentType();
        // Any named part with an attachment/inline disposition is treated as an
        // attachment regardless of its content type.
        if (disposition != null && (Part.ATTACHMENT.equalsIgnoreCase(disposition) || Part.INLINE.equalsIgnoreCase(disposition))
                && part.getFileName() != null) {
            attachments.add(extractAttachment(part));
            return;
        }
        Object content = part.getContent();
        if (content instanceof Multipart multipart) {
            for (int i = 0; i < multipart.getCount(); i++) {
                BodyPart bodyPart = multipart.getBodyPart(i);
                processPart(bodyPart, text, html, attachments);
            }
        } else if (contentType.toLowerCase().contains("text/plain")) {
            text.append(content.toString()).append("\n");
        } else if (contentType.toLowerCase().contains("text/html")) {
            html.append(content.toString()).append("\n");
        } else if (part.getFileName() != null) {
            // Non-text leaf with a file name but no disposition: still an attachment.
            attachments.add(extractAttachment(part));
        }
    }

    // Reads one MIME part into a MailAttachment, decoding RFC 2047 encoded file names.
    private MailAttachment extractAttachment(Part part) throws Exception {
        String fileName = part.getFileName();
        if (fileName == null) {
            fileName = "attachment";
        }
        try {
            fileName = MimeUtility.decodeText(fileName);
        } catch (Exception ignored) {
            // Keep the raw (possibly still encoded) file name when decoding fails.
        }
        String contentType = part.getContentType();
        byte[] data;
        try (InputStream in = part.getInputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            in.transferTo(out);
            data = out.toByteArray();
        }
        return new MailAttachment(fileName, contentType, data, data.length, null);
    }

    // Converts an HTML body to plain text. BUGFIX: the previous pattern "\s+"
    // used the Java 15 *string escape* \s (a single space), not the regex
    // whitespace class, so tabs and newlines were never collapsed; "\\s+" is
    // the intended regex.
    private String htmlToText(String html) {
        if (html == null || html.isBlank()) {
            return "";
        }
        try {
            return Jsoup.parse(html).text().replaceAll("\\s+", " ").trim();
        } catch (Exception e) {
            log.debug("Falling back to naive HTML cleanup: {}", e.getMessage());
            return html.replaceAll("<[^>]+>", " ").replaceAll("\\s+", " ").trim();
        }
    }

    /** Renders the parsed message back into a simple "header block + body" text form. */
    public String serializeMessage(ParsedMailMessage parsed) {
        StringBuilder sb = new StringBuilder();
        if (parsed.subject() != null) sb.append("Subject: ").append(parsed.subject()).append("\n");
        if (parsed.from() != null) sb.append("From: ").append(parsed.from()).append("\n");
        if (!parsed.recipients().isEmpty()) sb.append("To: ").append(String.join(", ", parsed.recipients())).append("\n");
        sb.append("\n");
        if (parsed.textBody() != null) sb.append(parsed.textBody());
        return sb.toString().trim();
    }

    /** Parsed view of one MIME message. */
    public record ParsedMailMessage(String subject, String from, List<String> recipients, OffsetDateTime receivedAt,
                                    String textBody, String htmlBody, List<MailAttachment> attachments) {}

    /** One extracted attachment; {@code path} is an optional archive-internal path. */
    public record MailAttachment(String fileName, String contentType, byte[] data, long sizeBytes, String path) {
        // NOTE(review): assumes the bytes are UTF-8 text — only meaningful for
        // parts known to be textual; verify at call sites.
        public String safeTextPreview() {
            return new String(data, StandardCharsets.UTF_8);
        }
    }
}

@ -0,0 +1,88 @@
package at.procon.dip.ingestion.service;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class TedPackageExpansionService {

    // Archive-bomb guards: hard caps on entry count and extracted sizes.
    private static final int MAX_FILES = 10000;
    private static final long MAX_SINGLE_FILE_SIZE = 20L * 1024 * 1024;
    private static final long MAX_TOTAL_EXTRACTED_SIZE = 1024L * 1024 * 1024;

    /**
     * Expands a TED {@code .tar.gz} package into its XML notice entries.
     * Directories and non-XML entries are skipped; entry names containing
     * {@code ..} or starting with a path separator are rejected (path
     * traversal). Extraction is bounded by {@link #MAX_FILES},
     * {@link #MAX_SINGLE_FILE_SIZE} and {@link #MAX_TOTAL_EXTRACTED_SIZE}.
     *
     * @param tarGzBytes the raw gzip-compressed tar archive
     * @return the extracted XML entries plus a human-readable manifest
     * @throws IllegalArgumentException if the archive is corrupt or exceeds extraction limits
     */
    public TedPackageExpansionResult expand(byte[] tarGzBytes) {
        List<TedPackageEntry> entries = new ArrayList<>();
        long total = 0;
        try (TarArchiveInputStream tais = new TarArchiveInputStream(
                new GzipCompressorInputStream(new ByteArrayInputStream(tarGzBytes)))) {
            TarArchiveEntry entry;
            while ((entry = tais.getNextTarEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                if (entries.size() >= MAX_FILES) {
                    // Previously the truncation was silent; surface it so operators notice.
                    log.warn("TED package contains more than {} XML files; remaining entries skipped", MAX_FILES);
                    break;
                }
                String entryName = entry.getName();
                if (!entryName.toLowerCase().endsWith(".xml")) {
                    continue;
                }
                if (entryName.contains("..") || entryName.startsWith("/") || entryName.startsWith("\\")) {
                    log.warn("Skipping suspicious TED package entry {}", entryName);
                    continue;
                }
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                byte[] buffer = new byte[8192];
                long fileSize = 0;
                int read;
                while ((read = tais.read(buffer)) > 0) {
                    fileSize += read;
                    total += read;
                    // Enforce limits while streaming so a hostile archive cannot
                    // balloon memory before the check fires.
                    if (fileSize > MAX_SINGLE_FILE_SIZE || total > MAX_TOTAL_EXTRACTED_SIZE) {
                        throw new IOException("TED package extraction limits exceeded");
                    }
                    baos.write(buffer, 0, read);
                }
                byte[] data = baos.toByteArray();
                entries.add(new TedPackageEntry(extractFilename(entryName), entryName, data, data.length, "application/xml"));
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("Failed to expand TED package", e);
        }
        String manifest = buildManifest(entries);
        return new TedPackageExpansionResult(entries, manifest);
    }

    // Builds a short human-readable listing of the extracted entries.
    private String buildManifest(List<TedPackageEntry> entries) {
        StringBuilder sb = new StringBuilder();
        sb.append("TED package contains ").append(entries.size()).append(" XML notice files\n");
        for (TedPackageEntry entry : entries) {
            sb.append("- ").append(entry.archivePath()).append(" (").append(entry.sizeBytes()).append(" bytes)\n");
        }
        return sb.toString().trim();
    }

    // Returns the last path segment; handles both '/' and '\' separators.
    private String extractFilename(String path) {
        int idx = Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\'));
        return idx >= 0 ? path.substring(idx + 1) : path;
    }

    /** All XML entries of one package plus a manifest summary. */
    public record TedPackageExpansionResult(List<TedPackageEntry> entries, String manifestText) {}

    /** One extracted archive member. */
    public record TedPackageEntry(String fileName, String archivePath, byte[] data, long sizeBytes, String mediaType) {
        // TED notice XML files are UTF-8 encoded.
        public String textUtf8() {
            return new String(data, StandardCharsets.UTF_8);
        }
    }
}

@ -0,0 +1,11 @@
package at.procon.dip.ingestion.spi;
/**
 * Controls whether the ORIGINAL raw payload should be persisted for a single imported document.
 * DEFAULT defers to the global ingestion configuration and wrapper-document heuristics.
 */
public enum OriginalContentStoragePolicy {
    /** Defer to configuration plus the wrapper-document heuristic. */
    DEFAULT,
    /** Always persist the raw payload. */
    STORE,
    /** Never persist the raw payload. */
    SKIP
}

@ -21,9 +21,16 @@ public record SourceDescriptor(
byte[] binaryContent,
String textContent,
OffsetDateTime receivedAt,
OriginalContentStoragePolicy originalContentStoragePolicy,
Map<String, String> attributes
) {
public SourceDescriptor {
    // Normalize a null policy to DEFAULT so downstream code can compare against
    // the enum constants without null checks.
    if (originalContentStoragePolicy == null) {
        originalContentStoragePolicy = OriginalContentStoragePolicy.DEFAULT;
    }
}
// True when the descriptor carries the payload bytes inline (as opposed to a
// URI-only reference that must be fetched).
public boolean hasInlineBinaryContent() {
    return binaryContent != null && binaryContent.length > 0;
}

@ -45,7 +45,7 @@ public final class DocumentImportSupport {
public static DocumentFamily familyFor(DocumentType documentType) {
return switch (documentType) {
case TED_NOTICE -> DocumentFamily.PROCUREMENT;
case TED_PACKAGE, TED_NOTICE -> DocumentFamily.PROCUREMENT;
case EMAIL, MIME_MESSAGE -> DocumentFamily.MAIL;
case PDF, DOCX, HTML, XML_GENERIC, TEXT, MARKDOWN, ZIP_ARCHIVE, GENERIC_BINARY, UNKNOWN ->
DocumentFamily.GENERIC;

@ -12,12 +12,16 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
@Order(100)
public class DefaultGenericTextRepresentationBuilder implements TextRepresentationBuilder {
public static final String BUILDER_KEY = "default-generic-text";
@Override
public boolean supports(DocumentType documentType) {
return documentType != DocumentType.TED_NOTICE;
@ -39,25 +43,34 @@ public class DefaultGenericTextRepresentationBuilder implements TextRepresentati
List<TextRepresentationDraft> drafts = new ArrayList<>();
drafts.add(new TextRepresentationDraft(
RepresentationType.FULLTEXT,
BUILDER_KEY,
request.detectionResult().languageCode(),
baseText,
false,
null
null,
ContentRole.NORMALIZED_TEXT,
Boolean.TRUE
));
drafts.add(new TextRepresentationDraft(
RepresentationType.SEMANTIC_TEXT,
BUILDER_KEY,
request.detectionResult().languageCode(),
semantic,
true,
null
null,
ContentRole.NORMALIZED_TEXT,
Boolean.TRUE
));
if (StringUtils.hasText(title)) {
drafts.add(new TextRepresentationDraft(
RepresentationType.TITLE_ABSTRACT,
BUILDER_KEY,
request.detectionResult().languageCode(),
title + "\n\n" + summary,
false,
null
null,
ContentRole.NORMALIZED_TEXT,
Boolean.FALSE
));
}
return drafts;

@ -0,0 +1,138 @@
package at.procon.dip.normalization.impl;
import at.procon.dip.domain.document.ContentRole;
import at.procon.dip.domain.document.DocumentType;
import at.procon.dip.domain.document.RepresentationType;
import at.procon.dip.extraction.spi.ExtractedStructuredPayload;
import at.procon.dip.ingestion.util.DocumentImportSupport;
import at.procon.dip.normalization.spi.RepresentationBuildRequest;
import at.procon.dip.normalization.spi.TextRepresentationBuilder;
import at.procon.dip.normalization.spi.TextRepresentationDraft;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
@Order(10)
public class TedStructuredTextRepresentationBuilder implements TextRepresentationBuilder {

    public static final String BUILDER_KEY = "ted-structured-text";

    /** Applies only to parsed TED notices. */
    @Override
    public boolean supports(DocumentType documentType) {
        return documentType == DocumentType.TED_NOTICE;
    }

    /**
     * Builds a single primary SEMANTIC_TEXT draft that prefixes the normalized
     * notice text with its key structured attributes (title, buyer, CPV/NUTS
     * codes, publication id) so embeddings capture the procurement metadata.
     *
     * @return one primary semantic-text draft, or an empty list when no
     *         normalized text was extracted
     */
    @Override
    public List<TextRepresentationDraft> build(RepresentationBuildRequest request) {
        String normalizedText = request.extractionResult().derivedTextByRole().get(ContentRole.NORMALIZED_TEXT);
        if (!StringUtils.hasText(normalizedText)) {
            return List.of();
        }
        // Attributes of the "ted-notice" structured payload (produced by the TED
        // structured processor); empty map when the payload is absent.
        Map<String, Object> attributes = request.extractionResult().structuredPayloads().stream()
                .filter(payload -> Objects.equals(payload.projectionName(), "ted-notice"))
                .map(ExtractedStructuredPayload::attributes)
                .filter(Objects::nonNull)
                .findFirst()
                .orElse(Map.of());
        String title = asString(attributes.get("title"));
        String description = asString(attributes.get("description"));
        String buyerName = asString(attributes.get("buyerName"));
        String cpvCodes = asString(attributes.get("cpvCodes"));
        String nutsCodes = asString(attributes.get("nutsCodes"));
        String publicationId = asString(attributes.get("publicationId"));
        String semanticText = buildSemanticText(title, description, buyerName, cpvCodes, nutsCodes, publicationId, normalizedText);
        // A large commented-out block producing FULLTEXT/TITLE_ABSTRACT/SUMMARY
        // drafts was removed here; restore it from version control if those
        // representations are reintroduced.
        return List.of(new TextRepresentationDraft(
                RepresentationType.SEMANTIC_TEXT,
                BUILDER_KEY,
                request.detectionResult().languageCode(),
                semanticText,
                true,
                null,
                ContentRole.NORMALIZED_TEXT,
                Boolean.TRUE
        ));
    }

    // Prepends the available structured metadata as labeled lines before the
    // full normalized text; blank/absent fields are skipped.
    private String buildSemanticText(String title,
                                     String description,
                                     String buyerName,
                                     String cpvCodes,
                                     String nutsCodes,
                                     String publicationId,
                                     String normalizedText) {
        StringBuilder sb = new StringBuilder();
        sb.append("Document type: TED_NOTICE\n");
        if (StringUtils.hasText(publicationId)) {
            sb.append("Publication: ").append(publicationId.trim()).append('\n');
        }
        if (StringUtils.hasText(title)) {
            sb.append("Title: ").append(title.trim()).append("\n\n");
        }
        if (StringUtils.hasText(buyerName)) {
            sb.append("Contracting Authority: ").append(buyerName.trim()).append('\n');
        }
        if (StringUtils.hasText(cpvCodes)) {
            sb.append("CPV Codes: ").append(cpvCodes.trim()).append('\n');
        }
        if (StringUtils.hasText(nutsCodes)) {
            sb.append("NUTS Codes: ").append(nutsCodes.trim()).append('\n');
        }
        if (StringUtils.hasText(description)) {
            sb.append("\nDescription: ").append(description.trim()).append("\n\n");
        }
        sb.append(normalizedText.trim());
        return sb.toString().trim();
    }

    // Returns the value only when it is a non-blank String; null otherwise.
    private String asString(Object value) {
        return value instanceof String s && StringUtils.hasText(s) ? s : null;
    }
}

@ -3,8 +3,10 @@ package at.procon.dip.normalization.service;
import at.procon.dip.normalization.spi.RepresentationBuildRequest;
import at.procon.dip.normalization.spi.TextRepresentationBuilder;
import at.procon.dip.normalization.spi.TextRepresentationDraft;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.stereotype.Service;
@Service
@ -14,11 +16,22 @@ public class TextRepresentationBuildService {
private final List<TextRepresentationBuilder> builders;
public List<TextRepresentationDraft> build(RepresentationBuildRequest request) {
return builders.stream()
List<TextRepresentationBuilder> matchingBuilders = builders.stream()
.filter(builder -> builder.supports(request.detectionResult().documentType()))
.findFirst()
.orElseThrow(() -> new IllegalStateException(
"No text representation builder registered for " + request.detectionResult().documentType()))
.build(request);
.sorted(AnnotationAwareOrderComparator.INSTANCE)
.toList();
if (matchingBuilders.isEmpty()) {
throw new IllegalStateException(
"No text representation builder registered for " + request.detectionResult().documentType());
}
List<TextRepresentationDraft> result = new ArrayList<>();
for (TextRepresentationBuilder builder : matchingBuilders) {
List<TextRepresentationDraft> drafts = builder.build(request);
if (drafts != null && !drafts.isEmpty()) {
result.addAll(drafts);
}
}
return result;
}
}

@ -1,5 +1,6 @@
package at.procon.dip.normalization.spi;
import at.procon.dip.domain.document.ContentRole;
import at.procon.dip.domain.document.RepresentationType;
/**
@ -7,9 +8,20 @@ import at.procon.dip.domain.document.RepresentationType;
*/
public record TextRepresentationDraft(
RepresentationType representationType,
String builderKey,
String languageCode,
String textBody,
boolean primary,
Integer chunkIndex
Integer chunkIndex,
ContentRole sourceContentRole,
Boolean queueForEmbedding
) {
public TextRepresentationDraft(RepresentationType representationType,
String languageCode,
String textBody,
boolean primary,
Integer chunkIndex) {
this(representationType, null, languageCode, textBody, primary, chunkIndex, ContentRole.NORMALIZED_TEXT, null);
}
}

@ -0,0 +1,110 @@
package at.procon.dip.processing.impl;
import at.procon.dip.classification.spi.DetectionResult;
import at.procon.dip.domain.document.ContentRole;
import at.procon.dip.domain.document.DocumentFamily;
import at.procon.dip.domain.document.DocumentStatus;
import at.procon.dip.domain.document.DocumentType;
import at.procon.dip.extraction.spi.ExtractedStructuredPayload;
import at.procon.dip.extraction.spi.ExtractionResult;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.dip.processing.spi.DocumentProcessingPolicy;
import at.procon.dip.processing.spi.StructuredDocumentProcessor;
import at.procon.dip.processing.spi.StructuredProcessingRequest;
import at.procon.dip.domain.document.service.DocumentService;
import at.procon.dip.domain.ted.service.TedNoticeProjectionService;
import at.procon.ted.model.entity.ProcurementDocument;
import at.procon.ted.service.XmlParserService;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
@RequiredArgsConstructor
@Slf4j
public class TedStructuredDocumentProcessor implements StructuredDocumentProcessor {

    private final XmlParserService xmlParserService;
    private final DocumentService documentService;
    private final TedNoticeProjectionService tedNoticeProjectionService;

    /** Handles only documents classified as TED notices. */
    @Override
    public boolean supports(SourceDescriptor sourceDescriptor, DetectionResult detectionResult) {
        return detectionResult.documentType() == DocumentType.TED_NOTICE;
    }

    /** TED notices skip generic extraction; this processor supplies the text itself. */
    @Override
    public DocumentProcessingPolicy processingPolicy(SourceDescriptor sourceDescriptor, DetectionResult detectionResult) {
        return DocumentProcessingPolicy.replaceGenericTextProcessing();
    }

    /**
     * Parses the TED notice XML, enriches the canonical document with the
     * notice metadata, registers/refreshes the TED projection, and returns the
     * notice text plus a "ted-notice" structured payload (consumed by the TED
     * text-representation builder).
     *
     * Side effects: mutates and saves the canonical document and writes the
     * TED notice projection — the save happens before projection registration,
     * which references the canonical document id.
     */
    @Override
    public ExtractionResult process(StructuredProcessingRequest request) {
        String xml = request.textContent();
        // Fall back to decoding the raw bytes as UTF-8 when no text payload was provided.
        if (!StringUtils.hasText(xml) && request.binaryContent() != null) {
            xml = new String(request.binaryContent(), java.nio.charset.StandardCharsets.UTF_8);
        }
        if (!StringUtils.hasText(xml)) {
            return new ExtractionResult(Map.of(), List.of(), List.of("TED structured processor received no XML payload"));
        }
        ProcurementDocument tedDocument = xmlParserService.parseDocument(xml);
        // Attach ingestion provenance to the legacy TED entity.
        tedDocument.setDocumentHash(request.dedupHash());
        tedDocument.setXmlDocument(xml);
        tedDocument.setSourceFilename(request.sourceDescriptor().fileName());
        tedDocument.setSourcePath(request.sourceDescriptor().sourceUri());
        tedDocument.setFileSizeBytes(request.binaryContent() == null ? null : (long) request.binaryContent().length);
        // Promote the canonical DOC document to a TED_NOTICE/PROCUREMENT document.
        var canonical = request.document();
        canonical.setDocumentType(DocumentType.TED_NOTICE);
        canonical.setDocumentFamily(DocumentFamily.PROCUREMENT);
        canonical.setStatus(DocumentStatus.CLASSIFIED);
        canonical.setTitle(tedDocument.getProjectTitle());
        canonical.setSummary(tedDocument.getProjectDescription());
        canonical.setLanguageCode(tedDocument.getLanguageCode());
        canonical.setMimeType(request.detectionResult().mimeType() == null ? "application/xml" : request.detectionResult().mimeType());
        // Prefer the publication id as business key; fall back to the notice id.
        if (StringUtils.hasText(tedDocument.getPublicationId())) {
            canonical.setBusinessKey("TED_NOTICE:" + tedDocument.getPublicationId());
        } else if (StringUtils.hasText(tedDocument.getNoticeId())) {
            canonical.setBusinessKey("TED_NOTICE:" + tedDocument.getNoticeId());
        }
        documentService.save(canonical);
        tedNoticeProjectionService.registerOrRefreshProjection(tedDocument, canonical.getId());
        // Structured attributes exposed under the "ted-notice" projection name;
        // blank fields are omitted so consumers can rely on presence == has value.
        Map<String, Object> payload = new LinkedHashMap<>();
        if (StringUtils.hasText(tedDocument.getProjectTitle())) {
            payload.put("title", tedDocument.getProjectTitle());
        }
        if (StringUtils.hasText(tedDocument.getProjectDescription())) {
            payload.put("description", tedDocument.getProjectDescription());
        }
        if (StringUtils.hasText(tedDocument.getBuyerName())) {
            payload.put("buyerName", tedDocument.getBuyerName());
        }
        if (tedDocument.getCpvCodes() != null && tedDocument.getCpvCodes().length > 0) {
            payload.put("cpvCodes", String.join(", ", tedDocument.getCpvCodes()));
        }
        if (tedDocument.getNutsCodes() != null && tedDocument.getNutsCodes().length > 0) {
            payload.put("nutsCodes", String.join(", ", tedDocument.getNutsCodes()));
        }
        payload.put("lotCount", tedDocument.getLots() == null ? 0 : tedDocument.getLots().size());
        payload.put("noticeId", tedDocument.getNoticeId());
        payload.put("publicationId", tedDocument.getPublicationId());
        // Expose the parsed notice text as NORMALIZED_TEXT when present.
        Map<ContentRole, String> derivedText = new LinkedHashMap<>();
        if (StringUtils.hasText(tedDocument.getTextContent())) {
            derivedText.put(ContentRole.NORMALIZED_TEXT, tedDocument.getTextContent());
        }
        return new ExtractionResult(
                derivedText,
                List.of(new ExtractedStructuredPayload("ted-notice", payload)),
                List.of()
        );
    }
}

@ -0,0 +1,36 @@
package at.procon.dip.processing.service;
import at.procon.dip.classification.spi.DetectionResult;
import at.procon.dip.processing.spi.DocumentProcessingPolicy;
import at.procon.dip.processing.spi.StructuredDocumentProcessor;
import at.procon.dip.processing.spi.StructuredProcessingRequest;
import at.procon.dip.extraction.spi.ExtractionResult;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import java.util.List;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
 * Dispatch layer for the optional structured-document enrichment SPI.
 *
 * <p>Holds every registered {@link StructuredDocumentProcessor} bean and, for a given
 * source/detection pair, picks the first one whose {@code supports(...)} returns true.
 * Callers use it to (a) look up a processor, (b) obtain the effective
 * {@link DocumentProcessingPolicy}, and (c) run the structured extraction itself.
 */
@Service
@RequiredArgsConstructor
public class StructuredDocumentProcessingService {

    /** All structured processors discovered by Spring; iteration order decides precedence. */
    private final List<StructuredDocumentProcessor> processors;

    /**
     * Returns the first registered processor that supports the given source/detection pair,
     * or empty when the document should go through the generic pipeline only.
     */
    public Optional<StructuredDocumentProcessor> resolve(SourceDescriptor sourceDescriptor, DetectionResult detectionResult) {
        for (StructuredDocumentProcessor candidate : processors) {
            if (candidate.supports(sourceDescriptor, detectionResult)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Resolves the pipeline policy for the document; falls back to
     * {@link DocumentProcessingPolicy#genericDefault()} when no processor matches.
     */
    public DocumentProcessingPolicy resolvePolicy(SourceDescriptor sourceDescriptor, DetectionResult detectionResult) {
        return resolve(sourceDescriptor, detectionResult)
                .map(candidate -> candidate.processingPolicy(sourceDescriptor, detectionResult))
                .orElseGet(DocumentProcessingPolicy::genericDefault);
    }

    /**
     * Runs the matching processor against the canonical import context; empty when
     * no registered processor supports the request's source/detection pair.
     */
    public Optional<ExtractionResult> process(StructuredProcessingRequest request) {
        return resolve(request.sourceDescriptor(), request.detectionResult())
                .map(candidate -> candidate.process(request));
    }
}

@ -0,0 +1,22 @@
package at.procon.dip.processing.spi;
/**
 * Immutable set of flags that controls which generic pipeline stages run for a
 * document and whether a structured (type-specific) processor is invoked.
 *
 * @param runGenericExtraction          run the generic text/content extraction stage
 * @param persistExtractedContent       persist the extracted content rows
 * @param runRepresentationBuilders     build derived representations for the document
 * @param invokeStructuredProcessor     hand the document to a structured processor
 * @param applyStructuredTitleIfMissing let the structured result supply a title when none is set
 */
public record DocumentProcessingPolicy(
        boolean runGenericExtraction,
        boolean persistExtractedContent,
        boolean runRepresentationBuilders,
        boolean invokeStructuredProcessor,
        boolean applyStructuredTitleIfMissing
) {

    /** Policy for documents without any type-specific handling: every stage enabled. */
    public static DocumentProcessingPolicy genericDefault() {
        return new DocumentProcessingPolicy(true, true, true, true, true);
    }

    /**
     * Policy where the structured processor replaces the generic text extraction
     * (generic extraction off, every other stage still enabled).
     */
    public static DocumentProcessingPolicy replaceGenericTextProcessing() {
        return new DocumentProcessingPolicy(false, true, true, true, true);
    }
}

@ -0,0 +1,19 @@
package at.procon.dip.processing.spi;
import at.procon.dip.classification.spi.DetectionResult;
import at.procon.dip.extraction.spi.ExtractionResult;
import at.procon.dip.ingestion.spi.SourceDescriptor;
/**
 * Optional type-specific enrichment layer on top of the canonical DOC import.
 *
 * <p>Implementations are discovered as Spring beans and consulted (first match wins)
 * by the processing service; a processor that returns {@code true} from
 * {@link #supports} takes over extraction for that document.
 */
public interface StructuredDocumentProcessor {

    /**
     * Whether this processor handles documents from the given source with the
     * given classification result.
     */
    boolean supports(SourceDescriptor sourceDescriptor, DetectionResult detectionResult);

    /**
     * Pipeline policy to apply when this processor is selected. By default the
     * structured processor replaces the generic text extraction stage.
     */
    default DocumentProcessingPolicy processingPolicy(SourceDescriptor sourceDescriptor, DetectionResult detectionResult) {
        return DocumentProcessingPolicy.replaceGenericTextProcessing();
    }

    /**
     * Performs the type-specific extraction for the canonical import context and
     * returns derived texts / structured payloads.
     */
    ExtractionResult process(StructuredProcessingRequest request);
}

@ -0,0 +1,20 @@
package at.procon.dip.processing.spi;
import at.procon.dip.classification.spi.DetectionResult;
import at.procon.dip.domain.document.entity.Document;
import at.procon.dip.domain.document.entity.DocumentContent;
import at.procon.dip.ingestion.spi.SourceDescriptor;
/**
 * Canonical import context handed to a structured document processor.
 *
 * @param document         the canonical DOC document being enriched
 * @param originalContent  the persisted ORIGINAL content row for the document
 * @param sourceDescriptor descriptor of the ingesting source
 * @param detectionResult  classification/MIME detection outcome for the payload
 * @param binaryContent    raw payload bytes; NOTE(review): may be null for text-only
 *                         imports — confirm against the gateway
 * @param textContent      extracted/normalized text, when available
 * @param dedupHash        content hash used for canonical-document deduplication
 */
public record StructuredProcessingRequest(
        Document document,
        DocumentContent originalContent,
        SourceDescriptor sourceDescriptor,
        DetectionResult detectionResult,
        byte[] binaryContent,
        String textContent,
        String dedupHash
) {
}

@ -1,6 +1,10 @@
package at.procon.ted.camel;
import at.procon.dip.domain.document.SourceType;
import at.procon.ted.config.TedProcessorProperties;
import at.procon.dip.ingestion.service.DocumentIngestionGateway;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.ted.service.attachment.AttachmentExtractor;
import at.procon.ted.service.attachment.AttachmentProcessingService;
import jakarta.mail.BodyPart;
@ -14,16 +18,18 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mail.MailMessage;
import org.jsoup.Jsoup;
import org.springframework.stereotype.Component;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.*;
/**
* Apache Camel route for IMAP mail processing.
@ -51,6 +57,7 @@ public class MailRoute extends RouteBuilder {
private final TedProcessorProperties properties;
private final AttachmentProcessingService attachmentProcessingService;
private final DocumentIngestionGateway documentIngestionGateway;
@Override
public void configure() throws Exception {
@ -105,7 +112,14 @@ public class MailRoute extends RouteBuilder {
from("direct:mime")
.routeId(ROUTE_ID_MIME)
.process(exchange -> {
Message mailMessage = exchange.getIn().getBody(Message.class);
Message mailMessage = null;
MailMessage mailMessage_ = exchange.getIn().getBody(MailMessage.class);
if(mailMessage_ != null) {
mailMessage = mailMessage_.getMessage();
}
else {
mailMessage = exchange.getIn().getBody(Message.class);
}
if (mailMessage == null) {
log.warn("Received null mail message, skipping");
@ -147,6 +161,41 @@ public class MailRoute extends RouteBuilder {
log.info("MIME decoded: subject='{}', textLength={}, htmlLength={}, attachments={}",
subject, finalTextContent.length(), htmlContent.length(), attachments.size());
if (properties.getGenericIngestion().isEnabled() && properties.getGenericIngestion().isMailAdapterEnabled()) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
mailMessage.writeTo(baos);
String sourceIdentifier = mailMessage.getHeader("Message-ID") != null ?
Arrays.stream(mailMessage.getHeader("Message-ID")).findFirst().orElse(null) : null;
if (sourceIdentifier == null || sourceIdentifier.isBlank()) {
sourceIdentifier = UUID.randomUUID().toString();
}
var result = documentIngestionGateway.ingest(new SourceDescriptor(
null,
SourceType.MAIL,
sourceIdentifier,
null,
subject != null ? subject.replaceAll("[^A-Za-z0-9._-]", "_") + ".eml" : "mail-message.eml",
"message/rfc822",
baos.toByteArray(),
null,
exchange.getIn().getHeader("mailReceivedDate", Date.class) == null
? OffsetDateTime.now()
: exchange.getIn().getHeader("mailReceivedDate", Date.class).toInstant()
.atZone(ZoneId.systemDefault()).toOffsetDateTime(),
OriginalContentStoragePolicy.DEFAULT,
Map.of(
"subject", subject != null ? subject : "",
"from", from != null ? from : ""
)
));
if (!result.warnings().isEmpty()) {
log.info("Mail adapter imported MIME message with {} warnings", result.warnings().size());
}
} catch (Exception e) {
log.warn("Phase 4.1 mail adapter import failed for subject '{}': {}", subject, e.getMessage());
}
}
})
// Queue attachments for async processing
.choice()

@ -1,5 +1,9 @@
package at.procon.ted.camel;
import at.procon.dip.ingestion.service.DocumentIngestionGateway;
import at.procon.dip.ingestion.spi.IngestionResult;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.ted.config.TedProcessorProperties;
import at.procon.ted.model.entity.TedDailyPackage;
import at.procon.ted.repository.TedDailyPackageRepository;
@ -53,6 +57,7 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder {
private final TedProcessorProperties properties;
private final TedDailyPackageRepository packageRepository;
private final TedPackageDownloadService downloadService;
private final DocumentIngestionGateway documentIngestionGateway;
private final BatchDocumentProcessingService batchProcessingService;
/**
@ -146,9 +151,21 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder {
.process(this::markPackageDuplicate)
.otherwise()
.process(this::saveDownloadedPackage)
.choice()
.when(header("skipLegacyXmlProcessing").isEqualTo(true))
.log(LoggingLevel.INFO, "Package ${header.packageId}: generic ingestion gateway-only mode active, skipping legacy XML batch persistence")
.to("direct:complete-package-after-gateway")
.otherwise()
.to("direct:extract-tar-gz")
.endChoice()
.end();
from("direct:complete-package-after-gateway")
.routeId("ted-package-gateway-only-completer")
.setHeader("processingStartTime", constant(System.currentTimeMillis()))
.process(this::markPackageCompleted)
.process(this::logPackageStatistics);
// tar.gz Extraction Route
from("direct:extract-tar-gz")
.routeId(ROUTE_ID_EXTRACTOR)
@ -214,7 +231,7 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder {
long runningCount = downloadingCount + processingCount;
exchange.getIn().setHeader("runningCount", runningCount);
exchange.getIn().setHeader("tooManyRunning", runningCount >= 2);
exchange.getIn().setHeader("tooManyRunning", runningCount >= 1);
if (runningCount > 0) {
log.info("Currently {} packages in progress ({} downloading, {} processing)",
@ -278,21 +295,9 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder {
Integer serialNumber = exchange.getIn().getHeader("serialNumber", Integer.class);
String downloadUrl = exchange.getIn().getHeader("downloadUrl", String.class);
Optional<TedDailyPackage> existing = packageRepository.findByPackageIdentifier(packageId);
if (existing.isPresent()) {
TedDailyPackage pkg = existing.get();
if (pkg.getDownloadStatus() == TedDailyPackage.DownloadStatus.NOT_FOUND) {
log.info("Retrying existing NOT_FOUND package in Camel route: {}", packageId);
pkg.setDownloadUrl(downloadUrl);
pkg.setErrorMessage(null);
pkg.setDownloadStatus(TedDailyPackage.DownloadStatus.DOWNLOADING);
pkg = packageRepository.save(pkg);
exchange.getIn().setHeader("packageDbId", pkg.getId());
return;
}
log.debug("Package {} already exists in DB with status {}", packageId, pkg.getDownloadStatus());
exchange.getIn().setHeader("packageDbId", pkg.getId());
// Check if already exists
if (packageRepository.existsByPackageIdentifier(packageId)) {
log.debug("Package {} already exists in DB", packageId);
return;
}
@ -372,6 +377,50 @@ public class TedPackageDownloadCamelRoute extends RouteBuilder {
exchange.getIn().setHeader("downloadPath", downloadPath.toString());
exchange.getIn().setHeader("deleteAfterExtraction",
properties.getDownload().isDeleteAfterExtraction());
exchange.getIn().setHeader("skipLegacyXmlProcessing", false);
if (properties.getGenericIngestion().isEnabled() && properties.getGenericIngestion().isTedPackageAdapterEnabled()) {
try {
IngestionResult ingestionResult = documentIngestionGateway.ingest(new SourceDescriptor(
null,
at.procon.dip.domain.document.SourceType.TED_PACKAGE,
packageId,
downloadPath.toString(),
packageId + ".tar.gz",
"application/gzip",
body,
null,
OffsetDateTime.now(),
OriginalContentStoragePolicy.DEFAULT,
java.util.Map.of(
"packageId", packageId,
"title", packageId + ".tar.gz"
)
));
int importedChildCount = Math.max(0, ingestionResult.documents().size() - 1);
exchange.getIn().setHeader("gatewayImportedChildCount", importedChildCount);
exchange.getIn().setHeader("gatewayImportWarnings", ingestionResult.warnings().size());
if (properties.getGenericIngestion().isGatewayOnlyForTedPackages()) {
packageRepository.findByPackageIdentifier(packageId).ifPresent(pkg -> {
pkg.setXmlFileCount(importedChildCount);
pkg.setProcessedCount(importedChildCount);
pkg.setFailedCount(0);
packageRepository.save(pkg);
});
if (properties.getDownload().isDeleteAfterExtraction()) {
Files.deleteIfExists(downloadPath);
}
exchange.getIn().setHeader("skipLegacyXmlProcessing", true);
}
} catch (Exception e) {
log.warn("Phase 4.1 TED package adapter import failed for {}: {}", packageId, e.getMessage());
}
}
}
/**

@ -510,7 +510,7 @@ public class TedProcessorProperties {
/**
* Input directory for the generic filesystem importer.
*/
private String inputDirectory = "D:/ted.europe/generic-input";
private String inputDirectory = "/ted.europe/generic-input";
/**
* Regular-expression file pattern used by the Camel file route.
@ -570,6 +570,14 @@ public class TedProcessorProperties {
*/
private boolean deduplicateByContentHash = true;
/**
* Persist ORIGINAL content rows for wrapper/container documents that primarily exist
* to group or reference child documents (for example TED packages or expanded ZIP wrappers).
* When disabled, wrappers are still classified, extracted and represented, but the raw
* ORIGINAL content payload is not stored in DOC.doc_content.
*/
private boolean storeOriginalContentForWrapperDocuments = true;
/**
* Queue only the primary text representation for embedding.
*/
@ -580,6 +588,50 @@ public class TedProcessorProperties {
*/
@NotBlank
private String importBatchId = "phase4-generic";
/**
* Enable the Phase 4.1 TED package adapter built on top of the generic DOC ingestion SPI.
*/
private boolean tedPackageAdapterEnabled = true;
/**
* Enable the Phase 4.1 mail/document adapter built on top of the generic DOC ingestion SPI.
*/
private boolean mailAdapterEnabled = false;
/**
* Optional dedicated owner tenant key for imported mail messages and attachments.
* Falls back to defaultOwnerTenantKey when not configured.
*/
private String mailDefaultOwnerTenantKey;
/**
* Default visibility for imported mail messages and attachments.
*/
private at.procon.dip.domain.access.DocumentVisibility mailDefaultVisibility = at.procon.dip.domain.access.DocumentVisibility.TENANT;
/**
* Expand ZIP attachments recursively through the mail adapter.
*/
private boolean expandMailZipAttachments = true;
/**
* Import batch identifier for TED package roots and extracted TED child documents.
*/
@NotBlank
private String tedPackageImportBatchId = "phase41-ted-package";
/**
* When true, TED packages are persisted only through the generic ingestion gateway
* and the legacy XML batch persistence path is skipped.
*/
private boolean gatewayOnlyForTedPackages = false;
/**
* Import batch identifier for imported mail root messages and child attachments.
*/
@NotBlank
private String mailImportBatchId = "phase41-mail";
}
}

@ -39,7 +39,7 @@ spring:
order_updates: true
flyway:
enabled: true
enabled: false
locations: classpath:db/migration
baseline-on-migrate: true
create-schemas: true
@ -128,7 +128,9 @@ ted:
# TED Daily Package Download configuration
download:
# Enable/disable automatic package download
enabled: false
enabled: true
# Use service-based camel route
use-service-based: false
# Base URL for TED Daily Packages
base-url: https://ted.europa.eu/packages/daily/
# Download directory for tar.gz files
@ -136,11 +138,11 @@ ted:
# Extract directory for XML files
extract-directory: /ted.europe/extracted
# Start year for downloads
start-year: 2023
start-year: 2026
# Max consecutive 404 errors before stopping
max-consecutive-404: 4
# Polling interval (milliseconds) - 1 hour
poll-interval: 120000
poll-interval: 3600000
# Retry interval for tail NOT_FOUND packages - 6 hours
not-found-retry-interval: 21600000
# Grace period after year end before a previous-year tail 404 is treated as final
@ -150,7 +152,7 @@ ted:
# Download timeout (milliseconds) - 5 minutes
download-timeout: 300000
# Max concurrent downloads
max-concurrent-downloads: 2
max-concurrent-downloads: 1
# Delay between downloads (milliseconds) for rate limiting - 3 seconds
delay-between-downloads: 3000
# Delete tar.gz after extraction
@ -207,13 +209,13 @@ ted:
# Phase 4 generic ingestion configuration
generic-ingestion:
# Master switch for arbitrary document ingestion into the DOC model
enabled: false
enabled: true
# Enable file-system polling for non-TED documents
file-system-enabled: false
# Allow REST/API upload endpoints for arbitrary documents
rest-upload-enabled: true
# Input directory for the generic Camel file route
input-directory: D:/ted.europe/generic-input
input-directory: /ted.europe/generic-input
# Regex for files accepted by the generic file route
file-pattern: .*\.(pdf|txt|html|htm|xml|md|markdown|csv|json|yaml|yml)$
# Move successfully processed files here
@ -236,10 +238,29 @@ ted:
max-binary-bytes-in-db: 5242880
# Deduplicate by content hash and attach additional sources to the same canonical document
deduplicate-by-content-hash: true
# Persist ORIGINAL content rows for wrapper/container documents such as TED packages or ZIP wrappers
store-original-content-for-wrapper-documents: true
# Queue only the primary text representation for vectorization
vectorize-primary-representation-only: true
# Import batch marker written to DOC.doc_source.import_batch_id
import-batch-id: phase4-generic
# Enable Phase 4.1 TED package adapter on top of the generic DOC ingestion SPI
ted-package-adapter-enabled: true
# Enable Phase 4.1 mail/document adapter on top of the generic DOC ingestion SPI
mail-adapter-enabled: false
# Optional dedicated mail owner tenant, falls back to default-owner-tenant-key
mail-default-owner-tenant-key:
# Visibility for imported mail messages and attachments
mail-default-visibility: TENANT
# Expand ZIP attachments recursively through the mail adapter
expand-mail-zip-attachments: true
# Import batch marker for TED package roots and children
ted-package-import-batch-id: phase41-ted-package
# When true, TED package documents are stored only through the generic ingestion gateway
# and the legacy XML batch processing path is skipped
gateway-only-for-ted-packages: true
# Import batch marker for mail roots and attachments
mail-import-batch-id: phase41-mail
# Solution Brief processing configuration
solution-brief:

@ -0,0 +1,80 @@
-- Phase 4.1 enum/check-constraint expansion for newly introduced generic document/source types.
-- Supports both:
--   1) PostgreSQL ENUM-backed columns created by Flyway
--   2) legacy VARCHAR + CHECK constraint variants that may exist in local/dev databases
--
-- Each DO block first probes pg_type / pg_constraint so the migration is a no-op on
-- schemas that use the other variant, making the script safe to run against either.
--
-- NOTE(review): the CHECK-constraint lists below also allow 'MIME_MESSAGE' and 'MAIL',
-- while the ENUM branches only add 'TED_PACKAGE' / 'PACKAGE_CHILD'; presumably those
-- other values already exist in the enums — confirm against the base migration.

-- Add TED_PACKAGE to the enum-backed document type, if the enum exists.
-- ADD VALUE IF NOT EXISTS keeps this re-runnable (requires PostgreSQL 12+ to run
-- inside a transactional DO block).
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_type t
JOIN pg_namespace n ON n.oid = t.typnamespace
WHERE n.nspname = 'doc'
AND t.typname = 'doc_document_type'
) THEN
ALTER TYPE DOC.doc_document_type ADD VALUE IF NOT EXISTS 'TED_PACKAGE';
END IF;
END
$$;

-- Add PACKAGE_CHILD to the enum-backed source type, if the enum exists.
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_type t
JOIN pg_namespace n ON n.oid = t.typnamespace
WHERE n.nspname = 'doc'
AND t.typname = 'doc_source_type'
) THEN
ALTER TYPE DOC.doc_source_type ADD VALUE IF NOT EXISTS 'PACKAGE_CHILD';
END IF;
END
$$;

-- Legacy VARCHAR variant: recreate the document-type CHECK constraint with the
-- expanded value list. Drop-and-add is required because CHECK expressions cannot
-- be altered in place.
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_constraint c
JOIN pg_class r ON r.oid = c.conrelid
JOIN pg_namespace n ON n.oid = r.relnamespace
WHERE n.nspname = 'doc'
AND r.relname = 'doc_document'
AND c.conname = 'doc_document_document_type_check'
) THEN
ALTER TABLE DOC.doc_document DROP CONSTRAINT doc_document_document_type_check;
ALTER TABLE DOC.doc_document
ADD CONSTRAINT doc_document_document_type_check
CHECK (
document_type IN (
'TED_PACKAGE', 'TED_NOTICE', 'EMAIL', 'MIME_MESSAGE', 'PDF', 'DOCX', 'HTML',
'XML_GENERIC', 'TEXT', 'MARKDOWN', 'ZIP_ARCHIVE', 'GENERIC_BINARY', 'UNKNOWN'
)
);
END IF;
END
$$;

-- Legacy VARCHAR variant: recreate the source-type CHECK constraint with the
-- expanded value list (adds TED_PACKAGE / PACKAGE_CHILD alongside existing types).
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_constraint c
JOIN pg_class r ON r.oid = c.conrelid
JOIN pg_namespace n ON n.oid = r.relnamespace
WHERE n.nspname = 'doc'
AND r.relname = 'doc_source'
AND c.conname = 'doc_source_source_type_check'
) THEN
ALTER TABLE DOC.doc_source DROP CONSTRAINT doc_source_source_type_check;
ALTER TABLE DOC.doc_source
ADD CONSTRAINT doc_source_source_type_check
CHECK (
source_type IN (
'TED_PACKAGE', 'PACKAGE_CHILD', 'MAIL', 'FILE_SYSTEM', 'REST_UPLOAD',
'MANUAL_UPLOAD', 'ZIP_CHILD', 'API', 'MIGRATION'
)
);
END IF;
END
$$;
Loading…
Cancel
Save