diff --git a/scripts/ultimo_meter_readings.py b/scripts/ultimo_meter_readings.py new file mode 100644 index 0000000..c88adbc --- /dev/null +++ b/scripts/ultimo_meter_readings.py @@ -0,0 +1,835 @@ +import json +import httpx +import re +from datetime import datetime +import traceback + +def main(): + try: + # Check if we have enough parameters to proceed + search_term = PARAMS.get('search_term', '').strip() + sensor_id = PARAMS.get('sensor_id', '').strip() + + # Phase 1: If no sensor is selected, show sensor selection + if not sensor_id and not search_term: + return {"type": "error", "message": "Bitte geben Sie entweder einen Suchbegriff oder eine Sensor-ID ein."} + + # Phase 2: If we have search term but no sensor_id, search for sensors + if search_term and not sensor_id: + sensors = search_sensors(search_term) + if not sensors: + return {"type": "error", "message": f"Keine Sensoren gefunden für Suchbegriff '{search_term}'."} + + # Create dynamic form with found sensors + sensor_options = [] + for sensor in sensors: + display_name = f"{sensor['sensorName'].strip()} (ID: {sensor['sensorId']})" + if sensor.get('sensorNameExtern') and sensor['sensorNameExtern'].strip() != '-': + display_name += f" - {sensor['sensorNameExtern'].strip()}" + if sensor['measureConcept']['name']: + display_name += f" [{sensor['measureConcept']['name'].strip()}]" + + sensor_options.append({ + "value": sensor['sensorId'], + "label": display_name + }) + + dynamic_form = { + "title": "Sensor auswählen und Ultimo-Stände eingeben", + "description": f"Gefundene Sensoren für '{search_term}'. Wählen Sie einen Sensor aus und geben Sie die Ultimo-Stände ein.", + "layout": "sections", + "fields": [ + { + "name": "sensor_id", + "widget": "dropdown", + "label": "Sensor auswählen", + "options": sensor_options, + "validators": [{"type": "required"}] + }, + { + "name": "variable_name", + "widget": "text_field", + "label": "Variable Name (optional)", + "hint_text": "Leer lassen für Standard-Variable", + "helper_text": "Z.B. ACTIVE_ENERGY_ABSORBED_10, ENERGY_INST_VAL" + }, + { + "name": "variable_unit", + "widget": "text_field", + "label": "Einheit (optional)", + "hint_text": "Leer lassen für Standard-Einheit", + "helper_text": "Z.B. WH, KWH" + }, + { + "name": "readings_input", + "widget": "text_field", + "label": "Zählerstände", + "hint_text": "Format: YYYY-MM:Wert (pro Zeile) oder YYYY-MM:Wert,YYYY-MM:Wert", + "helper_text": "Beispiel: 2024-01:1000.5 oder 2024-01:1000,2024-02:1100", + "text_field_config": { + "keyboard_type": "multiline", + "max_lines": 10, + "min_lines": 2 + }, + "validators": [ + {"type": "required", "error_text": "Zählerstände sind erforderlich"} + ] + } + ], + "sections": [ + { + "title": "Sensor", + "icon": "sensors", + "field_names": ["sensor_id"] + }, + { + "title": "Variable & Einheit", + "icon": "settings", + "field_names": ["variable_name", "variable_unit"] + }, + { + "title": "Zählerstände eingeben", + "icon": "edit", + "field_names": ["readings_input"] + } + ] + } + + return {"type": "form", "form_definition": dynamic_form} + + # Phase 3: We have sensor_id, process the readings + if not sensor_id: + return {"type": "error", "message": "Sensor-ID ist erforderlich."} + + readings_input = PARAMS.get('readings_input', '').strip() + if not readings_input: + return {"type": "error", "message": "Zählerstände sind erforderlich."} + + # Parse readings input + readings = parse_readings_input(readings_input) + if not readings: + return {"type": "error", "message": "Keine gültigen Zählerstände gefunden. Format: YYYY-MM:Wert"} + + # Get sensor info and available variables + sensor_info = get_sensor_info(sensor_id) + if not sensor_info: + return {"type": "error", "message": f"Sensor mit ID {sensor_id} nicht gefunden."} + + # Get available variables for the sensor + available_variables = get_available_variables(sensor_id) + + # Determine variable and unit to use + variable_name = PARAMS.get('variable_name', '').strip() + variable_unit = PARAMS.get('variable_unit', '').strip() + + if not variable_name and available_variables: + # Use first available variable as default + variable_name = available_variables[0]['variableName'].strip() + variable_unit = available_variables[0]['unitName'].strip() + + # Record the readings + result = record_ultimo_readings(sensor_id, variable_name, variable_unit, readings) + + # Generate HTML report + html_content = generate_html_report(sensor_info, variable_name, variable_unit, readings, result, available_variables) + + return {"type": "html", "content": html_content} + + except Exception as e: + return {"type": "error", "message": f"Fehler: {str(e)}\n\nDetails: {traceback.format_exc()}"} + +def search_sensors(search_term): + """Search for sensors by meter number""" + try: + # First try sensorsForMeterNumber + query = ''' + query SearchSensors($meterNumber: String!) { + sensorsForMeterNumber(meterNumber: $meterNumber) { + sensorId + sensorName + sensorNameExtern + descr + measureConcept { + id + name + descr + } + } + } + ''' + + with httpx.Client() as client: + response = client.post( + f"{EXTERNAL_BASE_URL}/graphql", + headers=AUTH_HEADERS, + json={"query": query, "variables": {"meterNumber": search_term}} + ) + + if response.status_code == 200: + data = response.json() + if 'errors' not in data: + sensors = data.get('data', {}).get('sensorsForMeterNumber', []) + if sensors: + return sensors + + # If no results, try general sensor search + query = ''' + query GetAllSensors { + sensors { + id + name + nameExtern + description + measureConcept { + id + name + description + } + } + } + ''' + + with httpx.Client() as client: + response = client.post( + f"{EXTERNAL_BASE_URL}/graphql", + headers=AUTH_HEADERS, + json={"query": query} + ) + + if response.status_code == 200: + data = response.json() + if 'errors' not in data: + all_sensors = data.get('data', {}).get('sensors', []) + # Filter sensors that match search term + filtered_sensors = [] + for sensor in all_sensors: + if (search_term.lower() in sensor.get('name', '').lower() or + search_term.lower() in sensor.get('nameExtern', '').lower() or + search_term.lower() in sensor.get('description', '').lower()): + + filtered_sensors.append({ + 'sensorId': sensor['id'], + 'sensorName': sensor['name'], + 'sensorNameExtern': sensor.get('nameExtern'), + 'descr': sensor.get('description'), + 'measureConcept': { + 'id': sensor['measureConcept']['id'], + 'name': sensor['measureConcept']['name'], + 'descr': sensor['measureConcept'].get('description') + } + }) + + return filtered_sensors[:20] # Limit to first 20 results + + return [] + + except Exception as e: + print(f"Error searching sensors: {e}") + return [] + +def get_sensor_info(sensor_id): + """Get detailed sensor information""" + try: + query = ''' + query GetSensor($id: ID!) { + sensor(id: $id) { + id + name + nameExtern + description + measureConcept { + id + name + description + } + } + } + ''' + + with httpx.Client() as client: + response = client.post( + f"{EXTERNAL_BASE_URL}/graphql", + headers=AUTH_HEADERS, + json={"query": query, "variables": {"id": sensor_id}} + ) + + if response.status_code == 200: + data = response.json() + if 'errors' not in data: + return data.get('data', {}).get('sensor') + + return None + + except Exception as e: + print(f"Error getting sensor info: {e}") + return None + +def get_available_variables(sensor_id): + """Get available variables and units for a sensor""" + try: + query = ''' + query GetAvailableVariables($sensorId: ID!) { + availableVariableUnits(sensorId: $sensorId) { + variableUnitId + variableName + unitName + } + } + ''' + + with httpx.Client() as client: + response = client.post( + f"{EXTERNAL_BASE_URL}/graphql", + headers=AUTH_HEADERS, + json={"query": query, "variables": {"sensorId": sensor_id}} + ) + + if response.status_code == 200: + data = response.json() + if 'errors' not in data: + return data.get('data', {}).get('availableVariableUnits', []) + + return [] + + except Exception as e: + print(f"Error getting available variables: {e}") + return [] + +def parse_readings_input(readings_input): + """Parse the readings input string into a list of readings""" + readings = [] + + # Support both line-by-line and comma-separated format + lines = readings_input.replace(',', '\n').split('\n') + + for line in lines: + line = line.strip() + if not line: + continue + + # Expected format: YYYY-MM:value + if ':' not in line: + continue + + parts = line.split(':', 1) + if len(parts) != 2: + continue + + month_str = parts[0].strip() + value_str = parts[1].strip() + + # Validate month format YYYY-MM + if not re.match(r'^\d{4}-\d{2}$', month_str): + continue + + try: + value = float(value_str) + readings.append({ + 'month': month_str, + 'meterValue': value + }) + except ValueError: + continue + + # Sort readings by month + readings.sort(key=lambda x: x['month']) + + return readings + +def record_ultimo_readings(sensor_id, variable_name, variable_unit, readings): + """Record ultimo readings using GraphQL mutation""" + try: + mutation = ''' + mutation RecordUltimoReadings($input: UltimoReadingsInput!) { + recordUltimoReadings(input: $input) { + success + errors { + code + message + details + } + created { + id + moment + value + meterValue + } + } + } + ''' + + input_data = { + 'sensorId': sensor_id, + 'readings': readings + } + + if variable_name: + input_data['variableName'] = variable_name + if variable_unit: + input_data['variableUnit'] = variable_unit + + with httpx.Client() as client: + response = client.post( + f"{EXTERNAL_BASE_URL}/graphql", + headers=AUTH_HEADERS, + json={"query": mutation, "variables": {"input": input_data}} + ) + + if response.status_code == 200: + data = response.json() + if 'errors' in data: + return {'success': False, 'errors': [{'message': str(data['errors'])}], 'created': []} + return data.get('data', {}).get('recordUltimoReadings', {'success': False, 'errors': [], 'created': []}) + else: + return {'success': False, 'errors': [{'message': f'HTTP Error {response.status_code}'}], 'created': []} + + except Exception as e: + return {'success': False, 'errors': [{'message': str(e)}], 'created': []} + +def generate_html_report(sensor_info, variable_name, variable_unit, readings, result, available_variables): + """Generate HTML report for the ultimo recording result""" + + sensor_display = sensor_info.get('name', '').strip() if sensor_info else 'Unbekannt' + if sensor_info and sensor_info.get('nameExtern') and sensor_info['nameExtern'].strip() != '-': + sensor_display += f" ({sensor_info['nameExtern'].strip()})" + + measure_concept_name = '' + if sensor_info and sensor_info.get('measureConcept', {}).get('name'): + measure_concept_name = sensor_info['measureConcept']['name'].strip() + + # Status styling + if result['success']: + status_class = 'success' + status_text = 'Erfolgreich' + status_icon = '✅' + else: + status_class = 'error' + status_text = 'Fehler' + status_icon = '❌' + + html_content = f''' + + + + + + Ultimo-Zählerstand Eingabe + + + +
+
+

