Compare commits

...

3 Commits

Author SHA1 Message Date
trifonovt b4289abea5 Add runtime event mixing project changes 2026-06-11 13:11:17 +02:00
trifonovt 3cd2533d5b Ignore and untrack logs directory 2026-06-11 11:56:48 +02:00
trifonovt d7ecf52330 Fix VU interval deduplication and slot normalization 2026-06-11 11:43:20 +02:00
22 changed files with 1547 additions and 25 deletions

2
.gitignore vendored
View File

@ -5,4 +5,4 @@ target/
.project .project
.settings/ .settings/
.DS_Store .DS_Store
logs/eventhub-ingestion-service.log logs/

View File

@ -1,17 +1,33 @@
# EventHub fix-list patch # Patch: Tachograph file-session support for runtime event mixing
This patch implements the requested remaining items from the fix list, excluding build verification and SQL Server 2008 SQL rewriting. This patch extends the existing `event-evidence-mixing` implementation so it also supports events produced by uploaded tachograph file sessions.
## Apply notes ## What changed
1. Copy the files from this archive into the project root. - `RuntimeEventMixingService` now treats `TACHOGRAPH_FILE_SESSION` and `COMPOSITE_TACHOGRAPH_FILE_SESSION` events as tachograph runtime evidence.
2. Delete the old migration files listed in `DELETE_FILES.txt`. - File-session events do not always have a DB-style `extractionCode` such as `CARD_ACTIVITY` or `VU_ACTIVITY` in the raw payload.
3. Run the test suite locally with Maven/Java 21. - The mixing service now derives the equivalent extraction code from:
- source kind: `DRIVER_CARD` or `VEHICLE_UNIT`
- event domain: `DRIVER_ACTIVITY`, `POSITION`, `PLACE`, `BORDER_CROSSING`, `DRIVER_CARD`, etc.
- file-session external id / source-package kind.
## Main changes ## Supported derived mappings
- Renumbered Flyway migrations to remove duplicate `V9` and `V10` versions. | File-session event | Driver-card source | Vehicle-unit source |
- Removed the duplicated Timescale/Event source record migration. |---|---|---|
- Switched local Docker Compose DB from plain PostgreSQL to a TimescaleDB/PostGIS-capable image. | `DRIVER_ACTIVITY` | `CARD_ACTIVITY` | `VU_ACTIVITY` |
- Added normalized raw tachograph payload metadata for DB-extracted EventHub events. | `POSITION` | `CARD_POSITION` | `VU_POSITION` |
- Added tests for Flyway version uniqueness and tachograph DB mapper → timeline reconstruction metadata. | `PLACE` | `CARD_PLACE` | `VU_PLACE` |
| `BORDER_CROSSING` | `CARD_BORDER_CROSSING` | `VU_BORDER_CROSSING` |
| `DRIVER_CARD` insert/withdraw | `CARD_VEHICLES_USED` | `IW_CYCLE` |
## Important behavior
- Duplicate file-session `CARD_ACTIVITY` / `VU_ACTIVITY` events are mixed the same way as persistent tachograph DB events.
- Duplicate file-session position/place/border events are also mixed the same way as persistent tachograph DB support evidence.
- `CARD_VEHICLES_USED` and `IW_CYCLE` are still not mixed; they remain accepted for separate vehicle-usage processing.
## Modified files
- `src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java`
- `src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingServiceTest.java`

View File

@ -0,0 +1,30 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import java.time.OffsetDateTime;
import java.util.List;
public record RuntimeEventMixingDecisionDto(
String ruleId,
String equivalenceType,
String eventKey,
String decision,
String channel,
String primaryExternalSourceEventId,
String primaryExtractionCode,
List<String> secondaryExternalSourceEventIds,
List<String> secondaryExtractionCodes,
OffsetDateTime occurredAt,
String eventDomain,
String eventType,
String lifecycle,
String reason
) {
public RuntimeEventMixingDecisionDto {
secondaryExternalSourceEventIds = secondaryExternalSourceEventIds == null
? List.of()
: List.copyOf(secondaryExternalSourceEventIds);
secondaryExtractionCodes = secondaryExtractionCodes == null
? List.of()
: List.copyOf(secondaryExtractionCodes);
}
}

View File

