ted package new import
parent
1aa599b587
commit
fbd249e56b
@ -0,0 +1,20 @@
|
|||||||
|
# NEW TED package import route
|
||||||
|
|
||||||
|
This patch adds a NEW-runtime TED package download path that:
|
||||||
|
|
||||||
|
- reuses the proven package sequencing rules
|
||||||
|
- stores package tracking in `TedDailyPackage`
|
||||||
|
- downloads the package tar.gz
|
||||||
|
- ingests it only through `DocumentIngestionGateway`
|
||||||
|
- never calls the legacy XML batch processing / vectorization flow
|
||||||
|
|
||||||
|
## Added classes
|
||||||
|
|
||||||
|
- `TedPackageSequenceService`
|
||||||
|
- `DefaultTedPackageSequenceService`
|
||||||
|
- `TedPackageDownloadNewProperties`
|
||||||
|
- `TedPackageDownloadNewRoute`
|
||||||
|
|
||||||
|
## Config
|
||||||
|
|
||||||
|
Use the `dip.ingestion.ted-download.*` block in `application-new.yml`.
|
||||||
@ -0,0 +1,189 @@
|
|||||||
|
package at.procon.dip.domain.ted.service;
|
||||||
|
|
||||||
|
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
||||||
|
import at.procon.dip.runtime.config.RuntimeMode;
|
||||||
|
import at.procon.dip.ingestion.config.TedPackageDownloadProperties;
|
||||||
|
import at.procon.ted.model.entity.TedDailyPackage;
|
||||||
|
import at.procon.ted.repository.TedDailyPackageRepository;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.time.Year;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.util.Optional;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* NEW-runtime implementation of TED package sequencing.
|
||||||
|
* <p>
|
||||||
|
* This reuses the same decision rules as the legacy TED package downloader:
|
||||||
|
* <ul>
|
||||||
|
* <li>current year forward crawling first</li>
|
||||||
|
* <li>gap filling by walking backward to package 1</li>
|
||||||
|
* <li>NOT_FOUND retry handling with current-year indefinite retry support</li>
|
||||||
|
* <li>previous-year grace period before a tail NOT_FOUND becomes final</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
@ConditionalOnRuntimeMode(RuntimeMode.NEW)
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class DefaultTedPackageSequenceService implements TedPackageSequenceService {
|
||||||
|
|
||||||
|
private final TedPackageDownloadProperties properties;
|
||||||
|
private final TedDailyPackageRepository packageRepository;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PackageInfo getNextPackageToDownload() {
|
||||||
|
int currentYear = Year.now().getValue();
|
||||||
|
|
||||||
|
log.debug("Determining next TED package to download for NEW runtime (current year: {})", currentYear);
|
||||||
|
|
||||||
|
// 1) Current year forward crawling first (newest data first)
|
||||||
|
PackageInfo nextInCurrentYear = getNextForwardPackage(currentYear);
|
||||||
|
if (nextInCurrentYear != null) {
|
||||||
|
log.info("Next TED package: {} (current year {} forward)", nextInCurrentYear.identifier(), currentYear);
|
||||||
|
return nextInCurrentYear;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2) Walk all years backward and fill gaps / continue unfinished years
|
||||||
|
for (int year = currentYear; year >= properties.getStartYear(); year--) {
|
||||||
|
PackageInfo gapFiller = getGapFillerPackage(year);
|
||||||
|
if (gapFiller != null) {
|
||||||
|
log.info("Next TED package: {} (filling gap in year {})", gapFiller.identifier(), year);
|
||||||
|
return gapFiller;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isYearComplete(year)) {
|
||||||
|
PackageInfo forwardPackage = getNextForwardPackage(year);
|
||||||
|
if (forwardPackage != null) {
|
||||||
|
log.info("Next TED package: {} (continuing year {})", forwardPackage.identifier(), year);
|
||||||
|
return forwardPackage;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.debug("TED package year {} is complete", year);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3) Open a new older year if possible
|
||||||
|
int oldestYear = getOldestYearWithData();
|
||||||
|
if (oldestYear > properties.getStartYear()) {
|
||||||
|
int previousYear = oldestYear - 1;
|
||||||
|
if (previousYear >= properties.getStartYear()) {
|
||||||
|
PackageInfo first = new PackageInfo(previousYear, 1);
|
||||||
|
log.info("Next TED package: {} (opening year {})", first.identifier(), previousYear);
|
||||||
|
return first;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("All TED package years from {} to {} appear complete - nothing to download",
|
||||||
|
properties.getStartYear(), currentYear);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private PackageInfo getNextForwardPackage(int year) {
|
||||||
|
Optional<TedDailyPackage> latest = packageRepository.findLatestByYear(year);
|
||||||
|
|
||||||
|
if (latest.isEmpty()) {
|
||||||
|
return new PackageInfo(year, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
TedDailyPackage latestPackage = latest.get();
|
||||||
|
|
||||||
|
if (latestPackage.getDownloadStatus() == TedDailyPackage.DownloadStatus.NOT_FOUND) {
|
||||||
|
if (shouldRetryNotFoundPackage(latestPackage)) {
|
||||||
|
return new PackageInfo(year, latestPackage.getSerialNumber());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isNotFoundRetryableForYear(latestPackage)) {
|
||||||
|
log.debug("Year {} still inside NOT_FOUND retry window for package {} until {}",
|
||||||
|
year, latestPackage.getPackageIdentifier(), calculateNextRetryAt(latestPackage));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug("Year {} finalized after grace period at tail package {}", year, latestPackage.getPackageIdentifier());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new PackageInfo(year, latestPackage.getSerialNumber() + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private PackageInfo getGapFillerPackage(int year) {
|
||||||
|
Optional<TedDailyPackage> first = packageRepository.findFirstByYear(year);
|
||||||
|
|
||||||
|
if (first.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
int minSerial = first.get().getSerialNumber();
|
||||||
|
if (minSerial <= 1) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new PackageInfo(year, minSerial - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isYearComplete(int year) {
|
||||||
|
Optional<TedDailyPackage> first = packageRepository.findFirstByYear(year);
|
||||||
|
Optional<TedDailyPackage> latest = packageRepository.findLatestByYear(year);
|
||||||
|
|
||||||
|
if (first.isEmpty() || latest.isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (first.get().getSerialNumber() != 1) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
TedDailyPackage latestPackage = latest.get();
|
||||||
|
return latestPackage.getDownloadStatus() == TedDailyPackage.DownloadStatus.NOT_FOUND
|
||||||
|
&& !isNotFoundRetryableForYear(latestPackage);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean shouldRetryNotFoundPackage(TedDailyPackage pkg) {
|
||||||
|
if (!isNotFoundRetryableForYear(pkg)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
OffsetDateTime nextRetryAt = calculateNextRetryAt(pkg);
|
||||||
|
return !nextRetryAt.isAfter(OffsetDateTime.now());
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isNotFoundRetryableForYear(TedDailyPackage pkg) {
|
||||||
|
int currentYear = Year.now().getValue();
|
||||||
|
int packageYear = pkg.getYear() != null ? pkg.getYear() : currentYear;
|
||||||
|
|
||||||
|
if (packageYear >= currentYear) {
|
||||||
|
return properties.isRetryCurrentYearNotFoundIndefinitely();
|
||||||
|
}
|
||||||
|
|
||||||
|
return OffsetDateTime.now().isBefore(getYearRetryGraceDeadline(packageYear));
|
||||||
|
}
|
||||||
|
|
||||||
|
private OffsetDateTime calculateNextRetryAt(TedDailyPackage pkg) {
|
||||||
|
OffsetDateTime lastAttemptAt = pkg.getUpdatedAt() != null
|
||||||
|
? pkg.getUpdatedAt()
|
||||||
|
: (pkg.getCreatedAt() != null ? pkg.getCreatedAt() : OffsetDateTime.now());
|
||||||
|
|
||||||
|
return lastAttemptAt.plus(Duration.ofMillis(properties.getNotFoundRetryInterval()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private OffsetDateTime getYearRetryGraceDeadline(int year) {
|
||||||
|
return LocalDate.of(year + 1, 1, 1)
|
||||||
|
.atStartOfDay()
|
||||||
|
.atOffset(ZoneOffset.UTC)
|
||||||
|
.plusDays(properties.getPreviousYearGracePeriodDays());
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getOldestYearWithData() {
|
||||||
|
int currentYear = Year.now().getValue();
|
||||||
|
for (int year = properties.getStartYear(); year <= currentYear; year++) {
|
||||||
|
if (packageRepository.findLatestByYear(year).isPresent()) {
|
||||||
|
return year;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return currentYear;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,25 @@
|
|||||||
|
package at.procon.dip.domain.ted.service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shared package sequencing contract used to determine the next TED daily package to download.
|
||||||
|
* <p>
|
||||||
|
* This service encapsulates the proven sequencing rules from the legacy download implementation
|
||||||
|
* so they can also be used by the NEW runtime without depending on the old route/service graph.
|
||||||
|
*/
|
||||||
|
public interface TedPackageSequenceService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the next package to download according to the current sequencing strategy,
|
||||||
|
* or {@code null} if nothing should be downloaded right now.
|
||||||
|
*/
|
||||||
|
PackageInfo getNextPackageToDownload();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple year/serial pair with TED package identifier helper.
|
||||||
|
*/
|
||||||
|
record PackageInfo(int year, int serialNumber) {
|
||||||
|
public String identifier() {
|
||||||
|
return "%04d%05d".formatted(year, serialNumber);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,328 @@
|
|||||||
|
package at.procon.dip.ingestion.camel;
|
||||||
|
|
||||||
|
import at.procon.dip.domain.document.SourceType;
|
||||||
|
import at.procon.dip.ingestion.config.TedPackageDownloadProperties;
|
||||||
|
import at.procon.dip.ingestion.service.DocumentIngestionGateway;
|
||||||
|
import at.procon.dip.ingestion.spi.IngestionResult;
|
||||||
|
import at.procon.dip.ingestion.spi.OriginalContentStoragePolicy;
|
||||||
|
import at.procon.dip.ingestion.spi.SourceDescriptor;
|
||||||
|
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
|
||||||
|
import at.procon.dip.runtime.config.RuntimeMode;
|
||||||
|
import at.procon.ted.model.entity.TedDailyPackage;
|
||||||
|
import at.procon.ted.repository.TedDailyPackageRepository;
|
||||||
|
import at.procon.dip.domain.ted.service.TedPackageSequenceService;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.camel.Exchange;
|
||||||
|
import org.apache.camel.LoggingLevel;
|
||||||
|
import org.apache.camel.builder.RouteBuilder;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* NEW-runtime TED daily package download route.
|
||||||
|
* <p>
|
||||||
|
* Reuses the proven package sequencing rules through {@link TedPackageSequenceService},
|
||||||
|
* but hands off processing only to the NEW ingestion gateway. No legacy XML batch persistence,
|
||||||
|
* no legacy vectorization route, no old semantic path.
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@ConditionalOnRuntimeMode(RuntimeMode.NEW)
|
||||||
|
@ConditionalOnProperty(name = "dip.ingestion.ted-download.enabled", havingValue = "true")
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class TedPackageDownloadRoute extends RouteBuilder {
|
||||||
|
|
||||||
|
private static final String ROUTE_ID_SCHEDULER = "ted-package-new-scheduler";
|
||||||
|
private static final String ROUTE_ID_DOWNLOADER = "ted-package-new-downloader";
|
||||||
|
private static final String ROUTE_ID_ERROR = "ted-package-new-error-handler";
|
||||||
|
|
||||||
|
private final TedPackageDownloadProperties properties;
|
||||||
|
private final TedDailyPackageRepository packageRepository;
|
||||||
|
private final TedPackageSequenceService sequenceService;
|
||||||
|
private final DocumentIngestionGateway documentIngestionGateway;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure() {
|
||||||
|
errorHandler(deadLetterChannel("direct:ted-package-new-error")
|
||||||
|
.maximumRedeliveries(3)
|
||||||
|
.redeliveryDelay(10_000)
|
||||||
|
.retryAttemptedLogLevel(LoggingLevel.WARN)
|
||||||
|
.logStackTrace(true));
|
||||||
|
|
||||||
|
from("direct:ted-package-new-error")
|
||||||
|
.routeId(ROUTE_ID_ERROR)
|
||||||
|
.process(this::handleError);
|
||||||
|
|
||||||
|
from("timer:ted-package-new-scheduler?period={{dip.ingestion.ted-download.poll-interval:3600000}}&delay=0")
|
||||||
|
.routeId(ROUTE_ID_SCHEDULER)
|
||||||
|
.process(this::checkRunningPackages)
|
||||||
|
.choice()
|
||||||
|
.when(header("tooManyRunning").isEqualTo(true))
|
||||||
|
.log(LoggingLevel.INFO, "Skipping NEW TED package download - already ${header.runningCount} packages in progress")
|
||||||
|
.otherwise()
|
||||||
|
.process(this::determineNextPackage)
|
||||||
|
.choice()
|
||||||
|
.when(header("packageId").isNotNull())
|
||||||
|
.to("direct:download-ted-package-new")
|
||||||
|
.otherwise()
|
||||||
|
.log(LoggingLevel.INFO, "No NEW TED package to download right now")
|
||||||
|
.end()
|
||||||
|
.end();
|
||||||
|
|
||||||
|
from("direct:download-ted-package-new")
|
||||||
|
.routeId(ROUTE_ID_DOWNLOADER)
|
||||||
|
.log(LoggingLevel.INFO, "NEW TED package download started: ${header.packageId}")
|
||||||
|
.setHeader("downloadStartTime", constant(System.currentTimeMillis()))
|
||||||
|
.process(this::createPackageRecord)
|
||||||
|
.delay(simple("{{dip.ingestion.ted-download.delay-between-downloads:5000}}"))
|
||||||
|
.setHeader(Exchange.HTTP_METHOD, constant("GET"))
|
||||||
|
.setHeader("CamelHttpConnectionClose", constant(true))
|
||||||
|
.toD("${header.downloadUrl}?bridgeEndpoint=true&throwExceptionOnFailure=false&socketTimeout={{dip.ingestion.ted-download.download-timeout:300000}}")
|
||||||
|
.choice()
|
||||||
|
.when(header(Exchange.HTTP_RESPONSE_CODE).isEqualTo(200))
|
||||||
|
.process(this::calculateHash)
|
||||||
|
.process(this::checkDuplicateByHash)
|
||||||
|
.choice()
|
||||||
|
.when(header("isDuplicate").isEqualTo(true))
|
||||||
|
.process(this::markDuplicate)
|
||||||
|
.otherwise()
|
||||||
|
.process(this::saveDownloadedPackage)
|
||||||
|
.process(this::ingestThroughGateway)
|
||||||
|
.process(this::markCompleted)
|
||||||
|
.endChoice()
|
||||||
|
.when(header(Exchange.HTTP_RESPONSE_CODE).isEqualTo(404))
|
||||||
|
.process(this::markNotFound)
|
||||||
|
.otherwise()
|
||||||
|
.process(this::markFailed)
|
||||||
|
.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkRunningPackages(Exchange exchange) {
|
||||||
|
long downloadingCount = packageRepository.findByDownloadStatus(TedDailyPackage.DownloadStatus.DOWNLOADING).size();
|
||||||
|
long processingCount = packageRepository.findByDownloadStatus(TedDailyPackage.DownloadStatus.PROCESSING).size();
|
||||||
|
long runningCount = downloadingCount + processingCount;
|
||||||
|
|
||||||
|
exchange.getIn().setHeader("runningCount", runningCount);
|
||||||
|
exchange.getIn().setHeader("tooManyRunning", runningCount >= properties.getMaxRunningPackages());
|
||||||
|
|
||||||
|
if (runningCount > 0) {
|
||||||
|
log.info("Currently {} TED packages in progress in NEW runtime ({} downloading, {} processing)",
|
||||||
|
runningCount, downloadingCount, processingCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void determineNextPackage(Exchange exchange) {
|
||||||
|
List<TedDailyPackage> pendingPackages = packageRepository.findByDownloadStatus(TedDailyPackage.DownloadStatus.PENDING);
|
||||||
|
|
||||||
|
if (!pendingPackages.isEmpty()) {
|
||||||
|
TedDailyPackage pkg = pendingPackages.get(0);
|
||||||
|
log.info("Retrying PENDING TED package in NEW runtime: {}", pkg.getPackageIdentifier());
|
||||||
|
setPackageHeaders(exchange, pkg.getYear(), pkg.getSerialNumber());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TedPackageSequenceService.PackageInfo packageInfo = sequenceService.getNextPackageToDownload();
|
||||||
|
if (packageInfo == null) {
|
||||||
|
exchange.getIn().setHeader("packageId", null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
setPackageHeaders(exchange, packageInfo.year(), packageInfo.serialNumber());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setPackageHeaders(Exchange exchange, int year, int serialNumber) {
|
||||||
|
String packageId = "%04d%05d".formatted(year, serialNumber);
|
||||||
|
String downloadUrl = properties.getBaseUrl() + packageId;
|
||||||
|
|
||||||
|
exchange.getIn().setHeader("packageId", packageId);
|
||||||
|
exchange.getIn().setHeader("year", year);
|
||||||
|
exchange.getIn().setHeader("serialNumber", serialNumber);
|
||||||
|
exchange.getIn().setHeader("downloadUrl", downloadUrl);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createPackageRecord(Exchange exchange) {
|
||||||
|
String packageId = exchange.getIn().getHeader("packageId", String.class);
|
||||||
|
Integer year = exchange.getIn().getHeader("year", Integer.class);
|
||||||
|
Integer serialNumber = exchange.getIn().getHeader("serialNumber", Integer.class);
|
||||||
|
String downloadUrl = exchange.getIn().getHeader("downloadUrl", String.class);
|
||||||
|
|
||||||
|
if (packageRepository.existsByPackageIdentifier(packageId)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TedDailyPackage pkg = TedDailyPackage.builder()
|
||||||
|
.packageIdentifier(packageId)
|
||||||
|
.year(year)
|
||||||
|
.serialNumber(serialNumber)
|
||||||
|
.downloadUrl(downloadUrl)
|
||||||
|
.downloadStatus(TedDailyPackage.DownloadStatus.DOWNLOADING)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
packageRepository.save(pkg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void calculateHash(Exchange exchange) throws Exception {
|
||||||
|
byte[] body = exchange.getIn().getBody(byte[].class);
|
||||||
|
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||||
|
byte[] hashBytes = digest.digest(body);
|
||||||
|
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (byte b : hashBytes) {
|
||||||
|
sb.append(String.format("%02x", b));
|
||||||
|
}
|
||||||
|
|
||||||
|
exchange.getIn().setHeader("fileHash", sb.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkDuplicateByHash(Exchange exchange) {
|
||||||
|
String hash = exchange.getIn().getHeader("fileHash", String.class);
|
||||||
|
|
||||||
|
Optional<TedDailyPackage> duplicate = packageRepository.findAll().stream()
|
||||||
|
.filter(p -> hash.equals(p.getFileHash()))
|
||||||
|
.findFirst();
|
||||||
|
|
||||||
|
exchange.getIn().setHeader("isDuplicate", duplicate.isPresent());
|
||||||
|
duplicate.ifPresent(pkg -> exchange.getIn().setHeader("duplicateOf", pkg.getPackageIdentifier()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void saveDownloadedPackage(Exchange exchange) throws Exception {
|
||||||
|
String packageId = exchange.getIn().getHeader("packageId", String.class);
|
||||||
|
String hash = exchange.getIn().getHeader("fileHash", String.class);
|
||||||
|
byte[] body = exchange.getIn().getBody(byte[].class);
|
||||||
|
|
||||||
|
Path downloadDir = Paths.get(properties.getDownloadDirectory());
|
||||||
|
Files.createDirectories(downloadDir);
|
||||||
|
Path downloadPath = downloadDir.resolve(packageId + ".tar.gz");
|
||||||
|
Files.write(downloadPath, body);
|
||||||
|
|
||||||
|
long downloadDuration = System.currentTimeMillis() -
|
||||||
|
exchange.getIn().getHeader("downloadStartTime", Long.class);
|
||||||
|
|
||||||
|
packageRepository.findByPackageIdentifier(packageId).ifPresent(pkg -> {
|
||||||
|
pkg.setFileHash(hash);
|
||||||
|
pkg.setDownloadStatus(TedDailyPackage.DownloadStatus.DOWNLOADED);
|
||||||
|
pkg.setDownloadedAt(OffsetDateTime.now());
|
||||||
|
pkg.setDownloadDurationMs(downloadDuration);
|
||||||
|
packageRepository.save(pkg);
|
||||||
|
});
|
||||||
|
|
||||||
|
exchange.getIn().setHeader("downloadPath", downloadPath.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ingestThroughGateway(Exchange exchange) {
|
||||||
|
String packageId = exchange.getIn().getHeader("packageId", String.class);
|
||||||
|
String downloadPath = exchange.getIn().getHeader("downloadPath", String.class);
|
||||||
|
|
||||||
|
packageRepository.findByPackageIdentifier(packageId).ifPresent(pkg -> {
|
||||||
|
pkg.setDownloadStatus(TedDailyPackage.DownloadStatus.PROCESSING);
|
||||||
|
packageRepository.save(pkg);
|
||||||
|
});
|
||||||
|
|
||||||
|
IngestionResult ingestionResult = documentIngestionGateway.ingest(new SourceDescriptor(
|
||||||
|
null,
|
||||||
|
SourceType.TED_PACKAGE,
|
||||||
|
packageId,
|
||||||
|
downloadPath,
|
||||||
|
packageId + ".tar.gz",
|
||||||
|
"application/gzip",
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
OffsetDateTime.now(),
|
||||||
|
OriginalContentStoragePolicy.DEFAULT,
|
||||||
|
Map.of(
|
||||||
|
"packageId", packageId,
|
||||||
|
"title", packageId + ".tar.gz"
|
||||||
|
)
|
||||||
|
));
|
||||||
|
|
||||||
|
int importedChildCount = Math.max(0, ingestionResult.documents().size() - 1);
|
||||||
|
exchange.getIn().setHeader("gatewayImportedChildCount", importedChildCount);
|
||||||
|
exchange.getIn().setHeader("gatewayImportWarnings", ingestionResult.warnings().size());
|
||||||
|
|
||||||
|
packageRepository.findByPackageIdentifier(packageId).ifPresent(pkg -> {
|
||||||
|
pkg.setXmlFileCount(importedChildCount);
|
||||||
|
pkg.setProcessedCount(importedChildCount);
|
||||||
|
pkg.setFailedCount(0);
|
||||||
|
packageRepository.save(pkg);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void markCompleted(Exchange exchange) throws Exception {
|
||||||
|
String packageId = exchange.getIn().getHeader("packageId", String.class);
|
||||||
|
String downloadPath = exchange.getIn().getHeader("downloadPath", String.class);
|
||||||
|
|
||||||
|
packageRepository.findByPackageIdentifier(packageId).ifPresent(pkg -> {
|
||||||
|
pkg.setDownloadStatus(TedDailyPackage.DownloadStatus.COMPLETED);
|
||||||
|
pkg.setProcessedAt(OffsetDateTime.now());
|
||||||
|
if (pkg.getDownloadedAt() != null) {
|
||||||
|
long processingDuration = Math.max(0L,
|
||||||
|
java.time.Duration.between(pkg.getDownloadedAt(), OffsetDateTime.now()).toMillis());
|
||||||
|
pkg.setProcessingDurationMs(processingDuration);
|
||||||
|
}
|
||||||
|
packageRepository.save(pkg);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (properties.isDeleteAfterIngestion() && downloadPath != null) {
|
||||||
|
Files.deleteIfExists(Path.of(downloadPath));
|
||||||
|
}
|
||||||
|
|
||||||
|
packageRepository.findByPackageIdentifier(packageId).ifPresent(pkg -> {
|
||||||
|
long totalDuration = (pkg.getDownloadDurationMs() != null ? pkg.getDownloadDurationMs() : 0L)
|
||||||
|
+ (pkg.getProcessingDurationMs() != null ? pkg.getProcessingDurationMs() : 0L);
|
||||||
|
log.info("NEW TED package {} completed: xmlCount={}, processed={}, failed={}, totalDuration={}ms",
|
||||||
|
packageId, pkg.getXmlFileCount(), pkg.getProcessedCount(), pkg.getFailedCount(), totalDuration);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void markNotFound(Exchange exchange) {
|
||||||
|
String packageId = exchange.getIn().getHeader("packageId", String.class);
|
||||||
|
packageRepository.findByPackageIdentifier(packageId).ifPresent(pkg -> {
|
||||||
|
pkg.setDownloadStatus(TedDailyPackage.DownloadStatus.NOT_FOUND);
|
||||||
|
pkg.setErrorMessage("Package not found (404)");
|
||||||
|
packageRepository.save(pkg);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void markFailed(Exchange exchange) {
|
||||||
|
String packageId = exchange.getIn().getHeader("packageId", String.class);
|
||||||
|
Integer httpCode = exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
|
||||||
|
packageRepository.findByPackageIdentifier(packageId).ifPresent(pkg -> {
|
||||||
|
pkg.setDownloadStatus(TedDailyPackage.DownloadStatus.FAILED);
|
||||||
|
pkg.setErrorMessage("HTTP " + httpCode);
|
||||||
|
packageRepository.save(pkg);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void markDuplicate(Exchange exchange) {
|
||||||
|
String packageId = exchange.getIn().getHeader("packageId", String.class);
|
||||||
|
String duplicateOf = exchange.getIn().getHeader("duplicateOf", String.class);
|
||||||
|
packageRepository.findByPackageIdentifier(packageId).ifPresent(pkg -> {
|
||||||
|
pkg.setDownloadStatus(TedDailyPackage.DownloadStatus.COMPLETED);
|
||||||
|
pkg.setErrorMessage("Duplicate of " + duplicateOf);
|
||||||
|
pkg.setProcessedAt(OffsetDateTime.now());
|
||||||
|
packageRepository.save(pkg);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleError(Exchange exchange) {
|
||||||
|
Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
|
||||||
|
String packageId = exchange.getIn().getHeader("packageId", String.class);
|
||||||
|
|
||||||
|
if (packageId != null) {
|
||||||
|
packageRepository.findByPackageIdentifier(packageId).ifPresent(pkg -> {
|
||||||
|
pkg.setDownloadStatus(TedDailyPackage.DownloadStatus.FAILED);
|
||||||
|
pkg.setErrorMessage(exception != null ? exception.getMessage() : "Unknown route error");
|
||||||
|
packageRepository.save(pkg);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,52 @@
|
|||||||
|
package at.procon.dip.ingestion.config;
|
||||||
|
|
||||||
|
import jakarta.validation.constraints.Min;
|
||||||
|
import jakarta.validation.constraints.NotBlank;
|
||||||
|
import jakarta.validation.constraints.Positive;
|
||||||
|
import lombok.Data;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* NEW-runtime TED package download configuration.
|
||||||
|
* <p>
|
||||||
|
* This is intentionally separate from the legacy {@code ted.download.*} tree.
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
@ConfigurationProperties(prefix = "dip.ingestion.ted-download")
|
||||||
|
@Data
|
||||||
|
public class TedPackageDownloadProperties {
|
||||||
|
|
||||||
|
private boolean enabled = false;
|
||||||
|
|
||||||
|
@NotBlank
|
||||||
|
private String baseUrl = "https://ted.europa.eu/packages/daily/";
|
||||||
|
|
||||||
|
@NotBlank
|
||||||
|
private String downloadDirectory = "/ted.europe/downloads-new";
|
||||||
|
|
||||||
|
@Positive
|
||||||
|
private int startYear = 2015;
|
||||||
|
|
||||||
|
@Positive
|
||||||
|
private long pollInterval = 3_600_000L;
|
||||||
|
|
||||||
|
@Positive
|
||||||
|
private long notFoundRetryInterval = 21_600_000L;
|
||||||
|
|
||||||
|
@Min(0)
|
||||||
|
private int previousYearGracePeriodDays = 30;
|
||||||
|
|
||||||
|
private boolean retryCurrentYearNotFoundIndefinitely = true;
|
||||||
|
|
||||||
|
@Positive
|
||||||
|
private long downloadTimeout = 300_000L;
|
||||||
|
|
||||||
|
@Positive
|
||||||
|
private int maxRunningPackages = 2;
|
||||||
|
|
||||||
|
@Positive
|
||||||
|
private long delayBetweenDownloads = 5_000L;
|
||||||
|
|
||||||
|
private boolean deleteAfterIngestion = true;
|
||||||
|
}
|
||||||
@ -1,16 +0,0 @@
|
|||||||
package at.procon.ted.config;
|
|
||||||
|
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Patch A scaffold for the legacy runtime configuration tree.
|
|
||||||
*
|
|
||||||
* The legacy runtime still uses {@link TedProcessorProperties} today. This class is
|
|
||||||
* introduced so the old configuration can be moved gradually from `ted.*` to
|
|
||||||
* `legacy.ted.*` without blocking the runtime split.
|
|
||||||
*/
|
|
||||||
@Configuration
|
|
||||||
@ConfigurationProperties(prefix = "legacy.ted")
|
|
||||||
public class LegacyTedProperties extends TedProcessorProperties {
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue