Complete runtime event mixing integration

This commit is contained in:
trifonovt 2026-06-11 14:21:14 +02:00
parent 33a2f52d33
commit 46a89ea5b5
6 changed files with 280 additions and 479 deletions

View File

@ -1,33 +1,45 @@
# Patch: Tachograph file-session support for runtime event mixing
# EventHub runtime event-mixing refactor
This patch extends the existing `event-evidence-mixing` implementation so it also supports events produced by uploaded tachograph file sessions.
This patch refactors the previous targeted card/VU duplicate handling into a first-class runtime event-mixing subsystem.
## What changed
## New architecture components
- `RuntimeEventMixingService` now treats `TACHOGRAPH_FILE_SESSION` and `COMPOSITE_TACHOGRAPH_FILE_SESSION` events as tachograph runtime evidence.
- File-session events do not always have a DB-style `extractionCode` such as `CARD_ACTIVITY` or `VU_ACTIVITY` in the raw payload.
- The mixing service now derives the equivalent extraction code from:
- source kind: `DRIVER_CARD` or `VEHICLE_UNIT`
- event domain: `DRIVER_ACTIVITY`, `POSITION`, `PLACE`, `BORDER_CROSSING`, `DRIVER_CARD`, etc.
- file-session external id / source-package kind.
- `RuntimeEventMixingModule`
- `RuntimeEventMixingService`
- `RuntimeEventDescriptor`
- `RuntimeEventDescriptorFactory`
- `RuntimeEventSourceProfile`
- `RuntimeEventMixingRule`
- `RuntimeEventMixingRuleRegistry`
- `RuntimeEventMixingDecisionDto`
- `RuntimeMixedEventBundle`
- `RuntimeResolvedEvent`
- `RuntimeResolvedEventRole`
- `RuntimeEventMixingChannel`
## Supported derived mappings
## Current configured rules
| File-session event | Driver-card source | Vehicle-unit source |
|---|---|---|
| `DRIVER_ACTIVITY` | `CARD_ACTIVITY` | `VU_ACTIVITY` |
| `POSITION` | `CARD_POSITION` | `VU_POSITION` |
| `PLACE` | `CARD_PLACE` | `VU_PLACE` |
| `BORDER_CROSSING` | `CARD_BORDER_CROSSING` | `VU_BORDER_CROSSING` |
| `DRIVER_CARD` insert/withdraw | `CARD_VEHICLES_USED` | `IW_CYCLE` |
The rule registry currently applies these tachograph same-source rules:
## Important behavior
1. `tachograph.activity.card-vu.same-event-key`
2. `tachograph.activity.card-vu.compatible-activity-key`
3. `tachograph.support.card-vu.same-event-key`
4. `tachograph.support.card-vu.compatible-support-key`
- Duplicate file-session `CARD_ACTIVITY` / `VU_ACTIVITY` events are mixed the same way as persistent tachograph DB events.
- Duplicate file-session position/place/border events are also mixed the same way as persistent tachograph DB support evidence.
- `CARD_VEHICLES_USED` and `IW_CYCLE` are still not mixed; they remain accepted for separate vehicle-usage processing.
The activity rules collapse duplicate `CARD_ACTIVITY`/`VU_ACTIVITY` points before activity intervalization.
## Modified files
The support rules collapse duplicate card/VU support evidence for:
- `src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java`
- `src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingServiceTest.java`
- `CARD_POSITION` / `VU_POSITION`
- `CARD_PLACE` / `VU_PLACE`
- `CARD_BORDER_CROSSING` / `VU_BORDER_CROSSING`
The card-side event remains the primary event. The VU-side event is suppressed from the processing channel but remains visible through `suppressedEvents`, `resolvedEvents`, and `eventMixingDecisions`.
## Still intentionally unchanged
`CARD_VEHICLES_USED` and `IW_CYCLE` are still not mixed. They remain fully accepted in `vehicleUsageEvents` because they need a separate vehicle-usage rule later.
## TACHOGRAPH_FILE_SESSION support
The descriptor factory recognizes `TACHOGRAPH_FILE_SESSION` and `COMPOSITE_TACHOGRAPH_FILE_SESSION` events and derives card/VU extraction codes from `sourceKind` and event domain when no explicit `extractionCode` is present.

View File

