Compare commits
No commits in common. "b4289abea50eedefaf51eaf81f1e7518f2745cf7" and "ce81bfe8689fd8be1e838099466ccef6511914d5" have entirely different histories.
b4289abea5
...
ce81bfe868
|
|
@ -5,4 +5,4 @@ target/
|
|||
.project
|
||||
.settings/
|
||||
.DS_Store
|
||||
logs/
|
||||
logs/eventhub-ingestion-service.log
|
||||
|
|
|
|||
|
|
@ -1,33 +1,17 @@
|
|||
# Patch: Tachograph file-session support for runtime event mixing
|
||||
# EventHub fix-list patch
|
||||
|
||||
This patch extends the existing `event-evidence-mixing` implementation so it also supports events produced by uploaded tachograph file sessions.
|
||||
This patch implements the requested remaining items from the fix list, excluding build verification and SQL Server 2008 SQL rewriting.
|
||||
|
||||
## What changed
|
||||
## Apply notes
|
||||
|
||||
- `RuntimeEventMixingService` now treats `TACHOGRAPH_FILE_SESSION` and `COMPOSITE_TACHOGRAPH_FILE_SESSION` events as tachograph runtime evidence.
|
||||
- File-session events do not always have a DB-style `extractionCode` such as `CARD_ACTIVITY` or `VU_ACTIVITY` in the raw payload.
|
||||
- The mixing service now derives the equivalent extraction code from:
|
||||
- source kind: `DRIVER_CARD` or `VEHICLE_UNIT`
|
||||
- event domain: `DRIVER_ACTIVITY`, `POSITION`, `PLACE`, `BORDER_CROSSING`, `DRIVER_CARD`, etc.
|
||||
- file-session external id / source-package kind.
|
||||
1. Copy the files from this archive into the project root.
|
||||
2. Delete the old migration files listed in `DELETE_FILES.txt`.
|
||||
3. Run the test suite locally with Maven/Java 21.
|
||||
|
||||
## Supported derived mappings
|
||||
## Main changes
|
||||
|
||||
| File-session event | Driver-card source | Vehicle-unit source |
|
||||
|---|---|---|
|
||||
| `DRIVER_ACTIVITY` | `CARD_ACTIVITY` | `VU_ACTIVITY` |
|
||||
| `POSITION` | `CARD_POSITION` | `VU_POSITION` |
|
||||
| `PLACE` | `CARD_PLACE` | `VU_PLACE` |
|
||||
| `BORDER_CROSSING` | `CARD_BORDER_CROSSING` | `VU_BORDER_CROSSING` |
|
||||
| `DRIVER_CARD` insert/withdraw | `CARD_VEHICLES_USED` | `IW_CYCLE` |
|
||||
|
||||
## Important behavior
|
||||
|
||||
- Duplicate file-session `CARD_ACTIVITY` / `VU_ACTIVITY` events are mixed the same way as persistent tachograph DB events.
|
||||
- Duplicate file-session position/place/border events are also mixed the same way as persistent tachograph DB support evidence.
|
||||
- `CARD_VEHICLES_USED` and `IW_CYCLE` are still not mixed; they remain accepted for separate vehicle-usage processing.
|
||||
|
||||
## Modified files
|
||||
|
||||
- `src/main/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingService.java`
|
||||
- `src/test/java/at/procon/eventhub/processing/eventprocessing/mixing/RuntimeEventMixingServiceTest.java`
|
||||
- Renumbered Flyway migrations to remove duplicate `V9` and `V10` versions.
|
||||
- Removed the duplicated Timescale/Event source record migration.
|
||||
- Switched local Docker Compose DB from plain PostgreSQL to a TimescaleDB/PostGIS-capable image.
|
||||
- Added normalized raw tachograph payload metadata for DB-extracted EventHub events.
|
||||
- Added tests for Flyway version uniqueness and tachograph DB mapper → timeline reconstruction metadata.
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -1,30 +0,0 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.mixing;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.List;
|
||||
|
||||
public record RuntimeEventMixingDecisionDto(
|
||||
String ruleId,
|
||||
String equivalenceType,
|
||||
String eventKey,
|
||||
String decision,
|
||||
String channel,
|
||||
String primaryExternalSourceEventId,
|
||||
String primaryExtractionCode,
|
||||
List<String> secondaryExternalSourceEventIds,
|
||||
List<String> secondaryExtractionCodes,
|
||||
OffsetDateTime occurredAt,
|
||||
String eventDomain,
|
||||
String eventType,
|
||||
String lifecycle,
|
||||
String reason
|
||||
) {
|
||||
public RuntimeEventMixingDecisionDto {
|
||||
secondaryExternalSourceEventIds = secondaryExternalSourceEventIds == null
|
||||
? List.of()
|
||||
: List.copyOf(secondaryExternalSourceEventIds);
|
||||
secondaryExtractionCodes = secondaryExtractionCodes == null
|
||||
? List.of()
|
||||
: List.copyOf(secondaryExtractionCodes);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,691 +0,0 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.mixing;
|
||||
|
||||
import at.procon.eventhub.dto.EventDomain;
|
||||
import at.procon.eventhub.dto.EventHubEventDto;
|
||||
import at.procon.eventhub.dto.EventLifecycle;
|
||||
import at.procon.eventhub.dto.GeoPointDto;
|
||||
import at.procon.eventhub.dto.VehicleRefDto;
|
||||
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
|
||||
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
|
||||
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class RuntimeEventMixingService {
|
||||
|
||||
public static final String MODE_OFF = "OFF";
|
||||
public static final String MODE_TACHOGRAPH_SAME_SOURCE = "TACHOGRAPH_SAME_SOURCE";
|
||||
public static final String MODE_FULL = "FULL";
|
||||
|
||||
public static final String RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY =
|
||||
"tachograph.activity.card-vu.same-event-key";
|
||||
public static final String RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY =
|
||||
"tachograph.activity.card-vu.compatible-activity-key";
|
||||
public static final String RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY =
|
||||
"tachograph.support.card-vu.same-event-key";
|
||||
public static final String RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY =
|
||||
"tachograph.support.card-vu.compatible-support-key";
|
||||
|
||||
public RuntimeMixedEventBundle mix(List<EventHubEventDto> events, String requestedMode) {
|
||||
List<EventHubEventDto> rawEvents = sort(events);
|
||||
String mode = normalizeMode(requestedMode);
|
||||
if (MODE_OFF.equals(mode)) {
|
||||
return unchanged(rawEvents, "Runtime event mixing is disabled by eventMixingMode=OFF.");
|
||||
}
|
||||
|
||||
MixingState state = new MixingState(rawEvents);
|
||||
applyTachographCardVuActivityMixing(state);
|
||||
applyTachographCardVuSupportEvidenceMixing(state);
|
||||
|
||||
List<EventHubEventDto> driverPartitionEvents = rawEvents.stream()
|
||||
.filter(event -> !state.isSuppressed(event))
|
||||
.map(state::effectiveEvent)
|
||||
.toList();
|
||||
List<EventHubEventDto> activityTimelineEvents = driverPartitionEvents.stream()
|
||||
.filter(RuntimeEventMixingService::isDriverActivityPoint)
|
||||
.toList();
|
||||
|
||||
// Vehicle-usage events are intentionally not mixed here. CARD_VEHICLES_USED and IW_CYCLE
|
||||
// are kept as separate input evidence because they must be processed by their own rules later.
|
||||
List<EventHubEventDto> vehicleUsageEvents = rawEvents.stream()
|
||||
.filter(RuntimeEventMixingService::isDriverCardUsagePoint)
|
||||
.toList();
|
||||
|
||||
List<EventHubEventDto> supportEvidenceEvents = driverPartitionEvents.stream()
|
||||
.filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event))
|
||||
.toList();
|
||||
|
||||
List<String> notes = new ArrayList<>();
|
||||
notes.add("Runtime event mixing inspected " + rawEvents.size() + " event(s).");
|
||||
notes.add("Runtime event mixing suppressed " + state.suppressedEvents().size()
|
||||
+ " duplicate source event(s) from activity/support evidence channels.");
|
||||
notes.add("Runtime event mixing keeps CARD_POSITION, CARD_PLACE, and CARD_BORDER_CROSSING as primary when matching VU support evidence describes the same semantic event.");
|
||||
notes.add("Runtime event mixing kept all CARD_VEHICLES_USED and IW_CYCLE card-usage point events unchanged for vehicle-usage processing.");
|
||||
return new RuntimeMixedEventBundle(
|
||||
rawEvents,
|
||||
driverPartitionEvents,
|
||||
activityTimelineEvents,
|
||||
vehicleUsageEvents,
|
||||
supportEvidenceEvents,
|
||||
state.suppressedEvents(),
|
||||
state.decisions(),
|
||||
notes,
|
||||
state.warnings()
|
||||
);
|
||||
}
|
||||
|
||||
private RuntimeMixedEventBundle unchanged(List<EventHubEventDto> rawEvents, String note) {
|
||||
return new RuntimeMixedEventBundle(
|
||||
rawEvents,
|
||||
rawEvents,
|
||||
rawEvents.stream().filter(RuntimeEventMixingService::isDriverActivityPoint).toList(),
|
||||
rawEvents.stream().filter(RuntimeEventMixingService::isDriverCardUsagePoint).toList(),
|
||||
rawEvents.stream().filter(event -> !isDriverActivityPoint(event) && !isDriverCardUsagePoint(event)).toList(),
|
||||
List.of(),
|
||||
List.of(),
|
||||
List.of(note),
|
||||
List.of()
|
||||
);
|
||||
}
|
||||
|
||||
private void applyTachographCardVuActivityMixing(MixingState state) {
|
||||
LinkedHashMap<String, List<EventHubEventDto>> exactGroups = new LinkedHashMap<>();
|
||||
for (EventHubEventDto event : state.rawEvents()) {
|
||||
if (!isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) {
|
||||
continue;
|
||||
}
|
||||
exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>())
|
||||
.add(event);
|
||||
}
|
||||
for (Map.Entry<String, List<EventHubEventDto>> entry : exactGroups.entrySet()) {
|
||||
fuseCardAndVuDuplicates(
|
||||
state,
|
||||
entry.getKey(),
|
||||
"EXACT_EVENT_KEY",
|
||||
RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY,
|
||||
"ACTIVITY_TIMELINE",
|
||||
entry.getValue(),
|
||||
"CARD_ACTIVITY",
|
||||
"VU_ACTIVITY",
|
||||
"CARD_ACTIVITY and VU_ACTIVITY describe the same driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization."
|
||||
);
|
||||
}
|
||||
|
||||
LinkedHashMap<String, List<EventHubEventDto>> compatibleGroups = new LinkedHashMap<>();
|
||||
for (EventHubEventDto event : state.rawEvents()) {
|
||||
if (state.isSuppressed(event) || !isDriverActivityPoint(event) || !isTachographCardOrVuActivity(event)) {
|
||||
continue;
|
||||
}
|
||||
compatibleGroups.computeIfAbsent(compatibleActivityKey(event), ignored -> new ArrayList<>())
|
||||
.add(event);
|
||||
}
|
||||
for (Map.Entry<String, List<EventHubEventDto>> entry : compatibleGroups.entrySet()) {
|
||||
fuseCardAndVuDuplicates(
|
||||
state,
|
||||
entry.getKey(),
|
||||
"COMPATIBLE_ACTIVITY_KEY",
|
||||
RULE_TACHOGRAPH_CARD_VU_ACTIVITY_COMPATIBLE_KEY,
|
||||
"ACTIVITY_TIMELINE",
|
||||
entry.getValue(),
|
||||
"CARD_ACTIVITY",
|
||||
"VU_ACTIVITY",
|
||||
"CARD_ACTIVITY and VU_ACTIVITY describe a compatible driver activity point. CARD_ACTIVITY is kept as primary for the activity timeline; VU_ACTIVITY is suppressed from activity intervalization."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void applyTachographCardVuSupportEvidenceMixing(MixingState state) {
|
||||
LinkedHashMap<String, List<EventHubEventDto>> exactGroups = new LinkedHashMap<>();
|
||||
for (EventHubEventDto event : state.rawEvents()) {
|
||||
if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) {
|
||||
continue;
|
||||
}
|
||||
exactGroups.computeIfAbsent(RuntimeEventIdentityResolver.canonicalEventKey(event), ignored -> new ArrayList<>())
|
||||
.add(event);
|
||||
}
|
||||
for (Map.Entry<String, List<EventHubEventDto>> entry : exactGroups.entrySet()) {
|
||||
fuseCardAndVuSupportDuplicates(
|
||||
state,
|
||||
entry.getKey(),
|
||||
"EXACT_EVENT_KEY",
|
||||
RULE_TACHOGRAPH_CARD_VU_SUPPORT_SAME_EVENT_KEY,
|
||||
entry.getValue()
|
||||
);
|
||||
}
|
||||
|
||||
LinkedHashMap<String, List<EventHubEventDto>> compatibleGroups = new LinkedHashMap<>();
|
||||
for (EventHubEventDto event : state.rawEvents()) {
|
||||
if (state.isSuppressed(event) || !isTachographCardOrVuSupportEvidence(event)) {
|
||||
continue;
|
||||
}
|
||||
compatibleGroups.computeIfAbsent(compatibleSupportEvidenceKey(event), ignored -> new ArrayList<>())
|
||||
.add(event);
|
||||
}
|
||||
for (Map.Entry<String, List<EventHubEventDto>> entry : compatibleGroups.entrySet()) {
|
||||
fuseCardAndVuSupportDuplicates(
|
||||
state,
|
||||
entry.getKey(),
|
||||
"COMPATIBLE_SUPPORT_KEY",
|
||||
RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY,
|
||||
entry.getValue()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void fuseCardAndVuSupportDuplicates(
|
||||
MixingState state,
|
||||
String eventKey,
|
||||
String equivalenceType,
|
||||
String ruleId,
|
||||
List<EventHubEventDto> group
|
||||
) {
|
||||
Map.Entry<String, String> pair = cardVuSupportPair(group);
|
||||
if (pair == null) {
|
||||
return;
|
||||
}
|
||||
fuseCardAndVuDuplicates(
|
||||
state,
|
||||
eventKey,
|
||||
equivalenceType,
|
||||
ruleId,
|
||||
"SUPPORT_EVIDENCE",
|
||||
group,
|
||||
pair.getKey(),
|
||||
pair.getValue(),
|
||||
pair.getKey() + " and " + pair.getValue() + " describe the same support evidence event. "
|
||||
+ pair.getKey() + " is kept as primary support evidence; " + pair.getValue()
|
||||
+ " is suppressed from support-evidence normalization. Vehicle/VIN identity from the VU event is copied to the primary event when the card event has weaker vehicle identity."
|
||||
);
|
||||
}
|
||||
|
||||
private void fuseCardAndVuDuplicates(
|
||||
MixingState state,
|
||||
String eventKey,
|
||||
String equivalenceType,
|
||||
String ruleId,
|
||||
String channel,
|
||||
List<EventHubEventDto> group,
|
||||
String primaryExtractionCode,
|
||||
String secondaryExtractionCode,
|
||||
String reason
|
||||
) {
|
||||
List<EventHubEventDto> primaries = group.stream()
|
||||
.filter(event -> Objects.equals(primaryExtractionCode, extractionCode(event)))
|
||||
.sorted(eventComparator())
|
||||
.toList();
|
||||
List<EventHubEventDto> secondaries = group.stream()
|
||||
.filter(event -> Objects.equals(secondaryExtractionCode, extractionCode(event)))
|
||||
.sorted(eventComparator())
|
||||
.toList();
|
||||
if (primaries.isEmpty() || secondaries.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
EventHubEventDto primary = primaries.get(0);
|
||||
List<EventHubEventDto> newlySuppressed = secondaries.stream()
|
||||
.filter(event -> !state.isSuppressed(event))
|
||||
.toList();
|
||||
if (newlySuppressed.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
EventHubEventDto enrichedPrimary = enrichPrimaryVehicleRef(primary, newlySuppressed);
|
||||
if (enrichedPrimary != primary) {
|
||||
state.replace(primary, enrichedPrimary);
|
||||
}
|
||||
newlySuppressed.forEach(state::suppress);
|
||||
state.decisions().add(new RuntimeEventMixingDecisionDto(
|
||||
ruleId,
|
||||
equivalenceType,
|
||||
eventKey,
|
||||
"FUSED_PRIMARY_SELECTED",
|
||||
channel,
|
||||
primary.externalSourceEventId(),
|
||||
extractionCode(primary),
|
||||
newlySuppressed.stream().map(EventHubEventDto::externalSourceEventId).toList(),
|
||||
newlySuppressed.stream().map(RuntimeEventMixingService::extractionCode).toList(),
|
||||
primary.occurredAt(),
|
||||
primary.eventDomain() == null ? null : primary.eventDomain().name(),
|
||||
primary.eventType() == null ? null : primary.eventType().name(),
|
||||
primary.lifecycle() == null ? null : primary.lifecycle().name(),
|
||||
reason
|
||||
));
|
||||
}
|
||||
|
||||
private static EventHubEventDto enrichPrimaryVehicleRef(EventHubEventDto primary, List<EventHubEventDto> secondaries) {
|
||||
if (primary == null || secondaries == null || secondaries.isEmpty()) {
|
||||
return primary;
|
||||
}
|
||||
VehicleRefDto bestSecondary = secondaries.stream()
|
||||
.map(EventHubEventDto::vehicleRef)
|
||||
.filter(Objects::nonNull)
|
||||
.filter(VehicleRefDto::hasAnyReference)
|
||||
.filter(RuntimeEventMixingService::hasVehicleIdentity)
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
if (bestSecondary == null || !shouldEnrichVehicleRef(primary.vehicleRef(), bestSecondary)) {
|
||||
return primary;
|
||||
}
|
||||
VehicleRefDto merged = mergeVehicleRef(primary.vehicleRef(), bestSecondary);
|
||||
if (Objects.equals(primary.vehicleRef(), merged)) {
|
||||
return primary;
|
||||
}
|
||||
return new EventHubEventDto(
|
||||
primary.eventId(),
|
||||
primary.externalSourceEventId(),
|
||||
primary.driverRef(),
|
||||
merged,
|
||||
primary.occurredAt(),
|
||||
primary.receivedPartnerAt(),
|
||||
primary.receivedHubAt(),
|
||||
primary.eventDomain(),
|
||||
primary.eventType(),
|
||||
primary.lifecycle(),
|
||||
primary.odometerM(),
|
||||
primary.position(),
|
||||
primary.eventDetails(),
|
||||
primary.sourcePackageRef(),
|
||||
primary.payload(),
|
||||
primary.manualEntry(),
|
||||
primary.packageInfo()
|
||||
);
|
||||
}
|
||||
|
||||
private static boolean shouldEnrichVehicleRef(VehicleRefDto primary, VehicleRefDto secondary) {
|
||||
return secondary != null
|
||||
&& hasVehicleIdentity(secondary)
|
||||
&& (primary == null || !hasVehicleIdentity(primary));
|
||||
}
|
||||
|
||||
private static boolean hasVehicleIdentity(VehicleRefDto vehicleRef) {
|
||||
return vehicleRef != null
|
||||
&& (notBlank(vehicleRef.sourceVehicleEntityId()) || notBlank(vehicleRef.vin()));
|
||||
}
|
||||
|
||||
private static VehicleRefDto mergeVehicleRef(VehicleRefDto primary, VehicleRefDto secondary) {
|
||||
if (primary == null) {
|
||||
return secondary;
|
||||
}
|
||||
if (secondary == null) {
|
||||
return primary;
|
||||
}
|
||||
VehicleRegistrationRefDto registration = primary.vehicleRegistration() != null && primary.vehicleRegistration().hasValue()
|
||||
? primary.vehicleRegistration()
|
||||
: secondary.vehicleRegistration();
|
||||
return new VehicleRefDto(
|
||||
firstNonBlank(primary.sourceVehicleEntityId(), secondary.sourceVehicleEntityId()),
|
||||
firstNonBlank(primary.vin(), secondary.vin()),
|
||||
firstNonBlank(primary.sourceRegistrationEntityId(), secondary.sourceRegistrationEntityId()),
|
||||
registration
|
||||
);
|
||||
}
|
||||
|
||||
private static Map.Entry<String, String> cardVuSupportPair(List<EventHubEventDto> group) {
|
||||
if (group == null || group.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
Set<String> extractionCodes = group.stream()
|
||||
.map(RuntimeEventMixingService::extractionCode)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(java.util.stream.Collectors.toCollection(LinkedHashSet::new));
|
||||
if (extractionCodes.contains("CARD_POSITION") && extractionCodes.contains("VU_POSITION")) {
|
||||
return Map.entry("CARD_POSITION", "VU_POSITION");
|
||||
}
|
||||
if (extractionCodes.contains("CARD_PLACE") && extractionCodes.contains("VU_PLACE")) {
|
||||
return Map.entry("CARD_PLACE", "VU_PLACE");
|
||||
}
|
||||
if (extractionCodes.contains("CARD_BORDER_CROSSING") && extractionCodes.contains("VU_BORDER_CROSSING")) {
|
||||
return Map.entry("CARD_BORDER_CROSSING", "VU_BORDER_CROSSING");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static boolean isTachographCardOrVuActivity(EventHubEventDto event) {
|
||||
RuntimeEventSourceProfile profile = sourceProfile(event);
|
||||
return isTachographRuntimeSource(profile)
|
||||
&& (Objects.equals("CARD_ACTIVITY", profile.extractionCode())
|
||||
|| Objects.equals("VU_ACTIVITY", profile.extractionCode()));
|
||||
}
|
||||
|
||||
private static boolean isTachographCardOrVuSupportEvidence(EventHubEventDto event) {
|
||||
if (event == null || isDriverActivityPoint(event) || isDriverCardUsagePoint(event)) {
|
||||
return false;
|
||||
}
|
||||
RuntimeEventSourceProfile profile = sourceProfile(event);
|
||||
if (!isTachographRuntimeSource(profile)) {
|
||||
return false;
|
||||
}
|
||||
return switch (nullToEmpty(profile.extractionCode())) {
|
||||
case "CARD_POSITION", "VU_POSITION", "CARD_PLACE", "VU_PLACE", "CARD_BORDER_CROSSING", "VU_BORDER_CROSSING" -> true;
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
private static boolean isDriverActivityPoint(EventHubEventDto event) {
|
||||
return event != null
|
||||
&& event.eventDomain() == EventDomain.DRIVER_ACTIVITY
|
||||
&& (event.lifecycle() == EventLifecycle.START || event.lifecycle() == EventLifecycle.END)
|
||||
&& event.occurredAt() != null;
|
||||
}
|
||||
|
||||
private static boolean isDriverCardUsagePoint(EventHubEventDto event) {
|
||||
return event != null
|
||||
&& event.eventDomain() == EventDomain.DRIVER_CARD
|
||||
&& (event.lifecycle() == EventLifecycle.INSERT || event.lifecycle() == EventLifecycle.WITHDRAW)
|
||||
&& event.occurredAt() != null;
|
||||
}
|
||||
|
||||
private static RuntimeEventSourceProfile sourceProfile(EventHubEventDto event) {
|
||||
JsonNode raw = rawPayload(event);
|
||||
String sourceKind = firstNonBlank(text(raw, "sourceKind"), sourceKind(event));
|
||||
String extractionCode = firstNonBlank(
|
||||
text(raw, "extractionCode"),
|
||||
fileSessionExtractionCode(event, sourceKind),
|
||||
extractionCodeFromExternalSourceEventId(event)
|
||||
);
|
||||
String sourceSystem = firstNonBlank(
|
||||
text(raw, "sourceSystem"),
|
||||
sourceProvider(event),
|
||||
sourceSystemFromExternalSourceEventId(event)
|
||||
);
|
||||
if (sourceSystem == null && (extractionCode != null || isTachographFileSessionEvent(event))) {
|
||||
sourceSystem = "TACHOGRAPH";
|
||||
}
|
||||
return new RuntimeEventSourceProfile(
|
||||
normalizeUpper(sourceSystem),
|
||||
normalizeUpper(sourceKind),
|
||||
normalizeUpper(extractionCode)
|
||||
);
|
||||
}
|
||||
|
||||
private static String extractionCode(EventHubEventDto event) {
|
||||
return sourceProfile(event).extractionCode();
|
||||
}
|
||||
private static boolean isTachographRuntimeSource(RuntimeEventSourceProfile profile) {
|
||||
if (profile == null) {
|
||||
return false;
|
||||
}
|
||||
return switch (nullToEmpty(profile.sourceSystem())) {
|
||||
case "TACHOGRAPH", "TACHOGRAPH_FILE_SESSION", "COMPOSITE_TACHOGRAPH_FILE_SESSION" -> true;
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
private static String fileSessionExtractionCode(EventHubEventDto event, String sourceKind) {
|
||||
if (!isTachographFileSessionEvent(event)) {
|
||||
return null;
|
||||
}
|
||||
String normalizedSourceKind = normalizeUpper(sourceKind);
|
||||
if (normalizedSourceKind == null) {
|
||||
return null;
|
||||
}
|
||||
if (event != null && event.eventDomain() == EventDomain.DRIVER_ACTIVITY) {
|
||||
return switch (normalizedSourceKind) {
|
||||
case "DRIVER_CARD" -> "CARD_ACTIVITY";
|
||||
case "VEHICLE_UNIT" -> "VU_ACTIVITY";
|
||||
default -> null;
|
||||
};
|
||||
}
|
||||
if (event != null && event.eventDomain() == EventDomain.DRIVER_CARD) {
|
||||
return switch (normalizedSourceKind) {
|
||||
case "DRIVER_CARD" -> "CARD_VEHICLES_USED";
|
||||
case "VEHICLE_UNIT" -> "IW_CYCLE";
|
||||
default -> null;
|
||||
};
|
||||
}
|
||||
if (event == null || event.eventDomain() == null) {
|
||||
return null;
|
||||
}
|
||||
String prefix = switch (normalizedSourceKind) {
|
||||
case "DRIVER_CARD" -> "CARD";
|
||||
case "VEHICLE_UNIT" -> "VU";
|
||||
default -> null;
|
||||
};
|
||||
if (prefix == null) {
|
||||
return null;
|
||||
}
|
||||
return switch (event.eventDomain()) {
|
||||
case POSITION -> prefix + "_POSITION";
|
||||
case PLACE -> prefix + "_PLACE";
|
||||
case BORDER_CROSSING -> prefix + "_BORDER_CROSSING";
|
||||
case SPEEDING -> Objects.equals("VU", prefix) ? "SPEEDING_EVENTS" : null;
|
||||
default -> null;
|
||||
};
|
||||
}
|
||||
|
||||
private static boolean isTachographFileSessionEvent(EventHubEventDto event) {
|
||||
if (event == null) {
|
||||
return false;
|
||||
}
|
||||
String packageKind = event.sourcePackageRef() == null ? null : normalizeUpper(event.sourcePackageRef().packageKind());
|
||||
if (Objects.equals("TACHOGRAPH_FILE_SESSION", packageKind)
|
||||
|| Objects.equals("COMPOSITE_TACHOGRAPH_FILE_SESSION", packageKind)) {
|
||||
return true;
|
||||
}
|
||||
String externalId = event.externalSourceEventId();
|
||||
return externalId != null
|
||||
&& (externalId.startsWith("TACHOGRAPH_FILE_SESSION:")
|
||||
|| externalId.startsWith("COMPOSITE_TACHOGRAPH_FILE_SESSION:"));
|
||||
}
|
||||
|
||||
|
||||
private static String compatibleActivityKey(EventHubEventDto event) {
|
||||
JsonNode raw = rawPayload(event);
|
||||
return String.join("|",
|
||||
"ACTIVITY_COMPATIBLE",
|
||||
nullToEmpty(event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
|
||||
nullToEmpty(event.eventDomain() == null ? null : event.eventDomain().name()),
|
||||
nullToEmpty(event.eventType() == null ? null : event.eventType().name()),
|
||||
nullToEmpty(event.lifecycle() == null ? null : event.lifecycle().name()),
|
||||
normalizeTime(event.occurredAt()),
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
|
||||
nullToEmpty(firstNonBlank(text(raw, "startedAt"), text(raw, "intervalStartedAt"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "endedAt"), text(raw, "intervalEndedAt"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "slot"), text(raw, "cardSlot"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "cardStatus"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "drivingStatus")))
|
||||
);
|
||||
}
|
||||
|
||||
private static String compatibleSupportEvidenceKey(EventHubEventDto event) {
|
||||
JsonNode raw = rawPayload(event);
|
||||
GeoPointDto position = event == null ? null : event.position();
|
||||
return String.join("|",
|
||||
"SUPPORT_COMPATIBLE",
|
||||
nullToEmpty(event == null || event.packageInfo() == null ? null : event.packageInfo().tenantKey()),
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.driverKey(event)),
|
||||
nullToEmpty(event == null || event.eventDomain() == null ? null : event.eventDomain().name()),
|
||||
nullToEmpty(event == null || event.eventType() == null ? null : event.eventType().name()),
|
||||
nullToEmpty(event == null || event.lifecycle() == null ? null : event.lifecycle().name()),
|
||||
normalizeTime(event == null ? null : event.occurredAt()),
|
||||
nullToEmpty(RuntimeEntityReferenceResolver.registrationKey(event)),
|
||||
nullToEmpty(firstNonBlank(text(raw, "latitude"), position == null || position.latitude() == null ? null : position.latitude().toPlainString())),
|
||||
nullToEmpty(firstNonBlank(text(raw, "longitude"), position == null || position.longitude() == null ? null : position.longitude().toPlainString())),
|
||||
nullToEmpty(firstNonBlank(text(raw, "odometerM"), event == null || event.odometerM() == null ? null : String.valueOf(event.odometerM()))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "country"), detailText(event, "country"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "region"), detailText(event, "region"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "countryFrom"), detailText(event, "countryFrom"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "countryTo"), detailText(event, "countryTo"))),
|
||||
nullToEmpty(firstNonBlank(text(raw, "operation"), detailText(event, "operation")))
|
||||
);
|
||||
}
|
||||
|
||||
private static Comparator<EventHubEventDto> eventComparator() {
|
||||
return Comparator.comparing(EventHubEventDto::occurredAt, Comparator.nullsLast(Comparator.naturalOrder()))
|
||||
.thenComparing(event -> event.eventDomain() == null ? "" : event.eventDomain().name())
|
||||
.thenComparing(event -> event.eventType() == null ? "" : event.eventType().name())
|
||||
.thenComparing(event -> event.lifecycle() == null ? "" : event.lifecycle().name())
|
||||
.thenComparing(event -> sourceProfile(event).extractionCode(), Comparator.nullsLast(String::compareTo))
|
||||
.thenComparing(EventHubEventDto::externalSourceEventId, Comparator.nullsLast(String::compareTo));
|
||||
}
|
||||
|
||||
private static List<EventHubEventDto> sort(List<EventHubEventDto> events) {
|
||||
return (events == null ? List.<EventHubEventDto>of() : events).stream()
|
||||
.filter(Objects::nonNull)
|
||||
.sorted(eventComparator())
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static String normalizeMode(String requestedMode) {
|
||||
String value = normalizeUpper(requestedMode);
|
||||
if (value == null) {
|
||||
return MODE_TACHOGRAPH_SAME_SOURCE;
|
||||
}
|
||||
return switch (value) {
|
||||
case MODE_OFF, MODE_TACHOGRAPH_SAME_SOURCE, MODE_FULL -> value;
|
||||
default -> MODE_TACHOGRAPH_SAME_SOURCE;
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonNode rawPayload(EventHubEventDto event) {
|
||||
return RuntimeEntityReferenceResolver.rawPayload(event);
|
||||
}
|
||||
|
||||
private static String sourceKind(EventHubEventDto event) {
|
||||
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
|
||||
? null
|
||||
: event.packageInfo().eventSource().sourceKind();
|
||||
}
|
||||
|
||||
private static String sourceProvider(EventHubEventDto event) {
|
||||
return event == null || event.packageInfo() == null || event.packageInfo().eventSource() == null
|
||||
? null
|
||||
: event.packageInfo().eventSource().providerKey();
|
||||
}
|
||||
|
||||
private static String sourceSystemFromExternalSourceEventId(EventHubEventDto event) {
|
||||
String externalId = event == null ? null : event.externalSourceEventId();
|
||||
if (externalId == null || externalId.isBlank()) {
|
||||
return null;
|
||||
}
|
||||
String[] parts = externalId.split(":");
|
||||
return parts.length >= 1 ? parts[0] : null;
|
||||
}
|
||||
|
||||
private static String extractionCodeFromExternalSourceEventId(EventHubEventDto event) {
|
||||
String externalId = event == null ? null : event.externalSourceEventId();
|
||||
if (externalId == null || externalId.isBlank()) {
|
||||
return null;
|
||||
}
|
||||
String[] parts = externalId.split(":");
|
||||
return parts.length >= 2 ? parts[1] : null;
|
||||
}
|
||||
|
||||
private static String detailText(EventHubEventDto event, String field) {
|
||||
if (event == null || event.eventDetails() == null || event.eventDetails().attributes() == null || field == null) {
|
||||
return null;
|
||||
}
|
||||
JsonNode value = event.eventDetails().attributes().get(field);
|
||||
if (value == null || value.isNull()) {
|
||||
return null;
|
||||
}
|
||||
String text = value.asText(null);
|
||||
return text == null || text.isBlank() ? null : text.trim();
|
||||
}
|
||||
|
||||
private static String text(JsonNode node, String field) {
|
||||
if (node == null || field == null) {
|
||||
return null;
|
||||
}
|
||||
JsonNode value = node.get(field);
|
||||
if (value == null || value.isNull()) {
|
||||
return null;
|
||||
}
|
||||
String text = value.asText(null);
|
||||
return text == null || text.isBlank() ? null : text.trim();
|
||||
}
|
||||
|
||||
private static String firstNonBlank(String... values) {
|
||||
if (values == null) {
|
||||
return null;
|
||||
}
|
||||
for (String value : values) {
|
||||
if (value != null && !value.isBlank()) {
|
||||
return value.trim();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static String normalizeUpper(String value) {
|
||||
return value == null || value.isBlank() ? null : value.trim().toUpperCase(Locale.ROOT);
|
||||
}
|
||||
|
||||
private static String nullToEmpty(Object value) {
|
||||
return value == null ? "" : String.valueOf(value);
|
||||
}
|
||||
|
||||
private static String normalizeTime(OffsetDateTime value) {
|
||||
return value == null ? "" : value.toInstant().toString();
|
||||
}
|
||||
|
||||
private static boolean notBlank(String value) {
|
||||
return value != null && !value.isBlank();
|
||||
}
|
||||
|
||||
private static final class MixingState {
|
||||
private final List<EventHubEventDto> rawEvents;
|
||||
private final Set<String> suppressedEventIds = new LinkedHashSet<>();
|
||||
private final Map<String, EventHubEventDto> replacementsByEventId = new LinkedHashMap<>();
|
||||
private final List<EventHubEventDto> suppressedEvents = new ArrayList<>();
|
||||
private final List<RuntimeEventMixingDecisionDto> decisions = new ArrayList<>();
|
||||
private final List<String> warnings = new ArrayList<>();
|
||||
|
||||
private MixingState(List<EventHubEventDto> rawEvents) {
|
||||
this.rawEvents = rawEvents;
|
||||
}
|
||||
|
||||
private List<EventHubEventDto> rawEvents() {
|
||||
return rawEvents;
|
||||
}
|
||||
|
||||
private boolean isSuppressed(EventHubEventDto event) {
|
||||
return suppressedEventIds.contains(eventId(event));
|
||||
}
|
||||
|
||||
private EventHubEventDto effectiveEvent(EventHubEventDto event) {
|
||||
return replacementsByEventId.getOrDefault(eventId(event), event);
|
||||
}
|
||||
|
||||
private void replace(EventHubEventDto original, EventHubEventDto replacement) {
|
||||
if (original != null && replacement != null) {
|
||||
replacementsByEventId.put(eventId(original), replacement);
|
||||
}
|
||||
}
|
||||
|
||||
private void suppress(EventHubEventDto event) {
|
||||
if (suppressedEventIds.add(eventId(event))) {
|
||||
suppressedEvents.add(event);
|
||||
}
|
||||
}
|
||||
|
||||
private List<EventHubEventDto> suppressedEvents() {
|
||||
return suppressedEvents;
|
||||
}
|
||||
|
||||
private List<RuntimeEventMixingDecisionDto> decisions() {
|
||||
return decisions;
|
||||
}
|
||||
|
||||
private List<String> warnings() {
|
||||
return warnings;
|
||||
}
|
||||
|
||||
private String eventId(EventHubEventDto event) {
|
||||
if (event == null) {
|
||||
return "<null>";
|
||||
}
|
||||
return firstNonBlank(event.externalSourceEventId(), event.eventId() == null ? null : event.eventId().toString(), RuntimeEventIdentityResolver.canonicalEventKey(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,8 +0,0 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.mixing;
|
||||
|
||||
public record RuntimeEventSourceProfile(
|
||||
String sourceSystem,
|
||||
String sourceKind,
|
||||
String extractionCode
|
||||
) {
|
||||
}
|
||||
|
|
@ -1,28 +0,0 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.mixing;
|
||||
|
||||
import at.procon.eventhub.dto.EventHubEventDto;
|
||||
import java.util.List;
|
||||
|
||||
public record RuntimeMixedEventBundle(
|
||||
List<EventHubEventDto> rawEvents,
|
||||
List<EventHubEventDto> driverPartitionEvents,
|
||||
List<EventHubEventDto> activityTimelineEvents,
|
||||
List<EventHubEventDto> vehicleUsageEvents,
|
||||
List<EventHubEventDto> supportEvidenceEvents,
|
||||
List<EventHubEventDto> suppressedEvents,
|
||||
List<RuntimeEventMixingDecisionDto> eventMixingDecisions,
|
||||
List<String> notes,
|
||||
List<String> warnings
|
||||
) {
|
||||
public RuntimeMixedEventBundle {
|
||||
rawEvents = rawEvents == null ? List.of() : List.copyOf(rawEvents);
|
||||
driverPartitionEvents = driverPartitionEvents == null ? List.of() : List.copyOf(driverPartitionEvents);
|
||||
activityTimelineEvents = activityTimelineEvents == null ? List.of() : List.copyOf(activityTimelineEvents);
|
||||
vehicleUsageEvents = vehicleUsageEvents == null ? List.of() : List.copyOf(vehicleUsageEvents);
|
||||
supportEvidenceEvents = supportEvidenceEvents == null ? List.of() : List.copyOf(supportEvidenceEvents);
|
||||
suppressedEvents = suppressedEvents == null ? List.of() : List.copyOf(suppressedEvents);
|
||||
eventMixingDecisions = eventMixingDecisions == null ? List.of() : List.copyOf(eventMixingDecisions);
|
||||
notes = notes == null ? List.of() : List.copyOf(notes);
|
||||
warnings = warnings == null ? List.of() : List.copyOf(warnings);
|
||||
}
|
||||
}
|
||||
|
|
@ -52,7 +52,7 @@ public class DriverActivityIntervalsModule implements RuntimeEplModule {
|
|||
|
||||
@Override
|
||||
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
|
||||
List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.activityTimelineEvents(context);
|
||||
List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.sourceEvents(context);
|
||||
List<Map<String, Object>> pointEvents = DriverWorkingTimeEplEventMapper.activityPointEvents(sourceEvents);
|
||||
RuntimeEplModuleExecutionResult eplResult = eplModuleExecutor.execute(new RuntimeEplModuleDefinition(
|
||||
moduleKey(),
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ public class DriverVehicleUsageIntervalsModule implements RuntimeEplModule {
|
|||
|
||||
@Override
|
||||
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
|
||||
List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.vehicleUsageEvents(context);
|
||||
List<EventHubEventDto> sourceEvents = DriverWorkingTimeEplEventMapper.sourceEvents(context);
|
||||
List<Map<String, Object>> pointEvents = DriverWorkingTimeEplEventMapper.vehicleUsagePointEvents(sourceEvents);
|
||||
RuntimeEplModuleExecutionResult eplResult = eplModuleExecutor.execute(new RuntimeEplModuleDefinition(
|
||||
moduleKey(),
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ package at.procon.eventhub.processing.eventprocessing.module;
|
|||
public final class DriverWorkingTimeModuleKeys {
|
||||
|
||||
public static final String RUNTIME_EVENT_ASSEMBLY = "runtime-event-assembly";
|
||||
public static final String EVENT_EVIDENCE_MIXING = "event-evidence-mixing";
|
||||
public static final String EVENT_TO_ACTIVITY_INTERVALS = "event-to-activity-intervals";
|
||||
public static final String EVENT_TO_VEHICLE_USAGE_INTERVALS = "event-to-vehicle-usage-intervals";
|
||||
public static final String VEHICLE_USAGE_MERGE = "vehicle-usage-merge";
|
||||
|
|
|
|||
|
|
@ -1,103 +0,0 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.module;
|
||||
|
||||
import at.procon.eventhub.dto.EventHubEventDto;
|
||||
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeEventMixingService;
|
||||
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeMixedEventBundle;
|
||||
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto;
|
||||
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class EventEvidenceMixingModule implements RuntimeProcessingModule {
|
||||
|
||||
public static final String EVENT_MIXING_MODE_PARAMETER = "eventMixingMode";
|
||||
|
||||
private final RuntimeEventMixingService eventMixingService;
|
||||
|
||||
@Autowired
|
||||
public EventEvidenceMixingModule(RuntimeEventMixingService eventMixingService) {
|
||||
this.eventMixingService = eventMixingService;
|
||||
}
|
||||
|
||||
/** Compatibility constructor for tests/local registries that do not wire the mixing service. */
|
||||
public EventEvidenceMixingModule() {
|
||||
this.eventMixingService = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String moduleKey() {
|
||||
return DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuntimeProcessingModuleDescriptorDto descriptor() {
|
||||
return new RuntimeProcessingModuleDescriptorDto(
|
||||
moduleKey(),
|
||||
"Event evidence mixing",
|
||||
"Applies source-aware runtime evidence rules before intervalization. The initial rule collapses duplicate tachograph CARD_ACTIVITY/VU_ACTIVITY events by normalized eventKey while keeping CARD_VEHICLES_USED unchanged for vehicle-usage processing.",
|
||||
"JAVA",
|
||||
Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto")
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
|
||||
if (eventMixingService == null) {
|
||||
return delegatedPlaceholder();
|
||||
}
|
||||
List<EventHubEventDto> sourceEvents = sourceEvents(context);
|
||||
RuntimeMixedEventBundle mixed = eventMixingService.mix(sourceEvents, eventMixingMode(context));
|
||||
Map<String, Object> metadata = new LinkedHashMap<>();
|
||||
metadata.put("inputEventCount", mixed.rawEvents().size());
|
||||
metadata.put("driverPartitionEventCount", mixed.driverPartitionEvents().size());
|
||||
metadata.put("activityTimelineEventCount", mixed.activityTimelineEvents().size());
|
||||
metadata.put("vehicleUsageEventCount", mixed.vehicleUsageEvents().size());
|
||||
metadata.put("supportEvidenceEventCount", mixed.supportEvidenceEvents().size());
|
||||
metadata.put("suppressedEventCount", mixed.suppressedEvents().size());
|
||||
metadata.put("eventMixingDecisionCount", mixed.eventMixingDecisions().size());
|
||||
metadata.put("eventMixingMode", eventMixingMode(context));
|
||||
return new RuntimeProcessingModuleResult(
|
||||
moduleKey(),
|
||||
RuntimeProcessingModuleStatus.SUCCESS,
|
||||
mixed,
|
||||
metadata,
|
||||
mixed.warnings()
|
||||
);
|
||||
}
|
||||
|
||||
private RuntimeProcessingModuleResult delegatedPlaceholder() {
|
||||
Map<String, Object> metadata = new LinkedHashMap<>();
|
||||
metadata.put("executionModel", "delegated");
|
||||
metadata.put("note", "Event evidence mixing is not wired in this local/legacy module registry.");
|
||||
return new RuntimeProcessingModuleResult(
|
||||
moduleKey(),
|
||||
RuntimeProcessingModuleStatus.SUCCESS,
|
||||
null,
|
||||
metadata,
|
||||
List.of()
|
||||
);
|
||||
}
|
||||
|
||||
private List<EventHubEventDto> sourceEvents(RuntimeProcessingModuleContext context) {
|
||||
RuntimeProcessingModuleResult assemblyResult = context.previousResults().get(DriverWorkingTimeModuleKeys.RUNTIME_EVENT_ASSEMBLY);
|
||||
if (assemblyResult != null && assemblyResult.output() instanceof UnifiedRuntimeEventBundle bundle) {
|
||||
return bundle.mergedEvents();
|
||||
}
|
||||
return context.events();
|
||||
}
|
||||
|
||||
private String eventMixingMode(RuntimeProcessingModuleContext context) {
|
||||
Object value = context.request() == null || context.request().parameters() == null
|
||||
? null
|
||||
: context.request().parameters().get(EVENT_MIXING_MODE_PARAMETER);
|
||||
if (value == null) {
|
||||
value = context.attributes().get(EVENT_MIXING_MODE_PARAMETER);
|
||||
}
|
||||
return value == null ? RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE : value.toString();
|
||||
}
|
||||
}
|
||||
|
|
@ -7,7 +7,6 @@ import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeDr
|
|||
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
|
||||
import at.procon.eventhub.processing.dto.RuntimeDriverPartitionDebugDto;
|
||||
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
|
||||
import at.procon.eventhub.processing.eventprocessing.module.epl.DriverWorkingTimeEplEventMapper;
|
||||
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto;
|
||||
import at.procon.eventhub.processing.model.RuntimeDriverVehicleEvidenceAttachmentResult;
|
||||
import at.procon.eventhub.processing.model.UnifiedDiscoveredVehicleRef;
|
||||
|
|
@ -67,7 +66,6 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
|
|||
return delegatedPlaceholder();
|
||||
}
|
||||
UnifiedRuntimeEventBundle broadBundle = runtimeEventBundle(context);
|
||||
List<EventHubEventDto> partitionSourceEvents = DriverWorkingTimeEplEventMapper.driverPartitionEvents(context);
|
||||
UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context);
|
||||
boolean includePartitionDebug = booleanAttribute(context, "includePartitionDebug", false);
|
||||
List<DriverWorkingTimeVehicleUsageInterval> mergedVehicleUsageIntervals = mergedVehicleUsageIntervals(context);
|
||||
|
|
@ -75,8 +73,8 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
|
|||
LinkedHashMap<String, DriverWorkingTimeDriverPartition> partitions = new LinkedHashMap<>();
|
||||
LinkedHashMap<String, List<String>> attachedVehicleEvidenceByEvent = new LinkedHashMap<>();
|
||||
List<String> warnings = new ArrayList<>();
|
||||
for (String driverKey : selectedDriverKeys(scopeRequest.toRuntimeRequest(), partitionSourceEvents)) {
|
||||
List<EventHubEventDto> directDriverEvents = partitionSourceEvents.stream()
|
||||
for (String driverKey : selectedDriverKeys(scopeRequest.toRuntimeRequest(), broadBundle.mergedEvents())) {
|
||||
List<EventHubEventDto> directDriverEvents = broadBundle.mergedEvents().stream()
|
||||
.filter(event -> Objects.equals(driverKey(event), driverKey))
|
||||
.toList();
|
||||
List<DriverWorkingTimeVehicleUsageInterval> driverVehicleUsageIntervals = mergedVehicleUsageIntervals.stream()
|
||||
|
|
@ -85,7 +83,7 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
|
|||
RuntimeDriverVehicleEvidenceAttachmentResult attachmentResult = vehicleEvidenceAttachmentService.attachVehicleEvidence(
|
||||
driverKey,
|
||||
directDriverEvents,
|
||||
partitionSourceEvents,
|
||||
broadBundle.mergedEvents(),
|
||||
driverVehicleUsageIntervals,
|
||||
booleanAttribute(
|
||||
context,
|
||||
|
|
@ -134,8 +132,6 @@ public class VehicleEvidenceAttachmentModule implements RuntimeProcessingModule
|
|||
metadata.put("attachedVehicleEvidenceEventCount", partitions.values().stream()
|
||||
.mapToInt(partition -> partition.attachedVehicleEvidenceEvents().size())
|
||||
.sum());
|
||||
metadata.put("partitionSourceEventCount", partitionSourceEvents.size());
|
||||
metadata.put("rawMergedEventCount", broadBundle.mergedEvents().size());
|
||||
return new RuntimeProcessingModuleResult(
|
||||
moduleKey(),
|
||||
RuntimeProcessingModuleStatus.SUCCESS,
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import at.procon.eventhub.dto.EventType;
|
|||
import at.procon.eventhub.processing.eventprocessing.module.DriverWorkingTimeModuleKeys;
|
||||
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleContext;
|
||||
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleResult;
|
||||
import at.procon.eventhub.processing.eventprocessing.mixing.RuntimeMixedEventBundle;
|
||||
import at.procon.eventhub.processing.model.UnifiedRuntimeEventBundle;
|
||||
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
|
||||
import at.procon.eventhub.processing.support.RuntimeEntityReferenceResolver;
|
||||
|
|
@ -39,30 +38,6 @@ public final class DriverWorkingTimeEplEventMapper {
|
|||
return safeList(context.events());
|
||||
}
|
||||
|
||||
public static List<EventHubEventDto> activityTimelineEvents(RuntimeProcessingModuleContext context) {
|
||||
RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING);
|
||||
if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) {
|
||||
return safeList(mixed.activityTimelineEvents());
|
||||
}
|
||||
return sourceEvents(context);
|
||||
}
|
||||
|
||||
public static List<EventHubEventDto> vehicleUsageEvents(RuntimeProcessingModuleContext context) {
|
||||
RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING);
|
||||
if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) {
|
||||
return safeList(mixed.vehicleUsageEvents());
|
||||
}
|
||||
return sourceEvents(context);
|
||||
}
|
||||
|
||||
public static List<EventHubEventDto> driverPartitionEvents(RuntimeProcessingModuleContext context) {
|
||||
RuntimeProcessingModuleResult mixingResult = context.previousResults().get(DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING);
|
||||
if (mixingResult != null && mixingResult.output() instanceof RuntimeMixedEventBundle mixed) {
|
||||
return safeList(mixed.driverPartitionEvents());
|
||||
}
|
||||
return sourceEvents(context);
|
||||
}
|
||||
|
||||
public static List<Map<String, Object>> activityPointEvents(List<EventHubEventDto> sourceEvents) {
|
||||
return safeList(sourceEvents).stream()
|
||||
.map(DriverWorkingTimeEplEventMapper::toActivityPointEvent)
|
||||
|
|
|
|||
|
|
@ -114,13 +114,6 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"JAVA",
|
||||
Set.of("UnifiedRuntimeEventBundle")
|
||||
));
|
||||
descriptors.add(new RuntimeProcessingModuleDescriptorDto(
|
||||
DriverWorkingTimeModuleKeys.EVENT_EVIDENCE_MIXING,
|
||||
"Event evidence mixing",
|
||||
"Applies source-aware runtime evidence rules before intervalization. Initially collapses same-event-key CARD_ACTIVITY/VU_ACTIVITY duplicates and leaves CARD_VEHICLES_USED unchanged.",
|
||||
"JAVA",
|
||||
Set.of("RuntimeMixedEventBundle", "RuntimeEventMixingDecisionDto")
|
||||
));
|
||||
}
|
||||
descriptors.addAll(List.of(
|
||||
new RuntimeProcessingModuleDescriptorDto(
|
||||
|
|
@ -178,8 +171,7 @@ public class DriverWorkingTimeRuntimeProcessingPlan implements RuntimeProcessing
|
|||
"vehicleEvidencePaddingMinutes",
|
||||
"includeActivityIntervals",
|
||||
"includeDrivingIntervals",
|
||||
"includePartitionDebug",
|
||||
"eventMixingMode"
|
||||
"includePartitionDebug"
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ public class VehicleUnitXmlExtractionService {
|
|||
continue;
|
||||
}
|
||||
|
||||
builder.addVehicleUsageInterval(new ExtractedCardVehicleUsageInterval(
|
||||
builder.vehicleUsageIntervals.add(new ExtractedCardVehicleUsageInterval(
|
||||
"VUIW-" + (i + 1),
|
||||
from,
|
||||
to,
|
||||
|
|
@ -124,9 +124,9 @@ public class VehicleUnitXmlExtractionService {
|
|||
vehicleContext.vehicle() == null ? null : vehicleContext.vehicle().vehicleKey(),
|
||||
path
|
||||
));
|
||||
addVuCardIwInterval(vuCardIwIntervals, new VuCardIwInterval(
|
||||
vuCardIwIntervals.add(new VuCardIwInterval(
|
||||
driverKey,
|
||||
normalizeVuSlot(text(record, "cardSlotNumber")),
|
||||
normalizeToken(text(record, "cardSlotNumber")),
|
||||
from,
|
||||
to,
|
||||
path
|
||||
|
|
@ -270,7 +270,7 @@ public class VehicleUnitXmlExtractionService {
|
|||
parsedChanges.add(new ActivityChange(
|
||||
from,
|
||||
normalizeActivity(text(change, "activity")),
|
||||
normalizeVuSlot(text(change, "slot")),
|
||||
normalizeToken(text(change, "slot")),
|
||||
normalizeToken(text(change, "cardStatus")),
|
||||
normalizeToken(text(change, "drivingStatus")),
|
||||
dayPath + "/activityChangeInfos[" + (changeIndex + 1) + "]"
|
||||
|
|
@ -967,12 +967,11 @@ public class VehicleUnitXmlExtractionService {
|
|||
String explicitSlot,
|
||||
List<VuCardIwInterval> vuCardIwIntervals
|
||||
) {
|
||||
String normalizedExplicitSlot = normalizeVuSlot(explicitSlot);
|
||||
if (explicitDriverKey != null) {
|
||||
return List.of(new DriverAssignment(explicitDriverKey, normalizedExplicitSlot));
|
||||
return List.of(new DriverAssignment(explicitDriverKey, explicitSlot));
|
||||
}
|
||||
return vuCardIwIntervals.stream()
|
||||
.filter(iw -> normalizedExplicitSlot == null || normalizedExplicitSlot.equals(iw.slot()))
|
||||
.filter(iw -> explicitSlot == null || explicitSlot.equals(iw.slot()))
|
||||
.filter(iw -> iw.covers(occurredAt))
|
||||
.map(iw -> new DriverAssignment(iw.driverKey(), iw.slot()))
|
||||
.toList();
|
||||
|
|
@ -986,22 +985,6 @@ public class VehicleUnitXmlExtractionService {
|
|||
return List.copyOf(unique.values());
|
||||
}
|
||||
|
||||
private void addVuCardIwInterval(List<VuCardIwInterval> intervals, VuCardIwInterval candidate) {
|
||||
boolean alreadyPresent = intervals.stream().anyMatch(existing ->
|
||||
sameVuCardIwInterval(existing, candidate)
|
||||
);
|
||||
if (!alreadyPresent) {
|
||||
intervals.add(candidate);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean sameVuCardIwInterval(VuCardIwInterval existing, VuCardIwInterval candidate) {
|
||||
return java.util.Objects.equals(existing.driverKey(), candidate.driverKey())
|
||||
&& java.util.Objects.equals(existing.slot(), candidate.slot())
|
||||
&& java.util.Objects.equals(existing.from(), candidate.from())
|
||||
&& java.util.Objects.equals(existing.to(), candidate.to());
|
||||
}
|
||||
|
||||
private String driverKeyFromCardNode(Element node, String basePath) {
|
||||
String cardNation = text(node, basePath + "/cardIssuingMemberState");
|
||||
String cardNumber = driverKeyFactory.canonicalCardNumber(joinCardNumber(node, basePath + "/cardNumber"));
|
||||
|
|
@ -1090,18 +1073,6 @@ public class VehicleUnitXmlExtractionService {
|
|||
return value.trim().toUpperCase().replace('-', '_').replace(' ', '_');
|
||||
}
|
||||
|
||||
private String normalizeVuSlot(String value) {
|
||||
String normalized = normalizeToken(value);
|
||||
if (normalized == null) {
|
||||
return null;
|
||||
}
|
||||
return switch (normalized) {
|
||||
case "0", "DRIVER" -> "DRIVER";
|
||||
case "1", "CO_DRIVER", "CODRIVER" -> "CO_DRIVER";
|
||||
default -> normalized;
|
||||
};
|
||||
}
|
||||
|
||||
private String mapPlaceEntryType(String entryType) {
|
||||
String normalized = normalizeToken(entryType);
|
||||
if (normalized == null) {
|
||||
|
|
@ -1229,27 +1200,6 @@ public class VehicleUnitXmlExtractionService {
|
|||
}
|
||||
}
|
||||
|
||||
private void addVehicleUsageInterval(ExtractedCardVehicleUsageInterval candidate) {
|
||||
boolean alreadyPresent = vehicleUsageIntervals.stream().anyMatch(existing ->
|
||||
sameVehicleUsageInterval(existing, candidate)
|
||||
);
|
||||
if (!alreadyPresent) {
|
||||
vehicleUsageIntervals.add(candidate);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean sameVehicleUsageInterval(
|
||||
ExtractedCardVehicleUsageInterval existing,
|
||||
ExtractedCardVehicleUsageInterval candidate
|
||||
) {
|
||||
return java.util.Objects.equals(existing.from(), candidate.from())
|
||||
&& java.util.Objects.equals(existing.to(), candidate.to())
|
||||
&& java.util.Objects.equals(existing.odometerBeginKm(), candidate.odometerBeginKm())
|
||||
&& java.util.Objects.equals(existing.odometerEndKm(), candidate.odometerEndKm())
|
||||
&& java.util.Objects.equals(existing.registrationKey(), candidate.registrationKey())
|
||||
&& java.util.Objects.equals(existing.vehicleKey(), candidate.vehicleKey());
|
||||
}
|
||||
|
||||
private DriverExtractionSession build() {
|
||||
return new DriverExtractionSession(
|
||||
driverKey,
|
||||
|
|
|
|||
|
|
@ -1,467 +0,0 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.mixing;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import at.procon.eventhub.dto.DriverCardRefDto;
|
||||
import at.procon.eventhub.dto.DriverRefDto;
|
||||
import at.procon.eventhub.dto.EventDetailsDto;
|
||||
import at.procon.eventhub.dto.EventDomain;
|
||||
import at.procon.eventhub.dto.EventHubEventDto;
|
||||
import at.procon.eventhub.dto.EventHubPackageRequest;
|
||||
import at.procon.eventhub.dto.EventLifecycle;
|
||||
import at.procon.eventhub.dto.EventSourceDto;
|
||||
import at.procon.eventhub.dto.EventType;
|
||||
import at.procon.eventhub.dto.ImportScopeDto;
|
||||
import at.procon.eventhub.dto.VehicleRefDto;
|
||||
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
|
||||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import java.time.LocalDate;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class RuntimeEventMixingServiceTest {
|
||||
|
||||
private final RuntimeEventMixingService service = new RuntimeEventMixingService();
|
||||
|
||||
@Test
|
||||
void suppressesVuActivityDuplicateFromActivityTimelineWhenCardActivityHasSameEventKey() {
|
||||
EventHubEventDto card = activity("CARD_ACTIVITY", "DRIVER_CARD", "TACHOGRAPH:CARD_ACTIVITY:1:START");
|
||||
EventHubEventDto vu = activity("VU_ACTIVITY", "VEHICLE_UNIT", "TACHOGRAPH:VU_ACTIVITY:2:START");
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
|
||||
|
||||
assertThat(mixed.rawEvents()).hasSize(2);
|
||||
assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:CARD_ACTIVITY:1:START");
|
||||
assertThat(mixed.driverPartitionEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:CARD_ACTIVITY:1:START");
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:VU_ACTIVITY:2:START");
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(1);
|
||||
assertThat(mixed.eventMixingDecisions().getFirst().ruleId())
|
||||
.isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_ACTIVITY_SAME_EVENT_KEY);
|
||||
}
|
||||
|
||||
@Test
|
||||
void keepsVuActivityWhenNoMatchingCardActivityExists() {
|
||||
EventHubEventDto vu = activity("VU_ACTIVITY", "VEHICLE_UNIT", "TACHOGRAPH:VU_ACTIVITY:2:START");
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(List.of(vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
|
||||
|
||||
assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:VU_ACTIVITY:2:START");
|
||||
assertThat(mixed.suppressedEvents()).isEmpty();
|
||||
assertThat(mixed.eventMixingDecisions()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void suppressesTachographFileSessionVuActivityDuplicateWhenNoExtractionCodeIsPresent() {
|
||||
EventHubEventDto card = fileSessionActivity(
|
||||
"DRIVER_CARD",
|
||||
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:ACTIVITY:card-interval-1:START:2026-04-01T00:00:00Z"
|
||||
);
|
||||
EventHubEventDto vu = fileSessionActivity(
|
||||
"VEHICLE_UNIT",
|
||||
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:ACTIVITY:vu-interval-1:START:2026-04-01T00:00:00Z"
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
|
||||
|
||||
assertThat(mixed.activityTimelineEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:ACTIVITY:card-interval-1:START:2026-04-01T00:00:00Z");
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:ACTIVITY:vu-interval-1:START:2026-04-01T00:00:00Z");
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(1);
|
||||
assertThat(mixed.eventMixingDecisions().getFirst().secondaryExtractionCodes())
|
||||
.containsExactly("VU_ACTIVITY");
|
||||
}
|
||||
|
||||
@Test
|
||||
void keepsAllCardVehicleUsedAndIwCycleEventsForVehicleUsageProcessing() {
|
||||
EventHubEventDto cardVehicleUsed = cardUsage(
|
||||
"CARD_VEHICLES_USED",
|
||||
"DRIVER_CARD",
|
||||
"TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT",
|
||||
EventType.CARD_INSERTED,
|
||||
EventLifecycle.INSERT
|
||||
);
|
||||
EventHubEventDto iwCycle = cardUsage(
|
||||
"IW_CYCLE",
|
||||
"VEHICLE_UNIT",
|
||||
"TACHOGRAPH:IW_CYCLE:20:INSERT",
|
||||
EventType.CARD_INSERTED,
|
||||
EventLifecycle.INSERT
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(List.of(cardVehicleUsed, iwCycle), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
|
||||
|
||||
assertThat(mixed.vehicleUsageEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:CARD_VEHICLES_USED:10:INSERT", "TACHOGRAPH:IW_CYCLE:20:INSERT");
|
||||
assertThat(mixed.suppressedEvents()).isEmpty();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void suppressesVuPositionDuplicateFromSupportEvidenceAndCopiesVehicleIdentityToCardPrimary() {
|
||||
EventHubEventDto card = supportEvidence(
|
||||
"CARD_POSITION",
|
||||
"DRIVER_CARD",
|
||||
"TACHOGRAPH:CARD_POSITION:1",
|
||||
EventDomain.POSITION,
|
||||
EventType.POSITION_RECORDED,
|
||||
EventLifecycle.SNAPSHOT,
|
||||
false
|
||||
);
|
||||
EventHubEventDto vu = supportEvidence(
|
||||
"VU_POSITION",
|
||||
"VEHICLE_UNIT",
|
||||
"TACHOGRAPH:VU_POSITION:2",
|
||||
EventDomain.POSITION,
|
||||
EventType.POSITION_RECORDED,
|
||||
EventLifecycle.SNAPSHOT,
|
||||
true
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
|
||||
|
||||
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:CARD_POSITION:1");
|
||||
assertThat(mixed.supportEvidenceEvents().getFirst().vehicleRef().vin())
|
||||
.isEqualTo("WDB9634031L123456");
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:VU_POSITION:2");
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(1);
|
||||
assertThat(mixed.eventMixingDecisions().getFirst().ruleId())
|
||||
.isEqualTo(RuntimeEventMixingService.RULE_TACHOGRAPH_CARD_VU_SUPPORT_COMPATIBLE_KEY);
|
||||
assertThat(mixed.eventMixingDecisions().getFirst().channel())
|
||||
.isEqualTo("SUPPORT_EVIDENCE");
|
||||
}
|
||||
|
||||
@Test
|
||||
void suppressesVuPlaceAndBorderDuplicateFromSupportEvidence() {
|
||||
EventHubEventDto cardPlace = supportEvidence(
|
||||
"CARD_PLACE",
|
||||
"DRIVER_CARD",
|
||||
"TACHOGRAPH:CARD_PLACE:1",
|
||||
EventDomain.PLACE,
|
||||
EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventLifecycle.SNAPSHOT,
|
||||
false
|
||||
);
|
||||
EventHubEventDto vuPlace = supportEvidence(
|
||||
"VU_PLACE",
|
||||
"VEHICLE_UNIT",
|
||||
"TACHOGRAPH:VU_PLACE:2",
|
||||
EventDomain.PLACE,
|
||||
EventType.WORKING_DAY_PLACE_RECORDED,
|
||||
EventLifecycle.SNAPSHOT,
|
||||
true
|
||||
);
|
||||
EventHubEventDto cardBorder = supportEvidence(
|
||||
"CARD_BORDER_CROSSING",
|
||||
"DRIVER_CARD",
|
||||
"TACHOGRAPH:CARD_BORDER_CROSSING:3",
|
||||
EventDomain.BORDER_CROSSING,
|
||||
EventType.BORDER_OUTBOUND,
|
||||
EventLifecycle.OUTBOUND,
|
||||
false
|
||||
);
|
||||
EventHubEventDto vuBorder = supportEvidence(
|
||||
"VU_BORDER_CROSSING",
|
||||
"VEHICLE_UNIT",
|
||||
"TACHOGRAPH:VU_BORDER_CROSSING:4",
|
||||
EventDomain.BORDER_CROSSING,
|
||||
EventType.BORDER_OUTBOUND,
|
||||
EventLifecycle.OUTBOUND,
|
||||
true
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(List.of(cardPlace, vuPlace, cardBorder, vuBorder), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
|
||||
|
||||
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:CARD_BORDER_CROSSING:3", "TACHOGRAPH:CARD_PLACE:1");
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH:VU_BORDER_CROSSING:4", "TACHOGRAPH:VU_PLACE:2");
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void suppressesTachographFileSessionVuPositionDuplicateWhenNoExtractionCodeIsPresent() {
|
||||
EventHubEventDto card = fileSessionSupportEvidence(
|
||||
"DRIVER_CARD",
|
||||
"TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-position-1:SNAPSHOT:2026-04-01T00:00:00Z",
|
||||
EventDomain.POSITION,
|
||||
EventType.POSITION_RECORDED,
|
||||
EventLifecycle.SNAPSHOT,
|
||||
false
|
||||
);
|
||||
EventHubEventDto vu = fileSessionSupportEvidence(
|
||||
"VEHICLE_UNIT",
|
||||
"TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-position-1:SNAPSHOT:2026-04-01T00:00:00Z",
|
||||
EventDomain.POSITION,
|
||||
EventType.POSITION_RECORDED,
|
||||
EventLifecycle.SNAPSHOT,
|
||||
true
|
||||
);
|
||||
|
||||
RuntimeMixedEventBundle mixed = service.mix(List.of(card, vu), RuntimeEventMixingService.MODE_TACHOGRAPH_SAME_SOURCE);
|
||||
|
||||
assertThat(mixed.supportEvidenceEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH_FILE_SESSION:11111111-1111-1111-1111-111111111111:SUPPORT:card-position-1:SNAPSHOT:2026-04-01T00:00:00Z");
|
||||
assertThat(mixed.supportEvidenceEvents().getFirst().vehicleRef().vin())
|
||||
.isEqualTo("WDB9634031L123456");
|
||||
assertThat(mixed.suppressedEvents()).extracting(EventHubEventDto::externalSourceEventId)
|
||||
.containsExactly("TACHOGRAPH_FILE_SESSION:22222222-2222-2222-2222-222222222222:SUPPORT:vu-position-1:SNAPSHOT:2026-04-01T00:00:00Z");
|
||||
assertThat(mixed.eventMixingDecisions()).hasSize(1);
|
||||
assertThat(mixed.eventMixingDecisions().getFirst().secondaryExtractionCodes())
|
||||
.containsExactly("VU_POSITION");
|
||||
}
|
||||
|
||||
private EventHubEventDto activity(String extractionCode, String sourceKind, String externalId) {
|
||||
ObjectNode raw = baseRaw(extractionCode, sourceKind);
|
||||
raw.put("activityType", "BREAK_REST");
|
||||
raw.put("cardSlot", "DRIVER");
|
||||
raw.put("cardStatus", "INSERTED");
|
||||
raw.put("drivingStatus", "SINGLE");
|
||||
raw.put("startedAt", "2026-03-31T15:43:00Z");
|
||||
raw.put("endedAt", "2026-04-01T00:00:00Z");
|
||||
raw.put("intervalId", "TACHOGRAPH:" + extractionCode + ":1");
|
||||
ObjectNode payload = JsonNodeFactory.instance.objectNode();
|
||||
payload.set("raw", raw);
|
||||
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
|
||||
attributes.put("cardSlot", "DRIVER");
|
||||
attributes.put("cardStatus", "INSERTED");
|
||||
attributes.put("drivingStatus", "SINGLE");
|
||||
return event(
|
||||
externalId,
|
||||
sourceKind,
|
||||
EventDomain.DRIVER_ACTIVITY,
|
||||
EventType.BREAK_REST,
|
||||
EventLifecycle.START,
|
||||
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
|
||||
new EventDetailsDto("DRIVER_ACTIVITY", attributes),
|
||||
payload
|
||||
);
|
||||
}
|
||||
|
||||
private EventHubEventDto supportEvidence(
|
||||
String extractionCode,
|
||||
String sourceKind,
|
||||
String externalId,
|
||||
EventDomain domain,
|
||||
EventType type,
|
||||
EventLifecycle lifecycle,
|
||||
boolean withVin
|
||||
) {
|
||||
ObjectNode raw = baseRaw(extractionCode, sourceKind);
|
||||
raw.put("latitude", "48.208174");
|
||||
raw.put("longitude", "16.373819");
|
||||
raw.put("odometerM", "123000");
|
||||
raw.put("country", "AT");
|
||||
raw.put("region", "W");
|
||||
raw.put("countryFrom", "AT");
|
||||
raw.put("countryTo", "DE");
|
||||
ObjectNode payload = JsonNodeFactory.instance.objectNode();
|
||||
payload.set("raw", raw);
|
||||
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
|
||||
attributes.put("country", "AT");
|
||||
attributes.put("region", "W");
|
||||
attributes.put("countryFrom", "AT");
|
||||
attributes.put("countryTo", "DE");
|
||||
VehicleRefDto vehicleRef = withVin
|
||||
? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
|
||||
: new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"));
|
||||
return event(
|
||||
externalId,
|
||||
sourceKind,
|
||||
domain,
|
||||
type,
|
||||
lifecycle,
|
||||
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
|
||||
new EventDetailsDto(domain.name(), attributes),
|
||||
payload,
|
||||
vehicleRef
|
||||
);
|
||||
}
|
||||
|
||||
private EventHubEventDto fileSessionActivity(String sourceKind, String externalId) {
|
||||
ObjectNode raw = baseRawWithoutExtraction(sourceKind);
|
||||
raw.put("activityType", "BREAK_REST");
|
||||
raw.put("cardSlot", "DRIVER");
|
||||
raw.put("cardStatus", "INSERTED");
|
||||
raw.put("drivingStatus", "SINGLE");
|
||||
raw.put("startedAt", "2026-03-31T15:43:00Z");
|
||||
raw.put("endedAt", "2026-04-01T00:00:00Z");
|
||||
raw.put("intervalId", "TACHOGRAPH_FILE_SESSION:ACTIVITY:1");
|
||||
ObjectNode payload = JsonNodeFactory.instance.objectNode();
|
||||
payload.set("raw", raw);
|
||||
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
|
||||
attributes.put("cardSlot", "DRIVER");
|
||||
attributes.put("cardStatus", "INSERTED");
|
||||
attributes.put("drivingStatus", "SINGLE");
|
||||
return event(
|
||||
externalId,
|
||||
sourceKind,
|
||||
EventDomain.DRIVER_ACTIVITY,
|
||||
EventType.BREAK_REST,
|
||||
EventLifecycle.START,
|
||||
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
|
||||
new EventDetailsDto("DRIVER_ACTIVITY", attributes),
|
||||
payload
|
||||
);
|
||||
}
|
||||
|
||||
private EventHubEventDto fileSessionSupportEvidence(
|
||||
String sourceKind,
|
||||
String externalId,
|
||||
EventDomain domain,
|
||||
EventType type,
|
||||
EventLifecycle lifecycle,
|
||||
boolean withVin
|
||||
) {
|
||||
ObjectNode raw = baseRawWithoutExtraction(sourceKind);
|
||||
raw.put("latitude", "48.208174");
|
||||
raw.put("longitude", "16.373819");
|
||||
raw.put("odometerM", "123000");
|
||||
raw.put("country", "AT");
|
||||
raw.put("region", "W");
|
||||
raw.put("countryFrom", "AT");
|
||||
raw.put("countryTo", "DE");
|
||||
ObjectNode payload = JsonNodeFactory.instance.objectNode();
|
||||
payload.set("raw", raw);
|
||||
ObjectNode attributes = JsonNodeFactory.instance.objectNode();
|
||||
attributes.put("country", "AT");
|
||||
attributes.put("region", "W");
|
||||
attributes.put("countryFrom", "AT");
|
||||
attributes.put("countryTo", "DE");
|
||||
VehicleRefDto vehicleRef = withVin
|
||||
? new VehicleRefDto("1:VEHICLE-ID", "WDB9634031L123456", "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
|
||||
: new VehicleRefDto(null, null, "1:REG-ID", new VehicleRegistrationRefDto("1", 1, "LL-158TE"));
|
||||
return event(
|
||||
externalId,
|
||||
sourceKind,
|
||||
domain,
|
||||
type,
|
||||
lifecycle,
|
||||
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
|
||||
new EventDetailsDto(domain.name(), attributes),
|
||||
payload,
|
||||
vehicleRef
|
||||
);
|
||||
}
|
||||
|
||||
private EventHubEventDto cardUsage(
|
||||
String extractionCode,
|
||||
String sourceKind,
|
||||
String externalId,
|
||||
EventType eventType,
|
||||
EventLifecycle lifecycle
|
||||
) {
|
||||
ObjectNode raw = baseRaw(extractionCode, sourceKind);
|
||||
raw.put("intervalId", "TACHOGRAPH:" + extractionCode + ":1");
|
||||
ObjectNode payload = JsonNodeFactory.instance.objectNode();
|
||||
payload.set("raw", raw);
|
||||
return event(
|
||||
externalId,
|
||||
sourceKind,
|
||||
EventDomain.DRIVER_CARD,
|
||||
eventType,
|
||||
lifecycle,
|
||||
OffsetDateTime.parse("2026-04-01T00:00:00Z"),
|
||||
null,
|
||||
payload
|
||||
);
|
||||
}
|
||||
|
||||
private ObjectNode baseRaw(String extractionCode, String sourceKind) {
|
||||
ObjectNode raw = JsonNodeFactory.instance.objectNode();
|
||||
raw.put("sourceRowId", extractionCode + "-1");
|
||||
raw.put("sourceKind", sourceKind);
|
||||
raw.put("extractionCode", extractionCode);
|
||||
raw.put("driverKey", "1:10000000198490");
|
||||
raw.put("registrationKey", "1:LL-158TE");
|
||||
return raw;
|
||||
}
|
||||
|
||||
private ObjectNode baseRawWithoutExtraction(String sourceKind) {
|
||||
ObjectNode raw = JsonNodeFactory.instance.objectNode();
|
||||
raw.put("sourceRowId", sourceKind + "-1");
|
||||
raw.put("sourceKind", sourceKind);
|
||||
raw.put("driverKey", "1:10000000198490");
|
||||
raw.put("registrationKey", "1:LL-158TE");
|
||||
return raw;
|
||||
}
|
||||
|
||||
private EventHubEventDto event(
|
||||
String externalId,
|
||||
String sourceKind,
|
||||
EventDomain domain,
|
||||
EventType type,
|
||||
EventLifecycle lifecycle,
|
||||
OffsetDateTime occurredAt,
|
||||
EventDetailsDto details,
|
||||
ObjectNode payload
|
||||
) {
|
||||
return event(
|
||||
externalId,
|
||||
sourceKind,
|
||||
domain,
|
||||
type,
|
||||
lifecycle,
|
||||
occurredAt,
|
||||
details,
|
||||
payload,
|
||||
new VehicleRefDto(null, null, null, new VehicleRegistrationRefDto("1", 1, "LL-158TE"))
|
||||
);
|
||||
}
|
||||
|
||||
private EventHubEventDto event(
|
||||
String externalId,
|
||||
String sourceKind,
|
||||
EventDomain domain,
|
||||
EventType type,
|
||||
EventLifecycle lifecycle,
|
||||
OffsetDateTime occurredAt,
|
||||
EventDetailsDto details,
|
||||
ObjectNode payload,
|
||||
VehicleRefDto vehicleRef
|
||||
) {
|
||||
return new EventHubEventDto(
|
||||
UUID.randomUUID(),
|
||||
externalId,
|
||||
new DriverRefDto("1:10000000198490", new DriverCardRefDto("1", 1, "10000000198490")),
|
||||
vehicleRef,
|
||||
occurredAt,
|
||||
null,
|
||||
occurredAt,
|
||||
domain,
|
||||
type,
|
||||
lifecycle,
|
||||
null,
|
||||
null,
|
||||
details,
|
||||
null,
|
||||
payload,
|
||||
false,
|
||||
packageInfo(sourceKind, domain, occurredAt.toLocalDate())
|
||||
);
|
||||
}
|
||||
|
||||
private EventHubPackageRequest packageInfo(String sourceKind, EventDomain domain, LocalDate businessDate) {
|
||||
EventSourceDto source = new EventSourceDto("TACHOGRAPH", sourceKind, "TACHOGRAPH_" + sourceKind, null, null, null);
|
||||
OffsetDateTime from = businessDate.atStartOfDay().atOffset(java.time.ZoneOffset.UTC);
|
||||
OffsetDateTime to = businessDate.plusDays(1).atStartOfDay().atOffset(java.time.ZoneOffset.UTC);
|
||||
return new EventHubPackageRequest(
|
||||
"default",
|
||||
source,
|
||||
null,
|
||||
ImportScopeDto.tenantAll(from, to),
|
||||
domain.name(),
|
||||
businessDate,
|
||||
source.stableKey() + ":" + domain.name() + ":" + businessDate
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -140,60 +140,6 @@ class VehicleUnitXmlExtractionServiceTest {
|
|||
assertThat(session.warnings()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void matchesNumericVuCardSlotNumbersToDriverActivitySlots() throws Exception {
|
||||
TachographFileSession session = service.extract(
|
||||
new TachographXmlParser.ParsedTachographXml(document(VehicleUnitXmlSamples.vehicleUnitXmlWithNumericCardSlots()), "VehicleUnit"),
|
||||
new TachographFileSessionMetadata(
|
||||
"default",
|
||||
"legalrequirements-vehicleunit",
|
||||
"sample-vu",
|
||||
"sample-vu.ddd",
|
||||
"abc",
|
||||
10,
|
||||
"42",
|
||||
"def",
|
||||
false,
|
||||
null
|
||||
),
|
||||
Instant.now(),
|
||||
Instant.now().plus(4, ChronoUnit.HOURS)
|
||||
);
|
||||
|
||||
DriverExtractionSession driver = session.driversByKey().get("12:12345678901200");
|
||||
assertThat(driver).isNotNull();
|
||||
assertThat(driver.cardActivityIntervals()).hasSize(2);
|
||||
assertThat(driver.cardActivityIntervals()).extracting("slot").containsOnly("DRIVER");
|
||||
assertThat(session.warnings()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void deduplicatesEquivalentVehicleUnitVehicleUsageIntervals() throws Exception {
|
||||
TachographFileSession session = service.extract(
|
||||
new TachographXmlParser.ParsedTachographXml(document(VehicleUnitXmlSamples.vehicleUnitXmlWithDuplicateVehicleUsageInterval()), "VehicleUnit"),
|
||||
new TachographFileSessionMetadata(
|
||||
"default",
|
||||
"legalrequirements-vehicleunit",
|
||||
"sample-vu",
|
||||
"sample-vu.ddd",
|
||||
"abc",
|
||||
10,
|
||||
"42",
|
||||
"def",
|
||||
false,
|
||||
null
|
||||
),
|
||||
Instant.now(),
|
||||
Instant.now().plus(4, ChronoUnit.HOURS)
|
||||
);
|
||||
|
||||
DriverExtractionSession firstDriver = session.driversByKey().get("12:12345678901200");
|
||||
assertThat(firstDriver).isNotNull();
|
||||
assertThat(firstDriver.cardVehicleUsageIntervals()).hasSize(1);
|
||||
assertThat(firstDriver.cardVehicleUsageIntervals().get(0).from()).isEqualTo(OffsetDateTime.parse("2026-04-01T08:00:00Z"));
|
||||
assertThat(firstDriver.cardVehicleUsageIntervals().get(0).to()).isEqualTo(OffsetDateTime.parse("2026-04-01T11:00:00Z"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void keepsOpenVuCardUsageRecordWithoutClosingItAtDownloadPeriodEnd() throws Exception {
|
||||
TachographFileSession session = service.extract(
|
||||
|
|
|
|||
|
|
@ -314,41 +314,4 @@ final class VehicleUnitXmlSamples {
|
|||
</VehicleUnit>
|
||||
""";
|
||||
}
|
||||
|
||||
static String vehicleUnitXmlWithNumericCardSlots() {
|
||||
return vehicleUnitXmlWithActivityDownloadTime()
|
||||
.replace("<cardSlotNumber>DRIVER</cardSlotNumber>", "<cardSlotNumber>0</cardSlotNumber>");
|
||||
}
|
||||
|
||||
static String vehicleUnitXmlWithDuplicateVehicleUsageInterval() {
|
||||
String duplicateRecord = """
|
||||
<vuCardIWRecords>
|
||||
<cardHolderName>
|
||||
<holderSurname><name>Muster</name></holderSurname>
|
||||
<holderFirstNames><name>Max</name></holderFirstNames>
|
||||
</cardHolderName>
|
||||
<fullCardNumber>
|
||||
<cardType>DRIVER_CARD</cardType>
|
||||
<cardIssuingMemberState>12</cardIssuingMemberState>
|
||||
<cardNumber>
|
||||
<driverIdentification>123456789012</driverIdentification>
|
||||
<cardReplacementIndex>0</cardReplacementIndex>
|
||||
<cardRenewalIndex>0</cardRenewalIndex>
|
||||
</cardNumber>
|
||||
<generation>2</generation>
|
||||
</fullCardNumber>
|
||||
<cardExpiryDate>2031-04-01T00:00:00Z</cardExpiryDate>
|
||||
<cardInsertionTime>2026-04-01T08:00:00Z</cardInsertionTime>
|
||||
<vehicleOdometerValueAtInsertion>1000</vehicleOdometerValueAtInsertion>
|
||||
<cardSlotNumber>DRIVER</cardSlotNumber>
|
||||
<cardWithdrawalTime>2026-04-01T11:00:00Z</cardWithdrawalTime>
|
||||
<vehicleOdometerValueAtWithdrawal>1100</vehicleOdometerValueAtWithdrawal>
|
||||
<manualInputFlag>NO_ENTRY</manualInputFlag>
|
||||
</vuCardIWRecords>
|
||||
""";
|
||||
return vehicleUnitXml().replace(
|
||||
duplicateRecord,
|
||||
duplicateRecord + duplicateRecord
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue