homecontrol/homecontrol/core.py

412 lines
14 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import asyncio
import dataset
from flask import Flask, request, json, jsonify, abort, make_response
import logging
from math import inf
import os
import sys
import threading
import time
from xdg import XDG_CONFIG_HOME
# local
from homecontrol.core_ws import Core_WS
# Default locations of the JSON configuration and the SQLite database,
# both placed directly inside the XDG config directory.
CONFIG_FILE = f"{XDG_CONFIG_HOME}/homecontrol.json"
DB_FILE = f"{XDG_CONFIG_HOME}/homecontrol.sqlite"
# NOTE(review): LIMIT is not referenced anywhere in the visible code --
# presumably a default row limit for queries; confirm before relying on it.
LIMIT = 100
# Default bind address/port of the websocket server (used when neither the
# config file nor the command line provides a value).
PORT_WS = 8100
HOST_WS = "0.0.0.0"
# Default bind address/port of the REST (Flask) server.
PORT_RT = 8200
HOST_RT = "0.0.0.0"
# Pause, in seconds, between two status polling rounds (passed to
# asyncio.sleep in Core.status_thread).
TIMEOUT_STATUS = 10
# NOTE(review): module-level placeholder; main() binds its own local `core`
# and never assigns this global in the visible code.
core = None
# sensors
# NOTE(review): NUM_VALUES is not referenced in the visible code.
NUM_VALUES = 1000
def setup():
    """Build the runtime configuration from defaults, config file and CLI.

    Precedence, lowest to highest: built-in defaults, values stored in the
    JSON config file, command-line arguments. The merged result is written
    back to the config file. The ``debug`` and ``debugflask`` flags are added
    only after saving, so they are per-run options and are never persisted
    by this function.

    Returns:
        dict: configuration containing at least ``host_ws``, ``port_ws``,
        ``host_rt``, ``port_rt``, ``dbfile``, ``debug`` and ``debugflask``.
    """
    # arguments
    parser = argparse.ArgumentParser(description='homecontrol')
    parser.add_argument("--hw", "--host_ws", dest="host_ws", type=str,
                        help="websocket host")
    parser.add_argument("--pw", "--port_ws", dest="port_ws", type=int,
                        help="websocket port")
    parser.add_argument("--hr", "--host_rt", dest="host_rt", type=str,
                        help="REST host")
    parser.add_argument("--pr", "--port_rt", dest="port_rt", type=int,
                        help="REST port")
    parser.add_argument("-c", "--config", dest="config", type=str,
                        help="config file", default=CONFIG_FILE)
    # No argparse default here: a default of DB_FILE would always be truthy
    # and would overwrite the "dbfile" stored in the config file on every
    # start. DB_FILE is applied below as the lowest-priority fallback.
    parser.add_argument("-f", "--dbfile", dest="dbfile", type=str,
                        help="database file", default=None)
    parser.add_argument("-d", "--debug", dest="debug", action="store_true",
                        help="debug mode")
    parser.add_argument("-D", "--debugflask", dest="debugflask",
                        action="store_true", help="flask debug mode")

    # parse arguments
    args = parser.parse_args()

    # initialize config
    os.makedirs(XDG_CONFIG_HOME, exist_ok=True)
    config = {}
    try:
        with open(args.config, "r") as config_file:
            config = json.load(config_file)
    except (OSError, ValueError):
        # Missing, unreadable or malformed config file: start from defaults.
        config = {}
    if not isinstance(config, dict):
        # A valid JSON document that is not an object cannot hold settings.
        config = {}

    # fill new keys with defaults
    defaults = {
        "host_ws": HOST_WS,
        "port_ws": PORT_WS,
        "host_rt": HOST_RT,
        "port_rt": PORT_RT,
        "dbfile": DB_FILE,
    }
    for key, fallback in defaults.items():
        if not config.get(key):
            config[key] = fallback

    # overwrite with arguments (only those actually given on the CLI)
    for key in defaults:
        cli_value = getattr(args, key)
        if cli_value:
            config[key] = cli_value

    # save to file
    with open(args.config, 'w') as config_file:
        json.dump(config, config_file)

    # temporary options: set after saving so they are not persisted
    config["debug"] = bool(args.debug)
    config["debugflask"] = bool(args.debugflask)
    return config
