295 lines
12 KiB
Java
295 lines
12 KiB
Java
package at.procon.eventhub.persistence;
|
|
|
|
import at.procon.eventhub.dto.DataPackageStatus;
|
|
import at.procon.eventhub.dto.DataPackageType;
|
|
import at.procon.eventhub.dto.EventHubPackageRequest;
|
|
import at.procon.eventhub.dto.ImportScopeDto;
|
|
import at.procon.eventhub.dto.SourceGroupRefDto;
|
|
import at.procon.eventhub.dto.SourcePackageRefDto;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import java.lang.reflect.Array;
|
|
import java.time.OffsetDateTime;
|
|
import java.time.temporal.TemporalAccessor;
|
|
import java.util.ArrayList;
|
|
import java.util.Date;
|
|
import java.util.LinkedHashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.UUID;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.stereotype.Repository;
|
|
import org.springframework.transaction.annotation.Propagation;
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
@Repository
|
|
public class DataPackageRepository {
|
|
|
|
private final JdbcTemplate jdbcTemplate;
|
|
private final ObjectMapper objectMapper;
|
|
|
|
public DataPackageRepository(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
|
|
this.jdbcTemplate = jdbcTemplate;
|
|
this.objectMapper = objectMapper;
|
|
}
|
|
|
|
public UUID createPackage(
|
|
int eventSourceId,
|
|
String packageKey,
|
|
EventHubPackageRequest packageInfo,
|
|
DataPackageType packageType,
|
|
OffsetDateTime occurredFrom,
|
|
OffsetDateTime occurredTo,
|
|
Map<String, Object> metadata
|
|
) {
|
|
return insertPackage(
|
|
eventSourceId,
|
|
null,
|
|
packageKey,
|
|
packageInfo,
|
|
packageType,
|
|
DataPackageStatus.IMPORTING,
|
|
occurredFrom,
|
|
occurredTo,
|
|
null,
|
|
null,
|
|
null,
|
|
null,
|
|
null,
|
|
null,
|
|
null,
|
|
null,
|
|
metadata
|
|
);
|
|
}
|
|
|
|
public UUID createPlannedExtractionPackage(
|
|
UUID importRunId,
|
|
int eventSourceId,
|
|
String packageKey,
|
|
EventHubPackageRequest packageInfo,
|
|
String extractionCode,
|
|
String extractionSourceKind,
|
|
String entityAxis,
|
|
OffsetDateTime chunkFrom,
|
|
OffsetDateTime chunkTo,
|
|
int batchNo,
|
|
Map<String, Object> metadata
|
|
) {
|
|
return insertPackage(
|
|
eventSourceId,
|
|
importRunId,
|
|
packageKey,
|
|
packageInfo,
|
|
DataPackageType.DB_EXTRACT,
|
|
DataPackageStatus.PLANNED,
|
|
chunkFrom,
|
|
chunkTo,
|
|
extractionCode,
|
|
extractionSourceKind,
|
|
entityAxis,
|
|
batchNo,
|
|
chunkFrom,
|
|
chunkTo,
|
|
null,
|
|
null,
|
|
metadata
|
|
);
|
|
}
|
|
|
|
private UUID insertPackage(
|
|
int eventSourceId,
|
|
UUID importRunId,
|
|
String packageKey,
|
|
EventHubPackageRequest packageInfo,
|
|
DataPackageType packageType,
|
|
DataPackageStatus status,
|
|
OffsetDateTime occurredFrom,
|
|
OffsetDateTime occurredTo,
|
|
String extractionCode,
|
|
String extractionSourceKind,
|
|
String entityAxis,
|
|
Integer batchNo,
|
|
OffsetDateTime chunkFrom,
|
|
OffsetDateTime chunkTo,
|
|
SourcePackageRefDto sourcePackageRef,
|
|
Integer eventCount,
|
|
Map<String, Object> metadata
|
|
) {
|
|
UUID id = UUID.randomUUID();
|
|
SourceGroupRefDto sourceGroup = packageInfo == null ? null : packageInfo.sourceGroup();
|
|
ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope();
|
|
SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation();
|
|
|
|
return jdbcTemplate.query(
|
|
con -> {
|
|
var ps = con.prepareStatement("""
|
|
insert into eventhub.data_package(
|
|
id, event_source_id, import_run_id, tenant_key, package_key, package_type, status,
|
|
source_group_type, source_group_entity_id, source_group_code, source_group_name,
|
|
import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name,
|
|
include_children, occurred_from, occurred_to,
|
|
event_family, business_date, external_package_id,
|
|
extraction_code, extraction_source_kind, entity_axis, batch_no, chunk_from, chunk_to,
|
|
source_package_kind, source_package_id, source_package_entity_id,
|
|
source_package_period_from, source_package_period_to, source_package_imported_at,
|
|
received_at, event_count, metadata
|
|
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), ?, ?::jsonb)
|
|
on conflict (tenant_key, event_source_id, package_key) do update
|
|
set metadata = excluded.metadata
|
|
returning id
|
|
""");
|
|
ps.setObject(1, id);
|
|
ps.setInt(2, eventSourceId);
|
|
ps.setObject(3, importRunId);
|
|
ps.setString(4, packageInfo == null ? "default" : packageInfo.tenantKey());
|
|
ps.setString(5, packageKey);
|
|
ps.setString(6, packageType.name());
|
|
ps.setString(7, status.name());
|
|
ps.setString(8, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name());
|
|
ps.setString(9, sourceGroup == null ? null : sourceGroup.sourceEntityId());
|
|
ps.setString(10, sourceGroup == null ? null : sourceGroup.code());
|
|
ps.setString(11, sourceGroup == null ? null : sourceGroup.name());
|
|
ps.setString(12, importScope == null || importScope.type() == null ? null : importScope.type().name());
|
|
ps.setString(13, rootOrg == null ? null : rootOrg.sourceEntityId());
|
|
ps.setString(14, rootOrg == null ? null : rootOrg.code());
|
|
ps.setString(15, rootOrg == null ? null : rootOrg.name());
|
|
ps.setBoolean(16, importScope != null && importScope.includeChildren());
|
|
ps.setObject(17, occurredFrom);
|
|
ps.setObject(18, occurredTo);
|
|
ps.setString(19, packageInfo == null ? null : packageInfo.eventFamily());
|
|
ps.setObject(20, packageInfo == null ? null : packageInfo.businessDate());
|
|
ps.setString(21, packageInfo == null ? packageKey : packageInfo.externalPackageId());
|
|
ps.setString(22, extractionCode);
|
|
ps.setString(23, extractionSourceKind);
|
|
ps.setString(24, entityAxis);
|
|
ps.setObject(25, batchNo);
|
|
ps.setObject(26, chunkFrom);
|
|
ps.setObject(27, chunkTo);
|
|
ps.setString(28, sourcePackageRef == null ? null : sourcePackageRef.packageKind());
|
|
ps.setString(29, sourcePackageRef == null ? null : sourcePackageRef.sourcePackageId());
|
|
ps.setString(30, sourcePackageRef == null ? null : sourcePackageRef.sourceEntityId());
|
|
ps.setObject(31, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodFrom());
|
|
ps.setObject(32, sourcePackageRef == null ? null : sourcePackageRef.packagePeriodTo());
|
|
ps.setObject(33, sourcePackageRef == null ? null : sourcePackageRef.importedIntoSourceAt());
|
|
ps.setInt(34, eventCount == null ? 0 : eventCount);
|
|
ps.setString(35, toJson(metadata));
|
|
return ps;
|
|
},
|
|
rs -> {
|
|
if (!rs.next()) {
|
|
throw new IllegalStateException("Could not create or resolve data package " + packageKey);
|
|
}
|
|
return (UUID) rs.getObject(1);
|
|
}
|
|
);
|
|
}
|
|
|
|
public void markImporting(UUID packageId) {
|
|
jdbcTemplate.update(
|
|
"""
|
|
update eventhub.data_package
|
|
set status = ?
|
|
where id = ?
|
|
""",
|
|
DataPackageStatus.IMPORTING.name(),
|
|
packageId
|
|
);
|
|
}
|
|
|
|
public void markImported(UUID packageId, int insertedCount) {
|
|
jdbcTemplate.update(
|
|
"""
|
|
update eventhub.data_package
|
|
set status = ?, event_count = ?, completed_at = now()
|
|
where id = ?
|
|
""",
|
|
insertedCount == 0 ? DataPackageStatus.EMPTY.name() : DataPackageStatus.IMPORTED.name(),
|
|
insertedCount,
|
|
packageId
|
|
);
|
|
}
|
|
|
|
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
|
public void markFailed(UUID packageId, String errorMessage) {
|
|
jdbcTemplate.update(
|
|
"""
|
|
update eventhub.data_package
|
|
set status = ?, error_message = ?, completed_at = now()
|
|
where id = ?
|
|
""",
|
|
DataPackageStatus.FAILED.name(),
|
|
errorMessage,
|
|
packageId
|
|
);
|
|
}
|
|
|
|
private String toJson(Map<String, Object> value) {
|
|
try {
|
|
return objectMapper.writeValueAsString(normalizeMetadataMap(value));
|
|
} catch (JsonProcessingException e) {
|
|
throw new IllegalArgumentException("Cannot serialize package metadata", e);
|
|
}
|
|
}
|
|
|
|
private Map<String, Object> normalizeMetadataMap(Map<String, Object> value) {
|
|
if (value == null || value.isEmpty()) {
|
|
return Map.of();
|
|
}
|
|
Map<String, Object> normalized = new LinkedHashMap<>();
|
|
for (Map.Entry<String, Object> entry : value.entrySet()) {
|
|
normalized.put(entry.getKey(), normalizeMetadataValue(entry.getValue()));
|
|
}
|
|
return normalized;
|
|
}
|
|
|
|
private Object normalizeMetadataValue(Object value) {
|
|
if (value == null
|
|
|| value instanceof String
|
|
|| value instanceof Number
|
|
|| value instanceof Boolean
|
|
|| value instanceof JsonNode) {
|
|
return value;
|
|
}
|
|
if (value instanceof CharSequence charSequence) {
|
|
return charSequence.toString();
|
|
}
|
|
if (value instanceof Enum<?> enumValue) {
|
|
return enumValue.name();
|
|
}
|
|
if (value instanceof UUID uuid) {
|
|
return uuid.toString();
|
|
}
|
|
if (value instanceof TemporalAccessor temporalValue) {
|
|
return temporalValue.toString();
|
|
}
|
|
if (value instanceof Date dateValue) {
|
|
return dateValue.toInstant().toString();
|
|
}
|
|
if (value instanceof Map<?, ?> mapValue) {
|
|
Map<String, Object> nested = new LinkedHashMap<>();
|
|
for (Map.Entry<?, ?> entry : mapValue.entrySet()) {
|
|
nested.put(String.valueOf(entry.getKey()), normalizeMetadataValue(entry.getValue()));
|
|
}
|
|
return nested;
|
|
}
|
|
if (value instanceof Iterable<?> iterable) {
|
|
List<Object> nested = new ArrayList<>();
|
|
for (Object item : iterable) {
|
|
nested.add(normalizeMetadataValue(item));
|
|
}
|
|
return nested;
|
|
}
|
|
if (value.getClass().isArray()) {
|
|
int length = Array.getLength(value);
|
|
List<Object> nested = new ArrayList<>(length);
|
|
for (int i = 0; i < length; i++) {
|
|
nested.add(normalizeMetadataValue(Array.get(value, i)));
|
|
}
|
|
return nested;
|
|
}
|
|
return value.toString();
|
|
}
|
|
}
|