MalIS-CEP initial import

master
trifonovt 1 month ago
commit b3a7aeeb97

@ -0,0 +1,154 @@
# malis-cep (prototype)
Spring Boot prototype demonstrating a **config-driven CEP** pipeline:
- split configuration (common + env profile + bindings directory)
- routing by binding (source match -> parser -> detector -> lifecycle -> outputs)
- external detectors return **events only** (ALARM/CANCEL); state is managed in this service
- caller-managed lifecycle: transition-only publishing (dedup + debounce)
- pluggable outputs + DLQ envelope
It also contains **real transport implementations**:
- MQTT ingress via **Spring Integration MQTT (Paho)**
- MQTT egress via **Paho** with per-output `brokerRef` (see `MqttBrokerRegistry`)
- RabbitMQ egress (and optional ingress) via **Spring AMQP**
## EEBUS XML support (prototype)
This prototype includes minimal, namespace-tolerant DOM parsing/formatting for:
- full SPINE-like `<Datagram>` **measurement** payloads (`EEBUS_MEASUREMENT_DATAGRAM_XML`)
- full SPINE-like `<Datagram>` **alarm** payloads (`EEBUS_ALARM_DATAGRAM_XML`)
- formatting detected alarms into a full `<Datagram>` (`EEBUS_ALARM_DATAGRAM_XML`)
The XML samples used by unit tests are in `src/test/resources/eebus/`.
## Internal detector (expression rules)
This prototype now includes a fully **internal** detector that evaluates configurable expressions on top of a
protocol-agnostic input event model.
- detector type: `EXPRESSION_RULES`
- accepts parsed bodies:
- `EEBUS_MEASUREMENT_DATAGRAM_XML` (full `<Datagram>`)
- `MEASUREMENT_SET` (from `JSON_PORTS_BOOLEAN`, etc.)
- `EventBatch` (advanced: other converters can produce this directly)
Rules are boolean SpEL expressions over the in-memory per-device signal store.
Example `common.yml` snippet:
```yaml
cep:
detectors:
internalPorts:
type: EXPRESSION_RULES
config:
missingPolicy: FALSE
rules:
- id: any_port_open
severity: MAJOR
expression: "s['port.1.state'] == false || s['port.2.state'] == false || s['port.3.state'] == false"
emitCancel: true
```
Each rule emits `ALARM` when the expression evaluates to `true`, otherwise `CANCEL` (unless `emitCancel=false`).
State transitions are still managed by `AlarmLifecycleService`.
## Heartbeat (periodic state propagation)
By default, bindings publish only on state transitions (`ALARM` on open, `CANCEL` on close).
You can optionally enable a **per-binding heartbeat** that periodically re-emits the *current* state
(useful when downstream systems want a "still active" / "still inactive" signal).
Example in a binding YAML:
```yaml
cep:
bindings:
mqttEebusAlarm:
# ...
heartbeat:
enabled: true
periodMs: 60000
includeInactive: false # set true to also emit CANCEL heartbeats
initialDelayMs: 60000 # optional; if 0/absent it defaults to periodMs
```
Heartbeat messages keep the normal action (`ALARM` or `CANCEL`) and add `details.heartbeat=true`.
While a debounced CANCEL is pending, heartbeats continue as `ALARM` until the debounce maturity is reached.
## Config layout
The app uses Spring Boot's config import:
- `./config/cep/common.yml`
- `./config/cep/env/${spring.profiles.active}.yml`
- all binding files in `./config/cep/bindings/*.yml`
Bindings are merged into `cep.bindings` by `BindingFileLoader`.
## Run (dev)
```bash
mvn spring-boot:run -Dspring-boot.run.profiles=dev
```
### Local MQTT
The default dev profile expects MQTT at `tcp://localhost:1883`.
You can test with e.g. Mosquitto and publish:
```bash
# Scenario 3: raw ports JSON (8 boolean)
mosquitto_pub -h localhost -t "/sadfjsadf/site-a/heating" -m '{"port1":true,"port2":true,"port3":true,"port4":true,"port5":true,"port6":true,"port7":true,"port8":false}'
# Scenario 1/2: device publishes an alarm (JSON or full <Datagram> XML)
mosquitto_pub -h localhost -t "alarms/site-a/heating/device-1" -m '{"alarmCode":"X","action":"ALARM"}'
```
The produced alarms are published to:
- `malis/alarms/${site}/${department}` (JSON alarm message)
- `malis/alarms/xml/${site}/${department}` (full <Datagram> XML)
- DLQ to `malis/dlq` (on errors / no binding match)
### Local RabbitMQ (optional)
`./config/cep/common.yml` ships with a Rabbit output example (`rabbitMalisEebus`) and a Rabbit ingress example.
To enable Rabbit ingress, set in `common.yml` or a profile override:
```yaml
cep:
ingress:
rabbit:
enabled: true
```
Dev profile includes a default Rabbit broker URI:
- `amqp://guest:guest@localhost:5672/%2F`
## Test via HTTP (transport simulation)
If you don't want to run MQTT/Rabbit locally, you can inject a message directly:
```bash
curl -X POST "http://localhost:8080/api/test/ingress?inType=MQTT&path=/sadfjsadf/site-a/heating" \
-H "Content-Type: application/json" \
-d '{"port1":true,"port2":true,"port3":true,"port4":true,"port5":true,"port6":true,"port7":true,"port8":false}'
```
## Where to look in code
- `at.co.procon.malis.cep.binding.BindingResolver` resolves an ingress message to **exactly one** binding
- `at.co.procon.malis.cep.parser.*` parsers (raw payload -> canonical model)
- `at.co.procon.malis.cep.detector.*` detectors (canonical model -> events)
- `at.co.procon.malis.cep.lifecycle.AlarmLifecycleService` in-memory state (transition-only)
- `at.co.procon.malis.cep.pipeline.CepPipeline` orchestration + DLQ
- `at.co.procon.malis.cep.transport.mqtt.*` real MQTT ingress/egress
- `at.co.procon.malis.cep.transport.rabbit.*` Rabbit registry + optional ingress

