import httpx import json from datetime import datetime from typing import List, Dict, Any # GraphQL Query Templates SENSORS_QUERY = """ query SensorsForMeterNumber($meterNumber: String!) { sensorsForMeterNumber(meterNumber: $meterNumber) { sensorId sensorName sensorNameExtern descr measureConcept { id name descr } } } """ AVAILABLE_VARIABLES_QUERY = """ query AvailableVariableUnits($sensorId: ID!) { availableVariableUnits(sensorId: $sensorId) { variableUnitId variableName unitName } } """ LAST_OBSERVATION_QUERY = """ query LastObservation($sensorId: ID!, $variableName: String) { lastObservation(sensorId: $sensorId, variableName: $variableName) { id moment value meterValue observationVariableUnit { observationVariable { name } unit { name } } } } """ FIND_OBSERVATIONS_QUERY = """ query FindObservations($measurementConceptId: ID!, $sensorName: String, $observationVariableNamePattern: String, $startTime: String, $endTime: String) { findObservation( measurementConceptId: $measurementConceptId sensorName: $sensorName observationVariableNamePattern: $observationVariableNamePattern startTime: $startTime endTime: $endTime ) { id moment value meterValue observationVariableUnit { observationVariable { name } unit { name } } } } """ RECORD_ULTIMO_MUTATION = """ mutation RecordUltimoReadings($input: UltimoReadingsInput!) { recordUltimoReadings(input: $input) { success created { id moment meterValue observationVariableUnit { observationVariable { name } unit { name } } } errors { code message details } } } """ def execute_graphql_query(query: str, variables: Dict[str, Any] = None) -> Dict[str, Any]: """Execute GraphQL query""" response = httpx.post( f"{EXTERNAL_BASE_URL}/graphql", headers=AUTH_HEADERS, json={ "query": query, "variables": variables or {} }, timeout=30.0 ) if response.status_code != 200: raise Exception(f"HTTP {response.status_code}: {response.text}") result = response.json() if "errors" in result: raise Exception(f"GraphQL Error: {result['errors']}") return result["data"] def parse_readings(readings_text: str) -> List[Dict[str, Any]]: """Parse readings from text format YYYY-MM:value""" readings = [] for line in readings_text.strip().split('\n'): line = line.strip() if not line or ':' not in line: continue try: month_str, value_str = line.split(':', 1) month_str = month_str.strip() value = float(value_str.strip()) # Validate month format YYYY-MM datetime.strptime(month_str, '%Y-%m') readings.append({ "month": month_str, "meterValue": value }) except (ValueError, IndexError) as e: raise Exception(f"Ungültiges Format in Zeile '{line}': {e}") if not readings: raise Exception("Keine gültigen Readings gefunden") # Sort by month readings.sort(key=lambda x: x["month"]) return readings def format_observation_table(observations: List[Dict[str, Any]], title: str) -> str: """Format observations as HTML table""" if not observations: return f"

{title}: Keine Daten gefunden

" html = f"

{title}

