Add runtime event mixing project changes

This commit is contained in:
trifonovt 2026-06-11 13:11:17 +02:00
parent 3cd2533d5b
commit b4289abea5
13 changed files with 1399 additions and 18 deletions

View File

@ -1,17 +1,33 @@
# EventHub fix-list patch
# Patch: Tachograph file-session support for runtime event mixing
This patch implements the requested remaining items from the fix list, excluding build verification and SQL Server 2008 SQL rewriting.
This patch extends the existing `event-evidence-mixing` implementation so it also supports events produced by uploaded tachograph file sessions.
## Apply notes
## What changed
1. Copy the files from this archive into the project root.
2. Delete the old migration files listed in `DELETE_FILES.txt`.
3. Run the test suite locally with Maven/Java 21.
- `RuntimeEventMixingService` now treats `TACHOGRAPH_FILE_SESSION` and `COMPOSITE_TACHOGRAPH_FILE_SESSION` events as tachograph runtime evidence.
- File-session events do not always have a DB-style `extractionCode` such as `CARD_ACTIVITY` or `VU_ACTIVITY` in the raw payload.
- The mixing service now derives the equivalent extraction code from:
- source kind: `DRIVER_CARD` or `VEHICLE_UNIT`
- event domain: `DRIVER_ACTIVITY`, `POSITION`, `PLACE`, `BORDER_CROSSING`, `DRIVER_CARD`, etc.
- file-session external id / source-package kind.
## Main changes
## Supported derived mappings
- Renumbered Flyway migrations to remove duplicate `V9` and `V10` versions.
- Removed the duplicated Timescale/Event source record migration.
- Switched local Docker Compose DB from plain PostgreSQL to a TimescaleDB/PostGIS-capable image.
- Added normalized raw tachograph payload metadata for DB-extracted EventHub events.
- Added tests for Flyway version uniqueness and tachograph DB mapper → timeline reconstruction metadata.
| File-session event | Driver-card source | Vehicle-unit source |
|---|---|---|
| `DRIVER_ACTIVITY` | `CARD_ACTIVITY` | `VU_ACTIVITY` |
| `POSITION` | `CARD_POSITION` | `VU_POSITION` |
| `PLACE` | `CARD_PLACE` | `VU_PLACE` |
| `BORDER_CROSSING` | `CARD_BORDER_CROSSING` | `VU_BORDER_CROSSING` |
| `DRIVER_CARD` insert/withdraw | `CARD_VEHICLES_USED` | `IW_CYCLE` |
## Important behavior
- Duplicate file-session `CARD_ACTIVITY` / `VU_ACTIVITY` events are mixed the same way as persistent tachograph DB events.
- Duplicate file-session position/place/border events are also mixed the same way as persistent tachograph DB support evidence.
- `CARD_VEHICLES_USED` and `IW_CYCLE` are still not mixed; they remain accepted for separate vehicle-usage processing.
## Modified files
- `src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java`
- `src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingServiceTest.java`

View File

@ -0,0 +1,30 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import java.time.OffsetDateTime;
import java.util.List;
public record RuntimeEventMixingDecisionDto(
String ruleId,
String equivalenceType,
String eventKey,
String decision,
String channel,
String primaryExternalSourceEventId,
String primaryExtractionCode,
List<String> secondaryExternalSourceEventIds,
List<String> secondaryExtractionCodes,
OffsetDateTime occurredAt,
String eventDomain,
String eventType,
String lifecycle,
String reason
) {
public RuntimeEventMixingDecisionDto {
secondaryExternalSourceEventIds = secondaryExternalSourceEventIds == null
? List.of()
: List.copyOf(secondaryExternalSourceEventIds);
secondaryExtractionCodes = secondaryExtractionCodes == null
? List.of()
: List.copyOf(secondaryExtractionCodes);
}
}

View File