@ -0,0 +1,15 @@
cep:
bindings:
http-eebus-alarm:
enabled: true
in:
type: HTTP
match:
pathRegex: "^/rest/alarms/(?<site>[^/]+)/(?<department>[^/]+)/(?<deviceId>[^/]+)$"
extract:
tenant: "default"
parserRef: "eebusAlarmDatagramXml" # ensure this parser is registered to EebusAlarmDatagramXmlParser
detectorRef: "passthroughEebusAlarm" # PassthroughAlarmDetector
lifecyclePolicyRef: "passthrough"
outputRefs:
- "outboxMalisEebusDatagramXml" # Output with format EEBUS_ALARM_DATAGRAM_XML

@ -0,0 +1,30 @@
cep:
bindings:
http-eebus-measurement-datagram-to-external-detector:
enabled: true
in:
type: HTTP
match:
# Example topic scheme; adjust to your real gateway convention.
pathRegex: "^measurements/(?<site>[^/]+)/(?<department>[^/]+)/(?<deviceId>[^/]+)$"
extract:
tenant: "default"
# Expect a full EEBUS SPINE-like <Datagram> XML payload.
parserRef: "eebusMeasurementDatagramXml"
# External detector receives the full <Datagram> XML (recommended requestFormat=DATAGRAM_XML).
detectorRef: "externalEebusDatagramRest"
lifecyclePolicyRef: "default"
outputRefs: ["outboxMalisEebusDatagramXml"]
http-eebus-measurement-datagrams-to-internal-detector:
enabled: true
in:
type: HTTP
match:
pathRegex: "^eebus/measurement-datagrams/(?<site>[^/]+)/(?<department>[^/]+)/(?<deviceId>[^/]+)$"
extract:
tenant: "default"
parserRef: "eebusMeasurementDatagramXml"
detectorRef: "internalEebusExpression"
lifecyclePolicyRef: "default"
# For local testing without MQTT/Rabbit, outbox is fine; /api/test/ingress will also show detector/lifecycle results.
outputRefs: ["outboxMalisEebusDatagramXml", "mqttMalisEebusDatagramXml"]

@ -0,0 +1,16 @@
cep:
bindings:
mqtt-eebus-alarm-generic:
enabled: true
in:
type: MQTT
match:
topicRegex: "^alarms/(?<site>[^/]+)/(?<department>[^/]+)/(?<deviceId>[^/]+)$"
extract:
tenant: "default"
# Accept both JSON and full <Datagram> XML payloads
parserRef: "eebusAlarm"
detectorRef: "passthroughEebusAlarm"
lifecyclePolicyRef: "default"
# Publish both a JSON alarm message and a full <Datagram> XML (useful for testing)
outputRefs: ["mqttMalisEebus", "mqttMalisEebusDatagramXml"]