@ -0,0 +1,691 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.GeoPointDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.springframework.stereotype.Service;
@Service
public class RuntimeEventMixingService {
public static final String MODE_OFF = "OFF";
public static final String MODE_TACHOGRAPH_SAME_SOURCE = "TACHOGRAPH_SAME_SOURCE";
public static final String MODE_FULL = "FULL";
public static final String RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY =
"tachograph.activity.card-vu.same-event-key";
public static final String RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY =
"tachograph.activity.card-vu.compatible-activity-key";
public static final String RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY =
"tachograph.support.card-vu.same-event-key";
public static final String RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY =
"tachograph.support.card-vu.compatible-support-key";
public RuntimeMixedEventBundle mix(List<EventHubEventDto> events, String requestedMode) {
List<EventHubEventDto> rawEvents = sort(events);
String mode = normalizeMode(requestedMode);
if (MODE_OFF.equals(mode)) {
return unchanged(rawEvents, "Runtime event mixing is disabled by eventMixingMode=OFF.");
}
MixingState state = new MixingState(rawEvents);
applyTachographCardVuActivityMixing(state);
applyTachographCardVuSupportEvidenceMixing(state);
List<EventHubEventDto> driverPartitionEvents = rawEvents.stream()
.filter(event -> !state.isSuppressed(event))
.map(state::effectiveEvent)
.toList();
List<EventHubEventDto> activityTimelineEvents = driverPartitionEvents.stream()
.filter(RuntimeEventMixingService::isDriverActivityPoint)
.toList();
// Vehicle-usage events are intentionally not mixed here. CARD_VEHICLES_USED and IW_CYCLE
// are kept as separate input evidence because they must be processed by their own rules later.
List<EventHubEventDto> vehicleUsageEvents = rawEvents.stream()
.filter(RuntimeEventMixingService::isDriverCardUsagePoint)
.toList();
List<EventHubEventDto> supportEvidenceEvents = driverPartitionEvents.stream()
.filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event))
.toList();
List<String> notes = new ArrayList<>();
notes.add("Runtime event mixing inspected " + rawEvents.size() + " event(s).");
notes.add("Runtime event mixing suppressed " + state.suppressedEvents().size()
+ " duplicate source event(s) from activity/support evidence channels.");
notes.add("Runtime event mixing keeps CARD_POSITION, CARD_PLACE, and CARD_BORDER_CROSSING as primary when matching VU support evidence describes the same semantic event.");
notes.add("Runtime event mixing kept all CARD_VEHICLES_USED and IW_CYCLE card-usage point events unchanged for vehicle-usage processing.");
return new RuntimeMixedEventBundle(
rawEvents,
driverPartitionEvents,
activityTimelineEvents,
vehicleUsageEvents,
supportEvidenceEvents,
state.suppressedEvents(),
state.decisions(),
notes,
state.warnings()
);
}
private RuntimeMixedEventBundle unchanged(List<EventHubEventDto> rawEvents, String note) {
return new RuntimeMixedEventBundle(
rawEvents,
rawEvents,
rawEvents.stream().filter(RuntimeEventMixingService::isDriverActivityPoint).toList(),
rawEvents.stream().filter(RuntimeEventMixingService::isDriverCardUsagePoint).toList(),
rawEvents.stream().filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event)).toList(),
List.of(),
List.of(),
List.of(note),
List.of()
);
}
private void applyTachographCardVuActivityMixing(MixingState state) {
LinkedHashMap<String, List<EventHubEventDto>> exactGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (!isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) {
continue;
}
exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : exactGroups.entrySet()) {
fuseCardAndVuDuplicates(
state,
entry.getKey(),
"EXACT_EVENT_KEY",
RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY,
"ACTIVITY_TIMELINE",
entry.getValue(),
"CARD_ACTIVITY",
"VU_ACTIVITY",
"CARD_ACTIVITY and VU_ACTIVITY describe the same driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization."
);
}
LinkedHashMap<String, List<EventHubEventDto>> compatibleGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (state.isSuppressed(event) || !isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) {
continue;
}
compatibleGroups.computeIfAbsent(compatibleActivityKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : compatibleGroups.entrySet()) {
fuseCardAndVuDuplicates(
state,
entry.getKey(),
"COMPATIBLE_ACTIVITY_KEY",
RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY,
"ACTIVITY_TIMELINE",
entry.getValue(),
"CARD_ACTIVITY",
"VU_ACTIVITY",
"CARD_ACTIVITY and VU_ACTIVITY describe a compatible driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization."
);
}
}
private void applyTachographCardVuSupportEvidenceMixing(MixingState state) {
LinkedHashMap<String, List<EventHubEventDto>> exactGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) {
continue;
}
exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : exactGroups.entrySet()) {
fuseCardAndVuSupportDuplicates(
state,
entry.getKey(),
"EXACT_EVENT_KEY",
RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY,
entry.getValue()
);
}
LinkedHashMap<String, List<EventHubEventDto>> compatibleGroups = new LinkedHashMap<>();
for (EventHubEventDto event : state.rawEvents()) {
if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) {
continue;
}
compatibleGroups.computeIfAbsent(compatibleSupportEvidenceKey(event), ignored -> new ArrayList<>())
.add(event);
}
for (Map.Entry<String, List<EventHubEventDto>> entry : compatibleGroups.entrySet()) {
fuseCardAndVuSupportDuplicates(
state,
entry.getKey(),
"COMPATIBLE_SUPPORT_KEY",
RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY,
entry.getValue()
);
}
}
private void fuseCardAndVuSupportDuplicates(
MixingState state,
String eventKey,
String equivalenceType,
String ruleId,
List<EventHubEventDto> group
) {
Map.Entry<String, String> pair = cardVuSupportPair(group);
if (pair == null) {
return;
}
fuseCardAndVuDuplicates(
state,
eventKey,
equivalenceType,
ruleId,
"SUPPORT_EVIDENCE",
group,
pair.getKey(),
pair.getValue(),
pair.getKey() + " and " + pair.getValue() + " describe the same support evidence event. "
+ pair.getKey() + " is kept as primary support evidence; " + pair.getValue()
+ " is suppressed from support-evidence normalization. Vehicle/VIN identity from the VU event is copied to the primary event when the card event has weaker vehicle identity."
);
}
private void fuseCardAndVuDuplicates(
MixingState state,
String eventKey,
String equivalenceType,
String ruleId,
String channel,
List<EventHubEventDto> group,
String primaryExtractionCode,
String secondaryExtractionCode,
String reason
) {
List<EventHubEventDto> primaries = group.stream()
.filter(event -> Objects.equals(primaryExtractionCode, extractionCode(event)))
.sorted(eventComparator())
.toList();
List<EventHubEventDto> secondaries = group.stream()
.filter(event -> Objects.equals(secondaryExtractionCode, extractionCode(event)))
.sorted(eventComparator())
.toList();
if (primaries.isEmpty() || secondaries.isEmpty()) {
return;
}
EventHubEventDto primary = primaries.get(0);
List<EventHubEventDto> newlySuppressed = secondaries.stream()
.filter(event -> !state.isSuppressed(event))
.toList();
if (newlySuppressed.isEmpty()) {
return;
}
EventHubEventDto enrichedPrimary = enrichPrimaryVehicleRef(primary, newlySuppressed);
if (enrichedPrimary != primary) {
state.replace(primary, enrichedPrimary);
}
newlySuppressed.forEach(state::suppress);
state.decisions().add(new RuntimeEventMixingDecisionDto(
ruleId,
equivalenceType,
eventKey,
"FUSED_PRIMARY_SELECTED",
channel,
primary.externalSourceEventId(),
extractionCode(primary),
newlySuppressed.stream().map(EventHubEventDto::externalSourceEventId).toList(),
newlySuppressed.stream().map(RuntimeEventMixingService::extractionCode).toList(),
primary.occurredAt(),
primary.eventDomain() == null ? null : primary.eventDomain().name(),
primary.eventType() == null ? null : primary.eventType().name(),
primary.lifecycle() == null ? null : primary.lifecycle().name(),
reason
));
}
private static EventHubEventDto enrichPrimaryVehicleRef(EventHubEventDto primary, List<EventHubEventDto> secondaries) {
if (primary == null || secondaries == null || secondaries.isEmpty()) {
return primary;
}
VehicleRefDto bestSecondary = secondaries.stream()
.map(EventHubEventDto::vehicleRef)
.filter(Objects::nonNull)
.filter(VehicleRefDto::hasAnyReference)
.filter(RuntimeEventMixingService::hasVehicleIdentity)
.findFirst()
.orElse(null);
if (bestSecondary == null || !shouldEnrichVehicleRef(primary.vehicleRef(), bestSecondary)) {
return primary;
}
VehicleRefDto merged = mergeVehicleRef(primary.vehicleRef(), bestSecondary);
if (Objects.equals(primary.vehicleRef(), merged)) {
return primary;
}
return new EventHubEventDto(
primary.eventId(),
primary.externalSourceEventId(),
primary.driverRef(),
merged,
primary.occurredAt(),
primary.receivedPartnerAt(),
primary.receivedHubAt(),
primary.eventDomain(),
primary.eventType(),
primary.lifecycle(),
primary.odometerM(),
primary.position(),
primary.eventDetails(),
primary.sourcePackageRef(),
primary.payload(),
primary.manualEntry(),
primary.packageInfo()
);
}
private static boolean shouldEnrichVehicleRef(VehicleRefDto primary, VehicleRefDto secondary) {
return secondary != null
&& hasVehicleIdentity(secondary)
&& (primary == null || !hasVehicleIdentity(primary));
}
private static boolean hasVehicleIdentity(VehicleRefDto vehicleRef) {
return vehicleRef != null
&& (notBlank(vehicleRef.sourceVehicleEntityId()) || notBlank(vehicleRef.vin()));
}
private static VehicleRefDto mergeVehicleRef(VehicleRefDto primary, VehicleRefDto secondary) {
if (primary == null) {
return secondary;
}
if (secondary == null) {
return primary;
}
VehicleRegistrationRefDto registration = primary.vehicleRegistration() != null && primary.vehicleRegistration().hasValue()
? primary.vehicleRegistration()
: secondary.vehicleRegistration();
return new VehicleRefDto(
firstNonBlank(primary.sourceVehicleEntityId(), secondary.sourceVehicleEntityId()),
firstNonBlank(primary.vin(), secondary.vin()),
firstNonBlank(primary.sourceRegistrationEntityId(), secondary.sourceRegistrationEntityId()),
registration
);
}
private static Map.Entry<String, String> cardVuSupportPair(List<EventHubEventDto> group) {
if (group == null || group.isEmpty()) {
return null;
}
Set<String> extractionCodes = group.stream()
.map(RuntimeEventMixingService::extractionCode)
.filter(Objects::nonNull)
.collect(java.util.stream.Collectors.toCollection(LinkedHashSet::new));
if (extractionCodes.contains("CARD_POSITION") && extractionCodes.contains("VU_POSITION")) {
return Map.entry("CARD_POSITION", "VU_POSITION");
}
if (extractionCodes.contains("CARD_PLACE") && extractionCodes.contains("VU_PLACE")) {
return Map.entry("CARD_PLACE", "VU_PLACE");
}
if (extractionCodes.contains("CARD_BORDER_CROSSING") && extractionCodes.contains("VU_BORDER_CROSSING")) {
return Map.entry("CARD_BORDER_CROSSING", "VU_BORDER_CROSSING");
}
return null;
}
private static boolean isTachographCardOrVuActivity(EventHubEventDto event) {
RuntimeEventSourceProfile profile = sourceProfile(event);
return isTachographRuntimeSource(profile)
&& (Objects.equals("CARD_ACTIVITY", profile.extractionCode())
|| Objects.equals("VU_ACTIVITY", profile.extractionCode()));
}
private static boolean isTachographCardOrVuSupportEvidence(EventHubEventDto event) {
if (event == null || isDriverActivityPoint(event) || isDriverCardUsagePoint(event)) {
return false;
}
RuntimeEventSourceProfile profile = sourceProfile(event);
if (!isTachographRuntimeSource(profile)) {
return false;
}
return switch (nullToEmpty(profile.extractionCode())) {
case "CARD_POSITION", "VU_POSITION", "CARD_PLACE", "VU_PLACE", "CARD_BORDER_CROSSING", "VU_BORDER_CROSSING" -> true;
default -> false;
};
}
private static boolean isDriverActivityPoint(EventHubEventDto event) {
return event != null
&& event.eventDomain() == EventDomain.DRIVER_ACTIVITY
&& (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.END)
&& event.occurredAt() != null;
}
private static boolean isDriverCardUsagePoint(EventHubEventDto event) {
return event != null
&& event.eventDomain() == EventDomain.DRIVER_CARD
&& (event.lifecycle() == EventLifecycle.INSERT || event.lifecycle() == EventLifecycle.WITHDRAW)
&& event.occurredAt() != null;
}
private static RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) {
JsonNode raw = rawPayload(event);
String sourceKind = firstNonBlank(text(raw, "sourceKind"), sourceKind(event));
String extractionCode = firstNonBlank(
text(raw, "extractionCode"),
fileSessionExtractionCode(event, sourceKind),
extractionCodeFromExternalSourceEventId(event)
);
String sourceSystem = firstNonBlank(
text(raw, "sourceSystem"),
sourceProvider(event),
sourceSystemFromExternalSourceEventId(event)
);
if (sourceSystem == null && (extractionCode != null || isTachographFileSessionEvent(event))) {
sourceSystem = "TACHOGRAPH";
}
return new RuntimeEventSourceProfile(
normalizeUpper(sourceSystem),
normalizeUpper(sourceKind),
normalizeUpper(extractionCode)
);
}
private static String extractionCode(EventHubEventDto event) {
return sourceProfile(event).extractionCode();
}
private static boolean isTachographRuntimeSource(RuntimeEventSourceProfile profile) {
if (profile == null) {
return false;
}
return switch (nullToEmpty(profile.sourceSystem())) {
case "TACHOGRAPH", "TACHOGRAPH_FILE_SESSION", "COMPOSITE_TACHOGRAPH_FILE_SESSION" -> true;
default -> false;
};
}
private static String fileSessionExtractionCode(EventHubEventDto event, String sourceKind) {
if (!isTachographFileSessionEvent(event)) {
return null;
}
String normalizedSourceKind = normalizeUpper(sourceKind);
if (normalizedSourceKind == null) {
return null;
}
if (event != null && event.eventDomain() == EventDomain.DRIVER_ACTIVITY) {
return switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD_ACTIVITY";
case "VEHICLE_UNIT" -> "VU_ACTIVITY";
default -> null;
};
}
if (event != null && event.eventDomain() == EventDomain.DRIVER_CARD) {
return switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD_VEHICLES_USED";
case "VEHICLE_UNIT" -> "IW_CYCLE";
default -> null;
};
}
if (event == null || event.eventDomain() == null) {
return null;
}
String prefix = switch (normalizedSourceKind) {
case "DRIVER_CARD" -> "CARD";
case "VEHICLE_UNIT" -> "VU";
default -> null;
};
if (prefix == null) {
return null;
}
return switch (event.eventDomain()) {
case POSITION -> prefix + "_POSITION";
case PLACE -> prefix + "_PLACE";
case BORDER_CROSSING -> prefix + "_BORDER_CROSSING";
case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null;
default -> null;
};
}
private static boolean isTachographFileSessionEvent(EventHubEventDto event) {
if (event == null) {
return false;
}
String packageKind = event.sourcePackageRef() == null ? null : normalizeUpper(event.sourcePackageRef().packageKind());
if (Objects.equals("TACHOGRAPH_FILE_SESSION", packageKind)
|| Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", packageKind)) {
return true;
}
String externalId = event.externalSourceEventId();
return externalId != null
&& (externalId.startsWith("TACHOGRAPH_FILE_SESSION:")
|| externalId.startsWith("COMPOSITE_TACHOGRAPH_FILE_SESSION:"));
}
private static String compatibleActivityKey(EventHubEventDto event) {
JsonNode raw = rawPayload(event);
return String.join("|",
"ACTIVITY_COMPATIBLE",
nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
nullToEmpty(event.eventDomain() == null ? null : event.eventDomain().name()),
nullToEmpty(event.eventType() == null ? null : event.eventType().name()),
nullToEmpty(event.lifecycle() == null ? null : event.lifecycle().name()),
normalizeTime(event.occurredAt()),
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
nullToEmpty(firstNonBlank(text(raw, "startedAt"), text(raw, "intervalStartedAt"))),
nullToEmpty(firstNonBlank(text(raw, "endedAt"), text(raw, "intervalEndedAt"))),
nullToEmpty(firstNonBlank(text(raw, "slot"), text(raw, "cardSlot"))),
nullToEmpty(firstNonBlank(text(raw, "cardStatus"))),
nullToEmpty(firstNonBlank(text(raw, "drivingStatus")))
);
}
private static String compatibleSupportEvidenceKey(EventHubEventDto event) {
JsonNode raw = rawPayload(event);
GeoPointDto position = event == null ? null : event.position();
return String.join("|",
"SUPPORT_COMPATIBLE",
nullToEmpty(event == null || event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()),
nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()),
nullToEmpty(event == null || event.lifecycle() == null ? null : event.lifecycle().name()),
normalizeTime(event == null ? null : event.occurredAt()),
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
nullToEmpty(firstNonBlank(text(raw, "latitude"), position == null || position.latitude() == null ? null : position.latitude().toPlainString())),
nullToEmpty(firstNonBlank(text(raw, "longitude"), position == null || position.longitude() == null ? null : position.longitude().toPlainString())),
nullToEmpty(firstNonBlank(text(raw, "odometerM"), event == null || event.odometerM() == null ? null : String.valueOf(event.odometerM()))),
nullToEmpty(firstNonBlank(text(raw, "country"), detailText(event, "country"))),
nullToEmpty(firstNonBlank(text(raw, "region"), detailText(event, "region"))),
nullToEmpty(firstNonBlank(text(raw, "countryFrom"), detailText(event, "countryFrom"))),
nullToEmpty(firstNonBlank(text(raw, "countryTo"), detailText(event, "countryTo"))),
nullToEmpty(firstNonBlank(text(raw, "operation"), detailText(event, "operation")))
);
}
private static Comparator<EventHubEventDto> eventComparator() {
return Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))
.thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name())
.thenComparing(event -> event.eventType() == null ? "" : event.eventType().name())
.thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name())
.thenComparing(event -> sourceProfile(event).extractionCode(), Comparator.nullsLast(String::compareTo))
.thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo));
}
private static List<EventHubEventDto> sort(List<EventHubEventDto> events) {
return (events == null ? List.<EventHubEventDto>of() : events).stream()
.filter(Objects::nonNull)
.sorted(eventComparator())
.toList();
}
private static String normalizeMode(String requestedMode) {
String value = normalizeUpper(requestedMode);
if (value == null) {
return MODE_TACHOGRAPH_SAME_SOURCE;
}
return switch (value) {
case MODE_OFF, MODE_TACHOGRAPH_SAME_SOURCE, MODE_FULL -> value;
default -> MODE_TACHOGRAPH_SAME_SOURCE;
};
}
private static JsonNode rawPayload(EventHubEventDto event) {
return RuntimeEntityReferenceResolver.rawPayload(event);
}
private static String sourceKind(EventHubEventDto event) {
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().sourceKind();
}
private static String sourceProvider(EventHubEventDto event) {
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
? null
: event.packageInfo().eventSource().providerKey();
}
private static String sourceSystemFromExternalSourceEventId(EventHubEventDto event) {
String externalId = event == null ? null : event.externalSourceEventId();
if (externalId == null || externalId.isBlank()) {
return null;
}
String[] parts = externalId.split(":");
return parts.length >= 1 ? parts[0] : null;
}
private static String extractionCodeFromExternalSourceEventId(EventHubEventDto event) {
String externalId = event == null ? null : event.externalSourceEventId();
if (externalId == null || externalId.isBlank()) {
return null;
}
String[] parts = externalId.split(":");
return parts.length >= 2 ? parts[1] : null;
}
private static String detailText(EventHubEventDto event, String field) {
if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null || field == null) {
return null;
}
JsonNode value = event.eventDetails().attributes().get(field);
if (value == null || value.isNull()) {
return null;
}
String text = value.asText(null);
return text == null || text.isBlank() ? null : text.trim();
}
private static String text(JsonNode node, String field) {
if (node == null || field == null) {
return null;
}
JsonNode value = node.get(field);
if (value == null || value.isNull()) {
return null;
}
String text = value.asText(null);
return text == null || text.isBlank() ? null : text.trim();
}
private static String firstNonBlank(String... values) {
if (values == null) {
return null;
}
for (String value : values) {
if (value != null && !value.isBlank()) {
return value.trim();
}
}
return null;
}
private static String normalizeUpper(String value) {
return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT);
}
private static String nullToEmpty(Object value) {
return value == null ? "" : String.valueOf(value);
}
private static String normalizeTime(OffsetDateTime value) {
return value == null ? "" : value.toInstant().toString();
}
private static boolean notBlank(String value) {
return value != null && !value.isBlank();
}
private static final class MixingState {
private final List<EventHubEventDto> rawEvents;
private final Set<String> suppressedEventIds = new LinkedHashSet<>();
private final Map<String, EventHubEventDto> replacementsByEventId = new LinkedHashMap<>();
private final List<EventHubEventDto> suppressedEvents = new ArrayList<>();
private final List<RuntimeEventMixingDecisionDto> decisions = new ArrayList<>();
private final List<String> warnings = new ArrayList<>();
private MixingState(List<EventHubEventDto> rawEvents) {
this.rawEvents = rawEvents;
}
private List<EventHubEventDto> rawEvents() {
return rawEvents;
}
private boolean isSuppressed(EventHubEventDto event) {
return suppressedEventIds.contains(eventId(event));
}
private EventHubEventDto effectiveEvent(EventHubEventDto event) {
return replacementsByEventId.getOrDefault(eventId(event), event);
}
private void replace(EventHubEventDto original, EventHubEventDto replacement) {
if (original != null && replacement != null) {
replacementsByEventId.put(eventId(original), replacement);
}
}
private void suppress(EventHubEventDto event) {
if (suppressedEventIds.add(eventId(event))) {
suppressedEvents.add(event);
}
}
private List<EventHubEventDto> suppressedEvents() {
return suppressedEvents;
}
private List<RuntimeEventMixingDecisionDto> decisions() {
return decisions;
}
private List<String> warnings() {
return warnings;
}
private String eventId(EventHubEventDto event) {
if (event == null) {
return "<null>";
}
return firstNonBlank(event.externalSourceEventId(), event.eventId() == null ? null : event.eventId().toString(), RuntimeEventIdentityResolver.canonicalEventKey(event));
}
}
}