" html += "" html += "" html += "" html += "" html += "" html += "" html += "" for obs in observations: moment = datetime.fromisoformat(obs["moment"].replace('Z', '+00:00')).strftime('%d.%m.%Y %H:%M') meter_value = f"{obs['meterValue']:,.2f}" var_name = obs["observationVariableUnit"]["observationVariable"]["name"].strip() unit_name = obs["observationVariableUnit"]["unit"]["name"].strip() html += f"" html += f"" html += f"" html += f"" html += "
Datum/ZeitZählerstandVariableEinheit
{moment}{meter_value}{var_name}{unit_name}
" return html def format_errors(errors: List[Dict[str, Any]]) -> str: """Format error messages as HTML""" if not errors: return "" html = "
" html += "

❌ Fehler aufgetreten:

" return html try: # Parse parameters meter_number = PARAMS.get('meter_number', '').strip() variable_name = PARAMS.get('variable_name', 'ENERGY_INST_VAL').strip() readings_text = PARAMS.get('readings', '').strip() if not meter_number: raise Exception("Zählernummer ist erforderlich") if not readings_text: raise Exception("Ultimo-Stände sind erforderlich") # Parse readings readings = parse_readings(readings_text) # Find sensors for meter number sensors_data = execute_graphql_query(SENSORS_QUERY, {"meterNumber": meter_number}) sensors = sensors_data["sensorsForMeterNumber"] if not sensors: raise Exception(f"Keine Sensoren für Zählernummer '{meter_number}' gefunden") # Use first sensor if multiple found sensor = sensors[0] sensor_id = sensor["sensorId"] sensor_name = sensor["sensorName"].strip() measure_concept_id = sensor["measureConcept"]["id"] html_content = f"

Ultimo-Zählerstände für {sensor_name}

" if len(sensors) > 1: html_content += f"

⚠️ Mehrere Sensoren gefunden, verwende ersten: {sensor_name}

" # Get available variables variables_data = execute_graphql_query(AVAILABLE_VARIABLES_QUERY, {"sensorId": sensor_id}) available_variables = variables_data["availableVariableUnits"] # Check if requested variable exists variable_found = any(var["variableName"].strip() == variable_name for var in available_variables) if not variable_found: html_content += f"

⚠️ Variable '{variable_name}' nicht verfügbar für diesen Sensor. Verfügbare Variablen:

" # Use first available variable if available_variables: variable_name = available_variables[0]["variableName"].strip() html_content += f"

Verwende stattdessen: {variable_name}

" # Record ultimo readings ultimo_input = { "sensorId": sensor_id, "variableName": variable_name, "readings": readings } ultimo_result = execute_graphql_query(RECORD_ULTIMO_MUTATION, {"input": ultimo_input}) result_data = ultimo_result["recordUltimoReadings"] # Show results if result_data["success"]: html_content += "
" html_content += f"

✅ Erfolgreich {len(result_data['created'])} Ultimo-Stände eingetragen

" html_content += "
" # Show created observations if result_data["created"]: html_content += format_observation_table(result_data["created"], "Neu eingetragene Ultimo-Stände") # Show errors if any if result_data["errors"]: html_content += format_errors(result_data["errors"]) # Get current reading (last observation) try: last_obs_data = execute_graphql_query(LAST_OBSERVATION_QUERY, { "sensorId": sensor_id, "variableName": variable_name }) if last_obs_data["lastObservation"]: html_content += format_observation_table([last_obs_data["lastObservation"]], "Aktueller Zählerstand") except Exception as e: html_content += f"

⚠️ Aktueller Stand konnte nicht abgerufen werden: {e}

" # Get last 10 observations try: # Calculate date range for recent observations (last 2 years) end_time = datetime.now().strftime('%Y%m%d%H%M%S') start_time = datetime.now().replace(year=datetime.now().year - 2).strftime('%Y%m%d%H%M%S') recent_obs_data = execute_graphql_query(FIND_OBSERVATIONS_QUERY, { "measurementConceptId": measure_concept_id, "sensorName": sensor_name, "observationVariableNamePattern": variable_name, "startTime": start_time, "endTime": end_time }) observations = recent_obs_data["findObservation"] or [] # Sort by moment descending and take last 10 observations.sort(key=lambda x: x["moment"], reverse=True) recent_observations = observations[:10] if recent_observations: html_content += format_observation_table(recent_observations, "Letzte 10 Messwerte") else: html_content += "

Letzte 10 Messwerte: Keine historischen Daten gefunden

" except Exception as e: html_content += f"

⚠️ Historische Daten konnten nicht abgerufen werden: {e}

" # Add summary html_content += "
" html_content += f"

Zusammenfassung

" html_content += f"

Sensor: {sensor_name} (ID: {sensor_id})

" html_content += f"

Variable: {variable_name}

" html_content += f"

Eingetragene Stände: {len(readings)}

" if result_data["success"]: html_content += f"

Erfolgreich erstellt: {len(result_data['created'])}

" if result_data["errors"]: html_content += f"

Fehler: {len(result_data['errors'])}

" html_content += "
" except Exception as e: html_content = f"
" html_content += f"

❌ Fehler beim Eintragen der Ultimo-Stände

" html_content += f"

Fehlermeldung: {str(e)}

" html_content += "
" # Show debug info html_content += "
" html_content += "

Debug-Informationen

" html_content += f"

Zählernummer: {PARAMS.get('meter_number', 'nicht gesetzt')}

" html_content += f"

Variable: {PARAMS.get('variable_name', 'nicht gesetzt')}

" html_content += f"

Readings Text:

{PARAMS.get('readings', 'nicht gesetzt')}

" html_content += "
" result = { "type": "html", "content": f""" Ultimo-Zählerstände
{html_content}
""" }