@ -0,0 +1,17 @@
cep:
bindings:
mqtt-eebus-measurement-datagram-to-external-detector:
enabled: true
in:
type: MQTT
match:
# Example topic scheme; adjust to your real gateway convention.
topicRegex: "^measurements/(?<site>[^/]+)/(?<department>[^/]+)/(?<deviceId>[^/]+)$"
extract:
tenant: "default"
# Expect a full EEBUS SPINE-like <Datagram> XML payload.
parserRef: "eebusMeasurementDatagramXml"
# External detector receives the full <Datagram> XML (recommended requestFormat=DATAGRAM_XML).
detectorRef: "externalEebusDatagramRest"
lifecyclePolicyRef: "default"
outputRefs: ["mqttMalisEebusDatagramXml"]

@ -0,0 +1,30 @@
cep:
bindings:
# MQTT ingress topics: <port>/<app>/<tenant>/<pin>
# Example: 40200/m2m/tenantA/1
mqttM2mPinErpnextComposite:
enabled: true
in:
type: MQTT
match:
topicRegex: "^(?<deviceId>[^/]+)/(?<app>[^/]+)/(?<tenant>[^/]+)/(?<pin>[^/]+)$"
# Extracted group vars: port, app, tenant, pin
parserRef: m2mPinUpdateErpnext
detectorRef: pinBadState
lifecyclePolicyRef: compositeDefault
composite:
groupKeyTemplate: "${site}|${department}|Pumpensumpf" # function should be specified in configuration "${site}|${department}|${function}"
sourceKeyTemplate: "${path}" # or "${port}/${app}/${tenant}/${pin}"
outputRefs:
# Publishes EEBUS-like <Datagram> XML to MQTT using the templates in config/cep/common.yml
- restMalisFault
- outboxMalisFaultJson
heartbeat:
enabled: true
periodMs: 600000
includeInactive: true

@ -0,0 +1,24 @@
cep:
bindings:
# MQTT ingress topics: <port>/<app>/<tenant>/<pin>
# Example: 40200/m2m/tenantA/1
#
# This binding uses a JS processor (parser+detector in one step).
mqttM2mPinErpnextJs:
enabled: false # set true to use this binding
in:
type: MQTT
match:
topicRegex: "^(?<deviceId>[^/]+)/(?<app>[^/]+)/(?<tenant>[^/]+)/(?<pin>[^/]+)$"
processorRef: m2mPinErpnextJs
# You can choose SIMPLE or COMPOSITE lifecycle.
lifecyclePolicyRef: default
# Example COMPOSITE usage (uncomment and set lifecyclePolicyRef: compositeDefault)
# composite:
# groupKeyTemplate: "${site}|${department}|${function}"
# sourceKeyTemplate: "${path}"
outputRefs:
- restMalisFault # if you configured a REST output with MALIS_FAULT_JSON

@ -0,0 +1,17 @@
cep:
bindings:
# MQTT ingress topics: <port>/<app>/<tenant>/<pin>
# Example: 40200/m2m/tenantA/1
mqttM2mPinErpnext:
enabled: false
in:
type: MQTT
match:
topicRegex: "^(?<deviceId>[^/]+)/(?<app>[^/]+)/(?<tenant>[^/]+)/(?<pin>[^/]+)$"
# Extracted group vars: port, app, tenant, pin
parserRef: m2mPinUpdateErpnext
detectorRef: pinBadState
lifecyclePolicyRef: default
outputRefs:
# Publishes EEBUS-like <Datagram> XML to MQTT using the templates in config/cep/common.yml
- restMalisFault

@ -0,0 +1,18 @@
cep:
bindings:
mqtt-ports8-raw-json-to-external-detector:
enabled: true
in:
type: MQTT
match:
topicRegex: "^/sadfjsadf/(?<site>[^/]+)/(?<department>[^/]+)$"
extract:
tenant: "default"
deviceIdStrategy:
type: "HASH_OF_TOPIC"
config:
salt: "ports8"
parserRef: "jsonPorts8"
detectorRef: "externalM2MRest"
lifecyclePolicyRef: "portsFlappy"
outputRefs: ["mqttMalisEebus", "mqttMalisEebusDatagramXml"]

@ -0,0 +1,16 @@
cep:
bindings:
rabbit-eebus-alarm-routingkey:
enabled: false
in:
type: RABBIT
match:
# IMPORTANT: for Rabbit ingress we match on the *received routing key* if present,
# otherwise we match on the consumer queue name.
queueRegex: "^alarms\\.(?<site>[^.]+)\\.(?<department>[^.]+)\\.(?<deviceId>[^.]+)$"
extract:
tenant: "default"
parserRef: "eebusAlarm"
detectorRef: "passthroughEebusAlarm"
lifecyclePolicyRef: "default"
outputRefs: ["rabbitMalisEebus"]