View File

@ -0,0 +1,8 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
public record RuntimeEventSourceProfile(
String sourceSystem,
String sourceKind,
String extractionCode
) {
}

View File

@ -0,0 +1,28 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import at.procon.eventhub.dto.EventHubEventDto;
import java.util.List;
public record RuntimeMixedEventBundle(
List<EventHubEventDto> rawEvents,
List<EventHubEventDto> driverPartitionEvents,
List<EventHubEventDto> activityTimelineEvents,
List<EventHubEventDto> vehicleUsageEvents,
List<EventHubEventDto> supportEvidenceEvents,
List<EventHubEventDto> suppressedEvents,
List<RuntimeEventMixingDecisionDto> eventMixingDecisions,
List<String> notes,
List<String> warnings
) {
public RuntimeMixedEventBundle {
rawEvents = rawEvents == null ? List.of() : List.copyOf(rawEvents);
driverPartitionEvents = driverPartitionEvents == null ? List.of() : List.copyOf(driverPartitionEvents);
activityTimelineEvents = activityTimelineEvents == null ? List.of() : List.copyOf(activityTimelineEvents);
vehicleUsageEvents = vehicleUsageEvents == null ? List.of() : List.copyOf(vehicleUsageEvents);
supportEvidenceEvents = supportEvidenceEvents == null ? List.of() : List.copyOf(supportEvidenceEvents);
suppressedEvents = suppressedEvents == null ? List.of() : List.copyOf(suppressedEvents);
eventMixingDecisions = eventMixingDecisions == null ? List.of() : List.copyOf(eventMixingDecisions);
notes = notes == null ? List.of() : List.copyOf(notes);
warnings = warnings == null ? List.of() : List.copyOf(warnings);
}
}