@ -0,0 +1,691 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.GeoPointDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.springframework.stereotype.Service;
@Service
public class RuntimeEventMixingService {
public static final String MODE_OFF = "OFF";
public static final String MODE_TACHOGRAPH_SAME_SOURCE = "TACHOGRAPH_SAME_SOURCE";
public static final String MODE_FULL = "FULL";
public static final String RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY =
"tachograph.activity.card-vu.same-event-key";
public static final String RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY =
"tachograph.activity.card-vu.compatible-activity-key";
public static final String RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY =
"tachograph.support.card-vu.same-event-key";
public static final String RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY =
"tachograph.support.card-vu.compatible-support-key";
public RuntimeMixedEventBundle mix(List<EventHubEventDto> events, String requestedMode) {
List<EventHubEventDto> rawEvents = sort(events);
String mode = normalizeMode(requestedMode);
if (MODE_OFF.equals(mode)) {
return unchanged(rawEvents, "Runtime event mixing is disabled by eventMixingMode=OFF.");
}
MixingState state = new MixingState(rawEvents);
applyTachographCardVuActivityMixing(state);
applyTachographCardVuSupportEvidenceMixing(state);
List<EventHubEventDto> driverPartitionEvents = rawEvents.stream()
.filter(event -> !state.isSuppressed(event))
.map(state::effectiveEvent)
.toList();
List<EventHubEventDto> activityTimelineEvents = driverPartitionEvents.stream()
.filter(RuntimeEventMixingService::isDriverActivityPoint)
.toList();
// Vehicle-usage events are intentionally not mixed here. CARD_VEHICLES_USED and IW_CYCLE
// are kept as separate input evidence because they must be processed by their own rules later.
List<EventHubEventDto> vehicleUsageEvents = rawEvents.stream()
.filter(RuntimeEventMixingService::isDriverCardUsagePoint)
.toList();
List<EventHubEventDto> supportEvidenceEvents = driverPartitionEvents.stream()
.filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event))
.toList();
List<String> notes = new ArrayList<>();
notes.add("Runtime event mixing inspected " + rawEvents.size() + " event(s).");
notes.add("Runtime event mixing suppressed " + state.suppressedEvents().size()
+ " duplicate source event(s) from activity/support evidence channels.");
notes.add("Runtime event mixing keeps CARD_POSITION, CARD_PLACE, and CARD_BORDER_CROSSING as primary when matching VU support evidence describes the same semantic event.");
notes.add("Runtime event mixing kept all CARD_VEHICLES_USED and IW_CYCLE card-usage point events unchanged for vehicle-usage processing.");
return new RuntimeMixedEventBundle(
rawEvents,
driverPartitionEvents,
activityTimelineEvents,
vehicleUsageEvents,
supportEvidenceEvents,
state.suppressedEvents(),
state.decisions(),
notes,
state.warnings()
);
}
private RuntimeMixedEventBundle unchanged(List<EventHubEventDto> rawEvents, String note) {
return new RuntimeMixedEventBundle(
rawEvents,
rawEvents,
rawEvents.stream().filter(RuntimeEventMixingService::isDriverActivityPoint).toList(),
rawEvents.stream().filter(RuntimeEventMixingService::isDriverCardUsagePoint).toList(),
rawEvents.stream().filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event)).toList(),
List.of(),
List.of(),
List.of(note),
List.of()
);
}
private void applyTachographCardVuActivityMixing(MixingState state) {
LinkedHashMap<String, List<EventHubEventDto>> exactGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (!isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) {
continue;
}
exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : exactGroups.entrySet()) {
fuseCardAndVuDuplicates(
state,
entry.getKey(),
"EXACT_EVENT_KEY",
RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY,
"ACTIVITY_TIMELINE",
entry.getValue(),
"CARD_ACTIVITY",
"VU_ACTIVITY",
"CARD_ACTIVITY and VU_ACTIVITY describe the same driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization."
);
}
LinkedHashMap<String, List<EventHubEventDto>> compatibleGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (state.isSuppressed(event) || !isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) {
continue;
}
compatibleGroups.computeIfAbsent(compatibleActivityKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : compatibleGroups.entrySet()) {
fuseCardAndVuDuplicates(
state,
entry.getKey(),
"COMPATIBLE_ACTIVITY_KEY",
RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY,
"ACTIVITY_TIMELINE",
entry.getValue(),
"CARD_ACTIVITY",
"VU_ACTIVITY",
"CARD_ACTIVITY and VU_ACTIVITY describe a compatible driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization."
);
}
}
private void applyTachographCardVuSupportEvidenceMixing(MixingState state) {
LinkedHashMap<String, List<EventHubEventDto>> exactGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) {
continue;
}
exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : exactGroups.entrySet()) {
fuseCardAndVuSupportDuplicates(
state,
entry.getKey(),
"EXACT_EVENT_KEY",
RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY,
entry.getValue()
);
}
LinkedHashMap<String, List<EventHubEventDto>> compatibleGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) {
continue;
}
compatibleGroups.computeIfAbsent(compatibleSupportEvidenceKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : compatibleGroups.entrySet()) {
fuseCardAndVuSupportDuplicates(
state,
entry.getKey(),
"COMPATIBLE_SUPPORT_KEY",
RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY,
entry.getValue()
);
}
}
private void fuseCardAndVuSupportDuplicates(
MixingState state,
String eventKey,
String equivalenceType,
String ruleId,
List<EventHubEventDto> group
) {
Map.Entry<String, String> pair = cardVuSupportPair(group);
if (pair == null) {
return;
}
fuseCardAndVuDuplicates(
state,
eventKey,
equivalenceType,
ruleId,
"SUPPORT_EVIDENCE",
group,
pair.getKey(),
pair.getValue(),
pair.getKey() + " and " + pair.getValue() + " describe the same support evidence event. "
+ pair.getKey() + " is kept as primary support evidence; " + pair.getValue()
+ " is suppressed from support-evidence normalization. Vehicle/VIN identity from the VU event is copied to the primary event when the card event has weaker vehicle identity."
);
}
private void fuseCardAndVuDuplicates(
MixingState state,
String eventKey,
String equivalenceType,
String ruleId,
String channel,
List<EventHubEventDto> group,
String primaryExtractionCode,
String secondaryExtractionCode,
String reason
) {
List<EventHubEventDto> primaries = group.stream()
.filter(event -> Objects.equals(primaryExtractionCode, extractionCode(event)))
.sorted(eventComparator())
.toList();
List<EventHubEventDto> secondaries = group.stream()
.filter(event -> Objects.equals(secondaryExtractionCode, extractionCode(event)))
.sorted(eventComparator())
.toList();
if (primaries.isEmpty() || secondaries.isEmpty()) {
return;
}
EventHubEventDto primary = primaries.get(0);
List<EventHubEventDto> newlySuppressed = secondaries.stream()
.filter(event -> !state.isSuppressed(event))
.toList();
if (newlySuppressed.isEmpty()) {
return;
}
EventHubEventDto enrichedPrimary = enrichPrimaryVehicleRef(primary, newlySuppressed);
if (enrichedPrimary != primary) {
state.replace(primary, enrichedPrimary);
}
newlySuppressed.forEach(state::suppress);
state.decisions().add(new RuntimeEventMixingDecisionDto(
ruleId,
equivalenceType,
eventKey,
"FUSED_PRIMARY_SELECTED",
channel,
primary.externalSourceEventId(),
extractionCode(primary),
newlySuppressed.stream().map(EventHubEventDto::externalSourceEventId).toList(),
newlySuppressed.stream().map(RuntimeEventMixingService::extractionCode).toList(),
primary.occurredAt(),
primary.eventDomain() == null ? null : primary.eventDomain().name(),
primary.eventType() == null ? null : primary.eventType().name(),
primary.lifecycle() == null ? null : primary.lifecycle().name(),
reason
));
}
private static EventHubEventDto enrichPrimaryVehicleRef(EventHubEventDto primary, List<EventHubEventDto> secondaries) {
if (primary == null || secondaries == null || secondaries.isEmpty()) {
return primary;
}
VehicleRefDto bestSecondary = secondaries.stream()
.map(EventHubEventDto::vehicleRef)
.filter(Objects::nonNull)
.filter(VehicleRefDto::hasAnyReference)
.filter(RuntimeEventMixingService::hasVehicleIdentity)
.findFirst()
.orElse(null);
if (bestSecondary == null || !shouldEnrichVehicleRef(primary.vehicleRef(), bestSecondary)) {
return primary;
}
VehicleRefDto merged = mergeVehicleRef(primary.vehicleRef(), bestSecondary);
if (Objects.equals(primary.vehicleRef(), merged)) {
return primary;
}
return new EventHubEventDto(
primary.eventId(),
primary.externalSourceEventId(),
primary.driverRef(),
merged,
primary.occurredAt(),
primary.receivedPartnerAt(),
primary.receivedHubAt(),
primary.eventDomain(),
primary.eventType(),
primary.lifecycle(),
primary.odometerM(),
primary.position(),
primary.eventDetails(),
primary.sourcePackageRef(),
primary.payload(),
primary.manualEntry(),
primary.packageInfo()
);
}
private static boolean shouldEnrichVehicleRef(VehicleRefDto primary, VehicleRefDto secondary) {
return secondary != null
&& hasVehicleIdentity(secondary)
&& (primary == null || !hasVehicleIdentity(primary));
}
private static boolean hasVehicleIdentity(VehicleRefDto vehicleRef) {
return vehicleRef != null
&& (notBlank(vehicleRef.sourceVehicleEntityId()) || notBlank(vehicleRef.vin()));
}
private static VehicleRefDto mergeVehicleRef(VehicleRefDto primary, VehicleRefDto secondary) {
if (primary == null) {
return secondary;
}
if (secondary == null) {
return primary;
}
VehicleRegistrationRefDto registration = primary.vehicleRegistration() != null && primary.vehicleRegistration().hasValue()
? primary.vehicleRegistration()
: secondary.vehicleRegistration();
return new VehicleRefDto(
firstNonBlank(primary.sourceVehicleEntityId(), secondary.sourceVehicleEntityId()),
firstNonBlank(primary.vin(), secondary.vin()),
firstNonBlank(primary.sourceRegistrationEntityId(), secondary.sourceRegistrationEntityId()),
registration
);
}
private static Map.Entry<String, String> cardVuSupportPair(List<EventHubEventDto> group) {
if (group == null || group.isEmpty()) {
return null;
}
Set<String> extractionCodes = group.stream()
.map(RuntimeEventMixingService::extractionCode)
.filter(Objects::nonNull)
.collect(java.util.stream.Collectors.toCollection(LinkedHashSet::new));
if (extractionCodes.contains("CARD_POSITION") && extractionCodes.contains("VU_POSITION")) {
return Map.entry("CARD_POSITION", "VU_POSITION");
}
if (extractionCodes.contains("CARD_PLACE") && extractionCodes.contains("VU_PLACE")) {
return Map.entry("CARD_PLACE", "VU_PLACE");
}
if (extractionCodes.contains("CARD_BORDER_CROSSING") && extractionCodes.contains("VU_BORDER_CROSSING")) {
return Map.entry("CARD_BORDER_CROSSING", "VU_BORDER_CROSSING");
}
return null;
}
private static boolean isTachographCardOrVuActivity(EventHubEventDto event) {
RuntimeEventSourceProfile profile = sourceProfile(event);
return isTachographRuntimeSource(profile)
&& (Objects.equals("CARD_ACTIVITY", profile.extractionCode())
|| Objects.equals("VU_ACTIVITY", profile.extractionCode()));
}
private static boolean isTachographCardOrVuSupportEvidence(EventHubEventDto event) {
if (event == null || isDriverActivityPoint(event) || isDriverCardUsagePoint(event)) {
return false;
}
RuntimeEventSourceProfile profile = sourceProfile(event);
if (!isTachographRuntimeSource(profile)) {
return false;
}
return switch (nullToEmpty(profile.extractionCode())) {
case "CARD_POSITION", "VU_POSITION", "CARD_PLACE", "VU_PLACE", "CARD_BORDER_CROSSING", "VU_BORDER_CROSSING" -> true;
default -> false;
};
}
private static boolean isDriverActivityPoint(EventHubEventDto event) {
return event != null
&& event.eventDomain() == EventDomain.DRIVER_ACTIVITY
&& (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.END)
&& event.occurredAt() != null;
}
private static boolean isDriverCardUsagePoint(EventHubEventDto event) {
return event != null
&& event.eventDomain() == EventDomain.DRIVER_CARD
&& (event.lifecycle() == EventLifecycle.INSERT || event.lifecycle() == EventLifecycle.WITHDRAW)
&& event.occurredAt() != null;
}
private static RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) {
JsonNode raw = rawPayload(event);
String sourceKind = firstNonBlank(text(raw, "sourceKind"), sourceKind(event));
String extractionCode = firstNonBlank(
text(raw, "extractionCode"),
fileSessionExtractionCode(event, sourceKind),
extractionCodeFromExternalSourceEventId(event)
);
String sourceSystem = firstNonBlank(
text(raw, "sourceSystem"),
sourceProvider(event),
sourceSystemFromExternalSourceEventId(event)
);
if (sourceSystem == null && (extractionCode != null || isTachographFileSessionEvent(event))) {
sourceSystem = "TACHOGRAPH";
}
return new RuntimeEventSourceProfile(
normalizeUpper(sourceSystem),
normalizeUpper(sourceKind),
normalizeUpper(extractionCode)
);
}
private static String extractionCode(EventHubEventDto event) {
return sourceProfile(event).extractionCode();
}
private static boolean isTachographRuntimeSource(RuntimeEventSourceProfile profile) {
if (profile == null) {
return false;
}
return switch (nullToEmpty(profile.sourceSystem())) {
case "TACHOGRAPH", "TACHOGRAPH_FILE_SESSION", "COMPOSITE_TACHOGRAPH_FILE_SESSION" -> true;
default -> false;
};
}
private static String fileSessionExtractionCode(EventHubEventDto event, String sourceKind) {
if (!isTachographFileSessionEvent(event)) {
return null;
}
String normalizedSourceKind = normalizeUpper(sourceKind);
if (normalizedSourceKind == null) {
return null;
}
if (event != null && event.eventDomain() == EventDomain.DRIVER_ACTIVITY) {
return switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD_ACTIVITY";
case "VEHICLE_UNIT" -> "VU_ACTIVITY";
default -> null;
};
}
if (event != null && event.eventDomain() == EventDomain.DRIVER_CARD) {
return switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD_VEHICLES_USED";
case "VEHICLE_UNIT" -> "IW_CYCLE";
default -> null;
};
}
if (event == null || event.eventDomain() == null) {
return null;
}
String prefix = switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD";
case "VEHICLE_UNIT" -> "VU";
default -> null;
};
if (prefix == null) {
return null;
}
return switch (event.eventDomain()) {
case POSITION -> prefix + "_POSITION";
case PLACE -> prefix + "_PLACE";
case BORDER_CROSSING -> prefix + "_BORDER_CROSSING";
case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null;
default -> null;
};
}
private static boolean isTachographFileSessionEvent(EventHubEventDto event) {
if (event == null) {
return false;
}
String packageKind = event.sourcePackageRef() == null ? null : normalizeUpper(event.sourcePackageRef().packageKind());
if (Objects.equals("TACHOGRAPH_FILE_SESSION", packageKind)
|| Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", packageKind)) {
return true;
}
String externalId = event.externalSourceEventId();
return externalId != null
&& (externalId.startsWith("TACHOGRAPH_FILE_SESSION:")
|| externalId.startsWith("COMPOSITE_TACHOGRAPH_FILE_SESSION:"));
}
private static String compatibleActivityKey(EventHubEventDto event) {
JsonNode raw = rawPayload(event);
return String.join("|",
"ACTIVITY_COMPATIBLE",
nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
nullToEmpty(event.eventDomain() == null ? null : event.eventDomain().name()),
nullToEmpty(event.eventType() == null ? null : event.eventType().name()),
nullToEmpty(event.lifecycle() == null ? null : event.lifecycle().name()),
normalizeTime(event.occurredAt()),
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
nullToEmpty(firstNonBlank(text(raw, "startedAt"), text(raw, "intervalStartedAt"))),
nullToEmpty(firstNonBlank(text(raw, "endedAt"), text(raw, "intervalEndedAt"))),
nullToEmpty(firstNonBlank(text(raw, "slot"), text(raw, "cardSlot"))),
nullToEmpty(firstNonBlank(text(raw, "cardStatus"))),
nullToEmpty(firstNonBlank(text(raw, "drivingStatus")))
);
}
private static String compatibleSupportEvidenceKey(EventHubEventDto event) {
JsonNode raw = rawPayload(event);
GeoPointDto position = event == null ? null : event.position();
return String.join("|",
"SUPPORT_COMPATIBLE",
nullToEmpty(event == null || event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()),
nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()),
nullToEmpty(event == null || event.lifecycle() == null ? null : event.lifecycle().name()),
normalizeTime(event == null ? null : event.occurredAt()),
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
nullToEmpty(firstNonBlank(text(raw, "latitude"), position == null || position.latitude() == null ? null : position.latitude().toPlainString())),
nullToEmpty(firstNonBlank(text(raw, "longitude"), position == null || position.longitude() == null ? null : position.longitude().toPlainString())),
nullToEmpty(firstNonBlank(text(raw, "odometerM"), event == null || event.odometerM() == null ? null : String.valueOf(event.odometerM()))),
nullToEmpty(firstNonBlank(text(raw, "country"), detailText(event, "country"))),
nullToEmpty(firstNonBlank(text(raw, "region"), detailText(event, "region"))),
nullToEmpty(firstNonBlank(text(raw, "countryFrom"), detailText(event, "countryFrom"))),
nullToEmpty(firstNonBlank(text(raw, "countryTo"), detailText(event, "countryTo"))),
nullToEmpty(firstNonBlank(text(raw, "operation"), detailText(event, "operation")))
);
}
private static Comparator<EventHubEventDto> eventComparator() {
return Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name())
.thenComparing(event -> event.eventType() == null ? "" : event.eventType().name())
.thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name())
.thenComparing(event -> sourceProfile(event).extractionCode(), Comparator.nullsLast(String::compareTo))
.thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo));
}
private static List<EventHubEventDto> sort(List<EventHubEventDto> events) {
return (events == null ? List.<EventHubEventDto>of() : events).stream()
.filter(Objects::nonNull)
.sorted(eventComparator())
.toList();
}
private static String normalizeMode(String requestedMode) {
String value = normalizeUpper(requestedMode);
if (value == null) {
return MODE_TACHOGRAPH_SAME_SOURCE;
}
return switch (value) {
case MODE_OFF, MODE_TACHOGRAPH_SAME_SOURCE, MODE_FULL -> value;
default -> MODE_TACHOGRAPH_SAME_SOURCE;
};
}
private static JsonNode rawPayload(EventHubEventDto event) {
return RuntimeEntityReferenceResolver.rawPayload(event);
}
private static String sourceKind(EventHubEventDto event) {
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().sourceKind();
}
private static String sourceProvider(EventHubEventDto event) {
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().providerKey();
}
private static String sourceSystemFromExternalSourceEventId(EventHubEventDto event) {
String externalId = event == null ? null : event.externalSourceEventId();
if (externalId == null || externalId.isBlank()) {
return null;
}
String[] parts = externalId.split(":");
return parts.length >= 1 ? parts[0] : null;
}
private static String extractionCodeFromExternalSourceEventId(EventHubEventDto event) {
String externalId = event == null ? null : event.externalSourceEventId();
if (externalId == null || externalId.isBlank()) {
return null;
}
String[] parts = externalId.split(":");
return parts.length >= 2 ? parts[1] : null;
}
private static String detailText(EventHubEventDto event, String field) {
if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null || field == null) {
return null;
}
JsonNode value = event.eventDetails().attributes().get(field);
if (value == null || value.isNull()) {
return null;
}
String text = value.asText(null);
return text == null || text.isBlank() ? null : text.trim();
}
private static String text(JsonNode node, String field) {
if (node == null || field == null) {
return null;
}
JsonNode value = node.get(field);
if (value == null || value.isNull()) {
return null;
}
String text = value.asText(null);
return text == null || text.isBlank() ? null : text.trim();
}
private static String firstNonBlank(String... values) {
if (values == null) {
return null;
}
for (String value : values) {
if (value != null && !value.isBlank()) {
return value.trim();
}
}
return null;
}
private static String normalizeUpper(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT);
}
private static String nullToEmpty(Object value) {
return value == null ? "" : String.valueOf(value);
}
private static String normalizeTime(OffsetDateTime value) {
return value == null ? "" : value.toInstant().toString();
}
private static boolean notBlank(String value) {
return value != null && !value.isBlank();
}
private static final class MixingState {
private final List<EventHubEventDto> rawEvents;
private final Set<String> suppressedEventIds = new LinkedHashSet<>();
private final Map<String, EventHubEventDto> replacementsByEventId = new LinkedHashMap<>();
private final List<EventHubEventDto> suppressedEvents = new ArrayList<>();
private final List<RuntimeEventMixingDecisionDto> decisions = new ArrayList<>();
private final List<String> warnings = new ArrayList<>();
private MixingState(List<EventHubEventDto> rawEvents) {
this.rawEvents = rawEvents;
}
private List<EventHubEventDto> rawEvents() {
return rawEvents;
}
private boolean isSuppressed(EventHubEventDto event) {
return suppressedEventIds.contains(eventId(event));
}
private EventHubEventDto effectiveEvent(EventHubEventDto event) {
return replacementsByEventId.getOrDefault(eventId(event), event);
}
private void replace(EventHubEventDto original, EventHubEventDto replacement) {
if (original != null && replacement != null) {
replacementsByEventId.put(eventId(original), replacement);
}
}
private void suppress(EventHubEventDto event) {
if (suppressedEventIds.add(eventId(event))) {
suppressedEvents.add(event);
}
}
private List<EventHubEventDto> suppressedEvents() {
return suppressedEvents;
}
private List<RuntimeEventMixingDecisionDto> decisions() {
return decisions;
}
private List<String> warnings() {
return warnings;
}
private String eventId(EventHubEventDto event) {
if (event == null) {
return "<null>";
}
return firstNonBlank(event.externalSourceEventId(), event.eventId() == null ? null : event.eventId().toString(), RuntimeEventIdentityResolver.canonicalEventKey(event));
}
}
}