{status_icon} Ultimo-Zählerstand Eingabe

+
+ +
+
+
{status_icon} Status: {status_text}
+ ''' + + if result['success']: + html_content += f''' +

Ultimo-Zählerstände wurden erfolgreich eingegeben.

+

Anzahl erfolgreich erstellt: {len(result.get('created', []))}

+ ''' + else: + html_content += f''' +

Bei der Eingabe der Ultimo-Zählerstände sind Fehler aufgetreten.

+ ''' + + html_content += ''' +
+ +
+
+

📊 Sensor Information

+ ''' + + if sensor_info: + html_content += f''' +
+ Sensor-ID: + {sensor_info['id']} +
+
+ Name: + {sensor_display} +
+ ''' + if measure_concept_name: + html_content += f''' +
+ Messkonzept: + {measure_concept_name} +
+ ''' + + html_content += f''' +
+ +
+

⚙️ Variable & Einheit

+
+ Variable: + {variable_name if variable_name else 'Standard'} +
+
+ Einheit: + {variable_unit if variable_unit else 'Standard'} +
+
+
+ ''' + + # Show readings table + html_content += ''' +

📋 Eingegeben Zählerstände

+ + + + + + + + + + ''' + + created_ids = [obs['id'] for obs in result.get('created', [])] + + for reading in readings: + month = reading['month'] + value = reading['meterValue'] + + # Check if this reading was successfully created + is_created = any(obs for obs in result.get('created', []) if month in obs.get('moment', '')) + + status_badge = 'Erstellt' if is_created else 'Fehler' + + html_content += f''' + + + + + + ''' + + html_content += ''' + +
MonatZählerstandStatus
{month}{value:,.2f}{status_badge}
+ ''' + + # Show errors if any + if result.get('errors'): + html_content += ''' +

