// You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
// DIP/src/main/java/at/procon/ted/repair/TedPackageRepairService.java
//
// 447 lines
// 19 KiB
// Java

package at.procon.ted.repair;
import at.procon.dip.runtime.condition.ConditionalOnRuntimeMode;
import at.procon.dip.runtime.config.RuntimeMode;
import at.procon.ted.config.TedProcessorProperties;
import at.procon.ted.model.entity.TedDailyPackage;
import at.procon.ted.repository.TedDailyPackageRepository;
import at.procon.ted.service.BatchDocumentProcessingService;
import at.procon.ted.service.TedPackageDownloadService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
 * Startup tool for repairing / re-importing incomplete legacy TED daily packages.
 *
 * <p>Strategy:
 * <ul>
 *   <li>Identify incomplete package rows from {@code ted.ted_daily_package}.</li>
 *   <li>Optionally include missing sequence numbers inside a configured package range.</li>
 *   <li>Reuse the existing batch XML processing so already-imported XML documents are skipped by
 *       hash, while missing documents are inserted during the repair run.</li>
 * </ul>
 */
@Service
@ConditionalOnRuntimeMode(RuntimeMode.LEGACY)
@RequiredArgsConstructor
@Slf4j
public class TedPackageRepairService {

    /** A package identifier is a 4-digit year followed by a 5-digit serial number (9 digits total). */
    private static final Pattern PACKAGE_IDENTIFIER_PATTERN = Pattern.compile("\\d{9}");

    /** Number of extracted XML files passed to {@link BatchDocumentProcessingService} per call. */
    private static final int PROCESSING_CHUNK_SIZE = 25;

    private final TedProcessorProperties properties;
    private final TedDailyPackageRepository packageRepository;
    private final TedPackageDownloadService downloadService;
    private final BatchDocumentProcessingService batchProcessingService;

    /**
     * Selects repair candidates according to the configured repair properties and, unless dry-run
     * mode is enabled, repairs each of them in order.
     *
     * <p>A failure of one candidate is logged and counted as failed; it does not abort the run.
     *
     * @return summary counts plus the identifiers of the packages whose repair returned normally
     *         (in dry-run mode: the identifiers of all selected candidates)
     * @throws IllegalArgumentException if the repair configuration is invalid (see
     *         {@link #resolveCandidates})
     */
    public RepairSummary repairConfiguredPackages() {
        TedProcessorProperties.RepairProperties repairProperties = properties.getRepair();
        List<RepairCandidate> candidates = resolveCandidates(repairProperties);
        if (candidates.isEmpty()) {
            log.info("TED package repair found no matching incomplete packages");
            return new RepairSummary(0, 0, 0, 0, List.of());
        }
        log.info("TED package repair selected {} package candidates (dryRun={})", candidates.size(), repairProperties.isDryRun());
        candidates.forEach(candidate -> log.info("Repair candidate: {} [{}]", candidate.packageIdentifier(), candidate.reason()));
        if (repairProperties.isDryRun()) {
            return new RepairSummary(candidates.size(), 0, 0, 0,
                    candidates.stream().map(RepairCandidate::packageIdentifier).toList());
        }
        int succeeded = 0;
        int failed = 0;
        int notFound = 0;
        List<String> processed = new ArrayList<>();
        for (RepairCandidate candidate : candidates) {
            try {
                RepairExecutionResult result = repairCandidate(candidate, repairProperties);
                processed.add(candidate.packageIdentifier());
                switch (result.outcome()) {
                    case COMPLETED -> succeeded++;
                    case NOT_FOUND -> notFound++;
                    case FAILED -> failed++;
                }
            } catch (Exception e) {
                failed++;
                log.error("TED package repair failed for {}: {}", candidate.packageIdentifier(), e.getMessage(), e);
                // repairCandidate already records the failure on the row it worked on. Repeating it
                // here for a pre-existing row keeps the failure recorded even if repairCandidate's own
                // update were rolled back by a transaction. This call never throws.
                markExistingPackageFailure(candidate.existingPackage(), "Repair run failed: " + e.getMessage());
            }
        }
        log.info("TED package repair finished: selected={}, succeeded={}, failed={}, notFound={}",
                candidates.size(), succeeded, failed, notFound);
        return new RepairSummary(candidates.size(), succeeded, failed, notFound, processed);
    }

