You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

204 lines
6.3 KiB
JavaScript

/**
 * JS Processor: M2M pin update + ERPNext lookup + bad-state alarm detection.
 *
 * Combines the behavior of:
 * - M2mPinUpdateJsonErpnextParser.java (JSON parse + ERPNext enrich vars)
 * - PinBadStateDetector.java (value == bad -> ALARM else CANCEL)
 *
 * Host objects provided by Java:
 * - erp.lookup(port, pin, isoTimestamp) -> { siteId, department, pinName, description, remark, bad, iotDevice, ... }
 * - tpl.expand(template, vars) -> expands ${var} placeholders
 *
 * Entry function must be named 'process' (or configured via entryFunction).
 *
 * @param {object} input - { inType, path, headers, payloadText, payloadBase64, vars };
 *   only payloadText and vars are read here.
 * @param {object} [cfg] - processor config map; faultKeyTemplate and severity are read.
 * @returns {{drop: boolean}|{vars: object, events: object[]}} `{ drop: true }` when the
 *   payload carries no usable numeric value; otherwise the variable delta plus a single
 *   ALARM/CANCEL event.
 * @throws {Error} when the payload is not a JSON object, or when no usable numeric
 *   pinBad is available from the ERPNext lookup or the incoming vars.
 */
function process(input, cfg) {
  // Number("") and Number("  ") are 0, so blank strings must be treated as
  // "absent" before numeric coercion; otherwise a blank value reads as pin state 0.
  const toNumberOrNull = (raw) => {
    if (raw === undefined || raw === null) return null;
    if (typeof raw === 'string' && raw.trim() === '') return null;
    return Number(raw);
  };

  const varsIn = input && input.vars ? input.vars : {};
  const varsOut = {}; // delta only

  let msg;
  try {
    msg = JSON.parse((input && input.payloadText) || "{}");
  } catch (e) {
    // invalid JSON -> drop to DLQ by throwing
    throw new Error("Invalid JSON payload: " + e);
  }
  // "null", numbers, strings and arrays are valid JSON but not a pin-update message.
  if (msg === null || typeof msg !== 'object' || Array.isArray(msg)) {
    throw new Error("Invalid JSON payload: expected a JSON object");
  }

  // Fields (match Java parser)
  const gpio = str(msg.gpio);
  const channel = str(msg.channel);
  const value = toNumberOrNull(msg.value);
  const hasValue = value !== null && !Number.isNaN(value);
  const reason = str(msg.reason);
  const m2mport = str(msg.m2mport);

  // Timestamp: payload.timestamp -> mqtt.sentAt -> now
  const ts = normalizeTs(str(msg.timestamp) || (msg.mqtt && str(msg.mqtt.sentAt)) || new Date().toISOString());

  // Resolve port/pin from topic vars.
  // In the topic we have deviceId in place of port, so add it to variables explicitly.
  const port = str(varsIn.deviceId);
  put(varsOut, 'port', port);
  const pin = toNumberOrNull(varsIn.pin);

  // ERPNext enrich (only when port and pin are usable and the host object exists)
  let erpCtx = null;
  if (port && pin !== null && !Number.isNaN(pin) && typeof erp !== 'undefined' && erp) {
    erpCtx = erp.lookup(port, pin, ts);
  }
  if (erpCtx) {
    // Align var names with Java parser
    put(varsOut, 'iotDevice', erpCtx.iotDevice);
    put(varsOut, 'deviceId', erpCtx.deviceId || erpCtx.iotDevice);
    put(varsOut, 'site', erpCtx.siteId);
    put(varsOut, 'remark', erpCtx.remark);
    put(varsOut, 'department', erpCtx.department);
    put(varsOut, 'pinName', erpCtx.pinName);
    put(varsOut, 'pinDescription', erpCtx.description);
    if (erpCtx.bad !== undefined && erpCtx.bad !== null) {
      put(varsOut, 'pinBad', erpCtx.bad);
    }
    // Optional convenience vars for grouping
    put(varsOut, 'function', erpCtx.pinName);
  }

  // Always expose payload hint vars (match Java parser)
  if (hasValue) put(varsOut, 'pinValue', String(value));
  if (reason) put(varsOut, 'pinReason', reason);
  if (gpio) put(varsOut, 'gpio', gpio);
  if (channel) put(varsOut, 'channel', channel);
  if (m2mport) put(varsOut, 'm2mport', m2mport);
  put(varsOut, 'timestamp', ts);
  // faultDay for composite grouping: first 10 characters of the timestamp (YYYY-MM-DD for ISO input)
  if (ts && ts.length >= 10) put(varsOut, 'faultDay', ts.substring(0, 10));

  // Need pinBad to decide alarm; if missing, throw (consistent with Java detector).
  // This check intentionally runs before the "no value" drop below.
  const pinBadStr = str(first(varsOut.pinBad, varsIn.pinBad));
  if (!pinBadStr) {
    throw new Error("Missing vars.pinBad - ERPNext enrichment did not provide bad state for port=" + port + " pin=" + pin);
  }
  const pinBad = Number(pinBadStr);
  if (Number.isNaN(pinBad)) {
    throw new Error("Invalid vars.pinBad: " + pinBadStr);
  }

  // no value -> no event
  if (!hasValue) {
    return { drop: true };
  }
  // Decide event type
  const action = (value === pinBad) ? 'ALARM' : 'CANCEL';

  // Build a merged vars map for templates (delta overrides incoming vars)
  const mergedVars = Object.assign({}, varsIn, varsOut);

  // faultKey from template
  const faultKeyTemplate = (cfg && cfg.faultKeyTemplate) ? String(cfg.faultKeyTemplate) : "${tenant}:${site}:${iotDevice}:PIN:${pin}";
  let faultKey;
  if (typeof tpl !== 'undefined' && tpl && tpl.expand) {
    faultKey = tpl.expand(faultKeyTemplate, mergedVars);
  } else {
    faultKey = fallbackExpand(faultKeyTemplate, mergedVars);
  }
  if (!str(faultKey)) {
    // str() keeps a numeric pin 0 as "0" instead of falling through to 'unknown'.
    faultKey = "PIN:" + (str(mergedVars.pin) || 'unknown');
  }

  const severity = (cfg && cfg.severity) ? String(cfg.severity) : 'MAJOR';

  // Build details (match Java detector)
  const details = {
    gpio: gpio,
    channel: channel,
    value: value,
    bad: pinBad,
    reason: reason,
    m2mport: m2mport,
    mqtt: (msg.mqtt && typeof msg.mqtt === 'object') ? msg.mqtt : undefined,
    pinName: mergedVars.pinName,
    pinDescription: mergedVars.pinDescription,
    remark: mergedVars.remark
  };
  // Remove undefined, null and empty-string properties
  Object.keys(details).forEach((key) => {
    const entry = details[key];
    if (entry === undefined || entry === null || entry === "") delete details[key];
  });

  return {
    vars: varsOut,
    events: [
      {
        action: action,
        faultKey: faultKey,
        severity: severity,
        occurredAt: ts,
        details: details
      }
    ]
  };
}
/**
 * Converts a value to a trimmed string.
 *
 * @param {*} v - any value
 * @returns {string|null} the trimmed string form of `v`, or null when `v` is
 *   null/undefined or its string form is empty after trimming
 */
