clustering

This commit is contained in:
trifonovt 2026-04-24 16:34:38 +02:00
parent c6efbf40f6
commit 979f1ba18e
75 changed files with 4637 additions and 2 deletions

View File

@ -0,0 +1,40 @@
# TIME selective materialization by person
This NEW-only patch adds the ability to materialize canonical `TIME.time_entry` rows and refresh projection/representations only for Leitstand time recordings that belong to one selected person after the raw `TIME.ls_*` import is already present.
## Service methods
- `LeitstandTimeImportService.materializeCanonicalTimeEntriesForPersonDbk(String personDbk, boolean rebuildProjection)`
- `LeitstandTimeImportService.materializeCanonicalTimeEntriesForPersonNumber(Integer personNumber, boolean rebuildProjection)`
- `LeitstandTimeProjectionService.refreshForPersonDbk(String personDbk)`
## Optional startup runner
Enable with:
```yaml
dip:
time:
leitstand:
startup-selective-materialization-enabled: true
selective-materialization-person-dbk: 100919970619190804070001
selective-materialization-build-projection: true
```
or:
```yaml
dip:
time:
leitstand:
startup-selective-materialization-enabled: true
selective-materialization-person-number: 12345
selective-materialization-build-projection: true
```
## Notes
- intended for already imported `TIME.ls_*` rows
- no legacy code changes
- no raw source sync is triggered by this runner
- if projection rebuild is enabled, representations/embedding enqueueing continue to use the existing T3 behavior

View File

@ -0,0 +1,57 @@
# Python clustering backend for DBSCAN and advanced algorithms
This patch adds a dedicated Python service for clustering algorithms that are better supported in the Python scientific stack than in Java.
## Why Python for this step
The Spring module remains the orchestrator for:
- embedding selection
- run metadata
- result persistence
- cluster browsing APIs
The Python backend executes the actual clustering for algorithms such as:
- `DBSCAN`
- `HDBSCAN`
- `MINI_BATCH_KMEANS`
- `AGGLOMERATIVE`
- `KMEANS` with optional reduction
## Spring-side contract changes in this patch
The Spring request model now supports generic algorithm parameters through `parameters` instead of only `k`.
Examples:
- KMeans: `{ "k": 25 }`
- DBSCAN: `{ "eps": 0.25, "minSamples": 5 }`
- HDBSCAN: `{ "minClusterSize": 15, "minSamples": 5 }`
- Agglomerative: `{ "k": 20, "linkage": "average", "metric": "euclidean" }`
The Python response is now mapped with:
- `noise`
- `membershipScore`
- `distanceToCentroid`
- noise cluster rows
- `noiseCount`
Those values are persisted back into:
- `doc.doc_embedding_cluster`
- `doc.doc_embedding_cluster_assignment`
- `doc.doc_embedding_cluster_run`
## Recommended defaults for embeddings
For high-dimensional text embeddings, use:
- `normalizeVectors=true`
- `reductionMethod=PCA`
- `reductionDimensions=50..150`
Typical starting points:
- DBSCAN: `eps=0.20..0.35`, `minSamples=5`
- HDBSCAN: `minClusterSize=10..30`, `minSamples=3..10`
The right values still depend on:
- embedding model
- whether vectors are normalized
- whether full documents or chunks are clustered
- the semantic density of the selected dataset

View File

@ -0,0 +1,100 @@
{
"info": {
"name": "DIP Clustering Dual Python Modes",
"_postman_id": "4c2f4f68-c9c3-4977-9627-b4f7422dd001",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json",
"description": "Direct vector-upload and compact runId Python endpoints."
},
"variable": [
{
"key": "pythonBaseUrl",
"value": "http://localhost:8001"
},
{
"key": "runId",
"value": ""
}
],
"item": [
{
"name": "Health",
"item": [
{
"name": "GET /health",
"request": {
"method": "GET",
"url": {
"raw": "{{pythonBaseUrl}}/health",
"host": [
"{{pythonBaseUrl}}"
],
"path": [
"health"
]
}
}
}
]
},
{
"name": "Direct vector upload",
"item": [
{
"name": "POST /cluster",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{pythonBaseUrl}}/cluster",
"host": [
"{{pythonBaseUrl}}"
],
"path": [
"cluster"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"algorithm\": \"KMEANS\",\n \"parameters\": {\n \"k\": 2,\n \"normalizeVectors\": true\n },\n \"reductionMethod\": \"NONE\",\n \"reductionDimensions\": null,\n \"items\": [\n {\n \"embeddingId\": \"11111111-1111-1111-1111-111111111111\",\n \"documentId\": \"22222222-2222-2222-2222-222222222222\",\n \"representationId\": \"33333333-3333-3333-3333-333333333333\",\n \"vector\": [\n 0.1,\n 0.2,\n 0.3\n ]\n },\n {\n \"embeddingId\": \"11111111-1111-1111-1111-111111111112\",\n \"documentId\": \"22222222-2222-2222-2222-222222222223\",\n \"representationId\": \"33333333-3333-3333-3333-333333333334\",\n \"vector\": [\n 0.11,\n 0.19,\n 0.31\n ]\n }\n ]\n}"
}
}
}
]
},
{
"name": "Compact runId mode",
"item": [
{
"name": "POST /cluster-run",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{pythonBaseUrl}}/cluster-run",
"host": [
"{{pythonBaseUrl}}"
],
"path": [
"cluster-run"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"runId\": \"{{runId}}\"\n}"
}
}
}
]
}
]
}

View File

@ -0,0 +1,131 @@
{
"info": {
"name": "DIP Clustering Phase E Compact Run",
"_postman_id": "57e745df-cb97-4a13-8c74-9e5c689ef0ac",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json",
"description": "Phase E compact run execution: Spring keeps metadata, Python receives only runId."
},
"variable": [
{
"key": "springBaseUrl",
"value": "http://localhost:8889/api"
},
{
"key": "pythonBaseUrl",
"value": "http://localhost:8001"
},
{
"key": "runId",
"value": ""
}
],
"item": [
{
"name": "Python Health",
"request": {
"method": "GET",
"url": {
"raw": "{{pythonBaseUrl}}/health",
"host": [
"{{pythonBaseUrl}}"
],
"path": [
"health"
]
}
}
},
{
"name": "Python Cluster Run",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{pythonBaseUrl}}/cluster-run",
"host": [
"{{pythonBaseUrl}}"
],
"path": [
"cluster-run"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"runId\": \"{{runId}}\"\n}"
}
}
},
{
"name": "Create TED DBSCAN run",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{springBaseUrl}}/v1/dip/clustering/runs",
"host": [
"{{springBaseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"name\": \"TED notices DBSCAN PCA200\",\n \"algorithm\": \"DBSCAN\",\n \"executionBackend\": \"PYTHON_REMOTE\",\n \"reduction\": {\n \"method\": \"PCA\",\n \"targetDimensions\": 200\n },\n \"selection\": {\n \"documentTypes\": [\n \"TED_NOTICE\"\n ],\n \"representationTypes\": [\n \"SEMANTIC_TEXT\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n },\n \"parameters\": {\n \"eps\": 0.25,\n \"minSamples\": 5,\n \"metric\": \"euclidean\",\n \"normalizeVectors\": true\n }\n}"
}
}
},
{
"name": "Queue Run",
"request": {
"method": "POST",
"url": {
"raw": "{{springBaseUrl}}/v1/dip/clustering/runs/{{runId}}/start",
"host": [
"{{springBaseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs",
"{{runId}}",
"start"
]
}
}
},
{
"name": "Get Run",
"request": {
"method": "GET",
"url": {
"raw": "{{springBaseUrl}}/v1/dip/clustering/runs/{{runId}}",
"host": [
"{{springBaseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs",
"{{runId}}"
]
}
}
}
]
}

View File

@ -0,0 +1,157 @@
{
"info": {
"name": "DIP Clustering Phase C",
"_postman_id": "fa4b1e24-7d8d-4b1a-bd67-0f3f1b601111",
"description": "Operational Postman collection for clustering sets, async runs, cancellation, and text-aware result inspection.",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"variable": [
{
"key": "baseUrl",
"value": "http://localhost:8080/api"
},
{
"key": "runId",
"value": ""
},
{
"key": "clusterSetId",
"value": ""
},
{
"key": "clusterId",
"value": ""
}
],
"item": [
{
"name": "Cluster Sets",
"item": [
{
"name": "Create TED cluster set",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": "{{baseUrl}}/v1/dip/clustering/sets",
"body": {
"mode": "raw",
"raw": "{\n \"code\": \"TED_NOTICE_E5_PRIMARY\",\n \"name\": \"TED notices primary semantic text\",\n \"description\": \"Saved TED notice clustering selection\",\n \"selection\": {\n \"documentTypes\": [\n \"TED_NOTICE\"\n ],\n \"representationTypes\": [\n \"SEMANTIC_TEXT\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n },\n \"active\": true\n}"
}
}
},
{
"name": "List cluster sets",
"request": {
"method": "GET",
"url": "{{baseUrl}}/v1/dip/clustering/sets"
}
},
{
"name": "Get cluster set",
"request": {
"method": "GET",
"url": "{{baseUrl}}/v1/dip/clustering/sets/{{clusterSetId}}"
}
}
]
},
{
"name": "Runs",
"item": [
{
"name": "Create TED KMeans run",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": "{{baseUrl}}/v1/dip/clustering/runs",
"body": {
"mode": "raw",
"raw": "{\n \"name\": \"TED notices KMeans async run 1\",\n \"algorithm\": \"KMEANS\",\n \"executionBackend\": \"JAVA_LOCAL\",\n \"reduction\": {\n \"method\": \"NONE\"\n },\n \"selection\": {\n \"documentTypes\": [\n \"TED_NOTICE\"\n ],\n \"representationTypes\": [\n \"SEMANTIC_TEXT\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n },\n \"k\": 25\n}"
}
}
},
{
"name": "Create HDBSCAN Python run",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": "{{baseUrl}}/v1/dip/clustering/runs",
"body": {
"mode": "raw",
"raw": "{\n \"name\": \"TED notices HDBSCAN PCA100\",\n \"algorithm\": \"HDBSCAN\",\n \"executionBackend\": \"PYTHON_REMOTE\",\n \"reduction\": {\n \"method\": \"PCA\",\n \"targetDimensions\": 100\n },\n \"selection\": {\n \"documentTypes\": [\n \"TED_NOTICE\"\n ],\n \"representationTypes\": [\n \"SEMANTIC_TEXT\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n },\n \"k\": 25\n}"
}
}
},
{
"name": "Start run async",
"request": {
"method": "POST",
"url": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}/start"
}
},
{
"name": "Cancel run",
"request": {
"method": "POST",
"url": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}/cancel"
}
},
{
"name": "Get run",
"request": {
"method": "GET",
"url": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}"
}
},
{
"name": "List runs",
"request": {
"method": "GET",
"url": "{{baseUrl}}/v1/dip/clustering/runs?status=COMPLETED"
}
}
]
},
{
"name": "Results",
"item": [
{
"name": "List clusters",
"request": {
"method": "GET",
"url": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}/clusters"
}
},
{
"name": "Assignments with text",
"request": {
"method": "GET",
"url": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}/assignments?includeText=true"
}
},
{
"name": "Cluster members with text",
"request": {
"method": "GET",
"url": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}/clusters/{{clusterId}}/members?includeText=true"
}
}
]
}
]
}

View File

@ -0,0 +1,150 @@
{
"info": {
"name": "DIP Clustering Phase D",
"_postman_id": "0c39a7cf-8fde-43f9-8fdb-5d8890ad7676",
"description": "Spring clustering API examples with generic algorithm parameters and remote Python backend.",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"variable": [
{
"key": "baseUrl",
"value": "http://localhost:8080"
},
{
"key": "runId",
"value": ""
}
],
"item": [
{
"name": "Create DBSCAN run for TED notices",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"name\": \"TED notices DBSCAN PCA100\",\n \"algorithm\": \"DBSCAN\",\n \"executionBackend\": \"PYTHON_REMOTE\",\n \"reduction\": {\n \"method\": \"PCA\",\n \"targetDimensions\": 100\n },\n \"selection\": {\n \"documentTypes\": [\n \"TED_NOTICE\"\n ],\n \"representationTypes\": [\n \"SEMANTIC_TEXT\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n },\n \"parameters\": {\n \"eps\": 0.25,\n \"minSamples\": 5,\n \"metric\": \"euclidean\",\n \"normalizeVectors\": true\n }\n}"
}
}
},
{
"name": "Create HDBSCAN run for Leitstand TIME",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"name\": \"Leitstand TIME HDBSCAN PCA50\",\n \"algorithm\": \"HDBSCAN\",\n \"executionBackend\": \"PYTHON_REMOTE\",\n \"reduction\": {\n \"method\": \"PCA\",\n \"targetDimensions\": 50\n },\n \"selection\": {\n \"documentTypes\": [\n \"TIME_ENTRY\"\n ],\n \"builderKeys\": [\n \"time-entry-structured-text\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n },\n \"parameters\": {\n \"minClusterSize\": 15,\n \"minSamples\": 5,\n \"metric\": \"euclidean\",\n \"clusterSelectionMethod\": \"eom\",\n \"normalizeVectors\": true\n }\n}"
}
}
},
{
"name": "Create Agglomerative run",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"name\": \"TED notices Agglomerative\",\n \"algorithm\": \"AGGLOMERATIVE\",\n \"executionBackend\": \"PYTHON_REMOTE\",\n \"reduction\": {\n \"method\": \"PCA\",\n \"targetDimensions\": 100\n },\n \"selection\": {\n \"documentTypes\": [\n \"TED_NOTICE\"\n ],\n \"representationTypes\": [\n \"SEMANTIC_TEXT\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n },\n \"parameters\": {\n \"k\": 25,\n \"linkage\": \"average\",\n \"metric\": \"euclidean\",\n \"normalizeVectors\": true\n }\n}"
}
}
},
{
"name": "Start run",
"request": {
"method": "POST",
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}/start",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs",
"{{runId}}",
"start"
]
}
}
},
{
"name": "Assignments with text",
"request": {
"method": "GET",
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}/assignments?includeText=true",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs",
"{{runId}}",
"assignments"
],
"query": [
{
"key": "includeText",
"value": "true"
}
]
}
}
}
]
}

View File

@ -0,0 +1,81 @@
{
"info": {
"name": "DIP Python Clustering Service",
"_postman_id": "59df0e88-01d6-42a4-9071-195b43f96787",
"description": "Direct calls to the remote Python clustering service.",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"variable": [
{
"key": "baseUrl",
"value": "http://localhost:8001"
}
],
"item": [
{
"name": "Health",
"request": {
"method": "GET",
"url": {
"raw": "{{baseUrl}}/health",
"host": [
"{{baseUrl}}"
],
"path": [
"health"
]
}
}
},
{
"name": "DBSCAN PCA request",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{baseUrl}}/cluster",
"host": [
"{{baseUrl}}"
],
"path": [
"cluster"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"algorithm\": \"DBSCAN\",\n \"parameters\": {\n \"eps\": 0.25,\n \"minSamples\": 2,\n \"normalizeVectors\": false\n },\n \"reductionMethod\": \"NONE\",\n \"items\": [\n {\n \"embeddingId\": \"11111111-1111-1111-1111-111111111111\",\n \"documentId\": \"22222222-2222-2222-2222-222222222221\",\n \"representationId\": \"33333333-3333-3333-3333-333333333331\",\n \"vector\": [\n 0.0,\n 0.0\n ]\n },\n {\n \"embeddingId\": \"11111111-1111-1111-1111-111111111112\",\n \"documentId\": \"22222222-2222-2222-2222-222222222222\",\n \"representationId\": \"33333333-3333-3333-3333-333333333332\",\n \"vector\": [\n 0.05,\n 0.0\n ]\n },\n {\n \"embeddingId\": \"11111111-1111-1111-1111-111111111113\",\n \"documentId\": \"22222222-2222-2222-2222-222222222223\",\n \"representationId\": \"33333333-3333-3333-3333-333333333333\",\n \"vector\": [\n 10.0,\n 10.0\n ]\n }\n ]\n}"
}
}
},
{
"name": "HDBSCAN PCA request",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{baseUrl}}/cluster",
"host": [
"{{baseUrl}}"
],
"path": [
"cluster"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"algorithm\": \"HDBSCAN\",\n \"parameters\": {\n \"minClusterSize\": 2,\n \"minSamples\": 1,\n \"normalizeVectors\": false\n },\n \"reductionMethod\": \"NONE\",\n \"items\": [\n {\n \"embeddingId\": \"11111111-1111-1111-1111-111111111121\",\n \"documentId\": \"22222222-2222-2222-2222-222222222231\",\n \"representationId\": \"33333333-3333-3333-3333-333333333341\",\n \"vector\": [\n 0.0,\n 0.0\n ]\n },\n {\n \"embeddingId\": \"11111111-1111-1111-1111-111111111122\",\n \"documentId\": \"22222222-2222-2222-2222-222222222232\",\n \"representationId\": \"33333333-3333-3333-3333-333333333342\",\n \"vector\": [\n 0.03,\n 0.01\n ]\n },\n {\n \"embeddingId\": \"11111111-1111-1111-1111-111111111123\",\n \"documentId\": \"22222222-2222-2222-2222-222222222233\",\n \"representationId\": \"33333333-3333-3333-3333-333333333343\",\n \"vector\": [\n 5.0,\n 5.0\n ]\n },\n {\n \"embeddingId\": \"11111111-1111-1111-1111-111111111124\",\n \"documentId\": \"22222222-2222-2222-2222-222222222234\",\n \"representationId\": \"33333333-3333-3333-3333-333333333344\",\n \"vector\": [\n 5.05,\n 4.98\n ]\n }\n ]\n}"
}
}
}
]
}