    /**
     * Determines which packages should be repaired.
     *
     * <ul>
     *   <li>If explicit package identifiers are configured, exactly those are returned, in the
     *       configured order, deduplicated, whether or not they look incomplete.</li>
     *   <li>Otherwise, if no package rows exist, an empty list is returned.</li>
     *   <li>Otherwise, if neither gap inspection nor a from/to bound is configured, every existing
     *       package for which {@link #isIncomplete} is {@code true} is returned, in ascending
     *       (year, serialNumber) order.</li>
     *   <li>Otherwise the identifier range {@code from..to} is walked. The bounds default to the
     *       first and last existing package. Incomplete existing packages are returned. When
     *       {@code includeMissingSequenceGaps} is set, identifiers without any row are also returned
     *       as missing candidates. In years other than the last one, the walk ends at the highest
     *       serial number observed for that year. Years without any row are therefore skipped.</li>
     * </ul>
     * The result is truncated to {@code maxPackages}.
     *
     * @throws IllegalArgumentException if an identifier is malformed, the range is inverted, or
     *         {@code maxPackages} is negative
     */
    List<RepairCandidate> resolveCandidates(TedProcessorProperties.RepairProperties repairProperties) {
        List<TedDailyPackage> existingPackages = packageRepository.findAll(Sort.by(Sort.Direction.ASC, "year", "serialNumber"));
        // If the table ever holds duplicate identifiers, the first row (in sort order) wins.
        Map<String, TedDailyPackage> existingByIdentifier = existingPackages.stream()
                .collect(Collectors.toMap(TedDailyPackage::getPackageIdentifier, pkg -> pkg, (left, right) -> left, LinkedHashMap::new));
        if (!repairProperties.getPackageIdentifiers().isEmpty()) {
            return resolveExplicitCandidates(repairProperties.getPackageIdentifiers(), existingByIdentifier, repairProperties.getMaxPackages());
        }
        if (existingPackages.isEmpty()) {
            return List.of();
        }
        List<RepairCandidate> candidates = new ArrayList<>();
        Set<String> seen = new LinkedHashSet<>();
        boolean inspectSequenceRange = repairProperties.isIncludeMissingSequenceGaps()
                || hasText(repairProperties.getFromPackageIdentifier())
                || hasText(repairProperties.getToPackageIdentifier());
        if (!inspectSequenceRange) {
            for (TedDailyPackage pkg : existingPackages) {
                if (isIncomplete(pkg) && seen.add(pkg.getPackageIdentifier())) {
                    candidates.add(RepairCandidate.existing(pkg, repairReasonFor(pkg)));
                }
            }
            return limitCandidates(candidates, repairProperties.getMaxPackages());
        }
        PackageCoordinates first = parseIdentifier(
                hasText(repairProperties.getFromPackageIdentifier())
                        ? repairProperties.getFromPackageIdentifier()
                        : existingPackages.getFirst().getPackageIdentifier());
        PackageCoordinates last = parseIdentifier(
                hasText(repairProperties.getToPackageIdentifier())
                        ? repairProperties.getToPackageIdentifier()
                        : existingPackages.getLast().getPackageIdentifier());
        if (first.compareTo(last) > 0) {
            throw new IllegalArgumentException("Repair package range is invalid: from > to");
        }
        // Highest serial number seen per year; used as the walk's end point for all but the last year.
        Map<Integer, Integer> observedMaxByYear = existingPackages.stream()
                .collect(Collectors.groupingBy(TedDailyPackage::getYear,
                        LinkedHashMap::new,
                        Collectors.collectingAndThen(
                                Collectors.maxBy(Comparator.comparingInt(TedDailyPackage::getSerialNumber)),
                                optional -> optional.map(TedDailyPackage::getSerialNumber).orElse(0))));
        for (int year = first.year(); year <= last.year(); year++) {
            int startSerial = year == first.year() ? first.serialNumber() : 1;
            int defaultEndSerial = observedMaxByYear.getOrDefault(year, 0);
            int endSerial = year == last.year() ? last.serialNumber() : defaultEndSerial;
            if (endSerial < startSerial || endSerial <= 0) {
                continue;
            }
            for (int serial = startSerial; serial <= endSerial; serial++) {
                String packageIdentifier = formatPackageIdentifier(year, serial);
                TedDailyPackage existingPackage = existingByIdentifier.get(packageIdentifier);
                if (existingPackage != null) {
                    if (isIncomplete(existingPackage) && seen.add(packageIdentifier)) {
                        candidates.add(RepairCandidate.existing(existingPackage, repairReasonFor(existingPackage)));
                    }
                } else if (repairProperties.isIncludeMissingSequenceGaps() && seen.add(packageIdentifier)) {
                    candidates.add(RepairCandidate.missing(year, serial, packageIdentifier, "MISSING_SEQUENCE_GAP"));
                }
            }
        }
        return limitCandidates(candidates, repairProperties.getMaxPackages());
    }

