You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

486 lines
21 KiB
Java

package at.procon.ted.camel;
import at.procon.ted.config.TedProcessorProperties;
import at.procon.ted.service.attachment.AttachmentExtractor;
import at.procon.ted.service.attachment.AttachmentProcessingService;
import jakarta.mail.BodyPart;
import jakarta.mail.Message;
import jakarta.mail.Multipart;
import jakarta.mail.Part;
import jakarta.mail.Session;
import jakarta.mail.internet.MimeMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.jsoup.Jsoup;
import org.springframework.stereotype.Component;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* Apache Camel route for IMAP mail processing.
*
* Features:
* - IMAP connection with SSL/TLS to mail server
* - MIME message decoding
* - Asynchronous attachment processing with idempotency
* - PDF text extraction
* - ZIP file extraction with recursive processing
* - HTML to plain text conversion
*
* @author Martin.Schweitzer@procon.co.at and claude.ai
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class MailRoute extends RouteBuilder {
// Route ids passed to routeId(...) for the routes defined in this builder.
// Route that consumes mails from the IMAP endpoint.
private static final String ROUTE_ID_IMAP = "mail-imap-consumer";
// Route that reads MIME files from the configured input directory.
private static final String ROUTE_ID_MIME_FILE = "mail-mime-file-consumer";
// Route behind direct:mime that decodes a mail message.
private static final String ROUTE_ID_MIME = "mail-mime-decoder";
// Route behind direct:attachment that processes one attachment.
private static final String ROUTE_ID_ATTACHMENT = "mail-attachment-processor";
// Route that consumes the seda:attachment-async queue.
private static final String ROUTE_ID_ATTACHMENT_ASYNC = "mail-attachment-async";
// Final fields are supplied through the constructor generated by @RequiredArgsConstructor.
// Application configuration; only the mail section (getMail()) is read in this class.
private final TedProcessorProperties properties;
// Service invoked for every attachment handled by the direct:attachment route.
private final AttachmentProcessingService attachmentProcessingService;
/**
 * Configures all mail-processing routes.
 * <p>
 * Does nothing when mail processing is disabled in the configuration. Otherwise it
 * ensures the attachment output directory exists, installs a dead-letter error handler
 * and registers these routes:
 * <ul>
 * <li>{@code direct:mail-error-handler} - logs failed exchanges</li>
 * <li>IMAP consumer - forwards received mails to {@code direct:mime}</li>
 * <li>MIME file consumer - only when MIME input is enabled</li>
 * <li>{@code direct:mime} - decodes the message and queues each attachment on SEDA</li>
 * <li>{@code seda:attachment-async} - hands queued attachments to {@code direct:attachment}</li>
 * <li>{@code direct:attachment} - processes a single attachment</li>
 * </ul>
 *
 * @throws Exception if a route cannot be defined
 */
@Override
public void configure() throws Exception {
    TedProcessorProperties.MailProperties mail = properties.getMail();
    if (!mail.isEnabled()) {
        log.info("Mail processing is disabled, skipping route configuration");
        return;
    }
    log.info("Configuring mail routes (host={}, port={}, ssl={}, user={})",
            mail.getHost(), mail.getPort(), mail.isSsl(), mail.getUsername());

    // Ensure attachment output directory exists
    File attachmentDir = new File(mail.getAttachmentOutputDirectory());
    if (!attachmentDir.exists()) {
        // mkdirs() signals failure only through its return value, so check it
        // instead of unconditionally claiming the directory was created.
        if (attachmentDir.mkdirs()) {
            log.info("Created attachment output directory: {}", attachmentDir.getAbsolutePath());
        } else if (!attachmentDir.isDirectory()) {
            log.warn("Could not create attachment output directory: {}", attachmentDir.getAbsolutePath());
        }
    }

    // Error handler for mail processing: up to 3 redeliveries, then dead-letter route
    errorHandler(deadLetterChannel("direct:mail-error-handler")
            .maximumRedeliveries(3)
            .redeliveryDelay(5000)
            .retryAttemptedLogLevel(LoggingLevel.WARN)
            .logStackTrace(true));

    // Mail error handler route
    from("direct:mail-error-handler")
            .routeId("mail-error-handler")
            .process(exchange -> {
                Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
                String subject = exchange.getIn().getHeader("mailSubject", String.class);
                if (exception != null) {
                    log.error("Mail processing error for subject '{}': {}", subject, exception.getMessage(), exception);
                }
            })
            .log(LoggingLevel.ERROR, "Mail processing failed: ${exception.message}");

    // IMAP consumer route
    from(buildImapUri())
            .routeId(ROUTE_ID_IMAP)
            .log(LoggingLevel.INFO, "Received email: ${header.subject} from ${header.from}")
            .to("direct:mime");

    // MIME file consumer route - reads .eml files from directory
    if (mail.isMimeInputEnabled()) {
        configureMimeFileConsumer(mail);
    }

    // MIME decoder route - decodes the email and extracts content/attachments
    from("direct:mime")
            .routeId(ROUTE_ID_MIME)
            .process(this::decodeMimeMessage)
            // Queue attachments for async processing
            .choice()
                .when(simple("${header.mailAttachmentCount} > 0"))
                    .log(LoggingLevel.INFO, "Queueing ${header.mailAttachmentCount} attachments for async processing")
                .otherwise()
                    .log(LoggingLevel.DEBUG, "No attachments in email: ${header.mailSubject}")
            .end()
            // Process attachments asynchronously via SEDA
            .filter(simple("${header.mailAttachmentCount} > 0"))
                .split(header("mailAttachments"))
                    .to("seda:attachment-async?waitForTaskToComplete=Never&size=500")
                .end()
            .end()
            .log(LoggingLevel.INFO, "Mail processing completed: ${header.mailSubject}");

    // Async attachment processor route via SEDA
    from("seda:attachment-async?concurrentConsumers=2&size=500")
            .routeId(ROUTE_ID_ATTACHMENT_ASYNC)
            .to("direct:attachment");

    // Attachment processor route - handles individual attachments with idempotency
    from("direct:attachment")
            .routeId(ROUTE_ID_ATTACHMENT)
            .process(this::handleAttachment)
            .choice()
                .when(exchangeProperty("isDuplicate").isEqualTo(true))
                    .log(LoggingLevel.DEBUG, "Skipped duplicate attachment")
                // Failed or missing attachments must not be reported as processed.
                .when(exchangeProperty("isFailed").isEqualTo(true))
                    .log(LoggingLevel.WARN, "Attachment was not processed")
                .otherwise()
                    .log(LoggingLevel.INFO, "Attachment processed: ${header.attachmentId}, extracted=${header.extractedText}")
            .end();
}

/**
 * Decodes the mail message found in the exchange body and stores the result in the
 * headers {@code mailSubject}, {@code mailFrom}, {@code mailReceivedDate},
 * {@code mailTextContent}, {@code mailHtmlContent}, {@code mailAttachments} and
 * {@code mailAttachmentCount}. A missing message body is logged and skipped.
 */
private void decodeMimeMessage(Exchange exchange) throws Exception {
    Message mailMessage = exchange.getIn().getBody(Message.class);
    if (mailMessage == null) {
        log.warn("Received null mail message, skipping");
        return;
    }
    String subject = mailMessage.getSubject();
    String from = mailMessage.getFrom() != null && mailMessage.getFrom().length > 0
            ? mailMessage.getFrom()[0].toString() : "unknown";
    log.info("Processing MIME message: subject='{}', from='{}'", subject, from);

    // Store mail metadata in headers
    exchange.getIn().setHeader("mailSubject", subject);
    exchange.getIn().setHeader("mailFrom", from);
    exchange.getIn().setHeader("mailReceivedDate", mailMessage.getReceivedDate());

    // Process the content
    List<AttachmentInfo> attachments = new ArrayList<>();
    StringBuilder textContent = new StringBuilder();
    StringBuilder htmlContent = new StringBuilder();
    processMessageContent(mailMessage, textContent, htmlContent, attachments);

    // Convert HTML to plain text if we have HTML but no plain text
    String finalTextContent;
    if (textContent.length() == 0 && htmlContent.length() > 0) {
        finalTextContent = convertHtmlToText(htmlContent.toString());
        log.debug("Converted HTML mail to plain text ({} chars)", finalTextContent.length());
    } else {
        finalTextContent = textContent.toString();
    }

    // Store results
    exchange.getIn().setHeader("mailTextContent", finalTextContent);
    exchange.getIn().setHeader("mailHtmlContent", htmlContent.toString());
    exchange.getIn().setHeader("mailAttachments", attachments);
    exchange.getIn().setHeader("mailAttachmentCount", attachments.size());
    log.info("MIME decoded: subject='{}', textLength={}, htmlLength={}, attachments={}",
            subject, finalTextContent.length(), htmlContent.length(), attachments.size());
}

/**
 * Processes the {@link AttachmentInfo} in the exchange body through the attachment
 * processing service. Sets the exchange property {@code isDuplicate} for duplicates and
 * {@code isFailed} when there is no attachment or processing failed; on success the
 * headers {@code attachmentId}, {@code attachmentHash} and {@code extractedText} are set
 * and child attachments, if any, are queued for processing.
 */
private void handleAttachment(Exchange exchange) throws Exception {
    AttachmentInfo attachment = exchange.getIn().getBody(AttachmentInfo.class);
    if (attachment == null) {
        log.warn("Received null attachment info, skipping");
        exchange.setProperty("isFailed", true);
        return;
    }
    String mailSubject = exchange.getIn().getHeader("mailSubject", String.class);
    String mailFrom = exchange.getIn().getHeader("mailFrom", String.class);
    String parentHash = exchange.getIn().getHeader("parentHash", String.class);
    log.info("Processing attachment: '{}' ({} bytes, type={}) from email '{}'",
            attachment.getFilename(), attachment.getSize(),
            attachment.getContentType(), mailSubject);

    // Process attachment with idempotency check
    AttachmentProcessingService.ProcessingResult result = attachmentProcessingService.processAttachment(
            attachment.getData(),
            attachment.getFilename(),
            attachment.getContentType(),
            mailSubject,
            mailFrom,
            parentHash
    );
    if (result.isDuplicate()) {
        log.info("Attachment is duplicate, skipping: '{}'", attachment.getFilename());
        exchange.setProperty("isDuplicate", true);
        return;
    }
    if (!result.isSuccess()) {
        log.warn("Attachment processing failed: '{}' - {}",
                attachment.getFilename(), result.errorMessage());
        exchange.setProperty("isFailed", true);
        return;
    }

    // Store result in exchange
    exchange.getIn().setHeader("attachmentId", result.attachment().getId());
    exchange.getIn().setHeader("attachmentHash", result.attachment().getContentHash());
    exchange.getIn().setHeader("extractedText",
            result.attachment().getExtractedText() != null
                    ? result.attachment().getExtractedText().length() + " chars"
                    : "none");

    // Queue child attachments (from ZIP) for recursive processing
    if (result.hasChildren()) {
        log.info("Queueing {} child attachments from ZIP '{}'",
                result.childAttachments().size(), attachment.getFilename());
        enqueueChildAttachments(result, mailSubject, mailFrom);
    }
}

/**
 * Sends every child attachment of {@code result} to the asynchronous attachment queue,
 * carrying the mail metadata, the parent's content hash and the path inside the archive
 * as headers.
 */
private void enqueueChildAttachments(AttachmentProcessingService.ProcessingResult result,
        String mailSubject, String mailFrom) throws Exception {
    // One template for the whole batch, stopped afterwards: a template created per
    // message and never stopped keeps its resources allocated.
    var producer = getContext().createProducerTemplate();
    try {
        for (AttachmentExtractor.ChildAttachment child : result.childAttachments()) {
            // Create AttachmentInfo for child and send to SEDA queue
            AttachmentInfo childInfo = new AttachmentInfo(
                    child.filename(),
                    child.contentType(),
                    child.data(),
                    child.data().length
            );
            // A HashMap is used because Map.of(...) rejects null values.
            java.util.Map<String, Object> headers = new java.util.HashMap<>();
            headers.put("mailSubject", mailSubject != null ? mailSubject : "");
            headers.put("mailFrom", mailFrom != null ? mailFrom : "");
            if (result.attachment().getContentHash() != null) {
                headers.put("parentHash", result.attachment().getContentHash());
            }
            if (child.pathInArchive() != null) {
                headers.put("pathInArchive", child.pathInArchive());
            }
            // Send to SEDA for async processing with parent hash
            producer.sendBodyAndHeaders(
                    "seda:attachment-async?waitForTaskToComplete=Never",
                    childInfo,
                    headers
            );
        }
    } finally {
        producer.stop();
    }
}
/**
 * Configure the MIME file consumer route.
 * <p>
 * Ensures the MIME input directory exists, then registers a file consumer that parses
 * each picked-up file into a {@link MimeMessage} and forwards it to {@code direct:mime}.
 *
 * @param mail mail configuration providing the input directory and consumer options
 * @throws Exception if the route cannot be defined
 */
private void configureMimeFileConsumer(TedProcessorProperties.MailProperties mail) throws Exception {
    // Ensure MIME input directory exists
    File mimeInputDir = new File(mail.getMimeInputDirectory());
    if (!mimeInputDir.exists()) {
        // mkdirs() signals failure only through its return value.
        if (mimeInputDir.mkdirs()) {
            log.info("Created MIME input directory: {}", mimeInputDir.getAbsolutePath());
        } else if (!mimeInputDir.isDirectory()) {
            log.warn("Could not create MIME input directory: {}", mimeInputDir.getAbsolutePath());
        }
    }
    String mimeFileUri = buildMimeFileUri(mail);
    log.info("Configuring MIME file consumer: {}", mimeFileUri);

    // MIME file consumer - reads .eml files and sends to direct:mime
    from(mimeFileUri)
            .routeId(ROUTE_ID_MIME_FILE)
            .log(LoggingLevel.INFO, "Reading MIME file: ${header.CamelFileName}")
            .process(exchange -> {
                // Read file content as bytes
                byte[] fileContent = exchange.getIn().getBody(byte[].class);
                String filename = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
                if (fileContent == null || fileContent.length == 0) {
                    log.warn("Empty MIME file: {}", filename);
                    throw new IllegalStateException("Empty MIME file: " + filename);
                }
                log.debug("Parsing MIME file: {} ({} bytes)", filename, fileContent.length);

                // A private session is used for parsing: the JVM-wide default session may
                // have been installed by other code with an authenticator, in which case
                // getDefaultInstance(...) refuses access.
                Session session = Session.getInstance(new Properties());
                try (ByteArrayInputStream bais = new ByteArrayInputStream(fileContent)) {
                    MimeMessage mimeMessage = new MimeMessage(session, bais);
                    // Set the parsed message as body for direct:mime
                    exchange.getIn().setBody(mimeMessage);
                    log.info("Parsed MIME file: {} -> subject='{}'",
                            filename, mimeMessage.getSubject());
                }
            })
            .to("direct:mime")
            .log(LoggingLevel.INFO, "MIME file processed successfully: ${header.CamelFileName}");
}
/**
 * Assembles the Camel file endpoint URI used by the MIME file consumer.
 * <p>
 * The directory comes from the configuration with backslashes turned into forward
 * slashes; include pattern and polling delay are configurable, every other option is fixed.
 *
 * @param mail mail configuration supplying directory, include pattern and poll interval
 * @return the complete {@code file:} endpoint URI
 */
private String buildMimeFileUri(TedProcessorProperties.MailProperties mail) {
    String inputDir = mail.getMimeInputDirectory().replace("\\", "/");
    return "file:" + inputDir + "?"
            // which files to pick up
            + "include=" + mail.getMimeInputPattern()
            // how often to poll
            + "&delay=" + mail.getMimeInputPollInterval()
            // where files go after success / failure
            + "&move=.processed"
            + "&moveFailed=.error"
            // wait until a file stops changing before reading it
            + "&readLock=changed"
            + "&readLockCheckInterval=1000"
            + "&readLockTimeout=30000"
            // deterministic ordering by file name
            + "&sortBy=file:name"
            // skip dot-files
            + "&exclude=^\\..*"
            // top-level directory only
            + "&recursive=false";
}
/**
 * Assembles the Camel IMAP endpoint URI from the mail configuration.
 * <p>
 * Username and password are passed through {@link #encodeUriComponent(String)}; the
 * password is left out of the log line written here.
 *
 * @return the complete {@code imap://} or {@code imaps://} endpoint URI
 */
private String buildImapUri() {
    TedProcessorProperties.MailProperties cfg = properties.getMail();
    String endpointUri = (cfg.isSsl() ? "imaps://" : "imap://")
            + cfg.getHost()
            + ":" + cfg.getPort()
            + "?username=" + encodeUriComponent(cfg.getUsername())
            + "&password=" + encodeUriComponent(cfg.getPassword())
            + "&folderName=" + cfg.getFolderName()
            + "&delete=" + cfg.isDelete()
            // peek is the inverse of the "seen" setting:
            // peek=false marks fetched messages as SEEN, peek=true leaves them untouched
            + "&peek=" + !cfg.isSeen()
            + "&unseen=" + cfg.isUnseen()
            + "&delay=" + cfg.getDelay()
            + "&maxMessagesPerPoll=" + cfg.getMaxMessagesPerPoll()
            // fixed connection settings
            + "&connectionTimeout=30000"
            + "&fetchSize=-1" // fetch entire message
            + "&debugMode=false";
    log.info("IMAP URI configured (password hidden): {}://{}:{}?username={}&folderName={}",
            cfg.isSsl() ? "imaps" : "imap", cfg.getHost(), cfg.getPort(),
            cfg.getUsername(), cfg.getFolderName());
    return endpointUri;
}
/**
 * URL-encode a URI component.
 * <p>
 * Encoding is done with {@link java.net.URLEncoder} in UTF-8, i.e. form encoding:
 * a space becomes {@code +} and reserved characters become {@code %XX} escapes.
 *
 * @param value the raw value, may be {@code null}
 * @return the encoded value, or an empty string when {@code value} is {@code null}
 */
private String encodeUriComponent(String value) {
    if (value == null) {
        return "";
    }
    // The Charset overload declares no checked exception, so no fallback path is
    // needed that could hand back the unencoded value.
    return java.net.URLEncoder.encode(value, StandardCharsets.UTF_8);
}
/**
 * Recursively process message content to extract text, HTML, and attachments.
 * <p>
 * Parts with disposition {@code attachment} are always extracted as attachments. Parts
 * with disposition {@code inline} are extracted as attachments too, unless they are
 * {@code text/plain} or {@code text/html} without a filename - those are treated as
 * body content. Multipart content is walked recursively; remaining parts with a
 * filename are extracted as attachments.
 *
 * @param part        the message or body part to inspect
 * @param textContent receives plain-text body content
 * @param htmlContent receives HTML body content
 * @param attachments receives extracted attachments
 * @throws Exception if the part cannot be read
 */
private void processMessageContent(Part part, StringBuilder textContent,
        StringBuilder htmlContent, List<AttachmentInfo> attachments) throws Exception {
    // getContentType() may yield null; Locale.ROOT keeps the lower-casing independent of
    // the default locale (e.g. Turkish would map 'I' to a dotless 'i').
    String rawContentType = part.getContentType();
    String contentType = rawContentType != null
            ? rawContentType.toLowerCase(java.util.Locale.ROOT) : "";
    boolean plainText = contentType.contains("text/plain");
    boolean html = contentType.contains("text/html");
    String disposition = part.getDisposition();

    // Check if this is an attachment
    if (Part.ATTACHMENT.equalsIgnoreCase(disposition)) {
        extractAttachment(part, attachments);
        return;
    }
    if (Part.INLINE.equalsIgnoreCase(disposition)) {
        // Body text sent with an inline disposition and no filename belongs to the
        // message body, not to the attachment list.
        boolean inlineBodyText = (plainText || html) && part.getFileName() == null;
        if (!inlineBodyText) {
            extractAttachment(part, attachments);
            return;
        }
    }

    Object content = part.getContent();
    if (content instanceof Multipart multipart) {
        // Process each part of the multipart message
        for (int i = 0; i < multipart.getCount(); i++) {
            BodyPart bodyPart = multipart.getBodyPart(i);
            processMessageContent(bodyPart, textContent, htmlContent, attachments);
        }
    } else if (plainText) {
        // Plain text content
        textContent.append(content.toString());
    } else if (html) {
        // HTML content
        htmlContent.append(content.toString());
    } else if (part.getFileName() != null) {
        // Has filename - treat as attachment
        extractAttachment(part, attachments);
    }
}
/**
 * Extract attachment data from a message part.
 * <p>
 * The filename is MIME-decoded when possible and reduced to its last path segment,
 * because it is supplied by the sender of the mail and must not carry directory
 * components. Missing, blank or dot-only names become {@code unnamed_attachment}.
 *
 * @param part        the part holding the attachment
 * @param attachments list the extracted attachment is appended to
 * @throws Exception if the part's data cannot be read
 */
private void extractAttachment(Part part, List<AttachmentInfo> attachments) throws Exception {
    String filename = part.getFileName();
    if (filename != null) {
        // Decode filename if necessary (might be MIME-encoded); best effort only
        try {
            filename = jakarta.mail.internet.MimeUtility.decodeText(filename);
        } catch (Exception e) {
            log.debug("Could not decode filename: {}", filename);
        }
        // Drop any directory part, accepting both '/' and '\' as separators
        int lastSeparator = Math.max(filename.lastIndexOf('/'), filename.lastIndexOf('\\'));
        if (lastSeparator >= 0) {
            filename = filename.substring(lastSeparator + 1);
        }
    }
    if (filename == null || filename.isBlank() || filename.equals(".") || filename.equals("..")) {
        filename = "unnamed_attachment";
    }
    String contentType = part.getContentType();

    // Read attachment data
    byte[] data;
    try (InputStream is = part.getInputStream()) {
        data = is.readAllBytes();
    }
    AttachmentInfo info = new AttachmentInfo(filename, contentType, data, data.length);
    attachments.add(info);
    log.debug("Extracted attachment: '{}' ({} bytes, type={})", filename, data.length, contentType);
}
/**
 * Turns an HTML document into a single line of plain text using JSoup.
 * <p>
 * Script and style elements are dropped and every run of whitespace is collapsed to one
 * space. If parsing fails, tags are stripped with a regular expression instead.
 *
 * @param html the HTML source, may be {@code null}
 * @return the plain text, or an empty string for {@code null} or blank input
 */
private String convertHtmlToText(String html) {
    boolean nothingToConvert = html == null || html.isBlank();
    if (nothingToConvert) {
        return "";
    }
    try {
        org.jsoup.nodes.Document parsed = Jsoup.parse(html);
        // scripts and stylesheets carry no readable text
        parsed.select("script, style").remove();
        // collapse whitespace runs and cut leading/trailing blanks
        return parsed.text().replaceAll("\\s+", " ").trim();
    } catch (Exception e) {
        log.warn("Failed to convert HTML to text: {}", e.getMessage());
        // Fallback: replace every tag with a blank, then normalise whitespace
        String withoutTags = html.replaceAll("<[^>]+>", " ");
        return withoutTags.replaceAll("\\s+", " ").trim();
    }
}
/**
 * DTO for attachment information.
 * <p>
 * Accessors, {@code equals} and {@code hashCode} are generated by Lombok. The
 * {@code toString()} is written by hand so that the raw attachment bytes never end up
 * in log output.
 */
@lombok.Data
@lombok.AllArgsConstructor
public static class AttachmentInfo {
    /** Attachment file name. */
    private String filename;
    /** Content type as reported for the attachment. */
    private String contentType;
    /** Raw attachment bytes. */
    private byte[] data;
    /** Number of bytes, as passed to the constructor. */
    private int size;

    /**
     * Describes the attachment without its payload; a generated {@code toString()}
     * would print every byte of {@link #data}.
     */
    @Override
    public String toString() {
        return "MailRoute.AttachmentInfo(filename=" + filename
                + ", contentType=" + contentType
                + ", size=" + size + ")";
    }
}
}