View File

@ -0,0 +1,248 @@
{
"info": {
"name": "DIP Clustering Phase A",
"_postman_id": "5d5f3a8f-1c0c-4f7a-9e14-4d6f2c6d8f3a",
"description": "Sample Postman collection for DIP clustering Phase A endpoints.\n\nVariables:\n- baseUrl: Spring Boot base URL\n- runId: cluster run id returned by create run\n\nThis collection contains example requests for TED notice embeddings and Leitstand TIME entry embeddings.",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"variable": [
{
"key": "baseUrl",
"value": "http://localhost:8080"
},
{
"key": "runId",
"value": ""
}
],
"item": [
{
"name": "Selection",
"item": [
{
"name": "Count TED selection",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/selection/count",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"selection",
"count"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"documentTypes\": [\n \"TED_NOTICE\"\n ],\n \"representationTypes\": [\n \"SEMANTIC_TEXT\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n}"
},
"description": "Counts the number of completed TED_NOTICE embeddings eligible for clustering."
},
"response": []
},
{
"name": "Count Leitstand TIME selection",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/selection/count",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"selection",
"count"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"documentTypes\": [\n \"TIME_ENTRY\"\n ],\n \"builderKeys\": [\n \"time-entry-structured-text\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n}"
},
"description": "Counts the number of completed Leitstand TIME_ENTRY embeddings eligible for clustering."
},
"response": []
}
]
},
{
"name": "Runs",
"item": [
{
"name": "Create TED KMeans run",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"name\": \"TED notices KMeans run 1\",\n \"algorithm\": \"KMEANS\",\n \"selection\": {\n \"documentTypes\": [\n \"TED_NOTICE\"\n ],\n \"representationTypes\": [\n \"SEMANTIC_TEXT\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n },\n \"k\": 25\n}"
},
"description": "Creates a cluster run for TED notice embeddings. Copy the returned id into the Postman variable runId."
},
"response": []
},
{
"name": "Create Leitstand TIME KMeans run",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs"
]
},
"body": {
"mode": "raw",
"raw": "{\n \"name\": \"Leitstand TIME KMeans run 1\",\n \"algorithm\": \"KMEANS\",\n \"selection\": {\n \"documentTypes\": [\n \"TIME_ENTRY\"\n ],\n \"builderKeys\": [\n \"time-entry-structured-text\"\n ],\n \"embeddingStatuses\": [\n \"COMPLETED\"\n ],\n \"primaryRepresentationOnly\": true\n },\n \"k\": 15\n}"
},
"description": "Creates a cluster run for Leitstand TIME_ENTRY embeddings. Copy the returned id into the Postman variable runId."
},
"response": []
},
{
"name": "Start run",
"request": {
"method": "POST",
"header": [],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}/start",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs",
"{{runId}}",
"start"
]
},
"description": "Starts clustering for the run stored in the runId variable."
},
"response": []
},
{
"name": "Get run",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs",
"{{runId}}"
]
},
"description": "Returns run metadata and status."
},
"response": []
}
]
},
{
"name": "Results",
"item": [
{
"name": "Get clusters for run",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}/clusters",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs",
"{{runId}}",
"clusters"
]
},
"description": "Lists clusters discovered in the run."
},
"response": []
},
{
"name": "Get assignments for run",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{baseUrl}}/v1/dip/clustering/runs/{{runId}}/assignments",
"host": [
"{{baseUrl}}"
],
"path": [
"v1",
"dip",
"clustering",
"runs",
"{{runId}}",
"assignments"
]
},
"description": "Lists embedding-to-cluster assignments for the run."
},
"response": []
}
]
}
]
}

View File

@ -0,0 +1,19 @@
FROM python:3.11-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
WORKDIR /app
RUN apt-get update \
&& apt-get install -y --no-install-recommends build-essential \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app ./app
EXPOSE 8001
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8001"]

View File

@ -0,0 +1,204 @@
# DIP Clustering Service
Remote Python clustering backend for the DIP Spring clustering module.
## Main execution mode
The preferred execution mode is now:
- Spring keeps run metadata, selection snapshot, and lifecycle.
- Spring sends only a compact request containing `runId`.
- Python loads the run metadata and selected embeddings directly from Postgres.
- Python returns compact assignments keyed by `embeddingId`.
This avoids sending the full embedding matrix through HTTP.
## Implemented algorithms
- `KMEANS`
- `MINI_BATCH_KMEANS`
- `DBSCAN`
- `HDBSCAN`
- `AGGLOMERATIVE`
## Implemented reductions
- `NONE`
- `PCA`
- `UMAP`
## API
### `GET /health`
Returns service status and supported algorithms/reduction methods.
### `POST /cluster-run`
Preferred endpoint. Accepts only the cluster run id.
Example request body:
```json
{
"runId": "6c3bc3a3-24b0-47a5-9e35-92dd4b7275f8"
}
```
This service supports two remote execution modes at the same time:
- `POST /cluster`
- Spring uploads embeddings in the request body.
- This keeps the original implementation intact.
- `POST /cluster-run`
- Spring sends only `runId`.
- Python loads run metadata and embeddings directly from Postgres.
## Start
```powershell
py -3.11 -m venv .venv
.\.venv\Scripts\python.exe -m pip install --upgrade pip
.\.venv\Scripts\python.exe -m pip install -r requirements.txt
```
Configure DB access for `/cluster-run` with either:
### `POST /cluster`
Accepts the Spring `PythonClusteringRequest` payload and returns `PythonClusteringResponse`.
Example request body:
```json
{
"algorithm": "DBSCAN",
"parameters": {
"eps": 0.25,
"minSamples": 5,
"metric": "euclidean",
"normalizeVectors": true
},
"reductionMethod": "PCA",
"reductionDimensions": 100,
"items": [
{
"embeddingId": "11111111-1111-1111-1111-111111111111",
"documentId": "22222222-2222-2222-2222-222222222222",
"representationId": "33333333-3333-3333-3333-333333333333",
"vector": [0.1, 0.2, 0.3]
}
]
}
```
## Parameters by algorithm
### KMEANS
- `k` required
- `randomState` optional, default `42`
- `nInit` optional, default `10`
- `maxIter` optional, default `300`
### MINI_BATCH_KMEANS
- `k` required
- `batchSize` optional
- `randomState` optional, default `42`
- `nInit` optional, default `10`
- `maxIter` optional, default `300`
### DBSCAN
- `eps` required
- `minSamples` optional, default `5`
- `metric` optional, default `euclidean`
- `algorithm` optional, default `auto`
- `nJobs` optional, default `-1`
### HDBSCAN
- `minClusterSize` optional, default `10`
- `minSamples` optional
- `metric` optional, default `euclidean`
- `clusterSelectionMethod` optional, default `eom`
### AGGLOMERATIVE
- `k` required
- `linkage` optional, default `average`
- `metric` optional, default `euclidean`
- `computeDistances` optional, default `false`
## Shared parameters
- `normalizeVectors` optional, default `true`
- `randomState` optional, used by `KMEANS`, `MINI_BATCH_KMEANS`, `PCA`, `UMAP`
## UMAP reduction parameters
- `reductionMetric` optional, default `cosine`
- `umapNeighbors` optional, default `15`
- `umapMinDist` optional, default `0.0`
## Local run
## Required database configuration
Set either:
- `CLUSTERING_DB_DSN`
- or `DATABASE_URL`
- or `CLUSTERING_DB_HOST`, `CLUSTERING_DB_PORT`, `CLUSTERING_DB_NAME`, `CLUSTERING_DB_USER`, `CLUSTERING_DB_PASSWORD`
Example:
```bash
export CLUSTERING_DB_DSN=postgresql://postgres:postgres@localhost:5432/dip
```
## Local run on Windows
```powershell
$env:CLUSTERING_DB_DSN="postgresql://postgres:postgres@localhost:5432/dip"
.\.venv\Scripts\python.exe -m uvicorn app.main:app --host 0.0.0.0 --port 8001 --reload
```
## Docker run
```bash
docker build -t dip-clustering-service .
docker run --rm -p 8001:8001 dip-clustering-service
```
## Spring configuration
Use the original request-upload mode:
```yaml
dip:
clustering:
python:
enabled: true
base-url: http://localhost:8001
cluster-path: /cluster
cluster-run-path: /cluster-run
request-mode: INLINE_VECTORS
connect-timeout: 30s
read-timeout: 30m
```
Use compact `runId` mode:
```yaml
dip:
clustering:
python:
enabled: true
base-url: http://localhost:8001
cluster-path: /cluster
cluster-run-path: /cluster-run
request-mode: RUN_ID
connect-timeout: 30s
read-timeout: 30m
```
`INLINE_VECTORS` is the default if `request-mode` is omitted.

View File

@ -0,0 +1,311 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import numpy as np
from sklearn.cluster import AgglomerativeClustering, DBSCAN, KMeans, MiniBatchKMeans
from sklearn.decomposition import PCA
from sklearn.preprocessing import normalize
try:
import hdbscan
except Exception: # pragma: no cover - runtime dependency guard
hdbscan = None
try:
import umap
except Exception: # pragma: no cover - runtime dependency guard
umap = None
from .models import (
ClusteringAlgorithm,
PythonAssignment,
PythonCluster,
PythonClusteringItem,
PythonClusteringRequest,
PythonClusteringResponse,
ReductionMethod,
RunMetadata,
)
class ClusteringServiceError(ValueError):
pass
@dataclass
class PreparedData:
original: np.ndarray
transformed: np.ndarray
items: list[PythonClusteringItem]
def cluster_embeddings(request: PythonClusteringRequest) -> PythonClusteringResponse:
return cluster_items(
algorithm=request.algorithm,
parameters=request.parameters or {},
reduction_method=request.reductionMethod,
reduction_dimensions=request.reductionDimensions,
items=request.items,
)
def cluster_run(metadata: RunMetadata, items: list[PythonClusteringItem]) -> PythonClusteringResponse:
return cluster_items(
algorithm=metadata.algorithm,
parameters=metadata.parameters or {},
reduction_method=metadata.reductionMethod,
reduction_dimensions=metadata.reductionDimensions,
items=items,
)
def cluster_items(
algorithm: ClusteringAlgorithm,
parameters: dict[str, Any],
reduction_method: ReductionMethod,
reduction_dimensions: int | None,
items: list[PythonClusteringItem],
) -> PythonClusteringResponse:
if not items:
raise ClusteringServiceError("Request contains no items")
prepared = _prepare_data(
items=items,
parameters=parameters,
reduction_method=reduction_method,
reduction_dimensions=reduction_dimensions,
)
labels, membership_scores = _run_algorithm(
algorithm=algorithm,
vectors=prepared.transformed,
parameters=parameters,
)
return _build_response(prepared, labels, membership_scores)
def _prepare_data(
items: list[PythonClusteringItem],
parameters: dict[str, Any],
reduction_method: ReductionMethod,
reduction_dimensions: int | None,
) -> PreparedData:
vectors = np.asarray([item.vector for item in items], dtype=np.float32)
if vectors.ndim != 2 or vectors.shape[0] == 0:
raise ClusteringServiceError("Vectors must form a non-empty 2D array")
if _bool_param(parameters, "normalizeVectors", True):
vectors = normalize(vectors, norm="l2")
transformed = vectors
if reduction_method == ReductionMethod.PCA:
target_dims = reduction_dimensions
if target_dims is None:
raise ClusteringServiceError("PCA reduction requires reductionDimensions")
max_components = min(transformed.shape[0], transformed.shape[1])
if target_dims <= 0 or target_dims > max_components:
raise ClusteringServiceError(
f"PCA reductionDimensions must be between 1 and {max_components}"
)
pca = PCA(
n_components=target_dims,
random_state=_int_param(parameters, "randomState", 42),
)
transformed = pca.fit_transform(transformed)
elif reduction_method == ReductionMethod.UMAP:
target_dims = reduction_dimensions
if target_dims is None:
raise ClusteringServiceError("UMAP reduction requires reductionDimensions")
if umap is None:
raise ClusteringServiceError("UMAP reduction requested but umap-learn is not installed")
reducer = umap.UMAP(
n_components=target_dims,
metric=_str_param(parameters, "reductionMetric", "cosine"),
n_neighbors=_int_param(parameters, "umapNeighbors", 15),
min_dist=_float_param(parameters, "umapMinDist", 0.0),
random_state=_int_param(parameters, "randomState", 42),
)
transformed = reducer.fit_transform(transformed)
return PreparedData(original=vectors, transformed=np.asarray(transformed, dtype=np.float32), items=items)
def _run_algorithm(
algorithm: ClusteringAlgorithm,
vectors: np.ndarray,
parameters: dict[str, Any],
) -> tuple[np.ndarray, np.ndarray | None]:
if algorithm == ClusteringAlgorithm.KMEANS:
k = _required_int_param(parameters, "k")
model = KMeans(
n_clusters=k,
random_state=_int_param(parameters, "randomState", 42),
n_init=_int_param(parameters, "nInit", 10),
max_iter=_int_param(parameters, "maxIter", 300),
)
labels = model.fit_predict(vectors)
return np.asarray(labels, dtype=np.int32), None
if algorithm == ClusteringAlgorithm.MINI_BATCH_KMEANS:
k = _required_int_param(parameters, "k")
batch_size = _int_param(parameters, "batchSize", min(max(k * 16, 256), 4096))
model = MiniBatchKMeans(
n_clusters=k,
random_state=_int_param(parameters, "randomState", 42),
n_init=_int_param(parameters, "nInit", 10),
max_iter=_int_param(parameters, "maxIter", 300),
batch_size=batch_size,
)
labels = model.fit_predict(vectors)
return np.asarray(labels, dtype=np.int32), None
if algorithm == ClusteringAlgorithm.DBSCAN:
eps = _required_float_param(parameters, "eps")
model = DBSCAN(
eps=eps,
min_samples=_int_param(parameters, "minSamples", 5),
metric=_str_param(parameters, "metric", "euclidean"),
algorithm=_str_param(parameters, "algorithm", "auto"),
n_jobs=_int_param(parameters, "nJobs", -1),
)
labels = model.fit_predict(vectors)
return np.asarray(labels, dtype=np.int32), None
if algorithm == ClusteringAlgorithm.HDBSCAN:
if hdbscan is None:
raise ClusteringServiceError("HDBSCAN requested but hdbscan is not installed")
model = hdbscan.HDBSCAN(
min_cluster_size=_int_param(parameters, "minClusterSize", 10),
min_samples=_nullable_int_param(parameters, "minSamples"),
metric=_str_param(parameters, "metric", "euclidean"),
cluster_selection_method=_str_param(parameters, "clusterSelectionMethod", "eom"),
)
labels = model.fit_predict(vectors)
probabilities = getattr(model, "probabilities_", None)
return np.asarray(labels, dtype=np.int32), None if probabilities is None else np.asarray(probabilities, dtype=np.float32)
if algorithm == ClusteringAlgorithm.AGGLOMERATIVE:
k = _required_int_param(parameters, "k")
linkage = _str_param(parameters, "linkage", "average")
metric = _str_param(parameters, "metric", "euclidean")
if linkage == "ward":
metric = "euclidean"
model = AgglomerativeClustering(
n_clusters=k,
linkage=linkage,
metric=metric,
compute_distances=_bool_param(parameters, "computeDistances", False),
)
labels = model.fit_predict(vectors)
return np.asarray(labels, dtype=np.int32), None
raise ClusteringServiceError(f"Unsupported algorithm: {algorithm}")
def _build_response(
prepared: PreparedData,
labels: np.ndarray,
membership_scores: np.ndarray | None,
) -> PythonClusteringResponse:
unique_labels = sorted(int(label) for label in np.unique(labels))
clusters: list[PythonCluster] = []
assignments: list[PythonAssignment] = []
centroids: dict[int, np.ndarray] = {}
for label in unique_labels:
mask = labels == label
item_count = int(mask.sum())
noise_cluster = label == -1
clusters.append(PythonCluster(clusterLabel=label, itemCount=item_count, noiseCluster=noise_cluster))
if not noise_cluster:
centroids[label] = prepared.transformed[mask].mean(axis=0)
for index, item in enumerate(prepared.items):
label = int(labels[index])
noise = label == -1
distance = None if noise else float(np.linalg.norm(prepared.transformed[index] - centroids[label]))
membership = None
if membership_scores is not None:
membership = float(membership_scores[index])
assignments.append(
PythonAssignment(
embeddingId=item.embeddingId,
clusterLabel=label,
distanceToCentroid=distance,
membershipScore=membership,
noise=noise,
)
)
noise_count = int((labels == -1).sum())
return PythonClusteringResponse(clusters=clusters, assignments=assignments, noiseCount=noise_count)
def _required_int_param(parameters: dict[str, Any], key: str) -> int:
if key not in parameters or parameters[key] is None:
raise ClusteringServiceError(f"Missing required parameter: {key}")
return _coerce_int(parameters[key], key)
def _required_float_param(parameters: dict[str, Any], key: str) -> float:
if key not in parameters or parameters[key] is None:
raise ClusteringServiceError(f"Missing required parameter: {key}")
return _coerce_float(parameters[key], key)
def _nullable_int_param(parameters: dict[str, Any], key: str) -> int | None:
if key not in parameters or parameters[key] is None:
return None
return _coerce_int(parameters[key], key)
def _int_param(parameters: dict[str, Any], key: str, default: int) -> int:
if key not in parameters or parameters[key] is None:
return default
return _coerce_int(parameters[key], key)
def _float_param(parameters: dict[str, Any], key: str, default: float) -> float:
if key not in parameters or parameters[key] is None:
return default
return _coerce_float(parameters[key], key)
def _bool_param(parameters: dict[str, Any], key: str, default: bool) -> bool:
if key not in parameters or parameters[key] is None:
return default
value = parameters[key]
if isinstance(value, bool):
return value
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"true", "1", "yes", "y"}:
return True
if normalized in {"false", "0", "no", "n"}:
return False
raise ClusteringServiceError(f"Parameter {key} must be boolean-compatible")
def _str_param(parameters: dict[str, Any], key: str, default: str) -> str:
if key not in parameters or parameters[key] is None:
return default
return str(parameters[key])
def _coerce_int(value: Any, key: str) -> int:
if isinstance(value, bool):
raise ClusteringServiceError(f"Parameter {key} must be integer-compatible")
try:
return int(value)
except (TypeError, ValueError) as exc:
raise ClusteringServiceError(f"Parameter {key} must be integer-compatible") from exc
def _coerce_float(value: Any, key: str) -> float:
if isinstance(value, bool):
raise ClusteringServiceError(f"Parameter {key} must be float-compatible")
try:
return float(value)
except (TypeError, ValueError) as exc:
raise ClusteringServiceError(f"Parameter {key} must be float-compatible") from exc