View File

@ -0,0 +1,8 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
public record RuntimeEventSourceProfile(
String sourceSystem,
String sourceKind,
String extractionCode
) {
}

View File

@ -0,0 +1,28 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import at.procon.eventhub.dto.EventHubEventDto;
import java.util.List;
public record RuntimeMixedEventBundle(
List<EventHubEventDto> rawEvents,
List<EventHubEventDto> driverPartitionEvents,
List<EventHubEventDto> activityTimelineEvents,
List<EventHubEventDto> vehicleUsageEvents,
List<EventHubEventDto> supportEvidenceEvents,
List<EventHubEventDto> suppressedEvents,
List<RuntimeEventMixingDecisionDto> eventMixingDecisions,
List<String> notes,
List<String> warnings
) {
public RuntimeMixedEventBundle {
rawEvents = rawEvents == null ? List.of() : List.copyOf(rawEvents);
driverPartitionEvents = driverPartitionEvents == null ? List.of() : List.copyOf(driverPartitionEvents);
activityTimelineEvents = activityTimelineEvents == null ? List.of() : List.copyOf(activityTimelineEvents);
vehicleUsageEvents = vehicleUsageEvents == null ? List.of() : List.copyOf(vehicleUsageEvents);
supportEvidenceEvents = supportEvidenceEvents == null ? List.of() : List.copyOf(supportEvidenceEvents);
suppressedEvents = suppressedEvents == null ? List.of() : List.copyOf(suppressedEvents);
eventMixingDecisions = eventMixingDecisions == null ? List.of() : List.copyOf(eventMixingDecisions);
notes = notes == null ? List.of() : List.copyOf(notes);
warnings = warnings == null ? List.of() : List.copyOf(warnings);
}
}

