You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DIP/src/main/java/at/procon/ted/camel/TedPackageDownloadRoute.java

159 lines
7.5 KiB
Java

package at.procon.ted.camel;
import at.procon.ted.config.TedProcessorProperties;
import at.procon.ted.service.TedPackageDownloadService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Apache Camel Route for automatic download of TED Daily Packages.
*
* Features:
* - Scheduled Download (hourly configurable)
* - Idempotent Downloads via Hash
* - Rate Limiting
* - Integration with XML processing
* - Error Handling
*
* @author Martin.Schweitzer@procon.co.at and claude.ai
*/
@Component
@ConditionalOnProperty(name = "ted.download.use-service-based", havingValue = "true")
@RequiredArgsConstructor
@Slf4j
public class TedPackageDownloadRoute extends RouteBuilder {
private static final String ROUTE_ID_SCHEDULER = "ted-package-download-scheduler";
private static final String ROUTE_ID_DOWNLOADER = "ted-package-downloader";
private static final String ROUTE_ID_XML_PROCESSOR = "ted-package-xml-processor";
private final TedProcessorProperties properties;
private final TedPackageDownloadService downloadService;
/**
* Creates thread pool for parallel XML processing.
* Maximum 3 parallel processes for DB loading (lower priority due to vectorization).
*/
private java.util.concurrent.ExecutorService executorService() {
return java.util.concurrent.Executors.newFixedThreadPool(
3,
r -> {
Thread thread = new Thread(r);
thread.setName("ted-xml-processor-" + thread.getId());
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY - 1); // Lower priority than vectorization
return thread;
}
);
}
@Override
public void configure() throws Exception {
// Error Handler
errorHandler(defaultErrorHandler()
.maximumRedeliveries(3)
.redeliveryDelay(5000)
.retryAttemptedLogLevel(LoggingLevel.WARN));
// Scheduler: Periodic check for new packages
from("timer:package-download-scheduler?period={{ted.download.poll-interval:3600000}}")
.routeId(ROUTE_ID_SCHEDULER)
.autoStartup("{{ted.download.enabled:false}}")
.log(LoggingLevel.DEBUG, "Checking for new TED packages to download...")
.bean(downloadService, "getNextPackageToDownload")
.choice()
.when(body().isNull())
.log(LoggingLevel.DEBUG, "No more packages to download")
.otherwise()
.to("direct:download-package")
.end();
// Download Route
from("direct:download-package")
.routeId(ROUTE_ID_DOWNLOADER)
.log(LoggingLevel.INFO, "Processing package: ${body.identifier}")
.process(exchange -> {
TedPackageDownloadService.PackageInfo packageInfo =
exchange.getIn().getBody(TedPackageDownloadService.PackageInfo.class);
// Rate Limiting
long delay = properties.getDownload().getDelayBetweenDownloads();
if (delay > 0) {
Thread.sleep(delay);
}
// Download Package
TedPackageDownloadService.DownloadResult result =
downloadService.downloadPackage(packageInfo.year(), packageInfo.serialNumber());
exchange.setProperty("downloadResult", result);
exchange.getIn().setBody(result);
})
.choice()
.when(simple("${exchangeProperty.downloadResult.success} == true"))
.to("direct:process-package-xml-files")
.when(simple("${exchangeProperty.downloadResult.status.name} == 'NOT_FOUND'"))
.log(LoggingLevel.DEBUG, "Package not found (404): ${body.packageEntity.packageIdentifier}")
.when(simple("${exchangeProperty.downloadResult.status.name} == 'ALREADY_EXISTS'"))
.log(LoggingLevel.DEBUG, "Package already exists: ${body.packageEntity.packageIdentifier}")
.when(simple("${exchangeProperty.downloadResult.status.name} == 'DUPLICATE'"))
.log(LoggingLevel.WARN, "Duplicate package detected: ${body.packageEntity.packageIdentifier}")
.otherwise()
.log(LoggingLevel.ERROR, "Failed to download package: ${exchangeProperty.downloadResult.error.message}")
.end();
// XML Files Processing Route
from("direct:process-package-xml-files")
.routeId(ROUTE_ID_XML_PROCESSOR)
.setProperty("processedCount", constant(0))
.setProperty("failedCount", constant(0))
.setProperty("packageIdentifier", simple("${body.packageEntity.packageIdentifier}"))
.setProperty("xmlFileCount", simple("${body.xmlFiles.size}"))
.split(simple("${body.xmlFiles}"))
.parallelProcessing()
.executorService(executorService())
.stopOnException(false) // Continue even if individual documents fail
.shareUnitOfWork()
.doTry()
.process(exchange -> {
Path xmlFile = exchange.getIn().getBody(Path.class);
// Set headers for existing XML processing route
exchange.getIn().setHeader(Exchange.FILE_NAME, xmlFile.getFileName().toString());
exchange.getIn().setHeader(Exchange.FILE_PATH, xmlFile.toString());
exchange.getIn().setHeader(Exchange.FILE_LENGTH, Files.size(xmlFile));
// Read XML content
byte[] content = Files.readAllBytes(xmlFile);
exchange.getIn().setBody(content);
})
// Forward to existing processing route
.to("direct:process-document")
.process(exchange -> {
// Increment success counter
Integer count = exchange.getProperty("processedCount", Integer.class);
exchange.setProperty("processedCount", count + 1);
})
.doCatch(Exception.class)
.log(LoggingLevel.WARN, "Failed to process ${header.CamelFileName}: ${exception.message}")
.process(exchange -> {
// Increment error counter
Integer count = exchange.getProperty("failedCount", Integer.class);
exchange.setProperty("failedCount", count + 1);
})
.end()
.end()
.log(LoggingLevel.INFO, "Package ${exchangeProperty.packageIdentifier} completed: ${exchangeProperty.xmlFileCount} XML files, ${exchangeProperty.processedCount} processed, ${exchangeProperty.failedCount} failed");
}
}