import httpx
import json
from datetime import datetime, timedelta
import re
from typing import List, Dict, Optional
# Parse readings from text input
def parse_readings(readings_text: str) -> List[Dict[str, object]]:
    """Parse monthly meter readings from text, one ``YYYY-MM: value`` per line.

    Blank lines are skipped and surrounding whitespace is ignored. The value
    must be a non-negative decimal number (``123`` or ``123.45``).

    Args:
        readings_text: Multi-line text such as ``"2024-01: 12345.67"``.

    Returns:
        A list of ``{'month': 'YYYY-MM', 'meterValue': float}`` dicts sorted
        by month in ascending order.

    Raises:
        ValueError: If a non-blank line does not match the expected format or
            names a month outside 01-12.
    """
    # The month part is limited to 01-12 so impossible months ("2024-13",
    # "2024-00") are rejected here instead of being sent to the API.
    # Explicit [0-9] keeps the month string ASCII exactly as typed.
    line_pattern = re.compile(
        r'^([0-9]{4}-(?:0[1-9]|1[0-2]))\s*:\s*([0-9]+(?:\.[0-9]+)?)$'
    )

    readings = []
    for raw_line in readings_text.strip().split('\n'):
        line = raw_line.strip()
        if not line:
            continue
        match = line_pattern.match(line)
        if match is None:
            raise ValueError(f"Ungültiges Format in Zeile: {line}")
        readings.append({
            'month': match.group(1),
            'meterValue': float(match.group(2)),
        })

    # 'YYYY-MM' strings sort chronologically when compared as text.
    readings.sort(key=lambda reading: reading['month'])
    return readings
# GraphQL query functions
def execute_graphql_query(query: str, variables: Optional[Dict] = None) -> Dict:
    """Execute a GraphQL query or mutation and return the ``data`` payload.

    The request is POSTed as JSON to ``{EXTERNAL_BASE_URL}/graphql`` using a
    copy of ``AUTH_HEADERS`` (both are expected to exist at module level).

    Args:
        query: The GraphQL document to execute.
        variables: Optional variable values; omitted from the request when
            empty or ``None``.

    Returns:
        The value of the ``data`` entry of the GraphQL response.

    Raises:
        RuntimeError: If the HTTP status is not 200, the body is not a JSON
            object, the response reports errors, or it carries no ``data``.
    """
    # Copy so that the shared header mapping is never mutated.
    headers = dict(AUTH_HEADERS)
    headers['Content-Type'] = 'application/json'

    payload = {'query': query}
    if variables:
        payload['variables'] = variables

    response = httpx.post(
        f"{EXTERNAL_BASE_URL}/graphql",
        headers=headers,
        json=payload,
        timeout=30,
    )
    if response.status_code != 200:
        raise RuntimeError(f"HTTP Error {response.status_code}: {response.text}")

    try:
        result = response.json()
    except ValueError as exc:
        # A 200 with a non-JSON body (e.g. a proxy page) would otherwise
        # surface as an opaque decode error.
        raise RuntimeError(f"Invalid JSON response: {response.text}") from exc
    if not isinstance(result, dict):
        raise RuntimeError(f"Unexpected GraphQL response: {result!r}")

    # Only a non-empty error list is a failure; tolerate "errors": null / [].
    errors = result.get('errors')
    if errors:
        raise RuntimeError(f"GraphQL Error: {errors}")

    data = result.get('data')
    if data is None:
        raise RuntimeError("GraphQL Error: response contains no 'data'")
    return data
# Main execution
import html  # stdlib; used below to escape values interpolated into the HTML result


def _dig(mapping, *keys):
    """Follow *keys* through nested dicts; return None once a level is missing."""
    current = mapping
    for key in keys:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current


def _text(value, default='-'):
    """Return *value* stripped and HTML-escaped, or *default* if None or blank."""
    if value is None:
        return default
    stripped = str(value).strip()
    return html.escape(stripped) if stripped else default


def _number(value):
    """Format *value* like '1,234.56'; fall back to its escaped text if not numeric."""
    try:
        return f"{float(value):,.2f}"
    except (TypeError, ValueError):
        return _text(value)


def _moment(value):
    """Format an ISO-8601 timestamp as 'DD.MM.YYYY HH:MM:SS'; fall back to raw text."""
    try:
        parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
    except (AttributeError, ValueError):
        return _text(value)
    return parsed.strftime('%d.%m.%Y %H:%M:%S')


