DIP/python/dip-clustering-service/app/cluster_service.py

312 lines
11 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import numpy as np
from sklearn.cluster import AgglomerativeClustering, DBSCAN, KMeans, MiniBatchKMeans
from sklearn.decomposition import PCA
from sklearn.preprocessing import normalize
try:
import hdbscan
except Exception: # pragma: no cover - runtime dependency guard
hdbscan = None
try:
import umap
except Exception: # pragma: no cover - runtime dependency guard
umap = None
from .models import (
ClusteringAlgorithm,
PythonAssignment,
PythonCluster,
PythonClusteringItem,
PythonClusteringRequest,
PythonClusteringResponse,
ReductionMethod,
RunMetadata,
)
class ClusteringServiceError(ValueError):
    """Error raised by this module for requests it cannot serve.

    Used for empty requests, missing or malformed parameters, unsupported
    algorithms and optional dependencies that are not installed. Because it
    derives from ``ValueError``, handlers for ``ValueError`` catch it as well.
    """
@dataclass
class PreparedData:
    """Vectors and their source items bundled for one clustering pass."""

    # Vector matrix before any dimensionality reduction is applied.
    original: np.ndarray
    # Vector matrix that is handed to the clustering algorithm.
    transformed: np.ndarray
    # Source items; consumers index them in step with the vector rows.
    items: list[PythonClusteringItem]
def cluster_embeddings(request: PythonClusteringRequest) -> PythonClusteringResponse:
    """Cluster the items carried inside a single request object.

    Reads the algorithm, parameters, reduction settings and items from
    ``request`` and delegates to ``cluster_items``. A falsy ``parameters``
    value is replaced by an empty dict.
    """
    algorithm = request.algorithm
    parameters = request.parameters or {}
    reduction_method = request.reductionMethod
    reduction_dimensions = request.reductionDimensions
    return cluster_items(
        algorithm=algorithm,
        parameters=parameters,
        reduction_method=reduction_method,
        reduction_dimensions=reduction_dimensions,
        items=request.items,
    )
def cluster_run(metadata: RunMetadata, items: list[PythonClusteringItem]) -> PythonClusteringResponse:
    """Cluster ``items`` using the settings stored in ``metadata``.

    The algorithm, parameters and reduction settings come from ``metadata``;
    the items are supplied separately. A falsy ``parameters`` value is
    replaced by an empty dict before delegating to ``cluster_items``.
    """
    settings = {
        "algorithm": metadata.algorithm,
        "parameters": metadata.parameters or {},
        "reduction_method": metadata.reductionMethod,
        "reduction_dimensions": metadata.reductionDimensions,
        "items": items,
    }
    return cluster_items(**settings)
def cluster_items(
    algorithm: ClusteringAlgorithm,
    parameters: dict[str, Any],
    reduction_method: ReductionMethod,
    reduction_dimensions: int | None,
    items: list[PythonClusteringItem],
) -> PythonClusteringResponse:
    """Run the full pipeline: prepare vectors, cluster them, build the response.

    Args:
        algorithm: Clustering algorithm to run.
        parameters: Option dict shared by the preparation and clustering steps.
        reduction_method: Dimensionality reduction to apply before clustering.
        reduction_dimensions: Target dimensionality for the reduction, if any.
        items: Items whose vectors are clustered.

    Raises:
        ClusteringServiceError: If ``items`` is empty, or if a later step
            rejects the request.
    """
    if not items:
        raise ClusteringServiceError("Request contains no items")

    prepared = _prepare_data(items, parameters, reduction_method, reduction_dimensions)
    # Clustering runs on the (possibly reduced) matrix, not the original one.
    outcome = _run_algorithm(algorithm, prepared.transformed, parameters)
    labels, membership_scores = outcome
    return _build_response(prepared, labels, membership_scores)
