From d855a5da5894fb48e2f128075e9b17419ed809d5 Mon Sep 17 00:00:00 2001 From: "martin.schweitzer" Date: Mon, 13 Apr 2026 14:09:53 +0000 Subject: [PATCH] deploy script: ultimo_zaehlerstand_eingabe --- scripts/ultimo_zaehlerstand_eingabe.py | 576 +++++++++++++++++++++++++ 1 file changed, 576 insertions(+) create mode 100644 scripts/ultimo_zaehlerstand_eingabe.py diff --git a/scripts/ultimo_zaehlerstand_eingabe.py b/scripts/ultimo_zaehlerstand_eingabe.py new file mode 100644 index 0000000..3cc8356 --- /dev/null +++ b/scripts/ultimo_zaehlerstand_eingabe.py @@ -0,0 +1,576 @@ +import httpx +import json +import re +from datetime import datetime, timezone +from typing import List, Dict, Any + +# Globals: EXTERNAL_BASE_URL, AUTH_HEADERS, PARAMS + +def make_graphql_request(query: str, variables: Dict = None) -> Dict: + """Sichere GraphQL-Anfrage mit Fehlerbehandlung""" + try: + response = httpx.post( + f"{EXTERNAL_BASE_URL}/graphql", + headers=AUTH_HEADERS, + json={"query": query, "variables": variables or {}}, + timeout=30.0 + ) + response.raise_for_status() + data = response.json() + + if "errors" in data and data["errors"]: + error_msgs = [err.get("message", "Unbekannter Fehler") for err in data["errors"]] + return {"success": False, "error": "; ".join(error_msgs)} + + return {"success": True, "data": data.get("data", {})} + except httpx.TimeoutException: + return {"success": False, "error": "Anfrage-Timeout nach 30 Sekunden"} + except httpx.HTTPStatusError as e: + return {"success": False, "error": f"HTTP-Fehler {e.response.status_code}: {e.response.text}"} + except Exception as e: + return {"success": False, "error": f"Unerwarteter Fehler: {str(e)}"} + +def search_sensors(query_text: str) -> List[Dict]: + """Sucht Sensoren nach Name oder Nummer""" + gql_query = """ + query SearchSensors { + sensors { + id + name + nameExtern + description + measureConcept { + id + name + description + } + } + } + """ + + response = make_graphql_request(gql_query) + if not response["success"]: + return [] + + sensors = response["data"].get("sensors", []) + if not query_text: + return sensors[:20] # Limit für Performance + + # Filter nach Name oder Number + filtered = [] + query_lower = query_text.lower() + for sensor in sensors: + name = (sensor.get("name") or "").strip().lower() + name_extern = (sensor.get("nameExtern") or "").strip().lower() + if query_lower in name or query_lower in name_extern: + filtered.append(sensor) + + return filtered[:10] # Top 10 Treffer + +def get_sensor_variables(sensor_id: str) -> List[Dict]: + """Holt verfügbare Variablen für einen Sensor""" + gql_query = """ + query GetSensorVariables($sensorId: ID!) { + availableVariableUnits(sensorId: $sensorId) { + variableUnitId + variableName + unitName + } + } + """ + + response = make_graphql_request(gql_query, {"sensorId": sensor_id}) + if not response["success"]: + return [] + + return response["data"].get("availableVariableUnits", []) + +def get_last_observations(sensor_id: str, variable_name: str, limit: int = 10) -> List[Dict]: + """Holt die letzten N Observations für einen Sensor""" + # Da es keine direkte Query gibt, verwenden wir findObservation mit einem weiten Zeitraum + gql_query = """ + query GetLastObservations($measureConceptId: ID!, $sensorName: String!, $variableName: String!) { + findObservation( + measurementConceptId: $measureConceptId + sensorName: $sensorName + observationVariableNamePattern: $variableName + startTime: "2020-01-01" + endTime: "2030-12-31" + ) { + id + moment + meterValue + observationVariableUnit { + observationVariable { + name + } + unit { + name + } + } + } + } + """ + + # Wir brauchen den MeasureConcept und SensorName + sensor_query = f""" + query GetSensor($sensorId: ID!) {{ + sensor(id: $sensorId) {{ + name + measureConcept {{ + id + }} + }} + }} + """ + + sensor_response = make_graphql_request(sensor_query, {"sensorId": sensor_id}) + if not sensor_response["success"] or not sensor_response["data"].get("sensor"): + return [] + + sensor = sensor_response["data"]["sensor"] + sensor_name = (sensor.get("name") or "").strip() + mc_id = sensor["measureConcept"]["id"] + + response = make_graphql_request(gql_query, { + "measureConceptId": mc_id, + "sensorName": sensor_name, + "variableName": variable_name + }) + + if not response["success"]: + return [] + + observations = response["data"].get("findObservation", []) + # Sortiere nach Zeitstempel absteigend und limitiere + sorted_obs = sorted(observations, key=lambda x: x.get("moment", ""), reverse=True) + return sorted_obs[:limit] + +def parse_ultimo_text(text: str) -> List[Dict]: + """Parst Ultimo-Text-Eingabe zu strukturierten Daten""" + lines = [line.strip() for line in text.split('\n') if line.strip()] + readings = [] + + for line in lines: + # Pattern: DD.MM.YYYY: Wert [Einheit] + # Beispiele: 31.12.2025: 60,645 MWh oder 28.2.2026: 68,771 + match = re.match(r'(\d{1,2})\.(\d{1,2})\.(\d{4})\s*:?\s*([\d.,]+)', line) + if match: + day, month, year, value_str = match.groups() + + # Wert parsen (Komma und Punkt als Dezimaltrennzeichen akzeptieren) + value_str = value_str.replace(',', '.') + try: + value = float(value_str) + # Für Ultimo nehmen wir immer Ende des Monats + date_str = f"{year}-{month.zfill(2)}" + readings.append({ + "month": date_str, + "meterValue": value + }) + except ValueError: + continue # Ungültige Zahl ignorieren + + return readings + +def record_ultimo_readings(sensor_id: str, variable_name: str, variable_unit: str, readings: List[Dict]) -> Dict: + """Führt Ultimo-Batch-Eingabe aus""" + gql_query = """ + mutation RecordUltimoReadings($input: UltimoReadingsInput!) { + recordUltimoReadings(input: $input) { + success + created { + id + moment + meterValue + observationVariableUnit { + observationVariable { + name + } + unit { + name + } + } + } + errors { + code + message + details + } + } + } + """ + + input_data = { + "sensorId": sensor_id, + "variableName": variable_name, + "variableUnit": variable_unit, + "readings": readings + } + + response = make_graphql_request(gql_query, {"input": input_data}) + if not response["success"]: + return {"success": False, "error": response["error"]} + + return response["data"].get("recordUltimoReadings", {}) + +def record_single_reading(sensor_id: str, moment: str, value: float, variable_name: str, variable_unit: str) -> Dict: + """Führt Einzelwert-Eingabe aus""" + gql_query = """ + mutation RecordMeterReading($input: MeterReadingInput!) { + recordMeterReading(input: $input) { + success + observation { + id + moment + meterValue + observationVariableUnit { + observationVariable { + name + } + unit { + name + } + } + } + errors { + code + message + details + } + } + } + """ + + input_data = { + "sensorId": sensor_id, + "moment": moment, + "value": value, + "variableName": variable_name, + "variableUnit": variable_unit + } + + response = make_graphql_request(gql_query, {"input": input_data}) + if not response["success"]: + return {"success": False, "error": response["error"]} + + return response["data"].get("recordMeterReading", {}) + +def format_datetime(dt_str: str) -> str: + """Formatiert Datetime-String für Anzeige""" + try: + dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00')) + return dt.strftime('%d.%m.%Y %H:%M') + except: + return dt_str + +def generate_html_response(success: bool, result: Dict, sensor_info: Dict, last_observations: List[Dict]) -> str: + """Generiert HTML-Response""" + + # CSS Styles + styles = """ + + """ + + # Header + html = f""" + {styles} +
+
+