@ -0,0 +1,363 @@
cep:
# ------------------------------------------------------------
# Ingress (transport)
# ------------------------------------------------------------
# Ingress controls how the service RECEIVES raw events (MQTT/Rabbit/HTTP).
# Bindings (below, in ./config/cep/bindings/*.yml) control how we ROUTE + PROCESS them.
ingress:
mqtt:
enabled: true
brokerRef: "primary"
# MQTT topic filters (wildcards), NOT regex. Routing still happens via regex in bindings.
subscriptions:
- "alarms/#"
- "measurements/#"
# M2M GPIO pin update topics: <port>/<app>/<tenant>/<pin>
- "40092/malis/open-energy.at/1"
- "40092/malis/open-energy.at/2"
#- "+/malis/+/+"
qos: 1
automaticReconnect: true
cleanSession: false
cleanStart: false
rabbit:
enabled: false
brokerRef: "rabbit"
queues:
- "malis.ingress"
concurrentConsumers: 1
maxConcurrentConsumers: 4
prefetch: 50
# ------------------------------------------------------------
# Parsers (ingress payload -> canonical internal model)
# ------------------------------------------------------------
parsers:
# Accept both JSON "alarm message" and full <Datagram> XML (migration helper)
eebusAlarm:
type: SMART_EEBUS_ALARM
eebusAlarmDatagramXml:
type: EEBUS_ALARM_DATAGRAM_XML
eebusMeasurementDatagramXml:
type: EEBUS_MEASUREMENT_DATAGRAM_XML
jsonPorts8:
type: JSON_PORTS_BOOLEAN
config:
portCount: 8
keyPattern: "port{n}"
measurementPathPattern: "port.{n}.state"
# Optional: if one topic may contain either ports-JSON or EEBUS <Datagram> measurements
smartPortsOrEebusMeasurement:
type: SMART_PORTS_OR_EEBUS_MEASUREMENT
config:
portCount: 8
keyPattern: "port{n}"
measurementPathPattern: "port.{n}.state"
# M2M GPIO pin updates: enrich by resolving site/department/pin name/bad state from ERPNext.
m2mPinUpdateErpnext:
type: M2M_PIN_UPDATE_JSON_ERPNEXT
config:
erpnext:
baseUrl: "https://open-energy.erpnext.components.at"
apiKey: "255960115a8adec"
apiSecret: "0aa65b386bc7bbb"
timeoutMs: 5000
# Optional overrides (if your doctypes/fields are renamed):
# doctypes:
# iotDevice: "IoT_Device"
# site: "Site"
# siteIotDevice: "Site_IoT_Device"
# malisConfig: "MalIS Configuration"
# malisConfigDetail: "MalIS Configuration Detail"
# fields:
# iotDevicePortAddress: "port_address"
# siteIotDevicesAssigned: "iot_devices_assigned"
# siteIotDeviceIotDevice: "iot_device"
# siteIotDeviceMalis: "malis"
# siteIotDeviceRemark: "remark"
# siteIotDeviceFrom: "from"
# siteIotDeviceTo: "to"
# malisDetailPin: "pin"
# malisDetailStream: "stream"
# malisDetailStreamValue: "DigitalInput"
# malisDetailPinName: "signal_name"
# malisDetailDescription: "description"
# malisDetailBad: "bad"
# malisDetailDepartment: "department"
# malisConfigNameField: "name"
# malisConfigDetailsField: "details"
cache:
ttlSeconds: 60
maxSize: 20000
# Preload & periodically refresh config locally (recommended for scale):
# - resolves (port,pin) fully in-memory during normal processing
# - background refresh swaps an immutable snapshot atomically
preload:
enabled: true
onStartup: true
refreshSeconds: 60
initialDelaySeconds: 5
allowOnDemandFallback: true
# ------------------------------------------------------------
# Processors (parser + detector in one step)
# ------------------------------------------------------------
# A processor receives the raw ingress message (topic/path, headers, payload) and binding vars,
# and directly returns detector events + optional enriched vars.
#
# Bindings using a processor must set: processorRef (and must NOT set parserRef/detectorRef).
processors:
# JS processor combining M2M pin parsing + ERPNext enrichment + bad-state alarm detection.
# Script location is resolved relative to the working directory.
m2mPinErpnextJs:
type: JAVASCRIPT_PROCESSOR
config:
scriptPath: "./config/cep/scripts/m2m-pin-erpnext-processor.js"
entryFunction: "process"
hotReload: true
timeoutMs: 0
# These map to PinBadStateDetector defaults.
faultKeyTemplate: "${tenant}:${site}:${iotDevice}:PIN:${pin}"
severity: "MAJOR"
# ERPNext client used by JS via erp.lookup(port,pin,timestamp)
erpnext:
baseUrl: "https://open-energy.erpnext.components.at"
apiKey: "255960115a8adec"
apiSecret: "0aa65b386bc7bbb"
timeoutMs: 5000
cache:
ttlSeconds: 120
maxSize: 20000
preload:
enabled: true
onStartup: true
refreshSeconds: 120
initialDelaySeconds: 5
allowOnDemandFallback: true
# ------------------------------------------------------------
# Detectors (canonical model -> DETECTOR EVENTS (ALARM/CANCEL))
# ------------------------------------------------------------
detectors:
passthroughEebusAlarm:
type: PASSTHROUGH_ALARM
externalM2MRest:
type: EXTERNAL_REST_EVENTS
config:
baseUrl: "http://CHANGE-ME-IN-PROFILE"
path: "/api/v1/alarms/m2m-alarmlogic-01"
timeoutMs: 1500
requestFormat: "JSON_PORTS_LIST"
responseFormat: "JSON_EVENTS"
# Recommended variant: send full EEBUS SPINE-like <Datagram> XML to an external detector.
externalEebusDatagramRest:
type: EXTERNAL_REST_EVENTS
config:
baseUrl: "http://CHANGE-ME-IN-PROFILE"
path: "/api/v1/alarms/eebus-logic"
timeoutMs: 2000
requestFormat: "DATAGRAM_XML"
responseFormat: "JSON_EVENTS"
internalEebusExpression:
type: EXPRESSION_RULES
config:
missingPolicy: FALSE # FALSE | SKIP | ERROR
rules:
- id: port1_open
severity: MAJOR
expression: "s['port.1.state'] == false"
emitCancel: true
# Simple pin state detector: compares payload.value against ERPNext malis configuration "bad".
pinBadState:
type: PIN_BAD_STATE
config:
severity: MAJOR
faultKeyTemplate: "${tenant}:${site}:${iotDevice}:PIN:${pin}"
# ------------------------------------------------------------
# Lifecycle (caller-managed state)
# ------------------------------------------------------------
lifecyclePolicies:
default:
type: SIMPLE
dedupWindowMs: 10000
debounceMs: 300
autoClearTtl: "PT0S"
portsFlappy:
type: SIMPLE
dedupWindowMs: 10000
debounceMs: 1000
autoClearTtl: "PT30M"
passthrough:
type: SIMPLE
mode: FORWARD_ALL
dedupWindowMs: 0
debounceMs: 0
# Grouped/composed alarms.
# Produces one group alarm per groupKeyTemplate and tracks multiple member sources via sourceKeyTemplate.
# Emits UPDATE events (as action=ALARM + details.groupEvent=UPDATE) so receivers can see member changes.
compositeDefault:
type: COMPOSITE
# If you want the group instance to be notified for *every* member signal (not only transitions),
# set: mode: FORWARD_ALL
# Example only <20> you must ensure these vars exist (via binding extract, parser vars, or detector details -> vars).
groupKeyTemplate: "${site}|${department}|${function}|${faultDay}"
sourceKeyTemplate: "${path}"
emitUpdates: true
updateThrottleMs: 0
closeGrace: "PT0S"
idleTimeout: "PT0S"
noNotificationTimeout: "PT0S"
maxLifetime: "PT0S"
sourceTtl: "PT0S"
terminateEmitsCancel: true
compositeTickIntervalMs: 30000
# ------------------------------------------------------------
# Outputs (where the CEP publishes resulting alarms)
# ------------------------------------------------------------
outputs:
mqttMalisEebus:
type: MQTT
format: EEBUS_ALARM_MESSAGE
config:
brokerRef: "primary"
topicTemplate: "malis/alarms/${site}/${department}"
qos: 1
retained: false
# Output as a full EEBUS-like <Datagram> XML.
mqttMalisEebusDatagramXml:
type: MQTT
format: EEBUS_ALARM_DATAGRAM_XML
config:
brokerRef: "primary"
topicTemplate: "malis/alarms/xml/${site}/${department}"
qos: 1
retained: false
# These are consumed by EebusAlarmDatagramXmlFormatter
addressSourceTemplate: "${deviceId}"
addressDestinationTemplate: "malis-cep"
# Optional: extra header/cmd fields for "fuller" datagrams
headerExtras:
specificationVersion: "3.1.0"
cmdExtras:
cmdId: "${faultKey}"
cmdClassifier:
alarm: "write"
cancel: "delete"
# Example RabbitMQ output (enable a rabbit broker in env/*.yml first)
rabbitMalisEebus:
type: RABBIT
format: EEBUS_ALARM_MESSAGE
config:
brokerRef: "rabbit"
exchange: "malis.alarms"
routingKeyTemplate: "alarms.${site}.${department}"
dlqOut:
type: MQTT
format: EEBUS_ALARM_MESSAGE
config:
brokerRef: "primary"
topicTemplate: "malis/dlq"
outboxMalisEebusDatagramXml:
type: OUTBOX
format: EEBUS_ALARM_DATAGRAM_XML
config:
correlationVar: requestId
maxPerRequest: 200
maxTotal: 5000
outboxMalisFaultJson:
type: OUTBOX
format: MALIS_CEP_ALARM_JSON
config:
correlationVar: requestId
maxPerRequest: 200
maxTotal: 5000
# REST/HTTP output example (only used if referenced by a binding):
# Sends the formatted payload to an HTTP endpoint.
# - bodyMode: RAW sends the formatter payload bytes directly.
# - bodyMode: ENVELOPE_JSON sends a JSON wrapper with payloadBase64 + vars.
restMalisEebus:
type: REST
format: EEBUS_ALARM_MESSAGE
config:
urlTemplate: "http://CHANGE-ME/api/malis/alarms/${tenant}/${site}"
method: "POST"
timeoutMs: 5000
connectTimeoutMs: 3000
bodyMode: "ENVELOPE_JSON"
headers:
X-Tenant: "${tenant}"
X-Site: "${site}"
X-Department: "${department}"
# bearerTokenTemplate: "QRFs5WAd/SkdnrInPhgIioUysSC4XN5tYGgad6im8RBz8xKp2JKlnUhKd7OGoCDA"
# basicAuth:
# usernameTemplate: "${restUser}"
# passwordTemplate: "${restPass}"
maxErrorBodyChars: 2000
# MalIS Fault REST endpoint payload.
# This uses a dedicated formatter (format: MALIS_FAULT_JSON) to produce the exact JSON shape expected
# by MalIS fault endpoints. The REST publisher then sends it as RAW bytes.
restMalisFault:
type: REST
format: MALIS_CEP_ALARM_JSON #MALIS_FAULT_JSON
config:
urlTemplate: "http://localhost:8080/api/v1/alarms"
method: "POST"
timeoutMs: 5000
connectTimeoutMs: 3000
bodyMode: "RAW"
headers:
X-Tenant: "${tenant}"
X-Site: "${site}"
X-Department: "${department}"
bearerTokenTemplate: "QRFs5WAd/SkdnrInPhgIioUysSC4XN5tYGgad6im8RBz8xKp2JKlnUhKd7OGoCDA"
# --- Formatter templates (optional) ---
# siteIdTemplate: "${site}"
# departmentTemplate: "${department}"
# pinNameTemplate: "${pinName}"
# remarkTemplate: "${remark}"
# subjectTemplate: "${pinName}"
priorityTemplate: "High" # or "${priority}"; if blank, derived from alarm.severity
faultDayZone: "UTC"
# Populate valueList by either:
# - providing variables with prefix valueList.* (e.g. valueList.error_code)
# - or defining templates here:
valueList:
timestamp_device: "${occurredAt}"
# inverter_id: "${valueList.inverter_id}"
# error_code: "${valueList.error_code}"
includeScalarDetailsInValueList: true
includeEmptyValueList: false
maxErrorBodyChars: 2000
deadLetter:
enabled: true
outputRef: "dlqOut"