def _prepare_data(
    items: list[PythonClusteringItem],
    parameters: dict[str, Any],
    reduction_method: ReductionMethod,
    reduction_dimensions: int | None,
) -> PreparedData:
    """Build the float32 vector matrix, optionally normalized and reduced.

    Args:
        items: Items whose ``vector`` attributes form the rows of the matrix.
        parameters: Options read here: ``normalizeVectors`` (default True),
            ``randomState`` (default 42) and, for UMAP, ``reductionMetric``
            (default "cosine"), ``umapNeighbors`` (default 15) and
            ``umapMinDist`` (default 0.0).
        reduction_method: PCA or UMAP trigger a reduction; any other value
            leaves the vectors as they are.
        reduction_dimensions: Target dimensionality; required for PCA and UMAP.

    Returns:
        ``PreparedData`` whose ``original`` holds the (optionally normalized)
        vectors and whose ``transformed`` holds the float32 matrix to cluster.

    Raises:
        ClusteringServiceError: If the vectors do not form a non-empty 2D
            numeric array, if the reduction settings are missing or out of
            range, or if UMAP is requested but not installed.
    """
    rows = [item.vector for item in items]
    try:
        vectors = np.asarray(rows, dtype=np.float32)
    except (TypeError, ValueError) as exc:
        # Ragged or non-numeric rows make NumPy raise before the shape check
        # below can run; surface them as the same service-level error.
        raise ClusteringServiceError("Vectors must form a non-empty 2D array") from exc
    # A (n, 0) matrix is 2D but carries no features, so it counts as empty too.
    if vectors.ndim != 2 or vectors.shape[0] == 0 or vectors.shape[1] == 0:
        raise ClusteringServiceError("Vectors must form a non-empty 2D array")

    if _bool_param(parameters, "normalizeVectors", True):
        vectors = normalize(vectors, norm="l2")

    transformed = vectors
    if reduction_method == ReductionMethod.PCA:
        target_dims = reduction_dimensions
        if target_dims is None:
            raise ClusteringServiceError("PCA reduction requires reductionDimensions")
        # PCA cannot produce more components than min(samples, features).
        max_components = min(transformed.shape[0], transformed.shape[1])
        if target_dims <= 0 or target_dims > max_components:
            raise ClusteringServiceError(
                f"PCA reductionDimensions must be between 1 and {max_components}"
            )
        pca = PCA(
            n_components=target_dims,
            random_state=_int_param(parameters, "randomState", 42),
        )
        transformed = pca.fit_transform(transformed)
    elif reduction_method == ReductionMethod.UMAP:
        target_dims = reduction_dimensions
        if target_dims is None:
            raise ClusteringServiceError("UMAP reduction requires reductionDimensions")
        if umap is None:
            raise ClusteringServiceError("UMAP reduction requested but umap-learn is not installed")
        # Mirror the PCA branch: reject non-positive targets here instead of
        # letting the reducer fail with its own error.
        if target_dims <= 0:
            raise ClusteringServiceError("UMAP reductionDimensions must be at least 1")
        reducer = umap.UMAP(
            n_components=target_dims,
            metric=_str_param(parameters, "reductionMetric", "cosine"),
            n_neighbors=_int_param(parameters, "umapNeighbors", 15),
            min_dist=_float_param(parameters, "umapMinDist", 0.0),
            random_state=_int_param(parameters, "randomState", 42),
        )
        transformed = reducer.fit_transform(transformed)

    return PreparedData(
        original=vectors,
        transformed=np.asarray(transformed, dtype=np.float32),
        items=items,
    )