View File

@ -52,7 +52,7 @@ public class DriverActivityIntervalsModule implements RuntimeEplModule {
@Override @Override
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) { public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.sourceEvents(context); List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.activityTimelineEvents(context);
List<Map<String, Object>> pointEvents = DriverWorkingTimeEplEventMapper.activityPointEvents(sourceEvents); List<Map<String, Object>> pointEvents = DriverWorkingTimeEplEventMapper.activityPointEvents(sourceEvents);
RuntimeEplModuleExecutionResult eplResult = eplModuleExecutor.execute(new RuntimeEplModuleDefinition( RuntimeEplModuleExecutionResult eplResult = eplModuleExecutor.execute(new RuntimeEplModuleDefinition(
moduleKey(), moduleKey(),

View File

@ -52,7 +52,7 @@ public class DriverVehicleUsageIntervalsModule implements RuntimeEplModule {
@Override @Override
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) { public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.sourceEvents(context); List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.vehicleUsageEvents(context);
List<Map<String, Object>> pointEvents = DriverWorkingTimeEplEventMapper.vehicleUsagePointEvents(sourceEvents); List<Map<String, Object>> pointEvents = DriverWorkingTimeEplEventMapper.vehicleUsagePointEvents(sourceEvents);
RuntimeEplModuleExecutionResult eplResult = eplModuleExecutor.execute(new RuntimeEplModuleDefinition( RuntimeEplModuleExecutionResult eplResult = eplModuleExecutor.execute(new RuntimeEplModuleDefinition(
moduleKey(), moduleKey(),

View File

@ -3,6 +3,7 @@ package at.procon.eventhub.processing.eventprocessing.module;
public final class DriverWorkingTimeModuleKeys { public final class DriverWorkingTimeModuleKeys {
public static final String RUNTIME_EVENT_ASSEMBLY = "runtime-event-assembly"; public static final String RUNTIME_EVENT_ASSEMBLY = "runtime-event-assembly";
public static final String EVENT_EVIDENCE_MIXING = "event-evidence-mixing";
public static final String EVENT_TO_ACTIVITY_INTERVALS = "event-to-activity-intervals"; public static final String EVENT_TO_ACTIVITY_INTERVALS = "event-to-activity-intervals";
public static final String EVENT_TO_VEHICLE_USAGE_INTERVALS = "event-to-vehicle-usage-intervals"; public static final String EVENT_TO_VEHICLE_USAGE_INTERVALS = "event-to-vehicle-usage-intervals";
public static final String VEHICLE_USAGE_MERGE = "vehicle-usage-merge"; public static final String VEHICLE_USAGE_MERGE = "vehicle-usage-merge";

View File

@ -0,0 +1,103 @@
package at.procon.eventhub.processing.eventprocessing.module;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeEventMixingService;
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeMixedEventBundle;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class EventEvidenceMixingModule implements RuntimeProcessingModule {
public static final String EVENT_MIXING_MODE_PARAMETER = "eventMixingMode";
private final RuntimeEventMixingService eventMixingService;
@Autowired
public EventEvidenceMixingModule(RuntimeEventMixingService eventMixingService) {
this.eventMixingService = eventMixingService;
}
/** Compatibility constructor for tests/local registries that do not wire the mixing service. */
public EventEvidenceMixingModule() {
this.eventMixingService = null;
}
@Override
public String moduleKey() {
return DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING;
}
@Override
public RuntimeProcessingModuleDescriptorDto descriptor() {
return new RuntimeProcessingModuleDescriptorDto(
moduleKey(),
"Event evidence mixing",
"Applies source-aware runtime evidence rules before intervalization. The initial rule collapses duplicate tachograph CARD_ACTIVITY/VU_ACTIVITY events by normalized eventKey while keeping CARD_VEHICLES_USED unchanged for vehicle-usage processing.",
"JAVA",
Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto")
);
}
@Override
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
if (eventMixingService == null) {
return delegatedPlaceholder();
}
List<EventHubEventDto> sourceEvents = sourceEvents(context);
RuntimeMixedEventBundle mixed = eventMixingService.mix(sourceEvents, eventMixingMode(context));
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("inputEventCount", mixed.rawEvents().size());
metadata.put("driverPartitionEventCount", mixed.driverPartitionEvents().size());
metadata.put("activityTimelineEventCount", mixed.activityTimelineEvents().size());
metadata.put("vehicleUsageEventCount", mixed.vehicleUsageEvents().size());
metadata.put("supportEvidenceEventCount", mixed.supportEvidenceEvents().size());
metadata.put("suppressedEventCount", mixed.suppressedEvents().size());
metadata.put("eventMixingDecisionCount", mixed.eventMixingDecisions().size());
metadata.put("eventMixingMode", eventMixingMode(context));
return new RuntimeProcessingModuleResult(
moduleKey(),
RuntimeProcessingModuleStatus.SUCCESS,
mixed,
metadata,
mixed.warnings()
);
}
private RuntimeProcessingModuleResult delegatedPlaceholder() {
Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put("executionModel", "delegated");
metadata.put("note", "Event evidence mixing is not wired in this local/legacy module registry.");
return new RuntimeProcessingModuleResult(
moduleKey(),
RuntimeProcessingModuleStatus.SUCCESS,
null,
metadata,
List.of()
);
}
private List<EventHubEventDto> sourceEvents(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult assemblyResult = context.previousResults().get(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY);
if (assemblyResult != null && assemblyResult.output() instanceof UnifiedRuntimeEventBundle bundle) {
return bundle.mergedEvents();
}
return context.events();
}
private String eventMixingMode(RuntimeProcessingModuleContext context) {
Object value = context.request() == null || context.request().parameters() == null
? null
: context.request().parameters().get(EVENT_MIXING_MODE_PARAMETER);
if (value == null) {
value = context.attributes().get(EVENT_MIXING_MODE_PARAMETER);
}
return value == null ? RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE : value.toString();
}
}

View File

@ -7,6 +7,7 @@ import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDr
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval; import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
import at.procon.eventhub.processing.dto.RuntimeDriverPartitionDebugDto; import at.procon.eventhub.processing.dto.RuntimeDriverPartitionDebugDto;
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest; import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
import at.procon.eventhub.processing.eventprocessing.module.epl.DriverWorkingTimeEplEventMapper;
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto; import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto;
import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult; import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult;
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef; import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
@ -66,6 +67,7 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
return delegatedPlaceholder(); return delegatedPlaceholder();
} }
UnifiedRuntimeEventBundle broadBundle = runtimeEventBundle(context); UnifiedRuntimeEventBundle broadBundle = runtimeEventBundle(context);
List<EventHubEventDto> partitionSourceEvents = DriverWorkingTimeEplEventMapper.driverPartitionEvents(context);
UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context); UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context);
boolean includePartitionDebug = booleanAttribute(context, "includePartitionDebug", false); boolean includePartitionDebug = booleanAttribute(context, "includePartitionDebug", false);
List<DriverWorkingTimeVehicleUsageInterval> mergedVehicleUsageIntervals = mergedVehicleUsageIntervals(context); List<DriverWorkingTimeVehicleUsageInterval> mergedVehicleUsageIntervals = mergedVehicleUsageIntervals(context);
@ -73,8 +75,8 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
LinkedHashMap<String, DriverWorkingTimeDriverPartition> partitions = new LinkedHashMap<>(); LinkedHashMap<String, DriverWorkingTimeDriverPartition> partitions = new LinkedHashMap<>();
LinkedHashMap<String, List<String>> attachedVehicleEvidenceByEvent = new LinkedHashMap<>(); LinkedHashMap<String, List<String>> attachedVehicleEvidenceByEvent = new LinkedHashMap<>();
List<String> warnings = new ArrayList<>(); List<String> warnings = new ArrayList<>();
for (String driverKey : selectedDriverKeys(scopeRequest.toRuntimeRequest(), broadBundle.mergedEvents())) { for (String driverKey : selectedDriverKeys(scopeRequest.toRuntimeRequest(), partitionSourceEvents)) {
List<EventHubEventDto> directDriverEvents = broadBundle.mergedEvents().stream() List<EventHubEventDto> directDriverEvents = partitionSourceEvents.stream()
.filter(event -> Objects.equals(driverKey(event), driverKey)) .filter(event -> Objects.equals(driverKey(event), driverKey))
.toList(); .toList();
List<DriverWorkingTimeVehicleUsageInterval> driverVehicleUsageIntervals = mergedVehicleUsageIntervals.stream() List<DriverWorkingTimeVehicleUsageInterval> driverVehicleUsageIntervals = mergedVehicleUsageIntervals.stream()
@ -83,7 +85,7 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
RuntimeDriverVehicleEvidenceAttachmentResult attachmentResult = vehicleEvidenceAttachmentService.attachVehicleEvidence( RuntimeDriverVehicleEvidenceAttachmentResult attachmentResult = vehicleEvidenceAttachmentService.attachVehicleEvidence(
driverKey, driverKey,
directDriverEvents, directDriverEvents,
broadBundle.mergedEvents(), partitionSourceEvents,
driverVehicleUsageIntervals, driverVehicleUsageIntervals,
booleanAttribute( booleanAttribute(
context, context,
@ -132,6 +134,8 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
metadata.put("attachedVehicleEvidenceEventCount", partitions.values().stream() metadata.put("attachedVehicleEvidenceEventCount", partitions.values().stream()
.mapToInt(partition -> partition.attachedVehicleEvidenceEvents().size()) .mapToInt(partition -> partition.attachedVehicleEvidenceEvents().size())
.sum()); .sum());
metadata.put("partitionSourceEventCount", partitionSourceEvents.size());
metadata.put("rawMergedEventCount", broadBundle.mergedEvents().size());
return new RuntimeProcessingModuleResult( return new RuntimeProcessingModuleResult(
moduleKey(), moduleKey(),
RuntimeProcessingModuleStatus.SUCCESS, RuntimeProcessingModuleStatus.SUCCESS,

View File

@ -7,6 +7,7 @@ import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeModuleKeys; import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeModuleKeys;
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleContext; import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleContext;
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleResult; import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleResult;
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeMixedEventBundle;
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle; import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver; import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver; import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
@ -38,6 +39,30 @@ public final class DriverWorkingTimeEplEventMapper {
return safeList(context.events()); return safeList(context.events());
} }
public static List<EventHubEventDto> activityTimelineEvents(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING);
if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) {
return safeList(mixed.activityTimelineEvents());
}
return sourceEvents(context);
}
public static List<EventHubEventDto> vehicleUsageEvents(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING);
if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) {
return safeList(mixed.vehicleUsageEvents());
}
return sourceEvents(context);
}
public static List<EventHubEventDto> driverPartitionEvents(RuntimeProcessingModuleContext context) {
RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING);
if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) {
return safeList(mixed.driverPartitionEvents());
}
return sourceEvents(context);
}
public static List<Map<String, Object>> activityPointEvents(List<EventHubEventDto> sourceEvents) { public static List<Map<String, Object>> activityPointEvents(List<EventHubEventDto> sourceEvents) {
return safeList(sourceEvents).stream() return safeList(sourceEvents).stream()
.map(DriverWorkingTimeEplEventMapper::toActivityPointEvent) .map(DriverWorkingTimeEplEventMapper::toActivityPointEvent)

View File

@ -114,6 +114,13 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"JAVA", "JAVA",
Set.of("UnifiedRuntimeEventBundle") Set.of("UnifiedRuntimeEventBundle")
)); ));
descriptors.add(new RuntimeProcessingModuleDescriptorDto(
DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING,
"Event evidence mixing",
"Applies source-aware runtime evidence rules before intervalization. Initially collapses same-event-key CARD_ACTIVITY/VU_ACTIVITY duplicates and leaves CARD_VEHICLES_USED unchanged.",
"JAVA",
Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto")
));
} }
descriptors.addAll(List.of( descriptors.addAll(List.of(
new RuntimeProcessingModuleDescriptorDto( new RuntimeProcessingModuleDescriptorDto(
@ -171,7 +178,8 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
"vehicleEvidencePaddingMinutes", "vehicleEvidencePaddingMinutes",
"includeActivityIntervals", "includeActivityIntervals",
"includeDrivingIntervals", "includeDrivingIntervals",
"includePartitionDebug" "includePartitionDebug",
"eventMixingMode"
); );
} }

View File

@ -114,7 +114,7 @@ public class VehicleUnitXmlExtractionService {
continue; continue;
} }
builder.vehicleUsageIntervals.add(new ExtractedCardVehicleUsageInterval( builder.addVehicleUsageInterval(new ExtractedCardVehicleUsageInterval(
"VUIW-" + (i + 1), "VUIW-" + (i + 1),
from, from,
to, to,
@ -124,9 +124,9 @@ public class VehicleUnitXmlExtractionService {
vehicleContext.vehicle() == null ? null : vehicleContext.vehicle().vehicleKey(), vehicleContext.vehicle() == null ? null : vehicleContext.vehicle().vehicleKey(),
path path
)); ));
vuCardIwIntervals.add(new VuCardIwInterval( addVuCardIwInterval(vuCardIwIntervals, new VuCardIwInterval(
driverKey, driverKey,
normalizeToken(text(record, "cardSlotNumber")), normalizeVuSlot(text(record, "cardSlotNumber")),
from, from,
to, to,
path path
@ -270,7 +270,7 @@ public class VehicleUnitXmlExtractionService {
parsedChanges.add(new ActivityChange( parsedChanges.add(new ActivityChange(
from, from,
normalizeActivity(text(change, "activity")), normalizeActivity(text(change, "activity")),
normalizeToken(text(change, "slot")), normalizeVuSlot(text(change, "slot")),
normalizeToken(text(change, "cardStatus")), normalizeToken(text(change, "cardStatus")),
normalizeToken(text(change, "drivingStatus")), normalizeToken(text(change, "drivingStatus")),
dayPath + "/activityChangeInfos[" + (changeIndex + 1) + "]" dayPath + "/activityChangeInfos[" + (changeIndex + 1) + "]"
@ -967,11 +967,12 @@ public class VehicleUnitXmlExtractionService {
String explicitSlot, String explicitSlot,
List<VuCardIwInterval> vuCardIwIntervals List<VuCardIwInterval> vuCardIwIntervals
) { ) {
String normalizedExplicitSlot = normalizeVuSlot(explicitSlot);
if (explicitDriverKey != null) { if (explicitDriverKey != null) {
return List.of(new DriverAssignment(explicitDriverKey, explicitSlot)); return List.of(new DriverAssignment(explicitDriverKey, normalizedExplicitSlot));
} }
return vuCardIwIntervals.stream() return vuCardIwIntervals.stream()
.filter(iw -> explicitSlot == null || explicitSlot.equals(iw.slot())) .filter(iw -> normalizedExplicitSlot == null || normalizedExplicitSlot.equals(iw.slot()))
.filter(iw -> iw.covers(occurredAt)) .filter(iw -> iw.covers(occurredAt))
.map(iw -> new DriverAssignment(iw.driverKey(), iw.slot())) .map(iw -> new DriverAssignment(iw.driverKey(), iw.slot()))
.toList(); .toList();
@ -985,6 +986,22 @@ public class VehicleUnitXmlExtractionService {
return List.copyOf(unique.values()); return List.copyOf(unique.values());
} }
private void addVuCardIwInterval(List<VuCardIwInterval> intervals, VuCardIwInterval candidate) {
boolean alreadyPresent = intervals.stream().anyMatch(existing ->
sameVuCardIwInterval(existing, candidate)
);
if (!alreadyPresent) {
intervals.add(candidate);
}
}
private boolean sameVuCardIwInterval(VuCardIwInterval existing, VuCardIwInterval candidate) {
return java.util.Objects.equals(existing.driverKey(), candidate.driverKey())
&& java.util.Objects.equals(existing.slot(), candidate.slot())
&& java.util.Objects.equals(existing.from(), candidate.from())
&& java.util.Objects.equals(existing.to(), candidate.to());
}
private String driverKeyFromCardNode(Element node, String basePath) { private String driverKeyFromCardNode(Element node, String basePath) {
String cardNation = text(node, basePath + "/cardIssuingMemberState"); String cardNation = text(node, basePath + "/cardIssuingMemberState");
String cardNumber = driverKeyFactory.canonicalCardNumber(joinCardNumber(node, basePath + "/cardNumber")); String cardNumber = driverKeyFactory.canonicalCardNumber(joinCardNumber(node, basePath + "/cardNumber"));
@ -1073,6 +1090,18 @@ public class VehicleUnitXmlExtractionService {
return value.trim().toUpperCase().replace('-', '_').replace(' ', '_'); return value.trim().toUpperCase().replace('-', '_').replace(' ', '_');
} }
private String normalizeVuSlot(String value) {
String normalized = normalizeToken(value);
if (normalized == null) {
return null;
}
return switch (normalized) {
case "0", "DRIVER" -> "DRIVER";
case "1", "CO_DRIVER", "CODRIVER" -> "CO_DRIVER";
default -> normalized;
};
}
private String mapPlaceEntryType(String entryType) { private String mapPlaceEntryType(String entryType) {
String normalized = normalizeToken(entryType); String normalized = normalizeToken(entryType);
if (normalized == null) { if (normalized == null) {
@ -1200,6 +1229,27 @@ public class VehicleUnitXmlExtractionService {
} }
} }
private void addVehicleUsageInterval(ExtractedCardVehicleUsageInterval candidate) {
boolean alreadyPresent = vehicleUsageIntervals.stream().anyMatch(existing ->
sameVehicleUsageInterval(existing, candidate)
);
if (!alreadyPresent) {
vehicleUsageIntervals.add(candidate);
}
}
private boolean sameVehicleUsageInterval(
ExtractedCardVehicleUsageInterval existing,
ExtractedCardVehicleUsageInterval candidate
) {
return java.util.Objects.equals(existing.from(), candidate.from())
&& java.util.Objects.equals(existing.to(), candidate.to())
&& java.util.Objects.equals(existing.odometerBeginKm(), candidate.odometerBeginKm())
&& java.util.Objects.equals(existing.odometerEndKm(), candidate.odometerEndKm())
&& java.util.Objects.equals(existing.registrationKey(), candidate.registrationKey())
&& java.util.Objects.equals(existing.vehicleKey(), candidate.vehicleKey());
}
private DriverExtractionSession build() { private DriverExtractionSession build() {
return new DriverExtractionSession( return new DriverExtractionSession(
driverKey, driverKey,

View File

@ -0,0 +1,467 @@
package at.procon.eventhub.processing.eventprocessing.mixing;
import static org.assertj.core.api.Assertions.assertThat;
import at.procon.eventhub.dto.DriverCardRefDto;
import at.procon.eventhub.dto.DriverRefDto;
import at.procon.eventhub.dto.EventDetailsDto;
import at.procon.eventhub.dto.EventDomain;
import at.procon.eventhub.dto.EventHubEventDto;
import at.procon.eventhub.dto.EventHubPackageRequest;
import at.procon.eventhub.dto.EventLifecycle;
import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.dto.EventType;
import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.dto.VehicleRefDto;
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
class RuntimeEventMixingServiceTest {
private final RuntimeEventMixingService service = new RuntimeEventMixingService();
@Test
void suppressesVuActivityDuplicateFromActivityTimelineWhenCardActivityHasSameEventKey() {
EventHubEventDto card = activity("CARD_ACTIVITY", "DRIVER_CARD", "TACHOGRAPH:CARD_ACTIVITY:1:START");
EventHubEventDto vu = activity("VU_ACTIVITY", "VEHICLE_UNIT", "TACHOGRAPH:VU_ACTIVITY:2:START");
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.rawEvents()).hasSize(2);
assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_ACTIVITY:1:START");
assertThat(mixed.driverPartitionEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_ACTIVITY:1:START");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:VU_ACTIVITY:2:START");
assertThat(mixed.eventMixingDecisions()).hasSize(1);
assertThat(mixed.eventMixingDecisions().getFirst().ruleId())
.isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY);
}
@Test
void keepsVuActivityWhenNoMatchingCardActivityExists() {
EventHubEventDto vu = activity("VU_ACTIVITY", "VEHICLE_UNIT", "TACHOGRAPH:VU_ACTIVITY:2:START");
RuntimeMixedEventBundle mixed = service.mix(List.of(vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:VU_ACTIVITY:2:START");
assertThat(mixed.suppressedEvents()).isEmpty();
assertThat(mixed.eventMixingDecisions()).isEmpty();
}
@Test
void suppressesTachographFileSessionVuActivityDuplicateWhenNoExtractionCodeIsPresent() {
EventHubEventDto card = fileSessionActivity(
"DRIVER_CARD",
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:ACTIVITY:card-interval-1:START:2026-04-01T00:00:00Z"
);
EventHubEventDto vu = fileSessionActivity(
"VEHICLE_UNIT",
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:ACTIVITY:vu-interval-1:START:2026-04-01T00:00:00Z"
);
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:ACTIVITY:card-interval-1:START:2026-04-01T00:00:00Z");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:ACTIVITY:vu-interval-1:START:2026-04-01T00:00:00Z");
assertThat(mixed.eventMixingDecisions()).hasSize(1);
assertThat(mixed.eventMixingDecisions().getFirst().secondaryExtractionCodes())
.containsExactly("VU_ACTIVITY");
}
@Test
void keepsAllCardVehicleUsedAndIwCycleEventsForVehicleUsageProcessing() {
EventHubEventDto cardVehicleUsed = cardUsage(
"CARD_VEHICLES_USED",
"DRIVER_CARD",
"TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT",
EventType.CARD_INSERTED,
EventLifecycle.INSERT
);
EventHubEventDto iwCycle = cardUsage(
"IW_CYCLE",
"VEHICLE_UNIT",
"TACHOGRAPH:IW_CYCLE:20:INSERT",
EventType.CARD_INSERTED,
EventLifecycle.INSERT
);
RuntimeMixedEventBundle mixed = service.mix(List.of(cardVehicleUsed, iwCycle), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.vehicleUsageEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT", "TACHOGRAPH:IW_CYCLE:20:INSERT");
assertThat(mixed.suppressedEvents()).isEmpty();
}
@Test
void suppressesVuPositionDuplicateFromSupportEvidenceAndCopiesVehicleIdentityToCardPrimary() {
EventHubEventDto card = supportEvidence(
"CARD_POSITION",
"DRIVER_CARD",
"TACHOGRAPH:CARD_POSITION:1",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT,
false
);
EventHubEventDto vu = supportEvidence(
"VU_POSITION",
"VEHICLE_UNIT",
"TACHOGRAPH:VU_POSITION:2",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT,
true
);
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_POSITION:1");
assertThat(mixed.supportEvidenceEvents().getFirst().vehicleRef().vin())
.isEqualTo("WDB9634031L123456");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:VU_POSITION:2");
assertThat(mixed.eventMixingDecisions()).hasSize(1);
assertThat(mixed.eventMixingDecisions().getFirst().ruleId())
.isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY);
assertThat(mixed.eventMixingDecisions().getFirst().channel())
.isEqualTo("SUPPORT_EVIDENCE");
}
@Test
void suppressesVuPlaceAndBorderDuplicateFromSupportEvidence() {
EventHubEventDto cardPlace = supportEvidence(
"CARD_PLACE",
"DRIVER_CARD",
"TACHOGRAPH:CARD_PLACE:1",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.SNAPSHOT,
false
);
EventHubEventDto vuPlace = supportEvidence(
"VU_PLACE",
"VEHICLE_UNIT",
"TACHOGRAPH:VU_PLACE:2",
EventDomain.PLACE,
EventType.WORKING_DAY_PLACE_RECORDED,
EventLifecycle.SNAPSHOT,
true
);
EventHubEventDto cardBorder = supportEvidence(
"CARD_BORDER_CROSSING",
"DRIVER_CARD",
"TACHOGRAPH:CARD_BORDER_CROSSING:3",
EventDomain.BORDER_CROSSING,
EventType.BORDER_OUTBOUND,
EventLifecycle.OUTBOUND,
false
);
EventHubEventDto vuBorder = supportEvidence(
"VU_BORDER_CROSSING",
"VEHICLE_UNIT",
"TACHOGRAPH:VU_BORDER_CROSSING:4",
EventDomain.BORDER_CROSSING,
EventType.BORDER_OUTBOUND,
EventLifecycle.OUTBOUND,
true
);
RuntimeMixedEventBundle mixed = service.mix(List.of(cardPlace, vuPlace, cardBorder, vuBorder), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:CARD_BORDER_CROSSING:3", "TACHOGRAPH:CARD_PLACE:1");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH:VU_BORDER_CROSSING:4", "TACHOGRAPH:VU_PLACE:2");
assertThat(mixed.eventMixingDecisions()).hasSize(2);
}
@Test
void suppressesTachographFileSessionVuPositionDuplicateWhenNoExtractionCodeIsPresent() {
EventHubEventDto card = fileSessionSupportEvidence(
"DRIVER_CARD",
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-position-1:SNAPSHOT:2026-04-01T00:00:00Z",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT,
false
);
EventHubEventDto vu = fileSessionSupportEvidence(
"VEHICLE_UNIT",
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-position-1:SNAPSHOT:2026-04-01T00:00:00Z",
EventDomain.POSITION,
EventType.POSITION_RECORDED,
EventLifecycle.SNAPSHOT,
true
);
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-position-1:SNAPSHOT:2026-04-01T00:00:00Z");
assertThat(mixed.supportEvidenceEvents().getFirst().vehicleRef().vin())
.isEqualTo("WDB9634031L123456");
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
.containsExactly("TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-position-1:SNAPSHOT:2026-04-01T00:00:00Z");
assertThat(mixed.eventMixingDecisions()).hasSize(1);
assertThat(mixed.eventMixingDecisions().getFirst().secondaryExtractionCodes())
.containsExactly("VU_POSITION");
}
private EventHubEventDto activity(String extractionCode, String sourceKind, String externalId) {
ObjectNode raw = baseRaw(extractionCode, sourceKind);
raw.put("activityType", "BREAK_REST");
raw.put("cardSlot", "DRIVER");
raw.put("cardStatus", "INSERTED");
raw.put("drivingStatus", "SINGLE");
raw.put("startedAt", "2026-03-31T15:43:00Z");
raw.put("endedAt", "2026-04-01T00:00:00Z");
raw.put("intervalId", "TACHOGRAPH:" + extractionCode + ":1");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
attributes.put("cardSlot", "DRIVER");
attributes.put("cardStatus", "INSERTED");
attributes.put("drivingStatus", "SINGLE");
return event(
externalId,
sourceKind,
EventDomain.DRIVER_ACTIVITY,
EventType.BREAK_REST,
EventLifecycle.START,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
new EventDetailsDto("DRIVER_ACTIVITY", attributes),
payload
);
}
private EventHubEventDto supportEvidence(
String extractionCode,
String sourceKind,
String externalId,
EventDomain domain,
EventType type,
EventLifecycle lifecycle,
boolean withVin
) {
ObjectNode raw = baseRaw(extractionCode, sourceKind);
raw.put("latitude", "48.208174");
raw.put("longitude", "16.373819");
raw.put("odometerM", "123000");
raw.put("country", "AT");
raw.put("region", "W");
raw.put("countryFrom", "AT");
raw.put("countryTo", "DE");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
attributes.put("country", "AT");
attributes.put("region", "W");
attributes.put("countryFrom", "AT");
attributes.put("countryTo", "DE");
VehicleRefDto vehicleRef = withVin
? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
: new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"));
return event(
externalId,
sourceKind,
domain,
type,
lifecycle,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
new EventDetailsDto(domain.name(), attributes),
payload,
vehicleRef
);
}
private EventHubEventDto fileSessionActivity(String sourceKind, String externalId) {
ObjectNode raw = baseRawWithoutExtraction(sourceKind);
raw.put("activityType", "BREAK_REST");
raw.put("cardSlot", "DRIVER");
raw.put("cardStatus", "INSERTED");
raw.put("drivingStatus", "SINGLE");
raw.put("startedAt", "2026-03-31T15:43:00Z");
raw.put("endedAt", "2026-04-01T00:00:00Z");
raw.put("intervalId", "TACHOGRAPH_FILE_SESSION:ACTIVITY:1");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
attributes.put("cardSlot", "DRIVER");
attributes.put("cardStatus", "INSERTED");
attributes.put("drivingStatus", "SINGLE");
return event(
externalId,
sourceKind,
EventDomain.DRIVER_ACTIVITY,
EventType.BREAK_REST,
EventLifecycle.START,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
new EventDetailsDto("DRIVER_ACTIVITY", attributes),
payload
);
}
private EventHubEventDto fileSessionSupportEvidence(
String sourceKind,
String externalId,
EventDomain domain,
EventType type,
EventLifecycle lifecycle,
boolean withVin
) {
ObjectNode raw = baseRawWithoutExtraction(sourceKind);
raw.put("latitude", "48.208174");
raw.put("longitude", "16.373819");
raw.put("odometerM", "123000");
raw.put("country", "AT");
raw.put("region", "W");
raw.put("countryFrom", "AT");
raw.put("countryTo", "DE");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
attributes.put("country", "AT");
attributes.put("region", "W");
attributes.put("countryFrom", "AT");
attributes.put("countryTo", "DE");
VehicleRefDto vehicleRef = withVin
? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
: new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"));
return event(
externalId,
sourceKind,
domain,
type,
lifecycle,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
new EventDetailsDto(domain.name(), attributes),
payload,
vehicleRef
);
}
private EventHubEventDto cardUsage(
String extractionCode,
String sourceKind,
String externalId,
EventType eventType,
EventLifecycle lifecycle
) {
ObjectNode raw = baseRaw(extractionCode, sourceKind);
raw.put("intervalId", "TACHOGRAPH:" + extractionCode + ":1");
ObjectNode payload = JsonNodeFactory.instance.objectNode();
payload.set("raw", raw);
return event(
externalId,
sourceKind,
EventDomain.DRIVER_CARD,
eventType,
lifecycle,
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
null,
payload
);
}
private ObjectNode baseRaw(String extractionCode, String sourceKind) {
ObjectNode raw = JsonNodeFactory.instance.objectNode();
raw.put("sourceRowId", extractionCode + "-1");
raw.put("sourceKind", sourceKind);
raw.put("extractionCode", extractionCode);
raw.put("driverKey", "1:10000000198490");
raw.put("registrationKey", "1:LL-158TE");
return raw;
}
private ObjectNode baseRawWithoutExtraction(String sourceKind) {
ObjectNode raw = JsonNodeFactory.instance.objectNode();
raw.put("sourceRowId", sourceKind + "-1");
raw.put("sourceKind", sourceKind);
raw.put("driverKey", "1:10000000198490");
raw.put("registrationKey", "1:LL-158TE");
return raw;
}
private EventHubEventDto event(
String externalId,
String sourceKind,
EventDomain domain,
EventType type,
EventLifecycle lifecycle,
OffsetDateTime occurredAt,
EventDetailsDto details,
ObjectNode payload
) {
return event(
externalId,
sourceKind,
domain,
type,
lifecycle,
occurredAt,
details,
payload,
new VehicleRefDto(null, null, null, new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
);
}
private EventHubEventDto event(
String externalId,
String sourceKind,
EventDomain domain,
EventType type,
EventLifecycle lifecycle,
OffsetDateTime occurredAt,
EventDetailsDto details,
ObjectNode payload,
VehicleRefDto vehicleRef
) {
return new EventHubEventDto(
UUID.randomUUID(),
externalId,
new DriverRefDto("1:10000000198490", new DriverCardRefDto("1", 1, "10000000198490")),
vehicleRef,
occurredAt,
null,
occurredAt,
domain,
type,
lifecycle,
null,
null,
details,
null,
payload,
false,
packageInfo(sourceKind, domain, occurredAt.toLocalDate())
);
}
private EventHubPackageRequest packageInfo(String sourceKind, EventDomain domain, LocalDate businessDate) {
EventSourceDto source = new EventSourceDto("TACHOGRAPH", sourceKind, "TACHOGRAPH_" + sourceKind, null, null, null);
OffsetDateTime from = businessDate.atStartOfDay().atOffset(java.time.ZoneOffset.UTC);
OffsetDateTime to = businessDate.plusDays(1).atStartOfDay().atOffset(java.time.ZoneOffset.UTC);
return new EventHubPackageRequest(
"default",
source,
null,
ImportScopeDto.tenantAll(from, to),
domain.name(),
businessDate,
source.stableKey() + ":" + domain.name() + ":" + businessDate
);
}
}

View File

@ -140,6 +140,60 @@ class VehicleUnitXmlExtractionServiceTest {
assertThat(session.warnings()).isEmpty(); assertThat(session.warnings()).isEmpty();
} }
@Test
void matchesNumericVuCardSlotNumbersToDriverActivitySlots() throws Exception {
TachographFileSession session = service.extract(
new TachographXmlParser.ParsedTachographXml(document(VehicleUnitXmlSamples.vehicleUnitXmlWithNumericCardSlots()), "VehicleUnit"),
new TachographFileSessionMetadata(
"default",
"legalrequirements-vehicleunit",
"sample-vu",
"sample-vu.ddd",
"abc",
10,
"42",
"def",
false,
null
),
Instant.now(),
Instant.now().plus(4, ChronoUnit.HOURS)
);
DriverExtractionSession driver = session.driversByKey().get("12:12345678901200");
assertThat(driver).isNotNull();
assertThat(driver.cardActivityIntervals()).hasSize(2);
assertThat(driver.cardActivityIntervals()).extracting("slot").containsOnly("DRIVER");
assertThat(session.warnings()).isEmpty();
}
@Test
void deduplicatesEquivalentVehicleUnitVehicleUsageIntervals() throws Exception {
TachographFileSession session = service.extract(
new TachographXmlParser.ParsedTachographXml(document(VehicleUnitXmlSamples.vehicleUnitXmlWithDuplicateVehicleUsageInterval()), "VehicleUnit"),
new TachographFileSessionMetadata(
"default",
"legalrequirements-vehicleunit",
"sample-vu",
"sample-vu.ddd",
"abc",
10,
"42",
"def",
false,
null
),
Instant.now(),
Instant.now().plus(4, ChronoUnit.HOURS)
);
DriverExtractionSession firstDriver = session.driversByKey().get("12:12345678901200");
assertThat(firstDriver).isNotNull();
assertThat(firstDriver.cardVehicleUsageIntervals()).hasSize(1);
assertThat(firstDriver.cardVehicleUsageIntervals().get(0).from()).isEqualTo(OffsetDateTime.parse("2026-04-01T08:00:00Z"));
assertThat(firstDriver.cardVehicleUsageIntervals().get(0).to()).isEqualTo(OffsetDateTime.parse("2026-04-01T11:00:00Z"));
}
@Test @Test
void keepsOpenVuCardUsageRecordWithoutClosingItAtDownloadPeriodEnd() throws Exception { void keepsOpenVuCardUsageRecordWithoutClosingItAtDownloadPeriodEnd() throws Exception {
TachographFileSession session = service.extract( TachographFileSession session = service.extract(

View File

@ -314,4 +314,41 @@ final class VehicleUnitXmlSamples {
</VehicleUnit> </VehicleUnit>
"""; """;
} }
static String vehicleUnitXmlWithNumericCardSlots() {
return vehicleUnitXmlWithActivityDownloadTime()
.replace("<cardSlotNumber>DRIVER</cardSlotNumber>", "<cardSlotNumber>0</cardSlotNumber>");
}
static String vehicleUnitXmlWithDuplicateVehicleUsageInterval() {
String duplicateRecord = """
<vuCardIWRecords>
<cardHolderName>
<holderSurname><name>Muster</name></holderSurname>
<holderFirstNames><name>Max</name></holderFirstNames>
</cardHolderName>
<fullCardNumber>
<cardType>DRIVER_CARD</cardType>
<cardIssuingMemberState>12</cardIssuingMemberState>
<cardNumber>
<driverIdentification>123456789012</driverIdentification>
<cardReplacementIndex>0</cardReplacementIndex>
<cardRenewalIndex>0</cardRenewalIndex>
</cardNumber>
<generation>2</generation>
</fullCardNumber>
<cardExpiryDate>2031-04-01T00:00:00Z</cardExpiryDate>
<cardInsertionTime>2026-04-01T08:00:00Z</cardInsertionTime>
<vehicleOdometerValueAtInsertion>1000</vehicleOdometerValueAtInsertion>
<cardSlotNumber>DRIVER</cardSlotNumber>
<cardWithdrawalTime>2026-04-01T11:00:00Z</cardWithdrawalTime>
<vehicleOdometerValueAtWithdrawal>1100</vehicleOdometerValueAtWithdrawal>
<manualInputFlag>NO_ENTRY</manualInputFlag>
</vuCardIWRecords>
""";
return vehicleUnitXml().replace(
duplicateRecord,
duplicateRecord + duplicateRecord
);
}
} }