function str(v) {
  // `== null` matches exactly null and undefined
  if (v == null) {
    return null;
  }
  const text = String(v).trim();
  return text === '' ? null : text;
}
/**
 * Stores the trimmed string form of `v` under `obj[k]`.
 * Nothing is written when `str(v)` yields no text (null/undefined/blank input).
 *
 * @param {object} obj - target object, mutated in place
 * @param {string} k - property name
 * @param {*} v - value to stringify and store
 */
function put(obj, k, v) {
  const text = str(v);
  if (!text) {
    return;
  }
  obj[k] = text;
}
/**
 * Picks `a` unless it is null, undefined, or blank when stringified; then picks `b`.
 *
 * @param {*} a - preferred value
 * @param {*} b - fallback value
 * @returns {*} `a` or `b`, returned as-is (no conversion)
 */
function first(a, b) {
  if (a === undefined || a === null) {
    return b;
  }
  const isBlank = String(a).trim() === '';
  return isBlank ? b : a;
}
/**
 * Best-effort timestamp helper.
 *
 * The input is only trimmed via str(); it is not parsed or validated, so a
 * non-blank value is returned as given (minus surrounding whitespace).
 *
 * @param {*} s - candidate timestamp
 * @returns {string} the trimmed input, or the current time as an ISO-8601
 *   string when the input is null/undefined/blank
 */
function normalizeTs(s) {
  const trimmed = str(s);
  // Anything non-blank is assumed to be ISO-ish already and is kept as-is.
  return trimmed ? trimmed : new Date().toISOString();
}
/**
 * Minimal ${key} placeholder expansion, used when no host template expander is available.
 *
 * Only own properties of `vars` are substituted, so placeholders such as
 * ${constructor} or ${toString} expand to an empty string rather than to
 * inherited Object.prototype members.
 *
 * @param {*} tplStr - template; converted with String() before expansion
 * @param {object|null|undefined} vars - placeholder values; null/undefined is treated as empty
 * @returns {string} the template with each ${key} replaced by String(vars[key]),
 *   or by '' when the key is absent or its value is null/undefined
 */
function fallbackExpand(tplStr, vars) {
  const source = (vars === undefined || vars === null) ? {} : vars;
  return String(tplStr).replace(/\$\{([^}]+)\}/g, (_match, key) => {
    // Own-property check keeps prototype members out of the expansion.
    if (!Object.prototype.hasOwnProperty.call(source, key)) {
      return '';
    }
    const v = source[key];
    return (v === undefined || v === null) ? '' : String(v);
  });
}
/**
 * Renders a value as a string for diagnostics, trying progressively cruder strategies.
 *
 * 1. JSON.stringify(obj), when it yields a string.
 * 2. For objects exposing an entrySet() function (Java Map-like), the entries
 *    are copied into a plain object and serialized as JSON.
 * 3. String(obj), or "[unprintable]" if even that throws.
 *
 * @param {*} obj - value to render
 * @returns {string} always a string; "null" for null/undefined
 */
function dump(obj) {
  if (obj == null) return "null";

  // If it's a JS object / ProxyObject.
  // JSON.stringify returns undefined (without throwing) for functions and
  // symbols, so only a string result counts as success.
  try {
    const json = JSON.stringify(obj);
    if (typeof json === "string") return json;
  } catch (e) { /* not serializable (e.g. cyclic); try the next strategy */ }

  // If it's a Java Map-like (has entrySet)
  try {
    if (typeof obj.entrySet === "function") {
      const it = obj.entrySet().iterator();
      const out = {};
      while (it.hasNext()) {
        const entry = it.next();
        out[String(entry.getKey())] = entry.getValue();
      }
      const json = JSON.stringify(out);
      if (typeof json === "string") return json;
    }
  } catch (e) { /* entries not readable or not serializable; fall back */ }

  // Fallback: best-effort string
  try { return String(obj); } catch (e) { return "[unprintable]"; }
}