View File

@ -0,0 +1,63 @@
from __future__ import annotations
from fastapi import FastAPI, HTTPException
from fastapi.middleware.gzip import GZipMiddleware
from pydantic import BaseModel
from .cluster_service import ClusteringServiceError, cluster_embeddings, cluster_run
from .models import (
ClusteringAlgorithm,
PythonClusteringRequest,
PythonClusteringResponse,
PythonRunExecutionRequest,
ReductionMethod,
)
from .run_db_loader import load_run_and_embeddings
app = FastAPI(
title="DIP Clustering Service",
version="2.0.0",
description="Remote clustering backend for DIP embedding clustering runs.",
)
app.add_middleware(GZipMiddleware, minimum_size=1024)
class HealthResponse(BaseModel):
status: str
algorithms: list[str]
reductionMethods: list[str]
endpoints: list[str]
@app.get("/health", response_model=HealthResponse)
def health() -> HealthResponse:
return HealthResponse(
status="UP",
algorithms=[algorithm.value for algorithm in ClusteringAlgorithm],
reductionMethods=[method.value for method in ReductionMethod],
endpoints=["/cluster", "/cluster-run"],
)
@app.post("/cluster", response_model=PythonClusteringResponse)
def cluster_direct(request: PythonClusteringRequest) -> PythonClusteringResponse:
try:
return cluster_embeddings(request)
except ClusteringServiceError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
except Exception as exc: # pragma: no cover - last-resort guard
raise HTTPException(status_code=500, detail=f"Unexpected clustering failure: {exc}") from exc
@app.post("/cluster-run", response_model=PythonClusteringResponse)
def cluster_by_run(request: PythonRunExecutionRequest) -> PythonClusteringResponse:
try:
metadata, items = load_run_and_embeddings(request.runId)
return cluster_run(metadata, items)
except ClusteringServiceError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
except ValueError as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except Exception as exc: # pragma: no cover - last-resort guard
raise HTTPException(status_code=500, detail=f"Unexpected clustering failure: {exc}") from exc

View File

@ -0,0 +1,75 @@
from __future__ import annotations
from enum import Enum
from typing import Any
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
class ClusteringAlgorithm(str, Enum):
KMEANS = "KMEANS"
MINI_BATCH_KMEANS = "MINI_BATCH_KMEANS"
DBSCAN = "DBSCAN"
HDBSCAN = "HDBSCAN"
AGGLOMERATIVE = "AGGLOMERATIVE"
class ReductionMethod(str, Enum):
NONE = "NONE"
PCA = "PCA"
UMAP = "UMAP"
class PythonClusteringItem(BaseModel):
embeddingId: UUID
documentId: UUID | None = None
representationId: UUID | None = None
vector: list[float]
class PythonClusteringRequest(BaseModel):
algorithm: ClusteringAlgorithm
parameters: dict[str, Any] = Field(default_factory=dict)
reductionMethod: ReductionMethod = ReductionMethod.NONE
reductionDimensions: int | None = None
items: list[PythonClusteringItem]
model_config = ConfigDict(use_enum_values=True)
class PythonRunExecutionRequest(BaseModel):
runId: UUID
class PythonCluster(BaseModel):
clusterLabel: int
itemCount: int
noiseCluster: bool = False
class PythonAssignment(BaseModel):
embeddingId: UUID
documentId: UUID | None = None
representationId: UUID | None = None
clusterLabel: int
distanceToCentroid: float | None = None
membershipScore: float | None = None
noise: bool = False
class PythonClusteringResponse(BaseModel):
clusters: list[PythonCluster]
assignments: list[PythonAssignment]
noiseCount: int
class RunMetadata(BaseModel):
runId: UUID
algorithm: ClusteringAlgorithm
parameters: dict[str, Any] = Field(default_factory=dict)
reductionMethod: ReductionMethod = ReductionMethod.NONE
reductionDimensions: int | None = None
selection: dict[str, Any] = Field(default_factory=dict)
model_config = ConfigDict(use_enum_values=True)

View File

@ -0,0 +1,163 @@
from __future__ import annotations
import json
from typing import Any
from uuid import UUID
import numpy as np
import psycopg2
from .models import ClusteringAlgorithm, PythonClusteringItem, ReductionMethod, RunMetadata
from .settings import ServiceSettings
def load_run_and_embeddings(run_id: UUID) -> tuple[RunMetadata, list[PythonClusteringItem]]:
settings = ServiceSettings.from_env()
with psycopg2.connect(settings.db_dsn) as connection:
run = _load_run_metadata(connection, run_id)
items = _load_embeddings(connection, run.selection)
return run, items
def _load_run_metadata(connection, run_id: UUID) -> RunMetadata:
with connection.cursor() as cursor:
cursor.execute(
"""
select
id,
algorithm,
coalesce(parameters_json::text, '{}'),
reduction_method,
reduction_dimensions,
coalesce(selection_json::text, '{}')
from doc.doc_embedding_cluster_run
where id = %s
""",
(str(run_id),),
)
row = cursor.fetchone()
if row is None:
raise ValueError(f"Cluster run not found: {run_id}")
parameters = _json_to_dict(row[2])
selection = _json_to_dict(row[5])
return RunMetadata(
runId=row[0],
algorithm=ClusteringAlgorithm(row[1]),
parameters=parameters,
reductionMethod=ReductionMethod(row[3]) if row[3] else ReductionMethod.NONE,
reductionDimensions=row[4],
selection=selection,
)
def _load_embeddings(connection, selection: dict[str, Any]) -> list[PythonClusteringItem]:
sql_parts = [
"""
select
e.id as embedding_id,
e.document_id,
e.representation_id,
e.embedding_vector::text as embedding_vector_text
from doc.doc_embedding e
join doc.doc_document d on d.id = e.document_id
join doc.doc_text_representation r on r.id = e.representation_id
where e.embedding_status = 'COMPLETED'
and e.embedding_vector is not null
"""
]
params: list[Any] = []
_apply_selection_filters(selection, sql_parts, params)
sql_parts.append(" order by e.created_at asc")
sql = "".join(sql_parts)
items: list[PythonClusteringItem] = []
with connection.cursor(name="cluster_embedding_selection") as cursor:
cursor.itersize = 2000
cursor.execute(sql, params)
for embedding_id, document_id, representation_id, vector_text in cursor:
items.append(
PythonClusteringItem(
embeddingId=embedding_id,
documentId=document_id,
representationId=representation_id,
vector=_parse_vector_text(vector_text),
)
)
return items
def _apply_selection_filters(selection: dict[str, Any], sql_parts: list[str], params: list[Any]) -> None:
if not selection:
return
_append_in_filter(sql_parts, params, "documentTypes", "d.document_type", selection.get("documentTypes"))
_append_in_filter(sql_parts, params, "documentFamilies", "d.document_family", selection.get("documentFamilies"))
_append_in_filter(sql_parts, params, "representationTypes", "r.representation_type", selection.get("representationTypes"))
_append_in_filter(sql_parts, params, "embeddingStatuses", "e.embedding_status", selection.get("embeddingStatuses"))
_append_in_filter(sql_parts, params, "modelIds", "e.model_id", selection.get("modelIds"))
_append_in_filter(sql_parts, params, "prefixProfileIds", "e.prefix_profile_id", selection.get("prefixProfileIds"))
_append_in_filter(sql_parts, params, "builderKeys", "r.builder_key", selection.get("builderKeys"))
_append_in_filter(sql_parts, params, "languageCodes", "r.language_code", selection.get("languageCodes"))
_append_in_filter(sql_parts, params, "ownerTenantIds", "d.owner_tenant_id", selection.get("ownerTenantIds"))
business_key_like = selection.get("businessKeyLike")
if business_key_like:
sql_parts.append(" and d.business_key like %s")
params.append(business_key_like)
created_from = selection.get("createdFrom")
if created_from:
sql_parts.append(" and d.created_at >= %s")
params.append(created_from)
created_to = selection.get("createdTo")
if created_to:
sql_parts.append(" and d.created_at < %s")
params.append(created_to)
if selection.get("primaryRepresentationOnly") is True:
sql_parts.append(" and r.is_primary = true")
def _append_in_filter(
sql_parts: list[str],
params: list[Any],
_key: str,
column_name: str,
values: list[Any] | None,
) -> None:
if not values:
return
placeholders = ", ".join(["%s"] * len(values))
sql_parts.append(f" and {column_name} in ({placeholders})")
params.extend(values)
def _parse_vector_text(raw_value: str) -> list[float]:
if raw_value is None:
return []
value = raw_value.strip()
if value.startswith("[") and value.endswith("]"):
value = value[1:-1]
if not value:
return []
vector = np.fromstring(value, sep=",", dtype=np.float32)
return vector.astype(float).tolist()
def _json_to_dict(raw_json: str | dict[str, Any] | None) -> dict[str, Any]:
if raw_json is None:
return {}
if isinstance(raw_json, dict):
return raw_json
if not raw_json.strip():
return {}
loaded = json.loads(raw_json)
return loaded if isinstance(loaded, dict) else {}

View File

@ -0,0 +1,39 @@
from __future__ import annotations
import os
from dataclasses import dataclass
@dataclass(frozen=True)
class ServiceSettings:
db_dsn: str
@staticmethod
def from_env() -> "ServiceSettings":
dsn = (
os.getenv("CLUSTERING_DB_DSN")
or os.getenv("DATABASE_URL")
or _build_dsn_from_parts()
)
if not dsn:
raise RuntimeError(
"No database connection configured. Set CLUSTERING_DB_DSN or DATABASE_URL, "
"or provide CLUSTERING_DB_HOST / CLUSTERING_DB_PORT / CLUSTERING_DB_NAME / "
"CLUSTERING_DB_USER / CLUSTERING_DB_PASSWORD."
)
return ServiceSettings(db_dsn=dsn)
def _build_dsn_from_parts() -> str | None:
host = os.getenv("CLUSTERING_DB_HOST")
database = os.getenv("CLUSTERING_DB_NAME")
user = os.getenv("CLUSTERING_DB_USER")
password = os.getenv("CLUSTERING_DB_PASSWORD")
port = os.getenv("CLUSTERING_DB_PORT", "5432")
if not host or not database or not user:
return None
if password:
return f"postgresql://{user}:{password}@{host}:{port}/{database}"
return f"postgresql://{user}@{host}:{port}/{database}"

View File

@ -0,0 +1,7 @@
fastapi==0.115.12
uvicorn[standard]==0.34.2
numpy==2.2.5
scikit-learn==1.7.0
hdbscan==0.8.40
umap-learn==0.5.7
psycopg2-binary==2.9.10

View File

@ -0,0 +1,85 @@
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
def test_health():
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "UP"
assert "DBSCAN" in data["algorithms"]
def test_kmeans_cluster():
body = {
"algorithm": "KMEANS",
"parameters": {"k": 2},
"reductionMethod": "NONE",
"items": [
{
"embeddingId": "11111111-1111-1111-1111-111111111111",
"documentId": "22222222-2222-2222-2222-222222222221",
"representationId": "33333333-3333-3333-3333-333333333331",
"vector": [1.0, 1.0]
},
{
"embeddingId": "11111111-1111-1111-1111-111111111112",
"documentId": "22222222-2222-2222-2222-222222222222",
"representationId": "33333333-3333-3333-3333-333333333332",
"vector": [1.1, 1.0]
},
{
"embeddingId": "11111111-1111-1111-1111-111111111113",
"documentId": "22222222-2222-2222-2222-222222222223",
"representationId": "33333333-3333-3333-3333-333333333333",
"vector": [-1.0, -1.0]
},
{
"embeddingId": "11111111-1111-1111-1111-111111111114",
"documentId": "22222222-2222-2222-2222-222222222224",
"representationId": "33333333-3333-3333-3333-333333333334",
"vector": [-1.1, -1.0]
}
]
}
response = client.post("/cluster", json=body)
assert response.status_code == 200
data = response.json()
assert len(data["clusters"]) == 2
assert data["noiseCount"] == 0
def test_dbscan_cluster_with_noise():
body = {
"algorithm": "DBSCAN",
"parameters": {"eps": 0.25, "minSamples": 2, "normalizeVectors": False},
"reductionMethod": "NONE",
"items": [
{
"embeddingId": "11111111-1111-1111-1111-111111111211",
"documentId": "22222222-2222-2222-2222-222222222211",
"representationId": "33333333-3333-3333-3333-333333333211",
"vector": [0.0, 0.0]
},
{
"embeddingId": "11111111-1111-1111-1111-111111111212",
"documentId": "22222222-2222-2222-2222-222222222212",
"representationId": "33333333-3333-3333-3333-333333333212",
"vector": [0.05, 0.0]
},
{
"embeddingId": "11111111-1111-1111-1111-111111111213",
"documentId": "22222222-2222-2222-2222-222222222213",
"representationId": "33333333-3333-3333-3333-333333333213",
"vector": [10.0, 10.0]
}
]
}
response = client.post("/cluster", json=body)
assert response.status_code == 200
data = response.json()
assert data["noiseCount"] == 1

View File

@ -0,0 +1,11 @@
package at.procon.dip.clustering;
public enum ClusterRunStatus {
CREATED,
QUEUED,
RUNNING,
CANCEL_REQUESTED,
COMPLETED,
FAILED,
CANCELLED
}

View File

@ -0,0 +1,9 @@
package at.procon.dip.clustering;
public enum ClusteringAlgorithm {
KMEANS,
MINI_BATCH_KMEANS,
DBSCAN,
HDBSCAN,
AGGLOMERATIVE
}

View File

@ -0,0 +1,6 @@
package at.procon.dip.clustering;
public enum ClusteringExecutionBackend {
JAVA_LOCAL,
PYTHON_REMOTE
}

View File

@ -0,0 +1,6 @@
package at.procon.dip.clustering;
public enum PythonRequestMode {
INLINE_VECTORS,
RUN_ID
}

View File

@ -0,0 +1,7 @@
package at.procon.dip.clustering;
public enum ReductionMethod {
NONE,
PCA,
UMAP
}

View File

@ -0,0 +1,12 @@
package at.procon.dip.clustering.client;
import at.procon.dip.clustering.dto.PythonClusteringRequest;
import at.procon.dip.clustering.dto.PythonClusteringResponse;
import at.procon.dip.clustering.dto.PythonRunExecutionRequest;
public interface PythonClusteringClient {
PythonClusteringResponse cluster(PythonClusteringRequest request);
PythonClusteringResponse clusterRun(PythonRunExecutionRequest request);
}

View File

