From 8fddc2a429fa48f0b10352a56e0388682c6043d4 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Fri, 17 Apr 2026 22:41:55 +0200 Subject: [PATCH] structured email storage and processing --- ...IL_PROCESSING_STABILIZATION_PHASE_ROUTE.md | 58 +++++ docs/MAIL_PROCESSING_STABILIZATION_STEP1.md | 114 +++++++++ docs/embedding/VECTOR_SYNC_HTTP_PROVIDER.md | 4 +- .../dip/domain/document/SourceType.java | 1 + .../entity/DocumentMailAttachment.java | 94 ++++++++ .../document/entity/DocumentMailMessage.java | 127 ++++++++++ .../entity/DocumentMailRecipient.java | 67 ++++++ .../document/entity/MailRecipientType.java | 7 + .../DocumentMailAttachmentRepository.java | 16 ++ .../DocumentMailMessageRepository.java | 17 ++ .../DocumentMailRecipientRepository.java | 13 ++ .../repository/DocumentSourceRepository.java | 2 + .../ted/entity/TedNoticeOrganization.java | 2 +- .../ted/entity/TedNoticeProjection.java | 2 +- .../EmbeddingJobProcessingConfiguration.java | 25 ++ .../embedding/config/EmbeddingProperties.java | 1 + .../repository/EmbeddingJobRepository.java | 19 +- .../job/service/EmbeddingJobService.java | 9 +- ...beddingJobExecutionPersistenceService.java | 52 +++++ .../RepresentationEmbeddingOrchestrator.java | 114 ++++++--- .../adapter/MailDocumentIngestionAdapter.java | 103 ++++++-- .../camel/GenericMailIngestionRoute.java | 207 ++++++++++++++++ .../config/DipIngestionProperties.java | 37 ++- .../CamelMailServerEndpointUriFactory.java | 58 +++++ .../mail/GenericMailProviderEnvelope.java | 22 ++ .../mail/MailImportIdentityResolver.java | 52 +++++ .../ingestion/mail/MailProviderEnvelope.java | 22 ++ .../mail/MailProviderEnvelopeAttributes.java | 81 +++++++ .../dip/ingestion/mail/MailProviderType.java | 15 ++ .../mail/MailServerEndpointUriFactory.java | 7 + .../ingestion/mail/MailServerProtocol.java | 18 ++ .../service/GenericDocumentImportService.java | 18 ++ .../service/MailMessageExtractionService.java | 221 +++++++++++++++--- .../MailMetadataPersistenceService.java | 137 +++++++++++ .../java/at/procon/ted/camel/MailRoute.java | 57 ++++- src/main/resources/application-new.yml | 62 ++++- ...oc_mail_processing_stabilization_step1.sql | 90 +++++++ ...NotDependOnTedProcessorPropertiesTest.java | 6 +- ...presentationEmbeddingOrchestratorTest.java | 109 +++++++-- ...ailDocumentIngestionAdapterBundleTest.java | 12 +- ...ocumentIngestionAdapterFileSystemTest.java | 18 +- .../MailBundleProcessingIntegrationTest.java | 44 +++- ...CamelMailServerEndpointUriFactoryTest.java | 48 ++++ .../mail/MailImportIdentityResolverTest.java | 60 +++++ .../MailProviderEnvelopeAttributesTest.java | 31 +++ 45 files changed, 2141 insertions(+), 138 deletions(-) create mode 100644 docs/MAIL_PROCESSING_STABILIZATION_PHASE_ROUTE.md create mode 100644 docs/MAIL_PROCESSING_STABILIZATION_STEP1.md create mode 100644 src/main/java/at/procon/dip/domain/document/entity/DocumentMailAttachment.java create mode 100644 src/main/java/at/procon/dip/domain/document/entity/DocumentMailMessage.java create mode 100644 src/main/java/at/procon/dip/domain/document/entity/DocumentMailRecipient.java create mode 100644 src/main/java/at/procon/dip/domain/document/entity/MailRecipientType.java create mode 100644 src/main/java/at/procon/dip/domain/document/repository/DocumentMailAttachmentRepository.java create mode 100644 src/main/java/at/procon/dip/domain/document/repository/DocumentMailMessageRepository.java create mode 100644 src/main/java/at/procon/dip/domain/document/repository/DocumentMailRecipientRepository.java create mode 100644 src/main/java/at/procon/dip/embedding/config/EmbeddingJobProcessingConfiguration.java create mode 100644 src/main/java/at/procon/dip/embedding/service/EmbeddingJobExecutionPersistenceService.java create mode 100644 src/main/java/at/procon/dip/ingestion/camel/GenericMailIngestionRoute.java create mode 100644 src/main/java/at/procon/dip/ingestion/mail/CamelMailServerEndpointUriFactory.java create mode 100644 src/main/java/at/procon/dip/ingestion/mail/GenericMailProviderEnvelope.java create mode 100644 src/main/java/at/procon/dip/ingestion/mail/MailImportIdentityResolver.java create mode 100644 src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelope.java create mode 100644 src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelopeAttributes.java create mode 100644 src/main/java/at/procon/dip/ingestion/mail/MailProviderType.java create mode 100644 src/main/java/at/procon/dip/ingestion/mail/MailServerEndpointUriFactory.java create mode 100644 src/main/java/at/procon/dip/ingestion/mail/MailServerProtocol.java create mode 100644 src/main/java/at/procon/dip/ingestion/service/MailMetadataPersistenceService.java create mode 100644 src/main/resources/db/migration/V23__doc_mail_processing_stabilization_step1.sql create mode 100644 src/test/java/at/procon/dip/ingestion/mail/CamelMailServerEndpointUriFactoryTest.java create mode 100644 src/test/java/at/procon/dip/ingestion/mail/MailImportIdentityResolverTest.java create mode 100644 src/test/java/at/procon/dip/ingestion/mail/MailProviderEnvelopeAttributesTest.java diff --git a/docs/MAIL_PROCESSING_STABILIZATION_PHASE_ROUTE.md b/docs/MAIL_PROCESSING_STABILIZATION_PHASE_ROUTE.md new file mode 100644 index 0000000..fa0dd78 --- /dev/null +++ b/docs/MAIL_PROCESSING_STABILIZATION_PHASE_ROUTE.md @@ -0,0 +1,58 @@ +# Mail Processing Stabilization Phase – NEW Camel mail route + +This patch adds a NEW-runtime Camel route for mail ingestion that is similar in purpose to the legacy `MailRoute`, but implemented only in the NEW code path (`at.procon.dip.*`). + +## Scope + +- runtime mode: `NEW` only +- entrypoint: Camel mail consumer route +- downstream handling: `DocumentIngestionGateway` with `SourceType.MAIL` +- no legacy profile/config/code changes + +## Main classes + +- `at.procon.dip.ingestion.camel.GenericMailIngestionRoute` +- `at.procon.dip.ingestion.mail.MailServerEndpointUriFactory` +- `at.procon.dip.ingestion.mail.CamelMailServerEndpointUriFactory` +- `at.procon.dip.ingestion.mail.MailServerProtocol` + +## Configuration + +Configured under `dip.ingestion.mail-route` in `application-new.yml`. + +Supported protocols currently exposed through the generic endpoint factory: +- `IMAP` +- `IMAPS` +- `POP3` +- `POP3S` + +This keeps the route provider/protocol-agnostic at the configuration level while still using Apache Camel mail endpoints underneath. + +## Behavior + +When enabled, the route: +1. consumes messages from the configured mail endpoint +2. serializes each provider message to raw MIME bytes +3. extracts basic provider/provenance attributes +4. builds a `SourceDescriptor` with `SourceType.MAIL` +5. delegates the actual parsing/import/attachment handling to the existing NEW `MailDocumentIngestionAdapter` + +## Provenance attributes added by the route + +The route enriches the `SourceDescriptor.attributes` map with: +- `providerType` +- `providerProtocol` +- `providerAccountKey` +- `providerFolderKey` +- `providerMessageKey` (when available from Camel headers) +- `providerThreadKey` (when available) +- `messageId` +- `subject` +- `from` + +## Notes + +- The route intentionally does **not** depend on `TedProcessorProperties`. +- The route is designed for the NEW runtime only. +- The mail import behavior still relies on the NEW `MailDocumentIngestionAdapter` for MIME parsing and attachment import. +- This step does not add replay/reprocessing yet. diff --git a/docs/MAIL_PROCESSING_STABILIZATION_STEP1.md b/docs/MAIL_PROCESSING_STABILIZATION_STEP1.md new file mode 100644 index 0000000..78cddcd --- /dev/null +++ b/docs/MAIL_PROCESSING_STABILIZATION_STEP1.md @@ -0,0 +1,114 @@ +# Mail Processing Stabilization Phase — Step 1 + +This step implements the first practical slice of the mail-processing stabilization work: + +- generic mail-provider contract +- provider-aware source identifiers for idempotent import +- typed mail metadata persistence +- attachment occurrence tracking +- current Camel/IMAP route adapted to the generic provider contract + +## Included scope + +### 1. Generic mail provider contract +Added a generic abstraction so the ingestion pipeline does not depend on IMAP-specific semantics: + +- `MailProviderType` +- `MailProviderEnvelope` +- `GenericMailProviderEnvelope` +- `MailProviderEnvelopeAttributes` + +Current implementation uses `GenericMailProviderEnvelope` for the existing Camel IMAP route. +Future providers such as POP3, EWS, Microsoft Graph, Gmail API, or replay/file sources can use the same contract. + +### 2. Provider-aware idempotency foundation +Added `MailImportIdentityResolver` to derive stable source identifiers for: + +- root mail message +- attachment occurrences + +Priority for root message identity: +1. provider message key +2. `Message-ID` +3. raw MIME hash + +This allows the import path to remain restart-safe and replay-safe even when content-hash-only deduplication is insufficient. + +### 3. Generic source-id idempotency in document import +`GenericDocumentImportService` now checks for an existing `DOC.doc_source` row using: + +- `source_type` +- `external_source_id` + +before content-hash deduplication. + +This makes source-identifier idempotency reusable beyond mail as well. + +### 4. Typed mail metadata persistence +Added new DOC metadata tables/entities: + +- `DOC.doc_mail_message` +- `DOC.doc_mail_recipient` +- `DOC.doc_mail_attachment` + +These persist: +- provider/account/folder/message/thread keys +- `Message-ID`, `In-Reply-To`, `References` +- normalized subject +- sender/recipients +- attachment occurrence metadata +- part path / archive path / disposition / content-id + +### 5. Attachment source typing +Attachments imported from mail now use: +- `SourceType.MAIL_ATTACHMENT` + +instead of the generic `MAIL` source type. + +### 6. Camel IMAP route integration +The existing Camel mail route now emits generic provider metadata into `SourceDescriptor.attributes()` using the new provider contract. + +## Not yet included + +The following are intentionally left for the next step: + +- replay/reprocess workflows +- import/reprocess run tracking tables +- failed attachment retry services +- thread-aware search/reporting +- admin/ops visibility endpoints or Camel admin routes + +## Main implementation files + +### New files +- `src/main/java/at/procon/dip/ingestion/mail/MailProviderType.java` +- `src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelope.java` +- `src/main/java/at/procon/dip/ingestion/mail/GenericMailProviderEnvelope.java` +- `src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelopeAttributes.java` +- `src/main/java/at/procon/dip/ingestion/mail/MailImportIdentityResolver.java` +- `src/main/java/at/procon/dip/domain/document/entity/DocumentMailMessage.java` +- `src/main/java/at/procon/dip/domain/document/entity/DocumentMailRecipient.java` +- `src/main/java/at/procon/dip/domain/document/entity/DocumentMailAttachment.java` +- `src/main/java/at/procon/dip/domain/document/entity/MailRecipientType.java` +- `src/main/java/at/procon/dip/domain/document/repository/DocumentMailMessageRepository.java` +- `src/main/java/at/procon/dip/domain/document/repository/DocumentMailRecipientRepository.java` +- `src/main/java/at/procon/dip/domain/document/repository/DocumentMailAttachmentRepository.java` +- `src/main/java/at/procon/dip/ingestion/service/MailMetadataPersistenceService.java` +- `src/main/resources/db/migration/V23__doc_mail_processing_stabilization_step1.sql` + +### Modified files +- `src/main/java/at/procon/dip/domain/document/SourceType.java` +- `src/main/java/at/procon/dip/domain/document/repository/DocumentSourceRepository.java` +- `src/main/java/at/procon/dip/ingestion/service/GenericDocumentImportService.java` +- `src/main/java/at/procon/dip/ingestion/service/MailMessageExtractionService.java` +- `src/main/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapter.java` +- `src/main/java/at/procon/ted/camel/MailRoute.java` + +## Recommended next step + +Proceed with **Step 2** of the Mail Processing Stabilization Phase: + +- replay/reprocess services +- failed attachment retry flow +- import/reprocess run tracking +- reporting / operational visibility diff --git a/docs/embedding/VECTOR_SYNC_HTTP_PROVIDER.md b/docs/embedding/VECTOR_SYNC_HTTP_PROVIDER.md index 04ccb2f..dc8b182 100644 --- a/docs/embedding/VECTOR_SYNC_HTTP_PROVIDER.md +++ b/docs/embedding/VECTOR_SYNC_HTTP_PROVIDER.md @@ -53,6 +53,7 @@ dip: embedding: jobs: enabled: true + parallel-batch-count: 1 process-in-batches: true execution-batch-size: 20 @@ -64,4 +65,5 @@ dip: Notes: - jobs are grouped by `modelKey` - non-batch-capable models still fall back to single-item execution -- `execution-batch-size` controls how many texts are sent in one `/vectorize-batch` request +- `parallel-batch-count` controls how many claimed job batches may be started in parallel +- `execution-batch-size` controls how many texts are sent in one `/vectorize-batch` request inside each claimed job batch diff --git a/src/main/java/at/procon/dip/domain/document/SourceType.java b/src/main/java/at/procon/dip/domain/document/SourceType.java index ccb0e62..a672a4d 100644 --- a/src/main/java/at/procon/dip/domain/document/SourceType.java +++ b/src/main/java/at/procon/dip/domain/document/SourceType.java @@ -7,6 +7,7 @@ public enum SourceType { TED_PACKAGE, PACKAGE_CHILD, MAIL, + MAIL_ATTACHMENT, FILE_SYSTEM, REST_UPLOAD, MANUAL_UPLOAD, diff --git a/src/main/java/at/procon/dip/domain/document/entity/DocumentMailAttachment.java b/src/main/java/at/procon/dip/domain/document/entity/DocumentMailAttachment.java new file mode 100644 index 0000000..e0a3095 --- /dev/null +++ b/src/main/java/at/procon/dip/domain/document/entity/DocumentMailAttachment.java @@ -0,0 +1,94 @@ +package at.procon.dip.domain.document.entity; + +import at.procon.dip.architecture.SchemaNames; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Index; +import jakarta.persistence.PrePersist; +import jakarta.persistence.PreUpdate; +import jakarta.persistence.Table; +import java.time.OffsetDateTime; +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Entity +@Table(schema = SchemaNames.DOC, name = "doc_mail_attachment", indexes = { + @Index(name = "idx_doc_mail_attachment_mail", columnList = "mail_document_id"), + @Index(name = "idx_doc_mail_attachment_document", columnList = "attachment_document_id"), + @Index(name = "idx_doc_mail_attachment_part_path", columnList = "part_path"), + @Index(name = "idx_doc_mail_attachment_attachment_index", columnList = "attachment_index") +}) +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class DocumentMailAttachment { + + @Id + @GeneratedValue(strategy = GenerationType.UUID) + private UUID id; + + @Column(name = "mail_document_id", nullable = false) + private UUID mailDocumentId; + + @Column(name = "attachment_document_id", nullable = false) + private UUID attachmentDocumentId; + + @Column(name = "disposition", length = 32) + private String disposition; + + @Column(name = "content_id", length = 500) + private String contentId; + + @Column(name = "filename", length = 1000) + private String filename; + + @Column(name = "mime_type", length = 255) + private String mimeType; + + @Column(name = "size_bytes") + private Long sizeBytes; + + @Column(name = "attachment_index") + private Integer attachmentIndex; + + @Column(name = "part_path", length = 500) + private String partPath; + + @Column(name = "path_in_archive", columnDefinition = "TEXT") + private String pathInArchive; + + @Column(name = "extraction_status", nullable = false, length = 32) + @Builder.Default + private String extractionStatus = "IMPORTED"; + + @Column(name = "error_message", columnDefinition = "TEXT") + private String errorMessage; + + @Builder.Default + @Column(name = "created_at", nullable = false, updatable = false) + private OffsetDateTime createdAt = OffsetDateTime.now(); + + @Builder.Default + @Column(name = "updated_at", nullable = false) + private OffsetDateTime updatedAt = OffsetDateTime.now(); + + @PrePersist + protected void onCreate() { + createdAt = OffsetDateTime.now(); + updatedAt = OffsetDateTime.now(); + } + + @PreUpdate + protected void onUpdate() { + updatedAt = OffsetDateTime.now(); + } +} diff --git a/src/main/java/at/procon/dip/domain/document/entity/DocumentMailMessage.java b/src/main/java/at/procon/dip/domain/document/entity/DocumentMailMessage.java new file mode 100644 index 0000000..6a2e626 --- /dev/null +++ b/src/main/java/at/procon/dip/domain/document/entity/DocumentMailMessage.java @@ -0,0 +1,127 @@ +package at.procon.dip.domain.document.entity; + +import at.procon.dip.architecture.SchemaNames; +import at.procon.dip.ingestion.mail.MailProviderType; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; +import jakarta.persistence.FetchType; +import jakarta.persistence.Id; +import jakarta.persistence.Index; +import jakarta.persistence.JoinColumn; +import jakarta.persistence.OneToOne; +import jakarta.persistence.PrePersist; +import jakarta.persistence.PreUpdate; +import jakarta.persistence.Table; +import java.time.OffsetDateTime; +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Entity +@Table(schema = SchemaNames.DOC, name = "doc_mail_message", indexes = { + @Index(name = "idx_doc_mail_message_source_id", columnList = "source_id"), + @Index(name = "idx_doc_mail_message_message_id", columnList = "message_id"), + @Index(name = "idx_doc_mail_message_thread_key", columnList = "thread_key"), + @Index(name = "idx_doc_mail_message_provider_identity", columnList = "provider_type, account_key, folder_key, provider_message_key") +}) +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class DocumentMailMessage { + + @Id + @Column(name = "document_id", nullable = false) + private UUID documentId; + + @OneToOne(fetch = FetchType.LAZY, optional = false) + @JoinColumn(name = "document_id", nullable = false, insertable = false, updatable = false) + private Document document; + + @Column(name = "source_id") + private UUID sourceId; + + @Enumerated(EnumType.STRING) + @Column(name = "provider_type", nullable = false, length = 64) + @Builder.Default + private MailProviderType providerType = MailProviderType.GENERIC; + + @Column(name = "account_key", length = 255) + private String accountKey; + + @Column(name = "folder_key", length = 255) + private String folderKey; + + @Column(name = "provider_message_key", length = 500) + private String providerMessageKey; + + @Column(name = "provider_thread_key", length = 500) + private String providerThreadKey; + + @Column(name = "message_id", length = 1000) + private String messageId; + + @Column(name = "in_reply_to", length = 1000) + private String inReplyTo; + + @Column(name = "references_header", columnDefinition = "TEXT") + private String referencesHeader; + + @Column(name = "thread_key", length = 1000) + private String threadKey; + + @Column(name = "subject", length = 1000) + private String subject; + + @Column(name = "normalized_subject", length = 1000) + private String normalizedSubject; + + @Column(name = "from_display_name", length = 500) + private String fromDisplayName; + + @Column(name = "from_email", length = 500) + private String fromEmail; + + @Column(name = "from_raw", length = 1000) + private String fromRaw; + + @Column(name = "reply_to_raw", columnDefinition = "TEXT") + private String replyToRaw; + + @Column(name = "sent_at") + private OffsetDateTime sentAt; + + @Column(name = "received_at") + private OffsetDateTime receivedAt; + + @Column(name = "raw_message_hash", length = 64) + private String rawMessageHash; + + @Column(name = "raw_header_hash", length = 64) + private String rawHeaderHash; + + @Builder.Default + @Column(name = "created_at", nullable = false, updatable = false) + private OffsetDateTime createdAt = OffsetDateTime.now(); + + @Builder.Default + @Column(name = "updated_at", nullable = false) + private OffsetDateTime updatedAt = OffsetDateTime.now(); + + @PrePersist + protected void onCreate() { + createdAt = OffsetDateTime.now(); + updatedAt = OffsetDateTime.now(); + } + + @PreUpdate + protected void onUpdate() { + updatedAt = OffsetDateTime.now(); + } +} diff --git a/src/main/java/at/procon/dip/domain/document/entity/DocumentMailRecipient.java b/src/main/java/at/procon/dip/domain/document/entity/DocumentMailRecipient.java new file mode 100644 index 0000000..c82880a --- /dev/null +++ b/src/main/java/at/procon/dip/domain/document/entity/DocumentMailRecipient.java @@ -0,0 +1,67 @@ +package at.procon.dip.domain.document.entity; + +import at.procon.dip.architecture.SchemaNames; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Index; +import jakarta.persistence.PrePersist; +import jakarta.persistence.Table; +import java.time.OffsetDateTime; +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Entity +@Table(schema = SchemaNames.DOC, name = "doc_mail_recipient", indexes = { + @Index(name = "idx_doc_mail_recipient_mail", columnList = "mail_document_id"), + @Index(name = "idx_doc_mail_recipient_type", columnList = "recipient_type"), + @Index(name = "idx_doc_mail_recipient_email", columnList = "email_address") +}) +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class DocumentMailRecipient { + + @Id + @GeneratedValue(strategy = GenerationType.UUID) + private UUID id; + + @Column(name = "mail_document_id", nullable = false) + private UUID mailDocumentId; + + @Enumerated(EnumType.STRING) + @Column(name = "recipient_type", nullable = false, length = 16) + private MailRecipientType recipientType; + + @Column(name = "display_name", length = 500) + private String displayName; + + @Column(name = "email_address", length = 500) + private String emailAddress; + + @Column(name = "raw_value", columnDefinition = "TEXT") + private String rawValue; + + @Column(name = "sort_order", nullable = false) + @Builder.Default + private int sortOrder = 0; + + @Builder.Default + @Column(name = "created_at", nullable = false, updatable = false) + private OffsetDateTime createdAt = OffsetDateTime.now(); + + @PrePersist + protected void onCreate() { + createdAt = OffsetDateTime.now(); + } +} diff --git a/src/main/java/at/procon/dip/domain/document/entity/MailRecipientType.java b/src/main/java/at/procon/dip/domain/document/entity/MailRecipientType.java new file mode 100644 index 0000000..3bfc1ed --- /dev/null +++ b/src/main/java/at/procon/dip/domain/document/entity/MailRecipientType.java @@ -0,0 +1,7 @@ +package at.procon.dip.domain.document.entity; + +public enum MailRecipientType { + TO, + CC, + BCC +} diff --git a/src/main/java/at/procon/dip/domain/document/repository/DocumentMailAttachmentRepository.java b/src/main/java/at/procon/dip/domain/document/repository/DocumentMailAttachmentRepository.java new file mode 100644 index 0000000..77bcbd5 --- /dev/null +++ b/src/main/java/at/procon/dip/domain/document/repository/DocumentMailAttachmentRepository.java @@ -0,0 +1,16 @@ +package at.procon.dip.domain.document.repository; + +import at.procon.dip.domain.document.entity.DocumentMailAttachment; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface DocumentMailAttachmentRepository extends JpaRepository { + + List findByMailDocumentId(UUID mailDocumentId); + + Optional findByMailDocumentIdAndAttachmentIndex(UUID mailDocumentId, Integer attachmentIndex); + + Optional findByMailDocumentIdAndPartPath(UUID mailDocumentId, String partPath); +} diff --git a/src/main/java/at/procon/dip/domain/document/repository/DocumentMailMessageRepository.java b/src/main/java/at/procon/dip/domain/document/repository/DocumentMailMessageRepository.java new file mode 100644 index 0000000..b205a19 --- /dev/null +++ b/src/main/java/at/procon/dip/domain/document/repository/DocumentMailMessageRepository.java @@ -0,0 +1,17 @@ +package at.procon.dip.domain.document.repository; + +import at.procon.dip.domain.document.entity.DocumentMailMessage; +import at.procon.dip.ingestion.mail.MailProviderType; +import java.util.Optional; +import java.util.UUID; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface DocumentMailMessageRepository extends JpaRepository { + + Optional findByProviderTypeAndAccountKeyAndFolderKeyAndProviderMessageKey( + MailProviderType providerType, + String accountKey, + String folderKey, + String providerMessageKey + ); +} diff --git a/src/main/java/at/procon/dip/domain/document/repository/DocumentMailRecipientRepository.java b/src/main/java/at/procon/dip/domain/document/repository/DocumentMailRecipientRepository.java new file mode 100644 index 0000000..f3afa1d --- /dev/null +++ b/src/main/java/at/procon/dip/domain/document/repository/DocumentMailRecipientRepository.java @@ -0,0 +1,13 @@ +package at.procon.dip.domain.document.repository; + +import at.procon.dip.domain.document.entity.DocumentMailRecipient; +import java.util.List; +import java.util.UUID; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface DocumentMailRecipientRepository extends JpaRepository { + + List findByMailDocumentId(UUID mailDocumentId); + + void deleteByMailDocumentId(UUID mailDocumentId); +} diff --git a/src/main/java/at/procon/dip/domain/document/repository/DocumentSourceRepository.java b/src/main/java/at/procon/dip/domain/document/repository/DocumentSourceRepository.java index 31e100d..2ca8e99 100644 --- a/src/main/java/at/procon/dip/domain/document/repository/DocumentSourceRepository.java +++ b/src/main/java/at/procon/dip/domain/document/repository/DocumentSourceRepository.java @@ -14,4 +14,6 @@ public interface DocumentSourceRepository extends JpaRepository findBySourceType(SourceType sourceType); Optional findByExternalSourceId(String externalSourceId); + + Optional findBySourceTypeAndExternalSourceId(SourceType sourceType, String externalSourceId); } diff --git a/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeOrganization.java b/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeOrganization.java index d6738ce..71a397f 100644 --- a/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeOrganization.java +++ b/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeOrganization.java @@ -52,7 +52,7 @@ public class TedNoticeOrganization { @Column(name = "name", columnDefinition = "TEXT") private String name; - @Column(name = "company_id", length = 1000) + @Column(name = "company_id", columnDefinition = "TEXT") private String companyId; @Column(name = "country_code", columnDefinition = "TEXT") diff --git a/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeProjection.java b/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeProjection.java index 6a6c683..b92f2d2 100644 --- a/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeProjection.java +++ b/src/main/java/at/procon/dip/domain/ted/entity/TedNoticeProjection.java @@ -115,7 +115,7 @@ public class TedNoticeProjection { @Column(name = "buyer_city", columnDefinition = "TEXT") private String buyerCity; - @Column(name = "buyer_postal_code", length = 100) + @Column(name = "buyer_postal_code", columnDefinition = "TEXT") private String buyerPostalCode; @Column(name = "buyer_nuts_code", length = 10) diff --git a/src/main/java/at/procon/dip/embedding/config/EmbeddingJobProcessingConfiguration.java b/src/main/java/at/procon/dip/embedding/config/EmbeddingJobProcessingConfiguration.java new file mode 100644 index 0000000..05417a1 --- /dev/null +++ b/src/main/java/at/procon/dip/embedding/config/EmbeddingJobProcessingConfiguration.java @@ -0,0 +1,25 @@ +package at.procon.dip.embedding.config; + +import java.util.concurrent.Executor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +public class EmbeddingJobProcessingConfiguration { + + @Bean(name = "embeddingJobProcessingExecutor") + public Executor embeddingJobProcessingExecutor(EmbeddingProperties properties) { + int parallelBatchCount = Math.max(1, properties.getJobs().getParallelBatchCount()); + + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setThreadNamePrefix("embedding-job-"); + executor.setCorePoolSize(parallelBatchCount); + executor.setMaxPoolSize(parallelBatchCount); + executor.setQueueCapacity(parallelBatchCount); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(30); + executor.initialize(); + return executor; + } +} diff --git a/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java b/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java index a7a5328..d022e5e 100644 --- a/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java +++ b/src/main/java/at/procon/dip/embedding/config/EmbeddingProperties.java @@ -67,6 +67,7 @@ public class EmbeddingProperties { public static class JobsProperties { private boolean enabled = false; private int batchSize = 16; + private int parallelBatchCount = 1; private boolean processInBatches = false; private int executionBatchSize = 8; private int maxRetries = 5; diff --git a/src/main/java/at/procon/dip/embedding/job/repository/EmbeddingJobRepository.java b/src/main/java/at/procon/dip/embedding/job/repository/EmbeddingJobRepository.java index 646c6c3..92b5606 100644 --- a/src/main/java/at/procon/dip/embedding/job/repository/EmbeddingJobRepository.java +++ b/src/main/java/at/procon/dip/embedding/job/repository/EmbeddingJobRepository.java @@ -3,15 +3,12 @@ package at.procon.dip.embedding.job.repository; import at.procon.dip.embedding.job.entity.EmbeddingJob; import at.procon.dip.embedding.model.EmbeddingJobStatus; import at.procon.dip.embedding.model.EmbeddingJobType; -import jakarta.persistence.LockModeType; import java.time.OffsetDateTime; import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.UUID; -import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; -import org.springframework.data.jpa.repository.Lock; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; @@ -23,11 +20,17 @@ public interface EmbeddingJobRepository extends JpaRepository statuses); - @Lock(LockModeType.PESSIMISTIC_WRITE) - @Query("SELECT j FROM EmbeddingJob j WHERE j.status IN :statuses AND (j.nextRetryAt IS NULL OR j.nextRetryAt <= :now) ORDER BY j.priority DESC, j.createdAt ASC") - List findReadyJobsForUpdate(@Param("statuses") Collection statuses, - @Param("now") OffsetDateTime now, - Pageable pageable); + @Query(value = """ + SELECT * + FROM DOC.doc_embedding_job j + WHERE j.status IN ('PENDING', 'RETRY_SCHEDULED') + AND (j.next_retry_at IS NULL OR j.next_retry_at <= :now) + ORDER BY j.priority DESC, j.created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT :limit + """, nativeQuery = true) + List claimReadyJobs(@Param("now") OffsetDateTime now, + @Param("limit") int limit); List findByDocumentId(UUID documentId); } diff --git a/src/main/java/at/procon/dip/embedding/job/service/EmbeddingJobService.java b/src/main/java/at/procon/dip/embedding/job/service/EmbeddingJobService.java index 1247bd7..7198a3a 100644 --- a/src/main/java/at/procon/dip/embedding/job/service/EmbeddingJobService.java +++ b/src/main/java/at/procon/dip/embedding/job/service/EmbeddingJobService.java @@ -15,8 +15,8 @@ import java.util.List; import java.util.Set; import java.util.UUID; import lombok.RequiredArgsConstructor; -import org.springframework.data.domain.PageRequest; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @Service @@ -73,12 +73,9 @@ public class EmbeddingJobService { .build())); } + @Transactional(propagation = Propagation.REQUIRES_NEW) public List claimNextReadyJobs(int limit) { - List jobs = jobRepository.findReadyJobsForUpdate( - Set.of(EmbeddingJobStatus.PENDING, EmbeddingJobStatus.RETRY_SCHEDULED), - OffsetDateTime.now(), - PageRequest.of(0, limit) - ); + List jobs = jobRepository.claimReadyJobs(OffsetDateTime.now(), limit); jobs.forEach(job -> { job.setStatus(EmbeddingJobStatus.IN_PROGRESS); job.setAttemptCount(job.getAttemptCount() + 1); diff --git a/src/main/java/at/procon/dip/embedding/service/EmbeddingJobExecutionPersistenceService.java b/src/main/java/at/procon/dip/embedding/service/EmbeddingJobExecutionPersistenceService.java new file mode 100644 index 0000000..c176718 --- /dev/null +++ b/src/main/java/at/procon/dip/embedding/service/EmbeddingJobExecutionPersistenceService.java @@ -0,0 +1,52 @@ +package at.procon.dip.embedding.service; + +import at.procon.dip.embedding.job.service.EmbeddingJobService; +import at.procon.dip.embedding.model.EmbeddingProviderResult; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +@Service +@RequiredArgsConstructor +public class EmbeddingJobExecutionPersistenceService { + + private final EmbeddingPersistenceService embeddingPersistenceService; + private final EmbeddingJobService jobService; + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public UUID startProcessing(UUID representationId, String modelKey) { + var embedding = embeddingPersistenceService.ensurePending(representationId, modelKey); + embeddingPersistenceService.markProcessing(embedding.getId()); + return embedding.getId(); + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void completeJob(UUID embeddingId, + EmbeddingProviderResult result, + UUID jobId, + String providerRequestId) { + embeddingPersistenceService.saveCompleted(embeddingId, result); + jobService.markCompleted(jobId, providerRequestId); + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void completeJob(UUID embeddingId, + float[] vector, + Integer tokenCount, + UUID jobId, + String providerRequestId) { + embeddingPersistenceService.saveCompleted(embeddingId, vector, tokenCount); + jobService.markCompleted(jobId, providerRequestId); + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void failJob(UUID embeddingId, + UUID jobId, + String errorMessage, + boolean retryable) { + embeddingPersistenceService.markFailed(embeddingId, errorMessage); + jobService.markFailed(jobId, errorMessage, retryable); + } +} diff --git a/src/main/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestrator.java b/src/main/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestrator.java index 45fadb4..daccdcd 100644 --- a/src/main/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestrator.java +++ b/src/main/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestrator.java @@ -1,6 +1,5 @@ package at.procon.dip.embedding.service; -import at.procon.dip.domain.document.entity.DocumentEmbedding; import at.procon.dip.domain.document.entity.DocumentTextRepresentation; import at.procon.dip.domain.document.repository.DocumentTextRepresentationRepository; import at.procon.dip.embedding.config.EmbeddingProperties; @@ -17,35 +16,51 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.UUID; -import lombok.RequiredArgsConstructor; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; @Service -@RequiredArgsConstructor @Slf4j public class RepresentationEmbeddingOrchestrator { private final EmbeddingJobService jobService; private final EmbeddingExecutionService executionService; - private final EmbeddingPersistenceService persistenceService; + private final EmbeddingJobExecutionPersistenceService executionPersistenceService; private final DocumentTextRepresentationRepository representationRepository; private final EmbeddingSelectionPolicy selectionPolicy; private final EmbeddingModelRegistry modelRegistry; private final EmbeddingProperties embeddingProperties; + private final Executor embeddingJobProcessingExecutor; + + public RepresentationEmbeddingOrchestrator(EmbeddingJobService jobService, + EmbeddingExecutionService executionService, + EmbeddingJobExecutionPersistenceService executionPersistenceService, + DocumentTextRepresentationRepository representationRepository, + EmbeddingSelectionPolicy selectionPolicy, + EmbeddingModelRegistry modelRegistry, + EmbeddingProperties embeddingProperties, + @Qualifier("embeddingJobProcessingExecutor") Executor embeddingJobProcessingExecutor) { + this.jobService = jobService; + this.executionService = executionService; + this.executionPersistenceService = executionPersistenceService; + this.representationRepository = representationRepository; + this.selectionPolicy = selectionPolicy; + this.modelRegistry = modelRegistry; + this.embeddingProperties = embeddingProperties; + this.embeddingJobProcessingExecutor = embeddingJobProcessingExecutor; + } - @Transactional public List enqueueDocument(UUID documentId) { return jobService.enqueueForDocument(documentId); } - @Transactional public List enqueueDocument(UUID documentId, String modelKey) { return jobService.enqueueForDocument(documentId, modelKey); } - @Transactional public List enqueueDocument(UUID documentId, String modelKey, EmbeddingProfile profile) { var model = modelRegistry.getRequired(modelKey); return selectionPolicy.selectRepresentations(documentId, model, profile).stream() @@ -53,32 +68,34 @@ public class RepresentationEmbeddingOrchestrator { .toList(); } - @Transactional public EmbeddingJob enqueueRepresentation(UUID documentId, UUID representationId, String modelKey) { return jobService.enqueueForRepresentation(documentId, representationId, modelKey, EmbeddingJobType.DOCUMENT_EMBED); } - @Transactional public int processNextReadyBatch() { if (!embeddingProperties.isEnabled() || !embeddingProperties.getJobs().isEnabled()) { log.debug("New embedding subsystem jobs are disabled"); return 0; } - List jobs = jobService.claimNextReadyJobs(embeddingProperties.getJobs().getBatchSize()); + int batchSize = Math.max(1, embeddingProperties.getJobs().getBatchSize()); + int parallelBatchCount = Math.max(1, embeddingProperties.getJobs().getParallelBatchCount()); + int claimLimit = batchSize * parallelBatchCount; + + List jobs = jobService.claimNextReadyJobs(claimLimit); if (jobs.isEmpty()) { return 0; } - if (embeddingProperties.getJobs().isProcessInBatches()) { - processClaimedJobsInBatches(jobs); + List> claimedBatches = partition(jobs, batchSize); + if (claimedBatches.size() == 1) { + processClaimedJobBatch(claimedBatches.getFirst()); } else { - jobs.forEach(this::processClaimedJobSafely); + runClaimedBatchesInParallel(claimedBatches); } return jobs.size(); } - @Transactional public void processClaimedJob(EmbeddingJob job) { EmbeddingModelDescriptor model = modelRegistry.getRequired(job.getModelKey()); PreparedEmbedding prepared = prepareEmbedding(job, model); @@ -92,16 +109,43 @@ public class RepresentationEmbeddingOrchestrator { EmbeddingUseCase.DOCUMENT, List.of(prepared.text()) ); - persistenceService.saveCompleted(prepared.embeddingId(), result); - jobService.markCompleted(job.getId(), result.providerRequestId()); + executionPersistenceService.completeJob( + prepared.embeddingId(), + result, + job.getId(), + result.providerRequestId() + ); } catch (RuntimeException ex) { - persistenceService.markFailed(prepared.embeddingId(), ex.getMessage()); - jobService.markFailed(job.getId(), ex.getMessage(), true); + executionPersistenceService.failJob( + prepared.embeddingId(), + job.getId(), + ex.getMessage(), + true + ); throw ex; } } - private void processClaimedJobsInBatches(List jobs) { + private void processClaimedJobBatch(List jobs) { + if (embeddingProperties.getJobs().isProcessInBatches()) { + processClaimedJobsInExecutionBatches(jobs); + } else { + jobs.forEach(this::processClaimedJobSafely); + } + } + + private void runClaimedBatchesInParallel(List> claimedBatches) { + List> futures = claimedBatches.stream() + .map(batch -> CompletableFuture.runAsync( + () -> processClaimedJobBatch(batch), + embeddingJobProcessingExecutor + )) + .toList(); + + futures.forEach(CompletableFuture::join); + } + + private void processClaimedJobsInExecutionBatches(List jobs) { LinkedHashMap> jobsByModelKey = new LinkedHashMap<>(); for (EmbeddingJob job : jobs) { jobsByModelKey.computeIfAbsent(job.getModelKey(), ignored -> new ArrayList<>()).add(job); @@ -127,6 +171,14 @@ public class RepresentationEmbeddingOrchestrator { } } + private List> partition(List jobs, int batchSize) { + List> batches = new ArrayList<>(); + for (int start = 0; start < jobs.size(); start += batchSize) { + batches.add(jobs.subList(start, Math.min(start + batchSize, jobs.size()))); + } + return batches; + } + private void processClaimedBatchSafely(List jobs, EmbeddingModelDescriptor model) { try { processClaimedBatch(jobs, model); @@ -174,13 +226,22 @@ public class RepresentationEmbeddingOrchestrator { for (int i = 0; i < preparedItems.size(); i++) { PreparedEmbedding prepared = preparedItems.get(i); - persistenceService.saveCompleted(prepared.embeddingId(), result.vectors().get(i), null); - jobService.markCompleted(prepared.job().getId(), result.providerRequestId()); + executionPersistenceService.completeJob( + prepared.embeddingId(), + result.vectors().get(i), + null, + prepared.job().getId(), + result.providerRequestId() + ); } } catch (RuntimeException ex) { for (PreparedEmbedding prepared : preparedItems) { - persistenceService.markFailed(prepared.embeddingId(), ex.getMessage()); - jobService.markFailed(prepared.job().getId(), ex.getMessage(), true); + executionPersistenceService.failJob( + prepared.embeddingId(), + prepared.job().getId(), + ex.getMessage(), + true + ); } throw ex; } @@ -203,9 +264,8 @@ public class RepresentationEmbeddingOrchestrator { text = text.substring(0, maxChars); } - DocumentEmbedding embedding = persistenceService.ensurePending(representation.getId(), job.getModelKey()); - persistenceService.markProcessing(embedding.getId()); - return new PreparedEmbedding(job, embedding.getId(), text); + UUID embeddingId = executionPersistenceService.startProcessing(representation.getId(), job.getModelKey()); + return new PreparedEmbedding(job, embeddingId, text); } private record PreparedEmbedding(EmbeddingJob job, UUID embeddingId, String text) { diff --git a/src/main/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapter.java b/src/main/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapter.java index 452a527..39c23da 100644 --- a/src/main/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapter.java +++ b/src/main/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapter.java @@ -4,29 +4,37 @@ import at.procon.dip.domain.access.DocumentAccessContext; import at.procon.dip.domain.access.DocumentVisibility; import at.procon.dip.domain.document.RelationType; import at.procon.dip.domain.document.SourceType; +import at.procon.dip.domain.document.entity.DocumentSource; import at.procon.dip.domain.tenant.TenantRef; +import at.procon.dip.domain.document.repository.DocumentSourceRepository; import at.procon.dip.domain.document.service.DocumentRelationService; import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand; +import at.procon.dip.ingestion.config.DipIngestionProperties; import at.procon.dip.ingestion.dto.ImportedDocumentResult; +import at.procon.dip.ingestion.mail.MailImportIdentityResolver; +import at.procon.dip.ingestion.mail.MailProviderEnvelope; +import at.procon.dip.ingestion.mail.MailProviderEnvelopeAttributes; import at.procon.dip.ingestion.service.GenericDocumentImportService; import at.procon.dip.ingestion.service.MailMessageExtractionService; import at.procon.dip.ingestion.service.MailMessageExtractionService.MailAttachment; import at.procon.dip.ingestion.service.MailMessageExtractionService.ParsedMailMessage; +import at.procon.dip.ingestion.service.MailMetadataPersistenceService; import at.procon.dip.ingestion.spi.DocumentIngestionAdapter; import at.procon.dip.ingestion.spi.IngestionResult; import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy; import at.procon.dip.ingestion.spi.SourceDescriptor; import at.procon.dip.ingestion.util.DocumentImportSupport; -import at.procon.dip.ingestion.config.DipIngestionProperties; import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode; import at.procon.dip.runtime.config.RuntimeMode; import at.procon.ted.service.attachment.AttachmentExtractor; import at.procon.ted.service.attachment.ZipExtractionService; +import at.procon.ted.util.HashUtils; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -42,6 +50,9 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter { private final MailMessageExtractionService mailExtractionService; private final DocumentRelationService relationService; private final ZipExtractionService zipExtractionService; + private final MailImportIdentityResolver mailImportIdentityResolver; + private final MailMetadataPersistenceService mailMetadataPersistenceService; + private final DocumentSourceRepository documentSourceRepository; @Override public boolean supports(SourceDescriptor sourceDescriptor) { @@ -57,20 +68,28 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter { throw new IllegalArgumentException("Mail adapter requires raw MIME bytes"); } ParsedMailMessage parsed = mailExtractionService.parse(rawMime); + MailProviderEnvelope providerEnvelope = MailProviderEnvelopeAttributes.fromAttributes(sourceDescriptor.attributes()); DocumentAccessContext accessContext = sourceDescriptor.accessContext() == null ? defaultMailAccessContext() : sourceDescriptor.accessContext(); + String rootSourceIdentifier = mailImportIdentityResolver.resolveRootSourceIdentifier(parsed, providerEnvelope, rawMime); - Map rootAttributes = new LinkedHashMap<>(sourceDescriptor.attributes() == null ? Map.of() : sourceDescriptor.attributes()); + Map rootAttributes = MailProviderEnvelopeAttributes.merge(sourceDescriptor.attributes(), providerEnvelope); if (parsed.subject() != null) rootAttributes.put("subject", parsed.subject()); - if (parsed.from() != null) rootAttributes.put("from", parsed.from()); - if (!parsed.recipients().isEmpty()) rootAttributes.put("to", String.join(", ", parsed.recipients())); + if (parsed.fromRaw() != null) rootAttributes.put("from", parsed.fromRaw()); + List toRecipients = parsed.recipients().stream() + .filter(recipient -> recipient.recipientType() == at.procon.dip.domain.document.entity.MailRecipientType.TO) + .map(MailMessageExtractionService.MailRecipient::rawValue) + .toList(); + if (!toRecipients.isEmpty()) rootAttributes.put("to", String.join(", ", toRecipients)); rootAttributes.putIfAbsent("title", parsed.subject() != null ? parsed.subject() : sourceDescriptor.fileName()); rootAttributes.put("attachmentCount", Integer.toString(parsed.attachments().size())); rootAttributes.put("importBatchId", properties.getMailImportBatchId()); + if (parsed.messageId() != null) rootAttributes.put("messageId", parsed.messageId()); + if (parsed.inReplyTo() != null) rootAttributes.put("inReplyTo", parsed.inReplyTo()); ImportedDocumentResult rootResult = importService.importDocument(new SourceDescriptor( accessContext, SourceType.MAIL, - sourceDescriptor.sourceIdentifier(), + rootSourceIdentifier, sourceDescriptor.sourceUri(), sourceDescriptor.fileName() != null ? sourceDescriptor.fileName() : fallbackMailFileName(parsed), "message/rfc822", @@ -81,36 +100,65 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter { rootAttributes )); + UUIDSource rootSource = resolveSourceId(SourceType.MAIL, rootSourceIdentifier); + mailMetadataPersistenceService.upsertMessageMetadata( + rootResult.document().getId(), + rootSource.sourceId(), + parsed, + providerEnvelope, + HashUtils.computeSha256(rawMime) + ); + List documents = new ArrayList<>(); List warnings = new ArrayList<>(rootResult.warnings()); documents.add(rootResult.document().toCanonicalMetadata()); int sortOrder = 0; for (MailAttachment attachment : parsed.attachments()) { - importAttachment(rootResult.document().getId(), accessContext, sourceDescriptor, attachment, documents, warnings, ++sortOrder, 0); + importAttachment(rootResult.document().getId(), rootResult.document().getId(), rootSourceIdentifier, accessContext, sourceDescriptor, attachment, documents, warnings, ++sortOrder, 0); } return new IngestionResult(documents, warnings); } - private void importAttachment(java.util.UUID parentDocumentId, DocumentAccessContext accessContext, SourceDescriptor parentSource, - MailAttachment attachment, List documents, - List warnings, int sortOrder, int depth) { + private void importAttachment(java.util.UUID mailRootDocumentId, + java.util.UUID parentDocumentId, + String rootSourceIdentifier, + DocumentAccessContext accessContext, + SourceDescriptor parentSource, + MailAttachment attachment, + List documents, + List warnings, + int sortOrder, + int depth) { boolean expandableWrapper = properties.isExpandMailZipAttachments() && zipExtractionService.canHandle(attachment.fileName(), attachment.contentType()); + String attachmentSourceIdentifier = mailImportIdentityResolver.resolveAttachmentSourceIdentifier(rootSourceIdentifier, attachment); Map attachmentAttributes = new LinkedHashMap<>(); attachmentAttributes.put("title", attachment.fileName()); - attachmentAttributes.put("mailSourceIdentifier", parentSource.sourceIdentifier()); + attachmentAttributes.put("mailRootSourceIdentifier", rootSourceIdentifier); attachmentAttributes.put("importBatchId", properties.getMailImportBatchId()); + if (attachment.partPath() != null) { + attachmentAttributes.put("mailPartPath", attachment.partPath()); + } + if (attachment.attachmentIndex() != null) { + attachmentAttributes.put("mailAttachmentIndex", attachment.attachmentIndex().toString()); + } + if (attachment.contentId() != null) { + attachmentAttributes.put("mailContentId", attachment.contentId()); + } + if (attachment.disposition() != null) { + attachmentAttributes.put("mailDisposition", attachment.disposition()); + } if (expandableWrapper) { attachmentAttributes.put("wrapperDocument", Boolean.TRUE.toString()); } ImportedDocumentResult attachmentResult = importService.importDocument(new SourceDescriptor( accessContext, - SourceType.MAIL, - parentSource.sourceIdentifier() + ":attachment:" + depth + ":" + attachment.fileName(), + SourceType.MAIL_ATTACHMENT, + attachmentSourceIdentifier, parentSource.sourceUri(), attachment.fileName(), DocumentImportSupport.normalizeMediaType(attachment.contentType()), @@ -125,6 +173,13 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter { RelationType relationType = depth > 0 || attachment.path() != null ? RelationType.EXTRACTED_FROM : RelationType.ATTACHMENT_OF; relationService.ensureRelation(new CreateDocumentRelationCommand( parentDocumentId, attachmentResult.document().getId(), relationType, sortOrder, attachment.fileName())); + mailMetadataPersistenceService.upsertAttachmentMetadata( + mailRootDocumentId, + attachmentResult.document().getId(), + attachment, + "IMPORTED", + null + ); if (expandableWrapper) { AttachmentExtractor.ExtractionResult zipResult = zipExtractionService.extract(attachment.data(), attachment.fileName(), attachment.contentType()); @@ -134,13 +189,28 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter { } int childSort = 0; for (AttachmentExtractor.ChildAttachment child : zipResult.childAttachments()) { - importAttachment(attachmentResult.document().getId(), accessContext, parentSource, - new MailAttachment(child.filename(), child.contentType(), child.data(), child.data().length, child.pathInArchive()), - documents, warnings, ++childSort, depth + 1); + importAttachment(mailRootDocumentId, attachmentResult.document().getId(), rootSourceIdentifier, accessContext, parentSource, + new MailAttachment( + child.filename(), + child.contentType(), + child.data(), + Long.valueOf(child.data().length), + child.pathInArchive(), + attachment.partPath() == null ? null : attachment.partPath() + "/" + child.pathInArchive(), + attachment.disposition(), + attachment.contentId(), + attachment.attachmentIndex() == null ? null : attachment.attachmentIndex() * 1000 + (++childSort) + ), + documents, warnings, childSort, depth + 1); } } } + private UUIDSource resolveSourceId(SourceType sourceType, String sourceIdentifier) { + Optional source = documentSourceRepository.findBySourceTypeAndExternalSourceId(sourceType, sourceIdentifier); + return new UUIDSource(source.map(DocumentSource::getId).orElse(null)); + } + private String fallbackMailFileName(ParsedMailMessage parsed) { String subject = parsed.subject() == null || parsed.subject().isBlank() ? "mail-message" : parsed.subject().replaceAll("[^A-Za-z0-9._-]", "_"); return subject + ".eml"; @@ -170,4 +240,7 @@ public class MailDocumentIngestionAdapter implements DocumentIngestionAdapter { } return null; } + + private record UUIDSource(java.util.UUID sourceId) { + } } diff --git a/src/main/java/at/procon/dip/ingestion/camel/GenericMailIngestionRoute.java b/src/main/java/at/procon/dip/ingestion/camel/GenericMailIngestionRoute.java new file mode 100644 index 0000000..8ad32a8 --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/camel/GenericMailIngestionRoute.java @@ -0,0 +1,207 @@ +package at.procon.dip.ingestion.camel; + +import at.procon.dip.domain.document.SourceType; +import at.procon.dip.ingestion.config.DipIngestionProperties; +import at.procon.dip.ingestion.mail.MailServerEndpointUriFactory; +import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy; +import at.procon.dip.ingestion.spi.SourceDescriptor; +import at.procon.dip.ingestion.service.DocumentIngestionGateway; +import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode; +import at.procon.dip.runtime.config.RuntimeMode; +import jakarta.mail.Message; +import java.io.ByteArrayOutputStream; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mail.MailMessage; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +@Component +@ConditionalOnRuntimeMode(RuntimeMode.NEW) +@RequiredArgsConstructor +@Slf4j +public class GenericMailIngestionRoute extends RouteBuilder { + + static final String ROUTE_ID_MAIL_CONSUMER = "dip-mail-consumer"; + static final String ROUTE_ID_MAIL_INGEST = "dip-mail-ingest"; + + private final DipIngestionProperties properties; + private final DocumentIngestionGateway ingestionGateway; + private final MailServerEndpointUriFactory endpointUriFactory; + + @Override + public void configure() { + if (!properties.isEnabled() || !properties.isMailAdapterEnabled() || !properties.getMailRoute().isEnabled()) { + log.info("NEW mail ingestion Camel route disabled"); + return; + } + + String consumerUri = endpointUriFactory.buildConsumerUri(properties.getMailRoute()); + log.info("Configuring NEW mail ingestion route (protocol={}, host={}, folder={}, user={})", + properties.getMailRoute().getProtocol(), + properties.getMailRoute().getHost(), + properties.getMailRoute().getFolderName(), + properties.getMailRoute().getUsername()); + + errorHandler(deadLetterChannel("direct:dip-mail-error") + .maximumRedeliveries(3) + .redeliveryDelay(5000) + .retryAttemptedLogLevel(LoggingLevel.WARN) + .logStackTrace(true)); + + from("direct:dip-mail-error") + .routeId("dip-mail-error") + .process(exchange -> { + Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); + String subject = exchange.getIn().getHeader("mailSubject", String.class); + if (exception != null) { + log.error("NEW mail ingestion failed for subject '{}': {}", subject, exception.getMessage(), exception); + } + }); + + from(consumerUri) + .routeId(ROUTE_ID_MAIL_CONSUMER) + .to("direct:dip-mail-ingest"); + + from("direct:dip-mail-ingest") + .routeId(ROUTE_ID_MAIL_INGEST) + .process(this::ingestMailExchange) + .log(LoggingLevel.INFO, "Imported mail message into NEW ingestion pipeline: ${header.mailSubject}"); + } + + private void ingestMailExchange(Exchange exchange) throws Exception { + Message mailMessage = resolveMessage(exchange); + if (mailMessage == null) { + log.warn("Received null mail message, skipping NEW mail ingestion"); + return; + } + + String subject = mailMessage.getSubject(); + String from = mailMessage.getFrom() != null && mailMessage.getFrom().length > 0 + ? mailMessage.getFrom()[0].toString() : null; + String messageId = firstHeader(mailMessage, "Message-ID"); + String providerMessageKey = firstNonBlankHeader(exchange, "CamelMailUid", "uid", "mailUid", "MailUid"); + String providerThreadKey = firstNonBlankHeader(exchange, "CamelMailThreadId", "mailThreadId", "MailThreadId"); + + exchange.getIn().setHeader("mailSubject", subject); + + byte[] rawMime; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + mailMessage.writeTo(baos); + rawMime = baos.toByteArray(); + } + + Map attributes = new LinkedHashMap<>(); + putIfHasText(attributes, "subject", subject); + putIfHasText(attributes, "from", from); + putIfHasText(attributes, "providerType", properties.getMailRoute().getProtocol().name()); + putIfHasText(attributes, "providerProtocol", properties.getMailRoute().getProtocol().camelScheme()); + putIfHasText(attributes, "providerAccountKey", accountKey()); + putIfHasText(attributes, "providerFolderKey", properties.getMailRoute().getFolderName()); + putIfHasText(attributes, "providerMessageKey", providerMessageKey); + putIfHasText(attributes, "providerThreadKey", providerThreadKey); + putIfHasText(attributes, "messageId", messageId); + + SourceDescriptor descriptor = new SourceDescriptor( + null, + SourceType.MAIL, + buildSourceIdentifier(providerMessageKey, messageId), + buildSourceUri(providerMessageKey), + fallbackMailFileName(subject), + "message/rfc822", + rawMime, + null, + mailMessage.getReceivedDate() == null + ? OffsetDateTime.now() + : mailMessage.getReceivedDate().toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime(), + OriginalContentStoragePolicy.DEFAULT, + attributes + ); + ingestionGateway.ingest(descriptor); + } + + private Message resolveMessage(Exchange exchange) { + MailMessage camelMailMessage = exchange.getIn().getBody(MailMessage.class); + if (camelMailMessage != null) { + return camelMailMessage.getMessage(); + } + return exchange.getIn().getBody(Message.class); + } + + private String buildSourceIdentifier(String providerMessageKey, String messageId) { + if (StringUtils.hasText(providerMessageKey)) { + return properties.getMailRoute().getProtocol().name() + ":" + accountKey() + ":" + + defaultIfBlank(properties.getMailRoute().getFolderName(), "INBOX") + ":" + providerMessageKey; + } + if (StringUtils.hasText(messageId)) { + return messageId; + } + return UUID.randomUUID().toString(); + } + + private String buildSourceUri(String providerMessageKey) { + StringBuilder uri = new StringBuilder(); + uri.append(properties.getMailRoute().getProtocol().camelScheme()) + .append("://") + .append(properties.getMailRoute().getHost()) + .append("/") + .append(defaultIfBlank(properties.getMailRoute().getFolderName(), "INBOX")); + if (StringUtils.hasText(providerMessageKey)) { + uri.append("#").append(providerMessageKey); + } + return uri.toString(); + } + + private String fallbackMailFileName(String subject) { + String safeSubject = !StringUtils.hasText(subject) ? "mail-message" : subject.replaceAll("[^A-Za-z0-9._-]", "_"); + return safeSubject + ".eml"; + } + + private String firstHeader(Message message, String name) { + try { + String[] values = message.getHeader(name); + return values != null && values.length > 0 ? values[0] : null; + } catch (Exception e) { + return null; + } + } + + private String firstNonBlankHeader(Exchange exchange, String... names) { + for (String name : names) { + String value = exchange.getIn().getHeader(name, String.class); + if (StringUtils.hasText(value)) { + return value; + } + Object objectValue = exchange.getIn().getHeader(name); + if (objectValue != null) { + value = String.valueOf(objectValue); + if (StringUtils.hasText(value)) { + return value; + } + } + } + return null; + } + + private String accountKey() { + return defaultIfBlank(properties.getMailRoute().getAccountKey(), properties.getMailRoute().getUsername()); + } + + private void putIfHasText(Map attributes, String key, String value) { + if (StringUtils.hasText(value)) { + attributes.put(key, value); + } + } + + private String defaultIfBlank(String value, String defaultValue) { + return StringUtils.hasText(value) ? value : defaultValue; + } +} diff --git a/src/main/java/at/procon/dip/ingestion/config/DipIngestionProperties.java b/src/main/java/at/procon/dip/ingestion/config/DipIngestionProperties.java index 40d29ec..1086d38 100644 --- a/src/main/java/at/procon/dip/ingestion/config/DipIngestionProperties.java +++ b/src/main/java/at/procon/dip/ingestion/config/DipIngestionProperties.java @@ -1,8 +1,7 @@ package at.procon.dip.ingestion.config; import at.procon.dip.domain.access.DocumentVisibility; -import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode; -import at.procon.dip.runtime.config.RuntimeMode; +import at.procon.dip.ingestion.mail.MailServerProtocol; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.Positive; import lombok.Data; @@ -46,6 +45,7 @@ public class DipIngestionProperties { private boolean tedPackageAdapterEnabled = true; private boolean mailAdapterEnabled = false; + private MailRouteProperties mailRoute = new MailRouteProperties(); private String mailDefaultOwnerTenantKey; private DocumentVisibility mailDefaultVisibility = DocumentVisibility.TENANT; @@ -58,4 +58,35 @@ public class DipIngestionProperties { @NotBlank private String mailImportBatchId = "phase41-mail"; -} \ No newline at end of file + + @Data + public static class MailRouteProperties { + private boolean enabled = false; + private MailServerProtocol protocol = MailServerProtocol.IMAPS; + private String host; + private Integer port; + private String username; + private String password; + private String folderName = "INBOX"; + private String accountKey; + private boolean delete = false; + private boolean ssl = true; + /** + * true = fetch only unseen messages + */ + private boolean unseen = true; + /** + * true = do not mark messages as seen while consuming + */ + private boolean peek = true; + @Positive + private long delay = 15000; + @Positive + private int maxMessagesPerPoll = 20; + private int fetchSize = -1; + private boolean closeFolder = false; + private boolean debugMode = false; + private int connectionTimeout = 30000; + private int consumerConnectionTimeout = 30000; + } +} diff --git a/src/main/java/at/procon/dip/ingestion/mail/CamelMailServerEndpointUriFactory.java b/src/main/java/at/procon/dip/ingestion/mail/CamelMailServerEndpointUriFactory.java new file mode 100644 index 0000000..3234114 --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/mail/CamelMailServerEndpointUriFactory.java @@ -0,0 +1,58 @@ +package at.procon.dip.ingestion.mail; + +import at.procon.dip.ingestion.config.DipIngestionProperties; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +@Component +public class CamelMailServerEndpointUriFactory implements MailServerEndpointUriFactory { + + @Override + public String buildConsumerUri(DipIngestionProperties.MailRouteProperties properties) { + if (properties == null) { + throw new IllegalArgumentException("Mail route properties must not be null"); + } + if (!StringUtils.hasText(properties.getHost())) { + throw new IllegalArgumentException("dip.ingestion.mail-route.host must not be blank when the mail route is enabled"); + } + if (!StringUtils.hasText(properties.getUsername())) { + throw new IllegalArgumentException("dip.ingestion.mail-route.username must not be blank when the mail route is enabled"); + } + if (properties.getProtocol() == null) { + throw new IllegalArgumentException("dip.ingestion.mail-route.protocol must not be null"); + } + + StringBuilder uri = new StringBuilder(); + uri.append(properties.getProtocol().camelScheme()).append("://"); + uri.append(properties.getHost()); + if (properties.getPort() != null) { + uri.append(":").append(properties.getPort()); + } + uri.append("?username=").append(encode(properties.getUsername())); + uri.append("&password=").append(encode(properties.getPassword())); + uri.append("&folderName=").append(encode(defaultIfBlank(properties.getFolderName(), "INBOX"))); + uri.append("&delete=").append(properties.isDelete()); + uri.append("&peek=").append(properties.isPeek()); + uri.append("&unseen=").append(properties.isUnseen()); + uri.append("&delay=").append(properties.getDelay()); + uri.append("&maxMessagesPerPoll=").append(properties.getMaxMessagesPerPoll()); + uri.append("&fetchSize=").append(properties.getFetchSize()); + uri.append("&closeFolder=").append(properties.isCloseFolder()); + uri.append("&debugMode=").append(properties.isDebugMode()); + uri.append("&connectionTimeout=").append(properties.getConnectionTimeout()); + return uri.toString(); + } + + private String encode(String value) { + if (value == null) { + return ""; + } + return URLEncoder.encode(value, StandardCharsets.UTF_8); + } + + private String defaultIfBlank(String value, String defaultValue) { + return StringUtils.hasText(value) ? value : defaultValue; + } +} diff --git a/src/main/java/at/procon/dip/ingestion/mail/GenericMailProviderEnvelope.java b/src/main/java/at/procon/dip/ingestion/mail/GenericMailProviderEnvelope.java new file mode 100644 index 0000000..d6cb9a5 --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/mail/GenericMailProviderEnvelope.java @@ -0,0 +1,22 @@ +package at.procon.dip.ingestion.mail; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Default implementation of the generic provider metadata contract. + */ +public record GenericMailProviderEnvelope( + MailProviderType providerType, + String accountKey, + String folderKey, + String providerMessageKey, + String providerThreadKey, + Map providerAttributes +) implements MailProviderEnvelope { + + public GenericMailProviderEnvelope { + providerType = providerType == null ? MailProviderType.GENERIC : providerType; + providerAttributes = providerAttributes == null ? Map.of() : Map.copyOf(new LinkedHashMap<>(providerAttributes)); + } +} diff --git a/src/main/java/at/procon/dip/ingestion/mail/MailImportIdentityResolver.java b/src/main/java/at/procon/dip/ingestion/mail/MailImportIdentityResolver.java new file mode 100644 index 0000000..cbb53cd --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/mail/MailImportIdentityResolver.java @@ -0,0 +1,52 @@ +package at.procon.dip.ingestion.mail; + +import at.procon.dip.ingestion.service.MailMessageExtractionService; +import at.procon.ted.util.HashUtils; +import java.util.Locale; +import org.springframework.stereotype.Service; + +@Service +public class MailImportIdentityResolver { + + public String resolveRootSourceIdentifier(MailMessageExtractionService.ParsedMailMessage parsed, + MailProviderEnvelope envelope, + byte[] rawMime) { + if (envelope != null && hasText(envelope.providerMessageKey())) { + return "mail:" + envelope.providerType().name().toLowerCase(Locale.ROOT) + + ":" + normalizeIdentityFragment(envelope.accountKey()) + + ":" + normalizeIdentityFragment(envelope.folderKey()) + + ":" + normalizeIdentityFragment(envelope.providerMessageKey()); + } + if (hasText(parsed.messageId())) { + return "mail:message-id:" + normalizeIdentityFragment(parsed.messageId()); + } + return "mail:raw:" + HashUtils.computeSha256(rawMime); + } + + public String resolveAttachmentSourceIdentifier(String rootSourceIdentifier, + MailMessageExtractionService.MailAttachment attachment) { + if (hasText(attachment.partPath())) { + return rootSourceIdentifier + ":part:" + normalizeIdentityFragment(attachment.partPath()); + } + if (attachment.attachmentIndex() != null) { + return rootSourceIdentifier + ":attachment:" + attachment.attachmentIndex(); + } + return rootSourceIdentifier + ":attachment:" + + normalizeIdentityFragment(attachment.fileName()) + + ":" + HashUtils.computeSha256(attachment.data()); + } + + private String normalizeIdentityFragment(String raw) { + if (raw == null || raw.isBlank()) { + return "_"; + } + return raw.trim() + .replaceAll("^<|>$", "") + .replaceAll("\\s+", "_") + .replaceAll("[^A-Za-z0-9._:@/+\\-]", "_"); + } + + private boolean hasText(String value) { + return value != null && !value.isBlank(); + } +} diff --git a/src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelope.java b/src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelope.java new file mode 100644 index 0000000..e565445 --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelope.java @@ -0,0 +1,22 @@ +package at.procon.dip.ingestion.mail; + +import java.util.Map; + +/** + * Generic provider metadata for a mail message fetched from an upstream mail system. + * Implementations may be backed by IMAP, EWS, Graph, Gmail API, filesystem replay, etc. + */ +public interface MailProviderEnvelope { + + MailProviderType providerType(); + + String accountKey(); + + String folderKey(); + + String providerMessageKey(); + + String providerThreadKey(); + + Map providerAttributes(); +} diff --git a/src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelopeAttributes.java b/src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelopeAttributes.java new file mode 100644 index 0000000..6c71969 --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/mail/MailProviderEnvelopeAttributes.java @@ -0,0 +1,81 @@ +package at.procon.dip.ingestion.mail; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Encodes/decodes generic mail provider metadata into SourceDescriptor attributes. + */ +public final class MailProviderEnvelopeAttributes { + + public static final String KEY_PROVIDER_TYPE = "mail.providerType"; + public static final String KEY_ACCOUNT_KEY = "mail.accountKey"; + public static final String KEY_FOLDER_KEY = "mail.folderKey"; + public static final String KEY_PROVIDER_MESSAGE_KEY = "mail.providerMessageKey"; + public static final String KEY_PROVIDER_THREAD_KEY = "mail.providerThreadKey"; + public static final String KEY_PREFIX_PROVIDER_ATTRIBUTE = "mail.providerAttribute."; + + private MailProviderEnvelopeAttributes() { + } + + public static GenericMailProviderEnvelope fromAttributes(Map attributes) { + if (attributes == null || attributes.isEmpty()) { + return new GenericMailProviderEnvelope(MailProviderType.GENERIC, null, null, null, null, Map.of()); + } + MailProviderType providerType = parseProviderType(attributes.get(KEY_PROVIDER_TYPE)); + Map providerAttributes = new LinkedHashMap<>(); + for (Map.Entry entry : attributes.entrySet()) { + if (entry.getKey() != null && entry.getKey().startsWith(KEY_PREFIX_PROVIDER_ATTRIBUTE)) { + providerAttributes.put(entry.getKey().substring(KEY_PREFIX_PROVIDER_ATTRIBUTE.length()), entry.getValue()); + } + } + return new GenericMailProviderEnvelope( + providerType, + emptyToNull(attributes.get(KEY_ACCOUNT_KEY)), + emptyToNull(attributes.get(KEY_FOLDER_KEY)), + emptyToNull(attributes.get(KEY_PROVIDER_MESSAGE_KEY)), + emptyToNull(attributes.get(KEY_PROVIDER_THREAD_KEY)), + providerAttributes + ); + } + + public static Map merge(Map baseAttributes, MailProviderEnvelope envelope) { + Map merged = new LinkedHashMap<>(); + if (baseAttributes != null && !baseAttributes.isEmpty()) { + merged.putAll(baseAttributes); + } + if (envelope == null) { + return merged; + } + merged.put(KEY_PROVIDER_TYPE, envelope.providerType().name()); + putIfHasText(merged, KEY_ACCOUNT_KEY, envelope.accountKey()); + putIfHasText(merged, KEY_FOLDER_KEY, envelope.folderKey()); + putIfHasText(merged, KEY_PROVIDER_MESSAGE_KEY, envelope.providerMessageKey()); + putIfHasText(merged, KEY_PROVIDER_THREAD_KEY, envelope.providerThreadKey()); + if (envelope.providerAttributes() != null) { + envelope.providerAttributes().forEach((key, value) -> putIfHasText(merged, KEY_PREFIX_PROVIDER_ATTRIBUTE + key, value)); + } + return merged; + } + + private static MailProviderType parseProviderType(String raw) { + if (raw == null || raw.isBlank()) { + return MailProviderType.GENERIC; + } + try { + return MailProviderType.valueOf(raw.trim().toUpperCase()); + } catch (IllegalArgumentException ex) { + return MailProviderType.GENERIC; + } + } + + private static void putIfHasText(Map target, String key, String value) { + if (value != null && !value.isBlank()) { + target.put(key, value); + } + } + + private static String emptyToNull(String value) { + return value == null || value.isBlank() ? null : value; + } +} diff --git a/src/main/java/at/procon/dip/ingestion/mail/MailProviderType.java b/src/main/java/at/procon/dip/ingestion/mail/MailProviderType.java new file mode 100644 index 0000000..a58b4f4 --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/mail/MailProviderType.java @@ -0,0 +1,15 @@ +package at.procon.dip.ingestion.mail; + +/** + * Logical provider/source type for imported mail messages. + * The import pipeline should depend on this generic designation instead of + * binding itself to IMAP-specific semantics. + */ +public enum MailProviderType { + IMAP, + POP3, + EWS, + MICROSOFT_GRAPH, + GMAIL_API, + GENERIC +} diff --git a/src/main/java/at/procon/dip/ingestion/mail/MailServerEndpointUriFactory.java b/src/main/java/at/procon/dip/ingestion/mail/MailServerEndpointUriFactory.java new file mode 100644 index 0000000..a29b8d5 --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/mail/MailServerEndpointUriFactory.java @@ -0,0 +1,7 @@ +package at.procon.dip.ingestion.mail; + +import at.procon.dip.ingestion.config.DipIngestionProperties; + +public interface MailServerEndpointUriFactory { + String buildConsumerUri(DipIngestionProperties.MailRouteProperties properties); +} diff --git a/src/main/java/at/procon/dip/ingestion/mail/MailServerProtocol.java b/src/main/java/at/procon/dip/ingestion/mail/MailServerProtocol.java new file mode 100644 index 0000000..a4e5a8c --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/mail/MailServerProtocol.java @@ -0,0 +1,18 @@ +package at.procon.dip.ingestion.mail; + +public enum MailServerProtocol { + IMAP("imap"), + IMAPS("imaps"), + POP3("pop3"), + POP3S("pop3s"); + + private final String camelScheme; + + MailServerProtocol(String camelScheme) { + this.camelScheme = camelScheme; + } + + public String camelScheme() { + return camelScheme; + } +} diff --git a/src/main/java/at/procon/dip/ingestion/service/GenericDocumentImportService.java b/src/main/java/at/procon/dip/ingestion/service/GenericDocumentImportService.java index fbf6359..bbcd3ce 100644 --- a/src/main/java/at/procon/dip/ingestion/service/GenericDocumentImportService.java +++ b/src/main/java/at/procon/dip/ingestion/service/GenericDocumentImportService.java @@ -9,6 +9,7 @@ import at.procon.dip.domain.document.ContentRole; import at.procon.dip.domain.document.DocumentStatus; import at.procon.dip.domain.document.StorageType; import at.procon.dip.domain.document.entity.Document; +import at.procon.dip.domain.document.entity.DocumentSource; import at.procon.dip.domain.document.entity.DocumentContent; import at.procon.dip.domain.document.repository.DocumentRepository; import at.procon.dip.domain.document.repository.DocumentSourceRepository; @@ -92,6 +93,13 @@ public class GenericDocumentImportService { ? defaultAccessContext() : sourceDescriptor.accessContext(); + Optional existingBySource = resolveExistingBySourceIdentifier(sourceDescriptor); + if (existingBySource.isPresent()) { + Document document = existingBySource.get(); + List warnings = List.of("Source identifier already imported; returning existing document"); + return new ImportedDocumentResult(document, detection, warnings, true); + } + if (properties.isDeduplicateByContentHash()) { Optional existing = resolveDeduplicatedDocument(dedupHash, accessContext); if (existing.isPresent()) { @@ -506,6 +514,16 @@ public class GenericDocumentImportService { .findFirst(); } + private Optional resolveExistingBySourceIdentifier(SourceDescriptor sourceDescriptor) { + if (!StringUtils.hasText(sourceDescriptor.sourceIdentifier())) { + return Optional.empty(); + } + return documentSourceRepository.findBySourceTypeAndExternalSourceId( + sourceDescriptor.sourceType(), + sourceDescriptor.sourceIdentifier()) + .map(DocumentSource::getDocument); + } + private boolean matchesAccessContext(Document document, DocumentAccessContext accessContext) { String expectedTenantKey = accessContext.ownerTenant() == null ? null : accessContext.ownerTenant().tenantKey(); if (!equalsNullable(document.getOwnerTenant() != null ? document.getOwnerTenant().getTenantKey() : null, expectedTenantKey)) { diff --git a/src/main/java/at/procon/dip/ingestion/service/MailMessageExtractionService.java b/src/main/java/at/procon/dip/ingestion/service/MailMessageExtractionService.java index 84fcaf5..dadf421 100644 --- a/src/main/java/at/procon/dip/ingestion/service/MailMessageExtractionService.java +++ b/src/main/java/at/procon/dip/ingestion/service/MailMessageExtractionService.java @@ -1,10 +1,14 @@ package at.procon.dip.ingestion.service; +import at.procon.dip.domain.document.entity.MailRecipientType; import at.procon.dip.ingestion.util.DocumentImportSupport; +import jakarta.mail.Address; import jakarta.mail.BodyPart; +import jakarta.mail.Message; import jakarta.mail.Multipart; import jakarta.mail.Part; import jakarta.mail.Session; +import jakarta.mail.internet.InternetAddress; import jakarta.mail.internet.MimeMessage; import jakarta.mail.internet.MimeUtility; import java.io.ByteArrayInputStream; @@ -16,6 +20,7 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.jsoup.Jsoup; import org.springframework.stereotype.Service; @@ -28,51 +33,90 @@ public class MailMessageExtractionService { try { Session session = Session.getDefaultInstance(new Properties()); MimeMessage message = new MimeMessage(session, new ByteArrayInputStream(rawMime)); - String subject = message.getSubject(); - String from = message.getFrom() != null && message.getFrom().length > 0 ? message.getFrom()[0].toString() : null; - List recipients = new ArrayList<>(); - if (message.getAllRecipients() != null) { - for (var recipient : message.getAllRecipients()) { - recipients.add(recipient.toString()); - } - } + ParsedAddress fromAddress = firstAddress(message.getFrom()); + ParsedAddress replyToAddress = firstAddress(message.getReplyTo()); + List recipients = new ArrayList<>(); + collectRecipients(message, Message.RecipientType.TO, MailRecipientType.TO, recipients); + collectRecipients(message, Message.RecipientType.CC, MailRecipientType.CC, recipients); + collectRecipients(message, Message.RecipientType.BCC, MailRecipientType.BCC, recipients); + StringBuilder text = new StringBuilder(); StringBuilder html = new StringBuilder(); List attachments = new ArrayList<>(); - processPart(message, text, html, attachments); + AtomicInteger attachmentCounter = new AtomicInteger(); + processPart(message, text, html, attachments, "0", attachmentCounter); String normalizedText = text.length() > 0 ? text.toString().trim() : htmlToText(html.toString()); OffsetDateTime receivedAt = message.getReceivedDate() == null ? OffsetDateTime.now() : message.getReceivedDate().toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime(); - return new ParsedMailMessage(subject, from, recipients, receivedAt, normalizedText, html.toString(), attachments); + OffsetDateTime sentAt = message.getSentDate() == null ? null + : message.getSentDate().toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime(); + String subject = message.getSubject(); + return new ParsedMailMessage( + subject, + normalizeSubject(subject), + fromAddress.displayName(), + fromAddress.emailAddress(), + fromAddress.rawValue(), + replyToAddress.rawValue(), + readHeader(message, "Message-ID"), + readHeader(message, "In-Reply-To"), + readJoinedHeader(message, "References"), + sentAt, + receivedAt, + normalizedText, + html.toString(), + recipients, + attachments + ); } catch (Exception e) { throw new IllegalArgumentException("Failed to parse MIME message", e); } } - private void processPart(Part part, StringBuilder text, StringBuilder html, List attachments) throws Exception { + private void collectRecipients(MimeMessage message, + Message.RecipientType recipientType, + MailRecipientType mappedType, + List target) throws Exception { + Address[] addresses = message.getRecipients(recipientType); + if (addresses == null) { + return; + } + int sortOrder = target.size(); + for (Address address : addresses) { + ParsedAddress parsed = parseAddress(address); + target.add(new MailRecipient(mappedType, parsed.displayName(), parsed.emailAddress(), parsed.rawValue(), sortOrder++)); + } + } + + private void processPart(Part part, + StringBuilder text, + StringBuilder html, + List attachments, + String partPath, + AtomicInteger attachmentCounter) throws Exception { String disposition = part.getDisposition(); String contentType = part.getContentType() == null ? "application/octet-stream" : part.getContentType(); if (disposition != null && (Part.ATTACHMENT.equalsIgnoreCase(disposition) || Part.INLINE.equalsIgnoreCase(disposition)) && part.getFileName() != null) { - attachments.add(extractAttachment(part)); + attachments.add(extractAttachment(part, partPath, attachmentCounter.incrementAndGet())); return; } Object content = part.getContent(); if (content instanceof Multipart multipart) { for (int i = 0; i < multipart.getCount(); i++) { BodyPart bodyPart = multipart.getBodyPart(i); - processPart(bodyPart, text, html, attachments); + processPart(bodyPart, text, html, attachments, partPath + "." + i, attachmentCounter); } } else if (contentType.toLowerCase().contains("text/plain")) { - text.append(content.toString()).append(""); + text.append(content.toString()).append("\n"); } else if (contentType.toLowerCase().contains("text/html")) { - html.append(content.toString()).append(""); + html.append(content.toString()).append("\n"); } else if (part.getFileName() != null) { - attachments.add(extractAttachment(part)); + attachments.add(extractAttachment(part, partPath, attachmentCounter.incrementAndGet())); } } - private MailAttachment extractAttachment(Part part) throws Exception { + private MailAttachment extractAttachment(Part part, String partPath, int attachmentIndex) throws Exception { String fileName = part.getFileName(); if (fileName == null) { fileName = "attachment"; @@ -87,7 +131,84 @@ public class MailMessageExtractionService { in.transferTo(out); data = out.toByteArray(); } - return new MailAttachment(fileName, contentType, data, data.length, null); + return new MailAttachment( + fileName, + contentType, + data, + Long.valueOf(data.length), + null, + partPath, + normalizeDisposition(part.getDisposition()), + readHeader(part, "Content-ID"), + attachmentIndex + ); + } + + private ParsedAddress firstAddress(Address[] addresses) { + if (addresses == null || addresses.length == 0) { + return new ParsedAddress(null, null, null); + } + return parseAddress(addresses[0]); + } + + private ParsedAddress parseAddress(Address address) { + if (address == null) { + return new ParsedAddress(null, null, null); + } + if (address instanceof InternetAddress internetAddress) { + String displayName = internetAddress.getPersonal(); + String emailAddress = internetAddress.getAddress(); + return new ParsedAddress(displayName, emailAddress, internetAddress.toUnicodeString()); + } + return new ParsedAddress(null, null, address.toString()); + } + + private String normalizeSubject(String subject) { + if (subject == null || subject.isBlank()) { + return null; + } + String normalized = subject.trim(); + boolean changed; + do { + changed = false; + String next = normalized.replaceFirst("(?i)^(re|fw|fwd)\\s*:\\s*", "").trim(); + if (!next.equals(normalized)) { + normalized = next; + changed = true; + } + } while (changed && !normalized.isBlank()); + return normalized.replaceAll("\\s+", " "); + } + + private String normalizeDisposition(String disposition) { + if (disposition == null || disposition.isBlank()) { + return null; + } + return disposition.trim().toUpperCase(); + } + + private String readHeader(Part part, String name) { + try { + String[] headerValues = part.getHeader(name); + if (headerValues == null || headerValues.length == 0) { + return null; + } + return headerValues[0]; + } catch (Exception e) { + return null; + } + } + + private String readJoinedHeader(MimeMessage message, String name) { + try { + String[] headerValues = message.getHeader(name); + if (headerValues == null || headerValues.length == 0) { + return null; + } + return String.join(" ", headerValues); + } catch (Exception e) { + return null; + } } private String htmlToText(String html) { @@ -95,27 +216,70 @@ public class MailMessageExtractionService { return ""; } try { - return Jsoup.parse(html).text().replaceAll("\s+", " ").trim(); + return Jsoup.parse(html).text().replaceAll("\\s+", " ").trim(); } catch (Exception e) { log.debug("Falling back to naive HTML cleanup: {}", e.getMessage()); - return html.replaceAll("<[^>]+>", " ").replaceAll("\s+", " ").trim(); + return html.replaceAll("<[^>]+>", " ").replaceAll("\\s+", " ").trim(); } } public String serializeMessage(ParsedMailMessage parsed) { StringBuilder sb = new StringBuilder(); - if (parsed.subject() != null) sb.append("Subject: ").append(parsed.subject()).append(""); - if (parsed.from() != null) sb.append("From: ").append(parsed.from()).append(""); - if (!parsed.recipients().isEmpty()) sb.append("To: ").append(String.join(", ", parsed.recipients())).append(""); - sb.append(""); + if (parsed.subject() != null) sb.append("Subject: ").append(parsed.subject()).append("\n"); + if (parsed.fromRaw() != null) sb.append("From: ").append(parsed.fromRaw()).append("\n"); + List toRecipients = parsed.recipients().stream() + .filter(recipient -> recipient.recipientType() == MailRecipientType.TO) + .map(MailRecipient::rawValue) + .toList(); + if (!toRecipients.isEmpty()) sb.append("To: ").append(String.join(", ", toRecipients)).append("\n"); + sb.append("\n"); if (parsed.textBody() != null) sb.append(parsed.textBody()); return sb.toString().trim(); } - public record ParsedMailMessage(String subject, String from, List recipients, OffsetDateTime receivedAt, - String textBody, String htmlBody, List attachments) {} + public record ParsedMailMessage( + String subject, + String normalizedSubject, + String fromDisplayName, + String fromEmail, + String fromRaw, + String replyToRaw, + String messageId, + String inReplyTo, + String referencesHeader, + OffsetDateTime sentAt, + OffsetDateTime receivedAt, + String textBody, + String htmlBody, + List recipients, + List attachments + ) { + } + + public record MailRecipient( + MailRecipientType recipientType, + String displayName, + String emailAddress, + String rawValue, + int sortOrder + ) { + } + + public record MailAttachment( + String fileName, + String contentType, + byte[] data, + Long sizeBytes, + String path, + String partPath, + String disposition, + String contentId, + Integer attachmentIndex + ) { + public MailAttachment(String fileName, String contentType, byte[] data, long sizeBytes, String path) { + this(fileName, contentType, data, sizeBytes, path, null, null, null, null); + } - public record MailAttachment(String fileName, String contentType, byte[] data, long sizeBytes, String path) { public String safeTextPreview() { String extension = DocumentImportSupport.extensionOf(fileName); String mime = DocumentImportSupport.normalizeMediaType(contentType); @@ -128,4 +292,7 @@ public class MailMessageExtractionService { return DocumentImportSupport.normalizeText(new String(data, StandardCharsets.UTF_8)); } } + + private record ParsedAddress(String displayName, String emailAddress, String rawValue) { + } } diff --git a/src/main/java/at/procon/dip/ingestion/service/MailMetadataPersistenceService.java b/src/main/java/at/procon/dip/ingestion/service/MailMetadataPersistenceService.java new file mode 100644 index 0000000..6819fc8 --- /dev/null +++ b/src/main/java/at/procon/dip/ingestion/service/MailMetadataPersistenceService.java @@ -0,0 +1,137 @@ +package at.procon.dip.ingestion.service; + +import at.procon.dip.domain.document.entity.DocumentMailAttachment; +import at.procon.dip.domain.document.entity.DocumentMailMessage; +import at.procon.dip.domain.document.entity.DocumentMailRecipient; +import at.procon.dip.domain.document.repository.DocumentMailAttachmentRepository; +import at.procon.dip.domain.document.repository.DocumentMailMessageRepository; +import at.procon.dip.domain.document.repository.DocumentMailRecipientRepository; +import at.procon.dip.ingestion.mail.MailProviderEnvelope; +import at.procon.ted.util.HashUtils; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Service +@RequiredArgsConstructor +@Transactional +public class MailMetadataPersistenceService { + + private final DocumentMailMessageRepository mailMessageRepository; + private final DocumentMailRecipientRepository recipientRepository; + private final DocumentMailAttachmentRepository attachmentRepository; + + public void upsertMessageMetadata(UUID documentId, + UUID sourceId, + MailMessageExtractionService.ParsedMailMessage parsed, + MailProviderEnvelope envelope, + String rawMessageHash) { + DocumentMailMessage entity = mailMessageRepository.findById(documentId) + .orElse(DocumentMailMessage.builder().documentId(documentId).build()); + entity.setSourceId(sourceId); + entity.setProviderType(envelope == null ? at.procon.dip.ingestion.mail.MailProviderType.GENERIC : envelope.providerType()); + entity.setAccountKey(envelope == null ? null : envelope.accountKey()); + entity.setFolderKey(envelope == null ? null : envelope.folderKey()); + entity.setProviderMessageKey(envelope == null ? null : envelope.providerMessageKey()); + entity.setProviderThreadKey(envelope == null ? null : envelope.providerThreadKey()); + entity.setMessageId(parsed.messageId()); + entity.setInReplyTo(parsed.inReplyTo()); + entity.setReferencesHeader(parsed.referencesHeader()); + entity.setThreadKey(resolveThreadKey(parsed, envelope)); + entity.setSubject(parsed.subject()); + entity.setNormalizedSubject(parsed.normalizedSubject()); + entity.setFromDisplayName(parsed.fromDisplayName()); + entity.setFromEmail(parsed.fromEmail()); + entity.setFromRaw(parsed.fromRaw()); + entity.setReplyToRaw(parsed.replyToRaw()); + entity.setSentAt(parsed.sentAt()); + entity.setReceivedAt(parsed.receivedAt()); + entity.setRawMessageHash(rawMessageHash); + entity.setRawHeaderHash(HashUtils.computeSha256(buildHeaderFingerprint(parsed))); + mailMessageRepository.save(entity); + + recipientRepository.deleteByMailDocumentId(documentId); + List recipients = new ArrayList<>(); + for (MailMessageExtractionService.MailRecipient recipient : parsed.recipients()) { + recipients.add(DocumentMailRecipient.builder() + .mailDocumentId(documentId) + .recipientType(recipient.recipientType()) + .displayName(recipient.displayName()) + .emailAddress(recipient.emailAddress()) + .rawValue(recipient.rawValue()) + .sortOrder(recipient.sortOrder()) + .build()); + } + recipientRepository.saveAll(recipients); + } + + public void upsertAttachmentMetadata(UUID mailDocumentId, + UUID attachmentDocumentId, + MailMessageExtractionService.MailAttachment attachment, + String extractionStatus, + String errorMessage) { + Optional existing = Optional.empty(); + if (attachment.attachmentIndex() != null) { + existing = attachmentRepository.findByMailDocumentIdAndAttachmentIndex(mailDocumentId, attachment.attachmentIndex()); + } + if (existing.isEmpty() && attachment.partPath() != null && !attachment.partPath().isBlank()) { + existing = attachmentRepository.findByMailDocumentIdAndPartPath(mailDocumentId, attachment.partPath()); + } + + DocumentMailAttachment entity = existing.orElse(DocumentMailAttachment.builder() + .mailDocumentId(mailDocumentId) + .attachmentDocumentId(attachmentDocumentId) + .build()); + entity.setAttachmentDocumentId(attachmentDocumentId); + entity.setDisposition(attachment.disposition()); + entity.setContentId(attachment.contentId()); + entity.setFilename(attachment.fileName()); + entity.setMimeType(attachment.contentType()); + entity.setSizeBytes(attachment.sizeBytes()); + entity.setAttachmentIndex(attachment.attachmentIndex()); + entity.setPartPath(attachment.partPath()); + entity.setPathInArchive(attachment.path()); + entity.setExtractionStatus(extractionStatus == null || extractionStatus.isBlank() ? "IMPORTED" : extractionStatus); + entity.setErrorMessage(errorMessage); + attachmentRepository.save(entity); + } + + private String resolveThreadKey(MailMessageExtractionService.ParsedMailMessage parsed, MailProviderEnvelope envelope) { + if (envelope != null && envelope.providerThreadKey() != null && !envelope.providerThreadKey().isBlank()) { + return envelope.providerThreadKey(); + } + if (parsed.referencesHeader() != null && !parsed.referencesHeader().isBlank()) { + String[] refs = parsed.referencesHeader().trim().split("\\s+"); + return refs.length == 0 ? parsed.referencesHeader() : refs[0]; + } + if (parsed.inReplyTo() != null && !parsed.inReplyTo().isBlank()) { + return parsed.inReplyTo(); + } + if (parsed.messageId() != null && !parsed.messageId().isBlank()) { + return parsed.messageId(); + } + return parsed.normalizedSubject(); + } + + private String buildHeaderFingerprint(MailMessageExtractionService.ParsedMailMessage parsed) { + return String.join("|", + safe(parsed.subject()), + safe(parsed.fromRaw()), + safe(parsed.replyToRaw()), + safe(parsed.messageId()), + safe(parsed.inReplyTo()), + safe(parsed.referencesHeader()), + safe(parsed.sentAt() == null ? null : parsed.sentAt().toString()), + safe(parsed.receivedAt() == null ? null : parsed.receivedAt().toString()) + ); + } + + private String safe(String value) { + return value == null ? "" : value; + } +} diff --git a/src/main/java/at/procon/ted/camel/MailRoute.java b/src/main/java/at/procon/ted/camel/MailRoute.java index aa0d0e2..bfae4d5 100644 --- a/src/main/java/at/procon/ted/camel/MailRoute.java +++ b/src/main/java/at/procon/ted/camel/MailRoute.java @@ -3,6 +3,9 @@ package at.procon.ted.camel; import at.procon.dip.domain.document.SourceType; import at.procon.ted.config.TedProcessorProperties; import at.procon.dip.ingestion.service.DocumentIngestionGateway; +import at.procon.dip.ingestion.mail.GenericMailProviderEnvelope; +import at.procon.dip.ingestion.mail.MailProviderEnvelopeAttributes; +import at.procon.dip.ingestion.mail.MailProviderType; import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy; import at.procon.dip.ingestion.spi.SourceDescriptor; import at.procon.ted.service.attachment.AttachmentExtractor; @@ -187,9 +190,19 @@ public class MailRoute extends RouteBuilder { : exchange.getIn().getHeader("mailReceivedDate", Date.class).toInstant() .atZone(ZoneId.systemDefault()).toOffsetDateTime(), OriginalContentStoragePolicy.DEFAULT, - Map.of( - "subject", subject != null ? subject : "", - "from", from != null ? from : "" + MailProviderEnvelopeAttributes.merge( + Map.of( + "subject", subject != null ? subject : "", + "from", from != null ? from : "" + ), + new GenericMailProviderEnvelope( + MailProviderType.IMAP, + mail.getUsername(), + mail.getFolderName(), + resolveProviderMessageKey(exchange, mailMessage, sourceIdentifier), + resolveProviderThreadKey(mailMessage), + Map.of() + ) ) )); if (!result.warnings().isEmpty()) { @@ -534,4 +547,42 @@ public class MailRoute extends RouteBuilder { private byte[] data; private int size; } + + private String resolveProviderMessageKey(Exchange exchange, Message mailMessage, String fallback) throws Exception { + String camelUid = firstNonBlank( + exchange.getIn().getHeader("CamelMailUid", String.class), + exchange.getIn().getHeader("uid", String.class), + exchange.getIn().getHeader("mailUid", String.class) + ); + if (camelUid != null) { + return camelUid; + } + String[] messageIdHeader = mailMessage.getHeader("Message-ID"); + if (messageIdHeader != null && messageIdHeader.length > 0 && messageIdHeader[0] != null && !messageIdHeader[0].isBlank()) { + return messageIdHeader[0]; + } + return fallback; + } + + private String resolveProviderThreadKey(Message mailMessage) throws Exception { + String[] references = mailMessage.getHeader("References"); + if (references != null && references.length > 0 && references[0] != null && !references[0].isBlank()) { + return references[0]; + } + String[] inReplyTo = mailMessage.getHeader("In-Reply-To"); + if (inReplyTo != null && inReplyTo.length > 0 && inReplyTo[0] != null && !inReplyTo[0].isBlank()) { + return inReplyTo[0]; + } + return null; + } + + private String firstNonBlank(String... values) { + for (String value : values) { + if (value != null && !value.isBlank()) { + return value; + } + } + return null; + } + } diff --git a/src/main/resources/application-new.yml b/src/main/resources/application-new.yml index bb1ea12..78f68ac 100644 --- a/src/main/resources/application-new.yml +++ b/src/main/resources/application-new.yml @@ -1,6 +1,7 @@ dip: runtime: mode: NEW + search: # Default page size for search results default-page-size: 20 @@ -38,9 +39,11 @@ dip: embedding: enabled: true jobs: - enabled: false + enabled: true + parallel-batch-count: 2 process-in-batches: true - execution-batch-size: 20 + batch-size: 48 + execution-batch-size: 48 default-document-model: e5-default default-query-model: e5-default @@ -53,21 +56,25 @@ dip: external-e5: type: http-json - base-url: http://localhost:8001 + base-url: http://172.20.20.6:8001 connect-timeout: 5s read-timeout: 60s + batch-request: + truncate-text: false + truncate-length: 512 + chunk-size: 16 vector-sync-e5: type: http-vector-sync - base-url: http://localhost:8001 + base-url: http://172.20.20.6:8001 connect-timeout: 30s read-timeout: 300s headers: X-Client: dip batch-request: - truncate-text: true + truncate-text: false truncate-length: 512 - chunk-size: 8 + chunk-size: 16 models: @@ -219,6 +226,43 @@ dip: # Import batch marker for mail roots and attachments mail-import-batch-id: phase41-mail + # NEW Camel mail consumer route for provider-driven mail ingestion + mail-route: + # Enable/disable the NEW Camel mail consumer + enabled: false + # Generic mail server protocol (IMAP/IMAPS/POP3/POP3S) + protocol: IMAPS + # Mail server host + host: mail.mymagenta.business + # Mail server port; leave empty to use Camel component defaults + port: 993 + # Mailbox username + username: archiv@procon.co.at + # Mailbox password + password: ${MAIL_PASSWORD:worasigg} + # Folder/mailbox name + folder-name: INBOX + # Optional stable provider account key; falls back to username + account-key: + # Delete messages after successful processing + delete: false + # Consume only unseen messages + unseen: true + # Keep messages unread while consuming + peek: true + # Poll delay in milliseconds + delay: 15000 + # Maximum messages per poll + max-messages-per-poll: 20 + # Fetch entire messages by default + fetch-size: 10 + # Close folder after each poll cycle + close-folder: false + # Camel mail debug mode + debug-mode: false + # Socket connection timeout in milliseconds + connection-timeout: 30000 + # ted packages download configuration ted-download: # Enable/disable automatic package download @@ -254,6 +298,8 @@ dip: startup-backfill-enabled: false # Maximum number of legacy TED documents to backfill during startup startup-backfill-limit: 250 + structured-search-hybrid-candidate-limit: 5000 + structured-search-facet-bucket-limit: 12 migration: legacy-audit: @@ -289,8 +335,8 @@ dip: build-chunk-representations: true legacy-ted-embeddings: - enabled: true - startup-enabled: true + enabled: false + startup-enabled: false batch-size: 500 max-documents-per-run: 0 skip-when-primary-representation-missing: true diff --git a/src/main/resources/db/migration/V23__doc_mail_processing_stabilization_step1.sql b/src/main/resources/db/migration/V23__doc_mail_processing_stabilization_step1.sql new file mode 100644 index 0000000..12ababd --- /dev/null +++ b/src/main/resources/db/migration/V23__doc_mail_processing_stabilization_step1.sql @@ -0,0 +1,90 @@ +SET search_path TO DOC, TED, public; + +CREATE TABLE IF NOT EXISTS DOC.doc_mail_message ( + document_id uuid PRIMARY KEY REFERENCES DOC.doc_document(id) ON DELETE CASCADE, + source_id uuid REFERENCES DOC.doc_source(id) ON DELETE SET NULL, + provider_type varchar(64) NOT NULL, + account_key varchar(255), + folder_key varchar(255), + provider_message_key varchar(500), + provider_thread_key varchar(500), + message_id varchar(1000), + in_reply_to varchar(1000), + references_header text, + thread_key varchar(1000), + subject varchar(1000), + normalized_subject varchar(1000), + from_display_name varchar(500), + from_email varchar(500), + from_raw varchar(1000), + reply_to_raw text, + sent_at timestamptz, + received_at timestamptz, + raw_message_hash varchar(64), + raw_header_hash varchar(64), + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_doc_mail_message_source_id + ON DOC.doc_mail_message (source_id); +CREATE INDEX IF NOT EXISTS idx_doc_mail_message_message_id + ON DOC.doc_mail_message (message_id); +CREATE INDEX IF NOT EXISTS idx_doc_mail_message_thread_key + ON DOC.doc_mail_message (thread_key); +CREATE INDEX IF NOT EXISTS idx_doc_mail_message_provider_identity + ON DOC.doc_mail_message (provider_type, account_key, folder_key, provider_message_key); +CREATE UNIQUE INDEX IF NOT EXISTS uq_doc_mail_message_provider_identity + ON DOC.doc_mail_message (provider_type, account_key, folder_key, provider_message_key) + WHERE provider_message_key IS NOT NULL; + +CREATE TABLE IF NOT EXISTS DOC.doc_mail_recipient ( + id uuid PRIMARY KEY, + mail_document_id uuid NOT NULL REFERENCES DOC.doc_mail_message(document_id) ON DELETE CASCADE, + recipient_type varchar(16) NOT NULL, + display_name varchar(500), + email_address varchar(500), + raw_value text, + sort_order integer NOT NULL DEFAULT 0, + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_doc_mail_recipient_mail + ON DOC.doc_mail_recipient (mail_document_id); +CREATE INDEX IF NOT EXISTS idx_doc_mail_recipient_type + ON DOC.doc_mail_recipient (recipient_type); +CREATE INDEX IF NOT EXISTS idx_doc_mail_recipient_email + ON DOC.doc_mail_recipient (email_address); + +CREATE TABLE IF NOT EXISTS DOC.doc_mail_attachment ( + id uuid PRIMARY KEY, + mail_document_id uuid NOT NULL REFERENCES DOC.doc_mail_message(document_id) ON DELETE CASCADE, + attachment_document_id uuid NOT NULL REFERENCES DOC.doc_document(id) ON DELETE CASCADE, + disposition varchar(32), + content_id varchar(500), + filename varchar(1000), + mime_type varchar(255), + size_bytes bigint, + attachment_index integer, + part_path varchar(500), + path_in_archive text, + extraction_status varchar(32) NOT NULL DEFAULT 'IMPORTED', + error_message text, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_doc_mail_attachment_mail + ON DOC.doc_mail_attachment (mail_document_id); +CREATE INDEX IF NOT EXISTS idx_doc_mail_attachment_document + ON DOC.doc_mail_attachment (attachment_document_id); +CREATE INDEX IF NOT EXISTS idx_doc_mail_attachment_part_path + ON DOC.doc_mail_attachment (part_path); +CREATE INDEX IF NOT EXISTS idx_doc_mail_attachment_attachment_index + ON DOC.doc_mail_attachment (attachment_index); +CREATE UNIQUE INDEX IF NOT EXISTS uq_doc_mail_attachment_mail_index + ON DOC.doc_mail_attachment (mail_document_id, attachment_index) + WHERE attachment_index IS NOT NULL; +CREATE UNIQUE INDEX IF NOT EXISTS uq_doc_mail_attachment_mail_part + ON DOC.doc_mail_attachment (mail_document_id, part_path) + WHERE part_path IS NOT NULL; diff --git a/src/test/java/at/procon/dip/architecture/NewRuntimeMustNotDependOnTedProcessorPropertiesTest.java b/src/test/java/at/procon/dip/architecture/NewRuntimeMustNotDependOnTedProcessorPropertiesTest.java index 9414e54..2ad0629 100644 --- a/src/test/java/at/procon/dip/architecture/NewRuntimeMustNotDependOnTedProcessorPropertiesTest.java +++ b/src/test/java/at/procon/dip/architecture/NewRuntimeMustNotDependOnTedProcessorPropertiesTest.java @@ -22,15 +22,15 @@ class NewRuntimeMustNotDependOnTedProcessorPropertiesTest { List> newRuntimeClasses = List.of( at.procon.dip.ingestion.service.GenericDocumentImportService.class, at.procon.dip.ingestion.camel.GenericFileSystemIngestionRoute.class, + at.procon.dip.ingestion.camel.GenericMailIngestionRoute.class, at.procon.dip.ingestion.controller.GenericDocumentImportController.class, at.procon.dip.ingestion.adapter.MailDocumentIngestionAdapter.class, + at.procon.dip.ingestion.service.MailMetadataPersistenceService.class, + at.procon.dip.ingestion.mail.MailImportIdentityResolver.class, at.procon.dip.ingestion.adapter.TedPackageDocumentIngestionAdapter.class, at.procon.dip.ingestion.service.TedPackageChildImportProcessor.class, at.procon.dip.domain.ted.service.TedNoticeProjectionService.class, at.procon.dip.domain.ted.startup.TedProjectionStartupRunner.class, - at.procon.dip.domain.ted.search.TedStructuredSearchRepository.class, - at.procon.dip.domain.ted.service.TedStructuredSearchService.class, - at.procon.dip.domain.ted.web.TedStructuredSearchController.class, at.procon.dip.search.engine.fulltext.PostgresFullTextSearchEngine.class, at.procon.dip.search.engine.trigram.PostgresTrigramSearchEngine.class, at.procon.dip.search.engine.semantic.PgVectorSemanticSearchEngine.class, diff --git a/src/test/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestratorTest.java b/src/test/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestratorTest.java index c2050c8..d94a75c 100644 --- a/src/test/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestratorTest.java +++ b/src/test/java/at/procon/dip/embedding/service/RepresentationEmbeddingOrchestratorTest.java @@ -2,7 +2,6 @@ package at.procon.dip.embedding.service; import at.procon.dip.domain.document.DistanceMetric; import at.procon.dip.domain.document.entity.Document; -import at.procon.dip.domain.document.entity.DocumentEmbedding; import at.procon.dip.domain.document.entity.DocumentTextRepresentation; import at.procon.dip.domain.document.repository.DocumentTextRepresentationRepository; import at.procon.dip.embedding.config.EmbeddingProperties; @@ -35,7 +34,7 @@ class RepresentationEmbeddingOrchestratorTest { @Mock private EmbeddingExecutionService executionService; @Mock - private EmbeddingPersistenceService persistenceService; + private EmbeddingJobExecutionPersistenceService executionPersistenceService; @Mock private DocumentTextRepresentationRepository representationRepository; @Mock @@ -57,11 +56,12 @@ class RepresentationEmbeddingOrchestratorTest { orchestrator = new RepresentationEmbeddingOrchestrator( jobService, executionService, - persistenceService, + executionPersistenceService, representationRepository, selectionPolicy, modelRegistry, - properties + properties, + Runnable::run ); } @@ -92,8 +92,8 @@ class RepresentationEmbeddingOrchestratorTest { ); when(representationRepository.findById(representationId)).thenReturn(Optional.of(representation)); when(modelRegistry.getRequired("mock-search")).thenReturn(model); - when(persistenceService.ensurePending(representationId, "mock-search")) - .thenReturn(DocumentEmbedding.builder().id(embeddingId).build()); + when(executionPersistenceService.startProcessing(representationId, "mock-search")) + .thenReturn(embeddingId); when(executionService.embedTexts(eq("mock-search"), eq(EmbeddingUseCase.DOCUMENT), any())) .thenReturn(new EmbeddingProviderResult( model, @@ -105,9 +105,8 @@ class RepresentationEmbeddingOrchestratorTest { orchestrator.processClaimedJob(job); - verify(persistenceService).markProcessing(embeddingId); - verify(persistenceService).saveCompleted(eq(embeddingId), any(EmbeddingProviderResult.class)); - verify(jobService).markCompleted(job.getId(), "req-1"); + verify(executionPersistenceService).startProcessing(representationId, "mock-search"); + verify(executionPersistenceService).completeJob(eq(embeddingId), any(EmbeddingProviderResult.class), eq(job.getId()), eq("req-1")); } @Test @@ -155,10 +154,10 @@ class RepresentationEmbeddingOrchestratorTest { DistanceMetric.COSINE, true, true, 4096, true ); when(modelRegistry.getRequired("e5-default")).thenReturn(model); - when(persistenceService.ensurePending(representationId1, "e5-default")) - .thenReturn(DocumentEmbedding.builder().id(embeddingId1).build()); - when(persistenceService.ensurePending(representationId2, "e5-default")) - .thenReturn(DocumentEmbedding.builder().id(embeddingId2).build()); + when(executionPersistenceService.startProcessing(representationId1, "e5-default")) + .thenReturn(embeddingId1); + when(executionPersistenceService.startProcessing(representationId2, "e5-default")) + .thenReturn(embeddingId2); when(executionService.embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), any())) .thenReturn(new EmbeddingProviderResult( model, @@ -171,15 +170,13 @@ class RepresentationEmbeddingOrchestratorTest { int processed = orchestrator.processNextReadyBatch(); assertThat(processed).isEqualTo(2); - verify(persistenceService).markProcessing(embeddingId1); - verify(persistenceService).markProcessing(embeddingId2); + verify(executionPersistenceService).startProcessing(representationId1, "e5-default"); + verify(executionPersistenceService).startProcessing(representationId2, "e5-default"); ArgumentCaptor> textsCaptor = ArgumentCaptor.forClass(List.class); verify(executionService, times(1)).embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), textsCaptor.capture()); assertThat(textsCaptor.getValue()).containsExactly("alpha", "beta"); - verify(persistenceService).saveCompleted(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null)); - verify(persistenceService).saveCompleted(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null)); - verify(jobService).markCompleted(job1.getId(), "batch-req-1"); - verify(jobService).markCompleted(job2.getId(), "batch-req-1"); + verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1")); + verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1")); } @Test @@ -209,8 +206,8 @@ class RepresentationEmbeddingOrchestratorTest { DistanceMetric.COSINE, true, false, 4096, true ); when(modelRegistry.getRequired("mock-search")).thenReturn(model); - when(persistenceService.ensurePending(representationId, "mock-search")) - .thenReturn(DocumentEmbedding.builder().id(embeddingId).build()); + when(executionPersistenceService.startProcessing(representationId, "mock-search")) + .thenReturn(embeddingId); when(executionService.embedTexts(eq("mock-search"), eq(EmbeddingUseCase.DOCUMENT), any())) .thenReturn(new EmbeddingProviderResult( model, @@ -223,7 +220,73 @@ class RepresentationEmbeddingOrchestratorTest { orchestrator.processNextReadyBatch(); verify(executionService, times(1)).embedTexts(eq("mock-search"), eq(EmbeddingUseCase.DOCUMENT), eq(List.of("gamma"))); - verify(persistenceService, never()).saveCompleted(eq(embeddingId), any(float[].class), eq(null)); - verify(persistenceService).saveCompleted(eq(embeddingId), any(EmbeddingProviderResult.class)); + verify(executionPersistenceService, never()).completeJob(eq(embeddingId), any(float[].class), eq(null), eq(job.getId()), anyString()); + verify(executionPersistenceService).completeJob(eq(embeddingId), any(EmbeddingProviderResult.class), eq(job.getId()), eq("req-2")); + } + + @Test + void processNextReadyBatch_should_claim_multiple_batches_and_start_them_in_parallel() { + properties.getJobs().setParallelBatchCount(2); + properties.getJobs().setBatchSize(2); + properties.getJobs().setProcessInBatches(true); + properties.getJobs().setExecutionBatchSize(2); + + UUID documentId = UUID.randomUUID(); + UUID representationId1 = UUID.randomUUID(); + UUID representationId2 = UUID.randomUUID(); + UUID representationId3 = UUID.randomUUID(); + UUID representationId4 = UUID.randomUUID(); + UUID embeddingId1 = UUID.randomUUID(); + UUID embeddingId2 = UUID.randomUUID(); + UUID embeddingId3 = UUID.randomUUID(); + UUID embeddingId4 = UUID.randomUUID(); + + EmbeddingJob job1 = EmbeddingJob.builder().id(UUID.randomUUID()).documentId(documentId).representationId(representationId1).modelKey("e5-default").jobType(EmbeddingJobType.DOCUMENT_EMBED).status(EmbeddingJobStatus.IN_PROGRESS).attemptCount(1).build(); + EmbeddingJob job2 = EmbeddingJob.builder().id(UUID.randomUUID()).documentId(documentId).representationId(representationId2).modelKey("e5-default").jobType(EmbeddingJobType.DOCUMENT_EMBED).status(EmbeddingJobStatus.IN_PROGRESS).attemptCount(1).build(); + EmbeddingJob job3 = EmbeddingJob.builder().id(UUID.randomUUID()).documentId(documentId).representationId(representationId3).modelKey("e5-default").jobType(EmbeddingJobType.DOCUMENT_EMBED).status(EmbeddingJobStatus.IN_PROGRESS).attemptCount(1).build(); + EmbeddingJob job4 = EmbeddingJob.builder().id(UUID.randomUUID()).documentId(documentId).representationId(representationId4).modelKey("e5-default").jobType(EmbeddingJobType.DOCUMENT_EMBED).status(EmbeddingJobStatus.IN_PROGRESS).attemptCount(1).build(); + + when(jobService.claimNextReadyJobs(4)).thenReturn(List.of(job1, job2, job3, job4)); + + Document document = Document.builder().id(documentId).build(); + when(representationRepository.findById(representationId1)).thenReturn(Optional.of(DocumentTextRepresentation.builder().id(representationId1).document(document).textBody("alpha").build())); + when(representationRepository.findById(representationId2)).thenReturn(Optional.of(DocumentTextRepresentation.builder().id(representationId2).document(document).textBody("beta").build())); + when(representationRepository.findById(representationId3)).thenReturn(Optional.of(DocumentTextRepresentation.builder().id(representationId3).document(document).textBody("gamma").build())); + when(representationRepository.findById(representationId4)).thenReturn(Optional.of(DocumentTextRepresentation.builder().id(representationId4).document(document).textBody("delta").build())); + + EmbeddingModelDescriptor model = new EmbeddingModelDescriptor( + "e5-default", "vector-sync-e5", "intfloat/multilingual-e5-large", 3, + DistanceMetric.COSINE, true, true, 4096, true + ); + when(modelRegistry.getRequired("e5-default")).thenReturn(model); + when(executionPersistenceService.startProcessing(representationId1, "e5-default")).thenReturn(embeddingId1); + when(executionPersistenceService.startProcessing(representationId2, "e5-default")).thenReturn(embeddingId2); + when(executionPersistenceService.startProcessing(representationId3, "e5-default")).thenReturn(embeddingId3); + when(executionPersistenceService.startProcessing(representationId4, "e5-default")).thenReturn(embeddingId4); + when(executionService.embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), any())) + .thenReturn(new EmbeddingProviderResult( + model, + List.of(new float[]{0.1f, 0.2f, 0.3f}, new float[]{0.4f, 0.5f, 0.6f}), + List.of(), + "batch-req-1", + 21 + )) + .thenReturn(new EmbeddingProviderResult( + model, + List.of(new float[]{0.7f, 0.8f, 0.9f}, new float[]{1.0f, 1.1f, 1.2f}), + List.of(), + "batch-req-2", + 25 + )); + + int processed = orchestrator.processNextReadyBatch(); + + assertThat(processed).isEqualTo(4); + verify(jobService).claimNextReadyJobs(4); + verify(executionService, times(2)).embedTexts(eq("e5-default"), eq(EmbeddingUseCase.DOCUMENT), any()); + verify(executionPersistenceService).completeJob(eq(embeddingId1), aryEq(new float[]{0.1f, 0.2f, 0.3f}), eq(null), eq(job1.getId()), eq("batch-req-1")); + verify(executionPersistenceService).completeJob(eq(embeddingId2), aryEq(new float[]{0.4f, 0.5f, 0.6f}), eq(null), eq(job2.getId()), eq("batch-req-1")); + verify(executionPersistenceService).completeJob(eq(embeddingId3), aryEq(new float[]{0.7f, 0.8f, 0.9f}), eq(null), eq(job3.getId()), eq("batch-req-2")); + verify(executionPersistenceService).completeJob(eq(embeddingId4), aryEq(new float[]{1.0f, 1.1f, 1.2f}), eq(null), eq(job4.getId()), eq("batch-req-2")); } } diff --git a/src/test/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapterBundleTest.java b/src/test/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapterBundleTest.java index 9cbde82..e9fdcc4 100644 --- a/src/test/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapterBundleTest.java +++ b/src/test/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapterBundleTest.java @@ -9,10 +9,13 @@ import at.procon.dip.domain.document.RelationType; import at.procon.dip.domain.document.SourceType; import at.procon.dip.domain.document.entity.Document; import at.procon.dip.domain.document.service.DocumentRelationService; +import at.procon.dip.domain.document.repository.DocumentSourceRepository; import at.procon.dip.ingestion.config.DipIngestionProperties; import at.procon.dip.ingestion.dto.ImportedDocumentResult; import at.procon.dip.ingestion.service.GenericDocumentImportService; import at.procon.dip.ingestion.service.MailMessageExtractionService; +import at.procon.dip.ingestion.service.MailMetadataPersistenceService; +import at.procon.dip.ingestion.mail.MailImportIdentityResolver; import at.procon.dip.ingestion.spi.IngestionResult; import at.procon.dip.ingestion.spi.SourceDescriptor; import at.procon.ted.service.attachment.ZipExtractionService; @@ -49,6 +52,12 @@ class MailDocumentIngestionAdapterBundleTest { @Mock private ZipExtractionService zipExtractionService; + @Mock + private MailMetadataPersistenceService mailMetadataPersistenceService; + + @Mock + private DocumentSourceRepository documentSourceRepository; + private MailDocumentIngestionAdapter adapter; @BeforeAll @@ -64,7 +73,8 @@ class MailDocumentIngestionAdapterBundleTest { properties.setExpandMailZipAttachments(false); properties.setMailImportBatchId("test-mail-bundle"); lenient().when(zipExtractionService.canHandle(any(), any())).thenReturn(false); - adapter = new MailDocumentIngestionAdapter(properties, importService, new MailMessageExtractionService(), relationService, zipExtractionService); + lenient().when(documentSourceRepository.findBySourceTypeAndExternalSourceId(any(), any())).thenReturn(java.util.Optional.empty()); + adapter = new MailDocumentIngestionAdapter(properties, importService, new MailMessageExtractionService(), relationService, zipExtractionService, new MailImportIdentityResolver(), mailMetadataPersistenceService, documentSourceRepository); } @ParameterizedTest(name = "ingest {0}") diff --git a/src/test/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapterFileSystemTest.java b/src/test/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapterFileSystemTest.java index a04a940..9233571 100644 --- a/src/test/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapterFileSystemTest.java +++ b/src/test/java/at/procon/dip/ingestion/adapter/MailDocumentIngestionAdapterFileSystemTest.java @@ -10,11 +10,14 @@ import at.procon.dip.domain.document.RelationType; import at.procon.dip.domain.document.SourceType; import at.procon.dip.domain.document.entity.Document; import at.procon.dip.domain.document.service.DocumentRelationService; +import at.procon.dip.domain.document.repository.DocumentSourceRepository; import at.procon.dip.domain.document.service.command.CreateDocumentRelationCommand; import at.procon.dip.ingestion.config.DipIngestionProperties; import at.procon.dip.ingestion.dto.ImportedDocumentResult; import at.procon.dip.ingestion.service.GenericDocumentImportService; import at.procon.dip.ingestion.service.MailMessageExtractionService; +import at.procon.dip.ingestion.service.MailMetadataPersistenceService; +import at.procon.dip.ingestion.mail.MailImportIdentityResolver; import at.procon.dip.ingestion.spi.IngestionResult; import at.procon.dip.ingestion.spi.SourceDescriptor; import at.procon.ted.service.attachment.ZipExtractionService; @@ -36,6 +39,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -51,6 +55,12 @@ class MailDocumentIngestionAdapterFileSystemTest { @Mock private ZipExtractionService zipExtractionService; + @Mock + private MailMetadataPersistenceService mailMetadataPersistenceService; + + @Mock + private DocumentSourceRepository documentSourceRepository; + private MailDocumentIngestionAdapter adapter; private final List importedDescriptors = new ArrayList<>(); @@ -69,10 +79,14 @@ class MailDocumentIngestionAdapterFileSystemTest { importService, extractionService, relationService, - zipExtractionService + zipExtractionService, + new MailImportIdentityResolver(), + mailMetadataPersistenceService, + documentSourceRepository ); when(zipExtractionService.canHandle(any(), any())).thenReturn(false); + lenient().when(documentSourceRepository.findBySourceTypeAndExternalSourceId(any(), any())).thenReturn(java.util.Optional.empty()); when(relationService.ensureRelation(any())).thenReturn(null); when(importService.importDocument(any())).thenAnswer(invocation -> { SourceDescriptor descriptor = invocation.getArgument(0); @@ -129,6 +143,7 @@ class MailDocumentIngestionAdapterFileSystemTest { .filter(d -> "notes.txt".equals(d.fileName())) .findFirst() .orElseThrow(); + assertEquals(SourceType.MAIL_ATTACHMENT, textAttachment.sourceType()); assertEquals("text/plain", textAttachment.mediaType()); assertNotNull(textAttachment.textContent(), "plain text attachment should expose preview text"); assertTrue(textAttachment.textContent().contains("attachment notes")); @@ -137,6 +152,7 @@ class MailDocumentIngestionAdapterFileSystemTest { .filter(d -> "legacy.xls".equals(d.fileName())) .findFirst() .orElseThrow(); + assertEquals(SourceType.MAIL_ATTACHMENT, binaryAttachment.sourceType()); assertNull(binaryAttachment.textContent(), "binary old Excel attachment must not be passed as text content"); assertEquals("application/vnd.ms-excel", binaryAttachment.mediaType()); assertNotNull(binaryAttachment.binaryContent()); diff --git a/src/test/java/at/procon/dip/ingestion/integration/MailBundleProcessingIntegrationTest.java b/src/test/java/at/procon/dip/ingestion/integration/MailBundleProcessingIntegrationTest.java index 3e673dc..52bc5b7 100644 --- a/src/test/java/at/procon/dip/ingestion/integration/MailBundleProcessingIntegrationTest.java +++ b/src/test/java/at/procon/dip/ingestion/integration/MailBundleProcessingIntegrationTest.java @@ -15,6 +15,9 @@ import at.procon.dip.domain.document.repository.DocumentEmbeddingRepository; import at.procon.dip.domain.document.repository.DocumentRelationRepository; import at.procon.dip.domain.document.repository.DocumentRepository; import at.procon.dip.domain.document.repository.DocumentSourceRepository; +import at.procon.dip.domain.document.repository.DocumentMailAttachmentRepository; +import at.procon.dip.domain.document.repository.DocumentMailMessageRepository; +import at.procon.dip.domain.document.repository.DocumentMailRecipientRepository; import at.procon.dip.domain.document.repository.DocumentTextRepresentationRepository; import at.procon.dip.domain.document.service.DocumentContentService; import at.procon.dip.domain.document.service.DocumentEmbeddingService; @@ -30,6 +33,8 @@ import at.procon.dip.ingestion.config.DipIngestionProperties; import at.procon.dip.ingestion.service.DocumentIngestionGateway; import at.procon.dip.ingestion.service.GenericDocumentImportService; import at.procon.dip.ingestion.service.MailMessageExtractionService; +import at.procon.dip.ingestion.service.MailMetadataPersistenceService; +import at.procon.dip.ingestion.mail.MailImportIdentityResolver; import at.procon.dip.ingestion.spi.IngestionResult; import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy; import at.procon.dip.ingestion.spi.SourceDescriptor; @@ -88,16 +93,16 @@ import static org.assertj.core.api.Assertions.assertThat; "spring.jpa.open-in-view=false", "spring.jpa.properties.hibernate.default_schema=DOC", "ted.vectorization.enabled=false", - "ted.generic-ingestion.enabled=true", - "ted.generic-ingestion.mail-adapter-enabled=true", - "ted.generic-ingestion.file-system-enabled=false", - "ted.generic-ingestion.rest-upload-enabled=false", - "ted.generic-ingestion.deduplicate-by-content-hash=false", - "ted.generic-ingestion.expand-mail-zip-attachments=true", - "ted.generic-ingestion.default-visibility=PUBLIC", - "ted.generic-ingestion.mail-default-visibility=RESTRICTED", - "ted.generic-ingestion.import-batch-id=test-mail-bundle", - "ted.generic-ingestion.mail-import-batch-id=test-mail-bundle-mail" + "dip.ingestion.enabled=true", + "dip.ingestion.mail-adapter-enabled=true", + "dip.ingestion.file-system-enabled=false", + "dip.ingestion.rest-upload-enabled=false", + "dip.ingestion.deduplicate-by-content-hash=false", + "dip.ingestion.expand-mail-zip-attachments=true", + "dip.ingestion.default-visibility=PUBLIC", + "dip.ingestion.mail-default-visibility=RESTRICTED", + "dip.ingestion.import-batch-id=test-mail-bundle", + "dip.ingestion.mail-import-batch-id=test-mail-bundle-mail" }) class MailBundleProcessingIntegrationTest { @@ -131,6 +136,12 @@ class MailBundleProcessingIntegrationTest { @Autowired private DocumentSourceRepository documentSourceRepository; @Autowired + private DocumentMailMessageRepository documentMailMessageRepository; + @Autowired + private DocumentMailRecipientRepository documentMailRecipientRepository; + @Autowired + private DocumentMailAttachmentRepository documentMailAttachmentRepository; + @Autowired private DocumentContentRepository documentContentRepository; @Autowired private DocumentRelationRepository documentRelationRepository; @@ -213,6 +224,9 @@ class MailBundleProcessingIntegrationTest { long totalRelations = documentRelationRepository.count(); assertThat(totalDocuments).isEqualTo(expectedRootDocuments + expectedAttachmentDocuments); + assertThat(documentMailMessageRepository.count()).isEqualTo(expectedRootDocuments); + assertThat(documentMailRecipientRepository.count()).isGreaterThanOrEqualTo(expectedRootDocuments); + assertThat(documentMailAttachmentRepository.count()).isEqualTo(expectedAttachmentDocuments); assertThat(totalSources).isEqualTo(totalDocuments); assertThat(totalRelations).isEqualTo(expectedAttachmentDocuments); @@ -284,6 +298,8 @@ class MailBundleProcessingIntegrationTest { assertThat(documentRelationRepository.count()).isEqualTo(parsed.attachments().size()); List sources = documentSourceRepository.findAll(); + assertThat(documentMailMessageRepository.count()).isEqualTo(1); + assertThat(documentMailAttachmentRepository.count()).isEqualTo(parsed.attachments().size()); assertThat(sources).anyMatch(s -> sample.getFileName().toString().equals(s.getSourceFilename())); assertThat(sources).anyMatch(s -> s.getSourceFilename() != null && s.getSourceFilename().toLowerCase().endsWith(".pdf")); assertThat(sources).anyMatch(s -> s.getSourceFilename() != null && s.getSourceFilename().toLowerCase().endsWith(".csv")); @@ -298,6 +314,12 @@ class MailBundleProcessingIntegrationTest { documentTextRepresentationRepository.deleteAll(); System.out.println("cleanup: content"); documentContentRepository.deleteAll(); + System.out.println("cleanup: mail attachments"); + documentMailAttachmentRepository.deleteAll(); + System.out.println("cleanup: mail recipients"); + documentMailRecipientRepository.deleteAll(); + System.out.println("cleanup: mail messages"); + documentMailMessageRepository.deleteAll(); System.out.println("cleanup: sources"); documentSourceRepository.deleteAll(); System.out.println("cleanup: documents"); @@ -348,6 +370,8 @@ class MailBundleProcessingIntegrationTest { GenericDocumentImportService.class, MailDocumentIngestionAdapter.class, MailMessageExtractionService.class, + MailMetadataPersistenceService.class, + MailImportIdentityResolver.class, ZipExtractionService.class, DocumentService.class, DocumentSourceService.class, diff --git a/src/test/java/at/procon/dip/ingestion/mail/CamelMailServerEndpointUriFactoryTest.java b/src/test/java/at/procon/dip/ingestion/mail/CamelMailServerEndpointUriFactoryTest.java new file mode 100644 index 0000000..cb98f76 --- /dev/null +++ b/src/test/java/at/procon/dip/ingestion/mail/CamelMailServerEndpointUriFactoryTest.java @@ -0,0 +1,48 @@ +package at.procon.dip.ingestion.mail; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import at.procon.dip.ingestion.config.DipIngestionProperties; +import org.junit.jupiter.api.Test; + +class CamelMailServerEndpointUriFactoryTest { + + private final CamelMailServerEndpointUriFactory factory = new CamelMailServerEndpointUriFactory(); + + @Test + void shouldBuildImapsConsumerUri() { + DipIngestionProperties.MailRouteProperties properties = new DipIngestionProperties.MailRouteProperties(); + properties.setProtocol(MailServerProtocol.IMAPS); + properties.setHost("mail.example.org"); + properties.setPort(993); + properties.setUsername("user@example.org"); + properties.setPassword("p@ss word"); + properties.setFolderName("INBOX/Orders"); + properties.setDelete(false); + properties.setPeek(true); + properties.setUnseen(true); + properties.setDelay(15000); + properties.setMaxMessagesPerPoll(25); + + String uri = factory.buildConsumerUri(properties); + + assertThat(uri).startsWith("imaps://mail.example.org:993?"); + assertThat(uri).contains("username=user%40example.org"); + assertThat(uri).contains("password=p%40ss+word"); + assertThat(uri).contains("folderName=INBOX%2FOrders"); + assertThat(uri).contains("peek=true"); + assertThat(uri).contains("unseen=true"); + assertThat(uri).contains("maxMessagesPerPoll=25"); + } + + @Test + void shouldFailWhenRequiredValuesAreMissing() { + DipIngestionProperties.MailRouteProperties properties = new DipIngestionProperties.MailRouteProperties(); + properties.setProtocol(MailServerProtocol.IMAPS); + + assertThatThrownBy(() -> factory.buildConsumerUri(properties)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("host"); + } +} diff --git a/src/test/java/at/procon/dip/ingestion/mail/MailImportIdentityResolverTest.java b/src/test/java/at/procon/dip/ingestion/mail/MailImportIdentityResolverTest.java new file mode 100644 index 0000000..6057050 --- /dev/null +++ b/src/test/java/at/procon/dip/ingestion/mail/MailImportIdentityResolverTest.java @@ -0,0 +1,60 @@ +package at.procon.dip.ingestion.mail; + +import at.procon.dip.domain.document.entity.MailRecipientType; +import at.procon.dip.ingestion.service.MailMessageExtractionService; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class MailImportIdentityResolverTest { + + private final MailImportIdentityResolver resolver = new MailImportIdentityResolver(); + + @Test + void shouldPreferProviderMessageKeyForRootIdentity() { + var parsed = new MailMessageExtractionService.ParsedMailMessage( + "Subject", + "Subject", + "Sender", + "sender@example.com", + "Sender ", + null, + "", + null, + null, + OffsetDateTime.now(), + OffsetDateTime.now(), + "body", + "", + List.of(new MailMessageExtractionService.MailRecipient(MailRecipientType.TO, null, "to@example.com", "to@example.com", 0)), + List.of() + ); + var envelope = new GenericMailProviderEnvelope(MailProviderType.IMAP, "account-a", "Inbox", "uid-77", null, Map.of()); + + String identity = resolver.resolveRootSourceIdentifier(parsed, envelope, "raw".getBytes()); + + assertThat(identity).contains("imap").contains("account-a").contains("Inbox").contains("uid-77"); + } + + @Test + void shouldUsePartPathForAttachmentIdentity() { + var attachment = new MailMessageExtractionService.MailAttachment( + "offer.pdf", + "application/pdf", + new byte[]{1, 2, 3}, + 3l, + null, + "0.1.2", + "ATTACHMENT", + null, + 1 + ); + + String identity = resolver.resolveAttachmentSourceIdentifier("mail:imap:account:Inbox:uid-77", attachment); + + assertThat(identity).endsWith(":part:0.1.2"); + } +} diff --git a/src/test/java/at/procon/dip/ingestion/mail/MailProviderEnvelopeAttributesTest.java b/src/test/java/at/procon/dip/ingestion/mail/MailProviderEnvelopeAttributesTest.java new file mode 100644 index 0000000..6958b55 --- /dev/null +++ b/src/test/java/at/procon/dip/ingestion/mail/MailProviderEnvelopeAttributesTest.java @@ -0,0 +1,31 @@ +package at.procon.dip.ingestion.mail; + +import java.util.Map; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class MailProviderEnvelopeAttributesTest { + + @Test + void shouldRoundTripGenericProviderEnvelopeThroughAttributes() { + GenericMailProviderEnvelope envelope = new GenericMailProviderEnvelope( + MailProviderType.MICROSOFT_GRAPH, + "mailbox-a", + "Inbox/Subfolder", + "provider-message-42", + "provider-thread-7", + Map.of("tenant", "demo") + ); + + Map attributes = MailProviderEnvelopeAttributes.merge(Map.of("subject", "Hello"), envelope); + GenericMailProviderEnvelope restored = MailProviderEnvelopeAttributes.fromAttributes(attributes); + + assertThat(restored.providerType()).isEqualTo(MailProviderType.MICROSOFT_GRAPH); + assertThat(restored.accountKey()).isEqualTo("mailbox-a"); + assertThat(restored.folderKey()).isEqualTo("Inbox/Subfolder"); + assertThat(restored.providerMessageKey()).isEqualTo("provider-message-42"); + assertThat(restored.providerThreadKey()).isEqualTo("provider-thread-7"); + assertThat(restored.providerAttributes()).containsEntry("tenant", "demo"); + } +}