# Main flow: validate parameters, look up the sensor, record the readings via
# the mutation, then render a report into the module-level ``result`` dict.
# NOTE(review): the result type is "html" but the literals below contain no
# tags -- presumably markup was lost upstream; the text is kept as found.
try:
    # Read and validate the script parameters.
    sensor_id = PARAMS.get('sensor_id')
    variable_name = PARAMS.get('variable_name', 'ENERGY_INST_VAL')
    unit_name = PARAMS.get('unit_name', 'WH')
    readings_text = PARAMS.get('readings_text', '')
    if not sensor_id or not readings_text:
        raise ValueError("Sensor ID und Zählerstände sind erforderlich")

    readings = parse_readings(readings_text)
    if not readings:
        raise ValueError("Keine gültigen Zählerstände gefunden")

    # Look up the sensor first so an unknown id fails before anything is written.
    sensor_query = """
query GetSensor($sensorId: ID!) {
sensor(id: $sensorId) {
id
name
nameExtern
description
measureConcept {
id
name
description
}
}
}
"""
    sensor_data = execute_graphql_query(sensor_query, {"sensorId": sensor_id})
    sensor = sensor_data.get('sensor')
    if not sensor:
        raise ValueError(f"Sensor mit ID {sensor_id} nicht gefunden")
    # The API may return null here; an empty dict keeps later lookups safe.
    measure_concept = sensor.get('measureConcept') or {}

    # Remember the latest reading before the insert; it is shown as a fallback
    # when the recent-observations query below yields nothing.
    last_reading_query = """
query GetLastReading($sensorId: ID!, $variableName: String) {
lastObservation(sensorId: $sensorId, variableName: $variableName) {
id
moment
value
meterValue
observationVariableUnit {
observationVariable {
name
}
unit {
name
}
}
}
}
"""
    current_reading_data = execute_graphql_query(last_reading_query, {
        "sensorId": sensor_id,
        "variableName": variable_name,
    })
    current_reading = current_reading_data.get('lastObservation')

    # Record the readings.
    ultimo_mutation = """
mutation RecordUltimoReadings($input: UltimoReadingsInput!) {
recordUltimoReadings(input: $input) {
success
created {
id
moment
value
meterValue
observationVariableUnit {
observationVariable {
name
}
unit {
name
}
}
}
errors {
code
message
details
}
}
}
"""
    ultimo_input = {
        "sensorId": sensor_id,
        "variableName": variable_name,
        "variableUnit": unit_name,
        "readings": readings,
    }
    ultimo_result = execute_graphql_query(ultimo_mutation, {"input": ultimo_input})
    # From here on the data has been written, so rendering must not fail on
    # null fields: a crash would show an error page and invite a retry.
    ultimo_data = ultimo_result.get('recordUltimoReadings') or {}
    created = ultimo_data.get('created') or []
    mutation_errors = ultimo_data.get('errors') or []

    # Fetch the observations after the insert: newest one plus up to ten more.
    recent_readings_query = """
query GetRecentReadings($measurementConceptId: ID!, $sensorName: String, $variableName: String) {
findObservation(
measurementConceptId: $measurementConceptId
sensorName: $sensorName
observationVariableNamePattern: $variableName
startTime: "2020-01-01 00:00:00"
endTime: "2030-12-31 23:59:59"
) {
id
moment
value
meterValue
observationVariableUnit {
observationVariable {
name
}
unit {
name
}
}
}
}
"""
    try:
        recent_data = execute_graphql_query(recent_readings_query, {
            "measurementConceptId": measure_concept['id'],
            "sensorName": sensor['name'].strip(),
            "variableName": variable_name,
        })
        all_observations = recent_data.get('findObservation') or []
        recent_observations = sorted(
            all_observations, key=lambda item: item['moment'], reverse=True
        )[:11]
    except Exception:
        # Best effort: the history is optional, the report is still produced.
        recent_observations = []

    # Build the report as a list of fragments and join once at the end.
    parts = ["""
Ultimo-Zählerstände Ergebnis
"""]

    # Sensor information
    parts.append(f"""
📊 Sensor-Informationen
| Sensor ID | {_text(sensor.get('id'))} |
| Name | {_text(sensor.get('name'))} |
| Extern | {_text(sensor.get('nameExtern'))} |
| Beschreibung | {_text(sensor.get('description'))} |
| Messkonzept | {_text(measure_concept.get('name'))} |
| Variable | {_text(variable_name)} |
| Einheit | {_text(unit_name)} |
""")

    # Mutation outcome
    if ultimo_data.get('success'):
        parts.append(f"""
✅ Erfolgreich! {len(created)} Ultimo-Zählerstände wurden erfolgreich eingetragen.
""")
        if created:
            parts.append("""
📝 Neu erstellte Zählerstände
| Zeitpunkt |
Zählerstand |
Wert |
Variable |
Einheit |
""")
            for obs in created:
                obs_variable = _text(
                    _dig(obs, 'observationVariableUnit', 'observationVariable', 'name')
                )
                obs_unit = _text(_dig(obs, 'observationVariableUnit', 'unit', 'name'))
                parts.append(f"""
| {_moment(obs.get('moment'))} |
{_number(obs.get('meterValue'))} |
{_number(obs.get('value'))} |
{obs_variable} |
{obs_unit} |
""")
            parts.append("""
""")

    # Errors reported by the mutation (shown regardless of the success flag)
    if mutation_errors:
        parts.append("""
❌ Fehler aufgetreten:
🚨 Fehlermeldungen
""")
        for error in mutation_errors:
            parts.append(f"""
{_text(error.get('code'))}: {_text(error.get('message'))}
""")
            if error.get('details'):
                parts.append(f"""
{_text(error['details'])}
""")
            parts.append("\n")
        parts.append("\n")

    # Current reading and history
    if recent_observations:
        current = recent_observations[0]
        history = recent_observations[1:11]
        current_unit = _text(_dig(current, 'observationVariableUnit', 'unit', 'name'))
        parts.append("""
📈 Aktueller Zählerstand und Verlauf
""")
        parts.append(f"""
Aktueller Stand: {_number(current.get('meterValue'))} {current_unit}
(vom {_moment(current.get('moment'))})
""")
        if history:
            # Mark a row as new only if this run created it; row rank is not
            # reliable because month-end readings need not be the newest ones.
            created_ids = {
                obs['id'] for obs in created if obs.get('id') is not None
            }
            parts.append("""
📊 Letzte 10 Messungen (historisch)
| Rang |
Zeitpunkt |
Zählerstand |
Wert |
Status |
""")
            for rank, obs in enumerate(history, 1):
                status_badge = 'Neu' if obs.get('id') in created_ids else 'Historisch'
                parts.append(f"""
| {rank} |
{_moment(obs.get('moment'))} |
{_number(obs.get('meterValue'))} |
{_number(obs.get('value'))} |
{status_badge} |
""")
            parts.append("""
""")
        parts.append("\n")
    elif current_reading:
        # No history available: show the reading fetched before the insert.
        previous_unit = _text(
            _dig(current_reading, 'observationVariableUnit', 'unit', 'name')
        )
        parts.append(f"""
📈 Vorheriger Zählerstand
Vorheriger Stand: {_number(current_reading.get('meterValue'))} {previous_unit}
(vom {_moment(current_reading.get('moment'))})
""")

    # Summary
    total_readings = len(readings)
    successful_readings = len(created)
    failed_readings = total_readings - successful_readings
    parts.append(f"""
📋 Zusammenfassung
| Eingegebene Zählerstände | {total_readings} |
| Erfolgreich erstellt | {successful_readings} |
| Fehlgeschlagen | {failed_readings} |
| Verarbeitungszeit | {datetime.now().strftime('%d.%m.%Y %H:%M:%S')} |
""")

    result = {
        "type": "html",
        "content": "".join(parts)
    }