@ -0,0 +1,96 @@
package at.procon.dip.clustering.client;
import at.procon.dip.clustering.config.ClusteringPhaseBProperties;
import at.procon.dip.clustering.dto.PythonClusteringRequest;
import at.procon.dip.clustering.dto.PythonClusteringResponse;
import at.procon.dip.clustering.dto.PythonRunExecutionRequest;
import java.net.http.HttpClient;
import java.time.Duration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.MediaType;
import org.springframework.http.client.JdkClientHttpRequestFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClient;
import org.springframework.web.server.ResponseStatusException;
@Component
@ConditionalOnProperty(prefix = "dip.clustering.python", name = "enabled", havingValue = "true")
public class RestPythonClusteringClient implements PythonClusteringClient {
private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(30);
private static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(30);
private final ClusteringPhaseBProperties properties;
private final RestClient restClient;
public RestPythonClusteringClient(ClusteringPhaseBProperties properties) {
this.properties = properties;
Duration connectTimeout = properties.connectTimeout() != null
? properties.connectTimeout()
: DEFAULT_CONNECT_TIMEOUT;
Duration readTimeout = properties.readTimeout() != null
? properties.readTimeout()
: DEFAULT_READ_TIMEOUT;
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(connectTimeout)
.version(HttpClient.Version.HTTP_1_1)
.build();
JdkClientHttpRequestFactory requestFactory = new JdkClientHttpRequestFactory(httpClient);
requestFactory.setReadTimeout(readTimeout);
this.restClient = RestClient.builder()
.requestFactory(requestFactory)
.build();
}
@Override
public PythonClusteringResponse cluster(PythonClusteringRequest request) {
String url = properties.resolvedClusterUrl();
if (url == null || url.isBlank()) {
throw new ResponseStatusException(org.springframework.http.HttpStatus.BAD_REQUEST,
"Python clustering is enabled but no baseUrl/clusterPath is configured");
}
try {
return restClient.post()
.uri(url)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(request)
.retrieve()
.body(PythonClusteringResponse.class);
} catch (Exception ex) {
throw new ResponseStatusException(
org.springframework.http.HttpStatus.BAD_GATEWAY,
"Python cluster request failed: " + ex.getMessage(),
ex);
}
}
@Override
public PythonClusteringResponse clusterRun(PythonRunExecutionRequest request) {
String url = properties.resolvedClusterRunUrl();
if (url == null || url.isBlank()) {
throw new ResponseStatusException(org.springframework.http.HttpStatus.BAD_REQUEST,
"Python clustering is enabled but no baseUrl/clusterRunPath is configured");
}
try {
return restClient.post()
.uri(url)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(request)
.retrieve()
.body(PythonClusteringResponse.class);
} catch (Exception ex) {
throw new ResponseStatusException(
org.springframework.http.HttpStatus.BAD_GATEWAY,
"Python cluster-run request failed: " + ex.getMessage(),
ex);
}
}
}

View File

@ -0,0 +1,25 @@
package at.procon.dip.clustering.config;
import java.util.concurrent.Executor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableConfigurationProperties(ClusteringExecutionProperties.class)
public class ClusteringExecutionConfiguration {
@Bean(name = "clusteringRunExecutor")
public Executor clusteringRunExecutor(ClusteringExecutionProperties properties) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("clustering-run-");
executor.setCorePoolSize(properties.resolvedCorePoolSize());
executor.setMaxPoolSize(Math.max(properties.resolvedCorePoolSize(), properties.resolvedMaxPoolSize()));
executor.setQueueCapacity(properties.resolvedQueueCapacity());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,22 @@
package at.procon.dip.clustering.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "dip.clustering.execution")
public record ClusteringExecutionProperties(
int corePoolSize,
int maxPoolSize,
int queueCapacity
) {
public int resolvedCorePoolSize() {
return corePoolSize > 0 ? corePoolSize : 1;
}
public int resolvedMaxPoolSize() {
return maxPoolSize > 0 ? maxPoolSize : Math.max(1, resolvedCorePoolSize());
}
public int resolvedQueueCapacity() {
return queueCapacity >= 0 ? queueCapacity : 50;
}
}

View File

@ -0,0 +1,9 @@
package at.procon.dip.clustering.config;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableConfigurationProperties(ClusteringPhaseBProperties.class)
public class ClusteringPhaseBConfig {
}

View File

@ -0,0 +1,37 @@
package at.procon.dip.clustering.config;
import at.procon.dip.clustering.PythonRequestMode;
import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "dip.clustering.python")
public record ClusteringPhaseBProperties(
boolean enabled,
String baseUrl,
String clusterPath,
String clusterRunPath,
Duration connectTimeout,
Duration readTimeout,
PythonRequestMode requestMode
) {
public String resolvedClusterUrl() {
return resolveUrl(clusterPath == null || clusterPath.isBlank() ? "/cluster" : clusterPath);
}
public String resolvedClusterRunUrl() {
return resolveUrl(clusterRunPath == null || clusterRunPath.isBlank() ? "/cluster-run" : clusterRunPath);
}
public PythonRequestMode effectiveRequestMode() {
return requestMode == null ? PythonRequestMode.INLINE_VECTORS : requestMode;
}
private String resolveUrl(String path) {
if (baseUrl == null || baseUrl.isBlank()) {
return null;
}
return baseUrl.endsWith("/")
? baseUrl.substring(0, baseUrl.length() - 1) + path
: baseUrl + path;
}
}

View File

@ -0,0 +1,16 @@
package at.procon.dip.clustering.dto;
import java.util.UUID;
public record ClusterAssignmentResponse(
UUID id,
UUID clusterId,
UUID embeddingId,
UUID documentId,
UUID representationId,
Integer clusterLabelRaw,
Double membershipScore,
Double distanceToCentroid,
boolean noise
) {
}

View File

@ -0,0 +1,26 @@
package at.procon.dip.clustering.dto;
import at.procon.dip.domain.document.DocumentType;
import at.procon.dip.domain.document.RepresentationType;
import java.util.UUID;
public record ClusterAssignmentViewResponse(
UUID id,
UUID clusterId,
UUID embeddingId,
UUID documentId,
UUID representationId,
Integer clusterLabelRaw,
Double membershipScore,
Double distanceToCentroid,
boolean noise,
String businessKey,
DocumentType documentType,
RepresentationType representationType,
String builderKey,
String languageCode,
Integer textLength,
String textPreview,
String textBody
) {
}

View File

@ -0,0 +1,10 @@
package at.procon.dip.clustering.dto;
import java.util.UUID;
public record ClusterMembersResponse(
UUID clusterId,
Integer clusterLabel,
ClusterAssignmentViewResponse member
) {
}

View File

@ -0,0 +1,13 @@
package at.procon.dip.clustering.dto;
import java.util.UUID;
public record ClusterResponse(
UUID id,
Integer clusterLabel,
String displayName,
Long itemCount,
boolean noiseCluster,
String summaryText
) {
}

View File

@ -0,0 +1,25 @@
package at.procon.dip.clustering.dto;
import at.procon.dip.clustering.ClusterRunStatus;
import at.procon.dip.clustering.ClusteringAlgorithm;
import at.procon.dip.clustering.ClusteringExecutionBackend;
import at.procon.dip.clustering.ReductionMethod;
import java.time.OffsetDateTime;
import java.util.UUID;
public record ClusterRunResponse(
UUID id,
String name,
ClusterRunStatus status,
ClusteringAlgorithm algorithm,
ClusteringExecutionBackend executionBackend,
ReductionMethod reductionMethod,
Integer reductionDimensions,
Long itemCount,
Long clusterCount,
Long noiseCount,
OffsetDateTime startedAt,
OffsetDateTime finishedAt,
String errorMessage
) {
}

View File

@ -0,0 +1,16 @@
package at.procon.dip.clustering.dto;
import java.time.OffsetDateTime;
import java.util.UUID;
public record ClusterSetResponse(
UUID id,
String code,
String name,
String description,
boolean active,
EmbeddingSelectionSpec selection,
OffsetDateTime createdAt,
OffsetDateTime updatedAt
) {
}

View File

@ -0,0 +1,33 @@
package at.procon.dip.clustering.dto;
import java.util.UUID;
public record ClusteringEngineAssignment(
UUID embeddingId,
UUID documentId,
UUID representationId,
int clusterLabel,
Double distanceToCentroid,
Double membershipScore,
boolean noise
) {
public ClusteringEngineAssignment(
UUID embeddingId,
int clusterLabel,
Double distanceToCentroid,
Double membershipScore,
boolean noise
) {
this(embeddingId, null, null, clusterLabel, distanceToCentroid, membershipScore, noise);
}
public ClusteringEngineAssignment(
UUID embeddingId,
UUID documentId,
UUID representationId,
int clusterLabel,
Double distanceToCentroid
) {
this(embeddingId, documentId, representationId, clusterLabel, distanceToCentroid, null, false);
}
}

View File

@ -0,0 +1,11 @@
package at.procon.dip.clustering.dto;
public record ClusteringEngineCluster(
int clusterLabel,
long itemCount,
boolean noiseCluster
) {
public ClusteringEngineCluster(int clusterLabel, long itemCount) {
this(clusterLabel, itemCount, false);
}
}

View File

@ -0,0 +1,36 @@
package at.procon.dip.clustering.dto;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
public record ClusteringEngineRequest(Map<String, Object> parameters) {
public ClusteringEngineRequest {
parameters = parameters == null
? Map.of()
: Collections.unmodifiableMap(new LinkedHashMap<>(parameters));
}
public int requiredInt(String key) {
Object value = parameters.get(key);
if (value == null) {
throw new IllegalArgumentException("Missing required clustering parameter: " + key);
}
if (value instanceof Number number) {
return number.intValue();
}
return Integer.parseInt(String.valueOf(value));
}
public int intValue(String key, int defaultValue) {
Object value = parameters.get(key);
if (value == null) {
return defaultValue;
}
if (value instanceof Number number) {
return number.intValue();
}
return Integer.parseInt(String.valueOf(value));
}
}

View File

@ -0,0 +1,16 @@
package at.procon.dip.clustering.dto;
import java.util.List;
public record ClusteringEngineResult(
List<ClusteringEngineCluster> clusters,
List<ClusteringEngineAssignment> assignments,
long noiseCount
) {
public ClusteringEngineResult(
List<ClusteringEngineCluster> clusters,
List<ClusteringEngineAssignment> assignments
) {
this(clusters, assignments, 0L);
}
}

View File

@ -0,0 +1,65 @@
package at.procon.dip.clustering.dto;
import at.procon.dip.clustering.ClusteringAlgorithm;
import at.procon.dip.clustering.ClusteringExecutionBackend;
import com.fasterxml.jackson.annotation.JsonIgnore;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.util.LinkedHashMap;
import java.util.Map;
public record CreateClusterRunRequest(
String clusterSetCode,
@NotBlank String name,
@NotNull ClusteringAlgorithm algorithm,
ClusteringExecutionBackend executionBackend,
@Valid ReductionConfig reduction,
@Valid @NotNull EmbeddingSelectionSpec selection,
Integer k,
Map<String, Object> parameters
) {
@JsonIgnore
public Map<String, Object> resolvedParameters() {
Map<String, Object> merged = new LinkedHashMap<>();
if (parameters != null) {
merged.putAll(parameters);
}
if (k != null && !merged.containsKey("k")) {
merged.put("k", k);
}
return merged;
}
@AssertTrue(message = "k must be > 0 for KMEANS and MINI_BATCH_KMEANS; for other algorithms it must be omitted or > 0")
@JsonIgnore
public boolean isValidKConfiguration() {
Integer effectiveK = extractPositiveInteger(resolvedParameters().get("k"));
if (algorithm == ClusteringAlgorithm.KMEANS
|| algorithm == ClusteringAlgorithm.MINI_BATCH_KMEANS) {
return effectiveK != null;
}
Object rawK = resolvedParameters().get("k");
return rawK == null || effectiveK != null;
}
private Integer extractPositiveInteger(Object value) {
if (value == null) {
return null;
}
if (value instanceof Number number) {
int intValue = number.intValue();
return intValue > 0 ? intValue : null;
}
try {
int intValue = Integer.parseInt(String.valueOf(value));
return intValue > 0 ? intValue : null;
} catch (NumberFormatException ex) {
return null;
}
}
}

View File

@ -0,0 +1,14 @@
package at.procon.dip.clustering.dto;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
public record CreateClusterSetRequest(
@NotBlank String code,
@NotBlank String name,
String description,
@Valid @NotNull EmbeddingSelectionSpec selection,
Boolean active
) {
}

View File

@ -0,0 +1,26 @@
package at.procon.dip.clustering.dto;
import at.procon.dip.domain.document.DocumentFamily;
import at.procon.dip.domain.document.DocumentType;
import at.procon.dip.domain.document.EmbeddingStatus;
import at.procon.dip.domain.document.RepresentationType;
import java.time.OffsetDateTime;
import java.util.Set;
import java.util.UUID;
public record EmbeddingSelectionSpec(
Set<DocumentType> documentTypes,
Set<DocumentFamily> documentFamilies,
Set<RepresentationType> representationTypes,
Set<EmbeddingStatus> embeddingStatuses,
Set<UUID> modelIds,
Set<UUID> prefixProfileIds,
Set<String> builderKeys,
Set<String> languageCodes,
Set<UUID> ownerTenantIds,
String businessKeyLike,
OffsetDateTime createdFrom,
OffsetDateTime createdTo,
Boolean primaryRepresentationOnly
) {
}

View File

@ -0,0 +1,23 @@
package at.procon.dip.clustering.dto;
import at.procon.dip.clustering.ClusteringAlgorithm;
import at.procon.dip.clustering.ReductionMethod;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public record PythonClusteringRequest(
ClusteringAlgorithm algorithm,
Map<String, Object> parameters,
ReductionMethod reductionMethod,
Integer reductionDimensions,
List<PythonClusteringItem> items
) {
public record PythonClusteringItem(
UUID embeddingId,
UUID documentId,
UUID representationId,
float[] vector
) {
}
}

View File

@ -0,0 +1,28 @@
package at.procon.dip.clustering.dto;
import java.util.List;
import java.util.UUID;
public record PythonClusteringResponse(
List<PythonCluster> clusters,
List<PythonAssignment> assignments,
Long noiseCount
) {
public record PythonCluster(
Integer clusterLabel,
Long itemCount,
Boolean noiseCluster
) {
}
public record PythonAssignment(
UUID embeddingId,
UUID documentId,
UUID representationId,
Integer clusterLabel,
Double distanceToCentroid,
Double membershipScore,
Boolean noise
) {
}
}

View File

@ -0,0 +1,8 @@
package at.procon.dip.clustering.dto;
import java.util.UUID;
public record PythonRunExecutionRequest(
UUID runId
) {
}

View File

@ -0,0 +1,9 @@
package at.procon.dip.clustering.dto;
import at.procon.dip.clustering.ReductionMethod;
public record ReductionConfig(
ReductionMethod method,
Integer targetDimensions
) {
}

View File

@ -0,0 +1,22 @@
package at.procon.dip.clustering.dto;
import at.procon.dip.domain.document.DocumentFamily;
import at.procon.dip.domain.document.DocumentType;
import at.procon.dip.domain.document.RepresentationType;
import java.util.UUID;
public record SelectedEmbeddingRow(
UUID embeddingId,
UUID documentId,
UUID representationId,
UUID modelId,
UUID prefixProfileId,
DocumentType documentType,
DocumentFamily documentFamily,
RepresentationType representationType,
String builderKey,
String languageCode,
String businessKey,
float[] embeddingVector
) {
}

View File

@ -0,0 +1,4 @@
package at.procon.dip.clustering.dto;
public record SelectionCountResponse(long count) {
}

View File

@ -0,0 +1,13 @@
package at.procon.dip.clustering.dto;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
public record UpdateClusterSetRequest(
@NotBlank String name,
String description,
@Valid @NotNull EmbeddingSelectionSpec selection,
Boolean active
) {
}

View File

