structured email storage and processing

master
trifonovt 2 days ago
parent 3284205a9e
commit 8fddc2a429

@ -0,0 +1,58 @@
# Mail Processing Stabilization Phase: NEW Camel mail route
This patch adds a Camel route for mail ingestion in the NEW runtime. It serves the same purpose as the legacy `MailRoute` but is implemented only in the NEW code path (`at.procon.dip.*`).
## Scope
- runtime mode: `NEW` only
- entrypoint: Camel mail consumer route
- downstream handling: `DocumentIngestionGateway` with `SourceType.MAIL`
- no legacy profile/config/code changes
## Main classes
- `at.procon.dip.ingestion.camel.GenericMailIngestionRoute`
- `at.procon.dip.ingestion.mail.MailServerEndpointUriFactory`
- `at.procon.dip.ingestion.mail.CamelMailServerEndpointUriFactory`
- `at.procon.dip.ingestion.mail.MailServerProtocol`
## Configuration
Configured under `dip.ingestion.mail-route` in `application-new.yml`.
Supported protocols currently exposed through the generic endpoint factory:
- `IMAP`
- `IMAPS`
- `POP3`
- `POP3S`
This keeps the route provider/protocol-agnostic at the configuration level while still using Apache Camel mail endpoints underneath.
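A minimal sketch of how a protocol-agnostic factory could map the four protocols to Camel mail endpoint URIs. It is not the actual `CamelMailServerEndpointUriFactory`: the names `ProtocolSketch` and `EndpointUriSketch` and the method signature are hypothetical, and only the `username` and `folderName` options of the Camel mail component are used.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the protocol enum; the real MailServerProtocol is not shown in this patch. */
enum ProtocolSketch {
    IMAP("imap"), IMAPS("imaps"), POP3("pop3"), POP3S("pop3s");

    final String scheme;

    ProtocolSketch(String scheme) {
        this.scheme = scheme;
    }
}

/** Illustrative only: builds a Camel mail endpoint URI from generic settings. */
final class EndpointUriSketch {

    static String build(ProtocolSketch protocol, String host, int port, String username, String folder) {
        // Query values are URL-encoded so that characters such as '@' survive.
        return protocol.scheme + "://" + host + ":" + port
                + "?username=" + encode(username)
                + "&folderName=" + encode(folder);
    }

    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }
}
```

Under these assumptions, switching from IMAPS to POP3S changes only the enum value in configuration; the route definition itself stays the same.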
## Behavior
When enabled, the route:
1. consumes messages from the configured mail endpoint
2. serializes each provider message to raw MIME bytes
3. extracts basic provider/provenance attributes
4. builds a `SourceDescriptor` with `SourceType.MAIL`
5. delegates the actual parsing/import/attachment handling to the existing NEW `MailDocumentIngestionAdapter`
## Provenance attributes added by the route
The route enriches the `SourceDescriptor.attributes` map with:
- `providerType`
- `providerProtocol`
- `providerAccountKey`
- `providerFolderKey`
- `providerMessageKey` (when available from Camel headers)
- `providerThreadKey` (when available)
- `messageId`
- `subject`
- `from`
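The attribute keys above can be illustrated with a small sketch. The record `MailProvenanceSketch` is hypothetical, and skipping every blank value (not only `providerMessageKey` and `providerThreadKey`) is an assumption; the real route reads these values from the Camel exchange.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical value holder for the provenance values listed above. */
record MailProvenanceSketch(String providerType, String providerProtocol, String providerAccountKey,
                            String providerFolderKey, String providerMessageKey, String providerThreadKey,
                            String messageId, String subject, String from) {

    /** Builds the attribute map; keys whose value is missing are left out (assumption). */
    Map<String, String> toAttributes() {
        Map<String, String> attributes = new LinkedHashMap<>();
        put(attributes, "providerType", providerType);
        put(attributes, "providerProtocol", providerProtocol);
        put(attributes, "providerAccountKey", providerAccountKey);
        put(attributes, "providerFolderKey", providerFolderKey);
        put(attributes, "providerMessageKey", providerMessageKey);
        put(attributes, "providerThreadKey", providerThreadKey);
        put(attributes, "messageId", messageId);
        put(attributes, "subject", subject);
        put(attributes, "from", from);
        return attributes;
    }

    private static void put(Map<String, String> target, String key, String value) {
        if (value != null && !value.isBlank()) {
            target.put(key, value);
        }
    }
}
```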
## Notes
- The route intentionally does **not** depend on `TedProcessorProperties`.
- The route is designed for the NEW runtime only.
- The mail import behavior still relies on the NEW `MailDocumentIngestionAdapter` for MIME parsing and attachment import.
- This step does not add replay/reprocessing yet.

@ -0,0 +1,114 @@
# Mail Processing Stabilization Phase — Step 1
This step implements the first practical slice of the mail-processing stabilization work:
- generic mail-provider contract
- provider-aware source identifiers for idempotent import
- typed mail metadata persistence
- attachment occurrence tracking
- current Camel/IMAP route adapted to the generic provider contract
## Included scope
### 1. Generic mail provider contract
Added a generic abstraction so the ingestion pipeline does not depend on IMAP-specific semantics:
- `MailProviderType`
- `MailProviderEnvelope`
- `GenericMailProviderEnvelope`
- `MailProviderEnvelopeAttributes`
The current implementation uses `GenericMailProviderEnvelope` for the existing Camel IMAP route.
Future providers such as POP3, EWS, Microsoft Graph, Gmail API, or replay/file sources can use the same contract.
### 2. Provider-aware idempotency foundation
Added `MailImportIdentityResolver` to derive stable source identifiers for:
- root mail message
- attachment occurrences
Priority for root message identity:
1. provider message key
2. `Message-ID`
3. raw MIME hash
This allows the import path to remain restart-safe and replay-safe even when content-hash-only deduplication is insufficient.
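The priority order above can be sketched as follows. This is not the code of `MailImportIdentityResolver`: the class name `RootIdentitySketch`, the identifier prefixes, and the choice of SHA-256 for the raw MIME hash are assumptions made for illustration.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

final class RootIdentitySketch {

    /** Picks the most specific identifier available: provider key, then Message-ID, then content hash. */
    static String resolve(String providerMessageKey, String messageId, byte[] rawMime) {
        if (hasText(providerMessageKey)) {
            return "provider:" + providerMessageKey.trim();
        }
        if (hasText(messageId)) {
            return "message-id:" + messageId.trim();
        }
        // Last resort: identical raw bytes always yield the same identifier.
        return "mime-sha256:" + sha256Hex(rawMime);
    }

    static String sha256Hex(byte[] data) {
        try {
            return HexFormat.of().formatHex(MessageDigest.getInstance("SHA-256").digest(data));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 must be available on every Java platform", e);
        }
    }

    private static boolean hasText(String value) {
        return value != null && !value.isBlank();
    }
}
```

Because each branch is deterministic for the same input, re-reading a message after a restart produces the same identifier and can be recognized as already imported.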
### 3. Generic source-id idempotency in document import
`GenericDocumentImportService` now checks for an existing `DOC.doc_source` row using:
- `source_type`
- `external_source_id`
before content-hash deduplication.
This makes source-identifier idempotency reusable beyond mail.
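The lookup order can be illustrated with an in-memory sketch. The maps stand in for `DOC.doc_source` and the content-hash index, the class `ImportDedupSketch` is hypothetical, and linking a new source to an existing document on a content-hash match is an assumption about the existing deduplication behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class ImportDedupSketch {

    private final Map<String, UUID> bySourceKey = new HashMap<>();
    private final Map<String, UUID> byContentHash = new HashMap<>();

    /** Returns the document id for this import; inputs are assumed to be non-null. */
    UUID importDocument(String sourceType, String externalSourceId, String contentHash) {
        String sourceKey = sourceType + "|" + externalSourceId;

        // 1. Source-identifier check: the same (source_type, external_source_id) was imported before.
        UUID existing = bySourceKey.get(sourceKey);
        if (existing != null) {
            return existing;
        }

        // 2. Content-hash check: reuse the document with identical content, or create a new one.
        UUID documentId = byContentHash.computeIfAbsent(contentHash, ignored -> UUID.randomUUID());
        bySourceKey.put(sourceKey, documentId);
        return documentId;
    }
}
```

With this order, a re-delivered message is recognized by its source identifier even if its bytes differ slightly between deliveries.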
### 4. Typed mail metadata persistence
Added new DOC metadata tables/entities:
- `DOC.doc_mail_message`
- `DOC.doc_mail_recipient`
- `DOC.doc_mail_attachment`
These persist:
- provider/account/folder/message/thread keys
- `Message-ID`, `In-Reply-To`, `References`
- normalized subject
- sender/recipients
- attachment occurrence metadata
- part path / archive path / disposition / content-id
### 5. Attachment source typing
Attachments imported from mail now use:
- `SourceType.MAIL_ATTACHMENT`
instead of the generic `MAIL` source type.
### 6. Camel IMAP route integration
The existing Camel mail route now emits generic provider metadata into `SourceDescriptor.attributes()` using the new provider contract.
## Not yet included
The following are intentionally left for the next step:
- replay/reprocess workflows
- import/reprocess run tracking tables
- failed attachment retry services
- thread-aware search/reporting
- admin/ops visibility endpoints or Camel admin routes
## Main implementation files
### New files
- `src/main/java/at/procon/dip/ingestion/mail/MailProviderType.java`
- `src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelope.java`
- `src/main/java/at/procon/dip/ingestion/mail/GenericMailProviderEnvelope.java`
- `src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelopeAttributes.java`
- `src/main/java/at/procon/dip/ingestion/mail/MailImportIdentityResolver.java`
- `src/main/java/at/procon/dip/domain/document/entity/DocumentMailMessage.java`
- `src/main/java/at/procon/dip/domain/document/entity/DocumentMailRecipient.java`
- `src/main/java/at/procon/dip/domain/document/entity/DocumentMailAttachment.java`
- `src/main/java/at/procon/dip/domain/document/entity/MailRecipientType.java`
- `src/main/java/at/procon/dip/domain/document/repository/DocumentMailMessageRepository.java`
- `src/main/java/at/procon/dip/domain/document/repository/DocumentMailRecipientRepository.java`
- `src/main/java/at/procon/dip/domain/document/repository/DocumentMailAttachmentRepository.java`
- `src/main/java/at/procon/dip/ingestion/service/MailMetadataPersistenceService.java`
- `src/main/resources/db/migration/V23__doc_mail_processing_stabilization_step1.sql`
### Modified files
- `src/main/java/at/procon/dip/domain/document/SourceType.java`
- `src/main/java/at/procon/dip/domain/document/repository/DocumentSourceRepository.java`
- `src/main/java/at/procon/dip/ingestion/service/GenericDocumentImportService.java`
- `src/main/java/at/procon/dip/ingestion/service/MailMessageExtractionService.java`
- `src/main/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapter.java`
- `src/main/java/at/procon/ted/camel/MailRoute.java`
## Recommended next step
Proceed with **Step 2** of the Mail Processing Stabilization Phase:
- replay/reprocess services
- failed attachment retry flow
- import/reprocess run tracking
- reporting / operational visibility

@ -53,6 +53,7 @@ dip:
embedding:
jobs:
enabled: true
parallel-batch-count: 1
process-in-batches: true
execution-batch-size: 20
@ -64,4 +65,5 @@ dip:
Notes:
- jobs are grouped by `modelKey`
- non-batch-capable models still fall back to single-item execution
- `execution-batch-size` controls how many texts are sent in one `/vectorize-batch` request
- `parallel-batch-count` controls how many claimed job batches may be started in parallel
- `execution-batch-size` controls how many texts are sent in one `/vectorize-batch` request inside each claimed job batch
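A worked example of how the three sizes interact, assuming that the number of jobs claimed per run is `batchSize * parallelBatchCount` and that all jobs in a claimed batch share one `modelKey`. The class `BatchSizingSketch` and its method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSizingSketch {

    /** Upper bound on jobs claimed per run: one claimed batch per parallel slot. */
    static int claimLimit(int batchSize, int parallelBatchCount) {
        return Math.max(1, batchSize) * Math.max(1, parallelBatchCount);
    }

    /** Sizes of the claimed batches when the claimed jobs are cut into chunks of batchSize. */
    static List<Integer> claimedBatchSizes(int claimedJobs, int batchSize) {
        List<Integer> sizes = new ArrayList<>();
        for (int start = 0; start < claimedJobs; start += batchSize) {
            sizes.add(Math.min(batchSize, claimedJobs - start));
        }
        return sizes;
    }

    /** Number of /vectorize-batch requests for one claimed batch, assuming a single model key. */
    static int requestsPerClaimedBatch(int jobsInBatch, int executionBatchSize) {
        return (jobsInBatch + executionBatchSize - 1) / executionBatchSize;
    }
}
```

For example, a batch size of 16 with `parallel-batch-count: 3` claims at most 48 jobs per run. If 40 jobs are ready, they are split into claimed batches of 16, 16, and 8, and with `execution-batch-size: 20` each claimed batch fits into a single request.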

@ -7,6 +7,7 @@ public enum SourceType {
TED_PACKAGE,
PACKAGE_CHILD,
MAIL,
MAIL_ATTACHMENT,
FILE_SYSTEM,
REST_UPLOAD,
MANUAL_UPLOAD,

@ -0,0 +1,94 @@
package at.procon.dip.domain.document.entity;
import at.procon.dip.architecture.SchemaNames;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Index;
import jakarta.persistence.PrePersist;
import jakarta.persistence.PreUpdate;
import jakarta.persistence.Table;
import java.time.OffsetDateTime;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Entity
@Table(schema = SchemaNames.DOC, name = "doc_mail_attachment", indexes = {
@Index(name = "idx_doc_mail_attachment_mail", columnList = "mail_document_id"),
@Index(name = "idx_doc_mail_attachment_document", columnList = "attachment_document_id"),
@Index(name = "idx_doc_mail_attachment_part_path", columnList = "part_path"),
@Index(name = "idx_doc_mail_attachment_attachment_index", columnList = "attachment_index")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class DocumentMailAttachment {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
@Column(name = "mail_document_id", nullable = false)
private UUID mailDocumentId;
@Column(name = "attachment_document_id", nullable = false)
private UUID attachmentDocumentId;
@Column(name = "disposition", length = 32)
private String disposition;
@Column(name = "content_id", length = 500)
private String contentId;
@Column(name = "filename", length = 1000)
private String filename;
@Column(name = "mime_type", length = 255)
private String mimeType;
@Column(name = "size_bytes")
private Long sizeBytes;
@Column(name = "attachment_index")
private Integer attachmentIndex;
@Column(name = "part_path", length = 500)
private String partPath;
@Column(name = "path_in_archive", columnDefinition = "TEXT")
private String pathInArchive;
@Column(name = "extraction_status", nullable = false, length = 32)
@Builder.Default
private String extractionStatus = "IMPORTED";
@Column(name = "error_message", columnDefinition = "TEXT")
private String errorMessage;
@Builder.Default
@Column(name = "created_at", nullable = false, updatable = false)
private OffsetDateTime createdAt = OffsetDateTime.now();
@Builder.Default
@Column(name = "updated_at", nullable = false)
private OffsetDateTime updatedAt = OffsetDateTime.now();
@PrePersist
protected void onCreate() {
createdAt = OffsetDateTime.now();
updatedAt = OffsetDateTime.now();
}
@PreUpdate
protected void onUpdate() {
updatedAt = OffsetDateTime.now();
}
}

@ -0,0 +1,127 @@
package at.procon.dip.domain.document.entity;
import at.procon.dip.architecture.SchemaNames;
import at.procon.dip.ingestion.mail.MailProviderType;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.FetchType;
import jakarta.persistence.Id;
import jakarta.persistence.Index;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.OneToOne;
import jakarta.persistence.PrePersist;
import jakarta.persistence.PreUpdate;
import jakarta.persistence.Table;
import java.time.OffsetDateTime;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Entity
@Table(schema = SchemaNames.DOC, name = "doc_mail_message", indexes = {
@Index(name = "idx_doc_mail_message_source_id", columnList = "source_id"),
@Index(name = "idx_doc_mail_message_message_id", columnList = "message_id"),
@Index(name = "idx_doc_mail_message_thread_key", columnList = "thread_key"),
@Index(name = "idx_doc_mail_message_provider_identity", columnList = "provider_type, account_key, folder_key, provider_message_key")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class DocumentMailMessage {
@Id
@Column(name = "document_id", nullable = false)
private UUID documentId;
@OneToOne(fetch = FetchType.LAZY, optional = false)
@JoinColumn(name = "document_id", nullable = false, insertable = false, updatable = false)
private Document document;
@Column(name = "source_id")
private UUID sourceId;
@Enumerated(EnumType.STRING)
@Column(name = "provider_type", nullable = false, length = 64)
@Builder.Default
private MailProviderType providerType = MailProviderType.GENERIC;
@Column(name = "account_key", length = 255)
private String accountKey;
@Column(name = "folder_key", length = 255)
private String folderKey;
@Column(name = "provider_message_key", length = 500)
private String providerMessageKey;
@Column(name = "provider_thread_key", length = 500)
private String providerThreadKey;
@Column(name = "message_id", length = 1000)
private String messageId;
@Column(name = "in_reply_to", length = 1000)
private String inReplyTo;
@Column(name = "references_header", columnDefinition = "TEXT")
private String referencesHeader;
@Column(name = "thread_key", length = 1000)
private String threadKey;
@Column(name = "subject", length = 1000)
private String subject;
@Column(name = "normalized_subject", length = 1000)
private String normalizedSubject;
@Column(name = "from_display_name", length = 500)
private String fromDisplayName;
@Column(name = "from_email", length = 500)
private String fromEmail;
@Column(name = "from_raw", length = 1000)
private String fromRaw;
@Column(name = "reply_to_raw", columnDefinition = "TEXT")
private String replyToRaw;
@Column(name = "sent_at")
private OffsetDateTime sentAt;
@Column(name = "received_at")
private OffsetDateTime receivedAt;
@Column(name = "raw_message_hash", length = 64)
private String rawMessageHash;
@Column(name = "raw_header_hash", length = 64)
private String rawHeaderHash;
@Builder.Default
@Column(name = "created_at", nullable = false, updatable = false)
private OffsetDateTime createdAt = OffsetDateTime.now();
@Builder.Default
@Column(name = "updated_at", nullable = false)
private OffsetDateTime updatedAt = OffsetDateTime.now();
@PrePersist
protected void onCreate() {
createdAt = OffsetDateTime.now();
updatedAt = OffsetDateTime.now();
}
@PreUpdate
protected void onUpdate() {
updatedAt = OffsetDateTime.now();
}
}

@ -0,0 +1,67 @@
package at.procon.dip.domain.document.entity;
import at.procon.dip.architecture.SchemaNames;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Index;
import jakarta.persistence.PrePersist;
import jakarta.persistence.Table;
import java.time.OffsetDateTime;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Entity
@Table(schema = SchemaNames.DOC, name = "doc_mail_recipient", indexes = {
@Index(name = "idx_doc_mail_recipient_mail", columnList = "mail_document_id"),
@Index(name = "idx_doc_mail_recipient_type", columnList = "recipient_type"),
@Index(name = "idx_doc_mail_recipient_email", columnList = "email_address")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class DocumentMailRecipient {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
@Column(name = "mail_document_id", nullable = false)
private UUID mailDocumentId;
@Enumerated(EnumType.STRING)
@Column(name = "recipient_type", nullable = false, length = 16)
private MailRecipientType recipientType;
@Column(name = "display_name", length = 500)
private String displayName;
@Column(name = "email_address", length = 500)
private String emailAddress;
@Column(name = "raw_value", columnDefinition = "TEXT")
private String rawValue;
@Column(name = "sort_order", nullable = false)
@Builder.Default
private int sortOrder = 0;
@Builder.Default
@Column(name = "created_at", nullable = false, updatable = false)
private OffsetDateTime createdAt = OffsetDateTime.now();
@PrePersist
protected void onCreate() {
createdAt = OffsetDateTime.now();
}
}

@ -0,0 +1,7 @@
package at.procon.dip.domain.document.entity;
public enum MailRecipientType {
TO,
CC,
BCC
}

@ -0,0 +1,16 @@
package at.procon.dip.domain.document.repository;
import at.procon.dip.domain.document.entity.DocumentMailAttachment;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.springframework.data.jpa.repository.JpaRepository;
public interface DocumentMailAttachmentRepository extends JpaRepository<DocumentMailAttachment, UUID> {
List<DocumentMailAttachment> findByMailDocumentId(UUID mailDocumentId);
Optional<DocumentMailAttachment> findByMailDocumentIdAndAttachmentIndex(UUID mailDocumentId, Integer attachmentIndex);
Optional<DocumentMailAttachment> findByMailDocumentIdAndPartPath(UUID mailDocumentId, String partPath);
}

@ -0,0 +1,17 @@
package at.procon.dip.domain.document.repository;
import at.procon.dip.domain.document.entity.DocumentMailMessage;
import at.procon.dip.ingestion.mail.MailProviderType;
import java.util.Optional;
import java.util.UUID;
import org.springframework.data.jpa.repository.JpaRepository;
public interface DocumentMailMessageRepository extends JpaRepository<DocumentMailMessage, UUID> {
Optional<DocumentMailMessage> findByProviderTypeAndAccountKeyAndFolderKeyAndProviderMessageKey(
MailProviderType providerType,
String accountKey,
String folderKey,
String providerMessageKey
);
}

@ -0,0 +1,13 @@
package at.procon.dip.domain.document.repository;
import at.procon.dip.domain.document.entity.DocumentMailRecipient;
import java.util.List;
import java.util.UUID;
import org.springframework.data.jpa.repository.JpaRepository;
public interface DocumentMailRecipientRepository extends JpaRepository<DocumentMailRecipient, UUID> {
List<DocumentMailRecipient> findByMailDocumentId(UUID mailDocumentId);
void deleteByMailDocumentId(UUID mailDocumentId);
}

@ -14,4 +14,6 @@ public interface DocumentSourceRepository extends JpaRepository<DocumentSource,
List<DocumentSource> findBySourceType(SourceType sourceType);
Optional<DocumentSource> findByExternalSourceId(String externalSourceId);
Optional<DocumentSource> findBySourceTypeAndExternalSourceId(SourceType sourceType, String externalSourceId);
}

@ -52,7 +52,7 @@ public class TedNoticeOrganization {
@Column(name = "name", columnDefinition = "TEXT")
private String name;
@Column(name = "company_id", length = 1000)
@Column(name = "company_id", columnDefinition = "TEXT")
private String companyId;
@Column(name = "country_code", columnDefinition = "TEXT")

@ -115,7 +115,7 @@ public class TedNoticeProjection {
@Column(name = "buyer_city", columnDefinition = "TEXT")
private String buyerCity;
@Column(name = "buyer_postal_code", length = 100)
@Column(name = "buyer_postal_code", columnDefinition = "TEXT")
private String buyerPostalCode;
@Column(name = "buyer_nuts_code", length = 10)

@ -0,0 +1,25 @@
package at.procon.dip.embedding.config;
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class EmbeddingJobProcessingConfiguration {
@Bean(name = "embeddingJobProcessingExecutor")
public Executor embeddingJobProcessingExecutor(EmbeddingProperties properties) {
int parallelBatchCount = Math.max(1, properties.getJobs().getParallelBatchCount());
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("embedding-job-");
executor.setCorePoolSize(parallelBatchCount);
executor.setMaxPoolSize(parallelBatchCount);
executor.setQueueCapacity(parallelBatchCount);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
}
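
How a pool sized like this behaves under fan-out can be sketched with plain JDK types. The class `ParallelBatchSketch` is hypothetical and uses a `ThreadPoolExecutor` directly instead of Spring's `ThreadPoolTaskExecutor`; the per-batch work is replaced by a counter.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ParallelBatchSketch {

    /** Runs every batch on a bounded pool, waits for all of them, and returns the number of items handled. */
    static int runAll(List<List<String>> batches, int parallelBatchCount) {
        int threads = Math.max(1, parallelBatchCount);
        // Same shape as the bean above: core size == max size == queue capacity.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(threads));
        AtomicInteger handled = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> futures = batches.stream()
                    .map(batch -> CompletableFuture.runAsync(() -> handled.addAndGet(batch.size()), executor))
                    .toList();
            // join() rethrows a batch failure wrapped in CompletionException.
            futures.forEach(CompletableFuture::join);
        } finally {
            executor.shutdown();
        }
        return handled.get();
    }
}
```

A pool with this shape accepts at most twice `parallelBatchCount` tasks at once (running plus queued) and rejects further submissions, so the caller should not submit more batches than that in one run.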

@ -67,6 +67,7 @@ public class EmbeddingProperties {
public static class JobsProperties {
private boolean enabled = false;
private int batchSize = 16;
private int parallelBatchCount = 1;
private boolean processInBatches = false;
private int executionBatchSize = 8;
private int maxRetries = 5;

@ -3,15 +3,12 @@ package at.procon.dip.embedding.job.repository;
import at.procon.dip.embedding.job.entity.EmbeddingJob;
import at.procon.dip.embedding.model.EmbeddingJobStatus;
import at.procon.dip.embedding.model.EmbeddingJobType;
import jakarta.persistence.LockModeType;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
@ -23,11 +20,17 @@ public interface EmbeddingJobRepository extends JpaRepository<EmbeddingJob, UUID
EmbeddingJobType jobType,
Collection<EmbeddingJobStatus> statuses);
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT j FROM EmbeddingJob j WHERE j.status IN :statuses AND (j.nextRetryAt IS NULL OR j.nextRetryAt <= :now) ORDER BY j.priority DESC, j.createdAt ASC")
List<EmbeddingJob> findReadyJobsForUpdate(@Param("statuses") Collection<EmbeddingJobStatus> statuses,
@Param("now") OffsetDateTime now,
Pageable pageable);
@Query(value = """
SELECT *
FROM DOC.doc_embedding_job j
WHERE j.status IN ('PENDING', 'RETRY_SCHEDULED')
AND (j.next_retry_at IS NULL OR j.next_retry_at <= :now)
ORDER BY j.priority DESC, j.created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT :limit
""", nativeQuery = true)
List<EmbeddingJob> claimReadyJobs(@Param("now") OffsetDateTime now,
@Param("limit") int limit);
List<EmbeddingJob> findByDocumentId(UUID documentId);
}

@ -15,8 +15,8 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Service
@ -73,12 +73,9 @@ public class EmbeddingJobService {
.build()));
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<EmbeddingJob> claimNextReadyJobs(int limit) {
List<EmbeddingJob> jobs = jobRepository.findReadyJobsForUpdate(
Set.of(EmbeddingJobStatus.PENDING, EmbeddingJobStatus.RETRY_SCHEDULED),
OffsetDateTime.now(),
PageRequest.of(0, limit)
);
List<EmbeddingJob> jobs = jobRepository.claimReadyJobs(OffsetDateTime.now(), limit);
jobs.forEach(job -> {
job.setStatus(EmbeddingJobStatus.IN_PROGRESS);
job.setAttemptCount(job.getAttemptCount() + 1);

@ -0,0 +1,52 @@
package at.procon.dip.embedding.service;
import at.procon.dip.embedding.job.service.EmbeddingJobService;
import at.procon.dip.embedding.model.EmbeddingProviderResult;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Service
@RequiredArgsConstructor
public class EmbeddingJobExecutionPersistenceService {
private final EmbeddingPersistenceService embeddingPersistenceService;
private final EmbeddingJobService jobService;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public UUID startProcessing(UUID representationId, String modelKey) {
var embedding = embeddingPersistenceService.ensurePending(representationId, modelKey);
embeddingPersistenceService.markProcessing(embedding.getId());
return embedding.getId();
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void completeJob(UUID embeddingId,
EmbeddingProviderResult result,
UUID jobId,
String providerRequestId) {
embeddingPersistenceService.saveCompleted(embeddingId, result);
jobService.markCompleted(jobId, providerRequestId);
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void completeJob(UUID embeddingId,
float[] vector,
Integer tokenCount,
UUID jobId,
String providerRequestId) {
embeddingPersistenceService.saveCompleted(embeddingId, vector, tokenCount);
jobService.markCompleted(jobId, providerRequestId);
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void failJob(UUID embeddingId,
UUID jobId,
String errorMessage,
boolean retryable) {
embeddingPersistenceService.markFailed(embeddingId, errorMessage);
jobService.markFailed(jobId, errorMessage, retryable);
}
}

@ -1,6 +1,5 @@
package at.procon.dip.embedding.service;
import at.procon.dip.domain.document.entity.DocumentEmbedding;
import at.procon.dip.domain.document.entity.DocumentTextRepresentation;
import at.procon.dip.domain.document.repository.DocumentTextRepresentationRepository;
import at.procon.dip.embedding.config.EmbeddingProperties;
@ -17,35 +16,51 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@RequiredArgsConstructor
@Slf4j
public class RepresentationEmbeddingOrchestrator {
private final EmbeddingJobService jobService;
private final EmbeddingExecutionService executionService;
private final EmbeddingPersistenceService persistenceService;
private final EmbeddingJobExecutionPersistenceService executionPersistenceService;
private final DocumentTextRepresentationRepository representationRepository;
private final EmbeddingSelectionPolicy selectionPolicy;
private final EmbeddingModelRegistry modelRegistry;
private final EmbeddingProperties embeddingProperties;
private final Executor embeddingJobProcessingExecutor;
public RepresentationEmbeddingOrchestrator(EmbeddingJobService jobService,
EmbeddingExecutionService executionService,
EmbeddingJobExecutionPersistenceService executionPersistenceService,
DocumentTextRepresentationRepository representationRepository,
EmbeddingSelectionPolicy selectionPolicy,
EmbeddingModelRegistry modelRegistry,
EmbeddingProperties embeddingProperties,
@Qualifier("embeddingJobProcessingExecutor") Executor embeddingJobProcessingExecutor) {
this.jobService = jobService;
this.executionService = executionService;
this.executionPersistenceService = executionPersistenceService;
this.representationRepository = representationRepository;
this.selectionPolicy = selectionPolicy;
this.modelRegistry = modelRegistry;
this.embeddingProperties = embeddingProperties;
this.embeddingJobProcessingExecutor = embeddingJobProcessingExecutor;
}
@Transactional
public List<EmbeddingJob> enqueueDocument(UUID documentId) {
return jobService.enqueueForDocument(documentId);
}
@Transactional
public List<EmbeddingJob> enqueueDocument(UUID documentId, String modelKey) {
return jobService.enqueueForDocument(documentId, modelKey);
}
@Transactional
public List<EmbeddingJob> enqueueDocument(UUID documentId, String modelKey, EmbeddingProfile profile) {
var model = modelRegistry.getRequired(modelKey);
return selectionPolicy.selectRepresentations(documentId, model, profile).stream()
@ -53,32 +68,34 @@ public class RepresentationEmbeddingOrchestrator {
.toList();
}
@Transactional
public EmbeddingJob enqueueRepresentation(UUID documentId, UUID representationId, String modelKey) {
return jobService.enqueueForRepresentation(documentId, representationId, modelKey, EmbeddingJobType.DOCUMENT_EMBED);
}
@Transactional
public int processNextReadyBatch() {
if (!embeddingProperties.isEnabled() || !embeddingProperties.getJobs().isEnabled()) {
log.debug("New embedding subsystem jobs are disabled");
return 0;
}
List<EmbeddingJob> jobs = jobService.claimNextReadyJobs(embeddingProperties.getJobs().getBatchSize());
int batchSize = Math.max(1, embeddingProperties.getJobs().getBatchSize());
int parallelBatchCount = Math.max(1, embeddingProperties.getJobs().getParallelBatchCount());
int claimLimit = batchSize * parallelBatchCount;
List<EmbeddingJob> jobs = jobService.claimNextReadyJobs(claimLimit);
if (jobs.isEmpty()) {
return 0;
}
if (embeddingProperties.getJobs().isProcessInBatches()) {
processClaimedJobsInBatches(jobs);
List<List<EmbeddingJob>> claimedBatches = partition(jobs, batchSize);
if (claimedBatches.size() == 1) {
processClaimedJobBatch(claimedBatches.getFirst());
} else {
jobs.forEach(this::processClaimedJobSafely);
runClaimedBatchesInParallel(claimedBatches);
}
return jobs.size();
}
@Transactional
public void processClaimedJob(EmbeddingJob job) {
EmbeddingModelDescriptor model = modelRegistry.getRequired(job.getModelKey());
PreparedEmbedding prepared = prepareEmbedding(job, model);
@ -92,16 +109,43 @@ public class RepresentationEmbeddingOrchestrator {
EmbeddingUseCase.DOCUMENT,
List.of(prepared.text())
);
persistenceService.saveCompleted(prepared.embeddingId(), result);
jobService.markCompleted(job.getId(), result.providerRequestId());
executionPersistenceService.completeJob(
prepared.embeddingId(),
result,
job.getId(),
result.providerRequestId()
);
} catch (RuntimeException ex) {
persistenceService.markFailed(prepared.embeddingId(), ex.getMessage());
jobService.markFailed(job.getId(), ex.getMessage(), true);
executionPersistenceService.failJob(
prepared.embeddingId(),
job.getId(),
ex.getMessage(),
true
);
throw ex;
}
}
private void processClaimedJobsInBatches(List<EmbeddingJob> jobs) {
private void processClaimedJobBatch(List<EmbeddingJob> jobs) {
if (embeddingProperties.getJobs().isProcessInBatches()) {
processClaimedJobsInExecutionBatches(jobs);
} else {
jobs.forEach(this::processClaimedJobSafely);
}
}
private void runClaimedBatchesInParallel(List<List<EmbeddingJob>> claimedBatches) {
List<CompletableFuture<Void>> futures = claimedBatches.stream()
.map(batch -> CompletableFuture.runAsync(
() -> processClaimedJobBatch(batch),
embeddingJobProcessingExecutor
))
.toList();
futures.forEach(CompletableFuture::join);
}
private void processClaimedJobsInExecutionBatches(List<EmbeddingJob> jobs) {
LinkedHashMap<String, List<EmbeddingJob>> jobsByModelKey = new LinkedHashMap<>();
for (EmbeddingJob job : jobs) {
jobsByModelKey.computeIfAbsent(job.getModelKey(), ignored -> new ArrayList<>()).add(job);
@ -127,6 +171,14 @@ public class RepresentationEmbeddingOrchestrator {
}
}
private List<List<EmbeddingJob>> partition(List<EmbeddingJob> jobs, int batchSize) {
List<List<EmbeddingJob>> batches = new ArrayList<>();
for (int start = 0; start < jobs.size(); start += batchSize) {
batches.add(jobs.subList(start, Math.min(start + batchSize, jobs.size())));
}
return batches;
}
private void processClaimedBatchSafely(List<EmbeddingJob> jobs, EmbeddingModelDescriptor model) {
try {
processClaimedBatch(jobs, model);
@ -174,13 +226,22 @@ public class RepresentationEmbeddingOrchestrator {
for (int i = 0; i < preparedItems.size(); i++) {
PreparedEmbedding prepared = preparedItems.get(i);
persistenceService.saveCompleted(prepared.embeddingId(), result.vectors().get(i), null);
jobService.markCompleted(prepared.job().getId(), result.providerRequestId());
executionPersistenceService.completeJob(
prepared.embeddingId(),
result.vectors().get(i),
null,
prepared.job().getId(),
result.providerRequestId()
);
}
} catch (RuntimeException ex) {
for (PreparedEmbedding prepared : preparedItems) {
persistenceService.markFailed(prepared.embeddingId(), ex.getMessage());
jobService.markFailed(prepared.job().getId(), ex.getMessage(), true);
executionPersistenceService.failJob(
prepared.embeddingId(),
prepared.job().getId(),
ex.getMessage(),
true
);
}
throw ex;
}
@ -203,9 +264,8 @@ public class RepresentationEmbeddingOrchestrator {
text = text.substring(0, maxChars);
}
DocumentEmbedding embedding = persistenceService.ensurePending(representation.getId(), job.getModelKey());
persistenceService.markProcessing(embedding.getId());
return new PreparedEmbedding(job, embedding.getId(), text);
UUID embeddingId = executionPersistenceService.startProcessing(representation.getId(), job.getModelKey());
return new PreparedEmbedding(job, embeddingId, text);
}
private record PreparedEmbedding(EmbeddingJob job, UUID embeddingId, String text) {

@ -4,29 +4,37 @@ import at.procon.dip.domain.access.DocumentAccessContext;
import at.procon.dip.domain.access.DocumentVisibility;
import at.procon.dip.domain.document.RelationType;
import at.procon.dip.domain.document.SourceType;
import at.procon.dip.domain.document.entity.DocumentSource;
import at.procon.dip.domain.tenant.TenantRef;
import at.procon.dip.domain.document.repository.DocumentSourceRepository;
import at.procon.dip.domain.document.service.DocumentRelationService;
import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand;
import at.procon.dip.ingestion.config.DipIngestionProperties;
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
import at.procon.dip.ingestion.mail.MailImportIdentityResolver;
import at.procon.dip.ingestion.mail.MailProviderEnvelope;
import at.procon.dip.ingestion.mail.MailProviderEnvelopeAttributes;
import at.procon.dip.ingestion.service.GenericDocumentImportService;
import at.procon.dip.ingestion.service.MailMessageExtractionService;
import at.procon.dip.ingestion.service.MailMessageExtractionService.MailAttachment;
import at.procon.dip.ingestion.service.MailMessageExtractionService.ParsedMailMessage;
import at.procon.dip.ingestion.service.MailMetadataPersistenceService;
import at.procon.dip.ingestion.spi.DocumentIngestionAdapter;
import at.procon.dip.ingestion.spi.IngestionResult;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.dip.ingestion.util.DocumentImportSupport;
import at.procon.dip.ingestion.config.DipIngestionProperties;
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
import at.procon.dip.runtime.config.RuntimeMode;
import at.procon.ted.service.attachment.AttachmentExtractor;
import at.procon.ted.service.attachment.ZipExtractionService;
import at.procon.ted.util.HashUtils;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -42,6 +50,9 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter {
private final MailMessageExtractionService mailExtractionService;
private final DocumentRelationService relationService;
private final ZipExtractionService zipExtractionService;
private final MailImportIdentityResolver mailImportIdentityResolver;
private final MailMetadataPersistenceService mailMetadataPersistenceService;
private final DocumentSourceRepository documentSourceRepository;
@Override
public boolean supports(SourceDescriptor sourceDescriptor) {
@ -57,20 +68,28 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter {
throw new IllegalArgumentException("Mail adapter requires raw MIME bytes");
}
ParsedMailMessage parsed = mailExtractionService.parse(rawMime);
MailProviderEnvelope providerEnvelope = MailProviderEnvelopeAttributes.fromAttributes(sourceDescriptor.attributes());
DocumentAccessContext accessContext = sourceDescriptor.accessContext() == null ? defaultMailAccessContext() : sourceDescriptor.accessContext();
String rootSourceIdentifier = mailImportIdentityResolver.resolveRootSourceIdentifier(parsed, providerEnvelope, rawMime);
Map<String, String> rootAttributes = new LinkedHashMap<>(sourceDescriptor.attributes() == null ? Map.of() : sourceDescriptor.attributes());
Map<String, String> rootAttributes = MailProviderEnvelopeAttributes.merge(sourceDescriptor.attributes(), providerEnvelope);
if (parsed.subject() != null) rootAttributes.put("subject", parsed.subject());
if (parsed.from() != null) rootAttributes.put("from", parsed.from());
if (!parsed.recipients().isEmpty()) rootAttributes.put("to", String.join(", ", parsed.recipients()));
if (parsed.fromRaw() != null) rootAttributes.put("from", parsed.fromRaw());
List<String> toRecipients = parsed.recipients().stream()
.filter(recipient -> recipient.recipientType() == at.procon.dip.domain.document.entity.MailRecipientType.TO)
.map(MailMessageExtractionService.MailRecipient::rawValue)
.toList();
if (!toRecipients.isEmpty()) rootAttributes.put("to", String.join(", ", toRecipients));
rootAttributes.putIfAbsent("title", parsed.subject() != null ? parsed.subject() : sourceDescriptor.fileName());
rootAttributes.put("attachmentCount", Integer.toString(parsed.attachments().size()));
rootAttributes.put("importBatchId", properties.getMailImportBatchId());
if (parsed.messageId() != null) rootAttributes.put("messageId", parsed.messageId());
if (parsed.inReplyTo() != null) rootAttributes.put("inReplyTo", parsed.inReplyTo());
ImportedDocumentResult rootResult = importService.importDocument(new SourceDescriptor(
accessContext,
SourceType.MAIL,
sourceDescriptor.sourceIdentifier(),
rootSourceIdentifier,
sourceDescriptor.sourceUri(),
sourceDescriptor.fileName() != null ? sourceDescriptor.fileName() : fallbackMailFileName(parsed),
"message/rfc822",
@ -81,36 +100,65 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter {
rootAttributes
));
UUIDSource rootSource = resolveSourceId(SourceType.MAIL, rootSourceIdentifier);
mailMetadataPersistenceService.upsertMessageMetadata(
rootResult.document().getId(),
rootSource.sourceId(),
parsed,
providerEnvelope,
HashUtils.computeSha256(rawMime)
);
List<at.procon.dip.domain.document.CanonicalDocumentMetadata> documents = new ArrayList<>();
List<String> warnings = new ArrayList<>(rootResult.warnings());
documents.add(rootResult.document().toCanonicalMetadata());
int sortOrder = 0;
for (MailAttachment attachment : parsed.attachments()) {
importAttachment(rootResult.document().getId(), accessContext, sourceDescriptor, attachment, documents, warnings, ++sortOrder, 0);
importAttachment(rootResult.document().getId(), rootResult.document().getId(), rootSourceIdentifier, accessContext, sourceDescriptor, attachment, documents, warnings, ++sortOrder, 0);
}
return new IngestionResult(documents, warnings);
}
private void importAttachment(java.util.UUID parentDocumentId, DocumentAccessContext accessContext, SourceDescriptor parentSource,
MailAttachment attachment, List<at.procon.dip.domain.document.CanonicalDocumentMetadata> documents,
List<String> warnings, int sortOrder, int depth) {
private void importAttachment(java.util.UUID mailRootDocumentId,
java.util.UUID parentDocumentId,
String rootSourceIdentifier,
DocumentAccessContext accessContext,
SourceDescriptor parentSource,
MailAttachment attachment,
List<at.procon.dip.domain.document.CanonicalDocumentMetadata> documents,
List<String> warnings,
int sortOrder,
int depth) {
boolean expandableWrapper = properties.isExpandMailZipAttachments()
&& zipExtractionService.canHandle(attachment.fileName(), attachment.contentType());
String attachmentSourceIdentifier = mailImportIdentityResolver.resolveAttachmentSourceIdentifier(rootSourceIdentifier, attachment);
Map<String, String> attachmentAttributes = new LinkedHashMap<>();
attachmentAttributes.put("title", attachment.fileName());
attachmentAttributes.put("mailSourceIdentifier", parentSource.sourceIdentifier());
attachmentAttributes.put("mailRootSourceIdentifier", rootSourceIdentifier);
attachmentAttributes.put("importBatchId", properties.getMailImportBatchId());
if (attachment.partPath() != null) {
attachmentAttributes.put("mailPartPath", attachment.partPath());
}
if (attachment.attachmentIndex() != null) {
attachmentAttributes.put("mailAttachmentIndex", attachment.attachmentIndex().toString());
}
if (attachment.contentId() != null) {
attachmentAttributes.put("mailContentId", attachment.contentId());
}
if (attachment.disposition() != null) {
attachmentAttributes.put("mailDisposition", attachment.disposition());
}
if (expandableWrapper) {
attachmentAttributes.put("wrapperDocument", Boolean.TRUE.toString());
}
ImportedDocumentResult attachmentResult = importService.importDocument(new SourceDescriptor(
accessContext,
SourceType.MAIL,
parentSource.sourceIdentifier() + ":attachment:" + depth + ":" + attachment.fileName(),
SourceType.MAIL_ATTACHMENT,
attachmentSourceIdentifier,
parentSource.sourceUri(),
attachment.fileName(),
DocumentImportSupport.normalizeMediaType(attachment.contentType()),
@ -125,6 +173,13 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter {
RelationType relationType = depth > 0 || attachment.path() != null ? RelationType.EXTRACTED_FROM : RelationType.ATTACHMENT_OF;
relationService.ensureRelation(new CreateDocumentRelationCommand(
parentDocumentId, attachmentResult.document().getId(), relationType, sortOrder, attachment.fileName()));
mailMetadataPersistenceService.upsertAttachmentMetadata(
mailRootDocumentId,
attachmentResult.document().getId(),
attachment,
"IMPORTED",
null
);
if (expandableWrapper) {
AttachmentExtractor.ExtractionResult zipResult = zipExtractionService.extract(attachment.data(), attachment.fileName(), attachment.contentType());
@ -134,11 +189,26 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter {
}
int childSort = 0;
for (AttachmentExtractor.ChildAttachment child : zipResult.childAttachments()) {
importAttachment(attachmentResult.document().getId(), accessContext, parentSource,
new MailAttachment(child.filename(), child.contentType(), child.data(), child.data().length, child.pathInArchive()),
documents, warnings, ++childSort, depth + 1);
// Increment once per child so sortOrder advances even when the parent has no attachment index.
int childOrdinal = ++childSort;
importAttachment(mailRootDocumentId, attachmentResult.document().getId(), rootSourceIdentifier, accessContext, parentSource,
new MailAttachment(
child.filename(),
child.contentType(),
child.data(),
Long.valueOf(child.data().length),
child.pathInArchive(),
attachment.partPath() == null ? null : attachment.partPath() + "/" + child.pathInArchive(),
attachment.disposition(),
attachment.contentId(),
attachment.attachmentIndex() == null ? null : attachment.attachmentIndex() * 1000 + childOrdinal
),
documents, warnings, childOrdinal, depth + 1);
}
}
}
private UUIDSource resolveSourceId(SourceType sourceType, String sourceIdentifier) {
Optional<DocumentSource> source = documentSourceRepository.findBySourceTypeAndExternalSourceId(sourceType, sourceIdentifier);
return new UUIDSource(source.map(DocumentSource::getId).orElse(null));
}
private String fallbackMailFileName(ParsedMailMessage parsed) {
@ -170,4 +240,7 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter {
}
return null;
}
private record UUIDSource(java.util.UUID sourceId) {
}
}
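
For children extracted from a ZIP attachment, the adapter derives the child's part path as `parentPartPath + "/" + pathInArchive` and its attachment index as `parentIndex * 1000 + childOrdinal`. The sketch below reproduces only that arithmetic. `NestedAttachmentKeySketch` is a hypothetical name, and the overflow check is an addition for illustration, assuming the index is an `Integer` as the diff suggests.

```java
// Hypothetical helper that mirrors the two derivation expressions from the adapter.
final class NestedAttachmentKeySketch {

    // parentPartPath + "/" + pathInArchive, or null when the parent has no part path.
    static String childPartPath(String parentPartPath, String pathInArchive) {
        return parentPartPath == null ? null : parentPartPath + "/" + pathInArchive;
    }

    // parentIndex * 1000 + childOrdinal, or null when the parent has no index.
    // Unlike the plain expression in the diff, this throws on int overflow instead of wrapping.
    static Integer childIndex(Integer parentIndex, int childOrdinal) {
        if (parentIndex == null) {
            return null;
        }
        return Math.addExact(Math.multiplyExact(parentIndex, 1000), childOrdinal);
    }
}
```

Two properties of the scheme follow from the arithmetic: indexes stay unique only while an archive has fewer than 1000 children, and with `int` values a third level of nesting can already exceed `Integer.MAX_VALUE`.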

@ -0,0 +1,207 @@
package at.procon.dip.ingestion.camel;
import at.procon.dip.domain.document.SourceType;
import at.procon.dip.ingestion.config.DipIngestionProperties;
import at.procon.dip.ingestion.mail.MailServerEndpointUriFactory;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.dip.ingestion.service.DocumentIngestionGateway;
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
import at.procon.dip.runtime.config.RuntimeMode;
import jakarta.mail.Message;
import java.io.ByteArrayOutputStream;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mail.MailMessage;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
@ConditionalOnRuntimeMode(RuntimeMode.NEW)
@RequiredArgsConstructor
@Slf4j
public class GenericMailIngestionRoute extends RouteBuilder {
static final String ROUTE_ID_MAIL_CONSUMER = "dip-mail-consumer";
static final String ROUTE_ID_MAIL_INGEST = "dip-mail-ingest";
private final DipIngestionProperties properties;
private final DocumentIngestionGateway ingestionGateway;
private final MailServerEndpointUriFactory endpointUriFactory;
@Override
public void configure() {
if (!properties.isEnabled() || !properties.isMailAdapterEnabled() || !properties.getMailRoute().isEnabled()) {
log.info("NEW mail ingestion Camel route disabled");
return;
}
String consumerUri = endpointUriFactory.buildConsumerUri(properties.getMailRoute());
log.info("Configuring NEW mail ingestion route (protocol={}, host={}, folder={}, user={})",
properties.getMailRoute().getProtocol(),
properties.getMailRoute().getHost(),
properties.getMailRoute().getFolderName(),
properties.getMailRoute().getUsername());
errorHandler(deadLetterChannel("direct:dip-mail-error")
.maximumRedeliveries(3)
.redeliveryDelay(5000)
.retryAttemptedLogLevel(LoggingLevel.WARN)
.logStackTrace(true));
from("direct:dip-mail-error")
.routeId("dip-mail-error")
.process(exchange -> {
Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
String subject = exchange.getIn().getHeader("mailSubject", String.class);
if (exception != null) {
log.error("NEW mail ingestion failed for subject '{}': {}", subject, exception.getMessage(), exception);
}
});
from(consumerUri)
.routeId(ROUTE_ID_MAIL_CONSUMER)
.to("direct:dip-mail-ingest");
from("direct:dip-mail-ingest")
.routeId(ROUTE_ID_MAIL_INGEST)
.process(this::ingestMailExchange)
.log(LoggingLevel.INFO, "Imported mail message into NEW ingestion pipeline: ${header.mailSubject}");
}
private void ingestMailExchange(Exchange exchange) throws Exception {
Message mailMessage = resolveMessage(exchange);
if (mailMessage == null) {
log.warn("Received null mail message, skipping NEW mail ingestion");
return;
}
String subject = mailMessage.getSubject();
String from = mailMessage.getFrom() != null && mailMessage.getFrom().length > 0
? mailMessage.getFrom()[0].toString() : null;
String messageId = firstHeader(mailMessage, "Message-ID");
String providerMessageKey = firstNonBlankHeader(exchange, "CamelMailUid", "uid", "mailUid", "MailUid");
String providerThreadKey = firstNonBlankHeader(exchange, "CamelMailThreadId", "mailThreadId", "MailThreadId");
exchange.getIn().setHeader("mailSubject", subject);
byte[] rawMime;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
mailMessage.writeTo(baos);
rawMime = baos.toByteArray();
}
Map<String, String> attributes = new LinkedHashMap<>();
putIfHasText(attributes, "subject", subject);
putIfHasText(attributes, "from", from);
putIfHasText(attributes, "providerType", properties.getMailRoute().getProtocol().name());
putIfHasText(attributes, "providerProtocol", properties.getMailRoute().getProtocol().camelScheme());
putIfHasText(attributes, "providerAccountKey", accountKey());
putIfHasText(attributes, "providerFolderKey", properties.getMailRoute().getFolderName());
putIfHasText(attributes, "providerMessageKey", providerMessageKey);
putIfHasText(attributes, "providerThreadKey", providerThreadKey);
putIfHasText(attributes, "messageId", messageId);
SourceDescriptor descriptor = new SourceDescriptor(
null,
SourceType.MAIL,
buildSourceIdentifier(providerMessageKey, messageId),
buildSourceUri(providerMessageKey),
fallbackMailFileName(subject),
"message/rfc822",
rawMime,
null,
mailMessage.getReceivedDate() == null
? OffsetDateTime.now()
: mailMessage.getReceivedDate().toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime(),
OriginalContentStoragePolicy.DEFAULT,
attributes
);
ingestionGateway.ingest(descriptor);
}
private Message resolveMessage(Exchange exchange) {
MailMessage camelMailMessage = exchange.getIn().getBody(MailMessage.class);
if (camelMailMessage != null) {
return camelMailMessage.getMessage();
}
return exchange.getIn().getBody(Message.class);
}
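// Resolution order: provider message key (scoped by protocol, account and folder), then Message-ID, then a random UUID.
private String buildSourceIdentifier(String providerMessageKey, String messageId) {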
if (StringUtils.hasText(providerMessageKey)) {
return properties.getMailRoute().getProtocol().name() + ":" + accountKey() + ":"
+ defaultIfBlank(properties.getMailRoute().getFolderName(), "INBOX") + ":" + providerMessageKey;
}
if (StringUtils.hasText(messageId)) {
return messageId;
}
return UUID.randomUUID().toString();
}
private String buildSourceUri(String providerMessageKey) {
StringBuilder uri = new StringBuilder();
uri.append(properties.getMailRoute().getProtocol().camelScheme())
.append("://")
.append(properties.getMailRoute().getHost())
.append("/")
.append(defaultIfBlank(properties.getMailRoute().getFolderName(), "INBOX"));
if (StringUtils.hasText(providerMessageKey)) {
uri.append("#").append(providerMessageKey);
}
return uri.toString();
}
private String fallbackMailFileName(String subject) {
String safeSubject = !StringUtils.hasText(subject) ? "mail-message" : subject.replaceAll("[^A-Za-z0-9._-]", "_");
return safeSubject + ".eml";
}
private String firstHeader(Message message, String name) {
try {
String[] values = message.getHeader(name);
return values != null && values.length > 0 ? values[0] : null;
} catch (Exception e) {
return null;
}
}
private String firstNonBlankHeader(Exchange exchange, String... names) {
for (String name : names) {
String value = exchange.getIn().getHeader(name, String.class);
if (StringUtils.hasText(value)) {
return value;
}
Object objectValue = exchange.getIn().getHeader(name);
if (objectValue != null) {
value = String.valueOf(objectValue);
if (StringUtils.hasText(value)) {
return value;
}
}
}
return null;
}
private String accountKey() {
return defaultIfBlank(properties.getMailRoute().getAccountKey(), properties.getMailRoute().getUsername());
}
private void putIfHasText(Map<String, String> attributes, String key, String value) {
if (StringUtils.hasText(value)) {
attributes.put(key, value);
}
}
private String defaultIfBlank(String value, String defaultValue) {
return StringUtils.hasText(value) ? value : defaultValue;
}
}
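
The route's identifier fallback and file-name sanitization can be tried in isolation. This is a minimal sketch that copies the logic of `buildSourceIdentifier` and `fallbackMailFileName` into static methods; `RouteIdentifierSketch` is a hypothetical name, and `hasText` only approximates Spring's `StringUtils.hasText`.

```java
import java.util.UUID;

// Hypothetical, dependency-free copy of two helpers from GenericMailIngestionRoute.
final class RouteIdentifierSketch {

    // Approximation of Spring's StringUtils.hasText.
    static boolean hasText(String value) {
        return value != null && !value.isBlank();
    }

    // Provider key first, then Message-ID, then a random UUID.
    static String sourceIdentifier(String protocol, String accountKey, String folder,
                                   String providerMessageKey, String messageId) {
        if (hasText(providerMessageKey)) {
            return protocol + ":" + accountKey + ":" + (hasText(folder) ? folder : "INBOX")
                    + ":" + providerMessageKey;
        }
        if (hasText(messageId)) {
            return messageId;
        }
        return UUID.randomUUID().toString();
    }

    // Replaces every character outside [A-Za-z0-9._-] with '_' and appends ".eml".
    static String fallbackFileName(String subject) {
        String safe = hasText(subject) ? subject.replaceAll("[^A-Za-z0-9._-]", "_") : "mail-message";
        return safe + ".eml";
    }
}
```

The random-UUID branch yields a different identifier each time the same message is polled. The adapter-side `MailImportIdentityResolver` in this commit falls back to a SHA-256 of the raw MIME bytes instead, which stays stable across polls.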

@ -1,8 +1,7 @@
package at.procon.dip.ingestion.config;
import at.procon.dip.domain.access.DocumentVisibility;
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
import at.procon.dip.runtime.config.RuntimeMode;
import at.procon.dip.ingestion.mail.MailServerProtocol;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Positive;
import lombok.Data;
@ -46,6 +45,7 @@ public class DipIngestionProperties {
private boolean tedPackageAdapterEnabled = true;
private boolean mailAdapterEnabled = false;
private MailRouteProperties mailRoute = new MailRouteProperties();
private String mailDefaultOwnerTenantKey;
private DocumentVisibility mailDefaultVisibility = DocumentVisibility.TENANT;
@ -58,4 +58,35 @@ public class DipIngestionProperties {
@NotBlank
private String mailImportBatchId = "phase41-mail";
@Data
public static class MailRouteProperties {
private boolean enabled = false;
private MailServerProtocol protocol = MailServerProtocol.IMAPS;
private String host;
private Integer port;
private String username;
private String password;
private String folderName = "INBOX";
private String accountKey;
private boolean delete = false;
private boolean ssl = true;
/**
* true = fetch only unseen messages
*/
private boolean unseen = true;
/**
* true = do not mark messages as seen while consuming
*/
private boolean peek = true;
@Positive
private long delay = 15000;
@Positive
private int maxMessagesPerPoll = 20;
private int fetchSize = -1;
private boolean closeFolder = false;
private boolean debugMode = false;
private int connectionTimeout = 30000;
private int consumerConnectionTimeout = 30000;
}
}

@ -0,0 +1,58 @@
package at.procon.dip.ingestion.mail;
import at.procon.dip.ingestion.config.DipIngestionProperties;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class CamelMailServerEndpointUriFactory implements MailServerEndpointUriFactory {
@Override
public String buildConsumerUri(DipIngestionProperties.MailRouteProperties properties) {
if (properties == null) {
throw new IllegalArgumentException("Mail route properties must not be null");
}
if (!StringUtils.hasText(properties.getHost())) {
throw new IllegalArgumentException("dip.ingestion.mail-route.host must not be blank when the mail route is enabled");
}
if (!StringUtils.hasText(properties.getUsername())) {
throw new IllegalArgumentException("dip.ingestion.mail-route.username must not be blank when the mail route is enabled");
}
if (properties.getProtocol() == null) {
throw new IllegalArgumentException("dip.ingestion.mail-route.protocol must not be null");
}
StringBuilder uri = new StringBuilder();
uri.append(properties.getProtocol().camelScheme()).append("://");
uri.append(properties.getHost());
if (properties.getPort() != null) {
uri.append(":").append(properties.getPort());
}
uri.append("?username=").append(encode(properties.getUsername()));
uri.append("&password=").append(encode(properties.getPassword()));
uri.append("&folderName=").append(encode(defaultIfBlank(properties.getFolderName(), "INBOX")));
uri.append("&delete=").append(properties.isDelete());
uri.append("&peek=").append(properties.isPeek());
uri.append("&unseen=").append(properties.isUnseen());
uri.append("&delay=").append(properties.getDelay());
uri.append("&maxMessagesPerPoll=").append(properties.getMaxMessagesPerPoll());
uri.append("&fetchSize=").append(properties.getFetchSize());
uri.append("&closeFolder=").append(properties.isCloseFolder());
uri.append("&debugMode=").append(properties.isDebugMode());
uri.append("&connectionTimeout=").append(properties.getConnectionTimeout());
return uri.toString();
}
// Form-encodes the value as UTF-8 (spaces become '+'); null is mapped to an empty string.
private String encode(String value) {
if (value == null) {
return "";
}
return URLEncoder.encode(value, StandardCharsets.UTF_8);
}
private String defaultIfBlank(String value, String defaultValue) {
return StringUtils.hasText(value) ? value : defaultValue;
}
}
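
To see what the factory produces for credentials with special characters, the core of `buildConsumerUri` can be reduced to a few lines. This is a sketch under assumptions: `MailUriSketch` is a hypothetical name, only the first three query parameters are reproduced, and the host and credentials are made-up sample values.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical, reduced copy of the URI assembly in CamelMailServerEndpointUriFactory.
final class MailUriSketch {

    // Same behavior as the factory's encode(): null becomes "", everything else is form-encoded.
    static String encode(String value) {
        return value == null ? "" : URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    // Builds scheme://host[:port]?username=...&password=...&folderName=...
    static String consumerUri(String scheme, String host, Integer port,
                              String username, String password, String folderName) {
        StringBuilder uri = new StringBuilder(scheme).append("://").append(host);
        if (port != null) {
            uri.append(':').append(port);
        }
        String folder = folderName == null || folderName.isBlank() ? "INBOX" : folderName;
        uri.append("?username=").append(encode(username))
           .append("&password=").append(encode(password))
           .append("&folderName=").append(encode(folder));
        return uri.toString();
    }
}
```

`URLEncoder` applies form encoding, so a space in a password or folder name becomes `+`. Camel documents a `RAW(...)` syntax for endpoint values that must be passed through unchanged; whether the `+` form decodes as intended is worth verifying against the Camel version in use. Note also that the `ssl` and `consumerConnectionTimeout` properties declared in `MailRouteProperties` are not appended to the URI by the factory as shown in this diff.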

@ -0,0 +1,22 @@
package at.procon.dip.ingestion.mail;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* Default implementation of the generic provider metadata contract.
*/
public record GenericMailProviderEnvelope(
MailProviderType providerType,
String accountKey,
String folderKey,
String providerMessageKey,
String providerThreadKey,
Map<String, String> providerAttributes
) implements MailProviderEnvelope {
public GenericMailProviderEnvelope {
providerType = providerType == null ? MailProviderType.GENERIC : providerType;
providerAttributes = providerAttributes == null ? Map.of() : Map.copyOf(new LinkedHashMap<>(providerAttributes));
}
}

@ -0,0 +1,52 @@
package at.procon.dip.ingestion.mail;
import at.procon.dip.ingestion.service.MailMessageExtractionService;
import at.procon.ted.util.HashUtils;
import java.util.Locale;
import org.springframework.stereotype.Service;
@Service
public class MailImportIdentityResolver {
public String resolveRootSourceIdentifier(MailMessageExtractionService.ParsedMailMessage parsed,
MailProviderEnvelope envelope,
byte[] rawMime) {
if (envelope != null && hasText(envelope.providerMessageKey())) {
return "mail:" + envelope.providerType().name().toLowerCase(Locale.ROOT)
+ ":" + normalizeIdentityFragment(envelope.accountKey())
+ ":" + normalizeIdentityFragment(envelope.folderKey())
+ ":" + normalizeIdentityFragment(envelope.providerMessageKey());
}
if (hasText(parsed.messageId())) {
return "mail:message-id:" + normalizeIdentityFragment(parsed.messageId());
}
return "mail:raw:" + HashUtils.computeSha256(rawMime);
}
public String resolveAttachmentSourceIdentifier(String rootSourceIdentifier,
MailMessageExtractionService.MailAttachment attachment) {
if (hasText(attachment.partPath())) {
return rootSourceIdentifier + ":part:" + normalizeIdentityFragment(attachment.partPath());
}
if (attachment.attachmentIndex() != null) {
return rootSourceIdentifier + ":attachment:" + attachment.attachmentIndex();
}
return rootSourceIdentifier + ":attachment:"
+ normalizeIdentityFragment(attachment.fileName())
+ ":" + HashUtils.computeSha256(attachment.data());
}
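// Trims the value, strips one leading '<' and one trailing '>' (Message-ID style), collapses whitespace runs to '_',
// and replaces any character outside [A-Za-z0-9._:@/+-] with '_'. Null or blank input becomes "_".
private String normalizeIdentityFragment(String raw) {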
if (raw == null || raw.isBlank()) {
return "_";
}
return raw.trim()
.replaceAll("^<|>$", "")
.replaceAll("\\s+", "_")
.replaceAll("[^A-Za-z0-9._:@/+\\-]", "_");
}
private boolean hasText(String value) {
return value != null && !value.isBlank();
}
}
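
A few worked inputs make the normalization rules easier to check. The sketch below copies `normalizeIdentityFragment` and the provider-key branch of `resolveRootSourceIdentifier` into static methods; `IdentityFragmentSketch` is a hypothetical name and the sample values are made up.

```java
import java.util.Locale;

// Hypothetical, dependency-free copy of the normalization used by MailImportIdentityResolver.
final class IdentityFragmentSketch {

    // Same three replacement steps as in the diff, applied in the same order.
    static String normalize(String raw) {
        if (raw == null || raw.isBlank()) {
            return "_";
        }
        return raw.trim()
                .replaceAll("^<|>$", "")
                .replaceAll("\\s+", "_")
                .replaceAll("[^A-Za-z0-9._:@/+\\-]", "_");
    }

    // mail:<providerType>:<account>:<folder>:<messageKey>, each fragment normalized.
    static String rootIdentifier(String providerType, String account, String folder, String messageKey) {
        return "mail:" + providerType.toLowerCase(Locale.ROOT)
                + ":" + normalize(account)
                + ":" + normalize(folder)
                + ":" + normalize(messageKey);
    }
}
```

Because `:` is in the allowed character set and is also the separator, a fragment that itself contains a colon passes through unchanged. Two different account/folder combinations could therefore, in principle, produce the same composite identifier.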

@ -0,0 +1,22 @@
package at.procon.dip.ingestion.mail;
import java.util.Map;
/**
* Generic provider metadata for a mail message fetched from an upstream mail system.
* Implementations may be backed by IMAP, EWS, Graph, Gmail API, filesystem replay, etc.
*/
public interface MailProviderEnvelope {
MailProviderType providerType();
String accountKey();
String folderKey();
String providerMessageKey();
String providerThreadKey();
Map<String, String> providerAttributes();
}

@ -0,0 +1,81 @@
package at.procon.dip.ingestion.mail;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* Encodes/decodes generic mail provider metadata into SourceDescriptor attributes.
*/
public final class MailProviderEnvelopeAttributes {
public static final String KEY_PROVIDER_TYPE = "mail.providerType";
public static final String KEY_ACCOUNT_KEY = "mail.accountKey";
public static final String KEY_FOLDER_KEY = "mail.folderKey";
public static final String KEY_PROVIDER_MESSAGE_KEY = "mail.providerMessageKey";
public static final String KEY_PROVIDER_THREAD_KEY = "mail.providerThreadKey";
public static final String KEY_PREFIX_PROVIDER_ATTRIBUTE = "mail.providerAttribute.";
private MailProviderEnvelopeAttributes() {
}
public static GenericMailProviderEnvelope fromAttributes(Map<String, String> attributes) {
if (attributes == null || attributes.isEmpty()) {
return new GenericMailProviderEnvelope(MailProviderType.GENERIC, null, null, null, null, Map.of());
}
MailProviderType providerType = parseProviderType(attributes.get(KEY_PROVIDER_TYPE));
Map<String, String> providerAttributes = new LinkedHashMap<>();
for (Map.Entry<String, String> entry : attributes.entrySet()) {
if (entry.getKey() != null && entry.getKey().startsWith(KEY_PREFIX_PROVIDER_ATTRIBUTE)) {
providerAttributes.put(entry.getKey().substring(KEY_PREFIX_PROVIDER_ATTRIBUTE.length()), entry.getValue());
}
}
return new GenericMailProviderEnvelope(
providerType,
emptyToNull(attributes.get(KEY_ACCOUNT_KEY)),
emptyToNull(attributes.get(KEY_FOLDER_KEY)),
emptyToNull(attributes.get(KEY_PROVIDER_MESSAGE_KEY)),
emptyToNull(attributes.get(KEY_PROVIDER_THREAD_KEY)),
providerAttributes
);
}
public static Map<String, String> merge(Map<String, String> baseAttributes, MailProviderEnvelope envelope) {
Map<String, String> merged = new LinkedHashMap<>();
if (baseAttributes != null && !baseAttributes.isEmpty()) {
merged.putAll(baseAttributes);
}
if (envelope == null) {
return merged;
}
merged.put(KEY_PROVIDER_TYPE, envelope.providerType().name());
putIfHasText(merged, KEY_ACCOUNT_KEY, envelope.accountKey());
putIfHasText(merged, KEY_FOLDER_KEY, envelope.folderKey());
putIfHasText(merged, KEY_PROVIDER_MESSAGE_KEY, envelope.providerMessageKey());
putIfHasText(merged, KEY_PROVIDER_THREAD_KEY, envelope.providerThreadKey());
if (envelope.providerAttributes() != null) {
envelope.providerAttributes().forEach((key, value) -> putIfHasText(merged, KEY_PREFIX_PROVIDER_ATTRIBUTE + key, value));
}
return merged;
}
private static MailProviderType parseProviderType(String raw) {
if (raw == null || raw.isBlank()) {
return MailProviderType.GENERIC;
}
try {
return MailProviderType.valueOf(raw.trim().toUpperCase(java.util.Locale.ROOT));
} catch (IllegalArgumentException ex) {
return MailProviderType.GENERIC;
}
}
private static void putIfHasText(Map<String, String> target, String key, String value) {
if (value != null && !value.isBlank()) {
target.put(key, value);
}
}
private static String emptyToNull(String value) {
return value == null || value.isBlank() ? null : value;
}
}
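
The attribute codec flattens the envelope into `mail.`-prefixed keys and recovers free-form provider attributes by stripping `mail.providerAttribute.`. The following is a reduced sketch of that round trip, assuming only two well-known fields; `EnvelopeAttributesSketch` is a hypothetical name and the sample values are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, reduced version of the encode/decode logic in MailProviderEnvelopeAttributes.
final class EnvelopeAttributesSketch {
    static final String PREFIX = "mail.providerAttribute.";

    // Writes the well-known fields and the free-form attributes into one flat map, skipping blank values.
    static Map<String, String> encode(String accountKey, String messageKey, Map<String, String> extra) {
        Map<String, String> out = new LinkedHashMap<>();
        if (accountKey != null && !accountKey.isBlank()) {
            out.put("mail.accountKey", accountKey);
        }
        if (messageKey != null && !messageKey.isBlank()) {
            out.put("mail.providerMessageKey", messageKey);
        }
        extra.forEach((key, value) -> {
            if (value != null && !value.isBlank()) {
                out.put(PREFIX + key, value);
            }
        });
        return out;
    }

    // Recovers the free-form attributes by selecting prefixed keys and stripping the prefix.
    static Map<String, String> decodeExtra(Map<String, String> attributes) {
        Map<String, String> out = new LinkedHashMap<>();
        attributes.forEach((key, value) -> {
            if (key != null && key.startsWith(PREFIX)) {
                out.put(key.substring(PREFIX.length()), value);
            }
        });
        return out;
    }
}
```

One thing to verify against the rest of the code base: `GenericMailIngestionRoute` in this commit writes keys such as `providerMessageKey` and `providerAccountKey` without the `mail.` prefix, while `fromAttributes` reads `mail.providerMessageKey` and `mail.accountKey`. Unless a translation step exists outside this diff, the decoder would not see the route's values and the identity resolver would fall through to the Message-ID branch.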

@ -0,0 +1,15 @@
package at.procon.dip.ingestion.mail;
/**
* Logical provider/source type for imported mail messages.
* The import pipeline should depend on this generic designation instead of
* binding itself to IMAP-specific semantics.
*/
public enum MailProviderType {
IMAP,
POP3,
EWS,
MICROSOFT_GRAPH,
GMAIL_API,
GENERIC
}

@ -0,0 +1,7 @@
package at.procon.dip.ingestion.mail;
import at.procon.dip.ingestion.config.DipIngestionProperties;
public interface MailServerEndpointUriFactory {
String buildConsumerUri(DipIngestionProperties.MailRouteProperties properties);
}

@ -0,0 +1,18 @@
package at.procon.dip.ingestion.mail;
public enum MailServerProtocol {
IMAP("imap"),
IMAPS("imaps"),
POP3("pop3"),
POP3S("pop3s");
private final String camelScheme;
MailServerProtocol(String camelScheme) {
this.camelScheme = camelScheme;
}
public String camelScheme() {
return camelScheme;
}
}

@ -9,6 +9,7 @@ import at.procon.dip.domain.document.ContentRole;
import at.procon.dip.domain.document.DocumentStatus;
import at.procon.dip.domain.document.StorageType;
import at.procon.dip.domain.document.entity.Document;
import at.procon.dip.domain.document.entity.DocumentSource;
import at.procon.dip.domain.document.entity.DocumentContent;
import at.procon.dip.domain.document.repository.DocumentRepository;
import at.procon.dip.domain.document.repository.DocumentSourceRepository;
@ -92,6 +93,13 @@ public class GenericDocumentImportService {
? defaultAccessContext()
: sourceDescriptor.accessContext();
Optional<Document> existingBySource = resolveExistingBySourceIdentifier(sourceDescriptor);
if (existingBySource.isPresent()) {
Document document = existingBySource.get();
List<String> warnings = List.of("Source identifier already imported; returning existing document");
return new ImportedDocumentResult(document, detection, warnings, true);
}
if (properties.isDeduplicateByContentHash()) {
Optional<Document> existing = resolveDeduplicatedDocument(dedupHash, accessContext);
if (existing.isPresent()) {
@ -506,6 +514,16 @@ public class GenericDocumentImportService {
.findFirst();
}
private Optional<Document> resolveExistingBySourceIdentifier(SourceDescriptor sourceDescriptor) {
if (!StringUtils.hasText(sourceDescriptor.sourceIdentifier())) {
return Optional.empty();
}
return documentSourceRepository.findBySourceTypeAndExternalSourceId(
sourceDescriptor.sourceType(),
sourceDescriptor.sourceIdentifier())
.map(DocumentSource::getDocument);
}
private boolean matchesAccessContext(Document document, DocumentAccessContext accessContext) {
String expectedTenantKey = accessContext.ownerTenant() == null ? null : accessContext.ownerTenant().tenantKey();
if (!equalsNullable(document.getOwnerTenant() != null ? document.getOwnerTenant().getTenantKey() : null, expectedTenantKey)) {
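
The new early return in `GenericDocumentImportService` makes imports idempotent per `(sourceType, sourceIdentifier)` pair. A minimal in-memory sketch of that behavior follows. `SourceIdLookupSketch` and its map are hypothetical stand-ins for `DocumentSourceRepository`, and the `deduplicated` flag assumes that the fourth constructor argument of `ImportedDocumentResult` in the diff carries that meaning.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of "return the existing document when the source identifier is known".
final class SourceIdLookupSketch {

    record ImportResult(String documentId, boolean deduplicated) {}

    // Stand-in for the lookup by source type and external source id.
    private final Map<String, String> documentBySource = new HashMap<>();
    private int nextId = 1;

    ImportResult importDocument(String sourceType, String sourceIdentifier) {
        // A blank identifier skips the lookup, as in resolveExistingBySourceIdentifier.
        if (sourceIdentifier == null || sourceIdentifier.isBlank()) {
            return new ImportResult("doc-" + nextId++, false);
        }
        String key = sourceType + "|" + sourceIdentifier;
        String existing = documentBySource.get(key);
        if (existing != null) {
            return new ImportResult(existing, true);
        }
        String created = "doc-" + nextId++;
        documentBySource.put(key, created);
        return new ImportResult(created, false);
    }
}
```

Since the lookup is keyed by source type as well, the switch from `SourceType.MAIL` to `SourceType.MAIL_ATTACHMENT` for attachments keeps attachment identifiers in a separate key space from root messages.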

@ -1,10 +1,14 @@
package at.procon.dip.ingestion.service;
import at.procon.dip.domain.document.entity.MailRecipientType;
import at.procon.dip.ingestion.util.DocumentImportSupport;
import jakarta.mail.Address;
import jakarta.mail.BodyPart;
import jakarta.mail.Message;
import jakarta.mail.Multipart;
import jakarta.mail.Part;
import jakarta.mail.Session;
import jakarta.mail.internet.InternetAddress;
import jakarta.mail.internet.MimeMessage;
import jakarta.mail.internet.MimeUtility;
import java.io.ByteArrayInputStream;
@ -16,6 +20,7 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jsoup.Jsoup;
import org.springframework.stereotype.Service;
@ -28,51 +33,90 @@ public class MailMessageExtractionService {
try {
Session session = Session.getDefaultInstance(new Properties());
MimeMessage message = new MimeMessage(session, new ByteArrayInputStream(rawMime));
String subject = message.getSubject();
String from = message.getFrom() != null && message.getFrom().length > 0 ? message.getFrom()[0].toString() : null;
List<String> recipients = new ArrayList<>();
if (message.getAllRecipients() != null) {
for (var recipient : message.getAllRecipients()) {
recipients.add(recipient.toString());
}
}
ParsedAddress fromAddress = firstAddress(message.getFrom());
ParsedAddress replyToAddress = firstAddress(message.getReplyTo());
List<MailRecipient> recipients = new ArrayList<>();
collectRecipients(message, Message.RecipientType.TO, MailRecipientType.TO, recipients);
collectRecipients(message, Message.RecipientType.CC, MailRecipientType.CC, recipients);
collectRecipients(message, Message.RecipientType.BCC, MailRecipientType.BCC, recipients);
StringBuilder text = new StringBuilder();
StringBuilder html = new StringBuilder();
List<MailAttachment> attachments = new ArrayList<>();
processPart(message, text, html, attachments);
AtomicInteger attachmentCounter = new AtomicInteger();
processPart(message, text, html, attachments, "0", attachmentCounter);
String normalizedText = text.length() > 0 ? text.toString().trim() : htmlToText(html.toString());
OffsetDateTime receivedAt = message.getReceivedDate() == null ? OffsetDateTime.now()
: message.getReceivedDate().toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime();
return new ParsedMailMessage(subject, from, recipients, receivedAt, normalizedText, html.toString(), attachments);
OffsetDateTime sentAt = message.getSentDate() == null ? null
: message.getSentDate().toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime();
String subject = message.getSubject();
return new ParsedMailMessage(
subject,
normalizeSubject(subject),
fromAddress.displayName(),
fromAddress.emailAddress(),
fromAddress.rawValue(),
replyToAddress.rawValue(),
readHeader(message, "Message-ID"),
readHeader(message, "In-Reply-To"),
readJoinedHeader(message, "References"),
sentAt,
receivedAt,
normalizedText,
html.toString(),
recipients,
attachments
);
} catch (Exception e) {
throw new IllegalArgumentException("Failed to parse MIME message", e);
}
}
private void processPart(Part part, StringBuilder text, StringBuilder html, List<MailAttachment> attachments) throws Exception {
private void collectRecipients(MimeMessage message,
Message.RecipientType recipientType,
MailRecipientType mappedType,
List<MailRecipient> target) throws Exception {
Address[] addresses = message.getRecipients(recipientType);
if (addresses == null) {
return;
}
int sortOrder = target.size();
for (Address address : addresses) {
ParsedAddress parsed = parseAddress(address);
target.add(new MailRecipient(mappedType, parsed.displayName(), parsed.emailAddress(), parsed.rawValue(), sortOrder++));
}
}
private void processPart(Part part,
StringBuilder text,
StringBuilder html,
List<MailAttachment> attachments,
String partPath,
AtomicInteger attachmentCounter) throws Exception {
String disposition = part.getDisposition();
String contentType = part.getContentType() == null ? "application/octet-stream" : part.getContentType();
if (disposition != null && (Part.ATTACHMENT.equalsIgnoreCase(disposition) || Part.INLINE.equalsIgnoreCase(disposition))
&& part.getFileName() != null) {
attachments.add(extractAttachment(part));
attachments.add(extractAttachment(part, partPath, attachmentCounter.incrementAndGet()));
return;
}
Object content = part.getContent();
if (content instanceof Multipart multipart) {
for (int i = 0; i < multipart.getCount(); i++) {
BodyPart bodyPart = multipart.getBodyPart(i);
processPart(bodyPart, text, html, attachments);
processPart(bodyPart, text, html, attachments, partPath + "." + i, attachmentCounter);
}
} else if (contentType.toLowerCase().contains("text/plain")) {
text.append(content.toString()).append("");
text.append(content.toString()).append("\n");
} else if (contentType.toLowerCase().contains("text/html")) {
html.append(content.toString()).append("");
html.append(content.toString()).append("\n");
} else if (part.getFileName() != null) {
attachments.add(extractAttachment(part));
attachments.add(extractAttachment(part, partPath, attachmentCounter.incrementAndGet()));
}
}
private MailAttachment extractAttachment(Part part) throws Exception {
private MailAttachment extractAttachment(Part part, String partPath, int attachmentIndex) throws Exception {
String fileName = part.getFileName();
if (fileName == null) {
fileName = "attachment";
@ -87,7 +131,84 @@ public class MailMessageExtractionService {
in.transferTo(out);
data = out.toByteArray();
}
return new MailAttachment(fileName, contentType, data, data.length, null);
return new MailAttachment(
fileName,
contentType,
data,
Long.valueOf(data.length),
null,
partPath,
normalizeDisposition(part.getDisposition()),
readHeader(part, "Content-ID"),
attachmentIndex
);
}
private ParsedAddress firstAddress(Address[] addresses) {
if (addresses == null || addresses.length == 0) {
return new ParsedAddress(null, null, null);
}
return parseAddress(addresses[0]);
}
private ParsedAddress parseAddress(Address address) {
if (address == null) {
return new ParsedAddress(null, null, null);
}
if (address instanceof InternetAddress internetAddress) {
String displayName = internetAddress.getPersonal();
String emailAddress = internetAddress.getAddress();
return new ParsedAddress(displayName, emailAddress, internetAddress.toUnicodeString());
}
return new ParsedAddress(null, null, address.toString());
}
private String normalizeSubject(String subject) {
if (subject == null || subject.isBlank()) {
return null;
}
String normalized = subject.trim();
boolean changed;
do {
changed = false;
String next = normalized.replaceFirst("(?i)^(re|fw|fwd)\\s*:\\s*", "").trim();
if (!next.equals(normalized)) {
normalized = next;
changed = true;
}
} while (changed && !normalized.isBlank());
return normalized.replaceAll("\\s+", " ");
}
private String normalizeDisposition(String disposition) {
if (disposition == null || disposition.isBlank()) {
return null;
}
return disposition.trim().toUpperCase();
}
private String readHeader(Part part, String name) {
try {
String[] headerValues = part.getHeader(name);
if (headerValues == null || headerValues.length == 0) {
return null;
}
return headerValues[0];
} catch (Exception e) {
return null;
}
}
private String readJoinedHeader(MimeMessage message, String name) {
try {
String[] headerValues = message.getHeader(name);
if (headerValues == null || headerValues.length == 0) {
return null;
}
return String.join(" ", headerValues);
} catch (Exception e) {
return null;
}
}
private String htmlToText(String html) {
@ -95,27 +216,70 @@ public class MailMessageExtractionService {
return "";
}
try {
return Jsoup.parse(html).text().replaceAll("\s+", " ").trim();
return Jsoup.parse(html).text().replaceAll("\\s+", " ").trim();
} catch (Exception e) {
log.debug("Falling back to naive HTML cleanup: {}", e.getMessage());
return html.replaceAll("<[^>]+>", " ").replaceAll("\s+", " ").trim();
return html.replaceAll("<[^>]+>", " ").replaceAll("\\s+", " ").trim();
}
}
public String serializeMessage(ParsedMailMessage parsed) {
StringBuilder sb = new StringBuilder();
if (parsed.subject() != null) sb.append("Subject: ").append(parsed.subject()).append("");
if (parsed.from() != null) sb.append("From: ").append(parsed.from()).append("");
if (!parsed.recipients().isEmpty()) sb.append("To: ").append(String.join(", ", parsed.recipients())).append("");
sb.append("");
if (parsed.subject() != null) sb.append("Subject: ").append(parsed.subject()).append("\n");
if (parsed.fromRaw() != null) sb.append("From: ").append(parsed.fromRaw()).append("\n");
List<String> toRecipients = parsed.recipients().stream()
.filter(recipient -> recipient.recipientType() == MailRecipientType.TO)
.map(MailRecipient::rawValue)
.toList();
if (!toRecipients.isEmpty()) sb.append("To: ").append(String.join(", ", toRecipients)).append("\n");
sb.append("\n");
if (parsed.textBody() != null) sb.append(parsed.textBody());
return sb.toString().trim();
}
public record ParsedMailMessage(String subject, String from, List<String> recipients, OffsetDateTime receivedAt,
String textBody, String htmlBody, List<MailAttachment> attachments) {}
public record ParsedMailMessage(
String subject,
String normalizedSubject,
String fromDisplayName,
String fromEmail,
String fromRaw,
String replyToRaw,
String messageId,
String inReplyTo,
String referencesHeader,
OffsetDateTime sentAt,
OffsetDateTime receivedAt,
String textBody,
String htmlBody,
List<MailRecipient> recipients,
List<MailAttachment> attachments
) {
}
public record MailRecipient(
MailRecipientType recipientType,
String displayName,
String emailAddress,
String rawValue,
int sortOrder
) {
}
public record MailAttachment(
String fileName,
String contentType,
byte[] data,
Long sizeBytes,
String path,
String partPath,
String disposition,
String contentId,
Integer attachmentIndex
) {
public MailAttachment(String fileName, String contentType, byte[] data, long sizeBytes, String path) {
this(fileName, contentType, data, sizeBytes, path, null, null, null, null);
}
public record MailAttachment(String fileName, String contentType, byte[] data, long sizeBytes, String path) {
public String safeTextPreview() {
String extension = DocumentImportSupport.extensionOf(fileName);
String mime = DocumentImportSupport.normalizeMediaType(contentType);
@ -128,4 +292,7 @@ public class MailMessageExtractionService {
return DocumentImportSupport.normalizeText(new String(data, StandardCharsets.UTF_8));
}
}
private record ParsedAddress(String displayName, String emailAddress, String rawValue) {
}
}

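The `normalizeSubject` helper in the diff above strips stacked reply/forward prefixes and collapses whitespace. A minimal standalone sketch of the same idea follows; the class name `SubjectNormalizationSketch` is hypothetical and only the regular expression is taken from the diff.

```java
import java.util.regex.Pattern;

// Hypothetical standalone class; mirrors the prefix-stripping loop shown in the diff.
final class SubjectNormalizationSketch {

    // Same pattern as in the diff: a leading "re", "fw" or "fwd" followed by a colon.
    private static final Pattern REPLY_PREFIX = Pattern.compile("(?i)^(re|fw|fwd)\\s*:\\s*");

    static String normalizeSubject(String subject) {
        if (subject == null || subject.isBlank()) {
            return null;
        }
        String normalized = subject.trim();
        // Strip one prefix per iteration until the subject stops changing.
        while (true) {
            String next = REPLY_PREFIX.matcher(normalized).replaceFirst("").trim();
            if (next.equals(normalized)) {
                break;
            }
            normalized = next;
        }
        // Collapse internal runs of whitespace to a single space.
        return normalized.replaceAll("\\s+", " ");
    }

    public static void main(String[] args) {
        System.out.println(normalizeSubject("Re: FW:  Quarterly   report"));
    }
}
```

Looping until the value stops changing handles chains such as `Re: Fwd: Re:` without having to encode the repetition in the pattern itself.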
@ -0,0 +1,137 @@
package at.procon.dip.ingestion.service;
import at.procon.dip.domain.document.entity.DocumentMailAttachment;
import at.procon.dip.domain.document.entity.DocumentMailMessage;
import at.procon.dip.domain.document.entity.DocumentMailRecipient;
import at.procon.dip.domain.document.repository.DocumentMailAttachmentRepository;
import at.procon.dip.domain.document.repository.DocumentMailMessageRepository;
import at.procon.dip.domain.document.repository.DocumentMailRecipientRepository;
import at.procon.dip.ingestion.mail.MailProviderEnvelope;
import at.procon.ted.util.HashUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@RequiredArgsConstructor
@Transactional
public class MailMetadataPersistenceService {
private final DocumentMailMessageRepository mailMessageRepository;
private final DocumentMailRecipientRepository recipientRepository;
private final DocumentMailAttachmentRepository attachmentRepository;
public void upsertMessageMetadata(UUID documentId,
UUID sourceId,
MailMessageExtractionService.ParsedMailMessage parsed,
MailProviderEnvelope envelope,
String rawMessageHash) {
DocumentMailMessage entity = mailMessageRepository.findById(documentId)
.orElse(DocumentMailMessage.builder().documentId(documentId).build());
entity.setSourceId(sourceId);
entity.setProviderType(envelope == null ? at.procon.dip.ingestion.mail.MailProviderType.GENERIC : envelope.providerType());
entity.setAccountKey(envelope == null ? null : envelope.accountKey());
entity.setFolderKey(envelope == null ? null : envelope.folderKey());
entity.setProviderMessageKey(envelope == null ? null : envelope.providerMessageKey());
entity.setProviderThreadKey(envelope == null ? null : envelope.providerThreadKey());
entity.setMessageId(parsed.messageId());
entity.setInReplyTo(parsed.inReplyTo());
entity.setReferencesHeader(parsed.referencesHeader());
entity.setThreadKey(resolveThreadKey(parsed, envelope));
entity.setSubject(parsed.subject());
entity.setNormalizedSubject(parsed.normalizedSubject());
entity.setFromDisplayName(parsed.fromDisplayName());
entity.setFromEmail(parsed.fromEmail());
entity.setFromRaw(parsed.fromRaw());
entity.setReplyToRaw(parsed.replyToRaw());
entity.setSentAt(parsed.sentAt());
entity.setReceivedAt(parsed.receivedAt());
entity.setRawMessageHash(rawMessageHash);
entity.setRawHeaderHash(HashUtils.computeSha256(buildHeaderFingerprint(parsed)));
mailMessageRepository.save(entity);
recipientRepository.deleteByMailDocumentId(documentId);
List<DocumentMailRecipient> recipients = new ArrayList<>();
for (MailMessageExtractionService.MailRecipient recipient : parsed.recipients()) {
recipients.add(DocumentMailRecipient.builder()
.mailDocumentId(documentId)
.recipientType(recipient.recipientType())
.displayName(recipient.displayName())
.emailAddress(recipient.emailAddress())
.rawValue(recipient.rawValue())
.sortOrder(recipient.sortOrder())
.build());
}
recipientRepository.saveAll(recipients);
}
public void upsertAttachmentMetadata(UUID mailDocumentId,
UUID attachmentDocumentId,
MailMessageExtractionService.MailAttachment attachment,
String extractionStatus,
String errorMessage) {
Optional<DocumentMailAttachment> existing = Optional.empty();
if (attachment.attachmentIndex() != null) {
existing = attachmentRepository.findByMailDocumentIdAndAttachmentIndex(mailDocumentId, attachment.attachmentIndex());
}
if (existing.isEmpty() && attachment.partPath() != null && !attachment.partPath().isBlank()) {
existing = attachmentRepository.findByMailDocumentIdAndPartPath(mailDocumentId, attachment.partPath());
}
DocumentMailAttachment entity = existing.orElse(DocumentMailAttachment.builder()
.mailDocumentId(mailDocumentId)
.attachmentDocumentId(attachmentDocumentId)
.build());
entity.setAttachmentDocumentId(attachmentDocumentId);
entity.setDisposition(attachment.disposition());
entity.setContentId(attachment.contentId());
entity.setFilename(attachment.fileName());
entity.setMimeType(attachment.contentType());
entity.setSizeBytes(attachment.sizeBytes());
entity.setAttachmentIndex(attachment.attachmentIndex());
entity.setPartPath(attachment.partPath());
entity.setPathInArchive(attachment.path());
entity.setExtractionStatus(extractionStatus == null || extractionStatus.isBlank() ? "IMPORTED" : extractionStatus);
entity.setErrorMessage(errorMessage);
attachmentRepository.save(entity);
}
private String resolveThreadKey(MailMessageExtractionService.ParsedMailMessage parsed, MailProviderEnvelope envelope) {
if (envelope != null && envelope.providerThreadKey() != null && !envelope.providerThreadKey().isBlank()) {
return envelope.providerThreadKey();
}
if (parsed.referencesHeader() != null && !parsed.referencesHeader().isBlank()) {
String[] refs = parsed.referencesHeader().trim().split("\\s+");
return refs.length == 0 ? parsed.referencesHeader() : refs[0];
}
if (parsed.inReplyTo() != null && !parsed.inReplyTo().isBlank()) {
return parsed.inReplyTo();
}
if (parsed.messageId() != null && !parsed.messageId().isBlank()) {
return parsed.messageId();
}
return parsed.normalizedSubject();
}
private String buildHeaderFingerprint(MailMessageExtractionService.ParsedMailMessage parsed) {
return String.join("|",
safe(parsed.subject()),
safe(parsed.fromRaw()),
safe(parsed.replyToRaw()),
safe(parsed.messageId()),
safe(parsed.inReplyTo()),
safe(parsed.referencesHeader()),
safe(parsed.sentAt() == null ? null : parsed.sentAt().toString()),
safe(parsed.receivedAt() == null ? null : parsed.receivedAt().toString())
);
}
private String safe(String value) {
return value == null ? "" : value;
}
}

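`resolveThreadKey` in the service above picks the first usable value from a fixed precedence list: provider thread key, first `References` entry, `In-Reply-To`, `Message-ID`, and finally the normalized subject. The sketch below restates that order with plain strings instead of `ParsedMailMessage` and `MailProviderEnvelope`; the class and the `hasText` helper are hypothetical names used only for illustration.

```java
// Hypothetical standalone class; the precedence order is taken from the diff above.
final class ThreadKeySketch {

    static String resolveThreadKey(String providerThreadKey,
                                   String referencesHeader,
                                   String inReplyTo,
                                   String messageId,
                                   String normalizedSubject) {
        if (hasText(providerThreadKey)) {
            return providerThreadKey;
        }
        if (hasText(referencesHeader)) {
            // The first id in References is taken as the thread key.
            return referencesHeader.trim().split("\\s+")[0];
        }
        if (hasText(inReplyTo)) {
            return inReplyTo;
        }
        if (hasText(messageId)) {
            return messageId;
        }
        // Last resort: may be null when the message has no subject either.
        return normalizedSubject;
    }

    private static boolean hasText(String value) {
        return value != null && !value.isBlank();
    }

    public static void main(String[] args) {
        System.out.println(resolveThreadKey(null, "<root@x> <mid@x>", "<mid@x>", "<leaf@x>", "hello"));
    }
}
```

Falling back to the message's own `Message-ID` means a message that starts a conversation gets a key that later replies can match through their `References` header.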
@ -3,6 +3,9 @@ package at.procon.ted.camel;
import at.procon.dip.domain.document.SourceType;
import at.procon.ted.config.TedProcessorProperties;
import at.procon.dip.ingestion.service.DocumentIngestionGateway;
import at.procon.dip.ingestion.mail.GenericMailProviderEnvelope;
import at.procon.dip.ingestion.mail.MailProviderEnvelopeAttributes;
import at.procon.dip.ingestion.mail.MailProviderType;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.ted.service.attachment.AttachmentExtractor;
@ -187,9 +190,19 @@ public class MailRoute extends RouteBuilder {
: exchange.getIn().getHeader("mailReceivedDate", Date.class).toInstant()
.atZone(ZoneId.systemDefault()).toOffsetDateTime(),
OriginalContentStoragePolicy.DEFAULT,
MailProviderEnvelopeAttributes.merge(
Map.of(
"subject", subject != null ? subject : "",
"from", from != null ? from : ""
),
new GenericMailProviderEnvelope(
MailProviderType.IMAP,
mail.getUsername(),
mail.getFolderName(),
resolveProviderMessageKey(exchange, mailMessage, sourceIdentifier),
resolveProviderThreadKey(mailMessage),
Map.of()
)
)
));
if (!result.warnings().isEmpty()) {
@ -534,4 +547,42 @@ public class MailRoute extends RouteBuilder {
private byte[] data;
private int size;
}
private String resolveProviderMessageKey(Exchange exchange, Message mailMessage, String fallback) throws Exception {
String camelUid = firstNonBlank(
exchange.getIn().getHeader("CamelMailUid", String.class),
exchange.getIn().getHeader("uid", String.class),
exchange.getIn().getHeader("mailUid", String.class)
);
if (camelUid != null) {
return camelUid;
}
String[] messageIdHeader = mailMessage.getHeader("Message-ID");
if (messageIdHeader != null && messageIdHeader.length > 0 && messageIdHeader[0] != null && !messageIdHeader[0].isBlank()) {
return messageIdHeader[0];
}
return fallback;
}
private String resolveProviderThreadKey(Message mailMessage) throws Exception {
String[] references = mailMessage.getHeader("References");
if (references != null && references.length > 0 && references[0] != null && !references[0].isBlank()) {
return references[0];
}
String[] inReplyTo = mailMessage.getHeader("In-Reply-To");
if (inReplyTo != null && inReplyTo.length > 0 && inReplyTo[0] != null && !inReplyTo[0].isBlank()) {
return inReplyTo[0];
}
return null;
}
private String firstNonBlank(String... values) {
for (String value : values) {
if (value != null && !value.isBlank()) {
return value;
}
}
return null;
}
}

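The `resolveProviderMessageKey` change above probes several exchange headers, then the `Message-ID` header, then a caller-supplied fallback. A minimal sketch of that chain with a plain `Map` standing in for the Camel exchange headers is shown below. The header names are copied from the diff; whether the mail consumer actually populates them is not verified here, and the class name is hypothetical.

```java
import java.util.Map;

// Hypothetical standalone class; a Map stands in for the Camel exchange headers.
final class ProviderMessageKeySketch {

    static String firstNonBlank(String... values) {
        for (String value : values) {
            if (value != null && !value.isBlank()) {
                return value;
            }
        }
        return null;
    }

    static String resolve(Map<String, String> exchangeHeaders, String messageIdHeader, String fallback) {
        // 1. Provider UID candidates, in the order probed in the diff.
        String uid = firstNonBlank(
                exchangeHeaders.get("CamelMailUid"),
                exchangeHeaders.get("uid"),
                exchangeHeaders.get("mailUid"));
        if (uid != null) {
            return uid;
        }
        // 2. The Message-ID header of the mail itself.
        if (messageIdHeader != null && !messageIdHeader.isBlank()) {
            return messageIdHeader;
        }
        // 3. Whatever identifier the caller already has.
        return fallback;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("uid", "4711"), "<a@example.org>", "source-1"));
    }
}
```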
@ -1,6 +1,7 @@
dip:
runtime:
mode: NEW
search:
# Default page size for search results
default-page-size: 20
@ -38,9 +39,11 @@ dip:
embedding:
enabled: true
jobs:
enabled: false
enabled: true
parallel-batch-count: 2
process-in-batches: true
execution-batch-size: 20
batch-size: 48
execution-batch-size: 48
default-document-model: e5-default
default-query-model: e5-default
@ -53,21 +56,25 @@ dip:
external-e5:
type: http-json
base-url: http://localhost:8001
base-url: http://172.20.20.6:8001
connect-timeout: 5s
read-timeout: 60s
batch-request:
truncate-text: false
truncate-length: 512
chunk-size: 16
vector-sync-e5:
type: http-vector-sync
base-url: http://localhost:8001
base-url: http://172.20.20.6:8001
connect-timeout: 30s
read-timeout: 300s
headers:
X-Client: dip
batch-request:
truncate-text: true
truncate-text: false
truncate-length: 512
chunk-size: 8
chunk-size: 16
models:
@ -219,6 +226,43 @@ dip:
# Import batch marker for mail roots and attachments
mail-import-batch-id: phase41-mail
# NEW Camel mail consumer route for provider-driven mail ingestion
mail-route:
# Enable/disable the NEW Camel mail consumer
enabled: false
# Generic mail server protocol (IMAP/IMAPS/POP3/POP3S)
protocol: IMAPS
# Mail server host
host: mail.mymagenta.business
# Mail server port; leave empty to use Camel component defaults
port: 993
# Mailbox username
username: archiv@procon.co.at
# Mailbox password
password: ${MAIL_PASSWORD:worasigg}
# Folder/mailbox name
folder-name: INBOX
# Optional stable provider account key; falls back to username
account-key:
# Delete messages after successful processing
delete: false
# Consume only unseen messages
unseen: true
# Keep messages unread while consuming
peek: true
# Poll delay in milliseconds
delay: 15000
# Maximum messages per poll
max-messages-per-poll: 20
# Maximum number of messages to consume per poll; -1 consumes all
fetch-size: 10
# Close folder after each poll cycle
close-folder: false
# Camel mail debug mode
debug-mode: false
# Socket connection timeout in milliseconds
connection-timeout: 30000
# TED packages download configuration
ted-download:
# Enable/disable automatic package download
@ -254,6 +298,8 @@ dip:
startup-backfill-enabled: false
# Maximum number of legacy TED documents to backfill during startup
startup-backfill-limit: 250
structured-search-hybrid-candidate-limit: 5000
structured-search-facet-bucket-limit: 12
migration:
legacy-audit:
@ -289,8 +335,8 @@ dip:
build-chunk-representations: true
legacy-ted-embeddings:
enabled: true
startup-enabled: true
enabled: false
startup-enabled: false
batch-size: 500
max-documents-per-run: 0
skip-when-primary-representation-missing: true

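The `mail-route` settings above map naturally onto a Camel mail endpoint URI. `CamelMailServerEndpointUriFactory` itself is not shown in this section, so the sketch below is only a guess at the shape of its output: `MailEndpointUriSketch` and `buildUri` are hypothetical names, the host and username are placeholders, and the password is deliberately left out. The query option names (`folderName`, `unseen`, `peek`, `maxMessagesPerPoll`, `fetchSize`, and so on) are those of the Camel mail component.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper; not the factory from the commit.
final class MailEndpointUriSketch {

    static String buildUri(String protocol, String host, Integer port, Map<String, Object> options) {
        StringBuilder uri = new StringBuilder(protocol.toLowerCase(Locale.ROOT))
                .append("://")
                .append(host);
        if (port != null) {
            // An empty port lets the Camel component pick its protocol default.
            uri.append(':').append(port);
        }
        String query = options.entrySet().stream()
                .filter(entry -> entry.getValue() != null)
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.joining("&"));
        return query.isEmpty() ? uri.toString() : uri + "?" + query;
    }

    static Map<String, Object> sampleOptions() {
        // LinkedHashMap keeps the options in insertion order for a stable URI.
        Map<String, Object> options = new LinkedHashMap<>();
        options.put("username", "user@example.org");
        options.put("folderName", "INBOX");
        options.put("delete", false);
        options.put("unseen", true);
        options.put("peek", true);
        options.put("delay", 15000);
        options.put("maxMessagesPerPoll", 20);
        options.put("fetchSize", 10);
        return options;
    }

    public static void main(String[] args) {
        System.out.println(buildUri("IMAPS", "mail.example.org", 993, sampleOptions()));
    }
}
```

In a real route the password would be appended as well, ideally resolved from the environment rather than from a default value committed to the YAML file.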
@ -0,0 +1,90 @@
SET search_path TO DOC, TED, public;
CREATE TABLE IF NOT EXISTS DOC.doc_mail_message (
document_id uuid PRIMARY KEY REFERENCES DOC.doc_document(id) ON DELETE CASCADE,
source_id uuid REFERENCES DOC.doc_source(id) ON DELETE SET NULL,
provider_type varchar(64) NOT NULL,
account_key varchar(255),
folder_key varchar(255),
provider_message_key varchar(500),
provider_thread_key varchar(500),
message_id varchar(1000),
in_reply_to varchar(1000),
references_header text,
thread_key varchar(1000),
subject varchar(1000),
normalized_subject varchar(1000),
from_display_name varchar(500),
from_email varchar(500),
from_raw varchar(1000),
reply_to_raw text,
sent_at timestamptz,
received_at timestamptz,
raw_message_hash varchar(64),
raw_header_hash varchar(64),
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_doc_mail_message_source_id
ON DOC.doc_mail_message (source_id);
CREATE INDEX IF NOT EXISTS idx_doc_mail_message_message_id
ON DOC.doc_mail_message (message_id);
CREATE INDEX IF NOT EXISTS idx_doc_mail_message_thread_key
ON DOC.doc_mail_message (thread_key);
CREATE INDEX IF NOT EXISTS idx_doc_mail_message_provider_identity
ON DOC.doc_mail_message (provider_type, account_key, folder_key, provider_message_key);
CREATE UNIQUE INDEX IF NOT EXISTS uq_doc_mail_message_provider_identity
ON DOC.doc_mail_message (provider_type, account_key, folder_key, provider_message_key)
WHERE provider_message_key IS NOT NULL;
CREATE TABLE IF NOT EXISTS DOC.doc_mail_recipient (
id uuid PRIMARY KEY,
mail_document_id uuid NOT NULL REFERENCES DOC.doc_mail_message(document_id) ON DELETE CASCADE,
recipient_type varchar(16) NOT NULL,
display_name varchar(500),
email_address varchar(500),
raw_value text,
sort_order integer NOT NULL DEFAULT 0,
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_doc_mail_recipient_mail
ON DOC.doc_mail_recipient (mail_document_id);
CREATE INDEX IF NOT EXISTS idx_doc_mail_recipient_type
ON DOC.doc_mail_recipient (recipient_type);
CREATE INDEX IF NOT EXISTS idx_doc_mail_recipient_email
ON DOC.doc_mail_recipient (email_address);
CREATE TABLE IF NOT EXISTS DOC.doc_mail_attachment (
id uuid PRIMARY KEY,
mail_document_id uuid NOT NULL REFERENCES DOC.doc_mail_message(document_id) ON DELETE CASCADE,
attachment_document_id uuid NOT NULL REFERENCES DOC.doc_document(id) ON DELETE CASCADE,
disposition varchar(32),
content_id varchar(500),
filename varchar(1000),
mime_type varchar(255),
size_bytes bigint,
attachment_index integer,
part_path varchar(500),
path_in_archive text,
extraction_status varchar(32) NOT NULL DEFAULT 'IMPORTED',
error_message text,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_doc_mail_attachment_mail
ON DOC.doc_mail_attachment (mail_document_id);
CREATE INDEX IF NOT EXISTS idx_doc_mail_attachment_document
ON DOC.doc_mail_attachment (attachment_document_id);
CREATE INDEX IF NOT EXISTS idx_doc_mail_attachment_part_path
ON DOC.doc_mail_attachment (part_path);
CREATE INDEX IF NOT EXISTS idx_doc_mail_attachment_attachment_index
ON DOC.doc_mail_attachment (attachment_index);
CREATE UNIQUE INDEX IF NOT EXISTS uq_doc_mail_attachment_mail_index
ON DOC.doc_mail_attachment (mail_document_id, attachment_index)
WHERE attachment_index IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS uq_doc_mail_attachment_mail_part
ON DOC.doc_mail_attachment (mail_document_id, part_path)
WHERE part_path IS NOT NULL;

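The two partial unique indexes on `doc_mail_attachment` rely on `attachment_index` and `part_path` being unique within one mail. The sketch below illustrates how the extraction service appears to assign both values while walking the MIME tree: a positional path such as `0.2.0` and a running counter starting at 1. It uses a hypothetical `Node` record instead of `jakarta.mail.Part` to stay self-contained, and all names in it are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a MIME tree; not the jakarta.mail API.
final class PartPathSketch {

    // A node is either a container (children non-empty) or a leaf with an optional file name.
    record Node(String fileName, List<Node> children) {
        static Node leaf(String fileName) {
            return new Node(fileName, List.of());
        }
    }

    record AttachmentRef(String fileName, String partPath, int attachmentIndex) {
    }

    static void walk(Node node, String partPath, AtomicInteger counter, List<AttachmentRef> out) {
        if (!node.children().isEmpty()) {
            // Child paths extend the parent path with the child's position.
            for (int i = 0; i < node.children().size(); i++) {
                walk(node.children().get(i), partPath + "." + i, counter, out);
            }
        } else if (node.fileName() != null) {
            // Leaves with a file name count as attachments; the counter starts at 1.
            out.add(new AttachmentRef(node.fileName(), partPath, counter.incrementAndGet()));
        }
    }

    static Node sampleTree() {
        return new Node(null, List.of(
                Node.leaf(null),                               // body part, no file name
                Node.leaf("a.pdf"),
                new Node(null, List.of(Node.leaf("b.png"))))); // nested multipart
    }

    public static void main(String[] args) {
        List<AttachmentRef> out = new ArrayList<>();
        walk(sampleTree(), "0", new AtomicInteger(), out);
        out.forEach(System.out::println);
    }
}
```

Because the path is positional and the counter is monotonic, re-parsing the same raw message yields the same pair of keys, which is what lets `upsertAttachmentMetadata` find an existing row by index first and by part path second.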
@ -22,15 +22,15 @@ class NewRuntimeMustNotDependOnTedProcessorPropertiesTest {
List<Class<?>> newRuntimeClasses = List.of(
at.procon.dip.ingestion.service.GenericDocumentImportService.class,
at.procon.dip.ingestion.camel.GenericFileSystemIngestionRoute.class,
at.procon.dip.ingestion.camel.GenericMailIngestionRoute.class,
at.procon.dip.ingestion.controller.GenericDocumentImportController.class,
at.procon.dip.ingestion.adapter.MailDocumentIngestionAdapter.class,
at.procon.dip.ingestion.service.MailMetadataPersistenceService.class,
at.procon.dip.ingestion.mail.MailImportIdentityResolver.class,
at.procon.dip.ingestion.adapter.TedPackageDocumentIngestionAdapter.class,
at.procon.dip.ingestion.service.TedPackageChildImportProcessor.class,
at.procon.dip.domain.ted.service.TedNoticeProjectionService.class,
at.procon.dip.domain.ted.startup.TedProjectionStartupRunner.class,
at.procon.dip.domain.ted.search.TedStructuredSearchRepository.class,
at.procon.dip.domain.ted.service.TedStructuredSearchService.class,
at.procon.dip.domain.ted.web.TedStructuredSearchController.class,
at.procon.dip.search.engine.fulltext.PostgresFullTextSearchEngine.class,
at.procon.dip.search.engine.trigram.PostgresTrigramSearchEngine.class,
at.procon.dip.search.engine.semantic.PgVectorSemanticSearchEngine.class,

@ -2,7 +2,6 @@ package at.procon.dip.embedding.service;
import at.procon.dip.domain.document.DistanceMetric;
import at.procon.dip.domain.document.entity.Document;
import at.procon.dip.domain.document.entity.DocumentEmbedding;
import at.procon.dip.domain.document.entity.DocumentTextRepresentation;
import at.procon.dip.domain.document.repository.DocumentTextRepresentationRepository;
import at.procon.dip.embedding.config.EmbeddingProperties;
@ -35,7 +34,7 @@ class RepresentationEmbeddingOrchestratorTest {
@Mock
private EmbeddingExecutionService executionService;
@Mock
private EmbeddingPersistenceService persistenceService;
private EmbeddingJobExecutionPersistenceService executionPersistenceService;
@Mock
private DocumentTextRepresentationRepository representationRepository;
@Mock
@ -57,11 +56,12 @@ class RepresentationEmbeddingOrchestratorTest {
orchestrator = new RepresentationEmbeddingOrchestrator(
jobService,
executionService,
persistenceService,
executionPersistenceService,
representationRepository,
selectionPolicy,
modelRegistry,
properties
properties,
Runnable::run
);
}
@ -92,8 +92,8 @@ class RepresentationEmbeddingOrchestratorTest {
);
when(representationRepository.findById(representationId)).thenReturn(Optional.of(representation));
when(modelRegistry.getRequired("mock-search")).thenReturn(model);
when(persistenceService.ensurePending(representationId, "mock-search"))
.thenReturn(DocumentEmbedding.builder().id(embeddingId).build());
when(executionPersistenceService.startProcessing(representationId, "mock-search"))
.thenReturn(embeddingId);
when(executionService.embedTexts(eq("mock-search"), eq(EmbeddingUseCase.DOCUMENT), any()))
.thenReturn(new EmbeddingProviderResult(
model,
@ -105,9 +105,8 @@ class RepresentationEmbeddingOrchestratorTest {
orchestrator.processClaimedJob(job);
verify(persistenceService).markProcessing(embeddingId);
verify(persistenceService).saveCompleted(eq(embeddingId), any(EmbeddingProviderResult.class));
verify(jobService).markCompleted(job.getId(), "req-1");
verify(executionPersistenceService).startProcessing(representationId, "mock-search");
verify(executionPersistenceService).completeJob(eq(embeddingId), any(EmbeddingProviderResult.class), eq(job.getId()), eq("req-1"));
}
@Test
@ -155,10 +154,10 @@ class RepresentationEmbeddingOrchestratorTest {
DistanceMetric.COSINE, true, true, 4096, true
);
when(modelRegistry.getRequired("e5-default")).thenReturn(model);
when(persistenceService.ensurePending(representationId1, "e5-default"))
.thenReturn(DocumentEmbedding.builder().id(embeddingId1).build());
when(persistenceService.ensurePending(representationId2, "e5-default"))
.thenReturn(DocumentEmbedding.builder().id(embeddingId2).build());
when(executionPersistenceService.startProcessing(representationId1, "e5-default"))
.thenReturn(embeddingId1);
when(executionPersistenceService.startProcessing(representationId2, "e5-default"))
.thenReturn(embeddingId2);
when(executionService.embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), any()))
.thenReturn(new EmbeddingProviderResult(
model,
@ -171,15 +170,13 @@ class RepresentationEmbeddingOrchestratorTest {
int processed = orchestrator.processNextReadyBatch();
assertThat(processed).isEqualTo(2);
verify(persistenceService).markProcessing(embeddingId1);
verify(persistenceService).markProcessing(embeddingId2);
verify(executionPersistenceService).startProcessing(representationId1, "e5-default");
verify(executionPersistenceService).startProcessing(representationId2, "e5-default");
ArgumentCaptor<List<String>> textsCaptor = ArgumentCaptor.forClass(List.class);
verify(executionService, times(1)).embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), textsCaptor.capture());
assertThat(textsCaptor.getValue()).containsExactly("alpha", "beta");
verify(persistenceService).saveCompleted(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null));
verify(persistenceService).saveCompleted(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null));
verify(jobService).markCompleted(job1.getId(), "batch-req-1");
verify(jobService).markCompleted(job2.getId(), "batch-req-1");
verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1"));
verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1"));
}
@Test
@ -209,8 +206,8 @@ class RepresentationEmbeddingOrchestratorTest {
DistanceMetric.COSINE, true, false, 4096, true
);
when(modelRegistry.getRequired("mock-search")).thenReturn(model);
when(persistenceService.ensurePending(representationId, "mock-search"))
.thenReturn(DocumentEmbedding.builder().id(embeddingId).build());
when(executionPersistenceService.startProcessing(representationId, "mock-search"))
.thenReturn(embeddingId);
when(executionService.embedTexts(eq("mock-search"), eq(EmbeddingUseCase.DOCUMENT), any()))
.thenReturn(new EmbeddingProviderResult(
model,
@ -223,7 +220,73 @@ class RepresentationEmbeddingOrchestratorTest {
orchestrator.processNextReadyBatch();
verify(executionService, times(1)).embedTexts(eq("mock-search"), eq(EmbeddingUseCase.DOCUMENT), eq(List.of("gamma")));
verify(persistenceService, never()).saveCompleted(eq(embeddingId), any(float[].class), eq(null));
verify(persistenceService).saveCompleted(eq(embeddingId), any(EmbeddingProviderResult.class));
verify(executionPersistenceService, never()).completeJob(eq(embeddingId), any(float[].class), eq(null), eq(job.getId()), anyString());
verify(executionPersistenceService).completeJob(eq(embeddingId), any(EmbeddingProviderResult.class), eq(job.getId()), eq("req-2"));
}
@Test
void processNextReadyBatch_should_claim_multiple_batches_and_start_them_in_parallel() {
properties.getJobs().setParallelBatchCount(2);
properties.getJobs().setBatchSize(2);
properties.getJobs().setProcessInBatches(true);
properties.getJobs().setExecutionBatchSize(2);
UUID documentId = UUID.randomUUID();
UUID representationId1 = UUID.randomUUID();
UUID representationId2 = UUID.randomUUID();
UUID representationId3 = UUID.randomUUID();
UUID representationId4 = UUID.randomUUID();
UUID embeddingId1 = UUID.randomUUID();
UUID embeddingId2 = UUID.randomUUID();
UUID embeddingId3 = UUID.randomUUID();
UUID embeddingId4 = UUID.randomUUID();
EmbeddingJob job1 = EmbeddingJob.builder().id(UUID.randomUUID()).documentId(documentId).representationId(representationId1).modelKey("e5-default").jobType(EmbeddingJobType.DOCUMENT_EMBED).status(EmbeddingJobStatus.IN_PROGRESS).attemptCount(1).build();
EmbeddingJob job2 = EmbeddingJob.builder().id(UUID.randomUUID()).documentId(documentId).representationId(representationId2).modelKey("e5-default").jobType(EmbeddingJobType.DOCUMENT_EMBED).status(EmbeddingJobStatus.IN_PROGRESS).attemptCount(1).build();
EmbeddingJob job3 = EmbeddingJob.builder().id(UUID.randomUUID()).documentId(documentId).representationId(representationId3).modelKey("e5-default").jobType(EmbeddingJobType.DOCUMENT_EMBED).status(EmbeddingJobStatus.IN_PROGRESS).attemptCount(1).build();
EmbeddingJob job4 = EmbeddingJob.builder().id(UUID.randomUUID()).documentId(documentId).representationId(representationId4).modelKey("e5-default").jobType(EmbeddingJobType.DOCUMENT_EMBED).status(EmbeddingJobStatus.IN_PROGRESS).attemptCount(1).build();
when(jobService.claimNextReadyJobs(4)).thenReturn(List.of(job1, job2, job3, job4));
Document document = Document.builder().id(documentId).build();
when(representationRepository.findById(representationId1)).thenReturn(Optional.of(DocumentTextRepresentation.builder().id(representationId1).document(document).textBody("alpha").build()));
when(representationRepository.findById(representationId2)).thenReturn(Optional.of(DocumentTextRepresentation.builder().id(representationId2).document(document).textBody("beta").build()));
when(representationRepository.findById(representationId3)).thenReturn(Optional.of(DocumentTextRepresentation.builder().id(representationId3).document(document).textBody("gamma").build()));
when(representationRepository.findById(representationId4)).thenReturn(Optional.of(DocumentTextRepresentation.builder().id(representationId4).document(document).textBody("delta").build()));
EmbeddingModelDescriptor model = new EmbeddingModelDescriptor(
"e5-default", "vector-sync-e5", "intfloat/multilingual-e5-large", 3,
DistanceMetric.COSINE, true, true, 4096, true
);
when(modelRegistry.getRequired("e5-default")).thenReturn(model);
when(executionPersistenceService.startProcessing(representationId1, "e5-default")).thenReturn(embeddingId1);
when(executionPersistenceService.startProcessing(representationId2, "e5-default")).thenReturn(embeddingId2);
when(executionPersistenceService.startProcessing(representationId3, "e5-default")).thenReturn(embeddingId3);
when(executionPersistenceService.startProcessing(representationId4, "e5-default")).thenReturn(embeddingId4);
when(executionService.embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), any()))
.thenReturn(new EmbeddingProviderResult(
model,
List.of(new float[]{0.1f, 0.2f, 0.3f}, new float[]{0.4f, 0.5f, 0.6f}),
List.of(),
"batch-req-1",
21
))
.thenReturn(new EmbeddingProviderResult(
model,
List.of(new float[]{0.7f, 0.8f, 0.9f}, new float[]{1.0f, 1.1f, 1.2f}),
List.of(),
"batch-req-2",
25
));
int processed = orchestrator.processNextReadyBatch();
assertThat(processed).isEqualTo(4);
verify(jobService).claimNextReadyJobs(4);
verify(executionService, times(2)).embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), any());
verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1"));
verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1"));
verify(executionPersistenceService).completeJob(eq(embeddingId3), aryEq(new float[]{0.7f, 0.8f, 0.9f}), eq(null), eq(job3.getId()), eq("batch-req-2"));
verify(executionPersistenceService).completeJob(eq(embeddingId4), aryEq(new float[]{1.0f, 1.1f, 1.2f}), eq(null), eq(job4.getId()), eq("batch-req-2"));
}
}
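
The test above claims four jobs in one call and expects two `embedTexts` invocations carrying two vectors each. A minimal sketch of the chunking this implies is shown below. The class and method names are hypothetical and the real orchestrator is not part of this excerpt, so treat it as an illustration of the idea rather than the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not taken from the orchestrator under test.
class BatchPartitionSketch {

    // Splits items into consecutive chunks of at most `size` elements, preserving order.
    static <T> List<List<T>> partition(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            int end = Math.min(i + size, items.size());
            chunks.add(List.copyOf(items.subList(i, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Four claimed texts with an execution batch size of 2 give two provider calls.
        System.out.println(partition(List.of("alpha", "beta", "gamma", "delta"), 2));
    }
}
```

Keeping the chunks in claim order is what lets each returned vector be matched back to its job by position, as the `completeJob` verifications in the test assume.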

@ -9,10 +9,13 @@ import at.procon.dip.domain.document.RelationType;
import at.procon.dip.domain.document.SourceType;
import at.procon.dip.domain.document.entity.Document;
import at.procon.dip.domain.document.service.DocumentRelationService;
import at.procon.dip.domain.document.repository.DocumentSourceRepository;
import at.procon.dip.ingestion.config.DipIngestionProperties;
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
import at.procon.dip.ingestion.service.GenericDocumentImportService;
import at.procon.dip.ingestion.service.MailMessageExtractionService;
import at.procon.dip.ingestion.service.MailMetadataPersistenceService;
import at.procon.dip.ingestion.mail.MailImportIdentityResolver;
import at.procon.dip.ingestion.spi.IngestionResult;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.ted.service.attachment.ZipExtractionService;
@ -49,6 +52,12 @@ class MailDocumentIngestionAdapterBundleTest {
@Mock
private ZipExtractionService zipExtractionService;
@Mock
private MailMetadataPersistenceService mailMetadataPersistenceService;
@Mock
private DocumentSourceRepository documentSourceRepository;
private MailDocumentIngestionAdapter adapter;
@BeforeAll
@ -64,7 +73,8 @@ class MailDocumentIngestionAdapterBundleTest {
properties.setExpandMailZipAttachments(false);
properties.setMailImportBatchId("test-mail-bundle");
lenient().when(zipExtractionService.canHandle(any(), any())).thenReturn(false);
- adapter = new MailDocumentIngestionAdapter(properties, importService, new MailMessageExtractionService(), relationService, zipExtractionService);
+ lenient().when(documentSourceRepository.findBySourceTypeAndExternalSourceId(any(), any())).thenReturn(java.util.Optional.empty());
+ adapter = new MailDocumentIngestionAdapter(properties, importService, new MailMessageExtractionService(), relationService, zipExtractionService, new MailImportIdentityResolver(), mailMetadataPersistenceService, documentSourceRepository);
}
@ParameterizedTest(name = "ingest {0}")

@ -10,11 +10,14 @@ import at.procon.dip.domain.document.RelationType;
import at.procon.dip.domain.document.SourceType;
import at.procon.dip.domain.document.entity.Document;
import at.procon.dip.domain.document.service.DocumentRelationService;
import at.procon.dip.domain.document.repository.DocumentSourceRepository;
import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand;
import at.procon.dip.ingestion.config.DipIngestionProperties;
import at.procon.dip.ingestion.dto.ImportedDocumentResult;
import at.procon.dip.ingestion.service.GenericDocumentImportService;
import at.procon.dip.ingestion.service.MailMessageExtractionService;
import at.procon.dip.ingestion.service.MailMetadataPersistenceService;
import at.procon.dip.ingestion.mail.MailImportIdentityResolver;
import at.procon.dip.ingestion.spi.IngestionResult;
import at.procon.dip.ingestion.spi.SourceDescriptor;
import at.procon.ted.service.attachment.ZipExtractionService;
@ -36,6 +39,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -51,6 +55,12 @@ class MailDocumentIngestionAdapterFileSystemTest {
@Mock
private ZipExtractionService zipExtractionService;
@Mock
private MailMetadataPersistenceService mailMetadataPersistenceService;
@Mock
private DocumentSourceRepository documentSourceRepository;
private MailDocumentIngestionAdapter adapter;
private final List<SourceDescriptor> importedDescriptors = new ArrayList<>();
@ -69,10 +79,14 @@ class MailDocumentIngestionAdapterFileSystemTest {
importService,
extractionService,
relationService,
- zipExtractionService
+ zipExtractionService,
+ new MailImportIdentityResolver(),
+ mailMetadataPersistenceService,
+ documentSourceRepository
);
when(zipExtractionService.canHandle(any(), any())).thenReturn(false);
lenient().when(documentSourceRepository.findBySourceTypeAndExternalSourceId(any(), any())).thenReturn(java.util.Optional.empty());
when(relationService.ensureRelation(any())).thenReturn(null);
when(importService.importDocument(any())).thenAnswer(invocation -> {
SourceDescriptor descriptor = invocation.getArgument(0);
@ -129,6 +143,7 @@ class MailDocumentIngestionAdapterFileSystemTest {
.filter(d -> "notes.txt".equals(d.fileName()))
.findFirst()
.orElseThrow();
assertEquals(SourceType.MAIL_ATTACHMENT, textAttachment.sourceType());
assertEquals("text/plain", textAttachment.mediaType());
assertNotNull(textAttachment.textContent(), "plain text attachment should expose preview text");
assertTrue(textAttachment.textContent().contains("attachment notes"));
@ -137,6 +152,7 @@ class MailDocumentIngestionAdapterFileSystemTest {
.filter(d -> "legacy.xls".equals(d.fileName()))
.findFirst()
.orElseThrow();
assertEquals(SourceType.MAIL_ATTACHMENT, binaryAttachment.sourceType());
assertNull(binaryAttachment.textContent(), "binary old Excel attachment must not be passed as text content");
assertEquals("application/vnd.ms-excel", binaryAttachment.mediaType());
assertNotNull(binaryAttachment.binaryContent());

@ -15,6 +15,9 @@ import at.procon.dip.domain.document.repository.DocumentEmbeddingRepository;
import at.procon.dip.domain.document.repository.DocumentRelationRepository;
import at.procon.dip.domain.document.repository.DocumentRepository;
import at.procon.dip.domain.document.repository.DocumentSourceRepository;
import at.procon.dip.domain.document.repository.DocumentMailAttachmentRepository;
import at.procon.dip.domain.document.repository.DocumentMailMessageRepository;
import at.procon.dip.domain.document.repository.DocumentMailRecipientRepository;
import at.procon.dip.domain.document.repository.DocumentTextRepresentationRepository;
import at.procon.dip.domain.document.service.DocumentContentService;
import at.procon.dip.domain.document.service.DocumentEmbeddingService;
@ -30,6 +33,8 @@ import at.procon.dip.ingestion.config.DipIngestionProperties;
import at.procon.dip.ingestion.service.DocumentIngestionGateway;
import at.procon.dip.ingestion.service.GenericDocumentImportService;
import at.procon.dip.ingestion.service.MailMessageExtractionService;
import at.procon.dip.ingestion.service.MailMetadataPersistenceService;
import at.procon.dip.ingestion.mail.MailImportIdentityResolver;
import at.procon.dip.ingestion.spi.IngestionResult;
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
import at.procon.dip.ingestion.spi.SourceDescriptor;
@ -88,16 +93,16 @@ import static org.assertj.core.api.Assertions.assertThat;
"spring.jpa.open-in-view=false",
"spring.jpa.properties.hibernate.default_schema=DOC",
"ted.vectorization.enabled=false",
- "ted.generic-ingestion.enabled=true",
- "ted.generic-ingestion.mail-adapter-enabled=true",
- "ted.generic-ingestion.file-system-enabled=false",
- "ted.generic-ingestion.rest-upload-enabled=false",
- "ted.generic-ingestion.deduplicate-by-content-hash=false",
- "ted.generic-ingestion.expand-mail-zip-attachments=true",
- "ted.generic-ingestion.default-visibility=PUBLIC",
- "ted.generic-ingestion.mail-default-visibility=RESTRICTED",
- "ted.generic-ingestion.import-batch-id=test-mail-bundle",
- "ted.generic-ingestion.mail-import-batch-id=test-mail-bundle-mail"
+ "dip.ingestion.enabled=true",
+ "dip.ingestion.mail-adapter-enabled=true",
+ "dip.ingestion.file-system-enabled=false",
+ "dip.ingestion.rest-upload-enabled=false",
+ "dip.ingestion.deduplicate-by-content-hash=false",
+ "dip.ingestion.expand-mail-zip-attachments=true",
+ "dip.ingestion.default-visibility=PUBLIC",
+ "dip.ingestion.mail-default-visibility=RESTRICTED",
+ "dip.ingestion.import-batch-id=test-mail-bundle",
+ "dip.ingestion.mail-import-batch-id=test-mail-bundle-mail"
})
class MailBundleProcessingIntegrationTest {
@ -131,6 +136,12 @@ class MailBundleProcessingIntegrationTest {
@Autowired
private DocumentSourceRepository documentSourceRepository;
@Autowired
private DocumentMailMessageRepository documentMailMessageRepository;
@Autowired
private DocumentMailRecipientRepository documentMailRecipientRepository;
@Autowired
private DocumentMailAttachmentRepository documentMailAttachmentRepository;
@Autowired
private DocumentContentRepository documentContentRepository;
@Autowired
private DocumentRelationRepository documentRelationRepository;
@ -213,6 +224,9 @@ class MailBundleProcessingIntegrationTest {
long totalRelations = documentRelationRepository.count();
assertThat(totalDocuments).isEqualTo(expectedRootDocuments + expectedAttachmentDocuments);
assertThat(documentMailMessageRepository.count()).isEqualTo(expectedRootDocuments);
assertThat(documentMailRecipientRepository.count()).isGreaterThanOrEqualTo(expectedRootDocuments);
assertThat(documentMailAttachmentRepository.count()).isEqualTo(expectedAttachmentDocuments);
assertThat(totalSources).isEqualTo(totalDocuments);
assertThat(totalRelations).isEqualTo(expectedAttachmentDocuments);
@ -284,6 +298,8 @@ class MailBundleProcessingIntegrationTest {
assertThat(documentRelationRepository.count()).isEqualTo(parsed.attachments().size());
List<DocumentSource> sources = documentSourceRepository.findAll();
assertThat(documentMailMessageRepository.count()).isEqualTo(1);
assertThat(documentMailAttachmentRepository.count()).isEqualTo(parsed.attachments().size());
assertThat(sources).anyMatch(s -> sample.getFileName().toString().equals(s.getSourceFilename()));
assertThat(sources).anyMatch(s -> s.getSourceFilename() != null && s.getSourceFilename().toLowerCase().endsWith(".pdf"));
assertThat(sources).anyMatch(s -> s.getSourceFilename() != null && s.getSourceFilename().toLowerCase().endsWith(".csv"));
@ -298,6 +314,12 @@ class MailBundleProcessingIntegrationTest {
documentTextRepresentationRepository.deleteAll();
System.out.println("cleanup: content");
documentContentRepository.deleteAll();
System.out.println("cleanup: mail attachments");
documentMailAttachmentRepository.deleteAll();
System.out.println("cleanup: mail recipients");
documentMailRecipientRepository.deleteAll();
System.out.println("cleanup: mail messages");
documentMailMessageRepository.deleteAll();
System.out.println("cleanup: sources");
documentSourceRepository.deleteAll();
System.out.println("cleanup: documents");
@ -348,6 +370,8 @@ class MailBundleProcessingIntegrationTest {
GenericDocumentImportService.class,
MailDocumentIngestionAdapter.class,
MailMessageExtractionService.class,
MailMetadataPersistenceService.class,
MailImportIdentityResolver.class,
ZipExtractionService.class,
DocumentService.class,
DocumentSourceService.class,

@ -0,0 +1,48 @@
package at.procon.dip.ingestion.mail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import at.procon.dip.ingestion.config.DipIngestionProperties;
import org.junit.jupiter.api.Test;
class CamelMailServerEndpointUriFactoryTest {
private final CamelMailServerEndpointUriFactory factory = new CamelMailServerEndpointUriFactory();
@Test
void shouldBuildImapsConsumerUri() {
DipIngestionProperties.MailRouteProperties properties = new DipIngestionProperties.MailRouteProperties();
properties.setProtocol(MailServerProtocol.IMAPS);
properties.setHost("mail.example.org");
properties.setPort(993);
properties.setUsername("user@example.org");
properties.setPassword("p@ss word");
properties.setFolderName("INBOX/Orders");
properties.setDelete(false);
properties.setPeek(true);
properties.setUnseen(true);
properties.setDelay(15000);
properties.setMaxMessagesPerPoll(25);
String uri = factory.buildConsumerUri(properties);
assertThat(uri).startsWith("imaps://mail.example.org:993?");
assertThat(uri).contains("username=user%40example.org");
assertThat(uri).contains("password=p%40ss+word");
assertThat(uri).contains("folderName=INBOX%2FOrders");
assertThat(uri).contains("peek=true");
assertThat(uri).contains("unseen=true");
assertThat(uri).contains("maxMessagesPerPoll=25");
}
@Test
void shouldFailWhenRequiredValuesAreMissing() {
DipIngestionProperties.MailRouteProperties properties = new DipIngestionProperties.MailRouteProperties();
properties.setProtocol(MailServerProtocol.IMAPS);
assertThatThrownBy(() -> factory.buildConsumerUri(properties))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("host");
}
}
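
The assertions above expect form-style URL encoding of option values (`@` becomes `%40`, a space becomes `+`, `/` becomes `%2F`) and an `IllegalArgumentException` mentioning `host` when it is missing. The following is a minimal sketch of a builder that would satisfy those expectations, assuming `java.net.URLEncoder` is used for the values. `MailUriSketch` and its method signature are hypothetical; the real `CamelMailServerEndpointUriFactory` is not shown in this excerpt and may differ.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch; not the real CamelMailServerEndpointUriFactory.
class MailUriSketch {

    // Builds "<scheme>://<host>:<port>?k1=v1&k2=v2" with URL-encoded option values.
    static String buildConsumerUri(String scheme, String host, int port, Map<String, String> options) {
        if (host == null || host.isBlank()) {
            throw new IllegalArgumentException("host must be set");
        }
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return scheme + "://" + host + ":" + port + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order, which makes the URI deterministic.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("username", "user@example.org");
        options.put("password", "p@ss word");
        options.put("folderName", "INBOX/Orders");
        options.put("peek", "true");
        System.out.println(buildConsumerUri("imaps", "mail.example.org", 993, options));
    }
}
```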

@ -0,0 +1,60 @@
package at.procon.dip.ingestion.mail;
import at.procon.dip.domain.document.entity.MailRecipientType;
import at.procon.dip.ingestion.service.MailMessageExtractionService;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class MailImportIdentityResolverTest {
private final MailImportIdentityResolver resolver = new MailImportIdentityResolver();
@Test
void shouldPreferProviderMessageKeyForRootIdentity() {
var parsed = new MailMessageExtractionService.ParsedMailMessage(
"Subject",
"Subject",
"Sender",
"sender@example.com",
"Sender <sender@example.com>",
null,
"<message-id@example.com>",
null,
null,
OffsetDateTime.now(),
OffsetDateTime.now(),
"body",
"",
List.of(new MailMessageExtractionService.MailRecipient(MailRecipientType.TO, null, "to@example.com", "to@example.com", 0)),
List.of()
);
var envelope = new GenericMailProviderEnvelope(MailProviderType.IMAP, "account-a", "Inbox", "uid-77", null, Map.of());
String identity = resolver.resolveRootSourceIdentifier(parsed, envelope, "raw".getBytes());
assertThat(identity).contains("imap").contains("account-a").contains("Inbox").contains("uid-77");
}
@Test
void shouldUsePartPathForAttachmentIdentity() {
var attachment = new MailMessageExtractionService.MailAttachment(
"offer.pdf",
"application/pdf",
new byte[]{1, 2, 3},
3L,
null,
"0.1.2",
"ATTACHMENT",
null,
1
);
String identity = resolver.resolveAttachmentSourceIdentifier("mail:imap:account:Inbox:uid-77", attachment);
assertThat(identity).endsWith(":part:0.1.2");
}
}
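
The two tests above pin down only part of the identity scheme: a root identifier that contains the provider type, account, folder and provider message key, and an attachment identifier that ends with `:part:<partPath>`. A minimal sketch of one scheme consistent with that is shown below. The `mail:` prefix layout, the lower-casing of the provider type, and both fallbacks (Message-ID, then a SHA-256 of the raw MIME bytes) are assumptions made for illustration, and `MailIdentitySketch` is a hypothetical name; the real `MailImportIdentityResolver` is not shown in this excerpt.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;

// Hypothetical sketch; the fallback order and key layout are assumptions.
class MailIdentitySketch {

    static String rootIdentifier(String providerType, String accountKey, String folderKey,
                                 String providerMessageKey, String messageId, byte[] rawMime) {
        // Preferred: the provider-assigned key, scoped by provider, account and folder.
        if (providerMessageKey != null && !providerMessageKey.isBlank()) {
            return String.join(":", "mail", providerType.toLowerCase(Locale.ROOT),
                    accountKey, folderKey, providerMessageKey);
        }
        // Assumed fallback 1: the RFC 5322 Message-ID header.
        if (messageId != null && !messageId.isBlank()) {
            return "mail:message-id:" + messageId;
        }
        // Assumed fallback 2: a content hash of the raw MIME bytes.
        return "mail:sha256:" + sha256Hex(rawMime);
    }

    // Attachment identity is derived from the parent identity plus the MIME part path.
    static String attachmentIdentifier(String parentIdentifier, String partPath) {
        return parentIdentifier + ":part:" + partPath;
    }

    static String sha256Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(data);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required to be available on every JVM", e);
        }
    }
}
```

Deriving the attachment identifier from the parent identifier means a re-import of the same message would map each part to the same key, which is presumably what the `findBySourceTypeAndExternalSourceId` lookup in the adapter tests relies on.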

@ -0,0 +1,31 @@
package at.procon.dip.ingestion.mail;
import java.util.Map;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class MailProviderEnvelopeAttributesTest {
@Test
void shouldRoundTripGenericProviderEnvelopeThroughAttributes() {
GenericMailProviderEnvelope envelope = new GenericMailProviderEnvelope(
MailProviderType.MICROSOFT_GRAPH,
"mailbox-a",
"Inbox/Subfolder",
"provider-message-42",
"provider-thread-7",
Map.of("tenant", "demo")
);
Map<String, String> attributes = MailProviderEnvelopeAttributes.merge(Map.of("subject", "Hello"), envelope);
GenericMailProviderEnvelope restored = MailProviderEnvelopeAttributes.fromAttributes(attributes);
assertThat(restored.providerType()).isEqualTo(MailProviderType.MICROSOFT_GRAPH);
assertThat(restored.accountKey()).isEqualTo("mailbox-a");
assertThat(restored.folderKey()).isEqualTo("Inbox/Subfolder");
assertThat(restored.providerMessageKey()).isEqualTo("provider-message-42");
assertThat(restored.providerThreadKey()).isEqualTo("provider-thread-7");
assertThat(restored.providerAttributes()).containsEntry("tenant", "demo");
}
}
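
The round trip tested above can be sketched as a pair of functions that flatten an envelope into a string map and rebuild it. The keys `providerType`, `providerAccountKey`, `providerFolderKey`, `providerMessageKey` and `providerThreadKey` come from the README in this commit; the `providerAttr.` prefix for extra provider attributes, the use of a plain `String` for the provider type, and the `EnvelopeAttributesSketch` class itself are assumptions for illustration, since the real `MailProviderEnvelopeAttributes` is not shown in this excerpt.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; not the real MailProviderEnvelopeAttributes.
class EnvelopeAttributesSketch {

    record Envelope(String providerType, String accountKey, String folderKey,
                    String providerMessageKey, String providerThreadKey,
                    Map<String, String> providerAttributes) {}

    // Assumed prefix that keeps provider-specific extras apart from the fixed keys.
    static final String EXTRA_PREFIX = "providerAttr.";

    // Copies the base attributes and adds the envelope fields, skipping nulls.
    static Map<String, String> merge(Map<String, String> base, Envelope envelope) {
        Map<String, String> out = new LinkedHashMap<>(base);
        putIfPresent(out, "providerType", envelope.providerType());
        putIfPresent(out, "providerAccountKey", envelope.accountKey());
        putIfPresent(out, "providerFolderKey", envelope.folderKey());
        putIfPresent(out, "providerMessageKey", envelope.providerMessageKey());
        putIfPresent(out, "providerThreadKey", envelope.providerThreadKey());
        envelope.providerAttributes().forEach((k, v) -> out.put(EXTRA_PREFIX + k, v));
        return out;
    }

    // Rebuilds the envelope; unrelated keys such as "subject" are ignored.
    static Envelope fromAttributes(Map<String, String> attributes) {
        Map<String, String> extras = new LinkedHashMap<>();
        attributes.forEach((k, v) -> {
            if (k.startsWith(EXTRA_PREFIX)) {
                extras.put(k.substring(EXTRA_PREFIX.length()), v);
            }
        });
        return new Envelope(attributes.get("providerType"), attributes.get("providerAccountKey"),
                attributes.get("providerFolderKey"), attributes.get("providerMessageKey"),
                attributes.get("providerThreadKey"), extras);
    }

    private static void putIfPresent(Map<String, String> target, String key, String value) {
        if (value != null) {
            target.put(key, value);
        }
    }
}
```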