@ -1,24 +1,17 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.GeoPointDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@ -37,37 +30,58 @@ public class RuntimeEventMixingService {
public static final String RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY =
"tachograph.support.card-vu.compatible-support-key";
private final RuntimeEventDescriptorFactory descriptorFactory;
private final RuntimeEventMixingRuleRegistry ruleRegistry;
@Autowired
public RuntimeEventMixingService(
RuntimeEventDescriptorFactory descriptorFactory,
RuntimeEventMixingRuleRegistry ruleRegistry
) {
this.descriptorFactory = descriptorFactory;
this.ruleRegistry = ruleRegistry;
}
/** Compatibility constructor used by unit tests and local registries. */
public RuntimeEventMixingService() {
this(new RuntimeEventDescriptorFactory(), new RuntimeEventMixingRuleRegistry());
}
public RuntimeMixedEventBundle mix(List<EventHubEventDto> events, String requestedMode) {
List<EventHubEventDto> rawEvents = sort(events);
String mode = normalizeMode(requestedMode);
List<RuntimeEventDescriptor> descriptors = descriptorFactory.describeSorted(events);
List<EventHubEventDto> rawEvents = descriptors.stream().map(RuntimeEventDescriptor::event).toList();
if (MODE_OFF.equals(mode)) {
return unchanged(rawEvents, "Runtime event mixing is disabled by eventMixingMode=OFF.");
return unchanged(rawEvents, descriptors, "Runtime event mixing is disabled by eventMixingMode=OFF.");
}
MixingState state = new MixingState(rawEvents);
applyTachographCardVuActivityMixing(state);
applyTachographCardVuSupportEvidenceMixing(state);
MixingState state = new MixingState(descriptors);
for (RuntimeEventMixingRule rule : ruleRegistry.rulesForMode(mode)) {
applyRule(state, rule);
}
List<EventHubEventDto> driverPartitionEvents = rawEvents.stream()
.filter(event -> !state.isSuppressed(event))
.map(state::effectiveEvent)
.toList();
List<EventHubEventDto> activityTimelineEvents = driverPartitionEvents.stream()
.filter(RuntimeEventMixingService::isDriverActivityPoint)
.filter(descriptorFactory::isDriverActivityPoint)
.toList();
// Vehicle-usage events are intentionally not mixed here. CARD_VEHICLES_USED and IW_CYCLE
// are kept as separate input evidence because they must be processed by their own rules later.
List<EventHubEventDto> vehicleUsageEvents = rawEvents.stream()
.filter(RuntimeEventMixingService::isDriverCardUsagePoint)
.filter(descriptorFactory::isDriverCardUsagePoint)
.toList();
List<EventHubEventDto> supportEvidenceEvents = driverPartitionEvents.stream()
.filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event))
.filter(event -> !descriptorFactory.isDriverActivityPoint(event) && !descriptorFactory.isDriverCardUsagePoint(event))
.toList();
List<RuntimeResolvedEvent> resolvedEvents = buildResolvedEvents(state, rawEvents);
List<String> notes = new ArrayList<>();
notes.add("Runtime event mixing inspected " + rawEvents.size() + " event(s).");
notes.add("Runtime event mixing applied " + ruleRegistry.rulesForMode(mode).size() + " configured rule(s) in mode " + mode + ".");
notes.add("Runtime event mixing suppressed " + state.suppressedEvents().size()
+ " duplicate source event(s) from activity/support evidence channels.");
notes.add("Runtime event mixing keeps CARD_POSITION, CARD_PLACE, and CARD_BORDER_CROSSING as primary when matching VU support evidence describes the same semantic event.");
@ -79,188 +93,156 @@ public class RuntimeEventMixingService {
vehicleUsageEvents,
supportEvidenceEvents,
state.suppressedEvents(),
resolvedEvents,
state.decisions(),
notes,
state.warnings()
);
}
private RuntimeMixedEventBundle unchanged(List<EventHubEventDto> rawEvents, String note) {
private RuntimeMixedEventBundle unchanged(List<EventHubEventDto> rawEvents, List<RuntimeEventDescriptor> descriptors, String note) {
return new RuntimeMixedEventBundle(
rawEvents,
rawEvents,
rawEvents.stream().filter(RuntimeEventMixingService::isDriverActivityPoint).toList(),
rawEvents.stream().filter(RuntimeEventMixingService::isDriverCardUsagePoint).toList(),
rawEvents.stream().filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event)).toList(),
rawEvents.stream().filter(descriptorFactory::isDriverActivityPoint).toList(),
rawEvents.stream().filter(descriptorFactory::isDriverCardUsagePoint).toList(),
rawEvents.stream().filter(event -> !descriptorFactory.isDriverActivityPoint(event) && !descriptorFactory.isDriverCardUsagePoint(event)).toList(),
List.of(),
descriptors.stream().map(this::defaultResolvedEvent).toList(),
List.of(),
List.of(note),
List.of()
);
}
private void applyTachographCardVuActivityMixing(MixingState state) {
LinkedHashMap<String, List<EventHubEventDto>> exactGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (!isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) {
private void applyRule(MixingState state, RuntimeEventMixingRule rule) {
LinkedHashMap<String, List<RuntimeEventDescriptor>> groups = new LinkedHashMap<>();
for (RuntimeEventDescriptor descriptor : state.descriptors()) {
if (state.isSuppressed(descriptor) || !rule.matches(descriptor)) {
continue;
}
exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : exactGroups.entrySet()) {
fuseCardAndVuDuplicates(
state,
entry.getKey(),
"EXACT_EVENT_KEY",
RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY,
"ACTIVITY_TIMELINE",
entry.getValue(),
"CARD_ACTIVITY",
"VU_ACTIVITY",
"CARD_ACTIVITY and VU_ACTIVITY describe the same driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization."
);
}
LinkedHashMap<String, List<EventHubEventDto>> compatibleGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (state.isSuppressed(event) || !isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) {
String key = descriptor.keyFor(rule.equivalenceType());
if (key == null || key.isBlank()) {
continue;
}
compatibleGroups.computeIfAbsent(compatibleActivityKey(event), ignored -> new ArrayList<>())
.add(event);
groups.computeIfAbsent(key, ignored -> new ArrayList<>()).add(descriptor);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : compatibleGroups.entrySet()) {
fuseCardAndVuDuplicates(
state,
entry.getKey(),
"COMPATIBLE_ACTIVITY_KEY",
RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY,
"ACTIVITY_TIMELINE",
entry.getValue(),
"CARD_ACTIVITY",
"VU_ACTIVITY",
"CARD_ACTIVITY and VU_ACTIVITY describe a compatible driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization."
);
for (Map.Entry<String, List<RuntimeEventDescriptor>> entry : groups.entrySet()) {
fuseDuplicateGroup(state, rule, entry.getKey(), entry.getValue());
}
}
private void applyTachographCardVuSupportEvidenceMixing(MixingState state) {
LinkedHashMap<String, List<EventHubEventDto>> exactGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) {
continue;
}
exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : exactGroups.entrySet()) {
fuseCardAndVuSupportDuplicates(
state,
entry.getKey(),
"EXACT_EVENT_KEY",
RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY,
entry.getValue()
);
}
LinkedHashMap<String, List<EventHubEventDto>> compatibleGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) {
continue;
}
compatibleGroups.computeIfAbsent(compatibleSupportEvidenceKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : compatibleGroups.entrySet()) {
fuseCardAndVuSupportDuplicates(
state,
entry.getKey(),
"COMPATIBLE_SUPPORT_KEY",
RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY,
entry.getValue()
);
}
}
private void fuseCardAndVuSupportDuplicates(
private void fuseDuplicateGroup(
MixingState state,
RuntimeEventMixingRule rule,
String eventKey,
String equivalenceType,
String ruleId,
List<EventHubEventDto> group
List<RuntimeEventDescriptor> group
) {
Map.Entry<String, String> pair = cardVuSupportPair(group);
if (pair == null) {
return;
}
fuseCardAndVuDuplicates(
state,
eventKey,
equivalenceType,
ruleId,
"SUPPORT_EVIDENCE",
group,
pair.getKey(),
pair.getValue(),
pair.getKey() + " and " + pair.getValue() + " describe the same support evidence event. "
+ pair.getKey() + " is kept as primary support evidence; " + pair.getValue()
+ " is suppressed from support-evidence normalization. Vehicle/VIN identity from the VU event is copied to the primary event when the card event has weaker vehicle identity."
);
}
private void fuseCardAndVuDuplicates(
MixingState state,
String eventKey,
String equivalenceType,
String ruleId,
String channel,
List<EventHubEventDto> group,
String primaryExtractionCode,
String secondaryExtractionCode,
String reason
) {
List<EventHubEventDto> primaries = group.stream()
.filter(event -> Objects.equals(primaryExtractionCode, extractionCode(event)))
.sorted(eventComparator())
List<RuntimeEventDescriptor> primaries = group.stream()
.filter(rule::isPrimary)
.sorted(descriptorComparator())
.toList();
List<EventHubEventDto> secondaries = group.stream()
.filter(event -> Objects.equals(secondaryExtractionCode, extractionCode(event)))
.sorted(eventComparator())
List<RuntimeEventDescriptor> secondaries = group.stream()
.filter(rule::isSecondary)
.sorted(descriptorComparator())
.toList();
if (primaries.isEmpty() || secondaries.isEmpty()) {
return;
}
EventHubEventDto primary = primaries.get(0);
List<EventHubEventDto> newlySuppressed = secondaries.stream()
.filter(event -> !state.isSuppressed(event))
RuntimeEventDescriptor primary = primaries.getFirst();
List<RuntimeEventDescriptor> newlySuppressed = secondaries.stream()
.filter(descriptor -> !state.isSuppressed(descriptor))
.toList();
if (newlySuppressed.isEmpty()) {
return;
}
EventHubEventDto enrichedPrimary = enrichPrimaryVehicleRef(primary, newlySuppressed);
if (enrichedPrimary != primary) {
EventHubEventDto enrichedPrimary = enrichPrimaryVehicleRef(
primary.event(),
newlySuppressed.stream().map(RuntimeEventDescriptor::event).toList()
);
if (enrichedPrimary != primary.event()) {
state.replace(primary, enrichedPrimary);
}
newlySuppressed.forEach(state::suppress);
newlySuppressed.forEach(descriptor -> state.suppress(descriptor, rule, primary, eventKey));
state.markPrimary(primary, rule, eventKey, newlySuppressed);
state.decisions().add(new RuntimeEventMixingDecisionDto(
ruleId,
equivalenceType,
rule.ruleId(),
rule.equivalenceType(),
eventKey,
"FUSED_PRIMARY_SELECTED",
channel,
primary.externalSourceEventId(),
extractionCode(primary),
newlySuppressed.stream().map(EventHubEventDto::externalSourceEventId).toList(),
newlySuppressed.stream().map(RuntimeEventMixingService::extractionCode).toList(),
primary.occurredAt(),
rule.decision(),
rule.channel().name(),
primary.event().externalSourceEventId(),
primary.extractionCode(),
newlySuppressed.stream().map(descriptor -> descriptor.event().externalSourceEventId()).toList(),
newlySuppressed.stream().map(RuntimeEventDescriptor::extractionCode).toList(),
primary.event().occurredAt(),
primary.eventDomain() == null ? null : primary.eventDomain().name(),
primary.eventType() == null ? null : primary.eventType().name(),
primary.lifecycle() == null ? null : primary.lifecycle().name(),
reason
rule.reason()
));
}
private List<RuntimeResolvedEvent> buildResolvedEvents(MixingState state, List<EventHubEventDto> rawEvents) {
List<RuntimeResolvedEvent> resolved = new ArrayList<>();
for (EventHubEventDto event : rawEvents) {
RuntimeResolvedEvent explicit = state.resolvedEvent(event);
if (explicit != null) {
resolved.add(explicit);
continue;
}
RuntimeEventDescriptor descriptor = state.descriptor(event);
if (descriptor != null) {
resolved.add(defaultResolvedEvent(descriptor));
}
}
return List.copyOf(resolved);
}
private RuntimeResolvedEvent defaultResolvedEvent(RuntimeEventDescriptor descriptor) {
RuntimeEventMixingChannel channel = defaultChannel(descriptor);
RuntimeResolvedEventRole role = switch (channel) {
case ACTIVITY_TIMELINE -> RuntimeResolvedEventRole.PRIMARY;
case VEHICLE_USAGE -> RuntimeResolvedEventRole.VEHICLE_USAGE_INPUT;
case SUPPORT_EVIDENCE -> RuntimeResolvedEventRole.SUPPORT_ONLY;
default -> RuntimeResolvedEventRole.PRIMARY;
};
return new RuntimeResolvedEvent(
descriptor.event(),
channel,
role,
null,
null,
descriptor.eventKey(),
descriptor.event() == null ? null : descriptor.event().externalSourceEventId(),
List.of(),
"No event-mixing rule changed this event."
);
}
private RuntimeEventMixingChannel defaultChannel(RuntimeEventDescriptor descriptor) {
if (descriptor == null) {
return RuntimeEventMixingChannel.AUDIT;
}
if (descriptor.driverActivityPoint()) {
return RuntimeEventMixingChannel.ACTIVITY_TIMELINE;
}
if (descriptor.driverCardUsagePoint()) {
return RuntimeEventMixingChannel.VEHICLE_USAGE;
}
return RuntimeEventMixingChannel.SUPPORT_EVIDENCE;
}
private Comparator<RuntimeEventDescriptor> descriptorComparator() {
return Comparator.comparing(RuntimeEventDescriptor::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(descriptor -> descriptor.eventDomain() == null ? "" : descriptor.eventDomain().name())
.thenComparing(descriptor -> descriptor.eventType() == null ? "" : descriptor.eventType().name())
.thenComparing(descriptor -> descriptor.lifecycle() == null ? "" : descriptor.lifecycle().name())
.thenComparing(RuntimeEventDescriptor::extractionCode, Comparator.nullsLast(String::compareTo))
.thenComparing(descriptor -> descriptor.event() == null ? null : descriptor.event().externalSourceEventId(), Comparator.nullsLast(String::compareTo));
}
private static EventHubEventDto enrichPrimaryVehicleRef(EventHubEventDto primary, List<EventHubEventDto> secondaries) {
if (primary == null || secondaries == null || secondaries.isEmpty()) {
return primary;
@ -329,215 +311,10 @@ public class RuntimeEventMixingService {
);
}
private static Map.Entry<String, String> cardVuSupportPair(List<EventHubEventDto> group) {
if (group == null || group.isEmpty()) {
return null;
}
Set<String> extractionCodes = group.stream()
.map(RuntimeEventMixingService::extractionCode)
.filter(Objects::nonNull)
.collect(java.util.stream.Collectors.toCollection(LinkedHashSet::new));
if (extractionCodes.contains("CARD_POSITION") && extractionCodes.contains("VU_POSITION")) {
return Map.entry("CARD_POSITION", "VU_POSITION");
}
if (extractionCodes.contains("CARD_PLACE") && extractionCodes.contains("VU_PLACE")) {
return Map.entry("CARD_PLACE", "VU_PLACE");
}
if (extractionCodes.contains("CARD_BORDER_CROSSING") && extractionCodes.contains("VU_BORDER_CROSSING")) {
return Map.entry("CARD_BORDER_CROSSING", "VU_BORDER_CROSSING");
}
return null;
}
private static boolean isTachographCardOrVuActivity(EventHubEventDto event) {
RuntimeEventSourceProfile profile = sourceProfile(event);
return isTachographRuntimeSource(profile)
&& (Objects.equals("CARD_ACTIVITY", profile.extractionCode())
|| Objects.equals("VU_ACTIVITY", profile.extractionCode()));
}
private static boolean isTachographCardOrVuSupportEvidence(EventHubEventDto event) {
if (event == null || isDriverActivityPoint(event) || isDriverCardUsagePoint(event)) {
return false;
}
RuntimeEventSourceProfile profile = sourceProfile(event);
if (!isTachographRuntimeSource(profile)) {
return false;
}
return switch (nullToEmpty(profile.extractionCode())) {
case "CARD_POSITION", "VU_POSITION", "CARD_PLACE", "VU_PLACE", "CARD_BORDER_CROSSING", "VU_BORDER_CROSSING" -> true;
default -> false;
};
}
private static boolean isDriverActivityPoint(EventHubEventDto event) {
return event != null
&& event.eventDomain() == EventDomain.DRIVER_ACTIVITY
&& (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.END)
&& event.occurredAt() != null;
}
private static boolean isDriverCardUsagePoint(EventHubEventDto event) {
return event != null
&& event.eventDomain() == EventDomain.DRIVER_CARD
&& (event.lifecycle() == EventLifecycle.INSERT || event.lifecycle() == EventLifecycle.WITHDRAW)
&& event.occurredAt() != null;
}
private static RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) {
JsonNode raw = rawPayload(event);
String sourceKind = firstNonBlank(text(raw, "sourceKind"), sourceKind(event));
String extractionCode = firstNonBlank(
text(raw, "extractionCode"),
fileSessionExtractionCode(event, sourceKind),
extractionCodeFromExternalSourceEventId(event)
);
String sourceSystem = firstNonBlank(
text(raw, "sourceSystem"),
sourceProvider(event),
sourceSystemFromExternalSourceEventId(event)
);
if (sourceSystem == null && (extractionCode != null || isTachographFileSessionEvent(event))) {
sourceSystem = "TACHOGRAPH";
}
return new RuntimeEventSourceProfile(
normalizeUpper(sourceSystem),
normalizeUpper(sourceKind),
normalizeUpper(extractionCode)
);
}
private static String extractionCode(EventHubEventDto event) {
return sourceProfile(event).extractionCode();
}
private static boolean isTachographRuntimeSource(RuntimeEventSourceProfile profile) {
if (profile == null) {
return false;
}
return switch (nullToEmpty(profile.sourceSystem())) {
case "TACHOGRAPH", "TACHOGRAPH_FILE_SESSION", "COMPOSITE_TACHOGRAPH_FILE_SESSION" -> true;
default -> false;
};
}
private static String fileSessionExtractionCode(EventHubEventDto event, String sourceKind) {
if (!isTachographFileSessionEvent(event)) {
return null;
}
String normalizedSourceKind = normalizeUpper(sourceKind);
if (normalizedSourceKind == null) {
return null;
}
if (event != null && event.eventDomain() == EventDomain.DRIVER_ACTIVITY) {
return switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD_ACTIVITY";
case "VEHICLE_UNIT" -> "VU_ACTIVITY";
default -> null;
};
}
if (event != null && event.eventDomain() == EventDomain.DRIVER_CARD) {
return switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD_VEHICLES_USED";
case "VEHICLE_UNIT" -> "IW_CYCLE";
default -> null;
};
}
if (event == null || event.eventDomain() == null) {
return null;
}
String prefix = switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD";
case "VEHICLE_UNIT" -> "VU";
default -> null;
};
if (prefix == null) {
return null;
}
return switch (event.eventDomain()) {
case POSITION -> prefix + "_POSITION";
case PLACE -> prefix + "_PLACE";
case BORDER_CROSSING -> prefix + "_BORDER_CROSSING";
case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null;
default -> null;
};
}
private static boolean isTachographFileSessionEvent(EventHubEventDto event) {
if (event == null) {
return false;
}
String packageKind = event.sourcePackageRef() == null ? null : normalizeUpper(event.sourcePackageRef().packageKind());
if (Objects.equals("TACHOGRAPH_FILE_SESSION", packageKind)
|| Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", packageKind)) {
return true;
}
String externalId = event.externalSourceEventId();
return externalId != null
&& (externalId.startsWith("TACHOGRAPH_FILE_SESSION:")
|| externalId.startsWith("COMPOSITE_TACHOGRAPH_FILE_SESSION:"));
}
private static String compatibleActivityKey(EventHubEventDto event) {
JsonNode raw = rawPayload(event);
return String.join("|",
"ACTIVITY_COMPATIBLE",
nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
nullToEmpty(event.eventDomain() == null ? null : event.eventDomain().name()),
nullToEmpty(event.eventType() == null ? null : event.eventType().name()),
nullToEmpty(event.lifecycle() == null ? null : event.lifecycle().name()),
normalizeTime(event.occurredAt()),
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
nullToEmpty(firstNonBlank(text(raw, "startedAt"), text(raw, "intervalStartedAt"))),
nullToEmpty(firstNonBlank(text(raw, "endedAt"), text(raw, "intervalEndedAt"))),
nullToEmpty(firstNonBlank(text(raw, "slot"), text(raw, "cardSlot"))),
nullToEmpty(firstNonBlank(text(raw, "cardStatus"))),
nullToEmpty(firstNonBlank(text(raw, "drivingStatus")))
);
}
private static String compatibleSupportEvidenceKey(EventHubEventDto event) {
JsonNode raw = rawPayload(event);
GeoPointDto position = event == null ? null : event.position();
return String.join("|",
"SUPPORT_COMPATIBLE",
nullToEmpty(event == null || event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()),
nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()),
nullToEmpty(event == null || event.lifecycle() == null ? null : event.lifecycle().name()),
normalizeTime(event == null ? null : event.occurredAt()),
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
nullToEmpty(firstNonBlank(text(raw, "latitude"), position == null || position.latitude() == null ? null : position.latitude().toPlainString())),
nullToEmpty(firstNonBlank(text(raw, "longitude"), position == null || position.longitude() == null ? null : position.longitude().toPlainString())),
nullToEmpty(firstNonBlank(text(raw, "odometerM"), event == null || event.odometerM() == null ? null : String.valueOf(event.odometerM()))),
nullToEmpty(firstNonBlank(text(raw, "country"), detailText(event, "country"))),
nullToEmpty(firstNonBlank(text(raw, "region"), detailText(event, "region"))),
nullToEmpty(firstNonBlank(text(raw, "countryFrom"), detailText(event, "countryFrom"))),
nullToEmpty(firstNonBlank(text(raw, "countryTo"), detailText(event, "countryTo"))),
nullToEmpty(firstNonBlank(text(raw, "operation"), detailText(event, "operation")))
);
}
private static Comparator<EventHubEventDto> eventComparator() {
return Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name())
.thenComparing(event -> event.eventType() == null ? "" : event.eventType().name())
.thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name())
.thenComparing(event -> sourceProfile(event).extractionCode(), Comparator.nullsLast(String::compareTo))
.thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo));
}
private static List<EventHubEventDto> sort(List<EventHubEventDto> events) {
return (events == null ? List.<EventHubEventDto>of() : events).stream()
.filter(Objects::nonNull)
.sorted(eventComparator())
.toList();
}
private static String normalizeMode(String requestedMode) {
String value = normalizeUpper(requestedMode);
String value = requestedMode == null || requestedMode.isBlank()
? null
: requestedMode.trim().toUpperCase(java.util.Locale.ROOT);
if (value == null) {
return MODE_TACHOGRAPH_SAME_SOURCE;
}
@ -547,62 +324,8 @@ public class RuntimeEventMixingService {
};
}
private static JsonNode rawPayload(EventHubEventDto event) {
return RuntimeEntityReferenceResolver.rawPayload(event);
}
private static String sourceKind(EventHubEventDto event) {
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().sourceKind();
}
private static String sourceProvider(EventHubEventDto event) {
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().providerKey();
}
private static String sourceSystemFromExternalSourceEventId(EventHubEventDto event) {
String externalId = event == null ? null : event.externalSourceEventId();
if (externalId == null || externalId.isBlank()) {
return null;
}
String[] parts = externalId.split(":");
return parts.length >= 1 ? parts[0] : null;
}
private static String extractionCodeFromExternalSourceEventId(EventHubEventDto event) {
String externalId = event == null ? null : event.externalSourceEventId();
if (externalId == null || externalId.isBlank()) {
return null;
}
String[] parts = externalId.split(":");
return parts.length >= 2 ? parts[1] : null;
}
private static String detailText(EventHubEventDto event, String field) {
if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null || field == null) {
return null;
}
JsonNode value = event.eventDetails().attributes().get(field);
if (value == null || value.isNull()) {
return null;
}
String text = value.asText(null);
return text == null || text.isBlank() ? null : text.trim();
}
private static String text(JsonNode node, String field) {
if (node == null || field == null) {
return null;
}
JsonNode value = node.get(field);
if (value == null || value.isNull()) {
return null;
}
String text = value.asText(null);
return text == null || text.isBlank() ? null : text.trim();
private static boolean notBlank(String value) {
return value != null && !value.isBlank();
}
private static String firstNonBlank(String... values) {
@ -617,56 +340,120 @@ public class RuntimeEventMixingService {
return null;
}
private static String normalizeUpper(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT);
}
private static String nullToEmpty(Object value) {
return value == null ? "" : String.valueOf(value);
}
private static String normalizeTime(OffsetDateTime value) {
return value == null ? "" : value.toInstant().toString();
}
private static boolean notBlank(String value) {
return value != null && !value.isBlank();
}
private static final class MixingState {
private final List<EventHubEventDto> rawEvents;
private final List<RuntimeEventDescriptor> descriptors;
private final Map<String, RuntimeEventDescriptor> descriptorsByEventId = new LinkedHashMap<>();
private final Set<String> suppressedEventIds = new LinkedHashSet<>();
private final Map<String, EventHubEventDto> replacementsByEventId = new LinkedHashMap<>();
private final Map<String, RuntimeResolvedEvent> resolvedEventsByEventId = new LinkedHashMap<>();
private final List<EventHubEventDto> suppressedEvents = new ArrayList<>();
private final List<RuntimeEventMixingDecisionDto> decisions = new ArrayList<>();
private final List<String> warnings = new ArrayList<>();
private MixingState(List<EventHubEventDto> rawEvents) {
this.rawEvents = rawEvents;
private MixingState(List<RuntimeEventDescriptor> descriptors) {
this.descriptors = descriptors == null ? List.of() : List.copyOf(descriptors);
for (RuntimeEventDescriptor descriptor : this.descriptors) {
descriptorsByEventId.put(descriptor.eventIdentityKey(), descriptor);
}
}
private List<EventHubEventDto> rawEvents() {
return rawEvents;
private List<RuntimeEventDescriptor> descriptors() {
return descriptors;
}
private boolean isSuppressed(RuntimeEventDescriptor descriptor) {
return descriptor != null && suppressedEventIds.contains(descriptor.eventIdentityKey());
}
private boolean isSuppressed(EventHubEventDto event) {
return suppressedEventIds.contains(eventId(event));
RuntimeEventDescriptor descriptor = descriptor(event);
return descriptor != null && isSuppressed(descriptor);
}
private EventHubEventDto effectiveEvent(EventHubEventDto event) {
return replacementsByEventId.getOrDefault(eventId(event), event);
RuntimeEventDescriptor descriptor = descriptor(event);
if (descriptor == null) {
return event;
}
return replacementsByEventId.getOrDefault(descriptor.eventIdentityKey(), event);
}
private void replace(EventHubEventDto original, EventHubEventDto replacement) {
private RuntimeEventDescriptor descriptor(EventHubEventDto event) {
if (event == null) {
return null;
}
String externalId = event.externalSourceEventId();
if (externalId != null && descriptorsByEventId.containsKey(externalId)) {
return descriptorsByEventId.get(externalId);
}
return descriptors.stream()
.filter(descriptor -> descriptor.event() == event || Objects.equals(descriptor.event(), event))
.findFirst()
.orElse(null);
}
private void replace(RuntimeEventDescriptor original, EventHubEventDto replacement) {
if (original != null && replacement != null) {
replacementsByEventId.put(eventId(original), replacement);
replacementsByEventId.put(original.eventIdentityKey(), replacement);
}
}
private void suppress(EventHubEventDto event) {
if (suppressedEventIds.add(eventId(event))) {
suppressedEvents.add(event);
private void markPrimary(
RuntimeEventDescriptor primary,
RuntimeEventMixingRule rule,
String eventKey,
List<RuntimeEventDescriptor> secondaries
) {
if (primary == null) {
return;
}
EventHubEventDto effectivePrimary = replacementsByEventId.getOrDefault(primary.eventIdentityKey(), primary.event());
resolvedEventsByEventId.put(primary.eventIdentityKey(), new RuntimeResolvedEvent(
effectivePrimary,
rule.channel(),
rule.primaryRole(),
rule.ruleId(),
rule.equivalenceType(),
eventKey,
primary.event().externalSourceEventId(),
secondaries.stream().map(descriptor -> descriptor.event().externalSourceEventId()).toList(),
rule.reason()
));
}
private void suppress(
RuntimeEventDescriptor descriptor,
RuntimeEventMixingRule rule,
RuntimeEventDescriptor primary,
String eventKey
) {
if (descriptor == null || descriptor.event() == null) {
return;
}
if (suppressedEventIds.add(descriptor.eventIdentityKey())) {
suppressedEvents.add(descriptor.event());
resolvedEventsByEventId.put(descriptor.eventIdentityKey(), new RuntimeResolvedEvent(
descriptor.event(),
rule.channel(),
rule.secondaryRole(),
rule.ruleId(),
rule.equivalenceType(),
eventKey,
primary == null || primary.event() == null ? null : primary.event().externalSourceEventId(),
primary == null || primary.event() == null
? List.of()
: List.of(primary.event().externalSourceEventId()),
rule.reason()
));
}
}
private RuntimeResolvedEvent resolvedEvent(EventHubEventDto event) {
RuntimeEventDescriptor descriptor = descriptor(event);
if (descriptor == null) {
return null;
}
return resolvedEventsByEventId.get(descriptor.eventIdentityKey());
}
private List<EventHubEventDto> suppressedEvents() {
@ -680,12 +467,5 @@ public class RuntimeEventMixingService {
private List<String> warnings() {
return warnings;
}
private String eventId(EventHubEventDto event) {
if (event == null) {
return "<null>";
}
return firstNonBlank(event.externalSourceEventId(), event.eventId() == null ? null : event.eventId().toString(), RuntimeEventIdentityResolver.canonicalEventKey(event));
}
}
}

View File

@ -5,4 +5,10 @@ public record RuntimeEventSourceProfile(
String sourceKind,
String extractionCode
) {
public boolean isTachographRuntimeSource() {
return switch (sourceSystem == null ? "" : sourceSystem) {
case "TACHOGRAPH", "TACHOGRAPH_FILE_SESSION", "COMPOSITE_TACHOGRAPH_FILE_SESSION" -> true;
default -> false;
};
}
}

View File

@ -10,6 +10,7 @@ public record RuntimeMixedEventBundle(
List<EventHubEventDto> vehicleUsageEvents,
List<EventHubEventDto> supportEvidenceEvents,
List<EventHubEventDto> suppressedEvents,
List<RuntimeResolvedEvent> resolvedEvents,
List<RuntimeEventMixingDecisionDto> eventMixingDecisions,
List<String> notes,
List<String> warnings
@ -21,6 +22,7 @@ public record RuntimeMixedEventBundle(
vehicleUsageEvents = vehicleUsageEvents == null ? List.of() : List.copyOf(vehicleUsageEvents);
supportEvidenceEvents = supportEvidenceEvents == null ? List.of() : List.copyOf(supportEvidenceEvents);
suppressedEvents = suppressedEvents == null ? List.of() : List.copyOf(suppressedEvents);
resolvedEvents = resolvedEvents == null ? List.of() : List.copyOf(resolvedEvents);
eventMixingDecisions = eventMixingDecisions == null ? List.of() : List.copyOf(eventMixingDecisions);
notes = notes == null ? List.of() : List.copyOf(notes);
warnings = warnings == null ? List.of() : List.copyOf(warnings);

View File

@ -39,9 +39,9 @@ public class EventEvidenceMixingModule implements RuntimeProcessingModule {
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Event evidence mixing",
"Applies source-aware runtime evidence rules before intervalization. The initial rule collapses duplicate tachograph CARD_ACTIVITY/VU_ACTIVITY events by normalized eventKey while keeping CARD_VEHICLES_USED unchanged for vehicle-usage processing.",
"Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged for vehicle-usage processing.",
"JAVA",
Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto")
Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto")
);
}
@ -59,6 +59,7 @@ public class EventEvidenceMixingModule implements RuntimeProcessingModule {
metadata.put("vehicleUsageEventCount", mixed.vehicleUsageEvents().size());
metadata.put("supportEvidenceEventCount", mixed.supportEvidenceEvents().size());
metadata.put("suppressedEventCount", mixed.suppressedEvents().size());
metadata.put("resolvedEventCount", mixed.resolvedEvents().size());
metadata.put("eventMixingDecisionCount", mixed.eventMixingDecisions().size());
metadata.put("eventMixingMode", eventMixingMode(context));
return new RuntimeProcessingModuleResult(

View File

@ -117,9 +117,9 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
descriptors.add(new RuntimeProcessingModuleDescriptorDto(
DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING,
"Event evidence mixing",
"Applies source-aware runtime evidence rules before intervalization. Initially collapses same-event-key CARD_ACTIVITY/VU_ACTIVITY duplicates and leaves CARD_VEHICLES_USED unchanged.",
"Applies source-aware runtime evidence rules before intervalization. The rule registry currently collapses duplicate tachograph card/VU activity, position, place, and border evidence while keeping CARD_VEHICLES_USED/IW_CYCLE unchanged.",
"JAVA",
Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto")
Set.of("RuntimeMixedEventBundle", "RuntimeResolvedEvent", "RuntimeEventMixingDecisionDto")
));
}
descriptors.addAll(List.of(