import httpx import json from datetime import datetime from typing import List, Dict, Any # GraphQL Query Templates SENSORS_QUERY = """ query SensorsForMeterNumber($meterNumber: String!) { sensorsForMeterNumber(meterNumber: $meterNumber) { sensorId sensorName sensorNameExtern descr measureConcept { id name descr } } } """ AVAILABLE_VARIABLES_QUERY = """ query AvailableVariableUnits($sensorId: ID!) { availableVariableUnits(sensorId: $sensorId) { variableUnitId variableName unitName } } """ LAST_OBSERVATION_QUERY = """ query LastObservation($sensorId: ID!, $variableName: String) { lastObservation(sensorId: $sensorId, variableName: $variableName) { id moment value meterValue observationVariableUnit { observationVariable { name } unit { name } } } } """ FIND_OBSERVATIONS_QUERY = """ query FindObservations($measurementConceptId: ID!, $sensorName: String, $observationVariableNamePattern: String, $startTime: String, $endTime: String) { findObservation( measurementConceptId: $measurementConceptId sensorName: $sensorName observationVariableNamePattern: $observationVariableNamePattern startTime: $startTime endTime: $endTime ) { id moment value meterValue observationVariableUnit { observationVariable { name } unit { name } } } } """ RECORD_ULTIMO_MUTATION = """ mutation RecordUltimoReadings($input: UltimoReadingsInput!) { recordUltimoReadings(input: $input) { success created { id moment meterValue observationVariableUnit { observationVariable { name } unit { name } } } errors { code message details } } } """ def execute_graphql_query(query: str, variables: Dict[str, Any] = None) -> Dict[str, Any]: """Execute GraphQL query""" response = httpx.post( f"{EXTERNAL_BASE_URL}/graphql", headers=AUTH_HEADERS, json={ "query": query, "variables": variables or {} }, timeout=30.0 ) if response.status_code != 200: raise Exception(f"HTTP {response.status_code}: {response.text}") result = response.json() if "errors" in result: raise Exception(f"GraphQL Error: {result['errors']}") return result["data"] def parse_readings(readings_text: str) -> List[Dict[str, Any]]: """Parse readings from text format YYYY-MM:value""" readings = [] for line in readings_text.strip().split('\n'): line = line.strip() if not line or ':' not in line: continue try: month_str, value_str = line.split(':', 1) month_str = month_str.strip() value = float(value_str.strip()) # Validate month format YYYY-MM datetime.strptime(month_str, '%Y-%m') readings.append({ "month": month_str, "meterValue": value }) except (ValueError, IndexError) as e: raise Exception(f"Ungültiges Format in Zeile '{line}': {e}") if not readings: raise Exception("Keine gültigen Readings gefunden") # Sort by month readings.sort(key=lambda x: x["month"]) return readings def format_observation_table(observations: List[Dict[str, Any]], title: str) -> str: """Format observations as HTML table""" if not observations: return f"
{title}: Keine Daten gefunden
" html = f"| Datum/Zeit | " html += "Zählerstand | " html += "Variable | " html += "Einheit |
|---|---|---|---|
| {moment} | " html += f"{meter_value} | " html += f"{var_name} | " html += f"{unit_name} |
⚠️ Mehrere Sensoren gefunden, verwende ersten: {sensor_name}
" # Get available variables variables_data = execute_graphql_query(AVAILABLE_VARIABLES_QUERY, {"sensorId": sensor_id}) available_variables = variables_data["availableVariableUnits"] # Check if requested variable exists variable_found = any(var["variableName"].strip() == variable_name for var in available_variables) if not variable_found: html_content += f"⚠️ Variable '{variable_name}' nicht verfügbar für diesen Sensor. Verfügbare Variablen:
Verwende stattdessen: {variable_name}
" # Record ultimo readings ultimo_input = { "sensorId": sensor_id, "variableName": variable_name, "readings": readings } ultimo_result = execute_graphql_query(RECORD_ULTIMO_MUTATION, {"input": ultimo_input}) result_data = ultimo_result["recordUltimoReadings"] # Show results if result_data["success"]: html_content += "⚠️ Aktueller Stand konnte nicht abgerufen werden: {e}
" # Get last 10 observations try: # Calculate date range for recent observations (last 2 years) end_time = datetime.now().strftime('%Y%m%d%H%M%S') start_time = datetime.now().replace(year=datetime.now().year - 2).strftime('%Y%m%d%H%M%S') recent_obs_data = execute_graphql_query(FIND_OBSERVATIONS_QUERY, { "measurementConceptId": measure_concept_id, "sensorName": sensor_name, "observationVariableNamePattern": variable_name, "startTime": start_time, "endTime": end_time }) observations = recent_obs_data["findObservation"] or [] # Sort by moment descending and take last 10 observations.sort(key=lambda x: x["moment"], reverse=True) recent_observations = observations[:10] if recent_observations: html_content += format_observation_table(recent_observations, "Letzte 10 Messwerte") else: html_content += "Letzte 10 Messwerte: Keine historischen Daten gefunden
" except Exception as e: html_content += f"⚠️ Historische Daten konnten nicht abgerufen werden: {e}
" # Add summary html_content += "Sensor: {sensor_name} (ID: {sensor_id})
" html_content += f"Variable: {variable_name}
" html_content += f"Eingetragene Stände: {len(readings)}
" if result_data["success"]: html_content += f"Erfolgreich erstellt: {len(result_data['created'])}
" if result_data["errors"]: html_content += f"Fehler: {len(result_data['errors'])}
" html_content += "Fehlermeldung: {str(e)}
" html_content += "Zählernummer: {PARAMS.get('meter_number', 'nicht gesetzt')}
" html_content += f"Variable: {PARAMS.get('variable_name', 'nicht gesetzt')}
" html_content += f"Readings Text:
{PARAMS.get('readings', 'nicht gesetzt')}"
html_content += "