„scripts/ultimo_zaehlerstand_eingabe.py“ löschen

main
martin.schweitzer 5 days ago
parent 87a43785eb
commit f9c224d9be

@ -1,576 +0,0 @@
import httpx
import json
import re
from datetime import datetime, timezone
from typing import List, Dict, Any
# Globals: EXTERNAL_BASE_URL, AUTH_HEADERS, PARAMS
def make_graphql_request(query: str, variables: Dict = None) -> Dict:
"""Sichere GraphQL-Anfrage mit Fehlerbehandlung"""
try:
response = httpx.post(
f"{EXTERNAL_BASE_URL}/graphql",
headers=AUTH_HEADERS,
json={"query": query, "variables": variables or {}},
timeout=30.0
)
response.raise_for_status()
data = response.json()
if "errors" in data and data["errors"]:
error_msgs = [err.get("message", "Unbekannter Fehler") for err in data["errors"]]
return {"success": False, "error": "; ".join(error_msgs)}
return {"success": True, "data": data.get("data", {})}
except httpx.TimeoutException:
return {"success": False, "error": "Anfrage-Timeout nach 30 Sekunden"}
except httpx.HTTPStatusError as e:
return {"success": False, "error": f"HTTP-Fehler {e.response.status_code}: {e.response.text}"}
except Exception as e:
return {"success": False, "error": f"Unerwarteter Fehler: {str(e)}"}
def search_sensors(query_text: str) -> List[Dict]:
"""Sucht Sensoren nach Name oder Nummer"""
gql_query = """
query SearchSensors {
sensors {
id
name
nameExtern
description
measureConcept {
id
name
description
}
}
}
"""
response = make_graphql_request(gql_query)
if not response["success"]:
return []
sensors = response["data"].get("sensors", [])
if not query_text:
return sensors[:20] # Limit für Performance
# Filter nach Name oder Number
filtered = []
query_lower = query_text.lower()
for sensor in sensors:
name = (sensor.get("name") or "").strip().lower()
name_extern = (sensor.get("nameExtern") or "").strip().lower()
if query_lower in name or query_lower in name_extern:
filtered.append(sensor)
return filtered[:10] # Top 10 Treffer
def get_sensor_variables(sensor_id: str) -> List[Dict]:
"""Holt verfügbare Variablen für einen Sensor"""
gql_query = """
query GetSensorVariables($sensorId: ID!) {
availableVariableUnits(sensorId: $sensorId) {
variableUnitId
variableName
unitName
}
}
"""
response = make_graphql_request(gql_query, {"sensorId": sensor_id})
if not response["success"]:
return []
return response["data"].get("availableVariableUnits", [])
def get_last_observations(sensor_id: str, variable_name: str, limit: int = 10) -> List[Dict]:
"""Holt die letzten N Observations für einen Sensor"""
# Da es keine direkte Query gibt, verwenden wir findObservation mit einem weiten Zeitraum
gql_query = """
query GetLastObservations($measureConceptId: ID!, $sensorName: String!, $variableName: String!) {
findObservation(
measurementConceptId: $measureConceptId
sensorName: $sensorName
observationVariableNamePattern: $variableName
startTime: "2020-01-01"
endTime: "2030-12-31"
) {
id
moment
meterValue
observationVariableUnit {
observationVariable {
name
}
unit {
name
}
}
}
}
"""
# Wir brauchen den MeasureConcept und SensorName
sensor_query = f"""
query GetSensor($sensorId: ID!) {{
sensor(id: $sensorId) {{
name
measureConcept {{
id
}}
}}
}}
"""
sensor_response = make_graphql_request(sensor_query, {"sensorId": sensor_id})
if not sensor_response["success"] or not sensor_response["data"].get("sensor"):
return []
sensor = sensor_response["data"]["sensor"]
sensor_name = (sensor.get("name") or "").strip()
mc_id = sensor["measureConcept"]["id"]
response = make_graphql_request(gql_query, {
"measureConceptId": mc_id,
"sensorName": sensor_name,
"variableName": variable_name
})
if not response["success"]:
return []
observations = response["data"].get("findObservation", [])
# Sortiere nach Zeitstempel absteigend und limitiere
sorted_obs = sorted(observations, key=lambda x: x.get("moment", ""), reverse=True)
return sorted_obs[:limit]
def parse_ultimo_text(text: str) -> List[Dict]:
"""Parst Ultimo-Text-Eingabe zu strukturierten Daten"""
lines = [line.strip() for line in text.split('\n') if line.strip()]
readings = []
for line in lines:
# Pattern: DD.MM.YYYY: Wert [Einheit]
# Beispiele: 31.12.2025: 60,645 MWh oder 28.2.2026: 68,771
match = re.match(r'(\d{1,2})\.(\d{1,2})\.(\d{4})\s*:?\s*([\d.,]+)', line)
if match:
day, month, year, value_str = match.groups()
# Wert parsen (Komma und Punkt als Dezimaltrennzeichen akzeptieren)
value_str = value_str.replace(',', '.')
try:
value = float(value_str)
# Für Ultimo nehmen wir immer Ende des Monats
date_str = f"{year}-{month.zfill(2)}"
readings.append({
"month": date_str,
"meterValue": value
})
except ValueError:
continue # Ungültige Zahl ignorieren
return readings
def record_ultimo_readings(sensor_id: str, variable_name: str, variable_unit: str, readings: List[Dict]) -> Dict:
"""Führt Ultimo-Batch-Eingabe aus"""
gql_query = """
mutation RecordUltimoReadings($input: UltimoReadingsInput!) {
recordUltimoReadings(input: $input) {
success
created {
id
moment
meterValue
observationVariableUnit {
observationVariable {
name
}
unit {
name
}
}
}
errors {
code
message
details
}
}
}
"""
input_data = {
"sensorId": sensor_id,
"variableName": variable_name,
"variableUnit": variable_unit,
"readings": readings
}
response = make_graphql_request(gql_query, {"input": input_data})
if not response["success"]:
return {"success": False, "error": response["error"]}
return response["data"].get("recordUltimoReadings", {})
def record_single_reading(sensor_id: str, moment: str, value: float, variable_name: str, variable_unit: str) -> Dict:
"""Führt Einzelwert-Eingabe aus"""
gql_query = """
mutation RecordMeterReading($input: MeterReadingInput!) {
recordMeterReading(input: $input) {
success
observation {
id
moment
meterValue
observationVariableUnit {
observationVariable {
name
}
unit {
name
}
}
}
errors {
code
message
details
}
}
}
"""
input_data = {
"sensorId": sensor_id,
"moment": moment,
"value": value,
"variableName": variable_name,
"variableUnit": variable_unit
}
response = make_graphql_request(gql_query, {"input": input_data})
if not response["success"]:
return {"success": False, "error": response["error"]}
return response["data"].get("recordMeterReading", {})
def format_datetime(dt_str: str) -> str:
"""Formatiert Datetime-String für Anzeige"""
try:
dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
return dt.strftime('%d.%m.%Y %H:%M')
except:
return dt_str
def generate_html_response(success: bool, result: Dict, sensor_info: Dict, last_observations: List[Dict]) -> str:
"""Generiert HTML-Response"""
# CSS Styles
styles = """
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
margin: 20px;
background-color: #f5f5f5;
}
.container {
max-width: 1000px;
margin: 0 auto;
background: white;
padding: 30px;
border-radius: 12px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
}
.header {
border-bottom: 3px solid var(--color-primary, #2196F3);
padding-bottom: 20px;
margin-bottom: 30px;
}
.success {
background: #e8f5e8;
border-left: 5px solid var(--color-success, #4CAF50);
padding: 15px 20px;
margin: 20px 0;
border-radius: 4px;
}
.error {
background: #ffeaea;
border-left: 5px solid var(--color-danger, #f44336);
padding: 15px 20px;
margin: 20px 0;
border-radius: 4px;
}
.warning {
background: #fff3cd;
border-left: 5px solid var(--color-warning, #ff9800);
padding: 15px 20px;
margin: 20px 0;
border-radius: 4px;
}
.sensor-info {
background: #f0f7ff;
padding: 20px;
border-radius: 8px;
margin: 20px 0;
border: 1px solid #e3f2fd;
}
.observations-table {
width: 100%;
border-collapse: collapse;
margin: 20px 0;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.observations-table th {
background: var(--color-primary, #2196F3);
color: white;
padding: 12px;
text-align: left;
}
.observations-table td {
padding: 12px;
border-bottom: 1px solid #eee;
}
.observations-table tr:nth-child(even) {
background-color: #f9f9f9;
}
.observations-table tr:hover {
background-color: #f0f7ff;
}
.badge {
display: inline-block;
padding: 4px 12px;
border-radius: 20px;
font-size: 12px;
font-weight: bold;
text-transform: uppercase;
}
.badge-success { background: var(--color-success, #4CAF50); color: white; }
.badge-error { background: var(--color-danger, #f44336); color: white; }
.error-details {
margin-top: 10px;
padding: 10px;
background: #fff;
border: 1px solid #ddd;
border-radius: 4px;
font-family: monospace;
font-size: 14px;
}
</style>
"""
# Header
html = f"""
{styles}
<div class="container">
<div class="header">
<h1>🔢 Ultimo-Zählerstand Eingabe</h1>
<div class="sensor-info">
<h3>Gewählter Zähler:</h3>
<strong>{sensor_info.get('name', 'N/A').strip()}</strong><br>
<small>ID: {sensor_info.get('id', 'N/A')}</small><br>
<small>MeasureConcept: {sensor_info.get('measureConcept', {}).get('name', 'N/A').strip()}</small>
</div>
</div>
"""
if success:
html += '<div class="success">'
html += '<h3>✅ Erfolgreich!</h3>'
if "created" in result and result["created"]:
created_count = len(result["created"])
html += f'<p><strong>{created_count} Zählerstände</strong> wurden erfolgreich gespeichert:</p>'
html += '<ul>'
for obs in result["created"]:
moment = format_datetime(obs.get("moment", ""))
value = obs.get("meterValue", 0)
unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip()
html += f'<li><strong>{moment}:</strong> {value:,.3f} {unit}</li>'
html += '</ul>'
elif "observation" in result and result["observation"]:
obs = result["observation"]
moment = format_datetime(obs.get("moment", ""))
value = obs.get("meterValue", 0)
unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip()
html += f'<p><strong>Neuer Zählerstand:</strong> {moment}: {value:,.3f} {unit}</p>'
html += '</div>'
# Zeige letzte 10 Werte
if last_observations:
html += '<h3>📊 Letzte 10 Zählerstände</h3>'
html += '<table class="observations-table">'
html += '<thead><tr><th>Datum/Zeit</th><th>Zählerstand</th><th>Einheit</th><th>Variable</th></tr></thead><tbody>'
for obs in last_observations[:10]:
moment = format_datetime(obs.get("moment", ""))
value = obs.get("meterValue", 0)
unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip()
variable = obs.get("observationVariableUnit", {}).get("observationVariable", {}).get("name", "").strip()
html += f'<tr><td>{moment}</td><td>{value:,.3f}</td><td>{unit}</td><td>{variable}</td></tr>'
html += '</tbody></table>'
else:
html += '<div class="warning">⚠️ Keine vorherigen Zählerstände gefunden.</div>'
else:
html += '<div class="error">'
html += '<h3>❌ Fehler bei der Eingabe</h3>'
if "errors" in result and result["errors"]:
for error in result["errors"]:
code = error.get("code", "UNKNOWN")
message = error.get("message", "Unbekannter Fehler")
details = error.get("details", "")
html += f'<div class="badge badge-error">{code}</div>'
html += f'<p><strong>Fehler:</strong> {message}</p>'
if details:
html += f'<div class="error-details"><strong>Details:</strong> {details}</div>'
elif "error" in result:
html += f'<p>{result["error"]}</p>'
else:
html += '<p>Unbekannter Fehler aufgetreten.</p>'
html += '</div>'
# Auch bei Fehler die letzten Werte zeigen (falls verfügbar)
if last_observations:
html += '<h3>📊 Aktuelle Zählerstände (zur Information)</h3>'
html += '<table class="observations-table">'
html += '<thead><tr><th>Datum/Zeit</th><th>Zählerstand</th><th>Einheit</th><th>Variable</th></tr></thead><tbody>'
for obs in last_observations[:5]: # Nur 5 bei Fehler
moment = format_datetime(obs.get("moment", ""))
value = obs.get("meterValue", 0)
unit = obs.get("observationVariableUnit", {}).get("unit", {}).get("name", "").strip()
variable = obs.get("observationVariableUnit", {}).get("observationVariable", {}).get("name", "").strip()
html += f'<tr><td>{moment}</td><td>{value:,.3f}</td><td>{unit}</td><td>{variable}</td></tr>'
html += '</tbody></table>'
html += '</div>'
return html
# MAIN SCRIPT EXECUTION
try:
# Parameter validieren
sensor_selection = PARAMS.get("sensor_selection", "").strip()
variable_unit = PARAMS.get("variable_unit", "ACTIVE_ENERGY_DELIVERED_9|WH")
input_method = PARAMS.get("input_method", "batch")
if not sensor_selection:
result = {
"type": "html",
"content": "<div class='error'><h3>❌ Fehler</h3><p>Bitte wählen Sie einen Zähler aus.</p></div>"
}
else:
# Variable und Unit splitten
try:
variable_name, variable_unit_name = variable_unit.split('|')
except ValueError:
variable_name = "ACTIVE_ENERGY_DELIVERED_9"
variable_unit_name = "WH"
# Sensor finden
sensors = search_sensors(sensor_selection)
if not sensors:
result = {
"type": "html",
"content": f"<div class='error'><h3>❌ Zähler nicht gefunden</h3><p>Kein Zähler gefunden für: <strong>{sensor_selection}</strong></p></div>"
}
else:
# Ersten Treffer verwenden (oder über ID matchen wenn vollständig)
selected_sensor = sensors[0]
sensor_id = selected_sensor["id"]
if input_method == "batch":
# Batch-Eingabe
ultimo_text = PARAMS.get("ultimo_readings_text", "").strip()
if not ultimo_text:
result = {
"type": "html",
"content": "<div class='error'><h3>❌ Fehler</h3><p>Bitte geben Sie Ultimo-Stände ein.</p></div>"
}
else:
readings = parse_ultimo_text(ultimo_text)
if not readings:
result = {
"type": "html",
"content": "<div class='error'><h3>❌ Parsing-Fehler</h3><p>Keine gültigen Zählerstände im Text gefunden.<br>Erwartetes Format: DD.MM.YYYY: Wert</p></div>"
}
else:
# Ultimo-Batch ausführen
batch_result = record_ultimo_readings(sensor_id, variable_name, variable_unit_name, readings)
# Aktuelle Observations holen
last_obs = get_last_observations(sensor_id, variable_name)
# HTML generieren
html_content = generate_html_response(
batch_result.get("success", False),
batch_result,
selected_sensor,
last_obs
)
result = {"type": "html", "content": html_content}
else:
# Einzelwert-Eingabe
single_date = PARAMS.get("single_date", "")
single_value = PARAMS.get("single_value", "")
if not single_date or not single_value:
result = {
"type": "html",
"content": "<div class='error'><h3>❌ Fehler</h3><p>Bitte geben Sie Datum und Zählerstand ein.</p></div>"
}
else:
try:
# Datum parsen
if 'T' not in single_date:
single_date += 'T00:00:00'
moment_dt = datetime.fromisoformat(single_date)
if moment_dt.tzinfo is None:
moment_dt = moment_dt.replace(tzinfo=timezone.utc)
moment_iso = moment_dt.isoformat()
# Wert parsen
value_str = single_value.replace(',', '.')
value = float(value_str)
# Einzelwert-Eingabe ausführen
single_result = record_single_reading(sensor_id, moment_iso, value, variable_name, variable_unit_name)
# Aktuelle Observations holen
last_obs = get_last_observations(sensor_id, variable_name)
# HTML generieren
html_content = generate_html_response(
single_result.get("success", False),
single_result,
selected_sensor,
last_obs
)
result = {"type": "html", "content": html_content}
except ValueError as e:
result = {
"type": "html",
"content": f"<div class='error'><h3>❌ Eingabe-Fehler</h3><p>Ungültiges Datum oder Zählerstand: {str(e)}</p></div>"
}
except Exception as e:
result = {
"type": "html",
"content": f"<div class='error'><h3>❌ Unerwarteter Fehler</h3><p>{str(e)}</p></div>"
}
except Exception as e:
result = {
"type": "html",
"content": f"<div class='error'><h3>❌ Script-Fehler</h3><p>Unerwarteter Fehler beim Ausführen des Scripts: {str(e)}</p></div>"
}
Loading…
Cancel
Save