From 08de67dc21ac8eaace22c7c89b48fc19e76fefcd Mon Sep 17 00:00:00 2001 From: "martin.schweitzer" Date: Mon, 13 Apr 2026 13:49:23 +0000 Subject: [PATCH] deploy script: ultimo_meter_readings --- scripts/ultimo_meter_readings.py | 462 +++++++++++++++++++++++++++++++ 1 file changed, 462 insertions(+) create mode 100644 scripts/ultimo_meter_readings.py diff --git a/scripts/ultimo_meter_readings.py b/scripts/ultimo_meter_readings.py new file mode 100644 index 0000000..fd2723f --- /dev/null +++ b/scripts/ultimo_meter_readings.py @@ -0,0 +1,462 @@ +import httpx +import json +from datetime import datetime, timedelta +import re +from typing import List, Dict, Optional + +# Parse readings from text input +def parse_readings(readings_text: str) -> List[Dict[str, any]]: + """Parse readings from text input format: YYYY-MM: 12345.67""" + readings = [] + lines = readings_text.strip().split('\n') + + for line in lines: + line = line.strip() + if not line: + continue + + # Match format: YYYY-MM: value + match = re.match(r'^(\d{4}-\d{2})\s*:\s*(\d+(?:\.\d+)?)$', line) + if match: + month_str = match.group(1) + value = float(match.group(2)) + readings.append({ + 'month': month_str, + 'meterValue': value + }) + else: + raise ValueError(f"Ungültiges Format in Zeile: {line}") + + # Sort by month + readings.sort(key=lambda x: x['month']) + return readings + +# GraphQL query functions +def execute_graphql_query(query: str, variables: Dict = None) -> Dict: + """Execute a GraphQL query against the API""" + headers = dict(AUTH_HEADERS) + headers['Content-Type'] = 'application/json' + + payload = {'query': query} + if variables: + payload['variables'] = variables + + response = httpx.post( + f"{EXTERNAL_BASE_URL}/graphql", + headers=headers, + json=payload, + timeout=30 + ) + + if response.status_code != 200: + raise Exception(f"HTTP Error {response.status_code}: {response.text}") + + result = response.json() + if 'errors' in result: + raise Exception(f"GraphQL Error: {result['errors']}") + + return result['data'] + +# Main execution +try: + # Get parameters + sensor_id = PARAMS.get('sensor_id') + variable_name = PARAMS.get('variable_name', 'ENERGY_INST_VAL') + unit_name = PARAMS.get('unit_name', 'WH') + readings_text = PARAMS.get('readings_text', '') + + if not sensor_id or not readings_text: + raise ValueError("Sensor ID und Zählerstände sind erforderlich") + + # Parse readings + readings = parse_readings(readings_text) + + if not readings: + raise ValueError("Keine gültigen Zählerstände gefunden") + + # Get sensor info + sensor_query = """ + query GetSensor($sensorId: ID!) { + sensor(id: $sensorId) { + id + name + nameExtern + description + measureConcept { + id + name + description + } + } + } + """ + + sensor_data = execute_graphql_query(sensor_query, {"sensorId": sensor_id}) + sensor = sensor_data['sensor'] + + if not sensor: + raise ValueError(f"Sensor mit ID {sensor_id} nicht gefunden") + + # Get current reading before insert + last_reading_query = """ + query GetLastReading($sensorId: ID!, $variableName: String) { + lastObservation(sensorId: $sensorId, variableName: $variableName) { + id + moment + value + meterValue + observationVariableUnit { + observationVariable { + name + } + unit { + name + } + } + } + } + """ + + current_reading_data = execute_graphql_query(last_reading_query, { + "sensorId": sensor_id, + "variableName": variable_name + }) + current_reading = current_reading_data.get('lastObservation') + + # Execute the ultimo readings mutation + ultimo_mutation = """ + mutation RecordUltimoReadings($input: UltimoReadingsInput!) { + recordUltimoReadings(input: $input) { + success + created { + id + moment + value + meterValue + observationVariableUnit { + observationVariable { + name + } + unit { + name + } + } + } + errors { + code + message + details + } + } + } + """ + + ultimo_input = { + "sensorId": sensor_id, + "variableName": variable_name, + "variableUnit": unit_name, + "readings": readings + } + + ultimo_result = execute_graphql_query(ultimo_mutation, {"input": ultimo_input}) + ultimo_data = ultimo_result['recordUltimoReadings'] + + # Get updated current reading and last 10 + recent_readings_query = """ + query GetRecentReadings($measurementConceptId: ID!, $sensorName: String, $variableName: String) { + findObservation( + measurementConceptId: $measurementConceptId + sensorName: $sensorName + observationVariableNamePattern: $variableName + startTime: "2020-01-01 00:00:00" + endTime: "2030-12-31 23:59:59" + ) { + id + moment + value + meterValue + observationVariableUnit { + observationVariable { + name + } + unit { + name + } + } + } + } + """ + + try: + recent_data = execute_graphql_query(recent_readings_query, { + "measurementConceptId": sensor['measureConcept']['id'], + "sensorName": sensor['name'].strip(), + "variableName": variable_name + }) + all_observations = recent_data.get('findObservation', []) + # Sort by moment descending and take last 11 (current + 10) + all_observations.sort(key=lambda x: x['moment'], reverse=True) + recent_observations = all_observations[:11] + except: + recent_observations = [] + + # Build HTML result + html_content = f""" + + + + Ultimo-Zählerstände Ergebnis + + + +
+
+

🎯 Ultimo-Zählerstände Ergebnis

+

