From b3a7aeeb97baf078f4f5d9b38b708327cdcbe1a9 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Thu, 12 Mar 2026 12:33:08 +0100 Subject: [PATCH] MalIS-CEP initial import --- README.md | 154 ++++++++ config/cep/bindings/http-eebus-alarm.yml | 15 + .../cep/bindings/http-eebus-measurements.yml | 30 ++ config/cep/bindings/mqtt-eebus-alarm.yml | 16 + .../cep/bindings/mqtt-eebus-measurements.yml | 17 + .../cep/bindings/mqtt-m2m-pin-composite.yml | 30 ++ config/cep/bindings/mqtt-m2m-pin-js.yml | 24 ++ config/cep/bindings/mqtt-m2m-pin.yml | 17 + config/cep/bindings/mqtt-ports8.yml | 18 + config/cep/bindings/rabbit-eebus-alarm.yml | 16 + config/cep/common.yml | 363 ++++++++++++++++++ config/cep/env/dev.yml | 43 +++ config/cep/env/prod.yml | 22 ++ .../cep/scripts/m2m-pin-erpnext-processor.js | 203 ++++++++++ pom.xml | 139 +++++++ 15 files changed, 1107 insertions(+) create mode 100644 README.md create mode 100644 config/cep/bindings/http-eebus-alarm.yml create mode 100644 config/cep/bindings/http-eebus-measurements.yml create mode 100644 config/cep/bindings/mqtt-eebus-alarm.yml create mode 100644 config/cep/bindings/mqtt-eebus-measurements.yml create mode 100644 config/cep/bindings/mqtt-m2m-pin-composite.yml create mode 100644 config/cep/bindings/mqtt-m2m-pin-js.yml create mode 100644 config/cep/bindings/mqtt-m2m-pin.yml create mode 100644 config/cep/bindings/mqtt-ports8.yml create mode 100644 config/cep/bindings/rabbit-eebus-alarm.yml create mode 100644 config/cep/common.yml create mode 100644 config/cep/env/dev.yml create mode 100644 config/cep/env/prod.yml create mode 100644 config/cep/scripts/m2m-pin-erpnext-processor.js create mode 100644 pom.xml diff --git a/README.md b/README.md new file mode 100644 index 0000000..68b642d --- /dev/null +++ b/README.md @@ -0,0 +1,154 @@ +# malis-cep (prototype) + +Spring Boot prototype demonstrating a **config-driven CEP** pipeline: + +- split configuration (common + env profile + bindings directory) +- routing by binding (source match -> parser -> detector -> lifecycle -> outputs) +- external detectors return **events only** (ALARM/CANCEL); state is managed in this service +- caller-managed lifecycle: transition-only publishing (dedup + debounce) +- pluggable outputs + DLQ envelope + +It also contains **real transport implementations**: + +- MQTT ingress via **Spring Integration MQTT (Paho)** +- MQTT egress via **Paho** with per-output `brokerRef` (see `MqttBrokerRegistry`) +- RabbitMQ egress (and optional ingress) via **Spring AMQP** + +## EEBUS XML support (prototype) + +This prototype includes minimal, namespace-tolerant DOM parsing/formatting for: + +- full SPINE-like `` **measurement** payloads (`EEBUS_MEASUREMENT_DATAGRAM_XML`) +- full SPINE-like `` **alarm** payloads (`EEBUS_ALARM_DATAGRAM_XML`) +- formatting detected alarms into a full `` (`EEBUS_ALARM_DATAGRAM_XML`) + +The XML samples used by unit tests are in `src/test/resources/eebus/`. + +## Internal detector (expression rules) + +This prototype now includes a fully **internal** detector that evaluates configurable expressions on top of a +protocol-agnostic input event model. + +- detector type: `EXPRESSION_RULES` +- accepts parsed bodies: + - `EEBUS_MEASUREMENT_DATAGRAM_XML` (full ``) + - `MEASUREMENT_SET` (from `JSON_PORTS_BOOLEAN`, etc.) + - `EventBatch` (advanced: other converters can produce this directly) + +Rules are boolean SpEL expressions over the in-memory per-device signal store. + +Example `common.yml` snippet: + +```yaml +cep: + detectors: + internalPorts: + type: EXPRESSION_RULES + config: + missingPolicy: FALSE + rules: + - id: any_port_open + severity: MAJOR + expression: "s['port.1.state'] == false || s['port.2.state'] == false || s['port.3.state'] == false" + emitCancel: true +``` + +Each rule emits `ALARM` when the expression evaluates to `true`, otherwise `CANCEL` (unless `emitCancel=false`). +State transitions are still managed by `AlarmLifecycleService`. + +## Heartbeat (periodic state propagation) + +By default, bindings publish only on state transitions (`ALARM` on open, `CANCEL` on close). + +You can optionally enable a **per-binding heartbeat** that periodically re-emits the *current* state +(useful when downstream systems want a "still active" / "still inactive" signal). + +Example in a binding YAML: + +```yaml +cep: + bindings: + mqttEebusAlarm: + # ... + heartbeat: + enabled: true + periodMs: 60000 + includeInactive: false # set true to also emit CANCEL heartbeats + initialDelayMs: 60000 # optional; if 0/absent it defaults to periodMs +``` + +Heartbeat messages keep the normal action (`ALARM` or `CANCEL`) and add `details.heartbeat=true`. +While a debounced CANCEL is pending, heartbeats continue as `ALARM` until the debounce maturity is reached. + +## Config layout + +The app uses Spring Boot's config import: + +- `./config/cep/common.yml` +- `./config/cep/env/${spring.profiles.active}.yml` +- all binding files in `./config/cep/bindings/*.yml` + +Bindings are merged into `cep.bindings` by `BindingFileLoader`. + +## Run (dev) + +```bash +mvn spring-boot:run -Dspring-boot.run.profiles=dev +``` + +### Local MQTT + +The default dev profile expects MQTT at `tcp://localhost:1883`. + +You can test with e.g. Mosquitto and publish: + +```bash +# Scenario 3: raw ports JSON (8 boolean) +mosquitto_pub -h localhost -t "/sadfjsadf/site-a/heating" -m '{"port1":true,"port2":true,"port3":true,"port4":true,"port5":true,"port6":true,"port7":true,"port8":false}' + +# Scenario 1/2: device publishes an alarm (JSON or full XML) +mosquitto_pub -h localhost -t "alarms/site-a/heating/device-1" -m '{"alarmCode":"X","action":"ALARM"}' +``` + +The produced alarms are published to: + +- `malis/alarms/${site}/${department}` (JSON alarm message) +- `malis/alarms/xml/${site}/${department}` (full XML) +- DLQ to `malis/dlq` (on errors / no binding match) + +### Local RabbitMQ (optional) + +`./config/cep/common.yml` ships with a Rabbit output example (`rabbitMalisEebus`) and a Rabbit ingress example. + +To enable Rabbit ingress, set in `common.yml` or a profile override: + +```yaml +cep: + ingress: + rabbit: + enabled: true +``` + +Dev profile includes a default Rabbit broker URI: + +- `amqp://guest:guest@localhost:5672/%2F` + +## Test via HTTP (transport simulation) + +If you don't want to run MQTT/Rabbit locally, you can inject a message directly: + +```bash +curl -X POST "http://localhost:8080/api/test/ingress?inType=MQTT&path=/sadfjsadf/site-a/heating" \ + -H "Content-Type: application/json" \ + -d '{"port1":true,"port2":true,"port3":true,"port4":true,"port5":true,"port6":true,"port7":true,"port8":false}' +``` + +## Where to look in code + +- `at.co.procon.malis.cep.binding.BindingResolver` – resolves an ingress message to **exactly one** binding +- `at.co.procon.malis.cep.parser.*` – parsers (raw payload -> canonical model) +- `at.co.procon.malis.cep.detector.*` – detectors (canonical model -> events) +- `at.co.procon.malis.cep.lifecycle.AlarmLifecycleService` – in-memory state (transition-only) +- `at.co.procon.malis.cep.pipeline.CepPipeline` – orchestration + DLQ +- `at.co.procon.malis.cep.transport.mqtt.*` – real MQTT ingress/egress +- `at.co.procon.malis.cep.transport.rabbit.*` – Rabbit registry + optional ingress diff --git a/config/cep/bindings/http-eebus-alarm.yml b/config/cep/bindings/http-eebus-alarm.yml new file mode 100644 index 0000000..3c44de2 --- /dev/null +++ b/config/cep/bindings/http-eebus-alarm.yml @@ -0,0 +1,15 @@ + cep: + bindings: + http-eebus-alarm: + enabled: true + in: + type: HTTP + match: + pathRegex: "^/rest/alarms/(?[^/]+)/(?[^/]+)/(?[^/]+)$" + extract: + tenant: "default" + parserRef: "eebusAlarmDatagramXml" # ensure this parser is registered to EebusAlarmDatagramXmlParser + detectorRef: "passthroughEebusAlarm" # PassthroughAlarmDetector + lifecyclePolicyRef: "passthrough" + outputRefs: + - "outboxMalisEebusDatagramXml" # Output with format EEBUS_ALARM_DATAGRAM_XML diff --git a/config/cep/bindings/http-eebus-measurements.yml b/config/cep/bindings/http-eebus-measurements.yml new file mode 100644 index 0000000..abdd62f --- /dev/null +++ b/config/cep/bindings/http-eebus-measurements.yml @@ -0,0 +1,30 @@ +cep: + bindings: + http-eebus-measurement-datagram-to-external-detector: + enabled: true + in: + type: HTTP + match: + # Example topic scheme; adjust to your real gateway convention. + pathRegex: "^measurements/(?[^/]+)/(?[^/]+)/(?[^/]+)$" + extract: + tenant: "default" + # Expect a full EEBUS SPINE-like XML payload. + parserRef: "eebusMeasurementDatagramXml" + # External detector receives the full XML (recommended requestFormat=DATAGRAM_XML). + detectorRef: "externalEebusDatagramRest" + lifecyclePolicyRef: "default" + outputRefs: ["outboxMalisEebusDatagramXml"] + http-eebus-measurement-datagrams-to-internal-detector: + enabled: true + in: + type: HTTP + match: + pathRegex: "^eebus/measurement-datagrams/(?[^/]+)/(?[^/]+)/(?[^/]+)$" + extract: + tenant: "default" + parserRef: "eebusMeasurementDatagramXml" + detectorRef: "internalEebusExpression" + lifecyclePolicyRef: "default" + # For local testing without MQTT/Rabbit, outbox is fine; /api/test/ingress will also show detector/lifecycle results. + outputRefs: ["outboxMalisEebusDatagramXml", "mqttMalisEebusDatagramXml"] diff --git a/config/cep/bindings/mqtt-eebus-alarm.yml b/config/cep/bindings/mqtt-eebus-alarm.yml new file mode 100644 index 0000000..0f0e253 --- /dev/null +++ b/config/cep/bindings/mqtt-eebus-alarm.yml @@ -0,0 +1,16 @@ +cep: + bindings: + mqtt-eebus-alarm-generic: + enabled: true + in: + type: MQTT + match: + topicRegex: "^alarms/(?[^/]+)/(?[^/]+)/(?[^/]+)$" + extract: + tenant: "default" + # Accept both JSON and full XML payloads + parserRef: "eebusAlarm" + detectorRef: "passthroughEebusAlarm" + lifecyclePolicyRef: "default" + # Publish both a JSON alarm message and a full XML (useful for testing) + outputRefs: ["mqttMalisEebus", "mqttMalisEebusDatagramXml"] diff --git a/config/cep/bindings/mqtt-eebus-measurements.yml b/config/cep/bindings/mqtt-eebus-measurements.yml new file mode 100644 index 0000000..1e197d5 --- /dev/null +++ b/config/cep/bindings/mqtt-eebus-measurements.yml @@ -0,0 +1,17 @@ +cep: + bindings: + mqtt-eebus-measurement-datagram-to-external-detector: + enabled: true + in: + type: MQTT + match: + # Example topic scheme; adjust to your real gateway convention. + topicRegex: "^measurements/(?[^/]+)/(?[^/]+)/(?[^/]+)$" + extract: + tenant: "default" + # Expect a full EEBUS SPINE-like XML payload. + parserRef: "eebusMeasurementDatagramXml" + # External detector receives the full XML (recommended requestFormat=DATAGRAM_XML). + detectorRef: "externalEebusDatagramRest" + lifecyclePolicyRef: "default" + outputRefs: ["mqttMalisEebusDatagramXml"] diff --git a/config/cep/bindings/mqtt-m2m-pin-composite.yml b/config/cep/bindings/mqtt-m2m-pin-composite.yml new file mode 100644 index 0000000..056971b --- /dev/null +++ b/config/cep/bindings/mqtt-m2m-pin-composite.yml @@ -0,0 +1,30 @@ +cep: + bindings: + # MQTT ingress topics: /// + # Example: 40200/m2m/tenantA/1 + mqttM2mPinErpnextComposite: + enabled: true + in: + type: MQTT + match: + topicRegex: "^(?[^/]+)/(?[^/]+)/(?[^/]+)/(?[^/]+)$" + # Extracted group vars: port, app, tenant, pin + parserRef: m2mPinUpdateErpnext + detectorRef: pinBadState + + lifecyclePolicyRef: compositeDefault + + composite: + groupKeyTemplate: "${site}|${department}|Pumpensumpf" # function should be specified in configuration "${site}|${department}|${function}" + sourceKeyTemplate: "${path}" # or "${port}/${app}/${tenant}/${pin}" + + outputRefs: + # Publishes EEBUS-like XML to MQTT using the templates in config/cep/common.yml + - restMalisFault + - outboxMalisFaultJson + + heartbeat: + enabled: true + periodMs: 600000 + includeInactive: true + diff --git a/config/cep/bindings/mqtt-m2m-pin-js.yml b/config/cep/bindings/mqtt-m2m-pin-js.yml new file mode 100644 index 0000000..a044031 --- /dev/null +++ b/config/cep/bindings/mqtt-m2m-pin-js.yml @@ -0,0 +1,24 @@ +cep: + bindings: + # MQTT ingress topics: /// + # Example: 40200/m2m/tenantA/1 + # + # This binding uses a JS processor (parser+detector in one step). + mqttM2mPinErpnextJs: + enabled: false # set true to use this binding + in: + type: MQTT + match: + topicRegex: "^(?[^/]+)/(?[^/]+)/(?[^/]+)/(?[^/]+)$" + + processorRef: m2mPinErpnextJs + + # You can choose SIMPLE or COMPOSITE lifecycle. + lifecyclePolicyRef: default + # Example COMPOSITE usage (uncomment and set lifecyclePolicyRef: compositeDefault) + # composite: + # groupKeyTemplate: "${site}|${department}|${function}" + # sourceKeyTemplate: "${path}" + + outputRefs: + - restMalisFault # if you configured a REST output with MALIS_FAULT_JSON diff --git a/config/cep/bindings/mqtt-m2m-pin.yml b/config/cep/bindings/mqtt-m2m-pin.yml new file mode 100644 index 0000000..1d0e571 --- /dev/null +++ b/config/cep/bindings/mqtt-m2m-pin.yml @@ -0,0 +1,17 @@ +cep: + bindings: + # MQTT ingress topics: /// + # Example: 40200/m2m/tenantA/1 + mqttM2mPinErpnext: + enabled: false + in: + type: MQTT + match: + topicRegex: "^(?[^/]+)/(?[^/]+)/(?[^/]+)/(?[^/]+)$" + # Extracted group vars: port, app, tenant, pin + parserRef: m2mPinUpdateErpnext + detectorRef: pinBadState + lifecyclePolicyRef: default + outputRefs: + # Publishes EEBUS-like XML to MQTT using the templates in config/cep/common.yml + - restMalisFault diff --git a/config/cep/bindings/mqtt-ports8.yml b/config/cep/bindings/mqtt-ports8.yml new file mode 100644 index 0000000..ade7aeb --- /dev/null +++ b/config/cep/bindings/mqtt-ports8.yml @@ -0,0 +1,18 @@ +cep: + bindings: + mqtt-ports8-raw-json-to-external-detector: + enabled: true + in: + type: MQTT + match: + topicRegex: "^/sadfjsadf/(?[^/]+)/(?[^/]+)$" + extract: + tenant: "default" + deviceIdStrategy: + type: "HASH_OF_TOPIC" + config: + salt: "ports8" + parserRef: "jsonPorts8" + detectorRef: "externalM2MRest" + lifecyclePolicyRef: "portsFlappy" + outputRefs: ["mqttMalisEebus", "mqttMalisEebusDatagramXml"] diff --git a/config/cep/bindings/rabbit-eebus-alarm.yml b/config/cep/bindings/rabbit-eebus-alarm.yml new file mode 100644 index 0000000..56415ff --- /dev/null +++ b/config/cep/bindings/rabbit-eebus-alarm.yml @@ -0,0 +1,16 @@ +cep: + bindings: + rabbit-eebus-alarm-routingkey: + enabled: false + in: + type: RABBIT + match: + # IMPORTANT: for Rabbit ingress we match on the *received routing key* if present, + # otherwise we match on the consumer queue name. + queueRegex: "^alarms\\.(?[^.]+)\\.(?[^.]+)\\.(?[^.]+)$" + extract: + tenant: "default" + parserRef: "eebusAlarm" + detectorRef: "passthroughEebusAlarm" + lifecyclePolicyRef: "default" + outputRefs: ["rabbitMalisEebus"] diff --git a/config/cep/common.yml b/config/cep/common.yml new file mode 100644 index 0000000..cd339a5 --- /dev/null +++ b/config/cep/common.yml @@ -0,0 +1,363 @@ +cep: + # ------------------------------------------------------------ + # Ingress (transport) + # ------------------------------------------------------------ + # Ingress controls how the service RECEIVES raw events (MQTT/Rabbit/HTTP). + # Bindings (below, in ./config/cep/bindings/*.yml) control how we ROUTE + PROCESS them. + ingress: + mqtt: + enabled: true + brokerRef: "primary" + # MQTT topic filters (wildcards), NOT regex. Routing still happens via regex in bindings. + subscriptions: + - "alarms/#" + - "measurements/#" + # M2M GPIO pin update topics: /// + - "40092/malis/open-energy.at/1" + - "40092/malis/open-energy.at/2" + #- "+/malis/+/+" + qos: 1 + automaticReconnect: true + cleanSession: false + cleanStart: false + + rabbit: + enabled: false + brokerRef: "rabbit" + queues: + - "malis.ingress" + concurrentConsumers: 1 + maxConcurrentConsumers: 4 + prefetch: 50 + + # ------------------------------------------------------------ + # Parsers (ingress payload -> canonical internal model) + # ------------------------------------------------------------ + parsers: + # Accept both JSON "alarm message" and full XML (migration helper) + eebusAlarm: + type: SMART_EEBUS_ALARM + + eebusAlarmDatagramXml: + type: EEBUS_ALARM_DATAGRAM_XML + + eebusMeasurementDatagramXml: + type: EEBUS_MEASUREMENT_DATAGRAM_XML + + jsonPorts8: + type: JSON_PORTS_BOOLEAN + config: + portCount: 8 + keyPattern: "port{n}" + measurementPathPattern: "port.{n}.state" + + # Optional: if one topic may contain either ports-JSON or EEBUS measurements + smartPortsOrEebusMeasurement: + type: SMART_PORTS_OR_EEBUS_MEASUREMENT + config: + portCount: 8 + keyPattern: "port{n}" + measurementPathPattern: "port.{n}.state" + + # M2M GPIO pin updates: enrich by resolving site/department/pin name/bad state from ERPNext. + m2mPinUpdateErpnext: + type: M2M_PIN_UPDATE_JSON_ERPNEXT + config: + erpnext: + baseUrl: "https://open-energy.erpnext.components.at" + apiKey: "255960115a8adec" + apiSecret: "0aa65b386bc7bbb" + timeoutMs: 5000 + # Optional overrides (if your doctypes/fields are renamed): + # doctypes: + # iotDevice: "IoT_Device" + # site: "Site" + # siteIotDevice: "Site_IoT_Device" + # malisConfig: "MalIS Configuration" + # malisConfigDetail: "MalIS Configuration Detail" + # fields: + # iotDevicePortAddress: "port_address" + # siteIotDevicesAssigned: "iot_devices_assigned" + # siteIotDeviceIotDevice: "iot_device" + # siteIotDeviceMalis: "malis" + # siteIotDeviceRemark: "remark" + # siteIotDeviceFrom: "from" + # siteIotDeviceTo: "to" + # malisDetailPin: "pin" + # malisDetailStream: "stream" + # malisDetailStreamValue: "DigitalInput" + # malisDetailPinName: "signal_name" + # malisDetailDescription: "description" + # malisDetailBad: "bad" + # malisDetailDepartment: "department" + # malisConfigNameField: "name" + # malisConfigDetailsField: "details" + cache: + ttlSeconds: 60 + maxSize: 20000 + # Preload & periodically refresh config locally (recommended for scale): + # - resolves (port,pin) fully in-memory during normal processing + # - background refresh swaps an immutable snapshot atomically + preload: + enabled: true + onStartup: true + refreshSeconds: 60 + initialDelaySeconds: 5 + allowOnDemandFallback: true + + # ------------------------------------------------------------ + # Processors (parser + detector in one step) + # ------------------------------------------------------------ + # A processor receives the raw ingress message (topic/path, headers, payload) and binding vars, + # and directly returns detector events + optional enriched vars. + # + # Bindings using a processor must set: processorRef (and must NOT set parserRef/detectorRef). + processors: + # JS processor combining M2M pin parsing + ERPNext enrichment + bad-state alarm detection. + # Script location is resolved relative to the working directory. + m2mPinErpnextJs: + type: JAVASCRIPT_PROCESSOR + config: + scriptPath: "./config/cep/scripts/m2m-pin-erpnext-processor.js" + entryFunction: "process" + hotReload: true + timeoutMs: 0 + + # These map to PinBadStateDetector defaults. + faultKeyTemplate: "${tenant}:${site}:${iotDevice}:PIN:${pin}" + severity: "MAJOR" + + # ERPNext client used by JS via erp.lookup(port,pin,timestamp) + erpnext: + baseUrl: "https://open-energy.erpnext.components.at" + apiKey: "255960115a8adec" + apiSecret: "0aa65b386bc7bbb" + timeoutMs: 5000 + cache: + ttlSeconds: 120 + maxSize: 20000 + preload: + enabled: true + onStartup: true + refreshSeconds: 120 + initialDelaySeconds: 5 + allowOnDemandFallback: true + + # ------------------------------------------------------------ + # Detectors (canonical model -> DETECTOR EVENTS (ALARM/CANCEL)) + # ------------------------------------------------------------ + detectors: + passthroughEebusAlarm: + type: PASSTHROUGH_ALARM + + externalM2MRest: + type: EXTERNAL_REST_EVENTS + config: + baseUrl: "http://CHANGE-ME-IN-PROFILE" + path: "/api/v1/alarms/m2m-alarmlogic-01" + timeoutMs: 1500 + requestFormat: "JSON_PORTS_LIST" + responseFormat: "JSON_EVENTS" + + # Recommended variant: send full EEBUS SPINE-like XML to an external detector. + externalEebusDatagramRest: + type: EXTERNAL_REST_EVENTS + config: + baseUrl: "http://CHANGE-ME-IN-PROFILE" + path: "/api/v1/alarms/eebus-logic" + timeoutMs: 2000 + requestFormat: "DATAGRAM_XML" + responseFormat: "JSON_EVENTS" + + internalEebusExpression: + type: EXPRESSION_RULES + config: + missingPolicy: FALSE # FALSE | SKIP | ERROR + rules: + - id: port1_open + severity: MAJOR + expression: "s['port.1.state'] == false" + emitCancel: true + + # Simple pin state detector: compares payload.value against ERPNext malis configuration "bad". + pinBadState: + type: PIN_BAD_STATE + config: + severity: MAJOR + faultKeyTemplate: "${tenant}:${site}:${iotDevice}:PIN:${pin}" + + # ------------------------------------------------------------ + # Lifecycle (caller-managed state) + # ------------------------------------------------------------ + lifecyclePolicies: + default: + type: SIMPLE + dedupWindowMs: 10000 + debounceMs: 300 + autoClearTtl: "PT0S" + + portsFlappy: + type: SIMPLE + dedupWindowMs: 10000 + debounceMs: 1000 + autoClearTtl: "PT30M" + + passthrough: + type: SIMPLE + mode: FORWARD_ALL + dedupWindowMs: 0 + debounceMs: 0 + + # Grouped/composed alarms. + # Produces one group alarm per groupKeyTemplate and tracks multiple member sources via sourceKeyTemplate. + # Emits UPDATE events (as action=ALARM + details.groupEvent=UPDATE) so receivers can see member changes. + compositeDefault: + type: COMPOSITE + # If you want the group instance to be notified for *every* member signal (not only transitions), + # set: mode: FORWARD_ALL + # Example only � you must ensure these vars exist (via binding extract, parser vars, or detector details -> vars). + groupKeyTemplate: "${site}|${department}|${function}|${faultDay}" + sourceKeyTemplate: "${path}" + emitUpdates: true + updateThrottleMs: 0 + closeGrace: "PT0S" + idleTimeout: "PT0S" + noNotificationTimeout: "PT0S" + maxLifetime: "PT0S" + sourceTtl: "PT0S" + terminateEmitsCancel: true + compositeTickIntervalMs: 30000 + + # ------------------------------------------------------------ + # Outputs (where the CEP publishes resulting alarms) + # ------------------------------------------------------------ + outputs: + mqttMalisEebus: + type: MQTT + format: EEBUS_ALARM_MESSAGE + config: + brokerRef: "primary" + topicTemplate: "malis/alarms/${site}/${department}" + qos: 1 + retained: false + + # Output as a full EEBUS-like XML. + mqttMalisEebusDatagramXml: + type: MQTT + format: EEBUS_ALARM_DATAGRAM_XML + config: + brokerRef: "primary" + topicTemplate: "malis/alarms/xml/${site}/${department}" + qos: 1 + retained: false + + # These are consumed by EebusAlarmDatagramXmlFormatter + addressSourceTemplate: "${deviceId}" + addressDestinationTemplate: "malis-cep" + + # Optional: extra header/cmd fields for "fuller" datagrams + headerExtras: + specificationVersion: "3.1.0" + cmdExtras: + cmdId: "${faultKey}" + cmdClassifier: + alarm: "write" + cancel: "delete" + + # Example RabbitMQ output (enable a rabbit broker in env/*.yml first) + rabbitMalisEebus: + type: RABBIT + format: EEBUS_ALARM_MESSAGE + config: + brokerRef: "rabbit" + exchange: "malis.alarms" + routingKeyTemplate: "alarms.${site}.${department}" + + dlqOut: + type: MQTT + format: EEBUS_ALARM_MESSAGE + config: + brokerRef: "primary" + topicTemplate: "malis/dlq" + + outboxMalisEebusDatagramXml: + type: OUTBOX + format: EEBUS_ALARM_DATAGRAM_XML + config: + correlationVar: requestId + maxPerRequest: 200 + maxTotal: 5000 + + outboxMalisFaultJson: + type: OUTBOX + format: MALIS_CEP_ALARM_JSON + config: + correlationVar: requestId + maxPerRequest: 200 + maxTotal: 5000 + + # REST/HTTP output example (only used if referenced by a binding): + # Sends the formatted payload to an HTTP endpoint. + # - bodyMode: RAW sends the formatter payload bytes directly. + # - bodyMode: ENVELOPE_JSON sends a JSON wrapper with payloadBase64 + vars. + restMalisEebus: + type: REST + format: EEBUS_ALARM_MESSAGE + config: + urlTemplate: "http://CHANGE-ME/api/malis/alarms/${tenant}/${site}" + method: "POST" + timeoutMs: 5000 + connectTimeoutMs: 3000 + bodyMode: "ENVELOPE_JSON" + headers: + X-Tenant: "${tenant}" + X-Site: "${site}" + X-Department: "${department}" + # bearerTokenTemplate: "QRFs5WAd/SkdnrInPhgIioUysSC4XN5tYGgad6im8RBz8xKp2JKlnUhKd7OGoCDA" + # basicAuth: + # usernameTemplate: "${restUser}" + # passwordTemplate: "${restPass}" + maxErrorBodyChars: 2000 + + # MalIS Fault REST endpoint payload. + # This uses a dedicated formatter (format: MALIS_FAULT_JSON) to produce the exact JSON shape expected + # by MalIS fault endpoints. The REST publisher then sends it as RAW bytes. + restMalisFault: + type: REST + format: MALIS_CEP_ALARM_JSON #MALIS_FAULT_JSON + config: + urlTemplate: "http://localhost:8080/api/v1/alarms" + method: "POST" + timeoutMs: 5000 + connectTimeoutMs: 3000 + bodyMode: "RAW" + headers: + X-Tenant: "${tenant}" + X-Site: "${site}" + X-Department: "${department}" + bearerTokenTemplate: "QRFs5WAd/SkdnrInPhgIioUysSC4XN5tYGgad6im8RBz8xKp2JKlnUhKd7OGoCDA" + + # --- Formatter templates (optional) --- + # siteIdTemplate: "${site}" + # departmentTemplate: "${department}" + # pinNameTemplate: "${pinName}" + # remarkTemplate: "${remark}" + # subjectTemplate: "${pinName}" + priorityTemplate: "High" # or "${priority}"; if blank, derived from alarm.severity + faultDayZone: "UTC" + + # Populate valueList by either: + # - providing variables with prefix valueList.* (e.g. valueList.error_code) + # - or defining templates here: + valueList: + timestamp_device: "${occurredAt}" + # inverter_id: "${valueList.inverter_id}" + # error_code: "${valueList.error_code}" + + includeScalarDetailsInValueList: true + includeEmptyValueList: false + maxErrorBodyChars: 2000 + + deadLetter: + enabled: true + outputRef: "dlqOut" \ No newline at end of file diff --git a/config/cep/env/dev.yml b/config/cep/env/dev.yml new file mode 100644 index 0000000..8c222de --- /dev/null +++ b/config/cep/env/dev.yml @@ -0,0 +1,43 @@ +cep: + # ------------------------------------------------------------ + # Brokers (transport connection info) + # ------------------------------------------------------------ + brokers: + primary: + #type: MQTT + #url: "tcp://localhost:1883" + #clientId: "malis-cep-dev" + #username: guest + #password: xD%kZM3 + + type: MQTT + url: "ssl://emqx.oecloud.eu:17587" + clientId: "malis-cep-dev" + username: mosquitto + password: 2BCiaHW4 + tls: + trustAll: true + + rabbit: + type: RABBIT + # For Rabbit we use an AMQP URI. vhost can be encoded, e.g. /%2F for default. + url: "amqp://guest:guest@localhost:5672/%2F" + host: localhost + port: 5672 + username: guest + password: guest + + # Profile overrides for detector endpoints + detectors: + externalM2MRest: + config: + baseUrl: "http://localhost:8000" + + externalEebusDatagramRest: + config: + baseUrl: "http://localhost:8000" + + eebusXsdValidation: + enabled: true + failFast: true + schemaRoot: "EEBus_SPINE_TS_Datagram.xsd" \ No newline at end of file diff --git a/config/cep/env/prod.yml b/config/cep/env/prod.yml new file mode 100644 index 0000000..d745e41 --- /dev/null +++ b/config/cep/env/prod.yml @@ -0,0 +1,22 @@ +cep: + brokers: + primary: + type: MQTT + url: "ssl://mqtt.example.com:8883" + clientId: "malis-cep-prod" + # username/password optional + # username: "..." + # password: "..." + + rabbit: + type: RABBIT + url: "amqps://user:pass@rabbit.example.com:5671/%2F" + + detectors: + externalM2MRest: + config: + baseUrl: "http://..." + + externalEebusDatagramRest: + config: + baseUrl: "http://..." diff --git a/config/cep/scripts/m2m-pin-erpnext-processor.js b/config/cep/scripts/m2m-pin-erpnext-processor.js new file mode 100644 index 0000000..da83bf5 --- /dev/null +++ b/config/cep/scripts/m2m-pin-erpnext-processor.js @@ -0,0 +1,203 @@ +/** + * JS Processor: M2M pin update + ERPNext lookup + bad-state alarm detection. + * + * Combines the behavior of: + * - M2mPinUpdateJsonErpnextParser.java (JSON parse + ERPNext enrich vars) + * - PinBadStateDetector.java (value == bad -> ALARM else CANCEL) + * + * Host objects provided by Java: + * - erp.lookup(port, pin, isoTimestamp) -> { siteId, department, pinName, description, remark, bad, iotDevice, ... } + * - tpl.expand(template, vars) -> expands ${var} placeholders + * + * Entry function must be named 'process' (or configured via entryFunction). + */ +function process(input, cfg) { + // input: { inType, path, headers, payloadText, payloadBase64, vars } + // cfg: processor config map + + const varsIn = input && input.vars ? input.vars : {}; + const varsOut = {}; // delta only + + let msg; + try { + msg = JSON.parse(input.payloadText || "{}"); + } catch (e) { + // invalid JSON -> drop to DLQ by throwing + throw new Error("Invalid JSON payload: " + e); + } + + // Fields (match Java parser) + const gpio = str(msg.gpio); + const channel = str(msg.channel); + const value = (msg.value === undefined || msg.value === null) ? null : Number(msg.value); + const reason = str(msg.reason); + const m2mport = str(msg.m2mport); + + // Timestamp: payload.timestamp -> mqtt.sentAt -> now + const ts = normalizeTs(str(msg.timestamp) || (msg.mqtt && str(msg.mqtt.sentAt)) || new Date().toISOString()); + + // Resolve port/pin from topic vars + + // in topic we have deviceId in place of port, so add it to variables explicitly + const port = str(varsIn.deviceId); + put(varsOut, 'port', port); + + const pin = (varsIn.pin === undefined || varsIn.pin === null) ? null : Number(varsIn.pin); + + // ERPNext enrich + let erpCtx = null; + if (port && pin !== null && pin !== undefined && !Number.isNaN(pin) && typeof erp !== 'undefined' && erp) { + erpCtx = erp.lookup(port, pin, ts); + } + + if (erpCtx) { + // Align var names with Java parser + put(varsOut, 'iotDevice', erpCtx.iotDevice); + put(varsOut, 'deviceId', erpCtx.deviceId || erpCtx.iotDevice); + put(varsOut, 'site', erpCtx.siteId); + put(varsOut, 'remark', erpCtx.remark); + put(varsOut, 'department', erpCtx.department); + put(varsOut, 'pinName', erpCtx.pinName); + put(varsOut, 'pinDescription', erpCtx.description); + put(varsOut, 'remark', erpCtx.remark); + if (erpCtx.bad !== undefined && erpCtx.bad !== null) { + put(varsOut, 'pinBad', erpCtx.bad); + } + + // Optional convenience vars for grouping + put(varsOut, 'function', erpCtx.pinName); + } + + // Always expose payload hint vars (match Java parser) + if (value !== null && value !== undefined && !Number.isNaN(value)) put(varsOut, 'pinValue', String(value)); + if (reason) put(varsOut, 'pinReason', reason); + if (gpio) put(varsOut, 'gpio', gpio); + if (channel) put(varsOut, 'channel', channel); + if (m2mport) put(varsOut, 'm2mport', m2mport); + put(varsOut, 'timestamp', ts); + + // faultDay for composite grouping (YYYY-MM-DD) + if (ts && ts.length >= 10) put(varsOut, 'faultDay', ts.substring(0, 10)); + + // Need pinBad to decide alarm; if missing, throw (consistent with Java detector) + const pinBadStr = str(first(varsOut.pinBad, varsIn.pinBad)); + if (!pinBadStr) { + throw new Error("Missing vars.pinBad - ERPNext enrichment did not provide bad state for port=" + port + " pin=" + pin); + } + const pinBad = Number(pinBadStr); + if (Number.isNaN(pinBad)) { + throw new Error("Invalid vars.pinBad: " + pinBadStr); + } + + // Decide event type + let action = 'CANCEL'; + if (value !== null && value !== undefined && !Number.isNaN(value)) { + action = (Number(value) === Number(pinBad)) ? 'ALARM' : 'CANCEL'; + } else { + // no value -> no event + return { drop: true }; + } + + // Build a merged vars map for templates + const mergedVars = Object.assign({}, varsIn, varsOut); + + // faultKey from template + const faultKeyTemplate = (cfg && cfg.faultKeyTemplate) ? String(cfg.faultKeyTemplate) : "${tenant}:${site}:${iotDevice}:PIN:${pin}"; + let faultKey; + if (typeof tpl !== 'undefined' && tpl && tpl.expand) { + faultKey = tpl.expand(faultKeyTemplate, mergedVars); + } else { + faultKey = fallbackExpand(faultKeyTemplate, mergedVars); + } + if (!faultKey || !faultKey.trim()) { + faultKey = "PIN:" + (mergedVars.pin || 'unknown'); + } + + const severity = (cfg && cfg.severity) ? String(cfg.severity) : 'MAJOR'; + + // Build details (match Java detector) + const details = { + gpio: gpio, + channel: channel, + value: value, + bad: pinBad, + reason: reason, + m2mport: m2mport, + mqtt: (msg.mqtt && typeof msg.mqtt === 'object') ? msg.mqtt : undefined, + pinName: mergedVars.pinName, + pinDescription: mergedVars.pinDescription, + remark: mergedVars.remark + }; + + // Remove undefined properties + Object.keys(details).forEach(k => { if (details[k] === undefined || details[k] === null || details[k] === "") delete details[k]; }); + + return { + vars: varsOut, + events: [ + { + action: action, + faultKey: faultKey, + severity: severity, + occurredAt: ts, + details: details + } + ] + }; +} + +function str(v) { + if (v === undefined || v === null) return null; + const s = String(v).trim(); + return s.length ? s : null; +} + +function put(obj, k, v) { + const s = str(v); + if (s) obj[k] = s; +} + +function first(a, b) { + return (a !== undefined && a !== null && String(a).trim() !== '') ? a : b; +} + +function normalizeTs(s) { + // best-effort normalization to ISO-8601 instant string + const t = str(s); + if (!t) return new Date().toISOString(); + // if already ISO-ish, keep + return t; +} + +function fallbackExpand(tplStr, vars) { + // minimal ${k} replacement + return String(tplStr).replace(/\$\{([^}]+)\}/g, (_, k) => { + const v = vars[k]; + return (v === undefined || v === null) ? '' : String(v); + }); +} + +function dump(obj) { + if (obj == null) return "null"; + + // If it's a JS object / ProxyObject + try { + return JSON.stringify(obj); + } catch (e) { /* ignore */ } + + // If it's a Java Map-like (has entrySet) + try { + if (typeof obj.entrySet === "function") { + const it = obj.entrySet().iterator(); + const out = {}; + while (it.hasNext()) { + const e = it.next(); + out[String(e.getKey())] = e.getValue(); + } + return JSON.stringify(out); + } + } catch (e) { /* ignore */ } + + // Fallback: best-effort string + try { return String(obj); } catch (e) { return "[unprintable]"; } +} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..fa94a77 --- /dev/null +++ b/pom.xml @@ -0,0 +1,139 @@ + + 4.0.0 + + at.co.procon + malis-cep + 0.1.0-SNAPSHOT + malis-cep + Malis CEP prototype (bindings + parsers + detectors + lifecycle + outputs) + + + 17 + 3.3.6 + 25.0.2 + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-web + + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + + + + + org.springframework.boot + spring-boot-starter-amqp + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + + + + org.graalvm.polyglot + polyglot + ${graalvm.version} + + + org.graalvm.polyglot + js + ${graalvm.version} + pom + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.projectlombok + lombok + true + + + + org.openmuc.jeebus + spine + 3.1.0 + + + + org.jvnet.jaxb + jaxb-plugins-runtime + 4.0.12 + + + + jakarta.xml.bind + jakarta.xml.bind-api + + + + org.glassfish.jaxb + jaxb-runtime + + + + org.springframework.boot + spring-boot-starter-test + test + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${java.version} + ${java.version} + + + + +