⚠️ Fehler

+
+ ''' + + for error in result['errors']: + html_content += f''' +
+
{error.get('code', 'ERROR')}
+
{error.get('message', 'Unbekannter Fehler')}
+ ''' + if error.get('details'): + html_content += f'
Details: {error["details"]}
' + html_content += '
' + + html_content += ''' +
+ ''' + + # Show created observations details if any + if result.get('created'): + html_content += ''' +

✅ Erfolgreich erstellte Beobachtungen

+ + + + + + + + + + + ''' + + for obs in result['created']: + try: + moment = datetime.fromisoformat(obs['moment'].replace('Z', '+00:00')) + moment_str = moment.strftime('%d.%m.%Y %H:%M') + except: + moment_str = obs['moment'] + + html_content += f''' + + + + + + + ''' + + html_content += ''' + +
IDZeitpunktWertZählerstand
{obs['id']}{moment_str}{obs.get('value', 0):,.2f}{obs.get('meterValue', 0):,.2f}
+ ''' + + # Summary statistics + total_readings = len(readings) + successful_readings = len(result.get('created', [])) + failed_readings = total_readings - successful_readings + + html_content += f''' +
+
+
{total_readings}
+
Gesamt eingegeben
+
+
+
{successful_readings}
+
Erfolgreich
+
+
+
{failed_readings}
+
Fehlgeschlagen
+
+
+ ''' + + # Show available variables + if available_variables: + html_content += ''' +

🔧 Verfügbare Variablen für diesen Sensor

+
+ ''' + + for var in available_variables[:20]: # Limit to first 20 + var_name = var['variableName'].strip() + unit_name = var['unitName'].strip() + html_content += f''' +
{var_name} ({unit_name})
+ ''' + + if len(available_variables) > 20: + html_content += f'
... und {len(available_variables) - 20} weitere
' + + html_content += ''' +
+ ''' + + html_content += ''' +
+
+ + + ''' + + return html_content + +# Execute main function +result = main()