@ -0,0 +1,72 @@
package at.procon.dip.clustering.entity;
import at.procon.dip.architecture.SchemaNames;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Index;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import jakarta.persistence.PrePersist;
import jakarta.persistence.Table;
import java.time.OffsetDateTime;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.hibernate.annotations.JdbcTypeCode;
import org.hibernate.type.SqlTypes;
@Entity
@Table(schema = SchemaNames.DOC, name = "doc_embedding_cluster", indexes = {
@Index(name = "idx_doc_cluster_cluster_run_jpa", columnList = "cluster_run_id")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class EmbeddingCluster {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
@ManyToOne(fetch = FetchType.LAZY, optional = false)
@JoinColumn(name = "cluster_run_id", nullable = false)
private EmbeddingClusterRun clusterRun;
@Column(name = "cluster_label", nullable = false)
private Integer clusterLabel;
@Column(name = "display_name", length = 255)
private String displayName;
@Column(name = "item_count", nullable = false)
private Long itemCount;
@Builder.Default
@Column(name = "is_noise_cluster", nullable = false)
private boolean noiseCluster = false;
@Column(name = "summary_text", columnDefinition = "TEXT")
private String summaryText;
@JdbcTypeCode(SqlTypes.JSON)
@Column(name = "top_terms_json", columnDefinition = "jsonb")
private String topTermsJson;
@Builder.Default
@Column(name = "created_at", nullable = false, updatable = false)
private OffsetDateTime createdAt = OffsetDateTime.now();
@PrePersist
protected void onCreate() {
createdAt = OffsetDateTime.now();
}
}

View File

@ -0,0 +1,84 @@
package at.procon.dip.clustering.entity;
import at.procon.dip.architecture.SchemaNames;
import at.procon.dip.domain.document.entity.Document;
import at.procon.dip.domain.document.entity.DocumentEmbedding;
import at.procon.dip.domain.document.entity.DocumentTextRepresentation;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Index;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import jakarta.persistence.PrePersist;
import jakarta.persistence.Table;
import java.time.OffsetDateTime;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Entity
@Table(schema = SchemaNames.DOC, name = "doc_embedding_cluster_assignment", indexes = {
@Index(name = "idx_doc_cluster_assignment_run_jpa", columnList = "cluster_run_id"),
@Index(name = "idx_doc_cluster_assignment_cluster_jpa", columnList = "cluster_id"),
@Index(name = "idx_doc_cluster_assignment_document_jpa", columnList = "cluster_run_id, document_id")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class EmbeddingClusterAssignment {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
@ManyToOne(fetch = FetchType.LAZY, optional = false)
@JoinColumn(name = "cluster_run_id", nullable = false)
private EmbeddingClusterRun clusterRun;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "cluster_id")
private EmbeddingCluster cluster;
@ManyToOne(fetch = FetchType.LAZY, optional = false)
@JoinColumn(name = "embedding_id", nullable = false)
private DocumentEmbedding embedding;
@ManyToOne(fetch = FetchType.LAZY, optional = false)
@JoinColumn(name = "document_id", nullable = false)
private Document document;
@ManyToOne(fetch = FetchType.LAZY, optional = false)
@JoinColumn(name = "representation_id", nullable = false)
private DocumentTextRepresentation representation;
@Column(name = "cluster_label_raw", nullable = false)
private Integer clusterLabelRaw;
@Column(name = "membership_score")
private Double membershipScore;
@Column(name = "distance_to_centroid")
private Double distanceToCentroid;
@Builder.Default
@Column(name = "is_noise", nullable = false)
private boolean noise = false;
@Builder.Default
@Column(name = "created_at", nullable = false, updatable = false)
private OffsetDateTime createdAt = OffsetDateTime.now();
@PrePersist
protected void onCreate() {
createdAt = OffsetDateTime.now();
}
}

View File

@ -0,0 +1,147 @@
package at.procon.dip.clustering.entity;
import at.procon.dip.architecture.SchemaNames;
import at.procon.dip.clustering.ClusterRunStatus;
import at.procon.dip.clustering.ClusteringAlgorithm;
import at.procon.dip.clustering.ClusteringExecutionBackend;
import at.procon.dip.clustering.ReductionMethod;
import at.procon.dip.domain.document.DocumentFamily;
import at.procon.dip.domain.document.DocumentType;
import at.procon.dip.domain.document.RepresentationType;
import at.procon.dip.domain.document.entity.DocumentEmbeddingModel;
import at.procon.dip.domain.document.entity.DocumentEmbeddingPrefixProfile;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Index;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import jakarta.persistence.PrePersist;
import jakarta.persistence.PreUpdate;
import jakarta.persistence.Table;
import java.time.OffsetDateTime;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.hibernate.annotations.JdbcTypeCode;
import org.hibernate.type.SqlTypes;
@Entity
@Table(schema = SchemaNames.DOC, name = "doc_embedding_cluster_run", indexes = {
@Index(name = "idx_doc_cluster_run_status_jpa", columnList = "status"),
@Index(name = "idx_doc_cluster_run_algorithm_jpa", columnList = "algorithm"),
@Index(name = "idx_doc_cluster_run_created_at_jpa", columnList = "created_at")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class EmbeddingClusterRun {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "cluster_set_id")
private EmbeddingClusterSet clusterSet;
@Column(name = "name", nullable = false, length = 255)
private String name;
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false, length = 32)
private ClusterRunStatus status;
@Enumerated(EnumType.STRING)
@Column(name = "algorithm", nullable = false, length = 64)
private ClusteringAlgorithm algorithm;
@Column(name = "algorithm_version", length = 64)
private String algorithmVersion;
@Enumerated(EnumType.STRING)
@Column(name = "execution_backend", length = 64)
private ClusteringExecutionBackend executionBackend;
@Enumerated(EnumType.STRING)
@Column(name = "reduction_method", length = 32)
private ReductionMethod reductionMethod;
@Column(name = "reduction_dimensions")
private Integer reductionDimensions;
@JdbcTypeCode(SqlTypes.JSON)
@Column(name = "selection_json", nullable = false, columnDefinition = "jsonb")
private String selectionJson;
@JdbcTypeCode(SqlTypes.JSON)
@Column(name = "parameters_json", nullable = false, columnDefinition = "jsonb")
private String parametersJson;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "embedding_model_id")
private DocumentEmbeddingModel embeddingModel;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "prefix_profile_id")
private DocumentEmbeddingPrefixProfile prefixProfile;
@Enumerated(EnumType.STRING)
@Column(name = "document_type", length = 64)
private DocumentType documentType;
@Enumerated(EnumType.STRING)
@Column(name = "document_family", length = 64)
private DocumentFamily documentFamily;
@Enumerated(EnumType.STRING)
@Column(name = "representation_type", length = 64)
private RepresentationType representationType;
@Column(name = "builder_key", length = 255)
private String builderKey;
@Column(name = "item_count")
private Long itemCount;
@Column(name = "cluster_count")
private Long clusterCount;
@Column(name = "noise_count")
private Long noiseCount;
@Column(name = "started_at")
private OffsetDateTime startedAt;
@Column(name = "finished_at")
private OffsetDateTime finishedAt;
@Column(name = "error_message", columnDefinition = "TEXT")
private String errorMessage;
@Builder.Default
@Column(name = "created_at", nullable = false, updatable = false)
private OffsetDateTime createdAt = OffsetDateTime.now();
@PrePersist
protected void onCreate() {
createdAt = OffsetDateTime.now();
}
@PreUpdate
protected void onUpdate() {
if (status == null) {
status = ClusterRunStatus.CREATED;
}
}
}

View File

@ -0,0 +1,74 @@
package at.procon.dip.clustering.entity;
import at.procon.dip.architecture.SchemaNames;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Index;
import jakarta.persistence.PrePersist;
import jakarta.persistence.PreUpdate;
import jakarta.persistence.Table;
import java.time.OffsetDateTime;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.hibernate.annotations.JdbcTypeCode;
import org.hibernate.type.SqlTypes;
@Entity
@Table(schema = SchemaNames.DOC, name = "doc_embedding_cluster_set", indexes = {
@Index(name = "idx_doc_embedding_cluster_set_code", columnList = "code", unique = true),
@Index(name = "idx_doc_embedding_cluster_set_active", columnList = "active")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class EmbeddingClusterSet {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
@Column(name = "code", nullable = false, length = 128, unique = true)
private String code;
@Column(name = "name", nullable = false, length = 255)
private String name;
@Column(name = "description", columnDefinition = "TEXT")
private String description;
@JdbcTypeCode(SqlTypes.JSON)
@Column(name = "selection_json", nullable = false, columnDefinition = "jsonb")
private String selectionJson;
@Builder.Default
@Column(name = "active", nullable = false)
private boolean active = true;
@Builder.Default
@Column(name = "created_at", nullable = false, updatable = false)
private OffsetDateTime createdAt = OffsetDateTime.now();
@Builder.Default
@Column(name = "updated_at", nullable = false)
private OffsetDateTime updatedAt = OffsetDateTime.now();
@PrePersist
protected void onCreate() {
createdAt = OffsetDateTime.now();
updatedAt = OffsetDateTime.now();
}
@PreUpdate
protected void onUpdate() {
updatedAt = OffsetDateTime.now();
}
}

View File

@ -0,0 +1,10 @@
package at.procon.dip.clustering.repository;
import at.procon.dip.clustering.dto.EmbeddingSelectionSpec;
import at.procon.dip.clustering.dto.SelectedEmbeddingRow;
import java.util.List;
public interface DocumentEmbeddingClusterSelectionRepository {
List<SelectedEmbeddingRow> findSelection(EmbeddingSelectionSpec spec);
long countSelection(EmbeddingSelectionSpec spec);
}

View File

@ -0,0 +1,219 @@
package at.procon.dip.clustering.repository;
import at.procon.dip.clustering.dto.EmbeddingSelectionSpec;
import at.procon.dip.clustering.dto.SelectedEmbeddingRow;
import at.procon.dip.domain.document.DocumentFamily;
import at.procon.dip.domain.document.DocumentType;
import at.procon.dip.domain.document.RepresentationType;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@Repository
@RequiredArgsConstructor
public class DocumentEmbeddingClusterSelectionRepositoryImpl implements DocumentEmbeddingClusterSelectionRepository {
private final NamedParameterJdbcTemplate jdbcTemplate;
@Override
public List<SelectedEmbeddingRow> findSelection(EmbeddingSelectionSpec spec) {
StringBuilder sql = new StringBuilder("""
select
e.id as embedding_id,
d.id as document_id,
r.id as representation_id,
e.model_id,
e.prefix_profile_id,
d.document_type,
d.document_family,
r.representation_type,
r.builder_key,
r.language_code,
d.business_key,
e.embedding_vector::text as embedding_vector_text
from doc.doc_embedding e
join doc.doc_document d on d.id = e.document_id
join doc.doc_text_representation r on r.id = e.representation_id
where e.embedding_status = 'COMPLETED'
and e.embedding_vector is not null
and e.prefix_profile_id is not null
""");
MapSqlParameterSource params = new MapSqlParameterSource();
applyFilters(spec, sql, params);
sql.append(" order by e.created_at asc");
List<RawSelectedEmbeddingRow> rawRows = jdbcTemplate.query(
sql.toString(),
params,
new RawSelectedEmbeddingRowMapper());
return rawRows.stream()
.map(this::toSelectedEmbeddingRow)
.toList();
}
@Override
public long countSelection(EmbeddingSelectionSpec spec) {
StringBuilder sql = new StringBuilder("""
select count(*)
from doc.doc_embedding e
join doc.doc_document d on d.id = e.document_id
join doc.doc_text_representation r on r.id = e.representation_id
where e.embedding_status = 'COMPLETED'
and e.embedding_vector is not null
and e.prefix_profile_id is not null
""");
MapSqlParameterSource params = new MapSqlParameterSource();
applyFilters(spec, sql, params);
Long result = jdbcTemplate.queryForObject(sql.toString(), params, Long.class);
return result == null ? 0L : result;
}
private void applyFilters(EmbeddingSelectionSpec spec, StringBuilder sql, MapSqlParameterSource params) {
if (spec == null) {
return;
}
if (!CollectionUtils.isEmpty(spec.documentTypes())) {
sql.append(" and d.document_type in (:documentTypes)");
params.addValue("documentTypes", enumNames(spec.documentTypes()));
}
if (!CollectionUtils.isEmpty(spec.documentFamilies())) {
sql.append(" and d.document_family in (:documentFamilies)");
params.addValue("documentFamilies", enumNames(spec.documentFamilies()));
}
if (!CollectionUtils.isEmpty(spec.representationTypes())) {
sql.append(" and r.representation_type in (:representationTypes)");
params.addValue("representationTypes", enumNames(spec.representationTypes()));
}
if (!CollectionUtils.isEmpty(spec.embeddingStatuses())) {
sql.append(" and e.embedding_status in (:embeddingStatuses)");
params.addValue("embeddingStatuses", enumNames(spec.embeddingStatuses()));
}
if (!CollectionUtils.isEmpty(spec.modelIds())) {
sql.append(" and e.model_id in (:modelIds)");
params.addValue("modelIds", spec.modelIds());
}
if (!CollectionUtils.isEmpty(spec.prefixProfileIds())) {
sql.append(" and e.prefix_profile_id in (:prefixProfileIds)");
params.addValue("prefixProfileIds", spec.prefixProfileIds());
}
if (!CollectionUtils.isEmpty(spec.builderKeys())) {
sql.append(" and r.builder_key in (:builderKeys)");
params.addValue("builderKeys", spec.builderKeys());
}
if (!CollectionUtils.isEmpty(spec.languageCodes())) {
sql.append(" and r.language_code in (:languageCodes)");
params.addValue("languageCodes", spec.languageCodes());
}
if (!CollectionUtils.isEmpty(spec.ownerTenantIds())) {
sql.append(" and d.owner_tenant_id in (:ownerTenantIds)");
params.addValue("ownerTenantIds", spec.ownerTenantIds());
}
if (StringUtils.hasText(spec.businessKeyLike())) {
sql.append(" and d.business_key like :businessKeyLike");
params.addValue("businessKeyLike", spec.businessKeyLike());
}
if (spec.createdFrom() != null) {
sql.append(" and d.created_at >= :createdFrom");
params.addValue("createdFrom", spec.createdFrom());
}
if (spec.createdTo() != null) {
sql.append(" and d.created_at < :createdTo");
params.addValue("createdTo", spec.createdTo());
}
if (Boolean.TRUE.equals(spec.primaryRepresentationOnly())) {
sql.append(" and r.is_primary = true");
}
}
private List<String> enumNames(Iterable<?> values) {
List<String> result = new ArrayList<>();
for (Object value : values) {
result.add(String.valueOf(value));
}
return result;
}
private SelectedEmbeddingRow toSelectedEmbeddingRow(RawSelectedEmbeddingRow row) {
return new SelectedEmbeddingRow(
row.embeddingId(),
row.documentId(),
row.representationId(),
row.modelId(),
row.prefixProfileId(),
row.documentType(),
row.documentFamily(),
row.representationType(),
row.builderKey(),
row.languageCode(),
row.businessKey(),
parseVector(row.embeddingVectorText()));
}
private float[] parseVector(String raw) {
if (raw == null) {
return null;
}
String value = raw.trim();
if (value.length() < 2) {
return new float[0];
}
if (value.charAt(0) == '[' && value.charAt(value.length() - 1) == ']') {
value = value.substring(1, value.length() - 1);
}
if (value.isBlank()) {
return new float[0];
}
String[] parts = value.split(",");
float[] result = new float[parts.length];
for (int i = 0; i < parts.length; i++) {
result[i] = Float.parseFloat(parts[i].trim());
}
return result;
}
private record RawSelectedEmbeddingRow(
UUID embeddingId,
UUID documentId,
UUID representationId,
UUID modelId,
UUID prefixProfileId,
DocumentType documentType,
DocumentFamily documentFamily,
RepresentationType representationType,
String builderKey,
String languageCode,
String businessKey,
String embeddingVectorText
) {
}
private static class RawSelectedEmbeddingRowMapper implements RowMapper<RawSelectedEmbeddingRow> {
@Override
public RawSelectedEmbeddingRow mapRow(ResultSet rs, int rowNum) throws SQLException {
return new RawSelectedEmbeddingRow(
rs.getObject("embedding_id", UUID.class),
rs.getObject("document_id", UUID.class),
rs.getObject("representation_id", UUID.class),
rs.getObject("model_id", UUID.class),
rs.getObject("prefix_profile_id", UUID.class),
DocumentType.valueOf(rs.getString("document_type")),
DocumentFamily.valueOf(rs.getString("document_family")),
RepresentationType.valueOf(rs.getString("representation_type")),
rs.getString("builder_key"),
rs.getString("language_code"),
rs.getString("business_key"),
rs.getString("embedding_vector_text")
);
}
}
}

View File

@ -0,0 +1,10 @@
package at.procon.dip.clustering.repository;
import at.procon.dip.clustering.entity.EmbeddingClusterAssignment;
import java.util.List;
import java.util.UUID;
import org.springframework.data.jpa.repository.JpaRepository;
public interface EmbeddingClusterAssignmentRepository extends JpaRepository<EmbeddingClusterAssignment, UUID> {
List<EmbeddingClusterAssignment> findByClusterRun_IdOrderByClusterLabelRawAscDocument_IdAsc(UUID clusterRunId);
}

View File

@ -0,0 +1,10 @@
package at.procon.dip.clustering.repository;
import at.procon.dip.clustering.entity.EmbeddingCluster;
import java.util.List;
import java.util.UUID;
import org.springframework.data.jpa.repository.JpaRepository;
public interface EmbeddingClusterRepository extends JpaRepository<EmbeddingCluster, UUID> {
List<EmbeddingCluster> findByClusterRun_IdOrderByClusterLabelAsc(UUID clusterRunId);
}

View File

@ -0,0 +1,11 @@
package at.procon.dip.clustering.repository;
import at.procon.dip.clustering.dto.ClusterAssignmentViewResponse;
import at.procon.dip.clustering.dto.ClusterMembersResponse;
import java.util.List;
import java.util.UUID;
public interface EmbeddingClusterResultQueryRepository {
List<ClusterAssignmentViewResponse> findAssignments(UUID runId, boolean includeText, int previewLength);
List<ClusterMembersResponse> findClusterMembers(UUID runId, UUID clusterId, boolean includeText, int previewLength);
}

View File

@ -0,0 +1,87 @@
package at.procon.dip.clustering.repository;
import at.procon.dip.clustering.dto.ClusterAssignmentViewResponse;
import at.procon.dip.clustering.dto.ClusterMembersResponse;
import at.procon.dip.domain.document.DocumentType;
import at.procon.dip.domain.document.RepresentationType;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
@RequiredArgsConstructor
public class EmbeddingClusterResultQueryRepositoryImpl implements EmbeddingClusterResultQueryRepository {
private final NamedParameterJdbcTemplate jdbcTemplate;
@Override
public List<ClusterAssignmentViewResponse> findAssignments(UUID runId, boolean includeText, int previewLength) {
String sql = baseSql(includeText) + " where a.cluster_run_id = :runId order by a.cluster_label_raw asc, d.id asc";
MapSqlParameterSource params = new MapSqlParameterSource()
.addValue("runId", runId)
.addValue("previewLength", previewLength);
return jdbcTemplate.query(sql, params, assignmentRowMapper(includeText));
}
@Override
public List<ClusterMembersResponse> findClusterMembers(UUID runId, UUID clusterId, boolean includeText, int previewLength) {
String sql = "select c.id as result_cluster_id, c.cluster_label as result_cluster_label, x.* from doc.doc_embedding_cluster c "
+ "join (" + baseSql(includeText) + ") x on x.cluster_id = c.id "
+ "where c.cluster_run_id = :runId and c.id = :clusterId order by x.business_key asc, x.document_id asc";
MapSqlParameterSource params = new MapSqlParameterSource()
.addValue("runId", runId)
.addValue("clusterId", clusterId)
.addValue("previewLength", previewLength);
return jdbcTemplate.query(sql, params, (rs, rowNum) -> new ClusterMembersResponse(
rs.getObject("result_cluster_id", UUID.class),
rs.getInt("result_cluster_label"),
assignmentRowMapper(includeText).mapRow(rs, rowNum)
));
}
private String baseSql(boolean includeText) {
String textBodyExpression = includeText ? "r.text_body" : "null::text";
return "select a.id, a.cluster_id, a.embedding_id, a.document_id, a.representation_id, a.cluster_label_raw, "
+ "a.membership_score, a.distance_to_centroid, a.is_noise, d.business_key, d.document_type, "
+ "r.representation_type, r.builder_key, r.language_code, r.char_count as text_length, "
+ "case when r.text_body is null then null "
+ "when char_length(r.text_body) <= :previewLength then r.text_body "
+ "else substring(r.text_body from 1 for :previewLength) end as text_preview, "
+ textBodyExpression + " as text_body "
+ "from doc.doc_embedding_cluster_assignment a "
+ "join doc.doc_document d on d.id = a.document_id "
+ "join doc.doc_text_representation r on r.id = a.representation_id";
}
private RowMapper<ClusterAssignmentViewResponse> assignmentRowMapper(boolean includeText) {
return (rs, rowNum) -> new ClusterAssignmentViewResponse(
rs.getObject("id", UUID.class),
rs.getObject("cluster_id", UUID.class),
rs.getObject("embedding_id", UUID.class),
rs.getObject("document_id", UUID.class),
rs.getObject("representation_id", UUID.class),
rs.getInt("cluster_label_raw"),
rs.getObject("membership_score", Double.class),
rs.getObject("distance_to_centroid", Double.class),
rs.getBoolean("is_noise"),
rs.getString("business_key"),
enumValue(DocumentType.class, rs.getString("document_type")),
enumValue(RepresentationType.class, rs.getString("representation_type")),
rs.getString("builder_key"),
rs.getString("language_code"),
rs.getObject("text_length", Integer.class),
rs.getString("text_preview"),
includeText ? rs.getString("text_body") : null
);
}
private <T extends Enum<T>> T enumValue(Class<T> type, String value) {
return value == null ? null : Enum.valueOf(type, value);
}
}

View File

@ -0,0 +1,10 @@
package at.procon.dip.clustering.repository;
import at.procon.dip.clustering.entity.EmbeddingClusterRun;
import java.util.UUID;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
public interface EmbeddingClusterRunRepository
extends JpaRepository<EmbeddingClusterRun, UUID>, JpaSpecificationExecutor<EmbeddingClusterRun> {
}

View File

@ -0,0 +1,45 @@
package at.procon.dip.clustering.repository;
import at.procon.dip.clustering.ClusterRunStatus;
import at.procon.dip.clustering.ClusteringAlgorithm;
import at.procon.dip.clustering.ClusteringExecutionBackend;
import at.procon.dip.clustering.entity.EmbeddingClusterRun;
import at.procon.dip.domain.document.DocumentType;
import java.time.OffsetDateTime;
import org.springframework.data.jpa.domain.Specification;
public final class EmbeddingClusterRunSpecifications {
private EmbeddingClusterRunSpecifications() {
}
public static Specification<EmbeddingClusterRun> hasStatus(ClusterRunStatus status) {
return (root, query, cb) -> status == null ? null : cb.equal(root.get("status"), status);
}
public static Specification<EmbeddingClusterRun> hasAlgorithm(ClusteringAlgorithm algorithm) {
return (root, query, cb) -> algorithm == null ? null : cb.equal(root.get("algorithm"), algorithm);
}
public static Specification<EmbeddingClusterRun> hasExecutionBackend(ClusteringExecutionBackend executionBackend) {
return (root, query, cb) -> executionBackend == null ? null : cb.equal(root.get("executionBackend"), executionBackend);
}
public static Specification<EmbeddingClusterRun> hasDocumentType(DocumentType documentType) {
return (root, query, cb) -> documentType == null ? null : cb.equal(root.get("documentType"), documentType);
}
public static Specification<EmbeddingClusterRun> nameContains(String value) {
return (root, query, cb) -> value == null || value.isBlank()
? null
: cb.like(cb.lower(root.get("name")), "%" + value.toLowerCase() + "%");
}
public static Specification<EmbeddingClusterRun> createdAtFrom(OffsetDateTime value) {
return (root, query, cb) -> value == null ? null : cb.greaterThanOrEqualTo(root.get("createdAt"), value);
}
public static Specification<EmbeddingClusterRun> createdAtTo(OffsetDateTime value) {
return (root, query, cb) -> value == null ? null : cb.lessThan(root.get("createdAt"), value);
}
}

View File

@ -0,0 +1,19 @@
package at.procon.dip.clustering.repository;
import at.procon.dip.clustering.entity.EmbeddingClusterSet;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.springframework.data.jpa.repository.JpaRepository;
public interface EmbeddingClusterSetRepository extends JpaRepository<EmbeddingClusterSet, UUID> {
Optional<EmbeddingClusterSet> findByCode(String code);
boolean existsByCodeIgnoreCaseAndIdNot(String code, UUID id);
boolean existsByCodeIgnoreCase(String code);
List<EmbeddingClusterSet> findAllByActiveOrderByCodeAsc(boolean active);
List<EmbeddingClusterSet> findAllByOrderByCodeAsc();
}

View File

@ -0,0 +1,24 @@
package at.procon.dip.clustering.service;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@Slf4j
public class EmbeddingClusterAsyncExecutionService {
private final EmbeddingClusterRunService runService;
@Async("clusteringRunExecutor")
public void executeRunAsync(UUID runId) {
try {
runService.executeQueuedRun(runId);
} catch (Exception ex) {
log.error("Cluster run {} failed during async execution: {}", runId, ex.getMessage(), ex);
}
}
}

View File

@ -0,0 +1,103 @@
package at.procon.dip.clustering.service;
import at.procon.dip.clustering.dto.ClusteringEngineAssignment;
import at.procon.dip.clustering.dto.ClusteringEngineCluster;
import at.procon.dip.clustering.dto.ClusteringEngineResult;
import at.procon.dip.clustering.entity.EmbeddingCluster;
import at.procon.dip.clustering.entity.EmbeddingClusterRun;
import at.procon.dip.clustering.repository.EmbeddingClusterRepository;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@RequiredArgsConstructor
public class EmbeddingClusterPersistenceService {
private final EmbeddingClusterRepository clusterRepository;
private final NamedParameterJdbcTemplate jdbcTemplate;
@Transactional
public void persist(EmbeddingClusterRun run,
List<ClusteringEngineCluster> clusters,
List<ClusteringEngineAssignment> assignments) {
persist(run, new ClusteringEngineResult(clusters, assignments));
}
@Transactional
public void persist(EmbeddingClusterRun run, ClusteringEngineResult result) {
Map<Integer, EmbeddingCluster> clusterByLabel = new HashMap<>();
for (ClusteringEngineCluster cluster : result.clusters()) {
EmbeddingCluster saved = clusterRepository.save(EmbeddingCluster.builder()
.clusterRun(run)
.clusterLabel(cluster.clusterLabel())
.itemCount(cluster.itemCount())
.noiseCluster(cluster.noiseCluster())
.build());
clusterRepository.flush();
clusterByLabel.put(cluster.clusterLabel(), saved);
}
if (result.assignments().isEmpty()) {
return;
}
MapSqlParameterSource[] batch = result.assignments().stream()
.map(assignment -> toInsertParams(run, clusterByLabel.get(assignment.clusterLabel()), assignment))
.toArray(MapSqlParameterSource[]::new);
jdbcTemplate.batchUpdate("""
insert into doc.doc_embedding_cluster_assignment (
id,
cluster_run_id,
cluster_id,
embedding_id,
document_id,
representation_id,
cluster_label_raw,
membership_score,
distance_to_centroid,
is_noise,
created_at
)
select
:id,
:clusterRunId,
:clusterId,
e.id,
e.document_id,
e.representation_id,
:clusterLabelRaw,
:membershipScore,
:distanceToCentroid,
:isNoise,
:createdAt
from doc.doc_embedding e
where e.id = :embeddingId
""", batch);
}
private MapSqlParameterSource toInsertParams(
EmbeddingClusterRun run,
EmbeddingCluster cluster,
ClusteringEngineAssignment assignment
) {
return new MapSqlParameterSource()
.addValue("id", UUID.randomUUID())
.addValue("clusterRunId", run.getId())
.addValue("clusterId", cluster == null ? null : cluster.getId())
.addValue("embeddingId", assignment.embeddingId())
.addValue("clusterLabelRaw", assignment.clusterLabel())
.addValue("membershipScore", assignment.membershipScore())
.addValue("distanceToCentroid", assignment.distanceToCentroid())
.addValue("isNoise", assignment.noise())
.addValue("createdAt", OffsetDateTime.now());
}
}

View File

@ -0,0 +1,580 @@
package at.procon.dip.clustering.service;
import static at.procon.dip.clustering.repository.EmbeddingClusterRunSpecifications.createdAtFrom;
import static at.procon.dip.clustering.repository.EmbeddingClusterRunSpecifications.createdAtTo;
import static at.procon.dip.clustering.repository.EmbeddingClusterRunSpecifications.hasAlgorithm;
import static at.procon.dip.clustering.repository.EmbeddingClusterRunSpecifications.hasDocumentType;
import static at.procon.dip.clustering.repository.EmbeddingClusterRunSpecifications.hasExecutionBackend;
import static at.procon.dip.clustering.repository.EmbeddingClusterRunSpecifications.hasStatus;
import static at.procon.dip.clustering.repository.EmbeddingClusterRunSpecifications.nameContains;
import at.procon.dip.clustering.ClusterRunStatus;
import at.procon.dip.clustering.ClusteringAlgorithm;
import at.procon.dip.clustering.ClusteringExecutionBackend;
import at.procon.dip.clustering.ReductionMethod;
import at.procon.dip.clustering.PythonRequestMode;
import at.procon.dip.clustering.client.PythonClusteringClient;
import at.procon.dip.clustering.config.ClusteringPhaseBProperties;
import at.procon.dip.clustering.dto.ClusterAssignmentViewResponse;
import at.procon.dip.clustering.dto.ClusterMembersResponse;
import at.procon.dip.clustering.dto.ClusterResponse;
import at.procon.dip.clustering.dto.ClusterRunResponse;
import at.procon.dip.clustering.dto.ClusteringEngineAssignment;
import at.procon.dip.clustering.dto.ClusteringEngineCluster;
import at.procon.dip.clustering.dto.ClusteringEngineRequest;
import at.procon.dip.clustering.dto.ClusteringEngineResult;
import at.procon.dip.clustering.dto.CreateClusterRunRequest;
import at.procon.dip.clustering.dto.EmbeddingSelectionSpec;
import at.procon.dip.clustering.dto.PythonClusteringRequest;
import at.procon.dip.clustering.dto.PythonClusteringResponse;
import at.procon.dip.clustering.dto.PythonRunExecutionRequest;
import at.procon.dip.clustering.dto.ReductionConfig;
import at.procon.dip.clustering.dto.SelectedEmbeddingRow;
import at.procon.dip.clustering.entity.EmbeddingClusterRun;
import at.procon.dip.clustering.entity.EmbeddingClusterSet;
import at.procon.dip.clustering.repository.EmbeddingClusterRepository;
import at.procon.dip.clustering.repository.EmbeddingClusterResultQueryRepository;
import at.procon.dip.clustering.repository.EmbeddingClusterRunRepository;
import at.procon.dip.clustering.repository.EmbeddingClusterSetRepository;
import at.procon.dip.clustering.spi.EmbeddingClusteringEngine;
import at.procon.dip.domain.document.DocumentType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.server.ResponseStatusException;
@Service
@RequiredArgsConstructor
public class EmbeddingClusterRunService {
private static final int DEFAULT_PREVIEW_LENGTH = 1000;
private static final TypeReference<Map<String, Object>> PARAMETERS_TYPE = new TypeReference<>() {
};
private static final Set<ClusterRunStatus> STARTABLE_STATUSES = Set.of(ClusterRunStatus.CREATED);
private static final Set<ClusterRunStatus> FINAL_STATUSES = Set.of(
ClusterRunStatus.COMPLETED, ClusterRunStatus.FAILED, ClusterRunStatus.CANCELLED);
private final EmbeddingClusterRunRepository runRepository;
private final EmbeddingClusterSetRepository clusterSetRepository;
private final EmbeddingClusterRepository clusterRepository;
private final EmbeddingClusterResultQueryRepository resultQueryRepository;
private final EmbeddingSelectionService selectionService;
private final EmbeddingClusterPersistenceService persistenceService;
private final List<EmbeddingClusteringEngine> clusteringEngines;
private final ClusteringPhaseBProperties pythonProperties;
private final Optional<PythonClusteringClient> pythonClusteringClient;
private final ObjectMapper objectMapper;
@Transactional
public ClusterRunResponse createRun(CreateClusterRunRequest request) {
EmbeddingClusterRun run = EmbeddingClusterRun.builder()
.clusterSet(resolveClusterSet(request.clusterSetCode()))
.name(request.name())
.status(ClusterRunStatus.CREATED)
.algorithm(request.algorithm())
.algorithmVersion("phase-e-dual-python")
.executionBackend(resolveExecutionBackend(request))
.reductionMethod(resolveReductionMethod(request.reduction()))
.reductionDimensions(resolveReductionDimensions(request.reduction()))
.selectionJson(writeJson(request.selection()))
.parametersJson(writeJson(request.resolvedParameters()))
.documentType(firstOrNull(request.selection() == null ? null : request.selection().documentTypes()))
.documentFamily(firstOrNull(request.selection() == null ? null : request.selection().documentFamilies()))
.representationType(firstOrNull(request.selection() == null ? null : request.selection().representationTypes()))
.builderKey(firstOrNullString(request.selection() == null ? null : request.selection().builderKeys()))
.build();
return toResponse(runRepository.save(run));
}
public ClusterRunResponse executeRun(UUID runId) {
EmbeddingClusterRun run = loadRun(runId);
return run.getExecutionBackend() == ClusteringExecutionBackend.PYTHON_REMOTE
? executeRunRemote(run)
: executeRunLocal(run);
}
@Transactional
public ClusterRunResponse queueRun(UUID runId) {
EmbeddingClusterRun run = loadRun(runId);
if (!STARTABLE_STATUSES.contains(run.getStatus())) {
throw new ResponseStatusException(HttpStatus.CONFLICT,
"Cluster run cannot be started from status " + run.getStatus());
}
run.setStatus(ClusterRunStatus.QUEUED);
run.setStartedAt(null);
run.setFinishedAt(null);
run.setErrorMessage(null);
run.setItemCount(null);
run.setClusterCount(null);
run.setNoiseCount(null);
return toResponse(runRepository.save(run));
}
public ClusterRunResponse executeQueuedRun(UUID runId) {
EmbeddingClusterRun run = loadRun(runId);
if (run.getStatus() == ClusterRunStatus.CANCELLED) {
return toResponse(run);
}
if (run.getStatus() != ClusterRunStatus.QUEUED && run.getStatus() != ClusterRunStatus.CANCEL_REQUESTED) {
throw new ResponseStatusException(HttpStatus.CONFLICT,
"Cluster run is not queued for execution: " + runId);
}
transitionToRunning(runId);
if (loadRun(runId).getExecutionBackend() == ClusteringExecutionBackend.PYTHON_REMOTE) {
return executeQueuedRemoteRun(runId);
}
return executeQueuedLocalRun(runId);
}
public ClusterRunResponse getRun(UUID runId) {
return toResponse(loadRun(runId));
}
public List<ClusterRunResponse> listRuns(
ClusterRunStatus status,
ClusteringAlgorithm algorithm,
ClusteringExecutionBackend executionBackend,
DocumentType documentType,
String nameLike,
OffsetDateTime createdFrom,
OffsetDateTime createdTo) {
Specification<EmbeddingClusterRun> spec = Specification
.where(hasStatus(status))
.and(hasAlgorithm(algorithm))
.and(hasExecutionBackend(executionBackend))
.and(hasDocumentType(documentType))
.and(nameContains(nameLike))
.and(createdAtFrom(createdFrom))
.and(createdAtTo(createdTo));
return runRepository.findAll(spec, Sort.by(Sort.Direction.DESC, "createdAt")).stream()
.map(this::toResponse)
.toList();
}
@Transactional
public ClusterRunResponse requestCancellation(UUID runId) {
EmbeddingClusterRun run = loadRun(runId);
if (FINAL_STATUSES.contains(run.getStatus())) {
return toResponse(run);
}
if (run.getStatus() == ClusterRunStatus.CREATED || run.getStatus() == ClusterRunStatus.QUEUED) {
run.setStatus(ClusterRunStatus.CANCELLED);
run.setFinishedAt(OffsetDateTime.now());
run.setErrorMessage("Cluster run was cancelled");
return toResponse(runRepository.save(run));
}
if (run.getStatus() == ClusterRunStatus.RUNNING) {
run.setStatus(ClusterRunStatus.CANCEL_REQUESTED);
run.setErrorMessage("Cluster run cancellation requested");
return toResponse(runRepository.save(run));
}
return toResponse(run);
}
public List<ClusterResponse> listClusters(UUID runId) {
ensureRunExists(runId);
return clusterRepository.findByClusterRun_IdOrderByClusterLabelAsc(runId).stream()
.map(cluster -> new ClusterResponse(
cluster.getId(),
cluster.getClusterLabel(),
cluster.getDisplayName(),
cluster.getItemCount(),
cluster.isNoiseCluster(),
cluster.getSummaryText()))
.toList();
}
public List<ClusterAssignmentViewResponse> listAssignments(UUID runId, boolean includeText) {
ensureRunExists(runId);
return resultQueryRepository.findAssignments(runId, includeText, DEFAULT_PREVIEW_LENGTH);
}
public List<ClusterMembersResponse> listClusterMembers(UUID runId, UUID clusterId, boolean includeText) {
ensureRunExists(runId);
return resultQueryRepository.findClusterMembers(runId, clusterId, includeText, DEFAULT_PREVIEW_LENGTH);
}
private ClusterRunResponse executeRunRemote(EmbeddingClusterRun run) {
EmbeddingSelectionSpec selection = readSelection(run.getSelectionJson());
List<SelectedEmbeddingRow> selected = null;
long itemCount;
if (useCompactPythonRunMode()) {
itemCount = selectionService.count(selection);
} else {
selected = selectionService.load(selection);
itemCount = selected.size();
}
if (itemCount == 0L) {
failRun(run.getId(), "Selection contains no completed embeddings");
return getRun(run.getId());
}
run.setStatus(ClusterRunStatus.RUNNING);
run.setStartedAt(OffsetDateTime.now());
run.setItemCount(itemCount);
run.setErrorMessage(null);
run.setFinishedAt(null);
run.setClusterCount(null);
run.setNoiseCount(null);
runRepository.save(run);
try {
ClusteringEngineResult result = executePythonClustering(run, selected);
persistenceService.persist(run, result);
run.setStatus(ClusterRunStatus.COMPLETED);
run.setFinishedAt(OffsetDateTime.now());
run.setClusterCount(result.clusters().stream().filter(cluster -> !cluster.noiseCluster()).count());
run.setNoiseCount(result.noiseCount());
runRepository.save(run);
return toResponse(run);
} catch (Exception ex) {
failRun(run.getId(), ex.getMessage());
return getRun(run.getId());
}
}
private ClusterRunResponse executeRunLocal(EmbeddingClusterRun run) {
EmbeddingSelectionSpec selection = readSelection(run.getSelectionJson());
List<SelectedEmbeddingRow> selected = selectionService.load(selection);
if (selected.isEmpty()) {
failRun(run.getId(), "Selection contains no completed embeddings");
return getRun(run.getId());
}
run.setStatus(ClusterRunStatus.RUNNING);
run.setStartedAt(OffsetDateTime.now());
run.setItemCount((long) selected.size());
run.setErrorMessage(null);
run.setFinishedAt(null);
run.setClusterCount(null);
run.setNoiseCount(null);
runRepository.save(run);
try {
ClusteringEngineResult result = executeLocalClustering(run, selected);
persistenceService.persist(run, result);
run.setStatus(ClusterRunStatus.COMPLETED);
run.setFinishedAt(OffsetDateTime.now());
run.setClusterCount(result.clusters().stream().filter(cluster -> !cluster.noiseCluster()).count());
run.setNoiseCount(result.noiseCount());
runRepository.save(run);
return toResponse(run);
} catch (Exception ex) {
failRun(run.getId(), ex.getMessage());
return getRun(run.getId());
}
}
private ClusterRunResponse executeQueuedRemoteRun(UUID runId) {
EmbeddingClusterRun run = loadRun(runId);
EmbeddingSelectionSpec selection = readSelection(run.getSelectionJson());
List<SelectedEmbeddingRow> selected = null;
long itemCount;
if (useCompactPythonRunMode()) {
itemCount = selectionService.count(selection);
} else {
selected = selectionService.load(selection);
itemCount = selected.size();
}
if (itemCount == 0L) {
failRun(runId, "Selection contains no completed embeddings");
return getRun(runId);
}
updateItemCount(runId, itemCount);
if (isCancellationRequested(runId)) {
cancelRunNow(runId, "Cluster run was cancelled before clustering started");
return getRun(runId);
}
try {
ClusteringEngineResult result = executePythonClustering(loadRun(runId), selected);
if (isCancellationRequested(runId)) {
cancelRunNow(runId, "Cluster run was cancelled before results were persisted");
return getRun(runId);
}
persistenceService.persist(loadRun(runId), result);
completeRun(runId, result);
return getRun(runId);
} catch (Exception ex) {
failRun(runId, ex.getMessage());
return getRun(runId);
}
}
private ClusterRunResponse executeQueuedLocalRun(UUID runId) {
EmbeddingSelectionSpec selection = readSelection(loadRun(runId).getSelectionJson());
List<SelectedEmbeddingRow> selected = selectionService.load(selection);
if (selected.isEmpty()) {
failRun(runId, "Selection contains no completed embeddings");
return getRun(runId);
}
updateItemCount(runId, (long) selected.size());
if (isCancellationRequested(runId)) {
cancelRunNow(runId, "Cluster run was cancelled before clustering started");
return getRun(runId);
}
try {
ClusteringEngineResult result = executeLocalClustering(loadRun(runId), selected);
if (isCancellationRequested(runId)) {
cancelRunNow(runId, "Cluster run was cancelled before results were persisted");
return getRun(runId);
}
persistenceService.persist(loadRun(runId), result);
completeRun(runId, result);
return getRun(runId);
} catch (Exception ex) {
failRun(runId, ex.getMessage());
return getRun(runId);
}
}
private void transitionToRunning(UUID runId) {
EmbeddingClusterRun run = loadRun(runId);
if (run.getStatus() == ClusterRunStatus.CANCELLED) {
return;
}
run.setStatus(ClusterRunStatus.RUNNING);
run.setStartedAt(OffsetDateTime.now());
run.setFinishedAt(null);
run.setErrorMessage(null);
runRepository.save(run);
}
private void updateItemCount(UUID runId, Long itemCount) {
EmbeddingClusterRun run = loadRun(runId);
run.setItemCount(itemCount);
runRepository.save(run);
}
private void completeRun(UUID runId, ClusteringEngineResult result) {
EmbeddingClusterRun run = loadRun(runId);
run.setStatus(ClusterRunStatus.COMPLETED);
run.setFinishedAt(OffsetDateTime.now());
run.setClusterCount(result.clusters().stream().filter(cluster -> !cluster.noiseCluster()).count());
run.setNoiseCount(result.noiseCount());
run.setErrorMessage(null);
runRepository.save(run);
}
private void cancelRunNow(UUID runId, String message) {
EmbeddingClusterRun run = loadRun(runId);
run.setStatus(ClusterRunStatus.CANCELLED);
run.setFinishedAt(OffsetDateTime.now());
run.setErrorMessage(message);
runRepository.save(run);
}
private boolean isCancellationRequested(UUID runId) {
ClusterRunStatus status = loadRun(runId).getStatus();
return status == ClusterRunStatus.CANCEL_REQUESTED || status == ClusterRunStatus.CANCELLED;
}
private ClusteringEngineResult executePythonClustering(EmbeddingClusterRun run, List<SelectedEmbeddingRow> selected) {
PythonClusteringClient client = pythonClusteringClient.orElseThrow(() -> new ResponseStatusException(
HttpStatus.BAD_REQUEST,
"Cluster run requires PYTHON_REMOTE backend but no Python client is configured"));
PythonClusteringResponse response = useCompactPythonRunMode()
? client.clusterRun(new PythonRunExecutionRequest(run.getId()))
: client.cluster(new PythonClusteringRequest(
run.getAlgorithm(),
readParameters(run.getParametersJson()),
run.getReductionMethod(),
run.getReductionDimensions(),
selected == null ? List.of() : selected.stream()
.map(item -> new PythonClusteringRequest.PythonClusteringItem(
item.embeddingId(),
item.documentId(),
item.representationId(),
item.embeddingVector()))
.toList()));
return mapPythonResponse(response);
}
private boolean useCompactPythonRunMode() {
return pythonProperties.effectiveRequestMode() == PythonRequestMode.RUN_ID;
}
private ClusteringEngineResult executeLocalClustering(EmbeddingClusterRun run, List<SelectedEmbeddingRow> selected) {
EmbeddingClusteringEngine engine = resolveEngine(run.getAlgorithm());
return engine.cluster(selected, new ClusteringEngineRequest(readParameters(run.getParametersJson())));
}
private ClusteringEngineResult mapPythonResponse(PythonClusteringResponse response) {
long noiseCount = response.noiseCount() == null
? response.assignments().stream().filter(assignment -> Boolean.TRUE.equals(assignment.noise())).count()
: response.noiseCount();
return new ClusteringEngineResult(
response.clusters().stream()
.map(cluster -> new ClusteringEngineCluster(
cluster.clusterLabel(),
cluster.itemCount(),
Boolean.TRUE.equals(cluster.noiseCluster())))
.toList(),
response.assignments().stream()
.map(assignment -> new ClusteringEngineAssignment(
assignment.embeddingId(),
assignment.documentId(),
assignment.representationId(),
assignment.clusterLabel(),
assignment.distanceToCentroid(),
assignment.membershipScore(),
Boolean.TRUE.equals(assignment.noise())))
.toList(),
noiseCount);
}
private EmbeddingClusterSet resolveClusterSet(String clusterSetCode) {
if (clusterSetCode == null || clusterSetCode.isBlank()) {
return null;
}
return clusterSetRepository.findByCode(clusterSetCode)
.orElseThrow(() -> new ResponseStatusException(HttpStatus.BAD_REQUEST,
"Cluster set not found: " + clusterSetCode));
}
private void ensureRunExists(UUID runId) {
if (!runRepository.existsById(runId)) {
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Cluster run not found: " + runId);
}
}
private void failRun(UUID runId, String message) {
EmbeddingClusterRun run = loadRun(runId);
run.setStatus(ClusterRunStatus.FAILED);
run.setFinishedAt(OffsetDateTime.now());
run.setErrorMessage(message);
runRepository.save(run);
}
private EmbeddingClusteringEngine resolveEngine(ClusteringAlgorithm algorithm) {
return clusteringEngines.stream()
.filter(engine -> engine.algorithm() == algorithm)
.findFirst()
.orElseThrow(() -> new ResponseStatusException(HttpStatus.BAD_REQUEST,
"No clustering engine registered for algorithm " + algorithm));
}
private EmbeddingSelectionSpec readSelection(String json) {
try {
return objectMapper.readValue(json, EmbeddingSelectionSpec.class);
} catch (JsonProcessingException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR,
"Cannot parse stored clustering selection", e);
}
}
private Map<String, Object> readParameters(String json) {
if (json == null || json.isBlank()) {
return Map.of();
}
try {
Map<String, Object> values = objectMapper.readValue(json, PARAMETERS_TYPE);
return values == null ? Map.of() : values;
} catch (JsonProcessingException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR,
"Cannot parse clustering parameters", e);
}
}
private String writeJson(Object value) {
try {
return objectMapper.writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR,
"Cannot serialize clustering payload", e);
}
}
private ClusteringExecutionBackend resolveExecutionBackend(CreateClusterRunRequest request) {
if (request.executionBackend() != null) {
if (request.executionBackend() == ClusteringExecutionBackend.JAVA_LOCAL
&& request.reduction() != null
&& request.reduction().method() != null
&& request.reduction().method() != ReductionMethod.NONE) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
"JAVA_LOCAL backend does not support PCA/UMAP reduction; use PYTHON_REMOTE");
}
return request.executionBackend();
}
if (request.reduction() != null && request.reduction().method() != null && request.reduction().method() != ReductionMethod.NONE) {
return ClusteringExecutionBackend.PYTHON_REMOTE;
}
return switch (request.algorithm()) {
case KMEANS -> ClusteringExecutionBackend.JAVA_LOCAL;
default -> ClusteringExecutionBackend.PYTHON_REMOTE;
};
}
private ReductionMethod resolveReductionMethod(ReductionConfig reduction) {
if (reduction == null || reduction.method() == null) {
return ReductionMethod.NONE;
}
return reduction.method();
}
private Integer resolveReductionDimensions(ReductionConfig reduction) {
return reduction == null ? null : reduction.targetDimensions();
}
private <T> T firstOrNull(Iterable<T> values) {
if (values == null) {
return null;
}
for (T value : values) {
return value;
}
return null;
}
private String firstOrNullString(Iterable<String> values) {
return firstOrNull(values);
}
private ClusterRunResponse toResponse(EmbeddingClusterRun run) {
return new ClusterRunResponse(
run.getId(),
run.getName(),
run.getStatus(),
run.getAlgorithm(),
run.getExecutionBackend(),
run.getReductionMethod(),
run.getReductionDimensions(),
run.getItemCount(),
run.getClusterCount(),
run.getNoiseCount(),
run.getStartedAt(),
run.getFinishedAt(),
run.getErrorMessage());
}
private EmbeddingClusterRun loadRun(UUID runId) {
return runRepository.findById(runId)
.orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND, "Cluster run not found: " + runId));
}
}

