Add runtime processing performance tracing
This commit is contained in:
parent
1d64bd92c7
commit
3bfe05b64e
|
|
@ -19,6 +19,8 @@ import java.util.ArrayList;
|
|||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
|
|
@ -32,6 +34,8 @@ import org.springframework.stereotype.Service;
|
|||
@Service
|
||||
public class DriverWorkingTimeProcessingCore {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DriverWorkingTimeProcessingCore.class);
|
||||
|
||||
private final DriverWorkingTimeDerivedProjectionEngine derivedProjectionEngine;
|
||||
|
||||
public DriverWorkingTimeProcessingCore(
|
||||
|
|
@ -42,6 +46,7 @@ public class DriverWorkingTimeProcessingCore {
|
|||
|
||||
public DriverWorkingTimeProcessingResultDto process(DriverWorkingTimeProcessingInput input) {
|
||||
Objects.requireNonNull(input, "input must not be null");
|
||||
long startedAtNanos = System.nanoTime();
|
||||
OffsetDateTime loadedFrom = input.loadedFrom();
|
||||
OffsetDateTime loadedTo = input.loadedTo();
|
||||
String driverKey = input.driverKey();
|
||||
|
|
@ -51,6 +56,7 @@ public class DriverWorkingTimeProcessingCore {
|
|||
throw new IllegalArgumentException("occurredTo must not be before occurredFrom.");
|
||||
}
|
||||
|
||||
long activityClipStartedAtNanos = System.nanoTime();
|
||||
List<DriverWorkingTimeActivityInterval> activityIntervals = clipActivityIntervals(
|
||||
input.activityIntervals(),
|
||||
requestedFrom,
|
||||
|
|
@ -63,9 +69,13 @@ public class DriverWorkingTimeProcessingCore {
|
|||
requestedFrom,
|
||||
requestedTo
|
||||
);
|
||||
long activityClipMs = elapsedMillis(activityClipStartedAtNanos);
|
||||
|
||||
long derivedProjectionBuildStartedAtNanos = System.nanoTime();
|
||||
DriverWorkingTimeDerivedProjectionBundle derivedProjectionBundle = derivedProjectionEngine.build(input);
|
||||
long derivedProjectionBuildMs = elapsedMillis(derivedProjectionBuildStartedAtNanos);
|
||||
|
||||
long derivedIntervalClipStartedAtNanos = System.nanoTime();
|
||||
List<DriverWorkingTimeDrivingInterruptionInterval> drivingInterruptionIntervals =
|
||||
clipDrivingInterruptionIntervals(
|
||||
derivedProjectionBundle.drivingInterruptionIntervals(),
|
||||
|
|
@ -131,8 +141,9 @@ public class DriverWorkingTimeProcessingCore {
|
|||
requestedFrom,
|
||||
requestedTo
|
||||
);
|
||||
long derivedIntervalClipMs = elapsedMillis(derivedIntervalClipStartedAtNanos);
|
||||
|
||||
return new DriverWorkingTimeProcessingResultDto(
|
||||
DriverWorkingTimeProcessingResultDto result = new DriverWorkingTimeProcessingResultDto(
|
||||
input.sessionId(),
|
||||
driverKey,
|
||||
input.sourceKind(),
|
||||
|
|
@ -168,6 +179,19 @@ public class DriverWorkingTimeProcessingCore {
|
|||
supportGeoEvents,
|
||||
combinedNotes(input.notes())
|
||||
);
|
||||
LOG.info(
|
||||
"Driver working-time core processed driver {} in {} ms (activityClipMs: {}, derivedProjectionBuildMs: {}, derivedIntervalClipMs: {}, activityIntervals: {}, drivingIntervals: {}, vehicleUsageIntervals: {}, supportGeoEvents: {})",
|
||||
driverKey,
|
||||
elapsedMillis(startedAtNanos),
|
||||
activityClipMs,
|
||||
derivedProjectionBuildMs,
|
||||
derivedIntervalClipMs,
|
||||
result.activityIntervalCount(),
|
||||
result.drivingIntervalCount(),
|
||||
result.vehicleUsageIntervalCount(),
|
||||
result.supportGeoEventCount()
|
||||
);
|
||||
return result;
|
||||
}
|
||||
|
||||
public DriverWorkingTimeProcessingResultDto processDriverWorkingTime(DriverWorkingTimeProcessingInput input) {
|
||||
|
|
@ -649,4 +673,8 @@ public class DriverWorkingTimeProcessingCore {
|
|||
}
|
||||
return left.isBefore(right) ? left : right;
|
||||
}
|
||||
|
||||
private long elapsedMillis(long startedAtNanos) {
|
||||
return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,13 +28,18 @@ import java.time.OffsetDateTime;
|
|||
import java.time.ZoneOffset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
|
@ -43,11 +48,17 @@ import org.springframework.util.StreamUtils;
|
|||
@Service
|
||||
public class DriverWorkingTimeReusableProjectionBuilder {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DriverWorkingTimeReusableProjectionBuilder.class);
|
||||
private static final AtomicLong RUNTIME_COUNTER = new AtomicLong();
|
||||
private static final String DRIVING_DERIVED_PROJECTION_BUNDLE_EPL_TEMPLATE =
|
||||
loadResource("esper/driver-working-time-derived-projections.epl");
|
||||
private static final Map<String, Object> ACTIVITY_INTERVAL_INPUT_DEFINITION = activityIntervalInputDefinitionStatic();
|
||||
private static final Map<String, Object> VEHICLE_USAGE_INTERVAL_INPUT_DEFINITION = vehicleUsageIntervalInputDefinitionStatic();
|
||||
private static final Map<String, Object> SUPPORT_GEO_EVIDENCE_INPUT_DEFINITION = supportGeoEvidenceInputDefinitionStatic();
|
||||
|
||||
private final EventHubProperties properties;
|
||||
private final ConcurrentMap<PreparedProjectionDefinitionCacheKey, PreparedProjectionDefinition> preparedDefinitions =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
@Autowired
|
||||
public DriverWorkingTimeReusableProjectionBuilder(
|
||||
|
|
@ -78,6 +89,7 @@ public class DriverWorkingTimeReusableProjectionBuilder {
|
|||
int significantDrivingMinutes,
|
||||
int minimumRestPeriodMinutes
|
||||
) {
|
||||
long startedAtNanos = System.nanoTime();
|
||||
if ((activityInputEvents == null || activityInputEvents.isEmpty())
|
||||
&& (vehicleUsageInputEvents == null || vehicleUsageInputEvents.isEmpty())) {
|
||||
return emptyDerivedProjectionBundle();
|
||||
|
|
@ -93,22 +105,9 @@ public class DriverWorkingTimeReusableProjectionBuilder {
|
|||
List<DriverWorkingTimeRestCoverageInterval> potentialInVehicleOvernightStayIntervals = new ArrayList<>();
|
||||
List<DriverWorkingTimePotentialInVehicleTripInterval> potentialInVehicleTripIntervals = new ArrayList<>();
|
||||
|
||||
executeWithRuntime(
|
||||
configuration -> {
|
||||
configuration.getCommon().addEventType(
|
||||
"TachographActivityIntervalInputEvent",
|
||||
activityIntervalInputDefinition()
|
||||
);
|
||||
configuration.getCommon().addEventType(
|
||||
"TachographVehicleUsageIntervalInputEvent",
|
||||
vehicleUsageIntervalInputDefinition()
|
||||
);
|
||||
configuration.getCommon().addEventType(
|
||||
"TachographSupportGeoEvidenceInputEvent",
|
||||
supportGeoEvidenceInputDefinition()
|
||||
);
|
||||
},
|
||||
renderDrivingDerivedProjectionBundleEpl(significantDrivingMinutes, minimumRestPeriodMinutes),
|
||||
DerivedProjectionRuntimeExecutionMetrics runtimeMetrics = executeWithRuntime(
|
||||
significantDrivingMinutes,
|
||||
minimumRestPeriodMinutes,
|
||||
Map.of(
|
||||
"drivingInterruptionIntervals",
|
||||
newData -> collectDrivingInterruptionIntervalModels(newData, drivingInterruptionIntervals),
|
||||
|
|
@ -129,34 +128,26 @@ public class DriverWorkingTimeReusableProjectionBuilder {
|
|||
"potentialInVehicleTripIntervals",
|
||||
newData -> collectPotentialInVehicleTripIntervalModels(newData, potentialInVehicleTripIntervals)
|
||||
),
|
||||
runtime -> {
|
||||
sender -> {
|
||||
if (supportGeoInputEvents != null) {
|
||||
for (Map<String, Object> supportGeoEvidence : supportGeoInputEvents) {
|
||||
runtime.getEventService().sendEventMap(
|
||||
supportGeoEvidence,
|
||||
"TachographSupportGeoEvidenceInputEvent"
|
||||
);
|
||||
sender.sendSupportGeoEvent(supportGeoEvidence);
|
||||
}
|
||||
}
|
||||
if (vehicleUsageInputEvents != null) {
|
||||
for (Map<String, Object> interval : vehicleUsageInputEvents) {
|
||||
runtime.getEventService().sendEventMap(
|
||||
interval,
|
||||
"TachographVehicleUsageIntervalInputEvent"
|
||||
);
|
||||
sender.sendVehicleUsageEvent(interval);
|
||||
}
|
||||
}
|
||||
if (activityInputEvents != null) {
|
||||
for (Map<String, Object> interval : activityInputEvents) {
|
||||
runtime.getEventService().sendEventMap(
|
||||
interval,
|
||||
"TachographActivityIntervalInputEvent"
|
||||
);
|
||||
sender.sendActivityEvent(interval);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
long sortOutputStartedAtNanos = System.nanoTime();
|
||||
List<DriverWorkingTimeRestCoverageInterval> sortedDailyWeeklyRestCandidateCoverageIntervals =
|
||||
sortDailyWeeklyRestCandidateCoverageIntervalsCommon(dailyWeeklyRestCandidateCoverageIntervals);
|
||||
List<DriverWorkingTimeRestCoverageInterval> sortedUnclassifiedDailyWeeklyRestCandidateCoverageIntervals =
|
||||
|
|
@ -165,6 +156,25 @@ public class DriverWorkingTimeReusableProjectionBuilder {
|
|||
sortPotentialHomeOvernightStayIntervalsCommon(potentialHomeOvernightStayIntervals);
|
||||
List<DriverWorkingTimeRestCoverageInterval> sortedPotentialInVehicleOvernightStayIntervals =
|
||||
sortPotentialInVehicleOvernightStayIntervalsCommon(potentialInVehicleOvernightStayIntervals);
|
||||
long sortOutputMs = elapsedMillis(sortOutputStartedAtNanos);
|
||||
|
||||
LOG.info(
|
||||
"Driver working-time derived projection bundle built in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, sendSupportGeoMs: {}, sendVehicleUsageMs: {}, sendActivityMs: {}, destroyMs: {}, sortOutputMs: {}, activityInputEvents: {}, vehicleUsageInputEvents: {}, supportGeoInputEvents: {})",
|
||||
elapsedMillis(startedAtNanos),
|
||||
runtimeMetrics.definitionCacheHit(),
|
||||
runtimeMetrics.definitionPreparationMs(),
|
||||
runtimeMetrics.runtimeInitMs(),
|
||||
runtimeMetrics.deployMs(),
|
||||
runtimeMetrics.listenerRegistrationMs(),
|
||||
runtimeMetrics.sendSupportGeoMs(),
|
||||
runtimeMetrics.sendVehicleUsageMs(),
|
||||
runtimeMetrics.sendActivityMs(),
|
||||
runtimeMetrics.destroyMs(),
|
||||
sortOutputMs,
|
||||
safeList(activityInputEvents).size(),
|
||||
safeList(vehicleUsageInputEvents).size(),
|
||||
safeList(supportGeoInputEvents).size()
|
||||
);
|
||||
|
||||
return new DriverWorkingTimeDerivedProjectionBundle(
|
||||
sortDrivingInterruptionIntervalsCommon(drivingInterruptionIntervals),
|
||||
|
|
@ -237,39 +247,134 @@ public class DriverWorkingTimeReusableProjectionBuilder {
|
|||
);
|
||||
}
|
||||
|
||||
private void executeWithRuntime(
|
||||
Consumer<Configuration> configurationSetup,
|
||||
String epl,
|
||||
private DerivedProjectionRuntimeExecutionMetrics executeWithRuntime(
|
||||
int significantDrivingMinutes,
|
||||
int minimumRestPeriodMinutes,
|
||||
Map<String, Consumer<EventBean[]>> listeners,
|
||||
Consumer<EPRuntime> sender
|
||||
Consumer<DerivedProjectionEventSender> sender
|
||||
) {
|
||||
EPRuntime runtime = null;
|
||||
long definitionPreparationMs = 0L;
|
||||
boolean definitionCacheHit = false;
|
||||
long runtimeInitMs = 0L;
|
||||
long deployMs = 0L;
|
||||
long listenerRegistrationMs = 0L;
|
||||
long sendSupportGeoMs = 0L;
|
||||
long sendVehicleUsageMs = 0L;
|
||||
long sendActivityMs = 0L;
|
||||
long destroyMs = 0L;
|
||||
try {
|
||||
Configuration configuration = new Configuration();
|
||||
configurationSetup.accept(configuration);
|
||||
PreparedProjectionDefinitionCacheKey cacheKey = new PreparedProjectionDefinitionCacheKey(
|
||||
significantDrivingMinutes,
|
||||
minimumRestPeriodMinutes,
|
||||
Math.max(1, properties.getRuntimeProcessing().getRestCandidateGeoLookbackMinutes()),
|
||||
Math.max(1, properties.getRuntimeProcessing().getRestCandidateGeoLookaheadMinutes()),
|
||||
Math.max(0, properties.getRuntimeProcessing().getRestCandidateGeoStationaryMaxMeters()),
|
||||
Math.max(
|
||||
properties.getRuntimeProcessing().getRestCandidateGeoStationaryMaxMeters(),
|
||||
properties.getRuntimeProcessing().getRestCandidateGeoMinorMovementMaxMeters()
|
||||
)
|
||||
);
|
||||
PreparedProjectionDefinition preparedDefinition = preparedDefinitions.get(cacheKey);
|
||||
definitionCacheHit = preparedDefinition != null;
|
||||
if (!definitionCacheHit) {
|
||||
long definitionPreparationStartedAtNanos = System.nanoTime();
|
||||
PreparedProjectionDefinition candidate = prepareProjectionDefinition(
|
||||
significantDrivingMinutes,
|
||||
minimumRestPeriodMinutes
|
||||
);
|
||||
PreparedProjectionDefinition existing = preparedDefinitions.putIfAbsent(cacheKey, candidate);
|
||||
preparedDefinition = existing == null ? candidate : existing;
|
||||
definitionCacheHit = existing != null;
|
||||
definitionPreparationMs = definitionCacheHit ? 0L : elapsedMillis(definitionPreparationStartedAtNanos);
|
||||
}
|
||||
|
||||
long runtimeInitStartedAtNanos = System.nanoTime();
|
||||
Configuration configuration = createRuntimeConfiguration();
|
||||
String runtimeUri = "eventhub-driver-working-time-reusable-projection-" + RUNTIME_COUNTER.incrementAndGet();
|
||||
runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration);
|
||||
runtimeInitMs = elapsedMillis(runtimeInitStartedAtNanos);
|
||||
|
||||
CompilerArguments arguments = new CompilerArguments(configuration);
|
||||
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments);
|
||||
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
|
||||
long deployStartedAtNanos = System.nanoTime();
|
||||
EPDeployment deployment = runtime.getDeploymentService().deploy(preparedDefinition.compiled());
|
||||
deployMs = elapsedMillis(deployStartedAtNanos);
|
||||
|
||||
long listenerRegistrationStartedAtNanos = System.nanoTime();
|
||||
for (Map.Entry<String, Consumer<EventBean[]>> entry : listeners.entrySet()) {
|
||||
runtime.getDeploymentService()
|
||||
.getStatement(deployment.getDeploymentId(), entry.getKey())
|
||||
.addListener((newData, oldData, statement, rt) -> entry.getValue().accept(newData));
|
||||
}
|
||||
listenerRegistrationMs = elapsedMillis(listenerRegistrationStartedAtNanos);
|
||||
|
||||
sender.accept(runtime);
|
||||
DerivedProjectionEventSender timedSender = new DerivedProjectionEventSender(runtime);
|
||||
sender.accept(timedSender);
|
||||
sendSupportGeoMs = timedSender.sendSupportGeoMs();
|
||||
sendVehicleUsageMs = timedSender.sendVehicleUsageMs();
|
||||
sendActivityMs = timedSender.sendActivityMs();
|
||||
} catch (EPCompileException | EPDeployException e) {
|
||||
throw new IllegalStateException("Cannot compile/deploy reusable driver working-time projection EPL bundle", e);
|
||||
} finally {
|
||||
if (runtime != null) {
|
||||
long destroyStartedAtNanos = System.nanoTime();
|
||||
runtime.destroy();
|
||||
destroyMs = elapsedMillis(destroyStartedAtNanos);
|
||||
}
|
||||
}
|
||||
return new DerivedProjectionRuntimeExecutionMetrics(
|
||||
definitionCacheHit,
|
||||
definitionPreparationMs,
|
||||
runtimeInitMs,
|
||||
deployMs,
|
||||
listenerRegistrationMs,
|
||||
sendSupportGeoMs,
|
||||
sendVehicleUsageMs,
|
||||
sendActivityMs,
|
||||
destroyMs
|
||||
);
|
||||
}
|
||||
|
||||
private Map<String, Object> activityIntervalInputDefinition() {
|
||||
return ACTIVITY_INTERVAL_INPUT_DEFINITION;
|
||||
}
|
||||
|
||||
private Map<String, Object> vehicleUsageIntervalInputDefinition() {
|
||||
return VEHICLE_USAGE_INTERVAL_INPUT_DEFINITION;
|
||||
}
|
||||
|
||||
private Map<String, Object> supportGeoEvidenceInputDefinition() {
|
||||
return SUPPORT_GEO_EVIDENCE_INPUT_DEFINITION;
|
||||
}
|
||||
|
||||
private PreparedProjectionDefinition prepareProjectionDefinition(
|
||||
int significantDrivingMinutes,
|
||||
int minimumRestPeriodMinutes
|
||||
) throws EPCompileException {
|
||||
Configuration configuration = createRuntimeConfiguration();
|
||||
String epl = renderDrivingDerivedProjectionBundleEpl(significantDrivingMinutes, minimumRestPeriodMinutes);
|
||||
CompilerArguments arguments = new CompilerArguments(configuration);
|
||||
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, arguments);
|
||||
return new PreparedProjectionDefinition(compiled);
|
||||
}
|
||||
|
||||
private Configuration createRuntimeConfiguration() {
|
||||
Configuration configuration = new Configuration();
|
||||
configuration.getCommon().addEventType(
|
||||
"TachographActivityIntervalInputEvent",
|
||||
activityIntervalInputDefinition()
|
||||
);
|
||||
configuration.getCommon().addEventType(
|
||||
"TachographVehicleUsageIntervalInputEvent",
|
||||
vehicleUsageIntervalInputDefinition()
|
||||
);
|
||||
configuration.getCommon().addEventType(
|
||||
"TachographSupportGeoEvidenceInputEvent",
|
||||
supportGeoEvidenceInputDefinition()
|
||||
);
|
||||
return configuration;
|
||||
}
|
||||
|
||||
private static Map<String, Object> activityIntervalInputDefinitionStatic() {
|
||||
Map<String, Object> definition = new LinkedHashMap<>();
|
||||
definition.put("sessionId", UUID.class);
|
||||
definition.put("driverKey", String.class);
|
||||
|
|
@ -292,10 +397,10 @@ public class DriverWorkingTimeReusableProjectionBuilder {
|
|||
definition.put("synthetic", boolean.class);
|
||||
definition.put("clippedToRequestedPeriod", boolean.class);
|
||||
definition.put("level", String.class);
|
||||
return definition;
|
||||
return Collections.unmodifiableMap(definition);
|
||||
}
|
||||
|
||||
private Map<String, Object> vehicleUsageIntervalInputDefinition() {
|
||||
private static Map<String, Object> vehicleUsageIntervalInputDefinitionStatic() {
|
||||
Map<String, Object> definition = new LinkedHashMap<>();
|
||||
definition.put("sessionId", UUID.class);
|
||||
definition.put("driverKey", String.class);
|
||||
|
|
@ -313,10 +418,10 @@ public class DriverWorkingTimeReusableProjectionBuilder {
|
|||
definition.put("vehicleKey", String.class);
|
||||
definition.put("sourceKind", String.class);
|
||||
definition.put("sourceIntervalIds", java.util.List.class);
|
||||
return definition;
|
||||
return Collections.unmodifiableMap(definition);
|
||||
}
|
||||
|
||||
private Map<String, Object> supportGeoEvidenceInputDefinition() {
|
||||
private static Map<String, Object> supportGeoEvidenceInputDefinitionStatic() {
|
||||
Map<String, Object> definition = new LinkedHashMap<>();
|
||||
definition.put("sessionId", UUID.class);
|
||||
definition.put("driverKey", String.class);
|
||||
|
|
@ -330,7 +435,7 @@ public class DriverWorkingTimeReusableProjectionBuilder {
|
|||
definition.put("longitude", Double.class);
|
||||
definition.put("odometerKm", Long.class);
|
||||
definition.put("priority", int.class);
|
||||
return definition;
|
||||
return Collections.unmodifiableMap(definition);
|
||||
}
|
||||
|
||||
private Map<String, Object> toActivityIntervalInputMap(
|
||||
|
|
@ -758,6 +863,10 @@ public class DriverWorkingTimeReusableProjectionBuilder {
|
|||
return value instanceof Long number ? number : null;
|
||||
}
|
||||
|
||||
private long elapsedMillis(long startedAtNanos) {
|
||||
return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
|
||||
}
|
||||
|
||||
private static String loadResource(String path) {
|
||||
try {
|
||||
ClassPathResource resource = new ClassPathResource(path);
|
||||
|
|
@ -766,4 +875,78 @@ public class DriverWorkingTimeReusableProjectionBuilder {
|
|||
throw new IllegalStateException("Cannot load EPL resource: " + path, e);
|
||||
}
|
||||
}
|
||||
|
||||
private record PreparedProjectionDefinition(
|
||||
EPCompiled compiled
|
||||
) {
|
||||
}
|
||||
|
||||
private record PreparedProjectionDefinitionCacheKey(
|
||||
int significantDrivingMinutes,
|
||||
int minimumRestPeriodMinutes,
|
||||
int restGeoLookbackMinutes,
|
||||
int restGeoLookaheadMinutes,
|
||||
int restGeoStationaryMaxMeters,
|
||||
int restGeoMinorMovementMaxMeters
|
||||
) {
|
||||
}
|
||||
|
||||
private record DerivedProjectionRuntimeExecutionMetrics(
|
||||
boolean definitionCacheHit,
|
||||
long definitionPreparationMs,
|
||||
long runtimeInitMs,
|
||||
long deployMs,
|
||||
long listenerRegistrationMs,
|
||||
long sendSupportGeoMs,
|
||||
long sendVehicleUsageMs,
|
||||
long sendActivityMs,
|
||||
long destroyMs
|
||||
) {
|
||||
}
|
||||
|
||||
private static final class DerivedProjectionEventSender {
|
||||
|
||||
private final EPRuntime delegate;
|
||||
private long sendSupportGeoMs;
|
||||
private long sendVehicleUsageMs;
|
||||
private long sendActivityMs;
|
||||
|
||||
private DerivedProjectionEventSender(EPRuntime delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
private void sendSupportGeoEvent(Map<String, Object> event) {
|
||||
long startedAtNanos = System.nanoTime();
|
||||
delegate.getEventService().sendEventMap(event, "TachographSupportGeoEvidenceInputEvent");
|
||||
sendSupportGeoMs += elapsedMillisStatic(startedAtNanos);
|
||||
}
|
||||
|
||||
private void sendVehicleUsageEvent(Map<String, Object> event) {
|
||||
long startedAtNanos = System.nanoTime();
|
||||
delegate.getEventService().sendEventMap(event, "TachographVehicleUsageIntervalInputEvent");
|
||||
sendVehicleUsageMs += elapsedMillisStatic(startedAtNanos);
|
||||
}
|
||||
|
||||
private void sendActivityEvent(Map<String, Object> event) {
|
||||
long startedAtNanos = System.nanoTime();
|
||||
delegate.getEventService().sendEventMap(event, "TachographActivityIntervalInputEvent");
|
||||
sendActivityMs += elapsedMillisStatic(startedAtNanos);
|
||||
}
|
||||
|
||||
private long sendSupportGeoMs() {
|
||||
return sendSupportGeoMs;
|
||||
}
|
||||
|
||||
private long sendVehicleUsageMs() {
|
||||
return sendVehicleUsageMs;
|
||||
}
|
||||
|
||||
private long sendActivityMs() {
|
||||
return sendActivityMs;
|
||||
}
|
||||
|
||||
private static long elapsedMillisStatic(long startedAtNanos) {
|
||||
return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,13 +3,17 @@ package at.procon.eventhub.processing.eventprocessing;
|
|||
import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingApiRequest;
|
||||
import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingResultDto;
|
||||
import at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventProcessingProfileDescriptorDto;
|
||||
import java.util.List;
|
||||
import at.procon.eventhub.processing.eventprocessing.profile.RuntimeEventProcessingProfileRegistry;
|
||||
import java.util.List;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class RuntimeEventProcessingService {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RuntimeEventProcessingService.class);
|
||||
|
||||
private final RuntimeEventProcessingProfileRegistry profileRegistry;
|
||||
|
||||
public RuntimeEventProcessingService(RuntimeEventProcessingProfileRegistry profileRegistry) {
|
||||
|
|
@ -17,10 +21,34 @@ public class RuntimeEventProcessingService {
|
|||
}
|
||||
|
||||
public RuntimeEventProcessingResultDto process(RuntimeEventProcessingApiRequest request) {
|
||||
return profileRegistry.require(request.profileKey()).process(request);
|
||||
long startedAtNanos = System.nanoTime();
|
||||
try {
|
||||
RuntimeEventProcessingResultDto result = profileRegistry.require(request.profileKey()).process(request);
|
||||
LOG.info(
|
||||
"Runtime event processing profile {} completed in {} ms (partitions: {}, inputEvents: {}, warnings: {})",
|
||||
request.profileKey(),
|
||||
elapsedMillis(startedAtNanos),
|
||||
result.selectedPartitionCount(),
|
||||
result.inputEventCount(),
|
||||
result.warnings().size()
|
||||
);
|
||||
return result;
|
||||
} catch (RuntimeException ex) {
|
||||
LOG.error(
|
||||
"Runtime event processing profile {} failed after {} ms",
|
||||
request == null ? null : request.profileKey(),
|
||||
elapsedMillis(startedAtNanos),
|
||||
ex
|
||||
);
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
public List<RuntimeEventProcessingProfileDescriptorDto> listProfiles() {
|
||||
return profileRegistry.profileDescriptors();
|
||||
}
|
||||
|
||||
private long elapsedMillis(long startedAtNanos) {
|
||||
return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,12 +15,19 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcessingModule {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DriverWorkingTimeDerivedProjectionsModule.class);
|
||||
private static final String DRIVER_PROCESSING_TOTAL_MS_METADATA_KEY = "driverProcessingTotalMs";
|
||||
private static final String SLOWEST_DRIVER_KEY_METADATA_KEY = "slowestDriverKey";
|
||||
private static final String SLOWEST_DRIVER_PROCESSING_MS_METADATA_KEY = "slowestDriverProcessingMs";
|
||||
|
||||
private final DriverWorkingTimeProcessingCore workingTimeProcessingCore;
|
||||
private final RuntimeDriverWorkingTimeScopeProcessingService legacyScopeProcessingService;
|
||||
|
||||
|
|
@ -60,20 +67,31 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
|
|||
if (legacyScopeProcessingService != null) {
|
||||
return executeLegacy(context);
|
||||
}
|
||||
long startedAtNanos = System.nanoTime();
|
||||
UnifiedRuntimeEventBundle broadBundle = runtimeEventBundle(context);
|
||||
UnifiedRuntimeProcessingApiRequest scopeRequest = scopeRequest(context);
|
||||
Map<String, DriverWorkingTimePreparedInput> preparedInputs = preparedInputs(context);
|
||||
|
||||
LinkedHashMap<String, UnifiedRuntimeDerivedProjectionResultDto> driverResults = new LinkedHashMap<>();
|
||||
List<String> warnings = new ArrayList<>();
|
||||
long driverProcessingTotalMs = 0L;
|
||||
String slowestDriverKey = null;
|
||||
long slowestDriverProcessingMs = -1L;
|
||||
for (Map.Entry<String, DriverWorkingTimePreparedInput> entry : preparedInputs.entrySet()) {
|
||||
DriverWorkingTimePreparedInput preparedInput = entry.getValue();
|
||||
long driverStartedAtNanos = System.nanoTime();
|
||||
DriverWorkingTimeProcessingResultDto projection =
|
||||
workingTimeProcessingCore.process(preparedInput.processingInput())
|
||||
.withIncludedIntervals(
|
||||
scopeRequest.includeActivityIntervalsOrDefault(),
|
||||
scopeRequest.includeDrivingIntervalsOrDefault()
|
||||
);
|
||||
long driverProcessingMs = elapsedMillis(driverStartedAtNanos);
|
||||
driverProcessingTotalMs += driverProcessingMs;
|
||||
if (driverProcessingMs > slowestDriverProcessingMs) {
|
||||
slowestDriverProcessingMs = driverProcessingMs;
|
||||
slowestDriverKey = preparedInput.driverKey();
|
||||
}
|
||||
warnings.addAll(preparedInput.partition().warnings());
|
||||
UnifiedRuntimeProcessingRequest driverRequest = broadBundle.request().withDriverKey(preparedInput.driverKey());
|
||||
driverResults.put(preparedInput.driverKey(), new UnifiedRuntimeDerivedProjectionResultDto(
|
||||
|
|
@ -110,6 +128,18 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
|
|||
metadata.put("selectedDriverCount", result.selectedDriverCount());
|
||||
metadata.put("discoveredVehicleCount", result.discoveredVehicleCount());
|
||||
metadata.put("driverResultCount", result.driverResults().size());
|
||||
metadata.put(DRIVER_PROCESSING_TOTAL_MS_METADATA_KEY, driverProcessingTotalMs);
|
||||
metadata.put(SLOWEST_DRIVER_KEY_METADATA_KEY, slowestDriverKey);
|
||||
metadata.put(SLOWEST_DRIVER_PROCESSING_MS_METADATA_KEY, Math.max(0L, slowestDriverProcessingMs));
|
||||
LOG.info(
|
||||
"Driving-derived projections processed {} drivers in {} ms (driverProcessingTotalMs: {}, slowestDriverKey: {}, slowestDriverProcessingMs: {}, warnings: {})",
|
||||
result.driverResults().size(),
|
||||
elapsedMillis(startedAtNanos),
|
||||
driverProcessingTotalMs,
|
||||
slowestDriverKey,
|
||||
Math.max(0L, slowestDriverProcessingMs),
|
||||
result.warnings().size()
|
||||
);
|
||||
return new RuntimeProcessingModuleResult(
|
||||
moduleKey(),
|
||||
RuntimeProcessingModuleStatus.SUCCESS,
|
||||
|
|
@ -187,4 +217,8 @@ public class DriverWorkingTimeDerivedProjectionsModule implements RuntimeProcess
|
|||
}
|
||||
return fallback;
|
||||
}
|
||||
|
||||
private long elapsedMillis(long startedAtNanos) {
|
||||
return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,11 +4,17 @@ import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecu
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class RuntimeProcessingPipelineExecutor {
|
||||
|
||||
public static final String EXECUTION_DURATION_MS_METADATA_KEY = "executionDurationMs";
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RuntimeProcessingPipelineExecutor.class);
|
||||
|
||||
private final RuntimeProcessingModuleRegistry moduleRegistry;
|
||||
|
||||
public RuntimeProcessingPipelineExecutor(RuntimeProcessingModuleRegistry moduleRegistry) {
|
||||
|
|
@ -29,8 +35,17 @@ public class RuntimeProcessingPipelineExecutor {
|
|||
continue;
|
||||
}
|
||||
RuntimeProcessingModule module = moduleRegistry.require(moduleKey.trim());
|
||||
RuntimeProcessingModuleResult result = module.execute(context);
|
||||
long startedAtNanos = System.nanoTime();
|
||||
RuntimeProcessingModuleResult result;
|
||||
try {
|
||||
result = withExecutionDuration(module.execute(context), startedAtNanos);
|
||||
} catch (RuntimeException ex) {
|
||||
long durationMs = elapsedMillis(startedAtNanos);
|
||||
LOG.error("Runtime processing module {} failed after {} ms", module.moduleKey(), durationMs, ex);
|
||||
throw ex;
|
||||
}
|
||||
results.put(module.moduleKey(), result);
|
||||
logModuleCompletion(result);
|
||||
context = new RuntimeProcessingModuleContext(
|
||||
request,
|
||||
context.events(),
|
||||
|
|
@ -43,4 +58,45 @@ public class RuntimeProcessingPipelineExecutor {
|
|||
}
|
||||
return Map.copyOf(results);
|
||||
}
|
||||
|
||||
private RuntimeProcessingModuleResult withExecutionDuration(
|
||||
RuntimeProcessingModuleResult result,
|
||||
long startedAtNanos
|
||||
) {
|
||||
long durationMs = elapsedMillis(startedAtNanos);
|
||||
RuntimeProcessingModuleResult safeResult = result == null
|
||||
? new RuntimeProcessingModuleResult(
|
||||
"UNKNOWN",
|
||||
RuntimeProcessingModuleStatus.FAILED,
|
||||
null,
|
||||
Map.of(),
|
||||
List.of("Runtime processing module returned null result.")
|
||||
)
|
||||
: result;
|
||||
LinkedHashMap<String, Object> metadata = new LinkedHashMap<>(safeResult.metadata());
|
||||
metadata.put(EXECUTION_DURATION_MS_METADATA_KEY, durationMs);
|
||||
return new RuntimeProcessingModuleResult(
|
||||
safeResult.moduleKey(),
|
||||
safeResult.status(),
|
||||
safeResult.output(),
|
||||
metadata,
|
||||
safeResult.warnings()
|
||||
);
|
||||
}
|
||||
|
||||
private void logModuleCompletion(RuntimeProcessingModuleResult result) {
|
||||
Object durationValue = result.metadata().get(EXECUTION_DURATION_MS_METADATA_KEY);
|
||||
long durationMs = durationValue instanceof Number number ? number.longValue() : -1L;
|
||||
LOG.info(
|
||||
"Runtime processing module {} completed with status {} in {} ms (warnings: {})",
|
||||
result.moduleKey(),
|
||||
result.status(),
|
||||
durationMs,
|
||||
result.warnings().size()
|
||||
);
|
||||
}
|
||||
|
||||
private long elapsedMillis(long startedAtNanos) {
|
||||
return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,10 +13,15 @@ import com.espertech.esper.runtime.client.EPRuntimeProvider;
|
|||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StreamUtils;
|
||||
|
|
@ -24,24 +29,63 @@ import org.springframework.util.StreamUtils;
|
|||
@Service
|
||||
public class RuntimeEplModuleExecutor {
|
||||
|
||||
public static final String DEFINITION_CACHE_HIT_METADATA_KEY = "eplDefinitionCacheHit";
|
||||
public static final String DEFINITION_PREPARATION_MS_METADATA_KEY = "eplDefinitionPreparationMs";
|
||||
public static final String RUNTIME_INIT_MS_METADATA_KEY = "eplRuntimeInitMs";
|
||||
public static final String DEPLOY_MS_METADATA_KEY = "eplDeployMs";
|
||||
public static final String LISTENER_REGISTRATION_MS_METADATA_KEY = "eplListenerRegistrationMs";
|
||||
public static final String EVENT_SEND_MS_METADATA_KEY = "eplEventSendMs";
|
||||
public static final String DESTROY_MS_METADATA_KEY = "eplDestroyMs";
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RuntimeEplModuleExecutor.class);
|
||||
private static final AtomicLong RUNTIME_COUNTER = new AtomicLong();
|
||||
|
||||
private final ConcurrentMap<PreparedDefinitionCacheKey, PreparedDefinition> preparedDefinitions = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, String> eplResourceContents = new ConcurrentHashMap<>();
|
||||
|
||||
public RuntimeEplModuleExecutionResult execute(RuntimeEplModuleDefinition definition) {
|
||||
EPRuntime runtime = null;
|
||||
Map<String, List<Map<String, Object>>> outputs = null;
|
||||
LinkedHashMap<String, Object> metadata = null;
|
||||
PreparedDefinition preparedDefinition = null;
|
||||
boolean definitionCacheHit = false;
|
||||
long definitionPreparationMs = 0L;
|
||||
long runtimeInitMs = 0L;
|
||||
long deployMs = 0L;
|
||||
long listenerRegistrationMs = 0L;
|
||||
long eventSendMs = 0L;
|
||||
long destroyMs = 0L;
|
||||
long executionStartedAtNanos = System.nanoTime();
|
||||
try {
|
||||
Configuration configuration = new Configuration();
|
||||
definition.eventTypes().forEach((eventTypeName, eventTypeDefinition) ->
|
||||
configuration.getCommon().addEventType(eventTypeName, eventTypeDefinition));
|
||||
PreparedDefinitionCacheKey cacheKey = PreparedDefinitionCacheKey.from(definition);
|
||||
preparedDefinition = preparedDefinitions.get(cacheKey);
|
||||
definitionCacheHit = preparedDefinition != null;
|
||||
if (!definitionCacheHit) {
|
||||
long definitionPreparationStartedAtNanos = System.nanoTime();
|
||||
PreparedDefinition candidate = prepareDefinition(definition);
|
||||
PreparedDefinition existing = preparedDefinitions.putIfAbsent(cacheKey, candidate);
|
||||
preparedDefinition = existing == null ? candidate : existing;
|
||||
definitionCacheHit = existing != null;
|
||||
definitionPreparationMs = definitionCacheHit
|
||||
? 0L
|
||||
: elapsedMillis(definitionPreparationStartedAtNanos);
|
||||
}
|
||||
|
||||
long runtimeInitStartedAtNanos = System.nanoTime();
|
||||
Configuration configuration = createConfiguration(preparedDefinition.eventTypes());
|
||||
|
||||
String runtimeUri = "eventhub-runtime-epl-module-"
|
||||
+ definition.moduleKey().replaceAll("[^A-Za-z0-9_-]", "-")
|
||||
+ "-" + RUNTIME_COUNTER.incrementAndGet();
|
||||
runtime = EPRuntimeProvider.getRuntime(runtimeUri, configuration);
|
||||
runtimeInitMs = elapsedMillis(runtimeInitStartedAtNanos);
|
||||
|
||||
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(renderEpl(definition.eplResources()), new CompilerArguments(configuration));
|
||||
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
|
||||
long deployStartedAtNanos = System.nanoTime();
|
||||
EPDeployment deployment = runtime.getDeploymentService().deploy(preparedDefinition.compiled());
|
||||
deployMs = elapsedMillis(deployStartedAtNanos);
|
||||
|
||||
Map<String, List<Map<String, Object>>> outputs = new LinkedHashMap<>();
|
||||
long listenerRegistrationStartedAtNanos = System.nanoTime();
|
||||
outputs = new LinkedHashMap<>();
|
||||
for (String statementName : definition.outputStatementNames()) {
|
||||
outputs.put(statementName, new ArrayList<>());
|
||||
var statement = runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), statementName);
|
||||
|
|
@ -49,18 +93,28 @@ public class RuntimeEplModuleExecutor {
|
|||
throw new IllegalStateException("EPL module " + definition.moduleKey()
|
||||
+ " did not deploy expected statement '" + statementName + "'.");
|
||||
}
|
||||
statement.addListener((newData, oldData, stmt, rt) -> collect(newData, outputs.get(statementName)));
|
||||
List<Map<String, Object>> statementOutputs = outputs.get(statementName);
|
||||
statement.addListener((newData, oldData, stmt, rt) -> collect(newData, statementOutputs));
|
||||
}
|
||||
listenerRegistrationMs = elapsedMillis(listenerRegistrationStartedAtNanos);
|
||||
|
||||
long eventSendStartedAtNanos = System.nanoTime();
|
||||
for (RuntimeEplInputEventStream inputStream : definition.inputStreams()) {
|
||||
for (Map<String, Object> event : inputStream.events()) {
|
||||
runtime.getEventService().sendEventMap(event, inputStream.eventTypeName());
|
||||
}
|
||||
}
|
||||
eventSendMs = elapsedMillis(eventSendStartedAtNanos);
|
||||
|
||||
Map<String, Object> metadata = new LinkedHashMap<>();
|
||||
metadata = new LinkedHashMap<>();
|
||||
metadata.put("engine", "EPL");
|
||||
metadata.put("eplResources", definition.eplResources());
|
||||
metadata.put(DEFINITION_CACHE_HIT_METADATA_KEY, definitionCacheHit);
|
||||
metadata.put(DEFINITION_PREPARATION_MS_METADATA_KEY, definitionPreparationMs);
|
||||
metadata.put(RUNTIME_INIT_MS_METADATA_KEY, runtimeInitMs);
|
||||
metadata.put(DEPLOY_MS_METADATA_KEY, deployMs);
|
||||
metadata.put(LISTENER_REGISTRATION_MS_METADATA_KEY, listenerRegistrationMs);
|
||||
metadata.put(EVENT_SEND_MS_METADATA_KEY, eventSendMs);
|
||||
Map<String, Integer> inputCounts = new LinkedHashMap<>();
|
||||
for (RuntimeEplInputEventStream inputStream : definition.inputStreams()) {
|
||||
inputCounts.merge(inputStream.eventTypeName(), inputStream.events().size(), Integer::sum);
|
||||
|
|
@ -69,14 +123,48 @@ public class RuntimeEplModuleExecutor {
|
|||
Map<String, Integer> outputCounts = new LinkedHashMap<>();
|
||||
outputs.forEach((statement, events) -> outputCounts.put(statement, events.size()));
|
||||
metadata.put("outputCounts", outputCounts);
|
||||
return new RuntimeEplModuleExecutionResult(outputs, metadata);
|
||||
} catch (EPCompileException | EPDeployException e) {
|
||||
throw new IllegalStateException("Cannot compile/deploy runtime EPL module " + definition.moduleKey(), e);
|
||||
} finally {
|
||||
if (runtime != null) {
|
||||
long destroyStartedAtNanos = System.nanoTime();
|
||||
runtime.destroy();
|
||||
destroyMs = elapsedMillis(destroyStartedAtNanos);
|
||||
}
|
||||
if (metadata != null) {
|
||||
metadata.put(DESTROY_MS_METADATA_KEY, destroyMs);
|
||||
LOG.info(
|
||||
"Runtime EPL module {} completed in {} ms (definitionCacheHit: {}, definitionPreparationMs: {}, runtimeInitMs: {}, deployMs: {}, listenerRegistrationMs: {}, eventSendMs: {}, destroyMs: {})",
|
||||
definition.moduleKey(),
|
||||
elapsedMillis(executionStartedAtNanos),
|
||||
definitionCacheHit,
|
||||
definitionPreparationMs,
|
||||
runtimeInitMs,
|
||||
deployMs,
|
||||
listenerRegistrationMs,
|
||||
eventSendMs,
|
||||
destroyMs
|
||||
);
|
||||
}
|
||||
}
|
||||
return new RuntimeEplModuleExecutionResult(
|
||||
outputs == null ? Map.of() : outputs,
|
||||
metadata == null ? Map.of() : metadata
|
||||
);
|
||||
}
|
||||
|
||||
private PreparedDefinition prepareDefinition(RuntimeEplModuleDefinition definition) throws EPCompileException {
|
||||
Configuration configuration = createConfiguration(definition.eventTypes());
|
||||
String epl = renderEpl(definition.eplResources());
|
||||
EPCompiled compiled = EPCompilerProvider.getCompiler().compile(epl, new CompilerArguments(configuration));
|
||||
return new PreparedDefinition(definition.eventTypes(), compiled);
|
||||
}
|
||||
|
||||
private Configuration createConfiguration(Map<String, Map<String, Object>> eventTypes) {
|
||||
Configuration configuration = new Configuration();
|
||||
eventTypes.forEach((eventTypeName, eventTypeDefinition) ->
|
||||
configuration.getCommon().addEventType(eventTypeName, eventTypeDefinition));
|
||||
return configuration;
|
||||
}
|
||||
|
||||
private String renderEpl(List<String> resources) {
|
||||
|
|
@ -88,7 +176,7 @@ public class RuntimeEplModuleExecutor {
|
|||
if (!builder.isEmpty()) {
|
||||
builder.append("\n\n");
|
||||
}
|
||||
builder.append(loadResource(resource.trim()));
|
||||
builder.append(eplResourceContents.computeIfAbsent(resource.trim(), RuntimeEplModuleExecutor::loadResource));
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
|
|
@ -117,4 +205,40 @@ public class RuntimeEplModuleExecutor {
|
|||
target.add(values);
|
||||
}
|
||||
}
|
||||
|
||||
private long elapsedMillis(long startedAtNanos) {
|
||||
return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
|
||||
}
|
||||
|
||||
private record PreparedDefinition(
|
||||
Map<String, Map<String, Object>> eventTypes,
|
||||
EPCompiled compiled
|
||||
) {
|
||||
private PreparedDefinition {
|
||||
eventTypes = eventTypes == null ? Map.of() : immutableNestedMap(eventTypes);
|
||||
}
|
||||
}
|
||||
|
||||
private record PreparedDefinitionCacheKey(
|
||||
String moduleKey,
|
||||
Map<String, Map<String, Object>> eventTypes,
|
||||
List<String> eplResources
|
||||
) {
|
||||
private static PreparedDefinitionCacheKey from(RuntimeEplModuleDefinition definition) {
|
||||
return new PreparedDefinitionCacheKey(
|
||||
definition.moduleKey(),
|
||||
immutableNestedMap(definition.eventTypes()),
|
||||
List.copyOf(definition.eplResources())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private static Map<String, Map<String, Object>> immutableNestedMap(Map<String, Map<String, Object>> source) {
|
||||
LinkedHashMap<String, Map<String, Object>> copy = new LinkedHashMap<>();
|
||||
source.forEach((key, value) -> copy.put(
|
||||
key,
|
||||
value == null ? Map.of() : Collections.unmodifiableMap(new LinkedHashMap<>(value))
|
||||
));
|
||||
return Collections.unmodifiableMap(copy);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,17 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.plan;
|
||||
|
||||
import java.util.List;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingModuleResult;
|
||||
import at.procon.eventhub.processing.eventprocessing.module.RuntimeProcessingPipelineExecutor;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class RuntimeProcessingExecutionService {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RuntimeProcessingExecutionService.class);
|
||||
|
||||
private final RuntimeProcessingPlanRegistry planRegistry;
|
||||
|
||||
public RuntimeProcessingExecutionService(RuntimeProcessingPlanRegistry planRegistry) {
|
||||
|
|
@ -13,12 +19,42 @@ public class RuntimeProcessingExecutionService {
|
|||
}
|
||||
|
||||
public RuntimeProcessingExecutionResultDto execute(RuntimeProcessingExecutionApiRequest request) {
|
||||
long startedAtNanos = System.nanoTime();
|
||||
RuntimeProcessingPlan plan = planRegistry.require(request.processingPlanKey());
|
||||
try {
|
||||
plan.validatePartitioning(request.partitioning());
|
||||
return plan.execute(request);
|
||||
RuntimeProcessingExecutionResultDto result = plan.execute(request);
|
||||
RuntimeProcessingModuleResult slowestModule = result.moduleResults().values().stream()
|
||||
.filter(moduleResult -> moduleResult.metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY) instanceof Number)
|
||||
.max(java.util.Comparator.comparingLong(moduleResult ->
|
||||
((Number) moduleResult.metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue()))
|
||||
.orElse(null);
|
||||
LOG.info(
|
||||
"Runtime processing plan {} completed in {} ms (modules: {}, slowestModule: {}, slowestModuleMs: {}, warnings: {})",
|
||||
request.processingPlanKey(),
|
||||
elapsedMillis(startedAtNanos),
|
||||
result.executedModules().size(),
|
||||
slowestModule == null ? null : slowestModule.moduleKey(),
|
||||
slowestModule == null ? null : slowestModule.metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY),
|
||||
result.warnings().size()
|
||||
);
|
||||
return result;
|
||||
} catch (RuntimeException ex) {
|
||||
LOG.error(
|
||||
"Runtime processing plan {} failed after {} ms",
|
||||
request == null ? null : request.processingPlanKey(),
|
||||
elapsedMillis(startedAtNanos),
|
||||
ex
|
||||
);
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
public List<RuntimeProcessingPlanDescriptorDto> listPlans() {
|
||||
return planRegistry.planDescriptors();
|
||||
}
|
||||
|
||||
private long elapsedMillis(long startedAtNanos) {
|
||||
return Math.max(0L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import at.procon.eventhub.dto.VehicleRefDto;
|
|||
import at.procon.eventhub.dto.VehicleRegistrationRefDto;
|
||||
import at.procon.eventhub.processing.driverworkingtime.model.DriverWorkingTimeVehicleUsageInterval;
|
||||
import at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest;
|
||||
import at.procon.eventhub.processing.eventprocessing.module.epl.RuntimeEplModuleExecutor;
|
||||
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest;
|
||||
import at.procon.eventhub.processing.model.UnifiedEventSourceFamily;
|
||||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
|
|
@ -78,6 +79,64 @@ class DriverVehicleUsageIntervalsModuleTest {
|
|||
assertThat(intervals.get(0).endedAt()).isEqualTo(OffsetDateTime.parse("2026-05-01T10:10:00Z"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void reusesPreparedEplDefinitionAcrossExecutions() {
|
||||
DriverVehicleUsageIntervalsModule module = new DriverVehicleUsageIntervalsModule(new RuntimeEplModuleExecutor());
|
||||
RuntimeProcessingModuleContext context = new RuntimeProcessingModuleContext(
|
||||
new RuntimeProcessingExecutionApiRequest(
|
||||
"driver-working-time-v1",
|
||||
runtimeScope(),
|
||||
null,
|
||||
List.of(),
|
||||
Map.of()
|
||||
),
|
||||
List.of(
|
||||
vehicleUsageEvent(
|
||||
"CVU-1",
|
||||
EventType.CARD_INSERTED,
|
||||
EventLifecycle.INSERT,
|
||||
"2026-05-01T07:50:00Z",
|
||||
"2026-05-01T07:50:00Z",
|
||||
"2026-05-01T10:10:00Z",
|
||||
100_000L,
|
||||
"INSERTED"
|
||||
),
|
||||
vehicleUsageEvent(
|
||||
"CVU-99",
|
||||
EventType.CARD_WITHDRAWN,
|
||||
EventLifecycle.WITHDRAW,
|
||||
"2026-05-01T10:10:00Z",
|
||||
"2026-05-01T07:50:00Z",
|
||||
"2026-05-01T10:10:00Z",
|
||||
140_000L,
|
||||
"NOT_INSERTED"
|
||||
)
|
||||
),
|
||||
Map.of(),
|
||||
Map.of()
|
||||
);
|
||||
|
||||
RuntimeProcessingModuleResult first = module.execute(context);
|
||||
RuntimeProcessingModuleResult second = module.execute(context);
|
||||
|
||||
assertThat(first.metadata())
|
||||
.containsEntry(RuntimeEplModuleExecutor.DEFINITION_CACHE_HIT_METADATA_KEY, false)
|
||||
.containsKey(RuntimeEplModuleExecutor.DEFINITION_PREPARATION_MS_METADATA_KEY)
|
||||
.containsKey(RuntimeEplModuleExecutor.RUNTIME_INIT_MS_METADATA_KEY)
|
||||
.containsKey(RuntimeEplModuleExecutor.DEPLOY_MS_METADATA_KEY)
|
||||
.containsKey(RuntimeEplModuleExecutor.LISTENER_REGISTRATION_MS_METADATA_KEY)
|
||||
.containsKey(RuntimeEplModuleExecutor.EVENT_SEND_MS_METADATA_KEY)
|
||||
.containsKey(RuntimeEplModuleExecutor.DESTROY_MS_METADATA_KEY);
|
||||
assertThat(second.metadata())
|
||||
.containsEntry(RuntimeEplModuleExecutor.DEFINITION_CACHE_HIT_METADATA_KEY, true)
|
||||
.containsEntry(RuntimeEplModuleExecutor.DEFINITION_PREPARATION_MS_METADATA_KEY, 0L)
|
||||
.containsKey(RuntimeEplModuleExecutor.RUNTIME_INIT_MS_METADATA_KEY)
|
||||
.containsKey(RuntimeEplModuleExecutor.DEPLOY_MS_METADATA_KEY)
|
||||
.containsKey(RuntimeEplModuleExecutor.LISTENER_REGISTRATION_MS_METADATA_KEY)
|
||||
.containsKey(RuntimeEplModuleExecutor.EVENT_SEND_MS_METADATA_KEY)
|
||||
.containsKey(RuntimeEplModuleExecutor.DESTROY_MS_METADATA_KEY);
|
||||
}
|
||||
|
||||
private UnifiedRuntimeProcessingApiRequest runtimeScope() {
|
||||
return new UnifiedRuntimeProcessingApiRequest(
|
||||
UUID.randomUUID(),
|
||||
|
|
|
|||
|
|
@ -211,6 +211,9 @@ class DriverWorkingTimeDerivedProjectionsModuleTest {
|
|||
(UnifiedRuntimeDriverWorkingTimeScopeResultDto) result.output();
|
||||
UnifiedRuntimeDerivedProjectionResultDto driverResult = scopeResult.driverResults().get("12:123");
|
||||
|
||||
assertThat(result.metadata()).containsKey("driverProcessingTotalMs");
|
||||
assertThat(result.metadata()).containsEntry("slowestDriverKey", "12:123");
|
||||
assertThat(result.metadata()).containsKey("slowestDriverProcessingMs");
|
||||
assertThat(driverResult.projection().activityIntervalCount()).isEqualTo(1);
|
||||
assertThat(driverResult.projection().drivingIntervalCount()).isEqualTo(1);
|
||||
assertThat(driverResult.projection().activityIntervals()).isEmpty();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,119 @@
|
|||
package at.procon.eventhub.processing.eventprocessing.module;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingExecutionApiRequest;
|
||||
import at.procon.eventhub.processing.eventprocessing.partition.RuntimeEventPartitioningStrategy;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class RuntimeProcessingPipelineExecutorTest {
|
||||
|
||||
@Test
|
||||
void addsExecutionDurationMetadataToEachModuleResult() {
|
||||
RuntimeProcessingPipelineExecutor executor = new RuntimeProcessingPipelineExecutor(
|
||||
new RuntimeProcessingModuleRegistry(List.of(
|
||||
new TestModule("first"),
|
||||
new TestModule("second")
|
||||
))
|
||||
);
|
||||
|
||||
Map<String, RuntimeProcessingModuleResult> results = executor.execute(
|
||||
new RuntimeProcessingExecutionApiRequest(
|
||||
"driver-working-time-v1",
|
||||
new at.procon.eventhub.processing.dto.UnifiedRuntimeProcessingApiRequest(
|
||||
java.util.UUID.randomUUID(),
|
||||
List.of(),
|
||||
null,
|
||||
null,
|
||||
java.util.Set.of(at.procon.eventhub.processing.model.UnifiedEventSourceFamily.TACHOGRAPH_FILE_SESSION),
|
||||
null,
|
||||
null,
|
||||
"12:123",
|
||||
java.util.Set.of(),
|
||||
false,
|
||||
java.util.Set.of(),
|
||||
false,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
java.time.OffsetDateTime.parse("2026-05-01T00:00:00Z"),
|
||||
java.time.OffsetDateTime.parse("2026-05-01T01:00:00Z"),
|
||||
true,
|
||||
0,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
),
|
||||
new at.procon.eventhub.processing.eventprocessing.dto.RuntimeEventPartitioningApiRequest(
|
||||
RuntimeEventPartitioningStrategy.DRIVER,
|
||||
null,
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
),
|
||||
List.of(),
|
||||
Map.of()
|
||||
),
|
||||
List.of("first", "second"),
|
||||
null
|
||||
);
|
||||
|
||||
assertThat(results).containsKeys("first", "second");
|
||||
assertThat(results.get("first").metadata()).containsEntry("marker", "first");
|
||||
assertThat(results.get("first").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY);
|
||||
assertThat(results.get("second").metadata()).containsEntry("marker", "second");
|
||||
assertThat(results.get("second").metadata()).containsKey(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY);
|
||||
assertThat(((Number) results.get("first").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())
|
||||
.isGreaterThanOrEqualTo(0L);
|
||||
assertThat(((Number) results.get("second").metadata().get(RuntimeProcessingPipelineExecutor.EXECUTION_DURATION_MS_METADATA_KEY)).longValue())
|
||||
.isGreaterThanOrEqualTo(0L);
|
||||
}
|
||||
|
||||
private static final class TestModule implements RuntimeProcessingModule {
|
||||
|
||||
private final String key;
|
||||
|
||||
private TestModule(String key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String moduleKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto descriptor() {
|
||||
return new at.procon.eventhub.processing.eventprocessing.plan.RuntimeProcessingModuleDescriptorDto(
|
||||
key,
|
||||
key,
|
||||
"test",
|
||||
"JAVA",
|
||||
java.util.Set.of()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuntimeProcessingModuleResult execute(RuntimeProcessingModuleContext context) {
|
||||
if ("second".equals(key)) {
|
||||
assertThat(context.previousResults()).containsKey("first");
|
||||
}
|
||||
return new RuntimeProcessingModuleResult(
|
||||
key,
|
||||
RuntimeProcessingModuleStatus.SUCCESS,
|
||||
null,
|
||||
Map.of("marker", key),
|
||||
List.of()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue