cep:
  # NOTE(review): the line structure of this file was reconstructed from a
  # whitespace-collapsed copy in which everything after the first "#" had become
  # one comment. Nesting that could not be derived unambiguously is marked with
  # NOTE(review) below and should be confirmed against the consuming code.

  # ------------------------------------------------------------
  # Ingress (transport)
  # ------------------------------------------------------------
  # Ingress controls how the service RECEIVES raw events (MQTT/Rabbit/HTTP).
  # Bindings (below, in ./config/cep/bindings/*.yml) control how we ROUTE + PROCESS them.
  ingress:
    mqtt:
      enabled: true
      brokerRef: "primary"
      # MQTT topic filters (wildcards), NOT regex. Routing still happens via regex in bindings.
      subscriptions:
        - "alarms/#"
        - "measurements/#"
        # M2M GPIO pin update topics.
        # NOTE(review): the segment placeholders were lost from this comment (it
        # read "///"); judging by the entries below, the layout is presumably
        # <port>/malis/<domain>/<pin> - confirm.
        - "40092/malis/open-energy.at/1"
        - "40092/malis/open-energy.at/2"
        # - "+/malis/+/+"
      qos: 1
      automaticReconnect: true
      cleanSession: false
      cleanStart: false

    rabbit:
      enabled: false
      brokerRef: "rabbit"
      queues:
        - "malis.ingress"
      concurrentConsumers: 1
      maxConcurrentConsumers: 4
      prefetch: 50

  # ------------------------------------------------------------
  # Parsers (ingress payload -> canonical internal model)
  # ------------------------------------------------------------
  parsers:
    # Accept both JSON "alarm message" and full XML (migration helper)
    eebusAlarm:
      type: SMART_EEBUS_ALARM

    eebusAlarmDatagramXml:
      type: EEBUS_ALARM_DATAGRAM_XML

    eebusMeasurementDatagramXml:
      type: EEBUS_MEASUREMENT_DATAGRAM_XML

    jsonPorts8:
      type: JSON_PORTS_BOOLEAN
      config:
        portCount: 8
        keyPattern: "port{n}"
        measurementPathPattern: "port.{n}.state"

    # Optional: if one topic may contain either ports-JSON or EEBUS measurements
    smartPortsOrEebusMeasurement:
      type: SMART_PORTS_OR_EEBUS_MEASUREMENT
      config:
        portCount: 8
        keyPattern: "port{n}"
        measurementPathPattern: "port.{n}.state"

    # M2M GPIO pin updates: enrich by resolving site/department/pin name/bad state from ERPNext.
    m2mPinUpdateErpnext:
      type: M2M_PIN_UPDATE_JSON_ERPNEXT
      config:
        erpnext:
          baseUrl: "https://open-energy.erpnext.components.at"
          # NOTE(review): live credentials are committed here in plain text.
          # Rotate them and supply them from a profile/secret override instead
          # (the CHANGE-ME-IN-PROFILE placeholders elsewhere in this file suggest
          # such overrides exist - confirm).
          apiKey: "255960115a8adec"
          apiSecret: "0aa65b386bc7bbb"
          timeoutMs: 5000
          # Optional overrides (if your doctypes/fields are renamed):
          # doctypes:
          #   iotDevice: "IoT_Device"
          #   site: "Site"
          #   siteIotDevice: "Site_IoT_Device"
          #   malisConfig: "MalIS Configuration"
          #   malisConfigDetail: "MalIS Configuration Detail"
          # fields:
          #   iotDevicePortAddress: "port_address"
          #   siteIotDevicesAssigned: "iot_devices_assigned"
          #   siteIotDeviceIotDevice: "iot_device"
          #   siteIotDeviceMalis: "malis"
          #   siteIotDeviceRemark: "remark"
          #   siteIotDeviceFrom: "from"
          #   siteIotDeviceTo: "to"
          #   malisDetailPin: "pin"
          #   malisDetailStream: "stream"
          #   malisDetailStreamValue: "DigitalInput"
          #   malisDetailPinName: "signal_name"
          #   malisDetailDescription: "description"
          #   malisDetailBad: "bad"
          #   malisDetailDepartment: "department"
          #   malisConfigNameField: "name"
          #   malisConfigDetailsField: "details"
          # NOTE(review): cache/preload are placed under erpnext here; the
          # collapsed original did not show whether they belong to erpnext or
          # directly to config - confirm.
          cache:
            ttlSeconds: 60
            maxSize: 20000
          # Preload & periodically refresh config locally (recommended for scale):
          # - resolves (port,pin) fully in-memory during normal processing
          # - background refresh swaps an immutable snapshot atomically
          preload:
            enabled: true
            onStartup: true
            refreshSeconds: 60
            initialDelaySeconds: 5
            allowOnDemandFallback: true

  # ------------------------------------------------------------
  # Processors (parser + detector in one step)
  # ------------------------------------------------------------
  # A processor receives the raw ingress message (topic/path, headers, payload) and binding vars,
  # and directly returns detector events + optional enriched vars.
  #
  # Bindings using a processor must set: processorRef (and must NOT set parserRef/detectorRef).
  processors:
    # JS processor combining M2M pin parsing + ERPNext enrichment + bad-state alarm detection.
    # Script location is resolved relative to the working directory.
    m2mPinErpnextJs:
      type: JAVASCRIPT_PROCESSOR
      config:
        scriptPath: "./config/cep/scripts/m2m-pin-erpnext-processor.js"
        entryFunction: "process"
        hotReload: true
        # NOTE(review): 0 presumably means "no script timeout" - confirm.
        timeoutMs: 0
        # These map to PinBadStateDetector defaults.
        faultKeyTemplate: "${tenant}:${site}:${iotDevice}:PIN:${pin}"
        severity: "MAJOR"
        # ERPNext client used by JS via erp.lookup(port,pin,timestamp)
        erpnext:
          baseUrl: "https://open-energy.erpnext.components.at"
          # NOTE(review): same plain-text credentials as in
          # parsers.m2mPinUpdateErpnext - rotate and externalize.
          apiKey: "255960115a8adec"
          apiSecret: "0aa65b386bc7bbb"
          timeoutMs: 5000
          # NOTE(review): cache/preload nesting reconstructed (erpnext vs.
          # config level) - confirm.
          cache:
            ttlSeconds: 120
            maxSize: 20000
          preload:
            enabled: true
            onStartup: true
            refreshSeconds: 120
            initialDelaySeconds: 5
            allowOnDemandFallback: true

  # ------------------------------------------------------------
  # Detectors (canonical model -> DETECTOR EVENTS (ALARM/CANCEL))
  # ------------------------------------------------------------
  detectors:
    passthroughEebusAlarm:
      type: PASSTHROUGH_ALARM

    externalM2MRest:
      type: EXTERNAL_REST_EVENTS
      config:
        baseUrl: "http://CHANGE-ME-IN-PROFILE"
        path: "/api/v1/alarms/m2m-alarmlogic-01"
        timeoutMs: 1500
        requestFormat: "JSON_PORTS_LIST"
        responseFormat: "JSON_EVENTS"

    # Recommended variant: send full EEBUS SPINE-like XML to an external detector.
    externalEebusDatagramRest:
      type: EXTERNAL_REST_EVENTS
      config:
        baseUrl: "http://CHANGE-ME-IN-PROFILE"
        path: "/api/v1/alarms/eebus-logic"
        timeoutMs: 2000
        requestFormat: "DATAGRAM_XML"
        responseFormat: "JSON_EVENTS"

    internalEebusExpression:
      type: EXPRESSION_RULES
      config:
        # Quoted on purpose: a bare FALSE is resolved to a YAML boolean, not to
        # the policy name listed in the options below.
        missingPolicy: "FALSE"  # FALSE | SKIP | ERROR
        rules:
          - id: port1_open
            severity: MAJOR
            expression: "s['port.1.state'] == false"
            emitCancel: true

    # Simple pin state detector: compares payload.value against ERPNext malis configuration "bad".
    pinBadState:
      type: PIN_BAD_STATE
      config:
        severity: MAJOR
        faultKeyTemplate: "${tenant}:${site}:${iotDevice}:PIN:${pin}"

  # ------------------------------------------------------------
  # Lifecycle (caller-managed state)
  # ------------------------------------------------------------
  lifecyclePolicies:
    default:
      type: SIMPLE
      dedupWindowMs: 10000
      debounceMs: 300
      autoClearTtl: "PT0S"

    portsFlappy:
      type: SIMPLE
      dedupWindowMs: 10000
      debounceMs: 1000
      autoClearTtl: "PT30M"

    passthrough:
      type: SIMPLE
      mode: FORWARD_ALL
      dedupWindowMs: 0
      debounceMs: 0

    # Grouped/composed alarms.
    # Produces one group alarm per groupKeyTemplate and tracks multiple member sources via
    # sourceKeyTemplate.
    # Emits UPDATE events (as action=ALARM + details.groupEvent=UPDATE) so receivers can see
    # member changes.
    compositeDefault:
      type: COMPOSITE
      # If you want the group instance to be notified for *every* member signal
      # (not only transitions), set: mode: FORWARD_ALL
      # Example only - you must ensure these vars exist (via binding extract, parser vars,
      # or detector details -> vars).
      groupKeyTemplate: "${site}|${department}|${function}|${faultDay}"
      sourceKeyTemplate: "${path}"
      emitUpdates: true
      updateThrottleMs: 0
      closeGrace: "PT0S"
      idleTimeout: "PT0S"
      noNotificationTimeout: "PT0S"
      maxLifetime: "PT0S"
      sourceTtl: "PT0S"
      terminateEmitsCancel: true

  # NOTE(review): placed at cep level; the collapsed original did not show
  # whether this key belongs here or inside lifecyclePolicies.compositeDefault -
  # confirm.
  compositeTickIntervalMs: 30000

  # ------------------------------------------------------------
  # Outputs (where the CEP publishes resulting alarms)
  # ------------------------------------------------------------
  outputs:
    mqttMalisEebus:
      type: MQTT
      format: EEBUS_ALARM_MESSAGE
      config:
        brokerRef: "primary"
        topicTemplate: "malis/alarms/${site}/${department}"
        qos: 1
        retained: false

    # Output as a full EEBUS-like XML.
    mqttMalisEebusDatagramXml:
      type: MQTT
      format: EEBUS_ALARM_DATAGRAM_XML
      config:
        brokerRef: "primary"
        topicTemplate: "malis/alarms/xml/${site}/${department}"
        qos: 1
        retained: false
        # These are consumed by EebusAlarmDatagramXmlFormatter
        addressSourceTemplate: "${deviceId}"
        addressDestinationTemplate: "malis-cep"
        # Optional: extra header/cmd fields for "fuller" datagrams
        headerExtras:
          specificationVersion: "3.1.0"
        cmdExtras:
          cmdId: "${faultKey}"
        # NOTE(review): placed at config level; it may instead belong under
        # cmdExtras - confirm.
        cmdClassifier:
          alarm: "write"
          cancel: "delete"

    # Example RabbitMQ output (enable a rabbit broker in env/*.yml first)
    rabbitMalisEebus:
      type: RABBIT
      format: EEBUS_ALARM_MESSAGE
      config:
        brokerRef: "rabbit"
        exchange: "malis.alarms"
        routingKeyTemplate: "alarms.${site}.${department}"

    dlqOut:
      type: MQTT
      format: EEBUS_ALARM_MESSAGE
      config:
        brokerRef: "primary"
        topicTemplate: "malis/dlq"

    outboxMalisEebusDatagramXml:
      type: OUTBOX
      format: EEBUS_ALARM_DATAGRAM_XML
      config:
        correlationVar: requestId
        maxPerRequest: 200
        maxTotal: 5000

    outboxMalisFaultJson:
      type: OUTBOX
      format: MALIS_CEP_ALARM_JSON
      config:
        correlationVar: requestId
        maxPerRequest: 200
        maxTotal: 5000

    # REST/HTTP output example (only used if referenced by a binding):
    # Sends the formatted payload to an HTTP endpoint.
    # - bodyMode: RAW sends the formatter payload bytes directly.
    # - bodyMode: ENVELOPE_JSON sends a JSON wrapper with payloadBase64 + vars.
    restMalisEebus:
      type: REST
      format: EEBUS_ALARM_MESSAGE
      config:
        urlTemplate: "http://CHANGE-ME/api/malis/alarms/${tenant}/${site}"
        method: "POST"
        timeoutMs: 5000
        connectTimeoutMs: 3000
        bodyMode: "ENVELOPE_JSON"
        headers:
          X-Tenant: "${tenant}"
          X-Site: "${site}"
          X-Department: "${department}"
        # Optional authentication (example values are placeholders, not secrets):
        # bearerTokenTemplate: "${restBearerToken}"
        # basicAuth:
        #   usernameTemplate: "${restUser}"
        #   passwordTemplate: "${restPass}"
        maxErrorBodyChars: 2000

    # MalIS Fault REST endpoint payload.
    # This uses a dedicated formatter to produce the exact JSON shape expected
    # by MalIS fault endpoints. The REST publisher then sends it as RAW bytes.
    # NOTE(review): this comment used to name "format: MALIS_FAULT_JSON", but the
    # configured format is MALIS_CEP_ALARM_JSON - confirm which one is intended.
    restMalisFault:
      type: REST
      format: MALIS_CEP_ALARM_JSON  # MALIS_FAULT_JSON
      config:
        urlTemplate: "http://localhost:8080/api/v1/alarms"
        method: "POST"
        timeoutMs: 5000
        connectTimeoutMs: 3000
        bodyMode: "RAW"
        headers:
          X-Tenant: "${tenant}"
          X-Site: "${site}"
          X-Department: "${department}"
        # NOTE(review): bearer token committed in plain text - rotate it and
        # supply it from a profile/secret override instead.
        bearerTokenTemplate: "QRFs5WAd/SkdnrInPhgIioUysSC4XN5tYGgad6im8RBz8xKp2JKlnUhKd7OGoCDA"
        # --- Formatter templates (optional) ---
        # siteIdTemplate: "${site}"
        # departmentTemplate: "${department}"
        # pinNameTemplate: "${pinName}"
        # remarkTemplate: "${remark}"
        # subjectTemplate: "${pinName}"
        priorityTemplate: "High"  # or "${priority}"; if blank, derived from alarm.severity
        faultDayZone: "UTC"
        # Populate valueList by either:
        # - providing variables with prefix valueList.* (e.g. valueList.error_code)
        # - or defining templates here:
        valueList:
          timestamp_device: "${occurredAt}"
          # inverter_id: "${valueList.inverter_id}"
          # error_code: "${valueList.error_code}"
        includeScalarDetailsInValueList: true
        includeEmptyValueList: false
        maxErrorBodyChars: 2000

  deadLetter:
    enabled: true
    outputRef: "dlqOut"