View File

@ -0,0 +1,95 @@
package at.procon.dip.clustering.service;
import at.procon.dip.clustering.dto.ClusterSetResponse;
import at.procon.dip.clustering.dto.CreateClusterSetRequest;
import at.procon.dip.clustering.dto.EmbeddingSelectionSpec;
import at.procon.dip.clustering.dto.UpdateClusterSetRequest;
import at.procon.dip.clustering.entity.EmbeddingClusterSet;
import at.procon.dip.clustering.repository.EmbeddingClusterSetRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.server.ResponseStatusException;
@Service
@RequiredArgsConstructor
public class EmbeddingClusterSetService {
private final EmbeddingClusterSetRepository clusterSetRepository;
private final ObjectMapper objectMapper;
@Transactional
public ClusterSetResponse create(CreateClusterSetRequest request) {
if (clusterSetRepository.existsByCodeIgnoreCase(request.code())) {
throw new ResponseStatusException(HttpStatus.CONFLICT,
"Cluster set code already exists: " + request.code());
}
EmbeddingClusterSet saved = clusterSetRepository.save(EmbeddingClusterSet.builder()
.code(request.code().trim())
.name(request.name().trim())
.description(request.description())
.selectionJson(writeJson(request.selection()))
.active(request.active() == null || request.active())
.build());
return toResponse(saved);
}
@Transactional
public ClusterSetResponse update(UUID id, UpdateClusterSetRequest request) {
EmbeddingClusterSet existing = clusterSetRepository.findById(id)
.orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND, "Cluster set not found: " + id));
existing.setName(request.name().trim());
existing.setDescription(request.description());
existing.setSelectionJson(writeJson(request.selection()));
existing.setActive(request.active() == null || request.active());
return toResponse(clusterSetRepository.save(existing));
}
public ClusterSetResponse get(UUID id) {
return toResponse(clusterSetRepository.findById(id)
.orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND, "Cluster set not found: " + id)));
}
public List<ClusterSetResponse> list(Boolean activeOnly) {
List<EmbeddingClusterSet> sets = activeOnly == null
? clusterSetRepository.findAllByOrderByCodeAsc()
: clusterSetRepository.findAllByActiveOrderByCodeAsc(activeOnly);
return sets.stream().map(this::toResponse).toList();
}
private ClusterSetResponse toResponse(EmbeddingClusterSet entity) {
return new ClusterSetResponse(
entity.getId(),
entity.getCode(),
entity.getName(),
entity.getDescription(),
entity.isActive(),
readSelection(entity.getSelectionJson()),
entity.getCreatedAt(),
entity.getUpdatedAt()
);
}
private EmbeddingSelectionSpec readSelection(String json) {
try {
return objectMapper.readValue(json, EmbeddingSelectionSpec.class);
} catch (JsonProcessingException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR,
"Cannot parse stored cluster set selection", e);
}
}
private String writeJson(Object value) {
try {
return objectMapper.writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR,
"Cannot serialize cluster set selection", e);
}
}
}

