Refine runtime event mixing compatibility
This commit is contained in:
parent
e24df88736
commit
74d479454a
|
|
@ -1,35 +1,42 @@
|
|||
# Driver-working-time Esper contract refactoring
|
||||
# Cross-representation tachograph event mixing fix
|
||||
|
||||
This patch removes tachograph-prefixed Esper contract names from the active source-neutral driver-working-time pipeline without changing processing behavior.
|
||||
## Problem
|
||||
|
||||
## New canonical Esper contracts
|
||||
When a runtime request combined `TACHOGRAPH_FILE_SESSION` and `TACHOGRAPH_DB`, CARD and VU observations of the same tachograph fact were often not fused. The old compatible keys embedded representation-specific values such as tenant metadata, coordinate string scale, country representation, region defaults, vehicle completeness and interval metadata.
|
||||
|
||||
- `DriverWorkingTimeActivityPointInputEvent`
|
||||
- `DriverWorkingTimeVehicleUsagePointInputEvent`
|
||||
- `DriverWorkingTimeActivityIntervalInputEvent`
|
||||
- `DriverWorkingTimeVehicleUsageIntervalInputEvent`
|
||||
- `DriverWorkingTimeSupportEvidenceInputEvent`
|
||||
- `DriverWorkingTimeProjectionFinalizeEvent`
|
||||
- `DriverWorkingTimeVehicleUsageIntervalInputWindow`
|
||||
Examples of equivalent values that produced different keys:
|
||||
|
||||
`DriverWorkingTimeEsperContractNames` centralizes these names for Java-side registration, event sending, cleanup, and future common modules.
|
||||
- country `13` versus `D`;
|
||||
- region `0` versus `null`;
|
||||
- longitude `9.3883333333333336` versus `9.388333333333334`;
|
||||
- file-session package tenant `default` versus the DB tenant;
|
||||
- CARD events without VIN versus VU events with VIN.
|
||||
|
||||
## Updated active common components
|
||||
## Changes
|
||||
|
||||
- `DriverWorkingTimeReusableProjectionBuilder`
|
||||
- `driver-working-time-derived-projections.epl`
|
||||
- `runtime-driver-event-interval-preprocessor.epl`
|
||||
- runtime cleanup query for the vehicle-usage input window
|
||||
- contract and cleanup regression tests
|
||||
- runtime-processing documentation
|
||||
- Compatible activity and support keys now contain only stable candidate identity:
|
||||
- driver;
|
||||
- domain;
|
||||
- event type;
|
||||
- semantic lifecycle;
|
||||
- exact event timestamp.
|
||||
- Exact timestamp behavior is unchanged.
|
||||
- Added `RuntimeEventEvidenceCompatibilityMatcher` to validate grouped candidates semantically.
|
||||
- Support compatibility normalizes:
|
||||
- tachograph nation numeric/alpha forms;
|
||||
- region `0`/blank/null;
|
||||
- coordinate decimal scale with a `1e-9` serialization tolerance;
|
||||
- registration formatting;
|
||||
- optional VIN, odometer and operation data.
|
||||
- Missing optional data is enrichable, while conflicting meaningful values prevent fusion.
|
||||
- Activity compatibility allows source-specific optional metadata differences while still checking tenant, vehicle/registration and card slot compatibility.
|
||||
- Mixing now evaluates compatibility per primary/secondary pair instead of suppressing every secondary in a broad group.
|
||||
- Internal mixing state now tracks events by object identity and uses the UUID before `externalSourceEventId`, avoiding collisions from repeated source-side IDs such as `CARDPLACE-1`.
|
||||
|
||||
## Compatibility
|
||||
## Tests
|
||||
|
||||
The tachograph-specific legacy resources and file-session compatibility code are intentionally not renamed. They remain isolated compatibility artifacts. No public request/response DTO, module key, plan key, or business rule is changed.
|
||||
Added regression coverage for:
|
||||
|
||||
## Validation
|
||||
|
||||
- Verified that active common Java/EPL files contain no legacy tachograph input-contract names.
|
||||
- Verified that all public named windows still have cleanup coverage.
|
||||
- Compiled the new contract-name class with `javac`.
|
||||
- Maven tests were not run because Maven and a Maven wrapper are unavailable in the environment.
|
||||
- file-session `CARD_PLACE` versus DB `VU_PLACE` with `13`/`D`, `0`/null and decimal-scale differences;
|
||||
- file-session CARD activity versus DB VU activity;
|
||||
- meaningful coordinate conflicts remaining separate.
|
||||
|
|
|
|||
|
|
@ -3,10 +3,8 @@ package at.procon.eventhub.processing.eventprocessing.mixing;
|
|||
import at.procon.eventhub.dto.EventDomain;
|
||||
import at.procon.eventhub.dto.EventHubEventDto;
|
||||
import at.procon.eventhub.dto.EventLifecycle;
|
||||
import at.procon.eventhub.dto.GeoPointDto;
|
||||
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
|
||||
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
|
@ -84,8 +82,8 @@ public class RuntimeEventDescriptorFactory {
|
|||
return "<null>";
|
||||
}
|
||||
return firstNonBlank(
|
||||
event.externalSourceEventId(),
|
||||
event.eventId() == null ? null : event.eventId().toString(),
|
||||
event.externalSourceEventId(),
|
||||
RuntimeEventIdentityResolver.canonicalEventKey(event)
|
||||
);
|
||||
}
|
||||
|
|
@ -100,78 +98,32 @@ public class RuntimeEventDescriptorFactory {
|
|||
}
|
||||
|
||||
private String compatibleActivityKey(EventHubEventDto event) {
|
||||
JsonNode raw = rawPayload(event);
|
||||
return String.join("|",
|
||||
"ACTIVITY_COMPATIBLE",
|
||||
nullToEmpty(event == null || event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
|
||||
nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()),
|
||||
nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()),
|
||||
nullToEmpty(event == null || event.lifecycle() == null ? null : event.lifecycle().name()),
|
||||
normalizeTime(event == null ? null : event.occurredAt()),
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
|
||||
nullToEmpty(firstNonBlank(text(raw, "startedAt"), text(raw, "intervalStartedAt"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "endedAt"), text(raw, "intervalEndedAt"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "slot"), text(raw, "cardSlot"))),
|
||||
nullToEmpty(text(raw, "cardStatus")),
|
||||
nullToEmpty(text(raw, "drivingStatus"))
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event))
|
||||
);
|
||||
}
|
||||
|
||||
private String compatibleSupportEvidenceKey(EventHubEventDto event) {
|
||||
JsonNode raw = rawPayload(event);
|
||||
GeoPointDto position = event == null ? null : event.position();
|
||||
return String.join("|",
|
||||
"SUPPORT_COMPATIBLE",
|
||||
nullToEmpty(event == null || event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
|
||||
nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()),
|
||||
nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()),
|
||||
nullToEmpty(semanticSupportLifecycle(event)),
|
||||
normalizeTime(event == null ? null : event.occurredAt()),
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
|
||||
nullToEmpty(firstNonBlank(text(raw, "latitude"), position == null || position.latitude() == null ? null : position.latitude().toPlainString())),
|
||||
nullToEmpty(firstNonBlank(text(raw, "longitude"), position == null || position.longitude() == null ? null : position.longitude().toPlainString())),
|
||||
nullToEmpty(firstNonBlank(text(raw, "odometerM"), event == null || event.odometerM() == null ? null : String.valueOf(event.odometerM()))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "country"), detailText(event, "country"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "region"), detailText(event, "region"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "countryFrom"), detailText(event, "countryFrom"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "countryTo"), detailText(event, "countryTo"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "operation"), detailText(event, "operation")))
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event))
|
||||
);
|
||||
}
|
||||
|
||||
private String semanticSupportLifecycle(EventHubEventDto event) {
|
||||
return tachographSemantics.semanticLifecycle(event);
|
||||
}
|
||||
|
||||
private JsonNode rawPayload(EventHubEventDto event) {
|
||||
return RuntimeEntityReferenceResolver.rawPayload(event);
|
||||
}
|
||||
|
||||
private String detailText(EventHubEventDto event, String field) {
|
||||
if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null || field == null) {
|
||||
return null;
|
||||
}
|
||||
JsonNode value = event.eventDetails().attributes().get(field);
|
||||
if (value == null || value.isNull()) {
|
||||
return null;
|
||||
}
|
||||
String text = value.asText(null);
|
||||
return text == null || text.isBlank() ? null : text.trim();
|
||||
}
|
||||
|
||||
private String text(JsonNode node, String field) {
|
||||
if (node == null || field == null) {
|
||||
return null;
|
||||
}
|
||||
JsonNode value = node.get(field);
|
||||
if (value == null || value.isNull()) {
|
||||
return null;
|
||||
}
|
||||
String text = value.asText(null);
|
||||
return text == null || text.isBlank() ? null : text.trim();
|
||||
}
|
||||
}
|
||||
|
||||
private String firstNonBlank(String... values) {
|
||||
if (values == null) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,263 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.mixing;
|
||||
|
||||
import at.procon.eventhub.dto.EventHubEventDto;
|
||||
import at.procon.eventhub.dto.GeoPointDto;
|
||||
import at.procon.eventhub.dto.VehicleRefDto;
|
||||
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
|
||||
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
|
||||
import at.procon.eventhub.reference.TachographNationRegistry;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Performs semantic compatibility checks after broad event candidates have been grouped.
|
||||
*
|
||||
* <p>File-session and database representations of the same tachograph fact can differ in
|
||||
* serialization details such as decimal scale, nation representation, default region values,
|
||||
* optional vehicle identity and interval metadata. Those representation details must not be part
|
||||
* of the candidate key, but meaningful conflicts must still prevent fusion.</p>
|
||||
*/
|
||||
@Component
|
||||
public class RuntimeEventEvidenceCompatibilityMatcher {
|
||||
|
||||
private static final BigDecimal GEO_TOLERANCE = new BigDecimal("0.000000001");
|
||||
|
||||
public boolean compatible(
|
||||
RuntimeEventMixingRule rule,
|
||||
RuntimeEventDescriptor primary,
|
||||
RuntimeEventDescriptor secondary
|
||||
) {
|
||||
if (rule == null || primary == null || secondary == null) {
|
||||
return false;
|
||||
}
|
||||
return switch (rule.equivalenceType()) {
|
||||
case RuntimeEventMixingRule.EQUIVALENCE_EXACT_EVENT_KEY -> true;
|
||||
case RuntimeEventMixingRule.EQUIVALENCE_COMPATIBLE_ACTIVITY_KEY ->
|
||||
activityCompatible(primary.event(), secondary.event());
|
||||
case RuntimeEventMixingRule.EQUIVALENCE_COMPATIBLE_SUPPORT_KEY ->
|
||||
supportCompatible(primary.event(), secondary.event());
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
private boolean activityCompatible(EventHubEventDto left, EventHubEventDto right) {
|
||||
return tenantCompatible(left, right)
|
||||
&& registrationCompatible(left, right)
|
||||
&& vehicleIdentityCompatible(left, right)
|
||||
&& optionalTokenCompatible(activitySlot(left), activitySlot(right));
|
||||
}
|
||||
|
||||
private boolean supportCompatible(EventHubEventDto left, EventHubEventDto right) {
|
||||
return tenantCompatible(left, right)
|
||||
&& registrationCompatible(left, right)
|
||||
&& vehicleIdentityCompatible(left, right)
|
||||
&& coordinatesCompatible(left, right)
|
||||
&& odometerCompatible(left, right)
|
||||
&& nationCompatible(detailValue(left, "country"), detailValue(right, "country"))
|
||||
&& regionCompatible(detailValue(left, "region"), detailValue(right, "region"))
|
||||
&& nationCompatible(detailValue(left, "countryFrom"), detailValue(right, "countryFrom"))
|
||||
&& nationCompatible(detailValue(left, "countryTo"), detailValue(right, "countryTo"))
|
||||
&& optionalTokenCompatible(detailValue(left, "operation"), detailValue(right, "operation"));
|
||||
}
|
||||
|
||||
private boolean tenantCompatible(EventHubEventDto left, EventHubEventDto right) {
|
||||
return optionalTokenCompatible(normalizedTenant(left), normalizedTenant(right));
|
||||
}
|
||||
|
||||
private String normalizedTenant(EventHubEventDto event) {
|
||||
String value = event == null || event.packageInfo() == null ? null : event.packageInfo().tenantKey();
|
||||
String normalized = normalizeToken(value);
|
||||
return Objects.equals("DEFAULT", normalized) ? null : normalized;
|
||||
}
|
||||
|
||||
private boolean registrationCompatible(EventHubEventDto left, EventHubEventDto right) {
|
||||
return optionalTokenCompatible(normalizedRegistration(left), normalizedRegistration(right));
|
||||
}
|
||||
|
||||
private String normalizedRegistration(EventHubEventDto event) {
|
||||
VehicleRefDto vehicleRef = event == null ? null : event.vehicleRef();
|
||||
VehicleRegistrationRefDto registration = vehicleRef == null ? null : vehicleRef.vehicleRegistration();
|
||||
if (registration != null && registration.hasValue()) {
|
||||
String nation = normalizedNation(registration.nation(), registration.nationNumericCode());
|
||||
String number = normalizeIdentifier(registration.number());
|
||||
return nullToEmpty(nation) + ":" + nullToEmpty(number);
|
||||
}
|
||||
|
||||
String key = RuntimeEntityReferenceResolver.registrationKey(event);
|
||||
if (key == null || key.isBlank()) {
|
||||
return null;
|
||||
}
|
||||
int separator = key.indexOf(':');
|
||||
if (separator < 0) {
|
||||
return normalizeIdentifier(key);
|
||||
}
|
||||
String nation = normalizedNation(key.substring(0, separator), null);
|
||||
String number = normalizeIdentifier(key.substring(separator + 1));
|
||||
return nullToEmpty(nation) + ":" + nullToEmpty(number);
|
||||
}
|
||||
|
||||
private boolean vehicleIdentityCompatible(EventHubEventDto left, EventHubEventDto right) {
|
||||
String leftVin = normalizedVin(left);
|
||||
String rightVin = normalizedVin(right);
|
||||
return optionalTokenCompatible(leftVin, rightVin);
|
||||
}
|
||||
|
||||
private String normalizedVin(EventHubEventDto event) {
|
||||
String vehicleKey = RuntimeEntityReferenceResolver.vehicleKey(event);
|
||||
if (vehicleKey != null) {
|
||||
return normalizeIdentifier(vehicleKey);
|
||||
}
|
||||
VehicleRefDto vehicleRef = event == null ? null : event.vehicleRef();
|
||||
return vehicleRef == null ? null : normalizeIdentifier(vehicleRef.vin());
|
||||
}
|
||||
|
||||
private boolean coordinatesCompatible(EventHubEventDto left, EventHubEventDto right) {
|
||||
BigDecimal leftLatitude = coordinate(left, true);
|
||||
BigDecimal rightLatitude = coordinate(right, true);
|
||||
BigDecimal leftLongitude = coordinate(left, false);
|
||||
BigDecimal rightLongitude = coordinate(right, false);
|
||||
return optionalDecimalCompatible(leftLatitude, rightLatitude, GEO_TOLERANCE)
|
||||
&& optionalDecimalCompatible(leftLongitude, rightLongitude, GEO_TOLERANCE);
|
||||
}
|
||||
|
||||
private BigDecimal coordinate(EventHubEventDto event, boolean latitude) {
|
||||
GeoPointDto position = event == null ? null : event.position();
|
||||
BigDecimal value = position == null ? null : latitude ? position.latitude() : position.longitude();
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
return decimal(rawValue(event, latitude ? "latitude" : "longitude"));
|
||||
}
|
||||
|
||||
private boolean odometerCompatible(EventHubEventDto left, EventHubEventDto right) {
|
||||
BigDecimal leftValue = odometerM(left);
|
||||
BigDecimal rightValue = odometerM(right);
|
||||
return optionalDecimalCompatible(leftValue, rightValue, BigDecimal.ZERO);
|
||||
}
|
||||
|
||||
private BigDecimal odometerM(EventHubEventDto event) {
|
||||
if (event != null && event.odometerM() != null) {
|
||||
return BigDecimal.valueOf(event.odometerM());
|
||||
}
|
||||
BigDecimal meters = decimal(rawValue(event, "odometerM"));
|
||||
if (meters != null) {
|
||||
return meters;
|
||||
}
|
||||
BigDecimal kilometres = decimal(rawValue(event, "odometerKm"));
|
||||
return kilometres == null ? null : kilometres.multiply(BigDecimal.valueOf(1000));
|
||||
}
|
||||
|
||||
private String activitySlot(EventHubEventDto event) {
|
||||
return firstNonBlank(
|
||||
rawValue(event, "slot"),
|
||||
rawValue(event, "cardSlot"),
|
||||
detailAttribute(event, "cardSlot")
|
||||
);
|
||||
}
|
||||
|
||||
private String detailValue(EventHubEventDto event, String field) {
|
||||
return firstNonBlank(rawValue(event, field), detailAttribute(event, field));
|
||||
}
|
||||
|
||||
private String rawValue(EventHubEventDto event, String field) {
|
||||
return text(RuntimeEntityReferenceResolver.rawPayload(event), field);
|
||||
}
|
||||
|
||||
private String detailAttribute(EventHubEventDto event, String field) {
|
||||
JsonNode attributes = event == null || event.eventDetails() == null
|
||||
? null
|
||||
: event.eventDetails().attributes();
|
||||
return text(attributes, field);
|
||||
}
|
||||
|
||||
private boolean nationCompatible(String left, String right) {
|
||||
String normalizedLeft = normalizedNation(left, null);
|
||||
String normalizedRight = normalizedNation(right, null);
|
||||
return optionalTokenCompatible(normalizedLeft, normalizedRight);
|
||||
}
|
||||
|
||||
private String normalizedNation(String nation, Integer numericCode) {
|
||||
TachographNationRegistry.NationResolution resolution =
|
||||
TachographNationRegistry.resolve(nation, numericCode);
|
||||
if (resolution.numericCode() != null) {
|
||||
return String.valueOf(resolution.numericCode());
|
||||
}
|
||||
return normalizeToken(resolution.legacyNation());
|
||||
}
|
||||
|
||||
private boolean regionCompatible(String left, String right) {
|
||||
return optionalTokenCompatible(normalizedRegion(left), normalizedRegion(right));
|
||||
}
|
||||
|
||||
private String normalizedRegion(String value) {
|
||||
String normalized = normalizeToken(value);
|
||||
return normalized == null || Objects.equals("0", normalized) ? null : normalized;
|
||||
}
|
||||
|
||||
private boolean optionalTokenCompatible(String left, String right) {
|
||||
String normalizedLeft = normalizeToken(left);
|
||||
String normalizedRight = normalizeToken(right);
|
||||
return normalizedLeft == null || normalizedRight == null || normalizedLeft.equals(normalizedRight);
|
||||
}
|
||||
|
||||
private boolean optionalDecimalCompatible(BigDecimal left, BigDecimal right, BigDecimal tolerance) {
|
||||
if (left == null || right == null) {
|
||||
return true;
|
||||
}
|
||||
return left.subtract(right).abs().compareTo(tolerance) <= 0;
|
||||
}
|
||||
|
||||
private BigDecimal decimal(String value) {
|
||||
if (value == null || value.isBlank()) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return new BigDecimal(value.trim());
|
||||
} catch (NumberFormatException ignored) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private String text(JsonNode node, String field) {
|
||||
if (node == null || field == null) {
|
||||
return null;
|
||||
}
|
||||
JsonNode value = node.get(field);
|
||||
if (value == null || value.isNull()) {
|
||||
return null;
|
||||
}
|
||||
String text = value.asText(null);
|
||||
return text == null || text.isBlank() ? null : text.trim();
|
||||
}
|
||||
|
||||
private String normalizeIdentifier(String value) {
|
||||
if (value == null || value.isBlank()) {
|
||||
return null;
|
||||
}
|
||||
String normalized = value.trim().toUpperCase(Locale.ROOT).replaceAll("[^A-Z0-9]", "");
|
||||
return normalized.isBlank() ? null : normalized;
|
||||
}
|
||||
|
||||
private String normalizeToken(String value) {
|
||||
return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT);
|
||||
}
|
||||
|
||||
private String firstNonBlank(String... values) {
|
||||
if (values == null) {
|
||||
return null;
|
||||
}
|
||||
for (String value : values) {
|
||||
if (value != null && !value.isBlank()) {
|
||||
return value.trim();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private String nullToEmpty(String value) {
|
||||
return value == null ? "" : value;
|
||||
}
|
||||
}
|
||||
|
|
@ -5,6 +5,7 @@ import at.procon.eventhub.dto.VehicleRefDto;
|
|||
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
|
|
@ -32,19 +33,26 @@ public class RuntimeEventMixingService {
|
|||
|
||||
private final RuntimeEventDescriptorFactory descriptorFactory;
|
||||
private final RuntimeEventMixingRuleRegistry ruleRegistry;
|
||||
private final RuntimeEventEvidenceCompatibilityMatcher compatibilityMatcher;
|
||||
|
||||
@Autowired
|
||||
public RuntimeEventMixingService(
|
||||
RuntimeEventDescriptorFactory descriptorFactory,
|
||||
RuntimeEventMixingRuleRegistry ruleRegistry
|
||||
RuntimeEventMixingRuleRegistry ruleRegistry,
|
||||
RuntimeEventEvidenceCompatibilityMatcher compatibilityMatcher
|
||||
) {
|
||||
this.descriptorFactory = descriptorFactory;
|
||||
this.ruleRegistry = ruleRegistry;
|
||||
this.compatibilityMatcher = compatibilityMatcher;
|
||||
}
|
||||
|
||||
/** Compatibility constructor used by unit tests and local registries. */
|
||||
public RuntimeEventMixingService() {
|
||||
this(new RuntimeEventDescriptorFactory(), new RuntimeEventMixingRuleRegistry());
|
||||
this(
|
||||
new RuntimeEventDescriptorFactory(),
|
||||
new RuntimeEventMixingRuleRegistry(),
|
||||
new RuntimeEventEvidenceCompatibilityMatcher()
|
||||
);
|
||||
}
|
||||
|
||||
public RuntimeMixedEventBundle mix(List<EventHubEventDto> events, String requestedMode) {
|
||||
|
|
@ -149,23 +157,35 @@ public class RuntimeEventMixingService {
|
|||
if (primaries.isEmpty() || secondaries.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
RuntimeEventDescriptor primary = primaries.getFirst();
|
||||
List<RuntimeEventDescriptor> newlySuppressed = secondaries.stream()
|
||||
.filter(descriptor -> !state.isSuppressed(descriptor))
|
||||
.toList();
|
||||
if (newlySuppressed.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (RuntimeEventDescriptor primary : primaries) {
|
||||
List<RuntimeEventDescriptor> compatibleSecondaries = secondaries.stream()
|
||||
.filter(descriptor -> !state.isSuppressed(descriptor))
|
||||
.filter(descriptor -> compatibilityMatcher.compatible(rule, primary, descriptor))
|
||||
.toList();
|
||||
if (compatibleSecondaries.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
fusePrimaryWithSecondaries(state, rule, eventKey, primary, compatibleSecondaries);
|
||||
}
|
||||
}
|
||||
|
||||
private void fusePrimaryWithSecondaries(
|
||||
MixingState state,
|
||||
RuntimeEventMixingRule rule,
|
||||
String eventKey,
|
||||
RuntimeEventDescriptor primary,
|
||||
List<RuntimeEventDescriptor> secondaries
|
||||
) {
|
||||
EventHubEventDto enrichedPrimary = enrichPrimaryVehicleRef(
|
||||
primary.event(),
|
||||
newlySuppressed.stream().map(RuntimeEventDescriptor::event).toList()
|
||||
secondaries.stream().map(RuntimeEventDescriptor::event).toList()
|
||||
);
|
||||
if (enrichedPrimary != primary.event()) {
|
||||
state.replace(primary, enrichedPrimary);
|
||||
}
|
||||
newlySuppressed.forEach(descriptor -> state.suppress(descriptor, rule, primary, eventKey));
|
||||
state.markPrimary(primary, rule, eventKey, newlySuppressed);
|
||||
secondaries.forEach(descriptor -> state.suppress(descriptor, rule, primary, eventKey));
|
||||
state.markPrimary(primary, rule, eventKey, secondaries);
|
||||
state.decisions().add(new RuntimeEventMixingDecisionDto(
|
||||
rule.ruleId(),
|
||||
rule.equivalenceType(),
|
||||
|
|
@ -174,8 +194,8 @@ public class RuntimeEventMixingService {
|
|||
rule.channel().name(),
|
||||
primary.event().externalSourceEventId(),
|
||||
primary.extractionCode(),
|
||||
newlySuppressed.stream().map(descriptor -> descriptor.event().externalSourceEventId()).toList(),
|
||||
newlySuppressed.stream().map(RuntimeEventDescriptor::extractionCode).toList(),
|
||||
secondaries.stream().map(descriptor -> descriptor.event().externalSourceEventId()).toList(),
|
||||
secondaries.stream().map(RuntimeEventDescriptor::extractionCode).toList(),
|
||||
primary.event().occurredAt(),
|
||||
primary.eventDomain() == null ? null : primary.eventDomain().name(),
|
||||
primary.eventType() == null ? null : primary.eventType().name(),
|
||||
|
|
@ -342,7 +362,7 @@ public class RuntimeEventMixingService {
|
|||
|
||||
private static final class MixingState {
|
||||
private final List<RuntimeEventDescriptor> descriptors;
|
||||
private final Map<String, RuntimeEventDescriptor> descriptorsByEventId = new LinkedHashMap<>();
|
||||
private final Map<EventHubEventDto, RuntimeEventDescriptor> descriptorsByEvent = new IdentityHashMap<>();
|
||||
private final Set<String> suppressedEventIds = new LinkedHashSet<>();
|
||||
private final Map<String, EventHubEventDto> replacementsByEventId = new LinkedHashMap<>();
|
||||
private final Map<String, RuntimeResolvedEvent> resolvedEventsByEventId = new LinkedHashMap<>();
|
||||
|
|
@ -353,7 +373,9 @@ public class RuntimeEventMixingService {
|
|||
private MixingState(List<RuntimeEventDescriptor> descriptors) {
|
||||
this.descriptors = descriptors == null ? List.of() : List.copyOf(descriptors);
|
||||
for (RuntimeEventDescriptor descriptor : this.descriptors) {
|
||||
descriptorsByEventId.put(descriptor.eventIdentityKey(), descriptor);
|
||||
if (descriptor.event() != null) {
|
||||
descriptorsByEvent.put(descriptor.event(), descriptor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -382,9 +404,9 @@ public class RuntimeEventMixingService {
|
|||
if (event == null) {
|
||||
return null;
|
||||
}
|
||||
String externalId = event.externalSourceEventId();
|
||||
if (externalId != null && descriptorsByEventId.containsKey(externalId)) {
|
||||
return descriptorsByEventId.get(externalId);
|
||||
RuntimeEventDescriptor direct = descriptorsByEvent.get(event);
|
||||
if (direct != null) {
|
||||
return direct;
|
||||
}
|
||||
return descriptors.stream()
|
||||
.filter(descriptor -> descriptor.event() == event || Objects.equals(descriptor.event(), event))
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import at.procon.eventhub.dto.VehicleRefDto;
|
|||
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
|
||||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import java.math.BigDecimal;
|
||||
import java.time.LocalDate;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.List;
|
||||
|
|
@ -423,6 +424,215 @@ class RuntimeEventMixingServiceTest {
|
|||
.containsExactlyInAnyOrder("VU_LOAD_UNLOAD", "VU_SPECIFIC_CONDITION");
|
||||
}
|
||||
|
||||
@Test
|
||||
void mixesFileSessionCardPlaceWithDatabaseVuPlaceAcrossRepresentationDifferences() {
|
||||
OffsetDateTime occurredAt = OffsetDateTime.parse("2026-04-01T04:30:59Z");
|
||||
EventHubEventDto fileCard = crossRepresentationPlace(
|
||||
true,
|
||||
"TACHOGRAPH_FILE_SESSION:card-place-28",
|
||||
occurredAt,
|
||||
"13",
|
||||
"0",
|
||||
new BigDecimal("50.6400000000000000"),
|
||||
new BigDecimal("9.3883333333333336")
|
||||
);
|
||||
EventHubEventDto databaseVu = crossRepresentationPlace(
|
||||
false,
|
||||
"4693459",
|
||||
occurredAt,
|
||||
"D",
|
||||
null,
|
||||
new BigDecimal("50.64"),
|
||||
new BigDecimal("9.388333333333334")
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(
|
||||
List.of(fileCard, databaseVu),
|
||||
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
|
||||
);
|
||||
|
||||
assertThat(mixed.supportEvidenceEvents()).hasSize(1);
|
||||
assertThat(mixed.supportEvidenceEvents().getFirst().externalSourceEventId())
|
||||
.isEqualTo(fileCard.externalSourceEventId());
|
||||
assertThat(mixed.supportEvidenceEvents().getFirst().vehicleRef().vin())
|
||||
.isEqualTo("XLRTEH4300G376073");
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly(databaseVu.externalSourceEventId());
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(1);
|
||||
assertThat(mixed.eventMixingDecisions().getFirst().ruleId())
|
||||
.isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY);
|
||||
}
|
||||
|
||||
@Test
|
||||
void mixesFileSessionCardActivityWithDatabaseVuActivity() {
|
||||
OffsetDateTime occurredAt = OffsetDateTime.parse("2026-04-01T05:22:00Z");
|
||||
EventHubEventDto fileCard = crossRepresentationActivity(
|
||||
true,
|
||||
"TACHOGRAPH_FILE_SESSION:card-activity-1",
|
||||
occurredAt
|
||||
);
|
||||
EventHubEventDto databaseVu = crossRepresentationActivity(
|
||||
false,
|
||||
"116710708",
|
||||
occurredAt
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(
|
||||
List.of(fileCard, databaseVu),
|
||||
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
|
||||
);
|
||||
|
||||
assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly(fileCard.externalSourceEventId());
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly(databaseVu.externalSourceEventId());
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(1);
|
||||
assertThat(mixed.eventMixingDecisions().getFirst().ruleId())
|
||||
.isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY);
|
||||
}
|
||||
|
||||
@Test
|
||||
void keepsCrossRepresentationSupportEventsWhenMeaningfulCoordinatesConflict() {
|
||||
OffsetDateTime occurredAt = OffsetDateTime.parse("2026-04-01T04:30:59Z");
|
||||
EventHubEventDto fileCard = crossRepresentationPlace(
|
||||
true,
|
||||
"TACHOGRAPH_FILE_SESSION:card-place-conflict",
|
||||
occurredAt,
|
||||
"13",
|
||||
"0",
|
||||
new BigDecimal("50.64"),
|
||||
new BigDecimal("9.3883333333333336")
|
||||
);
|
||||
EventHubEventDto databaseVu = crossRepresentationPlace(
|
||||
false,
|
||||
"4693460",
|
||||
occurredAt,
|
||||
"D",
|
||||
null,
|
||||
new BigDecimal("50.64"),
|
||||
new BigDecimal("10.0")
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(
|
||||
List.of(fileCard, databaseVu),
|
||||
RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE
|
||||
);
|
||||
|
||||
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactlyInAnyOrder(fileCard.externalSourceEventId(), databaseVu.externalSourceEventId());
|
||||
assertThat(mixed.suppressedEvents()).isEmpty();
|
||||
assertThat(mixed.eventMixingDecisions()).isEmpty();
|
||||
}
|
||||
|
||||
private EventHubEventDto crossRepresentationPlace(
|
||||
boolean fileSessionCard,
|
||||
String externalId,
|
||||
OffsetDateTime occurredAt,
|
||||
String country,
|
||||
String region,
|
||||
BigDecimal latitude,
|
||||
BigDecimal longitude
|
||||
) {
|
||||
String sourceKind = fileSessionCard ? "DRIVER_CARD" : "VEHICLE_UNIT";
|
||||
ObjectNode raw = fileSessionCard
|
||||
? baseRawWithoutExtraction(sourceKind)
|
||||
: baseRaw("VU_PLACE", sourceKind);
|
||||
raw.put("latitude", latitude.toPlainString());
|
||||
raw.put("longitude", longitude.toPlainString());
|
||||
raw.put("odometerM", "172945000");
|
||||
raw.put("country", country);
|
||||
if (region != null) {
|
||||
raw.put("region", region);
|
||||
}
|
||||
ObjectNode payload = JsonNodeFactory.instance.objectNode();
|
||||
payload.set("raw", raw);
|
||||
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
|
||||
attributes.put("country", country);
|
||||
if (region != null) {
|
||||
attributes.put("region", region);
|
||||
}
|
||||
VehicleRefDto vehicleRef = fileSessionCard
|
||||
? new VehicleRefDto(null, null, null, new VehicleRegistrationRefDto("13", 13, "RO BS 2219"))
|
||||
: new VehicleRefDto("3342", "XLRTEH4300G376073", null, new VehicleRegistrationRefDto("D", 13, "RO BS 2219"));
|
||||
EventHubEventDto event = new EventHubEventDto(
|
||||
UUID.randomUUID(),
|
||||
externalId,
|
||||
new DriverRefDto("13:DF000358328840", new DriverCardRefDto("13", 13, "DF000358328840")),
|
||||
vehicleRef,
|
||||
occurredAt,
|
||||
null,
|
||||
occurredAt,
|
||||
EventDomain.PLACE,
|
||||
EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
fileSessionCard ? EventLifecycle.BEGIN : EventLifecycle.START,
|
||||
172945000L,
|
||||
new at.procon.eventhub.dto.GeoPointDto(latitude, longitude),
|
||||
new EventDetailsDto("PLACE", attributes),
|
||||
null,
|
||||
payload,
|
||||
false,
|
||||
packageInfo(
|
||||
fileSessionCard ? "default" : "TENANT_A",
|
||||
fileSessionCard ? "TACHOGRAPH_FILE_SESSION" : "TACHOGRAPH",
|
||||
sourceKind,
|
||||
EventDomain.PLACE,
|
||||
occurredAt.toLocalDate()
|
||||
)
|
||||
);
|
||||
return event;
|
||||
}
|
||||
|
||||
private EventHubEventDto crossRepresentationActivity(
|
||||
boolean fileSessionCard,
|
||||
String externalId,
|
||||
OffsetDateTime occurredAt
|
||||
) {
|
||||
String sourceKind = fileSessionCard ? "DRIVER_CARD" : "VEHICLE_UNIT";
|
||||
ObjectNode raw = fileSessionCard
|
||||
? baseRawWithoutExtraction(sourceKind)
|
||||
: baseRaw("VU_ACTIVITY", sourceKind);
|
||||
raw.put("activityType", "DRIVE");
|
||||
raw.put("cardSlot", "DRIVER");
|
||||
raw.put("startedAt", "2026-04-01T05:22:00Z");
|
||||
raw.put("endedAt", "2026-04-01T05:52:00Z");
|
||||
if (fileSessionCard) {
|
||||
raw.put("cardStatus", "INSERTED");
|
||||
raw.put("drivingStatus", "SINGLE");
|
||||
}
|
||||
ObjectNode payload = JsonNodeFactory.instance.objectNode();
|
||||
payload.set("raw", raw);
|
||||
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
|
||||
attributes.put("cardSlot", "DRIVER");
|
||||
VehicleRefDto vehicleRef = fileSessionCard
|
||||
? new VehicleRefDto(null, null, null, new VehicleRegistrationRefDto("13", 13, "RO BS 2219"))
|
||||
: new VehicleRefDto("3342", "XLRTEH4300G376073", null, new VehicleRegistrationRefDto("D", 13, "RO BS 2219"));
|
||||
return new EventHubEventDto(
|
||||
UUID.randomUUID(),
|
||||
externalId,
|
||||
new DriverRefDto("13:DF000358328840", new DriverCardRefDto("13", 13, "DF000358328840")),
|
||||
vehicleRef,
|
||||
occurredAt,
|
||||
null,
|
||||
occurredAt,
|
||||
EventDomain.DRIVER_ACTIVITY,
|
||||
EventType.DRIVE,
|
||||
EventLifecycle.START,
|
||||
null,
|
||||
null,
|
||||
new EventDetailsDto("DRIVER_ACTIVITY", attributes),
|
||||
null,
|
||||
payload,
|
||||
false,
|
||||
packageInfo(
|
||||
fileSessionCard ? "default" : "TENANT_A",
|
||||
fileSessionCard ? "TACHOGRAPH_FILE_SESSION" : "TACHOGRAPH",
|
||||
sourceKind,
|
||||
EventDomain.DRIVER_ACTIVITY,
|
||||
occurredAt.toLocalDate()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private EventHubEventDto activity(String extractionCode, String sourceKind, String externalId) {
|
||||
ObjectNode raw = baseRaw(extractionCode, sourceKind);
|
||||
raw.put("activityType", "BREAK_REST");
|
||||
|
|
@ -665,6 +875,27 @@ class RuntimeEventMixingServiceTest {
|
|||
);
|
||||
}
|
||||
|
||||
private EventHubPackageRequest packageInfo(
|
||||
String tenantKey,
|
||||
String providerKey,
|
||||
String sourceKind,
|
||||
EventDomain domain,
|
||||
LocalDate businessDate
|
||||
) {
|
||||
EventSourceDto source = new EventSourceDto(providerKey, sourceKind, providerKey + "_" + sourceKind, null, null, null);
|
||||
OffsetDateTime from = businessDate.atStartOfDay().atOffset(java.time.ZoneOffset.UTC);
|
||||
OffsetDateTime to = businessDate.plusDays(1).atStartOfDay().atOffset(java.time.ZoneOffset.UTC);
|
||||
return new EventHubPackageRequest(
|
||||
tenantKey,
|
||||
source,
|
||||
null,
|
||||
ImportScopeDto.tenantAll(from, to),
|
||||
domain.name(),
|
||||
businessDate,
|
||||
source.stableKey() + ":" + domain.name() + ":" + businessDate
|
||||
);
|
||||
}
|
||||
|
||||
private EventHubPackageRequest packageInfo(String sourceKind, EventDomain domain, LocalDate businessDate) {
|
||||
EventSourceDto source = new EventSourceDto("TACHOGRAPH", sourceKind, "TACHOGRAPH_" + sourceKind, null, null, null);
|
||||
OffsetDateTime from = businessDate.atStartOfDay().atOffset(java.time.ZoneOffset.UTC);
|
||||
|
|
|
|||
Loading…
Reference in New Issue