    /**
     * Builds candidates for explicitly configured identifiers. Blank entries are ignored and
     * duplicates (after trimming) are dropped.
     *
     * <p>An explicitly requested package that is already complete is still repaired. It is labelled
     * {@code EXPLICIT_PACKAGE} rather than with a misleading incompleteness reason.
     *
     * @throws IllegalArgumentException if an identifier is not 9 digits, or {@code maxPackages} is
     *         negative
     */
    private List<RepairCandidate> resolveExplicitCandidates(Collection<String> packageIdentifiers,
                                                            Map<String, TedDailyPackage> existingByIdentifier,
                                                            int maxPackages) {
        List<RepairCandidate> candidates = new ArrayList<>();
        Set<String> seen = new LinkedHashSet<>();
        for (String rawIdentifier : packageIdentifiers) {
            if (!hasText(rawIdentifier)) {
                continue;
            }
            String normalized = rawIdentifier.trim();
            if (!seen.add(normalized)) {
                continue;
            }
            PackageCoordinates coordinates = parseIdentifier(normalized);
            TedDailyPackage existing = existingByIdentifier.get(normalized);
            if (existing != null) {
                boolean alreadyComplete = existing.getDownloadStatus() == TedDailyPackage.DownloadStatus.COMPLETED
                        && !isIncomplete(existing);
                String reason = alreadyComplete ? "EXPLICIT_PACKAGE" : repairReasonFor(existing);
                candidates.add(RepairCandidate.existing(existing, reason));
            } else {
                candidates.add(RepairCandidate.missing(coordinates.year(), coordinates.serialNumber(), normalized, "EXPLICIT_PACKAGE"));
            }
        }
        return limitCandidates(candidates, maxPackages);
    }

    /**
     * Truncates {@code candidates} to at most {@code maxPackages} entries. A limit of 0 selects
     * nothing.
     *
     * @throws IllegalArgumentException if {@code maxPackages} is negative
     */
    private List<RepairCandidate> limitCandidates(List<RepairCandidate> candidates, int maxPackages) {
        if (maxPackages < 0) {
            throw new IllegalArgumentException("Repair maxPackages must not be negative: " + maxPackages);
        }
        if (candidates.size() <= maxPackages) {
            return candidates;
        }
        return new ArrayList<>(candidates.subList(0, maxPackages));
    }

    /**
     * Repairs one candidate: resolves its package row (creating one for missing packages), then
     * downloads, extracts and re-processes the archive.
     *
     * <p>If anything throws after the row has been resolved, the row is marked {@code FAILED}
     * (best effort) before the exception is rethrown. This keeps rows created here for missing
     * packages from being left in {@code PENDING}/{@code PROCESSING}.
     *
     * <p>NOTE(review): {@link #repairConfiguredPackages()} calls this method on {@code this}. In
     * Spring's default proxy mode {@code @Transactional} is not applied to such self-invocations,
     * so in that flow the repository saves are not grouped into one transaction. Confirm whether a
     * transaction is actually intended here; a single transaction would also span the download.
     *
     * @return the outcome of the repair attempt
     * @throws Exception whatever the download, extraction or batch-processing services throw
     */
    @Transactional
    RepairExecutionResult repairCandidate(RepairCandidate candidate, TedProcessorProperties.RepairProperties repairProperties) throws Exception {
        TedDailyPackage packageEntity = candidate.existingPackage() != null
                ? candidate.existingPackage()
                : createMissingPackageRecord(candidate);
        try {
            return repairPackage(packageEntity, candidate.packageIdentifier(), repairProperties);
        } catch (Exception e) {
            // Callers only hold a reference to pre-existing rows, so the row resolved above
            // (possibly freshly created) must be marked here.
            markExistingPackageFailure(packageEntity, "Repair run failed: " + e.getMessage());
            throw e;
        }
    }