🔢 Ultimo-Zählerstand Eingabe

+
+

Gewählter Zähler:

+ {sensor_info.get('name', 'N/A').strip()}
+ ID: {sensor_info.get('id', 'N/A')}
+ MeasureConcept: {sensor_info.get('measureConcept', {}).get('name', 'N/A').strip()} +
+
+ """ + + if success: + html += '
' + html += '

✅ Erfolgreich!

' + + if "created" in result and result["created"]: + created_count = len(result["created"]) + html += f'

{created_count} Zählerstände wurden erfolgreich gespeichert:

' + html += '
    ' + for obs in result["created"]: + moment = format_datetime(obs.get("moment", "")) + value = obs.get("meterValue", 0) + unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip() + html += f'
  • {moment}: {value:,.3f} {unit}
  • ' + html += '
' + elif "observation" in result and result["observation"]: + obs = result["observation"] + moment = format_datetime(obs.get("moment", "")) + value = obs.get("meterValue", 0) + unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip() + html += f'

Neuer Zählerstand: {moment}: {value:,.3f} {unit}

' + + html += '
' + + # Zeige letzte 10 Werte + if last_observations: + html += '

📊 Letzte 10 Zählerstände

' + html += '' + html += '' + + for obs in last_observations[:10]: + moment = format_datetime(obs.get("moment", "")) + value = obs.get("meterValue", 0) + unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip() + variable = obs.get("observationVariableUnit", {}).get("observationVariable", {}).get("name", "").strip() + html += f'' + + html += '
Datum/ZeitZählerstandEinheitVariable
{moment}{value:,.3f}{unit}{variable}
' + else: + html += '
⚠️ Keine vorherigen Zählerstände gefunden.
' + + else: + html += '
' + html += '

❌ Fehler bei der Eingabe

' + + if "errors" in result and result["errors"]: + for error in result["errors"]: + code = error.get("code", "UNKNOWN") + message = error.get("message", "Unbekannter Fehler") + details = error.get("details", "") + + html += f'
{code}
' + html += f'

Fehler: {message}

' + if details: + html += f'
Details: {details}
' + elif "error" in result: + html += f'

{result["error"]}

' + else: + html += '

Unbekannter Fehler aufgetreten.

' + + html += '
' + + # Auch bei Fehler die letzten Werte zeigen (falls verfügbar) + if last_observations: + html += '

📊 Aktuelle Zählerstände (zur Information)

' + html += '' + html += '' + + for obs in last_observations[:5]: # Nur 5 bei Fehler + moment = format_datetime(obs.get("moment", "")) + value = obs.get("meterValue", 0) + unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip() + variable = obs.get("observationVariableUnit", {}).get("observationVariable", {}).get("name", "").strip() + html += f'' + + html += '
Datum/ZeitZählerstandEinheitVariable
{moment}{value:,.3f}{unit}{variable}
' + + html += '
' + return html + +# MAIN SCRIPT EXECUTION +try: + # Parameter validieren + sensor_selection = PARAMS.get("sensor_selection", "").strip() + variable_unit = PARAMS.get("variable_unit", "ACTIVE_ENERGY_DELIVERED_9|WH") + input_method = PARAMS.get("input_method", "batch") + + if not sensor_selection: + result = { + "type": "html", + "content": "

❌ Fehler

Bitte wählen Sie einen Zähler aus.

" + } + else: + # Variable und Unit splitten + try: + variable_name, variable_unit_name = variable_unit.split('|') + except ValueError: + variable_name = "ACTIVE_ENERGY_DELIVERED_9" + variable_unit_name = "WH" + + # Sensor finden + sensors = search_sensors(sensor_selection) + if not sensors: + result = { + "type": "html", + "content": f"

❌ Zähler nicht gefunden

Kein Zähler gefunden für: {sensor_selection}

" + } + else: + # Ersten Treffer verwenden (oder über ID matchen wenn vollständig) + selected_sensor = sensors[0] + sensor_id = selected_sensor["id"] + + if input_method == "batch": + # Batch-Eingabe + ultimo_text = PARAMS.get("ultimo_readings_text", "").strip() + if not ultimo_text: + result = { + "type": "html", + "content": "

❌ Fehler

Bitte geben Sie Ultimo-Stände ein.

" + } + else: + readings = parse_ultimo_text(ultimo_text) + if not readings: + result = { + "type": "html", + "content": "

❌ Parsing-Fehler

Keine gültigen Zählerstände im Text gefunden.
Erwartetes Format: DD.MM.YYYY: Wert

" + } + else: + # Ultimo-Batch ausführen + batch_result = record_ultimo_readings(sensor_id, variable_name, variable_unit_name, readings) + + # Aktuelle Observations holen + last_obs = get_last_observations(sensor_id, variable_name) + + # HTML generieren + html_content = generate_html_response( + batch_result.get("success", False), + batch_result, + selected_sensor, + last_obs + ) + + result = {"type": "html", "content": html_content} + + else: + # Einzelwert-Eingabe + single_date = PARAMS.get("single_date", "") + single_value = PARAMS.get("single_value", "") + + if not single_date or not single_value: + result = { + "type": "html", + "content": "

❌ Fehler

Bitte geben Sie Datum und Zählerstand ein.

" + } + else: + try: + # Datum parsen + if 'T' not in single_date: + single_date += 'T00:00:00' + moment_dt = datetime.fromisoformat(single_date) + if moment_dt.tzinfo is None: + moment_dt = moment_dt.replace(tzinfo=timezone.utc) + moment_iso = moment_dt.isoformat() + + # Wert parsen + value_str = single_value.replace(',', '.') + value = float(value_str) + + # Einzelwert-Eingabe ausführen + single_result = record_single_reading(sensor_id, moment_iso, value, variable_name, variable_unit_name) + + # Aktuelle Observations holen + last_obs = get_last_observations(sensor_id, variable_name) + + # HTML generieren + html_content = generate_html_response( + single_result.get("success", False), + single_result, + selected_sensor, + last_obs + ) + + result = {"type": "html", "content": html_content} + + except ValueError as e: + result = { + "type": "html", + "content": f"

❌ Eingabe-Fehler

Ungültiges Datum oder Zählerstand: {str(e)}

" + } + except Exception as e: + result = { + "type": "html", + "content": f"

❌ Unerwarteter Fehler

{str(e)}

" + } + +except Exception as e: + result = { + "type": "html", + "content": f"

❌ Script-Fehler

Unerwarteter Fehler beim Ausführen des Scripts: {str(e)}

" + }