From 253845e9eaeea25eb187d2ee0c3d860ebb19869c Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 18 May 2026 14:02:47 +0200 Subject: [PATCH] Optimize Leitstand TIME materialization workflow --- ...cumentIntelligencePlatformApplication.java | 4 +- .../time/config/TimeDomainProperties.java | 1 + ...tandTimeRecordingAssignmentRepository.java | 3 + .../LeitstandTimeRecordingRepository.java | 2 + .../service/LeitstandTimeImportService.java | 18 +- .../LeitstandTimeProjectionService.java | 178 +++++++++++------- ...yRepresentationMaterializationService.java | 176 ++++++++++++----- ...SelectiveMaterializationStartupRunner.java | 4 +- 8 files changed, 265 insertions(+), 121 deletions(-) diff --git a/src/main/java/at/procon/dip/DocumentIntelligencePlatformApplication.java b/src/main/java/at/procon/dip/DocumentIntelligencePlatformApplication.java index 1cc2bc8..1f39a82 100644 --- a/src/main/java/at/procon/dip/DocumentIntelligencePlatformApplication.java +++ b/src/main/java/at/procon/dip/DocumentIntelligencePlatformApplication.java @@ -16,8 +16,8 @@ import org.springframework.scheduling.annotation.EnableAsync; */ @SpringBootApplication(scanBasePackages = {"at.procon.dip", "at.procon.ted"}) @EnableAsync -@EntityScan(basePackages = {"at.procon.ted.model.entity", "at.procon.dip.domain.document.entity", "at.procon.dip.domain.tenant.entity", "at.procon.dip.domain.ted.entity", "at.procon.dip.embedding.job.entity", "at.procon.dip.migration.audit.entity", "at.procon.dip.migration.entity", /*"at.procon.dip.domain.time.entity",*/ "at.procon.dip.clustering.entity"}) -@EnableJpaRepositories(basePackages = {"at.procon.ted.repository", "at.procon.dip.domain.document.repository", "at.procon.dip.domain.tenant.repository", "at.procon.dip.domain.ted.repository", "at.procon.dip.embedding.job.repository", "at.procon.dip.migration.audit.repository", "at.procon.dip.migration.repository", /*"at.procon.dip.domain.time.repository",*/ "at.procon.dip.clustering.repository"}) +@EntityScan(basePackages = {"at.procon.ted.model.entity", "at.procon.dip.domain.document.entity", "at.procon.dip.domain.tenant.entity", "at.procon.dip.domain.ted.entity", "at.procon.dip.embedding.job.entity", "at.procon.dip.migration.audit.entity", "at.procon.dip.migration.entity", "at.procon.dip.domain.time.entity",/**/ "at.procon.dip.clustering.entity"}) +@EnableJpaRepositories(basePackages = {"at.procon.ted.repository", "at.procon.dip.domain.document.repository", "at.procon.dip.domain.tenant.repository", "at.procon.dip.domain.ted.repository", "at.procon.dip.embedding.job.repository", "at.procon.dip.migration.audit.repository", "at.procon.dip.migration.repository", "at.procon.dip.domain.time.repository",/**/ "at.procon.dip.clustering.repository"}) public class DocumentIntelligencePlatformApplication { public static void main(String[] args) { diff --git a/src/main/java/at/procon/dip/domain/time/config/TimeDomainProperties.java b/src/main/java/at/procon/dip/domain/time/config/TimeDomainProperties.java index 27cf14c..9460f20 100644 --- a/src/main/java/at/procon/dip/domain/time/config/TimeDomainProperties.java +++ b/src/main/java/at/procon/dip/domain/time/config/TimeDomainProperties.java @@ -33,6 +33,7 @@ public class TimeDomainProperties { private String selectiveMaterializationPersonDbk; private Integer selectiveMaterializationPersonNumber; private boolean selectiveMaterializationBuildProjection = true; + private int materializationChunkSize = 200; private String representationLanguageCode = "de"; private String scopeKey = "leitstand-default"; private JdbcProperties jdbc = new JdbcProperties(); diff --git a/src/main/java/at/procon/dip/domain/time/repository/leitstand/LeitstandTimeRecordingAssignmentRepository.java b/src/main/java/at/procon/dip/domain/time/repository/leitstand/LeitstandTimeRecordingAssignmentRepository.java index c87ac3c..21029ce 100644 --- a/src/main/java/at/procon/dip/domain/time/repository/leitstand/LeitstandTimeRecordingAssignmentRepository.java +++ b/src/main/java/at/procon/dip/domain/time/repository/leitstand/LeitstandTimeRecordingAssignmentRepository.java @@ -1,10 +1,13 @@ package at.procon.dip.domain.time.repository.leitstand; import at.procon.dip.domain.time.entity.leitstand.LeitstandTimeRecordingAssignment; +import java.util.Collection; import java.util.List; import org.springframework.data.jpa.repository.JpaRepository; public interface LeitstandTimeRecordingAssignmentRepository extends JpaRepository { List findByTimeRecordingDbkOrderByDbkAsc(String timeRecordingDbk); + + List findByTimeRecordingDbkInOrderByTimeRecordingDbkAscDbkAsc(Collection timeRecordingDbks); } diff --git a/src/main/java/at/procon/dip/domain/time/repository/leitstand/LeitstandTimeRecordingRepository.java b/src/main/java/at/procon/dip/domain/time/repository/leitstand/LeitstandTimeRecordingRepository.java index 46db2a1..ba7fdd5 100644 --- a/src/main/java/at/procon/dip/domain/time/repository/leitstand/LeitstandTimeRecordingRepository.java +++ b/src/main/java/at/procon/dip/domain/time/repository/leitstand/LeitstandTimeRecordingRepository.java @@ -10,6 +10,8 @@ public interface LeitstandTimeRecordingRepository extends JpaRepository findByTimeEntry_Id(UUID timeEntryId); + List findAllByOrderByRecordedFromAscDbkAsc(); + List findByTimeEntryIsNotNull(); List findByPersonDbkOrderByRecordedFromAscDbkAsc(String personDbk); diff --git a/src/main/java/at/procon/dip/domain/time/service/LeitstandTimeImportService.java b/src/main/java/at/procon/dip/domain/time/service/LeitstandTimeImportService.java index 4a8fde4..e9c9cc5 100644 --- a/src/main/java/at/procon/dip/domain/time/service/LeitstandTimeImportService.java +++ b/src/main/java/at/procon/dip/domain/time/service/LeitstandTimeImportService.java @@ -38,7 +38,7 @@ import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @Service -//@ConditionalOnRuntimeMode(RuntimeMode.NEW) +@ConditionalOnRuntimeMode(RuntimeMode.NEW) @ConditionalOnProperty(prefix = "dip.time.leitstand", name = "enabled", havingValue = "true") @RequiredArgsConstructor @Slf4j @@ -144,14 +144,26 @@ public class LeitstandTimeImportService { log.info("No Leitstand time recordings found for personDbk={}", personDbk); return 0; } - //upsertCanonicalTimeEntriesForImportedRecordings(recordings); + upsertCanonicalTimeEntriesForImportedRecordings(recordings); if (rebuildProjection && properties.getLeitstand().isBuildSearchProjection()) { projectionService.refreshForPersonDbk(personDbk); } return recordings.size(); } - @Transactional + public int materializeCanonicalTimeEntriesForAll(boolean rebuildProjection) { + List recordings = timeRecordingRepository.findAllByOrderByRecordedFromAscDbkAsc(); + if (recordings.isEmpty()) { + log.info("No Leitstand time recordings found for full materialization"); + return 0; + } + upsertCanonicalTimeEntriesForImportedRecordings(recordings); + if (rebuildProjection && properties.getLeitstand().isBuildSearchProjection()) { + projectionService.refreshAll(); + } + return recordings.size(); + } + public int materializeCanonicalTimeEntriesForPersonNumber(Integer personNumber, boolean rebuildProjection) { if (personNumber == null) { throw new IllegalArgumentException("personNumber must not be null"); diff --git a/src/main/java/at/procon/dip/domain/time/service/LeitstandTimeProjectionService.java b/src/main/java/at/procon/dip/domain/time/service/LeitstandTimeProjectionService.java index 63e5a0d..4c00f26 100644 --- a/src/main/java/at/procon/dip/domain/time/service/LeitstandTimeProjectionService.java +++ b/src/main/java/at/procon/dip/domain/time/service/LeitstandTimeProjectionService.java @@ -20,6 +20,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @Service @@ -44,126 +45,159 @@ public class LeitstandTimeProjectionService { private final TimeEntrySearchProjectionRepository projectionRepository; private final TimeEntryRepresentationMaterializationService representationMaterializationService; - @Transactional public void refreshForLeitstandRecordingDbks(Collection recordingDbks) { if (recordingDbks == null || recordingDbks.isEmpty()) { return; } List recordings = timeRecordingRepository.findAllById(recordingDbks).stream() .filter(recording -> recording.getTimeEntry() != null) + .sorted(Comparator.comparing(LeitstandTimeRecording::getRecordedFrom, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(LeitstandTimeRecording::getDbk)) .toList(); - if (recordings.isEmpty()) { - return; - } - upsertProjections(recordings); + refreshChunked(recordings); } - - @Transactional public int refreshForPersonDbk(String personDbk) { if (personDbk == null || personDbk.isBlank()) { return 0; } List recordings = timeRecordingRepository .findByPersonDbkAndTimeEntryIsNotNullOrderByRecordedFromAscDbkAsc(personDbk); - upsertProjections(recordings); + refreshChunked(recordings); return recordings.size(); } - @Transactional public int refreshAll() { - List recordings = timeRecordingRepository.findByTimeEntryIsNotNull(); - upsertProjections(recordings); + List recordings = timeRecordingRepository.findByTimeEntryIsNotNull().stream() + .sorted(Comparator.comparing(LeitstandTimeRecording::getRecordedFrom, Comparator.nullsLast(Comparator.naturalOrder())) + .thenComparing(LeitstandTimeRecording::getDbk)) + .toList(); + refreshChunked(recordings); return recordings.size(); } - private void upsertProjections(List recordings) { - for (LeitstandTimeRecording recording : recordings) { - TimeEntrySearchProjection projection = buildProjection(recording); - TimeEntrySearchProjection saved = projectionRepository.save(projection); - if (properties.getLeitstand().isBuildRepresentations()) { - representationMaterializationService.upsertRepresentations(saved); - } + private void refreshChunked(List recordings) { + if (recordings == null || recordings.isEmpty()) { + return; + } + int chunkSize = Math.max(1, properties.getLeitstand().getMaterializationChunkSize()); + for (int start = 0; start < recordings.size(); start += chunkSize) { + List chunk = recordings.subList(start, Math.min(start + chunkSize, recordings.size())); + refreshChunk(chunk); } } - private TimeEntrySearchProjection buildProjection(LeitstandTimeRecording recording) { - TimeEntry timeEntry = timeEntryRepository.findById(recording.getTimeEntry().getId()) - .orElseThrow(() -> new IllegalArgumentException("Unknown TIME entry id: " + recording.getTimeEntry().getId())); - Document document = timeEntry.getDocument(); + @Transactional(propagation = Propagation.REQUIRES_NEW) + protected void refreshChunk(List recordings) { + if (recordings == null || recordings.isEmpty()) { + return; + } + ProjectionBuildContext ctx = preloadContext(recordings); + List projections = new ArrayList<>(recordings.size()); + for (LeitstandTimeRecording recording : recordings) { + projections.add(buildProjection(recording, ctx)); + } + List saved = projectionRepository.saveAll(projections); + projectionRepository.flush(); + if (properties.getLeitstand().isBuildRepresentations()) { + representationMaterializationService.upsertRepresentations(saved); + } + } - LeitstandPerson person = recording.getPersonDbk() == null ? null : personRepository.findById(recording.getPersonDbk()).orElse(null); - LeitstandActivityType activityType = recording.getActivityTypeId() == null ? null : activityTypeRepository.findById(recording.getActivityTypeId()).orElse(null); + private ProjectionBuildContext preloadContext(List recordings) { + List recordingDbks = recordings.stream().map(LeitstandTimeRecording::getDbk).toList(); + List assignments = timeRecordingAssignmentRepository + .findByTimeRecordingDbkInOrderByTimeRecordingDbkAscDbkAsc(recordingDbks); + Map> assignmentsByRecordingDbk = assignments.stream() + .collect(Collectors.groupingBy(LeitstandTimeRecordingAssignment::getTimeRecordingDbk, LinkedHashMap::new, Collectors.toList())); - List assignments = timeRecordingAssignmentRepository.findByTimeRecordingDbkOrderByDbkAsc(recording.getDbk()); - List personTaskAssignments = personTaskAssignmentRepository.findAllById(assignments.stream() + List personTaskAssignmentIds = assignments.stream() .map(LeitstandTimeRecordingAssignment::getPersonTaskAssignmentDbk) .filter(Objects::nonNull) .distinct() - .toList()); - Map ptaByDbk = indexBy(personTaskAssignments, LeitstandPersonTaskAssignment::getDbk); + .toList(); + List personTaskAssignments = personTaskAssignmentRepository.findAllById(personTaskAssignmentIds); + Map personTaskAssignmentsByDbk = indexBy(personTaskAssignments, LeitstandPersonTaskAssignment::getDbk); - Map tasksByDbk = indexBy(taskRepository.findAllById(personTaskAssignments.stream() - .map(LeitstandPersonTaskAssignment::getTaskDbk) - .filter(Objects::nonNull) - .distinct() - .toList()), LeitstandTask::getDbk); + List taskIds = personTaskAssignments.stream().map(LeitstandPersonTaskAssignment::getTaskDbk).filter(Objects::nonNull).distinct().toList(); + Map tasksByDbk = indexBy(taskRepository.findAllById(taskIds), LeitstandTask::getDbk); - Map costUnitsByDbk = indexBy(costUnitRepository.findAllById(personTaskAssignments.stream() - .map(LeitstandPersonTaskAssignment::getCostUnitDbk) - .filter(Objects::nonNull) - .distinct() - .toList()), LeitstandCostUnit::getDbk); + List costUnitIds = personTaskAssignments.stream().map(LeitstandPersonTaskAssignment::getCostUnitDbk).filter(Objects::nonNull).distinct().toList(); + Map costUnitsByDbk = indexBy(costUnitRepository.findAllById(costUnitIds), LeitstandCostUnit::getDbk); - Map contractsByDbk = indexBy(contractRepository.findAllById(costUnitsByDbk.values().stream() - .map(LeitstandCostUnit::getContractDbk) - .filter(Objects::nonNull) - .distinct() - .toList()), LeitstandContract::getDbk); + List contractIds = costUnitsByDbk.values().stream().map(LeitstandCostUnit::getContractDbk).filter(Objects::nonNull).distinct().toList(); + Map contractsByDbk = indexBy(contractRepository.findAllById(contractIds), LeitstandContract::getDbk); - Map contractPositionsByDbk = indexBy(contractPositionRepository.findAllById(costUnitsByDbk.values().stream() - .map(LeitstandCostUnit::getContractPositionDbk) - .filter(Objects::nonNull) - .distinct() - .toList()), LeitstandContractPosition::getDbk); + List contractPositionIds = costUnitsByDbk.values().stream().map(LeitstandCostUnit::getContractPositionDbk).filter(Objects::nonNull).distinct().toList(); + Map contractPositionsByDbk = indexBy(contractPositionRepository.findAllById(contractPositionIds), LeitstandContractPosition::getDbk); - Set organizationDbks = new LinkedHashSet<>(); - costUnitsByDbk.values().stream().map(LeitstandCostUnit::getOrganizationDbk).filter(Objects::nonNull).forEach(organizationDbks::add); - contractsByDbk.values().stream().map(LeitstandContract::getOrganizationDbk).filter(Objects::nonNull).forEach(organizationDbks::add); - if (person != null && person.getOrganizationDbk() != null) { - organizationDbks.add(person.getOrganizationDbk()); + Set organizationIds = new LinkedHashSet<>(); + costUnitsByDbk.values().stream().map(LeitstandCostUnit::getOrganizationDbk).filter(Objects::nonNull).forEach(organizationIds::add); + contractsByDbk.values().stream().map(LeitstandContract::getOrganizationDbk).filter(Objects::nonNull).forEach(organizationIds::add); + recordings.stream().map(LeitstandTimeRecording::getPersonDbk).filter(Objects::nonNull).forEach(id -> {}); + List personIds = recordings.stream().map(LeitstandTimeRecording::getPersonDbk).filter(Objects::nonNull).distinct().toList(); + Map personsByDbk = indexBy(personRepository.findAllById(personIds), LeitstandPerson::getDbk); + personsByDbk.values().stream().map(LeitstandPerson::getOrganizationDbk).filter(Objects::nonNull).forEach(organizationIds::add); + Map organizationsByDbk = indexBy(organizationRepository.findAllById(organizationIds), LeitstandOrganization::getDbk); + + List activityTypeIds = recordings.stream().map(LeitstandTimeRecording::getActivityTypeId).filter(Objects::nonNull).distinct().toList(); + Map activityTypesById = indexBy(activityTypeRepository.findAllById(activityTypeIds), LeitstandActivityType::getId); + + List timeEntryIds = recordings.stream().map(LeitstandTimeRecording::getTimeEntry).filter(Objects::nonNull).map(TimeEntry::getId).filter(Objects::nonNull).distinct().toList(); + Map timeEntriesById = timeEntryRepository.findAllById(timeEntryIds).stream().collect(Collectors.toMap(TimeEntry::getId, Function.identity())); + Map existingProjectionsByTimeEntryId = projectionRepository.findByTimeEntry_IdIn(timeEntryIds).stream().collect(Collectors.toMap(p -> p.getTimeEntry().getId(), Function.identity())); + + return new ProjectionBuildContext(assignmentsByRecordingDbk, personTaskAssignmentsByDbk, tasksByDbk, costUnitsByDbk, + contractsByDbk, contractPositionsByDbk, organizationsByDbk, personsByDbk, activityTypesById, + timeEntriesById, existingProjectionsByTimeEntryId); + } + + private TimeEntrySearchProjection buildProjection(LeitstandTimeRecording recording, ProjectionBuildContext ctx) { + TimeEntry timeEntry = ctx.timeEntriesById.get(recording.getTimeEntry().getId()); + if (timeEntry == null) { + throw new IllegalArgumentException("Unknown TIME entry id: " + recording.getTimeEntry().getId()); } - Map organizationsByDbk = indexBy(organizationRepository.findAllById(organizationDbks), LeitstandOrganization::getDbk); + Document document = timeEntry.getDocument(); + + LeitstandPerson person = recording.getPersonDbk() == null ? null : ctx.personsByDbk.get(recording.getPersonDbk()); + LeitstandActivityType activityType = recording.getActivityTypeId() == null ? null : ctx.activityTypesById.get(recording.getActivityTypeId()); + + List assignments = ctx.assignmentsByRecordingDbk.getOrDefault(recording.getDbk(), List.of()); + List personTaskAssignments = assignments.stream() + .map(a -> ctx.personTaskAssignmentsByDbk.get(a.getPersonTaskAssignmentDbk())) + .filter(Objects::nonNull) + .distinct() + .toList(); List orderedTasks = assignments.stream() - .map(a -> ptaByDbk.get(a.getPersonTaskAssignmentDbk())) + .map(a -> ctx.personTaskAssignmentsByDbk.get(a.getPersonTaskAssignmentDbk())) .filter(Objects::nonNull) - .map(pta -> tasksByDbk.get(pta.getTaskDbk())) + .map(pta -> ctx.tasksByDbk.get(pta.getTaskDbk())) .filter(Objects::nonNull) .distinct() .toList(); List orderedCostUnits = assignments.stream() - .map(a -> ptaByDbk.get(a.getPersonTaskAssignmentDbk())) + .map(a -> ctx.personTaskAssignmentsByDbk.get(a.getPersonTaskAssignmentDbk())) .filter(Objects::nonNull) - .map(pta -> costUnitsByDbk.get(pta.getCostUnitDbk())) + .map(pta -> ctx.costUnitsByDbk.get(pta.getCostUnitDbk())) .filter(Objects::nonNull) .distinct() .toList(); List orderedContracts = orderedCostUnits.stream() - .map(cu -> contractsByDbk.get(cu.getContractDbk())) + .map(cu -> ctx.contractsByDbk.get(cu.getContractDbk())) .filter(Objects::nonNull) .distinct() .toList(); List orderedContractPositions = orderedCostUnits.stream() - .map(cu -> contractPositionsByDbk.get(cu.getContractPositionDbk())) + .map(cu -> ctx.contractPositionsByDbk.get(cu.getContractPositionDbk())) .filter(Objects::nonNull) .distinct() .toList(); List orderedOrganizations = new ArrayList<>(); - orderedCostUnits.stream().map(cu -> organizationsByDbk.get(cu.getOrganizationDbk())).filter(Objects::nonNull).forEach(org -> { if (!orderedOrganizations.contains(org)) orderedOrganizations.add(org); }); - orderedContracts.stream().map(c -> organizationsByDbk.get(c.getOrganizationDbk())).filter(Objects::nonNull).forEach(org -> { if (!orderedOrganizations.contains(org)) orderedOrganizations.add(org); }); + orderedCostUnits.stream().map(cu -> ctx.organizationsByDbk.get(cu.getOrganizationDbk())).filter(Objects::nonNull).forEach(org -> { if (!orderedOrganizations.contains(org)) orderedOrganizations.add(org); }); + orderedContracts.stream().map(c -> ctx.organizationsByDbk.get(c.getOrganizationDbk())).filter(Objects::nonNull).forEach(org -> { if (!orderedOrganizations.contains(org)) orderedOrganizations.add(org); }); if (person != null && person.getOrganizationDbk() != null) { - LeitstandOrganization personOrg = organizationsByDbk.get(person.getOrganizationDbk()); + LeitstandOrganization personOrg = ctx.organizationsByDbk.get(person.getOrganizationDbk()); if (personOrg != null && !orderedOrganizations.contains(personOrg)) orderedOrganizations.add(personOrg); } @@ -176,8 +210,7 @@ public class LeitstandTimeProjectionService { String summary = buildSummary(recording, primaryTask, primaryCostUnit, primaryOrganization, person); String semanticText = buildSemanticText(timeEntry, recording, person, activityType, orderedTasks, orderedCostUnits, orderedContracts, orderedContractPositions, orderedOrganizations); - TimeEntrySearchProjection projection = projectionRepository.findByTimeEntry_Id(timeEntry.getId()) - .orElseGet(() -> TimeEntrySearchProjection.builder().timeEntry(timeEntry).document(document).build()); + TimeEntrySearchProjection projection = ctx.existingProjectionsByTimeEntryId.getOrDefault(timeEntry.getId(), TimeEntrySearchProjection.builder().timeEntry(timeEntry).document(document).build()); projection.setDocument(document); projection.setTimeEntry(timeEntry); projection.setSourceSystem(TimeSourceSystem.LEITSTAND); @@ -229,6 +262,19 @@ public class LeitstandTimeProjectionService { return projection; } + private record ProjectionBuildContext( + Map> assignmentsByRecordingDbk, + Map personTaskAssignmentsByDbk, + Map tasksByDbk, + Map costUnitsByDbk, + Map contractsByDbk, + Map contractPositionsByDbk, + Map organizationsByDbk, + Map personsByDbk, + Map activityTypesById, + Map timeEntriesById, + Map existingProjectionsByTimeEntryId) { + } private String buildSummary(LeitstandTimeRecording recording, LeitstandTask primaryTask, LeitstandCostUnit primaryCostUnit, @@ -283,7 +329,7 @@ public class LeitstandTimeProjectionService { return sb.toString().trim(); } - private Map indexBy(Collection rows, Function id) { + private Map indexBy(Collection rows, Function id) { return rows.stream() .filter(Objects::nonNull) .collect(Collectors.toMap(id, Function.identity(), (a, b) -> a, LinkedHashMap::new)); diff --git a/src/main/java/at/procon/dip/domain/time/service/TimeEntryRepresentationMaterializationService.java b/src/main/java/at/procon/dip/domain/time/service/TimeEntryRepresentationMaterializationService.java index 47c8015..b8b4722 100644 --- a/src/main/java/at/procon/dip/domain/time/service/TimeEntryRepresentationMaterializationService.java +++ b/src/main/java/at/procon/dip/domain/time/service/TimeEntryRepresentationMaterializationService.java @@ -13,10 +13,16 @@ import at.procon.dip.embedding.config.EmbeddingProperties; import at.procon.dip.embedding.registry.EmbeddingModelRegistry; import at.procon.dip.embedding.service.RepresentationEmbeddingOrchestrator; import at.procon.dip.search.service.DocumentLexicalIndexService; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @Service @@ -35,69 +41,141 @@ public class TimeEntryRepresentationMaterializationService { private final EmbeddingProperties embeddingProperties; private final EmbeddingModelRegistry modelRegistry; - //@Transactional public void upsertRepresentations(TimeEntrySearchProjection projection) { - if (projection.getSemanticText() == null || projection.getSemanticText().isBlank()) { - log.debug("Skipping TIME representation for document {} because semantic text is blank", projection.getDocument().getId()); + if (projection == null) { + return; + } + upsertRepresentations(List.of(projection)); + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void upsertRepresentations(List projections) { + if (projections == null || projections.isEmpty()) { return; } - Document document = projection.getDocument(); - document.setTitle(firstNonBlank(projection.getSummaryText(), projection.getTimeRecordingDesc(), projection.getPrimaryTaskName(), projection.getExternalId())); - document.setSummary(projection.getSummaryText()); - document.setLanguageCode(firstNonBlank(projection.getLanguageCode(), document.getLanguageCode())); - if (document.getMimeType() == null || document.getMimeType().isBlank()) { - document.setMimeType("application/x-time-entry"); + List eligible = projections.stream() + .filter(projection -> documentId(projection) != null) + .filter(projection -> projection.getSemanticText() != null && !projection.getSemanticText().isBlank()) + .toList(); + if (eligible.isEmpty()) { + return; } - document = documentRepository.save(document); - Optional existing = representationRepository - .findByDocument_IdAndRepresentationType(document.getId(), RepresentationType.SEMANTIC_TEXT) - .stream() - .filter(r -> BUILDER_KEY.equals(r.getBuilderKey()) || r.isPrimaryRepresentation()) - .findFirst(); + List documentIds = eligible.stream() + .map(this::documentId) + .distinct() + .toList(); + Map documentsById = documentRepository.findAllById(documentIds).stream() + .collect(java.util.stream.Collectors.toMap(Document::getId, java.util.function.Function.identity(), (a, b) -> a, LinkedHashMap::new)); + List documentsToSave = new ArrayList<>(); + for (TimeEntrySearchProjection projection : eligible) { + UUID documentId = documentId(projection); + Document document = documentsById.get(documentId); + if (document == null || documentsToSave.contains(document)) { + continue; + } + document.setTitle(firstNonBlank(projection.getSummaryText(), projection.getTimeRecordingDesc(), projection.getPrimaryTaskName(), projection.getExternalId())); + document.setSummary(projection.getSummaryText()); + document.setLanguageCode(firstNonBlank(projection.getLanguageCode(), document.getLanguageCode())); + if (document.getMimeType() == null || document.getMimeType().isBlank()) { + document.setMimeType("application/x-time-entry"); + } + documentsToSave.add(document); + } + if (!documentsToSave.isEmpty()) { + documentRepository.saveAll(documentsToSave); + documentRepository.flush(); + } - boolean changed = existing.isEmpty() - || !projection.getSemanticText().equals(existing.get().getTextBody()) - || !equalsNullable(projection.getLanguageCode(), existing.get().getLanguageCode()) - || !BUILDER_KEY.equals(existing.get().getBuilderKey()); + List changedExisting = new ArrayList<>(); + List newRepresentationProjections = new ArrayList<>(); + List changedRepresentationIds = new ArrayList<>(); + List newlyCreatedRepresentations = new ArrayList<>(); - Document finalDocument = document; - DocumentTextRepresentation semantic = existing - .map(found -> changed ? updateRepresentation(found, projection) : found) - .orElseGet(() -> documentRepresentationService.addRepresentation(new AddDocumentTextRepresentationCommand( - finalDocument.getId(), - null, - RepresentationType.SEMANTIC_TEXT, - BUILDER_KEY, - projection.getLanguageCode(), - null, - null, - null, - null, - true, - projection.getSemanticText(), - false - ))); + for (TimeEntrySearchProjection projection : eligible) { + Document document = documentsById.get(documentId(projection)); + if (document == null) { + continue; + } + Optional existing = representationRepository + .findByDocument_IdAndRepresentationType(document.getId(), RepresentationType.SEMANTIC_TEXT) + .stream() + .filter(r -> BUILDER_KEY.equals(r.getBuilderKey()) || r.isPrimaryRepresentation()) + .findFirst(); - if (changed - && embeddingProperties.isEnabled() + boolean changed = existing.isEmpty() + || !projection.getSemanticText().equals(existing.get().getTextBody()) + || !equalsNullable(projection.getLanguageCode(), existing.get().getLanguageCode()) + || !BUILDER_KEY.equals(existing.get().getBuilderKey()); + + if (!changed) { + continue; + } + + if (existing.isPresent()) { + DocumentTextRepresentation found = existing.get(); + found.setBuilderKey(BUILDER_KEY); + found.setLanguageCode(projection.getLanguageCode()); + found.setPrimaryRepresentation(true); + found.setTextBody(projection.getSemanticText()); + found.setCharCount(projection.getSemanticText().length()); + changedExisting.add(found); + } else { + newRepresentationProjections.add(projection); + } + } + + if (!changedExisting.isEmpty()) { + representationRepository.saveAll(changedExisting); + representationRepository.flush(); + changedExisting.stream().map(DocumentTextRepresentation::getId).forEach(changedRepresentationIds::add); + } + + for (TimeEntrySearchProjection projection : newRepresentationProjections) { + Document document = documentsById.get(documentId(projection)); + if (document == null) { + continue; + } + DocumentTextRepresentation created = documentRepresentationService.addRepresentation(new AddDocumentTextRepresentationCommand( + document.getId(), + null, + RepresentationType.SEMANTIC_TEXT, + BUILDER_KEY, + projection.getLanguageCode(), + null, + null, + null, + null, + true, + projection.getSemanticText(), + false + )); + newlyCreatedRepresentations.add(created); + changedRepresentationIds.add(created.getId()); + } + + for (UUID representationId : changedRepresentationIds) { + lexicalIndexService.indexRepresentation(representationId); + } + + if (embeddingProperties.isEnabled() && timeDomainProperties.getLeitstand().isQueueEmbeddings() - && embeddingProperties.getDefaultDocumentModel() != null && !embeddingProperties.getDefaultDocumentModel().isBlank()) { + && embeddingProperties.getDefaultDocumentModel() != null + && !embeddingProperties.getDefaultDocumentModel().isBlank()) { String modelKey = modelRegistry.getRequiredDefaultDocumentModelKey(); - embeddingOrchestrator.enqueueRepresentation(document.getId(), semantic.getId(), modelKey); + for (DocumentTextRepresentation representation : changedExisting) { + embeddingOrchestrator.enqueueRepresentation(representation.getDocument().getId(), representation.getId(), modelKey); + } + for (DocumentTextRepresentation representation : newlyCreatedRepresentations) { + embeddingOrchestrator.enqueueRepresentation(representation.getDocument().getId(), representation.getId(), modelKey); + } } } - private DocumentTextRepresentation updateRepresentation(DocumentTextRepresentation existing, TimeEntrySearchProjection projection) { - existing.setBuilderKey(BUILDER_KEY); - existing.setLanguageCode(projection.getLanguageCode()); - existing.setPrimaryRepresentation(true); - existing.setTextBody(projection.getSemanticText()); - existing.setCharCount(projection.getSemanticText().length()); - DocumentTextRepresentation saved = representationRepository.saveAndFlush(existing); - lexicalIndexService.indexRepresentation(saved.getId()); - return saved; + private UUID documentId(TimeEntrySearchProjection projection) { + Document document = projection == null ? null : projection.getDocument(); + return document == null ? null : document.getId(); } private boolean equalsNullable(String left, String right) { diff --git a/src/main/java/at/procon/dip/domain/time/startup/LeitstandTimeSelectiveMaterializationStartupRunner.java b/src/main/java/at/procon/dip/domain/time/startup/LeitstandTimeSelectiveMaterializationStartupRunner.java index 64c4e1a..7e1b57a 100644 --- a/src/main/java/at/procon/dip/domain/time/startup/LeitstandTimeSelectiveMaterializationStartupRunner.java +++ b/src/main/java/at/procon/dip/domain/time/startup/LeitstandTimeSelectiveMaterializationStartupRunner.java @@ -37,6 +37,8 @@ public class LeitstandTimeSelectiveMaterializationStartupRunner implements Appli log.info("Completed selective Leitstand TIME materialization for personNumber={}. Processed {} recordings", cfg.getSelectiveMaterializationPersonNumber(), count); return; } - throw new IllegalStateException("dip.time.leitstand.startup-selective-materialization-enabled=true requires either selective-materialization-person-dbk or selective-materialization-person-number"); + log.info("Starting Leitstand TIME materialization for all imported recordings (rebuildProjection={})", rebuildProjection); + int count = importService.materializeCanonicalTimeEntriesForAll(rebuildProjection); + log.info("Completed Leitstand TIME materialization for all imported recordings. Processed {} recordings", count); } }