    /**
     * Performs the repair work for {@code packageEntity} and records progress on it:
     * <ol>
     *   <li>obtains the archive (local file, or a fresh download);</li>
     *   <li>extracts it;</li>
     *   <li>processes the XML files in chunks of {@link #PROCESSING_CHUNK_SIZE};</li>
     *   <li>marks the package {@code COMPLETED} or {@code FAILED} depending on the counts.</li>
     * </ol>
     * Extracted XML files are always cleaned up. An archive downloaded by this call is deleted
     * afterwards when {@code deleteAfterExtraction} is enabled.
     */
    private RepairExecutionResult repairPackage(TedDailyPackage packageEntity,
                                                String packageIdentifier,
                                                TedProcessorProperties.RepairProperties repairProperties) throws Exception {
        boolean downloadedNow = false;
        long startNanos = System.nanoTime();
        Path archivePath = packageArchivePath(packageIdentifier);
        boolean archiveMissing = !Files.exists(archivePath);
        if (repairProperties.isForceRedownload() || archiveMissing) {
            // forceRedownload is an explicit request to download. redownloadMissingArchives only
            // governs archives that are absent locally, so it gates just that case. Otherwise a
            // forced re-download of an existing archive would be rejected as "missing locally".
            if (!repairProperties.isForceRedownload() && !repairProperties.isRedownloadMissingArchives()) {
                String message = "Package archive is missing locally and re-download is disabled";
                markFailure(packageEntity, message);
                return new RepairExecutionResult(RepairOutcome.FAILED, message);
            }
            Path downloadedArchive = downloadService.downloadArchive(packageIdentifier);
            if (downloadedArchive == null) {
                // NOTE(review): a null result is treated as "not published" (reported as HTTP 404);
                // confirm against TedPackageDownloadService.downloadArchive.
                packageEntity.setDownloadStatus(TedDailyPackage.DownloadStatus.NOT_FOUND);
                packageEntity.setErrorMessage("Package not found during repair run");
                packageRepository.save(packageEntity);
                return new RepairExecutionResult(RepairOutcome.NOT_FOUND, "HTTP 404");
            }
            archivePath = downloadedArchive;
            downloadedNow = true;
            packageEntity.setDownloadedAt(OffsetDateTime.now());
            packageEntity.setDownloadUrl(downloadService.buildDownloadUrlForPackage(packageIdentifier));
        }

        packageEntity.setDownloadStatus(TedDailyPackage.DownloadStatus.PROCESSING);
        packageEntity.setErrorMessage(null);
        packageEntity.setProcessedCount(0);
        packageEntity.setFailedCount(0);
        packageEntity.setFileHash(downloadService.calculateArchiveHash(archivePath));
        packageRepository.save(packageEntity);

        List<Path> xmlFiles = downloadService.extractArchive(archivePath, packageIdentifier);
        packageEntity.setXmlFileCount(xmlFiles.size());
        packageRepository.save(packageEntity);

        int totalProcessed = 0;
        int totalFailed = 0;
        try {
            for (int i = 0; i < xmlFiles.size(); i += PROCESSING_CHUNK_SIZE) {
                int end = Math.min(i + PROCESSING_CHUNK_SIZE, xmlFiles.size());
                List<Path> chunk = xmlFiles.subList(i, end);
                BatchDocumentProcessingService.BatchProcessingResult result = batchProcessingService.processBatch(chunk);
                // Duplicates (presumably documents imported in an earlier run) count as processed.
                totalProcessed += result.insertedCount() + result.duplicateCount();
                totalFailed += result.errorCount();
                // Persist progress after every chunk so an interrupted run leaves accurate counts.
                packageEntity.setProcessedCount(totalProcessed);
                packageEntity.setFailedCount(totalFailed);
                packageRepository.save(packageEntity);
            }
        } finally {
            cleanupExtractedXmlFiles(xmlFiles);
            if (downloadedNow && properties.getDownload().isDeleteAfterExtraction()) {
                deleteQuietly(archivePath);
            }
        }

        packageEntity.setProcessedAt(OffsetDateTime.now());
        packageEntity.setProcessingDurationMs((System.nanoTime() - startNanos) / 1_000_000L);
        packageEntity.setProcessedCount(totalProcessed);
        packageEntity.setFailedCount(totalFailed);
        if (totalFailed == 0 && totalProcessed == xmlFiles.size()) {
            packageEntity.setDownloadStatus(TedDailyPackage.DownloadStatus.COMPLETED);
            packageEntity.setErrorMessage(null);
            packageRepository.save(packageEntity);
            return new RepairExecutionResult(RepairOutcome.COMPLETED, "Package repaired successfully");
        }
        String failureMessage = String.format(Locale.ROOT,
                "Repair incomplete: xmlFiles=%d, processed=%d, failed=%d",
                xmlFiles.size(), totalProcessed, totalFailed);
        markFailure(packageEntity, failureMessage);
        return new RepairExecutionResult(RepairOutcome.FAILED, failureMessage);
    }