@ -0,0 +1,43 @@
cep:
# ------------------------------------------------------------
# Brokers (transport connection info)
# ------------------------------------------------------------
brokers:
primary:
#type: MQTT
#url: "tcp://localhost:1883"
#clientId: "malis-cep-dev"
#username: guest
#password: xD%kZM3
type: MQTT
url: "ssl://emqx.oecloud.eu:17587"
clientId: "malis-cep-dev"
username: mosquitto
password: 2BCiaHW4
tls:
trustAll: true
rabbit:
type: RABBIT
# For Rabbit we use an AMQP URI. vhost can be encoded, e.g. /%2F for default.
url: "amqp://guest:guest@localhost:5672/%2F"
host: localhost
port: 5672
username: guest
password: guest
# Profile overrides for detector endpoints
detectors:
externalM2MRest:
config:
baseUrl: "http://localhost:8000"
externalEebusDatagramRest:
config:
baseUrl: "http://localhost:8000"
eebusXsdValidation:
enabled: true
failFast: true
schemaRoot: "EEBus_SPINE_TS_Datagram.xsd"

@ -0,0 +1,22 @@
cep:
brokers:
primary:
type: MQTT
url: "ssl://mqtt.example.com:8883"
clientId: "malis-cep-prod"
# username/password optional
# username: "..."
# password: "..."
rabbit:
type: RABBIT
url: "amqps://user:pass@rabbit.example.com:5671/%2F"
detectors:
externalM2MRest:
config:
baseUrl: "http://..."
externalEebusDatagramRest:
config:
baseUrl: "http://..."