except Exception as e:
    # Top-level boundary: turn any failure into an HTML error report.
    # The line count is computed outside the f-string because a backslash
    # inside an f-string expression is a SyntaxError before Python 3.12.
    raw_readings = PARAMS.get('readings_text')
    line_count = len(str(raw_readings).split('\n')) if raw_readings else 0
    error_html = f"""
Fehler bei Ultimo-Zählerstände
❌ Fehler bei der Verarbeitung
Es ist ein Fehler aufgetreten: {_text(e, '')}
🔧 Lösungsvorschläge:
- Überprüfen Sie, ob der Sensor existiert und Messwerte hat
- Stellen Sie sicher, dass das Format der Zählerstände korrekt ist (YYYY-MM: 12345.67)
- Überprüfen Sie, ob die Variable und Einheit für den Sensor verfügbar sind
- Vermeiden Sie doppelte Einträge für denselben Monat
- Stellen Sie sicher, dass Sie die erforderlichen Berechtigungen haben
📋 Parameter:
Sensor ID: {_text(PARAMS.get('sensor_id'), 'nicht gesetzt')}
Variable: {_text(PARAMS.get('variable_name'), 'nicht gesetzt')}
Einheit: {_text(PARAMS.get('unit_name'), 'nicht gesetzt')}
Zählerstände: {line_count} Zeilen
"""
    result = {
        "type": "html",
        "content": error_html
    }