    /** Persists a new {@code PENDING} package row for a candidate that has no row yet. */
    private TedDailyPackage createMissingPackageRecord(RepairCandidate candidate) {
        TedDailyPackage pkg = TedDailyPackage.builder()
                .packageIdentifier(candidate.packageIdentifier())
                .year(candidate.year())
                .serialNumber(candidate.serialNumber())
                .downloadUrl(downloadService.buildDownloadUrlForPackage(candidate.packageIdentifier()))
                .downloadStatus(TedDailyPackage.DownloadStatus.PENDING)
                .build();
        return packageRepository.save(pkg);
    }

    /** Sets the package status to {@code FAILED} with {@code message} and saves it; save errors propagate. */
    private void markFailure(TedDailyPackage packageEntity, String message) {
        packageEntity.setDownloadStatus(TedDailyPackage.DownloadStatus.FAILED);
        packageEntity.setErrorMessage(message);
        packageRepository.save(packageEntity);
    }

    /**
     * Best-effort variant of {@link #markFailure}. It ignores {@code null} entities, and it logs
     * persistence errors instead of propagating them, so a failing status update cannot abort the
     * surrounding repair loop.
     */
    private void markExistingPackageFailure(TedDailyPackage packageEntity, String message) {
        if (packageEntity == null) {
            return;
        }
        try {
            markFailure(packageEntity, message);
        } catch (RuntimeException e) {
            log.warn("Could not record repair failure for package {}: {}",
                    packageEntity.getPackageIdentifier(), e.getMessage(), e);
        }
    }

    /** Local path {@code <downloadDirectory>/<packageIdentifier>.tar.gz} where the archive is expected. */
    private Path packageArchivePath(String packageIdentifier) {
        return Paths.get(properties.getDownload().getDownloadDirectory()).resolve(packageIdentifier + ".tar.gz");
    }

    /**
     * Deletes the extracted XML files, then deletes the parent directory of the first file if it is
     * now empty. This assumes all files were extracted into the same directory. Failures are only
     * logged at debug level.
     */
    private void cleanupExtractedXmlFiles(List<Path> xmlFiles) {
        if (xmlFiles.isEmpty()) {
            return;
        }
        Path packageDirectory = xmlFiles.getFirst().getParent();
        for (Path xmlFile : xmlFiles) {
            deleteQuietly(xmlFile);
        }
        if (packageDirectory != null) {
            try (var stream = Files.list(packageDirectory)) {
                if (stream.findAny().isEmpty()) {
                    deleteQuietly(packageDirectory);
                }
            } catch (IOException e) {
                log.debug("Could not clean extracted package directory {}: {}", packageDirectory, e.getMessage());
            }
        }
    }

    /** Deletes {@code path} if it exists; an {@link IOException} is logged at debug level and ignored. */
    private void deleteQuietly(Path path) {
        try {
            Files.deleteIfExists(path);
        } catch (IOException e) {
            log.debug("Could not delete {}: {}", path, e.getMessage());
        }
    }

    /**
     * Returns whether a package row needs repair.
     *
     * <ul>
     *   <li>{@code null} rows, rows with a {@code null} status, and {@code NOT_FOUND} rows: never.</li>
     *   <li>Any status other than {@code COMPLETED}: always.</li>
     *   <li>{@code COMPLETED} rows: when the XML file count is missing or non-positive, when any
     *       document failed, or when the processed count differs from the XML file count.
     *       {@code null} processed/failed counts are treated as 0.</li>
     * </ul>
     */
    boolean isIncomplete(TedDailyPackage pkg) {
        if (pkg == null || pkg.getDownloadStatus() == null) {
            return false;
        }
        if (pkg.getDownloadStatus() == TedDailyPackage.DownloadStatus.NOT_FOUND) {
            return false;
        }
        if (pkg.getDownloadStatus() != TedDailyPackage.DownloadStatus.COMPLETED) {
            return true;
        }
        Integer xmlFileCount = pkg.getXmlFileCount();
        int processedCount = pkg.getProcessedCount() != null ? pkg.getProcessedCount() : 0;
        int failedCount = pkg.getFailedCount() != null ? pkg.getFailedCount() : 0;
        if (xmlFileCount == null || xmlFileCount <= 0) {
            return true;
        }
        if (failedCount > 0) {
            return true;
        }
        return processedCount != xmlFileCount;
    }