class Core:
    """Central homecontrol service object.

    Holds the configuration, the database handle, the ``Core_WS`` websocket
    wrapper and a daemon thread that serves the REST API through Flask.
    """

    def __init__(self, config):
        """Set up logging, database, websocket wrapper and the REST thread.

        Args:
            config: dict as returned by ``setup()``. Keys read here are
                ``debug``, ``dbfile``, ``host_ws`` and ``port_ws``; the REST
                thread additionally reads ``host_rt``, ``port_rt`` and
                ``debugflask``.
        """
        self.config = config
        # logging
        FORMAT = "[%(asctime)13s :: %(name)18s :: %(levelname)7s] %(message)s"
        DATEFMT = "%Y%m%d %H:%M:%S"
        logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt=DATEFMT)
        self.logger = logging.getLogger("Core")
        if config["debug"]:
            self.logger.setLevel(logging.DEBUG)
        self.logger.info("initialization starting...")
        # runtime
        self.actor_queue = {}
        # check_same_thread=False: the connection is used from the Flask
        # thread as well as from the thread that created it.
        self.db = dataset.connect(
            f"sqlite:///{config['dbfile']}?check_same_thread=False")
        self.cws = Core_WS(self.config["debug"], self.config["host_ws"],
                           self.config["port_ws"])
        # daemon=True replaces the deprecated Thread.setDaemon(True).
        self.flask_thread = threading.Thread(target=self.flask_thread_fn,
                                             daemon=True)
        self.flask_thread.start()
        self.logger.info("initialization complete.")

    def _log_error(self, ex):
        """Log an exception in the format used throughout this class."""
        self.logger.error(f"Exception Type:{type(ex).__name__}, args: {ex.args}")

    def flask_thread_fn(self):
        """Define the REST routes and run the Flask app (blocks the thread)."""
        app = Flask(__name__)

        def int_arg(name, default=None):
            """Return query argument ``name`` as int, or ``default`` if unset.

            A value that is not an integer aborts the request with 400
            instead of surfacing as an unhandled ValueError (HTTP 500).
            """
            raw = request.args.get(name)
            if not raw:
                return default
            try:
                return int(raw)
            except ValueError:
                abort(400)

        @app.route("/", methods=["GET"])
        def root_get():
            ret = {}
            return make_response(jsonify(ret), 200)

        @app.route("/actor/command", methods=['POST'])
        def actor_command():
            ret = {}
            try:
                content = request.json
                actorId = content.get("id")
                command = content.get("command")
                data = content.get("data")
                # NOTE(review): this runs the send on a private loop in the
                # Flask thread; if the node connections belong to the main
                # loop, asyncio.run_coroutine_threadsafe may be the right
                # tool -- confirm against Core_WS.
                loop = asyncio.new_event_loop()
                try:
                    loop.run_until_complete(
                        self.cws.node_send_command(actorId, command, data))
                finally:
                    # One loop per request: close it so it does not leak.
                    loop.close()
            except Exception as ex:
                self._log_error(ex)
                abort(400)
            return make_response(jsonify(ret), 200)

        @app.route("/actor/get", methods=['GET'])
        def actor_get():
            ret = self.actor_get_actors()
            return make_response(jsonify(ret), 200)

        @app.route("/actor/get_levels/<actorId>", methods=['GET'])
        def actor_get_level(actorId):
            ret = {}
            limit = int_arg("limit")
            self.logger.debug(f"actor/get_levels/{actorId}, limit: {limit}")
            ret[actorId] = self.actor_get_levels(actorId, limit=limit)
            return make_response(jsonify(ret), 200)

        @app.route("/actor/update", methods=['POST'])
        def actor_update():
            ret = {}
            try:
                content = request.json
                actorId = content.get("id")
                actorType = content.get("type")
                level = content.get("level")
                maxLevel = content.get("maxLevel")
                # The request carries no timestamp, so stamp it on receipt
                # (same approach as sensor_add_value).
                self.actor_add_level(actorId, level, time.time(),
                                     actorType=actorType, maxLevel=maxLevel)
                ret = self.actor_get_queue(actorId)
            except Exception as ex:
                self._log_error(ex)
                abort(400)
            return make_response(jsonify(ret), 200)

        @app.route("/sensor/get", methods=['GET'])
        def sensor_get_sensors():
            ret = self.sensor_get_sensors()
            return make_response(jsonify(ret), 200)

        @app.route("/sensor/get_values/<sensorId>", methods=['GET'])
        def sensor_get_values(sensorId):
            ret = {}
            min_ts = int_arg("min_ts", 0)
            max_ts = int_arg("max_ts", inf)
            limit = int_arg("limit")
            self.logger.debug(f"sensor/get_values/{sensorId}, [{min_ts}, {max_ts}], limit: {limit}")
            ret[sensorId] = self.sensor_get_values(
                sensorId, min_ts=min_ts, max_ts=max_ts, limit=limit)
            return make_response(jsonify(ret), 200)

        @app.route("/sensor/update", methods=['POST'])
        def sensor_add_value():
            ret = {}
            try:
                content = request.json
                sensorId = content.get("id")
                sensorType = content.get("type")
                value = content.get("value")
                ret = self.sensor_add_value(sensorId, sensorType, value)
            except Exception as ex:
                self._log_error(ex)
                abort(400)
            return make_response(jsonify(ret), 200)

        ## run app
        # Flask's debug mode is controlled by the dedicated "debugflask"
        # option, which was parsed but never consumed before. The reloader
        # stays off because the app does not run in the main thread.
        app.run(use_reloader=False, debug=self.config.get("debugflask", False),
                host=self.config["host_rt"], port=self.config["port_rt"])

    async def status_thread(self):
        """Poll every known node for its status, forever.

        Each round sends ``get_status`` to every node, records the reported
        level, then sleeps ``TIMEOUT_STATUS`` seconds.
        """
        while True:
            # Iterate over a snapshot: the node mapping may presumably change
            # while this coroutine is suspended in an await, which would
            # break iteration over the live container.
            for n in list(self.cws.nodes):
                await self.cws.node_send_command(n, "get_status", {})
                try:
                    node = self.cws.nodes[n]
                except KeyError:
                    # Node went away while we were waiting.
                    continue
                status = node.get_status()
                if status:
                    self.actor_add_level(n, status["level"], status["ts"])
            await asyncio.sleep(TIMEOUT_STATUS)

    def actor_add_level(self, actorId, level, ts, actorType=None, maxLevel=None):
        """Store one level sample for an actor, registering the actor if new.

        Args:
            actorId: id of the actor.
            level: level value to record.
            ts: timestamp stored with the sample.
            actorType: optional actor type used to register an unknown
                actor. When omitted, the type and max level are read from
                the connected node's ``get_info()``.
            maxLevel: optional maximum level, used together with
                ``actorType``.

        Returns:
            dict: always an empty dict; failures are logged, not raised.
        """
        ret = {}
        try:
            if not self.db["actors"].find_one(actorId=actorId):
                if actorType is not None:
                    # Caller supplied the metadata (REST update path).
                    q = {"actorId": actorId, "actorType": actorType,
                         "maxLevel": maxLevel}
                else:
                    info = self.cws.nodes[actorId].get_info()
                    q = {"actorId": info["nodeid"],
                         "actorType": info["nodetype"],
                         "maxLevel": info["maxlevel"]}
                self.db['actors'].insert(q)
            q = {"actorId": actorId, "ts": ts, "level": level}
            self.db['actor_levels'].insert(q)
        except Exception as ex:
            self._log_error(ex)
        # Plain return instead of `finally: return`, which would also swallow
        # KeyboardInterrupt/SystemExit.
        return ret

    async def actor_send_command(self, actorId, command, data):
        """Send ``command`` with ``data`` to the actor if it is registered."""
        try:
            if self.db["actors"].find_one(actorId=actorId):
                await self.cws.node_send_command(actorId, command, data)
        except Exception as ex:
            self._log_error(ex)

    def actor_add_queue(self, actorId, command, data):
        """Queue one pending command for a registered actor.

        A later command for the same actor replaces the queued one.
        """
        try:
            if self.db["actors"].find_one(actorId=actorId):
                self.actor_queue[actorId] = {"command": command, "data": data}
        except Exception as ex:
            self._log_error(ex)

    def actor_get_actors(self):
        """Return ``{actorId: info}`` for every actor in the database."""
        ret = {}
        try:
            for s in self.db["actors"]:
                actorId = s["actorId"]
                ret[actorId] = self.actor_get_info(actorId)
        except Exception as ex:
            self._log_error(ex)
        return ret

    def actor_get_info(self, actorId):
        """Return type, max level and most recent level sample of an actor.

        Returns:
            dict: ``actorId`` plus ``maxLevel``/``actorType`` when the actor
            is known, plus ``ts``/``level`` when at least one sample exists.
            Empty dict if the lookup itself fails.
        """
        ret = {}
        try:
            actor = self.db["actors"].find_one(actorId=actorId)
            ret["actorId"] = actorId
            if actor is not None:
                ret["maxLevel"] = actor['maxLevel']
                ret["actorType"] = actor["actorType"]
                latest = self.db["actor_levels"].find_one(actorId=actorId,
                                                          order_by="-ts")
                if latest is not None:
                    ret["ts"] = latest["ts"]
                    ret["level"] = latest['level']
        except Exception as ex:
            self._log_error(ex)
        return ret

    def actor_get_levels(self, actorId, limit=None):
        """Return actor info plus its recorded levels under ``"levels"``.

        Args:
            actorId: id of the actor.
            limit: optional maximum number of samples; falsy means no limit.

        Returns:
            dict: empty if the actor is unknown.
        """
        ret = {}
        try:
            if self.db["actors"].find_one(actorId=actorId):
                ret = self.actor_get_info(actorId)
                extra = {"_limit": limit} if limit else {}
                query = self.db["actor_levels"].find(actorId=actorId, **extra)
                ret["levels"] = [{"ts": q["ts"], "value": q["level"]}
                                 for q in query]
        except Exception as ex:
            self._log_error(ex)
        return ret

    def actor_get_queue(self, actorId):
        """Remove and return the queued command for an actor, or ``{}``."""
        ret = {}
        try:
            if actorId in self.actor_queue:
                ret = self.actor_queue.pop(actorId)
        except Exception as ex:
            self._log_error(ex)
        return ret

    def sensor_get_info(self, sensorId):
        """Return the type and the timestamp of the newest value of a sensor.

        Returns:
            dict: ``sensorId`` plus ``sensorType`` when the sensor is known,
            plus ``ts`` when at least one value exists.
        """
        ret = {}
        try:
            sensor = self.db["sensors"].find_one(sensorId=sensorId)
            ret["sensorId"] = sensorId
            if sensor is not None:
                ret["sensorType"] = sensor["sensorType"]
                latest = self.db["sensor_values"].find_one(sensorId=sensorId,
                                                           order_by="-ts")
                if latest is not None:
                    ret["ts"] = latest["ts"]
        except Exception as ex:
            self._log_error(ex)
        return ret

    def sensor_get_values(self, sensorId, min_ts, max_ts, limit=None):
        """Return sensor info plus its values with ``min_ts <= ts <= max_ts``.

        Args:
            sensorId: id of the sensor.
            min_ts: lower timestamp bound.
            max_ts: upper timestamp bound.
            limit: optional maximum number of values; falsy means no limit.

        Returns:
            dict: empty if the sensor is unknown; otherwise the info dict
            with a ``"values"`` list of ``{"ts": float, "value": float}``.
        """
        ret = {}
        try:
            if self.db["sensors"].find_one(sensorId=sensorId):
                ret = self.sensor_get_info(sensorId)
                extra = {"_limit": limit} if limit else {}
                query = self.db["sensor_values"].find(
                    sensorId=sensorId, ts={"between": [min_ts, max_ts]},
                    **extra)
                ret["values"] = [
                    {"ts": float(q["ts"]), "value": float(q["value"])}
                    for q in query
                ]
        except Exception as ex:
            self._log_error(ex)
        return ret

    def sensor_get_sensors(self):
        """Return ``{sensorId: info}`` for every sensor in the database."""
        ret = {}
        try:
            for s in self.db["sensors"]:
                sensorId = s["sensorId"]
                ret[sensorId] = self.sensor_get_info(sensorId)
        except Exception as ex:
            self._log_error(ex)
        return ret

    def sensor_add_value(self, sensorId, sensorType, value):
        """Store one sensor value stamped with the current time.

        The sensor is registered with ``sensorType`` on first sight.

        Returns:
            dict: always an empty dict; failures are logged, not raised.
        """
        ret = {}
        try:
            if not self.db["sensors"].find_one(sensorId=sensorId):
                self.db['sensors'].insert(
                    {"sensorId": sensorId, "sensorType": sensorType})
            self.db['sensor_values'].insert(
                {"sensorId": sensorId, "ts": time.time(), "value": value})
        except Exception as ex:
            self._log_error(ex)
        return ret
