Decouple runtime event window bundle selection
This commit is contained in:
parent
ff12953e05
commit
aa360c9f02
|
|
@ -0,0 +1,39 @@
|
||||||
|
package at.procon.eventhub.processing.model;
|
||||||
|
|
||||||
|
import at.procon.eventhub.dto.EventHubEventDto;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public record RuntimeTimelineEventBundle(
|
||||||
|
List<EventHubEventDto> activityEvents,
|
||||||
|
List<EventHubEventDto> vehicleUsageEvents,
|
||||||
|
List<EventHubEventDto> supportEvents
|
||||||
|
) {
|
||||||
|
public RuntimeTimelineEventBundle {
|
||||||
|
activityEvents = copy(activityEvents);
|
||||||
|
vehicleUsageEvents = copy(vehicleUsageEvents);
|
||||||
|
supportEvents = copy(supportEvents);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RuntimeTimelineEventBundle empty() {
|
||||||
|
return new RuntimeTimelineEventBundle(List.of(), List.of(), List.of());
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<EventHubEventDto> allEvents() {
|
||||||
|
List<EventHubEventDto> result = new ArrayList<>(activityEvents.size() + vehicleUsageEvents.size() + supportEvents.size());
|
||||||
|
result.addAll(activityEvents);
|
||||||
|
result.addAll(vehicleUsageEvents);
|
||||||
|
result.addAll(supportEvents);
|
||||||
|
result.sort(Comparator.comparing(EventHubEventDto::occurredAt)
|
||||||
|
.thenComparing(event -> event.eventDomain().name())
|
||||||
|
.thenComparing(event -> event.eventType().name())
|
||||||
|
.thenComparing(event -> event.lifecycle().name())
|
||||||
|
.thenComparing(EventHubEventDto::externalSourceEventId));
|
||||||
|
return List.copyOf(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<EventHubEventDto> copy(List<EventHubEventDto> events) {
|
||||||
|
return events == null ? List.of() : List.copyOf(events);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,9 +3,8 @@ package at.procon.eventhub.processing.service;
|
||||||
import at.procon.eventhub.dto.EventHubEventDto;
|
import at.procon.eventhub.dto.EventHubEventDto;
|
||||||
import at.procon.eventhub.dto.EventLifecycle;
|
import at.procon.eventhub.dto.EventLifecycle;
|
||||||
import at.procon.eventhub.dto.EventType;
|
import at.procon.eventhub.dto.EventType;
|
||||||
|
import at.procon.eventhub.processing.model.RuntimeTimelineEventBundle;
|
||||||
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
|
import at.procon.eventhub.processing.support.RuntimeEventIdentityResolver;
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
|
||||||
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
|
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
|
|
@ -16,23 +15,23 @@ final class RuntimeIntervalEventWindowSelector {
|
||||||
private RuntimeIntervalEventWindowSelector() {
|
private RuntimeIntervalEventWindowSelector() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static TachographTimelineEventBundle filterBundle(
|
static RuntimeTimelineEventBundle filterBundle(
|
||||||
TachographTimelineEventBundle bundle,
|
RuntimeTimelineEventBundle bundle,
|
||||||
OffsetDateTime occurredFrom,
|
OffsetDateTime occurredFrom,
|
||||||
OffsetDateTime occurredTo,
|
OffsetDateTime occurredTo,
|
||||||
boolean includeIntersectingIntervals
|
boolean includeIntersectingIntervals
|
||||||
) {
|
) {
|
||||||
if (bundle == null) {
|
if (bundle == null) {
|
||||||
return new TachographTimelineEventBundle(List.of(), List.of(), List.of());
|
return RuntimeTimelineEventBundle.empty();
|
||||||
}
|
}
|
||||||
return new TachographTimelineEventBundle(
|
return new RuntimeTimelineEventBundle(
|
||||||
filterIntervalEvents(bundle.activityEvents(), occurredFrom, occurredTo, includeIntersectingIntervals),
|
filterIntervalEvents(bundle.activityEvents(), occurredFrom, occurredTo, includeIntersectingIntervals),
|
||||||
filterIntervalEvents(bundle.vehicleUsageEvents(), occurredFrom, occurredTo, includeIntersectingIntervals),
|
filterIntervalEvents(bundle.vehicleUsageEvents(), occurredFrom, occurredTo, includeIntersectingIntervals),
|
||||||
filterPointEvents(bundle.supportEvents(), occurredFrom, occurredTo)
|
filterPointEvents(bundle.supportEvents(), occurredFrom, occurredTo)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<EventHubEventDto> filterIntervalEvents(
|
static List<EventHubEventDto> filterIntervalEvents(
|
||||||
List<EventHubEventDto> events,
|
List<EventHubEventDto> events,
|
||||||
OffsetDateTime occurredFrom,
|
OffsetDateTime occurredFrom,
|
||||||
OffsetDateTime occurredTo,
|
OffsetDateTime occurredTo,
|
||||||
|
|
@ -59,11 +58,14 @@ final class RuntimeIntervalEventWindowSelector {
|
||||||
return List.copyOf(result);
|
return List.copyOf(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<EventHubEventDto> filterPointEvents(
|
static List<EventHubEventDto> filterPointEvents(
|
||||||
List<EventHubEventDto> events,
|
List<EventHubEventDto> events,
|
||||||
OffsetDateTime occurredFrom,
|
OffsetDateTime occurredFrom,
|
||||||
OffsetDateTime occurredTo
|
OffsetDateTime occurredTo
|
||||||
) {
|
) {
|
||||||
|
if (events == null || events.isEmpty()) {
|
||||||
|
return List.of();
|
||||||
|
}
|
||||||
return events.stream()
|
return events.stream()
|
||||||
.filter(event -> withinWindow(event.occurredAt(), occurredFrom, occurredTo))
|
.filter(event -> withinWindow(event.occurredAt(), occurredFrom, occurredTo))
|
||||||
.toList();
|
.toList();
|
||||||
|
|
@ -87,27 +89,6 @@ final class RuntimeIntervalEventWindowSelector {
|
||||||
return RuntimeEventIdentityResolver.runtimeIntervalKey(event);
|
return RuntimeEventIdentityResolver.runtimeIntervalKey(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static JsonNode raw(EventHubEventDto event) {
|
|
||||||
JsonNode payload = event == null ? null : event.payload();
|
|
||||||
if (payload == null || payload.isNull() || payload.isMissingNode()) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
JsonNode raw = payload.get("raw");
|
|
||||||
return raw == null || raw.isNull() ? payload : raw;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String text(JsonNode node, String field) {
|
|
||||||
if (node == null || field == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
JsonNode value = node.get(field);
|
|
||||||
if (value == null || value.isNull()) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
String text = value.asText(null);
|
|
||||||
return text == null || text.isBlank() ? null : text.trim();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final class IntervalGroup {
|
private static final class IntervalGroup {
|
||||||
private final List<EventHubEventDto> events = new ArrayList<>();
|
private final List<EventHubEventDto> events = new ArrayList<>();
|
||||||
private OffsetDateTime startedAt;
|
private OffsetDateTime startedAt;
|
||||||
|
|
|
||||||
|
|
@ -54,11 +54,14 @@ public class TachographFileSessionUnifiedDriverEventSource implements UnifiedDri
|
||||||
DriverExtractionSession driver,
|
DriverExtractionSession driver,
|
||||||
UnifiedDriverEventsRequest request
|
UnifiedDriverEventsRequest request
|
||||||
) {
|
) {
|
||||||
return RuntimeIntervalEventWindowSelector.filterBundle(
|
TachographTimelineEventBundle bundle = eventBuilder.buildEventBundle(session, driver);
|
||||||
eventBuilder.buildEventBundle(session, driver),
|
return TachographTimelineEventBundle.fromRuntimeBundle(
|
||||||
request.occurredFrom(),
|
RuntimeIntervalEventWindowSelector.filterBundle(
|
||||||
request.occurredTo(),
|
bundle == null ? null : bundle.toRuntimeBundle(),
|
||||||
request.includeIntersectingIntervals()
|
request.occurredFrom(),
|
||||||
|
request.occurredTo(),
|
||||||
|
request.includeIntersectingIntervals()
|
||||||
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import at.procon.eventhub.dto.VehicleRefDto;
|
||||||
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
|
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
|
||||||
import at.procon.eventhub.processing.model.UnifiedVehicleEventsRequest;
|
import at.procon.eventhub.processing.model.UnifiedVehicleEventsRequest;
|
||||||
import at.procon.eventhub.reference.TachographNationRegistry;
|
import at.procon.eventhub.reference.TachographNationRegistry;
|
||||||
|
import at.procon.eventhub.tachographfilesession.model.DriverExtractionSession;
|
||||||
import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
|
import at.procon.eventhub.tachographfilesession.model.TachographFileSession;
|
||||||
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
|
import at.procon.eventhub.tachographfilesession.model.TachographTimelineEventBundle;
|
||||||
import at.procon.eventhub.tachographfilesession.service.DriverTimelineEventBuilder;
|
import at.procon.eventhub.tachographfilesession.service.DriverTimelineEventBuilder;
|
||||||
|
|
@ -37,18 +38,29 @@ public class TachographFileSessionUnifiedVehicleEventSource implements UnifiedVe
|
||||||
TachographFileSession session = repository.find(request.sessionId())
|
TachographFileSession session = repository.find(request.sessionId())
|
||||||
.orElseThrow(() -> new TachographFileSessionNotFoundException(request.sessionId()));
|
.orElseThrow(() -> new TachographFileSessionNotFoundException(request.sessionId()));
|
||||||
return session.driversByKey().values().stream()
|
return session.driversByKey().values().stream()
|
||||||
.map(driver -> RuntimeIntervalEventWindowSelector.filterBundle(
|
.map(driver -> filterBundle(session, driver, request).allEvents())
|
||||||
eventBuilder.buildEventBundle(session, driver),
|
|
||||||
request.occurredFrom(),
|
|
||||||
request.occurredTo(),
|
|
||||||
request.includeIntersectingIntervals()
|
|
||||||
).allEvents())
|
|
||||||
.flatMap(List::stream)
|
.flatMap(List::stream)
|
||||||
.filter(event -> matchesVehicle(event.vehicleRef(), request))
|
.filter(event -> matchesVehicle(event.vehicleRef(), request))
|
||||||
.distinct()
|
.distinct()
|
||||||
.toList();
|
.toList();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TachographTimelineEventBundle filterBundle(
|
||||||
|
TachographFileSession session,
|
||||||
|
DriverExtractionSession driver,
|
||||||
|
UnifiedVehicleEventsRequest request
|
||||||
|
) {
|
||||||
|
TachographTimelineEventBundle bundle = eventBuilder.buildEventBundle(session, driver);
|
||||||
|
return TachographTimelineEventBundle.fromRuntimeBundle(
|
||||||
|
RuntimeIntervalEventWindowSelector.filterBundle(
|
||||||
|
bundle == null ? null : bundle.toRuntimeBundle(),
|
||||||
|
request.occurredFrom(),
|
||||||
|
request.occurredTo(),
|
||||||
|
request.includeIntersectingIntervals()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private boolean matchesVehicle(VehicleRefDto vehicleRef, UnifiedVehicleEventsRequest request) {
|
private boolean matchesVehicle(VehicleRefDto vehicleRef, UnifiedVehicleEventsRequest request) {
|
||||||
if (vehicleRef == null || !vehicleRef.hasAnyReference()) {
|
if (vehicleRef == null || !vehicleRef.hasAnyReference()) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package at.procon.eventhub.tachographfilesession.model;
|
package at.procon.eventhub.tachographfilesession.model;
|
||||||
|
|
||||||
import at.procon.eventhub.dto.EventHubEventDto;
|
import at.procon.eventhub.dto.EventHubEventDto;
|
||||||
|
import at.procon.eventhub.processing.model.RuntimeTimelineEventBundle;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -29,6 +30,16 @@ public record TachographTimelineEventBundle(
|
||||||
return List.copyOf(result);
|
return List.copyOf(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RuntimeTimelineEventBundle toRuntimeBundle() {
|
||||||
|
return new RuntimeTimelineEventBundle(activityEvents, vehicleUsageEvents, supportEvents);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TachographTimelineEventBundle fromRuntimeBundle(RuntimeTimelineEventBundle bundle) {
|
||||||
|
return bundle == null
|
||||||
|
? new TachographTimelineEventBundle(List.of(), List.of(), List.of())
|
||||||
|
: new TachographTimelineEventBundle(bundle.activityEvents(), bundle.vehicleUsageEvents(), bundle.supportEvents());
|
||||||
|
}
|
||||||
|
|
||||||
private static List<EventHubEventDto> copy(List<EventHubEventDto> events) {
|
private static List<EventHubEventDto> copy(List<EventHubEventDto> events) {
|
||||||
return events == null ? List.of() : List.copyOf(events);
|
return events == null ? List.of() : List.copyOf(events);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue