Optimize Leitstand TIME materialization workflow

This commit is contained in:
trifonovt 2026-05-18 14:02:47 +02:00
parent 430885b5af
commit 253845e9ea
8 changed files with 265 additions and 121 deletions

View File

@ -16,8 +16,8 @@ import org.springframework.scheduling.annotation.EnableAsync;
*/ */
@SpringBootApplication(scanBasePackages = {"at.procon.dip", "at.procon.ted"}) @SpringBootApplication(scanBasePackages = {"at.procon.dip", "at.procon.ted"})
@EnableAsync @EnableAsync
@EntityScan(basePackages = {"at.procon.ted.model.entity", "at.procon.dip.domain.document.entity", "at.procon.dip.domain.tenant.entity", "at.procon.dip.domain.ted.entity", "at.procon.dip.embedding.job.entity", "at.procon.dip.migration.audit.entity", "at.procon.dip.migration.entity", /*"at.procon.dip.domain.time.entity",*/ "at.procon.dip.clustering.entity"}) @EntityScan(basePackages = {"at.procon.ted.model.entity", "at.procon.dip.domain.document.entity", "at.procon.dip.domain.tenant.entity", "at.procon.dip.domain.ted.entity", "at.procon.dip.embedding.job.entity", "at.procon.dip.migration.audit.entity", "at.procon.dip.migration.entity", "at.procon.dip.domain.time.entity",/**/ "at.procon.dip.clustering.entity"})
@EnableJpaRepositories(basePackages = {"at.procon.ted.repository", "at.procon.dip.domain.document.repository", "at.procon.dip.domain.tenant.repository", "at.procon.dip.domain.ted.repository", "at.procon.dip.embedding.job.repository", "at.procon.dip.migration.audit.repository", "at.procon.dip.migration.repository", /*"at.procon.dip.domain.time.repository",*/ "at.procon.dip.clustering.repository"}) @EnableJpaRepositories(basePackages = {"at.procon.ted.repository", "at.procon.dip.domain.document.repository", "at.procon.dip.domain.tenant.repository", "at.procon.dip.domain.ted.repository", "at.procon.dip.embedding.job.repository", "at.procon.dip.migration.audit.repository", "at.procon.dip.migration.repository", "at.procon.dip.domain.time.repository",/**/ "at.procon.dip.clustering.repository"})
public class DocumentIntelligencePlatformApplication { public class DocumentIntelligencePlatformApplication {
public static void main(String[] args) { public static void main(String[] args) {

View File

@ -33,6 +33,7 @@ public class TimeDomainProperties {
private String selectiveMaterializationPersonDbk; private String selectiveMaterializationPersonDbk;
private Integer selectiveMaterializationPersonNumber; private Integer selectiveMaterializationPersonNumber;
private boolean selectiveMaterializationBuildProjection = true; private boolean selectiveMaterializationBuildProjection = true;
private int materializationChunkSize = 200;
private String representationLanguageCode = "de"; private String representationLanguageCode = "de";
private String scopeKey = "leitstand-default"; private String scopeKey = "leitstand-default";
private JdbcProperties jdbc = new JdbcProperties(); private JdbcProperties jdbc = new JdbcProperties();

View File

@ -1,10 +1,13 @@
package at.procon.dip.domain.time.repository.leitstand; package at.procon.dip.domain.time.repository.leitstand;
import at.procon.dip.domain.time.entity.leitstand.LeitstandTimeRecordingAssignment; import at.procon.dip.domain.time.entity.leitstand.LeitstandTimeRecordingAssignment;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
public interface LeitstandTimeRecordingAssignmentRepository extends JpaRepository<LeitstandTimeRecordingAssignment, String> { public interface LeitstandTimeRecordingAssignmentRepository extends JpaRepository<LeitstandTimeRecordingAssignment, String> {
List<LeitstandTimeRecordingAssignment> findByTimeRecordingDbkOrderByDbkAsc(String timeRecordingDbk); List<LeitstandTimeRecordingAssignment> findByTimeRecordingDbkOrderByDbkAsc(String timeRecordingDbk);
List<LeitstandTimeRecordingAssignment> findByTimeRecordingDbkInOrderByTimeRecordingDbkAscDbkAsc(Collection<String> timeRecordingDbks);
} }

View File

@ -10,6 +10,8 @@ public interface LeitstandTimeRecordingRepository extends JpaRepository<Leitstan
Optional<LeitstandTimeRecording> findByTimeEntry_Id(UUID timeEntryId); Optional<LeitstandTimeRecording> findByTimeEntry_Id(UUID timeEntryId);
List<LeitstandTimeRecording> findAllByOrderByRecordedFromAscDbkAsc();
List<LeitstandTimeRecording> findByTimeEntryIsNotNull(); List<LeitstandTimeRecording> findByTimeEntryIsNotNull();
List<LeitstandTimeRecording> findByPersonDbkOrderByRecordedFromAscDbkAsc(String personDbk); List<LeitstandTimeRecording> findByPersonDbkOrderByRecordedFromAscDbkAsc(String personDbk);

View File

@ -38,7 +38,7 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@Service @Service
//@ConditionalOnRuntimeMode(RuntimeMode.NEW) @ConditionalOnRuntimeMode(RuntimeMode.NEW)
@ConditionalOnProperty(prefix = "dip.time.leitstand", name = "enabled", havingValue = "true") @ConditionalOnProperty(prefix = "dip.time.leitstand", name = "enabled", havingValue = "true")
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@ -144,14 +144,26 @@ public class LeitstandTimeImportService {
log.info("No Leitstand time recordings found for personDbk={}", personDbk); log.info("No Leitstand time recordings found for personDbk={}", personDbk);
return 0; return 0;
} }
//upsertCanonicalTimeEntriesForImportedRecordings(recordings); upsertCanonicalTimeEntriesForImportedRecordings(recordings);
if (rebuildProjection && properties.getLeitstand().isBuildSearchProjection()) { if (rebuildProjection && properties.getLeitstand().isBuildSearchProjection()) {
projectionService.refreshForPersonDbk(personDbk); projectionService.refreshForPersonDbk(personDbk);
} }
return recordings.size(); return recordings.size();
} }
/**
 * Materializes canonical TIME entries for every imported Leitstand time recording.
 *
 * <p>Fetches the recordings through {@code findAllByOrderByRecordedFromAscDbkAsc()}, hands the
 * complete list to {@code upsertCanonicalTimeEntriesForImportedRecordings}, and optionally asks
 * the projection service to refresh all projections afterwards.
 *
 * @param rebuildProjection when {@code true} and the Leitstand {@code isBuildSearchProjection()}
 *        setting is also enabled, {@code projectionService.refreshAll()} is invoked after the upsert
 * @return the number of recordings that were loaded; {@code 0} when no recording exists
 */
@Transactional public int materializeCanonicalTimeEntriesForAll(boolean rebuildProjection) {
// NOTE(review): the repository call returns the full result as a single list — confirm this
// fits in memory for the expected number of recordings.
List<LeitstandTimeRecording> recordings = timeRecordingRepository.findAllByOrderByRecordedFromAscDbkAsc();
// Nothing imported: log and report zero without touching entries or the projection.
if (recordings.isEmpty()) {
log.info("No Leitstand time recordings found for full materialization");
return 0;
}
upsertCanonicalTimeEntriesForImportedRecordings(recordings);
// The projection refresh needs both the caller's flag and the configuration switch.
if (rebuildProjection && properties.getLeitstand().isBuildSearchProjection()) {
projectionService.refreshAll();
}
return recordings.size();
}
public int materializeCanonicalTimeEntriesForPersonNumber(Integer personNumber, boolean rebuildProjection) { public int materializeCanonicalTimeEntriesForPersonNumber(Integer personNumber, boolean rebuildProjection) {
if (personNumber == null) { if (personNumber == null) {
throw new IllegalArgumentException("personNumber must not be null"); throw new IllegalArgumentException("personNumber must not be null");

View File

@ -20,6 +20,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@Service @Service
@ -44,126 +45,159 @@ public class LeitstandTimeProjectionService {
private final TimeEntrySearchProjectionRepository projectionRepository; private final TimeEntrySearchProjectionRepository projectionRepository;
private final TimeEntryRepresentationMaterializationService representationMaterializationService; private final TimeEntryRepresentationMaterializationService representationMaterializationService;
@Transactional
public void refreshForLeitstandRecordingDbks(Collection<String> recordingDbks) { public void refreshForLeitstandRecordingDbks(Collection<String> recordingDbks) {
if (recordingDbks == null || recordingDbks.isEmpty()) { if (recordingDbks == null || recordingDbks.isEmpty()) {
return; return;
} }
List<LeitstandTimeRecording> recordings = timeRecordingRepository.findAllById(recordingDbks).stream() List<LeitstandTimeRecording> recordings = timeRecordingRepository.findAllById(recordingDbks).stream()
.filter(recording -> recording.getTimeEntry() != null) .filter(recording -> recording.getTimeEntry() != null)
.sorted(Comparator.comparing(LeitstandTimeRecording::getRecordedFrom, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(LeitstandTimeRecording::getDbk))
.toList(); .toList();
if (recordings.isEmpty()) { refreshChunked(recordings);
return;
}
upsertProjections(recordings);
} }
@Transactional
public int refreshForPersonDbk(String personDbk) { public int refreshForPersonDbk(String personDbk) {
if (personDbk == null || personDbk.isBlank()) { if (personDbk == null || personDbk.isBlank()) {
return 0; return 0;
} }
List<LeitstandTimeRecording> recordings = timeRecordingRepository List<LeitstandTimeRecording> recordings = timeRecordingRepository
.findByPersonDbkAndTimeEntryIsNotNullOrderByRecordedFromAscDbkAsc(personDbk); .findByPersonDbkAndTimeEntryIsNotNullOrderByRecordedFromAscDbkAsc(personDbk);
upsertProjections(recordings); refreshChunked(recordings);
return recordings.size(); return recordings.size();
} }
@Transactional
public int refreshAll() { public int refreshAll() {
List<LeitstandTimeRecording> recordings = timeRecordingRepository.findByTimeEntryIsNotNull(); List<LeitstandTimeRecording> recordings = timeRecordingRepository.findByTimeEntryIsNotNull().stream()
upsertProjections(recordings); .sorted(Comparator.comparing(LeitstandTimeRecording::getRecordedFrom, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(LeitstandTimeRecording::getDbk))
.toList();
refreshChunked(recordings);
return recordings.size(); return recordings.size();
} }
private void upsertProjections(List<LeitstandTimeRecording> recordings) { private void refreshChunked(List<LeitstandTimeRecording> recordings) {
for (LeitstandTimeRecording recording : recordings) { if (recordings == null || recordings.isEmpty()) {
TimeEntrySearchProjection projection = buildProjection(recording); return;
TimeEntrySearchProjection saved = projectionRepository.save(projection); }
if (properties.getLeitstand().isBuildRepresentations()) { int chunkSize = Math.max(1, properties.getLeitstand().getMaterializationChunkSize());
representationMaterializationService.upsertRepresentations(saved); for (int start = 0; start < recordings.size(); start += chunkSize) {
} List<LeitstandTimeRecording> chunk = recordings.subList(start, Math.min(start + chunkSize, recordings.size()));
refreshChunk(chunk);
} }
} }
private TimeEntrySearchProjection buildProjection(LeitstandTimeRecording recording) { @Transactional(propagation = Propagation.REQUIRES_NEW)
TimeEntry timeEntry = timeEntryRepository.findById(recording.getTimeEntry().getId()) protected void refreshChunk(List<LeitstandTimeRecording> recordings) {
.orElseThrow(() -> new IllegalArgumentException("Unknown TIME entry id: " + recording.getTimeEntry().getId())); if (recordings == null || recordings.isEmpty()) {
Document document = timeEntry.getDocument(); return;
}
ProjectionBuildContext ctx = preloadContext(recordings);
List<TimeEntrySearchProjection> projections = new ArrayList<>(recordings.size());
for (LeitstandTimeRecording recording : recordings) {
projections.add(buildProjection(recording, ctx));
}
List<TimeEntrySearchProjection> saved = projectionRepository.saveAll(projections);
projectionRepository.flush();
if (properties.getLeitstand().isBuildRepresentations()) {
representationMaterializationService.upsertRepresentations(saved);
}
}
LeitstandPerson person = recording.getPersonDbk() == null ? null : personRepository.findById(recording.getPersonDbk()).orElse(null); private ProjectionBuildContext preloadContext(List<LeitstandTimeRecording> recordings) {
LeitstandActivityType activityType = recording.getActivityTypeId() == null ? null : activityTypeRepository.findById(recording.getActivityTypeId()).orElse(null); List<String> recordingDbks = recordings.stream().map(LeitstandTimeRecording::getDbk).toList();
List<LeitstandTimeRecordingAssignment> assignments = timeRecordingAssignmentRepository
.findByTimeRecordingDbkInOrderByTimeRecordingDbkAscDbkAsc(recordingDbks);
Map<String, List<LeitstandTimeRecordingAssignment>> assignmentsByRecordingDbk = assignments.stream()
.collect(Collectors.groupingBy(LeitstandTimeRecordingAssignment::getTimeRecordingDbk, LinkedHashMap::new, Collectors.toList()));
List<LeitstandTimeRecordingAssignment> assignments = timeRecordingAssignmentRepository.findByTimeRecordingDbkOrderByDbkAsc(recording.getDbk()); List<String> personTaskAssignmentIds = assignments.stream()
List<LeitstandPersonTaskAssignment> personTaskAssignments = personTaskAssignmentRepository.findAllById(assignments.stream()
.map(LeitstandTimeRecordingAssignment::getPersonTaskAssignmentDbk) .map(LeitstandTimeRecordingAssignment::getPersonTaskAssignmentDbk)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.distinct() .distinct()
.toList()); .toList();
Map<String, LeitstandPersonTaskAssignment> ptaByDbk = indexBy(personTaskAssignments, LeitstandPersonTaskAssignment::getDbk); List<LeitstandPersonTaskAssignment> personTaskAssignments = personTaskAssignmentRepository.findAllById(personTaskAssignmentIds);
Map<String, LeitstandPersonTaskAssignment> personTaskAssignmentsByDbk = indexBy(personTaskAssignments, LeitstandPersonTaskAssignment::getDbk);
Map<String, LeitstandTask> tasksByDbk = indexBy(taskRepository.findAllById(personTaskAssignments.stream() List<String> taskIds = personTaskAssignments.stream().map(LeitstandPersonTaskAssignment::getTaskDbk).filter(Objects::nonNull).distinct().toList();
.map(LeitstandPersonTaskAssignment::getTaskDbk) Map<String, LeitstandTask> tasksByDbk = indexBy(taskRepository.findAllById(taskIds), LeitstandTask::getDbk);
.filter(Objects::nonNull)
.distinct()
.toList()), LeitstandTask::getDbk);
Map<String, LeitstandCostUnit> costUnitsByDbk = indexBy(costUnitRepository.findAllById(personTaskAssignments.stream() List<String> costUnitIds = personTaskAssignments.stream().map(LeitstandPersonTaskAssignment::getCostUnitDbk).filter(Objects::nonNull).distinct().toList();
.map(LeitstandPersonTaskAssignment::getCostUnitDbk) Map<String, LeitstandCostUnit> costUnitsByDbk = indexBy(costUnitRepository.findAllById(costUnitIds), LeitstandCostUnit::getDbk);
.filter(Objects::nonNull)
.distinct()
.toList()), LeitstandCostUnit::getDbk);
Map<String, LeitstandContract> contractsByDbk = indexBy(contractRepository.findAllById(costUnitsByDbk.values().stream() List<String> contractIds = costUnitsByDbk.values().stream().map(LeitstandCostUnit::getContractDbk).filter(Objects::nonNull).distinct().toList();
.map(LeitstandCostUnit::getContractDbk) Map<String, LeitstandContract> contractsByDbk = indexBy(contractRepository.findAllById(contractIds), LeitstandContract::getDbk);
.filter(Objects::nonNull)
.distinct()
.toList()), LeitstandContract::getDbk);
Map<String, LeitstandContractPosition> contractPositionsByDbk = indexBy(contractPositionRepository.findAllById(costUnitsByDbk.values().stream() List<String> contractPositionIds = costUnitsByDbk.values().stream().map(LeitstandCostUnit::getContractPositionDbk).filter(Objects::nonNull).distinct().toList();
.map(LeitstandCostUnit::getContractPositionDbk) Map<String, LeitstandContractPosition> contractPositionsByDbk = indexBy(contractPositionRepository.findAllById(contractPositionIds), LeitstandContractPosition::getDbk);
.filter(Objects::nonNull)
.distinct()
.toList()), LeitstandContractPosition::getDbk);
Set<String> organizationDbks = new LinkedHashSet<>(); Set<String> organizationIds = new LinkedHashSet<>();
costUnitsByDbk.values().stream().map(LeitstandCostUnit::getOrganizationDbk).filter(Objects::nonNull).forEach(organizationDbks::add); costUnitsByDbk.values().stream().map(LeitstandCostUnit::getOrganizationDbk).filter(Objects::nonNull).forEach(organizationIds::add);
contractsByDbk.values().stream().map(LeitstandContract::getOrganizationDbk).filter(Objects::nonNull).forEach(organizationDbks::add); contractsByDbk.values().stream().map(LeitstandContract::getOrganizationDbk).filter(Objects::nonNull).forEach(organizationIds::add);
if (person != null && person.getOrganizationDbk() != null) { recordings.stream().map(LeitstandTimeRecording::getPersonDbk).filter(Objects::nonNull).forEach(id -> {});
organizationDbks.add(person.getOrganizationDbk()); List<String> personIds = recordings.stream().map(LeitstandTimeRecording::getPersonDbk).filter(Objects::nonNull).distinct().toList();
Map<String, LeitstandPerson> personsByDbk = indexBy(personRepository.findAllById(personIds), LeitstandPerson::getDbk);
personsByDbk.values().stream().map(LeitstandPerson::getOrganizationDbk).filter(Objects::nonNull).forEach(organizationIds::add);
Map<String, LeitstandOrganization> organizationsByDbk = indexBy(organizationRepository.findAllById(organizationIds), LeitstandOrganization::getDbk);
List<Integer> activityTypeIds = recordings.stream().map(LeitstandTimeRecording::getActivityTypeId).filter(Objects::nonNull).distinct().toList();
Map<Integer, LeitstandActivityType> activityTypesById = indexBy(activityTypeRepository.findAllById(activityTypeIds), LeitstandActivityType::getId);
List<UUID> timeEntryIds = recordings.stream().map(LeitstandTimeRecording::getTimeEntry).filter(Objects::nonNull).map(TimeEntry::getId).filter(Objects::nonNull).distinct().toList();
Map<UUID, TimeEntry> timeEntriesById = timeEntryRepository.findAllById(timeEntryIds).stream().collect(Collectors.toMap(TimeEntry::getId, Function.identity()));
Map<UUID, TimeEntrySearchProjection> existingProjectionsByTimeEntryId = projectionRepository.findByTimeEntry_IdIn(timeEntryIds).stream().collect(Collectors.toMap(p -> p.getTimeEntry().getId(), Function.identity()));
return new ProjectionBuildContext(assignmentsByRecordingDbk, personTaskAssignmentsByDbk, tasksByDbk, costUnitsByDbk,
contractsByDbk, contractPositionsByDbk, organizationsByDbk, personsByDbk, activityTypesById,
timeEntriesById, existingProjectionsByTimeEntryId);
}
private TimeEntrySearchProjection buildProjection(LeitstandTimeRecording recording, ProjectionBuildContext ctx) {
TimeEntry timeEntry = ctx.timeEntriesById.get(recording.getTimeEntry().getId());
if (timeEntry == null) {
throw new IllegalArgumentException("Unknown TIME entry id: " + recording.getTimeEntry().getId());
} }
Map<String, LeitstandOrganization> organizationsByDbk = indexBy(organizationRepository.findAllById(organizationDbks), LeitstandOrganization::getDbk); Document document = timeEntry.getDocument();
LeitstandPerson person = recording.getPersonDbk() == null ? null : ctx.personsByDbk.get(recording.getPersonDbk());
LeitstandActivityType activityType = recording.getActivityTypeId() == null ? null : ctx.activityTypesById.get(recording.getActivityTypeId());
List<LeitstandTimeRecordingAssignment> assignments = ctx.assignmentsByRecordingDbk.getOrDefault(recording.getDbk(), List.of());
List<LeitstandPersonTaskAssignment> personTaskAssignments = assignments.stream()
.map(a -> ctx.personTaskAssignmentsByDbk.get(a.getPersonTaskAssignmentDbk()))
.filter(Objects::nonNull)
.distinct()
.toList();
List<LeitstandTask> orderedTasks = assignments.stream() List<LeitstandTask> orderedTasks = assignments.stream()
.map(a -> ptaByDbk.get(a.getPersonTaskAssignmentDbk())) .map(a -> ctx.personTaskAssignmentsByDbk.get(a.getPersonTaskAssignmentDbk()))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(pta -> tasksByDbk.get(pta.getTaskDbk())) .map(pta -> ctx.tasksByDbk.get(pta.getTaskDbk()))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.distinct() .distinct()
.toList(); .toList();
List<LeitstandCostUnit> orderedCostUnits = assignments.stream() List<LeitstandCostUnit> orderedCostUnits = assignments.stream()
.map(a -> ptaByDbk.get(a.getPersonTaskAssignmentDbk())) .map(a -> ctx.personTaskAssignmentsByDbk.get(a.getPersonTaskAssignmentDbk()))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(pta -> costUnitsByDbk.get(pta.getCostUnitDbk())) .map(pta -> ctx.costUnitsByDbk.get(pta.getCostUnitDbk()))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.distinct() .distinct()
.toList(); .toList();
List<LeitstandContract> orderedContracts = orderedCostUnits.stream() List<LeitstandContract> orderedContracts = orderedCostUnits.stream()
.map(cu -> contractsByDbk.get(cu.getContractDbk())) .map(cu -> ctx.contractsByDbk.get(cu.getContractDbk()))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.distinct() .distinct()
.toList(); .toList();
List<LeitstandContractPosition> orderedContractPositions = orderedCostUnits.stream() List<LeitstandContractPosition> orderedContractPositions = orderedCostUnits.stream()
.map(cu -> contractPositionsByDbk.get(cu.getContractPositionDbk())) .map(cu -> ctx.contractPositionsByDbk.get(cu.getContractPositionDbk()))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.distinct() .distinct()
.toList(); .toList();
List<LeitstandOrganization> orderedOrganizations = new ArrayList<>(); List<LeitstandOrganization> orderedOrganizations = new ArrayList<>();
orderedCostUnits.stream().map(cu -> organizationsByDbk.get(cu.getOrganizationDbk())).filter(Objects::nonNull).forEach(org -> { if (!orderedOrganizations.contains(org)) orderedOrganizations.add(org); }); orderedCostUnits.stream().map(cu -> ctx.organizationsByDbk.get(cu.getOrganizationDbk())).filter(Objects::nonNull).forEach(org -> { if (!orderedOrganizations.contains(org)) orderedOrganizations.add(org); });
orderedContracts.stream().map(c -> organizationsByDbk.get(c.getOrganizationDbk())).filter(Objects::nonNull).forEach(org -> { if (!orderedOrganizations.contains(org)) orderedOrganizations.add(org); }); orderedContracts.stream().map(c -> ctx.organizationsByDbk.get(c.getOrganizationDbk())).filter(Objects::nonNull).forEach(org -> { if (!orderedOrganizations.contains(org)) orderedOrganizations.add(org); });
if (person != null && person.getOrganizationDbk() != null) { if (person != null && person.getOrganizationDbk() != null) {
LeitstandOrganization personOrg = organizationsByDbk.get(person.getOrganizationDbk()); LeitstandOrganization personOrg = ctx.organizationsByDbk.get(person.getOrganizationDbk());
if (personOrg != null && !orderedOrganizations.contains(personOrg)) orderedOrganizations.add(personOrg); if (personOrg != null && !orderedOrganizations.contains(personOrg)) orderedOrganizations.add(personOrg);
} }
@ -176,8 +210,7 @@ public class LeitstandTimeProjectionService {
String summary = buildSummary(recording, primaryTask, primaryCostUnit, primaryOrganization, person); String summary = buildSummary(recording, primaryTask, primaryCostUnit, primaryOrganization, person);
String semanticText = buildSemanticText(timeEntry, recording, person, activityType, orderedTasks, orderedCostUnits, orderedContracts, orderedContractPositions, orderedOrganizations); String semanticText = buildSemanticText(timeEntry, recording, person, activityType, orderedTasks, orderedCostUnits, orderedContracts, orderedContractPositions, orderedOrganizations);
TimeEntrySearchProjection projection = projectionRepository.findByTimeEntry_Id(timeEntry.getId()) TimeEntrySearchProjection projection = ctx.existingProjectionsByTimeEntryId.getOrDefault(timeEntry.getId(), TimeEntrySearchProjection.builder().timeEntry(timeEntry).document(document).build());
.orElseGet(() -> TimeEntrySearchProjection.builder().timeEntry(timeEntry).document(document).build());
projection.setDocument(document); projection.setDocument(document);
projection.setTimeEntry(timeEntry); projection.setTimeEntry(timeEntry);
projection.setSourceSystem(TimeSourceSystem.LEITSTAND); projection.setSourceSystem(TimeSourceSystem.LEITSTAND);
@ -229,6 +262,19 @@ public class LeitstandTimeProjectionService {
return projection; return projection;
} }
/**
 * Lookup maps loaded once for a list of recordings by {@code preloadContext} and then read by
 * {@code buildProjection} for each recording, so that building a projection works from these
 * maps instead of issuing its own repository lookups.
 *
 * @param assignmentsByRecordingDbk time-recording assignments grouped by their time-recording dbk
 * @param personTaskAssignmentsByDbk person-task assignments referenced by those assignments, keyed by dbk
 * @param tasksByDbk tasks referenced by the person-task assignments, keyed by dbk
 * @param costUnitsByDbk cost units referenced by the person-task assignments, keyed by dbk
 * @param contractsByDbk contracts referenced by the cost units, keyed by dbk
 * @param contractPositionsByDbk contract positions referenced by the cost units, keyed by dbk
 * @param organizationsByDbk organizations referenced by cost units, contracts, or persons, keyed by dbk
 * @param personsByDbk persons referenced by the recordings, keyed by dbk
 * @param activityTypesById activity types referenced by the recordings, keyed by id
 * @param timeEntriesById TIME entries linked to the recordings, keyed by entry id
 * @param existingProjectionsByTimeEntryId projections found for those TIME entry ids, keyed by
 *        TIME entry id; {@code buildProjection} reuses a match and builds a new projection otherwise
 */
private record ProjectionBuildContext(
Map<String, List<LeitstandTimeRecordingAssignment>> assignmentsByRecordingDbk,
Map<String, LeitstandPersonTaskAssignment> personTaskAssignmentsByDbk,
Map<String, LeitstandTask> tasksByDbk,
Map<String, LeitstandCostUnit> costUnitsByDbk,
Map<String, LeitstandContract> contractsByDbk,
Map<String, LeitstandContractPosition> contractPositionsByDbk,
Map<String, LeitstandOrganization> organizationsByDbk,
Map<String, LeitstandPerson> personsByDbk,
Map<Integer, LeitstandActivityType> activityTypesById,
Map<UUID, TimeEntry> timeEntriesById,
Map<UUID, TimeEntrySearchProjection> existingProjectionsByTimeEntryId) {
}
private String buildSummary(LeitstandTimeRecording recording, private String buildSummary(LeitstandTimeRecording recording,
LeitstandTask primaryTask, LeitstandTask primaryTask,
LeitstandCostUnit primaryCostUnit, LeitstandCostUnit primaryCostUnit,
@ -283,7 +329,7 @@ public class LeitstandTimeProjectionService {
return sb.toString().trim(); return sb.toString().trim();
} }
private <T> Map<String, T> indexBy(Collection<T> rows, Function<T, String> id) { private <K, T> Map<K, T> indexBy(Collection<T> rows, Function<T, K> id) {
return rows.stream() return rows.stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toMap(id, Function.identity(), (a, b) -> a, LinkedHashMap::new)); .collect(Collectors.toMap(id, Function.identity(), (a, b) -> a, LinkedHashMap::new));

View File

@ -13,10 +13,16 @@ import at.procon.dip.embedding.config.EmbeddingProperties;
import at.procon.dip.embedding.registry.EmbeddingModelRegistry; import at.procon.dip.embedding.registry.EmbeddingModelRegistry;
import at.procon.dip.embedding.service.RepresentationEmbeddingOrchestrator; import at.procon.dip.embedding.service.RepresentationEmbeddingOrchestrator;
import at.procon.dip.search.service.DocumentLexicalIndexService; import at.procon.dip.search.service.DocumentLexicalIndexService;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.UUID;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@Service @Service
@ -35,69 +41,141 @@ public class TimeEntryRepresentationMaterializationService {
private final EmbeddingProperties embeddingProperties; private final EmbeddingProperties embeddingProperties;
private final EmbeddingModelRegistry modelRegistry; private final EmbeddingModelRegistry modelRegistry;
//@Transactional
public void upsertRepresentations(TimeEntrySearchProjection projection) { public void upsertRepresentations(TimeEntrySearchProjection projection) {
if (projection.getSemanticText() == null || projection.getSemanticText().isBlank()) { if (projection == null) {
log.debug("Skipping TIME representation for document {} because semantic text is blank", projection.getDocument().getId()); return;
}
upsertRepresentations(List.of(projection));
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void upsertRepresentations(List<TimeEntrySearchProjection> projections) {
if (projections == null || projections.isEmpty()) {
return; return;
} }
Document document = projection.getDocument(); List<TimeEntrySearchProjection> eligible = projections.stream()
document.setTitle(firstNonBlank(projection.getSummaryText(), projection.getTimeRecordingDesc(), projection.getPrimaryTaskName(), projection.getExternalId())); .filter(projection -> documentId(projection) != null)
document.setSummary(projection.getSummaryText()); .filter(projection -> projection.getSemanticText() != null && !projection.getSemanticText().isBlank())
document.setLanguageCode(firstNonBlank(projection.getLanguageCode(), document.getLanguageCode())); .toList();
if (document.getMimeType() == null || document.getMimeType().isBlank()) { if (eligible.isEmpty()) {
document.setMimeType("application/x-time-entry"); return;
} }
document = documentRepository.save(document);
Optional<DocumentTextRepresentation> existing = representationRepository List<UUID> documentIds = eligible.stream()
.findByDocument_IdAndRepresentationType(document.getId(), RepresentationType.SEMANTIC_TEXT) .map(this::documentId)
.stream() .distinct()
.filter(r -> BUILDER_KEY.equals(r.getBuilderKey()) || r.isPrimaryRepresentation()) .toList();
.findFirst(); Map<UUID, Document> documentsById = documentRepository.findAllById(documentIds).stream()
.collect(java.util.stream.Collectors.toMap(Document::getId, java.util.function.Function.identity(), (a, b) -> a, LinkedHashMap::new));
List<Document> documentsToSave = new ArrayList<>();
for (TimeEntrySearchProjection projection : eligible) {
UUID documentId = documentId(projection);
Document document = documentsById.get(documentId);
if (document == null || documentsToSave.contains(document)) {
continue;
}
document.setTitle(firstNonBlank(projection.getSummaryText(), projection.getTimeRecordingDesc(), projection.getPrimaryTaskName(), projection.getExternalId()));
document.setSummary(projection.getSummaryText());
document.setLanguageCode(firstNonBlank(projection.getLanguageCode(), document.getLanguageCode()));
if (document.getMimeType() == null || document.getMimeType().isBlank()) {
document.setMimeType("application/x-time-entry");
}
documentsToSave.add(document);
}
if (!documentsToSave.isEmpty()) {
documentRepository.saveAll(documentsToSave);
documentRepository.flush();
}
boolean changed = existing.isEmpty() List<DocumentTextRepresentation> changedExisting = new ArrayList<>();
|| !projection.getSemanticText().equals(existing.get().getTextBody()) List<TimeEntrySearchProjection> newRepresentationProjections = new ArrayList<>();
|| !equalsNullable(projection.getLanguageCode(), existing.get().getLanguageCode()) List<UUID> changedRepresentationIds = new ArrayList<>();
|| !BUILDER_KEY.equals(existing.get().getBuilderKey()); List<DocumentTextRepresentation> newlyCreatedRepresentations = new ArrayList<>();
Document finalDocument = document; for (TimeEntrySearchProjection projection : eligible) {
DocumentTextRepresentation semantic = existing Document document = documentsById.get(documentId(projection));
.map(found -> changed ? updateRepresentation(found, projection) : found) if (document == null) {
.orElseGet(() -> documentRepresentationService.addRepresentation(new AddDocumentTextRepresentationCommand( continue;
finalDocument.getId(), }
null, Optional<DocumentTextRepresentation> existing = representationRepository
RepresentationType.SEMANTIC_TEXT, .findByDocument_IdAndRepresentationType(document.getId(), RepresentationType.SEMANTIC_TEXT)
BUILDER_KEY, .stream()
projection.getLanguageCode(), .filter(r -> BUILDER_KEY.equals(r.getBuilderKey()) || r.isPrimaryRepresentation())
null, .findFirst();
null,
null,
null,
true,
projection.getSemanticText(),
false
)));
if (changed boolean changed = existing.isEmpty()
&& embeddingProperties.isEnabled() || !projection.getSemanticText().equals(existing.get().getTextBody())
|| !equalsNullable(projection.getLanguageCode(), existing.get().getLanguageCode())
|| !BUILDER_KEY.equals(existing.get().getBuilderKey());
if (!changed) {
continue;
}
if (existing.isPresent()) {
DocumentTextRepresentation found = existing.get();
found.setBuilderKey(BUILDER_KEY);
found.setLanguageCode(projection.getLanguageCode());
found.setPrimaryRepresentation(true);
found.setTextBody(projection.getSemanticText());
found.setCharCount(projection.getSemanticText().length());
changedExisting.add(found);
} else {
newRepresentationProjections.add(projection);
}
}
if (!changedExisting.isEmpty()) {
representationRepository.saveAll(changedExisting);
representationRepository.flush();
changedExisting.stream().map(DocumentTextRepresentation::getId).forEach(changedRepresentationIds::add);
}
for (TimeEntrySearchProjection projection : newRepresentationProjections) {
Document document = documentsById.get(documentId(projection));
if (document == null) {
continue;
}
DocumentTextRepresentation created = documentRepresentationService.addRepresentation(new AddDocumentTextRepresentationCommand(
document.getId(),
null,
RepresentationType.SEMANTIC_TEXT,
BUILDER_KEY,
projection.getLanguageCode(),
null,
null,
null,
null,
true,
projection.getSemanticText(),
false
));
newlyCreatedRepresentations.add(created);
changedRepresentationIds.add(created.getId());
}
for (UUID representationId : changedRepresentationIds) {
lexicalIndexService.indexRepresentation(representationId);
}
if (embeddingProperties.isEnabled()
&& timeDomainProperties.getLeitstand().isQueueEmbeddings() && timeDomainProperties.getLeitstand().isQueueEmbeddings()
&& embeddingProperties.getDefaultDocumentModel() != null && !embeddingProperties.getDefaultDocumentModel().isBlank()) { && embeddingProperties.getDefaultDocumentModel() != null
&& !embeddingProperties.getDefaultDocumentModel().isBlank()) {
String modelKey = modelRegistry.getRequiredDefaultDocumentModelKey(); String modelKey = modelRegistry.getRequiredDefaultDocumentModelKey();
embeddingOrchestrator.enqueueRepresentation(document.getId(), semantic.getId(), modelKey); for (DocumentTextRepresentation representation : changedExisting) {
embeddingOrchestrator.enqueueRepresentation(representation.getDocument().getId(), representation.getId(), modelKey);
}
for (DocumentTextRepresentation representation : newlyCreatedRepresentations) {
embeddingOrchestrator.enqueueRepresentation(representation.getDocument().getId(), representation.getId(), modelKey);
}
} }
} }
private DocumentTextRepresentation updateRepresentation(DocumentTextRepresentation existing, TimeEntrySearchProjection projection) { private UUID documentId(TimeEntrySearchProjection projection) {
existing.setBuilderKey(BUILDER_KEY); Document document = projection == null ? null : projection.getDocument();
existing.setLanguageCode(projection.getLanguageCode()); return document == null ? null : document.getId();
existing.setPrimaryRepresentation(true);
existing.setTextBody(projection.getSemanticText());
existing.setCharCount(projection.getSemanticText().length());
DocumentTextRepresentation saved = representationRepository.saveAndFlush(existing);
lexicalIndexService.indexRepresentation(saved.getId());
return saved;
} }
private boolean equalsNullable(String left, String right) { private boolean equalsNullable(String left, String right) {

View File

@ -37,6 +37,8 @@ public class LeitstandTimeSelectiveMaterializationStartupRunner implements Appli
log.info("Completed selective Leitstand TIME materialization for personNumber={}. Processed {} recordings", cfg.getSelectiveMaterializationPersonNumber(), count); log.info("Completed selective Leitstand TIME materialization for personNumber={}. Processed {} recordings", cfg.getSelectiveMaterializationPersonNumber(), count);
return; return;
} }
throw new IllegalStateException("dip.time.leitstand.startup-selective-materialization-enabled=true requires either selective-materialization-person-dbk or selective-materialization-person-number"); log.info("Starting Leitstand TIME materialization for all imported recordings (rebuildProjection={})", rebuildProjection);
int count = importService.materializeCanonicalTimeEntriesForAll(rebuildProjection);
log.info("Completed Leitstand TIME materialization for all imported recordings. Processed {} recordings", count);
} }
} }