Verarbeitung der Zählerstände für Sensor {sensor['name'].strip()}

+
+ """ + + # Sensor information + html_content += f""" +
+

📊 Sensor-Informationen

+ + + + + + + + +
Sensor ID{sensor['id']}
Name{sensor['name'].strip()}
Extern{sensor.get('nameExtern', '-')}
Beschreibung{sensor.get('description', '-')}
Messkonzept{sensor['measureConcept']['name'].strip()}
Variable{variable_name}
Einheit{unit_name}
+
+ """ + + # Results + if ultimo_data['success']: + created_count = len(ultimo_data['created']) + html_content += f""" +
+ ✅ Erfolgreich! {created_count} Ultimo-Zählerstände wurden erfolgreich eingetragen. +
+ """ + + # Show created observations + if ultimo_data['created']: + html_content += """ +
+

📝 Neu erstellte Zählerstände

+ + + + + + + + + + + + """ + + for obs in ultimo_data['created']: + moment_str = datetime.fromisoformat(obs['moment'].replace('Z', '+00:00')).strftime('%d.%m.%Y %H:%M:%S') + var_name = obs['observationVariableUnit']['observationVariable']['name'].strip() + unit_name = obs['observationVariableUnit']['unit']['name'].strip() + + html_content += f""" + + + + + + + + """ + + html_content += """
ZeitpunktZählerstandWertVariableEinheit
{moment_str}{obs['meterValue']:,.2f}{obs['value']:,.2f}{var_name}{unit_name}
""" + + # Show errors if any + if ultimo_data['errors']: + html_content += """ +
+ ❌ Fehler aufgetreten: +
+
+

🚨 Fehlermeldungen

+ """ + + for error in ultimo_data['errors']: + html_content += f""" +
+ {error['code']}: {error['message']} + """ + + if error.get('details'): + html_content += f""" +
{error['details']}
+ """ + + html_content += "
" + + html_content += "
" + + # Show current reading and history + if recent_observations: + current = recent_observations[0] if recent_observations else None + history = recent_observations[1:11] if len(recent_observations) > 1 else [] + + html_content += """ +
+

📈 Aktueller Zählerstand und Verlauf

+ """ + + if current: + current_moment = datetime.fromisoformat(current['moment'].replace('Z', '+00:00')).strftime('%d.%m.%Y %H:%M:%S') + html_content += f""" +
+ Aktueller Stand: {current['meterValue']:,.2f} {current['observationVariableUnit']['unit']['name'].strip()} + (vom {current_moment}) +
+ """ + + if history: + html_content += """ +

📊 Letzte 10 Messungen (historisch)

+ + + + + + + + + + + + """ + + for i, obs in enumerate(history, 1): + moment_str = datetime.fromisoformat(obs['moment'].replace('Z', '+00:00')).strftime('%d.%m.%Y %H:%M:%S') + is_recent = i <= len(ultimo_data.get('created', [])) + status_badge = 'Neu' if is_recent else 'Historisch' + + html_content += f""" + + + + + + + + """ + + html_content += """
RangZeitpunktZählerstandWertStatus
{i}{moment_str}{obs['meterValue']:,.2f}{obs['value']:,.2f}{status_badge}
""" + + html_content += "
" + + elif current_reading: + # Show only the previous current reading + current_moment = datetime.fromisoformat(current_reading['moment'].replace('Z', '+00:00')).strftime('%d.%m.%Y %H:%M:%S') + html_content += f""" +
+

📈 Vorheriger Zählerstand

+
+ Vorheriger Stand: {current_reading['meterValue']:,.2f} {current_reading['observationVariableUnit']['unit']['name'].strip()} + (vom {current_moment}) +
+
+ """ + + # Summary + total_readings = len(readings) + successful_readings = len(ultimo_data.get('created', [])) + failed_readings = total_readings - successful_readings + + html_content += f""" +
+

📋 Zusammenfassung

+ + + + + +
Eingegebene Zählerstände{total_readings}
Erfolgreich erstellt{successful_readings}
Fehlgeschlagen{failed_readings}
Verarbeitungszeit{datetime.now().strftime('%d.%m.%Y %H:%M:%S')}
+
+
+ + + """ + + result = { + "type": "html", + "content": html_content + } + +except Exception as e: + # Error handling + error_html = f""" + + + + Fehler bei Ultimo-Zählerstände + + + +
+
+

❌ Fehler bei der Verarbeitung

+

Es ist ein Fehler aufgetreten: {str(e)}

+
+ +
+

🔧 Lösungsvorschläge:

+
    +
  • Überprüfen Sie, ob der Sensor existiert und Messwerte hat
  • +
  • Stellen Sie sicher, dass das Format der Zählerstände korrekt ist (YYYY-MM: 12345.67)
  • +
  • Überprüfen Sie, ob die Variable und Einheit für den Sensor verfügbar sind
  • +
  • Vermeiden Sie doppelte Einträge für denselben Monat
  • +
  • Stellen Sie sicher, dass Sie die erforderlichen Berechtigungen haben
  • +
+ +

📋 Parameter:

+
+ Sensor ID: {PARAMS.get('sensor_id', 'nicht gesetzt')}
+ Variable: {PARAMS.get('variable_name', 'nicht gesetzt')}
+ Einheit: {PARAMS.get('unit_name', 'nicht gesetzt')}
+ Zählerstände: {len(PARAMS.get('readings_text', '').split('\n')) if PARAMS.get('readings_text') else 0} Zeilen +
+
+
+ + + """ + + result = { + "type": "html", + "content": error_html + }