import httpx import json import re from datetime import datetime, timezone from typing import List, Dict, Any # Globals: EXTERNAL_BASE_URL, AUTH_HEADERS, PARAMS def make_graphql_request(query: str, variables: Dict = None) -> Dict: """Sichere GraphQL-Anfrage mit Fehlerbehandlung""" try: response = httpx.post( f"{EXTERNAL_BASE_URL}/graphql", headers=AUTH_HEADERS, json={"query": query, "variables": variables or {}}, timeout=30.0 ) response.raise_for_status() data = response.json() if "errors" in data and data["errors"]: error_msgs = [err.get("message", "Unbekannter Fehler") for err in data["errors"]] return {"success": False, "error": "; ".join(error_msgs)} return {"success": True, "data": data.get("data", {})} except httpx.TimeoutException: return {"success": False, "error": "Anfrage-Timeout nach 30 Sekunden"} except httpx.HTTPStatusError as e: return {"success": False, "error": f"HTTP-Fehler {e.response.status_code}: {e.response.text}"} except Exception as e: return {"success": False, "error": f"Unerwarteter Fehler: {str(e)}"} def search_sensors(query_text: str) -> List[Dict]: """Sucht Sensoren nach Name oder Nummer""" gql_query = """ query SearchSensors { sensors { id name nameExtern description measureConcept { id name description } } } """ response = make_graphql_request(gql_query) if not response["success"]: return [] sensors = response["data"].get("sensors", []) if not query_text: return sensors[:20] # Limit für Performance # Filter nach Name oder Number filtered = [] query_lower = query_text.lower() for sensor in sensors: name = (sensor.get("name") or "").strip().lower() name_extern = (sensor.get("nameExtern") or "").strip().lower() if query_lower in name or query_lower in name_extern: filtered.append(sensor) return filtered[:10] # Top 10 Treffer def get_sensor_variables(sensor_id: str) -> List[Dict]: """Holt verfügbare Variablen für einen Sensor""" gql_query = """ query GetSensorVariables($sensorId: ID!) { availableVariableUnits(sensorId: $sensorId) { variableUnitId variableName unitName } } """ response = make_graphql_request(gql_query, {"sensorId": sensor_id}) if not response["success"]: return [] return response["data"].get("availableVariableUnits", []) def get_last_observations(sensor_id: str, variable_name: str, limit: int = 10) -> List[Dict]: """Holt die letzten N Observations für einen Sensor""" # Da es keine direkte Query gibt, verwenden wir findObservation mit einem weiten Zeitraum gql_query = """ query GetLastObservations($measureConceptId: ID!, $sensorName: String!, $variableName: String!) { findObservation( measurementConceptId: $measureConceptId sensorName: $sensorName observationVariableNamePattern: $variableName startTime: "2020-01-01" endTime: "2030-12-31" ) { id moment meterValue observationVariableUnit { observationVariable { name } unit { name } } } } """ # Wir brauchen den MeasureConcept und SensorName sensor_query = f""" query GetSensor($sensorId: ID!) {{ sensor(id: $sensorId) {{ name measureConcept {{ id }} }} }} """ sensor_response = make_graphql_request(sensor_query, {"sensorId": sensor_id}) if not sensor_response["success"] or not sensor_response["data"].get("sensor"): return [] sensor = sensor_response["data"]["sensor"] sensor_name = (sensor.get("name") or "").strip() mc_id = sensor["measureConcept"]["id"] response = make_graphql_request(gql_query, { "measureConceptId": mc_id, "sensorName": sensor_name, "variableName": variable_name }) if not response["success"]: return [] observations = response["data"].get("findObservation", []) # Sortiere nach Zeitstempel absteigend und limitiere sorted_obs = sorted(observations, key=lambda x: x.get("moment", ""), reverse=True) return sorted_obs[:limit] def parse_ultimo_text(text: str) -> List[Dict]: """Parst Ultimo-Text-Eingabe zu strukturierten Daten""" lines = [line.strip() for line in text.split('\n') if line.strip()] readings = [] for line in lines: # Pattern: DD.MM.YYYY: Wert [Einheit] # Beispiele: 31.12.2025: 60,645 MWh oder 28.2.2026: 68,771 match = re.match(r'(\d{1,2})\.(\d{1,2})\.(\d{4})\s*:?\s*([\d.,]+)', line) if match: day, month, year, value_str = match.groups() # Wert parsen (Komma und Punkt als Dezimaltrennzeichen akzeptieren) value_str = value_str.replace(',', '.') try: value = float(value_str) # Für Ultimo nehmen wir immer Ende des Monats date_str = f"{year}-{month.zfill(2)}" readings.append({ "month": date_str, "meterValue": value }) except ValueError: continue # Ungültige Zahl ignorieren return readings def record_ultimo_readings(sensor_id: str, variable_name: str, variable_unit: str, readings: List[Dict]) -> Dict: """Führt Ultimo-Batch-Eingabe aus""" gql_query = """ mutation RecordUltimoReadings($input: UltimoReadingsInput!) { recordUltimoReadings(input: $input) { success created { id moment meterValue observationVariableUnit { observationVariable { name } unit { name } } } errors { code message details } } } """ input_data = { "sensorId": sensor_id, "variableName": variable_name, "variableUnit": variable_unit, "readings": readings } response = make_graphql_request(gql_query, {"input": input_data}) if not response["success"]: return {"success": False, "error": response["error"]} return response["data"].get("recordUltimoReadings", {}) def record_single_reading(sensor_id: str, moment: str, value: float, variable_name: str, variable_unit: str) -> Dict: """Führt Einzelwert-Eingabe aus""" gql_query = """ mutation RecordMeterReading($input: MeterReadingInput!) { recordMeterReading(input: $input) { success observation { id moment meterValue observationVariableUnit { observationVariable { name } unit { name } } } errors { code message details } } } """ input_data = { "sensorId": sensor_id, "moment": moment, "value": value, "variableName": variable_name, "variableUnit": variable_unit } response = make_graphql_request(gql_query, {"input": input_data}) if not response["success"]: return {"success": False, "error": response["error"]} return response["data"].get("recordMeterReading", {}) def format_datetime(dt_str: str) -> str: """Formatiert Datetime-String für Anzeige""" try: dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00')) return dt.strftime('%d.%m.%Y %H:%M') except: return dt_str def generate_html_response(success: bool, result: Dict, sensor_info: Dict, last_observations: List[Dict]) -> str: """Generiert HTML-Response""" # CSS Styles styles = """ """ # Header html = f""" {styles}