@ -0,0 +1,203 @@
/**
* JS Processor: M2M pin update + ERPNext lookup + bad-state alarm detection.
*
* Combines the behavior of:
* - M2mPinUpdateJsonErpnextParser.java (JSON parse + ERPNext enrich vars)
* - PinBadStateDetector.java (value == bad -> ALARM else CANCEL)
*
* Host objects provided by Java:
* - erp.lookup(port, pin, isoTimestamp) -> { siteId, department, pinName, description, remark, bad, iotDevice, ... }
* - tpl.expand(template, vars) -> expands ${var} placeholders
*
* Entry function must be named 'process' (or configured via entryFunction).
*/
function process(input, cfg) {
// input: { inType, path, headers, payloadText, payloadBase64, vars }
// cfg: processor config map
const varsIn = input && input.vars ? input.vars : {};
const varsOut = {}; // delta only
let msg;
try {
msg = JSON.parse(input.payloadText || "{}");
} catch (e) {
// invalid JSON -> drop to DLQ by throwing
throw new Error("Invalid JSON payload: " + e);
}
// Fields (match Java parser)
const gpio = str(msg.gpio);
const channel = str(msg.channel);
const value = (msg.value === undefined || msg.value === null) ? null : Number(msg.value);
const reason = str(msg.reason);
const m2mport = str(msg.m2mport);
// Timestamp: payload.timestamp -> mqtt.sentAt -> now
const ts = normalizeTs(str(msg.timestamp) || (msg.mqtt && str(msg.mqtt.sentAt)) || new Date().toISOString());
// Resolve port/pin from topic vars
// in topic we have deviceId in place of port, so add it to variables explicitly
const port = str(varsIn.deviceId);
put(varsOut, 'port', port);
const pin = (varsIn.pin === undefined || varsIn.pin === null) ? null : Number(varsIn.pin);
// ERPNext enrich
let erpCtx = null;
if (port && pin !== null && pin !== undefined && !Number.isNaN(pin) && typeof erp !== 'undefined' && erp) {
erpCtx = erp.lookup(port, pin, ts);
}
if (erpCtx) {
// Align var names with Java parser
put(varsOut, 'iotDevice', erpCtx.iotDevice);
put(varsOut, 'deviceId', erpCtx.deviceId || erpCtx.iotDevice);
put(varsOut, 'site', erpCtx.siteId);
put(varsOut, 'remark', erpCtx.remark);
put(varsOut, 'department', erpCtx.department);
put(varsOut, 'pinName', erpCtx.pinName);
put(varsOut, 'pinDescription', erpCtx.description);
put(varsOut, 'remark', erpCtx.remark);
if (erpCtx.bad !== undefined && erpCtx.bad !== null) {
put(varsOut, 'pinBad', erpCtx.bad);
}
// Optional convenience vars for grouping
put(varsOut, 'function', erpCtx.pinName);
}
// Always expose payload hint vars (match Java parser)
if (value !== null && value !== undefined && !Number.isNaN(value)) put(varsOut, 'pinValue', String(value));
if (reason) put(varsOut, 'pinReason', reason);
if (gpio) put(varsOut, 'gpio', gpio);
if (channel) put(varsOut, 'channel', channel);
if (m2mport) put(varsOut, 'm2mport', m2mport);
put(varsOut, 'timestamp', ts);
// faultDay for composite grouping (YYYY-MM-DD)
if (ts && ts.length >= 10) put(varsOut, 'faultDay', ts.substring(0, 10));
// Need pinBad to decide alarm; if missing, throw (consistent with Java detector)
const pinBadStr = str(first(varsOut.pinBad, varsIn.pinBad));
if (!pinBadStr) {
throw new Error("Missing vars.pinBad - ERPNext enrichment did not provide bad state for port=" + port + " pin=" + pin);
}
const pinBad = Number(pinBadStr);
if (Number.isNaN(pinBad)) {
throw new Error("Invalid vars.pinBad: " + pinBadStr);
}
// Decide event type
let action = 'CANCEL';
if (value !== null && value !== undefined && !Number.isNaN(value)) {
action = (Number(value) === Number(pinBad)) ? 'ALARM' : 'CANCEL';
} else {
// no value -> no event
return { drop: true };
}
// Build a merged vars map for templates
const mergedVars = Object.assign({}, varsIn, varsOut);
// faultKey from template
const faultKeyTemplate = (cfg && cfg.faultKeyTemplate) ? String(cfg.faultKeyTemplate) : "${tenant}:${site}:${iotDevice}:PIN:${pin}";
let faultKey;
if (typeof tpl !== 'undefined' && tpl && tpl.expand) {
faultKey = tpl.expand(faultKeyTemplate, mergedVars);
} else {
faultKey = fallbackExpand(faultKeyTemplate, mergedVars);
}
if (!faultKey || !faultKey.trim()) {
faultKey = "PIN:" + (mergedVars.pin || 'unknown');
}
const severity = (cfg && cfg.severity) ? String(cfg.severity) : 'MAJOR';
// Build details (match Java detector)
const details = {
gpio: gpio,
channel: channel,
value: value,
bad: pinBad,
reason: reason,
m2mport: m2mport,
mqtt: (msg.mqtt && typeof msg.mqtt === 'object') ? msg.mqtt : undefined,
pinName: mergedVars.pinName,
pinDescription: mergedVars.pinDescription,
remark: mergedVars.remark
};
// Remove undefined properties
Object.keys(details).forEach(k => { if (details[k] === undefined || details[k] === null || details[k] === "") delete details[k]; });
return {
vars: varsOut,
events: [
{
action: action,
faultKey: faultKey,
severity: severity,
occurredAt: ts,
details: details
}
]
};
}
function str(v) {
if (v === undefined || v === null) return null;
const s = String(v).trim();
return s.length ? s : null;
}
function put(obj, k, v) {
const s = str(v);
if (s) obj[k] = s;
}
function first(a, b) {
return (a !== undefined && a !== null && String(a).trim() !== '') ? a : b;
}
function normalizeTs(s) {
// best-effort normalization to ISO-8601 instant string
const t = str(s);
if (!t) return new Date().toISOString();
// if already ISO-ish, keep
return t;
}
function fallbackExpand(tplStr, vars) {
// minimal ${k} replacement
return String(tplStr).replace(/\$\{([^}]+)\}/g, (_, k) => {
const v = vars[k];
return (v === undefined || v === null) ? '' : String(v);
});
}
function dump(obj) {
if (obj == null) return "null";
// If it's a JS object / ProxyObject
try {
return JSON.stringify(obj);
} catch (e) { /* ignore */ }
// If it's a Java Map-like (has entrySet)
try {
if (typeof obj.entrySet === "function") {
const it = obj.entrySet().iterator();
const out = {};
while (it.hasNext()) {
const e = it.next();
out[String(e.getKey())] = e.getValue();
}
return JSON.stringify(out);
}
} catch (e) { /* ignore */ }
// Fallback: best-effort string
try { return String(obj); } catch (e) { return "[unprintable]"; }
}

@ -0,0 +1,139 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>at.co.procon</groupId>
<artifactId>malis-cep</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>malis-cep</name>
<description>Malis CEP prototype (bindings + parsers + detectors + lifecycle + outputs)</description>
<properties>
<java.version>17</java.version>
<spring-boot.version>3.3.6</spring-boot.version>
<graalvm.version>25.0.2</graalvm.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Integration core + MQTT (Paho) for real MQTT ingress/egress -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Spring AMQP for real RabbitMQ publishing + optional ingress -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<!-- GraalJS (Polyglot) for JavaScript processors/detectors -->
<dependency>
<groupId>org.graalvm.polyglot</groupId>
<artifactId>polyglot</artifactId>
<version>${graalvm.version}</version>
</dependency>
<dependency>
<groupId>org.graalvm.polyglot</groupId>
<artifactId>js</artifactId>
<version>${graalvm.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.openmuc.jeebus</groupId>
<artifactId>spine</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.jvnet.jaxb</groupId>
<artifactId>jaxb-plugins-runtime</artifactId>
<version>4.0.12</version>
</dependency>
<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Loading…
Cancel
Save