def main():
    """Entry point: load the config, start Core and run the event loop.

    Schedules the websocket server object (``core.cws.ws_server``) and the
    status polling coroutine on the event loop and runs until interrupted.
    On CTRL-C both tasks are cancelled and given a chance to finish before
    the loop is closed.
    """
    config = setup()
    FORMAT = "[%(asctime)13s :: %(name)18s :: %(levelname)7s] %(message)s"
    DATEFMT = "%Y%m%d %H:%M:%S"
    logging.basicConfig(level=logging.INFO, format=FORMAT,
                        datefmt=DATEFMT)
    logger = logging.getLogger("main")
    # NOTE(review): this is a local name; the module-level `core` is not
    # assigned here.
    core = Core(config)
    if config["debug"]:
        logger.setLevel(logging.DEBUG)

    loop = asyncio.get_event_loop()
    tasks = [
        asyncio.ensure_future(core.cws.ws_server, loop=loop),
        asyncio.ensure_future(core.status_thread(), loop=loop),
    ]
    runner = asyncio.gather(*tasks)
    try:
        loop.run_until_complete(runner)
        loop.run_forever()
    except KeyboardInterrupt:
        print("\nCTRL-C received, stopping...")
        for task in tasks:
            task.cancel()
        # Cancellation only takes effect once the loop runs again, so drive
        # it until every task has finished. return_exceptions=True collects
        # the resulting CancelledError objects instead of raising them.
        loop.run_until_complete(
            asyncio.gather(runner, *tasks, return_exceptions=True))
    finally:
        # The loop is never running at this point (run_until_complete has
        # returned or raised), so close it rather than stop it.
        loop.close()
        print("done.")
# Run the service only when executed as a script, not when imported.
if __name__ == "__main__":
    main()