View File

@ -0,0 +1,23 @@
package at.procon.dip.clustering.service;
import at.procon.dip.clustering.dto.EmbeddingSelectionSpec;
import at.procon.dip.clustering.dto.SelectedEmbeddingRow;
import at.procon.dip.clustering.repository.DocumentEmbeddingClusterSelectionRepository;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class EmbeddingSelectionService {
private final DocumentEmbeddingClusterSelectionRepository selectionRepository;
public long count(EmbeddingSelectionSpec spec) {
return selectionRepository.countSelection(spec);
}
public List<SelectedEmbeddingRow> load(EmbeddingSelectionSpec spec) {
return selectionRepository.findSelection(spec);
}
}

View File

@ -0,0 +1,14 @@
package at.procon.dip.clustering.spi;
import at.procon.dip.clustering.ClusteringAlgorithm;
import at.procon.dip.clustering.dto.ClusteringEngineRequest;
import at.procon.dip.clustering.dto.ClusteringEngineResult;
import at.procon.dip.clustering.dto.SelectedEmbeddingRow;
import java.util.List;
public interface EmbeddingClusteringEngine {
ClusteringAlgorithm algorithm();
ClusteringEngineResult cluster(List<SelectedEmbeddingRow> items, ClusteringEngineRequest request);
}

View File