🔢 Ultimo-Zählerstand Eingabe

Gewählter Zähler:

{sensor_info.get('name', 'N/A').strip()}
ID: {sensor_info.get('id', 'N/A')}
MeasureConcept: {sensor_info.get('measureConcept', {}).get('name', 'N/A').strip()}
""" if success: html += '
' html += '

✅ Erfolgreich!

' if "created" in result and result["created"]: created_count = len(result["created"]) html += f'

{created_count} Zählerstände wurden erfolgreich gespeichert:

' html += '' elif "observation" in result and result["observation"]: obs = result["observation"] moment = format_datetime(obs.get("moment", "")) value = obs.get("meterValue", 0) unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip() html += f'

Neuer Zählerstand: {moment}: {value:,.3f} {unit}

' html += '
' # Zeige letzte 10 Werte if last_observations: html += '

📊 Letzte 10 Zählerstände

' html += '' html += '' for obs in last_observations[:10]: moment = format_datetime(obs.get("moment", "")) value = obs.get("meterValue", 0) unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip() variable = obs.get("observationVariableUnit", {}).get("observationVariable", {}).get("name", "").strip() html += f'' html += '
Datum/ZeitZählerstandEinheitVariable
{moment}{value:,.3f}{unit}{variable}
' else: html += '
⚠️ Keine vorherigen Zählerstände gefunden.
' else: html += '
' html += '

❌ Fehler bei der Eingabe

' if "errors" in result and result["errors"]: for error in result["errors"]: code = error.get("code", "UNKNOWN") message = error.get("message", "Unbekannter Fehler") details = error.get("details", "") html += f'
{code}
' html += f'

Fehler: {message}

' if details: html += f'
Details: {details}
' elif "error" in result: html += f'

{result["error"]}

' else: html += '

Unbekannter Fehler aufgetreten.

' html += '
' # Auch bei Fehler die letzten Werte zeigen (falls verfügbar) if last_observations: html += '

📊 Aktuelle Zählerstände (zur Information)

' html += '' html += '' for obs in last_observations[:5]: # Nur 5 bei Fehler moment = format_datetime(obs.get("moment", "")) value = obs.get("meterValue", 0) unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip() variable = obs.get("observationVariableUnit", {}).get("observationVariable", {}).get("name", "").strip() html += f'' html += '
Datum/ZeitZählerstandEinheitVariable
{moment}{value:,.3f}{unit}{variable}
' html += '
' return html # MAIN SCRIPT EXECUTION try: # Parameter validieren sensor_selection = PARAMS.get("sensor_selection", "").strip() variable_unit = PARAMS.get("variable_unit", "ACTIVE_ENERGY_DELIVERED_9|WH") input_method = PARAMS.get("input_method", "batch") if not sensor_selection: result = { "type": "html", "content": "

❌ Fehler

Bitte wählen Sie einen Zähler aus.

" } else: # Variable und Unit splitten try: variable_name, variable_unit_name = variable_unit.split('|') except ValueError: variable_name = "ACTIVE_ENERGY_DELIVERED_9" variable_unit_name = "WH" # Sensor finden sensors = search_sensors(sensor_selection) if not sensors: result = { "type": "html", "content": f"

❌ Zähler nicht gefunden

Kein Zähler gefunden für: {sensor_selection}

" } else: # Ersten Treffer verwenden (oder über ID matchen wenn vollständig) selected_sensor = sensors[0] sensor_id = selected_sensor["id"] if input_method == "batch": # Batch-Eingabe ultimo_text = PARAMS.get("ultimo_readings_text", "").strip() if not ultimo_text: result = { "type": "html", "content": "

❌ Fehler

Bitte geben Sie Ultimo-Stände ein.

" } else: readings = parse_ultimo_text(ultimo_text) if not readings: result = { "type": "html", "content": "

❌ Parsing-Fehler

Keine gültigen Zählerstände im Text gefunden.
Erwartetes Format: DD.MM.YYYY: Wert

" } else: # Ultimo-Batch ausführen batch_result = record_ultimo_readings(sensor_id, variable_name, variable_unit_name, readings) # Aktuelle Observations holen last_obs = get_last_observations(sensor_id, variable_name) # HTML generieren html_content = generate_html_response( batch_result.get("success", False), batch_result, selected_sensor, last_obs ) result = {"type": "html", "content": html_content} else: # Einzelwert-Eingabe single_date = PARAMS.get("single_date", "") single_value = PARAMS.get("single_value", "") if not single_date or not single_value: result = { "type": "html", "content": "

❌ Fehler

Bitte geben Sie Datum und Zählerstand ein.

" } else: try: # Datum parsen if 'T' not in single_date: single_date += 'T00:00:00' moment_dt = datetime.fromisoformat(single_date) if moment_dt.tzinfo is None: moment_dt = moment_dt.replace(tzinfo=timezone.utc) moment_iso = moment_dt.isoformat() # Wert parsen value_str = single_value.replace(',', '.') value = float(value_str) # Einzelwert-Eingabe ausführen single_result = record_single_reading(sensor_id, moment_iso, value, variable_name, variable_unit_name) # Aktuelle Observations holen last_obs = get_last_observations(sensor_id, variable_name) # HTML generieren html_content = generate_html_response( single_result.get("success", False), single_result, selected_sensor, last_obs ) result = {"type": "html", "content": html_content} except ValueError as e: result = { "type": "html", "content": f"

❌ Eingabe-Fehler

Ungültiges Datum oder Zählerstand: {str(e)}

" } except Exception as e: result = { "type": "html", "content": f"

❌ Unerwarteter Fehler

{str(e)}

" } except Exception as e: result = { "type": "html", "content": f"

❌ Script-Fehler

Unerwarteter Fehler beim Ausführen des Scripts: {str(e)}

" }