def _run_algorithm(
    algorithm: ClusteringAlgorithm,
    vectors: np.ndarray,
    parameters: dict[str, Any],
) -> tuple[np.ndarray, np.ndarray | None]:
    """Fit the requested clustering algorithm and return its labels.

    Args:
        algorithm: Which clustering algorithm to run.
        vectors: Matrix to cluster; its first axis is counted as the number
            of samples when validating ``k``.
        parameters: Algorithm options keyed by their request names. ``k`` is
            required for the k-means variants and agglomerative clustering,
            ``eps`` for DBSCAN; other keys fall back to the defaults below.

    Returns:
        ``(labels, membership_scores)``. ``labels`` is an int32 array.
        ``membership_scores`` is a float32 copy of the HDBSCAN model's
        ``probabilities_`` when that attribute exists, and ``None`` for
        every other algorithm.

    Raises:
        ClusteringServiceError: If a required parameter is missing or
            malformed, if ``k`` is outside ``1..number of samples``, if
            HDBSCAN is requested but not installed, or if the algorithm is
            not supported.
    """
    sample_count = int(vectors.shape[0])

    if algorithm == ClusteringAlgorithm.KMEANS:
        k = _bounded_cluster_count(parameters, sample_count)
        model = KMeans(
            n_clusters=k,
            random_state=_int_param(parameters, "randomState", 42),
            n_init=_int_param(parameters, "nInit", 10),
            max_iter=_int_param(parameters, "maxIter", 300),
        )
        labels = model.fit_predict(vectors)
        return np.asarray(labels, dtype=np.int32), None

    if algorithm == ClusteringAlgorithm.MINI_BATCH_KMEANS:
        k = _bounded_cluster_count(parameters, sample_count)
        # Default batch size scales with k but stays within [256, 4096].
        batch_size = _int_param(parameters, "batchSize", min(max(k * 16, 256), 4096))
        model = MiniBatchKMeans(
            n_clusters=k,
            random_state=_int_param(parameters, "randomState", 42),
            n_init=_int_param(parameters, "nInit", 10),
            max_iter=_int_param(parameters, "maxIter", 300),
            batch_size=batch_size,
        )
        labels = model.fit_predict(vectors)
        return np.asarray(labels, dtype=np.int32), None

    if algorithm == ClusteringAlgorithm.DBSCAN:
        eps = _required_float_param(parameters, "eps")
        model = DBSCAN(
            eps=eps,
            min_samples=_int_param(parameters, "minSamples", 5),
            metric=_str_param(parameters, "metric", "euclidean"),
            algorithm=_str_param(parameters, "algorithm", "auto"),
            n_jobs=_int_param(parameters, "nJobs", -1),
        )
        labels = model.fit_predict(vectors)
        return np.asarray(labels, dtype=np.int32), None

    if algorithm == ClusteringAlgorithm.HDBSCAN:
        if hdbscan is None:
            raise ClusteringServiceError("HDBSCAN requested but hdbscan is not installed")
        model = hdbscan.HDBSCAN(
            min_cluster_size=_int_param(parameters, "minClusterSize", 10),
            min_samples=_nullable_int_param(parameters, "minSamples"),
            metric=_str_param(parameters, "metric", "euclidean"),
            cluster_selection_method=_str_param(parameters, "clusterSelectionMethod", "eom"),
        )
        labels = model.fit_predict(vectors)
        probabilities = getattr(model, "probabilities_", None)
        membership_scores = (
            None if probabilities is None else np.asarray(probabilities, dtype=np.float32)
        )
        return np.asarray(labels, dtype=np.int32), membership_scores

    if algorithm == ClusteringAlgorithm.AGGLOMERATIVE:
        k = _bounded_cluster_count(parameters, sample_count)
        linkage = _str_param(parameters, "linkage", "average")
        metric = _str_param(parameters, "metric", "euclidean")
        if linkage == "ward":
            # Ward linkage is always run with euclidean, overriding any
            # requested metric.
            metric = "euclidean"
        model = AgglomerativeClustering(
            n_clusters=k,
            linkage=linkage,
            metric=metric,
            compute_distances=_bool_param(parameters, "computeDistances", False),
        )
        labels = model.fit_predict(vectors)
        return np.asarray(labels, dtype=np.int32), None

    raise ClusteringServiceError(f"Unsupported algorithm: {algorithm}")


def _bounded_cluster_count(parameters: dict[str, Any], sample_count: int) -> int:
    """Read the required ``k`` parameter and check it fits the sample count."""
    k = _required_int_param(parameters, "k")
    # Reject impossible cluster counts up front so callers get a service
    # error instead of the estimator's own exception.
    if k < 1 or k > sample_count:
        raise ClusteringServiceError(f"Parameter k must be between 1 and {sample_count}")
    return k
