449 lines
18 KiB
Python
Executable file
449 lines
18 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import argparse
|
|
import dash
|
|
import dash_bootstrap_components as dbc
|
|
import dash_core_components as dcc
|
|
import dash_daq as daq
|
|
import dash_html_components as html
|
|
from dash.dependencies import Input, Output
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import requests
|
|
from math import log, log10
|
|
from xdg import XDG_CONFIG_HOME
|
|
|
|
# Default location of the persisted dashboard settings (JSON).
CONFIG_FILE = f"{XDG_CONFIG_HOME}/hcdash.json"

# Base URL of the homecontrol server, used when neither the config file
# nor the command line provides an address.
ADDRESS = "http://localhost:8200"

# Port the dashboard listens on when none is configured.
PORT = 8201
|
|
|
|
def setup(argv=None):
    """Build the runtime configuration from the config file and the CLI.

    Precedence, lowest to highest: built-in defaults (``ADDRESS``, ``PORT``),
    values stored in the JSON config file, command-line arguments. The merged
    ``address``/``port`` settings are written back to the config file so they
    become the defaults of the next run.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        dict with at least the keys ``address``, ``port`` and ``debug``.
        ``debug`` reflects only the ``--debug`` flag of this invocation and
        is set after the file has been written.

    Raises:
        OSError: if the config file cannot be written.
    """
    parser = argparse.ArgumentParser(description='homecontrol')
    parser.add_argument("-a", "--address", dest="address", type=str,
                        help="homecontrol address")
    # Parsed as int so it has the same type as the PORT default and so that
    # a non-numeric value is rejected by argparse instead of by the server.
    parser.add_argument("-p", "--port", dest="port", type=int,
                        help="dashboard port")
    parser.add_argument("-c", "--config", dest="config", type=str,
                        help="config file", default=CONFIG_FILE)
    parser.add_argument("-d", "--debug", dest="debug", action="store_true",
                        help="debug mode")
    args = parser.parse_args(argv)

    # Create the directory of the config file actually in use; with the
    # default path this is the XDG config directory.
    config_dir = os.path.dirname(os.path.abspath(args.config))
    os.makedirs(config_dir, exist_ok=True)

    config = {}
    try:
        with open(args.config, "r", encoding="utf-8") as config_file:
            config = json.load(config_file)
    except FileNotFoundError:
        # First run: nothing stored yet, defaults apply.
        pass
    except (OSError, ValueError) as ex:
        # Unreadable or corrupt file: fall back to defaults, but say so.
        # A named logger is used on purpose: the module-level logging.warning()
        # would implicitly configure the root logger.
        logging.getLogger("HCDash").warning(
            "ignoring config file %s: %s", args.config, ex)
    if not isinstance(config, dict):
        # Valid JSON, but not the expected object.
        config = {}

    # fill missing (or empty) keys with defaults
    config["address"] = config.get("address") or ADDRESS
    config["port"] = config.get("port") or PORT

    # command-line arguments win over stored values
    if args.address:
        config["address"] = args.address
    if args.port:
        config["port"] = args.port

    # persist for the next run
    with open(args.config, "w", encoding="utf-8") as config_file:
        json.dump(config, config_file)

    # temporary option: set after saving so the flag itself is not persisted
    config["debug"] = bool(args.debug)

    return config
