malis-cep (prototype)
Spring Boot prototype demonstrating a config-driven CEP pipeline:
- split configuration (common + env profile + bindings directory)
- routing by binding (source match -> parser -> detector -> lifecycle -> outputs)
- external detectors return events only (ALARM/CANCEL); state is managed in this service
- caller-managed lifecycle: transition-only publishing (dedup + debounce)
- pluggable outputs + DLQ envelope
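The transition-only idea (dedup) from the list above can be sketched in plain Java. This is only an illustration under assumptions, not the code of AlarmLifecycleService: the class name `TransitionFilter`, the key format, and the method name are hypothetical, and debounce is deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: publish only when the per-key state actually changes. */
final class TransitionFilter {
    enum Action { ALARM, CANCEL }

    // Last known state per alarm key (for example "device-1/any_port_open"); in-memory only.
    private final Map<String, Action> lastState = new HashMap<>();

    /**
     * Returns true if the incoming event is a state transition and should be published.
     * Assumption: an unknown key starts out inactive, so a first CANCEL is not a transition.
     */
    boolean shouldPublish(String key, Action incoming) {
        Action previous = lastState.getOrDefault(key, Action.CANCEL);
        lastState.put(key, incoming);
        return previous != incoming;
    }
}
```

With this filter, a detector that reports ALARM on every message results in a single published ALARM, followed by a single CANCEL once the condition clears.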
It also contains real transport implementations:
- MQTT ingress via Spring Integration MQTT (Paho)
- MQTT egress via Paho with per-output brokerRef (see MqttBrokerRegistry)
- RabbitMQ egress (and optional ingress) via Spring AMQP
EEBUS XML support (prototype)
This prototype includes minimal, namespace-tolerant DOM parsing/formatting for:
- full SPINE-like <Datagram> measurement payloads (EEBUS_MEASUREMENT_DATAGRAM_XML)
- full SPINE-like <Datagram> alarm payloads (EEBUS_ALARM_DATAGRAM_XML)
- formatting detected alarms into a full <Datagram> (EEBUS_ALARM_DATAGRAM_XML)
The XML samples used by unit tests are in src/test/resources/eebus/.
Internal detector (expression rules)
This prototype now includes a fully internal detector that evaluates configurable expressions on top of a protocol-agnostic input event model.
- detector type: EXPRESSION_RULES
- accepts parsed bodies:
  - EEBUS_MEASUREMENT_DATAGRAM_XML (full <Datagram>)
  - MEASUREMENT_SET (from JSON_PORTS_BOOLEAN, etc.)
  - EventBatch (advanced: other converters can produce this directly)
Rules are boolean SpEL expressions over the in-memory per-device signal store.
Example common.yml snippet:
cep:
  detectors:
    internalPorts:
      type: EXPRESSION_RULES
      config:
        missingPolicy: FALSE
        rules:
          - id: any_port_open
            severity: MAJOR
            expression: "s['port.1.state'] == false || s['port.2.state'] == false || s['port.3.state'] == false"
            emitCancel: true
Each rule emits ALARM when the expression evaluates to true, otherwise CANCEL (unless emitCancel=false).
State transitions are still managed by AlarmLifecycleService.
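The ALARM/CANCEL decision described above can be sketched as follows. To keep the example runnable without Spring, a `java.util.function.Predicate` stands in for the SpEL expression; the class `RuleSketch` and its methods are hypothetical and do not reflect the detector's real API. The `missingPolicy` setting is not modeled: in this sketch a missing signal simply does not match.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sketch of the ALARM/CANCEL decision for a single rule. */
final class RuleSketch {
    enum Action { ALARM, CANCEL }

    // Stand-in for the SpEL expression; the real detector evaluates SpEL, not a Predicate.
    private final Predicate<Map<String, Boolean>> expression;
    private final boolean emitCancel;

    RuleSketch(Predicate<Map<String, Boolean>> expression, boolean emitCancel) {
        this.expression = expression;
        this.emitCancel = emitCancel;
    }

    /** ALARM when the expression holds; otherwise CANCEL, or nothing if emitCancel is false. */
    Optional<Action> evaluate(Map<String, Boolean> signals) {
        if (expression.test(signals)) {
            return Optional.of(Action.ALARM);
        }
        return emitCancel ? Optional.of(Action.CANCEL) : Optional.empty();
    }

    /** Plain-Java counterpart of the any_port_open expression from the snippet above. */
    static RuleSketch anyPortOpen(boolean emitCancel) {
        return new RuleSketch(
                s -> Boolean.FALSE.equals(s.get("port.1.state"))
                        || Boolean.FALSE.equals(s.get("port.2.state"))
                        || Boolean.FALSE.equals(s.get("port.3.state")),
                emitCancel);
    }
}
```

The emitted events are inputs to the lifecycle only; whether anything is published still depends on the state transition.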
Heartbeat (periodic state propagation)
By default, bindings publish only on state transitions (ALARM on open, CANCEL on close).
You can optionally enable a per-binding heartbeat that periodically re-emits the current state (useful when downstream systems want a "still active" / "still inactive" signal).
Example in a binding YAML:
cep:
  bindings:
    mqttEebusAlarm:
      # ...
      heartbeat:
        enabled: true
        periodMs: 60000
        includeInactive: false # set true to also emit CANCEL heartbeats
        initialDelayMs: 60000 # optional; if 0/absent it defaults to periodMs
Heartbeat messages keep the normal action (ALARM or CANCEL) and add details.heartbeat=true.
While a debounced CANCEL is pending, heartbeats continue as ALARM until the debounce maturity is reached.
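The heartbeat rules above can be summarized in a small decision function. This is a sketch under assumptions rather than the service's implementation: `HeartbeatDecision`, its parameters, and the use of `-1` for "no CANCEL pending" are all hypothetical.

```java
import java.util.Optional;

/** Hypothetical sketch: which action, if any, a heartbeat tick re-emits. */
final class HeartbeatDecision {
    enum Action { ALARM, CANCEL }

    /**
     * @param active          true while the alarm is still published as ALARM
     * @param cancelDueAtMs   time at which a pending debounced CANCEL matures, or -1 if none is pending
     * @param nowMs           current time in milliseconds
     * @param includeInactive mirrors heartbeat.includeInactive
     */
    static Optional<Action> actionFor(boolean active, long cancelDueAtMs, long nowMs, boolean includeInactive) {
        boolean cancelMatured = cancelDueAtMs >= 0 && nowMs >= cancelDueAtMs;
        if (active && !cancelMatured) {
            // Still ALARM, including the window in which a debounced CANCEL is pending.
            return Optional.of(Action.ALARM);
        }
        return includeInactive ? Optional.of(Action.CANCEL) : Optional.empty();
    }

    /** initialDelayMs falls back to periodMs when it is 0 or absent (null). */
    static long effectiveInitialDelayMs(Long initialDelayMs, long periodMs) {
        return (initialDelayMs == null || initialDelayMs == 0L) ? periodMs : initialDelayMs;
    }
}
```

A caller would attach details.heartbeat=true to whatever message is built from the returned action.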
Config layout
The app uses Spring Boot's config import:
- ./config/cep/common.yml
- ./config/cep/env/${spring.profiles.active}.yml
- all binding files in ./config/cep/bindings/*.yml
Bindings are merged into cep.bindings by BindingFileLoader.
Run (dev)
mvn spring-boot:run -Dspring-boot.run.profiles=dev
Local MQTT
The default dev profile expects MQTT at tcp://localhost:1883.
You can test with a client such as Mosquitto, for example by publishing:
# Scenario 3: raw ports JSON (8 boolean)
mosquitto_pub -h localhost -t "/sadfjsadf/site-a/heating" -m '{"port1":true,"port2":true,"port3":true,"port4":true,"port5":true,"port6":true,"port7":true,"port8":false}'
# Scenario 1/2: device publishes an alarm (JSON or full <Datagram> XML)
mosquitto_pub -h localhost -t "alarms/site-a/heating/device-1" -m '{"alarmCode":"X","action":"ALARM"}'
The produced alarms are published to:
- malis/alarms/${site}/${department} (JSON alarm message)
- malis/alarms/xml/${site}/${department} (full XML)
- DLQ to malis/dlq (on errors / no binding match)
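How the `${site}` and `${department}` placeholders in these topics are expanded is not shown here. The following is a minimal sketch of one way to do it, assuming the values are available as a simple map; `TopicTemplate` and `resolve` are hypothetical names, not part of the project.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: expand ${name} placeholders in an output topic template. */
final class TopicTemplate {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([A-Za-z0-9_.]+)\\}");

    /** Replaces every ${name} with vars.get(name); fails fast on an unknown name. */
    static String resolve(String template, Map<String, String> vars) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = vars.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder: " + m.group(1));
            }
            // quoteReplacement keeps '$' and '\' in the value from being read as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

For example, resolving `malis/alarms/${site}/${department}` with site=site-a and department=heating yields `malis/alarms/site-a/heating`.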
Local RabbitMQ (optional)
./config/cep/common.yml ships with a Rabbit output example (rabbitMalisEebus) and a Rabbit ingress example.
To enable Rabbit ingress, set in common.yml or a profile override:
cep:
  ingress:
    rabbit:
      enabled: true
The dev profile includes a default Rabbit broker URI:
amqp://guest:guest@localhost:5672/%2F
Test via HTTP (transport simulation)
If you don't want to run MQTT/Rabbit locally, you can inject a message directly:
curl -X POST "http://localhost:8080/api/test/ingress?inType=MQTT&path=/sadfjsadf/site-a/heating" \
-H "Content-Type: application/json" \
-d '{"port1":true,"port2":true,"port3":true,"port4":true,"port5":true,"port6":true,"port7":true,"port8":false}'
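The same request can be issued from Java, for instance in an integration test. This sketch uses only the JDK's `java.net.http` client (Java 11+); the class `TestIngressClient` is hypothetical, and the endpoint and parameters are taken from the curl call above. Note that the `path` value is URL-encoded here, which the server decodes to the same string.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: build and send the test-ingress request from Java. */
final class TestIngressClient {

    /** Builds the POST request; inType and path become query parameters. */
    static HttpRequest buildRequest(String baseUrl, String inType, String path, String jsonBody) {
        String query = "inType=" + URLEncoder.encode(inType, StandardCharsets.UTF_8)
                + "&path=" + URLEncoder.encode(path, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/test/ingress?" + query))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires the application to be running locally on port 8080.
        HttpRequest request = buildRequest("http://localhost:8080", "MQTT",
                "/sadfjsadf/site-a/heating", "{\"port1\":true,\"port8\":false}");
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```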
Where to look in code
- at.co.procon.malis.cep.binding.BindingResolver – resolves an ingress message to exactly one binding
- at.co.procon.malis.cep.parser.* – parsers (raw payload -> canonical model)
- at.co.procon.malis.cep.detector.* – detectors (canonical model -> events)
- at.co.procon.malis.cep.lifecycle.AlarmLifecycleService – in-memory state (transition-only)
- at.co.procon.malis.cep.pipeline.CepPipeline – orchestration + DLQ
- at.co.procon.malis.cep.transport.mqtt.* – real MQTT ingress/egress
- at.co.procon.malis.cep.transport.rabbit.* – Rabbit registry + optional ingress