View File

@ -52,7 +52,7 @@ public class DriverActivityIntervalsModule implements RuntimeEplModule {
@Override
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.sourceEvents(context);
List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.activityTimelineEvents(context);
List<Map<String, Object>> pointEvents = DriverWorkingTimeEplEventMapper.activityPointEvents(sourceEvents);
RuntimeEplModuleExecutionResult eplResult = eplModuleExecutor.execute(new RuntimeEplModuleDefinition(
moduleKey(),

View File

@ -52,7 +52,7 @@ public class DriverVehicleUsageIntervalsModule implements RuntimeEplModule {
@Override
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.sourceEvents(context);
List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.vehicleUsageEvents(context);
List<Map<String, Object>> pointEvents = DriverWorkingTimeEplEventMapper.vehicleUsagePointEvents(sourceEvents);
RuntimeEplModuleExecutionResult eplResult = eplModuleExecutor.execute(new RuntimeEplModuleDefinition(
moduleKey(),

View File

@ -3,6 +3,7 @@ package at.procon.eventhub.processing.eventprocessing.module;
public final class DriverWorkingTimeModuleKeys {
public static final String RUNTIME_EVENT_ASSEMBLY = "runtime-event-assembly";
public static final String EVENT_EVIDENCE_MIXING = "event-evidence-mixing";
public static final String EVENT_TO_ACTIVITY_INTERVALS = "event-to-activity-intervals";
public static final String EVENT_TO_VEHICLE_USAGE_INTERVALS = "event-to-vehicle-usage-intervals";
public static final String VEHICLE_USAGE_MERGE = "vehicle-usage-merge";

View File

@ -0,0 +1,103 @@
package at.procon.eventhub.processing.eventprocessing.module;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeEventMixingService;
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeMixedEventBundle;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class EventEvidenceMixingModule implements RuntimeProcessingModule {
public static final String EVENT_MIXING_MODE_PARAMETER = "eventMixingMode";
private final RuntimeEventMixingService eventMixingService;
@Autowired
public EventEvidenceMixingModule(RuntimeEventMixingService eventMixingService) {
this.eventMixingService = eventMixingService;
}
/** Compatibility constructor for tests/local registries that do not wire the mixing service. */
public EventEvidenceMixingModule() {
this.eventMixingService = null;
}
@Override
public String moduleKey() {
return DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING;
}
@Override
public RuntimeProcessingModuleDescriptorDto descriptor() {
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Event evidence mixing",
"Applies source-aware runtime evidence rules before intervalization. The initial rule collapses duplicate tachograph CARD_ACTIVITY/VU_ACTIVITY events by normalized eventKey while keeping CARD_VEHICLES_USED unchanged for vehicle-usage processing.",
"JAVA",
Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto")
);
}
@Override
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
if (eventMixingService == null) {
return delegatedPlaceholder();
}
List<EventHubEventDto> sourceEvents = sourceEvents(context);
RuntimeMixedEventBundle mixed = eventMixingService.mix(sourceEvents, eventMixingMode(context));
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("inputEventCount", mixed.rawEvents().size());
metadata.put("driverPartitionEventCount", mixed.driverPartitionEvents().size());
metadata.put("activityTimelineEventCount", mixed.activityTimelineEvents().size());
metadata.put("vehicleUsageEventCount", mixed.vehicleUsageEvents().size());
metadata.put("supportEvidenceEventCount", mixed.supportEvidenceEvents().size());
metadata.put("suppressedEventCount", mixed.suppressedEvents().size());
metadata.put("eventMixingDecisionCount", mixed.eventMixingDecisions().size());
metadata.put("eventMixingMode", eventMixingMode(context));
return new RuntimeProcessingModuleResult(
moduleKey(),
RuntimeProcessingModuleStatus.SUCCESS,
mixed,
metadata,
mixed.warnings()
);
}
private RuntimeProcessingModuleResult delegatedPlaceholder() {
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("executionModel", "delegated");
metadata.put("note", "Event evidence mixing is not wired in this local/legacy module registry.");
return new RuntimeProcessingModuleResult(
moduleKey(),
RuntimeProcessingModuleStatus.SUCCESS,
null,
metadata,
List.of()
);
}
private List<EventHubEventDto> sourceEvents(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult assemblyResult = context.previousResults().get(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY);
if (assemblyResult != null && assemblyResult.output() instanceof UnifiedRuntimeEventBundle bundle) {
return bundle.mergedEvents();
}
return context.events();
}
private String eventMixingMode(RuntimeProcessingModuleContext context) {
Object value = context.request() == null || context.request().parameters() == null
? null
: context.request().parameters().get(EVENT_MIXING_MODE_PARAMETER);
if (value == null) {
value = context.attributes().get(EVENT_MIXING_MODE_PARAMETER);
}
return value == null ? RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE : value.toString();
}
}

View File

@ -7,6 +7,7 @@ import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDr
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import at.procon.eventhub.processing.dto.RuntimeDriverPartitionDebugDto;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.eventprocessing.module.epl.DriverWorkingTimeEplEventMapper;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto;
import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
@ -66,6 +67,7 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
return delegatedPlaceholder();
}
UnifiedRuntimeEventBundle broadBundle = runtimeEventBundle(context);
List<EventHubEventDto> partitionSourceEvents = DriverWorkingTimeEplEventMapper.driverPartitionEvents(context);
UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context);
boolean includePartitionDebug = booleanAttribute(context, "includePartitionDebug", false);
List<DriverWorkingTimeVehicleUsageInterval> mergedVehicleUsageIntervals = mergedVehicleUsageIntervals(context);
@ -73,8 +75,8 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
LinkedHashMap<String, DriverWorkingTimeDriverPartition> partitions = new LinkedHashMap<>();
LinkedHashMap<String, List<String>> attachedVehicleEvidenceByEvent = new LinkedHashMap<>();
List<String> warnings = new ArrayList<>();
for (String driverKey : selectedDriverKeys(scopeRequest.toRuntimeRequest(), broadBundle.mergedEvents())) {
List<EventHubEventDto> directDriverEvents = broadBundle.mergedEvents().stream()
for (String driverKey : selectedDriverKeys(scopeRequest.toRuntimeRequest(), partitionSourceEvents)) {
List<EventHubEventDto> directDriverEvents = partitionSourceEvents.stream()
.filter(event -> Objects.equals(driverKey(event), driverKey))
.toList();
List<DriverWorkingTimeVehicleUsageInterval> driverVehicleUsageIntervals = mergedVehicleUsageIntervals.stream()
@ -83,7 +85,7 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
RuntimeDriverVehicleEvidenceAttachmentResult attachmentResult = vehicleEvidenceAttachmentService.attachVehicleEvidence(
driverKey,
directDriverEvents,
broadBundle.mergedEvents(),
partitionSourceEvents,
driverVehicleUsageIntervals,
booleanAttribute(
context,
@ -132,6 +134,8 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
metadata.put("attachedVehicleEvidenceEventCount", partitions.values().stream()
.mapToInt(partition -> partition.attachedVehicleEvidenceEvents().size())
.sum());
metadata.put("partitionSourceEventCount", partitionSourceEvents.size());
metadata.put("rawMergedEventCount", broadBundle.mergedEvents().size());
return new RuntimeProcessingModuleResult(
moduleKey(),
RuntimeProcessingModuleStatus.SUCCESS,

View File

@ -7,6 +7,7 @@ import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeModuleKeys;
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleContext;
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleResult;
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeMixedEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
@ -38,6 +39,30 @@ public final class DriverWorkingTimeEplEventMapper {
return safeList(context.events());
}
public static List<EventHubEventDto> activityTimelineEvents(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING);
if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) {
return safeList(mixed.activityTimelineEvents());
}
return sourceEvents(context);
}
public static List<EventHubEventDto> vehicleUsageEvents(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING);
if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) {
return safeList(mixed.vehicleUsageEvents());
}
return sourceEvents(context);
}
public static List<EventHubEventDto> driverPartitionEvents(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING);
if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) {
return safeList(mixed.driverPartitionEvents());
}
return sourceEvents(context);
}
public static List<Map<String, Object>> activityPointEvents(List<EventHubEventDto> sourceEvents) {
return safeList(sourceEvents).stream()
.map(DriverWorkingTimeEplEventMapper::toActivityPointEvent)

View File

@ -114,6 +114,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"JAVA",
Set.of("UnifiedRuntimeEventBundle")
));
descriptors.add(new RuntimeProcessingModuleDescriptorDto(
DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING,
"Event evidence mixing",
"Applies source-aware runtime evidence rules before intervalization. Initially collapses same-event-key CARD_ACTIVITY/VU_ACTIVITY duplicates and leaves CARD_VEHICLES_USED unchanged.",
"JAVA",
Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto")
));
}
descriptors.addAll(List.of(
new RuntimeProcessingModuleDescriptorDto(
@ -171,7 +178,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"vehicleEvidencePaddingMinutes",
"includeActivityIntervals",
"includeDrivingIntervals",
"includePartitionDebug"
"includePartitionDebug",
"eventMixingMode"
);
}

View File

@ -0,0 +1,467 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDetailsDto;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class RuntimeEventMixingServiceTest {
private final RuntimeEventMixingService service = new RuntimeEventMixingService();
@Test
void suppressesVuActivityDuplicateFromActivityTimelineWhenCardActivityHasSameEventKey() {
EventHubEventDto card = activity("CARD_ACTIVITY", "DRIVER_CARD", "TACHOGRAPH:CARD_ACTIVITY:1:START");
EventHubEventDto vu = activity("VU_ACTIVITY", "VEHICLE_UNIT", "TACHOGRAPH:VU_ACTIVITY:2:START");
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.rawEvents()).hasSize(2);
assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_ACTIVITY:1:START");
assertThat(mixed.driverPartitionEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_ACTIVITY:1:START");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:VU_ACTIVITY:2:START");
assertThat(mixed.eventMixingDecisions()).hasSize(1);
assertThat(mixed.eventMixingDecisions().getFirst().ruleId())
.isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY);
}
@Test
void keepsVuActivityWhenNoMatchingCardActivityExists() {
EventHubEventDto vu = activity("VU_ACTIVITY", "VEHICLE_UNIT", "TACHOGRAPH:VU_ACTIVITY:2:START");
RuntimeMixedEventBundle mixed = service.mix(List.of(vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:VU_ACTIVITY:2:START");
assertThat(mixed.suppressedEvents()).isEmpty();
assertThat(mixed.eventMixingDecisions()).isEmpty();
}
@Test
void suppressesTachographFileSessionVuActivityDuplicateWhenNoExtractionCodeIsPresent() {
EventHubEventDto card = fileSessionActivity(
"DRIVER_CARD",
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:ACTIVITY:card-interval-1:START:2026-04-01T00:00:00Z"
);
EventHubEventDto vu = fileSessionActivity(
"VEHICLE_UNIT",
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:ACTIVITY:vu-interval-1:START:2026-04-01T00:00:00Z"
);
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:ACTIVITY:card-interval-1:START:2026-04-01T00:00:00Z");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:ACTIVITY:vu-interval-1:START:2026-04-01T00:00:00Z");
assertThat(mixed.eventMixingDecisions()).hasSize(1);
assertThat(mixed.eventMixingDecisions().getFirst().secondaryExtractionCodes())
.containsExactly("VU_ACTIVITY");
}
@Test
void keepsAllCardVehicleUsedAndIwCycleEventsForVehicleUsageProcessing() {
EventHubEventDto cardVehicleUsed = cardUsage(
"CARD_VEHICLES_USED",
"DRIVER_CARD",
"TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT",
EventType.CARD_INSERTED,
EventLifecycle.INSERT
);
EventHubEventDto iwCycle = cardUsage(
"IW_CYCLE",
"VEHICLE_UNIT",
"TACHOGRAPH:IW_CYCLE:20:INSERT",
EventType.CARD_INSERTED,
EventLifecycle.INSERT
);
RuntimeMixedEventBundle mixed = service.mix(List.of(cardVehicleUsed, iwCycle), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.vehicleUsageEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT", "TACHOGRAPH:IW_CYCLE:20:INSERT");
assertThat(mixed.suppressedEvents()).isEmpty();
}
@Test
void suppressesVuPositionDuplicateFromSupportEvidenceAndCopiesVehicleIdentityToCardPrimary() {
EventHubEventDto card = supportEvidence(
"CARD_POSITION",
"DRIVER_CARD",
"TACHOGRAPH:CARD_POSITION:1",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT,
false
);
EventHubEventDto vu = supportEvidence(
"VU_POSITION",
"VEHICLE_UNIT",
"TACHOGRAPH:VU_POSITION:2",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT,
true
);
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_POSITION:1");
assertThat(mixed.supportEvidenceEvents().getFirst().vehicleRef().vin())
.isEqualTo("WDB9634031L123456");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:VU_POSITION:2");
assertThat(mixed.eventMixingDecisions()).hasSize(1);
assertThat(mixed.eventMixingDecisions().getFirst().ruleId())
.isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY);
assertThat(mixed.eventMixingDecisions().getFirst().channel())
.isEqualTo("SUPPORT_EVIDENCE");
}
@Test
void suppressesVuPlaceAndBorderDuplicateFromSupportEvidence() {
EventHubEventDto cardPlace = supportEvidence(
"CARD_PLACE",
"DRIVER_CARD",
"TACHOGRAPH:CARD_PLACE:1",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.SNAPSHOT,
false
);
EventHubEventDto vuPlace = supportEvidence(
"VU_PLACE",
"VEHICLE_UNIT",
"TACHOGRAPH:VU_PLACE:2",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.SNAPSHOT,
true
);
EventHubEventDto cardBorder = supportEvidence(
"CARD_BORDER_CROSSING",
"DRIVER_CARD",
"TACHOGRAPH:CARD_BORDER_CROSSING:3",
EventDomain.BORDER_CROSSING,
EventType.BORDER_OUTBOUND,
EventLifecycle.OUTBOUND,
false
);
EventHubEventDto vuBorder = supportEvidence(
"VU_BORDER_CROSSING",
"VEHICLE_UNIT",
"TACHOGRAPH:VU_BORDER_CROSSING:4",
EventDomain.BORDER_CROSSING,
EventType.BORDER_OUTBOUND,
EventLifecycle.OUTBOUND,
true
);
RuntimeMixedEventBundle mixed = service.mix(List.of(cardPlace, vuPlace, cardBorder, vuBorder), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_BORDER_CROSSING:3", "TACHOGRAPH:CARD_PLACE:1");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:VU_BORDER_CROSSING:4", "TACHOGRAPH:VU_PLACE:2");
assertThat(mixed.eventMixingDecisions()).hasSize(2);
}
@Test
void suppressesTachographFileSessionVuPositionDuplicateWhenNoExtractionCodeIsPresent() {
EventHubEventDto card = fileSessionSupportEvidence(
"DRIVER_CARD",
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-position-1:SNAPSHOT:2026-04-01T00:00:00Z",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT,
false
);
EventHubEventDto vu = fileSessionSupportEvidence(
"VEHICLE_UNIT",
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-position-1:SNAPSHOT:2026-04-01T00:00:00Z",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT,
true
);
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-position-1:SNAPSHOT:2026-04-01T00:00:00Z");
assertThat(mixed.supportEvidenceEvents().getFirst().vehicleRef().vin())
.isEqualTo("WDB9634031L123456");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-position-1:SNAPSHOT:2026-04-01T00:00:00Z");
assertThat(mixed.eventMixingDecisions()).hasSize(1);
assertThat(mixed.eventMixingDecisions().getFirst().secondaryExtractionCodes())
.containsExactly("VU_POSITION");
}
private EventHubEventDto activity(String extractionCode, String sourceKind, String externalId) {
ObjectNode raw = baseRaw(extractionCode, sourceKind);
raw.put("activityType", "BREAK_REST");
raw.put("cardSlot", "DRIVER");
raw.put("cardStatus", "INSERTED");
raw.put("drivingStatus", "SINGLE");
raw.put("startedAt", "2026-03-31T15:43:00Z");
raw.put("endedAt", "2026-04-01T00:00:00Z");
raw.put("intervalId", "TACHOGRAPH:" + extractionCode + ":1");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
attributes.put("cardSlot", "DRIVER");
attributes.put("cardStatus", "INSERTED");
attributes.put("drivingStatus", "SINGLE");
return event(
externalId,
sourceKind,
EventDomain.DRIVER_ACTIVITY,
EventType.BREAK_REST,
EventLifecycle.START,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
new EventDetailsDto("DRIVER_ACTIVITY", attributes),
payload
);
}
private EventHubEventDto supportEvidence(
String extractionCode,
String sourceKind,
String externalId,
EventDomain domain,
EventType type,
EventLifecycle lifecycle,
boolean withVin
) {
ObjectNode raw = baseRaw(extractionCode, sourceKind);
raw.put("latitude", "48.208174");
raw.put("longitude", "16.373819");
raw.put("odometerM", "123000");
raw.put("country", "AT");
raw.put("region", "W");
raw.put("countryFrom", "AT");
raw.put("countryTo", "DE");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
attributes.put("country", "AT");
attributes.put("region", "W");
attributes.put("countryFrom", "AT");
attributes.put("countryTo", "DE");
VehicleRefDto vehicleRef = withVin
? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
: new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"));
return event(
externalId,
sourceKind,
domain,
type,
lifecycle,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
new EventDetailsDto(domain.name(), attributes),
payload,
vehicleRef
);
}
private EventHubEventDto fileSessionActivity(String sourceKind, String externalId) {
ObjectNode raw = baseRawWithoutExtraction(sourceKind);
raw.put("activityType", "BREAK_REST");
raw.put("cardSlot", "DRIVER");
raw.put("cardStatus", "INSERTED");
raw.put("drivingStatus", "SINGLE");
raw.put("startedAt", "2026-03-31T15:43:00Z");
raw.put("endedAt", "2026-04-01T00:00:00Z");
raw.put("intervalId", "TACHOGRAPH_FILE_SESSION:ACTIVITY:1");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
attributes.put("cardSlot", "DRIVER");
attributes.put("cardStatus", "INSERTED");
attributes.put("drivingStatus", "SINGLE");
return event(
externalId,
sourceKind,
EventDomain.DRIVER_ACTIVITY,
EventType.BREAK_REST,
EventLifecycle.START,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
new EventDetailsDto("DRIVER_ACTIVITY", attributes),
payload
);
}
private EventHubEventDto fileSessionSupportEvidence(
String sourceKind,
String externalId,
EventDomain domain,
EventType type,
EventLifecycle lifecycle,
boolean withVin
) {
ObjectNode raw = baseRawWithoutExtraction(sourceKind);
raw.put("latitude", "48.208174");
raw.put("longitude", "16.373819");
raw.put("odometerM", "123000");
raw.put("country", "AT");
raw.put("region", "W");
raw.put("countryFrom", "AT");
raw.put("countryTo", "DE");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
attributes.put("country", "AT");
attributes.put("region", "W");
attributes.put("countryFrom", "AT");
attributes.put("countryTo", "DE");
VehicleRefDto vehicleRef = withVin
? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
: new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"));
return event(
externalId,
sourceKind,
domain,
type,
lifecycle,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
new EventDetailsDto(domain.name(), attributes),
payload,
vehicleRef
);
}
private EventHubEventDto cardUsage(
String extractionCode,
String sourceKind,
String externalId,
EventType eventType,
EventLifecycle lifecycle
) {
ObjectNode raw = baseRaw(extractionCode, sourceKind);
raw.put("intervalId", "TACHOGRAPH:" + extractionCode + ":1");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
return event(
externalId,
sourceKind,
EventDomain.DRIVER_CARD,
eventType,
lifecycle,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
null,
payload
);
}
private ObjectNode baseRaw(String extractionCode, String sourceKind) {
ObjectNode raw = JsonNodeFactory.instance.objectNode();
raw.put("sourceRowId", extractionCode + "-1");
raw.put("sourceKind", sourceKind);
raw.put("extractionCode", extractionCode);
raw.put("driverKey", "1:10000000198490");
raw.put("registrationKey", "1:LL-158TE");
return raw;
}
private ObjectNode baseRawWithoutExtraction(String sourceKind) {
ObjectNode raw = JsonNodeFactory.instance.objectNode();
raw.put("sourceRowId", sourceKind + "-1");
raw.put("sourceKind", sourceKind);
raw.put("driverKey", "1:10000000198490");
raw.put("registrationKey", "1:LL-158TE");
return raw;
}
private EventHubEventDto event(
String externalId,
String sourceKind,
EventDomain domain,
EventType type,
EventLifecycle lifecycle,
OffsetDateTime occurredAt,
EventDetailsDto details,
ObjectNode payload
) {
return event(
externalId,
sourceKind,
domain,
type,
lifecycle,
occurredAt,
details,
payload,
new VehicleRefDto(null, null, null, new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
);
}
private EventHubEventDto event(
String externalId,
String sourceKind,
EventDomain domain,
EventType type,
EventLifecycle lifecycle,
OffsetDateTime occurredAt,
EventDetailsDto details,
ObjectNode payload,
VehicleRefDto vehicleRef
) {
return new EventHubEventDto(
UUID.randomUUID(),
externalId,
new DriverRefDto("1:10000000198490", new DriverCardRefDto("1", 1, "10000000198490")),
vehicleRef,
occurredAt,
null,
occurredAt,
domain,
type,
lifecycle,
null,
null,
details,
null,
payload,
false,
packageInfo(sourceKind, domain, occurredAt.toLocalDate())
);
}
private EventHubPackageRequest packageInfo(String sourceKind, EventDomain domain, LocalDate businessDate) {
EventSourceDto source = new EventSourceDto("TACHOGRAPH", sourceKind, "TACHOGRAPH_" + sourceKind, null, null, null);
OffsetDateTime from = businessDate.atStartOfDay().atOffset(java.time.ZoneOffset.UTC);
OffsetDateTime to = businessDate.plusDays(1).atStartOfDay().atOffset(java.time.ZoneOffset.UTC);
return new EventHubPackageRequest(
"default",
source,
null,
ImportScopeDto.tenantAll(from, to),
domain.name(),
businessDate,
source.stableKey() + ":" + domain.name() + ":" + businessDate
);
}
}