    /**
     * Short, log-friendly label explaining why {@link #isIncomplete} selected {@code pkg}. It is
     * only meaningful for rows that are incomplete; complete rows fall through to
     * {@code COUNT_MISMATCH}.
     */
    private String repairReasonFor(TedDailyPackage pkg) {
        if (pkg.getDownloadStatus() != TedDailyPackage.DownloadStatus.COMPLETED) {
            return "STATUS_" + pkg.getDownloadStatus();
        }
        if (pkg.getXmlFileCount() == null || pkg.getXmlFileCount() <= 0) {
            return "MISSING_XML_COUNT";
        }
        if (pkg.getFailedCount() != null && pkg.getFailedCount() > 0) {
            return "FAILED_DOCUMENTS";
        }
        return "COUNT_MISMATCH";
    }

    /**
     * Splits a 9-digit identifier (after trimming) into year (first 4 digits) and serial number
     * (last 5 digits).
     *
     * @throws IllegalArgumentException if the identifier is {@code null} or not exactly 9 digits
     */
    private PackageCoordinates parseIdentifier(String packageIdentifier) {
        String normalized = packageIdentifier != null ? packageIdentifier.trim() : "";
        if (!PACKAGE_IDENTIFIER_PATTERN.matcher(normalized).matches()) {
            throw new IllegalArgumentException("Invalid package identifier: " + packageIdentifier);
        }
        return new PackageCoordinates(
                Integer.parseInt(normalized.substring(0, 4)),
                Integer.parseInt(normalized.substring(4)));
    }

    /** Inverse of {@link #parseIdentifier}: zero-pads the year to 4 and the serial to 5 digits. */
    private String formatPackageIdentifier(int year, int serialNumber) {
        return String.format(Locale.ROOT, "%04d%05d", year, serialNumber);
    }

    /** {@code true} when {@code value} is non-null and contains a non-whitespace character. */
    private boolean hasText(String value) {
        return value != null && !value.isBlank();
    }

    /** Year and serial number of a package; ordered by year, then serial number. */
    record PackageCoordinates(int year, int serialNumber) implements Comparable<PackageCoordinates> {
        @Override
        public int compareTo(PackageCoordinates other) {
            int yearCompare = Integer.compare(this.year, other.year);
            if (yearCompare != 0) {
                return yearCompare;
            }
            return Integer.compare(this.serialNumber, other.serialNumber);
        }
    }

    /**
     * A package selected for repair.
     *
     * @param existingPackage the existing row, or {@code null} if the package has no row yet
     * @param reason          log-friendly label explaining why the package was selected
     */
    public record RepairCandidate(int year,
                                  int serialNumber,
                                  String packageIdentifier,
                                  TedDailyPackage existingPackage,
                                  String reason) {
        static RepairCandidate existing(TedDailyPackage pkg, String reason) {
            return new RepairCandidate(pkg.getYear(), pkg.getSerialNumber(), pkg.getPackageIdentifier(), pkg, reason);
        }

        static RepairCandidate missing(int year, int serialNumber, String packageIdentifier, String reason) {
            return new RepairCandidate(year, serialNumber, packageIdentifier, null, reason);
        }
    }

    /** Result category of a single package repair attempt. */
    enum RepairOutcome {
        COMPLETED,
        FAILED,
        NOT_FOUND
    }

    /** Outcome of a single package repair plus a human-readable message. */
    record RepairExecutionResult(RepairOutcome outcome, String message) {
    }

    /**
     * Aggregate result of {@link #repairConfiguredPackages()}.
     *
     * @param selected                    number of candidates selected
     * @param succeeded                   repairs that completed
     * @param failed                      repairs that failed or threw
     * @param notFound                    packages the download service could not find
     * @param processedPackageIdentifiers identifiers whose repair returned normally; in dry-run
     *                                    mode, all selected identifiers
     */
    public record RepairSummary(int selected,
                                int succeeded,
                                int failed,
                                int notFound,
                                List<String> processedPackageIdentifiers) {
    }
}