@ -0,0 +1,104 @@
package at.procon.dip.clustering.spi;
import at.procon.dip.clustering.ClusteringAlgorithm;
import at.procon.dip.clustering.dto.ClusteringEngineAssignment;
import at.procon.dip.clustering.dto.ClusteringEngineCluster;
import at.procon.dip.clustering.dto.ClusteringEngineRequest;
import at.procon.dip.clustering.dto.ClusteringEngineResult;
import at.procon.dip.clustering.dto.SelectedEmbeddingRow;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.math3.ml.clustering.CentroidCluster;
import org.apache.commons.math3.ml.clustering.DoublePoint;
import org.apache.commons.math3.ml.clustering.KMeansPlusPlusClusterer;
import org.springframework.stereotype.Component;
@Component
public class KMeansClusteringEngine implements EmbeddingClusteringEngine {
@Override
public ClusteringAlgorithm algorithm() {
return ClusteringAlgorithm.KMEANS;
}
@Override
public ClusteringEngineResult cluster(List<SelectedEmbeddingRow> items, ClusteringEngineRequest request) {
if (items == null || items.isEmpty()) {
throw new IllegalArgumentException("Selection contains no embeddings to cluster");
}
if (request == null) {
throw new IllegalArgumentException("Missing clustering request");
}
int k = request.requiredInt("k");
if (k <= 0) {
throw new IllegalArgumentException("KMeans requires k > 0");
}
if (k > items.size()) {
throw new IllegalArgumentException("KMeans k must be <= selected item count");
}
int maxIterations = request.intValue("maxIterations", 100);
List<IndexedPoint> points = new ArrayList<>(items.size());
for (int i = 0; i < items.size(); i++) {
points.add(new IndexedPoint(i, toDouble(items.get(i).embeddingVector())));
}
KMeansPlusPlusClusterer<IndexedPoint> clusterer = new KMeansPlusPlusClusterer<>(k, maxIterations);
List<CentroidCluster<IndexedPoint>> clusters = clusterer.cluster(points);
List<ClusteringEngineCluster> resultClusters = new ArrayList<>();
List<ClusteringEngineAssignment> assignments = new ArrayList<>();
for (int label = 0; label < clusters.size(); label++) {
CentroidCluster<IndexedPoint> cluster = clusters.get(label);
resultClusters.add(new ClusteringEngineCluster(label, cluster.getPoints().size(), false));
double[] centroid = cluster.getCenter().getPoint();
for (IndexedPoint point : cluster.getPoints()) {
SelectedEmbeddingRow item = items.get(point.index());
assignments.add(new ClusteringEngineAssignment(
item.embeddingId(),
item.documentId(),
item.representationId(),
label,
euclidean(point.getPoint(), centroid),
null,
false
));
}
}
return new ClusteringEngineResult(resultClusters, assignments, 0L);
}
private double[] toDouble(float[] values) {
double[] result = new double[values.length];
for (int i = 0; i < values.length; i++) {
result[i] = values[i];
}
return result;
}
private double euclidean(double[] a, double[] b) {
double sum = 0.0d;
for (int i = 0; i < a.length; i++) {
double d = a[i] - b[i];
sum += d * d;
}
return Math.sqrt(sum);
}
private static class IndexedPoint extends DoublePoint {
private final int index;
IndexedPoint(int index, double[] point) {
super(point);
this.index = index;
}
int index() {
return index;
}
}
}

View File

@ -0,0 +1,130 @@
package at.procon.dip.clustering.web;
import at.procon.dip.clustering.ClusterRunStatus;
import at.procon.dip.clustering.ClusteringAlgorithm;
import at.procon.dip.clustering.ClusteringExecutionBackend;
import at.procon.dip.clustering.dto.ClusterAssignmentViewResponse;
import at.procon.dip.clustering.dto.ClusterMembersResponse;
import at.procon.dip.clustering.dto.ClusterResponse;
import at.procon.dip.clustering.dto.ClusterRunResponse;
import at.procon.dip.clustering.dto.ClusterSetResponse;
import at.procon.dip.clustering.dto.CreateClusterRunRequest;
import at.procon.dip.clustering.dto.CreateClusterSetRequest;
import at.procon.dip.clustering.dto.EmbeddingSelectionSpec;
import at.procon.dip.clustering.dto.SelectionCountResponse;
import at.procon.dip.clustering.dto.UpdateClusterSetRequest;
import at.procon.dip.clustering.service.EmbeddingClusterAsyncExecutionService;
import at.procon.dip.clustering.service.EmbeddingClusterRunService;
import at.procon.dip.clustering.service.EmbeddingClusterSetService;
import at.procon.dip.clustering.service.EmbeddingSelectionService;
import at.procon.dip.domain.document.DocumentType;
import jakarta.validation.Valid;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/v1/dip/clustering")
@RequiredArgsConstructor
public class EmbeddingClusterController {
private final EmbeddingSelectionService selectionService;
private final EmbeddingClusterSetService clusterSetService;
private final EmbeddingClusterRunService runService;
private final EmbeddingClusterAsyncExecutionService asyncExecutionService;
@PostMapping("/selection/count")
public ResponseEntity<SelectionCountResponse> countSelection(@RequestBody EmbeddingSelectionSpec selection) {
return ResponseEntity.ok(new SelectionCountResponse(selectionService.count(selection)));
}
@PostMapping("/sets")
public ResponseEntity<ClusterSetResponse> createSet(@Valid @RequestBody CreateClusterSetRequest request) {
return ResponseEntity.ok(clusterSetService.create(request));
}
@PutMapping("/sets/{id}")
public ResponseEntity<ClusterSetResponse> updateSet(@PathVariable UUID id,
@Valid @RequestBody UpdateClusterSetRequest request) {
return ResponseEntity.ok(clusterSetService.update(id, request));
}
@GetMapping("/sets")
public ResponseEntity<List<ClusterSetResponse>> listSets(
@RequestParam(name = "active", required = false) Boolean active) {
return ResponseEntity.ok(clusterSetService.list(active));
}
@GetMapping("/sets/{id}")
public ResponseEntity<ClusterSetResponse> getSet(@PathVariable UUID id) {
return ResponseEntity.ok(clusterSetService.get(id));
}
@PostMapping("/runs")
public ResponseEntity<ClusterRunResponse> createRun(@Valid @RequestBody CreateClusterRunRequest request) {
return ResponseEntity.ok(runService.createRun(request));
}
@GetMapping("/runs")
public ResponseEntity<List<ClusterRunResponse>> listRuns(
@RequestParam(name = "status", required = false) ClusterRunStatus status,
@RequestParam(name = "algorithm", required = false) ClusteringAlgorithm algorithm,
@RequestParam(name = "executionBackend", required = false) ClusteringExecutionBackend executionBackend,
@RequestParam(name = "documentType", required = false) DocumentType documentType,
@RequestParam(name = "nameLike", required = false) String nameLike,
@RequestParam(name = "createdFrom", required = false)
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime createdFrom,
@RequestParam(name = "createdTo", required = false)
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime createdTo) {
return ResponseEntity.ok(runService.listRuns(
status, algorithm, executionBackend, documentType, nameLike, createdFrom, createdTo));
}
@PostMapping("/runs/{id}/start")
public ResponseEntity<ClusterRunResponse> startRun(@PathVariable UUID id) {
ClusterRunResponse queued = runService.queueRun(id);
asyncExecutionService.executeRunAsync(id);
return ResponseEntity.accepted().body(queued);
}
@PostMapping("/runs/{id}/cancel")
public ResponseEntity<ClusterRunResponse> cancelRun(@PathVariable UUID id) {
return ResponseEntity.ok(runService.requestCancellation(id));
}
@GetMapping("/runs/{id}")
public ResponseEntity<ClusterRunResponse> getRun(@PathVariable UUID id) {
return ResponseEntity.ok(runService.getRun(id));
}
@GetMapping("/runs/{id}/clusters")
public ResponseEntity<List<ClusterResponse>> listClusters(@PathVariable UUID id) {
return ResponseEntity.ok(runService.listClusters(id));
}
@GetMapping("/runs/{id}/assignments")
public ResponseEntity<List<ClusterAssignmentViewResponse>> listAssignments(
@PathVariable UUID id,
@RequestParam(name = "includeText", defaultValue = "false") boolean includeText) {
return ResponseEntity.ok(runService.listAssignments(id, includeText));
}
@GetMapping("/runs/{runId}/clusters/{clusterId}/members")
public ResponseEntity<List<ClusterMembersResponse>> listClusterMembers(
@PathVariable UUID runId,
@PathVariable UUID clusterId,
@RequestParam(name = "includeText", defaultValue = "false") boolean includeText) {
return ResponseEntity.ok(runService.listClusterMembers(runId, clusterId, includeText));
}
}

View File

@ -77,7 +77,7 @@ public class Document {
@Builder.Default @Builder.Default
private DocumentStatus status = DocumentStatus.RECEIVED; private DocumentStatus status = DocumentStatus.RECEIVED;
@Column(name = "title", length = 1000) @Column(name = "title", columnDefinition = "TEXT")
private String title; private String title;
@Column(name = "summary", columnDefinition = "TEXT") @Column(name = "summary", columnDefinition = "TEXT")

View File

@ -176,6 +176,16 @@ dip:
profile-key: disabled profile-key: disabled
enabled: false enabled: false
clustering:
python:
enabled: true
base-url: http://localhost:8001
cluster-path: /cluster
cluster-run-path: /cluster-run
request-mode: INLINE_VECTORS
connect-timeout: 30s
read-timeout: 30m
# Phase 4 generic ingestion configuration # Phase 4 generic ingestion configuration
ingestion: ingestion:
# Master switch for arbitrary document ingestion into the DOC model # Master switch for arbitrary document ingestion into the DOC model
@ -275,7 +285,7 @@ dip:
# ted packages download configuration # ted packages download configuration
ted-download: ted-download:
# Enable/disable automatic package download # Enable/disable automatic package download
enabled: true enabled: false
# Base URL for TED Daily Packages # Base URL for TED Daily Packages
base-url: https://ted.europa.eu/packages/daily/ base-url: https://ted.europa.eu/packages/daily/
# Download directory for tar.gz files # Download directory for tar.gz files
@ -304,6 +314,10 @@ dip:
leitstand: leitstand:
enabled: false enabled: false
startup-sync-enabled: false startup-sync-enabled: false
startup-selective-materialization-enabled: true
selective-materialization-person-dbk: 100920031023144811001000
selective-materialization-person-number:
selective-materialization-build-projection: true
create-canonical-time-entries: true create-canonical-time-entries: true
build-search-projection: true build-search-projection: true
build-representations: true build-representations: true

View File

@ -0,0 +1,107 @@
CREATE TABLE IF NOT EXISTS doc.doc_embedding_cluster_set (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
code VARCHAR(128) NOT NULL UNIQUE,
name VARCHAR(255) NOT NULL,
description TEXT,
selection_json JSONB NOT NULL,
active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS doc.doc_embedding_cluster_run (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
cluster_set_id UUID,
name VARCHAR(255) NOT NULL,
status VARCHAR(32) NOT NULL,
algorithm VARCHAR(64) NOT NULL,
algorithm_version VARCHAR(64),
selection_json JSONB NOT NULL,
parameters_json JSONB NOT NULL,
embedding_model_id UUID,
prefix_profile_id UUID,
document_type VARCHAR(64),
document_family VARCHAR(64),
representation_type VARCHAR(64),
builder_key VARCHAR(255),
item_count BIGINT,
cluster_count BIGINT,
noise_count BIGINT,
started_at TIMESTAMP WITH TIME ZONE,
finished_at TIMESTAMP WITH TIME ZONE,
error_message TEXT,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT fk_doc_cluster_run_set
FOREIGN KEY (cluster_set_id)
REFERENCES doc.doc_embedding_cluster_set(id),
CONSTRAINT fk_doc_cluster_run_model
FOREIGN KEY (embedding_model_id)
REFERENCES doc.doc_embedding_model(id),
CONSTRAINT fk_doc_cluster_run_prefix_profile
FOREIGN KEY (prefix_profile_id)
REFERENCES doc.doc_embedding_prefix_profile(id)
);
CREATE INDEX IF NOT EXISTS idx_doc_cluster_run_status
ON doc.doc_embedding_cluster_run(status);
CREATE INDEX IF NOT EXISTS idx_doc_cluster_run_algorithm
ON doc.doc_embedding_cluster_run(algorithm);
CREATE INDEX IF NOT EXISTS idx_doc_cluster_run_created_at
ON doc.doc_embedding_cluster_run(created_at);
CREATE TABLE IF NOT EXISTS doc.doc_embedding_cluster (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
cluster_run_id UUID NOT NULL,
cluster_label INTEGER NOT NULL,
display_name VARCHAR(255),
item_count BIGINT NOT NULL,
is_noise_cluster BOOLEAN NOT NULL DEFAULT FALSE,
summary_text TEXT,
top_terms_json JSONB,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT fk_doc_cluster_cluster_run
FOREIGN KEY (cluster_run_id)
REFERENCES doc.doc_embedding_cluster_run(id)
ON DELETE CASCADE,
CONSTRAINT uq_doc_cluster_run_label
UNIQUE (cluster_run_id, cluster_label)
);
CREATE INDEX IF NOT EXISTS idx_doc_cluster_cluster_run
ON doc.doc_embedding_cluster(cluster_run_id);
CREATE TABLE IF NOT EXISTS doc.doc_embedding_cluster_assignment (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
cluster_run_id UUID NOT NULL,
cluster_id UUID,
embedding_id UUID NOT NULL,
document_id UUID NOT NULL,
representation_id UUID NOT NULL,
cluster_label_raw INTEGER NOT NULL,
membership_score DOUBLE PRECISION,
distance_to_centroid DOUBLE PRECISION,
is_noise BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT fk_doc_cluster_assignment_run
FOREIGN KEY (cluster_run_id)
REFERENCES doc.doc_embedding_cluster_run(id)
ON DELETE CASCADE,
CONSTRAINT fk_doc_cluster_assignment_cluster
FOREIGN KEY (cluster_id)
REFERENCES doc.doc_embedding_cluster(id)
ON DELETE CASCADE,
CONSTRAINT fk_doc_cluster_assignment_embedding
FOREIGN KEY (embedding_id)
REFERENCES doc.doc_embedding(id),
CONSTRAINT uq_doc_cluster_run_embedding
UNIQUE (cluster_run_id, embedding_id)
);
CREATE INDEX IF NOT EXISTS idx_doc_cluster_assignment_run
ON doc.doc_embedding_cluster_assignment(cluster_run_id);
CREATE INDEX IF NOT EXISTS idx_doc_cluster_assignment_cluster
ON doc.doc_embedding_cluster_assignment(cluster_id);
CREATE INDEX IF NOT EXISTS idx_doc_cluster_assignment_document
ON doc.doc_embedding_cluster_assignment(cluster_run_id, document_id);
CREATE INDEX IF NOT EXISTS idx_doc_cluster_assignment_noise
ON doc.doc_embedding_cluster_assignment(cluster_run_id, is_noise);

View File

@ -0,0 +1,7 @@
ALTER TABLE doc.doc_embedding_cluster_run
ADD COLUMN IF NOT EXISTS execution_backend VARCHAR(64),
ADD COLUMN IF NOT EXISTS reduction_method VARCHAR(32),
ADD COLUMN IF NOT EXISTS reduction_dimensions INTEGER;
CREATE INDEX IF NOT EXISTS idx_doc_cluster_run_backend
ON doc.doc_embedding_cluster_run(execution_backend);

View File

@ -0,0 +1,52 @@
-- V31__doc_embedding_clustering_enum_constraints.sql
-- Updates check constraints for clustering run enums after adding new algorithms and statuses.
ALTER TABLE doc.doc_embedding_cluster_run
DROP CONSTRAINT IF EXISTS doc_embedding_cluster_run_algorithm_check;
ALTER TABLE doc.doc_embedding_cluster_run
ADD CONSTRAINT doc_embedding_cluster_run_algorithm_check
CHECK (algorithm IN (
'KMEANS',
'MINI_BATCH_KMEANS',
'DBSCAN',
'HDBSCAN',
'AGGLOMERATIVE'
));
ALTER TABLE doc.doc_embedding_cluster_run
DROP CONSTRAINT IF EXISTS doc_embedding_cluster_run_status_check;
ALTER TABLE doc.doc_embedding_cluster_run
ADD CONSTRAINT doc_embedding_cluster_run_status_check
CHECK (status IN (
'CREATED',
'QUEUED',
'RUNNING',
'CANCEL_REQUESTED',
'COMPLETED',
'FAILED',
'CANCELLED'
));
-- Optional hardening in case these columns were also created with check constraints.
ALTER TABLE doc.doc_embedding_cluster_run
DROP CONSTRAINT IF EXISTS doc_embedding_cluster_run_execution_backend_check;
ALTER TABLE doc.doc_embedding_cluster_run
ADD CONSTRAINT doc_embedding_cluster_run_execution_backend_check
CHECK (execution_backend IN (
'JAVA_LOCAL',
'PYTHON_REMOTE'
));
ALTER TABLE doc.doc_embedding_cluster_run
DROP CONSTRAINT IF EXISTS doc_embedding_cluster_run_reduction_method_check;
ALTER TABLE doc.doc_embedding_cluster_run
ADD CONSTRAINT doc_embedding_cluster_run_reduction_method_check
CHECK (reduction_method IN (
'NONE',
'PCA',
'UMAP'
));