def _build_response(
    prepared: PreparedData,
    labels: np.ndarray,
    membership_scores: np.ndarray | None,
) -> PythonClusteringResponse:
    """Turn raw labels into cluster summaries and per-item assignments.

    Label ``-1`` is treated as noise: it gets a cluster entry flagged as
    noise but no centroid, and its items carry no centroid distance.
    Centroids are the mean of the transformed vectors in each cluster, and
    distances are Euclidean norms in that same transformed space.

    Args:
        prepared: Items plus the transformed matrix the labels refer to.
        labels: One cluster label per item, in item order.
        membership_scores: Optional per-item scores, in item order.
    """
    clusters: list[PythonCluster] = []
    centroids: dict[int, np.ndarray] = {}
    # np.unique already yields ascending values; sorting keeps that explicit.
    for cluster_label in sorted(int(value) for value in np.unique(labels)):
        members = labels == cluster_label
        is_noise_cluster = cluster_label == -1
        clusters.append(
            PythonCluster(
                clusterLabel=cluster_label,
                itemCount=int(members.sum()),
                noiseCluster=is_noise_cluster,
            )
        )
        if is_noise_cluster:
            continue
        centroids[cluster_label] = prepared.transformed[members].mean(axis=0)

    has_scores = membership_scores is not None
    assignments: list[PythonAssignment] = []
    for position, item in enumerate(prepared.items):
        assigned = int(labels[position])
        is_noise = assigned == -1
        if is_noise:
            centroid_distance = None
        else:
            offset = prepared.transformed[position] - centroids[assigned]
            centroid_distance = float(np.linalg.norm(offset))
        score = float(membership_scores[position]) if has_scores else None
        assignments.append(
            PythonAssignment(
                embeddingId=item.embeddingId,
                clusterLabel=assigned,
                distanceToCentroid=centroid_distance,
                membershipScore=score,
                noise=is_noise,
            )
        )

    total_noise = int((labels == -1).sum())
    return PythonClusteringResponse(
        clusters=clusters,
        assignments=assignments,
        noiseCount=total_noise,
    )
def _required_int_param(parameters: dict[str, Any], key: str) -> int:
    """Return ``parameters[key]`` as an int.

    Raises:
        ClusteringServiceError: If the key is absent, maps to ``None``, or
            cannot be converted to an int.
    """
    raw = parameters.get(key)
    if raw is None:
        raise ClusteringServiceError(f"Missing required parameter: {key}")
    return _coerce_int(raw, key)
def _required_float_param(parameters: dict[str, Any], key: str) -> float:
    """Return ``parameters[key]`` as a float.

    Raises:
        ClusteringServiceError: If the key is absent, maps to ``None``, or
            cannot be converted to a float.
    """
    raw = parameters.get(key)
    if raw is None:
        raise ClusteringServiceError(f"Missing required parameter: {key}")
    return _coerce_float(raw, key)
def _nullable_int_param(parameters: dict[str, Any], key: str) -> int | None:
if key not in parameters or parameters[key] is None:
return None
return _coerce_int(parameters[key], key)
def _int_param(parameters: dict[str, Any], key: str, default: int) -> int:
if key not in parameters or parameters[key] is None:
return default
return _coerce_int(parameters[key], key)
def _float_param(parameters: dict[str, Any], key: str, default: float) -> float:
if key not in parameters or parameters[key] is None:
return default
return _coerce_float(parameters[key], key)
def _bool_param(parameters: dict[str, Any], key: str, default: bool) -> bool:
if key not in parameters or parameters[key] is None:
return default
value = parameters[key]
if isinstance(value, bool):
return value
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"true", "1", "yes", "y"}:
return True
if normalized in {"false", "0", "no", "n"}:
return False
raise ClusteringServiceError(f"Parameter {key} must be boolean-compatible")
def _str_param(parameters: dict[str, Any], key: str, default: str) -> str:
if key not in parameters or parameters[key] is None:
return default
return str(parameters[key])
def _coerce_int(value: Any, key: str) -> int:
if isinstance(value, bool):
raise ClusteringServiceError(f"Parameter {key} must be integer-compatible")
try:
return int(value)
except (TypeError, ValueError) as exc:
raise ClusteringServiceError(f"Parameter {key} must be integer-compatible") from exc
def _coerce_float(value: Any, key: str) -> float:
if isinstance(value, bool):
raise ClusteringServiceError(f"Parameter {key} must be float-compatible")
try:
return float(value)
except (TypeError, ValueError) as exc:
raise ClusteringServiceError(f"Parameter {key} must be float-compatible") from exc