|
|
|
|
class HCDash:
    """Dash web dashboard for a homecontrol server.

    The instance reads three keys from ``config``: ``address`` (base URL of
    the homecontrol HTTP API), ``port`` (port the dashboard listens on) and
    ``debug`` (enables debug logging and the Dash debug mode).
    """

    # Seconds to wait for the homecontrol server before a request is abandoned.
    REQUEST_TIMEOUT = 10
    # Actors whose "ts" is older than this many seconds are not listed.
    ACTOR_MAX_AGE = 60 * 10
    # Width in seconds of the time window the graphs show initially (7 days).
    DEFAULT_WINDOW = 60 * 60 * 24 * 7

    def __init__(self, config):
        """Store ``config`` and configure logging.

        Args:
            config: dict as returned by ``setup()``.
        """
        self.config = config
        # logging
        FORMAT = "[%(asctime)13s :: %(name)18s :: %(levelname)7s] %(message)s"
        DATEFMT = "%Y%m%d %H:%M:%S"
        logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt=DATEFMT)
        self.logger = logging.getLogger("HCDash")
        if self.config.get("debug"):
            self.logger.setLevel(logging.DEBUG)

    # ------------------------------------------------------------------
    # homecontrol HTTP API
    # ------------------------------------------------------------------

    def _get_json(self, path, params=None):
        """GET ``<address><path>`` and return the decoded JSON body.

        Raises whatever ``requests`` raises (connection errors, timeouts,
        non-2xx status, invalid JSON); callers decide how to degrade.
        """
        url = f"{self.config.get('address')}{path}"
        res = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
        res.raise_for_status()
        return res.json()

    def _get_series(self, path, item_id, min_ts, max_ts, limit):
        """Fetch the time series of one sensor/actor; ``{}`` on any failure."""
        if not item_id:
            return {}
        query = {"min_ts": min_ts, "max_ts": max_ts, "limit": limit}
        try:
            return self._get_json(f"{path}/{item_id}", params=query)[item_id]
        except Exception as ex:
            # Best effort: the UI shows an empty graph instead of failing.
            self.logger.error(f"Exception Type: {type(ex).__name__}, args:\n{ex.args}")
            return {}

    def get_actors(self):
        """Return the actors seen within the last ``ACTOR_MAX_AGE`` seconds.

        Returns:
            dict mapping actor id to the actor description as delivered by
            the server; empty when the server cannot be reached or answers
            with something unexpected.
        """
        ret = {}
        try:
            actors = self._get_json("/actor/get")
            oldest = time.time() - self.ACTOR_MAX_AGE
            # An actor without a timestamp is treated as stale.
            ret = {actor_id: actor for actor_id, actor in actors.items()
                   if (actor.get("ts") or 0) > oldest}
            self.logger.debug(f"get_actors: {ret}")
        except Exception as ex:
            self.logger.error(f"Exception get_actors, type: {type(ex).__name__}, args:\n{ex.args}")
        return ret

    def get_sensors(self):
        """Return all sensors known to the server (``{}`` on failure)."""
        ret = {}
        try:
            ret = self._get_json("/sensor/get")
            self.logger.debug(f"get_sensors: {ret}")
        except Exception as ex:
            self.logger.error(f"Exception Type: {type(ex).__name__}, args:\n{ex.args}")
        return ret

    def set_level(self, actorId, level):
        """Send a ``set_level`` command for ``actorId`` to the server.

        Network failures are logged rather than raised, matching the
        best-effort behaviour of the read methods.
        """
        req = {"id": actorId, "command": "set_level", "data": {"level": level}}
        self.logger.debug(f"set_level: {req}")
        try:
            requests.post(f"{self.config.get('address')}/actor/command", json=req,
                          timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as ex:
            self.logger.error(f"Exception set_level, type: {type(ex).__name__}, args:\n{ex.args}")

    def get_sensor_values(self, sensorId, min_ts, max_ts, limit):
        """Return the server's record for ``sensorId`` within the time range.

        Returns ``{}`` when ``sensorId`` is empty or the request fails. The
        dashboard reads the keys ``values`` and ``sensorType`` from the result.
        """
        return self._get_series("/sensor/get_values", sensorId, min_ts, max_ts, limit)

    def get_actor_levels(self, actorId, min_ts, max_ts, limit):
        """Return the server's record for ``actorId`` within the time range.

        Returns ``{}`` when ``actorId`` is empty or the request fails. The
        dashboard reads the keys ``levels`` and ``actorType`` from the result.
        """
        self.logger.debug(f"get_actor_levels: {actorId}")
        return self._get_series("/actor/get_levels", actorId, min_ts, max_ts, limit)

    # ------------------------------------------------------------------
    # small pure helpers
    # ------------------------------------------------------------------

    def pTime(self, value, fmt="%Y/%m/%d %H:%M"):
        """Format the epoch timestamp ``value`` in local time.

        Returns the integer ``0`` for ``None``, zero or negative input, which
        is what an unset slider delivers.
        """
        if not value or value <= 0:
            return 0
        return time.strftime(fmt, time.localtime(float(value)))

    @staticmethod
    def _level_to_hex(level):
        """Convert an actor level to a ``#rrggbb`` string for the colour picker.

        Accepts an int, a numeric string such as ``"0xff0000"`` (the form
        ``set_level`` sends), or ``None``; anything unparsable becomes black.
        """
        try:
            number = int(level, 0) if isinstance(level, str) else int(level or 0)
        except (TypeError, ValueError):
            number = 0
        # Keep exactly 24 bits so the result is always a valid 6-digit colour.
        return f"#{number & 0xFFFFFF:06x}"

    def _slider_bounds(self, res, key):
        """Compute the six slider properties for a freshly selected item.

        Args:
            res: record fetched with ``limit=1``; its first entry marks the
                start of the available data.
            key: ``"values"`` for sensors, ``"levels"`` for actors.

        Returns:
            ``(min, value, max)`` of the "from" slider followed by
            ``(min, value, max)`` of the "to" slider.
        """
        series = res.get(key) if isinstance(res, dict) else None
        max_ts = int(time.time())
        min_ts = int(series[0]["ts"]) if series else 0
        # default to the last 7 days, but never start before the first sample
        cur_ts = max(min_ts, round(time.time() - self.DEFAULT_WINDOW))
        return min_ts, cur_ts, max_ts, min_ts, max_ts, max_ts

    @staticmethod
    def _figure(kind, name, x, y, mode):
        """Build the plotly figure dict used by both graphs."""
        return {
            'data': [{'x': x, 'y': y, 'mode': mode, 'name': f'{name}'}],
            'layout': {'title': f'Data for {kind}: {name} ({len(x)} elements)'},
        }

    # ------------------------------------------------------------------
    # layout fragments
    # ------------------------------------------------------------------

    @staticmethod
    def _device_tabs(devices, type_key, tabs_id, data_id):
        """Return one tab per device plus the (still empty) detail container."""
        selected = next(iter(devices), None)
        device_tabs = [dcc.Tab(label=device.get(type_key, device_id), value=device_id)
                       for device_id, device in devices.items()]
        return html.Div([
            dbc.Row([
                dbc.Col(dcc.Tabs(id=tabs_id, value=selected, children=device_tabs))
            ]),
            html.Div(id=data_id),
        ])

    @staticmethod
    def _graph_row(graph_id):
        """Return a row holding a placeholder graph with the given id."""
        return dbc.Row([
            dbc.Col(
                dcc.Graph(
                    id=graph_id,
                    figure={
                        'data': [{'x': [0], 'y': [0], 'mode': 'lines', 'name': 'None'}],
                        'layout': {'title': 'initial values'}
                    }
                )
            )
        ])

    @staticmethod
    def _slider_row(prefix):
        """Return the from/to/limit slider row.

        Component ids are ``<prefix>-min``, ``<prefix>-max``, ``<prefix>-limit``
        and the same with a ``-txt`` suffix for the labels.
        """
        now = round(time.time())

        def column(name, left, right, **slider_args):
            style = {'marginLeft': left, 'marginRight': right}
            return dbc.Col([
                html.Div(id=f"{prefix}-{name}-txt", style=style),
                html.Div([dcc.Slider(id=f"{prefix}-{name}", **slider_args)], style=style),
            ])

        return dbc.Row([
            column("min", '5em', '3em', min=0, max=now, step=600, value=0),
            column("max", '3em', '3em', min=0, max=now, step=600, value=now),
            column("limit", '3em', '5em', min=0, max=1000, step=10, value=0),
        ])

    # ------------------------------------------------------------------
    # callback registration
    # ------------------------------------------------------------------

    def _register_slider_labels(self, app, prefix):
        """Register the callbacks that print the slider values as text."""
        @app.callback(
            Output(f'{prefix}-min-txt', 'children'),
            [Input(f'{prefix}-min', 'value')])
        def update_slider_min_txt(value):
            return f"From: {self.pTime(value)}"

        @app.callback(
            Output(f'{prefix}-max-txt', 'children'),
            [Input(f'{prefix}-max', 'value')])
        def update_slider_max_txt(value):
            return f"To: {self.pTime(value)}"

        @app.callback(
            Output(f'{prefix}-limit-txt', 'children'),
            [Input(f'{prefix}-limit', 'value')])
        def update_slider_limit_txt(value):
            return f"Limit: {value if value and value > 0 else None}"

    def _register_tab_callbacks(self, app):
        """Register the callback switching between the sensor and actor view."""
        @app.callback(
            Output("tabs-content", "children"),
            [Input("tabs-select-class", "value")])
        def update_tabs(value):
            self.logger.debug(f"update_tabs: {value}")
            if value == "sensors":
                return self._device_tabs(self.get_sensors(), "sensorType",
                                         "tabs-select-sensor", "sensor-data")
            if value == "actors":
                return self._device_tabs(self.get_actors(), "actorType",
                                         "tabs-select-actor", "actor-data")
            # single Output, so exactly one component is returned
            return html.Div([
                html.H3("undefined")
            ])

    def _register_sensor_callbacks(self, app):
        """Register the callbacks of the sensor view."""
        @app.callback(
            [Output("sensor-data", "children"),
             Output('slider-min', 'min'), Output('slider-min', 'value'),
             Output('slider-min', 'max'), Output('slider-max', 'min'),
             Output('slider-max', 'value'), Output('slider-max', 'max')],
            [Input("tabs-select-sensor", "value")])
        def update_sensor(sensorId):
            self.logger.debug(f"update_sensor: {sensorId}")
            if not sensorId:
                # nothing selected (e.g. no sensors): leave the page untouched
                raise dash.exceptions.PreventUpdate
            sensor_data = [
                self._graph_row('graph-sensor-values'),
                self._slider_row('slider'),
            ]
            res = self.get_sensor_values(sensorId, 0, int(time.time()), 1)
            # sensor_data, min: [min, cur, max], max: [min, max, max]
            return (sensor_data, *self._slider_bounds(res, "values"))

        self._register_slider_labels(app, 'slider')

        @app.callback(
            Output('graph-sensor-values', 'figure'),
            [Input('tabs-select-sensor', 'value'), Input('slider-min', 'value'),
             Input('slider-max', 'value'), Input('slider-limit', 'value')])
        def update_graph_sensor_values(sensorId, min_ts, max_ts, limit):
            res = {}
            # min_ts stays 0 until update_sensor has provided the real range
            if min_ts and min_ts > 0:
                res = self.get_sensor_values(sensorId, min_ts, max_ts, limit)
            points = res.get("values") if isinstance(res, dict) else None
            if points is None:
                return self._figure("sensor", sensorId, [0], [0], 'lines')
            name = res.get("sensorType", sensorId)
            x = [self.pTime(p["ts"], fmt="%m%d-%H%M") for p in points]
            if name == "luminance":
                # luminance is plotted on a logarithmic scale
                y = [log10(p["value"] / 5 + 1) for p in points]
            else:
                y = [p["value"] for p in points]
            return self._figure("sensor", name, x, y, 'lines')

    def _register_actor_callbacks(self, app):
        """Register the callbacks of the actor view."""
        @app.callback(
            [Output("actor-data", "children"),
             Output("slider-actor-min", "min"), Output("slider-actor-min", "value"),
             Output("slider-actor-min", "max"), Output("slider-actor-max", "min"),
             Output("slider-actor-max", "value"), Output("slider-actor-max", "max")],
            [Input("tabs-select-actor", "value")])
        def update_actor(actorId):
            self.logger.debug(f"update_actor: {actorId}")
            if not actorId:
                # nothing selected (e.g. no recent actors): leave the page untouched
                raise dash.exceptions.PreventUpdate
            # The actor may have dropped out of the recent list in the meantime.
            current = self.get_actors().get(actorId, {})
            level = self._level_to_hex(current.get("level"))
            actor_data = [
                self._graph_row('graph-actor-values'),
                self._slider_row('slider-actor'),
                dbc.Row([
                    daq.ColorPicker(
                        id="color-picker",
                        label="Color Picker",
                        size=360,
                        value=dict(hex=level)
                    ),
                    # invisible sink for the set_level callback below
                    html.P(id="empty")
                ]),
            ]
            res = self.get_actor_levels(actorId, 0, int(time.time()), 1)
            # actor_data, min: [min, cur, max], max: [min, max, max]
            return (actor_data, *self._slider_bounds(res, "levels"))

        self._register_slider_labels(app, 'slider-actor')

        @app.callback(
            # html.P has no "value" property; "children" is the one it offers
            Output("empty", "children"),
            [Input("tabs-select-actor", "value"), Input("color-picker", "value")])
        def set_level(actorId, level):
            hex_value = (level or {}).get("hex")
            if actorId and hex_value:
                self.set_level(actorId, f"0x{hex_value.replace('#', '')}")
            return ""

        @app.callback(
            Output('graph-actor-values', 'figure'),
            [Input('tabs-select-actor', 'value'), Input('slider-actor-min', 'value'),
             Input('slider-actor-max', 'value'), Input('slider-actor-limit', 'value')])
        def update_graph_actor(actorId, min_ts, max_ts, limit):
            self.logger.debug(f"update_graph_actor: {actorId}, {min_ts}, {max_ts}, {limit}")
            res = {}
            # min_ts stays 0 until update_actor has provided the real range
            if min_ts and min_ts > 0:
                res = self.get_actor_levels(actorId, min_ts, max_ts, limit)
            points = res.get("levels") if isinstance(res, dict) else None
            if points is None:
                return self._figure("actor", actorId, [0], [0], 'markers')
            name = res.get("actorType", actorId)
            x = [self.pTime(p["ts"], fmt="%m%d-%H%M") for p in points]
            y = [p["value"] for p in points]
            return self._figure("actor", name, x, y, 'markers')

    def main(self):
        """Build the Dash application, register all callbacks and serve it.

        Blocks until the server is stopped.
        """
        tab = "sensors"
        tabs = [
            dcc.Tab(label="sensors", value="sensors"),
            dcc.Tab(label="actors", value="actors")
        ]

        external_stylesheets = [dbc.themes.BOOTSTRAP]
        meta_tags = [{"name": "viewport", "content": "width=device-width, initial-scale=1"}]

        app = dash.Dash(__name__, external_stylesheets=external_stylesheets, meta_tags=meta_tags)
        app.title = "dashboard.ykonni.de"
        # Most components are created by callbacks and are therefore unknown
        # to Dash when the callbacks referring to them are registered.
        app.config.suppress_callback_exceptions = True

        app.layout = html.Div(children=[
            html.H1(children='dashboard.ykonni.de'),
            dbc.Row([
                dbc.Col(dcc.Tabs(id="tabs-select-class", value=tab, children=tabs))
            ]),
            html.Div(id="tabs-content")
        ])

        self._register_tab_callbacks(app)
        self._register_sensor_callbacks(app)
        self._register_actor_callbacks(app)

        # NOTE(review): the server binds to all interfaces; combined with
        # debug=True this exposes the interactive debugger to the network.
        app.run_server(host="0.0.0.0", port=self.config.get("port"), debug=self.config.get("debug"))
|
|
|
|
|
|
def main():
    """Script entry point: load the configuration and run the dashboard."""
    HCDash(setup()).main()


if __name__ == "__main__":
    main()
|